]> git.proxmox.com Git - mirror_qemu.git/blobdiff - migration/migration.c
migration/rdma: fix potential nullptr access in rdma_start_incoming_migration
[mirror_qemu.git] / migration / migration.c
index 0c51aa6ac7b85251f94a73a6605b04471eb07755..0bb042a0f78bf9b748f7ef9e0fc956fd93c090e3 100644 (file)
@@ -52,6 +52,8 @@
 #include "hw/qdev-properties.h"
 #include "monitor/monitor.h"
 #include "net/announce.h"
+#include "qemu/queue.h"
+#include "multifd.h"
 
 #define MAX_THROTTLE  (32 << 20)      /* Migration transfer speed throttling */
 
@@ -76,6 +78,7 @@
 /*0: means nocompress, 1: best speed, ... 9: best compress ratio */
 #define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
 /* Define default autoconverge cpu throttle migration parameters */
+#define DEFAULT_MIGRATE_THROTTLE_TRIGGER_THRESHOLD 50
 #define DEFAULT_MIGRATE_CPU_THROTTLE_INITIAL 20
 #define DEFAULT_MIGRATE_CPU_THROTTLE_INCREMENT 10
 #define DEFAULT_MIGRATE_MAX_CPU_THROTTLE 99
 /* The delay time (in ms) between two COLO checkpoints */
 #define DEFAULT_MIGRATE_X_CHECKPOINT_DELAY (200 * 100)
 #define DEFAULT_MIGRATE_MULTIFD_CHANNELS 2
+#define DEFAULT_MIGRATE_MULTIFD_COMPRESSION MULTIFD_COMPRESSION_NONE
+/* 0: means nocompress, 1: best speed, ... 9: best compress ratio */
+#define DEFAULT_MIGRATE_MULTIFD_ZLIB_LEVEL 1
+/* 0: means nocompress, 1: best speed, ... 20: best compress ratio */
+#define DEFAULT_MIGRATE_MULTIFD_ZSTD_LEVEL 1
 
 /* Background transfer rate for postcopy, 0 means unlimited, note
  * that page requests can still exceed this limit.
@@ -482,11 +490,6 @@ static void process_incoming_migration_co(void *opaque)
             goto fail;
         }
 
-        if (colo_init_ram_cache() < 0) {
-            error_report("Init ram cache failed");
-            goto fail;
-        }
-
         qemu_thread_create(&mis->colo_incoming_thread, "COLO incoming",
              colo_process_incoming_thread, mis, QEMU_THREAD_JOINABLE);
         mis->have_colo_incoming_thread = true;
@@ -517,13 +520,23 @@ fail:
     exit(EXIT_FAILURE);
 }
 
-static void migration_incoming_setup(QEMUFile *f)
+/**
+ * @migration_incoming_setup: Setup incoming migration
+ *
+ * Returns 0 for no error or 1 for error
+ *
+ * @f: file for main migration channel
+ * @errp: where to put errors
+ */
+static int migration_incoming_setup(QEMUFile *f, Error **errp)
 {
     MigrationIncomingState *mis = migration_incoming_get_current();
+    Error *local_err = NULL;
 
-    if (multifd_load_setup() != 0) {
+    if (multifd_load_setup(&local_err) != 0) {
         /* We haven't been able to create multifd threads
            nothing better to do */
+        error_report_err(local_err);
         exit(EXIT_FAILURE);
     }
 
@@ -531,6 +544,7 @@ static void migration_incoming_setup(QEMUFile *f)
         mis->from_src_file = f;
     }
     qemu_file_set_blocking(f, false);
+    return 0;
 }
 
 void migration_incoming_process(void)
@@ -571,19 +585,27 @@ static bool postcopy_try_recover(QEMUFile *f)
     return false;
 }
 
-void migration_fd_process_incoming(QEMUFile *f)
+void migration_fd_process_incoming(QEMUFile *f, Error **errp)
 {
+    Error *local_err = NULL;
+
     if (postcopy_try_recover(f)) {
         return;
     }
 
-    migration_incoming_setup(f);
+    if (migration_incoming_setup(f, &local_err)) {
+        if (local_err) {
+            error_propagate(errp, local_err);
+        }
+        return;
+    }
     migration_incoming_process();
 }
 
 void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
 {
     MigrationIncomingState *mis = migration_incoming_get_current();
+    Error *local_err = NULL;
     bool start_migration;
 
     if (!mis->from_src_file) {
@@ -595,7 +617,12 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
             return;
         }
 
-        migration_incoming_setup(f);
+        if (migration_incoming_setup(f, &local_err)) {
+            if (local_err) {
+                error_propagate(errp, local_err);
+            }
+            return;
+        }
 
         /*
          * Common migration only needs one channel, so we can start
@@ -603,7 +630,6 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
          */
         start_migration = !migrate_use_multifd();
     } else {
-        Error *local_err = NULL;
         /* Multiple connections */
         assert(migrate_use_multifd());
         start_migration = multifd_recv_new_channel(ioc, &local_err);
@@ -753,16 +779,21 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
     params->compress_wait_thread = s->parameters.compress_wait_thread;
     params->has_decompress_threads = true;
     params->decompress_threads = s->parameters.decompress_threads;
+    params->has_throttle_trigger_threshold = true;
+    params->throttle_trigger_threshold = s->parameters.throttle_trigger_threshold;
     params->has_cpu_throttle_initial = true;
     params->cpu_throttle_initial = s->parameters.cpu_throttle_initial;
     params->has_cpu_throttle_increment = true;
     params->cpu_throttle_increment = s->parameters.cpu_throttle_increment;
+    params->has_cpu_throttle_tailslow = true;
+    params->cpu_throttle_tailslow = s->parameters.cpu_throttle_tailslow;
     params->has_tls_creds = true;
     params->tls_creds = g_strdup(s->parameters.tls_creds);
     params->has_tls_hostname = true;
     params->tls_hostname = g_strdup(s->parameters.tls_hostname);
     params->has_tls_authz = true;
-    params->tls_authz = g_strdup(s->parameters.tls_authz);
+    params->tls_authz = g_strdup(s->parameters.tls_authz ?
+                                 s->parameters.tls_authz : "");
     params->has_max_bandwidth = true;
     params->max_bandwidth = s->parameters.max_bandwidth;
     params->has_downtime_limit = true;
@@ -773,6 +804,12 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
     params->block_incremental = s->parameters.block_incremental;
     params->has_multifd_channels = true;
     params->multifd_channels = s->parameters.multifd_channels;
+    params->has_multifd_compression = true;
+    params->multifd_compression = s->parameters.multifd_compression;
+    params->has_multifd_zlib_level = true;
+    params->multifd_zlib_level = s->parameters.multifd_zlib_level;
+    params->has_multifd_zstd_level = true;
+    params->multifd_zstd_level = s->parameters.multifd_zstd_level;
     params->has_xbzrle_cache_size = true;
     params->xbzrle_cache_size = s->parameters.xbzrle_cache_size;
     params->has_max_postcopy_bandwidth = true;
@@ -819,6 +856,28 @@ bool migration_is_setup_or_active(int state)
     case MIGRATION_STATUS_SETUP:
     case MIGRATION_STATUS_PRE_SWITCHOVER:
     case MIGRATION_STATUS_DEVICE:
+    case MIGRATION_STATUS_WAIT_UNPLUG:
+    case MIGRATION_STATUS_COLO:
+        return true;
+
+    default:
+        return false;
+
+    }
+}
+
+bool migration_is_running(int state)
+{
+    switch (state) {
+    case MIGRATION_STATUS_ACTIVE:
+    case MIGRATION_STATUS_POSTCOPY_ACTIVE:
+    case MIGRATION_STATUS_POSTCOPY_PAUSED:
+    case MIGRATION_STATUS_POSTCOPY_RECOVER:
+    case MIGRATION_STATUS_SETUP:
+    case MIGRATION_STATUS_PRE_SWITCHOVER:
+    case MIGRATION_STATUS_DEVICE:
+    case MIGRATION_STATUS_WAIT_UNPLUG:
+    case MIGRATION_STATUS_CANCELLING:
         return true;
 
     default:
@@ -873,6 +932,7 @@ static void populate_ram_info(MigrationInfo *info, MigrationState *s)
         info->xbzrle_cache->pages = xbzrle_counters.pages;
         info->xbzrle_cache->cache_miss = xbzrle_counters.cache_miss;
         info->xbzrle_cache->cache_miss_rate = xbzrle_counters.cache_miss_rate;
+        info->xbzrle_cache->encoding_rate = xbzrle_counters.encoding_rate;
         info->xbzrle_cache->overflow = xbzrle_counters.overflow;
     }
 
@@ -954,6 +1014,9 @@ static void fill_source_migration_info(MigrationInfo *info)
     case MIGRATION_STATUS_CANCELLED:
         info->has_status = true;
         break;
+    case MIGRATION_STATUS_WAIT_UNPLUG:
+        info->has_status = true;
+        break;
     }
     info->status = s->state;
 }
@@ -1000,17 +1063,6 @@ static bool migrate_caps_check(bool *cap_list,
 #endif
 
     if (cap_list[MIGRATION_CAPABILITY_POSTCOPY_RAM]) {
-        if (cap_list[MIGRATION_CAPABILITY_COMPRESS]) {
-            /* The decompression threads asynchronously write into RAM
-             * rather than use the atomic copies needed to avoid
-             * userfaulting.  It should be possible to fix the decompression
-             * threads for compatibility in future.
-             */
-            error_setg(errp, "Postcopy is not currently compatible "
-                       "with compression");
-            return false;
-        }
-
         /* This check is reasonably expensive, so only when it's being
          * set the first time, also it's only the destination that needs
          * special support.
@@ -1083,7 +1135,7 @@ void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
     MigrationCapabilityStatusList *cap;
     bool cap_list[MIGRATION_CAPABILITY__MAX];
 
-    if (migration_is_setup_or_active(s->state)) {
+    if (migration_is_running(s->state)) {
         error_setg(errp, QERR_MIGRATION_ACTIVE);
         return;
     }
@@ -1125,6 +1177,15 @@ static bool migrate_params_check(MigrationParameters *params, Error **errp)
         return false;
     }
 
+    if (params->has_throttle_trigger_threshold &&
+        (params->throttle_trigger_threshold < 1 ||
+         params->throttle_trigger_threshold > 100)) {
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+                   "throttle_trigger_threshold",
+                   "an integer in the range of 1 to 100");
+        return false;
+    }
+
     if (params->has_cpu_throttle_initial &&
         (params->cpu_throttle_initial < 1 ||
          params->cpu_throttle_initial > 99)) {
@@ -1144,16 +1205,19 @@ static bool migrate_params_check(MigrationParameters *params, Error **errp)
     }
 
     if (params->has_max_bandwidth && (params->max_bandwidth > SIZE_MAX)) {
-        error_setg(errp, "Parameter 'max_bandwidth' expects an integer in the"
-                         " range of 0 to %zu bytes/second", SIZE_MAX);
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+                   "max_bandwidth",
+                   "an integer in the range of 0 to "stringify(SIZE_MAX)
+                   " bytes/second");
         return false;
     }
 
     if (params->has_downtime_limit &&
         (params->downtime_limit > MAX_MIGRATE_DOWNTIME)) {
-        error_setg(errp, "Parameter 'downtime_limit' expects an integer in "
-                         "the range of 0 to %d milliseconds",
-                         MAX_MIGRATE_DOWNTIME);
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+                   "downtime_limit",
+                   "an integer in the range of 0 to "
+                    stringify(MAX_MIGRATE_DOWNTIME)" ms");
         return false;
     }
 
@@ -1166,13 +1230,27 @@ static bool migrate_params_check(MigrationParameters *params, Error **errp)
         return false;
     }
 
+    if (params->has_multifd_zlib_level &&
+        (params->multifd_zlib_level > 9)) {
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "multifd_zlib_level",
+                   "is invalid, it should be in the range of 0 to 9");
+        return false;
+    }
+
+    if (params->has_multifd_zstd_level &&
+        (params->multifd_zstd_level > 20)) {
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "multifd_zstd_level",
+                   "is invalid, it should be in the range of 0 to 20");
+        return false;
+    }
+
     if (params->has_xbzrle_cache_size &&
         (params->xbzrle_cache_size < qemu_target_page_size() ||
          !is_power_of_2(params->xbzrle_cache_size))) {
         error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
                    "xbzrle_cache_size",
                    "is invalid, it should be bigger than target page size"
-                   " and a power of two");
+                   " and a power of 2");
         return false;
     }
 
@@ -1240,6 +1318,10 @@ static void migrate_params_test_apply(MigrateSetParameters *params,
         dest->decompress_threads = params->decompress_threads;
     }
 
+    if (params->has_throttle_trigger_threshold) {
+        dest->throttle_trigger_threshold = params->throttle_trigger_threshold;
+    }
+
     if (params->has_cpu_throttle_initial) {
         dest->cpu_throttle_initial = params->cpu_throttle_initial;
     }
@@ -1248,6 +1330,10 @@ static void migrate_params_test_apply(MigrateSetParameters *params,
         dest->cpu_throttle_increment = params->cpu_throttle_increment;
     }
 
+    if (params->has_cpu_throttle_tailslow) {
+        dest->cpu_throttle_tailslow = params->cpu_throttle_tailslow;
+    }
+
     if (params->has_tls_creds) {
         assert(params->tls_creds->type == QTYPE_QSTRING);
         dest->tls_creds = g_strdup(params->tls_creds->u.s);
@@ -1276,6 +1362,9 @@ static void migrate_params_test_apply(MigrateSetParameters *params,
     if (params->has_multifd_channels) {
         dest->multifd_channels = params->multifd_channels;
     }
+    if (params->has_multifd_compression) {
+        dest->multifd_compression = params->multifd_compression;
+    }
     if (params->has_xbzrle_cache_size) {
         dest->xbzrle_cache_size = params->xbzrle_cache_size;
     }
@@ -1321,6 +1410,10 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
         s->parameters.decompress_threads = params->decompress_threads;
     }
 
+    if (params->has_throttle_trigger_threshold) {
+        s->parameters.throttle_trigger_threshold = params->throttle_trigger_threshold;
+    }
+
     if (params->has_cpu_throttle_initial) {
         s->parameters.cpu_throttle_initial = params->cpu_throttle_initial;
     }
@@ -1329,6 +1422,10 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
         s->parameters.cpu_throttle_increment = params->cpu_throttle_increment;
     }
 
+    if (params->has_cpu_throttle_tailslow) {
+        s->parameters.cpu_throttle_tailslow = params->cpu_throttle_tailslow;
+    }
+
     if (params->has_tls_creds) {
         g_free(s->parameters.tls_creds);
         assert(params->tls_creds->type == QTYPE_QSTRING);
@@ -1372,6 +1469,9 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
     if (params->has_multifd_channels) {
         s->parameters.multifd_channels = params->multifd_channels;
     }
+    if (params->has_multifd_compression) {
+        s->parameters.multifd_compression = params->multifd_compression;
+    }
     if (params->has_xbzrle_cache_size) {
         s->parameters.xbzrle_cache_size = params->xbzrle_cache_size;
         xbzrle_cache_resize(params->xbzrle_cache_size, errp);
@@ -1567,11 +1667,10 @@ static void migrate_fd_cleanup_bh(void *opaque)
 
 void migrate_set_error(MigrationState *s, const Error *error)
 {
-    qemu_mutex_lock(&s->error_mutex);
+    QEMU_LOCK_GUARD(&s->error_mutex);
     if (!s->error) {
         s->error = error_copy(error);
     }
-    qemu_mutex_unlock(&s->error_mutex);
 }
 
 void migrate_fd_error(MigrationState *s, const Error *error)
@@ -1596,7 +1695,7 @@ static void migrate_fd_cancel(MigrationState *s)
 
     do {
         old_state = s->state;
-        if (!migration_is_setup_or_active(old_state)) {
+        if (!migration_is_running(old_state)) {
             break;
         }
         /* If the migration is paused, kick it out of the pause */
@@ -1694,6 +1793,7 @@ bool migration_is_idle(void)
     case MIGRATION_STATUS_COLO:
     case MIGRATION_STATUS_PRE_SWITCHOVER:
     case MIGRATION_STATUS_DEVICE:
+    case MIGRATION_STATUS_WAIT_UNPLUG:
         return false;
     case MIGRATION_STATUS__MAX:
         g_assert_not_reached();
@@ -1778,6 +1878,7 @@ void qmp_migrate_incoming(const char *uri, Error **errp)
     }
     if (!once) {
         error_setg(errp, "The incoming migration has already been started");
+        return;
     }
 
     qemu_start_incoming_migration(uri, &local_err);
@@ -1892,9 +1993,7 @@ static bool migrate_prepare(MigrationState *s, bool blk, bool blk_inc,
         return true;
     }
 
-    if (migration_is_setup_or_active(s->state) ||
-        s->state == MIGRATION_STATUS_CANCELLING ||
-        s->state == MIGRATION_STATUS_COLO) {
+    if (migration_is_running(s->state)) {
         error_setg(errp, QERR_MIGRATION_ACTIVE);
         return false;
     }
@@ -2022,18 +2121,18 @@ void qmp_migrate_set_speed(int64_t value, Error **errp)
 void qmp_migrate_set_downtime(double value, Error **errp)
 {
     if (value < 0 || value > MAX_MIGRATE_DOWNTIME_SECONDS) {
-        error_setg(errp, "Parameter 'downtime_limit' expects an integer in "
-                         "the range of 0 to %d seconds",
-                         MAX_MIGRATE_DOWNTIME_SECONDS);
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+                   "downtime_limit",
+                   "an integer in the range of 0 to "
+                    stringify(MAX_MIGRATE_DOWNTIME_SECONDS)" seconds");
         return;
     }
 
     value *= 1000; /* Convert to milliseconds */
-    value = MAX(0, MIN(INT64_MAX, value));
 
     MigrateSetParameters p = {
         .has_downtime_limit = true,
-        .downtime_limit = value,
+        .downtime_limit = (int64_t)value,
     };
 
     qmp_migrate_set_parameters(&p, errp);
@@ -2198,6 +2297,33 @@ int migrate_multifd_channels(void)
     return s->parameters.multifd_channels;
 }
 
+MultiFDCompression migrate_multifd_compression(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->parameters.multifd_compression;
+}
+
+int migrate_multifd_zlib_level(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->parameters.multifd_zlib_level;
+}
+
+int migrate_multifd_zstd_level(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->parameters.multifd_zstd_level;
+}
+
 int migrate_use_xbzrle(void)
 {
     MigrationState *s;
@@ -2284,7 +2410,7 @@ static struct rp_cmd_args {
 static void migrate_handle_rp_req_pages(MigrationState *ms, const char* rbname,
                                        ram_addr_t start, size_t len)
 {
-    long our_host_ps = getpagesize();
+    long our_host_ps = qemu_real_host_page_size;
 
     trace_migrate_handle_rp_req_pages(rbname, start, len);
 
@@ -2383,7 +2509,7 @@ retry:
         if (header_type >= MIG_RP_MSG_MAX ||
             header_type == MIG_RP_MSG_INVALID) {
             error_report("RP: Received invalid message 0x%04x length 0x%04x",
-                    header_type, header_len);
+                         header_type, header_len);
             mark_source_rp_bad(ms);
             goto out;
         }
@@ -2392,9 +2518,9 @@ retry:
             header_len != rp_cmd_args[header_type].len) ||
             header_len > sizeof(buf)) {
             error_report("RP: Received '%s' message (0x%04x) with"
-                    "incorrect length %d expecting %zu",
-                    rp_cmd_args[header_type].name, header_type, header_len,
-                    (size_t)rp_cmd_args[header_type].len);
+                         "incorrect length %d expecting %zu",
+                         rp_cmd_args[header_type].name, header_type, header_len,
+                         (size_t)rp_cmd_args[header_type].len);
             mark_source_rp_bad(ms);
             goto out;
         }
@@ -2449,7 +2575,7 @@ retry:
             }
             if (header_len != expected_len) {
                 error_report("RP: Req_Page_id with length %d expecting %zd",
-                        header_len, expected_len);
+                             header_len, expected_len);
                 mark_source_rp_bad(ms);
                 goto out;
             }
@@ -2486,7 +2612,7 @@ retry:
 out:
     res = qemu_file_get_error(rp);
     if (res) {
-        if (res == -EIO) {
+        if (res == -EIO && migration_in_postcopy()) {
             /*
              * Maybe there is something we can do: it looks like a
              * network down issue, and we pause for a recovery.
@@ -2759,14 +2885,22 @@ static int migration_maybe_pause(MigrationState *s,
         /* This block intentionally left blank */
     }
 
-    qemu_mutex_unlock_iothread();
-    migrate_set_state(&s->state, *current_active_state,
-                      MIGRATION_STATUS_PRE_SWITCHOVER);
-    qemu_sem_wait(&s->pause_sem);
-    migrate_set_state(&s->state, MIGRATION_STATUS_PRE_SWITCHOVER,
-                      new_state);
-    *current_active_state = new_state;
-    qemu_mutex_lock_iothread();
+    /*
+     * If the migration is cancelled when it is in the completion phase,
+     * the migration state is set to MIGRATION_STATUS_CANCELLING.
+     * So we don't need to wait a semaphore, otherwise we would always
+     * wait for the 'pause_sem' semaphore.
+     */
+    if (s->state != MIGRATION_STATUS_CANCELLING) {
+        qemu_mutex_unlock_iothread();
+        migrate_set_state(&s->state, *current_active_state,
+                          MIGRATION_STATUS_PRE_SWITCHOVER);
+        qemu_sem_wait(&s->pause_sem);
+        migrate_set_state(&s->state, MIGRATION_STATUS_PRE_SWITCHOVER,
+                          new_state);
+        *current_active_state = new_state;
+        qemu_mutex_lock_iothread();
+    }
 
     return s->state == new_state ? 0 : -EINVAL;
 }
@@ -3149,8 +3283,7 @@ static MigIterateState migration_iteration_run(MigrationState *s)
             return MIG_ITERATE_SKIP;
         }
         /* Just another iteration step */
-        qemu_savevm_state_iterate(s->to_dst_file,
-            s->state == MIGRATION_STATUS_POSTCOPY_ACTIVE);
+        qemu_savevm_state_iterate(s->to_dst_file, in_postcopy);
     } else {
         trace_migration_thread_low_pending(pending_size);
         migration_completion(s);
@@ -3219,6 +3352,37 @@ void migration_consume_urgent_request(void)
     qemu_sem_wait(&migrate_get_current()->rate_limit_sem);
 }
 
+/* Returns true if the rate limiting was broken by an urgent request */
+bool migration_rate_limit(void)
+{
+    int64_t now = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+    MigrationState *s = migrate_get_current();
+
+    bool urgent = false;
+    migration_update_counters(s, now);
+    if (qemu_file_rate_limit(s->to_dst_file)) {
+        /*
+         * Wait for a delay to do rate limiting OR
+         * something urgent to post the semaphore.
+         */
+        int ms = s->iteration_start_time + BUFFER_DELAY - now;
+        trace_migration_rate_limit_pre(ms);
+        if (qemu_sem_timedwait(&s->rate_limit_sem, ms) == 0) {
+            /*
+             * We were woken by one or more urgent things but
+             * the timedwait will have consumed one of them.
+             * The service routine for the urgent wake will dec
+             * the semaphore itself for each item it consumes,
+             * so add this one we just eat back.
+             */
+            qemu_sem_post(&s->rate_limit_sem);
+            urgent = true;
+        }
+        trace_migration_rate_limit_post(urgent);
+    }
+    return urgent;
+}
+
 /*
  * Master migration thread on the source VM.
  * It drives the migration and pumps the data down the outgoing channel.
@@ -3265,6 +3429,19 @@ static void *migration_thread(void *opaque)
 
     qemu_savevm_state_setup(s->to_dst_file);
 
+    if (qemu_savevm_state_guest_unplug_pending()) {
+        migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
+                          MIGRATION_STATUS_WAIT_UNPLUG);
+
+        while (s->state == MIGRATION_STATUS_WAIT_UNPLUG &&
+               qemu_savevm_state_guest_unplug_pending()) {
+            qemu_sem_timedwait(&s->wait_unplug_sem, 250);
+        }
+
+        migrate_set_state(&s->state, MIGRATION_STATUS_WAIT_UNPLUG,
+                MIGRATION_STATUS_ACTIVE);
+    }
+
     s->setup_time = qemu_clock_get_ms(QEMU_CLOCK_HOST) - setup_start;
     migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                       MIGRATION_STATUS_ACTIVE);
@@ -3272,8 +3449,6 @@ static void *migration_thread(void *opaque)
     trace_migration_thread_setup_complete();
 
     while (migration_is_active(s)) {
-        int64_t current_time;
-
         if (urgent || !qemu_file_rate_limit(s->to_dst_file)) {
             MigIterateState iter_state = migration_iteration_run(s);
             if (iter_state == MIG_ITERATE_SKIP) {
@@ -3300,29 +3475,7 @@ static void *migration_thread(void *opaque)
             update_iteration_initial_status(s);
         }
 
-        current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
-
-        migration_update_counters(s, current_time);
-
-        urgent = false;
-        if (qemu_file_rate_limit(s->to_dst_file)) {
-            /* Wait for a delay to do rate limiting OR
-             * something urgent to post the semaphore.
-             */
-            int ms = s->iteration_start_time + BUFFER_DELAY - current_time;
-            trace_migration_thread_ratelimit_pre(ms);
-            if (qemu_sem_timedwait(&s->rate_limit_sem, ms) == 0) {
-                /* We were worken by one or more urgent things but
-                 * the timedwait will have consumed one of them.
-                 * The service routine for the urgent wake will dec
-                 * the semaphore itself for each item it consumes,
-                 * so add this one we just eat back.
-                 */
-                qemu_sem_post(&s->rate_limit_sem);
-                urgent = true;
-            }
-            trace_migration_thread_ratelimit_post(urgent);
-        }
+        urgent = migration_rate_limit();
     }
 
     trace_migration_thread_after_loop();
@@ -3334,11 +3487,17 @@ static void *migration_thread(void *opaque)
 
 void migrate_fd_connect(MigrationState *s, Error *error_in)
 {
+    Error *local_err = NULL;
     int64_t rate_limit;
     bool resume = s->state == MIGRATION_STATUS_POSTCOPY_PAUSED;
 
     s->expected_downtime = s->parameters.downtime_limit;
-    s->cleanup_bh = qemu_bh_new(migrate_fd_cleanup_bh, s);
+    if (resume) {
+        assert(s->cleanup_bh);
+    } else {
+        assert(!s->cleanup_bh);
+        s->cleanup_bh = qemu_bh_new(migrate_fd_cleanup_bh, s);
+    }
     if (error_in) {
         migrate_fd_error(s, error_in);
         migrate_fd_cleanup(s);
@@ -3382,7 +3541,8 @@ void migrate_fd_connect(MigrationState *s, Error *error_in)
         return;
     }
 
-    if (multifd_save_setup() != 0) {
+    if (multifd_save_setup(&local_err) != 0) {
+        error_report_err(local_err);
         migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                           MIGRATION_STATUS_FAILED);
         migrate_fd_cleanup(s);
@@ -3439,12 +3599,17 @@ static Property migration_properties[] = {
     DEFINE_PROP_UINT8("x-decompress-threads", MigrationState,
                       parameters.decompress_threads,
                       DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT),
+    DEFINE_PROP_UINT8("x-throttle-trigger-threshold", MigrationState,
+                      parameters.throttle_trigger_threshold,
+                      DEFAULT_MIGRATE_THROTTLE_TRIGGER_THRESHOLD),
     DEFINE_PROP_UINT8("x-cpu-throttle-initial", MigrationState,
                       parameters.cpu_throttle_initial,
                       DEFAULT_MIGRATE_CPU_THROTTLE_INITIAL),
     DEFINE_PROP_UINT8("x-cpu-throttle-increment", MigrationState,
                       parameters.cpu_throttle_increment,
                       DEFAULT_MIGRATE_CPU_THROTTLE_INCREMENT),
+    DEFINE_PROP_BOOL("x-cpu-throttle-tailslow", MigrationState,
+                      parameters.cpu_throttle_tailslow, false),
     DEFINE_PROP_SIZE("x-max-bandwidth", MigrationState,
                       parameters.max_bandwidth, MAX_THROTTLE),
     DEFINE_PROP_UINT64("x-downtime-limit", MigrationState,
@@ -3456,6 +3621,15 @@ static Property migration_properties[] = {
     DEFINE_PROP_UINT8("multifd-channels", MigrationState,
                       parameters.multifd_channels,
                       DEFAULT_MIGRATE_MULTIFD_CHANNELS),
+    DEFINE_PROP_MULTIFD_COMPRESSION("multifd-compression", MigrationState,
+                      parameters.multifd_compression,
+                      DEFAULT_MIGRATE_MULTIFD_COMPRESSION),
+    DEFINE_PROP_UINT8("multifd-zlib-level", MigrationState,
+                      parameters.multifd_zlib_level,
+                      DEFAULT_MIGRATE_MULTIFD_ZLIB_LEVEL),
+    DEFINE_PROP_UINT8("multifd-zstd-level", MigrationState,
+                      parameters.multifd_zstd_level,
+                      DEFAULT_MIGRATE_MULTIFD_ZSTD_LEVEL),
     DEFINE_PROP_SIZE("xbzrle-cache-size", MigrationState,
                       parameters.xbzrle_cache_size,
                       DEFAULT_MIGRATE_XBZRLE_CACHE_SIZE),
@@ -3500,7 +3674,7 @@ static void migration_class_init(ObjectClass *klass, void *data)
     DeviceClass *dc = DEVICE_CLASS(klass);
 
     dc->user_creatable = false;
-    dc->props = migration_properties;
+    device_class_set_props(dc, migration_properties);
 }
 
 static void migration_instance_finalize(Object *obj)
@@ -3512,6 +3686,7 @@ static void migration_instance_finalize(Object *obj)
     qemu_mutex_destroy(&ms->qemu_file_lock);
     g_free(params->tls_hostname);
     g_free(params->tls_creds);
+    qemu_sem_destroy(&ms->wait_unplug_sem);
     qemu_sem_destroy(&ms->rate_limit_sem);
     qemu_sem_destroy(&ms->pause_sem);
     qemu_sem_destroy(&ms->postcopy_pause_sem);
@@ -3538,13 +3713,18 @@ static void migration_instance_init(Object *obj)
     params->has_compress_level = true;
     params->has_compress_threads = true;
     params->has_decompress_threads = true;
+    params->has_throttle_trigger_threshold = true;
     params->has_cpu_throttle_initial = true;
     params->has_cpu_throttle_increment = true;
+    params->has_cpu_throttle_tailslow = true;
     params->has_max_bandwidth = true;
     params->has_downtime_limit = true;
     params->has_x_checkpoint_delay = true;
     params->has_block_incremental = true;
     params->has_multifd_channels = true;
+    params->has_multifd_compression = true;
+    params->has_multifd_zlib_level = true;
+    params->has_multifd_zstd_level = true;
     params->has_xbzrle_cache_size = true;
     params->has_max_postcopy_bandwidth = true;
     params->has_max_cpu_throttle = true;
@@ -3557,6 +3737,7 @@ static void migration_instance_init(Object *obj)
     qemu_sem_init(&ms->postcopy_pause_rp_sem, 0);
     qemu_sem_init(&ms->rp_state.rp_sem, 0);
     qemu_sem_init(&ms->rate_limit_sem, 0);
+    qemu_sem_init(&ms->wait_unplug_sem, 0);
     qemu_mutex_init(&ms->qemu_file_lock);
 }