Merge tag 'for-upstream' of https://gitlab.com/bonzini/qemu into staging
diff --git a/migration/multifd.c b/migration/multifd.c
index cac8496edc795842ce3212f0b38b9deb3926a434..25cbc6dc6be83cc089b00c03e491ed3c1c5eaafa 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
 #include "qapi/error.h"
 #include "ram.h"
 #include "migration.h"
+#include "migration-stats.h"
 #include "socket.h"
 #include "tls.h"
 #include "qemu-file.h"
 #include "trace.h"
 #include "multifd.h"
 #include "threadinfo.h"
-
+#include "options.h"
 #include "qemu/yank.h"
 #include "io/channel-socket.h"
 #include "yank_functions.h"
@@ -174,6 +175,7 @@ void multifd_register_ops(int method, MultiFDMethods *ops)
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
 {
     MultiFDInit_t msg = {};
+    size_t size = sizeof(msg);
     int ret;
 
     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
@@ -181,10 +183,11 @@ static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
     msg.id = p->id;
     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
 
-    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
+    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
     if (ret != 0) {
         return -1;
     }
+    stat64_add(&mig_stats.multifd_bytes, size);
     return 0;
 }
 
@@ -225,20 +228,20 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
     }
 
     if (msg.id > migrate_multifd_channels()) {
-        error_setg(errp, "multifd: received channel version %u "
-                   "expected %u", msg.version, MULTIFD_VERSION);
+        error_setg(errp, "multifd: received channel id %u is greater than "
+                   "number of channels %u", msg.id, migrate_multifd_channels());
         return -1;
     }
 
     return msg.id;
 }
 
-static MultiFDPages_t *multifd_pages_init(size_t size)
+static MultiFDPages_t *multifd_pages_init(uint32_t n)
 {
     MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
 
-    pages->allocated = size;
-    pages->offset = g_new0(ram_addr_t, size);
+    pages->allocated = n;
+    pages->offset = g_new0(ram_addr_t, n);
 
     return pages;
 }
@@ -247,7 +250,6 @@ static void multifd_pages_clear(MultiFDPages_t *pages)
 {
     pages->num = 0;
     pages->allocated = 0;
-    pages->packet_num = 0;
     pages->block = NULL;
     g_free(pages->offset);
     pages->offset = NULL;
@@ -280,7 +282,6 @@ static void multifd_send_fill_packet(MultiFDSendParams *p)
 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
 {
     MultiFDPacket_t *packet = p->packet;
-    RAMBlock *block;
     int i;
 
     packet->magic = be32_to_cpu(packet->magic);
@@ -330,21 +331,21 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
 
     /* make sure that ramblock is 0 terminated */
     packet->ramblock[255] = 0;
-    block = qemu_ram_block_by_name(packet->ramblock);
-    if (!block) {
+    p->block = qemu_ram_block_by_name(packet->ramblock);
+    if (!p->block) {
         error_setg(errp, "multifd: unknown ram block %s",
                    packet->ramblock);
         return -1;
     }
 
-    p->host = block->host;
+    p->host = p->block->host;
     for (i = 0; i < p->normal_num; i++) {
         uint64_t offset = be64_to_cpu(packet->offset[i]);
 
-        if (offset > (block->used_length - p->page_size)) {
+        if (offset > (p->block->used_length - p->page_size)) {
             error_setg(errp, "multifd: offset too long %" PRIu64
                        " (max " RAM_ADDR_FMT ")",
-                       offset, block->used_length);
+                       offset, p->block->used_length);
             return -1;
         }
         p->normal[i] = offset;
@@ -389,13 +390,12 @@ struct {
  * false.
  */
 
-static int multifd_send_pages(QEMUFile *f)
+static int multifd_send_pages(void)
 {
     int i;
     static int next_channel;
     MultiFDSendParams *p = NULL; /* make happy gcc */
     MultiFDPages_t *pages = multifd_send_state->pages;
-    uint64_t transferred;
 
     if (qatomic_read(&multifd_send_state->exiting)) {
         return -1;
@@ -430,17 +430,13 @@ static int multifd_send_pages(QEMUFile *f)
     p->packet_num = multifd_send_state->packet_num++;
     multifd_send_state->pages = p->pages;
     p->pages = pages;
-    transferred = ((uint64_t) pages->num) * p->page_size + p->packet_len;
-    qemu_file_acct_rate_limit(f, transferred);
-    ram_counters.multifd_bytes += transferred;
-    stat64_add(&ram_atomic_counters.transferred, transferred);
     qemu_mutex_unlock(&p->mutex);
     qemu_sem_post(&p->sem);
 
     return 1;
 }
 
-int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
+int multifd_queue_page(RAMBlock *block, ram_addr_t offset)
 {
     MultiFDPages_t *pages = multifd_send_state->pages;
     bool changed = false;
@@ -460,12 +456,12 @@ int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
         changed = true;
     }
 
-    if (multifd_send_pages(f) < 0) {
+    if (multifd_send_pages() < 0) {
         return -1;
     }
 
     if (changed) {
-        return multifd_queue_page(f, block, offset);
+        return multifd_queue_page(block, offset);
     }
 
     return 1;
@@ -512,11 +508,16 @@ static void multifd_send_terminate_threads(Error *err)
     }
 }
 
+static int multifd_send_channel_destroy(QIOChannel *send)
+{
+    return socket_send_channel_destroy(send);
+}
+
 void multifd_save_cleanup(void)
 {
     int i;
 
-    if (!migrate_use_multifd()) {
+    if (!migrate_multifd()) {
         return;
     }
     multifd_send_terminate_threads(NULL);
@@ -534,7 +535,7 @@ void multifd_save_cleanup(void)
         if (p->registered_yank) {
             migration_ioc_unregister_yank(p->c);
         }
-        socket_send_channel_destroy(p->c);
+        multifd_send_channel_destroy(p->c);
         p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
@@ -576,22 +577,22 @@ static int multifd_zero_copy_flush(QIOChannel *c)
         return -1;
     }
     if (ret == 1) {
-        dirty_sync_missed_zero_copy();
+        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
     }
 
     return ret;
 }
 
-int multifd_send_sync_main(QEMUFile *f)
+int multifd_send_sync_main(void)
 {
     int i;
     bool flush_zero_copy;
 
-    if (!migrate_use_multifd()) {
+    if (!migrate_multifd()) {
         return 0;
     }
     if (multifd_send_state->pages->num) {
-        if (multifd_send_pages(f) < 0) {
+        if (multifd_send_pages() < 0) {
             error_report("%s: multifd_send_pages fail", __func__);
             return -1;
         }
@@ -608,7 +609,7 @@ int multifd_send_sync_main(QEMUFile *f)
      * all the dirty bitmaps.
      */
 
-    flush_zero_copy = migrate_use_zero_copy_send();
+    flush_zero_copy = migrate_zero_copy_send();
 
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
@@ -626,15 +627,13 @@ int multifd_send_sync_main(QEMUFile *f)
         p->packet_num = multifd_send_state->packet_num++;
         p->flags |= MULTIFD_FLAG_SYNC;
         p->pending_job++;
-        qemu_file_acct_rate_limit(f, p->packet_len);
-        ram_counters.multifd_bytes += p->packet_len;
-        stat64_add(&ram_atomic_counters.transferred, p->packet_len);
         qemu_mutex_unlock(&p->mutex);
         qemu_sem_post(&p->sem);
     }
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
+        qemu_sem_wait(&multifd_send_state->channels_ready);
         trace_multifd_send_sync_main_wait(p->id);
         qemu_sem_wait(&p->sem_sync);
 
@@ -653,9 +652,9 @@ static void *multifd_send_thread(void *opaque)
     MigrationThread *thread = NULL;
     Error *local_err = NULL;
     int ret = 0;
-    bool use_zero_copy_send = migrate_use_zero_copy_send();
+    bool use_zero_copy_send = migrate_zero_copy_send();
 
-    thread = MigrationThreadAdd(p->name, qemu_get_thread_id());
+    thread = migration_threads_add(p->name, qemu_get_thread_id());
 
     trace_multifd_send_thread_start(p->id);
     rcu_register_thread();
@@ -668,6 +667,7 @@ static void *multifd_send_thread(void *opaque)
     p->num_packets = 1;
 
     while (true) {
+        qemu_sem_post(&multifd_send_state->channels_ready);
         qemu_sem_wait(&p->sem);
 
         if (qatomic_read(&multifd_send_state->exiting)) {
@@ -677,7 +677,7 @@ static void *multifd_send_thread(void *opaque)
 
         if (p->pending_job) {
             uint64_t packet_num = p->packet_num;
-            uint32_t flags = p->flags;
+            uint32_t flags;
             p->normal_num = 0;
 
             if (use_zero_copy_send) {
@@ -699,6 +699,7 @@ static void *multifd_send_thread(void *opaque)
                 }
             }
             multifd_send_fill_packet(p);
+            flags = p->flags;
             p->flags = 0;
             p->num_packets++;
             p->total_normal_pages += p->normal_num;
@@ -728,6 +729,9 @@ static void *multifd_send_thread(void *opaque)
                 break;
             }
 
+            stat64_add(&mig_stats.multifd_bytes,
+                       p->next_packet_size + p->packet_len);
+            p->next_packet_size = 0;
             qemu_mutex_lock(&p->mutex);
             p->pending_job--;
             qemu_mutex_unlock(&p->mutex);
@@ -735,10 +739,6 @@ static void *multifd_send_thread(void *opaque)
             if (flags & MULTIFD_FLAG_SYNC) {
                 qemu_sem_post(&p->sem_sync);
             }
-            qemu_sem_post(&multifd_send_state->channels_ready);
-        } else if (p->quit) {
-            qemu_mutex_unlock(&p->mutex);
-            break;
         } else {
             qemu_mutex_unlock(&p->mutex);
             /* sometimes there are spurious wakeups */
@@ -746,19 +746,13 @@ static void *multifd_send_thread(void *opaque)
     }
 
 out:
-    if (local_err) {
+    if (ret) {
+        assert(local_err);
         trace_multifd_send_error(p->id);
         multifd_send_terminate_threads(local_err);
-        error_free(local_err);
-    }
-
-    /*
-     * Error happen, I will exit, but I can't just leave, tell
-     * who pay attention to me.
-     */
-    if (ret != 0) {
         qemu_sem_post(&p->sem_sync);
         qemu_sem_post(&multifd_send_state->channels_ready);
+        error_free(local_err);
     }
 
     qemu_mutex_lock(&p->mutex);
@@ -766,7 +760,7 @@ out:
     qemu_mutex_unlock(&p->mutex);
 
     rcu_unregister_thread();
-    MigrationThreadDel(thread);
+    migration_threads_remove(thread);
     trace_multifd_send_thread_end(p->id, p->num_packets, p->total_normal_pages);
 
     return NULL;
@@ -774,7 +768,7 @@ out:
 
 static bool multifd_channel_connect(MultiFDSendParams *p,
                                     QIOChannel *ioc,
-                                    Error *error);
+                                    Error **errp);
 
 static void multifd_tls_outgoing_handshake(QIOTask *task,
                                            gpointer opaque)
@@ -783,21 +777,24 @@ static void multifd_tls_outgoing_handshake(QIOTask *task,
     QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
     Error *err = NULL;
 
-    if (qio_task_propagate_error(task, &err)) {
-        trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
-    } else {
+    if (!qio_task_propagate_error(task, &err)) {
         trace_multifd_tls_outgoing_handshake_complete(ioc);
+        if (multifd_channel_connect(p, ioc, &err)) {
+            return;
+        }
     }
 
-    if (!multifd_channel_connect(p, ioc, err)) {
-        /*
-         * Error happen, mark multifd_send_thread status as 'quit' although it
-         * is not created, and then tell who pay attention to me.
-         */
-        p->quit = true;
-        qemu_sem_post(&multifd_send_state->channels_ready);
-        qemu_sem_post(&p->sem_sync);
-    }
+    trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
+
+    migrate_set_error(migrate_get_current(), err);
+    /*
+     * Error happen, mark multifd_send_thread status as 'quit' although it
+     * is not created, and then tell who pay attention to me.
+     */
+    p->quit = true;
+    qemu_sem_post(&multifd_send_state->channels_ready);
+    qemu_sem_post(&p->sem_sync);
+    error_free(err);
 }
 
 static void *multifd_tls_handshake_thread(void *opaque)
@@ -813,7 +810,7 @@ static void *multifd_tls_handshake_thread(void *opaque)
     return NULL;
 }
 
-static void multifd_tls_channel_connect(MultiFDSendParams *p,
+static bool multifd_tls_channel_connect(MultiFDSendParams *p,
                                         QIOChannel *ioc,
                                         Error **errp)
 {
@@ -821,9 +818,9 @@ static void multifd_tls_channel_connect(MultiFDSendParams *p,
     const char *hostname = s->hostname;
     QIOChannelTLS *tioc;
 
-    tioc = migration_tls_client_create(s, ioc, hostname, errp);
+    tioc = migration_tls_client_create(ioc, hostname, errp);
     if (!tioc) {
-        return;
+        return false;
     }
 
     object_unref(OBJECT(ioc));
@@ -833,38 +830,31 @@ static void multifd_tls_channel_connect(MultiFDSendParams *p,
     qemu_thread_create(&p->thread, "multifd-tls-handshake-worker",
                        multifd_tls_handshake_thread, p,
                        QEMU_THREAD_JOINABLE);
+    return true;
 }
 
 static bool multifd_channel_connect(MultiFDSendParams *p,
                                     QIOChannel *ioc,
-                                    Error *error)
+                                    Error **errp)
 {
     trace_multifd_set_outgoing_channel(
         ioc, object_get_typename(OBJECT(ioc)),
-        migrate_get_current()->hostname, error);
+        migrate_get_current()->hostname);
 
-    if (error) {
-        return false;
-    }
     if (migrate_channel_requires_tls_upgrade(ioc)) {
-        multifd_tls_channel_connect(p, ioc, &error);
-        if (!error) {
-            /*
-             * tls_channel_connect will call back to this
-             * function after the TLS handshake,
-             * so we mustn't call multifd_send_thread until then
-             */
-            return true;
-        } else {
-            return false;
-        }
-    } else {
-        migration_ioc_register_yank(ioc);
-        p->registered_yank = true;
-        p->c = ioc;
-        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
-                           QEMU_THREAD_JOINABLE);
+        /*
+         * tls_channel_connect will call back to this
+         * function after the TLS handshake,
+         * so we mustn't call multifd_send_thread until then
+         */
+        return multifd_tls_channel_connect(p, ioc, errp);
     }
+
+    migration_ioc_register_yank(ioc);
+    p->registered_yank = true;
+    p->c = ioc;
+    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
+                       QEMU_THREAD_JOINABLE);
     return true;
 }
 
@@ -877,7 +867,7 @@ static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
      qemu_sem_post(&p->sem_sync);
      /*
       * Although multifd_send_thread is not created, but main migration
-      * thread neet to judge whether it is running, so we need to mark
+      * thread need to judge whether it is running, so we need to mark
       * its status.
       */
      p->quit = true;
@@ -888,20 +878,25 @@ static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
 {
     MultiFDSendParams *p = opaque;
-    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
+    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
     Error *local_err = NULL;
 
     trace_multifd_new_send_channel_async(p->id);
     if (!qio_task_propagate_error(task, &local_err)) {
-        p->c = QIO_CHANNEL(sioc);
-        qio_channel_set_delay(p->c, false);
+        qio_channel_set_delay(ioc, false);
         p->running = true;
-        if (multifd_channel_connect(p, sioc, local_err)) {
+        if (multifd_channel_connect(p, ioc, &local_err)) {
             return;
         }
     }
 
-    multifd_new_send_channel_cleanup(p, sioc, local_err);
+    trace_multifd_new_send_channel_async_error(p->id, local_err);
+    multifd_new_send_channel_cleanup(p, ioc, local_err);
+}
+
+static void multifd_new_send_channel_create(gpointer opaque)
+{
+    socket_send_channel_create(multifd_new_send_channel_async, opaque);
 }
 
 int multifd_save_setup(Error **errp)
@@ -910,7 +905,7 @@ int multifd_save_setup(Error **errp)
     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
     uint8_t i;
 
-    if (!migrate_use_multifd()) {
+    if (!migrate_multifd()) {
         return 0;
     }
 
@@ -944,23 +939,21 @@ int multifd_save_setup(Error **errp)
         p->page_size = qemu_target_page_size();
         p->page_count = page_count;
 
-        if (migrate_use_zero_copy_send()) {
+        if (migrate_zero_copy_send()) {
             p->write_flags = QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
         } else {
             p->write_flags = 0;
         }
 
-        socket_send_channel_create(multifd_new_send_channel_async, p);
+        multifd_new_send_channel_create(p);
     }
 
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
-        Error *local_err = NULL;
         int ret;
 
-        ret = multifd_send_state->ops->send_setup(p, &local_err);
+        ret = multifd_send_state->ops->send_setup(p, errp);
         if (ret) {
-            error_propagate(errp, local_err);
             return ret;
         }
     }
@@ -1013,11 +1006,18 @@ static void multifd_recv_terminate_threads(Error *err)
     }
 }
 
+void multifd_load_shutdown(void)
+{
+    if (migrate_multifd()) {
+        multifd_recv_terminate_threads(NULL);
+    }
+}
+
 void multifd_load_cleanup(void)
 {
     int i;
 
-    if (!migrate_use_multifd()) {
+    if (!migrate_multifd()) {
         return;
     }
     multifd_recv_terminate_threads(NULL);
@@ -1025,14 +1025,14 @@ void multifd_load_cleanup(void)
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         if (p->running) {
-            p->quit = true;
             /*
              * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
              * however try to wakeup it without harm in cleanup phase.
              */
             qemu_sem_post(&p->sem_sync);
-            qemu_thread_join(&p->thread);
         }
+
+        qemu_thread_join(&p->thread);
     }
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
@@ -1064,7 +1064,7 @@ void multifd_recv_sync_main(void)
 {
     int i;
 
-    if (!migrate_use_multifd()) {
+    if (!migrate_multifd()) {
         return;
     }
     for (i = 0; i < migrate_multifd_channels(); i++) {
@@ -1162,7 +1162,7 @@ int multifd_load_setup(Error **errp)
      * Return successfully if multiFD recv state is already initialised
      * or multiFD is not enabled.
      */
-    if (multifd_recv_state || !migrate_use_multifd()) {
+    if (multifd_recv_state || !migrate_multifd()) {
         return 0;
     }
 
@@ -1192,12 +1192,10 @@ int multifd_load_setup(Error **errp)
 
     for (i = 0; i < thread_count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
-        Error *local_err = NULL;
         int ret;
 
-        ret = multifd_recv_state->ops->recv_setup(p, &local_err);
+        ret = multifd_recv_state->ops->recv_setup(p, errp);
         if (ret) {
-            error_propagate(errp, local_err);
             return ret;
         }
     }
@@ -1208,7 +1206,7 @@ bool multifd_recv_all_channels_created(void)
 {
     int thread_count = migrate_multifd_channels();
 
-    if (!migrate_use_multifd()) {
+    if (!migrate_multifd()) {
         return true;
     }