]> git.proxmox.com Git - mirror_qemu.git/blob - net/colo-compare.c
Use DECLARE_*CHECKER* macros
[mirror_qemu.git] / net / colo-compare.c
1 /*
2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3 * (a.k.a. Fault Tolerance or Continuous Replication)
4 *
5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6 * Copyright (c) 2016 FUJITSU LIMITED
7 * Copyright (c) 2016 Intel Corporation
8 *
9 * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
10 *
11 * This work is licensed under the terms of the GNU GPL, version 2 or
12 * later. See the COPYING file in the top-level directory.
13 */
14
15 #include "qemu/osdep.h"
16 #include "qemu-common.h"
17 #include "qemu/error-report.h"
18 #include "trace.h"
19 #include "qapi/error.h"
20 #include "net/net.h"
21 #include "net/eth.h"
22 #include "qom/object_interfaces.h"
23 #include "qemu/iov.h"
24 #include "qom/object.h"
25 #include "net/queue.h"
26 #include "chardev/char-fe.h"
27 #include "qemu/sockets.h"
28 #include "colo.h"
29 #include "sysemu/iothread.h"
30 #include "net/colo-compare.h"
31 #include "migration/colo.h"
32 #include "migration/migration.h"
33 #include "util.h"
34
35 #include "block/aio-wait.h"
36 #include "qemu/coroutine.h"
37
38 #define TYPE_COLO_COMPARE "colo-compare"
39 typedef struct CompareState CompareState;
40 DECLARE_INSTANCE_CHECKER(CompareState, COLO_COMPARE,
41 TYPE_COLO_COMPARE)
42
43 static QTAILQ_HEAD(, CompareState) net_compares =
44 QTAILQ_HEAD_INITIALIZER(net_compares);
45
46 static NotifierList colo_compare_notifiers =
47 NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);
48
49 #define COMPARE_READ_LEN_MAX NET_BUFSIZE
50 #define MAX_QUEUE_SIZE 1024
51
52 #define COLO_COMPARE_FREE_PRIMARY 0x01
53 #define COLO_COMPARE_FREE_SECONDARY 0x02
54
55 #define REGULAR_PACKET_CHECK_MS 3000
56 #define DEFAULT_TIME_OUT_MS 3000
57
58 static QemuMutex colo_compare_mutex;
59 static bool colo_compare_active;
60 static QemuMutex event_mtx;
61 static QemuCond event_complete_cond;
62 static int event_unhandled_count;
63 static uint32_t max_queue_size;
64
65 /*
66 * + CompareState ++
67 * | |
68 * +---------------+ +---------------+ +---------------+
69 * | conn list + - > conn + ------- > conn + -- > ......
70 * +---------------+ +---------------+ +---------------+
71 * | | | | | |
72 * +---------------+ +---v----+ +---v----+ +---v----+ +---v----+
73 * |primary | |secondary |primary | |secondary
74 * |packet | |packet + |packet | |packet +
75 * +--------+ +--------+ +--------+ +--------+
76 * | | | |
77 * +---v----+ +---v----+ +---v----+ +---v----+
78 * |primary | |secondary |primary | |secondary
79 * |packet | |packet + |packet | |packet +
80 * +--------+ +--------+ +--------+ +--------+
81 * | | | |
82 * +---v----+ +---v----+ +---v----+ +---v----+
83 * |primary | |secondary |primary | |secondary
84 * |packet | |packet + |packet | |packet +
85 * +--------+ +--------+ +--------+ +--------+
86 */
87
88 typedef struct SendCo {
89 Coroutine *co;
90 struct CompareState *s;
91 CharBackend *chr;
92 GQueue send_list;
93 bool notify_remote_frame;
94 bool done;
95 int ret;
96 } SendCo;
97
/* One buffer waiting on a SendCo send_list. */
typedef struct SendEntry {
    uint32_t size;          /* number of bytes in buf */
    uint32_t vnet_hdr_len;  /* sent before the payload when vnet_hdr is on */
    uint8_t *buf;           /* released with g_free() once processed */
} SendEntry;
103
104 struct CompareState {
105 Object parent;
106
107 char *pri_indev;
108 char *sec_indev;
109 char *outdev;
110 char *notify_dev;
111 CharBackend chr_pri_in;
112 CharBackend chr_sec_in;
113 CharBackend chr_out;
114 CharBackend chr_notify_dev;
115 SocketReadState pri_rs;
116 SocketReadState sec_rs;
117 SocketReadState notify_rs;
118 SendCo out_sendco;
119 SendCo notify_sendco;
120 bool vnet_hdr;
121 uint32_t compare_timeout;
122 uint32_t expired_scan_cycle;
123
124 /*
125 * Record the connection that through the NIC
126 * Element type: Connection
127 */
128 GQueue conn_list;
129 /* Record the connection without repetition */
130 GHashTable *connection_track_table;
131
132 IOThread *iothread;
133 GMainContext *worker_context;
134 QEMUTimer *packet_check_timer;
135
136 QEMUBH *event_bh;
137 enum colo_event event;
138
139 QTAILQ_ENTRY(CompareState) next;
140 };
141
142 typedef struct CompareClass {
143 ObjectClass parent_class;
144 } CompareClass;
145
/* Input side a packet arrived on; also indexes colo_mode[]. */
enum {
    PRIMARY_IN = 0,
    SECONDARY_IN,
};
150
151 static const char *colo_mode[] = {
152 [PRIMARY_IN] = "primary",
153 [SECONDARY_IN] = "secondary",
154 };
155
156 static int compare_chr_send(CompareState *s,
157 uint8_t *buf,
158 uint32_t size,
159 uint32_t vnet_hdr_len,
160 bool notify_remote_frame,
161 bool zero_copy);
162
/*
 * Return true when the packet_len bytes at buf are exactly the characters
 * of str (without its terminating NUL), false otherwise.
 */
static bool packet_matches_str(const char *str,
                               const uint8_t *buf,
                               uint32_t packet_len)
{
    size_t expected_len = strlen(str);

    return packet_len == expected_len &&
           memcmp(str, buf, expected_len) == 0;
}
173
174 static void notify_remote_frame(CompareState *s)
175 {
176 char msg[] = "DO_CHECKPOINT";
177 int ret = 0;
178
179 ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
180 if (ret < 0) {
181 error_report("Notify Xen COLO-frame failed");
182 }
183 }
184
185 static void colo_compare_inconsistency_notify(CompareState *s)
186 {
187 if (s->notify_dev) {
188 notify_remote_frame(s);
189 } else {
190 notifier_list_notify(&colo_compare_notifiers,
191 migrate_get_current());
192 }
193 }
194
195 static gint seq_sorter(Packet *a, Packet *b, gpointer data)
196 {
197 struct tcp_hdr *atcp, *btcp;
198
199 atcp = (struct tcp_hdr *)(a->transport_header);
200 btcp = (struct tcp_hdr *)(b->transport_header);
201 return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
202 }
203
204 static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
205 {
206 Packet *pkt = data;
207 struct tcp_hdr *tcphd;
208
209 tcphd = (struct tcp_hdr *)pkt->transport_header;
210
211 pkt->tcp_seq = ntohl(tcphd->th_seq);
212 pkt->tcp_ack = ntohl(tcphd->th_ack);
213 *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack;
214 pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
215 + (tcphd->th_off << 2) - pkt->vnet_hdr_len;
216 pkt->payload_size = pkt->size - pkt->header_size;
217 pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
218 pkt->flags = tcphd->th_flags;
219 }
220
221 /*
222 * Return 1 on success, if return 0 means the
223 * packet will be dropped
224 */
225 static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
226 {
227 if (g_queue_get_length(queue) <= max_queue_size) {
228 if (pkt->ip->ip_p == IPPROTO_TCP) {
229 fill_pkt_tcp_info(pkt, max_ack);
230 g_queue_insert_sorted(queue,
231 pkt,
232 (GCompareDataFunc)seq_sorter,
233 NULL);
234 } else {
235 g_queue_push_tail(queue, pkt);
236 }
237 return 1;
238 }
239 return 0;
240 }
241
242 /*
243 * Return 0 on success, if return -1 means the pkt
244 * is unsupported(arp and ipv6) and will be sent later
245 */
246 static int packet_enqueue(CompareState *s, int mode, Connection **con)
247 {
248 ConnectionKey key;
249 Packet *pkt = NULL;
250 Connection *conn;
251 int ret;
252
253 if (mode == PRIMARY_IN) {
254 pkt = packet_new(s->pri_rs.buf,
255 s->pri_rs.packet_len,
256 s->pri_rs.vnet_hdr_len);
257 } else {
258 pkt = packet_new(s->sec_rs.buf,
259 s->sec_rs.packet_len,
260 s->sec_rs.vnet_hdr_len);
261 }
262
263 if (parse_packet_early(pkt)) {
264 packet_destroy(pkt, NULL);
265 pkt = NULL;
266 return -1;
267 }
268 fill_connection_key(pkt, &key);
269
270 conn = connection_get(s->connection_track_table,
271 &key,
272 &s->conn_list);
273
274 if (!conn->processing) {
275 g_queue_push_tail(&s->conn_list, conn);
276 conn->processing = true;
277 }
278
279 if (mode == PRIMARY_IN) {
280 ret = colo_insert_packet(&conn->primary_list, pkt, &conn->pack);
281 } else {
282 ret = colo_insert_packet(&conn->secondary_list, pkt, &conn->sack);
283 }
284
285 if (!ret) {
286 trace_colo_compare_drop_packet(colo_mode[mode],
287 "queue size too big, drop packet");
288 packet_destroy(pkt, NULL);
289 pkt = NULL;
290 }
291
292 *con = conn;
293
294 return 0;
295 }
296
/*
 * Return true when seq1 is strictly later than seq2, judged by the sign of
 * their 32-bit difference so that wrapped sequence numbers compare sanely.
 */
static inline bool after(uint32_t seq1, uint32_t seq2)
{
    int32_t distance = (int32_t)(seq1 - seq2);

    return distance > 0;
}
301
302 static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
303 {
304 int ret;
305 ret = compare_chr_send(s,
306 pkt->data,
307 pkt->size,
308 pkt->vnet_hdr_len,
309 false,
310 true);
311 if (ret < 0) {
312 error_report("colo send primary packet failed");
313 }
314 trace_colo_compare_main("packet same and release packet");
315 packet_destroy_partial(pkt, NULL);
316 }
317
318 /*
319 * The IP packets sent by primary and secondary
320 * will be compared in here
321 * TODO support ip fragment, Out-Of-Order
322 * return: 0 means packet same
323 * > 0 || < 0 means packet different
324 */
325 static int colo_compare_packet_payload(Packet *ppkt,
326 Packet *spkt,
327 uint16_t poffset,
328 uint16_t soffset,
329 uint16_t len)
330
331 {
332 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
333 char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
334
335 strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
336 strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
337 strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
338 strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
339
340 trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
341 pri_ip_dst, spkt->size,
342 sec_ip_src, sec_ip_dst);
343 }
344
345 return memcmp(ppkt->data + poffset, spkt->data + soffset, len);
346 }
347
348 /*
349 * return true means that the payload is consist and
350 * need to make the next comparison, false means do
351 * the checkpoint
352 */
353 static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
354 int8_t *mark, uint32_t max_ack)
355 {
356 *mark = 0;
357
358 if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
359 if (!colo_compare_packet_payload(ppkt, spkt,
360 ppkt->header_size, spkt->header_size,
361 ppkt->payload_size)) {
362 *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
363 return true;
364 }
365 }
366
367 /* one part of secondary packet payload still need to be compared */
368 if (!after(ppkt->seq_end, spkt->seq_end)) {
369 if (!colo_compare_packet_payload(ppkt, spkt,
370 ppkt->header_size + ppkt->offset,
371 spkt->header_size + spkt->offset,
372 ppkt->payload_size - ppkt->offset)) {
373 if (!after(ppkt->tcp_ack, max_ack)) {
374 *mark = COLO_COMPARE_FREE_PRIMARY;
375 spkt->offset += ppkt->payload_size - ppkt->offset;
376 return true;
377 } else {
378 /* secondary guest hasn't ack the data, don't send
379 * out this packet
380 */
381 return false;
382 }
383 }
384 } else {
385 /* primary packet is longer than secondary packet, compare
386 * the same part and mark the primary packet offset
387 */
388 if (!colo_compare_packet_payload(ppkt, spkt,
389 ppkt->header_size + ppkt->offset,
390 spkt->header_size + spkt->offset,
391 spkt->payload_size - spkt->offset)) {
392 *mark = COLO_COMPARE_FREE_SECONDARY;
393 ppkt->offset += spkt->payload_size - spkt->offset;
394 return true;
395 }
396 }
397
398 return false;
399 }
400
/*
 * Compare the queued TCP packets of one connection.
 *
 * Packets are popped from the heads of primary_list and secondary_list and
 * handed to colo_mark_tcp_pkt(). Depending on the returned mark the primary
 * packet is released to the output, the secondary packet is destroyed, or
 * both, and the loop continues at the "pri"/"sec" labels. When the payloads
 * do not match, both packets are pushed back and an inconsistency is
 * notified. The function returns as soon as either list runs empty.
 */
401 static void colo_compare_tcp(CompareState *s, Connection *conn)
402 {
403 Packet *ppkt = NULL, *spkt = NULL;
404 int8_t mark;
405
406 /*
407 * If ppkt and spkt have the same payload, but ppkt's ACK
408 * is greater than spkt's ACK, in this case we can not
409 * send the ppkt because it will cause the secondary guest
410 * to miss sending some data in the next. Therefore, we
411 * record the maximum ACK in the current queue at both
412 * primary side and secondary side. Only when the ack is
413 * less than the smaller of the two maximum ack, then we
414 * can ensure that the packet's payload is acknowledged by
415 * primary and secondary.
416 */
417 uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;
418
/* pri: fetch the next primary packet (ppkt is not held at this point). */
419 pri:
420 if (g_queue_is_empty(&conn->primary_list)) {
421 return;
422 }
423 ppkt = g_queue_pop_head(&conn->primary_list);
/* sec: fetch the next secondary packet while ppkt is still pending. */
424 sec:
425 if (g_queue_is_empty(&conn->secondary_list)) {
426 g_queue_push_head(&conn->primary_list, ppkt);
427 return;
428 }
429 spkt = g_queue_pop_head(&conn->secondary_list);
430
/* A primary packet without payload is released without comparison. */
431 if (ppkt->tcp_seq == ppkt->seq_end) {
432 colo_release_primary_pkt(s, ppkt);
433 ppkt = NULL;
434 }
435
/* Primary payload that ends at or before compare_seq was already compared. */
436 if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
437 trace_colo_compare_main("pri: this packet has compared");
438 colo_release_primary_pkt(s, ppkt);
439 ppkt = NULL;
440 }
441
/* A secondary packet without payload is simply discarded. */
442 if (spkt->tcp_seq == spkt->seq_end) {
443 packet_destroy(spkt, NULL);
444 if (!ppkt) {
445 goto pri;
446 } else {
447 goto sec;
448 }
449 } else {
450 if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
451 trace_colo_compare_main("sec: this packet has compared");
452 packet_destroy(spkt, NULL);
453 if (!ppkt) {
454 goto pri;
455 } else {
456 goto sec;
457 }
458 }
/* The primary packet was released above: keep spkt for the next one. */
459 if (!ppkt) {
460 g_queue_push_head(&conn->secondary_list, spkt);
461 goto pri;
462 }
463 }
464
465 if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
466 trace_colo_compare_tcp_info("pri",
467 ppkt->tcp_seq, ppkt->tcp_ack,
468 ppkt->header_size, ppkt->payload_size,
469 ppkt->offset, ppkt->flags);
470
471 trace_colo_compare_tcp_info("sec",
472 spkt->tcp_seq, spkt->tcp_ack,
473 spkt->header_size, spkt->payload_size,
474 spkt->offset, spkt->flags);
475
/* Only the primary packet is finished; spkt goes back to the head. */
476 if (mark == COLO_COMPARE_FREE_PRIMARY) {
477 conn->compare_seq = ppkt->seq_end;
478 colo_release_primary_pkt(s, ppkt);
479 g_queue_push_head(&conn->secondary_list, spkt);
480 goto pri;
481 }
/* Only the secondary packet is finished; keep comparing the same ppkt. */
482 if (mark == COLO_COMPARE_FREE_SECONDARY) {
483 conn->compare_seq = spkt->seq_end;
484 packet_destroy(spkt, NULL);
485 goto sec;
486 }
487 if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
488 conn->compare_seq = ppkt->seq_end;
489 colo_release_primary_pkt(s, ppkt);
490 packet_destroy(spkt, NULL);
491 goto pri;
492 }
493 } else {
/* Mismatch: requeue both packets (they stay valid) and ask for a checkpoint. */
494 g_queue_push_head(&conn->primary_list, ppkt);
495 g_queue_push_head(&conn->secondary_list, spkt);
496
497 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
498 qemu_hexdump((char *)ppkt->data, stderr,
499 "colo-compare ppkt", ppkt->size);
500 qemu_hexdump((char *)spkt->data, stderr,
501 "colo-compare spkt", spkt->size);
502 }
503
504 colo_compare_inconsistency_notify(s);
505 }
506 }
507
508
509 /*
510 * Called from the compare thread on the primary
511 * for compare udp packet
512 */
513 static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
514 {
515 uint16_t network_header_length = ppkt->ip->ip_hl << 2;
516 uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
517
518 trace_colo_compare_main("compare udp");
519
520 /*
521 * Because of ppkt and spkt are both in the same connection,
522 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
523 * same with spkt. In addition, IP header's Identification is a random
524 * field, we can handle it in IP fragmentation function later.
525 * COLO just concern the response net packet payload from primary guest
526 * and secondary guest are same or not, So we ignored all IP header include
527 * other field like TOS,TTL,IP Checksum. we only need to compare
528 * the ip payload here.
529 */
530 if (ppkt->size != spkt->size) {
531 trace_colo_compare_main("UDP: payload size of packets are different");
532 return -1;
533 }
534 if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
535 ppkt->size - offset)) {
536 trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
537 trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
538 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
539 qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
540 ppkt->size);
541 qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
542 spkt->size);
543 }
544 return -1;
545 } else {
546 return 0;
547 }
548 }
549
550 /*
551 * Called from the compare thread on the primary
552 * for compare icmp packet
553 */
554 static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
555 {
556 uint16_t network_header_length = ppkt->ip->ip_hl << 2;
557 uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
558
559 trace_colo_compare_main("compare icmp");
560
561 /*
562 * Because of ppkt and spkt are both in the same connection,
563 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
564 * same with spkt. In addition, IP header's Identification is a random
565 * field, we can handle it in IP fragmentation function later.
566 * COLO just concern the response net packet payload from primary guest
567 * and secondary guest are same or not, So we ignored all IP header include
568 * other field like TOS,TTL,IP Checksum. we only need to compare
569 * the ip payload here.
570 */
571 if (ppkt->size != spkt->size) {
572 trace_colo_compare_main("ICMP: payload size of packets are different");
573 return -1;
574 }
575 if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
576 ppkt->size - offset)) {
577 trace_colo_compare_icmp_miscompare("primary pkt size",
578 ppkt->size);
579 trace_colo_compare_icmp_miscompare("Secondary pkt size",
580 spkt->size);
581 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
582 qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
583 ppkt->size);
584 qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
585 spkt->size);
586 }
587 return -1;
588 } else {
589 return 0;
590 }
591 }
592
593 /*
594 * Called from the compare thread on the primary
595 * for compare other packet
596 */
597 static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
598 {
599 uint16_t offset = ppkt->vnet_hdr_len;
600
601 trace_colo_compare_main("compare other");
602 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
603 char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
604
605 strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
606 strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
607 strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
608 strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
609
610 trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
611 pri_ip_dst, spkt->size,
612 sec_ip_src, sec_ip_dst);
613 }
614
615 if (ppkt->size != spkt->size) {
616 trace_colo_compare_main("Other: payload size of packets are different");
617 return -1;
618 }
619 return colo_compare_packet_payload(ppkt, spkt, offset, offset,
620 ppkt->size - offset);
621 }
622
623 static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
624 {
625 int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
626
627 if ((now - pkt->creation_ms) > (*check_time)) {
628 trace_colo_old_packet_check_found(pkt->creation_ms);
629 return 0;
630 } else {
631 return 1;
632 }
633 }
634
635 void colo_compare_register_notifier(Notifier *notify)
636 {
637 notifier_list_add(&colo_compare_notifiers, notify);
638 }
639
640 void colo_compare_unregister_notifier(Notifier *notify)
641 {
642 notifier_remove(notify);
643 }
644
645 static int colo_old_packet_check_one_conn(Connection *conn,
646 CompareState *s)
647 {
648 GList *result = NULL;
649
650 result = g_queue_find_custom(&conn->primary_list,
651 &s->compare_timeout,
652 (GCompareFunc)colo_old_packet_check_one);
653
654 if (result) {
655 /* Do checkpoint will flush old packet */
656 colo_compare_inconsistency_notify(s);
657 return 0;
658 }
659
660 return 1;
661 }
662
663 /*
664 * Look for old packets that the secondary hasn't matched,
665 * if we have some then we have to checkpoint to wake
666 * the secondary up.
667 */
668 static void colo_old_packet_check(void *opaque)
669 {
670 CompareState *s = opaque;
671
672 /*
673 * If we find one old packet, stop finding job and notify
674 * COLO frame do checkpoint.
675 */
676 g_queue_find_custom(&s->conn_list, s,
677 (GCompareFunc)colo_old_packet_check_one_conn);
678 }
679
680 static void colo_compare_packet(CompareState *s, Connection *conn,
681 int (*HandlePacket)(Packet *spkt,
682 Packet *ppkt))
683 {
684 Packet *pkt = NULL;
685 GList *result = NULL;
686
687 while (!g_queue_is_empty(&conn->primary_list) &&
688 !g_queue_is_empty(&conn->secondary_list)) {
689 pkt = g_queue_pop_head(&conn->primary_list);
690 result = g_queue_find_custom(&conn->secondary_list,
691 pkt, (GCompareFunc)HandlePacket);
692
693 if (result) {
694 colo_release_primary_pkt(s, pkt);
695 g_queue_remove(&conn->secondary_list, result->data);
696 } else {
697 /*
698 * If one packet arrive late, the secondary_list or
699 * primary_list will be empty, so we can't compare it
700 * until next comparison. If the packets in the list are
701 * timeout, it will trigger a checkpoint request.
702 */
703 trace_colo_compare_main("packet different");
704 g_queue_push_head(&conn->primary_list, pkt);
705
706 colo_compare_inconsistency_notify(s);
707 break;
708 }
709 }
710 }
711
712 /*
713 * Called from the compare thread on the primary
714 * for compare packet with secondary list of the
715 * specified connection when a new packet was
716 * queued to it.
717 */
718 static void colo_compare_connection(void *opaque, void *user_data)
719 {
720 CompareState *s = user_data;
721 Connection *conn = opaque;
722
723 switch (conn->ip_proto) {
724 case IPPROTO_TCP:
725 colo_compare_tcp(s, conn);
726 break;
727 case IPPROTO_UDP:
728 colo_compare_packet(s, conn, colo_packet_compare_udp);
729 break;
730 case IPPROTO_ICMP:
731 colo_compare_packet(s, conn, colo_packet_compare_icmp);
732 break;
733 default:
734 colo_compare_packet(s, conn, colo_packet_compare_other);
735 break;
736 }
737 }
738
739 static void coroutine_fn _compare_chr_send(void *opaque)
740 {
741 SendCo *sendco = opaque;
742 CompareState *s = sendco->s;
743 int ret = 0;
744
745 while (!g_queue_is_empty(&sendco->send_list)) {
746 SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
747 uint32_t len = htonl(entry->size);
748
749 ret = qemu_chr_fe_write_all(sendco->chr, (uint8_t *)&len, sizeof(len));
750
751 if (ret != sizeof(len)) {
752 g_free(entry->buf);
753 g_slice_free(SendEntry, entry);
754 goto err;
755 }
756
757 if (!sendco->notify_remote_frame && s->vnet_hdr) {
758 /*
759 * We send vnet header len make other module(like filter-redirector)
760 * know how to parse net packet correctly.
761 */
762 len = htonl(entry->vnet_hdr_len);
763
764 ret = qemu_chr_fe_write_all(sendco->chr,
765 (uint8_t *)&len,
766 sizeof(len));
767
768 if (ret != sizeof(len)) {
769 g_free(entry->buf);
770 g_slice_free(SendEntry, entry);
771 goto err;
772 }
773 }
774
775 ret = qemu_chr_fe_write_all(sendco->chr,
776 (uint8_t *)entry->buf,
777 entry->size);
778
779 if (ret != entry->size) {
780 g_free(entry->buf);
781 g_slice_free(SendEntry, entry);
782 goto err;
783 }
784
785 g_free(entry->buf);
786 g_slice_free(SendEntry, entry);
787 }
788
789 sendco->ret = 0;
790 goto out;
791
792 err:
793 while (!g_queue_is_empty(&sendco->send_list)) {
794 SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
795 g_free(entry->buf);
796 g_slice_free(SendEntry, entry);
797 }
798 sendco->ret = ret < 0 ? ret : -EIO;
799 out:
800 sendco->co = NULL;
801 sendco->done = true;
802 aio_wait_kick();
803 }
804
805 static int compare_chr_send(CompareState *s,
806 uint8_t *buf,
807 uint32_t size,
808 uint32_t vnet_hdr_len,
809 bool notify_remote_frame,
810 bool zero_copy)
811 {
812 SendCo *sendco;
813 SendEntry *entry;
814
815 if (notify_remote_frame) {
816 sendco = &s->notify_sendco;
817 } else {
818 sendco = &s->out_sendco;
819 }
820
821 if (!size) {
822 return 0;
823 }
824
825 entry = g_slice_new(SendEntry);
826 entry->size = size;
827 entry->vnet_hdr_len = vnet_hdr_len;
828 if (zero_copy) {
829 entry->buf = buf;
830 } else {
831 entry->buf = g_malloc(size);
832 memcpy(entry->buf, buf, size);
833 }
834 g_queue_push_head(&sendco->send_list, entry);
835
836 if (sendco->done) {
837 sendco->co = qemu_coroutine_create(_compare_chr_send, sendco);
838 sendco->done = false;
839 qemu_coroutine_enter(sendco->co);
840 if (sendco->done) {
841 /* report early errors */
842 return sendco->ret;
843 }
844 }
845
846 /* assume success */
847 return 0;
848 }
849
850 static int compare_chr_can_read(void *opaque)
851 {
852 return COMPARE_READ_LEN_MAX;
853 }
854
855 /*
856 * Called from the main thread on the primary for packets
857 * arriving over the socket from the primary.
858 */
859 static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
860 {
861 CompareState *s = COLO_COMPARE(opaque);
862 int ret;
863
864 ret = net_fill_rstate(&s->pri_rs, buf, size);
865 if (ret == -1) {
866 qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
867 NULL, NULL, true);
868 error_report("colo-compare primary_in error");
869 }
870 }
871
872 /*
873 * Called from the main thread on the primary for packets
874 * arriving over the socket from the secondary.
875 */
876 static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
877 {
878 CompareState *s = COLO_COMPARE(opaque);
879 int ret;
880
881 ret = net_fill_rstate(&s->sec_rs, buf, size);
882 if (ret == -1) {
883 qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
884 NULL, NULL, true);
885 error_report("colo-compare secondary_in error");
886 }
887 }
888
889 static void compare_notify_chr(void *opaque, const uint8_t *buf, int size)
890 {
891 CompareState *s = COLO_COMPARE(opaque);
892 int ret;
893
894 ret = net_fill_rstate(&s->notify_rs, buf, size);
895 if (ret == -1) {
896 qemu_chr_fe_set_handlers(&s->chr_notify_dev, NULL, NULL, NULL, NULL,
897 NULL, NULL, true);
898 error_report("colo-compare notify_dev error");
899 }
900 }
901
902 /*
903 * Check old packet regularly so it can watch for any packets
904 * that the secondary hasn't produced equivalents of.
905 */
906 static void check_old_packet_regular(void *opaque)
907 {
908 CompareState *s = opaque;
909
910 /* if have old packet we will notify checkpoint */
911 colo_old_packet_check(s);
912 timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
913 s->expired_scan_cycle);
914 }
915
916 /* Public API, Used for COLO frame to notify compare event */
917 void colo_notify_compares_event(void *opaque, int event, Error **errp)
918 {
919 CompareState *s;
920 qemu_mutex_lock(&colo_compare_mutex);
921
922 if (!colo_compare_active) {
923 qemu_mutex_unlock(&colo_compare_mutex);
924 return;
925 }
926
927 qemu_mutex_lock(&event_mtx);
928 QTAILQ_FOREACH(s, &net_compares, next) {
929 s->event = event;
930 qemu_bh_schedule(s->event_bh);
931 event_unhandled_count++;
932 }
933 /* Wait all compare threads to finish handling this event */
934 while (event_unhandled_count > 0) {
935 qemu_cond_wait(&event_complete_cond, &event_mtx);
936 }
937
938 qemu_mutex_unlock(&event_mtx);
939 qemu_mutex_unlock(&colo_compare_mutex);
940 }
941
942 static void colo_compare_timer_init(CompareState *s)
943 {
944 AioContext *ctx = iothread_get_aio_context(s->iothread);
945
946 s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL,
947 SCALE_MS, check_old_packet_regular,
948 s);
949 timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
950 s->expired_scan_cycle);
951 }
952
953 static void colo_compare_timer_del(CompareState *s)
954 {
955 if (s->packet_check_timer) {
956 timer_del(s->packet_check_timer);
957 timer_free(s->packet_check_timer);
958 s->packet_check_timer = NULL;
959 }
960 }
961
/* Forward declaration; used as a g_queue_foreach() callback below. */
static void colo_flush_packets(void *opaque, void *user_data);
963
964 static void colo_compare_handle_event(void *opaque)
965 {
966 CompareState *s = opaque;
967
968 switch (s->event) {
969 case COLO_EVENT_CHECKPOINT:
970 g_queue_foreach(&s->conn_list, colo_flush_packets, s);
971 break;
972 case COLO_EVENT_FAILOVER:
973 break;
974 default:
975 break;
976 }
977
978 qemu_mutex_lock(&event_mtx);
979 assert(event_unhandled_count > 0);
980 event_unhandled_count--;
981 qemu_cond_broadcast(&event_complete_cond);
982 qemu_mutex_unlock(&event_mtx);
983 }
984
985 static void colo_compare_iothread(CompareState *s)
986 {
987 AioContext *ctx = iothread_get_aio_context(s->iothread);
988 object_ref(OBJECT(s->iothread));
989 s->worker_context = iothread_get_g_main_context(s->iothread);
990
991 qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
992 compare_pri_chr_in, NULL, NULL,
993 s, s->worker_context, true);
994 qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
995 compare_sec_chr_in, NULL, NULL,
996 s, s->worker_context, true);
997 if (s->notify_dev) {
998 qemu_chr_fe_set_handlers(&s->chr_notify_dev, compare_chr_can_read,
999 compare_notify_chr, NULL, NULL,
1000 s, s->worker_context, true);
1001 }
1002
1003 colo_compare_timer_init(s);
1004 s->event_bh = aio_bh_new(ctx, colo_compare_handle_event, s);
1005 }
1006
1007 static char *compare_get_pri_indev(Object *obj, Error **errp)
1008 {
1009 CompareState *s = COLO_COMPARE(obj);
1010
1011 return g_strdup(s->pri_indev);
1012 }
1013
1014 static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
1015 {
1016 CompareState *s = COLO_COMPARE(obj);
1017
1018 g_free(s->pri_indev);
1019 s->pri_indev = g_strdup(value);
1020 }
1021
1022 static char *compare_get_sec_indev(Object *obj, Error **errp)
1023 {
1024 CompareState *s = COLO_COMPARE(obj);
1025
1026 return g_strdup(s->sec_indev);
1027 }
1028
1029 static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
1030 {
1031 CompareState *s = COLO_COMPARE(obj);
1032
1033 g_free(s->sec_indev);
1034 s->sec_indev = g_strdup(value);
1035 }
1036
1037 static char *compare_get_outdev(Object *obj, Error **errp)
1038 {
1039 CompareState *s = COLO_COMPARE(obj);
1040
1041 return g_strdup(s->outdev);
1042 }
1043
1044 static void compare_set_outdev(Object *obj, const char *value, Error **errp)
1045 {
1046 CompareState *s = COLO_COMPARE(obj);
1047
1048 g_free(s->outdev);
1049 s->outdev = g_strdup(value);
1050 }
1051
1052 static bool compare_get_vnet_hdr(Object *obj, Error **errp)
1053 {
1054 CompareState *s = COLO_COMPARE(obj);
1055
1056 return s->vnet_hdr;
1057 }
1058
1059 static void compare_set_vnet_hdr(Object *obj,
1060 bool value,
1061 Error **errp)
1062 {
1063 CompareState *s = COLO_COMPARE(obj);
1064
1065 s->vnet_hdr = value;
1066 }
1067
1068 static char *compare_get_notify_dev(Object *obj, Error **errp)
1069 {
1070 CompareState *s = COLO_COMPARE(obj);
1071
1072 return g_strdup(s->notify_dev);
1073 }
1074
1075 static void compare_set_notify_dev(Object *obj, const char *value, Error **errp)
1076 {
1077 CompareState *s = COLO_COMPARE(obj);
1078
1079 g_free(s->notify_dev);
1080 s->notify_dev = g_strdup(value);
1081 }
1082
1083 static void compare_get_timeout(Object *obj, Visitor *v,
1084 const char *name, void *opaque,
1085 Error **errp)
1086 {
1087 CompareState *s = COLO_COMPARE(obj);
1088 uint32_t value = s->compare_timeout;
1089
1090 visit_type_uint32(v, name, &value, errp);
1091 }
1092
1093 static void compare_set_timeout(Object *obj, Visitor *v,
1094 const char *name, void *opaque,
1095 Error **errp)
1096 {
1097 CompareState *s = COLO_COMPARE(obj);
1098 uint32_t value;
1099
1100 if (!visit_type_uint32(v, name, &value, errp)) {
1101 return;
1102 }
1103 if (!value) {
1104 error_setg(errp, "Property '%s.%s' requires a positive value",
1105 object_get_typename(obj), name);
1106 return;
1107 }
1108 s->compare_timeout = value;
1109 }
1110
1111 static void compare_get_expired_scan_cycle(Object *obj, Visitor *v,
1112 const char *name, void *opaque,
1113 Error **errp)
1114 {
1115 CompareState *s = COLO_COMPARE(obj);
1116 uint32_t value = s->expired_scan_cycle;
1117
1118 visit_type_uint32(v, name, &value, errp);
1119 }
1120
1121 static void compare_set_expired_scan_cycle(Object *obj, Visitor *v,
1122 const char *name, void *opaque,
1123 Error **errp)
1124 {
1125 CompareState *s = COLO_COMPARE(obj);
1126 uint32_t value;
1127
1128 if (!visit_type_uint32(v, name, &value, errp)) {
1129 return;
1130 }
1131 if (!value) {
1132 error_setg(errp, "Property '%s.%s' requires a positive value",
1133 object_get_typename(obj), name);
1134 return;
1135 }
1136 s->expired_scan_cycle = value;
1137 }
1138
1139 static void get_max_queue_size(Object *obj, Visitor *v,
1140 const char *name, void *opaque,
1141 Error **errp)
1142 {
1143 uint32_t value = max_queue_size;
1144
1145 visit_type_uint32(v, name, &value, errp);
1146 }
1147
1148 static void set_max_queue_size(Object *obj, Visitor *v,
1149 const char *name, void *opaque,
1150 Error **errp)
1151 {
1152 Error *local_err = NULL;
1153 uint32_t value;
1154
1155 visit_type_uint32(v, name, &value, &local_err);
1156 if (local_err) {
1157 goto out;
1158 }
1159 if (!value) {
1160 error_setg(&local_err, "Property '%s.%s' requires a positive value",
1161 object_get_typename(obj), name);
1162 goto out;
1163 }
1164 max_queue_size = value;
1165
1166 out:
1167 error_propagate(errp, local_err);
1168 }
1169
1170 static void compare_pri_rs_finalize(SocketReadState *pri_rs)
1171 {
1172 CompareState *s = container_of(pri_rs, CompareState, pri_rs);
1173 Connection *conn = NULL;
1174
1175 if (packet_enqueue(s, PRIMARY_IN, &conn)) {
1176 trace_colo_compare_main("primary: unsupported packet in");
1177 compare_chr_send(s,
1178 pri_rs->buf,
1179 pri_rs->packet_len,
1180 pri_rs->vnet_hdr_len,
1181 false,
1182 false);
1183 } else {
1184 /* compare packet in the specified connection */
1185 colo_compare_connection(conn, s);
1186 }
1187 }
1188
1189 static void compare_sec_rs_finalize(SocketReadState *sec_rs)
1190 {
1191 CompareState *s = container_of(sec_rs, CompareState, sec_rs);
1192 Connection *conn = NULL;
1193
1194 if (packet_enqueue(s, SECONDARY_IN, &conn)) {
1195 trace_colo_compare_main("secondary: unsupported packet in");
1196 } else {
1197 /* compare packet in the specified connection */
1198 colo_compare_connection(conn, s);
1199 }
1200 }
1201
1202 static void compare_notify_rs_finalize(SocketReadState *notify_rs)
1203 {
1204 CompareState *s = container_of(notify_rs, CompareState, notify_rs);
1205
1206 const char msg[] = "COLO_COMPARE_GET_XEN_INIT";
1207 int ret;
1208
1209 if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
1210 notify_rs->buf,
1211 notify_rs->packet_len)) {
1212 ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
1213 if (ret < 0) {
1214 error_report("Notify Xen COLO-frame INIT failed");
1215 }
1216 } else if (packet_matches_str("COLO_CHECKPOINT",
1217 notify_rs->buf,
1218 notify_rs->packet_len)) {
1219 /* colo-compare do checkpoint, flush pri packet and remove sec packet */
1220 g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1221 } else {
1222 error_report("COLO compare got unsupported instruction");
1223 }
1224 }
1225
1226 /*
1227 * Return 0 is success.
1228 * Return 1 is failed.
1229 */
1230 static int find_and_check_chardev(Chardev **chr,
1231 char *chr_name,
1232 Error **errp)
1233 {
1234 *chr = qemu_chr_find(chr_name);
1235 if (*chr == NULL) {
1236 error_setg(errp, "Device '%s' not found",
1237 chr_name);
1238 return 1;
1239 }
1240
1241 if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
1242 error_setg(errp, "chardev \"%s\" is not reconnectable",
1243 chr_name);
1244 return 1;
1245 }
1246
1247 if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) {
1248 error_setg(errp, "chardev \"%s\" cannot switch context",
1249 chr_name);
1250 return 1;
1251 }
1252
1253 return 0;
1254 }
1255
1256 /*
1257 * Called from the main thread on the primary
1258 * to setup colo-compare.
1259 */
1260 static void colo_compare_complete(UserCreatable *uc, Error **errp)
1261 {
1262 CompareState *s = COLO_COMPARE(uc);
1263 Chardev *chr;
1264
1265 if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
1266 error_setg(errp, "colo compare needs 'primary_in' ,"
1267 "'secondary_in','outdev','iothread' property set");
1268 return;
1269 } else if (!strcmp(s->pri_indev, s->outdev) ||
1270 !strcmp(s->sec_indev, s->outdev) ||
1271 !strcmp(s->pri_indev, s->sec_indev)) {
1272 error_setg(errp, "'indev' and 'outdev' could not be same "
1273 "for compare module");
1274 return;
1275 }
1276
1277 if (!s->compare_timeout) {
1278 /* Set default value to 3000 MS */
1279 s->compare_timeout = DEFAULT_TIME_OUT_MS;
1280 }
1281
1282 if (!s->expired_scan_cycle) {
1283 /* Set default value to 3000 MS */
1284 s->expired_scan_cycle = REGULAR_PACKET_CHECK_MS;
1285 }
1286
1287 if (!max_queue_size) {
1288 /* Set default queue size to 1024 */
1289 max_queue_size = MAX_QUEUE_SIZE;
1290 }
1291
1292 if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
1293 !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
1294 return;
1295 }
1296
1297 if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
1298 !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
1299 return;
1300 }
1301
1302 if (find_and_check_chardev(&chr, s->outdev, errp) ||
1303 !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
1304 return;
1305 }
1306
1307 net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
1308 net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
1309
1310 /* Try to enable remote notify chardev, currently just for Xen COLO */
1311 if (s->notify_dev) {
1312 if (find_and_check_chardev(&chr, s->notify_dev, errp) ||
1313 !qemu_chr_fe_init(&s->chr_notify_dev, chr, errp)) {
1314 return;
1315 }
1316
1317 net_socket_rs_init(&s->notify_rs, compare_notify_rs_finalize,
1318 s->vnet_hdr);
1319 }
1320
1321 s->out_sendco.s = s;
1322 s->out_sendco.chr = &s->chr_out;
1323 s->out_sendco.notify_remote_frame = false;
1324 s->out_sendco.done = true;
1325 g_queue_init(&s->out_sendco.send_list);
1326
1327 if (s->notify_dev) {
1328 s->notify_sendco.s = s;
1329 s->notify_sendco.chr = &s->chr_notify_dev;
1330 s->notify_sendco.notify_remote_frame = true;
1331 s->notify_sendco.done = true;
1332 g_queue_init(&s->notify_sendco.send_list);
1333 }
1334
1335 g_queue_init(&s->conn_list);
1336
1337 s->connection_track_table = g_hash_table_new_full(connection_key_hash,
1338 connection_key_equal,
1339 g_free,
1340 connection_destroy);
1341
1342 colo_compare_iothread(s);
1343
1344 qemu_mutex_lock(&colo_compare_mutex);
1345 if (!colo_compare_active) {
1346 qemu_mutex_init(&event_mtx);
1347 qemu_cond_init(&event_complete_cond);
1348 colo_compare_active = true;
1349 }
1350 QTAILQ_INSERT_TAIL(&net_compares, s, next);
1351 qemu_mutex_unlock(&colo_compare_mutex);
1352
1353 return;
1354 }
1355
1356 static void colo_flush_packets(void *opaque, void *user_data)
1357 {
1358 CompareState *s = user_data;
1359 Connection *conn = opaque;
1360 Packet *pkt = NULL;
1361
1362 while (!g_queue_is_empty(&conn->primary_list)) {
1363 pkt = g_queue_pop_head(&conn->primary_list);
1364 compare_chr_send(s,
1365 pkt->data,
1366 pkt->size,
1367 pkt->vnet_hdr_len,
1368 false,
1369 true);
1370 packet_destroy_partial(pkt, NULL);
1371 }
1372 while (!g_queue_is_empty(&conn->secondary_list)) {
1373 pkt = g_queue_pop_head(&conn->secondary_list);
1374 packet_destroy(pkt, NULL);
1375 }
1376 }
1377
1378 static void colo_compare_class_init(ObjectClass *oc, void *data)
1379 {
1380 UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
1381
1382 ucc->complete = colo_compare_complete;
1383 }
1384
1385 static void colo_compare_init(Object *obj)
1386 {
1387 CompareState *s = COLO_COMPARE(obj);
1388
1389 object_property_add_str(obj, "primary_in",
1390 compare_get_pri_indev, compare_set_pri_indev);
1391 object_property_add_str(obj, "secondary_in",
1392 compare_get_sec_indev, compare_set_sec_indev);
1393 object_property_add_str(obj, "outdev",
1394 compare_get_outdev, compare_set_outdev);
1395 object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
1396 (Object **)&s->iothread,
1397 object_property_allow_set_link,
1398 OBJ_PROP_LINK_STRONG);
1399 /* This parameter just for Xen COLO */
1400 object_property_add_str(obj, "notify_dev",
1401 compare_get_notify_dev, compare_set_notify_dev);
1402
1403 object_property_add(obj, "compare_timeout", "uint32",
1404 compare_get_timeout,
1405 compare_set_timeout, NULL, NULL);
1406
1407 object_property_add(obj, "expired_scan_cycle", "uint32",
1408 compare_get_expired_scan_cycle,
1409 compare_set_expired_scan_cycle, NULL, NULL);
1410
1411 object_property_add(obj, "max_queue_size", "uint32",
1412 get_max_queue_size,
1413 set_max_queue_size, NULL, NULL);
1414
1415 s->vnet_hdr = false;
1416 object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
1417 compare_set_vnet_hdr);
1418 }
1419
1420 static void colo_compare_finalize(Object *obj)
1421 {
1422 CompareState *s = COLO_COMPARE(obj);
1423 CompareState *tmp = NULL;
1424
1425 qemu_mutex_lock(&colo_compare_mutex);
1426 QTAILQ_FOREACH(tmp, &net_compares, next) {
1427 if (tmp == s) {
1428 QTAILQ_REMOVE(&net_compares, s, next);
1429 break;
1430 }
1431 }
1432 if (QTAILQ_EMPTY(&net_compares)) {
1433 colo_compare_active = false;
1434 qemu_mutex_destroy(&event_mtx);
1435 qemu_cond_destroy(&event_complete_cond);
1436 }
1437 qemu_mutex_unlock(&colo_compare_mutex);
1438
1439 qemu_chr_fe_deinit(&s->chr_pri_in, false);
1440 qemu_chr_fe_deinit(&s->chr_sec_in, false);
1441 qemu_chr_fe_deinit(&s->chr_out, false);
1442 if (s->notify_dev) {
1443 qemu_chr_fe_deinit(&s->chr_notify_dev, false);
1444 }
1445
1446 colo_compare_timer_del(s);
1447
1448 qemu_bh_delete(s->event_bh);
1449
1450 AioContext *ctx = iothread_get_aio_context(s->iothread);
1451 aio_context_acquire(ctx);
1452 AIO_WAIT_WHILE(ctx, !s->out_sendco.done);
1453 if (s->notify_dev) {
1454 AIO_WAIT_WHILE(ctx, !s->notify_sendco.done);
1455 }
1456 aio_context_release(ctx);
1457
1458 /* Release all unhandled packets after compare thead exited */
1459 g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1460 AIO_WAIT_WHILE(NULL, !s->out_sendco.done);
1461
1462 g_queue_clear(&s->conn_list);
1463 g_queue_clear(&s->out_sendco.send_list);
1464 if (s->notify_dev) {
1465 g_queue_clear(&s->notify_sendco.send_list);
1466 }
1467
1468 if (s->connection_track_table) {
1469 g_hash_table_destroy(s->connection_track_table);
1470 }
1471
1472 object_unref(OBJECT(s->iothread));
1473
1474 g_free(s->pri_indev);
1475 g_free(s->sec_indev);
1476 g_free(s->outdev);
1477 g_free(s->notify_dev);
1478 }
1479
1480 static void __attribute__((__constructor__)) colo_compare_init_globals(void)
1481 {
1482 colo_compare_active = false;
1483 qemu_mutex_init(&colo_compare_mutex);
1484 }
1485
1486 static const TypeInfo colo_compare_info = {
1487 .name = TYPE_COLO_COMPARE,
1488 .parent = TYPE_OBJECT,
1489 .instance_size = sizeof(CompareState),
1490 .instance_init = colo_compare_init,
1491 .instance_finalize = colo_compare_finalize,
1492 .class_size = sizeof(CompareClass),
1493 .class_init = colo_compare_class_init,
1494 .interfaces = (InterfaceInfo[]) {
1495 { TYPE_USER_CREATABLE },
1496 { }
1497 }
1498 };
1499
1500 static void register_types(void)
1501 {
1502 type_register_static(&colo_compare_info);
1503 }
1504
1505 type_init(register_types);