]> git.proxmox.com Git - mirror_qemu.git/blobdiff - migration/migration.c
Merge remote-tracking branch 'remotes/bonzini/tags/for-upstream' into staging
[mirror_qemu.git] / migration / migration.c
index 02ebd6c9d10274025505d830f7080768c5427e90..2865ae3fa94884a1ad9890711e7373d2e55a70fa 100644 (file)
@@ -31,6 +31,8 @@
 #include "migration/vmstate.h"
 #include "block/block.h"
 #include "qapi/error.h"
+#include "qapi/clone-visitor.h"
+#include "qapi/qapi-visit-sockets.h"
 #include "qapi/qapi-commands-migration.h"
 #include "qapi/qapi-events-migration.h"
 #include "qapi/qmp/qerror.h"
@@ -45,6 +47,7 @@
 #include "migration/colo.h"
 #include "hw/boards.h"
 #include "monitor/monitor.h"
+#include "net/announce.h"
 
 #define MAX_THROTTLE  (32 << 20)      /* Migration transfer speed throttling */
 
 /* Define default autoconverge cpu throttle migration parameters */
 #define DEFAULT_MIGRATE_CPU_THROTTLE_INITIAL 20
 #define DEFAULT_MIGRATE_CPU_THROTTLE_INCREMENT 10
+#define DEFAULT_MIGRATE_MAX_CPU_THROTTLE 99
 
 /* Migration XBZRLE default cache size */
 #define DEFAULT_MIGRATE_XBZRLE_CACHE_SIZE (64 * 1024 * 1024)
 
-/* The delay time (in ms) between two COLO checkpoints
- * Note: Please change this default value to 10000 when we support hybrid mode.
- */
-#define DEFAULT_MIGRATE_X_CHECKPOINT_DELAY 200
+/* The delay time (in ms) between two COLO checkpoints */
+#define DEFAULT_MIGRATE_X_CHECKPOINT_DELAY (200 * 100)
 #define DEFAULT_MIGRATE_MULTIFD_CHANNELS 2
-#define DEFAULT_MIGRATE_MULTIFD_PAGE_COUNT 16
+
+/* Background transfer rate for postcopy, 0 means unlimited, note
+ * that page requests can still exceed this limit.
+ */
+#define DEFAULT_MIGRATE_MAX_POSTCOPY_BANDWIDTH 0
+
+/*
+ * Parameters for self_announce_delay giving a stream of RARP/ARP
+ * packets after migration.
+ */
+#define DEFAULT_MIGRATE_ANNOUNCE_INITIAL  50
+#define DEFAULT_MIGRATE_ANNOUNCE_MAX     550
+#define DEFAULT_MIGRATE_ANNOUNCE_ROUNDS    5
+#define DEFAULT_MIGRATE_ANNOUNCE_STEP    100
 
 static NotifierList migration_state_notifiers =
     NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
@@ -95,6 +110,8 @@ enum mig_rp_message_type {
 
     MIG_RP_MSG_REQ_PAGES_ID, /* data (start: be64, len: be32, id: string) */
     MIG_RP_MSG_REQ_PAGES,    /* data (start: be64, len: be32) */
+    MIG_RP_MSG_RECV_BITMAP,  /* send recved_bitmap back to source */
+    MIG_RP_MSG_RESUME_ACK,   /* tell source that we are ready to resume */
 
     MIG_RP_MSG_MAX
 };
@@ -104,11 +121,13 @@ enum mig_rp_message_type {
    dynamic creation of migration */
 
 static MigrationState *current_migration;
+static MigrationIncomingState *current_incoming;
 
 static bool migration_object_check(MigrationState *ms, Error **errp);
 static int migration_maybe_pause(MigrationState *s,
                                  int *current_active_state,
                                  int new_state);
+static void migrate_fd_cancel(MigrationState *s);
 
 void migration_object_init(void)
 {
@@ -119,6 +138,22 @@ void migration_object_init(void)
     assert(!current_migration);
     current_migration = MIGRATION_OBJ(object_new(TYPE_MIGRATION));
 
+    /*
+     * Init the migrate incoming object as well no matter whether
+     * we'll use it or not.
+     */
+    assert(!current_incoming);
+    current_incoming = g_new0(MigrationIncomingState, 1);
+    current_incoming->state = MIGRATION_STATUS_NONE;
+    current_incoming->postcopy_remote_fds =
+        g_array_new(FALSE, TRUE, sizeof(struct PostCopyFD));
+    qemu_mutex_init(&current_incoming->rp_mutex);
+    qemu_event_init(&current_incoming->main_thread_load_event, false);
+    qemu_sem_init(&current_incoming->postcopy_pause_sem_dst, 0);
+    qemu_sem_init(&current_incoming->postcopy_pause_sem_fault, 0);
+
+    init_dirty_bitmap_incoming_migration();
+
     if (!migration_object_check(current_migration, &err)) {
         error_report_err(err);
         exit(1);
@@ -134,8 +169,13 @@ void migration_object_init(void)
     }
 }
 
-void migration_object_finalize(void)
+void migration_shutdown(void)
 {
+    /*
+     * Cancel the current migration - that will (eventually)
+     * stop the migration using this structure
+     */
+    migrate_fd_cancel(current_migration);
     object_unref(OBJECT(current_migration));
 }
 
@@ -149,22 +189,8 @@ MigrationState *migrate_get_current(void)
 
 MigrationIncomingState *migration_incoming_get_current(void)
 {
-    static bool once;
-    static MigrationIncomingState mis_current;
-
-    if (!once) {
-        mis_current.state = MIGRATION_STATUS_NONE;
-        memset(&mis_current, 0, sizeof(MigrationIncomingState));
-        mis_current.postcopy_remote_fds = g_array_new(FALSE, TRUE,
-                                                   sizeof(struct PostCopyFD));
-        qemu_mutex_init(&mis_current.rp_mutex);
-        qemu_event_init(&mis_current.main_thread_load_event, false);
-
-        init_dirty_bitmap_incoming_migration();
-
-        once = true;
-    }
-    return &mis_current;
+    assert(current_incoming);
+    return current_incoming;
 }
 
 void migration_incoming_state_destroy(void)
@@ -188,15 +214,30 @@ void migration_incoming_state_destroy(void)
     }
 
     qemu_event_reset(&mis->main_thread_load_event);
+
+    if (mis->socket_address_list) {
+        qapi_free_SocketAddressList(mis->socket_address_list);
+        mis->socket_address_list = NULL;
+    }
 }
 
 static void migrate_generate_event(int new_state)
 {
     if (migrate_use_events()) {
-        qapi_event_send_migration(new_state, &error_abort);
+        qapi_event_send_migration(new_state);
     }
 }
 
+static bool migrate_late_block_activate(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->enabled_capabilities[
+        MIGRATION_CAPABILITY_LATE_BLOCK_ACTIVATE];
+}
+
 /*
  * Called on -incoming with a defer: uri.
  * The migration can be started later after any parameters have been
@@ -277,11 +318,38 @@ int migrate_send_rp_req_pages(MigrationIncomingState *mis, const char *rbname,
     return migrate_send_rp_message(mis, msg_type, msglen, bufc);
 }
 
+static bool migration_colo_enabled;
+bool migration_incoming_colo_enabled(void)
+{
+    return migration_colo_enabled;
+}
+
+void migration_incoming_disable_colo(void)
+{
+    migration_colo_enabled = false;
+}
+
+void migration_incoming_enable_colo(void)
+{
+    migration_colo_enabled = true;
+}
+
+void migrate_add_address(SocketAddress *address)
+{
+    MigrationIncomingState *mis = migration_incoming_get_current();
+    SocketAddressList *addrs;
+
+    addrs = g_new0(SocketAddressList, 1);
+    addrs->next = mis->socket_address_list;
+    mis->socket_address_list = addrs;
+    addrs->value = QAPI_CLONE(SocketAddress, address);
+}
+
 void qemu_start_incoming_migration(const char *uri, Error **errp)
 {
     const char *p;
 
-    qapi_event_send_migration(MIGRATION_STATUS_SETUP, &error_abort);
+    qapi_event_send_migration(MIGRATION_STATUS_SETUP);
     if (!strcmp(uri, "defer")) {
         deferred_incoming_migration(errp);
     } else if (strstart(uri, "tcp:", &p)) {
@@ -306,20 +374,30 @@ static void process_incoming_migration_bh(void *opaque)
     Error *local_err = NULL;
     MigrationIncomingState *mis = opaque;
 
-    /* Make sure all file formats flush their mutable metadata.
-     * If we get an error here, just don't restart the VM yet. */
-    bdrv_invalidate_cache_all(&local_err);
-    if (local_err) {
-        error_report_err(local_err);
-        local_err = NULL;
-        autostart = false;
+    /* If capability late_block_activate is set:
+     * Only fire up the block code now if we're going to restart the
+     * VM, else 'cont' will do it.
+     * This causes file locking to happen; so we don't want it to happen
+     * unless we really are starting the VM.
+     */
+    if (!migrate_late_block_activate() ||
+         (autostart && (!global_state_received() ||
+            global_state_get_runstate() == RUN_STATE_RUNNING))) {
+        /* Make sure all file formats flush their mutable metadata.
+         * If we get an error here, just don't restart the VM yet. */
+        bdrv_invalidate_cache_all(&local_err);
+        if (local_err) {
+            error_report_err(local_err);
+            local_err = NULL;
+            autostart = false;
+        }
     }
 
     /*
      * This must happen after all error conditions are dealt with and
      * we're sure the VM is going to be running on this host.
      */
-    qemu_announce_self();
+    qemu_announce_self(&mis->announce_timer, migrate_announce_params());
 
     if (multifd_load_cleanup(&local_err) != 0) {
         error_report_err(local_err);
@@ -338,6 +416,9 @@ static void process_incoming_migration_bh(void *opaque)
         } else {
             runstate_set(RUN_STATE_PAUSED);
         }
+    } else if (migration_incoming_colo_enabled()) {
+        migration_incoming_disable_colo();
+        vm_start();
     } else {
         runstate_set(global_state_get_runstate());
     }
@@ -357,8 +438,10 @@ static void process_incoming_migration_co(void *opaque)
     MigrationIncomingState *mis = migration_incoming_get_current();
     PostcopyState ps;
     int ret;
+    Error *local_err = NULL;
 
     assert(mis->from_src_file);
+    mis->migration_incoming_co = qemu_coroutine_self();
     mis->largest_page_size = qemu_ram_pagesize_largest();
     postcopy_state_set(POSTCOPY_INCOMING_NONE);
     migrate_set_state(&mis->state, MIGRATION_STATUS_NONE,
@@ -387,8 +470,19 @@ static void process_incoming_migration_co(void *opaque)
     }
 
     /* we get COLO info, and know if we are in COLO mode */
-    if (!ret && migration_incoming_enable_colo()) {
-        mis->migration_incoming_co = qemu_coroutine_self();
+    if (!ret && migration_incoming_colo_enabled()) {
+        /* Make sure all file formats flush their mutable metadata */
+        bdrv_invalidate_cache_all(&local_err);
+        if (local_err) {
+            error_report_err(local_err);
+            goto fail;
+        }
+
+        if (colo_init_ram_cache() < 0) {
+            error_report("Init ram cache failed");
+            goto fail;
+        }
+
         qemu_thread_create(&mis->colo_incoming_thread, "COLO incoming",
              colo_process_incoming_thread, mis, QEMU_THREAD_JOINABLE);
         mis->have_colo_incoming_thread = true;
@@ -396,22 +490,27 @@ static void process_incoming_migration_co(void *opaque)
 
         /* Wait checkpoint incoming thread exit before free resource */
         qemu_thread_join(&mis->colo_incoming_thread);
+        /* We hold the global iothread lock, so it is safe here */
+        colo_release_ram_cache();
     }
 
     if (ret < 0) {
-        Error *local_err = NULL;
-
-        migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
-                          MIGRATION_STATUS_FAILED);
         error_report("load of migration failed: %s", strerror(-ret));
-        qemu_fclose(mis->from_src_file);
-        if (multifd_load_cleanup(&local_err) != 0) {
-            error_report_err(local_err);
-        }
-        exit(EXIT_FAILURE);
+        goto fail;
     }
     mis->bh = qemu_bh_new(process_incoming_migration_bh, mis);
     qemu_bh_schedule(mis->bh);
+    mis->migration_incoming_co = NULL;
+    return;
+fail:
+    local_err = NULL;
+    migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
+                      MIGRATION_STATUS_FAILED);
+    qemu_fclose(mis->from_src_file);
+    if (multifd_load_cleanup(&local_err) != 0) {
+        error_report_err(local_err);
+    }
+    exit(EXIT_FAILURE);
 }
 
 static void migration_incoming_setup(QEMUFile *f)
@@ -436,22 +535,83 @@ void migration_incoming_process(void)
     qemu_coroutine_enter(co);
 }
 
+/* Returns true if recovered from a paused migration, otherwise false */
+static bool postcopy_try_recover(QEMUFile *f)
+{
+    MigrationIncomingState *mis = migration_incoming_get_current();
+
+    if (mis->state == MIGRATION_STATUS_POSTCOPY_PAUSED) {
+        /* Resumed from a paused postcopy migration */
+
+        mis->from_src_file = f;
+        /* Postcopy has standalone thread to do vm load */
+        qemu_file_set_blocking(f, true);
+
+        /* Re-configure the return path */
+        mis->to_src_file = qemu_file_get_return_path(f);
+
+        migrate_set_state(&mis->state, MIGRATION_STATUS_POSTCOPY_PAUSED,
+                          MIGRATION_STATUS_POSTCOPY_RECOVER);
+
+        /*
+         * Here, we only wake up the main loading thread (while the
+         * fault thread will still be waiting), so that we can receive
+         * commands from source now, and answer it if needed. The
+         * fault thread will be woken up afterwards until we are sure
+         * that source is ready to reply to page requests.
+         */
+        qemu_sem_post(&mis->postcopy_pause_sem_dst);
+        return true;
+    }
+
+    return false;
+}
+
 void migration_fd_process_incoming(QEMUFile *f)
 {
+    if (postcopy_try_recover(f)) {
+        return;
+    }
+
     migration_incoming_setup(f);
     migration_incoming_process();
 }
 
-void migration_ioc_process_incoming(QIOChannel *ioc)
+void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
 {
     MigrationIncomingState *mis = migration_incoming_get_current();
+    bool start_migration;
 
     if (!mis->from_src_file) {
+        /* The first connection (multifd may have multiple) */
         QEMUFile *f = qemu_fopen_channel_input(ioc);
+
+        /* If it's a recovery, we're done */
+        if (postcopy_try_recover(f)) {
+            return;
+        }
+
         migration_incoming_setup(f);
-        return;
+
+        /*
+         * Common migration only needs one channel, so we can start
+         * right now.  Multifd needs more than one channel, we wait.
+         */
+        start_migration = !migrate_use_multifd();
+    } else {
+        Error *local_err = NULL;
+        /* Multiple connections */
+        assert(migrate_use_multifd());
+        start_migration = multifd_recv_new_channel(ioc, &local_err);
+        if (local_err) {
+            error_propagate(errp, local_err);
+            return;
+        }
+    }
+
+    if (start_migration) {
+        migration_incoming_process();
     }
-    multifd_recv_new_channel(ioc);
 }
 
 /**
@@ -462,11 +622,12 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
  */
 bool migration_has_all_channels(void)
 {
+    MigrationIncomingState *mis = migration_incoming_get_current();
     bool all_channels;
 
     all_channels = multifd_recv_all_channels_created();
 
-    return all_channels;
+    return all_channels && mis->from_src_file != NULL;
 }
 
 /*
@@ -496,6 +657,53 @@ void migrate_send_rp_pong(MigrationIncomingState *mis,
     migrate_send_rp_message(mis, MIG_RP_MSG_PONG, sizeof(buf), &buf);
 }
 
+void migrate_send_rp_recv_bitmap(MigrationIncomingState *mis,
+                                 char *block_name)
+{
+    char buf[512];
+    int len;
+    int64_t res;
+
+    /*
+     * First, we send the header part. It contains only the len of
+     * idstr, and the idstr itself.
+     */
+    len = strlen(block_name);
+    buf[0] = len;
+    memcpy(buf + 1, block_name, len);
+
+    if (mis->state != MIGRATION_STATUS_POSTCOPY_RECOVER) {
+        error_report("%s: MSG_RP_RECV_BITMAP only used for recovery",
+                     __func__);
+        return;
+    }
+
+    migrate_send_rp_message(mis, MIG_RP_MSG_RECV_BITMAP, len + 1, buf);
+
+    /*
+     * Next, we dump the received bitmap to the stream.
+     *
+     * TODO: currently we are safe since we are the only one that is
+     * using the to_src_file handle (fault thread is still paused),
+     * and it's ok even not taking the mutex. However the best way is
+     * to take the lock before sending the message header, and release
+     * the lock after sending the bitmap.
+     */
+    qemu_mutex_lock(&mis->rp_mutex);
+    res = ramblock_recv_bitmap_send(mis->to_src_file, block_name);
+    qemu_mutex_unlock(&mis->rp_mutex);
+
+    trace_migrate_send_rp_recv_bitmap(block_name, res);
+}
+
+void migrate_send_rp_resume_ack(MigrationIncomingState *mis, uint32_t value)
+{
+    uint32_t buf;
+
+    buf = cpu_to_be32(value);
+    migrate_send_rp_message(mis, MIG_RP_MSG_RESUME_ACK, sizeof(buf), &buf);
+}
+
 MigrationCapabilityStatusList *qmp_query_migrate_capabilities(Error **errp)
 {
     MigrationCapabilityStatusList *head = NULL;
@@ -537,6 +745,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
     params->compress_level = s->parameters.compress_level;
     params->has_compress_threads = true;
     params->compress_threads = s->parameters.compress_threads;
+    params->has_compress_wait_thread = true;
+    params->compress_wait_thread = s->parameters.compress_wait_thread;
     params->has_decompress_threads = true;
     params->decompress_threads = s->parameters.decompress_threads;
     params->has_cpu_throttle_initial = true;
@@ -547,6 +757,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
     params->tls_creds = g_strdup(s->parameters.tls_creds);
     params->has_tls_hostname = true;
     params->tls_hostname = g_strdup(s->parameters.tls_hostname);
+    params->has_tls_authz = true;
+    params->tls_authz = g_strdup(s->parameters.tls_authz);
     params->has_max_bandwidth = true;
     params->max_bandwidth = s->parameters.max_bandwidth;
     params->has_downtime_limit = true;
@@ -555,26 +767,51 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
     params->x_checkpoint_delay = s->parameters.x_checkpoint_delay;
     params->has_block_incremental = true;
     params->block_incremental = s->parameters.block_incremental;
-    params->has_x_multifd_channels = true;
-    params->x_multifd_channels = s->parameters.x_multifd_channels;
-    params->has_x_multifd_page_count = true;
-    params->x_multifd_page_count = s->parameters.x_multifd_page_count;
+    params->has_multifd_channels = true;
+    params->multifd_channels = s->parameters.multifd_channels;
     params->has_xbzrle_cache_size = true;
     params->xbzrle_cache_size = s->parameters.xbzrle_cache_size;
+    params->has_max_postcopy_bandwidth = true;
+    params->max_postcopy_bandwidth = s->parameters.max_postcopy_bandwidth;
+    params->has_max_cpu_throttle = true;
+    params->max_cpu_throttle = s->parameters.max_cpu_throttle;
+    params->has_announce_initial = true;
+    params->announce_initial = s->parameters.announce_initial;
+    params->has_announce_max = true;
+    params->announce_max = s->parameters.announce_max;
+    params->has_announce_rounds = true;
+    params->announce_rounds = s->parameters.announce_rounds;
+    params->has_announce_step = true;
+    params->announce_step = s->parameters.announce_step;
 
     return params;
 }
 
+AnnounceParameters *migrate_announce_params(void)
+{
+    static AnnounceParameters ap;
+
+    MigrationState *s = migrate_get_current();
+
+    ap.initial = s->parameters.announce_initial;
+    ap.max = s->parameters.announce_max;
+    ap.rounds = s->parameters.announce_rounds;
+    ap.step = s->parameters.announce_step;
+
+    return &ap;
+}
+
 /*
  * Return true if we're already in the middle of a migration
  * (i.e. any of the active or setup states)
  */
-static bool migration_is_setup_or_active(int state)
+bool migration_is_setup_or_active(int state)
 {
     switch (state) {
     case MIGRATION_STATUS_ACTIVE:
     case MIGRATION_STATUS_POSTCOPY_ACTIVE:
     case MIGRATION_STATUS_POSTCOPY_PAUSED:
+    case MIGRATION_STATUS_POSTCOPY_RECOVER:
     case MIGRATION_STATUS_SETUP:
     case MIGRATION_STATUS_PRE_SWITCHOVER:
     case MIGRATION_STATUS_DEVICE:
@@ -602,6 +839,8 @@ static void populate_ram_info(MigrationInfo *info, MigrationState *s)
     info->ram->dirty_sync_count = ram_counters.dirty_sync_count;
     info->ram->postcopy_requests = ram_counters.postcopy_requests;
     info->ram->page_size = qemu_target_page_size();
+    info->ram->multifd_bytes = ram_counters.multifd_bytes;
+    info->ram->pages_per_second = s->pages_per_second;
 
     if (migrate_use_xbzrle()) {
         info->has_xbzrle_cache = true;
@@ -614,6 +853,18 @@ static void populate_ram_info(MigrationInfo *info, MigrationState *s)
         info->xbzrle_cache->overflow = xbzrle_counters.overflow;
     }
 
+    if (migrate_use_compression()) {
+        info->has_compression = true;
+        info->compression = g_malloc0(sizeof(*info->compression));
+        info->compression->pages = compression_counters.pages;
+        info->compression->busy = compression_counters.busy;
+        info->compression->busy_rate = compression_counters.busy_rate;
+        info->compression->compressed_size =
+                                    compression_counters.compressed_size;
+        info->compression->compression_rate =
+                                    compression_counters.compression_rate;
+    }
+
     if (cpu_throttle_active()) {
         info->has_cpu_throttle_percentage = true;
         info->cpu_throttle_percentage = cpu_throttle_get_percentage();
@@ -656,6 +907,7 @@ static void fill_source_migration_info(MigrationInfo *info)
     case MIGRATION_STATUS_PRE_SWITCHOVER:
     case MIGRATION_STATUS_DEVICE:
     case MIGRATION_STATUS_POSTCOPY_PAUSED:
+    case MIGRATION_STATUS_POSTCOPY_RECOVER:
          /* TODO add some postcopy stats */
         info->has_status = true;
         info->has_total_time = true;
@@ -730,6 +982,15 @@ static bool migrate_caps_check(bool *cap_list,
     }
 #endif
 
+#ifndef CONFIG_REPLICATION
+    if (cap_list[MIGRATION_CAPABILITY_X_COLO]) {
+        error_setg(errp, "QEMU compiled without replication module"
+                   " can't enable COLO");
+        error_append_hint(errp, "Please enable replication before COLO.\n");
+        return false;
+    }
+#endif
+
     if (cap_list[MIGRATION_CAPABILITY_POSTCOPY_RAM]) {
         if (cap_list[MIGRATION_CAPABILITY_COMPRESS]) {
             /* The decompression threads asynchronously write into RAM
@@ -754,6 +1015,11 @@ static bool migrate_caps_check(bool *cap_list,
             error_setg(errp, "Postcopy is not supported");
             return false;
         }
+
+        if (cap_list[MIGRATION_CAPABILITY_X_IGNORE_SHARED]) {
+            error_setg(errp, "Postcopy is not compatible with ignore-shared");
+            return false;
+        }
     }
 
     return true;
@@ -763,6 +1029,12 @@ static void fill_destination_migration_info(MigrationInfo *info)
 {
     MigrationIncomingState *mis = migration_incoming_get_current();
 
+    if (mis->socket_address_list) {
+        info->has_socket_address = true;
+        info->socket_address =
+            QAPI_CLONE(SocketAddressList, mis->socket_address_list);
+    }
+
     switch (mis->state) {
     case MIGRATION_STATUS_NONE:
         return;
@@ -772,6 +1044,8 @@ static void fill_destination_migration_info(MigrationInfo *info)
     case MIGRATION_STATUS_CANCELLED:
     case MIGRATION_STATUS_ACTIVE:
     case MIGRATION_STATUS_POSTCOPY_ACTIVE:
+    case MIGRATION_STATUS_POSTCOPY_PAUSED:
+    case MIGRATION_STATUS_POSTCOPY_RECOVER:
     case MIGRATION_STATUS_FAILED:
     case MIGRATION_STATUS_COLO:
         info->has_status = true;
@@ -877,20 +1151,12 @@ static bool migrate_params_check(MigrationParameters *params, Error **errp)
 
     /* x_checkpoint_delay is now always positive */
 
-    if (params->has_x_multifd_channels && (params->x_multifd_channels < 1)) {
+    if (params->has_multifd_channels && (params->multifd_channels < 1)) {
         error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
                    "multifd_channels",
                    "is invalid, it should be in the range of 1 to 255");
         return false;
     }
-    if (params->has_x_multifd_page_count &&
-        (params->x_multifd_page_count < 1 ||
-         params->x_multifd_page_count > 10000)) {
-        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
-                   "multifd_page_count",
-                   "is invalid, it should be in the range of 1 to 10000");
-        return false;
-    }
 
     if (params->has_xbzrle_cache_size &&
         (params->xbzrle_cache_size < qemu_target_page_size() ||
@@ -902,6 +1168,44 @@ static bool migrate_params_check(MigrationParameters *params, Error **errp)
         return false;
     }
 
+    if (params->has_max_cpu_throttle &&
+        (params->max_cpu_throttle < params->cpu_throttle_initial ||
+         params->max_cpu_throttle > 99)) {
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+                   "max_cpu_throttle",
+                   "an integer in the range of cpu_throttle_initial to 99");
+        return false;
+    }
+
+    if (params->has_announce_initial &&
+        params->announce_initial > 100000) {
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+                   "announce_initial",
+                   "is invalid, it must be less than 100000 ms");
+        return false;
+    }
+    if (params->has_announce_max &&
+        params->announce_max > 100000) {
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+                   "announce_max",
+                   "is invalid, it must be less than 100000 ms");
+       return false;
+    }
+    if (params->has_announce_rounds &&
+        params->announce_rounds > 1000) {
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+                   "announce_rounds",
+                   "is invalid, it must be in the range of 0 to 1000");
+       return false;
+    }
+    if (params->has_announce_step &&
+        (params->announce_step < 1 ||
+        params->announce_step > 10000)) {
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+                   "announce_step",
+                   "is invalid, it must be in the range of 1 to 10000 ms");
+       return false;
+    }
     return true;
 }
 
@@ -920,6 +1224,10 @@ static void migrate_params_test_apply(MigrateSetParameters *params,
         dest->compress_threads = params->compress_threads;
     }
 
+    if (params->has_compress_wait_thread) {
+        dest->compress_wait_thread = params->compress_wait_thread;
+    }
+
     if (params->has_decompress_threads) {
         dest->decompress_threads = params->decompress_threads;
     }
@@ -957,15 +1265,30 @@ static void migrate_params_test_apply(MigrateSetParameters *params,
     if (params->has_block_incremental) {
         dest->block_incremental = params->block_incremental;
     }
-    if (params->has_x_multifd_channels) {
-        dest->x_multifd_channels = params->x_multifd_channels;
-    }
-    if (params->has_x_multifd_page_count) {
-        dest->x_multifd_page_count = params->x_multifd_page_count;
+    if (params->has_multifd_channels) {
+        dest->multifd_channels = params->multifd_channels;
     }
     if (params->has_xbzrle_cache_size) {
         dest->xbzrle_cache_size = params->xbzrle_cache_size;
     }
+    if (params->has_max_postcopy_bandwidth) {
+        dest->max_postcopy_bandwidth = params->max_postcopy_bandwidth;
+    }
+    if (params->has_max_cpu_throttle) {
+        dest->max_cpu_throttle = params->max_cpu_throttle;
+    }
+    if (params->has_announce_initial) {
+        dest->announce_initial = params->announce_initial;
+    }
+    if (params->has_announce_max) {
+        dest->announce_max = params->announce_max;
+    }
+    if (params->has_announce_rounds) {
+        dest->announce_rounds = params->announce_rounds;
+    }
+    if (params->has_announce_step) {
+        dest->announce_step = params->announce_step;
+    }
 }
 
 static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
@@ -982,6 +1305,10 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
         s->parameters.compress_threads = params->compress_threads;
     }
 
+    if (params->has_compress_wait_thread) {
+        s->parameters.compress_wait_thread = params->compress_wait_thread;
+    }
+
     if (params->has_decompress_threads) {
         s->parameters.decompress_threads = params->decompress_threads;
     }
@@ -1006,9 +1333,15 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
         s->parameters.tls_hostname = g_strdup(params->tls_hostname->u.s);
     }
 
+    if (params->has_tls_authz) {
+        g_free(s->parameters.tls_authz);
+        assert(params->tls_authz->type == QTYPE_QSTRING);
+        s->parameters.tls_authz = g_strdup(params->tls_authz->u.s);
+    }
+
     if (params->has_max_bandwidth) {
         s->parameters.max_bandwidth = params->max_bandwidth;
-        if (s->to_dst_file) {
+        if (s->to_dst_file && !migration_in_postcopy()) {
             qemu_file_set_rate_limit(s->to_dst_file,
                                 s->parameters.max_bandwidth / XFER_LIMIT_RATIO);
         }
@@ -1028,16 +1361,35 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
     if (params->has_block_incremental) {
         s->parameters.block_incremental = params->block_incremental;
     }
-    if (params->has_x_multifd_channels) {
-        s->parameters.x_multifd_channels = params->x_multifd_channels;
-    }
-    if (params->has_x_multifd_page_count) {
-        s->parameters.x_multifd_page_count = params->x_multifd_page_count;
+    if (params->has_multifd_channels) {
+        s->parameters.multifd_channels = params->multifd_channels;
     }
     if (params->has_xbzrle_cache_size) {
         s->parameters.xbzrle_cache_size = params->xbzrle_cache_size;
         xbzrle_cache_resize(params->xbzrle_cache_size, errp);
     }
+    if (params->has_max_postcopy_bandwidth) {
+        s->parameters.max_postcopy_bandwidth = params->max_postcopy_bandwidth;
+        if (s->to_dst_file && migration_in_postcopy()) {
+            qemu_file_set_rate_limit(s->to_dst_file,
+                    s->parameters.max_postcopy_bandwidth / XFER_LIMIT_RATIO);
+        }
+    }
+    if (params->has_max_cpu_throttle) {
+        s->parameters.max_cpu_throttle = params->max_cpu_throttle;
+    }
+    if (params->has_announce_initial) {
+        s->parameters.announce_initial = params->announce_initial;
+    }
+    if (params->has_announce_max) {
+        s->parameters.announce_max = params->announce_max;
+    }
+    if (params->has_announce_rounds) {
+        s->parameters.announce_rounds = params->announce_rounds;
+    }
+    if (params->has_announce_step) {
+        s->parameters.announce_step = params->announce_step;
+    }
 }
 
 void qmp_migrate_set_parameters(MigrateSetParameters *params, Error **errp)
@@ -1143,17 +1495,15 @@ static void block_cleanup_parameters(MigrationState *s)
     }
 }
 
-static void migrate_fd_cleanup(void *opaque)
+static void migrate_fd_cleanup(MigrationState *s)
 {
-    MigrationState *s = opaque;
-
     qemu_bh_delete(s->cleanup_bh);
     s->cleanup_bh = NULL;
 
     qemu_savevm_state_cleanup();
 
     if (s->to_dst_file) {
-        Error *local_err = NULL;
+        QEMUFile *tmp;
 
         trace_migrate_fd_cleanup();
         qemu_mutex_unlock_iothread();
@@ -1163,11 +1513,16 @@ static void migrate_fd_cleanup(void *opaque)
         }
         qemu_mutex_lock_iothread();
 
-        if (multifd_save_cleanup(&local_err) != 0) {
-            error_report_err(local_err);
-        }
-        qemu_fclose(s->to_dst_file);
+        multifd_save_cleanup();
+        qemu_mutex_lock(&s->qemu_file_lock);
+        tmp = s->to_dst_file;
         s->to_dst_file = NULL;
+        qemu_mutex_unlock(&s->qemu_file_lock);
+        /*
+         * Close the file handle without the lock to make sure the
+         * critical section won't block for long.
+         */
+        qemu_fclose(tmp);
     }
 
     assert((s->state != MIGRATION_STATUS_ACTIVE) &&
@@ -1186,6 +1541,23 @@ static void migrate_fd_cleanup(void *opaque)
     block_cleanup_parameters(s);
 }
 
+static void migrate_fd_cleanup_schedule(MigrationState *s)
+{
+    /*
+     * Ref the state for bh, because it may be called when
+     * there're already no other refs
+     */
+    object_ref(OBJECT(s));
+    qemu_bh_schedule(s->cleanup_bh);
+}
+
+static void migrate_fd_cleanup_bh(void *opaque)
+{
+    MigrationState *s = opaque;
+    migrate_fd_cleanup(s);
+    object_unref(OBJECT(s));
+}
+
 void migrate_set_error(MigrationState *s, const Error *error)
 {
     qemu_mutex_lock(&s->error_mutex);
@@ -1289,7 +1661,11 @@ bool migration_in_postcopy_after_devices(MigrationState *s)
 
 bool migration_is_idle(void)
 {
-    MigrationState *s = migrate_get_current();
+    MigrationState *s = current_migration;
+
+    if (!s) {
+        return true;
+    }
 
     switch (s->state) {
     case MIGRATION_STATUS_NONE:
@@ -1320,13 +1696,13 @@ void migrate_init(MigrationState *s)
      * locks.
      */
     s->bytes_xfer = 0;
-    s->xfer_limit = 0;
     s->cleanup_bh = 0;
     s->to_dst_file = NULL;
     s->state = MIGRATION_STATUS_NONE;
     s->rp_state.from_dst_file = NULL;
     s->rp_state.error = false;
     s->mbps = 0.0;
+    s->pages_per_second = 0.0;
     s->downtime = 0;
     s->expected_downtime = 0;
     s->setup_time = 0;
@@ -1349,10 +1725,10 @@ static GSList *migration_blockers;
 
 int migrate_add_blocker(Error *reason, Error **errp)
 {
-    if (migrate_get_current()->only_migratable) {
-        error_propagate(errp, error_copy(reason));
-        error_prepend(errp, "disallowing migration blocker "
-                          "(--only_migratable) for: ");
+    if (only_migratable) {
+        error_propagate_prepend(errp, error_copy(reason),
+                                "disallowing migration blocker "
+                                "(--only-migratable) for: ");
         return -EACCES;
     }
 
@@ -1361,9 +1737,9 @@ int migrate_add_blocker(Error *reason, Error **errp)
         return 0;
     }
 
-    error_propagate(errp, error_copy(reason));
-    error_prepend(errp, "disallowing migration blocker (migration in "
-                      "progress) for: ");
+    error_propagate_prepend(errp, error_copy(reason),
+                            "disallowing migration blocker "
+                            "(migration in progress) for: ");
     return -EBUSY;
 }
 
@@ -1395,6 +1771,59 @@ void qmp_migrate_incoming(const char *uri, Error **errp)
     once = false;
 }
 
+void qmp_migrate_recover(const char *uri, Error **errp)
+{
+    MigrationIncomingState *mis = migration_incoming_get_current();
+
+    if (mis->state != MIGRATION_STATUS_POSTCOPY_PAUSED) {
+        error_setg(errp, "Migrate recover can only be run "
+                   "when postcopy is paused.");
+        return;
+    }
+
+    if (atomic_cmpxchg(&mis->postcopy_recover_triggered,
+                       false, true) == true) {
+        error_setg(errp, "Migrate recovery is triggered already");
+        return;
+    }
+
+    /*
+     * Note that this call will never start a real migration; it will
+     * only re-setup the migration stream and poke existing migration
+     * to continue using that newly established channel.
+     */
+    qemu_start_incoming_migration(uri, errp);
+}
+
+void qmp_migrate_pause(Error **errp)
+{
+    MigrationState *ms = migrate_get_current();
+    MigrationIncomingState *mis = migration_incoming_get_current();
+    int ret;
+
+    if (ms->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
+        /* Source side, during postcopy */
+        qemu_mutex_lock(&ms->qemu_file_lock);
+        ret = qemu_file_shutdown(ms->to_dst_file);
+        qemu_mutex_unlock(&ms->qemu_file_lock);
+        if (ret) {
+            error_setg(errp, "Failed to pause source migration");
+        }
+        return;
+    }
+
+    if (mis->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
+        ret = qemu_file_shutdown(mis->from_src_file);
+        if (ret) {
+            error_setg(errp, "Failed to pause destination migration");
+        }
+        return;
+    }
+
+    error_setg(errp, "migrate-pause is currently only supported "
+               "during postcopy-active state");
+}
+
 bool migration_is_blocked(Error **errp)
 {
     if (qemu_savevm_state_blocked(errp)) {
@@ -1409,49 +1838,94 @@ bool migration_is_blocked(Error **errp)
     return false;
 }
 
-void qmp_migrate(const char *uri, bool has_blk, bool blk,
-                 bool has_inc, bool inc, bool has_detach, bool detach,
-                 Error **errp)
+/* Returns true if continue to migrate, or false if error detected */
+static bool migrate_prepare(MigrationState *s, bool blk, bool blk_inc,
+                            bool resume, Error **errp)
 {
     Error *local_err = NULL;
-    MigrationState *s = migrate_get_current();
-    const char *p;
+
+    if (resume) {
+        if (s->state != MIGRATION_STATUS_POSTCOPY_PAUSED) {
+            error_setg(errp, "Cannot resume if there is no "
+                       "paused migration");
+            return false;
+        }
+
+        /*
+         * Postcopy recovery won't work well with release-ram
+         * capability since release-ram will drop the page buffer as
+         * long as the page is put into the send buffer.  So if there
+         * is a network failure happened, any page buffers that have
+         * not yet reached the destination VM but have already been
+         * sent from the source VM will be lost forever.  Let's refuse
+         * the client from resuming such a postcopy migration.
+         * Luckily release-ram was designed to only be used when src
+         * and destination VMs are on the same host, so it should be
+         * fine.
+         */
+        if (migrate_release_ram()) {
+            error_setg(errp, "Postcopy recovery cannot work "
+                       "when release-ram capability is set");
+            return false;
+        }
+
+        /* This is a resume, skip init status */
+        return true;
+    }
 
     if (migration_is_setup_or_active(s->state) ||
         s->state == MIGRATION_STATUS_CANCELLING ||
         s->state == MIGRATION_STATUS_COLO) {
         error_setg(errp, QERR_MIGRATION_ACTIVE);
-        return;
+        return false;
     }
+
     if (runstate_check(RUN_STATE_INMIGRATE)) {
         error_setg(errp, "Guest is waiting for an incoming migration");
-        return;
+        return false;
     }
 
     if (migration_is_blocked(errp)) {
-        return;
+        return false;
     }
 
-    if ((has_blk && blk) || (has_inc && inc)) {
+    if (blk || blk_inc) {
         if (migrate_use_block() || migrate_use_block_incremental()) {
             error_setg(errp, "Command options are incompatible with "
                        "current migration capabilities");
-            return;
+            return false;
         }
         migrate_set_block_enabled(true, &local_err);
         if (local_err) {
             error_propagate(errp, local_err);
-            return;
+            return false;
         }
         s->must_remove_block_options = true;
     }
 
-    if (has_inc && inc) {
+    if (blk_inc) {
         migrate_set_block_incremental(s, true);
     }
 
     migrate_init(s);
 
+    return true;
+}
+
+void qmp_migrate(const char *uri, bool has_blk, bool blk,
+                 bool has_inc, bool inc, bool has_detach, bool detach,
+                 bool has_resume, bool resume, Error **errp)
+{
+    Error *local_err = NULL;
+    MigrationState *s = migrate_get_current();
+    const char *p;
+
+    if (!migrate_prepare(s, has_blk && blk, has_inc && inc,
+                         has_resume && resume, errp)) {
+        /* Error detected, put into errp */
+        return;
+    }
+
     if (strstart(uri, "tcp:", &p)) {
         tcp_start_outgoing_migration(s, p, &local_err);
 #ifdef CONFIG_RDMA
@@ -1618,6 +2092,15 @@ int migrate_compress_threads(void)
     return s->parameters.compress_threads;
 }
 
+int migrate_compress_wait_thread(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->parameters.compress_wait_thread;
+}
+
 int migrate_decompress_threads(void)
 {
     MigrationState *s;
@@ -1636,6 +2119,15 @@ bool migrate_dirty_bitmaps(void)
     return s->enabled_capabilities[MIGRATION_CAPABILITY_DIRTY_BITMAPS];
 }
 
+bool migrate_ignore_shared(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->enabled_capabilities[MIGRATION_CAPABILITY_X_IGNORE_SHARED];
+}
+
 bool migrate_use_events(void)
 {
     MigrationState *s;
@@ -1651,7 +2143,7 @@ bool migrate_use_multifd(void)
 
     s = migrate_get_current();
 
-    return s->enabled_capabilities[MIGRATION_CAPABILITY_X_MULTIFD];
+    return s->enabled_capabilities[MIGRATION_CAPABILITY_MULTIFD];
 }
 
 bool migrate_pause_before_switchover(void)
@@ -1670,34 +2162,34 @@ int migrate_multifd_channels(void)
 
     s = migrate_get_current();
 
-    return s->parameters.x_multifd_channels;
+    return s->parameters.multifd_channels;
 }
 
-int migrate_multifd_page_count(void)
+int migrate_use_xbzrle(void)
 {
     MigrationState *s;
 
     s = migrate_get_current();
 
-    return s->parameters.x_multifd_page_count;
+    return s->enabled_capabilities[MIGRATION_CAPABILITY_XBZRLE];
 }
 
-int migrate_use_xbzrle(void)
+int64_t migrate_xbzrle_cache_size(void)
 {
     MigrationState *s;
 
     s = migrate_get_current();
 
-    return s->enabled_capabilities[MIGRATION_CAPABILITY_XBZRLE];
+    return s->parameters.xbzrle_cache_size;
 }
 
-int64_t migrate_xbzrle_cache_size(void)
+static int64_t migrate_max_postcopy_bandwidth(void)
 {
     MigrationState *s;
 
     s = migrate_get_current();
 
-    return s->parameters.xbzrle_cache_size;
+    return s->parameters.max_postcopy_bandwidth;
 }
 
 bool migrate_use_block(void)
@@ -1746,6 +2238,8 @@ static struct rp_cmd_args {
     [MIG_RP_MSG_PONG]           = { .len =  4, .name = "PONG" },
     [MIG_RP_MSG_REQ_PAGES]      = { .len = 12, .name = "REQ_PAGES" },
     [MIG_RP_MSG_REQ_PAGES_ID]   = { .len = -1, .name = "REQ_PAGES_ID" },
+    [MIG_RP_MSG_RECV_BITMAP]    = { .len = -1, .name = "RECV_BITMAP" },
+    [MIG_RP_MSG_RESUME_ACK]     = { .len =  4, .name = "RESUME_ACK" },
     [MIG_RP_MSG_MAX]            = { .len = -1, .name = "MAX" },
 };
 
@@ -1778,6 +2272,51 @@ static void migrate_handle_rp_req_pages(MigrationState *ms, const char* rbname,
     }
 }
 
+/* Return true to retry, false to quit */
+static bool postcopy_pause_return_path_thread(MigrationState *s)
+{
+    trace_postcopy_pause_return_path();
+
+    qemu_sem_wait(&s->postcopy_pause_rp_sem);
+
+    trace_postcopy_pause_return_path_continued();
+
+    return true;
+}
+
+static int migrate_handle_rp_recv_bitmap(MigrationState *s, char *block_name)
+{
+    RAMBlock *block = qemu_ram_block_by_name(block_name);
+
+    if (!block) {
+        error_report("%s: invalid block name '%s'", __func__, block_name);
+        return -EINVAL;
+    }
+
+    /* Fetch the received bitmap and refresh the dirty bitmap */
+    return ram_dirty_bitmap_reload(s, block);
+}
+
+static int migrate_handle_rp_resume_ack(MigrationState *s, uint32_t value)
+{
+    trace_source_return_path_thread_resume_ack(value);
+
+    if (value != MIGRATION_RESUME_ACK_VALUE) {
+        error_report("%s: illegal resume_ack value %"PRIu32,
+                     __func__, value);
+        return -1;
+    }
+
+    /* Now both sides are active. */
+    migrate_set_state(&s->state, MIGRATION_STATUS_POSTCOPY_RECOVER,
+                      MIGRATION_STATUS_POSTCOPY_ACTIVE);
+
+    /* Notify send thread that time to continue send pages */
+    qemu_sem_post(&s->rp_state.rp_sem);
+
+    return 0;
+}
+
 /*
  * Handles messages sent on the return path towards the source VM
  *
@@ -1794,6 +2333,9 @@ static void *source_return_path_thread(void *opaque)
     int res;
 
     trace_source_return_path_thread_entry();
+    rcu_register_thread();
+
+retry:
     while (!ms->rp_state.error && !qemu_file_get_error(rp) &&
            migration_is_setup_or_active(ms->state)) {
         trace_source_return_path_thread_loop_top();
@@ -1881,23 +2423,65 @@ static void *source_return_path_thread(void *opaque)
             migrate_handle_rp_req_pages(ms, (char *)&buf[13], start, len);
             break;
 
+        case MIG_RP_MSG_RECV_BITMAP:
+            if (header_len < 1) {
+                error_report("%s: missing block name", __func__);
+                mark_source_rp_bad(ms);
+                goto out;
+            }
+            /* Format: len (1B) + idstr (<255B). This ends the idstr. */
+            buf[buf[0] + 1] = '\0';
+            if (migrate_handle_rp_recv_bitmap(ms, (char *)(buf + 1))) {
+                mark_source_rp_bad(ms);
+                goto out;
+            }
+            break;
+
+        case MIG_RP_MSG_RESUME_ACK:
+            tmp32 = ldl_be_p(buf);
+            if (migrate_handle_rp_resume_ack(ms, tmp32)) {
+                mark_source_rp_bad(ms);
+                goto out;
+            }
+            break;
+
         default:
             break;
         }
     }
-    if (qemu_file_get_error(rp)) {
+
+out:
+    res = qemu_file_get_error(rp);
+    if (res) {
+        if (res == -EIO) {
+            /*
+             * Maybe there is something we can do: it looks like a
+             * network down issue, and we pause for a recovery.
+             */
+            if (postcopy_pause_return_path_thread(ms)) {
+                /* Reload rp, reset the rest */
+                if (rp != ms->rp_state.from_dst_file) {
+                    qemu_fclose(rp);
+                    rp = ms->rp_state.from_dst_file;
+                }
+                ms->rp_state.error = false;
+                goto retry;
+            }
+        }
+
         trace_source_return_path_thread_bad_end();
         mark_source_rp_bad(ms);
     }
 
     trace_source_return_path_thread_end();
-out:
     ms->rp_state.from_dst_file = NULL;
     qemu_fclose(rp);
+    rcu_unregister_thread();
     return NULL;
 }
 
-static int open_return_path_on_source(MigrationState *ms)
+static int open_return_path_on_source(MigrationState *ms,
+                                      bool create_thread)
 {
 
     ms->rp_state.from_dst_file = qemu_file_get_return_path(ms->to_dst_file);
@@ -1906,6 +2490,12 @@ static int open_return_path_on_source(MigrationState *ms)
     }
 
     trace_open_return_path_on_source();
+
+    if (!create_thread) {
+        /* We're done */
+        return 0;
+    }
+
     qemu_thread_create(&ms->rp_state.rp_thread, "return path",
                        source_return_path_thread, ms, QEMU_THREAD_JOINABLE);
 
@@ -1946,6 +2536,7 @@ static int postcopy_start(MigrationState *ms)
     QIOChannelBuffer *bioc;
     QEMUFile *fb;
     int64_t time_at_stop = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+    int64_t bandwidth = migrate_max_postcopy_bandwidth();
     bool restart_block = false;
     int cur_state = MIGRATION_STATUS_ACTIVE;
     if (!migrate_pause_before_switchover()) {
@@ -1957,7 +2548,7 @@ static int postcopy_start(MigrationState *ms)
     qemu_mutex_lock_iothread();
     trace_postcopy_start_set_run();
 
-    qemu_system_wakeup_request(QEMU_WAKEUP_REASON_OTHER);
+    qemu_system_wakeup_request(QEMU_WAKEUP_REASON_OTHER, NULL);
     global_state_store();
     ret = vm_stop_force_state(RUN_STATE_FINISH_MIGRATE);
     if (ret < 0) {
@@ -2000,7 +2591,12 @@ static int postcopy_start(MigrationState *ms)
      * will notice we're in POSTCOPY_ACTIVE and not actually
      * wrap their state up here
      */
-    qemu_file_set_rate_limit(ms->to_dst_file, INT64_MAX);
+    /* 0 max-postcopy-bandwidth means unlimited */
+    if (!bandwidth) {
+        qemu_file_set_rate_limit(ms->to_dst_file, INT64_MAX);
+    } else {
+        qemu_file_set_rate_limit(ms->to_dst_file, bandwidth / XFER_LIMIT_RATIO);
+    }
     if (migrate_postcopy_ram()) {
         /* Ping just for debugging, helps line traces up */
         qemu_savevm_send_ping(ms->to_dst_file, 2);
@@ -2156,7 +2752,7 @@ static void migration_completion(MigrationState *s)
     if (s->state == MIGRATION_STATUS_ACTIVE) {
         qemu_mutex_lock_iothread();
         s->downtime_start = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
-        qemu_system_wakeup_request(QEMU_WAKEUP_REASON_OTHER);
+        qemu_system_wakeup_request(QEMU_WAKEUP_REASON_OTHER, NULL);
         s->vm_was_running = runstate_is_running();
         ret = global_state_store();
 
@@ -2245,10 +2841,174 @@ bool migrate_colo_enabled(void)
     return s->enabled_capabilities[MIGRATION_CAPABILITY_X_COLO];
 }
 
+typedef enum MigThrError {
+    /* No error detected */
+    MIG_THR_ERR_NONE = 0,
+    /* Detected error, but resumed successfully */
+    MIG_THR_ERR_RECOVERED = 1,
+    /* Detected fatal error, need to exit */
+    MIG_THR_ERR_FATAL = 2,
+} MigThrError;
+
+static int postcopy_resume_handshake(MigrationState *s)
+{
+    qemu_savevm_send_postcopy_resume(s->to_dst_file);
+
+    while (s->state == MIGRATION_STATUS_POSTCOPY_RECOVER) {
+        qemu_sem_wait(&s->rp_state.rp_sem);
+    }
+
+    if (s->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
+        return 0;
+    }
+
+    return -1;
+}
+
+/* Return zero if success, or <0 for error */
+static int postcopy_do_resume(MigrationState *s)
+{
+    int ret;
+
+    /*
+     * Call all the resume_prepare() hooks, so that modules can be
+     * ready for the migration resume.
+     */
+    ret = qemu_savevm_state_resume_prepare(s);
+    if (ret) {
+        error_report("%s: resume_prepare() failure detected: %d",
+                     __func__, ret);
+        return ret;
+    }
+
+    /*
+     * Last handshake with destination on the resume (destination will
+     * switch to postcopy-active afterwards)
+     */
+    ret = postcopy_resume_handshake(s);
+    if (ret) {
+        error_report("%s: handshake failed: %d", __func__, ret);
+        return ret;
+    }
+
+    return 0;
+}
+
+/*
+ * We don't return until we are in a safe state to continue current
+ * postcopy migration.  Returns MIG_THR_ERR_RECOVERED if recovered, or
+ * MIG_THR_ERR_FATAL if unrecovery failure happened.
+ */
+static MigThrError postcopy_pause(MigrationState *s)
+{
+    assert(s->state == MIGRATION_STATUS_POSTCOPY_ACTIVE);
+
+    while (true) {
+        QEMUFile *file;
+
+        migrate_set_state(&s->state, s->state,
+                          MIGRATION_STATUS_POSTCOPY_PAUSED);
+
+        /* Current channel is possibly broken. Release it. */
+        assert(s->to_dst_file);
+        qemu_mutex_lock(&s->qemu_file_lock);
+        file = s->to_dst_file;
+        s->to_dst_file = NULL;
+        qemu_mutex_unlock(&s->qemu_file_lock);
+
+        qemu_file_shutdown(file);
+        qemu_fclose(file);
+
+        error_report("Detected IO failure for postcopy. "
+                     "Migration paused.");
+
+        /*
+         * We wait until things fixed up. Then someone will setup the
+         * status back for us.
+         */
+        while (s->state == MIGRATION_STATUS_POSTCOPY_PAUSED) {
+            qemu_sem_wait(&s->postcopy_pause_sem);
+        }
+
+        if (s->state == MIGRATION_STATUS_POSTCOPY_RECOVER) {
+            /* Woken up by a recover procedure. Give it a shot */
+
+            /*
+             * Firstly, let's wake up the return path now, with a new
+             * return path channel.
+             */
+            qemu_sem_post(&s->postcopy_pause_rp_sem);
+
+            /* Do the resume logic */
+            if (postcopy_do_resume(s) == 0) {
+                /* Let's continue! */
+                trace_postcopy_pause_continued();
+                return MIG_THR_ERR_RECOVERED;
+            } else {
+                /*
+                 * Something wrong happened during the recovery, let's
+                 * pause again. Pause is always better than throwing
+                 * data away.
+                 */
+                continue;
+            }
+        } else {
+            /* This is not right... Time to quit. */
+            return MIG_THR_ERR_FATAL;
+        }
+    }
+}
+
+static MigThrError migration_detect_error(MigrationState *s)
+{
+    int ret;
+    int state = s->state;
+
+    if (state == MIGRATION_STATUS_CANCELLING ||
+        state == MIGRATION_STATUS_CANCELLED) {
+        /* End the migration, but don't set the state to failed */
+        return MIG_THR_ERR_FATAL;
+    }
+
+    /* Try to detect any file errors */
+    ret = qemu_file_get_error(s->to_dst_file);
+
+    if (!ret) {
+        /* Everything is fine */
+        return MIG_THR_ERR_NONE;
+    }
+
+    if (state == MIGRATION_STATUS_POSTCOPY_ACTIVE && ret == -EIO) {
+        /*
+         * For postcopy, we allow the network to be down for a
+         * while. After that, it can be continued by a
+         * recovery phase.
+         */
+        return postcopy_pause(s);
+    } else {
+        /*
+         * For precopy (or postcopy with error outside IO), we fail
+         * with no time.
+         */
+        migrate_set_state(&s->state, state, MIGRATION_STATUS_FAILED);
+        trace_migration_thread_file_err();
+
+        /* Time to stop the migration, now. */
+        return MIG_THR_ERR_FATAL;
+    }
+}
+
+/* How many bytes have we transferred since the beggining of the migration */
+static uint64_t migration_total_bytes(MigrationState *s)
+{
+    return qemu_ftell(s->to_dst_file) + ram_counters.multifd_bytes;
+}
+
 static void migration_calculate_complete(MigrationState *s)
 {
-    uint64_t bytes = qemu_ftell(s->to_dst_file);
+    uint64_t bytes = migration_total_bytes(s);
     int64_t end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+    int64_t transfer_time;
 
     s->total_time = end_time - s->start_time;
     if (!s->downtime) {
@@ -2259,22 +3019,25 @@ static void migration_calculate_complete(MigrationState *s)
         s->downtime = end_time - s->downtime_start;
     }
 
-    if (s->total_time) {
-        s->mbps = ((double) bytes * 8.0) / s->total_time / 1000;
+    transfer_time = s->total_time - s->setup_time;
+    if (transfer_time) {
+        s->mbps = ((double) bytes * 8.0) / transfer_time / 1000;
     }
 }
 
 static void migration_update_counters(MigrationState *s,
                                       int64_t current_time)
 {
-    uint64_t transferred, time_spent;
+    uint64_t transferred, transferred_pages, time_spent;
+    uint64_t current_bytes; /* bytes transferred since the beginning */
     double bandwidth;
 
     if (current_time < s->iteration_start_time + BUFFER_DELAY) {
         return;
     }
 
-    transferred = qemu_ftell(s->to_dst_file) - s->iteration_initial_bytes;
+    current_bytes = migration_total_bytes(s);
+    transferred = current_bytes - s->iteration_initial_bytes;
     time_spent = current_time - s->iteration_start_time;
     bandwidth = (double)transferred / time_spent;
     s->threshold_size = bandwidth * s->parameters.downtime_limit;
@@ -2282,19 +3045,24 @@ static void migration_update_counters(MigrationState *s,
     s->mbps = (((double) transferred * 8.0) /
                ((double) time_spent / 1000.0)) / 1000.0 / 1000.0;
 
+    transferred_pages = ram_get_total_transferred_pages() -
+                            s->iteration_initial_pages;
+    s->pages_per_second = (double) transferred_pages /
+                             (((double) time_spent / 1000.0));
+
     /*
      * if we haven't sent anything, we don't want to
      * recalculate. 10000 is a small enough number for our purposes
      */
     if (ram_counters.dirty_pages_rate && transferred > 10000) {
-        s->expected_downtime = ram_counters.dirty_pages_rate *
-            qemu_target_page_size() / bandwidth;
+        s->expected_downtime = ram_counters.remaining / bandwidth;
     }
 
     qemu_file_reset_rate_limit(s->to_dst_file);
 
     s->iteration_start_time = current_time;
-    s->iteration_initial_bytes = qemu_ftell(s->to_dst_file);
+    s->iteration_initial_bytes = current_bytes;
+    s->iteration_initial_pages = ram_get_total_transferred_pages();
 
     trace_migrate_transferred(transferred, time_spent,
                               bandwidth, s->threshold_size);
@@ -2375,6 +3143,7 @@ static void migration_iteration_finish(MigrationState *s)
         /* Fallthrough */
     case MIGRATION_STATUS_FAILED:
     case MIGRATION_STATUS_CANCELLED:
+    case MIGRATION_STATUS_CANCELLING:
         if (s->vm_was_running) {
             vm_start();
         } else {
@@ -2389,10 +3158,20 @@ static void migration_iteration_finish(MigrationState *s)
         error_report("%s: Unknown ending state %d", __func__, s->state);
         break;
     }
-    qemu_bh_schedule(s->cleanup_bh);
+    migrate_fd_cleanup_schedule(s);
     qemu_mutex_unlock_iothread();
 }
 
+void migration_make_urgent_request(void)
+{
+    qemu_sem_post(&migrate_get_current()->rate_limit_sem);
+}
+
+void migration_consume_urgent_request(void)
+{
+    qemu_sem_wait(&migrate_get_current()->rate_limit_sem);
+}
+
 /*
  * Master migration thread on the source VM.
  * It drives the migration and pumps the data down the outgoing channel.
@@ -2401,9 +3180,12 @@ static void *migration_thread(void *opaque)
 {
     MigrationState *s = opaque;
     int64_t setup_start = qemu_clock_get_ms(QEMU_CLOCK_HOST);
+    MigThrError thr_error;
+    bool urgent = false;
 
     rcu_register_thread();
 
+    object_ref(OBJECT(s));
     s->iteration_start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
 
     qemu_savevm_state_header(s->to_dst_file);
@@ -2429,6 +3211,11 @@ static void *migration_thread(void *opaque)
         qemu_savevm_send_postcopy_advise(s->to_dst_file);
     }
 
+    if (migrate_colo_enabled()) {
+        /* Notify migration destination that we enable COLO */
+        qemu_savevm_send_colo_enable(s->to_dst_file);
+    }
+
     qemu_savevm_state_setup(s->to_dst_file);
 
     s->setup_time = qemu_clock_get_ms(QEMU_CLOCK_HOST) - setup_start;
@@ -2441,7 +3228,7 @@ static void *migration_thread(void *opaque)
            s->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
         int64_t current_time;
 
-        if (!qemu_file_rate_limit(s->to_dst_file)) {
+        if (urgent || !qemu_file_rate_limit(s->to_dst_file)) {
             MigIterateState iter_state = migration_iteration_run(s);
             if (iter_state == MIG_ITERATE_SKIP) {
                 continue;
@@ -2450,48 +3237,82 @@ static void *migration_thread(void *opaque)
             }
         }
 
-        if (qemu_file_get_error(s->to_dst_file)) {
-            if (migration_is_setup_or_active(s->state)) {
-                migrate_set_state(&s->state, s->state,
-                                  MIGRATION_STATUS_FAILED);
-            }
-            trace_migration_thread_file_err();
+        /*
+         * Try to detect any kind of failures, and see whether we
+         * should stop the migration now.
+         */
+        thr_error = migration_detect_error(s);
+        if (thr_error == MIG_THR_ERR_FATAL) {
+            /* Stop migration */
             break;
+        } else if (thr_error == MIG_THR_ERR_RECOVERED) {
+            /*
+             * Just recovered from a e.g. network failure, reset all
+             * the local variables. This is important to avoid
+             * breaking transferred_bytes and bandwidth calculation
+             */
+            s->iteration_start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+            s->iteration_initial_bytes = 0;
         }
 
         current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
 
         migration_update_counters(s, current_time);
 
+        urgent = false;
         if (qemu_file_rate_limit(s->to_dst_file)) {
-            /* usleep expects microseconds */
-            g_usleep((s->iteration_start_time + BUFFER_DELAY -
-                      current_time) * 1000);
+            /* Wait for a delay to do rate limiting OR
+             * something urgent to post the semaphore.
+             */
+            int ms = s->iteration_start_time + BUFFER_DELAY - current_time;
+            trace_migration_thread_ratelimit_pre(ms);
+            if (qemu_sem_timedwait(&s->rate_limit_sem, ms) == 0) {
+                /* We were worken by one or more urgent things but
+                 * the timedwait will have consumed one of them.
+                 * The service routine for the urgent wake will dec
+                 * the semaphore itself for each item it consumes,
+                 * so add this one we just eat back.
+                 */
+                qemu_sem_post(&s->rate_limit_sem);
+                urgent = true;
+            }
+            trace_migration_thread_ratelimit_post(urgent);
         }
     }
 
     trace_migration_thread_after_loop();
     migration_iteration_finish(s);
+    object_unref(OBJECT(s));
     rcu_unregister_thread();
     return NULL;
 }
 
 void migrate_fd_connect(MigrationState *s, Error *error_in)
 {
+    int64_t rate_limit;
+    bool resume = s->state == MIGRATION_STATUS_POSTCOPY_PAUSED;
+
     s->expected_downtime = s->parameters.downtime_limit;
-    s->cleanup_bh = qemu_bh_new(migrate_fd_cleanup, s);
+    s->cleanup_bh = qemu_bh_new(migrate_fd_cleanup_bh, s);
     if (error_in) {
         migrate_fd_error(s, error_in);
         migrate_fd_cleanup(s);
         return;
     }
 
-    qemu_file_set_blocking(s->to_dst_file, true);
-    qemu_file_set_rate_limit(s->to_dst_file,
-                             s->parameters.max_bandwidth / XFER_LIMIT_RATIO);
+    if (resume) {
+        /* This is a resumed migration */
+        rate_limit = INT64_MAX;
+    } else {
+        /* This is a fresh new migration */
+        rate_limit = s->parameters.max_bandwidth / XFER_LIMIT_RATIO;
 
-    /* Notify before starting migration thread */
-    notifier_list_notify(&migration_state_notifiers, s);
+        /* Notify before starting migration thread */
+        notifier_list_notify(&migration_state_notifiers, s);
+    }
+
+    qemu_file_set_rate_limit(s->to_dst_file, rate_limit);
+    qemu_file_set_blocking(s->to_dst_file, true);
 
     /*
      * Open the return path. For postcopy, it is used exclusively. For
@@ -2499,15 +3320,22 @@ void migrate_fd_connect(MigrationState *s, Error *error_in)
      * QEMU uses the return path.
      */
     if (migrate_postcopy_ram() || migrate_use_return_path()) {
-        if (open_return_path_on_source(s)) {
+        if (open_return_path_on_source(s, !resume)) {
             error_report("Unable to open return-path for postcopy");
-            migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
-                              MIGRATION_STATUS_FAILED);
+            migrate_set_state(&s->state, s->state, MIGRATION_STATUS_FAILED);
             migrate_fd_cleanup(s);
             return;
         }
     }
 
+    if (resume) {
+        /* Wakeup the main migration thread to do the recovery */
+        migrate_set_state(&s->state, MIGRATION_STATUS_POSTCOPY_PAUSED,
+                          MIGRATION_STATUS_POSTCOPY_RECOVER);
+        qemu_sem_post(&s->postcopy_pause_sem);
+        return;
+    }
+
     if (multifd_save_setup() != 0) {
         migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                           MIGRATION_STATUS_FAILED);
@@ -2527,11 +3355,13 @@ void migration_global_dump(Monitor *mon)
     monitor_printf(mon, "store-global-state: %s\n",
                    ms->store_global_state ? "on" : "off");
     monitor_printf(mon, "only-migratable: %s\n",
-                   ms->only_migratable ? "on" : "off");
+                   only_migratable ? "on" : "off");
     monitor_printf(mon, "send-configuration: %s\n",
                    ms->send_configuration ? "on" : "off");
     monitor_printf(mon, "send-section-footer: %s\n",
                    ms->send_section_footer ? "on" : "off");
+    monitor_printf(mon, "decompress-error-check: %s\n",
+                   ms->decompress_error_check ? "on" : "off");
 }
 
 #define DEFINE_PROP_MIG_CAP(name, x)             \
@@ -2540,11 +3370,12 @@ void migration_global_dump(Monitor *mon)
 static Property migration_properties[] = {
     DEFINE_PROP_BOOL("store-global-state", MigrationState,
                      store_global_state, true),
-    DEFINE_PROP_BOOL("only-migratable", MigrationState, only_migratable, false),
     DEFINE_PROP_BOOL("send-configuration", MigrationState,
                      send_configuration, true),
     DEFINE_PROP_BOOL("send-section-footer", MigrationState,
                      send_section_footer, true),
+    DEFINE_PROP_BOOL("decompress-error-check", MigrationState,
+                      decompress_error_check, true),
 
     /* Migration parameters */
     DEFINE_PROP_UINT8("x-compress-level", MigrationState,
@@ -2553,6 +3384,8 @@ static Property migration_properties[] = {
     DEFINE_PROP_UINT8("x-compress-threads", MigrationState,
                       parameters.compress_threads,
                       DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT),
+    DEFINE_PROP_BOOL("x-compress-wait-thread", MigrationState,
+                      parameters.compress_wait_thread, true),
     DEFINE_PROP_UINT8("x-decompress-threads", MigrationState,
                       parameters.decompress_threads,
                       DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT),
@@ -2570,15 +3403,30 @@ static Property migration_properties[] = {
     DEFINE_PROP_UINT32("x-checkpoint-delay", MigrationState,
                       parameters.x_checkpoint_delay,
                       DEFAULT_MIGRATE_X_CHECKPOINT_DELAY),
-    DEFINE_PROP_UINT8("x-multifd-channels", MigrationState,
-                      parameters.x_multifd_channels,
+    DEFINE_PROP_UINT8("multifd-channels", MigrationState,
+                      parameters.multifd_channels,
                       DEFAULT_MIGRATE_MULTIFD_CHANNELS),
-    DEFINE_PROP_UINT32("x-multifd-page-count", MigrationState,
-                      parameters.x_multifd_page_count,
-                      DEFAULT_MIGRATE_MULTIFD_PAGE_COUNT),
     DEFINE_PROP_SIZE("xbzrle-cache-size", MigrationState,
                       parameters.xbzrle_cache_size,
                       DEFAULT_MIGRATE_XBZRLE_CACHE_SIZE),
+    DEFINE_PROP_SIZE("max-postcopy-bandwidth", MigrationState,
+                      parameters.max_postcopy_bandwidth,
+                      DEFAULT_MIGRATE_MAX_POSTCOPY_BANDWIDTH),
+    DEFINE_PROP_UINT8("max-cpu-throttle", MigrationState,
+                      parameters.max_cpu_throttle,
+                      DEFAULT_MIGRATE_MAX_CPU_THROTTLE),
+    DEFINE_PROP_SIZE("announce-initial", MigrationState,
+                      parameters.announce_initial,
+                      DEFAULT_MIGRATE_ANNOUNCE_INITIAL),
+    DEFINE_PROP_SIZE("announce-max", MigrationState,
+                      parameters.announce_max,
+                      DEFAULT_MIGRATE_ANNOUNCE_MAX),
+    DEFINE_PROP_SIZE("announce-rounds", MigrationState,
+                      parameters.announce_rounds,
+                      DEFAULT_MIGRATE_ANNOUNCE_ROUNDS),
+    DEFINE_PROP_SIZE("announce-step", MigrationState,
+                      parameters.announce_step,
+                      DEFAULT_MIGRATE_ANNOUNCE_STEP),
 
     /* Migration capabilities */
     DEFINE_PROP_MIG_CAP("x-xbzrle", MIGRATION_CAPABILITY_XBZRLE),
@@ -2592,7 +3440,7 @@ static Property migration_properties[] = {
     DEFINE_PROP_MIG_CAP("x-release-ram", MIGRATION_CAPABILITY_RELEASE_RAM),
     DEFINE_PROP_MIG_CAP("x-block", MIGRATION_CAPABILITY_BLOCK),
     DEFINE_PROP_MIG_CAP("x-return-path", MIGRATION_CAPABILITY_RETURN_PATH),
-    DEFINE_PROP_MIG_CAP("x-multifd", MIGRATION_CAPABILITY_X_MULTIFD),
+    DEFINE_PROP_MIG_CAP("x-multifd", MIGRATION_CAPABILITY_MULTIFD),
 
     DEFINE_PROP_END_OF_LIST(),
 };
@@ -2611,9 +3459,14 @@ static void migration_instance_finalize(Object *obj)
     MigrationParameters *params = &ms->parameters;
 
     qemu_mutex_destroy(&ms->error_mutex);
+    qemu_mutex_destroy(&ms->qemu_file_lock);
     g_free(params->tls_hostname);
     g_free(params->tls_creds);
+    qemu_sem_destroy(&ms->rate_limit_sem);
     qemu_sem_destroy(&ms->pause_sem);
+    qemu_sem_destroy(&ms->postcopy_pause_sem);
+    qemu_sem_destroy(&ms->postcopy_pause_rp_sem);
+    qemu_sem_destroy(&ms->rp_state.rp_sem);
     error_free(ms->error);
 }
 
@@ -2624,6 +3477,7 @@ static void migration_instance_init(Object *obj)
 
     ms->state = MIGRATION_STATUS_NONE;
     ms->mbps = -1;
+    ms->pages_per_second = -1;
     qemu_sem_init(&ms->pause_sem, 0);
     qemu_mutex_init(&ms->error_mutex);
 
@@ -2640,9 +3494,20 @@ static void migration_instance_init(Object *obj)
     params->has_downtime_limit = true;
     params->has_x_checkpoint_delay = true;
     params->has_block_incremental = true;
-    params->has_x_multifd_channels = true;
-    params->has_x_multifd_page_count = true;
+    params->has_multifd_channels = true;
     params->has_xbzrle_cache_size = true;
+    params->has_max_postcopy_bandwidth = true;
+    params->has_max_cpu_throttle = true;
+    params->has_announce_initial = true;
+    params->has_announce_max = true;
+    params->has_announce_rounds = true;
+    params->has_announce_step = true;
+
+    qemu_sem_init(&ms->postcopy_pause_sem, 0);
+    qemu_sem_init(&ms->postcopy_pause_rp_sem, 0);
+    qemu_sem_init(&ms->rp_state.rp_sem, 0);
+    qemu_sem_init(&ms->rate_limit_sem, 0);
+    qemu_mutex_init(&ms->qemu_file_lock);
 }
 
 /*