]> git.proxmox.com Git - mirror_qemu.git/blobdiff - io/channel.c
Merge remote-tracking branch 'remotes/kraxel/tags/vga-20190705-pull-request' into...
[mirror_qemu.git] / io / channel.c
index dd6fc0eb28898bc134d6e6e729ff5c6e23ca18d7..e4376eb0bc405eadf7ff5785373ac4aed765e805 100644 (file)
 
 #include "qemu/osdep.h"
 #include "io/channel.h"
-#include "qemu/coroutine.h"
+#include "qapi/error.h"
+#include "qemu/main-loop.h"
+#include "qemu/module.h"
+#include "qemu/iov.h"
 
 bool qio_channel_has_feature(QIOChannel *ioc,
                              QIOChannelFeature feature)
@@ -29,6 +32,21 @@ bool qio_channel_has_feature(QIOChannel *ioc,
 }
 
 
+void qio_channel_set_feature(QIOChannel *ioc,
+                             QIOChannelFeature feature)
+{
+    ioc->features |= (1 << feature);
+}
+
+
+void qio_channel_set_name(QIOChannel *ioc,
+                          const char *name)
+{
+    g_free(ioc->name);
+    ioc->name = g_strdup(name);
+}
+
+
 ssize_t qio_channel_readv_full(QIOChannel *ioc,
                                const struct iovec *iov,
                                size_t niov,
@@ -39,7 +57,7 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 
     if ((fds || nfds) &&
-        !(ioc->features & (1 << QIO_CHANNEL_FEATURE_FD_PASS))) {
+        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
         error_setg_errno(errp, EINVAL,
                          "Channel does not support file descriptor passing");
         return -1;
@@ -59,7 +77,7 @@ ssize_t qio_channel_writev_full(QIOChannel *ioc,
     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 
     if ((fds || nfds) &&
-        !(ioc->features & (1 << QIO_CHANNEL_FEATURE_FD_PASS))) {
+        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
         error_setg_errno(errp, EINVAL,
                          "Channel does not support file descriptor passing");
         return -1;
@@ -69,6 +87,109 @@ ssize_t qio_channel_writev_full(QIOChannel *ioc,
 }
 
 
+int qio_channel_readv_all_eof(QIOChannel *ioc,
+                              const struct iovec *iov,
+                              size_t niov,
+                              Error **errp)
+{
+    int ret = -1;
+    struct iovec *local_iov = g_new(struct iovec, niov);
+    struct iovec *local_iov_head = local_iov;
+    unsigned int nlocal_iov = niov;
+    bool partial = false;
+
+    nlocal_iov = iov_copy(local_iov, nlocal_iov,
+                          iov, niov,
+                          0, iov_size(iov, niov));
+
+    while (nlocal_iov > 0) {
+        ssize_t len;
+        len = qio_channel_readv(ioc, local_iov, nlocal_iov, errp);
+        if (len == QIO_CHANNEL_ERR_BLOCK) {
+            if (qemu_in_coroutine()) {
+                qio_channel_yield(ioc, G_IO_IN);
+            } else {
+                qio_channel_wait(ioc, G_IO_IN);
+            }
+            continue;
+        } else if (len < 0) {
+            goto cleanup;
+        } else if (len == 0) {
+            if (partial) {
+                error_setg(errp,
+                           "Unexpected end-of-file before all bytes were read");
+            } else {
+                ret = 0;
+            }
+            goto cleanup;
+        }
+
+        partial = true;
+        iov_discard_front(&local_iov, &nlocal_iov, len);
+    }
+
+    ret = 1;
+
+ cleanup:
+    g_free(local_iov_head);
+    return ret;
+}
+
+int qio_channel_readv_all(QIOChannel *ioc,
+                          const struct iovec *iov,
+                          size_t niov,
+                          Error **errp)
+{
+    int ret = qio_channel_readv_all_eof(ioc, iov, niov, errp);
+
+    if (ret == 0) {
+        ret = -1;
+        error_setg(errp,
+                   "Unexpected end-of-file before all bytes were read");
+    } else if (ret == 1) {
+        ret = 0;
+    }
+    return ret;
+}
+
+int qio_channel_writev_all(QIOChannel *ioc,
+                           const struct iovec *iov,
+                           size_t niov,
+                           Error **errp)
+{
+    int ret = -1;
+    struct iovec *local_iov = g_new(struct iovec, niov);
+    struct iovec *local_iov_head = local_iov;
+    unsigned int nlocal_iov = niov;
+
+    nlocal_iov = iov_copy(local_iov, nlocal_iov,
+                          iov, niov,
+                          0, iov_size(iov, niov));
+
+    while (nlocal_iov > 0) {
+        ssize_t len;
+        len = qio_channel_writev(ioc, local_iov, nlocal_iov, errp);
+        if (len == QIO_CHANNEL_ERR_BLOCK) {
+            if (qemu_in_coroutine()) {
+                qio_channel_yield(ioc, G_IO_OUT);
+            } else {
+                qio_channel_wait(ioc, G_IO_OUT);
+            }
+            continue;
+        }
+        if (len < 0) {
+            goto cleanup;
+        }
+
+        iov_discard_front(&local_iov, &nlocal_iov, len);
+    }
+
+    ret = 0;
+ cleanup:
+    g_free(local_iov_head);
+    return ret;
+}
+
 ssize_t qio_channel_readv(QIOChannel *ioc,
                           const struct iovec *iov,
                           size_t niov,
@@ -107,6 +228,36 @@ ssize_t qio_channel_write(QIOChannel *ioc,
 }
 
 
+int qio_channel_read_all_eof(QIOChannel *ioc,
+                             char *buf,
+                             size_t buflen,
+                             Error **errp)
+{
+    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
+    return qio_channel_readv_all_eof(ioc, &iov, 1, errp);
+}
+
+
+int qio_channel_read_all(QIOChannel *ioc,
+                         char *buf,
+                         size_t buflen,
+                         Error **errp)
+{
+    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
+    return qio_channel_readv_all(ioc, &iov, 1, errp);
+}
+
+
+int qio_channel_write_all(QIOChannel *ioc,
+                          const char *buf,
+                          size_t buflen,
+                          Error **errp)
+{
+    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
+    return qio_channel_writev_all(ioc, &iov, 1, errp);
+}
+
+
 int qio_channel_set_blocking(QIOChannel *ioc,
                               bool enabled,
                               Error **errp)
@@ -128,15 +279,33 @@ GSource *qio_channel_create_watch(QIOChannel *ioc,
                                   GIOCondition condition)
 {
     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
-    return klass->io_create_watch(ioc, condition);
+    GSource *ret = klass->io_create_watch(ioc, condition);
+
+    if (ioc->name) {
+        g_source_set_name(ret, ioc->name);
+    }
+
+    return ret;
 }
 
 
-guint qio_channel_add_watch(QIOChannel *ioc,
-                            GIOCondition condition,
-                            QIOChannelFunc func,
-                            gpointer user_data,
-                            GDestroyNotify notify)
+void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
+                                    AioContext *ctx,
+                                    IOHandler *io_read,
+                                    IOHandler *io_write,
+                                    void *opaque)
+{
+    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+    klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
+}
+
+guint qio_channel_add_watch_full(QIOChannel *ioc,
+                                 GIOCondition condition,
+                                 QIOChannelFunc func,
+                                 gpointer user_data,
+                                 GDestroyNotify notify,
+                                 GMainContext *context)
 {
     GSource *source;
     guint id;
@@ -145,12 +314,39 @@ guint qio_channel_add_watch(QIOChannel *ioc,
 
     g_source_set_callback(source, (GSourceFunc)func, user_data, notify);
 
-    id = g_source_attach(source, NULL);
+    id = g_source_attach(source, context);
     g_source_unref(source);
 
     return id;
 }
 
+guint qio_channel_add_watch(QIOChannel *ioc,
+                            GIOCondition condition,
+                            QIOChannelFunc func,
+                            gpointer user_data,
+                            GDestroyNotify notify)
+{
+    return qio_channel_add_watch_full(ioc, condition, func,
+                                      user_data, notify, NULL);
+}
+
+GSource *qio_channel_add_watch_source(QIOChannel *ioc,
+                                      GIOCondition condition,
+                                      QIOChannelFunc func,
+                                      gpointer user_data,
+                                      GDestroyNotify notify,
+                                      GMainContext *context)
+{
+    GSource *source;
+    guint id;
+
+    id = qio_channel_add_watch_full(ioc, condition, func,
+                                    user_data, notify, context);
+    source = g_main_context_find_source_by_id(context, id);
+    g_source_ref(source);
+    return source;
+}
+
 
 int qio_channel_shutdown(QIOChannel *ioc,
                          QIOChannelShutdown how,
@@ -205,37 +401,85 @@ off_t qio_channel_io_seek(QIOChannel *ioc,
 }
 
 
-typedef struct QIOChannelYieldData QIOChannelYieldData;
-struct QIOChannelYieldData {
-    QIOChannel *ioc;
-    Coroutine *co;
-};
+static void qio_channel_restart_read(void *opaque)
+{
+    QIOChannel *ioc = opaque;
+    Coroutine *co = ioc->read_coroutine;
 
+    /* Assert that aio_co_wake() reenters the coroutine directly */
+    assert(qemu_get_current_aio_context() ==
+           qemu_coroutine_get_aio_context(co));
+    aio_co_wake(co);
+}
 
-static gboolean qio_channel_yield_enter(QIOChannel *ioc,
-                                        GIOCondition condition,
-                                        gpointer opaque)
+static void qio_channel_restart_write(void *opaque)
 {
-    QIOChannelYieldData *data = opaque;
-    qemu_coroutine_enter(data->co, NULL);
-    return FALSE;
+    QIOChannel *ioc = opaque;
+    Coroutine *co = ioc->write_coroutine;
+
+    /* Assert that aio_co_wake() reenters the coroutine directly */
+    assert(qemu_get_current_aio_context() ==
+           qemu_coroutine_get_aio_context(co));
+    aio_co_wake(co);
+}
+
+static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
+{
+    IOHandler *rd_handler = NULL, *wr_handler = NULL;
+    AioContext *ctx;
+
+    if (ioc->read_coroutine) {
+        rd_handler = qio_channel_restart_read;
+    }
+    if (ioc->write_coroutine) {
+        wr_handler = qio_channel_restart_write;
+    }
+
+    ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
+    qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
 }
 
+void qio_channel_attach_aio_context(QIOChannel *ioc,
+                                    AioContext *ctx)
+{
+    assert(!ioc->read_coroutine);
+    assert(!ioc->write_coroutine);
+    ioc->ctx = ctx;
+}
+
+void qio_channel_detach_aio_context(QIOChannel *ioc)
+{
+    ioc->read_coroutine = NULL;
+    ioc->write_coroutine = NULL;
+    qio_channel_set_aio_fd_handlers(ioc);
+    ioc->ctx = NULL;
+}
 
 void coroutine_fn qio_channel_yield(QIOChannel *ioc,
                                     GIOCondition condition)
 {
-    QIOChannelYieldData data;
-
     assert(qemu_in_coroutine());
-    data.ioc = ioc;
-    data.co = qemu_coroutine_self();
-    qio_channel_add_watch(ioc,
-                          condition,
-                          qio_channel_yield_enter,
-                          &data,
-                          NULL);
+    if (condition == G_IO_IN) {
+        assert(!ioc->read_coroutine);
+        ioc->read_coroutine = qemu_coroutine_self();
+    } else if (condition == G_IO_OUT) {
+        assert(!ioc->write_coroutine);
+        ioc->write_coroutine = qemu_coroutine_self();
+    } else {
+        abort();
+    }
+    qio_channel_set_aio_fd_handlers(ioc);
     qemu_coroutine_yield();
+
+    /* Allow interrupting the operation by reentering the coroutine other than
+     * through the aio_fd_handlers. */
+    if (condition == G_IO_IN && ioc->read_coroutine) {
+        ioc->read_coroutine = NULL;
+        qio_channel_set_aio_fd_handlers(ioc);
+    } else if (condition == G_IO_OUT && ioc->write_coroutine) {
+        ioc->write_coroutine = NULL;
+        qio_channel_set_aio_fd_handlers(ioc);
+    }
 }
 
 
@@ -274,24 +518,24 @@ void qio_channel_wait(QIOChannel *ioc,
 }
 
 
-#ifdef _WIN32
 static void qio_channel_finalize(Object *obj)
 {
     QIOChannel *ioc = QIO_CHANNEL(obj);
 
+    g_free(ioc->name);
+
+#ifdef _WIN32
     if (ioc->event) {
         CloseHandle(ioc->event);
     }
-}
 #endif
+}
 
 static const TypeInfo qio_channel_info = {
     .parent = TYPE_OBJECT,
     .name = TYPE_QIO_CHANNEL,
     .instance_size = sizeof(QIOChannel),
-#ifdef _WIN32
     .instance_finalize = qio_channel_finalize,
-#endif
     .abstract = true,
     .class_size = sizeof(QIOChannelClass),
 };