]> git.proxmox.com Git - ceph.git/blob - ceph/src/spdk/module/sock/posix/posix.c
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / spdk / module / sock / posix / posix.c
1 /*-
2 * BSD LICENSE
3 *
4 * Copyright (c) Intel Corporation. All rights reserved.
5 * Copyright (c) 2020 Mellanox Technologies LTD. All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 */
33
34 #include "spdk/stdinc.h"
35
36 #if defined(__linux__)
37 #include <sys/epoll.h>
38 #include <linux/errqueue.h>
39 #elif defined(__FreeBSD__)
40 #include <sys/event.h>
41 #endif
42
43 #include "spdk/log.h"
44 #include "spdk/pipe.h"
45 #include "spdk/sock.h"
46 #include "spdk/util.h"
47 #include "spdk/likely.h"
48 #include "spdk_internal/sock.h"
49
/* Size of the scratch buffer used to strip the brackets from an IPv6 literal. */
#define MAX_TMPBUF 1024
/* Size of the buffer holding the decimal port string passed to getaddrinfo(). */
#define PORTNUMLEN 32
/* Default kernel SO_RCVBUF / SO_SNDBUF sizes, also used as the lower bound
 * when a caller requests a smaller kernel buffer. */
#define MIN_SO_RCVBUF_SIZE (2 * 1024 * 1024)
#define MIN_SO_SNDBUF_SIZE (2 * 1024 * 1024)
/* Maximum number of iovecs gathered into one sendmsg() call; also the
 * queued-iovec count at which an asynchronous write flushes immediately. */
#define IOV_BATCH_SIZE 64

/* Zero-copy sends are compiled in only when the platform headers expose both
 * the SO_ZEROCOPY socket option and the MSG_ZEROCOPY send flag. */
#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
#define SPDK_ZEROCOPY
#endif
59
/* Per-socket state for the POSIX sock implementation. */
struct spdk_posix_sock {
	/* Generic sock; kept first so __posix_sock() can cast between the two. */
	struct spdk_sock base;
	/* Underlying kernel socket descriptor. */
	int fd;

	/* Number of successful sendmsg() calls; matched against MSG_ZEROCOPY
	 * completion indices reported on the error queue. */
	uint32_t sendmsg_idx;
	/* True when SO_ZEROCOPY was successfully enabled on fd. */
	bool zcopy;

	/* Optional user-space receive pipe (NULL when unused) and its buffer. */
	struct spdk_pipe *recv_pipe;
	void *recv_buf;
	/* Capacity of recv_pipe in bytes. */
	int recv_buf_sz;
	/* True while this sock is linked on its group's pending_recv list. */
	bool pending_recv;
	/* SO_PRIORITY value recorded at create/accept time (0 when unset). */
	int so_priority;

	/* Linkage for spdk_posix_sock_group_impl's pending_recv list. */
	TAILQ_ENTRY(spdk_posix_sock) link;
};
75
/* Per-poll-group state for the POSIX sock implementation. */
struct spdk_posix_sock_group_impl {
	/* Generic group; kept first so __posix_group_impl() can cast to it. */
	struct spdk_sock_group_impl base;
	/* epoll (Linux) or kqueue (FreeBSD) descriptor. */
	int fd;
	/* Socks to report from the next poll: readable, or holding unread pipe data. */
	TAILQ_HEAD(, spdk_posix_sock) pending_recv;
};
81
82 static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = {
83 .recv_buf_size = MIN_SO_RCVBUF_SIZE,
84 .send_buf_size = MIN_SO_SNDBUF_SIZE,
85 .enable_recv_pipe = true,
86 .enable_zerocopy_send = true
87 };
88
/*
 * Render the numeric address stored in @sa into @host (capacity @hlen).
 *
 * Only AF_INET and AF_INET6 are understood. Returns 0 on success and -1 when
 * @sa or @host is NULL, the family is unsupported, or inet_ntop() fails
 * (for example because @hlen is too small).
 */
static int
get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
{
	const void *addr;
	int family;

	if (sa == NULL || host == NULL) {
		return -1;
	}

	family = sa->sa_family;
	if (family == AF_INET) {
		addr = &((struct sockaddr_in *)sa)->sin_addr;
	} else if (family == AF_INET6) {
		addr = &((struct sockaddr_in6 *)sa)->sin6_addr;
	} else {
		return -1;
	}

	return inet_ntop(family, addr, host, hlen) == NULL ? -1 : 0;
}
117
/*
 * Downcasts from the generic sock / group types to the posix-specific ones.
 * Both the argument and the whole expansion are parenthesized so the macros
 * are safe inside larger expressions, e.g. __posix_sock(s)->fd.
 */
#define __posix_sock(sock) ((struct spdk_posix_sock *)(sock))
#define __posix_group_impl(group) ((struct spdk_posix_sock_group_impl *)(group))
120
121 static int
122 posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
123 char *caddr, int clen, uint16_t *cport)
124 {
125 struct spdk_posix_sock *sock = __posix_sock(_sock);
126 struct sockaddr_storage sa;
127 socklen_t salen;
128 int rc;
129
130 assert(sock != NULL);
131
132 memset(&sa, 0, sizeof sa);
133 salen = sizeof sa;
134 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
135 if (rc != 0) {
136 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
137 return -1;
138 }
139
140 switch (sa.ss_family) {
141 case AF_UNIX:
142 /* Acceptable connection types that don't have IPs */
143 return 0;
144 case AF_INET:
145 case AF_INET6:
146 /* Code below will get IP addresses */
147 break;
148 default:
149 /* Unsupported socket family */
150 return -1;
151 }
152
153 rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
154 if (rc != 0) {
155 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
156 return -1;
157 }
158
159 if (sport) {
160 if (sa.ss_family == AF_INET) {
161 *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
162 } else if (sa.ss_family == AF_INET6) {
163 *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
164 }
165 }
166
167 memset(&sa, 0, sizeof sa);
168 salen = sizeof sa;
169 rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
170 if (rc != 0) {
171 SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
172 return -1;
173 }
174
175 rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
176 if (rc != 0) {
177 SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
178 return -1;
179 }
180
181 if (cport) {
182 if (sa.ss_family == AF_INET) {
183 *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
184 } else if (sa.ss_family == AF_INET6) {
185 *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
186 }
187 }
188
189 return 0;
190 }
191
/* Selects what posix_sock_create() does with the resolved address. */
enum posix_sock_create_type {
	/* bind() and listen() on the address. */
	SPDK_SOCK_CREATE_LISTEN,
	/* connect() to the address. */
	SPDK_SOCK_CREATE_CONNECT,
};
196
197 static int
198 posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
199 {
200 uint8_t *new_buf;
201 struct spdk_pipe *new_pipe;
202 struct iovec siov[2];
203 struct iovec diov[2];
204 int sbytes;
205 ssize_t bytes;
206
207 if (sock->recv_buf_sz == sz) {
208 return 0;
209 }
210
211 /* If the new size is 0, just free the pipe */
212 if (sz == 0) {
213 spdk_pipe_destroy(sock->recv_pipe);
214 free(sock->recv_buf);
215 sock->recv_pipe = NULL;
216 sock->recv_buf = NULL;
217 return 0;
218 } else if (sz < MIN_SOCK_PIPE_SIZE) {
219 SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
220 return -1;
221 }
222
223 /* Round up to next 64 byte multiple */
224 new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
225 if (!new_buf) {
226 SPDK_ERRLOG("socket recv buf allocation failed\n");
227 return -ENOMEM;
228 }
229
230 new_pipe = spdk_pipe_create(new_buf, sz + 1);
231 if (new_pipe == NULL) {
232 SPDK_ERRLOG("socket pipe allocation failed\n");
233 free(new_buf);
234 return -ENOMEM;
235 }
236
237 if (sock->recv_pipe != NULL) {
238 /* Pull all of the data out of the old pipe */
239 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
240 if (sbytes > sz) {
241 /* Too much data to fit into the new pipe size */
242 spdk_pipe_destroy(new_pipe);
243 free(new_buf);
244 return -EINVAL;
245 }
246
247 sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
248 assert(sbytes == sz);
249
250 bytes = spdk_iovcpy(siov, 2, diov, 2);
251 spdk_pipe_writer_advance(new_pipe, bytes);
252
253 spdk_pipe_destroy(sock->recv_pipe);
254 free(sock->recv_buf);
255 }
256
257 sock->recv_buf_sz = sz;
258 sock->recv_buf = new_buf;
259 sock->recv_pipe = new_pipe;
260
261 return 0;
262 }
263
264 static int
265 posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
266 {
267 struct spdk_posix_sock *sock = __posix_sock(_sock);
268 int rc;
269
270 assert(sock != NULL);
271
272 if (g_spdk_posix_sock_impl_opts.enable_recv_pipe) {
273 rc = posix_sock_alloc_pipe(sock, sz);
274 if (rc) {
275 return rc;
276 }
277 }
278
279 /* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE */
280 if (sz < MIN_SO_RCVBUF_SIZE) {
281 sz = MIN_SO_RCVBUF_SIZE;
282 }
283
284 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
285 if (rc < 0) {
286 return rc;
287 }
288
289 return 0;
290 }
291
292 static int
293 posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
294 {
295 struct spdk_posix_sock *sock = __posix_sock(_sock);
296 int rc;
297
298 assert(sock != NULL);
299
300 if (sz < MIN_SO_SNDBUF_SIZE) {
301 sz = MIN_SO_SNDBUF_SIZE;
302 }
303
304 rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
305 if (rc < 0) {
306 return rc;
307 }
308
309 return 0;
310 }
311
312 static struct spdk_posix_sock *
313 posix_sock_alloc(int fd, bool enable_zero_copy)
314 {
315 struct spdk_posix_sock *sock;
316 #ifdef SPDK_ZEROCOPY
317 int rc;
318 int flag;
319 #endif
320
321 sock = calloc(1, sizeof(*sock));
322 if (sock == NULL) {
323 SPDK_ERRLOG("sock allocation failed\n");
324 return NULL;
325 }
326
327 sock->fd = fd;
328
329 #ifdef SPDK_ZEROCOPY
330 if (!enable_zero_copy || !g_spdk_posix_sock_impl_opts.enable_zerocopy_send) {
331 return sock;
332 }
333
334 /* Try to turn on zero copy sends */
335 flag = 1;
336 rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
337 if (rc == 0) {
338 sock->zcopy = true;
339 }
340 #endif
341
342 return sock;
343 }
344
345 static bool
346 sock_is_loopback(int fd)
347 {
348 struct ifaddrs *addrs, *tmp;
349 struct sockaddr_storage sa = {};
350 socklen_t salen;
351 struct ifreq ifr = {};
352 char ip_addr[256], ip_addr_tmp[256];
353 int rc;
354 bool is_loopback = false;
355
356 salen = sizeof(sa);
357 rc = getsockname(fd, (struct sockaddr *)&sa, &salen);
358 if (rc != 0) {
359 return is_loopback;
360 }
361
362 memset(ip_addr, 0, sizeof(ip_addr));
363 rc = get_addr_str((struct sockaddr *)&sa, ip_addr, sizeof(ip_addr));
364 if (rc != 0) {
365 return is_loopback;
366 }
367
368 getifaddrs(&addrs);
369 for (tmp = addrs; tmp != NULL; tmp = tmp->ifa_next) {
370 if (tmp->ifa_addr && (tmp->ifa_flags & IFF_UP) &&
371 (tmp->ifa_addr->sa_family == sa.ss_family)) {
372 memset(ip_addr_tmp, 0, sizeof(ip_addr_tmp));
373 rc = get_addr_str(tmp->ifa_addr, ip_addr_tmp, sizeof(ip_addr_tmp));
374 if (rc != 0) {
375 continue;
376 }
377
378 if (strncmp(ip_addr, ip_addr_tmp, sizeof(ip_addr)) == 0) {
379 memcpy(ifr.ifr_name, tmp->ifa_name, sizeof(ifr.ifr_name));
380 ioctl(fd, SIOCGIFFLAGS, &ifr);
381 if (ifr.ifr_flags & IFF_LOOPBACK) {
382 is_loopback = true;
383 }
384 goto end;
385 }
386 }
387 }
388
389 end:
390 freeifaddrs(addrs);
391 return is_loopback;
392 }
393
/*
 * Resolve @ip:@port and create a non-blocking TCP socket. The socket either
 * listens on the address (SPDK_SOCK_CREATE_LISTEN) or is connected to it
 * (SPDK_SOCK_CREATE_CONNECT).
 *
 * @ip must be a numeric IPv4 or IPv6 literal (AI_NUMERICHOST). An IPv6
 * literal may be wrapped in brackets, e.g. "[::1]". Each address returned by
 * getaddrinfo() is tried in turn until one succeeds. @opts may be NULL; a
 * non-zero opts->priority is applied with SO_PRIORITY where the platform
 * supports it.
 *
 * Returns the new sock, or NULL on failure.
 */
static struct spdk_sock *
posix_sock_create(const char *ip, int port,
		  enum posix_sock_create_type type,
		  struct spdk_sock_opts *opts)
{
	struct spdk_posix_sock *sock;
	char buf[MAX_TMPBUF];
	char portnum[PORTNUMLEN];
	char *p;
	struct addrinfo hints, *res, *res0;
	int fd, flag;
	int val = 1;
	int rc, sz;
	bool enable_zero_copy = true;

	if (ip == NULL) {
		return NULL;
	}
	if (ip[0] == '[') {
		/* Strip the brackets around an IPv6 literal. */
		snprintf(buf, sizeof(buf), "%s", ip + 1);
		p = strchr(buf, ']');
		if (p != NULL) {
			*p = '\0';
		}
		ip = (const char *) &buf[0];
	}

	snprintf(portnum, sizeof portnum, "%d", port);
	memset(&hints, 0, sizeof hints);
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;
	hints.ai_flags |= AI_PASSIVE;
	hints.ai_flags |= AI_NUMERICHOST;
	rc = getaddrinfo(ip, portnum, &hints, &res0);
	if (rc != 0) {
		/* getaddrinfo() reports the failure in its return value, not errno. */
		SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
		return NULL;
	}

	/* try listen */
	fd = -1;
	for (res = res0; res != NULL; res = res->ai_next) {
retry:
		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0) {
			/* error */
			continue;
		}

		/* Failing to size the kernel buffers is not fatal. */
		sz = g_spdk_posix_sock_impl_opts.recv_buf_size;
		(void)setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));

		sz = g_spdk_posix_sock_impl_opts.send_buf_size;
		(void)setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));

		/* Every path below that closes fd also resets it to -1. Otherwise, if
		 * the failure hit the last address, the loop would end with a closed
		 * descriptor in fd and that stale fd would be wrapped into a sock. */
		rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			continue;
		}
		rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
		if (rc != 0) {
			close(fd);
			fd = -1;
			continue;
		}

#if defined(SO_PRIORITY)
		if (opts != NULL && opts->priority) {
			rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				continue;
			}
		}
#endif

		if (res->ai_family == AF_INET6) {
			rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
			if (rc != 0) {
				close(fd);
				fd = -1;
				continue;
			}
		}

		if (type == SPDK_SOCK_CREATE_LISTEN) {
			rc = bind(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
				switch (errno) {
				case EINTR:
					/* interrupted? */
					close(fd);
					goto retry;
				case EADDRNOTAVAIL:
					SPDK_ERRLOG("IP address %s not available. "
						    "Verify IP address in config file "
						    "and make sure setup script is "
						    "run before starting spdk app.\n", ip);
				/* FALLTHROUGH */
				default:
					/* try next family */
					close(fd);
					fd = -1;
					continue;
				}
			}
			/* bind OK */
			rc = listen(fd, 512);
			if (rc != 0) {
				SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
				close(fd);
				fd = -1;
				break;
			}
		} else if (type == SPDK_SOCK_CREATE_CONNECT) {
			rc = connect(fd, res->ai_addr, res->ai_addrlen);
			if (rc != 0) {
				SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
				/* try next family */
				close(fd);
				fd = -1;
				continue;
			}
		}

		/* A failed F_GETFL returns -1; do not feed that back into F_SETFL. */
		flag = fcntl(fd, F_GETFL);
		if (flag < 0 || fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
			close(fd);
			fd = -1;
			break;
		}
		break;
	}
	freeaddrinfo(res0);

	if (fd < 0) {
		return NULL;
	}

	if (type == SPDK_SOCK_CREATE_LISTEN) {
		/* Only enable zero copy for non-loopback sockets. */
		enable_zero_copy = !sock_is_loopback(fd);
	} else if (type == SPDK_SOCK_CREATE_CONNECT) {
		/* Disable zero copy for client sockets until support is added */
		enable_zero_copy = false;
	}

	sock = posix_sock_alloc(fd, enable_zero_copy);
	if (sock == NULL) {
		SPDK_ERRLOG("sock allocation failed\n");
		close(fd);
		return NULL;
	}

	if (opts != NULL) {
		sock->so_priority = opts->priority;
	}
	return &sock->base;
}
565
566 static struct spdk_sock *
567 posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
568 {
569 return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
570 }
571
572 static struct spdk_sock *
573 posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
574 {
575 return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
576 }
577
578 static struct spdk_sock *
579 posix_sock_accept(struct spdk_sock *_sock)
580 {
581 struct spdk_posix_sock *sock = __posix_sock(_sock);
582 struct sockaddr_storage sa;
583 socklen_t salen;
584 int rc, fd;
585 struct spdk_posix_sock *new_sock;
586 int flag;
587
588 memset(&sa, 0, sizeof(sa));
589 salen = sizeof(sa);
590
591 assert(sock != NULL);
592
593 rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
594
595 if (rc == -1) {
596 return NULL;
597 }
598
599 fd = rc;
600
601 flag = fcntl(fd, F_GETFL);
602 if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
603 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
604 close(fd);
605 return NULL;
606 }
607
608 #if defined(SO_PRIORITY)
609 /* The priority is not inherited, so call this function again */
610 if (sock->base.opts.priority) {
611 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
612 if (rc != 0) {
613 close(fd);
614 return NULL;
615 }
616 }
617 #endif
618
619 /* Inherit the zero copy feature from the listen socket */
620 new_sock = posix_sock_alloc(fd, sock->zcopy);
621 if (new_sock == NULL) {
622 close(fd);
623 return NULL;
624 }
625 new_sock->so_priority = sock->base.opts.priority;
626
627 return &new_sock->base;
628 }
629
630 static int
631 posix_sock_close(struct spdk_sock *_sock)
632 {
633 struct spdk_posix_sock *sock = __posix_sock(_sock);
634
635 assert(TAILQ_EMPTY(&_sock->pending_reqs));
636
637 /* If the socket fails to close, the best choice is to
638 * leak the fd but continue to free the rest of the sock
639 * memory. */
640 close(sock->fd);
641
642 spdk_pipe_destroy(sock->recv_pipe);
643 free(sock->recv_buf);
644 free(sock);
645
646 return 0;
647 }
648
#ifdef SPDK_ZEROCOPY
/*
 * Drain the socket's error queue and complete the pending requests whose
 * MSG_ZEROCOPY sends the kernel has acknowledged.
 *
 * Each notification carries an inclusive range [ee_info, ee_data] of
 * sendmsg() call indices. _sock_flush() stores, in req->internal.offset, the
 * index of the call that finished sending a request. Every pending request
 * whose offset falls in the range is therefore completed with status 0.
 *
 * Returns 0 once the error queue is empty or an unexpected message is seen.
 * Returns a negative value, immediately, if spdk_sock_request_put() fails.
 */
static int
_sock_check_zcopy(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msgh;
	union {
		/* Present only to give the control buffer cmsghdr alignment. */
		struct cmsghdr align;
		uint8_t buf[CMSG_SPACE(sizeof(struct sock_extended_err))];
	} control;
	ssize_t rc;
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t idx, first, last;
	struct spdk_sock_request *req, *treq;
	bool found;

	while (true) {
		/* recvmsg() rewrites msg_controllen and msg_flags, so re-arm the
		 * header before every call. */
		memset(&msgh, 0, sizeof(msgh));
		msgh.msg_control = control.buf;
		msgh.msg_controllen = sizeof(control.buf);

		rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);

		if (rc < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				return 0;
			}

			if (!TAILQ_EMPTY(&sock->pending_reqs)) {
				SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
			} else {
				SPDK_WARNLOG("Recvmsg yielded an error!\n");
			}
			return 0;
		}

		/* Zero-copy notifications arrive as SOL_IP/IP_RECVERR on IPv4
		 * sockets and as SOL_IPV6/IPV6_RECVERR on IPv6 sockets. Rejecting
		 * the latter would leave IPv6 requests pending forever. */
		cm = CMSG_FIRSTHDR(&msgh);
		if (!cm ||
		    !((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
		      (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR))) {
			SPDK_WARNLOG("Unexpected cmsg level or type!\n");
			return 0;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			SPDK_WARNLOG("Unexpected extended error origin\n");
			return 0;
		}

		first = serr->ee_info;
		last = serr->ee_data;

		/* Walk the inclusive range [first, last] with wrapping uint32_t
		 * arithmetic and stop on equality. sendmsg_idx wraps, so the range
		 * can wrap too (first > last). A plain "idx <= last" loop would skip
		 * such a range, and would never terminate when last == UINT32_MAX.
		 *
		 * Most of the time, the pending_reqs array is in the exact
		 * order we need such that all of the requests to complete are
		 * in order, in the front. It is guaranteed that all requests
		 * belonging to the same sendmsg call are sequential, so once
		 * we encounter one match we can stop looping as soon as a
		 * non-match is found.
		 */
		idx = first;
		while (true) {
			found = false;
			TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
				if (req->internal.offset == idx) {
					found = true;

					rc = spdk_sock_request_put(sock, req, 0);
					if (rc < 0) {
						return rc;
					}

				} else if (found) {
					break;
				}
			}

			if (idx == last) {
				break;
			}
			idx++;
		}
	}

	return 0;
}
#endif
723
/*
 * Push queued asynchronous writes to the kernel with a single sendmsg().
 *
 * Up to IOV_BATCH_SIZE iovecs are gathered from sock->queued_reqs, skipping
 * the bytes already sent according to each request's internal.offset.
 * MSG_ZEROCOPY is used when psock->zcopy is set. Then:
 *  - each fully sent request is moved to the pending list with
 *    spdk_sock_request_pend();
 *  - without zero-copy, a fully sent request is completed at once;
 *  - with zero-copy, its internal.offset is overwritten with the index of
 *    this sendmsg() call so the error-queue handler can complete it later;
 *  - a partially sent request stays queued, with internal.offset advanced
 *    by the bytes written.
 *
 * Returns 0 when nothing was queued, when called from inside a callback,
 * on EAGAIN/EWOULDBLOCK, or after a successful send. Otherwise returns the
 * non-positive sendmsg() result.
 */
static int
_sock_flush(struct spdk_sock *sock)
{
	struct spdk_posix_sock *psock = __posix_sock(sock);
	struct msghdr msg = {};
	int flags;
	struct iovec iovs[IOV_BATCH_SIZE];
	int iovcnt;
	int retval;
	struct spdk_sock_request *req;
	int i;
	ssize_t rc;
	unsigned int offset;
	size_t len;

	/* Can't flush from within a callback or we end up with recursive calls */
	if (sock->cb_cnt > 0) {
		return 0;
	}

	/* Gather an iov */
	iovcnt = 0;
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		/* internal.offset counts bytes of this request already sent. */
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Consume any offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
			iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
			iovcnt++;

			offset = 0;

			if (iovcnt >= IOV_BATCH_SIZE) {
				break;
			}
		}

		if (iovcnt >= IOV_BATCH_SIZE) {
			break;
		}

		req = TAILQ_NEXT(req, internal.link);
	}

	if (iovcnt == 0) {
		return 0;
	}

	/* Perform the vectored write */
	msg.msg_iov = iovs;
	msg.msg_iovlen = iovcnt;
#ifdef SPDK_ZEROCOPY
	if (psock->zcopy) {
		flags = MSG_ZEROCOPY;
	} else
#endif
	{
		flags = 0;
	}
	rc = sendmsg(psock->fd, &msg, flags);
	if (rc <= 0) {
		/* Kernel buffer full: leave everything queued for a later flush. */
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;
		}
		return rc;
	}

	/* Count successful sendmsg() calls; zero-copy completions refer to them. */
	psock->sendmsg_idx++;

	/* Consume the requests that were actually written */
	req = TAILQ_FIRST(&sock->queued_reqs);
	while (req) {
		offset = req->internal.offset;

		for (i = 0; i < req->iovcnt; i++) {
			/* Advance by the offset first */
			if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
				offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
				continue;
			}

			/* Calculate the remaining length of this element */
			len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;

			if (len > (size_t)rc) {
				/* This element was partially sent. */
				req->internal.offset += rc;
				return 0;
			}

			offset = 0;
			req->internal.offset += len;
			rc -= len;
		}

		/* Handled a full request. */
		spdk_sock_request_pend(sock, req);

		if (!psock->zcopy) {
			/* The sendmsg syscall above isn't currently asynchronous,
			 * so it's already done. */
			retval = spdk_sock_request_put(sock, req, 0);
			if (retval) {
				break;
			}
		} else {
			/* Re-use the offset field to hold the sendmsg call index. The
			 * index is 0 based, so subtract one here because we've already
			 * incremented above. */
			req->internal.offset = psock->sendmsg_idx - 1;
		}

		/* All bytes written by this sendmsg() have been accounted for. */
		if (rc == 0) {
			break;
		}

		/* pend() removed req from queued_reqs, so the head is the next one. */
		req = TAILQ_FIRST(&sock->queued_reqs);
	}

	return 0;
}
852
/* Flush queued asynchronous writes; a thin wrapper over _sock_flush(). */
static int
posix_sock_flush(struct spdk_sock *_sock)
{
	int rc = _sock_flush(_sock);

	return rc;
}
858
859 static ssize_t
860 posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
861 {
862 struct iovec siov[2];
863 int sbytes;
864 ssize_t bytes;
865 struct spdk_posix_sock_group_impl *group;
866
867 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
868 if (sbytes < 0) {
869 errno = EINVAL;
870 return -1;
871 } else if (sbytes == 0) {
872 errno = EAGAIN;
873 return -1;
874 }
875
876 bytes = spdk_iovcpy(siov, 2, diov, diovcnt);
877
878 if (bytes == 0) {
879 /* The only way this happens is if diov is 0 length */
880 errno = EINVAL;
881 return -1;
882 }
883
884 spdk_pipe_reader_advance(sock->recv_pipe, bytes);
885
886 /* If we drained the pipe, take it off the level-triggered list */
887 if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
888 group = __posix_group_impl(sock->base.group_impl);
889 TAILQ_REMOVE(&group->pending_recv, sock, link);
890 sock->pending_recv = false;
891 }
892
893 return bytes;
894 }
895
896 static inline ssize_t
897 posix_sock_read(struct spdk_posix_sock *sock)
898 {
899 struct iovec iov[2];
900 int bytes;
901 struct spdk_posix_sock_group_impl *group;
902
903 bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
904
905 if (bytes > 0) {
906 bytes = readv(sock->fd, iov, 2);
907 if (bytes > 0) {
908 spdk_pipe_writer_advance(sock->recv_pipe, bytes);
909 if (sock->base.group_impl) {
910 group = __posix_group_impl(sock->base.group_impl);
911 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
912 sock->pending_recv = true;
913 }
914 }
915 }
916
917 return bytes;
918 }
919
920 static ssize_t
921 posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
922 {
923 struct spdk_posix_sock *sock = __posix_sock(_sock);
924 int rc, i;
925 size_t len;
926
927 if (sock->recv_pipe == NULL) {
928 return readv(sock->fd, iov, iovcnt);
929 }
930
931 len = 0;
932 for (i = 0; i < iovcnt; i++) {
933 len += iov[i].iov_len;
934 }
935
936 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
937 /* If the user is receiving a sufficiently large amount of data,
938 * receive directly to their buffers. */
939 if (len >= MIN_SOCK_PIPE_SIZE) {
940 return readv(sock->fd, iov, iovcnt);
941 }
942
943 /* Otherwise, do a big read into our pipe */
944 rc = posix_sock_read(sock);
945 if (rc <= 0) {
946 return rc;
947 }
948 }
949
950 return posix_sock_recv_from_pipe(sock, iov, iovcnt);
951 }
952
/* Receive up to @len bytes into @buf; a single-buffer form of posix_sock_readv(). */
static ssize_t
posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	struct iovec single = {
		.iov_base = buf,
		.iov_len = len,
	};

	return posix_sock_readv(sock, &single, 1);
}
963
964 static ssize_t
965 posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
966 {
967 struct spdk_posix_sock *sock = __posix_sock(_sock);
968 int rc;
969
970 /* In order to process a writev, we need to flush any asynchronous writes
971 * first. */
972 rc = _sock_flush(_sock);
973 if (rc < 0) {
974 return rc;
975 }
976
977 if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
978 /* We weren't able to flush all requests */
979 errno = EAGAIN;
980 return -1;
981 }
982
983 return writev(sock->fd, iov, iovcnt);
984 }
985
986 static void
987 posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
988 {
989 int rc;
990
991 spdk_sock_request_queue(sock, req);
992
993 /* If there are a sufficient number queued, just flush them out immediately. */
994 if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
995 rc = _sock_flush(sock);
996 if (rc) {
997 spdk_sock_abort_requests(sock);
998 }
999 }
1000 }
1001
1002 static int
1003 posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
1004 {
1005 struct spdk_posix_sock *sock = __posix_sock(_sock);
1006 int val;
1007 int rc;
1008
1009 assert(sock != NULL);
1010
1011 val = nbytes;
1012 rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
1013 if (rc != 0) {
1014 return -1;
1015 }
1016 return 0;
1017 }
1018
1019 static bool
1020 posix_sock_is_ipv6(struct spdk_sock *_sock)
1021 {
1022 struct spdk_posix_sock *sock = __posix_sock(_sock);
1023 struct sockaddr_storage sa;
1024 socklen_t salen;
1025 int rc;
1026
1027 assert(sock != NULL);
1028
1029 memset(&sa, 0, sizeof sa);
1030 salen = sizeof sa;
1031 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1032 if (rc != 0) {
1033 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1034 return false;
1035 }
1036
1037 return (sa.ss_family == AF_INET6);
1038 }
1039
1040 static bool
1041 posix_sock_is_ipv4(struct spdk_sock *_sock)
1042 {
1043 struct spdk_posix_sock *sock = __posix_sock(_sock);
1044 struct sockaddr_storage sa;
1045 socklen_t salen;
1046 int rc;
1047
1048 assert(sock != NULL);
1049
1050 memset(&sa, 0, sizeof sa);
1051 salen = sizeof sa;
1052 rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
1053 if (rc != 0) {
1054 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
1055 return false;
1056 }
1057
1058 return (sa.ss_family == AF_INET);
1059 }
1060
1061 static bool
1062 posix_sock_is_connected(struct spdk_sock *_sock)
1063 {
1064 struct spdk_posix_sock *sock = __posix_sock(_sock);
1065 uint8_t byte;
1066 int rc;
1067
1068 rc = recv(sock->fd, &byte, 1, MSG_PEEK);
1069 if (rc == 0) {
1070 return false;
1071 }
1072
1073 if (rc < 0) {
1074 if (errno == EAGAIN || errno == EWOULDBLOCK) {
1075 return true;
1076 }
1077
1078 return false;
1079 }
1080
1081 return true;
1082 }
1083
/*
 * Store the NAPI id of the socket's receive queue in @placement_id via
 * SO_INCOMING_NAPI_ID. Returns the getsockopt() result. Returns -1 when the
 * platform does not define SO_INCOMING_NAPI_ID.
 */
static int
posix_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id)
{
#if defined(SO_INCOMING_NAPI_ID)
	struct spdk_posix_sock *sock = __posix_sock(_sock);
	socklen_t optlen = sizeof(int);
	int rc;

	rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &optlen);
	if (rc != 0) {
		SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno);
	}
	return rc;
#else
	return -1;
#endif
}
1101
1102 static struct spdk_sock_group_impl *
1103 posix_sock_group_impl_create(void)
1104 {
1105 struct spdk_posix_sock_group_impl *group_impl;
1106 int fd;
1107
1108 #if defined(__linux__)
1109 fd = epoll_create1(0);
1110 #elif defined(__FreeBSD__)
1111 fd = kqueue();
1112 #endif
1113 if (fd == -1) {
1114 return NULL;
1115 }
1116
1117 group_impl = calloc(1, sizeof(*group_impl));
1118 if (group_impl == NULL) {
1119 SPDK_ERRLOG("group_impl allocation failed\n");
1120 close(fd);
1121 return NULL;
1122 }
1123
1124 group_impl->fd = fd;
1125 TAILQ_INIT(&group_impl->pending_recv);
1126
1127 return &group_impl->base;
1128 }
1129
1130 static int
1131 posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1132 {
1133 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1134 struct spdk_posix_sock *sock = __posix_sock(_sock);
1135 int rc;
1136
1137 #if defined(__linux__)
1138 struct epoll_event event;
1139
1140 memset(&event, 0, sizeof(event));
1141 /* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
1142 event.events = EPOLLIN | EPOLLERR;
1143 event.data.ptr = sock;
1144
1145 rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
1146 #elif defined(__FreeBSD__)
1147 struct kevent event;
1148 struct timespec ts = {0};
1149
1150 EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);
1151
1152 rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1153 #endif
1154
1155 /* switched from another polling group due to scheduling */
1156 if (spdk_unlikely(sock->recv_pipe != NULL &&
1157 (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
1158 assert(sock->pending_recv == false);
1159 sock->pending_recv = true;
1160 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
1161 }
1162
1163 return rc;
1164 }
1165
1166 static int
1167 posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
1168 {
1169 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1170 struct spdk_posix_sock *sock = __posix_sock(_sock);
1171 int rc;
1172
1173 if (sock->recv_pipe != NULL) {
1174 if (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0) {
1175 TAILQ_REMOVE(&group->pending_recv, sock, link);
1176 sock->pending_recv = false;
1177 }
1178 assert(sock->pending_recv == false);
1179 }
1180
1181 #if defined(__linux__)
1182 struct epoll_event event;
1183
1184 /* Event parameter is ignored but some old kernel version still require it. */
1185 rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
1186 #elif defined(__FreeBSD__)
1187 struct kevent event;
1188 struct timespec ts = {0};
1189
1190 EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
1191
1192 rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
1193 if (rc == 0 && event.flags & EV_ERROR) {
1194 rc = -1;
1195 errno = event.data;
1196 }
1197 #endif
1198
1199 spdk_sock_abort_requests(_sock);
1200
1201 return rc;
1202 }
1203
1204 static int
1205 posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
1206 struct spdk_sock **socks)
1207 {
1208 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1209 struct spdk_sock *sock, *tmp;
1210 int num_events, i, rc;
1211 struct spdk_posix_sock *psock, *ptmp;
1212 #if defined(__linux__)
1213 struct epoll_event events[MAX_EVENTS_PER_POLL];
1214 #elif defined(__FreeBSD__)
1215 struct kevent events[MAX_EVENTS_PER_POLL];
1216 struct timespec ts = {0};
1217 #endif
1218
1219 /* This must be a TAILQ_FOREACH_SAFE because while flushing,
1220 * a completion callback could remove the sock from the
1221 * group. */
1222 TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
1223 rc = _sock_flush(sock);
1224 if (rc) {
1225 spdk_sock_abort_requests(sock);
1226 }
1227 }
1228
1229 #if defined(__linux__)
1230 num_events = epoll_wait(group->fd, events, max_events, 0);
1231 #elif defined(__FreeBSD__)
1232 num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
1233 #endif
1234
1235 if (num_events == -1) {
1236 return -1;
1237 } else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) {
1238 uint8_t byte;
1239
1240 sock = TAILQ_FIRST(&_group->socks);
1241 psock = __posix_sock(sock);
1242 /* a recv is done here to busy poll the queue associated with
1243 * first socket in list and potentially reap incoming data.
1244 */
1245 if (psock->so_priority) {
1246 recv(psock->fd, &byte, 1, MSG_PEEK);
1247 }
1248 }
1249
1250 for (i = 0; i < num_events; i++) {
1251 #if defined(__linux__)
1252 sock = events[i].data.ptr;
1253 psock = __posix_sock(sock);
1254
1255 #ifdef SPDK_ZEROCOPY
1256 if (events[i].events & EPOLLERR) {
1257 rc = _sock_check_zcopy(sock);
1258 /* If the socket was closed or removed from
1259 * the group in response to a send ack, don't
1260 * add it to the array here. */
1261 if (rc || sock->cb_fn == NULL) {
1262 continue;
1263 }
1264 }
1265 #endif
1266 if ((events[i].events & EPOLLIN) == 0) {
1267 continue;
1268 }
1269
1270 #elif defined(__FreeBSD__)
1271 sock = events[i].udata;
1272 psock = __posix_sock(sock);
1273 #endif
1274
1275 /* If the socket does not already have recv pending, add it now */
1276 if (!psock->pending_recv) {
1277 psock->pending_recv = true;
1278 TAILQ_INSERT_TAIL(&group->pending_recv, psock, link);
1279 }
1280 }
1281
1282 num_events = 0;
1283
1284 TAILQ_FOREACH_SAFE(psock, &group->pending_recv, link, ptmp) {
1285 if (num_events == max_events) {
1286 break;
1287 }
1288
1289 socks[num_events++] = &psock->base;
1290 }
1291
1292 /* Cycle the pending_recv list so that each time we poll things aren't
1293 * in the same order. */
1294 for (i = 0; i < num_events; i++) {
1295 psock = __posix_sock(socks[i]);
1296
1297 TAILQ_REMOVE(&group->pending_recv, psock, link);
1298
1299 if (psock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(psock->recv_pipe) == 0) {
1300 psock->pending_recv = false;
1301 } else {
1302 TAILQ_INSERT_TAIL(&group->pending_recv, psock, link);
1303 }
1304
1305 }
1306
1307 return num_events;
1308 }
1309
1310 static int
1311 posix_sock_group_impl_close(struct spdk_sock_group_impl *_group)
1312 {
1313 struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
1314 int rc;
1315
1316 rc = close(group->fd);
1317 free(group);
1318 return rc;
1319 }
1320
1321 static int
1322 posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
1323 {
1324 if (!opts || !len) {
1325 errno = EINVAL;
1326 return -1;
1327 }
1328
1329 #define FIELD_OK(field) \
1330 offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len
1331
1332 #define GET_FIELD(field) \
1333 if (FIELD_OK(field)) { \
1334 opts->field = g_spdk_posix_sock_impl_opts.field; \
1335 }
1336
1337 GET_FIELD(recv_buf_size);
1338 GET_FIELD(send_buf_size);
1339 GET_FIELD(enable_recv_pipe);
1340 GET_FIELD(enable_zerocopy_send);
1341
1342 #undef GET_FIELD
1343 #undef FIELD_OK
1344
1345 *len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts));
1346 return 0;
1347 }
1348
1349 static int
1350 posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
1351 {
1352 if (!opts) {
1353 errno = EINVAL;
1354 return -1;
1355 }
1356
1357 #define FIELD_OK(field) \
1358 offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len
1359
1360 #define SET_FIELD(field) \
1361 if (FIELD_OK(field)) { \
1362 g_spdk_posix_sock_impl_opts.field = opts->field; \
1363 }
1364
1365 SET_FIELD(recv_buf_size);
1366 SET_FIELD(send_buf_size);
1367 SET_FIELD(enable_recv_pipe);
1368 SET_FIELD(enable_zerocopy_send);
1369
1370 #undef SET_FIELD
1371 #undef FIELD_OK
1372
1373 return 0;
1374 }
1375
1376
1377 static struct spdk_net_impl g_posix_net_impl = {
1378 .name = "posix",
1379 .getaddr = posix_sock_getaddr,
1380 .connect = posix_sock_connect,
1381 .listen = posix_sock_listen,
1382 .accept = posix_sock_accept,
1383 .close = posix_sock_close,
1384 .recv = posix_sock_recv,
1385 .readv = posix_sock_readv,
1386 .writev = posix_sock_writev,
1387 .writev_async = posix_sock_writev_async,
1388 .flush = posix_sock_flush,
1389 .set_recvlowat = posix_sock_set_recvlowat,
1390 .set_recvbuf = posix_sock_set_recvbuf,
1391 .set_sendbuf = posix_sock_set_sendbuf,
1392 .is_ipv6 = posix_sock_is_ipv6,
1393 .is_ipv4 = posix_sock_is_ipv4,
1394 .is_connected = posix_sock_is_connected,
1395 .get_placement_id = posix_sock_get_placement_id,
1396 .group_impl_create = posix_sock_group_impl_create,
1397 .group_impl_add_sock = posix_sock_group_impl_add_sock,
1398 .group_impl_remove_sock = posix_sock_group_impl_remove_sock,
1399 .group_impl_poll = posix_sock_group_impl_poll,
1400 .group_impl_close = posix_sock_group_impl_close,
1401 .get_opts = posix_sock_impl_get_opts,
1402 .set_opts = posix_sock_impl_set_opts,
1403 };
1404
1405 SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY);