static const int STATUS_QUEUE_FULL = -0x1003;
static const int STATUS_MAX_INFLIGHT = -0x1004;
static const int STATUS_MANAGER_STOPPED = -0x1005;
+static const int STATUS_CONNECTION_IDLE = -0x1006;
// status code for connection opening
static const int STATUS_CONF_ALLOC_FAILED = -0x2001;
static const int STATUS_CONF_REPLCACE = -0x2002;
rd_kafka_conf_destroy(temp_conf);
return;
}
+ if (!is_ok()) {
+ // no producer, nothing to destroy
+ return;
+ }
// wait for all remaining acks/nacks
rd_kafka_flush(producer, 5*1000 /* wait for max 5 seconds */);
// destroy all topics
std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);});
// destroy producer
rd_kafka_destroy(producer);
+ producer = nullptr;
// fire all remaining callbacks (if not fired by rd_kafka_flush)
std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
cb_tag.cb(status);
- ldout(cct, 20) << "Kafka destroy: invoking callback with tag=" << cb_tag.tag << dendl;
+ ldout(cct, 20) << "Kafka destroy: invoking callback with tag=" << cb_tag.tag <<
+ " for: " << broker << dendl;
});
callbacks.clear();
delivery_tag = 1;
+ ldout(cct, 20) << "Kafka destroy: complete for: " << broker << dendl;
}
bool is_ok() const {
// dtor also destroys the internals
~connection_t() {
- destroy(STATUS_CONNECTION_CLOSED);
+ destroy(status);
}
};
return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED";
case STATUS_CONF_REPLCACE:
return "RGW_KAFKA_STATUS_CONF_REPLCACE";
+ case STATUS_CONNECTION_IDLE:
+ return "RGW_KAFKA_STATUS_CONNECTION_IDLE";
}
return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s));
}
// redirect kafka logs to RGW
rd_kafka_conf_set_log_cb(conn->temp_conf, log_callback);
+ // define poll callback to allow reconnect
+ rd_kafka_conf_set_error_cb(conn->temp_conf, poll_err_callback);
// create the producer
if (conn->producer) {
ldout(conn->cct, 5) << "Kafka connect: producer already exists. detroying the existing before creating a new one" << dendl;
typedef std::unordered_map<std::string, connection_t_ptr> ConnectionList;
typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
-// macros used inside a loop where an iterator is either incremented or erased
-#define INCREMENT_AND_CONTINUE(IT) \
- ++IT; \
- continue;
-
-#define ERASE_AND_CONTINUE(IT,CONTAINER) \
- IT=CONTAINER.erase(IT); \
- --connection_count; \
- continue;
-
class Manager {
public:
const size_t max_connections;
    // Checking the connection idleness
if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
- ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
+ ldout(conn->cct, 20) << "kafka run: deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
std::lock_guard lock(connections_lock);
- ERASE_AND_CONTINUE(conn_it, connections);
+ conn->destroy(STATUS_CONNECTION_IDLE);
+ conn_it = connections.erase(conn_it);
+          --connection_count;
+ continue;
}
// try to reconnect the connection if it has an error
} else {
ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry successfull" << dendl;
}
- INCREMENT_AND_CONTINUE(conn_it);
+ ++conn_it;
+ continue;
}
reply_count += rd_kafka_poll(conn->producer, read_timeout_ms);