]> git.proxmox.com Git - mirror_qemu.git/blame - net/colo-compare.c
COLO-compare: Add new parameter to communicate with remote colo-frame
[mirror_qemu.git] / net / colo-compare.c
CommitLineData
7dce4e6f
ZC
1/*
2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3 * (a.k.a. Fault Tolerance or Continuous Replication)
4 *
5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6 * Copyright (c) 2016 FUJITSU LIMITED
7 * Copyright (c) 2016 Intel Corporation
8 *
9 * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
10 *
11 * This work is licensed under the terms of the GNU GPL, version 2 or
12 * later. See the COPYING file in the top-level directory.
13 */
14
15#include "qemu/osdep.h"
a8d25326 16#include "qemu-common.h"
7dce4e6f 17#include "qemu/error-report.h"
59509ec1 18#include "trace.h"
7dce4e6f
ZC
19#include "qapi/error.h"
20#include "net/net.h"
f4b61836 21#include "net/eth.h"
7dce4e6f
ZC
22#include "qom/object_interfaces.h"
23#include "qemu/iov.h"
24#include "qom/object.h"
7dce4e6f 25#include "net/queue.h"
4d43a603 26#include "chardev/char-fe.h"
7dce4e6f 27#include "qemu/sockets.h"
f27f01db 28#include "colo.h"
dd321ecf 29#include "sysemu/iothread.h"
0ffcece3
ZC
30#include "net/colo-compare.h"
31#include "migration/colo.h"
dccd0313 32#include "migration/migration.h"
e05ae1d9 33#include "util.h"
7dce4e6f
ZC
34
35#define TYPE_COLO_COMPARE "colo-compare"
36#define COLO_COMPARE(obj) \
37 OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
38
0ffcece3
ZC
39static QTAILQ_HEAD(, CompareState) net_compares =
40 QTAILQ_HEAD_INITIALIZER(net_compares);
41
dccd0313
ZC
42static NotifierList colo_compare_notifiers =
43 NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);
44
0682e15b 45#define COMPARE_READ_LEN_MAX NET_BUFSIZE
b6540d40
ZC
46#define MAX_QUEUE_SIZE 1024
47
f449c9e5
MZ
48#define COLO_COMPARE_FREE_PRIMARY 0x01
49#define COLO_COMPARE_FREE_SECONDARY 0x02
50
0682e15b
ZC
51/* TODO: Should be configurable */
52#define REGULAR_PACKET_CHECK_MS 3000
53
0ffcece3
ZC
54static QemuMutex event_mtx;
55static QemuCond event_complete_cond;
56static int event_unhandled_count;
57
59509ec1 58/*
61c5f469
ZC
59 * + CompareState ++
60 * | |
61 * +---------------+ +---------------+ +---------------+
62 * | conn list + - > conn + ------- > conn + -- > ......
63 * +---------------+ +---------------+ +---------------+
64 * | | | | | |
65 * +---------------+ +---v----+ +---v----+ +---v----+ +---v----+
66 * |primary | |secondary |primary | |secondary
67 * |packet | |packet + |packet | |packet +
68 * +--------+ +--------+ +--------+ +--------+
69 * | | | |
70 * +---v----+ +---v----+ +---v----+ +---v----+
71 * |primary | |secondary |primary | |secondary
72 * |packet | |packet + |packet | |packet +
73 * +--------+ +--------+ +--------+ +--------+
74 * | | | |
75 * +---v----+ +---v----+ +---v----+ +---v----+
76 * |primary | |secondary |primary | |secondary
77 * |packet | |packet + |packet | |packet +
78 * +--------+ +--------+ +--------+ +--------+
79 */
7dce4e6f
ZC
80typedef struct CompareState {
81 Object parent;
82
83 char *pri_indev;
84 char *sec_indev;
85 char *outdev;
cf6af766 86 char *notify_dev;
32a6ebec
MAL
87 CharBackend chr_pri_in;
88 CharBackend chr_sec_in;
89 CharBackend chr_out;
7dce4e6f
ZC
90 SocketReadState pri_rs;
91 SocketReadState sec_rs;
aa3a7032 92 bool vnet_hdr;
59509ec1 93
61c5f469
ZC
94 /*
95 * Record the connection that through the NIC
96 * Element type: Connection
b6540d40
ZC
97 */
98 GQueue conn_list;
61c5f469 99 /* Record the connection without repetition */
59509ec1 100 GHashTable *connection_track_table;
dfd917a9 101
dd321ecf 102 IOThread *iothread;
b43decb0 103 GMainContext *worker_context;
dd321ecf 104 QEMUTimer *packet_check_timer;
0ffcece3
ZC
105
106 QEMUBH *event_bh;
107 enum colo_event event;
108
109 QTAILQ_ENTRY(CompareState) next;
7dce4e6f
ZC
110} CompareState;
111
112typedef struct CompareClass {
113 ObjectClass parent_class;
114} CompareClass;
115
59509ec1
ZC
/* Selects which input stream packet_enqueue() reads a packet from. */
enum {
    PRIMARY_IN = 0,
    SECONDARY_IN = 1,
};
120
24525e93
ZC
121static void colo_compare_inconsistency_notify(void)
122{
123 notifier_list_notify(&colo_compare_notifiers,
124 migrate_get_current());
125}
126
3037e7a5 127static int compare_chr_send(CompareState *s,
59509ec1 128 const uint8_t *buf,
aa3a7032
ZC
129 uint32_t size,
130 uint32_t vnet_hdr_len);
59509ec1 131
a935cc31
ZC
132static gint seq_sorter(Packet *a, Packet *b, gpointer data)
133{
e05ae1d9 134 struct tcp_hdr *atcp, *btcp;
a935cc31 135
e05ae1d9
MAL
136 atcp = (struct tcp_hdr *)(a->transport_header);
137 btcp = (struct tcp_hdr *)(b->transport_header);
a935cc31
ZC
138 return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
139}
140
f449c9e5
MZ
141static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
142{
143 Packet *pkt = data;
e05ae1d9 144 struct tcp_hdr *tcphd;
f449c9e5 145
e05ae1d9 146 tcphd = (struct tcp_hdr *)pkt->transport_header;
f449c9e5
MZ
147
148 pkt->tcp_seq = ntohl(tcphd->th_seq);
149 pkt->tcp_ack = ntohl(tcphd->th_ack);
150 *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack;
151 pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
152 + (tcphd->th_off << 2) - pkt->vnet_hdr_len;
153 pkt->payload_size = pkt->size - pkt->header_size;
154 pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
155 pkt->flags = tcphd->th_flags;
156}
157
8850d4ca
MZ
158/*
159 * Return 1 on success, if return 0 means the
160 * packet will be dropped
161 */
f449c9e5 162static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
8850d4ca
MZ
163{
164 if (g_queue_get_length(queue) <= MAX_QUEUE_SIZE) {
165 if (pkt->ip->ip_p == IPPROTO_TCP) {
f449c9e5 166 fill_pkt_tcp_info(pkt, max_ack);
8850d4ca
MZ
167 g_queue_insert_sorted(queue,
168 pkt,
169 (GCompareDataFunc)seq_sorter,
170 NULL);
171 } else {
172 g_queue_push_tail(queue, pkt);
173 }
174 return 1;
175 }
176 return 0;
177}
178
59509ec1
ZC
179/*
180 * Return 0 on success, if return -1 means the pkt
181 * is unsupported(arp and ipv6) and will be sent later
182 */
8ec14402 183static int packet_enqueue(CompareState *s, int mode, Connection **con)
59509ec1 184{
b6540d40 185 ConnectionKey key;
59509ec1 186 Packet *pkt = NULL;
b6540d40 187 Connection *conn;
59509ec1
ZC
188
189 if (mode == PRIMARY_IN) {
ada1a33f
ZC
190 pkt = packet_new(s->pri_rs.buf,
191 s->pri_rs.packet_len,
192 s->pri_rs.vnet_hdr_len);
59509ec1 193 } else {
ada1a33f
ZC
194 pkt = packet_new(s->sec_rs.buf,
195 s->sec_rs.packet_len,
196 s->sec_rs.vnet_hdr_len);
59509ec1
ZC
197 }
198
199 if (parse_packet_early(pkt)) {
200 packet_destroy(pkt, NULL);
201 pkt = NULL;
202 return -1;
203 }
b6540d40 204 fill_connection_key(pkt, &key);
59509ec1 205
b6540d40
ZC
206 conn = connection_get(s->connection_track_table,
207 &key,
208 &s->conn_list);
59509ec1 209
b6540d40
ZC
210 if (!conn->processing) {
211 g_queue_push_tail(&s->conn_list, conn);
212 conn->processing = true;
213 }
214
215 if (mode == PRIMARY_IN) {
f449c9e5 216 if (!colo_insert_packet(&conn->primary_list, pkt, &conn->pack)) {
b6540d40
ZC
217 error_report("colo compare primary queue size too big,"
218 "drop packet");
219 }
220 } else {
f449c9e5 221 if (!colo_insert_packet(&conn->secondary_list, pkt, &conn->sack)) {
b6540d40
ZC
222 error_report("colo compare secondary queue size too big,"
223 "drop packet");
224 }
225 }
4d366235 226 *con = conn;
59509ec1
ZC
227
228 return 0;
229}
230
f449c9e5
MZ
/*
 * Return true when seq1 is later than seq2, judged by the sign of
 * their 32-bit difference so that wrapped sequence numbers compare
 * correctly.
 */
static inline bool after(uint32_t seq1, uint32_t seq2)
{
    int32_t delta = (int32_t)(seq1 - seq2);

    return delta > 0;
}
235
236static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
237{
238 int ret;
239 ret = compare_chr_send(s,
240 pkt->data,
241 pkt->size,
242 pkt->vnet_hdr_len);
243 if (ret < 0) {
244 error_report("colo send primary packet failed");
245 }
246 trace_colo_compare_main("packet same and release packet");
247 packet_destroy(pkt, NULL);
248}
249
0682e15b
ZC
250/*
251 * The IP packets sent by primary and secondary
252 * will be compared in here
253 * TODO support ip fragment, Out-Of-Order
254 * return: 0 means packet same
255 * > 0 || < 0 means packet different
256 */
9394133f
MZ
257static int colo_compare_packet_payload(Packet *ppkt,
258 Packet *spkt,
259 uint16_t poffset,
260 uint16_t soffset,
261 uint16_t len)
262
0682e15b 263{
d87aa138 264 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
e630b2bf
ZC
265 char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
266
267 strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
268 strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
269 strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
270 strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
271
272 trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
273 pri_ip_dst, spkt->size,
274 sec_ip_src, sec_ip_dst);
275 }
0682e15b 276
9394133f 277 return memcmp(ppkt->data + poffset, spkt->data + soffset, len);
0682e15b
ZC
278}
279
f4b61836 280/*
f449c9e5
MZ
281 * return true means that the payload is consist and
282 * need to make the next comparison, false means do
283 * the checkpoint
284*/
285static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
286 int8_t *mark, uint32_t max_ack)
0682e15b 287{
f449c9e5
MZ
288 *mark = 0;
289
f449c9e5
MZ
290 if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
291 if (colo_compare_packet_payload(ppkt, spkt,
292 ppkt->header_size, spkt->header_size,
293 ppkt->payload_size)) {
294 *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
295 return true;
296 }
297 }
f4b61836 298
f449c9e5
MZ
299 /* one part of secondary packet payload still need to be compared */
300 if (!after(ppkt->seq_end, spkt->seq_end)) {
301 if (colo_compare_packet_payload(ppkt, spkt,
302 ppkt->header_size + ppkt->offset,
303 spkt->header_size + spkt->offset,
304 ppkt->payload_size - ppkt->offset)) {
305 if (!after(ppkt->tcp_ack, max_ack)) {
306 *mark = COLO_COMPARE_FREE_PRIMARY;
307 spkt->offset += ppkt->payload_size - ppkt->offset;
308 return true;
309 } else {
310 /* secondary guest hasn't ack the data, don't send
311 * out this packet
312 */
313 return false;
314 }
315 }
316 } else {
317 /* primary packet is longer than secondary packet, compare
318 * the same part and mark the primary packet offset
319 */
320 if (colo_compare_packet_payload(ppkt, spkt,
321 ppkt->header_size + ppkt->offset,
322 spkt->header_size + spkt->offset,
323 spkt->payload_size - spkt->offset)) {
324 *mark = COLO_COMPARE_FREE_SECONDARY;
325 ppkt->offset += spkt->payload_size - spkt->offset;
326 return true;
327 }
328 }
329
330 return false;
331}
2ad7ca4c 332
f449c9e5
MZ
333static void colo_compare_tcp(CompareState *s, Connection *conn)
334{
335 Packet *ppkt = NULL, *spkt = NULL;
336 int8_t mark;
f4b61836
ZC
337
338 /*
f449c9e5
MZ
339 * If ppkt and spkt have the same payload, but ppkt's ACK
340 * is greater than spkt's ACK, in this case we can not
341 * send the ppkt because it will cause the secondary guest
342 * to miss sending some data in the next. Therefore, we
343 * record the maximum ACK in the current queue at both
344 * primary side and secondary side. Only when the ack is
345 * less than the smaller of the two maximum ack, then we
346 * can ensure that the packet's payload is acknowledged by
347 * primary and secondary.
348 */
349 uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;
350
351pri:
352 if (g_queue_is_empty(&conn->primary_list)) {
353 return;
354 }
355 ppkt = g_queue_pop_head(&conn->primary_list);
356sec:
357 if (g_queue_is_empty(&conn->secondary_list)) {
358 g_queue_push_head(&conn->primary_list, ppkt);
359 return;
f4b61836 360 }
f449c9e5 361 spkt = g_queue_pop_head(&conn->secondary_list);
f4b61836 362
f449c9e5
MZ
363 if (ppkt->tcp_seq == ppkt->seq_end) {
364 colo_release_primary_pkt(s, ppkt);
365 ppkt = NULL;
366 }
9394133f 367
f449c9e5
MZ
368 if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
369 trace_colo_compare_main("pri: this packet has compared");
370 colo_release_primary_pkt(s, ppkt);
371 ppkt = NULL;
372 }
9394133f 373
f449c9e5
MZ
374 if (spkt->tcp_seq == spkt->seq_end) {
375 packet_destroy(spkt, NULL);
376 if (!ppkt) {
377 goto pri;
378 } else {
379 goto sec;
380 }
6efeb328 381 } else {
f449c9e5
MZ
382 if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
383 trace_colo_compare_main("sec: this packet has compared");
384 packet_destroy(spkt, NULL);
385 if (!ppkt) {
386 goto pri;
387 } else {
388 goto sec;
389 }
390 }
391 if (!ppkt) {
392 g_queue_push_head(&conn->secondary_list, spkt);
393 goto pri;
394 }
6efeb328 395 }
f4b61836 396
f449c9e5
MZ
397 if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
398 trace_colo_compare_tcp_info("pri",
399 ppkt->tcp_seq, ppkt->tcp_ack,
400 ppkt->header_size, ppkt->payload_size,
401 ppkt->offset, ppkt->flags);
402
403 trace_colo_compare_tcp_info("sec",
404 spkt->tcp_seq, spkt->tcp_ack,
405 spkt->header_size, spkt->payload_size,
406 spkt->offset, spkt->flags);
407
408 if (mark == COLO_COMPARE_FREE_PRIMARY) {
409 conn->compare_seq = ppkt->seq_end;
410 colo_release_primary_pkt(s, ppkt);
411 g_queue_push_head(&conn->secondary_list, spkt);
412 goto pri;
413 }
414 if (mark == COLO_COMPARE_FREE_SECONDARY) {
415 conn->compare_seq = spkt->seq_end;
416 packet_destroy(spkt, NULL);
417 goto sec;
418 }
419 if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
420 conn->compare_seq = ppkt->seq_end;
421 colo_release_primary_pkt(s, ppkt);
422 packet_destroy(spkt, NULL);
423 goto pri;
424 }
425 } else {
426 g_queue_push_head(&conn->primary_list, ppkt);
427 g_queue_push_head(&conn->secondary_list, spkt);
2061c14c
ZC
428
429 qemu_hexdump((char *)ppkt->data, stderr,
430 "colo-compare ppkt", ppkt->size);
431 qemu_hexdump((char *)spkt->data, stderr,
432 "colo-compare spkt", spkt->size);
f4b61836 433
dccd0313 434 colo_compare_inconsistency_notify();
f449c9e5 435 }
f4b61836
ZC
436}
437
f449c9e5 438
f4b61836
ZC
439/*
440 * Called from the compare thread on the primary
441 * for compare udp packet
442 */
443static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
444{
9394133f
MZ
445 uint16_t network_header_length = ppkt->ip->ip_hl << 2;
446 uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
f4b61836
ZC
447
448 trace_colo_compare_main("compare udp");
2ad7ca4c 449
6efeb328
ZC
450 /*
451 * Because of ppkt and spkt are both in the same connection,
452 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
453 * same with spkt. In addition, IP header's Identification is a random
454 * field, we can handle it in IP fragmentation function later.
455 * COLO just concern the response net packet payload from primary guest
456 * and secondary guest are same or not, So we ignored all IP header include
457 * other field like TOS,TTL,IP Checksum. we only need to compare
458 * the ip payload here.
459 */
9394133f
MZ
460 if (ppkt->size != spkt->size) {
461 trace_colo_compare_main("UDP: payload size of packets are different");
462 return -1;
463 }
464 if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
465 ppkt->size - offset)) {
f4b61836 466 trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
f4b61836 467 trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
d87aa138 468 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
1723a7f7
ZC
469 qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
470 ppkt->size);
471 qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
472 spkt->size);
473 }
9394133f
MZ
474 return -1;
475 } else {
476 return 0;
f4b61836 477 }
f4b61836
ZC
478}
479
480/*
481 * Called from the compare thread on the primary
482 * for compare icmp packet
483 */
484static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
485{
9394133f
MZ
486 uint16_t network_header_length = ppkt->ip->ip_hl << 2;
487 uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
6efeb328 488
f4b61836 489 trace_colo_compare_main("compare icmp");
f4b61836 490
6efeb328
ZC
491 /*
492 * Because of ppkt and spkt are both in the same connection,
493 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
494 * same with spkt. In addition, IP header's Identification is a random
495 * field, we can handle it in IP fragmentation function later.
496 * COLO just concern the response net packet payload from primary guest
497 * and secondary guest are same or not, So we ignored all IP header include
498 * other field like TOS,TTL,IP Checksum. we only need to compare
499 * the ip payload here.
500 */
9394133f
MZ
501 if (ppkt->size != spkt->size) {
502 trace_colo_compare_main("ICMP: payload size of packets are different");
503 return -1;
504 }
505 if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
506 ppkt->size - offset)) {
f4b61836
ZC
507 trace_colo_compare_icmp_miscompare("primary pkt size",
508 ppkt->size);
f4b61836
ZC
509 trace_colo_compare_icmp_miscompare("Secondary pkt size",
510 spkt->size);
d87aa138 511 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
1723a7f7
ZC
512 qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
513 ppkt->size);
514 qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
515 spkt->size);
516 }
f4b61836
ZC
517 return -1;
518 } else {
519 return 0;
520 }
521}
522
523/*
524 * Called from the compare thread on the primary
525 * for compare other packet
526 */
527static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
528{
9394133f
MZ
529 uint16_t offset = ppkt->vnet_hdr_len;
530
f4b61836 531 trace_colo_compare_main("compare other");
d87aa138 532 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
e630b2bf
ZC
533 char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
534
535 strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
536 strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
537 strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
538 strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
539
540 trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
541 pri_ip_dst, spkt->size,
542 sec_ip_src, sec_ip_dst);
543 }
544
9394133f
MZ
545 if (ppkt->size != spkt->size) {
546 trace_colo_compare_main("Other: payload size of packets are different");
547 return -1;
548 }
549 return colo_compare_packet_payload(ppkt, spkt, offset, offset,
550 ppkt->size - offset);
0682e15b
ZC
551}
552
553static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
554{
555 int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
556
557 if ((now - pkt->creation_ms) > (*check_time)) {
558 trace_colo_old_packet_check_found(pkt->creation_ms);
559 return 0;
560 } else {
561 return 1;
562 }
563}
564
dccd0313
ZC
565void colo_compare_register_notifier(Notifier *notify)
566{
567 notifier_list_add(&colo_compare_notifiers, notify);
568}
569
570void colo_compare_unregister_notifier(Notifier *notify)
571{
572 notifier_remove(notify);
573}
574
d25a7dab 575static int colo_old_packet_check_one_conn(Connection *conn,
dccd0313 576 void *user_data)
0682e15b 577{
0682e15b
ZC
578 GList *result = NULL;
579 int64_t check_time = REGULAR_PACKET_CHECK_MS;
580
581 result = g_queue_find_custom(&conn->primary_list,
582 &check_time,
583 (GCompareFunc)colo_old_packet_check_one);
584
585 if (result) {
61c5f469 586 /* Do checkpoint will flush old packet */
dccd0313 587 colo_compare_inconsistency_notify();
d25a7dab 588 return 0;
0682e15b 589 }
d25a7dab
ZC
590
591 return 1;
0682e15b
ZC
592}
593
594/*
595 * Look for old packets that the secondary hasn't matched,
596 * if we have some then we have to checkpoint to wake
597 * the secondary up.
598 */
599static void colo_old_packet_check(void *opaque)
600{
601 CompareState *s = opaque;
602
d25a7dab
ZC
603 /*
604 * If we find one old packet, stop finding job and notify
605 * COLO frame do checkpoint.
606 */
607 g_queue_find_custom(&s->conn_list, NULL,
608 (GCompareFunc)colo_old_packet_check_one_conn);
0682e15b
ZC
609}
610
f449c9e5
MZ
611static void colo_compare_packet(CompareState *s, Connection *conn,
612 int (*HandlePacket)(Packet *spkt,
613 Packet *ppkt))
0682e15b 614{
0682e15b
ZC
615 Packet *pkt = NULL;
616 GList *result = NULL;
0682e15b
ZC
617
618 while (!g_queue_is_empty(&conn->primary_list) &&
619 !g_queue_is_empty(&conn->secondary_list)) {
626bba98 620 pkt = g_queue_pop_head(&conn->primary_list);
f449c9e5
MZ
621 result = g_queue_find_custom(&conn->secondary_list,
622 pkt, (GCompareFunc)HandlePacket);
0682e15b
ZC
623
624 if (result) {
f449c9e5 625 colo_release_primary_pkt(s, pkt);
0682e15b 626 g_queue_remove(&conn->secondary_list, result->data);
0682e15b
ZC
627 } else {
628 /*
629 * If one packet arrive late, the secondary_list or
630 * primary_list will be empty, so we can't compare it
dccd0313
ZC
631 * until next comparison. If the packets in the list are
632 * timeout, it will trigger a checkpoint request.
0682e15b
ZC
633 */
634 trace_colo_compare_main("packet different");
626bba98 635 g_queue_push_head(&conn->primary_list, pkt);
dccd0313 636 colo_compare_inconsistency_notify();
0682e15b
ZC
637 break;
638 }
639 }
640}
641
f449c9e5
MZ
642/*
643 * Called from the compare thread on the primary
644 * for compare packet with secondary list of the
645 * specified connection when a new packet was
646 * queued to it.
647 */
648static void colo_compare_connection(void *opaque, void *user_data)
649{
650 CompareState *s = user_data;
651 Connection *conn = opaque;
652
653 switch (conn->ip_proto) {
654 case IPPROTO_TCP:
655 colo_compare_tcp(s, conn);
656 break;
657 case IPPROTO_UDP:
658 colo_compare_packet(s, conn, colo_packet_compare_udp);
659 break;
660 case IPPROTO_ICMP:
661 colo_compare_packet(s, conn, colo_packet_compare_icmp);
662 break;
663 default:
664 colo_compare_packet(s, conn, colo_packet_compare_other);
665 break;
666 }
667}
668
3037e7a5 669static int compare_chr_send(CompareState *s,
59509ec1 670 const uint8_t *buf,
aa3a7032
ZC
671 uint32_t size,
672 uint32_t vnet_hdr_len)
59509ec1
ZC
673{
674 int ret = 0;
675 uint32_t len = htonl(size);
676
677 if (!size) {
678 return 0;
679 }
680
3037e7a5 681 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
59509ec1
ZC
682 if (ret != sizeof(len)) {
683 goto err;
684 }
685
aa3a7032
ZC
686 if (s->vnet_hdr) {
687 /*
688 * We send vnet header len make other module(like filter-redirector)
689 * know how to parse net packet correctly.
690 */
691 len = htonl(vnet_hdr_len);
692 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
693 if (ret != sizeof(len)) {
694 goto err;
695 }
696 }
697
3037e7a5 698 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
59509ec1
ZC
699 if (ret != size) {
700 goto err;
701 }
702
703 return 0;
704
705err:
706 return ret < 0 ? ret : -EIO;
707}
708
0682e15b
ZC
709static int compare_chr_can_read(void *opaque)
710{
711 return COMPARE_READ_LEN_MAX;
712}
713
714/*
715 * Called from the main thread on the primary for packets
716 * arriving over the socket from the primary.
717 */
718static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
719{
720 CompareState *s = COLO_COMPARE(opaque);
721 int ret;
722
723 ret = net_fill_rstate(&s->pri_rs, buf, size);
724 if (ret == -1) {
81517ba3 725 qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
39ab61c6 726 NULL, NULL, true);
0682e15b
ZC
727 error_report("colo-compare primary_in error");
728 }
729}
730
731/*
732 * Called from the main thread on the primary for packets
733 * arriving over the socket from the secondary.
734 */
735static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
736{
737 CompareState *s = COLO_COMPARE(opaque);
738 int ret;
739
740 ret = net_fill_rstate(&s->sec_rs, buf, size);
741 if (ret == -1) {
81517ba3 742 qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
39ab61c6 743 NULL, NULL, true);
0682e15b
ZC
744 error_report("colo-compare secondary_in error");
745 }
746}
747
66d2a242
HZ
748/*
749 * Check old packet regularly so it can watch for any packets
750 * that the secondary hasn't produced equivalents of.
751 */
dd321ecf 752static void check_old_packet_regular(void *opaque)
66d2a242
HZ
753{
754 CompareState *s = opaque;
755
756 /* if have old packet we will notify checkpoint */
757 colo_old_packet_check(s);
dd321ecf
WY
758 timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
759 REGULAR_PACKET_CHECK_MS);
760}
761
0ffcece3
ZC
762/* Public API, Used for COLO frame to notify compare event */
763void colo_notify_compares_event(void *opaque, int event, Error **errp)
764{
765 CompareState *s;
766
767 qemu_mutex_lock(&event_mtx);
768 QTAILQ_FOREACH(s, &net_compares, next) {
769 s->event = event;
770 qemu_bh_schedule(s->event_bh);
771 event_unhandled_count++;
772 }
773 /* Wait all compare threads to finish handling this event */
774 while (event_unhandled_count > 0) {
775 qemu_cond_wait(&event_complete_cond, &event_mtx);
776 }
777
778 qemu_mutex_unlock(&event_mtx);
779}
780
dd321ecf
WY
781static void colo_compare_timer_init(CompareState *s)
782{
783 AioContext *ctx = iothread_get_aio_context(s->iothread);
66d2a242 784
dd321ecf
WY
785 s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL,
786 SCALE_MS, check_old_packet_regular,
787 s);
788 timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
789 REGULAR_PACKET_CHECK_MS);
66d2a242
HZ
790}
791
dd321ecf 792static void colo_compare_timer_del(CompareState *s)
0682e15b 793{
dd321ecf
WY
794 if (s->packet_check_timer) {
795 timer_del(s->packet_check_timer);
796 timer_free(s->packet_check_timer);
797 s->packet_check_timer = NULL;
798 }
799 }
0682e15b 800
0ffcece3
ZC
801static void colo_flush_packets(void *opaque, void *user_data);
802
803static void colo_compare_handle_event(void *opaque)
804{
805 CompareState *s = opaque;
806
807 switch (s->event) {
808 case COLO_EVENT_CHECKPOINT:
809 g_queue_foreach(&s->conn_list, colo_flush_packets, s);
810 break;
811 case COLO_EVENT_FAILOVER:
812 break;
813 default:
814 break;
815 }
816
0ffcece3 817 qemu_mutex_lock(&event_mtx);
78e4f446 818 assert(event_unhandled_count > 0);
0ffcece3
ZC
819 event_unhandled_count--;
820 qemu_cond_broadcast(&event_complete_cond);
821 qemu_mutex_unlock(&event_mtx);
822}
823
dd321ecf
WY
824static void colo_compare_iothread(CompareState *s)
825{
826 object_ref(OBJECT(s->iothread));
827 s->worker_context = iothread_get_g_main_context(s->iothread);
0682e15b 828
5345fdb4 829 qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
81517ba3
AN
830 compare_pri_chr_in, NULL, NULL,
831 s, s->worker_context, true);
5345fdb4 832 qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
81517ba3
AN
833 compare_sec_chr_in, NULL, NULL,
834 s, s->worker_context, true);
0682e15b 835
dd321ecf 836 colo_compare_timer_init(s);
0ffcece3 837 s->event_bh = qemu_bh_new(colo_compare_handle_event, s);
0682e15b
ZC
838}
839
7dce4e6f
ZC
840static char *compare_get_pri_indev(Object *obj, Error **errp)
841{
842 CompareState *s = COLO_COMPARE(obj);
843
844 return g_strdup(s->pri_indev);
845}
846
847static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
848{
849 CompareState *s = COLO_COMPARE(obj);
850
851 g_free(s->pri_indev);
852 s->pri_indev = g_strdup(value);
853}
854
855static char *compare_get_sec_indev(Object *obj, Error **errp)
856{
857 CompareState *s = COLO_COMPARE(obj);
858
859 return g_strdup(s->sec_indev);
860}
861
862static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
863{
864 CompareState *s = COLO_COMPARE(obj);
865
866 g_free(s->sec_indev);
867 s->sec_indev = g_strdup(value);
868}
869
870static char *compare_get_outdev(Object *obj, Error **errp)
871{
872 CompareState *s = COLO_COMPARE(obj);
873
874 return g_strdup(s->outdev);
875}
876
877static void compare_set_outdev(Object *obj, const char *value, Error **errp)
878{
879 CompareState *s = COLO_COMPARE(obj);
880
881 g_free(s->outdev);
882 s->outdev = g_strdup(value);
883}
884
aa3a7032
ZC
885static bool compare_get_vnet_hdr(Object *obj, Error **errp)
886{
887 CompareState *s = COLO_COMPARE(obj);
888
889 return s->vnet_hdr;
890}
891
892static void compare_set_vnet_hdr(Object *obj,
893 bool value,
894 Error **errp)
895{
896 CompareState *s = COLO_COMPARE(obj);
897
898 s->vnet_hdr = value;
899}
900
cf6af766
ZC
901static char *compare_get_notify_dev(Object *obj, Error **errp)
902{
903 CompareState *s = COLO_COMPARE(obj);
904
905 return g_strdup(s->notify_dev);
906}
907
908static void compare_set_notify_dev(Object *obj, const char *value, Error **errp)
909{
910 CompareState *s = COLO_COMPARE(obj);
911
912 g_free(s->notify_dev);
913 s->notify_dev = g_strdup(value);
914}
915
7dce4e6f
ZC
916static void compare_pri_rs_finalize(SocketReadState *pri_rs)
917{
59509ec1 918 CompareState *s = container_of(pri_rs, CompareState, pri_rs);
8ec14402 919 Connection *conn = NULL;
59509ec1 920
8ec14402 921 if (packet_enqueue(s, PRIMARY_IN, &conn)) {
59509ec1 922 trace_colo_compare_main("primary: unsupported packet in");
aa3a7032
ZC
923 compare_chr_send(s,
924 pri_rs->buf,
925 pri_rs->packet_len,
926 pri_rs->vnet_hdr_len);
0682e15b 927 } else {
3463218c 928 /* compare packet in the specified connection */
8ec14402 929 colo_compare_connection(conn, s);
59509ec1 930 }
7dce4e6f
ZC
931}
932
933static void compare_sec_rs_finalize(SocketReadState *sec_rs)
934{
59509ec1 935 CompareState *s = container_of(sec_rs, CompareState, sec_rs);
8ec14402 936 Connection *conn = NULL;
59509ec1 937
8ec14402 938 if (packet_enqueue(s, SECONDARY_IN, &conn)) {
59509ec1 939 trace_colo_compare_main("secondary: unsupported packet in");
0682e15b 940 } else {
3463218c 941 /* compare packet in the specified connection */
8ec14402 942 colo_compare_connection(conn, s);
59509ec1 943 }
7dce4e6f
ZC
944}
945
7dce4e6f
ZC
946
947/*
948 * Return 0 is success.
949 * Return 1 is failed.
950 */
0ec7b3e7 951static int find_and_check_chardev(Chardev **chr,
7dce4e6f
ZC
952 char *chr_name,
953 Error **errp)
954{
7dce4e6f
ZC
955 *chr = qemu_chr_find(chr_name);
956 if (*chr == NULL) {
957 error_setg(errp, "Device '%s' not found",
958 chr_name);
959 return 1;
960 }
961
0a73336d
DB
962 if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
963 error_setg(errp, "chardev \"%s\" is not reconnectable",
7dce4e6f
ZC
964 chr_name);
965 return 1;
966 }
fbf3cc3a 967
269d25cd
MAL
968 if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) {
969 error_setg(errp, "chardev \"%s\" cannot switch context",
970 chr_name);
971 return 1;
972 }
973
7dce4e6f
ZC
974 return 0;
975}
976
977/*
978 * Called from the main thread on the primary
979 * to setup colo-compare.
980 */
981static void colo_compare_complete(UserCreatable *uc, Error **errp)
982{
983 CompareState *s = COLO_COMPARE(uc);
0ec7b3e7 984 Chardev *chr;
7dce4e6f 985
dd321ecf 986 if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
7dce4e6f 987 error_setg(errp, "colo compare needs 'primary_in' ,"
dd321ecf 988 "'secondary_in','outdev','iothread' property set");
7dce4e6f
ZC
989 return;
990 } else if (!strcmp(s->pri_indev, s->outdev) ||
991 !strcmp(s->sec_indev, s->outdev) ||
992 !strcmp(s->pri_indev, s->sec_indev)) {
993 error_setg(errp, "'indev' and 'outdev' could not be same "
994 "for compare module");
995 return;
996 }
997
5345fdb4
MAL
998 if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
999 !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
7dce4e6f
ZC
1000 return;
1001 }
1002
5345fdb4
MAL
1003 if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
1004 !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
7dce4e6f
ZC
1005 return;
1006 }
1007
5345fdb4
MAL
1008 if (find_and_check_chardev(&chr, s->outdev, errp) ||
1009 !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
7dce4e6f
ZC
1010 return;
1011 }
1012
aa3a7032
ZC
1013 net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
1014 net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
7dce4e6f 1015
0ffcece3
ZC
1016 QTAILQ_INSERT_TAIL(&net_compares, s, next);
1017
b6540d40
ZC
1018 g_queue_init(&s->conn_list);
1019
0ffcece3
ZC
1020 qemu_mutex_init(&event_mtx);
1021 qemu_cond_init(&event_complete_cond);
1022
b6540d40
ZC
1023 s->connection_track_table = g_hash_table_new_full(connection_key_hash,
1024 connection_key_equal,
1025 g_free,
1026 connection_destroy);
59509ec1 1027
dd321ecf 1028 colo_compare_iothread(s);
7dce4e6f
ZC
1029 return;
1030}
1031
dfd917a9
HZ
1032static void colo_flush_packets(void *opaque, void *user_data)
1033{
1034 CompareState *s = user_data;
1035 Connection *conn = opaque;
1036 Packet *pkt = NULL;
1037
1038 while (!g_queue_is_empty(&conn->primary_list)) {
1039 pkt = g_queue_pop_head(&conn->primary_list);
aa3a7032
ZC
1040 compare_chr_send(s,
1041 pkt->data,
1042 pkt->size,
1043 pkt->vnet_hdr_len);
dfd917a9
HZ
1044 packet_destroy(pkt, NULL);
1045 }
1046 while (!g_queue_is_empty(&conn->secondary_list)) {
1047 pkt = g_queue_pop_head(&conn->secondary_list);
1048 packet_destroy(pkt, NULL);
1049 }
1050}
1051
7dce4e6f
ZC
1052static void colo_compare_class_init(ObjectClass *oc, void *data)
1053{
1054 UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
1055
1056 ucc->complete = colo_compare_complete;
1057}
1058
1059static void colo_compare_init(Object *obj)
1060{
aa3a7032
ZC
1061 CompareState *s = COLO_COMPARE(obj);
1062
7dce4e6f
ZC
1063 object_property_add_str(obj, "primary_in",
1064 compare_get_pri_indev, compare_set_pri_indev,
1065 NULL);
1066 object_property_add_str(obj, "secondary_in",
1067 compare_get_sec_indev, compare_set_sec_indev,
1068 NULL);
1069 object_property_add_str(obj, "outdev",
1070 compare_get_outdev, compare_set_outdev,
1071 NULL);
dd321ecf
WY
1072 object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
1073 (Object **)&s->iothread,
1074 object_property_allow_set_link,
265b578c 1075 OBJ_PROP_LINK_STRONG, NULL);
cf6af766
ZC
1076 /* This parameter just for Xen COLO */
1077 object_property_add_str(obj, "notify_dev",
1078 compare_get_notify_dev, compare_set_notify_dev,
1079 NULL);
aa3a7032
ZC
1080
1081 s->vnet_hdr = false;
1082 object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
1083 compare_set_vnet_hdr, NULL);
7dce4e6f
ZC
1084}
1085
1086static void colo_compare_finalize(Object *obj)
1087{
1088 CompareState *s = COLO_COMPARE(obj);
0ffcece3 1089 CompareState *tmp = NULL;
7dce4e6f 1090
1ce2610c
MAL
1091 qemu_chr_fe_deinit(&s->chr_pri_in, false);
1092 qemu_chr_fe_deinit(&s->chr_sec_in, false);
1093 qemu_chr_fe_deinit(&s->chr_out, false);
dd321ecf
WY
1094 if (s->iothread) {
1095 colo_compare_timer_del(s);
1096 }
0ffcece3
ZC
1097
1098 qemu_bh_delete(s->event_bh);
1099
1100 QTAILQ_FOREACH(tmp, &net_compares, next) {
1101 if (tmp == s) {
1102 QTAILQ_REMOVE(&net_compares, s, next);
1103 break;
1104 }
1105 }
1106
dfd917a9
HZ
1107 /* Release all unhandled packets after compare thead exited */
1108 g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1109
727c2d76 1110 g_queue_clear(&s->conn_list);
0682e15b 1111
dd321ecf
WY
1112 if (s->connection_track_table) {
1113 g_hash_table_destroy(s->connection_track_table);
1114 }
1115
1116 if (s->iothread) {
1117 object_unref(OBJECT(s->iothread));
1118 }
0ffcece3
ZC
1119
1120 qemu_mutex_destroy(&event_mtx);
1121 qemu_cond_destroy(&event_complete_cond);
1122
7dce4e6f
ZC
1123 g_free(s->pri_indev);
1124 g_free(s->sec_indev);
1125 g_free(s->outdev);
cf6af766 1126 g_free(s->notify_dev);
7dce4e6f
ZC
1127}
1128
1129static const TypeInfo colo_compare_info = {
1130 .name = TYPE_COLO_COMPARE,
1131 .parent = TYPE_OBJECT,
1132 .instance_size = sizeof(CompareState),
1133 .instance_init = colo_compare_init,
1134 .instance_finalize = colo_compare_finalize,
1135 .class_size = sizeof(CompareClass),
1136 .class_init = colo_compare_class_init,
1137 .interfaces = (InterfaceInfo[]) {
1138 { TYPE_USER_CREATABLE },
1139 { }
1140 }
1141};
1142
1143static void register_types(void)
1144{
1145 type_register_static(&colo_compare_info);
1146}
1147
1148type_init(register_types);