]> git.proxmox.com Git - ovs.git/blobdiff - lib/dpif-netdev.c
conntrack : Use Rx checksum offload feature on DPDK ports for conntrack.
[ovs.git] / lib / dpif-netdev.c
index b47abbdf7dd9af47866e24ecb8a92d02d0ffd6e3..284cecc74753189eb910ed692319b4b10f3c4e0f 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016 Nicira, Inc.
+ * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016, 2017 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -48,6 +48,7 @@
 #include "fat-rwlock.h"
 #include "flow.h"
 #include "hmapx.h"
+#include "id-pool.h"
 #include "latch.h"
 #include "netdev.h"
 #include "netdev-vport.h"
@@ -100,7 +101,8 @@ static struct shash dp_netdevs OVS_GUARDED_BY(dp_netdev_mutex)
 static struct vlog_rate_limit upcall_rl = VLOG_RATE_LIMIT_INIT(600, 600);
 
 #define DP_NETDEV_CS_SUPPORTED_MASK (CS_NEW | CS_ESTABLISHED | CS_RELATED \
-                                     | CS_INVALID | CS_REPLY_DIR | CS_TRACKED)
+                                     | CS_INVALID | CS_REPLY_DIR | CS_TRACKED \
+                                     | CS_SRC_NAT | CS_DST_NAT)
 #define DP_NETDEV_CS_UNSUPPORTED_MASK (~(uint32_t)DP_NETDEV_CS_SUPPORTED_MASK)
 
 static struct odp_support dp_netdev_support = {
@@ -111,6 +113,9 @@ static struct odp_support dp_netdev_support = {
     .ct_zone = true,
     .ct_mark = true,
     .ct_label = true,
+    .ct_state_nat = true,
+    .ct_orig_tuple = true,
+    .ct_orig_tuple6 = true,
 };
 
 /* Stores a miniflow with inline values */
@@ -261,6 +266,9 @@ struct dp_netdev {
     struct ovs_mutex meter_locks[N_METER_LOCKS];
     struct dp_meter *meters[MAX_METERS]; /* Meter bands. */
 
+    /* Probability of EMC insertions is a factor of 'emc_insert_min'.*/
+    OVS_ALIGNED_VAR(CACHE_LINE_SIZE) atomic_uint32_t emc_insert_min;
+
     /* Protects access to ofproto-dpif-upcall interface during revalidator
      * thread synchronization. */
     struct fat_rwlock upcall_rwlock;
@@ -274,6 +282,9 @@ struct dp_netdev {
 
     /* Stores all 'struct dp_netdev_pmd_thread's. */
     struct cmap poll_threads;
+    /* id pool for per thread static_tx_qid. */
+    struct id_pool *tx_qid_pool;
+    struct ovs_mutex tx_qid_pool_mutex;
 
     /* Protects the access of the 'struct dp_netdev_pmd_thread'
      * instance for non-pmd thread. */
@@ -292,9 +303,6 @@ struct dp_netdev {
     uint64_t last_tnl_conf_seq;
 
     struct conntrack conntrack;
-
-    /* Probability of EMC insertions is a factor of 'emc_insert_min'.*/
-    OVS_ALIGNED_VAR(CACHE_LINE_SIZE) atomic_uint32_t emc_insert_min;
 };
 
 static void meter_lock(const struct dp_netdev *dp, uint32_t meter_id)
@@ -325,8 +333,9 @@ enum dp_stat_type {
 };
 
 enum pmd_cycles_counter_type {
-    PMD_CYCLES_POLLING,         /* Cycles spent polling NICs. */
-    PMD_CYCLES_PROCESSING,      /* Cycles spent processing packets */
+    PMD_CYCLES_IDLE,            /* Cycles spent idle or unsuccessful polling */
+    PMD_CYCLES_PROCESSING,      /* Cycles spent successfully polling and
+                                 * processing polled packets */
     PMD_N_CYCLES
 };
 
@@ -350,7 +359,7 @@ struct dp_netdev_port {
     struct hmap_node node;      /* Node in dp_netdev's 'ports'. */
     struct netdev_saved_flags *sf;
     struct dp_netdev_rxq *rxqs;
-    unsigned n_rxq;             /* Number of elements in 'rxq' */
+    unsigned n_rxq;             /* Number of elements in 'rxqs' */
     bool dynamic_txqs;          /* If true XPS will be used. */
     unsigned *txq_used;         /* Number of threads that use each tx queue. */
     struct ovs_mutex txq_used_mutex;
@@ -507,9 +516,11 @@ struct tx_port {
  * I/O of all non-pmd threads.  There will be no actual thread created
  * for the instance.
  *
- * Each struct has its own flow table and classifier.  Packets received
- * from managed ports are looked up in the corresponding pmd thread's
- * flow table, and are executed with the found actions.
+ * Each struct has its own flow cache and classifier per managed ingress port.
+ * For packets received on ingress port, a look up is done on corresponding PMD
+ * thread's flow cache and in case of a miss, lookup is performed in the
+ * corresponding classifier of port.  Packets are executed with the found
+ * actions in either case.
  * */
 struct dp_netdev_pmd_thread {
     struct dp_netdev *dp;
@@ -560,7 +571,7 @@ struct dp_netdev_pmd_thread {
     /* Queue id used by this pmd thread to send packets on all netdevs if
      * XPS disabled for this netdev. All static_tx_qid's are unique and less
      * than 'cmap_count(dp->poll_threads)'. */
-    const int static_tx_qid;
+    uint32_t static_tx_qid;
 
     struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'. */
     /* List of rx queues to poll. */
@@ -640,6 +651,8 @@ static struct dp_netdev_pmd_thread *dp_netdev_get_pmd(struct dp_netdev *dp,
                                                       unsigned core_id);
 static struct dp_netdev_pmd_thread *
 dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
+static void dp_netdev_del_pmd(struct dp_netdev *dp,
+                              struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp, bool non_pmd);
 static void dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
@@ -801,10 +814,10 @@ pmd_info_show_stats(struct ds *reply,
     }
 
     ds_put_format(reply,
-                  "\tpolling cycles:%"PRIu64" (%.02f%%)\n"
+                  "\tidle cycles:%"PRIu64" (%.02f%%)\n"
                   "\tprocessing cycles:%"PRIu64" (%.02f%%)\n",
-                  cycles[PMD_CYCLES_POLLING],
-                  cycles[PMD_CYCLES_POLLING] / (double)total_cycles * 100,
+                  cycles[PMD_CYCLES_IDLE],
+                  cycles[PMD_CYCLES_IDLE] / (double)total_cycles * 100,
                   cycles[PMD_CYCLES_PROCESSING],
                   cycles[PMD_CYCLES_PROCESSING] / (double)total_cycles * 100);
 
@@ -881,10 +894,9 @@ sorted_poll_list(struct dp_netdev_pmd_thread *pmd, struct rxq_poll **list,
             i++;
         }
         ovs_assert(i == *n);
+        qsort(ret, *n, sizeof *ret, compare_poll_list);
     }
 
-    qsort(ret, *n, sizeof *ret, compare_poll_list);
-
     *list = ret;
 }
 
@@ -922,13 +934,58 @@ pmd_info_show_rxq(struct ds *reply, struct dp_netdev_pmd_thread *pmd)
     }
 }
 
+static int
+compare_poll_thread_list(const void *a_, const void *b_)
+{
+    const struct dp_netdev_pmd_thread *a, *b;
+
+    a = *(struct dp_netdev_pmd_thread **)a_;
+    b = *(struct dp_netdev_pmd_thread **)b_;
+
+    if (a->core_id < b->core_id) {
+        return -1;
+    }
+    if (a->core_id > b->core_id) {
+        return 1;
+    }
+    return 0;
+}
+
+/* Create a sorted list of pmd's from the dp->poll_threads cmap. We can use
+ * this list, as long as we do not go to quiescent state. */
+static void
+sorted_poll_thread_list(struct dp_netdev *dp,
+                        struct dp_netdev_pmd_thread ***list,
+                        size_t *n)
+{
+    struct dp_netdev_pmd_thread *pmd;
+    struct dp_netdev_pmd_thread **pmd_list;
+    size_t k = 0, n_pmds;
+
+    n_pmds = cmap_count(&dp->poll_threads);
+    pmd_list = xcalloc(n_pmds, sizeof *pmd_list);
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (k >= n_pmds) {
+            break;
+        }
+        pmd_list[k++] = pmd;
+    }
+
+    qsort(pmd_list, k, sizeof *pmd_list, compare_poll_thread_list);
+
+    *list = pmd_list;
+    *n = k;
+}
+
 static void
 dpif_netdev_pmd_info(struct unixctl_conn *conn, int argc, const char *argv[],
                      void *aux)
 {
     struct ds reply = DS_EMPTY_INITIALIZER;
-    struct dp_netdev_pmd_thread *pmd;
+    struct dp_netdev_pmd_thread **pmd_list;
     struct dp_netdev *dp = NULL;
+    size_t n;
     enum pmd_info_type type = *(enum pmd_info_type *) aux;
 
     ovs_mutex_lock(&dp_netdev_mutex);
@@ -947,7 +1004,13 @@ dpif_netdev_pmd_info(struct unixctl_conn *conn, int argc, const char *argv[],
         return;
     }
 
-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+    sorted_poll_thread_list(dp, &pmd_list, &n);
+    for (size_t i = 0; i < n; i++) {
+        struct dp_netdev_pmd_thread *pmd = pmd_list[i];
+        if (!pmd) {
+            break;
+        }
+
         if (type == PMD_INFO_SHOW_RXQ) {
             pmd_info_show_rxq(&reply, pmd);
         } else {
@@ -970,6 +1033,7 @@ dpif_netdev_pmd_info(struct unixctl_conn *conn, int argc, const char *argv[],
             }
         }
     }
+    free(pmd_list);
 
     ovs_mutex_unlock(&dp_netdev_mutex);
 
@@ -1127,10 +1191,17 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
 
     cmap_init(&dp->poll_threads);
+
+    ovs_mutex_init(&dp->tx_qid_pool_mutex);
+    /* We need 1 Tx queue for each possible core + 1 for non-PMD threads. */
+    dp->tx_qid_pool = id_pool_create(0, ovs_numa_get_n_cores() + 1);
+
     ovs_mutex_init_recursive(&dp->non_pmd_mutex);
     ovsthread_key_create(&dp->per_pmd_key, NULL);
 
     ovs_mutex_lock(&dp->port_mutex);
+    /* non-PMD will be created before all other threads and will
+     * allocate static_tx_qid = 0. */
     dp_netdev_set_nonpmd(dp);
 
     error = do_add_port(dp, name, dpif_netdev_port_open_type(dp->class,
@@ -1225,6 +1296,9 @@ dp_netdev_free(struct dp_netdev *dp)
     dp_netdev_destroy_all_pmds(dp, true);
     cmap_destroy(&dp->poll_threads);
 
+    ovs_mutex_destroy(&dp->tx_qid_pool_mutex);
+    id_pool_destroy(dp->tx_qid_pool);
+
     ovs_mutex_destroy(&dp->non_pmd_mutex);
     ovsthread_key_delete(dp->per_pmd_key);
 
@@ -1864,24 +1938,6 @@ netdev_flow_key_clone(struct netdev_flow_key *dst,
            offsetof(struct netdev_flow_key, mf) + src->len);
 }
 
-/* Slow. */
-static void
-netdev_flow_key_from_flow(struct netdev_flow_key *dst,
-                          const struct flow *src)
-{
-    struct dp_packet packet;
-    uint64_t buf_stub[512 / 8];
-
-    dp_packet_use_stub(&packet, buf_stub, sizeof buf_stub);
-    pkt_metadata_from_flow(&packet.md, src);
-    flow_compose(&packet, src);
-    miniflow_extract(&packet, &dst->mf);
-    dp_packet_uninit(&packet);
-
-    dst->len = netdev_flow_key_size(miniflow_n_values(&dst->mf));
-    dst->hash = 0; /* Not computed yet. */
-}
-
 /* Initialize a netdev_flow_key 'mask' from 'match'. */
 static inline void
 netdev_flow_mask_init(struct netdev_flow_key *mask,
@@ -2037,11 +2093,7 @@ emc_probabilistic_insert(struct dp_netdev_pmd_thread *pmd,
     uint32_t min;
     atomic_read_relaxed(&pmd->dp->emc_insert_min, &min);
 
-#ifdef DPDK_NETDEV
-    if (min && (key->hash ^ (uint32_t) pmd->last_cycles) <= min) {
-#else
-    if (min && (key->hash ^ random_uint32()) <= min) {
-#endif
+    if (min && random_uint32() <= min) {
         emc_insert(&pmd->flow_cache, key, flow);
     }
 }
@@ -2221,8 +2273,6 @@ static int
 dpif_netdev_flow_from_nlattrs(const struct nlattr *key, uint32_t key_len,
                               struct flow *flow, bool probe)
 {
-    odp_port_t in_port;
-
     if (odp_flow_key_to_flow(key, key_len, flow)) {
         if (!probe) {
             /* This should not happen: it indicates that
@@ -2244,11 +2294,6 @@ dpif_netdev_flow_from_nlattrs(const struct nlattr *key, uint32_t key_len,
         return EINVAL;
     }
 
-    in_port = flow->in_port.odp_port;
-    if (!is_valid_port_number(in_port) && in_port != ODPP_NONE) {
-        return EINVAL;
-    }
-
     if (flow->ct_state & DP_NETDEV_CS_UNSUPPORTED_MASK) {
         return EINVAL;
     }
@@ -2357,7 +2402,7 @@ dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd,
     cmap_insert(&pmd->flow_table, CONST_CAST(struct cmap_node *, &flow->node),
                 dp_netdev_flow_hash(&flow->ufid));
 
-    if (OVS_UNLIKELY(VLOG_IS_DBG_ENABLED())) {
+    if (OVS_UNLIKELY(!VLOG_DROP_DBG((&upcall_rl)))) {
         struct ds ds = DS_EMPTY_INITIALIZER;
         struct ofpbuf key_buf, mask_buf;
         struct odp_flow_key_parms odp_parms = {
@@ -2380,12 +2425,24 @@ dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd,
                         mask_buf.data, mask_buf.size,
                         NULL, &ds, false);
         ds_put_cstr(&ds, ", actions:");
-        format_odp_actions(&ds, actions, actions_len);
+        format_odp_actions(&ds, actions, actions_len, NULL);
 
-        VLOG_DBG_RL(&upcall_rl, "%s", ds_cstr(&ds));
+        VLOG_DBG("%s", ds_cstr(&ds));
 
         ofpbuf_uninit(&key_buf);
         ofpbuf_uninit(&mask_buf);
+
+        /* Add a printout of the actual match installed. */
+        struct match m;
+        ds_clear(&ds);
+        ds_put_cstr(&ds, "flow match: ");
+        miniflow_expand(&flow->cr.flow.mf, &m.flow);
+        miniflow_expand(&flow->cr.mask->mf, &m.wc.masks);
+        memset(&m.tun_md, 0, sizeof m.tun_md);
+        match_format(&m, NULL, &ds, OFP_DEFAULT_PRIORITY);
+
+        VLOG_DBG("%s", ds_cstr(&ds));
+
         ds_destroy(&ds);
     }
 
@@ -2422,8 +2479,7 @@ flow_put_on_pmd(struct dp_netdev_pmd_thread *pmd,
             error = ENOENT;
         }
     } else {
-        if (put->flags & DPIF_FP_MODIFY
-            && flow_equal(&match->flow, &netdev_flow->flow)) {
+        if (put->flags & DPIF_FP_MODIFY) {
             struct dp_netdev_actions *new_actions;
             struct dp_netdev_actions *old_actions;
 
@@ -2465,7 +2521,7 @@ static int
 dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
-    struct netdev_flow_key key;
+    struct netdev_flow_key key, mask;
     struct dp_netdev_pmd_thread *pmd;
     struct match match;
     ovs_u128 ufid;
@@ -2494,9 +2550,10 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
     }
 
     /* Must produce a netdev_flow_key for lookup.
-     * This interface is no longer performance critical, since it is not used
-     * for upcall processing any more. */
-    netdev_flow_key_from_flow(&key, &match.flow);
+     * Use the same method as employed to create the key when adding
+     * the flow to the dplcs to make sure they match. */
+    netdev_flow_mask_init(&mask, &match);
+    netdev_flow_key_init_masked(&key, &match.flow, &mask);
 
     if (put->pmd_id == PMD_ID_NULL) {
         if (cmap_count(&dp->poll_threads) == 0) {
@@ -2611,7 +2668,8 @@ dpif_netdev_flow_dump_cast(struct dpif_flow_dump *dump)
 }
 
 static struct dpif_flow_dump *
-dpif_netdev_flow_dump_create(const struct dpif *dpif_, bool terse)
+dpif_netdev_flow_dump_create(const struct dpif *dpif_, bool terse,
+                             char *type OVS_UNUSED)
 {
     struct dpif_netdev_flow_dump *dump;
 
@@ -2972,7 +3030,7 @@ dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
 
 \f
 /* Creates and returns a new 'struct dp_netdev_actions', whose actions are
- * a copy of the 'ofpacts_len' bytes of 'ofpacts'. */
+ * a copy of the 'size' bytes of 'actions' input parameters. */
 struct dp_netdev_actions *
 dp_netdev_actions_create(const struct nlattr *actions, size_t size)
 {
@@ -3031,30 +3089,43 @@ cycles_count_end(struct dp_netdev_pmd_thread *pmd,
     non_atomic_ullong_add(&pmd->cycles.n[type], interval);
 }
 
-static void
+/* Calculate the intermediate cycle result and add to the counter 'type' */
+static inline void
+cycles_count_intermediate(struct dp_netdev_pmd_thread *pmd,
+                          enum pmd_cycles_counter_type type)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
+{
+    unsigned long long new_cycles = cycles_counter();
+    unsigned long long interval = new_cycles - pmd->last_cycles;
+    pmd->last_cycles = new_cycles;
+
+    non_atomic_ullong_add(&pmd->cycles.n[type], interval);
+}
+
+static int
 dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
                            struct netdev_rxq *rx,
                            odp_port_t port_no)
 {
     struct dp_packet_batch batch;
     int error;
+    int batch_cnt = 0;
 
     dp_packet_batch_init(&batch);
-    cycles_count_start(pmd);
     error = netdev_rxq_recv(rx, &batch);
-    cycles_count_end(pmd, PMD_CYCLES_POLLING);
     if (!error) {
         *recirc_depth_get() = 0;
 
-        cycles_count_start(pmd);
+        batch_cnt = batch.count;
         dp_netdev_input(pmd, &batch, port_no);
-        cycles_count_end(pmd, PMD_CYCLES_PROCESSING);
     } else if (error != EAGAIN && error != EOPNOTSUPP) {
         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
         VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
                     netdev_rxq_get_name(rx), ovs_strerror(error));
     }
+
+    return batch_cnt;
 }
 
 static struct tx_port *
@@ -3147,6 +3218,24 @@ rr_numa_list_lookup(struct rr_numa_list *rr, int numa_id)
     return NULL;
 }
 
+/* Returns the next node in numa list following 'numa' in round-robin fashion.
+ * Returns first node if 'numa' is a null pointer or the last node in 'rr'.
+ * Returns NULL if 'rr' numa list is empty. */
+static struct rr_numa *
+rr_numa_list_next(struct rr_numa_list *rr, const struct rr_numa *numa)
+{
+    struct hmap_node *node = NULL;
+
+    if (numa) {
+        node = hmap_next(&rr->numas, &numa->node);
+    }
+    if (!node) {
+        node = hmap_first(&rr->numas);
+    }
+
+    return (node) ? CONTAINER_OF(node, struct rr_numa, node) : NULL;
+}
+
 static void
 rr_numa_list_populate(struct dp_netdev *dp, struct rr_numa_list *rr)
 {
@@ -3201,6 +3290,7 @@ rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
 {
     struct dp_netdev_port *port;
     struct rr_numa_list rr;
+    struct rr_numa *non_local_numa = NULL;
 
     rr_numa_list_populate(dp, &rr);
 
@@ -3233,11 +3323,28 @@ rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
                 }
             } else if (!pinned && q->core_id == OVS_CORE_UNSPEC) {
                 if (!numa) {
-                    VLOG_WARN("There's no available (non isolated) pmd thread "
+                    /* There are no pmds on the queue's local NUMA node.
+                       Round-robin on the NUMA nodes that do have pmds. */
+                    non_local_numa = rr_numa_list_next(&rr, non_local_numa);
+                    if (!non_local_numa) {
+                        VLOG_ERR("There is no available (non-isolated) pmd "
+                                 "thread for port \'%s\' queue %d. This queue "
+                                 "will not be polled. Is pmd-cpu-mask set to "
+                                 "zero? Or are all PMDs isolated to other "
+                                 "queues?", netdev_get_name(port->netdev),
+                                 qid);
+                        continue;
+                    }
+                    q->pmd = rr_numa_get_pmd(non_local_numa);
+                    VLOG_WARN("There's no available (non-isolated) pmd thread "
                               "on numa node %d. Queue %d on port \'%s\' will "
-                              "not be polled.",
-                              numa_id, qid, netdev_get_name(port->netdev));
+                              "be assigned to the pmd on core %d "
+                              "(numa node %d). Expect reduced performance.",
+                              numa_id, qid, netdev_get_name(port->netdev),
+                              q->pmd->core_id, q->pmd->numa_id);
                 } else {
+                    /* Assign queue to the next (round-robin) PMD on it's local
+                       NUMA node. */
                     q->pmd = rr_numa_get_pmd(numa);
                 }
             }
@@ -3247,13 +3354,30 @@ rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
     rr_numa_list_destroy(&rr);
 }
 
+static void
+reload_affected_pmds(struct dp_netdev *dp)
+{
+    struct dp_netdev_pmd_thread *pmd;
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (pmd->need_reload) {
+            dp_netdev_reload_pmd__(pmd);
+            pmd->need_reload = false;
+        }
+    }
+}
+
 static void
 reconfigure_pmd_threads(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex)
 {
     struct dp_netdev_pmd_thread *pmd;
     struct ovs_numa_dump *pmd_cores;
+    struct ovs_numa_info_core *core;
+    struct hmapx to_delete = HMAPX_INITIALIZER(&to_delete);
+    struct hmapx_node *node;
     bool changed = false;
+    bool need_to_adjust_static_tx_qids = false;
 
     /* The pmd threads should be started only if there's a pmd port in the
      * datapath.  If the user didn't provide any "pmd-cpu-mask", we start
@@ -3266,40 +3390,64 @@ reconfigure_pmd_threads(struct dp_netdev *dp)
         pmd_cores = ovs_numa_dump_n_cores_per_numa(NR_PMD_THREADS);
     }
 
-    /* Check for changed configuration */
-    if (ovs_numa_dump_count(pmd_cores) != cmap_count(&dp->poll_threads) - 1) {
-        changed = true;
-    } else {
-        CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-            if (pmd->core_id != NON_PMD_CORE_ID
-                && !ovs_numa_dump_contains_core(pmd_cores,
-                                                pmd->numa_id,
-                                                pmd->core_id)) {
-                changed = true;
-                break;
-            }
+    /* We need to adjust 'static_tx_qid's only if we're reducing number of
+     * PMD threads. Otherwise, new threads will allocate all the freed ids. */
+    if (ovs_numa_dump_count(pmd_cores) < cmap_count(&dp->poll_threads) - 1) {
+        /* Adjustment is required to keep 'static_tx_qid's sequential and
+         * avoid possible issues, for example, imbalanced tx queue usage
+         * and unnecessary locking caused by remapping on netdev level. */
+        need_to_adjust_static_tx_qids = true;
+    }
+
+    /* Check for unwanted pmd threads */
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (pmd->core_id == NON_PMD_CORE_ID) {
+            continue;
+        }
+        if (!ovs_numa_dump_contains_core(pmd_cores, pmd->numa_id,
+                                                    pmd->core_id)) {
+            hmapx_add(&to_delete, pmd);
+        } else if (need_to_adjust_static_tx_qids) {
+            pmd->need_reload = true;
         }
     }
 
-    /* Destroy the old and recreate the new pmd threads.  We don't perform an
-     * incremental update because we would have to adjust 'static_tx_qid'. */
-    if (changed) {
-        struct ovs_numa_info_core *core;
-        struct ovs_numa_info_numa *numa;
+    HMAPX_FOR_EACH (node, &to_delete) {
+        pmd = (struct dp_netdev_pmd_thread *) node->data;
+        VLOG_INFO("PMD thread on numa_id: %d, core id: %2d destroyed.",
+                  pmd->numa_id, pmd->core_id);
+        dp_netdev_del_pmd(dp, pmd);
+    }
+    changed = !hmapx_is_empty(&to_delete);
+    hmapx_destroy(&to_delete);
 
-        /* Do not destroy the non pmd thread. */
-        dp_netdev_destroy_all_pmds(dp, false);
-        FOR_EACH_CORE_ON_DUMP (core, pmd_cores) {
-            struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
+    if (need_to_adjust_static_tx_qids) {
+        /* 'static_tx_qid's are not sequential now.
+         * Reload remaining threads to fix this. */
+        reload_affected_pmds(dp);
+    }
 
+    /* Check for required new pmd threads */
+    FOR_EACH_CORE_ON_DUMP(core, pmd_cores) {
+        pmd = dp_netdev_get_pmd(dp, core->core_id);
+        if (!pmd) {
+            pmd = xzalloc(sizeof *pmd);
             dp_netdev_configure_pmd(pmd, dp, core->core_id, core->numa_id);
-
             pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
+            VLOG_INFO("PMD thread on numa_id: %d, core id: %2d created.",
+                      pmd->numa_id, pmd->core_id);
+            changed = true;
+        } else {
+            dp_netdev_pmd_unref(pmd);
         }
+    }
+
+    if (changed) {
+        struct ovs_numa_info_numa *numa;
 
         /* Log the number of pmd threads per numa node. */
         FOR_EACH_NUMA_ON_DUMP (numa, pmd_cores) {
-            VLOG_INFO("Created %"PRIuSIZE" pmd threads on numa node %d",
+            VLOG_INFO("There are %"PRIuSIZE" pmd threads on numa node %d",
                       numa->n_cores, numa->numa_id);
         }
     }
@@ -3307,19 +3455,6 @@ reconfigure_pmd_threads(struct dp_netdev *dp)
     ovs_numa_dump_destroy(pmd_cores);
 }
 
-static void
-reload_affected_pmds(struct dp_netdev *dp)
-{
-    struct dp_netdev_pmd_thread *pmd;
-
-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        if (pmd->need_reload) {
-            dp_netdev_reload_pmd__(pmd);
-            pmd->need_reload = false;
-        }
-    }
-}
-
 static void
 pmd_remove_stale_ports(struct dp_netdev *dp,
                        struct dp_netdev_pmd_thread *pmd)
@@ -3378,8 +3513,8 @@ reconfigure_datapath(struct dp_netdev *dp)
      * need reconfiguration. */
 
     /* Check for all the ports that need reconfiguration.  We cache this in
-     * 'port->reconfigure', because netdev_is_reconf_required() can change at
-     * any time. */
+     * 'port->need_reconfigure', because netdev_is_reconf_required() can
+     * change at any time. */
     HMAP_FOR_EACH (port, node, &dp->ports) {
         if (netdev_is_reconf_required(port->netdev)) {
             port->need_reconfigure = true;
@@ -3402,7 +3537,8 @@ reconfigure_datapath(struct dp_netdev *dp)
     /* We only reconfigure the ports that we determined above, because they're
      * not being used by any pmd thread at the moment.  If a port fails to
      * reconfigure we remove it from the datapath. */
-    HMAP_FOR_EACH (port, node, &dp->ports) {
+    struct dp_netdev_port *next_port;
+    HMAP_FOR_EACH_SAFE (port, next_port, node, &dp->ports) {
         int err;
 
         if (!port->need_reconfigure) {
@@ -3516,21 +3652,29 @@ dpif_netdev_run(struct dpif *dpif)
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_pmd_thread *non_pmd;
     uint64_t new_tnl_seq;
+    int process_packets = 0;
 
     ovs_mutex_lock(&dp->port_mutex);
     non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
     if (non_pmd) {
         ovs_mutex_lock(&dp->non_pmd_mutex);
+        cycles_count_start(non_pmd);
         HMAP_FOR_EACH (port, node, &dp->ports) {
             if (!netdev_is_pmd(port->netdev)) {
                 int i;
 
                 for (i = 0; i < port->n_rxq; i++) {
-                    dp_netdev_process_rxq_port(non_pmd, port->rxqs[i].rx,
-                                               port->port_no);
+                    process_packets =
+                        dp_netdev_process_rxq_port(non_pmd,
+                                                   port->rxqs[i].rx,
+                                                   port->port_no);
+                    cycles_count_intermediate(non_pmd, process_packets ?
+                                                       PMD_CYCLES_PROCESSING
+                                                     : PMD_CYCLES_IDLE);
                 }
             }
         }
+        cycles_count_end(non_pmd, PMD_CYCLES_IDLE);
         dpif_netdev_xps_revalidate_pmd(non_pmd, time_msec(), false);
         ovs_mutex_unlock(&dp->non_pmd_mutex);
 
@@ -3619,6 +3763,28 @@ pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
     }
 }
 
+static void
+pmd_alloc_static_tx_qid(struct dp_netdev_pmd_thread *pmd)
+{
+    ovs_mutex_lock(&pmd->dp->tx_qid_pool_mutex);
+    if (!id_pool_alloc_id(pmd->dp->tx_qid_pool, &pmd->static_tx_qid)) {
+        VLOG_ABORT("static_tx_qid allocation failed for PMD on core %2d"
+                   ", numa_id %d.", pmd->core_id, pmd->numa_id);
+    }
+    ovs_mutex_unlock(&pmd->dp->tx_qid_pool_mutex);
+
+    VLOG_DBG("static_tx_qid = %d allocated for PMD thread on core %2d"
+             ", numa_id %d.", pmd->static_tx_qid, pmd->core_id, pmd->numa_id);
+}
+
+static void
+pmd_free_static_tx_qid(struct dp_netdev_pmd_thread *pmd)
+{
+    ovs_mutex_lock(&pmd->dp->tx_qid_pool_mutex);
+    id_pool_free_id(pmd->dp->tx_qid_pool, pmd->static_tx_qid);
+    ovs_mutex_unlock(&pmd->dp->tx_qid_pool_mutex);
+}
+
 static int
 pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
                           struct polled_queue **ppoll_list)
@@ -3655,6 +3821,7 @@ pmd_thread_main(void *f_)
     bool exiting;
     int poll_cnt;
     int i;
+    int process_packets = 0;
 
     poll_list = NULL;
 
@@ -3663,8 +3830,9 @@ pmd_thread_main(void *f_)
     ovs_numa_thread_setaffinity_core(pmd->core_id);
     dpdk_set_lcore_id(pmd->core_id);
     poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
-reload:
     emc_cache_init(&pmd->flow_cache);
+reload:
+    pmd_alloc_static_tx_qid(pmd);
 
     /* List port/core affinity */
     for (i = 0; i < poll_cnt; i++) {
@@ -3681,10 +3849,15 @@ reload:
         lc = UINT_MAX;
     }
 
+    cycles_count_start(pmd);
     for (;;) {
         for (i = 0; i < poll_cnt; i++) {
-            dp_netdev_process_rxq_port(pmd, poll_list[i].rx,
-                                       poll_list[i].port_no);
+            process_packets =
+                dp_netdev_process_rxq_port(pmd, poll_list[i].rx,
+                                           poll_list[i].port_no);
+            cycles_count_intermediate(pmd,
+                                      process_packets ? PMD_CYCLES_PROCESSING
+                                                      : PMD_CYCLES_IDLE);
         }
 
         if (lc++ > 1024) {
@@ -3705,18 +3878,21 @@ reload:
         }
     }
 
+    cycles_count_end(pmd, PMD_CYCLES_IDLE);
+
     poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
     exiting = latch_is_set(&pmd->exit_latch);
     /* Signal here to make sure the pmd finishes
      * reloading the updated configuration. */
     dp_netdev_pmd_reload_done(pmd);
 
-    emc_cache_uninit(&pmd->flow_cache);
+    pmd_free_static_tx_qid(pmd);
 
     if (!exiting) {
         goto reload;
     }
 
+    emc_cache_uninit(&pmd->flow_cache);
     free(poll_list);
     pmd_free_cached_ports(pmd);
     return NULL;
@@ -4122,8 +4298,6 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     pmd->numa_id = numa_id;
     pmd->need_reload = false;
 
-    *CONST_CAST(int *, &pmd->static_tx_qid) = cmap_count(&dp->poll_threads);
-
     ovs_refcount_init(&pmd->ref_cnt);
     latch_init(&pmd->exit_latch);
     pmd->reload_seq = seq_create();
@@ -4144,6 +4318,7 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
      * actual thread created for NON_PMD_CORE_ID. */
     if (core_id == NON_PMD_CORE_ID) {
         emc_cache_init(&pmd->flow_cache);
+        pmd_alloc_static_tx_qid(pmd);
     }
     cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
                 hash_int(core_id, 0));
@@ -4186,6 +4361,7 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
         ovs_mutex_lock(&dp->non_pmd_mutex);
         emc_cache_uninit(&pmd->flow_cache);
         pmd_free_cached_ports(pmd);
+        pmd_free_static_tx_qid(pmd);
         ovs_mutex_unlock(&dp->non_pmd_mutex);
     } else {
         latch_set(&pmd->exit_latch);
@@ -4500,8 +4676,11 @@ emc_processing(struct dp_netdev_pmd_thread *pmd,
     size_t n_missed = 0, n_dropped = 0;
     struct dp_packet *packet;
     const size_t size = dp_packet_batch_size(packets_);
+    uint32_t cur_min;
     int i;
 
+    atomic_read_relaxed(&pmd->dp->emc_insert_min, &cur_min);
+
     DP_PACKET_BATCH_REFILL_FOR_EACH (i, size, packet, packets_) {
         struct dp_netdev_flow *flow;
 
@@ -4525,7 +4704,8 @@ emc_processing(struct dp_netdev_pmd_thread *pmd,
         key->len = 0; /* Not computed yet. */
         key->hash = dpif_netdev_packet_get_rss_hash(packet, &key->mf);
 
-        flow = emc_lookup(flow_cache, key);
+        /* If EMC is disabled skip emc_lookup */
+        flow = (cur_min == 0) ? NULL: emc_lookup(flow_cache, key);
         if (OVS_LIKELY(flow)) {
             dp_netdev_queue_batches(packet, flow, &key->mf, batches,
                                     n_batches);
@@ -4902,7 +5082,7 @@ push_tnl_action(const struct dp_netdev_pmd_thread *pmd,
 
     data = nl_attr_get(attr);
 
-    tun_port = pmd_tnl_port_cache_lookup(pmd, u32_to_odp(data->tnl_port));
+    tun_port = pmd_tnl_port_cache_lookup(pmd, data->tnl_port);
     if (!tun_port) {
         err = -EINVAL;
         goto error;
@@ -4975,24 +5155,8 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
 
     case OVS_ACTION_ATTR_TUNNEL_PUSH:
         if (*depth < MAX_RECIRC_DEPTH) {
-            struct dp_packet_batch tnl_pkt;
-            struct dp_packet_batch *orig_packets_ = packets_;
-            int err;
-
-            if (!may_steal) {
-                dp_packet_batch_clone(&tnl_pkt, packets_);
-                packets_ = &tnl_pkt;
-                dp_packet_batch_reset_cutlen(orig_packets_);
-            }
-
             dp_packet_batch_apply_cutlen(packets_);
-
-            err = push_tnl_action(pmd, a, packets_);
-            if (!err) {
-                (*depth)++;
-                dp_netdev_recirculate(pmd, packets_);
-                (*depth)--;
-            }
+            push_tnl_action(pmd, a, packets_);
             return;
         }
         break;
@@ -5108,6 +5272,9 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
         const char *helper = NULL;
         const uint32_t *setmark = NULL;
         const struct ovs_key_ct_labels *setlabel = NULL;
+        struct nat_action_info_t nat_action_info;
+        struct nat_action_info_t *nat_action_info_ref = NULL;
+        bool nat_config = false;
 
         NL_ATTR_FOR_EACH_UNSAFE (b, left, nl_attr_get(a),
                                  nl_attr_get_size(a)) {
@@ -5136,15 +5303,90 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
                 /* Silently ignored, as userspace datapath does not generate
                  * netlink events. */
                 break;
-            case OVS_CT_ATTR_NAT:
+            case OVS_CT_ATTR_NAT: {
+                const struct nlattr *b_nest;
+                unsigned int left_nest;
+                bool ip_min_specified = false;
+                bool proto_num_min_specified = false;
+                bool ip_max_specified = false;
+                bool proto_num_max_specified = false;
+                memset(&nat_action_info, 0, sizeof nat_action_info);
+                nat_action_info_ref = &nat_action_info;
+
+                NL_NESTED_FOR_EACH_UNSAFE (b_nest, left_nest, b) {
+                    enum ovs_nat_attr sub_type_nest = nl_attr_type(b_nest);
+
+                    switch (sub_type_nest) {
+                    case OVS_NAT_ATTR_SRC:
+                    case OVS_NAT_ATTR_DST:
+                        nat_config = true;
+                        nat_action_info.nat_action |=
+                            ((sub_type_nest == OVS_NAT_ATTR_SRC)
+                                ? NAT_ACTION_SRC : NAT_ACTION_DST);
+                        break;
+                    case OVS_NAT_ATTR_IP_MIN:
+                        memcpy(&nat_action_info.min_addr,
+                               nl_attr_get(b_nest),
+                               nl_attr_get_size(b_nest));
+                        ip_min_specified = true;
+                        break;
+                    case OVS_NAT_ATTR_IP_MAX:
+                        memcpy(&nat_action_info.max_addr,
+                               nl_attr_get(b_nest),
+                               nl_attr_get_size(b_nest));
+                        ip_max_specified = true;
+                        break;
+                    case OVS_NAT_ATTR_PROTO_MIN:
+                        nat_action_info.min_port =
+                            nl_attr_get_u16(b_nest);
+                        proto_num_min_specified = true;
+                        break;
+                    case OVS_NAT_ATTR_PROTO_MAX:
+                        nat_action_info.max_port =
+                            nl_attr_get_u16(b_nest);
+                        proto_num_max_specified = true;
+                        break;
+                    case OVS_NAT_ATTR_PERSISTENT:
+                    case OVS_NAT_ATTR_PROTO_HASH:
+                    case OVS_NAT_ATTR_PROTO_RANDOM:
+                        break;
+                    case OVS_NAT_ATTR_UNSPEC:
+                    case __OVS_NAT_ATTR_MAX:
+                        OVS_NOT_REACHED();
+                    }
+                }
+
+                if (ip_min_specified && !ip_max_specified) {
+                    nat_action_info.max_addr = nat_action_info.min_addr;
+                }
+                if (proto_num_min_specified && !proto_num_max_specified) {
+                    nat_action_info.max_port = nat_action_info.min_port;
+                }
+                if (proto_num_min_specified || proto_num_max_specified) {
+                    if (nat_action_info.nat_action & NAT_ACTION_SRC) {
+                        nat_action_info.nat_action |= NAT_ACTION_SRC_PORT;
+                    } else if (nat_action_info.nat_action & NAT_ACTION_DST) {
+                        nat_action_info.nat_action |= NAT_ACTION_DST_PORT;
+                    }
+                }
+                break;
+            }
             case OVS_CT_ATTR_UNSPEC:
             case __OVS_CT_ATTR_MAX:
                 OVS_NOT_REACHED();
             }
         }
 
+        /* We won't be able to function properly in this case, hence
+         * complain loudly. */
+        if (nat_config && !commit) {
+            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
+            VLOG_WARN_RL(&rl, "NAT specified without commit.");
+        }
+
         conntrack_execute(&dp->conntrack, packets_, aux->flow->dl_type, force,
-                          commit, zone, setmark, setlabel, helper);
+                          commit, zone, setmark, setlabel, helper,
+                          nat_action_info_ref);
         break;
     }