]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_amqp.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / rgw / rgw_amqp.cc
index 186bdd54ad6fe4271912633c6ba5fd334cc3452b..3014edd1db09dbfeb34621f9da0f1661896ce4e6 100644 (file)
@@ -1,4 +1,4 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab ft=cpp
 
 #include "rgw_amqp.h"
@@ -16,6 +16,7 @@
 #include <atomic>
 #include <mutex>
 #include <boost/lockfree/queue.hpp>
+#include <boost/functional/hash.hpp>
 #include "common/dout.h"
 #include <openssl/ssl.h>
 
@@ -50,49 +51,49 @@ static const int RGW_AMQP_STATUS_SOCKET_CACERT_FAILED =   -0x2010;
 static const int RGW_AMQP_RESPONSE_SOCKET_ERROR =         -0x3008;
 static const int RGW_AMQP_NO_REPLY_CODE =                 0x0;
 
-// key class for the connection list
-struct connection_id_t {
-  const std::string host;
-  const int port;
-  const std::string vhost;
-  // constructed from amqp_connection_info struct
-  connection_id_t(const amqp_connection_info& info) 
-    : host(info.host), port(info.port), vhost(info.vhost) {}
-
-  // equality operator and hasher functor are needed 
-  // so that connection_id_t could be used as key in unordered_map
-  bool operator==(const connection_id_t& other) const {
-    return host == other.host && port == other.port && vhost == other.vhost;
+// the amqp_connection_info struct does not hold any memory and just points to the URL string
+// so, strings are copied into connection_id_t
+connection_id_t::connection_id_t(const amqp_connection_info& info, const std::string& _exchange)
+    : host(info.host), port(info.port), vhost(info.vhost), exchange(_exchange), ssl(info.ssl) {}
+
+// equality operator and hasher functor are needed
+// so that connection_id_t could be used as key in unordered_map
+bool operator==(const connection_id_t& lhs, const connection_id_t& rhs) {
+  return lhs.host == rhs.host && lhs.port == rhs.port &&
+    lhs.vhost == rhs.vhost && lhs.exchange == rhs.exchange;
+}
+
+struct connection_id_hasher {
+  std::size_t operator()(const connection_id_t& k) const {
+    std::size_t h = 0;
+    boost::hash_combine(h, k.host);
+    boost::hash_combine(h, k.port);
+    boost::hash_combine(h, k.vhost);
+    boost::hash_combine(h, k.exchange);
+    return h;
   }
-  
-  struct hasher {
-    std::size_t operator()(const connection_id_t& k) const {
-       return ((std::hash<std::string>()(k.host)
-             ^ (std::hash<int>()(k.port) << 1)) >> 1)
-             ^ (std::hash<std::string>()(k.vhost) << 1); 
-    }
-  };
 };
 
 std::string to_string(const connection_id_t& id) {
-    return id.host+":"+std::to_string(id.port)+id.vhost;
+  return fmt::format("{}://{}:{}{}?exchange={}",
+      id.ssl ? "amqps" : "amqp",
+      id.host, id.port, id.vhost, id.exchange);
 }
 
-// connection_t state cleaner
-// could be used for automatic cleanup when getting out of scope
+// automatically cleans amqp state when gets out of scope
 class ConnectionCleaner {
   private:
-    amqp_connection_state_t conn;
+    amqp_connection_state_t state;
   public:
-    ConnectionCleaner(amqp_connection_state_t _conn) : conn(_conn) {}
+    ConnectionCleaner(amqp_connection_state_t _state) : state(_state) {}
     ~ConnectionCleaner() {
-      if (conn) {
-        amqp_destroy_connection(conn);
+      if (state) {
+        amqp_destroy_connection(state);
       }
     }
     // call reset() if cleanup is not needed anymore
     void reset() {
-      conn = nullptr;
+      state = nullptr;
     }
 };
 
@@ -100,9 +101,9 @@ class ConnectionCleaner {
 struct reply_callback_with_tag_t {
   uint64_t tag;
   reply_callback_t cb;
-  
+
   reply_callback_with_tag_t(uint64_t _tag, reply_callback_t _cb) : tag(_tag), cb(_cb) {}
-  
+
   bool operator==(uint64_t rhs) {
     return tag == rhs;
   }
@@ -111,44 +112,26 @@ struct reply_callback_with_tag_t {
 typedef std::vector<reply_callback_with_tag_t> CallbackList;
 
 // struct for holding the connection state object as well as the exchange
-// it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
-// since references to deleted objects may still exist in the calling code
 struct connection_t {
-  std::atomic<amqp_connection_state_t> state;
-  std::string exchange;
+  CephContext* cct = nullptr;
+  amqp_connection_state_t state = nullptr;
+  amqp_bytes_t reply_to_queue = amqp_empty_bytes;
+  uint64_t delivery_tag = 1;
+  int status = AMQP_STATUS_OK;
+  int reply_type = AMQP_RESPONSE_NORMAL;
+  int reply_code = RGW_AMQP_NO_REPLY_CODE;
+  CallbackList callbacks;
+  ceph::coarse_real_clock::time_point next_reconnect = ceph::coarse_real_clock::now();
+  bool mandatory = false;
+  const bool use_ssl = false;
   std::string user;
   std::string password;
-  amqp_bytes_t reply_to_queue;
-  uint64_t delivery_tag;
-  int status;
-  int reply_type;
-  int reply_code;
-  mutable std::atomic<int> ref_count;
-  CephContext* cct;
-  CallbackList callbacks;
-  ceph::coarse_real_clock::time_point next_reconnect;
-  bool mandatory;
-  bool use_ssl;
-  bool verify_ssl;
+  bool verify_ssl = true;
   boost::optional<std::string> ca_location;
   utime_t timestamp = ceph_clock_now();
 
-  // default ctor
-  connection_t() :
-    state(nullptr),
-    reply_to_queue(amqp_empty_bytes),
-    delivery_tag(1),
-    status(AMQP_STATUS_OK),
-    reply_type(AMQP_RESPONSE_NORMAL),
-    reply_code(RGW_AMQP_NO_REPLY_CODE),
-    ref_count(0),
-    cct(nullptr),
-    next_reconnect(ceph::coarse_real_clock::now()),
-    mandatory(false),
-    use_ssl(false),
-    verify_ssl(false),
-    ca_location(boost::none)
-  {}
+  connection_t(CephContext* _cct, const amqp_connection_info& info, bool _verify_ssl, boost::optional<const std::string&> _ca_location) :
+    cct(_cct), use_ssl(info.ssl), user(info.user), password(info.password), verify_ssl(_verify_ssl), ca_location(_ca_location) {}
 
   // cleanup of all internal connection resource
   // the object can still remain, and internal connection
@@ -176,28 +159,15 @@ struct connection_t {
   ~connection_t() {
     destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
   }
-
-  friend void intrusive_ptr_add_ref(const connection_t* p);
-  friend void intrusive_ptr_release(const connection_t* p);
 };
 
-// these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
-void intrusive_ptr_add_ref(const connection_t* p) {
-  ++p->ref_count;
-}
-void intrusive_ptr_release(const connection_t* p) {
-  if (--p->ref_count == 0) {
-    delete p;
-  }
-}
-
 // convert connection info to string
 std::string to_string(const amqp_connection_info& info) {
   std::stringstream ss;
   ss << "connection info:" <<
         "\nHost: " << info.host <<
         "\nPort: " << info.port <<
-        "\nUser: " << info.user << 
+        "\nUser: " << info.user <<
         "\nPassword: " << info.password <<
         "\nvhost: " << info.vhost <<
         "\nSSL support: " << info.ssl << std::endl;
@@ -212,7 +182,7 @@ int reply_to_code(const amqp_rpc_reply_t& reply) {
       return RGW_AMQP_NO_REPLY_CODE;
     case AMQP_RESPONSE_LIBRARY_EXCEPTION:
       return reply.library_error;
-    case AMQP_RESPONSE_SERVER_EXCEPTION: 
+    case AMQP_RESPONSE_SERVER_EXCEPTION:
       if (reply.reply.decoded) {
         const amqp_connection_close_t* m = (amqp_connection_close_t*)reply.reply.decoded;
         return m->reply_code;
@@ -232,7 +202,7 @@ std::string to_string(const amqp_rpc_reply_t& reply) {
       return "missing RPC reply type";
     case AMQP_RESPONSE_LIBRARY_EXCEPTION:
       return amqp_error_string2(reply.library_error);
-    case AMQP_RESPONSE_SERVER_EXCEPTION: 
+    case AMQP_RESPONSE_SERVER_EXCEPTION:
       {
         switch (reply.reply.id) {
           case AMQP_CONNECTION_CLOSE_METHOD:
@@ -281,7 +251,7 @@ std::string to_string(amqp_status_enum s) {
     case AMQP_STATUS_SOCKET_ERROR:
       return "AMQP_STATUS_SOCKET_ERROR";
     case AMQP_STATUS_INVALID_PARAMETER:
-      return "AMQP_STATUS_INVALID_PARAMETER"; 
+      return "AMQP_STATUS_INVALID_PARAMETER";
     case AMQP_STATUS_TABLE_TOO_BIG:
       return "AMQP_STATUS_TABLE_TOO_BIG";
     case AMQP_STATUS_WRONG_METHOD:
@@ -305,13 +275,13 @@ std::string to_string(amqp_status_enum s) {
       return "AMQP_STATUS_UNSUPPORTED";
 #endif
     case _AMQP_STATUS_NEXT_VALUE:
-      return "AMQP_STATUS_INTERNAL"; 
+      return "AMQP_STATUS_INTERNAL";
     case AMQP_STATUS_TCP_ERROR:
         return "AMQP_STATUS_TCP_ERROR";
     case AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR:
       return "AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR";
     case _AMQP_STATUS_TCP_NEXT_VALUE:
-      return "AMQP_STATUS_INTERNAL"; 
+      return "AMQP_STATUS_INTERNAL";
     case AMQP_STATUS_SSL_ERROR:
       return "AMQP_STATUS_SSL_ERROR";
     case AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED:
@@ -321,7 +291,7 @@ std::string to_string(amqp_status_enum s) {
     case AMQP_STATUS_SSL_CONNECTION_FAILED:
       return "AMQP_STATUS_SSL_CONNECTION_FAILED";
     case _AMQP_STATUS_SSL_NEXT_VALUE:
-      return "AMQP_STATUS_INTERNAL"; 
+      return "AMQP_STATUS_INTERNAL";
 #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 11, 0, 0)
     case AMQP_STATUS_SSL_SET_ENGINE_FAILED:
       return "AMQP_STATUS_SSL_SET_ENGINE_FAILED";
@@ -374,7 +344,7 @@ std::string status_to_string(int s) {
 #define RETURN_ON_ERROR(C, S, OK) \
   if (!OK) { \
     C->status = S; \
-    return C; \
+    return false; \
   }
 
 // in case of RPC calls, getting the RPC reply and return if an error is detected
@@ -384,7 +354,7 @@ std::string status_to_string(int s) {
         C->status = S; \
         C->reply_type = reply.reply_type; \
         C->reply_code = reply_to_code(reply); \
-        return C; \
+        return false; \
       } \
     }
 
@@ -392,25 +362,25 @@ static const amqp_channel_t CHANNEL_ID = 1;
 static const amqp_channel_t CONFIRMING_CHANNEL_ID = 2;
 
 // utility function to create a connection, when the connection object already exists
-connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connection_info& info) {
-  ceph_assert(conn);
-
+bool new_state(connection_t* conn, const connection_id_t& conn_id) {
+  // state must be null at this point
+  ceph_assert(!conn->state);
   // reset all status codes
-  conn->status = AMQP_STATUS_OK; 
+  conn->status = AMQP_STATUS_OK;
   conn->reply_type = AMQP_RESPONSE_NORMAL;
   conn->reply_code = RGW_AMQP_NO_REPLY_CODE;
 
   auto state = amqp_new_connection();
   if (!state) {
     conn->status = RGW_AMQP_STATUS_CONN_ALLOC_FAILED;
-    return conn;
+    return false;
   }
   // make sure that the connection state is cleaned up in case of error
   ConnectionCleaner state_guard(state);
 
   // create and open socket
   amqp_socket_t *socket = nullptr;
-  if (info.ssl) {
+  if (conn->use_ssl) {
     socket = amqp_ssl_socket_new(state);
 #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 10, 0, 1)
     SSL_CTX* ssl_ctx = reinterpret_cast<SSL_CTX*>(amqp_ssl_socket_get_context(socket));
@@ -433,9 +403,9 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio
 
   if (!socket) {
     conn->status = RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED;
-    return conn;
+    return false;
   }
-  if (info.ssl) {
+  if (conn->use_ssl) {
     if (!conn->verify_ssl) {
       amqp_ssl_socket_set_verify_peer(socket, 0);
       amqp_ssl_socket_set_verify_hostname(socket, 0);
@@ -445,32 +415,32 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio
       if (s != AMQP_STATUS_OK) {
         conn->status = RGW_AMQP_STATUS_SOCKET_CACERT_FAILED;
         conn->reply_code = s;
-        return conn;
+        return false;
       }
     }
   }
-  const auto s = amqp_socket_open(socket, info.host, info.port);
+  const auto s = amqp_socket_open(socket, conn_id.host.c_str(), conn_id.port);
   if (s < 0) {
     conn->status = RGW_AMQP_STATUS_SOCKET_OPEN_FAILED;
     conn->reply_type = RGW_AMQP_RESPONSE_SOCKET_ERROR;
     conn->reply_code = s;
-    return conn;
+    return false;
   }
 
   // login to broker
   const auto reply = amqp_login(state,
-      info.vhost, 
+      conn_id.vhost.c_str(),
       AMQP_DEFAULT_MAX_CHANNELS,
       AMQP_DEFAULT_FRAME_SIZE,
       0,                        // no heartbeat TODO: add conf
       AMQP_SASL_METHOD_PLAIN,   // TODO: add other types of security
-      info.user,
-      info.password);
+      conn->user.c_str(),
+      conn->password.c_str());
   if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
     conn->status = RGW_AMQP_STATUS_LOGIN_FAILED;
     conn->reply_type = reply.reply_type;
     conn->reply_code = reply_to_code(reply);
-    return conn;
+    return false;
   }
 
   // open channels
@@ -493,9 +463,9 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio
   // verify that the topic exchange is there
   // TODO: make this step optional
   {
-    const auto ok = amqp_exchange_declare(state, 
+    const auto ok = amqp_exchange_declare(state,
       CHANNEL_ID,
-      amqp_cstring_bytes(conn->exchange.c_str()),
+      amqp_cstring_bytes(conn_id.exchange.c_str()),
       amqp_cstring_bytes("topic"),
       1, // passive - exchange must already exist on broker
       1, // durable
@@ -507,12 +477,12 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio
   }
   {
     // create queue for confirmations
-    const auto queue_ok = amqp_queue_declare(state, 
+    const auto queue_ok = amqp_queue_declare(state,
         CHANNEL_ID,         // use the regular channel for this call
         amqp_empty_bytes,   // let broker allocate queue name
-        0,                  // not passive - create the queue 
-        0,                  // not durable 
-        1,                  // exclusive 
+        0,                  // not passive - create the queue
+        0,                  // not durable
+        1,                  // exclusive
         1,                  // auto-delete
         amqp_empty_table    // not args TODO add args from conf: TTL, max length etc.
         );
@@ -520,8 +490,8 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio
     RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_Q_DECLARE_FAILED);
 
     // define consumption for connection
-    const auto consume_ok = amqp_basic_consume(state, 
-        CONFIRMING_CHANNEL_ID, 
+    const auto consume_ok = amqp_basic_consume(state,
+        CONFIRMING_CHANNEL_ID,
         queue_ok->queue,
         amqp_empty_bytes, // broker will generate consumer tag
         1,                // messages sent from client are never routed back
@@ -533,45 +503,30 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio
     RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED, consume_ok);
     RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED);
     // broker generated consumer_tag could be used to cancel sending of n/acks from broker - not needed
-    
+
     state_guard.reset();
     conn->state = state;
     conn->reply_to_queue = amqp_bytes_malloc_dup(queue_ok->queue);
-    return conn;
   }
-}
-
-// utility function to create a new connection
-connection_ptr_t create_new_connection(const amqp_connection_info& info, 
-    const std::string& exchange, bool mandatory_delivery, CephContext* cct, bool verify_ssl, boost::optional<const std::string&> ca_location) { 
-  // create connection state
-  connection_ptr_t conn = new connection_t;
-  conn->exchange = exchange;
-  conn->user.assign(info.user);
-  conn->password.assign(info.password);
-  conn->mandatory = mandatory_delivery;
-  conn->cct = cct;
-  conn->use_ssl = info.ssl;
-  conn->verify_ssl = verify_ssl;
-  conn->ca_location = ca_location;
-  return create_connection(conn, info);
+  return true;
 }
 
 /// struct used for holding messages in the message queue
 struct message_wrapper_t {
-  connection_ptr_t conn; 
+  connection_id_t conn_id;
   std::string topic;
   std::string message;
   reply_callback_t cb;
-  
-  message_wrapper_t(connection_ptr_t& _conn,
+
+  message_wrapper_t(const connection_id_t& _conn_id,
       const std::string& _topic,
       const std::string& _message,
-      reply_callback_t _cb) : conn(_conn), topic(_topic), message(_message), cb(_cb) {}
+      reply_callback_t _cb) : conn_id(_conn_id), topic(_topic), message(_message), cb(_cb) {}
 };
 
+using connection_t_ptr = std::unique_ptr<connection_t>;
 
-typedef std::unordered_map<connection_id_t, connection_ptr_t, connection_id_t::hasher> ConnectionList;
+typedef std::unordered_map<connection_id_t, connection_t_ptr, connection_id_hasher> ConnectionList;
 typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
 
 // macros used inside a loop where an iterator is either incremented or erased
@@ -606,14 +561,23 @@ private:
 
   void publish_internal(message_wrapper_t* message) {
     const std::unique_ptr<message_wrapper_t> msg_owner(message);
-    auto& conn = message->conn;
+    const auto& conn_id = message->conn_id;
+    auto conn_it = connections.find(conn_id);
+    if (conn_it == connections.end()) {
+      ldout(cct, 1) << "AMQP publish: connection '" << to_string(conn_id) << "' not found" << dendl;
+      if (message->cb) {
+        message->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED);
+      }
+      return;
+    }
+
+    auto& conn = conn_it->second;
 
     conn->timestamp = ceph_clock_now();
 
     if (!conn->is_ok()) {
       // connection had an issue while message was in the queue
-      // TODO add error stats
-      ldout(conn->cct, 1) << "AMQP publish: connection had an issue while message was in the queue" << dendl;
+      ldout(cct, 1) << "AMQP publish: connection '" << to_string(conn_id) << "' is closed" << dendl;
       if (message->cb) {
         message->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED);
       }
@@ -621,20 +585,19 @@ private:
     }
 
     if (message->cb == nullptr) {
-      // TODO add error stats
       const auto rc = amqp_basic_publish(conn->state,
         CHANNEL_ID,
-        amqp_cstring_bytes(conn->exchange.c_str()),
+        amqp_cstring_bytes(conn_id.exchange.c_str()),
         amqp_cstring_bytes(message->topic.c_str()),
         0, // does not have to be routable
         0, // not immediate
         nullptr, // no properties needed
         amqp_cstring_bytes(message->message.c_str()));
       if (rc == AMQP_STATUS_OK) {
-        ldout(conn->cct, 20) << "AMQP publish (no callback): OK" << dendl;
+        ldout(cct, 20) << "AMQP publish (no callback): OK" << dendl;
         return;
       }
-      ldout(conn->cct, 1) << "AMQP publish (no callback): failed with error " << status_to_string(rc) << dendl;
+      ldout(cct, 1) << "AMQP publish (no callback): failed with error " << status_to_string(rc) << dendl;
       // an error occurred, close connection
       // it will be retied by the main loop
       conn->destroy(rc);
@@ -642,15 +605,15 @@ private:
     }
 
     amqp_basic_properties_t props;
-    props._flags = 
-      AMQP_BASIC_DELIVERY_MODE_FLAG | 
+    props._flags =
+      AMQP_BASIC_DELIVERY_MODE_FLAG |
       AMQP_BASIC_REPLY_TO_FLAG;
     props.delivery_mode = 2; // persistent delivery TODO take from conf
     props.reply_to = conn->reply_to_queue;
 
     const auto rc = amqp_basic_publish(conn->state,
       CONFIRMING_CHANNEL_ID,
-      amqp_cstring_bytes(conn->exchange.c_str()),
+      amqp_cstring_bytes(conn_id.exchange.c_str()),
       amqp_cstring_bytes(message->topic.c_str()),
       conn->mandatory,
       0, // not immediate
@@ -660,17 +623,17 @@ private:
     if (rc == AMQP_STATUS_OK) {
       auto const q_len = conn->callbacks.size();
       if (q_len < max_inflight) {
-        ldout(conn->cct, 20) << "AMQP publish (with callback, tag=" << conn->delivery_tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
+        ldout(cct, 20) << "AMQP publish (with callback, tag=" << conn->delivery_tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
         conn->callbacks.emplace_back(conn->delivery_tag++, message->cb);
       } else {
         // immediately invoke callback with error
-        ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: callback queue full" << dendl;
+        ldout(cct, 1) << "AMQP publish (with callback): failed with error: callback queue full" << dendl;
         message->cb(RGW_AMQP_STATUS_MAX_INFLIGHT);
       }
     } else {
       // an error occurred, close connection
       // it will be retied by the main loop
-      ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: " << status_to_string(rc) << dendl;
+      ldout(cct, 1) << "AMQP publish (with callback): failed with error: " << status_to_string(rc) << dendl;
       conn->destroy(rc);
       // immediately invoke callback with error
       message->cb(rc);
@@ -702,12 +665,12 @@ private:
       auto incoming_message = false;
       // loop over all connections to read acks
       for (;conn_it != end_it;) {
-        
+
+        const auto& conn_id = conn_it->first;
         auto& conn = conn_it->second;
-        const auto& conn_key = conn_it->first;
 
         if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
-          ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
+          ldout(cct, 20) << "AMQP run: Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
           ERASE_AND_CONTINUE(conn_it, connections);
         }
 
@@ -717,22 +680,15 @@ private:
           if (now >= conn->next_reconnect) {
             // pointers are used temporarily inside the amqp_connection_info object
             // as read-only values, hence the assignment, and const_cast are safe here
-            amqp_connection_info info;
-            info.host = const_cast<char*>(conn_key.host.c_str());
-            info.port = conn_key.port;
-            info.vhost = const_cast<char*>(conn_key.vhost.c_str());
-            info.user = const_cast<char*>(conn->user.c_str());
-            info.password = const_cast<char*>(conn->password.c_str());
-            info.ssl = conn->use_ssl;
-            ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
-            if (create_connection(conn, info)->is_ok() == false) {
-              ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_key) << ") retry failed. error: " <<
+            ldout(cct, 20) << "AMQP run: retry connection" << dendl;
+            if (!new_state(conn.get(), conn_id)) {
+              ldout(cct, 10) << "AMQP run: connection '" << to_string(conn_id) << "' retry failed. error: " <<
                 status_to_string(conn->status) << " (" << conn->reply_code << ")"  << dendl;
               // TODO: add error counter for failed retries
               // TODO: add exponential backoff for retries
               conn->next_reconnect = now + reconnect_time;
             } else {
-              ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_key) << ") retry successfull" << dendl;
+              ldout(cct, 10) << "AMQP run: connection '" << to_string(conn_id) << "' retry successfull" << dendl;
             }
           }
           INCREMENT_AND_CONTINUE(conn_it);
@@ -744,7 +700,7 @@ private:
           // TODO mark connection as idle
           INCREMENT_AND_CONTINUE(conn_it);
         }
-       
+
         // this is just to prevent spinning idle, does not indicate that a message
         // was successfully processed or not
         incoming_message = true;
@@ -753,13 +709,13 @@ private:
         if (rc != AMQP_STATUS_OK) {
           // an error occurred, close connection
           // it will be retied by the main loop
-          ldout(conn->cct, 1) << "AMQP run: connection read error: " << status_to_string(rc) << dendl;
+          ldout(cct, 1) << "AMQP run: connection read error: " << status_to_string(rc) << dendl;
           conn->destroy(rc);
           INCREMENT_AND_CONTINUE(conn_it);
         }
 
         if (frame.frame_type != AMQP_FRAME_METHOD) {
-          ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages. frame type: " 
+          ldout(cct, 10) << "AMQP run: ignoring non n/ack messages. frame type: "
             << unsigned(frame.frame_type) << dendl;
           // handler is for publish confirmation only - handle only method frames
           INCREMENT_AND_CONTINUE(conn_it);
@@ -770,7 +726,7 @@ private:
         int result;
 
         switch (frame.payload.method.id) {
-          case AMQP_BASIC_ACK_METHOD: 
+          case AMQP_BASIC_ACK_METHOD:
             {
               result = AMQP_STATUS_OK;
               const auto ack = (amqp_basic_ack_t*)frame.payload.method.decoded;
@@ -788,12 +744,12 @@ private:
               multiple = nack->multiple;
               break;
             }
-          case AMQP_BASIC_REJECT_METHOD:                                                   
-            {                                                                              
-              result = RGW_AMQP_STATUS_BROKER_NACK;                                        
-              const auto reject = (amqp_basic_reject_t*)frame.payload.method.decoded;      
-              tag = reject->delivery_tag;                                                  
-              multiple = false;                                                            
+          case AMQP_BASIC_REJECT_METHOD:
+            {
+              result = RGW_AMQP_STATUS_BROKER_NACK;
+              const auto reject = (amqp_basic_reject_t*)frame.payload.method.decoded;
+              tag = reject->delivery_tag;
+              multiple = false;
               break;
             }
           case AMQP_CONNECTION_CLOSE_METHOD:
@@ -801,42 +757,40 @@ private:
           case AMQP_CHANNEL_CLOSE_METHOD:
             {
               // other side closed the connection, no need to continue
-              ldout(conn->cct, 10) << "AMQP run: connection was closed by broker" << dendl;
+              ldout(cct, 10) << "AMQP run: connection was closed by broker" << dendl;
               conn->destroy(rc);
               INCREMENT_AND_CONTINUE(conn_it);
             }
           case AMQP_BASIC_RETURN_METHOD:
             // message was not delivered, returned to sender
-            ldout(conn->cct, 10) << "AMQP run: message was not routable" << dendl;
+            ldout(cct, 10) << "AMQP run: message was not routable" << dendl;
             INCREMENT_AND_CONTINUE(conn_it);
             break;
           default:
             // unexpected method
-            ldout(conn->cct, 10) << "AMQP run: unexpected message" << dendl;
+            ldout(cct, 10) << "AMQP run: unexpected message" << dendl;
             INCREMENT_AND_CONTINUE(conn_it);
         }
 
-        const auto& callbacks_end = conn->callbacks.end();
-        const auto& callbacks_begin = conn->callbacks.begin();
-        const auto tag_it = std::find(callbacks_begin, callbacks_end, tag);
-        if (tag_it != callbacks_end) {
+        const auto tag_it = std::find(conn->callbacks.begin(), conn->callbacks.end(), tag);
+        if (tag_it != conn->callbacks.end()) {
           if (multiple) {
             // n/ack all up to (and including) the tag
-            ldout(conn->cct, 20) << "AMQP run: multiple n/acks received with tag=" << tag << " and result=" << result << dendl;
-            auto it = callbacks_begin;
+            ldout(cct, 20) << "AMQP run: multiple n/acks received with tag=" << tag << " and result=" << result << dendl;
+            auto it = conn->callbacks.begin();
             while (it->tag <= tag && it != conn->callbacks.end()) {
-              ldout(conn->cct, 20) << "AMQP run: invoking callback with tag=" << it->tag << dendl;
+              ldout(cct, 20) << "AMQP run: invoking callback with tag=" << it->tag << dendl;
               it->cb(result);
               it = conn->callbacks.erase(it);
             }
           } else {
             // n/ack a specific tag
-            ldout(conn->cct, 20) << "AMQP run: n/ack received, invoking callback with tag=" << tag << " and result=" << result << dendl;
+            ldout(cct, 20) << "AMQP run: n/ack received, invoking callback with tag=" << tag << " and result=" << result << dendl;
             tag_it->cb(result);
             conn->callbacks.erase(tag_it);
           }
         } else {
-          ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl;
+          ldout(cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl;
         }
         // just increment the iterator
         ++conn_it;
@@ -856,11 +810,11 @@ private:
 public:
   Manager(size_t _max_connections,
       size_t _max_inflight,
-      size_t _max_queue, 
+      size_t _max_queue,
       long _usec_timeout,
       unsigned reconnect_time_ms,
       unsigned idle_time_ms,
-      CephContext* _cct) : 
+      CephContext* _cct) :
     max_connections(_max_connections),
     max_inflight(_max_inflight),
     max_queue(_max_queue),
@@ -876,9 +830,9 @@ public:
     idle_time(std::chrono::milliseconds(idle_time_ms)),
     reconnect_time(std::chrono::milliseconds(reconnect_time_ms)),
     runner(&Manager::run, this) {
-      // The hashmap has "max connections" as the initial number of buckets, 
+      // The hashmap has "max connections" as the initial number of buckets,
       // and allows for 10 collisions per bucket before rehash.
-      // This is to prevent rehashing so that iterators are not invalidated 
+      // This is to prevent rehashing so that iterators are not invalidated
       // when a new connection is added.
       connections.max_load_factor(10.0);
       // give the runner thread a name for easier debugging
@@ -896,76 +850,68 @@ public:
   }
 
   // connect to a broker, or reuse an existing connection if already connected
-  connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
+  bool connect(connection_id_t& id, const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
         boost::optional<const std::string&> ca_location) {
     if (stopped) {
       ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
-      return nullptr;
+      return false;
     }
 
-    struct amqp_connection_info info;
+    amqp_connection_info info;
     // cache the URL so that parsing could happen in-place
     std::vector<char> url_cache(url.c_str(), url.c_str()+url.size()+1);
     const auto retcode = amqp_parse_url(url_cache.data(), &info);
     if (AMQP_STATUS_OK != retcode) {
       ldout(cct, 1) << "AMQP connect: URL parsing failed. error: " << retcode << dendl;
-      return nullptr;
+      return false;
     }
+    connection_id_t tmp_id(info, exchange);
 
-    const connection_id_t id(info);
     std::lock_guard lock(connections_lock);
-    const auto it = connections.find(id);
+    const auto it = connections.find(tmp_id);
     if (it != connections.end()) {
-      if (it->second->exchange != exchange) {
-        ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl;
-        return nullptr;
-      }
       // connection found - return even if non-ok
       ldout(cct, 20) << "AMQP connect: connection found" << dendl;
-      return it->second;
+      id = it->first;
+      return true;
     }
 
     // connection not found, creating a new one
     if (connection_count >= max_connections) {
       ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl;
-      return nullptr;
+      return false;
     }
-    const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct, verify_ssl, ca_location);
-    if (!conn->is_ok()) {
-      ldout(cct, 10) << "AMQP connect: connection (" << to_string(id) << ") creation failed. error:" <<
-              status_to_string(conn->status) << "(" << conn->reply_code << ")" << dendl;
-    }
-    // create_new_connection must always return a connection object
-    // even if error occurred during creation. 
-    // in such a case the creation will be retried in the main thread
-    ceph_assert(conn);
+    // if error occurred during creation the creation will be retried in the main thread
     ++connection_count;
+    auto conn = connections.emplace(tmp_id, std::make_unique<connection_t>(cct, info, verify_ssl, ca_location)).first->second.get();
     ldout(cct, 10) << "AMQP connect: new connection is created. Total connections: " << connection_count << dendl;
-    ldout(cct, 10) << "AMQP connect: new connection status is: " << status_to_string(conn->status) << dendl;
-    return connections.emplace(id, conn).first->second;
+    if (!new_state(conn, tmp_id)) {
+      ldout(cct, 1) << "AMQP connect: new connection '" << to_string(tmp_id) << "' is created. but state creation failed (will retry). error: " <<
+        status_to_string(conn->status) << " (" << conn->reply_code << ")"  << dendl;
+    }
+    id = std::move(tmp_id);
+    return true;
   }
 
   // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
-  int publish(connection_ptr_t& conn, 
+  int publish(const connection_id_t& conn_id,
     const std::string& topic,
     const std::string& message) {
     if (stopped) {
       ldout(cct, 1) << "AMQP publish: manager is not running" << dendl;
       return RGW_AMQP_STATUS_MANAGER_STOPPED;
     }
-    if (!conn || !conn->is_ok()) {
-      ldout(cct, 1) << "AMQP publish: no connection" << dendl;
-      return RGW_AMQP_STATUS_CONNECTION_CLOSED;
-    }
-    if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) {
+    auto wrapper = std::make_unique<message_wrapper_t>(conn_id, topic, message, nullptr);
+    if (messages.push(wrapper.get())) {
+      std::ignore = wrapper.release();
       ++queued;
       return AMQP_STATUS_OK;
     }
     ldout(cct, 1) << "AMQP publish: queue is full" << dendl;
     return RGW_AMQP_STATUS_QUEUE_FULL;
   }
-  
-  int publish_with_confirm(connection_ptr_t& conn, 
+
+  int publish_with_confirm(const connection_id_t& conn_id,
     const std::string& topic,
     const std::string& message,
     reply_callback_t cb) {
@@ -973,11 +919,9 @@ public:
       ldout(cct, 1) << "AMQP publish_with_confirm: manager is not running" << dendl;
       return RGW_AMQP_STATUS_MANAGER_STOPPED;
     }
-    if (!conn || !conn->is_ok()) {
-      ldout(cct, 1) << "AMQP publish_with_confirm: no connection" << dendl;
-      return RGW_AMQP_STATUS_CONNECTION_CLOSED;
-    }
-    if (messages.push(new message_wrapper_t(conn, topic, message, cb))) {
+    auto wrapper = std::make_unique<message_wrapper_t>(conn_id, topic, message, cb);
+    if (messages.push(wrapper.get())) {
+      std::ignore = wrapper.release();
       ++queued;
       return AMQP_STATUS_OK;
     }
@@ -997,7 +941,7 @@ public:
   size_t get_connection_count() const {
     return connection_count;
   }
-  
+
   // get the number of in-flight messages
   size_t get_inflight() const {
     size_t sum = 0;
@@ -1026,7 +970,7 @@ public:
 static Manager* s_manager = nullptr;
 
 static const size_t MAX_CONNECTIONS_DEFAULT = 256;
-static const size_t MAX_INFLIGHT_DEFAULT = 8192; 
+static const size_t MAX_INFLIGHT_DEFAULT = 8192;
 static const size_t MAX_QUEUE_DEFAULT = 8192;
 static const long READ_TIMEOUT_USEC = 100;
 static const unsigned IDLE_TIME_MS = 100;
@@ -1037,7 +981,7 @@ bool init(CephContext* cct) {
     return false;
   }
   // TODO: take conf from CephContext
-  s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 
+  s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT,
       READ_TIMEOUT_USEC, IDLE_TIME_MS, RECONNECT_TIME_MS, cct);
   return true;
 }
@@ -1047,32 +991,32 @@ void shutdown() {
   s_manager = nullptr;
 }
 
-connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
+bool connect(connection_id_t& conn_id, const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
         boost::optional<const std::string&> ca_location) {
-  if (!s_manager) return nullptr;
-  return s_manager->connect(url, exchange, mandatory_delivery, verify_ssl, ca_location);
+  if (!s_manager) return false;
+  return s_manager->connect(conn_id, url, exchange, mandatory_delivery, verify_ssl, ca_location);
 }
 
-int publish(connection_ptr_t& conn, 
+int publish(const connection_id_t& conn_id,
     const std::string& topic,
     const std::string& message) {
   if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
-  return s_manager->publish(conn, topic, message);
+  return s_manager->publish(conn_id, topic, message);
 }
 
-int publish_with_confirm(connection_ptr_t& conn, 
+int publish_with_confirm(const connection_id_t& conn_id,
     const std::string& topic,
     const std::string& message,
     reply_callback_t cb) {
   if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
-  return s_manager->publish_with_confirm(conn, topic, message, cb);
+  return s_manager->publish_with_confirm(conn_id, topic, message, cb);
 }
 
 size_t get_connection_count() {
   if (!s_manager) return 0;
   return s_manager->get_connection_count();
 }
-  
+
 size_t get_inflight() {
   if (!s_manager) return 0;
   return s_manager->get_inflight();