]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_amqp.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_amqp.cc
index a5f6ca00d84f411f257607bcf6980ea01864fa22..6a11adf0692a18bb5626dfa893973a9746a3860f 100644 (file)
@@ -3,6 +3,7 @@
 
 #include "rgw_amqp.h"
 #include <amqp.h>
+#include <amqp_ssl_socket.h>
 #include <amqp_tcp_socket.h>
 #include <amqp_framing.h>
 #include "include/ceph_assert.h"
@@ -16,6 +17,7 @@
 #include <mutex>
 #include <boost/lockfree/queue.hpp>
 #include "common/dout.h"
+#include <openssl/ssl.h>
 
 #define dout_subsys ceph_subsys_rgw
 
@@ -43,6 +45,7 @@ static const int RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED = -0x2006;
 static const int RGW_AMQP_STATUS_Q_DECLARE_FAILED =       -0x2007;
 static const int RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED = -0x2008;
 static const int RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED = -0x2009;
+static const int RGW_AMQP_STATUS_SOCKET_CACERT_FAILED =   -0x2010;
 
 static const int RGW_AMQP_RESPONSE_SOCKET_ERROR =         -0x3008;
 static const int RGW_AMQP_NO_REPLY_CODE =                 0x0;
@@ -72,7 +75,7 @@ struct connection_id_t {
 };
 
 std::string to_string(const connection_id_t& id) {
-    return id.host+":"+std::to_string(id.port)+"/"+id.vhost;
+    return id.host+":"+std::to_string(id.port)+id.vhost;
 }
 
 // connection_t state cleaner
@@ -111,12 +114,11 @@ typedef std::vector<reply_callback_with_tag_t> CallbackList;
 // it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
 // since references to deleted objects may still exist in the calling code
 struct connection_t {
-  amqp_connection_state_t state;
+  std::atomic<amqp_connection_state_t> state;
   std::string exchange;
   std::string user;
   std::string password;
   amqp_bytes_t reply_to_queue;
-  bool marked_for_deletion;
   uint64_t delivery_tag;
   int status;
   int reply_type;
@@ -124,18 +126,29 @@ struct connection_t {
   mutable std::atomic<int> ref_count;
   CephContext* cct;
   CallbackList callbacks;
+  ceph::coarse_real_clock::time_point next_reconnect;
+  bool mandatory;
+  bool use_ssl;
+  bool verify_ssl;
+  boost::optional<const std::string&> ca_location;
+  utime_t timestamp = ceph_clock_now();
 
   // default ctor
   connection_t() :
     state(nullptr),
     reply_to_queue(amqp_empty_bytes),
-    marked_for_deletion(false),
     delivery_tag(1),
     status(AMQP_STATUS_OK),
     reply_type(AMQP_RESPONSE_NORMAL),
     reply_code(RGW_AMQP_NO_REPLY_CODE),
     ref_count(0),
-    cct(nullptr) {}
+    cct(nullptr),
+    next_reconnect(ceph::coarse_real_clock::now()),
+    mandatory(false),
+    use_ssl(false),
+    verify_ssl(false),
+    ca_location(boost::none)
+  {}
 
   // cleanup of all internal connection resource
   // the object can still remain, and internal connection
@@ -156,7 +169,7 @@ struct connection_t {
   }
 
   bool is_ok() const {
-    return (state != nullptr && !marked_for_deletion);
+    return (state != nullptr);
   }
 
   // dtor also destroys the internals
@@ -309,8 +322,9 @@ std::string to_string(amqp_status_enum s) {
       return "AMQP_STATUS_SSL_CONNECTION_FAILED";
     case _AMQP_STATUS_SSL_NEXT_VALUE:
       return "AMQP_STATUS_INTERNAL"; 
+    default:
+      return "AMQP_STATUS_UNKNOWN";
   }
-  return "AMQP_STATUS_UNKNOWN";
 }
 
 // TODO: add status_to_string on the connection object to prinf full status
@@ -346,6 +360,8 @@ std::string status_to_string(int s) {
       return "RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED";
     case RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED:
       return "RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED";
+    case RGW_AMQP_STATUS_SOCKET_CACERT_FAILED:
+      return "RGW_AMQP_STATUS_SOCKET_CACERT_FAILED";
   }
   return to_string((amqp_status_enum)s);
 }
@@ -373,9 +389,8 @@ static const amqp_channel_t CONFIRMING_CHANNEL_ID = 2;
 
 // utility function to create a connection, when the connection object already exists
 connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connection_info& info) {
-  // pointer must be valid and not marked for deletion
-  ceph_assert(conn && !conn->marked_for_deletion);
-  
+  ceph_assert(conn);
+
   // reset all status codes
   conn->status = AMQP_STATUS_OK; 
   conn->reply_type = AMQP_RESPONSE_NORMAL;
@@ -390,11 +405,46 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio
   ConnectionCleaner state_guard(state);
 
   // create and open socket
-  auto socket = amqp_tcp_socket_new(state);
+  amqp_socket_t *socket = nullptr;
+  if (info.ssl) {
+    socket = amqp_ssl_socket_new(state);
+#if AMQP_VERSION >= AMQP_VERSION_CODE(0, 10, 0, 1)
+    SSL_CTX* ssl_ctx = reinterpret_cast<SSL_CTX*>(amqp_ssl_socket_get_context(socket));
+#else
+    // taken from https://github.com/alanxz/rabbitmq-c/pull/560
+    struct hack {
+                 const struct amqp_socket_class_t *klass;
+                 SSL_CTX *ctx;
+         };
+
+         struct hack *h = reinterpret_cast<struct hack*>(socket);
+    SSL_CTX* ssl_ctx = h->ctx;
+#endif
+    // ensure system CA certificates get loaded
+    SSL_CTX_set_default_verify_paths(ssl_ctx);
+  }
+  else {
+    socket = amqp_tcp_socket_new(state);
+  }
+
   if (!socket) {
     conn->status = RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED;
     return conn;
   }
+  if (info.ssl) {
+    if (!conn->verify_ssl) {
+      amqp_ssl_socket_set_verify_peer(socket, 0);
+      amqp_ssl_socket_set_verify_hostname(socket, 0);
+    }
+    if (conn->ca_location.has_value()) {
+      const auto s = amqp_ssl_socket_set_cacert(socket, conn->ca_location.get().c_str());
+      if (s != AMQP_STATUS_OK) {
+        conn->status = RGW_AMQP_STATUS_SOCKET_CACERT_FAILED;
+        conn->reply_code = s;
+        return conn;
+      }
+    }
+  }
   const auto s = amqp_socket_open(socket, info.host, info.port);
   if (s < 0) {
     conn->status = RGW_AMQP_STATUS_SOCKET_OPEN_FAILED;
@@ -489,13 +539,17 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio
 
 // utility function to create a new connection
 connection_ptr_t create_new_connection(const amqp_connection_info& info, 
-    const std::string& exchange, CephContext* cct) { 
+    const std::string& exchange, bool mandatory_delivery, CephContext* cct, bool verify_ssl, boost::optional<const std::string&> ca_location) { 
   // create connection state
   connection_ptr_t conn = new connection_t;
   conn->exchange = exchange;
   conn->user.assign(info.user);
   conn->password.assign(info.password);
+  conn->mandatory = mandatory_delivery;
   conn->cct = cct;
+  conn->use_ssl = info.ssl;
+  conn->verify_ssl = verify_ssl;
+  conn->ca_location = ca_location;
   return create_connection(conn, info);
 }
 
@@ -531,9 +585,10 @@ public:
   const size_t max_connections;
   const size_t max_inflight;
   const size_t max_queue;
+  const size_t max_idle_time;
 private:
   std::atomic<size_t> connection_count;
-  bool stopped;
+  std::atomic<bool> stopped;
   struct timeval read_timeout;
   ConnectionList connections;
   MessageQueue messages;
@@ -541,12 +596,16 @@ private:
   std::atomic<size_t> dequeued;
   CephContext* const cct;
   mutable std::mutex connections_lock;
+  const ceph::coarse_real_clock::duration idle_time;
+  const ceph::coarse_real_clock::duration reconnect_time;
   std::thread runner;
 
   void publish_internal(message_wrapper_t* message) {
     const std::unique_ptr<message_wrapper_t> msg_owner(message);
     auto& conn = message->conn;
 
+    conn->timestamp = ceph_clock_now();
+
     if (!conn->is_ok()) {
       // connection had an issue while message was in the queue
       // TODO add error stats
@@ -563,9 +622,9 @@ private:
         CHANNEL_ID,
         amqp_cstring_bytes(conn->exchange.c_str()),
         amqp_cstring_bytes(message->topic.c_str()),
-        1, // mandatory, TODO: take from conf
+        0, // does not have to be routable
         0, // not immediate
-        nullptr,
+        nullptr, // no properties needed
         amqp_cstring_bytes(message->message.c_str()));
       if (rc == AMQP_STATUS_OK) {
         ldout(conn->cct, 20) << "AMQP publish (no callback): OK" << dendl;
@@ -589,7 +648,7 @@ private:
       CONFIRMING_CHANNEL_ID,
       amqp_cstring_bytes(conn->exchange.c_str()),
       amqp_cstring_bytes(message->topic.c_str()),
-      1, // mandatory, TODO: take from conf
+      conn->mandatory,
       0, // not immediate
       &props,
       amqp_cstring_bytes(message->message.c_str()));
@@ -620,7 +679,7 @@ private:
   // (3) manages deleted connections
   // (4) TODO reconnect on connection errors
   // (5) TODO cleanup timedout callbacks
-  void run() {
+  void run() noexcept {
     amqp_frame_t frame;
     while (!stopped) {
 
@@ -641,33 +700,36 @@ private:
       for (;conn_it != end_it;) {
         
         auto& conn = conn_it->second;
-        // delete the connection if marked for deletion
-        if (conn->marked_for_deletion) {
-          ldout(conn->cct, 10) << "AMQP run: connection is deleted" << dendl;
-          conn->destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
-          std::lock_guard lock(connections_lock);
-          // erase is safe - does not invalidate any other iterator
-          // lock so no insertion happens at the same time
+        const auto& conn_key = conn_it->first;
+
+        if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
+          ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
           ERASE_AND_CONTINUE(conn_it, connections);
         }
 
         // try to reconnect the connection if it has an error
         if (!conn->is_ok()) {
-          // pointers are used temporarily inside the amqp_connection_info object
-          // as read-only values, hence the assignment, and const_cast are safe here
-          amqp_connection_info info;
-          info.host = const_cast<char*>(conn_it->first.host.c_str());
-          info.port = conn_it->first.port;
-          info.vhost = const_cast<char*>(conn_it->first.vhost.c_str());
-          info.user = const_cast<char*>(conn->user.c_str());
-          info.password = const_cast<char*>(conn->password.c_str());
-          ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
-          if (create_connection(conn, info)->is_ok() == false) {
-            ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed" << dendl;
-            // TODO: add error counter for failed retries
-            // TODO: add exponential backoff for retries
-          } else {
-            ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successfull" << dendl;
+          const auto now = ceph::coarse_real_clock::now();
+          if (now >= conn->next_reconnect) {
+            // pointers are used temporarily inside the amqp_connection_info object
+            // as read-only values, hence the assignment, and const_cast are safe here
+            amqp_connection_info info;
+            info.host = const_cast<char*>(conn_key.host.c_str());
+            info.port = conn_key.port;
+            info.vhost = const_cast<char*>(conn_key.vhost.c_str());
+            info.user = const_cast<char*>(conn->user.c_str());
+            info.password = const_cast<char*>(conn->password.c_str());
+            info.ssl = conn->use_ssl;
+            ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
+            if (create_connection(conn, info)->is_ok() == false) {
+              ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_key) << ") retry failed. error: " <<
+                status_to_string(conn->status) << " (" << conn->reply_code << ")"  << dendl;
+              // TODO: add error counter for failed retries
+              // TODO: add exponential backoff for retries
+              conn->next_reconnect = now + reconnect_time;
+            } else {
+              ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_key) << ") retry successfull" << dendl;
+            }
           }
           INCREMENT_AND_CONTINUE(conn_it);
         }
@@ -693,9 +755,9 @@ private:
         }
 
         if (frame.frame_type != AMQP_FRAME_METHOD) {
-          ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages" << dendl;
+          ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages. frame type: " 
+            << unsigned(frame.frame_type) << dendl;
           // handler is for publish confirmation only - handle only method frames
-          // TODO: add a counter
           INCREMENT_AND_CONTINUE(conn_it);
         }
 
@@ -722,6 +784,14 @@ private:
               multiple = nack->multiple;
               break;
             }
+          case AMQP_BASIC_REJECT_METHOD:                                                   
+            {                                                                              
+              result = RGW_AMQP_STATUS_BROKER_NACK;                                        
+              const auto reject = (amqp_basic_reject_t*)frame.payload.method.decoded;      
+              tag = reject->delivery_tag;                                                  
+              multiple = false;                                                            
+              break;
+            }
           case AMQP_CONNECTION_CLOSE_METHOD:
             // TODO on channel close, no need to reopen the connection
           case AMQP_CHANNEL_CLOSE_METHOD:
@@ -733,13 +803,11 @@ private:
             }
           case AMQP_BASIC_RETURN_METHOD:
             // message was not delivered, returned to sender
-            // TODO: add a counter
-            ldout(conn->cct, 10) << "AMQP run: message delivery error" << dendl;
+            ldout(conn->cct, 10) << "AMQP run: message was not routable" << dendl;
             INCREMENT_AND_CONTINUE(conn_it);
             break;
           default:
             // unexpected method
-            // TODO: add a counter
             ldout(conn->cct, 10) << "AMQP run: unexpected message" << dendl;
             INCREMENT_AND_CONTINUE(conn_it);
         }
@@ -764,7 +832,6 @@ private:
             conn->callbacks.erase(tag_it);
           }
         } else {
-          // TODO add counter for acks with no callback
           ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl;
         }
         // just increment the iterator
@@ -772,7 +839,7 @@ private:
       }
       // if no messages were received or published, sleep for 100ms
       if (count == 0 && !incoming_message) {
-        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        std::this_thread::sleep_for(idle_time);
       }
     }
   }
@@ -787,10 +854,13 @@ public:
       size_t _max_inflight,
       size_t _max_queue, 
       long _usec_timeout,
+      unsigned reconnect_time_ms,
+      unsigned idle_time_ms,
       CephContext* _cct) : 
     max_connections(_max_connections),
     max_inflight(_max_inflight),
     max_queue(_max_queue),
+    max_idle_time(30),
     connection_count(0),
     stopped(false),
     read_timeout{0, _usec_timeout},
@@ -799,6 +869,8 @@ public:
     queued(0),
     dequeued(0),
     cct(_cct),
+    idle_time(std::chrono::milliseconds(idle_time_ms)),
+    reconnect_time(std::chrono::milliseconds(reconnect_time_ms)),
     runner(&Manager::run, this) {
       // The hashmap has "max connections" as the initial number of buckets, 
       // and allows for 10 collisions per bucket before rehash.
@@ -819,19 +891,10 @@ public:
     stopped = true;
   }
 
-  // disconnect from a broker
-  bool disconnect(connection_ptr_t& conn) {
-    if (!conn || stopped) {
-      return false;
-    }
-    conn->marked_for_deletion = true;
-    return true;
-  }
-
   // connect to a broker, or reuse an existing connection if already connected
-  connection_ptr_t connect(const std::string& url, const std::string& exchange) {
+  connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
+        boost::optional<const std::string&> ca_location) {
     if (stopped) {
-      // TODO: increment counter
       ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
       return nullptr;
     }
@@ -839,9 +902,9 @@ public:
     struct amqp_connection_info info;
     // cache the URL so that parsing could happen in-place
     std::vector<char> url_cache(url.c_str(), url.c_str()+url.size()+1);
-    if (AMQP_STATUS_OK != amqp_parse_url(url_cache.data(), &info)) {
-      // TODO: increment counter
-      ldout(cct, 1) << "AMQP connect: URL parsing failed" << dendl;
+    const auto retcode = amqp_parse_url(url_cache.data(), &info);
+    if (AMQP_STATUS_OK != retcode) {
+      ldout(cct, 1) << "AMQP connect: URL parsing failed. error: " << retcode << dendl;
       return nullptr;
     }
 
@@ -849,12 +912,7 @@ public:
     std::lock_guard lock(connections_lock);
     const auto it = connections.find(id);
     if (it != connections.end()) {
-      if (it->second->marked_for_deletion) {
-        // TODO: increment counter
-        ldout(cct, 1) << "AMQP connect: endpoint marked for deletion" << dendl;
-        return nullptr;
-      } else if (it->second->exchange != exchange) {
-        // TODO: increment counter
+      if (it->second->exchange != exchange) {
         ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl;
         return nullptr;
       }
@@ -865,11 +923,14 @@ public:
 
     // connection not found, creating a new one
     if (connection_count >= max_connections) {
-      // TODO: increment counter
       ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl;
       return nullptr;
     }
-    const auto conn = create_new_connection(info, exchange, cct);
+    const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct, verify_ssl, ca_location);
+    if (!conn->is_ok()) {
+      ldout(cct, 10) << "AMQP connect: connection (" << to_string(id) << ") creation failed. error:" <<
+              status_to_string(conn->status) << "(" << conn->reply_code << ")" << dendl;
+    }
     // create_new_connection must always return a connection object
     // even if error occurred during creation. 
     // in such a case the creation will be retried in the main thread
@@ -885,15 +946,18 @@ public:
     const std::string& topic,
     const std::string& message) {
     if (stopped) {
+      ldout(cct, 1) << "AMQP publish: manager is not running" << dendl;
       return RGW_AMQP_STATUS_MANAGER_STOPPED;
     }
     if (!conn || !conn->is_ok()) {
+      ldout(cct, 1) << "AMQP publish: no connection" << dendl;
       return RGW_AMQP_STATUS_CONNECTION_CLOSED;
     }
     if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) {
       ++queued;
       return AMQP_STATUS_OK;
     }
+    ldout(cct, 1) << "AMQP publish: queue is full" << dendl;
     return RGW_AMQP_STATUS_QUEUE_FULL;
   }
   
@@ -902,15 +966,18 @@ public:
     const std::string& message,
     reply_callback_t cb) {
     if (stopped) {
+      ldout(cct, 1) << "AMQP publish_with_confirm: manager is not running" << dendl;
       return RGW_AMQP_STATUS_MANAGER_STOPPED;
     }
     if (!conn || !conn->is_ok()) {
+      ldout(cct, 1) << "AMQP publish_with_confirm: no connection" << dendl;
       return RGW_AMQP_STATUS_CONNECTION_CLOSED;
     }
     if (messages.push(new message_wrapper_t(conn, topic, message, cb))) {
       ++queued;
       return AMQP_STATUS_OK;
     }
+    ldout(cct, 1) << "AMQP publish_with_confirm: queue is full" << dendl;
     return RGW_AMQP_STATUS_QUEUE_FULL;
   }
 
@@ -932,6 +999,7 @@ public:
     size_t sum = 0;
     std::lock_guard lock(connections_lock);
     std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) {
+        // concurrent access to the callback vector is safe without locking
         sum += conn_pair.second->callbacks.size();
       });
     return sum;
@@ -956,13 +1024,17 @@ static Manager* s_manager = nullptr;
 static const size_t MAX_CONNECTIONS_DEFAULT = 256;
 static const size_t MAX_INFLIGHT_DEFAULT = 8192; 
 static const size_t MAX_QUEUE_DEFAULT = 8192;
+static const long READ_TIMEOUT_USEC = 100;
+static const unsigned IDLE_TIME_MS = 100;
+static const unsigned RECONNECT_TIME_MS = 100;
 
 bool init(CephContext* cct) {
   if (s_manager) {
     return false;
   }
   // TODO: take conf from CephContext
-  s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 100, cct);
+  s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 
+      READ_TIMEOUT_USEC, IDLE_TIME_MS, RECONNECT_TIME_MS, cct);
   return true;
 }
 
@@ -971,9 +1043,10 @@ void shutdown() {
   s_manager = nullptr;
 }
 
-connection_ptr_t connect(const std::string& url, const std::string& exchange) {
+connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
+        boost::optional<const std::string&> ca_location) {
   if (!s_manager) return nullptr;
-  return s_manager->connect(url, exchange);
+  return s_manager->connect(url, exchange, mandatory_delivery, verify_ssl, ca_location);
 }
 
 int publish(connection_ptr_t& conn, 
@@ -1026,10 +1099,5 @@ size_t get_max_queue() {
   return s_manager->max_queue;
 }
 
-bool disconnect(connection_ptr_t& conn) {
-  if (!s_manager) return false;
-  return s_manager->disconnect(conn);
-}
-
 } // namespace amqp