4 * Copyright (c) 2003-2008 Fabrice Bellard
5 * Copyright (c) 2011-2015 Red Hat Inc
8 * Juan Quintela <quintela@redhat.com>
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
29 #include "qemu/osdep.h"
30 #include "qemu/cutils.h"
32 #include "ram-compress.h"
34 #include "qemu/error-report.h"
35 #include "qemu/stats64.h"
36 #include "migration.h"
38 #include "io/channel-null.h"
39 #include "exec/target_page.h"
40 #include "exec/ramblock.h"
42 #include "migration-stats.h"
44 CompressionStats compression_counters
;
46 static CompressParam
*comp_param
;
47 static QemuThread
*compress_threads
;
48 /* comp_done_cond is used to wake up the migration thread when
49 * one of the compression threads has finished the compression.
50 * comp_done_lock is used to co-work with comp_done_cond.
52 static QemuMutex comp_done_lock
;
53 static QemuCond comp_done_cond
;
55 struct DecompressParam
{
65 typedef struct DecompressParam DecompressParam
;
67 static QEMUFile
*decomp_file
;
68 static DecompressParam
*decomp_param
;
69 static QemuThread
*decompress_threads
;
70 static QemuMutex decomp_done_lock
;
71 static QemuCond decomp_done_cond
;
73 static CompressResult
do_compress_ram_page(QEMUFile
*f
, z_stream
*stream
,
74 RAMBlock
*block
, ram_addr_t offset
,
77 static void *do_data_compress(void *opaque
)
79 CompressParam
*param
= opaque
;
82 CompressResult result
;
84 qemu_mutex_lock(¶m
->mutex
);
85 while (!param
->quit
) {
88 offset
= param
->offset
;
89 param
->trigger
= false;
90 qemu_mutex_unlock(¶m
->mutex
);
92 result
= do_compress_ram_page(param
->file
, ¶m
->stream
,
93 block
, offset
, param
->originbuf
);
95 qemu_mutex_lock(&comp_done_lock
);
97 param
->result
= result
;
98 qemu_cond_signal(&comp_done_cond
);
99 qemu_mutex_unlock(&comp_done_lock
);
101 qemu_mutex_lock(¶m
->mutex
);
103 qemu_cond_wait(¶m
->cond
, ¶m
->mutex
);
106 qemu_mutex_unlock(¶m
->mutex
);
111 void compress_threads_save_cleanup(void)
115 if (!migrate_compress() || !comp_param
) {
119 thread_count
= migrate_compress_threads();
120 for (i
= 0; i
< thread_count
; i
++) {
122 * we use it as a indicator which shows if the thread is
123 * properly init'd or not
125 if (!comp_param
[i
].file
) {
129 qemu_mutex_lock(&comp_param
[i
].mutex
);
130 comp_param
[i
].quit
= true;
131 qemu_cond_signal(&comp_param
[i
].cond
);
132 qemu_mutex_unlock(&comp_param
[i
].mutex
);
134 qemu_thread_join(compress_threads
+ i
);
135 qemu_mutex_destroy(&comp_param
[i
].mutex
);
136 qemu_cond_destroy(&comp_param
[i
].cond
);
137 deflateEnd(&comp_param
[i
].stream
);
138 g_free(comp_param
[i
].originbuf
);
139 qemu_fclose(comp_param
[i
].file
);
140 comp_param
[i
].file
= NULL
;
142 qemu_mutex_destroy(&comp_done_lock
);
143 qemu_cond_destroy(&comp_done_cond
);
144 g_free(compress_threads
);
146 compress_threads
= NULL
;
150 int compress_threads_save_setup(void)
154 if (!migrate_compress()) {
157 thread_count
= migrate_compress_threads();
158 compress_threads
= g_new0(QemuThread
, thread_count
);
159 comp_param
= g_new0(CompressParam
, thread_count
);
160 qemu_cond_init(&comp_done_cond
);
161 qemu_mutex_init(&comp_done_lock
);
162 for (i
= 0; i
< thread_count
; i
++) {
163 comp_param
[i
].originbuf
= g_try_malloc(qemu_target_page_size());
164 if (!comp_param
[i
].originbuf
) {
168 if (deflateInit(&comp_param
[i
].stream
,
169 migrate_compress_level()) != Z_OK
) {
170 g_free(comp_param
[i
].originbuf
);
174 /* comp_param[i].file is just used as a dummy buffer to save data,
175 * set its ops to empty.
177 comp_param
[i
].file
= qemu_file_new_output(
178 QIO_CHANNEL(qio_channel_null_new()));
179 comp_param
[i
].done
= true;
180 comp_param
[i
].quit
= false;
181 qemu_mutex_init(&comp_param
[i
].mutex
);
182 qemu_cond_init(&comp_param
[i
].cond
);
183 qemu_thread_create(compress_threads
+ i
, "compress",
184 do_data_compress
, comp_param
+ i
,
185 QEMU_THREAD_JOINABLE
);
190 compress_threads_save_cleanup();
194 static CompressResult
do_compress_ram_page(QEMUFile
*f
, z_stream
*stream
,
195 RAMBlock
*block
, ram_addr_t offset
,
198 uint8_t *p
= block
->host
+ offset
;
199 size_t page_size
= qemu_target_page_size();
202 assert(qemu_file_buffer_empty(f
));
204 if (buffer_is_zero(p
, page_size
)) {
209 * copy it to a internal buffer to avoid it being modified by VM
210 * so that we can catch up the error during compression and
213 memcpy(source_buf
, p
, page_size
);
214 ret
= qemu_put_compression_data(f
, stream
, source_buf
, page_size
);
216 qemu_file_set_error(migrate_get_current()->to_dst_file
, ret
);
217 error_report("compressed data failed!");
224 static inline void compress_reset_result(CompressParam
*param
)
226 param
->result
= RES_NONE
;
231 void flush_compressed_data(int (send_queued_data(CompressParam
*)))
233 int thread_count
= migrate_compress_threads();
235 qemu_mutex_lock(&comp_done_lock
);
236 for (int i
= 0; i
< thread_count
; i
++) {
237 while (!comp_param
[i
].done
) {
238 qemu_cond_wait(&comp_done_cond
, &comp_done_lock
);
241 qemu_mutex_unlock(&comp_done_lock
);
243 for (int i
= 0; i
< thread_count
; i
++) {
244 qemu_mutex_lock(&comp_param
[i
].mutex
);
245 if (!comp_param
[i
].quit
) {
246 CompressParam
*param
= &comp_param
[i
];
247 send_queued_data(param
);
248 assert(qemu_file_buffer_empty(param
->file
));
249 compress_reset_result(param
);
251 qemu_mutex_unlock(&comp_param
[i
].mutex
);
255 static inline void set_compress_params(CompressParam
*param
, RAMBlock
*block
,
258 param
->block
= block
;
259 param
->offset
= offset
;
260 param
->trigger
= true;
263 int compress_page_with_multi_thread(RAMBlock
*block
, ram_addr_t offset
,
264 int (send_queued_data(CompressParam
*)))
266 int thread_count
, pages
= -1;
267 bool wait
= migrate_compress_wait_thread();
269 thread_count
= migrate_compress_threads();
270 qemu_mutex_lock(&comp_done_lock
);
272 for (int i
= 0; i
< thread_count
; i
++) {
273 if (comp_param
[i
].done
) {
274 CompressParam
*param
= &comp_param
[i
];
275 qemu_mutex_lock(¶m
->mutex
);
277 send_queued_data(param
);
278 assert(qemu_file_buffer_empty(param
->file
));
279 compress_reset_result(param
);
280 set_compress_params(param
, block
, offset
);
282 qemu_cond_signal(¶m
->cond
);
283 qemu_mutex_unlock(¶m
->mutex
);
290 * wait for the free thread if the user specifies 'compress-wait-thread',
291 * otherwise we will post the page out in the main thread as normal page.
293 if (pages
< 0 && wait
) {
294 qemu_cond_wait(&comp_done_cond
, &comp_done_lock
);
297 qemu_mutex_unlock(&comp_done_lock
);
302 /* return the size after decompression, or negative value on error */
304 qemu_uncompress_data(z_stream
*stream
, uint8_t *dest
, size_t dest_len
,
305 const uint8_t *source
, size_t source_len
)
309 err
= inflateReset(stream
);
314 stream
->avail_in
= source_len
;
315 stream
->next_in
= (uint8_t *)source
;
316 stream
->avail_out
= dest_len
;
317 stream
->next_out
= dest
;
319 err
= inflate(stream
, Z_NO_FLUSH
);
320 if (err
!= Z_STREAM_END
) {
324 return stream
->total_out
;
327 static void *do_data_decompress(void *opaque
)
329 DecompressParam
*param
= opaque
;
330 unsigned long pagesize
;
334 qemu_mutex_lock(¶m
->mutex
);
335 while (!param
->quit
) {
340 qemu_mutex_unlock(¶m
->mutex
);
342 pagesize
= qemu_target_page_size();
344 ret
= qemu_uncompress_data(¶m
->stream
, des
, pagesize
,
345 param
->compbuf
, len
);
346 if (ret
< 0 && migrate_get_current()->decompress_error_check
) {
347 error_report("decompress data failed");
348 qemu_file_set_error(decomp_file
, ret
);
351 qemu_mutex_lock(&decomp_done_lock
);
353 qemu_cond_signal(&decomp_done_cond
);
354 qemu_mutex_unlock(&decomp_done_lock
);
356 qemu_mutex_lock(¶m
->mutex
);
358 qemu_cond_wait(¶m
->cond
, ¶m
->mutex
);
361 qemu_mutex_unlock(¶m
->mutex
);
366 int wait_for_decompress_done(void)
368 if (!migrate_compress()) {
372 int thread_count
= migrate_decompress_threads();
373 qemu_mutex_lock(&decomp_done_lock
);
374 for (int i
= 0; i
< thread_count
; i
++) {
375 while (!decomp_param
[i
].done
) {
376 qemu_cond_wait(&decomp_done_cond
, &decomp_done_lock
);
379 qemu_mutex_unlock(&decomp_done_lock
);
380 return qemu_file_get_error(decomp_file
);
383 void compress_threads_load_cleanup(void)
387 if (!migrate_compress()) {
390 thread_count
= migrate_decompress_threads();
391 for (i
= 0; i
< thread_count
; i
++) {
393 * we use it as a indicator which shows if the thread is
394 * properly init'd or not
396 if (!decomp_param
[i
].compbuf
) {
400 qemu_mutex_lock(&decomp_param
[i
].mutex
);
401 decomp_param
[i
].quit
= true;
402 qemu_cond_signal(&decomp_param
[i
].cond
);
403 qemu_mutex_unlock(&decomp_param
[i
].mutex
);
405 for (i
= 0; i
< thread_count
; i
++) {
406 if (!decomp_param
[i
].compbuf
) {
410 qemu_thread_join(decompress_threads
+ i
);
411 qemu_mutex_destroy(&decomp_param
[i
].mutex
);
412 qemu_cond_destroy(&decomp_param
[i
].cond
);
413 inflateEnd(&decomp_param
[i
].stream
);
414 g_free(decomp_param
[i
].compbuf
);
415 decomp_param
[i
].compbuf
= NULL
;
417 g_free(decompress_threads
);
418 g_free(decomp_param
);
419 decompress_threads
= NULL
;
424 int compress_threads_load_setup(QEMUFile
*f
)
428 if (!migrate_compress()) {
433 * set compression_counters memory to zero for a new migration
435 memset(&compression_counters
, 0, sizeof(compression_counters
));
437 thread_count
= migrate_decompress_threads();
438 decompress_threads
= g_new0(QemuThread
, thread_count
);
439 decomp_param
= g_new0(DecompressParam
, thread_count
);
440 qemu_mutex_init(&decomp_done_lock
);
441 qemu_cond_init(&decomp_done_cond
);
443 for (i
= 0; i
< thread_count
; i
++) {
444 if (inflateInit(&decomp_param
[i
].stream
) != Z_OK
) {
448 size_t compbuf_size
= compressBound(qemu_target_page_size());
449 decomp_param
[i
].compbuf
= g_malloc0(compbuf_size
);
450 qemu_mutex_init(&decomp_param
[i
].mutex
);
451 qemu_cond_init(&decomp_param
[i
].cond
);
452 decomp_param
[i
].done
= true;
453 decomp_param
[i
].quit
= false;
454 qemu_thread_create(decompress_threads
+ i
, "decompress",
455 do_data_decompress
, decomp_param
+ i
,
456 QEMU_THREAD_JOINABLE
);
460 compress_threads_load_cleanup();
464 void decompress_data_with_multi_threads(QEMUFile
*f
, void *host
, int len
)
466 int thread_count
= migrate_decompress_threads();
467 QEMU_LOCK_GUARD(&decomp_done_lock
);
469 for (int i
= 0; i
< thread_count
; i
++) {
470 if (decomp_param
[i
].done
) {
471 decomp_param
[i
].done
= false;
472 qemu_mutex_lock(&decomp_param
[i
].mutex
);
473 qemu_get_buffer(f
, decomp_param
[i
].compbuf
, len
);
474 decomp_param
[i
].des
= host
;
475 decomp_param
[i
].len
= len
;
476 qemu_cond_signal(&decomp_param
[i
].cond
);
477 qemu_mutex_unlock(&decomp_param
[i
].mutex
);
481 qemu_cond_wait(&decomp_done_cond
, &decomp_done_lock
);
485 void populate_compress(MigrationInfo
*info
)
487 if (!migrate_compress()) {
490 info
->compression
= g_malloc0(sizeof(*info
->compression
));
491 info
->compression
->pages
= compression_counters
.pages
;
492 info
->compression
->busy
= compression_counters
.busy
;
493 info
->compression
->busy_rate
= compression_counters
.busy_rate
;
494 info
->compression
->compressed_size
= compression_counters
.compressed_size
;
495 info
->compression
->compression_rate
= compression_counters
.compression_rate
;
498 uint64_t ram_compressed_pages(void)
500 return compression_counters
.pages
;
503 void update_compress_thread_counts(const CompressParam
*param
, int bytes_xmit
)
505 ram_transferred_add(bytes_xmit
);
507 if (param
->result
== RES_ZEROPAGE
) {
508 stat64_add(&mig_stats
.zero_pages
, 1);
512 /* 8 means a header with RAM_SAVE_FLAG_CONTINUE. */
513 compression_counters
.compressed_size
+= bytes_xmit
- 8;
514 compression_counters
.pages
++;