{
int savederrno = 0;
- if (_init_socketpair(knet_h, knet_h->hostsockfd)) {
- savederrno = errno;
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal hostsockpair: %s",
- strerror(savederrno));
- goto exit_fail;
- }
-
if (_init_socketpair(knet_h, knet_h->dstsockfd)) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal dstsockpair: %s",
static void _close_socks(knet_handle_t knet_h)
{
_close_socketpair(knet_h, knet_h->dstsockfd);
- _close_socketpair(knet_h, knet_h->hostsockfd);
}
static int _init_buffers(knet_handle_t knet_h)
goto exit_fail;
}
- memset(&ev, 0, sizeof(struct epoll_event));
- ev.events = EPOLLIN;
- ev.data.fd = knet_h->hostsockfd[0];
-
- if (epoll_ctl(knet_h->send_to_links_epollfd,
- EPOLL_CTL_ADD, knet_h->hostsockfd[0], &ev)) {
- savederrno = errno;
- log_err(knet_h, KNET_SUB_HANDLE, "Unable to add hostsockfd[0] to epoll pool: %s",
- strerror(savederrno));
- goto exit_fail;
- }
-
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = knet_h->dstsockfd[0];
}
}
- epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->hostsockfd[0], &ev);
epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_DEL, knet_h->dstsockfd[0], &ev);
close(knet_h->send_to_links_epollfd);
close(knet_h->recv_from_links_epollfd);
return 0;
}
-int _send_host_info(knet_handle_t knet_h, const void *data, const size_t datalen)
-{
- ssize_t ret = 0;
-
- if (knet_h->fini_in_progress) {
- return 0;
- }
-
- ret = sendto(knet_h->hostsockfd[1], data, datalen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
- if (ret < 0) {
- log_debug(knet_h, KNET_SUB_HOST, "Unable to write data to hostpipe. Error: %s", strerror(errno));
- return -1;
- }
- if ((size_t)ret != datalen) {
- log_debug(knet_h, KNET_SUB_HOST, "Unable to write all data to hostpipe. Expected: %zu, Written: %zd.", datalen, ret);
- return -1;
- }
-
- return 0;
-}
-
static void _clear_cbuffers(struct knet_host *host, seq_num_t rx_seq_num)
{
int i;
int _seq_num_lookup(struct knet_host *host, seq_num_t seq_num, int defrag_buf, int clear_buf);
void _seq_num_set(struct knet_host *host, seq_num_t seq_num, int defrag_buf);
-int _send_host_info(knet_handle_t knet_h, const void *data, const size_t datalen);
int _host_dstcache_update_async(knet_handle_t knet_h, struct knet_host *host);
int _host_dstcache_update_sync(knet_handle_t knet_h, struct knet_host *host);
#define KNET_EPOLL_MAX_EVENTS KNET_DATAFD_MAX + 1
-#define KNET_INTERNAL_DATA_CHANNEL KNET_DATAFD_MAX
-
/*
* Size of threads stack. Value is choosen by experimenting, how much is needed
* to sucesfully finish test suite, and at the time of writing patch it was
struct knet_sock sockfd[KNET_DATAFD_MAX + 1];
int logfd;
uint8_t log_levels[KNET_MAX_SUBSYSTEMS];
- int hostsockfd[2];
int dstsockfd[2];
int send_to_links_epollfd;
int recv_from_links_epollfd;
#include "libknet.h"
-#if 0
-
-/*
- * for future protocol extension (re-switching table calculation)
- */
-
-struct knet_hinfo_link {
- uint8_t khl_link_id;
- uint8_t khl_link_dynamic;
- uint8_t khl_link_priority;
- uint64_t khl_link_latency;
- char khl_link_dst_ipaddr[KNET_MAX_HOST_LEN];
- char khl_link_dst_port[KNET_MAX_PORT_LEN];
-} __attribute__((packed));
-
-struct knet_hinfo_link_table {
- knet_node_id_t khlt_node_id;
- uint8_t khlt_local; /* we have this node connected locally */
- struct knet_hinfo_link khlt_link[KNET_MAX_LINK]; /* info we send about each link in the node */
-} __attribute__((packed));
-
-struct link_table {
- knet_node_id_t khdt_host_entries;
- uint8_t khdt_host_maps[0]; /* array of knet_hinfo_link_table[khdt_host_entries] */
-} __attribute__((packed));
-#endif
-
-#define KNET_HOSTINFO_LINK_STATUS_DOWN 0
-#define KNET_HOSTINFO_LINK_STATUS_UP 1
-
-struct knet_hostinfo_payload_link_status {
- uint8_t khip_link_status_link_id; /* link id */
- uint8_t khip_link_status_status; /* up/down status */
-} __attribute__((packed));
-
-/*
- * union to reference possible individual payloads
- */
-
-union knet_hostinfo_payload {
- struct knet_hostinfo_payload_link_status knet_hostinfo_payload_link_status;
-} __attribute__((packed));
-
-/*
- * due to the nature of knet_hostinfo, we are currently
- * sending those data as part of knet_header_payload_data.khp_data_userdata
- * and avoid a union that increses knet_header_payload_data size
- * unnecessarely.
- * This might change later on depending on how we implement
- * host info exchange
- */
-
-#define KNET_HOSTINFO_TYPE_LINK_UP_DOWN 0 // UNUSED
-#define KNET_HOSTINFO_TYPE_LINK_TABLE 1 // NOT IMPLEMENTED
-
-#define KNET_HOSTINFO_UCAST 0 /* send info to a specific host */
-#define KNET_HOSTINFO_BCAST 1 /* send info to all known / connected hosts */
-
-struct knet_hostinfo {
- uint8_t khi_type; /* type of hostinfo we are sending */
- uint8_t khi_bcast; /* hostinfo destination bcast/ucast */
- knet_node_id_t khi_dst_node_id;/* used only if in ucast mode */
- union knet_hostinfo_payload khi_payload;
-} __attribute__((packed));
-
-#define KNET_HOSTINFO_ALL_SIZE sizeof(struct knet_hostinfo)
-#define KNET_HOSTINFO_SIZE (KNET_HOSTINFO_ALL_SIZE - sizeof(union knet_hostinfo_payload))
-#define KNET_HOSTINFO_LINK_STATUS_SIZE (KNET_HOSTINFO_SIZE + sizeof(struct knet_hostinfo_payload_link_status))
-
-#define khip_link_status_status khi_payload.knet_hostinfo_payload_link_status.khip_link_status_status
-#define khip_link_status_link_id khi_payload.knet_hostinfo_payload_link_status.khip_link_status_link_id
-
/*
* typedef uint64_t seq_num_t;
* #define SEQ_MAX UINT64_MAX
#define KNET_HEADER_VERSION 0x01 /* we currently support only one version */
#define KNET_HEADER_TYPE_DATA 0x00 /* pure data packet */
-#define KNET_HEADER_TYPE_HOST_INFO 0x01 /* host status information pckt */
#define KNET_HEADER_TYPE_PMSK 0x80 /* packet mask */
#define KNET_HEADER_TYPE_PING 0x81 /* heartbeat */
printf("KNET_HEADER_PING_SIZE: %zu (%zu)\n", KNET_HEADER_PING_SIZE, sizeof(struct knet_header_payload_ping));
printf("KNET_HEADER_PMTUD_SIZE: %zu (%zu)\n", KNET_HEADER_PMTUD_SIZE, sizeof(struct knet_header_payload_pmtud));
printf("KNET_HEADER_DATA_SIZE: %zu (%zu)\n", KNET_HEADER_DATA_SIZE, sizeof(struct knet_header_payload_data));
- printf("\n");
- printf("KNET_HOSTINFO_ALL_SIZE: %zu\n", KNET_HOSTINFO_ALL_SIZE);
- printf("KNET_HOSTINFO_SIZE: %zu\n", KNET_HOSTINFO_SIZE);
- printf("KNET_HOSTINFO_LINK_STATUS_SIZE: %zu (%zu)\n", KNET_HOSTINFO_LINK_STATUS_SIZE, sizeof(struct knet_hostinfo_payload_link_status));
return 0;
}
struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base;
ssize_t len = msg->msg_len;
- struct knet_hostinfo *knet_hostinfo;
struct iovec iov_out[1];
int8_t channel;
seq_num_t recv_seq_num;
}
switch (inbuf->kh_type) {
- case KNET_HEADER_TYPE_HOST_INFO:
case KNET_HEADER_TYPE_DATA:
/* data stats at the top for consistency with TX */
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
- if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
+ if (knet_h->enabled != 1) /* data forward is disabled */
+ break;
- if (knet_h->enabled != 1) /* data forward is disabled */
- break;
+ if (knet_h->dst_host_filter_fn) {
+ size_t host_idx;
+ int found = 0;
+
+ bcast = knet_h->dst_host_filter_fn(
+ knet_h->dst_host_filter_fn_private_data,
+ (const unsigned char *)inbuf->khp_data_userdata,
+ len - KNET_HEADER_DATA_SIZE,
+ KNET_NOTIFY_RX,
+ knet_h->host_id,
+ inbuf->kh_node,
+ &channel,
+ dst_host_ids,
+ &dst_host_ids_entries);
+ if (bcast < 0) {
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
+ log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
+ return;
+ }
- if (knet_h->dst_host_filter_fn) {
- size_t host_idx;
- int found = 0;
-
- bcast = knet_h->dst_host_filter_fn(
- knet_h->dst_host_filter_fn_private_data,
- (const unsigned char *)inbuf->khp_data_userdata,
- len - KNET_HEADER_DATA_SIZE,
- KNET_NOTIFY_RX,
- knet_h->host_id,
- inbuf->kh_node,
- &channel,
- dst_host_ids,
- &dst_host_ids_entries);
- if (bcast < 0) {
- pthread_mutex_unlock(&src_link->link_stats_mutex);
- log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
- return;
- }
+ if ((!bcast) && (!dst_host_ids_entries)) {
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
+ log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
+ return;
+ }
- if ((!bcast) && (!dst_host_ids_entries)) {
+ /* check if we are dst for this packet */
+ if (!bcast) {
+ if (dst_host_ids_entries > KNET_MAX_HOST) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
- log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
+ log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations");
return;
}
-
- /* check if we are dst for this packet */
- if (!bcast) {
- if (dst_host_ids_entries > KNET_MAX_HOST) {
- pthread_mutex_unlock(&src_link->link_stats_mutex);
- log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations");
- return;
- }
- for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
- if (dst_host_ids[host_idx] == knet_h->host_id) {
- found = 1;
- break;
- }
- }
- if (!found) {
- pthread_mutex_unlock(&src_link->link_stats_mutex);
- log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
- return;
+ for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
+ if (dst_host_ids[host_idx] == knet_h->host_id) {
+ found = 1;
+ break;
}
}
+ if (!found) {
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
+ log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
+ return;
+ }
}
}
- if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
- if (!knet_h->sockfd[channel].in_use) {
- pthread_mutex_unlock(&src_link->link_stats_mutex);
- log_debug(knet_h, KNET_SUB_RX,
- "received packet for channel %d but there is no local sock connected",
- channel);
- return;
- }
+ if (!knet_h->sockfd[channel].in_use) {
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
+ log_debug(knet_h, KNET_SUB_RX,
+ "received packet for channel %d but there is no local sock connected",
+ channel);
+ return;
+ }
- outlen = 0;
- memset(iov_out, 0, sizeof(iov_out));
+ outlen = 0;
+ memset(iov_out, 0, sizeof(iov_out));
retry:
- iov_out[0].iov_base = (void *) inbuf->khp_data_userdata + outlen;
- iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_SIZE);
-
- outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
- if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) {
- log_debug(knet_h, KNET_SUB_RX,
- "Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n",
- iov_out[0].iov_len, outlen);
- goto retry;
- }
+ iov_out[0].iov_base = (void *) inbuf->khp_data_userdata + outlen;
+ iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_SIZE);
- if (outlen <= 0) {
- knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
- knet_h->sockfd[channel].sockfd[0],
- channel,
- KNET_NOTIFY_RX,
- outlen,
- errno);
- pthread_mutex_unlock(&src_link->link_stats_mutex);
- return;
- }
- if ((size_t)outlen == iov_out[0].iov_len) {
- _seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
- }
- } else { /* HOSTINFO */
- knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
- if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
- knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id);
- }
- if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
- pthread_mutex_unlock(&src_link->link_stats_mutex);
- return;
- }
+ outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
+ if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) {
+ log_debug(knet_h, KNET_SUB_RX,
+ "Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n",
+ iov_out[0].iov_len, outlen);
+ goto retry;
+ }
+
+ if (outlen <= 0) {
+ knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
+ knet_h->sockfd[channel].sockfd[0],
+ channel,
+ KNET_NOTIFY_RX,
+ outlen,
+ errno);
+ pthread_mutex_unlock(&src_link->link_stats_mutex);
+ return;
+ }
+ if ((size_t)outlen == iov_out[0].iov_len) {
_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
- switch(knet_hostinfo->khi_type) {
- case KNET_HOSTINFO_TYPE_LINK_UP_DOWN:
- break;
- case KNET_HOSTINFO_TYPE_LINK_TABLE:
- break;
- default:
- log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id);
- break;
- }
}
break;
case KNET_HEADER_TYPE_PING:
knet_node_id_t dst_host_ids[KNET_MAX_HOST];
size_t dst_host_ids_entries = 0;
int bcast = 1;
- struct knet_hostinfo *knet_hostinfo;
struct iovec iov_out[PCKT_FRAG_MAX][2];
int iovcnt_out = 2;
uint8_t frag_idx;
inbuf = knet_h->recv_from_sock_buf;
- if ((knet_h->enabled != 1) &&
- (inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */
+ if (knet_h->enabled != 1) {
log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
savederrno = ECANCELED;
err = -1;
}
}
break;
- case KNET_HEADER_TYPE_HOST_INFO:
- knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
- if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
- bcast = 0;
- dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id;
- dst_host_ids_entries_temp = 1;
- knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id);
- }
- break;
default:
log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket");
savederrno = ENOMSG;
docallback = 1;
memset(&ev, 0, sizeof(struct epoll_event));
- if (channel != KNET_INTERNAL_DATA_CHANNEL) {
- if (epoll_ctl(knet_h->send_to_links_epollfd,
- EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
- log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
- knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
- } else {
- knet_h->sockfd[channel].has_error = 1;
- }
+ if (epoll_ctl(knet_h->send_to_links_epollfd,
+ EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
+ log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
+ knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
+ } else {
+ knet_h->sockfd[channel].has_error = 1;
}
- /*
- * TODO: add error handling for KNET_INTERNAL_DATA_CHANNEL
- * once we add support for internal knet communication
- */
} else {
knet_h->recv_from_sock_buf->kh_type = type;
_parse_recv_from_sock(knet_h, inlen, channel, 0);
}
- if ((docallback) && (channel != KNET_INTERNAL_DATA_CHANNEL)) {
+ if (docallback) {
knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
knet_h->sockfd[channel].sockfd[0],
channel,
}
for (i = 0; i < nev; i++) {
- if (events[i].data.fd == knet_h->hostsockfd[0]) {
- type = KNET_HEADER_TYPE_HOST_INFO;
- channel = KNET_INTERNAL_DATA_CHANNEL;
- } else {
- type = KNET_HEADER_TYPE_DATA;
- for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
- if ((knet_h->sockfd[channel].in_use) &&
- (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
- break;
- }
- }
- if (channel >= KNET_DATAFD_MAX) {
- log_debug(knet_h, KNET_SUB_TX, "No available channels");
- continue; /* channel not found */
+ type = KNET_HEADER_TYPE_DATA;
+ for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
+ if ((knet_h->sockfd[channel].in_use) &&
+ (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
+ break;
}
}
+ if (channel >= KNET_DATAFD_MAX) {
+ log_debug(knet_h, KNET_SUB_TX, "No available channels");
+ continue; /* channel not found */
+ }
if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) {
log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
continue;