#include "rgw_amqp.h"
#include <amqp.h>
+#include <amqp_ssl_socket.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
#include "include/ceph_assert.h"
#include <mutex>
#include <boost/lockfree/queue.hpp>
#include "common/dout.h"
+#include <openssl/ssl.h>
#define dout_subsys ceph_subsys_rgw
static const int RGW_AMQP_STATUS_Q_DECLARE_FAILED = -0x2007;
static const int RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED = -0x2008;
static const int RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED = -0x2009;
+static const int RGW_AMQP_STATUS_SOCKET_CACERT_FAILED = -0x2010;
static const int RGW_AMQP_RESPONSE_SOCKET_ERROR = -0x3008;
static const int RGW_AMQP_NO_REPLY_CODE = 0x0;
};
+// renders a connection id as "host:port/vhost" (used in log messages)
std::string to_string(const connection_id_t& id) {
- return id.host+":"+std::to_string(id.port)+"/"+id.vhost;
+ // emit exactly one '/' between port and vhost:
+ // a vhost that already starts with '/' must not yield "//", and a
+ // named vhost without a leading '/' must not be glued onto the port number
+ const bool has_slash = !id.vhost.empty() && id.vhost.front() == '/';
+ return id.host+":"+std::to_string(id.port)+(has_slash ? "" : "/")+id.vhost;
}
// connection_t state cleaner
// it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
// since references to deleted objects may still exist in the calling code
struct connection_t {
- amqp_connection_state_t state;
+ std::atomic<amqp_connection_state_t> state;
std::string exchange;
std::string user;
std::string password;
amqp_bytes_t reply_to_queue;
- bool marked_for_deletion;
uint64_t delivery_tag;
int status;
int reply_type;
mutable std::atomic<int> ref_count;
CephContext* cct;
CallbackList callbacks;
+ ceph::coarse_real_clock::time_point next_reconnect;
+ bool mandatory;
+ bool use_ssl;
+ bool verify_ssl;
+ boost::optional<const std::string&> ca_location;
+ utime_t timestamp = ceph_clock_now();
// default ctor
connection_t() :
state(nullptr),
reply_to_queue(amqp_empty_bytes),
- marked_for_deletion(false),
delivery_tag(1),
status(AMQP_STATUS_OK),
reply_type(AMQP_RESPONSE_NORMAL),
reply_code(RGW_AMQP_NO_REPLY_CODE),
ref_count(0),
- cct(nullptr) {}
+ cct(nullptr),
+ next_reconnect(ceph::coarse_real_clock::now()),
+ mandatory(false),
+ use_ssl(false),
+ verify_ssl(false),
+ ca_location(boost::none)
+ {}
// cleanup of all internal connection resource
// the object can still remain, and internal connection
}
+// true while a connection state object is set (state is non-null)
+// with this patch there is no separate "marked for deletion" flag anymore
bool is_ok() const {
- return (state != nullptr && !marked_for_deletion);
+ return (state != nullptr);
}
// dtor also destroys the internals
return "AMQP_STATUS_SSL_CONNECTION_FAILED";
case _AMQP_STATUS_SSL_NEXT_VALUE:
return "AMQP_STATUS_INTERNAL";
+ default:
+ return "AMQP_STATUS_UNKNOWN";
}
- return "AMQP_STATUS_UNKNOWN";
}
-// TODO: add status_to_string on the connection object to prinf full status
+// TODO: add status_to_string on the connection object to print full status
return "RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED";
case RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED:
return "RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED";
+ case RGW_AMQP_STATUS_SOCKET_CACERT_FAILED:
+ return "RGW_AMQP_STATUS_SOCKET_CACERT_FAILED";
}
return to_string((amqp_status_enum)s);
}
// utility function to create a connection, when the connection object already exists
connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connection_info& info) {
- // pointer must be valid and not marked for deletion
- ceph_assert(conn && !conn->marked_for_deletion);
-
+ ceph_assert(conn);
+
// reset all status codes
conn->status = AMQP_STATUS_OK;
conn->reply_type = AMQP_RESPONSE_NORMAL;
ConnectionCleaner state_guard(state);
// create and open socket
- auto socket = amqp_tcp_socket_new(state);
+ amqp_socket_t *socket = nullptr;
+ if (info.ssl) {
+ socket = amqp_ssl_socket_new(state);
+#if AMQP_VERSION >= AMQP_VERSION_CODE(0, 10, 0, 1)
+ SSL_CTX* ssl_ctx = reinterpret_cast<SSL_CTX*>(amqp_ssl_socket_get_context(socket));
+#else
+ // taken from https://github.com/alanxz/rabbitmq-c/pull/560
+ struct hack {
+ const struct amqp_socket_class_t *klass;
+ SSL_CTX *ctx;
+ };
+
+ struct hack *h = reinterpret_cast<struct hack*>(socket);
+ SSL_CTX* ssl_ctx = h->ctx;
+#endif
+ // ensure system CA certificates get loaded
+ SSL_CTX_set_default_verify_paths(ssl_ctx);
+ }
+ else {
+ socket = amqp_tcp_socket_new(state);
+ }
+
if (!socket) {
conn->status = RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED;
return conn;
}
+ if (info.ssl) {
+ if (!conn->verify_ssl) {
+ amqp_ssl_socket_set_verify_peer(socket, 0);
+ amqp_ssl_socket_set_verify_hostname(socket, 0);
+ }
+ if (conn->ca_location.has_value()) {
+ const auto s = amqp_ssl_socket_set_cacert(socket, conn->ca_location.get().c_str());
+ if (s != AMQP_STATUS_OK) {
+ conn->status = RGW_AMQP_STATUS_SOCKET_CACERT_FAILED;
+ conn->reply_code = s;
+ return conn;
+ }
+ }
+ }
const auto s = amqp_socket_open(socket, info.host, info.port);
if (s < 0) {
conn->status = RGW_AMQP_STATUS_SOCKET_OPEN_FAILED;
// utility function to create a new connection
+// allocates a connection_t, copies the exchange, the credentials and the
+// delivery/SSL options into it, and then tries to open it via create_connection()
+// the connection object is returned in any case; use is_ok() on the result
+// to find out whether it was actually opened
+// NOTE(review): ca_location is an optional *reference* and is stored as such in the
+// connection. create_connection() reads conn->ca_location whenever an SSL socket is
+// set up, which includes later retries, so the referenced string presumably has to
+// outlive the connection - confirm with the callers, or store the value by copy
connection_ptr_t create_new_connection(const amqp_connection_info& info,
- const std::string& exchange, CephContext* cct) {
+ const std::string& exchange, bool mandatory_delivery, CephContext* cct, bool verify_ssl, boost::optional<const std::string&> ca_location) {
// create connection state
connection_ptr_t conn = new connection_t;
conn->exchange = exchange;
conn->user.assign(info.user);
conn->password.assign(info.password);
+ conn->mandatory = mandatory_delivery;
conn->cct = cct;
+ conn->use_ssl = info.ssl;
+ conn->verify_ssl = verify_ssl;
+ conn->ca_location = ca_location;
return create_connection(conn, info);
}
const size_t max_connections;
const size_t max_inflight;
const size_t max_queue;
+ const size_t max_idle_time;
private:
std::atomic<size_t> connection_count;
- bool stopped;
+ std::atomic<bool> stopped;
struct timeval read_timeout;
ConnectionList connections;
MessageQueue messages;
std::atomic<size_t> dequeued;
CephContext* const cct;
mutable std::mutex connections_lock;
+ const ceph::coarse_real_clock::duration idle_time;
+ const ceph::coarse_real_clock::duration reconnect_time;
std::thread runner;
void publish_internal(message_wrapper_t* message) {
const std::unique_ptr<message_wrapper_t> msg_owner(message);
auto& conn = message->conn;
+ conn->timestamp = ceph_clock_now();
+
if (!conn->is_ok()) {
// connection had an issue while message was in the queue
// TODO add error stats
CHANNEL_ID,
amqp_cstring_bytes(conn->exchange.c_str()),
amqp_cstring_bytes(message->topic.c_str()),
- 1, // mandatory, TODO: take from conf
+ 0, // does not have to be routable
0, // not immediate
- nullptr,
+ nullptr, // no properties needed
amqp_cstring_bytes(message->message.c_str()));
if (rc == AMQP_STATUS_OK) {
ldout(conn->cct, 20) << "AMQP publish (no callback): OK" << dendl;
CONFIRMING_CHANNEL_ID,
amqp_cstring_bytes(conn->exchange.c_str()),
amqp_cstring_bytes(message->topic.c_str()),
- 1, // mandatory, TODO: take from conf
+ conn->mandatory,
0, // not immediate
&props,
amqp_cstring_bytes(message->message.c_str()));
// (3) manages deleted connections
// (4) TODO reconnect on connection errors
// (5) TODO cleanup timedout callbacks
- void run() {
+ void run() noexcept {
amqp_frame_t frame;
while (!stopped) {
for (;conn_it != end_it;) {
auto& conn = conn_it->second;
- // delete the connection if marked for deletion
- if (conn->marked_for_deletion) {
- ldout(conn->cct, 10) << "AMQP run: connection is deleted" << dendl;
- conn->destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
- std::lock_guard lock(connections_lock);
- // erase is safe - does not invalidate any other iterator
- // lock so no insertion happens at the same time
+ const auto& conn_key = conn_it->first;
+
+ if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
+ ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
ERASE_AND_CONTINUE(conn_it, connections);
}
// try to reconnect the connection if it has an error
if (!conn->is_ok()) {
- // pointers are used temporarily inside the amqp_connection_info object
- // as read-only values, hence the assignment, and const_cast are safe here
- amqp_connection_info info;
- info.host = const_cast<char*>(conn_it->first.host.c_str());
- info.port = conn_it->first.port;
- info.vhost = const_cast<char*>(conn_it->first.vhost.c_str());
- info.user = const_cast<char*>(conn->user.c_str());
- info.password = const_cast<char*>(conn->password.c_str());
- ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
- if (create_connection(conn, info)->is_ok() == false) {
- ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed" << dendl;
- // TODO: add error counter for failed retries
- // TODO: add exponential backoff for retries
- } else {
- ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successfull" << dendl;
+ const auto now = ceph::coarse_real_clock::now();
+ if (now >= conn->next_reconnect) {
+ // pointers are used temporarily inside the amqp_connection_info object
+ // as read-only values, hence the assignment, and const_cast are safe here
+ amqp_connection_info info;
+ info.host = const_cast<char*>(conn_key.host.c_str());
+ info.port = conn_key.port;
+ info.vhost = const_cast<char*>(conn_key.vhost.c_str());
+ info.user = const_cast<char*>(conn->user.c_str());
+ info.password = const_cast<char*>(conn->password.c_str());
+ info.ssl = conn->use_ssl;
+ ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
+ if (create_connection(conn, info)->is_ok() == false) {
+ ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_key) << ") retry failed. error: " <<
+ status_to_string(conn->status) << " (" << conn->reply_code << ")" << dendl;
+ // TODO: add error counter for failed retries
+ // TODO: add exponential backoff for retries
+ conn->next_reconnect = now + reconnect_time;
+ } else {
+ ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_key) << ") retry successfull" << dendl;
+ }
}
INCREMENT_AND_CONTINUE(conn_it);
}
}
if (frame.frame_type != AMQP_FRAME_METHOD) {
- ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages" << dendl;
+ ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages. frame type: "
+ << unsigned(frame.frame_type) << dendl;
// handler is for publish confirmation only - handle only method frames
- // TODO: add a counter
INCREMENT_AND_CONTINUE(conn_it);
}
multiple = nack->multiple;
break;
}
+ case AMQP_BASIC_REJECT_METHOD:
+ {
+ result = RGW_AMQP_STATUS_BROKER_NACK;
+ const auto reject = (amqp_basic_reject_t*)frame.payload.method.decoded;
+ tag = reject->delivery_tag;
+ multiple = false;
+ break;
+ }
case AMQP_CONNECTION_CLOSE_METHOD:
// TODO on channel close, no need to reopen the connection
case AMQP_CHANNEL_CLOSE_METHOD:
}
case AMQP_BASIC_RETURN_METHOD:
// message was not delivered, returned to sender
- // TODO: add a counter
- ldout(conn->cct, 10) << "AMQP run: message delivery error" << dendl;
+ ldout(conn->cct, 10) << "AMQP run: message was not routable" << dendl;
INCREMENT_AND_CONTINUE(conn_it);
break;
default:
// unexpected method
- // TODO: add a counter
ldout(conn->cct, 10) << "AMQP run: unexpected message" << dendl;
INCREMENT_AND_CONTINUE(conn_it);
}
conn->callbacks.erase(tag_it);
}
} else {
- // TODO add counter for acks with no callback
ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl;
}
// just increment the iterator
}
-// if no messages were received or published, sleep for 100ms
+// if no messages were received or published, sleep for the configured idle time
if (count == 0 && !incoming_message) {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ std::this_thread::sleep_for(idle_time);
}
}
}
size_t _max_inflight,
size_t _max_queue,
long _usec_timeout,
+ unsigned reconnect_time_ms,
+ unsigned idle_time_ms,
CephContext* _cct) :
max_connections(_max_connections),
max_inflight(_max_inflight),
max_queue(_max_queue),
+ max_idle_time(30),
connection_count(0),
stopped(false),
read_timeout{0, _usec_timeout},
queued(0),
dequeued(0),
cct(_cct),
+ idle_time(std::chrono::milliseconds(idle_time_ms)),
+ reconnect_time(std::chrono::milliseconds(reconnect_time_ms)),
runner(&Manager::run, this) {
// The hashmap has "max connections" as the initial number of buckets,
// and allows for 10 collisions per bucket before rehash.
stopped = true;
}
- // disconnect from a broker
- bool disconnect(connection_ptr_t& conn) {
- if (!conn || stopped) {
- return false;
- }
- conn->marked_for_deletion = true;
- return true;
- }
-
// connect to a broker, or reuse an existing connection if already connected
- connection_ptr_t connect(const std::string& url, const std::string& exchange) {
+ connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
+ boost::optional<const std::string&> ca_location) {
if (stopped) {
- // TODO: increment counter
ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
return nullptr;
}
struct amqp_connection_info info;
// cache the URL so that parsing could happen in-place
std::vector<char> url_cache(url.c_str(), url.c_str()+url.size()+1);
- if (AMQP_STATUS_OK != amqp_parse_url(url_cache.data(), &info)) {
- // TODO: increment counter
- ldout(cct, 1) << "AMQP connect: URL parsing failed" << dendl;
+ const auto retcode = amqp_parse_url(url_cache.data(), &info);
+ if (AMQP_STATUS_OK != retcode) {
+ ldout(cct, 1) << "AMQP connect: URL parsing failed. error: " << retcode << dendl;
return nullptr;
}
std::lock_guard lock(connections_lock);
const auto it = connections.find(id);
if (it != connections.end()) {
- if (it->second->marked_for_deletion) {
- // TODO: increment counter
- ldout(cct, 1) << "AMQP connect: endpoint marked for deletion" << dendl;
- return nullptr;
- } else if (it->second->exchange != exchange) {
- // TODO: increment counter
+ if (it->second->exchange != exchange) {
ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl;
return nullptr;
}
// connection not found, creating a new one
if (connection_count >= max_connections) {
- // TODO: increment counter
ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl;
return nullptr;
}
- const auto conn = create_new_connection(info, exchange, cct);
+ const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct, verify_ssl, ca_location);
+ if (!conn->is_ok()) {
+ ldout(cct, 10) << "AMQP connect: connection (" << to_string(id) << ") creation failed. error:" <<
+ status_to_string(conn->status) << "(" << conn->reply_code << ")" << dendl;
+ }
// create_new_connection must always return a connection object
// even if error occurred during creation.
// in such a case the creation will be retried in the main thread
const std::string& topic,
const std::string& message) {
if (stopped) {
+ ldout(cct, 1) << "AMQP publish: manager is not running" << dendl;
return RGW_AMQP_STATUS_MANAGER_STOPPED;
}
if (!conn || !conn->is_ok()) {
+ ldout(cct, 1) << "AMQP publish: no connection" << dendl;
return RGW_AMQP_STATUS_CONNECTION_CLOSED;
}
if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) {
++queued;
return AMQP_STATUS_OK;
}
+ ldout(cct, 1) << "AMQP publish: queue is full" << dendl;
return RGW_AMQP_STATUS_QUEUE_FULL;
}
const std::string& message,
reply_callback_t cb) {
if (stopped) {
+ ldout(cct, 1) << "AMQP publish_with_confirm: manager is not running" << dendl;
return RGW_AMQP_STATUS_MANAGER_STOPPED;
}
if (!conn || !conn->is_ok()) {
+ ldout(cct, 1) << "AMQP publish_with_confirm: no connection" << dendl;
return RGW_AMQP_STATUS_CONNECTION_CLOSED;
}
if (messages.push(new message_wrapper_t(conn, topic, message, cb))) {
++queued;
return AMQP_STATUS_OK;
}
+ ldout(cct, 1) << "AMQP publish_with_confirm: queue is full" << dendl;
return RGW_AMQP_STATUS_QUEUE_FULL;
}
size_t sum = 0;
std::lock_guard lock(connections_lock);
std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) {
+ // concurrent access to the callback vector is safe without locking
sum += conn_pair.second->callbacks.size();
});
return sum;
static const size_t MAX_CONNECTIONS_DEFAULT = 256;
static const size_t MAX_INFLIGHT_DEFAULT = 8192;
static const size_t MAX_QUEUE_DEFAULT = 8192;
+// microseconds part of the manager's read timeout
+static const long READ_TIMEOUT_USEC = 100;
+// time (milliseconds) the manager thread sleeps when no message was received or published
+static const unsigned IDLE_TIME_MS = 100;
+// time (milliseconds) to wait after a failed reconnect before the next attempt
+static const unsigned RECONNECT_TIME_MS = 100;
+// creates the manager (s_manager) with the default limits and timings above
+// returns false if a manager already exists, true after creating one
bool init(CephContext* cct) {
if (s_manager) {
return false;
}
// TODO: take conf from CephContext
- s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 100, cct);
+ // the Manager ctor takes (..., usec_timeout, reconnect_time_ms, idle_time_ms, cct):
+ // the reconnect time has to be passed before the idle time
+ s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT,
+ READ_TIMEOUT_USEC, RECONNECT_TIME_MS, IDLE_TIME_MS, cct);
return true;
}
s_manager = nullptr;
}
+// connect to a broker through the manager (s_manager)
+// returns nullptr when no manager exists; otherwise all arguments are
+// forwarded unchanged to the manager's connect() and its result is returned
-connection_ptr_t connect(const std::string& url, const std::string& exchange) {
+connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
+ boost::optional<const std::string&> ca_location) {
if (!s_manager) return nullptr;
- return s_manager->connect(url, exchange);
+ return s_manager->connect(url, exchange, mandatory_delivery, verify_ssl, ca_location);
}
int publish(connection_ptr_t& conn,
return s_manager->max_queue;
}
-bool disconnect(connection_ptr_t& conn) {
- if (!s_manager) return false;
- return s_manager->disconnect(conn);
-}
-
} // namespace amqp