]> git.proxmox.com Git - mirror_qemu.git/blobdiff - arch_init.c
spapr_iommu: Make H_PUT_TCE_INDIRECT endian-safe
[mirror_qemu.git] / arch_init.c
index fcfa32828d8564a763a9a1bbfe1ada5e55a228ef..23d3feba44ab960680b216a39afe53bd5a813b14 100644 (file)
@@ -24,6 +24,7 @@
 #include <stdint.h>
 #include <stdarg.h>
 #include <stdlib.h>
+#include <zlib.h>
 #ifndef _WIN32
 #include <sys/types.h>
 #include <sys/mman.h>
@@ -127,6 +128,7 @@ static uint64_t bitmap_sync_count;
 #define RAM_SAVE_FLAG_CONTINUE 0x20
 #define RAM_SAVE_FLAG_XBZRLE   0x40
 /* 0x80 is reserved in migration.h start with 0x100 next */
+#define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
 
 static struct defconfig_file {
     const char *filename;
@@ -316,6 +318,147 @@ static uint64_t migration_dirty_pages;
 static uint32_t last_version;
 static bool ram_bulk_stage;
 
+struct CompressParam {
+    bool start;
+    bool done;
+    QEMUFile *file;
+    QemuMutex mutex;
+    QemuCond cond;
+    RAMBlock *block;
+    ram_addr_t offset;
+};
+typedef struct CompressParam CompressParam;
+
+struct DecompressParam {
+    bool start;
+    QemuMutex mutex;
+    QemuCond cond;
+    void *des;
+    uint8 *compbuf;
+    int len;
+};
+typedef struct DecompressParam DecompressParam;
+
+static CompressParam *comp_param;
+static QemuThread *compress_threads;
+/* comp_done_cond is used to wake up the migration thread when
+ * one of the compression threads has finished the compression.
+ * comp_done_lock is used to co-work with comp_done_cond.
+ */
+static QemuMutex *comp_done_lock;
+static QemuCond *comp_done_cond;
+/* The empty QEMUFileOps will be used by file in CompressParam */
+static const QEMUFileOps empty_ops = { };
+
+static bool compression_switch;
+static bool quit_comp_thread;
+static bool quit_decomp_thread;
+static DecompressParam *decomp_param;
+static QemuThread *decompress_threads;
+static uint8_t *compressed_data_buf;
+
+static int do_compress_ram_page(CompressParam *param);
+
+static void *do_data_compress(void *opaque)
+{
+    CompressParam *param = opaque;
+
+    while (!quit_comp_thread) {
+        qemu_mutex_lock(&param->mutex);
+        /* Re-check the quit_comp_thread in case of
+         * terminate_compression_threads is called just before
+         * qemu_mutex_lock(&param->mutex) and after
+         * while(!quit_comp_thread), re-check it here can make
+         * sure the compression thread terminate as expected.
+         */
+        while (!param->start && !quit_comp_thread) {
+            qemu_cond_wait(&param->cond, &param->mutex);
+        }
+        if (!quit_comp_thread) {
+            do_compress_ram_page(param);
+        }
+        param->start = false;
+        qemu_mutex_unlock(&param->mutex);
+
+        qemu_mutex_lock(comp_done_lock);
+        param->done = true;
+        qemu_cond_signal(comp_done_cond);
+        qemu_mutex_unlock(comp_done_lock);
+    }
+
+    return NULL;
+}
+
+static inline void terminate_compression_threads(void)
+{
+    int idx, thread_count;
+
+    thread_count = migrate_compress_threads();
+    quit_comp_thread = true;
+    for (idx = 0; idx < thread_count; idx++) {
+        qemu_mutex_lock(&comp_param[idx].mutex);
+        qemu_cond_signal(&comp_param[idx].cond);
+        qemu_mutex_unlock(&comp_param[idx].mutex);
+    }
+}
+
+void migrate_compress_threads_join(void)
+{
+    int i, thread_count;
+
+    if (!migrate_use_compression()) {
+        return;
+    }
+    terminate_compression_threads();
+    thread_count = migrate_compress_threads();
+    for (i = 0; i < thread_count; i++) {
+        qemu_thread_join(compress_threads + i);
+        qemu_fclose(comp_param[i].file);
+        qemu_mutex_destroy(&comp_param[i].mutex);
+        qemu_cond_destroy(&comp_param[i].cond);
+    }
+    qemu_mutex_destroy(comp_done_lock);
+    qemu_cond_destroy(comp_done_cond);
+    g_free(compress_threads);
+    g_free(comp_param);
+    g_free(comp_done_cond);
+    g_free(comp_done_lock);
+    compress_threads = NULL;
+    comp_param = NULL;
+    comp_done_cond = NULL;
+    comp_done_lock = NULL;
+}
+
+void migrate_compress_threads_create(void)
+{
+    int i, thread_count;
+
+    if (!migrate_use_compression()) {
+        return;
+    }
+    quit_comp_thread = false;
+    compression_switch = true;
+    thread_count = migrate_compress_threads();
+    compress_threads = g_new0(QemuThread, thread_count);
+    comp_param = g_new0(CompressParam, thread_count);
+    comp_done_cond = g_new0(QemuCond, 1);
+    comp_done_lock = g_new0(QemuMutex, 1);
+    qemu_cond_init(comp_done_cond);
+    qemu_mutex_init(comp_done_lock);
+    for (i = 0; i < thread_count; i++) {
+        /* com_param[i].file is just used as a dummy buffer to save data, set
+         * it's ops to empty.
+         */
+        comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
+        comp_param[i].done = true;
+        qemu_mutex_init(&comp_param[i].mutex);
+        qemu_cond_init(&comp_param[i].cond);
+        qemu_thread_create(compress_threads + i, "compress",
+                           do_data_compress, comp_param + i,
+                           QEMU_THREAD_JOINABLE);
+    }
+}
+
 /**
  * save_page_header: Write page header to wire
  *
@@ -332,19 +475,14 @@ static size_t save_page_header(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
 {
     size_t size;
 
-    if (block == last_sent_block) {
-        offset |= RAM_SAVE_FLAG_CONTINUE;
-    }
-
     qemu_put_be64(f, offset);
     size = 8;
 
-    if (block != last_sent_block) {
+    if (!(offset & RAM_SAVE_FLAG_CONTINUE)) {
         qemu_put_byte(f, strlen(block->idstr));
         qemu_put_buffer(f, (uint8_t *)block->idstr,
                         strlen(block->idstr));
         size += 1 + strlen(block->idstr);
-        last_sent_block = block;
     }
     return size;
 }
@@ -525,12 +663,16 @@ static void migration_bitmap_sync_range(ram_addr_t start, ram_addr_t length)
 static int64_t start_time;
 static int64_t bytes_xfer_prev;
 static int64_t num_dirty_pages_period;
+static uint64_t xbzrle_cache_miss_prev;
+static uint64_t iterations_prev;
 
 static void migration_bitmap_sync_init(void)
 {
     start_time = 0;
     bytes_xfer_prev = 0;
     num_dirty_pages_period = 0;
+    xbzrle_cache_miss_prev = 0;
+    iterations_prev = 0;
 }
 
 /* Called with iothread lock held, to protect ram_list.dirty_memory[] */
@@ -541,8 +683,6 @@ static void migration_bitmap_sync(void)
     MigrationState *s = migrate_get_current();
     int64_t end_time;
     int64_t bytes_xfer_now;
-    static uint64_t xbzrle_cache_miss_prev;
-    static uint64_t iterations_prev;
 
     bitmap_sync_count++;
 
@@ -590,7 +730,7 @@ static void migration_bitmap_sync(void)
              mig_throttle_on = false;
         }
         if (migrate_use_xbzrle()) {
-            if (iterations_prev != 0) {
+            if (iterations_prev != acct_info.iterations) {
                 acct_info.xbzrle_cache_miss_rate =
                    (double)(acct_info.xbzrle_cache_miss -
                             xbzrle_cache_miss_prev) /
@@ -604,8 +744,36 @@ static void migration_bitmap_sync(void)
         s->dirty_bytes_rate = s->dirty_pages_rate * TARGET_PAGE_SIZE;
         start_time = end_time;
         num_dirty_pages_period = 0;
-        s->dirty_sync_count = bitmap_sync_count;
     }
+    s->dirty_sync_count = bitmap_sync_count;
+}
+
+/**
+ * save_zero_page: Send the zero page to the stream
+ *
+ * Returns: Number of pages written.
+ *
+ * @f: QEMUFile where to send the data
+ * @block: block that contains the page we want to send
+ * @offset: offset inside the block for the page
+ * @p: pointer to the page
+ * @bytes_transferred: increase it with the number of transferred bytes
+ */
+static int save_zero_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
+                          uint8_t *p, uint64_t *bytes_transferred)
+{
+    int pages = -1;
+
+    if (is_zero_range(p, TARGET_PAGE_SIZE)) {
+        acct_info.dup_pages++;
+        *bytes_transferred += save_page_header(f, block,
+                                               offset | RAM_SAVE_FLAG_COMPRESS);
+        qemu_put_byte(f, 0);
+        *bytes_transferred += 1;
+        pages = 1;
+    }
+
+    return pages;
 }
 
 /**
@@ -644,6 +812,10 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
     XBZRLE_cache_lock();
 
     current_addr = block->offset + offset;
+
+    if (block == last_sent_block) {
+        offset |= RAM_SAVE_FLAG_CONTINUE;
+    }
     if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
         if (ret != RAM_SAVE_CONTROL_DELAYED) {
             if (bytes_xmit > 0) {
@@ -652,25 +824,22 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
                 acct_info.dup_pages++;
             }
         }
-    } else if (is_zero_range(p, TARGET_PAGE_SIZE)) {
-        acct_info.dup_pages++;
-        *bytes_transferred += save_page_header(f, block,
-                                               offset | RAM_SAVE_FLAG_COMPRESS);
-        qemu_put_byte(f, 0);
-        *bytes_transferred += 1;
-        pages = 1;
-        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
-         * page would be stale
-         */
-        xbzrle_cache_zero_page(current_addr);
-    } else if (!ram_bulk_stage && migrate_use_xbzrle()) {
-        pages = save_xbzrle_page(f, &p, current_addr, block,
-                                 offset, last_stage, bytes_transferred);
-        if (!last_stage) {
-            /* Can't send this cached data async, since the cache page
-             * might get updated before it gets to the wire
+    } else {
+        pages = save_zero_page(f, block, offset, p, bytes_transferred);
+        if (pages > 0) {
+            /* Must let xbzrle know, otherwise a previous (now 0'd) cached
+             * page would be stale
              */
-            send_async = false;
+            xbzrle_cache_zero_page(current_addr);
+        } else if (!ram_bulk_stage && migrate_use_xbzrle()) {
+            pages = save_xbzrle_page(f, &p, current_addr, block,
+                                     offset, last_stage, bytes_transferred);
+            if (!last_stage) {
+                /* Can't send this cached data async, since the cache page
+                 * might get updated before it gets to the wire
+                 */
+                send_async = false;
+            }
         }
     }
 
@@ -693,6 +862,178 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
     return pages;
 }
 
+static int do_compress_ram_page(CompressParam *param)
+{
+    int bytes_sent, blen;
+    uint8_t *p;
+    RAMBlock *block = param->block;
+    ram_addr_t offset = param->offset;
+
+    p = memory_region_get_ram_ptr(block->mr) + (offset & TARGET_PAGE_MASK);
+
+    bytes_sent = save_page_header(param->file, block, offset |
+                                  RAM_SAVE_FLAG_COMPRESS_PAGE);
+    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
+                                     migrate_compress_level());
+    bytes_sent += blen;
+
+    return bytes_sent;
+}
+
+static inline void start_compression(CompressParam *param)
+{
+    param->done = false;
+    qemu_mutex_lock(&param->mutex);
+    param->start = true;
+    qemu_cond_signal(&param->cond);
+    qemu_mutex_unlock(&param->mutex);
+}
+
+static inline void start_decompression(DecompressParam *param)
+{
+    qemu_mutex_lock(&param->mutex);
+    param->start = true;
+    qemu_cond_signal(&param->cond);
+    qemu_mutex_unlock(&param->mutex);
+}
+
+static uint64_t bytes_transferred;
+
+static void flush_compressed_data(QEMUFile *f)
+{
+    int idx, len, thread_count;
+
+    if (!migrate_use_compression()) {
+        return;
+    }
+    thread_count = migrate_compress_threads();
+    for (idx = 0; idx < thread_count; idx++) {
+        if (!comp_param[idx].done) {
+            qemu_mutex_lock(comp_done_lock);
+            while (!comp_param[idx].done && !quit_comp_thread) {
+                qemu_cond_wait(comp_done_cond, comp_done_lock);
+            }
+            qemu_mutex_unlock(comp_done_lock);
+        }
+        if (!quit_comp_thread) {
+            len = qemu_put_qemu_file(f, comp_param[idx].file);
+            bytes_transferred += len;
+        }
+    }
+}
+
+static inline void set_compress_params(CompressParam *param, RAMBlock *block,
+                                       ram_addr_t offset)
+{
+    param->block = block;
+    param->offset = offset;
+}
+
+static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
+                                           ram_addr_t offset,
+                                           uint64_t *bytes_transferred)
+{
+    int idx, thread_count, bytes_xmit = -1, pages = -1;
+
+    thread_count = migrate_compress_threads();
+    qemu_mutex_lock(comp_done_lock);
+    while (true) {
+        for (idx = 0; idx < thread_count; idx++) {
+            if (comp_param[idx].done) {
+                bytes_xmit = qemu_put_qemu_file(f, comp_param[idx].file);
+                set_compress_params(&comp_param[idx], block, offset);
+                start_compression(&comp_param[idx]);
+                pages = 1;
+                acct_info.norm_pages++;
+                *bytes_transferred += bytes_xmit;
+                break;
+            }
+        }
+        if (pages > 0) {
+            break;
+        } else {
+            qemu_cond_wait(comp_done_cond, comp_done_lock);
+        }
+    }
+    qemu_mutex_unlock(comp_done_lock);
+
+    return pages;
+}
+
+/**
+ * ram_save_compressed_page: compress the given page and send it to the stream
+ *
+ * Returns: Number of pages written.
+ *
+ * @f: QEMUFile where to send the data
+ * @block: block that contains the page we want to send
+ * @offset: offset inside the block for the page
+ * @last_stage: if we are at the completion stage
+ * @bytes_transferred: increase it with the number of transferred bytes
+ */
+static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
+                                    ram_addr_t offset, bool last_stage,
+                                    uint64_t *bytes_transferred)
+{
+    int pages = -1;
+    uint64_t bytes_xmit;
+    MemoryRegion *mr = block->mr;
+    uint8_t *p;
+    int ret;
+
+    p = memory_region_get_ram_ptr(mr) + offset;
+
+    bytes_xmit = 0;
+    ret = ram_control_save_page(f, block->offset,
+                                offset, TARGET_PAGE_SIZE, &bytes_xmit);
+    if (bytes_xmit) {
+        *bytes_transferred += bytes_xmit;
+        pages = 1;
+    }
+    if (block == last_sent_block) {
+        offset |= RAM_SAVE_FLAG_CONTINUE;
+    }
+    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
+        if (ret != RAM_SAVE_CONTROL_DELAYED) {
+            if (bytes_xmit > 0) {
+                acct_info.norm_pages++;
+            } else if (bytes_xmit == 0) {
+                acct_info.dup_pages++;
+            }
+        }
+    } else {
+        /* When starting the process of a new block, the first page of
+         * the block should be sent out before other pages in the same
+         * block, and all the pages in last block should have been sent
+         * out, keeping this order is important, because the 'cont' flag
+         * is used to avoid resending the block name.
+         */
+        if (block != last_sent_block) {
+            flush_compressed_data(f);
+            pages = save_zero_page(f, block, offset, p, bytes_transferred);
+            if (pages == -1) {
+                set_compress_params(&comp_param[0], block, offset);
+                /* Use the qemu thread to compress the data to make sure the
+                 * first page is sent out before other pages
+                 */
+                bytes_xmit = do_compress_ram_page(&comp_param[0]);
+                acct_info.norm_pages++;
+                qemu_put_qemu_file(f, comp_param[0].file);
+                *bytes_transferred += bytes_xmit;
+                pages = 1;
+            }
+        } else {
+            pages = save_zero_page(f, block, offset, p, bytes_transferred);
+            if (pages == -1) {
+                pages = compress_page_with_multi_thread(f, block, offset,
+                                                        bytes_transferred);
+            }
+        }
+    }
+
+    return pages;
+}
+
 /**
  * ram_find_and_save_block: Finds a dirty page and sends it to f
  *
@@ -732,13 +1073,26 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage,
                 block = QLIST_FIRST_RCU(&ram_list.blocks);
                 complete_round = true;
                 ram_bulk_stage = false;
+                if (migrate_use_xbzrle()) {
+                    /* If xbzrle is on, stop using the data compression at this
+                     * point. In theory, xbzrle can do better than compression.
+                     */
+                    flush_compressed_data(f);
+                    compression_switch = false;
+                }
             }
         } else {
-            pages = ram_save_page(f, block, offset, last_stage,
-                                  bytes_transferred);
+            if (compression_switch && migrate_use_compression()) {
+                pages = ram_save_compressed_page(f, block, offset, last_stage,
+                                                 bytes_transferred);
+            } else {
+                pages = ram_save_page(f, block, offset, last_stage,
+                                      bytes_transferred);
+            }
 
             /* if page is unmodified, continue to the next */
             if (pages > 0) {
+                last_sent_block = block;
                 break;
             }
         }
@@ -750,8 +1104,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage,
     return pages;
 }
 
-static uint64_t bytes_transferred;
-
 void acct_update_position(QEMUFile *f, size_t size, bool zero)
 {
     uint64_t pages = size / TARGET_PAGE_SIZE;
@@ -965,6 +1317,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
         }
         i++;
     }
+    flush_compressed_data(f);
     rcu_read_unlock();
 
     /*
@@ -1006,6 +1359,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
         }
     }
 
+    flush_compressed_data(f);
     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
     migration_end();
 
@@ -1113,10 +1467,104 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
     }
 }
 
+static void *do_data_decompress(void *opaque)
+{
+    DecompressParam *param = opaque;
+    unsigned long pagesize;
+
+    while (!quit_decomp_thread) {
+        qemu_mutex_lock(&param->mutex);
+        while (!param->start && !quit_decomp_thread) {
+            qemu_cond_wait(&param->cond, &param->mutex);
+            pagesize = TARGET_PAGE_SIZE;
+            if (!quit_decomp_thread) {
+                /* uncompress() will return failed in some case, especially
+                 * when the page is dirted when doing the compression, it's
+                 * not a problem because the dirty page will be retransferred
+                 * and uncompress() won't break the data in other pages.
+                 */
+                uncompress((Bytef *)param->des, &pagesize,
+                           (const Bytef *)param->compbuf, param->len);
+            }
+            param->start = false;
+        }
+        qemu_mutex_unlock(&param->mutex);
+    }
+
+    return NULL;
+}
+
+void migrate_decompress_threads_create(void)
+{
+    int i, thread_count;
+
+    thread_count = migrate_decompress_threads();
+    decompress_threads = g_new0(QemuThread, thread_count);
+    decomp_param = g_new0(DecompressParam, thread_count);
+    compressed_data_buf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+    quit_decomp_thread = false;
+    for (i = 0; i < thread_count; i++) {
+        qemu_mutex_init(&decomp_param[i].mutex);
+        qemu_cond_init(&decomp_param[i].cond);
+        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+        qemu_thread_create(decompress_threads + i, "decompress",
+                           do_data_decompress, decomp_param + i,
+                           QEMU_THREAD_JOINABLE);
+    }
+}
+
+void migrate_decompress_threads_join(void)
+{
+    int i, thread_count;
+
+    quit_decomp_thread = true;
+    thread_count = migrate_decompress_threads();
+    for (i = 0; i < thread_count; i++) {
+        qemu_mutex_lock(&decomp_param[i].mutex);
+        qemu_cond_signal(&decomp_param[i].cond);
+        qemu_mutex_unlock(&decomp_param[i].mutex);
+    }
+    for (i = 0; i < thread_count; i++) {
+        qemu_thread_join(decompress_threads + i);
+        qemu_mutex_destroy(&decomp_param[i].mutex);
+        qemu_cond_destroy(&decomp_param[i].cond);
+        g_free(decomp_param[i].compbuf);
+    }
+    g_free(decompress_threads);
+    g_free(decomp_param);
+    g_free(compressed_data_buf);
+    decompress_threads = NULL;
+    decomp_param = NULL;
+    compressed_data_buf = NULL;
+}
+
+static void decompress_data_with_multi_threads(uint8_t *compbuf,
+                                               void *host, int len)
+{
+    int idx, thread_count;
+
+    thread_count = migrate_decompress_threads();
+    while (true) {
+        for (idx = 0; idx < thread_count; idx++) {
+            if (!decomp_param[idx].start) {
+                memcpy(decomp_param[idx].compbuf, compbuf, len);
+                decomp_param[idx].des = host;
+                decomp_param[idx].len = len;
+                start_decompression(&decomp_param[idx]);
+                break;
+            }
+        }
+        if (idx < thread_count) {
+            break;
+        }
+    }
+}
+
 static int ram_load(QEMUFile *f, void *opaque, int version_id)
 {
     int flags = 0, ret = 0;
     static uint64_t seq_iter;
+    int len = 0;
 
     seq_iter++;
 
@@ -1196,6 +1644,23 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             }
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;
+        case RAM_SAVE_FLAG_COMPRESS_PAGE:
+            host = host_from_stream_offset(f, addr, flags);
+            if (!host) {
+                error_report("Invalid RAM offset " RAM_ADDR_FMT, addr);
+                ret = -EINVAL;
+                break;
+            }
+
+            len = qemu_get_be32(f);
+            if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
+                error_report("Invalid compressed data length: %d", len);
+                ret = -EINVAL;
+                break;
+            }
+            qemu_get_buffer(f, compressed_data_buf, len);
+            decompress_data_with_multi_threads(compressed_data_buf, host, len);
+            break;
         case RAM_SAVE_FLAG_XBZRLE:
             host = host_from_stream_offset(f, addr, flags);
             if (!host) {