#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
-#include <netinet/in.h>
-#include <sys/socket.h>
#include <net/if.h>
+#include <netinet/in.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
+#include <sys/socket.h>
#include <sys/stat.h>
#include <unistd.h>
+#ifdef DPDK_NETDEV
+#include <rte_cycles.h>
+#endif
+
#include "bitmap.h"
#include "cmap.h"
+#include "conntrack.h"
+#include "coverage.h"
+#include "ct-dpif.h"
#include "csum.h"
#include "dp-packet.h"
#include "dpif.h"
#include "dpif-provider.h"
#include "dummy.h"
-#include "dynamic-string.h"
#include "fat-rwlock.h"
#include "flow.h"
-#include "cmap.h"
-#include "coverage.h"
+#include "hmapx.h"
#include "latch.h"
-#include "list.h"
-#include "match.h"
#include "netdev.h"
-#include "netdev-dpdk.h"
#include "netdev-vport.h"
#include "netlink.h"
#include "odp-execute.h"
#include "odp-util.h"
-#include "ofp-print.h"
-#include "ofpbuf.h"
+#include "openvswitch/dynamic-string.h"
+#include "openvswitch/list.h"
+#include "openvswitch/match.h"
+#include "openvswitch/ofp-print.h"
+#include "openvswitch/ofp-util.h"
+#include "openvswitch/ofpbuf.h"
+#include "openvswitch/shash.h"
+#include "openvswitch/vlog.h"
#include "ovs-numa.h"
#include "ovs-rcu.h"
#include "packets.h"
#include "pvector.h"
#include "random.h"
#include "seq.h"
-#include "shash.h"
+#include "smap.h"
#include "sset.h"
#include "timeval.h"
#include "tnl-neigh-cache.h"
#include "tnl-ports.h"
#include "unixctl.h"
#include "util.h"
-#include "openvswitch/vlog.h"
VLOG_DEFINE_THIS_MODULE(dpif_netdev);
static struct vlog_rate_limit upcall_rl = VLOG_RATE_LIMIT_INIT(600, 600);
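+
+/* Only the CS_* connection tracking state bits in the supported mask below
+ * can be matched by the userspace datapath; DP_NETDEV_CS_UNSUPPORTED_MASK
+ * covers everything else and is used to reject flows that match on
+ * unsupported bits. */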
+#define DP_NETDEV_CS_SUPPORTED_MASK (CS_NEW | CS_ESTABLISHED | CS_RELATED \
+ | CS_INVALID | CS_REPLY_DIR | CS_TRACKED)
+#define DP_NETDEV_CS_UNSUPPORTED_MASK (~(uint32_t)DP_NETDEV_CS_SUPPORTED_MASK)
+
static struct odp_support dp_netdev_support = {
.max_mpls_depth = SIZE_MAX,
.recirc = true,
+ .ct_state = true,
+ .ct_zone = true,
+ .ct_mark = true,
+ .ct_label = true,
};
/* Stores a miniflow with inline values */
\f
/* Simple non-wildcarding single-priority classifier. */
+/* Time in ms between successive optimizations of the dpcls subtable vector */
+#define DPCLS_OPTIMIZATION_INTERVAL 1000
+
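+/* One dpcls instance is kept per input port (see 'classifiers' in struct
+ * dp_netdev_pmd_thread): installed megaflows always match exactly on
+ * in_port, so partitioning the rules per port keeps each subtable vector
+ * short and lets dpcls_sort_subtable_vector() rank subtables by hit
+ * frequency independently for each port. */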
struct dpcls {
+ struct cmap_node node; /* Within dp_netdev_pmd_thread.classifiers */
+ odp_port_t in_port;
struct cmap subtables_map;
struct pvector subtables;
};
static void dpcls_init(struct dpcls *);
static void dpcls_destroy(struct dpcls *);
+static void dpcls_sort_subtable_vector(struct dpcls *);
static void dpcls_insert(struct dpcls *, struct dpcls_rule *,
const struct netdev_flow_key *mask);
static void dpcls_remove(struct dpcls *, struct dpcls_rule *);
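+/* If 'num_lookups_p' is non-null, the number of subtable lookups performed
+ * is accumulated there; the datapath feeds this into the DP_STAT_LOOKUP_HIT
+ * counter, which backs the "avg. subtable lookups per hit" line in the
+ * pmd stats output below. */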
-static bool dpcls_lookup(const struct dpcls *cls,
+static bool dpcls_lookup(struct dpcls *cls,
const struct netdev_flow_key keys[],
- struct dpcls_rule **rules, size_t cnt);
+ struct dpcls_rule **rules, size_t cnt,
+ int *num_lookups_p);
\f
/* Datapath based on the network device interface from netdev.h.
*
*
* dp_netdev_mutex (global)
* port_mutex
+ * non_pmd_mutex
*/
struct dp_netdev {
const struct dpif_class *const class;
/* Ports.
*
- * Protected by RCU. Take the mutex to add or remove ports. */
+ * Any lookup into 'ports' or any access to the dp_netdev_ports found
+ * through 'ports' requires taking 'port_mutex'. */
struct ovs_mutex port_mutex;
- struct cmap ports;
+ struct hmap ports;
struct seq *port_seq; /* Incremented whenever a port changes. */
/* Protects access to ofproto-dpif-upcall interface during revalidator
* 'struct dp_netdev_pmd_thread' in 'per_pmd_key'. */
ovsthread_key_t per_pmd_key;
+ struct seq *reconfigure_seq;
+ uint64_t last_reconfigure_seq;
+
/* Cpu mask for pin of pmd threads. */
char *pmd_cmask;
+
uint64_t last_tnl_conf_seq;
+
+ struct conntrack conntrack;
};
static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
- odp_port_t);
+ odp_port_t)
+ OVS_REQUIRES(dp->port_mutex);
enum dp_stat_type {
DP_STAT_EXACT_HIT, /* Packets that had an exact match (emc). */
DP_STAT_MASKED_HIT, /* Packets that matched in the flow table. */
DP_STAT_MISS, /* Packets that did not match. */
DP_STAT_LOST, /* Packets not passed up to the client. */
+ DP_STAT_LOOKUP_HIT, /* Number of subtable lookups for flow table
+ hits */
DP_N_STATS
};
PMD_N_CYCLES
};
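+
+/* XPS (transmit packet steering): if a netdev offers fewer tx queues than
+ * there are threads, tx queue ids are assigned to pmd threads dynamically
+ * (see 'dynamic_txqs' below and dpif_netdev_xps_get_tx_qid()). A queue id
+ * that a thread has not used for XPS_TIMEOUT_MS may be reassigned. */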
+#define XPS_TIMEOUT_MS 500LL
+
+/* Contained by struct dp_netdev_port's 'rxqs' member. */
+struct dp_netdev_rxq {
+ struct netdev_rxq *rxq;
+ unsigned core_id; /* Core to which this queue is pinned. */
+};
+
/* A port in a netdev-based datapath. */
struct dp_netdev_port {
odp_port_t port_no;
struct netdev *netdev;
- struct cmap_node node; /* Node in dp_netdev's 'ports'. */
+ struct hmap_node node; /* Node in dp_netdev's 'ports'. */
struct netdev_saved_flags *sf;
- struct netdev_rxq **rxq;
- struct ovs_refcount ref_cnt;
+ struct dp_netdev_rxq *rxqs;
+ unsigned n_rxq; /* Number of elements in 'rxqs' */
+ bool dynamic_txqs; /* If true XPS will be used. */
+ unsigned *txq_used; /* Number of threads that use each tx queue. */
+ struct ovs_mutex txq_used_mutex;
char *type; /* Port type as requested by user. */
- int latest_requested_n_rxq; /* Latest requested from netdev number
- of rx queues. */
+ char *rxq_affinity_list; /* Requested affinity of rx queues. */
};
/* Contained by struct dp_netdev_flow's 'stats' member. */
/* While processing a group of input packets, the datapath uses the next
* member to store a pointer to the output batch for the flow. It is
* reset after the batch has been sent out (See dp_netdev_queue_batches(),
- * packet_batch_init() and packet_batch_execute()). */
- struct packet_batch *batch;
+ * packet_batch_per_flow_init() and packet_batch_per_flow_execute()). */
+ struct packet_batch_per_flow *batch;
/* Packet classification. */
struct dpcls_rule cr; /* In owning dp_netdev's 'cls'. */
struct ovs_list node;
};
+/* Contained by struct dp_netdev_pmd_thread's 'port_cache' or 'tx_ports'. */
+struct tx_port {
+ struct dp_netdev_port *port;
+ int qid;
+ long long last_used;
+ struct hmap_node node;
+};
+
/* PMD: Poll mode drivers. PMD accesses devices via polling to eliminate
* the performance overhead of interrupt processing. Therefore netdev can
* not implement rx-wait for these devices. dpif-netdev needs to poll
/* Per thread exact-match cache. Note, the instance for cpu core
 * NON_PMD_CORE_ID can be accessed by multiple threads, and thus
- * need to be protected (e.g. by 'dp_netdev_mutex'). All other
- * instances will only be accessed by its own pmd thread. */
+ * needs to be protected by 'non_pmd_mutex'. Every other instance
+ * will only be accessed by its own pmd thread. */
struct emc_cache flow_cache;
- /* Classifier and Flow-Table.
+ /* Flow-Table and classifiers
*
* Writers of 'flow_table' must take the 'flow_mutex'. Corresponding
- * changes to 'cls' must be made while still holding the 'flow_mutex'.
+ * changes to 'classifiers' must be made while still holding the
+ * 'flow_mutex'.
*/
struct ovs_mutex flow_mutex;
- struct dpcls cls;
struct cmap flow_table OVS_GUARDED; /* Flow table. */
+ /* One classifier per in_port polled by the pmd */
+ struct cmap classifiers;
+ /* Periodically sort subtable vectors according to hit frequencies */
+ long long int next_optimization;
+
/* Statistics. */
struct dp_netdev_pmd_stats stats;
struct latch exit_latch; /* For terminating the pmd thread. */
atomic_uint change_seq; /* For reloading pmd ports. */
pthread_t thread;
- int index; /* Idx of this pmd thread among pmd*/
- /* threads on same numa node. */
unsigned core_id; /* CPU core id of this pmd thread. */
int numa_id; /* numa node id of this pmd thread. */
- atomic_int tx_qid; /* Queue id used by this pmd thread to
- * send packets on all netdevs */
+ bool isolated;
+
+ /* Queue id used by this pmd thread to send packets on all netdevs if
+ * XPS is disabled for this netdev. All static_tx_qid's are unique and less
+ * than 'ovs_numa_get_n_cores() + 1'. */
+ atomic_int static_tx_qid;
- struct ovs_mutex poll_mutex; /* Mutex for poll_list. */
+ struct ovs_mutex port_mutex; /* Mutex for 'poll_list' and 'tx_ports'. */
/* List of rx queues to poll. */
struct ovs_list poll_list OVS_GUARDED;
- int poll_cnt; /* Number of elemints in poll_list. */
+ /* Number of elements in 'poll_list' */
+ int poll_cnt;
+ /* Map of 'tx_port's used for transmission. Written by the main thread,
+ * read by the pmd thread. */
+ struct hmap tx_ports OVS_GUARDED;
+
+ /* Map of 'tx_port' used in the fast path. This is a thread-local copy of
+ * 'tx_ports'. The instance for cpu core NON_PMD_CORE_ID can be accessed
+ * by multiple threads, and thus needs to be protected by 'non_pmd_mutex'.
+ * Every other instance will only be accessed by its own pmd thread. */
+ struct hmap port_cache;
/* Only a pmd thread can write on its own 'cycles' and 'stats'.
* The main thread keeps 'stats_zero' and 'cycles_zero' as base
};
static int get_port_by_number(struct dp_netdev *dp, odp_port_t port_no,
- struct dp_netdev_port **portp);
+ struct dp_netdev_port **portp)
+ OVS_REQUIRES(dp->port_mutex);
static int get_port_by_name(struct dp_netdev *dp, const char *devname,
- struct dp_netdev_port **portp);
+ struct dp_netdev_port **portp)
+ OVS_REQUIRES(dp->port_mutex);
static void dp_netdev_free(struct dp_netdev *)
OVS_REQUIRES(dp_netdev_mutex);
static int do_add_port(struct dp_netdev *dp, const char *devname,
static int dpif_netdev_open(const struct dpif_class *, const char *name,
bool create, struct dpif **);
static void dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
- struct dp_packet **, int c,
- bool may_steal,
+ struct dp_packet_batch *,
+ bool may_steal, const struct flow *flow,
const struct nlattr *actions,
- size_t actions_len);
+ size_t actions_len,
+ long long now);
static void dp_netdev_input(struct dp_netdev_pmd_thread *,
- struct dp_packet **, int cnt, odp_port_t port_no);
+ struct dp_packet_batch *, odp_port_t port_no);
static void dp_netdev_recirculate(struct dp_netdev_pmd_thread *,
- struct dp_packet **, int cnt);
+ struct dp_packet_batch *);
static void dp_netdev_disable_upcall(struct dp_netdev *);
static void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
- struct dp_netdev *dp, int index,
- unsigned core_id, int numa_id);
+ struct dp_netdev *dp, unsigned core_id,
+ int numa_id);
static void dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd);
-static void dp_netdev_set_nonpmd(struct dp_netdev *dp);
+static void dp_netdev_set_nonpmd(struct dp_netdev *dp)
+ OVS_REQUIRES(dp->port_mutex);
+
static struct dp_netdev_pmd_thread *dp_netdev_get_pmd(struct dp_netdev *dp,
unsigned core_id);
static struct dp_netdev_pmd_thread *
dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id);
-static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id);
-static void
-dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
- struct dp_netdev_port *port, struct netdev_rxq *rx);
+static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
+ OVS_REQUIRES(dp->port_mutex);
+static void dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd);
+static void dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
+ struct dp_netdev_port *port);
+static void dp_netdev_add_port_to_pmds(struct dp_netdev *dp,
+ struct dp_netdev_port *port);
+static void dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
+ struct dp_netdev_port *port);
+static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
+ struct dp_netdev_port *port,
+ struct netdev_rxq *rx);
static struct dp_netdev_pmd_thread *
dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id);
-static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp);
+static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
+ OVS_REQUIRES(dp->port_mutex);
+static void reconfigure_pmd_threads(struct dp_netdev *dp)
+ OVS_REQUIRES(dp->port_mutex);
static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd);
static void dp_netdev_pmd_flow_flush(struct dp_netdev_pmd_thread *pmd);
+static void pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
+ OVS_REQUIRES(pmd->port_mutex);
+static inline void
+dp_netdev_pmd_try_optimize(struct dp_netdev_pmd_thread *pmd);
+
+static void
+dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
+ long long now, bool purge);
+static int dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
+ struct tx_port *tx, long long now);
static inline bool emc_entry_alive(struct emc_entry *ce);
static void emc_clear_entry(struct emc_entry *ce);
ds_put_format(reply,
"\temc hits:%llu\n\tmegaflow hits:%llu\n"
+ "\tavg. subtable lookups per hit:%.2f\n"
"\tmiss:%llu\n\tlost:%llu\n",
stats[DP_STAT_EXACT_HIT], stats[DP_STAT_MASKED_HIT],
+ stats[DP_STAT_MASKED_HIT] > 0
+ ? (1.0*stats[DP_STAT_LOOKUP_HIT])/stats[DP_STAT_MASKED_HIT]
+ : 0,
stats[DP_STAT_MISS], stats[DP_STAT_LOST]);
if (total_cycles == 0) {
struct rxq_poll *poll;
const char *prev_name = NULL;
- ds_put_format(reply, "pmd thread numa_id %d core_id %u:\n",
- pmd->numa_id, pmd->core_id);
+ ds_put_format(reply,
+ "pmd thread numa_id %d core_id %u:\n\tisolated : %s\n",
+ pmd->numa_id, pmd->core_id, (pmd->isolated)
+ ? "true" : "false");
- ovs_mutex_lock(&pmd->poll_mutex);
+ ovs_mutex_lock(&pmd->port_mutex);
LIST_FOR_EACH (poll, node, &pmd->poll_list) {
const char *name = netdev_get_name(poll->port->netdev);
ds_put_format(reply, " %d", netdev_rxq_get_queue_id(poll->rx));
prev_name = name;
}
- ovs_mutex_unlock(&pmd->poll_mutex);
+ ovs_mutex_unlock(&pmd->port_mutex);
ds_put_cstr(reply, "\n");
}
}
dpif_netdev_port_open_type(const struct dpif_class *class, const char *type)
{
return strcmp(type, "internal") ? type
- : dpif_netdev_class_is_dummy(class) ? "dummy"
+ : dpif_netdev_class_is_dummy(class) ? "dummy-internal"
: "tap";
}
atomic_flag_clear(&dp->destroyed);
ovs_mutex_init(&dp->port_mutex);
- cmap_init(&dp->ports);
+ hmap_init(&dp->ports);
dp->port_seq = seq_create();
fat_rwlock_init(&dp->upcall_rwlock);
+ dp->reconfigure_seq = seq_create();
+ dp->last_reconfigure_seq = seq_read(dp->reconfigure_seq);
+
/* Disable upcalls by default. */
dp_netdev_disable_upcall(dp);
dp->upcall_aux = NULL;
dp->upcall_cb = NULL;
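+
+ /* The userspace datapath embeds its own connection tracker; it is
+ * created here and torn down in dp_netdev_free(). */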
+ conntrack_init(&dp->conntrack);
+
cmap_init(&dp->poll_threads);
ovs_mutex_init_recursive(&dp->non_pmd_mutex);
ovsthread_key_create(&dp->per_pmd_key, NULL);
+ ovs_mutex_lock(&dp->port_mutex);
dp_netdev_set_nonpmd(dp);
- ovs_mutex_lock(&dp->port_mutex);
- error = do_add_port(dp, name, "internal", ODPP_LOCAL);
+ error = do_add_port(dp, name, dpif_netdev_port_open_type(dp->class,
+ "internal"),
+ ODPP_LOCAL);
ovs_mutex_unlock(&dp->port_mutex);
if (error) {
dp_netdev_free(dp);
return 0;
}
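+
+/* Deferred reconfiguration: setters such as dpif_netdev_pmd_set() and
+ * dpif_netdev_port_set_config() only bump 'reconfigure_seq'; the next call
+ * to dpif_netdev_run() notices the change via dp_netdev_is_reconf_required()
+ * and restarts the pmd threads with reconfigure_pmd_threads(). */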
+static void
+dp_netdev_request_reconfigure(struct dp_netdev *dp)
+{
+ seq_change(dp->reconfigure_seq);
+}
+
+static bool
+dp_netdev_is_reconf_required(struct dp_netdev *dp)
+{
+ return seq_read(dp->reconfigure_seq) != dp->last_reconfigure_seq;
+}
+
static int
dpif_netdev_open(const struct dpif_class *class, const char *name,
bool create, struct dpif **dpifp)
dp_netdev_free(struct dp_netdev *dp)
OVS_REQUIRES(dp_netdev_mutex)
{
- struct dp_netdev_port *port;
+ struct dp_netdev_port *port, *next;
shash_find_and_delete(&dp_netdevs, dp->name);
ovs_mutex_destroy(&dp->non_pmd_mutex);
ovsthread_key_delete(dp->per_pmd_key);
+ conntrack_destroy(&dp->conntrack);
+
ovs_mutex_lock(&dp->port_mutex);
- CMAP_FOR_EACH (port, node, &dp->ports) {
- /* PMD threads are destroyed here. do_del_port() cannot quiesce */
+ HMAP_FOR_EACH_SAFE (port, next, node, &dp->ports) {
do_del_port(dp, port);
}
ovs_mutex_unlock(&dp->port_mutex);
cmap_destroy(&dp->poll_threads);
+ seq_destroy(dp->reconfigure_seq);
+
seq_destroy(dp->port_seq);
- cmap_destroy(&dp->ports);
+ hmap_destroy(&dp->ports);
+ ovs_mutex_destroy(&dp->port_mutex);
/* Upcalls must be disabled at this point */
dp_netdev_destroy_upcall_lock(dp);
int old_seq;
if (pmd->core_id == NON_PMD_CORE_ID) {
+ ovs_mutex_lock(&pmd->dp->non_pmd_mutex);
+ ovs_mutex_lock(&pmd->port_mutex);
+ pmd_load_cached_ports(pmd);
+ ovs_mutex_unlock(&pmd->port_mutex);
+ ovs_mutex_unlock(&pmd->dp->non_pmd_mutex);
return;
}
}
static int
-do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
- odp_port_t port_no)
- OVS_REQUIRES(dp->port_mutex)
+port_create(const char *devname, const char *type,
+ odp_port_t port_no, struct dp_netdev_port **portp)
{
struct netdev_saved_flags *sf;
struct dp_netdev_port *port;
- struct netdev *netdev;
enum netdev_flags flags;
- const char *open_type;
- int error;
- int i;
+ struct netdev *netdev;
+ int n_open_rxqs = 0;
+ int n_cores = 0;
+ int i, error;
+ bool dynamic_txqs = false;
- /* Reject devices already in 'dp'. */
- if (!get_port_by_name(dp, devname, &port)) {
- return EEXIST;
- }
+ *portp = NULL;
/* Open and validate network device. */
- open_type = dpif_netdev_port_open_type(dp->class, type);
- error = netdev_open(devname, open_type, &netdev);
+ error = netdev_open(devname, type, &netdev);
if (error) {
return error;
}
netdev_get_flags(netdev, &flags);
if (flags & NETDEV_LOOPBACK) {
VLOG_ERR("%s: cannot add a loopback device", devname);
- netdev_close(netdev);
- return EINVAL;
+ error = EINVAL;
+ goto out;
}
if (netdev_is_pmd(netdev)) {
- int n_cores = ovs_numa_get_n_cores();
+ n_cores = ovs_numa_get_n_cores();
if (n_cores == OVS_CORE_UNSPEC) {
VLOG_ERR("%s, cannot get cpu core info", devname);
- return ENOENT;
+ error = ENOENT;
+ goto out;
}
/* There can only be ovs_numa_get_n_cores() pmd threads,
* so creates a txq for each, and one extra for the non
* pmd threads. */
- error = netdev_set_multiq(netdev, n_cores + 1,
- netdev_requested_n_rxq(netdev));
+ error = netdev_set_tx_multiq(netdev, n_cores + 1);
if (error && (error != EOPNOTSUPP)) {
VLOG_ERR("%s, cannot set multiq", devname);
- return errno;
+ goto out;
+ }
+ }
+
+ if (netdev_is_reconf_required(netdev)) {
+ error = netdev_reconfigure(netdev);
+ if (error) {
+ goto out;
+ }
+ }
+
+ if (netdev_is_pmd(netdev)) {
+ if (netdev_n_txq(netdev) < n_cores + 1) {
+ dynamic_txqs = true;
}
}
+
port = xzalloc(sizeof *port);
port->port_no = port_no;
port->netdev = netdev;
- port->rxq = xmalloc(sizeof *port->rxq * netdev_n_rxq(netdev));
+ port->n_rxq = netdev_n_rxq(netdev);
+ port->rxqs = xcalloc(port->n_rxq, sizeof *port->rxqs);
+ port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
port->type = xstrdup(type);
- port->latest_requested_n_rxq = netdev_requested_n_rxq(netdev);
- for (i = 0; i < netdev_n_rxq(netdev); i++) {
- error = netdev_rxq_open(netdev, &port->rxq[i], i);
- if (error
- && !(error == EOPNOTSUPP && dpif_netdev_class_is_dummy(dp->class))) {
+ ovs_mutex_init(&port->txq_used_mutex);
+ port->dynamic_txqs = dynamic_txqs;
+
+ for (i = 0; i < port->n_rxq; i++) {
+ error = netdev_rxq_open(netdev, &port->rxqs[i].rxq, i);
+ if (error) {
VLOG_ERR("%s: cannot receive packets on this network device (%s)",
devname, ovs_strerror(errno));
- netdev_close(netdev);
- free(port->type);
- free(port->rxq);
- free(port);
- return error;
+ goto out_rxq_close;
}
+ port->rxqs[i].core_id = -1;
+ n_open_rxqs++;
}
error = netdev_turn_flags_on(netdev, NETDEV_PROMISC, &sf);
if (error) {
- for (i = 0; i < netdev_n_rxq(netdev); i++) {
- netdev_rxq_close(port->rxq[i]);
- }
- netdev_close(netdev);
- free(port->type);
- free(port->rxq);
- free(port);
- return error;
+ goto out_rxq_close;
}
port->sf = sf;
- ovs_refcount_init(&port->ref_cnt);
- cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
+ *portp = port;
- if (netdev_is_pmd(netdev)) {
- int numa_id = netdev_get_numa_id(netdev);
- struct dp_netdev_pmd_thread *pmd;
+ return 0;
- /* Cannot create pmd threads for invalid numa node. */
- ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
+out_rxq_close:
+ for (i = 0; i < n_open_rxqs; i++) {
+ netdev_rxq_close(port->rxqs[i].rxq);
+ }
+ ovs_mutex_destroy(&port->txq_used_mutex);
+ free(port->type);
+ free(port->txq_used);
+ free(port->rxqs);
+ free(port);
- for (i = 0; i < netdev_n_rxq(netdev); i++) {
- pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
- if (!pmd) {
- /* There is no pmd threads on this numa node. */
- dp_netdev_set_pmds_on_numa(dp, numa_id);
- /* Assigning of rx queues done. */
- break;
- }
+out:
+ netdev_close(netdev);
+ return error;
+}
- ovs_mutex_lock(&pmd->poll_mutex);
- dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]);
- ovs_mutex_unlock(&pmd->poll_mutex);
- dp_netdev_reload_pmd__(pmd);
- }
+static int
+do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
+ odp_port_t port_no)
+ OVS_REQUIRES(dp->port_mutex)
+{
+ struct dp_netdev_port *port;
+ int error;
+
+ /* Reject devices already in 'dp'. */
+ if (!get_port_by_name(dp, devname, &port)) {
+ return EEXIST;
+ }
+
+ error = port_create(devname, type, port_no, &port);
+ if (error) {
+ return error;
+ }
+
+ if (netdev_is_pmd(port->netdev)) {
+ int numa_id = netdev_get_numa_id(port->netdev);
+
+ ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
+ dp_netdev_set_pmds_on_numa(dp, numa_id);
}
+
+ dp_netdev_add_port_to_pmds(dp, port);
+
+ hmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
seq_change(dp->port_seq);
return 0;
static struct dp_netdev_port *
dp_netdev_lookup_port(const struct dp_netdev *dp, odp_port_t port_no)
+ OVS_REQUIRES(dp->port_mutex)
{
struct dp_netdev_port *port;
- CMAP_FOR_EACH_WITH_HASH (port, node, hash_port_no(port_no), &dp->ports) {
+ HMAP_FOR_EACH_WITH_HASH (port, node, hash_port_no(port_no), &dp->ports) {
if (port->port_no == port_no) {
return port;
}
static int
get_port_by_number(struct dp_netdev *dp,
odp_port_t port_no, struct dp_netdev_port **portp)
+ OVS_REQUIRES(dp->port_mutex)
{
if (!is_valid_port_number(port_no)) {
*portp = NULL;
}
static void
-port_ref(struct dp_netdev_port *port)
+port_destroy(struct dp_netdev_port *port)
{
- if (port) {
- ovs_refcount_ref(&port->ref_cnt);
+ if (!port) {
+ return;
}
-}
-
-static void
-port_unref(struct dp_netdev_port *port)
-{
- if (port && ovs_refcount_unref_relaxed(&port->ref_cnt) == 1) {
- int n_rxq = netdev_n_rxq(port->netdev);
- int i;
- netdev_close(port->netdev);
- netdev_restore_flags(port->sf);
+ netdev_close(port->netdev);
+ netdev_restore_flags(port->sf);
- for (i = 0; i < n_rxq; i++) {
- netdev_rxq_close(port->rxq[i]);
- }
- free(port->rxq);
- free(port->type);
- free(port);
+ for (unsigned i = 0; i < port->n_rxq; i++) {
+ netdev_rxq_close(port->rxqs[i].rxq);
}
+ ovs_mutex_destroy(&port->txq_used_mutex);
+ free(port->rxq_affinity_list);
+ free(port->txq_used);
+ free(port->rxqs);
+ free(port->type);
+ free(port);
}
static int
{
struct dp_netdev_port *port;
- CMAP_FOR_EACH (port, node, &dp->ports) {
+ HMAP_FOR_EACH (port, node, &dp->ports) {
if (!strcmp(netdev_get_name(port->netdev), devname)) {
*portp = port;
return 0;
* is on numa node 'numa_id'. */
static bool
has_pmd_port_for_numa(struct dp_netdev *dp, int numa_id)
+ OVS_REQUIRES(dp->port_mutex)
{
struct dp_netdev_port *port;
- CMAP_FOR_EACH (port, node, &dp->ports) {
+ HMAP_FOR_EACH (port, node, &dp->ports) {
if (netdev_is_pmd(port->netdev)
&& netdev_get_numa_id(port->netdev) == numa_id) {
return true;
do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
OVS_REQUIRES(dp->port_mutex)
{
- cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no));
+ hmap_remove(&dp->ports, &port->node);
seq_change(dp->port_seq);
+
+ dp_netdev_del_port_from_all_pmds(dp, port);
+
if (netdev_is_pmd(port->netdev)) {
int numa_id = netdev_get_numa_id(port->netdev);
/* PMD threads can not be on invalid numa node. */
ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
/* If there is no netdev on the numa node, deletes the pmd threads
- * for that numa. Else, deletes the queues from polling lists. */
+ * for that numa node. */
if (!has_pmd_port_for_numa(dp, numa_id)) {
dp_netdev_del_pmds_on_numa(dp, numa_id);
- } else {
- struct dp_netdev_pmd_thread *pmd;
- struct rxq_poll *poll, *next;
-
- CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
- if (pmd->numa_id == numa_id) {
- bool found = false;
-
- ovs_mutex_lock(&pmd->poll_mutex);
- LIST_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
- if (poll->port == port) {
- found = true;
- port_unref(poll->port);
- list_remove(&poll->node);
- pmd->poll_cnt--;
- free(poll);
- }
- }
- ovs_mutex_unlock(&pmd->poll_mutex);
- if (found) {
- dp_netdev_reload_pmd__(pmd);
- }
- }
- }
}
}
- port_unref(port);
+ port_destroy(port);
}
static void
struct dp_netdev_port *port;
int error;
+ ovs_mutex_lock(&dp->port_mutex);
error = get_port_by_number(dp, port_no, &port);
if (!error && dpif_port) {
answer_port_query(port, dpif_port);
}
+ ovs_mutex_unlock(&dp->port_mutex);
return error;
}
return ufid->u32[0];
}
+static inline struct dpcls *
+dp_netdev_pmd_lookup_dpcls(struct dp_netdev_pmd_thread *pmd,
+ odp_port_t in_port)
+{
+ struct dpcls *cls;
+ uint32_t hash = hash_port_no(in_port);
+ CMAP_FOR_EACH_WITH_HASH (cls, node, hash, &pmd->classifiers) {
+ if (cls->in_port == in_port) {
+ /* Port classifier exists already */
+ return cls;
+ }
+ }
+ return NULL;
+}
+
+static inline struct dpcls *
+dp_netdev_pmd_find_dpcls(struct dp_netdev_pmd_thread *pmd,
+ odp_port_t in_port)
+ OVS_REQUIRES(pmd->flow_mutex)
+{
+ struct dpcls *cls = dp_netdev_pmd_lookup_dpcls(pmd, in_port);
+ uint32_t hash = hash_port_no(in_port);
+
+ if (!cls) {
+ /* Create new classifier for in_port */
+ cls = xmalloc(sizeof(*cls));
+ dpcls_init(cls);
+ cls->in_port = in_port;
+ cmap_insert(&pmd->classifiers, &cls->node, hash);
+ VLOG_DBG("Creating dpcls %p for in_port %d", cls, in_port);
+ }
+ return cls;
+}
+
static void
dp_netdev_pmd_remove_flow(struct dp_netdev_pmd_thread *pmd,
struct dp_netdev_flow *flow)
OVS_REQUIRES(pmd->flow_mutex)
{
struct cmap_node *node = CONST_CAST(struct cmap_node *, &flow->node);
+ struct dpcls *cls;
+ odp_port_t in_port = flow->flow.in_port.odp_port;
- dpcls_remove(&pmd->cls, &flow->cr);
- flow->cr.mask = NULL; /* Accessing rule's mask after this is not safe. */
-
+ cls = dp_netdev_pmd_lookup_dpcls(pmd, in_port);
+ ovs_assert(cls != NULL);
+ dpcls_remove(cls, &flow->cr);
cmap_remove(&pmd->flow_table, node, dp_netdev_flow_hash(&flow->ufid));
flow->dead = true;
}
struct dp_netdev_port_state {
- struct cmap_position position;
+ struct hmap_position position;
char *name;
};
{
struct dp_netdev_port_state *state = state_;
struct dp_netdev *dp = get_dp_netdev(dpif);
- struct cmap_node *node;
+ struct hmap_node *node;
int retval;
- node = cmap_next_position(&dp->ports, &state->position);
+ ovs_mutex_lock(&dp->port_mutex);
+ node = hmap_at_position(&dp->ports, &state->position);
if (node) {
struct dp_netdev_port *port;
} else {
retval = EOF;
}
+ ovs_mutex_unlock(&dp->port_mutex);
return retval;
}
}
/* Used to compare 'netdev_flow_key' in the exact match cache to a miniflow.
- * The maps are compared bitwise, so both 'key->mf' 'mf' must have been
+ * The maps are compared bitwise, so both 'key->mf' and 'mf' must have been
* generated by miniflow_extract. */
static inline bool
netdev_flow_key_equal_mf(const struct netdev_flow_key *key,
}
static struct dp_netdev_flow *
-dp_netdev_pmd_lookup_flow(const struct dp_netdev_pmd_thread *pmd,
- const struct netdev_flow_key *key)
+dp_netdev_pmd_lookup_flow(struct dp_netdev_pmd_thread *pmd,
+ const struct netdev_flow_key *key,
+ int *lookup_num_p)
{
- struct dp_netdev_flow *netdev_flow;
+ struct dpcls *cls;
struct dpcls_rule *rule;
+ odp_port_t in_port = u32_to_odp(MINIFLOW_GET_U32(&key->mf, in_port));
+ struct dp_netdev_flow *netdev_flow = NULL;
- dpcls_lookup(&pmd->cls, key, &rule, 1);
- netdev_flow = dp_netdev_flow_cast(rule);
-
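+ /* Flows are kept in one classifier per input port, so locate the dpcls
+ * for the packet's in_port first; if none exists, no flow has been
+ * installed for that port yet. */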
+ cls = dp_netdev_pmd_lookup_dpcls(pmd, in_port);
+ if (OVS_LIKELY(cls)) {
+ dpcls_lookup(cls, key, &rule, 1, lookup_num_p);
+ netdev_flow = dp_netdev_flow_cast(rule);
+ }
return netdev_flow;
}
if (ufidp) {
CMAP_FOR_EACH_WITH_HASH (netdev_flow, node, dp_netdev_flow_hash(ufidp),
&pmd->flow_table) {
- if (ovs_u128_equals(&netdev_flow->ufid, ufidp)) {
+ if (ovs_u128_equals(netdev_flow->ufid, *ufidp)) {
return netdev_flow;
}
}
/* Key */
offset = key_buf->size;
flow->key = ofpbuf_tail(key_buf);
- odp_parms.odp_in_port = netdev_flow->flow.in_port.odp_port;
odp_flow_key_from_flow(&odp_parms, key_buf);
flow->key_len = key_buf->size - offset;
/* Mask */
offset = mask_buf->size;
flow->mask = ofpbuf_tail(mask_buf);
- odp_parms.odp_in_port = wc.masks.in_port.odp_port;
odp_parms.key_buf = key_buf;
odp_flow_key_from_mask(&odp_parms, mask_buf);
flow->mask_len = mask_buf->size - offset;
{
enum odp_key_fitness fitness;
- fitness = odp_flow_key_to_mask_udpif(mask_key, mask_key_len, key,
- key_len, wc, flow);
+ fitness = odp_flow_key_to_mask(mask_key, mask_key_len, wc, flow);
if (fitness) {
/* This should not happen: it indicates that
* odp_flow_key_from_mask() and odp_flow_key_to_mask()
{
odp_port_t in_port;
- if (odp_flow_key_to_flow_udpif(key, key_len, flow)) {
+ if (odp_flow_key_to_flow(key, key_len, flow)) {
/* This should not happen: it indicates that odp_flow_key_from_flow()
* and odp_flow_key_to_flow() disagree on the acceptable form of a
* flow. Log the problem as an error, with enough details to enable
return EINVAL;
}
- /* Userspace datapath doesn't support conntrack. */
- if (flow->ct_state || flow->ct_zone || flow->ct_mark
- || !ovs_u128_is_zero(&flow->ct_label)) {
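+ /* Connection tracking is now supported, but only for the ct_state bits
+ * in DP_NETDEV_CS_SUPPORTED_MASK; reject matches on any other bit. */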
+ if (flow->ct_state & DP_NETDEV_CS_UNSUPPORTED_MASK) {
return EINVAL;
}
struct dp_netdev *dp = get_dp_netdev(dpif);
struct dp_netdev_flow *netdev_flow;
struct dp_netdev_pmd_thread *pmd;
- unsigned pmd_id = get->pmd_id == PMD_ID_NULL
- ? NON_PMD_CORE_ID : get->pmd_id;
- int error = 0;
+ struct hmapx to_find = HMAPX_INITIALIZER(&to_find);
+ struct hmapx_node *node;
+ int error = EINVAL;
- pmd = dp_netdev_get_pmd(dp, pmd_id);
- if (!pmd) {
- return EINVAL;
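+ /* With PMD_ID_NULL the flow may be installed on any pmd thread, so take
+ * a reference on every thread and search them all; otherwise search only
+ * the requested thread. */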
+ if (get->pmd_id == PMD_ID_NULL) {
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ if (dp_netdev_pmd_try_ref(pmd) && !hmapx_add(&to_find, pmd)) {
+ dp_netdev_pmd_unref(pmd);
+ }
+ }
+ } else {
+ pmd = dp_netdev_get_pmd(dp, get->pmd_id);
+ if (!pmd) {
+ goto out;
+ }
+ hmapx_add(&to_find, pmd);
}
- netdev_flow = dp_netdev_pmd_find_flow(pmd, get->ufid, get->key,
- get->key_len);
- if (netdev_flow) {
- dp_netdev_flow_to_dpif_flow(netdev_flow, get->buffer, get->buffer,
- get->flow, false);
- } else {
- error = ENOENT;
+ if (!hmapx_count(&to_find)) {
+ goto out;
}
- dp_netdev_pmd_unref(pmd);
+ HMAPX_FOR_EACH (node, &to_find) {
+ pmd = (struct dp_netdev_pmd_thread *) node->data;
+ netdev_flow = dp_netdev_pmd_find_flow(pmd, get->ufid, get->key,
+ get->key_len);
+ if (netdev_flow) {
+ dp_netdev_flow_to_dpif_flow(netdev_flow, get->buffer, get->buffer,
+ get->flow, false);
+ error = 0;
+ break;
+ } else {
+ error = ENOENT;
+ }
+ }
+ HMAPX_FOR_EACH (node, &to_find) {
+ pmd = (struct dp_netdev_pmd_thread *) node->data;
+ dp_netdev_pmd_unref(pmd);
+ }
+out:
+ hmapx_destroy(&to_find);
return error;
}
{
struct dp_netdev_flow *flow;
struct netdev_flow_key mask;
+ struct dpcls *cls;
+ odp_port_t in_port = match->flow.in_port.odp_port;
netdev_flow_mask_init(&mask, match);
/* Make sure wc does not have metadata. */
ovsrcu_set(&flow->actions, dp_netdev_actions_create(actions, actions_len));
netdev_flow_key_init_masked(&flow->cr.flow, &match->flow, &mask);
- dpcls_insert(&pmd->cls, &flow->cr, &mask);
+
+ /* Select dpcls for in_port. Relies on in_port being an exact-match field. */
+ ovs_assert(match->wc.masks.in_port.odp_port == ODP_PORT_C(UINT32_MAX));
+ cls = dp_netdev_pmd_find_dpcls(pmd, in_port);
+ dpcls_insert(cls, &flow->cr, &mask);
cmap_insert(&pmd->flow_table, CONST_CAST(struct cmap_node *, &flow->node),
dp_netdev_flow_hash(&flow->ufid));
if (OVS_UNLIKELY(VLOG_IS_DBG_ENABLED())) {
- struct match match;
struct ds ds = DS_EMPTY_INITIALIZER;
+ struct ofpbuf key_buf, mask_buf;
+ struct odp_flow_key_parms odp_parms = {
+ .flow = &match->flow,
+ .mask = &match->wc.masks,
+ .support = dp_netdev_support,
+ };
- match.tun_md.valid = false;
- match.flow = flow->flow;
- miniflow_expand(&flow->cr.mask->mf, &match.wc.masks);
+ ofpbuf_init(&key_buf, 0);
+ ofpbuf_init(&mask_buf, 0);
+
+ odp_flow_key_from_flow(&odp_parms, &key_buf);
+ odp_parms.key_buf = &key_buf;
+ odp_flow_key_from_mask(&odp_parms, &mask_buf);
ds_put_cstr(&ds, "flow_add: ");
odp_format_ufid(ufid, &ds);
ds_put_cstr(&ds, " ");
- match_format(&match, &ds, OFP_DEFAULT_PRIORITY);
+ odp_flow_format(key_buf.data, key_buf.size,
+ mask_buf.data, mask_buf.size,
+ NULL, &ds, false);
ds_put_cstr(&ds, ", actions:");
format_odp_actions(&ds, actions, actions_len);
VLOG_DBG_RL(&upcall_rl, "%s", ds_cstr(&ds));
+ ofpbuf_uninit(&key_buf);
+ ofpbuf_uninit(&mask_buf);
ds_destroy(&ds);
}
}
ovs_mutex_lock(&pmd->flow_mutex);
- netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &key);
+ netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &key, NULL);
if (!netdev_flow) {
if (put->flags & DPIF_FP_CREATE) {
if (cmap_count(&pmd->flow_table) < MAX_FLOWS) {
{
struct dp_netdev *dp = get_dp_netdev(dpif);
struct dp_netdev_pmd_thread *pmd;
- struct dp_packet *pp;
+ struct dp_packet_batch pp;
if (dp_packet_size(execute->packet) < ETH_HEADER_LEN ||
dp_packet_size(execute->packet) > UINT16_MAX) {
pmd = ovsthread_getspecific(dp->per_pmd_key);
if (!pmd) {
pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
+ if (!pmd) {
+ return EBUSY;
+ }
}
/* If the current thread is non-pmd thread, acquires
* the 'non_pmd_mutex'. */
if (pmd->core_id == NON_PMD_CORE_ID) {
ovs_mutex_lock(&dp->non_pmd_mutex);
- ovs_mutex_lock(&dp->port_mutex);
}
- pp = execute->packet;
- dp_netdev_execute_actions(pmd, &pp, 1, false, execute->actions,
- execute->actions_len);
+ /* The action processing expects the RSS hash to be valid, because
+ * it's always initialized at the beginning of datapath processing.
+ * In this case, though, 'execute->packet' may not have gone through
+ * the datapath at all, it may have been generated by the upper layer
+ * (OpenFlow packet-out, BFD frame, ...). */
+ if (!dp_packet_rss_valid(execute->packet)) {
+ dp_packet_set_rss_hash(execute->packet,
+ flow_hash_5tuple(execute->flow, 0));
+ }
+
+ packet_batch_init_packet(&pp, execute->packet);
+ dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
+ execute->actions, execute->actions_len,
+ time_msec());
+
if (pmd->core_id == NON_PMD_CORE_ID) {
- dp_netdev_pmd_unref(pmd);
- ovs_mutex_unlock(&dp->port_mutex);
ovs_mutex_unlock(&dp->non_pmd_mutex);
+ dp_netdev_pmd_unref(pmd);
}
return 0;
}
}
-/* Returns true if the configuration for rx queues or cpu mask
- * is changed. */
-static bool
-pmd_config_changed(const struct dp_netdev *dp, const char *cmask)
+/* Changes the number or the affinity of pmd threads. The changes are actually
+ * applied in dpif_netdev_run(). */
+static int
+dpif_netdev_pmd_set(struct dpif *dpif, const char *cmask)
{
- struct dp_netdev_port *port;
+ struct dp_netdev *dp = get_dp_netdev(dpif);
- CMAP_FOR_EACH (port, node, &dp->ports) {
- struct netdev *netdev = port->netdev;
- int requested_n_rxq = netdev_requested_n_rxq(netdev);
- if (netdev_is_pmd(netdev)
- && port->latest_requested_n_rxq != requested_n_rxq) {
- return true;
- }
+ if (!nullable_string_is_equal(dp->pmd_cmask, cmask)) {
+ free(dp->pmd_cmask);
+ dp->pmd_cmask = nullable_xstrdup(cmask);
+ dp_netdev_request_reconfigure(dp);
}
- if (dp->pmd_cmask != NULL && cmask != NULL) {
- return strcmp(dp->pmd_cmask, cmask);
- } else {
- return (dp->pmd_cmask != NULL || cmask != NULL);
- }
+ return 0;
}
-/* Resets pmd threads if the configuration for 'rxq's or cpu mask changes. */
+/* Parses affinity list and returns result in 'core_ids'. */
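+/* The list has the form "rxq:core[,rxq:core,...]"; e.g. "0:3,1:7" pins rx
+ * queue 0 to core 3 and rx queue 1 to core 7. Queues not mentioned keep
+ * core_id -1, i.e. they stay unpinned. */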
static int
-dpif_netdev_pmd_set(struct dpif *dpif, const char *cmask)
+parse_affinity_list(const char *affinity_list, unsigned *core_ids, int n_rxq)
{
- struct dp_netdev *dp = get_dp_netdev(dpif);
+ unsigned i;
+ char *list, *copy, *key, *value;
+ int error = 0;
- if (pmd_config_changed(dp, cmask)) {
- struct dp_netdev_port *port;
+ for (i = 0; i < n_rxq; i++) {
+ core_ids[i] = -1;
+ }
- dp_netdev_destroy_all_pmds(dp);
+ if (!affinity_list) {
+ return 0;
+ }
- CMAP_FOR_EACH (port, node, &dp->ports) {
- struct netdev *netdev = port->netdev;
- int requested_n_rxq = netdev_requested_n_rxq(netdev);
- if (netdev_is_pmd(port->netdev)
- && port->latest_requested_n_rxq != requested_n_rxq) {
- int i, err;
+ list = copy = xstrdup(affinity_list);
- /* Closes the existing 'rxq's. */
- for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
- netdev_rxq_close(port->rxq[i]);
- port->rxq[i] = NULL;
- }
+ while (ofputil_parse_key_value(&list, &key, &value)) {
+ int rxq_id, core_id;
- /* Sets the new rx queue config. */
- err = netdev_set_multiq(port->netdev,
- ovs_numa_get_n_cores() + 1,
- requested_n_rxq);
- if (err && (err != EOPNOTSUPP)) {
- VLOG_ERR("Failed to set dpdk interface %s rx_queue to:"
- " %u", netdev_get_name(port->netdev),
- requested_n_rxq);
- return err;
- }
- port->latest_requested_n_rxq = requested_n_rxq;
- /* If the set_multiq() above succeeds, reopens the 'rxq's. */
- port->rxq = xrealloc(port->rxq, sizeof *port->rxq
- * netdev_n_rxq(port->netdev));
- for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
- netdev_rxq_open(port->netdev, &port->rxq[i], i);
- }
- }
+ if (!str_to_int(key, 0, &rxq_id) || rxq_id < 0
+ || !str_to_int(value, 0, &core_id) || core_id < 0) {
+ error = EINVAL;
+ break;
}
- /* Reconfigures the cpu mask. */
- ovs_numa_set_cpu_mask(cmask);
- free(dp->pmd_cmask);
- dp->pmd_cmask = cmask ? xstrdup(cmask) : NULL;
- /* Restores the non-pmd. */
- dp_netdev_set_nonpmd(dp);
- /* Restores all pmd threads. */
- dp_netdev_reset_pmd_threads(dp);
+ if (rxq_id < n_rxq) {
+ core_ids[rxq_id] = core_id;
+ }
}
- return 0;
+ free(copy);
+ return error;
+}
+
+/* Parses 'affinity_list' and applies configuration if it is valid. */
+static int
+dpif_netdev_port_set_rxq_affinity(struct dp_netdev_port *port,
+ const char *affinity_list)
+{
+ unsigned *core_ids, i;
+ int error = 0;
+
+ core_ids = xmalloc(port->n_rxq * sizeof *core_ids);
+ if (parse_affinity_list(affinity_list, core_ids, port->n_rxq)) {
+ error = EINVAL;
+ goto exit;
+ }
+
+ for (i = 0; i < port->n_rxq; i++) {
+ port->rxqs[i].core_id = core_ids[i];
+ }
+
+exit:
+ free(core_ids);
+ return error;
+}
+
+/* Changes the affinity of port's rx queues. The changes are actually applied
+ * in dpif_netdev_run(). */
+static int
+dpif_netdev_port_set_config(struct dpif *dpif, odp_port_t port_no,
+ const struct smap *cfg)
+{
+ struct dp_netdev *dp = get_dp_netdev(dpif);
+ struct dp_netdev_port *port;
+ int error = 0;
+ const char *affinity_list = smap_get(cfg, "pmd-rxq-affinity");
+
+ ovs_mutex_lock(&dp->port_mutex);
+ error = get_port_by_number(dp, port_no, &port);
+ if (error || !netdev_is_pmd(port->netdev)
+ || nullable_string_is_equal(affinity_list, port->rxq_affinity_list)) {
+ goto unlock;
+ }
+
+ error = dpif_netdev_port_set_rxq_affinity(port, affinity_list);
+ if (error) {
+ goto unlock;
+ }
+ free(port->rxq_affinity_list);
+ port->rxq_affinity_list = nullable_xstrdup(affinity_list);
+
+ dp_netdev_request_reconfigure(dp);
+unlock:
+ ovs_mutex_unlock(&dp->port_mutex);
+ return error;
}
static int
struct dp_netdev_port *port,
struct netdev_rxq *rxq)
{
- struct dp_packet *packets[NETDEV_MAX_BURST];
- int error, cnt;
+ struct dp_packet_batch batch;
+ int error;
+ dp_packet_batch_init(&batch);
cycles_count_start(pmd);
- error = netdev_rxq_recv(rxq, packets, &cnt);
+ error = netdev_rxq_recv(rxq, &batch);
cycles_count_end(pmd, PMD_CYCLES_POLLING);
if (!error) {
*recirc_depth_get() = 0;
cycles_count_start(pmd);
- dp_netdev_input(pmd, packets, cnt, port->port_no);
+ dp_netdev_input(pmd, &batch, port->port_no);
cycles_count_end(pmd, PMD_CYCLES_PROCESSING);
} else if (error != EAGAIN && error != EOPNOTSUPP) {
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
}
}
-/* Return true if needs to revalidate datapath flows. */
-static bool
-dpif_netdev_run(struct dpif *dpif)
+static int
+port_reconfigure(struct dp_netdev_port *port)
{
- struct dp_netdev_port *port;
- struct dp_netdev *dp = get_dp_netdev(dpif);
- struct dp_netdev_pmd_thread *non_pmd = dp_netdev_get_pmd(dp,
- NON_PMD_CORE_ID);
- uint64_t new_tnl_seq;
+ struct netdev *netdev = port->netdev;
+ int i, err;
- ovs_mutex_lock(&dp->non_pmd_mutex);
- CMAP_FOR_EACH (port, node, &dp->ports) {
- if (!netdev_is_pmd(port->netdev)) {
- int i;
+ if (!netdev_is_reconf_required(netdev)) {
+ return 0;
+ }
+
+ /* Closes the existing 'rxq's. */
+ for (i = 0; i < port->n_rxq; i++) {
+ netdev_rxq_close(port->rxqs[i].rxq);
+ port->rxqs[i].rxq = NULL;
+ }
+ port->n_rxq = 0;
+
+ /* Allows 'netdev' to apply the pending configuration changes. */
+ err = netdev_reconfigure(netdev);
+ if (err && (err != EOPNOTSUPP)) {
+ VLOG_ERR("Failed to set interface %s new configuration",
+ netdev_get_name(netdev));
+ return err;
+ }
+ /* If the netdev_reconfigure() above succeeds, reopens the 'rxq's. */
+ port->rxqs = xrealloc(port->rxqs,
+ sizeof *port->rxqs * netdev_n_rxq(netdev));
+ /* Realloc 'used' counters for tx queues. */
+ free(port->txq_used);
+ port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
+
+ for (i = 0; i < netdev_n_rxq(netdev); i++) {
+ err = netdev_rxq_open(netdev, &port->rxqs[i].rxq, i);
+ if (err) {
+ return err;
+ }
+ port->n_rxq++;
+ }
+
+ /* Parse affinity list to apply configuration for new queues. */
+ dpif_netdev_port_set_rxq_affinity(port, port->rxq_affinity_list);
+
+ return 0;
+}
+
+static void
+reconfigure_pmd_threads(struct dp_netdev *dp)
+ OVS_REQUIRES(dp->port_mutex)
+{
+ struct dp_netdev_port *port, *next;
+ int n_cores;
+
+ dp->last_reconfigure_seq = seq_read(dp->reconfigure_seq);
+
+ dp_netdev_destroy_all_pmds(dp);
+
+ /* Reconfigures the cpu mask. */
+ ovs_numa_set_cpu_mask(dp->pmd_cmask);
+
+ n_cores = ovs_numa_get_n_cores();
+ if (n_cores == OVS_CORE_UNSPEC) {
+ VLOG_ERR("Cannot get cpu core info");
+ return;
+ }
+
+ HMAP_FOR_EACH_SAFE (port, next, node, &dp->ports) {
+ int err;
+
+ err = port_reconfigure(port);
+ if (err) {
+ hmap_remove(&dp->ports, &port->node);
+ seq_change(dp->port_seq);
+ port_destroy(port);
+ } else {
+ port->dynamic_txqs = netdev_n_txq(port->netdev) < n_cores + 1;
+ }
+ }
+ /* Restores the non-pmd thread. */
+ dp_netdev_set_nonpmd(dp);
+ /* Restores all pmd threads. */
+ dp_netdev_reset_pmd_threads(dp);
+}
+
+/* Returns true if one of the netdevs in 'dp' requires a reconfiguration */
+static bool
+ports_require_restart(const struct dp_netdev *dp)
+ OVS_REQUIRES(dp->port_mutex)
+{
+ struct dp_netdev_port *port;
+
+ HMAP_FOR_EACH (port, node, &dp->ports) {
+ if (netdev_is_reconf_required(port->netdev)) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+/* Return true if needs to revalidate datapath flows. */
+static bool
+dpif_netdev_run(struct dpif *dpif)
+{
+ struct dp_netdev_port *port;
+ struct dp_netdev *dp = get_dp_netdev(dpif);
+ struct dp_netdev_pmd_thread *non_pmd;
+ uint64_t new_tnl_seq;
+
+ ovs_mutex_lock(&dp->port_mutex);
+ non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
+ if (non_pmd) {
+ ovs_mutex_lock(&dp->non_pmd_mutex);
+ HMAP_FOR_EACH (port, node, &dp->ports) {
+ if (!netdev_is_pmd(port->netdev)) {
+ int i;
- for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
- dp_netdev_process_rxq_port(non_pmd, port, port->rxq[i]);
+ for (i = 0; i < port->n_rxq; i++) {
+ dp_netdev_process_rxq_port(non_pmd, port,
+ port->rxqs[i].rxq);
+ }
}
}
+ dpif_netdev_xps_revalidate_pmd(non_pmd, time_msec(), false);
+ ovs_mutex_unlock(&dp->non_pmd_mutex);
+
+ dp_netdev_pmd_unref(non_pmd);
}
- ovs_mutex_unlock(&dp->non_pmd_mutex);
- dp_netdev_pmd_unref(non_pmd);
+
+ if (dp_netdev_is_reconf_required(dp) || ports_require_restart(dp)) {
+ reconfigure_pmd_threads(dp);
+ }
+ ovs_mutex_unlock(&dp->port_mutex);
tnl_neigh_cache_run();
tnl_port_map_run();
struct dp_netdev *dp = get_dp_netdev(dpif);
ovs_mutex_lock(&dp_netdev_mutex);
- CMAP_FOR_EACH (port, node, &dp->ports) {
+ ovs_mutex_lock(&dp->port_mutex);
+ HMAP_FOR_EACH (port, node, &dp->ports) {
+ netdev_wait_reconf_required(port->netdev);
if (!netdev_is_pmd(port->netdev)) {
int i;
- for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
- netdev_rxq_wait(port->rxq[i]);
+ for (i = 0; i < port->n_rxq; i++) {
+ netdev_rxq_wait(port->rxqs[i].rxq);
}
}
}
+ ovs_mutex_unlock(&dp->port_mutex);
ovs_mutex_unlock(&dp_netdev_mutex);
seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
}
+static void
+pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)
+{
+ struct tx_port *tx_port_cached;
+
+ /* Free all used tx queue ids. */
+ dpif_netdev_xps_revalidate_pmd(pmd, 0, true);
+
+ HMAP_FOR_EACH_POP (tx_port_cached, node, &pmd->port_cache) {
+ free(tx_port_cached);
+ }
+}
+
+/* Copies ports from 'pmd->tx_ports' (shared with the main thread) to
+ * 'pmd->port_cache' (thread local) */
+static void
+pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
+ OVS_REQUIRES(pmd->port_mutex)
+{
+ struct tx_port *tx_port, *tx_port_cached;
+
+ pmd_free_cached_ports(pmd);
+ hmap_shrink(&pmd->port_cache);
+
+ HMAP_FOR_EACH (tx_port, node, &pmd->tx_ports) {
+ tx_port_cached = xmemdup(tx_port, sizeof *tx_port_cached);
+ hmap_insert(&pmd->port_cache, &tx_port_cached->node,
+ hash_port_no(tx_port_cached->port->port_no));
+ }
+}
+
static int
-pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
- struct rxq_poll **ppoll_list, int poll_cnt)
- OVS_REQUIRES(pmd->poll_mutex)
+pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
+ struct rxq_poll **ppoll_list)
{
struct rxq_poll *poll_list = *ppoll_list;
struct rxq_poll *poll;
int i;
- for (i = 0; i < poll_cnt; i++) {
- port_unref(poll_list[i].port);
- }
-
+ ovs_mutex_lock(&pmd->port_mutex);
poll_list = xrealloc(poll_list, pmd->poll_cnt * sizeof *poll_list);
i = 0;
LIST_FOR_EACH (poll, node, &pmd->poll_list) {
- port_ref(poll->port);
poll_list[i++] = *poll;
}
+ pmd_load_cached_ports(pmd);
+
+ ovs_mutex_unlock(&pmd->port_mutex);
+
*ppoll_list = poll_list;
- return pmd->poll_cnt;
+ return i;
}
static void *
unsigned int lc = 0;
struct rxq_poll *poll_list;
unsigned int port_seq = PMD_INITIAL_SEQ;
+ bool exiting;
int poll_cnt;
int i;
- poll_cnt = 0;
poll_list = NULL;
/* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */
ovsthread_setspecific(pmd->dp->per_pmd_key, pmd);
- pmd_thread_setaffinity_cpu(pmd->core_id);
+ ovs_numa_thread_setaffinity_core(pmd->core_id);
+ dpdk_set_lcore_id(pmd->core_id);
+ poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
reload:
emc_cache_init(&pmd->flow_cache);
- ovs_mutex_lock(&pmd->poll_mutex);
- poll_cnt = pmd_load_queues(pmd, &poll_list, poll_cnt);
- ovs_mutex_unlock(&pmd->poll_mutex);
-
/* List port/core affinity */
for (i = 0; i < poll_cnt; i++) {
VLOG_DBG("Core %d processing port \'%s\' with queue-id %d\n",
netdev_rxq_get_queue_id(poll_list[i].rx));
}
- /* Signal here to make sure the pmd finishes
- * reloading the updated configuration. */
- dp_netdev_pmd_reload_done(pmd);
-
for (;;) {
for (i = 0; i < poll_cnt; i++) {
dp_netdev_process_rxq_port(pmd, poll_list[i].port, poll_list[i].rx);
lc = 0;
- emc_cache_slow_sweep(&pmd->flow_cache);
coverage_try_clear();
- ovsrcu_quiesce();
+ dp_netdev_pmd_try_optimize(pmd);
+ if (!ovsrcu_try_quiesce()) {
+ emc_cache_slow_sweep(&pmd->flow_cache);
+ }
atomic_read_relaxed(&pmd->change_seq, &seq);
if (seq != port_seq) {
}
}
+ poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
+ exiting = latch_is_set(&pmd->exit_latch);
+ /* Signal here to make sure the pmd finishes
+ * reloading the updated configuration. */
+ dp_netdev_pmd_reload_done(pmd);
+
emc_cache_uninit(&pmd->flow_cache);
- if (!latch_is_set(&pmd->exit_latch)){
+ if (!exiting) {
goto reload;
}
- for (i = 0; i < poll_cnt; i++) {
- port_unref(poll_list[i].port);
- }
-
- dp_netdev_pmd_reload_done(pmd);
-
free(poll_list);
+ pmd_free_cached_ports(pmd);
return NULL;
}
}
/* Finds and refs the dp_netdev_pmd_thread on core 'core_id'. Returns
- * the pointer if succeeds, otherwise, NULL.
+ * the pointer if it succeeds, otherwise NULL (it can return NULL even if
+ * 'core_id' is NON_PMD_CORE_ID).
*
 * Caller must unref the returned reference. */
static struct dp_netdev_pmd_thread *
/* Sets the 'struct dp_netdev_pmd_thread' for non-pmd threads. */
static void
dp_netdev_set_nonpmd(struct dp_netdev *dp)
+ OVS_REQUIRES(dp->port_mutex)
{
struct dp_netdev_pmd_thread *non_pmd;
+ struct dp_netdev_port *port;
non_pmd = xzalloc(sizeof *non_pmd);
- dp_netdev_configure_pmd(non_pmd, dp, 0, NON_PMD_CORE_ID,
- OVS_NUMA_UNSPEC);
+ dp_netdev_configure_pmd(non_pmd, dp, NON_PMD_CORE_ID, OVS_NUMA_UNSPEC);
+
+ HMAP_FOR_EACH (port, node, &dp->ports) {
+ dp_netdev_add_port_tx_to_pmd(non_pmd, port);
+ }
+
+ dp_netdev_reload_pmd__(non_pmd);
}
/* Caller must have valid pointer to 'pmd'. */
/* Configures the 'pmd' based on the input argument. */
static void
dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
- int index, unsigned core_id, int numa_id)
+ unsigned core_id, int numa_id)
{
pmd->dp = dp;
- pmd->index = index;
pmd->core_id = core_id;
pmd->numa_id = numa_id;
pmd->poll_cnt = 0;
- atomic_init(&pmd->tx_qid,
+ atomic_init(&pmd->static_tx_qid,
(core_id == NON_PMD_CORE_ID)
? ovs_numa_get_n_cores()
: get_n_pmd_threads(dp));
xpthread_cond_init(&pmd->cond, NULL);
ovs_mutex_init(&pmd->cond_mutex);
ovs_mutex_init(&pmd->flow_mutex);
- ovs_mutex_init(&pmd->poll_mutex);
- dpcls_init(&pmd->cls);
+ ovs_mutex_init(&pmd->port_mutex);
cmap_init(&pmd->flow_table);
- list_init(&pmd->poll_list);
+ cmap_init(&pmd->classifiers);
+ pmd->next_optimization = time_msec() + DPCLS_OPTIMIZATION_INTERVAL;
+ ovs_list_init(&pmd->poll_list);
+ hmap_init(&pmd->tx_ports);
+ hmap_init(&pmd->port_cache);
/* init the 'flow_cache' since there is no
* actual thread created for NON_PMD_CORE_ID. */
if (core_id == NON_PMD_CORE_ID) {
static void
dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
{
+ struct dpcls *cls;
+
dp_netdev_pmd_flow_flush(pmd);
- dpcls_destroy(&pmd->cls);
+ hmap_destroy(&pmd->port_cache);
+ hmap_destroy(&pmd->tx_ports);
+ /* All flows (including their dpcls_rules) have been deleted already */
+ CMAP_FOR_EACH (cls, node, &pmd->classifiers) {
+ dpcls_destroy(cls);
+ }
+ cmap_destroy(&pmd->classifiers);
cmap_destroy(&pmd->flow_table);
ovs_mutex_destroy(&pmd->flow_mutex);
latch_destroy(&pmd->exit_latch);
xpthread_cond_destroy(&pmd->cond);
ovs_mutex_destroy(&pmd->cond_mutex);
- ovs_mutex_destroy(&pmd->poll_mutex);
+ ovs_mutex_destroy(&pmd->port_mutex);
free(pmd);
}
static void
dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
{
- struct rxq_poll *poll;
-
- /* Uninit the 'flow_cache' since there is
- * no actual thread uninit it for NON_PMD_CORE_ID. */
+ /* NON_PMD_CORE_ID doesn't have a thread, so we don't have to synchronize,
+ * but extra cleanup is necessary */
if (pmd->core_id == NON_PMD_CORE_ID) {
emc_cache_uninit(&pmd->flow_cache);
+ pmd_free_cached_ports(pmd);
} else {
latch_set(&pmd->exit_latch);
dp_netdev_reload_pmd__(pmd);
xpthread_join(pmd->thread, NULL);
}
- /* Unref all ports and free poll_list. */
- LIST_FOR_EACH_POP (poll, node, &pmd->poll_list) {
- port_unref(poll->port);
- free(poll);
- }
+ dp_netdev_pmd_clear_ports(pmd);
/* Purges the 'pmd''s flows after stopping the thread, but before
* destroying the flows, so that the flow stats can be collected. */
}
/* Deletes all pmd threads on numa node 'numa_id' and
- * fixes tx_qids of other threads to keep them sequential. */
+ * fixes static_tx_qids of other threads to keep them sequential. */
static void
dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
{
* 'dp->poll_threads' (while we're iterating it) and it
* might quiesce. */
if (pmd->numa_id == numa_id) {
- atomic_read_relaxed(&pmd->tx_qid, &free_idx[k]);
+ atomic_read_relaxed(&pmd->static_tx_qid, &free_idx[k]);
pmd_list[k] = pmd;
ovs_assert(k < n_pmds_on_numa);
k++;
CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
int old_tx_qid;
- atomic_read_relaxed(&pmd->tx_qid, &old_tx_qid);
+ atomic_read_relaxed(&pmd->static_tx_qid, &old_tx_qid);
if (old_tx_qid >= n_pmds) {
int new_tx_qid = free_idx[--k];
- atomic_store_relaxed(&pmd->tx_qid, new_tx_qid);
+ atomic_store_relaxed(&pmd->static_tx_qid, new_tx_qid);
}
}
free(free_idx);
}
-/* Returns PMD thread from this numa node with fewer rx queues to poll.
- * Returns NULL if there is no PMD threads on this numa node.
- * Can be called safely only by main thread. */
+/* Deletes all rx queues from pmd->poll_list and all the ports from
+ * pmd->tx_ports. */
+static void
+dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd)
+{
+ struct rxq_poll *poll;
+ struct tx_port *port;
+
+ ovs_mutex_lock(&pmd->port_mutex);
+ LIST_FOR_EACH_POP (poll, node, &pmd->poll_list) {
+ free(poll);
+ }
+ pmd->poll_cnt = 0;
+ HMAP_FOR_EACH_POP (port, node, &pmd->tx_ports) {
+ free(port);
+ }
+ ovs_mutex_unlock(&pmd->port_mutex);
+}
+
+static struct tx_port *
+tx_port_lookup(const struct hmap *hmap, odp_port_t port_no)
+{
+ struct tx_port *tx;
+
+ HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) {
+ if (tx->port->port_no == port_no) {
+ return tx;
+ }
+ }
+
+ return NULL;
+}
+
+/* Deletes all rx queues of 'port' from 'poll_list', and the 'port' from
+ * 'tx_ports' of 'pmd' thread. Returns true if 'port' was found in 'pmd'
+ * (therefore a restart is required). */
+static bool
+dp_netdev_del_port_from_pmd__(struct dp_netdev_port *port,
+ struct dp_netdev_pmd_thread *pmd)
+{
+ struct rxq_poll *poll, *next;
+ struct tx_port *tx;
+ bool found = false;
+
+ ovs_mutex_lock(&pmd->port_mutex);
+ LIST_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
+ if (poll->port == port) {
+ found = true;
+ ovs_list_remove(&poll->node);
+ pmd->poll_cnt--;
+ free(poll);
+ }
+ }
+
+ tx = tx_port_lookup(&pmd->tx_ports, port->port_no);
+ if (tx) {
+ hmap_remove(&pmd->tx_ports, &tx->node);
+ free(tx);
+ found = true;
+ }
+ ovs_mutex_unlock(&pmd->port_mutex);
+
+ return found;
+}
+
+/* Deletes 'port' from the 'poll_list' and from the 'tx_ports' of all the pmd
+ * threads. The pmd threads that need to be restarted are inserted in
+ * 'to_reload'. */
+static void
+dp_netdev_del_port_from_all_pmds__(struct dp_netdev *dp,
+ struct dp_netdev_port *port,
+ struct hmapx *to_reload)
+{
+ struct dp_netdev_pmd_thread *pmd;
+
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ bool found;
+
+ found = dp_netdev_del_port_from_pmd__(port, pmd);
+
+ if (found) {
+ hmapx_add(to_reload, pmd);
+ }
+ }
+}
+
+/* Deletes 'port' from the 'poll_list' and from the 'tx_ports' of all the pmd
+ * threads. Reloads the threads if needed. */
+static void
+dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
+ struct dp_netdev_port *port)
+{
+ struct dp_netdev_pmd_thread *pmd;
+ struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
+ struct hmapx_node *node;
+
+ dp_netdev_del_port_from_all_pmds__(dp, port, &to_reload);
+
+ HMAPX_FOR_EACH (node, &to_reload) {
+ pmd = (struct dp_netdev_pmd_thread *) node->data;
+ dp_netdev_reload_pmd__(pmd);
+ }
+
+ hmapx_destroy(&to_reload);
+}
+
+
+/* Returns the non-isolated PMD thread on this numa node with the fewest
+ * rx queues to poll. Returns NULL if there are no non-isolated PMD threads
+ * on this numa node. Can be called safely only by the main thread. */
static struct dp_netdev_pmd_thread *
dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
{
struct dp_netdev_pmd_thread *pmd, *res = NULL;
CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
- if (pmd->numa_id == numa_id
+ if (!pmd->isolated && pmd->numa_id == numa_id
&& (min_cnt > pmd->poll_cnt || res == NULL)) {
min_cnt = pmd->poll_cnt;
res = pmd;
static void
dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
struct dp_netdev_port *port, struct netdev_rxq *rx)
- OVS_REQUIRES(pmd->poll_mutex)
+ OVS_REQUIRES(pmd->port_mutex)
{
struct rxq_poll *poll = xmalloc(sizeof *poll);
- port_ref(port);
poll->port = port;
poll->rx = rx;
- list_push_back(&pmd->poll_list, &poll->node);
+ ovs_list_push_back(&pmd->poll_list, &poll->node);
pmd->poll_cnt++;
}
-/* Checks the numa node id of 'netdev' and starts pmd threads for
- * the numa node. */
+/* Adds 'port' to the tx port cache of 'pmd', which must be reloaded for the
+ * changes to take effect. */
+static void
+dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
+ struct dp_netdev_port *port)
+{
+ struct tx_port *tx = xzalloc(sizeof *tx);
+
+ tx->port = port;
+ tx->qid = -1;
+
+ ovs_mutex_lock(&pmd->port_mutex);
+ hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
+ ovs_mutex_unlock(&pmd->port_mutex);
+}
+
+/* Distributes the {pinned|non-pinned} rx queues of 'port' between the PMD
+ * threads in 'dp'. The pmd threads that need to be restarted are inserted
+ * in 'to_reload'. PMD threads with pinned queues are marked as isolated. */
+static void
+dp_netdev_add_port_rx_to_pmds(struct dp_netdev *dp,
+ struct dp_netdev_port *port,
+ struct hmapx *to_reload, bool pinned)
+{
+ int numa_id = netdev_get_numa_id(port->netdev);
+ struct dp_netdev_pmd_thread *pmd;
+ int i;
+
+ if (!netdev_is_pmd(port->netdev)) {
+ return;
+ }
+
+ for (i = 0; i < port->n_rxq; i++) {
+ if (pinned) {
+ if (port->rxqs[i].core_id == -1) {
+ continue;
+ }
+ pmd = dp_netdev_get_pmd(dp, port->rxqs[i].core_id);
+ if (!pmd) {
+ VLOG_WARN("There is no PMD thread on core %d. "
+ "Queue %d on port \'%s\' will not be polled.",
+ port->rxqs[i].core_id, i,
+ netdev_get_name(port->netdev));
+ continue;
+ }
+ pmd->isolated = true;
+ dp_netdev_pmd_unref(pmd);
+ } else {
+ if (port->rxqs[i].core_id != -1) {
+ continue;
+ }
+ pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
+ if (!pmd) {
+ VLOG_WARN("There's no available pmd thread on numa node %d",
+ numa_id);
+ break;
+ }
+ }
+
+ ovs_mutex_lock(&pmd->port_mutex);
+ dp_netdev_add_rxq_to_pmd(pmd, port, port->rxqs[i].rxq);
+ ovs_mutex_unlock(&pmd->port_mutex);
+
+ hmapx_add(to_reload, pmd);
+ }
+}
+
+/* Distributes all non-pinned rx queues of 'port' between all PMD threads
+ * in 'dp' and inserts 'port' in the PMD threads' 'tx_ports'. The pmd
+ * threads that need to be restarted are inserted in 'to_reload'. */
+static void
+dp_netdev_add_port_to_pmds__(struct dp_netdev *dp, struct dp_netdev_port *port,
+ struct hmapx *to_reload)
+{
+ struct dp_netdev_pmd_thread *pmd;
+
+ dp_netdev_add_port_rx_to_pmds(dp, port, to_reload, false);
+
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ dp_netdev_add_port_tx_to_pmd(pmd, port);
+ hmapx_add(to_reload, pmd);
+ }
+}
+
+/* Distributes all non-pinned rx queues of 'port' between all PMD threads
+ * in 'dp', inserts 'port' in the PMD threads' 'tx_ports' and reloads them
+ * if needed. */
+static void
+dp_netdev_add_port_to_pmds(struct dp_netdev *dp, struct dp_netdev_port *port)
+{
+ struct dp_netdev_pmd_thread *pmd;
+ struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
+ struct hmapx_node *node;
+
+ dp_netdev_add_port_to_pmds__(dp, port, &to_reload);
+
+ HMAPX_FOR_EACH (node, &to_reload) {
+ pmd = (struct dp_netdev_pmd_thread *) node->data;
+ dp_netdev_reload_pmd__(pmd);
+ }
+
+ hmapx_destroy(&to_reload);
+}
+
+/* Starts pmd threads for the numa node 'numa_id', if not already started.
+ * The function takes care of filling the threads' tx port cache. */
static void
dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
+ OVS_REQUIRES(dp->port_mutex)
{
int n_pmds;
if (!ovs_numa_numa_id_is_valid(numa_id)) {
- VLOG_ERR("Cannot create pmd threads due to numa id (%d)"
- "invalid", numa_id);
- return ;
+ VLOG_WARN("Cannot create pmd threads due to numa id (%d) invalid",
+ numa_id);
+ return;
}
n_pmds = get_n_pmd_threads_on_numa(dp, numa_id);
* in which 'netdev' is on, do nothing. Else, creates the
* pmd threads for the numa node. */
if (!n_pmds) {
- int can_have, n_unpinned, i, index = 0;
- struct dp_netdev_pmd_thread **pmds;
- struct dp_netdev_port *port;
+ int can_have, n_unpinned, i;
n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
if (!n_unpinned) {
- VLOG_ERR("Cannot create pmd threads due to out of unpinned "
- "cores on numa node");
+ VLOG_WARN("Cannot create pmd threads due to out of unpinned "
+ "cores on numa node %d", numa_id);
return;
}
/* If cpu mask is specified, uses all unpinned cores, otherwise
* tries creating NR_PMD_THREADS pmd threads. */
can_have = dp->pmd_cmask ? n_unpinned : MIN(n_unpinned, NR_PMD_THREADS);
- pmds = xzalloc(can_have * sizeof *pmds);
for (i = 0; i < can_have; i++) {
unsigned core_id = ovs_numa_get_unpinned_core_on_numa(numa_id);
- pmds[i] = xzalloc(sizeof **pmds);
- dp_netdev_configure_pmd(pmds[i], dp, i, core_id, numa_id);
- }
+ struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
+ struct dp_netdev_port *port;
- /* Distributes rx queues of this numa node between new pmd threads. */
- CMAP_FOR_EACH (port, node, &dp->ports) {
- if (netdev_is_pmd(port->netdev)
- && netdev_get_numa_id(port->netdev) == numa_id) {
- for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
- /* Make thread-safety analyser happy. */
- ovs_mutex_lock(&pmds[index]->poll_mutex);
- dp_netdev_add_rxq_to_pmd(pmds[index], port, port->rxq[i]);
- ovs_mutex_unlock(&pmds[index]->poll_mutex);
- index = (index + 1) % can_have;
- }
+ dp_netdev_configure_pmd(pmd, dp, core_id, numa_id);
+
+ HMAP_FOR_EACH (port, node, &dp->ports) {
+ dp_netdev_add_port_tx_to_pmd(pmd, port);
}
- }
- /* Actual start of pmd threads. */
- for (i = 0; i < can_have; i++) {
- pmds[i]->thread = ovs_thread_create("pmd", pmd_thread_main, pmds[i]);
+ pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
}
- free(pmds);
VLOG_INFO("Created %d pmd threads on numa node %d", can_have, numa_id);
}
}
* new configuration. */
static void
dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
+ OVS_REQUIRES(dp->port_mutex)
{
+ struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
+ struct dp_netdev_pmd_thread *pmd;
struct dp_netdev_port *port;
+ struct hmapx_node *node;
- CMAP_FOR_EACH (port, node, &dp->ports) {
+ HMAP_FOR_EACH (port, node, &dp->ports) {
if (netdev_is_pmd(port->netdev)) {
int numa_id = netdev_get_numa_id(port->netdev);
dp_netdev_set_pmds_on_numa(dp, numa_id);
}
+ /* Distribute only pinned rx queues first to mark threads as isolated. */
+ dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload, true);
+ }
+
+ /* Distribute remaining non-pinned rx queues to non-isolated PMD threads. */
+ HMAP_FOR_EACH (port, node, &dp->ports) {
+ dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload, false);
}
+
+ HMAPX_FOR_EACH (node, &to_reload) {
+ pmd = (struct dp_netdev_pmd_thread *) node->data;
+ dp_netdev_reload_pmd__(pmd);
+ }
+
+ hmapx_destroy(&to_reload);
}
static char *
struct ofpbuf *actions, struct ofpbuf *put_actions)
{
struct dp_netdev *dp = pmd->dp;
- struct flow_tnl orig_tunnel;
- int err;
if (OVS_UNLIKELY(!dp->upcall_cb)) {
return ENODEV;
}
- /* Upcall processing expects the Geneve options to be in the translated
- * format but we need to retain the raw format for datapath use. */
- orig_tunnel.flags = flow->tunnel.flags;
- if (flow->tunnel.flags & FLOW_TNL_F_UDPIF) {
- orig_tunnel.metadata.present.len = flow->tunnel.metadata.present.len;
- memcpy(orig_tunnel.metadata.opts.gnv, flow->tunnel.metadata.opts.gnv,
- flow->tunnel.metadata.present.len);
- err = tun_metadata_from_geneve_udpif(&orig_tunnel, &orig_tunnel,
- &flow->tunnel);
- if (err) {
- return err;
- }
- }
-
if (OVS_UNLIKELY(!VLOG_DROP_DBG(&upcall_rl))) {
struct ds ds = DS_EMPTY_INITIALIZER;
char *packet_str;
struct odp_flow_key_parms odp_parms = {
.flow = flow,
.mask = &wc->masks,
- .odp_in_port = flow->in_port.odp_port,
.support = dp_netdev_support,
};
ds_destroy(&ds);
}
- err = dp->upcall_cb(packet_, flow, ufid, pmd->core_id, type, userdata,
- actions, wc, put_actions, dp->upcall_aux);
- if (err && err != ENOSPC) {
- return err;
- }
-
- /* Translate tunnel metadata masks to datapath format. */
- if (wc) {
- if (wc->masks.tunnel.metadata.present.map) {
- struct geneve_opt opts[TLV_TOT_OPT_SIZE /
- sizeof(struct geneve_opt)];
-
- if (orig_tunnel.flags & FLOW_TNL_F_UDPIF) {
- tun_metadata_to_geneve_udpif_mask(&flow->tunnel,
- &wc->masks.tunnel,
- orig_tunnel.metadata.opts.gnv,
- orig_tunnel.metadata.present.len,
- opts);
- } else {
- orig_tunnel.metadata.present.len = 0;
- }
-
- memset(&wc->masks.tunnel.metadata, 0,
- sizeof wc->masks.tunnel.metadata);
- memcpy(&wc->masks.tunnel.metadata.opts.gnv, opts,
- orig_tunnel.metadata.present.len);
- }
- wc->masks.tunnel.metadata.present.len = 0xff;
- }
-
- /* Restore tunnel metadata. We need to use the saved options to ensure
- * that any unknown options are not lost. The generated mask will have
- * the same structure, matching on types and lengths but wildcarding
- * option data we don't care about. */
- if (orig_tunnel.flags & FLOW_TNL_F_UDPIF) {
- memcpy(&flow->tunnel.metadata.opts.gnv, orig_tunnel.metadata.opts.gnv,
- orig_tunnel.metadata.present.len);
- flow->tunnel.metadata.present.len = orig_tunnel.metadata.present.len;
- flow->tunnel.flags |= FLOW_TNL_F_UDPIF;
- }
-
- return err;
+ return dp->upcall_cb(packet_, flow, ufid, pmd->core_id, type, userdata,
+ actions, wc, put_actions, dp->upcall_aux);
}
static inline uint32_t
return hash;
}
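+/* A batch of packets that matched the same flow. Statistics are accumulated
+ * for the whole batch and the flow's actions are executed on it at once. */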
-struct packet_batch {
- unsigned int packet_count;
+struct packet_batch_per_flow {
unsigned int byte_count;
uint16_t tcp_flags;
-
struct dp_netdev_flow *flow;
- struct dp_packet *packets[NETDEV_MAX_BURST];
+ struct dp_packet_batch array;
};
static inline void
-packet_batch_update(struct packet_batch *batch, struct dp_packet *packet,
- const struct miniflow *mf)
+packet_batch_per_flow_update(struct packet_batch_per_flow *batch,
+ struct dp_packet *packet,
+ const struct miniflow *mf)
{
- batch->tcp_flags |= miniflow_get_tcp_flags(mf);
- batch->packets[batch->packet_count++] = packet;
batch->byte_count += dp_packet_size(packet);
+ batch->tcp_flags |= miniflow_get_tcp_flags(mf);
+ batch->array.packets[batch->array.count++] = packet;
}
static inline void
-packet_batch_init(struct packet_batch *batch, struct dp_netdev_flow *flow)
+packet_batch_per_flow_init(struct packet_batch_per_flow *batch,
+ struct dp_netdev_flow *flow)
{
flow->batch = batch;
batch->flow = flow;
- batch->packet_count = 0;
+ dp_packet_batch_init(&batch->array);
batch->byte_count = 0;
batch->tcp_flags = 0;
}
static inline void
-packet_batch_execute(struct packet_batch *batch,
- struct dp_netdev_pmd_thread *pmd,
- long long now)
+packet_batch_per_flow_execute(struct packet_batch_per_flow *batch,
+ struct dp_netdev_pmd_thread *pmd,
+ long long now)
{
struct dp_netdev_actions *actions;
struct dp_netdev_flow *flow = batch->flow;
- dp_netdev_flow_used(flow, batch->packet_count, batch->byte_count,
+ dp_netdev_flow_used(flow, batch->array.count, batch->byte_count,
batch->tcp_flags, now);
actions = dp_netdev_flow_get_actions(flow);
- dp_netdev_execute_actions(pmd, batch->packets, batch->packet_count, true,
- actions->actions, actions->size);
+ dp_netdev_execute_actions(pmd, &batch->array, true, &flow->flow,
+ actions->actions, actions->size, now);
}
static inline void
dp_netdev_queue_batches(struct dp_packet *pkt,
struct dp_netdev_flow *flow, const struct miniflow *mf,
- struct packet_batch *batches, size_t *n_batches)
+ struct packet_batch_per_flow *batches, size_t *n_batches)
{
- struct packet_batch *batch = flow->batch;
+ struct packet_batch_per_flow *batch = flow->batch;
if (OVS_UNLIKELY(!batch)) {
batch = &batches[(*n_batches)++];
- packet_batch_init(batch, flow);
+ packet_batch_per_flow_init(batch, flow);
}
- packet_batch_update(batch, pkt, mf);
+ packet_batch_per_flow_update(batch, pkt, mf);
}
/* Try to process all ('cnt') the 'packets' using only the exact match cache
* initialized by this function using 'port_no'.
*/
static inline size_t
-emc_processing(struct dp_netdev_pmd_thread *pmd, struct dp_packet **packets,
- size_t cnt, struct netdev_flow_key *keys,
- struct packet_batch batches[], size_t *n_batches,
+emc_processing(struct dp_netdev_pmd_thread *pmd,
+ struct dp_packet_batch *packets_,
+ struct netdev_flow_key *keys,
+ struct packet_batch_per_flow batches[], size_t *n_batches,
bool md_is_valid, odp_port_t port_no)
{
struct emc_cache *flow_cache = &pmd->flow_cache;
struct netdev_flow_key *key = &keys[0];
size_t i, n_missed = 0, n_dropped = 0;
+ struct dp_packet **packets = packets_->packets;
+ int cnt = packets_->count;
for (i = 0; i < cnt; i++) {
struct dp_netdev_flow *flow;
return n_missed;
}
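+/* Handles a packet that missed both the EMC and the classifier: performs
+ * the miss upcall, executes the returned actions on the packet and, if the
+ * upcall did not return ENOSPC, installs the new flow and caches it in the
+ * EMC. */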
+static inline void
+handle_packet_upcall(struct dp_netdev_pmd_thread *pmd, struct dp_packet *packet,
+ const struct netdev_flow_key *key,
+ struct ofpbuf *actions, struct ofpbuf *put_actions,
+ int *lost_cnt, long long now)
+{
+ struct ofpbuf *add_actions;
+ struct dp_packet_batch b;
+ struct match match;
+ ovs_u128 ufid;
+ int error;
+
+ match.tun_md.valid = false;
+ miniflow_expand(&key->mf, &match.flow);
+
+ ofpbuf_clear(actions);
+ ofpbuf_clear(put_actions);
+
+ dpif_flow_hash(pmd->dp->dpif, &match.flow, sizeof match.flow, &ufid);
+ error = dp_netdev_upcall(pmd, packet, &match.flow, &match.wc,
+ &ufid, DPIF_UC_MISS, NULL, actions,
+ put_actions);
+ if (OVS_UNLIKELY(error && error != ENOSPC)) {
+ dp_packet_delete(packet);
+ (*lost_cnt)++;
+ return;
+ }
+
+ /* The Netlink encoding of datapath flow keys cannot express
+ * wildcarding the presence of a VLAN tag. Instead, a missing VLAN
+ * tag is interpreted as exact match on the fact that there is no
+ * VLAN. Unless we refactor a lot of code that translates between
+ * Netlink and struct flow representations, we have to do the same
+ * here. */
+ if (!match.wc.masks.vlan_tci) {
+ match.wc.masks.vlan_tci = htons(0xffff);
+ }
+
+ /* We can't allow the packet batching in the next loop to execute
+ * the actions. Otherwise, if there are any slow path actions,
+ * we'll send the packet up twice. */
+ packet_batch_init_packet(&b, packet);
+ dp_netdev_execute_actions(pmd, &b, true, &match.flow,
+ actions->data, actions->size, now);
+
+ add_actions = put_actions->size ? put_actions : actions;
+ if (OVS_LIKELY(error != ENOSPC)) {
+ struct dp_netdev_flow *netdev_flow;
+
+ /* XXX: There's a race window where a flow covering this packet
+ * could have already been installed since we last did the flow
+ * lookup before upcall. This could be solved by moving the
+ * mutex lock outside the loop, but that's an awful long time
+ * to be locking everyone out of making flow installs. If we
+ * move to a per-core classifier, it would be reasonable. */
+ ovs_mutex_lock(&pmd->flow_mutex);
+ netdev_flow = dp_netdev_pmd_lookup_flow(pmd, key, NULL);
+ if (OVS_LIKELY(!netdev_flow)) {
+ netdev_flow = dp_netdev_flow_add(pmd, &match, &ufid,
+ add_actions->data,
+ add_actions->size);
+ }
+ ovs_mutex_unlock(&pmd->flow_mutex);
+
+ emc_insert(&pmd->flow_cache, key, netdev_flow);
+ }
+}
+
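+/* Looks up the packets that missed the EMC in the dpcls of their common
+ * 'in_port'; any packets that miss the classifier too are handed to the
+ * upcall handler. */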
static inline void
fast_path_processing(struct dp_netdev_pmd_thread *pmd,
- struct dp_packet **packets, size_t cnt,
+ struct dp_packet_batch *packets_,
struct netdev_flow_key *keys,
- struct packet_batch batches[], size_t *n_batches)
+ struct packet_batch_per_flow batches[], size_t *n_batches,
+ odp_port_t in_port,
+ long long now)
{
+ int cnt = packets_->count;
#if !defined(__CHECKER__) && !defined(_WIN32)
const size_t PKT_ARRAY_SIZE = cnt;
#else
/* Sparse or MSVC doesn't like variable length array. */
enum { PKT_ARRAY_SIZE = NETDEV_MAX_BURST };
#endif
+ struct dp_packet **packets = packets_->packets;
+ struct dpcls *cls;
struct dpcls_rule *rules[PKT_ARRAY_SIZE];
struct dp_netdev *dp = pmd->dp;
struct emc_cache *flow_cache = &pmd->flow_cache;
int miss_cnt = 0, lost_cnt = 0;
+ int lookup_cnt = 0, add_lookup_cnt;
bool any_miss;
size_t i;
/* Key length is needed in all the cases, hash computed on demand. */
keys[i].len = netdev_flow_key_size(miniflow_n_values(&keys[i].mf));
}
- any_miss = !dpcls_lookup(&pmd->cls, keys, rules, cnt);
+ /* Get the classifier for the in_port. */
+ cls = dp_netdev_pmd_lookup_dpcls(pmd, in_port);
+ if (OVS_LIKELY(cls)) {
+ any_miss = !dpcls_lookup(cls, keys, rules, cnt, &lookup_cnt);
+ } else {
+ any_miss = true;
+ memset(rules, 0, sizeof(rules));
+ }
if (OVS_UNLIKELY(any_miss) && !fat_rwlock_tryrdlock(&dp->upcall_rwlock)) {
uint64_t actions_stub[512 / 8], slow_stub[512 / 8];
struct ofpbuf actions, put_actions;
- ovs_u128 ufid;
ofpbuf_use_stub(&actions, actions_stub, sizeof actions_stub);
ofpbuf_use_stub(&put_actions, slow_stub, sizeof slow_stub);
for (i = 0; i < cnt; i++) {
struct dp_netdev_flow *netdev_flow;
- struct ofpbuf *add_actions;
- struct match match;
- int error;
if (OVS_LIKELY(rules[i])) {
continue;
/* It's possible that an earlier slow path execution installed
* a rule covering this flow. In this case, it's a lot cheaper
* to catch it here than execute a miss. */
- netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &keys[i]);
+ netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &keys[i],
+ &add_lookup_cnt);
if (netdev_flow) {
+ lookup_cnt += add_lookup_cnt;
rules[i] = &netdev_flow->cr;
continue;
}
miss_cnt++;
-
- match.tun_md.valid = false;
- miniflow_expand(&keys[i].mf, &match.flow);
-
- ofpbuf_clear(&actions);
- ofpbuf_clear(&put_actions);
-
- dpif_flow_hash(dp->dpif, &match.flow, sizeof match.flow, &ufid);
- error = dp_netdev_upcall(pmd, packets[i], &match.flow, &match.wc,
- &ufid, DPIF_UC_MISS, NULL, &actions,
- &put_actions);
- if (OVS_UNLIKELY(error && error != ENOSPC)) {
- dp_packet_delete(packets[i]);
- lost_cnt++;
- continue;
- }
-
- /* The Netlink encoding of datapath flow keys cannot express
- * wildcarding the presence of a VLAN tag. Instead, a missing VLAN
- * tag is interpreted as exact match on the fact that there is no
- * VLAN. Unless we refactor a lot of code that translates between
- * Netlink and struct flow representations, we have to do the same
- * here. */
- if (!match.wc.masks.vlan_tci) {
- match.wc.masks.vlan_tci = htons(0xffff);
- }
-
- /* We can't allow the packet batching in the next loop to execute
- * the actions. Otherwise, if there are any slow path actions,
- * we'll send the packet up twice. */
- dp_netdev_execute_actions(pmd, &packets[i], 1, true,
- actions.data, actions.size);
-
- add_actions = put_actions.size ? &put_actions : &actions;
- if (OVS_LIKELY(error != ENOSPC)) {
- /* XXX: There's a race window where a flow covering this packet
- * could have already been installed since we last did the flow
- * lookup before upcall. This could be solved by moving the
- * mutex lock outside the loop, but that's an awful long time
- * to be locking everyone out of making flow installs. If we
- * move to a per-core classifier, it would be reasonable. */
- ovs_mutex_lock(&pmd->flow_mutex);
- netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &keys[i]);
- if (OVS_LIKELY(!netdev_flow)) {
- netdev_flow = dp_netdev_flow_add(pmd, &match, &ufid,
- add_actions->data,
- add_actions->size);
- }
- ovs_mutex_unlock(&pmd->flow_mutex);
-
- emc_insert(flow_cache, &keys[i], netdev_flow);
- }
+ handle_packet_upcall(pmd, packets[i], &keys[i], &actions,
+ &put_actions, &lost_cnt, now);
}
ofpbuf_uninit(&actions);
}
dp_netdev_count_packet(pmd, DP_STAT_MASKED_HIT, cnt - miss_cnt);
+ dp_netdev_count_packet(pmd, DP_STAT_LOOKUP_HIT, lookup_cnt);
dp_netdev_count_packet(pmd, DP_STAT_MISS, miss_cnt);
dp_netdev_count_packet(pmd, DP_STAT_LOST, lost_cnt);
}
* valid, 'md_is_valid' must be true and 'port_no' will be ignored. */
static void
dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
- struct dp_packet **packets, int cnt,
+ struct dp_packet_batch *packets,
bool md_is_valid, odp_port_t port_no)
{
+ int cnt = packets->count;
#if !defined(__CHECKER__) && !defined(_WIN32)
const size_t PKT_ARRAY_SIZE = cnt;
#else
/* Sparse or MSVC doesn't like variable length array. */
enum { PKT_ARRAY_SIZE = NETDEV_MAX_BURST };
#endif
- struct netdev_flow_key keys[PKT_ARRAY_SIZE];
- struct packet_batch batches[PKT_ARRAY_SIZE];
+ struct netdev_flow_key keys[PKT_ARRAY_SIZE] OVS_ALIGNED_VAR(CACHE_LINE_SIZE);
+ struct packet_batch_per_flow batches[PKT_ARRAY_SIZE];
long long now = time_msec();
size_t newcnt, n_batches, i;
+ odp_port_t in_port;
n_batches = 0;
- newcnt = emc_processing(pmd, packets, cnt, keys, batches, &n_batches,
+ newcnt = emc_processing(pmd, packets, keys, batches, &n_batches,
md_is_valid, port_no);
if (OVS_UNLIKELY(newcnt)) {
- fast_path_processing(pmd, packets, newcnt, keys, batches, &n_batches);
- }
-
+ packets->count = newcnt;
+ /* Get ingress port from first packet's metadata. */
+ in_port = packets->packets[0]->md.in_port.odp_port;
+ fast_path_processing(pmd, packets, keys, batches, &n_batches, in_port, now);
+ }
+
+ /* All the flow batches need to be reset before any call to
+ * packet_batch_per_flow_execute() as it could potentially trigger
+ * recirculation. When a packet matching flow 'j' happens to be
+ * recirculated, the nested call to dp_netdev_input__() could potentially
+ * classify the packet as matching another flow - say 'k'. It could happen
+ * that in the previous call to dp_netdev_input__() that same flow 'k'
+ * already had its own batches[k] still waiting to be served. So if its
+ * 'batch' member is not reset, the recirculated packet would be wrongly
+ * appended to batches[k] of the 1st call to dp_netdev_input__(). */
for (i = 0; i < n_batches; i++) {
batches[i].flow->batch = NULL;
}
for (i = 0; i < n_batches; i++) {
- packet_batch_execute(&batches[i], pmd, now);
+ packet_batch_per_flow_execute(&batches[i], pmd, now);
}
}
static void
dp_netdev_input(struct dp_netdev_pmd_thread *pmd,
- struct dp_packet **packets, int cnt,
+ struct dp_packet_batch *packets,
odp_port_t port_no)
{
- dp_netdev_input__(pmd, packets, cnt, false, port_no);
+ dp_netdev_input__(pmd, packets, false, port_no);
}
static void
dp_netdev_recirculate(struct dp_netdev_pmd_thread *pmd,
- struct dp_packet **packets, int cnt)
+ struct dp_packet_batch *packets)
{
- dp_netdev_input__(pmd, packets, cnt, true, 0);
+ dp_netdev_input__(pmd, packets, true, 0);
}
struct dp_netdev_execute_aux {
struct dp_netdev_pmd_thread *pmd;
+ long long now;
+ const struct flow *flow;
};
static void
}
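+/* Releases the tx queue ids that 'pmd' dynamically assigned but has not
+ * used within the last XPS_TIMEOUT_MS, or all of them if 'purge' is set. */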
static void
-dp_netdev_drop_packets(struct dp_packet **packets, int cnt, bool may_steal)
+dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
+ long long now, bool purge)
{
- if (may_steal) {
- int i;
+ struct tx_port *tx;
+ struct dp_netdev_port *port;
+ long long interval;
- for (i = 0; i < cnt; i++) {
- dp_packet_delete(packets[i]);
+ HMAP_FOR_EACH (tx, node, &pmd->port_cache) {
+ if (!tx->port->dynamic_txqs) {
+ continue;
+ }
+ interval = now - tx->last_used;
+ if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT_MS)) {
+ port = tx->port;
+ ovs_mutex_lock(&port->txq_used_mutex);
+ port->txq_used[tx->qid]--;
+ ovs_mutex_unlock(&port->txq_used_mutex);
+ tx->qid = -1;
}
}
}
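+/* Returns the tx queue id 'pmd' should use to send on 'tx'. Keeps the
+ * cached id if it was used within the last XPS_TIMEOUT_MS; otherwise picks
+ * the least used tx queue of the port and updates the usage counters. */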
static int
-push_tnl_action(const struct dp_netdev *dp,
- const struct nlattr *attr,
- struct dp_packet **packets, int cnt)
+dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
+ struct tx_port *tx, long long now)
{
- struct dp_netdev_port *tun_port;
+ struct dp_netdev_port *port;
+ long long interval;
+ int i, min_cnt, min_qid;
+
+ if (OVS_UNLIKELY(!now)) {
+ now = time_msec();
+ }
+
+ interval = now - tx->last_used;
+ tx->last_used = now;
+
+ if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT_MS)) {
+ return tx->qid;
+ }
+
+ port = tx->port;
+
+ ovs_mutex_lock(&port->txq_used_mutex);
+ if (tx->qid >= 0) {
+ port->txq_used[tx->qid]--;
+ tx->qid = -1;
+ }
+
+ min_cnt = -1;
+ min_qid = 0;
+ for (i = 0; i < netdev_n_txq(port->netdev); i++) {
+ if (port->txq_used[i] < min_cnt || min_cnt == -1) {
+ min_cnt = port->txq_used[i];
+ min_qid = i;
+ }
+ }
+
+ port->txq_used[min_qid]++;
+ tx->qid = min_qid;
+
+ ovs_mutex_unlock(&port->txq_used_mutex);
+
+ dpif_netdev_xps_revalidate_pmd(pmd, now, false);
+
+ VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.",
+ pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev));
+ return min_qid;
+}
+
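+/* Looks up the tx port for 'port_no' in the thread-local cache of 'pmd'. */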
+static struct tx_port *
+pmd_tx_port_cache_lookup(const struct dp_netdev_pmd_thread *pmd,
+ odp_port_t port_no)
+{
+ return tx_port_lookup(&pmd->port_cache, port_no);
+}
+
+static int
+push_tnl_action(const struct dp_netdev_pmd_thread *pmd,
+ const struct nlattr *attr,
+ struct dp_packet_batch *batch)
+{
+ struct tx_port *tun_port;
const struct ovs_action_push_tnl *data;
+ int err;
data = nl_attr_get(attr);
- tun_port = dp_netdev_lookup_port(dp, u32_to_odp(data->tnl_port));
+ tun_port = pmd_tx_port_cache_lookup(pmd, u32_to_odp(data->tnl_port));
if (!tun_port) {
- return -EINVAL;
+ err = -EINVAL;
+ goto error;
}
- netdev_push_header(tun_port->netdev, packets, cnt, data);
-
- return 0;
+ err = netdev_push_header(tun_port->port->netdev, batch, data);
+ if (!err) {
+ return 0;
+ }
+error:
+ dp_packet_delete_batch(batch, true);
+ return err;
}
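+/* Performs a DPIF_UC_ACTION upcall for 'packet' and executes the actions
+ * it returns; if the upcall fails with an error other than ENOSPC, the
+ * packet is deleted when 'may_steal' allows it. */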
static void
-dp_netdev_clone_pkt_batch(struct dp_packet **dst_pkts,
- struct dp_packet **src_pkts, int cnt)
+dp_execute_userspace_action(struct dp_netdev_pmd_thread *pmd,
+ struct dp_packet *packet, bool may_steal,
+ struct flow *flow, ovs_u128 *ufid,
+ struct ofpbuf *actions,
+ const struct nlattr *userdata, long long now)
{
- int i;
+ struct dp_packet_batch b;
+ int error;
- for (i = 0; i < cnt; i++) {
- dst_pkts[i] = dp_packet_clone(src_pkts[i]);
+ ofpbuf_clear(actions);
+
+ error = dp_netdev_upcall(pmd, packet, flow, NULL, ufid,
+ DPIF_UC_ACTION, userdata, actions,
+ NULL);
+ if (!error || error == ENOSPC) {
+ packet_batch_init_packet(&b, packet);
+ dp_netdev_execute_actions(pmd, &b, may_steal, flow,
+ actions->data, actions->size, now);
+ } else if (may_steal) {
+ dp_packet_delete(packet);
}
}
static void
-dp_execute_cb(void *aux_, struct dp_packet **packets, int cnt,
+dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
const struct nlattr *a, bool may_steal)
- OVS_NO_THREAD_SAFETY_ANALYSIS
{
struct dp_netdev_execute_aux *aux = aux_;
uint32_t *depth = recirc_depth_get();
struct dp_netdev_pmd_thread *pmd = aux->pmd;
struct dp_netdev *dp = pmd->dp;
int type = nl_attr_type(a);
- struct dp_netdev_port *p;
- int i;
+ long long now = aux->now;
+ struct tx_port *p;
switch ((enum ovs_action_attr)type) {
case OVS_ACTION_ATTR_OUTPUT:
- p = dp_netdev_lookup_port(dp, u32_to_odp(nl_attr_get_u32(a)));
+ p = pmd_tx_port_cache_lookup(pmd, u32_to_odp(nl_attr_get_u32(a)));
if (OVS_LIKELY(p)) {
int tx_qid;
+ bool dynamic_txqs;
- atomic_read_relaxed(&pmd->tx_qid, &tx_qid);
+ dynamic_txqs = p->port->dynamic_txqs;
+ if (dynamic_txqs) {
+ tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, now);
+ } else {
+ atomic_read_relaxed(&pmd->static_tx_qid, &tx_qid);
+ }
- netdev_send(p->netdev, tx_qid, packets, cnt, may_steal);
+ netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
+ dynamic_txqs);
return;
}
break;
case OVS_ACTION_ATTR_TUNNEL_PUSH:
if (*depth < MAX_RECIRC_DEPTH) {
- struct dp_packet *tnl_pkt[NETDEV_MAX_BURST];
+ struct dp_packet_batch tnl_pkt;
+ struct dp_packet_batch *orig_packets_ = packets_;
int err;
if (!may_steal) {
- dp_netdev_clone_pkt_batch(tnl_pkt, packets, cnt);
- packets = tnl_pkt;
+ dp_packet_batch_clone(&tnl_pkt, packets_);
+ packets_ = &tnl_pkt;
+ dp_packet_batch_reset_cutlen(orig_packets_);
}
- err = push_tnl_action(dp, a, packets, cnt);
+ dp_packet_batch_apply_cutlen(packets_);
+
+ err = push_tnl_action(pmd, a, packets_);
if (!err) {
(*depth)++;
- dp_netdev_recirculate(pmd, packets, cnt);
+ dp_netdev_recirculate(pmd, packets_);
(*depth)--;
- } else {
- dp_netdev_drop_packets(tnl_pkt, cnt, !may_steal);
}
return;
}
case OVS_ACTION_ATTR_TUNNEL_POP:
if (*depth < MAX_RECIRC_DEPTH) {
+ struct dp_packet_batch *orig_packets_ = packets_;
odp_port_t portno = u32_to_odp(nl_attr_get_u32(a));
- p = dp_netdev_lookup_port(dp, portno);
+ p = pmd_tx_port_cache_lookup(pmd, portno);
if (p) {
- struct dp_packet *tnl_pkt[NETDEV_MAX_BURST];
- int err;
+ struct dp_packet_batch tnl_pkt;
+ int i;
if (!may_steal) {
- dp_netdev_clone_pkt_batch(tnl_pkt, packets, cnt);
- packets = tnl_pkt;
+ dp_packet_batch_clone(&tnl_pkt, packets_);
+ packets_ = &tnl_pkt;
+ dp_packet_batch_reset_cutlen(orig_packets_);
}
- err = netdev_pop_header(p->netdev, packets, cnt);
- if (!err) {
+ dp_packet_batch_apply_cutlen(packets_);
- for (i = 0; i < cnt; i++) {
- packets[i]->md.in_port.odp_port = portno;
- }
+ netdev_pop_header(p->port->netdev, packets_);
+ if (!packets_->count) {
+ return;
+ }
- (*depth)++;
- dp_netdev_recirculate(pmd, packets, cnt);
- (*depth)--;
- } else {
- dp_netdev_drop_packets(tnl_pkt, cnt, !may_steal);
+ for (i = 0; i < packets_->count; i++) {
+ packets_->packets[i]->md.in_port.odp_port = portno;
}
+
+ (*depth)++;
+ dp_netdev_recirculate(pmd, packets_);
+ (*depth)--;
return;
}
}
case OVS_ACTION_ATTR_USERSPACE:
if (!fat_rwlock_tryrdlock(&dp->upcall_rwlock)) {
+ struct dp_packet_batch *orig_packets_ = packets_;
+ struct dp_packet **packets = packets_->packets;
const struct nlattr *userdata;
+ struct dp_packet_batch usr_pkt;
struct ofpbuf actions;
struct flow flow;
ovs_u128 ufid;
+ bool clone = false;
+ int i;
userdata = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
ofpbuf_init(&actions, 0);
- for (i = 0; i < cnt; i++) {
- int error;
+ if (packets_->trunc) {
+ if (!may_steal) {
+ dp_packet_batch_clone(&usr_pkt, packets_);
+ packets_ = &usr_pkt;
+ packets = packets_->packets;
+ clone = true;
+ dp_packet_batch_reset_cutlen(orig_packets_);
+ }
- ofpbuf_clear(&actions);
+ dp_packet_batch_apply_cutlen(packets_);
+ }
+ for (i = 0; i < packets_->count; i++) {
flow_extract(packets[i], &flow);
dpif_flow_hash(dp->dpif, &flow, sizeof flow, &ufid);
- error = dp_netdev_upcall(pmd, packets[i], &flow, NULL, &ufid,
- DPIF_UC_ACTION, userdata,&actions,
- NULL);
- if (!error || error == ENOSPC) {
- dp_netdev_execute_actions(pmd, &packets[i], 1, may_steal,
- actions.data, actions.size);
- } else if (may_steal) {
- dp_packet_delete(packets[i]);
- }
+ dp_execute_userspace_action(pmd, packets[i], may_steal, &flow,
+ &ufid, &actions, userdata, now);
}
+
+ if (clone) {
+ dp_packet_delete_batch(packets_, true);
+ }
+
ofpbuf_uninit(&actions);
fat_rwlock_unlock(&dp->upcall_rwlock);
case OVS_ACTION_ATTR_RECIRC:
if (*depth < MAX_RECIRC_DEPTH) {
- struct dp_packet *recirc_pkts[NETDEV_MAX_BURST];
+ struct dp_packet_batch recirc_pkts;
+ int i;
if (!may_steal) {
- dp_netdev_clone_pkt_batch(recirc_pkts, packets, cnt);
- packets = recirc_pkts;
+ dp_packet_batch_clone(&recirc_pkts, packets_);
+ packets_ = &recirc_pkts;
}
- for (i = 0; i < cnt; i++) {
- packets[i]->md.recirc_id = nl_attr_get_u32(a);
+ for (i = 0; i < packets_->count; i++) {
+ packets_->packets[i]->md.recirc_id = nl_attr_get_u32(a);
}
(*depth)++;
- dp_netdev_recirculate(pmd, packets, cnt);
+ dp_netdev_recirculate(pmd, packets_);
(*depth)--;
return;
VLOG_WARN("Packet dropped. Max recirculation depth exceeded.");
break;
- case OVS_ACTION_ATTR_CT:
- /* If a flow with this action is slow-pathed, datapath assistance is
- * required to implement it. However, we don't support this action
- * in the userspace datapath. */
- VLOG_WARN("Cannot execute conntrack action in userspace.");
+ case OVS_ACTION_ATTR_CT: {
+ const struct nlattr *b;
+ bool commit = false;
+ unsigned int left;
+ uint16_t zone = 0;
+ const char *helper = NULL;
+ const uint32_t *setmark = NULL;
+ const struct ovs_key_ct_labels *setlabel = NULL;
+
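+ /* Walk the nested attributes of the CT action to collect the
+ * arguments for conntrack_execute(). */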
+ NL_ATTR_FOR_EACH_UNSAFE (b, left, nl_attr_get(a),
+ nl_attr_get_size(a)) {
+ enum ovs_ct_attr sub_type = nl_attr_type(b);
+
+ switch (sub_type) {
+ case OVS_CT_ATTR_COMMIT:
+ commit = true;
+ break;
+ case OVS_CT_ATTR_ZONE:
+ zone = nl_attr_get_u16(b);
+ break;
+ case OVS_CT_ATTR_HELPER:
+ helper = nl_attr_get_string(b);
+ break;
+ case OVS_CT_ATTR_MARK:
+ setmark = nl_attr_get(b);
+ break;
+ case OVS_CT_ATTR_LABELS:
+ setlabel = nl_attr_get(b);
+ break;
+ case OVS_CT_ATTR_NAT:
+ case OVS_CT_ATTR_UNSPEC:
+ case __OVS_CT_ATTR_MAX:
+ OVS_NOT_REACHED();
+ }
+ }
+
+ conntrack_execute(&dp->conntrack, packets_, aux->flow->dl_type, commit,
+ zone, setmark, setlabel, helper);
break;
+ }
case OVS_ACTION_ATTR_PUSH_VLAN:
case OVS_ACTION_ATTR_POP_VLAN:
case OVS_ACTION_ATTR_SAMPLE:
case OVS_ACTION_ATTR_HASH:
case OVS_ACTION_ATTR_UNSPEC:
+ case OVS_ACTION_ATTR_TRUNC:
case __OVS_ACTION_ATTR_MAX:
OVS_NOT_REACHED();
}
- dp_netdev_drop_packets(packets, cnt, may_steal);
+ dp_packet_delete_batch(packets_, may_steal);
}
static void
dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
- struct dp_packet **packets, int cnt,
- bool may_steal,
- const struct nlattr *actions, size_t actions_len)
+ struct dp_packet_batch *packets,
+ bool may_steal, const struct flow *flow,
+ const struct nlattr *actions, size_t actions_len,
+ long long now)
{
- struct dp_netdev_execute_aux aux = { pmd };
+ struct dp_netdev_execute_aux aux = { pmd, now, flow };
- odp_execute_actions(&aux, packets, cnt, may_steal, actions,
+ odp_execute_actions(&aux, packets, may_steal, actions,
actions_len, dp_execute_cb);
}
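+/* Conntrack dump state, embedding the generic 'ct_dpif_dump_state' so that
+ * the dump callbacks can recover it with INIT_CONTAINER(). */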
+struct dp_netdev_ct_dump {
+ struct ct_dpif_dump_state up;
+ struct conntrack_dump dump;
+ struct conntrack *ct;
+ struct dp_netdev *dp;
+};
+
+static int
+dpif_netdev_ct_dump_start(struct dpif *dpif, struct ct_dpif_dump_state **dump_,
+ const uint16_t *pzone)
+{
+ struct dp_netdev *dp = get_dp_netdev(dpif);
+ struct dp_netdev_ct_dump *dump;
+
+ dump = xzalloc(sizeof *dump);
+ dump->dp = dp;
+ dump->ct = &dp->conntrack;
+
+ conntrack_dump_start(&dp->conntrack, &dump->dump, pzone);
+
+ *dump_ = &dump->up;
+
+ return 0;
+}
+
+static int
+dpif_netdev_ct_dump_next(struct dpif *dpif OVS_UNUSED,
+ struct ct_dpif_dump_state *dump_,
+ struct ct_dpif_entry *entry)
+{
+ struct dp_netdev_ct_dump *dump;
+
+ INIT_CONTAINER(dump, dump_, up);
+
+ return conntrack_dump_next(&dump->dump, entry);
+}
+
+static int
+dpif_netdev_ct_dump_done(struct dpif *dpif OVS_UNUSED,
+ struct ct_dpif_dump_state *dump_)
+{
+ struct dp_netdev_ct_dump *dump;
+ int err;
+
+ INIT_CONTAINER(dump, dump_, up);
+
+ err = conntrack_dump_done(&dump->dump);
+
+ free(dump);
+
+ return err;
+}
+
+static int
+dpif_netdev_ct_flush(struct dpif *dpif, const uint16_t *zone)
+{
+ struct dp_netdev *dp = get_dp_netdev(dpif);
+
+ return conntrack_flush(&dp->conntrack, zone);
+}
+
const struct dpif_class dpif_netdev_class = {
"netdev",
dpif_netdev_init,
dpif_netdev_get_stats,
dpif_netdev_port_add,
dpif_netdev_port_del,
+ dpif_netdev_port_set_config,
dpif_netdev_port_query_by_number,
dpif_netdev_port_query_by_name,
NULL, /* port_get_pid */
dpif_netdev_enable_upcall,
dpif_netdev_disable_upcall,
dpif_netdev_get_datapath_version,
- NULL, /* ct_dump_start */
- NULL, /* ct_dump_next */
- NULL, /* ct_dump_done */
- NULL, /* ct_flush */
+ dpif_netdev_ct_dump_start,
+ dpif_netdev_ct_dump_next,
+ dpif_netdev_ct_dump_done,
+ dpif_netdev_ct_flush,
};
static void
dpif_dummy_change_port_number(struct unixctl_conn *conn, int argc OVS_UNUSED,
const char *argv[], void *aux OVS_UNUSED)
{
- struct dp_netdev_port *old_port;
- struct dp_netdev_port *new_port;
+ struct dp_netdev_port *port;
struct dp_netdev *dp;
odp_port_t port_no;
ovs_mutex_unlock(&dp_netdev_mutex);
ovs_mutex_lock(&dp->port_mutex);
- if (get_port_by_name(dp, argv[2], &old_port)) {
+ if (get_port_by_name(dp, argv[2], &port)) {
unixctl_command_reply_error(conn, "unknown port");
goto exit;
}
goto exit;
}
- /* Remove old port. */
- cmap_remove(&dp->ports, &old_port->node, hash_port_no(old_port->port_no));
- ovsrcu_postpone(free, old_port);
+ /* Remove port. */
+ hmap_remove(&dp->ports, &port->node);
+ dp_netdev_del_port_from_all_pmds(dp, port);
- /* Insert new port (cmap semantics mean we cannot re-insert 'old_port'). */
- new_port = xmemdup(old_port, sizeof *old_port);
- new_port->port_no = port_no;
- cmap_insert(&dp->ports, &new_port->node, hash_port_no(port_no));
+ /* Reinsert with new port number. */
+ port->port_no = port_no;
+ hmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
+ dp_netdev_add_port_to_pmds(dp, port);
seq_change(dp->port_seq);
unixctl_command_reply(conn, NULL);
dp_netdev_unref(dp);
}
-static void
-dpif_dummy_delete_port(struct unixctl_conn *conn, int argc OVS_UNUSED,
- const char *argv[], void *aux OVS_UNUSED)
-{
- struct dp_netdev_port *port;
- struct dp_netdev *dp;
-
- ovs_mutex_lock(&dp_netdev_mutex);
- dp = shash_find_data(&dp_netdevs, argv[1]);
- if (!dp || !dpif_netdev_class_is_dummy(dp->class)) {
- ovs_mutex_unlock(&dp_netdev_mutex);
- unixctl_command_reply_error(conn, "unknown datapath or not a dummy");
- return;
- }
- ovs_refcount_ref(&dp->ref_cnt);
- ovs_mutex_unlock(&dp_netdev_mutex);
-
- ovs_mutex_lock(&dp->port_mutex);
- if (get_port_by_name(dp, argv[2], &port)) {
- unixctl_command_reply_error(conn, "unknown port");
- } else if (port->port_no == ODPP_LOCAL) {
- unixctl_command_reply_error(conn, "can't delete local port");
- } else {
- do_del_port(dp, port);
- unixctl_command_reply(conn, NULL);
- }
- ovs_mutex_unlock(&dp->port_mutex);
-
- dp_netdev_unref(dp);
-}
-
static void
dpif_dummy_register__(const char *type)
{
unixctl_command_register("dpif-dummy/change-port-number",
"dp port new-number",
3, 3, dpif_dummy_change_port_number, NULL);
- unixctl_command_register("dpif-dummy/delete-port", "dp port",
- 2, 2, dpif_dummy_delete_port, NULL);
}
\f
/* Datapath Classifier. */
/* These fields are accessed by readers. */
struct cmap rules; /* Contains "struct dpcls_rule"s. */
+ uint32_t hit_cnt; /* Number of match hits in subtable in current
+ optimization interval. */
struct netdev_flow_key mask; /* Wildcards for fields (const). */
/* 'mask' must be the last field, additional space is allocated here. */
};
static void
dpcls_destroy_subtable(struct dpcls *cls, struct dpcls_subtable *subtable)
{
+ VLOG_DBG("Destroying subtable %p for in_port %d", subtable, cls->in_port);
pvector_remove(&cls->subtables, subtable);
cmap_remove(&cls->subtables_map, &subtable->cmap_node,
subtable->mask.hash);
subtable = xmalloc(sizeof *subtable
- sizeof subtable->mask.mf + mask->len);
cmap_init(&subtable->rules);
+ subtable->hit_cnt = 0;
netdev_flow_key_clone(&subtable->mask, mask);
cmap_insert(&cls->subtables_map, &subtable->cmap_node, mask->hash);
+ /* Add the new subtable at the end of the pvector (with no hits yet). */
pvector_insert(&cls->subtables, subtable, 0);
+ VLOG_DBG("Creating %"PRIuSIZE". subtable %p for in_port %d",
+ cmap_count(&cls->subtables_map), subtable, cls->in_port);
pvector_publish(&cls->subtables);
return subtable;
return dpcls_create_subtable(cls, mask);
}
+
+/* Periodically sort the dpcls subtable vectors according to hit counts. */
+static void
+dpcls_sort_subtable_vector(struct dpcls *cls)
+{
+ struct pvector *pvec = &cls->subtables;
+ struct dpcls_subtable *subtable;
+
+ PVECTOR_FOR_EACH (subtable, pvec) {
+ pvector_change_priority(pvec, subtable, subtable->hit_cnt);
+ subtable->hit_cnt = 0;
+ }
+ pvector_publish(pvec);
+}
+
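+/* Sorts the subtable vectors of all the PMD's classifiers once per
+ * DPCLS_OPTIMIZATION_INTERVAL. Uses trylock on 'flow_mutex' so that, if a
+ * revalidator holds the lock, the optimization is simply retried later. */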
+static inline void
+dp_netdev_pmd_try_optimize(struct dp_netdev_pmd_thread *pmd)
+{
+ struct dpcls *cls;
+ long long int now = time_msec();
+
+ if (now > pmd->next_optimization) {
+ /* Try to obtain the flow lock to block out revalidator threads.
+ * If not possible, just try next time. */
+ if (!ovs_mutex_trylock(&pmd->flow_mutex)) {
+ /* Optimize each classifier. */
+ CMAP_FOR_EACH (cls, node, &pmd->classifiers) {
+ dpcls_sort_subtable_vector(cls);
+ }
+ ovs_mutex_unlock(&pmd->flow_mutex);
+ /* Start a new measurement interval. */
+ pmd->next_optimization = now + DPCLS_OPTIMIZATION_INTERVAL;
+ }
+ }
+}
+
/* Insert 'rule' into 'cls'. */
static void
dpcls_insert(struct dpcls *cls, struct dpcls_rule *rule,
{
struct dpcls_subtable *subtable = dpcls_find_subtable(cls, mask);
+ /* Refer to subtable's mask, also for later removal. */
rule->mask = &subtable->mask;
cmap_insert(&subtable->rules, &rule->cmap_node, rule->flow.hash);
}
ovs_assert(rule->mask);
+ /* Get subtable from reference in rule->mask. */
INIT_CONTAINER(subtable, rule->mask, mask);
-
if (cmap_remove(&subtable->rules, &rule->cmap_node, rule->flow.hash)
== 0) {
+ /* Delete empty subtable. */
dpcls_destroy_subtable(cls, subtable);
pvector_publish(&cls->subtables);
}
return true;
}
-/* For each miniflow in 'flows' performs a classifier lookup writing the result
- * into the corresponding slot in 'rules'. If a particular entry in 'flows' is
+/* For each miniflow in 'keys' performs a classifier lookup writing the result
+ * into the corresponding slot in 'rules'. If a particular entry in 'keys' is
* NULL it is skipped.
*
* This function is optimized for use in the userspace datapath and therefore
* classifier_lookup() function. Specifically, it does not implement
* priorities, instead returning any rule which matches the flow.
*
- * Returns true if all flows found a corresponding rule. */
+ * Returns true if all miniflows found a corresponding rule. */
static bool
-dpcls_lookup(const struct dpcls *cls, const struct netdev_flow_key keys[],
- struct dpcls_rule **rules, const size_t cnt)
-{
- /* The batch size 16 was experimentally found faster than 8 or 32. */
+dpcls_lookup(struct dpcls *cls, const struct netdev_flow_key keys[],
+ struct dpcls_rule **rules, const size_t cnt,
+ int *num_lookups_p)
+{
+ /* The received 'cnt' miniflows are the search-keys that will be processed
+ * in batches of 16 elements. N_MAPS will contain the number of these
+ * 16-element batches, e.g. for 'cnt' = 32, N_MAPS will be 2. The batch
+ * size 16 was experimentally found faster than 8 or 32. */
typedef uint16_t map_type;
#define MAP_BITS (sizeof(map_type) * CHAR_BIT)
}
memset(rules, 0, cnt * sizeof *rules);
+ int lookups_match = 0, subtable_pos = 1;
+
+ /* The Datapath classifier - aka dpcls - is composed of subtables.
+ * Subtables are dynamically created as needed when new rules are inserted.
+ * Each subtable collects rules with matches on a specific subset of packet
+ * fields as defined by the subtable's mask. We proceed to process every
+ * search-key against each subtable, but when a match is found for a
+ * search-key, the search for that key can stop because the rules are
+ * non-overlapping. */
PVECTOR_FOR_EACH (subtable, &cls->subtables) {
const struct netdev_flow_key *mkeys = keys;
struct dpcls_rule **mrules = rules;
BUILD_ASSERT_DECL(sizeof remains == sizeof *maps);
+ /* Loops on each batch of 16 search-keys. */
for (m = 0; m < N_MAPS; m++, mkeys += MAP_BITS, mrules += MAP_BITS) {
uint32_t hashes[MAP_BITS];
const struct cmap_node *nodes[MAP_BITS];
continue; /* Skip empty maps. */
}
- /* Compute hashes for the remaining keys. */
+ /* Compute hashes for the remaining keys. Each search-key is
+ * masked with the subtable's mask to avoid hashing the wildcarded
+ * bits. */
ULLONG_FOR_EACH_1(i, map) {
hashes[i] = netdev_flow_key_hash_in_mask(&mkeys[i],
&subtable->mask);
}
/* Lookup. */
map = cmap_find_batch(&subtable->rules, map, hashes, nodes);
- /* Check results. */
+ /* Check results. When the i-th bit of map is set, it means that a
+ * set of nodes with a matching hash value was found for the i-th
+ * search-key. Due to possible hash collisions we need to check
+ * which of the found rules, if any, really matches our masked
+ * search-key. */
ULLONG_FOR_EACH_1(i, map) {
struct dpcls_rule *rule;
CMAP_NODE_FOR_EACH (rule, cmap_node, nodes[i]) {
if (OVS_LIKELY(dpcls_rule_matches_key(rule, &mkeys[i]))) {
mrules[i] = rule;
+ /* Even at 20 Mpps the 32-bit hit_cnt cannot wrap
+ * within the one-second optimization interval. */
+ subtable->hit_cnt++;
+ lookups_match += subtable_pos;
goto next;
}
}
+ /* None of the found rules was a match. Reset the i-th bit to
+ * keep searching in the next subtable. */
ULLONG_SET0(map, i); /* Did not match. */
next:
; /* Keep Sparse happy. */
remains |= maps[m];
}
if (!remains) {
+ if (num_lookups_p) {
+ *num_lookups_p = lookups_match;
+ }
return true; /* All found. */
}
+ subtable_pos++;
+ }
+ if (num_lookups_p) {
+ *num_lookups_p = lookups_match;
}
return false; /* Some misses. */
}