]> git.proxmox.com Git - ovs.git/commitdiff
dpif-netdev: Polling threads directly call ofproto upcall functions.
authorRyan Wilson <wryan@nicira.com>
Sat, 26 Jul 2014 06:51:55 +0000 (06:51 +0000)
committerEthan Jackson <ethan@nicira.com>
Fri, 25 Jul 2014 23:19:40 +0000 (16:19 -0700)
Typically, kernel datapath threads send upcalls to userspace where
handler threads process the upcalls. For TAP and DPDK devices, the
datapath threads operate in userspace, so there is no need for
separate handler threads.

This patch allows userspace datapath threads to directly call the
ofproto upcall functions, eliminating the need for handler threads
for datapaths of type 'netdev'.

Signed-off-by: Ryan Wilson <wryan@nicira.com>
Signed-off-by: Ethan Jackson <ethan@nicira.com>
Acked-by: Ethan Jackson <ethan@nicira.com>
lib/dpif-linux.c
lib/dpif-netdev.c
lib/dpif-netdev.h
lib/dpif-provider.h
lib/dpif.c
lib/dpif.h
ofproto/ofproto-dpif-upcall.c
ofproto/ofproto-dpif-upcall.h

index 447573186f1dffa26d0779ec870e396443966be3..b98413da4086460d11cb298b644f36112521899e 100644 (file)
@@ -1934,6 +1934,9 @@ const struct dpif_class dpif_linux_class = {
     dpif_linux_recv,
     dpif_linux_recv_wait,
     dpif_linux_recv_purge,
+    NULL,                       /* register_upcall_cb */
+    NULL,                       /* enable_upcall */
+    NULL,                       /* disable_upcall */
 };
 \f
 static int
index 8422c897501263856a631b2e10d58adab78db2c4..20813a8fd9087cc82bad4be779dc3536b145b2e5 100644 (file)
@@ -78,11 +78,6 @@ DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
 /* Configuration parameters. */
 enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
 
-/* Queues. */
-enum { MAX_QUEUE_LEN = 128 };   /* Maximum number of packets per queue. */
-enum { QUEUE_MASK = MAX_QUEUE_LEN - 1 };
-BUILD_ASSERT_DECL(IS_POW2(MAX_QUEUE_LEN));
-
 /* Protects against changes to 'dp_netdevs'. */
 static struct ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER;
 
@@ -90,27 +85,15 @@ static struct ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER;
 static struct shash dp_netdevs OVS_GUARDED_BY(dp_netdev_mutex)
     = SHASH_INITIALIZER(&dp_netdevs);
 
-struct dp_netdev_upcall {
-    struct dpif_upcall upcall;  /* Queued upcall information. */
-    struct ofpbuf buf;          /* ofpbuf instance for upcall.packet. */
-};
-
-/* A queue passing packets from a struct dp_netdev to its clients (handlers).
- *
- *
- * Thread-safety
- * =============
- *
- * Any access at all requires the owning 'dp_netdev''s queue_rwlock and
- * its own mutex. */
 struct dp_netdev_queue {
-    struct ovs_mutex mutex;
-    struct seq *seq;      /* Incremented whenever a packet is queued. */
-    struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN] OVS_GUARDED;
-    unsigned int head OVS_GUARDED;
-    unsigned int tail OVS_GUARDED;
+    unsigned int packet_count;
+
+    struct dpif_upcall upcalls[NETDEV_MAX_RX_BATCH];
+    struct ofpbuf bufs[NETDEV_MAX_RX_BATCH];
 };
 
+#define DP_NETDEV_QUEUE_INITIALIZER { .packet_count = 0 }
+
 /* Datapath based on the network device interface from netdev.h.
  *
  *
@@ -125,11 +108,11 @@ struct dp_netdev_queue {
  *    dp_netdev_mutex (global)
  *    port_mutex
  *    flow_mutex
- *    queue_rwlock
  */
 struct dp_netdev {
     const struct dpif_class *const class;
     const char *const name;
+    struct dpif *dpif;
     struct ovs_refcount ref_cnt;
     atomic_flag destroyed;
 
@@ -142,15 +125,6 @@ struct dp_netdev {
     struct classifier cls;
     struct cmap flow_table OVS_GUARDED; /* Flow table. */
 
-    /* Queues.
-     *
-     * 'queue_rwlock' protects the modification of 'handler_queues' and
-     * 'n_handlers'.  The queue elements are protected by its
-     * 'handler_queues''s mutex. */
-    struct fat_rwlock queue_rwlock;
-    struct dp_netdev_queue *handler_queues;
-    uint32_t n_handlers;
-
     /* Statistics.
      *
      * ovsthread_stats is internally synchronized. */
@@ -163,6 +137,11 @@ struct dp_netdev {
     struct cmap ports;
     struct seq *port_seq;       /* Incremented whenever a port changes. */
 
+    /* Protects access to ofproto-dpif-upcall interface during revalidator
+     * thread synchronization. */
+    struct fat_rwlock upcall_rwlock;
+    exec_upcall_cb *upcall_cb;  /* Callback function for executing upcalls. */
+
     /* Forwarding threads. */
     struct latch exit_latch;
     struct pmd_thread *pmd_threads;
@@ -339,14 +318,14 @@ static int do_add_port(struct dp_netdev *dp, const char *devname,
     OVS_REQUIRES(dp->port_mutex);
 static void do_del_port(struct dp_netdev *dp, struct dp_netdev_port *)
     OVS_REQUIRES(dp->port_mutex);
-static void dp_netdev_destroy_all_queues(struct dp_netdev *dp)
-    OVS_REQ_WRLOCK(dp->queue_rwlock);
 static int dpif_netdev_open(const struct dpif_class *, const char *name,
                             bool create, struct dpif **);
-static int dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *,
-                                      int queue_no, int type,
-                                      const struct miniflow *,
-                                      const struct nlattr *userdata);
+static int dp_netdev_queue_userspace_packet(struct dp_netdev_queue *,
+                                            struct ofpbuf *, int type,
+                                            const struct miniflow *,
+                                            const struct nlattr *);
+static void dp_netdev_execute_userspace_queue(struct dp_netdev_queue *,
+                                              struct dp_netdev *);
 static void dp_netdev_execute_actions(struct dp_netdev *dp,
                                       struct dpif_packet **, int c,
                                       bool may_steal, struct pkt_metadata *,
@@ -357,6 +336,7 @@ static void dp_netdev_port_input(struct dp_netdev *dp,
                                  odp_port_t port_no);
 
 static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n);
+static void dp_netdev_disable_upcall(struct dp_netdev *);
 
 static struct dpif_netdev *
 dpif_netdev_cast(const struct dpif *dpif)
@@ -484,14 +464,17 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     classifier_init(&dp->cls, NULL);
     cmap_init(&dp->flow_table);
 
-    fat_rwlock_init(&dp->queue_rwlock);
-
     ovsthread_stats_init(&dp->stats);
 
     ovs_mutex_init(&dp->port_mutex);
     cmap_init(&dp->ports);
     dp->port_seq = seq_create();
     latch_init(&dp->exit_latch);
+    fat_rwlock_init(&dp->upcall_rwlock);
+
+    /* Disable upcalls by default. */
+    dp_netdev_disable_upcall(dp);
+    dp->upcall_cb = NULL;
 
     ovs_mutex_lock(&dp->port_mutex);
     error = do_add_port(dp, name, "internal", ODPP_LOCAL);
@@ -523,31 +506,13 @@ dpif_netdev_open(const struct dpif_class *class, const char *name,
     }
     if (!error) {
         *dpifp = create_dpif_netdev(dp);
+        dp->dpif = *dpifp;
     }
     ovs_mutex_unlock(&dp_netdev_mutex);
 
     return error;
 }
 
-static void
-dp_netdev_purge_queues(struct dp_netdev *dp)
-    OVS_REQ_WRLOCK(dp->queue_rwlock)
-{
-    int i;
-
-    for (i = 0; i < dp->n_handlers; i++) {
-        struct dp_netdev_queue *q = &dp->handler_queues[i];
-
-        ovs_mutex_lock(&q->mutex);
-        while (q->tail != q->head) {
-            struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
-            ofpbuf_uninit(&u->upcall.packet);
-            ofpbuf_uninit(&u->buf);
-        }
-        ovs_mutex_unlock(&q->mutex);
-    }
-}
-
 /* Requires dp_netdev_mutex so that we can't get a new reference to 'dp'
  * through the 'dp_netdevs' shash while freeing 'dp'. */
 static void
@@ -576,17 +541,12 @@ dp_netdev_free(struct dp_netdev *dp)
     }
     ovsthread_stats_destroy(&dp->stats);
 
-    fat_rwlock_wrlock(&dp->queue_rwlock);
-    dp_netdev_destroy_all_queues(dp);
-    fat_rwlock_unlock(&dp->queue_rwlock);
-
-    fat_rwlock_destroy(&dp->queue_rwlock);
-
     classifier_destroy(&dp->cls);
     cmap_destroy(&dp->flow_table);
     ovs_mutex_destroy(&dp->flow_mutex);
     seq_destroy(dp->port_seq);
     cmap_destroy(&dp->ports);
+    fat_rwlock_destroy(&dp->upcall_rwlock);
     latch_destroy(&dp->exit_latch);
     free(CONST_CAST(char *, dp->name));
     free(dp);
@@ -1559,80 +1519,6 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
     return 0;
 }
 
-static void
-dp_netdev_destroy_all_queues(struct dp_netdev *dp)
-    OVS_REQ_WRLOCK(dp->queue_rwlock)
-{
-    size_t i;
-
-    dp_netdev_purge_queues(dp);
-
-    for (i = 0; i < dp->n_handlers; i++) {
-        struct dp_netdev_queue *q = &dp->handler_queues[i];
-
-        ovs_mutex_destroy(&q->mutex);
-        seq_destroy(q->seq);
-    }
-    free(dp->handler_queues);
-    dp->handler_queues = NULL;
-    dp->n_handlers = 0;
-}
-
-static void
-dp_netdev_refresh_queues(struct dp_netdev *dp, uint32_t n_handlers)
-    OVS_REQ_WRLOCK(dp->queue_rwlock)
-{
-    if (dp->n_handlers != n_handlers) {
-        size_t i;
-
-        dp_netdev_destroy_all_queues(dp);
-
-        dp->n_handlers = n_handlers;
-        dp->handler_queues = xzalloc(n_handlers * sizeof *dp->handler_queues);
-
-        for (i = 0; i < n_handlers; i++) {
-            struct dp_netdev_queue *q = &dp->handler_queues[i];
-
-            ovs_mutex_init(&q->mutex);
-            q->seq = seq_create();
-        }
-    }
-}
-
-static int
-dpif_netdev_recv_set(struct dpif *dpif, bool enable)
-{
-    struct dp_netdev *dp = get_dp_netdev(dpif);
-
-    if ((dp->handler_queues != NULL) == enable) {
-        return 0;
-    }
-
-    fat_rwlock_wrlock(&dp->queue_rwlock);
-    if (!enable) {
-        dp_netdev_destroy_all_queues(dp);
-    } else {
-        dp_netdev_refresh_queues(dp, 1);
-    }
-    fat_rwlock_unlock(&dp->queue_rwlock);
-
-    return 0;
-}
-
-static int
-dpif_netdev_handlers_set(struct dpif *dpif, uint32_t n_handlers)
-{
-    struct dp_netdev *dp = get_dp_netdev(dpif);
-
-    fat_rwlock_wrlock(&dp->queue_rwlock);
-    if (dp->handler_queues) {
-        dp_netdev_refresh_queues(dp, n_handlers);
-    }
-    fat_rwlock_unlock(&dp->queue_rwlock);
-
-    return 0;
-}
-
 static int
 dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
                               uint32_t queue_id, uint32_t *priority)
@@ -1641,97 +1527,6 @@ dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
     return 0;
 }
 
-static bool
-dp_netdev_recv_check(const struct dp_netdev *dp, const uint32_t handler_id)
-    OVS_REQ_RDLOCK(dp->queue_rwlock)
-{
-    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
-
-    if (!dp->handler_queues) {
-        VLOG_WARN_RL(&rl, "receiving upcall disabled");
-        return false;
-    }
-
-    if (handler_id >= dp->n_handlers) {
-        VLOG_WARN_RL(&rl, "handler index out of bound");
-        return false;
-    }
-
-    return true;
-}
-
-static int
-dpif_netdev_recv(struct dpif *dpif, uint32_t handler_id,
-                 struct dpif_upcall *upcall, struct ofpbuf *buf)
-{
-    struct dp_netdev *dp = get_dp_netdev(dpif);
-    struct dp_netdev_queue *q;
-    int error = 0;
-
-    fat_rwlock_rdlock(&dp->queue_rwlock);
-
-    if (!dp_netdev_recv_check(dp, handler_id)) {
-        error = EAGAIN;
-        goto out;
-    }
-
-    q = &dp->handler_queues[handler_id];
-    ovs_mutex_lock(&q->mutex);
-    if (q->head != q->tail) {
-        struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
-
-        *upcall = u->upcall;
-
-        ofpbuf_uninit(buf);
-        *buf = u->buf;
-    } else {
-        error = EAGAIN;
-    }
-    ovs_mutex_unlock(&q->mutex);
-
-out:
-    fat_rwlock_unlock(&dp->queue_rwlock);
-
-    return error;
-}
-
-static void
-dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id)
-{
-    struct dp_netdev *dp = get_dp_netdev(dpif);
-    struct dp_netdev_queue *q;
-    uint64_t seq;
-
-    fat_rwlock_rdlock(&dp->queue_rwlock);
-
-    if (!dp_netdev_recv_check(dp, handler_id)) {
-        goto out;
-    }
-
-    q = &dp->handler_queues[handler_id];
-    ovs_mutex_lock(&q->mutex);
-    seq = seq_read(q->seq);
-    if (q->head != q->tail) {
-        poll_immediate_wake();
-    } else {
-        seq_wait(q->seq, seq);
-    }
-
-    ovs_mutex_unlock(&q->mutex);
-
-out:
-    fat_rwlock_unlock(&dp->queue_rwlock);
-}
-
-static void
-dpif_netdev_recv_purge(struct dpif *dpif)
-{
-    struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
-
-    fat_rwlock_wrlock(&dpif_netdev->dp->queue_rwlock);
-    dp_netdev_purge_queues(dpif_netdev->dp);
-    fat_rwlock_unlock(&dpif_netdev->dp->queue_rwlock);
-}
 \f
 /* Creates and returns a new 'struct dp_netdev_actions', with a reference count
  * of 1, whose actions are a copy of from the 'ofpacts_len' bytes of
@@ -1918,6 +1713,36 @@ reload:
     return NULL;
 }
 
+static void
+dp_netdev_disable_upcall(struct dp_netdev *dp)
+    OVS_ACQUIRES(dp->upcall_rwlock)
+{
+    fat_rwlock_wrlock(&dp->upcall_rwlock);
+}
+
+static void
+dpif_netdev_disable_upcall(struct dpif *dpif)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    dp_netdev_disable_upcall(dp);
+}
+
+static void
+dp_netdev_enable_upcall(struct dp_netdev *dp)
+    OVS_RELEASES(dp->upcall_rwlock)
+{
+    fat_rwlock_unlock(&dp->upcall_rwlock);
+}
+
+static void
+dpif_netdev_enable_upcall(struct dpif *dpif)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    dp_netdev_enable_upcall(dp);
+}
+
 static void
 dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
 {
@@ -2056,6 +1881,7 @@ static void
 dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
                 struct pkt_metadata *md)
 {
+    struct dp_netdev_queue q = DP_NETDEV_QUEUE_INITIALIZER;
     struct packet_batch batches[NETDEV_MAX_RX_BATCH];
     struct netdev_flow_key keys[NETDEV_MAX_RX_BATCH];
     const struct miniflow *mfs[NETDEV_MAX_RX_BATCH]; /* NULL at bad packets. */
@@ -2087,17 +1913,11 @@ dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
         }
 
         if (OVS_UNLIKELY(!rules[i])) {
+            struct ofpbuf *buf = &packets[i]->ofpbuf;
 
             dp_netdev_count_packet(dp, DP_STAT_MISS, 1);
-
-            if (OVS_LIKELY(dp->handler_queues)) {
-                uint32_t hash = miniflow_hash_5tuple(mfs[i], 0);
-                struct ofpbuf *buf = &packets[i]->ofpbuf;
-
-                dp_netdev_output_userspace(dp, buf, hash % dp->n_handlers,
-                                           DPIF_UC_MISS, mfs[i], NULL);
-            }
-
+            dp_netdev_queue_userspace_packet(&q, buf, DPIF_UC_MISS,
+                                             mfs[i], NULL);
             dpif_packet_delete(packets[i]);
             continue;
         }
@@ -2127,6 +1947,10 @@ dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
     for (i = 0; i < n_batches; i++) {
         packet_batch_execute(&batches[i], dp);
     }
+
+    if (q.packet_count) {
+        dp_netdev_execute_userspace_queue(&q, dp);
+    }
 }
 
 static void
@@ -2145,12 +1969,11 @@ dp_netdev_queue_userspace_packet(struct dp_netdev_queue *q,
                                  struct ofpbuf *packet, int type,
                                  const struct miniflow *key,
                                  const struct nlattr *userdata)
-OVS_REQUIRES(q->mutex)
 {
-    if (q->head - q->tail < MAX_QUEUE_LEN) {
-        struct dp_netdev_upcall *u = &q->upcalls[q->head++ & QUEUE_MASK];
-        struct dpif_upcall *upcall = &u->upcall;
-        struct ofpbuf *buf = &u->buf;
+    if (q->packet_count < NETDEV_MAX_RX_BATCH) {
+        int cnt = q->packet_count;
+        struct dpif_upcall *upcall = &q->upcalls[cnt];
+        struct ofpbuf *buf = &q->bufs[cnt];
         size_t buf_size;
         struct flow flow;
         void *data;
@@ -2174,7 +1997,7 @@ OVS_REQUIRES(q->mutex)
         /* Put userdata. */
         if (userdata) {
             upcall->userdata = ofpbuf_put(buf, userdata,
-                    NLA_ALIGN(userdata->nla_len));
+                                          NLA_ALIGN(userdata->nla_len));
         }
 
         /* We have to perform a copy of the packet, because we cannot send DPDK
@@ -2184,41 +2007,46 @@ OVS_REQUIRES(q->mutex)
         ofpbuf_use_stub(&upcall->packet, data, ofpbuf_size(packet));
         ofpbuf_set_size(&upcall->packet, ofpbuf_size(packet));
 
-        seq_change(q->seq);
-
+        q->packet_count++;
         return 0;
     } else {
         return ENOBUFS;
     }
 }
 
-static int
-dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
-                           int queue_no, int type,
-                           const struct miniflow *key,
-                           const struct nlattr *userdata)
+static void
+dp_netdev_execute_userspace_queue(struct dp_netdev_queue *q,
+                                  struct dp_netdev *dp)
 {
-    struct dp_netdev_queue *q;
-    int error;
+    struct dpif_upcall *upcalls = q->upcalls;
+    struct ofpbuf *bufs = q->bufs;
+    int cnt = q->packet_count;
 
-    fat_rwlock_rdlock(&dp->queue_rwlock);
-    q = &dp->handler_queues[queue_no];
-    ovs_mutex_lock(&q->mutex);
-    error = dp_netdev_queue_userspace_packet(q, packet, type, key,
-                                             userdata);
-    if (error == ENOBUFS) {
-        dp_netdev_count_packet(dp, DP_STAT_LOST, 1);
-    }
-    ovs_mutex_unlock(&q->mutex);
-    fat_rwlock_unlock(&dp->queue_rwlock);
+    if (!fat_rwlock_tryrdlock(&dp->upcall_rwlock)) {
+        ovs_assert(dp->upcall_cb);
+        dp->upcall_cb(dp->dpif, upcalls, bufs, cnt);
+        fat_rwlock_unlock(&dp->upcall_rwlock);
+    } else {
+        int i;
 
-    return error;
+        for (i = 0; i < cnt; i++) {
+            ofpbuf_uninit(&bufs[i]);
+            ofpbuf_uninit(&upcalls[i].packet);
+        }
+    }
 }
 
 struct dp_netdev_execute_aux {
     struct dp_netdev *dp;
 };
 
+static void
+dpif_netdev_register_upcall_cb(struct dpif *dpif, exec_upcall_cb *cb)
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    dp->upcall_cb = cb;
+}
+
 static void
 dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
               struct pkt_metadata *md,
@@ -2246,6 +2074,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
     case OVS_ACTION_ATTR_USERSPACE: {
         const struct nlattr *userdata;
         struct netdev_flow_key key;
+        struct dp_netdev_queue q = DP_NETDEV_QUEUE_INITIALIZER;
 
         userdata = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
 
@@ -2258,15 +2087,17 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
 
             miniflow_extract(packet, md, &key.flow);
 
-            dp_netdev_output_userspace(aux->dp, packet,
-                                       miniflow_hash_5tuple(&key.flow, 0)
-                                           % aux->dp->n_handlers,
-                                       DPIF_UC_ACTION, &key.flow,
-                                       userdata);
+            dp_netdev_queue_userspace_packet(&q, packet,
+                                             DPIF_UC_ACTION, &key.flow,
+                                             userdata);
             if (may_steal) {
                 dpif_packet_delete(packets[i]);
             }
         }
+
+        if (q.packet_count) {
+            dp_netdev_execute_userspace_queue(&q, aux->dp);
+        }
         break;
     }
 
@@ -2392,12 +2223,15 @@ const struct dpif_class dpif_netdev_class = {
     dpif_netdev_flow_dump_next,
     dpif_netdev_execute,
     NULL,                       /* operate */
-    dpif_netdev_recv_set,
-    dpif_netdev_handlers_set,
+    NULL,                       /* recv_set */
+    NULL,                       /* handlers_set */
     dpif_netdev_queue_to_priority,
-    dpif_netdev_recv,
-    dpif_netdev_recv_wait,
-    dpif_netdev_recv_purge,
+    NULL,                       /* recv */
+    NULL,                       /* recv_wait */
+    NULL,                       /* recv_purge */
+    dpif_netdev_register_upcall_cb,
+    dpif_netdev_enable_upcall,
+    dpif_netdev_disable_upcall,
 };
 
 static void
index 0f42d7a947e92305b029dcf09361688a22d4957d..410fcfa1554f6490d25f549562c2b06cd7f12c25 100644 (file)
@@ -20,6 +20,7 @@
 #include <stdbool.h>
 #include <stddef.h>
 #include <stdint.h>
+#include "dpif.h"
 #include "openvswitch/types.h"
 #include "ofpbuf.h"
 #include "packets.h"
index 6a06cf8dd89496c3a082fa318b0e8f9d9d727126..bf24a9d04a0430c3699cb3fbc965fb040592959b 100644 (file)
@@ -411,6 +411,22 @@ struct dpif_class {
     /* Throws away any queued upcalls that 'dpif' currently has ready to
      * return. */
     void (*recv_purge)(struct dpif *dpif);
+
+    /* For datapaths that run in userspace (i.e. dpif-netdev), threads polling
+     * for incoming packets can directly call upcall functions instead of
+     * offloading packet processing to separate handler threads. Datapaths
+     * that directly call upcall functions should use the functions below to
+     * to register an upcall function and enable / disable upcalls.
+     *
+     * Registers an upcall callback function with 'dpif'. This is only used if
+     * if 'dpif' directly executes upcall functions. */
+    void (*register_upcall_cb)(struct dpif *, exec_upcall_cb *);
+
+    /* Enables upcalls if 'dpif' directly executes upcall functions. */
+    void (*enable_upcall)(struct dpif *);
+
+    /* Disables upcalls if 'dpif' directly executes upcall functions. */
+    void (*disable_upcall)(struct dpif *);
 };
 
 extern const struct dpif_class dpif_linux_class;
index a3258057ced6e3cd2b131ec8d23483db45bc1f0a..35015694724e014884b11332ddd9064ef4f77de2 100644 (file)
@@ -1305,8 +1305,12 @@ dpif_upcall_type_to_string(enum dpif_upcall_type type)
 int
 dpif_recv_set(struct dpif *dpif, bool enable)
 {
-    int error = dpif->dpif_class->recv_set(dpif, enable);
-    log_operation(dpif, "recv_set", error);
+    int error = 0;
+
+    if (dpif->dpif_class->recv_set) {
+        error = dpif->dpif_class->recv_set(dpif, enable);
+        log_operation(dpif, "recv_set", error);
+    }
     return error;
 }
 
@@ -1333,11 +1337,61 @@ dpif_recv_set(struct dpif *dpif, bool enable)
 int
 dpif_handlers_set(struct dpif *dpif, uint32_t n_handlers)
 {
-    int error = dpif->dpif_class->handlers_set(dpif, n_handlers);
-    log_operation(dpif, "handlers_set", error);
+    int error = 0;
+
+    if (dpif->dpif_class->handlers_set) {
+        error = dpif->dpif_class->handlers_set(dpif, n_handlers);
+        log_operation(dpif, "handlers_set", error);
+    }
     return error;
 }
 
+void
+dpif_register_upcall_cb(struct dpif *dpif, exec_upcall_cb *cb)
+{
+    if (dpif->dpif_class->register_upcall_cb) {
+        dpif->dpif_class->register_upcall_cb(dpif, cb);
+    }
+}
+
+void
+dpif_enable_upcall(struct dpif *dpif)
+{
+    if (dpif->dpif_class->enable_upcall) {
+        dpif->dpif_class->enable_upcall(dpif);
+    }
+}
+
+void
+dpif_disable_upcall(struct dpif *dpif)
+{
+    if (dpif->dpif_class->disable_upcall) {
+        dpif->dpif_class->disable_upcall(dpif);
+    }
+}
+
+void
+dpif_print_packet(struct dpif *dpif, struct dpif_upcall *upcall)
+{
+    if (!VLOG_DROP_DBG(&dpmsg_rl)) {
+        struct ds flow;
+        char *packet;
+
+        packet = ofp_packet_to_string(ofpbuf_data(&upcall->packet),
+                                      ofpbuf_size(&upcall->packet));
+
+        ds_init(&flow);
+        odp_flow_key_format(upcall->key, upcall->key_len, &flow);
+
+        VLOG_DBG("%s: %s upcall:\n%s\n%s",
+                 dpif_name(dpif), dpif_upcall_type_to_string(upcall->type),
+                 ds_cstr(&flow), packet);
+
+        ds_destroy(&flow);
+        free(packet);
+    }
+}
+
 /* Polls for an upcall from 'dpif' for an upcall handler.  Since there
  * there can be multiple poll loops, 'handler_id' is needed as index to
  * identify the corresponding poll loop.  If successful, stores the upcall
@@ -1360,25 +1414,15 @@ int
 dpif_recv(struct dpif *dpif, uint32_t handler_id, struct dpif_upcall *upcall,
           struct ofpbuf *buf)
 {
-    int error = dpif->dpif_class->recv(dpif, handler_id, upcall, buf);
-    if (!error && !VLOG_DROP_DBG(&dpmsg_rl)) {
-        struct ds flow;
-        char *packet;
+    int error = EAGAIN;
 
-        packet = ofp_packet_to_string(ofpbuf_data(&upcall->packet),
-                                      ofpbuf_size(&upcall->packet));
-
-        ds_init(&flow);
-        odp_flow_key_format(upcall->key, upcall->key_len, &flow);
-
-        VLOG_DBG("%s: %s upcall:\n%s\n%s",
-                 dpif_name(dpif), dpif_upcall_type_to_string(upcall->type),
-                 ds_cstr(&flow), packet);
-
-        ds_destroy(&flow);
-        free(packet);
-    } else if (error && error != EAGAIN) {
-        log_operation(dpif, "recv", error);
+    if (dpif->dpif_class->recv) {
+        error = dpif->dpif_class->recv(dpif, handler_id, upcall, buf);
+        if (!error) {
+            dpif_print_packet(dpif, upcall);
+        } else if (error != EAGAIN) {
+            log_operation(dpif, "recv", error);
+        }
     }
     return error;
 }
@@ -1401,7 +1445,9 @@ dpif_recv_purge(struct dpif *dpif)
 void
 dpif_recv_wait(struct dpif *dpif, uint32_t handler_id)
 {
-    dpif->dpif_class->recv_wait(dpif, handler_id);
+    if (dpif->dpif_class->recv_wait) {
+        dpif->dpif_class->recv_wait(dpif, handler_id);
+    }
 }
 
 /* Obtains the NetFlow engine type and engine ID for 'dpif' into '*engine_type'
index 94bcacc29cbb2e60e020584f634c1bf9f57dd887..8d8e43a26b1b97cf58a287fcbc1df1d294c1a6f4 100644 (file)
@@ -671,12 +671,20 @@ struct dpif_upcall {
     struct nlattr *userdata;    /* Argument to OVS_ACTION_ATTR_USERSPACE. */
 };
 
+typedef void exec_upcall_cb(struct dpif *, struct dpif_upcall *,
+                            struct ofpbuf *, int cnt);
+
 int dpif_recv_set(struct dpif *, bool enable);
 int dpif_handlers_set(struct dpif *, uint32_t n_handlers);
 int dpif_recv(struct dpif *, uint32_t handler_id, struct dpif_upcall *,
               struct ofpbuf *);
 void dpif_recv_purge(struct dpif *);
 void dpif_recv_wait(struct dpif *, uint32_t handler_id);
+void dpif_register_upcall_cb(struct dpif *, exec_upcall_cb *);
+void dpif_enable_upcall(struct dpif *);
+void dpif_disable_upcall(struct dpif *);
+
+void dpif_print_packet(struct dpif *, struct dpif_upcall *);
 \f
 /* Miscellaneous. */
 
index df33643c83ee87ade02a7a28db07fab2023ede4c..f00c17f457d29bf665fca3232a448f3c248ef127 100644 (file)
@@ -40,7 +40,7 @@
 #include "vlog.h"
 
 #define MAX_QUEUE_LENGTH 512
-#define UPCALL_MAX_BATCH 50
+#define UPCALL_MAX_BATCH 64
 #define REVALIDATE_MAX_BATCH 50
 
 VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
@@ -201,7 +201,9 @@ static struct list all_udpifs = LIST_INITIALIZER(&all_udpifs);
 
 static size_t read_upcalls(struct handler *,
                            struct upcall upcalls[UPCALL_MAX_BATCH]);
-static void handle_upcalls(struct handler *, struct upcall *, size_t n_upcalls);
+static void free_upcall(struct upcall *);
+static int convert_upcall(struct udpif *, struct upcall *);
+static void handle_upcalls(struct udpif *, struct upcall *, size_t n_upcalls);
 static void udpif_stop_threads(struct udpif *);
 static void udpif_start_threads(struct udpif *, size_t n_handlers,
                                 size_t n_revalidators);
@@ -266,6 +268,8 @@ udpif_create(struct dpif_backer *backer, struct dpif *dpif)
     atomic_init(&udpif->n_flows_timestamp, LLONG_MIN);
     ovs_mutex_init(&udpif->n_flows_mutex);
 
+    dpif_register_upcall_cb(dpif, exec_upcalls);
+
     return udpif;
 }
 
@@ -317,6 +321,8 @@ udpif_stop_threads(struct udpif *udpif)
             xpthread_join(udpif->revalidators[i].thread, NULL);
         }
 
+        dpif_disable_upcall(udpif->dpif);
+
         for (i = 0; i < udpif->n_revalidators; i++) {
             struct revalidator *revalidator = &udpif->revalidators[i];
 
@@ -367,6 +373,8 @@ udpif_start_threads(struct udpif *udpif, size_t n_handlers,
                 "handler", udpif_upcall_handler, handler);
         }
 
+        dpif_enable_upcall(udpif->dpif);
+
         ovs_barrier_init(&udpif->reval_barrier, udpif->n_revalidators);
         udpif->reval_exit = false;
         udpif->revalidators = xzalloc(udpif->n_revalidators
@@ -539,12 +547,10 @@ udpif_upcall_handler(void *arg)
             latch_wait(&udpif->exit_latch);
             poll_block();
         } else {
-            handle_upcalls(handler, upcalls, n_upcalls);
+            handle_upcalls(handler->udpif, upcalls, n_upcalls);
 
             for (i = 0; i < n_upcalls; i++) {
-                xlate_out_uninit(&upcalls[i].xout);
-                ofpbuf_uninit(&upcalls[i].dpif_upcall.packet);
-                ofpbuf_uninit(&upcalls[i].upcall_buf);
+                free_upcall(&upcalls[i]);
             }
         }
         coverage_clear();
@@ -751,6 +757,63 @@ upcall_init(struct upcall *upcall, struct flow *flow, struct ofpbuf *packet,
     xlate_actions(&xin, &upcall->xout);
 }
 
+void
+free_upcall(struct upcall *upcall)
+{
+    xlate_out_uninit(&upcall->xout);
+    ofpbuf_uninit(&upcall->dpif_upcall.packet);
+    ofpbuf_uninit(&upcall->upcall_buf);
+}
+
+static struct udpif *
+find_udpif(struct dpif *dpif)
+{
+    struct udpif *udpif;
+
+    LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
+        if (udpif->dpif == dpif) {
+            return udpif;
+        }
+    }
+    return NULL;
+}
+
+void
+exec_upcalls(struct dpif *dpif, struct dpif_upcall *dupcalls,
+             struct ofpbuf *bufs, int cnt)
+{
+    struct upcall upcalls[UPCALL_MAX_BATCH];
+    struct udpif *udpif;
+    int i, j;
+
+    udpif = find_udpif(dpif);
+    ovs_assert(udpif);
+
+    for (i = 0; i < cnt; i += UPCALL_MAX_BATCH) {
+        size_t n_upcalls = 0;
+        for (j = i; j < MIN(i + UPCALL_MAX_BATCH, cnt); j++) {
+            struct upcall *upcall = &upcalls[n_upcalls];
+            struct dpif_upcall *dupcall = &dupcalls[j];
+            struct ofpbuf *buf = &bufs[j];
+
+            upcall->dpif_upcall = *dupcall;
+            upcall->upcall_buf = *buf;
+
+            dpif_print_packet(dpif, dupcall);
+            if (!convert_upcall(udpif, upcall)) {
+                n_upcalls += 1;
+            }
+        }
+
+        if (n_upcalls) {
+            handle_upcalls(udpif, upcalls, n_upcalls);
+            for (j = 0; j < n_upcalls; j++) {
+                free_upcall(&upcalls[j]);
+            }
+        }
+    }
+}
+
 /* Reads and classifies upcalls.  Returns the number of upcalls successfully
  * read. */
 static size_t
@@ -764,14 +827,6 @@ read_upcalls(struct handler *handler,
     /* Try reading UPCALL_MAX_BATCH upcalls from dpif. */
     for (i = 0; i < UPCALL_MAX_BATCH; i++) {
         struct upcall *upcall = &upcalls[n_upcalls];
-        struct dpif_upcall *dupcall;
-        struct ofpbuf *packet;
-        struct ofproto_dpif *ofproto;
-        struct dpif_sflow *sflow;
-        struct dpif_ipfix *ipfix;
-        struct flow flow;
-        enum upcall_type type;
-        odp_port_t odp_in_port;
         int error;
 
         ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
@@ -783,91 +838,107 @@ read_upcalls(struct handler *handler,
             break;
         }
 
-        dupcall = &upcall->dpif_upcall;
-        packet = &dupcall->packet;
-        error = xlate_receive(udpif->backer, packet, dupcall->key,
-                              dupcall->key_len, &flow,
-                              &ofproto, &ipfix, &sflow, NULL, &odp_in_port);
-        if (error) {
-            if (error == ENODEV) {
-                /* Received packet on datapath port for which we couldn't
-                 * associate an ofproto.  This can happen if a port is removed
-                 * while traffic is being received.  Print a rate-limited
-                 * message in case it happens frequently.  Install a drop flow
-                 * so that future packets of the flow are inexpensively dropped
-                 * in the kernel. */
-                VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
-                             "port %"PRIu32, odp_in_port);
-                dpif_flow_put(udpif->dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY,
-                              dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
-                              NULL);
-            }
-            goto destroy_upcall;
+        if (!convert_upcall(udpif, upcall)) {
+            n_upcalls += 1;
         }
+    }
+    return n_upcalls;
+}
 
-        type = classify_upcall(upcall);
-        if (type == MISS_UPCALL) {
-            upcall_init(upcall, &flow, packet, ofproto, dupcall, odp_in_port);
-            n_upcalls++;
-            continue;
-        }
+int
+convert_upcall(struct udpif *udpif, struct upcall *upcall)
+{
+    struct dpif_upcall *dupcall = &upcall->dpif_upcall;
+    struct ofpbuf *packet = &dupcall->packet;
+    struct ofproto_dpif *ofproto;
+    struct dpif_sflow *sflow;
+    struct dpif_ipfix *ipfix;
+    struct flow flow;
+    enum upcall_type type;
+    odp_port_t odp_in_port;
+    int error;
 
-        switch (type) {
-        case SFLOW_UPCALL:
-            if (sflow) {
-                union user_action_cookie cookie;
+    error = xlate_receive(udpif->backer, packet, dupcall->key,
+                          dupcall->key_len, &flow,
+                          &ofproto, &ipfix, &sflow, NULL, &odp_in_port);
 
-                memset(&cookie, 0, sizeof cookie);
-                memcpy(&cookie, nl_attr_get(dupcall->userdata),
-                       sizeof cookie.sflow);
-                dpif_sflow_received(sflow, packet, &flow, odp_in_port,
-                                    &cookie);
-            }
-            break;
-        case IPFIX_UPCALL:
-            if (ipfix) {
-                dpif_ipfix_bridge_sample(ipfix, packet, &flow);
-            }
-            break;
-        case FLOW_SAMPLE_UPCALL:
-            if (ipfix) {
-                union user_action_cookie cookie;
-
-                memset(&cookie, 0, sizeof cookie);
-                memcpy(&cookie, nl_attr_get(dupcall->userdata),
-                       sizeof cookie.flow_sample);
-
-                /* The flow reflects exactly the contents of the packet.
-                 * Sample the packet using it. */
-                dpif_ipfix_flow_sample(ipfix, packet, &flow,
-                                       cookie.flow_sample.collector_set_id,
-                                       cookie.flow_sample.probability,
-                                       cookie.flow_sample.obs_domain_id,
-                                       cookie.flow_sample.obs_point_id);
-            }
-            break;
-        case BAD_UPCALL:
-            break;
-        case MISS_UPCALL:
-            OVS_NOT_REACHED();
+    if (error) {
+        if (error == ENODEV) {
+            /* Received packet on datapath port for which we couldn't
+             * associate an ofproto.  This can happen if a port is removed
+             * while traffic is being received.  Print a rate-limited
+             * message in case it happens frequently.  Install a drop flow
+             * so that future packets of the flow are inexpensively dropped
+             * in the kernel. */
+            VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
+                         "port %"PRIu32, odp_in_port);
+            dpif_flow_put(udpif->dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY,
+                          dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
+                          NULL);
         }
+        goto destroy_upcall;
+    }
 
-        dpif_ipfix_unref(ipfix);
-        dpif_sflow_unref(sflow);
+    type = classify_upcall(upcall);
+    if (type == MISS_UPCALL) {
+        upcall_init(upcall, &flow, packet, ofproto, dupcall, odp_in_port);
+        return error;
+    }
 
-destroy_upcall:
-        ofpbuf_uninit(&upcall->dpif_upcall.packet);
-        ofpbuf_uninit(&upcall->upcall_buf);
+    switch (type) {
+    case SFLOW_UPCALL:
+        if (sflow) {
+            union user_action_cookie cookie;
+
+            memset(&cookie, 0, sizeof cookie);
+            memcpy(&cookie, nl_attr_get(dupcall->userdata),
+                   sizeof cookie.sflow);
+            dpif_sflow_received(sflow, packet, &flow, odp_in_port,
+                                &cookie);
+        }
+        break;
+    case IPFIX_UPCALL:
+        if (ipfix) {
+            dpif_ipfix_bridge_sample(ipfix, packet, &flow);
+        }
+        break;
+    case FLOW_SAMPLE_UPCALL:
+        if (ipfix) {
+            union user_action_cookie cookie;
+
+            memset(&cookie, 0, sizeof cookie);
+            memcpy(&cookie, nl_attr_get(dupcall->userdata),
+                   sizeof cookie.flow_sample);
+
+            /* The flow reflects exactly the contents of the packet.
+             * Sample the packet using it. */
+            dpif_ipfix_flow_sample(ipfix, packet, &flow,
+                                   cookie.flow_sample.collector_set_id,
+                                   cookie.flow_sample.probability,
+                                   cookie.flow_sample.obs_domain_id,
+                                   cookie.flow_sample.obs_point_id);
+        }
+        break;
+    case BAD_UPCALL:
+        break;
+    case MISS_UPCALL:
+        OVS_NOT_REACHED();
     }
 
-    return n_upcalls;
+    dpif_ipfix_unref(ipfix);
+    dpif_sflow_unref(sflow);
+    error = EAGAIN;
+
+destroy_upcall:
+    ofpbuf_uninit(&upcall->dpif_upcall.packet);
+    ofpbuf_uninit(&upcall->upcall_buf);
+    return error;
 }
 
 static void
-handle_upcalls(struct handler *handler, struct upcall *upcalls,
+handle_upcalls(struct udpif *udpif, struct upcall *upcalls,
                size_t n_upcalls)
 {
-    struct udpif *udpif = handler->udpif;
     struct dpif_op *opsp[UPCALL_MAX_BATCH * 2];
     struct dpif_op ops[UPCALL_MAX_BATCH * 2];
     size_t n_ops, i;
index 8c4b655c7dbacb9adc43bcf7bdf4357b9301d66e..2b197ada6f739ad85bae2fb26ef27eca1dbfd728 100644 (file)
@@ -19,6 +19,8 @@
 
 struct dpif;
 struct dpif_backer;
+struct dpif_upcall;
+struct ofpbuf;
 struct seq;
 struct simap;
 
@@ -26,6 +28,9 @@ struct simap;
  * them.  Additionally, it's responsible for maintaining the datapath flow
  * table. */
 
+void exec_upcalls(struct dpif *, struct dpif_upcall *, struct ofpbuf *,
+                  int cnt);
+
 struct udpif *udpif_create(struct dpif_backer *, struct dpif *);
 void udpif_run(struct udpif *udpif);
 void udpif_set_threads(struct udpif *, size_t n_handlers,