1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
6 #include <librdkafka/rdkafka.h>
7 #include "include/ceph_assert.h"
10 #include <unordered_map>
16 #include <boost/lockfree/queue.hpp>
17 #include "common/dout.h"
19 #define dout_subsys ceph_subsys_rgw
21 // TODO investigation, not necessarily issues:
22 // (1) in case of single threaded writer context use spsc_queue
23 // (2) check performance of emptying queue to local list, and go over the list and publish
24 // (3) use std::shared_mutex (c++17) or equivalent for the connections lock
26 // comparison operator between topic pointer and name
27 bool operator==(const rd_kafka_topic_t
* rkt
, const std::string
& name
) {
28 return name
== std::string_view(rd_kafka_topic_name(rkt
));
31 namespace rgw::kafka
{
// status codes for publishing
// TODO: use the actual error code (when conn exists) instead of STATUS_CONNECTION_CLOSED when replying to client
static constexpr int STATUS_CONNECTION_CLOSED = -0x1002;
static constexpr int STATUS_QUEUE_FULL        = -0x1003;
static constexpr int STATUS_MAX_INFLIGHT      = -0x1004;
static constexpr int STATUS_MANAGER_STOPPED   = -0x1005;
// status code for connection opening
static constexpr int STATUS_CONF_ALLOC_FAILED = -0x2001;
// status code indicating success
static constexpr int STATUS_OK = 0x0;
44 // struct for holding the callback and its tag in the callback list
// NOTE(review): the member declarations and the operator== body are missing
// from this extraction — presumably `uint64_t tag;` and `reply_callback_t cb;`
// per the ctor initializer list below; confirm against the original file.
45 struct reply_callback_with_tag_t
{
// ctor: stores the delivery tag and the client callback to invoke on ack/nack
49 reply_callback_with_tag_t(uint64_t _tag
, reply_callback_t _cb
) : tag(_tag
), cb(_cb
) {}
// equality against a raw tag value, so std::find can search the callback list
51 bool operator==(uint64_t rhs
) {
56 typedef std::vector
<reply_callback_with_tag_t
> CallbackList
;
58 // struct for holding the connection state object as well as list of topics
59 // it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
60 // since references to deleted objects may still exist in the calling code
// NOTE(review): the `struct connection_t` opener and several member/statement
// lines (e.g. `status`, `use_ssl`, the destroy()/is_ok() signatures) are
// missing from this extraction; the code below is kept verbatim.
// librdkafka producer handle; null until a connection is created
62 rd_kafka_t
* producer
= nullptr;
// configuration object; owned here only until handed to rd_kafka_new()
63 rd_kafka_conf_t
* temp_conf
= nullptr;
// topics already created on this connection (acts as a cache)
64 std::vector
<rd_kafka_topic_t
*> topics
;
65 bool marked_for_deletion
= false;
// tag handed to the next published message that requests a callback
66 uint64_t delivery_tag
= 1;
// intrusive refcount used by boost::intrusive_ptr (see friend functions below)
68 mutable std::atomic
<int> ref_count
= 0;
69 CephContext
* const cct
;
// callbacks still waiting for an ack/nack from the broker
70 CallbackList callbacks
;
71 const std::string broker
;
73 const bool verify_ssl
; // TODO currently ignored, not supported in librdkafka v0.11.6
74 const boost::optional
<std::string
> ca_location
;
75 const std::string user
;
76 const std::string password
;
78 // cleanup of all internal connection resource
79 // the object can still remain, and internal connection
80 // resources created again on successful reconnection
83 // destroy temporary conf (if connection was never established)
85 rd_kafka_conf_destroy(temp_conf
);
88 // wait for all remaining acks/nacks
89 rd_kafka_flush(producer
, 5*1000 /* wait for max 5 seconds */);
// destroy cached topic handles before destroying the producer
91 std::for_each(topics
.begin(), topics
.end(), [](auto topic
) {rd_kafka_topic_destroy(topic
);});
93 rd_kafka_destroy(producer
);
94 // fire all remaining callbacks (if not fired by rd_kafka_flush)
95 std::for_each(callbacks
.begin(), callbacks
.end(), [this](auto& cb_tag
) {
97 ldout(cct
, 20) << "Kafka destroy: invoking callback with tag=" << cb_tag
.tag
<< dendl
;
// connection is usable only if a producer exists and deletion was not requested
104 return (producer
!= nullptr && !marked_for_deletion
);
107 // ctor for setting immutable values
108 connection_t(CephContext
* _cct
, const std::string
& _broker
, bool _use_ssl
, bool _verify_ssl
,
109 const boost::optional
<const std::string
&>& _ca_location
,
110 const std::string
& _user
, const std::string
& _password
) :
111 cct(_cct
), broker(_broker
), use_ssl(_use_ssl
), verify_ssl(_verify_ssl
), ca_location(_ca_location
), user(_user
), password(_password
) {}
113 // dtor also destroys the internals
115 destroy(STATUS_CONNECTION_CLOSED
);
// refcounting free functions need access to the private ref_count
118 friend void intrusive_ptr_add_ref(const connection_t
* p
);
119 friend void intrusive_ptr_release(const connection_t
* p
);
// build a human readable, multi-line description of a connection
// NOTE(review): the declaration of `str` and the return statement are missing
// from this extraction; kept verbatim.
122 std::string
to_string(const connection_ptr_t
& conn
) {
124 str
+= "\nBroker: " + conn
->broker
;
125 str
+= conn
->use_ssl
? "\nUse SSL" : "";
126 str
+= conn
->ca_location
? "\nCA Location: " + *(conn
->ca_location
) : "";
129 // these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
130 void intrusive_ptr_add_ref(const connection_t
* p
) {
// release: last reference deletes the connection object
133 void intrusive_ptr_release(const connection_t
* p
) {
134 if (--p
->ref_count
== 0) {
139 // convert int status to string - including RGW specific values
// NOTE(review): the `switch (s)` header, STATUS_OK case and closing braces are
// missing from this extraction; non-RGW codes fall through to librdkafka below.
140 std::string
status_to_string(int s
) {
144 case STATUS_CONNECTION_CLOSED
:
145 return "RGW_KAFKA_STATUS_CONNECTION_CLOSED";
146 case STATUS_QUEUE_FULL
:
147 return "RGW_KAFKA_STATUS_QUEUE_FULL";
148 case STATUS_MAX_INFLIGHT
:
149 return "RGW_KAFKA_STATUS_MAX_INFLIGHT";
150 case STATUS_MANAGER_STOPPED
:
151 return "RGW_KAFKA_STATUS_MANAGER_STOPPED";
152 case STATUS_CONF_ALLOC_FAILED
:
153 return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED";
// any other value is treated as a librdkafka error code
155 return std::string(rd_kafka_err2str((rd_kafka_resp_err_t
)s
));
// global librdkafka delivery-report callback: matches the ack/nack to the
// client callback registered under the message's delivery tag
// NOTE(review): several return statements and closing braces are missing from
// this extraction; kept verbatim.
158 void message_callback(rd_kafka_t
* rk
, const rd_kafka_message_t
* rkmessage
, void* opaque
) {
// the opaque pointer was set to the connection in rd_kafka_conf_set_opaque()
161 const auto conn
= reinterpret_cast<connection_t
*>(opaque
);
162 const auto result
= rkmessage
->err
;
// _private carries the delivery tag; absent means publish without callback
164 if (!rkmessage
->_private
) {
165 ldout(conn
->cct
, 20) << "Kafka run: n/ack received, (no callback) with result=" << result
<< dendl
;
169 const auto tag
= reinterpret_cast<uint64_t*>(rkmessage
->_private
);
170 const auto& callbacks_end
= conn
->callbacks
.end();
171 const auto& callbacks_begin
= conn
->callbacks
.begin();
// uses reply_callback_with_tag_t::operator==(uint64_t) to locate the callback
172 const auto tag_it
= std::find(callbacks_begin
, callbacks_end
, *tag
);
173 if (tag_it
!= callbacks_end
) {
174 ldout(conn
->cct
, 20) << "Kafka run: n/ack received, invoking callback with tag=" <<
175 *tag
<< " and result=" << rd_kafka_err2str(result
) << dendl
;
// callback fired - remove it from the pending list
177 conn
->callbacks
.erase(tag_it
);
179 // TODO add counter for acks with no callback
180 ldout(conn
->cct
, 10) << "Kafka run: unsolicited n/ack received with tag=" <<
184 // rkmessage is destroyed automatically by librdkafka
187 // utility function to create a connection, when the connection object already exists
// builds the librdkafka conf, configures security and creates the producer;
// on any failure conn->status is set and the (still valid) conn is returned
// NOTE(review): the `conf_error:` label, return statements and several closing
// braces are missing from this extraction; kept verbatim.
188 connection_ptr_t
& create_connection(connection_ptr_t
& conn
) {
189 // pointer must be valid and not marked for deletion
190 ceph_assert(conn
&& !conn
->marked_for_deletion
);
192 // reset all status codes
193 conn
->status
= STATUS_OK
;
194 char errstr
[512] = {0};
196 conn
->temp_conf
= rd_kafka_conf_new();
197 if (!conn
->temp_conf
) {
198 conn
->status
= STATUS_CONF_ALLOC_FAILED
;
202 // get list of brokers based on the bootstrap broker
203 if (rd_kafka_conf_set(conn
->temp_conf
, "bootstrap.servers", conn
->broker
.c_str(), errstr
, sizeof(errstr
)) != RD_KAFKA_CONF_OK
) goto conf_error
;
// user present implies SASL over SSL (validated by the caller)
206 if (!conn
->user
.empty()) {
208 if (rd_kafka_conf_set(conn
->temp_conf
, "security.protocol", "SASL_SSL", errstr
, sizeof(errstr
)) != RD_KAFKA_CONF_OK
||
209 rd_kafka_conf_set(conn
->temp_conf
, "sasl.mechanism", "PLAIN", errstr
, sizeof(errstr
)) != RD_KAFKA_CONF_OK
||
210 rd_kafka_conf_set(conn
->temp_conf
, "sasl.username", conn
->user
.c_str(), errstr
, sizeof(errstr
)) != RD_KAFKA_CONF_OK
||
211 rd_kafka_conf_set(conn
->temp_conf
, "sasl.password", conn
->password
.c_str(), errstr
, sizeof(errstr
)) != RD_KAFKA_CONF_OK
) goto conf_error
;
212 ldout(conn
->cct
, 20) << "Kafka connect: successfully configured SSL+SASL security" << dendl
;
// SSL without SASL credentials
215 if (rd_kafka_conf_set(conn
->temp_conf
, "security.protocol", "SSL", errstr
, sizeof(errstr
)) != RD_KAFKA_CONF_OK
) goto conf_error
;
216 ldout(conn
->cct
, 20) << "Kafka connect: successfully configured SSL security" << dendl
;
218 if (conn
->ca_location
) {
219 if (rd_kafka_conf_set(conn
->temp_conf
, "ssl.ca.location", conn
->ca_location
->c_str(), errstr
, sizeof(errstr
)) != RD_KAFKA_CONF_OK
) goto conf_error
;
220 ldout(conn
->cct
, 20) << "Kafka connect: successfully configured CA location" << dendl
;
222 ldout(conn
->cct
, 20) << "Kafka connect: using default CA location" << dendl
;
224 // Note: when librdkafka.1.0 is available the following line could be uncommented instead of the callback setting call
225 // if (rd_kafka_conf_set(conn->temp_conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
227 ldout(conn
->cct
, 20) << "Kafka connect: successfully configured security" << dendl
;
230 // set the global callback for delivery success/fail
231 rd_kafka_conf_set_dr_msg_cb(conn
->temp_conf
, message_callback
);
233 // set the global opaque pointer to be the connection itself
234 rd_kafka_conf_set_opaque(conn
->temp_conf
, conn
.get());
236 // create the producer
237 conn
->producer
= rd_kafka_new(RD_KAFKA_PRODUCER
, conn
->temp_conf
, errstr
, sizeof(errstr
));
238 if (!conn
->producer
) {
239 conn
->status
= rd_kafka_last_error();
240 ldout(conn
->cct
, 1) << "Kafka connect: failed to create producer: " << errstr
<< dendl
;
243 ldout(conn
->cct
, 20) << "Kafka connect: successfully created new producer" << dendl
;
245 // conf ownership passed to producer
246 conn
->temp_conf
= nullptr;
// configuration error path: record the librdkafka error on the connection
250 conn
->status
= rd_kafka_last_error();
251 ldout(conn
->cct
, 1) << "Kafka connect: configuration failed: " << errstr
<< dendl
;
255 // utility function to create a new connection
// NOTE(review): the `use_ssl`/`verify_ssl` parameter lines are missing from
// this extraction (they are referenced below); kept verbatim.
256 connection_ptr_t
create_new_connection(const std::string
& broker
, CephContext
* cct
,
259 boost::optional
<const std::string
&> ca_location
,
260 const std::string
& user
,
261 const std::string
& password
) {
262 // create connection state
263 connection_ptr_t
conn(new connection_t(cct
, broker
, use_ssl
, verify_ssl
, ca_location
, user
, password
));
// delegate the actual librdkafka setup to create_connection()
264 return create_connection(conn
);
267 /// struct used for holding messages in the message queue
// NOTE(review): the topic/message/cb member declarations are missing from this
// extraction — presumably mirroring the ctor initializer list; confirm.
268 struct message_wrapper_t
{
269 connection_ptr_t conn
;
// ctor: captures the target connection, topic, payload and optional callback
274 message_wrapper_t(connection_ptr_t
& _conn
,
275 const std::string
& _topic
,
276 const std::string
& _message
,
277 reply_callback_t _cb
) : conn(_conn
), topic(_topic
), message(_message
), cb(_cb
) {}
280 typedef std::unordered_map
<std::string
, connection_ptr_t
> ConnectionList
;
281 typedef boost::lockfree::queue
<message_wrapper_t
*, boost::lockfree::fixed_sized
<true>> MessageQueue
;
283 // macros used inside a loop where an iterator is either incremented or erased
// NOTE(review): parts of the macro bodies are missing from this extraction;
// no comments are inserted between the backslash-continued lines below.
284 #define INCREMENT_AND_CONTINUE(IT) \
288 #define ERASE_AND_CONTINUE(IT,CONTAINER) \
289 IT=CONTAINER.erase(IT); \
290 --connection_count; \
295 const size_t max_connections
;
296 const size_t max_inflight
;
297 const size_t max_queue
;
// number of live connections; atomic so the stats getter needs no lock
299 std::atomic
<size_t> connection_count
;
302 ConnectionList connections
;
// lockfree queue feeding publish_internal() on the runner thread
303 MessageQueue messages
;
// running counters of messages pushed into / consumed from the queue
304 std::atomic
<size_t> queued
;
305 std::atomic
<size_t> dequeued
;
306 CephContext
* const cct
;
// guards `connections`; mutable so const getters may lock it
307 mutable std::mutex connections_lock
;
310 // TODO use rd_kafka_produce_batch for better performance
// runs on the manager thread: takes ownership of the queued message, resolves
// (or creates) the topic and hands the payload to librdkafka
// NOTE(review): several return statements, braces and some rd_kafka_produce
// argument lines are missing from this extraction; kept verbatim.
311 void publish_internal(message_wrapper_t
* message
) {
// ensure the heap-allocated wrapper is freed on every exit path
312 const std::unique_ptr
<message_wrapper_t
> msg_owner(message
);
313 auto& conn
= message
->conn
;
315 if (!conn
->is_ok()) {
316 // connection had an issue while message was in the queue
317 // TODO add error stats
318 ldout(conn
->cct
, 1) << "Kafka publish: connection had an issue while message was in the queue. error: " << status_to_string(conn
->status
) << dendl
;
// notify the client with the connection's status code
320 message
->cb(conn
->status
);
325 // create a new topic unless it was already created
326 auto topic_it
= std::find(conn
->topics
.begin(), conn
->topics
.end(), message
->topic
);
327 rd_kafka_topic_t
* topic
= nullptr;
328 if (topic_it
== conn
->topics
.end()) {
329 topic
= rd_kafka_topic_new(conn
->producer
, message
->topic
.c_str(), nullptr);
331 const auto err
= rd_kafka_last_error();
332 ldout(conn
->cct
, 1) << "Kafka publish: failed to create topic: " << message
->topic
<< " error: " << status_to_string(err
) << dendl
;
339 // TODO use the topics list as an LRU cache
340 conn
->topics
.push_back(topic
);
341 ldout(conn
->cct
, 20) << "Kafka publish: successfully created topic: " << message
->topic
<< dendl
;
344 ldout(conn
->cct
, 20) << "Kafka publish: reused existing topic: " << message
->topic
<< dendl
;
// allocate a delivery tag only when the caller wants a confirmation callback
347 const auto tag
= (message
->cb
== nullptr ? nullptr : new uint64_t(conn
->delivery_tag
++));
348 const auto rc
= rd_kafka_produce(
350 // TODO: non builtin partitioning
351 RD_KAFKA_PARTITION_UA
,
352 // make a copy of the payload
353 // so it is safe to pass the pointer from the string
355 message
->message
.data(),
356 message
->message
.length(),
357 // optional key and its length
360 // opaque data: tag, used in the global callback
361 // in order to invoke the real callback
362 // null if no callback exists
365 const auto err
= rd_kafka_last_error();
366 ldout(conn
->cct
, 10) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err
) << dendl
;
367 // TODO: don't error on full queue, and don't destroy connection, retry instead
368 // immediately invoke callback on error if needed
// success path with callback: register it unless too many are in flight
377 auto const q_len
= conn
->callbacks
.size();
378 if (q_len
< max_inflight
) {
379 ldout(conn
->cct
, 20) << "Kafka publish (with callback, tag=" << *tag
<< "): OK. Queue has: " << q_len
<< " callbacks" << dendl
;
380 conn
->callbacks
.emplace_back(*tag
, message
->cb
);
382 // immediately invoke callback with error - this is not a connection error
383 ldout(conn
->cct
, 1) << "Kafka publish (with callback): failed with error: callback queue full" << dendl
;
384 message
->cb(STATUS_MAX_INFLIGHT
);
385 // tag will be deleted when the global callback is invoked
388 ldout(conn
->cct
, 20) << "Kafka publish (no callback): OK" << dendl
;
392 // the managers thread:
393 // (1) empty the queue of messages to be published
394 // (2) loop over all connections and read acks
395 // (3) manages deleted connections
396 // (4) TODO reconnect on connection errors
397 // (5) TODO cleanup timedout callbacks
// NOTE(review): the `void run()` signature, outer loop and several braces are
// missing from this extraction; kept verbatim.
401 // publish all messages in the queue
402 auto reply_count
= 0U;
// drain the lockfree queue; each message is published on this thread
403 const auto send_count
= messages
.consume_all(std::bind(&Manager::publish_internal
, this, std::placeholders::_1
));
404 dequeued
+= send_count
;
405 ConnectionList::iterator conn_it
;
406 ConnectionList::const_iterator end_it
;
408 // thread safe access to the connection list
409 // once the iterators are fetched they are guaranteed to remain valid
410 std::lock_guard
lock(connections_lock
);
411 conn_it
= connections
.begin();
412 end_it
= connections
.end();
414 // loop over all connections to read acks
415 for (;conn_it
!= end_it
;) {
417 auto& conn
= conn_it
->second
;
418 // delete the connection if marked for deletion
419 if (conn
->marked_for_deletion
) {
420 ldout(conn
->cct
, 10) << "Kafka run: connection is deleted" << dendl
;
421 conn
->destroy(STATUS_CONNECTION_CLOSED
);
422 std::lock_guard
lock(connections_lock
);
423 // erase is safe - does not invalidate any other iterator
424 // lock so no insertion happens at the same time
425 ERASE_AND_CONTINUE(conn_it
, connections
);
428 // try to reconnect the connection if it has an error
429 if (!conn
->is_ok()) {
430 ldout(conn
->cct
, 10) << "Kafka run: connection status is: " << status_to_string(conn
->status
) << dendl
;
431 const auto& broker
= conn_it
->first
;
432 ldout(conn
->cct
, 20) << "Kafka run: retry connection" << dendl
;
433 if (create_connection(conn
)->is_ok() == false) {
434 ldout(conn
->cct
, 10) << "Kafka run: connection (" << broker
<< ") retry failed" << dendl
;
435 // TODO: add error counter for failed retries
436 // TODO: add exponential backoff for retries
438 ldout(conn
->cct
, 10) << "Kafka run: connection (" << broker
<< ") retry successfull" << dendl
;
440 INCREMENT_AND_CONTINUE(conn_it
);
// poll for delivery reports; triggers message_callback()
443 reply_count
+= rd_kafka_poll(conn
->producer
, read_timeout_ms
);
445 // just increment the iterator
448 // if no messages were received or published
449 // across all connection, sleep for 100ms
450 if (send_count
== 0 && reply_count
== 0) {
451 std::this_thread::sleep_for(std::chrono::milliseconds(100));
456 // used in the dtor for message cleanup
// NOTE(review): the function body (presumably `delete message;`) is missing
// from this extraction; confirm against the original file.
457 static void delete_message(const message_wrapper_t
* message
) {
// Manager ctor: copies the limits, sizes the queue/map and starts the runner
// thread. NOTE(review): the `_max_queue`/`cct` parameters and several member
// initializers (queue, counters, stopped flag) are missing from this
// extraction; kept verbatim.
462 Manager(size_t _max_connections
,
463 size_t _max_inflight
,
465 int _read_timeout_ms
,
467 max_connections(_max_connections
),
468 max_inflight(_max_inflight
),
469 max_queue(_max_queue
),
472 read_timeout_ms(_read_timeout_ms
),
473 connections(_max_connections
),
478 runner(&Manager::run
, this) {
479 // The hashmap has "max connections" as the initial number of buckets,
480 // and allows for 10 collisions per bucket before rehash.
481 // This is to prevent rehashing so that iterators are not invalidated
482 // when a new connection is added.
483 connections
.max_load_factor(10.0);
484 // give the runner thread a name for easier debugging
485 const auto rc
= ceph_pthread_setname(runner
.native_handle(), "kafka_manager");
// non-copyable: owns a thread and unique connection state
490 Manager(const Manager
&) = delete;
491 const Manager
& operator=(const Manager
&) = delete;
493 // stop the main thread
498 // disconnect from a broker
// marks the connection; actual teardown happens on the runner thread
499 bool disconnect(connection_ptr_t
& conn
) {
500 if (!conn
|| stopped
) {
503 conn
->marked_for_deletion
= true;
507 // connect to a broker, or reuse an existing connection if already connected
// NOTE(review): the `use_ssl`/`verify_ssl` parameters, the stopped-check,
// several return statements and the broker/user local declarations are
// missing from this extraction; kept verbatim.
508 connection_ptr_t
connect(const std::string
& url
,
511 boost::optional
<const std::string
&> ca_location
) {
513 // TODO: increment counter
514 ldout(cct
, 1) << "Kafka connect: manager is stopped" << dendl
;
520 std::string password
;
// split the URL into broker authority and optional credentials
521 if (!parse_url_authority(url
, broker
, user
, password
)) {
522 // TODO: increment counter
523 ldout(cct
, 1) << "Kafka connect: URL parsing failed" << dendl
;
527 // this should be validated by the regex in parse_url()
528 ceph_assert(user
.empty() == password
.empty());
// refuse plaintext credentials
530 if (!user
.empty() && !use_ssl
) {
531 ldout(cct
, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl
;
535 std::lock_guard
lock(connections_lock
);
536 const auto it
= connections
.find(broker
);
537 // note that ssl vs. non-ssl connection to the same host are two separate connections
538 if (it
!= connections
.end()) {
539 if (it
->second
->marked_for_deletion
) {
540 // TODO: increment counter
541 ldout(cct
, 1) << "Kafka connect: endpoint marked for deletion" << dendl
;
544 // connection found - return even if non-ok
545 ldout(cct
, 20) << "Kafka connect: connection found" << dendl
;
549 // connection not found, creating a new one
550 if (connection_count
>= max_connections
) {
551 // TODO: increment counter
552 ldout(cct
, 1) << "Kafka connect: max connections exceeded" << dendl
;
555 const auto conn
= create_new_connection(broker
, cct
, use_ssl
, verify_ssl
, ca_location
, user
, password
);
556 // create_new_connection must always return a connection object
557 // even if error occurred during creation.
558 // in such a case the creation will be retried in the main thread
561 ldout(cct
, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count
<< dendl
;
562 return connections
.emplace(broker
, conn
).first
->second
;
565 // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
// queue a message without a delivery callback; actual send happens on the
// runner thread. NOTE(review): the stopped-check, queued-counter update and
// success return are missing from this extraction; kept verbatim.
566 int publish(connection_ptr_t
& conn
,
567 const std::string
& topic
,
568 const std::string
& message
) {
570 return STATUS_MANAGER_STOPPED
;
572 if (!conn
|| !conn
->is_ok()) {
573 return STATUS_CONNECTION_CLOSED
;
// push() returning true means the wrapper was queued; the runner thread
// will take ownership of it
575 if (messages
.push(new message_wrapper_t(conn
, topic
, message
, nullptr))) {
579 return STATUS_QUEUE_FULL
;
// same as publish() but registers a delivery confirmation callback
582 int publish_with_confirm(connection_ptr_t
& conn
,
583 const std::string
& topic
,
584 const std::string
& message
,
585 reply_callback_t cb
) {
587 return STATUS_MANAGER_STOPPED
;
589 if (!conn
|| !conn
->is_ok()) {
590 return STATUS_CONNECTION_CLOSED
;
592 if (messages
.push(new message_wrapper_t(conn
, topic
, message
, cb
))) {
596 return STATUS_QUEUE_FULL
;
599 // dtor wait for thread to stop
600 // then connection are cleaned-up
// free any wrappers still sitting in the queue
604 messages
.consume_all(delete_message
);
607 // get the number of connections
608 size_t get_connection_count() const {
609 return connection_count
;
612 // get the number of in-flight messages
// NOTE(review): the declaration of `sum` and the return statements of these
// getters are missing from this extraction; kept verbatim.
613 size_t get_inflight() const {
// take the lock so the connection list cannot change while summing
615 std::lock_guard
lock(connections_lock
);
616 std::for_each(connections
.begin(), connections
.end(), [&sum
](auto& conn_pair
) {
617 sum
+= conn_pair
.second
->callbacks
.size();
622 // running counter of the queued messages
623 size_t get_queued() const {
627 // running counter of the dequeued messages
628 size_t get_dequeued() const {
634 // note that the manager itself is not a singleton, and multiple instances may co-exist
635 // TODO make the pointer atomic in allocation and deallocation to avoid race conditions
636 static Manager
* s_manager
= nullptr;
638 static const size_t MAX_CONNECTIONS_DEFAULT
= 256;
639 static const size_t MAX_INFLIGHT_DEFAULT
= 8192;
640 static const size_t MAX_QUEUE_DEFAULT
= 8192;
641 static const int READ_TIMEOUT_MS_DEFAULT
= 500;
// allocate the global manager with compiled-in defaults
// NOTE(review): the guard against double-initialization and the return
// statement are missing from this extraction; kept verbatim.
643 bool init(CephContext
* cct
) {
647 // TODO: take conf from CephContext
648 s_manager
= new Manager(MAX_CONNECTIONS_DEFAULT
, MAX_INFLIGHT_DEFAULT
, MAX_QUEUE_DEFAULT
, READ_TIMEOUT_MS_DEFAULT
, cct
);
657 connection_ptr_t
connect(const std::string
& url
, bool use_ssl
, bool verify_ssl
,
658 boost::optional
<const std::string
&> ca_location
) {
659 if (!s_manager
) return nullptr;
660 return s_manager
->connect(url
, use_ssl
, verify_ssl
, ca_location
);
663 int publish(connection_ptr_t
& conn
,
664 const std::string
& topic
,
665 const std::string
& message
) {
666 if (!s_manager
) return STATUS_MANAGER_STOPPED
;
667 return s_manager
->publish(conn
, topic
, message
);
670 int publish_with_confirm(connection_ptr_t
& conn
,
671 const std::string
& topic
,
672 const std::string
& message
,
673 reply_callback_t cb
) {
674 if (!s_manager
) return STATUS_MANAGER_STOPPED
;
675 return s_manager
->publish_with_confirm(conn
, topic
, message
, cb
);
678 size_t get_connection_count() {
679 if (!s_manager
) return 0;
680 return s_manager
->get_connection_count();
683 size_t get_inflight() {
684 if (!s_manager
) return 0;
685 return s_manager
->get_inflight();
688 size_t get_queued() {
689 if (!s_manager
) return 0;
690 return s_manager
->get_queued();
693 size_t get_dequeued() {
694 if (!s_manager
) return 0;
695 return s_manager
->get_dequeued();
698 size_t get_max_connections() {
699 if (!s_manager
) return MAX_CONNECTIONS_DEFAULT
;
700 return s_manager
->max_connections
;
703 size_t get_max_inflight() {
704 if (!s_manager
) return MAX_INFLIGHT_DEFAULT
;
705 return s_manager
->max_inflight
;
708 size_t get_max_queue() {
709 if (!s_manager
) return MAX_QUEUE_DEFAULT
;
710 return s_manager
->max_queue
;
713 bool disconnect(connection_ptr_t
& conn
) {
714 if (!s_manager
) return false;
715 return s_manager
->disconnect(conn
);