4 * Copyright (c) 2019-2020 Red Hat Inc
7 * Juan Quintela <quintela@redhat.com>
9 * This work is licensed under the terms of the GNU GPL, version 2 or later.
10 * See the COPYING file in the top-level directory.
13 #include "qemu/osdep.h"
15 #include "exec/target_page.h"
16 #include "sysemu/sysemu.h"
17 #include "exec/ramblock.h"
18 #include "qemu/error-report.h"
19 #include "qapi/error.h"
22 #include "migration.h"
23 #include "migration-stats.h"
26 #include "qemu-file.h"
29 #include "threadinfo.h"
31 #include "qemu/yank.h"
32 #include "io/channel-file.h"
33 #include "io/channel-socket.h"
34 #include "yank_functions.h"
38 #define MULTIFD_MAGIC 0x11223344U
39 #define MULTIFD_VERSION 1
44 unsigned char uuid
[16]; /* QemuUUID */
46 uint8_t unused1
[7]; /* Reserved for future use */
47 uint64_t unused2
[4]; /* Reserved for future use */
48 } __attribute__((packed
)) MultiFDInit_t
;
51 MultiFDSendParams
*params
;
52 /* array of pages to be sent */
53 MultiFDPages_t
*pages
;
55 * Global number of generated multifd packets.
57 * Note that we used 'uintptr_t' because it'll naturally support atomic
58 * operations on both 32bit / 64 bits hosts. It means on 32bit systems
59 * multifd will overflow the packet_num easier, but that should be
62 * Another option is to use QEMU's Stat64 then it'll be 64 bits on all
63 * hosts, however so far it does not support atomic fetch_add() yet.
64 * Make it easy for now.
68 * Synchronization point past which no more channels will be
71 QemuSemaphore channels_created
;
72 /* send channels ready */
73 QemuSemaphore channels_ready
;
75 * Have we already run terminate threads. There is a race when it
76 * happens that we got one error while we are exiting.
77 * We will use atomic operations. Only valid values are 0 and 1.
82 } *multifd_send_state
;
85 MultiFDRecvParams
*params
;
86 MultiFDRecvData
*data
;
87 /* number of created threads */
90 * This is always posted by the recv threads, the migration thread
91 * uses it to wait for recv threads to finish assigned tasks.
93 QemuSemaphore sem_sync
;
94 /* global number of generated multifd packets */
99 } *multifd_recv_state
;
101 static bool multifd_use_packets(void)
103 return !migrate_mapped_ram();
106 void multifd_send_channel_created(void)
108 qemu_sem_post(&multifd_send_state
->channels_created
);
111 static void multifd_set_file_bitmap(MultiFDSendParams
*p
)
113 MultiFDPages_t
*pages
= p
->pages
;
115 assert(pages
->block
);
117 for (int i
= 0; i
< p
->pages
->num
; i
++) {
118 ramblock_set_file_bmap_atomic(pages
->block
, pages
->offset
[i
], true);
122 /* Multifd without compression */
125 * nocomp_send_setup: setup send side
127 * @p: Params for the channel that we are using
128 * @errp: pointer to an error
130 static int nocomp_send_setup(MultiFDSendParams
*p
, Error
**errp
)
132 if (migrate_zero_copy_send()) {
133 p
->write_flags
|= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY
;
140 * nocomp_send_cleanup: cleanup send side
142 * For no compression this function does nothing.
144 * @p: Params for the channel that we are using
145 * @errp: pointer to an error
147 static void nocomp_send_cleanup(MultiFDSendParams
*p
, Error
**errp
)
152 static void multifd_send_prepare_iovs(MultiFDSendParams
*p
)
154 MultiFDPages_t
*pages
= p
->pages
;
156 for (int i
= 0; i
< pages
->num
; i
++) {
157 p
->iov
[p
->iovs_num
].iov_base
= pages
->block
->host
+ pages
->offset
[i
];
158 p
->iov
[p
->iovs_num
].iov_len
= p
->page_size
;
162 p
->next_packet_size
= pages
->num
* p
->page_size
;
166 * nocomp_send_prepare: prepare data to be able to send
168 * For no compression we just have to calculate the size of the
171 * Returns 0 for success or -1 for error
173 * @p: Params for the channel that we are using
174 * @errp: pointer to an error
176 static int nocomp_send_prepare(MultiFDSendParams
*p
, Error
**errp
)
178 bool use_zero_copy_send
= migrate_zero_copy_send();
181 if (!multifd_use_packets()) {
182 multifd_send_prepare_iovs(p
);
183 multifd_set_file_bitmap(p
);
188 if (!use_zero_copy_send
) {
190 * Only !zerocopy needs the header in IOV; zerocopy will
191 * send it separately.
193 multifd_send_prepare_header(p
);
196 multifd_send_prepare_iovs(p
);
197 p
->flags
|= MULTIFD_FLAG_NOCOMP
;
199 multifd_send_fill_packet(p
);
201 if (use_zero_copy_send
) {
202 /* Send header first, without zerocopy */
203 ret
= qio_channel_write_all(p
->c
, (void *)p
->packet
,
204 p
->packet_len
, errp
);
214 * nocomp_recv_setup: setup receive side
216 * For no compression this function does nothing.
218 * Returns 0 for success or -1 for error
220 * @p: Params for the channel that we are using
221 * @errp: pointer to an error
223 static int nocomp_recv_setup(MultiFDRecvParams
*p
, Error
**errp
)
229 * nocomp_recv_cleanup: setup receive side
231 * For no compression this function does nothing.
233 * @p: Params for the channel that we are using
235 static void nocomp_recv_cleanup(MultiFDRecvParams
*p
)
240 * nocomp_recv: read the data from the channel
242 * For no compression we just need to read things into the correct place.
244 * Returns 0 for success or -1 for error
246 * @p: Params for the channel that we are using
247 * @errp: pointer to an error
249 static int nocomp_recv(MultiFDRecvParams
*p
, Error
**errp
)
253 if (!multifd_use_packets()) {
254 return multifd_file_recv_data(p
, errp
);
257 flags
= p
->flags
& MULTIFD_FLAG_COMPRESSION_MASK
;
259 if (flags
!= MULTIFD_FLAG_NOCOMP
) {
260 error_setg(errp
, "multifd %u: flags received %x flags expected %x",
261 p
->id
, flags
, MULTIFD_FLAG_NOCOMP
);
264 for (int i
= 0; i
< p
->normal_num
; i
++) {
265 p
->iov
[i
].iov_base
= p
->host
+ p
->normal
[i
];
266 p
->iov
[i
].iov_len
= p
->page_size
;
268 return qio_channel_readv_all(p
->c
, p
->iov
, p
->normal_num
, errp
);
271 static MultiFDMethods multifd_nocomp_ops
= {
272 .send_setup
= nocomp_send_setup
,
273 .send_cleanup
= nocomp_send_cleanup
,
274 .send_prepare
= nocomp_send_prepare
,
275 .recv_setup
= nocomp_recv_setup
,
276 .recv_cleanup
= nocomp_recv_cleanup
,
/*
 * Method table indexed by MultiFDCompression.  Only the no-compression
 * entry is filled statically; compression backends install their own
 * entries at startup through multifd_register_ops().
 */
280 static MultiFDMethods
*multifd_ops
[MULTIFD_COMPRESSION__MAX
] = {
281 [MULTIFD_COMPRESSION_NONE
] = &multifd_nocomp_ops
,
284 void multifd_register_ops(int method
, MultiFDMethods
*ops
)
286 assert(0 < method
&& method
< MULTIFD_COMPRESSION__MAX
);
287 multifd_ops
[method
] = ops
;
290 /* Reset a MultiFDPages_t* object for the next use */
291 static void multifd_pages_reset(MultiFDPages_t
*pages
)
294 * We don't need to touch offset[] array, because it will be
295 * overwritten later when reused.
301 static int multifd_send_initial_packet(MultiFDSendParams
*p
, Error
**errp
)
303 MultiFDInit_t msg
= {};
304 size_t size
= sizeof(msg
);
307 msg
.magic
= cpu_to_be32(MULTIFD_MAGIC
);
308 msg
.version
= cpu_to_be32(MULTIFD_VERSION
);
310 memcpy(msg
.uuid
, &qemu_uuid
.data
, sizeof(msg
.uuid
));
312 ret
= qio_channel_write_all(p
->c
, (char *)&msg
, size
, errp
);
316 stat64_add(&mig_stats
.multifd_bytes
, size
);
320 static int multifd_recv_initial_packet(QIOChannel
*c
, Error
**errp
)
325 ret
= qio_channel_read_all(c
, (char *)&msg
, sizeof(msg
), errp
);
330 msg
.magic
= be32_to_cpu(msg
.magic
);
331 msg
.version
= be32_to_cpu(msg
.version
);
333 if (msg
.magic
!= MULTIFD_MAGIC
) {
334 error_setg(errp
, "multifd: received packet magic %x "
335 "expected %x", msg
.magic
, MULTIFD_MAGIC
);
339 if (msg
.version
!= MULTIFD_VERSION
) {
340 error_setg(errp
, "multifd: received packet version %u "
341 "expected %u", msg
.version
, MULTIFD_VERSION
);
345 if (memcmp(msg
.uuid
, &qemu_uuid
, sizeof(qemu_uuid
))) {
346 char *uuid
= qemu_uuid_unparse_strdup(&qemu_uuid
);
347 char *msg_uuid
= qemu_uuid_unparse_strdup((const QemuUUID
*)msg
.uuid
);
349 error_setg(errp
, "multifd: received uuid '%s' and expected "
350 "uuid '%s' for channel %hhd", msg_uuid
, uuid
, msg
.id
);
356 if (msg
.id
> migrate_multifd_channels()) {
357 error_setg(errp
, "multifd: received channel id %u is greater than "
358 "number of channels %u", msg
.id
, migrate_multifd_channels());
365 static MultiFDPages_t
*multifd_pages_init(uint32_t n
)
367 MultiFDPages_t
*pages
= g_new0(MultiFDPages_t
, 1);
369 pages
->allocated
= n
;
370 pages
->offset
= g_new0(ram_addr_t
, n
);
375 static void multifd_pages_clear(MultiFDPages_t
*pages
)
377 multifd_pages_reset(pages
);
378 pages
->allocated
= 0;
379 g_free(pages
->offset
);
380 pages
->offset
= NULL
;
384 void multifd_send_fill_packet(MultiFDSendParams
*p
)
386 MultiFDPacket_t
*packet
= p
->packet
;
387 MultiFDPages_t
*pages
= p
->pages
;
391 packet
->flags
= cpu_to_be32(p
->flags
);
392 packet
->pages_alloc
= cpu_to_be32(p
->pages
->allocated
);
393 packet
->normal_pages
= cpu_to_be32(pages
->num
);
394 packet
->next_packet_size
= cpu_to_be32(p
->next_packet_size
);
396 packet_num
= qatomic_fetch_inc(&multifd_send_state
->packet_num
);
397 packet
->packet_num
= cpu_to_be64(packet_num
);
400 strncpy(packet
->ramblock
, pages
->block
->idstr
, 256);
403 for (i
= 0; i
< pages
->num
; i
++) {
404 /* there are architectures where ram_addr_t is 32 bit */
405 uint64_t temp
= pages
->offset
[i
];
407 packet
->offset
[i
] = cpu_to_be64(temp
);
411 p
->total_normal_pages
+= pages
->num
;
413 trace_multifd_send(p
->id
, packet_num
, pages
->num
, p
->flags
,
414 p
->next_packet_size
);
417 static int multifd_recv_unfill_packet(MultiFDRecvParams
*p
, Error
**errp
)
419 MultiFDPacket_t
*packet
= p
->packet
;
422 packet
->magic
= be32_to_cpu(packet
->magic
);
423 if (packet
->magic
!= MULTIFD_MAGIC
) {
424 error_setg(errp
, "multifd: received packet "
425 "magic %x and expected magic %x",
426 packet
->magic
, MULTIFD_MAGIC
);
430 packet
->version
= be32_to_cpu(packet
->version
);
431 if (packet
->version
!= MULTIFD_VERSION
) {
432 error_setg(errp
, "multifd: received packet "
433 "version %u and expected version %u",
434 packet
->version
, MULTIFD_VERSION
);
438 p
->flags
= be32_to_cpu(packet
->flags
);
440 packet
->pages_alloc
= be32_to_cpu(packet
->pages_alloc
);
442 * If we received a packet that is 100 times bigger than expected
443 * just stop migration. It is a magic number.
445 if (packet
->pages_alloc
> p
->page_count
) {
446 error_setg(errp
, "multifd: received packet "
447 "with size %u and expected a size of %u",
448 packet
->pages_alloc
, p
->page_count
) ;
452 p
->normal_num
= be32_to_cpu(packet
->normal_pages
);
453 if (p
->normal_num
> packet
->pages_alloc
) {
454 error_setg(errp
, "multifd: received packet "
455 "with %u pages and expected maximum pages are %u",
456 p
->normal_num
, packet
->pages_alloc
) ;
460 p
->next_packet_size
= be32_to_cpu(packet
->next_packet_size
);
461 p
->packet_num
= be64_to_cpu(packet
->packet_num
);
463 p
->total_normal_pages
+= p
->normal_num
;
465 trace_multifd_recv(p
->id
, p
->packet_num
, p
->normal_num
, p
->flags
,
466 p
->next_packet_size
);
468 if (p
->normal_num
== 0) {
472 /* make sure that ramblock is 0 terminated */
473 packet
->ramblock
[255] = 0;
474 p
->block
= qemu_ram_block_by_name(packet
->ramblock
);
476 error_setg(errp
, "multifd: unknown ram block %s",
481 p
->host
= p
->block
->host
;
482 for (i
= 0; i
< p
->normal_num
; i
++) {
483 uint64_t offset
= be64_to_cpu(packet
->offset
[i
]);
485 if (offset
> (p
->block
->used_length
- p
->page_size
)) {
486 error_setg(errp
, "multifd: offset too long %" PRIu64
487 " (max " RAM_ADDR_FMT
")",
488 offset
, p
->block
->used_length
);
491 p
->normal
[i
] = offset
;
497 static bool multifd_send_should_exit(void)
499 return qatomic_read(&multifd_send_state
->exiting
);
502 static bool multifd_recv_should_exit(void)
504 return qatomic_read(&multifd_recv_state
->exiting
);
508 * The migration thread can wait on either of the two semaphores. This
509 * function can be used to kick the main thread out of waiting on either of
510 * them. Should mostly only be called when something wrong happened with
511 * the current multifd send thread.
513 static void multifd_send_kick_main(MultiFDSendParams
*p
)
515 qemu_sem_post(&p
->sem_sync
);
516 qemu_sem_post(&multifd_send_state
->channels_ready
);
520 * How we use multifd_send_state->pages and channel->pages?
522 * We create a pages struct for each channel, and a main one. Each time that
523 * we need to send a batch of pages we interchange the ones between
524 * multifd_send_state and the channel that is sending it. There are
525 * two reasons for that:
526 * - to not have to do so many mallocs during migration
527 * - to make easier to know what to free at the end of migration
529 * This way we always know who is the owner of each "pages" struct,
530 * and we don't need any locking. It belongs to the migration thread
531 * or to the channel thread. Switching is safe because the migration
532 * thread is using the channel mutex when changing it, and the channel
533 * has to have finished with its own, otherwise pending_job can't be
536 * Returns true if succeed, false otherwise.
538 static bool multifd_send_pages(void)
541 static int next_channel
;
542 MultiFDSendParams
*p
= NULL
; /* make happy gcc */
543 MultiFDPages_t
*pages
= multifd_send_state
->pages
;
545 if (multifd_send_should_exit()) {
549 /* We wait here, until at least one channel is ready */
550 qemu_sem_wait(&multifd_send_state
->channels_ready
);
553 * next_channel can remain from a previous migration that was
554 * using more channels, so ensure it doesn't overflow if the
555 * limit is lower now.
557 next_channel
%= migrate_multifd_channels();
558 for (i
= next_channel
;; i
= (i
+ 1) % migrate_multifd_channels()) {
559 if (multifd_send_should_exit()) {
562 p
= &multifd_send_state
->params
[i
];
564 * Lockless read to p->pending_job is safe, because only multifd
565 * sender thread can clear it.
567 if (qatomic_read(&p
->pending_job
) == false) {
568 next_channel
= (i
+ 1) % migrate_multifd_channels();
574 * Make sure we read p->pending_job before all the rest. Pairs with
575 * qatomic_store_release() in multifd_send_thread().
578 assert(!p
->pages
->num
);
579 multifd_send_state
->pages
= p
->pages
;
582 * Making sure p->pages is setup before marking pending_job=true. Pairs
583 * with the qatomic_load_acquire() in multifd_send_thread().
585 qatomic_store_release(&p
->pending_job
, true);
586 qemu_sem_post(&p
->sem
);
591 static inline bool multifd_queue_empty(MultiFDPages_t
*pages
)
593 return pages
->num
== 0;
596 static inline bool multifd_queue_full(MultiFDPages_t
*pages
)
598 return pages
->num
== pages
->allocated
;
601 static inline void multifd_enqueue(MultiFDPages_t
*pages
, ram_addr_t offset
)
603 pages
->offset
[pages
->num
++] = offset
;
606 /* Returns true if enqueue successful, false otherwise */
607 bool multifd_queue_page(RAMBlock
*block
, ram_addr_t offset
)
609 MultiFDPages_t
*pages
;
612 pages
= multifd_send_state
->pages
;
614 /* If the queue is empty, we can already enqueue now */
615 if (multifd_queue_empty(pages
)) {
616 pages
->block
= block
;
617 multifd_enqueue(pages
, offset
);
622 * Not empty, meanwhile we need a flush. It can because of either:
624 * (1) The page is not on the same ramblock of previous ones, or,
625 * (2) The queue is full.
627 * After flush, always retry.
629 if (pages
->block
!= block
|| multifd_queue_full(pages
)) {
630 if (!multifd_send_pages()) {
636 /* Not empty, and we still have space, do it! */
637 multifd_enqueue(pages
, offset
);
641 /* Multifd send side hit an error; remember it and prepare to quit */
642 static void multifd_send_set_error(Error
*err
)
645 * We don't want to exit each threads twice. Depending on where
646 * we get the error, or if there are two independent errors in two
647 * threads at the same time, we can end calling this function
650 if (qatomic_xchg(&multifd_send_state
->exiting
, 1)) {
655 MigrationState
*s
= migrate_get_current();
656 migrate_set_error(s
, err
);
657 if (s
->state
== MIGRATION_STATUS_SETUP
||
658 s
->state
== MIGRATION_STATUS_PRE_SWITCHOVER
||
659 s
->state
== MIGRATION_STATUS_DEVICE
||
660 s
->state
== MIGRATION_STATUS_ACTIVE
) {
661 migrate_set_state(&s
->state
, s
->state
,
662 MIGRATION_STATUS_FAILED
);
667 static void multifd_send_terminate_threads(void)
671 trace_multifd_send_terminate_threads();
674 * Tell everyone we're quitting. No xchg() needed here; we simply
677 qatomic_set(&multifd_send_state
->exiting
, 1);
680 * Firstly, kick all threads out; no matter whether they are just idle,
681 * or blocked in an IO system call.
683 for (i
= 0; i
< migrate_multifd_channels(); i
++) {
684 MultiFDSendParams
*p
= &multifd_send_state
->params
[i
];
686 qemu_sem_post(&p
->sem
);
688 qio_channel_shutdown(p
->c
, QIO_CHANNEL_SHUTDOWN_BOTH
, NULL
);
693 * Finally recycle all the threads.
695 for (i
= 0; i
< migrate_multifd_channels(); i
++) {
696 MultiFDSendParams
*p
= &multifd_send_state
->params
[i
];
698 if (p
->tls_thread_created
) {
699 qemu_thread_join(&p
->tls_thread
);
702 if (p
->thread_created
) {
703 qemu_thread_join(&p
->thread
);
708 static bool multifd_send_cleanup_channel(MultiFDSendParams
*p
, Error
**errp
)
711 migration_ioc_unregister_yank(p
->c
);
713 * The object_unref() cannot guarantee the fd will always be
714 * released because finalize() of the iochannel is only
715 * triggered on the last reference and it's not guaranteed
716 * that we always hold the last refcount when reaching here.
718 * Closing the fd explicitly has the benefit that if there is any
719 * registered I/O handler callbacks on such fd, that will get a
720 * POLLNVAL event and will further trigger the cleanup to finally
723 * FIXME: It should logically be guaranteed that all multifd
724 * channels have no I/O handler callback registered when reaching
725 * here, because migration thread will wait for all multifd channel
726 * establishments to complete during setup. Since
727 * migrate_fd_cleanup() will be scheduled in main thread too, all
728 * previous callbacks should guarantee to be completed when
729 * reaching here. See multifd_send_state.channels_created and its
730 * usage. In the future, we could replace this with an assert
731 * making sure we're the last reference, or simply drop it if above
732 * is more clear to be justified.
734 qio_channel_close(p
->c
, &error_abort
);
735 object_unref(OBJECT(p
->c
));
738 qemu_sem_destroy(&p
->sem
);
739 qemu_sem_destroy(&p
->sem_sync
);
742 multifd_pages_clear(p
->pages
);
749 multifd_send_state
->ops
->send_cleanup(p
, errp
);
751 return *errp
== NULL
;
754 static void multifd_send_cleanup_state(void)
756 file_cleanup_outgoing_migration();
757 fd_cleanup_outgoing_migration();
758 socket_cleanup_outgoing_migration();
759 qemu_sem_destroy(&multifd_send_state
->channels_created
);
760 qemu_sem_destroy(&multifd_send_state
->channels_ready
);
761 g_free(multifd_send_state
->params
);
762 multifd_send_state
->params
= NULL
;
763 multifd_pages_clear(multifd_send_state
->pages
);
764 multifd_send_state
->pages
= NULL
;
765 g_free(multifd_send_state
);
766 multifd_send_state
= NULL
;
769 void multifd_send_shutdown(void)
773 if (!migrate_multifd()) {
777 multifd_send_terminate_threads();
779 for (i
= 0; i
< migrate_multifd_channels(); i
++) {
780 MultiFDSendParams
*p
= &multifd_send_state
->params
[i
];
781 Error
*local_err
= NULL
;
783 if (!multifd_send_cleanup_channel(p
, &local_err
)) {
784 migrate_set_error(migrate_get_current(), local_err
);
785 error_free(local_err
);
789 multifd_send_cleanup_state();
792 static int multifd_zero_copy_flush(QIOChannel
*c
)
797 ret
= qio_channel_flush(c
, &err
);
799 error_report_err(err
);
803 stat64_add(&mig_stats
.dirty_sync_missed_zero_copy
, 1);
809 int multifd_send_sync_main(void)
812 bool flush_zero_copy
;
814 if (!migrate_multifd()) {
817 if (multifd_send_state
->pages
->num
) {
818 if (!multifd_send_pages()) {
819 error_report("%s: multifd_send_pages fail", __func__
);
824 flush_zero_copy
= migrate_zero_copy_send();
826 for (i
= 0; i
< migrate_multifd_channels(); i
++) {
827 MultiFDSendParams
*p
= &multifd_send_state
->params
[i
];
829 if (multifd_send_should_exit()) {
833 trace_multifd_send_sync_main_signal(p
->id
);
836 * We should be the only user so far, so not possible to be set by
837 * others concurrently.
839 assert(qatomic_read(&p
->pending_sync
) == false);
840 qatomic_set(&p
->pending_sync
, true);
841 qemu_sem_post(&p
->sem
);
843 for (i
= 0; i
< migrate_multifd_channels(); i
++) {
844 MultiFDSendParams
*p
= &multifd_send_state
->params
[i
];
846 if (multifd_send_should_exit()) {
850 qemu_sem_wait(&multifd_send_state
->channels_ready
);
851 trace_multifd_send_sync_main_wait(p
->id
);
852 qemu_sem_wait(&p
->sem_sync
);
854 if (flush_zero_copy
&& p
->c
&& (multifd_zero_copy_flush(p
->c
) < 0)) {
858 trace_multifd_send_sync_main(multifd_send_state
->packet_num
);
863 static void *multifd_send_thread(void *opaque
)
865 MultiFDSendParams
*p
= opaque
;
866 MigrationThread
*thread
= NULL
;
867 Error
*local_err
= NULL
;
869 bool use_packets
= multifd_use_packets();
871 thread
= migration_threads_add(p
->name
, qemu_get_thread_id());
873 trace_multifd_send_thread_start(p
->id
);
874 rcu_register_thread();
877 if (multifd_send_initial_packet(p
, &local_err
) < 0) {
884 qemu_sem_post(&multifd_send_state
->channels_ready
);
885 qemu_sem_wait(&p
->sem
);
887 if (multifd_send_should_exit()) {
892 * Read pending_job flag before p->pages. Pairs with the
893 * qatomic_store_release() in multifd_send_pages().
895 if (qatomic_load_acquire(&p
->pending_job
)) {
896 MultiFDPages_t
*pages
= p
->pages
;
901 ret
= multifd_send_state
->ops
->send_prepare(p
, &local_err
);
906 if (migrate_mapped_ram()) {
907 ret
= file_write_ramblock_iov(p
->c
, p
->iov
, p
->iovs_num
,
908 p
->pages
->block
, &local_err
);
910 ret
= qio_channel_writev_full_all(p
->c
, p
->iov
, p
->iovs_num
,
911 NULL
, 0, p
->write_flags
,
919 stat64_add(&mig_stats
.multifd_bytes
,
920 p
->next_packet_size
+ p
->packet_len
);
922 multifd_pages_reset(p
->pages
);
923 p
->next_packet_size
= 0;
926 * Making sure p->pages is published before saying "we're
927 * free". Pairs with the smp_mb_acquire() in
928 * multifd_send_pages().
930 qatomic_store_release(&p
->pending_job
, false);
933 * If not a normal job, must be a sync request. Note that
934 * pending_sync is a standalone flag (unlike pending_job), so
935 * it doesn't require explicit memory barriers.
937 assert(qatomic_read(&p
->pending_sync
));
940 p
->flags
= MULTIFD_FLAG_SYNC
;
941 multifd_send_fill_packet(p
);
942 ret
= qio_channel_write_all(p
->c
, (void *)p
->packet
,
943 p
->packet_len
, &local_err
);
947 /* p->next_packet_size will always be zero for a SYNC packet */
948 stat64_add(&mig_stats
.multifd_bytes
, p
->packet_len
);
952 qatomic_set(&p
->pending_sync
, false);
953 qemu_sem_post(&p
->sem_sync
);
960 trace_multifd_send_error(p
->id
);
961 multifd_send_set_error(local_err
);
962 multifd_send_kick_main(p
);
963 error_free(local_err
);
966 rcu_unregister_thread();
967 migration_threads_remove(thread
);
968 trace_multifd_send_thread_end(p
->id
, p
->packets_sent
, p
->total_normal_pages
);
973 static void multifd_new_send_channel_async(QIOTask
*task
, gpointer opaque
);
976 MultiFDSendParams
*p
;
978 } MultiFDTLSThreadArgs
;
980 static void *multifd_tls_handshake_thread(void *opaque
)
982 MultiFDTLSThreadArgs
*args
= opaque
;
984 qio_channel_tls_handshake(args
->tioc
,
985 multifd_new_send_channel_async
,
994 static bool multifd_tls_channel_connect(MultiFDSendParams
*p
,
998 MigrationState
*s
= migrate_get_current();
999 const char *hostname
= s
->hostname
;
1000 MultiFDTLSThreadArgs
*args
;
1001 QIOChannelTLS
*tioc
;
1003 tioc
= migration_tls_client_create(ioc
, hostname
, errp
);
1009 * Ownership of the socket channel now transfers to the newly
1010 * created TLS channel, which has already taken a reference.
1012 object_unref(OBJECT(ioc
));
1013 trace_multifd_tls_outgoing_handshake_start(ioc
, tioc
, hostname
);
1014 qio_channel_set_name(QIO_CHANNEL(tioc
), "multifd-tls-outgoing");
1016 args
= g_new0(MultiFDTLSThreadArgs
, 1);
1020 p
->tls_thread_created
= true;
1021 qemu_thread_create(&p
->tls_thread
, "multifd-tls-handshake-worker",
1022 multifd_tls_handshake_thread
, args
,
1023 QEMU_THREAD_JOINABLE
);
1027 void multifd_channel_connect(MultiFDSendParams
*p
, QIOChannel
*ioc
)
1029 qio_channel_set_delay(ioc
, false);
1031 migration_ioc_register_yank(ioc
);
1032 /* Setup p->c only if the channel is completely setup */
1035 p
->thread_created
= true;
1036 qemu_thread_create(&p
->thread
, p
->name
, multifd_send_thread
, p
,
1037 QEMU_THREAD_JOINABLE
);
1041 * When TLS is enabled this function is called once to establish the
1042 * TLS connection and a second time after the TLS handshake to create
1043 * the multifd channel. Without TLS it goes straight into the channel
1046 static void multifd_new_send_channel_async(QIOTask
*task
, gpointer opaque
)
1048 MultiFDSendParams
*p
= opaque
;
1049 QIOChannel
*ioc
= QIO_CHANNEL(qio_task_get_source(task
));
1050 Error
*local_err
= NULL
;
1053 trace_multifd_new_send_channel_async(p
->id
);
1055 if (qio_task_propagate_error(task
, &local_err
)) {
1060 trace_multifd_set_outgoing_channel(ioc
, object_get_typename(OBJECT(ioc
)),
1061 migrate_get_current()->hostname
);
1063 if (migrate_channel_requires_tls_upgrade(ioc
)) {
1064 ret
= multifd_tls_channel_connect(p
, ioc
, &local_err
);
1069 multifd_channel_connect(p
, ioc
);
1075 * Here we're not interested whether creation succeeded, only that
1076 * it happened at all.
1078 multifd_send_channel_created();
1084 trace_multifd_new_send_channel_async_error(p
->id
, local_err
);
1085 multifd_send_set_error(local_err
);
1087 * For error cases (TLS or non-TLS), IO channel is always freed here
1088 * rather than when cleanup multifd: since p->c is not set, multifd
1089 * cleanup code doesn't even know its existence.
1091 object_unref(OBJECT(ioc
));
1092 error_free(local_err
);
1095 static bool multifd_new_send_channel_create(gpointer opaque
, Error
**errp
)
1097 if (!multifd_use_packets()) {
1098 return file_send_channel_create(opaque
, errp
);
1101 socket_send_channel_create(multifd_new_send_channel_async
, opaque
);
1105 bool multifd_send_setup(void)
1107 MigrationState
*s
= migrate_get_current();
1108 Error
*local_err
= NULL
;
1109 int thread_count
, ret
= 0;
1110 uint32_t page_count
= MULTIFD_PACKET_SIZE
/ qemu_target_page_size();
1111 bool use_packets
= multifd_use_packets();
1114 if (!migrate_multifd()) {
1118 thread_count
= migrate_multifd_channels();
1119 multifd_send_state
= g_malloc0(sizeof(*multifd_send_state
));
1120 multifd_send_state
->params
= g_new0(MultiFDSendParams
, thread_count
);
1121 multifd_send_state
->pages
= multifd_pages_init(page_count
);
1122 qemu_sem_init(&multifd_send_state
->channels_created
, 0);
1123 qemu_sem_init(&multifd_send_state
->channels_ready
, 0);
1124 qatomic_set(&multifd_send_state
->exiting
, 0);
1125 multifd_send_state
->ops
= multifd_ops
[migrate_multifd_compression()];
1127 for (i
= 0; i
< thread_count
; i
++) {
1128 MultiFDSendParams
*p
= &multifd_send_state
->params
[i
];
1130 qemu_sem_init(&p
->sem
, 0);
1131 qemu_sem_init(&p
->sem_sync
, 0);
1133 p
->pages
= multifd_pages_init(page_count
);
1136 p
->packet_len
= sizeof(MultiFDPacket_t
)
1137 + sizeof(uint64_t) * page_count
;
1138 p
->packet
= g_malloc0(p
->packet_len
);
1139 p
->packet
->magic
= cpu_to_be32(MULTIFD_MAGIC
);
1140 p
->packet
->version
= cpu_to_be32(MULTIFD_VERSION
);
1142 /* We need one extra place for the packet header */
1143 p
->iov
= g_new0(struct iovec
, page_count
+ 1);
1145 p
->iov
= g_new0(struct iovec
, page_count
);
1147 p
->name
= g_strdup_printf("multifdsend_%d", i
);
1148 p
->page_size
= qemu_target_page_size();
1149 p
->page_count
= page_count
;
1152 if (!multifd_new_send_channel_create(p
, &local_err
)) {
1158 * Wait until channel creation has started for all channels. The
1159 * creation can still fail, but no more channels will be created
1162 for (i
= 0; i
< thread_count
; i
++) {
1163 qemu_sem_wait(&multifd_send_state
->channels_created
);
1166 for (i
= 0; i
< thread_count
; i
++) {
1167 MultiFDSendParams
*p
= &multifd_send_state
->params
[i
];
1169 ret
= multifd_send_state
->ops
->send_setup(p
, &local_err
);
1176 migrate_set_error(s
, local_err
);
1177 error_report_err(local_err
);
1178 migrate_set_state(&s
->state
, MIGRATION_STATUS_SETUP
,
1179 MIGRATION_STATUS_FAILED
);
1186 bool multifd_recv(void)
1189 static int next_recv_channel
;
1190 MultiFDRecvParams
*p
= NULL
;
1191 MultiFDRecvData
*data
= multifd_recv_state
->data
;
1194 * next_channel can remain from a previous migration that was
1195 * using more channels, so ensure it doesn't overflow if the
1196 * limit is lower now.
1198 next_recv_channel
%= migrate_multifd_channels();
1199 for (i
= next_recv_channel
;; i
= (i
+ 1) % migrate_multifd_channels()) {
1200 if (multifd_recv_should_exit()) {
1204 p
= &multifd_recv_state
->params
[i
];
1206 if (qatomic_read(&p
->pending_job
) == false) {
1207 next_recv_channel
= (i
+ 1) % migrate_multifd_channels();
1213 * Order pending_job read before manipulating p->data below. Pairs
1214 * with qatomic_store_release() at multifd_recv_thread().
1218 assert(!p
->data
->size
);
1219 multifd_recv_state
->data
= p
->data
;
1223 * Order p->data update before setting pending_job. Pairs with
1224 * qatomic_load_acquire() at multifd_recv_thread().
1226 qatomic_store_release(&p
->pending_job
, true);
1227 qemu_sem_post(&p
->sem
);
1232 MultiFDRecvData
*multifd_get_recv_data(void)
1234 return multifd_recv_state
->data
;
1237 static void multifd_recv_terminate_threads(Error
*err
)
1241 trace_multifd_recv_terminate_threads(err
!= NULL
);
1243 if (qatomic_xchg(&multifd_recv_state
->exiting
, 1)) {
1248 MigrationState
*s
= migrate_get_current();
1249 migrate_set_error(s
, err
);
1250 if (s
->state
== MIGRATION_STATUS_SETUP
||
1251 s
->state
== MIGRATION_STATUS_ACTIVE
) {
1252 migrate_set_state(&s
->state
, s
->state
,
1253 MIGRATION_STATUS_FAILED
);
1257 for (i
= 0; i
< migrate_multifd_channels(); i
++) {
1258 MultiFDRecvParams
*p
= &multifd_recv_state
->params
[i
];
1261 * The migration thread and channels interact differently
1262 * depending on the presence of packets.
1264 if (multifd_use_packets()) {
1266 * The channel receives as long as there are packets. When
1267 * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
1268 * channel waits for the migration thread to sync. If the
1269 * sync never happens, do it here.
1271 qemu_sem_post(&p
->sem_sync
);
1274 * The channel waits for the migration thread to give it
1275 * work. When the migration thread runs out of work, it
1276 * releases the channel and waits for any pending work to
1277 * finish. If we reach here (e.g. due to error) before the
1278 * work runs out, release the channel.
1280 qemu_sem_post(&p
->sem
);
1284 * We could arrive here for two reasons:
1285 * - normal quit, i.e. everything went fine, just finished
1286 * - error quit: We close the channels so the channel threads
1287 * finish the qio_channel_read_all_eof()
1290 qio_channel_shutdown(p
->c
, QIO_CHANNEL_SHUTDOWN_BOTH
, NULL
);
1295 void multifd_recv_shutdown(void)
1297 if (migrate_multifd()) {
1298 multifd_recv_terminate_threads(NULL
);
1302 static void multifd_recv_cleanup_channel(MultiFDRecvParams
*p
)
1304 migration_ioc_unregister_yank(p
->c
);
1305 object_unref(OBJECT(p
->c
));
1307 qemu_mutex_destroy(&p
->mutex
);
1308 qemu_sem_destroy(&p
->sem_sync
);
1309 qemu_sem_destroy(&p
->sem
);
1319 multifd_recv_state
->ops
->recv_cleanup(p
);
1322 static void multifd_recv_cleanup_state(void)
1324 qemu_sem_destroy(&multifd_recv_state
->sem_sync
);
1325 g_free(multifd_recv_state
->params
);
1326 multifd_recv_state
->params
= NULL
;
1327 g_free(multifd_recv_state
->data
);
1328 multifd_recv_state
->data
= NULL
;
1329 g_free(multifd_recv_state
);
1330 multifd_recv_state
= NULL
;
1333 void multifd_recv_cleanup(void)
1337 if (!migrate_multifd()) {
1340 multifd_recv_terminate_threads(NULL
);
1341 for (i
= 0; i
< migrate_multifd_channels(); i
++) {
1342 MultiFDRecvParams
*p
= &multifd_recv_state
->params
[i
];
1344 if (p
->thread_created
) {
1345 qemu_thread_join(&p
->thread
);
1348 for (i
= 0; i
< migrate_multifd_channels(); i
++) {
1349 multifd_recv_cleanup_channel(&multifd_recv_state
->params
[i
]);
1351 multifd_recv_cleanup_state();
/*
 * Synchronization point between the migration thread and all recv
 * channels.  Called by the migration thread; blocks until every
 * channel has reached its own sync point, then releases them for the
 * next iteration.
 */
void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets and therefore need to
     * wait for more work. Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based loading is done in one iteration. We're
         * done.
         */
        return;
    }

    /*
     * Sync done. Release the channels for the next iteration.
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /* Track the highest packet number seen by any channel */
        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}
/*
 * Per-channel receive thread body.
 *
 * Socket (packet) mode: read one packet per iteration from the
 * channel, unfill it under p->mutex, and hand the payload to the
 * compression backend.  A SYNC-flagged packet makes the thread park on
 * p->sem_sync until multifd_recv_sync_main() releases it.
 *
 * File (no-packet) mode: wait on p->sem for the migration thread to
 * post work into p->data; pending_job carries the handoff, with
 * acquire/release ordering on data->size.
 *
 * @opaque: the channel's MultiFDRecvParams.  Returns NULL.
 */
static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t flags = 0;
        bool has_data = false;
        p->normal_num = 0;

        if (use_packets) {
            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                           p->packet_len, &local_err);
            if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */
                break;
            }

            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            /* Snapshot flags under the lock; SYNC is handled below */
            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;
            has_data = !!p->normal_num;
            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * Migration thread did not send work, this is
                 * equivalent to pending_sync on the sending
                 * side. Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            /* Delegate payload decode to the compression backend */
            ret = multifd_recv_state->ops->recv(p, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                /* Signal the sync point, then wait to be released */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->total_normal_pages += p->data->size / qemu_target_page_size();
            p->data->size = 0;
            /*
             * Order data->size update before clearing
             * pending_job. Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        /* Propagate the failure to all channels, then drop our ref */
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved,
                                  p->total_normal_pages);

    return NULL;
}
/*
 * Allocate and initialise the global multifd receive state and all
 * per-channel parameter blocks, then let the compression backend set
 * up each channel.
 *
 * Idempotent: returns 0 immediately if already initialised or multifd
 * is disabled.  Returns 0 on success, a negative value (from the
 * backend's recv_setup) with @errp set on failure.
 */
int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    int i;

    /*
     * Return successfully if multiFD recv state is already initialised
     * or multiFD is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
    multifd_recv_state->data->size = 0;

    qatomic_set(&multifd_recv_state->count, 0);
    qatomic_set(&multifd_recv_state->exiting, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        qemu_sem_init(&p->sem, 0);
        p->pending_job = false;
        p->id = i;

        p->data = g_new0(MultiFDRecvData, 1);
        p->data->size = 0;

        /* Packet buffer is only needed for socket-based migration */
        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
        }
        p->name = g_strdup_printf("multifdrecv_%d", i);
        p->iov = g_new0(struct iovec, page_count);
        p->normal = g_new0(ram_addr_t, page_count);
        p->page_count = page_count;
        p->page_size = qemu_target_page_size();
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }
    return 0;
}
/*
 * Return true once every expected multifd channel has connected (or
 * trivially when multifd is disabled).  Safe to call before setup:
 * a NULL multifd_recv_state means no connection has been made yet.
 */
bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}
/*
 * Try to receive all multifd channels to get ready for the migration.
 * Sets @errp when failing to receive the current channel.
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        /* The initial packet carries the channel id */
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        /* File-based channels have no handshake; assign ids in order */
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        /* Duplicate channel id: reject and tear everything down */
        error_setg(&local_err, "multifd: received id '%d' already setup'",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}
);