]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_kafka.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / rgw / rgw_kafka.cc
index 651d7099ebc58bedc7f9132e952e1dbbe4fcc0ec..642787a38cf17b38600dec777ed0fc98ca4214f2 100644 (file)
@@ -35,6 +35,7 @@ static const int STATUS_CONNECTION_CLOSED =      -0x1002;
 static const int STATUS_QUEUE_FULL =             -0x1003;
 static const int STATUS_MAX_INFLIGHT =           -0x1004;
 static const int STATUS_MANAGER_STOPPED =        -0x1005;
+static const int STATUS_CONNECTION_IDLE =        -0x1006;
 // status code for connection opening
 static const int STATUS_CONF_ALLOC_FAILED      = -0x2001;
 static const int STATUS_CONF_REPLCACE          = -0x2002;
@@ -85,19 +86,26 @@ struct connection_t {
         rd_kafka_conf_destroy(temp_conf);
         return;
     }
+    if (!is_ok()) {
+      // no producer, nothing to destroy
+      return;
+    }
     // wait for all remaining acks/nacks
     rd_kafka_flush(producer, 5*1000 /* wait for max 5 seconds */);
     // destroy all topics
     std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);});
     // destroy producer
     rd_kafka_destroy(producer);
+    producer = nullptr;
     // fire all remaining callbacks (if not fired by rd_kafka_flush)
     std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
         cb_tag.cb(status);
-        ldout(cct, 20) << "Kafka destroy: invoking callback with tag=" << cb_tag.tag << dendl;
+        ldout(cct, 20) << "Kafka destroy: invoking callback with tag=" << cb_tag.tag << 
+          " for: " << broker << dendl;
       });
     callbacks.clear();
     delivery_tag = 1;
+    ldout(cct, 20) << "Kafka destroy: complete for: " << broker << dendl;
   }
 
   bool is_ok() const {
@@ -112,7 +120,7 @@ struct connection_t {
 
   // dtor also destroys the internals
   ~connection_t() {
-    destroy(STATUS_CONNECTION_CLOSED);
+    destroy(status);
   }
 };
 
@@ -133,6 +141,8 @@ std::string status_to_string(int s) {
       return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED";
     case STATUS_CONF_REPLCACE:
       return "RGW_KAFKA_STATUS_CONF_REPLCACE";
+    case STATUS_CONNECTION_IDLE:
+      return "RGW_KAFKA_STATUS_CONNECTION_IDLE";
   }
   return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s));
 }
@@ -257,6 +267,8 @@ bool new_producer(connection_t* conn) {
 
   // redirect kafka logs to RGW
   rd_kafka_conf_set_log_cb(conn->temp_conf, log_callback);
+  // set error callback (dispatched from rd_kafka_poll) to allow reconnect
+  rd_kafka_conf_set_error_cb(conn->temp_conf, poll_err_callback);
   // create the producer
   if (conn->producer) {
     ldout(conn->cct, 5) << "Kafka connect: producer already exists. detroying the existing before creating a new one" << dendl;
@@ -308,16 +320,6 @@ struct message_wrapper_t {
 typedef std::unordered_map<std::string, connection_t_ptr> ConnectionList;
 typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
 
-// macros used inside a loop where an iterator is either incremented or erased
-#define INCREMENT_AND_CONTINUE(IT) \
-          ++IT; \
-          continue;
-
-#define ERASE_AND_CONTINUE(IT,CONTAINER) \
-          IT=CONTAINER.erase(IT); \
-          --connection_count; \
-          continue;
-
 class Manager {
 public:
   const size_t max_connections;
@@ -458,9 +460,12 @@ private:
 
        // Checking the connection idleness
         if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
-          ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
+          ldout(conn->cct, 20) << "kafka run: deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
           std::lock_guard lock(connections_lock);
-          ERASE_AND_CONTINUE(conn_it, connections);
+          conn->destroy(STATUS_CONNECTION_IDLE);
+          conn_it = connections.erase(conn_it);
+          --connection_count; \
+          continue;
         }
 
         // try to reconnect the connection if it has an error
@@ -475,7 +480,8 @@ private:
           } else {
             ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry successfull" << dendl;
           }
-          INCREMENT_AND_CONTINUE(conn_it);
+          ++conn_it;
+          continue;
         }
 
         reply_count += rd_kafka_poll(conn->producer, read_timeout_ms);