git.proxmox.com Git - mirror_kronosnet.git/commitdiff
[threads] allow knet_handle_setfwd to flush socket queues
author    Fabio M. Di Nitto <fdinitto@redhat.com>
          Thu, 27 Jun 2019 08:55:23 +0000 (10:55 +0200)
committer Fabio M. Di Nitto <fdinitto@redhat.com>
          Thu, 27 Jun 2019 08:55:23 +0000 (10:55 +0200)
Signed-off-by: Fabio M. Di Nitto <fdinitto@redhat.com>
12 files changed:
libknet/handle.c
libknet/internals.h
libknet/tests/api_knet_send.c
libknet/tests/api_knet_send_compress.c
libknet/tests/api_knet_send_crypto.c
libknet/tests/api_knet_send_loopback.c
libknet/tests/api_knet_send_sync.c
libknet/tests/test-common.c
libknet/threads_common.c
libknet/threads_common.h
libknet/threads_rx.c
libknet/threads_tx.c

diff --git a/libknet/handle.c b/libknet/handle.c
index ba771b48db0573113ad43b1cfd72fbe61cc032d8..05babafb72002ec0c621fc6564b5072da63a0f8f 100644
@@ -1173,16 +1173,57 @@ int knet_handle_setfwd(knet_handle_t knet_h, unsigned int enabled)
                return -1;
        }
 
-       knet_h->enabled = enabled;
-
        if (enabled) {
+               knet_h->enabled = enabled;
                log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is enabled");
        } else {
-               log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled");
+               /*
+                * notify TX and RX threads to flush the queues
+                */
+               if (set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSH) < 0) {
+                       log_debug(knet_h, KNET_SUB_HANDLE, "Unable to request queue flushing for TX thread");
+               }
+               if (set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSH) < 0) {
+                       log_debug(knet_h, KNET_SUB_HANDLE, "Unable to request queue flushing for RX thread");
+               }
        }
 
        pthread_rwlock_unlock(&knet_h->global_rwlock);
 
+       /*
+        * when disabling data forwarding, we need to give the TX and RX
+        * threads time to flush their queues.
+        *
+        * the TX thread leads here: once there is no more data in the
+        * TX queue, we also close traffic for RX.
+        */
+       if (!enabled) {
+               /*
+                * this usleep might be unnecessary, but wait_all_threads_flush_queue
+                * adds extra locking delay.
+                *
+                * let all threads run freely, without extra locking interference,
+                * and then switch to a more active wait in case the scheduler
+                * has decided to delay one thread or another.
+                */
+               usleep(knet_h->threads_timer_res * 2);
+               wait_all_threads_flush_queue(knet_h);
+
+               /*
+                * all threads have finished flushing their queues, we can now stop data forwarding
+                */
+               savederrno = get_global_wrlock(knet_h);
+               if (savederrno) {
+                       log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
+                               strerror(savederrno));
+                       errno = savederrno;
+                       return -1;
+               }
+               knet_h->enabled = enabled;
+               log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled");
+               pthread_rwlock_unlock(&knet_h->global_rwlock);
+       }
+
        errno = 0;
        return 0;
 }
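
For context, this is how a hypothetical caller would now drive a clean shutdown: disabling forwarding first lets the TX/RX threads flush their socket queues before links and hosts are torn down. A minimal caller-side sketch, not part of this commit; it assumes a handle configured with host id 1 / link 0 and the public <libknet.h> header:

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <libknet.h>

/* disable forwarding first so the TX/RX queues are flushed, then tear down */
static int stop_and_free(knet_handle_t knet_h)
{
	if (knet_handle_setfwd(knet_h, 0) < 0) {
		printf("knet_handle_setfwd failed: %s\n", strerror(errno));
		return -1;
	}

	/* links and hosts can be dropped once forwarding is off and queues are flushed */
	knet_link_set_enable(knet_h, 1, 0, 0);
	knet_link_clear_config(knet_h, 1, 0);
	knet_host_remove(knet_h, 1);

	return knet_handle_free(knet_h);
}
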
diff --git a/libknet/internals.h b/libknet/internals.h
index d20c6152c8c138344e16226b728c78243d527285..3f4859fc3ad7f1925eedb6f154bcfb54e73d1546 100644
@@ -178,6 +178,7 @@ struct knet_handle {
        struct knet_header *pingbuf;
        struct knet_header *pmtudbuf;
        uint8_t threads_status[KNET_THREAD_MAX];
+       uint8_t threads_flush_queue[KNET_THREAD_MAX];
        useconds_t threads_timer_res;
        pthread_mutex_t threads_status_mutex;
        pthread_t send_to_links_thread;
diff --git a/libknet/tests/api_knet_send.c b/libknet/tests/api_knet_send.c
index 601613409717467dceac7ed98f6f170620ffa22a..50469ee30dfede6664d37502b2919cd43ca21b58 100644
@@ -246,6 +246,17 @@ static void test(uint8_t transport)
 
        flush_logs(logfds[0], stdout);
 
+       if (knet_handle_setfwd(knet_h, 0) < 0) {
+               printf("knet_handle_setfwd failed: %s\n", strerror(errno));
+               knet_link_set_enable(knet_h, 1, 0, 0);
+               knet_link_clear_config(knet_h, 1, 0);
+               knet_host_remove(knet_h, 1);
+               knet_handle_free(knet_h);
+               flush_logs(logfds[0], stdout);
+               close_logpipes(logfds);
+               exit(FAIL);
+       }
+
        if (wait_for_packet(knet_h, 10, datafd, logfds[0], stdout)) {
                printf("Error waiting for packet: %s\n", strerror(errno));
                knet_link_set_enable(knet_h, 1, 0, 0);
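
The same guard is added, with the same error-path cleanup, to each of the api_knet_send_* tests that follow. Condensed into a hypothetical helper (cleanup calls trimmed; wait_for_packet(), flush_logs() and FAIL come from the test suite's common test code), the sequence every test now exercises is:

/* hypothetical helper; assumes the usual test includes and the
 * wait_for_packet()/flush_logs() helpers from test-common */
static void check_setfwd_off_flushes(knet_handle_t knet_h, int datafd, int *logfds)
{
	/* turn forwarding off while a packet is still in flight ... */
	if (knet_handle_setfwd(knet_h, 0) < 0) {
		printf("knet_handle_setfwd failed: %s\n", strerror(errno));
		exit(FAIL);
	}

	flush_logs(logfds[0], stdout);

	/* ... and verify the packet queued before setfwd(0) is still flushed and delivered */
	if (wait_for_packet(knet_h, 10, datafd, logfds[0], stdout)) {
		printf("Error waiting for packet: %s\n", strerror(errno));
		exit(FAIL);
	}
}
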
diff --git a/libknet/tests/api_knet_send_compress.c b/libknet/tests/api_knet_send_compress.c
index 6d5f4457eabdeba834b7ab54df97b70f6cc278e7..bbb357a4455ae26375e948a292cb7bba7bc4cca5 100644
@@ -170,6 +170,19 @@ static void test(const char *model)
 
        flush_logs(logfds[0], stdout);
 
+       if (knet_handle_setfwd(knet_h, 0) < 0) {
+               printf("knet_handle_setfwd failed: %s\n", strerror(errno));
+               knet_link_set_enable(knet_h, 1, 0, 0);
+               knet_link_clear_config(knet_h, 1, 0);
+               knet_host_remove(knet_h, 1);
+               knet_handle_free(knet_h);
+               flush_logs(logfds[0], stdout);
+               close_logpipes(logfds);
+               exit(FAIL);
+       }
+
+       flush_logs(logfds[0], stdout);
+
        if (wait_for_packet(knet_h, 10, datafd, logfds[0], stdout)) {
                printf("Error waiting for packet: %s\n", strerror(errno));
                knet_link_set_enable(knet_h, 1, 0, 0);
diff --git a/libknet/tests/api_knet_send_crypto.c b/libknet/tests/api_knet_send_crypto.c
index 11de85763f4599243fdb3bf53bd453518c6f00da..3e53e039c5b00304117cb5a9cb856e2dcc274b8a 100644
@@ -171,6 +171,17 @@ static void test(const char *model)
 
        flush_logs(logfds[0], stdout);
 
+       if (knet_handle_setfwd(knet_h, 0) < 0) {
+               printf("knet_handle_setfwd failed: %s\n", strerror(errno));
+               knet_link_set_enable(knet_h, 1, 0, 0);
+               knet_link_clear_config(knet_h, 1, 0);
+               knet_host_remove(knet_h, 1);
+               knet_handle_free(knet_h);
+               flush_logs(logfds[0], stdout);
+               close_logpipes(logfds);
+               exit(FAIL);
+       }
+
        if (wait_for_packet(knet_h, 10, datafd, logfds[0], stdout)) {
                printf("Error waiting for packet: %s\n", strerror(errno));
                knet_link_set_enable(knet_h, 1, 0, 0);
diff --git a/libknet/tests/api_knet_send_loopback.c b/libknet/tests/api_knet_send_loopback.c
index 741b51d3adebd4d812d74e37f4bdb9e60adad449..6c0054c2fede18d100189c7f4fe9196619ecaae8 100644
@@ -251,6 +251,17 @@ static void test(void)
 
        flush_logs(logfds[0], stdout);
 
+       if (knet_handle_setfwd(knet_h, 0) < 0) {
+               printf("knet_handle_setfwd failed: %s\n", strerror(errno));
+               knet_link_set_enable(knet_h, 1, 0, 0);
+               knet_link_clear_config(knet_h, 1, 0);
+               knet_host_remove(knet_h, 1);
+               knet_handle_free(knet_h);
+               flush_logs(logfds[0], stdout);
+               close_logpipes(logfds);
+               exit(FAIL);
+       }
+
        if (wait_for_packet(knet_h, 10, datafd, logfds[0], stdout)) {
                printf("Error waiting for packet: %s\n", strerror(errno));
                knet_link_set_enable(knet_h, 1, 0, 0);
@@ -317,6 +328,17 @@ static void test(void)
 
        printf("Test knet_send with only localhost\n");
 
+       if (knet_handle_setfwd(knet_h, 1) < 0) {
+               printf("knet_handle_setfwd failed: %s\n", strerror(errno));
+               knet_link_set_enable(knet_h, 1, 0, 0);
+               knet_link_clear_config(knet_h, 1, 0);
+               knet_host_remove(knet_h, 1);
+               knet_handle_free(knet_h);
+               flush_logs(logfds[0], stdout);
+               close_logpipes(logfds);
+               exit(FAIL);
+       }
+
        if (knet_handle_enable_filter(knet_h, NULL, dhost_filter) < 0) {
                printf("knet_handle_enable_filter failed: %s\n", strerror(errno));
                knet_link_set_enable(knet_h, 1, 0, 0);
@@ -352,6 +374,17 @@ static void test(void)
 
        flush_logs(logfds[0], stdout);
 
+       if (knet_handle_setfwd(knet_h, 0) < 0) {
+               printf("knet_handle_setfwd failed: %s\n", strerror(errno));
+               knet_link_set_enable(knet_h, 1, 0, 0);
+               knet_link_clear_config(knet_h, 1, 0);
+               knet_host_remove(knet_h, 1);
+               knet_handle_free(knet_h);
+               flush_logs(logfds[0], stdout);
+               close_logpipes(logfds);
+               exit(FAIL);
+       }
+
        if (wait_for_packet(knet_h, 10, datafd, logfds[0], stdout)) {
                printf("Error waiting for packet: %s\n", strerror(errno));
                knet_link_set_enable(knet_h, 1, 0, 0);
diff --git a/libknet/tests/api_knet_send_sync.c b/libknet/tests/api_knet_send_sync.c
index 96cb716a9a7689aedc344482a69e7fd09d314b9c..007e4f59a9048eed6ba2c963801bf68072898ca1 100644
@@ -375,6 +375,17 @@ static void test(void)
 
        flush_logs(logfds[0], stdout);
 
+       if (knet_handle_setfwd(knet_h, 0) < 0) {
+               printf("knet_handle_setfwd failed: %s\n", strerror(errno));
+               knet_link_set_enable(knet_h, 1, 0, 0);
+               knet_link_clear_config(knet_h, 1, 0);
+               knet_host_remove(knet_h, 1);
+               knet_handle_free(knet_h);
+               flush_logs(logfds[0], stdout);
+               close_logpipes(logfds);
+               exit(FAIL);
+       }
+
        knet_link_set_enable(knet_h, 1, 0, 0);
        knet_link_clear_config(knet_h, 1, 0);
        knet_host_remove(knet_h, 1);
diff --git a/libknet/tests/test-common.c b/libknet/tests/test-common.c
index 13dd44932b100dd84d10530a208c876f1484a3f7..02afde6cf3c69622fa8d7f9728f2a9143413c288 100644
@@ -404,6 +404,11 @@ int knet_handle_stop(knet_handle_t knet_h)
                return -1;
        }
 
+       if (knet_handle_setfwd(knet_h, 0) < 0) {
+               printf("knet_handle_setfwd failed: %s\n", strerror(errno));
+               return -1;
+       }
+
        if (knet_host_get_host_list(knet_h, host_ids, &host_ids_entries) < 0) {
                printf("knet_host_get_host_list failed: %s\n", strerror(errno));
                return -1;
diff --git a/libknet/threads_common.c b/libknet/threads_common.c
index d1ba85ccda87083c6305d1e41abf0a7b5e965790..99c87978d0c7b765936393d7e32b6de0cb5f2f04 100644
@@ -109,6 +109,68 @@ static const char *get_thread_name(uint8_t thread_id)
        return "unknown";
 }
 
+int get_thread_flush_queue(knet_handle_t knet_h, uint8_t thread_id)
+{
+       uint8_t flush;
+
+       if (pthread_mutex_lock(&knet_h->threads_status_mutex) != 0) {
+               log_debug(knet_h, KNET_SUB_HANDLE, "Unable to get mutex lock");
+               return -1;
+       }
+
+       flush = knet_h->threads_flush_queue[thread_id];
+
+       pthread_mutex_unlock(&knet_h->threads_status_mutex);
+       return flush;
+}
+
+int set_thread_flush_queue(knet_handle_t knet_h, uint8_t thread_id, uint8_t status)
+{
+       if (pthread_mutex_lock(&knet_h->threads_status_mutex) != 0) {
+               log_debug(knet_h, KNET_SUB_HANDLE, "Unable to get mutex lock");
+               return -1;
+       }
+
+       knet_h->threads_flush_queue[thread_id] = status;
+
+       log_debug(knet_h, KNET_SUB_HANDLE, "Updated flush queue request for thread %s to %u",
+                 get_thread_name(thread_id), status);
+
+       pthread_mutex_unlock(&knet_h->threads_status_mutex);
+       return 0;
+}
+
+int wait_all_threads_flush_queue(knet_handle_t knet_h)
+{
+       uint8_t i = 0, found = 0;
+
+       while (!found) {
+               usleep(knet_h->threads_timer_res);
+
+               if (pthread_mutex_lock(&knet_h->threads_status_mutex) != 0) {
+                       continue;
+               }
+
+               found = 1;
+
+               for (i = 0; i < KNET_THREAD_MAX; i++) {
+                       if (knet_h->threads_flush_queue[i] == KNET_THREAD_QUEUE_FLUSHED) {
+                               continue;
+                       }
+                       log_debug(knet_h, KNET_SUB_HANDLE, "Checking thread: %s queue: %u",
+                                       get_thread_name(i),
+                                       knet_h->threads_flush_queue[i]);
+                       if (knet_h->threads_flush_queue[i] != KNET_THREAD_QUEUE_FLUSHED) {
+                               found = 0;
+                       }
+               }
+
+               pthread_mutex_unlock(&knet_h->threads_status_mutex);
+       }
+
+       return 0;
+}
+
 int set_thread_status(knet_handle_t knet_h, uint8_t thread_id, uint8_t status)
 {
        if (pthread_mutex_lock(&knet_h->threads_status_mutex) != 0) {
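
Taken together, get_thread_flush_queue(), set_thread_flush_queue() and wait_all_threads_flush_queue() form a small request/acknowledge handshake: the setfwd path marks every thread's slot KNET_THREAD_QUEUE_FLUSH, each thread flips its own slot back to KNET_THREAD_QUEUE_FLUSHED once a full run has completed, and the waiter polls until all slots are back. A self-contained sketch of the same handshake, using plain pthreads and generic workers standing in for the knet TX/RX threads (names and timings here are illustrative, not from the commit):

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

#define QUEUE_FLUSHED 0
#define QUEUE_FLUSH   1
#define NUM_THREADS   2

static pthread_mutex_t flag_mutex = PTHREAD_MUTEX_INITIALIZER;
static int flush_flag[NUM_THREADS];

static void *worker(void *arg)
{
	int id = *(int *)arg;
	int done = 0;

	while (!done) {
		usleep(1000);			/* stand-in for one epoll_wait() cycle */

		pthread_mutex_lock(&flag_mutex);
		if (flush_flag[id] == QUEUE_FLUSH) {
			/* a full cycle ran after the request: acknowledge it */
			flush_flag[id] = QUEUE_FLUSHED;
			done = 1;
		}
		pthread_mutex_unlock(&flag_mutex);
	}
	return NULL;
}

int main(void)
{
	pthread_t th[NUM_THREADS];
	int ids[NUM_THREADS] = { 0, 1 };
	int i, pending;

	for (i = 0; i < NUM_THREADS; i++)
		pthread_create(&th[i], NULL, worker, &ids[i]);

	/* request a flush from every worker (cf. set_thread_flush_queue) */
	pthread_mutex_lock(&flag_mutex);
	for (i = 0; i < NUM_THREADS; i++)
		flush_flag[i] = QUEUE_FLUSH;
	pthread_mutex_unlock(&flag_mutex);

	/* poll until all workers have acked (cf. wait_all_threads_flush_queue) */
	do {
		usleep(1000);
		pending = 0;
		pthread_mutex_lock(&flag_mutex);
		for (i = 0; i < NUM_THREADS; i++)
			if (flush_flag[i] != QUEUE_FLUSHED)
				pending = 1;
		pthread_mutex_unlock(&flag_mutex);
	} while (pending);

	printf("all worker queues flushed\n");

	for (i = 0; i < NUM_THREADS; i++)
		pthread_join(th[i], NULL);
	return 0;
}
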
diff --git a/libknet/threads_common.h b/libknet/threads_common.h
index 7abcd534dbe6dbdfe3454c1f8201b45b07a20407..5c3a6a71802b9a5a2f98cd7af83337f339f5a690 100644
@@ -30,6 +30,9 @@
 #endif
 #define KNET_THREAD_MAX                32
 
+#define KNET_THREAD_QUEUE_FLUSHED 0
+#define KNET_THREAD_QUEUE_FLUSH   1
+
 #define timespec_diff(start, end, diff) \
 do { \
        if (end.tv_sec > start.tv_sec) \
@@ -41,6 +44,9 @@ do { \
 
 int shutdown_in_progress(knet_handle_t knet_h);
 int get_global_wrlock(knet_handle_t knet_h);
+int get_thread_flush_queue(knet_handle_t knet_h, uint8_t thread_id);
+int set_thread_flush_queue(knet_handle_t knet_h, uint8_t thread_id, uint8_t status);
+int wait_all_threads_flush_queue(knet_handle_t knet_h);
 int set_thread_status(knet_handle_t knet_h, uint8_t thread_id, uint8_t status);
 int wait_all_threads_status(knet_handle_t knet_h, uint8_t status);
 void force_pmtud_run(knet_handle_t knet_h, uint8_t subsystem, uint8_t reset_mtu);
diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c
index da1d6c91dd9a7c88d2fdd983666e96299b4ca681..584150ba4735440f7753370e45d348b126341cb6 100644
@@ -862,6 +862,15 @@ void *_handle_recv_from_links_thread(void *data)
        while (!shutdown_in_progress(knet_h)) {
                nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, knet_h->threads_timer_res / 1000);
 
+               /*
+                * the RX thread only needs to notify that at least one
+                * successful run has completed after a queue flush has been
+                * requested. See setfwd in handle.c.
+                */
+               if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) {
+                       set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED);
+               }
+
                /*
                 * we use timeout to detect if thread is shutting down
                 */
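
Note the asymmetry between the two data threads: RX only has to show that one more epoll_wait() cycle has completed after the request was posted, so it acknowledges on its very next wakeup, while TX (next file) only acknowledges once an epoll timeout shows its queue has emptied, or after a bounded busy-loop fallback. In isolation, the RX-side check added above reduces to:

	/* at the top of every receive-loop iteration, before processing events */
	if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) {
		/* one wakeup after the request is all RX has to prove */
		set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED);
	}
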
diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c
index 9c369bcfbd274902f0049107b15cd4d6fb922255..50a43a01154ab06e6a55ec743a24a6f2bb11ddc4 100644
@@ -676,6 +676,7 @@ void *_handle_send_to_links_thread(void *data)
        knet_handle_t knet_h = (knet_handle_t) data;
        struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
        int i, nev, type;
+       int flush, flush_queue_limit;
        int8_t channel;
        struct iovec iov_in;
        struct msghdr msg;
@@ -703,16 +704,49 @@ void *_handle_send_to_links_thread(void *data)
                knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id);
        }
 
+       flush_queue_limit = 0;
+
        while (!shutdown_in_progress(knet_h)) {
                nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, knet_h->threads_timer_res / 1000);
 
+               flush = get_thread_flush_queue(knet_h, KNET_THREAD_TX);
+
                /*
                 * we use timeout to detect if thread is shutting down
                 */
                if (nev == 0) {
+                       /*
+                        * ideally we want to communicate that we are done flushing
+                        * the queue when we have an epoll timeout event
+                        */
+                       if (flush == KNET_THREAD_QUEUE_FLUSH) {
+                               set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
+                               flush_queue_limit = 0;
+                       }
                        continue;
                }
 
+               /*
+                * fall back in case the TX sockets continue to receive traffic
+                * and we never hit an epoll timeout.
+                *
+                * allow up to 100 loops to flush the queues, then give up.
+                * there might be cleaner ways to do this by checking the buffer
+                * queue on each socket, but we have tons of sockets and the
+                * calculations can go wrong.
+                * Also, why would you disable data forwarding and still send packets?
+                */
+               if (flush == KNET_THREAD_QUEUE_FLUSH) {
+                       if (flush_queue_limit >= 100) {
+                               log_debug(knet_h, KNET_SUB_TX, "Timeout flushing the TX queue, expect packet loss");
+                               set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
+                               flush_queue_limit = 0;
+                       } else {
+                               flush_queue_limit++;
+                       }
+               } else {
+                       flush_queue_limit = 0;
+               }
+
                if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
                        log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock");
                        continue;
@@ -742,6 +776,7 @@ void *_handle_send_to_links_thread(void *data)
                        _handle_send_to_links(knet_h, &msg, events[i].data.fd, channel, type);
                        pthread_mutex_unlock(&knet_h->tx_mutex);
                }
+
                pthread_rwlock_unlock(&knet_h->global_rwlock);
        }