]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_amqp.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rgw / rgw_amqp.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
#include "rgw_amqp.h"
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
#include "include/ceph_assert.h"
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstring>
#include <functional>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
#include <boost/lockfree/queue.hpp>
18
19 // TODO investigation, not necessarily issues:
20 // (1) in case of single threaded writer context use spsc_queue
21 // (2) support multiple channels
22 // (3) check performance of emptying queue to local list, and go over the list and publish
23 // (4) use std::shared_mutex (c++17) or equivalent for the connections lock
24
25 namespace rgw::amqp {
26
// RGW specific status codes reported when publishing a message
static constexpr int RGW_AMQP_STATUS_BROKER_NACK =            -0x1001;
static constexpr int RGW_AMQP_STATUS_CONNECTION_CLOSED =      -0x1002;
static constexpr int RGW_AMQP_STATUS_QUEUE_FULL =             -0x1003;
static constexpr int RGW_AMQP_STATUS_MAX_INFLIGHT =           -0x1004;
// RGW specific status codes reported when opening a connection
static constexpr int RGW_AMQP_STATUS_CONN_ALLOC_FAILED =      -0x2001;
static constexpr int RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED =    -0x2002;
static constexpr int RGW_AMQP_STATUS_SOCKET_OPEN_FAILED =     -0x2003;
static constexpr int RGW_AMQP_STATUS_LOGIN_FAILED =           -0x2004;
static constexpr int RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED =    -0x2005;
static constexpr int RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED = -0x2006;
static constexpr int RGW_AMQP_STATUS_Q_DECLARE_FAILED =       -0x2007;
static constexpr int RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED = -0x2008;
static constexpr int RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED = -0x2009;

// reply type stored on the connection when opening the socket fails
static constexpr int RGW_AMQP_RESPONSE_SOCKET_ERROR =         -0x3008;
// reply code meaning "no reply code available"
static constexpr int RGW_AMQP_NO_REPLY_CODE =                 0x0;
45
46 // key class for the connection list
47 struct connection_id_t {
48 std::string host;
49 int port;
50 std::string vhost;
51 // constructed from amqp_connection_info struct
52 connection_id_t(const amqp_connection_info& info)
53 : host(info.host), port(info.port), vhost(info.vhost) {}
54
55 // equality operator and hasher functor are needed
56 // so that connection_id_t could be used as key in unordered_map
57 bool operator==(const connection_id_t& other) const {
58 return host == other.host && port == other.port && vhost == other.vhost;
59 }
60
61 struct hasher {
62 std::size_t operator()(const connection_id_t& k) const {
63 return ((std::hash<std::string>()(k.host)
64 ^ (std::hash<int>()(k.port) << 1)) >> 1)
65 ^ (std::hash<std::string>()(k.vhost) << 1);
66 }
67 };
68 };
69
70 // connection_t state cleaner
71 // could be used for automatic cleanup when getting out of scope
72 class ConnectionCleaner {
73 private:
74 amqp_connection_state_t conn;
75 public:
76 ConnectionCleaner(amqp_connection_state_t _conn) : conn(_conn) {}
77 ~ConnectionCleaner() {
78 if (conn) {
79 amqp_destroy_connection(conn);
80 }
81 }
82 // call reset() if cleanup is not needed anymore
83 void reset() {
84 conn = nullptr;
85 }
86 };
87
88 // struct for holding the callback and its tag in the callback list
89 struct reply_callback_with_tag_t {
90 uint64_t tag;
91 reply_callback_t cb;
92
93 reply_callback_with_tag_t(uint64_t _tag, reply_callback_t _cb) : tag(_tag), cb(_cb) {}
94
95 bool operator==(uint64_t rhs) {
96 return tag == rhs;
97 }
98 };
99
100 typedef std::vector<reply_callback_with_tag_t> CallbackList;
101
102 // struct for holding the connection state object as well as the exchange
103 // it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
104 // since references to deleted objects may still exist in the calling code
105 struct connection_t {
106 amqp_connection_state_t state;
107 std::string exchange;
108 std::string user;
109 std::string password;
110 amqp_bytes_t reply_to_queue;
111 bool marked_for_deletion;
112 uint64_t delivery_tag;
113 int status;
114 int reply_type;
115 int reply_code;
116 mutable std::atomic<int> ref_count;
117 CallbackList callbacks;
118
119 // default ctor
120 connection_t() :
121 state(nullptr),
122 reply_to_queue(amqp_empty_bytes),
123 marked_for_deletion(false),
124 delivery_tag(1),
125 status(AMQP_STATUS_OK),
126 reply_type(AMQP_RESPONSE_NORMAL),
127 reply_code(RGW_AMQP_NO_REPLY_CODE),
128 ref_count(0) {}
129
130 // cleanup of all internal connection resource
131 // the object can still remain, and internal connection
132 // resources created again on successful reconnection
133 void destroy(int s) {
134 status = s;
135 ConnectionCleaner clean_state(state);
136 state = nullptr;
137 amqp_bytes_free(reply_to_queue);
138 reply_to_queue = amqp_empty_bytes;
139 // fire all remaining callbacks
140 std::for_each(callbacks.begin(), callbacks.end(), [s](auto& cb_tag) {
141 cb_tag.cb(s);
142 });
143 delivery_tag = 1;
144 }
145
146 bool is_ok() const {
147 return (state != nullptr && !marked_for_deletion);
148 }
149
150 // dtor also destroys the internals
151 ~connection_t() {
152 destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
153 }
154
155 friend void intrusive_ptr_add_ref(const connection_t* p);
156 friend void intrusive_ptr_release(const connection_t* p);
157 };
158
159 // these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
160 void intrusive_ptr_add_ref(const connection_t* p) {
161 ++p->ref_count;
162 }
163 void intrusive_ptr_release(const connection_t* p) {
164 if (--p->ref_count == 0) {
165 delete p;
166 }
167 }
168
169 // convert connection info to string
170 std::string to_string(const amqp_connection_info& info) {
171 std::stringstream ss;
172 ss << "connection info:" <<
173 "\nHost: " << info.host <<
174 "\nPort: " << info.port <<
175 "\nUser: " << info.user <<
176 "\nPassword: " << info.password <<
177 "\nvhost: " << info.vhost <<
178 "\nSSL support: " << info.ssl << std::endl;
179 return ss.str();
180 }
181
182 // convert reply to error code
183 int reply_to_code(const amqp_rpc_reply_t& reply) {
184 switch (reply.reply_type) {
185 case AMQP_RESPONSE_NONE:
186 case AMQP_RESPONSE_NORMAL:
187 return RGW_AMQP_NO_REPLY_CODE;
188 case AMQP_RESPONSE_LIBRARY_EXCEPTION:
189 return reply.library_error;
190 case AMQP_RESPONSE_SERVER_EXCEPTION:
191 if (reply.reply.decoded) {
192 const amqp_connection_close_t* m = (amqp_connection_close_t*)reply.reply.decoded;
193 return m->reply_code;
194 }
195 return reply.reply.id;
196 }
197 return RGW_AMQP_NO_REPLY_CODE;
198 }
199
200 // convert reply to string
201 std::string to_string(const amqp_rpc_reply_t& reply) {
202 std::stringstream ss;
203 switch (reply.reply_type) {
204 case AMQP_RESPONSE_NORMAL:
205 return "";
206 case AMQP_RESPONSE_NONE:
207 return "missing RPC reply type";
208 case AMQP_RESPONSE_LIBRARY_EXCEPTION:
209 return amqp_error_string2(reply.library_error);
210 case AMQP_RESPONSE_SERVER_EXCEPTION:
211 {
212 switch (reply.reply.id) {
213 case AMQP_CONNECTION_CLOSE_METHOD:
214 ss << "server connection error: ";
215 break;
216 case AMQP_CHANNEL_CLOSE_METHOD:
217 ss << "server channel error: ";
218 break;
219 default:
220 ss << "server unknown error: ";
221 break;
222 }
223 if (reply.reply.decoded) {
224 amqp_connection_close_t* m = (amqp_connection_close_t*)reply.reply.decoded;
225 ss << m->reply_code << " text: " << std::string((char*)m->reply_text.bytes, m->reply_text.len);
226 }
227 return ss.str();
228 }
229 default:
230 ss << "unknown error, method id: " << reply.reply.id;
231 return ss.str();
232 }
233 }
234
235 // convert status enum to string
236 std::string to_string(amqp_status_enum s) {
237 switch (s) {
238 case AMQP_STATUS_OK:
239 return "AMQP_STATUS_OK";
240 case AMQP_STATUS_NO_MEMORY:
241 return "AMQP_STATUS_NO_MEMORY";
242 case AMQP_STATUS_BAD_AMQP_DATA:
243 return "AMQP_STATUS_BAD_AMQP_DATA";
244 case AMQP_STATUS_UNKNOWN_CLASS:
245 return "AMQP_STATUS_UNKNOWN_CLASS";
246 case AMQP_STATUS_UNKNOWN_METHOD:
247 return "AMQP_STATUS_UNKNOWN_METHOD";
248 case AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED:
249 return "AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED";
250 case AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION:
251 return "AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION";
252 case AMQP_STATUS_CONNECTION_CLOSED:
253 return "AMQP_STATUS_CONNECTION_CLOSED";
254 case AMQP_STATUS_BAD_URL:
255 return "AMQP_STATUS_BAD_URL";
256 case AMQP_STATUS_SOCKET_ERROR:
257 return "AMQP_STATUS_SOCKET_ERROR";
258 case AMQP_STATUS_INVALID_PARAMETER:
259 return "AMQP_STATUS_INVALID_PARAMETER";
260 case AMQP_STATUS_TABLE_TOO_BIG:
261 return "AMQP_STATUS_TABLE_TOO_BIG";
262 case AMQP_STATUS_WRONG_METHOD:
263 return "AMQP_STATUS_WRONG_METHOD";
264 case AMQP_STATUS_TIMEOUT:
265 return "AMQP_STATUS_TIMEOUT";
266 case AMQP_STATUS_TIMER_FAILURE:
267 return "AMQP_STATUS_TIMER_FAILURE";
268 case AMQP_STATUS_HEARTBEAT_TIMEOUT:
269 return "AMQP_STATUS_HEARTBEAT_TIMEOUT";
270 case AMQP_STATUS_UNEXPECTED_STATE:
271 return "AMQP_STATUS_UNEXPECTED_STATE";
272 case AMQP_STATUS_SOCKET_CLOSED:
273 return "AMQP_STATUS_SOCKET_CLOSED";
274 case AMQP_STATUS_SOCKET_INUSE:
275 return "AMQP_STATUS_SOCKET_INUSE";
276 case AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD:
277 return "AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD";
278 #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 8, 0, 0)
279 case AMQP_STATUS_UNSUPPORTED:
280 return "AMQP_STATUS_UNSUPPORTED";
281 #endif
282 case _AMQP_STATUS_NEXT_VALUE:
283 return "AMQP_STATUS_INTERNAL";
284 case AMQP_STATUS_TCP_ERROR:
285 return "AMQP_STATUS_TCP_ERROR";
286 case AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR:
287 return "AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR";
288 case _AMQP_STATUS_TCP_NEXT_VALUE:
289 return "AMQP_STATUS_INTERNAL";
290 case AMQP_STATUS_SSL_ERROR:
291 return "AMQP_STATUS_SSL_ERROR";
292 case AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED:
293 return "AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED";
294 case AMQP_STATUS_SSL_PEER_VERIFY_FAILED:
295 return "AMQP_STATUS_SSL_PEER_VERIFY_FAILED";
296 case AMQP_STATUS_SSL_CONNECTION_FAILED:
297 return "AMQP_STATUS_SSL_CONNECTION_FAILED";
298 case _AMQP_STATUS_SSL_NEXT_VALUE:
299 return "AMQP_STATUS_INTERNAL";
300 }
301 return "AMQP_STATUS_UNKNOWN";
302 }
303
304 // TODO: add status_to_string on the connection object to print full status
305
306 // convert int status to string - including RGW specific values
307 std::string status_to_string(int s) {
308 switch (s) {
309 case RGW_AMQP_STATUS_BROKER_NACK:
310 return "RGW_AMQP_STATUS_BROKER_NACK";
311 case RGW_AMQP_STATUS_CONNECTION_CLOSED:
312 return "RGW_AMQP_STATUS_CONNECTION_CLOSED";
313 case RGW_AMQP_STATUS_QUEUE_FULL:
314 return "RGW_AMQP_STATUS_QUEUE_FULL";
315 case RGW_AMQP_STATUS_MAX_INFLIGHT:
316 return "RGW_AMQP_STATUS_MAX_INFLIGHT";
317 case RGW_AMQP_STATUS_CONN_ALLOC_FAILED:
318 return "RGW_AMQP_STATUS_CONN_ALLOC_FAILED";
319 case RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED:
320 return "RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED";
321 case RGW_AMQP_STATUS_SOCKET_OPEN_FAILED:
322 return "RGW_AMQP_STATUS_SOCKET_OPEN_FAILED";
323 case RGW_AMQP_STATUS_LOGIN_FAILED:
324 return "RGW_AMQP_STATUS_LOGIN_FAILED";
325 case RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED:
326 return "RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED";
327 case RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED:
328 return "RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED";
329 case RGW_AMQP_STATUS_Q_DECLARE_FAILED:
330 return "RGW_AMQP_STATUS_Q_DECLARE_FAILED";
331 case RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED:
332 return "RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED";
333 case RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED:
334 return "RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED";
335 }
336 return to_string((amqp_status_enum)s);
337 }
338
// check the result from calls and return if error (=null)
// C  - connection object (pointer like), receives the status and is returned
// S  - status to store when the check fails
// OK - result to check, a false/null value means failure
// The arguments are parenthesized so compound expressions are evaluated as a
// whole, and the body is wrapped in do/while(0) so that the macro behaves as
// a single statement (e.g. inside an unbraced if/else).
#define RETURN_ON_ERROR(C, S, OK) \
  do { \
    if (!(OK)) { \
      (C)->status = (S); \
      return (C); \
    } \
  } while (0)
345
// for RPC calls: fetch the RPC reply of connection state ST and, unless it is
// a normal reply, store status S together with the reply type and reply code
// on the connection object C and return C
#define RETURN_ON_REPLY_ERROR(C, ST, S) \
  do { \
    const auto rpc_reply_ = amqp_get_rpc_reply(ST); \
    if (rpc_reply_.reply_type != AMQP_RESPONSE_NORMAL) { \
      C->status = S; \
      C->reply_type = rpc_reply_.reply_type; \
      C->reply_code = reply_to_code(rpc_reply_); \
      return C; \
    } \
  } while (0)
356
357 static const amqp_channel_t CHANNEL_ID = 1;
358 static const amqp_channel_t CONFIRMING_CHANNEL_ID = 2;
359
360 // utility function to create a connection, when the connection object already exists
361 connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connection_info& info) {
362 // pointer must be valid and not marked for deletion
363 ceph_assert(conn && !conn->marked_for_deletion);
364
365 // reset all status codes
366 conn->status = AMQP_STATUS_OK;
367 conn->reply_type = AMQP_RESPONSE_NORMAL;
368 conn->reply_code = RGW_AMQP_NO_REPLY_CODE;
369
370 auto state = amqp_new_connection();
371 if (!state) {
372 conn->status = RGW_AMQP_STATUS_CONN_ALLOC_FAILED;
373 return conn;
374 }
375 // make sure that the connection state is cleaned up in case of error
376 ConnectionCleaner state_guard(state);
377
378 // create and open socket
379 auto socket = amqp_tcp_socket_new(state);
380 if (!socket) {
381 conn->status = RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED;
382 return conn;
383 }
384 const auto s = amqp_socket_open(socket, info.host, info.port);
385 if (s < 0) {
386 conn->status = RGW_AMQP_STATUS_SOCKET_OPEN_FAILED;
387 conn->reply_type = RGW_AMQP_RESPONSE_SOCKET_ERROR;
388 conn->reply_code = s;
389 return conn;
390 }
391
392 // login to broker
393 const auto reply = amqp_login(state,
394 info.vhost,
395 AMQP_DEFAULT_MAX_CHANNELS,
396 AMQP_DEFAULT_FRAME_SIZE,
397 0, // no heartbeat TODO: add conf
398 AMQP_SASL_METHOD_PLAIN, // TODO: add other types of security
399 info.user,
400 info.password);
401 if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
402 conn->status = RGW_AMQP_STATUS_LOGIN_FAILED;
403 conn->reply_type = reply.reply_type;
404 conn->reply_code = reply_to_code(reply);
405 return conn;
406 }
407
408 // open channels
409 {
410 const auto ok = amqp_channel_open(state, CHANNEL_ID);
411 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED, ok);
412 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED);
413 }
414 {
415 const auto ok = amqp_channel_open(state, CONFIRMING_CHANNEL_ID);
416 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED, ok);
417 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED);
418 }
419 {
420 const auto ok = amqp_confirm_select(state, CONFIRMING_CHANNEL_ID);
421 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED, ok);
422 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED);
423 }
424
425 // verify that the topic exchange is there
426 // TODO: make this step optional
427 {
428 const auto ok = amqp_exchange_declare(state,
429 CHANNEL_ID,
430 amqp_cstring_bytes(conn->exchange.c_str()),
431 amqp_cstring_bytes("topic"),
432 1, // passive - exchange must already exist on broker
433 1, // durable
434 0, // dont auto-delete
435 0, // not internal
436 amqp_empty_table);
437 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED, ok);
438 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED);
439 }
440 {
441 // create queue for confirmations
442 const auto queue_ok = amqp_queue_declare(state,
443 CHANNEL_ID, // use the regular channel for this call
444 amqp_empty_bytes, // let broker allocate queue name
445 0, // not passive - create the queue
446 0, // not durable
447 1, // exclusive
448 1, // auto-delete
449 amqp_empty_table // not args TODO add args from conf: TTL, max length etc.
450 );
451 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_Q_DECLARE_FAILED, queue_ok);
452 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_Q_DECLARE_FAILED);
453
454 // define consumption for connection
455 const auto consume_ok = amqp_basic_consume(state,
456 CONFIRMING_CHANNEL_ID,
457 queue_ok->queue,
458 amqp_empty_bytes, // broker will generate consumer tag
459 1, // messages sent from client are never routed back
460 1, // client does not ack thr acks
461 1, // exclusive access to queue
462 amqp_empty_table // no parameters
463 );
464
465 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED, consume_ok);
466 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED);
467 // broker generated consumer_tag could be used to cancel sending of n/acks from broker - not needed
468
469 state_guard.reset();
470 conn->state = state;
471 conn->reply_to_queue = amqp_bytes_malloc_dup(queue_ok->queue);
472 return conn;
473 }
474 }
475
476 // utility function to create a new connection
477 connection_ptr_t create_new_connection(const amqp_connection_info& info,
478 const std::string& exchange) {
479 // create connection state
480 connection_ptr_t conn = new connection_t;
481 conn->exchange = exchange;
482 conn->user.assign(info.user);
483 conn->password.assign(info.password);
484 return create_connection(conn, info);
485 }
486
487 /// struct used for holding messages in the message queue
488 struct message_wrapper_t {
489 connection_ptr_t conn;
490 std::string topic;
491 std::string message;
492 reply_callback_t cb;
493
494 message_wrapper_t(connection_ptr_t& _conn,
495 const std::string& _topic,
496 const std::string& _message,
497 reply_callback_t _cb) : conn(_conn), topic(_topic), message(_message), cb(_cb) {}
498 };
499
500
501 typedef std::unordered_map<connection_id_t, connection_ptr_t, connection_id_t::hasher> ConnectionList;
502 typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
503
// macros used inside a loop where an iterator is either incremented or erased
// Each macro expands to a single braced block, so it stays one statement
// even when it is the body of an unbraced "if".
// (do/while(0) cannot be used here: "continue" would then apply to the
// do/while instead of the enclosing loop)

// advance IT and go on with the next iteration of the enclosing loop
#define INCREMENT_AND_CONTINUE(IT) \
  { \
    ++(IT); \
    continue; \
  }

// erase the element at IT from CONTAINER (IT then refers to the following
// element), decrement the "connection_count" variable that must be in scope,
// and go on with the next iteration of the enclosing loop
#define ERASE_AND_CONTINUE(IT,CONTAINER) \
  { \
    (IT) = (CONTAINER).erase(IT); \
    --connection_count; \
    continue; \
  }
513
514 class Manager {
515 public:
516 const size_t max_connections;
517 const size_t max_inflight;
518 const size_t max_queue;
519 private:
520 std::atomic<size_t> connection_count;
521 bool stopped;
522 struct timeval read_timeout;
523 ConnectionList connections;
524 MessageQueue messages;
525 std::atomic<size_t> queued;
526 std::atomic<size_t> dequeued;
527 mutable std::mutex connections_lock;
528 std::thread runner;
529
530 void publish_internal(message_wrapper_t* message) {
531 const std::unique_ptr<message_wrapper_t> msg_owner(message);
532 auto& conn = message->conn;
533
534 if (!conn->is_ok()) {
535 // connection had an issue while message was in the queue
536 // TODO add error stats
537 if (message->cb) {
538 message->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED);
539 }
540 return;
541 }
542
543 if (message->cb == nullptr) {
544 // TODO add error stats
545 const auto rc = amqp_basic_publish(conn->state,
546 CHANNEL_ID,
547 amqp_cstring_bytes(conn->exchange.c_str()),
548 amqp_cstring_bytes(message->topic.c_str()),
549 1, // mandatory, TODO: take from conf
550 0, // not immediate
551 nullptr,
552 amqp_cstring_bytes(message->message.c_str()));
553 if (rc == AMQP_STATUS_OK) {
554 return;
555 }
556 // an error occurred, close connection
557 // it will be retied by the main loop
558 conn->destroy(rc);
559 return;
560 }
561
562 amqp_basic_properties_t props;
563 props._flags =
564 AMQP_BASIC_DELIVERY_MODE_FLAG |
565 AMQP_BASIC_REPLY_TO_FLAG;
566 props.delivery_mode = 2; // persistent delivery TODO take from conf
567 props.reply_to = conn->reply_to_queue;
568
569 const auto rc = amqp_basic_publish(conn->state,
570 CONFIRMING_CHANNEL_ID,
571 amqp_cstring_bytes(conn->exchange.c_str()),
572 amqp_cstring_bytes(message->topic.c_str()),
573 1, // mandatory, TODO: take from conf
574 0, // not immediate
575 &props,
576 amqp_cstring_bytes(message->message.c_str()));
577
578 if (rc == AMQP_STATUS_OK) {
579 if (conn->callbacks.size() < max_inflight) {
580 conn->callbacks.emplace_back(conn->delivery_tag++, message->cb);
581 } else {
582 // immediately invoke callback with error
583 message->cb(RGW_AMQP_STATUS_MAX_INFLIGHT);
584 }
585 } else {
586 // an error occurred, close connection
587 // it will be retied by the main loop
588 conn->destroy(rc);
589 // immediately invoke callback with error
590 message->cb(rc);
591 }
592 }
593
594 // the managers thread:
595 // (1) empty the queue of messages to be published
596 // (2) loop over all connections and read acks
597 // (3) manages deleted connections
598 // (4) TODO reconnect on connection errors
599 // (5) TODO cleanup timedout callbacks
600 void run() {
601 amqp_frame_t frame;
602 while (!stopped) {
603
604 // publish all messages in the queue
605 const auto count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1));
606 dequeued += count;
607 ConnectionList::iterator conn_it;
608 ConnectionList::const_iterator end_it;
609 {
610 // thread safe access to the connection list
611 // once the iterators are fetched they are guaranteed to remain valid
612 std::lock_guard<std::mutex> lock(connections_lock);
613 conn_it = connections.begin();
614 end_it = connections.end();
615 }
616 auto incoming_message = false;
617 // loop over all connections to read acks
618 for (;conn_it != end_it;) {
619
620 auto& conn = conn_it->second;
621 // delete the connection if marked for deletion
622 if (conn->marked_for_deletion) {
623 conn->destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
624 std::lock_guard<std::mutex> lock(connections_lock);
625 // erase is safe - does not invalidate any other iterator
626 // lock so no insertion happens at the same time
627 ERASE_AND_CONTINUE(conn_it, connections);
628 }
629
630 // try to reconnect the connection if it has an error
631 if (!conn->is_ok()) {
632 // pointers are used temporarily inside the amqp_connection_info object
633 // as read-only values, hence the assignment, and const_cast are safe here
634 amqp_connection_info info;
635 info.host = const_cast<char*>(conn_it->first.host.c_str());
636 info.port = conn_it->first.port;
637 info.vhost = const_cast<char*>(conn_it->first.vhost.c_str());
638 info.user = const_cast<char*>(conn->user.c_str());
639 info.password = const_cast<char*>(conn->password.c_str());
640 if (create_connection(conn, info)->is_ok() == false) {
641 // TODO: add error counter for failed retries
642 // TODO: add exponential backoff for retries
643 }
644 INCREMENT_AND_CONTINUE(conn_it);
645 }
646
647 const auto rc = amqp_simple_wait_frame_noblock(conn->state, &frame, &read_timeout);
648
649 if (rc == AMQP_STATUS_TIMEOUT) {
650 // TODO mark connection as idle
651 INCREMENT_AND_CONTINUE(conn_it);
652 }
653
654 // this is just to prevent spinning idle, does not indicate that a message
655 // was successfully processed or not
656 incoming_message = true;
657
658 // check if error occurred that require reopening the connection
659 if (rc != AMQP_STATUS_OK) {
660 // an error occurred, close connection
661 // it will be retied by the main loop
662 conn->destroy(rc);
663 INCREMENT_AND_CONTINUE(conn_it);
664 }
665
666 if (frame.frame_type != AMQP_FRAME_METHOD) {
667 // handler is for publish confirmation only - handle only method frames
668 // TODO: add a counter
669 INCREMENT_AND_CONTINUE(conn_it);
670 }
671
672 uint64_t tag;
673 bool multiple;
674 int result;
675
676 switch (frame.payload.method.id) {
677 case AMQP_BASIC_ACK_METHOD:
678 {
679 result = AMQP_STATUS_OK;
680 const auto ack = (amqp_basic_ack_t*)frame.payload.method.decoded;
681 ceph_assert(ack);
682 tag = ack->delivery_tag;
683 multiple = ack->multiple;
684 break;
685 }
686 case AMQP_BASIC_NACK_METHOD:
687 {
688 result = RGW_AMQP_STATUS_BROKER_NACK;
689 const auto nack = (amqp_basic_nack_t*)frame.payload.method.decoded;
690 ceph_assert(nack);
691 tag = nack->delivery_tag;
692 multiple = nack->multiple;
693 break;
694 }
695 case AMQP_CONNECTION_CLOSE_METHOD:
696 // TODO on channel close, no need to reopen the connection
697 case AMQP_CHANNEL_CLOSE_METHOD:
698 {
699 // other side closed the connection, no need to continue
700 conn->destroy(rc);
701 INCREMENT_AND_CONTINUE(conn_it);
702 }
703 case AMQP_BASIC_RETURN_METHOD:
704 // message was not delivered, returned to sender
705 // TODO: add a counter
706 INCREMENT_AND_CONTINUE(conn_it);
707 break;
708 default:
709 // unexpected method
710 // TODO: add a counter
711 INCREMENT_AND_CONTINUE(conn_it);
712 }
713
714 const auto& callbacks_end = conn->callbacks.end();
715 const auto& callbacks_begin = conn->callbacks.begin();
716 const auto it = std::find(callbacks_begin, callbacks_end, tag);
717 if (it != callbacks_end) {
718 if (multiple) {
719 // n/ack all up to (and including) the tag
720 for (auto rit = it; rit >= callbacks_begin; --rit) {
721 rit->cb(result);
722 conn->callbacks.erase(rit);
723 }
724 } else {
725 // n/ack a specific tag
726 it->cb(result);
727 conn->callbacks.erase(it);
728 }
729 } else {
730 // TODO add counter for acks with no callback
731 }
732 // just increment the iterator
733 ++conn_it;
734 }
735 // if no messages were received or published, sleep for 100ms
736 if (count == 0 && !incoming_message) {
737 std::this_thread::sleep_for(std::chrono::milliseconds(100));
738 }
739 }
740 }
741
742 // used in the dtor for message cleanup
743 static void delete_message(const message_wrapper_t* message) {
744 delete message;
745 }
746
747 public:
748 Manager(size_t _max_connections,
749 size_t _max_inflight,
750 size_t _max_queue,
751 long _usec_timeout) :
752 max_connections(_max_connections),
753 max_inflight(_max_inflight),
754 max_queue(_max_queue),
755 connection_count(0),
756 stopped(false),
757 read_timeout{0, _usec_timeout},
758 connections(_max_connections),
759 messages(max_queue),
760 queued(0),
761 dequeued(0),
762 runner(&Manager::run, this) {
763 // The hashmap has "max connections" as the initial number of buckets,
764 // and allows for 10 collisions per bucket before rehash.
765 // This is to prevent rehashing so that iterators are not invalidated
766 // when a new connection is added.
767 connections.max_load_factor(10.0);
768 }
769
770 // non copyable
771 Manager(const Manager&) = delete;
772 const Manager& operator=(const Manager&) = delete;
773
774 // stop the main thread
775 void stop() {
776 stopped = true;
777 }
778
779 // disconnect from a broker
780 bool disconnect(connection_ptr_t& conn) {
781 if (!conn || stopped) {
782 return false;
783 }
784 conn->marked_for_deletion = true;
785 return true;
786 }
787
788 // connect to a broker, or reuse an existing connection if already connected
789 connection_ptr_t connect(const std::string& url, const std::string& exchange) {
790 if (stopped) {
791 // TODO: increment counter
792 return nullptr;
793 }
794
795 struct amqp_connection_info info;
796 // cache the URL so that parsing could happen in-place
797 std::vector<char> url_cache(url.c_str(), url.c_str()+url.size()+1);
798 if (AMQP_STATUS_OK != amqp_parse_url(url_cache.data(), &info)) {
799 // TODO: increment counter
800 return nullptr;
801 }
802
803 const connection_id_t id(info);
804 std::lock_guard<std::mutex> lock(connections_lock);
805 const auto it = connections.find(id);
806 if (it != connections.end()) {
807 if (it->second->marked_for_deletion) {
808 // TODO: increment counter
809 return nullptr;
810 } else if (it->second->exchange != exchange) {
811 // TODO: increment counter
812 return nullptr;
813 }
814 // connection found - return even if non-ok
815 return it->second;
816 }
817
818 // connection not found, creating a new one
819 if (connection_count >= max_connections) {
820 // TODO: increment counter
821 return nullptr;
822 }
823 const auto conn = create_new_connection(info, exchange);
824 // create_new_connection must always return a connection object
825 // even if error occurred during creation.
826 // in such a case the creation will be retried in the main thread
827 ceph_assert(conn);
828 ++connection_count;
829 return connections.emplace(id, conn).first->second;
830 }
831
832 // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
833 int publish(connection_ptr_t& conn,
834 const std::string& topic,
835 const std::string& message) {
836 if (!conn || !conn->is_ok()) {
837 return RGW_AMQP_STATUS_CONNECTION_CLOSED;
838 }
839 if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) {
840 ++queued;
841 return AMQP_STATUS_OK;
842 }
843 return RGW_AMQP_STATUS_QUEUE_FULL;
844 }
845
846 int publish_with_confirm(connection_ptr_t& conn,
847 const std::string& topic,
848 const std::string& message,
849 reply_callback_t cb) {
850 if (!conn || !conn->is_ok()) {
851 return RGW_AMQP_STATUS_CONNECTION_CLOSED;
852 }
853 if (messages.push(new message_wrapper_t(conn, topic, message, cb))) {
854 ++queued;
855 return AMQP_STATUS_OK;
856 }
857 return RGW_AMQP_STATUS_QUEUE_FULL;
858 }
859
860 // dtor wait for thread to stop
861 // then connection are cleaned-up
862 ~Manager() {
863 stopped = true;
864 runner.join();
865 messages.consume_all(delete_message);
866 }
867
868 // get the number of connections
869 size_t get_connection_count() const {
870 return connection_count;
871 }
872
873 // get the number of in-flight messages
874 size_t get_inflight() const {
875 size_t sum = 0;
876 std::lock_guard<std::mutex> lock(connections_lock);
877 std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) {
878 sum += conn_pair.second->callbacks.size();
879 });
880 return sum;
881 }
882
883 // running counter of the queued messages
884 size_t get_queued() const {
885 return queued;
886 }
887
888 // running counter of the dequeued messages
889 size_t get_dequeued() const {
890 return dequeued;
891 }
892 };
893
894 // singleton manager
895 // note that the manager itself is not a singleton, and multiple instances may co-exist
896 // TODO get parameters from conf
897 Manager s_manager(256, 8192, 8192, 100);
898
899 connection_ptr_t connect(const std::string& url, const std::string& exchange) {
900 return s_manager.connect(url, exchange);
901 }
902
903 int publish(connection_ptr_t& conn,
904 const std::string& topic,
905 const std::string& message) {
906 return s_manager.publish(conn, topic, message);
907 }
908
909 int publish_with_confirm(connection_ptr_t& conn,
910 const std::string& topic,
911 const std::string& message,
912 reply_callback_t cb) {
913 return s_manager.publish_with_confirm(conn, topic, message, cb);
914 }
915
916 size_t get_connection_count() {
917 return s_manager.get_connection_count();
918 }
919
920 size_t get_inflight() {
921 return s_manager.get_inflight();
922 }
923
924 size_t get_queued() {
925 return s_manager.get_queued();
926 }
927
928 size_t get_dequeued() {
929 return s_manager.get_dequeued();
930 }
931
932 size_t get_max_connections() {
933 return s_manager.max_connections;
934 }
935
936 size_t get_max_inflight() {
937 return s_manager.max_inflight;
938 }
939
940 size_t get_max_queue() {
941 return s_manager.max_queue;
942 }
943
944 bool disconnect(connection_ptr_t& conn) {
945 return s_manager.disconnect(conn);
946 }
947
948 } // namespace amqp
949