]> git.proxmox.com Git - mirror_qemu.git/blobdiff - net/colo-compare.c
Fix input-linux reading from device
[mirror_qemu.git] / net / colo-compare.c
index 22b1da19f5be8679723de4d797ec34917989151c..54e6d40525ff550cb8b813238ddf69e99c4971ce 100644 (file)
@@ -68,9 +68,9 @@ typedef struct CompareState {
     char *pri_indev;
     char *sec_indev;
     char *outdev;
-    CharDriverState *chr_pri_in;
-    CharDriverState *chr_sec_in;
-    CharDriverState *chr_out;
+    CharBackend chr_pri_in;
+    CharBackend chr_sec_in;
+    CharBackend chr_out;
     SocketReadState pri_rs;
     SocketReadState sec_rs;
 
@@ -83,28 +83,33 @@ typedef struct CompareState {
     GHashTable *connection_track_table;
     /* compare thread, a thread for each NIC */
     QemuThread thread;
-    /* Timer used on the primary to find packets that are never matched */
-    QEMUTimer *timer;
-    QemuMutex timer_check_lock;
+
+    GMainContext *worker_context;
+    GMainLoop *compare_loop;
 } CompareState;
 
 typedef struct CompareClass {
     ObjectClass parent_class;
 } CompareClass;
 
-typedef struct CompareChardevProps {
-    bool is_socket;
-} CompareChardevProps;
-
 enum {
     PRIMARY_IN = 0,
     SECONDARY_IN,
 };
 
-static int compare_chr_send(CharDriverState *out,
+static int compare_chr_send(CharBackend *out,
                             const uint8_t *buf,
                             uint32_t size);
 
+static gint seq_sorter(Packet *a, Packet *b, gpointer data)
+{
+    struct tcphdr *atcp, *btcp;
+
+    atcp = (struct tcphdr *)(a->transport_header);
+    btcp = (struct tcphdr *)(b->transport_header);
+    return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
+}
+
 /*
  * Return 0 on success, if return -1 means the pkt
  * is unsupported(arp and ipv6) and will be sent later
@@ -141,6 +146,11 @@ static int packet_enqueue(CompareState *s, int mode)
         if (g_queue_get_length(&conn->primary_list) <=
                                MAX_QUEUE_SIZE) {
             g_queue_push_tail(&conn->primary_list, pkt);
+            if (conn->ip_proto == IPPROTO_TCP) {
+                g_queue_sort(&conn->primary_list,
+                             (GCompareDataFunc)seq_sorter,
+                             NULL);
+            }
         } else {
             error_report("colo compare primary queue size too big,"
                          "drop packet");
@@ -149,6 +159,11 @@ static int packet_enqueue(CompareState *s, int mode)
         if (g_queue_get_length(&conn->secondary_list) <=
                                MAX_QUEUE_SIZE) {
             g_queue_push_tail(&conn->secondary_list, pkt);
+            if (conn->ip_proto == IPPROTO_TCP) {
+                g_queue_sort(&conn->secondary_list,
+                             (GCompareDataFunc)seq_sorter,
+                             NULL);
+            }
         } else {
             error_report("colo compare secondary queue size too big,"
                          "drop packet");
@@ -165,16 +180,26 @@ static int packet_enqueue(CompareState *s, int mode)
  * return:    0  means packet same
  *            > 0 || < 0 means packet different
  */
-static int colo_packet_compare(Packet *ppkt, Packet *spkt)
+static int colo_packet_compare_common(Packet *ppkt, Packet *spkt, int offset)
 {
-    trace_colo_compare_ip_info(ppkt->size, inet_ntoa(ppkt->ip->ip_src),
-                               inet_ntoa(ppkt->ip->ip_dst), spkt->size,
-                               inet_ntoa(spkt->ip->ip_src),
-                               inet_ntoa(spkt->ip->ip_dst));
+    if (trace_event_get_state(TRACE_COLO_COMPARE_MISCOMPARE)) {
+        char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
+
+        strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
+        strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
+        strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
+        strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
+
+        trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
+                                   pri_ip_dst, spkt->size,
+                                   sec_ip_src, sec_ip_dst);
+    }
 
     if (ppkt->size == spkt->size) {
-        return memcmp(ppkt->data, spkt->data, spkt->size);
+        return memcmp(ppkt->data + offset, spkt->data + offset,
+                      spkt->size - offset);
     } else {
+        trace_colo_compare_main("Net packet size are not the same");
         return -1;
     }
 }
@@ -188,15 +213,8 @@ static int colo_packet_compare_tcp(Packet *spkt, Packet *ppkt)
 {
     struct tcphdr *ptcp, *stcp;
     int res;
-    char *sdebug, *ddebug;
 
     trace_colo_compare_main("compare tcp");
-    if (ppkt->size != spkt->size) {
-        if (trace_event_get_state(TRACE_COLO_COMPARE_MISCOMPARE)) {
-            trace_colo_compare_main("pkt size not same");
-        }
-        return -1;
-    }
 
     ptcp = (struct tcphdr *)ppkt->transport_header;
     stcp = (struct tcphdr *)spkt->transport_header;
@@ -215,28 +233,29 @@ static int colo_packet_compare_tcp(Packet *spkt, Packet *ppkt)
         spkt->ip->ip_sum = ppkt->ip->ip_sum;
     }
 
-    res = memcmp(ppkt->data + ETH_HLEN, spkt->data + ETH_HLEN,
-                (spkt->size - ETH_HLEN));
+    if (ptcp->th_sum == stcp->th_sum) {
+        res = colo_packet_compare_common(ppkt, spkt, ETH_HLEN);
+    } else {
+        res = -1;
+    }
 
     if (res != 0 && trace_event_get_state(TRACE_COLO_COMPARE_MISCOMPARE)) {
-        sdebug = strdup(inet_ntoa(ppkt->ip->ip_src));
-        ddebug = strdup(inet_ntoa(ppkt->ip->ip_dst));
-        fprintf(stderr, "%s: src/dst: %s/%s p: seq/ack=%u/%u"
-                " s: seq/ack=%u/%u res=%d flags=%x/%x\n",
-                __func__, sdebug, ddebug,
-                (unsigned int)ntohl(ptcp->th_seq),
-                (unsigned int)ntohl(ptcp->th_ack),
-                (unsigned int)ntohl(stcp->th_seq),
-                (unsigned int)ntohl(stcp->th_ack),
-                res, ptcp->th_flags, stcp->th_flags);
-
-        fprintf(stderr, "Primary len = %d\n", ppkt->size);
-        qemu_hexdump((char *)ppkt->data, stderr, "colo-compare", ppkt->size);
-        fprintf(stderr, "Secondary len = %d\n", spkt->size);
-        qemu_hexdump((char *)spkt->data, stderr, "colo-compare", spkt->size);
-
-        g_free(sdebug);
-        g_free(ddebug);
+        trace_colo_compare_pkt_info_src(inet_ntoa(ppkt->ip->ip_src),
+                                        ntohl(stcp->th_seq),
+                                        ntohl(stcp->th_ack),
+                                        res, stcp->th_flags,
+                                        spkt->size);
+
+        trace_colo_compare_pkt_info_dst(inet_ntoa(ppkt->ip->ip_dst),
+                                        ntohl(ptcp->th_seq),
+                                        ntohl(ptcp->th_ack),
+                                        res, ptcp->th_flags,
+                                        ppkt->size);
+
+        qemu_hexdump((char *)ppkt->data, stderr,
+                     "colo-compare ppkt", ppkt->size);
+        qemu_hexdump((char *)spkt->data, stderr,
+                     "colo-compare spkt", spkt->size);
     }
 
     return res;
@@ -249,15 +268,32 @@ static int colo_packet_compare_tcp(Packet *spkt, Packet *ppkt)
 static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
 {
     int ret;
+    int network_header_length = ppkt->ip->ip_hl * 4;
 
     trace_colo_compare_main("compare udp");
-    ret = colo_packet_compare(ppkt, spkt);
+
+    /*
+     * Because of ppkt and spkt are both in the same connection,
+     * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
+     * same with spkt. In addition, IP header's Identification is a random
+     * field, we can handle it in IP fragmentation function later.
+     * COLO just concern the response net packet payload from primary guest
+     * and secondary guest are same or not, So we ignored all IP header include
+     * other field like TOS,TTL,IP Checksum. we only need to compare
+     * the ip payload here.
+     */
+    ret = colo_packet_compare_common(ppkt, spkt,
+                                     network_header_length + ETH_HLEN);
 
     if (ret) {
         trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
-        qemu_hexdump((char *)ppkt->data, stderr, "colo-compare", ppkt->size);
         trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
-        qemu_hexdump((char *)spkt->data, stderr, "colo-compare", spkt->size);
+        if (trace_event_get_state(TRACE_COLO_COMPARE_MISCOMPARE)) {
+            qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
+                         ppkt->size);
+            qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
+                         spkt->size);
+        }
     }
 
     return ret;
@@ -269,24 +305,32 @@ static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
  */
 static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
 {
-    int network_length;
+    int network_header_length = ppkt->ip->ip_hl * 4;
 
     trace_colo_compare_main("compare icmp");
-    network_length = ppkt->ip->ip_hl * 4;
-    if (ppkt->size != spkt->size ||
-        ppkt->size < network_length + ETH_HLEN) {
-        return -1;
-    }
 
-    if (colo_packet_compare(ppkt, spkt)) {
+    /*
+     * Because of ppkt and spkt are both in the same connection,
+     * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
+     * same with spkt. In addition, IP header's Identification is a random
+     * field, we can handle it in IP fragmentation function later.
+     * COLO just concern the response net packet payload from primary guest
+     * and secondary guest are same or not, So we ignored all IP header include
+     * other field like TOS,TTL,IP Checksum. we only need to compare
+     * the ip payload here.
+     */
+    if (colo_packet_compare_common(ppkt, spkt,
+                                   network_header_length + ETH_HLEN)) {
         trace_colo_compare_icmp_miscompare("primary pkt size",
                                            ppkt->size);
-        qemu_hexdump((char *)ppkt->data, stderr, "colo-compare",
-                     ppkt->size);
         trace_colo_compare_icmp_miscompare("Secondary pkt size",
                                            spkt->size);
-        qemu_hexdump((char *)spkt->data, stderr, "colo-compare",
-                     spkt->size);
+        if (trace_event_get_state(TRACE_COLO_COMPARE_MISCOMPARE)) {
+            qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
+                         ppkt->size);
+            qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
+                         spkt->size);
+        }
         return -1;
     } else {
         return 0;
@@ -300,11 +344,20 @@ static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
 static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
 {
     trace_colo_compare_main("compare other");
-    trace_colo_compare_ip_info(ppkt->size, inet_ntoa(ppkt->ip->ip_src),
-                               inet_ntoa(ppkt->ip->ip_dst), spkt->size,
-                               inet_ntoa(spkt->ip->ip_src),
-                               inet_ntoa(spkt->ip->ip_dst));
-    return colo_packet_compare(ppkt, spkt);
+    if (trace_event_get_state(TRACE_COLO_COMPARE_MISCOMPARE)) {
+        char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
+
+        strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
+        strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
+        strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
+        strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
+
+        trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
+                                   pri_ip_dst, spkt->size,
+                                   sec_ip_src, sec_ip_dst);
+    }
+
+    return colo_packet_compare_common(ppkt, spkt, 0);
 }
 
 static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
@@ -362,9 +415,7 @@ static void colo_compare_connection(void *opaque, void *user_data)
 
     while (!g_queue_is_empty(&conn->primary_list) &&
            !g_queue_is_empty(&conn->secondary_list)) {
-        qemu_mutex_lock(&s->timer_check_lock);
         pkt = g_queue_pop_tail(&conn->primary_list);
-        qemu_mutex_unlock(&s->timer_check_lock);
         switch (conn->ip_proto) {
         case IPPROTO_TCP:
             result = g_queue_find_custom(&conn->secondary_list,
@@ -385,7 +436,7 @@ static void colo_compare_connection(void *opaque, void *user_data)
         }
 
         if (result) {
-            ret = compare_chr_send(s->chr_out, pkt->data, pkt->size);
+            ret = compare_chr_send(&s->chr_out, pkt->data, pkt->size);
             if (ret < 0) {
                 error_report("colo_send_primary_packet failed");
             }
@@ -399,16 +450,14 @@ static void colo_compare_connection(void *opaque, void *user_data)
              * until next comparison.
              */
             trace_colo_compare_main("packet different");
-            qemu_mutex_lock(&s->timer_check_lock);
             g_queue_push_tail(&conn->primary_list, pkt);
-            qemu_mutex_unlock(&s->timer_check_lock);
             /* TODO: colo_notify_checkpoint();*/
             break;
         }
     }
 }
 
-static int compare_chr_send(CharDriverState *out,
+static int compare_chr_send(CharBackend *out,
                             const uint8_t *buf,
                             uint32_t size)
 {
@@ -451,7 +500,8 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
 
     ret = net_fill_rstate(&s->pri_rs, buf, size);
     if (ret == -1) {
-        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
+        qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL,
+                                 NULL, NULL, true);
         error_report("colo-compare primary_in error");
     }
 }
@@ -467,30 +517,51 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
 
     ret = net_fill_rstate(&s->sec_rs, buf, size);
     if (ret == -1) {
-        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
+        qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL,
+                                 NULL, NULL, true);
         error_report("colo-compare secondary_in error");
     }
 }
 
+/*
+ * Check old packet regularly so it can watch for any packets
+ * that the secondary hasn't produced equivalents of.
+ */
+static gboolean check_old_packet_regular(void *opaque)
+{
+    CompareState *s = opaque;
+
+    /* if have old packet we will notify checkpoint */
+    colo_old_packet_check(s);
+
+    return TRUE;
+}
+
 static void *colo_compare_thread(void *opaque)
 {
-    GMainContext *worker_context;
-    GMainLoop *compare_loop;
     CompareState *s = opaque;
+    GSource *timeout_source;
 
-    worker_context = g_main_context_new();
+    s->worker_context = g_main_context_new();
 
-    qemu_chr_add_handlers_full(s->chr_pri_in, compare_chr_can_read,
-                          compare_pri_chr_in, NULL, s, worker_context);
-    qemu_chr_add_handlers_full(s->chr_sec_in, compare_chr_can_read,
-                          compare_sec_chr_in, NULL, s, worker_context);
+    qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
+                          compare_pri_chr_in, NULL, s, s->worker_context, true);
+    qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
+                          compare_sec_chr_in, NULL, s, s->worker_context, true);
 
-    compare_loop = g_main_loop_new(worker_context, FALSE);
+    s->compare_loop = g_main_loop_new(s->worker_context, FALSE);
 
-    g_main_loop_run(compare_loop);
+    /* To kick any packets that the secondary doesn't match */
+    timeout_source = g_timeout_source_new(REGULAR_PACKET_CHECK_MS);
+    g_source_set_callback(timeout_source,
+                          (GSourceFunc)check_old_packet_regular, s, NULL);
+    g_source_attach(timeout_source, s->worker_context);
 
-    g_main_loop_unref(compare_loop);
-    g_main_context_unref(worker_context);
+    g_main_loop_run(s->compare_loop);
+
+    g_source_unref(timeout_source);
+    g_main_loop_unref(s->compare_loop);
+    g_main_context_unref(s->worker_context);
     return NULL;
 }
 
@@ -545,7 +616,7 @@ static void compare_pri_rs_finalize(SocketReadState *pri_rs)
 
     if (packet_enqueue(s, PRIMARY_IN)) {
         trace_colo_compare_main("primary: unsupported packet in");
-        compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len);
+        compare_chr_send(&s->chr_out, pri_rs->buf, pri_rs->packet_len);
     } else {
         /* compare connection */
         g_queue_foreach(&s->conn_list, colo_compare_connection, s);
@@ -564,40 +635,15 @@ static void compare_sec_rs_finalize(SocketReadState *sec_rs)
     }
 }
 
-static int compare_chardev_opts(void *opaque,
-                                const char *name, const char *value,
-                                Error **errp)
-{
-    CompareChardevProps *props = opaque;
-
-    if (strcmp(name, "backend") == 0 &&
-        strcmp(value, "socket") == 0) {
-        props->is_socket = true;
-        return 0;
-    } else if (strcmp(name, "host") == 0 ||
-              (strcmp(name, "port") == 0) ||
-              (strcmp(name, "server") == 0) ||
-              (strcmp(name, "wait") == 0) ||
-              (strcmp(name, "path") == 0)) {
-        return 0;
-    } else {
-        error_setg(errp,
-                   "COLO-compare does not support a chardev with option %s=%s",
-                   name, value);
-        return -1;
-    }
-}
 
 /*
  * Return 0 is success.
  * Return 1 is failed.
  */
-static int find_and_check_chardev(CharDriverState **chr,
+static int find_and_check_chardev(Chardev **chr,
                                   char *chr_name,
                                   Error **errp)
 {
-    CompareChardevProps props;
-
     *chr = qemu_chr_find(chr_name);
     if (*chr == NULL) {
         error_setg(errp, "Device '%s' not found",
@@ -605,37 +651,13 @@ static int find_and_check_chardev(CharDriverState **chr,
         return 1;
     }
 
-    memset(&props, 0, sizeof(props));
-    if (qemu_opt_foreach((*chr)->opts, compare_chardev_opts, &props, errp)) {
-        return 1;
-    }
-
-    if (!props.is_socket) {
-        error_setg(errp, "chardev \"%s\" is not a tcp socket",
+    if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
+        error_setg(errp, "chardev \"%s\" is not reconnectable",
                    chr_name);
         return 1;
     }
-    return 0;
-}
-
-/*
- * Check old packet regularly so it can watch for any packets
- * that the secondary hasn't produced equivalents of.
- */
-static void check_old_packet_regular(void *opaque)
-{
-    CompareState *s = opaque;
 
-    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
-              REGULAR_PACKET_CHECK_MS);
-    /* if have old packet we will notify checkpoint */
-    /*
-     * TODO: Make timer handler run in compare thread
-     * like qemu_chr_add_handlers_full.
-     */
-    qemu_mutex_lock(&s->timer_check_lock);
-    colo_old_packet_check(s);
-    qemu_mutex_unlock(&s->timer_check_lock);
+    return 0;
 }
 
 /*
@@ -645,6 +667,7 @@ static void check_old_packet_regular(void *opaque)
 static void colo_compare_complete(UserCreatable *uc, Error **errp)
 {
     CompareState *s = COLO_COMPARE(uc);
+    Chardev *chr;
     char thread_name[64];
     static int compare_id;
 
@@ -660,29 +683,25 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
         return;
     }
 
-    if (find_and_check_chardev(&s->chr_pri_in, s->pri_indev, errp)) {
+    if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
+        !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
         return;
     }
 
-    if (find_and_check_chardev(&s->chr_sec_in, s->sec_indev, errp)) {
+    if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
+        !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
         return;
     }
 
-    if (find_and_check_chardev(&s->chr_out, s->outdev, errp)) {
+    if (find_and_check_chardev(&chr, s->outdev, errp) ||
+        !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
         return;
     }
 
-    qemu_chr_fe_claim_no_fail(s->chr_pri_in);
-
-    qemu_chr_fe_claim_no_fail(s->chr_sec_in);
-
-    qemu_chr_fe_claim_no_fail(s->chr_out);
-
     net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize);
     net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize);
 
     g_queue_init(&s->conn_list);
-    qemu_mutex_init(&s->timer_check_lock);
 
     s->connection_track_table = g_hash_table_new_full(connection_key_hash,
                                                       connection_key_equal,
@@ -695,15 +714,26 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
                        QEMU_THREAD_JOINABLE);
     compare_id++;
 
-    /* A regular timer to kick any packets that the secondary doesn't match */
-    s->timer = timer_new_ms(QEMU_CLOCK_VIRTUAL, /* Only when guest runs */
-                            check_old_packet_regular, s);
-    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
-                        REGULAR_PACKET_CHECK_MS);
-
     return;
 }
 
+static void colo_flush_packets(void *opaque, void *user_data)
+{
+    CompareState *s = user_data;
+    Connection *conn = opaque;
+    Packet *pkt = NULL;
+
+    while (!g_queue_is_empty(&conn->primary_list)) {
+        pkt = g_queue_pop_head(&conn->primary_list);
+        compare_chr_send(&s->chr_out, pkt->data, pkt->size);
+        packet_destroy(pkt, NULL);
+    }
+    while (!g_queue_is_empty(&conn->secondary_list)) {
+        pkt = g_queue_pop_head(&conn->secondary_list);
+        packet_destroy(pkt, NULL);
+    }
+}
+
 static void colo_compare_class_init(ObjectClass *oc, void *data)
 {
     UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
@@ -728,32 +758,21 @@ static void colo_compare_finalize(Object *obj)
 {
     CompareState *s = COLO_COMPARE(obj);
 
-    if (s->chr_pri_in) {
-        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
-        qemu_chr_fe_release(s->chr_pri_in);
-    }
-    if (s->chr_sec_in) {
-        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
-        qemu_chr_fe_release(s->chr_sec_in);
-    }
-    if (s->chr_out) {
-        qemu_chr_fe_release(s->chr_out);
-    }
-
-    g_queue_free(&s->conn_list);
+    qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
+                             s->worker_context, true);
+    qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
+                             s->worker_context, true);
+    qemu_chr_fe_deinit(&s->chr_out);
 
-    if (qemu_thread_is_self(&s->thread)) {
-        /* compare connection */
-        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
-        qemu_thread_join(&s->thread);
-    }
+    g_main_loop_quit(s->compare_loop);
+    qemu_thread_join(&s->thread);
 
-    if (s->timer) {
-        timer_del(s->timer);
-    }
+    /* Release all unhandled packets after compare thead exited */
+    g_queue_foreach(&s->conn_list, colo_flush_packets, s);
 
-    qemu_mutex_destroy(&s->timer_check_lock);
+    g_queue_clear(&s->conn_list);
 
+    g_hash_table_destroy(s->connection_track_table);
     g_free(s->pri_indev);
     g_free(s->sec_indev);
     g_free(s->outdev);