]> git.proxmox.com Git - mirror_qemu.git/blobdiff - io/channel-socket.c
Merge tag 'pull-trivial-patches' of https://gitlab.com/mjt0k/qemu into staging
[mirror_qemu.git] / io / channel-socket.c
index bc5f80e780ebc8fea2205b1fd68e04e75ed57752..3a899b060858de9a12a6692098b2ff09fe1b3352 100644 (file)
@@ -6,7 +6,7 @@
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
  * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
+ * version 2.1 of the License, or (at your option) any later version.
  *
  * This library is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  *
  * You should have received a copy of the GNU Lesser General Public
  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
- *
  */
 
 #include "qemu/osdep.h"
 #include "qapi/error.h"
 #include "qapi/qapi-visit-sockets.h"
+#include "qemu/module.h"
 #include "io/channel-socket.h"
+#include "io/channel-util.h"
 #include "io/channel-watch.h"
 #include "trace.h"
 #include "qapi/clone-visitor.h"
+#ifdef CONFIG_LINUX
+#include <linux/errqueue.h>
+#include <sys/socket.h>
+
+#if (defined(MSG_ZEROCOPY) && defined(SO_ZEROCOPY))
+#define QEMU_MSG_ZEROCOPY
+#endif
+#endif
 
 #define SOCKET_MAX_FDS 16
 
@@ -54,6 +63,8 @@ qio_channel_socket_new(void)
 
     sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET));
     sioc->fd = -1;
+    sioc->zero_copy_queued = 0;
+    sioc->zero_copy_sent = 0;
 
     ioc = QIO_CHANNEL(sioc);
     qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
@@ -153,6 +164,19 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
         return -1;
     }
 
+#ifdef QEMU_MSG_ZEROCOPY
+    int ret, v = 1;
+    ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
+    if (ret == 0) {
+        /* Zero copy available on host */
+        qio_channel_set_feature(QIO_CHANNEL(ioc),
+                                QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY);
+    }
+#endif
+
+    qio_channel_set_feature(QIO_CHANNEL(ioc),
+                            QIO_CHANNEL_FEATURE_READ_MSG_PEEK);
+
     return 0;
 }
 
@@ -196,12 +220,13 @@ void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
 
 int qio_channel_socket_listen_sync(QIOChannelSocket *ioc,
                                    SocketAddress *addr,
+                                   int num,
                                    Error **errp)
 {
     int fd;
 
-    trace_qio_channel_socket_listen_sync(ioc, addr);
-    fd = socket_listen(addr, errp);
+    trace_qio_channel_socket_listen_sync(ioc, addr, num);
+    fd = socket_listen(addr, num, errp);
     if (fd < 0) {
         trace_qio_channel_socket_listen_fail(ioc);
         return -1;
@@ -218,14 +243,27 @@ int qio_channel_socket_listen_sync(QIOChannelSocket *ioc,
 }
 
 
+struct QIOChannelListenWorkerData {
+    SocketAddress *addr;
+    int num; /* amount of expected connections */
+};
+
+static void qio_channel_listen_worker_free(gpointer opaque)
+{
+    struct QIOChannelListenWorkerData *data = opaque;
+
+    qapi_free_SocketAddress(data->addr);
+    g_free(data);
+}
+
 static void qio_channel_socket_listen_worker(QIOTask *task,
                                              gpointer opaque)
 {
     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
-    SocketAddress *addr = opaque;
+    struct QIOChannelListenWorkerData *data = opaque;
     Error *err = NULL;
 
-    qio_channel_socket_listen_sync(ioc, addr, &err);
+    qio_channel_socket_listen_sync(ioc, data->addr, data->num, &err);
 
     qio_task_set_error(task, err);
 }
@@ -233,6 +271,7 @@ static void qio_channel_socket_listen_worker(QIOTask *task,
 
 void qio_channel_socket_listen_async(QIOChannelSocket *ioc,
                                      SocketAddress *addr,
+                                     int num,
                                      QIOTaskFunc callback,
                                      gpointer opaque,
                                      GDestroyNotify destroy,
@@ -240,16 +279,18 @@ void qio_channel_socket_listen_async(QIOChannelSocket *ioc,
 {
     QIOTask *task = qio_task_new(
         OBJECT(ioc), callback, opaque, destroy);
-    SocketAddress *addrCopy;
+    struct QIOChannelListenWorkerData *data;
 
-    addrCopy = QAPI_CLONE(SocketAddress, addr);
+    data = g_new0(struct QIOChannelListenWorkerData, 1);
+    data->addr = QAPI_CLONE(SocketAddress, addr);
+    data->num = num;
 
     /* socket_listen() blocks in DNS lookups, so we must use a thread */
-    trace_qio_channel_socket_listen_async(ioc, addr);
+    trace_qio_channel_socket_listen_async(ioc, addr, num);
     qio_task_run_in_thread(task,
                            qio_channel_socket_listen_worker,
-                           addrCopy,
-                           (GDestroyNotify)qapi_free_SocketAddress,
+                           data,
+                           qio_channel_listen_worker_free,
                            context);
 }
 
@@ -369,6 +410,9 @@ qio_channel_socket_accept(QIOChannelSocket *ioc,
     }
 #endif /* WIN32 */
 
+    qio_channel_set_feature(QIO_CHANNEL(cioc),
+                            QIO_CHANNEL_FEATURE_READ_MSG_PEEK);
+
     trace_qio_channel_socket_accept_complete(ioc, cioc, cioc->fd);
     return cioc;
 
@@ -399,9 +443,9 @@ static void qio_channel_socket_finalize(Object *obj)
             }
         }
 #ifdef WIN32
-        WSAEventSelect(ioc->fd, NULL, 0);
+        qemu_socket_unselect(ioc->fd, NULL);
 #endif
-        closesocket(ioc->fd);
+        close(ioc->fd);
         ioc->fd = -1;
     }
 }
@@ -443,7 +487,7 @@ static void qio_channel_socket_copy_fds(struct msghdr *msg,
             }
 
             /* O_NONBLOCK is preserved across SCM_RIGHTS so reset it */
-            qemu_set_block(fd);
+            qemu_socket_set_block(fd);
 
 #ifndef MSG_CMSG_CLOEXEC
             qemu_set_cloexec(fd);
@@ -459,6 +503,7 @@ static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
                                         size_t niov,
                                         int **fds,
                                         size_t *nfds,
+                                        int flags,
                                         Error **errp)
 {
     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
@@ -469,15 +514,19 @@ static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
 
     memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
 
-#ifdef MSG_CMSG_CLOEXEC
-    sflags |= MSG_CMSG_CLOEXEC;
-#endif
-
     msg.msg_iov = (struct iovec *)iov;
     msg.msg_iovlen = niov;
     if (fds && nfds) {
         msg.msg_control = control;
         msg.msg_controllen = sizeof(control);
+#ifdef MSG_CMSG_CLOEXEC
+        sflags |= MSG_CMSG_CLOEXEC;
+#endif
+
+    }
+
+    if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) {
+        sflags |= MSG_PEEK;
     }
 
  retry:
@@ -507,6 +556,7 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
                                          size_t niov,
                                          int *fds,
                                          size_t nfds,
+                                         int flags,
                                          Error **errp)
 {
     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
@@ -515,6 +565,7 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
     char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
     size_t fdsize = sizeof(int) * nfds;
     struct cmsghdr *cmsg;
+    int sflags = 0;
 
     memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
 
@@ -539,19 +590,44 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
         memcpy(CMSG_DATA(cmsg), fds, fdsize);
     }
 
+    if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
+#ifdef QEMU_MSG_ZEROCOPY
+        sflags = MSG_ZEROCOPY;
+#else
+        /*
+         * We expect QIOChannel class entry point to have
+         * blocked this code path already
+         */
+        g_assert_not_reached();
+#endif
+    }
+
  retry:
-    ret = sendmsg(sioc->fd, &msg, 0);
+    ret = sendmsg(sioc->fd, &msg, sflags);
     if (ret <= 0) {
-        if (errno == EAGAIN) {
+        switch (errno) {
+        case EAGAIN:
             return QIO_CHANNEL_ERR_BLOCK;
-        }
-        if (errno == EINTR) {
+        case EINTR:
             goto retry;
+        case ENOBUFS:
+            if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
+                error_setg_errno(errp, errno,
+                                 "Process can't lock enough memory for using MSG_ZEROCOPY");
+                return -1;
+            }
+            break;
         }
+
         error_setg_errno(errp, errno,
                          "Unable to write to socket");
         return -1;
     }
+
+    if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
+        sioc->zero_copy_queued++;
+    }
+
     return ret;
 }
 #else /* WIN32 */
@@ -560,11 +636,17 @@ static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
                                         size_t niov,
                                         int **fds,
                                         size_t *nfds,
+                                        int flags,
                                         Error **errp)
 {
     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
     ssize_t done = 0;
     ssize_t i;
+    int sflags = 0;
+
+    if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) {
+        sflags |= MSG_PEEK;
+    }
 
     for (i = 0; i < niov; i++) {
         ssize_t ret;
@@ -572,7 +654,7 @@ static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
         ret = recv(sioc->fd,
                    iov[i].iov_base,
                    iov[i].iov_len,
-                   0);
+                   sflags);
         if (ret < 0) {
             if (errno == EAGAIN) {
                 if (done) {
@@ -602,6 +684,7 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
                                          size_t niov,
                                          int *fds,
                                          size_t nfds,
+                                         int flags,
                                          Error **errp)
 {
     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
@@ -640,6 +723,85 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
 }
 #endif /* WIN32 */
 
+
+#ifdef QEMU_MSG_ZEROCOPY
+static int qio_channel_socket_flush(QIOChannel *ioc,
+                                    Error **errp)
+{
+    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+    struct msghdr msg = {};
+    struct sock_extended_err *serr;
+    struct cmsghdr *cm;
+    char control[CMSG_SPACE(sizeof(*serr))];
+    int received;
+    int ret;
+
+    if (sioc->zero_copy_queued == sioc->zero_copy_sent) {
+        return 0;
+    }
+
+    msg.msg_control = control;
+    msg.msg_controllen = sizeof(control);
+    memset(control, 0, sizeof(control));
+
+    ret = 1;
+
+    while (sioc->zero_copy_sent < sioc->zero_copy_queued) {
+        received = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
+        if (received < 0) {
+            switch (errno) {
+            case EAGAIN:
+                /* Nothing on errqueue, wait until something is available */
+                qio_channel_wait(ioc, G_IO_ERR);
+                continue;
+            case EINTR:
+                continue;
+            default:
+                error_setg_errno(errp, errno,
+                                 "Unable to read errqueue");
+                return -1;
+            }
+        }
+
+        cm = CMSG_FIRSTHDR(&msg);
+        if (cm->cmsg_level != SOL_IP   && cm->cmsg_type != IP_RECVERR &&
+            cm->cmsg_level != SOL_IPV6 && cm->cmsg_type != IPV6_RECVERR) {
+            error_setg_errno(errp, EPROTOTYPE,
+                             "Wrong cmsg in errqueue");
+            return -1;
+        }
+
+        serr = (void *) CMSG_DATA(cm);
+        if (serr->ee_errno != SO_EE_ORIGIN_NONE) {
+            error_setg_errno(errp, serr->ee_errno,
+                             "Error on socket");
+            return -1;
+        }
+        if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
+            error_setg_errno(errp, serr->ee_origin,
+                             "Error not from zero copy");
+            return -1;
+        }
+        if (serr->ee_data < serr->ee_info) {
+            error_setg_errno(errp, serr->ee_origin,
+                             "Wrong notification bounds");
+            return -1;
+        }
+
+        /* No errors, count successfully finished sendmsg()*/
+        sioc->zero_copy_sent += serr->ee_data - serr->ee_info + 1;
+
+        /* If any sendmsg() succeeded using zero copy, return 0 at the end */
+        if (serr->ee_code != SO_EE_CODE_ZEROCOPY_COPIED) {
+            ret = 0;
+        }
+    }
+
+    return ret;
+}
+
+#endif /* QEMU_MSG_ZEROCOPY */
+
 static int
 qio_channel_socket_set_blocking(QIOChannel *ioc,
                                 bool enabled,
@@ -648,9 +810,9 @@ qio_channel_socket_set_blocking(QIOChannel *ioc,
     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
 
     if (enabled) {
-        qemu_set_block(sioc->fd);
+        qemu_socket_set_block(sioc->fd);
     } else {
-        qemu_set_nonblock(sioc->fd);
+        qemu_socket_set_nonblock(sioc->fd);
     }
     return 0;
 }
@@ -663,9 +825,9 @@ qio_channel_socket_set_delay(QIOChannel *ioc,
     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
     int v = enabled ? 0 : 1;
 
-    qemu_setsockopt(sioc->fd,
-                    IPPROTO_TCP, TCP_NODELAY,
-                    &v, sizeof(v));
+    setsockopt(sioc->fd,
+               IPPROTO_TCP, TCP_NODELAY,
+               &v, sizeof(v));
 }
 
 
@@ -686,19 +848,20 @@ qio_channel_socket_close(QIOChannel *ioc,
 {
     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
     int rc = 0;
+    Error *err = NULL;
 
     if (sioc->fd != -1) {
 #ifdef WIN32
-        WSAEventSelect(sioc->fd, NULL, 0);
+        qemu_socket_unselect(sioc->fd, NULL);
 #endif
         if (qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_LISTEN)) {
             socket_listen_cleanup(sioc->fd, errp);
         }
 
-        if (closesocket(sioc->fd) < 0) {
+        if (close(sioc->fd) < 0) {
             sioc->fd = -1;
-            error_setg_errno(errp, errno,
-                             "Unable to close socket");
+            error_setg_errno(&err, errno, "Unable to close socket");
+            error_propagate(errp, err);
             return -1;
         }
         sioc->fd = -1;
@@ -736,13 +899,17 @@ qio_channel_socket_shutdown(QIOChannel *ioc,
 }
 
 static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
-                                                  AioContext *ctx,
+                                                  AioContext *read_ctx,
                                                   IOHandler *io_read,
+                                                  AioContext *write_ctx,
                                                   IOHandler *io_write,
                                                   void *opaque)
 {
     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
-    aio_set_fd_handler(ctx, sioc->fd, false, io_read, io_write, NULL, opaque);
+
+    qio_channel_util_set_aio_fd_handler(sioc->fd, read_ctx, io_read,
+                                        sioc->fd, write_ctx, io_write,
+                                        opaque);
 }
 
 static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,
@@ -768,6 +935,9 @@ static void qio_channel_socket_class_init(ObjectClass *klass,
     ioc_klass->io_set_delay = qio_channel_socket_set_delay;
     ioc_klass->io_create_watch = qio_channel_socket_create_watch;
     ioc_klass->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler;
+#ifdef QEMU_MSG_ZEROCOPY
+    ioc_klass->io_flush = qio_channel_socket_flush;
+#endif
 }
 
 static const TypeInfo qio_channel_socket_info = {