]> git.proxmox.com Git - mirror_qemu.git/blob - migration/colo.c
COLO: Shutdown related socket fd while do failover
[mirror_qemu.git] / migration / colo.c
1 /*
2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3 * (a.k.a. Fault Tolerance or Continuous Replication)
4 *
5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6 * Copyright (c) 2016 FUJITSU LIMITED
7 * Copyright (c) 2016 Intel Corporation
8 *
9 * This work is licensed under the terms of the GNU GPL, version 2 or
10 * later. See the COPYING file in the top-level directory.
11 */
12
13 #include "qemu/osdep.h"
14 #include "qemu/timer.h"
15 #include "sysemu/sysemu.h"
16 #include "migration/colo.h"
17 #include "io/channel-buffer.h"
18 #include "trace.h"
19 #include "qemu/error-report.h"
20 #include "qapi/error.h"
21 #include "migration/failover.h"
22
/* Initial size (4 MiB) handed to qio_channel_buffer_new() for the VM state buffer */
#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
24
/* Report COLO availability; this implementation answers true unconditionally. */
bool colo_supported(void)
{
    return true;
}
29
30 bool migration_in_colo_state(void)
31 {
32 MigrationState *s = migrate_get_current();
33
34 return (s->state == MIGRATION_STATUS_COLO);
35 }
36
37 bool migration_incoming_in_colo_state(void)
38 {
39 MigrationIncomingState *mis = migration_incoming_get_current();
40
41 return mis && (mis->state == MIGRATION_STATUS_COLO);
42 }
43
44 static bool colo_runstate_is_stopped(void)
45 {
46 return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
47 }
48
49 static void secondary_vm_do_failover(void)
50 {
51 int old_state;
52 MigrationIncomingState *mis = migration_incoming_get_current();
53
54 migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
55 MIGRATION_STATUS_COMPLETED);
56
57 if (!autostart) {
58 error_report("\"-S\" qemu option will be ignored in secondary side");
59 /* recover runstate to normal migration finish state */
60 autostart = true;
61 }
62 /*
63 * Make sure COLO incoming thread not block in recv or send,
64 * If mis->from_src_file and mis->to_src_file use the same fd,
65 * The second shutdown() will return -1, we ignore this value,
66 * It is harmless.
67 */
68 if (mis->from_src_file) {
69 qemu_file_shutdown(mis->from_src_file);
70 }
71 if (mis->to_src_file) {
72 qemu_file_shutdown(mis->to_src_file);
73 }
74
75 old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
76 FAILOVER_STATUS_COMPLETED);
77 if (old_state != FAILOVER_STATUS_ACTIVE) {
78 error_report("Incorrect state (%s) while doing failover for "
79 "secondary VM", FailoverStatus_lookup[old_state]);
80 return;
81 }
82 /* Notify COLO incoming thread that failover work is finished */
83 qemu_sem_post(&mis->colo_incoming_sem);
84 /* For Secondary VM, jump to incoming co */
85 if (mis->migration_incoming_co) {
86 qemu_coroutine_enter(mis->migration_incoming_co);
87 }
88 }
89
90 static void primary_vm_do_failover(void)
91 {
92 MigrationState *s = migrate_get_current();
93 int old_state;
94
95 migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
96 MIGRATION_STATUS_COMPLETED);
97
98 /*
99 * Wake up COLO thread which may blocked in recv() or send(),
100 * The s->rp_state.from_dst_file and s->to_dst_file may use the
101 * same fd, but we still shutdown the fd for twice, it is harmless.
102 */
103 if (s->to_dst_file) {
104 qemu_file_shutdown(s->to_dst_file);
105 }
106 if (s->rp_state.from_dst_file) {
107 qemu_file_shutdown(s->rp_state.from_dst_file);
108 }
109
110 old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
111 FAILOVER_STATUS_COMPLETED);
112 if (old_state != FAILOVER_STATUS_ACTIVE) {
113 error_report("Incorrect state (%s) while doing failover for Primary VM",
114 FailoverStatus_lookup[old_state]);
115 return;
116 }
117 /* Notify COLO thread that failover work is finished */
118 qemu_sem_post(&s->colo_exit_sem);
119 }
120
121 void colo_do_failover(MigrationState *s)
122 {
123 /* Make sure VM stopped while failover happened. */
124 if (!colo_runstate_is_stopped()) {
125 vm_stop_force_state(RUN_STATE_COLO);
126 }
127
128 if (get_colo_mode() == COLO_MODE_PRIMARY) {
129 primary_vm_do_failover();
130 } else {
131 secondary_vm_do_failover();
132 }
133 }
134
135 static void colo_send_message(QEMUFile *f, COLOMessage msg,
136 Error **errp)
137 {
138 int ret;
139
140 if (msg >= COLO_MESSAGE__MAX) {
141 error_setg(errp, "%s: Invalid message", __func__);
142 return;
143 }
144 qemu_put_be32(f, msg);
145 qemu_fflush(f);
146
147 ret = qemu_file_get_error(f);
148 if (ret < 0) {
149 error_setg_errno(errp, -ret, "Can't send COLO message");
150 }
151 trace_colo_send_message(COLOMessage_lookup[msg]);
152 }
153
154 static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
155 uint64_t value, Error **errp)
156 {
157 Error *local_err = NULL;
158 int ret;
159
160 colo_send_message(f, msg, &local_err);
161 if (local_err) {
162 error_propagate(errp, local_err);
163 return;
164 }
165 qemu_put_be64(f, value);
166 qemu_fflush(f);
167
168 ret = qemu_file_get_error(f);
169 if (ret < 0) {
170 error_setg_errno(errp, -ret, "Failed to send value for message:%s",
171 COLOMessage_lookup[msg]);
172 }
173 }
174
175 static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
176 {
177 COLOMessage msg;
178 int ret;
179
180 msg = qemu_get_be32(f);
181 ret = qemu_file_get_error(f);
182 if (ret < 0) {
183 error_setg_errno(errp, -ret, "Can't receive COLO message");
184 return msg;
185 }
186 if (msg >= COLO_MESSAGE__MAX) {
187 error_setg(errp, "%s: Invalid message", __func__);
188 return msg;
189 }
190 trace_colo_receive_message(COLOMessage_lookup[msg]);
191 return msg;
192 }
193
194 static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
195 Error **errp)
196 {
197 COLOMessage msg;
198 Error *local_err = NULL;
199
200 msg = colo_receive_message(f, &local_err);
201 if (local_err) {
202 error_propagate(errp, local_err);
203 return;
204 }
205 if (msg != expect_msg) {
206 error_setg(errp, "Unexpected COLO message %d, expected %d",
207 msg, expect_msg);
208 }
209 }
210
211 static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
212 Error **errp)
213 {
214 Error *local_err = NULL;
215 uint64_t value;
216 int ret;
217
218 colo_receive_check_message(f, expect_msg, &local_err);
219 if (local_err) {
220 error_propagate(errp, local_err);
221 return 0;
222 }
223
224 value = qemu_get_be64(f);
225 ret = qemu_file_get_error(f);
226 if (ret < 0) {
227 error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
228 COLOMessage_lookup[expect_msg]);
229 }
230 return value;
231 }
232
233 static int colo_do_checkpoint_transaction(MigrationState *s,
234 QIOChannelBuffer *bioc,
235 QEMUFile *fb)
236 {
237 Error *local_err = NULL;
238 int ret = -1;
239
240 colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
241 &local_err);
242 if (local_err) {
243 goto out;
244 }
245
246 colo_receive_check_message(s->rp_state.from_dst_file,
247 COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
248 if (local_err) {
249 goto out;
250 }
251 /* Reset channel-buffer directly */
252 qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
253 bioc->usage = 0;
254
255 qemu_mutex_lock_iothread();
256 if (failover_get_state() != FAILOVER_STATUS_NONE) {
257 qemu_mutex_unlock_iothread();
258 goto out;
259 }
260 vm_stop_force_state(RUN_STATE_COLO);
261 qemu_mutex_unlock_iothread();
262 trace_colo_vm_state_change("run", "stop");
263 /*
264 * Failover request bh could be called after vm_stop_force_state(),
265 * So we need check failover_request_is_active() again.
266 */
267 if (failover_get_state() != FAILOVER_STATUS_NONE) {
268 goto out;
269 }
270
271 /* Disable block migration */
272 s->params.blk = 0;
273 s->params.shared = 0;
274 qemu_savevm_state_header(fb);
275 qemu_savevm_state_begin(fb, &s->params);
276 qemu_mutex_lock_iothread();
277 qemu_savevm_state_complete_precopy(fb, false);
278 qemu_mutex_unlock_iothread();
279
280 qemu_fflush(fb);
281
282 colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
283 if (local_err) {
284 goto out;
285 }
286 /*
287 * We need the size of the VMstate data in Secondary side,
288 * With which we can decide how much data should be read.
289 */
290 colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
291 bioc->usage, &local_err);
292 if (local_err) {
293 goto out;
294 }
295
296 qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
297 qemu_fflush(s->to_dst_file);
298 ret = qemu_file_get_error(s->to_dst_file);
299 if (ret < 0) {
300 goto out;
301 }
302
303 colo_receive_check_message(s->rp_state.from_dst_file,
304 COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
305 if (local_err) {
306 goto out;
307 }
308
309 colo_receive_check_message(s->rp_state.from_dst_file,
310 COLO_MESSAGE_VMSTATE_LOADED, &local_err);
311 if (local_err) {
312 goto out;
313 }
314
315 ret = 0;
316
317 qemu_mutex_lock_iothread();
318 vm_start();
319 qemu_mutex_unlock_iothread();
320 trace_colo_vm_state_change("stop", "run");
321
322 out:
323 if (local_err) {
324 error_report_err(local_err);
325 }
326 return ret;
327 }
328
329 static void colo_process_checkpoint(MigrationState *s)
330 {
331 QIOChannelBuffer *bioc;
332 QEMUFile *fb = NULL;
333 int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
334 Error *local_err = NULL;
335 int ret;
336
337 failover_init_state();
338
339 s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
340 if (!s->rp_state.from_dst_file) {
341 error_report("Open QEMUFile from_dst_file failed");
342 goto out;
343 }
344
345 /*
346 * Wait for Secondary finish loading VM states and enter COLO
347 * restore.
348 */
349 colo_receive_check_message(s->rp_state.from_dst_file,
350 COLO_MESSAGE_CHECKPOINT_READY, &local_err);
351 if (local_err) {
352 goto out;
353 }
354 bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
355 fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
356 object_unref(OBJECT(bioc));
357
358 qemu_mutex_lock_iothread();
359 vm_start();
360 qemu_mutex_unlock_iothread();
361 trace_colo_vm_state_change("stop", "run");
362
363 timer_mod(s->colo_delay_timer,
364 current_time + s->parameters.x_checkpoint_delay);
365
366 while (s->state == MIGRATION_STATUS_COLO) {
367 if (failover_get_state() != FAILOVER_STATUS_NONE) {
368 error_report("failover request");
369 goto out;
370 }
371
372 qemu_sem_wait(&s->colo_checkpoint_sem);
373
374 ret = colo_do_checkpoint_transaction(s, bioc, fb);
375 if (ret < 0) {
376 goto out;
377 }
378 }
379
380 out:
381 /* Throw the unreported error message after exited from loop */
382 if (local_err) {
383 error_report_err(local_err);
384 }
385
386 if (fb) {
387 qemu_fclose(fb);
388 }
389
390 timer_del(s->colo_delay_timer);
391
392 /* Hope this not to be too long to wait here */
393 qemu_sem_wait(&s->colo_exit_sem);
394 qemu_sem_destroy(&s->colo_exit_sem);
395 /*
396 * Must be called after failover BH is completed,
397 * Or the failover BH may shutdown the wrong fd that
398 * re-used by other threads after we release here.
399 */
400 if (s->rp_state.from_dst_file) {
401 qemu_fclose(s->rp_state.from_dst_file);
402 }
403 }
404
405 void colo_checkpoint_notify(void *opaque)
406 {
407 MigrationState *s = opaque;
408 int64_t next_notify_time;
409
410 qemu_sem_post(&s->colo_checkpoint_sem);
411 s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
412 next_notify_time = s->colo_checkpoint_time +
413 s->parameters.x_checkpoint_delay;
414 timer_mod(s->colo_delay_timer, next_notify_time);
415 }
416
417 void migrate_start_colo_process(MigrationState *s)
418 {
419 qemu_mutex_unlock_iothread();
420 qemu_sem_init(&s->colo_checkpoint_sem, 0);
421 s->colo_delay_timer = timer_new_ms(QEMU_CLOCK_HOST,
422 colo_checkpoint_notify, s);
423
424 qemu_sem_init(&s->colo_exit_sem, 0);
425 migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
426 MIGRATION_STATUS_COLO);
427 colo_process_checkpoint(s);
428 qemu_mutex_lock_iothread();
429 }
430
431 static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
432 Error **errp)
433 {
434 COLOMessage msg;
435 Error *local_err = NULL;
436
437 msg = colo_receive_message(f, &local_err);
438 if (local_err) {
439 error_propagate(errp, local_err);
440 return;
441 }
442
443 switch (msg) {
444 case COLO_MESSAGE_CHECKPOINT_REQUEST:
445 *checkpoint_request = 1;
446 break;
447 default:
448 *checkpoint_request = 0;
449 error_setg(errp, "Got unknown COLO message: %d", msg);
450 break;
451 }
452 }
453
454 void *colo_process_incoming_thread(void *opaque)
455 {
456 MigrationIncomingState *mis = opaque;
457 QEMUFile *fb = NULL;
458 QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
459 uint64_t total_size;
460 uint64_t value;
461 Error *local_err = NULL;
462
463 qemu_sem_init(&mis->colo_incoming_sem, 0);
464
465 migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
466 MIGRATION_STATUS_COLO);
467
468 failover_init_state();
469
470 mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
471 if (!mis->to_src_file) {
472 error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
473 goto out;
474 }
475 /*
476 * Note: the communication between Primary side and Secondary side
477 * should be sequential, we set the fd to unblocked in migration incoming
478 * coroutine, and here we are in the COLO incoming thread, so it is ok to
479 * set the fd back to blocked.
480 */
481 qemu_file_set_blocking(mis->from_src_file, true);
482
483 bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
484 fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
485 object_unref(OBJECT(bioc));
486
487 colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
488 &local_err);
489 if (local_err) {
490 goto out;
491 }
492
493 while (mis->state == MIGRATION_STATUS_COLO) {
494 int request = 0;
495
496 colo_wait_handle_message(mis->from_src_file, &request, &local_err);
497 if (local_err) {
498 goto out;
499 }
500 assert(request);
501 if (failover_get_state() != FAILOVER_STATUS_NONE) {
502 error_report("failover request");
503 goto out;
504 }
505
506 /* FIXME: This is unnecessary for periodic checkpoint mode */
507 colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
508 &local_err);
509 if (local_err) {
510 goto out;
511 }
512
513 colo_receive_check_message(mis->from_src_file,
514 COLO_MESSAGE_VMSTATE_SEND, &local_err);
515 if (local_err) {
516 goto out;
517 }
518
519 value = colo_receive_message_value(mis->from_src_file,
520 COLO_MESSAGE_VMSTATE_SIZE, &local_err);
521 if (local_err) {
522 goto out;
523 }
524
525 /*
526 * Read VM device state data into channel buffer,
527 * It's better to re-use the memory allocated.
528 * Here we need to handle the channel buffer directly.
529 */
530 if (value > bioc->capacity) {
531 bioc->capacity = value;
532 bioc->data = g_realloc(bioc->data, bioc->capacity);
533 }
534 total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
535 if (total_size != value) {
536 error_report("Got %" PRIu64 " VMState data, less than expected"
537 " %" PRIu64, total_size, value);
538 goto out;
539 }
540 bioc->usage = total_size;
541 qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
542
543 colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
544 &local_err);
545 if (local_err) {
546 goto out;
547 }
548
549 qemu_mutex_lock_iothread();
550 qemu_system_reset(VMRESET_SILENT);
551 if (qemu_loadvm_state(fb) < 0) {
552 error_report("COLO: loadvm failed");
553 qemu_mutex_unlock_iothread();
554 goto out;
555 }
556 qemu_mutex_unlock_iothread();
557
558 colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
559 &local_err);
560 if (local_err) {
561 goto out;
562 }
563 }
564
565 out:
566 /* Throw the unreported error message after exited from loop */
567 if (local_err) {
568 error_report_err(local_err);
569 }
570
571 if (fb) {
572 qemu_fclose(fb);
573 }
574
575 /* Hope this not to be too long to loop here */
576 qemu_sem_wait(&mis->colo_incoming_sem);
577 qemu_sem_destroy(&mis->colo_incoming_sem);
578 /* Must be called after failover BH is completed */
579 if (mis->to_src_file) {
580 qemu_fclose(mis->to_src_file);
581 }
582 migration_incoming_exit_colo();
583
584 return NULL;
585 }