]> git.proxmox.com Git - mirror_qemu.git/blobdiff - net/colo-compare.c
vdpa: Add SetSteeringEBPF method for NetClientState
[mirror_qemu.git] / net / colo-compare.c
index 398b7546ff8c040a970b844969e74d3d6afa2a6c..7f9e6f89ce058fdaa89b0084cf350298ed20b8ab 100644 (file)
@@ -13,7 +13,6 @@
  */
 
 #include "qemu/osdep.h"
-#include "qemu-common.h"
 #include "qemu/error-report.h"
 #include "trace.h"
 #include "qapi/error.h"
@@ -36,8 +35,9 @@
 #include "qemu/coroutine.h"
 
 #define TYPE_COLO_COMPARE "colo-compare"
-#define COLO_COMPARE(obj) \
-    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
+typedef struct CompareState CompareState;
+DECLARE_INSTANCE_CHECKER(CompareState, COLO_COMPARE,
+                         TYPE_COLO_COMPARE)
 
 static QTAILQ_HEAD(, CompareState) net_compares =
        QTAILQ_HEAD_INITIALIZER(net_compares);
@@ -51,14 +51,17 @@ static NotifierList colo_compare_notifiers =
 #define COLO_COMPARE_FREE_PRIMARY     0x01
 #define COLO_COMPARE_FREE_SECONDARY   0x02
 
-#define REGULAR_PACKET_CHECK_MS 3000
+#define REGULAR_PACKET_CHECK_MS 1000
 #define DEFAULT_TIME_OUT_MS 3000
 
+/* #define DEBUG_COLO_PACKETS */
+
 static QemuMutex colo_compare_mutex;
 static bool colo_compare_active;
 static QemuMutex event_mtx;
 static QemuCond event_complete_cond;
 static int event_unhandled_count;
+static uint32_t max_queue_size;
 
 /*
  *  + CompareState ++
@@ -99,7 +102,7 @@ typedef struct SendEntry {
     uint8_t *buf;
 } SendEntry;
 
-typedef struct CompareState {
+struct CompareState {
     Object parent;
 
     char *pri_indev;
@@ -116,7 +119,7 @@ typedef struct CompareState {
     SendCo out_sendco;
     SendCo notify_sendco;
     bool vnet_hdr;
-    uint32_t compare_timeout;
+    uint64_t compare_timeout;
     uint32_t expired_scan_cycle;
 
     /*
@@ -135,7 +138,7 @@ typedef struct CompareState {
     enum colo_event event;
 
     QTAILQ_ENTRY(CompareState) next;
-} CompareState;
+};
 
 typedef struct CompareClass {
     ObjectClass parent_class;
@@ -166,7 +169,7 @@ static bool packet_matches_str(const char *str,
         return false;
     }
 
-    return !memcmp(str, buf, strlen(str));
+    return !memcmp(str, buf, packet_len);
 }
 
 static void notify_remote_frame(CompareState *s)
@@ -190,13 +193,10 @@ static void colo_compare_inconsistency_notify(CompareState *s)
     }
 }
 
+/* Use restricted to colo_insert_packet() */
 static gint seq_sorter(Packet *a, Packet *b, gpointer data)
 {
-    struct tcp_hdr *atcp, *btcp;
-
-    atcp = (struct tcp_hdr *)(a->transport_header);
-    btcp = (struct tcp_hdr *)(b->transport_header);
-    return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
+    return b->tcp_seq - a->tcp_seq;
 }
 
 static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
@@ -208,9 +208,10 @@ static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
 
     pkt->tcp_seq = ntohl(tcphd->th_seq);
     pkt->tcp_ack = ntohl(tcphd->th_ack);
-    *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack;
+    /* Need to consider ACK will bigger than uint32_t MAX */
+    *max_ack = pkt->tcp_ack - *max_ack > 0 ? pkt->tcp_ack : *max_ack;
     pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
-                       + (tcphd->th_off << 2) - pkt->vnet_hdr_len;
+                       + (tcphd->th_off << 2);
     pkt->payload_size = pkt->size - pkt->header_size;
     pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
     pkt->flags = tcphd->th_flags;
@@ -222,7 +223,7 @@ static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
  */
 static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
 {
-    if (g_queue_get_length(queue) <= MAX_QUEUE_SIZE) {
+    if (g_queue_get_length(queue) <= max_queue_size) {
         if (pkt->ip->ip_p == IPPROTO_TCP) {
             fill_pkt_tcp_info(pkt, max_ack);
             g_queue_insert_sorted(queue,
@@ -263,7 +264,7 @@ static int packet_enqueue(CompareState *s, int mode, Connection **con)
         pkt = NULL;
         return -1;
     }
-    fill_connection_key(pkt, &key);
+    fill_connection_key(pkt, &key, false);
 
     conn = connection_get(s->connection_track_table,
                           &key,
@@ -327,7 +328,7 @@ static int colo_compare_packet_payload(Packet *ppkt,
                                        uint16_t len)
 
 {
-    if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
+    if (trace_event_get_state_backends(TRACE_COLO_COMPARE_IP_INFO)) {
         char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
 
         strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
@@ -412,19 +413,20 @@ static void colo_compare_tcp(CompareState *s, Connection *conn)
      * can ensure that the packet's payload is acknowledged by
      * primary and secondary.
     */
-    uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;
+    uint32_t min_ack = conn->pack - conn->sack > 0 ?
+                       conn->sack : conn->pack;
 
 pri:
     if (g_queue_is_empty(&conn->primary_list)) {
         return;
     }
-    ppkt = g_queue_pop_head(&conn->primary_list);
+    ppkt = g_queue_pop_tail(&conn->primary_list);
 sec:
     if (g_queue_is_empty(&conn->secondary_list)) {
-        g_queue_push_head(&conn->primary_list, ppkt);
+        g_queue_push_tail(&conn->primary_list, ppkt);
         return;
     }
-    spkt = g_queue_pop_head(&conn->secondary_list);
+    spkt = g_queue_pop_tail(&conn->secondary_list);
 
     if (ppkt->tcp_seq == ppkt->seq_end) {
         colo_release_primary_pkt(s, ppkt);
@@ -455,7 +457,7 @@ sec:
             }
         }
         if (!ppkt) {
-            g_queue_push_head(&conn->secondary_list, spkt);
+            g_queue_push_tail(&conn->secondary_list, spkt);
             goto pri;
         }
     }
@@ -474,30 +476,26 @@ sec:
         if (mark == COLO_COMPARE_FREE_PRIMARY) {
             conn->compare_seq = ppkt->seq_end;
             colo_release_primary_pkt(s, ppkt);
-            g_queue_push_head(&conn->secondary_list, spkt);
+            g_queue_push_tail(&conn->secondary_list, spkt);
             goto pri;
-        }
-        if (mark == COLO_COMPARE_FREE_SECONDARY) {
+        } else if (mark == COLO_COMPARE_FREE_SECONDARY) {
             conn->compare_seq = spkt->seq_end;
             packet_destroy(spkt, NULL);
             goto sec;
-        }
-        if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
+        } else if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
             conn->compare_seq = ppkt->seq_end;
             colo_release_primary_pkt(s, ppkt);
             packet_destroy(spkt, NULL);
             goto pri;
         }
     } else {
-        g_queue_push_head(&conn->primary_list, ppkt);
-        g_queue_push_head(&conn->secondary_list, spkt);
-
-        if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
-            qemu_hexdump((char *)ppkt->data, stderr,
-                        "colo-compare ppkt", ppkt->size);
-            qemu_hexdump((char *)spkt->data, stderr,
-                        "colo-compare spkt", spkt->size);
-        }
+        g_queue_push_tail(&conn->primary_list, ppkt);
+        g_queue_push_tail(&conn->secondary_list, spkt);
+
+#ifdef DEBUG_COLO_PACKETS
+        qemu_hexdump(stderr, "colo-compare ppkt", ppkt->data, ppkt->size);
+        qemu_hexdump(stderr, "colo-compare spkt", spkt->data, spkt->size);
+#endif
 
         colo_compare_inconsistency_notify(s);
     }
@@ -533,12 +531,10 @@ static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
                                     ppkt->size - offset)) {
         trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
         trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
-        if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
-            qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
-                         ppkt->size);
-            qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
-                         spkt->size);
-        }
+#ifdef DEBUG_COLO_PACKETS
+        qemu_hexdump(stderr, "colo-compare pri pkt", ppkt->data, ppkt->size);
+        qemu_hexdump(stderr, "colo-compare sec pkt", spkt->data, spkt->size);
+#endif
         return -1;
     } else {
         return 0;
@@ -576,12 +572,10 @@ static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
                                            ppkt->size);
         trace_colo_compare_icmp_miscompare("Secondary pkt size",
                                            spkt->size);
-        if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
-            qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
-                         ppkt->size);
-            qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
-                         spkt->size);
-        }
+#ifdef DEBUG_COLO_PACKETS
+        qemu_hexdump(stderr, "colo-compare pri pkt", ppkt->data, ppkt->size);
+        qemu_hexdump(stderr, "colo-compare sec pkt", spkt->data, spkt->size);
+#endif
         return -1;
     } else {
         return 0;
@@ -597,19 +591,6 @@ static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
     uint16_t offset = ppkt->vnet_hdr_len;
 
     trace_colo_compare_main("compare other");
-    if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
-        char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
-
-        strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
-        strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
-        strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
-        strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
-
-        trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
-                                   pri_ip_dst, spkt->size,
-                                   sec_ip_src, sec_ip_dst);
-    }
-
     if (ppkt->size != spkt->size) {
         trace_colo_compare_main("Other: payload size of packets are different");
         return -1;
@@ -643,19 +624,26 @@ void colo_compare_unregister_notifier(Notifier *notify)
 static int colo_old_packet_check_one_conn(Connection *conn,
                                           CompareState *s)
 {
-    GList *result = NULL;
-
-    result = g_queue_find_custom(&conn->primary_list,
-                                 &s->compare_timeout,
-                                 (GCompareFunc)colo_old_packet_check_one);
+    if (!g_queue_is_empty(&conn->primary_list)) {
+        if (g_queue_find_custom(&conn->primary_list,
+                                &s->compare_timeout,
+                                (GCompareFunc)colo_old_packet_check_one))
+            goto out;
+    }
 
-    if (result) {
-        /* Do checkpoint will flush old packet */
-        colo_compare_inconsistency_notify(s);
-        return 0;
+    if (!g_queue_is_empty(&conn->secondary_list)) {
+        if (g_queue_find_custom(&conn->secondary_list,
+                                &s->compare_timeout,
+                                (GCompareFunc)colo_old_packet_check_one))
+            goto out;
     }
 
     return 1;
+
+out:
+    /* Do checkpoint will flush old packet */
+    colo_compare_inconsistency_notify(s);
+    return 0;
 }
 
 /*
@@ -684,13 +672,14 @@ static void colo_compare_packet(CompareState *s, Connection *conn,
 
     while (!g_queue_is_empty(&conn->primary_list) &&
            !g_queue_is_empty(&conn->secondary_list)) {
-        pkt = g_queue_pop_head(&conn->primary_list);
+        pkt = g_queue_pop_tail(&conn->primary_list);
         result = g_queue_find_custom(&conn->secondary_list,
                  pkt, (GCompareFunc)HandlePacket);
 
         if (result) {
             colo_release_primary_pkt(s, pkt);
-            g_queue_remove(&conn->secondary_list, result->data);
+            packet_destroy(result->data, NULL);
+            g_queue_delete_link(&conn->secondary_list, result);
         } else {
             /*
              * If one packet arrive late, the secondary_list or
@@ -699,7 +688,7 @@ static void colo_compare_packet(CompareState *s, Connection *conn,
              * timeout, it will trigger a checkpoint request.
              */
             trace_colo_compare_main("packet different");
-            g_queue_push_head(&conn->primary_list, pkt);
+            g_queue_push_tail(&conn->primary_list, pkt);
 
             colo_compare_inconsistency_notify(s);
             break;
@@ -817,7 +806,7 @@ static int compare_chr_send(CompareState *s,
     }
 
     if (!size) {
-        return 0;
+        return -1;
     }
 
     entry = g_slice_new(SendEntry);
@@ -829,7 +818,7 @@ static int compare_chr_send(CompareState *s,
         entry->buf = g_malloc(size);
         memcpy(entry->buf, buf, size);
     }
-    g_queue_push_head(&sendco->send_list, entry);
+    g_queue_push_tail(&sendco->send_list, entry);
 
     if (sendco->done) {
         sendco->co = qemu_coroutine_create(_compare_chr_send, sendco);
@@ -907,7 +896,7 @@ static void check_old_packet_regular(void *opaque)
 
     /* if have old packet we will notify checkpoint */
     colo_old_packet_check(s);
-    timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
+    timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_HOST) +
               s->expired_scan_cycle);
 }
 
@@ -941,17 +930,16 @@ static void colo_compare_timer_init(CompareState *s)
 {
     AioContext *ctx = iothread_get_aio_context(s->iothread);
 
-    s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL,
+    s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_HOST,
                                 SCALE_MS, check_old_packet_regular,
                                 s);
-    timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
+    timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_HOST) +
               s->expired_scan_cycle);
 }
 
 static void colo_compare_timer_del(CompareState *s)
 {
     if (s->packet_check_timer) {
-        timer_del(s->packet_check_timer);
         timer_free(s->packet_check_timer);
         s->packet_check_timer = NULL;
     }
@@ -1083,9 +1071,9 @@ static void compare_get_timeout(Object *obj, Visitor *v,
                                 Error **errp)
 {
     CompareState *s = COLO_COMPARE(obj);
-    uint32_t value = s->compare_timeout;
+    uint64_t value = s->compare_timeout;
 
-    visit_type_uint32(v, name, &value, errp);
+    visit_type_uint64(v, name, &value, errp);
 }
 
 static void compare_set_timeout(Object *obj, Visitor *v,
@@ -1134,6 +1122,32 @@ static void compare_set_expired_scan_cycle(Object *obj, Visitor *v,
     s->expired_scan_cycle = value;
 }
 
+static void get_max_queue_size(Object *obj, Visitor *v,
+                               const char *name, void *opaque,
+                               Error **errp)
+{
+    uint32_t value = max_queue_size;
+
+    visit_type_uint32(v, name, &value, errp);
+}
+
+static void set_max_queue_size(Object *obj, Visitor *v,
+                               const char *name, void *opaque,
+                               Error **errp)
+{
+    uint64_t value;
+
+    if (!visit_type_uint64(v, name, &value, errp)) {
+        return;
+    }
+    if (!value) {
+        error_setg(errp, "Property '%s.%s' requires a positive value",
+                   object_get_typename(obj), name);
+        return;
+    }
+    max_queue_size = value;
+}
+
 static void compare_pri_rs_finalize(SocketReadState *pri_rs)
 {
     CompareState *s = container_of(pri_rs, CompareState, pri_rs);
@@ -1247,10 +1261,15 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
     }
 
     if (!s->expired_scan_cycle) {
-        /* Set default value to 3000 MS */
+        /* Set default value to 1000 MS */
         s->expired_scan_cycle = REGULAR_PACKET_CHECK_MS;
     }
 
+    if (!max_queue_size) {
+        /* Set default queue size to 1024 */
+        max_queue_size = MAX_QUEUE_SIZE;
+    }
+
     if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
         !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
         return;
@@ -1299,7 +1318,7 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
     s->connection_track_table = g_hash_table_new_full(connection_key_hash,
                                                       connection_key_equal,
                                                       g_free,
-                                                      connection_destroy);
+                                                      NULL);
 
     colo_compare_iothread(s);
 
@@ -1322,7 +1341,7 @@ static void colo_flush_packets(void *opaque, void *user_data)
     Packet *pkt = NULL;
 
     while (!g_queue_is_empty(&conn->primary_list)) {
-        pkt = g_queue_pop_head(&conn->primary_list);
+        pkt = g_queue_pop_tail(&conn->primary_list);
         compare_chr_send(s,
                          pkt->data,
                          pkt->size,
@@ -1332,7 +1351,7 @@ static void colo_flush_packets(void *opaque, void *user_data)
         packet_destroy_partial(pkt, NULL);
     }
     while (!g_queue_is_empty(&conn->secondary_list)) {
-        pkt = g_queue_pop_head(&conn->secondary_list);
+        pkt = g_queue_pop_tail(&conn->secondary_list);
         packet_destroy(pkt, NULL);
     }
 }
@@ -1362,7 +1381,7 @@ static void colo_compare_init(Object *obj)
     object_property_add_str(obj, "notify_dev",
                             compare_get_notify_dev, compare_set_notify_dev);
 
-    object_property_add(obj, "compare_timeout", "uint32",
+    object_property_add(obj, "compare_timeout", "uint64",
                         compare_get_timeout,
                         compare_set_timeout, NULL, NULL);
 
@@ -1370,11 +1389,25 @@ static void colo_compare_init(Object *obj)
                         compare_get_expired_scan_cycle,
                         compare_set_expired_scan_cycle, NULL, NULL);
 
+    object_property_add(obj, "max_queue_size", "uint32",
+                        get_max_queue_size,
+                        set_max_queue_size, NULL, NULL);
+
     s->vnet_hdr = false;
     object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
                              compare_set_vnet_hdr);
 }
 
+void colo_compare_cleanup(void)
+{
+    CompareState *tmp = NULL;
+    CompareState *n = NULL;
+
+    QTAILQ_FOREACH_SAFE(tmp, &net_compares, next, n) {
+        object_unparent(OBJECT(tmp));
+    }
+}
+
 static void colo_compare_finalize(Object *obj)
 {
     CompareState *s = COLO_COMPARE(obj);
@@ -1401,9 +1434,7 @@ static void colo_compare_finalize(Object *obj)
         qemu_chr_fe_deinit(&s->chr_notify_dev, false);
     }
 
-    if (s->iothread) {
-        colo_compare_timer_del(s);
-    }
+    colo_compare_timer_del(s);
 
     qemu_bh_delete(s->event_bh);
 
@@ -1429,9 +1460,7 @@ static void colo_compare_finalize(Object *obj)
         g_hash_table_destroy(s->connection_track_table);
     }
 
-    if (s->iothread) {
-        object_unref(OBJECT(s->iothread));
-    }
+    object_unref(OBJECT(s->iothread));
 
     g_free(s->pri_indev);
     g_free(s->sec_indev);