]> git.proxmox.com Git - mirror_kronosnet.git/commitdiff
[libknet] allow better handling of internal threads
author: Fabio M. Di Nitto <fdinitto@redhat.com>
Mon, 7 Jan 2019 08:54:31 +0000 (09:54 +0100)
committer: Fabio M. Di Nitto <fdinitto@redhat.com>
Mon, 7 Jan 2019 08:54:31 +0000 (09:54 +0100)
While experimenting with adding some dynamic internal threads, I
hit a limitation with computing a dynamic THREAD_MAX value based
on the OS availability of certain features.

This patch switches from a dynamic to a static thread limit, which
can always be increased if necessary without breaking anything.

It also simplifies thread numbering and the conditionals on OS features.

It improves debugging of thread status by adding a two-phase thread
registration process:

- register a thread with THREAD_REGISTERED before calling pthread_create
- set the thread status to THREAD_STARTED once the thread is running

Any failure in between can now be detected in the debugging logs.

Note for developers: all threads *MUST* be registered/started before
the wait_all_threads_status check in handle.c.

Signed-off-by: Fabio M. Di Nitto <fdinitto@redhat.com>
libknet/handle.c
libknet/threads_common.c
libknet/threads_common.h
libknet/threads_dsthandler.c
libknet/threads_heartbeat.c
libknet/threads_pmtud.c
libknet/threads_rx.c
libknet/threads_tx.c
libknet/transport_sctp.c

index 7f0008eaa71b615084d93e98b2a5b1d6d3df610f..07a616c266e55986a38bb8dafc90a03e053367b0 100644 (file)
@@ -454,6 +454,7 @@ static int _start_threads(knet_handle_t knet_h)
 {
        int savederrno = 0;
 
+       set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_REGISTERED);
        savederrno = pthread_create(&knet_h->pmtud_link_handler_thread, 0,
                                    _handle_pmtud_link_thread, (void *) knet_h);
        if (savederrno) {
@@ -462,6 +463,7 @@ static int _start_threads(knet_handle_t knet_h)
                goto exit_fail;
        }
 
+       set_thread_status(knet_h, KNET_THREAD_DST_LINK, KNET_THREAD_REGISTERED);
        savederrno = pthread_create(&knet_h->dst_link_handler_thread, 0,
                                    _handle_dst_link_handler_thread, (void *) knet_h);
        if (savederrno) {
@@ -470,6 +472,7 @@ static int _start_threads(knet_handle_t knet_h)
                goto exit_fail;
        }
 
+       set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_REGISTERED);
        savederrno = pthread_create(&knet_h->send_to_links_thread, 0,
                                    _handle_send_to_links_thread, (void *) knet_h);
        if (savederrno) {
@@ -478,6 +481,7 @@ static int _start_threads(knet_handle_t knet_h)
                goto exit_fail;
        }
 
+       set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_REGISTERED);
        savederrno = pthread_create(&knet_h->recv_from_links_thread, 0,
                                    _handle_recv_from_links_thread, (void *) knet_h);
        if (savederrno) {
@@ -486,6 +490,7 @@ static int _start_threads(knet_handle_t knet_h)
                goto exit_fail;
        }
 
+       set_thread_status(knet_h, KNET_THREAD_HB, KNET_THREAD_REGISTERED);
        savederrno = pthread_create(&knet_h->heartbt_thread, 0,
                                    _handle_heartbt_thread, (void *) knet_h);
        if (savederrno) {
@@ -697,7 +702,7 @@ knet_handle_t knet_handle_new(knet_node_id_t host_id,
                goto exit_fail;
        }
 
-       wait_all_threads_status(knet_h, KNET_THREAD_RUNNING);
+       wait_all_threads_status(knet_h, KNET_THREAD_STARTED);
 
        errno = 0;
        return knet_h;
index fc10a029d88f23a65920359430bc0a87dba71527..6f0ce4a4f9fa9497618226c04dbaa748af18c434 100644 (file)
@@ -64,7 +64,7 @@ int get_global_wrlock(knet_handle_t knet_h)
        return pthread_rwlock_wrlock(&knet_h->global_rwlock);
 }
 
-static struct pretty_names thread_names[] =
+static struct pretty_names thread_names[KNET_THREAD_MAX] =
 {
        { "TX", KNET_THREAD_TX },
        { "RX", KNET_THREAD_RX },
@@ -79,16 +79,12 @@ static struct pretty_names thread_names[] =
 
 static struct pretty_names thread_status[] =
 {
-       { "stopped", KNET_THREAD_STOPPED },
-       { "running", KNET_THREAD_RUNNING }
+       { "unregistered", KNET_THREAD_UNREGISTERED },
+       { "registered", KNET_THREAD_REGISTERED },
+       { "started", KNET_THREAD_STARTED },
+       { "stopped", KNET_THREAD_STOPPED }
 };
 
-/*
- * this seems overloaded at the moment but
- * we might want to expand status checks
- * to include "starting" and "stopping"
- */
-
 static const char *get_thread_status_name(uint8_t status)
 {
        unsigned int i;
@@ -143,6 +139,9 @@ int wait_all_threads_status(knet_handle_t knet_h, uint8_t status)
                found = 1;
 
                for (i = 0; i < KNET_THREAD_MAX; i++) {
+                       if (knet_h->threads_status[i] == KNET_THREAD_UNREGISTERED) {
+                               continue;
+                       }
                        log_debug(knet_h, KNET_SUB_HANDLE, "Checking thread: %s status: %s req: %s",
                                        get_thread_name(i),
                                        get_thread_status_name(knet_h->threads_status[i]),
index 78e1cf0289a99fd6af1ed39e1f75e25aa41ccdf1..1314c017a70d456532a02de4befc957b2284f108 100644 (file)
 
 #define KNET_THREADS_TIMERES 200000
 
-#define KNET_THREAD_STOPPED    0
-#define KNET_THREAD_RUNNING    1
-#define KNET_THREAD_STATUS_MAX KNET_THREAD_RUNNING + 1
+#define KNET_THREAD_UNREGISTERED       0 /* thread does not exist */
+#define KNET_THREAD_REGISTERED         1 /* thread has been registered before pthread_create invocation.
+                                            make sure threads are registered before calling wait_all_threads_status */
+#define KNET_THREAD_STARTED            2 /* thread has reported to be running */
+#define KNET_THREAD_STOPPED            3 /* thread has returned */
+#define KNET_THREAD_STATUS_MAX KNET_THREAD_STOPPED + 1
 
 #define KNET_THREAD_TX         0
 #define KNET_THREAD_RX         1
 #define KNET_THREAD_HB         2
 #define KNET_THREAD_PMTUD      3
 #define KNET_THREAD_DST_LINK   4
-#ifndef HAVE_NETINET_SCTP_H
-#define KNET_THREAD_MAX                KNET_THREAD_DST_LINK + 1
-#else
+#ifdef HAVE_NETINET_SCTP_H
 #define KNET_THREAD_SCTP_LISTEN        5
 #define KNET_THREAD_SCTP_CONN  6
-#define KNET_THREAD_MAX                KNET_THREAD_SCTP_CONN + 1
 #endif
+#define KNET_THREAD_MAX                32
 
 #define timespec_diff(start, end, diff) \
 do { \
index 529ff153b4fa76fed86685695d326b5f58d18c6b..74e7ef07aebe3e86add81e6319457948faa89b6d 100644 (file)
@@ -53,7 +53,7 @@ void *_handle_dst_link_handler_thread(void *data)
        knet_handle_t knet_h = (knet_handle_t) data;
        struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
 
-       set_thread_status(knet_h, KNET_THREAD_DST_LINK, KNET_THREAD_RUNNING);
+       set_thread_status(knet_h, KNET_THREAD_DST_LINK, KNET_THREAD_STARTED);
 
        while (!shutdown_in_progress(knet_h)) {
                if (epoll_wait(knet_h->dst_link_handler_epollfd, events, KNET_EPOLL_MAX_EVENTS, KNET_THREADS_TIMERES / 1000) >= 1)
index c3838129234a067918c0c8b5f867446b6b25a7a8..12aaa767c8c45a15dd8a8a3bf75dbd87814cdaf9 100644 (file)
@@ -186,7 +186,7 @@ void *_handle_heartbt_thread(void *data)
        knet_handle_t knet_h = (knet_handle_t) data;
        int i = 1;
 
-       set_thread_status(knet_h, KNET_THREAD_HB, KNET_THREAD_RUNNING);
+       set_thread_status(knet_h, KNET_THREAD_HB, KNET_THREAD_STARTED);
 
        /* preparing ping buffer */
        knet_h->pingbuf->kh_version = KNET_HEADER_VERSION;
index 820f273ccede4f9dbb78691b3da1ef614a15d99a..442a3465d15a59254acc9f161872c61ad9249570 100644 (file)
@@ -479,7 +479,7 @@ void *_handle_pmtud_link_thread(void *data)
        int link_has_mtu;
        int force_run = 0;
 
-       set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_RUNNING);
+       set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_STARTED);
 
        knet_h->data_mtu = KNET_PMTUD_MIN_MTU_V4 - KNET_HEADER_ALL_SIZE - knet_h->sec_header_size;
 
index 49d574ebb2a64ded8193b324c4a2b01dabbf128e..d6244e13a9333236cbcdba96e8d34e2af35edea8 100644 (file)
@@ -815,7 +815,7 @@ void *_handle_recv_from_links_thread(void *data)
        struct knet_mmsghdr msg[PCKT_RX_BUFS];
        struct iovec iov_in[PCKT_RX_BUFS];
 
-       set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_RUNNING);
+       set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STARTED);
 
        memset(&msg, 0, sizeof(msg));
 
index 3ab075ca405ad98326eefebbab9093dcb1e9db5a..489168dad20099f0006ddc37b889292a22595652 100644 (file)
@@ -681,7 +681,7 @@ void *_handle_send_to_links_thread(void *data)
        struct msghdr msg;
        struct sockaddr_storage address;
 
-       set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_RUNNING);
+       set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STARTED);
 
        memset(&iov_in, 0, sizeof(iov_in));
        iov_in.iov_base = (void *)knet_h->recv_from_sock_buf->khp_data_userdata;
index 409ac6a719314ef4ee3c62a78c6b39e6f15f2beb..83fc359ca15b3c52e1148e665985d6a2bbe2e9a2 100644 (file)
@@ -626,7 +626,7 @@ static void *_sctp_connect_thread(void *data)
        sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
        struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
 
-       set_thread_status(knet_h, KNET_THREAD_SCTP_CONN, KNET_THREAD_RUNNING);
+       set_thread_status(knet_h, KNET_THREAD_SCTP_CONN, KNET_THREAD_STARTED);
 
        while (!shutdown_in_progress(knet_h)) {
                nev = epoll_wait(handle_info->connect_epollfd, events, KNET_EPOLL_MAX_EVENTS, KNET_THREADS_TIMERES / 1000);
@@ -871,7 +871,7 @@ static void *_sctp_listen_thread(void *data)
        sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
        struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
 
-       set_thread_status(knet_h, KNET_THREAD_SCTP_LISTEN, KNET_THREAD_RUNNING);
+       set_thread_status(knet_h, KNET_THREAD_SCTP_LISTEN, KNET_THREAD_STARTED);
 
        while (!shutdown_in_progress(knet_h)) {
                nev = epoll_wait(handle_info->listen_epollfd, events, KNET_EPOLL_MAX_EVENTS, KNET_THREADS_TIMERES / 1000);
@@ -1472,6 +1472,7 @@ int sctp_transport_init(knet_handle_t knet_h)
        /*
         * Start connect & listener threads
         */
+       set_thread_status(knet_h, KNET_THREAD_SCTP_LISTEN, KNET_THREAD_REGISTERED);
        savederrno = pthread_create(&handle_info->listen_thread, 0, _sctp_listen_thread, (void *) knet_h);
        if (savederrno) {
                err = -1;
@@ -1480,6 +1481,7 @@ int sctp_transport_init(knet_handle_t knet_h)
                goto exit_fail;
        }
 
+       set_thread_status(knet_h, KNET_THREAD_SCTP_CONN, KNET_THREAD_REGISTERED);
        savederrno = pthread_create(&handle_info->connect_thread, 0, _sctp_connect_thread, (void *) knet_h);
        if (savederrno) {
                err = -1;