]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_amqp.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / rgw / rgw_amqp.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "rgw_amqp.h"
5 #include <amqp.h>
6 #include <amqp_ssl_socket.h>
7 #include <amqp_tcp_socket.h>
8 #include <amqp_framing.h>
9 #include "include/ceph_assert.h"
10 #include <sstream>
11 #include <cstring>
12 #include <unordered_map>
13 #include <string>
14 #include <vector>
15 #include <thread>
16 #include <atomic>
17 #include <mutex>
18 #include <boost/lockfree/queue.hpp>
19 #include <boost/functional/hash.hpp>
20 #include "common/dout.h"
21 #include <openssl/ssl.h>
22
23 #define dout_subsys ceph_subsys_rgw
24
25 // TODO investigation, not necessarily issues:
26 // (1) in case of single threaded writer context use spsc_queue
27 // (2) support multiple channels
28 // (3) check performance of emptying queue to local list, and go over the list and publish
29 // (4) use std::shared_mutex (c++17) or equivalent for the connections lock
30
31 namespace rgw::amqp {
32
33 // RGW AMQP status codes for publishing
34 static const int RGW_AMQP_STATUS_BROKER_NACK = -0x1001;
35 static const int RGW_AMQP_STATUS_CONNECTION_CLOSED = -0x1002;
36 static const int RGW_AMQP_STATUS_QUEUE_FULL = -0x1003;
37 static const int RGW_AMQP_STATUS_MAX_INFLIGHT = -0x1004;
38 static const int RGW_AMQP_STATUS_MANAGER_STOPPED = -0x1005;
39 // RGW AMQP status code for connection opening
40 static const int RGW_AMQP_STATUS_CONN_ALLOC_FAILED = -0x2001;
41 static const int RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED = -0x2002;
42 static const int RGW_AMQP_STATUS_SOCKET_OPEN_FAILED = -0x2003;
43 static const int RGW_AMQP_STATUS_LOGIN_FAILED = -0x2004;
44 static const int RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED = -0x2005;
45 static const int RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED = -0x2006;
46 static const int RGW_AMQP_STATUS_Q_DECLARE_FAILED = -0x2007;
47 static const int RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED = -0x2008;
48 static const int RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED = -0x2009;
49 static const int RGW_AMQP_STATUS_SOCKET_CACERT_FAILED = -0x2010;
50
51 static const int RGW_AMQP_RESPONSE_SOCKET_ERROR = -0x3008;
52 static const int RGW_AMQP_NO_REPLY_CODE = 0x0;
53
54 // the amqp_connection_info struct does not hold any memory and just points to the URL string
55 // so, strings are copied into connection_id_t
56 connection_id_t::connection_id_t(const amqp_connection_info& info, const std::string& _exchange)
57 : host(info.host), port(info.port), vhost(info.vhost), exchange(_exchange), ssl(info.ssl) {}
58
59 // equality operator and hasher functor are needed
60 // so that connection_id_t could be used as key in unordered_map
61 bool operator==(const connection_id_t& lhs, const connection_id_t& rhs) {
62 return lhs.host == rhs.host && lhs.port == rhs.port &&
63 lhs.vhost == rhs.vhost && lhs.exchange == rhs.exchange;
64 }
65
66 struct connection_id_hasher {
67 std::size_t operator()(const connection_id_t& k) const {
68 std::size_t h = 0;
69 boost::hash_combine(h, k.host);
70 boost::hash_combine(h, k.port);
71 boost::hash_combine(h, k.vhost);
72 boost::hash_combine(h, k.exchange);
73 return h;
74 }
75 };
76
77 std::string to_string(const connection_id_t& id) {
78 return fmt::format("{}://{}:{}{}?exchange={}",
79 id.ssl ? "amqps" : "amqp",
80 id.host, id.port, id.vhost, id.exchange);
81 }
82
83 // automatically cleans amqp state when gets out of scope
84 class ConnectionCleaner {
85 private:
86 amqp_connection_state_t state;
87 public:
88 ConnectionCleaner(amqp_connection_state_t _state) : state(_state) {}
89 ~ConnectionCleaner() {
90 if (state) {
91 amqp_destroy_connection(state);
92 }
93 }
94 // call reset() if cleanup is not needed anymore
95 void reset() {
96 state = nullptr;
97 }
98 };
99
100 // struct for holding the callback and its tag in the callback list
101 struct reply_callback_with_tag_t {
102 uint64_t tag;
103 reply_callback_t cb;
104
105 reply_callback_with_tag_t(uint64_t _tag, reply_callback_t _cb) : tag(_tag), cb(_cb) {}
106
107 bool operator==(uint64_t rhs) {
108 return tag == rhs;
109 }
110 };
111
112 typedef std::vector<reply_callback_with_tag_t> CallbackList;
113
114 // struct for holding the connection state object as well as the exchange
115 struct connection_t {
116 CephContext* cct = nullptr;
117 amqp_connection_state_t state = nullptr;
118 amqp_bytes_t reply_to_queue = amqp_empty_bytes;
119 uint64_t delivery_tag = 1;
120 int status = AMQP_STATUS_OK;
121 int reply_type = AMQP_RESPONSE_NORMAL;
122 int reply_code = RGW_AMQP_NO_REPLY_CODE;
123 CallbackList callbacks;
124 ceph::coarse_real_clock::time_point next_reconnect = ceph::coarse_real_clock::now();
125 bool mandatory = false;
126 const bool use_ssl = false;
127 std::string user;
128 std::string password;
129 bool verify_ssl = true;
130 boost::optional<std::string> ca_location;
131 utime_t timestamp = ceph_clock_now();
132
133 connection_t(CephContext* _cct, const amqp_connection_info& info, bool _verify_ssl, boost::optional<const std::string&> _ca_location) :
134 cct(_cct), use_ssl(info.ssl), user(info.user), password(info.password), verify_ssl(_verify_ssl), ca_location(_ca_location) {}
135
136 // cleanup of all internal connection resource
137 // the object can still remain, and internal connection
138 // resources created again on successful reconnection
139 void destroy(int s) {
140 status = s;
141 ConnectionCleaner clean_state(state);
142 state = nullptr;
143 amqp_bytes_free(reply_to_queue);
144 reply_to_queue = amqp_empty_bytes;
145 // fire all remaining callbacks
146 std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
147 cb_tag.cb(status);
148 ldout(cct, 20) << "AMQP destroy: invoking callback with tag=" << cb_tag.tag << dendl;
149 });
150 callbacks.clear();
151 delivery_tag = 1;
152 }
153
154 bool is_ok() const {
155 return (state != nullptr);
156 }
157
158 // dtor also destroys the internals
159 ~connection_t() {
160 destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
161 }
162 };
163
164 // convert connection info to string
165 std::string to_string(const amqp_connection_info& info) {
166 std::stringstream ss;
167 ss << "connection info:" <<
168 "\nHost: " << info.host <<
169 "\nPort: " << info.port <<
170 "\nUser: " << info.user <<
171 "\nPassword: " << info.password <<
172 "\nvhost: " << info.vhost <<
173 "\nSSL support: " << info.ssl << std::endl;
174 return ss.str();
175 }
176
177 // convert reply to error code
178 int reply_to_code(const amqp_rpc_reply_t& reply) {
179 switch (reply.reply_type) {
180 case AMQP_RESPONSE_NONE:
181 case AMQP_RESPONSE_NORMAL:
182 return RGW_AMQP_NO_REPLY_CODE;
183 case AMQP_RESPONSE_LIBRARY_EXCEPTION:
184 return reply.library_error;
185 case AMQP_RESPONSE_SERVER_EXCEPTION:
186 if (reply.reply.decoded) {
187 const amqp_connection_close_t* m = (amqp_connection_close_t*)reply.reply.decoded;
188 return m->reply_code;
189 }
190 return reply.reply.id;
191 }
192 return RGW_AMQP_NO_REPLY_CODE;
193 }
194
195 // convert reply to string
196 std::string to_string(const amqp_rpc_reply_t& reply) {
197 std::stringstream ss;
198 switch (reply.reply_type) {
199 case AMQP_RESPONSE_NORMAL:
200 return "";
201 case AMQP_RESPONSE_NONE:
202 return "missing RPC reply type";
203 case AMQP_RESPONSE_LIBRARY_EXCEPTION:
204 return amqp_error_string2(reply.library_error);
205 case AMQP_RESPONSE_SERVER_EXCEPTION:
206 {
207 switch (reply.reply.id) {
208 case AMQP_CONNECTION_CLOSE_METHOD:
209 ss << "server connection error: ";
210 break;
211 case AMQP_CHANNEL_CLOSE_METHOD:
212 ss << "server channel error: ";
213 break;
214 default:
215 ss << "server unknown error: ";
216 break;
217 }
218 if (reply.reply.decoded) {
219 amqp_connection_close_t* m = (amqp_connection_close_t*)reply.reply.decoded;
220 ss << m->reply_code << " text: " << std::string((char*)m->reply_text.bytes, m->reply_text.len);
221 }
222 return ss.str();
223 }
224 default:
225 ss << "unknown error, method id: " << reply.reply.id;
226 return ss.str();
227 }
228 }
229
230 // convert status enum to string
231 std::string to_string(amqp_status_enum s) {
232 switch (s) {
233 case AMQP_STATUS_OK:
234 return "AMQP_STATUS_OK";
235 case AMQP_STATUS_NO_MEMORY:
236 return "AMQP_STATUS_NO_MEMORY";
237 case AMQP_STATUS_BAD_AMQP_DATA:
238 return "AMQP_STATUS_BAD_AMQP_DATA";
239 case AMQP_STATUS_UNKNOWN_CLASS:
240 return "AMQP_STATUS_UNKNOWN_CLASS";
241 case AMQP_STATUS_UNKNOWN_METHOD:
242 return "AMQP_STATUS_UNKNOWN_METHOD";
243 case AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED:
244 return "AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED";
245 case AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION:
246 return "AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION";
247 case AMQP_STATUS_CONNECTION_CLOSED:
248 return "AMQP_STATUS_CONNECTION_CLOSED";
249 case AMQP_STATUS_BAD_URL:
250 return "AMQP_STATUS_BAD_URL";
251 case AMQP_STATUS_SOCKET_ERROR:
252 return "AMQP_STATUS_SOCKET_ERROR";
253 case AMQP_STATUS_INVALID_PARAMETER:
254 return "AMQP_STATUS_INVALID_PARAMETER";
255 case AMQP_STATUS_TABLE_TOO_BIG:
256 return "AMQP_STATUS_TABLE_TOO_BIG";
257 case AMQP_STATUS_WRONG_METHOD:
258 return "AMQP_STATUS_WRONG_METHOD";
259 case AMQP_STATUS_TIMEOUT:
260 return "AMQP_STATUS_TIMEOUT";
261 case AMQP_STATUS_TIMER_FAILURE:
262 return "AMQP_STATUS_TIMER_FAILURE";
263 case AMQP_STATUS_HEARTBEAT_TIMEOUT:
264 return "AMQP_STATUS_HEARTBEAT_TIMEOUT";
265 case AMQP_STATUS_UNEXPECTED_STATE:
266 return "AMQP_STATUS_UNEXPECTED_STATE";
267 case AMQP_STATUS_SOCKET_CLOSED:
268 return "AMQP_STATUS_SOCKET_CLOSED";
269 case AMQP_STATUS_SOCKET_INUSE:
270 return "AMQP_STATUS_SOCKET_INUSE";
271 case AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD:
272 return "AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD";
273 #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 8, 0, 0)
274 case AMQP_STATUS_UNSUPPORTED:
275 return "AMQP_STATUS_UNSUPPORTED";
276 #endif
277 case _AMQP_STATUS_NEXT_VALUE:
278 return "AMQP_STATUS_INTERNAL";
279 case AMQP_STATUS_TCP_ERROR:
280 return "AMQP_STATUS_TCP_ERROR";
281 case AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR:
282 return "AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR";
283 case _AMQP_STATUS_TCP_NEXT_VALUE:
284 return "AMQP_STATUS_INTERNAL";
285 case AMQP_STATUS_SSL_ERROR:
286 return "AMQP_STATUS_SSL_ERROR";
287 case AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED:
288 return "AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED";
289 case AMQP_STATUS_SSL_PEER_VERIFY_FAILED:
290 return "AMQP_STATUS_SSL_PEER_VERIFY_FAILED";
291 case AMQP_STATUS_SSL_CONNECTION_FAILED:
292 return "AMQP_STATUS_SSL_CONNECTION_FAILED";
293 case _AMQP_STATUS_SSL_NEXT_VALUE:
294 return "AMQP_STATUS_INTERNAL";
295 #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 11, 0, 0)
296 case AMQP_STATUS_SSL_SET_ENGINE_FAILED:
297 return "AMQP_STATUS_SSL_SET_ENGINE_FAILED";
298 #endif
299 default:
300 return "AMQP_STATUS_UNKNOWN";
301 }
302 }
303
// TODO: add status_to_string on the connection object to print the full status
305
306 // convert int status to string - including RGW specific values
307 std::string status_to_string(int s) {
308 switch (s) {
309 case RGW_AMQP_STATUS_BROKER_NACK:
310 return "RGW_AMQP_STATUS_BROKER_NACK";
311 case RGW_AMQP_STATUS_CONNECTION_CLOSED:
312 return "RGW_AMQP_STATUS_CONNECTION_CLOSED";
313 case RGW_AMQP_STATUS_QUEUE_FULL:
314 return "RGW_AMQP_STATUS_QUEUE_FULL";
315 case RGW_AMQP_STATUS_MAX_INFLIGHT:
316 return "RGW_AMQP_STATUS_MAX_INFLIGHT";
317 case RGW_AMQP_STATUS_MANAGER_STOPPED:
318 return "RGW_AMQP_STATUS_MANAGER_STOPPED";
319 case RGW_AMQP_STATUS_CONN_ALLOC_FAILED:
320 return "RGW_AMQP_STATUS_CONN_ALLOC_FAILED";
321 case RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED:
322 return "RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED";
323 case RGW_AMQP_STATUS_SOCKET_OPEN_FAILED:
324 return "RGW_AMQP_STATUS_SOCKET_OPEN_FAILED";
325 case RGW_AMQP_STATUS_LOGIN_FAILED:
326 return "RGW_AMQP_STATUS_LOGIN_FAILED";
327 case RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED:
328 return "RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED";
329 case RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED:
330 return "RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED";
331 case RGW_AMQP_STATUS_Q_DECLARE_FAILED:
332 return "RGW_AMQP_STATUS_Q_DECLARE_FAILED";
333 case RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED:
334 return "RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED";
335 case RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED:
336 return "RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED";
337 case RGW_AMQP_STATUS_SOCKET_CACERT_FAILED:
338 return "RGW_AMQP_STATUS_SOCKET_CACERT_FAILED";
339 }
340 return to_string((amqp_status_enum)s);
341 }
342
// check the result from calls and return if error (=null)
// C:  pointer to the connection object on which the status is stored
// S:  status to store in case of an error
// OK: result of the call, an error is anything that evaluates to false
// the arguments are parenthesized and the body is wrapped in "do/while(0)",
// so that the macro behaves as a single statement (e.g. in an "if/else"
// without braces) and works with arbitrary expressions as arguments
#define RETURN_ON_ERROR(C, S, OK) \
  do { \
    if (!(OK)) { \
      (C)->status = (S); \
      return false; \
    } \
  } while (0)

// in case of RPC calls, getting the RPC reply and return if an error is detected
// C:  pointer to the connection object on which the error details are stored
// ST: amqp connection state from which the RPC reply is fetched
// S:  status to store in case of an error
#define RETURN_ON_REPLY_ERROR(C, ST, S) \
  do { \
    const auto rpc_reply_ = amqp_get_rpc_reply(ST); \
    if (rpc_reply_.reply_type != AMQP_RESPONSE_NORMAL) { \
      (C)->status = (S); \
      (C)->reply_type = rpc_reply_.reply_type; \
      (C)->reply_code = reply_to_code(rpc_reply_); \
      return false; \
    } \
  } while (0)
360
361 static const amqp_channel_t CHANNEL_ID = 1;
362 static const amqp_channel_t CONFIRMING_CHANNEL_ID = 2;
363
364 // utility function to create a connection, when the connection object already exists
365 bool new_state(connection_t* conn, const connection_id_t& conn_id) {
366 // state must be null at this point
367 ceph_assert(!conn->state);
368 // reset all status codes
369 conn->status = AMQP_STATUS_OK;
370 conn->reply_type = AMQP_RESPONSE_NORMAL;
371 conn->reply_code = RGW_AMQP_NO_REPLY_CODE;
372
373 auto state = amqp_new_connection();
374 if (!state) {
375 conn->status = RGW_AMQP_STATUS_CONN_ALLOC_FAILED;
376 return false;
377 }
378 // make sure that the connection state is cleaned up in case of error
379 ConnectionCleaner state_guard(state);
380
381 // create and open socket
382 amqp_socket_t *socket = nullptr;
383 if (conn->use_ssl) {
384 socket = amqp_ssl_socket_new(state);
385 #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 10, 0, 1)
386 SSL_CTX* ssl_ctx = reinterpret_cast<SSL_CTX*>(amqp_ssl_socket_get_context(socket));
387 #else
388 // taken from https://github.com/alanxz/rabbitmq-c/pull/560
389 struct hack {
390 const struct amqp_socket_class_t *klass;
391 SSL_CTX *ctx;
392 };
393
394 struct hack *h = reinterpret_cast<struct hack*>(socket);
395 SSL_CTX* ssl_ctx = h->ctx;
396 #endif
397 // ensure system CA certificates get loaded
398 SSL_CTX_set_default_verify_paths(ssl_ctx);
399 }
400 else {
401 socket = amqp_tcp_socket_new(state);
402 }
403
404 if (!socket) {
405 conn->status = RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED;
406 return false;
407 }
408 if (conn->use_ssl) {
409 if (!conn->verify_ssl) {
410 amqp_ssl_socket_set_verify_peer(socket, 0);
411 amqp_ssl_socket_set_verify_hostname(socket, 0);
412 }
413 if (conn->ca_location.has_value()) {
414 const auto s = amqp_ssl_socket_set_cacert(socket, conn->ca_location.get().c_str());
415 if (s != AMQP_STATUS_OK) {
416 conn->status = RGW_AMQP_STATUS_SOCKET_CACERT_FAILED;
417 conn->reply_code = s;
418 return false;
419 }
420 }
421 }
422 const auto s = amqp_socket_open(socket, conn_id.host.c_str(), conn_id.port);
423 if (s < 0) {
424 conn->status = RGW_AMQP_STATUS_SOCKET_OPEN_FAILED;
425 conn->reply_type = RGW_AMQP_RESPONSE_SOCKET_ERROR;
426 conn->reply_code = s;
427 return false;
428 }
429
430 // login to broker
431 const auto reply = amqp_login(state,
432 conn_id.vhost.c_str(),
433 AMQP_DEFAULT_MAX_CHANNELS,
434 AMQP_DEFAULT_FRAME_SIZE,
435 0, // no heartbeat TODO: add conf
436 AMQP_SASL_METHOD_PLAIN, // TODO: add other types of security
437 conn->user.c_str(),
438 conn->password.c_str());
439 if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
440 conn->status = RGW_AMQP_STATUS_LOGIN_FAILED;
441 conn->reply_type = reply.reply_type;
442 conn->reply_code = reply_to_code(reply);
443 return false;
444 }
445
446 // open channels
447 {
448 const auto ok = amqp_channel_open(state, CHANNEL_ID);
449 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED, ok);
450 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED);
451 }
452 {
453 const auto ok = amqp_channel_open(state, CONFIRMING_CHANNEL_ID);
454 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED, ok);
455 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED);
456 }
457 {
458 const auto ok = amqp_confirm_select(state, CONFIRMING_CHANNEL_ID);
459 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED, ok);
460 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED);
461 }
462
463 // verify that the topic exchange is there
464 // TODO: make this step optional
465 {
466 const auto ok = amqp_exchange_declare(state,
467 CHANNEL_ID,
468 amqp_cstring_bytes(conn_id.exchange.c_str()),
469 amqp_cstring_bytes("topic"),
470 1, // passive - exchange must already exist on broker
471 1, // durable
472 0, // dont auto-delete
473 0, // not internal
474 amqp_empty_table);
475 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED, ok);
476 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED);
477 }
478 {
479 // create queue for confirmations
480 const auto queue_ok = amqp_queue_declare(state,
481 CHANNEL_ID, // use the regular channel for this call
482 amqp_empty_bytes, // let broker allocate queue name
483 0, // not passive - create the queue
484 0, // not durable
485 1, // exclusive
486 1, // auto-delete
487 amqp_empty_table // not args TODO add args from conf: TTL, max length etc.
488 );
489 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_Q_DECLARE_FAILED, queue_ok);
490 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_Q_DECLARE_FAILED);
491
492 // define consumption for connection
493 const auto consume_ok = amqp_basic_consume(state,
494 CONFIRMING_CHANNEL_ID,
495 queue_ok->queue,
496 amqp_empty_bytes, // broker will generate consumer tag
497 1, // messages sent from client are never routed back
498 1, // client does not ack thr acks
499 1, // exclusive access to queue
500 amqp_empty_table // no parameters
501 );
502
503 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED, consume_ok);
504 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED);
505 // broker generated consumer_tag could be used to cancel sending of n/acks from broker - not needed
506
507 state_guard.reset();
508 conn->state = state;
509 conn->reply_to_queue = amqp_bytes_malloc_dup(queue_ok->queue);
510 }
511 return true;
512 }
513
514 /// struct used for holding messages in the message queue
515 struct message_wrapper_t {
516 connection_id_t conn_id;
517 std::string topic;
518 std::string message;
519 reply_callback_t cb;
520
521 message_wrapper_t(const connection_id_t& _conn_id,
522 const std::string& _topic,
523 const std::string& _message,
524 reply_callback_t _cb) : conn_id(_conn_id), topic(_topic), message(_message), cb(_cb) {}
525 };
526
527 using connection_t_ptr = std::unique_ptr<connection_t>;
528
529 typedef std::unordered_map<connection_id_t, connection_t_ptr, connection_id_hasher> ConnectionList;
530 typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
531
// macros used inside a loop where an iterator is either incremented or erased
// the statements are grouped in a block so that the whole macro is governed
// by an enclosing "if", even when that "if" has no braces.
// "do/while(0)" must not be used here: the "continue" would then apply to
// the wrapper instead of the loop that the macro is used in
#define INCREMENT_AND_CONTINUE(IT) \
  { \
    ++(IT); \
    continue; \
  }

// note: expects a "connection_count" counter in the scope where it is used
#define ERASE_AND_CONTINUE(IT,CONTAINER) \
  { \
    (IT) = (CONTAINER).erase(IT); \
    --connection_count; \
    continue; \
  }
541
542 class Manager {
543 public:
544 const size_t max_connections;
545 const size_t max_inflight;
546 const size_t max_queue;
547 const size_t max_idle_time;
548 private:
549 std::atomic<size_t> connection_count;
550 std::atomic<bool> stopped;
551 struct timeval read_timeout;
552 ConnectionList connections;
553 MessageQueue messages;
554 std::atomic<size_t> queued;
555 std::atomic<size_t> dequeued;
556 CephContext* const cct;
557 mutable std::mutex connections_lock;
558 const ceph::coarse_real_clock::duration idle_time;
559 const ceph::coarse_real_clock::duration reconnect_time;
560 std::thread runner;
561
562 void publish_internal(message_wrapper_t* message) {
563 const std::unique_ptr<message_wrapper_t> msg_owner(message);
564 const auto& conn_id = message->conn_id;
565 auto conn_it = connections.find(conn_id);
566 if (conn_it == connections.end()) {
567 ldout(cct, 1) << "AMQP publish: connection '" << to_string(conn_id) << "' not found" << dendl;
568 if (message->cb) {
569 message->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED);
570 }
571 return;
572 }
573
574 auto& conn = conn_it->second;
575
576 conn->timestamp = ceph_clock_now();
577
578 if (!conn->is_ok()) {
579 // connection had an issue while message was in the queue
580 ldout(cct, 1) << "AMQP publish: connection '" << to_string(conn_id) << "' is closed" << dendl;
581 if (message->cb) {
582 message->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED);
583 }
584 return;
585 }
586
587 if (message->cb == nullptr) {
588 const auto rc = amqp_basic_publish(conn->state,
589 CHANNEL_ID,
590 amqp_cstring_bytes(conn_id.exchange.c_str()),
591 amqp_cstring_bytes(message->topic.c_str()),
592 0, // does not have to be routable
593 0, // not immediate
594 nullptr, // no properties needed
595 amqp_cstring_bytes(message->message.c_str()));
596 if (rc == AMQP_STATUS_OK) {
597 ldout(cct, 20) << "AMQP publish (no callback): OK" << dendl;
598 return;
599 }
600 ldout(cct, 1) << "AMQP publish (no callback): failed with error " << status_to_string(rc) << dendl;
601 // an error occurred, close connection
602 // it will be retied by the main loop
603 conn->destroy(rc);
604 return;
605 }
606
607 amqp_basic_properties_t props;
608 props._flags =
609 AMQP_BASIC_DELIVERY_MODE_FLAG |
610 AMQP_BASIC_REPLY_TO_FLAG;
611 props.delivery_mode = 2; // persistent delivery TODO take from conf
612 props.reply_to = conn->reply_to_queue;
613
614 const auto rc = amqp_basic_publish(conn->state,
615 CONFIRMING_CHANNEL_ID,
616 amqp_cstring_bytes(conn_id.exchange.c_str()),
617 amqp_cstring_bytes(message->topic.c_str()),
618 conn->mandatory,
619 0, // not immediate
620 &props,
621 amqp_cstring_bytes(message->message.c_str()));
622
623 if (rc == AMQP_STATUS_OK) {
624 auto const q_len = conn->callbacks.size();
625 if (q_len < max_inflight) {
626 ldout(cct, 20) << "AMQP publish (with callback, tag=" << conn->delivery_tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
627 conn->callbacks.emplace_back(conn->delivery_tag++, message->cb);
628 } else {
629 // immediately invoke callback with error
630 ldout(cct, 1) << "AMQP publish (with callback): failed with error: callback queue full" << dendl;
631 message->cb(RGW_AMQP_STATUS_MAX_INFLIGHT);
632 }
633 } else {
634 // an error occurred, close connection
635 // it will be retied by the main loop
636 ldout(cct, 1) << "AMQP publish (with callback): failed with error: " << status_to_string(rc) << dendl;
637 conn->destroy(rc);
638 // immediately invoke callback with error
639 message->cb(rc);
640 }
641 }
642
643 // the managers thread:
644 // (1) empty the queue of messages to be published
645 // (2) loop over all connections and read acks
646 // (3) manages deleted connections
647 // (4) TODO reconnect on connection errors
648 // (5) TODO cleanup timedout callbacks
649 void run() noexcept {
650 amqp_frame_t frame;
651 while (!stopped) {
652
653 // publish all messages in the queue
654 const auto count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1));
655 dequeued += count;
656 ConnectionList::iterator conn_it;
657 ConnectionList::const_iterator end_it;
658 {
659 // thread safe access to the connection list
660 // once the iterators are fetched they are guaranteed to remain valid
661 std::lock_guard lock(connections_lock);
662 conn_it = connections.begin();
663 end_it = connections.end();
664 }
665 auto incoming_message = false;
666 // loop over all connections to read acks
667 for (;conn_it != end_it;) {
668
669 const auto& conn_id = conn_it->first;
670 auto& conn = conn_it->second;
671
672 if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
673 ldout(cct, 20) << "AMQP run: Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
674 ERASE_AND_CONTINUE(conn_it, connections);
675 }
676
677 // try to reconnect the connection if it has an error
678 if (!conn->is_ok()) {
679 const auto now = ceph::coarse_real_clock::now();
680 if (now >= conn->next_reconnect) {
681 // pointers are used temporarily inside the amqp_connection_info object
682 // as read-only values, hence the assignment, and const_cast are safe here
683 ldout(cct, 20) << "AMQP run: retry connection" << dendl;
684 if (!new_state(conn.get(), conn_id)) {
685 ldout(cct, 10) << "AMQP run: connection '" << to_string(conn_id) << "' retry failed. error: " <<
686 status_to_string(conn->status) << " (" << conn->reply_code << ")" << dendl;
687 // TODO: add error counter for failed retries
688 // TODO: add exponential backoff for retries
689 conn->next_reconnect = now + reconnect_time;
690 } else {
691 ldout(cct, 10) << "AMQP run: connection '" << to_string(conn_id) << "' retry successfull" << dendl;
692 }
693 }
694 INCREMENT_AND_CONTINUE(conn_it);
695 }
696
697 const auto rc = amqp_simple_wait_frame_noblock(conn->state, &frame, &read_timeout);
698
699 if (rc == AMQP_STATUS_TIMEOUT) {
700 // TODO mark connection as idle
701 INCREMENT_AND_CONTINUE(conn_it);
702 }
703
704 // this is just to prevent spinning idle, does not indicate that a message
705 // was successfully processed or not
706 incoming_message = true;
707
708 // check if error occurred that require reopening the connection
709 if (rc != AMQP_STATUS_OK) {
710 // an error occurred, close connection
711 // it will be retied by the main loop
712 ldout(cct, 1) << "AMQP run: connection read error: " << status_to_string(rc) << dendl;
713 conn->destroy(rc);
714 INCREMENT_AND_CONTINUE(conn_it);
715 }
716
717 if (frame.frame_type != AMQP_FRAME_METHOD) {
718 ldout(cct, 10) << "AMQP run: ignoring non n/ack messages. frame type: "
719 << unsigned(frame.frame_type) << dendl;
720 // handler is for publish confirmation only - handle only method frames
721 INCREMENT_AND_CONTINUE(conn_it);
722 }
723
724 uint64_t tag;
725 bool multiple;
726 int result;
727
728 switch (frame.payload.method.id) {
729 case AMQP_BASIC_ACK_METHOD:
730 {
731 result = AMQP_STATUS_OK;
732 const auto ack = (amqp_basic_ack_t*)frame.payload.method.decoded;
733 ceph_assert(ack);
734 tag = ack->delivery_tag;
735 multiple = ack->multiple;
736 break;
737 }
738 case AMQP_BASIC_NACK_METHOD:
739 {
740 result = RGW_AMQP_STATUS_BROKER_NACK;
741 const auto nack = (amqp_basic_nack_t*)frame.payload.method.decoded;
742 ceph_assert(nack);
743 tag = nack->delivery_tag;
744 multiple = nack->multiple;
745 break;
746 }
747 case AMQP_BASIC_REJECT_METHOD:
748 {
749 result = RGW_AMQP_STATUS_BROKER_NACK;
750 const auto reject = (amqp_basic_reject_t*)frame.payload.method.decoded;
751 tag = reject->delivery_tag;
752 multiple = false;
753 break;
754 }
755 case AMQP_CONNECTION_CLOSE_METHOD:
756 // TODO on channel close, no need to reopen the connection
757 case AMQP_CHANNEL_CLOSE_METHOD:
758 {
759 // other side closed the connection, no need to continue
760 ldout(cct, 10) << "AMQP run: connection was closed by broker" << dendl;
761 conn->destroy(rc);
762 INCREMENT_AND_CONTINUE(conn_it);
763 }
764 case AMQP_BASIC_RETURN_METHOD:
765 // message was not delivered, returned to sender
766 ldout(cct, 10) << "AMQP run: message was not routable" << dendl;
767 INCREMENT_AND_CONTINUE(conn_it);
768 break;
769 default:
770 // unexpected method
771 ldout(cct, 10) << "AMQP run: unexpected message" << dendl;
772 INCREMENT_AND_CONTINUE(conn_it);
773 }
774
775 const auto tag_it = std::find(conn->callbacks.begin(), conn->callbacks.end(), tag);
776 if (tag_it != conn->callbacks.end()) {
777 if (multiple) {
778 // n/ack all up to (and including) the tag
779 ldout(cct, 20) << "AMQP run: multiple n/acks received with tag=" << tag << " and result=" << result << dendl;
780 auto it = conn->callbacks.begin();
781 while (it->tag <= tag && it != conn->callbacks.end()) {
782 ldout(cct, 20) << "AMQP run: invoking callback with tag=" << it->tag << dendl;
783 it->cb(result);
784 it = conn->callbacks.erase(it);
785 }
786 } else {
787 // n/ack a specific tag
788 ldout(cct, 20) << "AMQP run: n/ack received, invoking callback with tag=" << tag << " and result=" << result << dendl;
789 tag_it->cb(result);
790 conn->callbacks.erase(tag_it);
791 }
792 } else {
793 ldout(cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl;
794 }
795 // just increment the iterator
796 ++conn_it;
797 }
798 // if no messages were received or published, sleep for 100ms
799 if (count == 0 && !incoming_message) {
800 std::this_thread::sleep_for(idle_time);
801 }
802 }
803 }
804
805 // used in the dtor for message cleanup
806 static void delete_message(const message_wrapper_t* message) {
807 delete message;
808 }
809
810 public:
811 Manager(size_t _max_connections,
812 size_t _max_inflight,
813 size_t _max_queue,
814 long _usec_timeout,
815 unsigned reconnect_time_ms,
816 unsigned idle_time_ms,
817 CephContext* _cct) :
818 max_connections(_max_connections),
819 max_inflight(_max_inflight),
820 max_queue(_max_queue),
821 max_idle_time(30),
822 connection_count(0),
823 stopped(false),
824 read_timeout{0, _usec_timeout},
825 connections(_max_connections),
826 messages(max_queue),
827 queued(0),
828 dequeued(0),
829 cct(_cct),
830 idle_time(std::chrono::milliseconds(idle_time_ms)),
831 reconnect_time(std::chrono::milliseconds(reconnect_time_ms)),
832 runner(&Manager::run, this) {
833 // The hashmap has "max connections" as the initial number of buckets,
834 // and allows for 10 collisions per bucket before rehash.
835 // This is to prevent rehashing so that iterators are not invalidated
836 // when a new connection is added.
837 connections.max_load_factor(10.0);
838 // give the runner thread a name for easier debugging
839 const auto rc = ceph_pthread_setname(runner.native_handle(), "amqp_manager");
840 ceph_assert(rc==0);
841 }
842
843 // non copyable
844 Manager(const Manager&) = delete;
845 const Manager& operator=(const Manager&) = delete;
846
847 // stop the main thread
848 void stop() {
849 stopped = true;
850 }
851
852 // connect to a broker, or reuse an existing connection if already connected
853 bool connect(connection_id_t& id, const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
854 boost::optional<const std::string&> ca_location) {
855 if (stopped) {
856 ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
857 return false;
858 }
859
860 amqp_connection_info info;
861 // cache the URL so that parsing could happen in-place
862 std::vector<char> url_cache(url.c_str(), url.c_str()+url.size()+1);
863 const auto retcode = amqp_parse_url(url_cache.data(), &info);
864 if (AMQP_STATUS_OK != retcode) {
865 ldout(cct, 1) << "AMQP connect: URL parsing failed. error: " << retcode << dendl;
866 return false;
867 }
868 connection_id_t tmp_id(info, exchange);
869
870 std::lock_guard lock(connections_lock);
871 const auto it = connections.find(tmp_id);
872 if (it != connections.end()) {
873 // connection found - return even if non-ok
874 ldout(cct, 20) << "AMQP connect: connection found" << dendl;
875 id = it->first;
876 return true;
877 }
878
879 // connection not found, creating a new one
880 if (connection_count >= max_connections) {
881 ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl;
882 return false;
883 }
884 // if error occurred during creation the creation will be retried in the main thread
885 ++connection_count;
886 auto conn = connections.emplace(tmp_id, std::make_unique<connection_t>(cct, info, verify_ssl, ca_location)).first->second.get();
887 ldout(cct, 10) << "AMQP connect: new connection is created. Total connections: " << connection_count << dendl;
888 if (!new_state(conn, tmp_id)) {
889 ldout(cct, 1) << "AMQP connect: new connection '" << to_string(tmp_id) << "' is created. but state creation failed (will retry). error: " <<
890 status_to_string(conn->status) << " (" << conn->reply_code << ")" << dendl;
891 }
892 id = std::move(tmp_id);
893 return true;
894 }
895
896 // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
897 int publish(const connection_id_t& conn_id,
898 const std::string& topic,
899 const std::string& message) {
900 if (stopped) {
901 ldout(cct, 1) << "AMQP publish: manager is not running" << dendl;
902 return RGW_AMQP_STATUS_MANAGER_STOPPED;
903 }
904 auto wrapper = std::make_unique<message_wrapper_t>(conn_id, topic, message, nullptr);
905 if (messages.push(wrapper.get())) {
906 std::ignore = wrapper.release();
907 ++queued;
908 return AMQP_STATUS_OK;
909 }
910 ldout(cct, 1) << "AMQP publish: queue is full" << dendl;
911 return RGW_AMQP_STATUS_QUEUE_FULL;
912 }
913
914 int publish_with_confirm(const connection_id_t& conn_id,
915 const std::string& topic,
916 const std::string& message,
917 reply_callback_t cb) {
918 if (stopped) {
919 ldout(cct, 1) << "AMQP publish_with_confirm: manager is not running" << dendl;
920 return RGW_AMQP_STATUS_MANAGER_STOPPED;
921 }
922 auto wrapper = std::make_unique<message_wrapper_t>(conn_id, topic, message, cb);
923 if (messages.push(wrapper.get())) {
924 std::ignore = wrapper.release();
925 ++queued;
926 return AMQP_STATUS_OK;
927 }
928 ldout(cct, 1) << "AMQP publish_with_confirm: queue is full" << dendl;
929 return RGW_AMQP_STATUS_QUEUE_FULL;
930 }
931
932 // dtor wait for thread to stop
933 // then connection are cleaned-up
934 ~Manager() {
935 stopped = true;
936 runner.join();
937 messages.consume_all(delete_message);
938 }
939
940 // get the number of connections
941 size_t get_connection_count() const {
942 return connection_count;
943 }
944
945 // get the number of in-flight messages
946 size_t get_inflight() const {
947 size_t sum = 0;
948 std::lock_guard lock(connections_lock);
949 std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) {
950 // concurrent access to the callback vector is safe without locking
951 sum += conn_pair.second->callbacks.size();
952 });
953 return sum;
954 }
955
956 // running counter of the queued messages
957 size_t get_queued() const {
958 return queued;
959 }
960
961 // running counter of the dequeued messages
962 size_t get_dequeued() const {
963 return dequeued;
964 }
965 };
966
967 // singleton manager
968 // note that the manager itself is not a singleton, and multiple instances may co-exist
969 // TODO make the pointer atomic in allocation and deallocation to avoid race conditions
970 static Manager* s_manager = nullptr;
971
972 static const size_t MAX_CONNECTIONS_DEFAULT = 256;
973 static const size_t MAX_INFLIGHT_DEFAULT = 8192;
974 static const size_t MAX_QUEUE_DEFAULT = 8192;
975 static const long READ_TIMEOUT_USEC = 100;
976 static const unsigned IDLE_TIME_MS = 100;
977 static const unsigned RECONNECT_TIME_MS = 100;
978
979 bool init(CephContext* cct) {
980 if (s_manager) {
981 return false;
982 }
983 // TODO: take conf from CephContext
984 s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT,
985 READ_TIMEOUT_USEC, IDLE_TIME_MS, RECONNECT_TIME_MS, cct);
986 return true;
987 }
988
989 void shutdown() {
990 delete s_manager;
991 s_manager = nullptr;
992 }
993
994 bool connect(connection_id_t& conn_id, const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
995 boost::optional<const std::string&> ca_location) {
996 if (!s_manager) return false;
997 return s_manager->connect(conn_id, url, exchange, mandatory_delivery, verify_ssl, ca_location);
998 }
999
1000 int publish(const connection_id_t& conn_id,
1001 const std::string& topic,
1002 const std::string& message) {
1003 if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
1004 return s_manager->publish(conn_id, topic, message);
1005 }
1006
1007 int publish_with_confirm(const connection_id_t& conn_id,
1008 const std::string& topic,
1009 const std::string& message,
1010 reply_callback_t cb) {
1011 if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
1012 return s_manager->publish_with_confirm(conn_id, topic, message, cb);
1013 }
1014
1015 size_t get_connection_count() {
1016 if (!s_manager) return 0;
1017 return s_manager->get_connection_count();
1018 }
1019
1020 size_t get_inflight() {
1021 if (!s_manager) return 0;
1022 return s_manager->get_inflight();
1023 }
1024
1025 size_t get_queued() {
1026 if (!s_manager) return 0;
1027 return s_manager->get_queued();
1028 }
1029
1030 size_t get_dequeued() {
1031 if (!s_manager) return 0;
1032 return s_manager->get_dequeued();
1033 }
1034
1035 size_t get_max_connections() {
1036 if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
1037 return s_manager->max_connections;
1038 }
1039
1040 size_t get_max_inflight() {
1041 if (!s_manager) return MAX_INFLIGHT_DEFAULT;
1042 return s_manager->max_inflight;
1043 }
1044
1045 size_t get_max_queue() {
1046 if (!s_manager) return MAX_QUEUE_DEFAULT;
1047 return s_manager->max_queue;
1048 }
1049
1050 } // namespace amqp
1051