]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | #include "rgw_kafka.h" | |
5 | #include "rgw_url.h" | |
6 | #include <librdkafka/rdkafka.h> | |
7 | #include "include/ceph_assert.h" | |
8 | #include <sstream> | |
9 | #include <cstring> | |
10 | #include <unordered_map> | |
11 | #include <string> | |
12 | #include <vector> | |
13 | #include <thread> | |
14 | #include <atomic> | |
15 | #include <mutex> | |
16 | #include <boost/lockfree/queue.hpp> | |
17 | #include "common/dout.h" | |
18 | ||
19 | #define dout_subsys ceph_subsys_rgw | |
20 | ||
21 | // TODO investigation, not necessarily issues: | |
22 | // (1) in case of single threaded writer context use spsc_queue | |
23 | // (2) check performance of emptying queue to local list, and go over the list and publish | |
24 | // (3) use std::shared_mutex (c++17) or equivalent for the connections lock | |
25 | ||
26 | // cmparisson operator between topic pointer and name | |
27 | bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) { | |
28 | return name == std::string_view(rd_kafka_topic_name(rkt)); | |
29 | } | |
30 | ||
namespace rgw::kafka {

// status codes for publishing
// NOTE(review): negative values below are assumed not to collide with
// librdkafka's rd_kafka_resp_err_t space - see status_to_string()
// TODO: use the actual error code (when conn exists) instead of STATUS_CONNECTION_CLOSED when replying to client
static const int STATUS_CONNECTION_CLOSED = -0x1002;
static const int STATUS_QUEUE_FULL = -0x1003;
static const int STATUS_MAX_INFLIGHT = -0x1004;
static const int STATUS_MANAGER_STOPPED = -0x1005;
// status code for connection opening
static const int STATUS_CONF_ALLOC_FAILED = -0x2001;

static const int STATUS_OK = 0x0;
43 | ||
44 | // struct for holding the callback and its tag in the callback list | |
45 | struct reply_callback_with_tag_t { | |
46 | uint64_t tag; | |
47 | reply_callback_t cb; | |
48 | ||
49 | reply_callback_with_tag_t(uint64_t _tag, reply_callback_t _cb) : tag(_tag), cb(_cb) {} | |
50 | ||
51 | bool operator==(uint64_t rhs) { | |
52 | return tag == rhs; | |
53 | } | |
54 | }; | |
55 | ||
56 | typedef std::vector<reply_callback_with_tag_t> CallbackList; | |
57 | ||
58 | // struct for holding the connection state object as well as list of topics | |
59 | // it is used inside an intrusive ref counted pointer (boost::intrusive_ptr) | |
60 | // since references to deleted objects may still exist in the calling code | |
61 | struct connection_t { | |
62 | rd_kafka_t* producer = nullptr; | |
63 | rd_kafka_conf_t* temp_conf = nullptr; | |
64 | std::vector<rd_kafka_topic_t*> topics; | |
9f95a23c | 65 | uint64_t delivery_tag = 1; |
f67539c2 | 66 | int status = STATUS_OK; |
9f95a23c TL |
67 | mutable std::atomic<int> ref_count = 0; |
68 | CephContext* const cct; | |
69 | CallbackList callbacks; | |
70 | const std::string broker; | |
71 | const bool use_ssl; | |
72 | const bool verify_ssl; // TODO currently iognored, not supported in librdkafka v0.11.6 | |
73 | const boost::optional<std::string> ca_location; | |
74 | const std::string user; | |
75 | const std::string password; | |
20effc67 | 76 | utime_t timestamp = ceph_clock_now(); |
9f95a23c TL |
77 | |
78 | // cleanup of all internal connection resource | |
79 | // the object can still remain, and internal connection | |
80 | // resources created again on successful reconnection | |
81 | void destroy(int s) { | |
82 | status = s; | |
83 | // destroy temporary conf (if connection was never established) | |
84 | if (temp_conf) { | |
85 | rd_kafka_conf_destroy(temp_conf); | |
86 | return; | |
87 | } | |
88 | // wait for all remaining acks/nacks | |
89 | rd_kafka_flush(producer, 5*1000 /* wait for max 5 seconds */); | |
90 | // destroy all topics | |
91 | std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);}); | |
92 | // destroy producer | |
93 | rd_kafka_destroy(producer); | |
94 | // fire all remaining callbacks (if not fired by rd_kafka_flush) | |
95 | std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) { | |
96 | cb_tag.cb(status); | |
97 | ldout(cct, 20) << "Kafka destroy: invoking callback with tag=" << cb_tag.tag << dendl; | |
98 | }); | |
99 | callbacks.clear(); | |
100 | delivery_tag = 1; | |
101 | } | |
102 | ||
103 | bool is_ok() const { | |
20effc67 | 104 | return (producer != nullptr); |
9f95a23c TL |
105 | } |
106 | ||
107 | // ctor for setting immutable values | |
108 | connection_t(CephContext* _cct, const std::string& _broker, bool _use_ssl, bool _verify_ssl, | |
109 | const boost::optional<const std::string&>& _ca_location, | |
110 | const std::string& _user, const std::string& _password) : | |
111 | cct(_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password) {} | |
112 | ||
113 | // dtor also destroys the internals | |
114 | ~connection_t() { | |
115 | destroy(STATUS_CONNECTION_CLOSED); | |
116 | } | |
117 | ||
118 | friend void intrusive_ptr_add_ref(const connection_t* p); | |
119 | friend void intrusive_ptr_release(const connection_t* p); | |
120 | }; | |
121 | ||
122 | std::string to_string(const connection_ptr_t& conn) { | |
123 | std::string str; | |
124 | str += "\nBroker: " + conn->broker; | |
125 | str += conn->use_ssl ? "\nUse SSL" : ""; | |
126 | str += conn->ca_location ? "\nCA Location: " + *(conn->ca_location) : ""; | |
127 | return str; | |
128 | } | |
129 | // these are required interfaces so that connection_t could be used inside boost::intrusive_ptr | |
130 | void intrusive_ptr_add_ref(const connection_t* p) { | |
131 | ++p->ref_count; | |
132 | } | |
133 | void intrusive_ptr_release(const connection_t* p) { | |
134 | if (--p->ref_count == 0) { | |
135 | delete p; | |
136 | } | |
137 | } | |
138 | ||
139 | // convert int status to string - including RGW specific values | |
140 | std::string status_to_string(int s) { | |
141 | switch (s) { | |
142 | case STATUS_OK: | |
143 | return "STATUS_OK"; | |
144 | case STATUS_CONNECTION_CLOSED: | |
145 | return "RGW_KAFKA_STATUS_CONNECTION_CLOSED"; | |
146 | case STATUS_QUEUE_FULL: | |
147 | return "RGW_KAFKA_STATUS_QUEUE_FULL"; | |
148 | case STATUS_MAX_INFLIGHT: | |
149 | return "RGW_KAFKA_STATUS_MAX_INFLIGHT"; | |
150 | case STATUS_MANAGER_STOPPED: | |
151 | return "RGW_KAFKA_STATUS_MANAGER_STOPPED"; | |
152 | case STATUS_CONF_ALLOC_FAILED: | |
153 | return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED"; | |
154 | } | |
155 | return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s)); | |
156 | } | |
157 | ||
158 | void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) { | |
159 | ceph_assert(opaque); | |
160 | ||
161 | const auto conn = reinterpret_cast<connection_t*>(opaque); | |
162 | const auto result = rkmessage->err; | |
163 | ||
164 | if (!rkmessage->_private) { | |
165 | ldout(conn->cct, 20) << "Kafka run: n/ack received, (no callback) with result=" << result << dendl; | |
166 | return; | |
167 | } | |
168 | ||
169 | const auto tag = reinterpret_cast<uint64_t*>(rkmessage->_private); | |
170 | const auto& callbacks_end = conn->callbacks.end(); | |
171 | const auto& callbacks_begin = conn->callbacks.begin(); | |
172 | const auto tag_it = std::find(callbacks_begin, callbacks_end, *tag); | |
173 | if (tag_it != callbacks_end) { | |
174 | ldout(conn->cct, 20) << "Kafka run: n/ack received, invoking callback with tag=" << | |
175 | *tag << " and result=" << rd_kafka_err2str(result) << dendl; | |
176 | tag_it->cb(result); | |
177 | conn->callbacks.erase(tag_it); | |
178 | } else { | |
179 | // TODO add counter for acks with no callback | |
180 | ldout(conn->cct, 10) << "Kafka run: unsolicited n/ack received with tag=" << | |
181 | *tag << dendl; | |
182 | } | |
183 | delete tag; | |
184 | // rkmessage is destroyed automatically by librdkafka | |
185 | } | |
186 | ||
187 | // utility function to create a connection, when the connection object already exists | |
188 | connection_ptr_t& create_connection(connection_ptr_t& conn) { | |
189 | // pointer must be valid and not marked for deletion | |
20effc67 | 190 | ceph_assert(conn); |
9f95a23c TL |
191 | |
192 | // reset all status codes | |
193 | conn->status = STATUS_OK; | |
194 | char errstr[512] = {0}; | |
195 | ||
196 | conn->temp_conf = rd_kafka_conf_new(); | |
197 | if (!conn->temp_conf) { | |
198 | conn->status = STATUS_CONF_ALLOC_FAILED; | |
199 | return conn; | |
200 | } | |
201 | ||
202 | // get list of brokers based on the bootsrap broker | |
203 | if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; | |
204 | ||
205 | if (conn->use_ssl) { | |
206 | if (!conn->user.empty()) { | |
207 | // use SSL+SASL | |
208 | if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || | |
209 | rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || | |
210 | rd_kafka_conf_set(conn->temp_conf, "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || | |
211 | rd_kafka_conf_set(conn->temp_conf, "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; | |
212 | ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL+SASL security" << dendl; | |
213 | } else { | |
214 | // use only SSL | |
215 | if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; | |
216 | ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL security" << dendl; | |
217 | } | |
218 | if (conn->ca_location) { | |
219 | if (rd_kafka_conf_set(conn->temp_conf, "ssl.ca.location", conn->ca_location->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; | |
220 | ldout(conn->cct, 20) << "Kafka connect: successfully configured CA location" << dendl; | |
221 | } else { | |
222 | ldout(conn->cct, 20) << "Kafka connect: using default CA location" << dendl; | |
223 | } | |
224 | // Note: when librdkafka.1.0 is available the following line could be uncommented instead of the callback setting call | |
225 | // if (rd_kafka_conf_set(conn->temp_conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; | |
226 | ||
227 | ldout(conn->cct, 20) << "Kafka connect: successfully configured security" << dendl; | |
228 | } | |
229 | ||
230 | // set the global callback for delivery success/fail | |
231 | rd_kafka_conf_set_dr_msg_cb(conn->temp_conf, message_callback); | |
232 | ||
233 | // set the global opaque pointer to be the connection itself | |
234 | rd_kafka_conf_set_opaque(conn->temp_conf, conn.get()); | |
235 | ||
236 | // create the producer | |
237 | conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr)); | |
238 | if (!conn->producer) { | |
239 | conn->status = rd_kafka_last_error(); | |
240 | ldout(conn->cct, 1) << "Kafka connect: failed to create producer: " << errstr << dendl; | |
241 | return conn; | |
242 | } | |
243 | ldout(conn->cct, 20) << "Kafka connect: successfully created new producer" << dendl; | |
244 | ||
245 | // conf ownership passed to producer | |
246 | conn->temp_conf = nullptr; | |
247 | return conn; | |
248 | ||
249 | conf_error: | |
250 | conn->status = rd_kafka_last_error(); | |
251 | ldout(conn->cct, 1) << "Kafka connect: configuration failed: " << errstr << dendl; | |
252 | return conn; | |
253 | } | |
254 | ||
255 | // utility function to create a new connection | |
256 | connection_ptr_t create_new_connection(const std::string& broker, CephContext* cct, | |
257 | bool use_ssl, | |
258 | bool verify_ssl, | |
259 | boost::optional<const std::string&> ca_location, | |
260 | const std::string& user, | |
261 | const std::string& password) { | |
262 | // create connection state | |
263 | connection_ptr_t conn(new connection_t(cct, broker, use_ssl, verify_ssl, ca_location, user, password)); | |
264 | return create_connection(conn); | |
265 | } | |
266 | ||
/// struct used for holding messages in the message queue
/// owns copies of the topic name and payload, so the caller's buffers may be
/// released immediately after queueing
struct message_wrapper_t {
  connection_ptr_t conn;  // shared ownership keeps the connection alive while queued
  std::string topic;
  std::string message;
  reply_callback_t cb;    // may be null - no delivery confirmation requested

  message_wrapper_t(connection_ptr_t& _conn,
      const std::string& _topic,
      const std::string& _message,
      reply_callback_t _cb) : conn(_conn), topic(_topic), message(_message), cb(_cb) {}
};
279 | ||
// broker name -> connection map; connection_ptr_t is intrusive-refcounted
typedef std::unordered_map<std::string, connection_ptr_t> ConnectionList;
// fixed-size lockfree queue of heap-allocated message wrappers
typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;

// macros used inside a loop where an iterator is either incremented or erased
#define INCREMENT_AND_CONTINUE(IT) \
          ++IT; \
          continue;

// NOTE: relies on a variable named 'connection_count' being in scope
// at the expansion site (the Manager member)
#define ERASE_AND_CONTINUE(IT,CONTAINER) \
          IT=CONTAINER.erase(IT); \
          --connection_count; \
          continue;
292 | ||
293 | class Manager { | |
294 | public: | |
295 | const size_t max_connections; | |
296 | const size_t max_inflight; | |
297 | const size_t max_queue; | |
20effc67 | 298 | const size_t max_idle_time; |
9f95a23c TL |
299 | private: |
300 | std::atomic<size_t> connection_count; | |
301 | bool stopped; | |
302 | int read_timeout_ms; | |
303 | ConnectionList connections; | |
304 | MessageQueue messages; | |
305 | std::atomic<size_t> queued; | |
306 | std::atomic<size_t> dequeued; | |
307 | CephContext* const cct; | |
308 | mutable std::mutex connections_lock; | |
309 | std::thread runner; | |
310 | ||
311 | // TODO use rd_kafka_produce_batch for better performance | |
312 | void publish_internal(message_wrapper_t* message) { | |
313 | const std::unique_ptr<message_wrapper_t> msg_owner(message); | |
314 | auto& conn = message->conn; | |
315 | ||
20effc67 TL |
316 | conn->timestamp = ceph_clock_now(); |
317 | ||
9f95a23c TL |
318 | if (!conn->is_ok()) { |
319 | // connection had an issue while message was in the queue | |
320 | // TODO add error stats | |
321 | ldout(conn->cct, 1) << "Kafka publish: connection had an issue while message was in the queue. error: " << status_to_string(conn->status) << dendl; | |
322 | if (message->cb) { | |
323 | message->cb(conn->status); | |
324 | } | |
325 | return; | |
326 | } | |
327 | ||
328 | // create a new topic unless it was already created | |
329 | auto topic_it = std::find(conn->topics.begin(), conn->topics.end(), message->topic); | |
330 | rd_kafka_topic_t* topic = nullptr; | |
331 | if (topic_it == conn->topics.end()) { | |
332 | topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr); | |
333 | if (!topic) { | |
334 | const auto err = rd_kafka_last_error(); | |
335 | ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " << status_to_string(err) << dendl; | |
336 | if (message->cb) { | |
337 | message->cb(err); | |
338 | } | |
339 | conn->destroy(err); | |
340 | return; | |
341 | } | |
342 | // TODO use the topics list as an LRU cache | |
343 | conn->topics.push_back(topic); | |
344 | ldout(conn->cct, 20) << "Kafka publish: successfully created topic: " << message->topic << dendl; | |
345 | } else { | |
346 | topic = *topic_it; | |
347 | ldout(conn->cct, 20) << "Kafka publish: reused existing topic: " << message->topic << dendl; | |
348 | } | |
349 | ||
350 | const auto tag = (message->cb == nullptr ? nullptr : new uint64_t(conn->delivery_tag++)); | |
351 | const auto rc = rd_kafka_produce( | |
352 | topic, | |
353 | // TODO: non builtin partitioning | |
354 | RD_KAFKA_PARTITION_UA, | |
355 | // make a copy of the payload | |
356 | // so it is safe to pass the pointer from the string | |
357 | RD_KAFKA_MSG_F_COPY, | |
358 | message->message.data(), | |
359 | message->message.length(), | |
360 | // optional key and its length | |
361 | nullptr, | |
362 | 0, | |
363 | // opaque data: tag, used in the global callback | |
364 | // in order to invoke the real callback | |
365 | // null if no callback exists | |
366 | tag); | |
367 | if (rc == -1) { | |
368 | const auto err = rd_kafka_last_error(); | |
369 | ldout(conn->cct, 10) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl; | |
370 | // TODO: dont error on full queue, and don't destroy connection, retry instead | |
371 | // immediatly invoke callback on error if needed | |
372 | if (message->cb) { | |
373 | message->cb(err); | |
374 | } | |
375 | conn->destroy(err); | |
376 | delete tag; | |
f67539c2 | 377 | return; |
9f95a23c TL |
378 | } |
379 | ||
380 | if (tag) { | |
381 | auto const q_len = conn->callbacks.size(); | |
382 | if (q_len < max_inflight) { | |
383 | ldout(conn->cct, 20) << "Kafka publish (with callback, tag=" << *tag << "): OK. Queue has: " << q_len << " callbacks" << dendl; | |
384 | conn->callbacks.emplace_back(*tag, message->cb); | |
385 | } else { | |
386 | // immediately invoke callback with error - this is not a connection error | |
387 | ldout(conn->cct, 1) << "Kafka publish (with callback): failed with error: callback queue full" << dendl; | |
388 | message->cb(STATUS_MAX_INFLIGHT); | |
389 | // tag will be deleted when the global callback is invoked | |
390 | } | |
391 | } else { | |
392 | ldout(conn->cct, 20) << "Kafka publish (no callback): OK" << dendl; | |
393 | } | |
394 | } | |
395 | ||
396 | // the managers thread: | |
397 | // (1) empty the queue of messages to be published | |
398 | // (2) loop over all connections and read acks | |
399 | // (3) manages deleted connections | |
400 | // (4) TODO reconnect on connection errors | |
401 | // (5) TODO cleanup timedout callbacks | |
20effc67 | 402 | void run() noexcept { |
9f95a23c TL |
403 | while (!stopped) { |
404 | ||
405 | // publish all messages in the queue | |
406 | auto reply_count = 0U; | |
407 | const auto send_count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1)); | |
408 | dequeued += send_count; | |
409 | ConnectionList::iterator conn_it; | |
410 | ConnectionList::const_iterator end_it; | |
411 | { | |
412 | // thread safe access to the connection list | |
413 | // once the iterators are fetched they are guaranteed to remain valid | |
414 | std::lock_guard lock(connections_lock); | |
415 | conn_it = connections.begin(); | |
416 | end_it = connections.end(); | |
417 | } | |
418 | // loop over all connections to read acks | |
419 | for (;conn_it != end_it;) { | |
420 | ||
421 | auto& conn = conn_it->second; | |
20effc67 TL |
422 | |
423 | // Checking the connection idlesness | |
424 | if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) { | |
425 | ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl; | |
9f95a23c TL |
426 | ERASE_AND_CONTINUE(conn_it, connections); |
427 | } | |
428 | ||
429 | // try to reconnect the connection if it has an error | |
430 | if (!conn->is_ok()) { | |
431 | ldout(conn->cct, 10) << "Kafka run: connection status is: " << status_to_string(conn->status) << dendl; | |
432 | const auto& broker = conn_it->first; | |
433 | ldout(conn->cct, 20) << "Kafka run: retry connection" << dendl; | |
434 | if (create_connection(conn)->is_ok() == false) { | |
435 | ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry failed" << dendl; | |
436 | // TODO: add error counter for failed retries | |
437 | // TODO: add exponential backoff for retries | |
438 | } else { | |
439 | ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry successfull" << dendl; | |
440 | } | |
441 | INCREMENT_AND_CONTINUE(conn_it); | |
442 | } | |
443 | ||
444 | reply_count += rd_kafka_poll(conn->producer, read_timeout_ms); | |
445 | ||
446 | // just increment the iterator | |
447 | ++conn_it; | |
448 | } | |
449 | // if no messages were received or published | |
450 | // across all connection, sleep for 100ms | |
451 | if (send_count == 0 && reply_count == 0) { | |
452 | std::this_thread::sleep_for(std::chrono::milliseconds(100)); | |
453 | } | |
454 | } | |
455 | } | |
456 | ||
457 | // used in the dtor for message cleanup | |
458 | static void delete_message(const message_wrapper_t* message) { | |
459 | delete message; | |
460 | } | |
461 | ||
462 | public: | |
463 | Manager(size_t _max_connections, | |
464 | size_t _max_inflight, | |
465 | size_t _max_queue, | |
466 | int _read_timeout_ms, | |
467 | CephContext* _cct) : | |
468 | max_connections(_max_connections), | |
469 | max_inflight(_max_inflight), | |
470 | max_queue(_max_queue), | |
20effc67 | 471 | max_idle_time(30), |
9f95a23c TL |
472 | connection_count(0), |
473 | stopped(false), | |
474 | read_timeout_ms(_read_timeout_ms), | |
475 | connections(_max_connections), | |
476 | messages(max_queue), | |
477 | queued(0), | |
478 | dequeued(0), | |
479 | cct(_cct), | |
480 | runner(&Manager::run, this) { | |
481 | // The hashmap has "max connections" as the initial number of buckets, | |
482 | // and allows for 10 collisions per bucket before rehash. | |
483 | // This is to prevent rehashing so that iterators are not invalidated | |
484 | // when a new connection is added. | |
485 | connections.max_load_factor(10.0); | |
486 | // give the runner thread a name for easier debugging | |
487 | const auto rc = ceph_pthread_setname(runner.native_handle(), "kafka_manager"); | |
488 | ceph_assert(rc==0); | |
489 | } | |
490 | ||
491 | // non copyable | |
492 | Manager(const Manager&) = delete; | |
493 | const Manager& operator=(const Manager&) = delete; | |
494 | ||
495 | // stop the main thread | |
496 | void stop() { | |
497 | stopped = true; | |
498 | } | |
499 | ||
9f95a23c TL |
500 | // connect to a broker, or reuse an existing connection if already connected |
501 | connection_ptr_t connect(const std::string& url, | |
502 | bool use_ssl, | |
503 | bool verify_ssl, | |
504 | boost::optional<const std::string&> ca_location) { | |
505 | if (stopped) { | |
506 | // TODO: increment counter | |
507 | ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl; | |
508 | return nullptr; | |
509 | } | |
510 | ||
511 | std::string broker; | |
512 | std::string user; | |
513 | std::string password; | |
514 | if (!parse_url_authority(url, broker, user, password)) { | |
515 | // TODO: increment counter | |
516 | ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl; | |
517 | return nullptr; | |
518 | } | |
519 | ||
520 | // this should be validated by the regex in parse_url() | |
521 | ceph_assert(user.empty() == password.empty()); | |
522 | ||
523 | if (!user.empty() && !use_ssl) { | |
524 | ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl; | |
525 | return nullptr; | |
526 | } | |
527 | ||
528 | std::lock_guard lock(connections_lock); | |
529 | const auto it = connections.find(broker); | |
530 | // note that ssl vs. non-ssl connection to the same host are two separate conenctions | |
531 | if (it != connections.end()) { | |
9f95a23c TL |
532 | // connection found - return even if non-ok |
533 | ldout(cct, 20) << "Kafka connect: connection found" << dendl; | |
534 | return it->second; | |
535 | } | |
536 | ||
537 | // connection not found, creating a new one | |
538 | if (connection_count >= max_connections) { | |
539 | // TODO: increment counter | |
540 | ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl; | |
541 | return nullptr; | |
542 | } | |
543 | const auto conn = create_new_connection(broker, cct, use_ssl, verify_ssl, ca_location, user, password); | |
544 | // create_new_connection must always return a connection object | |
545 | // even if error occurred during creation. | |
546 | // in such a case the creation will be retried in the main thread | |
547 | ceph_assert(conn); | |
548 | ++connection_count; | |
549 | ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl; | |
550 | return connections.emplace(broker, conn).first->second; | |
551 | } | |
552 | ||
553 | // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack) | |
554 | int publish(connection_ptr_t& conn, | |
555 | const std::string& topic, | |
556 | const std::string& message) { | |
557 | if (stopped) { | |
558 | return STATUS_MANAGER_STOPPED; | |
559 | } | |
560 | if (!conn || !conn->is_ok()) { | |
561 | return STATUS_CONNECTION_CLOSED; | |
562 | } | |
563 | if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) { | |
564 | ++queued; | |
565 | return STATUS_OK; | |
566 | } | |
567 | return STATUS_QUEUE_FULL; | |
568 | } | |
569 | ||
570 | int publish_with_confirm(connection_ptr_t& conn, | |
571 | const std::string& topic, | |
572 | const std::string& message, | |
573 | reply_callback_t cb) { | |
574 | if (stopped) { | |
575 | return STATUS_MANAGER_STOPPED; | |
576 | } | |
577 | if (!conn || !conn->is_ok()) { | |
578 | return STATUS_CONNECTION_CLOSED; | |
579 | } | |
580 | if (messages.push(new message_wrapper_t(conn, topic, message, cb))) { | |
581 | ++queued; | |
582 | return STATUS_OK; | |
583 | } | |
584 | return STATUS_QUEUE_FULL; | |
585 | } | |
586 | ||
587 | // dtor wait for thread to stop | |
588 | // then connection are cleaned-up | |
589 | ~Manager() { | |
590 | stopped = true; | |
591 | runner.join(); | |
592 | messages.consume_all(delete_message); | |
593 | } | |
594 | ||
595 | // get the number of connections | |
596 | size_t get_connection_count() const { | |
597 | return connection_count; | |
598 | } | |
599 | ||
600 | // get the number of in-flight messages | |
601 | size_t get_inflight() const { | |
602 | size_t sum = 0; | |
603 | std::lock_guard lock(connections_lock); | |
604 | std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) { | |
605 | sum += conn_pair.second->callbacks.size(); | |
606 | }); | |
607 | return sum; | |
608 | } | |
609 | ||
610 | // running counter of the queued messages | |
611 | size_t get_queued() const { | |
612 | return queued; | |
613 | } | |
614 | ||
615 | // running counter of the dequeued messages | |
616 | size_t get_dequeued() const { | |
617 | return dequeued; | |
618 | } | |
619 | }; | |
620 | ||
// singleton manager
// note that the manager itself is not a singleton, and multiple instances may co-exist
// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
static Manager* s_manager = nullptr;

// defaults used when init() constructs the singleton
static const size_t MAX_CONNECTIONS_DEFAULT = 256;
static const size_t MAX_INFLIGHT_DEFAULT = 8192;
static const size_t MAX_QUEUE_DEFAULT = 8192;
static const int READ_TIMEOUT_MS_DEFAULT = 500;
630 | ||
631 | bool init(CephContext* cct) { | |
632 | if (s_manager) { | |
633 | return false; | |
634 | } | |
635 | // TODO: take conf from CephContext | |
636 | s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, READ_TIMEOUT_MS_DEFAULT, cct); | |
637 | return true; | |
638 | } | |
639 | ||
640 | void shutdown() { | |
641 | delete s_manager; | |
642 | s_manager = nullptr; | |
643 | } | |
644 | ||
645 | connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl, | |
646 | boost::optional<const std::string&> ca_location) { | |
647 | if (!s_manager) return nullptr; | |
648 | return s_manager->connect(url, use_ssl, verify_ssl, ca_location); | |
649 | } | |
650 | ||
651 | int publish(connection_ptr_t& conn, | |
652 | const std::string& topic, | |
653 | const std::string& message) { | |
654 | if (!s_manager) return STATUS_MANAGER_STOPPED; | |
655 | return s_manager->publish(conn, topic, message); | |
656 | } | |
657 | ||
658 | int publish_with_confirm(connection_ptr_t& conn, | |
659 | const std::string& topic, | |
660 | const std::string& message, | |
661 | reply_callback_t cb) { | |
662 | if (!s_manager) return STATUS_MANAGER_STOPPED; | |
663 | return s_manager->publish_with_confirm(conn, topic, message, cb); | |
664 | } | |
665 | ||
666 | size_t get_connection_count() { | |
667 | if (!s_manager) return 0; | |
668 | return s_manager->get_connection_count(); | |
669 | } | |
670 | ||
671 | size_t get_inflight() { | |
672 | if (!s_manager) return 0; | |
673 | return s_manager->get_inflight(); | |
674 | } | |
675 | ||
676 | size_t get_queued() { | |
677 | if (!s_manager) return 0; | |
678 | return s_manager->get_queued(); | |
679 | } | |
680 | ||
681 | size_t get_dequeued() { | |
682 | if (!s_manager) return 0; | |
683 | return s_manager->get_dequeued(); | |
684 | } | |
685 | ||
686 | size_t get_max_connections() { | |
687 | if (!s_manager) return MAX_CONNECTIONS_DEFAULT; | |
688 | return s_manager->max_connections; | |
689 | } | |
690 | ||
691 | size_t get_max_inflight() { | |
692 | if (!s_manager) return MAX_INFLIGHT_DEFAULT; | |
693 | return s_manager->max_inflight; | |
694 | } | |
695 | ||
696 | size_t get_max_queue() { | |
697 | if (!s_manager) return MAX_QUEUE_DEFAULT; | |
698 | return s_manager->max_queue; | |
699 | } | |
700 | ||
9f95a23c TL |
701 | } // namespace kafka |
702 |