return -1;
}
- knet_h->enabled = enabled;
-
if (enabled) {
+ knet_h->enabled = enabled;
log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is enabled");
} else {
- log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled");
+ /*
+ * notify TX and RX threads to flush the queues
+ */
+ if (set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSH) < 0) {
+ log_debug(knet_h, KNET_SUB_HANDLE, "Unable to request queue flushing for TX thread");
+ }
+ if (set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSH) < 0) {
+ log_debug(knet_h, KNET_SUB_HANDLE, "Unable to request queue flushing for RX thread");
+ }
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
+ /*
+ * when disabling data forward, we need to give time to TX and RX
+ * to flush the queues.
+ *
+ * the TX thread is the main leader here. When there is no more
+ * data in the TX queue, we will also close traffic for RX.
+ */
+ if (!enabled) {
+ /*
+ * this usleep might be unnecessary, but wait_all_threads_flush_queue
+ * adds extra locking delay.
+ *
+ * allow all threads to run free without extra locking interference
+ * and then we switch to a more active wait in case the scheduler
+ * has decided to delay one thread or another
+ */
+ usleep(knet_h->threads_timer_res * 2);
+ wait_all_threads_flush_queue(knet_h);
+
+ /*
+ * all threads have done flushing the queue, we can stop data forwarding
+ */
+ savederrno = get_global_wrlock(knet_h);
+ if (savederrno) {
+ log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
+ strerror(savederrno));
+ errno = savederrno;
+ return -1;
+ }
+ knet_h->enabled = enabled;
+ log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled");
+ pthread_rwlock_unlock(&knet_h->global_rwlock);
+ }
+
errno = 0;
return 0;
}
struct knet_header *pingbuf;
struct knet_header *pmtudbuf;
uint8_t threads_status[KNET_THREAD_MAX];
+ uint8_t threads_flush_queue[KNET_THREAD_MAX];
useconds_t threads_timer_res;
pthread_mutex_t threads_status_mutex;
pthread_t send_to_links_thread;
flush_logs(logfds[0], stdout);
+ if (knet_handle_setfwd(knet_h, 0) < 0) {
+ printf("knet_handle_setfwd failed: %s\n", strerror(errno));
+ knet_link_set_enable(knet_h, 1, 0, 0);
+ knet_link_clear_config(knet_h, 1, 0);
+ knet_host_remove(knet_h, 1);
+ knet_handle_free(knet_h);
+ flush_logs(logfds[0], stdout);
+ close_logpipes(logfds);
+ exit(FAIL);
+ }
+
if (wait_for_packet(knet_h, 10, datafd, logfds[0], stdout)) {
printf("Error waiting for packet: %s\n", strerror(errno));
knet_link_set_enable(knet_h, 1, 0, 0);
flush_logs(logfds[0], stdout);
+ if (knet_handle_setfwd(knet_h, 0) < 0) {
+ printf("knet_handle_setfwd failed: %s\n", strerror(errno));
+ knet_link_set_enable(knet_h, 1, 0, 0);
+ knet_link_clear_config(knet_h, 1, 0);
+ knet_host_remove(knet_h, 1);
+ knet_handle_free(knet_h);
+ flush_logs(logfds[0], stdout);
+ close_logpipes(logfds);
+ exit(FAIL);
+ }
+
+ flush_logs(logfds[0], stdout);
+
if (wait_for_packet(knet_h, 10, datafd, logfds[0], stdout)) {
printf("Error waiting for packet: %s\n", strerror(errno));
knet_link_set_enable(knet_h, 1, 0, 0);
flush_logs(logfds[0], stdout);
+ if (knet_handle_setfwd(knet_h, 0) < 0) {
+ printf("knet_handle_setfwd failed: %s\n", strerror(errno));
+ knet_link_set_enable(knet_h, 1, 0, 0);
+ knet_link_clear_config(knet_h, 1, 0);
+ knet_host_remove(knet_h, 1);
+ knet_handle_free(knet_h);
+ flush_logs(logfds[0], stdout);
+ close_logpipes(logfds);
+ exit(FAIL);
+ }
+
if (wait_for_packet(knet_h, 10, datafd, logfds[0], stdout)) {
printf("Error waiting for packet: %s\n", strerror(errno));
knet_link_set_enable(knet_h, 1, 0, 0);
flush_logs(logfds[0], stdout);
+ if (knet_handle_setfwd(knet_h, 0) < 0) {
+ printf("knet_handle_setfwd failed: %s\n", strerror(errno));
+ knet_link_set_enable(knet_h, 1, 0, 0);
+ knet_link_clear_config(knet_h, 1, 0);
+ knet_host_remove(knet_h, 1);
+ knet_handle_free(knet_h);
+ flush_logs(logfds[0], stdout);
+ close_logpipes(logfds);
+ exit(FAIL);
+ }
+
if (wait_for_packet(knet_h, 10, datafd, logfds[0], stdout)) {
printf("Error waiting for packet: %s\n", strerror(errno));
knet_link_set_enable(knet_h, 1, 0, 0);
printf("Test knet_send with only localhost\n");
+ if (knet_handle_setfwd(knet_h, 1) < 0) {
+ printf("knet_handle_setfwd failed: %s\n", strerror(errno));
+ knet_link_set_enable(knet_h, 1, 0, 0);
+ knet_link_clear_config(knet_h, 1, 0);
+ knet_host_remove(knet_h, 1);
+ knet_handle_free(knet_h);
+ flush_logs(logfds[0], stdout);
+ close_logpipes(logfds);
+ exit(FAIL);
+ }
+
if (knet_handle_enable_filter(knet_h, NULL, dhost_filter) < 0) {
printf("knet_handle_enable_filter failed: %s\n", strerror(errno));
knet_link_set_enable(knet_h, 1, 0, 0);
flush_logs(logfds[0], stdout);
+ if (knet_handle_setfwd(knet_h, 0) < 0) {
+ printf("knet_handle_setfwd failed: %s\n", strerror(errno));
+ knet_link_set_enable(knet_h, 1, 0, 0);
+ knet_link_clear_config(knet_h, 1, 0);
+ knet_host_remove(knet_h, 1);
+ knet_handle_free(knet_h);
+ flush_logs(logfds[0], stdout);
+ close_logpipes(logfds);
+ exit(FAIL);
+ }
+
if (wait_for_packet(knet_h, 10, datafd, logfds[0], stdout)) {
printf("Error waiting for packet: %s\n", strerror(errno));
knet_link_set_enable(knet_h, 1, 0, 0);
flush_logs(logfds[0], stdout);
+ if (knet_handle_setfwd(knet_h, 0) < 0) {
+ printf("knet_handle_setfwd failed: %s\n", strerror(errno));
+ knet_link_set_enable(knet_h, 1, 0, 0);
+ knet_link_clear_config(knet_h, 1, 0);
+ knet_host_remove(knet_h, 1);
+ knet_handle_free(knet_h);
+ flush_logs(logfds[0], stdout);
+ close_logpipes(logfds);
+ exit(FAIL);
+ }
+
knet_link_set_enable(knet_h, 1, 0, 0);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
return -1;
}
+ if (knet_handle_setfwd(knet_h, 0) < 0) {
+ printf("knet_handle_setfwd failed: %s\n", strerror(errno));
+ return -1;
+ }
+
if (knet_host_get_host_list(knet_h, host_ids, &host_ids_entries) < 0) {
printf("knet_host_get_host_list failed: %s\n", strerror(errno));
return -1;
return "unknown";
}
+/*
+ * Read the queue-flush state recorded for one thread.
+ *
+ * thread_id indexes knet_h->threads_flush_queue[] (bounds are the
+ * caller's responsibility; valid ids are < KNET_THREAD_MAX).
+ *
+ * Returns the stored state (KNET_THREAD_QUEUE_FLUSH or
+ * KNET_THREAD_QUEUE_FLUSHED) on success, or -1 if the
+ * threads_status_mutex cannot be acquired.
+ */
+int get_thread_flush_queue(knet_handle_t knet_h, uint8_t thread_id)
+{
+	uint8_t flush;
+
+	if (pthread_mutex_lock(&knet_h->threads_status_mutex) != 0) {
+		log_debug(knet_h, KNET_SUB_HANDLE, "Unable to get mutex lock");
+		return -1;
+	}
+
+	flush = knet_h->threads_flush_queue[thread_id];
+
+	pthread_mutex_unlock(&knet_h->threads_status_mutex);
+	return flush;
+}
+
+/*
+ * Record a queue-flush request/acknowledgement for one thread.
+ *
+ * status is KNET_THREAD_QUEUE_FLUSH (ask the thread to flush) or
+ * KNET_THREAD_QUEUE_FLUSHED (thread reports it is done); the update
+ * is serialized via threads_status_mutex.
+ *
+ * Returns 0 on success, -1 if the mutex cannot be acquired.
+ */
+int set_thread_flush_queue(knet_handle_t knet_h, uint8_t thread_id, uint8_t status)
+{
+	if (pthread_mutex_lock(&knet_h->threads_status_mutex) != 0) {
+		log_debug(knet_h, KNET_SUB_HANDLE, "Unable to get mutex lock");
+		return -1;
+	}
+
+	knet_h->threads_flush_queue[thread_id] = status;
+
+	log_debug(knet_h, KNET_SUB_HANDLE, "Updated flush queue request for thread %s to %u",
+		  get_thread_name(thread_id), status);
+
+	pthread_mutex_unlock(&knet_h->threads_status_mutex);
+	return 0;
+}
+
+/*
+ * Block until every thread slot in threads_flush_queue[] reads
+ * KNET_THREAD_QUEUE_FLUSHED.
+ *
+ * Polls under threads_status_mutex, sleeping threads_timer_res
+ * microseconds between passes so the worker threads get scheduled.
+ * If the mutex cannot be taken on a pass, that pass is skipped and
+ * retried after the next sleep.
+ *
+ * Always returns 0 (there is no timeout; callers rely on the TX/RX
+ * threads eventually acknowledging the flush request).
+ */
+int wait_all_threads_flush_queue(knet_handle_t knet_h)
+{
+	uint8_t i = 0, all_flushed = 0;
+
+	while (!all_flushed) {
+		usleep(knet_h->threads_timer_res);
+
+		if (pthread_mutex_lock(&knet_h->threads_status_mutex) != 0) {
+			continue;
+		}
+
+		all_flushed = 1;
+
+		for (i = 0; i < KNET_THREAD_MAX; i++) {
+			if (knet_h->threads_flush_queue[i] == KNET_THREAD_QUEUE_FLUSHED) {
+				continue;
+			}
+			/*
+			 * this slot has not acknowledged yet: log it and keep waiting.
+			 * (the previous check already excluded FLUSHED, so no second
+			 * comparison is needed here)
+			 */
+			log_debug(knet_h, KNET_SUB_HANDLE, "Checking thread: %s queue: %u",
+				  get_thread_name(i),
+				  knet_h->threads_flush_queue[i]);
+			all_flushed = 0;
+		}
+
+		pthread_mutex_unlock(&knet_h->threads_status_mutex);
+	}
+
+	return 0;
+}
+
int set_thread_status(knet_handle_t knet_h, uint8_t thread_id, uint8_t status)
{
if (pthread_mutex_lock(&knet_h->threads_status_mutex) != 0) {
#endif
#define KNET_THREAD_MAX 32
+#define KNET_THREAD_QUEUE_FLUSHED 0
+#define KNET_THREAD_QUEUE_FLUSH 1
+
#define timespec_diff(start, end, diff) \
do { \
if (end.tv_sec > start.tv_sec) \
int shutdown_in_progress(knet_handle_t knet_h);
int get_global_wrlock(knet_handle_t knet_h);
+int get_thread_flush_queue(knet_handle_t knet_h, uint8_t thread_id);
+int set_thread_flush_queue(knet_handle_t knet_h, uint8_t thread_id, uint8_t status);
+int wait_all_threads_flush_queue(knet_handle_t knet_h);
int set_thread_status(knet_handle_t knet_h, uint8_t thread_id, uint8_t status);
int wait_all_threads_status(knet_handle_t knet_h, uint8_t status);
void force_pmtud_run(knet_handle_t knet_h, uint8_t subsystem, uint8_t reset_mtu);
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, knet_h->threads_timer_res / 1000);
+ /*
+ * the RX threads only need to notify that there has been at least
+ * one successful run after queue flush has been requested.
+ * See setfwd in handle.c
+ */
+ if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) {
+ set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED);
+ }
+
/*
* we use timeout to detect if thread is shutting down
*/
knet_handle_t knet_h = (knet_handle_t) data;
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
int i, nev, type;
+ int flush, flush_queue_limit;
int8_t channel;
struct iovec iov_in;
struct msghdr msg;
knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id);
}
+ flush_queue_limit = 0;
+
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, knet_h->threads_timer_res / 1000);
+ flush = get_thread_flush_queue(knet_h, KNET_THREAD_TX);
+
/*
* we use timeout to detect if thread is shutting down
*/
if (nev == 0) {
+ /*
+ * ideally we want to communicate that we are done flushing
+ * the queue when we have an epoll timeout event
+ */
+ if (flush == KNET_THREAD_QUEUE_FLUSH) {
+ set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
+ flush_queue_limit = 0;
+ }
continue;
}
+	/*
+	 * fallback in case the TX sockets continue to receive traffic
+	 * and we never hit an epoll timeout.
+	 *
+	 * allow up to 100 loops to flush the queues, then give up.
+	 * there might be cleaner ways to do it by checking the buffer queue
+	 * on each socket, but we have tons of sockets and calculations can go wrong.
+	 * Also, why would you disable data forwarding and still send packets?
+	 */
+ if (flush == KNET_THREAD_QUEUE_FLUSH) {
+ if (flush_queue_limit >= 100) {
+ log_debug(knet_h, KNET_SUB_TX, "Timeout flushing the TX queue, expect packet loss");
+ set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
+ flush_queue_limit = 0;
+ } else {
+ flush_queue_limit++;
+ }
+ } else {
+ flush_queue_limit = 0;
+ }
+
if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock");
continue;
_handle_send_to_links(knet_h, &msg, events[i].data.fd, channel, type);
pthread_mutex_unlock(&knet_h->tx_mutex);
}
+
pthread_rwlock_unlock(&knet_h->global_rwlock);
}