]> git.proxmox.com Git - mirror_qemu.git/blobdiff - migration/migration.c
Add migration-capability boolean for postcopy-ram.
[mirror_qemu.git] / migration / migration.c
index b092f386b4ebd07256b1ab2cf0f47678bbf89035..f849f890d91c3506558e3c98c10d3421d266b2d5 100644 (file)
@@ -95,14 +95,17 @@ MigrationIncomingState *migration_incoming_get_current(void)
 MigrationIncomingState *migration_incoming_state_new(QEMUFile* f)
 {
     mis_current = g_new0(MigrationIncomingState, 1);
-    mis_current->file = f;
+    mis_current->from_src_file = f;
     QLIST_INIT(&mis_current->loadvm_handlers);
+    qemu_mutex_init(&mis_current->rp_mutex);
+    qemu_event_init(&mis_current->main_thread_load_event, false);
 
     return mis_current;
 }
 
 void migration_incoming_state_destroy(void)
 {
+    qemu_event_destroy(&mis_current->main_thread_load_event);
     loadvm_free_handlers(mis_current);
     g_free(mis_current);
     mis_current = NULL;
@@ -344,6 +347,50 @@ void process_incoming_migration(QEMUFile *f)
     qemu_coroutine_enter(co, f);
 }
 
+/*
+ * Send a message on the return channel back to the source
+ * of the migration.
+ */
+void migrate_send_rp_message(MigrationIncomingState *mis,
+                             enum mig_rp_message_type message_type,
+                             uint16_t len, void *data)
+{
+    trace_migrate_send_rp_message((int)message_type, len);
+    qemu_mutex_lock(&mis->rp_mutex);
+    qemu_put_be16(mis->to_src_file, (unsigned int)message_type);
+    qemu_put_be16(mis->to_src_file, len);
+    qemu_put_buffer(mis->to_src_file, data, len);
+    qemu_fflush(mis->to_src_file);
+    qemu_mutex_unlock(&mis->rp_mutex);
+}
+
+/*
+ * Send a 'SHUT' message on the return channel with the given value
+ * to indicate that we've finished with the RP.  Non-0 value indicates
+ * error.
+ */
+void migrate_send_rp_shut(MigrationIncomingState *mis,
+                          uint32_t value)
+{
+    uint32_t buf;
+
+    buf = cpu_to_be32(value);
+    migrate_send_rp_message(mis, MIG_RP_MSG_SHUT, sizeof(buf), &buf);
+}
+
+/*
+ * Send a 'PONG' message on the return channel with the given value
+ * (normally in response to a 'PING')
+ */
+void migrate_send_rp_pong(MigrationIncomingState *mis,
+                          uint32_t value)
+{
+    uint32_t buf;
+
+    buf = cpu_to_be32(value);
+    migrate_send_rp_message(mis, MIG_RP_MSG_PONG, sizeof(buf), &buf);
+}
+
 /* amount of nanoseconds we are willing to wait for migration to be down.
  * the choice of nanoseconds is because it is the maximum resolution that
  * get_clock() can achieve. It is an internal measure. All user-visible
@@ -399,6 +446,23 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
     return params;
 }
 
+/*
+ * Return true if we're already in the middle of a migration
+ * (i.e. any of the active or setup states)
+ */
+static bool migration_is_setup_or_active(int state)
+{
+    switch (state) {
+    case MIGRATION_STATUS_ACTIVE:
+    case MIGRATION_STATUS_SETUP:
+        return true;
+
+    default:
+        return false;
+
+    }
+}
+
 static void get_xbzrle_cache_stats(MigrationInfo *info)
 {
     if (migrate_use_xbzrle()) {
@@ -506,8 +570,7 @@ void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
     MigrationState *s = migrate_get_current();
     MigrationCapabilityStatusList *cap;
 
-    if (s->state == MIGRATION_STATUS_ACTIVE ||
-        s->state == MIGRATION_STATUS_SETUP) {
+    if (migration_is_setup_or_active(s->state)) {
         error_setg(errp, QERR_MIGRATION_ACTIVE);
         return;
     }
@@ -515,6 +578,20 @@ void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
     for (cap = params; cap; cap = cap->next) {
         s->enabled_capabilities[cap->value->capability] = cap->value->state;
     }
+
+    if (migrate_postcopy_ram()) {
+        if (migrate_use_compression()) {
+            /* The decompression threads asynchronously write into RAM
+             * rather than use the atomic copies needed to avoid
+             * userfaulting.  It should be possible to fix the decompression
+             * threads for compatibility in future.
+             */
+            error_report("Postcopy is not currently compatible with "
+                         "compression");
+            s->enabled_capabilities[MIGRATION_CAPABILITY_X_POSTCOPY_RAM] =
+                false;
+        }
+    }
 }
 
 void qmp_migrate_set_parameters(bool has_compress_level,
@@ -613,12 +690,9 @@ static void migrate_fd_cleanup(void *opaque)
 
     assert(s->state != MIGRATION_STATUS_ACTIVE);
 
-    if (s->state != MIGRATION_STATUS_COMPLETED) {
-        qemu_savevm_state_cancel();
-        if (s->state == MIGRATION_STATUS_CANCELLING) {
-            migrate_set_state(s, MIGRATION_STATUS_CANCELLING,
-                              MIGRATION_STATUS_CANCELLED);
-        }
+    if (s->state == MIGRATION_STATUS_CANCELLING) {
+        migrate_set_state(s, MIGRATION_STATUS_CANCELLING,
+                          MIGRATION_STATUS_CANCELLED);
     }
 
     notifier_list_notify(&migration_state_notifiers, s);
@@ -638,10 +712,14 @@ static void migrate_fd_cancel(MigrationState *s)
     QEMUFile *f = migrate_get_current()->file;
     trace_migrate_fd_cancel();
 
+    if (s->rp_state.from_dst_file) {
+        /* shutdown the rp socket, so causing the rp thread to shutdown */
+        qemu_file_shutdown(s->rp_state.from_dst_file);
+    }
+
     do {
         old_state = s->state;
-        if (old_state != MIGRATION_STATUS_SETUP &&
-            old_state != MIGRATION_STATUS_ACTIVE) {
+        if (!migration_is_setup_or_active(old_state)) {
             break;
         }
         migrate_set_state(s, old_state, MIGRATION_STATUS_CANCELLING);
@@ -685,7 +763,7 @@ bool migration_has_failed(MigrationState *s)
             s->state == MIGRATION_STATUS_FAILED);
 }
 
-static MigrationState *migrate_init(const MigrationParams *params)
+MigrationState *migrate_init(const MigrationParams *params)
 {
     MigrationState *s = migrate_get_current();
     int64_t bandwidth_limit = s->bandwidth_limit;
@@ -773,8 +851,7 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk,
     params.blk = has_blk && blk;
     params.shared = has_inc && inc;
 
-    if (s->state == MIGRATION_STATUS_ACTIVE ||
-        s->state == MIGRATION_STATUS_SETUP ||
+    if (migration_is_setup_or_active(s->state) ||
         s->state == MIGRATION_STATUS_CANCELLING) {
         error_setg(errp, QERR_MIGRATION_ACTIVE);
         return;
@@ -893,6 +970,15 @@ void qmp_migrate_set_downtime(double value, Error **errp)
     max_downtime = (uint64_t)value;
 }
 
+bool migrate_postcopy_ram(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->enabled_capabilities[MIGRATION_CAPABILITY_X_POSTCOPY_RAM];
+}
+
 bool migrate_auto_converge(void)
 {
     MigrationState *s;
@@ -974,6 +1060,154 @@ int64_t migrate_xbzrle_cache_size(void)
     return s->xbzrle_cache_size;
 }
 
+/* migration thread support */
+/*
+ * Something bad happened to the RP stream, mark an error
+ * The caller shall print or trace something to indicate why
+ */
+static void mark_source_rp_bad(MigrationState *s)
+{
+    s->rp_state.error = true;
+}
+
+static struct rp_cmd_args {
+    ssize_t     len; /* -1 = variable */
+    const char *name;
+} rp_cmd_args[] = {
+    [MIG_RP_MSG_INVALID]        = { .len = -1, .name = "INVALID" },
+    [MIG_RP_MSG_SHUT]           = { .len =  4, .name = "SHUT" },
+    [MIG_RP_MSG_PONG]           = { .len =  4, .name = "PONG" },
+    [MIG_RP_MSG_MAX]            = { .len = -1, .name = "MAX" },
+};
+
+/*
+ * Handles messages sent on the return path towards the source VM
+ *
+ */
+static void *source_return_path_thread(void *opaque)
+{
+    MigrationState *ms = opaque;
+    QEMUFile *rp = ms->rp_state.from_dst_file;
+    uint16_t header_len, header_type;
+    const int max_len = 512;
+    uint8_t buf[max_len];
+    uint32_t tmp32, sibling_error;
+    int res;
+
+    trace_source_return_path_thread_entry();
+    while (!ms->rp_state.error && !qemu_file_get_error(rp) &&
+           migration_is_setup_or_active(ms->state)) {
+        trace_source_return_path_thread_loop_top();
+        header_type = qemu_get_be16(rp);
+        header_len = qemu_get_be16(rp);
+
+        if (header_type >= MIG_RP_MSG_MAX ||
+            header_type == MIG_RP_MSG_INVALID) {
+            error_report("RP: Received invalid message 0x%04x length 0x%04x",
+                    header_type, header_len);
+            mark_source_rp_bad(ms);
+            goto out;
+        }
+
+        if ((rp_cmd_args[header_type].len != -1 &&
+            header_len != rp_cmd_args[header_type].len) ||
+            header_len > max_len) {
+            error_report("RP: Received '%s' message (0x%04x) with"
+                    "incorrect length %d expecting %zu",
+                    rp_cmd_args[header_type].name, header_type, header_len,
+                    (size_t)rp_cmd_args[header_type].len);
+            mark_source_rp_bad(ms);
+            goto out;
+        }
+
+        /* We know we've got a valid header by this point */
+        res = qemu_get_buffer(rp, buf, header_len);
+        if (res != header_len) {
+            error_report("RP: Failed reading data for message 0x%04x"
+                         " read %d expected %d",
+                         header_type, res, header_len);
+            mark_source_rp_bad(ms);
+            goto out;
+        }
+
+        /* OK, we have the message and the data */
+        switch (header_type) {
+        case MIG_RP_MSG_SHUT:
+            sibling_error = be32_to_cpup((uint32_t *)buf);
+            trace_source_return_path_thread_shut(sibling_error);
+            if (sibling_error) {
+                error_report("RP: Sibling indicated error %d", sibling_error);
+                mark_source_rp_bad(ms);
+            }
+            /*
+             * We'll let the main thread deal with closing the RP
+             * we could do a shutdown(2) on it, but we're the only user
+             * anyway, so there's nothing gained.
+             */
+            goto out;
+
+        case MIG_RP_MSG_PONG:
+            tmp32 = be32_to_cpup((uint32_t *)buf);
+            trace_source_return_path_thread_pong(tmp32);
+            break;
+
+        default:
+            break;
+        }
+    }
+    if (rp && qemu_file_get_error(rp)) {
+        trace_source_return_path_thread_bad_end();
+        mark_source_rp_bad(ms);
+    }
+
+    trace_source_return_path_thread_end();
+out:
+    ms->rp_state.from_dst_file = NULL;
+    qemu_fclose(rp);
+    return NULL;
+}
+
+__attribute__ (( unused )) /* Until later in patch series */
+static int open_return_path_on_source(MigrationState *ms)
+{
+
+    ms->rp_state.from_dst_file = qemu_file_get_return_path(ms->file);
+    if (!ms->rp_state.from_dst_file) {
+        return -1;
+    }
+
+    trace_open_return_path_on_source();
+    qemu_thread_create(&ms->rp_state.rp_thread, "return path",
+                       source_return_path_thread, ms, QEMU_THREAD_JOINABLE);
+
+    trace_open_return_path_on_source_continue();
+
+    return 0;
+}
+
+__attribute__ (( unused )) /* Until later in patch series */
+/* Returns 0 if the RP was ok, otherwise there was an error on the RP */
+static int await_return_path_close_on_source(MigrationState *ms)
+{
+    /*
+     * If this is a normal exit then the destination will send a SHUT and the
+     * rp_thread will exit, however if there's an error we need to cause
+     * it to exit.
+     */
+    if (qemu_file_get_error(ms->file) && ms->rp_state.from_dst_file) {
+        /*
+         * shutdown(2), if we have it, will cause it to unblock if it's stuck
+         * waiting for the destination.
+         */
+        qemu_file_shutdown(ms->rp_state.from_dst_file);
+        mark_source_rp_bad(ms);
+    }
+    trace_await_return_path_close_on_source_joining();
+    qemu_thread_join(&ms->rp_state.rp_thread);
+    trace_await_return_path_close_on_source_close();
+    return ms->rp_state.error;
+}
+
 /**
  * migration_completion: Used by migration_thread when there's not much left.
  *   The caller 'breaks' the loop when this returns.
@@ -997,7 +1231,7 @@ static void migration_completion(MigrationState *s, bool *old_vm_running,
         ret = vm_stop_force_state(RUN_STATE_FINISH_MIGRATE);
         if (ret >= 0) {
             qemu_file_set_rate_limit(s->file, INT64_MAX);
-            qemu_savevm_state_complete(s->file);
+            qemu_savevm_state_complete_precopy(s->file);
         }
     }
     qemu_mutex_unlock_iothread();
@@ -1018,8 +1252,10 @@ fail:
     migrate_set_state(s, MIGRATION_STATUS_ACTIVE, MIGRATION_STATUS_FAILED);
 }
 
-/* migration thread support */
-
+/*
+ * Master migration thread on the source VM.
+ * It drives the migration and pumps the data down the outgoing channel.
+ */
 static void *migration_thread(void *opaque)
 {
     MigrationState *s = opaque;
@@ -1028,6 +1264,7 @@ static void *migration_thread(void *opaque)
     int64_t initial_bytes = 0;
     int64_t max_size = 0;
     int64_t start_time = initial_time;
+    int64_t end_time;
     bool old_vm_running = false;
 
     rcu_register_thread();
@@ -1089,10 +1326,11 @@ static void *migration_thread(void *opaque)
 
     /* If we enabled cpu throttling for auto-converge, turn it off. */
     cpu_throttle_stop();
+    end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
 
     qemu_mutex_lock_iothread();
+    qemu_savevm_state_cleanup();
     if (s->state == MIGRATION_STATUS_COMPLETED) {
-        int64_t end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
         uint64_t transferred_bytes = qemu_ftell(s->file);
         s->total_time = end_time - s->total_time;
         s->downtime = end_time - start_time;