1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #include "common/ceph_context.h"
7 #include <gtest/gtest.h>
14 const std::chrono::milliseconds
wait_time(10);
15 const std::chrono::milliseconds long_wait_time
= wait_time
*50;
16 const std::chrono::seconds
idle_time(35);
22 CctCleaner(CephContext
* _cct
) : cct(_cct
) {}
32 auto cct
= new CephContext(CEPH_ENTITY_TYPE_CLIENT
);
34 CctCleaner
cleaner(cct
);
36 class TestAMQP
: public ::testing::Test
{
38 amqp::connection_id_t conn_id
;
39 unsigned current_dequeued
= 0U;
41 void SetUp() override
{
42 ASSERT_TRUE(amqp::init(cct
));
45 void TearDown() override
{
49 // wait for at least one new (since last drain) message to be dequeueud
50 // and then wait for all pending answers to be received
51 void wait_until_drained() {
52 while (amqp::get_dequeued() == current_dequeued
) {
53 std::this_thread::sleep_for(wait_time
);
55 while (amqp::get_inflight() > 0) {
56 std::this_thread::sleep_for(wait_time
);
58 current_dequeued
= amqp::get_dequeued();
62 std::atomic
<bool> callback_invoked
= false;
64 std::atomic
<int> callbacks_invoked
= 0;
66 // note: because these callback are shared among different "publish" calls
67 // they should be used on different connections
69 void my_callback_expect_ack(int rc
) {
71 callback_invoked
= true;
74 void my_callback_expect_nack(int rc
) {
76 callback_invoked
= true;
79 void my_callback_expect_multiple_acks(int rc
) {
84 class dynamic_callback_wrapper
{
85 dynamic_callback_wrapper() = default;
87 static dynamic_callback_wrapper
* create() {
88 return new dynamic_callback_wrapper
;
90 void callback(int rc
) {
97 void my_callback_expect_close_or_ack(int rc
) {
98 // deleting the connection should trigger the callback with -4098
99 // but due to race conditions, some my get an ack
100 EXPECT_TRUE(-4098 == rc
|| 0 == rc
);
103 TEST_F(TestAMQP
, ConnectionOK
)
105 const auto connection_number
= amqp::get_connection_count();
106 auto rc
= amqp::connect(conn_id
, "amqp://localhost", "ex1", false, false, boost::none
);
108 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
109 rc
= amqp::publish(conn_id
, "topic", "message");
113 TEST_F(TestAMQP
, SSLConnectionOK
)
115 const int port
= 5671;
116 const auto connection_number
= amqp::get_connection_count();
117 amqp_mock::set_valid_port(port
);
118 auto rc
= amqp::connect(conn_id
, "amqps://localhost", "ex1", false, false, boost::none
);
120 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
121 rc
= amqp::publish(conn_id
, "topic", "message");
123 amqp_mock::set_valid_port(5672);
126 TEST_F(TestAMQP
, PlainAndSSLConnectionsOK
)
128 const int port
= 5671;
129 const auto connection_number
= amqp::get_connection_count();
130 amqp_mock::set_valid_port(port
);
131 amqp::connection_id_t conn_id1
;
132 auto rc
= amqp::connect(conn_id1
, "amqps://localhost", "ex1", false, false, boost::none
);
134 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
135 rc
= amqp::publish(conn_id1
, "topic", "message");
137 EXPECT_EQ(amqp::to_string(conn_id1
), "amqps://localhost:5671/?exchange=ex1");
138 amqp_mock::set_valid_port(5672);
139 amqp::connection_id_t conn_id2
;
140 rc
= amqp::connect(conn_id2
, "amqp://localhost", "ex1", false, false, boost::none
);
142 EXPECT_EQ(amqp::to_string(conn_id2
), "amqp://localhost:5672/?exchange=ex1");
143 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 2);
144 rc
= amqp::publish(conn_id2
, "topic", "message");
148 TEST_F(TestAMQP
, ConnectionReuse
)
150 amqp::connection_id_t conn_id1
;
151 auto rc
= amqp::connect(conn_id1
, "amqp://localhost", "ex1", false, false, boost::none
);
153 const auto connection_number
= amqp::get_connection_count();
154 amqp::connection_id_t conn_id2
;
155 rc
= amqp::connect(conn_id2
, "amqp://localhost", "ex1", false, false, boost::none
);
157 EXPECT_EQ(amqp::get_connection_count(), connection_number
);
158 rc
= amqp::publish(conn_id1
, "topic", "message");
162 TEST_F(TestAMQP
, NameResolutionFail
)
164 callback_invoked
= false;
165 const auto connection_number
= amqp::get_connection_count();
166 amqp::connection_id_t conn_id
;
167 auto rc
= amqp::connect(conn_id
, "amqp://kaboom", "ex1", false, false, boost::none
);
169 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
170 rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_nack
);
172 wait_until_drained();
173 EXPECT_TRUE(callback_invoked
);
176 TEST_F(TestAMQP
, InvalidPort
)
178 callback_invoked
= false;
179 const auto connection_number
= amqp::get_connection_count();
180 amqp::connection_id_t conn_id
;
181 auto rc
= amqp::connect(conn_id
, "amqp://localhost:1234", "ex1", false, false, boost::none
);
183 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
184 rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_nack
);
186 wait_until_drained();
187 EXPECT_TRUE(callback_invoked
);
190 TEST_F(TestAMQP
, InvalidHost
)
192 callback_invoked
= false;
193 const auto connection_number
= amqp::get_connection_count();
194 amqp::connection_id_t conn_id
;
195 auto rc
= amqp::connect(conn_id
, "amqp://0.0.0.1", "ex1", false, false, boost::none
);
197 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
198 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
199 rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_nack
);
201 wait_until_drained();
202 EXPECT_TRUE(callback_invoked
);
205 TEST_F(TestAMQP
, InvalidVhost
)
207 callback_invoked
= false;
208 const auto connection_number
= amqp::get_connection_count();
209 amqp::connection_id_t conn_id
;
210 auto rc
= amqp::connect(conn_id
, "amqp://localhost/kaboom", "ex1", false, false, boost::none
);
212 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
213 rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_nack
);
215 wait_until_drained();
216 EXPECT_TRUE(callback_invoked
);
219 TEST_F(TestAMQP
, UserPassword
)
221 amqp_mock::set_valid_host("127.0.0.1");
223 callback_invoked
= false;
224 const auto connection_number
= amqp::get_connection_count();
225 amqp::connection_id_t conn_id
;
226 auto rc
= amqp::connect(conn_id
, "amqp://foo:bar@127.0.0.1", "ex1", false, false, boost::none
);
228 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
229 rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_nack
);
231 wait_until_drained();
232 EXPECT_TRUE(callback_invoked
);
234 // now try the same connection with default user/password
235 amqp_mock::set_valid_host("127.0.0.2");
237 callback_invoked
= false;
238 const auto connection_number
= amqp::get_connection_count();
239 amqp::connection_id_t conn_id
;
240 auto rc
= amqp::connect(conn_id
, "amqp://guest:guest@127.0.0.2", "ex1", false, false, boost::none
);
242 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
243 rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_ack
);
245 wait_until_drained();
246 EXPECT_TRUE(callback_invoked
);
248 amqp_mock::set_valid_host("localhost");
251 TEST_F(TestAMQP
, URLParseError
)
253 callback_invoked
= false;
254 const auto connection_number
= amqp::get_connection_count();
255 amqp::connection_id_t conn_id
;
256 auto rc
= amqp::connect(conn_id
, "http://localhost", "ex1", false, false, boost::none
);
258 EXPECT_EQ(amqp::get_connection_count(), connection_number
);
259 rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_nack
);
261 wait_until_drained();
262 EXPECT_TRUE(callback_invoked
);
265 TEST_F(TestAMQP
, ExchangeMismatch
)
267 callback_invoked
= false;
268 const auto connection_number
= amqp::get_connection_count();
269 amqp::connection_id_t conn_id
;
270 auto rc
= amqp::connect(conn_id
, "http://localhost", "ex2", false, false, boost::none
);
272 EXPECT_EQ(amqp::get_connection_count(), connection_number
);
273 rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_nack
);
275 wait_until_drained();
276 EXPECT_TRUE(callback_invoked
);
279 TEST_F(TestAMQP
, MaxConnections
)
281 // fill up all connections
282 std::vector
<amqp::connection_id_t
> connections
;
283 auto remaining_connections
= amqp::get_max_connections() - amqp::get_connection_count();
284 while (remaining_connections
> 0) {
285 const auto host
= "127.10.0." + std::to_string(remaining_connections
);
286 amqp_mock::set_valid_host(host
);
287 amqp::connection_id_t conn_id
;
288 auto rc
= amqp::connect(conn_id
, "amqp://" + host
, "ex1", false, false, boost::none
);
290 rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_ack
);
292 --remaining_connections
;
293 connections
.push_back(conn_id
);
295 EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections());
296 wait_until_drained();
297 // try to add another connection
299 const std::string host
= "toomany";
300 amqp_mock::set_valid_host(host
);
301 amqp::connection_id_t conn_id
;
302 auto rc
= amqp::connect(conn_id
, "amqp://" + host
, "ex1", false, false, boost::none
);
304 rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_nack
);
306 wait_until_drained();
308 EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections());
309 amqp_mock::set_valid_host("localhost");
313 TEST_F(TestAMQP
, ReceiveAck
)
315 callback_invoked
= false;
316 const std::string
host("localhost1");
317 amqp_mock::set_valid_host(host
);
318 amqp::connection_id_t conn_id
;
319 auto rc
= amqp::connect(conn_id
, "amqp://" + host
, "ex1", false, false, boost::none
);
321 rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_ack
);
323 wait_until_drained();
324 EXPECT_TRUE(callback_invoked
);
325 amqp_mock::set_valid_host("localhost");
328 TEST_F(TestAMQP
, ImplicitConnectionClose
)
330 callback_invoked
= false;
331 const std::string
host("localhost1");
332 amqp_mock::set_valid_host(host
);
333 amqp::connection_id_t conn_id
;
334 auto rc
= amqp::connect(conn_id
, "amqp://" + host
, "ex1", false, false, boost::none
);
336 const auto NUMBER_OF_CALLS
= 2000;
337 for (auto i
= 0; i
< NUMBER_OF_CALLS
; ++i
) {
338 auto rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_close_or_ack
);
341 wait_until_drained();
342 amqp_mock::set_valid_host("localhost");
345 TEST_F(TestAMQP
, ReceiveMultipleAck
)
347 callbacks_invoked
= 0;
348 const std::string
host("localhost1");
349 amqp_mock::set_valid_host(host
);
350 amqp::connection_id_t conn_id
;
351 auto rc
= amqp::connect(conn_id
, "amqp://" + host
, "ex1", false, false, boost::none
);
353 const auto NUMBER_OF_CALLS
= 100;
354 for (auto i
=0; i
< NUMBER_OF_CALLS
; ++i
) {
355 auto rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_multiple_acks
);
358 wait_until_drained();
359 EXPECT_EQ(callbacks_invoked
, NUMBER_OF_CALLS
);
360 callbacks_invoked
= 0;
361 amqp_mock::set_valid_host("localhost");
364 TEST_F(TestAMQP
, ReceiveAckForMultiple
)
366 callbacks_invoked
= 0;
367 const std::string
host("localhost1");
368 amqp_mock::set_valid_host(host
);
369 amqp::connection_id_t conn_id
;
370 auto rc
= amqp::connect(conn_id
, "amqp://" + host
, "ex1", false, false, boost::none
);
372 amqp_mock::set_multiple(59);
373 const auto NUMBER_OF_CALLS
= 100;
374 for (auto i
=0; i
< NUMBER_OF_CALLS
; ++i
) {
375 rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_multiple_acks
);
378 wait_until_drained();
379 EXPECT_EQ(callbacks_invoked
, NUMBER_OF_CALLS
);
380 callbacks_invoked
= 0;
381 amqp_mock::set_valid_host("localhost");
384 TEST_F(TestAMQP
, DynamicCallback
)
386 callbacks_invoked
= 0;
387 const std::string
host("localhost1");
388 amqp_mock::set_valid_host(host
);
389 amqp::connection_id_t conn_id
;
390 auto rc
= amqp::connect(conn_id
, "amqp://" + host
, "ex1", false, false, boost::none
);
392 amqp_mock::set_multiple(59);
393 const auto NUMBER_OF_CALLS
= 100;
394 for (auto i
=0; i
< NUMBER_OF_CALLS
; ++i
) {
395 rc
= publish_with_confirm(conn_id
, "topic", "message",
396 std::bind(&dynamic_callback_wrapper::callback
, dynamic_callback_wrapper::create(), std::placeholders::_1
));
399 wait_until_drained();
400 EXPECT_EQ(callbacks_invoked
, NUMBER_OF_CALLS
);
401 callbacks_invoked
= 0;
402 amqp_mock::set_valid_host("localhost");
405 TEST_F(TestAMQP
, ReceiveNack
)
407 callback_invoked
= false;
408 amqp_mock::REPLY_ACK
= false;
409 const std::string
host("localhost2");
410 amqp_mock::set_valid_host(host
);
411 amqp::connection_id_t conn_id
;
412 auto rc
= amqp::connect(conn_id
, "amqp://" + host
, "ex1", false, false, boost::none
);
414 rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_nack
);
416 wait_until_drained();
417 EXPECT_TRUE(callback_invoked
);
418 amqp_mock::REPLY_ACK
= true;
419 callback_invoked
= false;
420 amqp_mock::set_valid_host("localhost");
423 TEST_F(TestAMQP
, FailWrite
)
425 callback_invoked
= false;
426 amqp_mock::FAIL_NEXT_WRITE
= true;
427 const std::string
host("localhost2");
428 amqp_mock::set_valid_host(host
);
429 amqp::connection_id_t conn_id
;
430 auto rc
= amqp::connect(conn_id
, "amqp://" + host
, "ex1", false, false, boost::none
);
432 rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_nack
);
434 wait_until_drained();
435 EXPECT_TRUE(callback_invoked
);
436 amqp_mock::FAIL_NEXT_WRITE
= false;
437 callback_invoked
= false;
438 amqp_mock::set_valid_host("localhost");
441 TEST_F(TestAMQP
, RetryInvalidHost
)
443 callback_invoked
= false;
444 const std::string host
= "192.168.0.1";
445 const auto connection_number
= amqp::get_connection_count();
446 amqp::connection_id_t conn_id
;
447 auto rc
= amqp::connect(conn_id
, "amqp://"+host
, "ex1", false, false, boost::none
);
449 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
450 rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_nack
);
452 wait_until_drained();
453 EXPECT_TRUE(callback_invoked
);
454 // now next retry should be ok
455 callback_invoked
= false;
456 amqp_mock::set_valid_host(host
);
457 std::this_thread::sleep_for(long_wait_time
);
458 rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_ack
);
460 wait_until_drained();
461 EXPECT_TRUE(callback_invoked
);
462 amqp_mock::set_valid_host("localhost");
465 TEST_F(TestAMQP
, RetryInvalidPort
)
467 callback_invoked
= false;
468 const int port
= 9999;
469 const auto connection_number
= amqp::get_connection_count();
470 amqp::connection_id_t conn_id
;
471 auto rc
= amqp::connect(conn_id
, "amqp://localhost:" + std::to_string(port
), "ex1", false, false, boost::none
);
473 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
474 rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_nack
);
476 wait_until_drained();
477 EXPECT_TRUE(callback_invoked
);
478 // now next retry should be ok
479 callback_invoked
= false;
480 amqp_mock::set_valid_port(port
);
481 std::this_thread::sleep_for(long_wait_time
);
482 rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_ack
);
484 wait_until_drained();
485 EXPECT_TRUE(callback_invoked
);
486 amqp_mock::set_valid_port(5672);
489 TEST_F(TestAMQP
, RetryFailWrite
)
491 callback_invoked
= false;
492 amqp_mock::FAIL_NEXT_WRITE
= true;
493 const std::string
host("localhost2");
494 amqp_mock::set_valid_host(host
);
495 amqp::connection_id_t conn_id
;
496 auto rc
= amqp::connect(conn_id
, "amqp://" + host
, "ex1", false, false, boost::none
);
498 rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_nack
);
500 wait_until_drained();
501 EXPECT_TRUE(callback_invoked
);
502 // now next retry should be ok
503 amqp_mock::FAIL_NEXT_WRITE
= false;
504 callback_invoked
= false;
505 std::this_thread::sleep_for(long_wait_time
);
506 rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_ack
);
508 wait_until_drained();
509 EXPECT_TRUE(callback_invoked
);
510 amqp_mock::set_valid_host("localhost");
513 TEST_F(TestAMQP
, IdleConnection
)
515 // this test is skipped since it takes 30seconds
517 const auto connection_number
= amqp::get_connection_count();
518 amqp::connection_id_t conn_id
;
519 auto rc
= amqp::connect(conn_id
, "amqp://localhost", "ex1", false, false, boost::none
);
521 EXPECT_EQ(amqp::get_connection_count(), connection_number
+ 1);
522 std::this_thread::sleep_for(idle_time
);
523 EXPECT_EQ(amqp::get_connection_count(), connection_number
);
524 rc
= publish_with_confirm(conn_id
, "topic", "message", my_callback_expect_nack
);
526 wait_until_drained();
527 EXPECT_TRUE(callback_invoked
);