1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
6 #include <amqp_ssl_socket.h>
7 #include <amqp_tcp_socket.h>
8 #include <amqp_framing.h>
9 #include "include/ceph_assert.h"
12 #include <unordered_map>
18 #include <boost/lockfree/queue.hpp>
19 #include <boost/functional/hash.hpp>
20 #include "common/dout.h"
21 #include <openssl/ssl.h>
23 #define dout_subsys ceph_subsys_rgw
25 // TODO investigation, not necessarily issues:
26 // (1) in case of single threaded writer context use spsc_queue
27 // (2) support multiple channels
28 // (3) check performance of emptying queue to local list, and go over the list and publish
29 // (4) use std::shared_mutex (c++17) or equivalent for the connections lock
33 // RGW AMQP status codes for publishing
34 static const int RGW_AMQP_STATUS_BROKER_NACK
= -0x1001;
35 static const int RGW_AMQP_STATUS_CONNECTION_CLOSED
= -0x1002;
36 static const int RGW_AMQP_STATUS_QUEUE_FULL
= -0x1003;
37 static const int RGW_AMQP_STATUS_MAX_INFLIGHT
= -0x1004;
38 static const int RGW_AMQP_STATUS_MANAGER_STOPPED
= -0x1005;
39 // RGW AMQP status code for connection opening
40 static const int RGW_AMQP_STATUS_CONN_ALLOC_FAILED
= -0x2001;
41 static const int RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED
= -0x2002;
42 static const int RGW_AMQP_STATUS_SOCKET_OPEN_FAILED
= -0x2003;
43 static const int RGW_AMQP_STATUS_LOGIN_FAILED
= -0x2004;
44 static const int RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
= -0x2005;
45 static const int RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED
= -0x2006;
46 static const int RGW_AMQP_STATUS_Q_DECLARE_FAILED
= -0x2007;
47 static const int RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED
= -0x2008;
48 static const int RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED
= -0x2009;
49 static const int RGW_AMQP_STATUS_SOCKET_CACERT_FAILED
= -0x2010;
51 static const int RGW_AMQP_RESPONSE_SOCKET_ERROR
= -0x3008;
52 static const int RGW_AMQP_NO_REPLY_CODE
= 0x0;
54 // the amqp_connection_info struct does not hold any memory and just points to the URL string
55 // so, strings are copied into connection_id_t
56 connection_id_t::connection_id_t(const amqp_connection_info
& info
, const std::string
& _exchange
)
57 : host(info
.host
), port(info
.port
), vhost(info
.vhost
), exchange(_exchange
), ssl(info
.ssl
) {}
59 // equality operator and hasher functor are needed
60 // so that connection_id_t could be used as key in unordered_map
61 bool operator==(const connection_id_t
& lhs
, const connection_id_t
& rhs
) {
62 return lhs
.host
== rhs
.host
&& lhs
.port
== rhs
.port
&&
63 lhs
.vhost
== rhs
.vhost
&& lhs
.exchange
== rhs
.exchange
;
66 struct connection_id_hasher
{
67 std::size_t operator()(const connection_id_t
& k
) const {
69 boost::hash_combine(h
, k
.host
);
70 boost::hash_combine(h
, k
.port
);
71 boost::hash_combine(h
, k
.vhost
);
72 boost::hash_combine(h
, k
.exchange
);
77 std::string
to_string(const connection_id_t
& id
) {
78 return fmt::format("{}://{}:{}{}?exchange={}",
79 id
.ssl
? "amqps" : "amqp",
80 id
.host
, id
.port
, id
.vhost
, id
.exchange
);
83 // automatically cleans amqp state when gets out of scope
84 class ConnectionCleaner
{
86 amqp_connection_state_t state
;
88 ConnectionCleaner(amqp_connection_state_t _state
) : state(_state
) {}
89 ~ConnectionCleaner() {
91 amqp_destroy_connection(state
);
94 // call reset() if cleanup is not needed anymore
100 // struct for holding the callback and its tag in the callback list
101 struct reply_callback_with_tag_t
{
105 reply_callback_with_tag_t(uint64_t _tag
, reply_callback_t _cb
) : tag(_tag
), cb(_cb
) {}
107 bool operator==(uint64_t rhs
) {
112 typedef std::vector
<reply_callback_with_tag_t
> CallbackList
;
114 // struct for holding the connection state object as well as the exchange
115 struct connection_t
{
116 CephContext
* cct
= nullptr;
117 amqp_connection_state_t state
= nullptr;
118 amqp_bytes_t reply_to_queue
= amqp_empty_bytes
;
119 uint64_t delivery_tag
= 1;
120 int status
= AMQP_STATUS_OK
;
121 int reply_type
= AMQP_RESPONSE_NORMAL
;
122 int reply_code
= RGW_AMQP_NO_REPLY_CODE
;
123 CallbackList callbacks
;
124 ceph::coarse_real_clock::time_point next_reconnect
= ceph::coarse_real_clock::now();
125 bool mandatory
= false;
126 const bool use_ssl
= false;
128 std::string password
;
129 bool verify_ssl
= true;
130 boost::optional
<std::string
> ca_location
;
131 utime_t timestamp
= ceph_clock_now();
133 connection_t(CephContext
* _cct
, const amqp_connection_info
& info
, bool _verify_ssl
, boost::optional
<const std::string
&> _ca_location
) :
134 cct(_cct
), use_ssl(info
.ssl
), user(info
.user
), password(info
.password
), verify_ssl(_verify_ssl
), ca_location(_ca_location
) {}
136 // cleanup of all internal connection resource
137 // the object can still remain, and internal connection
138 // resources created again on successful reconnection
139 void destroy(int s
) {
141 ConnectionCleaner
clean_state(state
);
143 amqp_bytes_free(reply_to_queue
);
144 reply_to_queue
= amqp_empty_bytes
;
145 // fire all remaining callbacks
146 std::for_each(callbacks
.begin(), callbacks
.end(), [this](auto& cb_tag
) {
148 ldout(cct
, 20) << "AMQP destroy: invoking callback with tag=" << cb_tag
.tag
<< dendl
;
155 return (state
!= nullptr);
158 // dtor also destroys the internals
160 destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED
);
164 // convert connection info to string
165 std::string
to_string(const amqp_connection_info
& info
) {
166 std::stringstream ss
;
167 ss
<< "connection info:" <<
168 "\nHost: " << info
.host
<<
169 "\nPort: " << info
.port
<<
170 "\nUser: " << info
.user
<<
171 "\nPassword: " << info
.password
<<
172 "\nvhost: " << info
.vhost
<<
173 "\nSSL support: " << info
.ssl
<< std::endl
;
177 // convert reply to error code
178 int reply_to_code(const amqp_rpc_reply_t
& reply
) {
179 switch (reply
.reply_type
) {
180 case AMQP_RESPONSE_NONE
:
181 case AMQP_RESPONSE_NORMAL
:
182 return RGW_AMQP_NO_REPLY_CODE
;
183 case AMQP_RESPONSE_LIBRARY_EXCEPTION
:
184 return reply
.library_error
;
185 case AMQP_RESPONSE_SERVER_EXCEPTION
:
186 if (reply
.reply
.decoded
) {
187 const amqp_connection_close_t
* m
= (amqp_connection_close_t
*)reply
.reply
.decoded
;
188 return m
->reply_code
;
190 return reply
.reply
.id
;
192 return RGW_AMQP_NO_REPLY_CODE
;
195 // convert reply to string
196 std::string
to_string(const amqp_rpc_reply_t
& reply
) {
197 std::stringstream ss
;
198 switch (reply
.reply_type
) {
199 case AMQP_RESPONSE_NORMAL
:
201 case AMQP_RESPONSE_NONE
:
202 return "missing RPC reply type";
203 case AMQP_RESPONSE_LIBRARY_EXCEPTION
:
204 return amqp_error_string2(reply
.library_error
);
205 case AMQP_RESPONSE_SERVER_EXCEPTION
:
207 switch (reply
.reply
.id
) {
208 case AMQP_CONNECTION_CLOSE_METHOD
:
209 ss
<< "server connection error: ";
211 case AMQP_CHANNEL_CLOSE_METHOD
:
212 ss
<< "server channel error: ";
215 ss
<< "server unknown error: ";
218 if (reply
.reply
.decoded
) {
219 amqp_connection_close_t
* m
= (amqp_connection_close_t
*)reply
.reply
.decoded
;
220 ss
<< m
->reply_code
<< " text: " << std::string((char*)m
->reply_text
.bytes
, m
->reply_text
.len
);
225 ss
<< "unknown error, method id: " << reply
.reply
.id
;
230 // convert status enum to string
231 std::string
to_string(amqp_status_enum s
) {
234 return "AMQP_STATUS_OK";
235 case AMQP_STATUS_NO_MEMORY
:
236 return "AMQP_STATUS_NO_MEMORY";
237 case AMQP_STATUS_BAD_AMQP_DATA
:
238 return "AMQP_STATUS_BAD_AMQP_DATA";
239 case AMQP_STATUS_UNKNOWN_CLASS
:
240 return "AMQP_STATUS_UNKNOWN_CLASS";
241 case AMQP_STATUS_UNKNOWN_METHOD
:
242 return "AMQP_STATUS_UNKNOWN_METHOD";
243 case AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED
:
244 return "AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED";
245 case AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION
:
246 return "AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION";
247 case AMQP_STATUS_CONNECTION_CLOSED
:
248 return "AMQP_STATUS_CONNECTION_CLOSED";
249 case AMQP_STATUS_BAD_URL
:
250 return "AMQP_STATUS_BAD_URL";
251 case AMQP_STATUS_SOCKET_ERROR
:
252 return "AMQP_STATUS_SOCKET_ERROR";
253 case AMQP_STATUS_INVALID_PARAMETER
:
254 return "AMQP_STATUS_INVALID_PARAMETER";
255 case AMQP_STATUS_TABLE_TOO_BIG
:
256 return "AMQP_STATUS_TABLE_TOO_BIG";
257 case AMQP_STATUS_WRONG_METHOD
:
258 return "AMQP_STATUS_WRONG_METHOD";
259 case AMQP_STATUS_TIMEOUT
:
260 return "AMQP_STATUS_TIMEOUT";
261 case AMQP_STATUS_TIMER_FAILURE
:
262 return "AMQP_STATUS_TIMER_FAILURE";
263 case AMQP_STATUS_HEARTBEAT_TIMEOUT
:
264 return "AMQP_STATUS_HEARTBEAT_TIMEOUT";
265 case AMQP_STATUS_UNEXPECTED_STATE
:
266 return "AMQP_STATUS_UNEXPECTED_STATE";
267 case AMQP_STATUS_SOCKET_CLOSED
:
268 return "AMQP_STATUS_SOCKET_CLOSED";
269 case AMQP_STATUS_SOCKET_INUSE
:
270 return "AMQP_STATUS_SOCKET_INUSE";
271 case AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD
:
272 return "AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD";
273 #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 8, 0, 0)
274 case AMQP_STATUS_UNSUPPORTED
:
275 return "AMQP_STATUS_UNSUPPORTED";
277 case _AMQP_STATUS_NEXT_VALUE
:
278 return "AMQP_STATUS_INTERNAL";
279 case AMQP_STATUS_TCP_ERROR
:
280 return "AMQP_STATUS_TCP_ERROR";
281 case AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR
:
282 return "AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR";
283 case _AMQP_STATUS_TCP_NEXT_VALUE
:
284 return "AMQP_STATUS_INTERNAL";
285 case AMQP_STATUS_SSL_ERROR
:
286 return "AMQP_STATUS_SSL_ERROR";
287 case AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED
:
288 return "AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED";
289 case AMQP_STATUS_SSL_PEER_VERIFY_FAILED
:
290 return "AMQP_STATUS_SSL_PEER_VERIFY_FAILED";
291 case AMQP_STATUS_SSL_CONNECTION_FAILED
:
292 return "AMQP_STATUS_SSL_CONNECTION_FAILED";
293 case _AMQP_STATUS_SSL_NEXT_VALUE
:
294 return "AMQP_STATUS_INTERNAL";
295 #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 11, 0, 0)
296 case AMQP_STATUS_SSL_SET_ENGINE_FAILED
:
297 return "AMQP_STATUS_SSL_SET_ENGINE_FAILED";
300 return "AMQP_STATUS_UNKNOWN";
304 // TODO: add status_to_string on the connection object to prinf full status
306 // convert int status to string - including RGW specific values
307 std::string
status_to_string(int s
) {
309 case RGW_AMQP_STATUS_BROKER_NACK
:
310 return "RGW_AMQP_STATUS_BROKER_NACK";
311 case RGW_AMQP_STATUS_CONNECTION_CLOSED
:
312 return "RGW_AMQP_STATUS_CONNECTION_CLOSED";
313 case RGW_AMQP_STATUS_QUEUE_FULL
:
314 return "RGW_AMQP_STATUS_QUEUE_FULL";
315 case RGW_AMQP_STATUS_MAX_INFLIGHT
:
316 return "RGW_AMQP_STATUS_MAX_INFLIGHT";
317 case RGW_AMQP_STATUS_MANAGER_STOPPED
:
318 return "RGW_AMQP_STATUS_MANAGER_STOPPED";
319 case RGW_AMQP_STATUS_CONN_ALLOC_FAILED
:
320 return "RGW_AMQP_STATUS_CONN_ALLOC_FAILED";
321 case RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED
:
322 return "RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED";
323 case RGW_AMQP_STATUS_SOCKET_OPEN_FAILED
:
324 return "RGW_AMQP_STATUS_SOCKET_OPEN_FAILED";
325 case RGW_AMQP_STATUS_LOGIN_FAILED
:
326 return "RGW_AMQP_STATUS_LOGIN_FAILED";
327 case RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
:
328 return "RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED";
329 case RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED
:
330 return "RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED";
331 case RGW_AMQP_STATUS_Q_DECLARE_FAILED
:
332 return "RGW_AMQP_STATUS_Q_DECLARE_FAILED";
333 case RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED
:
334 return "RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED";
335 case RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED
:
336 return "RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED";
337 case RGW_AMQP_STATUS_SOCKET_CACERT_FAILED
:
338 return "RGW_AMQP_STATUS_SOCKET_CACERT_FAILED";
340 return to_string((amqp_status_enum
)s
);
343 // check the result from calls and return if error (=null)
344 #define RETURN_ON_ERROR(C, S, OK) \
350 // in case of RPC calls, getting the RPC reply and return if an error is detected
351 #define RETURN_ON_REPLY_ERROR(C, ST, S) { \
352 const auto reply = amqp_get_rpc_reply(ST); \
353 if (reply.reply_type != AMQP_RESPONSE_NORMAL) { \
355 C->reply_type = reply.reply_type; \
356 C->reply_code = reply_to_code(reply); \
361 static const amqp_channel_t CHANNEL_ID
= 1;
362 static const amqp_channel_t CONFIRMING_CHANNEL_ID
= 2;
364 // utility function to create a connection, when the connection object already exists
365 bool new_state(connection_t
* conn
, const connection_id_t
& conn_id
) {
366 // state must be null at this point
367 ceph_assert(!conn
->state
);
368 // reset all status codes
369 conn
->status
= AMQP_STATUS_OK
;
370 conn
->reply_type
= AMQP_RESPONSE_NORMAL
;
371 conn
->reply_code
= RGW_AMQP_NO_REPLY_CODE
;
373 auto state
= amqp_new_connection();
375 conn
->status
= RGW_AMQP_STATUS_CONN_ALLOC_FAILED
;
378 // make sure that the connection state is cleaned up in case of error
379 ConnectionCleaner
state_guard(state
);
381 // create and open socket
382 amqp_socket_t
*socket
= nullptr;
384 socket
= amqp_ssl_socket_new(state
);
385 #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 10, 0, 1)
386 SSL_CTX
* ssl_ctx
= reinterpret_cast<SSL_CTX
*>(amqp_ssl_socket_get_context(socket
));
388 // taken from https://github.com/alanxz/rabbitmq-c/pull/560
390 const struct amqp_socket_class_t
*klass
;
394 struct hack
*h
= reinterpret_cast<struct hack
*>(socket
);
395 SSL_CTX
* ssl_ctx
= h
->ctx
;
397 // ensure system CA certificates get loaded
398 SSL_CTX_set_default_verify_paths(ssl_ctx
);
401 socket
= amqp_tcp_socket_new(state
);
405 conn
->status
= RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED
;
409 if (!conn
->verify_ssl
) {
410 amqp_ssl_socket_set_verify_peer(socket
, 0);
411 amqp_ssl_socket_set_verify_hostname(socket
, 0);
413 if (conn
->ca_location
.has_value()) {
414 const auto s
= amqp_ssl_socket_set_cacert(socket
, conn
->ca_location
.get().c_str());
415 if (s
!= AMQP_STATUS_OK
) {
416 conn
->status
= RGW_AMQP_STATUS_SOCKET_CACERT_FAILED
;
417 conn
->reply_code
= s
;
422 const auto s
= amqp_socket_open(socket
, conn_id
.host
.c_str(), conn_id
.port
);
424 conn
->status
= RGW_AMQP_STATUS_SOCKET_OPEN_FAILED
;
425 conn
->reply_type
= RGW_AMQP_RESPONSE_SOCKET_ERROR
;
426 conn
->reply_code
= s
;
431 const auto reply
= amqp_login(state
,
432 conn_id
.vhost
.c_str(),
433 AMQP_DEFAULT_MAX_CHANNELS
,
434 AMQP_DEFAULT_FRAME_SIZE
,
435 0, // no heartbeat TODO: add conf
436 AMQP_SASL_METHOD_PLAIN
, // TODO: add other types of security
438 conn
->password
.c_str());
439 if (reply
.reply_type
!= AMQP_RESPONSE_NORMAL
) {
440 conn
->status
= RGW_AMQP_STATUS_LOGIN_FAILED
;
441 conn
->reply_type
= reply
.reply_type
;
442 conn
->reply_code
= reply_to_code(reply
);
448 const auto ok
= amqp_channel_open(state
, CHANNEL_ID
);
449 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
, ok
);
450 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
);
453 const auto ok
= amqp_channel_open(state
, CONFIRMING_CHANNEL_ID
);
454 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
, ok
);
455 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
);
458 const auto ok
= amqp_confirm_select(state
, CONFIRMING_CHANNEL_ID
);
459 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED
, ok
);
460 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED
);
463 // verify that the topic exchange is there
464 // TODO: make this step optional
466 const auto ok
= amqp_exchange_declare(state
,
468 amqp_cstring_bytes(conn_id
.exchange
.c_str()),
469 amqp_cstring_bytes("topic"),
470 1, // passive - exchange must already exist on broker
472 0, // dont auto-delete
475 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED
, ok
);
476 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED
);
479 // create queue for confirmations
480 const auto queue_ok
= amqp_queue_declare(state
,
481 CHANNEL_ID
, // use the regular channel for this call
482 amqp_empty_bytes
, // let broker allocate queue name
483 0, // not passive - create the queue
487 amqp_empty_table
// not args TODO add args from conf: TTL, max length etc.
489 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_Q_DECLARE_FAILED
, queue_ok
);
490 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_Q_DECLARE_FAILED
);
492 // define consumption for connection
493 const auto consume_ok
= amqp_basic_consume(state
,
494 CONFIRMING_CHANNEL_ID
,
496 amqp_empty_bytes
, // broker will generate consumer tag
497 1, // messages sent from client are never routed back
498 1, // client does not ack thr acks
499 1, // exclusive access to queue
500 amqp_empty_table
// no parameters
503 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED
, consume_ok
);
504 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED
);
505 // broker generated consumer_tag could be used to cancel sending of n/acks from broker - not needed
509 conn
->reply_to_queue
= amqp_bytes_malloc_dup(queue_ok
->queue
);
514 /// struct used for holding messages in the message queue
515 struct message_wrapper_t
{
516 connection_id_t conn_id
;
521 message_wrapper_t(const connection_id_t
& _conn_id
,
522 const std::string
& _topic
,
523 const std::string
& _message
,
524 reply_callback_t _cb
) : conn_id(_conn_id
), topic(_topic
), message(_message
), cb(_cb
) {}
527 using connection_t_ptr
= std::unique_ptr
<connection_t
>;
529 typedef std::unordered_map
<connection_id_t
, connection_t_ptr
, connection_id_hasher
> ConnectionList
;
530 typedef boost::lockfree::queue
<message_wrapper_t
*, boost::lockfree::fixed_sized
<true>> MessageQueue
;
532 // macros used inside a loop where an iterator is either incremented or erased
533 #define INCREMENT_AND_CONTINUE(IT) \
537 #define ERASE_AND_CONTINUE(IT,CONTAINER) \
538 IT=CONTAINER.erase(IT); \
539 --connection_count; \
544 const size_t max_connections
;
545 const size_t max_inflight
;
546 const size_t max_queue
;
547 const size_t max_idle_time
;
549 std::atomic
<size_t> connection_count
;
550 std::atomic
<bool> stopped
;
551 struct timeval read_timeout
;
552 ConnectionList connections
;
553 MessageQueue messages
;
554 std::atomic
<size_t> queued
;
555 std::atomic
<size_t> dequeued
;
556 CephContext
* const cct
;
557 mutable std::mutex connections_lock
;
558 const ceph::coarse_real_clock::duration idle_time
;
559 const ceph::coarse_real_clock::duration reconnect_time
;
562 void publish_internal(message_wrapper_t
* message
) {
563 const std::unique_ptr
<message_wrapper_t
> msg_owner(message
);
564 const auto& conn_id
= message
->conn_id
;
565 auto conn_it
= connections
.find(conn_id
);
566 if (conn_it
== connections
.end()) {
567 ldout(cct
, 1) << "AMQP publish: connection '" << to_string(conn_id
) << "' not found" << dendl
;
569 message
->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED
);
574 auto& conn
= conn_it
->second
;
576 conn
->timestamp
= ceph_clock_now();
578 if (!conn
->is_ok()) {
579 // connection had an issue while message was in the queue
580 ldout(cct
, 1) << "AMQP publish: connection '" << to_string(conn_id
) << "' is closed" << dendl
;
582 message
->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED
);
587 if (message
->cb
== nullptr) {
588 const auto rc
= amqp_basic_publish(conn
->state
,
590 amqp_cstring_bytes(conn_id
.exchange
.c_str()),
591 amqp_cstring_bytes(message
->topic
.c_str()),
592 0, // does not have to be routable
594 nullptr, // no properties needed
595 amqp_cstring_bytes(message
->message
.c_str()));
596 if (rc
== AMQP_STATUS_OK
) {
597 ldout(cct
, 20) << "AMQP publish (no callback): OK" << dendl
;
600 ldout(cct
, 1) << "AMQP publish (no callback): failed with error " << status_to_string(rc
) << dendl
;
601 // an error occurred, close connection
602 // it will be retied by the main loop
607 amqp_basic_properties_t props
;
609 AMQP_BASIC_DELIVERY_MODE_FLAG
|
610 AMQP_BASIC_REPLY_TO_FLAG
;
611 props
.delivery_mode
= 2; // persistent delivery TODO take from conf
612 props
.reply_to
= conn
->reply_to_queue
;
614 const auto rc
= amqp_basic_publish(conn
->state
,
615 CONFIRMING_CHANNEL_ID
,
616 amqp_cstring_bytes(conn_id
.exchange
.c_str()),
617 amqp_cstring_bytes(message
->topic
.c_str()),
621 amqp_cstring_bytes(message
->message
.c_str()));
623 if (rc
== AMQP_STATUS_OK
) {
624 auto const q_len
= conn
->callbacks
.size();
625 if (q_len
< max_inflight
) {
626 ldout(cct
, 20) << "AMQP publish (with callback, tag=" << conn
->delivery_tag
<< "): OK. Queue has: " << q_len
<< " callbacks" << dendl
;
627 conn
->callbacks
.emplace_back(conn
->delivery_tag
++, message
->cb
);
629 // immediately invoke callback with error
630 ldout(cct
, 1) << "AMQP publish (with callback): failed with error: callback queue full" << dendl
;
631 message
->cb(RGW_AMQP_STATUS_MAX_INFLIGHT
);
634 // an error occurred, close connection
635 // it will be retied by the main loop
636 ldout(cct
, 1) << "AMQP publish (with callback): failed with error: " << status_to_string(rc
) << dendl
;
638 // immediately invoke callback with error
643 // the managers thread:
644 // (1) empty the queue of messages to be published
645 // (2) loop over all connections and read acks
646 // (3) manages deleted connections
647 // (4) TODO reconnect on connection errors
648 // (5) TODO cleanup timedout callbacks
649 void run() noexcept
{
653 // publish all messages in the queue
654 const auto count
= messages
.consume_all(std::bind(&Manager::publish_internal
, this, std::placeholders::_1
));
656 ConnectionList::iterator conn_it
;
657 ConnectionList::const_iterator end_it
;
659 // thread safe access to the connection list
660 // once the iterators are fetched they are guaranteed to remain valid
661 std::lock_guard
lock(connections_lock
);
662 conn_it
= connections
.begin();
663 end_it
= connections
.end();
665 auto incoming_message
= false;
666 // loop over all connections to read acks
667 for (;conn_it
!= end_it
;) {
669 const auto& conn_id
= conn_it
->first
;
670 auto& conn
= conn_it
->second
;
672 if(conn
->timestamp
.sec() + max_idle_time
< ceph_clock_now()) {
673 ldout(cct
, 20) << "AMQP run: Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl
;
674 ERASE_AND_CONTINUE(conn_it
, connections
);
677 // try to reconnect the connection if it has an error
678 if (!conn
->is_ok()) {
679 const auto now
= ceph::coarse_real_clock::now();
680 if (now
>= conn
->next_reconnect
) {
681 // pointers are used temporarily inside the amqp_connection_info object
682 // as read-only values, hence the assignment, and const_cast are safe here
683 ldout(cct
, 20) << "AMQP run: retry connection" << dendl
;
684 if (!new_state(conn
.get(), conn_id
)) {
685 ldout(cct
, 10) << "AMQP run: connection '" << to_string(conn_id
) << "' retry failed. error: " <<
686 status_to_string(conn
->status
) << " (" << conn
->reply_code
<< ")" << dendl
;
687 // TODO: add error counter for failed retries
688 // TODO: add exponential backoff for retries
689 conn
->next_reconnect
= now
+ reconnect_time
;
691 ldout(cct
, 10) << "AMQP run: connection '" << to_string(conn_id
) << "' retry successfull" << dendl
;
694 INCREMENT_AND_CONTINUE(conn_it
);
697 const auto rc
= amqp_simple_wait_frame_noblock(conn
->state
, &frame
, &read_timeout
);
699 if (rc
== AMQP_STATUS_TIMEOUT
) {
700 // TODO mark connection as idle
701 INCREMENT_AND_CONTINUE(conn_it
);
704 // this is just to prevent spinning idle, does not indicate that a message
705 // was successfully processed or not
706 incoming_message
= true;
708 // check if error occurred that require reopening the connection
709 if (rc
!= AMQP_STATUS_OK
) {
710 // an error occurred, close connection
711 // it will be retied by the main loop
712 ldout(cct
, 1) << "AMQP run: connection read error: " << status_to_string(rc
) << dendl
;
714 INCREMENT_AND_CONTINUE(conn_it
);
717 if (frame
.frame_type
!= AMQP_FRAME_METHOD
) {
718 ldout(cct
, 10) << "AMQP run: ignoring non n/ack messages. frame type: "
719 << unsigned(frame
.frame_type
) << dendl
;
720 // handler is for publish confirmation only - handle only method frames
721 INCREMENT_AND_CONTINUE(conn_it
);
728 switch (frame
.payload
.method
.id
) {
729 case AMQP_BASIC_ACK_METHOD
:
731 result
= AMQP_STATUS_OK
;
732 const auto ack
= (amqp_basic_ack_t
*)frame
.payload
.method
.decoded
;
734 tag
= ack
->delivery_tag
;
735 multiple
= ack
->multiple
;
738 case AMQP_BASIC_NACK_METHOD
:
740 result
= RGW_AMQP_STATUS_BROKER_NACK
;
741 const auto nack
= (amqp_basic_nack_t
*)frame
.payload
.method
.decoded
;
743 tag
= nack
->delivery_tag
;
744 multiple
= nack
->multiple
;
747 case AMQP_BASIC_REJECT_METHOD
:
749 result
= RGW_AMQP_STATUS_BROKER_NACK
;
750 const auto reject
= (amqp_basic_reject_t
*)frame
.payload
.method
.decoded
;
751 tag
= reject
->delivery_tag
;
755 case AMQP_CONNECTION_CLOSE_METHOD
:
756 // TODO on channel close, no need to reopen the connection
757 case AMQP_CHANNEL_CLOSE_METHOD
:
759 // other side closed the connection, no need to continue
760 ldout(cct
, 10) << "AMQP run: connection was closed by broker" << dendl
;
762 INCREMENT_AND_CONTINUE(conn_it
);
764 case AMQP_BASIC_RETURN_METHOD
:
765 // message was not delivered, returned to sender
766 ldout(cct
, 10) << "AMQP run: message was not routable" << dendl
;
767 INCREMENT_AND_CONTINUE(conn_it
);
771 ldout(cct
, 10) << "AMQP run: unexpected message" << dendl
;
772 INCREMENT_AND_CONTINUE(conn_it
);
775 const auto tag_it
= std::find(conn
->callbacks
.begin(), conn
->callbacks
.end(), tag
);
776 if (tag_it
!= conn
->callbacks
.end()) {
778 // n/ack all up to (and including) the tag
779 ldout(cct
, 20) << "AMQP run: multiple n/acks received with tag=" << tag
<< " and result=" << result
<< dendl
;
780 auto it
= conn
->callbacks
.begin();
781 while (it
->tag
<= tag
&& it
!= conn
->callbacks
.end()) {
782 ldout(cct
, 20) << "AMQP run: invoking callback with tag=" << it
->tag
<< dendl
;
784 it
= conn
->callbacks
.erase(it
);
787 // n/ack a specific tag
788 ldout(cct
, 20) << "AMQP run: n/ack received, invoking callback with tag=" << tag
<< " and result=" << result
<< dendl
;
790 conn
->callbacks
.erase(tag_it
);
793 ldout(cct
, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag
<< dendl
;
795 // just increment the iterator
798 // if no messages were received or published, sleep for 100ms
799 if (count
== 0 && !incoming_message
) {
800 std::this_thread::sleep_for(idle_time
);
805 // used in the dtor for message cleanup
806 static void delete_message(const message_wrapper_t
* message
) {
811 Manager(size_t _max_connections
,
812 size_t _max_inflight
,
815 unsigned reconnect_time_ms
,
816 unsigned idle_time_ms
,
818 max_connections(_max_connections
),
819 max_inflight(_max_inflight
),
820 max_queue(_max_queue
),
824 read_timeout
{0, _usec_timeout
},
825 connections(_max_connections
),
830 idle_time(std::chrono::milliseconds(idle_time_ms
)),
831 reconnect_time(std::chrono::milliseconds(reconnect_time_ms
)),
832 runner(&Manager::run
, this) {
833 // The hashmap has "max connections" as the initial number of buckets,
834 // and allows for 10 collisions per bucket before rehash.
835 // This is to prevent rehashing so that iterators are not invalidated
836 // when a new connection is added.
837 connections
.max_load_factor(10.0);
838 // give the runner thread a name for easier debugging
839 const auto rc
= ceph_pthread_setname(runner
.native_handle(), "amqp_manager");
844 Manager(const Manager
&) = delete;
845 const Manager
& operator=(const Manager
&) = delete;
847 // stop the main thread
852 // connect to a broker, or reuse an existing connection if already connected
853 bool connect(connection_id_t
& id
, const std::string
& url
, const std::string
& exchange
, bool mandatory_delivery
, bool verify_ssl
,
854 boost::optional
<const std::string
&> ca_location
) {
856 ldout(cct
, 1) << "AMQP connect: manager is stopped" << dendl
;
860 amqp_connection_info info
;
861 // cache the URL so that parsing could happen in-place
862 std::vector
<char> url_cache(url
.c_str(), url
.c_str()+url
.size()+1);
863 const auto retcode
= amqp_parse_url(url_cache
.data(), &info
);
864 if (AMQP_STATUS_OK
!= retcode
) {
865 ldout(cct
, 1) << "AMQP connect: URL parsing failed. error: " << retcode
<< dendl
;
868 connection_id_t
tmp_id(info
, exchange
);
870 std::lock_guard
lock(connections_lock
);
871 const auto it
= connections
.find(tmp_id
);
872 if (it
!= connections
.end()) {
873 // connection found - return even if non-ok
874 ldout(cct
, 20) << "AMQP connect: connection found" << dendl
;
879 // connection not found, creating a new one
880 if (connection_count
>= max_connections
) {
881 ldout(cct
, 1) << "AMQP connect: max connections exceeded" << dendl
;
884 // if error occurred during creation the creation will be retried in the main thread
886 auto conn
= connections
.emplace(tmp_id
, std::make_unique
<connection_t
>(cct
, info
, verify_ssl
, ca_location
)).first
->second
.get();
887 ldout(cct
, 10) << "AMQP connect: new connection is created. Total connections: " << connection_count
<< dendl
;
888 if (!new_state(conn
, tmp_id
)) {
889 ldout(cct
, 1) << "AMQP connect: new connection '" << to_string(tmp_id
) << "' is created. but state creation failed (will retry). error: " <<
890 status_to_string(conn
->status
) << " (" << conn
->reply_code
<< ")" << dendl
;
892 id
= std::move(tmp_id
);
896 // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
897 int publish(const connection_id_t
& conn_id
,
898 const std::string
& topic
,
899 const std::string
& message
) {
901 ldout(cct
, 1) << "AMQP publish: manager is not running" << dendl
;
902 return RGW_AMQP_STATUS_MANAGER_STOPPED
;
904 auto wrapper
= std::make_unique
<message_wrapper_t
>(conn_id
, topic
, message
, nullptr);
905 if (messages
.push(wrapper
.get())) {
906 std::ignore
= wrapper
.release();
908 return AMQP_STATUS_OK
;
910 ldout(cct
, 1) << "AMQP publish: queue is full" << dendl
;
911 return RGW_AMQP_STATUS_QUEUE_FULL
;
914 int publish_with_confirm(const connection_id_t
& conn_id
,
915 const std::string
& topic
,
916 const std::string
& message
,
917 reply_callback_t cb
) {
919 ldout(cct
, 1) << "AMQP publish_with_confirm: manager is not running" << dendl
;
920 return RGW_AMQP_STATUS_MANAGER_STOPPED
;
922 auto wrapper
= std::make_unique
<message_wrapper_t
>(conn_id
, topic
, message
, cb
);
923 if (messages
.push(wrapper
.get())) {
924 std::ignore
= wrapper
.release();
926 return AMQP_STATUS_OK
;
928 ldout(cct
, 1) << "AMQP publish_with_confirm: queue is full" << dendl
;
929 return RGW_AMQP_STATUS_QUEUE_FULL
;
932 // dtor wait for thread to stop
933 // then connection are cleaned-up
937 messages
.consume_all(delete_message
);
940 // get the number of connections
941 size_t get_connection_count() const {
942 return connection_count
;
945 // get the number of in-flight messages
946 size_t get_inflight() const {
948 std::lock_guard
lock(connections_lock
);
949 std::for_each(connections
.begin(), connections
.end(), [&sum
](auto& conn_pair
) {
950 // concurrent access to the callback vector is safe without locking
951 sum
+= conn_pair
.second
->callbacks
.size();
956 // running counter of the queued messages
957 size_t get_queued() const {
961 // running counter of the dequeued messages
962 size_t get_dequeued() const {
968 // note that the manager itself is not a singleton, and multiple instances may co-exist
969 // TODO make the pointer atomic in allocation and deallocation to avoid race conditions
970 static Manager
* s_manager
= nullptr;
972 static const size_t MAX_CONNECTIONS_DEFAULT
= 256;
973 static const size_t MAX_INFLIGHT_DEFAULT
= 8192;
974 static const size_t MAX_QUEUE_DEFAULT
= 8192;
975 static const long READ_TIMEOUT_USEC
= 100;
976 static const unsigned IDLE_TIME_MS
= 100;
977 static const unsigned RECONNECT_TIME_MS
= 100;
979 bool init(CephContext
* cct
) {
983 // TODO: take conf from CephContext
984 s_manager
= new Manager(MAX_CONNECTIONS_DEFAULT
, MAX_INFLIGHT_DEFAULT
, MAX_QUEUE_DEFAULT
,
985 READ_TIMEOUT_USEC
, IDLE_TIME_MS
, RECONNECT_TIME_MS
, cct
);
994 bool connect(connection_id_t
& conn_id
, const std::string
& url
, const std::string
& exchange
, bool mandatory_delivery
, bool verify_ssl
,
995 boost::optional
<const std::string
&> ca_location
) {
996 if (!s_manager
) return false;
997 return s_manager
->connect(conn_id
, url
, exchange
, mandatory_delivery
, verify_ssl
, ca_location
);
1000 int publish(const connection_id_t
& conn_id
,
1001 const std::string
& topic
,
1002 const std::string
& message
) {
1003 if (!s_manager
) return RGW_AMQP_STATUS_MANAGER_STOPPED
;
1004 return s_manager
->publish(conn_id
, topic
, message
);
1007 int publish_with_confirm(const connection_id_t
& conn_id
,
1008 const std::string
& topic
,
1009 const std::string
& message
,
1010 reply_callback_t cb
) {
1011 if (!s_manager
) return RGW_AMQP_STATUS_MANAGER_STOPPED
;
1012 return s_manager
->publish_with_confirm(conn_id
, topic
, message
, cb
);
1015 size_t get_connection_count() {
1016 if (!s_manager
) return 0;
1017 return s_manager
->get_connection_count();
1020 size_t get_inflight() {
1021 if (!s_manager
) return 0;
1022 return s_manager
->get_inflight();
1025 size_t get_queued() {
1026 if (!s_manager
) return 0;
1027 return s_manager
->get_queued();
1030 size_t get_dequeued() {
1031 if (!s_manager
) return 0;
1032 return s_manager
->get_dequeued();
1035 size_t get_max_connections() {
1036 if (!s_manager
) return MAX_CONNECTIONS_DEFAULT
;
1037 return s_manager
->max_connections
;
1040 size_t get_max_inflight() {
1041 if (!s_manager
) return MAX_INFLIGHT_DEFAULT
;
1042 return s_manager
->max_inflight
;
1045 size_t get_max_queue() {
1046 if (!s_manager
) return MAX_QUEUE_DEFAULT
;
1047 return s_manager
->max_queue
;