1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #include "common/ceph_context.h"
7 #include <gtest/gtest.h>
14 const std::chrono::milliseconds
wait_time(10);
15 const std::chrono::milliseconds long_wait_time
= wait_time
*50;
21 CctCleaner(CephContext
* _cct
) : cct(_cct
) {}
31 auto cct
= new CephContext(CEPH_ENTITY_TYPE_CLIENT
);
33 CctCleaner
cleaner(cct
);
35 class TestAMQP
: public ::testing::Test
{
37 amqp::connection_ptr_t conn
= nullptr;
38 unsigned current_dequeued
= 0U;
40 void SetUp() override
{
41 ASSERT_TRUE(amqp::init(cct
));
44 void TearDown() override
{
48 // wait for at least one new (since last drain) message to be dequeueud
49 // and then wait for all pending answers to be received
50 void wait_until_drained() {
51 while (amqp::get_dequeued() == current_dequeued
) {
52 std::this_thread::sleep_for(wait_time
);
54 while (amqp::get_inflight() > 0) {
55 std::this_thread::sleep_for(wait_time
);
57 current_dequeued
= amqp::get_dequeued();
61 TEST_F(TestAMQP
, ConnectionOK
)
63 const auto connection_number
= amqp::get_connection_count();
64 conn
= amqp::connect("amqp://localhost", "ex1", false, false, boost::none
);
66 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
67 auto rc
= amqp::publish(conn
, "topic", "message");
71 TEST_F(TestAMQP
, SSLConnectionOK
)
73 const int port
= 5671;
74 const auto connection_number
= amqp::get_connection_count();
75 amqp_mock::set_valid_port(port
);
76 conn
= amqp::connect("amqps://localhost", "ex1", false, false, boost::none
);
78 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
79 auto rc
= amqp::publish(conn
, "topic", "message");
81 amqp_mock::set_valid_port(5672);
84 TEST_F(TestAMQP
, PlainAndSSLConnectionsOK
)
86 const int port
= 5671;
87 const auto connection_number
= amqp::get_connection_count();
88 amqp_mock::set_valid_port(port
);
89 amqp::connection_ptr_t conn1
= amqp::connect("amqps://localhost", "ex1", false, false, boost::none
);
91 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
92 auto rc
= amqp::publish(conn1
, "topic", "message");
94 amqp_mock::set_valid_port(5672);
95 amqp::connection_ptr_t conn2
= amqp::connect("amqp://localhost", "ex1", false, false, boost::none
);
97 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 2);
98 rc
= amqp::publish(conn2
, "topic", "message");
102 TEST_F(TestAMQP
, ConnectionReuse
)
104 amqp::connection_ptr_t conn1
= amqp::connect("amqp://localhost", "ex1", false, false, boost::none
);
106 const auto connection_number
= amqp::get_connection_count();
107 amqp::connection_ptr_t conn2
= amqp::connect("amqp://localhost", "ex1", false, false, boost::none
);
109 EXPECT_EQ(amqp::get_connection_count(), connection_number
);
110 auto rc
= amqp::publish(conn1
, "topic", "message");
114 TEST_F(TestAMQP
, NameResolutionFail
)
116 const auto connection_number
= amqp::get_connection_count();
117 conn
= amqp::connect("amqp://kaboom", "ex1", false, false, boost::none
);
119 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
120 auto rc
= amqp::publish(conn
, "topic", "message");
124 TEST_F(TestAMQP
, InvalidPort
)
126 const auto connection_number
= amqp::get_connection_count();
127 conn
= amqp::connect("amqp://localhost:1234", "ex1", false, false, boost::none
);
129 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
130 auto rc
= amqp::publish(conn
, "topic", "message");
134 TEST_F(TestAMQP
, InvalidHost
)
136 const auto connection_number
= amqp::get_connection_count();
137 conn
= amqp::connect("amqp://0.0.0.1", "ex1", false, false, boost::none
);
139 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
140 auto rc
= amqp::publish(conn
, "topic", "message");
144 TEST_F(TestAMQP
, InvalidVhost
)
146 const auto connection_number
= amqp::get_connection_count();
147 conn
= amqp::connect("amqp://localhost/kaboom", "ex1", false, false, boost::none
);
149 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
150 auto rc
= amqp::publish(conn
, "topic", "message");
154 TEST_F(TestAMQP
, UserPassword
)
156 amqp_mock::set_valid_host("127.0.0.1");
158 const auto connection_number
= amqp::get_connection_count();
159 conn
= amqp::connect("amqp://foo:bar@127.0.0.1", "ex1", false, false, boost::none
);
161 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
162 auto rc
= amqp::publish(conn
, "topic", "message");
165 // now try the same connection with default user/password
166 amqp_mock::set_valid_host("127.0.0.2");
168 const auto connection_number
= amqp::get_connection_count();
169 conn
= amqp::connect("amqp://guest:guest@127.0.0.2", "ex1", false, false, boost::none
);
171 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
172 auto rc
= amqp::publish(conn
, "topic", "message");
175 amqp_mock::set_valid_host("localhost");
178 TEST_F(TestAMQP
, URLParseError
)
180 const auto connection_number
= amqp::get_connection_count();
181 conn
= amqp::connect("http://localhost", "ex1", false, false, boost::none
);
183 EXPECT_EQ(amqp::get_connection_count(), connection_number
);
184 auto rc
= amqp::publish(conn
, "topic", "message");
188 TEST_F(TestAMQP
, ExchangeMismatch
)
190 const auto connection_number
= amqp::get_connection_count();
191 conn
= amqp::connect("http://localhost", "ex2", false, false, boost::none
);
193 EXPECT_EQ(amqp::get_connection_count(), connection_number
);
194 auto rc
= amqp::publish(conn
, "topic", "message");
198 TEST_F(TestAMQP
, MaxConnections
)
200 // fill up all connections
201 std::vector
<amqp::connection_ptr_t
> connections
;
202 auto remaining_connections
= amqp::get_max_connections() - amqp::get_connection_count();
203 while (remaining_connections
> 0) {
204 const auto host
= "127.10.0." + std::to_string(remaining_connections
);
205 amqp_mock::set_valid_host(host
);
206 amqp::connection_ptr_t conn
= amqp::connect("amqp://" + host
, "ex1", false, false, boost::none
);
208 auto rc
= amqp::publish(conn
, "topic", "message");
210 --remaining_connections
;
211 connections
.push_back(conn
);
213 EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections());
214 // try to add another connection
216 const std::string host
= "toomany";
217 amqp_mock::set_valid_host(host
);
218 amqp::connection_ptr_t conn
= amqp::connect("amqp://" + host
, "ex1", false, false, boost::none
);
220 auto rc
= amqp::publish(conn
, "topic", "message");
223 EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections());
224 amqp_mock::set_valid_host("localhost");
227 std::atomic
<bool> callback_invoked
= false;
229 std::atomic
<int> callbacks_invoked
= 0;
231 // note: because these callback are shared among different "publish" calls
232 // they should be used on different connections
234 void my_callback_expect_ack(int rc
) {
236 callback_invoked
= true;
239 void my_callback_expect_nack(int rc
) {
241 callback_invoked
= true;
244 void my_callback_expect_multiple_acks(int rc
) {
249 class dynamic_callback_wrapper
{
250 dynamic_callback_wrapper() = default;
252 static dynamic_callback_wrapper
* create() {
253 return new dynamic_callback_wrapper
;
255 void callback(int rc
) {
262 void my_callback_expect_close_or_ack(int rc
) {
263 // deleting the connection should trigger the callback with -4098
264 // but due to race conditions, some my get an ack
265 EXPECT_TRUE(-4098 == rc
|| 0 == rc
);
268 TEST_F(TestAMQP
, ReceiveAck
)
270 callback_invoked
= false;
271 const std::string
host("localhost1");
272 amqp_mock::set_valid_host(host
);
273 conn
= amqp::connect("amqp://" + host
, "ex1", false, false, boost::none
);
275 auto rc
= publish_with_confirm(conn
, "topic", "message", my_callback_expect_ack
);
277 wait_until_drained();
278 EXPECT_TRUE(callback_invoked
);
279 amqp_mock::set_valid_host("localhost");
282 TEST_F(TestAMQP
, ImplicitConnectionClose
)
284 callback_invoked
= false;
285 const std::string
host("localhost1");
286 amqp_mock::set_valid_host(host
);
287 conn
= amqp::connect("amqp://" + host
, "ex1", false, false, boost::none
);
289 const auto NUMBER_OF_CALLS
= 2000;
290 for (auto i
= 0; i
< NUMBER_OF_CALLS
; ++i
) {
291 auto rc
= publish_with_confirm(conn
, "topic", "message", my_callback_expect_close_or_ack
);
294 wait_until_drained();
295 // deleting the connection object should close the connection
297 amqp_mock::set_valid_host("localhost");
300 TEST_F(TestAMQP
, ReceiveMultipleAck
)
302 callbacks_invoked
= 0;
303 const std::string
host("localhost1");
304 amqp_mock::set_valid_host(host
);
305 conn
= amqp::connect("amqp://" + host
, "ex1", false, false, boost::none
);
307 const auto NUMBER_OF_CALLS
= 100;
308 for (auto i
=0; i
< NUMBER_OF_CALLS
; ++i
) {
309 auto rc
= publish_with_confirm(conn
, "topic", "message", my_callback_expect_multiple_acks
);
312 wait_until_drained();
313 EXPECT_EQ(callbacks_invoked
, NUMBER_OF_CALLS
);
314 callbacks_invoked
= 0;
315 amqp_mock::set_valid_host("localhost");
318 TEST_F(TestAMQP
, ReceiveAckForMultiple
)
320 callbacks_invoked
= 0;
321 const std::string
host("localhost1");
322 amqp_mock::set_valid_host(host
);
323 conn
= amqp::connect("amqp://" + host
, "ex1", false, false, boost::none
);
325 amqp_mock::set_multiple(59);
326 const auto NUMBER_OF_CALLS
= 100;
327 for (auto i
=0; i
< NUMBER_OF_CALLS
; ++i
) {
328 auto rc
= publish_with_confirm(conn
, "topic", "message", my_callback_expect_multiple_acks
);
331 wait_until_drained();
332 EXPECT_EQ(callbacks_invoked
, NUMBER_OF_CALLS
);
333 callbacks_invoked
= 0;
334 amqp_mock::set_valid_host("localhost");
337 TEST_F(TestAMQP
, DynamicCallback
)
339 callbacks_invoked
= 0;
340 const std::string
host("localhost1");
341 amqp_mock::set_valid_host(host
);
342 conn
= amqp::connect("amqp://" + host
, "ex1", false, false, boost::none
);
344 amqp_mock::set_multiple(59);
345 const auto NUMBER_OF_CALLS
= 100;
346 for (auto i
=0; i
< NUMBER_OF_CALLS
; ++i
) {
347 auto rc
= publish_with_confirm(conn
, "topic", "message",
348 std::bind(&dynamic_callback_wrapper::callback
, dynamic_callback_wrapper::create(), std::placeholders::_1
));
351 wait_until_drained();
352 EXPECT_EQ(callbacks_invoked
, NUMBER_OF_CALLS
);
353 callbacks_invoked
= 0;
354 amqp_mock::set_valid_host("localhost");
357 TEST_F(TestAMQP
, ReceiveNack
)
359 callback_invoked
= false;
360 amqp_mock::REPLY_ACK
= false;
361 const std::string
host("localhost2");
362 amqp_mock::set_valid_host(host
);
363 conn
= amqp::connect("amqp://" + host
, "ex1", false, false, boost::none
);
365 auto rc
= publish_with_confirm(conn
, "topic", "message", my_callback_expect_nack
);
367 wait_until_drained();
368 EXPECT_TRUE(callback_invoked
);
369 amqp_mock::REPLY_ACK
= true;
370 callback_invoked
= false;
371 amqp_mock::set_valid_host("localhost");
374 TEST_F(TestAMQP
, FailWrite
)
376 callback_invoked
= false;
377 amqp_mock::FAIL_NEXT_WRITE
= true;
378 const std::string
host("localhost2");
379 amqp_mock::set_valid_host(host
);
380 conn
= amqp::connect("amqp://" + host
, "ex1", false, false, boost::none
);
382 auto rc
= publish_with_confirm(conn
, "topic", "message", my_callback_expect_nack
);
384 wait_until_drained();
385 EXPECT_TRUE(callback_invoked
);
386 amqp_mock::FAIL_NEXT_WRITE
= false;
387 callback_invoked
= false;
388 amqp_mock::set_valid_host("localhost");
391 TEST_F(TestAMQP
, RetryInvalidHost
)
393 const std::string host
= "192.168.0.1";
394 const auto connection_number
= amqp::get_connection_count();
395 conn
= amqp::connect("amqp://"+host
, "ex1", false, false, boost::none
);
397 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
398 auto rc
= amqp::publish(conn
, "topic", "message");
400 // now next retry should be ok
401 amqp_mock::set_valid_host(host
);
402 std::this_thread::sleep_for(long_wait_time
);
403 rc
= amqp::publish(conn
, "topic", "message");
405 amqp_mock::set_valid_host("localhost");
408 TEST_F(TestAMQP
, RetryInvalidPort
)
410 const int port
= 9999;
411 const auto connection_number
= amqp::get_connection_count();
412 conn
= amqp::connect("amqp://localhost:" + std::to_string(port
), "ex1", false, false, boost::none
);
414 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
415 auto rc
= amqp::publish(conn
, "topic", "message");
417 // now next retry should be ok
418 amqp_mock::set_valid_port(port
);
419 std::this_thread::sleep_for(long_wait_time
);
420 rc
= amqp::publish(conn
, "topic", "message");
422 amqp_mock::set_valid_port(5672);
425 TEST_F(TestAMQP
, RetryFailWrite
)
427 callback_invoked
= false;
428 amqp_mock::FAIL_NEXT_WRITE
= true;
429 const std::string
host("localhost4");
430 amqp_mock::set_valid_host(host
);
431 conn
= amqp::connect("amqp://" + host
, "ex1", false, false, boost::none
);
433 auto rc
= publish_with_confirm(conn
, "topic", "message", my_callback_expect_nack
);
435 // set port to a different one, so that reconnect would fail
436 amqp_mock::set_valid_port(9999);
437 wait_until_drained();
438 EXPECT_TRUE(callback_invoked
);
439 callback_invoked
= false;
440 rc
= publish_with_confirm(conn
, "topic", "message", my_callback_expect_nack
);
442 // expect immediate failure, no callback called after sleep
443 std::this_thread::sleep_for(long_wait_time
);
444 EXPECT_FALSE(callback_invoked
);
445 // set port to the right one so that reconnect would succeed
446 amqp_mock::set_valid_port(5672);
447 callback_invoked
= false;
448 amqp_mock::FAIL_NEXT_WRITE
= false;
449 // give time to reconnect
450 std::this_thread::sleep_for(long_wait_time
);
451 // retry to publish should succeed now
452 rc
= publish_with_confirm(conn
, "topic", "message", my_callback_expect_ack
);
454 wait_until_drained();
455 EXPECT_TRUE(callback_invoked
);
456 callback_invoked
= false;
457 amqp_mock::set_valid_host("localhost");