]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_amqp.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_amqp.cc
index 78446e88f68e667443406700a2659aafdabb57cf..6a11adf0692a18bb5626dfa893973a9746a3860f 100644 (file)
@@ -3,6 +3,7 @@
 
 #include "rgw_amqp.h"
 #include <amqp.h>
+#include <amqp_ssl_socket.h>
 #include <amqp_tcp_socket.h>
 #include <amqp_framing.h>
 #include "include/ceph_assert.h"
@@ -16,6 +17,7 @@
 #include <mutex>
 #include <boost/lockfree/queue.hpp>
 #include "common/dout.h"
+#include <openssl/ssl.h>
 
 #define dout_subsys ceph_subsys_rgw
 
@@ -43,6 +45,7 @@ static const int RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED = -0x2006;
 static const int RGW_AMQP_STATUS_Q_DECLARE_FAILED =       -0x2007;
 static const int RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED = -0x2008;
 static const int RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED = -0x2009;
+static const int RGW_AMQP_STATUS_SOCKET_CACERT_FAILED =   -0x2010;
 
 static const int RGW_AMQP_RESPONSE_SOCKET_ERROR =         -0x3008;
 static const int RGW_AMQP_NO_REPLY_CODE =                 0x0;
@@ -111,12 +114,11 @@ typedef std::vector<reply_callback_with_tag_t> CallbackList;
 // it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
 // since references to deleted objects may still exist in the calling code
 struct connection_t {
-  amqp_connection_state_t state;
+  std::atomic<amqp_connection_state_t> state;
   std::string exchange;
   std::string user;
   std::string password;
   amqp_bytes_t reply_to_queue;
-  bool marked_for_deletion;
   uint64_t delivery_tag;
   int status;
   int reply_type;
@@ -126,12 +128,15 @@ struct connection_t {
   CallbackList callbacks;
   ceph::coarse_real_clock::time_point next_reconnect;
   bool mandatory;
+  bool use_ssl;
+  bool verify_ssl;
+  boost::optional<const std::string&> ca_location;
+  utime_t timestamp = ceph_clock_now();
 
   // default ctor
   connection_t() :
     state(nullptr),
     reply_to_queue(amqp_empty_bytes),
-    marked_for_deletion(false),
     delivery_tag(1),
     status(AMQP_STATUS_OK),
     reply_type(AMQP_RESPONSE_NORMAL),
@@ -139,7 +144,10 @@ struct connection_t {
     ref_count(0),
     cct(nullptr),
     next_reconnect(ceph::coarse_real_clock::now()),
-    mandatory(false)
+    mandatory(false),
+    use_ssl(false),
+    verify_ssl(false),
+    ca_location(boost::none)
   {}
 
   // cleanup of all internal connection resource
@@ -161,7 +169,7 @@ struct connection_t {
   }
 
   bool is_ok() const {
-    return (state != nullptr && !marked_for_deletion);
+    return (state != nullptr);
   }
 
   // dtor also destroys the internals
@@ -314,8 +322,9 @@ std::string to_string(amqp_status_enum s) {
       return "AMQP_STATUS_SSL_CONNECTION_FAILED";
     case _AMQP_STATUS_SSL_NEXT_VALUE:
       return "AMQP_STATUS_INTERNAL"; 
+    default:
+      return "AMQP_STATUS_UNKNOWN";
   }
-  return "AMQP_STATUS_UNKNOWN";
 }
 
 // TODO: add status_to_string on the connection object to prinf full status
@@ -351,6 +360,8 @@ std::string status_to_string(int s) {
       return "RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED";
     case RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED:
       return "RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED";
+    case RGW_AMQP_STATUS_SOCKET_CACERT_FAILED:
+      return "RGW_AMQP_STATUS_SOCKET_CACERT_FAILED";
   }
   return to_string((amqp_status_enum)s);
 }
@@ -378,9 +389,8 @@ static const amqp_channel_t CONFIRMING_CHANNEL_ID = 2;
 
 // utility function to create a connection, when the connection object already exists
 connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connection_info& info) {
-  // pointer must be valid and not marked for deletion
-  ceph_assert(conn && !conn->marked_for_deletion);
-  
+  ceph_assert(conn);
+
   // reset all status codes
   conn->status = AMQP_STATUS_OK; 
   conn->reply_type = AMQP_RESPONSE_NORMAL;
@@ -395,11 +405,46 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio
   ConnectionCleaner state_guard(state);
 
   // create and open socket
-  auto socket = amqp_tcp_socket_new(state);
+  amqp_socket_t *socket = nullptr;
+  if (info.ssl) {
+    socket = amqp_ssl_socket_new(state);
+#if AMQP_VERSION >= AMQP_VERSION_CODE(0, 10, 0, 1)
+    SSL_CTX* ssl_ctx = reinterpret_cast<SSL_CTX*>(amqp_ssl_socket_get_context(socket));
+#else
+    // taken from https://github.com/alanxz/rabbitmq-c/pull/560
+    struct hack {
+                 const struct amqp_socket_class_t *klass;
+                 SSL_CTX *ctx;
+         };
+
+         struct hack *h = reinterpret_cast<struct hack*>(socket);
+    SSL_CTX* ssl_ctx = h->ctx;
+#endif
+    // ensure system CA certificates get loaded
+    SSL_CTX_set_default_verify_paths(ssl_ctx);
+  }
+  else {
+    socket = amqp_tcp_socket_new(state);
+  }
+
   if (!socket) {
     conn->status = RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED;
     return conn;
   }
+  if (info.ssl) {
+    if (!conn->verify_ssl) {
+      amqp_ssl_socket_set_verify_peer(socket, 0);
+      amqp_ssl_socket_set_verify_hostname(socket, 0);
+    }
+    if (conn->ca_location.has_value()) {
+      const auto s = amqp_ssl_socket_set_cacert(socket, conn->ca_location.get().c_str());
+      if (s != AMQP_STATUS_OK) {
+        conn->status = RGW_AMQP_STATUS_SOCKET_CACERT_FAILED;
+        conn->reply_code = s;
+        return conn;
+      }
+    }
+  }
   const auto s = amqp_socket_open(socket, info.host, info.port);
   if (s < 0) {
     conn->status = RGW_AMQP_STATUS_SOCKET_OPEN_FAILED;
@@ -494,7 +539,7 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio
 
 // utility function to create a new connection
 connection_ptr_t create_new_connection(const amqp_connection_info& info, 
-    const std::string& exchange, bool mandatory_delivery, CephContext* cct) { 
+    const std::string& exchange, bool mandatory_delivery, CephContext* cct, bool verify_ssl, boost::optional<const std::string&> ca_location) { 
   // create connection state
   connection_ptr_t conn = new connection_t;
   conn->exchange = exchange;
@@ -502,6 +547,9 @@ connection_ptr_t create_new_connection(const amqp_connection_info& info,
   conn->password.assign(info.password);
   conn->mandatory = mandatory_delivery;
   conn->cct = cct;
+  conn->use_ssl = info.ssl;
+  conn->verify_ssl = verify_ssl;
+  conn->ca_location = ca_location;
   return create_connection(conn, info);
 }
 
@@ -537,9 +585,10 @@ public:
   const size_t max_connections;
   const size_t max_inflight;
   const size_t max_queue;
+  const size_t max_idle_time;
 private:
   std::atomic<size_t> connection_count;
-  bool stopped;
+  std::atomic<bool> stopped;
   struct timeval read_timeout;
   ConnectionList connections;
   MessageQueue messages;
@@ -547,14 +596,16 @@ private:
   std::atomic<size_t> dequeued;
   CephContext* const cct;
   mutable std::mutex connections_lock;
-  std::thread runner;
   const ceph::coarse_real_clock::duration idle_time;
   const ceph::coarse_real_clock::duration reconnect_time;
+  std::thread runner;
 
   void publish_internal(message_wrapper_t* message) {
     const std::unique_ptr<message_wrapper_t> msg_owner(message);
     auto& conn = message->conn;
 
+    conn->timestamp = ceph_clock_now();
+
     if (!conn->is_ok()) {
       // connection had an issue while message was in the queue
       // TODO add error stats
@@ -628,7 +679,7 @@ private:
   // (3) manages deleted connections
   // (4) TODO reconnect on connection errors
   // (5) TODO cleanup timedout callbacks
-  void run() {
+  void run() noexcept {
     amqp_frame_t frame;
     while (!stopped) {
 
@@ -649,13 +700,10 @@ private:
       for (;conn_it != end_it;) {
         
         auto& conn = conn_it->second;
-        // delete the connection if marked for deletion
-        if (conn->marked_for_deletion) {
-          ldout(conn->cct, 10) << "AMQP run: connection is deleted" << dendl;
-          conn->destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
-          std::lock_guard lock(connections_lock);
-          // erase is safe - does not invalidate any other iterator
-          // lock so no insertion happens at the same time
+        const auto& conn_key = conn_it->first;
+
+        if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
+          ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
           ERASE_AND_CONTINUE(conn_it, connections);
         }
 
@@ -666,20 +714,21 @@ private:
             // pointers are used temporarily inside the amqp_connection_info object
             // as read-only values, hence the assignment, and const_cast are safe here
             amqp_connection_info info;
-            info.host = const_cast<char*>(conn_it->first.host.c_str());
-            info.port = conn_it->first.port;
-            info.vhost = const_cast<char*>(conn_it->first.vhost.c_str());
+            info.host = const_cast<char*>(conn_key.host.c_str());
+            info.port = conn_key.port;
+            info.vhost = const_cast<char*>(conn_key.vhost.c_str());
             info.user = const_cast<char*>(conn->user.c_str());
             info.password = const_cast<char*>(conn->password.c_str());
+            info.ssl = conn->use_ssl;
             ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
             if (create_connection(conn, info)->is_ok() == false) {
-              ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed. error: " <<
+              ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_key) << ") retry failed. error: " <<
                 status_to_string(conn->status) << " (" << conn->reply_code << ")"  << dendl;
               // TODO: add error counter for failed retries
               // TODO: add exponential backoff for retries
               conn->next_reconnect = now + reconnect_time;
             } else {
-              ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successfull" << dendl;
+              ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_key) << ") retry successfull" << dendl;
             }
           }
           INCREMENT_AND_CONTINUE(conn_it);
@@ -811,6 +860,7 @@ public:
     max_connections(_max_connections),
     max_inflight(_max_inflight),
     max_queue(_max_queue),
+    max_idle_time(30),
     connection_count(0),
     stopped(false),
     read_timeout{0, _usec_timeout},
@@ -819,9 +869,9 @@ public:
     queued(0),
     dequeued(0),
     cct(_cct),
-    runner(&Manager::run, this),
     idle_time(std::chrono::milliseconds(idle_time_ms)),
-    reconnect_time(std::chrono::milliseconds(reconnect_time_ms)) {
+    reconnect_time(std::chrono::milliseconds(reconnect_time_ms)),
+    runner(&Manager::run, this) {
       // The hashmap has "max connections" as the initial number of buckets, 
       // and allows for 10 collisions per bucket before rehash.
       // This is to prevent rehashing so that iterators are not invalidated 
@@ -841,17 +891,9 @@ public:
     stopped = true;
   }
 
-  // disconnect from a broker
-  bool disconnect(connection_ptr_t& conn) {
-    if (!conn || stopped) {
-      return false;
-    }
-    conn->marked_for_deletion = true;
-    return true;
-  }
-
   // connect to a broker, or reuse an existing connection if already connected
-  connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) {
+  connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
+        boost::optional<const std::string&> ca_location) {
     if (stopped) {
       ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
       return nullptr;
@@ -860,8 +902,9 @@ public:
     struct amqp_connection_info info;
     // cache the URL so that parsing could happen in-place
     std::vector<char> url_cache(url.c_str(), url.c_str()+url.size()+1);
-    if (AMQP_STATUS_OK != amqp_parse_url(url_cache.data(), &info)) {
-      ldout(cct, 1) << "AMQP connect: URL parsing failed" << dendl;
+    const auto retcode = amqp_parse_url(url_cache.data(), &info);
+    if (AMQP_STATUS_OK != retcode) {
+      ldout(cct, 1) << "AMQP connect: URL parsing failed. error: " << retcode << dendl;
       return nullptr;
     }
 
@@ -869,10 +912,7 @@ public:
     std::lock_guard lock(connections_lock);
     const auto it = connections.find(id);
     if (it != connections.end()) {
-      if (it->second->marked_for_deletion) {
-        ldout(cct, 1) << "AMQP connect: endpoint marked for deletion" << dendl;
-        return nullptr;
-      } else if (it->second->exchange != exchange) {
+      if (it->second->exchange != exchange) {
         ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl;
         return nullptr;
       }
@@ -886,7 +926,7 @@ public:
       ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl;
       return nullptr;
     }
-    const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct);
+    const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct, verify_ssl, ca_location);
     if (!conn->is_ok()) {
       ldout(cct, 10) << "AMQP connect: connection (" << to_string(id) << ") creation failed. error:" <<
               status_to_string(conn->status) << "(" << conn->reply_code << ")" << dendl;
@@ -959,6 +999,7 @@ public:
     size_t sum = 0;
     std::lock_guard lock(connections_lock);
     std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) {
+        // concurrent access to the callback vector is safe without locking
         sum += conn_pair.second->callbacks.size();
       });
     return sum;
@@ -1002,9 +1043,10 @@ void shutdown() {
   s_manager = nullptr;
 }
 
-connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) {
+connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
+        boost::optional<const std::string&> ca_location) {
   if (!s_manager) return nullptr;
-  return s_manager->connect(url, exchange, mandatory_delivery);
+  return s_manager->connect(url, exchange, mandatory_delivery, verify_ssl, ca_location);
 }
 
 int publish(connection_ptr_t& conn, 
@@ -1057,10 +1099,5 @@ size_t get_max_queue() {
   return s_manager->max_queue;
 }
 
-bool disconnect(connection_ptr_t& conn) {
-  if (!s_manager) return false;
-  return s_manager->disconnect(conn);
-}
-
 } // namespace amqp