]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_amqp.cc
import ceph 15.2.10
[ceph.git] / ceph / src / rgw / rgw_amqp.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
#include "rgw_amqp.h"
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
#include "include/ceph_assert.h"
#include <sstream>
#include <cstring>
#include <unordered_map>
#include <string>
#include <vector>
#include <thread>
#include <atomic>
#include <mutex>
#include <memory>
#include <boost/lockfree/queue.hpp>
#include "common/dout.h"
19
20 #define dout_subsys ceph_subsys_rgw
21
22 // TODO investigation, not necessarily issues:
23 // (1) in case of single threaded writer context use spsc_queue
24 // (2) support multiple channels
25 // (3) check performance of emptying queue to local list, and go over the list and publish
26 // (4) use std::shared_mutex (c++17) or equivalent for the connections lock
27
28 namespace rgw::amqp {
29
30 // RGW AMQP status codes for publishing
// publish-path results (0x1xxx range)
static constexpr int RGW_AMQP_STATUS_BROKER_NACK = -0x1001;
static constexpr int RGW_AMQP_STATUS_CONNECTION_CLOSED = -0x1002;
static constexpr int RGW_AMQP_STATUS_QUEUE_FULL = -0x1003;
static constexpr int RGW_AMQP_STATUS_MAX_INFLIGHT = -0x1004;
static constexpr int RGW_AMQP_STATUS_MANAGER_STOPPED = -0x1005;

// connection-setup results (0x2xxx range), one per setup step of create_connection()
static constexpr int RGW_AMQP_STATUS_CONN_ALLOC_FAILED = -0x2001;
static constexpr int RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED = -0x2002;
static constexpr int RGW_AMQP_STATUS_SOCKET_OPEN_FAILED = -0x2003;
static constexpr int RGW_AMQP_STATUS_LOGIN_FAILED = -0x2004;
static constexpr int RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED = -0x2005;
static constexpr int RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED = -0x2006;
static constexpr int RGW_AMQP_STATUS_Q_DECLARE_FAILED = -0x2007;
static constexpr int RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED = -0x2008;
static constexpr int RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED = -0x2009;

// stored in connection_t::reply_type when opening the socket fails
static constexpr int RGW_AMQP_RESPONSE_SOCKET_ERROR = -0x3008;
// stored in connection_t::reply_code when there is no reply code to report
static constexpr int RGW_AMQP_NO_REPLY_CODE = 0x0;
49
50 // key class for the connection list
51 struct connection_id_t {
52 const std::string host;
53 const int port;
54 const std::string vhost;
55 // constructed from amqp_connection_info struct
56 connection_id_t(const amqp_connection_info& info)
57 : host(info.host), port(info.port), vhost(info.vhost) {}
58
59 // equality operator and hasher functor are needed
60 // so that connection_id_t could be used as key in unordered_map
61 bool operator==(const connection_id_t& other) const {
62 return host == other.host && port == other.port && vhost == other.vhost;
63 }
64
65 struct hasher {
66 std::size_t operator()(const connection_id_t& k) const {
67 return ((std::hash<std::string>()(k.host)
68 ^ (std::hash<int>()(k.port) << 1)) >> 1)
69 ^ (std::hash<std::string>()(k.vhost) << 1);
70 }
71 };
72 };
73
74 std::string to_string(const connection_id_t& id) {
75 return id.host+":"+std::to_string(id.port)+id.vhost;
76 }
77
78 // connection_t state cleaner
79 // could be used for automatic cleanup when getting out of scope
80 class ConnectionCleaner {
81 private:
82 amqp_connection_state_t conn;
83 public:
84 ConnectionCleaner(amqp_connection_state_t _conn) : conn(_conn) {}
85 ~ConnectionCleaner() {
86 if (conn) {
87 amqp_destroy_connection(conn);
88 }
89 }
90 // call reset() if cleanup is not needed anymore
91 void reset() {
92 conn = nullptr;
93 }
94 };
95
96 // struct for holding the callback and its tag in the callback list
97 struct reply_callback_with_tag_t {
98 uint64_t tag;
99 reply_callback_t cb;
100
101 reply_callback_with_tag_t(uint64_t _tag, reply_callback_t _cb) : tag(_tag), cb(_cb) {}
102
103 bool operator==(uint64_t rhs) {
104 return tag == rhs;
105 }
106 };
107
108 typedef std::vector<reply_callback_with_tag_t> CallbackList;
109
110 // struct for holding the connection state object as well as the exchange
111 // it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
112 // since references to deleted objects may still exist in the calling code
113 struct connection_t {
114 amqp_connection_state_t state;
115 std::string exchange;
116 std::string user;
117 std::string password;
118 amqp_bytes_t reply_to_queue;
119 bool marked_for_deletion;
120 uint64_t delivery_tag;
121 int status;
122 int reply_type;
123 int reply_code;
124 mutable std::atomic<int> ref_count;
125 CephContext* cct;
126 CallbackList callbacks;
127 ceph::coarse_real_clock::time_point next_reconnect;
128 bool mandatory;
129
130 // default ctor
131 connection_t() :
132 state(nullptr),
133 reply_to_queue(amqp_empty_bytes),
134 marked_for_deletion(false),
135 delivery_tag(1),
136 status(AMQP_STATUS_OK),
137 reply_type(AMQP_RESPONSE_NORMAL),
138 reply_code(RGW_AMQP_NO_REPLY_CODE),
139 ref_count(0),
140 cct(nullptr),
141 next_reconnect(ceph::coarse_real_clock::now()),
142 mandatory(false)
143 {}
144
145 // cleanup of all internal connection resource
146 // the object can still remain, and internal connection
147 // resources created again on successful reconnection
148 void destroy(int s) {
149 status = s;
150 ConnectionCleaner clean_state(state);
151 state = nullptr;
152 amqp_bytes_free(reply_to_queue);
153 reply_to_queue = amqp_empty_bytes;
154 // fire all remaining callbacks
155 std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
156 cb_tag.cb(status);
157 ldout(cct, 20) << "AMQP destroy: invoking callback with tag=" << cb_tag.tag << dendl;
158 });
159 callbacks.clear();
160 delivery_tag = 1;
161 }
162
163 bool is_ok() const {
164 return (state != nullptr && !marked_for_deletion);
165 }
166
167 // dtor also destroys the internals
168 ~connection_t() {
169 destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
170 }
171
172 friend void intrusive_ptr_add_ref(const connection_t* p);
173 friend void intrusive_ptr_release(const connection_t* p);
174 };
175
176 // these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
177 void intrusive_ptr_add_ref(const connection_t* p) {
178 ++p->ref_count;
179 }
180 void intrusive_ptr_release(const connection_t* p) {
181 if (--p->ref_count == 0) {
182 delete p;
183 }
184 }
185
186 // convert connection info to string
187 std::string to_string(const amqp_connection_info& info) {
188 std::stringstream ss;
189 ss << "connection info:" <<
190 "\nHost: " << info.host <<
191 "\nPort: " << info.port <<
192 "\nUser: " << info.user <<
193 "\nPassword: " << info.password <<
194 "\nvhost: " << info.vhost <<
195 "\nSSL support: " << info.ssl << std::endl;
196 return ss.str();
197 }
198
199 // convert reply to error code
200 int reply_to_code(const amqp_rpc_reply_t& reply) {
201 switch (reply.reply_type) {
202 case AMQP_RESPONSE_NONE:
203 case AMQP_RESPONSE_NORMAL:
204 return RGW_AMQP_NO_REPLY_CODE;
205 case AMQP_RESPONSE_LIBRARY_EXCEPTION:
206 return reply.library_error;
207 case AMQP_RESPONSE_SERVER_EXCEPTION:
208 if (reply.reply.decoded) {
209 const amqp_connection_close_t* m = (amqp_connection_close_t*)reply.reply.decoded;
210 return m->reply_code;
211 }
212 return reply.reply.id;
213 }
214 return RGW_AMQP_NO_REPLY_CODE;
215 }
216
217 // convert reply to string
218 std::string to_string(const amqp_rpc_reply_t& reply) {
219 std::stringstream ss;
220 switch (reply.reply_type) {
221 case AMQP_RESPONSE_NORMAL:
222 return "";
223 case AMQP_RESPONSE_NONE:
224 return "missing RPC reply type";
225 case AMQP_RESPONSE_LIBRARY_EXCEPTION:
226 return amqp_error_string2(reply.library_error);
227 case AMQP_RESPONSE_SERVER_EXCEPTION:
228 {
229 switch (reply.reply.id) {
230 case AMQP_CONNECTION_CLOSE_METHOD:
231 ss << "server connection error: ";
232 break;
233 case AMQP_CHANNEL_CLOSE_METHOD:
234 ss << "server channel error: ";
235 break;
236 default:
237 ss << "server unknown error: ";
238 break;
239 }
240 if (reply.reply.decoded) {
241 amqp_connection_close_t* m = (amqp_connection_close_t*)reply.reply.decoded;
242 ss << m->reply_code << " text: " << std::string((char*)m->reply_text.bytes, m->reply_text.len);
243 }
244 return ss.str();
245 }
246 default:
247 ss << "unknown error, method id: " << reply.reply.id;
248 return ss.str();
249 }
250 }
251
252 // convert status enum to string
253 std::string to_string(amqp_status_enum s) {
254 switch (s) {
255 case AMQP_STATUS_OK:
256 return "AMQP_STATUS_OK";
257 case AMQP_STATUS_NO_MEMORY:
258 return "AMQP_STATUS_NO_MEMORY";
259 case AMQP_STATUS_BAD_AMQP_DATA:
260 return "AMQP_STATUS_BAD_AMQP_DATA";
261 case AMQP_STATUS_UNKNOWN_CLASS:
262 return "AMQP_STATUS_UNKNOWN_CLASS";
263 case AMQP_STATUS_UNKNOWN_METHOD:
264 return "AMQP_STATUS_UNKNOWN_METHOD";
265 case AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED:
266 return "AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED";
267 case AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION:
268 return "AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION";
269 case AMQP_STATUS_CONNECTION_CLOSED:
270 return "AMQP_STATUS_CONNECTION_CLOSED";
271 case AMQP_STATUS_BAD_URL:
272 return "AMQP_STATUS_BAD_URL";
273 case AMQP_STATUS_SOCKET_ERROR:
274 return "AMQP_STATUS_SOCKET_ERROR";
275 case AMQP_STATUS_INVALID_PARAMETER:
276 return "AMQP_STATUS_INVALID_PARAMETER";
277 case AMQP_STATUS_TABLE_TOO_BIG:
278 return "AMQP_STATUS_TABLE_TOO_BIG";
279 case AMQP_STATUS_WRONG_METHOD:
280 return "AMQP_STATUS_WRONG_METHOD";
281 case AMQP_STATUS_TIMEOUT:
282 return "AMQP_STATUS_TIMEOUT";
283 case AMQP_STATUS_TIMER_FAILURE:
284 return "AMQP_STATUS_TIMER_FAILURE";
285 case AMQP_STATUS_HEARTBEAT_TIMEOUT:
286 return "AMQP_STATUS_HEARTBEAT_TIMEOUT";
287 case AMQP_STATUS_UNEXPECTED_STATE:
288 return "AMQP_STATUS_UNEXPECTED_STATE";
289 case AMQP_STATUS_SOCKET_CLOSED:
290 return "AMQP_STATUS_SOCKET_CLOSED";
291 case AMQP_STATUS_SOCKET_INUSE:
292 return "AMQP_STATUS_SOCKET_INUSE";
293 case AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD:
294 return "AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD";
295 #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 8, 0, 0)
296 case AMQP_STATUS_UNSUPPORTED:
297 return "AMQP_STATUS_UNSUPPORTED";
298 #endif
299 case _AMQP_STATUS_NEXT_VALUE:
300 return "AMQP_STATUS_INTERNAL";
301 case AMQP_STATUS_TCP_ERROR:
302 return "AMQP_STATUS_TCP_ERROR";
303 case AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR:
304 return "AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR";
305 case _AMQP_STATUS_TCP_NEXT_VALUE:
306 return "AMQP_STATUS_INTERNAL";
307 case AMQP_STATUS_SSL_ERROR:
308 return "AMQP_STATUS_SSL_ERROR";
309 case AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED:
310 return "AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED";
311 case AMQP_STATUS_SSL_PEER_VERIFY_FAILED:
312 return "AMQP_STATUS_SSL_PEER_VERIFY_FAILED";
313 case AMQP_STATUS_SSL_CONNECTION_FAILED:
314 return "AMQP_STATUS_SSL_CONNECTION_FAILED";
315 case _AMQP_STATUS_SSL_NEXT_VALUE:
316 return "AMQP_STATUS_INTERNAL";
317 }
318 return "AMQP_STATUS_UNKNOWN";
319 }
320
// TODO: add status_to_string on the connection object to print full status
322
323 // convert int status to string - including RGW specific values
324 std::string status_to_string(int s) {
325 switch (s) {
326 case RGW_AMQP_STATUS_BROKER_NACK:
327 return "RGW_AMQP_STATUS_BROKER_NACK";
328 case RGW_AMQP_STATUS_CONNECTION_CLOSED:
329 return "RGW_AMQP_STATUS_CONNECTION_CLOSED";
330 case RGW_AMQP_STATUS_QUEUE_FULL:
331 return "RGW_AMQP_STATUS_QUEUE_FULL";
332 case RGW_AMQP_STATUS_MAX_INFLIGHT:
333 return "RGW_AMQP_STATUS_MAX_INFLIGHT";
334 case RGW_AMQP_STATUS_MANAGER_STOPPED:
335 return "RGW_AMQP_STATUS_MANAGER_STOPPED";
336 case RGW_AMQP_STATUS_CONN_ALLOC_FAILED:
337 return "RGW_AMQP_STATUS_CONN_ALLOC_FAILED";
338 case RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED:
339 return "RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED";
340 case RGW_AMQP_STATUS_SOCKET_OPEN_FAILED:
341 return "RGW_AMQP_STATUS_SOCKET_OPEN_FAILED";
342 case RGW_AMQP_STATUS_LOGIN_FAILED:
343 return "RGW_AMQP_STATUS_LOGIN_FAILED";
344 case RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED:
345 return "RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED";
346 case RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED:
347 return "RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED";
348 case RGW_AMQP_STATUS_Q_DECLARE_FAILED:
349 return "RGW_AMQP_STATUS_Q_DECLARE_FAILED";
350 case RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED:
351 return "RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED";
352 case RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED:
353 return "RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED";
354 }
355 return to_string((amqp_status_enum)s);
356 }
357
358 // check the result from calls and return if error (=null)
// C: connection pointer, S: status to store on failure, OK: result to test.
// When OK is false/null the status is stored in C and C is returned from the
// calling function. The arguments are parenthesized so that expressions can
// be passed, and the body is wrapped in do/while(0) so that the macro acts
// as a single statement (safe in unbraced if/else).
#define RETURN_ON_ERROR(C, S, OK) \
  do { \
    if (!(OK)) { \
      (C)->status = (S); \
      return (C); \
    } \
  } while (0)
364
365 // in case of RPC calls, getting the RPC reply and return if an error is detected
// C: connection pointer, ST: library connection state, S: status to store on failure.
// Fetches the reply of the last RPC made on ST; unless it is a normal reply,
// the status, reply type and reply code are stored in C and C is returned from
// the calling function. Wrapped in do/while(0) so that the macro acts as a
// single statement (safe in unbraced if/else).
#define RETURN_ON_REPLY_ERROR(C, ST, S) \
  do { \
    const auto reply = amqp_get_rpc_reply(ST); \
    if (reply.reply_type != AMQP_RESPONSE_NORMAL) { \
      (C)->status = (S); \
      (C)->reply_type = reply.reply_type; \
      (C)->reply_code = reply_to_code(reply); \
      return (C); \
    } \
  } while (0)
375
376 static const amqp_channel_t CHANNEL_ID = 1;
377 static const amqp_channel_t CONFIRMING_CHANNEL_ID = 2;
378
379 // utility function to create a connection, when the connection object already exists
// (Re)establish the broker session of an existing connection object.
//
// conn: must be non-null and not marked for deletion (asserted); its exchange
//       name is used for the exchange verification step
// info: host, port, vhost, user and password of the broker
//
// Always returns `conn`. On success conn->state and conn->reply_to_queue are
// set. On failure conn->state is left untouched, conn->status holds the
// RGW_AMQP_STATUS_* code of the step that failed, and (where available)
// conn->reply_type/conn->reply_code describe the broker/library reply.
connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connection_info& info) {
  // pointer must be valid and not marked for deletion
  ceph_assert(conn && !conn->marked_for_deletion);

  // reset all status codes
  conn->status = AMQP_STATUS_OK;
  conn->reply_type = AMQP_RESPONSE_NORMAL;
  conn->reply_code = RGW_AMQP_NO_REPLY_CODE;

  auto state = amqp_new_connection();
  if (!state) {
    conn->status = RGW_AMQP_STATUS_CONN_ALLOC_FAILED;
    return conn;
  }
  // make sure that the connection state is cleaned up in case of error:
  // every early return below destroys `state` through this guard
  ConnectionCleaner state_guard(state);

  // create and open socket
  auto socket = amqp_tcp_socket_new(state);
  if (!socket) {
    conn->status = RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED;
    return conn;
  }
  const auto s = amqp_socket_open(socket, info.host, info.port);
  if (s < 0) {
    conn->status = RGW_AMQP_STATUS_SOCKET_OPEN_FAILED;
    conn->reply_type = RGW_AMQP_RESPONSE_SOCKET_ERROR;
    // the raw return value of amqp_socket_open is kept as the reply code
    conn->reply_code = s;
    return conn;
  }

  // login to broker
  const auto reply = amqp_login(state,
      info.vhost,
      AMQP_DEFAULT_MAX_CHANNELS,
      AMQP_DEFAULT_FRAME_SIZE,
      0,                        // no heartbeat TODO: add conf
      AMQP_SASL_METHOD_PLAIN,   // TODO: add other types of security
      info.user,
      info.password);
  if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
    conn->status = RGW_AMQP_STATUS_LOGIN_FAILED;
    conn->reply_type = reply.reply_type;
    conn->reply_code = reply_to_code(reply);
    return conn;
  }

  // open channels
  // each step checks both the returned pointer and the RPC reply
  {
    const auto ok = amqp_channel_open(state, CHANNEL_ID);
    RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED, ok);
    RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED);
  }
  {
    const auto ok = amqp_channel_open(state, CONFIRMING_CHANNEL_ID);
    RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED, ok);
    RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED);
  }
  {
    // put the confirming channel into publisher-confirm mode
    const auto ok = amqp_confirm_select(state, CONFIRMING_CHANNEL_ID);
    RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED, ok);
    RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED);
  }

  // verify that the topic exchange is there
  // TODO: make this step optional
  {
    const auto ok = amqp_exchange_declare(state,
      CHANNEL_ID,
      amqp_cstring_bytes(conn->exchange.c_str()),
      amqp_cstring_bytes("topic"),
      1, // passive - exchange must already exist on broker
      1, // durable
      0, // dont auto-delete
      0, // not internal
      amqp_empty_table);
    RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED, ok);
    RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED);
  }
  {
    // create queue for confirmations
    const auto queue_ok = amqp_queue_declare(state,
      CHANNEL_ID,         // use the regular channel for this call
      amqp_empty_bytes,   // let broker allocate queue name
      0,                  // not passive - create the queue
      0,                  // not durable
      1,                  // exclusive
      1,                  // auto-delete
      amqp_empty_table    // not args TODO add args from conf: TTL, max length etc.
      );
    RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_Q_DECLARE_FAILED, queue_ok);
    RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_Q_DECLARE_FAILED);

    // define consumption for connection
    const auto consume_ok = amqp_basic_consume(state,
      CONFIRMING_CHANNEL_ID,
      queue_ok->queue,
      amqp_empty_bytes, // broker will generate consumer tag
      1,                // messages sent from client are never routed back
      1,                // client does not ack the acks
      1,                // exclusive access to queue
      amqp_empty_table  // no parameters
      );

    RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED, consume_ok);
    RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED);
    // broker generated consumer_tag could be used to cancel sending of n/acks from broker - not needed

    // success: disarm the guard and hand the state over to the connection object
    state_guard.reset();
    conn->state = state;
    // keep an owned copy of the broker-allocated queue name; freed in connection_t::destroy()
    conn->reply_to_queue = amqp_bytes_malloc_dup(queue_ok->queue);
    return conn;
  }
}
494
495 // utility function to create a new connection
496 connection_ptr_t create_new_connection(const amqp_connection_info& info,
497 const std::string& exchange, bool mandatory_delivery, CephContext* cct) {
498 // create connection state
499 connection_ptr_t conn = new connection_t;
500 conn->exchange = exchange;
501 conn->user.assign(info.user);
502 conn->password.assign(info.password);
503 conn->mandatory = mandatory_delivery;
504 conn->cct = cct;
505 return create_connection(conn, info);
506 }
507
508 /// struct used for holding messages in the message queue
509 struct message_wrapper_t {
510 connection_ptr_t conn;
511 std::string topic;
512 std::string message;
513 reply_callback_t cb;
514
515 message_wrapper_t(connection_ptr_t& _conn,
516 const std::string& _topic,
517 const std::string& _message,
518 reply_callback_t _cb) : conn(_conn), topic(_topic), message(_message), cb(_cb) {}
519 };
520
521
522 typedef std::unordered_map<connection_id_t, connection_ptr_t, connection_id_t::hasher> ConnectionList;
523 typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
524
525 // macros used inside a loop where an iterator is either incremented or erased
// Advance the iterator and jump to the next loop iteration.
// The statements are braced so that they stay together when the macro is the
// body of an unbraced `if` (do/while(0) cannot be used here: `continue`
// would then apply to the wrapper loop instead of the caller's loop).
#define INCREMENT_AND_CONTINUE(IT) \
  { \
    ++(IT); \
    continue; \
  }

// Erase the element at the iterator, update the `connection_count` variable
// visible at the point of use, and jump to the next loop iteration.
#define ERASE_AND_CONTINUE(IT,CONTAINER) \
  { \
    (IT) = (CONTAINER).erase(IT); \
    --connection_count; \
    continue; \
  }
534
535 class Manager {
536 public:
537 const size_t max_connections;
538 const size_t max_inflight;
539 const size_t max_queue;
540 private:
541 std::atomic<size_t> connection_count;
542 bool stopped;
543 struct timeval read_timeout;
544 ConnectionList connections;
545 MessageQueue messages;
546 std::atomic<size_t> queued;
547 std::atomic<size_t> dequeued;
548 CephContext* const cct;
549 mutable std::mutex connections_lock;
550 std::thread runner;
551 const ceph::coarse_real_clock::duration idle_time;
552 const ceph::coarse_real_clock::duration reconnect_time;
553
554 void publish_internal(message_wrapper_t* message) {
555 const std::unique_ptr<message_wrapper_t> msg_owner(message);
556 auto& conn = message->conn;
557
558 if (!conn->is_ok()) {
559 // connection had an issue while message was in the queue
560 // TODO add error stats
561 ldout(conn->cct, 1) << "AMQP publish: connection had an issue while message was in the queue" << dendl;
562 if (message->cb) {
563 message->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED);
564 }
565 return;
566 }
567
568 if (message->cb == nullptr) {
569 // TODO add error stats
570 const auto rc = amqp_basic_publish(conn->state,
571 CHANNEL_ID,
572 amqp_cstring_bytes(conn->exchange.c_str()),
573 amqp_cstring_bytes(message->topic.c_str()),
574 0, // does not have to be routable
575 0, // not immediate
576 nullptr, // no properties needed
577 amqp_cstring_bytes(message->message.c_str()));
578 if (rc == AMQP_STATUS_OK) {
579 ldout(conn->cct, 20) << "AMQP publish (no callback): OK" << dendl;
580 return;
581 }
582 ldout(conn->cct, 1) << "AMQP publish (no callback): failed with error " << status_to_string(rc) << dendl;
583 // an error occurred, close connection
584 // it will be retied by the main loop
585 conn->destroy(rc);
586 return;
587 }
588
589 amqp_basic_properties_t props;
590 props._flags =
591 AMQP_BASIC_DELIVERY_MODE_FLAG |
592 AMQP_BASIC_REPLY_TO_FLAG;
593 props.delivery_mode = 2; // persistent delivery TODO take from conf
594 props.reply_to = conn->reply_to_queue;
595
596 const auto rc = amqp_basic_publish(conn->state,
597 CONFIRMING_CHANNEL_ID,
598 amqp_cstring_bytes(conn->exchange.c_str()),
599 amqp_cstring_bytes(message->topic.c_str()),
600 conn->mandatory,
601 0, // not immediate
602 &props,
603 amqp_cstring_bytes(message->message.c_str()));
604
605 if (rc == AMQP_STATUS_OK) {
606 auto const q_len = conn->callbacks.size();
607 if (q_len < max_inflight) {
608 ldout(conn->cct, 20) << "AMQP publish (with callback, tag=" << conn->delivery_tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
609 conn->callbacks.emplace_back(conn->delivery_tag++, message->cb);
610 } else {
611 // immediately invoke callback with error
612 ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: callback queue full" << dendl;
613 message->cb(RGW_AMQP_STATUS_MAX_INFLIGHT);
614 }
615 } else {
616 // an error occurred, close connection
617 // it will be retied by the main loop
618 ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: " << status_to_string(rc) << dendl;
619 conn->destroy(rc);
620 // immediately invoke callback with error
621 message->cb(rc);
622 }
623 }
624
625 // the managers thread:
626 // (1) empty the queue of messages to be published
627 // (2) loop over all connections and read acks
628 // (3) manages deleted connections
629 // (4) TODO reconnect on connection errors
630 // (5) TODO cleanup timedout callbacks
631 void run() {
632 amqp_frame_t frame;
633 while (!stopped) {
634
635 // publish all messages in the queue
636 const auto count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1));
637 dequeued += count;
638 ConnectionList::iterator conn_it;
639 ConnectionList::const_iterator end_it;
640 {
641 // thread safe access to the connection list
642 // once the iterators are fetched they are guaranteed to remain valid
643 std::lock_guard lock(connections_lock);
644 conn_it = connections.begin();
645 end_it = connections.end();
646 }
647 auto incoming_message = false;
648 // loop over all connections to read acks
649 for (;conn_it != end_it;) {
650
651 auto& conn = conn_it->second;
652 // delete the connection if marked for deletion
653 if (conn->marked_for_deletion) {
654 ldout(conn->cct, 10) << "AMQP run: connection is deleted" << dendl;
655 conn->destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
656 std::lock_guard lock(connections_lock);
657 // erase is safe - does not invalidate any other iterator
658 // lock so no insertion happens at the same time
659 ERASE_AND_CONTINUE(conn_it, connections);
660 }
661
662 // try to reconnect the connection if it has an error
663 if (!conn->is_ok()) {
664 const auto now = ceph::coarse_real_clock::now();
665 if (now >= conn->next_reconnect) {
666 // pointers are used temporarily inside the amqp_connection_info object
667 // as read-only values, hence the assignment, and const_cast are safe here
668 amqp_connection_info info;
669 info.host = const_cast<char*>(conn_it->first.host.c_str());
670 info.port = conn_it->first.port;
671 info.vhost = const_cast<char*>(conn_it->first.vhost.c_str());
672 info.user = const_cast<char*>(conn->user.c_str());
673 info.password = const_cast<char*>(conn->password.c_str());
674 ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
675 if (create_connection(conn, info)->is_ok() == false) {
676 ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed. error: " <<
677 status_to_string(conn->status) << " (" << conn->reply_code << ")" << dendl;
678 // TODO: add error counter for failed retries
679 // TODO: add exponential backoff for retries
680 conn->next_reconnect = now + reconnect_time;
681 } else {
682 ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successfull" << dendl;
683 }
684 }
685 INCREMENT_AND_CONTINUE(conn_it);
686 }
687
688 const auto rc = amqp_simple_wait_frame_noblock(conn->state, &frame, &read_timeout);
689
690 if (rc == AMQP_STATUS_TIMEOUT) {
691 // TODO mark connection as idle
692 INCREMENT_AND_CONTINUE(conn_it);
693 }
694
695 // this is just to prevent spinning idle, does not indicate that a message
696 // was successfully processed or not
697 incoming_message = true;
698
699 // check if error occurred that require reopening the connection
700 if (rc != AMQP_STATUS_OK) {
701 // an error occurred, close connection
702 // it will be retied by the main loop
703 ldout(conn->cct, 1) << "AMQP run: connection read error: " << status_to_string(rc) << dendl;
704 conn->destroy(rc);
705 INCREMENT_AND_CONTINUE(conn_it);
706 }
707
708 if (frame.frame_type != AMQP_FRAME_METHOD) {
709 ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages. frame type: "
710 << unsigned(frame.frame_type) << dendl;
711 // handler is for publish confirmation only - handle only method frames
712 INCREMENT_AND_CONTINUE(conn_it);
713 }
714
715 uint64_t tag;
716 bool multiple;
717 int result;
718
719 switch (frame.payload.method.id) {
720 case AMQP_BASIC_ACK_METHOD:
721 {
722 result = AMQP_STATUS_OK;
723 const auto ack = (amqp_basic_ack_t*)frame.payload.method.decoded;
724 ceph_assert(ack);
725 tag = ack->delivery_tag;
726 multiple = ack->multiple;
727 break;
728 }
729 case AMQP_BASIC_NACK_METHOD:
730 {
731 result = RGW_AMQP_STATUS_BROKER_NACK;
732 const auto nack = (amqp_basic_nack_t*)frame.payload.method.decoded;
733 ceph_assert(nack);
734 tag = nack->delivery_tag;
735 multiple = nack->multiple;
736 break;
737 }
738 case AMQP_BASIC_REJECT_METHOD:
739 {
740 result = RGW_AMQP_STATUS_BROKER_NACK;
741 const auto reject = (amqp_basic_reject_t*)frame.payload.method.decoded;
742 tag = reject->delivery_tag;
743 multiple = false;
744 break;
745 }
746 case AMQP_CONNECTION_CLOSE_METHOD:
747 // TODO on channel close, no need to reopen the connection
748 case AMQP_CHANNEL_CLOSE_METHOD:
749 {
750 // other side closed the connection, no need to continue
751 ldout(conn->cct, 10) << "AMQP run: connection was closed by broker" << dendl;
752 conn->destroy(rc);
753 INCREMENT_AND_CONTINUE(conn_it);
754 }
755 case AMQP_BASIC_RETURN_METHOD:
756 // message was not delivered, returned to sender
757 ldout(conn->cct, 10) << "AMQP run: message was not routable" << dendl;
758 INCREMENT_AND_CONTINUE(conn_it);
759 break;
760 default:
761 // unexpected method
762 ldout(conn->cct, 10) << "AMQP run: unexpected message" << dendl;
763 INCREMENT_AND_CONTINUE(conn_it);
764 }
765
766 const auto& callbacks_end = conn->callbacks.end();
767 const auto& callbacks_begin = conn->callbacks.begin();
768 const auto tag_it = std::find(callbacks_begin, callbacks_end, tag);
769 if (tag_it != callbacks_end) {
770 if (multiple) {
771 // n/ack all up to (and including) the tag
772 ldout(conn->cct, 20) << "AMQP run: multiple n/acks received with tag=" << tag << " and result=" << result << dendl;
773 auto it = callbacks_begin;
774 while (it->tag <= tag && it != conn->callbacks.end()) {
775 ldout(conn->cct, 20) << "AMQP run: invoking callback with tag=" << it->tag << dendl;
776 it->cb(result);
777 it = conn->callbacks.erase(it);
778 }
779 } else {
780 // n/ack a specific tag
781 ldout(conn->cct, 20) << "AMQP run: n/ack received, invoking callback with tag=" << tag << " and result=" << result << dendl;
782 tag_it->cb(result);
783 conn->callbacks.erase(tag_it);
784 }
785 } else {
786 ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl;
787 }
788 // just increment the iterator
789 ++conn_it;
790 }
791 // if no messages were received or published, sleep for 100ms
792 if (count == 0 && !incoming_message) {
793 std::this_thread::sleep_for(idle_time);
794 }
795 }
796 }
797
798 // used in the dtor for message cleanup
// Free a message wrapper that was left in the queue.
// Passed to messages.consume_all() by the destructor, which calls it
// once for every message that was never published.
static void delete_message(const message_wrapper_t* message) {
  delete message;
}
802
803 public:
804 Manager(size_t _max_connections,
805 size_t _max_inflight,
806 size_t _max_queue,
807 long _usec_timeout,
808 unsigned reconnect_time_ms,
809 unsigned idle_time_ms,
810 CephContext* _cct) :
811 max_connections(_max_connections),
812 max_inflight(_max_inflight),
813 max_queue(_max_queue),
814 connection_count(0),
815 stopped(false),
816 read_timeout{0, _usec_timeout},
817 connections(_max_connections),
818 messages(max_queue),
819 queued(0),
820 dequeued(0),
821 cct(_cct),
822 runner(&Manager::run, this),
823 idle_time(std::chrono::milliseconds(idle_time_ms)),
824 reconnect_time(std::chrono::milliseconds(reconnect_time_ms)) {
825 // The hashmap has "max connections" as the initial number of buckets,
826 // and allows for 10 collisions per bucket before rehash.
827 // This is to prevent rehashing so that iterators are not invalidated
828 // when a new connection is added.
829 connections.max_load_factor(10.0);
830 // give the runner thread a name for easier debugging
831 const auto rc = ceph_pthread_setname(runner.native_handle(), "amqp_manager");
832 ceph_assert(rc==0);
833 }
834
835 // non copyable
// copy construction and copy assignment are disabled for the manager
Manager(const Manager&) = delete;
const Manager& operator=(const Manager&) = delete;
838
839 // stop the main thread
// Request termination: run() checks this flag at the top of every iteration,
// and connect/disconnect/publish reject calls once it is set.
// The thread is not joined here - that happens in the destructor.
void stop() {
  stopped = true;
}
843
844 // disconnect from a broker
845 bool disconnect(connection_ptr_t& conn) {
846 if (!conn || stopped) {
847 return false;
848 }
849 conn->marked_for_deletion = true;
850 return true;
851 }
852
853 // connect to a broker, or reuse an existing connection if already connected
854 connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) {
855 if (stopped) {
856 ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
857 return nullptr;
858 }
859
860 struct amqp_connection_info info;
861 // cache the URL so that parsing could happen in-place
862 std::vector<char> url_cache(url.c_str(), url.c_str()+url.size()+1);
863 if (AMQP_STATUS_OK != amqp_parse_url(url_cache.data(), &info)) {
864 ldout(cct, 1) << "AMQP connect: URL parsing failed" << dendl;
865 return nullptr;
866 }
867
868 const connection_id_t id(info);
869 std::lock_guard lock(connections_lock);
870 const auto it = connections.find(id);
871 if (it != connections.end()) {
872 if (it->second->marked_for_deletion) {
873 ldout(cct, 1) << "AMQP connect: endpoint marked for deletion" << dendl;
874 return nullptr;
875 } else if (it->second->exchange != exchange) {
876 ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl;
877 return nullptr;
878 }
879 // connection found - return even if non-ok
880 ldout(cct, 20) << "AMQP connect: connection found" << dendl;
881 return it->second;
882 }
883
884 // connection not found, creating a new one
885 if (connection_count >= max_connections) {
886 ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl;
887 return nullptr;
888 }
889 const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct);
890 if (!conn->is_ok()) {
891 ldout(cct, 10) << "AMQP connect: connection (" << to_string(id) << ") creation failed. error:" <<
892 status_to_string(conn->status) << "(" << conn->reply_code << ")" << dendl;
893 }
894 // create_new_connection must always return a connection object
895 // even if error occurred during creation.
896 // in such a case the creation will be retried in the main thread
897 ceph_assert(conn);
898 ++connection_count;
899 ldout(cct, 10) << "AMQP connect: new connection is created. Total connections: " << connection_count << dendl;
900 ldout(cct, 10) << "AMQP connect: new connection status is: " << status_to_string(conn->status) << dendl;
901 return connections.emplace(id, conn).first->second;
902 }
903
904 // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
905 int publish(connection_ptr_t& conn,
906 const std::string& topic,
907 const std::string& message) {
908 if (stopped) {
909 ldout(cct, 1) << "AMQP publish: manager is not running" << dendl;
910 return RGW_AMQP_STATUS_MANAGER_STOPPED;
911 }
912 if (!conn || !conn->is_ok()) {
913 ldout(cct, 1) << "AMQP publish: no connection" << dendl;
914 return RGW_AMQP_STATUS_CONNECTION_CLOSED;
915 }
916 if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) {
917 ++queued;
918 return AMQP_STATUS_OK;
919 }
920 ldout(cct, 1) << "AMQP publish: queue is full" << dendl;
921 return RGW_AMQP_STATUS_QUEUE_FULL;
922 }
923
924 int publish_with_confirm(connection_ptr_t& conn,
925 const std::string& topic,
926 const std::string& message,
927 reply_callback_t cb) {
928 if (stopped) {
929 ldout(cct, 1) << "AMQP publish_with_confirm: manager is not running" << dendl;
930 return RGW_AMQP_STATUS_MANAGER_STOPPED;
931 }
932 if (!conn || !conn->is_ok()) {
933 ldout(cct, 1) << "AMQP publish_with_confirm: no connection" << dendl;
934 return RGW_AMQP_STATUS_CONNECTION_CLOSED;
935 }
936 if (messages.push(new message_wrapper_t(conn, topic, message, cb))) {
937 ++queued;
938 return AMQP_STATUS_OK;
939 }
940 ldout(cct, 1) << "AMQP publish_with_confirm: queue is full" << dendl;
941 return RGW_AMQP_STATUS_QUEUE_FULL;
942 }
943
944 // dtor wait for thread to stop
945 // then connection are cleaned-up
946 ~Manager() {
947 stopped = true;
948 runner.join();
949 messages.consume_all(delete_message);
950 }
951
952 // get the number of connections
953 size_t get_connection_count() const {
954 return connection_count;
955 }
956
957 // get the number of in-flight messages
958 size_t get_inflight() const {
959 size_t sum = 0;
960 std::lock_guard lock(connections_lock);
961 std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) {
962 sum += conn_pair.second->callbacks.size();
963 });
964 return sum;
965 }
966
967 // running counter of the queued messages
968 size_t get_queued() const {
969 return queued;
970 }
971
972 // running counter of the dequeued messages
973 size_t get_dequeued() const {
974 return dequeued;
975 }
976 };
977
978 // singleton manager
979 // note that the manager itself is not a singleton, and multiple instances may co-exist
980 // TODO make the pointer atomic in allocation and deallocation to avoid race conditions
981 static Manager* s_manager = nullptr;
982
// Default values handed to the Manager constructor by init()
static constexpr size_t MAX_CONNECTIONS_DEFAULT{256};
static constexpr size_t MAX_INFLIGHT_DEFAULT{8192};
static constexpr size_t MAX_QUEUE_DEFAULT{8192};
static constexpr long READ_TIMEOUT_USEC{100};
static constexpr unsigned IDLE_TIME_MS{100};
static constexpr unsigned RECONNECT_TIME_MS{100};
989
990 bool init(CephContext* cct) {
991 if (s_manager) {
992 return false;
993 }
994 // TODO: take conf from CephContext
995 s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT,
996 READ_TIMEOUT_USEC, IDLE_TIME_MS, RECONNECT_TIME_MS, cct);
997 return true;
998 }
999
1000 void shutdown() {
1001 delete s_manager;
1002 s_manager = nullptr;
1003 }
1004
1005 connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) {
1006 if (!s_manager) return nullptr;
1007 return s_manager->connect(url, exchange, mandatory_delivery);
1008 }
1009
1010 int publish(connection_ptr_t& conn,
1011 const std::string& topic,
1012 const std::string& message) {
1013 if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
1014 return s_manager->publish(conn, topic, message);
1015 }
1016
1017 int publish_with_confirm(connection_ptr_t& conn,
1018 const std::string& topic,
1019 const std::string& message,
1020 reply_callback_t cb) {
1021 if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
1022 return s_manager->publish_with_confirm(conn, topic, message, cb);
1023 }
1024
1025 size_t get_connection_count() {
1026 if (!s_manager) return 0;
1027 return s_manager->get_connection_count();
1028 }
1029
1030 size_t get_inflight() {
1031 if (!s_manager) return 0;
1032 return s_manager->get_inflight();
1033 }
1034
1035 size_t get_queued() {
1036 if (!s_manager) return 0;
1037 return s_manager->get_queued();
1038 }
1039
1040 size_t get_dequeued() {
1041 if (!s_manager) return 0;
1042 return s_manager->get_dequeued();
1043 }
1044
1045 size_t get_max_connections() {
1046 if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
1047 return s_manager->max_connections;
1048 }
1049
1050 size_t get_max_inflight() {
1051 if (!s_manager) return MAX_INFLIGHT_DEFAULT;
1052 return s_manager->max_inflight;
1053 }
1054
1055 size_t get_max_queue() {
1056 if (!s_manager) return MAX_QUEUE_DEFAULT;
1057 return s_manager->max_queue;
1058 }
1059
1060 bool disconnect(connection_ptr_t& conn) {
1061 if (!s_manager) return false;
1062 return s_manager->disconnect(conn);
1063 }
1064
1065 } // namespace amqp
1066