const std::chrono::milliseconds wait_time(10);
const std::chrono::milliseconds long_wait_time = wait_time*50;
+const std::chrono::seconds idle_time(35);
class CctCleaner {
class TestAMQP : public ::testing::Test {
protected:
- amqp::connection_ptr_t conn = nullptr;
+ amqp::connection_id_t conn_id;
unsigned current_dequeued = 0U;
void SetUp() override {
}
};
+std::atomic<bool> callback_invoked = false;
+
+std::atomic<int> callbacks_invoked = 0;
+
+// note: because these callback are shared among different "publish" calls
+// they should be used on different connections
+
+void my_callback_expect_ack(int rc) {
+ EXPECT_EQ(0, rc);
+ callback_invoked = true;
+}
+
+void my_callback_expect_nack(int rc) {
+ EXPECT_LT(rc, 0);
+ callback_invoked = true;
+}
+
+void my_callback_expect_multiple_acks(int rc) {
+ EXPECT_EQ(0, rc);
+ ++callbacks_invoked;
+}
+
+class dynamic_callback_wrapper {
+ dynamic_callback_wrapper() = default;
+public:
+ static dynamic_callback_wrapper* create() {
+ return new dynamic_callback_wrapper;
+ }
+ void callback(int rc) {
+ EXPECT_EQ(0, rc);
+ ++callbacks_invoked;
+ delete this;
+ }
+};
+
+void my_callback_expect_close_or_ack(int rc) {
+ // deleting the connection should trigger the callback with -4098
+ // but due to race conditions, some my get an ack
+ EXPECT_TRUE(-4098 == rc || 0 == rc);
+}
+
TEST_F(TestAMQP, ConnectionOK)
{
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
- EXPECT_TRUE(conn);
+ auto rc = amqp::connect(conn_id, "amqp://localhost", "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
- auto rc = amqp::publish(conn, "topic", "message");
+ rc = amqp::publish(conn_id, "topic", "message");
EXPECT_EQ(rc, 0);
}
const int port = 5671;
const auto connection_number = amqp::get_connection_count();
amqp_mock::set_valid_port(port);
- conn = amqp::connect("amqps://localhost", "ex1", false, false, boost::none);
- EXPECT_TRUE(conn);
+ auto rc = amqp::connect(conn_id, "amqps://localhost", "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
- auto rc = amqp::publish(conn, "topic", "message");
+ rc = amqp::publish(conn_id, "topic", "message");
EXPECT_EQ(rc, 0);
amqp_mock::set_valid_port(5672);
}
const int port = 5671;
const auto connection_number = amqp::get_connection_count();
amqp_mock::set_valid_port(port);
- amqp::connection_ptr_t conn1 = amqp::connect("amqps://localhost", "ex1", false, false, boost::none);
- EXPECT_TRUE(conn1);
+ amqp::connection_id_t conn_id1;
+ auto rc = amqp::connect(conn_id1, "amqps://localhost", "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
- auto rc = amqp::publish(conn1, "topic", "message");
+ rc = amqp::publish(conn_id1, "topic", "message");
EXPECT_EQ(rc, 0);
+ EXPECT_EQ(amqp::to_string(conn_id1), "amqps://localhost:5671/?exchange=ex1");
amqp_mock::set_valid_port(5672);
- amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
- EXPECT_TRUE(conn2);
+ amqp::connection_id_t conn_id2;
+ rc = amqp::connect(conn_id2, "amqp://localhost", "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
+ EXPECT_EQ(amqp::to_string(conn_id2), "amqp://localhost:5672/?exchange=ex1");
EXPECT_EQ(amqp::get_connection_count(), connection_number + 2);
- rc = amqp::publish(conn2, "topic", "message");
+ rc = amqp::publish(conn_id2, "topic", "message");
EXPECT_EQ(rc, 0);
}
TEST_F(TestAMQP, ConnectionReuse)
{
- amqp::connection_ptr_t conn1 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
- EXPECT_TRUE(conn1);
+ amqp::connection_id_t conn_id1;
+ auto rc = amqp::connect(conn_id1, "amqp://localhost", "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
- EXPECT_TRUE(conn2);
+ amqp::connection_id_t conn_id2;
+ rc = amqp::connect(conn_id2, "amqp://localhost", "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
EXPECT_EQ(amqp::get_connection_count(), connection_number);
- auto rc = amqp::publish(conn1, "topic", "message");
+ rc = amqp::publish(conn_id1, "topic", "message");
EXPECT_EQ(rc, 0);
}
TEST_F(TestAMQP, NameResolutionFail)
{
+ callback_invoked = false;
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://kaboom", "ex1", false, false, boost::none);
- EXPECT_TRUE(conn);
+ amqp::connection_id_t conn_id;
+ auto rc = amqp::connect(conn_id, "amqp://kaboom", "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
- auto rc = amqp::publish(conn, "topic", "message");
- EXPECT_LT(rc, 0);
+ rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+ EXPECT_EQ(rc, 0);
+ wait_until_drained();
+ EXPECT_TRUE(callback_invoked);
}
TEST_F(TestAMQP, InvalidPort)
{
+ callback_invoked = false;
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://localhost:1234", "ex1", false, false, boost::none);
- EXPECT_TRUE(conn);
+ amqp::connection_id_t conn_id;
+ auto rc = amqp::connect(conn_id, "amqp://localhost:1234", "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
- auto rc = amqp::publish(conn, "topic", "message");
- EXPECT_LT(rc, 0);
+ rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+ EXPECT_EQ(rc, 0);
+ wait_until_drained();
+ EXPECT_TRUE(callback_invoked);
}
TEST_F(TestAMQP, InvalidHost)
{
+ callback_invoked = false;
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://0.0.0.1", "ex1", false, false, boost::none);
- EXPECT_TRUE(conn);
+ amqp::connection_id_t conn_id;
+ auto rc = amqp::connect(conn_id, "amqp://0.0.0.1", "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
- auto rc = amqp::publish(conn, "topic", "message");
- EXPECT_LT(rc, 0);
+ EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
+ rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+ EXPECT_EQ(rc, 0);
+ wait_until_drained();
+ EXPECT_TRUE(callback_invoked);
}
TEST_F(TestAMQP, InvalidVhost)
{
+ callback_invoked = false;
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://localhost/kaboom", "ex1", false, false, boost::none);
- EXPECT_TRUE(conn);
+ amqp::connection_id_t conn_id;
+ auto rc = amqp::connect(conn_id, "amqp://localhost/kaboom", "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
- auto rc = amqp::publish(conn, "topic", "message");
- EXPECT_LT(rc, 0);
+ rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+ EXPECT_EQ(rc, 0);
+ wait_until_drained();
+ EXPECT_TRUE(callback_invoked);
}
TEST_F(TestAMQP, UserPassword)
{
amqp_mock::set_valid_host("127.0.0.1");
{
+ callback_invoked = false;
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1", false, false, boost::none);
- EXPECT_TRUE(conn);
+ amqp::connection_id_t conn_id;
+ auto rc = amqp::connect(conn_id, "amqp://foo:bar@127.0.0.1", "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
- auto rc = amqp::publish(conn, "topic", "message");
- EXPECT_LT(rc, 0);
+ rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+ EXPECT_EQ(rc, 0);
+ wait_until_drained();
+ EXPECT_TRUE(callback_invoked);
}
// now try the same connection with default user/password
amqp_mock::set_valid_host("127.0.0.2");
{
+ callback_invoked = false;
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1", false, false, boost::none);
- EXPECT_TRUE(conn);
+ amqp::connection_id_t conn_id;
+ auto rc = amqp::connect(conn_id, "amqp://guest:guest@127.0.0.2", "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
- auto rc = amqp::publish(conn, "topic", "message");
+ rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
EXPECT_EQ(rc, 0);
+ wait_until_drained();
+ EXPECT_TRUE(callback_invoked);
}
amqp_mock::set_valid_host("localhost");
}
TEST_F(TestAMQP, URLParseError)
{
+ callback_invoked = false;
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("http://localhost", "ex1", false, false, boost::none);
- EXPECT_FALSE(conn);
+ amqp::connection_id_t conn_id;
+ auto rc = amqp::connect(conn_id, "http://localhost", "ex1", false, false, boost::none);
+ EXPECT_FALSE(rc);
EXPECT_EQ(amqp::get_connection_count(), connection_number);
- auto rc = amqp::publish(conn, "topic", "message");
- EXPECT_LT(rc, 0);
+ rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+ EXPECT_EQ(rc, 0);
+ wait_until_drained();
+ EXPECT_TRUE(callback_invoked);
}
TEST_F(TestAMQP, ExchangeMismatch)
{
+ callback_invoked = false;
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("http://localhost", "ex2", false, false, boost::none);
- EXPECT_FALSE(conn);
+ amqp::connection_id_t conn_id;
+ auto rc = amqp::connect(conn_id, "http://localhost", "ex2", false, false, boost::none);
+ EXPECT_FALSE(rc);
EXPECT_EQ(amqp::get_connection_count(), connection_number);
- auto rc = amqp::publish(conn, "topic", "message");
- EXPECT_LT(rc, 0);
+ rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+ EXPECT_EQ(rc, 0);
+ wait_until_drained();
+ EXPECT_TRUE(callback_invoked);
}
TEST_F(TestAMQP, MaxConnections)
{
// fill up all connections
- std::vector<amqp::connection_ptr_t> connections;
+ std::vector<amqp::connection_id_t> connections;
auto remaining_connections = amqp::get_max_connections() - amqp::get_connection_count();
while (remaining_connections > 0) {
const auto host = "127.10.0." + std::to_string(remaining_connections);
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
- EXPECT_TRUE(conn);
- auto rc = amqp::publish(conn, "topic", "message");
+ amqp::connection_id_t conn_id;
+ auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
+ rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
EXPECT_EQ(rc, 0);
--remaining_connections;
- connections.push_back(conn);
+ connections.push_back(conn_id);
}
EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections());
+ wait_until_drained();
// try to add another connection
{
const std::string host = "toomany";
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
- EXPECT_FALSE(conn);
- auto rc = amqp::publish(conn, "topic", "message");
- EXPECT_LT(rc, 0);
+ amqp::connection_id_t conn_id;
+ auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+ EXPECT_FALSE(rc);
+ rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+ EXPECT_EQ(rc, 0);
+ wait_until_drained();
}
EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections());
amqp_mock::set_valid_host("localhost");
}
-std::atomic<bool> callback_invoked = false;
-
-std::atomic<int> callbacks_invoked = 0;
-
-// note: because these callback are shared among different "publish" calls
-// they should be used on different connections
-
-void my_callback_expect_ack(int rc) {
- EXPECT_EQ(0, rc);
- callback_invoked = true;
-}
-
-void my_callback_expect_nack(int rc) {
- EXPECT_LT(rc, 0);
- callback_invoked = true;
-}
-
-void my_callback_expect_multiple_acks(int rc) {
- EXPECT_EQ(0, rc);
- ++callbacks_invoked;
-}
-
-class dynamic_callback_wrapper {
- dynamic_callback_wrapper() = default;
-public:
- static dynamic_callback_wrapper* create() {
- return new dynamic_callback_wrapper;
- }
- void callback(int rc) {
- EXPECT_EQ(0, rc);
- ++callbacks_invoked;
- delete this;
- }
-};
-
-void my_callback_expect_close_or_ack(int rc) {
- // deleting the connection should trigger the callback with -4098
- // but due to race conditions, some my get an ack
- EXPECT_TRUE(-4098 == rc || 0 == rc);
-}
TEST_F(TestAMQP, ReceiveAck)
{
callback_invoked = false;
const std::string host("localhost1");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
- EXPECT_TRUE(conn);
- auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack);
+ amqp::connection_id_t conn_id;
+ auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
+ rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
EXPECT_EQ(rc, 0);
wait_until_drained();
EXPECT_TRUE(callback_invoked);
callback_invoked = false;
const std::string host("localhost1");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
- EXPECT_TRUE(conn);
+ amqp::connection_id_t conn_id;
+ auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
const auto NUMBER_OF_CALLS = 2000;
for (auto i = 0; i < NUMBER_OF_CALLS; ++i) {
- auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_close_or_ack);
+ auto rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_close_or_ack);
EXPECT_EQ(rc, 0);
}
wait_until_drained();
- // deleting the connection object should close the connection
- conn.reset(nullptr);
amqp_mock::set_valid_host("localhost");
}
callbacks_invoked = 0;
const std::string host("localhost1");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
- EXPECT_TRUE(conn);
+ amqp::connection_id_t conn_id;
+ auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
const auto NUMBER_OF_CALLS = 100;
for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
- auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks);
+ auto rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_multiple_acks);
EXPECT_EQ(rc, 0);
}
wait_until_drained();
callbacks_invoked = 0;
const std::string host("localhost1");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
- EXPECT_TRUE(conn);
+ amqp::connection_id_t conn_id;
+ auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
amqp_mock::set_multiple(59);
const auto NUMBER_OF_CALLS = 100;
for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
- auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks);
+ rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_multiple_acks);
EXPECT_EQ(rc, 0);
}
wait_until_drained();
callbacks_invoked = 0;
const std::string host("localhost1");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
- EXPECT_TRUE(conn);
+ amqp::connection_id_t conn_id;
+ auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
amqp_mock::set_multiple(59);
const auto NUMBER_OF_CALLS = 100;
for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
- auto rc = publish_with_confirm(conn, "topic", "message",
+ rc = publish_with_confirm(conn_id, "topic", "message",
std::bind(&dynamic_callback_wrapper::callback, dynamic_callback_wrapper::create(), std::placeholders::_1));
EXPECT_EQ(rc, 0);
}
amqp_mock::REPLY_ACK = false;
const std::string host("localhost2");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
- EXPECT_TRUE(conn);
- auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
+ amqp::connection_id_t conn_id;
+ auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
+ rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
EXPECT_EQ(rc, 0);
wait_until_drained();
EXPECT_TRUE(callback_invoked);
amqp_mock::FAIL_NEXT_WRITE = true;
const std::string host("localhost2");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
- EXPECT_TRUE(conn);
- auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
+ amqp::connection_id_t conn_id;
+ auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
+ rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
EXPECT_EQ(rc, 0);
wait_until_drained();
EXPECT_TRUE(callback_invoked);
TEST_F(TestAMQP, RetryInvalidHost)
{
+ callback_invoked = false;
const std::string host = "192.168.0.1";
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://"+host, "ex1", false, false, boost::none);
- EXPECT_TRUE(conn);
+ amqp::connection_id_t conn_id;
+ auto rc = amqp::connect(conn_id, "amqp://"+host, "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
- auto rc = amqp::publish(conn, "topic", "message");
- EXPECT_LT(rc, 0);
+ rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+ EXPECT_EQ(rc, 0);
+ wait_until_drained();
+ EXPECT_TRUE(callback_invoked);
// now next retry should be ok
+ callback_invoked = false;
amqp_mock::set_valid_host(host);
std::this_thread::sleep_for(long_wait_time);
- rc = amqp::publish(conn, "topic", "message");
+ rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
EXPECT_EQ(rc, 0);
+ wait_until_drained();
+ EXPECT_TRUE(callback_invoked);
amqp_mock::set_valid_host("localhost");
}
TEST_F(TestAMQP, RetryInvalidPort)
{
+ callback_invoked = false;
const int port = 9999;
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1", false, false, boost::none);
- EXPECT_TRUE(conn);
+ amqp::connection_id_t conn_id;
+ auto rc = amqp::connect(conn_id, "amqp://localhost:" + std::to_string(port), "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
- auto rc = amqp::publish(conn, "topic", "message");
- EXPECT_LT(rc, 0);
+ rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+ EXPECT_EQ(rc, 0);
+ wait_until_drained();
+ EXPECT_TRUE(callback_invoked);
// now next retry should be ok
+ callback_invoked = false;
amqp_mock::set_valid_port(port);
std::this_thread::sleep_for(long_wait_time);
- rc = amqp::publish(conn, "topic", "message");
+ rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
EXPECT_EQ(rc, 0);
+ wait_until_drained();
+ EXPECT_TRUE(callback_invoked);
amqp_mock::set_valid_port(5672);
}
{
callback_invoked = false;
amqp_mock::FAIL_NEXT_WRITE = true;
- const std::string host("localhost4");
+ const std::string host("localhost2");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
- EXPECT_TRUE(conn);
- auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
+ amqp::connection_id_t conn_id;
+ auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
+ rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
EXPECT_EQ(rc, 0);
- // set port to a different one, so that reconnect would fail
- amqp_mock::set_valid_port(9999);
wait_until_drained();
EXPECT_TRUE(callback_invoked);
- callback_invoked = false;
- rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
- EXPECT_LT(rc, 0);
- // expect immediate failure, no callback called after sleep
- std::this_thread::sleep_for(long_wait_time);
- EXPECT_FALSE(callback_invoked);
- // set port to the right one so that reconnect would succeed
- amqp_mock::set_valid_port(5672);
- callback_invoked = false;
+ // now next retry should be ok
amqp_mock::FAIL_NEXT_WRITE = false;
- // give time to reconnect
+ callback_invoked = false;
std::this_thread::sleep_for(long_wait_time);
- // retry to publish should succeed now
- rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack);
+ rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
EXPECT_EQ(rc, 0);
wait_until_drained();
EXPECT_TRUE(callback_invoked);
- callback_invoked = false;
amqp_mock::set_valid_host("localhost");
}
+TEST_F(TestAMQP, IdleConnection)
+{
+ // this test is skipped since it takes 30seconds
+ //GTEST_SKIP();
+ const auto connection_number = amqp::get_connection_count();
+ amqp::connection_id_t conn_id;
+ auto rc = amqp::connect(conn_id, "amqp://localhost", "ex1", false, false, boost::none);
+ EXPECT_TRUE(rc);
+ EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
+ std::this_thread::sleep_for(idle_time);
+ EXPECT_EQ(amqp::get_connection_count(), connection_number);
+ rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+ EXPECT_EQ(rc, 0);
+ wait_until_drained();
+ EXPECT_TRUE(callback_invoked);
+}
+