4 * Copyright (c) Intel Corporation. All rights reserved.
5 * Copyright (c) 2020 Mellanox Technologies LTD. All rights reserved.
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 #include "spdk/stdinc.h"
36 #if defined(__linux__)
37 #include <sys/epoll.h>
38 #include <linux/errqueue.h>
39 #elif defined(__FreeBSD__)
40 #include <sys/event.h>
44 #include "spdk/pipe.h"
45 #include "spdk/sock.h"
46 #include "spdk/util.h"
47 #include "spdk/likely.h"
48 #include "spdk_internal/sock.h"
50 #define MAX_TMPBUF 1024
52 #define MIN_SO_RCVBUF_SIZE (2 * 1024 * 1024)
53 #define MIN_SO_SNDBUF_SIZE (2 * 1024 * 1024)
54 #define IOV_BATCH_SIZE 64
56 #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
/* Per-connection state wrapping the generic spdk_sock base (must stay the
 * first member so __posix_sock() downcasts are valid).
 * NOTE(review): extraction artifact — interior lines (original 62-66 and
 * 68-72, which per the users elsewhere in this file held at least the fd,
 * recv_buf/recv_buf_sz, zcopy/sendmsg_idx and pending_recv fields) plus the
 * closing brace are missing, and stray original line numbers are fused into
 * the text. Restore from pristine source before compiling. */
60 struct spdk_posix_sock
{
61 struct spdk_sock base
;
67 struct spdk_pipe
*recv_pipe
;
73 TAILQ_ENTRY(spdk_posix_sock
) link
;
/* Per-poll-group state: generic base plus the level-triggered list of
 * sockets that still have buffered data in their recv pipe.
 * NOTE(review): extraction artifact — the epoll/kqueue fd field (original
 * line 78) and the closing brace are missing, and stray original line
 * numbers are fused into the text. Restore from pristine source. */
76 struct spdk_posix_sock_group_impl
{
77 struct spdk_sock_group_impl base
;
79 TAILQ_HEAD(, spdk_posix_sock
) pending_recv
;
82 static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts
= {
83 .recv_buf_size
= MIN_SO_RCVBUF_SIZE
,
84 .send_buf_size
= MIN_SO_SNDBUF_SIZE
,
85 .enable_recv_pipe
= true,
86 .enable_zerocopy_send
= true
90 get_addr_str(struct sockaddr
*sa
, char *host
, size_t hlen
)
92 const char *result
= NULL
;
94 if (sa
== NULL
|| host
== NULL
) {
98 switch (sa
->sa_family
) {
100 result
= inet_ntop(AF_INET
, &(((struct sockaddr_in
*)sa
)->sin_addr
),
104 result
= inet_ntop(AF_INET6
, &(((struct sockaddr_in6
*)sa
)->sin6_addr
),
111 if (result
!= NULL
) {
/* Downcast helpers: spdk_posix_sock and spdk_posix_sock_group_impl embed
 * their generic base struct as the first member, so a pointer to the base is
 * also a pointer to the derived struct.  Both the argument and the whole
 * expansion are parenthesized — the original form would mis-parse in
 * contexts like `__posix_sock(s)->fd`, where member access binds tighter
 * than the cast. */
#define __posix_sock(sock) ((struct spdk_posix_sock *)(sock))
#define __posix_group_impl(group) ((struct spdk_posix_sock_group_impl *)(group))
122 posix_sock_getaddr(struct spdk_sock
*_sock
, char *saddr
, int slen
, uint16_t *sport
,
123 char *caddr
, int clen
, uint16_t *cport
)
125 struct spdk_posix_sock
*sock
= __posix_sock(_sock
);
126 struct sockaddr_storage sa
;
130 assert(sock
!= NULL
);
132 memset(&sa
, 0, sizeof sa
);
134 rc
= getsockname(sock
->fd
, (struct sockaddr
*) &sa
, &salen
);
136 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno
);
140 switch (sa
.ss_family
) {
142 /* Acceptable connection types that don't have IPs */
146 /* Code below will get IP addresses */
149 /* Unsupported socket family */
153 rc
= get_addr_str((struct sockaddr
*)&sa
, saddr
, slen
);
155 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno
);
160 if (sa
.ss_family
== AF_INET
) {
161 *sport
= ntohs(((struct sockaddr_in
*) &sa
)->sin_port
);
162 } else if (sa
.ss_family
== AF_INET6
) {
163 *sport
= ntohs(((struct sockaddr_in6
*) &sa
)->sin6_port
);
167 memset(&sa
, 0, sizeof sa
);
169 rc
= getpeername(sock
->fd
, (struct sockaddr
*) &sa
, &salen
);
171 SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno
);
175 rc
= get_addr_str((struct sockaddr
*)&sa
, caddr
, clen
);
177 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno
);
182 if (sa
.ss_family
== AF_INET
) {
183 *cport
= ntohs(((struct sockaddr_in
*) &sa
)->sin_port
);
184 } else if (sa
.ss_family
== AF_INET6
) {
185 *cport
= ntohs(((struct sockaddr_in6
*) &sa
)->sin6_port
);
/*
 * Tells posix_sock_create() whether to build a bind(2)/listen(2)-style
 * server socket or a connect(2)-style client socket.
 */
enum posix_sock_create_type {
	SPDK_SOCK_CREATE_LISTEN,
	SPDK_SOCK_CREATE_CONNECT,
};
198 posix_sock_alloc_pipe(struct spdk_posix_sock
*sock
, int sz
)
201 struct spdk_pipe
*new_pipe
;
202 struct iovec siov
[2];
203 struct iovec diov
[2];
207 if (sock
->recv_buf_sz
== sz
) {
211 /* If the new size is 0, just free the pipe */
213 spdk_pipe_destroy(sock
->recv_pipe
);
214 free(sock
->recv_buf
);
215 sock
->recv_pipe
= NULL
;
216 sock
->recv_buf
= NULL
;
218 } else if (sz
< MIN_SOCK_PIPE_SIZE
) {
219 SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE
);
223 /* Round up to next 64 byte multiple */
224 new_buf
= calloc(SPDK_ALIGN_CEIL(sz
+ 1, 64), sizeof(uint8_t));
226 SPDK_ERRLOG("socket recv buf allocation failed\n");
230 new_pipe
= spdk_pipe_create(new_buf
, sz
+ 1);
231 if (new_pipe
== NULL
) {
232 SPDK_ERRLOG("socket pipe allocation failed\n");
237 if (sock
->recv_pipe
!= NULL
) {
238 /* Pull all of the data out of the old pipe */
239 sbytes
= spdk_pipe_reader_get_buffer(sock
->recv_pipe
, sock
->recv_buf_sz
, siov
);
241 /* Too much data to fit into the new pipe size */
242 spdk_pipe_destroy(new_pipe
);
247 sbytes
= spdk_pipe_writer_get_buffer(new_pipe
, sz
, diov
);
248 assert(sbytes
== sz
);
250 bytes
= spdk_iovcpy(siov
, 2, diov
, 2);
251 spdk_pipe_writer_advance(new_pipe
, bytes
);
253 spdk_pipe_destroy(sock
->recv_pipe
);
254 free(sock
->recv_buf
);
257 sock
->recv_buf_sz
= sz
;
258 sock
->recv_buf
= new_buf
;
259 sock
->recv_pipe
= new_pipe
;
265 posix_sock_set_recvbuf(struct spdk_sock
*_sock
, int sz
)
267 struct spdk_posix_sock
*sock
= __posix_sock(_sock
);
270 assert(sock
!= NULL
);
272 if (g_spdk_posix_sock_impl_opts
.enable_recv_pipe
) {
273 rc
= posix_sock_alloc_pipe(sock
, sz
);
279 /* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE */
280 if (sz
< MIN_SO_RCVBUF_SIZE
) {
281 sz
= MIN_SO_RCVBUF_SIZE
;
284 rc
= setsockopt(sock
->fd
, SOL_SOCKET
, SO_RCVBUF
, &sz
, sizeof(sz
));
293 posix_sock_set_sendbuf(struct spdk_sock
*_sock
, int sz
)
295 struct spdk_posix_sock
*sock
= __posix_sock(_sock
);
298 assert(sock
!= NULL
);
300 if (sz
< MIN_SO_SNDBUF_SIZE
) {
301 sz
= MIN_SO_SNDBUF_SIZE
;
304 rc
= setsockopt(sock
->fd
, SOL_SOCKET
, SO_SNDBUF
, &sz
, sizeof(sz
));
312 static struct spdk_posix_sock
*
313 posix_sock_alloc(int fd
, bool enable_zero_copy
)
315 struct spdk_posix_sock
*sock
;
321 sock
= calloc(1, sizeof(*sock
));
323 SPDK_ERRLOG("sock allocation failed\n");
330 if (!enable_zero_copy
|| !g_spdk_posix_sock_impl_opts
.enable_zerocopy_send
) {
334 /* Try to turn on zero copy sends */
336 rc
= setsockopt(sock
->fd
, SOL_SOCKET
, SO_ZEROCOPY
, &flag
, sizeof(flag
));
346 sock_is_loopback(int fd
)
348 struct ifaddrs
*addrs
, *tmp
;
349 struct sockaddr_storage sa
= {};
351 struct ifreq ifr
= {};
352 char ip_addr
[256], ip_addr_tmp
[256];
354 bool is_loopback
= false;
357 rc
= getsockname(fd
, (struct sockaddr
*)&sa
, &salen
);
362 memset(ip_addr
, 0, sizeof(ip_addr
));
363 rc
= get_addr_str((struct sockaddr
*)&sa
, ip_addr
, sizeof(ip_addr
));
369 for (tmp
= addrs
; tmp
!= NULL
; tmp
= tmp
->ifa_next
) {
370 if (tmp
->ifa_addr
&& (tmp
->ifa_flags
& IFF_UP
) &&
371 (tmp
->ifa_addr
->sa_family
== sa
.ss_family
)) {
372 memset(ip_addr_tmp
, 0, sizeof(ip_addr_tmp
));
373 rc
= get_addr_str(tmp
->ifa_addr
, ip_addr_tmp
, sizeof(ip_addr_tmp
));
378 if (strncmp(ip_addr
, ip_addr_tmp
, sizeof(ip_addr
)) == 0) {
379 memcpy(ifr
.ifr_name
, tmp
->ifa_name
, sizeof(ifr
.ifr_name
));
380 ioctl(fd
, SIOCGIFFLAGS
, &ifr
);
381 if (ifr
.ifr_flags
& IFF_LOOPBACK
) {
394 static struct spdk_sock
*
395 posix_sock_create(const char *ip
, int port
,
396 enum posix_sock_create_type type
,
397 struct spdk_sock_opts
*opts
)
399 struct spdk_posix_sock
*sock
;
400 char buf
[MAX_TMPBUF
];
401 char portnum
[PORTNUMLEN
];
403 struct addrinfo hints
, *res
, *res0
;
407 bool enable_zero_copy
= true;
413 snprintf(buf
, sizeof(buf
), "%s", ip
+ 1);
414 p
= strchr(buf
, ']');
418 ip
= (const char *) &buf
[0];
421 snprintf(portnum
, sizeof portnum
, "%d", port
);
422 memset(&hints
, 0, sizeof hints
);
423 hints
.ai_family
= PF_UNSPEC
;
424 hints
.ai_socktype
= SOCK_STREAM
;
425 hints
.ai_flags
= AI_NUMERICSERV
;
426 hints
.ai_flags
|= AI_PASSIVE
;
427 hints
.ai_flags
|= AI_NUMERICHOST
;
428 rc
= getaddrinfo(ip
, portnum
, &hints
, &res0
);
430 SPDK_ERRLOG("getaddrinfo() failed (errno=%d)\n", errno
);
436 for (res
= res0
; res
!= NULL
; res
= res
->ai_next
) {
438 fd
= socket(res
->ai_family
, res
->ai_socktype
, res
->ai_protocol
);
444 sz
= g_spdk_posix_sock_impl_opts
.recv_buf_size
;
445 rc
= setsockopt(fd
, SOL_SOCKET
, SO_RCVBUF
, &sz
, sizeof(sz
));
450 sz
= g_spdk_posix_sock_impl_opts
.send_buf_size
;
451 rc
= setsockopt(fd
, SOL_SOCKET
, SO_SNDBUF
, &sz
, sizeof(sz
));
456 rc
= setsockopt(fd
, SOL_SOCKET
, SO_REUSEADDR
, &val
, sizeof val
);
462 rc
= setsockopt(fd
, IPPROTO_TCP
, TCP_NODELAY
, &val
, sizeof val
);
469 #if defined(SO_PRIORITY)
470 if (opts
!= NULL
&& opts
->priority
) {
471 rc
= setsockopt(fd
, SOL_SOCKET
, SO_PRIORITY
, &opts
->priority
, sizeof val
);
480 if (res
->ai_family
== AF_INET6
) {
481 rc
= setsockopt(fd
, IPPROTO_IPV6
, IPV6_V6ONLY
, &val
, sizeof val
);
489 if (type
== SPDK_SOCK_CREATE_LISTEN
) {
490 rc
= bind(fd
, res
->ai_addr
, res
->ai_addrlen
);
492 SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port
, errno
);
499 SPDK_ERRLOG("IP address %s not available. "
500 "Verify IP address in config file "
501 "and make sure setup script is "
502 "run before starting spdk app.\n", ip
);
505 /* try next family */
512 rc
= listen(fd
, 512);
514 SPDK_ERRLOG("listen() failed, errno = %d\n", errno
);
519 } else if (type
== SPDK_SOCK_CREATE_CONNECT
) {
520 rc
= connect(fd
, res
->ai_addr
, res
->ai_addrlen
);
522 SPDK_ERRLOG("connect() failed, errno = %d\n", errno
);
523 /* try next family */
530 flag
= fcntl(fd
, F_GETFL
);
531 if (fcntl(fd
, F_SETFL
, flag
| O_NONBLOCK
) < 0) {
532 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd
, errno
);
545 if (type
== SPDK_SOCK_CREATE_LISTEN
) {
546 /* Only enable zero copy for non-loopback sockets. */
547 enable_zero_copy
= !sock_is_loopback(fd
);
548 } else if (type
== SPDK_SOCK_CREATE_CONNECT
) {
549 /* Disable zero copy for client sockets until support is added */
550 enable_zero_copy
= false;
553 sock
= posix_sock_alloc(fd
, enable_zero_copy
);
555 SPDK_ERRLOG("sock allocation failed\n");
561 sock
->so_priority
= opts
->priority
;
566 static struct spdk_sock
*
567 posix_sock_listen(const char *ip
, int port
, struct spdk_sock_opts
*opts
)
569 return posix_sock_create(ip
, port
, SPDK_SOCK_CREATE_LISTEN
, opts
);
572 static struct spdk_sock
*
573 posix_sock_connect(const char *ip
, int port
, struct spdk_sock_opts
*opts
)
575 return posix_sock_create(ip
, port
, SPDK_SOCK_CREATE_CONNECT
, opts
);
578 static struct spdk_sock
*
579 posix_sock_accept(struct spdk_sock
*_sock
)
581 struct spdk_posix_sock
*sock
= __posix_sock(_sock
);
582 struct sockaddr_storage sa
;
585 struct spdk_posix_sock
*new_sock
;
588 memset(&sa
, 0, sizeof(sa
));
591 assert(sock
!= NULL
);
593 rc
= accept(sock
->fd
, (struct sockaddr
*)&sa
, &salen
);
601 flag
= fcntl(fd
, F_GETFL
);
602 if ((!(flag
& O_NONBLOCK
)) && (fcntl(fd
, F_SETFL
, flag
| O_NONBLOCK
) < 0)) {
603 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd
, errno
);
608 #if defined(SO_PRIORITY)
609 /* The priority is not inherited, so call this function again */
610 if (sock
->base
.opts
.priority
) {
611 rc
= setsockopt(fd
, SOL_SOCKET
, SO_PRIORITY
, &sock
->base
.opts
.priority
, sizeof(int));
619 /* Inherit the zero copy feature from the listen socket */
620 new_sock
= posix_sock_alloc(fd
, sock
->zcopy
);
621 if (new_sock
== NULL
) {
625 new_sock
->so_priority
= sock
->base
.opts
.priority
;
627 return &new_sock
->base
;
631 posix_sock_close(struct spdk_sock
*_sock
)
633 struct spdk_posix_sock
*sock
= __posix_sock(_sock
);
635 assert(TAILQ_EMPTY(&_sock
->pending_reqs
));
637 /* If the socket fails to close, the best choice is to
638 * leak the fd but continue to free the rest of the sock
642 spdk_pipe_destroy(sock
->recv_pipe
);
643 free(sock
->recv_buf
);
651 _sock_check_zcopy(struct spdk_sock
*sock
)
653 struct spdk_posix_sock
*psock
= __posix_sock(sock
);
654 struct msghdr msgh
= {};
655 uint8_t buf
[sizeof(struct cmsghdr
) + sizeof(struct sock_extended_err
)];
657 struct sock_extended_err
*serr
;
660 struct spdk_sock_request
*req
, *treq
;
663 msgh
.msg_control
= buf
;
664 msgh
.msg_controllen
= sizeof(buf
);
667 rc
= recvmsg(psock
->fd
, &msgh
, MSG_ERRQUEUE
);
670 if (errno
== EWOULDBLOCK
|| errno
== EAGAIN
) {
674 if (!TAILQ_EMPTY(&sock
->pending_reqs
)) {
675 SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
677 SPDK_WARNLOG("Recvmsg yielded an error!\n");
682 cm
= CMSG_FIRSTHDR(&msgh
);
683 if (!cm
|| cm
->cmsg_level
!= SOL_IP
|| cm
->cmsg_type
!= IP_RECVERR
) {
684 SPDK_WARNLOG("Unexpected cmsg level or type!\n");
688 serr
= (struct sock_extended_err
*)CMSG_DATA(cm
);
689 if (serr
->ee_errno
!= 0 || serr
->ee_origin
!= SO_EE_ORIGIN_ZEROCOPY
) {
690 SPDK_WARNLOG("Unexpected extended error origin\n");
694 /* Most of the time, the pending_reqs array is in the exact
695 * order we need such that all of the requests to complete are
696 * in order, in the front. It is guaranteed that all requests
697 * belonging to the same sendmsg call are sequential, so once
698 * we encounter one match we can stop looping as soon as a
699 * non-match is found.
701 for (idx
= serr
->ee_info
; idx
<= serr
->ee_data
; idx
++) {
703 TAILQ_FOREACH_SAFE(req
, &sock
->pending_reqs
, internal
.link
, treq
) {
704 if (req
->internal
.offset
== idx
) {
707 rc
= spdk_sock_request_put(sock
, req
, 0);
725 _sock_flush(struct spdk_sock
*sock
)
727 struct spdk_posix_sock
*psock
= __posix_sock(sock
);
728 struct msghdr msg
= {};
730 struct iovec iovs
[IOV_BATCH_SIZE
];
733 struct spdk_sock_request
*req
;
739 /* Can't flush from within a callback or we end up with recursive calls */
740 if (sock
->cb_cnt
> 0) {
746 req
= TAILQ_FIRST(&sock
->queued_reqs
);
748 offset
= req
->internal
.offset
;
750 for (i
= 0; i
< req
->iovcnt
; i
++) {
751 /* Consume any offset first */
752 if (offset
>= SPDK_SOCK_REQUEST_IOV(req
, i
)->iov_len
) {
753 offset
-= SPDK_SOCK_REQUEST_IOV(req
, i
)->iov_len
;
757 iovs
[iovcnt
].iov_base
= SPDK_SOCK_REQUEST_IOV(req
, i
)->iov_base
+ offset
;
758 iovs
[iovcnt
].iov_len
= SPDK_SOCK_REQUEST_IOV(req
, i
)->iov_len
- offset
;
763 if (iovcnt
>= IOV_BATCH_SIZE
) {
768 if (iovcnt
>= IOV_BATCH_SIZE
) {
772 req
= TAILQ_NEXT(req
, internal
.link
);
779 /* Perform the vectored write */
781 msg
.msg_iovlen
= iovcnt
;
784 flags
= MSG_ZEROCOPY
;
790 rc
= sendmsg(psock
->fd
, &msg
, flags
);
792 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
) {
798 psock
->sendmsg_idx
++;
800 /* Consume the requests that were actually written */
801 req
= TAILQ_FIRST(&sock
->queued_reqs
);
803 offset
= req
->internal
.offset
;
805 for (i
= 0; i
< req
->iovcnt
; i
++) {
806 /* Advance by the offset first */
807 if (offset
>= SPDK_SOCK_REQUEST_IOV(req
, i
)->iov_len
) {
808 offset
-= SPDK_SOCK_REQUEST_IOV(req
, i
)->iov_len
;
812 /* Calculate the remaining length of this element */
813 len
= SPDK_SOCK_REQUEST_IOV(req
, i
)->iov_len
- offset
;
815 if (len
> (size_t)rc
) {
816 /* This element was partially sent. */
817 req
->internal
.offset
+= rc
;
822 req
->internal
.offset
+= len
;
826 /* Handled a full request. */
827 spdk_sock_request_pend(sock
, req
);
830 /* The sendmsg syscall above isn't currently asynchronous,
831 * so it's already done. */
832 retval
= spdk_sock_request_put(sock
, req
, 0);
837 /* Re-use the offset field to hold the sendmsg call index. The
838 * index is 0 based, so subtract one here because we've already
839 * incremented above. */
840 req
->internal
.offset
= psock
->sendmsg_idx
- 1;
847 req
= TAILQ_FIRST(&sock
->queued_reqs
);
/*
 * Public flush entry point for the net_impl table: push any queued
 * asynchronous write requests to the kernel by deferring to _sock_flush().
 */
static int
posix_sock_flush(struct spdk_sock *_sock)
{
	return _sock_flush(_sock);
}
860 posix_sock_recv_from_pipe(struct spdk_posix_sock
*sock
, struct iovec
*diov
, int diovcnt
)
862 struct iovec siov
[2];
865 struct spdk_posix_sock_group_impl
*group
;
867 sbytes
= spdk_pipe_reader_get_buffer(sock
->recv_pipe
, sock
->recv_buf_sz
, siov
);
871 } else if (sbytes
== 0) {
876 bytes
= spdk_iovcpy(siov
, 2, diov
, diovcnt
);
879 /* The only way this happens is if diov is 0 length */
884 spdk_pipe_reader_advance(sock
->recv_pipe
, bytes
);
886 /* If we drained the pipe, take it off the level-triggered list */
887 if (sock
->base
.group_impl
&& spdk_pipe_reader_bytes_available(sock
->recv_pipe
) == 0) {
888 group
= __posix_group_impl(sock
->base
.group_impl
);
889 TAILQ_REMOVE(&group
->pending_recv
, sock
, link
);
890 sock
->pending_recv
= false;
896 static inline ssize_t
897 posix_sock_read(struct spdk_posix_sock
*sock
)
901 struct spdk_posix_sock_group_impl
*group
;
903 bytes
= spdk_pipe_writer_get_buffer(sock
->recv_pipe
, sock
->recv_buf_sz
, iov
);
906 bytes
= readv(sock
->fd
, iov
, 2);
908 spdk_pipe_writer_advance(sock
->recv_pipe
, bytes
);
909 if (sock
->base
.group_impl
) {
910 group
= __posix_group_impl(sock
->base
.group_impl
);
911 TAILQ_INSERT_TAIL(&group
->pending_recv
, sock
, link
);
912 sock
->pending_recv
= true;
921 posix_sock_readv(struct spdk_sock
*_sock
, struct iovec
*iov
, int iovcnt
)
923 struct spdk_posix_sock
*sock
= __posix_sock(_sock
);
927 if (sock
->recv_pipe
== NULL
) {
928 return readv(sock
->fd
, iov
, iovcnt
);
932 for (i
= 0; i
< iovcnt
; i
++) {
933 len
+= iov
[i
].iov_len
;
936 if (spdk_pipe_reader_bytes_available(sock
->recv_pipe
) == 0) {
937 /* If the user is receiving a sufficiently large amount of data,
938 * receive directly to their buffers. */
939 if (len
>= MIN_SOCK_PIPE_SIZE
) {
940 return readv(sock
->fd
, iov
, iovcnt
);
943 /* Otherwise, do a big read into our pipe */
944 rc
= posix_sock_read(sock
);
950 return posix_sock_recv_from_pipe(sock
, iov
, iovcnt
);
/*
 * Receive up to len bytes into the caller's buffer.
 *
 * Wraps the flat buffer in a single-element iovec and defers to
 * posix_sock_readv(), so the pipe-based read path is shared.
 */
static ssize_t
posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec iov[1];

	iov[0].iov_base = buf;
	iov[0].iov_len = len;

	return posix_sock_readv(sock, iov, 1);
}
965 posix_sock_writev(struct spdk_sock
*_sock
, struct iovec
*iov
, int iovcnt
)
967 struct spdk_posix_sock
*sock
= __posix_sock(_sock
);
970 /* In order to process a writev, we need to flush any asynchronous writes
972 rc
= _sock_flush(_sock
);
977 if (!TAILQ_EMPTY(&_sock
->queued_reqs
)) {
978 /* We weren't able to flush all requests */
983 return writev(sock
->fd
, iov
, iovcnt
);
987 posix_sock_writev_async(struct spdk_sock
*sock
, struct spdk_sock_request
*req
)
991 spdk_sock_request_queue(sock
, req
);
993 /* If there are a sufficient number queued, just flush them out immediately. */
994 if (sock
->queued_iovcnt
>= IOV_BATCH_SIZE
) {
995 rc
= _sock_flush(sock
);
997 spdk_sock_abort_requests(sock
);
1003 posix_sock_set_recvlowat(struct spdk_sock
*_sock
, int nbytes
)
1005 struct spdk_posix_sock
*sock
= __posix_sock(_sock
);
1009 assert(sock
!= NULL
);
1012 rc
= setsockopt(sock
->fd
, SOL_SOCKET
, SO_RCVLOWAT
, &val
, sizeof val
);
1020 posix_sock_is_ipv6(struct spdk_sock
*_sock
)
1022 struct spdk_posix_sock
*sock
= __posix_sock(_sock
);
1023 struct sockaddr_storage sa
;
1027 assert(sock
!= NULL
);
1029 memset(&sa
, 0, sizeof sa
);
1031 rc
= getsockname(sock
->fd
, (struct sockaddr
*) &sa
, &salen
);
1033 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno
);
1037 return (sa
.ss_family
== AF_INET6
);
1041 posix_sock_is_ipv4(struct spdk_sock
*_sock
)
1043 struct spdk_posix_sock
*sock
= __posix_sock(_sock
);
1044 struct sockaddr_storage sa
;
1048 assert(sock
!= NULL
);
1050 memset(&sa
, 0, sizeof sa
);
1052 rc
= getsockname(sock
->fd
, (struct sockaddr
*) &sa
, &salen
);
1054 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno
);
1058 return (sa
.ss_family
== AF_INET
);
1062 posix_sock_is_connected(struct spdk_sock
*_sock
)
1064 struct spdk_posix_sock
*sock
= __posix_sock(_sock
);
1068 rc
= recv(sock
->fd
, &byte
, 1, MSG_PEEK
);
1074 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
) {
1085 posix_sock_get_placement_id(struct spdk_sock
*_sock
, int *placement_id
)
1089 #if defined(SO_INCOMING_NAPI_ID)
1090 struct spdk_posix_sock
*sock
= __posix_sock(_sock
);
1091 socklen_t salen
= sizeof(int);
1093 rc
= getsockopt(sock
->fd
, SOL_SOCKET
, SO_INCOMING_NAPI_ID
, placement_id
, &salen
);
1095 SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno
);
1102 static struct spdk_sock_group_impl
*
1103 posix_sock_group_impl_create(void)
1105 struct spdk_posix_sock_group_impl
*group_impl
;
1108 #if defined(__linux__)
1109 fd
= epoll_create1(0);
1110 #elif defined(__FreeBSD__)
1117 group_impl
= calloc(1, sizeof(*group_impl
));
1118 if (group_impl
== NULL
) {
1119 SPDK_ERRLOG("group_impl allocation failed\n");
1124 group_impl
->fd
= fd
;
1125 TAILQ_INIT(&group_impl
->pending_recv
);
1127 return &group_impl
->base
;
1131 posix_sock_group_impl_add_sock(struct spdk_sock_group_impl
*_group
, struct spdk_sock
*_sock
)
1133 struct spdk_posix_sock_group_impl
*group
= __posix_group_impl(_group
);
1134 struct spdk_posix_sock
*sock
= __posix_sock(_sock
);
1137 #if defined(__linux__)
1138 struct epoll_event event
;
1140 memset(&event
, 0, sizeof(event
));
1141 /* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
1142 event
.events
= EPOLLIN
| EPOLLERR
;
1143 event
.data
.ptr
= sock
;
1145 rc
= epoll_ctl(group
->fd
, EPOLL_CTL_ADD
, sock
->fd
, &event
);
1146 #elif defined(__FreeBSD__)
1147 struct kevent event
;
1148 struct timespec ts
= {0};
1150 EV_SET(&event
, sock
->fd
, EVFILT_READ
, EV_ADD
, 0, 0, sock
);
1152 rc
= kevent(group
->fd
, &event
, 1, NULL
, 0, &ts
);
1155 /* switched from another polling group due to scheduling */
1156 if (spdk_unlikely(sock
->recv_pipe
!= NULL
&&
1157 (spdk_pipe_reader_bytes_available(sock
->recv_pipe
) > 0))) {
1158 assert(sock
->pending_recv
== false);
1159 sock
->pending_recv
= true;
1160 TAILQ_INSERT_TAIL(&group
->pending_recv
, sock
, link
);
1167 posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl
*_group
, struct spdk_sock
*_sock
)
1169 struct spdk_posix_sock_group_impl
*group
= __posix_group_impl(_group
);
1170 struct spdk_posix_sock
*sock
= __posix_sock(_sock
);
1173 if (sock
->recv_pipe
!= NULL
) {
1174 if (spdk_pipe_reader_bytes_available(sock
->recv_pipe
) > 0) {
1175 TAILQ_REMOVE(&group
->pending_recv
, sock
, link
);
1176 sock
->pending_recv
= false;
1178 assert(sock
->pending_recv
== false);
1181 #if defined(__linux__)
1182 struct epoll_event event
;
1184 /* Event parameter is ignored but some old kernel version still require it. */
1185 rc
= epoll_ctl(group
->fd
, EPOLL_CTL_DEL
, sock
->fd
, &event
);
1186 #elif defined(__FreeBSD__)
1187 struct kevent event
;
1188 struct timespec ts
= {0};
1190 EV_SET(&event
, sock
->fd
, EVFILT_READ
, EV_DELETE
, 0, 0, NULL
);
1192 rc
= kevent(group
->fd
, &event
, 1, NULL
, 0, &ts
);
1193 if (rc
== 0 && event
.flags
& EV_ERROR
) {
1199 spdk_sock_abort_requests(_sock
);
1205 posix_sock_group_impl_poll(struct spdk_sock_group_impl
*_group
, int max_events
,
1206 struct spdk_sock
**socks
)
1208 struct spdk_posix_sock_group_impl
*group
= __posix_group_impl(_group
);
1209 struct spdk_sock
*sock
, *tmp
;
1210 int num_events
, i
, rc
;
1211 struct spdk_posix_sock
*psock
, *ptmp
;
1212 #if defined(__linux__)
1213 struct epoll_event events
[MAX_EVENTS_PER_POLL
];
1214 #elif defined(__FreeBSD__)
1215 struct kevent events
[MAX_EVENTS_PER_POLL
];
1216 struct timespec ts
= {0};
1219 /* This must be a TAILQ_FOREACH_SAFE because while flushing,
1220 * a completion callback could remove the sock from the
1222 TAILQ_FOREACH_SAFE(sock
, &_group
->socks
, link
, tmp
) {
1223 rc
= _sock_flush(sock
);
1225 spdk_sock_abort_requests(sock
);
1229 #if defined(__linux__)
1230 num_events
= epoll_wait(group
->fd
, events
, max_events
, 0);
1231 #elif defined(__FreeBSD__)
1232 num_events
= kevent(group
->fd
, NULL
, 0, events
, max_events
, &ts
);
1235 if (num_events
== -1) {
1237 } else if (num_events
== 0 && !TAILQ_EMPTY(&_group
->socks
)) {
1240 sock
= TAILQ_FIRST(&_group
->socks
);
1241 psock
= __posix_sock(sock
);
1242 /* a recv is done here to busy poll the queue associated with
1243 * first socket in list and potentially reap incoming data.
1245 if (psock
->so_priority
) {
1246 recv(psock
->fd
, &byte
, 1, MSG_PEEK
);
1250 for (i
= 0; i
< num_events
; i
++) {
1251 #if defined(__linux__)
1252 sock
= events
[i
].data
.ptr
;
1253 psock
= __posix_sock(sock
);
1255 #ifdef SPDK_ZEROCOPY
1256 if (events
[i
].events
& EPOLLERR
) {
1257 rc
= _sock_check_zcopy(sock
);
1258 /* If the socket was closed or removed from
1259 * the group in response to a send ack, don't
1260 * add it to the array here. */
1261 if (rc
|| sock
->cb_fn
== NULL
) {
1266 if ((events
[i
].events
& EPOLLIN
) == 0) {
1270 #elif defined(__FreeBSD__)
1271 sock
= events
[i
].udata
;
1272 psock
= __posix_sock(sock
);
1275 /* If the socket does not already have recv pending, add it now */
1276 if (!psock
->pending_recv
) {
1277 psock
->pending_recv
= true;
1278 TAILQ_INSERT_TAIL(&group
->pending_recv
, psock
, link
);
1284 TAILQ_FOREACH_SAFE(psock
, &group
->pending_recv
, link
, ptmp
) {
1285 if (num_events
== max_events
) {
1289 socks
[num_events
++] = &psock
->base
;
1292 /* Cycle the pending_recv list so that each time we poll things aren't
1293 * in the same order. */
1294 for (i
= 0; i
< num_events
; i
++) {
1295 psock
= __posix_sock(socks
[i
]);
1297 TAILQ_REMOVE(&group
->pending_recv
, psock
, link
);
1299 if (psock
->recv_pipe
== NULL
|| spdk_pipe_reader_bytes_available(psock
->recv_pipe
) == 0) {
1300 psock
->pending_recv
= false;
1302 TAILQ_INSERT_TAIL(&group
->pending_recv
, psock
, link
);
1311 posix_sock_group_impl_close(struct spdk_sock_group_impl
*_group
)
1313 struct spdk_posix_sock_group_impl
*group
= __posix_group_impl(_group
);
1316 rc
= close(group
->fd
);
1322 posix_sock_impl_get_opts(struct spdk_sock_impl_opts
*opts
, size_t *len
)
1324 if (!opts
|| !len
) {
1329 #define FIELD_OK(field) \
1330 offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len
1332 #define GET_FIELD(field) \
1333 if (FIELD_OK(field)) { \
1334 opts->field = g_spdk_posix_sock_impl_opts.field; \
1337 GET_FIELD(recv_buf_size
);
1338 GET_FIELD(send_buf_size
);
1339 GET_FIELD(enable_recv_pipe
);
1340 GET_FIELD(enable_zerocopy_send
);
1345 *len
= spdk_min(*len
, sizeof(g_spdk_posix_sock_impl_opts
));
1350 posix_sock_impl_set_opts(const struct spdk_sock_impl_opts
*opts
, size_t len
)
1357 #define FIELD_OK(field) \
1358 offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len
1360 #define SET_FIELD(field) \
1361 if (FIELD_OK(field)) { \
1362 g_spdk_posix_sock_impl_opts.field = opts->field; \
1365 SET_FIELD(recv_buf_size
);
1366 SET_FIELD(send_buf_size
);
1367 SET_FIELD(enable_recv_pipe
);
1368 SET_FIELD(enable_zerocopy_send
);
1377 static struct spdk_net_impl g_posix_net_impl
= {
1379 .getaddr
= posix_sock_getaddr
,
1380 .connect
= posix_sock_connect
,
1381 .listen
= posix_sock_listen
,
1382 .accept
= posix_sock_accept
,
1383 .close
= posix_sock_close
,
1384 .recv
= posix_sock_recv
,
1385 .readv
= posix_sock_readv
,
1386 .writev
= posix_sock_writev
,
1387 .writev_async
= posix_sock_writev_async
,
1388 .flush
= posix_sock_flush
,
1389 .set_recvlowat
= posix_sock_set_recvlowat
,
1390 .set_recvbuf
= posix_sock_set_recvbuf
,
1391 .set_sendbuf
= posix_sock_set_sendbuf
,
1392 .is_ipv6
= posix_sock_is_ipv6
,
1393 .is_ipv4
= posix_sock_is_ipv4
,
1394 .is_connected
= posix_sock_is_connected
,
1395 .get_placement_id
= posix_sock_get_placement_id
,
1396 .group_impl_create
= posix_sock_group_impl_create
,
1397 .group_impl_add_sock
= posix_sock_group_impl_add_sock
,
1398 .group_impl_remove_sock
= posix_sock_group_impl_remove_sock
,
1399 .group_impl_poll
= posix_sock_group_impl_poll
,
1400 .group_impl_close
= posix_sock_group_impl_close
,
1401 .get_opts
= posix_sock_impl_get_opts
,
1402 .set_opts
= posix_sock_impl_set_opts
,
1405 SPDK_NET_IMPL_REGISTER(posix
, &g_posix_net_impl
, DEFAULT_SOCK_PRIORITY
);