]> git.proxmox.com Git - mirror_qemu.git/blobdiff - net/colo-compare.c
net/colo-compare.c: Correct ordering in complete and finalize
[mirror_qemu.git] / net / colo-compare.c
index 23b2d2c4cc662ffd4998eab7f4655cb3437c2f80..ed1f3d0af0e635e256d958623fb730c85b555073 100644 (file)
@@ -13,9 +13,9 @@
  */
 
 #include "qemu/osdep.h"
+#include "qemu-common.h"
 #include "qemu/error-report.h"
 #include "trace.h"
-#include "qemu-common.h"
 #include "qapi/error.h"
 #include "net/net.h"
 #include "net/eth.h"
 #include "net/queue.h"
 #include "chardev/char-fe.h"
 #include "qemu/sockets.h"
-#include "net/colo.h"
+#include "colo.h"
 #include "sysemu/iothread.h"
+#include "net/colo-compare.h"
+#include "migration/colo.h"
+#include "migration/migration.h"
+#include "util.h"
+
+#include "block/aio-wait.h"
+#include "qemu/coroutine.h"
 
 #define TYPE_COLO_COMPARE "colo-compare"
 #define COLO_COMPARE(obj) \
     OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
 
+static QTAILQ_HEAD(, CompareState) net_compares =
+       QTAILQ_HEAD_INITIALIZER(net_compares);
+
+static NotifierList colo_compare_notifiers =
+    NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);
+
 #define COMPARE_READ_LEN_MAX NET_BUFSIZE
 #define MAX_QUEUE_SIZE 1024
 
 #define COLO_COMPARE_FREE_PRIMARY     0x01
 #define COLO_COMPARE_FREE_SECONDARY   0x02
 
-/* TODO: Should be configurable */
 #define REGULAR_PACKET_CHECK_MS 3000
+#define DEFAULT_TIME_OUT_MS 3000
+
+static QemuMutex colo_compare_mutex;
+static bool colo_compare_active;
+static QemuMutex event_mtx;
+static QemuCond event_complete_cond;
+static int event_unhandled_count;
 
 /*
  *  + CompareState ++
  *                    |packet  |  |packet  +    |packet  | |packet  +
  *                    +--------+  +--------+    +--------+ +--------+
  */
+
+typedef struct SendCo {
+    Coroutine *co;
+    struct CompareState *s;
+    CharBackend *chr;
+    GQueue send_list;
+    bool notify_remote_frame;
+    bool done;
+    int ret;
+} SendCo;
+
+typedef struct SendEntry {
+    uint32_t size;
+    uint32_t vnet_hdr_len;
+    uint8_t *buf;
+} SendEntry;
+
 typedef struct CompareState {
     Object parent;
 
     char *pri_indev;
     char *sec_indev;
     char *outdev;
+    char *notify_dev;
     CharBackend chr_pri_in;
     CharBackend chr_sec_in;
     CharBackend chr_out;
+    CharBackend chr_notify_dev;
     SocketReadState pri_rs;
     SocketReadState sec_rs;
+    SocketReadState notify_rs;
+    SendCo out_sendco;
+    SendCo notify_sendco;
     bool vnet_hdr;
+    uint32_t compare_timeout;
+    uint32_t expired_scan_cycle;
 
     /*
      * Record the connection that through the NIC
@@ -87,6 +130,11 @@ typedef struct CompareState {
     IOThread *iothread;
     GMainContext *worker_context;
     QEMUTimer *packet_check_timer;
+
+    QEMUBH *event_bh;
+    enum colo_event event;
+
+    QTAILQ_ENTRY(CompareState) next;
 } CompareState;
 
 typedef struct CompareClass {
@@ -98,26 +146,61 @@ enum {
     SECONDARY_IN,
 };
 
+
 static int compare_chr_send(CompareState *s,
-                            const uint8_t *buf,
+                            uint8_t *buf,
                             uint32_t size,
-                            uint32_t vnet_hdr_len);
+                            uint32_t vnet_hdr_len,
+                            bool notify_remote_frame,
+                            bool zero_copy);
+
+static bool packet_matches_str(const char *str,
+                               const uint8_t *buf,
+                               uint32_t packet_len)
+{
+    if (packet_len != strlen(str)) {
+        return false;
+    }
+
+    return !memcmp(str, buf, strlen(str));
+}
+
+static void notify_remote_frame(CompareState *s)
+{
+    char msg[] = "DO_CHECKPOINT";
+    int ret = 0;
+
+    ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
+    if (ret < 0) {
+        error_report("Notify Xen COLO-frame failed");
+    }
+}
+
+static void colo_compare_inconsistency_notify(CompareState *s)
+{
+    if (s->notify_dev) {
+        notify_remote_frame(s);
+    } else {
+        notifier_list_notify(&colo_compare_notifiers,
+                             migrate_get_current());
+    }
+}
 
 static gint seq_sorter(Packet *a, Packet *b, gpointer data)
 {
-    struct tcphdr *atcp, *btcp;
+    struct tcp_hdr *atcp, *btcp;
 
-    atcp = (struct tcphdr *)(a->transport_header);
-    btcp = (struct tcphdr *)(b->transport_header);
+    atcp = (struct tcp_hdr *)(a->transport_header);
+    btcp = (struct tcp_hdr *)(b->transport_header);
     return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
 }
 
 static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
 {
     Packet *pkt = data;
-    struct tcphdr *tcphd;
+    struct tcp_hdr *tcphd;
 
-    tcphd = (struct tcphdr *)pkt->transport_header;
+    tcphd = (struct tcp_hdr *)pkt->transport_header;
 
     pkt->tcp_seq = ntohl(tcphd->th_seq);
     pkt->tcp_ack = ntohl(tcphd->th_ack);
@@ -213,12 +296,14 @@ static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
     ret = compare_chr_send(s,
                            pkt->data,
                            pkt->size,
-                           pkt->vnet_hdr_len);
+                           pkt->vnet_hdr_len,
+                           false,
+                           true);
     if (ret < 0) {
         error_report("colo send primary packet failed");
     }
     trace_colo_compare_main("packet same and release packet");
-    packet_destroy(pkt, NULL);
+    packet_destroy_partial(pkt, NULL);
 }
 
 /*
@@ -262,15 +347,7 @@ static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
     *mark = 0;
 
     if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
-        if (colo_compare_packet_payload(ppkt, spkt,
-                                        ppkt->header_size, spkt->header_size,
-                                        ppkt->payload_size)) {
-            *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
-            return true;
-        }
-    }
-    if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
-        if (colo_compare_packet_payload(ppkt, spkt,
+        if (!colo_compare_packet_payload(ppkt, spkt,
                                         ppkt->header_size, spkt->header_size,
                                         ppkt->payload_size)) {
             *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
@@ -280,7 +357,7 @@ static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
 
     /* one part of secondary packet payload still need to be compared */
     if (!after(ppkt->seq_end, spkt->seq_end)) {
-        if (colo_compare_packet_payload(ppkt, spkt,
+        if (!colo_compare_packet_payload(ppkt, spkt,
                                         ppkt->header_size + ppkt->offset,
                                         spkt->header_size + spkt->offset,
                                         ppkt->payload_size - ppkt->offset)) {
@@ -299,7 +376,7 @@ static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
         /* primary packet is longer than secondary packet, compare
          * the same part and mark the primary packet offset
          */
-        if (colo_compare_packet_payload(ppkt, spkt,
+        if (!colo_compare_packet_payload(ppkt, spkt,
                                         ppkt->header_size + ppkt->offset,
                                         spkt->header_size + spkt->offset,
                                         spkt->payload_size - spkt->offset)) {
@@ -408,15 +485,14 @@ sec:
         g_queue_push_head(&conn->primary_list, ppkt);
         g_queue_push_head(&conn->secondary_list, spkt);
 
-        qemu_hexdump((char *)ppkt->data, stderr,
-                     "colo-compare ppkt", ppkt->size);
-        qemu_hexdump((char *)spkt->data, stderr,
-                     "colo-compare spkt", spkt->size);
+        if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
+            qemu_hexdump((char *)ppkt->data, stderr,
+                        "colo-compare ppkt", ppkt->size);
+            qemu_hexdump((char *)spkt->data, stderr,
+                        "colo-compare spkt", spkt->size);
+        }
 
-        /*
-         * colo_compare_inconsistent_notify();
-         * TODO: notice to checkpoint();
-         */
+        colo_compare_inconsistency_notify(s);
     }
 }
 
@@ -547,22 +623,28 @@ static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
     }
 }
 
+void colo_compare_register_notifier(Notifier *notify)
+{
+    notifier_list_add(&colo_compare_notifiers, notify);
+}
+
+void colo_compare_unregister_notifier(Notifier *notify)
+{
+    notifier_remove(notify);
+}
+
 static int colo_old_packet_check_one_conn(Connection *conn,
-                                          void *user_data)
+                                          CompareState *s)
 {
     GList *result = NULL;
-    int64_t check_time = REGULAR_PACKET_CHECK_MS;
 
     result = g_queue_find_custom(&conn->primary_list,
-                                 &check_time,
+                                 &s->compare_timeout,
                                  (GCompareFunc)colo_old_packet_check_one);
 
     if (result) {
         /* Do checkpoint will flush old packet */
-        /*
-         * TODO: Notify colo frame to do checkpoint.
-         * colo_compare_inconsistent_notify();
-         */
+        colo_compare_inconsistency_notify(s);
         return 0;
     }
 
@@ -582,7 +664,7 @@ static void colo_old_packet_check(void *opaque)
      * If we find one old packet, stop finding job and notify
      * COLO frame do checkpoint.
      */
-    g_queue_find_custom(&s->conn_list, NULL,
+    g_queue_find_custom(&s->conn_list, s,
                         (GCompareFunc)colo_old_packet_check_one_conn);
 }
 
@@ -606,11 +688,13 @@ static void colo_compare_packet(CompareState *s, Connection *conn,
             /*
              * If one packet arrive late, the secondary_list or
              * primary_list will be empty, so we can't compare it
-             * until next comparison.
+             * until next comparison. If the packets in the list are
+             * timeout, it will trigger a checkpoint request.
              */
             trace_colo_compare_main("packet different");
             g_queue_push_head(&conn->primary_list, pkt);
-            /* TODO: colo_notify_checkpoint();*/
+
+            colo_compare_inconsistency_notify(s);
             break;
         }
     }
@@ -643,44 +727,115 @@ static void colo_compare_connection(void *opaque, void *user_data)
     }
 }
 
-static int compare_chr_send(CompareState *s,
-                            const uint8_t *buf,
-                            uint32_t size,
-                            uint32_t vnet_hdr_len)
+static void coroutine_fn _compare_chr_send(void *opaque)
 {
+    SendCo *sendco = opaque;
+    CompareState *s = sendco->s;
     int ret = 0;
-    uint32_t len = htonl(size);
 
-    if (!size) {
-        return 0;
-    }
+    while (!g_queue_is_empty(&sendco->send_list)) {
+        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
+        uint32_t len = htonl(entry->size);
 
-    ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
-    if (ret != sizeof(len)) {
-        goto err;
-    }
+        ret = qemu_chr_fe_write_all(sendco->chr, (uint8_t *)&len, sizeof(len));
 
-    if (s->vnet_hdr) {
-        /*
-         * We send vnet header len make other module(like filter-redirector)
-         * know how to parse net packet correctly.
-         */
-        len = htonl(vnet_hdr_len);
-        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
         if (ret != sizeof(len)) {
+            g_free(entry->buf);
+            g_slice_free(SendEntry, entry);
             goto err;
         }
-    }
 
-    ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
-    if (ret != size) {
-        goto err;
+        if (!sendco->notify_remote_frame && s->vnet_hdr) {
+            /*
+             * We send vnet header len make other module(like filter-redirector)
+             * know how to parse net packet correctly.
+             */
+            len = htonl(entry->vnet_hdr_len);
+
+            ret = qemu_chr_fe_write_all(sendco->chr,
+                                        (uint8_t *)&len,
+                                        sizeof(len));
+
+            if (ret != sizeof(len)) {
+                g_free(entry->buf);
+                g_slice_free(SendEntry, entry);
+                goto err;
+            }
+        }
+
+        ret = qemu_chr_fe_write_all(sendco->chr,
+                                    (uint8_t *)entry->buf,
+                                    entry->size);
+
+        if (ret != entry->size) {
+            g_free(entry->buf);
+            g_slice_free(SendEntry, entry);
+            goto err;
+        }
+
+        g_free(entry->buf);
+        g_slice_free(SendEntry, entry);
     }
 
-    return 0;
+    sendco->ret = 0;
+    goto out;
 
 err:
-    return ret < 0 ? ret : -EIO;
+    while (!g_queue_is_empty(&sendco->send_list)) {
+        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
+        g_free(entry->buf);
+        g_slice_free(SendEntry, entry);
+    }
+    sendco->ret = ret < 0 ? ret : -EIO;
+out:
+    sendco->co = NULL;
+    sendco->done = true;
+    aio_wait_kick();
+}
+
+static int compare_chr_send(CompareState *s,
+                            uint8_t *buf,
+                            uint32_t size,
+                            uint32_t vnet_hdr_len,
+                            bool notify_remote_frame,
+                            bool zero_copy)
+{
+    SendCo *sendco;
+    SendEntry *entry;
+
+    if (notify_remote_frame) {
+        sendco = &s->notify_sendco;
+    } else {
+        sendco = &s->out_sendco;
+    }
+
+    if (!size) {
+        return 0;
+    }
+
+    entry = g_slice_new(SendEntry);
+    entry->size = size;
+    entry->vnet_hdr_len = vnet_hdr_len;
+    if (zero_copy) {
+        entry->buf = buf;
+    } else {
+        entry->buf = g_malloc(size);
+        memcpy(entry->buf, buf, size);
+    }
+    g_queue_push_head(&sendco->send_list, entry);
+
+    if (sendco->done) {
+        sendco->co = qemu_coroutine_create(_compare_chr_send, sendco);
+        sendco->done = false;
+        qemu_coroutine_enter(sendco->co);
+        if (sendco->done) {
+            /* report early errors */
+            return sendco->ret;
+        }
+    }
+
+    /* assume success */
+    return 0;
 }
 
 static int compare_chr_can_read(void *opaque)
@@ -722,6 +877,19 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
     }
 }
 
+static void compare_notify_chr(void *opaque, const uint8_t *buf, int size)
+{
+    CompareState *s = COLO_COMPARE(opaque);
+    int ret;
+
+    ret = net_fill_rstate(&s->notify_rs, buf, size);
+    if (ret == -1) {
+        qemu_chr_fe_set_handlers(&s->chr_notify_dev, NULL, NULL, NULL, NULL,
+                                 NULL, NULL, true);
+        error_report("colo-compare notify_dev error");
+    }
+}
+
 /*
  * Check old packet regularly so it can watch for any packets
  * that the secondary hasn't produced equivalents of.
@@ -733,7 +901,33 @@ static void check_old_packet_regular(void *opaque)
     /* if have old packet we will notify checkpoint */
     colo_old_packet_check(s);
     timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
-                REGULAR_PACKET_CHECK_MS);
+              s->expired_scan_cycle);
+}
+
+/* Public API, Used for COLO frame to notify compare event */
+void colo_notify_compares_event(void *opaque, int event, Error **errp)
+{
+    CompareState *s;
+    qemu_mutex_lock(&colo_compare_mutex);
+
+    if (!colo_compare_active) {
+        qemu_mutex_unlock(&colo_compare_mutex);
+        return;
+    }
+
+    qemu_mutex_lock(&event_mtx);
+    QTAILQ_FOREACH(s, &net_compares, next) {
+        s->event = event;
+        qemu_bh_schedule(s->event_bh);
+        event_unhandled_count++;
+    }
+    /* Wait all compare threads to finish handling this event */
+    while (event_unhandled_count > 0) {
+        qemu_cond_wait(&event_complete_cond, &event_mtx);
+    }
+
+    qemu_mutex_unlock(&event_mtx);
+    qemu_mutex_unlock(&colo_compare_mutex);
 }
 
 static void colo_compare_timer_init(CompareState *s)
@@ -744,7 +938,7 @@ static void colo_compare_timer_init(CompareState *s)
                                 SCALE_MS, check_old_packet_regular,
                                 s);
     timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
-                    REGULAR_PACKET_CHECK_MS);
+              s->expired_scan_cycle);
 }
 
 static void colo_compare_timer_del(CompareState *s)
@@ -756,8 +950,32 @@ static void colo_compare_timer_del(CompareState *s)
     }
  }
 
+static void colo_flush_packets(void *opaque, void *user_data);
+
+static void colo_compare_handle_event(void *opaque)
+{
+    CompareState *s = opaque;
+
+    switch (s->event) {
+    case COLO_EVENT_CHECKPOINT:
+        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
+        break;
+    case COLO_EVENT_FAILOVER:
+        break;
+    default:
+        break;
+    }
+
+    qemu_mutex_lock(&event_mtx);
+    assert(event_unhandled_count > 0);
+    event_unhandled_count--;
+    qemu_cond_broadcast(&event_complete_cond);
+    qemu_mutex_unlock(&event_mtx);
+}
+
 static void colo_compare_iothread(CompareState *s)
 {
+    AioContext *ctx = iothread_get_aio_context(s->iothread);
     object_ref(OBJECT(s->iothread));
     s->worker_context = iothread_get_g_main_context(s->iothread);
 
@@ -767,8 +985,14 @@ static void colo_compare_iothread(CompareState *s)
     qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
                              compare_sec_chr_in, NULL, NULL,
                              s, s->worker_context, true);
+    if (s->notify_dev) {
+        qemu_chr_fe_set_handlers(&s->chr_notify_dev, compare_chr_can_read,
+                                 compare_notify_chr, NULL, NULL,
+                                 s, s->worker_context, true);
+    }
 
     colo_compare_timer_init(s);
+    s->event_bh = aio_bh_new(ctx, colo_compare_handle_event, s);
 }
 
 static char *compare_get_pri_indev(Object *obj, Error **errp)
@@ -832,6 +1056,87 @@ static void compare_set_vnet_hdr(Object *obj,
     s->vnet_hdr = value;
 }
 
+static char *compare_get_notify_dev(Object *obj, Error **errp)
+{
+    CompareState *s = COLO_COMPARE(obj);
+
+    return g_strdup(s->notify_dev);
+}
+
+static void compare_set_notify_dev(Object *obj, const char *value, Error **errp)
+{
+    CompareState *s = COLO_COMPARE(obj);
+
+    g_free(s->notify_dev);
+    s->notify_dev = g_strdup(value);
+}
+
+static void compare_get_timeout(Object *obj, Visitor *v,
+                                const char *name, void *opaque,
+                                Error **errp)
+{
+    CompareState *s = COLO_COMPARE(obj);
+    uint32_t value = s->compare_timeout;
+
+    visit_type_uint32(v, name, &value, errp);
+}
+
+static void compare_set_timeout(Object *obj, Visitor *v,
+                                const char *name, void *opaque,
+                                Error **errp)
+{
+    CompareState *s = COLO_COMPARE(obj);
+    Error *local_err = NULL;
+    uint32_t value;
+
+    visit_type_uint32(v, name, &value, &local_err);
+    if (local_err) {
+        goto out;
+    }
+    if (!value) {
+        error_setg(&local_err, "Property '%s.%s' requires a positive value",
+                   object_get_typename(obj), name);
+        goto out;
+    }
+    s->compare_timeout = value;
+
+out:
+    error_propagate(errp, local_err);
+}
+
+static void compare_get_expired_scan_cycle(Object *obj, Visitor *v,
+                                           const char *name, void *opaque,
+                                           Error **errp)
+{
+    CompareState *s = COLO_COMPARE(obj);
+    uint32_t value = s->expired_scan_cycle;
+
+    visit_type_uint32(v, name, &value, errp);
+}
+
+static void compare_set_expired_scan_cycle(Object *obj, Visitor *v,
+                                           const char *name, void *opaque,
+                                           Error **errp)
+{
+    CompareState *s = COLO_COMPARE(obj);
+    Error *local_err = NULL;
+    uint32_t value;
+
+    visit_type_uint32(v, name, &value, &local_err);
+    if (local_err) {
+        goto out;
+    }
+    if (!value) {
+        error_setg(&local_err, "Property '%s.%s' requires a positive value",
+                   object_get_typename(obj), name);
+        goto out;
+    }
+    s->expired_scan_cycle = value;
+
+out:
+    error_propagate(errp, local_err);
+}
+
 static void compare_pri_rs_finalize(SocketReadState *pri_rs)
 {
     CompareState *s = container_of(pri_rs, CompareState, pri_rs);
@@ -842,7 +1147,9 @@ static void compare_pri_rs_finalize(SocketReadState *pri_rs)
         compare_chr_send(s,
                          pri_rs->buf,
                          pri_rs->packet_len,
-                         pri_rs->vnet_hdr_len);
+                         pri_rs->vnet_hdr_len,
+                         false,
+                         false);
     } else {
         /* compare packet in the specified connection */
         colo_compare_connection(conn, s);
@@ -862,6 +1169,29 @@ static void compare_sec_rs_finalize(SocketReadState *sec_rs)
     }
 }
 
+static void compare_notify_rs_finalize(SocketReadState *notify_rs)
+{
+    CompareState *s = container_of(notify_rs, CompareState, notify_rs);
+
+    const char msg[] = "COLO_COMPARE_GET_XEN_INIT";
+    int ret;
+
+    if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
+                           notify_rs->buf,
+                           notify_rs->packet_len)) {
+        ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
+        if (ret < 0) {
+            error_report("Notify Xen COLO-frame INIT failed");
+        }
+    } else if (packet_matches_str("COLO_CHECKPOINT",
+                                  notify_rs->buf,
+                                  notify_rs->packet_len)) {
+        /* colo-compare do checkpoint, flush pri packet and remove sec packet */
+        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
+    } else {
+        error_report("COLO compare got unsupported instruction");
+    }
+}
 
 /*
  * Return 0 is success.
@@ -884,6 +1214,12 @@ static int find_and_check_chardev(Chardev **chr,
         return 1;
     }
 
+    if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) {
+        error_setg(errp, "chardev \"%s\" cannot switch context",
+                   chr_name);
+        return 1;
+    }
+
     return 0;
 }
 
@@ -908,6 +1244,16 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
         return;
     }
 
+    if (!s->compare_timeout) {
+        /* Set default value to 3000 MS */
+        s->compare_timeout = DEFAULT_TIME_OUT_MS;
+    }
+
+    if (!s->expired_scan_cycle) {
+        /* Set default value to 3000 MS */
+        s->expired_scan_cycle = REGULAR_PACKET_CHECK_MS;
+    }
+
     if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
         !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
         return;
@@ -926,6 +1272,31 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
     net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
     net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
 
+    /* Try to enable remote notify chardev, currently just for Xen COLO */
+    if (s->notify_dev) {
+        if (find_and_check_chardev(&chr, s->notify_dev, errp) ||
+            !qemu_chr_fe_init(&s->chr_notify_dev, chr, errp)) {
+            return;
+        }
+
+        net_socket_rs_init(&s->notify_rs, compare_notify_rs_finalize,
+                           s->vnet_hdr);
+    }
+
+    s->out_sendco.s = s;
+    s->out_sendco.chr = &s->chr_out;
+    s->out_sendco.notify_remote_frame = false;
+    s->out_sendco.done = true;
+    g_queue_init(&s->out_sendco.send_list);
+
+    if (s->notify_dev) {
+        s->notify_sendco.s = s;
+        s->notify_sendco.chr = &s->chr_notify_dev;
+        s->notify_sendco.notify_remote_frame = true;
+        s->notify_sendco.done = true;
+        g_queue_init(&s->notify_sendco.send_list);
+    }
+
     g_queue_init(&s->conn_list);
 
     s->connection_track_table = g_hash_table_new_full(connection_key_hash,
@@ -934,6 +1305,16 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
                                                       connection_destroy);
 
     colo_compare_iothread(s);
+
+    qemu_mutex_lock(&colo_compare_mutex);
+    if (!colo_compare_active) {
+        qemu_mutex_init(&event_mtx);
+        qemu_cond_init(&event_complete_cond);
+        colo_compare_active = true;
+    }
+    QTAILQ_INSERT_TAIL(&net_compares, s, next);
+    qemu_mutex_unlock(&colo_compare_mutex);
+
     return;
 }
 
@@ -948,8 +1329,10 @@ static void colo_flush_packets(void *opaque, void *user_data)
         compare_chr_send(s,
                          pkt->data,
                          pkt->size,
-                         pkt->vnet_hdr_len);
-        packet_destroy(pkt, NULL);
+                         pkt->vnet_hdr_len,
+                         false,
+                         true);
+        packet_destroy_partial(pkt, NULL);
     }
     while (!g_queue_is_empty(&conn->secondary_list)) {
         pkt = g_queue_pop_head(&conn->secondary_list);
@@ -969,38 +1352,81 @@ static void colo_compare_init(Object *obj)
     CompareState *s = COLO_COMPARE(obj);
 
     object_property_add_str(obj, "primary_in",
-                            compare_get_pri_indev, compare_set_pri_indev,
-                            NULL);
+                            compare_get_pri_indev, compare_set_pri_indev);
     object_property_add_str(obj, "secondary_in",
-                            compare_get_sec_indev, compare_set_sec_indev,
-                            NULL);
+                            compare_get_sec_indev, compare_set_sec_indev);
     object_property_add_str(obj, "outdev",
-                            compare_get_outdev, compare_set_outdev,
-                            NULL);
+                            compare_get_outdev, compare_set_outdev);
     object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
                             (Object **)&s->iothread,
                             object_property_allow_set_link,
-                            OBJ_PROP_LINK_UNREF_ON_RELEASE, NULL);
+                            OBJ_PROP_LINK_STRONG);
+    /* This parameter just for Xen COLO */
+    object_property_add_str(obj, "notify_dev",
+                            compare_get_notify_dev, compare_set_notify_dev);
+
+    object_property_add(obj, "compare_timeout", "uint32",
+                        compare_get_timeout,
+                        compare_set_timeout, NULL, NULL);
+
+    object_property_add(obj, "expired_scan_cycle", "uint32",
+                        compare_get_expired_scan_cycle,
+                        compare_set_expired_scan_cycle, NULL, NULL);
 
     s->vnet_hdr = false;
     object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
-                             compare_set_vnet_hdr, NULL);
+                             compare_set_vnet_hdr);
 }
 
 static void colo_compare_finalize(Object *obj)
 {
     CompareState *s = COLO_COMPARE(obj);
+    CompareState *tmp = NULL;
+
+    qemu_mutex_lock(&colo_compare_mutex);
+    QTAILQ_FOREACH(tmp, &net_compares, next) {
+        if (tmp == s) {
+            QTAILQ_REMOVE(&net_compares, s, next);
+            break;
+        }
+    }
+    if (QTAILQ_EMPTY(&net_compares)) {
+        colo_compare_active = false;
+        qemu_mutex_destroy(&event_mtx);
+        qemu_cond_destroy(&event_complete_cond);
+    }
+    qemu_mutex_unlock(&colo_compare_mutex);
 
     qemu_chr_fe_deinit(&s->chr_pri_in, false);
     qemu_chr_fe_deinit(&s->chr_sec_in, false);
     qemu_chr_fe_deinit(&s->chr_out, false);
+    if (s->notify_dev) {
+        qemu_chr_fe_deinit(&s->chr_notify_dev, false);
+    }
+
     if (s->iothread) {
         colo_compare_timer_del(s);
     }
+
+    qemu_bh_delete(s->event_bh);
+
+    AioContext *ctx = iothread_get_aio_context(s->iothread);
+    aio_context_acquire(ctx);
+    AIO_WAIT_WHILE(ctx, !s->out_sendco.done);
+    if (s->notify_dev) {
+        AIO_WAIT_WHILE(ctx, !s->notify_sendco.done);
+    }
+    aio_context_release(ctx);
+
     /* Release all unhandled packets after compare thead exited */
     g_queue_foreach(&s->conn_list, colo_flush_packets, s);
+    AIO_WAIT_WHILE(NULL, !s->out_sendco.done);
 
     g_queue_clear(&s->conn_list);
+    g_queue_clear(&s->out_sendco.send_list);
+    if (s->notify_dev) {
+        g_queue_clear(&s->notify_sendco.send_list);
+    }
 
     if (s->connection_track_table) {
         g_hash_table_destroy(s->connection_track_table);
@@ -1009,9 +1435,17 @@ static void colo_compare_finalize(Object *obj)
     if (s->iothread) {
         object_unref(OBJECT(s->iothread));
     }
+
     g_free(s->pri_indev);
     g_free(s->sec_indev);
     g_free(s->outdev);
+    g_free(s->notify_dev);
+}
+
+static void __attribute__((__constructor__)) colo_compare_init_globals(void)
+{
+    colo_compare_active = false;
+    qemu_mutex_init(&colo_compare_mutex);
 }
 
 static const TypeInfo colo_compare_info = {