]> git.proxmox.com Git - mirror_qemu.git/blobdiff - migration/rdma.c
block: add missed aio_context_acquire into release_drive
[mirror_qemu.git] / migration / rdma.c
index f6a9992b3e3d1b5416289356861de335539234cb..674ccab12e12767fa57a48d1d44b2df56b4b3cb6 100644 (file)
@@ -2,10 +2,12 @@
  * RDMA protocol and interfaces
  *
  * Copyright IBM, Corp. 2010-2013
+ * Copyright Red Hat, Inc. 2015-2016
  *
  * Authors:
  *  Michael R. Hines <mrhines@us.ibm.com>
  *  Jiuxing Liu <jl@us.ibm.com>
+ *  Daniel P. Berrange <berrange@redhat.com>
  *
  * This work is licensed under the terms of the GNU GPL, version 2 or
  * later.  See the COPYING file in the top-level directory.
@@ -348,6 +350,7 @@ typedef struct RDMAContext {
      */
     int error_state;
     int error_reported;
+    int received_error;
 
     /*
      * Description of ram blocks used throughout the code.
@@ -374,14 +377,20 @@ typedef struct RDMAContext {
     GHashTable *blockmap;
 } RDMAContext;
 
-/*
- * Interface to the rest of the migration call stack.
- */
-typedef struct QEMUFileRDMA {
+#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
+#define QIO_CHANNEL_RDMA(obj)                                     \
+    OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA)
+
+typedef struct QIOChannelRDMA QIOChannelRDMA;
+
+
+struct QIOChannelRDMA {
+    QIOChannel parent;
     RDMAContext *rdma;
+    QEMUFile *file;
     size_t len;
-    void *file;
-} QEMUFileRDMA;
+    bool blocking; /* XXX we don't actually honour this yet */
+};
 
 /*
  * Main structure for IB Send/Recv control messages.
@@ -1503,7 +1512,7 @@ static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
 
     while (1) {
         /*
-         * Coroutine doesn't start until process_incoming_migration()
+         * Coroutine doesn't start until migration_fd_process_incoming()
          * so don't yield unless we know we're running inside of a coroutine.
          */
         if (rdma->migration_started_on_destination) {
@@ -1668,6 +1677,9 @@ static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
                 ", but got: %s (%d), length: %d",
                 control_desc[expecting], expecting,
                 control_desc[head->type], head->type, head->len);
+        if (head->type == RDMA_CONTROL_ERROR) {
+            rdma->received_error = true;
+        }
         return -EIO;
     }
     if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
@@ -1926,10 +1938,7 @@ retry:
              * memset() + madvise() the entire chunk without RDMA.
              */
 
-            if (can_use_buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr,
-                                                   length)
-                   && buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr,
-                                                    length) == length) {
+            if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
                 RDMACompress comp = {
                                         .offset = current_addr,
                                         .value = 0,
@@ -2197,7 +2206,7 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
     int ret, idx;
 
     if (rdma->cm_id && rdma->connected) {
-        if (rdma->error_state) {
+        if (rdma->error_state && !rdma->received_error) {
             RDMAControlHeader head = { .len = 0,
                                        .type = RDMA_CONTROL_ERROR,
                                        .repeat = 1,
@@ -2518,15 +2527,19 @@ static void *qemu_rdma_data_init(const char *host_port, Error **errp)
  * SEND messages for control only.
  * VM's ram is handled with regular RDMA messages.
  */
-static ssize_t qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
-                                    int64_t pos, size_t size)
-{
-    QEMUFileRDMA *r = opaque;
-    QEMUFile *f = r->file;
-    RDMAContext *rdma = r->rdma;
-    size_t remaining = size;
-    uint8_t * data = (void *) buf;
+static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
+                                       const struct iovec *iov,
+                                       size_t niov,
+                                       int *fds,
+                                       size_t nfds,
+                                       Error **errp)
+{
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+    QEMUFile *f = rioc->file;
+    RDMAContext *rdma = rioc->rdma;
     int ret;
+    ssize_t done = 0;
+    size_t i;
 
     CHECK_ERROR_STATE();
 
@@ -2540,27 +2553,31 @@ static ssize_t qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
         return ret;
     }
 
-    while (remaining) {
-        RDMAControlHeader head;
+    for (i = 0; i < niov; i++) {
+        size_t remaining = iov[i].iov_len;
+        uint8_t * data = (void *)iov[i].iov_base;
+        while (remaining) {
+            RDMAControlHeader head;
 
-        r->len = MIN(remaining, RDMA_SEND_INCREMENT);
-        remaining -= r->len;
+            rioc->len = MIN(remaining, RDMA_SEND_INCREMENT);
+            remaining -= rioc->len;
 
-        /* Guaranteed to fit due to RDMA_SEND_INCREMENT MIN above */
-        head.len = (uint32_t)r->len;
-        head.type = RDMA_CONTROL_QEMU_FILE;
+            head.len = rioc->len;
+            head.type = RDMA_CONTROL_QEMU_FILE;
 
-        ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
+            ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
 
-        if (ret < 0) {
-            rdma->error_state = ret;
-            return ret;
-        }
+            if (ret < 0) {
+                rdma->error_state = ret;
+                return ret;
+            }
 
-        data += r->len;
+            data += rioc->len;
+            done += rioc->len;
+        }
     }
 
-    return size;
+    return done;
 }
 
 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
@@ -2585,41 +2602,74 @@ static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
  * RDMA links don't use bytestreams, so we have to
  * return bytes to QEMUFile opportunistically.
  */
-static ssize_t qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
-                                    int64_t pos, size_t size)
-{
-    QEMUFileRDMA *r = opaque;
-    RDMAContext *rdma = r->rdma;
+static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
+                                      const struct iovec *iov,
+                                      size_t niov,
+                                      int **fds,
+                                      size_t *nfds,
+                                      Error **errp)
+{
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+    RDMAContext *rdma = rioc->rdma;
     RDMAControlHeader head;
     int ret = 0;
+    ssize_t i;
+    size_t done = 0;
 
     CHECK_ERROR_STATE();
 
-    /*
-     * First, we hold on to the last SEND message we
-     * were given and dish out the bytes until we run
-     * out of bytes.
-     */
-    r->len = qemu_rdma_fill(r->rdma, buf, size, 0);
-    if (r->len) {
-        return r->len;
-    }
+    for (i = 0; i < niov; i++) {
+        size_t want = iov[i].iov_len;
+        uint8_t *data = (void *)iov[i].iov_base;
 
-    /*
-     * Once we run out, we block and wait for another
-     * SEND message to arrive.
-     */
-    ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
+        /*
+         * First, we hold on to the last SEND message we
+         * were given and dish out the bytes until we run
+         * out of bytes.
+         */
+        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
+        done += ret;
+        want -= ret;
+        /* Got what we needed, so go to next iovec */
+        if (want == 0) {
+            continue;
+        }
 
-    if (ret < 0) {
-        rdma->error_state = ret;
-        return ret;
-    }
+        /* If we got any data so far, then don't wait
+         * for more, just return what we have */
+        if (done > 0) {
+            break;
+        }
 
-    /*
-     * SEND was received with new bytes, now try again.
-     */
-    return qemu_rdma_fill(r->rdma, buf, size, 0);
+
+        /* We've got nothing at all, so lets wait for
+         * more to arrive
+         */
+        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
+
+        if (ret < 0) {
+            rdma->error_state = ret;
+            return ret;
+        }
+
+        /*
+         * SEND was received with new bytes, now try again.
+         */
+        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
+        done += ret;
+        want -= ret;
+
+        /* Still didn't get enough, so lets just return */
+        if (want) {
+            if (done == 0) {
+                return QIO_CHANNEL_ERR_BLOCK;
+            } else {
+                break;
+            }
+        }
+    }
+    rioc->len = done;
+    return rioc->len;
 }
 
 /*
@@ -2646,15 +2696,125 @@ static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
     return 0;
 }
 
-static int qemu_rdma_close(void *opaque)
+
+static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
+                                         bool blocking,
+                                         Error **errp)
+{
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+    /* XXX we should make readv/writev actually honour this :-) */
+    rioc->blocking = blocking;
+    return 0;
+}
+
+
+typedef struct QIOChannelRDMASource QIOChannelRDMASource;
+struct QIOChannelRDMASource {
+    GSource parent;
+    QIOChannelRDMA *rioc;
+    GIOCondition condition;
+};
+
+static gboolean
+qio_channel_rdma_source_prepare(GSource *source,
+                                gint *timeout)
+{
+    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
+    RDMAContext *rdma = rsource->rioc->rdma;
+    GIOCondition cond = 0;
+    *timeout = -1;
+
+    if (rdma->wr_data[0].control_len) {
+        cond |= G_IO_IN;
+    }
+    cond |= G_IO_OUT;
+
+    return cond & rsource->condition;
+}
+
+static gboolean
+qio_channel_rdma_source_check(GSource *source)
+{
+    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
+    RDMAContext *rdma = rsource->rioc->rdma;
+    GIOCondition cond = 0;
+
+    if (rdma->wr_data[0].control_len) {
+        cond |= G_IO_IN;
+    }
+    cond |= G_IO_OUT;
+
+    return cond & rsource->condition;
+}
+
+static gboolean
+qio_channel_rdma_source_dispatch(GSource *source,
+                                 GSourceFunc callback,
+                                 gpointer user_data)
+{
+    QIOChannelFunc func = (QIOChannelFunc)callback;
+    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
+    RDMAContext *rdma = rsource->rioc->rdma;
+    GIOCondition cond = 0;
+
+    if (rdma->wr_data[0].control_len) {
+        cond |= G_IO_IN;
+    }
+    cond |= G_IO_OUT;
+
+    return (*func)(QIO_CHANNEL(rsource->rioc),
+                   (cond & rsource->condition),
+                   user_data);
+}
+
+static void
+qio_channel_rdma_source_finalize(GSource *source)
+{
+    QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
+
+    object_unref(OBJECT(ssource->rioc));
+}
+
+GSourceFuncs qio_channel_rdma_source_funcs = {
+    qio_channel_rdma_source_prepare,
+    qio_channel_rdma_source_check,
+    qio_channel_rdma_source_dispatch,
+    qio_channel_rdma_source_finalize
+};
+
+static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
+                                              GIOCondition condition)
 {
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+    QIOChannelRDMASource *ssource;
+    GSource *source;
+
+    source = g_source_new(&qio_channel_rdma_source_funcs,
+                          sizeof(QIOChannelRDMASource));
+    ssource = (QIOChannelRDMASource *)source;
+
+    ssource->rioc = rioc;
+    object_ref(OBJECT(rioc));
+
+    ssource->condition = condition;
+
+    return source;
+}
+
+
+static int qio_channel_rdma_close(QIOChannel *ioc,
+                                  Error **errp)
+{
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
     trace_qemu_rdma_close();
-    QEMUFileRDMA *r = opaque;
-    if (r->rdma) {
-        qemu_rdma_cleanup(r->rdma);
-        g_free(r->rdma);
+    if (rioc->rdma) {
+        if (!rioc->rdma->error_state) {
+            rioc->rdma->error_state = qemu_file_get_error(rioc->file);
+        }
+        qemu_rdma_cleanup(rioc->rdma);
+        g_free(rioc->rdma);
+        rioc->rdma = NULL;
     }
-    g_free(r);
     return 0;
 }
 
@@ -2696,8 +2856,8 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
                                   ram_addr_t block_offset, ram_addr_t offset,
                                   size_t size, uint64_t *bytes_sent)
 {
-    QEMUFileRDMA *rfile = opaque;
-    RDMAContext *rdma = rfile->rdma;
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+    RDMAContext *rdma = rioc->rdma;
     int ret;
 
     CHECK_ERROR_STATE();
@@ -2951,8 +3111,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
                              };
     RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
                                  .repeat = 1 };
-    QEMUFileRDMA *rfile = opaque;
-    RDMAContext *rdma = rfile->rdma;
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+    RDMAContext *rdma = rioc->rdma;
     RDMALocalBlocks *local = &rdma->local_ram_blocks;
     RDMAControlHeader head;
     RDMARegister *reg, *registers;
@@ -3207,9 +3367,10 @@ out:
  * We've already built our local RAMBlock list, but not yet sent the list to
  * the source.
  */
-static int rdma_block_notification_handle(QEMUFileRDMA *rfile, const char *name)
+static int
+rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
 {
-    RDMAContext *rdma = rfile->rdma;
+    RDMAContext *rdma = rioc->rdma;
     int curr;
     int found = -1;
 
@@ -3251,8 +3412,8 @@ static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
                                         uint64_t flags, void *data)
 {
-    QEMUFileRDMA *rfile = opaque;
-    RDMAContext *rdma = rfile->rdma;
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+    RDMAContext *rdma = rioc->rdma;
 
     CHECK_ERROR_STATE();
 
@@ -3271,8 +3432,8 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
                                        uint64_t flags, void *data)
 {
     Error *local_err = NULL, **errp = &local_err;
-    QEMUFileRDMA *rfile = opaque;
-    RDMAContext *rdma = rfile->rdma;
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+    RDMAContext *rdma = rioc->rdma;
     RDMAControlHeader head = { .len = 0, .repeat = 1 };
     int ret = 0;
 
@@ -3368,47 +3529,74 @@ err:
     return ret;
 }
 
-static int qemu_rdma_get_fd(void *opaque)
-{
-    QEMUFileRDMA *rfile = opaque;
-    RDMAContext *rdma = rfile->rdma;
-
-    return rdma->comp_channel->fd;
-}
-
-static const QEMUFileOps rdma_read_ops = {
-    .get_buffer    = qemu_rdma_get_buffer,
-    .get_fd        = qemu_rdma_get_fd,
-    .close         = qemu_rdma_close,
+static const QEMUFileHooks rdma_read_hooks = {
     .hook_ram_load = rdma_load_hook,
 };
 
-static const QEMUFileOps rdma_write_ops = {
-    .put_buffer         = qemu_rdma_put_buffer,
-    .close              = qemu_rdma_close,
+static const QEMUFileHooks rdma_write_hooks = {
     .before_ram_iterate = qemu_rdma_registration_start,
     .after_ram_iterate  = qemu_rdma_registration_stop,
     .save_page          = qemu_rdma_save_page,
 };
 
-static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
+
+static void qio_channel_rdma_finalize(Object *obj)
 {
-    QEMUFileRDMA *r;
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
+    if (rioc->rdma) {
+        qemu_rdma_cleanup(rioc->rdma);
+        g_free(rioc->rdma);
+        rioc->rdma = NULL;
+    }
+}
+
+static void qio_channel_rdma_class_init(ObjectClass *klass,
+                                        void *class_data G_GNUC_UNUSED)
+{
+    QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
+
+    ioc_klass->io_writev = qio_channel_rdma_writev;
+    ioc_klass->io_readv = qio_channel_rdma_readv;
+    ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
+    ioc_klass->io_close = qio_channel_rdma_close;
+    ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
+}
+
+static const TypeInfo qio_channel_rdma_info = {
+    .parent = TYPE_QIO_CHANNEL,
+    .name = TYPE_QIO_CHANNEL_RDMA,
+    .instance_size = sizeof(QIOChannelRDMA),
+    .instance_finalize = qio_channel_rdma_finalize,
+    .class_init = qio_channel_rdma_class_init,
+};
+
+static void qio_channel_rdma_register_types(void)
+{
+    type_register_static(&qio_channel_rdma_info);
+}
+
+type_init(qio_channel_rdma_register_types);
+
+static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
+{
+    QIOChannelRDMA *rioc;
 
     if (qemu_file_mode_is_not_valid(mode)) {
         return NULL;
     }
 
-    r = g_new0(QEMUFileRDMA, 1);
-    r->rdma = rdma;
+    rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
+    rioc->rdma = rdma;
 
     if (mode[0] == 'w') {
-        r->file = qemu_fopen_ops(r, &rdma_write_ops);
+        rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
+        qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
     } else {
-        r->file = qemu_fopen_ops(r, &rdma_read_ops);
+        rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
+        qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
     }
 
-    return r->file;
+    return rioc->file;
 }
 
 static void rdma_accept_incoming_migration(void *opaque)
@@ -3436,7 +3624,7 @@ static void rdma_accept_incoming_migration(void *opaque)
     }
 
     rdma->migration_started_on_destination = 1;
-    process_incoming_migration(f);
+    migration_fd_process_incoming(f);
 }
 
 void rdma_start_incoming_migration(const char *host_port, Error **errp)
@@ -3481,16 +3669,14 @@ void rdma_start_outgoing_migration(void *opaque,
                             const char *host_port, Error **errp)
 {
     MigrationState *s = opaque;
-    Error *local_err = NULL, **temp = &local_err;
-    RDMAContext *rdma = qemu_rdma_data_init(host_port, &local_err);
+    RDMAContext *rdma = qemu_rdma_data_init(host_port, errp);
     int ret = 0;
 
     if (rdma == NULL) {
-        ERROR(temp, "Failed to initialize RDMA data structures! %d", ret);
         goto err;
     }
 
-    ret = qemu_rdma_source_init(rdma, &local_err,
+    ret = qemu_rdma_source_init(rdma, errp,
         s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL]);
 
     if (ret) {
@@ -3498,7 +3684,7 @@ void rdma_start_outgoing_migration(void *opaque,
     }
 
     trace_rdma_start_outgoing_migration_after_rdma_source_init();
-    ret = qemu_rdma_connect(rdma, &local_err);
+    ret = qemu_rdma_connect(rdma, errp);
 
     if (ret) {
         goto err;
@@ -3510,7 +3696,5 @@ void rdma_start_outgoing_migration(void *opaque,
     migrate_fd_connect(s);
     return;
 err:
-    error_propagate(errp, local_err);
     g_free(rdma);
-    migrate_fd_error(s);
 }