]> git.proxmox.com Git - ceph.git/blob - ceph/src/spdk/lib/vhost/rte_vhost/socket.c
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / spdk / lib / vhost / rte_vhost / socket.c
1 /*-
2 * BSD LICENSE
3 *
4 * Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 */
33
34 #include <stdint.h>
35 #include <stdio.h>
36 #include <stdbool.h>
37 #include <limits.h>
38 #include <stdlib.h>
39 #include <unistd.h>
40 #include <string.h>
41 #include <sys/types.h>
42 #include <sys/socket.h>
43 #include <sys/un.h>
44 #include <sys/queue.h>
45 #include <errno.h>
46 #include <fcntl.h>
47 #include <pthread.h>
48
49 #include <rte_log.h>
50
51 #include "fd_man.h"
52 #include "vhost.h"
53 #include "vhost_user.h"
54
/*
 * Every time rte_vhost_driver_register() is invoked, an associated
 * vhost_user_socket struct will be created.
 */
struct vhost_user_socket {
	char *path;             /* heap-allocated copy of the socket path */
	int listenfd;           /* listening fd; assigned in server mode only */
	int connfd;             /* fd of the current connection, -1 when none */
	bool is_server;         /* true: bind and listen; false: connect as client */
	bool reconnect;         /* client mode: retry the connection when it fails */
	bool dequeue_zero_copy; /* enable dequeue zero copy on each new device */
};
67
/*
 * Context registered in the fdset together with a connected fd; it is
 * handed back to vhost_user_read_cb() as the callback argument.
 */
struct vhost_user_connection {
	struct vhost_user_socket *vsocket; /* socket this connection belongs to */
	int vid;                           /* id returned by vhost_new_device() */
};
72
/* Upper bound on the number of sockets that may be registered at once. */
#define MAX_VHOST_SOCKET 1024
/* File-wide vhost-user bookkeeping. */
struct vhost_user {
	/* Registered sockets; entries [0, vsocket_cnt) are in use. */
	struct vhost_user_socket *vsockets[MAX_VHOST_SOCKET];
	/* Set of fds served by fdset_event_dispatch(). */
	struct fdset fdset;
	/* Number of valid entries in vsockets[]. */
	int vsocket_cnt;
	/* Taken by register/unregister while they touch vsockets[]. */
	pthread_mutex_t mutex;
};
80
/* Backlog value passed to listen() on the server socket. */
#define MAX_VIRTIO_BACKLOG 128

/*
 * Declared up front so the fdset callbacks and the client-creation
 * routine can reference each other regardless of definition order.
 */
static void vhost_user_server_new_connection(int fd, void *data, int *remove);
static void vhost_user_read_cb(int fd, void *dat, int *remove);
static int vhost_user_create_client(struct vhost_user_socket *vsocket);
86
/*
 * The single instance of the vhost-user state used by every function in
 * this file.  Both mutexes are statically initialised, so no runtime
 * setup call is needed.
 */
static struct vhost_user vhost_user = {
	.fdset = {
		/*
		 * GNU range designator: every slot gets the same initial
		 * entry.  NOTE(review): the leading -1 is presumably the fd
		 * member, i.e. "slot unused" -- confirm against fd_man.h.
		 */
		.fd = { [0 ... MAX_FDS - 1] = {-1, NULL, NULL, NULL, 0} },
		.fd_mutex = PTHREAD_MUTEX_INITIALIZER,
		.num = 0
	},
	.vsocket_cnt = 0,
	.mutex = PTHREAD_MUTEX_INITIALIZER,
};
96
97 /* return bytes# of read on success or negative val on failure. */
98 int
99 read_fd_message(int sockfd, char *buf, int buflen, int *fds, int fd_num)
100 {
101 struct iovec iov;
102 struct msghdr msgh;
103 size_t fdsize = fd_num * sizeof(int);
104 char control[CMSG_SPACE(fdsize)];
105 struct cmsghdr *cmsg;
106 int ret;
107
108 memset(&msgh, 0, sizeof(msgh));
109 iov.iov_base = buf;
110 iov.iov_len = buflen;
111
112 msgh.msg_iov = &iov;
113 msgh.msg_iovlen = 1;
114 msgh.msg_control = control;
115 msgh.msg_controllen = sizeof(control);
116
117 ret = recvmsg(sockfd, &msgh, 0);
118 if (ret <= 0) {
119 RTE_LOG(ERR, VHOST_CONFIG, "recvmsg failed\n");
120 return ret;
121 }
122
123 if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) {
124 RTE_LOG(ERR, VHOST_CONFIG, "truncted msg\n");
125 return -1;
126 }
127
128 for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
129 cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
130 if ((cmsg->cmsg_level == SOL_SOCKET) &&
131 (cmsg->cmsg_type == SCM_RIGHTS)) {
132 memcpy(fds, CMSG_DATA(cmsg), fdsize);
133 break;
134 }
135 }
136
137 return ret;
138 }
139
140 int
141 send_fd_message(int sockfd, char *buf, int buflen, int *fds, int fd_num)
142 {
143
144 struct iovec iov;
145 struct msghdr msgh;
146 size_t fdsize = fd_num * sizeof(int);
147 char control[CMSG_SPACE(fdsize)];
148 struct cmsghdr *cmsg;
149 int ret;
150
151 memset(&msgh, 0, sizeof(msgh));
152 iov.iov_base = buf;
153 iov.iov_len = buflen;
154
155 msgh.msg_iov = &iov;
156 msgh.msg_iovlen = 1;
157
158 if (fds && fd_num > 0) {
159 msgh.msg_control = control;
160 msgh.msg_controllen = sizeof(control);
161 cmsg = CMSG_FIRSTHDR(&msgh);
162 if (cmsg == NULL) {
163 RTE_LOG(ERR, VHOST_CONFIG, "null cmsg\n");
164 return -1;
165 }
166 cmsg->cmsg_len = CMSG_LEN(fdsize);
167 cmsg->cmsg_level = SOL_SOCKET;
168 cmsg->cmsg_type = SCM_RIGHTS;
169 memcpy(CMSG_DATA(cmsg), fds, fdsize);
170 } else {
171 msgh.msg_control = NULL;
172 msgh.msg_controllen = 0;
173 }
174
175 do {
176 ret = sendmsg(sockfd, &msgh, 0);
177 } while (ret < 0 && errno == EINTR);
178
179 if (ret < 0) {
180 RTE_LOG(ERR, VHOST_CONFIG, "sendmsg error\n");
181 return ret;
182 }
183
184 return ret;
185 }
186
187 static void
188 vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket)
189 {
190 int vid;
191 size_t size;
192 struct vhost_user_connection *conn;
193 int ret;
194
195 conn = malloc(sizeof(*conn));
196 if (conn == NULL) {
197 close(fd);
198 return;
199 }
200
201 vid = vhost_new_device();
202 if (vid == -1) {
203 close(fd);
204 free(conn);
205 return;
206 }
207
208 size = strnlen(vsocket->path, PATH_MAX);
209 vhost_set_ifname(vid, vsocket->path, size);
210
211 if (vsocket->dequeue_zero_copy)
212 vhost_enable_dequeue_zero_copy(vid);
213
214 RTE_LOG(INFO, VHOST_CONFIG, "new device, handle is %d\n", vid);
215
216 vsocket->connfd = fd;
217 conn->vsocket = vsocket;
218 conn->vid = vid;
219 ret = fdset_add(&vhost_user.fdset, fd, vhost_user_read_cb,
220 NULL, conn);
221 if (ret < 0) {
222 vsocket->connfd = -1;
223 free(conn);
224 close(fd);
225 RTE_LOG(ERR, VHOST_CONFIG,
226 "failed to add fd %d into vhost server fdset\n",
227 fd);
228 }
229 }
230
231 /* call back when there is new vhost-user connection from client */
232 static void
233 vhost_user_server_new_connection(int fd, void *dat, int *remove __rte_unused)
234 {
235 struct vhost_user_socket *vsocket = dat;
236
237 fd = accept(fd, NULL, NULL);
238 if (fd < 0)
239 return;
240
241 RTE_LOG(INFO, VHOST_CONFIG, "new vhost user connection is %d\n", fd);
242 vhost_user_add_connection(fd, vsocket);
243 }
244
245 static void
246 vhost_user_read_cb(int connfd, void *dat, int *remove)
247 {
248 struct vhost_user_connection *conn = dat;
249 struct vhost_user_socket *vsocket = conn->vsocket;
250 int ret;
251
252 ret = vhost_user_msg_handler(conn->vid, connfd);
253 if (ret < 0) {
254 vsocket->connfd = -1;
255 close(connfd);
256 *remove = 1;
257 vhost_destroy_device(conn->vid);
258 free(conn);
259
260 if (vsocket->reconnect)
261 vhost_user_create_client(vsocket);
262 }
263 }
264
265 static int
266 create_unix_socket(const char *path, struct sockaddr_un *un, bool is_server)
267 {
268 int fd;
269
270 fd = socket(AF_UNIX, SOCK_STREAM, 0);
271 if (fd < 0)
272 return -1;
273 RTE_LOG(INFO, VHOST_CONFIG, "vhost-user %s: socket created, fd: %d\n",
274 is_server ? "server" : "client", fd);
275
276 if (!is_server && fcntl(fd, F_SETFL, O_NONBLOCK)) {
277 RTE_LOG(ERR, VHOST_CONFIG,
278 "vhost-user: can't set nonblocking mode for socket, fd: "
279 "%d (%s)\n", fd, strerror(errno));
280 close(fd);
281 return -1;
282 }
283
284 memset(un, 0, sizeof(*un));
285 un->sun_family = AF_UNIX;
286 strncpy(un->sun_path, path, sizeof(un->sun_path));
287 un->sun_path[sizeof(un->sun_path) - 1] = '\0';
288
289 return fd;
290 }
291
292 static int
293 vhost_user_create_server(struct vhost_user_socket *vsocket)
294 {
295 int fd;
296 int ret;
297 struct sockaddr_un un;
298 const char *path = vsocket->path;
299
300 fd = create_unix_socket(path, &un, vsocket->is_server);
301 if (fd < 0)
302 return -1;
303
304 ret = bind(fd, (struct sockaddr *)&un, sizeof(un));
305 if (ret < 0) {
306 RTE_LOG(ERR, VHOST_CONFIG,
307 "failed to bind to %s: %s; remove it and try again\n",
308 path, strerror(errno));
309 goto err;
310 }
311 RTE_LOG(INFO, VHOST_CONFIG, "bind to %s\n", path);
312
313 ret = listen(fd, MAX_VIRTIO_BACKLOG);
314 if (ret < 0)
315 goto err;
316
317 vsocket->listenfd = fd;
318 ret = fdset_add(&vhost_user.fdset, fd, vhost_user_server_new_connection,
319 NULL, vsocket);
320 if (ret < 0) {
321 RTE_LOG(ERR, VHOST_CONFIG,
322 "failed to add listen fd %d to vhost server fdset\n",
323 fd);
324 goto err;
325 }
326
327 return 0;
328
329 err:
330 close(fd);
331 return -1;
332 }
333
/* One client connection attempt that is waiting to be retried. */
struct vhost_user_reconnect {
	struct sockaddr_un un;             /* address to connect to */
	int fd;                            /* socket on which connect() is retried */
	struct vhost_user_socket *vsocket; /* socket the attempt belongs to */

	TAILQ_ENTRY(vhost_user_reconnect) next;
};

TAILQ_HEAD(vhost_user_reconnect_tailq_list, vhost_user_reconnect);
/* Queue of pending attempts plus the mutex guarding it. */
struct vhost_user_reconnect_list {
	struct vhost_user_reconnect_tailq_list head;
	pthread_mutex_t mutex;
};

/* Pending reconnects, walked by the reconnect thread. */
static struct vhost_user_reconnect_list reconn_list;
/* Reconnect thread id; compared against 0 to decide whether to start it. */
static pthread_t reconn_tid;
350
351 static int
352 vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz)
353 {
354 int ret, flags;
355
356 ret = connect(fd, un, sz);
357 if (ret < 0 && errno != EISCONN)
358 return -1;
359
360 flags = fcntl(fd, F_GETFL, 0);
361 if (flags < 0) {
362 RTE_LOG(ERR, VHOST_CONFIG,
363 "can't get flags for connfd %d\n", fd);
364 return -2;
365 }
366 if ((flags & O_NONBLOCK) && fcntl(fd, F_SETFL, flags & ~O_NONBLOCK)) {
367 RTE_LOG(ERR, VHOST_CONFIG,
368 "can't disable nonblocking on fd %d\n", fd);
369 return -2;
370 }
371 return 0;
372 }
373
374 static void *
375 vhost_user_client_reconnect(void *arg __rte_unused)
376 {
377 int ret;
378 struct vhost_user_reconnect *reconn, *next;
379
380 while (1) {
381 pthread_mutex_lock(&reconn_list.mutex);
382
383 /*
384 * An equal implementation of TAILQ_FOREACH_SAFE,
385 * which does not exist on all platforms.
386 */
387 for (reconn = TAILQ_FIRST(&reconn_list.head);
388 reconn != NULL; reconn = next) {
389 next = TAILQ_NEXT(reconn, next);
390
391 ret = vhost_user_connect_nonblock(reconn->fd,
392 (struct sockaddr *)&reconn->un,
393 sizeof(reconn->un));
394 if (ret == -2) {
395 close(reconn->fd);
396 RTE_LOG(ERR, VHOST_CONFIG,
397 "reconnection for fd %d failed\n",
398 reconn->fd);
399 goto remove_fd;
400 }
401 if (ret == -1)
402 continue;
403
404 RTE_LOG(INFO, VHOST_CONFIG,
405 "%s: connected\n", reconn->vsocket->path);
406 vhost_user_add_connection(reconn->fd, reconn->vsocket);
407 remove_fd:
408 TAILQ_REMOVE(&reconn_list.head, reconn, next);
409 free(reconn);
410 }
411
412 pthread_mutex_unlock(&reconn_list.mutex);
413 sleep(1);
414 }
415
416 return NULL;
417 }
418
419 static int
420 vhost_user_reconnect_init(void)
421 {
422 int ret;
423
424 pthread_mutex_init(&reconn_list.mutex, NULL);
425 TAILQ_INIT(&reconn_list.head);
426
427 ret = pthread_create(&reconn_tid, NULL,
428 vhost_user_client_reconnect, NULL);
429 if (ret < 0)
430 RTE_LOG(ERR, VHOST_CONFIG, "failed to create reconnect thread");
431
432 return ret;
433 }
434
435 static int
436 vhost_user_create_client(struct vhost_user_socket *vsocket)
437 {
438 int fd;
439 int ret;
440 struct sockaddr_un un;
441 const char *path = vsocket->path;
442 struct vhost_user_reconnect *reconn;
443
444 fd = create_unix_socket(path, &un, vsocket->is_server);
445 if (fd < 0)
446 return -1;
447
448 ret = vhost_user_connect_nonblock(fd, (struct sockaddr *)&un,
449 sizeof(un));
450 if (ret == 0) {
451 vhost_user_add_connection(fd, vsocket);
452 return 0;
453 }
454
455 RTE_LOG(ERR, VHOST_CONFIG,
456 "failed to connect to %s: %s\n",
457 path, strerror(errno));
458
459 if (ret == -2 || !vsocket->reconnect) {
460 close(fd);
461 return -1;
462 }
463
464 RTE_LOG(ERR, VHOST_CONFIG, "%s: reconnecting...\n", path);
465 reconn = malloc(sizeof(*reconn));
466 if (reconn == NULL) {
467 RTE_LOG(ERR, VHOST_CONFIG,
468 "failed to allocate memory for reconnect\n");
469 close(fd);
470 return -1;
471 }
472 reconn->un = un;
473 reconn->fd = fd;
474 reconn->vsocket = vsocket;
475 pthread_mutex_lock(&reconn_list.mutex);
476 TAILQ_INSERT_TAIL(&reconn_list.head, reconn, next);
477 pthread_mutex_unlock(&reconn_list.mutex);
478
479 return 0;
480 }
481
482 /*
483 * Register a new vhost-user socket; here we could act as server
484 * (the default case), or client (when RTE_VHOST_USER_CLIENT) flag
485 * is set.
486 */
487 int
488 rte_vhost_driver_register(const char *path, uint64_t flags)
489 {
490 int ret = -1;
491 struct vhost_user_socket *vsocket;
492
493 if (!path)
494 return -1;
495
496 pthread_mutex_lock(&vhost_user.mutex);
497
498 if (vhost_user.vsocket_cnt == MAX_VHOST_SOCKET) {
499 RTE_LOG(ERR, VHOST_CONFIG,
500 "error: the number of vhost sockets reaches maximum\n");
501 goto out;
502 }
503
504 vsocket = malloc(sizeof(struct vhost_user_socket));
505 if (!vsocket)
506 goto out;
507 memset(vsocket, 0, sizeof(struct vhost_user_socket));
508 vsocket->path = strdup(path);
509 vsocket->connfd = -1;
510 vsocket->dequeue_zero_copy = flags & RTE_VHOST_USER_DEQUEUE_ZERO_COPY;
511
512 if ((flags & RTE_VHOST_USER_CLIENT) != 0) {
513 vsocket->reconnect = !(flags & RTE_VHOST_USER_NO_RECONNECT);
514 if (vsocket->reconnect && reconn_tid == 0) {
515 if (vhost_user_reconnect_init() < 0) {
516 free(vsocket->path);
517 free(vsocket);
518 goto out;
519 }
520 }
521 ret = vhost_user_create_client(vsocket);
522 } else {
523 vsocket->is_server = true;
524 ret = vhost_user_create_server(vsocket);
525 }
526 if (ret < 0) {
527 free(vsocket->path);
528 free(vsocket);
529 goto out;
530 }
531
532 vhost_user.vsockets[vhost_user.vsocket_cnt++] = vsocket;
533
534 out:
535 pthread_mutex_unlock(&vhost_user.mutex);
536
537 return ret;
538 }
539
540 static bool
541 vhost_user_remove_reconnect(struct vhost_user_socket *vsocket)
542 {
543 int found = false;
544 struct vhost_user_reconnect *reconn, *next;
545
546 pthread_mutex_lock(&reconn_list.mutex);
547
548 for (reconn = TAILQ_FIRST(&reconn_list.head);
549 reconn != NULL; reconn = next) {
550 next = TAILQ_NEXT(reconn, next);
551
552 if (reconn->vsocket == vsocket) {
553 TAILQ_REMOVE(&reconn_list.head, reconn, next);
554 close(reconn->fd);
555 free(reconn);
556 found = true;
557 break;
558 }
559 }
560 pthread_mutex_unlock(&reconn_list.mutex);
561 return found;
562 }
563
564 /**
565 * Unregister the specified vhost socket
566 */
567 int
568 rte_vhost_driver_unregister(const char *path)
569 {
570 int i;
571 int count;
572 struct vhost_user_connection *conn;
573
574 pthread_mutex_lock(&vhost_user.mutex);
575
576 for (i = 0; i < vhost_user.vsocket_cnt; i++) {
577 struct vhost_user_socket *vsocket = vhost_user.vsockets[i];
578
579 if (!strcmp(vsocket->path, path)) {
580 if (vsocket->is_server) {
581 fdset_del(&vhost_user.fdset, vsocket->listenfd);
582 close(vsocket->listenfd);
583 unlink(path);
584 } else if (vsocket->reconnect) {
585 vhost_user_remove_reconnect(vsocket);
586 }
587
588 conn = fdset_del(&vhost_user.fdset, vsocket->connfd);
589 if (conn) {
590 RTE_LOG(INFO, VHOST_CONFIG,
591 "free connfd = %d for device '%s'\n",
592 vsocket->connfd, path);
593 close(vsocket->connfd);
594 vhost_destroy_device(conn->vid);
595 free(conn);
596 }
597
598 free(vsocket->path);
599 free(vsocket);
600
601 count = --vhost_user.vsocket_cnt;
602 vhost_user.vsockets[i] = vhost_user.vsockets[count];
603 vhost_user.vsockets[count] = NULL;
604 pthread_mutex_unlock(&vhost_user.mutex);
605
606 return 0;
607 }
608 }
609 pthread_mutex_unlock(&vhost_user.mutex);
610
611 return -1;
612 }
613
614 int
615 rte_vhost_driver_session_start(void)
616 {
617 fdset_event_dispatch(&vhost_user.fdset);
618 return 0;
619 }