]> git.proxmox.com Git - mirror_qemu.git/blob - net/colo-compare.c
colo-compare: Insert packet into the suitable position of packet queue directly
[mirror_qemu.git] / net / colo-compare.c
1 /*
2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3 * (a.k.a. Fault Tolerance or Continuous Replication)
4 *
5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6 * Copyright (c) 2016 FUJITSU LIMITED
7 * Copyright (c) 2016 Intel Corporation
8 *
9 * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
10 *
11 * This work is licensed under the terms of the GNU GPL, version 2 or
12 * later. See the COPYING file in the top-level directory.
13 */
14
15 #include "qemu/osdep.h"
16 #include "qemu/error-report.h"
17 #include "trace.h"
18 #include "qemu-common.h"
19 #include "qapi/qmp/qerror.h"
20 #include "qapi/error.h"
21 #include "net/net.h"
22 #include "net/eth.h"
23 #include "qom/object_interfaces.h"
24 #include "qemu/iov.h"
25 #include "qom/object.h"
26 #include "qemu/typedefs.h"
27 #include "net/queue.h"
28 #include "chardev/char-fe.h"
29 #include "qemu/sockets.h"
30 #include "qapi-visit.h"
31 #include "net/colo.h"
32 #include "sysemu/iothread.h"
33
/* QOM type name of the colo-compare object. */
#define TYPE_COLO_COMPARE "colo-compare"
/* QOM cast helper from an Object pointer to CompareState. */
#define COLO_COMPARE(obj) \
    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)

/* Value returned by compare_chr_can_read() as the per-read byte limit. */
#define COMPARE_READ_LEN_MAX NET_BUFSIZE
/* Per-queue length limit checked by colo_insert_packet() before queuing. */
#define MAX_QUEUE_SIZE 1024

/*
 * Period (ms) of the old-packet check timer, and also the age (ms) after
 * which an unmatched primary packet is reported as old.
 * TODO: Should be configurable
 */
#define REGULAR_PACKET_CHECK_MS 3000
43
/*
 *  + CompareState ++
 *  |               |
 *  +---------------+   +---------------+         +---------------+
 *  |   conn list   + - >      conn     + ------- >      conn     + -- > ......
 *  +---------------+   +---------------+         +---------------+
 *  |               |     |           |             |          |
 *  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
 *                    |primary |  |secondary    |primary | |secondary
 *                    |packet  |  |packet  +    |packet  | |packet  +
 *                    +--------+  +--------+    +--------+ +--------+
 *                        |           |             |          |
 *                    +---v----+  +---v----+    +---v----+ +---v----+
 *                    |primary |  |secondary    |primary | |secondary
 *                    |packet  |  |packet  +    |packet  | |packet  +
 *                    +--------+  +--------+    +--------+ +--------+
 *                        |           |             |          |
 *                    +---v----+  +---v----+    +---v----+ +---v----+
 *                    |primary |  |secondary    |primary | |secondary
 *                    |packet  |  |packet  +    |packet  | |packet  +
 *                    +--------+  +--------+    +--------+ +--------+
 *
 * Each Connection on conn_list keeps two packet queues: primary_list for
 * packets read from primary_in and secondary_list for packets read from
 * secondary_in.
 */
typedef struct CompareState {
    Object parent;

    /* Chardev names set via the "primary_in", "secondary_in", "outdev" props */
    char *pri_indev;
    char *sec_indev;
    char *outdev;
    /* Frontends bound to the chardevs above in colo_compare_complete() */
    CharBackend chr_pri_in;
    CharBackend chr_sec_in;
    CharBackend chr_out;
    /* Reassembly state for the byte streams from primary_in / secondary_in */
    SocketReadState pri_rs;
    SocketReadState sec_rs;
    /* "vnet_hdr_support" property: frames carry a vnet header length */
    bool vnet_hdr;

    /*
     * Record the connection that through the NIC
     * Element type: Connection
     */
    GQueue conn_list;
    /* Record the connection without repetition */
    GHashTable *connection_track_table;

    /* "iothread" link property; chardev handlers and timer run on it */
    IOThread *iothread;
    /* GMainContext of iothread, set in colo_compare_iothread() */
    GMainContext *worker_context;
    /* Periodic old-packet check timer (REGULAR_PACKET_CHECK_MS) */
    QEMUTimer *packet_check_timer;
} CompareState;
91
/* Class struct; adds nothing beyond ObjectClass. */
typedef struct CompareClass {
    ObjectClass parent_class;
} CompareClass;

/* Which input a packet was read from; passed as @mode to packet_enqueue(). */
enum {
    PRIMARY_IN = 0,
    SECONDARY_IN,
};

/* Forward declaration: defined further down, used by earlier functions. */
static int compare_chr_send(CompareState *s,
                            const uint8_t *buf,
                            uint32_t size,
                            uint32_t vnet_hdr_len);
105
106 static gint seq_sorter(Packet *a, Packet *b, gpointer data)
107 {
108 struct tcphdr *atcp, *btcp;
109
110 atcp = (struct tcphdr *)(a->transport_header);
111 btcp = (struct tcphdr *)(b->transport_header);
112 return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
113 }
114
115 /*
116 * Return 1 on success, if return 0 means the
117 * packet will be dropped
118 */
119 static int colo_insert_packet(GQueue *queue, Packet *pkt)
120 {
121 if (g_queue_get_length(queue) <= MAX_QUEUE_SIZE) {
122 if (pkt->ip->ip_p == IPPROTO_TCP) {
123 g_queue_insert_sorted(queue,
124 pkt,
125 (GCompareDataFunc)seq_sorter,
126 NULL);
127 } else {
128 g_queue_push_tail(queue, pkt);
129 }
130 return 1;
131 }
132 return 0;
133 }
134
135 /*
136 * Return 0 on success, if return -1 means the pkt
137 * is unsupported(arp and ipv6) and will be sent later
138 */
139 static int packet_enqueue(CompareState *s, int mode)
140 {
141 ConnectionKey key;
142 Packet *pkt = NULL;
143 Connection *conn;
144
145 if (mode == PRIMARY_IN) {
146 pkt = packet_new(s->pri_rs.buf,
147 s->pri_rs.packet_len,
148 s->pri_rs.vnet_hdr_len);
149 } else {
150 pkt = packet_new(s->sec_rs.buf,
151 s->sec_rs.packet_len,
152 s->sec_rs.vnet_hdr_len);
153 }
154
155 if (parse_packet_early(pkt)) {
156 packet_destroy(pkt, NULL);
157 pkt = NULL;
158 return -1;
159 }
160 fill_connection_key(pkt, &key);
161
162 conn = connection_get(s->connection_track_table,
163 &key,
164 &s->conn_list);
165
166 if (!conn->processing) {
167 g_queue_push_tail(&s->conn_list, conn);
168 conn->processing = true;
169 }
170
171 if (mode == PRIMARY_IN) {
172 if (!colo_insert_packet(&conn->primary_list, pkt)) {
173 error_report("colo compare primary queue size too big,"
174 "drop packet");
175 }
176 } else {
177 if (!colo_insert_packet(&conn->secondary_list, pkt)) {
178 error_report("colo compare secondary queue size too big,"
179 "drop packet");
180 }
181 }
182
183 return 0;
184 }
185
186 /*
187 * The IP packets sent by primary and secondary
188 * will be compared in here
189 * TODO support ip fragment, Out-Of-Order
190 * return: 0 means packet same
191 * > 0 || < 0 means packet different
192 */
193 static int colo_packet_compare_common(Packet *ppkt,
194 Packet *spkt,
195 int poffset,
196 int soffset)
197 {
198 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
199 char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
200
201 strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
202 strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
203 strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
204 strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
205
206 trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
207 pri_ip_dst, spkt->size,
208 sec_ip_src, sec_ip_dst);
209 }
210
211 poffset = ppkt->vnet_hdr_len + poffset;
212 soffset = ppkt->vnet_hdr_len + soffset;
213
214 if (ppkt->size - poffset == spkt->size - soffset) {
215 return memcmp(ppkt->data + poffset,
216 spkt->data + soffset,
217 spkt->size - soffset);
218 } else {
219 trace_colo_compare_main("Net packet size are not the same");
220 return -1;
221 }
222 }
223
/*
 * Called from the compare thread on the primary
 * for compare tcp packet
 * compare_tcp copied from Dr. David Alan Gilbert's branch
 *
 * Used as the GCompareFunc for g_queue_find_custom() over a connection's
 * secondary_list: @spkt is the list element, @ppkt the primary packet being
 * matched. Returns 0 when the packets are considered the same, non-zero
 * otherwise.
 * Side effect: when the primary packet has IP_DF set, spkt's IP id and
 * checksum are overwritten with the primary's values before comparing.
 */
static int colo_packet_compare_tcp(Packet *spkt, Packet *ppkt)
{
    struct tcphdr *ptcp, *stcp;
    int res;

    trace_colo_compare_main("compare tcp");

    ptcp = (struct tcphdr *)ppkt->transport_header;
    stcp = (struct tcphdr *)spkt->transport_header;

    /*
     * The 'identification' field in the IP header is *very* random
     * it almost never matches.  Fudge this by ignoring differences in
     * unfragmented packets; they'll normally sort themselves out if different
     * anyway, and it should recover at the TCP level.
     * An alternative would be to get both the primary and secondary to rewrite
     * somehow; but that would need some sync traffic to sync the state
     */
    if (ntohs(ppkt->ip->ip_off) & IP_DF) {
        spkt->ip->ip_id = ppkt->ip->ip_id;
        /* and the sum will be different if the IDs were different */
        spkt->ip->ip_sum = ppkt->ip->ip_sum;
    }

    /*
     * Check tcp header length for tcp option field.
     * th_off > 5 means this tcp packet have options field.
     * The tcp options maybe always different.
     * for example:
     * From RFC 7323.
     * TCP Timestamps option (TSopt):
     * Kind: 8
     *
     * Length: 10 bytes
     *
     *    +-------+-------+---------------------+---------------------+
     *    |Kind=8 |  10   |   TS Value (TSval)  |TS Echo Reply (TSecr)|
     *    +-------+-------+---------------------+---------------------+
     *       1       1              4                     4
     *
     * In this case the primary guest's timestamp always different with
     * the secondary guest's timestamp. COLO just focus on payload,
     * so we just need skip this field.
     *
     * NOTE(review): only the primary's th_off selects this path; a
     * secondary packet with options against a primary without them falls
     * through to the checksum comparison below.
     */
    if (ptcp->th_off > 5) {
        ptrdiff_t ptcp_offset, stcp_offset;

        /*
         * Offset of the TCP payload from the start of data, minus the vnet
         * header length, because colo_packet_compare_common() adds
         * vnet_hdr_len back to the offset before comparing.
         */
        ptcp_offset = ppkt->transport_header - (uint8_t *)ppkt->data
                      + (ptcp->th_off * 4) - ppkt->vnet_hdr_len;
        stcp_offset = spkt->transport_header - (uint8_t *)spkt->data
                      + (stcp->th_off * 4) - spkt->vnet_hdr_len;

        /*
         * When network is busy, some tcp options(like sack) will unpredictable
         * occur in primary side or secondary side. it will make packet size
         * not same, but the two packet's payload is identical. colo just
         * care about packet payload, so we skip the option field.
         */
        res = colo_packet_compare_common(ppkt, spkt, ptcp_offset, stcp_offset);
    } else if (ptcp->th_sum == stcp->th_sum) {
        /* No options and equal TCP checksums: compare from the IP header on. */
        res = colo_packet_compare_common(ppkt, spkt, ETH_HLEN, ETH_HLEN);
    } else {
        /* No options but different TCP checksums: report a mismatch. */
        res = -1;
    }

    /* Diagnostics only: dump both packets when a mismatch is traced. */
    if (res != 0 &&
        trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
        char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];

        strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
        strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
        strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
        strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));

        trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
                                   pri_ip_dst, spkt->size,
                                   sec_ip_src, sec_ip_dst);

        trace_colo_compare_tcp_info("pri tcp packet",
                                    ntohl(ptcp->th_seq),
                                    ntohl(ptcp->th_ack),
                                    res, ptcp->th_flags,
                                    ppkt->size);

        trace_colo_compare_tcp_info("sec tcp packet",
                                    ntohl(stcp->th_seq),
                                    ntohl(stcp->th_ack),
                                    res, stcp->th_flags,
                                    spkt->size);

        qemu_hexdump((char *)ppkt->data, stderr,
                     "colo-compare ppkt", ppkt->size);
        qemu_hexdump((char *)spkt->data, stderr,
                     "colo-compare spkt", spkt->size);
    }

    return res;
}
327
328 /*
329 * Called from the compare thread on the primary
330 * for compare udp packet
331 */
332 static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
333 {
334 int ret;
335 int network_header_length = ppkt->ip->ip_hl * 4;
336
337 trace_colo_compare_main("compare udp");
338
339 /*
340 * Because of ppkt and spkt are both in the same connection,
341 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
342 * same with spkt. In addition, IP header's Identification is a random
343 * field, we can handle it in IP fragmentation function later.
344 * COLO just concern the response net packet payload from primary guest
345 * and secondary guest are same or not, So we ignored all IP header include
346 * other field like TOS,TTL,IP Checksum. we only need to compare
347 * the ip payload here.
348 */
349 ret = colo_packet_compare_common(ppkt, spkt,
350 network_header_length + ETH_HLEN,
351 network_header_length + ETH_HLEN);
352
353 if (ret) {
354 trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
355 trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
356 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
357 qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
358 ppkt->size);
359 qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
360 spkt->size);
361 }
362 }
363
364 return ret;
365 }
366
367 /*
368 * Called from the compare thread on the primary
369 * for compare icmp packet
370 */
371 static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
372 {
373 int network_header_length = ppkt->ip->ip_hl * 4;
374
375 trace_colo_compare_main("compare icmp");
376
377 /*
378 * Because of ppkt and spkt are both in the same connection,
379 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
380 * same with spkt. In addition, IP header's Identification is a random
381 * field, we can handle it in IP fragmentation function later.
382 * COLO just concern the response net packet payload from primary guest
383 * and secondary guest are same or not, So we ignored all IP header include
384 * other field like TOS,TTL,IP Checksum. we only need to compare
385 * the ip payload here.
386 */
387 if (colo_packet_compare_common(ppkt, spkt,
388 network_header_length + ETH_HLEN,
389 network_header_length + ETH_HLEN)) {
390 trace_colo_compare_icmp_miscompare("primary pkt size",
391 ppkt->size);
392 trace_colo_compare_icmp_miscompare("Secondary pkt size",
393 spkt->size);
394 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
395 qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
396 ppkt->size);
397 qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
398 spkt->size);
399 }
400 return -1;
401 } else {
402 return 0;
403 }
404 }
405
406 /*
407 * Called from the compare thread on the primary
408 * for compare other packet
409 */
410 static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
411 {
412 trace_colo_compare_main("compare other");
413 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
414 char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
415
416 strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
417 strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
418 strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
419 strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
420
421 trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
422 pri_ip_dst, spkt->size,
423 sec_ip_src, sec_ip_dst);
424 }
425
426 return colo_packet_compare_common(ppkt, spkt, 0, 0);
427 }
428
429 static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
430 {
431 int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
432
433 if ((now - pkt->creation_ms) > (*check_time)) {
434 trace_colo_old_packet_check_found(pkt->creation_ms);
435 return 0;
436 } else {
437 return 1;
438 }
439 }
440
441 static int colo_old_packet_check_one_conn(Connection *conn,
442 void *user_data)
443 {
444 GList *result = NULL;
445 int64_t check_time = REGULAR_PACKET_CHECK_MS;
446
447 result = g_queue_find_custom(&conn->primary_list,
448 &check_time,
449 (GCompareFunc)colo_old_packet_check_one);
450
451 if (result) {
452 /* Do checkpoint will flush old packet */
453 /*
454 * TODO: Notify colo frame to do checkpoint.
455 * colo_compare_inconsistent_notify();
456 */
457 return 0;
458 }
459
460 return 1;
461 }
462
463 /*
464 * Look for old packets that the secondary hasn't matched,
465 * if we have some then we have to checkpoint to wake
466 * the secondary up.
467 */
468 static void colo_old_packet_check(void *opaque)
469 {
470 CompareState *s = opaque;
471
472 /*
473 * If we find one old packet, stop finding job and notify
474 * COLO frame do checkpoint.
475 */
476 g_queue_find_custom(&s->conn_list, NULL,
477 (GCompareFunc)colo_old_packet_check_one_conn);
478 }
479
480 /*
481 * Called from the compare thread on the primary
482 * for compare connection
483 */
484 static void colo_compare_connection(void *opaque, void *user_data)
485 {
486 CompareState *s = user_data;
487 Connection *conn = opaque;
488 Packet *pkt = NULL;
489 GList *result = NULL;
490 int ret;
491
492 while (!g_queue_is_empty(&conn->primary_list) &&
493 !g_queue_is_empty(&conn->secondary_list)) {
494 pkt = g_queue_pop_head(&conn->primary_list);
495 switch (conn->ip_proto) {
496 case IPPROTO_TCP:
497 result = g_queue_find_custom(&conn->secondary_list,
498 pkt, (GCompareFunc)colo_packet_compare_tcp);
499 break;
500 case IPPROTO_UDP:
501 result = g_queue_find_custom(&conn->secondary_list,
502 pkt, (GCompareFunc)colo_packet_compare_udp);
503 break;
504 case IPPROTO_ICMP:
505 result = g_queue_find_custom(&conn->secondary_list,
506 pkt, (GCompareFunc)colo_packet_compare_icmp);
507 break;
508 default:
509 result = g_queue_find_custom(&conn->secondary_list,
510 pkt, (GCompareFunc)colo_packet_compare_other);
511 break;
512 }
513
514 if (result) {
515 ret = compare_chr_send(s,
516 pkt->data,
517 pkt->size,
518 pkt->vnet_hdr_len);
519 if (ret < 0) {
520 error_report("colo_send_primary_packet failed");
521 }
522 trace_colo_compare_main("packet same and release packet");
523 g_queue_remove(&conn->secondary_list, result->data);
524 packet_destroy(pkt, NULL);
525 } else {
526 /*
527 * If one packet arrive late, the secondary_list or
528 * primary_list will be empty, so we can't compare it
529 * until next comparison.
530 */
531 trace_colo_compare_main("packet different");
532 g_queue_push_head(&conn->primary_list, pkt);
533 /* TODO: colo_notify_checkpoint();*/
534 break;
535 }
536 }
537 }
538
539 static int compare_chr_send(CompareState *s,
540 const uint8_t *buf,
541 uint32_t size,
542 uint32_t vnet_hdr_len)
543 {
544 int ret = 0;
545 uint32_t len = htonl(size);
546
547 if (!size) {
548 return 0;
549 }
550
551 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
552 if (ret != sizeof(len)) {
553 goto err;
554 }
555
556 if (s->vnet_hdr) {
557 /*
558 * We send vnet header len make other module(like filter-redirector)
559 * know how to parse net packet correctly.
560 */
561 len = htonl(vnet_hdr_len);
562 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
563 if (ret != sizeof(len)) {
564 goto err;
565 }
566 }
567
568 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
569 if (ret != size) {
570 goto err;
571 }
572
573 return 0;
574
575 err:
576 return ret < 0 ? ret : -EIO;
577 }
578
579 static int compare_chr_can_read(void *opaque)
580 {
581 return COMPARE_READ_LEN_MAX;
582 }
583
584 /*
585 * Called from the main thread on the primary for packets
586 * arriving over the socket from the primary.
587 */
588 static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
589 {
590 CompareState *s = COLO_COMPARE(opaque);
591 int ret;
592
593 ret = net_fill_rstate(&s->pri_rs, buf, size);
594 if (ret == -1) {
595 qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
596 NULL, NULL, true);
597 error_report("colo-compare primary_in error");
598 }
599 }
600
601 /*
602 * Called from the main thread on the primary for packets
603 * arriving over the socket from the secondary.
604 */
605 static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
606 {
607 CompareState *s = COLO_COMPARE(opaque);
608 int ret;
609
610 ret = net_fill_rstate(&s->sec_rs, buf, size);
611 if (ret == -1) {
612 qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
613 NULL, NULL, true);
614 error_report("colo-compare secondary_in error");
615 }
616 }
617
618 /*
619 * Check old packet regularly so it can watch for any packets
620 * that the secondary hasn't produced equivalents of.
621 */
622 static void check_old_packet_regular(void *opaque)
623 {
624 CompareState *s = opaque;
625
626 /* if have old packet we will notify checkpoint */
627 colo_old_packet_check(s);
628 timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
629 REGULAR_PACKET_CHECK_MS);
630 }
631
632 static void colo_compare_timer_init(CompareState *s)
633 {
634 AioContext *ctx = iothread_get_aio_context(s->iothread);
635
636 s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL,
637 SCALE_MS, check_old_packet_regular,
638 s);
639 timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
640 REGULAR_PACKET_CHECK_MS);
641 }
642
643 static void colo_compare_timer_del(CompareState *s)
644 {
645 if (s->packet_check_timer) {
646 timer_del(s->packet_check_timer);
647 timer_free(s->packet_check_timer);
648 s->packet_check_timer = NULL;
649 }
650 }
651
652 static void colo_compare_iothread(CompareState *s)
653 {
654 object_ref(OBJECT(s->iothread));
655 s->worker_context = iothread_get_g_main_context(s->iothread);
656
657 qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
658 compare_pri_chr_in, NULL, NULL,
659 s, s->worker_context, true);
660 qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
661 compare_sec_chr_in, NULL, NULL,
662 s, s->worker_context, true);
663
664 colo_compare_timer_init(s);
665 }
666
667 static char *compare_get_pri_indev(Object *obj, Error **errp)
668 {
669 CompareState *s = COLO_COMPARE(obj);
670
671 return g_strdup(s->pri_indev);
672 }
673
674 static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
675 {
676 CompareState *s = COLO_COMPARE(obj);
677
678 g_free(s->pri_indev);
679 s->pri_indev = g_strdup(value);
680 }
681
682 static char *compare_get_sec_indev(Object *obj, Error **errp)
683 {
684 CompareState *s = COLO_COMPARE(obj);
685
686 return g_strdup(s->sec_indev);
687 }
688
689 static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
690 {
691 CompareState *s = COLO_COMPARE(obj);
692
693 g_free(s->sec_indev);
694 s->sec_indev = g_strdup(value);
695 }
696
697 static char *compare_get_outdev(Object *obj, Error **errp)
698 {
699 CompareState *s = COLO_COMPARE(obj);
700
701 return g_strdup(s->outdev);
702 }
703
704 static void compare_set_outdev(Object *obj, const char *value, Error **errp)
705 {
706 CompareState *s = COLO_COMPARE(obj);
707
708 g_free(s->outdev);
709 s->outdev = g_strdup(value);
710 }
711
712 static bool compare_get_vnet_hdr(Object *obj, Error **errp)
713 {
714 CompareState *s = COLO_COMPARE(obj);
715
716 return s->vnet_hdr;
717 }
718
719 static void compare_set_vnet_hdr(Object *obj,
720 bool value,
721 Error **errp)
722 {
723 CompareState *s = COLO_COMPARE(obj);
724
725 s->vnet_hdr = value;
726 }
727
728 static void compare_pri_rs_finalize(SocketReadState *pri_rs)
729 {
730 CompareState *s = container_of(pri_rs, CompareState, pri_rs);
731
732 if (packet_enqueue(s, PRIMARY_IN)) {
733 trace_colo_compare_main("primary: unsupported packet in");
734 compare_chr_send(s,
735 pri_rs->buf,
736 pri_rs->packet_len,
737 pri_rs->vnet_hdr_len);
738 } else {
739 /* compare connection */
740 g_queue_foreach(&s->conn_list, colo_compare_connection, s);
741 }
742 }
743
744 static void compare_sec_rs_finalize(SocketReadState *sec_rs)
745 {
746 CompareState *s = container_of(sec_rs, CompareState, sec_rs);
747
748 if (packet_enqueue(s, SECONDARY_IN)) {
749 trace_colo_compare_main("secondary: unsupported packet in");
750 } else {
751 /* compare connection */
752 g_queue_foreach(&s->conn_list, colo_compare_connection, s);
753 }
754 }
755
756
757 /*
758 * Return 0 is success.
759 * Return 1 is failed.
760 */
761 static int find_and_check_chardev(Chardev **chr,
762 char *chr_name,
763 Error **errp)
764 {
765 *chr = qemu_chr_find(chr_name);
766 if (*chr == NULL) {
767 error_setg(errp, "Device '%s' not found",
768 chr_name);
769 return 1;
770 }
771
772 if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
773 error_setg(errp, "chardev \"%s\" is not reconnectable",
774 chr_name);
775 return 1;
776 }
777
778 return 0;
779 }
780
781 /*
782 * Called from the main thread on the primary
783 * to setup colo-compare.
784 */
785 static void colo_compare_complete(UserCreatable *uc, Error **errp)
786 {
787 CompareState *s = COLO_COMPARE(uc);
788 Chardev *chr;
789
790 if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
791 error_setg(errp, "colo compare needs 'primary_in' ,"
792 "'secondary_in','outdev','iothread' property set");
793 return;
794 } else if (!strcmp(s->pri_indev, s->outdev) ||
795 !strcmp(s->sec_indev, s->outdev) ||
796 !strcmp(s->pri_indev, s->sec_indev)) {
797 error_setg(errp, "'indev' and 'outdev' could not be same "
798 "for compare module");
799 return;
800 }
801
802 if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
803 !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
804 return;
805 }
806
807 if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
808 !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
809 return;
810 }
811
812 if (find_and_check_chardev(&chr, s->outdev, errp) ||
813 !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
814 return;
815 }
816
817 net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
818 net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
819
820 g_queue_init(&s->conn_list);
821
822 s->connection_track_table = g_hash_table_new_full(connection_key_hash,
823 connection_key_equal,
824 g_free,
825 connection_destroy);
826
827 colo_compare_iothread(s);
828 return;
829 }
830
831 static void colo_flush_packets(void *opaque, void *user_data)
832 {
833 CompareState *s = user_data;
834 Connection *conn = opaque;
835 Packet *pkt = NULL;
836
837 while (!g_queue_is_empty(&conn->primary_list)) {
838 pkt = g_queue_pop_head(&conn->primary_list);
839 compare_chr_send(s,
840 pkt->data,
841 pkt->size,
842 pkt->vnet_hdr_len);
843 packet_destroy(pkt, NULL);
844 }
845 while (!g_queue_is_empty(&conn->secondary_list)) {
846 pkt = g_queue_pop_head(&conn->secondary_list);
847 packet_destroy(pkt, NULL);
848 }
849 }
850
851 static void colo_compare_class_init(ObjectClass *oc, void *data)
852 {
853 UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
854
855 ucc->complete = colo_compare_complete;
856 }
857
858 static void colo_compare_init(Object *obj)
859 {
860 CompareState *s = COLO_COMPARE(obj);
861
862 object_property_add_str(obj, "primary_in",
863 compare_get_pri_indev, compare_set_pri_indev,
864 NULL);
865 object_property_add_str(obj, "secondary_in",
866 compare_get_sec_indev, compare_set_sec_indev,
867 NULL);
868 object_property_add_str(obj, "outdev",
869 compare_get_outdev, compare_set_outdev,
870 NULL);
871 object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
872 (Object **)&s->iothread,
873 object_property_allow_set_link,
874 OBJ_PROP_LINK_UNREF_ON_RELEASE, NULL);
875
876 s->vnet_hdr = false;
877 object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
878 compare_set_vnet_hdr, NULL);
879 }
880
881 static void colo_compare_finalize(Object *obj)
882 {
883 CompareState *s = COLO_COMPARE(obj);
884
885 qemu_chr_fe_deinit(&s->chr_pri_in, false);
886 qemu_chr_fe_deinit(&s->chr_sec_in, false);
887 qemu_chr_fe_deinit(&s->chr_out, false);
888 if (s->iothread) {
889 colo_compare_timer_del(s);
890 }
891 /* Release all unhandled packets after compare thead exited */
892 g_queue_foreach(&s->conn_list, colo_flush_packets, s);
893
894 g_queue_clear(&s->conn_list);
895
896 if (s->connection_track_table) {
897 g_hash_table_destroy(s->connection_track_table);
898 }
899
900 if (s->iothread) {
901 object_unref(OBJECT(s->iothread));
902 }
903 g_free(s->pri_indev);
904 g_free(s->sec_indev);
905 g_free(s->outdev);
906 }
907
908 static const TypeInfo colo_compare_info = {
909 .name = TYPE_COLO_COMPARE,
910 .parent = TYPE_OBJECT,
911 .instance_size = sizeof(CompareState),
912 .instance_init = colo_compare_init,
913 .instance_finalize = colo_compare_finalize,
914 .class_size = sizeof(CompareClass),
915 .class_init = colo_compare_class_init,
916 .interfaces = (InterfaceInfo[]) {
917 { TYPE_USER_CREATABLE },
918 { }
919 }
920 };
921
922 static void register_types(void)
923 {
924 type_register_static(&colo_compare_info);
925 }
926
927 type_init(register_types);