]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/direct_messenger/test_direct_messenger.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / test / direct_messenger / test_direct_messenger.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <condition_variable>
5 #include <mutex>
6 #include <thread>
7
8 #include <gtest/gtest.h>
9
10 #include "global/global_init.h"
11 #include "common/ceph_argparse.h"
12
13 #include "DirectMessenger.h"
14 #include "FastStrategy.h"
15 #include "QueueStrategy.h"
16 #include "messages/MPing.h"
17
18
19 /// mock dispatcher that calls the given callback
20 class MockDispatcher : public Dispatcher {
21 std::function<void(Message*)> callback;
22 public:
23 MockDispatcher(CephContext *cct, std::function<void(Message*)> callback)
24 : Dispatcher(cct), callback(std::move(callback)) {}
25 bool ms_handle_reset(Connection *con) override { return false; }
26 void ms_handle_remote_reset(Connection *con) override {}
27 bool ms_handle_refused(Connection *con) override { return false; }
28 bool ms_dispatch(Message *m) override {
29 callback(m);
30 m->put();
31 return true;
32 }
33 };
34
35 /// test synchronous dispatch of messenger and connection interfaces
36 TEST(DirectMessenger, SyncDispatch)
37 {
38 auto cct = g_ceph_context;
39
40 // use FastStrategy for synchronous dispatch
41 DirectMessenger client(cct, entity_name_t::CLIENT(1),
42 "client", 0, new FastStrategy());
43 DirectMessenger server(cct, entity_name_t::CLIENT(2),
44 "server", 0, new FastStrategy());
45
46 ASSERT_EQ(0, client.set_direct_peer(&server));
47 ASSERT_EQ(0, server.set_direct_peer(&client));
48
49 bool got_request = false;
50 bool got_reply = false;
51
52 MockDispatcher client_dispatcher(cct, [&] (Message *m) {
53 got_reply = true;
54 });
55 client.add_dispatcher_head(&client_dispatcher);
56
57 MockDispatcher server_dispatcher(cct, [&] (Message *m) {
58 got_request = true;
59 ASSERT_EQ(0, m->get_connection()->send_message(new MPing()));
60 });
61 server.add_dispatcher_head(&server_dispatcher);
62
63 ASSERT_EQ(0, client.start());
64 ASSERT_EQ(0, server.start());
65
66 // test DirectMessenger::send_message()
67 ASSERT_EQ(0, client.send_message(new MPing(), server.get_myinst()));
68 ASSERT_TRUE(got_request);
69 ASSERT_TRUE(got_reply);
70
71 // test DirectConnection::send_message()
72 {
73 got_request = false;
74 got_reply = false;
75 auto conn = client.get_connection(server.get_myinst());
76 ASSERT_EQ(0, conn->send_message(new MPing()));
77 ASSERT_TRUE(got_request);
78 ASSERT_TRUE(got_reply);
79 }
80
81 // test DirectMessenger::send_message() with loopback address
82 got_request = false;
83 got_reply = false;
84 ASSERT_EQ(0, client.send_message(new MPing(), client.get_myinst()));
85 ASSERT_FALSE(got_request); // server should never see this
86 ASSERT_TRUE(got_reply);
87
88 // test DirectConnection::send_message() with loopback address
89 {
90 got_request = false;
91 got_reply = false;
92 auto conn = client.get_connection(client.get_myinst());
93 ASSERT_EQ(0, conn->send_message(new MPing()));
94 ASSERT_FALSE(got_request); // server should never see this
95 ASSERT_TRUE(got_reply);
96 }
97
98 // test DirectConnection::send_message() with loopback connection
99 {
100 got_request = false;
101 got_reply = false;
102 auto conn = client.get_loopback_connection();
103 ASSERT_EQ(0, conn->send_message(new MPing()));
104 ASSERT_FALSE(got_request); // server should never see this
105 ASSERT_TRUE(got_reply);
106 }
107
108 ASSERT_EQ(0, client.shutdown());
109 client.wait();
110
111 ASSERT_EQ(0, server.shutdown());
112 server.wait();
113 }
114
115 /// test asynchronous dispatch of messenger and connection interfaces
116 TEST(DirectMessenger, AsyncDispatch)
117 {
118 auto cct = g_ceph_context;
119
120 // use QueueStrategy for async replies
121 DirectMessenger client(cct, entity_name_t::CLIENT(1),
122 "client", 0, new QueueStrategy(1));
123 DirectMessenger server(cct, entity_name_t::CLIENT(2),
124 "server", 0, new FastStrategy());
125
126 ASSERT_EQ(0, client.set_direct_peer(&server));
127 ASSERT_EQ(0, server.set_direct_peer(&client));
128
129 // condition variable to wait on ping reply
130 std::mutex mutex;
131 std::condition_variable cond;
132 bool done = false;
133
134 auto wait_for_reply = [&] {
135 std::unique_lock<std::mutex> lock(mutex);
136 while (!done) {
137 cond.wait(lock);
138 }
139 done = false; // clear for reuse
140 };
141
142 // client dispatcher signals the condition variable on reply
143 MockDispatcher client_dispatcher(cct, [&] (Message *m) {
144 std::lock_guard<std::mutex> lock(mutex);
145 done = true;
146 cond.notify_one();
147 });
148 client.add_dispatcher_head(&client_dispatcher);
149
150 MockDispatcher server_dispatcher(cct, [&] (Message *m) {
151 // hold the lock over the call to send_message() to prove that the client's
152 // dispatch is asynchronous. if it isn't, it will deadlock
153 std::lock_guard<std::mutex> lock(mutex);
154 ASSERT_EQ(0, m->get_connection()->send_message(new MPing()));
155 });
156 server.add_dispatcher_head(&server_dispatcher);
157
158 ASSERT_EQ(0, client.start());
159 ASSERT_EQ(0, server.start());
160
161 // test DirectMessenger::send_message()
162 ASSERT_EQ(0, client.send_message(new MPing(), server.get_myinst()));
163 wait_for_reply();
164
165 // test DirectConnection::send_message()
166 {
167 auto conn = client.get_connection(server.get_myinst());
168 ASSERT_EQ(0, conn->send_message(new MPing()));
169 }
170 wait_for_reply();
171
172 // test DirectMessenger::send_message() with loopback address
173 {
174 // hold the lock to test that loopback dispatch is asynchronous
175 std::lock_guard<std::mutex> lock(mutex);
176 ASSERT_EQ(0, client.send_message(new MPing(), client.get_myinst()));
177 }
178 wait_for_reply();
179
180 // test DirectConnection::send_message() with loopback address
181 {
182 auto conn = client.get_connection(client.get_myinst());
183 // hold the lock to test that loopback dispatch is asynchronous
184 std::lock_guard<std::mutex> lock(mutex);
185 ASSERT_EQ(0, conn->send_message(new MPing()));
186 }
187 wait_for_reply();
188
189 // test DirectConnection::send_message() with loopback connection
190 {
191 auto conn = client.get_loopback_connection();
192 // hold the lock to test that loopback dispatch is asynchronous
193 std::lock_guard<std::mutex> lock(mutex);
194 ASSERT_EQ(0, conn->send_message(new MPing()));
195 }
196 wait_for_reply();
197
198 ASSERT_EQ(0, client.shutdown());
199 client.wait();
200
201 ASSERT_EQ(0, server.shutdown());
202 server.wait();
203 }
204
205 /// test that wait() blocks until shutdown()
206 TEST(DirectMessenger, WaitShutdown)
207 {
208 auto cct = g_ceph_context;
209
210 // test wait() with both Queue- and FastStrategy
211 DirectMessenger client(cct, entity_name_t::CLIENT(1),
212 "client", 0, new QueueStrategy(1));
213 DirectMessenger server(cct, entity_name_t::CLIENT(2),
214 "server", 0, new FastStrategy());
215
216 ASSERT_EQ(0, client.set_direct_peer(&server));
217 ASSERT_EQ(0, server.set_direct_peer(&client));
218
219 ASSERT_EQ(0, client.start());
220 ASSERT_EQ(0, server.start());
221
222 std::atomic<bool> client_waiting{false};
223 std::atomic<bool> server_waiting{false};
224
225 // spawn threads to wait() on each of the messengers
226 std::thread client_thread([&] {
227 client_waiting = true;
228 client.wait();
229 client_waiting = false;
230 });
231 std::thread server_thread([&] {
232 server_waiting = true;
233 server.wait();
234 server_waiting = false;
235 });
236
237 // give them time to start
238 std::this_thread::sleep_for(std::chrono::milliseconds(50));
239
240 ASSERT_TRUE(client_waiting);
241 ASSERT_TRUE(server_waiting);
242
243 // call shutdown to unblock the waiting threads
244 ASSERT_EQ(0, client.shutdown());
245 ASSERT_EQ(0, server.shutdown());
246
247 client_thread.join();
248 server_thread.join();
249
250 ASSERT_FALSE(client_waiting);
251 ASSERT_FALSE(server_waiting);
252 }
253
254 /// test connection and messenger interfaces after mark_down()
255 TEST(DirectMessenger, MarkDown)
256 {
257 auto cct = g_ceph_context;
258
259 DirectMessenger client(cct, entity_name_t::CLIENT(1),
260 "client", 0, new FastStrategy());
261 DirectMessenger server(cct, entity_name_t::CLIENT(2),
262 "server", 0, new FastStrategy());
263
264 ASSERT_EQ(0, client.set_direct_peer(&server));
265 ASSERT_EQ(0, server.set_direct_peer(&client));
266
267 ASSERT_EQ(0, client.start());
268 ASSERT_EQ(0, server.start());
269
270 auto client_to_server = client.get_connection(server.get_myinst());
271 auto server_to_client = server.get_connection(client.get_myinst());
272
273 ASSERT_TRUE(client_to_server->is_connected());
274 ASSERT_TRUE(server_to_client->is_connected());
275
276 // mark_down() breaks the connection on both sides
277 client_to_server->mark_down();
278
279 ASSERT_FALSE(client_to_server->is_connected());
280 ASSERT_EQ(-ENOTCONN, client_to_server->send_message(new MPing()));
281 ASSERT_EQ(-ENOTCONN, client.send_message(new MPing(), server.get_myinst()));
282
283 ASSERT_FALSE(server_to_client->is_connected());
284 ASSERT_EQ(-ENOTCONN, server_to_client->send_message(new MPing()));
285 ASSERT_EQ(-ENOTCONN, server.send_message(new MPing(), client.get_myinst()));
286
287 ASSERT_EQ(0, client.shutdown());
288 client.wait();
289
290 ASSERT_EQ(0, server.shutdown());
291 server.wait();
292 }
293
294 /// test connection and messenger interfaces after shutdown()
295 TEST(DirectMessenger, SendShutdown)
296 {
297 auto cct = g_ceph_context;
298
299 // put client on the heap so we can free it early
300 std::unique_ptr<DirectMessenger> client{
301 new DirectMessenger(cct, entity_name_t::CLIENT(1),
302 "client", 0, new FastStrategy())};
303 DirectMessenger server(cct, entity_name_t::CLIENT(2),
304 "server", 0, new FastStrategy());
305
306 ASSERT_EQ(0, client->set_direct_peer(&server));
307 ASSERT_EQ(0, server.set_direct_peer(client.get()));
308
309 ASSERT_EQ(0, client->start());
310 ASSERT_EQ(0, server.start());
311
312 const auto client_inst = client->get_myinst();
313 const auto server_inst = server.get_myinst();
314
315 auto client_to_server = client->get_connection(server_inst);
316 auto server_to_client = server.get_connection(client_inst);
317
318 ASSERT_TRUE(client_to_server->is_connected());
319 ASSERT_TRUE(server_to_client->is_connected());
320
321 // shut down the client to break connections
322 ASSERT_EQ(0, client->shutdown());
323 client->wait();
324
325 ASSERT_FALSE(client_to_server->is_connected());
326 ASSERT_EQ(-ENOTCONN, client_to_server->send_message(new MPing()));
327 ASSERT_EQ(-ENOTCONN, client->send_message(new MPing(), server_inst));
328
329 // free the client connection/messenger to test that calls to the server no
330 // longer try to dereference them
331 client_to_server.reset();
332 client.reset();
333
334 ASSERT_FALSE(server_to_client->is_connected());
335 ASSERT_EQ(-ENOTCONN, server_to_client->send_message(new MPing()));
336 ASSERT_EQ(-ENOTCONN, server.send_message(new MPing(), client_inst));
337
338 ASSERT_EQ(0, server.shutdown());
339 server.wait();
340 }
341
342 /// test connection and messenger interfaces after bind()
343 TEST(DirectMessenger, Bind)
344 {
345 auto cct = g_ceph_context;
346
347 DirectMessenger client(cct, entity_name_t::CLIENT(1),
348 "client", 0, new FastStrategy());
349 DirectMessenger server(cct, entity_name_t::CLIENT(2),
350 "server", 0, new FastStrategy());
351
352 entity_addr_t client_addr;
353 client_addr.set_family(AF_INET);
354 client_addr.set_port(1);
355
356 // client bind succeeds before set_direct_peer()
357 ASSERT_EQ(0, client.bind(client_addr));
358
359 ASSERT_EQ(0, client.set_direct_peer(&server));
360 ASSERT_EQ(0, server.set_direct_peer(&client));
361
362 // server bind fails after set_direct_peer()
363 entity_addr_t empty_addr;
364 ASSERT_EQ(-EINVAL, server.bind(empty_addr));
365
366 ASSERT_EQ(0, client.start());
367 ASSERT_EQ(0, server.start());
368
369 auto client_to_server = client.get_connection(server.get_myinst());
370 auto server_to_client = server.get_connection(client.get_myinst());
371
372 ASSERT_TRUE(client_to_server->is_connected());
373 ASSERT_TRUE(server_to_client->is_connected());
374
375 // no address in connection to server
376 ASSERT_EQ(empty_addr, client_to_server->get_peer_addr());
377 // bind address is reflected in connection to client
378 ASSERT_EQ(client_addr, server_to_client->get_peer_addr());
379
380 // mark_down() with bind address breaks the connection
381 server.mark_down(client_addr);
382
383 ASSERT_FALSE(client_to_server->is_connected());
384 ASSERT_FALSE(server_to_client->is_connected());
385
386 ASSERT_EQ(0, client.shutdown());
387 client.wait();
388
389 ASSERT_EQ(0, server.shutdown());
390 server.wait();
391 }
392
393 /// test connection and messenger interfaces before calls to set_direct_peer()
394 TEST(DirectMessenger, StartWithoutPeer)
395 {
396 auto cct = g_ceph_context;
397
398 DirectMessenger client(cct, entity_name_t::CLIENT(1),
399 "client", 0, new FastStrategy());
400 DirectMessenger server(cct, entity_name_t::CLIENT(2),
401 "server", 0, new FastStrategy());
402
403 // can't start until set_direct_peer()
404 ASSERT_EQ(-EINVAL, client.start());
405 ASSERT_EQ(-EINVAL, server.start());
406
407 ASSERT_EQ(0, client.set_direct_peer(&server));
408
409 // only client can start
410 ASSERT_EQ(0, client.start());
411 ASSERT_EQ(-EINVAL, server.start());
412
413 // client has a connection but can't send
414 auto conn = client.get_connection(server.get_myinst());
415 ASSERT_NE(nullptr, conn);
416 ASSERT_FALSE(conn->is_connected());
417 ASSERT_EQ(-ENOTCONN, conn->send_message(new MPing()));
418 ASSERT_EQ(-ENOTCONN, client.send_message(new MPing(), server.get_myinst()));
419
420 ASSERT_EQ(0, client.shutdown());
421 client.wait();
422 }
423
424 int main(int argc, char **argv)
425 {
426 // command-line arguments
427 auto args = argv_to_vec(argc, argv);
428
429 auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_ANY,
430 CODE_ENVIRONMENT_DAEMON,
431 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
432 common_init_finish(cct.get());
433
434 ::testing::InitGoogleTest(&argc, argv);
435 return RUN_ALL_TESTS();
436 }