]>
Commit | Line | Data |
---|---|---|
d32ca5ad JQ |
1 | /* |
2 | * Multifd common code | |
3 | * | |
4 | * Copyright (c) 2019-2020 Red Hat Inc | |
5 | * | |
6 | * Authors: | |
7 | * Juan Quintela <quintela@redhat.com> | |
8 | * | |
9 | * This work is licensed under the terms of the GNU GPL, version 2 or later. | |
10 | * See the COPYING file in the top-level directory. | |
11 | */ | |
12 | ||
13 | #include "qemu/osdep.h" | |
303e6f54 | 14 | #include "qemu/cutils.h" |
d32ca5ad JQ |
15 | #include "qemu/rcu.h" |
16 | #include "exec/target_page.h" | |
17 | #include "sysemu/sysemu.h" | |
18 | #include "exec/ramblock.h" | |
19 | #include "qemu/error-report.h" | |
20 | #include "qapi/error.h" | |
b7b03eb6 | 21 | #include "file.h" |
d32ca5ad | 22 | #include "migration.h" |
947701cc | 23 | #include "migration-stats.h" |
d32ca5ad | 24 | #include "socket.h" |
29647140 | 25 | #include "tls.h" |
d32ca5ad JQ |
26 | #include "qemu-file.h" |
27 | #include "trace.h" | |
28 | #include "multifd.h" | |
1b1f4ab6 | 29 | #include "threadinfo.h" |
b4bc342c | 30 | #include "options.h" |
b5eea99e | 31 | #include "qemu/yank.h" |
b7b03eb6 | 32 | #include "io/channel-file.h" |
b5eea99e | 33 | #include "io/channel-socket.h" |
1a92d6d5 | 34 | #include "yank_functions.h" |
b5eea99e | 35 | |
d32ca5ad JQ |
36 | /* Multiple fd's */ |
37 | ||
/*
 * Magic value: sent big-endian in the MultiFDInit_t handshake and also
 * checked in every packet header on the receive side.
 */
#define MULTIFD_MAGIC 0x11223344U
/* Protocol version; the receiver rejects any value other than this one. */
#define MULTIFD_VERSION 1

/*
 * Handshake message sent once at the start of each multifd channel.
 * It is written and read as raw bytes (sizeof(MultiFDInit_t)), so the
 * packed layout below IS the wire format: do not reorder or resize fields.
 */
typedef struct {
    uint32_t magic;          /* MULTIFD_MAGIC, big-endian on the wire */
    uint32_t version;        /* MULTIFD_VERSION, big-endian on the wire */
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;              /* id of the sending channel */
    uint8_t unused1[7]; /* Reserved for future use */
    uint64_t unused2[4]; /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
49 | ||
98ea497d PX |
/*
 * Global state of the multifd send side.  Released by
 * multifd_send_cleanup_state(), which also sets the pointer back to NULL.
 */
struct {
    /* Per-channel parameters, indexed 0 .. migrate_multifd_channels() - 1 */
    MultiFDSendParams *params;
    /*
     * Queue of pages being filled by the migration thread.  When a batch
     * is handed off, it is swapped with the chosen channel's (empty)
     * p->pages, see multifd_send_pages().
     */
    MultiFDPages_t *pages;
    /*
     * Global number of generated multifd packets.
     *
     * Note that we used 'uintptr_t' because it'll naturally support atomic
     * operations on both 32bit / 64 bits hosts.  It means on 32bit systems
     * multifd will overflow the packet_num easier, but that should be
     * fine.
     *
     * Another option is to use QEMU's Stat64 then it'll be 64 bits on all
     * hosts, however so far it does not support atomic fetch_add() yet.
     * Make it easy for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.  Posted by multifd_send_channel_created().
     */
    QemuSemaphore channels_created;
    /*
     * Send channels ready.  multifd_send_pages() and
     * multifd_send_sync_main() wait on it; multifd_send_kick_main() also
     * posts it to unblock the migration thread on errors.
     */
    QemuSemaphore channels_ready;
    /*
     * Have we already run terminate threads.  There is a race when it
     * happens that we got one error while we are exiting.
     * We will use atomic operations.  Only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops of the selected compression method (send_cleanup etc.) */
    MultiFDMethods *ops;
} *multifd_send_state;
83 | ||
11dd7be5 FR |
/* Global state of the multifd receive side. */
struct {
    /* Per-channel receive parameters */
    MultiFDRecvParams *params;
    /*
     * NOTE(review): usage is not visible in this part of the file;
     * presumably the data handed to recv threads (e.g. for mapped-ram).
     */
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads, the migration thread
     * uses it to wait for recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /*
     * global number of generated multifd packets
     * NOTE(review): on the receive side this presumably tracks packet
     * numbers seen from the sender -- confirm against its users.
     */
    uint64_t packet_num;
    /* Non-zero once the receive side must stop; read atomically. */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_recv_state;
100 | ||
06833d83 FR |
/*
 * Whether the multifd stream is framed into packets with headers.
 * Packets are used for every configuration except mapped-ram.
 */
static bool multifd_use_packets(void)
{
    if (migrate_mapped_ram()) {
        return false;
    }
    return true;
}
105 | ||
a8a3e710 FR |
106 | void multifd_send_channel_created(void) |
107 | { | |
108 | qemu_sem_post(&multifd_send_state->channels_created); | |
109 | } | |
110 | ||
f427d90b FR |
111 | static void multifd_set_file_bitmap(MultiFDSendParams *p) |
112 | { | |
113 | MultiFDPages_t *pages = p->pages; | |
114 | ||
115 | assert(pages->block); | |
116 | ||
303e6f54 | 117 | for (int i = 0; i < p->pages->normal_num; i++) { |
c3cdf3fb | 118 | ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], true); |
f427d90b | 119 | } |
303e6f54 | 120 | |
8fa1a21c | 121 | for (int i = p->pages->normal_num; i < p->pages->num; i++) { |
303e6f54 HX |
122 | ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], false); |
123 | } | |
f427d90b FR |
124 | } |
125 | ||
ab7cbb0b JQ |
126 | /* Multifd without compression */ |
127 | ||
128 | /** | |
129 | * nocomp_send_setup: setup send side | |
130 | * | |
ab7cbb0b JQ |
131 | * @p: Params for the channel that we are using |
132 | * @errp: pointer to an error | |
133 | */ | |
134 | static int nocomp_send_setup(MultiFDSendParams *p, Error **errp) | |
135 | { | |
25a1f878 PX |
136 | if (migrate_zero_copy_send()) { |
137 | p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY; | |
138 | } | |
139 | ||
ab7cbb0b JQ |
140 | return 0; |
141 | } | |
142 | ||
143 | /** | |
144 | * nocomp_send_cleanup: cleanup send side | |
145 | * | |
146 | * For no compression this function does nothing. | |
147 | * | |
148 | * @p: Params for the channel that we are using | |
18ede636 | 149 | * @errp: pointer to an error |
ab7cbb0b JQ |
150 | */ |
151 | static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp) | |
152 | { | |
153 | return; | |
154 | } | |
155 | ||
06833d83 FR |
156 | static void multifd_send_prepare_iovs(MultiFDSendParams *p) |
157 | { | |
158 | MultiFDPages_t *pages = p->pages; | |
159 | ||
303e6f54 | 160 | for (int i = 0; i < pages->normal_num; i++) { |
06833d83 FR |
161 | p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i]; |
162 | p->iov[p->iovs_num].iov_len = p->page_size; | |
163 | p->iovs_num++; | |
164 | } | |
165 | ||
303e6f54 | 166 | p->next_packet_size = pages->normal_num * p->page_size; |
06833d83 FR |
167 | } |
168 | ||
ab7cbb0b JQ |
169 | /** |
170 | * nocomp_send_prepare: prepare date to be able to send | |
171 | * | |
172 | * For no compression we just have to calculate the size of the | |
173 | * packet. | |
174 | * | |
175 | * Returns 0 for success or -1 for error | |
176 | * | |
177 | * @p: Params for the channel that we are using | |
ab7cbb0b JQ |
178 | * @errp: pointer to an error |
179 | */ | |
02fb8104 | 180 | static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp) |
ab7cbb0b | 181 | { |
25a1f878 | 182 | bool use_zero_copy_send = migrate_zero_copy_send(); |
25a1f878 PX |
183 | int ret; |
184 | ||
303e6f54 HX |
185 | multifd_send_zero_page_detect(p); |
186 | ||
06833d83 FR |
187 | if (!multifd_use_packets()) { |
188 | multifd_send_prepare_iovs(p); | |
f427d90b FR |
189 | multifd_set_file_bitmap(p); |
190 | ||
06833d83 FR |
191 | return 0; |
192 | } | |
193 | ||
25a1f878 PX |
194 | if (!use_zero_copy_send) { |
195 | /* | |
196 | * Only !zerocopy needs the header in IOV; zerocopy will | |
197 | * send it separately. | |
198 | */ | |
199 | multifd_send_prepare_header(p); | |
200 | } | |
226468ba | 201 | |
06833d83 | 202 | multifd_send_prepare_iovs(p); |
ab7cbb0b | 203 | p->flags |= MULTIFD_FLAG_NOCOMP; |
25a1f878 PX |
204 | |
205 | multifd_send_fill_packet(p); | |
206 | ||
207 | if (use_zero_copy_send) { | |
208 | /* Send header first, without zerocopy */ | |
209 | ret = qio_channel_write_all(p->c, (void *)p->packet, | |
210 | p->packet_len, errp); | |
211 | if (ret != 0) { | |
212 | return -1; | |
213 | } | |
214 | } | |
215 | ||
ab7cbb0b JQ |
216 | return 0; |
217 | } | |
218 | ||
ab7cbb0b JQ |
219 | /** |
220 | * nocomp_recv_setup: setup receive side | |
221 | * | |
222 | * For no compression this function does nothing. | |
223 | * | |
224 | * Returns 0 for success or -1 for error | |
225 | * | |
226 | * @p: Params for the channel that we are using | |
227 | * @errp: pointer to an error | |
228 | */ | |
229 | static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp) | |
230 | { | |
231 | return 0; | |
232 | } | |
233 | ||
234 | /** | |
235 | * nocomp_recv_cleanup: setup receive side | |
236 | * | |
237 | * For no compression this function does nothing. | |
238 | * | |
239 | * @p: Params for the channel that we are using | |
240 | */ | |
241 | static void nocomp_recv_cleanup(MultiFDRecvParams *p) | |
242 | { | |
243 | } | |
244 | ||
245 | /** | |
9db19125 | 246 | * nocomp_recv: read the data from the channel |
ab7cbb0b JQ |
247 | * |
248 | * For no compression we just need to read things into the correct place. | |
249 | * | |
250 | * Returns 0 for success or -1 for error | |
251 | * | |
252 | * @p: Params for the channel that we are using | |
ab7cbb0b JQ |
253 | * @errp: pointer to an error |
254 | */ | |
9db19125 | 255 | static int nocomp_recv(MultiFDRecvParams *p, Error **errp) |
ab7cbb0b | 256 | { |
06833d83 FR |
257 | uint32_t flags; |
258 | ||
259 | if (!multifd_use_packets()) { | |
a49d15a3 | 260 | return multifd_file_recv_data(p, errp); |
06833d83 FR |
261 | } |
262 | ||
263 | flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; | |
ab7cbb0b JQ |
264 | |
265 | if (flags != MULTIFD_FLAG_NOCOMP) { | |
04e11404 | 266 | error_setg(errp, "multifd %u: flags received %x flags expected %x", |
ab7cbb0b JQ |
267 | p->id, flags, MULTIFD_FLAG_NOCOMP); |
268 | return -1; | |
269 | } | |
303e6f54 HX |
270 | |
271 | multifd_recv_zero_page_process(p); | |
272 | ||
273 | if (!p->normal_num) { | |
274 | return 0; | |
275 | } | |
276 | ||
cf2d4aa8 | 277 | for (int i = 0; i < p->normal_num; i++) { |
faf60935 | 278 | p->iov[i].iov_base = p->host + p->normal[i]; |
ddec20f8 | 279 | p->iov[i].iov_len = p->page_size; |
5ef7e26b | 280 | ramblock_recv_bitmap_set_offset(p->block, p->normal[i]); |
226468ba | 281 | } |
cf2d4aa8 | 282 | return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp); |
ab7cbb0b JQ |
283 | } |
284 | ||
285 | static MultiFDMethods multifd_nocomp_ops = { | |
286 | .send_setup = nocomp_send_setup, | |
287 | .send_cleanup = nocomp_send_cleanup, | |
288 | .send_prepare = nocomp_send_prepare, | |
ab7cbb0b JQ |
289 | .recv_setup = nocomp_recv_setup, |
290 | .recv_cleanup = nocomp_recv_cleanup, | |
9db19125 | 291 | .recv = nocomp_recv |
ab7cbb0b JQ |
292 | }; |
293 | ||
294 | static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = { | |
295 | [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops, | |
296 | }; | |
297 | ||
7ec2c2b3 JQ |
298 | void multifd_register_ops(int method, MultiFDMethods *ops) |
299 | { | |
300 | assert(0 < method && method < MULTIFD_COMPRESSION__MAX); | |
301 | multifd_ops[method] = ops; | |
302 | } | |
303 | ||
836eca47 PX |
304 | /* Reset a MultiFDPages_t* object for the next use */ |
305 | static void multifd_pages_reset(MultiFDPages_t *pages) | |
306 | { | |
307 | /* | |
308 | * We don't need to touch offset[] array, because it will be | |
309 | * overwritten later when reused. | |
310 | */ | |
311 | pages->num = 0; | |
303e6f54 | 312 | pages->normal_num = 0; |
836eca47 PX |
313 | pages->block = NULL; |
314 | } | |
315 | ||
d32ca5ad JQ |
316 | static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) |
317 | { | |
318 | MultiFDInit_t msg = {}; | |
cbec7eb7 | 319 | size_t size = sizeof(msg); |
d32ca5ad JQ |
320 | int ret; |
321 | ||
322 | msg.magic = cpu_to_be32(MULTIFD_MAGIC); | |
323 | msg.version = cpu_to_be32(MULTIFD_VERSION); | |
324 | msg.id = p->id; | |
325 | memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); | |
326 | ||
cbec7eb7 | 327 | ret = qio_channel_write_all(p->c, (char *)&msg, size, errp); |
d32ca5ad JQ |
328 | if (ret != 0) { |
329 | return -1; | |
330 | } | |
cbec7eb7 | 331 | stat64_add(&mig_stats.multifd_bytes, size); |
d32ca5ad JQ |
332 | return 0; |
333 | } | |
334 | ||
335 | static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) | |
336 | { | |
337 | MultiFDInit_t msg; | |
338 | int ret; | |
339 | ||
340 | ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); | |
341 | if (ret != 0) { | |
342 | return -1; | |
343 | } | |
344 | ||
345 | msg.magic = be32_to_cpu(msg.magic); | |
346 | msg.version = be32_to_cpu(msg.version); | |
347 | ||
348 | if (msg.magic != MULTIFD_MAGIC) { | |
349 | error_setg(errp, "multifd: received packet magic %x " | |
350 | "expected %x", msg.magic, MULTIFD_MAGIC); | |
351 | return -1; | |
352 | } | |
353 | ||
354 | if (msg.version != MULTIFD_VERSION) { | |
04e11404 JQ |
355 | error_setg(errp, "multifd: received packet version %u " |
356 | "expected %u", msg.version, MULTIFD_VERSION); | |
d32ca5ad JQ |
357 | return -1; |
358 | } | |
359 | ||
360 | if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { | |
361 | char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); | |
362 | char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); | |
363 | ||
364 | error_setg(errp, "multifd: received uuid '%s' and expected " | |
365 | "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); | |
366 | g_free(uuid); | |
367 | g_free(msg_uuid); | |
368 | return -1; | |
369 | } | |
370 | ||
371 | if (msg.id > migrate_multifd_channels()) { | |
c77b4085 AH |
372 | error_setg(errp, "multifd: received channel id %u is greater than " |
373 | "number of channels %u", msg.id, migrate_multifd_channels()); | |
d32ca5ad JQ |
374 | return -1; |
375 | } | |
376 | ||
377 | return msg.id; | |
378 | } | |
379 | ||
6074f816 | 380 | static MultiFDPages_t *multifd_pages_init(uint32_t n) |
d32ca5ad JQ |
381 | { |
382 | MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1); | |
383 | ||
6074f816 FR |
384 | pages->allocated = n; |
385 | pages->offset = g_new0(ram_addr_t, n); | |
d32ca5ad JQ |
386 | |
387 | return pages; | |
388 | } | |
389 | ||
390 | static void multifd_pages_clear(MultiFDPages_t *pages) | |
391 | { | |
836eca47 | 392 | multifd_pages_reset(pages); |
d32ca5ad | 393 | pages->allocated = 0; |
d32ca5ad JQ |
394 | g_free(pages->offset); |
395 | pages->offset = NULL; | |
396 | g_free(pages); | |
397 | } | |
398 | ||
25a1f878 | 399 | void multifd_send_fill_packet(MultiFDSendParams *p) |
d32ca5ad JQ |
400 | { |
401 | MultiFDPacket_t *packet = p->packet; | |
efd8c543 | 402 | MultiFDPages_t *pages = p->pages; |
98ea497d | 403 | uint64_t packet_num; |
303e6f54 | 404 | uint32_t zero_num = pages->num - pages->normal_num; |
d32ca5ad JQ |
405 | int i; |
406 | ||
407 | packet->flags = cpu_to_be32(p->flags); | |
408 | packet->pages_alloc = cpu_to_be32(p->pages->allocated); | |
303e6f54 HX |
409 | packet->normal_pages = cpu_to_be32(pages->normal_num); |
410 | packet->zero_pages = cpu_to_be32(zero_num); | |
d32ca5ad | 411 | packet->next_packet_size = cpu_to_be32(p->next_packet_size); |
98ea497d PX |
412 | |
413 | packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num); | |
414 | packet->packet_num = cpu_to_be64(packet_num); | |
d32ca5ad | 415 | |
efd8c543 PX |
416 | if (pages->block) { |
417 | strncpy(packet->ramblock, pages->block->idstr, 256); | |
d32ca5ad JQ |
418 | } |
419 | ||
efd8c543 | 420 | for (i = 0; i < pages->num; i++) { |
d32ca5ad | 421 | /* there are architectures where ram_addr_t is 32 bit */ |
efd8c543 | 422 | uint64_t temp = pages->offset[i]; |
d32ca5ad JQ |
423 | |
424 | packet->offset[i] = cpu_to_be64(temp); | |
425 | } | |
05b7ec18 PX |
426 | |
427 | p->packets_sent++; | |
303e6f54 HX |
428 | p->total_normal_pages += pages->normal_num; |
429 | p->total_zero_pages += zero_num; | |
8a9ef173 | 430 | |
303e6f54 HX |
431 | trace_multifd_send(p->id, packet_num, pages->normal_num, zero_num, |
432 | p->flags, p->next_packet_size); | |
d32ca5ad JQ |
433 | } |
434 | ||
435 | static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) | |
436 | { | |
437 | MultiFDPacket_t *packet = p->packet; | |
d32ca5ad JQ |
438 | int i; |
439 | ||
440 | packet->magic = be32_to_cpu(packet->magic); | |
441 | if (packet->magic != MULTIFD_MAGIC) { | |
442 | error_setg(errp, "multifd: received packet " | |
443 | "magic %x and expected magic %x", | |
444 | packet->magic, MULTIFD_MAGIC); | |
445 | return -1; | |
446 | } | |
447 | ||
448 | packet->version = be32_to_cpu(packet->version); | |
449 | if (packet->version != MULTIFD_VERSION) { | |
450 | error_setg(errp, "multifd: received packet " | |
04e11404 | 451 | "version %u and expected version %u", |
d32ca5ad JQ |
452 | packet->version, MULTIFD_VERSION); |
453 | return -1; | |
454 | } | |
455 | ||
456 | p->flags = be32_to_cpu(packet->flags); | |
457 | ||
458 | packet->pages_alloc = be32_to_cpu(packet->pages_alloc); | |
459 | /* | |
460 | * If we received a packet that is 100 times bigger than expected | |
461 | * just stop migration. It is a magic number. | |
462 | */ | |
d6f45eba | 463 | if (packet->pages_alloc > p->page_count) { |
d32ca5ad | 464 | error_setg(errp, "multifd: received packet " |
cf2d4aa8 | 465 | "with size %u and expected a size of %u", |
d6f45eba | 466 | packet->pages_alloc, p->page_count) ; |
d32ca5ad JQ |
467 | return -1; |
468 | } | |
d32ca5ad | 469 | |
8c0ec0b2 | 470 | p->normal_num = be32_to_cpu(packet->normal_pages); |
cf2d4aa8 | 471 | if (p->normal_num > packet->pages_alloc) { |
d32ca5ad | 472 | error_setg(errp, "multifd: received packet " |
303e6f54 | 473 | "with %u normal pages and expected maximum pages are %u", |
cf2d4aa8 | 474 | p->normal_num, packet->pages_alloc) ; |
d32ca5ad JQ |
475 | return -1; |
476 | } | |
477 | ||
303e6f54 HX |
478 | p->zero_num = be32_to_cpu(packet->zero_pages); |
479 | if (p->zero_num > packet->pages_alloc - p->normal_num) { | |
480 | error_setg(errp, "multifd: received packet " | |
481 | "with %u zero pages and expected maximum zero pages are %u", | |
482 | p->zero_num, packet->pages_alloc - p->normal_num) ; | |
483 | return -1; | |
484 | } | |
485 | ||
d32ca5ad JQ |
486 | p->next_packet_size = be32_to_cpu(packet->next_packet_size); |
487 | p->packet_num = be64_to_cpu(packet->packet_num); | |
05b7ec18 | 488 | p->packets_recved++; |
db7e1cc5 | 489 | p->total_normal_pages += p->normal_num; |
303e6f54 | 490 | p->total_zero_pages += p->zero_num; |
d32ca5ad | 491 | |
303e6f54 HX |
492 | trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->zero_num, |
493 | p->flags, p->next_packet_size); | |
8a9ef173 | 494 | |
303e6f54 | 495 | if (p->normal_num == 0 && p->zero_num == 0) { |
d32ca5ad JQ |
496 | return 0; |
497 | } | |
498 | ||
499 | /* make sure that ramblock is 0 terminated */ | |
500 | packet->ramblock[255] = 0; | |
5d1d1fcf LS |
501 | p->block = qemu_ram_block_by_name(packet->ramblock); |
502 | if (!p->block) { | |
d32ca5ad JQ |
503 | error_setg(errp, "multifd: unknown ram block %s", |
504 | packet->ramblock); | |
505 | return -1; | |
506 | } | |
507 | ||
5d1d1fcf | 508 | p->host = p->block->host; |
cf2d4aa8 | 509 | for (i = 0; i < p->normal_num; i++) { |
d32ca5ad JQ |
510 | uint64_t offset = be64_to_cpu(packet->offset[i]); |
511 | ||
5d1d1fcf | 512 | if (offset > (p->block->used_length - p->page_size)) { |
d32ca5ad JQ |
513 | error_setg(errp, "multifd: offset too long %" PRIu64 |
514 | " (max " RAM_ADDR_FMT ")", | |
5d1d1fcf | 515 | offset, p->block->used_length); |
d32ca5ad JQ |
516 | return -1; |
517 | } | |
cf2d4aa8 | 518 | p->normal[i] = offset; |
d32ca5ad JQ |
519 | } |
520 | ||
303e6f54 HX |
521 | for (i = 0; i < p->zero_num; i++) { |
522 | uint64_t offset = be64_to_cpu(packet->offset[p->normal_num + i]); | |
523 | ||
524 | if (offset > (p->block->used_length - p->page_size)) { | |
525 | error_setg(errp, "multifd: offset too long %" PRIu64 | |
526 | " (max " RAM_ADDR_FMT ")", | |
527 | offset, p->block->used_length); | |
528 | return -1; | |
529 | } | |
530 | p->zero[i] = offset; | |
531 | } | |
532 | ||
d32ca5ad JQ |
533 | return 0; |
534 | } | |
535 | ||
15f3f21d PX |
536 | static bool multifd_send_should_exit(void) |
537 | { | |
538 | return qatomic_read(&multifd_send_state->exiting); | |
539 | } | |
540 | ||
11dd7be5 FR |
541 | static bool multifd_recv_should_exit(void) |
542 | { | |
543 | return qatomic_read(&multifd_recv_state->exiting); | |
544 | } | |
545 | ||
48c0f5d5 PX |
546 | /* |
547 | * The migration thread can wait on either of the two semaphores. This | |
548 | * function can be used to kick the main thread out of waiting on either of | |
549 | * them. Should mostly only be called when something wrong happened with | |
550 | * the current multifd send thread. | |
551 | */ | |
552 | static void multifd_send_kick_main(MultiFDSendParams *p) | |
553 | { | |
554 | qemu_sem_post(&p->sem_sync); | |
555 | qemu_sem_post(&multifd_send_state->channels_ready); | |
556 | } | |
557 | ||
d32ca5ad JQ |
/*
 * How we use multifd_send_state->pages and channel->pages?
 *
 * We create a pages for each channel, and a main one.  Each time that
 * we need to send a batch of pages we interchange the ones between
 * multifd_send_state and the channel that is sending it.  There are
 * two reasons for that:
 *    - to not have to do so many mallocs during migration
 *    - to make easier to know what to free at the end of migration
 *
 * This way we always know who is the owner of each "pages" struct.
 * It belongs either to the migration thread or to the channel thread.
 *
 * No mutex is needed for the switch. The migration thread only picks a
 * channel whose pending_job is false. That means the channel has already
 * finished with its own pages and left them empty (see the assert
 * below); otherwise pending_job would still be true. The swap is then
 * published with a release store of pending_job=true, paired with an
 * acquire load in multifd_send_thread().
 *
 * Blocks on channels_ready until at least one channel is free.
 *
 * Returns true if succeed, false otherwise (the send side is exiting).
 */
static bool multifd_send_pages(void)
{
    int i;
    /* Round-robin cursor: where the next search for a free channel starts. */
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make happy gcc */
    MultiFDPages_t *pages = multifd_send_state->pages;

    if (multifd_send_should_exit()) {
        return false;
    }

    /* We wait here, until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read to p->pending_job is safe, because only multifd
         * sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest.  Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();
    assert(!p->pages->num);
    /* Hand the filled queue to the channel and take its empty one back. */
    multifd_send_state->pages = p->pages;
    p->pages = pages;
    /*
     * Making sure p->pages is setup before marking pending_job=true.  Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    /* Kick the channel's thread so it picks up the new batch. */
    qemu_sem_post(&p->sem);

    return true;
}
629 | ||
f88f86c4 PX |
630 | static inline bool multifd_queue_empty(MultiFDPages_t *pages) |
631 | { | |
632 | return pages->num == 0; | |
633 | } | |
634 | ||
635 | static inline bool multifd_queue_full(MultiFDPages_t *pages) | |
636 | { | |
637 | return pages->num == pages->allocated; | |
638 | } | |
639 | ||
640 | static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset) | |
641 | { | |
642 | pages->offset[pages->num++] = offset; | |
643 | } | |
644 | ||
d6556d17 PX |
645 | /* Returns true if enqueue successful, false otherwise */ |
646 | bool multifd_queue_page(RAMBlock *block, ram_addr_t offset) | |
d32ca5ad | 647 | { |
f88f86c4 | 648 | MultiFDPages_t *pages; |
d32ca5ad | 649 | |
f88f86c4 PX |
650 | retry: |
651 | pages = multifd_send_state->pages; | |
652 | ||
653 | /* If the queue is empty, we can already enqueue now */ | |
654 | if (multifd_queue_empty(pages)) { | |
d32ca5ad | 655 | pages->block = block; |
f88f86c4 PX |
656 | multifd_enqueue(pages, offset); |
657 | return true; | |
d32ca5ad JQ |
658 | } |
659 | ||
f88f86c4 PX |
660 | /* |
661 | * Not empty, meanwhile we need a flush. It can because of either: | |
662 | * | |
663 | * (1) The page is not on the same ramblock of previous ones, or, | |
664 | * (2) The queue is full. | |
665 | * | |
666 | * After flush, always retry. | |
667 | */ | |
668 | if (pages->block != block || multifd_queue_full(pages)) { | |
669 | if (!multifd_send_pages()) { | |
670 | return false; | |
d32ca5ad | 671 | } |
f88f86c4 | 672 | goto retry; |
d32ca5ad JQ |
673 | } |
674 | ||
f88f86c4 PX |
675 | /* Not empty, and we still have space, do it! */ |
676 | multifd_enqueue(pages, offset); | |
d6556d17 | 677 | return true; |
d32ca5ad JQ |
678 | } |
679 | ||
3ab4441d PX |
680 | /* Multifd send side hit an error; remember it and prepare to quit */ |
681 | static void multifd_send_set_error(Error *err) | |
d32ca5ad | 682 | { |
15f3f21d PX |
683 | /* |
684 | * We don't want to exit each threads twice. Depending on where | |
685 | * we get the error, or if there are two independent errors in two | |
686 | * threads at the same time, we can end calling this function | |
687 | * twice. | |
688 | */ | |
689 | if (qatomic_xchg(&multifd_send_state->exiting, 1)) { | |
690 | return; | |
691 | } | |
692 | ||
d32ca5ad JQ |
693 | if (err) { |
694 | MigrationState *s = migrate_get_current(); | |
695 | migrate_set_error(s, err); | |
696 | if (s->state == MIGRATION_STATUS_SETUP || | |
697 | s->state == MIGRATION_STATUS_PRE_SWITCHOVER || | |
698 | s->state == MIGRATION_STATUS_DEVICE || | |
699 | s->state == MIGRATION_STATUS_ACTIVE) { | |
700 | migrate_set_state(&s->state, s->state, | |
701 | MIGRATION_STATUS_FAILED); | |
702 | } | |
703 | } | |
3ab4441d PX |
704 | } |
705 | ||
706 | static void multifd_send_terminate_threads(void) | |
707 | { | |
708 | int i; | |
709 | ||
710 | trace_multifd_send_terminate_threads(); | |
d32ca5ad | 711 | |
3ab4441d PX |
712 | /* |
713 | * Tell everyone we're quitting. No xchg() needed here; we simply | |
714 | * always set it. | |
715 | */ | |
716 | qatomic_set(&multifd_send_state->exiting, 1); | |
12808db3 PX |
717 | |
718 | /* | |
719 | * Firstly, kick all threads out; no matter whether they are just idle, | |
720 | * or blocked in an IO system call. | |
721 | */ | |
d32ca5ad JQ |
722 | for (i = 0; i < migrate_multifd_channels(); i++) { |
723 | MultiFDSendParams *p = &multifd_send_state->params[i]; | |
724 | ||
d32ca5ad | 725 | qemu_sem_post(&p->sem); |
077fbb59 LZ |
726 | if (p->c) { |
727 | qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); | |
728 | } | |
d32ca5ad | 729 | } |
12808db3 PX |
730 | |
731 | /* | |
732 | * Finally recycle all the threads. | |
12808db3 PX |
733 | */ |
734 | for (i = 0; i < migrate_multifd_channels(); i++) { | |
735 | MultiFDSendParams *p = &multifd_send_state->params[i]; | |
736 | ||
e1921f10 FR |
737 | if (p->tls_thread_created) { |
738 | qemu_thread_join(&p->tls_thread); | |
739 | } | |
740 | ||
a2a63c4a | 741 | if (p->thread_created) { |
12808db3 PX |
742 | qemu_thread_join(&p->thread); |
743 | } | |
744 | } | |
d32ca5ad JQ |
745 | } |
746 | ||
12808db3 PX |
/*
 * Release everything owned by one send channel, in this order:
 *  - unregister its QIOChannel from yank, then close and unref it
 *    (a close failure aborts, because of &error_abort);
 *  - destroy its semaphores;
 *  - free its name, pages, packet and iov buffers;
 *  - finally run the compression method's send_cleanup hook.
 *
 * @errp is dereferenced unconditionally, so it must be non-NULL.
 *
 * Returns true when *@errp is still NULL at the end, false otherwise.
 */
static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->c) {
        migration_ioc_unregister_yank(p->c);
        /*
         * The object_unref() cannot guarantee the fd will always be
         * released because finalize() of the iochannel is only
         * triggered on the last reference and it's not guaranteed
         * that we always hold the last refcount when reaching here.
         *
         * Closing the fd explicitly has the benefit that if there is any
         * registered I/O handler callbacks on such fd, that will get a
         * POLLNVAL event and will further trigger the cleanup to finally
         * release the IOC.
         *
         * FIXME: It should logically be guaranteed that all multifd
         * channels have no I/O handler callback registered when reaching
         * here, because migration thread will wait for all multifd channel
         * establishments to complete during setup.  Since
         * migrate_fd_cleanup() will be scheduled in main thread too, all
         * previous callbacks should guarantee to be completed when
         * reaching here.  See multifd_send_state.channels_created and its
         * usage.  In the future, we could replace this with an assert
         * making sure we're the last reference, or simply drop it if above
         * is more clear to be justified.
         */
        qio_channel_close(p->c, &error_abort);
        object_unref(OBJECT(p->c));
        p->c = NULL;
    }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    multifd_pages_clear(p->pages);
    p->pages = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->iov);
    p->iov = NULL;
    /* Compression-specific teardown runs last and may set *errp. */
    multifd_send_state->ops->send_cleanup(p, errp);

    return *errp == NULL;
}
792 | ||
793 | static void multifd_send_cleanup_state(void) | |
794 | { | |
b7b03eb6 | 795 | file_cleanup_outgoing_migration(); |
72b90b96 | 796 | socket_cleanup_outgoing_migration(); |
93fa9dc2 | 797 | qemu_sem_destroy(&multifd_send_state->channels_created); |
12808db3 PX |
798 | qemu_sem_destroy(&multifd_send_state->channels_ready); |
799 | g_free(multifd_send_state->params); | |
800 | multifd_send_state->params = NULL; | |
801 | multifd_pages_clear(multifd_send_state->pages); | |
802 | multifd_send_state->pages = NULL; | |
803 | g_free(multifd_send_state); | |
804 | multifd_send_state = NULL; | |
805 | } | |
806 | ||
cde85c37 | 807 | void multifd_send_shutdown(void) |
d32ca5ad JQ |
808 | { |
809 | int i; | |
810 | ||
51b07548 | 811 | if (!migrate_multifd()) { |
d32ca5ad JQ |
812 | return; |
813 | } | |
12808db3 | 814 | |
3ab4441d | 815 | multifd_send_terminate_threads(); |
d32ca5ad | 816 | |
d32ca5ad JQ |
817 | for (i = 0; i < migrate_multifd_channels(); i++) { |
818 | MultiFDSendParams *p = &multifd_send_state->params[i]; | |
ab7cbb0b | 819 | Error *local_err = NULL; |
d32ca5ad | 820 | |
12808db3 | 821 | if (!multifd_send_cleanup_channel(p, &local_err)) { |
ab7cbb0b | 822 | migrate_set_error(migrate_get_current(), local_err); |
13f2cb21 | 823 | error_free(local_err); |
ab7cbb0b | 824 | } |
d32ca5ad | 825 | } |
12808db3 PX |
826 | |
827 | multifd_send_cleanup_state(); | |
d32ca5ad JQ |
828 | } |
829 | ||
4cc47b43 LB |
830 | static int multifd_zero_copy_flush(QIOChannel *c) |
831 | { | |
832 | int ret; | |
833 | Error *err = NULL; | |
834 | ||
835 | ret = qio_channel_flush(c, &err); | |
836 | if (ret < 0) { | |
837 | error_report_err(err); | |
838 | return -1; | |
839 | } | |
840 | if (ret == 1) { | |
aff3f660 | 841 | stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1); |
4cc47b43 LB |
842 | } |
843 | ||
844 | return ret; | |
845 | } | |
846 | ||
/*
 * Synchronise all multifd send channels with the migration thread.
 *
 * Any partially filled queue is first handed to a channel. The sync then
 * runs in two phases:
 *  - ask every channel to sync: set pending_sync and post its sem;
 *  - for each channel, wait on channels_ready and then on that channel's
 *    sem_sync. The latter is presumably posted by multifd_send_thread()
 *    once the sync is handled; multifd_send_kick_main() also posts both
 *    on errors.
 * With zero-copy-send enabled, each channel's pending zero-copy writes
 * are flushed after its sync.
 *
 * Returns 0 on success (or when multifd is disabled). Returns -1 on
 * failure or when the send side is exiting.
 */
int multifd_send_sync_main(void)
{
    int i;
    bool flush_zero_copy;

    if (!migrate_multifd()) {
        return 0;
    }
    /* Hand any partially filled queue to a channel before syncing. */
    if (multifd_send_state->pages->num) {
        if (!multifd_send_pages()) {
            error_report("%s: multifd_send_pages fail", __func__);
            return -1;
        }
    }

    flush_zero_copy = migrate_zero_copy_send();

    /* Phase 1: request a sync from every channel. */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so not possible to be set by
         * others concurrently.
         */
        assert(qatomic_read(&p->pending_sync) == false);
        qatomic_set(&p->pending_sync, true);
        qemu_sem_post(&p->sem);
    }
    /* Phase 2: wait for each channel to acknowledge, then flush it. */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}
900 | ||
901 | static void *multifd_send_thread(void *opaque) | |
902 | { | |
903 | MultiFDSendParams *p = opaque; | |
1b1f4ab6 | 904 | MigrationThread *thread = NULL; |
d32ca5ad JQ |
905 | Error *local_err = NULL; |
906 | int ret = 0; | |
06833d83 | 907 | bool use_packets = multifd_use_packets(); |
d32ca5ad | 908 | |
788fa680 | 909 | thread = migration_threads_add(p->name, qemu_get_thread_id()); |
1b1f4ab6 | 910 | |
d32ca5ad JQ |
911 | trace_multifd_send_thread_start(p->id); |
912 | rcu_register_thread(); | |
913 | ||
06833d83 FR |
914 | if (use_packets) { |
915 | if (multifd_send_initial_packet(p, &local_err) < 0) { | |
916 | ret = -1; | |
917 | goto out; | |
918 | } | |
d32ca5ad | 919 | } |
d32ca5ad JQ |
920 | |
921 | while (true) { | |
d2026ee1 | 922 | qemu_sem_post(&multifd_send_state->channels_ready); |
d32ca5ad JQ |
923 | qemu_sem_wait(&p->sem); |
924 | ||
15f3f21d | 925 | if (multifd_send_should_exit()) { |
d32ca5ad JQ |
926 | break; |
927 | } | |
d32ca5ad | 928 | |
488c84ac PX |
929 | /* |
930 | * Read pending_job flag before p->pages. Pairs with the | |
931 | * qatomic_store_release() in multifd_send_pages(). | |
932 | */ | |
933 | if (qatomic_load_acquire(&p->pending_job)) { | |
efd8c543 | 934 | MultiFDPages_t *pages = p->pages; |
815956f0 | 935 | |
452b2057 | 936 | p->iovs_num = 0; |
83c560fb PX |
937 | assert(pages->num); |
938 | ||
939 | ret = multifd_send_state->ops->send_prepare(p, &local_err); | |
940 | if (ret != 0) { | |
83c560fb | 941 | break; |
ab7cbb0b | 942 | } |
83c560fb | 943 | |
f427d90b FR |
944 | if (migrate_mapped_ram()) { |
945 | ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num, | |
946 | p->pages->block, &local_err); | |
947 | } else { | |
948 | ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, | |
949 | NULL, 0, p->write_flags, | |
950 | &local_err); | |
951 | } | |
952 | ||
d32ca5ad JQ |
953 | if (ret != 0) { |
954 | break; | |
955 | } | |
956 | ||
68b6e000 EU |
957 | stat64_add(&mig_stats.multifd_bytes, |
958 | p->next_packet_size + p->packet_len); | |
303e6f54 HX |
959 | stat64_add(&mig_stats.normal_pages, pages->normal_num); |
960 | stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num); | |
836eca47 PX |
961 | |
962 | multifd_pages_reset(p->pages); | |
1618f552 | 963 | p->next_packet_size = 0; |
488c84ac PX |
964 | |
965 | /* | |
966 | * Making sure p->pages is published before saying "we're | |
967 | * free". Pairs with the smp_mb_acquire() in | |
968 | * multifd_send_pages(). | |
969 | */ | |
970 | qatomic_store_release(&p->pending_job, false); | |
859ebaf3 | 971 | } else { |
488c84ac PX |
972 | /* |
973 | * If not a normal job, must be a sync request. Note that | |
974 | * pending_sync is a standalone flag (unlike pending_job), so | |
975 | * it doesn't require explicit memory barriers. | |
976 | */ | |
859ebaf3 | 977 | assert(qatomic_read(&p->pending_sync)); |
06833d83 FR |
978 | |
979 | if (use_packets) { | |
980 | p->flags = MULTIFD_FLAG_SYNC; | |
981 | multifd_send_fill_packet(p); | |
982 | ret = qio_channel_write_all(p->c, (void *)p->packet, | |
983 | p->packet_len, &local_err); | |
984 | if (ret != 0) { | |
985 | break; | |
986 | } | |
987 | /* p->next_packet_size will always be zero for a SYNC packet */ | |
988 | stat64_add(&mig_stats.multifd_bytes, p->packet_len); | |
989 | p->flags = 0; | |
d32ca5ad | 990 | } |
06833d83 | 991 | |
f5f48a78 | 992 | qatomic_set(&p->pending_sync, false); |
f5f48a78 | 993 | qemu_sem_post(&p->sem_sync); |
d32ca5ad JQ |
994 | } |
995 | } | |
996 | ||
997 | out: | |
ee8a7c9c FR |
998 | if (ret) { |
999 | assert(local_err); | |
d32ca5ad | 1000 | trace_multifd_send_error(p->id); |
3ab4441d | 1001 | multifd_send_set_error(local_err); |
48c0f5d5 | 1002 | multifd_send_kick_main(p); |
ee8a7c9c | 1003 | error_free(local_err); |
d32ca5ad JQ |
1004 | } |
1005 | ||
d32ca5ad | 1006 | rcu_unregister_thread(); |
788fa680 | 1007 | migration_threads_remove(thread); |
303e6f54 HX |
1008 | trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages, |
1009 | p->total_zero_pages); | |
d32ca5ad JQ |
1010 | |
1011 | return NULL; | |
1012 | } | |
1013 | ||
2576ae48 | 1014 | static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque); |
29647140 | 1015 | |
9221e3c6 PX |
1016 | typedef struct { |
1017 | MultiFDSendParams *p; | |
1018 | QIOChannelTLS *tioc; | |
1019 | } MultiFDTLSThreadArgs; | |
1020 | ||
a1af605b CZ |
1021 | static void *multifd_tls_handshake_thread(void *opaque) |
1022 | { | |
9221e3c6 | 1023 | MultiFDTLSThreadArgs *args = opaque; |
a1af605b | 1024 | |
9221e3c6 | 1025 | qio_channel_tls_handshake(args->tioc, |
2576ae48 | 1026 | multifd_new_send_channel_async, |
9221e3c6 | 1027 | args->p, |
a1af605b CZ |
1028 | NULL, |
1029 | NULL); | |
9221e3c6 PX |
1030 | g_free(args); |
1031 | ||
a1af605b CZ |
1032 | return NULL; |
1033 | } | |
1034 | ||
967e3889 | 1035 | static bool multifd_tls_channel_connect(MultiFDSendParams *p, |
29647140 CZ |
1036 | QIOChannel *ioc, |
1037 | Error **errp) | |
1038 | { | |
1039 | MigrationState *s = migrate_get_current(); | |
7f692ec7 | 1040 | const char *hostname = s->hostname; |
9221e3c6 | 1041 | MultiFDTLSThreadArgs *args; |
29647140 CZ |
1042 | QIOChannelTLS *tioc; |
1043 | ||
0deb7e9b | 1044 | tioc = migration_tls_client_create(ioc, hostname, errp); |
29647140 | 1045 | if (!tioc) { |
967e3889 | 1046 | return false; |
29647140 CZ |
1047 | } |
1048 | ||
2576ae48 FR |
1049 | /* |
1050 | * Ownership of the socket channel now transfers to the newly | |
1051 | * created TLS channel, which has already taken a reference. | |
1052 | */ | |
9e842408 | 1053 | object_unref(OBJECT(ioc)); |
894f0214 | 1054 | trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname); |
29647140 | 1055 | qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing"); |
9221e3c6 PX |
1056 | |
1057 | args = g_new0(MultiFDTLSThreadArgs, 1); | |
1058 | args->tioc = tioc; | |
1059 | args->p = p; | |
e1921f10 FR |
1060 | |
1061 | p->tls_thread_created = true; | |
1062 | qemu_thread_create(&p->tls_thread, "multifd-tls-handshake-worker", | |
9221e3c6 | 1063 | multifd_tls_handshake_thread, args, |
a1af605b | 1064 | QEMU_THREAD_JOINABLE); |
967e3889 | 1065 | return true; |
29647140 CZ |
1066 | } |
1067 | ||
b7b03eb6 | 1068 | void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc) |
29647140 | 1069 | { |
2576ae48 | 1070 | qio_channel_set_delay(ioc, false); |
a4395f5d AH |
1071 | |
1072 | migration_ioc_register_yank(ioc); | |
9221e3c6 | 1073 | /* Setup p->c only if the channel is completely setup */ |
a4395f5d | 1074 | p->c = ioc; |
a2a63c4a FR |
1075 | |
1076 | p->thread_created = true; | |
a4395f5d AH |
1077 | qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, |
1078 | QEMU_THREAD_JOINABLE); | |
29647140 CZ |
1079 | } |
1080 | ||
2576ae48 FR |
1081 | /* |
1082 | * When TLS is enabled this function is called once to establish the | |
1083 | * TLS connection and a second time after the TLS handshake to create | |
1084 | * the multifd channel. Without TLS it goes straight into the channel | |
1085 | * creation. | |
1086 | */ | |
d32ca5ad JQ |
1087 | static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) |
1088 | { | |
1089 | MultiFDSendParams *p = opaque; | |
0e92f644 | 1090 | QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task)); |
d32ca5ad | 1091 | Error *local_err = NULL; |
2576ae48 | 1092 | bool ret; |
d32ca5ad JQ |
1093 | |
1094 | trace_multifd_new_send_channel_async(p->id); | |
2576ae48 FR |
1095 | |
1096 | if (qio_task_propagate_error(task, &local_err)) { | |
1097 | ret = false; | |
1098 | goto out; | |
1099 | } | |
1100 | ||
1101 | trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)), | |
1102 | migrate_get_current()->hostname); | |
1103 | ||
1104 | if (migrate_channel_requires_tls_upgrade(ioc)) { | |
1105 | ret = multifd_tls_channel_connect(p, ioc, &local_err); | |
93fa9dc2 FR |
1106 | if (ret) { |
1107 | return; | |
1108 | } | |
2576ae48 | 1109 | } else { |
770de49c PX |
1110 | multifd_channel_connect(p, ioc); |
1111 | ret = true; | |
d32ca5ad | 1112 | } |
03c7a42d | 1113 | |
93fa9dc2 FR |
1114 | out: |
1115 | /* | |
1116 | * Here we're not interested whether creation succeeded, only that | |
1117 | * it happened at all. | |
1118 | */ | |
a8a3e710 | 1119 | multifd_send_channel_created(); |
93fa9dc2 | 1120 | |
2576ae48 FR |
1121 | if (ret) { |
1122 | return; | |
1123 | } | |
1124 | ||
967e3889 | 1125 | trace_multifd_new_send_channel_async_error(p->id, local_err); |
3ab4441d | 1126 | multifd_send_set_error(local_err); |
9221e3c6 PX |
1127 | /* |
1128 | * For error cases (TLS or non-TLS), IO channel is always freed here | |
1129 | * rather than when cleanup multifd: since p->c is not set, multifd | |
1130 | * cleanup code doesn't even know its existence. | |
1131 | */ | |
1132 | object_unref(OBJECT(ioc)); | |
15f3f21d | 1133 | error_free(local_err); |
0e92f644 FR |
1134 | } |
1135 | ||
b7b03eb6 | 1136 | static bool multifd_new_send_channel_create(gpointer opaque, Error **errp) |
0e92f644 | 1137 | { |
b7b03eb6 FR |
1138 | if (!multifd_use_packets()) { |
1139 | return file_send_channel_create(opaque, errp); | |
1140 | } | |
1141 | ||
0e92f644 | 1142 | socket_send_channel_create(multifd_new_send_channel_async, opaque); |
b7b03eb6 | 1143 | return true; |
d32ca5ad JQ |
1144 | } |
1145 | ||
bd8b0a8f | 1146 | bool multifd_send_setup(void) |
d32ca5ad | 1147 | { |
bd8b0a8f FR |
1148 | MigrationState *s = migrate_get_current(); |
1149 | Error *local_err = NULL; | |
1150 | int thread_count, ret = 0; | |
d32ca5ad | 1151 | uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); |
06833d83 | 1152 | bool use_packets = multifd_use_packets(); |
d32ca5ad JQ |
1153 | uint8_t i; |
1154 | ||
51b07548 | 1155 | if (!migrate_multifd()) { |
bd8b0a8f | 1156 | return true; |
d32ca5ad | 1157 | } |
b7acd657 | 1158 | |
d32ca5ad JQ |
1159 | thread_count = migrate_multifd_channels(); |
1160 | multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); | |
1161 | multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); | |
1162 | multifd_send_state->pages = multifd_pages_init(page_count); | |
93fa9dc2 | 1163 | qemu_sem_init(&multifd_send_state->channels_created, 0); |
d32ca5ad | 1164 | qemu_sem_init(&multifd_send_state->channels_ready, 0); |
d73415a3 | 1165 | qatomic_set(&multifd_send_state->exiting, 0); |
ab7cbb0b | 1166 | multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; |
d32ca5ad JQ |
1167 | |
1168 | for (i = 0; i < thread_count; i++) { | |
1169 | MultiFDSendParams *p = &multifd_send_state->params[i]; | |
1170 | ||
d32ca5ad JQ |
1171 | qemu_sem_init(&p->sem, 0); |
1172 | qemu_sem_init(&p->sem_sync, 0); | |
d32ca5ad JQ |
1173 | p->id = i; |
1174 | p->pages = multifd_pages_init(page_count); | |
06833d83 FR |
1175 | |
1176 | if (use_packets) { | |
1177 | p->packet_len = sizeof(MultiFDPacket_t) | |
1178 | + sizeof(uint64_t) * page_count; | |
1179 | p->packet = g_malloc0(p->packet_len); | |
1180 | p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); | |
1181 | p->packet->version = cpu_to_be32(MULTIFD_VERSION); | |
1182 | ||
1183 | /* We need one extra place for the packet header */ | |
1184 | p->iov = g_new0(struct iovec, page_count + 1); | |
1185 | } else { | |
1186 | p->iov = g_new0(struct iovec, page_count); | |
1187 | } | |
d32ca5ad | 1188 | p->name = g_strdup_printf("multifdsend_%d", i); |
ddec20f8 | 1189 | p->page_size = qemu_target_page_size(); |
d6f45eba | 1190 | p->page_count = page_count; |
25a1f878 | 1191 | p->write_flags = 0; |
b7b03eb6 FR |
1192 | |
1193 | if (!multifd_new_send_channel_create(p, &local_err)) { | |
1194 | return false; | |
1195 | } | |
d32ca5ad | 1196 | } |
ab7cbb0b | 1197 | |
93fa9dc2 FR |
1198 | /* |
1199 | * Wait until channel creation has started for all channels. The | |
1200 | * creation can still fail, but no more channels will be created | |
1201 | * past this point. | |
1202 | */ | |
1203 | for (i = 0; i < thread_count; i++) { | |
1204 | qemu_sem_wait(&multifd_send_state->channels_created); | |
1205 | } | |
1206 | ||
ab7cbb0b JQ |
1207 | for (i = 0; i < thread_count; i++) { |
1208 | MultiFDSendParams *p = &multifd_send_state->params[i]; | |
ab7cbb0b | 1209 | |
bd8b0a8f | 1210 | ret = multifd_send_state->ops->send_setup(p, &local_err); |
ab7cbb0b | 1211 | if (ret) { |
bd8b0a8f | 1212 | break; |
ab7cbb0b JQ |
1213 | } |
1214 | } | |
bd8b0a8f FR |
1215 | |
1216 | if (ret) { | |
1217 | migrate_set_error(s, local_err); | |
1218 | error_report_err(local_err); | |
1219 | migrate_set_state(&s->state, MIGRATION_STATUS_SETUP, | |
1220 | MIGRATION_STATUS_FAILED); | |
1221 | return false; | |
1222 | } | |
1223 | ||
1224 | return true; | |
d32ca5ad JQ |
1225 | } |
1226 | ||
d117ed06 FR |
1227 | bool multifd_recv(void) |
1228 | { | |
1229 | int i; | |
1230 | static int next_recv_channel; | |
1231 | MultiFDRecvParams *p = NULL; | |
1232 | MultiFDRecvData *data = multifd_recv_state->data; | |
1233 | ||
1234 | /* | |
1235 | * next_channel can remain from a previous migration that was | |
1236 | * using more channels, so ensure it doesn't overflow if the | |
1237 | * limit is lower now. | |
1238 | */ | |
1239 | next_recv_channel %= migrate_multifd_channels(); | |
1240 | for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) { | |
1241 | if (multifd_recv_should_exit()) { | |
1242 | return false; | |
1243 | } | |
1244 | ||
1245 | p = &multifd_recv_state->params[i]; | |
1246 | ||
1247 | if (qatomic_read(&p->pending_job) == false) { | |
1248 | next_recv_channel = (i + 1) % migrate_multifd_channels(); | |
1249 | break; | |
1250 | } | |
1251 | } | |
1252 | ||
1253 | /* | |
1254 | * Order pending_job read before manipulating p->data below. Pairs | |
1255 | * with qatomic_store_release() at multifd_recv_thread(). | |
1256 | */ | |
1257 | smp_mb_acquire(); | |
1258 | ||
1259 | assert(!p->data->size); | |
1260 | multifd_recv_state->data = p->data; | |
1261 | p->data = data; | |
1262 | ||
1263 | /* | |
1264 | * Order p->data update before setting pending_job. Pairs with | |
1265 | * qatomic_load_acquire() at multifd_recv_thread(). | |
1266 | */ | |
1267 | qatomic_store_release(&p->pending_job, true); | |
1268 | qemu_sem_post(&p->sem); | |
1269 | ||
1270 | return true; | |
1271 | } | |
1272 | ||
1273 | MultiFDRecvData *multifd_get_recv_data(void) | |
1274 | { | |
1275 | return multifd_recv_state->data; | |
1276 | } | |
1277 | ||
d32ca5ad JQ |
1278 | static void multifd_recv_terminate_threads(Error *err) |
1279 | { | |
1280 | int i; | |
1281 | ||
1282 | trace_multifd_recv_terminate_threads(err != NULL); | |
1283 | ||
11dd7be5 FR |
1284 | if (qatomic_xchg(&multifd_recv_state->exiting, 1)) { |
1285 | return; | |
1286 | } | |
1287 | ||
d32ca5ad JQ |
1288 | if (err) { |
1289 | MigrationState *s = migrate_get_current(); | |
1290 | migrate_set_error(s, err); | |
1291 | if (s->state == MIGRATION_STATUS_SETUP || | |
1292 | s->state == MIGRATION_STATUS_ACTIVE) { | |
1293 | migrate_set_state(&s->state, s->state, | |
1294 | MIGRATION_STATUS_FAILED); | |
1295 | } | |
1296 | } | |
1297 | ||
1298 | for (i = 0; i < migrate_multifd_channels(); i++) { | |
1299 | MultiFDRecvParams *p = &multifd_recv_state->params[i]; | |
1300 | ||
d13f0026 | 1301 | /* |
d117ed06 FR |
1302 | * The migration thread and channels interact differently |
1303 | * depending on the presence of packets. | |
d13f0026 | 1304 | */ |
06833d83 | 1305 | if (multifd_use_packets()) { |
d117ed06 FR |
1306 | /* |
1307 | * The channel receives as long as there are packets. When | |
1308 | * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the | |
1309 | * channel waits for the migration thread to sync. If the | |
1310 | * sync never happens, do it here. | |
1311 | */ | |
06833d83 | 1312 | qemu_sem_post(&p->sem_sync); |
d117ed06 FR |
1313 | } else { |
1314 | /* | |
1315 | * The channel waits for the migration thread to give it | |
1316 | * work. When the migration thread runs out of work, it | |
1317 | * releases the channel and waits for any pending work to | |
1318 | * finish. If we reach here (e.g. due to error) before the | |
1319 | * work runs out, release the channel. | |
1320 | */ | |
1321 | qemu_sem_post(&p->sem); | |
06833d83 | 1322 | } |
d13f0026 | 1323 | |
d32ca5ad JQ |
1324 | /* |
1325 | * We could arrive here for two reasons: | |
1326 | * - normal quit, i.e. everything went fine, just finished | |
1327 | * - error quit: We close the channels so the channel threads | |
1328 | * finish the qio_channel_read_all_eof() | |
1329 | */ | |
1330 | if (p->c) { | |
1331 | qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); | |
1332 | } | |
d32ca5ad JQ |
1333 | } |
1334 | } | |
1335 | ||
/*
 * Stop the multifd receive channels without an error.  Does nothing when
 * multifd is not enabled.
 */
void multifd_recv_shutdown(void)
{
    if (!migrate_multifd()) {
        return;
    }

    multifd_recv_terminate_threads(NULL);
}
1342 | ||
5e6ea8a1 PX |
1343 | static void multifd_recv_cleanup_channel(MultiFDRecvParams *p) |
1344 | { | |
1345 | migration_ioc_unregister_yank(p->c); | |
1346 | object_unref(OBJECT(p->c)); | |
1347 | p->c = NULL; | |
1348 | qemu_mutex_destroy(&p->mutex); | |
1349 | qemu_sem_destroy(&p->sem_sync); | |
d117ed06 | 1350 | qemu_sem_destroy(&p->sem); |
5e6ea8a1 PX |
1351 | g_free(p->name); |
1352 | p->name = NULL; | |
1353 | p->packet_len = 0; | |
1354 | g_free(p->packet); | |
1355 | p->packet = NULL; | |
1356 | g_free(p->iov); | |
1357 | p->iov = NULL; | |
1358 | g_free(p->normal); | |
1359 | p->normal = NULL; | |
303e6f54 HX |
1360 | g_free(p->zero); |
1361 | p->zero = NULL; | |
5e6ea8a1 PX |
1362 | multifd_recv_state->ops->recv_cleanup(p); |
1363 | } | |
1364 | ||
1365 | static void multifd_recv_cleanup_state(void) | |
1366 | { | |
1367 | qemu_sem_destroy(&multifd_recv_state->sem_sync); | |
1368 | g_free(multifd_recv_state->params); | |
1369 | multifd_recv_state->params = NULL; | |
d117ed06 FR |
1370 | g_free(multifd_recv_state->data); |
1371 | multifd_recv_state->data = NULL; | |
5e6ea8a1 PX |
1372 | g_free(multifd_recv_state); |
1373 | multifd_recv_state = NULL; | |
1374 | } | |
1375 | ||
cde85c37 | 1376 | void multifd_recv_cleanup(void) |
d32ca5ad JQ |
1377 | { |
1378 | int i; | |
d32ca5ad | 1379 | |
51b07548 | 1380 | if (!migrate_multifd()) { |
e5bac1f5 | 1381 | return; |
d32ca5ad JQ |
1382 | } |
1383 | multifd_recv_terminate_threads(NULL); | |
1384 | for (i = 0; i < migrate_multifd_channels(); i++) { | |
1385 | MultiFDRecvParams *p = &multifd_recv_state->params[i]; | |
1386 | ||
a2a63c4a FR |
1387 | if (p->thread_created) { |
1388 | qemu_thread_join(&p->thread); | |
1389 | } | |
d32ca5ad JQ |
1390 | } |
1391 | for (i = 0; i < migrate_multifd_channels(); i++) { | |
5e6ea8a1 | 1392 | multifd_recv_cleanup_channel(&multifd_recv_state->params[i]); |
d32ca5ad | 1393 | } |
5e6ea8a1 | 1394 | multifd_recv_cleanup_state(); |
d32ca5ad JQ |
1395 | } |
1396 | ||
1397 | void multifd_recv_sync_main(void) | |
1398 | { | |
4aac6b1e | 1399 | int thread_count = migrate_multifd_channels(); |
a49d15a3 | 1400 | bool file_based = !multifd_use_packets(); |
d32ca5ad JQ |
1401 | int i; |
1402 | ||
a49d15a3 | 1403 | if (!migrate_multifd()) { |
d32ca5ad JQ |
1404 | return; |
1405 | } | |
d32ca5ad | 1406 | |
a49d15a3 FR |
1407 | /* |
1408 | * File-based channels don't use packets and therefore need to | |
1409 | * wait for more work. Release them to start the sync. | |
1410 | */ | |
1411 | if (file_based) { | |
1412 | for (i = 0; i < thread_count; i++) { | |
1413 | MultiFDRecvParams *p = &multifd_recv_state->params[i]; | |
1414 | ||
1415 | trace_multifd_recv_sync_main_signal(p->id); | |
1416 | qemu_sem_post(&p->sem); | |
1417 | } | |
1418 | } | |
1419 | ||
4aac6b1e FR |
1420 | /* |
1421 | * Initiate the synchronization by waiting for all channels. | |
a49d15a3 | 1422 | * |
4aac6b1e FR |
1423 | * For socket-based migration this means each channel has received |
1424 | * the SYNC packet on the stream. | |
a49d15a3 FR |
1425 | * |
1426 | * For file-based migration this means each channel is done with | |
1427 | * the work (pending_job=false). | |
4aac6b1e FR |
1428 | */ |
1429 | for (i = 0; i < thread_count; i++) { | |
1430 | trace_multifd_recv_sync_main_wait(i); | |
d32ca5ad JQ |
1431 | qemu_sem_wait(&multifd_recv_state->sem_sync); |
1432 | } | |
4aac6b1e | 1433 | |
a49d15a3 FR |
1434 | if (file_based) { |
1435 | /* | |
1436 | * For file-based loading is done in one iteration. We're | |
1437 | * done. | |
1438 | */ | |
1439 | return; | |
1440 | } | |
1441 | ||
4aac6b1e FR |
1442 | /* |
1443 | * Sync done. Release the channels for the next iteration. | |
1444 | */ | |
1445 | for (i = 0; i < thread_count; i++) { | |
d32ca5ad JQ |
1446 | MultiFDRecvParams *p = &multifd_recv_state->params[i]; |
1447 | ||
6e8a355d DB |
1448 | WITH_QEMU_LOCK_GUARD(&p->mutex) { |
1449 | if (multifd_recv_state->packet_num < p->packet_num) { | |
1450 | multifd_recv_state->packet_num = p->packet_num; | |
1451 | } | |
d32ca5ad | 1452 | } |
d32ca5ad JQ |
1453 | trace_multifd_recv_sync_main_signal(p->id); |
1454 | qemu_sem_post(&p->sem_sync); | |
1455 | } | |
1456 | trace_multifd_recv_sync_main(multifd_recv_state->packet_num); | |
1457 | } | |
1458 | ||
1459 | static void *multifd_recv_thread(void *opaque) | |
1460 | { | |
1461 | MultiFDRecvParams *p = opaque; | |
1462 | Error *local_err = NULL; | |
06833d83 | 1463 | bool use_packets = multifd_use_packets(); |
d32ca5ad JQ |
1464 | int ret; |
1465 | ||
1466 | trace_multifd_recv_thread_start(p->id); | |
1467 | rcu_register_thread(); | |
1468 | ||
1469 | while (true) { | |
06833d83 | 1470 | uint32_t flags = 0; |
9db19125 FR |
1471 | bool has_data = false; |
1472 | p->normal_num = 0; | |
d32ca5ad | 1473 | |
06833d83 | 1474 | if (use_packets) { |
d117ed06 FR |
1475 | if (multifd_recv_should_exit()) { |
1476 | break; | |
1477 | } | |
1478 | ||
06833d83 FR |
1479 | ret = qio_channel_read_all_eof(p->c, (void *)p->packet, |
1480 | p->packet_len, &local_err); | |
1481 | if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */ | |
1482 | break; | |
1483 | } | |
d32ca5ad | 1484 | |
06833d83 FR |
1485 | qemu_mutex_lock(&p->mutex); |
1486 | ret = multifd_recv_unfill_packet(p, &local_err); | |
1487 | if (ret) { | |
1488 | qemu_mutex_unlock(&p->mutex); | |
1489 | break; | |
1490 | } | |
1491 | ||
1492 | flags = p->flags; | |
1493 | /* recv methods don't know how to handle the SYNC flag */ | |
1494 | p->flags &= ~MULTIFD_FLAG_SYNC; | |
303e6f54 | 1495 | has_data = p->normal_num || p->zero_num; |
d32ca5ad | 1496 | qemu_mutex_unlock(&p->mutex); |
d117ed06 FR |
1497 | } else { |
1498 | /* | |
1499 | * No packets, so we need to wait for the vmstate code to | |
1500 | * give us work. | |
1501 | */ | |
1502 | qemu_sem_wait(&p->sem); | |
1503 | ||
1504 | if (multifd_recv_should_exit()) { | |
1505 | break; | |
1506 | } | |
1507 | ||
1508 | /* pairs with qatomic_store_release() at multifd_recv() */ | |
1509 | if (!qatomic_load_acquire(&p->pending_job)) { | |
1510 | /* | |
1511 | * Migration thread did not send work, this is | |
1512 | * equivalent to pending_sync on the sending | |
1513 | * side. Post sem_sync to notify we reached this | |
1514 | * point. | |
1515 | */ | |
1516 | qemu_sem_post(&multifd_recv_state->sem_sync); | |
1517 | continue; | |
1518 | } | |
1519 | ||
1520 | has_data = !!p->data->size; | |
d32ca5ad JQ |
1521 | } |
1522 | ||
9db19125 FR |
1523 | if (has_data) { |
1524 | ret = multifd_recv_state->ops->recv(p, &local_err); | |
d32ca5ad JQ |
1525 | if (ret != 0) { |
1526 | break; | |
1527 | } | |
1528 | } | |
1529 | ||
06833d83 FR |
1530 | if (use_packets) { |
1531 | if (flags & MULTIFD_FLAG_SYNC) { | |
1532 | qemu_sem_post(&multifd_recv_state->sem_sync); | |
1533 | qemu_sem_wait(&p->sem_sync); | |
1534 | } | |
d117ed06 FR |
1535 | } else { |
1536 | p->total_normal_pages += p->data->size / qemu_target_page_size(); | |
1537 | p->data->size = 0; | |
1538 | /* | |
1539 | * Order data->size update before clearing | |
1540 | * pending_job. Pairs with smp_mb_acquire() at | |
1541 | * multifd_recv(). | |
1542 | */ | |
1543 | qatomic_store_release(&p->pending_job, false); | |
d32ca5ad JQ |
1544 | } |
1545 | } | |
1546 | ||
1547 | if (local_err) { | |
1548 | multifd_recv_terminate_threads(local_err); | |
13f2cb21 | 1549 | error_free(local_err); |
d32ca5ad | 1550 | } |
d32ca5ad JQ |
1551 | |
1552 | rcu_unregister_thread(); | |
303e6f54 HX |
1553 | trace_multifd_recv_thread_end(p->id, p->packets_recved, |
1554 | p->total_normal_pages, | |
1555 | p->total_zero_pages); | |
d32ca5ad JQ |
1556 | |
1557 | return NULL; | |
1558 | } | |
1559 | ||
cde85c37 | 1560 | int multifd_recv_setup(Error **errp) |
d32ca5ad JQ |
1561 | { |
1562 | int thread_count; | |
1563 | uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); | |
06833d83 | 1564 | bool use_packets = multifd_use_packets(); |
d32ca5ad JQ |
1565 | uint8_t i; |
1566 | ||
6720c2b3 | 1567 | /* |
1568 | * Return successfully if multiFD recv state is already initialised | |
1569 | * or multiFD is not enabled. | |
1570 | */ | |
51b07548 | 1571 | if (multifd_recv_state || !migrate_multifd()) { |
d32ca5ad JQ |
1572 | return 0; |
1573 | } | |
6720c2b3 | 1574 | |
d32ca5ad JQ |
1575 | thread_count = migrate_multifd_channels(); |
1576 | multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); | |
1577 | multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); | |
d117ed06 FR |
1578 | |
1579 | multifd_recv_state->data = g_new0(MultiFDRecvData, 1); | |
1580 | multifd_recv_state->data->size = 0; | |
1581 | ||
d73415a3 | 1582 | qatomic_set(&multifd_recv_state->count, 0); |
11dd7be5 | 1583 | qatomic_set(&multifd_recv_state->exiting, 0); |
d32ca5ad | 1584 | qemu_sem_init(&multifd_recv_state->sem_sync, 0); |
ab7cbb0b | 1585 | multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()]; |
d32ca5ad JQ |
1586 | |
1587 | for (i = 0; i < thread_count; i++) { | |
1588 | MultiFDRecvParams *p = &multifd_recv_state->params[i]; | |
1589 | ||
1590 | qemu_mutex_init(&p->mutex); | |
1591 | qemu_sem_init(&p->sem_sync, 0); | |
d117ed06 FR |
1592 | qemu_sem_init(&p->sem, 0); |
1593 | p->pending_job = false; | |
d32ca5ad | 1594 | p->id = i; |
06833d83 | 1595 | |
d117ed06 FR |
1596 | p->data = g_new0(MultiFDRecvData, 1); |
1597 | p->data->size = 0; | |
1598 | ||
06833d83 FR |
1599 | if (use_packets) { |
1600 | p->packet_len = sizeof(MultiFDPacket_t) | |
1601 | + sizeof(uint64_t) * page_count; | |
1602 | p->packet = g_malloc0(p->packet_len); | |
1603 | } | |
d32ca5ad | 1604 | p->name = g_strdup_printf("multifdrecv_%d", i); |
226468ba | 1605 | p->iov = g_new0(struct iovec, page_count); |
cf2d4aa8 | 1606 | p->normal = g_new0(ram_addr_t, page_count); |
303e6f54 | 1607 | p->zero = g_new0(ram_addr_t, page_count); |
d6f45eba | 1608 | p->page_count = page_count; |
ddec20f8 | 1609 | p->page_size = qemu_target_page_size(); |
d32ca5ad | 1610 | } |
ab7cbb0b JQ |
1611 | |
1612 | for (i = 0; i < thread_count; i++) { | |
1613 | MultiFDRecvParams *p = &multifd_recv_state->params[i]; | |
ab7cbb0b JQ |
1614 | int ret; |
1615 | ||
3fc58efa | 1616 | ret = multifd_recv_state->ops->recv_setup(p, errp); |
ab7cbb0b | 1617 | if (ret) { |
ab7cbb0b JQ |
1618 | return ret; |
1619 | } | |
1620 | } | |
d32ca5ad JQ |
1621 | return 0; |
1622 | } | |
1623 | ||
1624 | bool multifd_recv_all_channels_created(void) | |
1625 | { | |
1626 | int thread_count = migrate_multifd_channels(); | |
1627 | ||
51b07548 | 1628 | if (!migrate_multifd()) { |
d32ca5ad JQ |
1629 | return true; |
1630 | } | |
1631 | ||
a59136f3 DDAG |
1632 | if (!multifd_recv_state) { |
1633 | /* Called before any connections created */ | |
1634 | return false; | |
1635 | } | |
1636 | ||
d73415a3 | 1637 | return thread_count == qatomic_read(&multifd_recv_state->count); |
d32ca5ad JQ |
1638 | } |
1639 | ||
1640 | /* | |
1641 | * Try to receive all multifd channels to get ready for the migration. | |
6720c2b3 | 1642 | * Sets @errp when failing to receive the current channel. |
d32ca5ad | 1643 | */ |
6720c2b3 | 1644 | void multifd_recv_new_channel(QIOChannel *ioc, Error **errp) |
d32ca5ad JQ |
1645 | { |
1646 | MultiFDRecvParams *p; | |
1647 | Error *local_err = NULL; | |
06833d83 | 1648 | bool use_packets = multifd_use_packets(); |
d32ca5ad JQ |
1649 | int id; |
1650 | ||
06833d83 FR |
1651 | if (use_packets) { |
1652 | id = multifd_recv_initial_packet(ioc, &local_err); | |
1653 | if (id < 0) { | |
1654 | multifd_recv_terminate_threads(local_err); | |
1655 | error_propagate_prepend(errp, local_err, | |
1656 | "failed to receive packet" | |
1657 | " via multifd channel %d: ", | |
1658 | qatomic_read(&multifd_recv_state->count)); | |
1659 | return; | |
1660 | } | |
1661 | trace_multifd_recv_new_channel(id); | |
1662 | } else { | |
2dd7ee7a | 1663 | id = qatomic_read(&multifd_recv_state->count); |
d32ca5ad | 1664 | } |
d32ca5ad JQ |
1665 | |
1666 | p = &multifd_recv_state->params[id]; | |
1667 | if (p->c != NULL) { | |
1668 | error_setg(&local_err, "multifd: received id '%d' already setup'", | |
1669 | id); | |
1670 | multifd_recv_terminate_threads(local_err); | |
1671 | error_propagate(errp, local_err); | |
6720c2b3 | 1672 | return; |
d32ca5ad JQ |
1673 | } |
1674 | p->c = ioc; | |
1675 | object_ref(OBJECT(ioc)); | |
d32ca5ad | 1676 | |
a2a63c4a | 1677 | p->thread_created = true; |
d32ca5ad JQ |
1678 | qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, |
1679 | QEMU_THREAD_JOINABLE); | |
d73415a3 | 1680 | qatomic_inc(&multifd_recv_state->count); |
d32ca5ad | 1681 | } |
303e6f54 HX |
1682 | |
1683 | bool multifd_send_prepare_common(MultiFDSendParams *p) | |
1684 | { | |
1685 | multifd_send_zero_page_detect(p); | |
1686 | ||
1687 | if (!p->pages->normal_num) { | |
1688 | p->next_packet_size = 0; | |
1689 | return false; | |
1690 | } | |
1691 | ||
1692 | multifd_send_prepare_header(p); | |
1693 | ||
1694 | return true; | |
1695 | } |