*/
#include "qemu/osdep.h"
-#include "qemu-common.h"
#include "qemu/error-report.h"
#include "trace.h"
#include "qapi/error.h"
#include "qemu/coroutine.h"
#define TYPE_COLO_COMPARE "colo-compare"
-#define COLO_COMPARE(obj) \
- OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
+typedef struct CompareState CompareState;
+DECLARE_INSTANCE_CHECKER(CompareState, COLO_COMPARE,
+ TYPE_COLO_COMPARE)
static QTAILQ_HEAD(, CompareState) net_compares =
QTAILQ_HEAD_INITIALIZER(net_compares);
#define COLO_COMPARE_FREE_PRIMARY 0x01
#define COLO_COMPARE_FREE_SECONDARY 0x02
-#define REGULAR_PACKET_CHECK_MS 3000
+#define REGULAR_PACKET_CHECK_MS 1000
#define DEFAULT_TIME_OUT_MS 3000
+/* #define DEBUG_COLO_PACKETS */
+
static QemuMutex colo_compare_mutex;
static bool colo_compare_active;
static QemuMutex event_mtx;
uint8_t *buf;
} SendEntry;
-typedef struct CompareState {
+struct CompareState {
Object parent;
char *pri_indev;
SendCo out_sendco;
SendCo notify_sendco;
bool vnet_hdr;
- uint32_t compare_timeout;
+ uint64_t compare_timeout;
uint32_t expired_scan_cycle;
/*
enum colo_event event;
QTAILQ_ENTRY(CompareState) next;
-} CompareState;
+};
typedef struct CompareClass {
ObjectClass parent_class;
return false;
}
- return !memcmp(str, buf, strlen(str));
+ return !memcmp(str, buf, packet_len);
}
static void notify_remote_frame(CompareState *s)
}
}
+/* Use restricted to colo_insert_packet() */
static gint seq_sorter(Packet *a, Packet *b, gpointer data)
{
- struct tcp_hdr *atcp, *btcp;
-
- atcp = (struct tcp_hdr *)(a->transport_header);
- btcp = (struct tcp_hdr *)(b->transport_header);
- return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
+ return b->tcp_seq - a->tcp_seq;
}
static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
pkt->tcp_seq = ntohl(tcphd->th_seq);
pkt->tcp_ack = ntohl(tcphd->th_ack);
- *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack;
+ /* Need to consider ACK will bigger than uint32_t MAX */
+ *max_ack = pkt->tcp_ack - *max_ack > 0 ? pkt->tcp_ack : *max_ack;
pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
- + (tcphd->th_off << 2) - pkt->vnet_hdr_len;
+ + (tcphd->th_off << 2);
pkt->payload_size = pkt->size - pkt->header_size;
pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
pkt->flags = tcphd->th_flags;
pkt = NULL;
return -1;
}
- fill_connection_key(pkt, &key);
+ fill_connection_key(pkt, &key, false);
conn = connection_get(s->connection_track_table,
&key,
uint16_t len)
{
- if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
+ if (trace_event_get_state_backends(TRACE_COLO_COMPARE_IP_INFO)) {
char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
* can ensure that the packet's payload is acknowledged by
* primary and secondary.
*/
- uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;
+ uint32_t min_ack = conn->pack - conn->sack > 0 ?
+ conn->sack : conn->pack;
pri:
if (g_queue_is_empty(&conn->primary_list)) {
return;
}
- ppkt = g_queue_pop_head(&conn->primary_list);
+ ppkt = g_queue_pop_tail(&conn->primary_list);
sec:
if (g_queue_is_empty(&conn->secondary_list)) {
- g_queue_push_head(&conn->primary_list, ppkt);
+ g_queue_push_tail(&conn->primary_list, ppkt);
return;
}
- spkt = g_queue_pop_head(&conn->secondary_list);
+ spkt = g_queue_pop_tail(&conn->secondary_list);
if (ppkt->tcp_seq == ppkt->seq_end) {
colo_release_primary_pkt(s, ppkt);
}
}
if (!ppkt) {
- g_queue_push_head(&conn->secondary_list, spkt);
+ g_queue_push_tail(&conn->secondary_list, spkt);
goto pri;
}
}
if (mark == COLO_COMPARE_FREE_PRIMARY) {
conn->compare_seq = ppkt->seq_end;
colo_release_primary_pkt(s, ppkt);
- g_queue_push_head(&conn->secondary_list, spkt);
+ g_queue_push_tail(&conn->secondary_list, spkt);
goto pri;
- }
- if (mark == COLO_COMPARE_FREE_SECONDARY) {
+ } else if (mark == COLO_COMPARE_FREE_SECONDARY) {
conn->compare_seq = spkt->seq_end;
packet_destroy(spkt, NULL);
goto sec;
- }
- if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
+ } else if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
conn->compare_seq = ppkt->seq_end;
colo_release_primary_pkt(s, ppkt);
packet_destroy(spkt, NULL);
goto pri;
}
} else {
- g_queue_push_head(&conn->primary_list, ppkt);
- g_queue_push_head(&conn->secondary_list, spkt);
-
- if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
- qemu_hexdump((char *)ppkt->data, stderr,
- "colo-compare ppkt", ppkt->size);
- qemu_hexdump((char *)spkt->data, stderr,
- "colo-compare spkt", spkt->size);
- }
+ g_queue_push_tail(&conn->primary_list, ppkt);
+ g_queue_push_tail(&conn->secondary_list, spkt);
+
+#ifdef DEBUG_COLO_PACKETS
+ qemu_hexdump(stderr, "colo-compare ppkt", ppkt->data, ppkt->size);
+ qemu_hexdump(stderr, "colo-compare spkt", spkt->data, spkt->size);
+#endif
colo_compare_inconsistency_notify(s);
}
ppkt->size - offset)) {
trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
- if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
- qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
- ppkt->size);
- qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
- spkt->size);
- }
+#ifdef DEBUG_COLO_PACKETS
+ qemu_hexdump(stderr, "colo-compare pri pkt", ppkt->data, ppkt->size);
+ qemu_hexdump(stderr, "colo-compare sec pkt", spkt->data, spkt->size);
+#endif
return -1;
} else {
return 0;
ppkt->size);
trace_colo_compare_icmp_miscompare("Secondary pkt size",
spkt->size);
- if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
- qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
- ppkt->size);
- qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
- spkt->size);
- }
+#ifdef DEBUG_COLO_PACKETS
+ qemu_hexdump(stderr, "colo-compare pri pkt", ppkt->data, ppkt->size);
+ qemu_hexdump(stderr, "colo-compare sec pkt", spkt->data, spkt->size);
+#endif
return -1;
} else {
return 0;
uint16_t offset = ppkt->vnet_hdr_len;
trace_colo_compare_main("compare other");
- if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
- char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
-
- strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
- strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
- strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
- strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
-
- trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
- pri_ip_dst, spkt->size,
- sec_ip_src, sec_ip_dst);
- }
-
if (ppkt->size != spkt->size) {
trace_colo_compare_main("Other: payload size of packets are different");
return -1;
static int colo_old_packet_check_one_conn(Connection *conn,
CompareState *s)
{
- GList *result = NULL;
-
- result = g_queue_find_custom(&conn->primary_list,
- &s->compare_timeout,
- (GCompareFunc)colo_old_packet_check_one);
+ if (!g_queue_is_empty(&conn->primary_list)) {
+ if (g_queue_find_custom(&conn->primary_list,
+ &s->compare_timeout,
+ (GCompareFunc)colo_old_packet_check_one))
+ goto out;
+ }
- if (result) {
- /* Do checkpoint will flush old packet */
- colo_compare_inconsistency_notify(s);
- return 0;
+ if (!g_queue_is_empty(&conn->secondary_list)) {
+ if (g_queue_find_custom(&conn->secondary_list,
+ &s->compare_timeout,
+ (GCompareFunc)colo_old_packet_check_one))
+ goto out;
}
return 1;
+
+out:
+ /* Do checkpoint will flush old packet */
+ colo_compare_inconsistency_notify(s);
+ return 0;
}
/*
while (!g_queue_is_empty(&conn->primary_list) &&
!g_queue_is_empty(&conn->secondary_list)) {
- pkt = g_queue_pop_head(&conn->primary_list);
+ pkt = g_queue_pop_tail(&conn->primary_list);
result = g_queue_find_custom(&conn->secondary_list,
pkt, (GCompareFunc)HandlePacket);
if (result) {
colo_release_primary_pkt(s, pkt);
- g_queue_remove(&conn->secondary_list, result->data);
+ packet_destroy(result->data, NULL);
+ g_queue_delete_link(&conn->secondary_list, result);
} else {
/*
* If one packet arrive late, the secondary_list or
* timeout, it will trigger a checkpoint request.
*/
trace_colo_compare_main("packet different");
- g_queue_push_head(&conn->primary_list, pkt);
+ g_queue_push_tail(&conn->primary_list, pkt);
colo_compare_inconsistency_notify(s);
break;
}
if (!size) {
- return 0;
+ return -1;
}
entry = g_slice_new(SendEntry);
entry->buf = g_malloc(size);
memcpy(entry->buf, buf, size);
}
- g_queue_push_head(&sendco->send_list, entry);
+ g_queue_push_tail(&sendco->send_list, entry);
if (sendco->done) {
sendco->co = qemu_coroutine_create(_compare_chr_send, sendco);
/* if have old packet we will notify checkpoint */
colo_old_packet_check(s);
- timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
+ timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_HOST) +
s->expired_scan_cycle);
}
{
AioContext *ctx = iothread_get_aio_context(s->iothread);
- s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL,
+ s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_HOST,
SCALE_MS, check_old_packet_regular,
s);
- timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
+ timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_HOST) +
s->expired_scan_cycle);
}
static void colo_compare_timer_del(CompareState *s)
{
if (s->packet_check_timer) {
- timer_del(s->packet_check_timer);
timer_free(s->packet_check_timer);
s->packet_check_timer = NULL;
}
Error **errp)
{
CompareState *s = COLO_COMPARE(obj);
- uint32_t value = s->compare_timeout;
+ uint64_t value = s->compare_timeout;
- visit_type_uint32(v, name, &value, errp);
+ visit_type_uint64(v, name, &value, errp);
}
static void compare_set_timeout(Object *obj, Visitor *v,
const char *name, void *opaque,
Error **errp)
{
- Error *local_err = NULL;
- uint32_t value;
+ uint64_t value;
- visit_type_uint32(v, name, &value, &local_err);
- if (local_err) {
- goto out;
+ if (!visit_type_uint64(v, name, &value, errp)) {
+ return;
}
if (!value) {
- error_setg(&local_err, "Property '%s.%s' requires a positive value",
+ error_setg(errp, "Property '%s.%s' requires a positive value",
object_get_typename(obj), name);
- goto out;
+ return;
}
max_queue_size = value;
-
-out:
- error_propagate(errp, local_err);
}
static void compare_pri_rs_finalize(SocketReadState *pri_rs)
}
if (!s->expired_scan_cycle) {
- /* Set default value to 3000 MS */
+ /* Set default value to 1000 MS */
s->expired_scan_cycle = REGULAR_PACKET_CHECK_MS;
}
s->connection_track_table = g_hash_table_new_full(connection_key_hash,
connection_key_equal,
g_free,
- connection_destroy);
+ NULL);
colo_compare_iothread(s);
Packet *pkt = NULL;
while (!g_queue_is_empty(&conn->primary_list)) {
- pkt = g_queue_pop_head(&conn->primary_list);
+ pkt = g_queue_pop_tail(&conn->primary_list);
compare_chr_send(s,
pkt->data,
pkt->size,
packet_destroy_partial(pkt, NULL);
}
while (!g_queue_is_empty(&conn->secondary_list)) {
- pkt = g_queue_pop_head(&conn->secondary_list);
+ pkt = g_queue_pop_tail(&conn->secondary_list);
packet_destroy(pkt, NULL);
}
}
object_property_add_str(obj, "notify_dev",
compare_get_notify_dev, compare_set_notify_dev);
- object_property_add(obj, "compare_timeout", "uint32",
+ object_property_add(obj, "compare_timeout", "uint64",
compare_get_timeout,
compare_set_timeout, NULL, NULL);
compare_set_vnet_hdr);
}
+void colo_compare_cleanup(void)
+{
+ CompareState *tmp = NULL;
+ CompareState *n = NULL;
+
+ QTAILQ_FOREACH_SAFE(tmp, &net_compares, next, n) {
+ object_unparent(OBJECT(tmp));
+ }
+}
+
static void colo_compare_finalize(Object *obj)
{
CompareState *s = COLO_COMPARE(obj);
qemu_chr_fe_deinit(&s->chr_notify_dev, false);
}
- if (s->iothread) {
- colo_compare_timer_del(s);
- }
+ colo_compare_timer_del(s);
qemu_bh_delete(s->event_bh);
AioContext *ctx = iothread_get_aio_context(s->iothread);
- aio_context_acquire(ctx);
AIO_WAIT_WHILE(ctx, !s->out_sendco.done);
if (s->notify_dev) {
AIO_WAIT_WHILE(ctx, !s->notify_sendco.done);
}
- aio_context_release(ctx);
/* Release all unhandled packets after compare thead exited */
g_queue_foreach(&s->conn_list, colo_flush_packets, s);
g_hash_table_destroy(s->connection_track_table);
}
- if (s->iothread) {
- object_unref(OBJECT(s->iothread));
- }
+ object_unref(OBJECT(s->iothread));
g_free(s->pri_indev);
g_free(s->sec_indev);