]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | #include "rgw_kafka.h" | |
5 | #include "rgw_url.h" | |
6 | #include <librdkafka/rdkafka.h> | |
7 | #include "include/ceph_assert.h" | |
8 | #include <sstream> | |
9 | #include <cstring> | |
10 | #include <unordered_map> | |
11 | #include <string> | |
12 | #include <vector> | |
13 | #include <thread> | |
14 | #include <atomic> | |
15 | #include <mutex> | |
16 | #include <boost/lockfree/queue.hpp> | |
17 | #include "common/dout.h" | |
18 | ||
19 | #define dout_subsys ceph_subsys_rgw | |
20 | ||
21 | // TODO investigation, not necessarily issues: | |
22 | // (1) in case of single threaded writer context use spsc_queue | |
23 | // (2) check performance of emptying queue to local list, and go over the list and publish | |
24 | // (3) use std::shared_mutex (c++17) or equivalent for the connections lock | |
25 | ||
26 | // cmparisson operator between topic pointer and name | |
27 | bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) { | |
28 | return name == std::string_view(rd_kafka_topic_name(rkt)); | |
29 | } | |
30 | ||
31 | namespace rgw::kafka { | |
32 | ||
// status codes for publishing
static const int STATUS_CONNECTION_CLOSED = -0x1002;
static const int STATUS_QUEUE_FULL = -0x1003;
static const int STATUS_MAX_INFLIGHT = -0x1004;
static const int STATUS_MANAGER_STOPPED = -0x1005;
static const int STATUS_CONNECTION_IDLE = -0x1006;
// status code for connection opening
static const int STATUS_CONF_ALLOC_FAILED = -0x2001;
static const int STATUS_CONF_REPLCACE = -0x2002;

// success status for both publishing and connecting
static const int STATUS_OK = 0x0;
44 | ||
45 | // struct for holding the callback and its tag in the callback list | |
46 | struct reply_callback_with_tag_t { | |
47 | uint64_t tag; | |
48 | reply_callback_t cb; | |
49 | ||
50 | reply_callback_with_tag_t(uint64_t _tag, reply_callback_t _cb) : tag(_tag), cb(_cb) {} | |
51 | ||
52 | bool operator==(uint64_t rhs) { | |
53 | return tag == rhs; | |
54 | } | |
55 | }; | |
56 | ||
57 | typedef std::vector<reply_callback_with_tag_t> CallbackList; | |
58 | ||
59 | // struct for holding the connection state object as well as list of topics | |
60 | // it is used inside an intrusive ref counted pointer (boost::intrusive_ptr) | |
61 | // since references to deleted objects may still exist in the calling code | |
62 | struct connection_t { | |
63 | rd_kafka_t* producer = nullptr; | |
64 | rd_kafka_conf_t* temp_conf = nullptr; | |
65 | std::vector<rd_kafka_topic_t*> topics; | |
9f95a23c | 66 | uint64_t delivery_tag = 1; |
f67539c2 | 67 | int status = STATUS_OK; |
9f95a23c TL |
68 | CephContext* const cct; |
69 | CallbackList callbacks; | |
70 | const std::string broker; | |
71 | const bool use_ssl; | |
72 | const bool verify_ssl; // TODO currently iognored, not supported in librdkafka v0.11.6 | |
73 | const boost::optional<std::string> ca_location; | |
74 | const std::string user; | |
75 | const std::string password; | |
1e59de90 | 76 | const boost::optional<std::string> mechanism; |
20effc67 | 77 | utime_t timestamp = ceph_clock_now(); |
9f95a23c TL |
78 | |
79 | // cleanup of all internal connection resource | |
80 | // the object can still remain, and internal connection | |
81 | // resources created again on successful reconnection | |
82 | void destroy(int s) { | |
83 | status = s; | |
84 | // destroy temporary conf (if connection was never established) | |
85 | if (temp_conf) { | |
86 | rd_kafka_conf_destroy(temp_conf); | |
87 | return; | |
88 | } | |
aee94f69 TL |
89 | if (!is_ok()) { |
90 | // no producer, nothing to destroy | |
91 | return; | |
92 | } | |
9f95a23c TL |
93 | // wait for all remaining acks/nacks |
94 | rd_kafka_flush(producer, 5*1000 /* wait for max 5 seconds */); | |
95 | // destroy all topics | |
96 | std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);}); | |
97 | // destroy producer | |
98 | rd_kafka_destroy(producer); | |
aee94f69 | 99 | producer = nullptr; |
9f95a23c TL |
100 | // fire all remaining callbacks (if not fired by rd_kafka_flush) |
101 | std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) { | |
102 | cb_tag.cb(status); | |
aee94f69 TL |
103 | ldout(cct, 20) << "Kafka destroy: invoking callback with tag=" << cb_tag.tag << |
104 | " for: " << broker << dendl; | |
9f95a23c TL |
105 | }); |
106 | callbacks.clear(); | |
107 | delivery_tag = 1; | |
aee94f69 | 108 | ldout(cct, 20) << "Kafka destroy: complete for: " << broker << dendl; |
9f95a23c TL |
109 | } |
110 | ||
111 | bool is_ok() const { | |
20effc67 | 112 | return (producer != nullptr); |
9f95a23c TL |
113 | } |
114 | ||
115 | // ctor for setting immutable values | |
116 | connection_t(CephContext* _cct, const std::string& _broker, bool _use_ssl, bool _verify_ssl, | |
117 | const boost::optional<const std::string&>& _ca_location, | |
1e59de90 TL |
118 | const std::string& _user, const std::string& _password, const boost::optional<const std::string&>& _mechanism) : |
119 | cct(_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password), mechanism(_mechanism) {} | |
9f95a23c TL |
120 | |
121 | // dtor also destroys the internals | |
122 | ~connection_t() { | |
aee94f69 | 123 | destroy(status); |
9f95a23c | 124 | } |
9f95a23c TL |
125 | }; |
126 | ||
9f95a23c TL |
127 | // convert int status to string - including RGW specific values |
128 | std::string status_to_string(int s) { | |
129 | switch (s) { | |
130 | case STATUS_OK: | |
131 | return "STATUS_OK"; | |
132 | case STATUS_CONNECTION_CLOSED: | |
133 | return "RGW_KAFKA_STATUS_CONNECTION_CLOSED"; | |
134 | case STATUS_QUEUE_FULL: | |
135 | return "RGW_KAFKA_STATUS_QUEUE_FULL"; | |
136 | case STATUS_MAX_INFLIGHT: | |
137 | return "RGW_KAFKA_STATUS_MAX_INFLIGHT"; | |
138 | case STATUS_MANAGER_STOPPED: | |
139 | return "RGW_KAFKA_STATUS_MANAGER_STOPPED"; | |
140 | case STATUS_CONF_ALLOC_FAILED: | |
141 | return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED"; | |
1e59de90 TL |
142 | case STATUS_CONF_REPLCACE: |
143 | return "RGW_KAFKA_STATUS_CONF_REPLCACE"; | |
aee94f69 TL |
144 | case STATUS_CONNECTION_IDLE: |
145 | return "RGW_KAFKA_STATUS_CONNECTION_IDLE"; | |
9f95a23c TL |
146 | } |
147 | return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s)); | |
148 | } | |
149 | ||
// global librdkafka delivery report callback
// invoked (from rd_kafka_poll/rd_kafka_flush) for every produced message,
// with the owning connection_t passed as the opaque pointer
void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) {
  ceph_assert(opaque);

  const auto conn = reinterpret_cast<connection_t*>(opaque);
  const auto result = rkmessage->err;

  // _private carries the heap-allocated delivery tag,
  // or null when the message was published without a callback
  if (!rkmessage->_private) {
    ldout(conn->cct, 20) << "Kafka run: n/ack received, (no callback) with result=" << result << dendl;
    return;
  }

  // find the pending callback matching this tag, invoke it and remove it
  const auto tag = reinterpret_cast<uint64_t*>(rkmessage->_private);
  const auto& callbacks_end = conn->callbacks.end();
  const auto& callbacks_begin = conn->callbacks.begin();
  const auto tag_it = std::find(callbacks_begin, callbacks_end, *tag);
  if (tag_it != callbacks_end) {
    ldout(conn->cct, 20) << "Kafka run: n/ack received, invoking callback with tag=" <<
      *tag << " and result=" << rd_kafka_err2str(result) << dendl;
    tag_it->cb(result);
    conn->callbacks.erase(tag_it);
  } else {
    // TODO add counter for acks with no callback
    ldout(conn->cct, 10) << "Kafka run: unsolicited n/ack received with tag=" <<
      *tag << dendl;
  }
  // the tag was allocated in publish_internal(); release it here
  delete tag;
  // rkmessage is destroyed automatically by librdkafka
}
178 | ||
// global librdkafka log callback: forward kafka logs to the RGW log
// a lower librdkafka "level" is more severe, so it is mapped to a lower
// ceph debug level (logged at higher priority)
void log_callback(const rd_kafka_t* rk, int level, const char *fac, const char *buf) {
  ceph_assert(rd_kafka_opaque(rk));

  const auto conn = reinterpret_cast<connection_t*>(rd_kafka_opaque(rk));
  if (level <= 3)
    ldout(conn->cct, 1) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl;
  else if (level <= 5)
    ldout(conn->cct, 2) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl;
  else if (level <= 6)
    ldout(conn->cct, 10) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl;
  else
    ldout(conn->cct, 20) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl;
}
192 | ||
// global librdkafka error callback (set via rd_kafka_conf_set_error_cb)
// only logs the error; recovery is handled by the manager's retry logic
void poll_err_callback(rd_kafka_t *rk, int err, const char *reason, void *opaque) {
  const auto conn = reinterpret_cast<connection_t*>(rd_kafka_opaque(rk));
  ldout(conn->cct, 10) << "Kafka run: poll error(" << err << "): " << reason << dendl;
}
197 | ||
198 | using connection_t_ptr = std::unique_ptr<connection_t>; | |
199 | ||
200 | // utility function to create a producer, when the connection object already exists | |
201 | bool new_producer(connection_t* conn) { | |
9f95a23c TL |
202 | // reset all status codes |
203 | conn->status = STATUS_OK; | |
204 | char errstr[512] = {0}; | |
205 | ||
206 | conn->temp_conf = rd_kafka_conf_new(); | |
207 | if (!conn->temp_conf) { | |
208 | conn->status = STATUS_CONF_ALLOC_FAILED; | |
1e59de90 | 209 | return false; |
9f95a23c TL |
210 | } |
211 | ||
212 | // get list of brokers based on the bootsrap broker | |
213 | if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; | |
214 | ||
215 | if (conn->use_ssl) { | |
216 | if (!conn->user.empty()) { | |
217 | // use SSL+SASL | |
218 | if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || | |
9f95a23c TL |
219 | rd_kafka_conf_set(conn->temp_conf, "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || |
220 | rd_kafka_conf_set(conn->temp_conf, "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; | |
221 | ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL+SASL security" << dendl; | |
1e59de90 TL |
222 | |
223 | if (conn->mechanism) { | |
224 | if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", conn->mechanism->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; | |
225 | ldout(conn->cct, 20) << "Kafka connect: successfully configured SASL mechanism" << dendl; | |
226 | } else { | |
227 | if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; | |
228 | ldout(conn->cct, 20) << "Kafka connect: using default SASL mechanism" << dendl; | |
229 | } | |
230 | ||
9f95a23c TL |
231 | } else { |
232 | // use only SSL | |
233 | if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; | |
234 | ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL security" << dendl; | |
235 | } | |
236 | if (conn->ca_location) { | |
237 | if (rd_kafka_conf_set(conn->temp_conf, "ssl.ca.location", conn->ca_location->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; | |
238 | ldout(conn->cct, 20) << "Kafka connect: successfully configured CA location" << dendl; | |
239 | } else { | |
240 | ldout(conn->cct, 20) << "Kafka connect: using default CA location" << dendl; | |
241 | } | |
242 | // Note: when librdkafka.1.0 is available the following line could be uncommented instead of the callback setting call | |
243 | // if (rd_kafka_conf_set(conn->temp_conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; | |
244 | ||
245 | ldout(conn->cct, 20) << "Kafka connect: successfully configured security" << dendl; | |
1e59de90 TL |
246 | } else if (!conn->user.empty()) { |
247 | // use SASL+PLAINTEXT | |
248 | if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || | |
249 | rd_kafka_conf_set(conn->temp_conf, "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || | |
250 | rd_kafka_conf_set(conn->temp_conf, "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; | |
251 | ldout(conn->cct, 20) << "Kafka connect: successfully configured SASL_PLAINTEXT" << dendl; | |
252 | ||
253 | if (conn->mechanism) { | |
254 | if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", conn->mechanism->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; | |
255 | ldout(conn->cct, 20) << "Kafka connect: successfully configured SASL mechanism" << dendl; | |
256 | } else { | |
257 | if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; | |
258 | ldout(conn->cct, 20) << "Kafka connect: using default SASL mechanism" << dendl; | |
259 | } | |
9f95a23c TL |
260 | } |
261 | ||
262 | // set the global callback for delivery success/fail | |
263 | rd_kafka_conf_set_dr_msg_cb(conn->temp_conf, message_callback); | |
264 | ||
265 | // set the global opaque pointer to be the connection itself | |
1e59de90 | 266 | rd_kafka_conf_set_opaque(conn->temp_conf, conn); |
9f95a23c | 267 | |
1e59de90 TL |
268 | // redirect kafka logs to RGW |
269 | rd_kafka_conf_set_log_cb(conn->temp_conf, log_callback); | |
aee94f69 TL |
270 | // define poll callback to allow reconnect |
271 | rd_kafka_conf_set_error_cb(conn->temp_conf, poll_err_callback); | |
9f95a23c | 272 | // create the producer |
1e59de90 TL |
273 | if (conn->producer) { |
274 | ldout(conn->cct, 5) << "Kafka connect: producer already exists. detroying the existing before creating a new one" << dendl; | |
275 | conn->destroy(STATUS_CONF_REPLCACE); | |
276 | } | |
9f95a23c TL |
277 | conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr)); |
278 | if (!conn->producer) { | |
279 | conn->status = rd_kafka_last_error(); | |
280 | ldout(conn->cct, 1) << "Kafka connect: failed to create producer: " << errstr << dendl; | |
1e59de90 | 281 | return false; |
9f95a23c TL |
282 | } |
283 | ldout(conn->cct, 20) << "Kafka connect: successfully created new producer" << dendl; | |
1e59de90 TL |
284 | { |
285 | // set log level of producer | |
286 | const auto log_level = conn->cct->_conf->subsys.get_log_level(ceph_subsys_rgw); | |
287 | if (log_level <= 1) | |
288 | rd_kafka_set_log_level(conn->producer, 3); | |
289 | else if (log_level <= 2) | |
290 | rd_kafka_set_log_level(conn->producer, 5); | |
291 | else if (log_level <= 10) | |
292 | rd_kafka_set_log_level(conn->producer, 6); | |
293 | else | |
294 | rd_kafka_set_log_level(conn->producer, 7); | |
295 | } | |
9f95a23c TL |
296 | |
297 | // conf ownership passed to producer | |
298 | conn->temp_conf = nullptr; | |
1e59de90 | 299 | return true; |
9f95a23c TL |
300 | |
301 | conf_error: | |
302 | conn->status = rd_kafka_last_error(); | |
303 | ldout(conn->cct, 1) << "Kafka connect: configuration failed: " << errstr << dendl; | |
1e59de90 | 304 | return false; |
9f95a23c TL |
305 | } |
306 | ||
// struct used for holding messages in the message queue
struct message_wrapper_t {
  std::string conn_name;     // key of the target connection in the connection list
  std::string topic;         // kafka topic to publish to
  std::string message;       // message payload
  const reply_callback_t cb; // delivery callback (null for fire-and-forget)

  message_wrapper_t(const std::string& _conn_name,
      const std::string& _topic,
      const std::string& _message,
      reply_callback_t _cb) : conn_name(_conn_name), topic(_topic), message(_message), cb(_cb) {}
};
319 | ||
// connections are looked up by name (the broker address string)
typedef std::unordered_map<std::string, connection_t_ptr> ConnectionList;
// fixed size lockfree queue of messages waiting to be published
typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;

// manages the kafka connections and the pending-message queue.
// owns a background thread ("kafka_manager") that publishes queued
// messages and polls the connections for delivery reports
class Manager {
public:
  const size_t max_connections;
  const size_t max_inflight;
  const size_t max_queue;
  const size_t max_idle_time; // seconds before an unused connection is reaped
private:
  std::atomic<size_t> connection_count;
  // NOTE(review): plain bool written by stop()/dtor and read by the runner
  // thread without synchronization - consider std::atomic<bool>
  bool stopped;
  int read_timeout_ms;
  ConnectionList connections;
  MessageQueue messages;
  std::atomic<size_t> queued;   // running counter of pushed messages
  std::atomic<size_t> dequeued; // running counter of consumed messages
  CephContext* const cct;
  mutable std::mutex connections_lock;
  std::thread runner;

  // publish a single message taken off the queue (runs in the runner thread).
  // takes ownership of "message" and deletes it on return
  // TODO use rd_kafka_produce_batch for better performance
  void publish_internal(message_wrapper_t* message) {
    const std::unique_ptr<message_wrapper_t> msg_deleter(message);
    const auto conn_it = connections.find(message->conn_name);
    if (conn_it == connections.end()) {
      ldout(cct, 1) << "Kafka publish: connection was deleted while message was in the queue. error: " << STATUS_CONNECTION_CLOSED << dendl;
      if (message->cb) {
        message->cb(STATUS_CONNECTION_CLOSED);
      }
      return;
    }
    auto& conn = conn_it->second;

    // refresh the timestamp so an active connection is not reaped as idle
    conn->timestamp = ceph_clock_now();

    if (!conn->is_ok()) {
      // connection had an issue while message was in the queue
      // TODO add error stats
      ldout(conn->cct, 1) << "Kafka publish: producer was closed while message was in the queue. error: " << status_to_string(conn->status) << dendl;
      if (message->cb) {
        message->cb(conn->status);
      }
      return;
    }

    // create a new topic unless it was already created
    auto topic_it = std::find(conn->topics.begin(), conn->topics.end(), message->topic);
    rd_kafka_topic_t* topic = nullptr;
    if (topic_it == conn->topics.end()) {
      topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr);
      if (!topic) {
        const auto err = rd_kafka_last_error();
        ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " << status_to_string(err) << dendl;
        if (message->cb) {
          message->cb(err);
        }
        conn->destroy(err);
        return;
      }
      // TODO use the topics list as an LRU cache
      conn->topics.push_back(topic);
      ldout(conn->cct, 20) << "Kafka publish: successfully created topic: " << message->topic << dendl;
    } else {
      topic = *topic_it;
      ldout(conn->cct, 20) << "Kafka publish: reused existing topic: " << message->topic << dendl;
    }

    // the tag is heap-allocated and handed to librdkafka as the per-message
    // opaque; it is deleted in message_callback() when the n/ack arrives
    const auto tag = (message->cb == nullptr ? nullptr : new uint64_t(conn->delivery_tag++));
    const auto rc = rd_kafka_produce(
            topic,
            // TODO: non builtin partitioning
            RD_KAFKA_PARTITION_UA,
            // make a copy of the payload
            // so it is safe to pass the pointer from the string
            RD_KAFKA_MSG_F_COPY,
            message->message.data(),
            message->message.length(),
            // optional key and its length
            nullptr,
            0,
            // opaque data: tag, used in the global callback
            // in order to invoke the real callback
            // null if no callback exists
            tag);
    if (rc == -1) {
      const auto err = rd_kafka_last_error();
      ldout(conn->cct, 10) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl;
      // TODO: dont error on full queue, and don't destroy connection, retry instead
      // immediatly invoke callback on error if needed
      if (message->cb) {
        message->cb(err);
      }
      conn->destroy(err);
      delete tag;
      return;
    }

    if (tag) {
      auto const q_len = conn->callbacks.size();
      if (q_len < max_inflight) {
        ldout(conn->cct, 20) << "Kafka publish (with callback, tag=" << *tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
        conn->callbacks.emplace_back(*tag, message->cb);
      } else {
        // immediately invoke callback with error - this is not a connection error
        ldout(conn->cct, 1) << "Kafka publish (with callback): failed with error: callback queue full" << dendl;
        message->cb(STATUS_MAX_INFLIGHT);
        // tag will be deleted when the global callback is invoked
      }
    } else {
      ldout(conn->cct, 20) << "Kafka publish (no callback): OK" << dendl;
    }
  }

  // the managers thread:
  // (1) empty the queue of messages to be published
  // (2) loop over all connections and read acks
  // (3) manages deleted connections
  // (4) TODO reconnect on connection errors
  // (5) TODO cleanup timedout callbacks
  void run() noexcept {
    while (!stopped) {

      // publish all messages in the queue
      auto reply_count = 0U;
      const auto send_count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1));
      dequeued += send_count;
      ConnectionList::iterator conn_it;
      ConnectionList::const_iterator end_it;
      {
        // thread safe access to the connection list
        // once the iterators are fetched they are guaranteed to remain valid
        std::lock_guard lock(connections_lock);
        conn_it = connections.begin();
        end_it = connections.end();
      }
      // loop over all connections to read acks
      for (;conn_it != end_it;) {

        auto& conn = conn_it->second;

        // reap connections that were not used for more than max_idle_time
        if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
          ldout(conn->cct, 20) << "kafka run: deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
          // lock taken for the erase; the iterator is re-assigned from erase()
          std::lock_guard lock(connections_lock);
          conn->destroy(STATUS_CONNECTION_IDLE);
          conn_it = connections.erase(conn_it);
          --connection_count;
          continue;
        }

        // try to reconnect the connection if it has an error
        if (!conn->is_ok()) {
          ldout(conn->cct, 10) << "Kafka run: connection status is: " << status_to_string(conn->status) << dendl;
          const auto& broker = conn_it->first;
          ldout(conn->cct, 20) << "Kafka run: retry connection" << dendl;
          if (new_producer(conn.get()) == false) {
            ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry failed" << dendl;
            // TODO: add error counter for failed retries
            // TODO: add exponential backoff for retries
          } else {
            ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry successfull" << dendl;
          }
          ++conn_it;
          continue;
        }

        // poll for delivery reports; this is what invokes message_callback()
        reply_count += rd_kafka_poll(conn->producer, read_timeout_ms);

        // just increment the iterator
        ++conn_it;
      }
      // if no messages were received or published
      // across all connection, sleep for 100ms
      if (send_count == 0 && reply_count == 0) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
      }
    }
  }

  // used in the dtor for message cleanup
  static void delete_message(const message_wrapper_t* message) {
    delete message;
  }

public:
  Manager(size_t _max_connections,
      size_t _max_inflight,
      size_t _max_queue,
      int _read_timeout_ms,
      CephContext* _cct) :
    max_connections(_max_connections),
    max_inflight(_max_inflight),
    max_queue(_max_queue),
    max_idle_time(30),
    connection_count(0),
    stopped(false),
    read_timeout_ms(_read_timeout_ms),
    connections(_max_connections),
    messages(max_queue),
    queued(0),
    dequeued(0),
    cct(_cct),
    runner(&Manager::run, this) {
      // The hashmap has "max connections" as the initial number of buckets,
      // and allows for 10 collisions per bucket before rehash.
      // This is to prevent rehashing so that iterators are not invalidated
      // when a new connection is added.
      connections.max_load_factor(10.0);
      // give the runner thread a name for easier debugging
      const auto rc = ceph_pthread_setname(runner.native_handle(), "kafka_manager");
      ceph_assert(rc==0);
    }

  // non copyable
  Manager(const Manager&) = delete;
  const Manager& operator=(const Manager&) = delete;

  // stop the main thread
  void stop() {
    stopped = true;
  }

  // connect to a broker, or reuse an existing connection if already connected.
  // on success "broker" is set to the authority parsed out of "url"
  bool connect(std::string& broker,
          const std::string& url,
          bool use_ssl,
          bool verify_ssl,
          boost::optional<const std::string&> ca_location,
          boost::optional<const std::string&> mechanism) {
    if (stopped) {
      ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
      return false;
    }

    std::string user;
    std::string password;
    if (!parse_url_authority(url, broker, user, password)) {
      // TODO: increment counter
      ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
      return false;
    }

    // this should be validated by the regex in parse_url()
    ceph_assert(user.empty() == password.empty());

    // refuse to send credentials in cleartext unless explicitly allowed
    if (!user.empty() && !use_ssl && !g_conf().get_val<bool>("rgw_allow_notification_secrets_in_cleartext")) {
      ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl;
      return false;
    }

    std::lock_guard lock(connections_lock);
    const auto it = connections.find(broker);
    // note that ssl vs. non-ssl connection to the same host are two separate conenctions
    if (it != connections.end()) {
      // connection found - return even if non-ok
      ldout(cct, 20) << "Kafka connect: connection found" << dendl;
      // NOTE(review): pointer implicitly converted to bool; the stored
      // connection pointer is non-null here, so this returns true
      return it->second.get();
    }

    // connection not found, creating a new one
    if (connection_count >= max_connections) {
      // TODO: increment counter
      ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl;
      return false;
    }
    // create_connection must always return a connection object
    // even if error occurred during creation.
    // in such a case the creation will be retried in the main thread
    ++connection_count;
    ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl;
    auto conn = connections.emplace(broker, std::make_unique<connection_t>(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism)).first->second.get();
    if (!new_producer(conn)) {
      ldout(cct, 10) << "Kafka connect: new connection is created. But producer creation failed. will retry" << dendl;
    }
    return true;
  }

  // queue a message for publishing without a delivery callback
  // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
  int publish(const std::string& conn_name,
    const std::string& topic,
    const std::string& message) {
    if (stopped) {
      return STATUS_MANAGER_STOPPED;
    }
    if (messages.push(new message_wrapper_t(conn_name, topic, message, nullptr))) {
      ++queued;
      return STATUS_OK;
    }
    return STATUS_QUEUE_FULL;
  }

  // queue a message for publishing; "cb" is invoked with the delivery status
  int publish_with_confirm(const std::string& conn_name,
    const std::string& topic,
    const std::string& message,
    reply_callback_t cb) {
    if (stopped) {
      return STATUS_MANAGER_STOPPED;
    }
    if (messages.push(new message_wrapper_t(conn_name, topic, message, cb))) {
      ++queued;
      return STATUS_OK;
    }
    return STATUS_QUEUE_FULL;
  }

  // dtor wait for thread to stop
  // then connection are cleaned-up
  ~Manager() {
    stopped = true;
    runner.join();
    messages.consume_all(delete_message);
  }

  // get the number of connections
  size_t get_connection_count() const {
    return connection_count;
  }

  // get the number of in-flight messages
  size_t get_inflight() const {
    size_t sum = 0;
    std::lock_guard lock(connections_lock);
    std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) {
        sum += conn_pair.second->callbacks.size();
      });
    return sum;
  }

  // running counter of the queued messages
  size_t get_queued() const {
    return queued;
  }

  // running counter of the dequeued messages
  size_t get_dequeued() const {
    return dequeued;
  }
};
659 | ||
// singleton manager
// note that the manager itself is not a singleton, and multiple instances may co-exist
// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
static Manager* s_manager = nullptr;

// defaults used when constructing the singleton manager
static const size_t MAX_CONNECTIONS_DEFAULT = 256;
static const size_t MAX_INFLIGHT_DEFAULT = 8192;
static const size_t MAX_QUEUE_DEFAULT = 8192;
static const int READ_TIMEOUT_MS_DEFAULT = 500;
669 | ||
670 | bool init(CephContext* cct) { | |
671 | if (s_manager) { | |
672 | return false; | |
673 | } | |
674 | // TODO: take conf from CephContext | |
675 | s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, READ_TIMEOUT_MS_DEFAULT, cct); | |
676 | return true; | |
677 | } | |
678 | ||
// destroy the singleton manager (no-op if not initialized)
// the Manager dtor joins the runner thread before returning
void shutdown() {
  delete s_manager;
  s_manager = nullptr;
}
683 | ||
1e59de90 TL |
684 | bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl, |
685 | boost::optional<const std::string&> ca_location, | |
686 | boost::optional<const std::string&> mechanism) { | |
687 | if (!s_manager) return false; | |
688 | return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism); | |
9f95a23c TL |
689 | } |
690 | ||
1e59de90 | 691 | int publish(const std::string& conn_name, |
9f95a23c TL |
692 | const std::string& topic, |
693 | const std::string& message) { | |
694 | if (!s_manager) return STATUS_MANAGER_STOPPED; | |
1e59de90 | 695 | return s_manager->publish(conn_name, topic, message); |
9f95a23c TL |
696 | } |
697 | ||
1e59de90 | 698 | int publish_with_confirm(const std::string& conn_name, |
9f95a23c TL |
699 | const std::string& topic, |
700 | const std::string& message, | |
701 | reply_callback_t cb) { | |
702 | if (!s_manager) return STATUS_MANAGER_STOPPED; | |
1e59de90 | 703 | return s_manager->publish_with_confirm(conn_name, topic, message, cb); |
9f95a23c TL |
704 | } |
705 | ||
706 | size_t get_connection_count() { | |
707 | if (!s_manager) return 0; | |
708 | return s_manager->get_connection_count(); | |
709 | } | |
710 | ||
711 | size_t get_inflight() { | |
712 | if (!s_manager) return 0; | |
713 | return s_manager->get_inflight(); | |
714 | } | |
715 | ||
716 | size_t get_queued() { | |
717 | if (!s_manager) return 0; | |
718 | return s_manager->get_queued(); | |
719 | } | |
720 | ||
721 | size_t get_dequeued() { | |
722 | if (!s_manager) return 0; | |
723 | return s_manager->get_dequeued(); | |
724 | } | |
725 | ||
726 | size_t get_max_connections() { | |
727 | if (!s_manager) return MAX_CONNECTIONS_DEFAULT; | |
728 | return s_manager->max_connections; | |
729 | } | |
730 | ||
731 | size_t get_max_inflight() { | |
732 | if (!s_manager) return MAX_INFLIGHT_DEFAULT; | |
733 | return s_manager->max_inflight; | |
734 | } | |
735 | ||
736 | size_t get_max_queue() { | |
737 | if (!s_manager) return MAX_QUEUE_DEFAULT; | |
738 | return s_manager->max_queue; | |
739 | } | |
740 | ||
9f95a23c TL |
741 | } // namespace kafka |
742 |