]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/test/rgw/test_rgw_amqp.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / test / rgw / test_rgw_amqp.cc
index bf8671771d9eb72e4dc8739059236e9929080747..f49d309c78cefff203e770866ad43f9ac82e8998 100644 (file)
@@ -13,6 +13,7 @@ using namespace rgw;
 
 const std::chrono::milliseconds wait_time(10);
 const std::chrono::milliseconds long_wait_time = wait_time*50;
+const std::chrono::seconds idle_time(35);
 
 
 class CctCleaner {
@@ -34,7 +35,7 @@ CctCleaner cleaner(cct);
 
 class TestAMQP : public ::testing::Test {
 protected:
-  amqp::connection_ptr_t conn = nullptr;
+  amqp::connection_id_t conn_id;
   unsigned current_dequeued = 0U;
 
   void SetUp() override {
@@ -58,13 +59,54 @@ protected:
   }
 };
 
+std::atomic<bool> callback_invoked = false;
+
+std::atomic<int> callbacks_invoked = 0;
+
+// note: because these callback are shared among different "publish" calls
+// they should be used on different connections
+
+void my_callback_expect_ack(int rc) {
+  EXPECT_EQ(0, rc);
+  callback_invoked = true;
+}
+
+void my_callback_expect_nack(int rc) {
+  EXPECT_LT(rc, 0);
+  callback_invoked = true;
+}
+
+void my_callback_expect_multiple_acks(int rc) {
+  EXPECT_EQ(0, rc);
+  ++callbacks_invoked;
+}
+
+class dynamic_callback_wrapper {
+    dynamic_callback_wrapper() = default;
+public:
+    static dynamic_callback_wrapper* create() {
+        return new dynamic_callback_wrapper;
+    }
+    void callback(int rc) {
+      EXPECT_EQ(0, rc);
+      ++callbacks_invoked;
+      delete this;
+    }
+};
+
+void my_callback_expect_close_or_ack(int rc) {
+  // deleting the connection should trigger the callback with -4098
+  // but due to race conditions, some my get an ack
+  EXPECT_TRUE(-4098 == rc || 0 == rc);
+}
+
 TEST_F(TestAMQP, ConnectionOK)
 {
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  auto rc = amqp::connect(conn_id, "amqp://localhost", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-  auto rc = amqp::publish(conn, "topic", "message");
+  rc = amqp::publish(conn_id, "topic", "message");
   EXPECT_EQ(rc, 0);
 }
 
@@ -73,10 +115,10 @@ TEST_F(TestAMQP, SSLConnectionOK)
   const int port = 5671;
   const auto connection_number = amqp::get_connection_count();
   amqp_mock::set_valid_port(port);
-  conn = amqp::connect("amqps://localhost", "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  auto rc = amqp::connect(conn_id, "amqps://localhost", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-  auto rc = amqp::publish(conn, "topic", "message");
+  rc = amqp::publish(conn_id, "topic", "message");
   EXPECT_EQ(rc, 0);
   amqp_mock::set_valid_port(5672);
 }
@@ -86,193 +128,197 @@ TEST_F(TestAMQP, PlainAndSSLConnectionsOK)
   const int port = 5671;
   const auto connection_number = amqp::get_connection_count();
   amqp_mock::set_valid_port(port);
-  amqp::connection_ptr_t conn1 = amqp::connect("amqps://localhost", "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn1);
+  amqp::connection_id_t conn_id1;
+  auto rc = amqp::connect(conn_id1, "amqps://localhost", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-  auto rc = amqp::publish(conn1, "topic", "message");
+  rc = amqp::publish(conn_id1, "topic", "message");
   EXPECT_EQ(rc, 0);
+  EXPECT_EQ(amqp::to_string(conn_id1), "amqps://localhost:5671/?exchange=ex1");
   amqp_mock::set_valid_port(5672);
-  amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn2);
+  amqp::connection_id_t conn_id2;
+  rc = amqp::connect(conn_id2, "amqp://localhost", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
+  EXPECT_EQ(amqp::to_string(conn_id2), "amqp://localhost:5672/?exchange=ex1");
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 2);
-  rc = amqp::publish(conn2, "topic", "message");
+  rc = amqp::publish(conn_id2, "topic", "message");
   EXPECT_EQ(rc, 0);
 }
 
 TEST_F(TestAMQP, ConnectionReuse)
 {
-  amqp::connection_ptr_t conn1 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn1);
+  amqp::connection_id_t conn_id1;
+  auto rc = amqp::connect(conn_id1, "amqp://localhost", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn2);
+  amqp::connection_id_t conn_id2;
+  rc = amqp::connect(conn_id2, "amqp://localhost", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number);
-  auto rc = amqp::publish(conn1, "topic", "message");
+  rc = amqp::publish(conn_id1, "topic", "message");
   EXPECT_EQ(rc, 0);
 }
 
 TEST_F(TestAMQP, NameResolutionFail)
 {
+  callback_invoked = false;
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://kaboom", "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://kaboom", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-  auto rc = amqp::publish(conn, "topic", "message");
-  EXPECT_LT(rc, 0);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+  EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
 }
 
 TEST_F(TestAMQP, InvalidPort)
 {
+  callback_invoked = false;
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://localhost:1234", "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://localhost:1234", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-  auto rc = amqp::publish(conn, "topic", "message");
-  EXPECT_LT(rc, 0);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+  EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
 }
 
 TEST_F(TestAMQP, InvalidHost)
 {
+  callback_invoked = false;
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://0.0.0.1", "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://0.0.0.1", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-  auto rc = amqp::publish(conn, "topic", "message");
-  EXPECT_LT(rc, 0);
+  EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+  EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
 }
 
 TEST_F(TestAMQP, InvalidVhost)
 {
+  callback_invoked = false;
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://localhost/kaboom", "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://localhost/kaboom", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-  auto rc = amqp::publish(conn, "topic", "message");
-  EXPECT_LT(rc, 0);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+  EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
 }
 
 TEST_F(TestAMQP, UserPassword)
 {
   amqp_mock::set_valid_host("127.0.0.1");
   {
+    callback_invoked = false;
     const auto connection_number = amqp::get_connection_count();
-    conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1", false, false, boost::none);
-    EXPECT_TRUE(conn);
+    amqp::connection_id_t conn_id;
+    auto rc = amqp::connect(conn_id, "amqp://foo:bar@127.0.0.1", "ex1", false, false, boost::none);
+    EXPECT_TRUE(rc);
     EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-    auto rc = amqp::publish(conn, "topic", "message");
-    EXPECT_LT(rc, 0);
+    rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+    EXPECT_EQ(rc, 0);
+    wait_until_drained();
+    EXPECT_TRUE(callback_invoked);
   }
   // now try the same connection with default user/password
   amqp_mock::set_valid_host("127.0.0.2");
   {
+    callback_invoked = false;
     const auto connection_number = amqp::get_connection_count();
-    conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1", false, false, boost::none);
-    EXPECT_TRUE(conn);
+    amqp::connection_id_t conn_id;
+    auto rc = amqp::connect(conn_id, "amqp://guest:guest@127.0.0.2", "ex1", false, false, boost::none);
+    EXPECT_TRUE(rc);
     EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-    auto rc = amqp::publish(conn, "topic", "message");
+    rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
     EXPECT_EQ(rc, 0);
+    wait_until_drained();
+    EXPECT_TRUE(callback_invoked);
   }
   amqp_mock::set_valid_host("localhost");
 }
 
 TEST_F(TestAMQP, URLParseError)
 {
+  callback_invoked = false;
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("http://localhost", "ex1", false, false, boost::none);
-  EXPECT_FALSE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "http://localhost", "ex1", false, false, boost::none);
+  EXPECT_FALSE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number);
-  auto rc = amqp::publish(conn, "topic", "message");
-  EXPECT_LT(rc, 0);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+  EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
 }
 
 TEST_F(TestAMQP, ExchangeMismatch)
 {
+  callback_invoked = false;
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("http://localhost", "ex2", false, false, boost::none);
-  EXPECT_FALSE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "http://localhost", "ex2", false, false, boost::none);
+  EXPECT_FALSE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number);
-  auto rc = amqp::publish(conn, "topic", "message");
-  EXPECT_LT(rc, 0);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+  EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
 }
 
 TEST_F(TestAMQP, MaxConnections)
 {
   // fill up all connections
-  std::vector<amqp::connection_ptr_t> connections;
+  std::vector<amqp::connection_id_t> connections;
   auto remaining_connections = amqp::get_max_connections() - amqp::get_connection_count();
   while (remaining_connections > 0) {
     const auto host = "127.10.0." + std::to_string(remaining_connections);
     amqp_mock::set_valid_host(host);
-    amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
-    EXPECT_TRUE(conn);
-    auto rc = amqp::publish(conn, "topic", "message");
+    amqp::connection_id_t conn_id;
+    auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+    EXPECT_TRUE(rc);
+    rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
     EXPECT_EQ(rc, 0);
     --remaining_connections;
-    connections.push_back(conn);
+    connections.push_back(conn_id);
   }
   EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections());
+  wait_until_drained();
   // try to add another connection
   {
     const std::string host = "toomany";
     amqp_mock::set_valid_host(host);
-    amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
-    EXPECT_FALSE(conn);
-    auto rc = amqp::publish(conn, "topic", "message");
-    EXPECT_LT(rc, 0);
+    amqp::connection_id_t conn_id;
+    auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+    EXPECT_FALSE(rc);
+    rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+    EXPECT_EQ(rc, 0);
+    wait_until_drained();
   }
   EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections());
   amqp_mock::set_valid_host("localhost");
 }
 
-std::atomic<bool> callback_invoked = false;
-
-std::atomic<int> callbacks_invoked = 0;
-
-// note: because these callback are shared among different "publish" calls
-// they should be used on different connections
-
-void my_callback_expect_ack(int rc) {
-  EXPECT_EQ(0, rc);
-  callback_invoked = true;
-}
-
-void my_callback_expect_nack(int rc) {
-  EXPECT_LT(rc, 0);
-  callback_invoked = true;
-}
-
-void my_callback_expect_multiple_acks(int rc) {
-  EXPECT_EQ(0, rc);
-  ++callbacks_invoked;
-}
-
-class dynamic_callback_wrapper {
-    dynamic_callback_wrapper() = default;
-public:
-    static dynamic_callback_wrapper* create() {
-        return new dynamic_callback_wrapper;
-    }
-    void callback(int rc) {
-      EXPECT_EQ(0, rc);
-      ++callbacks_invoked;
-      delete this;
-    }
-};
-
-void my_callback_expect_close_or_ack(int rc) {
-  // deleting the connection should trigger the callback with -4098
-  // but due to race conditions, some my get an ack
-  EXPECT_TRUE(-4098 == rc || 0 == rc);
-}
 
 TEST_F(TestAMQP, ReceiveAck)
 {
   callback_invoked = false;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
-  auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
   EXPECT_EQ(rc, 0);
   wait_until_drained();
   EXPECT_TRUE(callback_invoked);
@@ -284,16 +330,15 @@ TEST_F(TestAMQP, ImplicitConnectionClose)
   callback_invoked = false;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   const auto NUMBER_OF_CALLS = 2000;
   for (auto i = 0; i < NUMBER_OF_CALLS; ++i) {
-    auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_close_or_ack);
+    auto rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_close_or_ack);
     EXPECT_EQ(rc, 0);
   }
   wait_until_drained();
-  // deleting the connection object should close the connection
-  conn.reset(nullptr);
   amqp_mock::set_valid_host("localhost");
 }
 
@@ -302,11 +347,12 @@ TEST_F(TestAMQP, ReceiveMultipleAck)
   callbacks_invoked = 0;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   const auto NUMBER_OF_CALLS = 100;
   for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
-    auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks);
+    auto rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_multiple_acks);
     EXPECT_EQ(rc, 0);
   }
   wait_until_drained();
@@ -320,12 +366,13 @@ TEST_F(TestAMQP, ReceiveAckForMultiple)
   callbacks_invoked = 0;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   amqp_mock::set_multiple(59);
   const auto NUMBER_OF_CALLS = 100;
   for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
-    auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks);
+    rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_multiple_acks);
     EXPECT_EQ(rc, 0);
   }
   wait_until_drained();
@@ -339,12 +386,13 @@ TEST_F(TestAMQP, DynamicCallback)
   callbacks_invoked = 0;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   amqp_mock::set_multiple(59);
   const auto NUMBER_OF_CALLS = 100;
   for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
-    auto rc = publish_with_confirm(conn, "topic", "message",
+    rc = publish_with_confirm(conn_id, "topic", "message",
             std::bind(&dynamic_callback_wrapper::callback, dynamic_callback_wrapper::create(), std::placeholders::_1));
     EXPECT_EQ(rc, 0);
   }
@@ -360,9 +408,10 @@ TEST_F(TestAMQP, ReceiveNack)
   amqp_mock::REPLY_ACK = false;
   const std::string host("localhost2");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
-  auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
   EXPECT_EQ(rc, 0);
   wait_until_drained();
   EXPECT_TRUE(callback_invoked);
@@ -377,9 +426,10 @@ TEST_F(TestAMQP, FailWrite)
   amqp_mock::FAIL_NEXT_WRITE = true;
   const std::string host("localhost2");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
-  auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
   EXPECT_EQ(rc, 0);
   wait_until_drained();
   EXPECT_TRUE(callback_invoked);
@@ -390,35 +440,49 @@ TEST_F(TestAMQP, FailWrite)
 
 TEST_F(TestAMQP, RetryInvalidHost)
 {
+  callback_invoked = false;
   const std::string host = "192.168.0.1";
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://"+host, "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://"+host, "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-  auto rc = amqp::publish(conn, "topic", "message");
-  EXPECT_LT(rc, 0);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+  EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
   // now next retry should be ok
+  callback_invoked = false;
   amqp_mock::set_valid_host(host);
   std::this_thread::sleep_for(long_wait_time);
-  rc = amqp::publish(conn, "topic", "message");
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
   EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
   amqp_mock::set_valid_host("localhost");
 }
 
 TEST_F(TestAMQP, RetryInvalidPort)
 {
+  callback_invoked = false;
   const int port = 9999;
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://localhost:" + std::to_string(port), "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
-  auto rc = amqp::publish(conn, "topic", "message");
-  EXPECT_LT(rc, 0);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+  EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
   // now next retry should be ok
+  callback_invoked = false;
   amqp_mock::set_valid_port(port);
   std::this_thread::sleep_for(long_wait_time);
-  rc = amqp::publish(conn, "topic", "message");
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
   EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
   amqp_mock::set_valid_port(5672);
 }
 
@@ -426,34 +490,40 @@ TEST_F(TestAMQP, RetryFailWrite)
 {
   callback_invoked = false;
   amqp_mock::FAIL_NEXT_WRITE = true;
-  const std::string host("localhost4");
+  const std::string host("localhost2");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
-  EXPECT_TRUE(conn);
-  auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
   EXPECT_EQ(rc, 0);
-  // set port to a different one, so that reconnect would fail
-  amqp_mock::set_valid_port(9999);
   wait_until_drained();
   EXPECT_TRUE(callback_invoked);
-  callback_invoked = false;
-  rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
-  EXPECT_LT(rc, 0);
-  // expect immediate failure, no callback called after sleep
-  std::this_thread::sleep_for(long_wait_time);
-  EXPECT_FALSE(callback_invoked);
-  // set port to the right one so that reconnect would succeed
-  amqp_mock::set_valid_port(5672);
-  callback_invoked = false;
+  // now next retry should be ok
   amqp_mock::FAIL_NEXT_WRITE = false;
-  // give time to reconnect
+  callback_invoked = false;
   std::this_thread::sleep_for(long_wait_time);
-  // retry to publish should succeed now
-  rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
   EXPECT_EQ(rc, 0);
   wait_until_drained();
   EXPECT_TRUE(callback_invoked);
-  callback_invoked = false;
   amqp_mock::set_valid_host("localhost");
 }
 
+TEST_F(TestAMQP, IdleConnection)
+{
+  // this test is skipped since it takes 30seconds
+  //GTEST_SKIP();
+  const auto connection_number = amqp::get_connection_count();
+  amqp::connection_id_t conn_id;
+  auto rc = amqp::connect(conn_id, "amqp://localhost", "ex1", false, false, boost::none);
+  EXPECT_TRUE(rc);
+  EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
+  std::this_thread::sleep_for(idle_time);
+  EXPECT_EQ(amqp::get_connection_count(), connection_number);
+  rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
+  EXPECT_EQ(rc, 0);
+  wait_until_drained();
+  EXPECT_TRUE(callback_invoked);
+}
+