#include "utils.h"
#define KNET_MAX_EVENTS 8
-#define KNET_PING_TIMERES 5000
+#define KNET_PING_TIMERES 500
#define KNET_BUFSIZE 2048
struct __knet_handle {
ssize_t len;
struct sockaddr_storage address;
socklen_t addrlen;
+ struct knet_host *i;
+ struct knet_link *j, *link_src;
+
+ if (pthread_rwlock_rdlock(&knet_h->host_rwlock) != 0)
+ return;
len = recvfrom(sockfd, knet_h->buff, KNET_BUFSIZE,
MSG_DONTWAIT, (struct sockaddr *) &address, &addrlen);
if (len < sizeof(struct knet_frame))
- return;
+ goto exit_unlock;
if (ntohl(knet_h->buff->magic) != KNET_FRAME_MAGIC)
- return;
+ goto exit_unlock;
if (knet_h->buff->version != KNET_FRAME_VERSION)
- return;
+ goto exit_unlock;
+
+ /* searching host/link, TODO: improve lookup */
+ link_src = NULL;
+
+ for (i = knet_h->host_head; i != NULL; i = i->next) {
+ for (j = i->link; j != NULL; j = j->next) {
+ if (memcmp(&address, &j->address, addrlen) == 0) {
+ link_src = j;
+ break;
+ }
+ }
+ }
+
+ if (link_src == NULL) /* host/link not found */
+ goto exit_unlock;
switch (knet_h->buff->type) {
case KNET_FRAME_DATA:
knet_h->buff + 1, len - sizeof(struct knet_frame));
break;
case KNET_FRAME_PING:
- /* TODO: reply using KNET_FRAME_PONG */
+ knet_h->buff->type = KNET_FRAME_PONG;
+ sendto(j->sock, knet_h->buff, sizeof(struct knet_frame),
+ MSG_DONTWAIT, (struct sockaddr *) &j->address,
+ sizeof(struct sockaddr_storage));
break;
case KNET_FRAME_PONG:
- /* TODO: find the link and mark enabled */
+ j->enabled = 1; /* TODO: might need write lock */
+ clock_gettime(CLOCK_MONOTONIC, &j->clk_pong);
break;
}
+
+ exit_unlock:
+ pthread_rwlock_unlock(&knet_h->host_rwlock);
+}
+
+static void knet_tsdiff(struct timespec *start, struct timespec *end, suseconds_t *diff)
+{
+ *diff = (end->tv_sec - start->tv_sec) * 1000000; /* micro-seconds */
+ *diff += (end->tv_nsec - start->tv_nsec) / 1000; /* micro-seconds */
+}
+
+static void knet_heartbeat_check(knet_handle_t knet_h)
+{
+ struct knet_host *i;
+ struct knet_link *j;
+ struct timespec clock_now;
+ suseconds_t diff_ping, diff_pong;
+
+ if (pthread_rwlock_rdlock(&knet_h->host_rwlock) != 0)
+ return;
+
+ if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0)
+ return;
+
+ for (i = knet_h->host_head; i != NULL; i = i->next) {
+ for (j = i->link; j != NULL; j = j->next) {
+ knet_tsdiff(&j->clk_ping, &clock_now, &diff_ping);
+
+ if (diff_ping > 1000000) {
+ printf("diff_ping: %lu\n", diff_ping);
+ knet_h->buff->type = KNET_FRAME_PING;
+ sendto(j->sock, knet_h->buff,
+ sizeof(struct knet_frame),
+ MSG_DONTWAIT,
+ (struct sockaddr *) &j->address,
+ sizeof(struct sockaddr_storage));
+ clock_gettime(CLOCK_MONOTONIC, &j->clk_ping);
+ /* TODO: send ping */
+ }
+
+ if (j->enabled == 1) {
+ knet_tsdiff(&j->clk_pong, &clock_now, &diff_pong);
+
+ if (diff_pong > 3000000) {
+ printf("diff_pong: %lu\n", diff_pong);
+ j->enabled = 0; /* TODO: might need write lock */
+ }
+ }
+ }
+ }
+
+ pthread_rwlock_unlock(&knet_h->host_rwlock);
}
static void *knet_control_thread(void *data)
knet_recv_frame(knet_h, events[i].data.fd);
}
}
+
+ knet_heartbeat_check(knet_h);
}
return NULL;