// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
+#include "include/compat.h"
#include "rgw_amqp.h"
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <atomic>
#include <mutex>
#include <boost/lockfree/queue.hpp>
+#include "common/dout.h"
+
+#define dout_subsys ceph_subsys_rgw
// TODO investigation, not necessarily issues:
// (1) in case of single threaded writer context use spsc_queue
namespace rgw::amqp {
// RGW AMQP status codes for publishing
-static const int RGW_AMQP_STATUS_BROKER_NACK = -0x1001;
-static const int RGW_AMQP_STATUS_CONNECTION_CLOSED = -0x1002;
-static const int RGW_AMQP_STATUS_QUEUE_FULL = -0x1003;
-static const int RGW_AMQP_STATUS_MAX_INFLIGHT = -0x1004;
+static const int RGW_AMQP_STATUS_BROKER_NACK = -0x1001;
+static const int RGW_AMQP_STATUS_CONNECTION_CLOSED = -0x1002;
+static const int RGW_AMQP_STATUS_QUEUE_FULL = -0x1003;
+static const int RGW_AMQP_STATUS_MAX_INFLIGHT = -0x1004;
+static const int RGW_AMQP_STATUS_MANAGER_STOPPED = -0x1005;
// RGW AMQP status code for connection opening
-static const int RGW_AMQP_STATUS_CONN_ALLOC_FAILED = -0x2001;
-static const int RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED = -0x2002;
-static const int RGW_AMQP_STATUS_SOCKET_OPEN_FAILED = -0x2003;
-static const int RGW_AMQP_STATUS_LOGIN_FAILED = -0x2004;
-static const int RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED = -0x2005;
+static const int RGW_AMQP_STATUS_CONN_ALLOC_FAILED = -0x2001;
+static const int RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED = -0x2002;
+static const int RGW_AMQP_STATUS_SOCKET_OPEN_FAILED = -0x2003;
+static const int RGW_AMQP_STATUS_LOGIN_FAILED = -0x2004;
+static const int RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED = -0x2005;
static const int RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED = -0x2006;
-static const int RGW_AMQP_STATUS_Q_DECLARE_FAILED = -0x2007;
+static const int RGW_AMQP_STATUS_Q_DECLARE_FAILED = -0x2007;
static const int RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED = -0x2008;
static const int RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED = -0x2009;
-static const int RGW_AMQP_RESPONSE_SOCKET_ERROR = -0x3008;
-static const int RGW_AMQP_NO_REPLY_CODE = 0x0;
+static const int RGW_AMQP_RESPONSE_SOCKET_ERROR = -0x3008;
+static const int RGW_AMQP_NO_REPLY_CODE = 0x0;
// key class for the connection list
struct connection_id_t {
};
};
+std::string to_string(const connection_id_t& id) {
+ return id.host+":"+"/"+id.vhost;
+}
+
// connection_t state cleaner
// could be used for automatic cleanup when getting out of scope
class ConnectionCleaner {
int reply_type;
int reply_code;
mutable std::atomic<int> ref_count;
+ CephContext* cct;
CallbackList callbacks;
// default ctor
status(AMQP_STATUS_OK),
reply_type(AMQP_RESPONSE_NORMAL),
reply_code(RGW_AMQP_NO_REPLY_CODE),
- ref_count(0) {}
+ ref_count(0),
+ cct(nullptr) {}
// cleanup of all internal connection resource
// the object can still remain, and internal connection
amqp_bytes_free(reply_to_queue);
reply_to_queue = amqp_empty_bytes;
// fire all remaining callbacks
- std::for_each(callbacks.begin(), callbacks.end(), [s](auto& cb_tag) {
- cb_tag.cb(s);
+ std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
+ cb_tag.cb(status);
+ ldout(cct, 20) << "AMQP destroy: invoking callback with tag=" << cb_tag.tag << dendl;
});
+ callbacks.clear();
delivery_tag = 1;
}
return "RGW_AMQP_STATUS_QUEUE_FULL";
case RGW_AMQP_STATUS_MAX_INFLIGHT:
return "RGW_AMQP_STATUS_MAX_INFLIGHT";
+ case RGW_AMQP_STATUS_MANAGER_STOPPED:
+ return "RGW_AMQP_STATUS_MANAGER_STOPPED";
case RGW_AMQP_STATUS_CONN_ALLOC_FAILED:
return "RGW_AMQP_STATUS_CONN_ALLOC_FAILED";
case RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED:
// utility function to create a new connection
connection_ptr_t create_new_connection(const amqp_connection_info& info,
- const std::string& exchange) {
+ const std::string& exchange, CephContext* cct) {
// create connection state
connection_ptr_t conn = new connection_t;
conn->exchange = exchange;
conn->user.assign(info.user);
conn->password.assign(info.password);
+ conn->cct = cct;
return create_connection(conn, info);
}
MessageQueue messages;
std::atomic<size_t> queued;
std::atomic<size_t> dequeued;
+ CephContext* const cct;
mutable std::mutex connections_lock;
std::thread runner;
if (!conn->is_ok()) {
// connection had an issue while message was in the queue
// TODO add error stats
+ ldout(conn->cct, 1) << "AMQP publish: connection had an issue while message was in the queue" << dendl;
if (message->cb) {
message->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED);
}
nullptr,
amqp_cstring_bytes(message->message.c_str()));
if (rc == AMQP_STATUS_OK) {
+ ldout(conn->cct, 20) << "AMQP publish (no callback): OK" << dendl;
return;
}
+ ldout(conn->cct, 1) << "AMQP publish (no callback): failed with error " << status_to_string(rc) << dendl;
// an error occurred, close connection
// it will be retied by the main loop
conn->destroy(rc);
amqp_cstring_bytes(message->message.c_str()));
if (rc == AMQP_STATUS_OK) {
- if (conn->callbacks.size() < max_inflight) {
+ auto const q_len = conn->callbacks.size();
+ if (q_len < max_inflight) {
+ ldout(conn->cct, 20) << "AMQP publish (with callback, tag=" << conn->delivery_tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
conn->callbacks.emplace_back(conn->delivery_tag++, message->cb);
} else {
// immediately invoke callback with error
+ ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: callback queue full" << dendl;
message->cb(RGW_AMQP_STATUS_MAX_INFLIGHT);
}
} else {
// an error occurred, close connection
// it will be retied by the main loop
+ ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: " << status_to_string(rc) << dendl;
conn->destroy(rc);
// immediately invoke callback with error
message->cb(rc);
auto& conn = conn_it->second;
// delete the connection if marked for deletion
if (conn->marked_for_deletion) {
+ ldout(conn->cct, 10) << "AMQP run: connection is deleted" << dendl;
conn->destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
std::lock_guard<std::mutex> lock(connections_lock);
// erase is safe - does not invalidate any other iterator
info.vhost = const_cast<char*>(conn_it->first.vhost.c_str());
info.user = const_cast<char*>(conn->user.c_str());
info.password = const_cast<char*>(conn->password.c_str());
+ ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
if (create_connection(conn, info)->is_ok() == false) {
+ ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed" << dendl;
// TODO: add error counter for failed retries
// TODO: add exponential backoff for retries
+ } else {
+ ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successfull" << dendl;
}
INCREMENT_AND_CONTINUE(conn_it);
}
if (rc != AMQP_STATUS_OK) {
// an error occurred, close connection
// it will be retied by the main loop
+ ldout(conn->cct, 1) << "AMQP run: connection read error: " << status_to_string(rc) << dendl;
conn->destroy(rc);
INCREMENT_AND_CONTINUE(conn_it);
}
if (frame.frame_type != AMQP_FRAME_METHOD) {
+ ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages" << dendl;
// handler is for publish confirmation only - handle only method frames
// TODO: add a counter
INCREMENT_AND_CONTINUE(conn_it);
case AMQP_CHANNEL_CLOSE_METHOD:
{
// other side closed the connection, no need to continue
+ ldout(conn->cct, 10) << "AMQP run: connection was closed by broker" << dendl;
conn->destroy(rc);
INCREMENT_AND_CONTINUE(conn_it);
}
case AMQP_BASIC_RETURN_METHOD:
// message was not delivered, returned to sender
// TODO: add a counter
+ ldout(conn->cct, 10) << "AMQP run: message delivery error" << dendl;
INCREMENT_AND_CONTINUE(conn_it);
break;
default:
// unexpected method
// TODO: add a counter
+ ldout(conn->cct, 10) << "AMQP run: unexpected message" << dendl;
INCREMENT_AND_CONTINUE(conn_it);
}
const auto& callbacks_end = conn->callbacks.end();
const auto& callbacks_begin = conn->callbacks.begin();
- const auto it = std::find(callbacks_begin, callbacks_end, tag);
- if (it != callbacks_end) {
+ const auto tag_it = std::find(callbacks_begin, callbacks_end, tag);
+ if (tag_it != callbacks_end) {
if (multiple) {
// n/ack all up to (and including) the tag
- for (auto rit = it; rit >= callbacks_begin; --rit) {
- rit->cb(result);
- conn->callbacks.erase(rit);
+ ldout(conn->cct, 20) << "AMQP run: multiple n/acks received with tag=" << tag << " and result=" << result << dendl;
+ auto it = callbacks_begin;
+ while (it->tag <= tag && it != conn->callbacks.end()) {
+ ldout(conn->cct, 20) << "AMQP run: invoking callback with tag=" << it->tag << dendl;
+ it->cb(result);
+ it = conn->callbacks.erase(it);
}
} else {
// n/ack a specific tag
- it->cb(result);
- conn->callbacks.erase(it);
+ ldout(conn->cct, 20) << "AMQP run: n/ack received, invoking callback with tag=" << tag << " and result=" << result << dendl;
+ tag_it->cb(result);
+ conn->callbacks.erase(tag_it);
}
} else {
// TODO add counter for acks with no callback
+ ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl;
}
// just increment the iterator
++conn_it;
Manager(size_t _max_connections,
size_t _max_inflight,
size_t _max_queue,
- long _usec_timeout) :
+ long _usec_timeout,
+ CephContext* _cct) :
max_connections(_max_connections),
max_inflight(_max_inflight),
max_queue(_max_queue),
messages(max_queue),
queued(0),
dequeued(0),
+ cct(_cct),
runner(&Manager::run, this) {
// The hashmap has "max connections" as the initial number of buckets,
// and allows for 10 collisions per bucket before rehash.
// This is to prevent rehashing so that iterators are not invalidated
// when a new connection is added.
connections.max_load_factor(10.0);
+ // give the runner thread a name for easier debugging
+ const auto rc = ceph_pthread_setname(runner.native_handle(), "amqp_manager");
+ ceph_assert(rc==0);
}
// non copyable
connection_ptr_t connect(const std::string& url, const std::string& exchange) {
if (stopped) {
// TODO: increment counter
+ ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
return nullptr;
}
std::vector<char> url_cache(url.c_str(), url.c_str()+url.size()+1);
if (AMQP_STATUS_OK != amqp_parse_url(url_cache.data(), &info)) {
// TODO: increment counter
+ ldout(cct, 1) << "AMQP connect: URL parsing failed" << dendl;
return nullptr;
}
if (it != connections.end()) {
if (it->second->marked_for_deletion) {
// TODO: increment counter
+ ldout(cct, 1) << "AMQP connect: endpoint marked for deletion" << dendl;
return nullptr;
} else if (it->second->exchange != exchange) {
// TODO: increment counter
+ ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl;
return nullptr;
}
// connection found - return even if non-ok
+ ldout(cct, 20) << "AMQP connect: connection found" << dendl;
return it->second;
}
// connection not found, creating a new one
if (connection_count >= max_connections) {
// TODO: increment counter
+ ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl;
return nullptr;
}
- const auto conn = create_new_connection(info, exchange);
+ const auto conn = create_new_connection(info, exchange, cct);
// create_new_connection must always return a connection object
// even if error occurred during creation.
// in such a case the creation will be retried in the main thread
ceph_assert(conn);
++connection_count;
+ ldout(cct, 10) << "AMQP connect: new connection is created. Total connections: " << connection_count << dendl;
+ ldout(cct, 10) << "AMQP connect: new connection status is: " << status_to_string(conn->status) << dendl;
return connections.emplace(id, conn).first->second;
}
int publish(connection_ptr_t& conn,
const std::string& topic,
const std::string& message) {
+ if (stopped) {
+ return RGW_AMQP_STATUS_MANAGER_STOPPED;
+ }
if (!conn || !conn->is_ok()) {
return RGW_AMQP_STATUS_CONNECTION_CLOSED;
}
const std::string& topic,
const std::string& message,
reply_callback_t cb) {
+ if (stopped) {
+ return RGW_AMQP_STATUS_MANAGER_STOPPED;
+ }
if (!conn || !conn->is_ok()) {
return RGW_AMQP_STATUS_CONNECTION_CLOSED;
}
// singleton manager
// note that the manager itself is not a singleton, and multiple instances may co-exist
-// TODO get parameters from conf
-Manager s_manager(256, 8192, 8192, 100);
+// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
+static Manager* s_manager = nullptr;
+
+static const size_t MAX_CONNECTIONS_DEFAULT = 256;
+static const size_t MAX_INFLIGHT_DEFAULT = 8192;
+static const size_t MAX_QUEUE_DEFAULT = 8192;
+
+bool init(CephContext* cct) {
+ if (s_manager) {
+ return false;
+ }
+ // TODO: take conf from CephContext
+ s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 100, cct);
+ return true;
+}
+
+void shutdown() {
+ delete s_manager;
+ s_manager = nullptr;
+}
connection_ptr_t connect(const std::string& url, const std::string& exchange) {
- return s_manager.connect(url, exchange);
+ if (!s_manager) return nullptr;
+ return s_manager->connect(url, exchange);
}
int publish(connection_ptr_t& conn,
const std::string& topic,
const std::string& message) {
- return s_manager.publish(conn, topic, message);
+ if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
+ return s_manager->publish(conn, topic, message);
}
int publish_with_confirm(connection_ptr_t& conn,
const std::string& topic,
const std::string& message,
reply_callback_t cb) {
- return s_manager.publish_with_confirm(conn, topic, message, cb);
+ if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
+ return s_manager->publish_with_confirm(conn, topic, message, cb);
}
size_t get_connection_count() {
- return s_manager.get_connection_count();
+ if (!s_manager) return 0;
+ return s_manager->get_connection_count();
}
size_t get_inflight() {
- return s_manager.get_inflight();
+ if (!s_manager) return 0;
+ return s_manager->get_inflight();
}
size_t get_queued() {
- return s_manager.get_queued();
+ if (!s_manager) return 0;
+ return s_manager->get_queued();
}
size_t get_dequeued() {
- return s_manager.get_dequeued();
+ if (!s_manager) return 0;
+ return s_manager->get_dequeued();
}
size_t get_max_connections() {
- return s_manager.max_connections;
+ if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
+ return s_manager->max_connections;
}
size_t get_max_inflight() {
- return s_manager.max_inflight;
+ if (!s_manager) return MAX_INFLIGHT_DEFAULT;
+ return s_manager->max_inflight;
}
size_t get_max_queue() {
- return s_manager.max_queue;
+ if (!s_manager) return MAX_QUEUE_DEFAULT;
+ return s_manager->max_queue;
}
bool disconnect(connection_ptr_t& conn) {
- return s_manager.disconnect(conn);
+ if (!s_manager) return false;
+ return s_manager->disconnect(conn);
}
} // namespace amqp