]> git.proxmox.com Git - mirror_kronosnet.git/commitdiff
knet: add trivial heartbeat feature
authorFederico Simoncelli <federico.simoncelli@gmail.com>
Wed, 10 Nov 2010 13:32:56 +0000 (14:32 +0100)
committerFederico Simoncelli <federico.simoncelli@gmail.com>
Wed, 10 Nov 2010 13:32:56 +0000 (14:32 +0100)
configure.ac
ring.c
ring.h

index 6abeff1001b5fbe73995b16e08497b74abb60348..62cf5a00f8ac07a73cd1a4c520341a05e240a2ac 100644 (file)
@@ -70,6 +70,7 @@ AC_C_INLINE
 
 # Checks for libraries.
 AC_CHECK_LIB([pthread], [pthread_create])
+AC_CHECK_LIB([rt], [clock_gettime])
 
 # Checks for header files.
 AC_CHECK_HEADERS([fcntl.h])
diff --git a/ring.c b/ring.c
index c6e14885ac6a5b11aedd718cd100c3e9d02e2d64..b81c3b29a8a3037cc8021be30726ae019ae35d9b 100644 (file)
--- a/ring.c
+++ b/ring.c
@@ -12,7 +12,7 @@
 #include "utils.h"
 
 #define KNET_MAX_EVENTS 8
-#define KNET_PING_TIMERES 5000
+#define KNET_PING_TIMERES 500
 #define KNET_BUFSIZE 2048
 
 struct __knet_handle {
@@ -240,18 +240,38 @@ static void knet_recv_frame(knet_handle_t knet_h, int sockfd)
        ssize_t len;
        struct sockaddr_storage address;
        socklen_t addrlen;
+       struct knet_host *i;
+       struct knet_link *j, *link_src;
+
+       if (pthread_rwlock_rdlock(&knet_h->host_rwlock) != 0)
+               return;
 
        len = recvfrom(sockfd, knet_h->buff, KNET_BUFSIZE,
                MSG_DONTWAIT, (struct sockaddr *) &address, &addrlen);
 
        if (len < sizeof(struct knet_frame))
-               return;
+               goto exit_unlock;
 
        if (ntohl(knet_h->buff->magic) != KNET_FRAME_MAGIC)
-               return;
+               goto exit_unlock;
 
        if (knet_h->buff->version != KNET_FRAME_VERSION)
-               return;
+               goto exit_unlock;
+
+       /* searching host/link, TODO: improve lookup */
+       link_src = NULL;
+
+       for (i = knet_h->host_head; i != NULL; i = i->next) {
+               for (j = i->link; j != NULL; j = j->next) {
+                       if (memcmp(&address, &j->address, addrlen) == 0) {
+                               link_src = j;
+                               break;
+                       }
+               }
+       }
+
+       if (link_src == NULL) /* host/link not found */
+               goto exit_unlock;
 
        switch (knet_h->buff->type) {
        case KNET_FRAME_DATA:
@@ -259,12 +279,68 @@ static void knet_recv_frame(knet_handle_t knet_h, int sockfd)
                        knet_h->buff + 1, len - sizeof(struct knet_frame));
                break;
        case KNET_FRAME_PING:
-               /* TODO: reply using KNET_FRAME_PONG */
+               knet_h->buff->type = KNET_FRAME_PONG;
+               sendto(j->sock, knet_h->buff, sizeof(struct knet_frame),
+                               MSG_DONTWAIT, (struct sockaddr *) &j->address,
+                               sizeof(struct sockaddr_storage));
                break;
        case KNET_FRAME_PONG:
-               /* TODO: find the link and mark enabled */
+               j->enabled = 1; /* TODO: might need write lock */
+               clock_gettime(CLOCK_MONOTONIC, &j->clk_pong);
                break;
        }
+
+ exit_unlock:
+       pthread_rwlock_unlock(&knet_h->host_rwlock);
+}
+
+static void knet_tsdiff(struct timespec *start, struct timespec *end, suseconds_t *diff)
+{
+       *diff = (end->tv_sec - start->tv_sec) * 1000000; /* micro-seconds */
+       *diff += (end->tv_nsec - start->tv_nsec) / 1000; /* micro-seconds */
+}
+
+static void knet_heartbeat_check(knet_handle_t knet_h)
+{
+       struct knet_host *i;
+       struct knet_link *j;
+       struct timespec clock_now;
+       suseconds_t diff_ping, diff_pong;
+
+       if (pthread_rwlock_rdlock(&knet_h->host_rwlock) != 0)
+               return;
+
+       if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0)
+               return;
+
+       for (i = knet_h->host_head; i != NULL; i = i->next) {
+               for (j = i->link; j != NULL; j = j->next) {
+                       knet_tsdiff(&j->clk_ping, &clock_now, &diff_ping);
+
+                       if (diff_ping > 1000000) {
+                               printf("diff_ping: %lu\n", diff_ping);
+                               knet_h->buff->type = KNET_FRAME_PING;
+                               sendto(j->sock, knet_h->buff,
+                                       sizeof(struct knet_frame),
+                                       MSG_DONTWAIT,
+                                       (struct sockaddr *) &j->address,
+                                       sizeof(struct sockaddr_storage));
+                               clock_gettime(CLOCK_MONOTONIC, &j->clk_ping);
+                               /* TODO: send ping */
+                       }
+
+                       if (j->enabled == 1) {
+                               knet_tsdiff(&j->clk_pong, &clock_now, &diff_pong);
+
+                               if (diff_pong > 3000000) {
+                                       printf("diff_pong: %lu\n", diff_pong);
+                                       j->enabled = 0; /* TODO: might need write lock */
+                               }
+                       }
+               }
+       }
+
+       pthread_rwlock_unlock(&knet_h->host_rwlock);
 }
 
 static void *knet_control_thread(void *data)
@@ -286,6 +362,8 @@ static void *knet_control_thread(void *data)
                                knet_recv_frame(knet_h, events[i].data.fd);
                        }
                }
+
+               knet_heartbeat_check(knet_h);
        }
 
        return NULL;
diff --git a/ring.h b/ring.h
index 964f39aba30a93c5d97cd17b53881975327e5858..45aaa530520f70b19535a420d78b65ae0220f7a5 100644 (file)
--- a/ring.h
+++ b/ring.h
@@ -17,8 +17,10 @@ struct knet_host {
 
 struct knet_link {
        int sock;
-       unsigned int enabled:1; /* link is enabled for data */
        struct sockaddr_storage address;
+       unsigned int enabled:1; /* link is enabled for data */
+       struct timespec clk_ping;
+       struct timespec clk_pong;
        struct knet_link *next;
 };