rd_kafka_t* producer = nullptr;
rd_kafka_conf_t* temp_conf = nullptr;
std::vector<rd_kafka_topic_t*> topics;
- bool marked_for_deletion = false;
uint64_t delivery_tag = 1;
int status = STATUS_OK;
mutable std::atomic<int> ref_count = 0;
const boost::optional<std::string> ca_location;
const std::string user;
const std::string password;
+ utime_t timestamp = ceph_clock_now();
// cleanup of all internal connection resource
// the object can still remain, and internal connection
}
bool is_ok() const {
- return (producer != nullptr && !marked_for_deletion);
+ return (producer != nullptr);
}
// ctor for setting immutable values
// utility function to create a connection, when the connection object already exists
connection_ptr_t& create_connection(connection_ptr_t& conn) {
// pointer must be valid and not marked for deletion
- ceph_assert(conn && !conn->marked_for_deletion);
+ ceph_assert(conn);
// reset all status codes
conn->status = STATUS_OK;
const size_t max_connections;
const size_t max_inflight;
const size_t max_queue;
+ const size_t max_idle_time;
private:
std::atomic<size_t> connection_count;
bool stopped;
const std::unique_ptr<message_wrapper_t> msg_owner(message);
auto& conn = message->conn;
+ conn->timestamp = ceph_clock_now();
+
if (!conn->is_ok()) {
// connection had an issue while message was in the queue
// TODO add error stats
// (3) manages deleted connections
// (4) TODO reconnect on connection errors
// (5) TODO cleanup timedout callbacks
- void run() {
+ void run() noexcept {
while (!stopped) {
// publish all messages in the queue
for (;conn_it != end_it;) {
auto& conn = conn_it->second;
- // delete the connection if marked for deletion
- if (conn->marked_for_deletion) {
- ldout(conn->cct, 10) << "Kafka run: connection is deleted" << dendl;
- conn->destroy(STATUS_CONNECTION_CLOSED);
- std::lock_guard lock(connections_lock);
- // erase is safe - does not invalidate any other iterator
- // lock so no insertion happens at the same time
+
+ // Checking the connection idlesness
+ if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
+ ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
ERASE_AND_CONTINUE(conn_it, connections);
}
max_connections(_max_connections),
max_inflight(_max_inflight),
max_queue(_max_queue),
+ max_idle_time(30),
connection_count(0),
stopped(false),
read_timeout_ms(_read_timeout_ms),
stopped = true;
}
- // disconnect from a broker
- bool disconnect(connection_ptr_t& conn) {
- if (!conn || stopped) {
- return false;
- }
- conn->marked_for_deletion = true;
- return true;
- }
-
// connect to a broker, or reuse an existing connection if already connected
connection_ptr_t connect(const std::string& url,
bool use_ssl,
const auto it = connections.find(broker);
// note that ssl vs. non-ssl connection to the same host are two separate conenctions
if (it != connections.end()) {
- if (it->second->marked_for_deletion) {
- // TODO: increment counter
- ldout(cct, 1) << "Kafka connect: endpoint marked for deletion" << dendl;
- return nullptr;
- }
// connection found - return even if non-ok
ldout(cct, 20) << "Kafka connect: connection found" << dendl;
return it->second;
return s_manager->max_queue;
}
-bool disconnect(connection_ptr_t& conn) {
- if (!s_manager) return false;
- return s_manager->disconnect(conn);
-}
-
} // namespace kafka