-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
#include "rgw_amqp.h"
#include <atomic>
#include <mutex>
#include <boost/lockfree/queue.hpp>
+#include <boost/functional/hash.hpp>
#include "common/dout.h"
#include <openssl/ssl.h>
static const int RGW_AMQP_RESPONSE_SOCKET_ERROR = -0x3008;
static const int RGW_AMQP_NO_REPLY_CODE = 0x0;
-// key class for the connection list
-struct connection_id_t {
- const std::string host;
- const int port;
- const std::string vhost;
- // constructed from amqp_connection_info struct
- connection_id_t(const amqp_connection_info& info)
- : host(info.host), port(info.port), vhost(info.vhost) {}
-
- // equality operator and hasher functor are needed
- // so that connection_id_t could be used as key in unordered_map
- bool operator==(const connection_id_t& other) const {
- return host == other.host && port == other.port && vhost == other.vhost;
+// the amqp_connection_info struct does not hold any memory and just points to the URL string
+// so, strings are copied into connection_id_t
+connection_id_t::connection_id_t(const amqp_connection_info& info, const std::string& _exchange)
+ : host(info.host), port(info.port), vhost(info.vhost), exchange(_exchange), ssl(info.ssl) {}
+
+// equality operator and hasher functor are needed
+// so that connection_id_t could be used as key in unordered_map
+bool operator==(const connection_id_t& lhs, const connection_id_t& rhs) {
+ return lhs.host == rhs.host && lhs.port == rhs.port &&
+ lhs.vhost == rhs.vhost && lhs.exchange == rhs.exchange;
+}
+
+struct connection_id_hasher {
+ std::size_t operator()(const connection_id_t& k) const {
+ std::size_t h = 0;
+ boost::hash_combine(h, k.host);
+ boost::hash_combine(h, k.port);
+ boost::hash_combine(h, k.vhost);
+ boost::hash_combine(h, k.exchange);
+ return h;
}
-
- struct hasher {
- std::size_t operator()(const connection_id_t& k) const {
- return ((std::hash<std::string>()(k.host)
- ^ (std::hash<int>()(k.port) << 1)) >> 1)
- ^ (std::hash<std::string>()(k.vhost) << 1);
- }
- };
};
std::string to_string(const connection_id_t& id) {
- return id.host+":"+std::to_string(id.port)+id.vhost;
+ return fmt::format("{}://{}:{}{}?exchange={}",
+ id.ssl ? "amqps" : "amqp",
+ id.host, id.port, id.vhost, id.exchange);
}
-// connection_t state cleaner
-// could be used for automatic cleanup when getting out of scope
+// automatically cleans amqp state when gets out of scope
class ConnectionCleaner {
private:
- amqp_connection_state_t conn;
+ amqp_connection_state_t state;
public:
- ConnectionCleaner(amqp_connection_state_t _conn) : conn(_conn) {}
+ ConnectionCleaner(amqp_connection_state_t _state) : state(_state) {}
~ConnectionCleaner() {
- if (conn) {
- amqp_destroy_connection(conn);
+ if (state) {
+ amqp_destroy_connection(state);
}
}
// call reset() if cleanup is not needed anymore
void reset() {
- conn = nullptr;
+ state = nullptr;
}
};
struct reply_callback_with_tag_t {
uint64_t tag;
reply_callback_t cb;
-
+
reply_callback_with_tag_t(uint64_t _tag, reply_callback_t _cb) : tag(_tag), cb(_cb) {}
-
+
bool operator==(uint64_t rhs) {
return tag == rhs;
}
typedef std::vector<reply_callback_with_tag_t> CallbackList;
// struct for holding the connection state object as well as the exchange
-// it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
-// since references to deleted objects may still exist in the calling code
struct connection_t {
- std::atomic<amqp_connection_state_t> state;
- std::string exchange;
+ CephContext* cct = nullptr;
+ amqp_connection_state_t state = nullptr;
+ amqp_bytes_t reply_to_queue = amqp_empty_bytes;
+ uint64_t delivery_tag = 1;
+ int status = AMQP_STATUS_OK;
+ int reply_type = AMQP_RESPONSE_NORMAL;
+ int reply_code = RGW_AMQP_NO_REPLY_CODE;
+ CallbackList callbacks;
+ ceph::coarse_real_clock::time_point next_reconnect = ceph::coarse_real_clock::now();
+ bool mandatory = false;
+ const bool use_ssl = false;
std::string user;
std::string password;
- amqp_bytes_t reply_to_queue;
- uint64_t delivery_tag;
- int status;
- int reply_type;
- int reply_code;
- mutable std::atomic<int> ref_count;
- CephContext* cct;
- CallbackList callbacks;
- ceph::coarse_real_clock::time_point next_reconnect;
- bool mandatory;
- bool use_ssl;
- bool verify_ssl;
+ bool verify_ssl = true;
boost::optional<std::string> ca_location;
utime_t timestamp = ceph_clock_now();
- // default ctor
- connection_t() :
- state(nullptr),
- reply_to_queue(amqp_empty_bytes),
- delivery_tag(1),
- status(AMQP_STATUS_OK),
- reply_type(AMQP_RESPONSE_NORMAL),
- reply_code(RGW_AMQP_NO_REPLY_CODE),
- ref_count(0),
- cct(nullptr),
- next_reconnect(ceph::coarse_real_clock::now()),
- mandatory(false),
- use_ssl(false),
- verify_ssl(false),
- ca_location(boost::none)
- {}
+ connection_t(CephContext* _cct, const amqp_connection_info& info, bool _verify_ssl, boost::optional<const std::string&> _ca_location) :
+ cct(_cct), use_ssl(info.ssl), user(info.user), password(info.password), verify_ssl(_verify_ssl), ca_location(_ca_location) {}
// cleanup of all internal connection resource
// the object can still remain, and internal connection
~connection_t() {
destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
}
-
- friend void intrusive_ptr_add_ref(const connection_t* p);
- friend void intrusive_ptr_release(const connection_t* p);
};
-// these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
-void intrusive_ptr_add_ref(const connection_t* p) {
- ++p->ref_count;
-}
-void intrusive_ptr_release(const connection_t* p) {
- if (--p->ref_count == 0) {
- delete p;
- }
-}
-
// convert connection info to string
std::string to_string(const amqp_connection_info& info) {
std::stringstream ss;
ss << "connection info:" <<
"\nHost: " << info.host <<
"\nPort: " << info.port <<
- "\nUser: " << info.user <<
+ "\nUser: " << info.user <<
"\nPassword: " << info.password <<
"\nvhost: " << info.vhost <<
"\nSSL support: " << info.ssl << std::endl;
return RGW_AMQP_NO_REPLY_CODE;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
return reply.library_error;
- case AMQP_RESPONSE_SERVER_EXCEPTION:
+ case AMQP_RESPONSE_SERVER_EXCEPTION:
if (reply.reply.decoded) {
const amqp_connection_close_t* m = (amqp_connection_close_t*)reply.reply.decoded;
return m->reply_code;
return "missing RPC reply type";
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
return amqp_error_string2(reply.library_error);
- case AMQP_RESPONSE_SERVER_EXCEPTION:
+ case AMQP_RESPONSE_SERVER_EXCEPTION:
{
switch (reply.reply.id) {
case AMQP_CONNECTION_CLOSE_METHOD:
case AMQP_STATUS_SOCKET_ERROR:
return "AMQP_STATUS_SOCKET_ERROR";
case AMQP_STATUS_INVALID_PARAMETER:
- return "AMQP_STATUS_INVALID_PARAMETER";
+ return "AMQP_STATUS_INVALID_PARAMETER";
case AMQP_STATUS_TABLE_TOO_BIG:
return "AMQP_STATUS_TABLE_TOO_BIG";
case AMQP_STATUS_WRONG_METHOD:
return "AMQP_STATUS_UNSUPPORTED";
#endif
case _AMQP_STATUS_NEXT_VALUE:
- return "AMQP_STATUS_INTERNAL";
+ return "AMQP_STATUS_INTERNAL";
case AMQP_STATUS_TCP_ERROR:
return "AMQP_STATUS_TCP_ERROR";
case AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR:
return "AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR";
case _AMQP_STATUS_TCP_NEXT_VALUE:
- return "AMQP_STATUS_INTERNAL";
+ return "AMQP_STATUS_INTERNAL";
case AMQP_STATUS_SSL_ERROR:
return "AMQP_STATUS_SSL_ERROR";
case AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED:
case AMQP_STATUS_SSL_CONNECTION_FAILED:
return "AMQP_STATUS_SSL_CONNECTION_FAILED";
case _AMQP_STATUS_SSL_NEXT_VALUE:
- return "AMQP_STATUS_INTERNAL";
+ return "AMQP_STATUS_INTERNAL";
#if AMQP_VERSION >= AMQP_VERSION_CODE(0, 11, 0, 0)
case AMQP_STATUS_SSL_SET_ENGINE_FAILED:
return "AMQP_STATUS_SSL_SET_ENGINE_FAILED";
#define RETURN_ON_ERROR(C, S, OK) \
if (!OK) { \
C->status = S; \
- return C; \
+ return false; \
}
// in case of RPC calls, getting the RPC reply and return if an error is detected
C->status = S; \
C->reply_type = reply.reply_type; \
C->reply_code = reply_to_code(reply); \
- return C; \
+ return false; \
} \
}
static const amqp_channel_t CONFIRMING_CHANNEL_ID = 2;
// utility function to create a connection, when the connection object already exists
-connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connection_info& info) {
- ceph_assert(conn);
-
+bool new_state(connection_t* conn, const connection_id_t& conn_id) {
+ // state must be null at this point
+ ceph_assert(!conn->state);
// reset all status codes
- conn->status = AMQP_STATUS_OK;
+ conn->status = AMQP_STATUS_OK;
conn->reply_type = AMQP_RESPONSE_NORMAL;
conn->reply_code = RGW_AMQP_NO_REPLY_CODE;
auto state = amqp_new_connection();
if (!state) {
conn->status = RGW_AMQP_STATUS_CONN_ALLOC_FAILED;
- return conn;
+ return false;
}
// make sure that the connection state is cleaned up in case of error
ConnectionCleaner state_guard(state);
// create and open socket
amqp_socket_t *socket = nullptr;
- if (info.ssl) {
+ if (conn->use_ssl) {
socket = amqp_ssl_socket_new(state);
#if AMQP_VERSION >= AMQP_VERSION_CODE(0, 10, 0, 1)
SSL_CTX* ssl_ctx = reinterpret_cast<SSL_CTX*>(amqp_ssl_socket_get_context(socket));
if (!socket) {
conn->status = RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED;
- return conn;
+ return false;
}
- if (info.ssl) {
+ if (conn->use_ssl) {
if (!conn->verify_ssl) {
amqp_ssl_socket_set_verify_peer(socket, 0);
amqp_ssl_socket_set_verify_hostname(socket, 0);
if (s != AMQP_STATUS_OK) {
conn->status = RGW_AMQP_STATUS_SOCKET_CACERT_FAILED;
conn->reply_code = s;
- return conn;
+ return false;
}
}
}
- const auto s = amqp_socket_open(socket, info.host, info.port);
+ const auto s = amqp_socket_open(socket, conn_id.host.c_str(), conn_id.port);
if (s < 0) {
conn->status = RGW_AMQP_STATUS_SOCKET_OPEN_FAILED;
conn->reply_type = RGW_AMQP_RESPONSE_SOCKET_ERROR;
conn->reply_code = s;
- return conn;
+ return false;
}
// login to broker
const auto reply = amqp_login(state,
- info.vhost,
+ conn_id.vhost.c_str(),
AMQP_DEFAULT_MAX_CHANNELS,
AMQP_DEFAULT_FRAME_SIZE,
0, // no heartbeat TODO: add conf
AMQP_SASL_METHOD_PLAIN, // TODO: add other types of security
- info.user,
- info.password);
+ conn->user.c_str(),
+ conn->password.c_str());
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
conn->status = RGW_AMQP_STATUS_LOGIN_FAILED;
conn->reply_type = reply.reply_type;
conn->reply_code = reply_to_code(reply);
- return conn;
+ return false;
}
// open channels
// verify that the topic exchange is there
// TODO: make this step optional
{
- const auto ok = amqp_exchange_declare(state,
+ const auto ok = amqp_exchange_declare(state,
CHANNEL_ID,
- amqp_cstring_bytes(conn->exchange.c_str()),
+ amqp_cstring_bytes(conn_id.exchange.c_str()),
amqp_cstring_bytes("topic"),
1, // passive - exchange must already exist on broker
1, // durable
}
{
// create queue for confirmations
- const auto queue_ok = amqp_queue_declare(state,
+ const auto queue_ok = amqp_queue_declare(state,
CHANNEL_ID, // use the regular channel for this call
amqp_empty_bytes, // let broker allocate queue name
- 0, // not passive - create the queue
- 0, // not durable
- 1, // exclusive
+ 0, // not passive - create the queue
+ 0, // not durable
+ 1, // exclusive
1, // auto-delete
amqp_empty_table // not args TODO add args from conf: TTL, max length etc.
);
RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_Q_DECLARE_FAILED);
// define consumption for connection
- const auto consume_ok = amqp_basic_consume(state,
- CONFIRMING_CHANNEL_ID,
+ const auto consume_ok = amqp_basic_consume(state,
+ CONFIRMING_CHANNEL_ID,
queue_ok->queue,
amqp_empty_bytes, // broker will generate consumer tag
1, // messages sent from client are never routed back
RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED, consume_ok);
RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED);
// broker generated consumer_tag could be used to cancel sending of n/acks from broker - not needed
-
+
state_guard.reset();
conn->state = state;
conn->reply_to_queue = amqp_bytes_malloc_dup(queue_ok->queue);
- return conn;
}
-}
-
-// utility function to create a new connection
-connection_ptr_t create_new_connection(const amqp_connection_info& info,
- const std::string& exchange, bool mandatory_delivery, CephContext* cct, bool verify_ssl, boost::optional<const std::string&> ca_location) {
- // create connection state
- connection_ptr_t conn = new connection_t;
- conn->exchange = exchange;
- conn->user.assign(info.user);
- conn->password.assign(info.password);
- conn->mandatory = mandatory_delivery;
- conn->cct = cct;
- conn->use_ssl = info.ssl;
- conn->verify_ssl = verify_ssl;
- conn->ca_location = ca_location;
- return create_connection(conn, info);
+ return true;
}
/// struct used for holding messages in the message queue
struct message_wrapper_t {
- connection_ptr_t conn;
+ connection_id_t conn_id;
std::string topic;
std::string message;
reply_callback_t cb;
-
- message_wrapper_t(connection_ptr_t& _conn,
+
+ message_wrapper_t(const connection_id_t& _conn_id,
const std::string& _topic,
const std::string& _message,
- reply_callback_t _cb) : conn(_conn), topic(_topic), message(_message), cb(_cb) {}
+ reply_callback_t _cb) : conn_id(_conn_id), topic(_topic), message(_message), cb(_cb) {}
};
+using connection_t_ptr = std::unique_ptr<connection_t>;
-typedef std::unordered_map<connection_id_t, connection_ptr_t, connection_id_t::hasher> ConnectionList;
+typedef std::unordered_map<connection_id_t, connection_t_ptr, connection_id_hasher> ConnectionList;
typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
// macros used inside a loop where an iterator is either incremented or erased
void publish_internal(message_wrapper_t* message) {
const std::unique_ptr<message_wrapper_t> msg_owner(message);
- auto& conn = message->conn;
+ const auto& conn_id = message->conn_id;
+ auto conn_it = connections.find(conn_id);
+ if (conn_it == connections.end()) {
+ ldout(cct, 1) << "AMQP publish: connection '" << to_string(conn_id) << "' not found" << dendl;
+ if (message->cb) {
+ message->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED);
+ }
+ return;
+ }
+
+ auto& conn = conn_it->second;
conn->timestamp = ceph_clock_now();
if (!conn->is_ok()) {
// connection had an issue while message was in the queue
- // TODO add error stats
- ldout(conn->cct, 1) << "AMQP publish: connection had an issue while message was in the queue" << dendl;
+ ldout(cct, 1) << "AMQP publish: connection '" << to_string(conn_id) << "' is closed" << dendl;
if (message->cb) {
message->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED);
}
}
if (message->cb == nullptr) {
- // TODO add error stats
const auto rc = amqp_basic_publish(conn->state,
CHANNEL_ID,
- amqp_cstring_bytes(conn->exchange.c_str()),
+ amqp_cstring_bytes(conn_id.exchange.c_str()),
amqp_cstring_bytes(message->topic.c_str()),
0, // does not have to be routable
0, // not immediate
nullptr, // no properties needed
amqp_cstring_bytes(message->message.c_str()));
if (rc == AMQP_STATUS_OK) {
- ldout(conn->cct, 20) << "AMQP publish (no callback): OK" << dendl;
+ ldout(cct, 20) << "AMQP publish (no callback): OK" << dendl;
return;
}
- ldout(conn->cct, 1) << "AMQP publish (no callback): failed with error " << status_to_string(rc) << dendl;
+ ldout(cct, 1) << "AMQP publish (no callback): failed with error " << status_to_string(rc) << dendl;
// an error occurred, close connection
// it will be retied by the main loop
conn->destroy(rc);
}
amqp_basic_properties_t props;
- props._flags =
- AMQP_BASIC_DELIVERY_MODE_FLAG |
+ props._flags =
+ AMQP_BASIC_DELIVERY_MODE_FLAG |
AMQP_BASIC_REPLY_TO_FLAG;
props.delivery_mode = 2; // persistent delivery TODO take from conf
props.reply_to = conn->reply_to_queue;
const auto rc = amqp_basic_publish(conn->state,
CONFIRMING_CHANNEL_ID,
- amqp_cstring_bytes(conn->exchange.c_str()),
+ amqp_cstring_bytes(conn_id.exchange.c_str()),
amqp_cstring_bytes(message->topic.c_str()),
conn->mandatory,
0, // not immediate
if (rc == AMQP_STATUS_OK) {
auto const q_len = conn->callbacks.size();
if (q_len < max_inflight) {
- ldout(conn->cct, 20) << "AMQP publish (with callback, tag=" << conn->delivery_tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
+ ldout(cct, 20) << "AMQP publish (with callback, tag=" << conn->delivery_tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
conn->callbacks.emplace_back(conn->delivery_tag++, message->cb);
} else {
// immediately invoke callback with error
- ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: callback queue full" << dendl;
+ ldout(cct, 1) << "AMQP publish (with callback): failed with error: callback queue full" << dendl;
message->cb(RGW_AMQP_STATUS_MAX_INFLIGHT);
}
} else {
// an error occurred, close connection
// it will be retied by the main loop
- ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: " << status_to_string(rc) << dendl;
+ ldout(cct, 1) << "AMQP publish (with callback): failed with error: " << status_to_string(rc) << dendl;
conn->destroy(rc);
// immediately invoke callback with error
message->cb(rc);
auto incoming_message = false;
// loop over all connections to read acks
for (;conn_it != end_it;) {
-
+
+ const auto& conn_id = conn_it->first;
auto& conn = conn_it->second;
- const auto& conn_key = conn_it->first;
if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
- ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
+ ldout(cct, 20) << "AMQP run: Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
ERASE_AND_CONTINUE(conn_it, connections);
}
if (now >= conn->next_reconnect) {
// pointers are used temporarily inside the amqp_connection_info object
// as read-only values, hence the assignment, and const_cast are safe here
- amqp_connection_info info;
- info.host = const_cast<char*>(conn_key.host.c_str());
- info.port = conn_key.port;
- info.vhost = const_cast<char*>(conn_key.vhost.c_str());
- info.user = const_cast<char*>(conn->user.c_str());
- info.password = const_cast<char*>(conn->password.c_str());
- info.ssl = conn->use_ssl;
- ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
- if (create_connection(conn, info)->is_ok() == false) {
- ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_key) << ") retry failed. error: " <<
+ ldout(cct, 20) << "AMQP run: retry connection" << dendl;
+ if (!new_state(conn.get(), conn_id)) {
+ ldout(cct, 10) << "AMQP run: connection '" << to_string(conn_id) << "' retry failed. error: " <<
status_to_string(conn->status) << " (" << conn->reply_code << ")" << dendl;
// TODO: add error counter for failed retries
// TODO: add exponential backoff for retries
conn->next_reconnect = now + reconnect_time;
} else {
- ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_key) << ") retry successfull" << dendl;
+ ldout(cct, 10) << "AMQP run: connection '" << to_string(conn_id) << "' retry successfull" << dendl;
}
}
INCREMENT_AND_CONTINUE(conn_it);
// TODO mark connection as idle
INCREMENT_AND_CONTINUE(conn_it);
}
-
+
// this is just to prevent spinning idle, does not indicate that a message
// was successfully processed or not
incoming_message = true;
if (rc != AMQP_STATUS_OK) {
// an error occurred, close connection
// it will be retied by the main loop
- ldout(conn->cct, 1) << "AMQP run: connection read error: " << status_to_string(rc) << dendl;
+ ldout(cct, 1) << "AMQP run: connection read error: " << status_to_string(rc) << dendl;
conn->destroy(rc);
INCREMENT_AND_CONTINUE(conn_it);
}
if (frame.frame_type != AMQP_FRAME_METHOD) {
- ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages. frame type: "
+ ldout(cct, 10) << "AMQP run: ignoring non n/ack messages. frame type: "
<< unsigned(frame.frame_type) << dendl;
// handler is for publish confirmation only - handle only method frames
INCREMENT_AND_CONTINUE(conn_it);
int result;
switch (frame.payload.method.id) {
- case AMQP_BASIC_ACK_METHOD:
+ case AMQP_BASIC_ACK_METHOD:
{
result = AMQP_STATUS_OK;
const auto ack = (amqp_basic_ack_t*)frame.payload.method.decoded;
multiple = nack->multiple;
break;
}
- case AMQP_BASIC_REJECT_METHOD:
- {
- result = RGW_AMQP_STATUS_BROKER_NACK;
- const auto reject = (amqp_basic_reject_t*)frame.payload.method.decoded;
- tag = reject->delivery_tag;
- multiple = false;
+ case AMQP_BASIC_REJECT_METHOD:
+ {
+ result = RGW_AMQP_STATUS_BROKER_NACK;
+ const auto reject = (amqp_basic_reject_t*)frame.payload.method.decoded;
+ tag = reject->delivery_tag;
+ multiple = false;
break;
}
case AMQP_CONNECTION_CLOSE_METHOD:
case AMQP_CHANNEL_CLOSE_METHOD:
{
// other side closed the connection, no need to continue
- ldout(conn->cct, 10) << "AMQP run: connection was closed by broker" << dendl;
+ ldout(cct, 10) << "AMQP run: connection was closed by broker" << dendl;
conn->destroy(rc);
INCREMENT_AND_CONTINUE(conn_it);
}
case AMQP_BASIC_RETURN_METHOD:
// message was not delivered, returned to sender
- ldout(conn->cct, 10) << "AMQP run: message was not routable" << dendl;
+ ldout(cct, 10) << "AMQP run: message was not routable" << dendl;
INCREMENT_AND_CONTINUE(conn_it);
break;
default:
// unexpected method
- ldout(conn->cct, 10) << "AMQP run: unexpected message" << dendl;
+ ldout(cct, 10) << "AMQP run: unexpected message" << dendl;
INCREMENT_AND_CONTINUE(conn_it);
}
- const auto& callbacks_end = conn->callbacks.end();
- const auto& callbacks_begin = conn->callbacks.begin();
- const auto tag_it = std::find(callbacks_begin, callbacks_end, tag);
- if (tag_it != callbacks_end) {
+ const auto tag_it = std::find(conn->callbacks.begin(), conn->callbacks.end(), tag);
+ if (tag_it != conn->callbacks.end()) {
if (multiple) {
// n/ack all up to (and including) the tag
- ldout(conn->cct, 20) << "AMQP run: multiple n/acks received with tag=" << tag << " and result=" << result << dendl;
- auto it = callbacks_begin;
+ ldout(cct, 20) << "AMQP run: multiple n/acks received with tag=" << tag << " and result=" << result << dendl;
+ auto it = conn->callbacks.begin();
while (it->tag <= tag && it != conn->callbacks.end()) {
- ldout(conn->cct, 20) << "AMQP run: invoking callback with tag=" << it->tag << dendl;
+ ldout(cct, 20) << "AMQP run: invoking callback with tag=" << it->tag << dendl;
it->cb(result);
it = conn->callbacks.erase(it);
}
} else {
// n/ack a specific tag
- ldout(conn->cct, 20) << "AMQP run: n/ack received, invoking callback with tag=" << tag << " and result=" << result << dendl;
+ ldout(cct, 20) << "AMQP run: n/ack received, invoking callback with tag=" << tag << " and result=" << result << dendl;
tag_it->cb(result);
conn->callbacks.erase(tag_it);
}
} else {
- ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl;
+ ldout(cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl;
}
// just increment the iterator
++conn_it;
public:
Manager(size_t _max_connections,
size_t _max_inflight,
- size_t _max_queue,
+ size_t _max_queue,
long _usec_timeout,
unsigned reconnect_time_ms,
unsigned idle_time_ms,
- CephContext* _cct) :
+ CephContext* _cct) :
max_connections(_max_connections),
max_inflight(_max_inflight),
max_queue(_max_queue),
idle_time(std::chrono::milliseconds(idle_time_ms)),
reconnect_time(std::chrono::milliseconds(reconnect_time_ms)),
runner(&Manager::run, this) {
- // The hashmap has "max connections" as the initial number of buckets,
+ // The hashmap has "max connections" as the initial number of buckets,
// and allows for 10 collisions per bucket before rehash.
- // This is to prevent rehashing so that iterators are not invalidated
+ // This is to prevent rehashing so that iterators are not invalidated
// when a new connection is added.
connections.max_load_factor(10.0);
// give the runner thread a name for easier debugging
}
// connect to a broker, or reuse an existing connection if already connected
- connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
+ bool connect(connection_id_t& id, const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
boost::optional<const std::string&> ca_location) {
if (stopped) {
ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
- return nullptr;
+ return false;
}
- struct amqp_connection_info info;
+ amqp_connection_info info;
// cache the URL so that parsing could happen in-place
std::vector<char> url_cache(url.c_str(), url.c_str()+url.size()+1);
const auto retcode = amqp_parse_url(url_cache.data(), &info);
if (AMQP_STATUS_OK != retcode) {
ldout(cct, 1) << "AMQP connect: URL parsing failed. error: " << retcode << dendl;
- return nullptr;
+ return false;
}
+ connection_id_t tmp_id(info, exchange);
- const connection_id_t id(info);
std::lock_guard lock(connections_lock);
- const auto it = connections.find(id);
+ const auto it = connections.find(tmp_id);
if (it != connections.end()) {
- if (it->second->exchange != exchange) {
- ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl;
- return nullptr;
- }
// connection found - return even if non-ok
ldout(cct, 20) << "AMQP connect: connection found" << dendl;
- return it->second;
+ id = it->first;
+ return true;
}
// connection not found, creating a new one
if (connection_count >= max_connections) {
ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl;
- return nullptr;
+ return false;
}
- const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct, verify_ssl, ca_location);
- if (!conn->is_ok()) {
- ldout(cct, 10) << "AMQP connect: connection (" << to_string(id) << ") creation failed. error:" <<
- status_to_string(conn->status) << "(" << conn->reply_code << ")" << dendl;
- }
- // create_new_connection must always return a connection object
- // even if error occurred during creation.
- // in such a case the creation will be retried in the main thread
- ceph_assert(conn);
+ // if error occurred during creation the creation will be retried in the main thread
++connection_count;
+ auto conn = connections.emplace(tmp_id, std::make_unique<connection_t>(cct, info, verify_ssl, ca_location)).first->second.get();
ldout(cct, 10) << "AMQP connect: new connection is created. Total connections: " << connection_count << dendl;
- ldout(cct, 10) << "AMQP connect: new connection status is: " << status_to_string(conn->status) << dendl;
- return connections.emplace(id, conn).first->second;
+ if (!new_state(conn, tmp_id)) {
+ ldout(cct, 1) << "AMQP connect: new connection '" << to_string(tmp_id) << "' is created. but state creation failed (will retry). error: " <<
+ status_to_string(conn->status) << " (" << conn->reply_code << ")" << dendl;
+ }
+ id = std::move(tmp_id);
+ return true;
}
// TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
- int publish(connection_ptr_t& conn,
+ int publish(const connection_id_t& conn_id,
const std::string& topic,
const std::string& message) {
if (stopped) {
ldout(cct, 1) << "AMQP publish: manager is not running" << dendl;
return RGW_AMQP_STATUS_MANAGER_STOPPED;
}
- if (!conn || !conn->is_ok()) {
- ldout(cct, 1) << "AMQP publish: no connection" << dendl;
- return RGW_AMQP_STATUS_CONNECTION_CLOSED;
- }
- if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) {
+ auto wrapper = std::make_unique<message_wrapper_t>(conn_id, topic, message, nullptr);
+ if (messages.push(wrapper.get())) {
+ std::ignore = wrapper.release();
++queued;
return AMQP_STATUS_OK;
}
ldout(cct, 1) << "AMQP publish: queue is full" << dendl;
return RGW_AMQP_STATUS_QUEUE_FULL;
}
-
- int publish_with_confirm(connection_ptr_t& conn,
+
+ int publish_with_confirm(const connection_id_t& conn_id,
const std::string& topic,
const std::string& message,
reply_callback_t cb) {
ldout(cct, 1) << "AMQP publish_with_confirm: manager is not running" << dendl;
return RGW_AMQP_STATUS_MANAGER_STOPPED;
}
- if (!conn || !conn->is_ok()) {
- ldout(cct, 1) << "AMQP publish_with_confirm: no connection" << dendl;
- return RGW_AMQP_STATUS_CONNECTION_CLOSED;
- }
- if (messages.push(new message_wrapper_t(conn, topic, message, cb))) {
+ auto wrapper = std::make_unique<message_wrapper_t>(conn_id, topic, message, cb);
+ if (messages.push(wrapper.get())) {
+ std::ignore = wrapper.release();
++queued;
return AMQP_STATUS_OK;
}
size_t get_connection_count() const {
return connection_count;
}
-
+
// get the number of in-flight messages
size_t get_inflight() const {
size_t sum = 0;
static Manager* s_manager = nullptr;
static const size_t MAX_CONNECTIONS_DEFAULT = 256;
-static const size_t MAX_INFLIGHT_DEFAULT = 8192;
+static const size_t MAX_INFLIGHT_DEFAULT = 8192;
static const size_t MAX_QUEUE_DEFAULT = 8192;
static const long READ_TIMEOUT_USEC = 100;
static const unsigned IDLE_TIME_MS = 100;
return false;
}
// TODO: take conf from CephContext
- s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT,
+ s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT,
READ_TIMEOUT_USEC, IDLE_TIME_MS, RECONNECT_TIME_MS, cct);
return true;
}
s_manager = nullptr;
}
-connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
+bool connect(connection_id_t& conn_id, const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
boost::optional<const std::string&> ca_location) {
- if (!s_manager) return nullptr;
- return s_manager->connect(url, exchange, mandatory_delivery, verify_ssl, ca_location);
+ if (!s_manager) return false;
+ return s_manager->connect(conn_id, url, exchange, mandatory_delivery, verify_ssl, ca_location);
}
-int publish(connection_ptr_t& conn,
+int publish(const connection_id_t& conn_id,
const std::string& topic,
const std::string& message) {
if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
- return s_manager->publish(conn, topic, message);
+ return s_manager->publish(conn_id, topic, message);
}
-int publish_with_confirm(connection_ptr_t& conn,
+int publish_with_confirm(const connection_id_t& conn_id,
const std::string& topic,
const std::string& message,
reply_callback_t cb) {
if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
- return s_manager->publish_with_confirm(conn, topic, message, cb);
+ return s_manager->publish_with_confirm(conn_id, topic, message, cb);
}
size_t get_connection_count() {
if (!s_manager) return 0;
return s_manager->get_connection_count();
}
-
+
size_t get_inflight() {
if (!s_manager) return 0;
return s_manager->get_inflight();