// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#include "rgw_kafka.h"
#include "rgw_url.h"
#include <librdkafka/rdkafka.h>
#include "include/ceph_assert.h"
#include <sstream>
#include <cstring>
#include <unordered_map>
#include <string>
#include <string_view>
#include <algorithm>
#include <functional>
#include <vector>
#include <thread>
#include <atomic>
#include <mutex>
#include <boost/lockfree/queue.hpp>
#include "common/dout.h"

#define dout_subsys ceph_subsys_rgw

// TODO investigation, not necessarily issues:
// (1) in case of a single threaded writer context use spsc_queue
// (2) check the performance of emptying the queue into a local list, then going over the list and publishing
// (3) use std::shared_mutex (c++17) or equivalent for the connections lock

// comparison operator between topic pointer and name
bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) {
  return name == std::string_view(rd_kafka_topic_name(rkt));
}

namespace rgw::kafka {

// status codes for publishing
// TODO: use the actual error code (when conn exists) instead of STATUS_CONNECTION_CLOSED when replying to the client
static const int STATUS_CONNECTION_CLOSED = -0x1002;
static const int STATUS_QUEUE_FULL = -0x1003;
static const int STATUS_MAX_INFLIGHT = -0x1004;
static const int STATUS_MANAGER_STOPPED = -0x1005;
// status code for connection opening
static const int STATUS_CONF_ALLOC_FAILED = -0x2001;

static const int STATUS_OK = 0x0;
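// note: these values are presumably chosen outside the range used by
// librdkafka's rd_kafka_resp_err_t, so that status_to_string() below can tell
// the RGW-specific statuses apart from errors reported by the library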

// struct for holding the callback and its tag in the callback list
struct reply_callback_with_tag_t {
  uint64_t tag;
  reply_callback_t cb;

  reply_callback_with_tag_t(uint64_t _tag, reply_callback_t _cb) : tag(_tag), cb(_cb) {}

  bool operator==(uint64_t rhs) {
    return tag == rhs;
  }
};

typedef std::vector<reply_callback_with_tag_t> CallbackList;
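// pending delivery callbacks are looked up linearly by their delivery tag
// (via the operator== above) when a delivery report arrives from librdkafka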

// struct for holding the connection state object as well as the list of topics
// it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
// since references to deleted objects may still exist in the calling code
struct connection_t {
  rd_kafka_t* producer = nullptr;
  rd_kafka_conf_t* temp_conf = nullptr;
  std::vector<rd_kafka_topic_t*> topics;
  uint64_t delivery_tag = 1;
  int status = STATUS_OK;
  mutable std::atomic<int> ref_count = 0;
  CephContext* const cct;
  CallbackList callbacks;
  const std::string broker;
  const bool use_ssl;
  const bool verify_ssl; // TODO currently ignored, not supported in librdkafka v0.11.6
  const boost::optional<std::string> ca_location;
  const std::string user;
  const std::string password;
  utime_t timestamp = ceph_clock_now();

  // cleanup of all internal connection resources
  // the object itself may remain, and the internal connection
  // resources can be created again on successful reconnection
  void destroy(int s) {
    status = s;
    // destroy temporary conf (if connection was never established)
    if (temp_conf) {
      rd_kafka_conf_destroy(temp_conf);
      return;
    }
    // wait for all remaining acks/nacks
    rd_kafka_flush(producer, 5*1000 /* wait for max 5 seconds */);
    // destroy all topics
    std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);});
    // destroy producer
    rd_kafka_destroy(producer);
    // fire all remaining callbacks (if not fired by rd_kafka_flush)
    std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
        cb_tag.cb(status);
        ldout(cct, 20) << "Kafka destroy: invoking callback with tag=" << cb_tag.tag << dendl;
      });
    callbacks.clear();
    delivery_tag = 1;
  }

  bool is_ok() const {
    return (producer != nullptr);
  }

  // ctor for setting immutable values
  connection_t(CephContext* _cct, const std::string& _broker, bool _use_ssl, bool _verify_ssl,
      const boost::optional<const std::string&>& _ca_location,
      const std::string& _user, const std::string& _password) :
    cct(_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password) {}

  // dtor also destroys the internals
  ~connection_t() {
    destroy(STATUS_CONNECTION_CLOSED);
  }

  friend void intrusive_ptr_add_ref(const connection_t* p);
  friend void intrusive_ptr_release(const connection_t* p);
};

std::string to_string(const connection_ptr_t& conn) {
  std::string str;
  str += "\nBroker: " + conn->broker;
  str += conn->use_ssl ? "\nUse SSL" : "";
  str += conn->ca_location ? "\nCA Location: " + *(conn->ca_location) : "";
  return str;
}
// these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
void intrusive_ptr_add_ref(const connection_t* p) {
  ++p->ref_count;
}
void intrusive_ptr_release(const connection_t* p) {
  if (--p->ref_count == 0) {
    delete p;
  }
}

// convert int status to string - including RGW specific values
std::string status_to_string(int s) {
  switch (s) {
    case STATUS_OK:
      return "STATUS_OK";
    case STATUS_CONNECTION_CLOSED:
      return "RGW_KAFKA_STATUS_CONNECTION_CLOSED";
    case STATUS_QUEUE_FULL:
      return "RGW_KAFKA_STATUS_QUEUE_FULL";
    case STATUS_MAX_INFLIGHT:
      return "RGW_KAFKA_STATUS_MAX_INFLIGHT";
    case STATUS_MANAGER_STOPPED:
      return "RGW_KAFKA_STATUS_MANAGER_STOPPED";
    case STATUS_CONF_ALLOC_FAILED:
      return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED";
  }
  return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s));
}

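// delivery report callback: librdkafka invokes this from within rd_kafka_poll()
// (called by the manager's thread below) or rd_kafka_flush() once a message is
// acked by the broker or has permanently failed; it is registered on the
// configuration via rd_kafka_conf_set_dr_msg_cb() in create_connection()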
void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) {
  ceph_assert(opaque);

  const auto conn = reinterpret_cast<connection_t*>(opaque);
  const auto result = rkmessage->err;

  if (!rkmessage->_private) {
    ldout(conn->cct, 20) << "Kafka run: n/ack received, (no callback) with result=" << result << dendl;
    return;
  }

  const auto tag = reinterpret_cast<uint64_t*>(rkmessage->_private);
  const auto& callbacks_end = conn->callbacks.end();
  const auto& callbacks_begin = conn->callbacks.begin();
  const auto tag_it = std::find(callbacks_begin, callbacks_end, *tag);
  if (tag_it != callbacks_end) {
    ldout(conn->cct, 20) << "Kafka run: n/ack received, invoking callback with tag=" <<
      *tag << " and result=" << rd_kafka_err2str(result) << dendl;
    tag_it->cb(result);
    conn->callbacks.erase(tag_it);
  } else {
    // TODO add counter for acks with no callback
    ldout(conn->cct, 10) << "Kafka run: unsolicited n/ack received with tag=" <<
      *tag << dendl;
  }
  delete tag;
  // rkmessage is destroyed automatically by librdkafka
}

// utility function to create a connection, when the connection object already exists
connection_ptr_t& create_connection(connection_ptr_t& conn) {
  // pointer must be valid and not marked for deletion
  ceph_assert(conn);

  // reset all status codes
  conn->status = STATUS_OK;
  char errstr[512] = {0};

  conn->temp_conf = rd_kafka_conf_new();
  if (!conn->temp_conf) {
    conn->status = STATUS_CONF_ALLOC_FAILED;
    return conn;
  }

  // get list of brokers based on the bootstrap broker
  if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;

  if (conn->use_ssl) {
    if (!conn->user.empty()) {
      // use SSL+SASL
      if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
          rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
          rd_kafka_conf_set(conn->temp_conf, "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
          rd_kafka_conf_set(conn->temp_conf, "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
      ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL+SASL security" << dendl;
    } else {
      // use only SSL
      if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
      ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL security" << dendl;
    }
    if (conn->ca_location) {
      if (rd_kafka_conf_set(conn->temp_conf, "ssl.ca.location", conn->ca_location->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
      ldout(conn->cct, 20) << "Kafka connect: successfully configured CA location" << dendl;
    } else {
      ldout(conn->cct, 20) << "Kafka connect: using default CA location" << dendl;
    }
    // Note: when librdkafka 1.0 is available the following line could be uncommented instead of the callback setting call
    // if (rd_kafka_conf_set(conn->temp_conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;

    ldout(conn->cct, 20) << "Kafka connect: successfully configured security" << dendl;
  }

  // set the global callback for delivery success/fail
  rd_kafka_conf_set_dr_msg_cb(conn->temp_conf, message_callback);

  // set the global opaque pointer to be the connection itself
  rd_kafka_conf_set_opaque(conn->temp_conf, conn.get());

  // create the producer
  conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr));
  if (!conn->producer) {
    conn->status = rd_kafka_last_error();
    ldout(conn->cct, 1) << "Kafka connect: failed to create producer: " << errstr << dendl;
    return conn;
  }
  ldout(conn->cct, 20) << "Kafka connect: successfully created new producer" << dendl;

  // conf ownership passed to producer
  conn->temp_conf = nullptr;
  return conn;

conf_error:
  conn->status = rd_kafka_last_error();
  ldout(conn->cct, 1) << "Kafka connect: configuration failed: " << errstr << dendl;
  return conn;
}

// utility function to create a new connection
connection_ptr_t create_new_connection(const std::string& broker, CephContext* cct,
    bool use_ssl,
    bool verify_ssl,
    boost::optional<const std::string&> ca_location,
    const std::string& user,
    const std::string& password) {
  // create connection state
  connection_ptr_t conn(new connection_t(cct, broker, use_ssl, verify_ssl, ca_location, user, password));
  return create_connection(conn);
}

/// struct used for holding messages in the message queue
struct message_wrapper_t {
  connection_ptr_t conn;
  std::string topic;
  std::string message;
  reply_callback_t cb;

  message_wrapper_t(connection_ptr_t& _conn,
      const std::string& _topic,
      const std::string& _message,
      reply_callback_t _cb) : conn(_conn), topic(_topic), message(_message), cb(_cb) {}
};

typedef std::unordered_map<std::string, connection_ptr_t> ConnectionList;
typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
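// messages are pushed onto the lock-free queue by the publishing threads
// (typically the RGW request threads) and drained in bulk by the manager's
// thread via consume_all(); ownership of the heap-allocated wrapper passes to
// publish_internal(), which deletes it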

// macros used inside a loop where an iterator is either incremented or erased
#define INCREMENT_AND_CONTINUE(IT) \
  ++IT; \
  continue;

#define ERASE_AND_CONTINUE(IT,CONTAINER) \
  IT=CONTAINER.erase(IT); \
  --connection_count; \
  continue;
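// note that ERASE_AND_CONTINUE also decrements connection_count, so it can only
// be used inside Manager member functions where that counter is in scope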

class Manager {
public:
  const size_t max_connections;
  const size_t max_inflight;
  const size_t max_queue;
  const size_t max_idle_time;
private:
  std::atomic<size_t> connection_count;
  bool stopped;
  int read_timeout_ms;
  ConnectionList connections;
  MessageQueue messages;
  std::atomic<size_t> queued;
  std::atomic<size_t> dequeued;
  CephContext* const cct;
  mutable std::mutex connections_lock;
  std::thread runner;

  // TODO use rd_kafka_produce_batch for better performance
  void publish_internal(message_wrapper_t* message) {
    const std::unique_ptr<message_wrapper_t> msg_owner(message);
    auto& conn = message->conn;

    conn->timestamp = ceph_clock_now();

    if (!conn->is_ok()) {
      // connection had an issue while the message was in the queue
      // TODO add error stats
      ldout(conn->cct, 1) << "Kafka publish: connection had an issue while message was in the queue. error: " << status_to_string(conn->status) << dendl;
      if (message->cb) {
        message->cb(conn->status);
      }
      return;
    }

    // create a new topic unless it was already created
    auto topic_it = std::find(conn->topics.begin(), conn->topics.end(), message->topic);
    rd_kafka_topic_t* topic = nullptr;
    if (topic_it == conn->topics.end()) {
      topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr);
      if (!topic) {
        const auto err = rd_kafka_last_error();
        ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " << status_to_string(err) << dendl;
        if (message->cb) {
          message->cb(err);
        }
        conn->destroy(err);
        return;
      }
      // TODO use the topics list as an LRU cache
      conn->topics.push_back(topic);
      ldout(conn->cct, 20) << "Kafka publish: successfully created topic: " << message->topic << dendl;
    } else {
      topic = *topic_it;
      ldout(conn->cct, 20) << "Kafka publish: reused existing topic: " << message->topic << dendl;
    }

    const auto tag = (message->cb == nullptr ? nullptr : new uint64_t(conn->delivery_tag++));
    const auto rc = rd_kafka_produce(
            topic,
            // TODO: non-builtin partitioning
            RD_KAFKA_PARTITION_UA,
            // make a copy of the payload
            // so it is safe to pass the pointer from the string
            RD_KAFKA_MSG_F_COPY,
            message->message.data(),
            message->message.length(),
            // optional key and its length
            nullptr,
            0,
            // opaque data: tag, used in the global callback
            // in order to invoke the real callback
            // null if no callback exists
            tag);
    if (rc == -1) {
      const auto err = rd_kafka_last_error();
      ldout(conn->cct, 10) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl;
      // TODO: don't error on a full queue, and don't destroy the connection; retry instead
      // immediately invoke the callback on error if needed
      if (message->cb) {
        message->cb(err);
      }
      conn->destroy(err);
      delete tag;
      return;
    }

    if (tag) {
      auto const q_len = conn->callbacks.size();
      if (q_len < max_inflight) {
        ldout(conn->cct, 20) << "Kafka publish (with callback, tag=" << *tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
        conn->callbacks.emplace_back(*tag, message->cb);
      } else {
        // immediately invoke the callback with an error - this is not a connection error
        ldout(conn->cct, 1) << "Kafka publish (with callback): failed with error: callback queue full" << dendl;
        message->cb(STATUS_MAX_INFLIGHT);
        // tag will be deleted when the global callback is invoked
      }
    } else {
      ldout(conn->cct, 20) << "Kafka publish (no callback): OK" << dendl;
    }
  }

  // the manager's thread:
  // (1) empty the queue of messages to be published
  // (2) loop over all connections and read acks
  // (3) manage deleted connections
  // (4) TODO reconnect on connection errors
  // (5) TODO cleanup timed-out callbacks
  void run() noexcept {
    while (!stopped) {

      // publish all messages in the queue
      auto reply_count = 0U;
      const auto send_count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1));
      dequeued += send_count;
      ConnectionList::iterator conn_it;
      ConnectionList::const_iterator end_it;
      {
        // thread safe access to the connection list
        // once the iterators are fetched they are guaranteed to remain valid
        std::lock_guard lock(connections_lock);
        conn_it = connections.begin();
        end_it = connections.end();
      }
      // loop over all connections to read acks
      for (;conn_it != end_it;) {

        auto& conn = conn_it->second;

        // check connection idleness
        if (conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
          ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
          ERASE_AND_CONTINUE(conn_it, connections);
        }

        // try to reconnect the connection if it has an error
        if (!conn->is_ok()) {
          ldout(conn->cct, 10) << "Kafka run: connection status is: " << status_to_string(conn->status) << dendl;
          const auto& broker = conn_it->first;
          ldout(conn->cct, 20) << "Kafka run: retry connection" << dendl;
          if (create_connection(conn)->is_ok() == false) {
            ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry failed" << dendl;
            // TODO: add error counter for failed retries
            // TODO: add exponential backoff for retries
          } else {
            ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry successful" << dendl;
          }
          INCREMENT_AND_CONTINUE(conn_it);
        }

        reply_count += rd_kafka_poll(conn->producer, read_timeout_ms);

        // just increment the iterator
        ++conn_it;
      }
      // if no messages were received or published
      // across all connections, sleep for 100ms
      if (send_count == 0 && reply_count == 0) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
      }
    }
  }

  // used in the dtor for message cleanup
  static void delete_message(const message_wrapper_t* message) {
    delete message;
  }

public:
  Manager(size_t _max_connections,
      size_t _max_inflight,
      size_t _max_queue,
      int _read_timeout_ms,
      CephContext* _cct) :
    max_connections(_max_connections),
    max_inflight(_max_inflight),
    max_queue(_max_queue),
    max_idle_time(30),
    connection_count(0),
    stopped(false),
    read_timeout_ms(_read_timeout_ms),
    connections(_max_connections),
    messages(max_queue),
    queued(0),
    dequeued(0),
    cct(_cct),
    runner(&Manager::run, this) {
      // The hashmap has "max connections" as the initial number of buckets,
      // and allows for 10 collisions per bucket before rehash.
      // This is to prevent rehashing so that iterators are not invalidated
      // when a new connection is added.
      connections.max_load_factor(10.0);
      // give the runner thread a name for easier debugging
      const auto rc = ceph_pthread_setname(runner.native_handle(), "kafka_manager");
      ceph_assert(rc==0);
  }

  // non copyable
  Manager(const Manager&) = delete;
  const Manager& operator=(const Manager&) = delete;

  // stop the main thread
  void stop() {
    stopped = true;
  }

  // connect to a broker, or reuse an existing connection if already connected
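  // the url is typically of the form "kafka://[user:password@]host[:port]";
  // only the authority part (user, password, broker) is extracted from it below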
  connection_ptr_t connect(const std::string& url,
      bool use_ssl,
      bool verify_ssl,
      boost::optional<const std::string&> ca_location) {
    if (stopped) {
      // TODO: increment counter
      ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
      return nullptr;
    }

    std::string broker;
    std::string user;
    std::string password;
    if (!parse_url_authority(url, broker, user, password)) {
      // TODO: increment counter
      ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
      return nullptr;
    }

    // this should be validated by the regex in parse_url()
    ceph_assert(user.empty() == password.empty());

    if (!user.empty() && !use_ssl) {
      ldout(cct, 1) << "Kafka connect: user/password are only allowed over a secure connection" << dendl;
      return nullptr;
    }

    std::lock_guard lock(connections_lock);
    const auto it = connections.find(broker);
    // note that ssl and non-ssl connections to the same host are two separate connections
    if (it != connections.end()) {
      // connection found - return even if non-ok
      ldout(cct, 20) << "Kafka connect: connection found" << dendl;
      return it->second;
    }

    // connection not found, creating a new one
    if (connection_count >= max_connections) {
      // TODO: increment counter
      ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl;
      return nullptr;
    }
    const auto conn = create_new_connection(broker, cct, use_ssl, verify_ssl, ca_location, user, password);
    // create_new_connection must always return a connection object
    // even if an error occurred during creation.
    // in such a case the creation will be retried in the manager's thread
    ceph_assert(conn);
    ++connection_count;
    ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl;
    return connections.emplace(broker, conn).first->second;
  }

  // TODO publish with confirm is needed in the "none" case as well, cb should be invoked when the publish is ok (no ack)
  int publish(connection_ptr_t& conn,
      const std::string& topic,
      const std::string& message) {
    if (stopped) {
      return STATUS_MANAGER_STOPPED;
    }
    if (!conn || !conn->is_ok()) {
      return STATUS_CONNECTION_CLOSED;
    }
    if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) {
      ++queued;
      return STATUS_OK;
    }
    return STATUS_QUEUE_FULL;
  }

  int publish_with_confirm(connection_ptr_t& conn,
      const std::string& topic,
      const std::string& message,
      reply_callback_t cb) {
    if (stopped) {
      return STATUS_MANAGER_STOPPED;
    }
    if (!conn || !conn->is_ok()) {
      return STATUS_CONNECTION_CLOSED;
    }
    if (messages.push(new message_wrapper_t(conn, topic, message, cb))) {
      ++queued;
      return STATUS_OK;
    }
    return STATUS_QUEUE_FULL;
  }

  // dtor waits for the thread to stop
  // then the connections are cleaned up
  ~Manager() {
    stopped = true;
    runner.join();
    messages.consume_all(delete_message);
  }

  // get the number of connections
  size_t get_connection_count() const {
    return connection_count;
  }

  // get the number of in-flight messages
  size_t get_inflight() const {
    size_t sum = 0;
    std::lock_guard lock(connections_lock);
    std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) {
        sum += conn_pair.second->callbacks.size();
      });
    return sum;
  }

  // running counter of the queued messages
  size_t get_queued() const {
    return queued;
  }

  // running counter of the dequeued messages
  size_t get_dequeued() const {
    return dequeued;
  }
};
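// a note on concurrency: connections_lock guards the connections map, which is
// touched both by connect() (caller threads) and by the manager's thread; once a
// connection has been created, its mutable state (topics, callbacks, delivery
// tags, timestamp) is modified only by the manager's thread, via
// publish_internal() and the delivery-report callback fired from rd_kafka_poll()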

// singleton manager
// note that the manager itself is not a singleton, and multiple instances may co-exist
// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
static Manager* s_manager = nullptr;

static const size_t MAX_CONNECTIONS_DEFAULT = 256;
static const size_t MAX_INFLIGHT_DEFAULT = 8192;
static const size_t MAX_QUEUE_DEFAULT = 8192;
static const int READ_TIMEOUT_MS_DEFAULT = 500;

bool init(CephContext* cct) {
  if (s_manager) {
    return false;
  }
  // TODO: take conf from CephContext
  s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, READ_TIMEOUT_MS_DEFAULT, cct);
  return true;
}

void shutdown() {
  delete s_manager;
  s_manager = nullptr;
}

connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl,
    boost::optional<const std::string&> ca_location) {
  if (!s_manager) return nullptr;
  return s_manager->connect(url, use_ssl, verify_ssl, ca_location);
}

int publish(connection_ptr_t& conn,
    const std::string& topic,
    const std::string& message) {
  if (!s_manager) return STATUS_MANAGER_STOPPED;
  return s_manager->publish(conn, topic, message);
}

int publish_with_confirm(connection_ptr_t& conn,
    const std::string& topic,
    const std::string& message,
    reply_callback_t cb) {
  if (!s_manager) return STATUS_MANAGER_STOPPED;
  return s_manager->publish_with_confirm(conn, topic, message, cb);
}

size_t get_connection_count() {
  if (!s_manager) return 0;
  return s_manager->get_connection_count();
}

size_t get_inflight() {
  if (!s_manager) return 0;
  return s_manager->get_inflight();
}

size_t get_queued() {
  if (!s_manager) return 0;
  return s_manager->get_queued();
}

size_t get_dequeued() {
  if (!s_manager) return 0;
  return s_manager->get_dequeued();
}

size_t get_max_connections() {
  if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
  return s_manager->max_connections;
}

size_t get_max_inflight() {
  if (!s_manager) return MAX_INFLIGHT_DEFAULT;
  return s_manager->max_inflight;
}

size_t get_max_queue() {
  if (!s_manager) return MAX_QUEUE_DEFAULT;
  return s_manager->max_queue;
}

} // namespace rgw::kafka
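// a minimal usage sketch of this API (illustration only, not part of the file's
// logic): the caller runs init() once, connect() per broker, and
// publish_with_confirm() per message; the callback is invoked asynchronously
// from the manager's thread with one of the status codes above or a librdkafka
// error. the topic name and broker URL below are hypothetical examples.
//
//   rgw::kafka::init(cct);
//   auto conn = rgw::kafka::connect("kafka://user:secret@kafka.example.com:9092",
//       true /*use_ssl*/, false /*verify_ssl*/, boost::none /*ca_location*/);
//   if (conn) {
//     rgw::kafka::publish_with_confirm(conn, "my-topic", "hello world",
//         [](int status) { /* inspect status_to_string(status) */ });
//   }
//   rgw::kafka::shutdown();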