/*
 * QEMU System Emulator
 *
 * Copyright (c) 2003-2008 Fabrice Bellard
 * Copyright (c) 2011-2015 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qemu/cutils.h"

#include "ram-compress.h"

#include "qemu/error-report.h"
#include "migration.h"
#include "options.h"
#include "io/channel-null.h"
#include "exec/target_page.h"
#include "exec/ramblock.h"

CompressionStats compression_counters;

static CompressParam *comp_param;
static QemuThread *compress_threads;
/*
 * comp_done_cond is used to wake up the migration thread when one of
 * the compression threads has finished compressing a page;
 * comp_done_lock is used together with comp_done_cond.
 */
static QemuMutex comp_done_lock;
static QemuCond comp_done_cond;

struct DecompressParam {
    bool done;
    bool quit;
    QemuMutex mutex;
    QemuCond cond;
    void *des;
    uint8_t *compbuf;
    int len;
    z_stream stream;
};
typedef struct DecompressParam DecompressParam;

static QEMUFile *decomp_file;
static DecompressParam *decomp_param;
static QemuThread *decompress_threads;
static QemuMutex decomp_done_lock;
static QemuCond decomp_done_cond;

static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
                                           RAMBlock *block, ram_addr_t offset,
                                           uint8_t *source_buf);

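/*
 * Compression worker thread body.  Each worker sleeps on param->cond
 * until the migration thread sets param->trigger with a (block, offset)
 * pair, compresses that page into param->file, then marks itself done
 * under comp_done_lock and signals comp_done_cond.  The loop exits once
 * param->quit is set.
 */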
static void *do_data_compress(void *opaque)
{
    CompressParam *param = opaque;
    RAMBlock *block;
    ram_addr_t offset;
    CompressResult result;

    qemu_mutex_lock(&param->mutex);
    while (!param->quit) {
        if (param->trigger) {
            block = param->block;
            offset = param->offset;
            param->trigger = false;
            qemu_mutex_unlock(&param->mutex);

            result = do_compress_ram_page(param->file, &param->stream,
                                          block, offset, param->originbuf);

            qemu_mutex_lock(&comp_done_lock);
            param->done = true;
            param->result = result;
            qemu_cond_signal(&comp_done_cond);
            qemu_mutex_unlock(&comp_done_lock);

            qemu_mutex_lock(&param->mutex);
        } else {
            qemu_cond_wait(&param->cond, &param->mutex);
        }
    }
    qemu_mutex_unlock(&param->mutex);

    return NULL;
}

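/*
 * Tear down the compression worker threads.  Safe to call on a
 * partially initialized state: a CompressParam with a NULL ->file
 * marks the first thread that was never fully set up, so the loop
 * stops there.  No-op when compression is disabled or setup never ran.
 */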
void compress_threads_save_cleanup(void)
{
    int i, thread_count;

    if (!migrate_compress() || !comp_param) {
        return;
    }

    thread_count = migrate_compress_threads();
    for (i = 0; i < thread_count; i++) {
        /*
         * We use the file pointer as an indicator of whether the
         * thread was properly initialized.
         */
        if (!comp_param[i].file) {
            break;
        }

        qemu_mutex_lock(&comp_param[i].mutex);
        comp_param[i].quit = true;
        qemu_cond_signal(&comp_param[i].cond);
        qemu_mutex_unlock(&comp_param[i].mutex);

        qemu_thread_join(compress_threads + i);
        qemu_mutex_destroy(&comp_param[i].mutex);
        qemu_cond_destroy(&comp_param[i].cond);
        deflateEnd(&comp_param[i].stream);
        g_free(comp_param[i].originbuf);
        qemu_fclose(comp_param[i].file);
        comp_param[i].file = NULL;
    }
    qemu_mutex_destroy(&comp_done_lock);
    qemu_cond_destroy(&comp_done_cond);
    g_free(compress_threads);
    g_free(comp_param);
    compress_threads = NULL;
    comp_param = NULL;
}

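/*
 * Spawn migrate_compress_threads() worker threads, each with its own
 * deflate stream, page-sized bounce buffer and null-channel QEMUFile.
 * Returns 0 on success (or when compression is disabled) and -1 on
 * failure, after undoing any partial initialization.
 */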
int compress_threads_save_setup(void)
{
    int i, thread_count;

    if (!migrate_compress()) {
        return 0;
    }
    thread_count = migrate_compress_threads();
    compress_threads = g_new0(QemuThread, thread_count);
    comp_param = g_new0(CompressParam, thread_count);
    qemu_cond_init(&comp_done_cond);
    qemu_mutex_init(&comp_done_lock);
    for (i = 0; i < thread_count; i++) {
        comp_param[i].originbuf = g_try_malloc(qemu_target_page_size());
        if (!comp_param[i].originbuf) {
            goto exit;
        }

        if (deflateInit(&comp_param[i].stream,
                        migrate_compress_level()) != Z_OK) {
            g_free(comp_param[i].originbuf);
            goto exit;
        }

        /*
         * comp_param[i].file is only used as a buffer to hold the
         * compressed data, so back it with a null channel that
         * discards anything flushed to it.
         */
        comp_param[i].file = qemu_file_new_output(
            QIO_CHANNEL(qio_channel_null_new()));
        comp_param[i].done = true;
        comp_param[i].quit = false;
        qemu_mutex_init(&comp_param[i].mutex);
        qemu_cond_init(&comp_param[i].cond);
        qemu_thread_create(compress_threads + i, "compress",
                           do_data_compress, comp_param + i,
                           QEMU_THREAD_JOINABLE);
    }
    return 0;

exit:
    compress_threads_save_cleanup();
    return -1;
}

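/*
 * Compress one target page into @f.  Zero pages are detected first and
 * reported as RES_ZEROPAGE without touching the deflate stream.
 * Returns RES_COMPRESS on success and RES_NONE on a compression error,
 * in which case the error is also propagated to the outgoing migration
 * stream.
 */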
static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
                                           RAMBlock *block, ram_addr_t offset,
                                           uint8_t *source_buf)
{
    uint8_t *p = block->host + offset;
    size_t page_size = qemu_target_page_size();
    int ret;

    assert(qemu_file_buffer_empty(f));

    if (buffer_is_zero(p, page_size)) {
        return RES_ZEROPAGE;
    }

    /*
     * Copy the page into an internal buffer first, so that the VM
     * cannot modify it while it is being compressed; otherwise the
     * data could change mid-stream and surface as errors during
     * compression or decompression.
     */
    memcpy(source_buf, p, page_size);
    ret = qemu_put_compression_data(f, stream, source_buf, page_size);
    if (ret < 0) {
        qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
        error_report("compressed data failed!");
        qemu_fflush(f);
        return RES_NONE;
    }
    return RES_COMPRESS;
}

static inline void compress_reset_result(CompressParam *param)
{
    param->result = RES_NONE;
    param->block = NULL;
    param->offset = 0;
}

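/*
 * Drain the compression workers: wait until every thread has finished
 * its current page, then hand each thread's buffered output to
 * @send_queued_data and reset its result, skipping threads that have
 * already been asked to quit.
 */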
void flush_compressed_data(int (send_queued_data(CompressParam *)))
{
    int idx, thread_count;

    thread_count = migrate_compress_threads();

    qemu_mutex_lock(&comp_done_lock);
    for (idx = 0; idx < thread_count; idx++) {
        while (!comp_param[idx].done) {
            qemu_cond_wait(&comp_done_cond, &comp_done_lock);
        }
    }
    qemu_mutex_unlock(&comp_done_lock);

    for (idx = 0; idx < thread_count; idx++) {
        qemu_mutex_lock(&comp_param[idx].mutex);
        if (!comp_param[idx].quit) {
            CompressParam *param = &comp_param[idx];
            send_queued_data(param);
            assert(qemu_file_buffer_empty(param->file));
            compress_reset_result(param);
        }
        qemu_mutex_unlock(&comp_param[idx].mutex);
    }
}

static inline void set_compress_params(CompressParam *param, RAMBlock *block,
                                       ram_addr_t offset)
{
    param->block = block;
    param->offset = offset;
    param->trigger = true;
}

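/*
 * Queue one page for compression on an idle worker thread.  The idle
 * worker's previous output is first handed to @send_queued_data, then
 * the new (block, offset) job is posted.  Returns 1 when a worker took
 * the page, or -1 when all workers were busy and 'compress-wait-thread'
 * is off, in which case the caller is expected to send the page
 * uncompressed instead.
 */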
int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
                                    int (send_queued_data(CompressParam *)))
{
    int idx, thread_count, pages = -1;
    bool wait = migrate_compress_wait_thread();

    thread_count = migrate_compress_threads();
    qemu_mutex_lock(&comp_done_lock);
retry:
    for (idx = 0; idx < thread_count; idx++) {
        if (comp_param[idx].done) {
            CompressParam *param = &comp_param[idx];
            qemu_mutex_lock(&param->mutex);
            param->done = false;
            send_queued_data(param);
            assert(qemu_file_buffer_empty(param->file));
            compress_reset_result(param);
            set_compress_params(param, block, offset);

            qemu_cond_signal(&param->cond);
            qemu_mutex_unlock(&param->mutex);
            pages = 1;
            break;
        }
    }

    /*
     * Wait for a free thread if the user specifies
     * 'compress-wait-thread'; otherwise the caller will send the page
     * out in the main thread as a normal (uncompressed) page.
     */
    if (pages < 0 && wait) {
        qemu_cond_wait(&comp_done_cond, &comp_done_lock);
        goto retry;
    }
    qemu_mutex_unlock(&comp_done_lock);

    return pages;
}

/* Return the size after decompression, or a negative value on error. */
static int
qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
                     const uint8_t *source, size_t source_len)
{
    int err;

    err = inflateReset(stream);
    if (err != Z_OK) {
        return -1;
    }

    stream->avail_in = source_len;
    stream->next_in = (uint8_t *)source;
    stream->avail_out = dest_len;
    stream->next_out = dest;

    err = inflate(stream, Z_NO_FLUSH);
    if (err != Z_STREAM_END) {
        return -1;
    }

    return stream->total_out;
}

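/*
 * Decompression worker thread body: the mirror image of
 * do_data_compress().  It sleeps on param->cond until param->des points
 * at a destination page, inflates param->compbuf into it, then marks
 * itself done under decomp_done_lock and signals decomp_done_cond.
 * Decompression errors are only propagated to decomp_file when
 * decompress_error_check is set.
 */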
static void *do_data_decompress(void *opaque)
{
    DecompressParam *param = opaque;
    unsigned long pagesize;
    uint8_t *des;
    int len, ret;

    qemu_mutex_lock(&param->mutex);
    while (!param->quit) {
        if (param->des) {
            des = param->des;
            len = param->len;
            param->des = NULL;
            qemu_mutex_unlock(&param->mutex);

            pagesize = qemu_target_page_size();

            ret = qemu_uncompress_data(&param->stream, des, pagesize,
                                       param->compbuf, len);
            if (ret < 0 && migrate_get_current()->decompress_error_check) {
                error_report("decompress data failed");
                qemu_file_set_error(decomp_file, ret);
            }

            qemu_mutex_lock(&decomp_done_lock);
            param->done = true;
            qemu_cond_signal(&decomp_done_cond);
            qemu_mutex_unlock(&decomp_done_lock);

            qemu_mutex_lock(&param->mutex);
        } else {
            qemu_cond_wait(&param->cond, &param->mutex);
        }
    }
    qemu_mutex_unlock(&param->mutex);

    return NULL;
}

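/*
 * Block until every decompression thread has finished its current
 * page, then return any error recorded on the incoming stream.
 * Returns 0 immediately when compression is not in use.
 */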
int wait_for_decompress_done(void)
{
    int idx, thread_count;

    if (!migrate_compress()) {
        return 0;
    }

    thread_count = migrate_decompress_threads();
    qemu_mutex_lock(&decomp_done_lock);
    for (idx = 0; idx < thread_count; idx++) {
        while (!decomp_param[idx].done) {
            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
        }
    }
    qemu_mutex_unlock(&decomp_done_lock);
    return qemu_file_get_error(decomp_file);
}

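/*
 * Tear down the decompression threads on the destination.  Runs in two
 * passes: first ask every initialized thread to quit, then join and
 * free them.  A NULL ->compbuf marks the first thread that was never
 * fully set up, so both passes stop there.
 */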
void compress_threads_load_cleanup(void)
{
    int i, thread_count;

    if (!migrate_compress()) {
        return;
    }
    thread_count = migrate_decompress_threads();
    for (i = 0; i < thread_count; i++) {
        /*
         * We use the compbuf pointer as an indicator of whether the
         * thread was properly initialized.
         */
        if (!decomp_param[i].compbuf) {
            break;
        }

        qemu_mutex_lock(&decomp_param[i].mutex);
        decomp_param[i].quit = true;
        qemu_cond_signal(&decomp_param[i].cond);
        qemu_mutex_unlock(&decomp_param[i].mutex);
    }
    for (i = 0; i < thread_count; i++) {
        if (!decomp_param[i].compbuf) {
            break;
        }

        qemu_thread_join(decompress_threads + i);
        qemu_mutex_destroy(&decomp_param[i].mutex);
        qemu_cond_destroy(&decomp_param[i].cond);
        inflateEnd(&decomp_param[i].stream);
        g_free(decomp_param[i].compbuf);
        decomp_param[i].compbuf = NULL;
    }
    g_free(decompress_threads);
    g_free(decomp_param);
    decompress_threads = NULL;
    decomp_param = NULL;
    decomp_file = NULL;
}

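/*
 * Spawn migrate_decompress_threads() worker threads on the destination,
 * each with its own inflate stream and a compressBound()-sized input
 * buffer.  @f is remembered in decomp_file so decompression errors can
 * be propagated to the incoming stream.  Returns 0 on success (or when
 * compression is disabled) and -1 on failure, after cleaning up any
 * partial state.
 */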
int compress_threads_load_setup(QEMUFile *f)
{
    int i, thread_count;

    if (!migrate_compress()) {
        return 0;
    }

    thread_count = migrate_decompress_threads();
    decompress_threads = g_new0(QemuThread, thread_count);
    decomp_param = g_new0(DecompressParam, thread_count);
    qemu_mutex_init(&decomp_done_lock);
    qemu_cond_init(&decomp_done_cond);
    decomp_file = f;
    for (i = 0; i < thread_count; i++) {
        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
            goto exit;
        }

        size_t compbuf_size = compressBound(qemu_target_page_size());
        decomp_param[i].compbuf = g_malloc0(compbuf_size);
        qemu_mutex_init(&decomp_param[i].mutex);
        qemu_cond_init(&decomp_param[i].cond);
        decomp_param[i].done = true;
        decomp_param[i].quit = false;
        qemu_thread_create(decompress_threads + i, "decompress",
                           do_data_decompress, decomp_param + i,
                           QEMU_THREAD_JOINABLE);
    }
    return 0;
exit:
    compress_threads_load_cleanup();
    return -1;
}

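/*
 * Read @len bytes of compressed data from @f and hand them to an idle
 * decompression thread, which will inflate them into the @host page.
 * Blocks on decomp_done_cond until a thread becomes available.
 */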
void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len)
{
    int idx, thread_count;

    thread_count = migrate_decompress_threads();
    QEMU_LOCK_GUARD(&decomp_done_lock);
    while (true) {
        for (idx = 0; idx < thread_count; idx++) {
            if (decomp_param[idx].done) {
                decomp_param[idx].done = false;
                qemu_mutex_lock(&decomp_param[idx].mutex);
                qemu_get_buffer(f, decomp_param[idx].compbuf, len);
                decomp_param[idx].des = host;
                decomp_param[idx].len = len;
                qemu_cond_signal(&decomp_param[idx].cond);
                qemu_mutex_unlock(&decomp_param[idx].mutex);
                break;
            }
        }
        if (idx < thread_count) {
            break;
        } else {
            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
        }
    }
}