// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#include "rgw_kafka.h"
#include "rgw_url.h"
#include <librdkafka/rdkafka.h>
#include "include/ceph_assert.h"
#include <sstream>
#include <cstring>
#include <unordered_map>
#include <string>
#include <vector>
#include <thread>
#include <atomic>
#include <mutex>
#include <boost/lockfree/queue.hpp>
#include "common/dout.h"

#define dout_subsys ceph_subsys_rgw

// TODO investigation, not necessarily issues:
// (1) in case of single threaded writer context use spsc_queue
// (2) check performance of emptying queue to local list, and go over the list and publish
// (3) use std::shared_mutex (c++17) or equivalent for the connections lock

// comparison operator between topic pointer and name
bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) {
  return name == std::string_view(rd_kafka_topic_name(rkt));
}

namespace rgw::kafka {

// status codes for publishing
static const int STATUS_CONNECTION_CLOSED = -0x1002;
static const int STATUS_QUEUE_FULL = -0x1003;
static const int STATUS_MAX_INFLIGHT = -0x1004;
static const int STATUS_MANAGER_STOPPED = -0x1005;
static const int STATUS_CONNECTION_IDLE = -0x1006;
// status codes for connection opening
static const int STATUS_CONF_ALLOC_FAILED = -0x2001;
static const int STATUS_CONF_REPLCACE = -0x2002;

static const int STATUS_OK = 0x0;

// struct for holding the callback and its tag in the callback list
struct reply_callback_with_tag_t {
  uint64_t tag;
  reply_callback_t cb;

  reply_callback_with_tag_t(uint64_t _tag, reply_callback_t _cb) : tag(_tag), cb(_cb) {}

  bool operator==(uint64_t rhs) {
    return tag == rhs;
  }
};

typedef std::vector<reply_callback_with_tag_t> CallbackList;

// struct for holding the connection state object as well as list of topics
// it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
// since references to deleted objects may still exist in the calling code
struct connection_t {
  rd_kafka_t* producer = nullptr;
  rd_kafka_conf_t* temp_conf = nullptr;
  std::vector<rd_kafka_topic_t*> topics;
  uint64_t delivery_tag = 1;
  int status = STATUS_OK;
  CephContext* const cct;
  CallbackList callbacks;
  const std::string broker;
  const bool use_ssl;
  const bool verify_ssl; // TODO currently ignored, not supported in librdkafka v0.11.6
  const boost::optional<std::string> ca_location;
  const std::string user;
  const std::string password;
  const boost::optional<std::string> mechanism;
  utime_t timestamp = ceph_clock_now();

  // cleanup of all internal connection resources
  // the object itself can remain, and the internal connection
  // resources can be created again on successful reconnection
  void destroy(int s) {
    status = s;
    // destroy temporary conf (if connection was never established)
    if (temp_conf) {
      rd_kafka_conf_destroy(temp_conf);
      return;
    }
    if (!is_ok()) {
      // no producer, nothing to destroy
      return;
    }
    // wait for all remaining acks/nacks
    rd_kafka_flush(producer, 5*1000 /* wait for max 5 seconds */);
    // destroy all topics
    std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);});
    // destroy producer
    rd_kafka_destroy(producer);
    producer = nullptr;
    // fire all remaining callbacks (if not fired by rd_kafka_flush)
    std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
        cb_tag.cb(status);
        ldout(cct, 20) << "Kafka destroy: invoking callback with tag=" << cb_tag.tag <<
          " for: " << broker << dendl;
      });
    callbacks.clear();
    delivery_tag = 1;
    ldout(cct, 20) << "Kafka destroy: complete for: " << broker << dendl;
  }

  bool is_ok() const {
    return (producer != nullptr);
  }

  // ctor for setting immutable values
  connection_t(CephContext* _cct, const std::string& _broker, bool _use_ssl, bool _verify_ssl,
      const boost::optional<const std::string&>& _ca_location,
      const std::string& _user, const std::string& _password, const boost::optional<const std::string&>& _mechanism) :
    cct(_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password), mechanism(_mechanism) {}

  // dtor also destroys the internals
  ~connection_t() {
    destroy(status);
  }
};

// convert int status to string - including RGW specific values
std::string status_to_string(int s) {
  switch (s) {
    case STATUS_OK:
      return "STATUS_OK";
    case STATUS_CONNECTION_CLOSED:
      return "RGW_KAFKA_STATUS_CONNECTION_CLOSED";
    case STATUS_QUEUE_FULL:
      return "RGW_KAFKA_STATUS_QUEUE_FULL";
    case STATUS_MAX_INFLIGHT:
      return "RGW_KAFKA_STATUS_MAX_INFLIGHT";
    case STATUS_MANAGER_STOPPED:
      return "RGW_KAFKA_STATUS_MANAGER_STOPPED";
    case STATUS_CONF_ALLOC_FAILED:
      return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED";
    case STATUS_CONF_REPLCACE:
      return "RGW_KAFKA_STATUS_CONF_REPLCACE";
    case STATUS_CONNECTION_IDLE:
      return "RGW_KAFKA_STATUS_CONNECTION_IDLE";
  }
  return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s));
}

void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) {
  ceph_assert(opaque);

  const auto conn = reinterpret_cast<connection_t*>(opaque);
  const auto result = rkmessage->err;

  if (!rkmessage->_private) {
    ldout(conn->cct, 20) << "Kafka run: n/ack received, (no callback) with result=" << result << dendl;
    return;
  }

  const auto tag = reinterpret_cast<uint64_t*>(rkmessage->_private);
  const auto& callbacks_end = conn->callbacks.end();
  const auto& callbacks_begin = conn->callbacks.begin();
  const auto tag_it = std::find(callbacks_begin, callbacks_end, *tag);
  if (tag_it != callbacks_end) {
    ldout(conn->cct, 20) << "Kafka run: n/ack received, invoking callback with tag=" <<
      *tag << " and result=" << rd_kafka_err2str(result) << dendl;
    tag_it->cb(result);
    conn->callbacks.erase(tag_it);
  } else {
    // TODO add counter for acks with no callback
    ldout(conn->cct, 10) << "Kafka run: unsolicited n/ack received with tag=" <<
      *tag << dendl;
  }
  delete tag;
  // rkmessage is destroyed automatically by librdkafka
}

void log_callback(const rd_kafka_t* rk, int level, const char *fac, const char *buf) {
  ceph_assert(rd_kafka_opaque(rk));

  const auto conn = reinterpret_cast<connection_t*>(rd_kafka_opaque(rk));
  if (level <= 3)
    ldout(conn->cct, 1) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl;
  else if (level <= 5)
    ldout(conn->cct, 2) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl;
  else if (level <= 6)
    ldout(conn->cct, 10) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl;
  else
    ldout(conn->cct, 20) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl;
}

void poll_err_callback(rd_kafka_t *rk, int err, const char *reason, void *opaque) {
  const auto conn = reinterpret_cast<connection_t*>(rd_kafka_opaque(rk));
  ldout(conn->cct, 10) << "Kafka run: poll error(" << err << "): " << reason << dendl;
}

using connection_t_ptr = std::unique_ptr<connection_t>;

// utility function to create a producer, when the connection object already exists
bool new_producer(connection_t* conn) {
  // reset all status codes
  conn->status = STATUS_OK;
  char errstr[512] = {0};

  conn->temp_conf = rd_kafka_conf_new();
  if (!conn->temp_conf) {
    conn->status = STATUS_CONF_ALLOC_FAILED;
    return false;
  }

  // get list of brokers based on the bootstrap broker
  if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;

  if (conn->use_ssl) {
    if (!conn->user.empty()) {
      // use SSL+SASL
      if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
          rd_kafka_conf_set(conn->temp_conf, "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
          rd_kafka_conf_set(conn->temp_conf, "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
      ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL+SASL security" << dendl;

      if (conn->mechanism) {
        if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", conn->mechanism->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
        ldout(conn->cct, 20) << "Kafka connect: successfully configured SASL mechanism" << dendl;
      } else {
        if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
        ldout(conn->cct, 20) << "Kafka connect: using default SASL mechanism" << dendl;
      }

    } else {
      // use only SSL
      if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
      ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL security" << dendl;
    }
    if (conn->ca_location) {
      if (rd_kafka_conf_set(conn->temp_conf, "ssl.ca.location", conn->ca_location->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
      ldout(conn->cct, 20) << "Kafka connect: successfully configured CA location" << dendl;
    } else {
      ldout(conn->cct, 20) << "Kafka connect: using default CA location" << dendl;
    }
    // Note: when librdkafka.1.0 is available the following line could be uncommented instead of the callback setting call
    // if (rd_kafka_conf_set(conn->temp_conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;

    ldout(conn->cct, 20) << "Kafka connect: successfully configured security" << dendl;
  } else if (!conn->user.empty()) {
    // use SASL+PLAINTEXT
    if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
        rd_kafka_conf_set(conn->temp_conf, "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
        rd_kafka_conf_set(conn->temp_conf, "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
    ldout(conn->cct, 20) << "Kafka connect: successfully configured SASL_PLAINTEXT" << dendl;

    if (conn->mechanism) {
      if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", conn->mechanism->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
      ldout(conn->cct, 20) << "Kafka connect: successfully configured SASL mechanism" << dendl;
    } else {
      if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
      ldout(conn->cct, 20) << "Kafka connect: using default SASL mechanism" << dendl;
    }
  }

  // set the global callback for delivery success/fail
  rd_kafka_conf_set_dr_msg_cb(conn->temp_conf, message_callback);

  // set the global opaque pointer to be the connection itself
  rd_kafka_conf_set_opaque(conn->temp_conf, conn);

  // redirect kafka logs to RGW
  rd_kafka_conf_set_log_cb(conn->temp_conf, log_callback);
  // define poll callback to allow reconnect
  rd_kafka_conf_set_error_cb(conn->temp_conf, poll_err_callback);
  // create the producer
  if (conn->producer) {
    ldout(conn->cct, 5) << "Kafka connect: producer already exists. destroying the existing one before creating a new one" << dendl;
    conn->destroy(STATUS_CONF_REPLCACE);
  }
  conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr));
  if (!conn->producer) {
    conn->status = rd_kafka_last_error();
    ldout(conn->cct, 1) << "Kafka connect: failed to create producer: " << errstr << dendl;
    return false;
  }
  ldout(conn->cct, 20) << "Kafka connect: successfully created new producer" << dendl;
  {
    // set log level of producer
    const auto log_level = conn->cct->_conf->subsys.get_log_level(ceph_subsys_rgw);
    if (log_level <= 1)
      rd_kafka_set_log_level(conn->producer, 3);
    else if (log_level <= 2)
      rd_kafka_set_log_level(conn->producer, 5);
    else if (log_level <= 10)
      rd_kafka_set_log_level(conn->producer, 6);
    else
      rd_kafka_set_log_level(conn->producer, 7);
  }

  // conf ownership passed to producer
  conn->temp_conf = nullptr;
  return true;

conf_error:
  conn->status = rd_kafka_last_error();
  ldout(conn->cct, 1) << "Kafka connect: configuration failed: " << errstr << dendl;
  return false;
}
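
/*
 * For reference, a minimal standalone sketch of the SASL_SSL path configured
 * above, using plain librdkafka calls only. The broker address and credentials
 * are placeholders, not values used by RGW:
 *
 *   rd_kafka_conf_t* conf = rd_kafka_conf_new();
 *   char err[512];
 *   rd_kafka_conf_set(conf, "bootstrap.servers", "kafka.example.com:9093", err, sizeof(err));
 *   rd_kafka_conf_set(conf, "security.protocol", "SASL_SSL", err, sizeof(err));
 *   rd_kafka_conf_set(conf, "sasl.mechanism", "PLAIN", err, sizeof(err));
 *   rd_kafka_conf_set(conf, "sasl.username", "alice", err, sizeof(err));
 *   rd_kafka_conf_set(conf, "sasl.password", "secret", err, sizeof(err));
 *   rd_kafka_t* producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, err, sizeof(err));
 *
 * On success rd_kafka_new() takes ownership of the conf object; on failure the
 * caller must destroy it, which is why temp_conf is kept around until the
 * producer is created and only then set to nullptr.
 */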

// struct used for holding messages in the message queue
struct message_wrapper_t {
  std::string conn_name;
  std::string topic;
  std::string message;
  const reply_callback_t cb;

  message_wrapper_t(const std::string& _conn_name,
      const std::string& _topic,
      const std::string& _message,
      reply_callback_t _cb) : conn_name(_conn_name), topic(_topic), message(_message), cb(_cb) {}
};

typedef std::unordered_map<std::string, connection_t_ptr> ConnectionList;
typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;

class Manager {
public:
  const size_t max_connections;
  const size_t max_inflight;
  const size_t max_queue;
  const size_t max_idle_time;
private:
  std::atomic<size_t> connection_count;
  bool stopped;
  int read_timeout_ms;
  ConnectionList connections;
  MessageQueue messages;
  std::atomic<size_t> queued;
  std::atomic<size_t> dequeued;
  CephContext* const cct;
  mutable std::mutex connections_lock;
  std::thread runner;

  // TODO use rd_kafka_produce_batch for better performance
  void publish_internal(message_wrapper_t* message) {
    const std::unique_ptr<message_wrapper_t> msg_deleter(message);
    const auto conn_it = connections.find(message->conn_name);
    if (conn_it == connections.end()) {
      ldout(cct, 1) << "Kafka publish: connection was deleted while message was in the queue. error: " << STATUS_CONNECTION_CLOSED << dendl;
      if (message->cb) {
        message->cb(STATUS_CONNECTION_CLOSED);
      }
      return;
    }
    auto& conn = conn_it->second;

    conn->timestamp = ceph_clock_now();

    if (!conn->is_ok()) {
      // connection had an issue while message was in the queue
      // TODO add error stats
      ldout(conn->cct, 1) << "Kafka publish: producer was closed while message was in the queue. error: " << status_to_string(conn->status) << dendl;
      if (message->cb) {
        message->cb(conn->status);
      }
      return;
    }

    // create a new topic unless it was already created
    auto topic_it = std::find(conn->topics.begin(), conn->topics.end(), message->topic);
    rd_kafka_topic_t* topic = nullptr;
    if (topic_it == conn->topics.end()) {
      topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr);
      if (!topic) {
        const auto err = rd_kafka_last_error();
        ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " << status_to_string(err) << dendl;
        if (message->cb) {
          message->cb(err);
        }
        conn->destroy(err);
        return;
      }
      // TODO use the topics list as an LRU cache
      conn->topics.push_back(topic);
      ldout(conn->cct, 20) << "Kafka publish: successfully created topic: " << message->topic << dendl;
    } else {
      topic = *topic_it;
      ldout(conn->cct, 20) << "Kafka publish: reused existing topic: " << message->topic << dendl;
    }

    const auto tag = (message->cb == nullptr ? nullptr : new uint64_t(conn->delivery_tag++));
    const auto rc = rd_kafka_produce(
            topic,
            // TODO: non builtin partitioning
            RD_KAFKA_PARTITION_UA,
            // make a copy of the payload
            // so it is safe to pass the pointer from the string
            RD_KAFKA_MSG_F_COPY,
            message->message.data(),
            message->message.length(),
            // optional key and its length
            nullptr,
            0,
            // opaque data: tag, used in the global callback
            // in order to invoke the real callback
            // null if no callback exists
            tag);
    if (rc == -1) {
      const auto err = rd_kafka_last_error();
      ldout(conn->cct, 10) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl;
      // TODO: don't error on a full queue, and don't destroy the connection, retry instead
      // immediately invoke callback on error if needed
      if (message->cb) {
        message->cb(err);
      }
      conn->destroy(err);
      delete tag;
      return;
    }

    if (tag) {
      auto const q_len = conn->callbacks.size();
      if (q_len < max_inflight) {
        ldout(conn->cct, 20) << "Kafka publish (with callback, tag=" << *tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
        conn->callbacks.emplace_back(*tag, message->cb);
      } else {
        // immediately invoke callback with error - this is not a connection error
        ldout(conn->cct, 1) << "Kafka publish (with callback): failed with error: callback queue full" << dendl;
        message->cb(STATUS_MAX_INFLIGHT);
        // tag will be deleted when the global callback is invoked
      }
    } else {
      ldout(conn->cct, 20) << "Kafka publish (no callback): OK" << dendl;
    }
  }
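
  // Note on the delivery tag lifecycle, summarizing the flow implemented above:
  // when the caller requested a callback, a heap-allocated tag is passed to
  // rd_kafka_produce() as the per-message opaque. librdkafka echoes it back via
  // rkmessage->_private in message_callback(), which finds the matching entry in
  // conn->callbacks, invokes and erases it, and deletes the tag. Without a
  // callback the opaque is null and message_callback() returns early.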

  // the manager's thread:
  // (1) empty the queue of messages to be published
  // (2) loop over all connections and read acks
  // (3) manage deleted connections
  // (4) TODO reconnect on connection errors
  // (5) TODO cleanup timed-out callbacks
  void run() noexcept {
    while (!stopped) {

      // publish all messages in the queue
      auto reply_count = 0U;
      const auto send_count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1));
      dequeued += send_count;
      ConnectionList::iterator conn_it;
      ConnectionList::const_iterator end_it;
      {
        // thread safe access to the connection list
        // once the iterators are fetched they are guaranteed to remain valid
        std::lock_guard lock(connections_lock);
        conn_it = connections.begin();
        end_it = connections.end();
      }
      // loop over all connections to read acks
      for (;conn_it != end_it;) {

        auto& conn = conn_it->second;

        // check connection idleness
        if (conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
          ldout(conn->cct, 20) << "kafka run: deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
          std::lock_guard lock(connections_lock);
          conn->destroy(STATUS_CONNECTION_IDLE);
          conn_it = connections.erase(conn_it);
          --connection_count;
          continue;
        }

        // try to reconnect the connection if it has an error
        if (!conn->is_ok()) {
          ldout(conn->cct, 10) << "Kafka run: connection status is: " << status_to_string(conn->status) << dendl;
          const auto& broker = conn_it->first;
          ldout(conn->cct, 20) << "Kafka run: retry connection" << dendl;
          if (new_producer(conn.get()) == false) {
            ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry failed" << dendl;
            // TODO: add error counter for failed retries
            // TODO: add exponential backoff for retries
          } else {
            ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry successful" << dendl;
          }
          ++conn_it;
          continue;
        }

        reply_count += rd_kafka_poll(conn->producer, read_timeout_ms);

        // just increment the iterator
        ++conn_it;
      }
      // if no messages were received or published
      // across all connections, sleep for 100ms
      if (send_count == 0 && reply_count == 0) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
      }
    }
  }

  // used in the dtor for message cleanup
  static void delete_message(const message_wrapper_t* message) {
    delete message;
  }

public:
  Manager(size_t _max_connections,
          size_t _max_inflight,
          size_t _max_queue,
          int _read_timeout_ms,
          CephContext* _cct) :
    max_connections(_max_connections),
    max_inflight(_max_inflight),
    max_queue(_max_queue),
    max_idle_time(30),
    connection_count(0),
    stopped(false),
    read_timeout_ms(_read_timeout_ms),
    connections(_max_connections),
    messages(max_queue),
    queued(0),
    dequeued(0),
    cct(_cct),
    runner(&Manager::run, this) {
    // The hashmap has "max connections" as the initial number of buckets,
    // and allows for 10 collisions per bucket before rehash.
    // This is to prevent rehashing so that iterators are not invalidated
    // when a new connection is added.
    connections.max_load_factor(10.0);
    // give the runner thread a name for easier debugging
    const auto rc = ceph_pthread_setname(runner.native_handle(), "kafka_manager");
    ceph_assert(rc == 0);
  }

  // non copyable
  Manager(const Manager&) = delete;
  const Manager& operator=(const Manager&) = delete;

  // stop the main thread
  void stop() {
    stopped = true;
  }

  // connect to a broker, or reuse an existing connection if already connected
  bool connect(std::string& broker,
          const std::string& url,
          bool use_ssl,
          bool verify_ssl,
          boost::optional<const std::string&> ca_location,
          boost::optional<const std::string&> mechanism) {
    if (stopped) {
      ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
      return false;
    }

    std::string user;
    std::string password;
    if (!parse_url_authority(url, broker, user, password)) {
      // TODO: increment counter
      ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
      return false;
    }

    // this should be validated by the regex in parse_url()
    ceph_assert(user.empty() == password.empty());

    if (!user.empty() && !use_ssl && !g_conf().get_val<bool>("rgw_allow_notification_secrets_in_cleartext")) {
      ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl;
      return false;
    }

    std::lock_guard lock(connections_lock);
    const auto it = connections.find(broker);
    // note that ssl and non-ssl connections to the same host are two separate connections
    if (it != connections.end()) {
      // connection found - return true even if non-ok
      ldout(cct, 20) << "Kafka connect: connection found" << dendl;
      return true;
    }

    // connection not found, creating a new one
    if (connection_count >= max_connections) {
      // TODO: increment counter
      ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl;
      return false;
    }
    // a connection object is always created and stored here,
    // even if producer creation fails.
    // in such a case the creation will be retried in the main thread
    ++connection_count;
    ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl;
    auto conn = connections.emplace(broker, std::make_unique<connection_t>(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism)).first->second.get();
    if (!new_producer(conn)) {
      ldout(cct, 10) << "Kafka connect: new connection is created. But producer creation failed. will retry" << dendl;
    }
    return true;
  }
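
  // Example inputs to connect(), as a sketch; the exact URL forms accepted are
  // determined by parse_url_authority() in rgw_url.cc, and the values below are
  // placeholders:
  //   "kafka://localhost:9092"                  -> broker "localhost:9092", no credentials
  //   "kafka://alice:secret@kafka.example:9093" -> broker "kafka.example:9093", SASL user/password
  // Credentials without use_ssl are rejected unless
  // "rgw_allow_notification_secrets_in_cleartext" is enabled.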

  // TODO publish with confirm is needed in "none" case as well, cb should be invoked when publish is ok (no ack)
  int publish(const std::string& conn_name,
          const std::string& topic,
          const std::string& message) {
    if (stopped) {
      return STATUS_MANAGER_STOPPED;
    }
    if (messages.push(new message_wrapper_t(conn_name, topic, message, nullptr))) {
      ++queued;
      return STATUS_OK;
    }
    return STATUS_QUEUE_FULL;
  }

  int publish_with_confirm(const std::string& conn_name,
          const std::string& topic,
          const std::string& message,
          reply_callback_t cb) {
    if (stopped) {
      return STATUS_MANAGER_STOPPED;
    }
    if (messages.push(new message_wrapper_t(conn_name, topic, message, cb))) {
      ++queued;
      return STATUS_OK;
    }
    return STATUS_QUEUE_FULL;
  }

  // dtor waits for the thread to stop,
  // then connections are cleaned up
  ~Manager() {
    stopped = true;
    runner.join();
    messages.consume_all(delete_message);
  }

  // get the number of connections
  size_t get_connection_count() const {
    return connection_count;
  }

  // get the number of in-flight messages
  size_t get_inflight() const {
    size_t sum = 0;
    std::lock_guard lock(connections_lock);
    std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) {
        sum += conn_pair.second->callbacks.size();
      });
    return sum;
  }

  // running counter of the queued messages
  size_t get_queued() const {
    return queued;
  }

  // running counter of the dequeued messages
  size_t get_dequeued() const {
    return dequeued;
  }
};

// singleton manager
// note that the manager itself is not a singleton, and multiple instances may co-exist
// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
static Manager* s_manager = nullptr;

static const size_t MAX_CONNECTIONS_DEFAULT = 256;
static const size_t MAX_INFLIGHT_DEFAULT = 8192;
static const size_t MAX_QUEUE_DEFAULT = 8192;
static const int READ_TIMEOUT_MS_DEFAULT = 500;

bool init(CephContext* cct) {
  if (s_manager) {
    return false;
  }
  // TODO: take conf from CephContext
  s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, READ_TIMEOUT_MS_DEFAULT, cct);
  return true;
}

void shutdown() {
  delete s_manager;
  s_manager = nullptr;
}

bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl,
        boost::optional<const std::string&> ca_location,
        boost::optional<const std::string&> mechanism) {
  if (!s_manager) return false;
  return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism);
}

int publish(const std::string& conn_name,
        const std::string& topic,
        const std::string& message) {
  if (!s_manager) return STATUS_MANAGER_STOPPED;
  return s_manager->publish(conn_name, topic, message);
}

int publish_with_confirm(const std::string& conn_name,
        const std::string& topic,
        const std::string& message,
        reply_callback_t cb) {
  if (!s_manager) return STATUS_MANAGER_STOPPED;
  return s_manager->publish_with_confirm(conn_name, topic, message, cb);
}
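
// Typical call sequence for this module (a sketch, not taken from actual RGW
// calling code; the endpoint URL and topic name are placeholders):
//
//   rgw::kafka::init(cct);                              // start the manager thread
//   std::string broker;
//   if (rgw::kafka::connect(broker, "kafka://localhost:9092",
//                           false /*use_ssl*/, false /*verify_ssl*/,
//                           boost::none, boost::none)) {
//     rgw::kafka::publish_with_confirm(broker, "my-topic", "payload",
//         [](int status) { /* 0 on ack, otherwise a status/librdkafka error code */ });
//   }
//   rgw::kafka::shutdown();                             // join the thread and clean up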

size_t get_connection_count() {
  if (!s_manager) return 0;
  return s_manager->get_connection_count();
}

size_t get_inflight() {
  if (!s_manager) return 0;
  return s_manager->get_inflight();
}

size_t get_queued() {
  if (!s_manager) return 0;
  return s_manager->get_queued();
}

size_t get_dequeued() {
  if (!s_manager) return 0;
  return s_manager->get_dequeued();
}

size_t get_max_connections() {
  if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
  return s_manager->max_connections;
}

size_t get_max_inflight() {
  if (!s_manager) return MAX_INFLIGHT_DEFAULT;
  return s_manager->max_inflight;
}

size_t get_max_queue() {
  if (!s_manager) return MAX_QUEUE_DEFAULT;
  return s_manager->max_queue;
}

} // namespace rgw::kafka