#include <fcntl.h>
#include <inttypes.h>
#include <net/if.h>
+#include <sys/types.h>
#include <netinet/in.h>
#include <stdint.h>
#include <stdlib.h>
#include "fat-rwlock.h"
#include "flow.h"
#include "hmapx.h"
+#include "id-pool.h"
#include "latch.h"
#include "netdev.h"
#include "netdev-vport.h"
#include "ovs-numa.h"
#include "ovs-rcu.h"
#include "packets.h"
-#include "poll-loop.h"
+#include "openvswitch/poll-loop.h"
#include "pvector.h"
#include "random.h"
#include "seq.h"
#define FLOW_DUMP_MAX_BATCH 50
/* Use per-thread recirc_depth to prevent recirculation loops. */
-#define MAX_RECIRC_DEPTH 5
+#define MAX_RECIRC_DEPTH 6
DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
/* Configuration parameters. */
.ct_zone = true,
.ct_mark = true,
.ct_label = true,
+ .ct_state_nat = true,
+ .ct_orig_tuple = true,
+ .ct_orig_tuple6 = true,
};
/* Stores a miniflow with inline values */
/* Time in ms between successive optimizations of the dpcls subtable vector */
#define DPCLS_OPTIMIZATION_INTERVAL 1000
+/* Time in ms of the interval over which rxq processing cycles used in
+ * rxq to pmd assignments are measured and stored. */
+#define PMD_RXQ_INTERVAL_LEN 10000
+
+/* Number of intervals for which cycles are stored
+ * and used during rxq to pmd assignment. */
+#define PMD_RXQ_INTERVAL_MAX 6
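+
+/* With PMD_RXQ_INTERVAL_LEN of 10000 ms, these 6 intervals cover the last
+ * minute of processing history. */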
+
struct dpcls {
struct cmap_node node; /* Within dp_netdev_pmd_thread.classifiers */
odp_port_t in_port;
/* Stores all 'struct dp_netdev_pmd_thread's. */
struct cmap poll_threads;
+ /* ID pool for per-thread 'static_tx_qid'. */
+ struct id_pool *tx_qid_pool;
+ struct ovs_mutex tx_qid_pool_mutex;
/* Protects the access of the 'struct dp_netdev_pmd_thread'
* instance for non-pmd thread. */
};
enum pmd_cycles_counter_type {
- PMD_CYCLES_POLLING, /* Cycles spent polling NICs. */
- PMD_CYCLES_PROCESSING, /* Cycles spent processing packets */
+ PMD_CYCLES_IDLE, /* Cycles spent idle or polling unsuccessfully. */
+ PMD_CYCLES_PROCESSING, /* Cycles spent successfully polling and
+ * processing polled packets. */
PMD_N_CYCLES
};
+enum rxq_cycles_counter_type {
+ RXQ_CYCLES_PROC_CURR, /* Cycles spent successfully polling and
+ processing packets during the current
+ interval. */
+ RXQ_CYCLES_PROC_HIST, /* Total cycles of all intervals that are used
+ during rxq to pmd assignment. */
+ RXQ_N_CYCLES
+};
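+
+/* RXQ_CYCLES_PROC_CURR accumulates while an interval is in progress: every
+ * PMD_RXQ_INTERVAL_LEN ms it is pushed into the rxq's 'cycles_intrvl' ring
+ * and reset, and at assignment time the sum of the stored intervals becomes
+ * RXQ_CYCLES_PROC_HIST. */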
+
#define XPS_TIMEOUT_MS 500LL
/* Contained by struct dp_netdev_port's 'rxqs' member. */
pinned. OVS_CORE_UNSPEC if the
queue doesn't need to be pinned to a
particular core. */
+ unsigned intrvl_idx; /* Write index for 'cycles_intrvl'. */
struct dp_netdev_pmd_thread *pmd; /* pmd thread that polls this queue. */
+
+ /* Counters of cycles spent successfully polling and processing pkts. */
+ atomic_ullong cycles[RXQ_N_CYCLES];
+ /* We store PMD_RXQ_INTERVAL_MAX intervals of data for an rxq and then
+ sum them to yield the cycles used for an rxq. */
+ atomic_ullong cycles_intrvl[PMD_RXQ_INTERVAL_MAX];
};
/* A port in a netdev-based datapath. */
struct dp_netdev_port {
odp_port_t port_no;
+ bool dynamic_txqs; /* If true XPS will be used. */
+ bool need_reconfigure; /* True if we should reconfigure netdev. */
struct netdev *netdev;
struct hmap_node node; /* Node in dp_netdev's 'ports'. */
struct netdev_saved_flags *sf;
struct dp_netdev_rxq *rxqs;
- unsigned n_rxq; /* Number of elements in 'rxq' */
- bool dynamic_txqs; /* If true XPS will be used. */
+ unsigned n_rxq; /* Number of elements in 'rxqs' */
unsigned *txq_used; /* Number of threads that use each tx queue. */
struct ovs_mutex txq_used_mutex;
char *type; /* Port type as requested by user. */
char *rxq_affinity_list; /* Requested affinity of rx queues. */
- bool need_reconfigure; /* True if we should reconfigure netdev. */
};
/* Contained by struct dp_netdev_flow's 'stats' member. */
};
struct polled_queue {
- struct netdev_rxq *rx;
+ struct dp_netdev_rxq *rxq;
odp_port_t port_no;
};
struct cmap classifiers;
/* Periodically sort subtable vectors according to hit frequencies */
long long int next_optimization;
+ /* End of the next time interval for which processing cycles
+ are stored for each polled rxq. */
+ long long int rxq_next_cycle_store;
/* Statistics. */
struct dp_netdev_pmd_stats stats;
/* Queue id used by this pmd thread to send packets on all netdevs if
* XPS disabled for this netdev. All static_tx_qid's are unique and less
* than 'cmap_count(dp->poll_threads)'. */
- const int static_tx_qid;
+ uint32_t static_tx_qid;
struct ovs_mutex port_mutex; /* Mutex for 'poll_list' and 'tx_ports'. */
/* List of rx queues to poll. */
unsigned core_id);
static struct dp_netdev_pmd_thread *
dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
+static void dp_netdev_del_pmd(struct dp_netdev *dp,
+ struct dp_netdev_pmd_thread *pmd);
static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp, bool non_pmd);
static void dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd);
static void dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
static void pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
OVS_REQUIRES(pmd->port_mutex);
static inline void
-dp_netdev_pmd_try_optimize(struct dp_netdev_pmd_thread *pmd);
-
+dp_netdev_pmd_try_optimize(struct dp_netdev_pmd_thread *pmd,
+ struct polled_queue *poll_list, int poll_cnt);
+static void
+dp_netdev_rxq_set_cycles(struct dp_netdev_rxq *rx,
+ enum rxq_cycles_counter_type type,
+ unsigned long long cycles);
+static uint64_t
+dp_netdev_rxq_get_cycles(struct dp_netdev_rxq *rx,
+ enum rxq_cycles_counter_type type);
+static void
+dp_netdev_rxq_set_intrvl_cycles(struct dp_netdev_rxq *rx,
+ unsigned long long cycles);
+static uint64_t
+dp_netdev_rxq_get_intrvl_cycles(struct dp_netdev_rxq *rx, unsigned idx);
static void
dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
long long now, bool purge);
static inline bool emc_entry_alive(struct emc_entry *ce);
static void emc_clear_entry(struct emc_entry *ce);
+static void dp_netdev_request_reconfigure(struct dp_netdev *dp);
+
static void
emc_cache_init(struct emc_cache *flow_cache)
{
unsigned long long stats[DP_N_STATS],
uint64_t cycles[PMD_N_CYCLES])
{
- unsigned long long total_packets = 0;
+ unsigned long long total_packets;
uint64_t total_cycles = 0;
int i;
} else {
stats[i] = 0;
}
-
- if (i != DP_STAT_LOST) {
- /* Lost packets are already included in DP_STAT_MISS */
- total_packets += stats[i];
- }
}
+ /* The sum of all matched and unmatched packets gives the total. */
+ total_packets = stats[DP_STAT_EXACT_HIT] + stats[DP_STAT_MASKED_HIT]
+ + stats[DP_STAT_MISS];
+
for (i = 0; i < PMD_N_CYCLES; i++) {
if (cycles[i] > pmd->cycles_zero[i]) {
cycles[i] -= pmd->cycles_zero[i];
}
ds_put_format(reply,
- "\tpolling cycles:%"PRIu64" (%.02f%%)\n"
+ "\tidle cycles:%"PRIu64" (%.02f%%)\n"
"\tprocessing cycles:%"PRIu64" (%.02f%%)\n",
- cycles[PMD_CYCLES_POLLING],
- cycles[PMD_CYCLES_POLLING] / (double)total_cycles * 100,
+ cycles[PMD_CYCLES_IDLE],
+ cycles[PMD_CYCLES_IDLE] / (double)total_cycles * 100,
cycles[PMD_CYCLES_PROCESSING],
cycles[PMD_CYCLES_PROCESSING] / (double)total_cycles * 100);
i++;
}
ovs_assert(i == *n);
+ qsort(ret, *n, sizeof *ret, compare_poll_list);
}
- qsort(ret, *n, sizeof *ret, compare_poll_list);
-
*list = ret;
}
*n = k;
}
+static void
+dpif_netdev_pmd_rebalance(struct unixctl_conn *conn, int argc,
+ const char *argv[], void *aux OVS_UNUSED)
+{
+ struct ds reply = DS_EMPTY_INITIALIZER;
+ struct dp_netdev *dp = NULL;
+
+ ovs_mutex_lock(&dp_netdev_mutex);
+
+ if (argc == 2) {
+ dp = shash_find_data(&dp_netdevs, argv[1]);
+ } else if (shash_count(&dp_netdevs) == 1) {
+ /* There's only one datapath */
+ dp = shash_first(&dp_netdevs)->data;
+ }
+
+ if (!dp) {
+ ovs_mutex_unlock(&dp_netdev_mutex);
+ unixctl_command_reply_error(conn,
+ "please specify an existing datapath");
+ return;
+ }
+
+ dp_netdev_request_reconfigure(dp);
+ ovs_mutex_unlock(&dp_netdev_mutex);
+ ds_put_cstr(&reply, "pmd rxq rebalance requested.\n");
+ unixctl_command_reply(conn, ds_cstr(&reply));
+ ds_destroy(&reply);
+}
+
static void
dpif_netdev_pmd_info(struct unixctl_conn *conn, int argc, const char *argv[],
void *aux)
} else {
unsigned long long stats[DP_N_STATS];
uint64_t cycles[PMD_N_CYCLES];
- int i;
/* Read current stats and cycle counters */
- for (i = 0; i < ARRAY_SIZE(stats); i++) {
- atomic_read_relaxed(&pmd->stats.n[i], &stats[i]);
+ for (size_t j = 0; j < ARRAY_SIZE(stats); j++) {
+ atomic_read_relaxed(&pmd->stats.n[j], &stats[j]);
}
- for (i = 0; i < ARRAY_SIZE(cycles); i++) {
- atomic_read_relaxed(&pmd->cycles.n[i], &cycles[i]);
+ for (size_t j = 0; j < ARRAY_SIZE(cycles); j++) {
+ atomic_read_relaxed(&pmd->cycles.n[j], &cycles[j]);
}
if (type == PMD_INFO_CLEAR_STATS) {
unixctl_command_register("dpif-netdev/pmd-rxq-show", "[dp]",
0, 1, dpif_netdev_pmd_info,
(void *)&poll_aux);
+ unixctl_command_register("dpif-netdev/pmd-rxq-rebalance", "[dp]",
+ 0, 1, dpif_netdev_pmd_rebalance,
+ NULL);
return 0;
}
atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
cmap_init(&dp->poll_threads);
+
+ ovs_mutex_init(&dp->tx_qid_pool_mutex);
+ /* We need 1 Tx queue for each possible core + 1 for non-PMD threads. */
+ dp->tx_qid_pool = id_pool_create(0, ovs_numa_get_n_cores() + 1);
+
ovs_mutex_init_recursive(&dp->non_pmd_mutex);
ovsthread_key_create(&dp->per_pmd_key, NULL);
ovs_mutex_lock(&dp->port_mutex);
+ /* The non-PMD thread is created before all other threads and will
+ * allocate static_tx_qid = 0. */
dp_netdev_set_nonpmd(dp);
error = do_add_port(dp, name, dpif_netdev_port_open_type(dp->class,
dp_netdev_destroy_all_pmds(dp, true);
cmap_destroy(&dp->poll_threads);
+ ovs_mutex_destroy(&dp->tx_qid_pool_mutex);
+ id_pool_destroy(dp->tx_qid_pool);
+
ovs_mutex_destroy(&dp->non_pmd_mutex);
ovsthread_key_delete(dp->per_pmd_key);
uint32_t min;
atomic_read_relaxed(&pmd->dp->emc_insert_min, &min);
-#ifdef DPDK_NETDEV
- if (min && (key->hash ^ (uint32_t) pmd->last_cycles) <= min) {
-#else
- if (min && (key->hash ^ random_uint32()) <= min) {
-#endif
+ if (min && random_uint32() <= min) {
emc_insert(&pmd->flow_cache, key, flow);
}
}
dpif_netdev_flow_from_nlattrs(const struct nlattr *key, uint32_t key_len,
struct flow *flow, bool probe)
{
- odp_port_t in_port;
-
if (odp_flow_key_to_flow(key, key_len, flow)) {
if (!probe) {
/* This should not happen: it indicates that
return EINVAL;
}
- in_port = flow->in_port.odp_port;
- if (!is_valid_port_number(in_port) && in_port != ODPP_NONE) {
- return EINVAL;
- }
-
if (flow->ct_state & DP_NETDEV_CS_UNSUPPORTED_MASK) {
return EINVAL;
}
mask_buf.data, mask_buf.size,
NULL, &ds, false);
ds_put_cstr(&ds, ", actions:");
- format_odp_actions(&ds, actions, actions_len);
+ format_odp_actions(&ds, actions, actions_len, NULL);
VLOG_DBG("%s", ds_cstr(&ds));
ds_put_cstr(&ds, "flow match: ");
miniflow_expand(&flow->cr.flow.mf, &m.flow);
miniflow_expand(&flow->cr.mask->mf, &m.wc.masks);
+ memset(&m.tun_md, 0, sizeof m.tun_md);
match_format(&m, NULL, &ds, OFP_DEFAULT_PRIORITY);
VLOG_DBG("%s", ds_cstr(&ds));
/* If this is part of a probe, drop the packet, since executing
 * the action may actually cause spurious packets to be sent into
 * the network. */
+ if (pmd->core_id == NON_PMD_CORE_ID) {
+ dp_netdev_pmd_unref(pmd);
+ }
return 0;
}
\f
/* Creates and returns a new 'struct dp_netdev_actions', whose actions are
- * a copy of the 'ofpacts_len' bytes of 'ofpacts'. */
+ * a copy of the 'size' bytes of 'actions'. */
struct dp_netdev_actions *
dp_netdev_actions_create(const struct nlattr *actions, size_t size)
{
non_atomic_ullong_add(&pmd->cycles.n[type], interval);
}
+/* Calculate the intermediate cycle result and add it to the counter
+ * 'type'. */
+static inline void
+cycles_count_intermediate(struct dp_netdev_pmd_thread *pmd,
+ struct dp_netdev_rxq *rxq,
+ enum pmd_cycles_counter_type type)
+ OVS_NO_THREAD_SAFETY_ANALYSIS
+{
+ unsigned long long new_cycles = cycles_counter();
+ unsigned long long interval = new_cycles - pmd->last_cycles;
+ pmd->last_cycles = new_cycles;
+
+ non_atomic_ullong_add(&pmd->cycles.n[type], interval);
+ if (rxq && (type == PMD_CYCLES_PROCESSING)) {
+ /* Add to the amount of current processing cycles. */
+ non_atomic_ullong_add(&rxq->cycles[RXQ_CYCLES_PROC_CURR], interval);
+ }
+}
+
+static void
+dp_netdev_rxq_set_cycles(struct dp_netdev_rxq *rx,
+ enum rxq_cycles_counter_type type,
+ unsigned long long cycles)
+{
+ atomic_store_relaxed(&rx->cycles[type], cycles);
+}
+
+static uint64_t
+dp_netdev_rxq_get_cycles(struct dp_netdev_rxq *rx,
+ enum rxq_cycles_counter_type type)
+{
+ unsigned long long processing_cycles;
+ atomic_read_relaxed(&rx->cycles[type], &processing_cycles);
+ return processing_cycles;
+}
+
static void
+dp_netdev_rxq_set_intrvl_cycles(struct dp_netdev_rxq *rx,
+ unsigned long long cycles)
+{
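+ /* 'cycles_intrvl' acts as a circular buffer: once PMD_RXQ_INTERVAL_MAX
+ * intervals have been stored, the oldest one is overwritten. */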
+ unsigned int idx = rx->intrvl_idx++ % PMD_RXQ_INTERVAL_MAX;
+ atomic_store_relaxed(&rx->cycles_intrvl[idx], cycles);
+}
+
+static uint64_t
+dp_netdev_rxq_get_intrvl_cycles(struct dp_netdev_rxq *rx, unsigned idx)
+{
+ unsigned long long processing_cycles;
+ atomic_read_relaxed(&rx->cycles_intrvl[idx], &processing_cycles);
+ return processing_cycles;
+}
+
+static int
dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
struct netdev_rxq *rx,
odp_port_t port_no)
{
struct dp_packet_batch batch;
int error;
+ int batch_cnt = 0;
dp_packet_batch_init(&batch);
- cycles_count_start(pmd);
error = netdev_rxq_recv(rx, &batch);
- cycles_count_end(pmd, PMD_CYCLES_POLLING);
if (!error) {
*recirc_depth_get() = 0;
- cycles_count_start(pmd);
+ batch_cnt = batch.count;
dp_netdev_input(pmd, &batch, port_no);
- cycles_count_end(pmd, PMD_CYCLES_PROCESSING);
} else if (error != EAGAIN && error != EOPNOTSUPP) {
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
netdev_rxq_get_name(rx), ovs_strerror(error));
}
+
+ return batch_cnt;
}
static struct tx_port *
netdev_rxq_close(port->rxqs[i].rx);
port->rxqs[i].rx = NULL;
}
+ unsigned last_nrxq = port->n_rxq;
port->n_rxq = 0;
/* Allows 'netdev' to apply the pending configuration changes. */
port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
for (i = 0; i < netdev_n_rxq(netdev); i++) {
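+ /* Queues at index >= 'last_nrxq' are new; zero them so that their
+ * cycle counters and interval index start from a clean state. */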
+ bool new_queue = i >= last_nrxq;
+ if (new_queue) {
+ memset(&port->rxqs[i], 0, sizeof port->rxqs[i]);
+ }
+
port->rxqs[i].port = port;
+
err = netdev_rxq_open(netdev, &port->rxqs[i].rx, i);
if (err) {
return err;
int n_pmds;
int cur_index;
+ bool idx_inc;
};
static struct rr_numa *
return NULL;
}
+/* Returns the next node in the numa list following 'numa' in round-robin
+ * fashion. Returns the first node if 'numa' is a null pointer or the last
+ * node in 'rr'. Returns NULL if the numa list in 'rr' is empty. */
+static struct rr_numa *
+rr_numa_list_next(struct rr_numa_list *rr, const struct rr_numa *numa)
+{
+ struct hmap_node *node = NULL;
+
+ if (numa) {
+ node = hmap_next(&rr->numas, &numa->node);
+ }
+ if (!node) {
+ node = hmap_first(&rr->numas);
+ }
+
+ return (node) ? CONTAINER_OF(node, struct rr_numa, node) : NULL;
+}
+
static void
rr_numa_list_populate(struct dp_netdev *dp, struct rr_numa_list *rr)
{
numa->n_pmds++;
numa->pmds = xrealloc(numa->pmds, numa->n_pmds * sizeof *numa->pmds);
numa->pmds[numa->n_pmds - 1] = pmd;
+ /* At least one pmd, so initialise 'cur_index' and 'idx_inc'. */
+ numa->cur_index = 0;
+ numa->idx_inc = true;
}
}
+/* Returns the next pmd from the numa node in
+ * incrementing or decrementing order. */
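+/* E.g. for three pmds the successive calls return pmds 0, 1, 2, 2, 1, 0,
+ * 0, 1, ..., so that with rxqs sorted by measured cycles the busiest queues
+ * are spread across pmds instead of always landing on the first one. */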
static struct dp_netdev_pmd_thread *
rr_numa_get_pmd(struct rr_numa *numa)
{
- return numa->pmds[numa->cur_index++ % numa->n_pmds];
+ int numa_idx = numa->cur_index;
+
+ if (numa->idx_inc) {
+ /* Incrementing through the list of pmds. */
+ if (numa->cur_index == numa->n_pmds - 1) {
+ /* Reached the last pmd. */
+ numa->idx_inc = false;
+ } else {
+ numa->cur_index++;
+ }
+ } else {
+ /* Decrementing through list of pmds. */
+ if (numa->cur_index == 0) {
+ /* Reached the first pmd. */
+ numa->idx_inc = true;
+ } else {
+ numa->cur_index--;
+ }
+ }
+ return numa->pmds[numa_idx];
}
static void
hmap_destroy(&rr->numas);
}
+/* Compare rx queues by the processing cycles they consumed, for sorting
+ * queues in descending order of cycles. */
+static int
+compare_rxq_cycles(const void *a, const void *b)
+{
+ struct dp_netdev_rxq *qa;
+ struct dp_netdev_rxq *qb;
+ uint64_t cycles_qa, cycles_qb;
+
+ qa = *(struct dp_netdev_rxq **) a;
+ qb = *(struct dp_netdev_rxq **) b;
+
+ cycles_qa = dp_netdev_rxq_get_cycles(qa, RXQ_CYCLES_PROC_HIST);
+ cycles_qb = dp_netdev_rxq_get_cycles(qb, RXQ_CYCLES_PROC_HIST);
+
+ if (cycles_qa != cycles_qb) {
+ return (cycles_qa < cycles_qb) ? 1 : -1;
+ } else {
+ /* Cycles are the same, so tiebreak on port/queue id.
+ * Tiebreaking (as opposed to returning 0) ensures consistent
+ * sort results across multiple OSes. */
+ uint32_t port_qa = odp_to_u32(qa->port->port_no);
+ uint32_t port_qb = odp_to_u32(qb->port->port_no);
+ if (port_qa != port_qb) {
+ return port_qa > port_qb ? 1 : -1;
+ } else {
+ return netdev_rxq_get_queue_id(qa->rx)
+ - netdev_rxq_get_queue_id(qb->rx);
+ }
+ }
+}
+
/* Assign pmds to queues. If 'pinned' is true, assign pmds to pinned
* queues and marks the pmds as isolated. Otherwise, assign non isolated
* pmds to unpinned queues.
*
+ * If 'pinned' is false, queues will be sorted by the processing cycles they
+ * consume and then assigned to pmds in round-robin order.
+ *
* The function doesn't touch the pmd threads, it just stores the assignment
* in the 'pmd' member of each rxq. */
static void
{
struct dp_netdev_port *port;
struct rr_numa_list rr;
-
- rr_numa_list_populate(dp, &rr);
+ struct rr_numa *non_local_numa = NULL;
+ struct dp_netdev_rxq **rxqs = NULL;
+ int i, n_rxqs = 0;
+ struct rr_numa *numa = NULL;
+ int numa_id;
HMAP_FOR_EACH (port, node, &dp->ports) {
- struct rr_numa *numa;
- int numa_id;
-
if (!netdev_is_pmd(port->netdev)) {
continue;
}
- numa_id = netdev_get_numa_id(port->netdev);
- numa = rr_numa_list_lookup(&rr, numa_id);
-
for (int qid = 0; qid < port->n_rxq; qid++) {
struct dp_netdev_rxq *q = &port->rxqs[qid];
dp_netdev_pmd_unref(pmd);
}
} else if (!pinned && q->core_id == OVS_CORE_UNSPEC) {
- if (!numa) {
- VLOG_WARN("There's no available (non isolated) pmd thread "
- "on numa node %d. Queue %d on port \'%s\' will "
- "not be polled.",
- numa_id, qid, netdev_get_name(port->netdev));
+ uint64_t cycle_hist = 0;
+
+ if (n_rxqs == 0) {
+ rxqs = xmalloc(sizeof *rxqs);
} else {
- q->pmd = rr_numa_get_pmd(numa);
+ rxqs = xrealloc(rxqs, sizeof *rxqs * (n_rxqs + 1));
+ }
+ /* Sum the queue intervals and store the cycle history. */
+ for (unsigned i = 0; i < PMD_RXQ_INTERVAL_MAX; i++) {
+ cycle_hist += dp_netdev_rxq_get_intrvl_cycles(q, i);
}
+ dp_netdev_rxq_set_cycles(q, RXQ_CYCLES_PROC_HIST, cycle_hist);
+
+ /* Store the queue. */
+ rxqs[n_rxqs++] = q;
}
}
}
+ if (n_rxqs > 1) {
+ /* Sort the queues in descending order of the processing cycles
+ * they consumed during their last measurement intervals. */
+ qsort(rxqs, n_rxqs, sizeof *rxqs, compare_rxq_cycles);
+ }
+
+ rr_numa_list_populate(dp, &rr);
+ /* Assign the sorted queues to pmds in round-robin order. */
+ for (i = 0; i < n_rxqs; i++) {
+ numa_id = netdev_get_numa_id(rxqs[i]->port->netdev);
+ numa = rr_numa_list_lookup(&rr, numa_id);
+ if (!numa) {
+ /* There are no pmds on the queue's local NUMA node.
+ Round robin on the NUMA nodes that do have pmds. */
+ non_local_numa = rr_numa_list_next(&rr, non_local_numa);
+ if (!non_local_numa) {
+ VLOG_ERR("There is no available (non-isolated) pmd "
+ "thread for port \'%s\' queue %d. This queue "
+ "will not be polled. Is pmd-cpu-mask set to "
+ "zero? Or are all PMDs isolated to other "
+ "queues?", netdev_rxq_get_name(rxqs[i]->rx),
+ netdev_rxq_get_queue_id(rxqs[i]->rx));
+ continue;
+ }
+ rxqs[i]->pmd = rr_numa_get_pmd(non_local_numa);
+ VLOG_WARN("There's no available (non-isolated) pmd thread "
+ "on numa node %d. Queue %d on port \'%s\' will "
+ "be assigned to the pmd on core %d "
+ "(numa node %d). Expect reduced performance.",
+ numa_id, netdev_rxq_get_queue_id(rxqs[i]->rx),
+ netdev_rxq_get_name(rxqs[i]->rx),
+ rxqs[i]->pmd->core_id, rxqs[i]->pmd->numa_id);
+ } else {
+ rxqs[i]->pmd = rr_numa_get_pmd(numa);
+ VLOG_INFO("Core %d on numa node %d assigned port \'%s\' "
+ "rx queue %d (measured processing cycles %"PRIu64").",
+ rxqs[i]->pmd->core_id, numa_id,
+ netdev_rxq_get_name(rxqs[i]->rx),
+ netdev_rxq_get_queue_id(rxqs[i]->rx),
+ dp_netdev_rxq_get_cycles(rxqs[i], RXQ_CYCLES_PROC_HIST));
+ }
+ }
+
rr_numa_list_destroy(&rr);
+ free(rxqs);
+}
+
+static void
+reload_affected_pmds(struct dp_netdev *dp)
+{
+ struct dp_netdev_pmd_thread *pmd;
+
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ if (pmd->need_reload) {
+ dp_netdev_reload_pmd__(pmd);
+ pmd->need_reload = false;
+ }
+ }
}
static void
{
struct dp_netdev_pmd_thread *pmd;
struct ovs_numa_dump *pmd_cores;
+ struct ovs_numa_info_core *core;
+ struct hmapx to_delete = HMAPX_INITIALIZER(&to_delete);
+ struct hmapx_node *node;
bool changed = false;
+ bool need_to_adjust_static_tx_qids = false;
/* The pmd threads should be started only if there's a pmd port in the
* datapath. If the user didn't provide any "pmd-cpu-mask", we start
pmd_cores = ovs_numa_dump_n_cores_per_numa(NR_PMD_THREADS);
}
- /* Check for changed configuration */
- if (ovs_numa_dump_count(pmd_cores) != cmap_count(&dp->poll_threads) - 1) {
- changed = true;
- } else {
- CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
- if (pmd->core_id != NON_PMD_CORE_ID
- && !ovs_numa_dump_contains_core(pmd_cores,
- pmd->numa_id,
- pmd->core_id)) {
- changed = true;
- break;
- }
+ /* We need to adjust 'static_tx_qid's only if we're reducing the number
+ * of PMD threads. Otherwise, new threads will allocate all the freed ids. */
+ if (ovs_numa_dump_count(pmd_cores) < cmap_count(&dp->poll_threads) - 1) {
+ /* Adjustment is required to keep 'static_tx_qid's sequential and
+ * avoid possible issues, for example, imbalanced tx queue usage
+ * and unnecessary locking caused by remapping at the netdev level. */
+ need_to_adjust_static_tx_qids = true;
+ }
+
+ /* Check for unwanted pmd threads */
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ if (pmd->core_id == NON_PMD_CORE_ID) {
+ continue;
+ }
+ if (!ovs_numa_dump_contains_core(pmd_cores, pmd->numa_id,
+ pmd->core_id)) {
+ hmapx_add(&to_delete, pmd);
+ } else if (need_to_adjust_static_tx_qids) {
+ pmd->need_reload = true;
}
}
- /* Destroy the old and recreate the new pmd threads. We don't perform an
- * incremental update because we would have to adjust 'static_tx_qid'. */
- if (changed) {
- struct ovs_numa_info_core *core;
- struct ovs_numa_info_numa *numa;
+ HMAPX_FOR_EACH (node, &to_delete) {
+ pmd = (struct dp_netdev_pmd_thread *) node->data;
+ VLOG_INFO("PMD thread on numa_id: %d, core id: %2d destroyed.",
+ pmd->numa_id, pmd->core_id);
+ dp_netdev_del_pmd(dp, pmd);
+ }
+ changed = !hmapx_is_empty(&to_delete);
+ hmapx_destroy(&to_delete);
- /* Do not destroy the non pmd thread. */
- dp_netdev_destroy_all_pmds(dp, false);
- FOR_EACH_CORE_ON_DUMP (core, pmd_cores) {
- struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
+ if (need_to_adjust_static_tx_qids) {
+ /* 'static_tx_qid's are not sequential now.
+ * Reload remaining threads to fix this. */
+ reload_affected_pmds(dp);
+ }
+ /* Check for required new pmd threads */
+ FOR_EACH_CORE_ON_DUMP (core, pmd_cores) {
+ pmd = dp_netdev_get_pmd(dp, core->core_id);
+ if (!pmd) {
+ pmd = xzalloc(sizeof *pmd);
dp_netdev_configure_pmd(pmd, dp, core->core_id, core->numa_id);
-
pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
+ VLOG_INFO("PMD thread on numa_id: %d, core id: %2d created.",
+ pmd->numa_id, pmd->core_id);
+ changed = true;
+ } else {
+ dp_netdev_pmd_unref(pmd);
}
+ }
+
+ if (changed) {
+ struct ovs_numa_info_numa *numa;
/* Log the number of pmd threads per numa node. */
FOR_EACH_NUMA_ON_DUMP (numa, pmd_cores) {
- VLOG_INFO("Created %"PRIuSIZE" pmd threads on numa node %d",
+ VLOG_INFO("There are %"PRIuSIZE" pmd threads on numa node %d",
numa->n_cores, numa->numa_id);
}
}
ovs_numa_dump_destroy(pmd_cores);
}
-static void
-reload_affected_pmds(struct dp_netdev *dp)
-{
- struct dp_netdev_pmd_thread *pmd;
-
- CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
- if (pmd->need_reload) {
- dp_netdev_reload_pmd__(pmd);
- pmd->need_reload = false;
- }
- }
-}
-
static void
pmd_remove_stale_ports(struct dp_netdev *dp,
struct dp_netdev_pmd_thread *pmd)
* need reconfiguration. */
/* Check for all the ports that need reconfiguration. We cache this in
- * 'port->reconfigure', because netdev_is_reconf_required() can change at
- * any time. */
+ * 'port->need_reconfigure', because netdev_is_reconf_required() can
+ * change at any time. */
HMAP_FOR_EACH (port, node, &dp->ports) {
if (netdev_is_reconf_required(port->netdev)) {
port->need_reconfigure = true;
struct dp_netdev *dp = get_dp_netdev(dpif);
struct dp_netdev_pmd_thread *non_pmd;
uint64_t new_tnl_seq;
+ int process_packets = 0;
ovs_mutex_lock(&dp->port_mutex);
non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
if (non_pmd) {
ovs_mutex_lock(&dp->non_pmd_mutex);
+ cycles_count_start(non_pmd);
HMAP_FOR_EACH (port, node, &dp->ports) {
if (!netdev_is_pmd(port->netdev)) {
int i;
for (i = 0; i < port->n_rxq; i++) {
- dp_netdev_process_rxq_port(non_pmd, port->rxqs[i].rx,
- port->port_no);
+ process_packets =
+ dp_netdev_process_rxq_port(non_pmd,
+ port->rxqs[i].rx,
+ port->port_no);
+ cycles_count_intermediate(non_pmd, NULL,
+ process_packets
+ ? PMD_CYCLES_PROCESSING
+ : PMD_CYCLES_IDLE);
}
}
}
+ cycles_count_end(non_pmd, PMD_CYCLES_IDLE);
dpif_netdev_xps_revalidate_pmd(non_pmd, time_msec(), false);
ovs_mutex_unlock(&dp->non_pmd_mutex);
}
/* Copies ports from 'pmd->tx_ports' (shared with the main thread) to
- * 'pmd->port_cache' (thread local) */
+ * thread-local copies. Copy to 'pmd->tnl_port_cache' if it is a tunnel
+ * device, otherwise to 'pmd->send_port_cache' if the port has at least
+ * one txq. */
static void
pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
OVS_REQUIRES(pmd->port_mutex)
}
}
+static void
+pmd_alloc_static_tx_qid(struct dp_netdev_pmd_thread *pmd)
+{
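+ /* The id-pool itself is not thread-safe, so serialize allocations. */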
+ ovs_mutex_lock(&pmd->dp->tx_qid_pool_mutex);
+ if (!id_pool_alloc_id(pmd->dp->tx_qid_pool, &pmd->static_tx_qid)) {
+ VLOG_ABORT("static_tx_qid allocation failed for PMD on core %2d"
+ ", numa_id %d.", pmd->core_id, pmd->numa_id);
+ }
+ ovs_mutex_unlock(&pmd->dp->tx_qid_pool_mutex);
+
+ VLOG_DBG("static_tx_qid = %d allocated for PMD thread on core %2d"
+ ", numa_id %d.", pmd->static_tx_qid, pmd->core_id, pmd->numa_id);
+}
+
+static void
+pmd_free_static_tx_qid(struct dp_netdev_pmd_thread *pmd)
+{
+ ovs_mutex_lock(&pmd->dp->tx_qid_pool_mutex);
+ id_pool_free_id(pmd->dp->tx_qid_pool, pmd->static_tx_qid);
+ ovs_mutex_unlock(&pmd->dp->tx_qid_pool_mutex);
+}
+
static int
pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
struct polled_queue **ppoll_list)
i = 0;
HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
- poll_list[i].rx = poll->rxq->rx;
+ poll_list[i].rxq = poll->rxq;
poll_list[i].port_no = poll->rxq->port->port_no;
i++;
}
bool exiting;
int poll_cnt;
int i;
+ int process_packets = 0;
poll_list = NULL;
ovs_numa_thread_setaffinity_core(pmd->core_id);
dpdk_set_lcore_id(pmd->core_id);
poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
-reload:
emc_cache_init(&pmd->flow_cache);
+reload:
+ pmd_alloc_static_tx_qid(pmd);
/* List port/core affinity */
for (i = 0; i < poll_cnt; i++) {
VLOG_DBG("Core %d processing port \'%s\' with queue-id %d\n",
- pmd->core_id, netdev_rxq_get_name(poll_list[i].rx),
- netdev_rxq_get_queue_id(poll_list[i].rx));
+ pmd->core_id, netdev_rxq_get_name(poll_list[i].rxq->rx),
+ netdev_rxq_get_queue_id(poll_list[i].rxq->rx));
}
if (!poll_cnt) {
lc = UINT_MAX;
}
+ cycles_count_start(pmd);
for (;;) {
for (i = 0; i < poll_cnt; i++) {
- dp_netdev_process_rxq_port(pmd, poll_list[i].rx,
- poll_list[i].port_no);
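+            /* Attribute the cycles of the last interval either to packet
+             * processing or to idling, depending on whether the poll
+             * returned any packets. */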
+ process_packets =
+ dp_netdev_process_rxq_port(pmd, poll_list[i].rxq->rx,
+ poll_list[i].port_no);
+ cycles_count_intermediate(pmd, poll_list[i].rxq,
+ process_packets ? PMD_CYCLES_PROCESSING
+ : PMD_CYCLES_IDLE);
}
if (lc++ > 1024) {
lc = 0;
coverage_try_clear();
- dp_netdev_pmd_try_optimize(pmd);
+ dp_netdev_pmd_try_optimize(pmd, poll_list, poll_cnt);
if (!ovsrcu_try_quiesce()) {
emc_cache_slow_sweep(&pmd->flow_cache);
}
}
}
+ cycles_count_end(pmd, PMD_CYCLES_IDLE);
+
poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
exiting = latch_is_set(&pmd->exit_latch);
/* Signal here to make sure the pmd finishes
* reloading the updated configuration. */
dp_netdev_pmd_reload_done(pmd);
- emc_cache_uninit(&pmd->flow_cache);
+ pmd_free_static_tx_qid(pmd);
if (!exiting) {
goto reload;
}
+ emc_cache_uninit(&pmd->flow_cache);
free(poll_list);
pmd_free_cached_ports(pmd);
return NULL;
{
struct dp_meter *meter;
struct dp_meter_band *band;
+ struct dp_packet *packet;
long long int long_delta_t; /* msec */
uint32_t delta_t; /* msec */
int i;
- int cnt = packets_->count;
+ const size_t cnt = dp_packet_batch_size(packets_);
uint32_t bytes, volume;
int exceeded_band[NETDEV_MAX_BURST];
uint32_t exceeded_rate[NETDEV_MAX_BURST];
meter->used = now;
meter->packet_count += cnt;
bytes = 0;
- for (i = 0; i < cnt; i++) {
- bytes += dp_packet_size(packets_->packets[i]);
+ DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
+ bytes += dp_packet_size(packet);
}
meter->byte_count += bytes;
} else {
/* Packet sizes differ, must process one-by-one. */
band_exceeded_pkt = cnt;
- for (i = 0; i < cnt; i++) {
- uint32_t bits = dp_packet_size(packets_->packets[i]) * 8;
+ DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
+ uint32_t bits = dp_packet_size(packet) * 8;
if (band->bucket >= bits) {
band->bucket -= bits;
/* Fire the highest rate band exceeded by each packet.
* Drop packets if needed, by swapping packet to the end that will be
* ignored. */
- const size_t size = dp_packet_batch_size(packets_);
- struct dp_packet *packet;
size_t j;
- DP_PACKET_BATCH_REFILL_FOR_EACH (j, size, packet, packets_) {
+ DP_PACKET_BATCH_REFILL_FOR_EACH (j, cnt, packet, packets_) {
if (exceeded_band[j] >= 0) {
/* Meter drop packet. */
band = &meter->bands[exceeded_band[j]];
!(config->flags & (OFPMF13_KBPS | OFPMF13_PKTPS))) {
return EBADF; /* Unsupported flags set */
}
+
/* Validate bands */
if (config->n_bands == 0 || config->n_bands > MAX_BANDS) {
        return EINVAL; /* Zero or too many bands. */
}
+
+ /* Validate rates */
+ for (i = 0; i < config->n_bands; i++) {
+ if (config->bands[i].rate == 0) {
+ return EDOM; /* rate must be non-zero */
+ }
+ }
+
for (i = 0; i < config->n_bands; ++i) {
switch (config->bands[i].type) {
case OFPMBT13_DROP:
pmd->numa_id = numa_id;
pmd->need_reload = false;
- *CONST_CAST(int *, &pmd->static_tx_qid) = cmap_count(&dp->poll_threads);
-
ovs_refcount_init(&pmd->ref_cnt);
latch_init(&pmd->exit_latch);
pmd->reload_seq = seq_create();
cmap_init(&pmd->flow_table);
cmap_init(&pmd->classifiers);
pmd->next_optimization = time_msec() + DPCLS_OPTIMIZATION_INTERVAL;
+ pmd->rxq_next_cycle_store = time_msec() + PMD_RXQ_INTERVAL_LEN;
hmap_init(&pmd->poll_list);
hmap_init(&pmd->tx_ports);
hmap_init(&pmd->tnl_port_cache);
* actual thread created for NON_PMD_CORE_ID. */
if (core_id == NON_PMD_CORE_ID) {
emc_cache_init(&pmd->flow_cache);
+ pmd_alloc_static_tx_qid(pmd);
}
cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
hash_int(core_id, 0));
ovs_mutex_lock(&dp->non_pmd_mutex);
emc_cache_uninit(&pmd->flow_cache);
pmd_free_cached_ports(pmd);
+ pmd_free_static_tx_qid(pmd);
ovs_mutex_unlock(&dp->non_pmd_mutex);
} else {
latch_set(&pmd->exit_latch);
actions, wc, put_actions, dp->upcall_aux);
}
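+/* Returns the RSS hash of 'packet', computing one from the miniflow 5-tuple
+ * (and caching it in the packet) if the NIC did not supply it. Meant for
+ * packets fresh from the wire, where the recirculation depth is always zero,
+ * so, unlike dpif_netdev_packet_get_rss_hash(), no depth is folded into the
+ * hash. */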
+static inline uint32_t
+dpif_netdev_packet_get_rss_hash_orig_pkt(struct dp_packet *packet,
+ const struct miniflow *mf)
+{
+ uint32_t hash;
+
+ if (OVS_LIKELY(dp_packet_rss_valid(packet))) {
+ hash = dp_packet_get_rss_hash(packet);
+ } else {
+ hash = miniflow_hash_5tuple(mf, 0);
+ dp_packet_set_rss_hash(packet, hash);
+ }
+
+ return hash;
+}
+
static inline uint32_t
dpif_netdev_packet_get_rss_hash(struct dp_packet *packet,
const struct miniflow *mf)
* The function returns the number of packets that needs to be processed in the
* 'packets' array (they have been moved to the beginning of the vector).
*
- * If 'md_is_valid' is false, the metadata in 'packets' is not valid and must
- * be initialized by this function using 'port_no'.
+ * For performance reasons a caller may choose not to initialize the metadata
+ * in 'packets_'. If 'md_is_valid' is false, the metadata in 'packets'
+ * is not valid and must be initialized by this function using 'port_no'.
+ * If 'md_is_valid' is true, the metadata is already valid and 'port_no'
+ * will be ignored.
*/
static inline size_t
emc_processing(struct dp_netdev_pmd_thread *pmd,
struct netdev_flow_key *key = &keys[0];
size_t n_missed = 0, n_dropped = 0;
struct dp_packet *packet;
- const size_t size = dp_packet_batch_size(packets_);
+ const size_t cnt = dp_packet_batch_size(packets_);
uint32_t cur_min;
int i;
atomic_read_relaxed(&pmd->dp->emc_insert_min, &cur_min);
- DP_PACKET_BATCH_REFILL_FOR_EACH (i, size, packet, packets_) {
+ DP_PACKET_BATCH_REFILL_FOR_EACH (i, cnt, packet, packets_) {
struct dp_netdev_flow *flow;
if (OVS_UNLIKELY(dp_packet_size(packet) < ETH_HEADER_LEN)) {
continue;
}
- if (i != size - 1) {
+ if (i != cnt - 1) {
struct dp_packet **packets = packets_->packets;
/* Prefetch next packet data and metadata. */
OVS_PREFETCH(dp_packet_data(packets[i+1]));
}
miniflow_extract(packet, &key->mf);
key->len = 0; /* Not computed yet. */
- key->hash = dpif_netdev_packet_get_rss_hash(packet, &key->mf);
-
- /* If EMC is disabled skip emc_lookup */
- flow = (cur_min == 0) ? NULL: emc_lookup(flow_cache, key);
+ /* If EMC is disabled, skip hash computation and EMC lookup. */
+ if (cur_min) {
+ if (!md_is_valid) {
+ key->hash = dpif_netdev_packet_get_rss_hash_orig_pkt(packet,
+ &key->mf);
+ } else {
+ key->hash = dpif_netdev_packet_get_rss_hash(packet, &key->mf);
+ }
+ flow = emc_lookup(flow_cache, key);
+ } else {
+ flow = NULL;
+ }
if (OVS_LIKELY(flow)) {
dp_netdev_queue_batches(packet, flow, &key->mf, batches,
n_batches);
}
dp_netdev_count_packet(pmd, DP_STAT_EXACT_HIT,
- size - n_dropped - n_missed);
+ cnt - n_dropped - n_missed);
return dp_packet_batch_size(packets_);
}
odp_port_t in_port,
long long now)
{
- int cnt = packets_->count;
+ const size_t cnt = dp_packet_batch_size(packets_);
#if !defined(__CHECKER__) && !defined(_WIN32)
const size_t PKT_ARRAY_SIZE = cnt;
#else
/* Sparse or MSVC doesn't like variable length array. */
enum { PKT_ARRAY_SIZE = NETDEV_MAX_BURST };
#endif
- struct dp_packet **packets = packets_->packets;
+ struct dp_packet *packet;
struct dpcls *cls;
struct dpcls_rule *rules[PKT_ARRAY_SIZE];
struct dp_netdev *dp = pmd->dp;
ofpbuf_use_stub(&actions, actions_stub, sizeof actions_stub);
ofpbuf_use_stub(&put_actions, slow_stub, sizeof slow_stub);
- for (i = 0; i < cnt; i++) {
+ DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
struct dp_netdev_flow *netdev_flow;
if (OVS_LIKELY(rules[i])) {
}
miss_cnt++;
- handle_packet_upcall(pmd, packets[i], &keys[i], &actions,
+ handle_packet_upcall(pmd, packet, &keys[i], &actions,
&put_actions, &lost_cnt, now);
}
ofpbuf_uninit(&put_actions);
fat_rwlock_unlock(&dp->upcall_rwlock);
} else if (OVS_UNLIKELY(any_miss)) {
- for (i = 0; i < cnt; i++) {
+ DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
if (OVS_UNLIKELY(!rules[i])) {
- dp_packet_delete(packets[i]);
+ dp_packet_delete(packet);
lost_cnt++;
miss_cnt++;
}
}
}
- for (i = 0; i < cnt; i++) {
- struct dp_packet *packet = packets[i];
+ DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
struct dp_netdev_flow *flow;
if (OVS_UNLIKELY(!rules[i])) {
/* Packets enter the datapath from a port (or from recirculation) here.
*
- * For performance reasons a caller may choose not to initialize the metadata
- * in 'packets': in this case 'mdinit' is false and this function needs to
- * initialize it using 'port_no'. If the metadata in 'packets' is already
- * valid, 'md_is_valid' must be true and 'port_no' will be ignored. */
+ * When 'md_is_valid' is true the metadata in 'packets' is already valid.
+ * When false, the metadata in 'packets' needs to be initialized. */
static void
dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
struct dp_packet_batch *packets,
bool md_is_valid, odp_port_t port_no)
{
- int cnt = packets->count;
#if !defined(__CHECKER__) && !defined(_WIN32)
- const size_t PKT_ARRAY_SIZE = cnt;
+ const size_t PKT_ARRAY_SIZE = dp_packet_batch_size(packets);
#else
/* Sparse or MSVC doesn't like variable length array. */
enum { PKT_ARRAY_SIZE = NETDEV_MAX_BURST };
data = nl_attr_get(attr);
- tun_port = pmd_tnl_port_cache_lookup(pmd, u32_to_odp(data->tnl_port));
+ tun_port = pmd_tnl_port_cache_lookup(pmd, data->tnl_port);
if (!tun_port) {
err = -EINVAL;
goto error;
case OVS_ACTION_ATTR_TUNNEL_PUSH:
if (*depth < MAX_RECIRC_DEPTH) {
- struct dp_packet_batch tnl_pkt;
- struct dp_packet_batch *orig_packets_ = packets_;
- int err;
-
- if (!may_steal) {
- dp_packet_batch_clone(&tnl_pkt, packets_);
- packets_ = &tnl_pkt;
- dp_packet_batch_reset_cutlen(orig_packets_);
- }
-
dp_packet_batch_apply_cutlen(packets_);
-
- err = push_tnl_action(pmd, a, packets_);
- if (!err) {
- (*depth)++;
- dp_netdev_recirculate(pmd, packets_);
- (*depth)--;
- }
+ push_tnl_action(pmd, a, packets_);
return;
}
break;
}
conntrack_execute(&dp->conntrack, packets_, aux->flow->dl_type, force,
- commit, zone, setmark, setlabel, helper,
- nat_action_info_ref);
+ commit, zone, setmark, setlabel, aux->flow->tp_src,
+ aux->flow->tp_dst, helper, nat_action_info_ref, now);
break;
}
case OVS_ACTION_ATTR_PUSH_ETH:
case OVS_ACTION_ATTR_POP_ETH:
case OVS_ACTION_ATTR_CLONE:
+ case OVS_ACTION_ATTR_ENCAP_NSH:
+ case OVS_ACTION_ATTR_DECAP_NSH:
case __OVS_ACTION_ATTR_MAX:
OVS_NOT_REACHED();
}
static int
dpif_netdev_ct_dump_start(struct dpif *dpif, struct ct_dpif_dump_state **dump_,
- const uint16_t *pzone)
+ const uint16_t *pzone, int *ptot_bkts)
{
struct dp_netdev *dp = get_dp_netdev(dpif);
struct dp_netdev_ct_dump *dump;
dump->dp = dp;
dump->ct = &dp->conntrack;
- conntrack_dump_start(&dp->conntrack, &dump->dump, pzone);
+ conntrack_dump_start(&dp->conntrack, &dump->dump, pzone, ptot_bkts);
*dump_ = &dump->up;
}
static int
-dpif_netdev_ct_flush(struct dpif *dpif, const uint16_t *zone)
+dpif_netdev_ct_flush(struct dpif *dpif, const uint16_t *zone,
+ const struct ct_dpif_tuple *tuple)
{
struct dp_netdev *dp = get_dp_netdev(dpif);
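+ /* Flushing conntrack entries by 5-tuple is not yet supported by the
+ * userspace datapath; only full and per-zone flushes are handled. */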
+ if (tuple) {
+ return EOPNOTSUPP;
+ }
return conntrack_flush(&dp->conntrack, zone);
}
}
static inline void
-dp_netdev_pmd_try_optimize(struct dp_netdev_pmd_thread *pmd)
+dp_netdev_pmd_try_optimize(struct dp_netdev_pmd_thread *pmd,
+ struct polled_queue *poll_list, int poll_cnt)
{
struct dpcls *cls;
long long int now = time_msec();
+ if (now > pmd->rxq_next_cycle_store) {
+ /* Get the cycles that were used to process each queue and store them. */
+ for (unsigned i = 0; i < poll_cnt; i++) {
+ uint64_t rxq_cyc_curr = dp_netdev_rxq_get_cycles(poll_list[i].rxq,
+ RXQ_CYCLES_PROC_CURR);
+ dp_netdev_rxq_set_intrvl_cycles(poll_list[i].rxq, rxq_cyc_curr);
+ dp_netdev_rxq_set_cycles(poll_list[i].rxq, RXQ_CYCLES_PROC_CURR,
+ 0);
+ }
+ /* Start a new measurement interval. */
+ pmd->rxq_next_cycle_store = now + PMD_RXQ_INTERVAL_LEN;
+ }
+
if (now > pmd->next_optimization) {
/* Try to obtain the flow lock to block out revalidator threads.
* If not possible, just try next time. */