]> git.proxmox.com Git - mirror_qemu.git/blobdiff - net/colo-compare.c
net: Introduce announce timer
[mirror_qemu.git] / net / colo-compare.c
index dd745a491ba37262466c9e4586bf5624ca5ffd94..bf10526f054bea922dee910b40ee8f1aa2de8df0 100644 (file)
 #include "qemu/sockets.h"
 #include "colo.h"
 #include "sysemu/iothread.h"
+#include "net/colo-compare.h"
+#include "migration/colo.h"
+#include "migration/migration.h"
+#include "util.h"
 
 #define TYPE_COLO_COMPARE "colo-compare"
 #define COLO_COMPARE(obj) \
     OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
 
+static QTAILQ_HEAD(, CompareState) net_compares =
+       QTAILQ_HEAD_INITIALIZER(net_compares);
+
+static NotifierList colo_compare_notifiers =
+    NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);
+
 #define COMPARE_READ_LEN_MAX NET_BUFSIZE
 #define MAX_QUEUE_SIZE 1024
 
 /* TODO: Should be configurable */
 #define REGULAR_PACKET_CHECK_MS 3000
 
+static QemuMutex event_mtx;
+static QemuCond event_complete_cond;
+static int event_unhandled_count;
+
 /*
  *  + CompareState ++
  *  |               |
@@ -87,6 +101,11 @@ typedef struct CompareState {
     IOThread *iothread;
     GMainContext *worker_context;
     QEMUTimer *packet_check_timer;
+
+    QEMUBH *event_bh;
+    enum colo_event event;
+
+    QTAILQ_ENTRY(CompareState) next;
 } CompareState;
 
 typedef struct CompareClass {
@@ -98,6 +117,12 @@ enum {
     SECONDARY_IN,
 };
 
+static void colo_compare_inconsistency_notify(void)
+{
+    notifier_list_notify(&colo_compare_notifiers,
+                migrate_get_current());
+}
+
 static int compare_chr_send(CompareState *s,
                             const uint8_t *buf,
                             uint32_t size,
@@ -105,19 +130,19 @@ static int compare_chr_send(CompareState *s,
 
 static gint seq_sorter(Packet *a, Packet *b, gpointer data)
 {
-    struct tcphdr *atcp, *btcp;
+    struct tcp_hdr *atcp, *btcp;
 
-    atcp = (struct tcphdr *)(a->transport_header);
-    btcp = (struct tcphdr *)(b->transport_header);
+    atcp = (struct tcp_hdr *)(a->transport_header);
+    btcp = (struct tcp_hdr *)(b->transport_header);
     return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
 }
 
 static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
 {
     Packet *pkt = data;
-    struct tcphdr *tcphd;
+    struct tcp_hdr *tcphd;
 
-    tcphd = (struct tcphdr *)pkt->transport_header;
+    tcphd = (struct tcp_hdr *)pkt->transport_header;
 
     pkt->tcp_seq = ntohl(tcphd->th_seq);
     pkt->tcp_ack = ntohl(tcphd->th_ack);
@@ -261,14 +286,6 @@ static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
 {
     *mark = 0;
 
-    if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
-        if (colo_compare_packet_payload(ppkt, spkt,
-                                        ppkt->header_size, spkt->header_size,
-                                        ppkt->payload_size)) {
-            *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
-            return true;
-        }
-    }
     if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
         if (colo_compare_packet_payload(ppkt, spkt,
                                         ppkt->header_size, spkt->header_size,
@@ -413,10 +430,7 @@ sec:
         qemu_hexdump((char *)spkt->data, stderr,
                      "colo-compare spkt", spkt->size);
 
-        /*
-         * colo_compare_inconsistent_notify();
-         * TODO: notice to checkpoint();
-         */
+        colo_compare_inconsistency_notify();
     }
 }
 
@@ -547,8 +561,18 @@ static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
     }
 }
 
+void colo_compare_register_notifier(Notifier *notify)
+{
+    notifier_list_add(&colo_compare_notifiers, notify);
+}
+
+void colo_compare_unregister_notifier(Notifier *notify)
+{
+    notifier_remove(notify);
+}
+
 static int colo_old_packet_check_one_conn(Connection *conn,
-                                          void *user_data)
+                                           void *user_data)
 {
     GList *result = NULL;
     int64_t check_time = REGULAR_PACKET_CHECK_MS;
@@ -559,10 +583,7 @@ static int colo_old_packet_check_one_conn(Connection *conn,
 
     if (result) {
         /* Do checkpoint will flush old packet */
-        /*
-         * TODO: Notify colo frame to do checkpoint.
-         * colo_compare_inconsistent_notify();
-         */
+        colo_compare_inconsistency_notify();
         return 0;
     }
 
@@ -606,11 +627,12 @@ static void colo_compare_packet(CompareState *s, Connection *conn,
             /*
              * If one packet arrive late, the secondary_list or
              * primary_list will be empty, so we can't compare it
-             * until next comparison.
+             * until next comparison. If the packets in the list are
+             * timeout, it will trigger a checkpoint request.
              */
             trace_colo_compare_main("packet different");
             g_queue_push_head(&conn->primary_list, pkt);
-            /* TODO: colo_notify_checkpoint();*/
+            colo_compare_inconsistency_notify();
             break;
         }
     }
@@ -736,6 +758,25 @@ static void check_old_packet_regular(void *opaque)
                 REGULAR_PACKET_CHECK_MS);
 }
 
+/* Public API, Used for COLO frame to notify compare event */
+void colo_notify_compares_event(void *opaque, int event, Error **errp)
+{
+    CompareState *s;
+
+    qemu_mutex_lock(&event_mtx);
+    QTAILQ_FOREACH(s, &net_compares, next) {
+        s->event = event;
+        qemu_bh_schedule(s->event_bh);
+        event_unhandled_count++;
+    }
+    /* Wait all compare threads to finish handling this event */
+    while (event_unhandled_count > 0) {
+        qemu_cond_wait(&event_complete_cond, &event_mtx);
+    }
+
+    qemu_mutex_unlock(&event_mtx);
+}
+
 static void colo_compare_timer_init(CompareState *s)
 {
     AioContext *ctx = iothread_get_aio_context(s->iothread);
@@ -756,6 +797,30 @@ static void colo_compare_timer_del(CompareState *s)
     }
  }
 
+static void colo_flush_packets(void *opaque, void *user_data);
+
+static void colo_compare_handle_event(void *opaque)
+{
+    CompareState *s = opaque;
+
+    switch (s->event) {
+    case COLO_EVENT_CHECKPOINT:
+        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
+        break;
+    case COLO_EVENT_FAILOVER:
+        break;
+    default:
+        break;
+    }
+
+    assert(event_unhandled_count > 0);
+
+    qemu_mutex_lock(&event_mtx);
+    event_unhandled_count--;
+    qemu_cond_broadcast(&event_complete_cond);
+    qemu_mutex_unlock(&event_mtx);
+}
+
 static void colo_compare_iothread(CompareState *s)
 {
     object_ref(OBJECT(s->iothread));
@@ -769,6 +834,7 @@ static void colo_compare_iothread(CompareState *s)
                              s, s->worker_context, true);
 
     colo_compare_timer_init(s);
+    s->event_bh = qemu_bh_new(colo_compare_handle_event, s);
 }
 
 static char *compare_get_pri_indev(Object *obj, Error **errp)
@@ -884,6 +950,12 @@ static int find_and_check_chardev(Chardev **chr,
         return 1;
     }
 
+    if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) {
+        error_setg(errp, "chardev \"%s\" cannot switch context",
+                   chr_name);
+        return 1;
+    }
+
     return 0;
 }
 
@@ -926,8 +998,13 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
     net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
     net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
 
+    QTAILQ_INSERT_TAIL(&net_compares, s, next);
+
     g_queue_init(&s->conn_list);
 
+    qemu_mutex_init(&event_mtx);
+    qemu_cond_init(&event_complete_cond);
+
     s->connection_track_table = g_hash_table_new_full(connection_key_hash,
                                                       connection_key_equal,
                                                       g_free,
@@ -990,6 +1067,7 @@ static void colo_compare_init(Object *obj)
 static void colo_compare_finalize(Object *obj)
 {
     CompareState *s = COLO_COMPARE(obj);
+    CompareState *tmp = NULL;
 
     qemu_chr_fe_deinit(&s->chr_pri_in, false);
     qemu_chr_fe_deinit(&s->chr_sec_in, false);
@@ -997,6 +1075,16 @@ static void colo_compare_finalize(Object *obj)
     if (s->iothread) {
         colo_compare_timer_del(s);
     }
+
+    qemu_bh_delete(s->event_bh);
+
+    QTAILQ_FOREACH(tmp, &net_compares, next) {
+        if (tmp == s) {
+            QTAILQ_REMOVE(&net_compares, s, next);
+            break;
+        }
+    }
+
     /* Release all unhandled packets after compare thead exited */
     g_queue_foreach(&s->conn_list, colo_flush_packets, s);
 
@@ -1009,6 +1097,10 @@ static void colo_compare_finalize(Object *obj)
     if (s->iothread) {
         object_unref(OBJECT(s->iothread));
     }
+
+    qemu_mutex_destroy(&event_mtx);
+    qemu_cond_destroy(&event_complete_cond);
+
     g_free(s->pri_indev);
     g_free(s->sec_indev);
     g_free(s->outdev);