#include "fat-rwlock.h"
#include "flow.h"
#include "hmapx.h"
+#include "id-pool.h"
#include "latch.h"
#include "netdev.h"
#include "netdev-vport.h"
.ct_zone = true,
.ct_mark = true,
.ct_label = true,
+ .ct_state_nat = true,
+ .ct_orig_tuple = true,
+ .ct_orig_tuple6 = true,
};
/* Stores a miniflow with inline values */
/* Stores all 'struct dp_netdev_pmd_thread's. */
struct cmap poll_threads;
+ /* Id pool for per-thread 'static_tx_qid'. */
+ struct id_pool *tx_qid_pool;
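+ /* Protects 'tx_qid_pool'. */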
+ struct ovs_mutex tx_qid_pool_mutex;
/* Protects the access of the 'struct dp_netdev_pmd_thread'
* instance for non-pmd thread. */
/* Queue id used by this pmd thread to send packets on all netdevs if
* XPS is disabled for this netdev. All static_tx_qid's are unique and
* less than 'cmap_count(dp->poll_threads)'. */
- const int static_tx_qid;
+ uint32_t static_tx_qid;
struct ovs_mutex port_mutex; /* Mutex for 'poll_list' and 'tx_ports'. */
/* List of rx queues to poll. */
unsigned core_id);
static struct dp_netdev_pmd_thread *
dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
+static void dp_netdev_del_pmd(struct dp_netdev *dp,
+ struct dp_netdev_pmd_thread *pmd);
static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp, bool non_pmd);
static void dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd);
static void dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
cmap_init(&dp->poll_threads);
+
+ ovs_mutex_init(&dp->tx_qid_pool_mutex);
+ /* We need 1 Tx queue for each possible core + 1 for non-PMD threads. */
+ dp->tx_qid_pool = id_pool_create(0, ovs_numa_get_n_cores() + 1);
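+ /* id_pool_create(base, n) manages ids in [base, base + n), so this pool
+ * covers ids 0 through ovs_numa_get_n_cores(). */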
+
ovs_mutex_init_recursive(&dp->non_pmd_mutex);
ovsthread_key_create(&dp->per_pmd_key, NULL);
ovs_mutex_lock(&dp->port_mutex);
+ /* The non-PMD thread is created before all other threads and will
+ * allocate static_tx_qid = 0. */
dp_netdev_set_nonpmd(dp);
error = do_add_port(dp, name, dpif_netdev_port_open_type(dp->class,
dp_netdev_destroy_all_pmds(dp, true);
cmap_destroy(&dp->poll_threads);
+ ovs_mutex_destroy(&dp->tx_qid_pool_mutex);
+ id_pool_destroy(dp->tx_qid_pool);
+
ovs_mutex_destroy(&dp->non_pmd_mutex);
ovsthread_key_delete(dp->per_pmd_key);
return NULL;
}
+/* Returns the next node in the numa list following 'numa' in round-robin
+ * fashion. Returns the first node if 'numa' is a null pointer or if 'numa'
+ * is the last node in 'rr'. Returns NULL if the 'rr' numa list is empty. */
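+/* A minimal usage sketch (hypothetical caller; 'rr' and 'n_queues' are
+ * assumed):
+ *
+ *     struct rr_numa *numa = NULL;
+ *
+ *     for (int i = 0; i < n_queues; i++) {
+ *         numa = rr_numa_list_next(&rr, numa);
+ *         if (!numa) {
+ *             break;              (no NUMA node has any pmds)
+ *         }
+ *         ... assign work to rr_numa_get_pmd(numa) ...
+ *     }
+ *
+ * With a non-empty list the function never returns NULL (it wraps around),
+ * so the caller must bound the iteration itself, as the rxq scheduling
+ * code below does. */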
+static struct rr_numa *
+rr_numa_list_next(struct rr_numa_list *rr, const struct rr_numa *numa)
+{
+ struct hmap_node *node = NULL;
+
+ if (numa) {
+ node = hmap_next(&rr->numas, &numa->node);
+ }
+ if (!node) {
+ node = hmap_first(&rr->numas);
+ }
+
+ return (node) ? CONTAINER_OF(node, struct rr_numa, node) : NULL;
+}
+
static void
rr_numa_list_populate(struct dp_netdev *dp, struct rr_numa_list *rr)
{
{
struct dp_netdev_port *port;
struct rr_numa_list rr;
+ struct rr_numa *non_local_numa = NULL;
rr_numa_list_populate(dp, &rr);
}
} else if (!pinned && q->core_id == OVS_CORE_UNSPEC) {
if (!numa) {
- VLOG_WARN("There's no available (non isolated) pmd thread "
+ /* There are no pmds on the queue's local NUMA node.
+ Round-robin on the NUMA nodes that do have pmds. */
+ non_local_numa = rr_numa_list_next(&rr, non_local_numa);
+ if (!non_local_numa) {
+ VLOG_ERR("There is no available (non-isolated) pmd "
+ "thread for port \'%s\' queue %d. This queue "
+ "will not be polled. Is pmd-cpu-mask set to "
+ "zero? Or are all PMDs isolated to other "
+ "queues?", netdev_get_name(port->netdev),
+ qid);
+ continue;
+ }
+ q->pmd = rr_numa_get_pmd(non_local_numa);
+ VLOG_WARN("There's no available (non-isolated) pmd thread "
"on numa node %d. Queue %d on port \'%s\' will "
- "not be polled.",
- numa_id, qid, netdev_get_name(port->netdev));
+ "be assigned to the pmd on core %d "
+ "(numa node %d). Expect reduced performance.",
+ numa_id, qid, netdev_get_name(port->netdev),
+ q->pmd->core_id, q->pmd->numa_id);
} else {
+ /* Assign queue to the next (round-robin) PMD on its local
+ NUMA node. */
q->pmd = rr_numa_get_pmd(numa);
}
}
rr_numa_list_destroy(&rr);
}
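+/* Reloads each pmd thread flagged 'need_reload' and clears the flag
+ * afterwards. */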
+static void
+reload_affected_pmds(struct dp_netdev *dp)
+{
+ struct dp_netdev_pmd_thread *pmd;
+
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ if (pmd->need_reload) {
+ dp_netdev_reload_pmd__(pmd);
+ pmd->need_reload = false;
+ }
+ }
+}
+
static void
reconfigure_pmd_threads(struct dp_netdev *dp)
OVS_REQUIRES(dp->port_mutex)
{
struct dp_netdev_pmd_thread *pmd;
struct ovs_numa_dump *pmd_cores;
+ struct ovs_numa_info_core *core;
+ struct hmapx to_delete = HMAPX_INITIALIZER(&to_delete);
+ struct hmapx_node *node;
bool changed = false;
+ bool need_to_adjust_static_tx_qids = false;
/* The pmd threads should be started only if there's a pmd port in the
* datapath. If the user didn't provide any "pmd-cpu-mask", we start
pmd_cores = ovs_numa_dump_n_cores_per_numa(NR_PMD_THREADS);
}
- /* Check for changed configuration */
- if (ovs_numa_dump_count(pmd_cores) != cmap_count(&dp->poll_threads) - 1) {
- changed = true;
- } else {
- CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
- if (pmd->core_id != NON_PMD_CORE_ID
- && !ovs_numa_dump_contains_core(pmd_cores,
- pmd->numa_id,
- pmd->core_id)) {
- changed = true;
- break;
- }
+ /* We need to adjust 'static_tx_qid's only if we're reducing the number of
+ * PMD threads. Otherwise, new threads will allocate all the freed ids. */
+ if (ovs_numa_dump_count(pmd_cores) < cmap_count(&dp->poll_threads) - 1) {
+ /* Adjustment is required to keep 'static_tx_qid's sequential and
+ * avoid possible issues, for example, imbalanced tx queue usage
+ * and unnecessary locking caused by remapping on netdev level. */
+ need_to_adjust_static_tx_qids = true;
+ }
+
+ /* Check for unwanted pmd threads. */
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ if (pmd->core_id == NON_PMD_CORE_ID) {
+ continue;
+ }
+ if (!ovs_numa_dump_contains_core(pmd_cores, pmd->numa_id,
+ pmd->core_id)) {
+ hmapx_add(&to_delete, pmd);
+ } else if (need_to_adjust_static_tx_qids) {
+ pmd->need_reload = true;
}
}
- /* Destroy the old and recreate the new pmd threads. We don't perform an
- * incremental update because we would have to adjust 'static_tx_qid'. */
- if (changed) {
- struct ovs_numa_info_core *core;
- struct ovs_numa_info_numa *numa;
+ HMAPX_FOR_EACH (node, &to_delete) {
+ pmd = (struct dp_netdev_pmd_thread *) node->data;
+ VLOG_INFO("PMD thread on numa_id: %d, core id: %2d destroyed.",
+ pmd->numa_id, pmd->core_id);
+ dp_netdev_del_pmd(dp, pmd);
+ }
+ changed = !hmapx_is_empty(&to_delete);
+ hmapx_destroy(&to_delete);
- /* Do not destroy the non pmd thread. */
- dp_netdev_destroy_all_pmds(dp, false);
- FOR_EACH_CORE_ON_DUMP (core, pmd_cores) {
- struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
+ if (need_to_adjust_static_tx_qids) {
+ /* 'static_tx_qid's are not sequential now.
+ * Reload remaining threads to fix this. */
+ reload_affected_pmds(dp);
+ }
+ /* Check for required new pmd threads. */
+ FOR_EACH_CORE_ON_DUMP (core, pmd_cores) {
+ pmd = dp_netdev_get_pmd(dp, core->core_id);
+ if (!pmd) {
+ pmd = xzalloc(sizeof *pmd);
dp_netdev_configure_pmd(pmd, dp, core->core_id, core->numa_id);
-
pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
+ VLOG_INFO("PMD thread on numa_id: %d, core id: %2d created.",
+ pmd->numa_id, pmd->core_id);
+ changed = true;
+ } else {
+ dp_netdev_pmd_unref(pmd);
}
+ }
+
+ if (changed) {
+ struct ovs_numa_info_numa *numa;
/* Log the number of pmd threads per numa node. */
FOR_EACH_NUMA_ON_DUMP (numa, pmd_cores) {
- VLOG_INFO("Created %"PRIuSIZE" pmd threads on numa node %d",
+ VLOG_INFO("There are %"PRIuSIZE" pmd threads on numa node %d",
numa->n_cores, numa->numa_id);
}
}
ovs_numa_dump_destroy(pmd_cores);
}
-static void
-reload_affected_pmds(struct dp_netdev *dp)
-{
- struct dp_netdev_pmd_thread *pmd;
-
- CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
- if (pmd->need_reload) {
- dp_netdev_reload_pmd__(pmd);
- pmd->need_reload = false;
- }
- }
-}
-
static void
pmd_remove_stale_ports(struct dp_netdev *dp,
struct dp_netdev_pmd_thread *pmd)
}
}
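+/* Allocates a 'static_tx_qid' for 'pmd' from the datapath-wide id pool and
+ * aborts if the pool is exhausted. Freed ids are handed out again before
+ * new ones, so once all remaining threads have reloaded, the ids in use
+ * form a dense range [0, n_threads). For example (hypothetical ids): if
+ * threads hold {0, 1, 2, 3} and the thread with id 1 is destroyed,
+ * reloading the survivors restores the dense set {0, 1, 2}. */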
+static void
+pmd_alloc_static_tx_qid(struct dp_netdev_pmd_thread *pmd)
+{
+ ovs_mutex_lock(&pmd->dp->tx_qid_pool_mutex);
+ if (!id_pool_alloc_id(pmd->dp->tx_qid_pool, &pmd->static_tx_qid)) {
+ VLOG_ABORT("static_tx_qid allocation failed for PMD on core %2d"
+ ", numa_id %d.", pmd->core_id, pmd->numa_id);
+ }
+ ovs_mutex_unlock(&pmd->dp->tx_qid_pool_mutex);
+
+ VLOG_DBG("static_tx_qid = %"PRIu32" allocated for PMD thread on core %2d"
+ ", numa_id %d.", pmd->static_tx_qid, pmd->core_id, pmd->numa_id);
+}
+
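+/* Returns the 'static_tx_qid' of 'pmd' back to the pool so that a new or
+ * reloading thread can reuse it. */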
+static void
+pmd_free_static_tx_qid(struct dp_netdev_pmd_thread *pmd)
+{
+ ovs_mutex_lock(&pmd->dp->tx_qid_pool_mutex);
+ id_pool_free_id(pmd->dp->tx_qid_pool, pmd->static_tx_qid);
+ ovs_mutex_unlock(&pmd->dp->tx_qid_pool_mutex);
+}
+
static int
pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
struct polled_queue **ppoll_list)
ovs_numa_thread_setaffinity_core(pmd->core_id);
dpdk_set_lcore_id(pmd->core_id);
poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
-reload:
emc_cache_init(&pmd->flow_cache);
+reload:
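+ /* 'static_tx_qid' is released at the end of every iteration (after
+ * 'reload_done' below) and re-acquired here, so that the remaining
+ * threads end up with a dense id range after some threads are
+ * destroyed. The EMC is initialized once above and survives reloads. */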
+ pmd_alloc_static_tx_qid(pmd);
/* List port/core affinity */
for (i = 0; i < poll_cnt; i++) {
* reloading the updated configuration. */
dp_netdev_pmd_reload_done(pmd);
- emc_cache_uninit(&pmd->flow_cache);
+ pmd_free_static_tx_qid(pmd);
if (!exiting) {
goto reload;
}
+ emc_cache_uninit(&pmd->flow_cache);
free(poll_list);
pmd_free_cached_ports(pmd);
return NULL;
pmd->numa_id = numa_id;
pmd->need_reload = false;
- *CONST_CAST(int *, &pmd->static_tx_qid) = cmap_count(&dp->poll_threads);
-
ovs_refcount_init(&pmd->ref_cnt);
latch_init(&pmd->exit_latch);
pmd->reload_seq = seq_create();
* actual thread created for NON_PMD_CORE_ID. */
if (core_id == NON_PMD_CORE_ID) {
emc_cache_init(&pmd->flow_cache);
+ pmd_alloc_static_tx_qid(pmd);
}
cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
hash_int(core_id, 0));
ovs_mutex_lock(&dp->non_pmd_mutex);
emc_cache_uninit(&pmd->flow_cache);
pmd_free_cached_ports(pmd);
+ pmd_free_static_tx_qid(pmd);
ovs_mutex_unlock(&dp->non_pmd_mutex);
} else {
latch_set(&pmd->exit_latch);
case OVS_ACTION_ATTR_TUNNEL_PUSH:
if (*depth < MAX_RECIRC_DEPTH) {
- struct dp_packet_batch tnl_pkt;
- struct dp_packet_batch *orig_packets_ = packets_;
- int err;
-
- if (!may_steal) {
- dp_packet_batch_clone(&tnl_pkt, packets_);
- packets_ = &tnl_pkt;
- dp_packet_batch_reset_cutlen(orig_packets_);
- }
-
dp_packet_batch_apply_cutlen(packets_);
-
- err = push_tnl_action(pmd, a, packets_);
- if (!err) {
- (*depth)++;
- dp_netdev_recirculate(pmd, packets_);
- (*depth)--;
- }
+ push_tnl_action(pmd, a, packets_);
return;
}
break;