/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "sysemu/sysemu.h"
#include "exec/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "fd.h"
#include "file.h"
#include "migration.h"
#include "migration-stats.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"
#include "threadinfo.h"
#include "options.h"
#include "qemu/yank.h"
#include "io/channel-file.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

/* Multiple fd's */

#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;

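/*
 * Each send thread transmits exactly one MultiFDInit_t as the first
 * bytes on its channel (see multifd_send_initial_packet()); magic and
 * version are big-endian on the wire, and 'id' lets the receiver bind
 * the connection to the matching params slot.
 */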
struct {
    MultiFDSendParams *params;
    /* array of pages to be sent */
    MultiFDPages_t *pages;
    /*
     * Global number of generated multifd packets.
     *
     * Note that we use 'uintptr_t' because it naturally supports atomic
     * operations on both 32-bit and 64-bit hosts. It means packet_num
     * will wrap around sooner on 32-bit systems, but that should be
     * fine.
     *
     * Another option would be QEMU's Stat64, which is 64 bits on all
     * hosts, but so far it does not support atomic fetch_add(). Keep it
     * simple for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.
     */
    QemuSemaphore channels_created;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Whether we have already run the terminate-threads path. There is
     * a race where an error arrives while we are exiting, so this must
     * be accessed with atomic operations. Only valid values are 0
     * and 1.
     */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_send_state;

struct {
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads; the migration thread
     * uses it to wait for recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_recv_state;

static bool multifd_use_packets(void)
{
    return !migrate_mapped_ram();
}

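/*
 * Mapped-ram migration writes pages at fixed offsets in the migration
 * file, so it needs no stream packets; every other multifd transport
 * uses the MultiFDPacket_t framing.
 */
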
void multifd_send_channel_created(void)
{
    qemu_sem_post(&multifd_send_state->channels_created);
}

static void multifd_set_file_bitmap(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = p->pages;

    assert(pages->block);

    for (int i = 0; i < p->pages->num; i++) {
        ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], true);
    }
}

/* Multifd without compression */

/**
 * nocomp_send_setup: setup send side
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
{
    if (migrate_zero_copy_send()) {
        p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
    }

    return 0;
}

/**
 * nocomp_send_cleanup: cleanup send side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
{
    return;
}

static void multifd_send_prepare_iovs(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = p->pages;

    for (int i = 0; i < pages->num; i++) {
        p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
        p->iov[p->iovs_num].iov_len = p->page_size;
        p->iovs_num++;
    }

    p->next_packet_size = pages->num * p->page_size;
}

/**
 * nocomp_send_prepare: prepare data to be able to send
 *
 * For no compression we just have to calculate the size of the
 * packet.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
{
    bool use_zero_copy_send = migrate_zero_copy_send();
    int ret;

    if (!multifd_use_packets()) {
        multifd_send_prepare_iovs(p);
        multifd_set_file_bitmap(p);

        return 0;
    }

    if (!use_zero_copy_send) {
        /*
         * Only !zerocopy needs the header in IOV; zerocopy will
         * send it separately.
         */
        multifd_send_prepare_header(p);
    }

    multifd_send_prepare_iovs(p);
    p->flags |= MULTIFD_FLAG_NOCOMP;

    multifd_send_fill_packet(p);

    if (use_zero_copy_send) {
        /* Send header first, without zerocopy */
        ret = qio_channel_write_all(p->c, (void *)p->packet,
                                    p->packet_len, errp);
        if (ret != 0) {
            return -1;
        }
    }

    return 0;
}

/**
 * nocomp_recv_setup: setup receive side
 *
 * For no compression this function does nothing.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
{
    return 0;
}

/**
 * nocomp_recv_cleanup: cleanup receive side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 */
static void nocomp_recv_cleanup(MultiFDRecvParams *p)
{
}

/**
 * nocomp_recv: read the data from the channel
 *
 * For no compression we just need to read things into the correct place.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
{
    uint32_t flags;

    if (!multifd_use_packets()) {
        return multifd_file_recv_data(p, errp);
    }

    flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;

    if (flags != MULTIFD_FLAG_NOCOMP) {
        error_setg(errp, "multifd %u: flags received %x flags expected %x",
                   p->id, flags, MULTIFD_FLAG_NOCOMP);
        return -1;
    }
    for (int i = 0; i < p->normal_num; i++) {
        p->iov[i].iov_base = p->host + p->normal[i];
        p->iov[i].iov_len = p->page_size;
    }
    return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
}

static MultiFDMethods multifd_nocomp_ops = {
    .send_setup = nocomp_send_setup,
    .send_cleanup = nocomp_send_cleanup,
    .send_prepare = nocomp_send_prepare,
    .recv_setup = nocomp_recv_setup,
    .recv_cleanup = nocomp_recv_cleanup,
    .recv = nocomp_recv
};

static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};

void multifd_register_ops(int method, MultiFDMethods *ops)
{
    assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
    multifd_ops[method] = ops;
}

/* Reset a MultiFDPages_t* object for the next use */
static void multifd_pages_reset(MultiFDPages_t *pages)
{
    /*
     * We don't need to touch offset[] array, because it will be
     * overwritten later when reused.
     */
    pages->num = 0;
    pages->block = NULL;
}

static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    size_t size = sizeof(msg);
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
    if (ret != 0) {
        return -1;
    }
    stat64_add(&mig_stats.multifd_bytes, size);
    return 0;
}

static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u "
                   "expected %u", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    if (msg.id > migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %u is greater than "
                   "number of channels %u", msg.id, migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}

static MultiFDPages_t *multifd_pages_init(uint32_t n)
{
    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);

    pages->allocated = n;
    pages->offset = g_new0(ram_addr_t, n);

    return pages;
}

static void multifd_pages_clear(MultiFDPages_t *pages)
{
    multifd_pages_reset(pages);
    pages->allocated = 0;
    g_free(pages->offset);
    pages->offset = NULL;
    g_free(pages);
}

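/*
 * The data-packet header is big-endian on the wire: magic and version
 * are filled in once at setup time, while multifd_send_fill_packet()
 * and multifd_recv_unfill_packet() convert flags, pages_alloc,
 * normal_pages, next_packet_size, packet_num and each page offset
 * to/from host byte order.
 */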
void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    MultiFDPages_t *pages = p->pages;
    uint64_t packet_num;
    int i;

    packet->flags = cpu_to_be32(p->flags);
    packet->pages_alloc = cpu_to_be32(p->pages->allocated);
    packet->normal_pages = cpu_to_be32(pages->num);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);

    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
    packet->packet_num = cpu_to_be64(packet_num);

    if (pages->block) {
        strncpy(packet->ramblock, pages->block->idstr, 256);
    }

    for (i = 0; i < pages->num; i++) {
        /* there are architectures where ram_addr_t is 32 bit */
        uint64_t temp = pages->offset[i];

        packet->offset[i] = cpu_to_be64(temp);
    }

    p->packets_sent++;
    p->total_normal_pages += pages->num;

    trace_multifd_send(p->id, packet_num, pages->num, p->flags,
                       p->next_packet_size);
}

static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    MultiFDPacket_t *packet = p->packet;
    int i;

    packet->magic = be32_to_cpu(packet->magic);
    if (packet->magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet "
                   "magic %x and expected magic %x",
                   packet->magic, MULTIFD_MAGIC);
        return -1;
    }

    packet->version = be32_to_cpu(packet->version);
    if (packet->version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet "
                   "version %u and expected version %u",
                   packet->version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(packet->flags);

    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
    /*
     * If the packet claims more pages than this channel was set up to
     * handle, just stop migration.
     */
    if (packet->pages_alloc > p->page_count) {
        error_setg(errp, "multifd: received packet "
                   "with size %u and expected a size of %u",
                   packet->pages_alloc, p->page_count);
        return -1;
    }

    p->normal_num = be32_to_cpu(packet->normal_pages);
    if (p->normal_num > packet->pages_alloc) {
        error_setg(errp, "multifd: received packet "
                   "with %u pages and expected maximum pages are %u",
                   p->normal_num, packet->pages_alloc);
        return -1;
    }

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);
    p->packets_recved++;
    p->total_normal_pages += p->normal_num;

    trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags,
                       p->next_packet_size);

    if (p->normal_num == 0) {
        return 0;
    }

    /* make sure that ramblock is 0 terminated */
    packet->ramblock[255] = 0;
    p->block = qemu_ram_block_by_name(packet->ramblock);
    if (!p->block) {
        error_setg(errp, "multifd: unknown ram block %s",
                   packet->ramblock);
        return -1;
    }

    p->host = p->block->host;
    for (i = 0; i < p->normal_num; i++) {
        uint64_t offset = be64_to_cpu(packet->offset[i]);

        if (offset > (p->block->used_length - p->page_size)) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, p->block->used_length);
            return -1;
        }
        p->normal[i] = offset;
    }

    return 0;
}

static bool multifd_send_should_exit(void)
{
    return qatomic_read(&multifd_send_state->exiting);
}

static bool multifd_recv_should_exit(void)
{
    return qatomic_read(&multifd_recv_state->exiting);
}

/*
 * The migration thread can wait on either of the two semaphores. This
 * function can be used to kick the main thread out of waiting on either of
 * them. Should mostly only be called when something wrong happened with
 * the current multifd send thread.
 */
static void multifd_send_kick_main(MultiFDSendParams *p)
{
    qemu_sem_post(&p->sem_sync);
    qemu_sem_post(&multifd_send_state->channels_ready);
}

/*
 * How do we use multifd_send_state->pages and channel->pages?
 *
 * We create a pages structure for each channel, plus a main one. Each
 * time we need to send a batch of pages we swap the one in
 * multifd_send_state with the one owned by the channel that will send
 * it. There are two reasons for that:
 *    - to avoid repeated mallocs during migration
 *    - to make it easy to know what to free at the end of migration
 *
 * This way we always know who owns each "pages" struct, and we don't
 * need any locking: it belongs either to the migration thread or to a
 * channel thread. Swapping is safe because the migration thread only
 * hands pages to a channel whose pending_job flag is false, and the
 * channel thread only clears pending_job once it is done with its
 * copy.
 *
 * Returns true on success, false otherwise.
 */
static bool multifd_send_pages(void)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make gcc happy */
    MultiFDPages_t *pages = multifd_send_state->pages;

    if (multifd_send_should_exit()) {
        return false;
    }

    /* We wait here, until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read to p->pending_job is safe, because only multifd
         * sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest. Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();
    assert(!p->pages->num);
    multifd_send_state->pages = p->pages;
    p->pages = pages;
    /*
     * Making sure p->pages is setup before marking pending_job=true. Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

static inline bool multifd_queue_empty(MultiFDPages_t *pages)
{
    return pages->num == 0;
}

static inline bool multifd_queue_full(MultiFDPages_t *pages)
{
    return pages->num == pages->allocated;
}

static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
{
    pages->offset[pages->num++] = offset;
}

/* Returns true if enqueue successful, false otherwise */
bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
{
    MultiFDPages_t *pages;

retry:
    pages = multifd_send_state->pages;

    /* If the queue is empty, we can already enqueue now */
    if (multifd_queue_empty(pages)) {
        pages->block = block;
        multifd_enqueue(pages, offset);
        return true;
    }

    /*
     * The queue is not empty, but we may need a flush first. That can
     * be because either:
     *
     * (1) The page is not on the same ramblock as previous ones, or,
     * (2) The queue is full.
     *
     * After flush, always retry.
     */
    if (pages->block != block || multifd_queue_full(pages)) {
        if (!multifd_send_pages()) {
            return false;
        }
        goto retry;
    }

    /* Not empty, and we still have space, do it! */
    multifd_enqueue(pages, offset);
    return true;
}

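/*
 * Note on usage: the RAM migration code queues dirty pages one at a
 * time with multifd_queue_page() and relies on multifd_send_sync_main()
 * to flush a partially filled queue and synchronize all channels at the
 * end of an iteration.
 */
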
/* Multifd send side hit an error; remember it and prepare to quit */
static void multifd_send_set_error(Error *err)
{
    /*
     * We don't want to exit each thread twice. Depending on where
     * we get the error, or if there are two independent errors in two
     * threads at the same time, we can end up calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }
}

static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Tell everyone we're quitting. No xchg() needed here; we simply
     * always set it.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * Firstly, kick all threads out; no matter whether they are just idle,
     * or blocked in an IO system call.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }

    /*
     * Finally recycle all the threads.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}

static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->c) {
        migration_ioc_unregister_yank(p->c);
        /*
         * The object_unref() cannot guarantee the fd will always be
         * released because finalize() of the iochannel is only
         * triggered on the last reference and it's not guaranteed
         * that we always hold the last refcount when reaching here.
         *
         * Closing the fd explicitly has the benefit that if there are
         * any I/O handler callbacks registered on the fd, they will get
         * a POLLNVAL event and will further trigger the cleanup to
         * finally release the IOC.
         *
         * FIXME: It should logically be guaranteed that all multifd
         * channels have no I/O handler callback registered when reaching
         * here, because the migration thread will wait for all multifd
         * channel establishments to complete during setup. Since
         * migrate_fd_cleanup() will be scheduled in the main thread too,
         * all previous callbacks should be guaranteed to have completed
         * by the time we reach here. See multifd_send_state.channels_created
         * and its usage. In the future, we could replace this with an
         * assert making sure we hold the last reference, or simply drop
         * the close once the above is clearly justified.
         */
        qio_channel_close(p->c, &error_abort);
        object_unref(OBJECT(p->c));
        p->c = NULL;
    }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    multifd_pages_clear(p->pages);
    p->pages = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->iov);
    p->iov = NULL;
    multifd_send_state->ops->send_cleanup(p, errp);

    return *errp == NULL;
}

static void multifd_send_cleanup_state(void)
{
    file_cleanup_outgoing_migration();
    fd_cleanup_outgoing_migration();
    socket_cleanup_outgoing_migration();
    qemu_sem_destroy(&multifd_send_state->channels_created);
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    multifd_pages_clear(multifd_send_state->pages);
    multifd_send_state->pages = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}

void multifd_send_shutdown(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    multifd_send_terminate_threads();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (!multifd_send_cleanup_channel(p, &local_err)) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }

    multifd_send_cleanup_state();
}

static int multifd_zero_copy_flush(QIOChannel *c)
{
    int ret;
    Error *err = NULL;

    ret = qio_channel_flush(c, &err);
    if (ret < 0) {
        error_report_err(err);
        return -1;
    }
    if (ret == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    }

    return ret;
}

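/*
 * Sync flow: the migration thread sets pending_sync on every channel
 * and posts p->sem; each send thread then emits a MULTIFD_FLAG_SYNC
 * packet (when packets are in use) and posts p->sem_sync, which this
 * function waits on before declaring the iteration synced.
 */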
int multifd_send_sync_main(void)
{
    int i;
    bool flush_zero_copy;

    if (!migrate_multifd()) {
        return 0;
    }
    if (multifd_send_state->pages->num) {
        if (!multifd_send_pages()) {
            error_report("%s: multifd_send_pages fail", __func__);
            return -1;
        }
    }

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so it's not possible for
         * it to be set by others concurrently.
         */
        assert(qatomic_read(&p->pending_sync) == false);
        qatomic_set(&p->pending_sync, true);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}

static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read pending_job flag before p->pages. Pairs with the
         * qatomic_store_release() in multifd_send_pages().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            MultiFDPages_t *pages = p->pages;

            p->iovs_num = 0;
            assert(pages->num);

            ret = multifd_send_state->ops->send_prepare(p, &local_err);
            if (ret != 0) {
                break;
            }

            if (migrate_mapped_ram()) {
                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              p->pages->block, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0, p->write_flags,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes,
                       p->next_packet_size + p->packet_len);

            multifd_pages_reset(p->pages);
            p->next_packet_size = 0;

            /*
             * Making sure p->pages is published before saying "we're
             * free". Pairs with the smp_mb_acquire() in
             * multifd_send_pages().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            /*
             * If not a normal job, must be a sync request. Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
             */
            assert(qatomic_read(&p->pending_sync));

            if (use_packets) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
                p->flags = 0;
            }

            qatomic_set(&p->pending_sync, false);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages);

    return NULL;
}

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

typedef struct {
    MultiFDSendParams *p;
    QIOChannelTLS *tioc;
} MultiFDTLSThreadArgs;

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDTLSThreadArgs *args = opaque;

    qio_channel_tls_handshake(args->tioc,
                              multifd_new_send_channel_async,
                              args->p,
                              NULL,
                              NULL);
    g_free(args);

    return NULL;
}

static bool multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = s->hostname;
    MultiFDTLSThreadArgs *args;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(ioc, hostname, errp);
    if (!tioc) {
        return false;
    }

    /*
     * Ownership of the socket channel now transfers to the newly
     * created TLS channel, which has already taken a reference.
     */
    object_unref(OBJECT(ioc));
    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");

    args = g_new0(MultiFDTLSThreadArgs, 1);
    args->tioc = tioc;
    args->p = p;

    p->tls_thread_created = true;
    qemu_thread_create(&p->tls_thread, "multifd-tls-handshake-worker",
                       multifd_tls_handshake_thread, args,
                       QEMU_THREAD_JOINABLE);
    return true;
}

void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    /* Setup p->c only if the channel is completely setup */
    p->c = ioc;

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}

/*
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel. Without TLS it goes straight into the channel
 * creation.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Here we're not interested in whether creation succeeded, only
     * that it happened at all.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * For error cases (TLS or non-TLS), the IO channel is always freed
     * here rather than during multifd cleanup: since p->c is not set,
     * the multifd cleanup code doesn't even know of its existence.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}

static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
{
    if (!multifd_use_packets()) {
        return file_send_channel_create(opaque, errp);
    }

    socket_send_channel_create(multifd_new_send_channel_async, opaque);
    return true;
}

bool multifd_send_setup(void)
{
    MigrationState *s = migrate_get_current();
    Error *local_err = NULL;
    int thread_count, ret = 0;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    if (!migrate_multifd()) {
        return true;
    }

    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    multifd_send_state->pages = multifd_pages_init(page_count);
    qemu_sem_init(&multifd_send_state->channels_created, 0);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->id = i;
        p->pages = multifd_pages_init(page_count);

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
            p->packet->version = cpu_to_be32(MULTIFD_VERSION);

            /* We need one extra place for the packet header */
            p->iov = g_new0(struct iovec, page_count + 1);
        } else {
            p->iov = g_new0(struct iovec, page_count);
        }
        p->name = g_strdup_printf("multifdsend_%d", i);
        p->page_size = qemu_target_page_size();
        p->page_count = page_count;
        p->write_flags = 0;

        if (!multifd_new_send_channel_create(p, &local_err)) {
            return false;
        }
    }

    /*
     * Wait until channel creation has started for all channels. The
     * creation can still fail, but no more channels will be created
     * past this point.
     */
    for (i = 0; i < thread_count; i++) {
        qemu_sem_wait(&multifd_send_state->channels_created);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            break;
        }
    }

    if (ret) {
        migrate_set_error(s, local_err);
        error_report_err(local_err);
        migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                          MIGRATION_STATUS_FAILED);
        return false;
    }

    return true;
}

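/*
 * multifd_recv() is the receive-side counterpart of
 * multifd_send_pages(): the migration thread hands the current
 * MultiFDRecvData to an idle channel by swapping it with the channel's
 * own copy, using the same lockless pending_job handshake. This path
 * is only exercised when packets are not in use (mapped-ram migration).
 */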
bool multifd_recv(void)
{
    int i;
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Order pending_job read before manipulating p->data below. Pairs
     * with qatomic_store_release() at multifd_recv_thread().
     */
    smp_mb_acquire();

    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Order p->data update before setting pending_job. Pairs with
     * qatomic_load_acquire() at multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

MultiFDRecvData *multifd_get_recv_data(void)
{
    return multifd_recv_state->data;
}

static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * The migration thread and channels interact differently
         * depending on the presence of packets.
         */
        if (multifd_use_packets()) {
            /*
             * The channel receives as long as there are packets. When
             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
             * channel waits for the migration thread to sync. If the
             * sync never happens, do it here.
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * The channel waits for the migration thread to give it
             * work. When the migration thread runs out of work, it
             * releases the channel and waits for any pending work to
             * finish. If we reach here (e.g. due to error) before the
             * work runs out, release the channel.
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: We close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}

void multifd_recv_shutdown(void)
{
    if (migrate_multifd()) {
        multifd_recv_terminate_threads(NULL);
    }
}

static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    qemu_sem_destroy(&p->sem);
    g_free(p->name);
    p->name = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->iov);
    p->iov = NULL;
    g_free(p->normal);
    p->normal = NULL;
    multifd_recv_state->ops->recv_cleanup(p);
}

static void multifd_recv_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state->data);
    multifd_recv_state->data = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;
}

void multifd_recv_cleanup(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
    }
    multifd_recv_cleanup_state();
}

void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets and therefore need to
     * wait for more work. Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based migration, loading is done in a single
         * iteration. We're done.
         */
        return;
    }

    /*
     * Sync done. Release the channels for the next iteration.
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}

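/*
 * The recv thread operates in one of two modes. With packets (socket
 * migration) it reads packets off its channel until MULTIFD_FLAG_SYNC
 * or EOF. Without packets (mapped-ram migration) it sleeps on p->sem
 * until multifd_recv() hands it work, and treats a wakeup with no
 * pending job as a sync request.
 */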
static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t flags = 0;
        bool has_data = false;
        p->normal_num = 0;

        if (use_packets) {
            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                           p->packet_len, &local_err);
            if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
                break;
            }

            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;
            has_data = !!p->normal_num;
            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * Migration thread did not send work, this is
                 * equivalent to pending_sync on the sending
                 * side. Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            ret = multifd_recv_state->ops->recv(p, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->total_normal_pages += p->data->size / qemu_target_page_size();
            p->data->size = 0;
            /*
             * Order data->size update before clearing
             * pending_job. Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved, p->total_normal_pages);

    return NULL;
}

int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    /*
     * Return successfully if multiFD recv state is already initialised
     * or multiFD is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
    multifd_recv_state->data->size = 0;

    qatomic_set(&multifd_recv_state->count, 0);
    qatomic_set(&multifd_recv_state->exiting, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        qemu_sem_init(&p->sem, 0);
        p->pending_job = false;
        p->id = i;

        p->data = g_new0(MultiFDRecvData, 1);
        p->data->size = 0;

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
        }
        p->name = g_strdup_printf("multifdrecv_%d", i);
        p->iov = g_new0(struct iovec, page_count);
        p->normal = g_new0(ram_addr_t, page_count);
        p->page_count = page_count;
        p->page_size = qemu_target_page_size();
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }
    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}

/*
 * Called once per incoming connection to set up a new multifd channel,
 * so that all channels are ready for the migration.
 * Sets @errp when failing to receive the current channel.
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}