}
static enum ct_update_res
-tcp_conn_update(struct conn* conn_, struct dp_packet *pkt, bool reply,
- long long now)
+tcp_conn_update(struct conn *conn_, struct conntrack_bucket *ctb,
+ struct dp_packet *pkt, bool reply, long long now)
{
struct conn_tcp *conn = conn_tcp_cast(conn_);
struct tcp_header *tcp = dp_packet_l4(pkt);
if (src->state >= CT_DPIF_TCPS_FIN_WAIT_2
&& dst->state >= CT_DPIF_TCPS_FIN_WAIT_2) {
- update_expiration(conn_, CT_TM_TCP_CLOSED, now);
+ conn_update_expiration(ctb, &conn->up, CT_TM_TCP_CLOSED, now);
} else if (src->state >= CT_DPIF_TCPS_CLOSING
&& dst->state >= CT_DPIF_TCPS_CLOSING) {
- update_expiration(conn_, CT_TM_TCP_FIN_WAIT, now);
+ conn_update_expiration(ctb, &conn->up, CT_TM_TCP_FIN_WAIT, now);
} else if (src->state < CT_DPIF_TCPS_ESTABLISHED
|| dst->state < CT_DPIF_TCPS_ESTABLISHED) {
- update_expiration(conn_, now, CT_TM_TCP_OPENING);
+ conn_update_expiration(ctb, &conn->up, CT_TM_TCP_OPENING, now);
} else if (src->state >= CT_DPIF_TCPS_CLOSING
|| dst->state >= CT_DPIF_TCPS_CLOSING) {
- update_expiration(conn_, now, CT_TM_TCP_CLOSING);
+ conn_update_expiration(ctb, &conn->up, CT_TM_TCP_CLOSING, now);
} else {
- update_expiration(conn_, now, CT_TM_TCP_ESTABLISHED);
+ conn_update_expiration(ctb, &conn->up, CT_TM_TCP_ESTABLISHED, now);
}
} else if ((dst->state < CT_DPIF_TCPS_SYN_SENT
|| dst->state >= CT_DPIF_TCPS_FIN_WAIT_2
}
static struct conn *
-tcp_new_conn(struct dp_packet *pkt, long long now)
+tcp_new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt,
+ long long now)
{
struct conn_tcp* newconn = NULL;
struct tcp_header *tcp = dp_packet_l4(pkt);
src->state = CT_DPIF_TCPS_SYN_SENT;
dst->state = CT_DPIF_TCPS_CLOSED;
- update_expiration(&newconn->up, now, CT_TM_TCP_FIRST_PACKET);
+ conn_init_expiration(ctb, &newconn->up, CT_TM_TCP_FIRST_PACKET,
+ now);
return &newconn->up;
}
#include "openvswitch/hmap.h"
#include "openvswitch/vlog.h"
#include "ovs-rcu.h"
+#include "ovs-thread.h"
+#include "poll-loop.h"
#include "random.h"
#include "timeval.h"
VLOG_DEFINE_THIS_MODULE(conntrack);
COVERAGE_DEFINE(conntrack_full);
+COVERAGE_DEFINE(conntrack_long_cleanup);
struct conn_lookup_ctx {
struct conn_key key;
struct conn_lookup_ctx *ctx,
long long now);
static bool valid_new(struct dp_packet *pkt, struct conn_key *);
-static struct conn *new_conn(struct dp_packet *pkt, struct conn_key *,
- long long now);
+static struct conn *new_conn(struct conntrack_bucket *, struct dp_packet *pkt,
+ struct conn_key *, long long now);
static void delete_conn(struct conn *);
-static enum ct_update_res conn_update(struct conn *, struct dp_packet*,
- bool reply, long long now);
+static enum ct_update_res conn_update(struct conn *,
+ struct conntrack_bucket *ctb,
+ struct dp_packet *, bool reply,
+ long long now);
static bool conn_expired(struct conn *, long long now);
static void set_mark(struct dp_packet *, struct conn *,
uint32_t val, uint32_t mask);
static void set_label(struct dp_packet *, struct conn *,
const struct ovs_key_ct_labels *val,
const struct ovs_key_ct_labels *mask);
+static void *clean_thread_main(void *f_);
static struct ct_l4_proto *l4_protos[] = {
[IPPROTO_TCP] = &ct_proto_tcp,
void
conntrack_init(struct conntrack *ct)
{
- unsigned i;
+ unsigned i, j;
+ long long now = time_msec();
for (i = 0; i < CONNTRACK_BUCKETS; i++) {
struct conntrack_bucket *ctb = &ct->buckets[i];
ct_lock_init(&ctb->lock);
ct_lock_lock(&ctb->lock);
hmap_init(&ctb->connections);
+ for (j = 0; j < ARRAY_SIZE(ctb->exp_lists); j++) {
+ ovs_list_init(&ctb->exp_lists[j]);
+ }
ct_lock_unlock(&ctb->lock);
+ ovs_mutex_init(&ctb->cleanup_mutex);
+ ovs_mutex_lock(&ctb->cleanup_mutex);
+ ctb->next_cleanup = now + CT_TM_MIN;
+ ovs_mutex_unlock(&ctb->cleanup_mutex);
}
ct->hash_basis = random_uint32();
atomic_count_init(&ct->n_conn, 0);
atomic_init(&ct->n_conn_limit, DEFAULT_N_CONN_LIMIT);
+ latch_init(&ct->clean_thread_exit);
+ ct->clean_thread = ovs_thread_create("ct_clean", clean_thread_main, ct);
}
/* Destroys the connection tracker 'ct' and frees all the allocated memory. */
{
unsigned i;
+ latch_set(&ct->clean_thread_exit);
+ pthread_join(ct->clean_thread, NULL);
+ latch_destroy(&ct->clean_thread_exit);
for (i = 0; i < CONNTRACK_BUCKETS; i++) {
struct conntrack_bucket *ctb = &ct->buckets[i];
struct conn *conn;
+ ovs_mutex_destroy(&ctb->cleanup_mutex);
ct_lock_lock(&ctb->lock);
HMAP_FOR_EACH_POP(conn, node, &ctb->connections) {
atomic_count_dec(&ct->n_conn);
return nc;
}
- nc = new_conn(pkt, &ctx->key, now);
+ nc = new_conn(&ct->buckets[bucket], pkt, &ctx->key, now);
memcpy(&nc->rev_key, &ctx->key, sizeof nc->rev_key);
} else {
enum ct_update_res res;
- res = conn_update(conn, pkt, ctx->reply, now);
+ res = conn_update(conn, &ct->buckets[bucket], pkt,
+ ctx->reply, now);
switch (res) {
case CT_UPDATE_VALID:
state |= CS_INVALID;
break;
case CT_UPDATE_NEW:
+ ovs_list_remove(&conn->exp_node);
hmap_remove(&ct->buckets[bucket].connections, &conn->node);
atomic_count_dec(&ct->n_conn);
delete_conn(conn);
conn->label = pkt->md.ct_label;
}
\f
+/* Delete the expired connections from 'ctb', up to 'limit'. Returns the
+ * earliest expiration time among the remaining connections in 'ctb'. Returns
+ * LLONG_MAX if 'ctb' is empty. The return value might be smaller than 'now',
+ * if 'limit' is reached */
+static long long
+sweep_bucket(struct conntrack *ct, struct conntrack_bucket *ctb, long long now,
+ size_t limit)
+ OVS_REQUIRES(ctb->lock)
+{
+ struct conn *conn, *next;
+ long long min_expiration = LLONG_MAX;
+ unsigned i;
+ size_t count = 0;
+
+ for (i = 0; i < N_CT_TM; i++) {
+ LIST_FOR_EACH_SAFE (conn, next, exp_node, &ctb->exp_lists[i]) {
+ if (!conn_expired(conn, now) || count >= limit) {
+ min_expiration = MIN(min_expiration, conn->expiration);
+ if (count >= limit) {
+ /* Do not check other lists. */
+ COVERAGE_INC(conntrack_long_cleanup);
+ return min_expiration;
+ }
+ break;
+ }
+ ovs_list_remove(&conn->exp_node);
+ hmap_remove(&ctb->connections, &conn->node);
+ atomic_count_dec(&ct->n_conn);
+ delete_conn(conn);
+ count++;
+ }
+ }
+
+ return min_expiration;
+}
+
+/* Cleans up old connection entries from 'ct'. Returns the time when the
+ * next expiration might happen. The return value might be smaller than
+ * 'now', meaning that an internal limit has been reached, and some expired
+ * connections have not been deleted. */
+static long long
+conntrack_clean(struct conntrack *ct, long long now)
+{
+ long long next_wakeup = now + CT_TM_MIN;
+ unsigned int n_conn_limit;
+ size_t clean_count = 0;
+ unsigned i;
+
+ atomic_read_relaxed(&ct->n_conn_limit, &n_conn_limit);
+
+ for (i = 0; i < CONNTRACK_BUCKETS; i++) {
+ struct conntrack_bucket *ctb = &ct->buckets[i];
+ size_t prev_count;
+ long long min_exp;
+
+ ovs_mutex_lock(&ctb->cleanup_mutex);
+ if (ctb->next_cleanup > now) {
+ goto next_bucket;
+ }
+
+ ct_lock_lock(&ctb->lock);
+ prev_count = hmap_count(&ctb->connections);
+ /* If the connections are well distributed among buckets, we want to
+ * limit to 10% of the global limit equally split among buckets. If
+ * the bucket is busier than the others, we limit to 10% of its
+ * current size. */
+ min_exp = sweep_bucket(ct, ctb, now,
+ MAX(prev_count/10, n_conn_limit/(CONNTRACK_BUCKETS*10)));
+ clean_count += prev_count - hmap_count(&ctb->connections);
+
+ if (min_exp > now) {
+ /* We call hmap_shrink() only if sweep_bucket() managed to delete
+ * every expired connection. */
+ hmap_shrink(&ctb->connections);
+ }
+
+ ct_lock_unlock(&ctb->lock);
+
+ ctb->next_cleanup = MIN(min_exp, now + CT_TM_MIN);
+
+next_bucket:
+ next_wakeup = MIN(next_wakeup, ctb->next_cleanup);
+ ovs_mutex_unlock(&ctb->cleanup_mutex);
+ }
+
+ VLOG_DBG("conntrack cleanup %"PRIuSIZE" entries in %lld msec",
+ clean_count, time_msec() - now);
+
+ return next_wakeup;
+}
+
+/* Cleanup:
+ *
+ *
+ * We must call conntrack_clean() periodically. conntrack_clean() return
+ * value gives an hint on when the next cleanup must be done (either because
+ * there is an actual connection that expires, or because a new connection
+ * might be created with the minimum timeout).
+ *
+ * The logic below has two goals:
+ *
+ * - Avoid calling conntrack_clean() too often. If we call conntrack_clean()
+ * each time a connection expires, the thread will consume 100% CPU, so we
+ * try to call the function _at most_ once every CT_CLEAN_INTERVAL, to batch
+ * removal.
+ *
+ * - On the other hand, it's not a good idea to keep the buckets locked for
+ * too long, as we might prevent traffic from flowing. If conntrack_clean()
+ * returns a value which is in the past, it means that the internal limit
+ * has been reached and more cleanup is required. In this case, just wait
+ * CT_CLEAN_MIN_INTERVAL before the next call.
+ */
+#define CT_CLEAN_INTERVAL 5000 /* 5 seconds */
+#define CT_CLEAN_MIN_INTERVAL 200 /* 0.2 seconds */
+
+static void *
+clean_thread_main(void *f_)
+{
+ struct conntrack *ct = f_;
+
+ while (!latch_is_set(&ct->clean_thread_exit)) {
+ long long next_wake;
+ long long now = time_msec();
+
+ next_wake = conntrack_clean(ct, now);
+
+ if (next_wake < now) {
+ poll_timer_wait_until(now + CT_CLEAN_MIN_INTERVAL);
+ } else {
+ poll_timer_wait_until(MAX(next_wake, now + CT_CLEAN_INTERVAL));
+ }
+ latch_wait(&ct->clean_thread_exit);
+ poll_block();
+ }
+
+ return NULL;
+}
+\f
/* Key extraction */
/* The function stores a pointer to the first byte after the header in
}
static enum ct_update_res
-conn_update(struct conn *conn, struct dp_packet *pkt, bool reply,
- long long now)
+conn_update(struct conn *conn, struct conntrack_bucket *ctb,
+ struct dp_packet *pkt, bool reply, long long now)
{
- return l4_protos[conn->key.nw_proto]->conn_update(conn, pkt, reply, now);
+ return l4_protos[conn->key.nw_proto]->conn_update(conn, ctb, pkt,
+ reply, now);
}
static bool
}
static struct conn *
-new_conn(struct dp_packet *pkt, struct conn_key *key, long long now)
+new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt,
+ struct conn_key *key, long long now)
{
struct conn *newconn;
- newconn = l4_protos[key->nw_proto]->new_conn(pkt, now);
+ newconn = l4_protos[key->nw_proto]->new_conn(ctb, pkt, now);
if (newconn) {
newconn->key = *key;
#include <stdbool.h>
+#include "latch.h"
#include "odp-netlink.h"
#include "openvswitch/hmap.h"
+#include "openvswitch/list.h"
#include "openvswitch/thread.h"
#include "openvswitch/types.h"
#include "ovs-atomic.h"
struct conntrack;
void conntrack_init(struct conntrack *);
-void conntrack_run(struct conntrack *);
void conntrack_destroy(struct conntrack *);
int conntrack_execute(struct conntrack *, struct dp_packet_batch *,
CT_TIMEOUT(OTHER_MULTIPLE, 60 * 1000) \
CT_TIMEOUT(OTHER_BIDIR, 30 * 1000) \
+/* The smallest of the above values: it is used as an upper bound for the
+ * interval between two rounds of cleanup of expired entries */
+#define CT_TM_MIN (30 * 1000)
+
+#define CT_TIMEOUT(NAME, VAL) BUILD_ASSERT_DECL(VAL >= CT_TM_MIN);
+ CT_TIMEOUTS
+#undef CT_TIMEOUT
+
enum ct_timeout {
#define CT_TIMEOUT(NAME, VALUE) CT_TM_##NAME,
CT_TIMEOUTS
*
* The connections are kept in different buckets, which are completely
* independent. The connection bucket is determined by the hash of its key.
+ *
+ * Each bucket has two locks. Acquisition order is, from outermost to
+ * innermost:
+ *
+ * cleanup_mutex
+ * lock
+ *
* */
struct conntrack_bucket {
+ /* Protects 'connections' and 'exp_lists'. Used in the fast path */
struct ct_lock lock;
+ /* Contains the connections in the bucket, indexed by 'struct conn_key' */
struct hmap connections OVS_GUARDED;
+ /* For each possible timeout we have a list of connections. When the
+ * timeout of a connection is updated, we move it to the back of the list.
+ * Since the connection in a list have the same relative timeout, the list
+ * will be ordered, with the oldest connections to the front. */
+ struct ovs_list exp_lists[N_CT_TM] OVS_GUARDED;
+
+ /* Protects 'next_cleanup'. Used to make sure that there's only one thread
+ * performing the cleanup. */
+ struct ovs_mutex cleanup_mutex;
+ long long next_cleanup OVS_GUARDED;
};
#define CONNTRACK_BUCKETS_SHIFT 8
/* Salt for hashing a connection key. */
uint32_t hash_basis;
+ /* The thread performing periodic cleanup of the connection
+ * tracker */
+ pthread_t clean_thread;
+ /* Latch to destroy the 'clean_thread' */
+ struct latch clean_thread_exit;
+
/* Number of connections currently in the connection tracker. */
atomic_count n_conn;
/* Connections limit. When this limit is reached, no new connection