]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_amqp.cc
import ceph 14.2.5
[ceph.git] / ceph / src / rgw / rgw_amqp.cc
index d0cbc7b0325140e75c13698f04776084a5c8ea35..ae62682c59d6d33fc507826cbaad20273a3937a4 100644 (file)
@@ -1,6 +1,7 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
 // vim: ts=8 sw=2 smarttab
 
+#include "include/compat.h"
 #include "rgw_amqp.h"
 #include <amqp.h>
 #include <amqp_tcp_socket.h>
@@ -15,6 +16,9 @@
 #include <atomic>
 #include <mutex>
 #include <boost/lockfree/queue.hpp>
+#include "common/dout.h"
+
+#define dout_subsys ceph_subsys_rgw
 
 // TODO investigation, not necessarily issues:
 // (1) in case of single threaded writer context use spsc_queue
 namespace rgw::amqp {
 
 // RGW AMQP status codes for publishing
-static const int RGW_AMQP_STATUS_BROKER_NACK =          -0x1001;
-static const int RGW_AMQP_STATUS_CONNECTION_CLOSED =    -0x1002;
-static const int RGW_AMQP_STATUS_QUEUE_FULL =           -0x1003;
-static const int RGW_AMQP_STATUS_MAX_INFLIGHT =         -0x1004;
+static const int RGW_AMQP_STATUS_BROKER_NACK =            -0x1001;
+static const int RGW_AMQP_STATUS_CONNECTION_CLOSED =      -0x1002;
+static const int RGW_AMQP_STATUS_QUEUE_FULL =             -0x1003;
+static const int RGW_AMQP_STATUS_MAX_INFLIGHT =           -0x1004;
+static const int RGW_AMQP_STATUS_MANAGER_STOPPED =        -0x1005;
 // RGW AMQP status code for connection opening
-static const int RGW_AMQP_STATUS_CONN_ALLOC_FAILED =    -0x2001;
-static const int RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED =  -0x2002;
-static const int RGW_AMQP_STATUS_SOCKET_OPEN_FAILED =   -0x2003;
-static const int RGW_AMQP_STATUS_LOGIN_FAILED =         -0x2004;
-static const int RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED =  -0x2005;
+static const int RGW_AMQP_STATUS_CONN_ALLOC_FAILED =      -0x2001;
+static const int RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED =    -0x2002;
+static const int RGW_AMQP_STATUS_SOCKET_OPEN_FAILED =     -0x2003;
+static const int RGW_AMQP_STATUS_LOGIN_FAILED =           -0x2004;
+static const int RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED =    -0x2005;
 static const int RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED = -0x2006;
-static const int RGW_AMQP_STATUS_Q_DECLARE_FAILED =     -0x2007;
+static const int RGW_AMQP_STATUS_Q_DECLARE_FAILED =       -0x2007;
 static const int RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED = -0x2008;
 static const int RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED = -0x2009;
 
-static const int RGW_AMQP_RESPONSE_SOCKET_ERROR =       -0x3008;
-static const int RGW_AMQP_NO_REPLY_CODE =               0x0;
+static const int RGW_AMQP_RESPONSE_SOCKET_ERROR =         -0x3008;
+static const int RGW_AMQP_NO_REPLY_CODE =                 0x0;
 
 // key class for the connection list
 struct connection_id_t {
@@ -67,6 +72,10 @@ struct connection_id_t {
   };
 };
 
+std::string to_string(const connection_id_t& id) {
+    return id.host+":"+"/"+id.vhost;
+}
+
 // connection_t state cleaner
 // could be used for automatic cleanup when getting out of scope
 class ConnectionCleaner {
@@ -114,6 +123,7 @@ struct connection_t {
   int reply_type;
   int reply_code;
   mutable std::atomic<int> ref_count;
+  CephContext* cct;
   CallbackList callbacks;
 
   // default ctor
@@ -125,7 +135,8 @@ struct connection_t {
     status(AMQP_STATUS_OK),
     reply_type(AMQP_RESPONSE_NORMAL),
     reply_code(RGW_AMQP_NO_REPLY_CODE),
-    ref_count(0) {}
+    ref_count(0),
+    cct(nullptr) {}
 
   // cleanup of all internal connection resource
   // the object can still remain, and internal connection
@@ -137,9 +148,11 @@ struct connection_t {
     amqp_bytes_free(reply_to_queue);
     reply_to_queue = amqp_empty_bytes;
     // fire all remaining callbacks
-    std::for_each(callbacks.begin(), callbacks.end(), [s](auto& cb_tag) {
-        cb_tag.cb(s);
+    std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
+        cb_tag.cb(status);
+        ldout(cct, 20) << "AMQP destroy: invoking callback with tag=" << cb_tag.tag << dendl;
       });
+    callbacks.clear();
     delivery_tag = 1;
   }
 
@@ -314,6 +327,8 @@ std::string status_to_string(int s) {
       return "RGW_AMQP_STATUS_QUEUE_FULL";
     case RGW_AMQP_STATUS_MAX_INFLIGHT:
       return "RGW_AMQP_STATUS_MAX_INFLIGHT";
+    case RGW_AMQP_STATUS_MANAGER_STOPPED:
+      return "RGW_AMQP_STATUS_MANAGER_STOPPED";
     case RGW_AMQP_STATUS_CONN_ALLOC_FAILED:
       return "RGW_AMQP_STATUS_CONN_ALLOC_FAILED";
     case RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED:
@@ -475,12 +490,13 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio
 
 // utility function to create a new connection
 connection_ptr_t create_new_connection(const amqp_connection_info& info, 
-    const std::string& exchange) { 
+    const std::string& exchange, CephContext* cct) { 
   // create connection state
   connection_ptr_t conn = new connection_t;
   conn->exchange = exchange;
   conn->user.assign(info.user);
   conn->password.assign(info.password);
+  conn->cct = cct;
   return create_connection(conn, info);
 }
 
@@ -524,6 +540,7 @@ private:
   MessageQueue messages;
   std::atomic<size_t> queued;
   std::atomic<size_t> dequeued;
+  CephContext* const cct;
   mutable std::mutex connections_lock;
   std::thread runner;
 
@@ -534,6 +551,7 @@ private:
     if (!conn->is_ok()) {
       // connection had an issue while message was in the queue
       // TODO add error stats
+      ldout(conn->cct, 1) << "AMQP publish: connection had an issue while message was in the queue" << dendl;
       if (message->cb) {
         message->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED);
       }
@@ -551,8 +569,10 @@ private:
         nullptr,
         amqp_cstring_bytes(message->message.c_str()));
       if (rc == AMQP_STATUS_OK) {
+        ldout(conn->cct, 20) << "AMQP publish (no callback): OK" << dendl;
         return;
       }
+      ldout(conn->cct, 1) << "AMQP publish (no callback): failed with error " << status_to_string(rc) << dendl;
       // an error occurred, close connection
       // it will be retied by the main loop
       conn->destroy(rc);
@@ -576,15 +596,19 @@ private:
       amqp_cstring_bytes(message->message.c_str()));
 
     if (rc == AMQP_STATUS_OK) {
-      if (conn->callbacks.size() < max_inflight) {
+      auto const q_len = conn->callbacks.size();
+      if (q_len < max_inflight) {
+        ldout(conn->cct, 20) << "AMQP publish (with callback, tag=" << conn->delivery_tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
         conn->callbacks.emplace_back(conn->delivery_tag++, message->cb);
       } else {
         // immediately invoke callback with error
+        ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: callback queue full" << dendl;
         message->cb(RGW_AMQP_STATUS_MAX_INFLIGHT);
       }
     } else {
       // an error occurred, close connection
       // it will be retied by the main loop
+      ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: " << status_to_string(rc) << dendl;
       conn->destroy(rc);
       // immediately invoke callback with error
       message->cb(rc);
@@ -620,6 +644,7 @@ private:
         auto& conn = conn_it->second;
         // delete the connection if marked for deletion
         if (conn->marked_for_deletion) {
+          ldout(conn->cct, 10) << "AMQP run: connection is deleted" << dendl;
           conn->destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
           std::lock_guard<std::mutex> lock(connections_lock);
           // erase is safe - does not invalidate any other iterator
@@ -637,9 +662,13 @@ private:
           info.vhost = const_cast<char*>(conn_it->first.vhost.c_str());
           info.user = const_cast<char*>(conn->user.c_str());
           info.password = const_cast<char*>(conn->password.c_str());
+          ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
           if (create_connection(conn, info)->is_ok() == false) {
+            ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed" << dendl;
             // TODO: add error counter for failed retries
             // TODO: add exponential backoff for retries
+          } else {
+            ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successfull" << dendl;
           }
           INCREMENT_AND_CONTINUE(conn_it);
         }
@@ -659,11 +688,13 @@ private:
         if (rc != AMQP_STATUS_OK) {
           // an error occurred, close connection
           // it will be retied by the main loop
+          ldout(conn->cct, 1) << "AMQP run: connection read error: " << status_to_string(rc) << dendl;
           conn->destroy(rc);
           INCREMENT_AND_CONTINUE(conn_it);
         }
 
         if (frame.frame_type != AMQP_FRAME_METHOD) {
+          ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages" << dendl;
           // handler is for publish confirmation only - handle only method frames
           // TODO: add a counter
           INCREMENT_AND_CONTINUE(conn_it);
@@ -697,37 +728,45 @@ private:
           case AMQP_CHANNEL_CLOSE_METHOD:
             {
               // other side closed the connection, no need to continue
+              ldout(conn->cct, 10) << "AMQP run: connection was closed by broker" << dendl;
               conn->destroy(rc);
               INCREMENT_AND_CONTINUE(conn_it);
             }
           case AMQP_BASIC_RETURN_METHOD:
             // message was not delivered, returned to sender
             // TODO: add a counter
+            ldout(conn->cct, 10) << "AMQP run: message delivery error" << dendl;
             INCREMENT_AND_CONTINUE(conn_it);
             break;
           default:
             // unexpected method
             // TODO: add a counter
+            ldout(conn->cct, 10) << "AMQP run: unexpected message" << dendl;
             INCREMENT_AND_CONTINUE(conn_it);
         }
 
         const auto& callbacks_end = conn->callbacks.end();
         const auto& callbacks_begin = conn->callbacks.begin();
-        const auto it = std::find(callbacks_begin, callbacks_end, tag);
-        if (it != callbacks_end) {
+        const auto tag_it = std::find(callbacks_begin, callbacks_end, tag);
+        if (tag_it != callbacks_end) {
           if (multiple) {
             // n/ack all up to (and including) the tag
-            for (auto rit = it; rit >= callbacks_begin; --rit) {
-              rit->cb(result);
-              conn->callbacks.erase(rit);
+            ldout(conn->cct, 20) << "AMQP run: multiple n/acks received with tag=" << tag << " and result=" << result << dendl;
+            auto it = callbacks_begin;
+            while (it->tag <= tag && it != conn->callbacks.end()) {
+              ldout(conn->cct, 20) << "AMQP run: invoking callback with tag=" << it->tag << dendl;
+              it->cb(result);
+              it = conn->callbacks.erase(it);
             }
           } else {
             // n/ack a specific tag
-            it->cb(result);
-            conn->callbacks.erase(it);
+            ldout(conn->cct, 20) << "AMQP run: n/ack received, invoking callback with tag=" << tag << " and result=" << result << dendl;
+            tag_it->cb(result);
+            conn->callbacks.erase(tag_it);
           }
         } else {
           // TODO add counter for acks with no callback
+          ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl;
         }
         // just increment the iterator
         ++conn_it;
@@ -748,7 +787,8 @@ public:
   Manager(size_t _max_connections,
       size_t _max_inflight,
       size_t _max_queue, 
-      long _usec_timeout) : 
+      long _usec_timeout,
+      CephContext* _cct) : 
     max_connections(_max_connections),
     max_inflight(_max_inflight),
     max_queue(_max_queue),
@@ -759,12 +799,16 @@ public:
     messages(max_queue),
     queued(0),
     dequeued(0),
+    cct(_cct),
     runner(&Manager::run, this) {
       // The hashmap has "max connections" as the initial number of buckets, 
       // and allows for 10 collisions per bucket before rehash.
       // This is to prevent rehashing so that iterators are not invalidated 
       // when a new connection is added.
       connections.max_load_factor(10.0);
+      // give the runner thread a name for easier debugging
+      const auto rc = ceph_pthread_setname(runner.native_handle(), "amqp_manager");
+      ceph_assert(rc==0);
   }
 
   // non copyable
@@ -789,6 +833,7 @@ public:
   connection_ptr_t connect(const std::string& url, const std::string& exchange) {
     if (stopped) {
       // TODO: increment counter
+      ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
       return nullptr;
     }
 
@@ -797,6 +842,7 @@ public:
     std::vector<char> url_cache(url.c_str(), url.c_str()+url.size()+1);
     if (AMQP_STATUS_OK != amqp_parse_url(url_cache.data(), &info)) {
       // TODO: increment counter
+      ldout(cct, 1) << "AMQP connect: URL parsing failed" << dendl;
       return nullptr;
     }
 
@@ -806,26 +852,32 @@ public:
     if (it != connections.end()) {
       if (it->second->marked_for_deletion) {
         // TODO: increment counter
+        ldout(cct, 1) << "AMQP connect: endpoint marked for deletion" << dendl;
         return nullptr;
       } else if (it->second->exchange != exchange) {
         // TODO: increment counter
+        ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl;
         return nullptr;
       }
       // connection found - return even if non-ok
+      ldout(cct, 20) << "AMQP connect: connection found" << dendl;
       return it->second;
     }
 
     // connection not found, creating a new one
     if (connection_count >= max_connections) {
       // TODO: increment counter
+      ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl;
       return nullptr;
     }
-    const auto conn = create_new_connection(info, exchange);
+    const auto conn = create_new_connection(info, exchange, cct);
     // create_new_connection must always return a connection object
     // even if error occurred during creation. 
     // in such a case the creation will be retried in the main thread
     ceph_assert(conn);
     ++connection_count;
+    ldout(cct, 10) << "AMQP connect: new connection is created. Total connections: " << connection_count << dendl;
+    ldout(cct, 10) << "AMQP connect: new connection status is: " << status_to_string(conn->status) << dendl;
     return connections.emplace(id, conn).first->second;
   }
 
@@ -833,6 +885,9 @@ public:
   int publish(connection_ptr_t& conn, 
     const std::string& topic,
     const std::string& message) {
+    if (stopped) {
+      return RGW_AMQP_STATUS_MANAGER_STOPPED;
+    }
     if (!conn || !conn->is_ok()) {
       return RGW_AMQP_STATUS_CONNECTION_CLOSED;
     }
@@ -847,6 +902,9 @@ public:
     const std::string& topic,
     const std::string& message,
     reply_callback_t cb) {
+    if (stopped) {
+      return RGW_AMQP_STATUS_MANAGER_STOPPED;
+    }
     if (!conn || !conn->is_ok()) {
       return RGW_AMQP_STATUS_CONNECTION_CLOSED;
     }
@@ -893,56 +951,85 @@ public:
 
 // singleton manager
 // note that the manager itself is not a singleton, and multiple instances may co-exist
-// TODO get parameters from conf
-Manager s_manager(256, 8192, 8192, 100);
+// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
+static Manager* s_manager = nullptr;
+
+static const size_t MAX_CONNECTIONS_DEFAULT = 256;
+static const size_t MAX_INFLIGHT_DEFAULT = 8192; 
+static const size_t MAX_QUEUE_DEFAULT = 8192;
+
+bool init(CephContext* cct) {
+  if (s_manager) {
+    return false;
+  }
+  // TODO: take conf from CephContext
+  s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 100, cct);
+  return true;
+}
+
+void shutdown() {
+  delete s_manager;
+  s_manager = nullptr;
+}
 
 connection_ptr_t connect(const std::string& url, const std::string& exchange) {
-  return s_manager.connect(url, exchange);
+  if (!s_manager) return nullptr;
+  return s_manager->connect(url, exchange);
 }
 
 int publish(connection_ptr_t& conn, 
     const std::string& topic,
     const std::string& message) {
-  return s_manager.publish(conn, topic, message);
+  if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
+  return s_manager->publish(conn, topic, message);
 }
 
 int publish_with_confirm(connection_ptr_t& conn, 
     const std::string& topic,
     const std::string& message,
     reply_callback_t cb) {
-  return s_manager.publish_with_confirm(conn, topic, message, cb);
+  if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
+  return s_manager->publish_with_confirm(conn, topic, message, cb);
 }
 
 size_t get_connection_count() {
-  return s_manager.get_connection_count();
+  if (!s_manager) return 0;
+  return s_manager->get_connection_count();
 }
   
 size_t get_inflight() {
-  return s_manager.get_inflight();
+  if (!s_manager) return 0;
+  return s_manager->get_inflight();
 }
 
 size_t get_queued() {
-  return s_manager.get_queued();
+  if (!s_manager) return 0;
+  return s_manager->get_queued();
 }
 
 size_t get_dequeued() {
-  return s_manager.get_dequeued();
+  if (!s_manager) return 0;
+  return s_manager->get_dequeued();
 }
 
 size_t get_max_connections() {
-  return s_manager.max_connections;
+  if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
+  return s_manager->max_connections;
 }
 
 size_t get_max_inflight() {
-  return s_manager.max_inflight;
+  if (!s_manager) return MAX_INFLIGHT_DEFAULT;
+  return s_manager->max_inflight;
 }
 
 size_t get_max_queue() {
-  return s_manager.max_queue;
+  if (!s_manager) return MAX_QUEUE_DEFAULT;
+  return s_manager->max_queue;
 }
 
 bool disconnect(connection_ptr_t& conn) {
-  return s_manager.disconnect(conn);
+  if (!s_manager) return false;
+  return s_manager->disconnect(conn);
 }
 
 } // namespace amqp