1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include <condition_variable>
8 #include <gtest/gtest.h>
10 #include "global/global_init.h"
11 #include "common/ceph_argparse.h"
13 #include "DirectMessenger.h"
14 #include "FastStrategy.h"
15 #include "QueueStrategy.h"
16 #include "messages/MPing.h"
19 /// mock dispatcher that calls the given callback
20 class MockDispatcher
: public Dispatcher
{
21 std::function
<void(Message
*)> callback
;
23 MockDispatcher(CephContext
*cct
, std::function
<void(Message
*)> callback
)
24 : Dispatcher(cct
), callback(std::move(callback
)) {}
25 bool ms_handle_reset(Connection
*con
) override
{ return false; }
26 void ms_handle_remote_reset(Connection
*con
) override
{}
27 bool ms_handle_refused(Connection
*con
) override
{ return false; }
28 bool ms_dispatch(Message
*m
) override
{
35 /// test synchronous dispatch of messenger and connection interfaces
36 TEST(DirectMessenger
, SyncDispatch
)
38 auto cct
= g_ceph_context
;
40 // use FastStrategy for synchronous dispatch
41 DirectMessenger
client(cct
, entity_name_t::CLIENT(1),
42 "client", 0, new FastStrategy());
43 DirectMessenger
server(cct
, entity_name_t::CLIENT(2),
44 "server", 0, new FastStrategy());
46 ASSERT_EQ(0, client
.set_direct_peer(&server
));
47 ASSERT_EQ(0, server
.set_direct_peer(&client
));
49 bool got_request
= false;
50 bool got_reply
= false;
52 MockDispatcher
client_dispatcher(cct
, [&] (Message
*m
) {
55 client
.add_dispatcher_head(&client_dispatcher
);
57 MockDispatcher
server_dispatcher(cct
, [&] (Message
*m
) {
59 ASSERT_EQ(0, m
->get_connection()->send_message(new MPing()));
61 server
.add_dispatcher_head(&server_dispatcher
);
63 ASSERT_EQ(0, client
.start());
64 ASSERT_EQ(0, server
.start());
66 // test DirectMessenger::send_message()
67 ASSERT_EQ(0, client
.send_message(new MPing(), server
.get_myinst()));
68 ASSERT_TRUE(got_request
);
69 ASSERT_TRUE(got_reply
);
71 // test DirectConnection::send_message()
75 auto conn
= client
.get_connection(server
.get_myinst());
76 ASSERT_EQ(0, conn
->send_message(new MPing()));
77 ASSERT_TRUE(got_request
);
78 ASSERT_TRUE(got_reply
);
81 // test DirectMessenger::send_message() with loopback address
84 ASSERT_EQ(0, client
.send_message(new MPing(), client
.get_myinst()));
85 ASSERT_FALSE(got_request
); // server should never see this
86 ASSERT_TRUE(got_reply
);
88 // test DirectConnection::send_message() with loopback address
92 auto conn
= client
.get_connection(client
.get_myinst());
93 ASSERT_EQ(0, conn
->send_message(new MPing()));
94 ASSERT_FALSE(got_request
); // server should never see this
95 ASSERT_TRUE(got_reply
);
98 // test DirectConnection::send_message() with loopback connection
102 auto conn
= client
.get_loopback_connection();
103 ASSERT_EQ(0, conn
->send_message(new MPing()));
104 ASSERT_FALSE(got_request
); // server should never see this
105 ASSERT_TRUE(got_reply
);
108 ASSERT_EQ(0, client
.shutdown());
111 ASSERT_EQ(0, server
.shutdown());
115 /// test asynchronous dispatch of messenger and connection interfaces
116 TEST(DirectMessenger
, AsyncDispatch
)
118 auto cct
= g_ceph_context
;
120 // use QueueStrategy for async replies
121 DirectMessenger
client(cct
, entity_name_t::CLIENT(1),
122 "client", 0, new QueueStrategy(1));
123 DirectMessenger
server(cct
, entity_name_t::CLIENT(2),
124 "server", 0, new FastStrategy());
126 ASSERT_EQ(0, client
.set_direct_peer(&server
));
127 ASSERT_EQ(0, server
.set_direct_peer(&client
));
129 // condition variable to wait on ping reply
131 std::condition_variable cond
;
134 auto wait_for_reply
= [&] {
135 std::unique_lock
<std::mutex
> lock(mutex
);
139 done
= false; // clear for reuse
142 // client dispatcher signals the condition variable on reply
143 MockDispatcher
client_dispatcher(cct
, [&] (Message
*m
) {
144 std::lock_guard
<std::mutex
> lock(mutex
);
148 client
.add_dispatcher_head(&client_dispatcher
);
150 MockDispatcher
server_dispatcher(cct
, [&] (Message
*m
) {
151 // hold the lock over the call to send_message() to prove that the client's
152 // dispatch is asynchronous. if it isn't, it will deadlock
153 std::lock_guard
<std::mutex
> lock(mutex
);
154 ASSERT_EQ(0, m
->get_connection()->send_message(new MPing()));
156 server
.add_dispatcher_head(&server_dispatcher
);
158 ASSERT_EQ(0, client
.start());
159 ASSERT_EQ(0, server
.start());
161 // test DirectMessenger::send_message()
162 ASSERT_EQ(0, client
.send_message(new MPing(), server
.get_myinst()));
165 // test DirectConnection::send_message()
167 auto conn
= client
.get_connection(server
.get_myinst());
168 ASSERT_EQ(0, conn
->send_message(new MPing()));
172 // test DirectMessenger::send_message() with loopback address
174 // hold the lock to test that loopback dispatch is asynchronous
175 std::lock_guard
<std::mutex
> lock(mutex
);
176 ASSERT_EQ(0, client
.send_message(new MPing(), client
.get_myinst()));
180 // test DirectConnection::send_message() with loopback address
182 auto conn
= client
.get_connection(client
.get_myinst());
183 // hold the lock to test that loopback dispatch is asynchronous
184 std::lock_guard
<std::mutex
> lock(mutex
);
185 ASSERT_EQ(0, conn
->send_message(new MPing()));
189 // test DirectConnection::send_message() with loopback connection
191 auto conn
= client
.get_loopback_connection();
192 // hold the lock to test that loopback dispatch is asynchronous
193 std::lock_guard
<std::mutex
> lock(mutex
);
194 ASSERT_EQ(0, conn
->send_message(new MPing()));
198 ASSERT_EQ(0, client
.shutdown());
201 ASSERT_EQ(0, server
.shutdown());
205 /// test that wait() blocks until shutdown()
206 TEST(DirectMessenger
, WaitShutdown
)
208 auto cct
= g_ceph_context
;
210 // test wait() with both Queue- and FastStrategy
211 DirectMessenger
client(cct
, entity_name_t::CLIENT(1),
212 "client", 0, new QueueStrategy(1));
213 DirectMessenger
server(cct
, entity_name_t::CLIENT(2),
214 "server", 0, new FastStrategy());
216 ASSERT_EQ(0, client
.set_direct_peer(&server
));
217 ASSERT_EQ(0, server
.set_direct_peer(&client
));
219 ASSERT_EQ(0, client
.start());
220 ASSERT_EQ(0, server
.start());
222 std::atomic
<bool> client_waiting
{false};
223 std::atomic
<bool> server_waiting
{false};
225 // spawn threads to wait() on each of the messengers
226 std::thread
client_thread([&] {
227 client_waiting
= true;
229 client_waiting
= false;
231 std::thread
server_thread([&] {
232 server_waiting
= true;
234 server_waiting
= false;
237 // give them time to start
238 std::this_thread::sleep_for(std::chrono::milliseconds(50));
240 ASSERT_TRUE(client_waiting
);
241 ASSERT_TRUE(server_waiting
);
243 // call shutdown to unblock the waiting threads
244 ASSERT_EQ(0, client
.shutdown());
245 ASSERT_EQ(0, server
.shutdown());
247 client_thread
.join();
248 server_thread
.join();
250 ASSERT_FALSE(client_waiting
);
251 ASSERT_FALSE(server_waiting
);
254 /// test connection and messenger interfaces after mark_down()
255 TEST(DirectMessenger
, MarkDown
)
257 auto cct
= g_ceph_context
;
259 DirectMessenger
client(cct
, entity_name_t::CLIENT(1),
260 "client", 0, new FastStrategy());
261 DirectMessenger
server(cct
, entity_name_t::CLIENT(2),
262 "server", 0, new FastStrategy());
264 ASSERT_EQ(0, client
.set_direct_peer(&server
));
265 ASSERT_EQ(0, server
.set_direct_peer(&client
));
267 ASSERT_EQ(0, client
.start());
268 ASSERT_EQ(0, server
.start());
270 auto client_to_server
= client
.get_connection(server
.get_myinst());
271 auto server_to_client
= server
.get_connection(client
.get_myinst());
273 ASSERT_TRUE(client_to_server
->is_connected());
274 ASSERT_TRUE(server_to_client
->is_connected());
276 // mark_down() breaks the connection on both sides
277 client_to_server
->mark_down();
279 ASSERT_FALSE(client_to_server
->is_connected());
280 ASSERT_EQ(-ENOTCONN
, client_to_server
->send_message(new MPing()));
281 ASSERT_EQ(-ENOTCONN
, client
.send_message(new MPing(), server
.get_myinst()));
283 ASSERT_FALSE(server_to_client
->is_connected());
284 ASSERT_EQ(-ENOTCONN
, server_to_client
->send_message(new MPing()));
285 ASSERT_EQ(-ENOTCONN
, server
.send_message(new MPing(), client
.get_myinst()));
287 ASSERT_EQ(0, client
.shutdown());
290 ASSERT_EQ(0, server
.shutdown());
294 /// test connection and messenger interfaces after shutdown()
295 TEST(DirectMessenger
, SendShutdown
)
297 auto cct
= g_ceph_context
;
299 // put client on the heap so we can free it early
300 std::unique_ptr
<DirectMessenger
> client
{
301 new DirectMessenger(cct
, entity_name_t::CLIENT(1),
302 "client", 0, new FastStrategy())};
303 DirectMessenger
server(cct
, entity_name_t::CLIENT(2),
304 "server", 0, new FastStrategy());
306 ASSERT_EQ(0, client
->set_direct_peer(&server
));
307 ASSERT_EQ(0, server
.set_direct_peer(client
.get()));
309 ASSERT_EQ(0, client
->start());
310 ASSERT_EQ(0, server
.start());
312 const auto client_inst
= client
->get_myinst();
313 const auto server_inst
= server
.get_myinst();
315 auto client_to_server
= client
->get_connection(server_inst
);
316 auto server_to_client
= server
.get_connection(client_inst
);
318 ASSERT_TRUE(client_to_server
->is_connected());
319 ASSERT_TRUE(server_to_client
->is_connected());
321 // shut down the client to break connections
322 ASSERT_EQ(0, client
->shutdown());
325 ASSERT_FALSE(client_to_server
->is_connected());
326 ASSERT_EQ(-ENOTCONN
, client_to_server
->send_message(new MPing()));
327 ASSERT_EQ(-ENOTCONN
, client
->send_message(new MPing(), server_inst
));
329 // free the client connection/messenger to test that calls to the server no
330 // longer try to dereference them
331 client_to_server
.reset();
334 ASSERT_FALSE(server_to_client
->is_connected());
335 ASSERT_EQ(-ENOTCONN
, server_to_client
->send_message(new MPing()));
336 ASSERT_EQ(-ENOTCONN
, server
.send_message(new MPing(), client_inst
));
338 ASSERT_EQ(0, server
.shutdown());
342 /// test connection and messenger interfaces after bind()
343 TEST(DirectMessenger
, Bind
)
345 auto cct
= g_ceph_context
;
347 DirectMessenger
client(cct
, entity_name_t::CLIENT(1),
348 "client", 0, new FastStrategy());
349 DirectMessenger
server(cct
, entity_name_t::CLIENT(2),
350 "server", 0, new FastStrategy());
352 entity_addr_t client_addr
;
353 client_addr
.set_family(AF_INET
);
354 client_addr
.set_port(1);
356 // client bind succeeds before set_direct_peer()
357 ASSERT_EQ(0, client
.bind(client_addr
));
359 ASSERT_EQ(0, client
.set_direct_peer(&server
));
360 ASSERT_EQ(0, server
.set_direct_peer(&client
));
362 // server bind fails after set_direct_peer()
363 entity_addr_t empty_addr
;
364 ASSERT_EQ(-EINVAL
, server
.bind(empty_addr
));
366 ASSERT_EQ(0, client
.start());
367 ASSERT_EQ(0, server
.start());
369 auto client_to_server
= client
.get_connection(server
.get_myinst());
370 auto server_to_client
= server
.get_connection(client
.get_myinst());
372 ASSERT_TRUE(client_to_server
->is_connected());
373 ASSERT_TRUE(server_to_client
->is_connected());
375 // no address in connection to server
376 ASSERT_EQ(empty_addr
, client_to_server
->get_peer_addr());
377 // bind address is reflected in connection to client
378 ASSERT_EQ(client_addr
, server_to_client
->get_peer_addr());
380 // mark_down() with bind address breaks the connection
381 server
.mark_down(client_addr
);
383 ASSERT_FALSE(client_to_server
->is_connected());
384 ASSERT_FALSE(server_to_client
->is_connected());
386 ASSERT_EQ(0, client
.shutdown());
389 ASSERT_EQ(0, server
.shutdown());
393 /// test connection and messenger interfaces before calls to set_direct_peer()
394 TEST(DirectMessenger
, StartWithoutPeer
)
396 auto cct
= g_ceph_context
;
398 DirectMessenger
client(cct
, entity_name_t::CLIENT(1),
399 "client", 0, new FastStrategy());
400 DirectMessenger
server(cct
, entity_name_t::CLIENT(2),
401 "server", 0, new FastStrategy());
403 // can't start until set_direct_peer()
404 ASSERT_EQ(-EINVAL
, client
.start());
405 ASSERT_EQ(-EINVAL
, server
.start());
407 ASSERT_EQ(0, client
.set_direct_peer(&server
));
409 // only client can start
410 ASSERT_EQ(0, client
.start());
411 ASSERT_EQ(-EINVAL
, server
.start());
413 // client has a connection but can't send
414 auto conn
= client
.get_connection(server
.get_myinst());
415 ASSERT_NE(nullptr, conn
);
416 ASSERT_FALSE(conn
->is_connected());
417 ASSERT_EQ(-ENOTCONN
, conn
->send_message(new MPing()));
418 ASSERT_EQ(-ENOTCONN
, client
.send_message(new MPing(), server
.get_myinst()));
420 ASSERT_EQ(0, client
.shutdown());
424 int main(int argc
, char **argv
)
426 // command-line arguments
427 auto args
= argv_to_vec(argc
, argv
);
429 auto cct
= global_init(nullptr, args
, CEPH_ENTITY_TYPE_ANY
,
430 CODE_ENVIRONMENT_DAEMON
,
431 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE
);
432 common_init_finish(cct
.get());
434 ::testing::InitGoogleTest(&argc
, argv
);
435 return RUN_ALL_TESTS();