/*
- * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016 Nicira, Inc.
+ * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016, 2017 Nicira, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <fcntl.h>
#include <inttypes.h>
#include <net/if.h>
+#include <sys/types.h>
#include <netinet/in.h>
#include <stdint.h>
#include <stdlib.h>
#include "fat-rwlock.h"
#include "flow.h"
#include "hmapx.h"
+#include "id-pool.h"
#include "latch.h"
#include "netdev.h"
#include "netdev-vport.h"
#include "ovs-numa.h"
#include "ovs-rcu.h"
#include "packets.h"
-#include "poll-loop.h"
+#include "openvswitch/poll-loop.h"
#include "pvector.h"
#include "random.h"
#include "seq.h"
#define FLOW_DUMP_MAX_BATCH 50
/* Use per thread recirc_depth to prevent recirculation loop. */
-#define MAX_RECIRC_DEPTH 5
+#define MAX_RECIRC_DEPTH 6
DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
/* Configuration parameters. */
enum { MAX_FLOWS = 65536 }; /* Maximum number of flows in flow table. */
+enum { MAX_METERS = 65536 }; /* Maximum number of meters. */
+enum { MAX_BANDS = 8 }; /* Maximum number of bands / meter. */
+enum { N_METER_LOCKS = 64 }; /* Number of locks for the 'meters' array. */
/* Protects against changes to 'dp_netdevs'. */
static struct ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER;
static struct vlog_rate_limit upcall_rl = VLOG_RATE_LIMIT_INIT(600, 600);
#define DP_NETDEV_CS_SUPPORTED_MASK (CS_NEW | CS_ESTABLISHED | CS_RELATED \
- | CS_INVALID | CS_REPLY_DIR | CS_TRACKED)
+ | CS_INVALID | CS_REPLY_DIR | CS_TRACKED \
+ | CS_SRC_NAT | CS_DST_NAT)
#define DP_NETDEV_CS_UNSUPPORTED_MASK (~(uint32_t)DP_NETDEV_CS_SUPPORTED_MASK)
static struct odp_support dp_netdev_support = {
+ .max_vlan_headers = SIZE_MAX,
.max_mpls_depth = SIZE_MAX,
.recirc = true,
.ct_state = true,
.ct_zone = true,
.ct_mark = true,
.ct_label = true,
+ .ct_state_nat = true,
+ .ct_orig_tuple = true,
+ .ct_orig_tuple6 = true,
};
/* Stores a miniflow with inline values */
#define EM_FLOW_HASH_MASK (EM_FLOW_HASH_ENTRIES - 1)
#define EM_FLOW_HASH_SEGS 2
+/* Default EMC insert probability is 1 / DEFAULT_EM_FLOW_INSERT_INV_PROB */
+#define DEFAULT_EM_FLOW_INSERT_INV_PROB 100
+#define DEFAULT_EM_FLOW_INSERT_MIN (UINT32_MAX / \
+ DEFAULT_EM_FLOW_INSERT_INV_PROB)
+
struct emc_entry {
struct dp_netdev_flow *flow;
struct netdev_flow_key key; /* key.hash used for emc hash value. */
/* Time in ms between successive optimizations of the dpcls subtable vector */
#define DPCLS_OPTIMIZATION_INTERVAL 1000
+/* Time in ms of the interval over which the rxq processing cycles used in
+ * rxq to pmd assignments are measured and stored. */
+#define PMD_RXQ_INTERVAL_LEN 10000
+
+/* Number of intervals for which cycles are stored
+ * and used during rxq to pmd assignment. */
+#define PMD_RXQ_INTERVAL_MAX 6
+
struct dpcls {
struct cmap_node node; /* Within dp_netdev_pmd_thread.classifiers */
odp_port_t in_port;
struct dpcls_rule **rules, size_t cnt,
int *num_lookups_p);
\f
+/* Set of supported meter flags */
+#define DP_SUPPORTED_METER_FLAGS_MASK \
+ (OFPMF13_STATS | OFPMF13_PKTPS | OFPMF13_KBPS | OFPMF13_BURST)
+
+/* Set of supported meter band types */
+#define DP_SUPPORTED_METER_BAND_TYPES \
+ ( 1 << OFPMBT13_DROP )
+
+struct dp_meter_band {
+ struct ofputil_meter_band up; /* type, prec_level, pad, rate, burst_size */
+ uint32_t bucket; /* In 1/1000 packets (for PKTPS), or in bits (for KBPS) */
+ uint64_t packet_count;
+ uint64_t byte_count;
+};
+
+struct dp_meter {
+ uint16_t flags;
+ uint16_t n_bands;
+ uint32_t max_delta_t;
+ uint64_t used;
+ uint64_t packet_count;
+ uint64_t byte_count;
+ struct dp_meter_band bands[];
+};
+
/* Datapath based on the network device interface from netdev.h.
*
*
struct hmap ports;
struct seq *port_seq; /* Incremented whenever a port changes. */
+ /* Meters. */
+ struct ovs_mutex meter_locks[N_METER_LOCKS];
+ struct dp_meter *meters[MAX_METERS]; /* Meter bands. */
+
+ /* Probability of EMC insertions is a factor of 'emc_insert_min'. */
+ OVS_ALIGNED_VAR(CACHE_LINE_SIZE) atomic_uint32_t emc_insert_min;
+
/* Protects access to ofproto-dpif-upcall interface during revalidator
* thread synchronization. */
struct fat_rwlock upcall_rwlock;
/* Stores all 'struct dp_netdev_pmd_thread's. */
struct cmap poll_threads;
+ /* id pool for per thread static_tx_qid. */
+ struct id_pool *tx_qid_pool;
+ struct ovs_mutex tx_qid_pool_mutex;
/* Protects the access of the 'struct dp_netdev_pmd_thread'
* instance for non-pmd thread. */
struct conntrack conntrack;
};
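+/* Acquires the lock that protects the meter with the given 'meter_id'. */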
+static void meter_lock(const struct dp_netdev *dp, uint32_t meter_id)
+ OVS_ACQUIRES(dp->meter_locks[meter_id % N_METER_LOCKS])
+{
+ ovs_mutex_lock(&dp->meter_locks[meter_id % N_METER_LOCKS]);
+}
+
+static void meter_unlock(const struct dp_netdev *dp, uint32_t meter_id)
+ OVS_RELEASES(dp->meter_locks[meter_id % N_METER_LOCKS])
+{
+ ovs_mutex_unlock(&dp->meter_locks[meter_id % N_METER_LOCKS]);
+}
+
+
static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
odp_port_t)
OVS_REQUIRES(dp->port_mutex);
};
enum pmd_cycles_counter_type {
- PMD_CYCLES_POLLING, /* Cycles spent polling NICs. */
- PMD_CYCLES_PROCESSING, /* Cycles spent processing packets */
+ PMD_CYCLES_IDLE, /* Cycles spent idle or unsuccessful polling */
+ PMD_CYCLES_PROCESSING, /* Cycles spent successfully polling and
+ * processing polled packets */
PMD_N_CYCLES
};
+enum rxq_cycles_counter_type {
+ RXQ_CYCLES_PROC_CURR, /* Cycles spent successfully polling and
+ processing packets during the current
+ interval. */
+ RXQ_CYCLES_PROC_HIST, /* Total cycles of all intervals that are used
+ during rxq to pmd assignment. */
+ RXQ_N_CYCLES
+};
+
#define XPS_TIMEOUT_MS 500LL
/* Contained by struct dp_netdev_port's 'rxqs' member. */
struct dp_netdev_rxq {
- struct netdev_rxq *rxq;
- unsigned core_id; /* Сore to which this queue is pinned. */
+ struct dp_netdev_port *port;
+ struct netdev_rxq *rx;
+ unsigned core_id; /* Core to which this queue should be
+ pinned. OVS_CORE_UNSPEC if the
+ queue doesn't need to be pinned to a
+ particular core. */
+ unsigned intrvl_idx; /* Write index for 'cycles_intrvl'. */
+ struct dp_netdev_pmd_thread *pmd; /* pmd thread that polls this queue. */
+
+ /* Counters of cycles spent successfully polling and processing pkts. */
+ atomic_ullong cycles[RXQ_N_CYCLES];
+ /* We store PMD_RXQ_INTERVAL_MAX intervals of data for an rxq and then
+ sum them to yield the cycles used for an rxq. */
+ atomic_ullong cycles_intrvl[PMD_RXQ_INTERVAL_MAX];
};
/* A port in a netdev-based datapath. */
struct dp_netdev_port {
odp_port_t port_no;
+ bool dynamic_txqs; /* If true XPS will be used. */
+ bool need_reconfigure; /* True if we should reconfigure netdev. */
struct netdev *netdev;
struct hmap_node node; /* Node in dp_netdev's 'ports'. */
struct netdev_saved_flags *sf;
struct dp_netdev_rxq *rxqs;
- unsigned n_rxq; /* Number of elements in 'rxq' */
- bool dynamic_txqs; /* If true XPS will be used. */
- unsigned *txq_used; /* Number of threads that uses each tx queue. */
+ unsigned n_rxq; /* Number of elements in 'rxqs' */
+ unsigned *txq_used; /* Number of threads that use each tx queue. */
struct ovs_mutex txq_used_mutex;
char *type; /* Port type as requested by user. */
char *rxq_affinity_list; /* Requested affinity of rx queues. */
static void dp_netdev_flow_unref(struct dp_netdev_flow *);
static bool dp_netdev_flow_ref(struct dp_netdev_flow *);
static int dpif_netdev_flow_from_nlattrs(const struct nlattr *, uint32_t,
- struct flow *);
+ struct flow *, bool);
/* A set of datapath actions within a "struct dp_netdev_flow".
*
atomic_ullong n[PMD_N_CYCLES];
};
+struct polled_queue {
+ struct dp_netdev_rxq *rxq;
+ odp_port_t port_no;
+};
+
/* Contained by struct dp_netdev_pmd_thread's 'poll_list' member. */
struct rxq_poll {
- struct dp_netdev_port *port;
- struct netdev_rxq *rx;
- struct ovs_list node;
+ struct dp_netdev_rxq *rxq;
+ struct hmap_node node;
};
/* Contained by struct dp_netdev_pmd_thread's 'send_port_cache',
* I/O of all non-pmd threads. There will be no actual thread created
* for the instance.
*
- * Each struct has its own flow table and classifier. Packets received
- * from managed ports are looked up in the corresponding pmd thread's
- * flow table, and are executed with the found actions.
+ * Each struct has its own flow cache and classifier per managed ingress port.
+ * For packets received on an ingress port, a lookup is done in the
+ * corresponding pmd thread's flow cache and, in case of a miss, in the
+ * classifier for that port. In either case the packets are executed with the
+ * found actions.
* */
struct dp_netdev_pmd_thread {
struct dp_netdev *dp;
struct cmap classifiers;
/* Periodically sort subtable vectors according to hit frequencies */
long long int next_optimization;
+ /* End of the next time interval for which processing cycles
+ are stored for each polled rxq. */
+ long long int rxq_next_cycle_store;
/* Statistics. */
struct dp_netdev_pmd_stats stats;
/* Queue id used by this pmd thread to send packets on all netdevs if
* XPS disabled for this netdev. All static_tx_qid's are unique and less
- * than 'ovs_numa_get_n_cores() + 1'. */
- atomic_int static_tx_qid;
+ * than 'cmap_count(dp->poll_threads)'. */
+ uint32_t static_tx_qid;
struct ovs_mutex port_mutex; /* Mutex for 'poll_list' and 'tx_ports'. */
/* List of rx queues to poll. */
- struct ovs_list poll_list OVS_GUARDED;
- /* Number of elements in 'poll_list' */
- int poll_cnt;
+ struct hmap poll_list OVS_GUARDED;
/* Map of 'tx_port's used for transmission. Written by the main thread,
* read by the pmd thread. */
struct hmap tx_ports OVS_GUARDED;
* reporting to the user */
unsigned long long stats_zero[DP_N_STATS];
uint64_t cycles_zero[PMD_N_CYCLES];
+
+ /* Set to true if the pmd thread needs to be reloaded. */
+ bool need_reload;
};
/* Interface to netdev-based datapath. */
static void dp_netdev_set_nonpmd(struct dp_netdev *dp)
OVS_REQUIRES(dp->port_mutex);
+static void *pmd_thread_main(void *);
static struct dp_netdev_pmd_thread *dp_netdev_get_pmd(struct dp_netdev *dp,
unsigned core_id);
static struct dp_netdev_pmd_thread *
dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
-static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
-static void dp_netdev_stop_pmds(struct dp_netdev *dp);
-static void dp_netdev_start_pmds(struct dp_netdev *dp)
- OVS_REQUIRES(dp->port_mutex);
+static void dp_netdev_del_pmd(struct dp_netdev *dp,
+ struct dp_netdev_pmd_thread *pmd);
+static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp, bool non_pmd);
static void dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd);
-static void dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
- struct dp_netdev_port *port);
-static void dp_netdev_add_port_to_pmds(struct dp_netdev *dp,
- struct dp_netdev_port *port);
static void dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
- struct dp_netdev_port *port);
+ struct dp_netdev_port *port)
+ OVS_REQUIRES(pmd->port_mutex);
+static void dp_netdev_del_port_tx_from_pmd(struct dp_netdev_pmd_thread *pmd,
+ struct tx_port *tx)
+ OVS_REQUIRES(pmd->port_mutex);
static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
- struct dp_netdev_port *port,
- struct netdev_rxq *rx);
-static struct dp_netdev_pmd_thread *
-dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id);
-static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
- OVS_REQUIRES(dp->port_mutex);
-static void reconfigure_pmd_threads(struct dp_netdev *dp)
+ struct dp_netdev_rxq *rxq)
+ OVS_REQUIRES(pmd->port_mutex);
+static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
+ struct rxq_poll *poll)
+ OVS_REQUIRES(pmd->port_mutex);
+static void reconfigure_datapath(struct dp_netdev *dp)
OVS_REQUIRES(dp->port_mutex);
static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd);
static void pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
OVS_REQUIRES(pmd->port_mutex);
static inline void
-dp_netdev_pmd_try_optimize(struct dp_netdev_pmd_thread *pmd);
-
+dp_netdev_pmd_try_optimize(struct dp_netdev_pmd_thread *pmd,
+ struct polled_queue *poll_list, int poll_cnt);
+static void
+dp_netdev_rxq_set_cycles(struct dp_netdev_rxq *rx,
+ enum rxq_cycles_counter_type type,
+ unsigned long long cycles);
+static uint64_t
+dp_netdev_rxq_get_cycles(struct dp_netdev_rxq *rx,
+ enum rxq_cycles_counter_type type);
+static void
+dp_netdev_rxq_set_intrvl_cycles(struct dp_netdev_rxq *rx,
+ unsigned long long cycles);
+static uint64_t
+dp_netdev_rxq_get_intrvl_cycles(struct dp_netdev_rxq *rx, unsigned idx);
static void
dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
long long now, bool purge);
static inline bool emc_entry_alive(struct emc_entry *ce);
static void emc_clear_entry(struct emc_entry *ce);
+static void dp_netdev_request_reconfigure(struct dp_netdev *dp);
+
static void
emc_cache_init(struct emc_cache *flow_cache)
{
unsigned long long stats[DP_N_STATS],
uint64_t cycles[PMD_N_CYCLES])
{
- unsigned long long total_packets = 0;
+ unsigned long long total_packets;
uint64_t total_cycles = 0;
int i;
} else {
stats[i] = 0;
}
-
- if (i != DP_STAT_LOST) {
- /* Lost packets are already included in DP_STAT_MISS */
- total_packets += stats[i];
- }
}
+ /* The sum of the matched and unmatched packets gives the total. */
+ total_packets = stats[DP_STAT_EXACT_HIT] + stats[DP_STAT_MASKED_HIT]
+ + stats[DP_STAT_MISS];
+
for (i = 0; i < PMD_N_CYCLES; i++) {
if (cycles[i] > pmd->cycles_zero[i]) {
cycles[i] -= pmd->cycles_zero[i];
}
ds_put_format(reply,
- "\tpolling cycles:%"PRIu64" (%.02f%%)\n"
+ "\tidle cycles:%"PRIu64" (%.02f%%)\n"
"\tprocessing cycles:%"PRIu64" (%.02f%%)\n",
- cycles[PMD_CYCLES_POLLING],
- cycles[PMD_CYCLES_POLLING] / (double)total_cycles * 100,
+ cycles[PMD_CYCLES_IDLE],
+ cycles[PMD_CYCLES_IDLE] / (double)total_cycles * 100,
cycles[PMD_CYCLES_PROCESSING],
cycles[PMD_CYCLES_PROCESSING] / (double)total_cycles * 100);
}
}
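+/* qsort comparator for 'struct rxq_poll' entries: orders them by netdev name
+ * first and by queue id second. */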
+static int
+compare_poll_list(const void *a_, const void *b_)
+{
+ const struct rxq_poll *a = a_;
+ const struct rxq_poll *b = b_;
+
+ const char *namea = netdev_rxq_get_name(a->rxq->rx);
+ const char *nameb = netdev_rxq_get_name(b->rxq->rx);
+
+ int cmp = strcmp(namea, nameb);
+ if (!cmp) {
+ return netdev_rxq_get_queue_id(a->rxq->rx)
+ - netdev_rxq_get_queue_id(b->rxq->rx);
+ } else {
+ return cmp;
+ }
+}
+
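+/* Stores in '*list' a sorted copy of 'pmd's poll list and in '*n' the number
+ * of entries. The caller is responsible for freeing '*list'. */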
+static void
+sorted_poll_list(struct dp_netdev_pmd_thread *pmd, struct rxq_poll **list,
+ size_t *n)
+{
+ struct rxq_poll *ret, *poll;
+ size_t i;
+
+ *n = hmap_count(&pmd->poll_list);
+ if (!*n) {
+ ret = NULL;
+ } else {
+ ret = xcalloc(*n, sizeof *ret);
+ i = 0;
+ HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
+ ret[i] = *poll;
+ i++;
+ }
+ ovs_assert(i == *n);
+ qsort(ret, *n, sizeof *ret, compare_poll_list);
+ }
+
+ *list = ret;
+}
+
static void
pmd_info_show_rxq(struct ds *reply, struct dp_netdev_pmd_thread *pmd)
{
if (pmd->core_id != NON_PMD_CORE_ID) {
- struct rxq_poll *poll;
const char *prev_name = NULL;
+ struct rxq_poll *list;
+ size_t i, n;
ds_put_format(reply,
"pmd thread numa_id %d core_id %u:\n\tisolated : %s\n",
? "true" : "false");
ovs_mutex_lock(&pmd->port_mutex);
- LIST_FOR_EACH (poll, node, &pmd->poll_list) {
- const char *name = netdev_get_name(poll->port->netdev);
+ sorted_poll_list(pmd, &list, &n);
+ for (i = 0; i < n; i++) {
+ const char *name = netdev_rxq_get_name(list[i].rxq->rx);
if (!prev_name || strcmp(name, prev_name)) {
if (prev_name) {
ds_put_cstr(reply, "\n");
}
- ds_put_format(reply, "\tport: %s\tqueue-id:",
- netdev_get_name(poll->port->netdev));
+ ds_put_format(reply, "\tport: %s\tqueue-id:", name);
}
- ds_put_format(reply, " %d", netdev_rxq_get_queue_id(poll->rx));
+ ds_put_format(reply, " %d",
+ netdev_rxq_get_queue_id(list[i].rxq->rx));
prev_name = name;
}
ovs_mutex_unlock(&pmd->port_mutex);
ds_put_cstr(reply, "\n");
+ free(list);
+ }
+}
+
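+/* qsort comparator that orders pmd threads by their core id. */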
+static int
+compare_poll_thread_list(const void *a_, const void *b_)
+{
+ const struct dp_netdev_pmd_thread *a, *b;
+
+ a = *(struct dp_netdev_pmd_thread **)a_;
+ b = *(struct dp_netdev_pmd_thread **)b_;
+
+ if (a->core_id < b->core_id) {
+ return -1;
+ }
+ if (a->core_id > b->core_id) {
+ return 1;
+ }
+ return 0;
+}
+
+/* Create a sorted list of pmd's from the dp->poll_threads cmap. We can use
+ * this list, as long as we do not go to quiescent state. */
+static void
+sorted_poll_thread_list(struct dp_netdev *dp,
+ struct dp_netdev_pmd_thread ***list,
+ size_t *n)
+{
+ struct dp_netdev_pmd_thread *pmd;
+ struct dp_netdev_pmd_thread **pmd_list;
+ size_t k = 0, n_pmds;
+
+ n_pmds = cmap_count(&dp->poll_threads);
+ pmd_list = xcalloc(n_pmds, sizeof *pmd_list);
+
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ if (k >= n_pmds) {
+ break;
+ }
+ pmd_list[k++] = pmd;
+ }
+
+ qsort(pmd_list, k, sizeof *pmd_list, compare_poll_thread_list);
+
+ *list = pmd_list;
+ *n = k;
+}
+
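+/* Handler for the "dpif-netdev/pmd-rxq-rebalance" unixctl command: requests
+ * a datapath reconfiguration so that the rxq to pmd assignments are
+ * recalculated. */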
+static void
+dpif_netdev_pmd_rebalance(struct unixctl_conn *conn, int argc,
+ const char *argv[], void *aux OVS_UNUSED)
+{
+ struct ds reply = DS_EMPTY_INITIALIZER;
+ struct dp_netdev *dp = NULL;
+
+ ovs_mutex_lock(&dp_netdev_mutex);
+
+ if (argc == 2) {
+ dp = shash_find_data(&dp_netdevs, argv[1]);
+ } else if (shash_count(&dp_netdevs) == 1) {
+ /* There's only one datapath */
+ dp = shash_first(&dp_netdevs)->data;
+ }
+
+ if (!dp) {
+ ovs_mutex_unlock(&dp_netdev_mutex);
+ unixctl_command_reply_error(conn,
+ "please specify an existing datapath");
+ return;
}
+
+ dp_netdev_request_reconfigure(dp);
+ ovs_mutex_unlock(&dp_netdev_mutex);
+ ds_put_cstr(&reply, "pmd rxq rebalance requested.\n");
+ unixctl_command_reply(conn, ds_cstr(&reply));
+ ds_destroy(&reply);
}
static void
void *aux)
{
struct ds reply = DS_EMPTY_INITIALIZER;
- struct dp_netdev_pmd_thread *pmd;
+ struct dp_netdev_pmd_thread **pmd_list;
struct dp_netdev *dp = NULL;
+ size_t n;
enum pmd_info_type type = *(enum pmd_info_type *) aux;
ovs_mutex_lock(&dp_netdev_mutex);
return;
}
- CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ sorted_poll_thread_list(dp, &pmd_list, &n);
+ for (size_t i = 0; i < n; i++) {
+ struct dp_netdev_pmd_thread *pmd = pmd_list[i];
+ if (!pmd) {
+ break;
+ }
+
if (type == PMD_INFO_SHOW_RXQ) {
pmd_info_show_rxq(&reply, pmd);
} else {
unsigned long long stats[DP_N_STATS];
uint64_t cycles[PMD_N_CYCLES];
- int i;
/* Read current stats and cycle counters */
- for (i = 0; i < ARRAY_SIZE(stats); i++) {
- atomic_read_relaxed(&pmd->stats.n[i], &stats[i]);
+ for (size_t j = 0; j < ARRAY_SIZE(stats); j++) {
+ atomic_read_relaxed(&pmd->stats.n[j], &stats[j]);
}
- for (i = 0; i < ARRAY_SIZE(cycles); i++) {
- atomic_read_relaxed(&pmd->cycles.n[i], &cycles[i]);
+ for (size_t j = 0; j < ARRAY_SIZE(cycles); j++) {
+ atomic_read_relaxed(&pmd->cycles.n[j], &cycles[j]);
}
if (type == PMD_INFO_CLEAR_STATS) {
}
}
}
+ free(pmd_list);
ovs_mutex_unlock(&dp_netdev_mutex);
unixctl_command_register("dpif-netdev/pmd-rxq-show", "[dp]",
0, 1, dpif_netdev_pmd_info,
(void *)&poll_aux);
+ unixctl_command_register("dpif-netdev/pmd-rxq-rebalance", "[dp]",
+ 0, 1, dpif_netdev_pmd_rebalance,
+ NULL);
return 0;
}
dp->reconfigure_seq = seq_create();
dp->last_reconfigure_seq = seq_read(dp->reconfigure_seq);
+ for (int i = 0; i < N_METER_LOCKS; ++i) {
+ ovs_mutex_init_adaptive(&dp->meter_locks[i]);
+ }
+
/* Disable upcalls by default. */
dp_netdev_disable_upcall(dp);
dp->upcall_aux = NULL;
conntrack_init(&dp->conntrack);
+ atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
+
cmap_init(&dp->poll_threads);
+
+ ovs_mutex_init(&dp->tx_qid_pool_mutex);
+ /* We need 1 Tx queue for each possible core + 1 for non-PMD threads. */
+ dp->tx_qid_pool = id_pool_create(0, ovs_numa_get_n_cores() + 1);
+
ovs_mutex_init_recursive(&dp->non_pmd_mutex);
ovsthread_key_create(&dp->per_pmd_key, NULL);
ovs_mutex_lock(&dp->port_mutex);
+ /* The non-pmd thread will be created before all other threads and will
+ * allocate static_tx_qid = 0. */
dp_netdev_set_nonpmd(dp);
error = do_add_port(dp, name, dpif_netdev_port_open_type(dp->class,
fat_rwlock_destroy(&dp->upcall_rwlock);
}
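+/* Frees the meter with the given 'meter_id', if one exists. The caller must
+ * hold the corresponding meter lock. */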
+static void
+dp_delete_meter(struct dp_netdev *dp, uint32_t meter_id)
+ OVS_REQUIRES(dp->meter_locks[meter_id % N_METER_LOCKS])
+{
+ if (dp->meters[meter_id]) {
+ free(dp->meters[meter_id]);
+ dp->meters[meter_id] = NULL;
+ }
+}
+
/* Requires dp_netdev_mutex so that we can't get a new reference to 'dp'
* through the 'dp_netdevs' shash while freeing 'dp'. */
static void
do_del_port(dp, port);
}
ovs_mutex_unlock(&dp->port_mutex);
- dp_netdev_destroy_all_pmds(dp);
+
+ dp_netdev_destroy_all_pmds(dp, true);
cmap_destroy(&dp->poll_threads);
+ ovs_mutex_destroy(&dp->tx_qid_pool_mutex);
+ id_pool_destroy(dp->tx_qid_pool);
+
ovs_mutex_destroy(&dp->non_pmd_mutex);
ovsthread_key_delete(dp->per_pmd_key);
/* Upcalls must be disabled at this point */
dp_netdev_destroy_upcall_lock(dp);
+ int i;
+
+ for (i = 0; i < MAX_METERS; ++i) {
+ meter_lock(dp, i);
+ dp_delete_meter(dp, i);
+ meter_unlock(dp, i);
+ }
+ for (i = 0; i < N_METER_LOCKS; ++i) {
+ ovs_mutex_destroy(&dp->meter_locks[i]);
+ }
+
free(dp->pmd_cmask);
free(CONST_CAST(char *, dp->name));
free(dp);
struct dp_netdev_port *port;
enum netdev_flags flags;
struct netdev *netdev;
- int n_open_rxqs = 0;
- int n_cores = 0;
- int i, error;
- bool dynamic_txqs = false;
+ int error;
*portp = NULL;
goto out;
}
- if (netdev_is_pmd(netdev)) {
- n_cores = ovs_numa_get_n_cores();
-
- if (n_cores == OVS_CORE_UNSPEC) {
- VLOG_ERR("%s, cannot get cpu core info", devname);
- error = ENOENT;
- goto out;
- }
- /* There can only be ovs_numa_get_n_cores() pmd threads,
- * so creates a txq for each, and one extra for the non
- * pmd threads. */
- error = netdev_set_tx_multiq(netdev, n_cores + 1);
- if (error && (error != EOPNOTSUPP)) {
- VLOG_ERR("%s, cannot set multiq", devname);
- goto out;
- }
- }
-
- if (netdev_is_reconf_required(netdev)) {
- error = netdev_reconfigure(netdev);
- if (error) {
- goto out;
- }
- }
-
- if (netdev_is_pmd(netdev)) {
- if (netdev_n_txq(netdev) < n_cores + 1) {
- dynamic_txqs = true;
- }
+ error = netdev_turn_flags_on(netdev, NETDEV_PROMISC, &sf);
+ if (error) {
+ VLOG_ERR("%s: cannot set promisc flag", devname);
+ goto out;
}
port = xzalloc(sizeof *port);
port->port_no = port_no;
port->netdev = netdev;
- port->n_rxq = netdev_n_rxq(netdev);
- port->rxqs = xcalloc(port->n_rxq, sizeof *port->rxqs);
- port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
port->type = xstrdup(type);
- ovs_mutex_init(&port->txq_used_mutex);
- port->dynamic_txqs = dynamic_txqs;
-
- for (i = 0; i < port->n_rxq; i++) {
- error = netdev_rxq_open(netdev, &port->rxqs[i].rxq, i);
- if (error) {
- VLOG_ERR("%s: cannot receive packets on this network device (%s)",
- devname, ovs_strerror(errno));
- goto out_rxq_close;
- }
- port->rxqs[i].core_id = OVS_CORE_UNSPEC;
- n_open_rxqs++;
- }
-
- error = netdev_turn_flags_on(netdev, NETDEV_PROMISC, &sf);
- if (error) {
- goto out_rxq_close;
- }
port->sf = sf;
+ port->need_reconfigure = true;
+ ovs_mutex_init(&port->txq_used_mutex);
*portp = port;
return 0;
-out_rxq_close:
- for (i = 0; i < n_open_rxqs; i++) {
- netdev_rxq_close(port->rxqs[i].rxq);
- }
- ovs_mutex_destroy(&port->txq_used_mutex);
- free(port->type);
- free(port->txq_used);
- free(port->rxqs);
- free(port);
-
out:
netdev_close(netdev);
return error;
return error;
}
- if (netdev_is_pmd(port->netdev)) {
- dp_netdev_start_pmds(dp);
- }
-
- dp_netdev_add_port_to_pmds(dp, port);
-
hmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
seq_change(dp->port_seq);
+ reconfigure_datapath(dp);
+
return 0;
}
netdev_restore_flags(port->sf);
for (unsigned i = 0; i < port->n_rxq; i++) {
- netdev_rxq_close(port->rxqs[i].rxq);
+ netdev_rxq_close(port->rxqs[i].rx);
}
ovs_mutex_destroy(&port->txq_used_mutex);
free(port->rxq_affinity_list);
return ENODEV;
}
-static int
-get_n_pmd_threads(struct dp_netdev *dp)
-{
- /* There is one non pmd thread in dp->poll_threads */
- return cmap_count(&dp->poll_threads) - 1;
-}
-
/* Returns 'true' if there is a port with pmd netdev. */
static bool
has_pmd_port(struct dp_netdev *dp)
return false;
}
-
static void
do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
OVS_REQUIRES(dp->port_mutex)
hmap_remove(&dp->ports, &port->node);
seq_change(dp->port_seq);
- dp_netdev_del_port_from_all_pmds(dp, port);
-
- if (netdev_is_pmd(port->netdev)) {
- /* If there is no pmd netdev, delete the pmd threads */
- if (!has_pmd_port(dp)) {
- dp_netdev_stop_pmds(dp);
- }
- }
+ reconfigure_datapath(dp);
port_destroy(port);
}
offsetof(struct netdev_flow_key, mf) + src->len);
}
-/* Slow. */
-static void
-netdev_flow_key_from_flow(struct netdev_flow_key *dst,
- const struct flow *src)
-{
- struct dp_packet packet;
- uint64_t buf_stub[512 / 8];
-
- dp_packet_use_stub(&packet, buf_stub, sizeof buf_stub);
- pkt_metadata_from_flow(&packet.md, src);
- flow_compose(&packet, src);
- miniflow_extract(&packet, &dst->mf);
- dp_packet_uninit(&packet);
-
- dst->len = netdev_flow_key_size(miniflow_n_values(&dst->mf));
- dst->hash = 0; /* Not computed yet. */
-}
-
/* Initialize a netdev_flow_key 'mask' from 'match'. */
static inline void
netdev_flow_mask_init(struct netdev_flow_key *mask,
emc_change_entry(to_be_replaced, flow, key);
}
+static inline void
+emc_probabilistic_insert(struct dp_netdev_pmd_thread *pmd,
+ const struct netdev_flow_key *key,
+ struct dp_netdev_flow *flow)
+{
+ /* Insert an entry into the EMC based on probability value 'min'. By
+ * default the value is UINT32_MAX / 100, which yields an insertion
+ * probability of 1/100, i.e. 1%. */
+
+ uint32_t min;
+ atomic_read_relaxed(&pmd->dp->emc_insert_min, &min);
+
+ if (min && random_uint32() <= min) {
+ emc_insert(&pmd->flow_cache, key, flow);
+ }
+}
+
static inline struct dp_netdev_flow *
emc_lookup(struct emc_cache *cache, const struct netdev_flow_key *key)
{
/* If a UFID is not provided, determine one based on the key. */
if (!ufidp && key && key_len
- && !dpif_netdev_flow_from_nlattrs(key, key_len, &flow)) {
+ && !dpif_netdev_flow_from_nlattrs(key, key_len, &flow, false)) {
dpif_flow_hash(pmd->dp->dpif, &flow, sizeof flow, &ufid);
ufidp = &ufid;
}
dpif_netdev_mask_from_nlattrs(const struct nlattr *key, uint32_t key_len,
const struct nlattr *mask_key,
uint32_t mask_key_len, const struct flow *flow,
- struct flow_wildcards *wc)
+ struct flow_wildcards *wc, bool probe)
{
enum odp_key_fitness fitness;
fitness = odp_flow_key_to_mask(mask_key, mask_key_len, wc, flow);
if (fitness) {
- /* This should not happen: it indicates that
- * odp_flow_key_from_mask() and odp_flow_key_to_mask()
- * disagree on the acceptable form of a mask. Log the problem
- * as an error, with enough details to enable debugging. */
- static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
-
- if (!VLOG_DROP_ERR(&rl)) {
- struct ds s;
-
- ds_init(&s);
- odp_flow_format(key, key_len, mask_key, mask_key_len, NULL, &s,
- true);
- VLOG_ERR("internal error parsing flow mask %s (%s)",
- ds_cstr(&s), odp_key_fitness_to_string(fitness));
- ds_destroy(&s);
+ if (!probe) {
+ /* This should not happen: it indicates that
+ * odp_flow_key_from_mask() and odp_flow_key_to_mask()
+ * disagree on the acceptable form of a mask. Log the problem
+ * as an error, with enough details to enable debugging. */
+ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+
+ if (!VLOG_DROP_ERR(&rl)) {
+ struct ds s;
+
+ ds_init(&s);
+ odp_flow_format(key, key_len, mask_key, mask_key_len, NULL, &s,
+ true);
+ VLOG_ERR("internal error parsing flow mask %s (%s)",
+ ds_cstr(&s), odp_key_fitness_to_string(fitness));
+ ds_destroy(&s);
+ }
}
return EINVAL;
static int
dpif_netdev_flow_from_nlattrs(const struct nlattr *key, uint32_t key_len,
- struct flow *flow)
+ struct flow *flow, bool probe)
{
- odp_port_t in_port;
-
if (odp_flow_key_to_flow(key, key_len, flow)) {
- /* This should not happen: it indicates that odp_flow_key_from_flow()
- * and odp_flow_key_to_flow() disagree on the acceptable form of a
- * flow. Log the problem as an error, with enough details to enable
- * debugging. */
- static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
-
- if (!VLOG_DROP_ERR(&rl)) {
- struct ds s;
-
- ds_init(&s);
- odp_flow_format(key, key_len, NULL, 0, NULL, &s, true);
- VLOG_ERR("internal error parsing flow key %s", ds_cstr(&s));
- ds_destroy(&s);
+ if (!probe) {
+ /* This should not happen: it indicates that
+ * odp_flow_key_from_flow() and odp_flow_key_to_flow() disagree on
+ * the acceptable form of a flow. Log the problem as an error,
+ * with enough details to enable debugging. */
+ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+
+ if (!VLOG_DROP_ERR(&rl)) {
+ struct ds s;
+
+ ds_init(&s);
+ odp_flow_format(key, key_len, NULL, 0, NULL, &s, true);
+ VLOG_ERR("internal error parsing flow key %s", ds_cstr(&s));
+ ds_destroy(&s);
+ }
}
return EINVAL;
}
- in_port = flow->in_port.odp_port;
- if (!is_valid_port_number(in_port) && in_port != ODPP_NONE) {
- return EINVAL;
- }
-
if (flow->ct_state & DP_NETDEV_CS_UNSUPPORTED_MASK) {
return EINVAL;
}
cmap_insert(&pmd->flow_table, CONST_CAST(struct cmap_node *, &flow->node),
dp_netdev_flow_hash(&flow->ufid));
- if (OVS_UNLIKELY(VLOG_IS_DBG_ENABLED())) {
+ if (OVS_UNLIKELY(!VLOG_DROP_DBG((&upcall_rl)))) {
struct ds ds = DS_EMPTY_INITIALIZER;
struct ofpbuf key_buf, mask_buf;
struct odp_flow_key_parms odp_parms = {
mask_buf.data, mask_buf.size,
NULL, &ds, false);
ds_put_cstr(&ds, ", actions:");
- format_odp_actions(&ds, actions, actions_len);
+ format_odp_actions(&ds, actions, actions_len, NULL);
- VLOG_DBG_RL(&upcall_rl, "%s", ds_cstr(&ds));
+ VLOG_DBG("%s", ds_cstr(&ds));
ofpbuf_uninit(&key_buf);
ofpbuf_uninit(&mask_buf);
+
+ /* Add a printout of the actual match installed. */
+ struct match m;
+ ds_clear(&ds);
+ ds_put_cstr(&ds, "flow match: ");
+ miniflow_expand(&flow->cr.flow.mf, &m.flow);
+ miniflow_expand(&flow->cr.mask->mf, &m.wc.masks);
+ memset(&m.tun_md, 0, sizeof m.tun_md);
+ match_format(&m, NULL, &ds, OFP_DEFAULT_PRIORITY);
+
+ VLOG_DBG("%s", ds_cstr(&ds));
+
ds_destroy(&ds);
}
}
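+/* Adds or modifies the flow described by 'put' on a single pmd thread,
+ * using the precomputed 'key', 'match' and 'ufid'. Fills 'stats' if it is
+ * nonnull. */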
static int
-dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
+flow_put_on_pmd(struct dp_netdev_pmd_thread *pmd,
+ struct netdev_flow_key *key,
+ struct match *match,
+ ovs_u128 *ufid,
+ const struct dpif_flow_put *put,
+ struct dpif_flow_stats *stats)
{
- struct dp_netdev *dp = get_dp_netdev(dpif);
struct dp_netdev_flow *netdev_flow;
- struct netdev_flow_key key;
- struct dp_netdev_pmd_thread *pmd;
- struct match match;
- ovs_u128 ufid;
- unsigned pmd_id = put->pmd_id == PMD_ID_NULL
- ? NON_PMD_CORE_ID : put->pmd_id;
- int error;
-
- error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &match.flow);
- if (error) {
- return error;
- }
- error = dpif_netdev_mask_from_nlattrs(put->key, put->key_len,
- put->mask, put->mask_len,
- &match.flow, &match.wc);
- if (error) {
- return error;
- }
-
- pmd = dp_netdev_get_pmd(dp, pmd_id);
- if (!pmd) {
- return EINVAL;
- }
-
- /* Must produce a netdev_flow_key for lookup.
- * This interface is no longer performance critical, since it is not used
- * for upcall processing any more. */
- netdev_flow_key_from_flow(&key, &match.flow);
+ int error = 0;
- if (put->ufid) {
- ufid = *put->ufid;
- } else {
- dpif_flow_hash(dpif, &match.flow, sizeof match.flow, &ufid);
+ if (stats) {
+ memset(stats, 0, sizeof *stats);
}
ovs_mutex_lock(&pmd->flow_mutex);
- netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &key, NULL);
+ netdev_flow = dp_netdev_pmd_lookup_flow(pmd, key, NULL);
if (!netdev_flow) {
if (put->flags & DPIF_FP_CREATE) {
if (cmap_count(&pmd->flow_table) < MAX_FLOWS) {
- if (put->stats) {
- memset(put->stats, 0, sizeof *put->stats);
- }
- dp_netdev_flow_add(pmd, &match, &ufid, put->actions,
+ dp_netdev_flow_add(pmd, match, ufid, put->actions,
put->actions_len);
error = 0;
} else {
error = ENOENT;
}
} else {
- if (put->flags & DPIF_FP_MODIFY
- && flow_equal(&match.flow, &netdev_flow->flow)) {
+ if (put->flags & DPIF_FP_MODIFY) {
struct dp_netdev_actions *new_actions;
struct dp_netdev_actions *old_actions;
old_actions = dp_netdev_flow_get_actions(netdev_flow);
ovsrcu_set(&netdev_flow->actions, new_actions);
- if (put->stats) {
- get_dpif_flow_stats(netdev_flow, put->stats);
+ if (stats) {
+ get_dpif_flow_stats(netdev_flow, stats);
}
if (put->flags & DPIF_FP_ZERO_STATS) {
/* XXX: The userspace datapath uses thread local statistics
}
}
ovs_mutex_unlock(&pmd->flow_mutex);
- dp_netdev_pmd_unref(pmd);
-
return error;
}
static int
-dpif_netdev_flow_del(struct dpif *dpif, const struct dpif_flow_del *del)
+dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
{
struct dp_netdev *dp = get_dp_netdev(dpif);
- struct dp_netdev_flow *netdev_flow;
+ struct netdev_flow_key key, mask;
struct dp_netdev_pmd_thread *pmd;
- unsigned pmd_id = del->pmd_id == PMD_ID_NULL
- ? NON_PMD_CORE_ID : del->pmd_id;
- int error = 0;
-
- pmd = dp_netdev_get_pmd(dp, pmd_id);
- if (!pmd) {
- return EINVAL;
+ struct match match;
+ ovs_u128 ufid;
+ int error;
+ bool probe = put->flags & DPIF_FP_PROBE;
+
+ if (put->stats) {
+ memset(put->stats, 0, sizeof *put->stats);
+ }
+ error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &match.flow,
+ probe);
+ if (error) {
+ return error;
+ }
+ error = dpif_netdev_mask_from_nlattrs(put->key, put->key_len,
+ put->mask, put->mask_len,
+ &match.flow, &match.wc, probe);
+ if (error) {
+ return error;
+ }
+
+ if (put->ufid) {
+ ufid = *put->ufid;
+ } else {
+ dpif_flow_hash(dpif, &match.flow, sizeof match.flow, &ufid);
+ }
+
+ /* Must produce a netdev_flow_key for lookup.
+ * Use the same method as employed to create the key when adding
+ * the flow to the dpcls, to make sure they match. */
+ netdev_flow_mask_init(&mask, &match);
+ netdev_flow_key_init_masked(&key, &match.flow, &mask);
+
+ if (put->pmd_id == PMD_ID_NULL) {
+ if (cmap_count(&dp->poll_threads) == 0) {
+ return EINVAL;
+ }
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ struct dpif_flow_stats pmd_stats;
+ int pmd_error;
+
+ pmd_error = flow_put_on_pmd(pmd, &key, &match, &ufid, put,
+ &pmd_stats);
+ if (pmd_error) {
+ error = pmd_error;
+ } else if (put->stats) {
+ put->stats->n_packets += pmd_stats.n_packets;
+ put->stats->n_bytes += pmd_stats.n_bytes;
+ put->stats->used = MAX(put->stats->used, pmd_stats.used);
+ put->stats->tcp_flags |= pmd_stats.tcp_flags;
+ }
+ }
+ } else {
+ pmd = dp_netdev_get_pmd(dp, put->pmd_id);
+ if (!pmd) {
+ return EINVAL;
+ }
+ error = flow_put_on_pmd(pmd, &key, &match, &ufid, put, put->stats);
+ dp_netdev_pmd_unref(pmd);
}
+ return error;
+}
+
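+/* Deletes the flow identified by 'del' from a single pmd thread. If the flow
+ * is found and 'stats' is nonnull, fills 'stats' with the flow's statistics
+ * before removal. */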
+static int
+flow_del_on_pmd(struct dp_netdev_pmd_thread *pmd,
+ struct dpif_flow_stats *stats,
+ const struct dpif_flow_del *del)
+{
+ struct dp_netdev_flow *netdev_flow;
+ int error = 0;
+
ovs_mutex_lock(&pmd->flow_mutex);
netdev_flow = dp_netdev_pmd_find_flow(pmd, del->ufid, del->key,
del->key_len);
if (netdev_flow) {
- if (del->stats) {
- get_dpif_flow_stats(netdev_flow, del->stats);
+ if (stats) {
+ get_dpif_flow_stats(netdev_flow, stats);
}
dp_netdev_pmd_remove_flow(pmd, netdev_flow);
} else {
error = ENOENT;
}
ovs_mutex_unlock(&pmd->flow_mutex);
- dp_netdev_pmd_unref(pmd);
+
+ return error;
+}
+
+static int
+dpif_netdev_flow_del(struct dpif *dpif, const struct dpif_flow_del *del)
+{
+ struct dp_netdev *dp = get_dp_netdev(dpif);
+ struct dp_netdev_pmd_thread *pmd;
+ int error = 0;
+
+ if (del->stats) {
+ memset(del->stats, 0, sizeof *del->stats);
+ }
+
+ if (del->pmd_id == PMD_ID_NULL) {
+ if (cmap_count(&dp->poll_threads) == 0) {
+ return EINVAL;
+ }
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ struct dpif_flow_stats pmd_stats;
+ int pmd_error;
+
+ pmd_error = flow_del_on_pmd(pmd, &pmd_stats, del);
+ if (pmd_error) {
+ error = pmd_error;
+ } else if (del->stats) {
+ del->stats->n_packets += pmd_stats.n_packets;
+ del->stats->n_bytes += pmd_stats.n_bytes;
+ del->stats->used = MAX(del->stats->used, pmd_stats.used);
+ del->stats->tcp_flags |= pmd_stats.tcp_flags;
+ }
+ }
+ } else {
+ pmd = dp_netdev_get_pmd(dp, del->pmd_id);
+ if (!pmd) {
+ return EINVAL;
+ }
+ error = flow_del_on_pmd(pmd, del->stats, del);
+ dp_netdev_pmd_unref(pmd);
+ }
+
return error;
}
}
static struct dpif_flow_dump *
-dpif_netdev_flow_dump_create(const struct dpif *dpif_, bool terse)
+dpif_netdev_flow_dump_create(const struct dpif *dpif_, bool terse,
+ char *type OVS_UNUSED)
{
struct dpif_netdev_flow_dump *dump;
}
}
+ if (execute->probe) {
+ /* If this is part of a probe, drop the packet, since executing
+ * the action may actually cause spurious packets to be sent into
+ * the network. */
+ if (pmd->core_id == NON_PMD_CORE_ID) {
+ dp_netdev_pmd_unref(pmd);
+ }
+ return 0;
+ }
+
/* If the current thread is non-pmd thread, acquires
* the 'non_pmd_mutex'. */
if (pmd->core_id == NON_PMD_CORE_ID) {
flow_hash_5tuple(execute->flow, 0));
}
- packet_batch_init_packet(&pp, execute->packet);
+ dp_packet_batch_init_packet(&pp, execute->packet);
dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
execute->actions, execute->actions_len,
time_msec());
}
}
-/* Changes the number or the affinity of pmd threads. The changes are actually
- * applied in dpif_netdev_run(). */
+/* Applies datapath configuration from the database. Some of the changes are
+ * actually applied in dpif_netdev_run(). */
static int
-dpif_netdev_pmd_set(struct dpif *dpif, const char *cmask)
+dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
{
struct dp_netdev *dp = get_dp_netdev(dpif);
+ const char *cmask = smap_get(other_config, "pmd-cpu-mask");
+ unsigned long long insert_prob =
+ smap_get_ullong(other_config, "emc-insert-inv-prob",
+ DEFAULT_EM_FLOW_INSERT_INV_PROB);
+ uint32_t insert_min, cur_min;
if (!nullable_string_is_equal(dp->pmd_cmask, cmask)) {
free(dp->pmd_cmask);
dp_netdev_request_reconfigure(dp);
}
+ atomic_read_relaxed(&dp->emc_insert_min, &cur_min);
+ if (insert_prob <= UINT32_MAX) {
+ insert_min = insert_prob == 0 ? 0 : UINT32_MAX / insert_prob;
+ } else {
+ insert_min = DEFAULT_EM_FLOW_INSERT_MIN;
+ insert_prob = DEFAULT_EM_FLOW_INSERT_INV_PROB;
+ }
+
+ if (insert_min != cur_min) {
+ atomic_store_relaxed(&dp->emc_insert_min, insert_min);
+ if (insert_min == 0) {
+ VLOG_INFO("EMC has been disabled");
+ } else {
+ VLOG_INFO("EMC insertion probability changed to 1/%llu (~%.2f%%)",
+ insert_prob, (100 / (float)insert_prob));
+ }
+ }
+
return 0;
}
\f
/* Creates and returns a new 'struct dp_netdev_actions', whose actions are
- * a copy of the 'ofpacts_len' bytes of 'ofpacts'. */
+ * a copy of the 'size' bytes of the 'actions' input parameter. */
struct dp_netdev_actions *
dp_netdev_actions_create(const struct nlattr *actions, size_t size)
{
non_atomic_ullong_add(&pmd->cycles.n[type], interval);
}
+/* Calculate the intermediate cycle result and add it to the counter 'type'. */
+static inline void
+cycles_count_intermediate(struct dp_netdev_pmd_thread *pmd,
+ struct dp_netdev_rxq *rxq,
+ enum pmd_cycles_counter_type type)
+ OVS_NO_THREAD_SAFETY_ANALYSIS
+{
+ unsigned long long new_cycles = cycles_counter();
+ unsigned long long interval = new_cycles - pmd->last_cycles;
+ pmd->last_cycles = new_cycles;
+
+ non_atomic_ullong_add(&pmd->cycles.n[type], interval);
+ if (rxq && (type == PMD_CYCLES_PROCESSING)) {
+ /* Add to the amount of current processing cycles. */
+ non_atomic_ullong_add(&rxq->cycles[RXQ_CYCLES_PROC_CURR], interval);
+ }
+}
+
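+/* Helpers that atomically store and read the per-rxq cycle counters and the
+ * per-interval cycle history used for rxq to pmd assignment. */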
static void
+dp_netdev_rxq_set_cycles(struct dp_netdev_rxq *rx,
+ enum rxq_cycles_counter_type type,
+ unsigned long long cycles)
+{
+ atomic_store_relaxed(&rx->cycles[type], cycles);
+}
+
+static uint64_t
+dp_netdev_rxq_get_cycles(struct dp_netdev_rxq *rx,
+ enum rxq_cycles_counter_type type)
+{
+ unsigned long long processing_cycles;
+ atomic_read_relaxed(&rx->cycles[type], &processing_cycles);
+ return processing_cycles;
+}
+
+static void
+dp_netdev_rxq_set_intrvl_cycles(struct dp_netdev_rxq *rx,
+ unsigned long long cycles)
+{
+ unsigned int idx = rx->intrvl_idx++ % PMD_RXQ_INTERVAL_MAX;
+ atomic_store_relaxed(&rx->cycles_intrvl[idx], cycles);
+}
+
+static uint64_t
+dp_netdev_rxq_get_intrvl_cycles(struct dp_netdev_rxq *rx, unsigned idx)
+{
+ unsigned long long processing_cycles;
+ atomic_read_relaxed(&rx->cycles_intrvl[idx], &processing_cycles);
+ return processing_cycles;
+}
+
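+/* Polls 'rx' once. If packets are received, resets the recirculation depth
+ * and passes the batch to dp_netdev_input() with 'port_no' as the ingress
+ * port. Returns the number of packets received. */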
+static int
dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
- struct dp_netdev_port *port,
- struct netdev_rxq *rxq)
+ struct netdev_rxq *rx,
+ odp_port_t port_no)
{
struct dp_packet_batch batch;
int error;
+ int batch_cnt = 0;
dp_packet_batch_init(&batch);
- cycles_count_start(pmd);
- error = netdev_rxq_recv(rxq, &batch);
- cycles_count_end(pmd, PMD_CYCLES_POLLING);
+ error = netdev_rxq_recv(rx, &batch);
if (!error) {
*recirc_depth_get() = 0;
- cycles_count_start(pmd);
- dp_netdev_input(pmd, &batch, port->port_no);
- cycles_count_end(pmd, PMD_CYCLES_PROCESSING);
+ batch_cnt = batch.count;
+ dp_netdev_input(pmd, &batch, port_no);
} else if (error != EAGAIN && error != EOPNOTSUPP) {
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
- netdev_get_name(port->netdev), ovs_strerror(error));
+ netdev_rxq_get_name(rx), ovs_strerror(error));
+ }
+
+ return batch_cnt;
+}
+
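+/* Returns the tx_port for 'port_no' in 'hmap', or NULL if there is none. */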
+static struct tx_port *
+tx_port_lookup(const struct hmap *hmap, odp_port_t port_no)
+{
+ struct tx_port *tx;
+
+ HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) {
+ if (tx->port->port_no == port_no) {
+ return tx;
+ }
}
+
+ return NULL;
}
static int
struct netdev *netdev = port->netdev;
int i, err;
- if (!netdev_is_reconf_required(netdev)) {
- return 0;
- }
+ port->need_reconfigure = false;
/* Closes the existing 'rxq's. */
for (i = 0; i < port->n_rxq; i++) {
- netdev_rxq_close(port->rxqs[i].rxq);
- port->rxqs[i].rxq = NULL;
+ netdev_rxq_close(port->rxqs[i].rx);
+ port->rxqs[i].rx = NULL;
}
+ unsigned last_nrxq = port->n_rxq;
port->n_rxq = 0;
/* Allows 'netdev' to apply the pending configuration changes. */
- err = netdev_reconfigure(netdev);
- if (err && (err != EOPNOTSUPP)) {
- VLOG_ERR("Failed to set interface %s new configuration",
- netdev_get_name(netdev));
- return err;
+ if (netdev_is_reconf_required(netdev)) {
+ err = netdev_reconfigure(netdev);
+ if (err && (err != EOPNOTSUPP)) {
+ VLOG_ERR("Failed to set interface %s new configuration",
+ netdev_get_name(netdev));
+ return err;
+ }
}
/* If the netdev_reconfigure() above succeeds, reopens the 'rxq's. */
port->rxqs = xrealloc(port->rxqs,
port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
for (i = 0; i < netdev_n_rxq(netdev); i++) {
- err = netdev_rxq_open(netdev, &port->rxqs[i].rxq, i);
+ bool new_queue = i >= last_nrxq;
+ if (new_queue) {
+ memset(&port->rxqs[i], 0, sizeof port->rxqs[i]);
+ }
+
+ port->rxqs[i].port = port;
+
+ err = netdev_rxq_open(netdev, &port->rxqs[i].rx, i);
if (err) {
return err;
}
return 0;
}
+struct rr_numa_list {
+ struct hmap numas; /* Contains 'struct rr_numa' */
+};
+
+struct rr_numa {
+ struct hmap_node node;
+
+ int numa_id;
+
+ /* Non-isolated pmds on numa node 'numa_id'. */
+ struct dp_netdev_pmd_thread **pmds;
+ int n_pmds;
+
+ int cur_index;
+ bool idx_inc;
+};
+
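+/* Returns the rr_numa entry for 'numa_id' in 'rr', or NULL if there is
+ * none. */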
+static struct rr_numa *
+rr_numa_list_lookup(struct rr_numa_list *rr, int numa_id)
+{
+ struct rr_numa *numa;
+
+ HMAP_FOR_EACH_WITH_HASH (numa, node, hash_int(numa_id, 0), &rr->numas) {
+ if (numa->numa_id == numa_id) {
+ return numa;
+ }
+ }
+
+ return NULL;
+}
+
+/* Returns the next node in numa list following 'numa' in round-robin fashion.
+ * Returns first node if 'numa' is a null pointer or the last node in 'rr'.
+ * Returns NULL if 'rr' numa list is empty. */
+static struct rr_numa *
+rr_numa_list_next(struct rr_numa_list *rr, const struct rr_numa *numa)
+{
+ struct hmap_node *node = NULL;
+
+ if (numa) {
+ node = hmap_next(&rr->numas, &numa->node);
+ }
+ if (!node) {
+ node = hmap_first(&rr->numas);
+ }
+
+ return (node) ? CONTAINER_OF(node, struct rr_numa, node) : NULL;
+}
+
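+/* Builds in 'rr' the per-numa lists of non-isolated pmd threads used for
+ * round-robin rxq to pmd assignment. */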
+static void
+rr_numa_list_populate(struct dp_netdev *dp, struct rr_numa_list *rr)
+{
+ struct dp_netdev_pmd_thread *pmd;
+ struct rr_numa *numa;
+
+ hmap_init(&rr->numas);
+
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ if (pmd->core_id == NON_PMD_CORE_ID || pmd->isolated) {
+ continue;
+ }
+
+ numa = rr_numa_list_lookup(rr, pmd->numa_id);
+ if (!numa) {
+ numa = xzalloc(sizeof *numa);
+ numa->numa_id = pmd->numa_id;
+ hmap_insert(&rr->numas, &numa->node, hash_int(pmd->numa_id, 0));
+ }
+ numa->n_pmds++;
+ numa->pmds = xrealloc(numa->pmds, numa->n_pmds * sizeof *numa->pmds);
+ numa->pmds[numa->n_pmds - 1] = pmd;
+ /* At least one pmd, so initialise cur_index and idx_inc. */
+ numa->cur_index = 0;
+ numa->idx_inc = true;
+ }
+}
+
+/* Returns the next pmd from the numa node in
+ * incrementing or decrementing order. */
+static struct dp_netdev_pmd_thread *
+rr_numa_get_pmd(struct rr_numa *numa)
+{
+ int numa_idx = numa->cur_index;
+
+ if (numa->idx_inc == true) {
+ /* Incrementing through list of pmds. */
+ if (numa->cur_index == numa->n_pmds-1) {
+ /* Reached the last pmd. */
+ numa->idx_inc = false;
+ } else {
+ numa->cur_index++;
+ }
+ } else {
+ /* Decrementing through list of pmds. */
+ if (numa->cur_index == 0) {
+ /* Reached the first pmd. */
+ numa->idx_inc = true;
+ } else {
+ numa->cur_index--;
+ }
+ }
+ return numa->pmds[numa_idx];
+}
+
+static void
+rr_numa_list_destroy(struct rr_numa_list *rr)
+{
+ struct rr_numa *numa;
+
+ HMAP_FOR_EACH_POP (numa, node, &rr->numas) {
+ free(numa->pmds);
+ free(numa);
+ }
+ hmap_destroy(&rr->numas);
+}
+
+/* qsort comparator: sorts rx queues in descending order of the processing
+ * cycles they are consuming. */
+static int
+compare_rxq_cycles(const void *a, const void *b)
+{
+ struct dp_netdev_rxq *qa;
+ struct dp_netdev_rxq *qb;
+ uint64_t cycles_qa, cycles_qb;
+
+ qa = *(struct dp_netdev_rxq **) a;
+ qb = *(struct dp_netdev_rxq **) b;
+
+ cycles_qa = dp_netdev_rxq_get_cycles(qa, RXQ_CYCLES_PROC_HIST);
+ cycles_qb = dp_netdev_rxq_get_cycles(qb, RXQ_CYCLES_PROC_HIST);
+
+ if (cycles_qa != cycles_qb) {
+ return (cycles_qa < cycles_qb) ? 1 : -1;
+ } else {
+ /* Cycles are the same, so tiebreak on port/queue id.
+ * Tiebreaking (as opposed to returning 0) ensures consistent
+ * sort results across multiple OSes. */
+ uint32_t port_qa = odp_to_u32(qa->port->port_no);
+ uint32_t port_qb = odp_to_u32(qb->port->port_no);
+ if (port_qa != port_qb) {
+ return port_qa > port_qb ? 1 : -1;
+ } else {
+ return netdev_rxq_get_queue_id(qa->rx)
+ - netdev_rxq_get_queue_id(qb->rx);
+ }
+ }
+}
+
+/* Assigns pmds to queues. If 'pinned' is true, assigns pmds to pinned
+ * queues and marks those pmds as isolated. Otherwise, assigns non-isolated
+ * pmds to unpinned queues.
+ *
+ * If 'pinned' is false, queues will be sorted by the processing cycles they
+ * are consuming and then assigned to pmds in round-robin order.
+ *
+ * The function doesn't touch the pmd threads; it just stores the assignment
+ * in the 'pmd' member of each rxq. */
+static void
+rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
+{
+ struct dp_netdev_port *port;
+ struct rr_numa_list rr;
+ struct rr_numa *non_local_numa = NULL;
+ struct dp_netdev_rxq ** rxqs = NULL;
+ int i, n_rxqs = 0;
+ struct rr_numa *numa = NULL;
+ int numa_id;
+
+ HMAP_FOR_EACH (port, node, &dp->ports) {
+ if (!netdev_is_pmd(port->netdev)) {
+ continue;
+ }
+
+ for (int qid = 0; qid < port->n_rxq; qid++) {
+ struct dp_netdev_rxq *q = &port->rxqs[qid];
+
+ if (pinned && q->core_id != OVS_CORE_UNSPEC) {
+ struct dp_netdev_pmd_thread *pmd;
+
+ pmd = dp_netdev_get_pmd(dp, q->core_id);
+ if (!pmd) {
+ VLOG_WARN("There is no PMD thread on core %d. Queue "
+ "%d on port \'%s\' will not be polled.",
+ q->core_id, qid, netdev_get_name(port->netdev));
+ } else {
+ q->pmd = pmd;
+ pmd->isolated = true;
+ dp_netdev_pmd_unref(pmd);
+ }
+ } else if (!pinned && q->core_id == OVS_CORE_UNSPEC) {
+ uint64_t cycle_hist = 0;
+
+ if (n_rxqs == 0) {
+ rxqs = xmalloc(sizeof *rxqs);
+ } else {
+ rxqs = xrealloc(rxqs, sizeof *rxqs * (n_rxqs + 1));
+ }
+ /* Sum the queue intervals and store the cycle history. */
+ for (unsigned i = 0; i < PMD_RXQ_INTERVAL_MAX; i++) {
+ cycle_hist += dp_netdev_rxq_get_intrvl_cycles(q, i);
+ }
+ dp_netdev_rxq_set_cycles(q, RXQ_CYCLES_PROC_HIST, cycle_hist);
+
+ /* Store the queue. */
+ rxqs[n_rxqs++] = q;
+ }
+ }
+ }
+
+ if (n_rxqs > 1) {
+ /* Sort the queues in order of the processing cycles
+ * they consumed during their last pmd interval. */
+ qsort(rxqs, n_rxqs, sizeof *rxqs, compare_rxq_cycles);
+ }
+
+ rr_numa_list_populate(dp, &rr);
+ /* Assign the sorted queues to pmds in round robin. */
+ for (i = 0; i < n_rxqs; i++) {
+ numa_id = netdev_get_numa_id(rxqs[i]->port->netdev);
+ numa = rr_numa_list_lookup(&rr, numa_id);
+ if (!numa) {
+ /* There are no pmds on the queue's local NUMA node.
+ Round robin on the NUMA nodes that do have pmds. */
+ non_local_numa = rr_numa_list_next(&rr, non_local_numa);
+ if (!non_local_numa) {
+ VLOG_ERR("There is no available (non-isolated) pmd "
+ "thread for port \'%s\' queue %d. This queue "
+ "will not be polled. Is pmd-cpu-mask set to "
+ "zero? Or are all PMDs isolated to other "
+ "queues?", netdev_rxq_get_name(rxqs[i]->rx),
+ netdev_rxq_get_queue_id(rxqs[i]->rx));
+ continue;
+ }
+ rxqs[i]->pmd = rr_numa_get_pmd(non_local_numa);
+ VLOG_WARN("There's no available (non-isolated) pmd thread "
+ "on numa node %d. Queue %d on port \'%s\' will "
+ "be assigned to the pmd on core %d "
+ "(numa node %d). Expect reduced performance.",
+ numa_id, netdev_rxq_get_queue_id(rxqs[i]->rx),
+ netdev_rxq_get_name(rxqs[i]->rx),
+ rxqs[i]->pmd->core_id, rxqs[i]->pmd->numa_id);
+ } else {
+ rxqs[i]->pmd = rr_numa_get_pmd(numa);
+ VLOG_INFO("Core %d on numa node %d assigned port \'%s\' "
+ "rx queue %d (measured processing cycles %"PRIu64").",
+ rxqs[i]->pmd->core_id, numa_id,
+ netdev_rxq_get_name(rxqs[i]->rx),
+ netdev_rxq_get_queue_id(rxqs[i]->rx),
+ dp_netdev_rxq_get_cycles(rxqs[i], RXQ_CYCLES_PROC_HIST));
+ }
+ }
+
+ rr_numa_list_destroy(&rr);
+ free(rxqs);
+}
+
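+/* Reloads every pmd thread that is flagged as needing a reload and clears
+ * the flag. */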
+static void
+reload_affected_pmds(struct dp_netdev *dp)
+{
+ struct dp_netdev_pmd_thread *pmd;
+
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ if (pmd->need_reload) {
+ dp_netdev_reload_pmd__(pmd);
+ pmd->need_reload = false;
+ }
+ }
+}
+
static void
reconfigure_pmd_threads(struct dp_netdev *dp)
OVS_REQUIRES(dp->port_mutex)
{
- struct dp_netdev_port *port, *next;
- int n_cores;
+ struct dp_netdev_pmd_thread *pmd;
+ struct ovs_numa_dump *pmd_cores;
+ struct ovs_numa_info_core *core;
+ struct hmapx to_delete = HMAPX_INITIALIZER(&to_delete);
+ struct hmapx_node *node;
+ bool changed = false;
+ bool need_to_adjust_static_tx_qids = false;
+
+ /* The pmd threads should be started only if there's a pmd port in the
+ * datapath. If the user didn't provide any "pmd-cpu-mask", we start
+ * NR_PMD_THREADS per numa node. */
+ if (!has_pmd_port(dp)) {
+ pmd_cores = ovs_numa_dump_n_cores_per_numa(0);
+ } else if (dp->pmd_cmask && dp->pmd_cmask[0]) {
+ pmd_cores = ovs_numa_dump_cores_with_cmask(dp->pmd_cmask);
+ } else {
+ pmd_cores = ovs_numa_dump_n_cores_per_numa(NR_PMD_THREADS);
+ }
+
+ /* We need to adjust 'static_tx_qid's only if we're reducing number of
+ * PMD threads. Otherwise, new threads will allocate all the freed ids. */
+ if (ovs_numa_dump_count(pmd_cores) < cmap_count(&dp->poll_threads) - 1) {
+ /* Adjustment is required to keep 'static_tx_qid's sequential and
+ * avoid possible issues, for example, imbalanced tx queue usage
+ * and unnecessary locking caused by remapping on netdev level. */
+ need_to_adjust_static_tx_qids = true;
+ }
+
+ /* Check for unwanted pmd threads */
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ if (pmd->core_id == NON_PMD_CORE_ID) {
+ continue;
+ }
+ if (!ovs_numa_dump_contains_core(pmd_cores, pmd->numa_id,
+ pmd->core_id)) {
+ hmapx_add(&to_delete, pmd);
+ } else if (need_to_adjust_static_tx_qids) {
+ pmd->need_reload = true;
+ }
+ }
+
+ HMAPX_FOR_EACH (node, &to_delete) {
+ pmd = (struct dp_netdev_pmd_thread *) node->data;
+ VLOG_INFO("PMD thread on numa_id: %d, core id: %2d destroyed.",
+ pmd->numa_id, pmd->core_id);
+ dp_netdev_del_pmd(dp, pmd);
+ }
+ changed = !hmapx_is_empty(&to_delete);
+ hmapx_destroy(&to_delete);
+
+ if (need_to_adjust_static_tx_qids) {
+ /* 'static_tx_qid's are not sequential now.
+ * Reload remaining threads to fix this. */
+ reload_affected_pmds(dp);
+ }
+
+ /* Check for required new pmd threads */
+ FOR_EACH_CORE_ON_DUMP(core, pmd_cores) {
+ pmd = dp_netdev_get_pmd(dp, core->core_id);
+ if (!pmd) {
+ pmd = xzalloc(sizeof *pmd);
+ dp_netdev_configure_pmd(pmd, dp, core->core_id, core->numa_id);
+ pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
+ VLOG_INFO("PMD thread on numa_id: %d, core id: %2d created.",
+ pmd->numa_id, pmd->core_id);
+ changed = true;
+ } else {
+ dp_netdev_pmd_unref(pmd);
+ }
+ }
+
+ if (changed) {
+ struct ovs_numa_info_numa *numa;
+
+ /* Log the number of pmd threads per numa node. */
+ FOR_EACH_NUMA_ON_DUMP (numa, pmd_cores) {
+ VLOG_INFO("There are %"PRIuSIZE" pmd threads on numa node %d",
+ numa->n_cores, numa->numa_id);
+ }
+ }
+
+ ovs_numa_dump_destroy(pmd_cores);
+}
+
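+/* Removes from 'pmd' the rx queues and tx ports whose underlying port has
+ * been removed from the datapath or needs reconfiguration. */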
+static void
+pmd_remove_stale_ports(struct dp_netdev *dp,
+ struct dp_netdev_pmd_thread *pmd)
+ OVS_EXCLUDED(pmd->port_mutex)
+ OVS_REQUIRES(dp->port_mutex)
+{
+ struct rxq_poll *poll, *poll_next;
+ struct tx_port *tx, *tx_next;
+
+ ovs_mutex_lock(&pmd->port_mutex);
+ HMAP_FOR_EACH_SAFE (poll, poll_next, node, &pmd->poll_list) {
+ struct dp_netdev_port *port = poll->rxq->port;
+
+ if (port->need_reconfigure
+ || !hmap_contains(&dp->ports, &port->node)) {
+ dp_netdev_del_rxq_from_pmd(pmd, poll);
+ }
+ }
+ HMAP_FOR_EACH_SAFE (tx, tx_next, node, &pmd->tx_ports) {
+ struct dp_netdev_port *port = tx->port;
+
+ if (port->need_reconfigure
+ || !hmap_contains(&dp->ports, &port->node)) {
+ dp_netdev_del_port_tx_from_pmd(pmd, tx);
+ }
+ }
+ ovs_mutex_unlock(&pmd->port_mutex);
+}
+
+/* Must be called each time a port is added/removed or the cmask changes.
+ * This creates and destroys pmd threads, reconfigures ports, opens their
+ * rxqs and assigns all rxqs/txqs to pmd threads. */
+static void
+reconfigure_datapath(struct dp_netdev *dp)
+ OVS_REQUIRES(dp->port_mutex)
+{
+ struct dp_netdev_pmd_thread *pmd;
+ struct dp_netdev_port *port;
+ int wanted_txqs;
dp->last_reconfigure_seq = seq_read(dp->reconfigure_seq);
- dp_netdev_destroy_all_pmds(dp);
+ /* Step 1: Adjust the pmd threads based on the datapath ports, the cores
+ * on the system and the user configuration. */
+ reconfigure_pmd_threads(dp);
- /* Reconfigures the cpu mask. */
- ovs_numa_set_cpu_mask(dp->pmd_cmask);
+ wanted_txqs = cmap_count(&dp->poll_threads);
- n_cores = ovs_numa_get_n_cores();
- if (n_cores == OVS_CORE_UNSPEC) {
- VLOG_ERR("Cannot get cpu core info");
- return;
+ /* The number of pmd threads might have changed, or a port can be new:
+ * adjust the txqs. */
+ HMAP_FOR_EACH (port, node, &dp->ports) {
+ netdev_set_tx_multiq(port->netdev, wanted_txqs);
}
- HMAP_FOR_EACH_SAFE (port, next, node, &dp->ports) {
+ /* Step 2: Remove from the pmd threads ports that have been removed or
+ * need reconfiguration. */
+
+ /* Check for all the ports that need reconfiguration. We cache this in
+ * 'port->need_reconfigure', because netdev_is_reconf_required() can
+ * change at any time. */
+ HMAP_FOR_EACH (port, node, &dp->ports) {
+ if (netdev_is_reconf_required(port->netdev)) {
+ port->need_reconfigure = true;
+ }
+ }
+
+ /* Remove from the pmd threads all the ports that have been deleted or
+ * need reconfiguration. */
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ pmd_remove_stale_ports(dp, pmd);
+ }
+
+ /* Reload affected pmd threads. We must wait for the pmd threads before
+ * reconfiguring the ports, because a port cannot be reconfigured while
+ * it's being used. */
+ reload_affected_pmds(dp);
+
+ /* Step 3: Reconfigure ports. */
+
+ /* We only reconfigure the ports that we determined above, because they're
+ * not being used by any pmd thread at the moment. If a port fails to
+ * reconfigure we remove it from the datapath. */
+ struct dp_netdev_port *next_port;
+ HMAP_FOR_EACH_SAFE (port, next_port, node, &dp->ports) {
int err;
+ if (!port->need_reconfigure) {
+ continue;
+ }
+
err = port_reconfigure(port);
if (err) {
hmap_remove(&dp->ports, &port->node);
seq_change(dp->port_seq);
port_destroy(port);
} else {
- port->dynamic_txqs = netdev_n_txq(port->netdev) < n_cores + 1;
+ port->dynamic_txqs = netdev_n_txq(port->netdev) < wanted_txqs;
}
}
- /* Restores the non-pmd. */
- dp_netdev_set_nonpmd(dp);
- /* Restores all pmd threads. */
- dp_netdev_reset_pmd_threads(dp);
+
+ /* Step 4: Compute new rxq scheduling. We don't touch the pmd threads
+ * for now; we just update the 'pmd' pointer in each rxq to point to the
+ * wanted thread according to the scheduling policy. */
+
+ /* Reset all the pmd threads to non-isolated. */
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ pmd->isolated = false;
+ }
+
+ /* Reset all the queues to unassigned. */
+ HMAP_FOR_EACH (port, node, &dp->ports) {
+ for (int i = 0; i < port->n_rxq; i++) {
+ port->rxqs[i].pmd = NULL;
+ }
+ }
+
+ /* Add pinned queues and mark pmd threads isolated. */
+ rxq_scheduling(dp, true);
+
+ /* Add non-pinned queues. */
+ rxq_scheduling(dp, false);
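+
+ /* At this point every rxq either has its 'pmd' pointer set to the thread
+ * that should poll it, or keeps NULL, in which case it will simply not be
+ * polled (see Step 6 below). */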
+
+ /* Step 5: Remove queues not compliant with new scheduling. */
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ struct rxq_poll *poll, *poll_next;
+
+ ovs_mutex_lock(&pmd->port_mutex);
+ HMAP_FOR_EACH_SAFE (poll, poll_next, node, &pmd->poll_list) {
+ if (poll->rxq->pmd != pmd) {
+ dp_netdev_del_rxq_from_pmd(pmd, poll);
+ }
+ }
+ ovs_mutex_unlock(&pmd->port_mutex);
+ }
+
+ /* Reload affected pmd threads. We must wait for the pmd threads to remove
+ * the old queues before re-adding them, otherwise a queue can be polled by
+ * two threads at the same time. */
+ reload_affected_pmds(dp);
+
+ /* Step 6: Add queues from scheduling, if they're not there already. */
+ HMAP_FOR_EACH (port, node, &dp->ports) {
+ if (!netdev_is_pmd(port->netdev)) {
+ continue;
+ }
+
+ for (int qid = 0; qid < port->n_rxq; qid++) {
+ struct dp_netdev_rxq *q = &port->rxqs[qid];
+
+ if (q->pmd) {
+ ovs_mutex_lock(&q->pmd->port_mutex);
+ dp_netdev_add_rxq_to_pmd(q->pmd, q);
+ ovs_mutex_unlock(&q->pmd->port_mutex);
+ }
+ }
+ }
+
+ /* Add every port to the tx cache of every pmd thread, if it's not
+ * there already and if this pmd has at least one rxq to poll; the
+ * non-pmd thread gets every port regardless. */
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ ovs_mutex_lock(&pmd->port_mutex);
+ if (hmap_count(&pmd->poll_list) || pmd->core_id == NON_PMD_CORE_ID) {
+ HMAP_FOR_EACH (port, node, &dp->ports) {
+ dp_netdev_add_port_tx_to_pmd(pmd, port);
+ }
+ }
+ ovs_mutex_unlock(&pmd->port_mutex);
+ }
+
+ /* Reload affected pmd threads. */
+ reload_affected_pmds(dp);
}
/* Returns true if one of the netdevs in 'dp' requires a reconfiguration */
struct dp_netdev *dp = get_dp_netdev(dpif);
struct dp_netdev_pmd_thread *non_pmd;
uint64_t new_tnl_seq;
+ int process_packets = 0;
ovs_mutex_lock(&dp->port_mutex);
non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
if (non_pmd) {
ovs_mutex_lock(&dp->non_pmd_mutex);
+ cycles_count_start(non_pmd);
HMAP_FOR_EACH (port, node, &dp->ports) {
if (!netdev_is_pmd(port->netdev)) {
int i;
for (i = 0; i < port->n_rxq; i++) {
- dp_netdev_process_rxq_port(non_pmd, port,
- port->rxqs[i].rxq);
+ process_packets =
+ dp_netdev_process_rxq_port(non_pmd,
+ port->rxqs[i].rx,
+ port->port_no);
+ cycles_count_intermediate(non_pmd, NULL,
+ process_packets
+ ? PMD_CYCLES_PROCESSING
+ : PMD_CYCLES_IDLE);
}
}
}
+ cycles_count_end(non_pmd, PMD_CYCLES_IDLE);
dpif_netdev_xps_revalidate_pmd(non_pmd, time_msec(), false);
ovs_mutex_unlock(&dp->non_pmd_mutex);
}
if (dp_netdev_is_reconf_required(dp) || ports_require_restart(dp)) {
- reconfigure_pmd_threads(dp);
+ reconfigure_datapath(dp);
}
ovs_mutex_unlock(&dp->port_mutex);
int i;
for (i = 0; i < port->n_rxq; i++) {
- netdev_rxq_wait(port->rxqs[i].rxq);
+ netdev_rxq_wait(port->rxqs[i].rx);
}
}
}
}
/* Copies ports from 'pmd->tx_ports' (shared with the main thread) to
- * 'pmd->port_cache' (thread local) */
+ * thread-local copies. A port is copied to 'pmd->tnl_port_cache' if it is
+ * a tunnel device, otherwise to 'pmd->send_port_cache' if it has at least
+ * one txq. */
static void
pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
OVS_REQUIRES(pmd->port_mutex)
}
}
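+/* Reserves a datapath-wide unique tx queue id for 'pmd' from the id pool, so
+ * that each pmd thread can use its own netdev tx queue whenever a device
+ * provides enough tx queues (see 'dynamic_txqs'). */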
+static void
+pmd_alloc_static_tx_qid(struct dp_netdev_pmd_thread *pmd)
+{
+ ovs_mutex_lock(&pmd->dp->tx_qid_pool_mutex);
+ if (!id_pool_alloc_id(pmd->dp->tx_qid_pool, &pmd->static_tx_qid)) {
+ VLOG_ABORT("static_tx_qid allocation failed for PMD on core %2d"
+ ", numa_id %d.", pmd->core_id, pmd->numa_id);
+ }
+ ovs_mutex_unlock(&pmd->dp->tx_qid_pool_mutex);
+
+ VLOG_DBG("static_tx_qid = %d allocated for PMD thread on core %2d"
+ ", numa_id %d.", pmd->static_tx_qid, pmd->core_id, pmd->numa_id);
+}
+
+static void
+pmd_free_static_tx_qid(struct dp_netdev_pmd_thread *pmd)
+{
+ ovs_mutex_lock(&pmd->dp->tx_qid_pool_mutex);
+ id_pool_free_id(pmd->dp->tx_qid_pool, pmd->static_tx_qid);
+ ovs_mutex_unlock(&pmd->dp->tx_qid_pool_mutex);
+}
+
static int
pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
- struct rxq_poll **ppoll_list)
+ struct polled_queue **ppoll_list)
{
- struct rxq_poll *poll_list = *ppoll_list;
+ struct polled_queue *poll_list = *ppoll_list;
struct rxq_poll *poll;
int i;
ovs_mutex_lock(&pmd->port_mutex);
- poll_list = xrealloc(poll_list, pmd->poll_cnt * sizeof *poll_list);
+ poll_list = xrealloc(poll_list, hmap_count(&pmd->poll_list)
+ * sizeof *poll_list);
i = 0;
- LIST_FOR_EACH (poll, node, &pmd->poll_list) {
- poll_list[i++] = *poll;
+ HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
+ poll_list[i].rxq = poll->rxq;
+ poll_list[i].port_no = poll->rxq->port->port_no;
+ i++;
}
pmd_load_cached_ports(pmd);
{
struct dp_netdev_pmd_thread *pmd = f_;
unsigned int lc = 0;
- struct rxq_poll *poll_list;
+ struct polled_queue *poll_list;
bool exiting;
int poll_cnt;
int i;
+ int process_packets = 0;
poll_list = NULL;
ovs_numa_thread_setaffinity_core(pmd->core_id);
dpdk_set_lcore_id(pmd->core_id);
poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
-reload:
emc_cache_init(&pmd->flow_cache);
+reload:
+ pmd_alloc_static_tx_qid(pmd);
/* List port/core affinity */
for (i = 0; i < poll_cnt; i++) {
VLOG_DBG("Core %d processing port \'%s\' with queue-id %d\n",
- pmd->core_id, netdev_get_name(poll_list[i].port->netdev),
- netdev_rxq_get_queue_id(poll_list[i].rx));
+ pmd->core_id, netdev_rxq_get_name(poll_list[i].rxq->rx),
+ netdev_rxq_get_queue_id(poll_list[i].rxq->rx));
}
if (!poll_cnt) {
lc = UINT_MAX;
}
+ cycles_count_start(pmd);
for (;;) {
for (i = 0; i < poll_cnt; i++) {
- dp_netdev_process_rxq_port(pmd, poll_list[i].port, poll_list[i].rx);
+ process_packets =
+ dp_netdev_process_rxq_port(pmd, poll_list[i].rxq->rx,
+ poll_list[i].port_no);
+ cycles_count_intermediate(pmd, poll_list[i].rxq,
+ process_packets ? PMD_CYCLES_PROCESSING
+ : PMD_CYCLES_IDLE);
}
if (lc++ > 1024) {
lc = 0;
coverage_try_clear();
- dp_netdev_pmd_try_optimize(pmd);
+ dp_netdev_pmd_try_optimize(pmd, poll_list, poll_cnt);
if (!ovsrcu_try_quiesce()) {
emc_cache_slow_sweep(&pmd->flow_cache);
}
}
}
+ cycles_count_end(pmd, PMD_CYCLES_IDLE);
+
poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
exiting = latch_is_set(&pmd->exit_latch);
/* Signal here to make sure the pmd finishes
* reloading the updated configuration. */
dp_netdev_pmd_reload_done(pmd);
- emc_cache_uninit(&pmd->flow_cache);
+ pmd_free_static_tx_qid(pmd);
if (!exiting) {
goto reload;
}
+ emc_cache_uninit(&pmd->flow_cache);
free(poll_list);
pmd_free_cached_ports(pmd);
return NULL;
fat_rwlock_wrlock(&dp->upcall_rwlock);
}
+\f
+/* Meters */
+static void
+dpif_netdev_meter_get_features(const struct dpif * dpif OVS_UNUSED,
+ struct ofputil_meter_features *features)
+{
+ features->max_meters = MAX_METERS;
+ features->band_types = DP_SUPPORTED_METER_BAND_TYPES;
+ features->capabilities = DP_SUPPORTED_METER_FLAGS_MASK;
+ features->max_bands = MAX_BANDS;
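+ /* Only drop bands are supported, so band colors (used for DSCP remark
+ * precedence) do not apply. */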
+ features->max_color = 0;
+}
+
+/* Applies the meter with index 'meter_id' to 'packets_'. Packets that exceed
+ * a band are deleted and removed from the batch. */
+static void
+dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_,
+ uint32_t meter_id, long long int now)
+{
+ struct dp_meter *meter;
+ struct dp_meter_band *band;
+ struct dp_packet *packet;
+ long long int long_delta_t; /* msec */
+ uint32_t delta_t; /* msec */
+ int i;
+ const size_t cnt = dp_packet_batch_size(packets_);
+ uint32_t bytes, volume;
+ int exceeded_band[NETDEV_MAX_BURST];
+ uint32_t exceeded_rate[NETDEV_MAX_BURST];
+ int exceeded_pkt = cnt; /* First packet that exceeded a band rate. */
+
+ if (meter_id >= MAX_METERS) {
+ return;
+ }
+
+ meter_lock(dp, meter_id);
+ meter = dp->meters[meter_id];
+ if (!meter) {
+ goto out;
+ }
+
+ /* Initialize as negative values. */
+ memset(exceeded_band, 0xff, cnt * sizeof *exceeded_band);
+ /* Initialize as zeroes. */
+ memset(exceeded_rate, 0, cnt * sizeof *exceeded_rate);
+
+ /* All packets will hit the meter at the same time. */
+ long_delta_t = (now - meter->used); /* msec */
+
+ /* Make sure delta_t will not be too large, so that bucket will not
+ * wrap around below. */
+ delta_t = (long_delta_t > (long long int)meter->max_delta_t)
+ ? meter->max_delta_t : (uint32_t)long_delta_t;
+
+ /* Update meter stats. */
+ meter->used = now;
+ meter->packet_count += cnt;
+ bytes = 0;
+ DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
+ bytes += dp_packet_size(packet);
+ }
+ meter->byte_count += bytes;
+
+ /* Meters can operate in terms of packets per second or kilobits per
+ * second. */
+ if (meter->flags & OFPMF13_PKTPS) {
+ /* Rate in packets/second, bucket 1/1000 packets. */
+ /* msec * packets/sec = 1/1000 packets. */
+ volume = cnt * 1000; /* Take 'cnt' packets from the bucket. */
+ } else {
+ /* Rate in kbps, bucket in bits. */
+ /* msec * kbps = bits */
+ volume = bytes * 8;
+ }
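+ /* For example (illustrative numbers), a batch of 32 packets totalling
+ * 2000 bytes takes 32 * 1000 = 32000 (1/1000 packets) from a PKTPS bucket,
+ * or 2000 * 8 = 16000 bits from a KBPS bucket. */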
+
+ /* Update all bands and find the one hit with the highest rate for each
+ * packet (if any). */
+ for (int m = 0; m < meter->n_bands; ++m) {
+ band = &meter->bands[m];
+
+ /* Update band's bucket. */
+ band->bucket += delta_t * band->up.rate;
+ if (band->bucket > band->up.burst_size) {
+ band->bucket = band->up.burst_size;
+ }
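+ /* E.g., with OFPMF13_KBPS a rate of 1000 (kbit/s, i.e. bits/msec) refills
+ * the bucket by 10 * 1000 = 10000 bits over a 10 msec gap, capped at the
+ * burst size. (Illustrative numbers.) */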
+
+ /* Drain the bucket for all the packets, if possible. */
+ if (band->bucket >= volume) {
+ band->bucket -= volume;
+ } else {
+ int band_exceeded_pkt;
+
+ /* Band limit hit, must process packet-by-packet. */
+ if (meter->flags & OFPMF13_PKTPS) {
+ band_exceeded_pkt = band->bucket / 1000;
+ band->bucket %= 1000; /* Remainder stays in bucket. */
+
+ /* Update the exceeding band for each exceeding packet.
+ * (Only one band will be fired by a packet, and that
+ * can be different for each packet.) */
+ for (i = band_exceeded_pkt; i < cnt; i++) {
+ if (band->up.rate > exceeded_rate[i]) {
+ exceeded_rate[i] = band->up.rate;
+ exceeded_band[i] = m;
+ }
+ }
+ } else {
+ /* Packet sizes differ, must process one-by-one. */
+ band_exceeded_pkt = cnt;
+ DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
+ uint32_t bits = dp_packet_size(packet) * 8;
+
+ if (band->bucket >= bits) {
+ band->bucket -= bits;
+ } else {
+ if (i < band_exceeded_pkt) {
+ band_exceeded_pkt = i;
+ }
+ /* Update the exceeding band for the exceeding packet.
+ * (Only one band will be fired by a packet, and that
+ * can be different for each packet.) */
+ if (band->up.rate > exceeded_rate[i]) {
+ exceeded_rate[i] = band->up.rate;
+ exceeded_band[i] = m;
+ }
+ }
+ }
+ }
+ /* Remember the first exceeding packet. */
+ if (exceeded_pkt > band_exceeded_pkt) {
+ exceeded_pkt = band_exceeded_pkt;
+ }
+ }
+ }
+
+ /* Fire the highest rate band exceeded by each packet, and account the drop
+ * against that band. Exceeding packets are deleted; the remaining packets
+ * are written back into 'packets_' by the refill loop below. */
+ size_t j;
+ DP_PACKET_BATCH_REFILL_FOR_EACH (j, cnt, packet, packets_) {
+ if (exceeded_band[j] >= 0) {
+ /* Meter drop packet. */
+ band = &meter->bands[exceeded_band[j]];
+ band->packet_count += 1;
+ band->byte_count += dp_packet_size(packet);
+
+ dp_packet_delete(packet);
+ } else {
+ /* Meter accepts packet. */
+ dp_packet_batch_refill(packets_, packet, j);
+ }
+ }
+ out:
+ meter_unlock(dp, meter_id);
+}
+
+/* Meter set/get/del processing is still single-threaded. */
+static int
+dpif_netdev_meter_set(struct dpif *dpif, ofproto_meter_id *meter_id,
+ struct ofputil_meter_config *config)
+{
+ struct dp_netdev *dp = get_dp_netdev(dpif);
+ uint32_t mid = meter_id->uint32;
+ struct dp_meter *meter;
+ int i;
+
+ if (mid >= MAX_METERS) {
+ return EFBIG; /* Meter_id out of range. */
+ }
+
+ if (config->flags & ~DP_SUPPORTED_METER_FLAGS_MASK ||
+ !(config->flags & (OFPMF13_KBPS | OFPMF13_PKTPS))) {
+ return EBADF; /* Unsupported flags set */
+ }
+
+ /* Validate bands. */
+ if (config->n_bands == 0 || config->n_bands > MAX_BANDS) {
+ return EINVAL; /* Too many, or no, bands. */
+ }
+
+ /* Validate rates. */
+ for (i = 0; i < config->n_bands; i++) {
+ if (config->bands[i].rate == 0) {
+ return EDOM; /* Rate must be non-zero. */
+ }
+ }
+
+ for (i = 0; i < config->n_bands; ++i) {
+ switch (config->bands[i].type) {
+ case OFPMBT13_DROP:
+ break;
+ default:
+ return ENODEV; /* Unsupported band type */
+ }
+ }
+
+ /* Allocate meter */
+ meter = xzalloc(sizeof *meter
+ + config->n_bands * sizeof(struct dp_meter_band));
+ if (meter) {
+ meter->flags = config->flags;
+ meter->n_bands = config->n_bands;
+ meter->max_delta_t = 0;
+ meter->used = time_msec();
+
+ /* Set up bands. */
+ for (i = 0; i < config->n_bands; ++i) {
+ uint32_t band_max_delta_t;
+
+ /* Set burst size to a workable value if none specified. */
+ if (config->bands[i].burst_size == 0) {
+ config->bands[i].burst_size = config->bands[i].rate;
+ }
+
+ meter->bands[i].up = config->bands[i];
+ /* Convert burst size to the bucket units: */
+ /* pkts => 1/1000 packets, kilobits => bits. */
+ meter->bands[i].up.burst_size *= 1000;
+ /* Initialize bucket to empty. */
+ meter->bands[i].bucket = 0;
+
+ /* Figure out max delta_t that is enough to fill any bucket. */
+ band_max_delta_t
+ = meter->bands[i].up.burst_size / meter->bands[i].up.rate;
+ if (band_max_delta_t > meter->max_delta_t) {
+ meter->max_delta_t = band_max_delta_t;
+ }
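+ /* E.g., a 1000 kbps band whose burst converts to 100000 bits is refilled
+ * completely after 100000 / 1000 = 100 msec, so delta_t never needs to
+ * exceed that. (Illustrative numbers.) */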
+ }
+
+ meter_lock(dp, mid);
+ dp_delete_meter(dp, mid); /* Free existing meter, if any */
+ dp->meters[mid] = meter;
+ meter_unlock(dp, mid);
+
+ return 0;
+ }
+ return ENOMEM;
+}
+
+static int
+dpif_netdev_meter_get(const struct dpif *dpif,
+ ofproto_meter_id meter_id_,
+ struct ofputil_meter_stats *stats, uint16_t n_bands)
+{
+ const struct dp_netdev *dp = get_dp_netdev(dpif);
+ const struct dp_meter *meter;
+ uint32_t meter_id = meter_id_.uint32;
+
+ if (meter_id >= MAX_METERS) {
+ return EFBIG;
+ }
+ meter = dp->meters[meter_id];
+ if (!meter) {
+ return ENOENT;
+ }
+ if (stats) {
+ int i = 0;
+
+ meter_lock(dp, meter_id);
+ stats->packet_in_count = meter->packet_count;
+ stats->byte_in_count = meter->byte_count;
+
+ for (i = 0; i < n_bands && i < meter->n_bands; ++i) {
+ stats->bands[i].packet_count = meter->bands[i].packet_count;
+ stats->bands[i].byte_count = meter->bands[i].byte_count;
+ }
+ meter_unlock(dp, meter_id);
+
+ stats->n_bands = i;
+ }
+ return 0;
+}
+
+static int
+dpif_netdev_meter_del(struct dpif *dpif,
+ ofproto_meter_id meter_id_,
+ struct ofputil_meter_stats *stats, uint16_t n_bands)
+{
+ struct dp_netdev *dp = get_dp_netdev(dpif);
+ int error;
+
+ error = dpif_netdev_meter_get(dpif, meter_id_, stats, n_bands);
+ if (!error) {
+ uint32_t meter_id = meter_id_.uint32;
+
+ meter_lock(dp, meter_id);
+ dp_delete_meter(dp, meter_id);
+ meter_unlock(dp, meter_id);
+ }
+ return error;
+}
+
+\f
static void
dpif_netdev_disable_upcall(struct dpif *dpif)
OVS_NO_THREAD_SAFETY_ANALYSIS
OVS_REQUIRES(dp->port_mutex)
{
struct dp_netdev_pmd_thread *non_pmd;
- struct dp_netdev_port *port;
non_pmd = xzalloc(sizeof *non_pmd);
dp_netdev_configure_pmd(non_pmd, dp, NON_PMD_CORE_ID, OVS_NUMA_UNSPEC);
-
- HMAP_FOR_EACH (port, node, &dp->ports) {
- dp_netdev_add_port_tx_to_pmd(non_pmd, port);
- }
-
- dp_netdev_reload_pmd__(non_pmd);
}
/* Caller must have valid pointer to 'pmd'. */
pmd->dp = dp;
pmd->core_id = core_id;
pmd->numa_id = numa_id;
- pmd->poll_cnt = 0;
-
- atomic_init(&pmd->static_tx_qid,
- (core_id == NON_PMD_CORE_ID)
- ? ovs_numa_get_n_cores()
- : get_n_pmd_threads(dp));
+ pmd->need_reload = false;
ovs_refcount_init(&pmd->ref_cnt);
latch_init(&pmd->exit_latch);
cmap_init(&pmd->flow_table);
cmap_init(&pmd->classifiers);
pmd->next_optimization = time_msec() + DPCLS_OPTIMIZATION_INTERVAL;
- ovs_list_init(&pmd->poll_list);
+ pmd->rxq_next_cycle_store = time_msec() + PMD_RXQ_INTERVAL_LEN;
+ hmap_init(&pmd->poll_list);
hmap_init(&pmd->tx_ports);
hmap_init(&pmd->tnl_port_cache);
hmap_init(&pmd->send_port_cache);
* actual thread created for NON_PMD_CORE_ID. */
if (core_id == NON_PMD_CORE_ID) {
emc_cache_init(&pmd->flow_cache);
+ pmd_alloc_static_tx_qid(pmd);
}
cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
hash_int(core_id, 0));
hmap_destroy(&pmd->send_port_cache);
hmap_destroy(&pmd->tnl_port_cache);
hmap_destroy(&pmd->tx_ports);
+ hmap_destroy(&pmd->poll_list);
/* All flows (including their dpcls_rules) have been deleted already */
CMAP_FOR_EACH (cls, node, &pmd->classifiers) {
dpcls_destroy(cls);
ovs_mutex_lock(&dp->non_pmd_mutex);
emc_cache_uninit(&pmd->flow_cache);
pmd_free_cached_ports(pmd);
+ pmd_free_static_tx_qid(pmd);
ovs_mutex_unlock(&dp->non_pmd_mutex);
} else {
latch_set(&pmd->exit_latch);
dp_netdev_reload_pmd__(pmd);
- ovs_numa_unpin_core(pmd->core_id);
xpthread_join(pmd->thread, NULL);
}
dp_netdev_pmd_unref(pmd);
}
-/* Destroys all pmd threads, but not the non pmd thread. */
+/* Destroys all pmd threads. If 'non_pmd' is true, it also destroys the
+ * non-pmd thread. */
static void
-dp_netdev_stop_pmds(struct dp_netdev *dp)
+dp_netdev_destroy_all_pmds(struct dp_netdev *dp, bool non_pmd)
{
struct dp_netdev_pmd_thread *pmd;
struct dp_netdev_pmd_thread **pmd_list;
size_t k = 0, n_pmds;
- n_pmds = get_n_pmd_threads(dp);
+ n_pmds = cmap_count(&dp->poll_threads);
pmd_list = xcalloc(n_pmds, sizeof *pmd_list);
CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
- /* We don't need to destroy the non pmd thread */
- if (pmd->core_id == NON_PMD_CORE_ID) {
+ if (!non_pmd && pmd->core_id == NON_PMD_CORE_ID) {
continue;
}
/* We cannot call dp_netdev_del_pmd(), since it alters
free(pmd_list);
}
-/* Destroys all pmd threads, including the non pmd thread. */
-static void
-dp_netdev_destroy_all_pmds(struct dp_netdev *dp)
-{
- struct dp_netdev_pmd_thread *pmd;
- struct dp_netdev_pmd_thread **pmd_list;
- size_t k = 0, n_pmds;
-
- n_pmds = cmap_count(&dp->poll_threads);
- pmd_list = xcalloc(n_pmds, sizeof *pmd_list);
-
- CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
- /* We cannot call dp_netdev_del_pmd(), since it alters
- * 'dp->poll_threads' (while we're iterating it) and it
- * might quiesce. */
- ovs_assert(k < n_pmds);
- pmd_list[k++] = pmd;
- }
-
- for (size_t i = 0; i < k; i++) {
- dp_netdev_del_pmd(dp, pmd_list[i]);
- }
-
- free(pmd_list);
-}
-
/* Deletes all rx queues from pmd->poll_list and all the ports from
* pmd->tx_ports. */
static void
struct tx_port *port;
ovs_mutex_lock(&pmd->port_mutex);
- LIST_FOR_EACH_POP (poll, node, &pmd->poll_list) {
+ HMAP_FOR_EACH_POP (poll, node, &pmd->poll_list) {
free(poll);
}
- pmd->poll_cnt = 0;
HMAP_FOR_EACH_POP (port, node, &pmd->tx_ports) {
free(port);
}
ovs_mutex_unlock(&pmd->port_mutex);
}
-static struct tx_port *
-tx_port_lookup(const struct hmap *hmap, odp_port_t port_no)
-{
- struct tx_port *tx;
-
- HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) {
- if (tx->port->port_no == port_no) {
- return tx;
- }
- }
-
- return NULL;
-}
-
-/* Deletes all rx queues of 'port' from 'poll_list', and the 'port' from
- * 'tx_ports' of 'pmd' thread. Returns true if 'port' was found in 'pmd'
- * (therefore a restart is required). */
-static bool
-dp_netdev_del_port_from_pmd__(struct dp_netdev_port *port,
- struct dp_netdev_pmd_thread *pmd)
-{
- struct rxq_poll *poll, *next;
- struct tx_port *tx;
- bool found = false;
-
- ovs_mutex_lock(&pmd->port_mutex);
- LIST_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
- if (poll->port == port) {
- found = true;
- ovs_list_remove(&poll->node);
- pmd->poll_cnt--;
- free(poll);
- }
- }
-
- tx = tx_port_lookup(&pmd->tx_ports, port->port_no);
- if (tx) {
- hmap_remove(&pmd->tx_ports, &tx->node);
- free(tx);
- found = true;
- }
- ovs_mutex_unlock(&pmd->port_mutex);
-
- return found;
-}
-
-/* Deletes 'port' from the 'poll_list' and from the 'tx_ports' of all the pmd
- * threads. The pmd threads that need to be restarted are inserted in
- * 'to_reload'. */
+/* Adds rx queue to poll_list of PMD thread, if it's not there already. */
static void
-dp_netdev_del_port_from_all_pmds__(struct dp_netdev *dp,
- struct dp_netdev_port *port,
- struct hmapx *to_reload)
+dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
+ struct dp_netdev_rxq *rxq)
+ OVS_REQUIRES(pmd->port_mutex)
{
- struct dp_netdev_pmd_thread *pmd;
-
- CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
- bool found;
-
- found = dp_netdev_del_port_from_pmd__(port, pmd);
+ int qid = netdev_rxq_get_queue_id(rxq->rx);
+ uint32_t hash = hash_2words(odp_to_u32(rxq->port->port_no), qid);
+ struct rxq_poll *poll;
- if (found) {
- hmapx_add(to_reload, pmd);
+ HMAP_FOR_EACH_WITH_HASH (poll, node, hash, &pmd->poll_list) {
+ if (poll->rxq == rxq) {
+ /* 'rxq' is already polled by this thread. Do nothing. */
+ return;
}
}
-}
-
-/* Deletes 'port' from the 'poll_list' and from the 'tx_ports' of all the pmd
- * threads. Reloads the threads if needed. */
-static void
-dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
- struct dp_netdev_port *port)
-{
- struct dp_netdev_pmd_thread *pmd;
- struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
- struct hmapx_node *node;
-
- dp_netdev_del_port_from_all_pmds__(dp, port, &to_reload);
-
- HMAPX_FOR_EACH (node, &to_reload) {
- pmd = (struct dp_netdev_pmd_thread *) node->data;
- dp_netdev_reload_pmd__(pmd);
- }
-
- hmapx_destroy(&to_reload);
-}
-
-
-/* Returns non-isolated PMD thread from this numa node with fewer
- * rx queues to poll. Returns NULL if there is no non-isolated PMD threads
- * on this numa node. Can be called safely only by main thread. */
-static struct dp_netdev_pmd_thread *
-dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
-{
- int min_cnt = -1;
- struct dp_netdev_pmd_thread *pmd, *res = NULL;
- CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
- if (!pmd->isolated && pmd->numa_id == numa_id
- && (min_cnt > pmd->poll_cnt || res == NULL)) {
- min_cnt = pmd->poll_cnt;
- res = pmd;
- }
- }
+ poll = xmalloc(sizeof *poll);
+ poll->rxq = rxq;
+ hmap_insert(&pmd->poll_list, &poll->node, hash);
- return res;
+ pmd->need_reload = true;
}
-/* Adds rx queue to poll_list of PMD thread. */
+/* Deletes 'poll' from the poll_list of the PMD thread. */
static void
-dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
- struct dp_netdev_port *port, struct netdev_rxq *rx)
+dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
+ struct rxq_poll *poll)
OVS_REQUIRES(pmd->port_mutex)
{
- struct rxq_poll *poll = xmalloc(sizeof *poll);
+ hmap_remove(&pmd->poll_list, &poll->node);
+ free(poll);
- poll->port = port;
- poll->rx = rx;
-
- ovs_list_push_back(&pmd->poll_list, &poll->node);
- pmd->poll_cnt++;
+ pmd->need_reload = true;
}
/* Add 'port' to the tx port cache of 'pmd', which must be reloaded for the
static void
dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
struct dp_netdev_port *port)
+ OVS_REQUIRES(pmd->port_mutex)
{
struct tx_port *tx;
+ tx = tx_port_lookup(&pmd->tx_ports, port->port_no);
+ if (tx) {
+ /* 'port' is already in this thread's tx cache. Do nothing. */
+ return;
+ }
+
tx = xzalloc(sizeof *tx);
tx->port = port;
tx->qid = -1;
- ovs_mutex_lock(&pmd->port_mutex);
hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
- ovs_mutex_unlock(&pmd->port_mutex);
-}
-
-/* Distribute all {pinned|non-pinned} rx queues of 'port' between PMD
- * threads in 'dp'. The pmd threads that need to be restarted are inserted
- * in 'to_reload'. PMD threads with pinned queues marked as isolated. */
-static void
-dp_netdev_add_port_rx_to_pmds(struct dp_netdev *dp,
- struct dp_netdev_port *port,
- struct hmapx *to_reload, bool pinned)
-{
- int numa_id = netdev_get_numa_id(port->netdev);
- struct dp_netdev_pmd_thread *pmd;
- int i;
-
- if (!netdev_is_pmd(port->netdev)) {
- return;
- }
-
- for (i = 0; i < port->n_rxq; i++) {
- if (pinned) {
- if (port->rxqs[i].core_id == OVS_CORE_UNSPEC) {
- continue;
- }
- pmd = dp_netdev_get_pmd(dp, port->rxqs[i].core_id);
- if (!pmd) {
- VLOG_WARN("There is no PMD thread on core %d. "
- "Queue %d on port \'%s\' will not be polled.",
- port->rxqs[i].core_id, i,
- netdev_get_name(port->netdev));
- continue;
- }
- pmd->isolated = true;
- dp_netdev_pmd_unref(pmd);
- } else {
- if (port->rxqs[i].core_id != OVS_CORE_UNSPEC) {
- continue;
- }
- pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
- if (!pmd) {
- VLOG_WARN("There's no available pmd thread on numa node %d",
- numa_id);
- break;
- }
- }
-
- ovs_mutex_lock(&pmd->port_mutex);
- dp_netdev_add_rxq_to_pmd(pmd, port, port->rxqs[i].rxq);
- ovs_mutex_unlock(&pmd->port_mutex);
-
- hmapx_add(to_reload, pmd);
- }
-}
-
-/* Distributes all non-pinned rx queues of 'port' between all PMD threads
- * in 'dp' and inserts 'port' in the PMD threads 'tx_ports'. The pmd threads
- * that need to be restarted are inserted in 'to_reload'. */
-static void
-dp_netdev_add_port_to_pmds__(struct dp_netdev *dp, struct dp_netdev_port *port,
- struct hmapx *to_reload)
-{
- struct dp_netdev_pmd_thread *pmd;
-
- dp_netdev_add_port_rx_to_pmds(dp, port, to_reload, false);
-
- CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
- dp_netdev_add_port_tx_to_pmd(pmd, port);
- hmapx_add(to_reload, pmd);
- }
-}
-
-/* Distributes all non-pinned rx queues of 'port' between all PMD threads
- * in 'dp', inserts 'port' in the PMD threads 'tx_ports' and reloads them,
- * if needed. */
-static void
-dp_netdev_add_port_to_pmds(struct dp_netdev *dp, struct dp_netdev_port *port)
-{
- struct dp_netdev_pmd_thread *pmd;
- struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
- struct hmapx_node *node;
-
- dp_netdev_add_port_to_pmds__(dp, port, &to_reload);
-
- HMAPX_FOR_EACH (node, &to_reload) {
- pmd = (struct dp_netdev_pmd_thread *) node->data;
- dp_netdev_reload_pmd__(pmd);
- }
-
- hmapx_destroy(&to_reload);
-}
-
-static void
-dp_netdev_start_pmds_on_numa(struct dp_netdev *dp, int numa_id)
-{
- int can_have, n_unpinned, i;
-
- n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
- if (!n_unpinned) {
- VLOG_WARN("Cannot create pmd threads due to out of unpinned "
- "cores on numa node %d", numa_id);
- return;
- }
-
- /* If cpu mask is specified, uses all unpinned cores, otherwise
- * tries creating NR_PMD_THREADS pmd threads. */
- can_have = dp->pmd_cmask ? n_unpinned : MIN(n_unpinned, NR_PMD_THREADS);
- for (i = 0; i < can_have; i++) {
- unsigned core_id = ovs_numa_get_unpinned_core_on_numa(numa_id);
- struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
- struct dp_netdev_port *port;
-
- dp_netdev_configure_pmd(pmd, dp, core_id, numa_id);
-
- HMAP_FOR_EACH (port, node, &dp->ports) {
- dp_netdev_add_port_tx_to_pmd(pmd, port);
- }
-
- pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
- }
- VLOG_INFO("Created %d pmd threads on numa node %d", can_have, numa_id);
+ pmd->need_reload = true;
}
-/* Starts pmd threads, if not already started. The function takes care of
- * filling the threads tx port cache. */
+/* Deletes 'tx' from the tx port cache of 'pmd', which must be reloaded for
+ * the changes to take effect. */
static void
-dp_netdev_start_pmds(struct dp_netdev *dp)
- OVS_REQUIRES(dp->port_mutex)
+dp_netdev_del_port_tx_from_pmd(struct dp_netdev_pmd_thread *pmd,
+ struct tx_port *tx)
+ OVS_REQUIRES(pmd->port_mutex)
{
- int n_pmds;
-
- n_pmds = get_n_pmd_threads(dp);
-
- /* If there are already pmd threads created for the datapath, do nothing.
- * Else, creates the pmd threads. */
- if (!n_pmds) {
- int n_numas = ovs_numa_get_n_numas();
-
- for (int numa_id = 0; numa_id < n_numas; numa_id++) {
- dp_netdev_start_pmds_on_numa(dp, numa_id);
- }
- }
+ hmap_remove(&pmd->tx_ports, &tx->node);
+ free(tx);
+ pmd->need_reload = true;
}
-
\f
-/* Called after pmd threads config change. Restarts pmd threads with
- * new configuration. */
-static void
-dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
- OVS_REQUIRES(dp->port_mutex)
-{
- struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
- struct dp_netdev_pmd_thread *pmd;
- struct dp_netdev_port *port;
- struct hmapx_node *node;
-
- dp_netdev_start_pmds(dp);
- /* Distribute only pinned rx queues first to mark threads as isolated */
- HMAP_FOR_EACH (port, node, &dp->ports) {
- dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload, true);
- }
-
- /* Distribute remaining non-pinned rx queues to non-isolated PMD threads. */
- HMAP_FOR_EACH (port, node, &dp->ports) {
- dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload, false);
- }
-
- HMAPX_FOR_EACH (node, &to_reload) {
- pmd = (struct dp_netdev_pmd_thread *) node->data;
- dp_netdev_reload_pmd__(pmd);
- }
-
- hmapx_destroy(&to_reload);
-}
-
static char *
dpif_netdev_get_datapath_version(void)
{
ofpbuf_init(&key, 0);
odp_flow_key_from_flow(&odp_parms, &key);
- packet_str = ofp_packet_to_string(dp_packet_data(packet_),
- dp_packet_size(packet_));
+ packet_str = ofp_dp_packet_to_string(packet_);
odp_flow_key_format(key.data, key.size, &ds);
actions, wc, put_actions, dp->upcall_aux);
}
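+/* Returns the RSS hash of 'packet', computing (and caching in the packet) a
+ * 5-tuple hash when the NIC did not provide one. Used only when the packet
+ * metadata is not yet valid, i.e. for packets fresh from an rxq that cannot
+ * have been recirculated. */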
+static inline uint32_t
+dpif_netdev_packet_get_rss_hash_orig_pkt(struct dp_packet *packet,
+ const struct miniflow *mf)
+{
+ uint32_t hash;
+
+ if (OVS_LIKELY(dp_packet_rss_valid(packet))) {
+ hash = dp_packet_get_rss_hash(packet);
+ } else {
+ hash = miniflow_hash_5tuple(mf, 0);
+ dp_packet_set_rss_hash(packet, hash);
+ }
+
+ return hash;
+}
+
static inline uint32_t
dpif_netdev_packet_get_rss_hash(struct dp_packet *packet,
const struct miniflow *mf)
static inline void
dp_netdev_queue_batches(struct dp_packet *pkt,
struct dp_netdev_flow *flow, const struct miniflow *mf,
- struct packet_batch_per_flow *batches, size_t *n_batches)
+ struct packet_batch_per_flow *batches,
+ size_t *n_batches)
{
struct packet_batch_per_flow *batch = flow->batch;
* The function returns the number of packets that needs to be processed in the
* 'packets' array (they have been moved to the beginning of the vector).
*
- * If 'md_is_valid' is false, the metadata in 'packets' is not valid and must be
- * initialized by this function using 'port_no'.
+ * For performance reasons a caller may choose not to initialize the metadata
+ * in 'packets_'. If 'md_is_valid' is false, the metadata in 'packets'
+ * is not valid and must be initialized by this function using 'port_no'.
+ * If 'md_is_valid' is true, the metadata is already valid and 'port_no'
+ * will be ignored.
*/
static inline size_t
-emc_processing(struct dp_netdev_pmd_thread *pmd, struct dp_packet_batch *packets_,
+emc_processing(struct dp_netdev_pmd_thread *pmd,
+ struct dp_packet_batch *packets_,
struct netdev_flow_key *keys,
struct packet_batch_per_flow batches[], size_t *n_batches,
bool md_is_valid, odp_port_t port_no)
{
struct emc_cache *flow_cache = &pmd->flow_cache;
struct netdev_flow_key *key = &keys[0];
- size_t i, n_missed = 0, n_dropped = 0;
- struct dp_packet **packets = packets_->packets;
- int cnt = packets_->count;
+ size_t n_missed = 0, n_dropped = 0;
+ struct dp_packet *packet;
+ const size_t cnt = dp_packet_batch_size(packets_);
+ uint32_t cur_min;
+ int i;
- for (i = 0; i < cnt; i++) {
+ atomic_read_relaxed(&pmd->dp->emc_insert_min, &cur_min);
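+ /* A zero 'cur_min' means EMC insertion is disabled; in that case the hash
+ * computation and EMC lookup below are skipped as well. */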
+
+ DP_PACKET_BATCH_REFILL_FOR_EACH (i, cnt, packet, packets_) {
struct dp_netdev_flow *flow;
- struct dp_packet *packet = packets[i];
if (OVS_UNLIKELY(dp_packet_size(packet) < ETH_HEADER_LEN)) {
dp_packet_delete(packet);
}
if (i != cnt - 1) {
+ struct dp_packet **packets = packets_->packets;
/* Prefetch next packet data and metadata. */
OVS_PREFETCH(dp_packet_data(packets[i+1]));
pkt_metadata_prefetch_init(&packets[i+1]->md);
}
miniflow_extract(packet, &key->mf);
key->len = 0; /* Not computed yet. */
- key->hash = dpif_netdev_packet_get_rss_hash(packet, &key->mf);
-
- flow = emc_lookup(flow_cache, key);
+ /* If EMC is disabled, skip hash computation and emc_lookup. */
+ if (cur_min) {
+ if (!md_is_valid) {
+ key->hash = dpif_netdev_packet_get_rss_hash_orig_pkt(packet,
+ &key->mf);
+ } else {
+ key->hash = dpif_netdev_packet_get_rss_hash(packet, &key->mf);
+ }
+ flow = emc_lookup(flow_cache, key);
+ } else {
+ flow = NULL;
+ }
if (OVS_LIKELY(flow)) {
dp_netdev_queue_batches(packet, flow, &key->mf, batches,
n_batches);
} else {
/* Exact match cache missed. Group missed packets together at
- * the beginning of the 'packets' array. */
- packets[n_missed] = packet;
+ * the beginning of the 'packets' array. */
+ dp_packet_batch_refill(packets_, packet, i);
/* 'key[n_missed]' contains the key of the current packet and it
* must be returned to the caller. The next key should be extracted
* to 'keys[n_missed + 1]'. */
}
}
- dp_netdev_count_packet(pmd, DP_STAT_EXACT_HIT, cnt - n_dropped - n_missed);
+ dp_netdev_count_packet(pmd, DP_STAT_EXACT_HIT,
+ cnt - n_dropped - n_missed);
- return n_missed;
+ return dp_packet_batch_size(packets_);
}
static inline void
-handle_packet_upcall(struct dp_netdev_pmd_thread *pmd, struct dp_packet *packet,
+handle_packet_upcall(struct dp_netdev_pmd_thread *pmd,
+ struct dp_packet *packet,
const struct netdev_flow_key *key,
struct ofpbuf *actions, struct ofpbuf *put_actions,
int *lost_cnt, long long now)
* VLAN. Unless we refactor a lot of code that translates between
* Netlink and struct flow representations, we have to do the same
* here. */
- if (!match.wc.masks.vlan_tci) {
- match.wc.masks.vlan_tci = htons(0xffff);
+ if (!match.wc.masks.vlans[0].tci) {
+ match.wc.masks.vlans[0].tci = htons(0xffff);
}
/* We can't allow the packet batching in the next loop to execute
* the actions. Otherwise, if there are any slow path actions,
* we'll send the packet up twice. */
- packet_batch_init_packet(&b, packet);
+ dp_packet_batch_init_packet(&b, packet);
dp_netdev_execute_actions(pmd, &b, true, &match.flow,
actions->data, actions->size, now);
add_actions->size);
}
ovs_mutex_unlock(&pmd->flow_mutex);
-
- emc_insert(&pmd->flow_cache, key, netdev_flow);
+ emc_probabilistic_insert(pmd, key, netdev_flow);
}
}
odp_port_t in_port,
long long now)
{
- int cnt = packets_->count;
+ const size_t cnt = dp_packet_batch_size(packets_);
#if !defined(__CHECKER__) && !defined(_WIN32)
const size_t PKT_ARRAY_SIZE = cnt;
#else
/* Sparse or MSVC doesn't like variable length array. */
enum { PKT_ARRAY_SIZE = NETDEV_MAX_BURST };
#endif
- struct dp_packet **packets = packets_->packets;
+ struct dp_packet *packet;
struct dpcls *cls;
struct dpcls_rule *rules[PKT_ARRAY_SIZE];
struct dp_netdev *dp = pmd->dp;
- struct emc_cache *flow_cache = &pmd->flow_cache;
int miss_cnt = 0, lost_cnt = 0;
int lookup_cnt = 0, add_lookup_cnt;
bool any_miss;
ofpbuf_use_stub(&actions, actions_stub, sizeof actions_stub);
ofpbuf_use_stub(&put_actions, slow_stub, sizeof slow_stub);
- for (i = 0; i < cnt; i++) {
+ DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
struct dp_netdev_flow *netdev_flow;
if (OVS_LIKELY(rules[i])) {
}
miss_cnt++;
- handle_packet_upcall(pmd, packets[i], &keys[i], &actions,
+ handle_packet_upcall(pmd, packet, &keys[i], &actions,
&put_actions, &lost_cnt, now);
}
ofpbuf_uninit(&actions);
ofpbuf_uninit(&put_actions);
fat_rwlock_unlock(&dp->upcall_rwlock);
- dp_netdev_count_packet(pmd, DP_STAT_LOST, lost_cnt);
} else if (OVS_UNLIKELY(any_miss)) {
- for (i = 0; i < cnt; i++) {
+ DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
if (OVS_UNLIKELY(!rules[i])) {
- dp_packet_delete(packets[i]);
+ dp_packet_delete(packet);
lost_cnt++;
miss_cnt++;
}
}
}
- for (i = 0; i < cnt; i++) {
- struct dp_packet *packet = packets[i];
+ DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
struct dp_netdev_flow *flow;
if (OVS_UNLIKELY(!rules[i])) {
flow = dp_netdev_flow_cast(rules[i]);
- emc_insert(flow_cache, &keys[i], flow);
+ emc_probabilistic_insert(pmd, &keys[i], flow);
dp_netdev_queue_batches(packet, flow, &keys[i].mf, batches, n_batches);
}
/* Packets enter the datapath from a port (or from recirculation) here.
*
- * For performance reasons a caller may choose not to initialize the metadata
- * in 'packets': in this case 'mdinit' is false and this function needs to
- * initialize it using 'port_no'. If the metadata in 'packets' is already
- * valid, 'md_is_valid' must be true and 'port_no' will be ignored. */
+ * When 'md_is_valid' is true, the metadata in 'packets' is already valid.
+ * When false, the metadata in 'packets' needs to be initialized using
+ * 'port_no'. */
static void
dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
struct dp_packet_batch *packets,
bool md_is_valid, odp_port_t port_no)
{
- int cnt = packets->count;
#if !defined(__CHECKER__) && !defined(_WIN32)
- const size_t PKT_ARRAY_SIZE = cnt;
+ const size_t PKT_ARRAY_SIZE = dp_packet_batch_size(packets);
#else
/* Sparse or MSVC doesn't like variable length array. */
enum { PKT_ARRAY_SIZE = NETDEV_MAX_BURST };
#endif
- OVS_ALIGNED_VAR(CACHE_LINE_SIZE) struct netdev_flow_key keys[PKT_ARRAY_SIZE];
+ OVS_ALIGNED_VAR(CACHE_LINE_SIZE)
+ struct netdev_flow_key keys[PKT_ARRAY_SIZE];
struct packet_batch_per_flow batches[PKT_ARRAY_SIZE];
long long now = time_msec();
- size_t newcnt, n_batches, i;
+ size_t n_batches;
odp_port_t in_port;
n_batches = 0;
- newcnt = emc_processing(pmd, packets, keys, batches, &n_batches,
+ emc_processing(pmd, packets, keys, batches, &n_batches,
md_is_valid, port_no);
- if (OVS_UNLIKELY(newcnt)) {
- packets->count = newcnt;
+ if (!dp_packet_batch_is_empty(packets)) {
/* Get ingress port from first packet's metadata. */
in_port = packets->packets[0]->md.in_port.odp_port;
- fast_path_processing(pmd, packets, keys, batches, &n_batches, in_port, now);
+ fast_path_processing(pmd, packets, keys, batches, &n_batches,
+ in_port, now);
}
/* All the flow batches need to be reset before any call to
* already its own batches[k] still waiting to be served. So if its
* ‘batch’ member is not reset, the recirculated packet would be wrongly
* appended to batches[k] of the 1st call to dp_netdev_input__(). */
+ size_t i;
for (i = 0; i < n_batches; i++) {
batches[i].flow->batch = NULL;
}
data = nl_attr_get(attr);
- tun_port = pmd_tnl_port_cache_lookup(pmd, u32_to_odp(data->tnl_port));
+ tun_port = pmd_tnl_port_cache_lookup(pmd, data->tnl_port);
if (!tun_port) {
err = -EINVAL;
goto error;
DPIF_UC_ACTION, userdata, actions,
NULL);
if (!error || error == ENOSPC) {
- packet_batch_init_packet(&b, packet);
+ dp_packet_batch_init_packet(&b, packet);
dp_netdev_execute_actions(pmd, &b, may_steal, flow,
actions->data, actions->size, now);
} else if (may_steal) {
static void
dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
const struct nlattr *a, bool may_steal)
+ OVS_NO_THREAD_SAFETY_ANALYSIS
{
struct dp_netdev_execute_aux *aux = aux_;
uint32_t *depth = recirc_depth_get();
if (dynamic_txqs) {
tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, now);
} else {
- atomic_read_relaxed(&pmd->static_tx_qid, &tx_qid);
+ tx_qid = pmd->static_tx_qid;
}
netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
case OVS_ACTION_ATTR_TUNNEL_PUSH:
if (*depth < MAX_RECIRC_DEPTH) {
- struct dp_packet_batch tnl_pkt;
- struct dp_packet_batch *orig_packets_ = packets_;
- int err;
-
- if (!may_steal) {
- dp_packet_batch_clone(&tnl_pkt, packets_);
- packets_ = &tnl_pkt;
- dp_packet_batch_reset_cutlen(orig_packets_);
- }
-
dp_packet_batch_apply_cutlen(packets_);
-
- err = push_tnl_action(pmd, a, packets_);
- if (!err) {
- (*depth)++;
- dp_netdev_recirculate(pmd, packets_);
- (*depth)--;
- }
+ push_tnl_action(pmd, a, packets_);
return;
}
break;
p = pmd_tnl_port_cache_lookup(pmd, portno);
if (p) {
struct dp_packet_batch tnl_pkt;
- int i;
if (!may_steal) {
dp_packet_batch_clone(&tnl_pkt, packets_);
dp_packet_batch_apply_cutlen(packets_);
netdev_pop_header(p->port->netdev, packets_);
- if (!packets_->count) {
+ if (dp_packet_batch_is_empty(packets_)) {
return;
}
- for (i = 0; i < packets_->count; i++) {
- packets_->packets[i]->md.in_port.odp_port = portno;
+ struct dp_packet *packet;
+ DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
+ packet->md.in_port.odp_port = portno;
}
(*depth)++;
case OVS_ACTION_ATTR_USERSPACE:
if (!fat_rwlock_tryrdlock(&dp->upcall_rwlock)) {
struct dp_packet_batch *orig_packets_ = packets_;
- struct dp_packet **packets = packets_->packets;
const struct nlattr *userdata;
struct dp_packet_batch usr_pkt;
struct ofpbuf actions;
struct flow flow;
ovs_u128 ufid;
bool clone = false;
- int i;
userdata = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
ofpbuf_init(&actions, 0);
if (!may_steal) {
dp_packet_batch_clone(&usr_pkt, packets_);
packets_ = &usr_pkt;
- packets = packets_->packets;
clone = true;
dp_packet_batch_reset_cutlen(orig_packets_);
}
dp_packet_batch_apply_cutlen(packets_);
}
- for (i = 0; i < packets_->count; i++) {
- flow_extract(packets[i], &flow);
+ struct dp_packet *packet;
+ DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
+ flow_extract(packet, &flow);
dpif_flow_hash(dp->dpif, &flow, sizeof flow, &ufid);
- dp_execute_userspace_action(pmd, packets[i], may_steal, &flow,
+ dp_execute_userspace_action(pmd, packet, may_steal, &flow,
&ufid, &actions, userdata, now);
}
case OVS_ACTION_ATTR_RECIRC:
if (*depth < MAX_RECIRC_DEPTH) {
struct dp_packet_batch recirc_pkts;
- int i;
if (!may_steal) {
dp_packet_batch_clone(&recirc_pkts, packets_);
packets_ = &recirc_pkts;
}
- for (i = 0; i < packets_->count; i++) {
- packets_->packets[i]->md.recirc_id = nl_attr_get_u32(a);
+ struct dp_packet *packet;
+ DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
+ packet->md.recirc_id = nl_attr_get_u32(a);
}
(*depth)++;
case OVS_ACTION_ATTR_CT: {
const struct nlattr *b;
+ bool force = false;
bool commit = false;
unsigned int left;
uint16_t zone = 0;
const char *helper = NULL;
const uint32_t *setmark = NULL;
const struct ovs_key_ct_labels *setlabel = NULL;
+ struct nat_action_info_t nat_action_info;
+ struct nat_action_info_t *nat_action_info_ref = NULL;
+ bool nat_config = false;
NL_ATTR_FOR_EACH_UNSAFE (b, left, nl_attr_get(a),
nl_attr_get_size(a)) {
enum ovs_ct_attr sub_type = nl_attr_type(b);
switch(sub_type) {
+ case OVS_CT_ATTR_FORCE_COMMIT:
+ force = true;
+ /* fall through. */
case OVS_CT_ATTR_COMMIT:
commit = true;
break;
case OVS_CT_ATTR_LABELS:
setlabel = nl_attr_get(b);
break;
- case OVS_CT_ATTR_NAT:
+ case OVS_CT_ATTR_EVENTMASK:
+ /* Silently ignored, as userspace datapath does not generate
+ * netlink events. */
+ break;
+ case OVS_CT_ATTR_NAT: {
+ const struct nlattr *b_nest;
+ unsigned int left_nest;
+ bool ip_min_specified = false;
+ bool proto_num_min_specified = false;
+ bool ip_max_specified = false;
+ bool proto_num_max_specified = false;
+ memset(&nat_action_info, 0, sizeof nat_action_info);
+ nat_action_info_ref = &nat_action_info;
+
+ NL_NESTED_FOR_EACH_UNSAFE (b_nest, left_nest, b) {
+ enum ovs_nat_attr sub_type_nest = nl_attr_type(b_nest);
+
+ switch (sub_type_nest) {
+ case OVS_NAT_ATTR_SRC:
+ case OVS_NAT_ATTR_DST:
+ nat_config = true;
+ nat_action_info.nat_action |=
+ ((sub_type_nest == OVS_NAT_ATTR_SRC)
+ ? NAT_ACTION_SRC : NAT_ACTION_DST);
+ break;
+ case OVS_NAT_ATTR_IP_MIN:
+ memcpy(&nat_action_info.min_addr,
+ nl_attr_get(b_nest),
+ nl_attr_get_size(b_nest));
+ ip_min_specified = true;
+ break;
+ case OVS_NAT_ATTR_IP_MAX:
+ memcpy(&nat_action_info.max_addr,
+ nl_attr_get(b_nest),
+ nl_attr_get_size(b_nest));
+ ip_max_specified = true;
+ break;
+ case OVS_NAT_ATTR_PROTO_MIN:
+ nat_action_info.min_port =
+ nl_attr_get_u16(b_nest);
+ proto_num_min_specified = true;
+ break;
+ case OVS_NAT_ATTR_PROTO_MAX:
+ nat_action_info.max_port =
+ nl_attr_get_u16(b_nest);
+ proto_num_max_specified = true;
+ break;
+ case OVS_NAT_ATTR_PERSISTENT:
+ case OVS_NAT_ATTR_PROTO_HASH:
+ case OVS_NAT_ATTR_PROTO_RANDOM:
+ break;
+ case OVS_NAT_ATTR_UNSPEC:
+ case __OVS_NAT_ATTR_MAX:
+ OVS_NOT_REACHED();
+ }
+ }
+
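+ /* Fill in defaults: e.g. an (illustrative) action like
+ * ct(commit,nat(src=10.1.1.1)) supplies only the minimum address, which
+ * stands for a single-address range with no port restriction. */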
+ if (ip_min_specified && !ip_max_specified) {
+ nat_action_info.max_addr = nat_action_info.min_addr;
+ }
+ if (proto_num_min_specified && !proto_num_max_specified) {
+ nat_action_info.max_port = nat_action_info.min_port;
+ }
+ if (proto_num_min_specified || proto_num_max_specified) {
+ if (nat_action_info.nat_action & NAT_ACTION_SRC) {
+ nat_action_info.nat_action |= NAT_ACTION_SRC_PORT;
+ } else if (nat_action_info.nat_action & NAT_ACTION_DST) {
+ nat_action_info.nat_action |= NAT_ACTION_DST_PORT;
+ }
+ }
+ break;
+ }
case OVS_CT_ATTR_UNSPEC:
case __OVS_CT_ATTR_MAX:
OVS_NOT_REACHED();
}
}
- conntrack_execute(&dp->conntrack, packets_, aux->flow->dl_type, commit,
- zone, setmark, setlabel, helper);
+ /* We won't be able to function properly in this case, hence
+ * complain loudly. */
+ if (nat_config && !commit) {
+ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
+ VLOG_WARN_RL(&rl, "NAT specified without commit.");
+ }
+
+ conntrack_execute(&dp->conntrack, packets_, aux->flow->dl_type, force,
+ commit, zone, setmark, setlabel, aux->flow->tp_src,
+ aux->flow->tp_dst, helper, nat_action_info_ref, now);
break;
}
+ case OVS_ACTION_ATTR_METER:
+ dp_netdev_run_meter(pmd->dp, packets_, nl_attr_get_u32(a),
+ time_msec());
+ break;
+
case OVS_ACTION_ATTR_PUSH_VLAN:
case OVS_ACTION_ATTR_POP_VLAN:
case OVS_ACTION_ATTR_PUSH_MPLS:
case OVS_ACTION_ATTR_HASH:
case OVS_ACTION_ATTR_UNSPEC:
case OVS_ACTION_ATTR_TRUNC:
+ case OVS_ACTION_ATTR_PUSH_ETH:
+ case OVS_ACTION_ATTR_POP_ETH:
+ case OVS_ACTION_ATTR_CLONE:
+ case OVS_ACTION_ATTR_ENCAP_NSH:
+ case OVS_ACTION_ATTR_DECAP_NSH:
case __OVS_ACTION_ATTR_MAX:
OVS_NOT_REACHED();
}
static int
dpif_netdev_ct_dump_start(struct dpif *dpif, struct ct_dpif_dump_state **dump_,
- const uint16_t *pzone)
+ const uint16_t *pzone, int *ptot_bkts)
{
struct dp_netdev *dp = get_dp_netdev(dpif);
struct dp_netdev_ct_dump *dump;
dump->dp = dp;
dump->ct = &dp->conntrack;
- conntrack_dump_start(&dp->conntrack, &dump->dump, pzone);
+ conntrack_dump_start(&dp->conntrack, &dump->dump, pzone, ptot_bkts);
*dump_ = &dump->up;
}
static int
-dpif_netdev_ct_flush(struct dpif *dpif, const uint16_t *zone)
+dpif_netdev_ct_flush(struct dpif *dpif, const uint16_t *zone,
+ const struct ct_dpif_tuple *tuple)
{
struct dp_netdev *dp = get_dp_netdev(dpif);
+ if (tuple) {
+ return EOPNOTSUPP;
+ }
return conntrack_flush(&dp->conntrack, zone);
}
dpif_netdev_operate,
NULL, /* recv_set */
NULL, /* handlers_set */
- dpif_netdev_pmd_set,
+ dpif_netdev_set_config,
dpif_netdev_queue_to_priority,
NULL, /* recv */
NULL, /* recv_wait */
dpif_netdev_ct_dump_next,
dpif_netdev_ct_dump_done,
dpif_netdev_ct_flush,
+ dpif_netdev_meter_get_features,
+ dpif_netdev_meter_set,
+ dpif_netdev_meter_get,
+ dpif_netdev_meter_del,
};
static void
/* Remove port. */
hmap_remove(&dp->ports, &port->node);
- dp_netdev_del_port_from_all_pmds(dp, port);
+ reconfigure_datapath(dp);
/* Reinsert with new port number. */
port->port_no = port_no;
hmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
- dp_netdev_add_port_to_pmds(dp, port);
+ reconfigure_datapath(dp);
seq_change(dp->port_seq);
unixctl_command_reply(conn, NULL);
}
static inline void
-dp_netdev_pmd_try_optimize(struct dp_netdev_pmd_thread *pmd)
+dp_netdev_pmd_try_optimize(struct dp_netdev_pmd_thread *pmd,
+ struct polled_queue *poll_list, int poll_cnt)
{
struct dpcls *cls;
long long int now = time_msec();
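+
+ /* Once per PMD_RXQ_INTERVAL_LEN msec, snapshot the processing cycles spent
+ * on each polled rxq into the rxq's interval history and reset the running
+ * counter; the stored intervals are used when rx queues are next assigned
+ * to pmd threads. */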
+ if (now > pmd->rxq_next_cycle_store) {
+ /* Get the cycles that were used to process each queue and store. */
+ for (unsigned i = 0; i < poll_cnt; i++) {
+ uint64_t rxq_cyc_curr = dp_netdev_rxq_get_cycles(poll_list[i].rxq,
+ RXQ_CYCLES_PROC_CURR);
+ dp_netdev_rxq_set_intrvl_cycles(poll_list[i].rxq, rxq_cyc_curr);
+ dp_netdev_rxq_set_cycles(poll_list[i].rxq, RXQ_CYCLES_PROC_CURR,
+ 0);
+ }
+ /* Start new measuring interval */
+ pmd->rxq_next_cycle_store = now + PMD_RXQ_INTERVAL_LEN;
+ }
+
if (now > pmd->next_optimization) {
/* Try to obtain the flow lock to block out revalidator threads.
* If not possible, just try next time. */