migration/multifd.c
/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "sysemu/sysemu.h"
#include "exec/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "fd.h"
#include "file.h"
#include "migration.h"
#include "migration-stats.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"
#include "threadinfo.h"
#include "options.h"
#include "qemu/yank.h"
#include "io/channel-file.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

/* Multiple fd's */

#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
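
/*
 * This handshake header is written once per channel by
 * multifd_send_initial_packet() before any data packets flow. The magic
 * and version fields travel big-endian; the uuid lets the destination
 * reject channels coming from a different QEMU instance; id maps the
 * incoming connection to its channel slot (see
 * multifd_recv_initial_packet() below).
 */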

struct {
    MultiFDSendParams *params;
    /* array of pages to be sent */
    MultiFDPages_t *pages;
    /*
     * Global number of generated multifd packets.
     *
     * Note that we use 'uintptr_t' because it naturally supports atomic
     * operations on both 32-bit and 64-bit hosts. It means that on
     * 32-bit systems multifd will overflow packet_num sooner, but that
     * should be fine.
     *
     * Another option would be QEMU's Stat64, which is 64 bits on all
     * hosts, but so far it does not support atomic fetch_add(). Keep it
     * simple for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.
     */
    QemuSemaphore channels_created;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Whether we have already terminated the threads. There is a race
     * where an error can arrive while we are exiting.
     * We use atomic operations. The only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_send_state;

struct {
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads; the migration thread
     * uses it to wait for recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_recv_state;

static bool multifd_use_packets(void)
{
    return !migrate_mapped_ram();
}

void multifd_send_channel_created(void)
{
    qemu_sem_post(&multifd_send_state->channels_created);
}

static void multifd_set_file_bitmap(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = p->pages;

    assert(pages->block);

    for (int i = 0; i < p->pages->num; i++) {
        ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], true);
    }
}
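
/*
 * Note on the file bitmap (mapped-ram migration): pages are written to
 * the migration file at fixed offsets rather than streamed, so instead
 * of describing pages in a packet header, each sent page is flagged in
 * the RAMBlock's file bitmap, which tells the destination which offsets
 * in the file hold data.
 */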

/* Multifd without compression */

/**
 * nocomp_send_setup: setup send side
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
{
    if (migrate_zero_copy_send()) {
        p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
    }

    return 0;
}

/**
 * nocomp_send_cleanup: cleanup send side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
{
    return;
}

static void multifd_send_prepare_iovs(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = p->pages;

    for (int i = 0; i < pages->num; i++) {
        p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
        p->iov[p->iovs_num].iov_len = p->page_size;
        p->iovs_num++;
    }

    p->next_packet_size = pages->num * p->page_size;
}

/**
 * nocomp_send_prepare: prepare data to be sent
 *
 * For no compression we just have to calculate the size of the
 * packet.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
{
    bool use_zero_copy_send = migrate_zero_copy_send();
    int ret;

    if (!multifd_use_packets()) {
        multifd_send_prepare_iovs(p);
        multifd_set_file_bitmap(p);

        return 0;
    }

    if (!use_zero_copy_send) {
        /*
         * Only !zerocopy needs the header in IOV; zerocopy will
         * send it separately.
         */
        multifd_send_prepare_header(p);
    }

    multifd_send_prepare_iovs(p);
    p->flags |= MULTIFD_FLAG_NOCOMP;

    multifd_send_fill_packet(p);

    if (use_zero_copy_send) {
        /* Send header first, without zerocopy */
        ret = qio_channel_write_all(p->c, (void *)p->packet,
                                    p->packet_len, errp);
        if (ret != 0) {
            return -1;
        }
    }

    return 0;
}
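
/*
 * Resulting layout on a socket channel, as a sketch:
 *
 *   !zerocopy: iov[0] holds the packet header and iov[1..n] the guest
 *              pages, all written together below in the sender thread;
 *   zerocopy:  the header has already been written above with a plain
 *              write, and iov[0..n-1] hold only guest pages, so the
 *              kernel can transmit them without copying.
 */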

/**
 * nocomp_recv_setup: setup receive side
 *
 * For no compression this function does nothing.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
{
    return 0;
}

/**
 * nocomp_recv_cleanup: cleanup receive side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 */
static void nocomp_recv_cleanup(MultiFDRecvParams *p)
{
}

/**
 * nocomp_recv: read the data from the channel
 *
 * For no compression we just need to read things into the correct place.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
{
    uint32_t flags;

    if (!multifd_use_packets()) {
        return multifd_file_recv_data(p, errp);
    }

    flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;

    if (flags != MULTIFD_FLAG_NOCOMP) {
        error_setg(errp, "multifd %u: flags received %x flags expected %x",
                   p->id, flags, MULTIFD_FLAG_NOCOMP);
        return -1;
    }
    for (int i = 0; i < p->normal_num; i++) {
        p->iov[i].iov_base = p->host + p->normal[i];
        p->iov[i].iov_len = p->page_size;
    }
    return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
}

static MultiFDMethods multifd_nocomp_ops = {
    .send_setup = nocomp_send_setup,
    .send_cleanup = nocomp_send_cleanup,
    .send_prepare = nocomp_send_prepare,
    .recv_setup = nocomp_recv_setup,
    .recv_cleanup = nocomp_recv_cleanup,
    .recv = nocomp_recv
};

static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};

void multifd_register_ops(int method, MultiFDMethods *ops)
{
    assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
    multifd_ops[method] = ops;
}
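
/*
 * A compression backend supplies its own MultiFDMethods and registers
 * them at init time. As a rough sketch (names are illustrative; see
 * multifd-zlib.c and friends for the real implementations):
 *
 *   static MultiFDMethods multifd_zlib_ops = {
 *       .send_setup = zlib_send_setup,
 *       ...
 *   };
 *
 *   multifd_register_ops(MULTIFD_COMPRESSION_ZLIB, &multifd_zlib_ops);
 */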

/* Reset a MultiFDPages_t* object for the next use */
static void multifd_pages_reset(MultiFDPages_t *pages)
{
    /*
     * We don't need to touch offset[] array, because it will be
     * overwritten later when reused.
     */
    pages->num = 0;
    pages->block = NULL;
}

static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    size_t size = sizeof(msg);
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
    if (ret != 0) {
        return -1;
    }
    stat64_add(&mig_stats.multifd_bytes, size);
    return 0;
}

static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u "
                   "expected %u", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    if (msg.id > migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %u is greater than "
                   "number of channels %u", msg.id, migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}

static MultiFDPages_t *multifd_pages_init(uint32_t n)
{
    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);

    pages->allocated = n;
    pages->offset = g_new0(ram_addr_t, n);

    return pages;
}

static void multifd_pages_clear(MultiFDPages_t *pages)
{
    multifd_pages_reset(pages);
    pages->allocated = 0;
    g_free(pages->offset);
    pages->offset = NULL;
    g_free(pages);
}

void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    MultiFDPages_t *pages = p->pages;
    uint64_t packet_num;
    int i;

    packet->flags = cpu_to_be32(p->flags);
    packet->pages_alloc = cpu_to_be32(p->pages->allocated);
    packet->normal_pages = cpu_to_be32(pages->num);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);

    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
    packet->packet_num = cpu_to_be64(packet_num);

    if (pages->block) {
        strncpy(packet->ramblock, pages->block->idstr, 256);
    }

    for (i = 0; i < pages->num; i++) {
        /* there are architectures where ram_addr_t is 32 bit */
        uint64_t temp = pages->offset[i];

        packet->offset[i] = cpu_to_be64(temp);
    }

    p->packets_sent++;
    p->total_normal_pages += pages->num;

    trace_multifd_send(p->id, packet_num, pages->num, p->flags,
                       p->next_packet_size);
}
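
/*
 * Rough shape of the data packet filled in above (MultiFDPacket_t is
 * defined in multifd.h; reserved fields are omitted here). All
 * multi-byte fields are big-endian on the wire:
 *
 *   magic, version, flags        32 bits each
 *   pages_alloc, normal_pages    32 bits each
 *   next_packet_size             32 bits
 *   packet_num                   64 bits
 *   ramblock                     256-byte idstr, NUL-terminated
 *   offset[]                     64 bits per page
 */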

static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    MultiFDPacket_t *packet = p->packet;
    int i;

    packet->magic = be32_to_cpu(packet->magic);
    if (packet->magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet "
                   "magic %x and expected magic %x",
                   packet->magic, MULTIFD_MAGIC);
        return -1;
    }

    packet->version = be32_to_cpu(packet->version);
    if (packet->version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet "
                   "version %u and expected version %u",
                   packet->version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(packet->flags);

    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
    /*
     * If we received a packet claiming more pages than this channel
     * was set up for, just stop migration.
     */
    if (packet->pages_alloc > p->page_count) {
        error_setg(errp, "multifd: received packet "
                   "with size %u and expected a size of %u",
                   packet->pages_alloc, p->page_count);
        return -1;
    }

    p->normal_num = be32_to_cpu(packet->normal_pages);
    if (p->normal_num > packet->pages_alloc) {
        error_setg(errp, "multifd: received packet "
                   "with %u pages and expected maximum pages are %u",
                   p->normal_num, packet->pages_alloc);
        return -1;
    }

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);
    p->packets_recved++;
    p->total_normal_pages += p->normal_num;

    trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags,
                       p->next_packet_size);

    if (p->normal_num == 0) {
        return 0;
    }

    /* make sure that ramblock is 0 terminated */
    packet->ramblock[255] = 0;
    p->block = qemu_ram_block_by_name(packet->ramblock);
    if (!p->block) {
        error_setg(errp, "multifd: unknown ram block %s",
                   packet->ramblock);
        return -1;
    }

    p->host = p->block->host;
    for (i = 0; i < p->normal_num; i++) {
        uint64_t offset = be64_to_cpu(packet->offset[i]);

        if (offset > (p->block->used_length - p->page_size)) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, p->block->used_length);
            return -1;
        }
        p->normal[i] = offset;
    }

    return 0;
}

static bool multifd_send_should_exit(void)
{
    return qatomic_read(&multifd_send_state->exiting);
}

static bool multifd_recv_should_exit(void)
{
    return qatomic_read(&multifd_recv_state->exiting);
}

/*
 * The migration thread can wait on either of the two semaphores. This
 * function can be used to kick the main thread out of waiting on either of
 * them. Should mostly only be called when something went wrong with the
 * current multifd send thread.
 */
static void multifd_send_kick_main(MultiFDSendParams *p)
{
    qemu_sem_post(&p->sem_sync);
    qemu_sem_post(&multifd_send_state->channels_ready);
}

/*
 * How do we use multifd_send_state->pages and channel->pages?
 *
 * We create a pages structure for each channel, and a main one. Each time
 * that we need to send a batch of pages we interchange the one in
 * multifd_send_state with the one in the channel that is sending it. There
 * are two reasons for that:
 *    - to not have to do so many mallocs during migration
 *    - to make it easier to know what to free at the end of migration
 *
 * This way we always know who is the owner of each "pages" struct,
 * and we don't need any locking. It belongs either to the migration
 * thread or to the channel thread. Switching is safe because the
 * migration thread only hands pages to a channel whose pending_job is
 * false, and the channel thread only clears pending_job once it has
 * finished with its own pages.
 *
 * Returns true if it succeeds, false otherwise.
 */
static bool multifd_send_pages(void)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make gcc happy */
    MultiFDPages_t *pages = multifd_send_state->pages;

    if (multifd_send_should_exit()) {
        return false;
    }

    /* We wait here, until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read to p->pending_job is safe, because only multifd
         * sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest. Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();
    assert(!p->pages->num);
    multifd_send_state->pages = p->pages;
    p->pages = pages;
    /*
     * Making sure p->pages is set up before marking pending_job=true. Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}
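
/*
 * The handoff above as a timeline (illustrative):
 *
 *   migration thread                      sender thread i
 *   ----------------                      ---------------
 *   wait(channels_ready)
 *   find i with pending_job == false
 *   smp_mb_acquire()
 *   swap state->pages and p->pages
 *   store_release(pending_job, true) ---> load_acquire(pending_job)
 *   post(p->sem)                          send and reset p->pages
 *                                         store_release(pending_job, false)
 *                                         post(channels_ready)
 */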

static inline bool multifd_queue_empty(MultiFDPages_t *pages)
{
    return pages->num == 0;
}

static inline bool multifd_queue_full(MultiFDPages_t *pages)
{
    return pages->num == pages->allocated;
}

static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
{
    pages->offset[pages->num++] = offset;
}

/* Returns true if enqueue successful, false otherwise */
bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
{
    MultiFDPages_t *pages;

retry:
    pages = multifd_send_state->pages;

    /* If the queue is empty, we can already enqueue now */
    if (multifd_queue_empty(pages)) {
        pages->block = block;
        multifd_enqueue(pages, offset);
        return true;
    }

    /*
     * Not empty, and we need a flush. That can be because of either:
     *
     * (1) The page is not on the same ramblock as the previous ones, or,
     * (2) The queue is full.
     *
     * After flushing, always retry.
     */
    if (pages->block != block || multifd_queue_full(pages)) {
        if (!multifd_send_pages()) {
            return false;
        }
        goto retry;
    }

    /* Not empty, and we still have space, do it! */
    multifd_enqueue(pages, offset);
    return true;
}
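
/*
 * The RAM save path feeds dirty pages into the queue above one at a
 * time; the caller in ram.c looks roughly like this (sketch):
 *
 *   if (!multifd_queue_page(block, offset)) {
 *       return -1;
 *   }
 */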

/* Multifd send side hit an error; remember it and prepare to quit */
static void multifd_send_set_error(Error *err)
{
    /*
     * We don't want to exit the threads twice. Depending on where we
     * get the error, or if there are two independent errors in two
     * threads at the same time, we can end up calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }
}

static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Tell everyone we're quitting. No xchg() needed here; we simply
     * always set it.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * First, kick all threads out, no matter whether they are just idle
     * or blocked in an I/O system call.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }

    /*
     * Finally, recycle all the threads.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}

static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->c) {
        migration_ioc_unregister_yank(p->c);
        /*
         * The object_unref() cannot guarantee the fd will always be
         * released because finalize() of the iochannel is only
         * triggered on the last reference and it's not guaranteed
         * that we always hold the last refcount when reaching here.
         *
         * Closing the fd explicitly has the benefit that if there is any
         * registered I/O handler callback on such an fd, it will get a
         * POLLNVAL event and will further trigger the cleanup to finally
         * release the IOC.
         *
         * FIXME: It should logically be guaranteed that all multifd
         * channels have no I/O handler callback registered when reaching
         * here, because the migration thread will wait for all multifd
         * channel establishments to complete during setup. Since
         * migrate_fd_cleanup() will be scheduled in the main thread too,
         * all previous callbacks should be guaranteed to have completed
         * when reaching here. See multifd_send_state.channels_created and
         * its usage. In the future, we could replace this with an assert
         * making sure we hold the last reference, or simply drop it if
         * the reasoning above clearly justifies that.
         */
        qio_channel_close(p->c, &error_abort);
        object_unref(OBJECT(p->c));
        p->c = NULL;
    }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    multifd_pages_clear(p->pages);
    p->pages = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->iov);
    p->iov = NULL;
    multifd_send_state->ops->send_cleanup(p, errp);

    return *errp == NULL;
}

static void multifd_send_cleanup_state(void)
{
    file_cleanup_outgoing_migration();
    fd_cleanup_outgoing_migration();
    socket_cleanup_outgoing_migration();
    qemu_sem_destroy(&multifd_send_state->channels_created);
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    multifd_pages_clear(multifd_send_state->pages);
    multifd_send_state->pages = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}

void multifd_send_shutdown(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    multifd_send_terminate_threads();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (!multifd_send_cleanup_channel(p, &local_err)) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }

    multifd_send_cleanup_state();
}

static int multifd_zero_copy_flush(QIOChannel *c)
{
    int ret;
    Error *err = NULL;

    ret = qio_channel_flush(c, &err);
    if (ret < 0) {
        error_report_err(err);
        return -1;
    }
    if (ret == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    }

    return ret;
}

int multifd_send_sync_main(void)
{
    int i;
    bool flush_zero_copy;

    if (!migrate_multifd()) {
        return 0;
    }
    if (multifd_send_state->pages->num) {
        if (!multifd_send_pages()) {
            error_report("%s: multifd_send_pages fail", __func__);
            return -1;
        }
    }

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so it is not possible for
         * it to be set by others concurrently.
         */
        assert(qatomic_read(&p->pending_sync) == false);
        qatomic_set(&p->pending_sync, true);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}
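
/*
 * The sync handshake above, end to end (illustrative; the destination
 * side lives in multifd_recv_thread() and multifd_recv_sync_main()):
 *
 *   migration thread       sender thread               recv thread
 *   ----------------       -------------               -----------
 *   set pending_sync
 *   post(p->sem)      ---> sees pending_sync
 *                          send MULTIFD_FLAG_SYNC ---> post(sem_sync)
 *                          post(p->sem_sync)           wait(p->sem_sync)
 *   wait(p->sem_sync)
 */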

static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read pending_job flag before p->pages. Pairs with the
         * qatomic_store_release() in multifd_send_pages().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            MultiFDPages_t *pages = p->pages;

            p->iovs_num = 0;
            assert(pages->num);

            ret = multifd_send_state->ops->send_prepare(p, &local_err);
            if (ret != 0) {
                break;
            }

            if (migrate_mapped_ram()) {
                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              p->pages->block, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0, p->write_flags,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes,
                       p->next_packet_size + p->packet_len);

            multifd_pages_reset(p->pages);
            p->next_packet_size = 0;

            /*
             * Making sure p->pages is published before saying "we're
             * free". Pairs with the smp_mb_acquire() in
             * multifd_send_pages().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            /*
             * If not a normal job, must be a sync request. Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
             */
            assert(qatomic_read(&p->pending_sync));

            if (use_packets) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
                p->flags = 0;
            }

            qatomic_set(&p->pending_sync, false);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages);

    return NULL;
}

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

typedef struct {
    MultiFDSendParams *p;
    QIOChannelTLS *tioc;
} MultiFDTLSThreadArgs;

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDTLSThreadArgs *args = opaque;

    qio_channel_tls_handshake(args->tioc,
                              multifd_new_send_channel_async,
                              args->p,
                              NULL,
                              NULL);
    g_free(args);

    return NULL;
}

static bool multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = s->hostname;
    MultiFDTLSThreadArgs *args;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(ioc, hostname, errp);
    if (!tioc) {
        return false;
    }

    /*
     * Ownership of the socket channel now transfers to the newly
     * created TLS channel, which has already taken a reference.
     */
    object_unref(OBJECT(ioc));
    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");

    args = g_new0(MultiFDTLSThreadArgs, 1);
    args->tioc = tioc;
    args->p = p;

    p->tls_thread_created = true;
    qemu_thread_create(&p->tls_thread, "multifd-tls-handshake-worker",
                       multifd_tls_handshake_thread, args,
                       QEMU_THREAD_JOINABLE);
    return true;
}

void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    /* Set p->c only when the channel is completely set up */
    p->c = ioc;

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}

/*
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel. Without TLS it goes straight into the channel
 * creation.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Here we're not interested in whether creation succeeded, only that
     * it happened at all.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * For error cases (TLS or non-TLS), the IO channel is always freed
     * here rather than during multifd cleanup: since p->c was never set,
     * the multifd cleanup code doesn't even know of its existence.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}

static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
{
    if (!multifd_use_packets()) {
        return file_send_channel_create(opaque, errp);
    }

    socket_send_channel_create(multifd_new_send_channel_async, opaque);
    return true;
}

bool multifd_send_setup(void)
{
    MigrationState *s = migrate_get_current();
    Error *local_err = NULL;
    int thread_count, ret = 0;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    if (!migrate_multifd()) {
        return true;
    }

    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    multifd_send_state->pages = multifd_pages_init(page_count);
    qemu_sem_init(&multifd_send_state->channels_created, 0);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->id = i;
        p->pages = multifd_pages_init(page_count);

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
            p->packet->version = cpu_to_be32(MULTIFD_VERSION);

            /* We need one extra place for the packet header */
            p->iov = g_new0(struct iovec, page_count + 1);
        } else {
            p->iov = g_new0(struct iovec, page_count);
        }
        p->name = g_strdup_printf("multifdsend_%d", i);
        p->page_size = qemu_target_page_size();
        p->page_count = page_count;
        p->write_flags = 0;

        if (!multifd_new_send_channel_create(p, &local_err)) {
            return false;
        }
    }

    /*
     * Wait until channel creation has started for all channels. The
     * creation can still fail, but no more channels will be created
     * past this point.
     */
    for (i = 0; i < thread_count; i++) {
        qemu_sem_wait(&multifd_send_state->channels_created);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            break;
        }
    }

    if (ret) {
        migrate_set_error(s, local_err);
        error_report_err(local_err);
        migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                          MIGRATION_STATUS_FAILED);
        return false;
    }

    return true;
}

bool multifd_recv(void)
{
    int i;
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_recv_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Order pending_job read before manipulating p->data below. Pairs
     * with qatomic_store_release() at multifd_recv_thread().
     */
    smp_mb_acquire();

    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Order p->data update before setting pending_job. Pairs with
     * qatomic_load_acquire() at multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}
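
/*
 * The above is the receive-side mirror of multifd_send_pages(): the
 * migration thread swaps its MultiFDRecvData with an idle channel's and
 * releases that channel to load the pages; the channel side of the
 * pairing is in multifd_recv_thread() below.
 */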

MultiFDRecvData *multifd_get_recv_data(void)
{
    return multifd_recv_state->data;
}

static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * The migration thread and channels interact differently
         * depending on the presence of packets.
         */
        if (multifd_use_packets()) {
            /*
             * The channel receives as long as there are packets. When
             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
             * channel waits for the migration thread to sync. If the
             * sync never happens, do it here.
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * The channel waits for the migration thread to give it
             * work. When the migration thread runs out of work, it
             * releases the channel and waits for any pending work to
             * finish. If we reach here (e.g. due to error) before the
             * work runs out, release the channel.
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: We close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}

void multifd_recv_shutdown(void)
{
    if (migrate_multifd()) {
        multifd_recv_terminate_threads(NULL);
    }
}

static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    qemu_sem_destroy(&p->sem);
    g_free(p->name);
    p->name = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->iov);
    p->iov = NULL;
    g_free(p->normal);
    p->normal = NULL;
    multifd_recv_state->ops->recv_cleanup(p);
}

static void multifd_recv_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state->data);
    multifd_recv_state->data = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;
}

void multifd_recv_cleanup(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
    }
    multifd_recv_cleanup_state();
}

void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets and therefore need to
     * wait for more work. Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based migration, loading is done in one iteration.
         * We're done.
         */
        return;
    }

    /*
     * Sync done. Release the channels for the next iteration.
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}

static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t flags = 0;
        bool has_data = false;
        p->normal_num = 0;

        if (use_packets) {
            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                           p->packet_len, &local_err);
            if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
                break;
            }

            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;
            has_data = !!p->normal_num;
            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * Migration thread did not send work, this is
                 * equivalent to pending_sync on the sending
                 * side. Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            ret = multifd_recv_state->ops->recv(p, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->total_normal_pages += p->data->size / qemu_target_page_size();
            p->data->size = 0;
            /*
             * Order data->size update before clearing
             * pending_job. Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved, p->total_normal_pages);

    return NULL;
}

int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    /*
     * Return successfully if multiFD recv state is already initialised
     * or multiFD is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
    multifd_recv_state->data->size = 0;

    qatomic_set(&multifd_recv_state->count, 0);
    qatomic_set(&multifd_recv_state->exiting, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        qemu_sem_init(&p->sem, 0);
        p->pending_job = false;
        p->id = i;

        p->data = g_new0(MultiFDRecvData, 1);
        p->data->size = 0;

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
        }
        p->name = g_strdup_printf("multifdrecv_%d", i);
        p->iov = g_new0(struct iovec, page_count);
        p->normal = g_new0(ram_addr_t, page_count);
        p->page_count = page_count;
        p->page_size = qemu_target_page_size();
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }
    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}

/*
 * Try to receive all multifd channels to get ready for the migration.
 * Sets @errp when failing to receive the current channel.
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}
1635 }