]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_kafka.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_kafka.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "rgw_kafka.h"
5 #include "rgw_url.h"
6 #include <librdkafka/rdkafka.h>
7 #include "include/ceph_assert.h"
8 #include <sstream>
9 #include <cstring>
10 #include <unordered_map>
11 #include <string>
12 #include <vector>
13 #include <thread>
14 #include <atomic>
15 #include <mutex>
16 #include <boost/lockfree/queue.hpp>
17 #include "common/dout.h"
18
19 #define dout_subsys ceph_subsys_rgw
20
21 // TODO investigation, not necessarily issues:
22 // (1) in case of single threaded writer context use spsc_queue
23 // (2) check performance of emptying queue to local list, and go over the list and publish
24 // (3) use std::shared_mutex (c++17) or equivalent for the connections lock
25
26 // cmparisson operator between topic pointer and name
27 bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) {
28 return name == std::string_view(rd_kafka_topic_name(rkt));
29 }
30
31 namespace rgw::kafka {
32
// status codes for publishing
// TODO: use the actual error code (when conn exists) instead of STATUS_CONNECTION_CLOSED when replying to client
static const int STATUS_CONNECTION_CLOSED = -0x1002;  // connection missing, closed, or marked for deletion
static const int STATUS_QUEUE_FULL = -0x1003;         // lock-free message queue is full
static const int STATUS_MAX_INFLIGHT = -0x1004;       // too many messages are waiting for broker acks
static const int STATUS_MANAGER_STOPPED = -0x1005;    // the manager thread is no longer running
// status code for connection opening
static const int STATUS_CONF_ALLOC_FAILED = -0x2001;  // rd_kafka_conf_new() returned null

static const int STATUS_OK = 0x0;
43
44 // struct for holding the callback and its tag in the callback list
45 struct reply_callback_with_tag_t {
46 uint64_t tag;
47 reply_callback_t cb;
48
49 reply_callback_with_tag_t(uint64_t _tag, reply_callback_t _cb) : tag(_tag), cb(_cb) {}
50
51 bool operator==(uint64_t rhs) {
52 return tag == rhs;
53 }
54 };
55
// list of callbacks pending a broker ack, ordered by insertion (i.e. by delivery tag)
typedef std::vector<reply_callback_with_tag_t> CallbackList;
57
58 // struct for holding the connection state object as well as list of topics
59 // it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
60 // since references to deleted objects may still exist in the calling code
61 struct connection_t {
62 rd_kafka_t* producer = nullptr;
63 rd_kafka_conf_t* temp_conf = nullptr;
64 std::vector<rd_kafka_topic_t*> topics;
65 bool marked_for_deletion = false;
66 uint64_t delivery_tag = 1;
67 int status;
68 mutable std::atomic<int> ref_count = 0;
69 CephContext* const cct;
70 CallbackList callbacks;
71 const std::string broker;
72 const bool use_ssl;
73 const bool verify_ssl; // TODO currently iognored, not supported in librdkafka v0.11.6
74 const boost::optional<std::string> ca_location;
75 const std::string user;
76 const std::string password;
77
78 // cleanup of all internal connection resource
79 // the object can still remain, and internal connection
80 // resources created again on successful reconnection
81 void destroy(int s) {
82 status = s;
83 // destroy temporary conf (if connection was never established)
84 if (temp_conf) {
85 rd_kafka_conf_destroy(temp_conf);
86 return;
87 }
88 // wait for all remaining acks/nacks
89 rd_kafka_flush(producer, 5*1000 /* wait for max 5 seconds */);
90 // destroy all topics
91 std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);});
92 // destroy producer
93 rd_kafka_destroy(producer);
94 // fire all remaining callbacks (if not fired by rd_kafka_flush)
95 std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
96 cb_tag.cb(status);
97 ldout(cct, 20) << "Kafka destroy: invoking callback with tag=" << cb_tag.tag << dendl;
98 });
99 callbacks.clear();
100 delivery_tag = 1;
101 }
102
103 bool is_ok() const {
104 return (producer != nullptr && !marked_for_deletion);
105 }
106
107 // ctor for setting immutable values
108 connection_t(CephContext* _cct, const std::string& _broker, bool _use_ssl, bool _verify_ssl,
109 const boost::optional<const std::string&>& _ca_location,
110 const std::string& _user, const std::string& _password) :
111 cct(_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password) {}
112
113 // dtor also destroys the internals
114 ~connection_t() {
115 destroy(STATUS_CONNECTION_CLOSED);
116 }
117
118 friend void intrusive_ptr_add_ref(const connection_t* p);
119 friend void intrusive_ptr_release(const connection_t* p);
120 };
121
122 std::string to_string(const connection_ptr_t& conn) {
123 std::string str;
124 str += "\nBroker: " + conn->broker;
125 str += conn->use_ssl ? "\nUse SSL" : "";
126 str += conn->ca_location ? "\nCA Location: " + *(conn->ca_location) : "";
127 return str;
128 }
// these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
// increment the atomic reference count (called when an intrusive_ptr is copied/created)
void intrusive_ptr_add_ref(const connection_t* p) {
  ++p->ref_count;
}
// decrement the atomic reference count and delete the connection when it drops to zero
void intrusive_ptr_release(const connection_t* p) {
  if (--p->ref_count == 0) {
    delete p;
  }
}
138
139 // convert int status to string - including RGW specific values
140 std::string status_to_string(int s) {
141 switch (s) {
142 case STATUS_OK:
143 return "STATUS_OK";
144 case STATUS_CONNECTION_CLOSED:
145 return "RGW_KAFKA_STATUS_CONNECTION_CLOSED";
146 case STATUS_QUEUE_FULL:
147 return "RGW_KAFKA_STATUS_QUEUE_FULL";
148 case STATUS_MAX_INFLIGHT:
149 return "RGW_KAFKA_STATUS_MAX_INFLIGHT";
150 case STATUS_MANAGER_STOPPED:
151 return "RGW_KAFKA_STATUS_MANAGER_STOPPED";
152 case STATUS_CONF_ALLOC_FAILED:
153 return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED";
154 }
155 return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s));
156 }
157
// global librdkafka delivery-report callback, invoked from rd_kafka_poll()
// on the runner thread. 'opaque' is the connection_t set via
// rd_kafka_conf_set_opaque(); the per-message opaque (_private) is the
// heap-allocated delivery tag, or null when no client callback was requested
void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) {
  ceph_assert(opaque);

  const auto conn = reinterpret_cast<connection_t*>(opaque);
  const auto result = rkmessage->err;

  if (!rkmessage->_private) {
    // message was published without a callback - nothing to invoke
    ldout(conn->cct, 20) << "Kafka run: n/ack received, (no callback) with result=" << result << dendl;
    return;
  }

  const auto tag = reinterpret_cast<uint64_t*>(rkmessage->_private);
  // NOTE(review): callbacks list is accessed without a lock - presumably safe
  // because both producer and poller run on the manager thread; confirm
  const auto& callbacks_end = conn->callbacks.end();
  const auto& callbacks_begin = conn->callbacks.begin();
  const auto tag_it = std::find(callbacks_begin, callbacks_end, *tag);
  if (tag_it != callbacks_end) {
    ldout(conn->cct, 20) << "Kafka run: n/ack received, invoking callback with tag=" <<
      *tag << " and result=" << rd_kafka_err2str(result) << dendl;
    tag_it->cb(result);
    conn->callbacks.erase(tag_it);
  } else {
    // TODO add counter for acks with no callback
    ldout(conn->cct, 10) << "Kafka run: unsolicited n/ack received with tag=" <<
      *tag << dendl;
  }
  // tag was allocated in publish_internal(); the ack owns and frees it
  delete tag;
  // rkmessage is destroyed automatically by librdkafka
}
186
// utility function to create a connection, when the connection object already exists.
// (re)builds the librdkafka conf and producer; on any failure conn->status is set
// to the specific error and the same (non-ok) connection object is returned so
// the runner thread can retry later
connection_ptr_t& create_connection(connection_ptr_t& conn) {
  // pointer must be valid and not marked for deletion
  ceph_assert(conn && !conn->marked_for_deletion);

  // reset all status codes
  conn->status = STATUS_OK;
  char errstr[512] = {0};

  conn->temp_conf = rd_kafka_conf_new();
  if (!conn->temp_conf) {
    conn->status = STATUS_CONF_ALLOC_FAILED;
    return conn;
  }

  // get list of brokers based on the bootstrap broker
  if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;

  if (conn->use_ssl) {
    if (!conn->user.empty()) {
      // use SSL+SASL
      if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
              rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
              rd_kafka_conf_set(conn->temp_conf, "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
              rd_kafka_conf_set(conn->temp_conf, "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
      ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL+SASL security" << dendl;
    } else {
      // use only SSL
      if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
      ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL security" << dendl;
    }
    if (conn->ca_location) {
      if (rd_kafka_conf_set(conn->temp_conf, "ssl.ca.location", conn->ca_location->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
      ldout(conn->cct, 20) << "Kafka connect: successfully configured CA location" << dendl;
    } else {
      ldout(conn->cct, 20) << "Kafka connect: using default CA location" << dendl;
    }
    // Note: when librdkafka.1.0 is available the following line could be uncommented instead of the callback setting call
    // if (rd_kafka_conf_set(conn->temp_conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;

    ldout(conn->cct, 20) << "Kafka connect: successfully configured security" << dendl;
  }

  // set the global callback for delivery success/fail
  rd_kafka_conf_set_dr_msg_cb(conn->temp_conf, message_callback);

  // set the global opaque pointer to be the connection itself
  rd_kafka_conf_set_opaque(conn->temp_conf, conn.get());

  // create the producer
  conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr));
  if (!conn->producer) {
    conn->status = rd_kafka_last_error();
    ldout(conn->cct, 1) << "Kafka connect: failed to create producer: " << errstr << dendl;
    return conn;
  }
  ldout(conn->cct, 20) << "Kafka connect: successfully created new producer" << dendl;

  // conf ownership passed to producer
  conn->temp_conf = nullptr;
  return conn;

conf_error:
  // rd_kafka_conf_set() failed; temp_conf is kept and freed later by destroy()
  conn->status = rd_kafka_last_error();
  ldout(conn->cct, 1) << "Kafka connect: configuration failed: " << errstr << dendl;
  return conn;
}
254
// utility function to create a new connection:
// allocates the connection state object and then tries to establish
// the actual broker connection via create_connection()
connection_ptr_t create_new_connection(const std::string& broker, CephContext* cct,
        bool use_ssl,
        bool verify_ssl,
        boost::optional<const std::string&> ca_location,
        const std::string& user,
        const std::string& password) {
  // create connection state
  connection_ptr_t conn(new connection_t(cct, broker, use_ssl, verify_ssl, ca_location, user, password));
  return create_connection(conn);
}
266
/// struct used for holding messages in the message queue.
/// owned by raw pointer while in the lock-free queue; ownership is taken
/// by publish_internal() (or by delete_message() at manager destruction)
struct message_wrapper_t {
  connection_ptr_t conn;   // keeps the connection alive while queued
  std::string topic;       // destination topic name
  std::string message;     // payload (copied by librdkafka on produce)
  reply_callback_t cb;     // optional delivery callback (may be null)

  message_wrapper_t(connection_ptr_t& _conn,
          const std::string& _topic,
          const std::string& _message,
          reply_callback_t _cb) : conn(_conn), topic(_topic), message(_message), cb(_cb) {}
};
279
// broker name -> connection state
typedef std::unordered_map<std::string, connection_ptr_t> ConnectionList;
// fixed-size lock-free queue of messages pending publish
typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;

// macros used inside a loop where an iterator is either incremented or erased
#define INCREMENT_AND_CONTINUE(IT) \
  ++IT; \
  continue;

// note: relies on a variable named 'connection_count' being in scope at the call site
#define ERASE_AND_CONTINUE(IT,CONTAINER) \
  IT=CONTAINER.erase(IT); \
  --connection_count; \
  continue;
292
293 class Manager {
294 public:
295 const size_t max_connections;
296 const size_t max_inflight;
297 const size_t max_queue;
298 private:
299 std::atomic<size_t> connection_count;
300 bool stopped;
301 int read_timeout_ms;
302 ConnectionList connections;
303 MessageQueue messages;
304 std::atomic<size_t> queued;
305 std::atomic<size_t> dequeued;
306 CephContext* const cct;
307 mutable std::mutex connections_lock;
308 std::thread runner;
309
310 // TODO use rd_kafka_produce_batch for better performance
311 void publish_internal(message_wrapper_t* message) {
312 const std::unique_ptr<message_wrapper_t> msg_owner(message);
313 auto& conn = message->conn;
314
315 if (!conn->is_ok()) {
316 // connection had an issue while message was in the queue
317 // TODO add error stats
318 ldout(conn->cct, 1) << "Kafka publish: connection had an issue while message was in the queue. error: " << status_to_string(conn->status) << dendl;
319 if (message->cb) {
320 message->cb(conn->status);
321 }
322 return;
323 }
324
325 // create a new topic unless it was already created
326 auto topic_it = std::find(conn->topics.begin(), conn->topics.end(), message->topic);
327 rd_kafka_topic_t* topic = nullptr;
328 if (topic_it == conn->topics.end()) {
329 topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr);
330 if (!topic) {
331 const auto err = rd_kafka_last_error();
332 ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " << status_to_string(err) << dendl;
333 if (message->cb) {
334 message->cb(err);
335 }
336 conn->destroy(err);
337 return;
338 }
339 // TODO use the topics list as an LRU cache
340 conn->topics.push_back(topic);
341 ldout(conn->cct, 20) << "Kafka publish: successfully created topic: " << message->topic << dendl;
342 } else {
343 topic = *topic_it;
344 ldout(conn->cct, 20) << "Kafka publish: reused existing topic: " << message->topic << dendl;
345 }
346
347 const auto tag = (message->cb == nullptr ? nullptr : new uint64_t(conn->delivery_tag++));
348 const auto rc = rd_kafka_produce(
349 topic,
350 // TODO: non builtin partitioning
351 RD_KAFKA_PARTITION_UA,
352 // make a copy of the payload
353 // so it is safe to pass the pointer from the string
354 RD_KAFKA_MSG_F_COPY,
355 message->message.data(),
356 message->message.length(),
357 // optional key and its length
358 nullptr,
359 0,
360 // opaque data: tag, used in the global callback
361 // in order to invoke the real callback
362 // null if no callback exists
363 tag);
364 if (rc == -1) {
365 const auto err = rd_kafka_last_error();
366 ldout(conn->cct, 10) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl;
367 // TODO: dont error on full queue, and don't destroy connection, retry instead
368 // immediatly invoke callback on error if needed
369 if (message->cb) {
370 message->cb(err);
371 }
372 conn->destroy(err);
373 delete tag;
374 }
375
376 if (tag) {
377 auto const q_len = conn->callbacks.size();
378 if (q_len < max_inflight) {
379 ldout(conn->cct, 20) << "Kafka publish (with callback, tag=" << *tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
380 conn->callbacks.emplace_back(*tag, message->cb);
381 } else {
382 // immediately invoke callback with error - this is not a connection error
383 ldout(conn->cct, 1) << "Kafka publish (with callback): failed with error: callback queue full" << dendl;
384 message->cb(STATUS_MAX_INFLIGHT);
385 // tag will be deleted when the global callback is invoked
386 }
387 } else {
388 ldout(conn->cct, 20) << "Kafka publish (no callback): OK" << dendl;
389 }
390 }
391
392 // the managers thread:
393 // (1) empty the queue of messages to be published
394 // (2) loop over all connections and read acks
395 // (3) manages deleted connections
396 // (4) TODO reconnect on connection errors
397 // (5) TODO cleanup timedout callbacks
398 void run() {
399 while (!stopped) {
400
401 // publish all messages in the queue
402 auto reply_count = 0U;
403 const auto send_count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1));
404 dequeued += send_count;
405 ConnectionList::iterator conn_it;
406 ConnectionList::const_iterator end_it;
407 {
408 // thread safe access to the connection list
409 // once the iterators are fetched they are guaranteed to remain valid
410 std::lock_guard lock(connections_lock);
411 conn_it = connections.begin();
412 end_it = connections.end();
413 }
414 // loop over all connections to read acks
415 for (;conn_it != end_it;) {
416
417 auto& conn = conn_it->second;
418 // delete the connection if marked for deletion
419 if (conn->marked_for_deletion) {
420 ldout(conn->cct, 10) << "Kafka run: connection is deleted" << dendl;
421 conn->destroy(STATUS_CONNECTION_CLOSED);
422 std::lock_guard lock(connections_lock);
423 // erase is safe - does not invalidate any other iterator
424 // lock so no insertion happens at the same time
425 ERASE_AND_CONTINUE(conn_it, connections);
426 }
427
428 // try to reconnect the connection if it has an error
429 if (!conn->is_ok()) {
430 ldout(conn->cct, 10) << "Kafka run: connection status is: " << status_to_string(conn->status) << dendl;
431 const auto& broker = conn_it->first;
432 ldout(conn->cct, 20) << "Kafka run: retry connection" << dendl;
433 if (create_connection(conn)->is_ok() == false) {
434 ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry failed" << dendl;
435 // TODO: add error counter for failed retries
436 // TODO: add exponential backoff for retries
437 } else {
438 ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry successfull" << dendl;
439 }
440 INCREMENT_AND_CONTINUE(conn_it);
441 }
442
443 reply_count += rd_kafka_poll(conn->producer, read_timeout_ms);
444
445 // just increment the iterator
446 ++conn_it;
447 }
448 // if no messages were received or published
449 // across all connection, sleep for 100ms
450 if (send_count == 0 && reply_count == 0) {
451 std::this_thread::sleep_for(std::chrono::milliseconds(100));
452 }
453 }
454 }
455
456 // used in the dtor for message cleanup
457 static void delete_message(const message_wrapper_t* message) {
458 delete message;
459 }
460
461 public:
462 Manager(size_t _max_connections,
463 size_t _max_inflight,
464 size_t _max_queue,
465 int _read_timeout_ms,
466 CephContext* _cct) :
467 max_connections(_max_connections),
468 max_inflight(_max_inflight),
469 max_queue(_max_queue),
470 connection_count(0),
471 stopped(false),
472 read_timeout_ms(_read_timeout_ms),
473 connections(_max_connections),
474 messages(max_queue),
475 queued(0),
476 dequeued(0),
477 cct(_cct),
478 runner(&Manager::run, this) {
479 // The hashmap has "max connections" as the initial number of buckets,
480 // and allows for 10 collisions per bucket before rehash.
481 // This is to prevent rehashing so that iterators are not invalidated
482 // when a new connection is added.
483 connections.max_load_factor(10.0);
484 // give the runner thread a name for easier debugging
485 const auto rc = ceph_pthread_setname(runner.native_handle(), "kafka_manager");
486 ceph_assert(rc==0);
487 }
488
489 // non copyable
490 Manager(const Manager&) = delete;
491 const Manager& operator=(const Manager&) = delete;
492
493 // stop the main thread
494 void stop() {
495 stopped = true;
496 }
497
498 // disconnect from a broker
499 bool disconnect(connection_ptr_t& conn) {
500 if (!conn || stopped) {
501 return false;
502 }
503 conn->marked_for_deletion = true;
504 return true;
505 }
506
507 // connect to a broker, or reuse an existing connection if already connected
508 connection_ptr_t connect(const std::string& url,
509 bool use_ssl,
510 bool verify_ssl,
511 boost::optional<const std::string&> ca_location) {
512 if (stopped) {
513 // TODO: increment counter
514 ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
515 return nullptr;
516 }
517
518 std::string broker;
519 std::string user;
520 std::string password;
521 if (!parse_url_authority(url, broker, user, password)) {
522 // TODO: increment counter
523 ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
524 return nullptr;
525 }
526
527 // this should be validated by the regex in parse_url()
528 ceph_assert(user.empty() == password.empty());
529
530 if (!user.empty() && !use_ssl) {
531 ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl;
532 return nullptr;
533 }
534
535 std::lock_guard lock(connections_lock);
536 const auto it = connections.find(broker);
537 // note that ssl vs. non-ssl connection to the same host are two separate conenctions
538 if (it != connections.end()) {
539 if (it->second->marked_for_deletion) {
540 // TODO: increment counter
541 ldout(cct, 1) << "Kafka connect: endpoint marked for deletion" << dendl;
542 return nullptr;
543 }
544 // connection found - return even if non-ok
545 ldout(cct, 20) << "Kafka connect: connection found" << dendl;
546 return it->second;
547 }
548
549 // connection not found, creating a new one
550 if (connection_count >= max_connections) {
551 // TODO: increment counter
552 ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl;
553 return nullptr;
554 }
555 const auto conn = create_new_connection(broker, cct, use_ssl, verify_ssl, ca_location, user, password);
556 // create_new_connection must always return a connection object
557 // even if error occurred during creation.
558 // in such a case the creation will be retried in the main thread
559 ceph_assert(conn);
560 ++connection_count;
561 ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl;
562 return connections.emplace(broker, conn).first->second;
563 }
564
565 // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
566 int publish(connection_ptr_t& conn,
567 const std::string& topic,
568 const std::string& message) {
569 if (stopped) {
570 return STATUS_MANAGER_STOPPED;
571 }
572 if (!conn || !conn->is_ok()) {
573 return STATUS_CONNECTION_CLOSED;
574 }
575 if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) {
576 ++queued;
577 return STATUS_OK;
578 }
579 return STATUS_QUEUE_FULL;
580 }
581
582 int publish_with_confirm(connection_ptr_t& conn,
583 const std::string& topic,
584 const std::string& message,
585 reply_callback_t cb) {
586 if (stopped) {
587 return STATUS_MANAGER_STOPPED;
588 }
589 if (!conn || !conn->is_ok()) {
590 return STATUS_CONNECTION_CLOSED;
591 }
592 if (messages.push(new message_wrapper_t(conn, topic, message, cb))) {
593 ++queued;
594 return STATUS_OK;
595 }
596 return STATUS_QUEUE_FULL;
597 }
598
599 // dtor wait for thread to stop
600 // then connection are cleaned-up
601 ~Manager() {
602 stopped = true;
603 runner.join();
604 messages.consume_all(delete_message);
605 }
606
607 // get the number of connections
608 size_t get_connection_count() const {
609 return connection_count;
610 }
611
612 // get the number of in-flight messages
613 size_t get_inflight() const {
614 size_t sum = 0;
615 std::lock_guard lock(connections_lock);
616 std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) {
617 sum += conn_pair.second->callbacks.size();
618 });
619 return sum;
620 }
621
622 // running counter of the queued messages
623 size_t get_queued() const {
624 return queued;
625 }
626
627 // running counter of the dequeued messages
628 size_t get_dequeued() const {
629 return dequeued;
630 }
631 };
632
// singleton manager
// note that the manager itself is not a singleton, and multiple instances may co-exist
// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
static Manager* s_manager = nullptr;

// defaults used until configuration is taken from CephContext (see init())
static const size_t MAX_CONNECTIONS_DEFAULT = 256;
static const size_t MAX_INFLIGHT_DEFAULT = 8192;
static const size_t MAX_QUEUE_DEFAULT = 8192;
static const int READ_TIMEOUT_MS_DEFAULT = 500;
642
643 bool init(CephContext* cct) {
644 if (s_manager) {
645 return false;
646 }
647 // TODO: take conf from CephContext
648 s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, READ_TIMEOUT_MS_DEFAULT, cct);
649 return true;
650 }
651
// destroy the singleton manager: the Manager dtor joins the runner
// thread and drains any leftover messages
void shutdown() {
  delete s_manager;
  s_manager = nullptr;
}
656
657 connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl,
658 boost::optional<const std::string&> ca_location) {
659 if (!s_manager) return nullptr;
660 return s_manager->connect(url, use_ssl, verify_ssl, ca_location);
661 }
662
663 int publish(connection_ptr_t& conn,
664 const std::string& topic,
665 const std::string& message) {
666 if (!s_manager) return STATUS_MANAGER_STOPPED;
667 return s_manager->publish(conn, topic, message);
668 }
669
670 int publish_with_confirm(connection_ptr_t& conn,
671 const std::string& topic,
672 const std::string& message,
673 reply_callback_t cb) {
674 if (!s_manager) return STATUS_MANAGER_STOPPED;
675 return s_manager->publish_with_confirm(conn, topic, message, cb);
676 }
677
678 size_t get_connection_count() {
679 if (!s_manager) return 0;
680 return s_manager->get_connection_count();
681 }
682
683 size_t get_inflight() {
684 if (!s_manager) return 0;
685 return s_manager->get_inflight();
686 }
687
688 size_t get_queued() {
689 if (!s_manager) return 0;
690 return s_manager->get_queued();
691 }
692
693 size_t get_dequeued() {
694 if (!s_manager) return 0;
695 return s_manager->get_dequeued();
696 }
697
698 size_t get_max_connections() {
699 if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
700 return s_manager->max_connections;
701 }
702
703 size_t get_max_inflight() {
704 if (!s_manager) return MAX_INFLIGHT_DEFAULT;
705 return s_manager->max_inflight;
706 }
707
708 size_t get_max_queue() {
709 if (!s_manager) return MAX_QUEUE_DEFAULT;
710 return s_manager->max_queue;
711 }
712
713 bool disconnect(connection_ptr_t& conn) {
714 if (!s_manager) return false;
715 return s_manager->disconnect(conn);
716 }
717
718 } // namespace kafka
719