]>
Commit | Line | Data |
---|---|---|
d32ca5ad JQ |
1 | /* |
2 | * Multifd common code | |
3 | * | |
4 | * Copyright (c) 2019-2020 Red Hat Inc | |
5 | * | |
6 | * Authors: | |
7 | * Juan Quintela <quintela@redhat.com> | |
8 | * | |
9 | * This work is licensed under the terms of the GNU GPL, version 2 or later. | |
10 | * See the COPYING file in the top-level directory. | |
11 | */ | |
12 | ||
13 | #include "qemu/osdep.h" | |
14 | #include "qemu/rcu.h" | |
15 | #include "exec/target_page.h" | |
16 | #include "sysemu/sysemu.h" | |
17 | #include "exec/ramblock.h" | |
18 | #include "qemu/error-report.h" | |
19 | #include "qapi/error.h" | |
20 | #include "ram.h" | |
21 | #include "migration.h" | |
22 | #include "socket.h" | |
29647140 | 23 | #include "tls.h" |
d32ca5ad JQ |
24 | #include "qemu-file.h" |
25 | #include "trace.h" | |
26 | #include "multifd.h" | |
27 | ||
b5eea99e LS |
28 | #include "qemu/yank.h" |
29 | #include "io/channel-socket.h" | |
1a92d6d5 | 30 | #include "yank_functions.h" |
b5eea99e | 31 | |
d32ca5ad JQ |
32 | /* Multiple fd's */ |
33 | ||
34 | #define MULTIFD_MAGIC 0x11223344U | |
35 | #define MULTIFD_VERSION 1 | |
36 | ||
37 | typedef struct { | |
38 | uint32_t magic; | |
39 | uint32_t version; | |
40 | unsigned char uuid[16]; /* QemuUUID */ | |
41 | uint8_t id; | |
42 | uint8_t unused1[7]; /* Reserved for future use */ | |
43 | uint64_t unused2[4]; /* Reserved for future use */ | |
44 | } __attribute__((packed)) MultiFDInit_t; | |
45 | ||
ab7cbb0b JQ |
46 | /* Multifd without compression */ |
47 | ||
48 | /** | |
49 | * nocomp_send_setup: setup send side | |
50 | * | |
51 | * For no compression this function does nothing. | |
52 | * | |
53 | * Returns 0 for success or -1 for error | |
54 | * | |
55 | * @p: Params for the channel that we are using | |
56 | * @errp: pointer to an error | |
57 | */ | |
58 | static int nocomp_send_setup(MultiFDSendParams *p, Error **errp) | |
59 | { | |
60 | return 0; | |
61 | } | |
62 | ||
63 | /** | |
64 | * nocomp_send_cleanup: cleanup send side | |
65 | * | |
66 | * For no compression this function does nothing. | |
67 | * | |
68 | * @p: Params for the channel that we are using | |
69 | */ | |
70 | static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp) | |
71 | { | |
72 | return; | |
73 | } | |
74 | ||
75 | /** | |
76 | * nocomp_send_prepare: prepare date to be able to send | |
77 | * | |
78 | * For no compression we just have to calculate the size of the | |
79 | * packet. | |
80 | * | |
81 | * Returns 0 for success or -1 for error | |
82 | * | |
83 | * @p: Params for the channel that we are using | |
84 | * @used: number of pages used | |
85 | * @errp: pointer to an error | |
86 | */ | |
87 | static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used, | |
88 | Error **errp) | |
89 | { | |
90 | p->next_packet_size = used * qemu_target_page_size(); | |
91 | p->flags |= MULTIFD_FLAG_NOCOMP; | |
92 | return 0; | |
93 | } | |
94 | ||
95 | /** | |
96 | * nocomp_send_write: do the actual write of the data | |
97 | * | |
98 | * For no compression we just have to write the data. | |
99 | * | |
100 | * Returns 0 for success or -1 for error | |
101 | * | |
102 | * @p: Params for the channel that we are using | |
103 | * @used: number of pages used | |
104 | * @errp: pointer to an error | |
105 | */ | |
106 | static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp) | |
107 | { | |
108 | return qio_channel_writev_all(p->c, p->pages->iov, used, errp); | |
109 | } | |
110 | ||
111 | /** | |
112 | * nocomp_recv_setup: setup receive side | |
113 | * | |
114 | * For no compression this function does nothing. | |
115 | * | |
116 | * Returns 0 for success or -1 for error | |
117 | * | |
118 | * @p: Params for the channel that we are using | |
119 | * @errp: pointer to an error | |
120 | */ | |
121 | static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp) | |
122 | { | |
123 | return 0; | |
124 | } | |
125 | ||
126 | /** | |
127 | * nocomp_recv_cleanup: setup receive side | |
128 | * | |
129 | * For no compression this function does nothing. | |
130 | * | |
131 | * @p: Params for the channel that we are using | |
132 | */ | |
133 | static void nocomp_recv_cleanup(MultiFDRecvParams *p) | |
134 | { | |
135 | } | |
136 | ||
137 | /** | |
138 | * nocomp_recv_pages: read the data from the channel into actual pages | |
139 | * | |
140 | * For no compression we just need to read things into the correct place. | |
141 | * | |
142 | * Returns 0 for success or -1 for error | |
143 | * | |
144 | * @p: Params for the channel that we are using | |
145 | * @used: number of pages used | |
146 | * @errp: pointer to an error | |
147 | */ | |
148 | static int nocomp_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp) | |
149 | { | |
150 | uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; | |
151 | ||
152 | if (flags != MULTIFD_FLAG_NOCOMP) { | |
153 | error_setg(errp, "multifd %d: flags received %x flags expected %x", | |
154 | p->id, flags, MULTIFD_FLAG_NOCOMP); | |
155 | return -1; | |
156 | } | |
157 | return qio_channel_readv_all(p->c, p->pages->iov, used, errp); | |
158 | } | |
159 | ||
160 | static MultiFDMethods multifd_nocomp_ops = { | |
161 | .send_setup = nocomp_send_setup, | |
162 | .send_cleanup = nocomp_send_cleanup, | |
163 | .send_prepare = nocomp_send_prepare, | |
164 | .send_write = nocomp_send_write, | |
165 | .recv_setup = nocomp_recv_setup, | |
166 | .recv_cleanup = nocomp_recv_cleanup, | |
167 | .recv_pages = nocomp_recv_pages | |
168 | }; | |
169 | ||
170 | static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = { | |
171 | [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops, | |
172 | }; | |
173 | ||
7ec2c2b3 JQ |
174 | void multifd_register_ops(int method, MultiFDMethods *ops) |
175 | { | |
176 | assert(0 < method && method < MULTIFD_COMPRESSION__MAX); | |
177 | multifd_ops[method] = ops; | |
178 | } | |
179 | ||
d32ca5ad JQ |
180 | static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) |
181 | { | |
182 | MultiFDInit_t msg = {}; | |
183 | int ret; | |
184 | ||
185 | msg.magic = cpu_to_be32(MULTIFD_MAGIC); | |
186 | msg.version = cpu_to_be32(MULTIFD_VERSION); | |
187 | msg.id = p->id; | |
188 | memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); | |
189 | ||
190 | ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp); | |
191 | if (ret != 0) { | |
192 | return -1; | |
193 | } | |
194 | return 0; | |
195 | } | |
196 | ||
197 | static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) | |
198 | { | |
199 | MultiFDInit_t msg; | |
200 | int ret; | |
201 | ||
202 | ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); | |
203 | if (ret != 0) { | |
204 | return -1; | |
205 | } | |
206 | ||
207 | msg.magic = be32_to_cpu(msg.magic); | |
208 | msg.version = be32_to_cpu(msg.version); | |
209 | ||
210 | if (msg.magic != MULTIFD_MAGIC) { | |
211 | error_setg(errp, "multifd: received packet magic %x " | |
212 | "expected %x", msg.magic, MULTIFD_MAGIC); | |
213 | return -1; | |
214 | } | |
215 | ||
216 | if (msg.version != MULTIFD_VERSION) { | |
217 | error_setg(errp, "multifd: received packet version %d " | |
218 | "expected %d", msg.version, MULTIFD_VERSION); | |
219 | return -1; | |
220 | } | |
221 | ||
222 | if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { | |
223 | char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); | |
224 | char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); | |
225 | ||
226 | error_setg(errp, "multifd: received uuid '%s' and expected " | |
227 | "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); | |
228 | g_free(uuid); | |
229 | g_free(msg_uuid); | |
230 | return -1; | |
231 | } | |
232 | ||
233 | if (msg.id > migrate_multifd_channels()) { | |
234 | error_setg(errp, "multifd: received channel version %d " | |
235 | "expected %d", msg.version, MULTIFD_VERSION); | |
236 | return -1; | |
237 | } | |
238 | ||
239 | return msg.id; | |
240 | } | |
241 | ||
242 | static MultiFDPages_t *multifd_pages_init(size_t size) | |
243 | { | |
244 | MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1); | |
245 | ||
246 | pages->allocated = size; | |
247 | pages->iov = g_new0(struct iovec, size); | |
248 | pages->offset = g_new0(ram_addr_t, size); | |
249 | ||
250 | return pages; | |
251 | } | |
252 | ||
253 | static void multifd_pages_clear(MultiFDPages_t *pages) | |
254 | { | |
255 | pages->used = 0; | |
256 | pages->allocated = 0; | |
257 | pages->packet_num = 0; | |
258 | pages->block = NULL; | |
259 | g_free(pages->iov); | |
260 | pages->iov = NULL; | |
261 | g_free(pages->offset); | |
262 | pages->offset = NULL; | |
263 | g_free(pages); | |
264 | } | |
265 | ||
266 | static void multifd_send_fill_packet(MultiFDSendParams *p) | |
267 | { | |
268 | MultiFDPacket_t *packet = p->packet; | |
269 | int i; | |
270 | ||
271 | packet->flags = cpu_to_be32(p->flags); | |
272 | packet->pages_alloc = cpu_to_be32(p->pages->allocated); | |
273 | packet->pages_used = cpu_to_be32(p->pages->used); | |
274 | packet->next_packet_size = cpu_to_be32(p->next_packet_size); | |
275 | packet->packet_num = cpu_to_be64(p->packet_num); | |
276 | ||
277 | if (p->pages->block) { | |
278 | strncpy(packet->ramblock, p->pages->block->idstr, 256); | |
279 | } | |
280 | ||
281 | for (i = 0; i < p->pages->used; i++) { | |
282 | /* there are architectures where ram_addr_t is 32 bit */ | |
283 | uint64_t temp = p->pages->offset[i]; | |
284 | ||
285 | packet->offset[i] = cpu_to_be64(temp); | |
286 | } | |
287 | } | |
288 | ||
289 | static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) | |
290 | { | |
291 | MultiFDPacket_t *packet = p->packet; | |
292 | uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size(); | |
293 | RAMBlock *block; | |
294 | int i; | |
295 | ||
296 | packet->magic = be32_to_cpu(packet->magic); | |
297 | if (packet->magic != MULTIFD_MAGIC) { | |
298 | error_setg(errp, "multifd: received packet " | |
299 | "magic %x and expected magic %x", | |
300 | packet->magic, MULTIFD_MAGIC); | |
301 | return -1; | |
302 | } | |
303 | ||
304 | packet->version = be32_to_cpu(packet->version); | |
305 | if (packet->version != MULTIFD_VERSION) { | |
306 | error_setg(errp, "multifd: received packet " | |
307 | "version %d and expected version %d", | |
308 | packet->version, MULTIFD_VERSION); | |
309 | return -1; | |
310 | } | |
311 | ||
312 | p->flags = be32_to_cpu(packet->flags); | |
313 | ||
314 | packet->pages_alloc = be32_to_cpu(packet->pages_alloc); | |
315 | /* | |
316 | * If we received a packet that is 100 times bigger than expected | |
317 | * just stop migration. It is a magic number. | |
318 | */ | |
319 | if (packet->pages_alloc > pages_max * 100) { | |
320 | error_setg(errp, "multifd: received packet " | |
321 | "with size %d and expected a maximum size of %d", | |
322 | packet->pages_alloc, pages_max * 100) ; | |
323 | return -1; | |
324 | } | |
325 | /* | |
326 | * We received a packet that is bigger than expected but inside | |
327 | * reasonable limits (see previous comment). Just reallocate. | |
328 | */ | |
329 | if (packet->pages_alloc > p->pages->allocated) { | |
330 | multifd_pages_clear(p->pages); | |
331 | p->pages = multifd_pages_init(packet->pages_alloc); | |
332 | } | |
333 | ||
334 | p->pages->used = be32_to_cpu(packet->pages_used); | |
335 | if (p->pages->used > packet->pages_alloc) { | |
336 | error_setg(errp, "multifd: received packet " | |
337 | "with %d pages and expected maximum pages are %d", | |
338 | p->pages->used, packet->pages_alloc) ; | |
339 | return -1; | |
340 | } | |
341 | ||
342 | p->next_packet_size = be32_to_cpu(packet->next_packet_size); | |
343 | p->packet_num = be64_to_cpu(packet->packet_num); | |
344 | ||
345 | if (p->pages->used == 0) { | |
346 | return 0; | |
347 | } | |
348 | ||
349 | /* make sure that ramblock is 0 terminated */ | |
350 | packet->ramblock[255] = 0; | |
351 | block = qemu_ram_block_by_name(packet->ramblock); | |
352 | if (!block) { | |
353 | error_setg(errp, "multifd: unknown ram block %s", | |
354 | packet->ramblock); | |
355 | return -1; | |
356 | } | |
357 | ||
358 | for (i = 0; i < p->pages->used; i++) { | |
359 | uint64_t offset = be64_to_cpu(packet->offset[i]); | |
360 | ||
361 | if (offset > (block->used_length - qemu_target_page_size())) { | |
362 | error_setg(errp, "multifd: offset too long %" PRIu64 | |
363 | " (max " RAM_ADDR_FMT ")", | |
364 | offset, block->max_length); | |
365 | return -1; | |
366 | } | |
367 | p->pages->iov[i].iov_base = block->host + offset; | |
368 | p->pages->iov[i].iov_len = qemu_target_page_size(); | |
369 | } | |
370 | ||
371 | return 0; | |
372 | } | |
373 | ||
374 | struct { | |
375 | MultiFDSendParams *params; | |
376 | /* array of pages to sent */ | |
377 | MultiFDPages_t *pages; | |
378 | /* global number of generated multifd packets */ | |
379 | uint64_t packet_num; | |
380 | /* send channels ready */ | |
381 | QemuSemaphore channels_ready; | |
382 | /* | |
383 | * Have we already run terminate threads. There is a race when it | |
384 | * happens that we got one error while we are exiting. | |
385 | * We will use atomic operations. Only valid values are 0 and 1. | |
386 | */ | |
387 | int exiting; | |
ab7cbb0b JQ |
388 | /* multifd ops */ |
389 | MultiFDMethods *ops; | |
d32ca5ad JQ |
390 | } *multifd_send_state; |
391 | ||
392 | /* | |
393 | * How we use multifd_send_state->pages and channel->pages? | |
394 | * | |
395 | * We create a pages for each channel, and a main one. Each time that | |
396 | * we need to send a batch of pages we interchange the ones between | |
397 | * multifd_send_state and the channel that is sending it. There are | |
398 | * two reasons for that: | |
399 | * - to not have to do so many mallocs during migration | |
400 | * - to make easier to know what to free at the end of migration | |
401 | * | |
402 | * This way we always know who is the owner of each "pages" struct, | |
403 | * and we don't need any locking. It belongs to the migration thread | |
404 | * or to the channel thread. Switching is safe because the migration | |
405 | * thread is using the channel mutex when changing it, and the channel | |
406 | * have to had finish with its own, otherwise pending_job can't be | |
407 | * false. | |
408 | */ | |
409 | ||
410 | static int multifd_send_pages(QEMUFile *f) | |
411 | { | |
412 | int i; | |
413 | static int next_channel; | |
414 | MultiFDSendParams *p = NULL; /* make happy gcc */ | |
415 | MultiFDPages_t *pages = multifd_send_state->pages; | |
416 | uint64_t transferred; | |
417 | ||
d73415a3 | 418 | if (qatomic_read(&multifd_send_state->exiting)) { |
d32ca5ad JQ |
419 | return -1; |
420 | } | |
421 | ||
422 | qemu_sem_wait(&multifd_send_state->channels_ready); | |
7e89a140 LV |
423 | /* |
424 | * next_channel can remain from a previous migration that was | |
425 | * using more channels, so ensure it doesn't overflow if the | |
426 | * limit is lower now. | |
427 | */ | |
428 | next_channel %= migrate_multifd_channels(); | |
d32ca5ad JQ |
429 | for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { |
430 | p = &multifd_send_state->params[i]; | |
431 | ||
432 | qemu_mutex_lock(&p->mutex); | |
433 | if (p->quit) { | |
434 | error_report("%s: channel %d has already quit!", __func__, i); | |
435 | qemu_mutex_unlock(&p->mutex); | |
436 | return -1; | |
437 | } | |
438 | if (!p->pending_job) { | |
439 | p->pending_job++; | |
440 | next_channel = (i + 1) % migrate_multifd_channels(); | |
441 | break; | |
442 | } | |
443 | qemu_mutex_unlock(&p->mutex); | |
444 | } | |
445 | assert(!p->pages->used); | |
446 | assert(!p->pages->block); | |
447 | ||
448 | p->packet_num = multifd_send_state->packet_num++; | |
449 | multifd_send_state->pages = p->pages; | |
450 | p->pages = pages; | |
451 | transferred = ((uint64_t) pages->used) * qemu_target_page_size() | |
452 | + p->packet_len; | |
453 | qemu_file_update_transfer(f, transferred); | |
454 | ram_counters.multifd_bytes += transferred; | |
df555094 | 455 | ram_counters.transferred += transferred; |
d32ca5ad JQ |
456 | qemu_mutex_unlock(&p->mutex); |
457 | qemu_sem_post(&p->sem); | |
458 | ||
459 | return 1; | |
460 | } | |
461 | ||
462 | int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset) | |
463 | { | |
464 | MultiFDPages_t *pages = multifd_send_state->pages; | |
465 | ||
466 | if (!pages->block) { | |
467 | pages->block = block; | |
468 | } | |
469 | ||
470 | if (pages->block == block) { | |
471 | pages->offset[pages->used] = offset; | |
472 | pages->iov[pages->used].iov_base = block->host + offset; | |
473 | pages->iov[pages->used].iov_len = qemu_target_page_size(); | |
474 | pages->used++; | |
475 | ||
476 | if (pages->used < pages->allocated) { | |
477 | return 1; | |
478 | } | |
479 | } | |
480 | ||
481 | if (multifd_send_pages(f) < 0) { | |
482 | return -1; | |
483 | } | |
484 | ||
485 | if (pages->block != block) { | |
486 | return multifd_queue_page(f, block, offset); | |
487 | } | |
488 | ||
489 | return 1; | |
490 | } | |
491 | ||
492 | static void multifd_send_terminate_threads(Error *err) | |
493 | { | |
494 | int i; | |
495 | ||
496 | trace_multifd_send_terminate_threads(err != NULL); | |
497 | ||
498 | if (err) { | |
499 | MigrationState *s = migrate_get_current(); | |
500 | migrate_set_error(s, err); | |
501 | if (s->state == MIGRATION_STATUS_SETUP || | |
502 | s->state == MIGRATION_STATUS_PRE_SWITCHOVER || | |
503 | s->state == MIGRATION_STATUS_DEVICE || | |
504 | s->state == MIGRATION_STATUS_ACTIVE) { | |
505 | migrate_set_state(&s->state, s->state, | |
506 | MIGRATION_STATUS_FAILED); | |
507 | } | |
508 | } | |
509 | ||
510 | /* | |
511 | * We don't want to exit each threads twice. Depending on where | |
512 | * we get the error, or if there are two independent errors in two | |
513 | * threads at the same time, we can end calling this function | |
514 | * twice. | |
515 | */ | |
d73415a3 | 516 | if (qatomic_xchg(&multifd_send_state->exiting, 1)) { |
d32ca5ad JQ |
517 | return; |
518 | } | |
519 | ||
520 | for (i = 0; i < migrate_multifd_channels(); i++) { | |
521 | MultiFDSendParams *p = &multifd_send_state->params[i]; | |
522 | ||
523 | qemu_mutex_lock(&p->mutex); | |
524 | p->quit = true; | |
525 | qemu_sem_post(&p->sem); | |
526 | qemu_mutex_unlock(&p->mutex); | |
527 | } | |
528 | } | |
529 | ||
530 | void multifd_save_cleanup(void) | |
531 | { | |
532 | int i; | |
533 | ||
534 | if (!migrate_use_multifd()) { | |
535 | return; | |
536 | } | |
537 | multifd_send_terminate_threads(NULL); | |
538 | for (i = 0; i < migrate_multifd_channels(); i++) { | |
539 | MultiFDSendParams *p = &multifd_send_state->params[i]; | |
540 | ||
541 | if (p->running) { | |
542 | qemu_thread_join(&p->thread); | |
543 | } | |
544 | } | |
545 | for (i = 0; i < migrate_multifd_channels(); i++) { | |
546 | MultiFDSendParams *p = &multifd_send_state->params[i]; | |
ab7cbb0b | 547 | Error *local_err = NULL; |
d32ca5ad JQ |
548 | |
549 | socket_send_channel_destroy(p->c); | |
550 | p->c = NULL; | |
551 | qemu_mutex_destroy(&p->mutex); | |
552 | qemu_sem_destroy(&p->sem); | |
553 | qemu_sem_destroy(&p->sem_sync); | |
554 | g_free(p->name); | |
555 | p->name = NULL; | |
8e5fa059 CZ |
556 | g_free(p->tls_hostname); |
557 | p->tls_hostname = NULL; | |
d32ca5ad JQ |
558 | multifd_pages_clear(p->pages); |
559 | p->pages = NULL; | |
560 | p->packet_len = 0; | |
561 | g_free(p->packet); | |
562 | p->packet = NULL; | |
ab7cbb0b JQ |
563 | multifd_send_state->ops->send_cleanup(p, &local_err); |
564 | if (local_err) { | |
565 | migrate_set_error(migrate_get_current(), local_err); | |
13f2cb21 | 566 | error_free(local_err); |
ab7cbb0b | 567 | } |
d32ca5ad JQ |
568 | } |
569 | qemu_sem_destroy(&multifd_send_state->channels_ready); | |
570 | g_free(multifd_send_state->params); | |
571 | multifd_send_state->params = NULL; | |
572 | multifd_pages_clear(multifd_send_state->pages); | |
573 | multifd_send_state->pages = NULL; | |
574 | g_free(multifd_send_state); | |
575 | multifd_send_state = NULL; | |
576 | } | |
577 | ||
578 | void multifd_send_sync_main(QEMUFile *f) | |
579 | { | |
580 | int i; | |
581 | ||
582 | if (!migrate_use_multifd()) { | |
583 | return; | |
584 | } | |
585 | if (multifd_send_state->pages->used) { | |
586 | if (multifd_send_pages(f) < 0) { | |
587 | error_report("%s: multifd_send_pages fail", __func__); | |
588 | return; | |
589 | } | |
590 | } | |
591 | for (i = 0; i < migrate_multifd_channels(); i++) { | |
592 | MultiFDSendParams *p = &multifd_send_state->params[i]; | |
593 | ||
594 | trace_multifd_send_sync_main_signal(p->id); | |
595 | ||
596 | qemu_mutex_lock(&p->mutex); | |
597 | ||
598 | if (p->quit) { | |
599 | error_report("%s: channel %d has already quit", __func__, i); | |
600 | qemu_mutex_unlock(&p->mutex); | |
601 | return; | |
602 | } | |
603 | ||
604 | p->packet_num = multifd_send_state->packet_num++; | |
605 | p->flags |= MULTIFD_FLAG_SYNC; | |
606 | p->pending_job++; | |
607 | qemu_file_update_transfer(f, p->packet_len); | |
608 | ram_counters.multifd_bytes += p->packet_len; | |
609 | ram_counters.transferred += p->packet_len; | |
610 | qemu_mutex_unlock(&p->mutex); | |
611 | qemu_sem_post(&p->sem); | |
612 | } | |
613 | for (i = 0; i < migrate_multifd_channels(); i++) { | |
614 | MultiFDSendParams *p = &multifd_send_state->params[i]; | |
615 | ||
616 | trace_multifd_send_sync_main_wait(p->id); | |
617 | qemu_sem_wait(&p->sem_sync); | |
618 | } | |
619 | trace_multifd_send_sync_main(multifd_send_state->packet_num); | |
620 | } | |
621 | ||
622 | static void *multifd_send_thread(void *opaque) | |
623 | { | |
624 | MultiFDSendParams *p = opaque; | |
625 | Error *local_err = NULL; | |
626 | int ret = 0; | |
627 | uint32_t flags = 0; | |
628 | ||
629 | trace_multifd_send_thread_start(p->id); | |
630 | rcu_register_thread(); | |
631 | ||
632 | if (multifd_send_initial_packet(p, &local_err) < 0) { | |
633 | ret = -1; | |
634 | goto out; | |
635 | } | |
636 | /* initial packet */ | |
637 | p->num_packets = 1; | |
638 | ||
639 | while (true) { | |
640 | qemu_sem_wait(&p->sem); | |
641 | ||
d73415a3 | 642 | if (qatomic_read(&multifd_send_state->exiting)) { |
d32ca5ad JQ |
643 | break; |
644 | } | |
645 | qemu_mutex_lock(&p->mutex); | |
646 | ||
647 | if (p->pending_job) { | |
648 | uint32_t used = p->pages->used; | |
649 | uint64_t packet_num = p->packet_num; | |
650 | flags = p->flags; | |
651 | ||
ab7cbb0b JQ |
652 | if (used) { |
653 | ret = multifd_send_state->ops->send_prepare(p, used, | |
654 | &local_err); | |
655 | if (ret != 0) { | |
656 | qemu_mutex_unlock(&p->mutex); | |
657 | break; | |
658 | } | |
659 | } | |
d32ca5ad JQ |
660 | multifd_send_fill_packet(p); |
661 | p->flags = 0; | |
662 | p->num_packets++; | |
663 | p->num_pages += used; | |
664 | p->pages->used = 0; | |
665 | p->pages->block = NULL; | |
666 | qemu_mutex_unlock(&p->mutex); | |
667 | ||
668 | trace_multifd_send(p->id, packet_num, used, flags, | |
669 | p->next_packet_size); | |
670 | ||
671 | ret = qio_channel_write_all(p->c, (void *)p->packet, | |
672 | p->packet_len, &local_err); | |
673 | if (ret != 0) { | |
674 | break; | |
675 | } | |
676 | ||
677 | if (used) { | |
ab7cbb0b | 678 | ret = multifd_send_state->ops->send_write(p, used, &local_err); |
d32ca5ad JQ |
679 | if (ret != 0) { |
680 | break; | |
681 | } | |
682 | } | |
683 | ||
684 | qemu_mutex_lock(&p->mutex); | |
685 | p->pending_job--; | |
686 | qemu_mutex_unlock(&p->mutex); | |
687 | ||
688 | if (flags & MULTIFD_FLAG_SYNC) { | |
689 | qemu_sem_post(&p->sem_sync); | |
690 | } | |
691 | qemu_sem_post(&multifd_send_state->channels_ready); | |
692 | } else if (p->quit) { | |
693 | qemu_mutex_unlock(&p->mutex); | |
694 | break; | |
695 | } else { | |
696 | qemu_mutex_unlock(&p->mutex); | |
697 | /* sometimes there are spurious wakeups */ | |
698 | } | |
699 | } | |
700 | ||
701 | out: | |
702 | if (local_err) { | |
703 | trace_multifd_send_error(p->id); | |
704 | multifd_send_terminate_threads(local_err); | |
13f2cb21 | 705 | error_free(local_err); |
d32ca5ad JQ |
706 | } |
707 | ||
708 | /* | |
709 | * Error happen, I will exit, but I can't just leave, tell | |
710 | * who pay attention to me. | |
711 | */ | |
712 | if (ret != 0) { | |
713 | qemu_sem_post(&p->sem_sync); | |
714 | qemu_sem_post(&multifd_send_state->channels_ready); | |
715 | } | |
716 | ||
717 | qemu_mutex_lock(&p->mutex); | |
718 | p->running = false; | |
719 | qemu_mutex_unlock(&p->mutex); | |
720 | ||
721 | rcu_unregister_thread(); | |
722 | trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages); | |
723 | ||
724 | return NULL; | |
725 | } | |
726 | ||
29647140 CZ |
727 | static bool multifd_channel_connect(MultiFDSendParams *p, |
728 | QIOChannel *ioc, | |
729 | Error *error); | |
730 | ||
731 | static void multifd_tls_outgoing_handshake(QIOTask *task, | |
732 | gpointer opaque) | |
733 | { | |
734 | MultiFDSendParams *p = opaque; | |
735 | QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task)); | |
736 | Error *err = NULL; | |
737 | ||
894f0214 CZ |
738 | if (qio_task_propagate_error(task, &err)) { |
739 | trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err)); | |
740 | } else { | |
741 | trace_multifd_tls_outgoing_handshake_complete(ioc); | |
742 | } | |
fca67642 HW |
743 | |
744 | if (!multifd_channel_connect(p, ioc, err)) { | |
745 | /* | |
746 | * Error happen, mark multifd_send_thread status as 'quit' although it | |
747 | * is not created, and then tell who pay attention to me. | |
748 | */ | |
749 | p->quit = true; | |
750 | qemu_sem_post(&multifd_send_state->channels_ready); | |
751 | qemu_sem_post(&p->sem_sync); | |
752 | } | |
29647140 CZ |
753 | } |
754 | ||
a1af605b CZ |
755 | static void *multifd_tls_handshake_thread(void *opaque) |
756 | { | |
757 | MultiFDSendParams *p = opaque; | |
758 | QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c); | |
759 | ||
760 | qio_channel_tls_handshake(tioc, | |
761 | multifd_tls_outgoing_handshake, | |
762 | p, | |
763 | NULL, | |
764 | NULL); | |
765 | return NULL; | |
766 | } | |
767 | ||
29647140 CZ |
768 | static void multifd_tls_channel_connect(MultiFDSendParams *p, |
769 | QIOChannel *ioc, | |
770 | Error **errp) | |
771 | { | |
772 | MigrationState *s = migrate_get_current(); | |
773 | const char *hostname = p->tls_hostname; | |
774 | QIOChannelTLS *tioc; | |
775 | ||
776 | tioc = migration_tls_client_create(s, ioc, hostname, errp); | |
777 | if (!tioc) { | |
778 | return; | |
779 | } | |
780 | ||
9e842408 | 781 | object_unref(OBJECT(ioc)); |
894f0214 | 782 | trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname); |
29647140 | 783 | qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing"); |
a1af605b CZ |
784 | p->c = QIO_CHANNEL(tioc); |
785 | qemu_thread_create(&p->thread, "multifd-tls-handshake-worker", | |
786 | multifd_tls_handshake_thread, p, | |
787 | QEMU_THREAD_JOINABLE); | |
29647140 CZ |
788 | } |
789 | ||
790 | static bool multifd_channel_connect(MultiFDSendParams *p, | |
791 | QIOChannel *ioc, | |
792 | Error *error) | |
793 | { | |
794 | MigrationState *s = migrate_get_current(); | |
795 | ||
894f0214 CZ |
796 | trace_multifd_set_outgoing_channel( |
797 | ioc, object_get_typename(OBJECT(ioc)), p->tls_hostname, error); | |
798 | ||
29647140 CZ |
799 | if (!error) { |
800 | if (s->parameters.tls_creds && | |
801 | *s->parameters.tls_creds && | |
802 | !object_dynamic_cast(OBJECT(ioc), | |
803 | TYPE_QIO_CHANNEL_TLS)) { | |
804 | multifd_tls_channel_connect(p, ioc, &error); | |
805 | if (!error) { | |
806 | /* | |
807 | * tls_channel_connect will call back to this | |
808 | * function after the TLS handshake, | |
809 | * so we mustn't call multifd_send_thread until then | |
810 | */ | |
29647140 | 811 | return true; |
a339149a HW |
812 | } else { |
813 | return false; | |
29647140 CZ |
814 | } |
815 | } else { | |
816 | /* update for tls qio channel */ | |
817 | p->c = ioc; | |
818 | qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, | |
819 | QEMU_THREAD_JOINABLE); | |
820 | } | |
a339149a | 821 | return true; |
29647140 CZ |
822 | } |
823 | ||
a339149a | 824 | return false; |
29647140 CZ |
825 | } |
826 | ||
03c7a42d CZ |
827 | static void multifd_new_send_channel_cleanup(MultiFDSendParams *p, |
828 | QIOChannel *ioc, Error *err) | |
829 | { | |
830 | migrate_set_error(migrate_get_current(), err); | |
831 | /* Error happen, we need to tell who pay attention to me */ | |
832 | qemu_sem_post(&multifd_send_state->channels_ready); | |
833 | qemu_sem_post(&p->sem_sync); | |
834 | /* | |
835 | * Although multifd_send_thread is not created, but main migration | |
836 | * thread neet to judge whether it is running, so we need to mark | |
837 | * its status. | |
838 | */ | |
839 | p->quit = true; | |
840 | object_unref(OBJECT(ioc)); | |
841 | error_free(err); | |
842 | } | |
843 | ||
d32ca5ad JQ |
844 | static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) |
845 | { | |
846 | MultiFDSendParams *p = opaque; | |
847 | QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task)); | |
848 | Error *local_err = NULL; | |
849 | ||
850 | trace_multifd_new_send_channel_async(p->id); | |
851 | if (qio_task_propagate_error(task, &local_err)) { | |
03c7a42d | 852 | goto cleanup; |
d32ca5ad JQ |
853 | } else { |
854 | p->c = QIO_CHANNEL(sioc); | |
855 | qio_channel_set_delay(p->c, false); | |
856 | p->running = true; | |
a339149a | 857 | if (!multifd_channel_connect(p, sioc, local_err)) { |
29647140 CZ |
858 | goto cleanup; |
859 | } | |
03c7a42d | 860 | return; |
d32ca5ad | 861 | } |
03c7a42d CZ |
862 | |
863 | cleanup: | |
864 | multifd_new_send_channel_cleanup(p, sioc, local_err); | |
d32ca5ad JQ |
865 | } |
866 | ||
867 | int multifd_save_setup(Error **errp) | |
868 | { | |
869 | int thread_count; | |
870 | uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); | |
871 | uint8_t i; | |
8e5fa059 | 872 | MigrationState *s; |
d32ca5ad JQ |
873 | |
874 | if (!migrate_use_multifd()) { | |
875 | return 0; | |
876 | } | |
8e5fa059 | 877 | s = migrate_get_current(); |
d32ca5ad JQ |
878 | thread_count = migrate_multifd_channels(); |
879 | multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); | |
880 | multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); | |
881 | multifd_send_state->pages = multifd_pages_init(page_count); | |
882 | qemu_sem_init(&multifd_send_state->channels_ready, 0); | |
d73415a3 | 883 | qatomic_set(&multifd_send_state->exiting, 0); |
ab7cbb0b | 884 | multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; |
d32ca5ad JQ |
885 | |
886 | for (i = 0; i < thread_count; i++) { | |
887 | MultiFDSendParams *p = &multifd_send_state->params[i]; | |
888 | ||
889 | qemu_mutex_init(&p->mutex); | |
890 | qemu_sem_init(&p->sem, 0); | |
891 | qemu_sem_init(&p->sem_sync, 0); | |
892 | p->quit = false; | |
893 | p->pending_job = 0; | |
894 | p->id = i; | |
895 | p->pages = multifd_pages_init(page_count); | |
896 | p->packet_len = sizeof(MultiFDPacket_t) | |
897 | + sizeof(uint64_t) * page_count; | |
898 | p->packet = g_malloc0(p->packet_len); | |
899 | p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); | |
900 | p->packet->version = cpu_to_be32(MULTIFD_VERSION); | |
901 | p->name = g_strdup_printf("multifdsend_%d", i); | |
8e5fa059 | 902 | p->tls_hostname = g_strdup(s->hostname); |
d32ca5ad JQ |
903 | socket_send_channel_create(multifd_new_send_channel_async, p); |
904 | } | |
ab7cbb0b JQ |
905 | |
906 | for (i = 0; i < thread_count; i++) { | |
907 | MultiFDSendParams *p = &multifd_send_state->params[i]; | |
908 | Error *local_err = NULL; | |
909 | int ret; | |
910 | ||
911 | ret = multifd_send_state->ops->send_setup(p, &local_err); | |
912 | if (ret) { | |
913 | error_propagate(errp, local_err); | |
914 | return ret; | |
915 | } | |
916 | } | |
d32ca5ad JQ |
917 | return 0; |
918 | } | |
919 | ||
920 | struct { | |
921 | MultiFDRecvParams *params; | |
922 | /* number of created threads */ | |
923 | int count; | |
924 | /* syncs main thread and channels */ | |
925 | QemuSemaphore sem_sync; | |
926 | /* global number of generated multifd packets */ | |
927 | uint64_t packet_num; | |
ab7cbb0b JQ |
928 | /* multifd ops */ |
929 | MultiFDMethods *ops; | |
d32ca5ad JQ |
930 | } *multifd_recv_state; |
931 | ||
932 | static void multifd_recv_terminate_threads(Error *err) | |
933 | { | |
934 | int i; | |
935 | ||
936 | trace_multifd_recv_terminate_threads(err != NULL); | |
937 | ||
938 | if (err) { | |
939 | MigrationState *s = migrate_get_current(); | |
940 | migrate_set_error(s, err); | |
941 | if (s->state == MIGRATION_STATUS_SETUP || | |
942 | s->state == MIGRATION_STATUS_ACTIVE) { | |
943 | migrate_set_state(&s->state, s->state, | |
944 | MIGRATION_STATUS_FAILED); | |
945 | } | |
946 | } | |
947 | ||
948 | for (i = 0; i < migrate_multifd_channels(); i++) { | |
949 | MultiFDRecvParams *p = &multifd_recv_state->params[i]; | |
950 | ||
951 | qemu_mutex_lock(&p->mutex); | |
952 | p->quit = true; | |
953 | /* | |
954 | * We could arrive here for two reasons: | |
955 | * - normal quit, i.e. everything went fine, just finished | |
956 | * - error quit: We close the channels so the channel threads | |
957 | * finish the qio_channel_read_all_eof() | |
958 | */ | |
959 | if (p->c) { | |
960 | qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); | |
961 | } | |
962 | qemu_mutex_unlock(&p->mutex); | |
963 | } | |
964 | } | |
965 | ||
966 | int multifd_load_cleanup(Error **errp) | |
967 | { | |
968 | int i; | |
d32ca5ad JQ |
969 | |
970 | if (!migrate_use_multifd()) { | |
971 | return 0; | |
972 | } | |
973 | multifd_recv_terminate_threads(NULL); | |
974 | for (i = 0; i < migrate_multifd_channels(); i++) { | |
975 | MultiFDRecvParams *p = &multifd_recv_state->params[i]; | |
976 | ||
977 | if (p->running) { | |
978 | p->quit = true; | |
979 | /* | |
980 | * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, | |
981 | * however try to wakeup it without harm in cleanup phase. | |
982 | */ | |
983 | qemu_sem_post(&p->sem_sync); | |
984 | qemu_thread_join(&p->thread); | |
985 | } | |
986 | } | |
987 | for (i = 0; i < migrate_multifd_channels(); i++) { | |
988 | MultiFDRecvParams *p = &multifd_recv_state->params[i]; | |
989 | ||
b5eea99e LS |
990 | if (object_dynamic_cast(OBJECT(p->c), TYPE_QIO_CHANNEL_SOCKET) |
991 | && OBJECT(p->c)->ref == 1) { | |
992 | yank_unregister_function(MIGRATION_YANK_INSTANCE, | |
1a92d6d5 | 993 | migration_yank_iochannel, |
b5eea99e LS |
994 | QIO_CHANNEL(p->c)); |
995 | } | |
996 | ||
d32ca5ad JQ |
997 | object_unref(OBJECT(p->c)); |
998 | p->c = NULL; | |
999 | qemu_mutex_destroy(&p->mutex); | |
1000 | qemu_sem_destroy(&p->sem_sync); | |
1001 | g_free(p->name); | |
1002 | p->name = NULL; | |
1003 | multifd_pages_clear(p->pages); | |
1004 | p->pages = NULL; | |
1005 | p->packet_len = 0; | |
1006 | g_free(p->packet); | |
1007 | p->packet = NULL; | |
ab7cbb0b | 1008 | multifd_recv_state->ops->recv_cleanup(p); |
d32ca5ad JQ |
1009 | } |
1010 | qemu_sem_destroy(&multifd_recv_state->sem_sync); | |
1011 | g_free(multifd_recv_state->params); | |
1012 | multifd_recv_state->params = NULL; | |
1013 | g_free(multifd_recv_state); | |
1014 | multifd_recv_state = NULL; | |
1015 | ||
ab7cbb0b | 1016 | return 0; |
d32ca5ad JQ |
1017 | } |
1018 | ||
1019 | void multifd_recv_sync_main(void) | |
1020 | { | |
1021 | int i; | |
1022 | ||
1023 | if (!migrate_use_multifd()) { | |
1024 | return; | |
1025 | } | |
1026 | for (i = 0; i < migrate_multifd_channels(); i++) { | |
1027 | MultiFDRecvParams *p = &multifd_recv_state->params[i]; | |
1028 | ||
1029 | trace_multifd_recv_sync_main_wait(p->id); | |
1030 | qemu_sem_wait(&multifd_recv_state->sem_sync); | |
1031 | } | |
1032 | for (i = 0; i < migrate_multifd_channels(); i++) { | |
1033 | MultiFDRecvParams *p = &multifd_recv_state->params[i]; | |
1034 | ||
6e8a355d DB |
1035 | WITH_QEMU_LOCK_GUARD(&p->mutex) { |
1036 | if (multifd_recv_state->packet_num < p->packet_num) { | |
1037 | multifd_recv_state->packet_num = p->packet_num; | |
1038 | } | |
d32ca5ad | 1039 | } |
d32ca5ad JQ |
1040 | trace_multifd_recv_sync_main_signal(p->id); |
1041 | qemu_sem_post(&p->sem_sync); | |
1042 | } | |
1043 | trace_multifd_recv_sync_main(multifd_recv_state->packet_num); | |
1044 | } | |
1045 | ||
1046 | static void *multifd_recv_thread(void *opaque) | |
1047 | { | |
1048 | MultiFDRecvParams *p = opaque; | |
1049 | Error *local_err = NULL; | |
1050 | int ret; | |
1051 | ||
1052 | trace_multifd_recv_thread_start(p->id); | |
1053 | rcu_register_thread(); | |
1054 | ||
1055 | while (true) { | |
1056 | uint32_t used; | |
1057 | uint32_t flags; | |
1058 | ||
1059 | if (p->quit) { | |
1060 | break; | |
1061 | } | |
1062 | ||
1063 | ret = qio_channel_read_all_eof(p->c, (void *)p->packet, | |
1064 | p->packet_len, &local_err); | |
1065 | if (ret == 0) { /* EOF */ | |
1066 | break; | |
1067 | } | |
1068 | if (ret == -1) { /* Error */ | |
1069 | break; | |
1070 | } | |
1071 | ||
1072 | qemu_mutex_lock(&p->mutex); | |
1073 | ret = multifd_recv_unfill_packet(p, &local_err); | |
1074 | if (ret) { | |
1075 | qemu_mutex_unlock(&p->mutex); | |
1076 | break; | |
1077 | } | |
1078 | ||
1079 | used = p->pages->used; | |
1080 | flags = p->flags; | |
ab7cbb0b JQ |
1081 | /* recv methods don't know how to handle the SYNC flag */ |
1082 | p->flags &= ~MULTIFD_FLAG_SYNC; | |
d32ca5ad JQ |
1083 | trace_multifd_recv(p->id, p->packet_num, used, flags, |
1084 | p->next_packet_size); | |
1085 | p->num_packets++; | |
1086 | p->num_pages += used; | |
1087 | qemu_mutex_unlock(&p->mutex); | |
1088 | ||
1089 | if (used) { | |
ab7cbb0b | 1090 | ret = multifd_recv_state->ops->recv_pages(p, used, &local_err); |
d32ca5ad JQ |
1091 | if (ret != 0) { |
1092 | break; | |
1093 | } | |
1094 | } | |
1095 | ||
1096 | if (flags & MULTIFD_FLAG_SYNC) { | |
1097 | qemu_sem_post(&multifd_recv_state->sem_sync); | |
1098 | qemu_sem_wait(&p->sem_sync); | |
1099 | } | |
1100 | } | |
1101 | ||
1102 | if (local_err) { | |
1103 | multifd_recv_terminate_threads(local_err); | |
13f2cb21 | 1104 | error_free(local_err); |
d32ca5ad JQ |
1105 | } |
1106 | qemu_mutex_lock(&p->mutex); | |
1107 | p->running = false; | |
1108 | qemu_mutex_unlock(&p->mutex); | |
1109 | ||
1110 | rcu_unregister_thread(); | |
1111 | trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages); | |
1112 | ||
1113 | return NULL; | |
1114 | } | |
1115 | ||
1116 | int multifd_load_setup(Error **errp) | |
1117 | { | |
1118 | int thread_count; | |
1119 | uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); | |
1120 | uint8_t i; | |
1121 | ||
1122 | if (!migrate_use_multifd()) { | |
1123 | return 0; | |
1124 | } | |
1125 | thread_count = migrate_multifd_channels(); | |
1126 | multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); | |
1127 | multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); | |
d73415a3 | 1128 | qatomic_set(&multifd_recv_state->count, 0); |
d32ca5ad | 1129 | qemu_sem_init(&multifd_recv_state->sem_sync, 0); |
ab7cbb0b | 1130 | multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()]; |
d32ca5ad JQ |
1131 | |
1132 | for (i = 0; i < thread_count; i++) { | |
1133 | MultiFDRecvParams *p = &multifd_recv_state->params[i]; | |
1134 | ||
1135 | qemu_mutex_init(&p->mutex); | |
1136 | qemu_sem_init(&p->sem_sync, 0); | |
1137 | p->quit = false; | |
1138 | p->id = i; | |
1139 | p->pages = multifd_pages_init(page_count); | |
1140 | p->packet_len = sizeof(MultiFDPacket_t) | |
1141 | + sizeof(uint64_t) * page_count; | |
1142 | p->packet = g_malloc0(p->packet_len); | |
1143 | p->name = g_strdup_printf("multifdrecv_%d", i); | |
1144 | } | |
ab7cbb0b JQ |
1145 | |
1146 | for (i = 0; i < thread_count; i++) { | |
1147 | MultiFDRecvParams *p = &multifd_recv_state->params[i]; | |
1148 | Error *local_err = NULL; | |
1149 | int ret; | |
1150 | ||
1151 | ret = multifd_recv_state->ops->recv_setup(p, &local_err); | |
1152 | if (ret) { | |
1153 | error_propagate(errp, local_err); | |
1154 | return ret; | |
1155 | } | |
1156 | } | |
d32ca5ad JQ |
1157 | return 0; |
1158 | } | |
1159 | ||
1160 | bool multifd_recv_all_channels_created(void) | |
1161 | { | |
1162 | int thread_count = migrate_multifd_channels(); | |
1163 | ||
1164 | if (!migrate_use_multifd()) { | |
1165 | return true; | |
1166 | } | |
1167 | ||
d73415a3 | 1168 | return thread_count == qatomic_read(&multifd_recv_state->count); |
d32ca5ad JQ |
1169 | } |
1170 | ||
1171 | /* | |
1172 | * Try to receive all multifd channels to get ready for the migration. | |
3a4452d8 | 1173 | * - Return true and do not set @errp when correctly receiving all channels; |
d32ca5ad JQ |
1174 | * - Return false and do not set @errp when correctly receiving the current one; |
1175 | * - Return false and set @errp when failing to receive the current channel. | |
1176 | */ | |
1177 | bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp) | |
1178 | { | |
1179 | MultiFDRecvParams *p; | |
1180 | Error *local_err = NULL; | |
1181 | int id; | |
1182 | ||
1183 | id = multifd_recv_initial_packet(ioc, &local_err); | |
1184 | if (id < 0) { | |
1185 | multifd_recv_terminate_threads(local_err); | |
1186 | error_propagate_prepend(errp, local_err, | |
1187 | "failed to receive packet" | |
1188 | " via multifd channel %d: ", | |
d73415a3 | 1189 | qatomic_read(&multifd_recv_state->count)); |
d32ca5ad JQ |
1190 | return false; |
1191 | } | |
1192 | trace_multifd_recv_new_channel(id); | |
1193 | ||
1194 | p = &multifd_recv_state->params[id]; | |
1195 | if (p->c != NULL) { | |
1196 | error_setg(&local_err, "multifd: received id '%d' already setup'", | |
1197 | id); | |
1198 | multifd_recv_terminate_threads(local_err); | |
1199 | error_propagate(errp, local_err); | |
1200 | return false; | |
1201 | } | |
1202 | p->c = ioc; | |
1203 | object_ref(OBJECT(ioc)); | |
1204 | /* initial packet */ | |
1205 | p->num_packets = 1; | |
1206 | ||
1207 | p->running = true; | |
1208 | qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, | |
1209 | QEMU_THREAD_JOINABLE); | |
d73415a3 SH |
1210 | qatomic_inc(&multifd_recv_state->count); |
1211 | return qatomic_read(&multifd_recv_state->count) == | |
d32ca5ad JQ |
1212 | migrate_multifd_channels(); |
1213 | } |