]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_amqp.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_amqp.cc
index 7a1b9df50619bb4f218949a3266860486e1547fe..6a11adf0692a18bb5626dfa893973a9746a3860f 100644 (file)
@@ -114,12 +114,11 @@ typedef std::vector<reply_callback_with_tag_t> CallbackList;
 // it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
 // since references to deleted objects may still exist in the calling code
 struct connection_t {
-  amqp_connection_state_t state;
+  std::atomic<amqp_connection_state_t> state;
   std::string exchange;
   std::string user;
   std::string password;
   amqp_bytes_t reply_to_queue;
-  bool marked_for_deletion;
   uint64_t delivery_tag;
   int status;
   int reply_type;
@@ -129,14 +128,15 @@ struct connection_t {
   CallbackList callbacks;
   ceph::coarse_real_clock::time_point next_reconnect;
   bool mandatory;
+  bool use_ssl;
   bool verify_ssl;
   boost::optional<const std::string&> ca_location;
+  utime_t timestamp = ceph_clock_now();
 
   // default ctor
   connection_t() :
     state(nullptr),
     reply_to_queue(amqp_empty_bytes),
-    marked_for_deletion(false),
     delivery_tag(1),
     status(AMQP_STATUS_OK),
     reply_type(AMQP_RESPONSE_NORMAL),
@@ -145,6 +145,7 @@ struct connection_t {
     cct(nullptr),
     next_reconnect(ceph::coarse_real_clock::now()),
     mandatory(false),
+    use_ssl(false),
     verify_ssl(false),
     ca_location(boost::none)
   {}
@@ -168,7 +169,7 @@ struct connection_t {
   }
 
   bool is_ok() const {
-    return (state != nullptr && !marked_for_deletion);
+    return (state != nullptr);
   }
 
   // dtor also destroys the internals
@@ -321,8 +322,9 @@ std::string to_string(amqp_status_enum s) {
       return "AMQP_STATUS_SSL_CONNECTION_FAILED";
     case _AMQP_STATUS_SSL_NEXT_VALUE:
       return "AMQP_STATUS_INTERNAL"; 
+    default:
+      return "AMQP_STATUS_UNKNOWN";
   }
-  return "AMQP_STATUS_UNKNOWN";
 }
 
 // TODO: add status_to_string on the connection object to prinf full status
@@ -387,9 +389,8 @@ static const amqp_channel_t CONFIRMING_CHANNEL_ID = 2;
 
 // utility function to create a connection, when the connection object already exists
 connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connection_info& info) {
-  // pointer must be valid and not marked for deletion
-  ceph_assert(conn && !conn->marked_for_deletion);
-  
+  ceph_assert(conn);
+
   // reset all status codes
   conn->status = AMQP_STATUS_OK; 
   conn->reply_type = AMQP_RESPONSE_NORMAL;
@@ -546,8 +547,9 @@ connection_ptr_t create_new_connection(const amqp_connection_info& info,
   conn->password.assign(info.password);
   conn->mandatory = mandatory_delivery;
   conn->cct = cct;
+  conn->use_ssl = info.ssl;
   conn->verify_ssl = verify_ssl;
-  conn->ca_location =  ca_location;
+  conn->ca_location = ca_location;
   return create_connection(conn, info);
 }
 
@@ -583,9 +585,10 @@ public:
   const size_t max_connections;
   const size_t max_inflight;
   const size_t max_queue;
+  const size_t max_idle_time;
 private:
   std::atomic<size_t> connection_count;
-  bool stopped;
+  std::atomic<bool> stopped;
   struct timeval read_timeout;
   ConnectionList connections;
   MessageQueue messages;
@@ -601,6 +604,8 @@ private:
     const std::unique_ptr<message_wrapper_t> msg_owner(message);
     auto& conn = message->conn;
 
+    conn->timestamp = ceph_clock_now();
+
     if (!conn->is_ok()) {
       // connection had an issue while message was in the queue
       // TODO add error stats
@@ -674,7 +679,7 @@ private:
   // (3) manages deleted connections
   // (4) TODO reconnect on connection errors
   // (5) TODO cleanup timedout callbacks
-  void run() {
+  void run() noexcept {
     amqp_frame_t frame;
     while (!stopped) {
 
@@ -695,13 +700,10 @@ private:
       for (;conn_it != end_it;) {
         
         auto& conn = conn_it->second;
-        // delete the connection if marked for deletion
-        if (conn->marked_for_deletion) {
-          ldout(conn->cct, 10) << "AMQP run: connection is deleted" << dendl;
-          conn->destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
-          std::lock_guard lock(connections_lock);
-          // erase is safe - does not invalidate any other iterator
-          // lock so no insertion happens at the same time
+        const auto& conn_key = conn_it->first;
+
+        if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
+          ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
           ERASE_AND_CONTINUE(conn_it, connections);
         }
 
@@ -712,20 +714,21 @@ private:
             // pointers are used temporarily inside the amqp_connection_info object
             // as read-only values, hence the assignment, and const_cast are safe here
             amqp_connection_info info;
-            info.host = const_cast<char*>(conn_it->first.host.c_str());
-            info.port = conn_it->first.port;
-            info.vhost = const_cast<char*>(conn_it->first.vhost.c_str());
+            info.host = const_cast<char*>(conn_key.host.c_str());
+            info.port = conn_key.port;
+            info.vhost = const_cast<char*>(conn_key.vhost.c_str());
             info.user = const_cast<char*>(conn->user.c_str());
             info.password = const_cast<char*>(conn->password.c_str());
+            info.ssl = conn->use_ssl;
             ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
             if (create_connection(conn, info)->is_ok() == false) {
-              ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed. error: " <<
+              ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_key) << ") retry failed. error: " <<
                 status_to_string(conn->status) << " (" << conn->reply_code << ")"  << dendl;
               // TODO: add error counter for failed retries
               // TODO: add exponential backoff for retries
               conn->next_reconnect = now + reconnect_time;
             } else {
-              ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successfull" << dendl;
+              ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_key) << ") retry successfull" << dendl;
             }
           }
           INCREMENT_AND_CONTINUE(conn_it);
@@ -857,6 +860,7 @@ public:
     max_connections(_max_connections),
     max_inflight(_max_inflight),
     max_queue(_max_queue),
+    max_idle_time(30),
     connection_count(0),
     stopped(false),
     read_timeout{0, _usec_timeout},
@@ -887,15 +891,6 @@ public:
     stopped = true;
   }
 
-  // disconnect from a broker
-  bool disconnect(connection_ptr_t& conn) {
-    if (!conn || stopped) {
-      return false;
-    }
-    conn->marked_for_deletion = true;
-    return true;
-  }
-
   // connect to a broker, or reuse an existing connection if already connected
   connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
         boost::optional<const std::string&> ca_location) {
@@ -907,8 +902,9 @@ public:
     struct amqp_connection_info info;
     // cache the URL so that parsing could happen in-place
     std::vector<char> url_cache(url.c_str(), url.c_str()+url.size()+1);
-    if (AMQP_STATUS_OK != amqp_parse_url(url_cache.data(), &info)) {
-      ldout(cct, 1) << "AMQP connect: URL parsing failed" << dendl;
+    const auto retcode = amqp_parse_url(url_cache.data(), &info);
+    if (AMQP_STATUS_OK != retcode) {
+      ldout(cct, 1) << "AMQP connect: URL parsing failed. error: " << retcode << dendl;
       return nullptr;
     }
 
@@ -916,10 +912,7 @@ public:
     std::lock_guard lock(connections_lock);
     const auto it = connections.find(id);
     if (it != connections.end()) {
-      if (it->second->marked_for_deletion) {
-        ldout(cct, 1) << "AMQP connect: endpoint marked for deletion" << dendl;
-        return nullptr;
-      } else if (it->second->exchange != exchange) {
+      if (it->second->exchange != exchange) {
         ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl;
         return nullptr;
       }
@@ -1006,6 +999,7 @@ public:
     size_t sum = 0;
     std::lock_guard lock(connections_lock);
     std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) {
+        // concurrent access to the callback vector is safe without locking
         sum += conn_pair.second->callbacks.size();
       });
     return sum;
@@ -1105,10 +1099,5 @@ size_t get_max_queue() {
   return s_manager->max_queue;
 }
 
-bool disconnect(connection_ptr_t& conn) {
-  if (!s_manager) return false;
-  return s_manager->disconnect(conn);
-}
-
 } // namespace amqp