1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
6 #include <amqp_ssl_socket.h>
7 #include <amqp_tcp_socket.h>
8 #include <amqp_framing.h>
9 #include "include/ceph_assert.h"
12 #include <unordered_map>
18 #include <boost/lockfree/queue.hpp>
19 #include "common/dout.h"
20 #include <openssl/ssl.h>
22 #define dout_subsys ceph_subsys_rgw
// TODO items to investigate (not necessarily issues):
// (1) in case of a single-threaded writer context, use spsc_queue
// (2) support multiple channels
// (3) measure whether draining the queue into a local list first, then publishing from that list, is faster
// (4) use std::shared_mutex (C++17) or equivalent for the connections lock
// RGW AMQP status codes for publishing
static constexpr int RGW_AMQP_STATUS_BROKER_NACK            = -0x1001;
static constexpr int RGW_AMQP_STATUS_CONNECTION_CLOSED      = -0x1002;
static constexpr int RGW_AMQP_STATUS_QUEUE_FULL             = -0x1003;
static constexpr int RGW_AMQP_STATUS_MAX_INFLIGHT           = -0x1004;
static constexpr int RGW_AMQP_STATUS_MANAGER_STOPPED        = -0x1005;
// RGW AMQP status codes for connection opening
static constexpr int RGW_AMQP_STATUS_CONN_ALLOC_FAILED      = -0x2001;
static constexpr int RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED    = -0x2002;
static constexpr int RGW_AMQP_STATUS_SOCKET_OPEN_FAILED     = -0x2003;
static constexpr int RGW_AMQP_STATUS_LOGIN_FAILED           = -0x2004;
static constexpr int RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED    = -0x2005;
static constexpr int RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED = -0x2006;
static constexpr int RGW_AMQP_STATUS_Q_DECLARE_FAILED       = -0x2007;
static constexpr int RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED = -0x2008;
static constexpr int RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED = -0x2009;
static constexpr int RGW_AMQP_STATUS_SOCKET_CACERT_FAILED   = -0x2010;

// reply type stored when opening the socket fails (no RPC reply exists then)
static constexpr int RGW_AMQP_RESPONSE_SOCKET_ERROR         = -0x3008;
// reply code meaning "no reply code available"
static constexpr int RGW_AMQP_NO_REPLY_CODE                 = 0x0;
53 // key class for the connection list
54 struct connection_id_t
{
55 const std::string host
;
57 const std::string vhost
;
58 // constructed from amqp_connection_info struct
59 connection_id_t(const amqp_connection_info
& info
)
60 : host(info
.host
), port(info
.port
), vhost(info
.vhost
) {}
62 // equality operator and hasher functor are needed
63 // so that connection_id_t could be used as key in unordered_map
64 bool operator==(const connection_id_t
& other
) const {
65 return host
== other
.host
&& port
== other
.port
&& vhost
== other
.vhost
;
69 std::size_t operator()(const connection_id_t
& k
) const {
70 return ((std::hash
<std::string
>()(k
.host
)
71 ^ (std::hash
<int>()(k
.port
) << 1)) >> 1)
72 ^ (std::hash
<std::string
>()(k
.vhost
) << 1);
77 std::string
to_string(const connection_id_t
& id
) {
78 return id
.host
+":"+std::to_string(id
.port
)+id
.vhost
;
81 // connection_t state cleaner
82 // could be used for automatic cleanup when getting out of scope
83 class ConnectionCleaner
{
85 amqp_connection_state_t conn
;
87 ConnectionCleaner(amqp_connection_state_t _conn
) : conn(_conn
) {}
88 ~ConnectionCleaner() {
90 amqp_destroy_connection(conn
);
93 // call reset() if cleanup is not needed anymore
99 // struct for holding the callback and its tag in the callback list
100 struct reply_callback_with_tag_t
{
104 reply_callback_with_tag_t(uint64_t _tag
, reply_callback_t _cb
) : tag(_tag
), cb(_cb
) {}
106 bool operator==(uint64_t rhs
) {
111 typedef std::vector
<reply_callback_with_tag_t
> CallbackList
;
113 // struct for holding the connection state object as well as the exchange
114 // it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
115 // since references to deleted objects may still exist in the calling code
116 struct connection_t
{
117 std::atomic
<amqp_connection_state_t
> state
;
118 std::string exchange
;
120 std::string password
;
121 amqp_bytes_t reply_to_queue
;
122 uint64_t delivery_tag
;
126 mutable std::atomic
<int> ref_count
;
128 CallbackList callbacks
;
129 ceph::coarse_real_clock::time_point next_reconnect
;
133 boost::optional
<std::string
> ca_location
;
134 utime_t timestamp
= ceph_clock_now();
139 reply_to_queue(amqp_empty_bytes
),
141 status(AMQP_STATUS_OK
),
142 reply_type(AMQP_RESPONSE_NORMAL
),
143 reply_code(RGW_AMQP_NO_REPLY_CODE
),
146 next_reconnect(ceph::coarse_real_clock::now()),
150 ca_location(boost::none
)
153 // cleanup of all internal connection resource
154 // the object can still remain, and internal connection
155 // resources created again on successful reconnection
156 void destroy(int s
) {
158 ConnectionCleaner
clean_state(state
);
160 amqp_bytes_free(reply_to_queue
);
161 reply_to_queue
= amqp_empty_bytes
;
162 // fire all remaining callbacks
163 std::for_each(callbacks
.begin(), callbacks
.end(), [this](auto& cb_tag
) {
165 ldout(cct
, 20) << "AMQP destroy: invoking callback with tag=" << cb_tag
.tag
<< dendl
;
172 return (state
!= nullptr);
175 // dtor also destroys the internals
177 destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED
);
180 friend void intrusive_ptr_add_ref(const connection_t
* p
);
181 friend void intrusive_ptr_release(const connection_t
* p
);
184 // these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
185 void intrusive_ptr_add_ref(const connection_t
* p
) {
188 void intrusive_ptr_release(const connection_t
* p
) {
189 if (--p
->ref_count
== 0) {
194 // convert connection info to string
195 std::string
to_string(const amqp_connection_info
& info
) {
196 std::stringstream ss
;
197 ss
<< "connection info:" <<
198 "\nHost: " << info
.host
<<
199 "\nPort: " << info
.port
<<
200 "\nUser: " << info
.user
<<
201 "\nPassword: " << info
.password
<<
202 "\nvhost: " << info
.vhost
<<
203 "\nSSL support: " << info
.ssl
<< std::endl
;
207 // convert reply to error code
208 int reply_to_code(const amqp_rpc_reply_t
& reply
) {
209 switch (reply
.reply_type
) {
210 case AMQP_RESPONSE_NONE
:
211 case AMQP_RESPONSE_NORMAL
:
212 return RGW_AMQP_NO_REPLY_CODE
;
213 case AMQP_RESPONSE_LIBRARY_EXCEPTION
:
214 return reply
.library_error
;
215 case AMQP_RESPONSE_SERVER_EXCEPTION
:
216 if (reply
.reply
.decoded
) {
217 const amqp_connection_close_t
* m
= (amqp_connection_close_t
*)reply
.reply
.decoded
;
218 return m
->reply_code
;
220 return reply
.reply
.id
;
222 return RGW_AMQP_NO_REPLY_CODE
;
225 // convert reply to string
226 std::string
to_string(const amqp_rpc_reply_t
& reply
) {
227 std::stringstream ss
;
228 switch (reply
.reply_type
) {
229 case AMQP_RESPONSE_NORMAL
:
231 case AMQP_RESPONSE_NONE
:
232 return "missing RPC reply type";
233 case AMQP_RESPONSE_LIBRARY_EXCEPTION
:
234 return amqp_error_string2(reply
.library_error
);
235 case AMQP_RESPONSE_SERVER_EXCEPTION
:
237 switch (reply
.reply
.id
) {
238 case AMQP_CONNECTION_CLOSE_METHOD
:
239 ss
<< "server connection error: ";
241 case AMQP_CHANNEL_CLOSE_METHOD
:
242 ss
<< "server channel error: ";
245 ss
<< "server unknown error: ";
248 if (reply
.reply
.decoded
) {
249 amqp_connection_close_t
* m
= (amqp_connection_close_t
*)reply
.reply
.decoded
;
250 ss
<< m
->reply_code
<< " text: " << std::string((char*)m
->reply_text
.bytes
, m
->reply_text
.len
);
255 ss
<< "unknown error, method id: " << reply
.reply
.id
;
260 // convert status enum to string
261 std::string
to_string(amqp_status_enum s
) {
264 return "AMQP_STATUS_OK";
265 case AMQP_STATUS_NO_MEMORY
:
266 return "AMQP_STATUS_NO_MEMORY";
267 case AMQP_STATUS_BAD_AMQP_DATA
:
268 return "AMQP_STATUS_BAD_AMQP_DATA";
269 case AMQP_STATUS_UNKNOWN_CLASS
:
270 return "AMQP_STATUS_UNKNOWN_CLASS";
271 case AMQP_STATUS_UNKNOWN_METHOD
:
272 return "AMQP_STATUS_UNKNOWN_METHOD";
273 case AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED
:
274 return "AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED";
275 case AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION
:
276 return "AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION";
277 case AMQP_STATUS_CONNECTION_CLOSED
:
278 return "AMQP_STATUS_CONNECTION_CLOSED";
279 case AMQP_STATUS_BAD_URL
:
280 return "AMQP_STATUS_BAD_URL";
281 case AMQP_STATUS_SOCKET_ERROR
:
282 return "AMQP_STATUS_SOCKET_ERROR";
283 case AMQP_STATUS_INVALID_PARAMETER
:
284 return "AMQP_STATUS_INVALID_PARAMETER";
285 case AMQP_STATUS_TABLE_TOO_BIG
:
286 return "AMQP_STATUS_TABLE_TOO_BIG";
287 case AMQP_STATUS_WRONG_METHOD
:
288 return "AMQP_STATUS_WRONG_METHOD";
289 case AMQP_STATUS_TIMEOUT
:
290 return "AMQP_STATUS_TIMEOUT";
291 case AMQP_STATUS_TIMER_FAILURE
:
292 return "AMQP_STATUS_TIMER_FAILURE";
293 case AMQP_STATUS_HEARTBEAT_TIMEOUT
:
294 return "AMQP_STATUS_HEARTBEAT_TIMEOUT";
295 case AMQP_STATUS_UNEXPECTED_STATE
:
296 return "AMQP_STATUS_UNEXPECTED_STATE";
297 case AMQP_STATUS_SOCKET_CLOSED
:
298 return "AMQP_STATUS_SOCKET_CLOSED";
299 case AMQP_STATUS_SOCKET_INUSE
:
300 return "AMQP_STATUS_SOCKET_INUSE";
301 case AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD
:
302 return "AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD";
303 #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 8, 0, 0)
304 case AMQP_STATUS_UNSUPPORTED
:
305 return "AMQP_STATUS_UNSUPPORTED";
307 case _AMQP_STATUS_NEXT_VALUE
:
308 return "AMQP_STATUS_INTERNAL";
309 case AMQP_STATUS_TCP_ERROR
:
310 return "AMQP_STATUS_TCP_ERROR";
311 case AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR
:
312 return "AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR";
313 case _AMQP_STATUS_TCP_NEXT_VALUE
:
314 return "AMQP_STATUS_INTERNAL";
315 case AMQP_STATUS_SSL_ERROR
:
316 return "AMQP_STATUS_SSL_ERROR";
317 case AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED
:
318 return "AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED";
319 case AMQP_STATUS_SSL_PEER_VERIFY_FAILED
:
320 return "AMQP_STATUS_SSL_PEER_VERIFY_FAILED";
321 case AMQP_STATUS_SSL_CONNECTION_FAILED
:
322 return "AMQP_STATUS_SSL_CONNECTION_FAILED";
323 case _AMQP_STATUS_SSL_NEXT_VALUE
:
324 return "AMQP_STATUS_INTERNAL";
325 #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 11, 0, 0)
326 case AMQP_STATUS_SSL_SET_ENGINE_FAILED
:
327 return "AMQP_STATUS_SSL_SET_ENGINE_FAILED";
330 return "AMQP_STATUS_UNKNOWN";
334 // TODO: add status_to_string on the connection object to prinf full status
336 // convert int status to string - including RGW specific values
337 std::string
status_to_string(int s
) {
339 case RGW_AMQP_STATUS_BROKER_NACK
:
340 return "RGW_AMQP_STATUS_BROKER_NACK";
341 case RGW_AMQP_STATUS_CONNECTION_CLOSED
:
342 return "RGW_AMQP_STATUS_CONNECTION_CLOSED";
343 case RGW_AMQP_STATUS_QUEUE_FULL
:
344 return "RGW_AMQP_STATUS_QUEUE_FULL";
345 case RGW_AMQP_STATUS_MAX_INFLIGHT
:
346 return "RGW_AMQP_STATUS_MAX_INFLIGHT";
347 case RGW_AMQP_STATUS_MANAGER_STOPPED
:
348 return "RGW_AMQP_STATUS_MANAGER_STOPPED";
349 case RGW_AMQP_STATUS_CONN_ALLOC_FAILED
:
350 return "RGW_AMQP_STATUS_CONN_ALLOC_FAILED";
351 case RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED
:
352 return "RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED";
353 case RGW_AMQP_STATUS_SOCKET_OPEN_FAILED
:
354 return "RGW_AMQP_STATUS_SOCKET_OPEN_FAILED";
355 case RGW_AMQP_STATUS_LOGIN_FAILED
:
356 return "RGW_AMQP_STATUS_LOGIN_FAILED";
357 case RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
:
358 return "RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED";
359 case RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED
:
360 return "RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED";
361 case RGW_AMQP_STATUS_Q_DECLARE_FAILED
:
362 return "RGW_AMQP_STATUS_Q_DECLARE_FAILED";
363 case RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED
:
364 return "RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED";
365 case RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED
:
366 return "RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED";
367 case RGW_AMQP_STATUS_SOCKET_CACERT_FAILED
:
368 return "RGW_AMQP_STATUS_SOCKET_CACERT_FAILED";
370 return to_string((amqp_status_enum
)s
);
// check the result from calls and return if error (=null)
// both macros expand to a single do/while(0) statement, so they are safe in an
// unbraced if/else; macro arguments are parenthesized for hygiene
#define RETURN_ON_ERROR(C, S, OK) \
  do { \
    if (!(OK)) { \
      (C)->status = (S); \
      return (C); \
    } \
  } while (0)

// in case of RPC calls, getting the RPC reply and return if an error is detected
#define RETURN_ON_REPLY_ERROR(C, ST, S) \
  do { \
    const auto reply = amqp_get_rpc_reply(ST); \
    if (reply.reply_type != AMQP_RESPONSE_NORMAL) { \
      (C)->status = (S); \
      (C)->reply_type = reply.reply_type; \
      (C)->reply_code = reply_to_code(reply); \
      return (C); \
    } \
  } while (0)
391 static const amqp_channel_t CHANNEL_ID
= 1;
392 static const amqp_channel_t CONFIRMING_CHANNEL_ID
= 2;
394 // utility function to create a connection, when the connection object already exists
395 connection_ptr_t
& create_connection(connection_ptr_t
& conn
, const amqp_connection_info
& info
) {
398 // reset all status codes
399 conn
->status
= AMQP_STATUS_OK
;
400 conn
->reply_type
= AMQP_RESPONSE_NORMAL
;
401 conn
->reply_code
= RGW_AMQP_NO_REPLY_CODE
;
403 auto state
= amqp_new_connection();
405 conn
->status
= RGW_AMQP_STATUS_CONN_ALLOC_FAILED
;
408 // make sure that the connection state is cleaned up in case of error
409 ConnectionCleaner
state_guard(state
);
411 // create and open socket
412 amqp_socket_t
*socket
= nullptr;
414 socket
= amqp_ssl_socket_new(state
);
415 #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 10, 0, 1)
416 SSL_CTX
* ssl_ctx
= reinterpret_cast<SSL_CTX
*>(amqp_ssl_socket_get_context(socket
));
418 // taken from https://github.com/alanxz/rabbitmq-c/pull/560
420 const struct amqp_socket_class_t
*klass
;
424 struct hack
*h
= reinterpret_cast<struct hack
*>(socket
);
425 SSL_CTX
* ssl_ctx
= h
->ctx
;
427 // ensure system CA certificates get loaded
428 SSL_CTX_set_default_verify_paths(ssl_ctx
);
431 socket
= amqp_tcp_socket_new(state
);
435 conn
->status
= RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED
;
439 if (!conn
->verify_ssl
) {
440 amqp_ssl_socket_set_verify_peer(socket
, 0);
441 amqp_ssl_socket_set_verify_hostname(socket
, 0);
443 if (conn
->ca_location
.has_value()) {
444 const auto s
= amqp_ssl_socket_set_cacert(socket
, conn
->ca_location
.get().c_str());
445 if (s
!= AMQP_STATUS_OK
) {
446 conn
->status
= RGW_AMQP_STATUS_SOCKET_CACERT_FAILED
;
447 conn
->reply_code
= s
;
452 const auto s
= amqp_socket_open(socket
, info
.host
, info
.port
);
454 conn
->status
= RGW_AMQP_STATUS_SOCKET_OPEN_FAILED
;
455 conn
->reply_type
= RGW_AMQP_RESPONSE_SOCKET_ERROR
;
456 conn
->reply_code
= s
;
461 const auto reply
= amqp_login(state
,
463 AMQP_DEFAULT_MAX_CHANNELS
,
464 AMQP_DEFAULT_FRAME_SIZE
,
465 0, // no heartbeat TODO: add conf
466 AMQP_SASL_METHOD_PLAIN
, // TODO: add other types of security
469 if (reply
.reply_type
!= AMQP_RESPONSE_NORMAL
) {
470 conn
->status
= RGW_AMQP_STATUS_LOGIN_FAILED
;
471 conn
->reply_type
= reply
.reply_type
;
472 conn
->reply_code
= reply_to_code(reply
);
478 const auto ok
= amqp_channel_open(state
, CHANNEL_ID
);
479 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
, ok
);
480 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
);
483 const auto ok
= amqp_channel_open(state
, CONFIRMING_CHANNEL_ID
);
484 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
, ok
);
485 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED
);
488 const auto ok
= amqp_confirm_select(state
, CONFIRMING_CHANNEL_ID
);
489 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED
, ok
);
490 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED
);
493 // verify that the topic exchange is there
494 // TODO: make this step optional
496 const auto ok
= amqp_exchange_declare(state
,
498 amqp_cstring_bytes(conn
->exchange
.c_str()),
499 amqp_cstring_bytes("topic"),
500 1, // passive - exchange must already exist on broker
502 0, // dont auto-delete
505 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED
, ok
);
506 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED
);
509 // create queue for confirmations
510 const auto queue_ok
= amqp_queue_declare(state
,
511 CHANNEL_ID
, // use the regular channel for this call
512 amqp_empty_bytes
, // let broker allocate queue name
513 0, // not passive - create the queue
517 amqp_empty_table
// not args TODO add args from conf: TTL, max length etc.
519 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_Q_DECLARE_FAILED
, queue_ok
);
520 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_Q_DECLARE_FAILED
);
522 // define consumption for connection
523 const auto consume_ok
= amqp_basic_consume(state
,
524 CONFIRMING_CHANNEL_ID
,
526 amqp_empty_bytes
, // broker will generate consumer tag
527 1, // messages sent from client are never routed back
528 1, // client does not ack thr acks
529 1, // exclusive access to queue
530 amqp_empty_table
// no parameters
533 RETURN_ON_ERROR(conn
, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED
, consume_ok
);
534 RETURN_ON_REPLY_ERROR(conn
, state
, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED
);
535 // broker generated consumer_tag could be used to cancel sending of n/acks from broker - not needed
539 conn
->reply_to_queue
= amqp_bytes_malloc_dup(queue_ok
->queue
);
544 // utility function to create a new connection
545 connection_ptr_t
create_new_connection(const amqp_connection_info
& info
,
546 const std::string
& exchange
, bool mandatory_delivery
, CephContext
* cct
, bool verify_ssl
, boost::optional
<const std::string
&> ca_location
) {
547 // create connection state
548 connection_ptr_t conn
= new connection_t
;
549 conn
->exchange
= exchange
;
550 conn
->user
.assign(info
.user
);
551 conn
->password
.assign(info
.password
);
552 conn
->mandatory
= mandatory_delivery
;
554 conn
->use_ssl
= info
.ssl
;
555 conn
->verify_ssl
= verify_ssl
;
556 conn
->ca_location
= ca_location
;
557 return create_connection(conn
, info
);
560 /// struct used for holding messages in the message queue
561 struct message_wrapper_t
{
562 connection_ptr_t conn
;
567 message_wrapper_t(connection_ptr_t
& _conn
,
568 const std::string
& _topic
,
569 const std::string
& _message
,
570 reply_callback_t _cb
) : conn(_conn
), topic(_topic
), message(_message
), cb(_cb
) {}
574 typedef std::unordered_map
<connection_id_t
, connection_ptr_t
, connection_id_t::hasher
> ConnectionList
;
575 typedef boost::lockfree::queue
<message_wrapper_t
*, boost::lockfree::fixed_sized
<true>> MessageQueue
;
// macros used inside a loop where an iterator is either incremented or erased
// each expands to one braced compound statement, so all of its steps run even
// when it is the body of an unbraced `if` (they cannot use do/while(0) since
// `continue` must reach the enclosing loop; do not follow them with `else`)
#define INCREMENT_AND_CONTINUE(IT) \
  { \
    ++(IT); \
    continue; \
  }

// expects a `connection_count` counter to be in scope
#define ERASE_AND_CONTINUE(IT,CONTAINER) \
  { \
    (IT) = (CONTAINER).erase(IT); \
    --connection_count; \
    continue; \
  }
589 const size_t max_connections
;
590 const size_t max_inflight
;
591 const size_t max_queue
;
592 const size_t max_idle_time
;
594 std::atomic
<size_t> connection_count
;
595 std::atomic
<bool> stopped
;
596 struct timeval read_timeout
;
597 ConnectionList connections
;
598 MessageQueue messages
;
599 std::atomic
<size_t> queued
;
600 std::atomic
<size_t> dequeued
;
601 CephContext
* const cct
;
602 mutable std::mutex connections_lock
;
603 const ceph::coarse_real_clock::duration idle_time
;
604 const ceph::coarse_real_clock::duration reconnect_time
;
607 void publish_internal(message_wrapper_t
* message
) {
608 const std::unique_ptr
<message_wrapper_t
> msg_owner(message
);
609 auto& conn
= message
->conn
;
611 conn
->timestamp
= ceph_clock_now();
613 if (!conn
->is_ok()) {
614 // connection had an issue while message was in the queue
615 // TODO add error stats
616 ldout(conn
->cct
, 1) << "AMQP publish: connection had an issue while message was in the queue" << dendl
;
618 message
->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED
);
623 if (message
->cb
== nullptr) {
624 // TODO add error stats
625 const auto rc
= amqp_basic_publish(conn
->state
,
627 amqp_cstring_bytes(conn
->exchange
.c_str()),
628 amqp_cstring_bytes(message
->topic
.c_str()),
629 0, // does not have to be routable
631 nullptr, // no properties needed
632 amqp_cstring_bytes(message
->message
.c_str()));
633 if (rc
== AMQP_STATUS_OK
) {
634 ldout(conn
->cct
, 20) << "AMQP publish (no callback): OK" << dendl
;
637 ldout(conn
->cct
, 1) << "AMQP publish (no callback): failed with error " << status_to_string(rc
) << dendl
;
638 // an error occurred, close connection
639 // it will be retied by the main loop
644 amqp_basic_properties_t props
;
646 AMQP_BASIC_DELIVERY_MODE_FLAG
|
647 AMQP_BASIC_REPLY_TO_FLAG
;
648 props
.delivery_mode
= 2; // persistent delivery TODO take from conf
649 props
.reply_to
= conn
->reply_to_queue
;
651 const auto rc
= amqp_basic_publish(conn
->state
,
652 CONFIRMING_CHANNEL_ID
,
653 amqp_cstring_bytes(conn
->exchange
.c_str()),
654 amqp_cstring_bytes(message
->topic
.c_str()),
658 amqp_cstring_bytes(message
->message
.c_str()));
660 if (rc
== AMQP_STATUS_OK
) {
661 auto const q_len
= conn
->callbacks
.size();
662 if (q_len
< max_inflight
) {
663 ldout(conn
->cct
, 20) << "AMQP publish (with callback, tag=" << conn
->delivery_tag
<< "): OK. Queue has: " << q_len
<< " callbacks" << dendl
;
664 conn
->callbacks
.emplace_back(conn
->delivery_tag
++, message
->cb
);
666 // immediately invoke callback with error
667 ldout(conn
->cct
, 1) << "AMQP publish (with callback): failed with error: callback queue full" << dendl
;
668 message
->cb(RGW_AMQP_STATUS_MAX_INFLIGHT
);
671 // an error occurred, close connection
672 // it will be retied by the main loop
673 ldout(conn
->cct
, 1) << "AMQP publish (with callback): failed with error: " << status_to_string(rc
) << dendl
;
675 // immediately invoke callback with error
680 // the managers thread:
681 // (1) empty the queue of messages to be published
682 // (2) loop over all connections and read acks
683 // (3) manages deleted connections
684 // (4) TODO reconnect on connection errors
685 // (5) TODO cleanup timedout callbacks
686 void run() noexcept
{
690 // publish all messages in the queue
691 const auto count
= messages
.consume_all(std::bind(&Manager::publish_internal
, this, std::placeholders::_1
));
693 ConnectionList::iterator conn_it
;
694 ConnectionList::const_iterator end_it
;
696 // thread safe access to the connection list
697 // once the iterators are fetched they are guaranteed to remain valid
698 std::lock_guard
lock(connections_lock
);
699 conn_it
= connections
.begin();
700 end_it
= connections
.end();
702 auto incoming_message
= false;
703 // loop over all connections to read acks
704 for (;conn_it
!= end_it
;) {
706 auto& conn
= conn_it
->second
;
707 const auto& conn_key
= conn_it
->first
;
709 if(conn
->timestamp
.sec() + max_idle_time
< ceph_clock_now()) {
710 ldout(conn
->cct
, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl
;
711 ERASE_AND_CONTINUE(conn_it
, connections
);
714 // try to reconnect the connection if it has an error
715 if (!conn
->is_ok()) {
716 const auto now
= ceph::coarse_real_clock::now();
717 if (now
>= conn
->next_reconnect
) {
718 // pointers are used temporarily inside the amqp_connection_info object
719 // as read-only values, hence the assignment, and const_cast are safe here
720 amqp_connection_info info
;
721 info
.host
= const_cast<char*>(conn_key
.host
.c_str());
722 info
.port
= conn_key
.port
;
723 info
.vhost
= const_cast<char*>(conn_key
.vhost
.c_str());
724 info
.user
= const_cast<char*>(conn
->user
.c_str());
725 info
.password
= const_cast<char*>(conn
->password
.c_str());
726 info
.ssl
= conn
->use_ssl
;
727 ldout(conn
->cct
, 20) << "AMQP run: retry connection" << dendl
;
728 if (create_connection(conn
, info
)->is_ok() == false) {
729 ldout(conn
->cct
, 10) << "AMQP run: connection (" << to_string(conn_key
) << ") retry failed. error: " <<
730 status_to_string(conn
->status
) << " (" << conn
->reply_code
<< ")" << dendl
;
731 // TODO: add error counter for failed retries
732 // TODO: add exponential backoff for retries
733 conn
->next_reconnect
= now
+ reconnect_time
;
735 ldout(conn
->cct
, 10) << "AMQP run: connection (" << to_string(conn_key
) << ") retry successfull" << dendl
;
738 INCREMENT_AND_CONTINUE(conn_it
);
741 const auto rc
= amqp_simple_wait_frame_noblock(conn
->state
, &frame
, &read_timeout
);
743 if (rc
== AMQP_STATUS_TIMEOUT
) {
744 // TODO mark connection as idle
745 INCREMENT_AND_CONTINUE(conn_it
);
748 // this is just to prevent spinning idle, does not indicate that a message
749 // was successfully processed or not
750 incoming_message
= true;
752 // check if error occurred that require reopening the connection
753 if (rc
!= AMQP_STATUS_OK
) {
754 // an error occurred, close connection
755 // it will be retied by the main loop
756 ldout(conn
->cct
, 1) << "AMQP run: connection read error: " << status_to_string(rc
) << dendl
;
758 INCREMENT_AND_CONTINUE(conn_it
);
761 if (frame
.frame_type
!= AMQP_FRAME_METHOD
) {
762 ldout(conn
->cct
, 10) << "AMQP run: ignoring non n/ack messages. frame type: "
763 << unsigned(frame
.frame_type
) << dendl
;
764 // handler is for publish confirmation only - handle only method frames
765 INCREMENT_AND_CONTINUE(conn_it
);
772 switch (frame
.payload
.method
.id
) {
773 case AMQP_BASIC_ACK_METHOD
:
775 result
= AMQP_STATUS_OK
;
776 const auto ack
= (amqp_basic_ack_t
*)frame
.payload
.method
.decoded
;
778 tag
= ack
->delivery_tag
;
779 multiple
= ack
->multiple
;
782 case AMQP_BASIC_NACK_METHOD
:
784 result
= RGW_AMQP_STATUS_BROKER_NACK
;
785 const auto nack
= (amqp_basic_nack_t
*)frame
.payload
.method
.decoded
;
787 tag
= nack
->delivery_tag
;
788 multiple
= nack
->multiple
;
791 case AMQP_BASIC_REJECT_METHOD
:
793 result
= RGW_AMQP_STATUS_BROKER_NACK
;
794 const auto reject
= (amqp_basic_reject_t
*)frame
.payload
.method
.decoded
;
795 tag
= reject
->delivery_tag
;
799 case AMQP_CONNECTION_CLOSE_METHOD
:
800 // TODO on channel close, no need to reopen the connection
801 case AMQP_CHANNEL_CLOSE_METHOD
:
803 // other side closed the connection, no need to continue
804 ldout(conn
->cct
, 10) << "AMQP run: connection was closed by broker" << dendl
;
806 INCREMENT_AND_CONTINUE(conn_it
);
808 case AMQP_BASIC_RETURN_METHOD
:
809 // message was not delivered, returned to sender
810 ldout(conn
->cct
, 10) << "AMQP run: message was not routable" << dendl
;
811 INCREMENT_AND_CONTINUE(conn_it
);
815 ldout(conn
->cct
, 10) << "AMQP run: unexpected message" << dendl
;
816 INCREMENT_AND_CONTINUE(conn_it
);
819 const auto& callbacks_end
= conn
->callbacks
.end();
820 const auto& callbacks_begin
= conn
->callbacks
.begin();
821 const auto tag_it
= std::find(callbacks_begin
, callbacks_end
, tag
);
822 if (tag_it
!= callbacks_end
) {
824 // n/ack all up to (and including) the tag
825 ldout(conn
->cct
, 20) << "AMQP run: multiple n/acks received with tag=" << tag
<< " and result=" << result
<< dendl
;
826 auto it
= callbacks_begin
;
827 while (it
->tag
<= tag
&& it
!= conn
->callbacks
.end()) {
828 ldout(conn
->cct
, 20) << "AMQP run: invoking callback with tag=" << it
->tag
<< dendl
;
830 it
= conn
->callbacks
.erase(it
);
833 // n/ack a specific tag
834 ldout(conn
->cct
, 20) << "AMQP run: n/ack received, invoking callback with tag=" << tag
<< " and result=" << result
<< dendl
;
836 conn
->callbacks
.erase(tag_it
);
839 ldout(conn
->cct
, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag
<< dendl
;
841 // just increment the iterator
844 // if no messages were received or published, sleep for 100ms
845 if (count
== 0 && !incoming_message
) {
846 std::this_thread::sleep_for(idle_time
);
851 // used in the dtor for message cleanup
852 static void delete_message(const message_wrapper_t
* message
) {
857 Manager(size_t _max_connections
,
858 size_t _max_inflight
,
861 unsigned reconnect_time_ms
,
862 unsigned idle_time_ms
,
864 max_connections(_max_connections
),
865 max_inflight(_max_inflight
),
866 max_queue(_max_queue
),
870 read_timeout
{0, _usec_timeout
},
871 connections(_max_connections
),
876 idle_time(std::chrono::milliseconds(idle_time_ms
)),
877 reconnect_time(std::chrono::milliseconds(reconnect_time_ms
)),
878 runner(&Manager::run
, this) {
879 // The hashmap has "max connections" as the initial number of buckets,
880 // and allows for 10 collisions per bucket before rehash.
881 // This is to prevent rehashing so that iterators are not invalidated
882 // when a new connection is added.
883 connections
.max_load_factor(10.0);
884 // give the runner thread a name for easier debugging
885 const auto rc
= ceph_pthread_setname(runner
.native_handle(), "amqp_manager");
890 Manager(const Manager
&) = delete;
891 const Manager
& operator=(const Manager
&) = delete;
893 // stop the main thread
898 // connect to a broker, or reuse an existing connection if already connected
899 connection_ptr_t
connect(const std::string
& url
, const std::string
& exchange
, bool mandatory_delivery
, bool verify_ssl
,
900 boost::optional
<const std::string
&> ca_location
) {
902 ldout(cct
, 1) << "AMQP connect: manager is stopped" << dendl
;
906 struct amqp_connection_info info
;
907 // cache the URL so that parsing could happen in-place
908 std::vector
<char> url_cache(url
.c_str(), url
.c_str()+url
.size()+1);
909 const auto retcode
= amqp_parse_url(url_cache
.data(), &info
);
910 if (AMQP_STATUS_OK
!= retcode
) {
911 ldout(cct
, 1) << "AMQP connect: URL parsing failed. error: " << retcode
<< dendl
;
915 const connection_id_t
id(info
);
916 std::lock_guard
lock(connections_lock
);
917 const auto it
= connections
.find(id
);
918 if (it
!= connections
.end()) {
919 if (it
->second
->exchange
!= exchange
) {
920 ldout(cct
, 1) << "AMQP connect: exchange mismatch" << dendl
;
923 // connection found - return even if non-ok
924 ldout(cct
, 20) << "AMQP connect: connection found" << dendl
;
928 // connection not found, creating a new one
929 if (connection_count
>= max_connections
) {
930 ldout(cct
, 1) << "AMQP connect: max connections exceeded" << dendl
;
933 const auto conn
= create_new_connection(info
, exchange
, mandatory_delivery
, cct
, verify_ssl
, ca_location
);
934 if (!conn
->is_ok()) {
935 ldout(cct
, 10) << "AMQP connect: connection (" << to_string(id
) << ") creation failed. error:" <<
936 status_to_string(conn
->status
) << "(" << conn
->reply_code
<< ")" << dendl
;
938 // create_new_connection must always return a connection object
939 // even if error occurred during creation.
940 // in such a case the creation will be retried in the main thread
943 ldout(cct
, 10) << "AMQP connect: new connection is created. Total connections: " << connection_count
<< dendl
;
944 ldout(cct
, 10) << "AMQP connect: new connection status is: " << status_to_string(conn
->status
) << dendl
;
945 return connections
.emplace(id
, conn
).first
->second
;
948 // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
949 int publish(connection_ptr_t
& conn
,
950 const std::string
& topic
,
951 const std::string
& message
) {
953 ldout(cct
, 1) << "AMQP publish: manager is not running" << dendl
;
954 return RGW_AMQP_STATUS_MANAGER_STOPPED
;
956 if (!conn
|| !conn
->is_ok()) {
957 ldout(cct
, 1) << "AMQP publish: no connection" << dendl
;
958 return RGW_AMQP_STATUS_CONNECTION_CLOSED
;
960 if (messages
.push(new message_wrapper_t(conn
, topic
, message
, nullptr))) {
962 return AMQP_STATUS_OK
;
964 ldout(cct
, 1) << "AMQP publish: queue is full" << dendl
;
965 return RGW_AMQP_STATUS_QUEUE_FULL
;
968 int publish_with_confirm(connection_ptr_t
& conn
,
969 const std::string
& topic
,
970 const std::string
& message
,
971 reply_callback_t cb
) {
973 ldout(cct
, 1) << "AMQP publish_with_confirm: manager is not running" << dendl
;
974 return RGW_AMQP_STATUS_MANAGER_STOPPED
;
976 if (!conn
|| !conn
->is_ok()) {
977 ldout(cct
, 1) << "AMQP publish_with_confirm: no connection" << dendl
;
978 return RGW_AMQP_STATUS_CONNECTION_CLOSED
;
980 if (messages
.push(new message_wrapper_t(conn
, topic
, message
, cb
))) {
982 return AMQP_STATUS_OK
;
984 ldout(cct
, 1) << "AMQP publish_with_confirm: queue is full" << dendl
;
985 return RGW_AMQP_STATUS_QUEUE_FULL
;
988 // dtor wait for thread to stop
989 // then connection are cleaned-up
993 messages
.consume_all(delete_message
);
996 // get the number of connections
997 size_t get_connection_count() const {
998 return connection_count
;
1001 // get the number of in-flight messages
1002 size_t get_inflight() const {
1004 std::lock_guard
lock(connections_lock
);
1005 std::for_each(connections
.begin(), connections
.end(), [&sum
](auto& conn_pair
) {
1006 // concurrent access to the callback vector is safe without locking
1007 sum
+= conn_pair
.second
->callbacks
.size();
1012 // running counter of the queued messages
1013 size_t get_queued() const {
1017 // running counter of the dequeued messages
1018 size_t get_dequeued() const {
1023 // singleton manager
1024 // note that the manager itself is not a singleton, and multiple instances may co-exist
1025 // TODO make the pointer atomic in allocation and deallocation to avoid race conditions
1026 static Manager
* s_manager
= nullptr;
1028 static const size_t MAX_CONNECTIONS_DEFAULT
= 256;
1029 static const size_t MAX_INFLIGHT_DEFAULT
= 8192;
1030 static const size_t MAX_QUEUE_DEFAULT
= 8192;
1031 static const long READ_TIMEOUT_USEC
= 100;
1032 static const unsigned IDLE_TIME_MS
= 100;
1033 static const unsigned RECONNECT_TIME_MS
= 100;
1035 bool init(CephContext
* cct
) {
1039 // TODO: take conf from CephContext
1040 s_manager
= new Manager(MAX_CONNECTIONS_DEFAULT
, MAX_INFLIGHT_DEFAULT
, MAX_QUEUE_DEFAULT
,
1041 READ_TIMEOUT_USEC
, IDLE_TIME_MS
, RECONNECT_TIME_MS
, cct
);
1047 s_manager
= nullptr;
1050 connection_ptr_t
connect(const std::string
& url
, const std::string
& exchange
, bool mandatory_delivery
, bool verify_ssl
,
1051 boost::optional
<const std::string
&> ca_location
) {
1052 if (!s_manager
) return nullptr;
1053 return s_manager
->connect(url
, exchange
, mandatory_delivery
, verify_ssl
, ca_location
);
1056 int publish(connection_ptr_t
& conn
,
1057 const std::string
& topic
,
1058 const std::string
& message
) {
1059 if (!s_manager
) return RGW_AMQP_STATUS_MANAGER_STOPPED
;
1060 return s_manager
->publish(conn
, topic
, message
);
1063 int publish_with_confirm(connection_ptr_t
& conn
,
1064 const std::string
& topic
,
1065 const std::string
& message
,
1066 reply_callback_t cb
) {
1067 if (!s_manager
) return RGW_AMQP_STATUS_MANAGER_STOPPED
;
1068 return s_manager
->publish_with_confirm(conn
, topic
, message
, cb
);
1071 size_t get_connection_count() {
1072 if (!s_manager
) return 0;
1073 return s_manager
->get_connection_count();
1076 size_t get_inflight() {
1077 if (!s_manager
) return 0;
1078 return s_manager
->get_inflight();
1081 size_t get_queued() {
1082 if (!s_manager
) return 0;
1083 return s_manager
->get_queued();
1086 size_t get_dequeued() {
1087 if (!s_manager
) return 0;
1088 return s_manager
->get_dequeued();
1091 size_t get_max_connections() {
1092 if (!s_manager
) return MAX_CONNECTIONS_DEFAULT
;
1093 return s_manager
->max_connections
;
1096 size_t get_max_inflight() {
1097 if (!s_manager
) return MAX_INFLIGHT_DEFAULT
;
1098 return s_manager
->max_inflight
;
// maximum size of the message queue: the running manager's limit, or
// MAX_QUEUE_DEFAULT when no manager has been initialized
1101 size_t get_max_queue() {
1102 if (!s_manager
) return MAX_QUEUE_DEFAULT
;
1103 return s_manager
->max_queue
;