1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
6 #include <librdkafka/rdkafka.h>
7 #include "include/ceph_assert.h"
10 #include <unordered_map>
16 #include <boost/lockfree/queue.hpp>
17 #include "common/dout.h"
19 #define dout_subsys ceph_subsys_rgw
21 // TODO investigation, not necessarily issues:
22 // (1) in case of single threaded writer context use spsc_queue
23 // (2) check performance of emptying queue to local list, and go over the list and publish
24 // (3) use std::shared_mutex (c++17) or equivalent for the connections lock
26 // cmparisson operator between topic pointer and name
27 bool operator==(const rd_kafka_topic_t
* rkt
, const std::string
& name
) {
28 return name
== std::string_view(rd_kafka_topic_name(rkt
));
31 namespace rgw::kafka
{
// status codes for publishing
// TODO: use the actual error code (when conn exists) instead of STATUS_CONNECTION_CLOSED when replying to client
static constexpr int STATUS_CONNECTION_CLOSED = -0x1002;
static constexpr int STATUS_QUEUE_FULL        = -0x1003;
static constexpr int STATUS_MAX_INFLIGHT      = -0x1004;
static constexpr int STATUS_MANAGER_STOPPED   = -0x1005;
// status code for connection opening
static constexpr int STATUS_CONF_ALLOC_FAILED = -0x2001;

// success
static constexpr int STATUS_OK = 0x0;
44 // struct for holding the callback and its tag in the callback list
45 struct reply_callback_with_tag_t
{
49 reply_callback_with_tag_t(uint64_t _tag
, reply_callback_t _cb
) : tag(_tag
), cb(_cb
) {}
51 bool operator==(uint64_t rhs
) {
56 typedef std::vector
<reply_callback_with_tag_t
> CallbackList
;
58 // struct for holding the connection state object as well as list of topics
59 // it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
60 // since references to deleted objects may still exist in the calling code
62 rd_kafka_t
* producer
= nullptr;
63 rd_kafka_conf_t
* temp_conf
= nullptr;
64 std::vector
<rd_kafka_topic_t
*> topics
;
65 uint64_t delivery_tag
= 1;
66 int status
= STATUS_OK
;
67 mutable std::atomic
<int> ref_count
= 0;
68 CephContext
* const cct
;
69 CallbackList callbacks
;
70 const std::string broker
;
72 const bool verify_ssl
; // TODO currently iognored, not supported in librdkafka v0.11.6
73 const boost::optional
<std::string
> ca_location
;
74 const std::string user
;
75 const std::string password
;
76 utime_t timestamp
= ceph_clock_now();
78 // cleanup of all internal connection resource
79 // the object can still remain, and internal connection
80 // resources created again on successful reconnection
83 // destroy temporary conf (if connection was never established)
85 rd_kafka_conf_destroy(temp_conf
);
88 // wait for all remaining acks/nacks
89 rd_kafka_flush(producer
, 5*1000 /* wait for max 5 seconds */);
91 std::for_each(topics
.begin(), topics
.end(), [](auto topic
) {rd_kafka_topic_destroy(topic
);});
93 rd_kafka_destroy(producer
);
94 // fire all remaining callbacks (if not fired by rd_kafka_flush)
95 std::for_each(callbacks
.begin(), callbacks
.end(), [this](auto& cb_tag
) {
97 ldout(cct
, 20) << "Kafka destroy: invoking callback with tag=" << cb_tag
.tag
<< dendl
;
104 return (producer
!= nullptr);
107 // ctor for setting immutable values
108 connection_t(CephContext
* _cct
, const std::string
& _broker
, bool _use_ssl
, bool _verify_ssl
,
109 const boost::optional
<const std::string
&>& _ca_location
,
110 const std::string
& _user
, const std::string
& _password
) :
111 cct(_cct
), broker(_broker
), use_ssl(_use_ssl
), verify_ssl(_verify_ssl
), ca_location(_ca_location
), user(_user
), password(_password
) {}
113 // dtor also destroys the internals
115 destroy(STATUS_CONNECTION_CLOSED
);
118 friend void intrusive_ptr_add_ref(const connection_t
* p
);
119 friend void intrusive_ptr_release(const connection_t
* p
);
122 std::string
to_string(const connection_ptr_t
& conn
) {
124 str
+= "\nBroker: " + conn
->broker
;
125 str
+= conn
->use_ssl
? "\nUse SSL" : "";
126 str
+= conn
->ca_location
? "\nCA Location: " + *(conn
->ca_location
) : "";
129 // these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
130 void intrusive_ptr_add_ref(const connection_t
* p
) {
133 void intrusive_ptr_release(const connection_t
* p
) {
134 if (--p
->ref_count
== 0) {
139 // convert int status to string - including RGW specific values
140 std::string
status_to_string(int s
) {
144 case STATUS_CONNECTION_CLOSED
:
145 return "RGW_KAFKA_STATUS_CONNECTION_CLOSED";
146 case STATUS_QUEUE_FULL
:
147 return "RGW_KAFKA_STATUS_QUEUE_FULL";
148 case STATUS_MAX_INFLIGHT
:
149 return "RGW_KAFKA_STATUS_MAX_INFLIGHT";
150 case STATUS_MANAGER_STOPPED
:
151 return "RGW_KAFKA_STATUS_MANAGER_STOPPED";
152 case STATUS_CONF_ALLOC_FAILED
:
153 return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED";
155 return std::string(rd_kafka_err2str((rd_kafka_resp_err_t
)s
));
158 void message_callback(rd_kafka_t
* rk
, const rd_kafka_message_t
* rkmessage
, void* opaque
) {
161 const auto conn
= reinterpret_cast<connection_t
*>(opaque
);
162 const auto result
= rkmessage
->err
;
164 if (!rkmessage
->_private
) {
165 ldout(conn
->cct
, 20) << "Kafka run: n/ack received, (no callback) with result=" << result
<< dendl
;
169 const auto tag
= reinterpret_cast<uint64_t*>(rkmessage
->_private
);
170 const auto& callbacks_end
= conn
->callbacks
.end();
171 const auto& callbacks_begin
= conn
->callbacks
.begin();
172 const auto tag_it
= std::find(callbacks_begin
, callbacks_end
, *tag
);
173 if (tag_it
!= callbacks_end
) {
174 ldout(conn
->cct
, 20) << "Kafka run: n/ack received, invoking callback with tag=" <<
175 *tag
<< " and result=" << rd_kafka_err2str(result
) << dendl
;
177 conn
->callbacks
.erase(tag_it
);
179 // TODO add counter for acks with no callback
180 ldout(conn
->cct
, 10) << "Kafka run: unsolicited n/ack received with tag=" <<
184 // rkmessage is destroyed automatically by librdkafka
187 // utility function to create a connection, when the connection object already exists
188 connection_ptr_t
& create_connection(connection_ptr_t
& conn
) {
189 // pointer must be valid and not marked for deletion
192 // reset all status codes
193 conn
->status
= STATUS_OK
;
194 char errstr
[512] = {0};
196 conn
->temp_conf
= rd_kafka_conf_new();
197 if (!conn
->temp_conf
) {
198 conn
->status
= STATUS_CONF_ALLOC_FAILED
;
202 // get list of brokers based on the bootsrap broker
203 if (rd_kafka_conf_set(conn
->temp_conf
, "bootstrap.servers", conn
->broker
.c_str(), errstr
, sizeof(errstr
)) != RD_KAFKA_CONF_OK
) goto conf_error
;
206 if (!conn
->user
.empty()) {
208 if (rd_kafka_conf_set(conn
->temp_conf
, "security.protocol", "SASL_SSL", errstr
, sizeof(errstr
)) != RD_KAFKA_CONF_OK
||
209 rd_kafka_conf_set(conn
->temp_conf
, "sasl.mechanism", "PLAIN", errstr
, sizeof(errstr
)) != RD_KAFKA_CONF_OK
||
210 rd_kafka_conf_set(conn
->temp_conf
, "sasl.username", conn
->user
.c_str(), errstr
, sizeof(errstr
)) != RD_KAFKA_CONF_OK
||
211 rd_kafka_conf_set(conn
->temp_conf
, "sasl.password", conn
->password
.c_str(), errstr
, sizeof(errstr
)) != RD_KAFKA_CONF_OK
) goto conf_error
;
212 ldout(conn
->cct
, 20) << "Kafka connect: successfully configured SSL+SASL security" << dendl
;
215 if (rd_kafka_conf_set(conn
->temp_conf
, "security.protocol", "SSL", errstr
, sizeof(errstr
)) != RD_KAFKA_CONF_OK
) goto conf_error
;
216 ldout(conn
->cct
, 20) << "Kafka connect: successfully configured SSL security" << dendl
;
218 if (conn
->ca_location
) {
219 if (rd_kafka_conf_set(conn
->temp_conf
, "ssl.ca.location", conn
->ca_location
->c_str(), errstr
, sizeof(errstr
)) != RD_KAFKA_CONF_OK
) goto conf_error
;
220 ldout(conn
->cct
, 20) << "Kafka connect: successfully configured CA location" << dendl
;
222 ldout(conn
->cct
, 20) << "Kafka connect: using default CA location" << dendl
;
224 // Note: when librdkafka.1.0 is available the following line could be uncommented instead of the callback setting call
225 // if (rd_kafka_conf_set(conn->temp_conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
227 ldout(conn
->cct
, 20) << "Kafka connect: successfully configured security" << dendl
;
230 // set the global callback for delivery success/fail
231 rd_kafka_conf_set_dr_msg_cb(conn
->temp_conf
, message_callback
);
233 // set the global opaque pointer to be the connection itself
234 rd_kafka_conf_set_opaque(conn
->temp_conf
, conn
.get());
236 // create the producer
237 conn
->producer
= rd_kafka_new(RD_KAFKA_PRODUCER
, conn
->temp_conf
, errstr
, sizeof(errstr
));
238 if (!conn
->producer
) {
239 conn
->status
= rd_kafka_last_error();
240 ldout(conn
->cct
, 1) << "Kafka connect: failed to create producer: " << errstr
<< dendl
;
243 ldout(conn
->cct
, 20) << "Kafka connect: successfully created new producer" << dendl
;
245 // conf ownership passed to producer
246 conn
->temp_conf
= nullptr;
250 conn
->status
= rd_kafka_last_error();
251 ldout(conn
->cct
, 1) << "Kafka connect: configuration failed: " << errstr
<< dendl
;
255 // utility function to create a new connection
256 connection_ptr_t
create_new_connection(const std::string
& broker
, CephContext
* cct
,
259 boost::optional
<const std::string
&> ca_location
,
260 const std::string
& user
,
261 const std::string
& password
) {
262 // create connection state
263 connection_ptr_t
conn(new connection_t(cct
, broker
, use_ssl
, verify_ssl
, ca_location
, user
, password
));
264 return create_connection(conn
);
267 /// struct used for holding messages in the message queue
268 struct message_wrapper_t
{
269 connection_ptr_t conn
;
274 message_wrapper_t(connection_ptr_t
& _conn
,
275 const std::string
& _topic
,
276 const std::string
& _message
,
277 reply_callback_t _cb
) : conn(_conn
), topic(_topic
), message(_message
), cb(_cb
) {}
280 typedef std::unordered_map
<std::string
, connection_ptr_t
> ConnectionList
;
281 typedef boost::lockfree::queue
<message_wrapper_t
*, boost::lockfree::fixed_sized
<true>> MessageQueue
;
// macros used inside a loop where an iterator is either incremented or erased
#define INCREMENT_AND_CONTINUE(IT) \
          ++IT; \
          continue;

#define ERASE_AND_CONTINUE(IT,CONTAINER) \
          IT=CONTAINER.erase(IT); \
          --connection_count; \
          continue;
295 const size_t max_connections
;
296 const size_t max_inflight
;
297 const size_t max_queue
;
298 const size_t max_idle_time
;
300 std::atomic
<size_t> connection_count
;
303 ConnectionList connections
;
304 MessageQueue messages
;
305 std::atomic
<size_t> queued
;
306 std::atomic
<size_t> dequeued
;
307 CephContext
* const cct
;
308 mutable std::mutex connections_lock
;
311 // TODO use rd_kafka_produce_batch for better performance
312 void publish_internal(message_wrapper_t
* message
) {
313 const std::unique_ptr
<message_wrapper_t
> msg_owner(message
);
314 auto& conn
= message
->conn
;
316 conn
->timestamp
= ceph_clock_now();
318 if (!conn
->is_ok()) {
319 // connection had an issue while message was in the queue
320 // TODO add error stats
321 ldout(conn
->cct
, 1) << "Kafka publish: connection had an issue while message was in the queue. error: " << status_to_string(conn
->status
) << dendl
;
323 message
->cb(conn
->status
);
328 // create a new topic unless it was already created
329 auto topic_it
= std::find(conn
->topics
.begin(), conn
->topics
.end(), message
->topic
);
330 rd_kafka_topic_t
* topic
= nullptr;
331 if (topic_it
== conn
->topics
.end()) {
332 topic
= rd_kafka_topic_new(conn
->producer
, message
->topic
.c_str(), nullptr);
334 const auto err
= rd_kafka_last_error();
335 ldout(conn
->cct
, 1) << "Kafka publish: failed to create topic: " << message
->topic
<< " error: " << status_to_string(err
) << dendl
;
342 // TODO use the topics list as an LRU cache
343 conn
->topics
.push_back(topic
);
344 ldout(conn
->cct
, 20) << "Kafka publish: successfully created topic: " << message
->topic
<< dendl
;
347 ldout(conn
->cct
, 20) << "Kafka publish: reused existing topic: " << message
->topic
<< dendl
;
350 const auto tag
= (message
->cb
== nullptr ? nullptr : new uint64_t(conn
->delivery_tag
++));
351 const auto rc
= rd_kafka_produce(
353 // TODO: non builtin partitioning
354 RD_KAFKA_PARTITION_UA
,
355 // make a copy of the payload
356 // so it is safe to pass the pointer from the string
358 message
->message
.data(),
359 message
->message
.length(),
360 // optional key and its length
363 // opaque data: tag, used in the global callback
364 // in order to invoke the real callback
365 // null if no callback exists
368 const auto err
= rd_kafka_last_error();
369 ldout(conn
->cct
, 10) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err
) << dendl
;
370 // TODO: dont error on full queue, and don't destroy connection, retry instead
371 // immediatly invoke callback on error if needed
381 auto const q_len
= conn
->callbacks
.size();
382 if (q_len
< max_inflight
) {
383 ldout(conn
->cct
, 20) << "Kafka publish (with callback, tag=" << *tag
<< "): OK. Queue has: " << q_len
<< " callbacks" << dendl
;
384 conn
->callbacks
.emplace_back(*tag
, message
->cb
);
386 // immediately invoke callback with error - this is not a connection error
387 ldout(conn
->cct
, 1) << "Kafka publish (with callback): failed with error: callback queue full" << dendl
;
388 message
->cb(STATUS_MAX_INFLIGHT
);
389 // tag will be deleted when the global callback is invoked
392 ldout(conn
->cct
, 20) << "Kafka publish (no callback): OK" << dendl
;
396 // the managers thread:
397 // (1) empty the queue of messages to be published
398 // (2) loop over all connections and read acks
399 // (3) manages deleted connections
400 // (4) TODO reconnect on connection errors
401 // (5) TODO cleanup timedout callbacks
402 void run() noexcept
{
405 // publish all messages in the queue
406 auto reply_count
= 0U;
407 const auto send_count
= messages
.consume_all(std::bind(&Manager::publish_internal
, this, std::placeholders::_1
));
408 dequeued
+= send_count
;
409 ConnectionList::iterator conn_it
;
410 ConnectionList::const_iterator end_it
;
412 // thread safe access to the connection list
413 // once the iterators are fetched they are guaranteed to remain valid
414 std::lock_guard
lock(connections_lock
);
415 conn_it
= connections
.begin();
416 end_it
= connections
.end();
418 // loop over all connections to read acks
419 for (;conn_it
!= end_it
;) {
421 auto& conn
= conn_it
->second
;
423 // Checking the connection idlesness
424 if(conn
->timestamp
.sec() + max_idle_time
< ceph_clock_now()) {
425 ldout(conn
->cct
, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl
;
426 ERASE_AND_CONTINUE(conn_it
, connections
);
429 // try to reconnect the connection if it has an error
430 if (!conn
->is_ok()) {
431 ldout(conn
->cct
, 10) << "Kafka run: connection status is: " << status_to_string(conn
->status
) << dendl
;
432 const auto& broker
= conn_it
->first
;
433 ldout(conn
->cct
, 20) << "Kafka run: retry connection" << dendl
;
434 if (create_connection(conn
)->is_ok() == false) {
435 ldout(conn
->cct
, 10) << "Kafka run: connection (" << broker
<< ") retry failed" << dendl
;
436 // TODO: add error counter for failed retries
437 // TODO: add exponential backoff for retries
439 ldout(conn
->cct
, 10) << "Kafka run: connection (" << broker
<< ") retry successfull" << dendl
;
441 INCREMENT_AND_CONTINUE(conn_it
);
444 reply_count
+= rd_kafka_poll(conn
->producer
, read_timeout_ms
);
446 // just increment the iterator
449 // if no messages were received or published
450 // across all connection, sleep for 100ms
451 if (send_count
== 0 && reply_count
== 0) {
452 std::this_thread::sleep_for(std::chrono::milliseconds(100));
457 // used in the dtor for message cleanup
458 static void delete_message(const message_wrapper_t
* message
) {
463 Manager(size_t _max_connections
,
464 size_t _max_inflight
,
466 int _read_timeout_ms
,
468 max_connections(_max_connections
),
469 max_inflight(_max_inflight
),
470 max_queue(_max_queue
),
474 read_timeout_ms(_read_timeout_ms
),
475 connections(_max_connections
),
480 runner(&Manager::run
, this) {
481 // The hashmap has "max connections" as the initial number of buckets,
482 // and allows for 10 collisions per bucket before rehash.
483 // This is to prevent rehashing so that iterators are not invalidated
484 // when a new connection is added.
485 connections
.max_load_factor(10.0);
486 // give the runner thread a name for easier debugging
487 const auto rc
= ceph_pthread_setname(runner
.native_handle(), "kafka_manager");
492 Manager(const Manager
&) = delete;
493 const Manager
& operator=(const Manager
&) = delete;
495 // stop the main thread
500 // connect to a broker, or reuse an existing connection if already connected
501 connection_ptr_t
connect(const std::string
& url
,
504 boost::optional
<const std::string
&> ca_location
) {
506 // TODO: increment counter
507 ldout(cct
, 1) << "Kafka connect: manager is stopped" << dendl
;
513 std::string password
;
514 if (!parse_url_authority(url
, broker
, user
, password
)) {
515 // TODO: increment counter
516 ldout(cct
, 1) << "Kafka connect: URL parsing failed" << dendl
;
520 // this should be validated by the regex in parse_url()
521 ceph_assert(user
.empty() == password
.empty());
523 if (!user
.empty() && !use_ssl
) {
524 ldout(cct
, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl
;
528 std::lock_guard
lock(connections_lock
);
529 const auto it
= connections
.find(broker
);
530 // note that ssl vs. non-ssl connection to the same host are two separate conenctions
531 if (it
!= connections
.end()) {
532 // connection found - return even if non-ok
533 ldout(cct
, 20) << "Kafka connect: connection found" << dendl
;
537 // connection not found, creating a new one
538 if (connection_count
>= max_connections
) {
539 // TODO: increment counter
540 ldout(cct
, 1) << "Kafka connect: max connections exceeded" << dendl
;
543 const auto conn
= create_new_connection(broker
, cct
, use_ssl
, verify_ssl
, ca_location
, user
, password
);
544 // create_new_connection must always return a connection object
545 // even if error occurred during creation.
546 // in such a case the creation will be retried in the main thread
549 ldout(cct
, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count
<< dendl
;
550 return connections
.emplace(broker
, conn
).first
->second
;
553 // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
554 int publish(connection_ptr_t
& conn
,
555 const std::string
& topic
,
556 const std::string
& message
) {
558 return STATUS_MANAGER_STOPPED
;
560 if (!conn
|| !conn
->is_ok()) {
561 return STATUS_CONNECTION_CLOSED
;
563 if (messages
.push(new message_wrapper_t(conn
, topic
, message
, nullptr))) {
567 return STATUS_QUEUE_FULL
;
570 int publish_with_confirm(connection_ptr_t
& conn
,
571 const std::string
& topic
,
572 const std::string
& message
,
573 reply_callback_t cb
) {
575 return STATUS_MANAGER_STOPPED
;
577 if (!conn
|| !conn
->is_ok()) {
578 return STATUS_CONNECTION_CLOSED
;
580 if (messages
.push(new message_wrapper_t(conn
, topic
, message
, cb
))) {
584 return STATUS_QUEUE_FULL
;
587 // dtor wait for thread to stop
588 // then connection are cleaned-up
592 messages
.consume_all(delete_message
);
595 // get the number of connections
596 size_t get_connection_count() const {
597 return connection_count
;
600 // get the number of in-flight messages
601 size_t get_inflight() const {
603 std::lock_guard
lock(connections_lock
);
604 std::for_each(connections
.begin(), connections
.end(), [&sum
](auto& conn_pair
) {
605 sum
+= conn_pair
.second
->callbacks
.size();
610 // running counter of the queued messages
611 size_t get_queued() const {
615 // running counter of the dequeued messages
616 size_t get_dequeued() const {
622 // note that the manager itself is not a singleton, and multiple instances may co-exist
623 // TODO make the pointer atomic in allocation and deallocation to avoid race conditions
624 static Manager
* s_manager
= nullptr;
626 static const size_t MAX_CONNECTIONS_DEFAULT
= 256;
627 static const size_t MAX_INFLIGHT_DEFAULT
= 8192;
628 static const size_t MAX_QUEUE_DEFAULT
= 8192;
629 static const int READ_TIMEOUT_MS_DEFAULT
= 500;
631 bool init(CephContext
* cct
) {
635 // TODO: take conf from CephContext
636 s_manager
= new Manager(MAX_CONNECTIONS_DEFAULT
, MAX_INFLIGHT_DEFAULT
, MAX_QUEUE_DEFAULT
, READ_TIMEOUT_MS_DEFAULT
, cct
);
645 connection_ptr_t
connect(const std::string
& url
, bool use_ssl
, bool verify_ssl
,
646 boost::optional
<const std::string
&> ca_location
) {
647 if (!s_manager
) return nullptr;
648 return s_manager
->connect(url
, use_ssl
, verify_ssl
, ca_location
);
651 int publish(connection_ptr_t
& conn
,
652 const std::string
& topic
,
653 const std::string
& message
) {
654 if (!s_manager
) return STATUS_MANAGER_STOPPED
;
655 return s_manager
->publish(conn
, topic
, message
);
658 int publish_with_confirm(connection_ptr_t
& conn
,
659 const std::string
& topic
,
660 const std::string
& message
,
661 reply_callback_t cb
) {
662 if (!s_manager
) return STATUS_MANAGER_STOPPED
;
663 return s_manager
->publish_with_confirm(conn
, topic
, message
, cb
);
666 size_t get_connection_count() {
667 if (!s_manager
) return 0;
668 return s_manager
->get_connection_count();
671 size_t get_inflight() {
672 if (!s_manager
) return 0;
673 return s_manager
->get_inflight();
676 size_t get_queued() {
677 if (!s_manager
) return 0;
678 return s_manager
->get_queued();
681 size_t get_dequeued() {
682 if (!s_manager
) return 0;
683 return s_manager
->get_dequeued();
686 size_t get_max_connections() {
687 if (!s_manager
) return MAX_CONNECTIONS_DEFAULT
;
688 return s_manager
->max_connections
;
691 size_t get_max_inflight() {
692 if (!s_manager
) return MAX_INFLIGHT_DEFAULT
;
693 return s_manager
->max_inflight
;
696 size_t get_max_queue() {
697 if (!s_manager
) return MAX_QUEUE_DEFAULT
;
698 return s_manager
->max_queue
;