]> git.proxmox.com Git - mirror_kronosnet.git/commitdiff
[threads] split send recv threads in separate file
authorFabio M. Di Nitto <fdinitto@redhat.com>
Sun, 20 Sep 2015 08:38:42 +0000 (10:38 +0200)
committerFabio M. Di Nitto <fdinitto@redhat.com>
Sun, 20 Sep 2015 08:38:42 +0000 (10:38 +0200)
Signed-off-by: Fabio M. Di Nitto <fdinitto@redhat.com>
libknet/Makefile.am
libknet/handle.c
libknet/threads.c [deleted file]
libknet/threads.h [deleted file]
libknet/threads_common.h [new file with mode: 0644]
libknet/threads_dsthandler.c
libknet/threads_heartbeat.c
libknet/threads_pmtud.c
libknet/threads_send_recv.c [new file with mode: 0644]
libknet/threads_send_recv.h [new file with mode: 0644]
libknet/timediff_test.c

index 9ab0959175a1f88897751a9869f5c5286af9186b..eac623f34062291f8b85d130e0a311a63dc4e962 100644 (file)
@@ -24,10 +24,10 @@ sources                     = \
                          link.c \
                          logging.c \
                          nsscrypto.c \
-                         threads.c \
                          threads_dsthandler.c \
                          threads_heartbeat.c \
-                         threads_pmtud.c
+                         threads_pmtud.c \
+                         threads_send_recv.c
 
 include_HEADERS                = libknet.h
 
@@ -45,10 +45,11 @@ noinst_HEADERS              = \
                          logging.h \
                          nsscrypto.h \
                          onwire.h \
-                         threads.h \
+                         threads_common.h \
                          threads_dsthandler.h \
                          threads_heartbeat.h \
-                         threads_pmtud.h
+                         threads_pmtud.h \
+                         threads_send_recv.h
 
 lib_LTLIBRARIES                = libknet.la
 
index cfe3ba8bd82f8d1eb5110caf86f5b07ef2caac3b..8c3db0e8a43f574ea7f01ea1cea2a55cdd46e963 100644 (file)
 #include "internals.h"
 #include "crypto.h"
 #include "common.h"
-#include "threads.h"
+#include "threads_common.h"
 #include "threads_heartbeat.h"
 #include "threads_pmtud.h"
 #include "threads_dsthandler.h"
+#include "threads_send_recv.h"
 #include "logging.h"
 #include "onwire.h"
 
diff --git a/libknet/threads.c b/libknet/threads.c
deleted file mode 100644 (file)
index 556eba6..0000000
+++ /dev/null
@@ -1,615 +0,0 @@
-/*
- * Copyright (C) 2010-2012 Red Hat, Inc.  All rights reserved.
- *
- * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
- *          Federico Simoncelli <fsimon@kronosnet.org>
- *
- * This software licensed under GPL-2.0+, LGPL-2.0+
- */
-
-#include "config.h"
-
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-#include <errno.h>
-#include <pthread.h>
-#include <sys/epoll.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#include <stdio.h>
-#include <time.h>
-#include <sys/uio.h>
-#include <math.h>
-
-#include "internals.h"
-#include "onwire.h"
-#include "crypto.h"
-#include "common.h"
-#include "host.h"
-#include "logging.h"
-#include "listener.h"
-#include "link.h"
-#include "threads.h"
-
-static void _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, unsigned char *outbuf, ssize_t outlen)
-{
-       int link_idx;
-
-       if (pthread_mutex_lock(&dst_host->active_links_mutex) != 0) {
-               log_debug(knet_h, KNET_SUB_SEND_T, "Unable to get active links mutex");
-               return;
-       }
-
-       for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
-               if (sendto(dst_host->link[dst_host->active_links[link_idx]].listener_sock,
-                          outbuf, outlen, MSG_DONTWAIT,
-                          (struct sockaddr *) &dst_host->link[dst_host->active_links[link_idx]].dst_addr,
-                          sizeof(struct sockaddr_storage)) < 0) {
-                       log_debug(knet_h, KNET_SUB_SEND_T, "Unable to send data packet to host %s (%u) link %s:%s (%u): %s",
-                                 dst_host->name, dst_host->host_id,
-                                 dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr,
-                                 dst_host->link[dst_host->active_links[link_idx]].status.dst_port,
-                                 dst_host->link[dst_host->active_links[link_idx]].link_id,
-                                 strerror(errno));
-               }
-
-               if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) &&
-                   (dst_host->active_link_entries > 1)) {
-                       uint8_t cur_link_id = dst_host->active_links[0];
-
-                       memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1);
-                       dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id;
-
-                       break;
-               }
-       }
-
-       pthread_mutex_unlock(&dst_host->active_links_mutex);
-
-       return;
-}
-
-static void _handle_send_to_links(knet_handle_t knet_h, int sockfd)
-{
-       ssize_t inlen = 0, len, outlen, frag_len;
-       struct knet_host *dst_host;
-       uint16_t dst_host_ids[KNET_MAX_HOST];
-       size_t dst_host_ids_entries = 0;
-       int bcast = 1;
-       unsigned char *outbuf = (unsigned char *)knet_h->send_to_links_buf;
-       struct knet_hostinfo *knet_hostinfo;
-       struct iovec iov_in;
-       unsigned int temp_data_mtu;
-
-       if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0) {
-               log_debug(knet_h, KNET_SUB_SEND_T, "Unable to get read lock");
-               goto host_unlock;
-       }
-
-       memset(&iov_in, 0, sizeof(iov_in));
-       iov_in.iov_base = (void *)knet_h->send_to_links_buf->khp_data_userdata;
-       iov_in.iov_len = KNET_MAX_PACKET_SIZE;
-
-       inlen = readv(sockfd, &iov_in, 1);
-
-       if (inlen < 0) {
-               log_err(knet_h, KNET_SUB_SEND_T, "Unrecoverable error: %s", strerror(errno));
-               goto out_unlock;
-       }
-
-       if (inlen == 0) {
-               log_err(knet_h, KNET_SUB_SEND_T, "Unrecoverable error! Got 0 bytes from socket!");
-               /* TODO: disconnection, should never happen! */
-               goto out_unlock;
-       }
-
-       if ((knet_h->enabled != 1) &&
-           (knet_h->send_to_links_buf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */
-               log_debug(knet_h, KNET_SUB_SEND_T, "Received data packet but forwarding is disabled");
-               goto out_unlock;
-       }
-
-       /*
-        * move this into a separate function to expand on
-        * extra switching rules
-        */
-       switch(knet_h->send_to_links_buf->kh_type) {
-               case KNET_HEADER_TYPE_DATA:
-                       if (knet_h->dst_host_filter_fn) {
-                               bcast = knet_h->dst_host_filter_fn(
-                                               (const unsigned char *)knet_h->send_to_links_buf->khp_data_userdata,
-                                               inlen,
-                                               knet_h->send_to_links_buf->kh_node,
-                                               dst_host_ids,
-                                               &dst_host_ids_entries);
-                               if (bcast < 0) {
-                                       log_debug(knet_h, KNET_SUB_SEND_T, "Error from dst_host_filter_fn: %d", bcast);
-                                       goto out_unlock;
-                               }
-
-                               if ((!bcast) && (!dst_host_ids_entries)) {
-                                       log_debug(knet_h, KNET_SUB_SEND_T, "Message is unicast but no dst_host_ids_entries");
-                                       goto out_unlock;
-                               }
-                       }
-                       break;
-               case KNET_HEADER_TYPE_HOST_INFO:
-                       knet_hostinfo = (struct knet_hostinfo *)knet_h->send_to_links_buf->khp_data_userdata;
-                       if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
-                               bcast = 0;
-                               dst_host_ids[0] = ntohs(knet_hostinfo->khi_dst_node_id);
-                               dst_host_ids_entries = 1;
-                       }
-                       break;
-               default:
-                       log_warn(knet_h, KNET_SUB_SEND_T, "Receiving unknown messages from socket");
-                       goto out_unlock;
-                       break;
-       }
-
-       if (!knet_h->data_mtu) {
-               /*
-                * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough
-                */
-               log_debug(knet_h, KNET_SUB_SEND_T,
-                         "Received data packet but data MTU is still unknown."
-                         " Packet might not be delivered."
-                         " Assuming mininum IPv4 mtu (%d)",
-                         KNET_PMTUD_MIN_MTU_V4);
-               temp_data_mtu = KNET_PMTUD_MIN_MTU_V4;
-       } else {
-               /*
-                * take a copy of the mtu to avoid value changing under
-                * our feet while we are sending a fragmented pckt
-                */
-               temp_data_mtu = knet_h->data_mtu;
-       }
-
-       frag_len = inlen;
-
-       knet_h->send_to_links_buf->khp_data_frag_seq = 0;
-       knet_h->send_to_links_buf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu);
-
-       if (knet_h->send_to_links_buf->khp_data_frag_num > 1) {
-               log_debug(knet_h, KNET_SUB_SEND_T, "Current inlen: %zu data mtu: %u frags: %d",
-                         inlen, temp_data_mtu, knet_h->send_to_links_buf->khp_data_frag_num);
-       }
-
-       if (!bcast) {
-               int host_idx;
-
-               while (knet_h->send_to_links_buf->khp_data_frag_seq < knet_h->send_to_links_buf->khp_data_frag_num) {
-
-                       knet_h->send_to_links_buf->khp_data_frag_seq++;
-
-                       if (frag_len > temp_data_mtu) {
-                               outlen = len = temp_data_mtu + KNET_HEADER_DATA_SIZE;
-                       } else {
-                               outlen = len = frag_len + KNET_HEADER_DATA_SIZE;
-                       }
-
-                       for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
-                               dst_host = knet_h->host_index[dst_host_ids[host_idx]];
-                               if (!dst_host) {
-                                       log_debug(knet_h, KNET_SUB_SEND_T, "unicast packet, host not found");
-                                       continue;
-                               }
-
-                               if (knet_h->send_to_links_buf->khp_data_frag_seq == 1) {
-                                       knet_h->send_to_links_buf->khp_data_seq_num = htons(++dst_host->ucast_seq_num_tx);
-                               } else {
-                                       knet_h->send_to_links_buf->khp_data_seq_num = htons(dst_host->ucast_seq_num_tx);
-                               }
-
-                               if (knet_h->crypto_instance) {
-                                       if (crypto_encrypt_and_sign(knet_h,
-                                                           (const unsigned char *)knet_h->send_to_links_buf,
-                                                           len,
-                                                           knet_h->send_to_links_buf_crypt,
-                                                           &outlen) < 0) {
-                                               log_debug(knet_h, KNET_SUB_SEND_T, "Unable to encrypt unicast packet");
-                                               goto out_unlock;
-                                       }
-                                       outbuf = knet_h->send_to_links_buf_crypt;
-                               }
-
-                               _dispatch_to_links(knet_h, dst_host, outbuf, outlen);
-
-                       }
-
-                       if (frag_len > temp_data_mtu) {
-                               frag_len = frag_len - temp_data_mtu;
-                               /*
-                                * we can't verify this memmove till reassembly code is implemented
-                                */
-                               memmove(knet_h->send_to_links_buf->khp_data_userdata,
-                                       knet_h->send_to_links_buf->khp_data_userdata + temp_data_mtu,
-                                       temp_data_mtu);
-                       }
-               }
-       } else {
-               knet_h->send_to_links_buf->khp_data_seq_num = htons(++knet_h->bcast_seq_num_tx);
-
-               while (knet_h->send_to_links_buf->khp_data_frag_seq < knet_h->send_to_links_buf->khp_data_frag_num) {
-
-                       knet_h->send_to_links_buf->khp_data_frag_seq++;
-
-                       if (frag_len > temp_data_mtu) {
-                               outlen = len = temp_data_mtu + KNET_HEADER_DATA_SIZE;
-                       } else {
-                               outlen = len = frag_len + KNET_HEADER_DATA_SIZE;
-                       }
-
-                       if (knet_h->crypto_instance) {
-                               if (crypto_encrypt_and_sign(knet_h,
-                                                   (const unsigned char *)knet_h->send_to_links_buf,
-                                                   len,
-                                                   knet_h->send_to_links_buf_crypt,
-                                                   &outlen) < 0) {
-                                       log_debug(knet_h, KNET_SUB_SEND_T, "Unable to encrypt mcast/bcast packet");
-                                       goto out_unlock;
-                               }
-                               outbuf = knet_h->send_to_links_buf_crypt;
-                       }
-
-                       for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
-                               _dispatch_to_links(knet_h, dst_host, outbuf, outlen);
-                       }
-
-                       if (frag_len > temp_data_mtu) {
-                               frag_len = frag_len - temp_data_mtu;
-                               /*
-                                * we can't verify this memmove till reassembly code is implemented
-                                */
-                               memmove(knet_h->send_to_links_buf->khp_data_userdata,
-                                       knet_h->send_to_links_buf->khp_data_userdata + temp_data_mtu,
-                                       temp_data_mtu);
-                       }
-               }
-       }
-
-out_unlock:
-       pthread_rwlock_unlock(&knet_h->list_rwlock);
-
-host_unlock:
-       if ((inlen > 0) && (knet_h->send_to_links_buf->kh_type == KNET_HEADER_TYPE_HOST_INFO)) {
-               if (pthread_mutex_lock(&knet_h->host_mutex) != 0)
-                       log_debug(knet_h, KNET_SUB_SEND_T, "Unable to get mutex lock");
-               pthread_cond_signal(&knet_h->host_cond);
-               pthread_mutex_unlock(&knet_h->host_mutex);
-       }
-}
-
-static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd)
-{
-       ssize_t len, outlen;
-       struct sockaddr_storage address;
-       socklen_t addrlen;
-       struct knet_host *src_host;
-       struct knet_link *src_link;
-       unsigned long long latency_last;
-       uint16_t dst_host_ids[KNET_MAX_HOST];
-       size_t dst_host_ids_entries = 0;
-       int bcast = 1;
-       struct timespec recvtime;
-       unsigned char *outbuf = (unsigned char *)knet_h->recv_from_links_buf;
-       struct knet_hostinfo *knet_hostinfo;
-       struct iovec iov_out[1];
-
-       if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0) {
-               log_debug(knet_h, KNET_SUB_LINK_T, "Unable to get read lock");
-               return;
-       }
-
-       addrlen = sizeof(struct sockaddr_storage);
-       len = recvfrom(sockfd, knet_h->recv_from_links_buf, KNET_DATABUFSIZE,
-               MSG_DONTWAIT, (struct sockaddr *) &address, &addrlen);
-
-       if (knet_h->crypto_instance) {
-               if (crypto_authenticate_and_decrypt(knet_h,
-                                                   (unsigned char *)knet_h->recv_from_links_buf,
-                                                   &len) < 0) {
-                       log_debug(knet_h, KNET_SUB_LINK_T, "Unable to decrypt/auth packet");
-                       goto exit_unlock;
-               }
-       }
-
-       if (len < (KNET_HEADER_SIZE + 1)) {
-               log_debug(knet_h, KNET_SUB_LINK_T, "Packet is too short");
-               goto exit_unlock;
-       }
-
-       if (knet_h->recv_from_links_buf->kh_version != KNET_HEADER_VERSION) {
-               log_debug(knet_h, KNET_SUB_LINK_T, "Packet version does not match");
-               goto exit_unlock;
-       }
-
-       /*
-        * skip fragmented pckt for now
-        */
-
-       if (knet_h->recv_from_links_buf->kh_type == KNET_HEADER_TYPE_DATA) {
-               if (knet_h->recv_from_links_buf->khp_data_frag_num > 1) {
-                       if (knet_h->recv_from_links_buf->khp_data_frag_seq == 1) {
-                               log_warn(knet_h, KNET_SUB_LINK_T, "pckt reassembly code not implemented! seq: %u frag: %u size: %zu",
-                                        ntohs(knet_h->recv_from_links_buf->khp_data_seq_num),
-                                        knet_h->recv_from_links_buf->khp_data_frag_seq,
-                                        len - KNET_HEADER_SIZE);
-                       }
-                       goto exit_unlock;
-               }
-       }
-
-       knet_h->recv_from_links_buf->kh_node = ntohs(knet_h->recv_from_links_buf->kh_node);
-       src_host = knet_h->host_index[knet_h->recv_from_links_buf->kh_node];
-       if (src_host == NULL) {  /* host not found */
-               log_debug(knet_h, KNET_SUB_LINK_T, "Unable to find source host for this packet");
-               goto exit_unlock;
-       }
-
-       src_link = NULL;
-
-       if ((knet_h->recv_from_links_buf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
-               src_link = src_host->link +
-                               (knet_h->recv_from_links_buf->khp_ping_link % KNET_MAX_LINK);
-               if (src_link->dynamic == KNET_LINK_DYNIP) {
-                       if (memcmp(&src_link->dst_addr, &address, sizeof(struct sockaddr_storage)) != 0) {
-                               log_debug(knet_h, KNET_SUB_LINK_T, "host: %u link: %u appears to have changed ip address",
-                                         src_host->host_id, src_link->link_id);
-                               memcpy(&src_link->dst_addr, &address, sizeof(struct sockaddr_storage));
-                               if (getnameinfo((const struct sockaddr *)&src_link->dst_addr, sizeof(struct sockaddr_storage),
-                                               src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
-                                               src_link->status.dst_port, KNET_MAX_PORT_LEN,
-                                               NI_NUMERICHOST | NI_NUMERICSERV) != 0) {
-                                       log_debug(knet_h, KNET_SUB_LINK_T, "Unable to resolve ???");
-                                       snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
-                                       snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
-                               }
-                       }
-                       src_link->status.dynconnected = 1;
-               }
-       }
-
-       switch (knet_h->recv_from_links_buf->kh_type) {
-       case KNET_HEADER_TYPE_DATA:
-               if (knet_h->enabled != 1) /* data forward is disabled */
-                       break;
-
-               knet_h->recv_from_links_buf->khp_data_seq_num = ntohs(knet_h->recv_from_links_buf->khp_data_seq_num);
-
-               if (knet_h->dst_host_filter_fn) {
-                       int host_idx;
-                       int found = 0;
-
-                       bcast = knet_h->dst_host_filter_fn(
-                                       (const unsigned char *)knet_h->recv_from_links_buf->khp_data_userdata,
-                                       len,
-                                       knet_h->recv_from_links_buf->kh_node,
-                                       dst_host_ids,
-                                       &dst_host_ids_entries);
-                       if (bcast < 0) {
-                               log_debug(knet_h, KNET_SUB_LINK_T, "Error from dst_host_filter_fn: %d", bcast);
-                               goto exit_unlock;
-                       }
-
-                       if ((!bcast) && (!dst_host_ids_entries)) {
-                               log_debug(knet_h, KNET_SUB_LINK_T, "Message is unicast but no dst_host_ids_entries");
-                               goto exit_unlock;
-                       }
-
-                       /* check if we are dst for this packet */
-                       if (!bcast) {
-                               for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
-                                       if (dst_host_ids[host_idx] == knet_h->host_id) {
-                                               found = 1;
-                                               break;
-                                       }
-                               }
-                               if (!found) {
-                                       log_debug(knet_h, KNET_SUB_LINK_T, "Packet is not for us");
-                                       goto exit_unlock;
-                               }
-                       }
-               }
-
-               if (!_should_deliver(src_host, bcast, knet_h->recv_from_links_buf->khp_data_seq_num)) {
-                       if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
-                               log_debug(knet_h, KNET_SUB_LINK_T, "Packet has already been delivered");
-                       }
-                       goto exit_unlock;
-               }
-
-               memset(iov_out, 0, sizeof(iov_out));
-               iov_out[0].iov_base = (void *) knet_h->recv_from_links_buf->khp_data_userdata;
-               iov_out[0].iov_len = len - KNET_HEADER_DATA_SIZE;
-
-               if (writev(knet_h->sockfd, iov_out, 1) == iov_out[0].iov_len) {
-                       _has_been_delivered(src_host, bcast, knet_h->recv_from_links_buf->khp_data_seq_num);
-               } else {
-                       log_debug(knet_h, KNET_SUB_LINK_T, "Packet has not been delivered");
-               }
-
-               break;
-       case KNET_HEADER_TYPE_PING:
-               outlen = KNET_HEADER_PING_SIZE;
-               knet_h->recv_from_links_buf->kh_type = KNET_HEADER_TYPE_PONG;
-               knet_h->recv_from_links_buf->kh_node = htons(knet_h->host_id);
-
-               if (knet_h->crypto_instance) {
-                       if (crypto_encrypt_and_sign(knet_h,
-                                                   (const unsigned char *)knet_h->recv_from_links_buf,
-                                                   len,
-                                                   knet_h->recv_from_links_buf_crypt,
-                                                   &outlen) < 0) {
-                               log_debug(knet_h, KNET_SUB_LINK_T, "Unable to encrypt pong packet");
-                               break;
-                       }
-                       outbuf = knet_h->recv_from_links_buf_crypt;
-               }
-
-               sendto(src_link->listener_sock, outbuf, outlen, MSG_DONTWAIT,
-                               (struct sockaddr *) &src_link->dst_addr,
-                               sizeof(struct sockaddr_storage));
-
-               break;
-       case KNET_HEADER_TYPE_PONG:
-               clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last);
-
-               memcpy(&recvtime, &knet_h->recv_from_links_buf->khp_ping_time[0], sizeof(struct timespec));
-               timespec_diff(recvtime,
-                               src_link->status.pong_last, &latency_last);
-
-               src_link->status.latency =
-                       ((src_link->status.latency * src_link->latency_exp) +
-                       ((latency_last / 1000llu) *
-                               (src_link->latency_fix - src_link->latency_exp))) /
-                                       src_link->latency_fix;
-
-               if (src_link->status.latency < src_link->pong_timeout) {
-                       if (!src_link->status.connected) {
-                               if (src_link->received_pong >= src_link->pong_count) {
-                                       log_info(knet_h, KNET_SUB_LINK, "host: %u link: %u is up",
-                                                src_host->host_id, src_link->link_id);
-                                       _link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1);
-                               } else {
-                                       src_link->received_pong++;
-                                       log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u received pong: %u",
-                                                 src_host->host_id, src_link->link_id, src_link->received_pong);
-                               }
-                       }
-               }
-
-               break;
-       case KNET_HEADER_TYPE_PMTUD:
-               outlen = KNET_HEADER_PMTUD_SIZE;
-               knet_h->recv_from_links_buf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
-               knet_h->recv_from_links_buf->kh_node = htons(knet_h->host_id);
-
-               if (knet_h->crypto_instance) {
-                       if (crypto_encrypt_and_sign(knet_h,
-                                                   (const unsigned char *)knet_h->recv_from_links_buf,
-                                                   len,
-                                                   knet_h->recv_from_links_buf_crypt,
-                                                   &outlen) < 0) {
-                               log_debug(knet_h, KNET_SUB_LINK_T, "Unable to encrypt PMTUd reply packet");
-                               break;
-                       }
-                       outbuf = knet_h->recv_from_links_buf_crypt;
-               }
-
-               sendto(src_link->listener_sock, outbuf, outlen, MSG_DONTWAIT,
-                               (struct sockaddr *) &src_link->dst_addr,
-                               sizeof(struct sockaddr_storage));
-
-               break;
-       case KNET_HEADER_TYPE_PMTUD_REPLY:
-               if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
-                       log_debug(knet_h, KNET_SUB_LINK_T, "Unable to get mutex lock");
-                       break;
-               }
-               src_link->last_recv_mtu = knet_h->recv_from_links_buf->khp_pmtud_size;
-               pthread_cond_signal(&knet_h->pmtud_cond);
-               pthread_mutex_unlock(&knet_h->pmtud_mutex);
-               break;
-       case KNET_HEADER_TYPE_HOST_INFO:
-               knet_hostinfo = (struct knet_hostinfo *)knet_h->recv_from_links_buf->khp_data_userdata;
-               if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
-                       knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id);
-               }
-               switch(knet_hostinfo->khi_type) {
-                       case KNET_HOSTINFO_TYPE_LINK_UP_DOWN:
-                               src_link = src_host->link +
-                                       (knet_hostinfo->khip_link_status_link_id % KNET_MAX_LINK);
-                               /*
-                                * basically if the node is coming back to life from a crash
-                                * we should receive a host info where local previous status == remote current status
-                                * and so we can detect that node is showing up again
-                                * we need to clear cbuffers and notify the node of our status by resending our host info
-                                */
-                               if ((src_link->remoteconnected == KNET_HOSTINFO_LINK_STATUS_UP) &&
-                                   (src_link->remoteconnected == knet_hostinfo->khip_link_status_status)) {
-                                       src_link->host_info_up_sent = 0;
-                               }
-                               src_link->remoteconnected = knet_hostinfo->khip_link_status_status;
-                               if (src_link->remoteconnected == KNET_HOSTINFO_LINK_STATUS_DOWN) {
-                                       /*
-                                        * if a host is disconnecting clean, we note that in donnotremoteupdate
-                                        * so that we don't send host info back immediately but we wait
-                                        * for the node to send an update when it's alive again
-                                        */
-                                       src_link->host_info_up_sent = 0;
-                                       src_link->donnotremoteupdate = 1;
-                               } else {
-                                       src_link->donnotremoteupdate = 0;
-                               }
-                               log_debug(knet_h, KNET_SUB_LINK, "host message up/down. from host: %u link: %u remote connected: %u",
-                                         src_host->host_id,
-                                         src_link->link_id,
-                                         src_link->remoteconnected);
-                               if (_host_dstcache_update_async(knet_h, src_host)) {
-                                       log_debug(knet_h, KNET_SUB_LINK,
-                                                 "Unable to update switch cache for host: %u link: %u remote connected: %u)",
-                                                 src_host->host_id,
-                                                 src_link->link_id,
-                                                 src_link->remoteconnected);
-                               }
-                               break;
-                       case KNET_HOSTINFO_TYPE_LINK_TABLE:
-                               break;
-                       default:
-                               log_warn(knet_h, KNET_SUB_LINK, "Receiving unknown host info message from host %u", src_host->host_id);
-                               break;
-               }
-               break;
-       default:
-               goto exit_unlock;
-       }
-
- exit_unlock:
-       pthread_rwlock_unlock(&knet_h->list_rwlock);
-}
-
-void *_handle_send_to_links_thread(void *data)
-{
-       knet_handle_t knet_h = (knet_handle_t) data;
-       struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
-       int i, nev;
-
-       /* preparing data buffer */
-       knet_h->send_to_links_buf->kh_version = KNET_HEADER_VERSION;
-       knet_h->send_to_links_buf->kh_node = htons(knet_h->host_id);
-
-       while (!knet_h->fini_in_progress) {
-               nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1);
-
-               for (i = 0; i < nev; i++) {
-                       if (events[i].data.fd == knet_h->sockfd) {
-                               knet_h->send_to_links_buf->kh_type = KNET_HEADER_TYPE_DATA;
-                       } else {
-                               knet_h->send_to_links_buf->kh_type = KNET_HEADER_TYPE_HOST_INFO;
-                       }
-                       _handle_send_to_links(knet_h, events[i].data.fd);
-               }
-       }
-
-       return NULL;
-
-}
-
-void *_handle_recv_from_links_thread(void *data)
-{
-       int i, nev;
-       knet_handle_t knet_h = (knet_handle_t) data;
-       struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
-
-       while (!knet_h->fini_in_progress) {
-               nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1);
-
-               for (i = 0; i < nev; i++) {
-                       _handle_recv_from_links(knet_h, events[i].data.fd);
-               }
-       }
-
-       return NULL;
-}
diff --git a/libknet/threads.h b/libknet/threads.h
deleted file mode 100644 (file)
index 319f81b..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright (C) 2010-2012 Red Hat, Inc.  All rights reserved.
- *
- * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
- *          Federico Simoncelli <fsimon@kronosnet.org>
- *
- * This software licensed under GPL-2.0+, LGPL-2.0+
- */
-
-#ifndef __THREADS_H__
-#define __THREADS_H__
-
-#define timespec_diff(start, end, diff) \
-do { \
-       if (end.tv_sec > start.tv_sec) \
-               *(diff) = ((end.tv_sec - start.tv_sec) * 1000000000llu) \
-                                       + end.tv_nsec - start.tv_nsec; \
-       else \
-               *(diff) = end.tv_nsec - start.tv_nsec; \
-} while (0);
-
-#define KNET_EPOLL_MAX_EVENTS 8
-
-void *_handle_send_to_links_thread(void *data);
-void *_handle_recv_from_links_thread(void *data);
-
-#endif
diff --git a/libknet/threads_common.h b/libknet/threads_common.h
new file mode 100644 (file)
index 0000000..db698a9
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (C) 2010-2012 Red Hat, Inc.  All rights reserved.
+ *
+ * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
+ *          Federico Simoncelli <fsimon@kronosnet.org>
+ *
+ * This software licensed under GPL-2.0+, LGPL-2.0+
+ */
+
+#ifndef __THREADS_COMMON_H__
+#define __THREADS_COMMON_H__
+
+#define timespec_diff(start, end, diff) \
+do { \
+       if (end.tv_sec > start.tv_sec) \
+               *(diff) = ((end.tv_sec - start.tv_sec) * 1000000000llu) \
+                                       + end.tv_nsec - start.tv_nsec; \
+       else \
+               *(diff) = end.tv_nsec - start.tv_nsec; \
+} while (0);
+
+#define KNET_EPOLL_MAX_EVENTS 8
+
+#endif
index 64e17c00ada95b37467fb2d5de1344bf4a9d2fcc..3d166242864ea465efc688a975c6540e4470e435 100644 (file)
@@ -30,7 +30,7 @@
 #include "logging.h"
 #include "listener.h"
 #include "link.h"
-#include "threads.h"
+#include "threads_common.h"
 #include "threads_dsthandler.h"
 
 static void _handle_dst_link_updates(knet_handle_t knet_h)
index 1c295b9297915b3a41f10474a3b4f5cc89b266de..48f2645f499e9784781fd98ee72ba6825c131148 100644 (file)
@@ -30,7 +30,7 @@
 #include "logging.h"
 #include "listener.h"
 #include "link.h"
-#include "threads.h"
+#include "threads_common.h"
 #include "threads_heartbeat.h"
 
 #define KNET_PING_TIMERES 200000
index c9389b81934c400cf6b65934c54e3bf9bc9467b0..f98d982f4c35fe011f85c36884e86e0036200859 100644 (file)
@@ -30,7 +30,7 @@
 #include "logging.h"
 #include "listener.h"
 #include "link.h"
-#include "threads.h"
+#include "threads_common.h"
 #include "threads_pmtud.h"
 
 static void _handle_check_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link)
diff --git a/libknet/threads_send_recv.c b/libknet/threads_send_recv.c
new file mode 100644 (file)
index 0000000..bf5d3bc
--- /dev/null
@@ -0,0 +1,616 @@
+/*
+ * Copyright (C) 2010-2012 Red Hat, Inc.  All rights reserved.
+ *
+ * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
+ *          Federico Simoncelli <fsimon@kronosnet.org>
+ *
+ * This software licensed under GPL-2.0+, LGPL-2.0+
+ */
+
+#include "config.h"
+
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <pthread.h>
+#include <sys/epoll.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <stdio.h>
+#include <time.h>
+#include <sys/uio.h>
+#include <math.h>
+
+#include "internals.h"
+#include "onwire.h"
+#include "crypto.h"
+#include "common.h"
+#include "host.h"
+#include "logging.h"
+#include "listener.h"
+#include "link.h"
+#include "threads_common.h"
+#include "threads_send_recv.h"
+
+static void _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, unsigned char *outbuf, ssize_t outlen)
+{
+       int link_idx;
+
+       if (pthread_mutex_lock(&dst_host->active_links_mutex) != 0) {
+               log_debug(knet_h, KNET_SUB_SEND_T, "Unable to get active links mutex");
+               return;
+       }
+
+       for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
+               if (sendto(dst_host->link[dst_host->active_links[link_idx]].listener_sock,
+                          outbuf, outlen, MSG_DONTWAIT,
+                          (struct sockaddr *) &dst_host->link[dst_host->active_links[link_idx]].dst_addr,
+                          sizeof(struct sockaddr_storage)) < 0) {
+                       log_debug(knet_h, KNET_SUB_SEND_T, "Unable to send data packet to host %s (%u) link %s:%s (%u): %s",
+                                 dst_host->name, dst_host->host_id,
+                                 dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr,
+                                 dst_host->link[dst_host->active_links[link_idx]].status.dst_port,
+                                 dst_host->link[dst_host->active_links[link_idx]].link_id,
+                                 strerror(errno));
+               }
+
+               if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) &&
+                   (dst_host->active_link_entries > 1)) {
+                       uint8_t cur_link_id = dst_host->active_links[0];
+
+                       memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1);
+                       dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id;
+
+                       break;
+               }
+       }
+
+       pthread_mutex_unlock(&dst_host->active_links_mutex);
+
+       return;
+}
+
+static void _handle_send_to_links(knet_handle_t knet_h, int sockfd)
+{
+       ssize_t inlen = 0, len, outlen, frag_len;
+       struct knet_host *dst_host;
+       uint16_t dst_host_ids[KNET_MAX_HOST];
+       size_t dst_host_ids_entries = 0;
+       int bcast = 1;
+       unsigned char *outbuf = (unsigned char *)knet_h->send_to_links_buf;
+       struct knet_hostinfo *knet_hostinfo;
+       struct iovec iov_in;
+       unsigned int temp_data_mtu;
+
+       if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0) {
+               log_debug(knet_h, KNET_SUB_SEND_T, "Unable to get read lock");
+               goto host_unlock;
+       }
+
+       memset(&iov_in, 0, sizeof(iov_in));
+       iov_in.iov_base = (void *)knet_h->send_to_links_buf->khp_data_userdata;
+       iov_in.iov_len = KNET_MAX_PACKET_SIZE;
+
+       inlen = readv(sockfd, &iov_in, 1);
+
+       if (inlen < 0) {
+               log_err(knet_h, KNET_SUB_SEND_T, "Unrecoverable error: %s", strerror(errno));
+               goto out_unlock;
+       }
+
+       if (inlen == 0) {
+               log_err(knet_h, KNET_SUB_SEND_T, "Unrecoverable error! Got 0 bytes from socket!");
+               /* TODO: disconnection, should never happen! */
+               goto out_unlock;
+       }
+
+       if ((knet_h->enabled != 1) &&
+           (knet_h->send_to_links_buf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */
+               log_debug(knet_h, KNET_SUB_SEND_T, "Received data packet but forwarding is disabled");
+               goto out_unlock;
+       }
+
+       /*
+        * move this into a separate function to expand on
+        * extra switching rules
+        */
+       switch(knet_h->send_to_links_buf->kh_type) {
+               case KNET_HEADER_TYPE_DATA:
+                       if (knet_h->dst_host_filter_fn) {
+                               bcast = knet_h->dst_host_filter_fn(
+                                               (const unsigned char *)knet_h->send_to_links_buf->khp_data_userdata,
+                                               inlen,
+                                               knet_h->send_to_links_buf->kh_node,
+                                               dst_host_ids,
+                                               &dst_host_ids_entries);
+                               if (bcast < 0) {
+                                       log_debug(knet_h, KNET_SUB_SEND_T, "Error from dst_host_filter_fn: %d", bcast);
+                                       goto out_unlock;
+                               }
+
+                               if ((!bcast) && (!dst_host_ids_entries)) {
+                                       log_debug(knet_h, KNET_SUB_SEND_T, "Message is unicast but no dst_host_ids_entries");
+                                       goto out_unlock;
+                               }
+                       }
+                       break;
+               case KNET_HEADER_TYPE_HOST_INFO:
+                       knet_hostinfo = (struct knet_hostinfo *)knet_h->send_to_links_buf->khp_data_userdata;
+                       if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
+                               bcast = 0;
+                               dst_host_ids[0] = ntohs(knet_hostinfo->khi_dst_node_id);
+                               dst_host_ids_entries = 1;
+                       }
+                       break;
+               default:
+                       log_warn(knet_h, KNET_SUB_SEND_T, "Receiving unknown messages from socket");
+                       goto out_unlock;
+                       break;
+       }
+
+       if (!knet_h->data_mtu) {
+               /*
+                * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough
+                */
+               log_debug(knet_h, KNET_SUB_SEND_T,
+                         "Received data packet but data MTU is still unknown."
+                         " Packet might not be delivered."
+                         " Assuming mininum IPv4 mtu (%d)",
+                         KNET_PMTUD_MIN_MTU_V4);
+               temp_data_mtu = KNET_PMTUD_MIN_MTU_V4;
+       } else {
+               /*
+                * take a copy of the mtu to avoid value changing under
+                * our feet while we are sending a fragmented pckt
+                */
+               temp_data_mtu = knet_h->data_mtu;
+       }
+
+       frag_len = inlen;
+
+       knet_h->send_to_links_buf->khp_data_frag_seq = 0;
+       knet_h->send_to_links_buf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu);
+
+       if (knet_h->send_to_links_buf->khp_data_frag_num > 1) {
+               log_debug(knet_h, KNET_SUB_SEND_T, "Current inlen: %zu data mtu: %u frags: %d",
+                         inlen, temp_data_mtu, knet_h->send_to_links_buf->khp_data_frag_num);
+       }
+
+       if (!bcast) {
+               int host_idx;
+
+               while (knet_h->send_to_links_buf->khp_data_frag_seq < knet_h->send_to_links_buf->khp_data_frag_num) {
+
+                       knet_h->send_to_links_buf->khp_data_frag_seq++;
+
+                       if (frag_len > temp_data_mtu) {
+                               outlen = len = temp_data_mtu + KNET_HEADER_DATA_SIZE;
+                       } else {
+                               outlen = len = frag_len + KNET_HEADER_DATA_SIZE;
+                       }
+
+                       for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
+                               dst_host = knet_h->host_index[dst_host_ids[host_idx]];
+                               if (!dst_host) {
+                                       log_debug(knet_h, KNET_SUB_SEND_T, "unicast packet, host not found");
+                                       continue;
+                               }
+
+                               if (knet_h->send_to_links_buf->khp_data_frag_seq == 1) {
+                                       knet_h->send_to_links_buf->khp_data_seq_num = htons(++dst_host->ucast_seq_num_tx);
+                               } else {
+                                       knet_h->send_to_links_buf->khp_data_seq_num = htons(dst_host->ucast_seq_num_tx);
+                               }
+
+                               if (knet_h->crypto_instance) {
+                                       if (crypto_encrypt_and_sign(knet_h,
+                                                           (const unsigned char *)knet_h->send_to_links_buf,
+                                                           len,
+                                                           knet_h->send_to_links_buf_crypt,
+                                                           &outlen) < 0) {
+                                               log_debug(knet_h, KNET_SUB_SEND_T, "Unable to encrypt unicast packet");
+                                               goto out_unlock;
+                                       }
+                                       outbuf = knet_h->send_to_links_buf_crypt;
+                               }
+
+                               _dispatch_to_links(knet_h, dst_host, outbuf, outlen);
+
+                       }
+
+                       if (frag_len > temp_data_mtu) {
+                               frag_len = frag_len - temp_data_mtu;
+                               /*
+                                * we can't verify this memmove till reassembly code is implemented
+                                */
+                               memmove(knet_h->send_to_links_buf->khp_data_userdata,
+                                       knet_h->send_to_links_buf->khp_data_userdata + temp_data_mtu,
+                                       temp_data_mtu);
+                       }
+               }
+       } else {
+               knet_h->send_to_links_buf->khp_data_seq_num = htons(++knet_h->bcast_seq_num_tx);
+
+               while (knet_h->send_to_links_buf->khp_data_frag_seq < knet_h->send_to_links_buf->khp_data_frag_num) {
+
+                       knet_h->send_to_links_buf->khp_data_frag_seq++;
+
+                       if (frag_len > temp_data_mtu) {
+                               outlen = len = temp_data_mtu + KNET_HEADER_DATA_SIZE;
+                       } else {
+                               outlen = len = frag_len + KNET_HEADER_DATA_SIZE;
+                       }
+
+                       if (knet_h->crypto_instance) {
+                               if (crypto_encrypt_and_sign(knet_h,
+                                                   (const unsigned char *)knet_h->send_to_links_buf,
+                                                   len,
+                                                   knet_h->send_to_links_buf_crypt,
+                                                   &outlen) < 0) {
+                                       log_debug(knet_h, KNET_SUB_SEND_T, "Unable to encrypt mcast/bcast packet");
+                                       goto out_unlock;
+                               }
+                               outbuf = knet_h->send_to_links_buf_crypt;
+                       }
+
+                       for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
+                               _dispatch_to_links(knet_h, dst_host, outbuf, outlen);
+                       }
+
+                       if (frag_len > temp_data_mtu) {
+                               frag_len = frag_len - temp_data_mtu;
+                               /*
+                                * we can't verify this memmove till reassembly code is implemented
+                                */
+                               memmove(knet_h->send_to_links_buf->khp_data_userdata,
+                                       knet_h->send_to_links_buf->khp_data_userdata + temp_data_mtu,
+                                       temp_data_mtu);
+                       }
+               }
+       }
+
+out_unlock:
+       pthread_rwlock_unlock(&knet_h->list_rwlock);
+
+host_unlock:
+       if ((inlen > 0) && (knet_h->send_to_links_buf->kh_type == KNET_HEADER_TYPE_HOST_INFO)) {
+               if (pthread_mutex_lock(&knet_h->host_mutex) != 0)
+                       log_debug(knet_h, KNET_SUB_SEND_T, "Unable to get mutex lock");
+               pthread_cond_signal(&knet_h->host_cond);
+               pthread_mutex_unlock(&knet_h->host_mutex);
+       }
+}
+
+static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd)
+{
+       ssize_t len, outlen;
+       struct sockaddr_storage address;
+       socklen_t addrlen;
+       struct knet_host *src_host;
+       struct knet_link *src_link;
+       unsigned long long latency_last;
+       uint16_t dst_host_ids[KNET_MAX_HOST];
+       size_t dst_host_ids_entries = 0;
+       int bcast = 1;
+       struct timespec recvtime;
+       unsigned char *outbuf = (unsigned char *)knet_h->recv_from_links_buf;
+       struct knet_hostinfo *knet_hostinfo;
+       struct iovec iov_out[1];
+
+       if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0) {
+               log_debug(knet_h, KNET_SUB_LINK_T, "Unable to get read lock");
+               return;
+       }
+
+       addrlen = sizeof(struct sockaddr_storage);
+       len = recvfrom(sockfd, knet_h->recv_from_links_buf, KNET_DATABUFSIZE,
+               MSG_DONTWAIT, (struct sockaddr *) &address, &addrlen);
+
+       if (knet_h->crypto_instance) {
+               if (crypto_authenticate_and_decrypt(knet_h,
+                                                   (unsigned char *)knet_h->recv_from_links_buf,
+                                                   &len) < 0) {
+                       log_debug(knet_h, KNET_SUB_LINK_T, "Unable to decrypt/auth packet");
+                       goto exit_unlock;
+               }
+       }
+
+       if (len < (KNET_HEADER_SIZE + 1)) {
+               log_debug(knet_h, KNET_SUB_LINK_T, "Packet is too short");
+               goto exit_unlock;
+       }
+
+       if (knet_h->recv_from_links_buf->kh_version != KNET_HEADER_VERSION) {
+               log_debug(knet_h, KNET_SUB_LINK_T, "Packet version does not match");
+               goto exit_unlock;
+       }
+
+       /*
+        * skip fragmented pckt for now
+        */
+
+       if (knet_h->recv_from_links_buf->kh_type == KNET_HEADER_TYPE_DATA) {
+               if (knet_h->recv_from_links_buf->khp_data_frag_num > 1) {
+                       if (knet_h->recv_from_links_buf->khp_data_frag_seq == 1) {
+                               log_warn(knet_h, KNET_SUB_LINK_T, "pckt reassembly code not implemented! seq: %u frag: %u size: %zu",
+                                        ntohs(knet_h->recv_from_links_buf->khp_data_seq_num),
+                                        knet_h->recv_from_links_buf->khp_data_frag_seq,
+                                        len - KNET_HEADER_SIZE);
+                       }
+                       goto exit_unlock;
+               }
+       }
+
+       knet_h->recv_from_links_buf->kh_node = ntohs(knet_h->recv_from_links_buf->kh_node);
+       src_host = knet_h->host_index[knet_h->recv_from_links_buf->kh_node];
+       if (src_host == NULL) {  /* host not found */
+               log_debug(knet_h, KNET_SUB_LINK_T, "Unable to find source host for this packet");
+               goto exit_unlock;
+       }
+
+       src_link = NULL;
+
+       if ((knet_h->recv_from_links_buf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
+               src_link = src_host->link +
+                               (knet_h->recv_from_links_buf->khp_ping_link % KNET_MAX_LINK);
+               if (src_link->dynamic == KNET_LINK_DYNIP) {
+                       if (memcmp(&src_link->dst_addr, &address, sizeof(struct sockaddr_storage)) != 0) {
+                               log_debug(knet_h, KNET_SUB_LINK_T, "host: %u link: %u appears to have changed ip address",
+                                         src_host->host_id, src_link->link_id);
+                               memcpy(&src_link->dst_addr, &address, sizeof(struct sockaddr_storage));
+                               if (getnameinfo((const struct sockaddr *)&src_link->dst_addr, sizeof(struct sockaddr_storage),
+                                               src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
+                                               src_link->status.dst_port, KNET_MAX_PORT_LEN,
+                                               NI_NUMERICHOST | NI_NUMERICSERV) != 0) {
+                                       log_debug(knet_h, KNET_SUB_LINK_T, "Unable to resolve ???");
+                                       snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
+                                       snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
+                               }
+                       }
+                       src_link->status.dynconnected = 1;
+               }
+       }
+
+       switch (knet_h->recv_from_links_buf->kh_type) {
+       case KNET_HEADER_TYPE_DATA:
+               if (knet_h->enabled != 1) /* data forward is disabled */
+                       break;
+
+               knet_h->recv_from_links_buf->khp_data_seq_num = ntohs(knet_h->recv_from_links_buf->khp_data_seq_num);
+
+               if (knet_h->dst_host_filter_fn) {
+                       int host_idx;
+                       int found = 0;
+
+                       bcast = knet_h->dst_host_filter_fn(
+                                       (const unsigned char *)knet_h->recv_from_links_buf->khp_data_userdata,
+                                       len,
+                                       knet_h->recv_from_links_buf->kh_node,
+                                       dst_host_ids,
+                                       &dst_host_ids_entries);
+                       if (bcast < 0) {
+                               log_debug(knet_h, KNET_SUB_LINK_T, "Error from dst_host_filter_fn: %d", bcast);
+                               goto exit_unlock;
+                       }
+
+                       if ((!bcast) && (!dst_host_ids_entries)) {
+                               log_debug(knet_h, KNET_SUB_LINK_T, "Message is unicast but no dst_host_ids_entries");
+                               goto exit_unlock;
+                       }
+
+                       /* check if we are dst for this packet */
+                       if (!bcast) {
+                               for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
+                                       if (dst_host_ids[host_idx] == knet_h->host_id) {
+                                               found = 1;
+                                               break;
+                                       }
+                               }
+                               if (!found) {
+                                       log_debug(knet_h, KNET_SUB_LINK_T, "Packet is not for us");
+                                       goto exit_unlock;
+                               }
+                       }
+               }
+
+               if (!_should_deliver(src_host, bcast, knet_h->recv_from_links_buf->khp_data_seq_num)) {
+                       if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
+                               log_debug(knet_h, KNET_SUB_LINK_T, "Packet has already been delivered");
+                       }
+                       goto exit_unlock;
+               }
+
+               memset(iov_out, 0, sizeof(iov_out));
+               iov_out[0].iov_base = (void *) knet_h->recv_from_links_buf->khp_data_userdata;
+               iov_out[0].iov_len = len - KNET_HEADER_DATA_SIZE;
+
+               if (writev(knet_h->sockfd, iov_out, 1) == iov_out[0].iov_len) {
+                       _has_been_delivered(src_host, bcast, knet_h->recv_from_links_buf->khp_data_seq_num);
+               } else {
+                       log_debug(knet_h, KNET_SUB_LINK_T, "Packet has not been delivered");
+               }
+
+               break;
+       case KNET_HEADER_TYPE_PING:
+               outlen = KNET_HEADER_PING_SIZE;
+               knet_h->recv_from_links_buf->kh_type = KNET_HEADER_TYPE_PONG;
+               knet_h->recv_from_links_buf->kh_node = htons(knet_h->host_id);
+
+               if (knet_h->crypto_instance) {
+                       if (crypto_encrypt_and_sign(knet_h,
+                                                   (const unsigned char *)knet_h->recv_from_links_buf,
+                                                   len,
+                                                   knet_h->recv_from_links_buf_crypt,
+                                                   &outlen) < 0) {
+                               log_debug(knet_h, KNET_SUB_LINK_T, "Unable to encrypt pong packet");
+                               break;
+                       }
+                       outbuf = knet_h->recv_from_links_buf_crypt;
+               }
+
+               sendto(src_link->listener_sock, outbuf, outlen, MSG_DONTWAIT,
+                               (struct sockaddr *) &src_link->dst_addr,
+                               sizeof(struct sockaddr_storage));
+
+               break;
+       case KNET_HEADER_TYPE_PONG:
+               clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last);
+
+               memcpy(&recvtime, &knet_h->recv_from_links_buf->khp_ping_time[0], sizeof(struct timespec));
+               timespec_diff(recvtime,
+                               src_link->status.pong_last, &latency_last);
+
+               src_link->status.latency =
+                       ((src_link->status.latency * src_link->latency_exp) +
+                       ((latency_last / 1000llu) *
+                               (src_link->latency_fix - src_link->latency_exp))) /
+                                       src_link->latency_fix;
+
+               if (src_link->status.latency < src_link->pong_timeout) {
+                       if (!src_link->status.connected) {
+                               if (src_link->received_pong >= src_link->pong_count) {
+                                       log_info(knet_h, KNET_SUB_LINK, "host: %u link: %u is up",
+                                                src_host->host_id, src_link->link_id);
+                                       _link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1);
+                               } else {
+                                       src_link->received_pong++;
+                                       log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u received pong: %u",
+                                                 src_host->host_id, src_link->link_id, src_link->received_pong);
+                               }
+                       }
+               }
+
+               break;
+       case KNET_HEADER_TYPE_PMTUD:
+               outlen = KNET_HEADER_PMTUD_SIZE;
+               knet_h->recv_from_links_buf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
+               knet_h->recv_from_links_buf->kh_node = htons(knet_h->host_id);
+
+               if (knet_h->crypto_instance) {
+                       if (crypto_encrypt_and_sign(knet_h,
+                                                   (const unsigned char *)knet_h->recv_from_links_buf,
+                                                   len,
+                                                   knet_h->recv_from_links_buf_crypt,
+                                                   &outlen) < 0) {
+                               log_debug(knet_h, KNET_SUB_LINK_T, "Unable to encrypt PMTUd reply packet");
+                               break;
+                       }
+                       outbuf = knet_h->recv_from_links_buf_crypt;
+               }
+
+               sendto(src_link->listener_sock, outbuf, outlen, MSG_DONTWAIT,
+                               (struct sockaddr *) &src_link->dst_addr,
+                               sizeof(struct sockaddr_storage));
+
+               break;
+       case KNET_HEADER_TYPE_PMTUD_REPLY:
+               if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
+                       log_debug(knet_h, KNET_SUB_LINK_T, "Unable to get mutex lock");
+                       break;
+               }
+               src_link->last_recv_mtu = knet_h->recv_from_links_buf->khp_pmtud_size;
+               pthread_cond_signal(&knet_h->pmtud_cond);
+               pthread_mutex_unlock(&knet_h->pmtud_mutex);
+               break;
+       case KNET_HEADER_TYPE_HOST_INFO:
+               knet_hostinfo = (struct knet_hostinfo *)knet_h->recv_from_links_buf->khp_data_userdata;
+               if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
+                       knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id);
+               }
+               switch(knet_hostinfo->khi_type) {
+                       case KNET_HOSTINFO_TYPE_LINK_UP_DOWN:
+                               src_link = src_host->link +
+                                       (knet_hostinfo->khip_link_status_link_id % KNET_MAX_LINK);
+                               /*
+                                * basically if the node is coming back to life from a crash
+                                * we should receive a host info where local previous status == remote current status
+                                * and so we can detect that node is showing up again
+                                * we need to clear cbuffers and notify the node of our status by resending our host info
+                                */
+                               if ((src_link->remoteconnected == KNET_HOSTINFO_LINK_STATUS_UP) &&
+                                   (src_link->remoteconnected == knet_hostinfo->khip_link_status_status)) {
+                                       src_link->host_info_up_sent = 0;
+                               }
+                               src_link->remoteconnected = knet_hostinfo->khip_link_status_status;
+                               if (src_link->remoteconnected == KNET_HOSTINFO_LINK_STATUS_DOWN) {
+                                       /*
+                                        * if a host is disconnecting clean, we note that in donnotremoteupdate
+                                        * so that we don't send host info back immediately but we wait
+                                        * for the node to send an update when it's alive again
+                                        */
+                                       src_link->host_info_up_sent = 0;
+                                       src_link->donnotremoteupdate = 1;
+                               } else {
+                                       src_link->donnotremoteupdate = 0;
+                               }
+                               log_debug(knet_h, KNET_SUB_LINK, "host message up/down. from host: %u link: %u remote connected: %u",
+                                         src_host->host_id,
+                                         src_link->link_id,
+                                         src_link->remoteconnected);
+                               if (_host_dstcache_update_async(knet_h, src_host)) {
+                                       log_debug(knet_h, KNET_SUB_LINK,
+                                                 "Unable to update switch cache for host: %u link: %u remote connected: %u)",
+                                                 src_host->host_id,
+                                                 src_link->link_id,
+                                                 src_link->remoteconnected);
+                               }
+                               break;
+                       case KNET_HOSTINFO_TYPE_LINK_TABLE:
+                               break;
+                       default:
+                               log_warn(knet_h, KNET_SUB_LINK, "Receiving unknown host info message from host %u", src_host->host_id);
+                               break;
+               }
+               break;
+       default:
+               goto exit_unlock;
+       }
+
+ exit_unlock:
+       pthread_rwlock_unlock(&knet_h->list_rwlock);
+}
+
+void *_handle_send_to_links_thread(void *data)
+{
+       knet_handle_t knet_h = (knet_handle_t) data;
+       struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
+       int i, nev;
+
+       /* preparing data buffer */
+       knet_h->send_to_links_buf->kh_version = KNET_HEADER_VERSION;
+       knet_h->send_to_links_buf->kh_node = htons(knet_h->host_id);
+
+       while (!knet_h->fini_in_progress) {
+               nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1);
+
+               for (i = 0; i < nev; i++) {
+                       if (events[i].data.fd == knet_h->sockfd) {
+                               knet_h->send_to_links_buf->kh_type = KNET_HEADER_TYPE_DATA;
+                       } else {
+                               knet_h->send_to_links_buf->kh_type = KNET_HEADER_TYPE_HOST_INFO;
+                       }
+                       _handle_send_to_links(knet_h, events[i].data.fd);
+               }
+       }
+
+       return NULL;
+
+}
+
+void *_handle_recv_from_links_thread(void *data)
+{
+       int i, nev;
+       knet_handle_t knet_h = (knet_handle_t) data;
+       struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
+
+       while (!knet_h->fini_in_progress) {
+               nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1);
+
+               for (i = 0; i < nev; i++) {
+                       _handle_recv_from_links(knet_h, events[i].data.fd);
+               }
+       }
+
+       return NULL;
+}
diff --git a/libknet/threads_send_recv.h b/libknet/threads_send_recv.h
new file mode 100644 (file)
index 0000000..319f81b
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright (C) 2010-2012 Red Hat, Inc.  All rights reserved.
+ *
+ * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
+ *          Federico Simoncelli <fsimon@kronosnet.org>
+ *
+ * This software licensed under GPL-2.0+, LGPL-2.0+
+ */
+
+#ifndef __THREADS_H__
+#define __THREADS_H__
+
+#define timespec_diff(start, end, diff) \
+do { \
+       if (end.tv_sec > start.tv_sec) \
+               *(diff) = ((end.tv_sec - start.tv_sec) * 1000000000llu) \
+                                       + end.tv_nsec - start.tv_nsec; \
+       else \
+               *(diff) = end.tv_nsec - start.tv_nsec; \
+} while (0);
+
+#define KNET_EPOLL_MAX_EVENTS 8
+
+void *_handle_send_to_links_thread(void *data);
+void *_handle_recv_from_links_thread(void *data);
+
+#endif
index bb2f14e18b2ffee90164aa6c74db2f552a57f7e2..ed3c7a84edb77f2ab13224327b87a4d5e049c2f6 100644 (file)
@@ -13,7 +13,7 @@
 #include <time.h>
 #include <stdlib.h>
 
-#include "threads.h"
+#include "threads_common.h"
 
 #define timespec_set(x, sec, nsec) \
 do { \