]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_kafka.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_kafka.cc
index 1e8af36ba879bfcdc20d5d2ea9214c37acdd6dd0..9b75c9d0885749b55d3278cb9e6a27bd7f7b22dd 100644 (file)
@@ -62,7 +62,6 @@ struct connection_t {
   rd_kafka_t* producer = nullptr;
   rd_kafka_conf_t* temp_conf = nullptr;
   std::vector<rd_kafka_topic_t*> topics;
-  bool marked_for_deletion = false;
   uint64_t delivery_tag = 1;
   int status = STATUS_OK;
   mutable std::atomic<int> ref_count = 0;
@@ -74,6 +73,7 @@ struct connection_t {
   const boost::optional<std::string> ca_location;
   const std::string user;
   const std::string password;
+  utime_t timestamp = ceph_clock_now();
 
   // cleanup of all internal connection resource
   // the object can still remain, and internal connection
@@ -101,7 +101,7 @@ struct connection_t {
   }
 
   bool is_ok() const {
-    return (producer != nullptr && !marked_for_deletion);
+    return (producer != nullptr);
   }
 
   // ctor for setting immutable values
@@ -187,7 +187,7 @@ void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void*
 // utility function to create a connection, when the connection object already exists
 connection_ptr_t& create_connection(connection_ptr_t& conn) {
   // pointer must be valid and not marked for deletion
-  ceph_assert(conn && !conn->marked_for_deletion);
+  ceph_assert(conn);
   
   // reset all status codes
   conn->status = STATUS_OK; 
@@ -295,6 +295,7 @@ public:
   const size_t max_connections;
   const size_t max_inflight;
   const size_t max_queue;
+  const size_t max_idle_time;
 private:
   std::atomic<size_t> connection_count;
   bool stopped;
@@ -312,6 +313,8 @@ private:
     const std::unique_ptr<message_wrapper_t> msg_owner(message);
     auto& conn = message->conn;
 
+    conn->timestamp = ceph_clock_now(); 
+
     if (!conn->is_ok()) {
       // connection had an issue while message was in the queue
       // TODO add error stats
@@ -396,7 +399,7 @@ private:
   // (3) manages deleted connections
   // (4) TODO reconnect on connection errors
   // (5) TODO cleanup timedout callbacks
-  void run() {
+  void run() noexcept {
     while (!stopped) {
 
       // publish all messages in the queue
@@ -416,13 +419,10 @@ private:
       for (;conn_it != end_it;) {
         
         auto& conn = conn_it->second;
-        // delete the connection if marked for deletion
-        if (conn->marked_for_deletion) {
-          ldout(conn->cct, 10) << "Kafka run: connection is deleted" << dendl;
-          conn->destroy(STATUS_CONNECTION_CLOSED);
-          std::lock_guard lock(connections_lock);
-          // erase is safe - does not invalidate any other iterator
-          // lock so no insertion happens at the same time
+
+        // Checking the connection idlesness
+        if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
+          ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
           ERASE_AND_CONTINUE(conn_it, connections);
         }
 
@@ -468,6 +468,7 @@ public:
     max_connections(_max_connections),
     max_inflight(_max_inflight),
     max_queue(_max_queue),
+    max_idle_time(30),
     connection_count(0),
     stopped(false),
     read_timeout_ms(_read_timeout_ms),
@@ -496,15 +497,6 @@ public:
     stopped = true;
   }
 
-  // disconnect from a broker
-  bool disconnect(connection_ptr_t& conn) {
-    if (!conn || stopped) {
-      return false;
-    }
-    conn->marked_for_deletion = true;
-    return true;
-  }
-
   // connect to a broker, or reuse an existing connection if already connected
   connection_ptr_t connect(const std::string& url, 
           bool use_ssl,
@@ -537,11 +529,6 @@ public:
     const auto it = connections.find(broker);
     // note that ssl vs. non-ssl connection to the same host are two separate conenctions
     if (it != connections.end()) {
-      if (it->second->marked_for_deletion) {
-        // TODO: increment counter
-        ldout(cct, 1) << "Kafka connect: endpoint marked for deletion" << dendl;
-        return nullptr;
-      }
       // connection found - return even if non-ok
       ldout(cct, 20) << "Kafka connect: connection found" << dendl;
       return it->second;
@@ -711,10 +698,5 @@ size_t get_max_queue() {
   return s_manager->max_queue;
 }
 
-bool disconnect(connection_ptr_t& conn) {
-  if (!s_manager) return false;
-  return s_manager->disconnect(conn);
-}
-
 } // namespace kafka