]> git.proxmox.com Git - mirror_qemu.git/blob - migration/ram-compress.c
aspeed: Clean up includes
[mirror_qemu.git] / migration / ram-compress.c
1 /*
2 * QEMU System Emulator
3 *
4 * Copyright (c) 2003-2008 Fabrice Bellard
5 * Copyright (c) 2011-2015 Red Hat Inc
6 *
7 * Authors:
8 * Juan Quintela <quintela@redhat.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 */
28
29 #include "qemu/osdep.h"
30 #include "qemu/cutils.h"
31
32 #include "ram-compress.h"
33
34 #include "qemu/error-report.h"
35 #include "qemu/stats64.h"
36 #include "migration.h"
37 #include "options.h"
38 #include "io/channel-null.h"
39 #include "exec/target_page.h"
40 #include "exec/ramblock.h"
41 #include "ram.h"
42 #include "migration-stats.h"
43
44 static struct {
45 int64_t pages;
46 int64_t busy;
47 double busy_rate;
48 int64_t compressed_size;
49 double compression_rate;
50 /* compression statistics since the beginning of the period */
51 /* amount of count that no free thread to compress data */
52 uint64_t compress_thread_busy_prev;
53 /* amount bytes after compression */
54 uint64_t compressed_size_prev;
55 /* amount of compressed pages */
56 uint64_t compress_pages_prev;
57 } compression_counters;
58
59 static CompressParam *comp_param;
60 static QemuThread *compress_threads;
61 /* comp_done_cond is used to wake up the migration thread when
62 * one of the compression threads has finished the compression.
63 * comp_done_lock is used to co-work with comp_done_cond.
64 */
65 static QemuMutex comp_done_lock;
66 static QemuCond comp_done_cond;
67
68 struct DecompressParam {
69 bool done;
70 bool quit;
71 QemuMutex mutex;
72 QemuCond cond;
73 void *des;
74 uint8_t *compbuf;
75 int len;
76 z_stream stream;
77 };
78 typedef struct DecompressParam DecompressParam;
79
80 static QEMUFile *decomp_file;
81 static DecompressParam *decomp_param;
82 static QemuThread *decompress_threads;
83 static QemuMutex decomp_done_lock;
84 static QemuCond decomp_done_cond;
85
86 static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
87 RAMBlock *block, ram_addr_t offset,
88 uint8_t *source_buf);
89
90 static void *do_data_compress(void *opaque)
91 {
92 CompressParam *param = opaque;
93 RAMBlock *block;
94 ram_addr_t offset;
95 CompressResult result;
96
97 qemu_mutex_lock(&param->mutex);
98 while (!param->quit) {
99 if (param->trigger) {
100 block = param->block;
101 offset = param->offset;
102 param->trigger = false;
103 qemu_mutex_unlock(&param->mutex);
104
105 result = do_compress_ram_page(param->file, &param->stream,
106 block, offset, param->originbuf);
107
108 qemu_mutex_lock(&comp_done_lock);
109 param->done = true;
110 param->result = result;
111 qemu_cond_signal(&comp_done_cond);
112 qemu_mutex_unlock(&comp_done_lock);
113
114 qemu_mutex_lock(&param->mutex);
115 } else {
116 qemu_cond_wait(&param->cond, &param->mutex);
117 }
118 }
119 qemu_mutex_unlock(&param->mutex);
120
121 return NULL;
122 }
123
124 void compress_threads_save_cleanup(void)
125 {
126 int i, thread_count;
127
128 if (!migrate_compress() || !comp_param) {
129 return;
130 }
131
132 thread_count = migrate_compress_threads();
133 for (i = 0; i < thread_count; i++) {
134 /*
135 * we use it as a indicator which shows if the thread is
136 * properly init'd or not
137 */
138 if (!comp_param[i].file) {
139 break;
140 }
141
142 qemu_mutex_lock(&comp_param[i].mutex);
143 comp_param[i].quit = true;
144 qemu_cond_signal(&comp_param[i].cond);
145 qemu_mutex_unlock(&comp_param[i].mutex);
146
147 qemu_thread_join(compress_threads + i);
148 qemu_mutex_destroy(&comp_param[i].mutex);
149 qemu_cond_destroy(&comp_param[i].cond);
150 deflateEnd(&comp_param[i].stream);
151 g_free(comp_param[i].originbuf);
152 qemu_fclose(comp_param[i].file);
153 comp_param[i].file = NULL;
154 }
155 qemu_mutex_destroy(&comp_done_lock);
156 qemu_cond_destroy(&comp_done_cond);
157 g_free(compress_threads);
158 g_free(comp_param);
159 compress_threads = NULL;
160 comp_param = NULL;
161 }
162
163 int compress_threads_save_setup(void)
164 {
165 int i, thread_count;
166
167 if (!migrate_compress()) {
168 return 0;
169 }
170 thread_count = migrate_compress_threads();
171 compress_threads = g_new0(QemuThread, thread_count);
172 comp_param = g_new0(CompressParam, thread_count);
173 qemu_cond_init(&comp_done_cond);
174 qemu_mutex_init(&comp_done_lock);
175 for (i = 0; i < thread_count; i++) {
176 comp_param[i].originbuf = g_try_malloc(qemu_target_page_size());
177 if (!comp_param[i].originbuf) {
178 goto exit;
179 }
180
181 if (deflateInit(&comp_param[i].stream,
182 migrate_compress_level()) != Z_OK) {
183 g_free(comp_param[i].originbuf);
184 goto exit;
185 }
186
187 /* comp_param[i].file is just used as a dummy buffer to save data,
188 * set its ops to empty.
189 */
190 comp_param[i].file = qemu_file_new_output(
191 QIO_CHANNEL(qio_channel_null_new()));
192 comp_param[i].done = true;
193 comp_param[i].quit = false;
194 qemu_mutex_init(&comp_param[i].mutex);
195 qemu_cond_init(&comp_param[i].cond);
196 qemu_thread_create(compress_threads + i, "compress",
197 do_data_compress, comp_param + i,
198 QEMU_THREAD_JOINABLE);
199 }
200 return 0;
201
202 exit:
203 compress_threads_save_cleanup();
204 return -1;
205 }
206
207 static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
208 RAMBlock *block, ram_addr_t offset,
209 uint8_t *source_buf)
210 {
211 uint8_t *p = block->host + offset;
212 size_t page_size = qemu_target_page_size();
213 int ret;
214
215 assert(qemu_file_buffer_empty(f));
216
217 if (buffer_is_zero(p, page_size)) {
218 return RES_ZEROPAGE;
219 }
220
221 /*
222 * copy it to a internal buffer to avoid it being modified by VM
223 * so that we can catch up the error during compression and
224 * decompression
225 */
226 memcpy(source_buf, p, page_size);
227 ret = qemu_put_compression_data(f, stream, source_buf, page_size);
228 if (ret < 0) {
229 qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
230 error_report("compressed data failed!");
231 qemu_fflush(f);
232 return RES_NONE;
233 }
234 return RES_COMPRESS;
235 }
236
237 static inline void compress_reset_result(CompressParam *param)
238 {
239 param->result = RES_NONE;
240 param->block = NULL;
241 param->offset = 0;
242 }
243
244 void compress_flush_data(void)
245 {
246 int thread_count = migrate_compress_threads();
247
248 if (!migrate_compress()) {
249 return;
250 }
251
252 qemu_mutex_lock(&comp_done_lock);
253 for (int i = 0; i < thread_count; i++) {
254 while (!comp_param[i].done) {
255 qemu_cond_wait(&comp_done_cond, &comp_done_lock);
256 }
257 }
258 qemu_mutex_unlock(&comp_done_lock);
259
260 for (int i = 0; i < thread_count; i++) {
261 qemu_mutex_lock(&comp_param[i].mutex);
262 if (!comp_param[i].quit) {
263 CompressParam *param = &comp_param[i];
264 compress_send_queued_data(param);
265 assert(qemu_file_buffer_empty(param->file));
266 compress_reset_result(param);
267 }
268 qemu_mutex_unlock(&comp_param[i].mutex);
269 }
270 }
271
272 static inline void set_compress_params(CompressParam *param, RAMBlock *block,
273 ram_addr_t offset)
274 {
275 param->block = block;
276 param->offset = offset;
277 param->trigger = true;
278 }
279
280 /*
281 * Return true when it compress a page
282 */
283 bool compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
284 int (send_queued_data(CompressParam *)))
285 {
286 int thread_count;
287 bool wait = migrate_compress_wait_thread();
288
289 thread_count = migrate_compress_threads();
290 qemu_mutex_lock(&comp_done_lock);
291
292 while (true) {
293 for (int i = 0; i < thread_count; i++) {
294 if (comp_param[i].done) {
295 CompressParam *param = &comp_param[i];
296 qemu_mutex_lock(&param->mutex);
297 param->done = false;
298 send_queued_data(param);
299 assert(qemu_file_buffer_empty(param->file));
300 compress_reset_result(param);
301 set_compress_params(param, block, offset);
302
303 qemu_cond_signal(&param->cond);
304 qemu_mutex_unlock(&param->mutex);
305 qemu_mutex_unlock(&comp_done_lock);
306 return true;
307 }
308 }
309 if (!wait) {
310 qemu_mutex_unlock(&comp_done_lock);
311 compression_counters.busy++;
312 return false;
313 }
314 /*
315 * wait for a free thread if the user specifies
316 * 'compress-wait-thread', otherwise we will post the page out
317 * in the main thread as normal page.
318 */
319 qemu_cond_wait(&comp_done_cond, &comp_done_lock);
320 }
321 }
322
323 /* return the size after decompression, or negative value on error */
324 static int
325 qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
326 const uint8_t *source, size_t source_len)
327 {
328 int err;
329
330 err = inflateReset(stream);
331 if (err != Z_OK) {
332 return -1;
333 }
334
335 stream->avail_in = source_len;
336 stream->next_in = (uint8_t *)source;
337 stream->avail_out = dest_len;
338 stream->next_out = dest;
339
340 err = inflate(stream, Z_NO_FLUSH);
341 if (err != Z_STREAM_END) {
342 return -1;
343 }
344
345 return stream->total_out;
346 }
347
348 static void *do_data_decompress(void *opaque)
349 {
350 DecompressParam *param = opaque;
351 unsigned long pagesize;
352 uint8_t *des;
353 int len, ret;
354
355 qemu_mutex_lock(&param->mutex);
356 while (!param->quit) {
357 if (param->des) {
358 des = param->des;
359 len = param->len;
360 param->des = 0;
361 qemu_mutex_unlock(&param->mutex);
362
363 pagesize = qemu_target_page_size();
364
365 ret = qemu_uncompress_data(&param->stream, des, pagesize,
366 param->compbuf, len);
367 if (ret < 0 && migrate_get_current()->decompress_error_check) {
368 error_report("decompress data failed");
369 qemu_file_set_error(decomp_file, ret);
370 }
371
372 qemu_mutex_lock(&decomp_done_lock);
373 param->done = true;
374 qemu_cond_signal(&decomp_done_cond);
375 qemu_mutex_unlock(&decomp_done_lock);
376
377 qemu_mutex_lock(&param->mutex);
378 } else {
379 qemu_cond_wait(&param->cond, &param->mutex);
380 }
381 }
382 qemu_mutex_unlock(&param->mutex);
383
384 return NULL;
385 }
386
387 int wait_for_decompress_done(void)
388 {
389 if (!migrate_compress()) {
390 return 0;
391 }
392
393 int thread_count = migrate_decompress_threads();
394 qemu_mutex_lock(&decomp_done_lock);
395 for (int i = 0; i < thread_count; i++) {
396 while (!decomp_param[i].done) {
397 qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
398 }
399 }
400 qemu_mutex_unlock(&decomp_done_lock);
401 return qemu_file_get_error(decomp_file);
402 }
403
404 void compress_threads_load_cleanup(void)
405 {
406 int i, thread_count;
407
408 if (!migrate_compress()) {
409 return;
410 }
411 thread_count = migrate_decompress_threads();
412 for (i = 0; i < thread_count; i++) {
413 /*
414 * we use it as a indicator which shows if the thread is
415 * properly init'd or not
416 */
417 if (!decomp_param[i].compbuf) {
418 break;
419 }
420
421 qemu_mutex_lock(&decomp_param[i].mutex);
422 decomp_param[i].quit = true;
423 qemu_cond_signal(&decomp_param[i].cond);
424 qemu_mutex_unlock(&decomp_param[i].mutex);
425 }
426 for (i = 0; i < thread_count; i++) {
427 if (!decomp_param[i].compbuf) {
428 break;
429 }
430
431 qemu_thread_join(decompress_threads + i);
432 qemu_mutex_destroy(&decomp_param[i].mutex);
433 qemu_cond_destroy(&decomp_param[i].cond);
434 inflateEnd(&decomp_param[i].stream);
435 g_free(decomp_param[i].compbuf);
436 decomp_param[i].compbuf = NULL;
437 }
438 g_free(decompress_threads);
439 g_free(decomp_param);
440 decompress_threads = NULL;
441 decomp_param = NULL;
442 decomp_file = NULL;
443 }
444
445 int compress_threads_load_setup(QEMUFile *f)
446 {
447 int i, thread_count;
448
449 if (!migrate_compress()) {
450 return 0;
451 }
452
453 /*
454 * set compression_counters memory to zero for a new migration
455 */
456 memset(&compression_counters, 0, sizeof(compression_counters));
457
458 thread_count = migrate_decompress_threads();
459 decompress_threads = g_new0(QemuThread, thread_count);
460 decomp_param = g_new0(DecompressParam, thread_count);
461 qemu_mutex_init(&decomp_done_lock);
462 qemu_cond_init(&decomp_done_cond);
463 decomp_file = f;
464 for (i = 0; i < thread_count; i++) {
465 if (inflateInit(&decomp_param[i].stream) != Z_OK) {
466 goto exit;
467 }
468
469 size_t compbuf_size = compressBound(qemu_target_page_size());
470 decomp_param[i].compbuf = g_malloc0(compbuf_size);
471 qemu_mutex_init(&decomp_param[i].mutex);
472 qemu_cond_init(&decomp_param[i].cond);
473 decomp_param[i].done = true;
474 decomp_param[i].quit = false;
475 qemu_thread_create(decompress_threads + i, "decompress",
476 do_data_decompress, decomp_param + i,
477 QEMU_THREAD_JOINABLE);
478 }
479 return 0;
480 exit:
481 compress_threads_load_cleanup();
482 return -1;
483 }
484
485 void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len)
486 {
487 int thread_count = migrate_decompress_threads();
488 QEMU_LOCK_GUARD(&decomp_done_lock);
489 while (true) {
490 for (int i = 0; i < thread_count; i++) {
491 if (decomp_param[i].done) {
492 decomp_param[i].done = false;
493 qemu_mutex_lock(&decomp_param[i].mutex);
494 qemu_get_buffer(f, decomp_param[i].compbuf, len);
495 decomp_param[i].des = host;
496 decomp_param[i].len = len;
497 qemu_cond_signal(&decomp_param[i].cond);
498 qemu_mutex_unlock(&decomp_param[i].mutex);
499 return;
500 }
501 }
502 qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
503 }
504 }
505
506 void populate_compress(MigrationInfo *info)
507 {
508 if (!migrate_compress()) {
509 return;
510 }
511 info->compression = g_malloc0(sizeof(*info->compression));
512 info->compression->pages = compression_counters.pages;
513 info->compression->busy = compression_counters.busy;
514 info->compression->busy_rate = compression_counters.busy_rate;
515 info->compression->compressed_size = compression_counters.compressed_size;
516 info->compression->compression_rate = compression_counters.compression_rate;
517 }
518
519 uint64_t compress_ram_pages(void)
520 {
521 return compression_counters.pages;
522 }
523
524 void update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
525 {
526 ram_transferred_add(bytes_xmit);
527
528 if (param->result == RES_ZEROPAGE) {
529 stat64_add(&mig_stats.zero_pages, 1);
530 return;
531 }
532
533 /* 8 means a header with RAM_SAVE_FLAG_CONTINUE. */
534 compression_counters.compressed_size += bytes_xmit - 8;
535 compression_counters.pages++;
536 }
537
538 void compress_update_rates(uint64_t page_count)
539 {
540 if (!migrate_compress()) {
541 return;
542 }
543 compression_counters.busy_rate = (double)(compression_counters.busy -
544 compression_counters.compress_thread_busy_prev) / page_count;
545 compression_counters.compress_thread_busy_prev =
546 compression_counters.busy;
547
548 double compressed_size = compression_counters.compressed_size -
549 compression_counters.compressed_size_prev;
550 if (compressed_size) {
551 double uncompressed_size = (compression_counters.pages -
552 compression_counters.compress_pages_prev) *
553 qemu_target_page_size();
554
555 /* Compression-Ratio = Uncompressed-size / Compressed-size */
556 compression_counters.compression_rate =
557 uncompressed_size / compressed_size;
558
559 compression_counters.compress_pages_prev =
560 compression_counters.pages;
561 compression_counters.compressed_size_prev =
562 compression_counters.compressed_size;
563 }
564 }