1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <amqp_tcp_socket.h>
7 #include <amqp_framing.h>
8 #include "include/ceph_assert.h"
11 #include <unordered_map>
17 #include <boost/lockfree/queue.hpp>
19 // TODO investigation, not necessarily issues:
20 // (1) in case of single threaded writer context use spsc_queue
21 // (2) support multiple channels
22 // (3) check performance of emptying queue to local list, and go over the list and publish
23 // (4) use std::shared_mutex (c++17) or equivalent for the connections lock
27 // RGW AMQP status codes for publishing
28 static const int RGW_AMQP_STATUS_BROKER_NACK
= -0x1001;
29 static const int RGW_AMQP_STATUS_CONNECTION_CLOSED
= -0x1002;
30 static const int RGW_AMQP_STATUS_QUEUE_FULL
= -0x1003;
31 static const int RGW_AMQP_STATUS_MAX_INFLIGHT
= -0x1004;
32 // RGW AMQP status code for connection opening
33 static const int RGW_AMQP_STATUS_CONN_ALLOC_FAILED
= -0x2001;
34 static const int RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED
= -0x2002;
35 static const int RGW_AMQP_STATUS_SOCKET_OPEN_FAILED
= -0x2003;
36 static const int RGW_AMQP_STATUS_LOGIN_FAILED
= -0x2004;
37 static const int RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
= -0x2005;
38 static const int RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED
= -0x2006;
39 static const int RGW_AMQP_STATUS_Q_DECLARE_FAILED
= -0x2007;
40 static const int RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED
= -0x2008;
41 static const int RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED
= -0x2009;
43 static const int RGW_AMQP_RESPONSE_SOCKET_ERROR
= -0x3008;
44 static const int RGW_AMQP_NO_REPLY_CODE
= 0x0;
46 // key class for the connection list
47 struct connection_id_t
{
51 // constructed from amqp_connection_info struct
52 connection_id_t(const amqp_connection_info
& info
)
53 : host(info
.host
), port(info
.port
), vhost(info
.vhost
) {}
55 // equality operator and hasher functor are needed
56 // so that connection_id_t could be used as key in unordered_map
57 bool operator==(const connection_id_t
& other
) const {
58 return host
== other
.host
&& port
== other
.port
&& vhost
== other
.vhost
;
62 std::size_t operator()(const connection_id_t
& k
) const {
63 return ((std::hash
<std::string
>()(k
.host
)
64 ^ (std::hash
<int>()(k
.port
) << 1)) >> 1)
65 ^ (std::hash
<std::string
>()(k
.vhost
) << 1);
70 // connection_t state cleaner
71 // could be used for automatic cleanup when getting out of scope
72 class ConnectionCleaner
{
74 amqp_connection_state_t conn
;
76 ConnectionCleaner(amqp_connection_state_t _conn
) : conn(_conn
) {}
77 ~ConnectionCleaner() {
79 amqp_destroy_connection(conn
);
82 // call reset() if cleanup is not needed anymore
88 // struct for holding the callback and its tag in the callback list
89 struct reply_callback_with_tag_t
{
93 reply_callback_with_tag_t(uint64_t _tag
, reply_callback_t _cb
) : tag(_tag
), cb(_cb
) {}
95 bool operator==(uint64_t rhs
) {
100 typedef std::vector
<reply_callback_with_tag_t
> CallbackList
;
102 // struct for holding the connection state object as well as the exchange
103 // it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
104 // since references to deleted objects may still exist in the calling code
105 struct connection_t
{
106 amqp_connection_state_t state
;
107 std::string exchange
;
109 std::string password
;
110 amqp_bytes_t reply_to_queue
;
111 bool marked_for_deletion
;
112 uint64_t delivery_tag
;
116 mutable std::atomic
<int> ref_count
;
117 CallbackList callbacks
;
122 reply_to_queue(amqp_empty_bytes
),
123 marked_for_deletion(false),
125 status(AMQP_STATUS_OK
),
126 reply_type(AMQP_RESPONSE_NORMAL
),
127 reply_code(RGW_AMQP_NO_REPLY_CODE
),
130 // cleanup of all internal connection resource
131 // the object can still remain, and internal connection
132 // resources created again on successful reconnection
133 void destroy(int s
) {
135 ConnectionCleaner
clean_state(state
);
137 amqp_bytes_free(reply_to_queue
);
138 reply_to_queue
= amqp_empty_bytes
;
139 // fire all remaining callbacks
140 std::for_each(callbacks
.begin(), callbacks
.end(), [s
](auto& cb_tag
) {
147 return (state
!= nullptr && !marked_for_deletion
);
150 // dtor also destroys the internals
152 destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED
);
155 friend void intrusive_ptr_add_ref(const connection_t
* p
);
156 friend void intrusive_ptr_release(const connection_t
* p
);
159 // these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
160 void intrusive_ptr_add_ref(const connection_t
* p
) {
163 void intrusive_ptr_release(const connection_t
* p
) {
164 if (--p
->ref_count
== 0) {
169 // convert connection info to string
170 std::string
to_string(const amqp_connection_info
& info
) {
171 std::stringstream ss
;
172 ss
<< "connection info:" <<
173 "\nHost: " << info
.host
<<
174 "\nPort: " << info
.port
<<
175 "\nUser: " << info
.user
<<
176 "\nPassword: " << info
.password
<<
177 "\nvhost: " << info
.vhost
<<
178 "\nSSL support: " << info
.ssl
<< std::endl
;
182 // convert reply to error code
183 int reply_to_code(const amqp_rpc_reply_t
& reply
) {
184 switch (reply
.reply_type
) {
185 case AMQP_RESPONSE_NONE
:
186 case AMQP_RESPONSE_NORMAL
:
187 return RGW_AMQP_NO_REPLY_CODE
;
188 case AMQP_RESPONSE_LIBRARY_EXCEPTION
:
189 return reply
.library_error
;
190 case AMQP_RESPONSE_SERVER_EXCEPTION
:
191 if (reply
.reply
.decoded
) {
192 const amqp_connection_close_t
* m
= (amqp_connection_close_t
*)reply
.reply
.decoded
;
193 return m
->reply_code
;
195 return reply
.reply
.id
;
197 return RGW_AMQP_NO_REPLY_CODE
;
200 // convert reply to string
201 std::string
to_string(const amqp_rpc_reply_t
& reply
) {
202 std::stringstream ss
;
203 switch (reply
.reply_type
) {
204 case AMQP_RESPONSE_NORMAL
:
206 case AMQP_RESPONSE_NONE
:
207 return "missing RPC reply type";
208 case AMQP_RESPONSE_LIBRARY_EXCEPTION
:
209 return amqp_error_string2(reply
.library_error
);
210 case AMQP_RESPONSE_SERVER_EXCEPTION
:
212 switch (reply
.reply
.id
) {
213 case AMQP_CONNECTION_CLOSE_METHOD
:
214 ss
<< "server connection error: ";
216 case AMQP_CHANNEL_CLOSE_METHOD
:
217 ss
<< "server channel error: ";
220 ss
<< "server unknown error: ";
223 if (reply
.reply
.decoded
) {
224 amqp_connection_close_t
* m
= (amqp_connection_close_t
*)reply
.reply
.decoded
;
225 ss
<< m
->reply_code
<< " text: " << std::string((char*)m
->reply_text
.bytes
, m
->reply_text
.len
);
230 ss
<< "unknown error, method id: " << reply
.reply
.id
;
235 // convert status enum to string
236 std::string
to_string(amqp_status_enum s
) {
239 return "AMQP_STATUS_OK";
240 case AMQP_STATUS_NO_MEMORY
:
241 return "AMQP_STATUS_NO_MEMORY";
242 case AMQP_STATUS_BAD_AMQP_DATA
:
243 return "AMQP_STATUS_BAD_AMQP_DATA";
244 case AMQP_STATUS_UNKNOWN_CLASS
:
245 return "AMQP_STATUS_UNKNOWN_CLASS";
246 case AMQP_STATUS_UNKNOWN_METHOD
:
247 return "AMQP_STATUS_UNKNOWN_METHOD";
248 case AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED
:
249 return "AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED";
250 case AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION
:
251 return "AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION";
252 case AMQP_STATUS_CONNECTION_CLOSED
:
253 return "AMQP_STATUS_CONNECTION_CLOSED";
254 case AMQP_STATUS_BAD_URL
:
255 return "AMQP_STATUS_BAD_URL";
256 case AMQP_STATUS_SOCKET_ERROR
:
257 return "AMQP_STATUS_SOCKET_ERROR";
258 case AMQP_STATUS_INVALID_PARAMETER
:
259 return "AMQP_STATUS_INVALID_PARAMETER";
260 case AMQP_STATUS_TABLE_TOO_BIG
:
261 return "AMQP_STATUS_TABLE_TOO_BIG";
262 case AMQP_STATUS_WRONG_METHOD
:
263 return "AMQP_STATUS_WRONG_METHOD";
264 case AMQP_STATUS_TIMEOUT
:
265 return "AMQP_STATUS_TIMEOUT";
266 case AMQP_STATUS_TIMER_FAILURE
:
267 return "AMQP_STATUS_TIMER_FAILURE";
268 case AMQP_STATUS_HEARTBEAT_TIMEOUT
:
269 return "AMQP_STATUS_HEARTBEAT_TIMEOUT";
270 case AMQP_STATUS_UNEXPECTED_STATE
:
271 return "AMQP_STATUS_UNEXPECTED_STATE";
272 case AMQP_STATUS_SOCKET_CLOSED
:
273 return "AMQP_STATUS_SOCKET_CLOSED";
274 case AMQP_STATUS_SOCKET_INUSE
:
275 return "AMQP_STATUS_SOCKET_INUSE";
276 case AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD
:
277 return "AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD";
278 #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 8, 0, 0)
279 case AMQP_STATUS_UNSUPPORTED
:
280 return "AMQP_STATUS_UNSUPPORTED";
282 case _AMQP_STATUS_NEXT_VALUE
:
283 return "AMQP_STATUS_INTERNAL";
284 case AMQP_STATUS_TCP_ERROR
:
285 return "AMQP_STATUS_TCP_ERROR";
286 case AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR
:
287 return "AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR";
288 case _AMQP_STATUS_TCP_NEXT_VALUE
:
289 return "AMQP_STATUS_INTERNAL";
290 case AMQP_STATUS_SSL_ERROR
:
291 return "AMQP_STATUS_SSL_ERROR";
292 case AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED
:
293 return "AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED";
294 case AMQP_STATUS_SSL_PEER_VERIFY_FAILED
:
295 return "AMQP_STATUS_SSL_PEER_VERIFY_FAILED";
296 case AMQP_STATUS_SSL_CONNECTION_FAILED
:
297 return "AMQP_STATUS_SSL_CONNECTION_FAILED";
298 case _AMQP_STATUS_SSL_NEXT_VALUE
:
299 return "AMQP_STATUS_INTERNAL";
301 return "AMQP_STATUS_UNKNOWN";
304 // TODO: add status_to_string on the connection object to prinf full status
306 // convert int status to string - including RGW specific values
307 std::string
status_to_string(int s
) {
309 case RGW_AMQP_STATUS_BROKER_NACK
:
310 return "RGW_AMQP_STATUS_BROKER_NACK";
311 case RGW_AMQP_STATUS_CONNECTION_CLOSED
:
312 return "RGW_AMQP_STATUS_CONNECTION_CLOSED";
313 case RGW_AMQP_STATUS_QUEUE_FULL
:
314 return "RGW_AMQP_STATUS_QUEUE_FULL";
315 case RGW_AMQP_STATUS_MAX_INFLIGHT
:
316 return "RGW_AMQP_STATUS_MAX_INFLIGHT";
317 case RGW_AMQP_STATUS_CONN_ALLOC_FAILED
:
318 return "RGW_AMQP_STATUS_CONN_ALLOC_FAILED";
319 case RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED
:
320 return "RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED";
321 case RGW_AMQP_STATUS_SOCKET_OPEN_FAILED
:
322 return "RGW_AMQP_STATUS_SOCKET_OPEN_FAILED";
323 case RGW_AMQP_STATUS_LOGIN_FAILED
:
324 return "RGW_AMQP_STATUS_LOGIN_FAILED";
325 case RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
:
326 return "RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED";
327 case RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED
:
328 return "RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED";
329 case RGW_AMQP_STATUS_Q_DECLARE_FAILED
:
330 return "RGW_AMQP_STATUS_Q_DECLARE_FAILED";
331 case RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED
:
332 return "RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED";
333 case RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED
:
334 return "RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED";
336 return to_string((amqp_status_enum
)s
);
339 // check the result from calls and return if error (=null)
340 #define RETURN_ON_ERROR(C, S, OK) \
346 // in case of RPC calls, getting the RPC reply and return if an error is detected
347 #define RETURN_ON_REPLY_ERROR(C, ST, S) { \
348 const auto reply = amqp_get_rpc_reply(ST); \
349 if (reply.reply_type != AMQP_RESPONSE_NORMAL) { \
351 C->reply_type = reply.reply_type; \
352 C->reply_code = reply_to_code(reply); \
357 static const amqp_channel_t CHANNEL_ID
= 1;
358 static const amqp_channel_t CONFIRMING_CHANNEL_ID
= 2;
360 // utility function to create a connection, when the connection object already exists
361 connection_ptr_t
& create_connection(connection_ptr_t
& conn
, const amqp_connection_info
& info
) {
362 // pointer must be valid and not marked for deletion
363 ceph_assert(conn
&& !conn
->marked_for_deletion
);
365 // reset all status codes
366 conn
->status
= AMQP_STATUS_OK
;
367 conn
->reply_type
= AMQP_RESPONSE_NORMAL
;
368 conn
->reply_code
= RGW_AMQP_NO_REPLY_CODE
;
370 auto state
= amqp_new_connection();
372 conn
->status
= RGW_AMQP_STATUS_CONN_ALLOC_FAILED
;
375 // make sure that the connection state is cleaned up in case of error
376 ConnectionCleaner
state_guard(state
);
378 // create and open socket
379 auto socket
= amqp_tcp_socket_new(state
);
381 conn
->status
= RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED
;
384 const auto s
= amqp_socket_open(socket
, info
.host
, info
.port
);
386 conn
->status
= RGW_AMQP_STATUS_SOCKET_OPEN_FAILED
;
387 conn
->reply_type
= RGW_AMQP_RESPONSE_SOCKET_ERROR
;
388 conn
->reply_code
= s
;
393 const auto reply
= amqp_login(state
,
395 AMQP_DEFAULT_MAX_CHANNELS
,
396 AMQP_DEFAULT_FRAME_SIZE
,
397 0, // no heartbeat TODO: add conf
398 AMQP_SASL_METHOD_PLAIN
, // TODO: add other types of security
401 if (reply
.reply_type
!= AMQP_RESPONSE_NORMAL
) {
402 conn
->status
= RGW_AMQP_STATUS_LOGIN_FAILED
;
403 conn
->reply_type
= reply
.reply_type
;
404 conn
->reply_code
= reply_to_code(reply
);
410 const auto ok
= amqp_channel_open(state
, CHANNEL_ID
);
411 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
, ok
);
412 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
);
415 const auto ok
= amqp_channel_open(state
, CONFIRMING_CHANNEL_ID
);
416 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
, ok
);
417 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
);
420 const auto ok
= amqp_confirm_select(state
, CONFIRMING_CHANNEL_ID
);
421 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED
, ok
);
422 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED
);
425 // verify that the topic exchange is there
426 // TODO: make this step optional
428 const auto ok
= amqp_exchange_declare(state
,
430 amqp_cstring_bytes(conn
->exchange
.c_str()),
431 amqp_cstring_bytes("topic"),
432 1, // passive - exchange must already exist on broker
434 0, // dont auto-delete
437 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED
, ok
);
438 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED
);
441 // create queue for confirmations
442 const auto queue_ok
= amqp_queue_declare(state
,
443 CHANNEL_ID
, // use the regular channel for this call
444 amqp_empty_bytes
, // let broker allocate queue name
445 0, // not passive - create the queue
449 amqp_empty_table
// not args TODO add args from conf: TTL, max length etc.
451 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_Q_DECLARE_FAILED
, queue_ok
);
452 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_Q_DECLARE_FAILED
);
454 // define consumption for connection
455 const auto consume_ok
= amqp_basic_consume(state
,
456 CONFIRMING_CHANNEL_ID
,
458 amqp_empty_bytes
, // broker will generate consumer tag
459 1, // messages sent from client are never routed back
460 1, // client does not ack thr acks
461 1, // exclusive access to queue
462 amqp_empty_table
// no parameters
465 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED
, consume_ok
);
466 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED
);
467 // broker generated consumer_tag could be used to cancel sending of n/acks from broker - not needed
471 conn
->reply_to_queue
= amqp_bytes_malloc_dup(queue_ok
->queue
);
476 // utility function to create a new connection
477 connection_ptr_t
create_new_connection(const amqp_connection_info
& info
,
478 const std::string
& exchange
) {
479 // create connection state
480 connection_ptr_t conn
= new connection_t
;
481 conn
->exchange
= exchange
;
482 conn
->user
.assign(info
.user
);
483 conn
->password
.assign(info
.password
);
484 return create_connection(conn
, info
);
487 /// struct used for holding messages in the message queue
488 struct message_wrapper_t
{
489 connection_ptr_t conn
;
494 message_wrapper_t(connection_ptr_t
& _conn
,
495 const std::string
& _topic
,
496 const std::string
& _message
,
497 reply_callback_t _cb
) : conn(_conn
), topic(_topic
), message(_message
), cb(_cb
) {}
501 typedef std::unordered_map
<connection_id_t
, connection_ptr_t
, connection_id_t::hasher
> ConnectionList
;
502 typedef boost::lockfree::queue
<message_wrapper_t
*, boost::lockfree::fixed_sized
<true>> MessageQueue
;
504 // macros used inside a loop where an iterator is either incremented or erased
505 #define INCREMENT_AND_CONTINUE(IT) \
509 #define ERASE_AND_CONTINUE(IT,CONTAINER) \
510 IT=CONTAINER.erase(IT); \
511 --connection_count; \
516 const size_t max_connections
;
517 const size_t max_inflight
;
518 const size_t max_queue
;
520 std::atomic
<size_t> connection_count
;
522 struct timeval read_timeout
;
523 ConnectionList connections
;
524 MessageQueue messages
;
525 std::atomic
<size_t> queued
;
526 std::atomic
<size_t> dequeued
;
527 mutable std::mutex connections_lock
;
530 void publish_internal(message_wrapper_t
* message
) {
531 const std::unique_ptr
<message_wrapper_t
> msg_owner(message
);
532 auto& conn
= message
->conn
;
534 if (!conn
->is_ok()) {
535 // connection had an issue while message was in the queue
536 // TODO add error stats
538 message
->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED
);
543 if (message
->cb
== nullptr) {
544 // TODO add error stats
545 const auto rc
= amqp_basic_publish(conn
->state
,
547 amqp_cstring_bytes(conn
->exchange
.c_str()),
548 amqp_cstring_bytes(message
->topic
.c_str()),
549 1, // mandatory, TODO: take from conf
552 amqp_cstring_bytes(message
->message
.c_str()));
553 if (rc
== AMQP_STATUS_OK
) {
556 // an error occurred, close connection
557 // it will be retied by the main loop
562 amqp_basic_properties_t props
;
564 AMQP_BASIC_DELIVERY_MODE_FLAG
|
565 AMQP_BASIC_REPLY_TO_FLAG
;
566 props
.delivery_mode
= 2; // persistent delivery TODO take from conf
567 props
.reply_to
= conn
->reply_to_queue
;
569 const auto rc
= amqp_basic_publish(conn
->state
,
570 CONFIRMING_CHANNEL_ID
,
571 amqp_cstring_bytes(conn
->exchange
.c_str()),
572 amqp_cstring_bytes(message
->topic
.c_str()),
573 1, // mandatory, TODO: take from conf
576 amqp_cstring_bytes(message
->message
.c_str()));
578 if (rc
== AMQP_STATUS_OK
) {
579 if (conn
->callbacks
.size() < max_inflight
) {
580 conn
->callbacks
.emplace_back(conn
->delivery_tag
++, message
->cb
);
582 // immediately invoke callback with error
583 message
->cb(RGW_AMQP_STATUS_MAX_INFLIGHT
);
586 // an error occurred, close connection
587 // it will be retied by the main loop
589 // immediately invoke callback with error
594 // the managers thread:
595 // (1) empty the queue of messages to be published
596 // (2) loop over all connections and read acks
597 // (3) manages deleted connections
598 // (4) TODO reconnect on connection errors
599 // (5) TODO cleanup timedout callbacks
604 // publish all messages in the queue
605 const auto count
= messages
.consume_all(std::bind(&Manager::publish_internal
, this, std::placeholders::_1
));
607 ConnectionList::iterator conn_it
;
608 ConnectionList::const_iterator end_it
;
610 // thread safe access to the connection list
611 // once the iterators are fetched they are guaranteed to remain valid
612 std::lock_guard
<std::mutex
> lock(connections_lock
);
613 conn_it
= connections
.begin();
614 end_it
= connections
.end();
616 auto incoming_message
= false;
617 // loop over all connections to read acks
618 for (;conn_it
!= end_it
;) {
620 auto& conn
= conn_it
->second
;
621 // delete the connection if marked for deletion
622 if (conn
->marked_for_deletion
) {
623 conn
->destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED
);
624 std::lock_guard
<std::mutex
> lock(connections_lock
);
625 // erase is safe - does not invalidate any other iterator
626 // lock so no insertion happens at the same time
627 ERASE_AND_CONTINUE(conn_it
, connections
);
630 // try to reconnect the connection if it has an error
631 if (!conn
->is_ok()) {
632 // pointers are used temporarily inside the amqp_connection_info object
633 // as read-only values, hence the assignment, and const_cast are safe here
634 amqp_connection_info info
;
635 info
.host
= const_cast<char*>(conn_it
->first
.host
.c_str());
636 info
.port
= conn_it
->first
.port
;
637 info
.vhost
= const_cast<char*>(conn_it
->first
.vhost
.c_str());
638 info
.user
= const_cast<char*>(conn
->user
.c_str());
639 info
.password
= const_cast<char*>(conn
->password
.c_str());
640 if (create_connection(conn
, info
)->is_ok() == false) {
641 // TODO: add error counter for failed retries
642 // TODO: add exponential backoff for retries
644 INCREMENT_AND_CONTINUE(conn_it
);
647 const auto rc
= amqp_simple_wait_frame_noblock(conn
->state
, &frame
, &read_timeout
);
649 if (rc
== AMQP_STATUS_TIMEOUT
) {
650 // TODO mark connection as idle
651 INCREMENT_AND_CONTINUE(conn_it
);
654 // this is just to prevent spinning idle, does not indicate that a message
655 // was successfully processed or not
656 incoming_message
= true;
658 // check if error occurred that require reopening the connection
659 if (rc
!= AMQP_STATUS_OK
) {
660 // an error occurred, close connection
661 // it will be retied by the main loop
663 INCREMENT_AND_CONTINUE(conn_it
);
666 if (frame
.frame_type
!= AMQP_FRAME_METHOD
) {
667 // handler is for publish confirmation only - handle only method frames
668 // TODO: add a counter
669 INCREMENT_AND_CONTINUE(conn_it
);
676 switch (frame
.payload
.method
.id
) {
677 case AMQP_BASIC_ACK_METHOD
:
679 result
= AMQP_STATUS_OK
;
680 const auto ack
= (amqp_basic_ack_t
*)frame
.payload
.method
.decoded
;
682 tag
= ack
->delivery_tag
;
683 multiple
= ack
->multiple
;
686 case AMQP_BASIC_NACK_METHOD
:
688 result
= RGW_AMQP_STATUS_BROKER_NACK
;
689 const auto nack
= (amqp_basic_nack_t
*)frame
.payload
.method
.decoded
;
691 tag
= nack
->delivery_tag
;
692 multiple
= nack
->multiple
;
695 case AMQP_CONNECTION_CLOSE_METHOD
:
696 // TODO on channel close, no need to reopen the connection
697 case AMQP_CHANNEL_CLOSE_METHOD
:
699 // other side closed the connection, no need to continue
701 INCREMENT_AND_CONTINUE(conn_it
);
703 case AMQP_BASIC_RETURN_METHOD
:
704 // message was not delivered, returned to sender
705 // TODO: add a counter
706 INCREMENT_AND_CONTINUE(conn_it
);
710 // TODO: add a counter
711 INCREMENT_AND_CONTINUE(conn_it
);
714 const auto& callbacks_end
= conn
->callbacks
.end();
715 const auto& callbacks_begin
= conn
->callbacks
.begin();
716 const auto it
= std::find(callbacks_begin
, callbacks_end
, tag
);
717 if (it
!= callbacks_end
) {
719 // n/ack all up to (and including) the tag
720 for (auto rit
= it
; rit
>= callbacks_begin
; --rit
) {
722 conn
->callbacks
.erase(rit
);
725 // n/ack a specific tag
727 conn
->callbacks
.erase(it
);
730 // TODO add counter for acks with no callback
732 // just increment the iterator
735 // if no messages were received or published, sleep for 100ms
736 if (count
== 0 && !incoming_message
) {
737 std::this_thread::sleep_for(std::chrono::milliseconds(100));
742 // used in the dtor for message cleanup
743 static void delete_message(const message_wrapper_t
* message
) {
748 Manager(size_t _max_connections
,
749 size_t _max_inflight
,
751 long _usec_timeout
) :
752 max_connections(_max_connections
),
753 max_inflight(_max_inflight
),
754 max_queue(_max_queue
),
757 read_timeout
{0, _usec_timeout
},
758 connections(_max_connections
),
762 runner(&Manager::run
, this) {
763 // The hashmap has "max connections" as the initial number of buckets,
764 // and allows for 10 collisions per bucket before rehash.
765 // This is to prevent rehashing so that iterators are not invalidated
766 // when a new connection is added.
767 connections
.max_load_factor(10.0);
771 Manager(const Manager
&) = delete;
772 const Manager
& operator=(const Manager
&) = delete;
774 // stop the main thread
779 // disconnect from a broker
780 bool disconnect(connection_ptr_t
& conn
) {
781 if (!conn
|| stopped
) {
784 conn
->marked_for_deletion
= true;
788 // connect to a broker, or reuse an existing connection if already connected
789 connection_ptr_t
connect(const std::string
& url
, const std::string
& exchange
) {
791 // TODO: increment counter
795 struct amqp_connection_info info
;
796 // cache the URL so that parsing could happen in-place
797 std::vector
<char> url_cache(url
.c_str(), url
.c_str()+url
.size()+1);
798 if (AMQP_STATUS_OK
!= amqp_parse_url(url_cache
.data(), &info
)) {
799 // TODO: increment counter
803 const connection_id_t
id(info
);
804 std::lock_guard
<std::mutex
> lock(connections_lock
);
805 const auto it
= connections
.find(id
);
806 if (it
!= connections
.end()) {
807 if (it
->second
->marked_for_deletion
) {
808 // TODO: increment counter
810 } else if (it
->second
->exchange
!= exchange
) {
811 // TODO: increment counter
814 // connection found - return even if non-ok
818 // connection not found, creating a new one
819 if (connection_count
>= max_connections
) {
820 // TODO: increment counter
823 const auto conn
= create_new_connection(info
, exchange
);
824 // create_new_connection must always return a connection object
825 // even if error occurred during creation.
826 // in such a case the creation will be retried in the main thread
829 return connections
.emplace(id
, conn
).first
->second
;
832 // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
833 int publish(connection_ptr_t
& conn
,
834 const std::string
& topic
,
835 const std::string
& message
) {
836 if (!conn
|| !conn
->is_ok()) {
837 return RGW_AMQP_STATUS_CONNECTION_CLOSED
;
839 if (messages
.push(new message_wrapper_t(conn
, topic
, message
, nullptr))) {
841 return AMQP_STATUS_OK
;
843 return RGW_AMQP_STATUS_QUEUE_FULL
;
846 int publish_with_confirm(connection_ptr_t
& conn
,
847 const std::string
& topic
,
848 const std::string
& message
,
849 reply_callback_t cb
) {
850 if (!conn
|| !conn
->is_ok()) {
851 return RGW_AMQP_STATUS_CONNECTION_CLOSED
;
853 if (messages
.push(new message_wrapper_t(conn
, topic
, message
, cb
))) {
855 return AMQP_STATUS_OK
;
857 return RGW_AMQP_STATUS_QUEUE_FULL
;
860 // dtor wait for thread to stop
861 // then connection are cleaned-up
865 messages
.consume_all(delete_message
);
868 // get the number of connections
869 size_t get_connection_count() const {
870 return connection_count
;
873 // get the number of in-flight messages
874 size_t get_inflight() const {
876 std::lock_guard
<std::mutex
> lock(connections_lock
);
877 std::for_each(connections
.begin(), connections
.end(), [&sum
](auto& conn_pair
) {
878 sum
+= conn_pair
.second
->callbacks
.size();
883 // running counter of the queued messages
884 size_t get_queued() const {
888 // running counter of the dequeued messages
889 size_t get_dequeued() const {
895 // note that the manager itself is not a singleton, and multiple instances may co-exist
896 // TODO get parameters from conf
897 Manager
s_manager(256, 8192, 8192, 100);
899 connection_ptr_t
connect(const std::string
& url
, const std::string
& exchange
) {
900 return s_manager
.connect(url
, exchange
);
903 int publish(connection_ptr_t
& conn
,
904 const std::string
& topic
,
905 const std::string
& message
) {
906 return s_manager
.publish(conn
, topic
, message
);
909 int publish_with_confirm(connection_ptr_t
& conn
,
910 const std::string
& topic
,
911 const std::string
& message
,
912 reply_callback_t cb
) {
913 return s_manager
.publish_with_confirm(conn
, topic
, message
, cb
);
916 size_t get_connection_count() {
917 return s_manager
.get_connection_count();
920 size_t get_inflight() {
921 return s_manager
.get_inflight();
924 size_t get_queued() {
925 return s_manager
.get_queued();
928 size_t get_dequeued() {
929 return s_manager
.get_dequeued();
932 size_t get_max_connections() {
933 return s_manager
.max_connections
;
936 size_t get_max_inflight() {
937 return s_manager
.max_inflight
;
940 size_t get_max_queue() {
941 return s_manager
.max_queue
;
944 bool disconnect(connection_ptr_t
& conn
) {
945 return s_manager
.disconnect(conn
);