]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
11fdf7f2 TL |
3 | |
4 | #include "rgw_amqp.h" | |
5 | #include <amqp.h> | |
f67539c2 | 6 | #include <amqp_ssl_socket.h> |
11fdf7f2 TL |
7 | #include <amqp_tcp_socket.h> |
8 | #include <amqp_framing.h> | |
9 | #include "include/ceph_assert.h" | |
10 | #include <sstream> | |
11 | #include <cstring> | |
12 | #include <unordered_map> | |
13 | #include <string> | |
14 | #include <vector> | |
15 | #include <thread> | |
16 | #include <atomic> | |
17 | #include <mutex> | |
18 | #include <boost/lockfree/queue.hpp> | |
eafe8130 | 19 | #include "common/dout.h" |
f67539c2 | 20 | #include <openssl/ssl.h> |
eafe8130 TL |
21 | |
22 | #define dout_subsys ceph_subsys_rgw | |
11fdf7f2 TL |
23 | |
24 | // TODO investigation, not necessarily issues: | |
25 | // (1) in case of single threaded writer context use spsc_queue | |
26 | // (2) support multiple channels | |
27 | // (3) check performance of emptying queue to local list, and go over the list and publish | |
28 | // (4) use std::shared_mutex (c++17) or equivalent for the connections lock | |
29 | ||
30 | namespace rgw::amqp { | |
31 | ||
// RGW-specific status codes reported to publish callbacks
static constexpr int RGW_AMQP_STATUS_BROKER_NACK = -0x1001;
static constexpr int RGW_AMQP_STATUS_CONNECTION_CLOSED = -0x1002;
static constexpr int RGW_AMQP_STATUS_QUEUE_FULL = -0x1003;
static constexpr int RGW_AMQP_STATUS_MAX_INFLIGHT = -0x1004;
static constexpr int RGW_AMQP_STATUS_MANAGER_STOPPED = -0x1005;

// RGW-specific status codes describing which step of opening a connection failed
static constexpr int RGW_AMQP_STATUS_CONN_ALLOC_FAILED = -0x2001;
static constexpr int RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED = -0x2002;
static constexpr int RGW_AMQP_STATUS_SOCKET_OPEN_FAILED = -0x2003;
static constexpr int RGW_AMQP_STATUS_LOGIN_FAILED = -0x2004;
static constexpr int RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED = -0x2005;
static constexpr int RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED = -0x2006;
static constexpr int RGW_AMQP_STATUS_Q_DECLARE_FAILED = -0x2007;
static constexpr int RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED = -0x2008;
static constexpr int RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED = -0x2009;
static constexpr int RGW_AMQP_STATUS_SOCKET_CACERT_FAILED = -0x2010;

// reply_type value used when the socket could not be opened
static constexpr int RGW_AMQP_RESPONSE_SOCKET_ERROR = -0x3008;
// reply_code value meaning "no reply code available"
static constexpr int RGW_AMQP_NO_REPLY_CODE = 0x0;
11fdf7f2 TL |
52 | |
53 | // key class for the connection list | |
54 | struct connection_id_t { | |
9f95a23c TL |
55 | const std::string host; |
56 | const int port; | |
57 | const std::string vhost; | |
11fdf7f2 TL |
58 | // constructed from amqp_connection_info struct |
59 | connection_id_t(const amqp_connection_info& info) | |
60 | : host(info.host), port(info.port), vhost(info.vhost) {} | |
61 | ||
62 | // equality operator and hasher functor are needed | |
63 | // so that connection_id_t could be used as key in unordered_map | |
64 | bool operator==(const connection_id_t& other) const { | |
65 | return host == other.host && port == other.port && vhost == other.vhost; | |
66 | } | |
67 | ||
68 | struct hasher { | |
69 | std::size_t operator()(const connection_id_t& k) const { | |
70 | return ((std::hash<std::string>()(k.host) | |
71 | ^ (std::hash<int>()(k.port) << 1)) >> 1) | |
72 | ^ (std::hash<std::string>()(k.vhost) << 1); | |
73 | } | |
74 | }; | |
75 | }; | |
76 | ||
eafe8130 | 77 | std::string to_string(const connection_id_t& id) { |
e306af50 | 78 | return id.host+":"+std::to_string(id.port)+id.vhost; |
eafe8130 TL |
79 | } |
80 | ||
11fdf7f2 TL |
81 | // connection_t state cleaner |
82 | // could be used for automatic cleanup when getting out of scope | |
83 | class ConnectionCleaner { | |
84 | private: | |
85 | amqp_connection_state_t conn; | |
86 | public: | |
87 | ConnectionCleaner(amqp_connection_state_t _conn) : conn(_conn) {} | |
88 | ~ConnectionCleaner() { | |
89 | if (conn) { | |
90 | amqp_destroy_connection(conn); | |
91 | } | |
92 | } | |
93 | // call reset() if cleanup is not needed anymore | |
94 | void reset() { | |
95 | conn = nullptr; | |
96 | } | |
97 | }; | |
98 | ||
99 | // struct for holding the callback and its tag in the callback list | |
100 | struct reply_callback_with_tag_t { | |
101 | uint64_t tag; | |
102 | reply_callback_t cb; | |
103 | ||
104 | reply_callback_with_tag_t(uint64_t _tag, reply_callback_t _cb) : tag(_tag), cb(_cb) {} | |
105 | ||
106 | bool operator==(uint64_t rhs) { | |
107 | return tag == rhs; | |
108 | } | |
109 | }; | |
110 | ||
111 | typedef std::vector<reply_callback_with_tag_t> CallbackList; | |
112 | ||
113 | // struct for holding the connection state object as well as the exchange | |
114 | // it is used inside an intrusive ref counted pointer (boost::intrusive_ptr) | |
115 | // since references to deleted objects may still exist in the calling code | |
116 | struct connection_t { | |
20effc67 | 117 | std::atomic<amqp_connection_state_t> state; |
11fdf7f2 TL |
118 | std::string exchange; |
119 | std::string user; | |
120 | std::string password; | |
121 | amqp_bytes_t reply_to_queue; | |
11fdf7f2 TL |
122 | uint64_t delivery_tag; |
123 | int status; | |
124 | int reply_type; | |
125 | int reply_code; | |
126 | mutable std::atomic<int> ref_count; | |
eafe8130 | 127 | CephContext* cct; |
11fdf7f2 | 128 | CallbackList callbacks; |
e306af50 TL |
129 | ceph::coarse_real_clock::time_point next_reconnect; |
130 | bool mandatory; | |
20effc67 | 131 | bool use_ssl; |
f67539c2 TL |
132 | bool verify_ssl; |
133 | boost::optional<const std::string&> ca_location; | |
20effc67 | 134 | utime_t timestamp = ceph_clock_now(); |
11fdf7f2 TL |
135 | |
136 | // default ctor | |
137 | connection_t() : | |
138 | state(nullptr), | |
139 | reply_to_queue(amqp_empty_bytes), | |
11fdf7f2 TL |
140 | delivery_tag(1), |
141 | status(AMQP_STATUS_OK), | |
142 | reply_type(AMQP_RESPONSE_NORMAL), | |
143 | reply_code(RGW_AMQP_NO_REPLY_CODE), | |
eafe8130 | 144 | ref_count(0), |
e306af50 TL |
145 | cct(nullptr), |
146 | next_reconnect(ceph::coarse_real_clock::now()), | |
f67539c2 | 147 | mandatory(false), |
20effc67 | 148 | use_ssl(false), |
f67539c2 TL |
149 | verify_ssl(false), |
150 | ca_location(boost::none) | |
e306af50 | 151 | {} |
11fdf7f2 TL |
152 | |
153 | // cleanup of all internal connection resource | |
154 | // the object can still remain, and internal connection | |
155 | // resources created again on successful reconnection | |
156 | void destroy(int s) { | |
157 | status = s; | |
158 | ConnectionCleaner clean_state(state); | |
159 | state = nullptr; | |
160 | amqp_bytes_free(reply_to_queue); | |
161 | reply_to_queue = amqp_empty_bytes; | |
162 | // fire all remaining callbacks | |
eafe8130 TL |
163 | std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) { |
164 | cb_tag.cb(status); | |
165 | ldout(cct, 20) << "AMQP destroy: invoking callback with tag=" << cb_tag.tag << dendl; | |
11fdf7f2 | 166 | }); |
eafe8130 | 167 | callbacks.clear(); |
11fdf7f2 TL |
168 | delivery_tag = 1; |
169 | } | |
170 | ||
171 | bool is_ok() const { | |
20effc67 | 172 | return (state != nullptr); |
11fdf7f2 TL |
173 | } |
174 | ||
175 | // dtor also destroys the internals | |
176 | ~connection_t() { | |
177 | destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED); | |
178 | } | |
179 | ||
180 | friend void intrusive_ptr_add_ref(const connection_t* p); | |
181 | friend void intrusive_ptr_release(const connection_t* p); | |
182 | }; | |
183 | ||
184 | // these are required interfaces so that connection_t could be used inside boost::intrusive_ptr | |
185 | void intrusive_ptr_add_ref(const connection_t* p) { | |
186 | ++p->ref_count; | |
187 | } | |
188 | void intrusive_ptr_release(const connection_t* p) { | |
189 | if (--p->ref_count == 0) { | |
190 | delete p; | |
191 | } | |
192 | } | |
193 | ||
194 | // convert connection info to string | |
195 | std::string to_string(const amqp_connection_info& info) { | |
196 | std::stringstream ss; | |
197 | ss << "connection info:" << | |
198 | "\nHost: " << info.host << | |
199 | "\nPort: " << info.port << | |
200 | "\nUser: " << info.user << | |
201 | "\nPassword: " << info.password << | |
202 | "\nvhost: " << info.vhost << | |
203 | "\nSSL support: " << info.ssl << std::endl; | |
204 | return ss.str(); | |
205 | } | |
206 | ||
207 | // convert reply to error code | |
208 | int reply_to_code(const amqp_rpc_reply_t& reply) { | |
209 | switch (reply.reply_type) { | |
210 | case AMQP_RESPONSE_NONE: | |
211 | case AMQP_RESPONSE_NORMAL: | |
212 | return RGW_AMQP_NO_REPLY_CODE; | |
213 | case AMQP_RESPONSE_LIBRARY_EXCEPTION: | |
214 | return reply.library_error; | |
215 | case AMQP_RESPONSE_SERVER_EXCEPTION: | |
216 | if (reply.reply.decoded) { | |
217 | const amqp_connection_close_t* m = (amqp_connection_close_t*)reply.reply.decoded; | |
218 | return m->reply_code; | |
219 | } | |
220 | return reply.reply.id; | |
221 | } | |
222 | return RGW_AMQP_NO_REPLY_CODE; | |
223 | } | |
224 | ||
225 | // convert reply to string | |
226 | std::string to_string(const amqp_rpc_reply_t& reply) { | |
227 | std::stringstream ss; | |
228 | switch (reply.reply_type) { | |
229 | case AMQP_RESPONSE_NORMAL: | |
230 | return ""; | |
231 | case AMQP_RESPONSE_NONE: | |
232 | return "missing RPC reply type"; | |
233 | case AMQP_RESPONSE_LIBRARY_EXCEPTION: | |
234 | return amqp_error_string2(reply.library_error); | |
235 | case AMQP_RESPONSE_SERVER_EXCEPTION: | |
236 | { | |
237 | switch (reply.reply.id) { | |
238 | case AMQP_CONNECTION_CLOSE_METHOD: | |
239 | ss << "server connection error: "; | |
240 | break; | |
241 | case AMQP_CHANNEL_CLOSE_METHOD: | |
242 | ss << "server channel error: "; | |
243 | break; | |
244 | default: | |
245 | ss << "server unknown error: "; | |
246 | break; | |
247 | } | |
248 | if (reply.reply.decoded) { | |
249 | amqp_connection_close_t* m = (amqp_connection_close_t*)reply.reply.decoded; | |
250 | ss << m->reply_code << " text: " << std::string((char*)m->reply_text.bytes, m->reply_text.len); | |
251 | } | |
252 | return ss.str(); | |
253 | } | |
254 | default: | |
255 | ss << "unknown error, method id: " << reply.reply.id; | |
256 | return ss.str(); | |
257 | } | |
258 | } | |
259 | ||
260 | // convert status enum to string | |
261 | std::string to_string(amqp_status_enum s) { | |
262 | switch (s) { | |
263 | case AMQP_STATUS_OK: | |
264 | return "AMQP_STATUS_OK"; | |
265 | case AMQP_STATUS_NO_MEMORY: | |
266 | return "AMQP_STATUS_NO_MEMORY"; | |
267 | case AMQP_STATUS_BAD_AMQP_DATA: | |
268 | return "AMQP_STATUS_BAD_AMQP_DATA"; | |
269 | case AMQP_STATUS_UNKNOWN_CLASS: | |
270 | return "AMQP_STATUS_UNKNOWN_CLASS"; | |
271 | case AMQP_STATUS_UNKNOWN_METHOD: | |
272 | return "AMQP_STATUS_UNKNOWN_METHOD"; | |
273 | case AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED: | |
274 | return "AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED"; | |
275 | case AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION: | |
276 | return "AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION"; | |
277 | case AMQP_STATUS_CONNECTION_CLOSED: | |
278 | return "AMQP_STATUS_CONNECTION_CLOSED"; | |
279 | case AMQP_STATUS_BAD_URL: | |
280 | return "AMQP_STATUS_BAD_URL"; | |
281 | case AMQP_STATUS_SOCKET_ERROR: | |
282 | return "AMQP_STATUS_SOCKET_ERROR"; | |
283 | case AMQP_STATUS_INVALID_PARAMETER: | |
284 | return "AMQP_STATUS_INVALID_PARAMETER"; | |
285 | case AMQP_STATUS_TABLE_TOO_BIG: | |
286 | return "AMQP_STATUS_TABLE_TOO_BIG"; | |
287 | case AMQP_STATUS_WRONG_METHOD: | |
288 | return "AMQP_STATUS_WRONG_METHOD"; | |
289 | case AMQP_STATUS_TIMEOUT: | |
290 | return "AMQP_STATUS_TIMEOUT"; | |
291 | case AMQP_STATUS_TIMER_FAILURE: | |
292 | return "AMQP_STATUS_TIMER_FAILURE"; | |
293 | case AMQP_STATUS_HEARTBEAT_TIMEOUT: | |
294 | return "AMQP_STATUS_HEARTBEAT_TIMEOUT"; | |
295 | case AMQP_STATUS_UNEXPECTED_STATE: | |
296 | return "AMQP_STATUS_UNEXPECTED_STATE"; | |
297 | case AMQP_STATUS_SOCKET_CLOSED: | |
298 | return "AMQP_STATUS_SOCKET_CLOSED"; | |
299 | case AMQP_STATUS_SOCKET_INUSE: | |
300 | return "AMQP_STATUS_SOCKET_INUSE"; | |
301 | case AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD: | |
302 | return "AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD"; | |
303 | #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 8, 0, 0) | |
304 | case AMQP_STATUS_UNSUPPORTED: | |
305 | return "AMQP_STATUS_UNSUPPORTED"; | |
306 | #endif | |
307 | case _AMQP_STATUS_NEXT_VALUE: | |
308 | return "AMQP_STATUS_INTERNAL"; | |
309 | case AMQP_STATUS_TCP_ERROR: | |
310 | return "AMQP_STATUS_TCP_ERROR"; | |
311 | case AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR: | |
312 | return "AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR"; | |
313 | case _AMQP_STATUS_TCP_NEXT_VALUE: | |
314 | return "AMQP_STATUS_INTERNAL"; | |
315 | case AMQP_STATUS_SSL_ERROR: | |
316 | return "AMQP_STATUS_SSL_ERROR"; | |
317 | case AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED: | |
318 | return "AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED"; | |
319 | case AMQP_STATUS_SSL_PEER_VERIFY_FAILED: | |
320 | return "AMQP_STATUS_SSL_PEER_VERIFY_FAILED"; | |
321 | case AMQP_STATUS_SSL_CONNECTION_FAILED: | |
322 | return "AMQP_STATUS_SSL_CONNECTION_FAILED"; | |
323 | case _AMQP_STATUS_SSL_NEXT_VALUE: | |
324 | return "AMQP_STATUS_INTERNAL"; | |
20effc67 TL |
325 | default: |
326 | return "AMQP_STATUS_UNKNOWN"; | |
11fdf7f2 | 327 | } |
11fdf7f2 TL |
328 | } |
329 | ||
330 | // TODO: add status_to_string on the connection object to prinf full status | |
331 | ||
332 | // convert int status to string - including RGW specific values | |
333 | std::string status_to_string(int s) { | |
334 | switch (s) { | |
335 | case RGW_AMQP_STATUS_BROKER_NACK: | |
336 | return "RGW_AMQP_STATUS_BROKER_NACK"; | |
337 | case RGW_AMQP_STATUS_CONNECTION_CLOSED: | |
338 | return "RGW_AMQP_STATUS_CONNECTION_CLOSED"; | |
339 | case RGW_AMQP_STATUS_QUEUE_FULL: | |
340 | return "RGW_AMQP_STATUS_QUEUE_FULL"; | |
341 | case RGW_AMQP_STATUS_MAX_INFLIGHT: | |
342 | return "RGW_AMQP_STATUS_MAX_INFLIGHT"; | |
eafe8130 TL |
343 | case RGW_AMQP_STATUS_MANAGER_STOPPED: |
344 | return "RGW_AMQP_STATUS_MANAGER_STOPPED"; | |
11fdf7f2 TL |
345 | case RGW_AMQP_STATUS_CONN_ALLOC_FAILED: |
346 | return "RGW_AMQP_STATUS_CONN_ALLOC_FAILED"; | |
347 | case RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED: | |
348 | return "RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED"; | |
349 | case RGW_AMQP_STATUS_SOCKET_OPEN_FAILED: | |
350 | return "RGW_AMQP_STATUS_SOCKET_OPEN_FAILED"; | |
351 | case RGW_AMQP_STATUS_LOGIN_FAILED: | |
352 | return "RGW_AMQP_STATUS_LOGIN_FAILED"; | |
353 | case RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED: | |
354 | return "RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED"; | |
355 | case RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED: | |
356 | return "RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED"; | |
357 | case RGW_AMQP_STATUS_Q_DECLARE_FAILED: | |
358 | return "RGW_AMQP_STATUS_Q_DECLARE_FAILED"; | |
359 | case RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED: | |
360 | return "RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED"; | |
361 | case RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED: | |
362 | return "RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED"; | |
f67539c2 TL |
363 | case RGW_AMQP_STATUS_SOCKET_CACERT_FAILED: |
364 | return "RGW_AMQP_STATUS_SOCKET_CACERT_FAILED"; | |
11fdf7f2 TL |
365 | } |
366 | return to_string((amqp_status_enum)s); | |
367 | } | |
368 | ||
// Bail out of a connection-building function when a rabbitmq-c call returned
// null/false: record status S on connection C and return C.
// Wrapped in do { } while (0) so each use behaves as exactly one statement
// (safe inside an unbraced if/else); arguments are parenthesized.
#define RETURN_ON_ERROR(C, S, OK) \
  do { \
    if (!(OK)) { \
      (C)->status = (S); \
      return (C); \
    } \
  } while (0)

// After an RPC call on connection state ST, fetch the RPC reply. If it is not
// AMQP_RESPONSE_NORMAL, record status S plus the reply type/code on C and
// return C.
#define RETURN_ON_REPLY_ERROR(C, ST, S) \
  do { \
    const auto reply = amqp_get_rpc_reply(ST); \
    if (reply.reply_type != AMQP_RESPONSE_NORMAL) { \
      (C)->status = (S); \
      (C)->reply_type = reply.reply_type; \
      (C)->reply_code = reply_to_code(reply); \
      return (C); \
    } \
  } while (0)
386 | ||
387 | static const amqp_channel_t CHANNEL_ID = 1; | |
388 | static const amqp_channel_t CONFIRMING_CHANNEL_ID = 2; | |
389 | ||
390 | // utility function to create a connection, when the connection object already exists | |
391 | connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connection_info& info) { | |
20effc67 TL |
392 | ceph_assert(conn); |
393 | ||
11fdf7f2 TL |
394 | // reset all status codes |
395 | conn->status = AMQP_STATUS_OK; | |
396 | conn->reply_type = AMQP_RESPONSE_NORMAL; | |
397 | conn->reply_code = RGW_AMQP_NO_REPLY_CODE; | |
398 | ||
399 | auto state = amqp_new_connection(); | |
400 | if (!state) { | |
401 | conn->status = RGW_AMQP_STATUS_CONN_ALLOC_FAILED; | |
402 | return conn; | |
403 | } | |
404 | // make sure that the connection state is cleaned up in case of error | |
405 | ConnectionCleaner state_guard(state); | |
406 | ||
407 | // create and open socket | |
f67539c2 TL |
408 | amqp_socket_t *socket = nullptr; |
409 | if (info.ssl) { | |
410 | socket = amqp_ssl_socket_new(state); | |
411 | #if AMQP_VERSION >= AMQP_VERSION_CODE(0, 10, 0, 1) | |
412 | SSL_CTX* ssl_ctx = reinterpret_cast<SSL_CTX*>(amqp_ssl_socket_get_context(socket)); | |
413 | #else | |
414 | // taken from https://github.com/alanxz/rabbitmq-c/pull/560 | |
415 | struct hack { | |
416 | const struct amqp_socket_class_t *klass; | |
417 | SSL_CTX *ctx; | |
418 | }; | |
419 | ||
420 | struct hack *h = reinterpret_cast<struct hack*>(socket); | |
421 | SSL_CTX* ssl_ctx = h->ctx; | |
422 | #endif | |
423 | // ensure system CA certificates get loaded | |
424 | SSL_CTX_set_default_verify_paths(ssl_ctx); | |
425 | } | |
426 | else { | |
427 | socket = amqp_tcp_socket_new(state); | |
428 | } | |
429 | ||
11fdf7f2 TL |
430 | if (!socket) { |
431 | conn->status = RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED; | |
432 | return conn; | |
433 | } | |
f67539c2 TL |
434 | if (info.ssl) { |
435 | if (!conn->verify_ssl) { | |
436 | amqp_ssl_socket_set_verify_peer(socket, 0); | |
437 | amqp_ssl_socket_set_verify_hostname(socket, 0); | |
438 | } | |
439 | if (conn->ca_location.has_value()) { | |
440 | const auto s = amqp_ssl_socket_set_cacert(socket, conn->ca_location.get().c_str()); | |
441 | if (s != AMQP_STATUS_OK) { | |
442 | conn->status = RGW_AMQP_STATUS_SOCKET_CACERT_FAILED; | |
443 | conn->reply_code = s; | |
444 | return conn; | |
445 | } | |
446 | } | |
447 | } | |
11fdf7f2 TL |
448 | const auto s = amqp_socket_open(socket, info.host, info.port); |
449 | if (s < 0) { | |
450 | conn->status = RGW_AMQP_STATUS_SOCKET_OPEN_FAILED; | |
451 | conn->reply_type = RGW_AMQP_RESPONSE_SOCKET_ERROR; | |
452 | conn->reply_code = s; | |
453 | return conn; | |
454 | } | |
455 | ||
456 | // login to broker | |
457 | const auto reply = amqp_login(state, | |
458 | info.vhost, | |
459 | AMQP_DEFAULT_MAX_CHANNELS, | |
460 | AMQP_DEFAULT_FRAME_SIZE, | |
461 | 0, // no heartbeat TODO: add conf | |
462 | AMQP_SASL_METHOD_PLAIN, // TODO: add other types of security | |
463 | info.user, | |
464 | info.password); | |
465 | if (reply.reply_type != AMQP_RESPONSE_NORMAL) { | |
466 | conn->status = RGW_AMQP_STATUS_LOGIN_FAILED; | |
467 | conn->reply_type = reply.reply_type; | |
468 | conn->reply_code = reply_to_code(reply); | |
469 | return conn; | |
470 | } | |
471 | ||
472 | // open channels | |
473 | { | |
474 | const auto ok = amqp_channel_open(state, CHANNEL_ID); | |
475 | RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED, ok); | |
476 | RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED); | |
477 | } | |
478 | { | |
479 | const auto ok = amqp_channel_open(state, CONFIRMING_CHANNEL_ID); | |
480 | RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED, ok); | |
481 | RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED); | |
482 | } | |
483 | { | |
484 | const auto ok = amqp_confirm_select(state, CONFIRMING_CHANNEL_ID); | |
485 | RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED, ok); | |
486 | RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED); | |
487 | } | |
488 | ||
489 | // verify that the topic exchange is there | |
490 | // TODO: make this step optional | |
491 | { | |
492 | const auto ok = amqp_exchange_declare(state, | |
493 | CHANNEL_ID, | |
494 | amqp_cstring_bytes(conn->exchange.c_str()), | |
495 | amqp_cstring_bytes("topic"), | |
496 | 1, // passive - exchange must already exist on broker | |
497 | 1, // durable | |
498 | 0, // dont auto-delete | |
499 | 0, // not internal | |
500 | amqp_empty_table); | |
501 | RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED, ok); | |
502 | RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED); | |
503 | } | |
504 | { | |
505 | // create queue for confirmations | |
506 | const auto queue_ok = amqp_queue_declare(state, | |
507 | CHANNEL_ID, // use the regular channel for this call | |
508 | amqp_empty_bytes, // let broker allocate queue name | |
509 | 0, // not passive - create the queue | |
510 | 0, // not durable | |
511 | 1, // exclusive | |
512 | 1, // auto-delete | |
513 | amqp_empty_table // not args TODO add args from conf: TTL, max length etc. | |
514 | ); | |
515 | RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_Q_DECLARE_FAILED, queue_ok); | |
516 | RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_Q_DECLARE_FAILED); | |
517 | ||
518 | // define consumption for connection | |
519 | const auto consume_ok = amqp_basic_consume(state, | |
520 | CONFIRMING_CHANNEL_ID, | |
521 | queue_ok->queue, | |
522 | amqp_empty_bytes, // broker will generate consumer tag | |
523 | 1, // messages sent from client are never routed back | |
524 | 1, // client does not ack thr acks | |
525 | 1, // exclusive access to queue | |
526 | amqp_empty_table // no parameters | |
527 | ); | |
528 | ||
529 | RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED, consume_ok); | |
530 | RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED); | |
531 | // broker generated consumer_tag could be used to cancel sending of n/acks from broker - not needed | |
532 | ||
533 | state_guard.reset(); | |
534 | conn->state = state; | |
535 | conn->reply_to_queue = amqp_bytes_malloc_dup(queue_ok->queue); | |
536 | return conn; | |
537 | } | |
538 | } | |
539 | ||
540 | // utility function to create a new connection | |
541 | connection_ptr_t create_new_connection(const amqp_connection_info& info, | |
f67539c2 | 542 | const std::string& exchange, bool mandatory_delivery, CephContext* cct, bool verify_ssl, boost::optional<const std::string&> ca_location) { |
11fdf7f2 TL |
543 | // create connection state |
544 | connection_ptr_t conn = new connection_t; | |
545 | conn->exchange = exchange; | |
546 | conn->user.assign(info.user); | |
547 | conn->password.assign(info.password); | |
e306af50 | 548 | conn->mandatory = mandatory_delivery; |
eafe8130 | 549 | conn->cct = cct; |
20effc67 | 550 | conn->use_ssl = info.ssl; |
f67539c2 | 551 | conn->verify_ssl = verify_ssl; |
20effc67 | 552 | conn->ca_location = ca_location; |
11fdf7f2 TL |
553 | return create_connection(conn, info); |
554 | } | |
555 | ||
556 | /// struct used for holding messages in the message queue | |
557 | struct message_wrapper_t { | |
558 | connection_ptr_t conn; | |
559 | std::string topic; | |
560 | std::string message; | |
561 | reply_callback_t cb; | |
562 | ||
563 | message_wrapper_t(connection_ptr_t& _conn, | |
564 | const std::string& _topic, | |
565 | const std::string& _message, | |
566 | reply_callback_t _cb) : conn(_conn), topic(_topic), message(_message), cb(_cb) {} | |
567 | }; | |
568 | ||
569 | ||
570 | typedef std::unordered_map<connection_id_t, connection_ptr_t, connection_id_t::hasher> ConnectionList; | |
571 | typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue; | |
572 | ||
// Loop-control helpers for iterating over the connection map, where each
// iteration either advances the iterator or erases the current element.
// Both expand to several statements ending in `continue`, so they must be
// used as the last action of a *braced* branch inside a loop. They must NOT be
// wrapped in do { } while (0): `continue` would then apply to that inner
// pseudo-loop instead of the enclosing one.
//
// INCREMENT_AND_CONTINUE: advance IT and start the next loop iteration.
#define INCREMENT_AND_CONTINUE(IT) \
  ++IT; \
  continue;

// ERASE_AND_CONTINUE: erase the element at IT from CONTAINER (IT becomes the
// iterator returned by erase), decrement the `connection_count` that is in
// scope at the expansion site (presumably the Manager member of that name),
// and start the next loop iteration.
#define ERASE_AND_CONTINUE(IT,CONTAINER) \
  IT=CONTAINER.erase(IT); \
  --connection_count; \
  continue;
582 | ||
583 | class Manager { | |
584 | public: | |
585 | const size_t max_connections; | |
586 | const size_t max_inflight; | |
587 | const size_t max_queue; | |
20effc67 | 588 | const size_t max_idle_time; |
11fdf7f2 TL |
589 | private: |
590 | std::atomic<size_t> connection_count; | |
20effc67 | 591 | std::atomic<bool> stopped; |
11fdf7f2 TL |
592 | struct timeval read_timeout; |
593 | ConnectionList connections; | |
594 | MessageQueue messages; | |
595 | std::atomic<size_t> queued; | |
596 | std::atomic<size_t> dequeued; | |
eafe8130 | 597 | CephContext* const cct; |
11fdf7f2 | 598 | mutable std::mutex connections_lock; |
e306af50 TL |
599 | const ceph::coarse_real_clock::duration idle_time; |
600 | const ceph::coarse_real_clock::duration reconnect_time; | |
f67539c2 | 601 | std::thread runner; |
11fdf7f2 TL |
602 | |
603 | void publish_internal(message_wrapper_t* message) { | |
604 | const std::unique_ptr<message_wrapper_t> msg_owner(message); | |
605 | auto& conn = message->conn; | |
606 | ||
20effc67 TL |
607 | conn->timestamp = ceph_clock_now(); |
608 | ||
11fdf7f2 TL |
609 | if (!conn->is_ok()) { |
610 | // connection had an issue while message was in the queue | |
611 | // TODO add error stats | |
eafe8130 | 612 | ldout(conn->cct, 1) << "AMQP publish: connection had an issue while message was in the queue" << dendl; |
11fdf7f2 TL |
613 | if (message->cb) { |
614 | message->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED); | |
615 | } | |
616 | return; | |
617 | } | |
618 | ||
619 | if (message->cb == nullptr) { | |
620 | // TODO add error stats | |
621 | const auto rc = amqp_basic_publish(conn->state, | |
622 | CHANNEL_ID, | |
623 | amqp_cstring_bytes(conn->exchange.c_str()), | |
624 | amqp_cstring_bytes(message->topic.c_str()), | |
e306af50 | 625 | 0, // does not have to be routable |
11fdf7f2 | 626 | 0, // not immediate |
e306af50 | 627 | nullptr, // no properties needed |
11fdf7f2 TL |
628 | amqp_cstring_bytes(message->message.c_str())); |
629 | if (rc == AMQP_STATUS_OK) { | |
eafe8130 | 630 | ldout(conn->cct, 20) << "AMQP publish (no callback): OK" << dendl; |
11fdf7f2 TL |
631 | return; |
632 | } | |
eafe8130 | 633 | ldout(conn->cct, 1) << "AMQP publish (no callback): failed with error " << status_to_string(rc) << dendl; |
11fdf7f2 TL |
634 | // an error occurred, close connection |
635 | // it will be retied by the main loop | |
636 | conn->destroy(rc); | |
637 | return; | |
638 | } | |
639 | ||
640 | amqp_basic_properties_t props; | |
641 | props._flags = | |
642 | AMQP_BASIC_DELIVERY_MODE_FLAG | | |
643 | AMQP_BASIC_REPLY_TO_FLAG; | |
644 | props.delivery_mode = 2; // persistent delivery TODO take from conf | |
645 | props.reply_to = conn->reply_to_queue; | |
646 | ||
647 | const auto rc = amqp_basic_publish(conn->state, | |
648 | CONFIRMING_CHANNEL_ID, | |
649 | amqp_cstring_bytes(conn->exchange.c_str()), | |
650 | amqp_cstring_bytes(message->topic.c_str()), | |
e306af50 | 651 | conn->mandatory, |
11fdf7f2 TL |
652 | 0, // not immediate |
653 | &props, | |
654 | amqp_cstring_bytes(message->message.c_str())); | |
655 | ||
656 | if (rc == AMQP_STATUS_OK) { | |
eafe8130 TL |
657 | auto const q_len = conn->callbacks.size(); |
658 | if (q_len < max_inflight) { | |
659 | ldout(conn->cct, 20) << "AMQP publish (with callback, tag=" << conn->delivery_tag << "): OK. Queue has: " << q_len << " callbacks" << dendl; | |
11fdf7f2 TL |
660 | conn->callbacks.emplace_back(conn->delivery_tag++, message->cb); |
661 | } else { | |
662 | // immediately invoke callback with error | |
eafe8130 | 663 | ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: callback queue full" << dendl; |
11fdf7f2 TL |
664 | message->cb(RGW_AMQP_STATUS_MAX_INFLIGHT); |
665 | } | |
666 | } else { | |
667 | // an error occurred, close connection | |
668 | // it will be retied by the main loop | |
eafe8130 | 669 | ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: " << status_to_string(rc) << dendl; |
11fdf7f2 TL |
670 | conn->destroy(rc); |
671 | // immediately invoke callback with error | |
672 | message->cb(rc); | |
673 | } | |
674 | } | |
675 | ||
676 | // the managers thread: | |
677 | // (1) empty the queue of messages to be published | |
678 | // (2) loop over all connections and read acks | |
679 | // (3) manages deleted connections | |
680 | // (4) TODO reconnect on connection errors | |
681 | // (5) TODO cleanup timedout callbacks | |
20effc67 | 682 | void run() noexcept { |
11fdf7f2 TL |
683 | amqp_frame_t frame; |
684 | while (!stopped) { | |
685 | ||
686 | // publish all messages in the queue | |
687 | const auto count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1)); | |
688 | dequeued += count; | |
689 | ConnectionList::iterator conn_it; | |
690 | ConnectionList::const_iterator end_it; | |
691 | { | |
692 | // thread safe access to the connection list | |
693 | // once the iterators are fetched they are guaranteed to remain valid | |
9f95a23c | 694 | std::lock_guard lock(connections_lock); |
11fdf7f2 TL |
695 | conn_it = connections.begin(); |
696 | end_it = connections.end(); | |
697 | } | |
698 | auto incoming_message = false; | |
699 | // loop over all connections to read acks | |
700 | for (;conn_it != end_it;) { | |
701 | ||
702 | auto& conn = conn_it->second; | |
20effc67 TL |
703 | const auto& conn_key = conn_it->first; |
704 | ||
705 | if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) { | |
706 | ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl; | |
11fdf7f2 TL |
707 | ERASE_AND_CONTINUE(conn_it, connections); |
708 | } | |
709 | ||
710 | // try to reconnect the connection if it has an error | |
711 | if (!conn->is_ok()) { | |
e306af50 TL |
712 | const auto now = ceph::coarse_real_clock::now(); |
713 | if (now >= conn->next_reconnect) { | |
714 | // pointers are used temporarily inside the amqp_connection_info object | |
715 | // as read-only values, hence the assignment, and const_cast are safe here | |
716 | amqp_connection_info info; | |
20effc67 TL |
717 | info.host = const_cast<char*>(conn_key.host.c_str()); |
718 | info.port = conn_key.port; | |
719 | info.vhost = const_cast<char*>(conn_key.vhost.c_str()); | |
e306af50 TL |
720 | info.user = const_cast<char*>(conn->user.c_str()); |
721 | info.password = const_cast<char*>(conn->password.c_str()); | |
20effc67 | 722 | info.ssl = conn->use_ssl; |
e306af50 TL |
723 | ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl; |
724 | if (create_connection(conn, info)->is_ok() == false) { | |
20effc67 | 725 | ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_key) << ") retry failed. error: " << |
e306af50 TL |
726 | status_to_string(conn->status) << " (" << conn->reply_code << ")" << dendl; |
727 | // TODO: add error counter for failed retries | |
728 | // TODO: add exponential backoff for retries | |
729 | conn->next_reconnect = now + reconnect_time; | |
730 | } else { | |
20effc67 | 731 | ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_key) << ") retry successfull" << dendl; |
e306af50 | 732 | } |
11fdf7f2 TL |
733 | } |
734 | INCREMENT_AND_CONTINUE(conn_it); | |
735 | } | |
736 | ||
737 | const auto rc = amqp_simple_wait_frame_noblock(conn->state, &frame, &read_timeout); | |
738 | ||
739 | if (rc == AMQP_STATUS_TIMEOUT) { | |
740 | // TODO mark connection as idle | |
741 | INCREMENT_AND_CONTINUE(conn_it); | |
742 | } | |
743 | ||
744 | // this is just to prevent spinning idle, does not indicate that a message | |
745 | // was successfully processed or not | |
746 | incoming_message = true; | |
747 | ||
748 | // check if error occurred that require reopening the connection | |
749 | if (rc != AMQP_STATUS_OK) { | |
750 | // an error occurred, close connection | |
751 | // it will be retied by the main loop | |
eafe8130 | 752 | ldout(conn->cct, 1) << "AMQP run: connection read error: " << status_to_string(rc) << dendl; |
11fdf7f2 TL |
753 | conn->destroy(rc); |
754 | INCREMENT_AND_CONTINUE(conn_it); | |
755 | } | |
756 | ||
757 | if (frame.frame_type != AMQP_FRAME_METHOD) { | |
e306af50 TL |
758 | ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages. frame type: " |
759 | << unsigned(frame.frame_type) << dendl; | |
11fdf7f2 | 760 | // handler is for publish confirmation only - handle only method frames |
11fdf7f2 TL |
761 | INCREMENT_AND_CONTINUE(conn_it); |
762 | } | |
763 | ||
764 | uint64_t tag; | |
765 | bool multiple; | |
766 | int result; | |
767 | ||
768 | switch (frame.payload.method.id) { | |
769 | case AMQP_BASIC_ACK_METHOD: | |
770 | { | |
771 | result = AMQP_STATUS_OK; | |
772 | const auto ack = (amqp_basic_ack_t*)frame.payload.method.decoded; | |
773 | ceph_assert(ack); | |
774 | tag = ack->delivery_tag; | |
775 | multiple = ack->multiple; | |
776 | break; | |
777 | } | |
778 | case AMQP_BASIC_NACK_METHOD: | |
779 | { | |
780 | result = RGW_AMQP_STATUS_BROKER_NACK; | |
781 | const auto nack = (amqp_basic_nack_t*)frame.payload.method.decoded; | |
782 | ceph_assert(nack); | |
783 | tag = nack->delivery_tag; | |
784 | multiple = nack->multiple; | |
785 | break; | |
786 | } | |
e306af50 TL |
787 | case AMQP_BASIC_REJECT_METHOD: |
788 | { | |
789 | result = RGW_AMQP_STATUS_BROKER_NACK; | |
790 | const auto reject = (amqp_basic_reject_t*)frame.payload.method.decoded; | |
791 | tag = reject->delivery_tag; | |
792 | multiple = false; | |
793 | break; | |
794 | } | |
11fdf7f2 TL |
795 | case AMQP_CONNECTION_CLOSE_METHOD: |
796 | // TODO on channel close, no need to reopen the connection | |
797 | case AMQP_CHANNEL_CLOSE_METHOD: | |
798 | { | |
799 | // other side closed the connection, no need to continue | |
eafe8130 | 800 | ldout(conn->cct, 10) << "AMQP run: connection was closed by broker" << dendl; |
11fdf7f2 TL |
801 | conn->destroy(rc); |
802 | INCREMENT_AND_CONTINUE(conn_it); | |
803 | } | |
804 | case AMQP_BASIC_RETURN_METHOD: | |
805 | // message was not delivered, returned to sender | |
e306af50 | 806 | ldout(conn->cct, 10) << "AMQP run: message was not routable" << dendl; |
11fdf7f2 TL |
807 | INCREMENT_AND_CONTINUE(conn_it); |
808 | break; | |
809 | default: | |
810 | // unexpected method | |
eafe8130 | 811 | ldout(conn->cct, 10) << "AMQP run: unexpected message" << dendl; |
11fdf7f2 TL |
812 | INCREMENT_AND_CONTINUE(conn_it); |
813 | } | |
814 | ||
815 | const auto& callbacks_end = conn->callbacks.end(); | |
816 | const auto& callbacks_begin = conn->callbacks.begin(); | |
eafe8130 TL |
817 | const auto tag_it = std::find(callbacks_begin, callbacks_end, tag); |
818 | if (tag_it != callbacks_end) { | |
11fdf7f2 TL |
819 | if (multiple) { |
820 | // n/ack all up to (and including) the tag | |
eafe8130 TL |
821 | ldout(conn->cct, 20) << "AMQP run: multiple n/acks received with tag=" << tag << " and result=" << result << dendl; |
822 | auto it = callbacks_begin; | |
823 | while (it->tag <= tag && it != conn->callbacks.end()) { | |
824 | ldout(conn->cct, 20) << "AMQP run: invoking callback with tag=" << it->tag << dendl; | |
825 | it->cb(result); | |
826 | it = conn->callbacks.erase(it); | |
11fdf7f2 TL |
827 | } |
828 | } else { | |
829 | // n/ack a specific tag | |
eafe8130 TL |
830 | ldout(conn->cct, 20) << "AMQP run: n/ack received, invoking callback with tag=" << tag << " and result=" << result << dendl; |
831 | tag_it->cb(result); | |
832 | conn->callbacks.erase(tag_it); | |
11fdf7f2 TL |
833 | } |
834 | } else { | |
eafe8130 | 835 | ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl; |
11fdf7f2 TL |
836 | } |
837 | // just increment the iterator | |
838 | ++conn_it; | |
839 | } | |
840 | // if no messages were received or published, sleep for 100ms | |
841 | if (count == 0 && !incoming_message) { | |
e306af50 | 842 | std::this_thread::sleep_for(idle_time); |
11fdf7f2 TL |
843 | } |
844 | } | |
845 | } | |
846 | ||
847 | // used in the dtor for message cleanup | |
848 | static void delete_message(const message_wrapper_t* message) { | |
849 | delete message; | |
850 | } | |
851 | ||
852 | public: | |
853 | Manager(size_t _max_connections, | |
854 | size_t _max_inflight, | |
855 | size_t _max_queue, | |
eafe8130 | 856 | long _usec_timeout, |
e306af50 TL |
857 | unsigned reconnect_time_ms, |
858 | unsigned idle_time_ms, | |
eafe8130 | 859 | CephContext* _cct) : |
11fdf7f2 TL |
860 | max_connections(_max_connections), |
861 | max_inflight(_max_inflight), | |
862 | max_queue(_max_queue), | |
20effc67 | 863 | max_idle_time(30), |
11fdf7f2 TL |
864 | connection_count(0), |
865 | stopped(false), | |
866 | read_timeout{0, _usec_timeout}, | |
867 | connections(_max_connections), | |
868 | messages(max_queue), | |
869 | queued(0), | |
870 | dequeued(0), | |
eafe8130 | 871 | cct(_cct), |
e306af50 | 872 | idle_time(std::chrono::milliseconds(idle_time_ms)), |
f67539c2 TL |
873 | reconnect_time(std::chrono::milliseconds(reconnect_time_ms)), |
874 | runner(&Manager::run, this) { | |
11fdf7f2 TL |
875 | // The hashmap has "max connections" as the initial number of buckets, |
876 | // and allows for 10 collisions per bucket before rehash. | |
877 | // This is to prevent rehashing so that iterators are not invalidated | |
878 | // when a new connection is added. | |
879 | connections.max_load_factor(10.0); | |
eafe8130 TL |
880 | // give the runner thread a name for easier debugging |
881 | const auto rc = ceph_pthread_setname(runner.native_handle(), "amqp_manager"); | |
882 | ceph_assert(rc==0); | |
11fdf7f2 TL |
883 | } |
884 | ||
885 | // non copyable | |
886 | Manager(const Manager&) = delete; | |
887 | const Manager& operator=(const Manager&) = delete; | |
888 | ||
// stop the main thread
// Only sets the stop flag; it does not join the runner thread (the destructor
// does that). Once set, connect(), publish() and publish_with_confirm() reject
// new work.
// NOTE(review): presumably the runner loop in run() also polls this flag and
// exits -- confirm against the loop header.
void stop() {
  stopped = true;
}
893 | ||
11fdf7f2 | 894 | // connect to a broker, or reuse an existing connection if already connected |
f67539c2 TL |
895 | connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl, |
896 | boost::optional<const std::string&> ca_location) { | |
11fdf7f2 | 897 | if (stopped) { |
eafe8130 | 898 | ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl; |
11fdf7f2 TL |
899 | return nullptr; |
900 | } | |
901 | ||
902 | struct amqp_connection_info info; | |
903 | // cache the URL so that parsing could happen in-place | |
904 | std::vector<char> url_cache(url.c_str(), url.c_str()+url.size()+1); | |
20effc67 TL |
905 | const auto retcode = amqp_parse_url(url_cache.data(), &info); |
906 | if (AMQP_STATUS_OK != retcode) { | |
907 | ldout(cct, 1) << "AMQP connect: URL parsing failed. error: " << retcode << dendl; | |
11fdf7f2 TL |
908 | return nullptr; |
909 | } | |
910 | ||
911 | const connection_id_t id(info); | |
9f95a23c | 912 | std::lock_guard lock(connections_lock); |
11fdf7f2 TL |
913 | const auto it = connections.find(id); |
914 | if (it != connections.end()) { | |
20effc67 | 915 | if (it->second->exchange != exchange) { |
eafe8130 | 916 | ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl; |
11fdf7f2 TL |
917 | return nullptr; |
918 | } | |
919 | // connection found - return even if non-ok | |
eafe8130 | 920 | ldout(cct, 20) << "AMQP connect: connection found" << dendl; |
11fdf7f2 TL |
921 | return it->second; |
922 | } | |
923 | ||
924 | // connection not found, creating a new one | |
925 | if (connection_count >= max_connections) { | |
eafe8130 | 926 | ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl; |
11fdf7f2 TL |
927 | return nullptr; |
928 | } | |
f67539c2 | 929 | const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct, verify_ssl, ca_location); |
e306af50 TL |
930 | if (!conn->is_ok()) { |
931 | ldout(cct, 10) << "AMQP connect: connection (" << to_string(id) << ") creation failed. error:" << | |
932 | status_to_string(conn->status) << "(" << conn->reply_code << ")" << dendl; | |
933 | } | |
11fdf7f2 TL |
934 | // create_new_connection must always return a connection object |
935 | // even if error occurred during creation. | |
936 | // in such a case the creation will be retried in the main thread | |
937 | ceph_assert(conn); | |
938 | ++connection_count; | |
eafe8130 TL |
939 | ldout(cct, 10) << "AMQP connect: new connection is created. Total connections: " << connection_count << dendl; |
940 | ldout(cct, 10) << "AMQP connect: new connection status is: " << status_to_string(conn->status) << dendl; | |
11fdf7f2 TL |
941 | return connections.emplace(id, conn).first->second; |
942 | } | |
943 | ||
944 | // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack) | |
945 | int publish(connection_ptr_t& conn, | |
946 | const std::string& topic, | |
947 | const std::string& message) { | |
eafe8130 | 948 | if (stopped) { |
e306af50 | 949 | ldout(cct, 1) << "AMQP publish: manager is not running" << dendl; |
eafe8130 TL |
950 | return RGW_AMQP_STATUS_MANAGER_STOPPED; |
951 | } | |
11fdf7f2 | 952 | if (!conn || !conn->is_ok()) { |
e306af50 | 953 | ldout(cct, 1) << "AMQP publish: no connection" << dendl; |
11fdf7f2 TL |
954 | return RGW_AMQP_STATUS_CONNECTION_CLOSED; |
955 | } | |
956 | if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) { | |
957 | ++queued; | |
958 | return AMQP_STATUS_OK; | |
959 | } | |
e306af50 | 960 | ldout(cct, 1) << "AMQP publish: queue is full" << dendl; |
11fdf7f2 TL |
961 | return RGW_AMQP_STATUS_QUEUE_FULL; |
962 | } | |
963 | ||
964 | int publish_with_confirm(connection_ptr_t& conn, | |
965 | const std::string& topic, | |
966 | const std::string& message, | |
967 | reply_callback_t cb) { | |
eafe8130 | 968 | if (stopped) { |
e306af50 | 969 | ldout(cct, 1) << "AMQP publish_with_confirm: manager is not running" << dendl; |
eafe8130 TL |
970 | return RGW_AMQP_STATUS_MANAGER_STOPPED; |
971 | } | |
11fdf7f2 | 972 | if (!conn || !conn->is_ok()) { |
e306af50 | 973 | ldout(cct, 1) << "AMQP publish_with_confirm: no connection" << dendl; |
11fdf7f2 TL |
974 | return RGW_AMQP_STATUS_CONNECTION_CLOSED; |
975 | } | |
976 | if (messages.push(new message_wrapper_t(conn, topic, message, cb))) { | |
977 | ++queued; | |
978 | return AMQP_STATUS_OK; | |
979 | } | |
e306af50 | 980 | ldout(cct, 1) << "AMQP publish_with_confirm: queue is full" << dendl; |
11fdf7f2 TL |
981 | return RGW_AMQP_STATUS_QUEUE_FULL; |
982 | } | |
983 | ||
984 | // dtor wait for thread to stop | |
985 | // then connection are cleaned-up | |
986 | ~Manager() { | |
987 | stopped = true; | |
988 | runner.join(); | |
989 | messages.consume_all(delete_message); | |
990 | } | |
991 | ||
992 | // get the number of connections | |
993 | size_t get_connection_count() const { | |
994 | return connection_count; | |
995 | } | |
996 | ||
997 | // get the number of in-flight messages | |
998 | size_t get_inflight() const { | |
999 | size_t sum = 0; | |
9f95a23c | 1000 | std::lock_guard lock(connections_lock); |
11fdf7f2 | 1001 | std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) { |
20effc67 | 1002 | // concurrent access to the callback vector is safe without locking |
11fdf7f2 TL |
1003 | sum += conn_pair.second->callbacks.size(); |
1004 | }); | |
1005 | return sum; | |
1006 | } | |
1007 | ||
1008 | // running counter of the queued messages | |
1009 | size_t get_queued() const { | |
1010 | return queued; | |
1011 | } | |
1012 | ||
1013 | // running counter of the dequeued messages | |
1014 | size_t get_dequeued() const { | |
1015 | return dequeued; | |
1016 | } | |
1017 | }; | |
1018 | ||
1019 | // singleton manager | |
1020 | // note that the manager itself is not a singleton, and multiple instances may co-exist | |
eafe8130 TL |
1021 | // TODO make the pointer atomic in allocation and deallocation to avoid race conditions |
1022 | static Manager* s_manager = nullptr; | |
1023 | ||
// Built-in defaults used by init() to build the manager, and reported by the
// get_max_*() functions when no manager exists.
static constexpr size_t MAX_CONNECTIONS_DEFAULT = 256;
static constexpr size_t MAX_INFLIGHT_DEFAULT = 8192;
static constexpr size_t MAX_QUEUE_DEFAULT = 8192;
// frame read timeout, in microseconds
static constexpr long READ_TIMEOUT_USEC = 100;
// meant as the runner's sleep interval when idle
static constexpr unsigned IDLE_TIME_MS = 100;
// meant as the delay between reconnection attempts
static constexpr unsigned RECONNECT_TIME_MS = 100;
eafe8130 TL |
1030 | |
1031 | bool init(CephContext* cct) { | |
1032 | if (s_manager) { | |
1033 | return false; | |
1034 | } | |
1035 | // TODO: take conf from CephContext | |
e306af50 TL |
1036 | s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, |
1037 | READ_TIMEOUT_USEC, IDLE_TIME_MS, RECONNECT_TIME_MS, cct); | |
eafe8130 TL |
1038 | return true; |
1039 | } | |
1040 | ||
// Destroy the process-wide manager, if any. Deleting it stops and joins the
// runner thread and frees still-queued messages (see ~Manager). Calling this
// when no manager exists is harmless, since deleting nullptr is a no-op.
// NOTE(review): not synchronized with concurrent callers of the free functions
// below, which may still be dereferencing s_manager.
void shutdown() {
  delete s_manager;
  s_manager = nullptr;
}
11fdf7f2 | 1045 | |
f67539c2 TL |
1046 | connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl, |
1047 | boost::optional<const std::string&> ca_location) { | |
eafe8130 | 1048 | if (!s_manager) return nullptr; |
f67539c2 | 1049 | return s_manager->connect(url, exchange, mandatory_delivery, verify_ssl, ca_location); |
11fdf7f2 TL |
1050 | } |
1051 | ||
1052 | int publish(connection_ptr_t& conn, | |
1053 | const std::string& topic, | |
1054 | const std::string& message) { | |
eafe8130 TL |
1055 | if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED; |
1056 | return s_manager->publish(conn, topic, message); | |
11fdf7f2 TL |
1057 | } |
1058 | ||
1059 | int publish_with_confirm(connection_ptr_t& conn, | |
1060 | const std::string& topic, | |
1061 | const std::string& message, | |
1062 | reply_callback_t cb) { | |
eafe8130 TL |
1063 | if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED; |
1064 | return s_manager->publish_with_confirm(conn, topic, message, cb); | |
11fdf7f2 TL |
1065 | } |
1066 | ||
1067 | size_t get_connection_count() { | |
eafe8130 TL |
1068 | if (!s_manager) return 0; |
1069 | return s_manager->get_connection_count(); | |
11fdf7f2 TL |
1070 | } |
1071 | ||
1072 | size_t get_inflight() { | |
eafe8130 TL |
1073 | if (!s_manager) return 0; |
1074 | return s_manager->get_inflight(); | |
11fdf7f2 TL |
1075 | } |
1076 | ||
1077 | size_t get_queued() { | |
eafe8130 TL |
1078 | if (!s_manager) return 0; |
1079 | return s_manager->get_queued(); | |
11fdf7f2 TL |
1080 | } |
1081 | ||
1082 | size_t get_dequeued() { | |
eafe8130 TL |
1083 | if (!s_manager) return 0; |
1084 | return s_manager->get_dequeued(); | |
11fdf7f2 TL |
1085 | } |
1086 | ||
1087 | size_t get_max_connections() { | |
eafe8130 TL |
1088 | if (!s_manager) return MAX_CONNECTIONS_DEFAULT; |
1089 | return s_manager->max_connections; | |
11fdf7f2 TL |
1090 | } |
1091 | ||
1092 | size_t get_max_inflight() { | |
eafe8130 TL |
1093 | if (!s_manager) return MAX_INFLIGHT_DEFAULT; |
1094 | return s_manager->max_inflight; | |
11fdf7f2 TL |
1095 | } |
1096 | ||
1097 | size_t get_max_queue() { | |
eafe8130 TL |
1098 | if (!s_manager) return MAX_QUEUE_DEFAULT; |
1099 | return s_manager->max_queue; | |
11fdf7f2 TL |
1100 | } |
1101 | ||
11fdf7f2 TL |
1102 | } // namespace rgw::amqp |
1103 |