]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_amqp.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_amqp.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
#include "rgw_amqp.h"
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
#include "include/ceph_assert.h"
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstring>
#include <functional>
#include <iterator>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
#include <boost/lockfree/queue.hpp>
#include "common/dout.h"

#define dout_subsys ceph_subsys_rgw
21
22 // TODO investigation, not necessarily issues:
23 // (1) in case of single threaded writer context use spsc_queue
24 // (2) support multiple channels
25 // (3) check performance of emptying queue to local list, and go over the list and publish
26 // (4) use std::shared_mutex (c++17) or equivalent for the connections lock
27
28 namespace rgw::amqp {
29
// RGW AMQP status codes for publishing
static constexpr int RGW_AMQP_STATUS_BROKER_NACK = -0x1001;
static constexpr int RGW_AMQP_STATUS_CONNECTION_CLOSED = -0x1002;
static constexpr int RGW_AMQP_STATUS_QUEUE_FULL = -0x1003;
static constexpr int RGW_AMQP_STATUS_MAX_INFLIGHT = -0x1004;
static constexpr int RGW_AMQP_STATUS_MANAGER_STOPPED = -0x1005;
// RGW AMQP status codes for connection opening
static constexpr int RGW_AMQP_STATUS_CONN_ALLOC_FAILED = -0x2001;
static constexpr int RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED = -0x2002;
static constexpr int RGW_AMQP_STATUS_SOCKET_OPEN_FAILED = -0x2003;
static constexpr int RGW_AMQP_STATUS_LOGIN_FAILED = -0x2004;
static constexpr int RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED = -0x2005;
static constexpr int RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED = -0x2006;
static constexpr int RGW_AMQP_STATUS_Q_DECLARE_FAILED = -0x2007;
static constexpr int RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED = -0x2008;
static constexpr int RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED = -0x2009;

// reply_type recorded on a connection whose TCP socket could not be opened
static constexpr int RGW_AMQP_RESPONSE_SOCKET_ERROR = -0x3008;
// reply_code value meaning "no reply code available"
static constexpr int RGW_AMQP_NO_REPLY_CODE = 0x0;
49
50 // key class for the connection list
51 struct connection_id_t {
52 const std::string host;
53 const int port;
54 const std::string vhost;
55 // constructed from amqp_connection_info struct
56 connection_id_t(const amqp_connection_info& info)
57 : host(info.host), port(info.port), vhost(info.vhost) {}
58
59 // equality operator and hasher functor are needed
60 // so that connection_id_t could be used as key in unordered_map
61 bool operator==(const connection_id_t& other) const {
62 return host == other.host && port == other.port && vhost == other.vhost;
63 }
64
65 struct hasher {
66 std::size_t operator()(const connection_id_t& k) const {
67 return ((std::hash<std::string>()(k.host)
68 ^ (std::hash<int>()(k.port) << 1)) >> 1)
69 ^ (std::hash<std::string>()(k.vhost) << 1);
70 }
71 };
72 };
73
74 std::string to_string(const connection_id_t& id) {
75 return id.host+":"+std::to_string(id.port)+"/"+id.vhost;
76 }
77
78 // connection_t state cleaner
79 // could be used for automatic cleanup when getting out of scope
80 class ConnectionCleaner {
81 private:
82 amqp_connection_state_t conn;
83 public:
84 ConnectionCleaner(amqp_connection_state_t _conn) : conn(_conn) {}
85 ~ConnectionCleaner() {
86 if (conn) {
87 amqp_destroy_connection(conn);
88 }
89 }
90 // call reset() if cleanup is not needed anymore
91 void reset() {
92 conn = nullptr;
93 }
94 };
95
96 // struct for holding the callback and its tag in the callback list
97 struct reply_callback_with_tag_t {
98 uint64_t tag;
99 reply_callback_t cb;
100
101 reply_callback_with_tag_t(uint64_t _tag, reply_callback_t _cb) : tag(_tag), cb(_cb) {}
102
103 bool operator==(uint64_t rhs) {
104 return tag == rhs;
105 }
106 };
107
108 typedef std::vector<reply_callback_with_tag_t> CallbackList;
109
110 // struct for holding the connection state object as well as the exchange
111 // it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
112 // since references to deleted objects may still exist in the calling code
113 struct connection_t {
114 amqp_connection_state_t state;
115 std::string exchange;
116 std::string user;
117 std::string password;
118 amqp_bytes_t reply_to_queue;
119 bool marked_for_deletion;
120 uint64_t delivery_tag;
121 int status;
122 int reply_type;
123 int reply_code;
124 mutable std::atomic<int> ref_count;
125 CephContext* cct;
126 CallbackList callbacks;
127
128 // default ctor
129 connection_t() :
130 state(nullptr),
131 reply_to_queue(amqp_empty_bytes),
132 marked_for_deletion(false),
133 delivery_tag(1),
134 status(AMQP_STATUS_OK),
135 reply_type(AMQP_RESPONSE_NORMAL),
136 reply_code(RGW_AMQP_NO_REPLY_CODE),
137 ref_count(0),
138 cct(nullptr) {}
139
140 // cleanup of all internal connection resource
141 // the object can still remain, and internal connection
142 // resources created again on successful reconnection
143 void destroy(int s) {
144 status = s;
145 ConnectionCleaner clean_state(state);
146 state = nullptr;
147 amqp_bytes_free(reply_to_queue);
148 reply_to_queue = amqp_empty_bytes;
149 // fire all remaining callbacks
150 std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
151 cb_tag.cb(status);
152 ldout(cct, 20) << "AMQP destroy: invoking callback with tag=" << cb_tag.tag << dendl;
153 });
154 callbacks.clear();
155 delivery_tag = 1;
156 }
157
158 bool is_ok() const {
159 return (state != nullptr && !marked_for_deletion);
160 }
161
162 // dtor also destroys the internals
163 ~connection_t() {
164 destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
165 }
166
167 friend void intrusive_ptr_add_ref(const connection_t* p);
168 friend void intrusive_ptr_release(const connection_t* p);
169 };
170
171 // these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
172 void intrusive_ptr_add_ref(const connection_t* p) {
173 ++p->ref_count;
174 }
175 void intrusive_ptr_release(const connection_t* p) {
176 if (--p->ref_count == 0) {
177 delete p;
178 }
179 }
180
181 // convert connection info to string
182 std::string to_string(const amqp_connection_info& info) {
183 std::stringstream ss;
184 ss << "connection info:" <<
185 "\nHost: " << info.host <<
186 "\nPort: " << info.port <<
187 "\nUser: " << info.user <<
188 "\nPassword: " << info.password <<
189 "\nvhost: " << info.vhost <<
190 "\nSSL support: " << info.ssl << std::endl;
191 return ss.str();
192 }
193
194 // convert reply to error code
195 int reply_to_code(const amqp_rpc_reply_t& reply) {
196 switch (reply.reply_type) {
197 case AMQP_RESPONSE_NONE:
198 case AMQP_RESPONSE_NORMAL:
199 return RGW_AMQP_NO_REPLY_CODE;
200 case AMQP_RESPONSE_LIBRARY_EXCEPTION:
201 return reply.library_error;
202 case AMQP_RESPONSE_SERVER_EXCEPTION:
203 if (reply.reply.decoded) {
204 const amqp_connection_close_t* m = (amqp_connection_close_t*)reply.reply.decoded;
205 return m->reply_code;
206 }
207 return reply.reply.id;
208 }
209 return RGW_AMQP_NO_REPLY_CODE;
210 }
211
212 // convert reply to string
213 std::string to_string(const amqp_rpc_reply_t& reply) {
214 std::stringstream ss;
215 switch (reply.reply_type) {
216 case AMQP_RESPONSE_NORMAL:
217 return "";
218 case AMQP_RESPONSE_NONE:
219 return "missing RPC reply type";
220 case AMQP_RESPONSE_LIBRARY_EXCEPTION:
221 return amqp_error_string2(reply.library_error);
222 case AMQP_RESPONSE_SERVER_EXCEPTION:
223 {
224 switch (reply.reply.id) {
225 case AMQP_CONNECTION_CLOSE_METHOD:
226 ss << "server connection error: ";
227 break;
228 case AMQP_CHANNEL_CLOSE_METHOD:
229 ss << "server channel error: ";
230 break;
231 default:
232 ss << "server unknown error: ";
233 break;
234 }
235 if (reply.reply.decoded) {
236 amqp_connection_close_t* m = (amqp_connection_close_t*)reply.reply.decoded;
237 ss << m->reply_code << " text: " << std::string((char*)m->reply_text.bytes, m->reply_text.len);
238 }
239 return ss.str();
240 }
241 default:
242 ss << "unknown error, method id: " << reply.reply.id;
243 return ss.str();
244 }
245 }
246
247 // convert status enum to string
248 std::string to_string(amqp_status_enum s) {
249 switch (s) {
250 case AMQP_STATUS_OK:
251 return "AMQP_STATUS_OK";
252 case AMQP_STATUS_NO_MEMORY:
253 return "AMQP_STATUS_NO_MEMORY";
254 case AMQP_STATUS_BAD_AMQP_DATA:
255 return "AMQP_STATUS_BAD_AMQP_DATA";
256 case AMQP_STATUS_UNKNOWN_CLASS:
257 return "AMQP_STATUS_UNKNOWN_CLASS";
258 case AMQP_STATUS_UNKNOWN_METHOD:
259 return "AMQP_STATUS_UNKNOWN_METHOD";
260 case AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED:
261 return "AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED";
262 case AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION:
263 return "AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION";
264 case AMQP_STATUS_CONNECTION_CLOSED:
265 return "AMQP_STATUS_CONNECTION_CLOSED";
266 case AMQP_STATUS_BAD_URL:
267 return "AMQP_STATUS_BAD_URL";
268 case AMQP_STATUS_SOCKET_ERROR:
269 return "AMQP_STATUS_SOCKET_ERROR";
270 case AMQP_STATUS_INVALID_PARAMETER:
271 return "AMQP_STATUS_INVALID_PARAMETER";
272 case AMQP_STATUS_TABLE_TOO_BIG:
273 return "AMQP_STATUS_TABLE_TOO_BIG";
274 case AMQP_STATUS_WRONG_METHOD:
275 return "AMQP_STATUS_WRONG_METHOD";
276 case AMQP_STATUS_TIMEOUT:
277 return "AMQP_STATUS_TIMEOUT";
278 case AMQP_STATUS_TIMER_FAILURE:
279 return "AMQP_STATUS_TIMER_FAILURE";
280 case AMQP_STATUS_HEARTBEAT_TIMEOUT:
281 return "AMQP_STATUS_HEARTBEAT_TIMEOUT";
282 case AMQP_STATUS_UNEXPECTED_STATE:
283 return "AMQP_STATUS_UNEXPECTED_STATE";
284 case AMQP_STATUS_SOCKET_CLOSED:
285 return "AMQP_STATUS_SOCKET_CLOSED";
286 case AMQP_STATUS_SOCKET_INUSE:
287 return "AMQP_STATUS_SOCKET_INUSE";
288 case AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD:
289 return "AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD";
290 #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 8, 0, 0)
291 case AMQP_STATUS_UNSUPPORTED:
292 return "AMQP_STATUS_UNSUPPORTED";
293 #endif
294 case _AMQP_STATUS_NEXT_VALUE:
295 return "AMQP_STATUS_INTERNAL";
296 case AMQP_STATUS_TCP_ERROR:
297 return "AMQP_STATUS_TCP_ERROR";
298 case AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR:
299 return "AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR";
300 case _AMQP_STATUS_TCP_NEXT_VALUE:
301 return "AMQP_STATUS_INTERNAL";
302 case AMQP_STATUS_SSL_ERROR:
303 return "AMQP_STATUS_SSL_ERROR";
304 case AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED:
305 return "AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED";
306 case AMQP_STATUS_SSL_PEER_VERIFY_FAILED:
307 return "AMQP_STATUS_SSL_PEER_VERIFY_FAILED";
308 case AMQP_STATUS_SSL_CONNECTION_FAILED:
309 return "AMQP_STATUS_SSL_CONNECTION_FAILED";
310 case _AMQP_STATUS_SSL_NEXT_VALUE:
311 return "AMQP_STATUS_INTERNAL";
312 }
313 return "AMQP_STATUS_UNKNOWN";
314 }
315
// TODO: add status_to_string on the connection object to print full status
317
318 // convert int status to string - including RGW specific values
319 std::string status_to_string(int s) {
320 switch (s) {
321 case RGW_AMQP_STATUS_BROKER_NACK:
322 return "RGW_AMQP_STATUS_BROKER_NACK";
323 case RGW_AMQP_STATUS_CONNECTION_CLOSED:
324 return "RGW_AMQP_STATUS_CONNECTION_CLOSED";
325 case RGW_AMQP_STATUS_QUEUE_FULL:
326 return "RGW_AMQP_STATUS_QUEUE_FULL";
327 case RGW_AMQP_STATUS_MAX_INFLIGHT:
328 return "RGW_AMQP_STATUS_MAX_INFLIGHT";
329 case RGW_AMQP_STATUS_MANAGER_STOPPED:
330 return "RGW_AMQP_STATUS_MANAGER_STOPPED";
331 case RGW_AMQP_STATUS_CONN_ALLOC_FAILED:
332 return "RGW_AMQP_STATUS_CONN_ALLOC_FAILED";
333 case RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED:
334 return "RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED";
335 case RGW_AMQP_STATUS_SOCKET_OPEN_FAILED:
336 return "RGW_AMQP_STATUS_SOCKET_OPEN_FAILED";
337 case RGW_AMQP_STATUS_LOGIN_FAILED:
338 return "RGW_AMQP_STATUS_LOGIN_FAILED";
339 case RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED:
340 return "RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED";
341 case RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED:
342 return "RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED";
343 case RGW_AMQP_STATUS_Q_DECLARE_FAILED:
344 return "RGW_AMQP_STATUS_Q_DECLARE_FAILED";
345 case RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED:
346 return "RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED";
347 case RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED:
348 return "RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED";
349 }
350 return to_string((amqp_status_enum)s);
351 }
352
// check the result from calls and return if error (=null).
// C is the connection pointer, S the status to record, OK the call's result.
// the do/while(0) wrapper makes the macro a single statement, so it is safe
// inside an unbraced if/else
#define RETURN_ON_ERROR(C, S, OK) \
  do { \
    if (!(OK)) { \
      (C)->status = (S); \
      return (C); \
    } \
  } while (0)

// in case of RPC calls, getting the RPC reply and return if an error is detected.
// C is the connection pointer, ST the librabbitmq state, S the status to record
#define RETURN_ON_REPLY_ERROR(C, ST, S) \
  do { \
    const auto reply = amqp_get_rpc_reply(ST); \
    if (reply.reply_type != AMQP_RESPONSE_NORMAL) { \
      (C)->status = (S); \
      (C)->reply_type = reply.reply_type; \
      (C)->reply_code = reply_to_code(reply); \
      return (C); \
    } \
  } while (0)
370
371 static const amqp_channel_t CHANNEL_ID = 1;
372 static const amqp_channel_t CONFIRMING_CHANNEL_ID = 2;
373
374 // utility function to create a connection, when the connection object already exists
375 connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connection_info& info) {
376 // pointer must be valid and not marked for deletion
377 ceph_assert(conn && !conn->marked_for_deletion);
378
379 // reset all status codes
380 conn->status = AMQP_STATUS_OK;
381 conn->reply_type = AMQP_RESPONSE_NORMAL;
382 conn->reply_code = RGW_AMQP_NO_REPLY_CODE;
383
384 auto state = amqp_new_connection();
385 if (!state) {
386 conn->status = RGW_AMQP_STATUS_CONN_ALLOC_FAILED;
387 return conn;
388 }
389 // make sure that the connection state is cleaned up in case of error
390 ConnectionCleaner state_guard(state);
391
392 // create and open socket
393 auto socket = amqp_tcp_socket_new(state);
394 if (!socket) {
395 conn->status = RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED;
396 return conn;
397 }
398 const auto s = amqp_socket_open(socket, info.host, info.port);
399 if (s < 0) {
400 conn->status = RGW_AMQP_STATUS_SOCKET_OPEN_FAILED;
401 conn->reply_type = RGW_AMQP_RESPONSE_SOCKET_ERROR;
402 conn->reply_code = s;
403 return conn;
404 }
405
406 // login to broker
407 const auto reply = amqp_login(state,
408 info.vhost,
409 AMQP_DEFAULT_MAX_CHANNELS,
410 AMQP_DEFAULT_FRAME_SIZE,
411 0, // no heartbeat TODO: add conf
412 AMQP_SASL_METHOD_PLAIN, // TODO: add other types of security
413 info.user,
414 info.password);
415 if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
416 conn->status = RGW_AMQP_STATUS_LOGIN_FAILED;
417 conn->reply_type = reply.reply_type;
418 conn->reply_code = reply_to_code(reply);
419 return conn;
420 }
421
422 // open channels
423 {
424 const auto ok = amqp_channel_open(state, CHANNEL_ID);
425 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED, ok);
426 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED);
427 }
428 {
429 const auto ok = amqp_channel_open(state, CONFIRMING_CHANNEL_ID);
430 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED, ok);
431 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED);
432 }
433 {
434 const auto ok = amqp_confirm_select(state, CONFIRMING_CHANNEL_ID);
435 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED, ok);
436 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED);
437 }
438
439 // verify that the topic exchange is there
440 // TODO: make this step optional
441 {
442 const auto ok = amqp_exchange_declare(state,
443 CHANNEL_ID,
444 amqp_cstring_bytes(conn->exchange.c_str()),
445 amqp_cstring_bytes("topic"),
446 1, // passive - exchange must already exist on broker
447 1, // durable
448 0, // dont auto-delete
449 0, // not internal
450 amqp_empty_table);
451 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED, ok);
452 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED);
453 }
454 {
455 // create queue for confirmations
456 const auto queue_ok = amqp_queue_declare(state,
457 CHANNEL_ID, // use the regular channel for this call
458 amqp_empty_bytes, // let broker allocate queue name
459 0, // not passive - create the queue
460 0, // not durable
461 1, // exclusive
462 1, // auto-delete
463 amqp_empty_table // not args TODO add args from conf: TTL, max length etc.
464 );
465 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_Q_DECLARE_FAILED, queue_ok);
466 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_Q_DECLARE_FAILED);
467
468 // define consumption for connection
469 const auto consume_ok = amqp_basic_consume(state,
470 CONFIRMING_CHANNEL_ID,
471 queue_ok->queue,
472 amqp_empty_bytes, // broker will generate consumer tag
473 1, // messages sent from client are never routed back
474 1, // client does not ack thr acks
475 1, // exclusive access to queue
476 amqp_empty_table // no parameters
477 );
478
479 RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED, consume_ok);
480 RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED);
481 // broker generated consumer_tag could be used to cancel sending of n/acks from broker - not needed
482
483 state_guard.reset();
484 conn->state = state;
485 conn->reply_to_queue = amqp_bytes_malloc_dup(queue_ok->queue);
486 return conn;
487 }
488 }
489
490 // utility function to create a new connection
491 connection_ptr_t create_new_connection(const amqp_connection_info& info,
492 const std::string& exchange, CephContext* cct) {
493 // create connection state
494 connection_ptr_t conn = new connection_t;
495 conn->exchange = exchange;
496 conn->user.assign(info.user);
497 conn->password.assign(info.password);
498 conn->cct = cct;
499 return create_connection(conn, info);
500 }
501
502 /// struct used for holding messages in the message queue
503 struct message_wrapper_t {
504 connection_ptr_t conn;
505 std::string topic;
506 std::string message;
507 reply_callback_t cb;
508
509 message_wrapper_t(connection_ptr_t& _conn,
510 const std::string& _topic,
511 const std::string& _message,
512 reply_callback_t _cb) : conn(_conn), topic(_topic), message(_message), cb(_cb) {}
513 };
514
515
516 typedef std::unordered_map<connection_id_t, connection_ptr_t, connection_id_t::hasher> ConnectionList;
517 typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
518
// macros used inside a loop where an iterator is either incremented or erased.
// the bodies are braced so that an unbraced `if (...) MACRO(it);` keeps both
// statements conditional. do/while(0) cannot be used here, because `continue`
// must reach the enclosing loop rather than the do/while
#define INCREMENT_AND_CONTINUE(IT) \
  { \
    ++(IT); \
    continue; \
  }

// NOTE: also decrements a `connection_count` variable that must be in scope at the expansion site
#define ERASE_AND_CONTINUE(IT,CONTAINER) \
  { \
    (IT) = (CONTAINER).erase(IT); \
    --connection_count; \
    continue; \
  }
528
529 class Manager {
530 public:
531 const size_t max_connections;
532 const size_t max_inflight;
533 const size_t max_queue;
534 private:
535 std::atomic<size_t> connection_count;
536 bool stopped;
537 struct timeval read_timeout;
538 ConnectionList connections;
539 MessageQueue messages;
540 std::atomic<size_t> queued;
541 std::atomic<size_t> dequeued;
542 CephContext* const cct;
543 mutable std::mutex connections_lock;
544 std::thread runner;
545
546 void publish_internal(message_wrapper_t* message) {
547 const std::unique_ptr<message_wrapper_t> msg_owner(message);
548 auto& conn = message->conn;
549
550 if (!conn->is_ok()) {
551 // connection had an issue while message was in the queue
552 // TODO add error stats
553 ldout(conn->cct, 1) << "AMQP publish: connection had an issue while message was in the queue" << dendl;
554 if (message->cb) {
555 message->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED);
556 }
557 return;
558 }
559
560 if (message->cb == nullptr) {
561 // TODO add error stats
562 const auto rc = amqp_basic_publish(conn->state,
563 CHANNEL_ID,
564 amqp_cstring_bytes(conn->exchange.c_str()),
565 amqp_cstring_bytes(message->topic.c_str()),
566 1, // mandatory, TODO: take from conf
567 0, // not immediate
568 nullptr,
569 amqp_cstring_bytes(message->message.c_str()));
570 if (rc == AMQP_STATUS_OK) {
571 ldout(conn->cct, 20) << "AMQP publish (no callback): OK" << dendl;
572 return;
573 }
574 ldout(conn->cct, 1) << "AMQP publish (no callback): failed with error " << status_to_string(rc) << dendl;
575 // an error occurred, close connection
576 // it will be retied by the main loop
577 conn->destroy(rc);
578 return;
579 }
580
581 amqp_basic_properties_t props;
582 props._flags =
583 AMQP_BASIC_DELIVERY_MODE_FLAG |
584 AMQP_BASIC_REPLY_TO_FLAG;
585 props.delivery_mode = 2; // persistent delivery TODO take from conf
586 props.reply_to = conn->reply_to_queue;
587
588 const auto rc = amqp_basic_publish(conn->state,
589 CONFIRMING_CHANNEL_ID,
590 amqp_cstring_bytes(conn->exchange.c_str()),
591 amqp_cstring_bytes(message->topic.c_str()),
592 1, // mandatory, TODO: take from conf
593 0, // not immediate
594 &props,
595 amqp_cstring_bytes(message->message.c_str()));
596
597 if (rc == AMQP_STATUS_OK) {
598 auto const q_len = conn->callbacks.size();
599 if (q_len < max_inflight) {
600 ldout(conn->cct, 20) << "AMQP publish (with callback, tag=" << conn->delivery_tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
601 conn->callbacks.emplace_back(conn->delivery_tag++, message->cb);
602 } else {
603 // immediately invoke callback with error
604 ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: callback queue full" << dendl;
605 message->cb(RGW_AMQP_STATUS_MAX_INFLIGHT);
606 }
607 } else {
608 // an error occurred, close connection
609 // it will be retied by the main loop
610 ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: " << status_to_string(rc) << dendl;
611 conn->destroy(rc);
612 // immediately invoke callback with error
613 message->cb(rc);
614 }
615 }
616
617 // the managers thread:
618 // (1) empty the queue of messages to be published
619 // (2) loop over all connections and read acks
620 // (3) manages deleted connections
621 // (4) TODO reconnect on connection errors
622 // (5) TODO cleanup timedout callbacks
623 void run() {
624 amqp_frame_t frame;
625 while (!stopped) {
626
627 // publish all messages in the queue
628 const auto count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1));
629 dequeued += count;
630 ConnectionList::iterator conn_it;
631 ConnectionList::const_iterator end_it;
632 {
633 // thread safe access to the connection list
634 // once the iterators are fetched they are guaranteed to remain valid
635 std::lock_guard lock(connections_lock);
636 conn_it = connections.begin();
637 end_it = connections.end();
638 }
639 auto incoming_message = false;
640 // loop over all connections to read acks
641 for (;conn_it != end_it;) {
642
643 auto& conn = conn_it->second;
644 // delete the connection if marked for deletion
645 if (conn->marked_for_deletion) {
646 ldout(conn->cct, 10) << "AMQP run: connection is deleted" << dendl;
647 conn->destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
648 std::lock_guard lock(connections_lock);
649 // erase is safe - does not invalidate any other iterator
650 // lock so no insertion happens at the same time
651 ERASE_AND_CONTINUE(conn_it, connections);
652 }
653
654 // try to reconnect the connection if it has an error
655 if (!conn->is_ok()) {
656 // pointers are used temporarily inside the amqp_connection_info object
657 // as read-only values, hence the assignment, and const_cast are safe here
658 amqp_connection_info info;
659 info.host = const_cast<char*>(conn_it->first.host.c_str());
660 info.port = conn_it->first.port;
661 info.vhost = const_cast<char*>(conn_it->first.vhost.c_str());
662 info.user = const_cast<char*>(conn->user.c_str());
663 info.password = const_cast<char*>(conn->password.c_str());
664 ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
665 if (create_connection(conn, info)->is_ok() == false) {
666 ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed" << dendl;
667 // TODO: add error counter for failed retries
668 // TODO: add exponential backoff for retries
669 } else {
670 ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successfull" << dendl;
671 }
672 INCREMENT_AND_CONTINUE(conn_it);
673 }
674
675 const auto rc = amqp_simple_wait_frame_noblock(conn->state, &frame, &read_timeout);
676
677 if (rc == AMQP_STATUS_TIMEOUT) {
678 // TODO mark connection as idle
679 INCREMENT_AND_CONTINUE(conn_it);
680 }
681
682 // this is just to prevent spinning idle, does not indicate that a message
683 // was successfully processed or not
684 incoming_message = true;
685
686 // check if error occurred that require reopening the connection
687 if (rc != AMQP_STATUS_OK) {
688 // an error occurred, close connection
689 // it will be retied by the main loop
690 ldout(conn->cct, 1) << "AMQP run: connection read error: " << status_to_string(rc) << dendl;
691 conn->destroy(rc);
692 INCREMENT_AND_CONTINUE(conn_it);
693 }
694
695 if (frame.frame_type != AMQP_FRAME_METHOD) {
696 ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages" << dendl;
697 // handler is for publish confirmation only - handle only method frames
698 // TODO: add a counter
699 INCREMENT_AND_CONTINUE(conn_it);
700 }
701
702 uint64_t tag;
703 bool multiple;
704 int result;
705
706 switch (frame.payload.method.id) {
707 case AMQP_BASIC_ACK_METHOD:
708 {
709 result = AMQP_STATUS_OK;
710 const auto ack = (amqp_basic_ack_t*)frame.payload.method.decoded;
711 ceph_assert(ack);
712 tag = ack->delivery_tag;
713 multiple = ack->multiple;
714 break;
715 }
716 case AMQP_BASIC_NACK_METHOD:
717 {
718 result = RGW_AMQP_STATUS_BROKER_NACK;
719 const auto nack = (amqp_basic_nack_t*)frame.payload.method.decoded;
720 ceph_assert(nack);
721 tag = nack->delivery_tag;
722 multiple = nack->multiple;
723 break;
724 }
725 case AMQP_CONNECTION_CLOSE_METHOD:
726 // TODO on channel close, no need to reopen the connection
727 case AMQP_CHANNEL_CLOSE_METHOD:
728 {
729 // other side closed the connection, no need to continue
730 ldout(conn->cct, 10) << "AMQP run: connection was closed by broker" << dendl;
731 conn->destroy(rc);
732 INCREMENT_AND_CONTINUE(conn_it);
733 }
734 case AMQP_BASIC_RETURN_METHOD:
735 // message was not delivered, returned to sender
736 // TODO: add a counter
737 ldout(conn->cct, 10) << "AMQP run: message delivery error" << dendl;
738 INCREMENT_AND_CONTINUE(conn_it);
739 break;
740 default:
741 // unexpected method
742 // TODO: add a counter
743 ldout(conn->cct, 10) << "AMQP run: unexpected message" << dendl;
744 INCREMENT_AND_CONTINUE(conn_it);
745 }
746
747 const auto& callbacks_end = conn->callbacks.end();
748 const auto& callbacks_begin = conn->callbacks.begin();
749 const auto tag_it = std::find(callbacks_begin, callbacks_end, tag);
750 if (tag_it != callbacks_end) {
751 if (multiple) {
752 // n/ack all up to (and including) the tag
753 ldout(conn->cct, 20) << "AMQP run: multiple n/acks received with tag=" << tag << " and result=" << result << dendl;
754 auto it = callbacks_begin;
755 while (it->tag <= tag && it != conn->callbacks.end()) {
756 ldout(conn->cct, 20) << "AMQP run: invoking callback with tag=" << it->tag << dendl;
757 it->cb(result);
758 it = conn->callbacks.erase(it);
759 }
760 } else {
761 // n/ack a specific tag
762 ldout(conn->cct, 20) << "AMQP run: n/ack received, invoking callback with tag=" << tag << " and result=" << result << dendl;
763 tag_it->cb(result);
764 conn->callbacks.erase(tag_it);
765 }
766 } else {
767 // TODO add counter for acks with no callback
768 ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl;
769 }
770 // just increment the iterator
771 ++conn_it;
772 }
773 // if no messages were received or published, sleep for 100ms
774 if (count == 0 && !incoming_message) {
775 std::this_thread::sleep_for(std::chrono::milliseconds(100));
776 }
777 }
778 }
779
780 // used in the dtor for message cleanup
781 static void delete_message(const message_wrapper_t* message) {
782 delete message;
783 }
784
785 public:
786 Manager(size_t _max_connections,
787 size_t _max_inflight,
788 size_t _max_queue,
789 long _usec_timeout,
790 CephContext* _cct) :
791 max_connections(_max_connections),
792 max_inflight(_max_inflight),
793 max_queue(_max_queue),
794 connection_count(0),
795 stopped(false),
796 read_timeout{0, _usec_timeout},
797 connections(_max_connections),
798 messages(max_queue),
799 queued(0),
800 dequeued(0),
801 cct(_cct),
802 runner(&Manager::run, this) {
803 // The hashmap has "max connections" as the initial number of buckets,
804 // and allows for 10 collisions per bucket before rehash.
805 // This is to prevent rehashing so that iterators are not invalidated
806 // when a new connection is added.
807 connections.max_load_factor(10.0);
808 // give the runner thread a name for easier debugging
809 const auto rc = ceph_pthread_setname(runner.native_handle(), "amqp_manager");
810 ceph_assert(rc==0);
811 }
812
813 // non copyable
814 Manager(const Manager&) = delete;
815 const Manager& operator=(const Manager&) = delete;
816
817 // stop the main thread
818 void stop() {
819 stopped = true;
820 }
821
822 // disconnect from a broker
823 bool disconnect(connection_ptr_t& conn) {
824 if (!conn || stopped) {
825 return false;
826 }
827 conn->marked_for_deletion = true;
828 return true;
829 }
830
831 // connect to a broker, or reuse an existing connection if already connected
832 connection_ptr_t connect(const std::string& url, const std::string& exchange) {
833 if (stopped) {
834 // TODO: increment counter
835 ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
836 return nullptr;
837 }
838
839 struct amqp_connection_info info;
840 // cache the URL so that parsing could happen in-place
841 std::vector<char> url_cache(url.c_str(), url.c_str()+url.size()+1);
842 if (AMQP_STATUS_OK != amqp_parse_url(url_cache.data(), &info)) {
843 // TODO: increment counter
844 ldout(cct, 1) << "AMQP connect: URL parsing failed" << dendl;
845 return nullptr;
846 }
847
848 const connection_id_t id(info);
849 std::lock_guard lock(connections_lock);
850 const auto it = connections.find(id);
851 if (it != connections.end()) {
852 if (it->second->marked_for_deletion) {
853 // TODO: increment counter
854 ldout(cct, 1) << "AMQP connect: endpoint marked for deletion" << dendl;
855 return nullptr;
856 } else if (it->second->exchange != exchange) {
857 // TODO: increment counter
858 ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl;
859 return nullptr;
860 }
861 // connection found - return even if non-ok
862 ldout(cct, 20) << "AMQP connect: connection found" << dendl;
863 return it->second;
864 }
865
866 // connection not found, creating a new one
867 if (connection_count >= max_connections) {
868 // TODO: increment counter
869 ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl;
870 return nullptr;
871 }
872 const auto conn = create_new_connection(info, exchange, cct);
873 // create_new_connection must always return a connection object
874 // even if error occurred during creation.
875 // in such a case the creation will be retried in the main thread
876 ceph_assert(conn);
877 ++connection_count;
878 ldout(cct, 10) << "AMQP connect: new connection is created. Total connections: " << connection_count << dendl;
879 ldout(cct, 10) << "AMQP connect: new connection status is: " << status_to_string(conn->status) << dendl;
880 return connections.emplace(id, conn).first->second;
881 }
882
883 // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
884 int publish(connection_ptr_t& conn,
885 const std::string& topic,
886 const std::string& message) {
887 if (stopped) {
888 return RGW_AMQP_STATUS_MANAGER_STOPPED;
889 }
890 if (!conn || !conn->is_ok()) {
891 return RGW_AMQP_STATUS_CONNECTION_CLOSED;
892 }
893 if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) {
894 ++queued;
895 return AMQP_STATUS_OK;
896 }
897 return RGW_AMQP_STATUS_QUEUE_FULL;
898 }
899
900 int publish_with_confirm(connection_ptr_t& conn,
901 const std::string& topic,
902 const std::string& message,
903 reply_callback_t cb) {
904 if (stopped) {
905 return RGW_AMQP_STATUS_MANAGER_STOPPED;
906 }
907 if (!conn || !conn->is_ok()) {
908 return RGW_AMQP_STATUS_CONNECTION_CLOSED;
909 }
910 if (messages.push(new message_wrapper_t(conn, topic, message, cb))) {
911 ++queued;
912 return AMQP_STATUS_OK;
913 }
914 return RGW_AMQP_STATUS_QUEUE_FULL;
915 }
916
917 // dtor wait for thread to stop
918 // then connection are cleaned-up
919 ~Manager() {
920 stopped = true;
921 runner.join();
922 messages.consume_all(delete_message);
923 }
924
925 // get the number of connections
926 size_t get_connection_count() const {
927 return connection_count;
928 }
929
930 // get the number of in-flight messages
931 size_t get_inflight() const {
932 size_t sum = 0;
933 std::lock_guard lock(connections_lock);
934 std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) {
935 sum += conn_pair.second->callbacks.size();
936 });
937 return sum;
938 }
939
940 // running counter of the queued messages
941 size_t get_queued() const {
942 return queued;
943 }
944
945 // running counter of the dequeued messages
946 size_t get_dequeued() const {
947 return dequeued;
948 }
949 };
950
951 // singleton manager
952 // note that the manager itself is not a singleton, and multiple instances may co-exist
953 // TODO make the pointer atomic in allocation and deallocation to avoid race conditions
954 static Manager* s_manager = nullptr;
955
956 static const size_t MAX_CONNECTIONS_DEFAULT = 256;
957 static const size_t MAX_INFLIGHT_DEFAULT = 8192;
958 static const size_t MAX_QUEUE_DEFAULT = 8192;
959
960 bool init(CephContext* cct) {
961 if (s_manager) {
962 return false;
963 }
964 // TODO: take conf from CephContext
965 s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 100, cct);
966 return true;
967 }
968
969 void shutdown() {
970 delete s_manager;
971 s_manager = nullptr;
972 }
973
974 connection_ptr_t connect(const std::string& url, const std::string& exchange) {
975 if (!s_manager) return nullptr;
976 return s_manager->connect(url, exchange);
977 }
978
979 int publish(connection_ptr_t& conn,
980 const std::string& topic,
981 const std::string& message) {
982 if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
983 return s_manager->publish(conn, topic, message);
984 }
985
986 int publish_with_confirm(connection_ptr_t& conn,
987 const std::string& topic,
988 const std::string& message,
989 reply_callback_t cb) {
990 if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
991 return s_manager->publish_with_confirm(conn, topic, message, cb);
992 }
993
994 size_t get_connection_count() {
995 if (!s_manager) return 0;
996 return s_manager->get_connection_count();
997 }
998
999 size_t get_inflight() {
1000 if (!s_manager) return 0;
1001 return s_manager->get_inflight();
1002 }
1003
1004 size_t get_queued() {
1005 if (!s_manager) return 0;
1006 return s_manager->get_queued();
1007 }
1008
1009 size_t get_dequeued() {
1010 if (!s_manager) return 0;
1011 return s_manager->get_dequeued();
1012 }
1013
1014 size_t get_max_connections() {
1015 if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
1016 return s_manager->max_connections;
1017 }
1018
1019 size_t get_max_inflight() {
1020 if (!s_manager) return MAX_INFLIGHT_DEFAULT;
1021 return s_manager->max_inflight;
1022 }
1023
1024 size_t get_max_queue() {
1025 if (!s_manager) return MAX_QUEUE_DEFAULT;
1026 return s_manager->max_queue;
1027 }
1028
1029 bool disconnect(connection_ptr_t& conn) {
1030 if (!s_manager) return false;
1031 return s_manager->disconnect(conn);
1032 }
1033
1034 } // namespace amqp
1035