]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_amqp.cc
186bdd54ad6fe4271912633c6ba5fd334cc3452b
[ceph.git] / ceph / src / rgw / rgw_amqp.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "rgw_amqp.h"
5 #include <amqp.h>
6 #include <amqp_ssl_socket.h>
7 #include <amqp_tcp_socket.h>
8 #include <amqp_framing.h>
9 #include "include/ceph_assert.h"
10 #include <sstream>
11 #include <cstring>
12 #include <unordered_map>
13 #include <string>
14 #include <vector>
15 #include <thread>
16 #include <atomic>
17 #include <mutex>
18 #include <boost/lockfree/queue.hpp>
19 #include "common/dout.h"
20 #include <openssl/ssl.h>
21
22 #define dout_subsys ceph_subsys_rgw
23
24 // TODO investigation, not necessarily issues:
25 // (1) in case of single threaded writer context use spsc_queue
26 // (2) support multiple channels
27 // (3) check performance of emptying queue to local list, and go over the list and publish
28 // (4) use std::shared_mutex (c++17) or equivalent for the connections lock
29
30 namespace rgw::amqp {
31
// RGW specific status codes that may be reported when publishing
static constexpr int RGW_AMQP_STATUS_BROKER_NACK = -0x1001;
static constexpr int RGW_AMQP_STATUS_CONNECTION_CLOSED = -0x1002;
static constexpr int RGW_AMQP_STATUS_QUEUE_FULL = -0x1003;
static constexpr int RGW_AMQP_STATUS_MAX_INFLIGHT = -0x1004;
static constexpr int RGW_AMQP_STATUS_MANAGER_STOPPED = -0x1005;
// RGW specific status codes that may be reported when opening a connection
static constexpr int RGW_AMQP_STATUS_CONN_ALLOC_FAILED = -0x2001;
static constexpr int RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED = -0x2002;
static constexpr int RGW_AMQP_STATUS_SOCKET_OPEN_FAILED = -0x2003;
static constexpr int RGW_AMQP_STATUS_LOGIN_FAILED = -0x2004;
static constexpr int RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED = -0x2005;
static constexpr int RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED = -0x2006;
static constexpr int RGW_AMQP_STATUS_Q_DECLARE_FAILED = -0x2007;
static constexpr int RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED = -0x2008;
static constexpr int RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED = -0x2009;
static constexpr int RGW_AMQP_STATUS_SOCKET_CACERT_FAILED = -0x2010;

// RGW defined values stored in a connection's reply_type/reply_code fields
static constexpr int RGW_AMQP_RESPONSE_SOCKET_ERROR = -0x3008;
static constexpr int RGW_AMQP_NO_REPLY_CODE = 0x0;
52
53 // key class for the connection list
54 struct connection_id_t {
55 const std::string host;
56 const int port;
57 const std::string vhost;
58 // constructed from amqp_connection_info struct
59 connection_id_t(const amqp_connection_info& info)
60 : host(info.host), port(info.port), vhost(info.vhost) {}
61
62 // equality operator and hasher functor are needed
63 // so that connection_id_t could be used as key in unordered_map
64 bool operator==(const connection_id_t& other) const {
65 return host == other.host && port == other.port && vhost == other.vhost;
66 }
67
68 struct hasher {
69 std::size_t operator()(const connection_id_t& k) const {
70 return ((std::hash<std::string>()(k.host)
71 ^ (std::hash<int>()(k.port) << 1)) >> 1)
72 ^ (std::hash<std::string>()(k.vhost) << 1);
73 }
74 };
75 };
76
77 std::string to_string(const connection_id_t& id) {
78 return id.host+":"+std::to_string(id.port)+id.vhost;
79 }
80
81 // connection_t state cleaner
82 // could be used for automatic cleanup when getting out of scope
83 class ConnectionCleaner {
84 private:
85 amqp_connection_state_t conn;
86 public:
87 ConnectionCleaner(amqp_connection_state_t _conn) : conn(_conn) {}
88 ~ConnectionCleaner() {
89 if (conn) {
90 amqp_destroy_connection(conn);
91 }
92 }
93 // call reset() if cleanup is not needed anymore
94 void reset() {
95 conn = nullptr;
96 }
97 };
98
99 // struct for holding the callback and its tag in the callback list
100 struct reply_callback_with_tag_t {
101 uint64_t tag;
102 reply_callback_t cb;
103
104 reply_callback_with_tag_t(uint64_t _tag, reply_callback_t _cb) : tag(_tag), cb(_cb) {}
105
106 bool operator==(uint64_t rhs) {
107 return tag == rhs;
108 }
109 };
110
111 typedef std::vector<reply_callback_with_tag_t> CallbackList;
112
113 // struct for holding the connection state object as well as the exchange
114 // it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
115 // since references to deleted objects may still exist in the calling code
116 struct connection_t {
117 std::atomic<amqp_connection_state_t> state;
118 std::string exchange;
119 std::string user;
120 std::string password;
121 amqp_bytes_t reply_to_queue;
122 uint64_t delivery_tag;
123 int status;
124 int reply_type;
125 int reply_code;
126 mutable std::atomic<int> ref_count;
127 CephContext* cct;
128 CallbackList callbacks;
129 ceph::coarse_real_clock::time_point next_reconnect;
130 bool mandatory;
131 bool use_ssl;
132 bool verify_ssl;
133 boost::optional<std::string> ca_location;
134 utime_t timestamp = ceph_clock_now();
135
136 // default ctor
137 connection_t() :
138 state(nullptr),
139 reply_to_queue(amqp_empty_bytes),
140 delivery_tag(1),
141 status(AMQP_STATUS_OK),
142 reply_type(AMQP_RESPONSE_NORMAL),
143 reply_code(RGW_AMQP_NO_REPLY_CODE),
144 ref_count(0),
145 cct(nullptr),
146 next_reconnect(ceph::coarse_real_clock::now()),
147 mandatory(false),
148 use_ssl(false),
149 verify_ssl(false),
150 ca_location(boost::none)
151 {}
152
153 // cleanup of all internal connection resource
154 // the object can still remain, and internal connection
155 // resources created again on successful reconnection
156 void destroy(int s) {
157 status = s;
158 ConnectionCleaner clean_state(state);
159 state = nullptr;
160 amqp_bytes_free(reply_to_queue);
161 reply_to_queue = amqp_empty_bytes;
162 // fire all remaining callbacks
163 std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
164 cb_tag.cb(status);
165 ldout(cct, 20) << "AMQP destroy: invoking callback with tag=" << cb_tag.tag << dendl;
166 });
167 callbacks.clear();
168 delivery_tag = 1;
169 }
170
171 bool is_ok() const {
172 return (state != nullptr);
173 }
174
175 // dtor also destroys the internals
176 ~connection_t() {
177 destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
178 }
179
180 friend void intrusive_ptr_add_ref(const connection_t* p);
181 friend void intrusive_ptr_release(const connection_t* p);
182 };
183
184 // these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
185 void intrusive_ptr_add_ref(const connection_t* p) {
186 ++p->ref_count;
187 }
188 void intrusive_ptr_release(const connection_t* p) {
189 if (--p->ref_count == 0) {
190 delete p;
191 }
192 }
193
194 // convert connection info to string
195 std::string to_string(const amqp_connection_info& info) {
196 std::stringstream ss;
197 ss << "connection info:" <<
198 "\nHost: " << info.host <<
199 "\nPort: " << info.port <<
200 "\nUser: " << info.user <<
201 "\nPassword: " << info.password <<
202 "\nvhost: " << info.vhost <<
203 "\nSSL support: " << info.ssl << std::endl;
204 return ss.str();
205 }
206
207 // convert reply to error code
208 int reply_to_code(const amqp_rpc_reply_t& reply) {
209 switch (reply.reply_type) {
210 case AMQP_RESPONSE_NONE:
211 case AMQP_RESPONSE_NORMAL:
212 return RGW_AMQP_NO_REPLY_CODE;
213 case AMQP_RESPONSE_LIBRARY_EXCEPTION:
214 return reply.library_error;
215 case AMQP_RESPONSE_SERVER_EXCEPTION:
216 if (reply.reply.decoded) {
217 const amqp_connection_close_t* m = (amqp_connection_close_t*)reply.reply.decoded;
218 return m->reply_code;
219 }
220 return reply.reply.id;
221 }
222 return RGW_AMQP_NO_REPLY_CODE;
223 }
224
225 // convert reply to string
226 std::string to_string(const amqp_rpc_reply_t& reply) {
227 std::stringstream ss;
228 switch (reply.reply_type) {
229 case AMQP_RESPONSE_NORMAL:
230 return "";
231 case AMQP_RESPONSE_NONE:
232 return "missing RPC reply type";
233 case AMQP_RESPONSE_LIBRARY_EXCEPTION:
234 return amqp_error_string2(reply.library_error);
235 case AMQP_RESPONSE_SERVER_EXCEPTION:
236 {
237 switch (reply.reply.id) {
238 case AMQP_CONNECTION_CLOSE_METHOD:
239 ss << "server connection error: ";
240 break;
241 case AMQP_CHANNEL_CLOSE_METHOD:
242 ss << "server channel error: ";
243 break;
244 default:
245 ss << "server unknown error: ";
246 break;
247 }
248 if (reply.reply.decoded) {
249 amqp_connection_close_t* m = (amqp_connection_close_t*)reply.reply.decoded;
250 ss << m->reply_code << " text: " << std::string((char*)m->reply_text.bytes, m->reply_text.len);
251 }
252 return ss.str();
253 }
254 default:
255 ss << "unknown error, method id: " << reply.reply.id;
256 return ss.str();
257 }
258 }
259
260 // convert status enum to string
261 std::string to_string(amqp_status_enum s) {
262 switch (s) {
263 case AMQP_STATUS_OK:
264 return "AMQP_STATUS_OK";
265 case AMQP_STATUS_NO_MEMORY:
266 return "AMQP_STATUS_NO_MEMORY";
267 case AMQP_STATUS_BAD_AMQP_DATA:
268 return "AMQP_STATUS_BAD_AMQP_DATA";
269 case AMQP_STATUS_UNKNOWN_CLASS:
270 return "AMQP_STATUS_UNKNOWN_CLASS";
271 case AMQP_STATUS_UNKNOWN_METHOD:
272 return "AMQP_STATUS_UNKNOWN_METHOD";
273 case AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED:
274 return "AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED";
275 case AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION:
276 return "AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION";
277 case AMQP_STATUS_CONNECTION_CLOSED:
278 return "AMQP_STATUS_CONNECTION_CLOSED";
279 case AMQP_STATUS_BAD_URL:
280 return "AMQP_STATUS_BAD_URL";
281 case AMQP_STATUS_SOCKET_ERROR:
282 return "AMQP_STATUS_SOCKET_ERROR";
283 case AMQP_STATUS_INVALID_PARAMETER:
284 return "AMQP_STATUS_INVALID_PARAMETER";
285 case AMQP_STATUS_TABLE_TOO_BIG:
286 return "AMQP_STATUS_TABLE_TOO_BIG";
287 case AMQP_STATUS_WRONG_METHOD:
288 return "AMQP_STATUS_WRONG_METHOD";
289 case AMQP_STATUS_TIMEOUT:
290 return "AMQP_STATUS_TIMEOUT";
291 case AMQP_STATUS_TIMER_FAILURE:
292 return "AMQP_STATUS_TIMER_FAILURE";
293 case AMQP_STATUS_HEARTBEAT_TIMEOUT:
294 return "AMQP_STATUS_HEARTBEAT_TIMEOUT";
295 case AMQP_STATUS_UNEXPECTED_STATE:
296 return "AMQP_STATUS_UNEXPECTED_STATE";
297 case AMQP_STATUS_SOCKET_CLOSED:
298 return "AMQP_STATUS_SOCKET_CLOSED";
299 case AMQP_STATUS_SOCKET_INUSE:
300 return "AMQP_STATUS_SOCKET_INUSE";
301 case AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD:
302 return "AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD";
303 #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 8, 0, 0)
304 case AMQP_STATUS_UNSUPPORTED:
305 return "AMQP_STATUS_UNSUPPORTED";
306 #endif
307 case _AMQP_STATUS_NEXT_VALUE:
308 return "AMQP_STATUS_INTERNAL";
309 case AMQP_STATUS_TCP_ERROR:
310 return "AMQP_STATUS_TCP_ERROR";
311 case AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR:
312 return "AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR";
313 case _AMQP_STATUS_TCP_NEXT_VALUE:
314 return "AMQP_STATUS_INTERNAL";
315 case AMQP_STATUS_SSL_ERROR:
316 return "AMQP_STATUS_SSL_ERROR";
317 case AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED:
318 return "AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED";
319 case AMQP_STATUS_SSL_PEER_VERIFY_FAILED:
320 return "AMQP_STATUS_SSL_PEER_VERIFY_FAILED";
321 case AMQP_STATUS_SSL_CONNECTION_FAILED:
322 return "AMQP_STATUS_SSL_CONNECTION_FAILED";
323 case _AMQP_STATUS_SSL_NEXT_VALUE:
324 return "AMQP_STATUS_INTERNAL";
325 #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 11, 0, 0)
326 case AMQP_STATUS_SSL_SET_ENGINE_FAILED:
327 return "AMQP_STATUS_SSL_SET_ENGINE_FAILED";
328 #endif
329 default:
330 return "AMQP_STATUS_UNKNOWN";
331 }
332 }
333
// TODO: add status_to_string on the connection object to print full status
335
336 // convert int status to string - including RGW specific values
337 std::string status_to_string(int s) {
338 switch (s) {
339 case RGW_AMQP_STATUS_BROKER_NACK:
340 return "RGW_AMQP_STATUS_BROKER_NACK";
341 case RGW_AMQP_STATUS_CONNECTION_CLOSED:
342 return "RGW_AMQP_STATUS_CONNECTION_CLOSED";
343 case RGW_AMQP_STATUS_QUEUE_FULL:
344 return "RGW_AMQP_STATUS_QUEUE_FULL";
345 case RGW_AMQP_STATUS_MAX_INFLIGHT:
346 return "RGW_AMQP_STATUS_MAX_INFLIGHT";
347 case RGW_AMQP_STATUS_MANAGER_STOPPED:
348 return "RGW_AMQP_STATUS_MANAGER_STOPPED";
349 case RGW_AMQP_STATUS_CONN_ALLOC_FAILED:
350 return "RGW_AMQP_STATUS_CONN_ALLOC_FAILED";
351 case RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED:
352 return "RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED";
353 case RGW_AMQP_STATUS_SOCKET_OPEN_FAILED:
354 return "RGW_AMQP_STATUS_SOCKET_OPEN_FAILED";
355 case RGW_AMQP_STATUS_LOGIN_FAILED:
356 return "RGW_AMQP_STATUS_LOGIN_FAILED";
357 case RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED:
358 return "RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED";
359 case RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED:
360 return "RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED";
361 case RGW_AMQP_STATUS_Q_DECLARE_FAILED:
362 return "RGW_AMQP_STATUS_Q_DECLARE_FAILED";
363 case RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED:
364 return "RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED";
365 case RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED:
366 return "RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED";
367 case RGW_AMQP_STATUS_SOCKET_CACERT_FAILED:
368 return "RGW_AMQP_STATUS_SOCKET_CACERT_FAILED";
369 }
370 return to_string((amqp_status_enum)s);
371 }
372
// check the result from calls and return if error (=null)
// C is the connection, S the status to set on it, and OK the result to check.
// when OK evaluates to false, S is stored in the status of C and C is returned
// from the calling function.
// all arguments are parenthesized so that expressions could be passed safely
#define RETURN_ON_ERROR(C, S, OK) \
  if (!(OK)) { \
    (C)->status = (S); \
    return (C); \
  }
379
// in case of RPC calls, getting the RPC reply and return if an error is detected
// C is the connection, ST the amqp state the reply is fetched from, and S the
// status to set on C. when the reply is not a normal one, the status, reply type
// and reply code are stored in C, and C is returned from the calling function.
// the connection and status arguments are parenthesized so that expressions
// could be passed safely
#define RETURN_ON_REPLY_ERROR(C, ST, S) { \
  const auto reply = amqp_get_rpc_reply(ST); \
  if (reply.reply_type != AMQP_RESPONSE_NORMAL) { \
    (C)->status = (S); \
    (C)->reply_type = reply.reply_type; \
    (C)->reply_code = reply_to_code(reply); \
    return (C); \
  } \
}
390
391 static const amqp_channel_t CHANNEL_ID = 1;
392 static const amqp_channel_t CONFIRMING_CHANNEL_ID = 2;
393
394 // utility function to create a connection, when the connection object already exists
395 connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connection_info& info) {
396 ceph_assert(conn);
397
398 // reset all status codes
399 conn->status = AMQP_STATUS_OK;
400 conn->reply_type = AMQP_RESPONSE_NORMAL;
401 conn->reply_code = RGW_AMQP_NO_REPLY_CODE;
402
403 auto state = amqp_new_connection();
404 if (!state) {
405 conn->status = RGW_AMQP_STATUS_CONN_ALLOC_FAILED;
406 return conn;
407 }
408 // make sure that the connection state is cleaned up in case of error
409 ConnectionCleaner state_guard(state);
410
411 // create and open socket
412 amqp_socket_t *socket = nullptr;
413 if (info.ssl) {
414 socket = amqp_ssl_socket_new(state);
415 #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 10, 0, 1)
416 SSL_CTX* ssl_ctx = reinterpret_cast<SSL_CTX*>(amqp_ssl_socket_get_context(socket));
417 #else
418 // taken from https://github.com/alanxz/rabbitmq-c/pull/560
419 struct hack {
420 const struct amqp_socket_class_t *klass;
421 SSL_CTX *ctx;
422 };
423
424 struct hack *h = reinterpret_cast<struct hack*>(socket);
425 SSL_CTX* ssl_ctx = h->ctx;
426 #endif
427 // ensure system CA certificates get loaded
428 SSL_CTX_set_default_verify_paths(ssl_ctx);
429 }
430 else {
431 socket = amqp_tcp_socket_new(state);
432 }
433
434 if (!socket) {
435 conn->status = RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED;
436 return conn;
437 }
438 if (info.ssl) {
439 if (!conn->verify_ssl) {
440 amqp_ssl_socket_set_verify_peer(socket, 0);
441 amqp_ssl_socket_set_verify_hostname(socket, 0);
442 }
443 if (conn->ca_location.has_value()) {
444 const auto s = amqp_ssl_socket_set_cacert(socket, conn->ca_location.get().c_str());
445 if (s != AMQP_STATUS_OK) {
446 conn->status = RGW_AMQP_STATUS_SOCKET_CACERT_FAILED;
447 conn->reply_code = s;
448 return conn;
449 }
450 }
451 }
452 const auto s = amqp_socket_open(socket, info.host, info.port);
453 if (s < 0) {
454 conn->status = RGW_AMQP_STATUS_SOCKET_OPEN_FAILED;
455 conn->reply_type = RGW_AMQP_RESPONSE_SOCKET_ERROR;
456 conn->reply_code = s;
457 return conn;
458 }
459
460 // login to broker
461 const auto reply = amqp_login(state,
462 info.vhost,
463 AMQP_DEFAULT_MAX_CHANNELS,
464 AMQP_DEFAULT_FRAME_SIZE,
465 0, // no heartbeat TODO: add conf
466 AMQP_SASL_METHOD_PLAIN, // TODO: add other types of security
467 info.user,
468 info.password);
469 if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
470 conn->status = RGW_AMQP_STATUS_LOGIN_FAILED;
471 conn->reply_type = reply.reply_type;
472 conn->reply_code = reply_to_code(reply);
473 return conn;
474 }
475
476 // open channels
477 {
478 const auto ok = amqp_channel_open(state, CHANNEL_ID);
479 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED, ok);
480 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED);
481 }
482 {
483 const auto ok = amqp_channel_open(state, CONFIRMING_CHANNEL_ID);
484 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED, ok);
485 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED);
486 }
487 {
488 const auto ok = amqp_confirm_select(state, CONFIRMING_CHANNEL_ID);
489 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED, ok);
490 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED);
491 }
492
493 // verify that the topic exchange is there
494 // TODO: make this step optional
495 {
496 const auto ok = amqp_exchange_declare(state,
497 CHANNEL_ID,
498 amqp_cstring_bytes(conn->exchange.c_str()),
499 amqp_cstring_bytes("topic"),
500 1, // passive - exchange must already exist on broker
501 1, // durable
502 0, // dont auto-delete
503 0, // not internal
504 amqp_empty_table);
505 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED, ok);
506 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED);
507 }
508 {
509 // create queue for confirmations
510 const auto queue_ok = amqp_queue_declare(state,
511 CHANNEL_ID, // use the regular channel for this call
512 amqp_empty_bytes, // let broker allocate queue name
513 0, // not passive - create the queue
514 0, // not durable
515 1, // exclusive
516 1, // auto-delete
517 amqp_empty_table // not args TODO add args from conf: TTL, max length etc.
518 );
519 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_Q_DECLARE_FAILED, queue_ok);
520 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_Q_DECLARE_FAILED);
521
522 // define consumption for connection
523 const auto consume_ok = amqp_basic_consume(state,
524 CONFIRMING_CHANNEL_ID,
525 queue_ok->queue,
526 amqp_empty_bytes, // broker will generate consumer tag
527 1, // messages sent from client are never routed back
528 1, // client does not ack thr acks
529 1, // exclusive access to queue
530 amqp_empty_table // no parameters
531 );
532
533 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED, consume_ok);
534 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED);
535 // broker generated consumer_tag could be used to cancel sending of n/acks from broker - not needed
536
537 state_guard.reset();
538 conn->state = state;
539 conn->reply_to_queue = amqp_bytes_malloc_dup(queue_ok->queue);
540 return conn;
541 }
542 }
543
544 // utility function to create a new connection
545 connection_ptr_t create_new_connection(const amqp_connection_info& info,
546 const std::string& exchange, bool mandatory_delivery, CephContext* cct, bool verify_ssl, boost::optional<const std::string&> ca_location) {
547 // create connection state
548 connection_ptr_t conn = new connection_t;
549 conn->exchange = exchange;
550 conn->user.assign(info.user);
551 conn->password.assign(info.password);
552 conn->mandatory = mandatory_delivery;
553 conn->cct = cct;
554 conn->use_ssl = info.ssl;
555 conn->verify_ssl = verify_ssl;
556 conn->ca_location = ca_location;
557 return create_connection(conn, info);
558 }
559
560 /// struct used for holding messages in the message queue
561 struct message_wrapper_t {
562 connection_ptr_t conn;
563 std::string topic;
564 std::string message;
565 reply_callback_t cb;
566
567 message_wrapper_t(connection_ptr_t& _conn,
568 const std::string& _topic,
569 const std::string& _message,
570 reply_callback_t _cb) : conn(_conn), topic(_topic), message(_message), cb(_cb) {}
571 };
572
573
574 typedef std::unordered_map<connection_id_t, connection_ptr_t, connection_id_t::hasher> ConnectionList;
575 typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
576
// macros used inside a loop where an iterator is either incremented or erased
// both are wrapped in braces, so that all of their statements are executed
// together even when used as the body of an "if" without braces.
// (do/while(0) cannot be used here, since "continue" would apply to it
// instead of to the enclosing loop)

// move the iterator to the next element and continue the enclosing loop
#define INCREMENT_AND_CONTINUE(IT) { \
  ++(IT); \
  continue; \
}

// erase the element from the container and continue the enclosing loop
// with the iterator that follows it.
// expects a "connection_count" variable to be in scope, and decrements it
#define ERASE_AND_CONTINUE(IT,CONTAINER) { \
  (IT)=(CONTAINER).erase(IT); \
  --connection_count; \
  continue; \
}
586
587 class Manager {
588 public:
589 const size_t max_connections;
590 const size_t max_inflight;
591 const size_t max_queue;
592 const size_t max_idle_time;
593 private:
594 std::atomic<size_t> connection_count;
595 std::atomic<bool> stopped;
596 struct timeval read_timeout;
597 ConnectionList connections;
598 MessageQueue messages;
599 std::atomic<size_t> queued;
600 std::atomic<size_t> dequeued;
601 CephContext* const cct;
602 mutable std::mutex connections_lock;
603 const ceph::coarse_real_clock::duration idle_time;
604 const ceph::coarse_real_clock::duration reconnect_time;
605 std::thread runner;
606
607 void publish_internal(message_wrapper_t* message) {
608 const std::unique_ptr<message_wrapper_t> msg_owner(message);
609 auto& conn = message->conn;
610
611 conn->timestamp = ceph_clock_now();
612
613 if (!conn->is_ok()) {
614 // connection had an issue while message was in the queue
615 // TODO add error stats
616 ldout(conn->cct, 1) << "AMQP publish: connection had an issue while message was in the queue" << dendl;
617 if (message->cb) {
618 message->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED);
619 }
620 return;
621 }
622
623 if (message->cb == nullptr) {
624 // TODO add error stats
625 const auto rc = amqp_basic_publish(conn->state,
626 CHANNEL_ID,
627 amqp_cstring_bytes(conn->exchange.c_str()),
628 amqp_cstring_bytes(message->topic.c_str()),
629 0, // does not have to be routable
630 0, // not immediate
631 nullptr, // no properties needed
632 amqp_cstring_bytes(message->message.c_str()));
633 if (rc == AMQP_STATUS_OK) {
634 ldout(conn->cct, 20) << "AMQP publish (no callback): OK" << dendl;
635 return;
636 }
637 ldout(conn->cct, 1) << "AMQP publish (no callback): failed with error " << status_to_string(rc) << dendl;
638 // an error occurred, close connection
639 // it will be retied by the main loop
640 conn->destroy(rc);
641 return;
642 }
643
644 amqp_basic_properties_t props;
645 props._flags =
646 AMQP_BASIC_DELIVERY_MODE_FLAG |
647 AMQP_BASIC_REPLY_TO_FLAG;
648 props.delivery_mode = 2; // persistent delivery TODO take from conf
649 props.reply_to = conn->reply_to_queue;
650
651 const auto rc = amqp_basic_publish(conn->state,
652 CONFIRMING_CHANNEL_ID,
653 amqp_cstring_bytes(conn->exchange.c_str()),
654 amqp_cstring_bytes(message->topic.c_str()),
655 conn->mandatory,
656 0, // not immediate
657 &props,
658 amqp_cstring_bytes(message->message.c_str()));
659
660 if (rc == AMQP_STATUS_OK) {
661 auto const q_len = conn->callbacks.size();
662 if (q_len < max_inflight) {
663 ldout(conn->cct, 20) << "AMQP publish (with callback, tag=" << conn->delivery_tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
664 conn->callbacks.emplace_back(conn->delivery_tag++, message->cb);
665 } else {
666 // immediately invoke callback with error
667 ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: callback queue full" << dendl;
668 message->cb(RGW_AMQP_STATUS_MAX_INFLIGHT);
669 }
670 } else {
671 // an error occurred, close connection
672 // it will be retied by the main loop
673 ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: " << status_to_string(rc) << dendl;
674 conn->destroy(rc);
675 // immediately invoke callback with error
676 message->cb(rc);
677 }
678 }
679
680 // the managers thread:
681 // (1) empty the queue of messages to be published
682 // (2) loop over all connections and read acks
683 // (3) manages deleted connections
684 // (4) TODO reconnect on connection errors
685 // (5) TODO cleanup timedout callbacks
686 void run() noexcept {
687 amqp_frame_t frame;
688 while (!stopped) {
689
690 // publish all messages in the queue
691 const auto count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1));
692 dequeued += count;
693 ConnectionList::iterator conn_it;
694 ConnectionList::const_iterator end_it;
695 {
696 // thread safe access to the connection list
697 // once the iterators are fetched they are guaranteed to remain valid
698 std::lock_guard lock(connections_lock);
699 conn_it = connections.begin();
700 end_it = connections.end();
701 }
702 auto incoming_message = false;
703 // loop over all connections to read acks
704 for (;conn_it != end_it;) {
705
706 auto& conn = conn_it->second;
707 const auto& conn_key = conn_it->first;
708
709 if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
710 ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
711 ERASE_AND_CONTINUE(conn_it, connections);
712 }
713
714 // try to reconnect the connection if it has an error
715 if (!conn->is_ok()) {
716 const auto now = ceph::coarse_real_clock::now();
717 if (now >= conn->next_reconnect) {
718 // pointers are used temporarily inside the amqp_connection_info object
719 // as read-only values, hence the assignment, and const_cast are safe here
720 amqp_connection_info info;
721 info.host = const_cast<char*>(conn_key.host.c_str());
722 info.port = conn_key.port;
723 info.vhost = const_cast<char*>(conn_key.vhost.c_str());
724 info.user = const_cast<char*>(conn->user.c_str());
725 info.password = const_cast<char*>(conn->password.c_str());
726 info.ssl = conn->use_ssl;
727 ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
728 if (create_connection(conn, info)->is_ok() == false) {
729 ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_key) << ") retry failed. error: " <<
730 status_to_string(conn->status) << " (" << conn->reply_code << ")" << dendl;
731 // TODO: add error counter for failed retries
732 // TODO: add exponential backoff for retries
733 conn->next_reconnect = now + reconnect_time;
734 } else {
735 ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_key) << ") retry successfull" << dendl;
736 }
737 }
738 INCREMENT_AND_CONTINUE(conn_it);
739 }
740
741 const auto rc = amqp_simple_wait_frame_noblock(conn->state, &frame, &read_timeout);
742
743 if (rc == AMQP_STATUS_TIMEOUT) {
744 // TODO mark connection as idle
745 INCREMENT_AND_CONTINUE(conn_it);
746 }
747
748 // this is just to prevent spinning idle, does not indicate that a message
749 // was successfully processed or not
750 incoming_message = true;
751
752 // check if error occurred that require reopening the connection
753 if (rc != AMQP_STATUS_OK) {
754 // an error occurred, close connection
755 // it will be retied by the main loop
756 ldout(conn->cct, 1) << "AMQP run: connection read error: " << status_to_string(rc) << dendl;
757 conn->destroy(rc);
758 INCREMENT_AND_CONTINUE(conn_it);
759 }
760
761 if (frame.frame_type != AMQP_FRAME_METHOD) {
762 ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages. frame type: "
763 << unsigned(frame.frame_type) << dendl;
764 // handler is for publish confirmation only - handle only method frames
765 INCREMENT_AND_CONTINUE(conn_it);
766 }
767
768 uint64_t tag;
769 bool multiple;
770 int result;
771
772 switch (frame.payload.method.id) {
773 case AMQP_BASIC_ACK_METHOD:
774 {
775 result = AMQP_STATUS_OK;
776 const auto ack = (amqp_basic_ack_t*)frame.payload.method.decoded;
777 ceph_assert(ack);
778 tag = ack->delivery_tag;
779 multiple = ack->multiple;
780 break;
781 }
782 case AMQP_BASIC_NACK_METHOD:
783 {
784 result = RGW_AMQP_STATUS_BROKER_NACK;
785 const auto nack = (amqp_basic_nack_t*)frame.payload.method.decoded;
786 ceph_assert(nack);
787 tag = nack->delivery_tag;
788 multiple = nack->multiple;
789 break;
790 }
791 case AMQP_BASIC_REJECT_METHOD:
792 {
793 result = RGW_AMQP_STATUS_BROKER_NACK;
794 const auto reject = (amqp_basic_reject_t*)frame.payload.method.decoded;
795 tag = reject->delivery_tag;
796 multiple = false;
797 break;
798 }
799 case AMQP_CONNECTION_CLOSE_METHOD:
800 // TODO on channel close, no need to reopen the connection
801 case AMQP_CHANNEL_CLOSE_METHOD:
802 {
803 // other side closed the connection, no need to continue
804 ldout(conn->cct, 10) << "AMQP run: connection was closed by broker" << dendl;
805 conn->destroy(rc);
806 INCREMENT_AND_CONTINUE(conn_it);
807 }
808 case AMQP_BASIC_RETURN_METHOD:
809 // message was not delivered, returned to sender
810 ldout(conn->cct, 10) << "AMQP run: message was not routable" << dendl;
811 INCREMENT_AND_CONTINUE(conn_it);
812 break;
813 default:
814 // unexpected method
815 ldout(conn->cct, 10) << "AMQP run: unexpected message" << dendl;
816 INCREMENT_AND_CONTINUE(conn_it);
817 }
818
819 const auto& callbacks_end = conn->callbacks.end();
820 const auto& callbacks_begin = conn->callbacks.begin();
821 const auto tag_it = std::find(callbacks_begin, callbacks_end, tag);
822 if (tag_it != callbacks_end) {
823 if (multiple) {
824 // n/ack all up to (and including) the tag
825 ldout(conn->cct, 20) << "AMQP run: multiple n/acks received with tag=" << tag << " and result=" << result << dendl;
826 auto it = callbacks_begin;
827 while (it->tag <= tag && it != conn->callbacks.end()) {
828 ldout(conn->cct, 20) << "AMQP run: invoking callback with tag=" << it->tag << dendl;
829 it->cb(result);
830 it = conn->callbacks.erase(it);
831 }
832 } else {
833 // n/ack a specific tag
834 ldout(conn->cct, 20) << "AMQP run: n/ack received, invoking callback with tag=" << tag << " and result=" << result << dendl;
835 tag_it->cb(result);
836 conn->callbacks.erase(tag_it);
837 }
838 } else {
839 ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl;
840 }
841 // just increment the iterator
842 ++conn_it;
843 }
844 // if no messages were received or published, sleep for 100ms
845 if (count == 0 && !incoming_message) {
846 std::this_thread::sleep_for(idle_time);
847 }
848 }
849 }
850
851 // used in the dtor for message cleanup
852 static void delete_message(const message_wrapper_t* message) {
853 delete message;
854 }
855
856 public:
857 Manager(size_t _max_connections,
858 size_t _max_inflight,
859 size_t _max_queue,
860 long _usec_timeout,
861 unsigned reconnect_time_ms,
862 unsigned idle_time_ms,
863 CephContext* _cct) :
864 max_connections(_max_connections),
865 max_inflight(_max_inflight),
866 max_queue(_max_queue),
867 max_idle_time(30),
868 connection_count(0),
869 stopped(false),
870 read_timeout{0, _usec_timeout},
871 connections(_max_connections),
872 messages(max_queue),
873 queued(0),
874 dequeued(0),
875 cct(_cct),
876 idle_time(std::chrono::milliseconds(idle_time_ms)),
877 reconnect_time(std::chrono::milliseconds(reconnect_time_ms)),
878 runner(&Manager::run, this) {
879 // The hashmap has "max connections" as the initial number of buckets,
880 // and allows for 10 collisions per bucket before rehash.
881 // This is to prevent rehashing so that iterators are not invalidated
882 // when a new connection is added.
883 connections.max_load_factor(10.0);
884 // give the runner thread a name for easier debugging
885 const auto rc = ceph_pthread_setname(runner.native_handle(), "amqp_manager");
886 ceph_assert(rc==0);
887 }
888
889 // non copyable
890 Manager(const Manager&) = delete;
891 const Manager& operator=(const Manager&) = delete;
892
893 // stop the main thread
894 void stop() {
895 stopped = true;
896 }
897
898 // connect to a broker, or reuse an existing connection if already connected
899 connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
900 boost::optional<const std::string&> ca_location) {
901 if (stopped) {
902 ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
903 return nullptr;
904 }
905
906 struct amqp_connection_info info;
907 // cache the URL so that parsing could happen in-place
908 std::vector<char> url_cache(url.c_str(), url.c_str()+url.size()+1);
909 const auto retcode = amqp_parse_url(url_cache.data(), &info);
910 if (AMQP_STATUS_OK != retcode) {
911 ldout(cct, 1) << "AMQP connect: URL parsing failed. error: " << retcode << dendl;
912 return nullptr;
913 }
914
915 const connection_id_t id(info);
916 std::lock_guard lock(connections_lock);
917 const auto it = connections.find(id);
918 if (it != connections.end()) {
919 if (it->second->exchange != exchange) {
920 ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl;
921 return nullptr;
922 }
923 // connection found - return even if non-ok
924 ldout(cct, 20) << "AMQP connect: connection found" << dendl;
925 return it->second;
926 }
927
928 // connection not found, creating a new one
929 if (connection_count >= max_connections) {
930 ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl;
931 return nullptr;
932 }
933 const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct, verify_ssl, ca_location);
934 if (!conn->is_ok()) {
935 ldout(cct, 10) << "AMQP connect: connection (" << to_string(id) << ") creation failed. error:" <<
936 status_to_string(conn->status) << "(" << conn->reply_code << ")" << dendl;
937 }
938 // create_new_connection must always return a connection object
939 // even if error occurred during creation.
940 // in such a case the creation will be retried in the main thread
941 ceph_assert(conn);
942 ++connection_count;
943 ldout(cct, 10) << "AMQP connect: new connection is created. Total connections: " << connection_count << dendl;
944 ldout(cct, 10) << "AMQP connect: new connection status is: " << status_to_string(conn->status) << dendl;
945 return connections.emplace(id, conn).first->second;
946 }
947
948 // TODO publish with confirm is needed in the "none" case as well: cb should be invoked when the publish is ok (no ack)
949 int publish(connection_ptr_t& conn,
950 const std::string& topic,
951 const std::string& message) {
952 if (stopped) {
953 ldout(cct, 1) << "AMQP publish: manager is not running" << dendl;
954 return RGW_AMQP_STATUS_MANAGER_STOPPED;
955 }
956 if (!conn || !conn->is_ok()) {
957 ldout(cct, 1) << "AMQP publish: no connection" << dendl;
958 return RGW_AMQP_STATUS_CONNECTION_CLOSED;
959 }
960 if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) {
961 ++queued;
962 return AMQP_STATUS_OK;
963 }
964 ldout(cct, 1) << "AMQP publish: queue is full" << dendl;
965 return RGW_AMQP_STATUS_QUEUE_FULL;
966 }
967
968 int publish_with_confirm(connection_ptr_t& conn,
969 const std::string& topic,
970 const std::string& message,
971 reply_callback_t cb) {
972 if (stopped) {
973 ldout(cct, 1) << "AMQP publish_with_confirm: manager is not running" << dendl;
974 return RGW_AMQP_STATUS_MANAGER_STOPPED;
975 }
976 if (!conn || !conn->is_ok()) {
977 ldout(cct, 1) << "AMQP publish_with_confirm: no connection" << dendl;
978 return RGW_AMQP_STATUS_CONNECTION_CLOSED;
979 }
980 if (messages.push(new message_wrapper_t(conn, topic, message, cb))) {
981 ++queued;
982 return AMQP_STATUS_OK;
983 }
984 ldout(cct, 1) << "AMQP publish_with_confirm: queue is full" << dendl;
985 return RGW_AMQP_STATUS_QUEUE_FULL;
986 }
987
988 // dtor waits for the thread to stop,
989 // then the connections are cleaned up
990 ~Manager() {
991 stopped = true;
992 runner.join();
993 messages.consume_all(delete_message);
994 }
995
996 // get the number of connections
// Plain read of the counter that connect() increments for every newly
// created connection; connections_lock is not taken here.
997 size_t get_connection_count() const {
998 return connection_count;
999 }
1000
1001 // get the number of in-flight messages
1002 size_t get_inflight() const {
1003 size_t sum = 0;
1004 std::lock_guard lock(connections_lock);
1005 std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) {
1006 // concurrent access to the callback vector is safe without locking
1007 sum += conn_pair.second->callbacks.size();
1008 });
1009 return sum;
1010 }
1011
1012 // running counter of the queued messages
// `queued` is incremented once for every message that publish() or
// publish_with_confirm() managed to push into the queue; it is never
// decremented in the code visible here.
1013 size_t get_queued() const {
1014 return queued;
1015 }
1016
1017 // running counter of the dequeued messages
// `dequeued` starts at 0 in the constructor.
// NOTE(review): presumably advanced by the runner thread for every message
// taken out of the queue - confirm against Manager::run.
1018 size_t get_dequeued() const {
1019 return dequeued;
1020 }
1021 };
1022
1023 // singleton manager
1024 // note that the manager itself is not a singleton, and multiple instances may co-exist
1025 // TODO make the pointer atomic in allocation and deallocation to avoid race conditions
// null until init() succeeds, and null again after shutdown();
// every free function below checks it before use
1026 static Manager* s_manager = nullptr;
1027
// Defaults that init() passes to the Manager constructor.
// The three MAX_* values are also what get_max_connections(),
// get_max_inflight() and get_max_queue() report while no manager exists.
1028 static const size_t MAX_CONNECTIONS_DEFAULT = 256;
1029 static const size_t MAX_INFLIGHT_DEFAULT = 8192;
1030 static const size_t MAX_QUEUE_DEFAULT = 8192;
// microseconds, becomes the second field of the manager's read_timeout
1031 static const long READ_TIMEOUT_USEC = 100;
// milliseconds, the manager converts both values to std::chrono::milliseconds
1032 static const unsigned IDLE_TIME_MS = 100;
1033 static const unsigned RECONNECT_TIME_MS = 100;
1034
1035 bool init(CephContext* cct) {
1036 if (s_manager) {
1037 return false;
1038 }
1039 // TODO: take conf from CephContext
1040 s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT,
1041 READ_TIMEOUT_USEC, IDLE_TIME_MS, RECONNECT_TIME_MS, cct);
1042 return true;
1043 }
1044
// Destroys the singleton manager and resets the pointer so that init() may
// be called again. The Manager destructor joins the runner thread, therefore
// this call blocks until that thread has exited.
// Calling it when no manager exists is harmless (deleting a null pointer is a no-op).
// NOTE(review): the pointer is not synchronized with concurrent callers of the
// functions below - see the TODO next to s_manager.
1045 void shutdown() {
1046 delete s_manager;
1047 s_manager = nullptr;
1048 }
1049
1050 connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
1051 boost::optional<const std::string&> ca_location) {
1052 if (!s_manager) return nullptr;
1053 return s_manager->connect(url, exchange, mandatory_delivery, verify_ssl, ca_location);
1054 }
1055
1056 int publish(connection_ptr_t& conn,
1057 const std::string& topic,
1058 const std::string& message) {
1059 if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
1060 return s_manager->publish(conn, topic, message);
1061 }
1062
1063 int publish_with_confirm(connection_ptr_t& conn,
1064 const std::string& topic,
1065 const std::string& message,
1066 reply_callback_t cb) {
1067 if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
1068 return s_manager->publish_with_confirm(conn, topic, message, cb);
1069 }
1070
1071 size_t get_connection_count() {
1072 if (!s_manager) return 0;
1073 return s_manager->get_connection_count();
1074 }
1075
1076 size_t get_inflight() {
1077 if (!s_manager) return 0;
1078 return s_manager->get_inflight();
1079 }
1080
1081 size_t get_queued() {
1082 if (!s_manager) return 0;
1083 return s_manager->get_queued();
1084 }
1085
1086 size_t get_dequeued() {
1087 if (!s_manager) return 0;
1088 return s_manager->get_dequeued();
1089 }
1090
1091 size_t get_max_connections() {
1092 if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
1093 return s_manager->max_connections;
1094 }
1095
1096 size_t get_max_inflight() {
1097 if (!s_manager) return MAX_INFLIGHT_DEFAULT;
1098 return s_manager->max_inflight;
1099 }
1100
1101 size_t get_max_queue() {
1102 if (!s_manager) return MAX_QUEUE_DEFAULT;
1103 return s_manager->max_queue;
1104 }
1105
1106 } // namespace amqp
1107