]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
7c673cae FG |
2 | // vim: ts=8 sw=2 smarttab |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #include <atomic> | |
18 | #include <iostream> | |
f6b5b4d7 | 19 | #include <memory> |
7c673cae FG |
20 | #include <unistd.h> |
21 | #include <stdlib.h> | |
22 | #include <time.h> | |
11fdf7f2 TL |
23 | #include <set> |
24 | #include <list> | |
9f95a23c | 25 | #include "common/ceph_mutex.h" |
7c673cae FG |
26 | #include "common/ceph_argparse.h" |
27 | #include "global/global_init.h" | |
28 | #include "msg/Dispatcher.h" | |
29 | #include "msg/msg_types.h" | |
30 | #include "msg/Message.h" | |
31 | #include "msg/Messenger.h" | |
32 | #include "msg/Connection.h" | |
33 | #include "messages/MPing.h" | |
34 | #include "messages/MCommand.h" | |
35 | ||
36 | #include <boost/random/mersenne_twister.hpp> | |
37 | #include <boost/random/uniform_int.hpp> | |
38 | #include <boost/random/binomial_distribution.hpp> | |
39 | #include <gtest/gtest.h> | |
40 | ||
41 | typedef boost::mt11213b gen_type; | |
42 | ||
43 | #include "common/dout.h" | |
11fdf7f2 TL |
44 | #include "include/ceph_assert.h" |
45 | ||
46 | #include "auth/DummyAuth.h" | |
7c673cae FG |
47 | |
48 | #define dout_subsys ceph_subsys_ms | |
49 | #undef dout_prefix | |
50 | #define dout_prefix *_dout << " ceph_test_msgr " | |
51 | ||
52 | ||
7c673cae FG |
53 | #define CHECK_AND_WAIT_TRUE(expr) do { \ |
54 | int n = 1000; \ | |
55 | while (--n) { \ | |
56 | if (expr) \ | |
57 | break; \ | |
58 | usleep(1000); \ | |
59 | } \ | |
60 | } while(0); | |
61 | ||
62 | class MessengerTest : public ::testing::TestWithParam<const char*> { | |
63 | public: | |
11fdf7f2 | 64 | DummyAuthClientServer dummy_auth; |
7c673cae FG |
65 | Messenger *server_msgr; |
66 | Messenger *client_msgr; | |
67 | ||
11fdf7f2 TL |
68 | MessengerTest() : dummy_auth(g_ceph_context), |
69 | server_msgr(NULL), client_msgr(NULL) { | |
70 | dummy_auth.auth_registry.refresh_config(); | |
71 | } | |
7c673cae FG |
72 | void SetUp() override { |
73 | lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl; | |
f67539c2 TL |
74 | server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid()); |
75 | client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid()); | |
7c673cae FG |
76 | server_msgr->set_default_policy(Messenger::Policy::stateless_server(0)); |
77 | client_msgr->set_default_policy(Messenger::Policy::lossy_client(0)); | |
11fdf7f2 TL |
78 | server_msgr->set_auth_client(&dummy_auth); |
79 | server_msgr->set_auth_server(&dummy_auth); | |
80 | client_msgr->set_auth_client(&dummy_auth); | |
81 | client_msgr->set_auth_server(&dummy_auth); | |
9f95a23c | 82 | server_msgr->set_require_authorizer(false); |
7c673cae FG |
83 | } |
84 | void TearDown() override { | |
85 | ASSERT_EQ(server_msgr->get_dispatch_queue_len(), 0); | |
86 | ASSERT_EQ(client_msgr->get_dispatch_queue_len(), 0); | |
87 | delete server_msgr; | |
88 | delete client_msgr; | |
89 | } | |
90 | ||
91 | }; | |
92 | ||
93 | ||
94 | class FakeDispatcher : public Dispatcher { | |
95 | public: | |
96 | struct Session : public RefCountedObject { | |
97 | atomic<uint64_t> count; | |
98 | ConnectionRef con; | |
99 | ||
100 | explicit Session(ConnectionRef c): RefCountedObject(g_ceph_context), count(0), con(c) { | |
101 | } | |
102 | uint64_t get_count() { return count; } | |
103 | }; | |
104 | ||
9f95a23c TL |
105 | ceph::mutex lock = ceph::make_mutex("FakeDispatcher::lock"); |
106 | ceph::condition_variable cond; | |
7c673cae FG |
107 | bool is_server; |
108 | bool got_new; | |
109 | bool got_remote_reset; | |
110 | bool got_connect; | |
111 | bool loopback; | |
11fdf7f2 | 112 | entity_addrvec_t last_accept; |
9f95a23c | 113 | ConnectionRef *last_accept_con_ptr = nullptr; |
7c673cae | 114 | |
9f95a23c | 115 | explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context), |
7c673cae | 116 | is_server(s), got_new(false), got_remote_reset(false), |
11fdf7f2 | 117 | got_connect(false), loopback(false) { |
11fdf7f2 | 118 | } |
7c673cae FG |
119 | bool ms_can_fast_dispatch_any() const override { return true; } |
120 | bool ms_can_fast_dispatch(const Message *m) const override { | |
121 | switch (m->get_type()) { | |
122 | case CEPH_MSG_PING: | |
123 | return true; | |
124 | default: | |
125 | return false; | |
126 | } | |
127 | } | |
128 | ||
129 | void ms_handle_fast_connect(Connection *con) override { | |
9f95a23c | 130 | std::scoped_lock l{lock}; |
7c673cae | 131 | lderr(g_ceph_context) << __func__ << " " << con << dendl; |
11fdf7f2 | 132 | auto s = con->get_priv(); |
7c673cae | 133 | if (!s) { |
11fdf7f2 TL |
134 | auto session = new Session(con); |
135 | con->set_priv(RefCountedPtr{session, false}); | |
136 | lderr(g_ceph_context) << __func__ << " con: " << con | |
137 | << " count: " << session->count << dendl; | |
7c673cae | 138 | } |
7c673cae | 139 | got_connect = true; |
9f95a23c | 140 | cond.notify_all(); |
7c673cae FG |
141 | } |
142 | void ms_handle_fast_accept(Connection *con) override { | |
11fdf7f2 | 143 | last_accept = con->get_peer_addrs(); |
9f95a23c TL |
144 | if (last_accept_con_ptr) { |
145 | *last_accept_con_ptr = con; | |
146 | } | |
11fdf7f2 TL |
147 | if (!con->get_priv()) { |
148 | con->set_priv(RefCountedPtr{new Session(con), false}); | |
7c673cae | 149 | } |
7c673cae FG |
150 | } |
151 | bool ms_dispatch(Message *m) override { | |
11fdf7f2 TL |
152 | auto priv = m->get_connection()->get_priv(); |
153 | auto s = static_cast<Session*>(priv.get()); | |
7c673cae FG |
154 | if (!s) { |
155 | s = new Session(m->get_connection()); | |
11fdf7f2 TL |
156 | priv.reset(s, false); |
157 | m->get_connection()->set_priv(priv); | |
7c673cae | 158 | } |
7c673cae FG |
159 | s->count++; |
160 | lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl; | |
161 | if (is_server) { | |
162 | reply_message(m); | |
163 | } | |
9f95a23c | 164 | std::lock_guard l{lock}; |
7c673cae | 165 | got_new = true; |
9f95a23c | 166 | cond.notify_all(); |
7c673cae FG |
167 | m->put(); |
168 | return true; | |
169 | } | |
170 | bool ms_handle_reset(Connection *con) override { | |
9f95a23c | 171 | std::lock_guard l{lock}; |
7c673cae | 172 | lderr(g_ceph_context) << __func__ << " " << con << dendl; |
11fdf7f2 TL |
173 | auto priv = con->get_priv(); |
174 | if (auto s = static_cast<Session*>(priv.get()); s) { | |
175 | s->con.reset(); // break con <-> session ref cycle | |
176 | con->set_priv(nullptr); // break ref <-> session cycle, if any | |
7c673cae FG |
177 | } |
178 | return true; | |
179 | } | |
180 | void ms_handle_remote_reset(Connection *con) override { | |
9f95a23c | 181 | std::lock_guard l{lock}; |
7c673cae | 182 | lderr(g_ceph_context) << __func__ << " " << con << dendl; |
11fdf7f2 TL |
183 | auto priv = con->get_priv(); |
184 | if (auto s = static_cast<Session*>(priv.get()); s) { | |
185 | s->con.reset(); // break con <-> session ref cycle | |
186 | con->set_priv(nullptr); // break ref <-> session cycle, if any | |
7c673cae FG |
187 | } |
188 | got_remote_reset = true; | |
9f95a23c | 189 | cond.notify_all(); |
7c673cae FG |
190 | } |
191 | bool ms_handle_refused(Connection *con) override { | |
192 | return false; | |
193 | } | |
194 | void ms_fast_dispatch(Message *m) override { | |
11fdf7f2 TL |
195 | auto priv = m->get_connection()->get_priv(); |
196 | auto s = static_cast<Session*>(priv.get()); | |
7c673cae FG |
197 | if (!s) { |
198 | s = new Session(m->get_connection()); | |
11fdf7f2 TL |
199 | priv.reset(s, false); |
200 | m->get_connection()->set_priv(priv); | |
7c673cae | 201 | } |
7c673cae FG |
202 | s->count++; |
203 | lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl; | |
204 | if (is_server) { | |
205 | if (loopback) | |
11fdf7f2 | 206 | ceph_assert(m->get_source().is_osd()); |
7c673cae FG |
207 | else |
208 | reply_message(m); | |
209 | } else if (loopback) { | |
11fdf7f2 | 210 | ceph_assert(m->get_source().is_client()); |
7c673cae FG |
211 | } |
212 | m->put(); | |
9f95a23c | 213 | std::lock_guard l{lock}; |
7c673cae | 214 | got_new = true; |
9f95a23c | 215 | cond.notify_all(); |
7c673cae FG |
216 | } |
217 | ||
11fdf7f2 TL |
218 | int ms_handle_authentication(Connection *con) override { |
219 | return 1; | |
7c673cae FG |
220 | } |
221 | ||
222 | void reply_message(Message *m) { | |
223 | MPing *rm = new MPing(); | |
224 | m->get_connection()->send_message(rm); | |
225 | } | |
226 | }; | |
227 | ||
228 | typedef FakeDispatcher::Session Session; | |
229 | ||
11fdf7f2 TL |
230 | struct TestInterceptor : public Interceptor { |
231 | ||
232 | bool step_waiting = false; | |
233 | bool waiting = true; | |
234 | std::map<Connection *, uint32_t> current_step; | |
235 | std::map<Connection *, std::list<uint32_t>> step_history; | |
236 | std::map<uint32_t, std::optional<ACTION>> decisions; | |
237 | std::set<uint32_t> breakpoints; | |
238 | ||
239 | uint32_t count_step(Connection *conn, uint32_t step) { | |
240 | uint32_t count = 0; | |
241 | for (auto s : step_history[conn]) { | |
242 | if (s == step) { | |
243 | count++; | |
244 | } | |
245 | } | |
246 | return count; | |
247 | } | |
248 | ||
249 | void breakpoint(uint32_t step) { | |
250 | breakpoints.insert(step); | |
251 | } | |
252 | ||
253 | void remove_bp(uint32_t step) { | |
254 | breakpoints.erase(step); | |
255 | } | |
256 | ||
257 | Connection *wait(uint32_t step, Connection *conn=nullptr) { | |
258 | std::unique_lock<std::mutex> l(lock); | |
259 | while(true) { | |
260 | if (conn) { | |
261 | auto it = current_step.find(conn); | |
262 | if (it != current_step.end()) { | |
263 | if (it->second == step) { | |
264 | break; | |
265 | } | |
266 | } | |
267 | } else { | |
268 | for (auto it : current_step) { | |
269 | if (it.second == step) { | |
270 | conn = it.first; | |
271 | break; | |
272 | } | |
273 | } | |
274 | if (conn) { | |
275 | break; | |
276 | } | |
277 | } | |
278 | step_waiting = true; | |
279 | cond_var.wait(l); | |
280 | } | |
281 | step_waiting = false; | |
282 | return conn; | |
283 | } | |
284 | ||
285 | ACTION wait_for_decision(uint32_t step, std::unique_lock<std::mutex> &l) { | |
286 | if (decisions[step]) { | |
287 | return *(decisions[step]); | |
288 | } | |
289 | waiting = true; | |
9f95a23c | 290 | cond_var.wait(l, [this] { return !waiting; }); |
11fdf7f2 TL |
291 | return *(decisions[step]); |
292 | } | |
293 | ||
294 | void proceed(uint32_t step, ACTION decision) { | |
295 | std::unique_lock<std::mutex> l(lock); | |
296 | decisions[step] = decision; | |
297 | if (waiting) { | |
298 | waiting = false; | |
299 | cond_var.notify_one(); | |
300 | } | |
301 | } | |
302 | ||
303 | ACTION intercept(Connection *conn, uint32_t step) override { | |
304 | lderr(g_ceph_context) << __func__ << " conn(" << conn | |
305 | << ") intercept called on step=" << step << dendl; | |
306 | ||
307 | { | |
308 | std::unique_lock<std::mutex> l(lock); | |
309 | step_history[conn].push_back(step); | |
310 | current_step[conn] = step; | |
311 | if (step_waiting) { | |
312 | cond_var.notify_one(); | |
313 | } | |
314 | } | |
315 | ||
316 | std::unique_lock<std::mutex> l(lock); | |
317 | ACTION decision = ACTION::CONTINUE; | |
318 | if (breakpoints.find(step) != breakpoints.end()) { | |
319 | lderr(g_ceph_context) << __func__ << " conn(" << conn | |
320 | << ") pausing on step=" << step << dendl; | |
321 | decision = wait_for_decision(step, l); | |
322 | } else { | |
323 | if (decisions[step]) { | |
324 | decision = *(decisions[step]); | |
325 | } | |
326 | } | |
327 | lderr(g_ceph_context) << __func__ << " conn(" << conn | |
328 | << ") resuming step=" << step << " with decision=" | |
329 | << decision << dendl; | |
330 | decisions[step].reset(); | |
331 | return decision; | |
332 | } | |
333 | ||
334 | }; | |
335 | ||
336 | /** | |
337 | * Scenario: A connects to B, and B connects to A at the same time. | |
338 | */ | |
339 | TEST_P(MessengerTest, ConnectionRaceTest) { | |
11fdf7f2 TL |
340 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(false); |
341 | ||
342 | TestInterceptor *cli_interceptor = new TestInterceptor(); | |
343 | TestInterceptor *srv_interceptor = new TestInterceptor(); | |
344 | ||
345 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer_reuse(0)); | |
346 | server_msgr->interceptor = srv_interceptor; | |
347 | ||
348 | client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer_reuse(0)); | |
349 | client_msgr->interceptor = cli_interceptor; | |
350 | ||
351 | entity_addr_t bind_addr; | |
352 | bind_addr.parse("v2:127.0.0.1:3300"); | |
353 | server_msgr->bind(bind_addr); | |
354 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
355 | server_msgr->start(); | |
356 | ||
357 | bind_addr.parse("v2:127.0.0.1:3301"); | |
358 | client_msgr->bind(bind_addr); | |
359 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
360 | client_msgr->start(); | |
361 | ||
362 | // pause before sending client_ident message | |
f67539c2 | 363 | cli_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY); |
11fdf7f2 | 364 | // pause before sending client_ident message |
f67539c2 | 365 | srv_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY); |
11fdf7f2 TL |
366 | |
367 | ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), | |
368 | server_msgr->get_myaddrs()); | |
369 | MPing *m1 = new MPing(); | |
370 | ASSERT_EQ(c2s->send_message(m1), 0); | |
371 | ||
372 | ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(), | |
373 | client_msgr->get_myaddrs()); | |
374 | MPing *m2 = new MPing(); | |
375 | ASSERT_EQ(s2c->send_message(m2), 0); | |
376 | ||
f67539c2 TL |
377 | cli_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY, c2s.get()); |
378 | srv_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY, s2c.get()); | |
11fdf7f2 TL |
379 | |
380 | // at this point both connections (A->B, B->A) are paused just before sending | |
381 | // the client_ident message. | |
382 | ||
f67539c2 TL |
383 | cli_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY); |
384 | srv_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY); | |
11fdf7f2 | 385 | |
f67539c2 TL |
386 | cli_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE); |
387 | srv_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE); | |
11fdf7f2 TL |
388 | |
389 | { | |
9f95a23c TL |
390 | std::unique_lock l{cli_dispatcher.lock}; |
391 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
11fdf7f2 TL |
392 | cli_dispatcher.got_new = false; |
393 | } | |
394 | ||
395 | { | |
9f95a23c TL |
396 | std::unique_lock l{srv_dispatcher.lock}; |
397 | srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; }); | |
11fdf7f2 TL |
398 | srv_dispatcher.got_new = false; |
399 | } | |
400 | ||
401 | ASSERT_TRUE(s2c->is_connected()); | |
402 | ASSERT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count()); | |
403 | ASSERT_TRUE(s2c->peer_is_client()); | |
404 | ||
405 | ASSERT_TRUE(c2s->is_connected()); | |
406 | ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count()); | |
407 | ASSERT_TRUE(c2s->peer_is_osd()); | |
408 | ||
409 | client_msgr->shutdown(); | |
410 | client_msgr->wait(); | |
411 | server_msgr->shutdown(); | |
412 | server_msgr->wait(); | |
413 | ||
414 | delete cli_interceptor; | |
415 | delete srv_interceptor; | |
416 | } | |
417 | ||
f6b5b4d7 TL |
418 | /** |
419 | * Scenario: A connects to B, and B connects to A at the same time. | |
420 | * The first (A -> B) connection gets to message flow handshake, the | |
421 | * second (B -> A) connection is stuck waiting for a banner from A. | |
422 | * After A sends client_ident to B, the first connection wins and B | |
423 | * calls reuse_connection() to replace the second connection's socket | |
424 | * while the second connection is still in BANNER_CONNECTING. | |
425 | */ | |
426 | TEST_P(MessengerTest, ConnectionRaceReuseBannerTest) { | |
427 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(false); | |
428 | ||
429 | auto cli_interceptor = std::make_unique<TestInterceptor>(); | |
430 | auto srv_interceptor = std::make_unique<TestInterceptor>(); | |
431 | ||
432 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, | |
433 | Messenger::Policy::lossless_peer_reuse(0)); | |
434 | server_msgr->interceptor = srv_interceptor.get(); | |
435 | ||
436 | client_msgr->set_policy(entity_name_t::TYPE_OSD, | |
437 | Messenger::Policy::lossless_peer_reuse(0)); | |
438 | client_msgr->interceptor = cli_interceptor.get(); | |
439 | ||
440 | entity_addr_t bind_addr; | |
441 | bind_addr.parse("v2:127.0.0.1:3300"); | |
442 | server_msgr->bind(bind_addr); | |
443 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
444 | server_msgr->start(); | |
445 | ||
446 | bind_addr.parse("v2:127.0.0.1:3301"); | |
447 | client_msgr->bind(bind_addr); | |
448 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
449 | client_msgr->start(); | |
450 | ||
451 | // pause before sending client_ident message | |
f67539c2 | 452 | srv_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY); |
f6b5b4d7 TL |
453 | |
454 | ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(), | |
455 | client_msgr->get_myaddrs()); | |
456 | MPing *m1 = new MPing(); | |
457 | ASSERT_EQ(s2c->send_message(m1), 0); | |
458 | ||
f67539c2 TL |
459 | srv_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY); |
460 | srv_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY); | |
f6b5b4d7 TL |
461 | |
462 | // pause before sending banner | |
f67539c2 | 463 | cli_interceptor->breakpoint(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING); |
f6b5b4d7 TL |
464 | |
465 | ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), | |
466 | server_msgr->get_myaddrs()); | |
467 | MPing *m2 = new MPing(); | |
468 | ASSERT_EQ(c2s->send_message(m2), 0); | |
469 | ||
f67539c2 TL |
470 | cli_interceptor->wait(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING); |
471 | cli_interceptor->remove_bp(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING); | |
f6b5b4d7 TL |
472 | |
473 | // second connection is in BANNER_CONNECTING, ensure it stays so | |
474 | // and send client_ident | |
f67539c2 TL |
475 | srv_interceptor->breakpoint(Interceptor::STEP::BANNER_EXCHANGE); |
476 | srv_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE); | |
f6b5b4d7 TL |
477 | |
478 | // handle client_ident -- triggers reuse_connection() with exproto | |
479 | // in BANNER_CONNECTING | |
f67539c2 TL |
480 | cli_interceptor->breakpoint(Interceptor::STEP::READY); |
481 | cli_interceptor->proceed(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING, Interceptor::ACTION::CONTINUE); | |
f6b5b4d7 | 482 | |
f67539c2 TL |
483 | cli_interceptor->wait(Interceptor::STEP::READY); |
484 | cli_interceptor->remove_bp(Interceptor::STEP::READY); | |
f6b5b4d7 TL |
485 | |
486 | // first connection is in READY | |
f67539c2 TL |
487 | Connection *s2c_accepter = srv_interceptor->wait(Interceptor::STEP::BANNER_EXCHANGE); |
488 | srv_interceptor->remove_bp(Interceptor::STEP::BANNER_EXCHANGE); | |
f6b5b4d7 | 489 | |
f67539c2 TL |
490 | srv_interceptor->proceed(Interceptor::STEP::BANNER_EXCHANGE, Interceptor::ACTION::CONTINUE); |
491 | cli_interceptor->proceed(Interceptor::STEP::READY, Interceptor::ACTION::CONTINUE); | |
f6b5b4d7 TL |
492 | |
493 | { | |
494 | std::unique_lock l{cli_dispatcher.lock}; | |
495 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
496 | cli_dispatcher.got_new = false; | |
497 | } | |
498 | ||
499 | { | |
500 | std::unique_lock l{srv_dispatcher.lock}; | |
501 | srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; }); | |
502 | srv_dispatcher.got_new = false; | |
503 | } | |
504 | ||
505 | EXPECT_TRUE(s2c->is_connected()); | |
506 | EXPECT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count()); | |
507 | EXPECT_TRUE(s2c->peer_is_client()); | |
508 | ||
509 | EXPECT_TRUE(c2s->is_connected()); | |
510 | EXPECT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count()); | |
511 | EXPECT_TRUE(c2s->peer_is_osd()); | |
512 | ||
513 | // closed in reuse_connection() -- EPIPE when writing banner/hello | |
514 | EXPECT_FALSE(s2c_accepter->is_connected()); | |
515 | ||
516 | // established exactly once, never faulted and reconnected | |
f67539c2 TL |
517 | EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::START_CLIENT_BANNER_EXCHANGE), 1u); |
518 | EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 0u); | |
519 | EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::READY), 1u); | |
f6b5b4d7 TL |
520 | |
521 | client_msgr->shutdown(); | |
522 | client_msgr->wait(); | |
523 | server_msgr->shutdown(); | |
524 | server_msgr->wait(); | |
525 | } | |
526 | ||
11fdf7f2 TL |
527 | /** |
528 | * Scenario: | |
529 | * - A connects to B | |
530 | * - A sends client_ident to B | |
531 | * - B fails before sending server_ident to A | |
532 | * - A reconnects | |
533 | */ | |
534 | TEST_P(MessengerTest, MissingServerIdenTest) { | |
11fdf7f2 TL |
535 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(false); |
536 | ||
537 | TestInterceptor *cli_interceptor = new TestInterceptor(); | |
538 | TestInterceptor *srv_interceptor = new TestInterceptor(); | |
539 | ||
540 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0)); | |
541 | server_msgr->interceptor = srv_interceptor; | |
542 | ||
543 | client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossy_client(0)); | |
544 | client_msgr->interceptor = cli_interceptor; | |
545 | ||
546 | entity_addr_t bind_addr; | |
547 | bind_addr.parse("v2:127.0.0.1:3300"); | |
548 | server_msgr->bind(bind_addr); | |
549 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
550 | server_msgr->start(); | |
551 | ||
552 | bind_addr.parse("v2:127.0.0.1:3301"); | |
553 | client_msgr->bind(bind_addr); | |
554 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
555 | client_msgr->start(); | |
556 | ||
f67539c2 TL |
557 | // pause before sending server_ident message |
558 | srv_interceptor->breakpoint(Interceptor::STEP::SEND_SERVER_IDENTITY); | |
11fdf7f2 TL |
559 | |
560 | ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), | |
561 | server_msgr->get_myaddrs()); | |
562 | MPing *m1 = new MPing(); | |
563 | ASSERT_EQ(c2s->send_message(m1), 0); | |
564 | ||
f67539c2 TL |
565 | Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::SEND_SERVER_IDENTITY); |
566 | srv_interceptor->remove_bp(Interceptor::STEP::SEND_SERVER_IDENTITY); | |
11fdf7f2 TL |
567 | |
568 | // We inject a message from this side of the connection to force it to be | |
569 | // in standby when we inject the failure below | |
570 | MPing *m2 = new MPing(); | |
571 | ASSERT_EQ(c2s_accepter->send_message(m2), 0); | |
572 | ||
f67539c2 | 573 | srv_interceptor->proceed(Interceptor::STEP::SEND_SERVER_IDENTITY, Interceptor::ACTION::FAIL); |
11fdf7f2 TL |
574 | |
575 | { | |
9f95a23c TL |
576 | std::unique_lock l{srv_dispatcher.lock}; |
577 | srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; }); | |
11fdf7f2 TL |
578 | srv_dispatcher.got_new = false; |
579 | } | |
580 | ||
581 | { | |
9f95a23c TL |
582 | std::unique_lock l{cli_dispatcher.lock}; |
583 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
11fdf7f2 TL |
584 | cli_dispatcher.got_new = false; |
585 | } | |
586 | ||
587 | ASSERT_TRUE(c2s->is_connected()); | |
588 | ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count()); | |
589 | ASSERT_TRUE(c2s->peer_is_osd()); | |
590 | ||
591 | ASSERT_TRUE(c2s_accepter->is_connected()); | |
592 | ASSERT_EQ(1u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count()); | |
593 | ASSERT_TRUE(c2s_accepter->peer_is_client()); | |
594 | ||
595 | client_msgr->shutdown(); | |
596 | client_msgr->wait(); | |
597 | server_msgr->shutdown(); | |
598 | server_msgr->wait(); | |
599 | ||
600 | delete cli_interceptor; | |
601 | delete srv_interceptor; | |
602 | } | |
603 | ||
604 | /** | |
605 | * Scenario: | |
606 | * - A connects to B | |
607 | * - A sends client_ident to B | |
608 | * - B fails before sending server_ident to A | |
609 | * - A goes to standby | |
610 | * - B reconnects to A | |
611 | */ | |
612 | TEST_P(MessengerTest, MissingServerIdenTest2) { | |
11fdf7f2 TL |
613 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(false); |
614 | ||
615 | TestInterceptor *cli_interceptor = new TestInterceptor(); | |
616 | TestInterceptor *srv_interceptor = new TestInterceptor(); | |
617 | ||
618 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0)); | |
619 | server_msgr->interceptor = srv_interceptor; | |
620 | ||
621 | client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0)); | |
622 | client_msgr->interceptor = cli_interceptor; | |
623 | ||
624 | entity_addr_t bind_addr; | |
625 | bind_addr.parse("v2:127.0.0.1:3300"); | |
626 | server_msgr->bind(bind_addr); | |
627 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
628 | server_msgr->start(); | |
629 | ||
630 | bind_addr.parse("v2:127.0.0.1:3301"); | |
631 | client_msgr->bind(bind_addr); | |
632 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
633 | client_msgr->start(); | |
634 | ||
f67539c2 TL |
635 | // pause before sending server_ident message |
636 | srv_interceptor->breakpoint(Interceptor::STEP::SEND_SERVER_IDENTITY); | |
11fdf7f2 TL |
637 | |
638 | ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), | |
639 | server_msgr->get_myaddrs()); | |
640 | ||
f67539c2 TL |
641 | Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::SEND_SERVER_IDENTITY); |
642 | srv_interceptor->remove_bp(Interceptor::STEP::SEND_SERVER_IDENTITY); | |
11fdf7f2 TL |
643 | |
644 | // We inject a message from this side of the connection to force it to be | |
645 | // in standby when we inject the failure below | |
646 | MPing *m2 = new MPing(); | |
647 | ASSERT_EQ(c2s_accepter->send_message(m2), 0); | |
648 | ||
f67539c2 | 649 | srv_interceptor->proceed(Interceptor::STEP::SEND_SERVER_IDENTITY, Interceptor::ACTION::FAIL); |
11fdf7f2 TL |
650 | |
651 | { | |
9f95a23c TL |
652 | std::unique_lock l{cli_dispatcher.lock}; |
653 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
11fdf7f2 TL |
654 | cli_dispatcher.got_new = false; |
655 | } | |
656 | ||
657 | ASSERT_TRUE(c2s->is_connected()); | |
658 | ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count()); | |
659 | ASSERT_TRUE(c2s->peer_is_osd()); | |
660 | ||
661 | ASSERT_TRUE(c2s_accepter->is_connected()); | |
662 | ASSERT_EQ(0u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count()); | |
663 | ASSERT_TRUE(c2s_accepter->peer_is_client()); | |
664 | ||
665 | client_msgr->shutdown(); | |
666 | client_msgr->wait(); | |
667 | server_msgr->shutdown(); | |
668 | server_msgr->wait(); | |
669 | ||
670 | delete cli_interceptor; | |
671 | delete srv_interceptor; | |
672 | } | |
673 | ||
674 | /** | |
675 | * Scenario: | |
676 | * - A connects to B | |
677 | * - A and B exchange messages | |
678 | * - A fails | |
679 | * - B goes into standby | |
680 | * - A reconnects | |
681 | */ | |
682 | TEST_P(MessengerTest, ReconnectTest) { | |
11fdf7f2 TL |
683 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); |
684 | ||
685 | TestInterceptor *cli_interceptor = new TestInterceptor(); | |
686 | TestInterceptor *srv_interceptor = new TestInterceptor(); | |
687 | ||
688 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0)); | |
689 | server_msgr->interceptor = srv_interceptor; | |
690 | ||
691 | client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0)); | |
692 | client_msgr->interceptor = cli_interceptor; | |
693 | ||
694 | entity_addr_t bind_addr; | |
695 | bind_addr.parse("v2:127.0.0.1:3300"); | |
696 | server_msgr->bind(bind_addr); | |
697 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
698 | server_msgr->start(); | |
699 | ||
700 | bind_addr.parse("v2:127.0.0.1:3301"); | |
701 | client_msgr->bind(bind_addr); | |
702 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
703 | client_msgr->start(); | |
704 | ||
705 | ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), | |
706 | server_msgr->get_myaddrs()); | |
707 | ||
708 | MPing *m1 = new MPing(); | |
709 | ASSERT_EQ(c2s->send_message(m1), 0); | |
710 | ||
711 | { | |
9f95a23c TL |
712 | std::unique_lock l{cli_dispatcher.lock}; |
713 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
11fdf7f2 TL |
714 | cli_dispatcher.got_new = false; |
715 | } | |
716 | ||
717 | ASSERT_TRUE(c2s->is_connected()); | |
718 | ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count()); | |
719 | ASSERT_TRUE(c2s->peer_is_osd()); | |
720 | ||
f67539c2 | 721 | cli_interceptor->breakpoint(Interceptor::STEP::HANDLE_MESSAGE); |
11fdf7f2 TL |
722 | |
723 | MPing *m2 = new MPing(); | |
724 | ASSERT_EQ(c2s->send_message(m2), 0); | |
725 | ||
f67539c2 TL |
726 | cli_interceptor->wait(Interceptor::STEP::HANDLE_MESSAGE, c2s.get()); |
727 | cli_interceptor->remove_bp(Interceptor::STEP::HANDLE_MESSAGE); | |
11fdf7f2 TL |
728 | |
729 | // at this point client and server are connected together | |
730 | ||
f67539c2 | 731 | srv_interceptor->breakpoint(Interceptor::STEP::READY); |
11fdf7f2 TL |
732 | |
733 | // failing client | |
f67539c2 | 734 | cli_interceptor->proceed(Interceptor::STEP::HANDLE_MESSAGE, Interceptor::ACTION::FAIL); |
11fdf7f2 TL |
735 | |
736 | MPing *m3 = new MPing(); | |
737 | ASSERT_EQ(c2s->send_message(m3), 0); | |
738 | ||
f67539c2 | 739 | Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::READY); |
11fdf7f2 TL |
740 | // the srv end of theconnection is now paused at ready |
741 | // this means that the reconnect was successful | |
f67539c2 | 742 | srv_interceptor->remove_bp(Interceptor::STEP::READY); |
11fdf7f2 TL |
743 | |
744 | ASSERT_TRUE(c2s_accepter->peer_is_client()); | |
745 | // c2s_accepter sent 0 reconnect messages | |
f67539c2 | 746 | ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 0u); |
11fdf7f2 | 747 | // c2s_accepter sent 1 reconnect_ok messages |
f67539c2 | 748 | ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT_OK), 1u); |
11fdf7f2 | 749 | // c2s sent 1 reconnect messages |
f67539c2 | 750 | ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 1u); |
11fdf7f2 | 751 | // c2s sent 0 reconnect_ok messages |
f67539c2 | 752 | ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT_OK), 0u); |
11fdf7f2 TL |
753 | |
754 | srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE); | |
755 | ||
756 | { | |
9f95a23c TL |
757 | std::unique_lock l{cli_dispatcher.lock}; |
758 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
11fdf7f2 TL |
759 | cli_dispatcher.got_new = false; |
760 | } | |
761 | ||
762 | client_msgr->shutdown(); | |
763 | client_msgr->wait(); | |
764 | server_msgr->shutdown(); | |
765 | server_msgr->wait(); | |
766 | ||
767 | delete cli_interceptor; | |
768 | delete srv_interceptor; | |
769 | } | |
770 | ||
771 | /** | |
772 | * Scenario: | |
773 | * - A connects to B | |
774 | * - A and B exchange messages | |
775 | * - A fails | |
776 | * - A reconnects // B reconnects | |
777 | */ | |
778 | TEST_P(MessengerTest, ReconnectRaceTest) { | |
11fdf7f2 TL |
779 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); |
780 | ||
781 | TestInterceptor *cli_interceptor = new TestInterceptor(); | |
782 | TestInterceptor *srv_interceptor = new TestInterceptor(); | |
783 | ||
784 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0)); | |
785 | server_msgr->interceptor = srv_interceptor; | |
786 | ||
787 | client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0)); | |
788 | client_msgr->interceptor = cli_interceptor; | |
789 | ||
790 | entity_addr_t bind_addr; | |
791 | bind_addr.parse("v2:127.0.0.1:3300"); | |
792 | server_msgr->bind(bind_addr); | |
793 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
794 | server_msgr->start(); | |
795 | ||
796 | bind_addr.parse("v2:127.0.0.1:3301"); | |
797 | client_msgr->bind(bind_addr); | |
798 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
799 | client_msgr->start(); | |
800 | ||
801 | ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), | |
802 | server_msgr->get_myaddrs()); | |
803 | ||
804 | MPing *m1 = new MPing(); | |
805 | ASSERT_EQ(c2s->send_message(m1), 0); | |
806 | ||
807 | { | |
9f95a23c TL |
808 | std::unique_lock l{cli_dispatcher.lock}; |
809 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
11fdf7f2 TL |
810 | cli_dispatcher.got_new = false; |
811 | } | |
812 | ||
813 | ASSERT_TRUE(c2s->is_connected()); | |
814 | ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count()); | |
815 | ASSERT_TRUE(c2s->peer_is_osd()); | |
816 | ||
f67539c2 | 817 | cli_interceptor->breakpoint(Interceptor::STEP::HANDLE_MESSAGE); |
11fdf7f2 TL |
818 | |
819 | MPing *m2 = new MPing(); | |
820 | ASSERT_EQ(c2s->send_message(m2), 0); | |
821 | ||
f67539c2 TL |
822 | cli_interceptor->wait(Interceptor::STEP::HANDLE_MESSAGE, c2s.get()); |
823 | cli_interceptor->remove_bp(Interceptor::STEP::HANDLE_MESSAGE); | |
11fdf7f2 TL |
824 | |
825 | // at this point client and server are connected together | |
826 | ||
827 | // force both client and server to race on reconnect | |
f67539c2 TL |
828 | cli_interceptor->breakpoint(Interceptor::STEP::SEND_RECONNECT); |
829 | srv_interceptor->breakpoint(Interceptor::STEP::SEND_RECONNECT); | |
11fdf7f2 TL |
830 | |
831 | // failing client | |
832 | // this will cause both client and server to reconnect at the same time | |
f67539c2 | 833 | cli_interceptor->proceed(Interceptor::STEP::HANDLE_MESSAGE, Interceptor::ACTION::FAIL); |
11fdf7f2 TL |
834 | |
835 | MPing *m3 = new MPing(); | |
836 | ASSERT_EQ(c2s->send_message(m3), 0); | |
837 | ||
f67539c2 TL |
838 | cli_interceptor->wait(Interceptor::STEP::SEND_RECONNECT, c2s.get()); |
839 | srv_interceptor->wait(Interceptor::STEP::SEND_RECONNECT); | |
11fdf7f2 | 840 | |
f67539c2 TL |
841 | cli_interceptor->remove_bp(Interceptor::STEP::SEND_RECONNECT); |
842 | srv_interceptor->remove_bp(Interceptor::STEP::SEND_RECONNECT); | |
11fdf7f2 TL |
843 | |
844 | // pause on "ready" | |
f67539c2 | 845 | srv_interceptor->breakpoint(Interceptor::STEP::READY); |
11fdf7f2 | 846 | |
f67539c2 TL |
847 | cli_interceptor->proceed(Interceptor::STEP::SEND_RECONNECT, Interceptor::ACTION::CONTINUE); |
848 | srv_interceptor->proceed(Interceptor::STEP::SEND_RECONNECT, Interceptor::ACTION::CONTINUE); | |
11fdf7f2 | 849 | |
f67539c2 | 850 | Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::READY); |
11fdf7f2 TL |
851 | |
852 | // the server has reconnected and is "ready" | |
f67539c2 | 853 | srv_interceptor->remove_bp(Interceptor::STEP::READY); |
11fdf7f2 TL |
854 | |
855 | ASSERT_TRUE(c2s_accepter->peer_is_client()); | |
856 | ASSERT_TRUE(c2s->peer_is_osd()); | |
857 | ||
858 | // the server should win the reconnect race | |
859 | ||
860 | // c2s_accepter sent 1 or 2 reconnect messages | |
f67539c2 TL |
861 | ASSERT_LT(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 3u); |
862 | ASSERT_GT(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 0u); | |
11fdf7f2 | 863 | // c2s_accepter sent 0 reconnect_ok messages |
f67539c2 | 864 | ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT_OK), 0u); |
11fdf7f2 | 865 | // c2s sent 1 reconnect messages |
f67539c2 | 866 | ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 1u); |
11fdf7f2 | 867 | // c2s sent 1 reconnect_ok messages |
f67539c2 | 868 | ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT_OK), 1u); |
11fdf7f2 | 869 | |
f67539c2 | 870 | if (srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT) == 2) { |
11fdf7f2 TL |
871 | // if the server send the reconnect message two times then |
872 | // the client must have sent a session retry message to the server | |
f67539c2 | 873 | ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SESSION_RETRY), 1u); |
11fdf7f2 | 874 | } else { |
f67539c2 | 875 | ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SESSION_RETRY), 0u); |
11fdf7f2 TL |
876 | } |
877 | ||
f67539c2 | 878 | srv_interceptor->proceed(Interceptor::STEP::READY, Interceptor::ACTION::CONTINUE); |
11fdf7f2 TL |
879 | |
880 | { | |
9f95a23c TL |
881 | std::unique_lock l{cli_dispatcher.lock}; |
882 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
11fdf7f2 TL |
883 | cli_dispatcher.got_new = false; |
884 | } | |
885 | ||
886 | client_msgr->shutdown(); | |
887 | client_msgr->wait(); | |
888 | server_msgr->shutdown(); | |
889 | server_msgr->wait(); | |
890 | ||
891 | delete cli_interceptor; | |
892 | delete srv_interceptor; | |
893 | } | |
894 | ||
7c673cae FG |
895 | TEST_P(MessengerTest, SimpleTest) { |
896 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
897 | entity_addr_t bind_addr; | |
9f95a23c | 898 | bind_addr.parse("v2:127.0.0.1"); |
7c673cae FG |
899 | server_msgr->bind(bind_addr); |
900 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
901 | server_msgr->start(); | |
902 | ||
903 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
904 | client_msgr->start(); | |
905 | ||
906 | // 1. simple round trip | |
907 | MPing *m = new MPing(); | |
11fdf7f2 TL |
908 | ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), |
909 | server_msgr->get_myaddrs()); | |
7c673cae FG |
910 | { |
911 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
912 | std::unique_lock l{cli_dispatcher.lock}; |
913 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
914 | cli_dispatcher.got_new = false; |
915 | } | |
916 | ASSERT_TRUE(conn->is_connected()); | |
11fdf7f2 | 917 | ASSERT_EQ(1u, static_cast<Session*>(conn->get_priv().get())->get_count()); |
7c673cae FG |
918 | ASSERT_TRUE(conn->peer_is_osd()); |
919 | ||
920 | // 2. test rebind port | |
921 | set<int> avoid_ports; | |
11fdf7f2 TL |
922 | for (int i = 0; i < 10 ; i++) { |
923 | for (auto a : server_msgr->get_myaddrs().v) { | |
924 | avoid_ports.insert(a.get_port() + i); | |
925 | } | |
926 | } | |
7c673cae | 927 | server_msgr->rebind(avoid_ports); |
11fdf7f2 TL |
928 | for (auto a : server_msgr->get_myaddrs().v) { |
929 | ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0); | |
930 | } | |
7c673cae | 931 | |
11fdf7f2 TL |
932 | conn = client_msgr->connect_to(server_msgr->get_mytype(), |
933 | server_msgr->get_myaddrs()); | |
7c673cae FG |
934 | { |
935 | m = new MPing(); | |
936 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
937 | std::unique_lock l{cli_dispatcher.lock}; |
938 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
939 | cli_dispatcher.got_new = false; |
940 | } | |
11fdf7f2 | 941 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
7c673cae FG |
942 | |
943 | // 3. test markdown connection | |
944 | conn->mark_down(); | |
945 | ASSERT_FALSE(conn->is_connected()); | |
946 | ||
947 | // 4. test failed connection | |
948 | server_msgr->shutdown(); | |
949 | server_msgr->wait(); | |
950 | ||
951 | m = new MPing(); | |
952 | conn->send_message(m); | |
953 | CHECK_AND_WAIT_TRUE(!conn->is_connected()); | |
954 | ASSERT_FALSE(conn->is_connected()); | |
955 | ||
956 | // 5. loopback connection | |
957 | srv_dispatcher.loopback = true; | |
958 | conn = client_msgr->get_loopback_connection(); | |
959 | { | |
960 | m = new MPing(); | |
961 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
962 | std::unique_lock l{cli_dispatcher.lock}; |
963 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
964 | cli_dispatcher.got_new = false; |
965 | } | |
966 | srv_dispatcher.loopback = false; | |
11fdf7f2 TL |
967 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
968 | client_msgr->shutdown(); | |
969 | client_msgr->wait(); | |
970 | server_msgr->shutdown(); | |
971 | server_msgr->wait(); | |
972 | } | |
973 | ||
974 | TEST_P(MessengerTest, SimpleMsgr2Test) { | |
975 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
976 | entity_addr_t legacy_addr; | |
977 | legacy_addr.parse("v1:127.0.0.1"); | |
978 | entity_addr_t msgr2_addr; | |
979 | msgr2_addr.parse("v2:127.0.0.1"); | |
980 | entity_addrvec_t bind_addrs; | |
981 | bind_addrs.v.push_back(legacy_addr); | |
982 | bind_addrs.v.push_back(msgr2_addr); | |
983 | server_msgr->bindv(bind_addrs); | |
984 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
985 | server_msgr->start(); | |
986 | ||
987 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
988 | client_msgr->start(); | |
989 | ||
990 | // 1. simple round trip | |
991 | MPing *m = new MPing(); | |
992 | ConnectionRef conn = client_msgr->connect_to( | |
993 | server_msgr->get_mytype(), | |
994 | server_msgr->get_myaddrs()); | |
995 | { | |
996 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
997 | std::unique_lock l{cli_dispatcher.lock}; |
998 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
11fdf7f2 TL |
999 | cli_dispatcher.got_new = false; |
1000 | } | |
1001 | ASSERT_TRUE(conn->is_connected()); | |
1002 | ASSERT_EQ(1u, static_cast<Session*>(conn->get_priv().get())->get_count()); | |
1003 | ASSERT_TRUE(conn->peer_is_osd()); | |
1004 | ||
1005 | // 2. test rebind port | |
1006 | set<int> avoid_ports; | |
1007 | for (int i = 0; i < 10 ; i++) { | |
1008 | for (auto a : server_msgr->get_myaddrs().v) { | |
1009 | avoid_ports.insert(a.get_port() + i); | |
1010 | } | |
1011 | } | |
1012 | server_msgr->rebind(avoid_ports); | |
1013 | for (auto a : server_msgr->get_myaddrs().v) { | |
1014 | ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0); | |
1015 | } | |
1016 | ||
1017 | conn = client_msgr->connect_to( | |
1018 | server_msgr->get_mytype(), | |
1019 | server_msgr->get_myaddrs()); | |
1020 | { | |
1021 | m = new MPing(); | |
1022 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1023 | std::unique_lock l{cli_dispatcher.lock}; |
1024 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
11fdf7f2 TL |
1025 | cli_dispatcher.got_new = false; |
1026 | } | |
1027 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); | |
1028 | ||
1029 | // 3. test markdown connection | |
1030 | conn->mark_down(); | |
1031 | ASSERT_FALSE(conn->is_connected()); | |
1032 | ||
1033 | // 4. test failed connection | |
1034 | server_msgr->shutdown(); | |
1035 | server_msgr->wait(); | |
1036 | ||
1037 | m = new MPing(); | |
1038 | conn->send_message(m); | |
1039 | CHECK_AND_WAIT_TRUE(!conn->is_connected()); | |
1040 | ASSERT_FALSE(conn->is_connected()); | |
1041 | ||
1042 | // 5. loopback connection | |
1043 | srv_dispatcher.loopback = true; | |
1044 | conn = client_msgr->get_loopback_connection(); | |
1045 | { | |
1046 | m = new MPing(); | |
1047 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1048 | std::unique_lock l{cli_dispatcher.lock}; |
1049 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
11fdf7f2 TL |
1050 | cli_dispatcher.got_new = false; |
1051 | } | |
1052 | srv_dispatcher.loopback = false; | |
1053 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); | |
7c673cae FG |
1054 | client_msgr->shutdown(); |
1055 | client_msgr->wait(); | |
1056 | server_msgr->shutdown(); | |
1057 | server_msgr->wait(); | |
1058 | } | |
1059 | ||
7c673cae FG |
1060 | TEST_P(MessengerTest, FeatureTest) { |
1061 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
1062 | entity_addr_t bind_addr; | |
9f95a23c | 1063 | bind_addr.parse("v2:127.0.0.1"); |
7c673cae FG |
1064 | uint64_t all_feature_supported, feature_required, feature_supported = 0; |
1065 | for (int i = 0; i < 10; i++) | |
1066 | feature_supported |= 1ULL << i; | |
11fdf7f2 TL |
1067 | feature_supported |= CEPH_FEATUREMASK_MSG_ADDR2; |
1068 | feature_supported |= CEPH_FEATUREMASK_SERVER_NAUTILUS; | |
7c673cae FG |
1069 | feature_required = feature_supported | 1ULL << 13; |
1070 | all_feature_supported = feature_required | 1ULL << 14; | |
1071 | ||
1072 | Messenger::Policy p = server_msgr->get_policy(entity_name_t::TYPE_CLIENT); | |
1073 | p.features_required = feature_required; | |
1074 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
1075 | server_msgr->bind(bind_addr); | |
1076 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
1077 | server_msgr->start(); | |
1078 | ||
1079 | // 1. Suppose if only support less than required | |
1080 | p = client_msgr->get_policy(entity_name_t::TYPE_OSD); | |
1081 | p.features_supported = feature_supported; | |
1082 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
1083 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
1084 | client_msgr->start(); | |
1085 | ||
1086 | MPing *m = new MPing(); | |
11fdf7f2 TL |
1087 | ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1088 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1089 | conn->send_message(m); |
1090 | CHECK_AND_WAIT_TRUE(!conn->is_connected()); | |
1091 | // should failed build a connection | |
1092 | ASSERT_FALSE(conn->is_connected()); | |
1093 | ||
1094 | client_msgr->shutdown(); | |
1095 | client_msgr->wait(); | |
1096 | ||
1097 | // 2. supported met required | |
1098 | p = client_msgr->get_policy(entity_name_t::TYPE_OSD); | |
1099 | p.features_supported = all_feature_supported; | |
1100 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
1101 | client_msgr->start(); | |
1102 | ||
11fdf7f2 TL |
1103 | conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1104 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1105 | { |
1106 | m = new MPing(); | |
1107 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1108 | std::unique_lock l{cli_dispatcher.lock}; |
1109 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1110 | cli_dispatcher.got_new = false; |
1111 | } | |
11fdf7f2 | 1112 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
7c673cae FG |
1113 | |
1114 | server_msgr->shutdown(); | |
1115 | client_msgr->shutdown(); | |
1116 | server_msgr->wait(); | |
1117 | client_msgr->wait(); | |
1118 | } | |
1119 | ||
1120 | TEST_P(MessengerTest, TimeoutTest) { | |
81eedcae | 1121 | g_ceph_context->_conf.set_val("ms_connection_idle_timeout", "1"); |
7c673cae FG |
1122 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); |
1123 | entity_addr_t bind_addr; | |
9f95a23c | 1124 | bind_addr.parse("v2:127.0.0.1"); |
7c673cae FG |
1125 | server_msgr->bind(bind_addr); |
1126 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
1127 | server_msgr->start(); | |
1128 | ||
1129 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
1130 | client_msgr->start(); | |
1131 | ||
1132 | // 1. build the connection | |
1133 | MPing *m = new MPing(); | |
11fdf7f2 TL |
1134 | ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1135 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1136 | { |
1137 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1138 | std::unique_lock l{cli_dispatcher.lock}; |
1139 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1140 | cli_dispatcher.got_new = false; |
1141 | } | |
1142 | ASSERT_TRUE(conn->is_connected()); | |
11fdf7f2 | 1143 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
7c673cae FG |
1144 | ASSERT_TRUE(conn->peer_is_osd()); |
1145 | ||
1146 | // 2. wait for idle | |
1147 | usleep(2500*1000); | |
1148 | ASSERT_FALSE(conn->is_connected()); | |
1149 | ||
1150 | server_msgr->shutdown(); | |
1151 | server_msgr->wait(); | |
1152 | ||
1153 | client_msgr->shutdown(); | |
1154 | client_msgr->wait(); | |
81eedcae | 1155 | g_ceph_context->_conf.set_val("ms_connection_idle_timeout", "900"); |
7c673cae FG |
1156 | } |
1157 | ||
1158 | TEST_P(MessengerTest, StatefulTest) { | |
1159 | Message *m; | |
1160 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
1161 | entity_addr_t bind_addr; | |
9f95a23c | 1162 | bind_addr.parse("v2:127.0.0.1"); |
7c673cae FG |
1163 | Messenger::Policy p = Messenger::Policy::stateful_server(0); |
1164 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
1165 | p = Messenger::Policy::lossless_client(0); | |
1166 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
1167 | ||
1168 | server_msgr->bind(bind_addr); | |
1169 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
1170 | server_msgr->start(); | |
1171 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
1172 | client_msgr->start(); | |
1173 | ||
1174 | // 1. test for server standby | |
11fdf7f2 TL |
1175 | ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1176 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1177 | { |
1178 | m = new MPing(); | |
1179 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1180 | std::unique_lock l{cli_dispatcher.lock}; |
1181 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1182 | cli_dispatcher.got_new = false; |
1183 | } | |
11fdf7f2 | 1184 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
7c673cae FG |
1185 | conn->mark_down(); |
1186 | ASSERT_FALSE(conn->is_connected()); | |
11fdf7f2 TL |
1187 | ConnectionRef server_conn = server_msgr->connect_to( |
1188 | client_msgr->get_mytype(), srv_dispatcher.last_accept); | |
7c673cae | 1189 | // don't lose state |
11fdf7f2 | 1190 | ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count()); |
7c673cae FG |
1191 | |
1192 | srv_dispatcher.got_new = false; | |
11fdf7f2 TL |
1193 | conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1194 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1195 | { |
1196 | m = new MPing(); | |
1197 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1198 | std::unique_lock l{cli_dispatcher.lock}; |
1199 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1200 | cli_dispatcher.got_new = false; |
1201 | } | |
11fdf7f2 TL |
1202 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
1203 | server_conn = server_msgr->connect_to(client_msgr->get_mytype(), | |
1204 | srv_dispatcher.last_accept); | |
7c673cae | 1205 | { |
9f95a23c TL |
1206 | std::unique_lock l{srv_dispatcher.lock}; |
1207 | srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_remote_reset; }); | |
7c673cae FG |
1208 | } |
1209 | ||
1210 | // 2. test for client reconnect | |
1211 | ASSERT_FALSE(cli_dispatcher.got_remote_reset); | |
1212 | cli_dispatcher.got_connect = false; | |
1213 | cli_dispatcher.got_new = false; | |
1214 | cli_dispatcher.got_remote_reset = false; | |
1215 | server_conn->mark_down(); | |
1216 | ASSERT_FALSE(server_conn->is_connected()); | |
1217 | // ensure client detect server socket closed | |
1218 | { | |
9f95a23c TL |
1219 | std::unique_lock l{cli_dispatcher.lock}; |
1220 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; }); | |
7c673cae FG |
1221 | cli_dispatcher.got_remote_reset = false; |
1222 | } | |
1223 | { | |
9f95a23c TL |
1224 | std::unique_lock l{cli_dispatcher.lock}; |
1225 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; }); | |
7c673cae FG |
1226 | cli_dispatcher.got_connect = false; |
1227 | } | |
1228 | CHECK_AND_WAIT_TRUE(conn->is_connected()); | |
1229 | ASSERT_TRUE(conn->is_connected()); | |
1230 | ||
1231 | { | |
1232 | m = new MPing(); | |
1233 | ASSERT_EQ(conn->send_message(m), 0); | |
1234 | ASSERT_TRUE(conn->is_connected()); | |
9f95a23c TL |
1235 | std::unique_lock l{cli_dispatcher.lock}; |
1236 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1237 | cli_dispatcher.got_new = false; |
1238 | } | |
1239 | // resetcheck happen | |
11fdf7f2 TL |
1240 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
1241 | server_conn = server_msgr->connect_to(client_msgr->get_mytype(), | |
1242 | srv_dispatcher.last_accept); | |
1243 | ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count()); | |
7c673cae FG |
1244 | cli_dispatcher.got_remote_reset = false; |
1245 | ||
1246 | server_msgr->shutdown(); | |
1247 | client_msgr->shutdown(); | |
1248 | server_msgr->wait(); | |
1249 | client_msgr->wait(); | |
1250 | } | |
1251 | ||
1252 | TEST_P(MessengerTest, StatelessTest) { | |
1253 | Message *m; | |
1254 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
1255 | entity_addr_t bind_addr; | |
9f95a23c | 1256 | bind_addr.parse("v2:127.0.0.1"); |
7c673cae FG |
1257 | Messenger::Policy p = Messenger::Policy::stateless_server(0); |
1258 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
1259 | p = Messenger::Policy::lossy_client(0); | |
1260 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
1261 | ||
1262 | server_msgr->bind(bind_addr); | |
1263 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
1264 | server_msgr->start(); | |
1265 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
1266 | client_msgr->start(); | |
1267 | ||
1268 | // 1. test for server lose state | |
11fdf7f2 TL |
1269 | ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1270 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1271 | { |
1272 | m = new MPing(); | |
1273 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1274 | std::unique_lock l{cli_dispatcher.lock}; |
1275 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1276 | cli_dispatcher.got_new = false; |
1277 | } | |
11fdf7f2 | 1278 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
7c673cae FG |
1279 | conn->mark_down(); |
1280 | ASSERT_FALSE(conn->is_connected()); | |
1281 | ||
1282 | srv_dispatcher.got_new = false; | |
9f95a23c TL |
1283 | ConnectionRef server_conn; |
1284 | srv_dispatcher.last_accept_con_ptr = &server_conn; | |
11fdf7f2 TL |
1285 | conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1286 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1287 | { |
1288 | m = new MPing(); | |
1289 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1290 | std::unique_lock l{cli_dispatcher.lock}; |
1291 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1292 | cli_dispatcher.got_new = false; |
1293 | } | |
11fdf7f2 | 1294 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
9f95a23c TL |
1295 | ASSERT_TRUE(server_conn); |
1296 | ||
7c673cae FG |
1297 | // server lose state |
1298 | { | |
9f95a23c TL |
1299 | std::unique_lock l{srv_dispatcher.lock}; |
1300 | srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; }); | |
7c673cae | 1301 | } |
11fdf7f2 | 1302 | ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count()); |
7c673cae FG |
1303 | |
1304 | // 2. test for client lossy | |
1305 | server_conn->mark_down(); | |
1306 | ASSERT_FALSE(server_conn->is_connected()); | |
1307 | conn->send_keepalive(); | |
1308 | CHECK_AND_WAIT_TRUE(!conn->is_connected()); | |
1309 | ASSERT_FALSE(conn->is_connected()); | |
11fdf7f2 TL |
1310 | conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1311 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1312 | { |
1313 | m = new MPing(); | |
1314 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1315 | std::unique_lock l{cli_dispatcher.lock}; |
1316 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1317 | cli_dispatcher.got_new = false; |
1318 | } | |
11fdf7f2 | 1319 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
7c673cae FG |
1320 | |
1321 | server_msgr->shutdown(); | |
1322 | client_msgr->shutdown(); | |
1323 | server_msgr->wait(); | |
1324 | client_msgr->wait(); | |
1325 | } | |
1326 | ||
9f95a23c TL |
1327 | TEST_P(MessengerTest, AnonTest) { |
1328 | Message *m; | |
1329 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
1330 | entity_addr_t bind_addr; | |
1331 | bind_addr.parse("v2:127.0.0.1"); | |
1332 | Messenger::Policy p = Messenger::Policy::stateless_server(0); | |
1333 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
1334 | p = Messenger::Policy::lossy_client(0); | |
1335 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
1336 | ||
1337 | server_msgr->bind(bind_addr); | |
1338 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
1339 | server_msgr->start(); | |
1340 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
1341 | client_msgr->start(); | |
1342 | ||
1343 | ConnectionRef server_con_a, server_con_b; | |
1344 | ||
1345 | // a | |
1346 | srv_dispatcher.last_accept_con_ptr = &server_con_a; | |
1347 | ConnectionRef con_a = client_msgr->connect_to(server_msgr->get_mytype(), | |
1348 | server_msgr->get_myaddrs(), | |
1349 | true); | |
1350 | { | |
1351 | m = new MPing(); | |
1352 | ASSERT_EQ(con_a->send_message(m), 0); | |
1353 | std::unique_lock l{cli_dispatcher.lock}; | |
1354 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
1355 | cli_dispatcher.got_new = false; | |
1356 | } | |
1357 | ASSERT_EQ(1U, static_cast<Session*>(con_a->get_priv().get())->get_count()); | |
1358 | ||
1359 | // b | |
1360 | srv_dispatcher.last_accept_con_ptr = &server_con_b; | |
1361 | ConnectionRef con_b = client_msgr->connect_to(server_msgr->get_mytype(), | |
1362 | server_msgr->get_myaddrs(), | |
1363 | true); | |
1364 | { | |
1365 | m = new MPing(); | |
1366 | ASSERT_EQ(con_b->send_message(m), 0); | |
1367 | std::unique_lock l{cli_dispatcher.lock}; | |
1368 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
1369 | cli_dispatcher.got_new = false; | |
1370 | } | |
1371 | ASSERT_EQ(1U, static_cast<Session*>(con_b->get_priv().get())->get_count()); | |
1372 | ||
1373 | // these should be distinct | |
1374 | ASSERT_NE(con_a, con_b); | |
1375 | ASSERT_NE(server_con_a, server_con_b); | |
1376 | ||
1377 | // and both connected | |
1378 | { | |
1379 | m = new MPing(); | |
1380 | ASSERT_EQ(con_a->send_message(m), 0); | |
1381 | std::unique_lock l{cli_dispatcher.lock}; | |
1382 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
1383 | cli_dispatcher.got_new = false; | |
1384 | } | |
1385 | { | |
1386 | m = new MPing(); | |
1387 | ASSERT_EQ(con_b->send_message(m), 0); | |
1388 | std::unique_lock l{cli_dispatcher.lock}; | |
1389 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
1390 | cli_dispatcher.got_new = false; | |
1391 | } | |
1392 | ||
1393 | // clean up | |
1394 | con_a->mark_down(); | |
1395 | ASSERT_FALSE(con_a->is_connected()); | |
1396 | con_b->mark_down(); | |
1397 | ASSERT_FALSE(con_b->is_connected()); | |
1398 | ||
1399 | server_msgr->shutdown(); | |
1400 | client_msgr->shutdown(); | |
1401 | server_msgr->wait(); | |
1402 | client_msgr->wait(); | |
1403 | } | |
1404 | ||
7c673cae FG |
1405 | TEST_P(MessengerTest, ClientStandbyTest) { |
1406 | Message *m; | |
1407 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
1408 | entity_addr_t bind_addr; | |
9f95a23c | 1409 | bind_addr.parse("v2:127.0.0.1"); |
7c673cae FG |
1410 | Messenger::Policy p = Messenger::Policy::stateful_server(0); |
1411 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
1412 | p = Messenger::Policy::lossless_peer(0); | |
1413 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
1414 | ||
1415 | server_msgr->bind(bind_addr); | |
1416 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
1417 | server_msgr->start(); | |
1418 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
1419 | client_msgr->start(); | |
1420 | ||
1421 | // 1. test for client standby, resetcheck | |
11fdf7f2 TL |
1422 | ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1423 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1424 | { |
1425 | m = new MPing(); | |
1426 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1427 | std::unique_lock l{cli_dispatcher.lock}; |
1428 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1429 | cli_dispatcher.got_new = false; |
1430 | } | |
11fdf7f2 TL |
1431 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
1432 | ConnectionRef server_conn = server_msgr->connect_to( | |
1433 | client_msgr->get_mytype(), | |
1434 | srv_dispatcher.last_accept); | |
7c673cae FG |
1435 | ASSERT_FALSE(cli_dispatcher.got_remote_reset); |
1436 | cli_dispatcher.got_connect = false; | |
1437 | server_conn->mark_down(); | |
1438 | ASSERT_FALSE(server_conn->is_connected()); | |
1439 | // client should be standby | |
1440 | usleep(300*1000); | |
1441 | // client should be standby, so we use original connection | |
1442 | { | |
1443 | // Try send message to verify got remote reset callback | |
1444 | m = new MPing(); | |
1445 | ASSERT_EQ(conn->send_message(m), 0); | |
1446 | { | |
9f95a23c TL |
1447 | std::unique_lock l{cli_dispatcher.lock}; |
1448 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; }); | |
7c673cae | 1449 | cli_dispatcher.got_remote_reset = false; |
9f95a23c | 1450 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; }); |
7c673cae FG |
1451 | cli_dispatcher.got_connect = false; |
1452 | } | |
1453 | CHECK_AND_WAIT_TRUE(conn->is_connected()); | |
1454 | ASSERT_TRUE(conn->is_connected()); | |
1455 | m = new MPing(); | |
1456 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1457 | std::unique_lock l{cli_dispatcher.lock}; |
1458 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1459 | cli_dispatcher.got_new = false; |
1460 | } | |
11fdf7f2 TL |
1461 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
1462 | server_conn = server_msgr->connect_to(client_msgr->get_mytype(), | |
1463 | srv_dispatcher.last_accept); | |
1464 | ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count()); | |
7c673cae FG |
1465 | |
1466 | server_msgr->shutdown(); | |
1467 | client_msgr->shutdown(); | |
1468 | server_msgr->wait(); | |
1469 | client_msgr->wait(); | |
1470 | } | |
1471 | ||
1472 | TEST_P(MessengerTest, AuthTest) { | |
11fdf7f2 TL |
1473 | g_ceph_context->_conf.set_val("auth_cluster_required", "cephx"); |
1474 | g_ceph_context->_conf.set_val("auth_service_required", "cephx"); | |
1475 | g_ceph_context->_conf.set_val("auth_client_required", "cephx"); | |
7c673cae FG |
1476 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); |
1477 | entity_addr_t bind_addr; | |
9f95a23c | 1478 | bind_addr.parse("v2:127.0.0.1"); |
7c673cae FG |
1479 | server_msgr->bind(bind_addr); |
1480 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
1481 | server_msgr->start(); | |
1482 | ||
1483 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
1484 | client_msgr->start(); | |
1485 | ||
1486 | // 1. simple auth round trip | |
1487 | MPing *m = new MPing(); | |
11fdf7f2 TL |
1488 | ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1489 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1490 | { |
1491 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1492 | std::unique_lock l{cli_dispatcher.lock}; |
1493 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1494 | cli_dispatcher.got_new = false; |
1495 | } | |
1496 | ASSERT_TRUE(conn->is_connected()); | |
11fdf7f2 | 1497 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
7c673cae FG |
1498 | |
1499 | // 2. mix auth | |
11fdf7f2 TL |
1500 | g_ceph_context->_conf.set_val("auth_cluster_required", "none"); |
1501 | g_ceph_context->_conf.set_val("auth_service_required", "none"); | |
1502 | g_ceph_context->_conf.set_val("auth_client_required", "none"); | |
7c673cae FG |
1503 | conn->mark_down(); |
1504 | ASSERT_FALSE(conn->is_connected()); | |
11fdf7f2 TL |
1505 | conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1506 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1507 | { |
1508 | MPing *m = new MPing(); | |
1509 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1510 | std::unique_lock l{cli_dispatcher.lock}; |
1511 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1512 | cli_dispatcher.got_new = false; |
1513 | } | |
1514 | ASSERT_TRUE(conn->is_connected()); | |
11fdf7f2 | 1515 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
7c673cae FG |
1516 | server_msgr->shutdown(); |
1517 | client_msgr->shutdown(); | |
1518 | server_msgr->wait(); | |
1519 | client_msgr->wait(); | |
1520 | } | |
1521 | ||
1522 | TEST_P(MessengerTest, MessageTest) { | |
1523 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
1524 | entity_addr_t bind_addr; | |
9f95a23c | 1525 | bind_addr.parse("v2:127.0.0.1"); |
7c673cae FG |
1526 | Messenger::Policy p = Messenger::Policy::stateful_server(0); |
1527 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
1528 | p = Messenger::Policy::lossless_peer(0); | |
1529 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
1530 | ||
1531 | server_msgr->bind(bind_addr); | |
1532 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
1533 | server_msgr->start(); | |
1534 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
1535 | client_msgr->start(); | |
1536 | ||
1537 | ||
1538 | // 1. A very large "front"(as well as "payload") | |
1539 | // Because a external message need to invade Messenger::decode_message, | |
1540 | // here we only use existing message class(MCommand) | |
11fdf7f2 TL |
1541 | ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1542 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1543 | { |
1544 | uuid_d uuid; | |
1545 | uuid.generate_random(); | |
1546 | vector<string> cmds; | |
1547 | string s("abcdefghijklmnopqrstuvwxyz"); | |
1548 | for (int i = 0; i < 1024*30; i++) | |
1549 | cmds.push_back(s); | |
1550 | MCommand *m = new MCommand(uuid); | |
1551 | m->cmd = cmds; | |
1552 | conn->send_message(m); | |
9f95a23c TL |
1553 | std::unique_lock l{cli_dispatcher.lock}; |
1554 | cli_dispatcher.cond.wait_for(l, 500s, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1555 | ASSERT_TRUE(cli_dispatcher.got_new); |
1556 | cli_dispatcher.got_new = false; | |
1557 | } | |
1558 | ||
1559 | // 2. A very large "data" | |
1560 | { | |
1561 | bufferlist bl; | |
1562 | string s("abcdefghijklmnopqrstuvwxyz"); | |
1563 | for (int i = 0; i < 1024*30; i++) | |
1564 | bl.append(s); | |
1565 | MPing *m = new MPing(); | |
1566 | m->set_data(bl); | |
1567 | conn->send_message(m); | |
1568 | utime_t t; | |
1569 | t += 1000*1000*500; | |
9f95a23c TL |
1570 | std::unique_lock l{cli_dispatcher.lock}; |
1571 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1572 | ASSERT_TRUE(cli_dispatcher.got_new); |
1573 | cli_dispatcher.got_new = false; | |
1574 | } | |
1575 | server_msgr->shutdown(); | |
1576 | client_msgr->shutdown(); | |
1577 | server_msgr->wait(); | |
1578 | client_msgr->wait(); | |
1579 | } | |
1580 | ||
1581 | ||
1582 | class SyntheticWorkload; | |
1583 | ||
1584 | struct Payload { | |
1585 | enum Who : uint8_t { | |
1586 | PING = 0, | |
1587 | PONG = 1, | |
1588 | }; | |
11fdf7f2 TL |
1589 | uint8_t who = 0; |
1590 | uint64_t seq = 0; | |
7c673cae FG |
1591 | bufferlist data; |
1592 | ||
1593 | Payload(Who who, uint64_t seq, const bufferlist& data) | |
1594 | : who(who), seq(seq), data(data) | |
1595 | {} | |
1596 | Payload() = default; | |
1597 | DENC(Payload, v, p) { | |
1598 | DENC_START(1, 1, p); | |
1599 | denc(v.who, p); | |
1600 | denc(v.seq, p); | |
1601 | denc(v.data, p); | |
1602 | DENC_FINISH(p); | |
1603 | } | |
1604 | }; | |
1605 | WRITE_CLASS_DENC(Payload) | |
1606 | ||
1607 | ostream& operator<<(ostream& out, const Payload &pl) | |
1608 | { | |
1609 | return out << "reply=" << pl.who << " i = " << pl.seq; | |
1610 | } | |
1611 | ||
1612 | class SyntheticDispatcher : public Dispatcher { | |
1613 | public: | |
9f95a23c TL |
1614 | ceph::mutex lock = ceph::make_mutex("SyntheticDispatcher::lock"); |
1615 | ceph::condition_variable cond; | |
7c673cae FG |
1616 | bool is_server; |
1617 | bool got_new; | |
1618 | bool got_remote_reset; | |
1619 | bool got_connect; | |
1620 | map<ConnectionRef, list<uint64_t> > conn_sent; | |
1621 | map<uint64_t, bufferlist> sent; | |
1622 | atomic<uint64_t> index; | |
1623 | SyntheticWorkload *workload; | |
1624 | ||
1625 | SyntheticDispatcher(bool s, SyntheticWorkload *wl): | |
9f95a23c | 1626 | Dispatcher(g_ceph_context), is_server(s), got_new(false), |
11fdf7f2 | 1627 | got_remote_reset(false), got_connect(false), index(0), workload(wl) { |
11fdf7f2 | 1628 | } |
7c673cae FG |
1629 | bool ms_can_fast_dispatch_any() const override { return true; } |
1630 | bool ms_can_fast_dispatch(const Message *m) const override { | |
1631 | switch (m->get_type()) { | |
1632 | case CEPH_MSG_PING: | |
1633 | case MSG_COMMAND: | |
1634 | return true; | |
1635 | default: | |
1636 | return false; | |
1637 | } | |
1638 | } | |
1639 | ||
1640 | void ms_handle_fast_connect(Connection *con) override { | |
9f95a23c | 1641 | std::lock_guard l{lock}; |
7c673cae FG |
1642 | list<uint64_t> c = conn_sent[con]; |
1643 | for (list<uint64_t>::iterator it = c.begin(); | |
1644 | it != c.end(); ++it) | |
1645 | sent.erase(*it); | |
1646 | conn_sent.erase(con); | |
1647 | got_connect = true; | |
9f95a23c | 1648 | cond.notify_all(); |
7c673cae FG |
1649 | } |
1650 | void ms_handle_fast_accept(Connection *con) override { | |
9f95a23c | 1651 | std::lock_guard l{lock}; |
7c673cae FG |
1652 | list<uint64_t> c = conn_sent[con]; |
1653 | for (list<uint64_t>::iterator it = c.begin(); | |
1654 | it != c.end(); ++it) | |
1655 | sent.erase(*it); | |
1656 | conn_sent.erase(con); | |
9f95a23c | 1657 | cond.notify_all(); |
7c673cae FG |
1658 | } |
1659 | bool ms_dispatch(Message *m) override { | |
1660 | ceph_abort(); | |
1661 | } | |
1662 | bool ms_handle_reset(Connection *con) override; | |
1663 | void ms_handle_remote_reset(Connection *con) override { | |
9f95a23c | 1664 | std::lock_guard l{lock}; |
7c673cae FG |
1665 | list<uint64_t> c = conn_sent[con]; |
1666 | for (list<uint64_t>::iterator it = c.begin(); | |
1667 | it != c.end(); ++it) | |
1668 | sent.erase(*it); | |
1669 | conn_sent.erase(con); | |
1670 | got_remote_reset = true; | |
1671 | } | |
1672 | bool ms_handle_refused(Connection *con) override { | |
1673 | return false; | |
1674 | } | |
1675 | void ms_fast_dispatch(Message *m) override { | |
1676 | // MSG_COMMAND is used to disorganize regular message flow | |
1677 | if (m->get_type() == MSG_COMMAND) { | |
1678 | m->put(); | |
1679 | return ; | |
1680 | } | |
1681 | ||
1682 | Payload pl; | |
11fdf7f2 TL |
1683 | auto p = m->get_data().cbegin(); |
1684 | decode(pl, p); | |
7c673cae FG |
1685 | if (pl.who == Payload::PING) { |
1686 | lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl; | |
1687 | reply_message(m, pl); | |
1688 | m->put(); | |
9f95a23c | 1689 | std::lock_guard l{lock}; |
7c673cae | 1690 | got_new = true; |
9f95a23c | 1691 | cond.notify_all(); |
7c673cae | 1692 | } else { |
9f95a23c | 1693 | std::lock_guard l{lock}; |
7c673cae FG |
1694 | if (sent.count(pl.seq)) { |
1695 | lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl; | |
1696 | ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq); | |
1697 | ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq])); | |
1698 | conn_sent[m->get_connection()].pop_front(); | |
1699 | sent.erase(pl.seq); | |
1700 | } | |
1701 | m->put(); | |
1702 | got_new = true; | |
9f95a23c | 1703 | cond.notify_all(); |
7c673cae FG |
1704 | } |
1705 | } | |
1706 | ||
11fdf7f2 TL |
1707 | int ms_handle_authentication(Connection *con) override { |
1708 | return 1; | |
7c673cae FG |
1709 | } |
1710 | ||
1711 | void reply_message(const Message *m, Payload& pl) { | |
1712 | pl.who = Payload::PONG; | |
1713 | bufferlist bl; | |
11fdf7f2 | 1714 | encode(pl, bl); |
7c673cae FG |
1715 | MPing *rm = new MPing(); |
1716 | rm->set_data(bl); | |
1717 | m->get_connection()->send_message(rm); | |
1718 | lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl; | |
1719 | } | |
1720 | ||
1721 | void send_message_wrap(ConnectionRef con, const bufferlist& data) { | |
1722 | Message *m = new MPing(); | |
1723 | Payload pl{Payload::PING, index++, data}; | |
1724 | bufferlist bl; | |
11fdf7f2 | 1725 | encode(pl, bl); |
7c673cae FG |
1726 | m->set_data(bl); |
1727 | if (!con->get_messenger()->get_default_policy().lossy) { | |
9f95a23c | 1728 | std::lock_guard l{lock}; |
7c673cae FG |
1729 | sent[pl.seq] = pl.data; |
1730 | conn_sent[con].push_back(pl.seq); | |
1731 | } | |
1732 | lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl; | |
1733 | ASSERT_EQ(0, con->send_message(m)); | |
1734 | } | |
1735 | ||
1736 | uint64_t get_pending() { | |
9f95a23c | 1737 | std::lock_guard l{lock}; |
7c673cae FG |
1738 | return sent.size(); |
1739 | } | |
1740 | ||
1741 | void clear_pending(ConnectionRef con) { | |
9f95a23c | 1742 | std::lock_guard l{lock}; |
7c673cae FG |
1743 | |
1744 | for (list<uint64_t>::iterator it = conn_sent[con].begin(); | |
1745 | it != conn_sent[con].end(); ++it) | |
1746 | sent.erase(*it); | |
1747 | conn_sent.erase(con); | |
1748 | } | |
1749 | ||
1750 | void print() { | |
1751 | for (auto && p : conn_sent) { | |
1752 | if (!p.second.empty()) { | |
1753 | lderr(g_ceph_context) << __func__ << " " << p.first << " wait " << p.second.size() << dendl; | |
1754 | } | |
1755 | } | |
1756 | } | |
1757 | }; | |
1758 | ||
1759 | ||
1760 | class SyntheticWorkload { | |
9f95a23c TL |
1761 | ceph::mutex lock = ceph::make_mutex("SyntheticWorkload::lock"); |
1762 | ceph::condition_variable cond; | |
7c673cae FG |
1763 | set<Messenger*> available_servers; |
1764 | set<Messenger*> available_clients; | |
11fdf7f2 | 1765 | Messenger::Policy client_policy; |
7c673cae FG |
1766 | map<ConnectionRef, pair<Messenger*, Messenger*> > available_connections; |
1767 | SyntheticDispatcher dispatcher; | |
1768 | gen_type rng; | |
1769 | vector<bufferlist> rand_data; | |
11fdf7f2 | 1770 | DummyAuthClientServer dummy_auth; |
7c673cae FG |
1771 | |
1772 | public: | |
1773 | static const unsigned max_in_flight = 64; | |
1774 | static const unsigned max_connections = 128; | |
1775 | static const unsigned max_message_len = 1024 * 1024 * 4; | |
1776 | ||
1777 | SyntheticWorkload(int servers, int clients, string type, int random_num, | |
11fdf7f2 | 1778 | Messenger::Policy srv_policy, Messenger::Policy cli_policy) |
9f95a23c | 1779 | : client_policy(cli_policy), |
11fdf7f2 TL |
1780 | dispatcher(false, this), |
1781 | rng(time(NULL)), | |
1782 | dummy_auth(g_ceph_context) { | |
1783 | dummy_auth.auth_registry.refresh_config(); | |
7c673cae FG |
1784 | Messenger *msgr; |
1785 | int base_port = 16800; | |
1786 | entity_addr_t bind_addr; | |
1787 | char addr[64]; | |
1788 | for (int i = 0; i < servers; ++i) { | |
1789 | msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), | |
f67539c2 | 1790 | "server", getpid()+i); |
9f95a23c | 1791 | snprintf(addr, sizeof(addr), "v2:127.0.0.1:%d", |
11fdf7f2 | 1792 | base_port+i); |
7c673cae FG |
1793 | bind_addr.parse(addr); |
1794 | msgr->bind(bind_addr); | |
1795 | msgr->add_dispatcher_head(&dispatcher); | |
11fdf7f2 TL |
1796 | msgr->set_auth_client(&dummy_auth); |
1797 | msgr->set_auth_server(&dummy_auth); | |
7c673cae | 1798 | |
11fdf7f2 | 1799 | ceph_assert(msgr); |
7c673cae FG |
1800 | msgr->set_default_policy(srv_policy); |
1801 | available_servers.insert(msgr); | |
1802 | msgr->start(); | |
1803 | } | |
1804 | ||
1805 | for (int i = 0; i < clients; ++i) { | |
1806 | msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1), | |
f67539c2 | 1807 | "client", getpid()+i+servers); |
7c673cae | 1808 | if (cli_policy.standby) { |
9f95a23c | 1809 | snprintf(addr, sizeof(addr), "v2:127.0.0.1:%d", |
11fdf7f2 | 1810 | base_port+i+servers); |
7c673cae FG |
1811 | bind_addr.parse(addr); |
1812 | msgr->bind(bind_addr); | |
1813 | } | |
1814 | msgr->add_dispatcher_head(&dispatcher); | |
11fdf7f2 TL |
1815 | msgr->set_auth_client(&dummy_auth); |
1816 | msgr->set_auth_server(&dummy_auth); | |
7c673cae | 1817 | |
11fdf7f2 | 1818 | ceph_assert(msgr); |
7c673cae FG |
1819 | msgr->set_default_policy(cli_policy); |
1820 | available_clients.insert(msgr); | |
1821 | msgr->start(); | |
1822 | } | |
1823 | ||
1824 | for (int i = 0; i < random_num; i++) { | |
1825 | bufferlist bl; | |
1826 | boost::uniform_int<> u(32, max_message_len); | |
1827 | uint64_t value_len = u(rng); | |
1828 | bufferptr bp(value_len); | |
1829 | bp.zero(); | |
1830 | for (uint64_t j = 0; j < value_len-sizeof(i); ) { | |
1831 | memcpy(bp.c_str()+j, &i, sizeof(i)); | |
1832 | j += 4096; | |
1833 | } | |
1834 | ||
1835 | bl.append(bp); | |
1836 | rand_data.push_back(bl); | |
1837 | } | |
1838 | } | |
1839 | ||
1840 | ConnectionRef _get_random_connection() { | |
1841 | while (dispatcher.get_pending() > max_in_flight) { | |
9f95a23c | 1842 | lock.unlock(); |
7c673cae | 1843 | usleep(500); |
9f95a23c | 1844 | lock.lock(); |
7c673cae | 1845 | } |
9f95a23c | 1846 | ceph_assert(ceph_mutex_is_locked(lock)); |
7c673cae FG |
1847 | boost::uniform_int<> choose(0, available_connections.size() - 1); |
1848 | int index = choose(rng); | |
1849 | map<ConnectionRef, pair<Messenger*, Messenger*> >::iterator i = available_connections.begin(); | |
1850 | for (; index > 0; --index, ++i) ; | |
1851 | return i->first; | |
1852 | } | |
1853 | ||
1854 | bool can_create_connection() { | |
1855 | return available_connections.size() < max_connections; | |
1856 | } | |
1857 | ||
1858 | void generate_connection() { | |
9f95a23c | 1859 | std::lock_guard l{lock}; |
7c673cae FG |
1860 | if (!can_create_connection()) |
1861 | return ; | |
1862 | ||
1863 | Messenger *server, *client; | |
1864 | { | |
1865 | boost::uniform_int<> choose(0, available_servers.size() - 1); | |
1866 | int index = choose(rng); | |
1867 | set<Messenger*>::iterator i = available_servers.begin(); | |
1868 | for (; index > 0; --index, ++i) ; | |
1869 | server = *i; | |
1870 | } | |
1871 | { | |
1872 | boost::uniform_int<> choose(0, available_clients.size() - 1); | |
1873 | int index = choose(rng); | |
1874 | set<Messenger*>::iterator i = available_clients.begin(); | |
1875 | for (; index > 0; --index, ++i) ; | |
1876 | client = *i; | |
1877 | } | |
1878 | ||
1879 | pair<Messenger*, Messenger*> p; | |
1880 | { | |
1881 | boost::uniform_int<> choose(0, available_servers.size() - 1); | |
1882 | if (server->get_default_policy().server) { | |
1883 | p = make_pair(client, server); | |
11fdf7f2 TL |
1884 | ConnectionRef conn = client->connect_to(server->get_mytype(), |
1885 | server->get_myaddrs()); | |
1886 | available_connections[conn] = p; | |
7c673cae | 1887 | } else { |
11fdf7f2 TL |
1888 | ConnectionRef conn = client->connect_to(server->get_mytype(), |
1889 | server->get_myaddrs()); | |
1890 | p = make_pair(client, server); | |
1891 | available_connections[conn] = p; | |
7c673cae FG |
1892 | } |
1893 | } | |
7c673cae FG |
1894 | } |
1895 | ||
1896 | void send_message() { | |
9f95a23c | 1897 | std::lock_guard l{lock}; |
7c673cae FG |
1898 | ConnectionRef conn = _get_random_connection(); |
1899 | boost::uniform_int<> true_false(0, 99); | |
1900 | int val = true_false(rng); | |
1901 | if (val >= 95) { | |
1902 | uuid_d uuid; | |
1903 | uuid.generate_random(); | |
1904 | MCommand *m = new MCommand(uuid); | |
1905 | vector<string> cmds; | |
1906 | cmds.push_back("command"); | |
1907 | m->cmd = cmds; | |
1908 | m->set_priority(200); | |
1909 | conn->send_message(m); | |
1910 | } else { | |
1911 | boost::uniform_int<> u(0, rand_data.size()-1); | |
1912 | dispatcher.send_message_wrap(conn, rand_data[u(rng)]); | |
1913 | } | |
1914 | } | |
1915 | ||
1916 | void drop_connection() { | |
9f95a23c | 1917 | std::lock_guard l{lock}; |
7c673cae FG |
1918 | if (available_connections.size() < 10) |
1919 | return; | |
1920 | ConnectionRef conn = _get_random_connection(); | |
1921 | dispatcher.clear_pending(conn); | |
1922 | conn->mark_down(); | |
11fdf7f2 TL |
1923 | if (!client_policy.server && |
1924 | !client_policy.lossy && | |
1925 | client_policy.standby) { | |
1926 | // it's a lossless policy, so we need to mark down each side | |
1927 | pair<Messenger*, Messenger*> &p = available_connections[conn]; | |
1928 | if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) { | |
1929 | ASSERT_EQ(conn->get_messenger(), p.first); | |
1930 | ConnectionRef peer = p.second->connect_to(p.first->get_mytype(), | |
1931 | p.first->get_myaddrs()); | |
1932 | peer->mark_down(); | |
1933 | dispatcher.clear_pending(peer); | |
1934 | available_connections.erase(peer); | |
1935 | } | |
7c673cae FG |
1936 | } |
1937 | ASSERT_EQ(available_connections.erase(conn), 1U); | |
1938 | } | |
1939 | ||
1940 | void print_internal_state(bool detail=false) { | |
9f95a23c | 1941 | std::lock_guard l{lock}; |
7c673cae FG |
1942 | lderr(g_ceph_context) << "available_connections: " << available_connections.size() |
1943 | << " inflight messages: " << dispatcher.get_pending() << dendl; | |
1944 | if (detail && !available_connections.empty()) { | |
1945 | dispatcher.print(); | |
1946 | } | |
1947 | } | |
1948 | ||
1949 | void wait_for_done() { | |
1950 | int64_t tick_us = 1000 * 100; // 100ms | |
1951 | int64_t timeout_us = 5 * 60 * 1000 * 1000; // 5 mins | |
1952 | int i = 0; | |
1953 | while (dispatcher.get_pending()) { | |
1954 | usleep(tick_us); | |
1955 | timeout_us -= tick_us; | |
1956 | if (i++ % 50 == 0) | |
1957 | print_internal_state(true); | |
1958 | if (timeout_us < 0) | |
11fdf7f2 | 1959 | ceph_abort_msg(" loop time exceed 5 mins, it looks we stuck into some problems!"); |
7c673cae FG |
1960 | } |
1961 | for (set<Messenger*>::iterator it = available_servers.begin(); | |
1962 | it != available_servers.end(); ++it) { | |
1963 | (*it)->shutdown(); | |
1964 | (*it)->wait(); | |
1965 | ASSERT_EQ((*it)->get_dispatch_queue_len(), 0); | |
1966 | delete (*it); | |
1967 | } | |
1968 | available_servers.clear(); | |
1969 | ||
1970 | for (set<Messenger*>::iterator it = available_clients.begin(); | |
1971 | it != available_clients.end(); ++it) { | |
1972 | (*it)->shutdown(); | |
1973 | (*it)->wait(); | |
1974 | ASSERT_EQ((*it)->get_dispatch_queue_len(), 0); | |
1975 | delete (*it); | |
1976 | } | |
1977 | available_clients.clear(); | |
1978 | } | |
1979 | ||
1980 | void handle_reset(Connection *con) { | |
9f95a23c | 1981 | std::lock_guard l{lock}; |
7c673cae FG |
1982 | available_connections.erase(con); |
1983 | dispatcher.clear_pending(con); | |
1984 | } | |
1985 | }; | |
1986 | ||
1987 | bool SyntheticDispatcher::ms_handle_reset(Connection *con) { | |
1988 | workload->handle_reset(con); | |
1989 | return true; | |
1990 | } | |
1991 | ||
1992 | TEST_P(MessengerTest, SyntheticStressTest) { | |
1993 | SyntheticWorkload test_msg(8, 32, GetParam(), 100, | |
1994 | Messenger::Policy::stateful_server(0), | |
1995 | Messenger::Policy::lossless_client(0)); | |
1996 | for (int i = 0; i < 100; ++i) { | |
1997 | if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; | |
1998 | test_msg.generate_connection(); | |
1999 | } | |
2000 | gen_type rng(time(NULL)); | |
2001 | for (int i = 0; i < 5000; ++i) { | |
2002 | if (!(i % 10)) { | |
2003 | lderr(g_ceph_context) << "Op " << i << ": " << dendl; | |
2004 | test_msg.print_internal_state(); | |
2005 | } | |
2006 | boost::uniform_int<> true_false(0, 99); | |
2007 | int val = true_false(rng); | |
2008 | if (val > 90) { | |
2009 | test_msg.generate_connection(); | |
2010 | } else if (val > 80) { | |
2011 | test_msg.drop_connection(); | |
2012 | } else if (val > 10) { | |
2013 | test_msg.send_message(); | |
2014 | } else { | |
2015 | usleep(rand() % 1000 + 500); | |
2016 | } | |
2017 | } | |
2018 | test_msg.wait_for_done(); | |
2019 | } | |
2020 | ||
2021 | TEST_P(MessengerTest, SyntheticStressTest1) { | |
2022 | SyntheticWorkload test_msg(16, 32, GetParam(), 100, | |
2023 | Messenger::Policy::lossless_peer_reuse(0), | |
2024 | Messenger::Policy::lossless_peer_reuse(0)); | |
2025 | for (int i = 0; i < 10; ++i) { | |
2026 | if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; | |
2027 | test_msg.generate_connection(); | |
2028 | } | |
2029 | gen_type rng(time(NULL)); | |
2030 | for (int i = 0; i < 10000; ++i) { | |
2031 | if (!(i % 10)) { | |
2032 | lderr(g_ceph_context) << "Op " << i << ": " << dendl; | |
2033 | test_msg.print_internal_state(); | |
2034 | } | |
2035 | boost::uniform_int<> true_false(0, 99); | |
2036 | int val = true_false(rng); | |
2037 | if (val > 80) { | |
2038 | test_msg.generate_connection(); | |
2039 | } else if (val > 60) { | |
2040 | test_msg.drop_connection(); | |
2041 | } else if (val > 10) { | |
2042 | test_msg.send_message(); | |
2043 | } else { | |
2044 | usleep(rand() % 1000 + 500); | |
2045 | } | |
2046 | } | |
2047 | test_msg.wait_for_done(); | |
2048 | } | |
2049 | ||
2050 | ||
2051 | TEST_P(MessengerTest, SyntheticInjectTest) { | |
2052 | uint64_t dispatch_throttle_bytes = g_ceph_context->_conf->ms_dispatch_throttle_bytes; | |
11fdf7f2 TL |
2053 | g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30"); |
2054 | g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1"); | |
2055 | g_ceph_context->_conf.set_val("ms_dispatch_throttle_bytes", "16777216"); | |
7c673cae FG |
2056 | SyntheticWorkload test_msg(8, 32, GetParam(), 100, |
2057 | Messenger::Policy::stateful_server(0), | |
2058 | Messenger::Policy::lossless_client(0)); | |
2059 | for (int i = 0; i < 100; ++i) { | |
2060 | if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; | |
2061 | test_msg.generate_connection(); | |
2062 | } | |
2063 | gen_type rng(time(NULL)); | |
2064 | for (int i = 0; i < 1000; ++i) { | |
2065 | if (!(i % 10)) { | |
2066 | lderr(g_ceph_context) << "Op " << i << ": " << dendl; | |
2067 | test_msg.print_internal_state(); | |
2068 | } | |
2069 | boost::uniform_int<> true_false(0, 99); | |
2070 | int val = true_false(rng); | |
2071 | if (val > 90) { | |
2072 | test_msg.generate_connection(); | |
2073 | } else if (val > 80) { | |
2074 | test_msg.drop_connection(); | |
2075 | } else if (val > 10) { | |
2076 | test_msg.send_message(); | |
2077 | } else { | |
2078 | usleep(rand() % 500 + 100); | |
2079 | } | |
2080 | } | |
2081 | test_msg.wait_for_done(); | |
11fdf7f2 TL |
2082 | g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0"); |
2083 | g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0"); | |
2084 | g_ceph_context->_conf.set_val( | |
7c673cae FG |
2085 | "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes)); |
2086 | } | |
2087 | ||
2088 | TEST_P(MessengerTest, SyntheticInjectTest2) { | |
11fdf7f2 TL |
2089 | g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30"); |
2090 | g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1"); | |
7c673cae FG |
2091 | SyntheticWorkload test_msg(8, 16, GetParam(), 100, |
2092 | Messenger::Policy::lossless_peer_reuse(0), | |
2093 | Messenger::Policy::lossless_peer_reuse(0)); | |
2094 | for (int i = 0; i < 100; ++i) { | |
2095 | if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; | |
2096 | test_msg.generate_connection(); | |
2097 | } | |
2098 | gen_type rng(time(NULL)); | |
2099 | for (int i = 0; i < 1000; ++i) { | |
2100 | if (!(i % 10)) { | |
2101 | lderr(g_ceph_context) << "Op " << i << ": " << dendl; | |
2102 | test_msg.print_internal_state(); | |
2103 | } | |
2104 | boost::uniform_int<> true_false(0, 99); | |
2105 | int val = true_false(rng); | |
2106 | if (val > 90) { | |
2107 | test_msg.generate_connection(); | |
2108 | } else if (val > 80) { | |
2109 | test_msg.drop_connection(); | |
2110 | } else if (val > 10) { | |
2111 | test_msg.send_message(); | |
2112 | } else { | |
2113 | usleep(rand() % 500 + 100); | |
2114 | } | |
2115 | } | |
2116 | test_msg.wait_for_done(); | |
11fdf7f2 TL |
2117 | g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0"); |
2118 | g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0"); | |
7c673cae FG |
2119 | } |
2120 | ||
2121 | TEST_P(MessengerTest, SyntheticInjectTest3) { | |
11fdf7f2 TL |
2122 | g_ceph_context->_conf.set_val("ms_inject_socket_failures", "600"); |
2123 | g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1"); | |
7c673cae FG |
2124 | SyntheticWorkload test_msg(8, 16, GetParam(), 100, |
2125 | Messenger::Policy::stateless_server(0), | |
2126 | Messenger::Policy::lossy_client(0)); | |
2127 | for (int i = 0; i < 100; ++i) { | |
2128 | if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; | |
2129 | test_msg.generate_connection(); | |
2130 | } | |
2131 | gen_type rng(time(NULL)); | |
2132 | for (int i = 0; i < 1000; ++i) { | |
2133 | if (!(i % 10)) { | |
2134 | lderr(g_ceph_context) << "Op " << i << ": " << dendl; | |
2135 | test_msg.print_internal_state(); | |
2136 | } | |
2137 | boost::uniform_int<> true_false(0, 99); | |
2138 | int val = true_false(rng); | |
2139 | if (val > 90) { | |
2140 | test_msg.generate_connection(); | |
2141 | } else if (val > 80) { | |
2142 | test_msg.drop_connection(); | |
2143 | } else if (val > 10) { | |
2144 | test_msg.send_message(); | |
2145 | } else { | |
2146 | usleep(rand() % 500 + 100); | |
2147 | } | |
2148 | } | |
2149 | test_msg.wait_for_done(); | |
11fdf7f2 TL |
2150 | g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0"); |
2151 | g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0"); | |
7c673cae FG |
2152 | } |
2153 | ||
2154 | ||
2155 | TEST_P(MessengerTest, SyntheticInjectTest4) { | |
11fdf7f2 TL |
2156 | g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30"); |
2157 | g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1"); | |
2158 | g_ceph_context->_conf.set_val("ms_inject_delay_probability", "1"); | |
2159 | g_ceph_context->_conf.set_val("ms_inject_delay_type", "client osd"); | |
2160 | g_ceph_context->_conf.set_val("ms_inject_delay_max", "5"); | |
7c673cae FG |
2161 | SyntheticWorkload test_msg(16, 32, GetParam(), 100, |
2162 | Messenger::Policy::lossless_peer(0), | |
2163 | Messenger::Policy::lossless_peer(0)); | |
2164 | for (int i = 0; i < 100; ++i) { | |
2165 | if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; | |
2166 | test_msg.generate_connection(); | |
2167 | } | |
2168 | gen_type rng(time(NULL)); | |
2169 | for (int i = 0; i < 1000; ++i) { | |
2170 | if (!(i % 10)) { | |
2171 | lderr(g_ceph_context) << "Op " << i << ": " << dendl; | |
2172 | test_msg.print_internal_state(); | |
2173 | } | |
2174 | boost::uniform_int<> true_false(0, 99); | |
2175 | int val = true_false(rng); | |
2176 | if (val > 95) { | |
2177 | test_msg.generate_connection(); | |
2178 | } else if (val > 80) { | |
2179 | // test_msg.drop_connection(); | |
2180 | } else if (val > 10) { | |
2181 | test_msg.send_message(); | |
2182 | } else { | |
2183 | usleep(rand() % 500 + 100); | |
2184 | } | |
2185 | } | |
2186 | test_msg.wait_for_done(); | |
11fdf7f2 TL |
2187 | g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0"); |
2188 | g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0"); | |
2189 | g_ceph_context->_conf.set_val("ms_inject_delay_probability", "0"); | |
2190 | g_ceph_context->_conf.set_val("ms_inject_delay_type", ""); | |
2191 | g_ceph_context->_conf.set_val("ms_inject_delay_max", "0"); | |
7c673cae FG |
2192 | } |
2193 | ||
2194 | ||
2195 | class MarkdownDispatcher : public Dispatcher { | |
9f95a23c | 2196 | ceph::mutex lock = ceph::make_mutex("MarkdownDispatcher::lock"); |
7c673cae FG |
2197 | set<ConnectionRef> conns; |
2198 | bool last_mark; | |
2199 | public: | |
31f18b77 | 2200 | std::atomic<uint64_t> count = { 0 }; |
9f95a23c | 2201 | explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context), |
11fdf7f2 | 2202 | last_mark(false) { |
11fdf7f2 | 2203 | } |
7c673cae FG |
2204 | bool ms_can_fast_dispatch_any() const override { return false; } |
2205 | bool ms_can_fast_dispatch(const Message *m) const override { | |
2206 | switch (m->get_type()) { | |
2207 | case CEPH_MSG_PING: | |
2208 | return true; | |
2209 | default: | |
2210 | return false; | |
2211 | } | |
2212 | } | |
2213 | ||
2214 | void ms_handle_fast_connect(Connection *con) override { | |
2215 | lderr(g_ceph_context) << __func__ << " " << con << dendl; | |
9f95a23c | 2216 | std::lock_guard l{lock}; |
7c673cae FG |
2217 | conns.insert(con); |
2218 | } | |
2219 | void ms_handle_fast_accept(Connection *con) override { | |
9f95a23c | 2220 | std::lock_guard l{lock}; |
7c673cae FG |
2221 | conns.insert(con); |
2222 | } | |
2223 | bool ms_dispatch(Message *m) override { | |
2224 | lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << dendl; | |
9f95a23c | 2225 | std::lock_guard l{lock}; |
31f18b77 | 2226 | count++; |
7c673cae FG |
2227 | conns.insert(m->get_connection()); |
2228 | if (conns.size() < 2 && !last_mark) { | |
2229 | m->put(); | |
2230 | return true; | |
2231 | } | |
2232 | ||
2233 | last_mark = true; | |
2234 | usleep(rand() % 500); | |
2235 | for (set<ConnectionRef>::iterator it = conns.begin(); it != conns.end(); ++it) { | |
2236 | if ((*it) != m->get_connection().get()) { | |
2237 | (*it)->mark_down(); | |
2238 | conns.erase(it); | |
2239 | break; | |
2240 | } | |
2241 | } | |
2242 | if (conns.empty()) | |
2243 | last_mark = false; | |
2244 | m->put(); | |
2245 | return true; | |
2246 | } | |
2247 | bool ms_handle_reset(Connection *con) override { | |
2248 | lderr(g_ceph_context) << __func__ << " " << con << dendl; | |
9f95a23c | 2249 | std::lock_guard l{lock}; |
7c673cae FG |
2250 | conns.erase(con); |
2251 | usleep(rand() % 500); | |
2252 | return true; | |
2253 | } | |
2254 | void ms_handle_remote_reset(Connection *con) override { | |
9f95a23c | 2255 | std::lock_guard l{lock}; |
7c673cae FG |
2256 | conns.erase(con); |
2257 | lderr(g_ceph_context) << __func__ << " " << con << dendl; | |
2258 | } | |
2259 | bool ms_handle_refused(Connection *con) override { | |
2260 | return false; | |
2261 | } | |
2262 | void ms_fast_dispatch(Message *m) override { | |
2263 | ceph_abort(); | |
2264 | } | |
11fdf7f2 TL |
2265 | int ms_handle_authentication(Connection *con) override { |
2266 | return 1; | |
7c673cae FG |
2267 | } |
2268 | }; | |
2269 | ||
2270 | ||
2271 | // Markdown with external lock | |
2272 | TEST_P(MessengerTest, MarkdownTest) { | |
f67539c2 | 2273 | Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid()); |
7c673cae | 2274 | MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true); |
11fdf7f2 TL |
2275 | DummyAuthClientServer dummy_auth(g_ceph_context); |
2276 | dummy_auth.auth_registry.refresh_config(); | |
7c673cae | 2277 | entity_addr_t bind_addr; |
9f95a23c | 2278 | bind_addr.parse("v2:127.0.0.1:16800"); |
7c673cae FG |
2279 | server_msgr->bind(bind_addr); |
2280 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
11fdf7f2 TL |
2281 | server_msgr->set_auth_client(&dummy_auth); |
2282 | server_msgr->set_auth_server(&dummy_auth); | |
7c673cae | 2283 | server_msgr->start(); |
9f95a23c | 2284 | bind_addr.parse("v2:127.0.0.1:16801"); |
7c673cae FG |
2285 | server_msgr2->bind(bind_addr); |
2286 | server_msgr2->add_dispatcher_head(&srv_dispatcher); | |
11fdf7f2 TL |
2287 | server_msgr2->set_auth_client(&dummy_auth); |
2288 | server_msgr2->set_auth_server(&dummy_auth); | |
7c673cae FG |
2289 | server_msgr2->start(); |
2290 | ||
2291 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
11fdf7f2 TL |
2292 | client_msgr->set_auth_client(&dummy_auth); |
2293 | client_msgr->set_auth_server(&dummy_auth); | |
7c673cae FG |
2294 | client_msgr->start(); |
2295 | ||
2296 | int i = 1000; | |
2297 | uint64_t last = 0; | |
2298 | bool equal = false; | |
2299 | uint64_t equal_count = 0; | |
2300 | while (i--) { | |
11fdf7f2 TL |
2301 | ConnectionRef conn1 = client_msgr->connect_to(server_msgr->get_mytype(), |
2302 | server_msgr->get_myaddrs()); | |
2303 | ConnectionRef conn2 = client_msgr->connect_to(server_msgr2->get_mytype(), | |
2304 | server_msgr2->get_myaddrs()); | |
7c673cae FG |
2305 | MPing *m = new MPing(); |
2306 | ASSERT_EQ(conn1->send_message(m), 0); | |
2307 | m = new MPing(); | |
2308 | ASSERT_EQ(conn2->send_message(m), 0); | |
31f18b77 FG |
2309 | CHECK_AND_WAIT_TRUE(srv_dispatcher.count > last + 1); |
2310 | if (srv_dispatcher.count == last) { | |
7c673cae FG |
2311 | lderr(g_ceph_context) << __func__ << " last is " << last << dendl; |
2312 | equal = true; | |
2313 | equal_count++; | |
2314 | } else { | |
2315 | equal = false; | |
2316 | equal_count = 0; | |
2317 | } | |
31f18b77 | 2318 | last = srv_dispatcher.count; |
7c673cae FG |
2319 | if (equal_count) |
2320 | usleep(1000*500); | |
2321 | ASSERT_FALSE(equal && equal_count > 3); | |
2322 | } | |
2323 | server_msgr->shutdown(); | |
2324 | client_msgr->shutdown(); | |
2325 | server_msgr2->shutdown(); | |
2326 | server_msgr->wait(); | |
2327 | client_msgr->wait(); | |
2328 | server_msgr2->wait(); | |
2329 | delete server_msgr2; | |
2330 | } | |
2331 | ||
9f95a23c | 2332 | INSTANTIATE_TEST_SUITE_P( |
7c673cae FG |
2333 | Messenger, |
2334 | MessengerTest, | |
2335 | ::testing::Values( | |
9f95a23c | 2336 | "async+posix" |
7c673cae FG |
2337 | ) |
2338 | ); | |
2339 | ||
7c673cae FG |
2340 | int main(int argc, char **argv) { |
2341 | vector<const char*> args; | |
2342 | argv_to_vec(argc, (const char **)argv, args); | |
11fdf7f2 TL |
2343 | |
2344 | auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, | |
2345 | CODE_ENVIRONMENT_UTILITY, | |
2346 | CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); | |
2347 | g_ceph_context->_conf.set_val("auth_cluster_required", "none"); | |
2348 | g_ceph_context->_conf.set_val("auth_service_required", "none"); | |
2349 | g_ceph_context->_conf.set_val("auth_client_required", "none"); | |
2350 | g_ceph_context->_conf.set_val("keyring", "/dev/null"); | |
2351 | g_ceph_context->_conf.set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async"); | |
2352 | g_ceph_context->_conf.set_val("ms_die_on_bad_msg", "true"); | |
2353 | g_ceph_context->_conf.set_val("ms_die_on_old_message", "true"); | |
2354 | g_ceph_context->_conf.set_val("ms_max_backoff", "1"); | |
7c673cae FG |
2355 | common_init_finish(g_ceph_context); |
2356 | ||
2357 | ::testing::InitGoogleTest(&argc, argv); | |
2358 | return RUN_ALL_TESTS(); | |
2359 | } | |
2360 | ||
2361 | /* | |
2362 | * Local Variables: | |
2363 | * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr" | |
2364 | * End: | |
2365 | */ |