1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
6 #include <amqp_tcp_socket.h>
7 #include <amqp_framing.h>
8 #include "include/ceph_assert.h"
11 #include <unordered_map>
17 #include <boost/lockfree/queue.hpp>
18 #include "common/dout.h"
20 #define dout_subsys ceph_subsys_rgw
22 // TODO investigation, not necessarily issues:
23 // (1) in case of single threaded writer context use spsc_queue
24 // (2) support multiple channels
25 // (3) check performance of emptying queue to local list, and go over the list and publish
26 // (4) use std::shared_mutex (c++17) or equivalent for the connections lock
// RGW-specific status codes reported by the publishing path.
// status_to_string() maps these to names and defers every other value to the
// librabbitmq amqp_status_enum names, so these are meant to be distinct from
// the library's values.
static constexpr int RGW_AMQP_STATUS_BROKER_NACK       = -0x1001;
static constexpr int RGW_AMQP_STATUS_CONNECTION_CLOSED = -0x1002;
static constexpr int RGW_AMQP_STATUS_QUEUE_FULL        = -0x1003;
static constexpr int RGW_AMQP_STATUS_MAX_INFLIGHT      = -0x1004;
static constexpr int RGW_AMQP_STATUS_MANAGER_STOPPED   = -0x1005;

// RGW-specific status codes set while opening/initializing a connection
static constexpr int RGW_AMQP_STATUS_CONN_ALLOC_FAILED       = -0x2001;
static constexpr int RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED     = -0x2002;
static constexpr int RGW_AMQP_STATUS_SOCKET_OPEN_FAILED      = -0x2003;
static constexpr int RGW_AMQP_STATUS_LOGIN_FAILED            = -0x2004;
static constexpr int RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED     = -0x2005;
static constexpr int RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED  = -0x2006;
static constexpr int RGW_AMQP_STATUS_Q_DECLARE_FAILED        = -0x2007;
static constexpr int RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED  = -0x2008;
static constexpr int RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED  = -0x2009;

// stored as the connection's reply_type when the TCP socket could not be opened
static constexpr int RGW_AMQP_RESPONSE_SOCKET_ERROR = -0x3008;
// reply_code value meaning "no broker/library reply code available"
static constexpr int RGW_AMQP_NO_REPLY_CODE = 0x0;
50 // key class for the connection list
51 struct connection_id_t
{
52 const std::string host
;
54 const std::string vhost
;
55 // constructed from amqp_connection_info struct
56 connection_id_t(const amqp_connection_info
& info
)
57 : host(info
.host
), port(info
.port
), vhost(info
.vhost
) {}
59 // equality operator and hasher functor are needed
60 // so that connection_id_t could be used as key in unordered_map
61 bool operator==(const connection_id_t
& other
) const {
62 return host
== other
.host
&& port
== other
.port
&& vhost
== other
.vhost
;
66 std::size_t operator()(const connection_id_t
& k
) const {
67 return ((std::hash
<std::string
>()(k
.host
)
68 ^ (std::hash
<int>()(k
.port
) << 1)) >> 1)
69 ^ (std::hash
<std::string
>()(k
.vhost
) << 1);
74 std::string
to_string(const connection_id_t
& id
) {
75 return id
.host
+":"+std::to_string(id
.port
)+"/"+id
.vhost
;
78 // connection_t state cleaner
79 // could be used for automatic cleanup when getting out of scope
80 class ConnectionCleaner
{
82 amqp_connection_state_t conn
;
84 ConnectionCleaner(amqp_connection_state_t _conn
) : conn(_conn
) {}
85 ~ConnectionCleaner() {
87 amqp_destroy_connection(conn
);
90 // call reset() if cleanup is not needed anymore
96 // struct for holding the callback and its tag in the callback list
97 struct reply_callback_with_tag_t
{
101 reply_callback_with_tag_t(uint64_t _tag
, reply_callback_t _cb
) : tag(_tag
), cb(_cb
) {}
103 bool operator==(uint64_t rhs
) {
108 typedef std::vector
<reply_callback_with_tag_t
> CallbackList
;
110 // struct for holding the connection state object as well as the exchange
111 // it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
112 // since references to deleted objects may still exist in the calling code
113 struct connection_t
{
114 amqp_connection_state_t state
;
115 std::string exchange
;
117 std::string password
;
118 amqp_bytes_t reply_to_queue
;
119 bool marked_for_deletion
;
120 uint64_t delivery_tag
;
124 mutable std::atomic
<int> ref_count
;
126 CallbackList callbacks
;
131 reply_to_queue(amqp_empty_bytes
),
132 marked_for_deletion(false),
134 status(AMQP_STATUS_OK
),
135 reply_type(AMQP_RESPONSE_NORMAL
),
136 reply_code(RGW_AMQP_NO_REPLY_CODE
),
140 // cleanup of all internal connection resource
141 // the object can still remain, and internal connection
142 // resources created again on successful reconnection
143 void destroy(int s
) {
145 ConnectionCleaner
clean_state(state
);
147 amqp_bytes_free(reply_to_queue
);
148 reply_to_queue
= amqp_empty_bytes
;
149 // fire all remaining callbacks
150 std::for_each(callbacks
.begin(), callbacks
.end(), [this](auto& cb_tag
) {
152 ldout(cct
, 20) << "AMQP destroy: invoking callback with tag=" << cb_tag
.tag
<< dendl
;
159 return (state
!= nullptr && !marked_for_deletion
);
162 // dtor also destroys the internals
164 destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED
);
167 friend void intrusive_ptr_add_ref(const connection_t
* p
);
168 friend void intrusive_ptr_release(const connection_t
* p
);
171 // these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
172 void intrusive_ptr_add_ref(const connection_t
* p
) {
175 void intrusive_ptr_release(const connection_t
* p
) {
176 if (--p
->ref_count
== 0) {
181 // convert connection info to string
182 std::string
to_string(const amqp_connection_info
& info
) {
183 std::stringstream ss
;
184 ss
<< "connection info:" <<
185 "\nHost: " << info
.host
<<
186 "\nPort: " << info
.port
<<
187 "\nUser: " << info
.user
<<
188 "\nPassword: " << info
.password
<<
189 "\nvhost: " << info
.vhost
<<
190 "\nSSL support: " << info
.ssl
<< std::endl
;
194 // convert reply to error code
195 int reply_to_code(const amqp_rpc_reply_t
& reply
) {
196 switch (reply
.reply_type
) {
197 case AMQP_RESPONSE_NONE
:
198 case AMQP_RESPONSE_NORMAL
:
199 return RGW_AMQP_NO_REPLY_CODE
;
200 case AMQP_RESPONSE_LIBRARY_EXCEPTION
:
201 return reply
.library_error
;
202 case AMQP_RESPONSE_SERVER_EXCEPTION
:
203 if (reply
.reply
.decoded
) {
204 const amqp_connection_close_t
* m
= (amqp_connection_close_t
*)reply
.reply
.decoded
;
205 return m
->reply_code
;
207 return reply
.reply
.id
;
209 return RGW_AMQP_NO_REPLY_CODE
;
212 // convert reply to string
213 std::string
to_string(const amqp_rpc_reply_t
& reply
) {
214 std::stringstream ss
;
215 switch (reply
.reply_type
) {
216 case AMQP_RESPONSE_NORMAL
:
218 case AMQP_RESPONSE_NONE
:
219 return "missing RPC reply type";
220 case AMQP_RESPONSE_LIBRARY_EXCEPTION
:
221 return amqp_error_string2(reply
.library_error
);
222 case AMQP_RESPONSE_SERVER_EXCEPTION
:
224 switch (reply
.reply
.id
) {
225 case AMQP_CONNECTION_CLOSE_METHOD
:
226 ss
<< "server connection error: ";
228 case AMQP_CHANNEL_CLOSE_METHOD
:
229 ss
<< "server channel error: ";
232 ss
<< "server unknown error: ";
235 if (reply
.reply
.decoded
) {
236 amqp_connection_close_t
* m
= (amqp_connection_close_t
*)reply
.reply
.decoded
;
237 ss
<< m
->reply_code
<< " text: " << std::string((char*)m
->reply_text
.bytes
, m
->reply_text
.len
);
242 ss
<< "unknown error, method id: " << reply
.reply
.id
;
247 // convert status enum to string
248 std::string
to_string(amqp_status_enum s
) {
251 return "AMQP_STATUS_OK";
252 case AMQP_STATUS_NO_MEMORY
:
253 return "AMQP_STATUS_NO_MEMORY";
254 case AMQP_STATUS_BAD_AMQP_DATA
:
255 return "AMQP_STATUS_BAD_AMQP_DATA";
256 case AMQP_STATUS_UNKNOWN_CLASS
:
257 return "AMQP_STATUS_UNKNOWN_CLASS";
258 case AMQP_STATUS_UNKNOWN_METHOD
:
259 return "AMQP_STATUS_UNKNOWN_METHOD";
260 case AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED
:
261 return "AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED";
262 case AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION
:
263 return "AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION";
264 case AMQP_STATUS_CONNECTION_CLOSED
:
265 return "AMQP_STATUS_CONNECTION_CLOSED";
266 case AMQP_STATUS_BAD_URL
:
267 return "AMQP_STATUS_BAD_URL";
268 case AMQP_STATUS_SOCKET_ERROR
:
269 return "AMQP_STATUS_SOCKET_ERROR";
270 case AMQP_STATUS_INVALID_PARAMETER
:
271 return "AMQP_STATUS_INVALID_PARAMETER";
272 case AMQP_STATUS_TABLE_TOO_BIG
:
273 return "AMQP_STATUS_TABLE_TOO_BIG";
274 case AMQP_STATUS_WRONG_METHOD
:
275 return "AMQP_STATUS_WRONG_METHOD";
276 case AMQP_STATUS_TIMEOUT
:
277 return "AMQP_STATUS_TIMEOUT";
278 case AMQP_STATUS_TIMER_FAILURE
:
279 return "AMQP_STATUS_TIMER_FAILURE";
280 case AMQP_STATUS_HEARTBEAT_TIMEOUT
:
281 return "AMQP_STATUS_HEARTBEAT_TIMEOUT";
282 case AMQP_STATUS_UNEXPECTED_STATE
:
283 return "AMQP_STATUS_UNEXPECTED_STATE";
284 case AMQP_STATUS_SOCKET_CLOSED
:
285 return "AMQP_STATUS_SOCKET_CLOSED";
286 case AMQP_STATUS_SOCKET_INUSE
:
287 return "AMQP_STATUS_SOCKET_INUSE";
288 case AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD
:
289 return "AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD";
290 #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 8, 0, 0)
291 case AMQP_STATUS_UNSUPPORTED
:
292 return "AMQP_STATUS_UNSUPPORTED";
294 case _AMQP_STATUS_NEXT_VALUE
:
295 return "AMQP_STATUS_INTERNAL";
296 case AMQP_STATUS_TCP_ERROR
:
297 return "AMQP_STATUS_TCP_ERROR";
298 case AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR
:
299 return "AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR";
300 case _AMQP_STATUS_TCP_NEXT_VALUE
:
301 return "AMQP_STATUS_INTERNAL";
302 case AMQP_STATUS_SSL_ERROR
:
303 return "AMQP_STATUS_SSL_ERROR";
304 case AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED
:
305 return "AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED";
306 case AMQP_STATUS_SSL_PEER_VERIFY_FAILED
:
307 return "AMQP_STATUS_SSL_PEER_VERIFY_FAILED";
308 case AMQP_STATUS_SSL_CONNECTION_FAILED
:
309 return "AMQP_STATUS_SSL_CONNECTION_FAILED";
310 case _AMQP_STATUS_SSL_NEXT_VALUE
:
311 return "AMQP_STATUS_INTERNAL";
313 return "AMQP_STATUS_UNKNOWN";
316 // TODO: add status_to_string on the connection object to prinf full status
318 // convert int status to string - including RGW specific values
319 std::string
status_to_string(int s
) {
321 case RGW_AMQP_STATUS_BROKER_NACK
:
322 return "RGW_AMQP_STATUS_BROKER_NACK";
323 case RGW_AMQP_STATUS_CONNECTION_CLOSED
:
324 return "RGW_AMQP_STATUS_CONNECTION_CLOSED";
325 case RGW_AMQP_STATUS_QUEUE_FULL
:
326 return "RGW_AMQP_STATUS_QUEUE_FULL";
327 case RGW_AMQP_STATUS_MAX_INFLIGHT
:
328 return "RGW_AMQP_STATUS_MAX_INFLIGHT";
329 case RGW_AMQP_STATUS_MANAGER_STOPPED
:
330 return "RGW_AMQP_STATUS_MANAGER_STOPPED";
331 case RGW_AMQP_STATUS_CONN_ALLOC_FAILED
:
332 return "RGW_AMQP_STATUS_CONN_ALLOC_FAILED";
333 case RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED
:
334 return "RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED";
335 case RGW_AMQP_STATUS_SOCKET_OPEN_FAILED
:
336 return "RGW_AMQP_STATUS_SOCKET_OPEN_FAILED";
337 case RGW_AMQP_STATUS_LOGIN_FAILED
:
338 return "RGW_AMQP_STATUS_LOGIN_FAILED";
339 case RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
:
340 return "RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED";
341 case RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED
:
342 return "RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED";
343 case RGW_AMQP_STATUS_Q_DECLARE_FAILED
:
344 return "RGW_AMQP_STATUS_Q_DECLARE_FAILED";
345 case RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED
:
346 return "RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED";
347 case RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED
:
348 return "RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED";
350 return to_string((amqp_status_enum
)s
);
353 // check the result from calls and return if error (=null)
354 #define RETURN_ON_ERROR(C, S, OK) \
360 // in case of RPC calls, getting the RPC reply and return if an error is detected
361 #define RETURN_ON_REPLY_ERROR(C, ST, S) { \
362 const auto reply = amqp_get_rpc_reply(ST); \
363 if (reply.reply_type != AMQP_RESPONSE_NORMAL) { \
365 C->reply_type = reply.reply_type; \
366 C->reply_code = reply_to_code(reply); \
371 static const amqp_channel_t CHANNEL_ID
= 1;
372 static const amqp_channel_t CONFIRMING_CHANNEL_ID
= 2;
374 // utility function to create a connection, when the connection object already exists
375 connection_ptr_t
& create_connection(connection_ptr_t
& conn
, const amqp_connection_info
& info
) {
376 // pointer must be valid and not marked for deletion
377 ceph_assert(conn
&& !conn
->marked_for_deletion
);
379 // reset all status codes
380 conn
->status
= AMQP_STATUS_OK
;
381 conn
->reply_type
= AMQP_RESPONSE_NORMAL
;
382 conn
->reply_code
= RGW_AMQP_NO_REPLY_CODE
;
384 auto state
= amqp_new_connection();
386 conn
->status
= RGW_AMQP_STATUS_CONN_ALLOC_FAILED
;
389 // make sure that the connection state is cleaned up in case of error
390 ConnectionCleaner
state_guard(state
);
392 // create and open socket
393 auto socket
= amqp_tcp_socket_new(state
);
395 conn
->status
= RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED
;
398 const auto s
= amqp_socket_open(socket
, info
.host
, info
.port
);
400 conn
->status
= RGW_AMQP_STATUS_SOCKET_OPEN_FAILED
;
401 conn
->reply_type
= RGW_AMQP_RESPONSE_SOCKET_ERROR
;
402 conn
->reply_code
= s
;
407 const auto reply
= amqp_login(state
,
409 AMQP_DEFAULT_MAX_CHANNELS
,
410 AMQP_DEFAULT_FRAME_SIZE
,
411 0, // no heartbeat TODO: add conf
412 AMQP_SASL_METHOD_PLAIN
, // TODO: add other types of security
415 if (reply
.reply_type
!= AMQP_RESPONSE_NORMAL
) {
416 conn
->status
= RGW_AMQP_STATUS_LOGIN_FAILED
;
417 conn
->reply_type
= reply
.reply_type
;
418 conn
->reply_code
= reply_to_code(reply
);
424 const auto ok
= amqp_channel_open(state
, CHANNEL_ID
);
425 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
, ok
);
426 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
);
429 const auto ok
= amqp_channel_open(state
, CONFIRMING_CHANNEL_ID
);
430 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
, ok
);
431 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
);
434 const auto ok
= amqp_confirm_select(state
, CONFIRMING_CHANNEL_ID
);
435 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED
, ok
);
436 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED
);
439 // verify that the topic exchange is there
440 // TODO: make this step optional
442 const auto ok
= amqp_exchange_declare(state
,
444 amqp_cstring_bytes(conn
->exchange
.c_str()),
445 amqp_cstring_bytes("topic"),
446 1, // passive - exchange must already exist on broker
448 0, // dont auto-delete
451 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED
, ok
);
452 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED
);
455 // create queue for confirmations
456 const auto queue_ok
= amqp_queue_declare(state
,
457 CHANNEL_ID
, // use the regular channel for this call
458 amqp_empty_bytes
, // let broker allocate queue name
459 0, // not passive - create the queue
463 amqp_empty_table
// not args TODO add args from conf: TTL, max length etc.
465 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_Q_DECLARE_FAILED
, queue_ok
);
466 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_Q_DECLARE_FAILED
);
468 // define consumption for connection
469 const auto consume_ok
= amqp_basic_consume(state
,
470 CONFIRMING_CHANNEL_ID
,
472 amqp_empty_bytes
, // broker will generate consumer tag
473 1, // messages sent from client are never routed back
474 1, // client does not ack thr acks
475 1, // exclusive access to queue
476 amqp_empty_table
// no parameters
479 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED
, consume_ok
);
480 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED
);
481 // broker generated consumer_tag could be used to cancel sending of n/acks from broker - not needed
485 conn
->reply_to_queue
= amqp_bytes_malloc_dup(queue_ok
->queue
);
490 // utility function to create a new connection
491 connection_ptr_t
create_new_connection(const amqp_connection_info
& info
,
492 const std::string
& exchange
, CephContext
* cct
) {
493 // create connection state
494 connection_ptr_t conn
= new connection_t
;
495 conn
->exchange
= exchange
;
496 conn
->user
.assign(info
.user
);
497 conn
->password
.assign(info
.password
);
499 return create_connection(conn
, info
);
502 /// struct used for holding messages in the message queue
503 struct message_wrapper_t
{
504 connection_ptr_t conn
;
509 message_wrapper_t(connection_ptr_t
& _conn
,
510 const std::string
& _topic
,
511 const std::string
& _message
,
512 reply_callback_t _cb
) : conn(_conn
), topic(_topic
), message(_message
), cb(_cb
) {}
516 typedef std::unordered_map
<connection_id_t
, connection_ptr_t
, connection_id_t::hasher
> ConnectionList
;
517 typedef boost::lockfree::queue
<message_wrapper_t
*, boost::lockfree::fixed_sized
<true>> MessageQueue
;
519 // macros used inside a loop where an iterator is either incremented or erased
520 #define INCREMENT_AND_CONTINUE(IT) \
524 #define ERASE_AND_CONTINUE(IT,CONTAINER) \
525 IT=CONTAINER.erase(IT); \
526 --connection_count; \
531 const size_t max_connections
;
532 const size_t max_inflight
;
533 const size_t max_queue
;
535 std::atomic
<size_t> connection_count
;
537 struct timeval read_timeout
;
538 ConnectionList connections
;
539 MessageQueue messages
;
540 std::atomic
<size_t> queued
;
541 std::atomic
<size_t> dequeued
;
542 CephContext
* const cct
;
543 mutable std::mutex connections_lock
;
546 void publish_internal(message_wrapper_t
* message
) {
547 const std::unique_ptr
<message_wrapper_t
> msg_owner(message
);
548 auto& conn
= message
->conn
;
550 if (!conn
->is_ok()) {
551 // connection had an issue while message was in the queue
552 // TODO add error stats
553 ldout(conn
->cct
, 1) << "AMQP publish: connection had an issue while message was in the queue" << dendl
;
555 message
->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED
);
560 if (message
->cb
== nullptr) {
561 // TODO add error stats
562 const auto rc
= amqp_basic_publish(conn
->state
,
564 amqp_cstring_bytes(conn
->exchange
.c_str()),
565 amqp_cstring_bytes(message
->topic
.c_str()),
566 1, // mandatory, TODO: take from conf
569 amqp_cstring_bytes(message
->message
.c_str()));
570 if (rc
== AMQP_STATUS_OK
) {
571 ldout(conn
->cct
, 20) << "AMQP publish (no callback): OK" << dendl
;
574 ldout(conn
->cct
, 1) << "AMQP publish (no callback): failed with error " << status_to_string(rc
) << dendl
;
575 // an error occurred, close connection
576 // it will be retied by the main loop
581 amqp_basic_properties_t props
;
583 AMQP_BASIC_DELIVERY_MODE_FLAG
|
584 AMQP_BASIC_REPLY_TO_FLAG
;
585 props
.delivery_mode
= 2; // persistent delivery TODO take from conf
586 props
.reply_to
= conn
->reply_to_queue
;
588 const auto rc
= amqp_basic_publish(conn
->state
,
589 CONFIRMING_CHANNEL_ID
,
590 amqp_cstring_bytes(conn
->exchange
.c_str()),
591 amqp_cstring_bytes(message
->topic
.c_str()),
592 1, // mandatory, TODO: take from conf
595 amqp_cstring_bytes(message
->message
.c_str()));
597 if (rc
== AMQP_STATUS_OK
) {
598 auto const q_len
= conn
->callbacks
.size();
599 if (q_len
< max_inflight
) {
600 ldout(conn
->cct
, 20) << "AMQP publish (with callback, tag=" << conn
->delivery_tag
<< "): OK. Queue has: " << q_len
<< " callbacks" << dendl
;
601 conn
->callbacks
.emplace_back(conn
->delivery_tag
++, message
->cb
);
603 // immediately invoke callback with error
604 ldout(conn
->cct
, 1) << "AMQP publish (with callback): failed with error: callback queue full" << dendl
;
605 message
->cb(RGW_AMQP_STATUS_MAX_INFLIGHT
);
608 // an error occurred, close connection
609 // it will be retied by the main loop
610 ldout(conn
->cct
, 1) << "AMQP publish (with callback): failed with error: " << status_to_string(rc
) << dendl
;
612 // immediately invoke callback with error
617 // the managers thread:
618 // (1) empty the queue of messages to be published
619 // (2) loop over all connections and read acks
620 // (3) manages deleted connections
621 // (4) TODO reconnect on connection errors
622 // (5) TODO cleanup timedout callbacks
627 // publish all messages in the queue
628 const auto count
= messages
.consume_all(std::bind(&Manager::publish_internal
, this, std::placeholders::_1
));
630 ConnectionList::iterator conn_it
;
631 ConnectionList::const_iterator end_it
;
633 // thread safe access to the connection list
634 // once the iterators are fetched they are guaranteed to remain valid
635 std::lock_guard
lock(connections_lock
);
636 conn_it
= connections
.begin();
637 end_it
= connections
.end();
639 auto incoming_message
= false;
640 // loop over all connections to read acks
641 for (;conn_it
!= end_it
;) {
643 auto& conn
= conn_it
->second
;
644 // delete the connection if marked for deletion
645 if (conn
->marked_for_deletion
) {
646 ldout(conn
->cct
, 10) << "AMQP run: connection is deleted" << dendl
;
647 conn
->destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED
);
648 std::lock_guard
lock(connections_lock
);
649 // erase is safe - does not invalidate any other iterator
650 // lock so no insertion happens at the same time
651 ERASE_AND_CONTINUE(conn_it
, connections
);
654 // try to reconnect the connection if it has an error
655 if (!conn
->is_ok()) {
656 // pointers are used temporarily inside the amqp_connection_info object
657 // as read-only values, hence the assignment, and const_cast are safe here
658 amqp_connection_info info
;
659 info
.host
= const_cast<char*>(conn_it
->first
.host
.c_str());
660 info
.port
= conn_it
->first
.port
;
661 info
.vhost
= const_cast<char*>(conn_it
->first
.vhost
.c_str());
662 info
.user
= const_cast<char*>(conn
->user
.c_str());
663 info
.password
= const_cast<char*>(conn
->password
.c_str());
664 ldout(conn
->cct
, 20) << "AMQP run: retry connection" << dendl
;
665 if (create_connection(conn
, info
)->is_ok() == false) {
666 ldout(conn
->cct
, 10) << "AMQP run: connection (" << to_string(conn_it
->first
) << ") retry failed" << dendl
;
667 // TODO: add error counter for failed retries
668 // TODO: add exponential backoff for retries
670 ldout(conn
->cct
, 10) << "AMQP run: connection (" << to_string(conn_it
->first
) << ") retry successfull" << dendl
;
672 INCREMENT_AND_CONTINUE(conn_it
);
675 const auto rc
= amqp_simple_wait_frame_noblock(conn
->state
, &frame
, &read_timeout
);
677 if (rc
== AMQP_STATUS_TIMEOUT
) {
678 // TODO mark connection as idle
679 INCREMENT_AND_CONTINUE(conn_it
);
682 // this is just to prevent spinning idle, does not indicate that a message
683 // was successfully processed or not
684 incoming_message
= true;
686 // check if error occurred that require reopening the connection
687 if (rc
!= AMQP_STATUS_OK
) {
688 // an error occurred, close connection
689 // it will be retied by the main loop
690 ldout(conn
->cct
, 1) << "AMQP run: connection read error: " << status_to_string(rc
) << dendl
;
692 INCREMENT_AND_CONTINUE(conn_it
);
695 if (frame
.frame_type
!= AMQP_FRAME_METHOD
) {
696 ldout(conn
->cct
, 10) << "AMQP run: ignoring non n/ack messages" << dendl
;
697 // handler is for publish confirmation only - handle only method frames
698 // TODO: add a counter
699 INCREMENT_AND_CONTINUE(conn_it
);
706 switch (frame
.payload
.method
.id
) {
707 case AMQP_BASIC_ACK_METHOD
:
709 result
= AMQP_STATUS_OK
;
710 const auto ack
= (amqp_basic_ack_t
*)frame
.payload
.method
.decoded
;
712 tag
= ack
->delivery_tag
;
713 multiple
= ack
->multiple
;
716 case AMQP_BASIC_NACK_METHOD
:
718 result
= RGW_AMQP_STATUS_BROKER_NACK
;
719 const auto nack
= (amqp_basic_nack_t
*)frame
.payload
.method
.decoded
;
721 tag
= nack
->delivery_tag
;
722 multiple
= nack
->multiple
;
725 case AMQP_CONNECTION_CLOSE_METHOD
:
726 // TODO on channel close, no need to reopen the connection
727 case AMQP_CHANNEL_CLOSE_METHOD
:
729 // other side closed the connection, no need to continue
730 ldout(conn
->cct
, 10) << "AMQP run: connection was closed by broker" << dendl
;
732 INCREMENT_AND_CONTINUE(conn_it
);
734 case AMQP_BASIC_RETURN_METHOD
:
735 // message was not delivered, returned to sender
736 // TODO: add a counter
737 ldout(conn
->cct
, 10) << "AMQP run: message delivery error" << dendl
;
738 INCREMENT_AND_CONTINUE(conn_it
);
742 // TODO: add a counter
743 ldout(conn
->cct
, 10) << "AMQP run: unexpected message" << dendl
;
744 INCREMENT_AND_CONTINUE(conn_it
);
747 const auto& callbacks_end
= conn
->callbacks
.end();
748 const auto& callbacks_begin
= conn
->callbacks
.begin();
749 const auto tag_it
= std::find(callbacks_begin
, callbacks_end
, tag
);
750 if (tag_it
!= callbacks_end
) {
752 // n/ack all up to (and including) the tag
753 ldout(conn
->cct
, 20) << "AMQP run: multiple n/acks received with tag=" << tag
<< " and result=" << result
<< dendl
;
754 auto it
= callbacks_begin
;
755 while (it
->tag
<= tag
&& it
!= conn
->callbacks
.end()) {
756 ldout(conn
->cct
, 20) << "AMQP run: invoking callback with tag=" << it
->tag
<< dendl
;
758 it
= conn
->callbacks
.erase(it
);
761 // n/ack a specific tag
762 ldout(conn
->cct
, 20) << "AMQP run: n/ack received, invoking callback with tag=" << tag
<< " and result=" << result
<< dendl
;
764 conn
->callbacks
.erase(tag_it
);
767 // TODO add counter for acks with no callback
768 ldout(conn
->cct
, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag
<< dendl
;
770 // just increment the iterator
773 // if no messages were received or published, sleep for 100ms
774 if (count
== 0 && !incoming_message
) {
775 std::this_thread::sleep_for(std::chrono::milliseconds(100));
780 // used in the dtor for message cleanup
781 static void delete_message(const message_wrapper_t
* message
) {
786 Manager(size_t _max_connections
,
787 size_t _max_inflight
,
791 max_connections(_max_connections
),
792 max_inflight(_max_inflight
),
793 max_queue(_max_queue
),
796 read_timeout
{0, _usec_timeout
},
797 connections(_max_connections
),
802 runner(&Manager::run
, this) {
803 // The hashmap has "max connections" as the initial number of buckets,
804 // and allows for 10 collisions per bucket before rehash.
805 // This is to prevent rehashing so that iterators are not invalidated
806 // when a new connection is added.
807 connections
.max_load_factor(10.0);
808 // give the runner thread a name for easier debugging
809 const auto rc
= ceph_pthread_setname(runner
.native_handle(), "amqp_manager");
814 Manager(const Manager
&) = delete;
815 const Manager
& operator=(const Manager
&) = delete;
817 // stop the main thread
822 // disconnect from a broker
823 bool disconnect(connection_ptr_t
& conn
) {
824 if (!conn
|| stopped
) {
827 conn
->marked_for_deletion
= true;
831 // connect to a broker, or reuse an existing connection if already connected
832 connection_ptr_t
connect(const std::string
& url
, const std::string
& exchange
) {
834 // TODO: increment counter
835 ldout(cct
, 1) << "AMQP connect: manager is stopped" << dendl
;
839 struct amqp_connection_info info
;
840 // cache the URL so that parsing could happen in-place
841 std::vector
<char> url_cache(url
.c_str(), url
.c_str()+url
.size()+1);
842 if (AMQP_STATUS_OK
!= amqp_parse_url(url_cache
.data(), &info
)) {
843 // TODO: increment counter
844 ldout(cct
, 1) << "AMQP connect: URL parsing failed" << dendl
;
848 const connection_id_t
id(info
);
849 std::lock_guard
lock(connections_lock
);
850 const auto it
= connections
.find(id
);
851 if (it
!= connections
.end()) {
852 if (it
->second
->marked_for_deletion
) {
853 // TODO: increment counter
854 ldout(cct
, 1) << "AMQP connect: endpoint marked for deletion" << dendl
;
856 } else if (it
->second
->exchange
!= exchange
) {
857 // TODO: increment counter
858 ldout(cct
, 1) << "AMQP connect: exchange mismatch" << dendl
;
861 // connection found - return even if non-ok
862 ldout(cct
, 20) << "AMQP connect: connection found" << dendl
;
866 // connection not found, creating a new one
867 if (connection_count
>= max_connections
) {
868 // TODO: increment counter
869 ldout(cct
, 1) << "AMQP connect: max connections exceeded" << dendl
;
872 const auto conn
= create_new_connection(info
, exchange
, cct
);
873 // create_new_connection must always return a connection object
874 // even if error occurred during creation.
875 // in such a case the creation will be retried in the main thread
878 ldout(cct
, 10) << "AMQP connect: new connection is created. Total connections: " << connection_count
<< dendl
;
879 ldout(cct
, 10) << "AMQP connect: new connection status is: " << status_to_string(conn
->status
) << dendl
;
880 return connections
.emplace(id
, conn
).first
->second
;
883 // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
884 int publish(connection_ptr_t
& conn
,
885 const std::string
& topic
,
886 const std::string
& message
) {
888 return RGW_AMQP_STATUS_MANAGER_STOPPED
;
890 if (!conn
|| !conn
->is_ok()) {
891 return RGW_AMQP_STATUS_CONNECTION_CLOSED
;
893 if (messages
.push(new message_wrapper_t(conn
, topic
, message
, nullptr))) {
895 return AMQP_STATUS_OK
;
897 return RGW_AMQP_STATUS_QUEUE_FULL
;
900 int publish_with_confirm(connection_ptr_t
& conn
,
901 const std::string
& topic
,
902 const std::string
& message
,
903 reply_callback_t cb
) {
905 return RGW_AMQP_STATUS_MANAGER_STOPPED
;
907 if (!conn
|| !conn
->is_ok()) {
908 return RGW_AMQP_STATUS_CONNECTION_CLOSED
;
910 if (messages
.push(new message_wrapper_t(conn
, topic
, message
, cb
))) {
912 return AMQP_STATUS_OK
;
914 return RGW_AMQP_STATUS_QUEUE_FULL
;
917 // dtor wait for thread to stop
918 // then connection are cleaned-up
922 messages
.consume_all(delete_message
);
925 // get the number of connections
926 size_t get_connection_count() const {
927 return connection_count
;
930 // get the number of in-flight messages
931 size_t get_inflight() const {
933 std::lock_guard
lock(connections_lock
);
934 std::for_each(connections
.begin(), connections
.end(), [&sum
](auto& conn_pair
) {
935 sum
+= conn_pair
.second
->callbacks
.size();
940 // running counter of the queued messages
941 size_t get_queued() const {
945 // running counter of the dequeued messages
946 size_t get_dequeued() const {
952 // note that the manager itself is not a singleton, and multiple instances may co-exist
953 // TODO make the pointer atomic in allocation and deallocation to avoid race conditions
954 static Manager
* s_manager
= nullptr;
956 static const size_t MAX_CONNECTIONS_DEFAULT
= 256;
957 static const size_t MAX_INFLIGHT_DEFAULT
= 8192;
958 static const size_t MAX_QUEUE_DEFAULT
= 8192;
960 bool init(CephContext
* cct
) {
964 // TODO: take conf from CephContext
965 s_manager
= new Manager(MAX_CONNECTIONS_DEFAULT
, MAX_INFLIGHT_DEFAULT
, MAX_QUEUE_DEFAULT
, 100, cct
);
974 connection_ptr_t
connect(const std::string
& url
, const std::string
& exchange
) {
975 if (!s_manager
) return nullptr;
976 return s_manager
->connect(url
, exchange
);
979 int publish(connection_ptr_t
& conn
,
980 const std::string
& topic
,
981 const std::string
& message
) {
982 if (!s_manager
) return RGW_AMQP_STATUS_MANAGER_STOPPED
;
983 return s_manager
->publish(conn
, topic
, message
);
986 int publish_with_confirm(connection_ptr_t
& conn
,
987 const std::string
& topic
,
988 const std::string
& message
,
989 reply_callback_t cb
) {
990 if (!s_manager
) return RGW_AMQP_STATUS_MANAGER_STOPPED
;
991 return s_manager
->publish_with_confirm(conn
, topic
, message
, cb
);
994 size_t get_connection_count() {
995 if (!s_manager
) return 0;
996 return s_manager
->get_connection_count();
999 size_t get_inflight() {
1000 if (!s_manager
) return 0;
1001 return s_manager
->get_inflight();
1004 size_t get_queued() {
1005 if (!s_manager
) return 0;
1006 return s_manager
->get_queued();
1009 size_t get_dequeued() {
1010 if (!s_manager
) return 0;
1011 return s_manager
->get_dequeued();
1014 size_t get_max_connections() {
1015 if (!s_manager
) return MAX_CONNECTIONS_DEFAULT
;
1016 return s_manager
->max_connections
;
1019 size_t get_max_inflight() {
1020 if (!s_manager
) return MAX_INFLIGHT_DEFAULT
;
1021 return s_manager
->max_inflight
;
1024 size_t get_max_queue() {
1025 if (!s_manager
) return MAX_QUEUE_DEFAULT
;
1026 return s_manager
->max_queue
;
1029 bool disconnect(connection_ptr_t
& conn
) {
1030 if (!s_manager
) return false;
1031 return s_manager
->disconnect(conn
);