#define PCKT_FRAG_MAX UINT8_MAX
#define PCKT_RX_BUFS 512
-#define KNET_EPOLL_MAX_EVENTS KNET_DATAFD_MAX
+#define KNET_EPOLL_MAX_EVENTS KNET_DATAFD_MAX + 1
+
+#define KNET_INTERNAL_DATA_CHANNEL KNET_DATAFD_MAX
typedef void *knet_transport_link_t; /* per link transport handle */
typedef void *knet_transport_t; /* per knet_h transport handle */
struct knet_handle {
knet_node_id_t host_id;
unsigned int enabled:1;
- struct knet_sock sockfd[KNET_DATAFD_MAX];
+ struct knet_sock sockfd[KNET_DATAFD_MAX + 1];
int logfd;
uint8_t log_levels[KNET_MAX_SUBSYSTEMS];
int hostsockfd[2];
docallback = 1;
memset(&ev, 0, sizeof(struct epoll_event));
- if (epoll_ctl(knet_h->send_to_links_epollfd,
- EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
- log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
- knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
- } else {
- knet_h->sockfd[channel].has_error = 1;
+ if (channel != KNET_INTERNAL_DATA_CHANNEL) {
+ if (epoll_ctl(knet_h->send_to_links_epollfd,
+ EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
+ log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
+ knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
+ } else {
+ knet_h->sockfd[channel].has_error = 1;
+ }
}
+ /*
+ * TODO: add error handling for KNET_INTERNAL_DATA_CHANNEL
+ * once we add support for internal knet communication
+ */
} else {
knet_h->recv_from_sock_buf->kh_type = type;
_parse_recv_from_sock(knet_h, inlen, channel, 0);
}
- if (docallback) {
+ if ((docallback) && (channel != KNET_INTERNAL_DATA_CHANNEL)) {
knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
knet_h->sockfd[channel].sockfd[0],
channel,
for (i = 0; i < nev; i++) {
if (events[i].data.fd == knet_h->hostsockfd[0]) {
type = KNET_HEADER_TYPE_HOST_INFO;
- channel = -1;
+ channel = KNET_INTERNAL_DATA_CHANNEL;
} else {
type = KNET_HEADER_TYPE_DATA;
for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {