]> git.proxmox.com Git - mirror_ovs.git/blobdiff - lib/dpif-netdev.c
sparse: Add guards to prevent FreeBSD-incompatible #include order.
[mirror_ovs.git] / lib / dpif-netdev.c
index 2b65dc74a269842efdbe65605a1002e1dcc84dc7..6ba025b6697e8a2e1ab64ad65d9a3573b572ba88 100644 (file)
@@ -22,6 +22,7 @@
 #include <fcntl.h>
 #include <inttypes.h>
 #include <net/if.h>
+#include <sys/types.h>
 #include <netinet/in.h>
 #include <stdint.h>
 #include <stdlib.h>
@@ -48,6 +49,7 @@
 #include "fat-rwlock.h"
 #include "flow.h"
 #include "hmapx.h"
+#include "id-pool.h"
 #include "latch.h"
 #include "netdev.h"
 #include "netdev-vport.h"
@@ -65,7 +67,7 @@
 #include "ovs-numa.h"
 #include "ovs-rcu.h"
 #include "packets.h"
-#include "poll-loop.h"
+#include "openvswitch/poll-loop.h"
 #include "pvector.h"
 #include "random.h"
 #include "seq.h"
@@ -81,7 +83,7 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
 
 #define FLOW_DUMP_MAX_BATCH 50
 /* Use per thread recirc_depth to prevent recirculation loop. */
-#define MAX_RECIRC_DEPTH 5
+#define MAX_RECIRC_DEPTH 6
 DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
 
 /* Configuration parameters. */
@@ -112,6 +114,9 @@ static struct odp_support dp_netdev_support = {
     .ct_zone = true,
     .ct_mark = true,
     .ct_label = true,
+    .ct_state_nat = true,
+    .ct_orig_tuple = true,
+    .ct_orig_tuple6 = true,
 };
 
 /* Stores a miniflow with inline values */
@@ -177,6 +182,14 @@ struct emc_cache {
 /* Time in ms between successive optimizations of the dpcls subtable vector */
 #define DPCLS_OPTIMIZATION_INTERVAL 1000
 
+/* Time in ms of the interval in which rxq processing cycles used in
+ * rxq to pmd assignments is measured and stored. */
+#define PMD_RXQ_INTERVAL_LEN 10000
+
+/* Number of intervals for which cycles are stored
+ * and used during rxq to pmd assignment. */
+#define PMD_RXQ_INTERVAL_MAX 6
+
 struct dpcls {
     struct cmap_node node;      /* Within dp_netdev_pmd_thread.classifiers */
     odp_port_t in_port;
@@ -278,6 +291,9 @@ struct dp_netdev {
 
     /* Stores all 'struct dp_netdev_pmd_thread's. */
     struct cmap poll_threads;
+    /* id pool for per thread static_tx_qid. */
+    struct id_pool *tx_qid_pool;
+    struct ovs_mutex tx_qid_pool_mutex;
 
     /* Protects the access of the 'struct dp_netdev_pmd_thread'
      * instance for non-pmd thread. */
@@ -326,11 +342,21 @@ enum dp_stat_type {
 };
 
 enum pmd_cycles_counter_type {
-    PMD_CYCLES_POLLING,         /* Cycles spent polling NICs. */
-    PMD_CYCLES_PROCESSING,      /* Cycles spent processing packets */
+    PMD_CYCLES_IDLE,            /* Cycles spent idle or unsuccessful polling */
+    PMD_CYCLES_PROCESSING,      /* Cycles spent successfully polling and
+                                 * processing polled packets */
     PMD_N_CYCLES
 };
 
+enum rxq_cycles_counter_type {
+    RXQ_CYCLES_PROC_CURR,       /* Cycles spent successfully polling and
+                                   processing packets during the current
+                                   interval. */
+    RXQ_CYCLES_PROC_HIST,       /* Total cycles of all intervals that are used
+                                   during rxq to pmd assignment. */
+    RXQ_N_CYCLES
+};
+
 #define XPS_TIMEOUT_MS 500LL
 
 /* Contained by struct dp_netdev_port's 'rxqs' member.  */
@@ -341,23 +367,30 @@ struct dp_netdev_rxq {
                                           pinned. OVS_CORE_UNSPEC if the
                                           queue doesn't need to be pinned to a
                                           particular core. */
+    unsigned intrvl_idx;               /* Write index for 'cycles_intrvl'. */
     struct dp_netdev_pmd_thread *pmd;  /* pmd thread that polls this queue. */
+
+    /* Counters of cycles spent successfully polling and processing pkts. */
+    atomic_ullong cycles[RXQ_N_CYCLES];
+    /* We store PMD_RXQ_INTERVAL_MAX intervals of data for an rxq and then
+       sum them to yield the cycles used for an rxq. */
+    atomic_ullong cycles_intrvl[PMD_RXQ_INTERVAL_MAX];
 };
 
 /* A port in a netdev-based datapath. */
 struct dp_netdev_port {
     odp_port_t port_no;
+    bool dynamic_txqs;          /* If true XPS will be used. */
+    bool need_reconfigure;      /* True if we should reconfigure netdev. */
     struct netdev *netdev;
     struct hmap_node node;      /* Node in dp_netdev's 'ports'. */
     struct netdev_saved_flags *sf;
     struct dp_netdev_rxq *rxqs;
-    unsigned n_rxq;             /* Number of elements in 'rxq' */
-    bool dynamic_txqs;          /* If true XPS will be used. */
+    unsigned n_rxq;             /* Number of elements in 'rxqs' */
     unsigned *txq_used;         /* Number of threads that use each tx queue. */
     struct ovs_mutex txq_used_mutex;
     char *type;                 /* Port type as requested by user. */
     char *rxq_affinity_list;    /* Requested affinity of rx queues. */
-    bool need_reconfigure;      /* True if we should reconfigure netdev. */
 };
 
 /* Contained by struct dp_netdev_flow's 'stats' member.  */
@@ -477,7 +510,7 @@ struct dp_netdev_pmd_cycles {
 };
 
 struct polled_queue {
-    struct netdev_rxq *rx;
+    struct dp_netdev_rxq *rxq;
     odp_port_t port_no;
 };
 
@@ -541,6 +574,9 @@ struct dp_netdev_pmd_thread {
     struct cmap classifiers;
     /* Periodically sort subtable vectors according to hit frequencies */
     long long int next_optimization;
+    /* End of the next time interval for which processing cycles
+       are stored for each polled rxq. */
+    long long int rxq_next_cycle_store;
 
     /* Statistics. */
     struct dp_netdev_pmd_stats stats;
@@ -563,7 +599,7 @@ struct dp_netdev_pmd_thread {
     /* Queue id used by this pmd thread to send packets on all netdevs if
      * XPS disabled for this netdev. All static_tx_qid's are unique and less
      * than 'cmap_count(dp->poll_threads)'. */
-    const int static_tx_qid;
+    uint32_t static_tx_qid;
 
     struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'. */
     /* List of rx queues to poll. */
@@ -643,6 +679,8 @@ static struct dp_netdev_pmd_thread *dp_netdev_get_pmd(struct dp_netdev *dp,
                                                       unsigned core_id);
 static struct dp_netdev_pmd_thread *
 dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
+static void dp_netdev_del_pmd(struct dp_netdev *dp,
+                              struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp, bool non_pmd);
 static void dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
@@ -665,8 +703,20 @@ static void dp_netdev_pmd_flow_flush(struct dp_netdev_pmd_thread *pmd);
 static void pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
     OVS_REQUIRES(pmd->port_mutex);
 static inline void
-dp_netdev_pmd_try_optimize(struct dp_netdev_pmd_thread *pmd);
-
+dp_netdev_pmd_try_optimize(struct dp_netdev_pmd_thread *pmd,
+                           struct polled_queue *poll_list, int poll_cnt);
+static void
+dp_netdev_rxq_set_cycles(struct dp_netdev_rxq *rx,
+                         enum rxq_cycles_counter_type type,
+                         unsigned long long cycles);
+static uint64_t
+dp_netdev_rxq_get_cycles(struct dp_netdev_rxq *rx,
+                         enum rxq_cycles_counter_type type);
+static void
+dp_netdev_rxq_set_intrvl_cycles(struct dp_netdev_rxq *rx,
+                           unsigned long long cycles);
+static uint64_t
+dp_netdev_rxq_get_intrvl_cycles(struct dp_netdev_rxq *rx, unsigned idx);
 static void
 dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
                                long long now, bool purge);
@@ -676,6 +726,8 @@ static int dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
 static inline bool emc_entry_alive(struct emc_entry *ce);
 static void emc_clear_entry(struct emc_entry *ce);
 
+static void dp_netdev_request_reconfigure(struct dp_netdev *dp);
+
 static void
 emc_cache_init(struct emc_cache *flow_cache)
 {
@@ -745,7 +797,7 @@ pmd_info_show_stats(struct ds *reply,
                     unsigned long long stats[DP_N_STATS],
                     uint64_t cycles[PMD_N_CYCLES])
 {
-    unsigned long long total_packets = 0;
+    unsigned long long total_packets;
     uint64_t total_cycles = 0;
     int i;
 
@@ -761,13 +813,12 @@ pmd_info_show_stats(struct ds *reply,
         } else {
             stats[i] = 0;
         }
-
-        if (i != DP_STAT_LOST) {
-            /* Lost packets are already included in DP_STAT_MISS */
-            total_packets += stats[i];
-        }
     }
 
+    /* Sum of all the matched and not matched packets gives the total.  */
+    total_packets = stats[DP_STAT_EXACT_HIT] + stats[DP_STAT_MASKED_HIT]
+                    + stats[DP_STAT_MISS];
+
     for (i = 0; i < PMD_N_CYCLES; i++) {
         if (cycles[i] > pmd->cycles_zero[i]) {
            cycles[i] -= pmd->cycles_zero[i];
@@ -804,10 +855,10 @@ pmd_info_show_stats(struct ds *reply,
     }
 
     ds_put_format(reply,
-                  "\tpolling cycles:%"PRIu64" (%.02f%%)\n"
+                  "\tidle cycles:%"PRIu64" (%.02f%%)\n"
                   "\tprocessing cycles:%"PRIu64" (%.02f%%)\n",
-                  cycles[PMD_CYCLES_POLLING],
-                  cycles[PMD_CYCLES_POLLING] / (double)total_cycles * 100,
+                  cycles[PMD_CYCLES_IDLE],
+                  cycles[PMD_CYCLES_IDLE] / (double)total_cycles * 100,
                   cycles[PMD_CYCLES_PROCESSING],
                   cycles[PMD_CYCLES_PROCESSING] / (double)total_cycles * 100);
 
@@ -884,10 +935,9 @@ sorted_poll_list(struct dp_netdev_pmd_thread *pmd, struct rxq_poll **list,
             i++;
         }
         ovs_assert(i == *n);
+        qsort(ret, *n, sizeof *ret, compare_poll_list);
     }
 
-    qsort(ret, *n, sizeof *ret, compare_poll_list);
-
     *list = ret;
 }
 
@@ -969,6 +1019,36 @@ sorted_poll_thread_list(struct dp_netdev *dp,
     *n = k;
 }
 
+static void
+dpif_netdev_pmd_rebalance(struct unixctl_conn *conn, int argc,
+                          const char *argv[], void *aux OVS_UNUSED)
+{
+    struct ds reply = DS_EMPTY_INITIALIZER;
+    struct dp_netdev *dp = NULL;
+
+    ovs_mutex_lock(&dp_netdev_mutex);
+
+    if (argc == 2) {
+        dp = shash_find_data(&dp_netdevs, argv[1]);
+    } else if (shash_count(&dp_netdevs) == 1) {
+        /* There's only one datapath */
+        dp = shash_first(&dp_netdevs)->data;
+    }
+
+    if (!dp) {
+        ovs_mutex_unlock(&dp_netdev_mutex);
+        unixctl_command_reply_error(conn,
+                                    "please specify an existing datapath");
+        return;
+    }
+
+    dp_netdev_request_reconfigure(dp);
+    ovs_mutex_unlock(&dp_netdev_mutex);
+    ds_put_cstr(&reply, "pmd rxq rebalance requested.\n");
+    unixctl_command_reply(conn, ds_cstr(&reply));
+    ds_destroy(&reply);
+}
+
 static void
 dpif_netdev_pmd_info(struct unixctl_conn *conn, int argc, const char *argv[],
                      void *aux)
@@ -1007,14 +1087,13 @@ dpif_netdev_pmd_info(struct unixctl_conn *conn, int argc, const char *argv[],
         } else {
             unsigned long long stats[DP_N_STATS];
             uint64_t cycles[PMD_N_CYCLES];
-            int i;
 
             /* Read current stats and cycle counters */
-            for (i = 0; i < ARRAY_SIZE(stats); i++) {
-                atomic_read_relaxed(&pmd->stats.n[i], &stats[i]);
+            for (size_t j = 0; j < ARRAY_SIZE(stats); j++) {
+                atomic_read_relaxed(&pmd->stats.n[j], &stats[j]);
             }
-            for (i = 0; i < ARRAY_SIZE(cycles); i++) {
-                atomic_read_relaxed(&pmd->cycles.n[i], &cycles[i]);
+            for (size_t j = 0; j < ARRAY_SIZE(cycles); j++) {
+                atomic_read_relaxed(&pmd->cycles.n[j], &cycles[j]);
             }
 
             if (type == PMD_INFO_CLEAR_STATS) {
@@ -1048,6 +1127,9 @@ dpif_netdev_init(void)
     unixctl_command_register("dpif-netdev/pmd-rxq-show", "[dp]",
                              0, 1, dpif_netdev_pmd_info,
                              (void *)&poll_aux);
+    unixctl_command_register("dpif-netdev/pmd-rxq-rebalance", "[dp]",
+                             0, 1, dpif_netdev_pmd_rebalance,
+                             NULL);
     return 0;
 }
 
@@ -1182,10 +1264,17 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
 
     cmap_init(&dp->poll_threads);
+
+    ovs_mutex_init(&dp->tx_qid_pool_mutex);
+    /* We need 1 Tx queue for each possible core + 1 for non-PMD threads. */
+    dp->tx_qid_pool = id_pool_create(0, ovs_numa_get_n_cores() + 1);
+
     ovs_mutex_init_recursive(&dp->non_pmd_mutex);
     ovsthread_key_create(&dp->per_pmd_key, NULL);
 
     ovs_mutex_lock(&dp->port_mutex);
+    /* non-PMD will be created before all other threads and will
+     * allocate static_tx_qid = 0. */
     dp_netdev_set_nonpmd(dp);
 
     error = do_add_port(dp, name, dpif_netdev_port_open_type(dp->class,
@@ -1280,6 +1369,9 @@ dp_netdev_free(struct dp_netdev *dp)
     dp_netdev_destroy_all_pmds(dp, true);
     cmap_destroy(&dp->poll_threads);
 
+    ovs_mutex_destroy(&dp->tx_qid_pool_mutex);
+    id_pool_destroy(dp->tx_qid_pool);
+
     ovs_mutex_destroy(&dp->non_pmd_mutex);
     ovsthread_key_delete(dp->per_pmd_key);
 
@@ -2074,11 +2166,7 @@ emc_probabilistic_insert(struct dp_netdev_pmd_thread *pmd,
     uint32_t min;
     atomic_read_relaxed(&pmd->dp->emc_insert_min, &min);
 
-#ifdef DPDK_NETDEV
-    if (min && (key->hash ^ (uint32_t) pmd->last_cycles) <= min) {
-#else
-    if (min && (key->hash ^ random_uint32()) <= min) {
-#endif
+    if (min && random_uint32() <= min) {
         emc_insert(&pmd->flow_cache, key, flow);
     }
 }
@@ -2258,8 +2346,6 @@ static int
 dpif_netdev_flow_from_nlattrs(const struct nlattr *key, uint32_t key_len,
                               struct flow *flow, bool probe)
 {
-    odp_port_t in_port;
-
     if (odp_flow_key_to_flow(key, key_len, flow)) {
         if (!probe) {
             /* This should not happen: it indicates that
@@ -2281,11 +2367,6 @@ dpif_netdev_flow_from_nlattrs(const struct nlattr *key, uint32_t key_len,
         return EINVAL;
     }
 
-    in_port = flow->in_port.odp_port;
-    if (!is_valid_port_number(in_port) && in_port != ODPP_NONE) {
-        return EINVAL;
-    }
-
     if (flow->ct_state & DP_NETDEV_CS_UNSUPPORTED_MASK) {
         return EINVAL;
     }
@@ -2417,7 +2498,7 @@ dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd,
                         mask_buf.data, mask_buf.size,
                         NULL, &ds, false);
         ds_put_cstr(&ds, ", actions:");
-        format_odp_actions(&ds, actions, actions_len);
+        format_odp_actions(&ds, actions, actions_len, NULL);
 
         VLOG_DBG("%s", ds_cstr(&ds));
 
@@ -2430,6 +2511,7 @@ dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd,
         ds_put_cstr(&ds, "flow match: ");
         miniflow_expand(&flow->cr.flow.mf, &m.flow);
         miniflow_expand(&flow->cr.mask->mf, &m.wc.masks);
+        memset(&m.tun_md, 0, sizeof m.tun_md);
         match_format(&m, NULL, &ds, OFP_DEFAULT_PRIORITY);
 
         VLOG_DBG("%s", ds_cstr(&ds));
@@ -2822,6 +2904,9 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
         /* If this is part of a probe, Drop the packet, since executing
          * the action may actually cause spurious packets be sent into
          * the network. */
+        if (pmd->core_id == NON_PMD_CORE_ID) {
+            dp_netdev_pmd_unref(pmd);
+        }
         return 0;
     }
 
@@ -3021,7 +3106,7 @@ dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
 
 \f
 /* Creates and returns a new 'struct dp_netdev_actions', whose actions are
- * a copy of the 'ofpacts_len' bytes of 'ofpacts'. */
+ * a copy of the 'size' bytes of 'actions' input parameters. */
 struct dp_netdev_actions *
 dp_netdev_actions_create(const struct nlattr *actions, size_t size)
 {
@@ -3080,30 +3165,81 @@ cycles_count_end(struct dp_netdev_pmd_thread *pmd,
     non_atomic_ullong_add(&pmd->cycles.n[type], interval);
 }
 
+/* Calculate the intermediate cycle result and add to the counter 'type' */
+static inline void
+cycles_count_intermediate(struct dp_netdev_pmd_thread *pmd,
+                          struct dp_netdev_rxq *rxq,
+                          enum pmd_cycles_counter_type type)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
+{
+    unsigned long long new_cycles = cycles_counter();
+    unsigned long long interval = new_cycles - pmd->last_cycles;
+    pmd->last_cycles = new_cycles;
+
+    non_atomic_ullong_add(&pmd->cycles.n[type], interval);
+    if (rxq && (type == PMD_CYCLES_PROCESSING)) {
+        /* Add to the amount of current processing cycles. */
+        non_atomic_ullong_add(&rxq->cycles[RXQ_CYCLES_PROC_CURR], interval);
+    }
+}
+
+static void
+dp_netdev_rxq_set_cycles(struct dp_netdev_rxq *rx,
+                         enum rxq_cycles_counter_type type,
+                         unsigned long long cycles)
+{
+   atomic_store_relaxed(&rx->cycles[type], cycles);
+}
+
+static uint64_t
+dp_netdev_rxq_get_cycles(struct dp_netdev_rxq *rx,
+                         enum rxq_cycles_counter_type type)
+{
+    unsigned long long processing_cycles;
+    atomic_read_relaxed(&rx->cycles[type], &processing_cycles);
+    return processing_cycles;
+}
+
 static void
+dp_netdev_rxq_set_intrvl_cycles(struct dp_netdev_rxq *rx,
+                                unsigned long long cycles)
+{
+    unsigned int idx = rx->intrvl_idx++ % PMD_RXQ_INTERVAL_MAX;
+    atomic_store_relaxed(&rx->cycles_intrvl[idx], cycles);
+}
+
+static uint64_t
+dp_netdev_rxq_get_intrvl_cycles(struct dp_netdev_rxq *rx, unsigned idx)
+{
+    unsigned long long processing_cycles;
+    atomic_read_relaxed(&rx->cycles_intrvl[idx], &processing_cycles);
+    return processing_cycles;
+}
+
+static int
 dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
                            struct netdev_rxq *rx,
                            odp_port_t port_no)
 {
     struct dp_packet_batch batch;
     int error;
+    int batch_cnt = 0;
 
     dp_packet_batch_init(&batch);
-    cycles_count_start(pmd);
     error = netdev_rxq_recv(rx, &batch);
-    cycles_count_end(pmd, PMD_CYCLES_POLLING);
     if (!error) {
         *recirc_depth_get() = 0;
 
-        cycles_count_start(pmd);
+        batch_cnt = batch.count;
         dp_netdev_input(pmd, &batch, port_no);
-        cycles_count_end(pmd, PMD_CYCLES_PROCESSING);
     } else if (error != EAGAIN && error != EOPNOTSUPP) {
         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
         VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
                     netdev_rxq_get_name(rx), ovs_strerror(error));
     }
+
+    return batch_cnt;
 }
 
 static struct tx_port *
@@ -3133,6 +3269,7 @@ port_reconfigure(struct dp_netdev_port *port)
         netdev_rxq_close(port->rxqs[i].rx);
         port->rxqs[i].rx = NULL;
     }
+    unsigned last_nrxq = port->n_rxq;
     port->n_rxq = 0;
 
     /* Allows 'netdev' to apply the pending configuration changes. */
@@ -3152,7 +3289,13 @@ port_reconfigure(struct dp_netdev_port *port)
     port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
 
     for (i = 0; i < netdev_n_rxq(netdev); i++) {
+        bool new_queue = i >= last_nrxq;
+        if (new_queue) {
+            memset(&port->rxqs[i], 0, sizeof port->rxqs[i]);
+        }
+
         port->rxqs[i].port = port;
+
         err = netdev_rxq_open(netdev, &port->rxqs[i].rx, i);
         if (err) {
             return err;
@@ -3180,6 +3323,7 @@ struct rr_numa {
     int n_pmds;
 
     int cur_index;
+    bool idx_inc;
 };
 
 static struct rr_numa *
@@ -3196,6 +3340,24 @@ rr_numa_list_lookup(struct rr_numa_list *rr, int numa_id)
     return NULL;
 }
 
+/* Returns the next node in numa list following 'numa' in round-robin fashion.
+ * Returns first node if 'numa' is a null pointer or the last node in 'rr'.
+ * Returns NULL if 'rr' numa list is empty. */
+static struct rr_numa *
+rr_numa_list_next(struct rr_numa_list *rr, const struct rr_numa *numa)
+{
+    struct hmap_node *node = NULL;
+
+    if (numa) {
+        node = hmap_next(&rr->numas, &numa->node);
+    }
+    if (!node) {
+        node = hmap_first(&rr->numas);
+    }
+
+    return (node) ? CONTAINER_OF(node, struct rr_numa, node) : NULL;
+}
+
 static void
 rr_numa_list_populate(struct dp_netdev *dp, struct rr_numa_list *rr)
 {
@@ -3218,13 +3380,37 @@ rr_numa_list_populate(struct dp_netdev *dp, struct rr_numa_list *rr)
         numa->n_pmds++;
         numa->pmds = xrealloc(numa->pmds, numa->n_pmds * sizeof *numa->pmds);
         numa->pmds[numa->n_pmds - 1] = pmd;
+        /* At least one pmd so initialise curr_idx and idx_inc. */
+        numa->cur_index = 0;
+        numa->idx_inc = true;
     }
 }
 
+/* Returns the next pmd from the numa node in
+ * incrementing or decrementing order. */
 static struct dp_netdev_pmd_thread *
 rr_numa_get_pmd(struct rr_numa *numa)
 {
-    return numa->pmds[numa->cur_index++ % numa->n_pmds];
+    int numa_idx = numa->cur_index;
+
+    if (numa->idx_inc == true) {
+        /* Incrementing through list of pmds. */
+        if (numa->cur_index == numa->n_pmds-1) {
+            /* Reached the last pmd. */
+            numa->idx_inc = false;
+        } else {
+            numa->cur_index++;
+        }
+    } else {
+        /* Decrementing through list of pmds. */
+        if (numa->cur_index == 0) {
+            /* Reached the first pmd. */
+            numa->idx_inc = true;
+        } else {
+            numa->cur_index--;
+        }
+    }
+    return numa->pmds[numa_idx];
 }
 
 static void
@@ -3239,10 +3425,44 @@ rr_numa_list_destroy(struct rr_numa_list *rr)
     hmap_destroy(&rr->numas);
 }
 
+/* Sort Rx Queues by the processing cycles they are consuming. */
+static int
+compare_rxq_cycles(const void *a, const void *b)
+{
+    struct dp_netdev_rxq *qa;
+    struct dp_netdev_rxq *qb;
+    uint64_t cycles_qa, cycles_qb;
+
+    qa = *(struct dp_netdev_rxq **) a;
+    qb = *(struct dp_netdev_rxq **) b;
+
+    cycles_qa = dp_netdev_rxq_get_cycles(qa, RXQ_CYCLES_PROC_HIST);
+    cycles_qb = dp_netdev_rxq_get_cycles(qb, RXQ_CYCLES_PROC_HIST);
+
+    if (cycles_qa != cycles_qb) {
+        return (cycles_qa < cycles_qb) ? 1 : -1;
+    } else {
+        /* Cycles are the same so tiebreak on port/queue id.
+         * Tiebreaking (as opposed to return 0) ensures consistent
+         * sort results across multiple OS's. */
+        uint32_t port_qa = odp_to_u32(qa->port->port_no);
+        uint32_t port_qb = odp_to_u32(qb->port->port_no);
+        if (port_qa != port_qb) {
+            return port_qa > port_qb ? 1 : -1;
+        } else {
+            return netdev_rxq_get_queue_id(qa->rx)
+                    - netdev_rxq_get_queue_id(qb->rx);
+        }
+    }
+}
+
 /* Assign pmds to queues.  If 'pinned' is true, assign pmds to pinned
  * queues and marks the pmds as isolated.  Otherwise, assign non isolated
  * pmds to unpinned queues.
  *
+ * If 'pinned' is false queues will be sorted by processing cycles they are
+ * consuming and then assigned to pmds in round robin order.
+ *
  * The function doesn't touch the pmd threads, it just stores the assignment
  * in the 'pmd' member of each rxq. */
 static void
@@ -3250,20 +3470,17 @@ rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
 {
     struct dp_netdev_port *port;
     struct rr_numa_list rr;
-
-    rr_numa_list_populate(dp, &rr);
+    struct rr_numa *non_local_numa = NULL;
+    struct dp_netdev_rxq ** rxqs = NULL;
+    int i, n_rxqs = 0;
+    struct rr_numa *numa = NULL;
+    int numa_id;
 
     HMAP_FOR_EACH (port, node, &dp->ports) {
-        struct rr_numa *numa;
-        int numa_id;
-
         if (!netdev_is_pmd(port->netdev)) {
             continue;
         }
 
-        numa_id = netdev_get_numa_id(port->netdev);
-        numa = rr_numa_list_lookup(&rr, numa_id);
-
         for (int qid = 0; qid < port->n_rxq; qid++) {
             struct dp_netdev_rxq *q = &port->rxqs[qid];
 
@@ -3281,19 +3498,83 @@ rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
                     dp_netdev_pmd_unref(pmd);
                 }
             } else if (!pinned && q->core_id == OVS_CORE_UNSPEC) {
-                if (!numa) {
-                    VLOG_WARN("There's no available (non isolated) pmd thread "
-                              "on numa node %d. Queue %d on port \'%s\' will "
-                              "not be polled.",
-                              numa_id, qid, netdev_get_name(port->netdev));
+                uint64_t cycle_hist = 0;
+
+                if (n_rxqs == 0) {
+                    rxqs = xmalloc(sizeof *rxqs);
                 } else {
-                    q->pmd = rr_numa_get_pmd(numa);
+                    rxqs = xrealloc(rxqs, sizeof *rxqs * (n_rxqs + 1));
+                }
+                /* Sum the queue intervals and store the cycle history. */
+                for (unsigned i = 0; i < PMD_RXQ_INTERVAL_MAX; i++) {
+                    cycle_hist += dp_netdev_rxq_get_intrvl_cycles(q, i);
                 }
+                dp_netdev_rxq_set_cycles(q, RXQ_CYCLES_PROC_HIST, cycle_hist);
+
+                /* Store the queue. */
+                rxqs[n_rxqs++] = q;
             }
         }
     }
 
+    if (n_rxqs > 1) {
+        /* Sort the queues in order of the processing cycles
+         * they consumed during their last pmd interval. */
+        qsort(rxqs, n_rxqs, sizeof *rxqs, compare_rxq_cycles);
+    }
+
+    rr_numa_list_populate(dp, &rr);
+    /* Assign the sorted queues to pmds in round robin. */
+    for (i = 0; i < n_rxqs; i++) {
+        numa_id = netdev_get_numa_id(rxqs[i]->port->netdev);
+        numa = rr_numa_list_lookup(&rr, numa_id);
+        if (!numa) {
+            /* There are no pmds on the queue's local NUMA node.
+               Round robin on the NUMA nodes that do have pmds. */
+            non_local_numa = rr_numa_list_next(&rr, non_local_numa);
+            if (!non_local_numa) {
+                VLOG_ERR("There is no available (non-isolated) pmd "
+                         "thread for port \'%s\' queue %d. This queue "
+                         "will not be polled. Is pmd-cpu-mask set to "
+                         "zero? Or are all PMDs isolated to other "
+                         "queues?", netdev_rxq_get_name(rxqs[i]->rx),
+                         netdev_rxq_get_queue_id(rxqs[i]->rx));
+                continue;
+            }
+            rxqs[i]->pmd = rr_numa_get_pmd(non_local_numa);
+            VLOG_WARN("There's no available (non-isolated) pmd thread "
+                      "on numa node %d. Queue %d on port \'%s\' will "
+                      "be assigned to the pmd on core %d "
+                      "(numa node %d). Expect reduced performance.",
+                      numa_id, netdev_rxq_get_queue_id(rxqs[i]->rx),
+                      netdev_rxq_get_name(rxqs[i]->rx),
+                      rxqs[i]->pmd->core_id, rxqs[i]->pmd->numa_id);
+        } else {
+        rxqs[i]->pmd = rr_numa_get_pmd(numa);
+        VLOG_INFO("Core %d on numa node %d assigned port \'%s\' "
+                  "rx queue %d (measured processing cycles %"PRIu64").",
+                  rxqs[i]->pmd->core_id, numa_id,
+                  netdev_rxq_get_name(rxqs[i]->rx),
+                  netdev_rxq_get_queue_id(rxqs[i]->rx),
+                  dp_netdev_rxq_get_cycles(rxqs[i], RXQ_CYCLES_PROC_HIST));
+        }
+    }
+
     rr_numa_list_destroy(&rr);
+    free(rxqs);
+}
+
+static void
+reload_affected_pmds(struct dp_netdev *dp)
+{
+    struct dp_netdev_pmd_thread *pmd;
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (pmd->need_reload) {
+            dp_netdev_reload_pmd__(pmd);
+            pmd->need_reload = false;
+        }
+    }
 }
 
 static void
@@ -3302,7 +3583,11 @@ reconfigure_pmd_threads(struct dp_netdev *dp)
 {
     struct dp_netdev_pmd_thread *pmd;
     struct ovs_numa_dump *pmd_cores;
+    struct ovs_numa_info_core *core;
+    struct hmapx to_delete = HMAPX_INITIALIZER(&to_delete);
+    struct hmapx_node *node;
     bool changed = false;
+    bool need_to_adjust_static_tx_qids = false;
 
     /* The pmd threads should be started only if there's a pmd port in the
      * datapath.  If the user didn't provide any "pmd-cpu-mask", we start
@@ -3315,40 +3600,64 @@ reconfigure_pmd_threads(struct dp_netdev *dp)
         pmd_cores = ovs_numa_dump_n_cores_per_numa(NR_PMD_THREADS);
     }
 
-    /* Check for changed configuration */
-    if (ovs_numa_dump_count(pmd_cores) != cmap_count(&dp->poll_threads) - 1) {
-        changed = true;
-    } else {
-        CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-            if (pmd->core_id != NON_PMD_CORE_ID
-                && !ovs_numa_dump_contains_core(pmd_cores,
-                                                pmd->numa_id,
-                                                pmd->core_id)) {
-                changed = true;
-                break;
-            }
+    /* We need to adjust 'static_tx_qid's only if we're reducing number of
+     * PMD threads. Otherwise, new threads will allocate all the freed ids. */
+    if (ovs_numa_dump_count(pmd_cores) < cmap_count(&dp->poll_threads) - 1) {
+        /* Adjustment is required to keep 'static_tx_qid's sequential and
+         * avoid possible issues, for example, imbalanced tx queue usage
+         * and unnecessary locking caused by remapping on netdev level. */
+        need_to_adjust_static_tx_qids = true;
+    }
+
+    /* Check for unwanted pmd threads */
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (pmd->core_id == NON_PMD_CORE_ID) {
+            continue;
+        }
+        if (!ovs_numa_dump_contains_core(pmd_cores, pmd->numa_id,
+                                                    pmd->core_id)) {
+            hmapx_add(&to_delete, pmd);
+        } else if (need_to_adjust_static_tx_qids) {
+            pmd->need_reload = true;
         }
     }
 
-    /* Destroy the old and recreate the new pmd threads.  We don't perform an
-     * incremental update because we would have to adjust 'static_tx_qid'. */
-    if (changed) {
-        struct ovs_numa_info_core *core;
-        struct ovs_numa_info_numa *numa;
+    HMAPX_FOR_EACH (node, &to_delete) {
+        pmd = (struct dp_netdev_pmd_thread *) node->data;
+        VLOG_INFO("PMD thread on numa_id: %d, core id: %2d destroyed.",
+                  pmd->numa_id, pmd->core_id);
+        dp_netdev_del_pmd(dp, pmd);
+    }
+    changed = !hmapx_is_empty(&to_delete);
+    hmapx_destroy(&to_delete);
 
-        /* Do not destroy the non pmd thread. */
-        dp_netdev_destroy_all_pmds(dp, false);
-        FOR_EACH_CORE_ON_DUMP (core, pmd_cores) {
-            struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
+    if (need_to_adjust_static_tx_qids) {
+        /* 'static_tx_qid's are not sequential now.
+         * Reload remaining threads to fix this. */
+        reload_affected_pmds(dp);
+    }
 
+    /* Check for required new pmd threads */
+    FOR_EACH_CORE_ON_DUMP(core, pmd_cores) {
+        pmd = dp_netdev_get_pmd(dp, core->core_id);
+        if (!pmd) {
+            pmd = xzalloc(sizeof *pmd);
             dp_netdev_configure_pmd(pmd, dp, core->core_id, core->numa_id);
-
             pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
+            VLOG_INFO("PMD thread on numa_id: %d, core id: %2d created.",
+                      pmd->numa_id, pmd->core_id);
+            changed = true;
+        } else {
+            dp_netdev_pmd_unref(pmd);
         }
+    }
+
+    if (changed) {
+        struct ovs_numa_info_numa *numa;
 
         /* Log the number of pmd threads per numa node. */
         FOR_EACH_NUMA_ON_DUMP (numa, pmd_cores) {
-            VLOG_INFO("Created %"PRIuSIZE" pmd threads on numa node %d",
+            VLOG_INFO("There are %"PRIuSIZE" pmd threads on numa node %d",
                       numa->n_cores, numa->numa_id);
         }
     }
@@ -3356,19 +3665,6 @@ reconfigure_pmd_threads(struct dp_netdev *dp)
     ovs_numa_dump_destroy(pmd_cores);
 }
 
-static void
-reload_affected_pmds(struct dp_netdev *dp)
-{
-    struct dp_netdev_pmd_thread *pmd;
-
-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        if (pmd->need_reload) {
-            dp_netdev_reload_pmd__(pmd);
-            pmd->need_reload = false;
-        }
-    }
-}
-
 static void
 pmd_remove_stale_ports(struct dp_netdev *dp,
                        struct dp_netdev_pmd_thread *pmd)
@@ -3427,8 +3723,8 @@ reconfigure_datapath(struct dp_netdev *dp)
      * need reconfiguration. */
 
     /* Check for all the ports that need reconfiguration.  We cache this in
-     * 'port->reconfigure', because netdev_is_reconf_required() can change at
-     * any time. */
+     * 'port->need_reconfigure', because netdev_is_reconf_required() can
+     * change at any time. */
     HMAP_FOR_EACH (port, node, &dp->ports) {
         if (netdev_is_reconf_required(port->netdev)) {
             port->need_reconfigure = true;
@@ -3566,21 +3862,30 @@ dpif_netdev_run(struct dpif *dpif)
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_pmd_thread *non_pmd;
     uint64_t new_tnl_seq;
+    int process_packets = 0;
 
     ovs_mutex_lock(&dp->port_mutex);
     non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
     if (non_pmd) {
         ovs_mutex_lock(&dp->non_pmd_mutex);
+        cycles_count_start(non_pmd);
         HMAP_FOR_EACH (port, node, &dp->ports) {
             if (!netdev_is_pmd(port->netdev)) {
                 int i;
 
                 for (i = 0; i < port->n_rxq; i++) {
-                    dp_netdev_process_rxq_port(non_pmd, port->rxqs[i].rx,
-                                               port->port_no);
+                    process_packets =
+                        dp_netdev_process_rxq_port(non_pmd,
+                                                   port->rxqs[i].rx,
+                                                   port->port_no);
+                    cycles_count_intermediate(non_pmd, NULL,
+                                              process_packets
+                                              ? PMD_CYCLES_PROCESSING
+                                              : PMD_CYCLES_IDLE);
                 }
             }
         }
+        cycles_count_end(non_pmd, PMD_CYCLES_IDLE);
         dpif_netdev_xps_revalidate_pmd(non_pmd, time_msec(), false);
         ovs_mutex_unlock(&dp->non_pmd_mutex);
 
@@ -3643,7 +3948,9 @@ pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)
 }
 
 /* Copies ports from 'pmd->tx_ports' (shared with the main thread) to
- * 'pmd->port_cache' (thread local) */
+ * thread-local copies. Copy to 'pmd->tnl_port_cache' if it is a tunnel
+ * device, otherwise to 'pmd->send_port_cache' if the port has at least
+ * one txq. */
 static void
 pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
     OVS_REQUIRES(pmd->port_mutex)
@@ -3669,6 +3976,28 @@ pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
     }
 }
 
+static void
+pmd_alloc_static_tx_qid(struct dp_netdev_pmd_thread *pmd)
+{
+    ovs_mutex_lock(&pmd->dp->tx_qid_pool_mutex);
+    if (!id_pool_alloc_id(pmd->dp->tx_qid_pool, &pmd->static_tx_qid)) {
+        VLOG_ABORT("static_tx_qid allocation failed for PMD on core %2d"
+                   ", numa_id %d.", pmd->core_id, pmd->numa_id);
+    }
+    ovs_mutex_unlock(&pmd->dp->tx_qid_pool_mutex);
+
+    VLOG_DBG("static_tx_qid = %d allocated for PMD thread on core %2d"
+             ", numa_id %d.", pmd->static_tx_qid, pmd->core_id, pmd->numa_id);
+}
+
+static void
+pmd_free_static_tx_qid(struct dp_netdev_pmd_thread *pmd)
+{
+    ovs_mutex_lock(&pmd->dp->tx_qid_pool_mutex);
+    id_pool_free_id(pmd->dp->tx_qid_pool, pmd->static_tx_qid);
+    ovs_mutex_unlock(&pmd->dp->tx_qid_pool_mutex);
+}
+
 static int
 pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
                           struct polled_queue **ppoll_list)
@@ -3683,7 +4012,7 @@ pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
 
     i = 0;
     HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
-        poll_list[i].rx = poll->rxq->rx;
+        poll_list[i].rxq = poll->rxq;
         poll_list[i].port_no = poll->rxq->port->port_no;
         i++;
     }
@@ -3705,6 +4034,7 @@ pmd_thread_main(void *f_)
     bool exiting;
     int poll_cnt;
     int i;
+    int process_packets = 0;
 
     poll_list = NULL;
 
@@ -3713,14 +4043,15 @@ pmd_thread_main(void *f_)
     ovs_numa_thread_setaffinity_core(pmd->core_id);
     dpdk_set_lcore_id(pmd->core_id);
     poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
-reload:
     emc_cache_init(&pmd->flow_cache);
+reload:
+    pmd_alloc_static_tx_qid(pmd);
 
     /* List port/core affinity */
     for (i = 0; i < poll_cnt; i++) {
        VLOG_DBG("Core %d processing port \'%s\' with queue-id %d\n",
-                pmd->core_id, netdev_rxq_get_name(poll_list[i].rx),
-                netdev_rxq_get_queue_id(poll_list[i].rx));
+                pmd->core_id, netdev_rxq_get_name(poll_list[i].rxq->rx),
+                netdev_rxq_get_queue_id(poll_list[i].rxq->rx));
     }
 
     if (!poll_cnt) {
@@ -3731,10 +4062,15 @@ reload:
         lc = UINT_MAX;
     }
 
+    cycles_count_start(pmd);
     for (;;) {
         for (i = 0; i < poll_cnt; i++) {
-            dp_netdev_process_rxq_port(pmd, poll_list[i].rx,
-                                       poll_list[i].port_no);
+            process_packets =
+                dp_netdev_process_rxq_port(pmd, poll_list[i].rxq->rx,
+                                           poll_list[i].port_no);
+            cycles_count_intermediate(pmd, poll_list[i].rxq,
+                                      process_packets ? PMD_CYCLES_PROCESSING
+                                                      : PMD_CYCLES_IDLE);
         }
 
         if (lc++ > 1024) {
@@ -3743,7 +4079,7 @@ reload:
             lc = 0;
 
             coverage_try_clear();
-            dp_netdev_pmd_try_optimize(pmd);
+            dp_netdev_pmd_try_optimize(pmd, poll_list, poll_cnt);
             if (!ovsrcu_try_quiesce()) {
                 emc_cache_slow_sweep(&pmd->flow_cache);
             }
@@ -3755,18 +4091,21 @@ reload:
         }
     }
 
+    cycles_count_end(pmd, PMD_CYCLES_IDLE);
+
     poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
     exiting = latch_is_set(&pmd->exit_latch);
     /* Signal here to make sure the pmd finishes
      * reloading the updated configuration. */
     dp_netdev_pmd_reload_done(pmd);
 
-    emc_cache_uninit(&pmd->flow_cache);
+    pmd_free_static_tx_qid(pmd);
 
     if (!exiting) {
         goto reload;
     }
 
+    emc_cache_uninit(&pmd->flow_cache);
     free(poll_list);
     pmd_free_cached_ports(pmd);
     return NULL;
@@ -3799,10 +4138,11 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_,
 {
     struct dp_meter *meter;
     struct dp_meter_band *band;
+    struct dp_packet *packet;
     long long int long_delta_t; /* msec */
     uint32_t delta_t; /* msec */
     int i;
-    int cnt = packets_->count;
+    const size_t cnt = dp_packet_batch_size(packets_);
     uint32_t bytes, volume;
     int exceeded_band[NETDEV_MAX_BURST];
     uint32_t exceeded_rate[NETDEV_MAX_BURST];
@@ -3835,8 +4175,8 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_,
     meter->used = now;
     meter->packet_count += cnt;
     bytes = 0;
-    for (i = 0; i < cnt; i++) {
-        bytes += dp_packet_size(packets_->packets[i]);
+    DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
+        bytes += dp_packet_size(packet);
     }
     meter->byte_count += bytes;
 
@@ -3886,8 +4226,8 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_,
             } else {
                 /* Packet sizes differ, must process one-by-one. */
                 band_exceeded_pkt = cnt;
-                for (i = 0; i < cnt; i++) {
-                    uint32_t bits = dp_packet_size(packets_->packets[i]) * 8;
+                DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
+                    uint32_t bits = dp_packet_size(packet) * 8;
 
                     if (band->bucket >= bits) {
                         band->bucket -= bits;
@@ -3915,10 +4255,8 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_,
     /* Fire the highest rate band exceeded by each packet.
      * Drop packets if needed, by swapping packet to the end that will be
      * ignored. */
-    const size_t size = dp_packet_batch_size(packets_);
-    struct dp_packet *packet;
     size_t j;
-    DP_PACKET_BATCH_REFILL_FOR_EACH (j, size, packet, packets_) {
+    DP_PACKET_BATCH_REFILL_FOR_EACH (j, cnt, packet, packets_) {
         if (exceeded_band[j] >= 0) {
             /* Meter drop packet. */
             band = &meter->bands[exceeded_band[j]];
@@ -3953,10 +4291,19 @@ dpif_netdev_meter_set(struct dpif *dpif, ofproto_meter_id *meter_id,
         !(config->flags & (OFPMF13_KBPS | OFPMF13_PKTPS))) {
         return EBADF; /* Unsupported flags set */
     }
+
     /* Validate bands */
     if (config->n_bands == 0 || config->n_bands > MAX_BANDS) {
         return EINVAL; /* Too many bands */
     }
+
+    /* Validate rates */
+    for (i = 0; i < config->n_bands; i++) {
+        if (config->bands[i].rate == 0) {
+            return EDOM; /* rate must be non-zero */
+        }
+    }
+
     for (i = 0; i < config->n_bands; ++i) {
         switch (config->bands[i].type) {
         case OFPMBT13_DROP:
@@ -4172,8 +4519,6 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     pmd->numa_id = numa_id;
     pmd->need_reload = false;
 
-    *CONST_CAST(int *, &pmd->static_tx_qid) = cmap_count(&dp->poll_threads);
-
     ovs_refcount_init(&pmd->ref_cnt);
     latch_init(&pmd->exit_latch);
     pmd->reload_seq = seq_create();
@@ -4186,6 +4531,7 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     cmap_init(&pmd->flow_table);
     cmap_init(&pmd->classifiers);
     pmd->next_optimization = time_msec() + DPCLS_OPTIMIZATION_INTERVAL;
+    pmd->rxq_next_cycle_store = time_msec() + PMD_RXQ_INTERVAL_LEN;
     hmap_init(&pmd->poll_list);
     hmap_init(&pmd->tx_ports);
     hmap_init(&pmd->tnl_port_cache);
@@ -4194,6 +4540,7 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
      * actual thread created for NON_PMD_CORE_ID. */
     if (core_id == NON_PMD_CORE_ID) {
         emc_cache_init(&pmd->flow_cache);
+        pmd_alloc_static_tx_qid(pmd);
     }
     cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
                 hash_int(core_id, 0));
@@ -4236,6 +4583,7 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
         ovs_mutex_lock(&dp->non_pmd_mutex);
         emc_cache_uninit(&pmd->flow_cache);
         pmd_free_cached_ports(pmd);
+        pmd_free_static_tx_qid(pmd);
         ovs_mutex_unlock(&dp->non_pmd_mutex);
     } else {
         latch_set(&pmd->exit_latch);
@@ -4441,6 +4789,22 @@ dp_netdev_upcall(struct dp_netdev_pmd_thread *pmd, struct dp_packet *packet_,
                          actions, wc, put_actions, dp->upcall_aux);
 }
 
+static inline uint32_t
+dpif_netdev_packet_get_rss_hash_orig_pkt(struct dp_packet *packet,
+                                const struct miniflow *mf)
+{
+    uint32_t hash;
+
+    if (OVS_LIKELY(dp_packet_rss_valid(packet))) {
+        hash = dp_packet_get_rss_hash(packet);
+    } else {
+        hash = miniflow_hash_5tuple(mf, 0);
+        dp_packet_set_rss_hash(packet, hash);
+    }
+
+    return hash;
+}
+
 static inline uint32_t
 dpif_netdev_packet_get_rss_hash(struct dp_packet *packet,
                                 const struct miniflow *mf)
@@ -4535,8 +4899,11 @@ dp_netdev_queue_batches(struct dp_packet *pkt,
  * The function returns the number of packets that needs to be processed in the
  * 'packets' array (they have been moved to the beginning of the vector).
  *
- * If 'md_is_valid' is false, the metadata in 'packets' is not valid and must
- * be initialized by this function using 'port_no'.
+ * For performance reasons a caller may choose not to initialize the metadata
+ * in 'packets_'.  If 'md_is_valid' is false, the metadata in 'packets'
+ * is not valid and must be initialized by this function using 'port_no'.
+ * If 'md_is_valid' is true, the metadata is already valid and 'port_no'
+ * will be ignored.
  */
 static inline size_t
 emc_processing(struct dp_netdev_pmd_thread *pmd,
@@ -4549,13 +4916,13 @@ emc_processing(struct dp_netdev_pmd_thread *pmd,
     struct netdev_flow_key *key = &keys[0];
     size_t n_missed = 0, n_dropped = 0;
     struct dp_packet *packet;
-    const size_t size = dp_packet_batch_size(packets_);
+    const size_t cnt = dp_packet_batch_size(packets_);
     uint32_t cur_min;
     int i;
 
     atomic_read_relaxed(&pmd->dp->emc_insert_min, &cur_min);
 
-    DP_PACKET_BATCH_REFILL_FOR_EACH (i, size, packet, packets_) {
+    DP_PACKET_BATCH_REFILL_FOR_EACH (i, cnt, packet, packets_) {
         struct dp_netdev_flow *flow;
 
         if (OVS_UNLIKELY(dp_packet_size(packet) < ETH_HEADER_LEN)) {
@@ -4564,7 +4931,7 @@ emc_processing(struct dp_netdev_pmd_thread *pmd,
             continue;
         }
 
-        if (i != size - 1) {
+        if (i != cnt - 1) {
             struct dp_packet **packets = packets_->packets;
             /* Prefetch next packet data and metadata. */
             OVS_PREFETCH(dp_packet_data(packets[i+1]));
@@ -4576,10 +4943,18 @@ emc_processing(struct dp_netdev_pmd_thread *pmd,
         }
         miniflow_extract(packet, &key->mf);
         key->len = 0; /* Not computed yet. */
-        key->hash = dpif_netdev_packet_get_rss_hash(packet, &key->mf);
-
-        /* If EMC is disabled skip emc_lookup */
-        flow = (cur_min == 0) ? NULL: emc_lookup(flow_cache, key);
+        /* If EMC is disabled skip hash computation and emc_lookup */
+        if (cur_min) {
+            if (!md_is_valid) {
+                key->hash = dpif_netdev_packet_get_rss_hash_orig_pkt(packet,
+                        &key->mf);
+            } else {
+                key->hash = dpif_netdev_packet_get_rss_hash(packet, &key->mf);
+            }
+            flow = emc_lookup(flow_cache, key);
+        } else {
+            flow = NULL;
+        }
         if (OVS_LIKELY(flow)) {
             dp_netdev_queue_batches(packet, flow, &key->mf, batches,
                                     n_batches);
@@ -4595,7 +4970,7 @@ emc_processing(struct dp_netdev_pmd_thread *pmd,
     }
 
     dp_netdev_count_packet(pmd, DP_STAT_EXACT_HIT,
-                           size - n_dropped - n_missed);
+                           cnt - n_dropped - n_missed);
 
     return dp_packet_batch_size(packets_);
 }
@@ -4676,14 +5051,14 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
                      odp_port_t in_port,
                      long long now)
 {
-    int cnt = packets_->count;
+    const size_t cnt = dp_packet_batch_size(packets_);
 #if !defined(__CHECKER__) && !defined(_WIN32)
     const size_t PKT_ARRAY_SIZE = cnt;
 #else
     /* Sparse or MSVC doesn't like variable length array. */
     enum { PKT_ARRAY_SIZE = NETDEV_MAX_BURST };
 #endif
-    struct dp_packet **packets = packets_->packets;
+    struct dp_packet *packet;
     struct dpcls *cls;
     struct dpcls_rule *rules[PKT_ARRAY_SIZE];
     struct dp_netdev *dp = pmd->dp;
@@ -4711,7 +5086,7 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
         ofpbuf_use_stub(&actions, actions_stub, sizeof actions_stub);
         ofpbuf_use_stub(&put_actions, slow_stub, sizeof slow_stub);
 
-        for (i = 0; i < cnt; i++) {
+        DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
             struct dp_netdev_flow *netdev_flow;
 
             if (OVS_LIKELY(rules[i])) {
@@ -4730,7 +5105,7 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
             }
 
             miss_cnt++;
-            handle_packet_upcall(pmd, packets[i], &keys[i], &actions,
+            handle_packet_upcall(pmd, packet, &keys[i], &actions,
                                  &put_actions, &lost_cnt, now);
         }
 
@@ -4738,17 +5113,16 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
         ofpbuf_uninit(&put_actions);
         fat_rwlock_unlock(&dp->upcall_rwlock);
     } else if (OVS_UNLIKELY(any_miss)) {
-        for (i = 0; i < cnt; i++) {
+        DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
             if (OVS_UNLIKELY(!rules[i])) {
-                dp_packet_delete(packets[i]);
+                dp_packet_delete(packet);
                 lost_cnt++;
                 miss_cnt++;
             }
         }
     }
 
-    for (i = 0; i < cnt; i++) {
-        struct dp_packet *packet = packets[i];
+    DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
         struct dp_netdev_flow *flow;
 
         if (OVS_UNLIKELY(!rules[i])) {
@@ -4769,18 +5143,15 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
 
 /* Packets enter the datapath from a port (or from recirculation) here.
  *
- * For performance reasons a caller may choose not to initialize the metadata
- * in 'packets': in this case 'mdinit' is false and this function needs to
- * initialize it using 'port_no'.  If the metadata in 'packets' is already
- * valid, 'md_is_valid' must be true and 'port_no' will be ignored. */
+ * When 'md_is_valid' is true the metadata in 'packets' are already valid.
+ * When false the metadata in 'packets' need to be initialized. */
 static void
 dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
                   struct dp_packet_batch *packets,
                   bool md_is_valid, odp_port_t port_no)
 {
-    int cnt = packets->count;
 #if !defined(__CHECKER__) && !defined(_WIN32)
-    const size_t PKT_ARRAY_SIZE = cnt;
+    const size_t PKT_ARRAY_SIZE = dp_packet_batch_size(packets);
 #else
     /* Sparse or MSVC doesn't like variable length array. */
     enum { PKT_ARRAY_SIZE = NETDEV_MAX_BURST };
@@ -4956,7 +5327,7 @@ push_tnl_action(const struct dp_netdev_pmd_thread *pmd,
 
     data = nl_attr_get(attr);
 
-    tun_port = pmd_tnl_port_cache_lookup(pmd, u32_to_odp(data->tnl_port));
+    tun_port = pmd_tnl_port_cache_lookup(pmd, data->tnl_port);
     if (!tun_port) {
         err = -EINVAL;
         goto error;
@@ -5029,24 +5400,8 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
 
     case OVS_ACTION_ATTR_TUNNEL_PUSH:
         if (*depth < MAX_RECIRC_DEPTH) {
-            struct dp_packet_batch tnl_pkt;
-            struct dp_packet_batch *orig_packets_ = packets_;
-            int err;
-
-            if (!may_steal) {
-                dp_packet_batch_clone(&tnl_pkt, packets_);
-                packets_ = &tnl_pkt;
-                dp_packet_batch_reset_cutlen(orig_packets_);
-            }
-
             dp_packet_batch_apply_cutlen(packets_);
-
-            err = push_tnl_action(pmd, a, packets_);
-            if (!err) {
-                (*depth)++;
-                dp_netdev_recirculate(pmd, packets_);
-                (*depth)--;
-            }
+            push_tnl_action(pmd, a, packets_);
             return;
         }
         break;
@@ -5275,8 +5630,8 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
         }
 
         conntrack_execute(&dp->conntrack, packets_, aux->flow->dl_type, force,
-                          commit, zone, setmark, setlabel, helper,
-                          nat_action_info_ref);
+                          commit, zone, setmark, setlabel, aux->flow->tp_src,
+                          aux->flow->tp_dst, helper, nat_action_info_ref, now);
         break;
     }
 
@@ -5298,6 +5653,8 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
     case OVS_ACTION_ATTR_PUSH_ETH:
     case OVS_ACTION_ATTR_POP_ETH:
     case OVS_ACTION_ATTR_CLONE:
+    case OVS_ACTION_ATTR_ENCAP_NSH:
+    case OVS_ACTION_ATTR_DECAP_NSH:
     case __OVS_ACTION_ATTR_MAX:
         OVS_NOT_REACHED();
     }
@@ -5327,7 +5684,7 @@ struct dp_netdev_ct_dump {
 
 static int
 dpif_netdev_ct_dump_start(struct dpif *dpif, struct ct_dpif_dump_state **dump_,
-                          const uint16_t *pzone)
+                          const uint16_t *pzone, int *ptot_bkts)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_ct_dump *dump;
@@ -5336,7 +5693,7 @@ dpif_netdev_ct_dump_start(struct dpif *dpif, struct ct_dpif_dump_state **dump_,
     dump->dp = dp;
     dump->ct = &dp->conntrack;
 
-    conntrack_dump_start(&dp->conntrack, &dump->dump, pzone);
+    conntrack_dump_start(&dp->conntrack, &dump->dump, pzone, ptot_bkts);
 
     *dump_ = &dump->up;
 
@@ -5372,10 +5729,14 @@ dpif_netdev_ct_dump_done(struct dpif *dpif OVS_UNUSED,
 }
 
 static int
-dpif_netdev_ct_flush(struct dpif *dpif, const uint16_t *zone)
+dpif_netdev_ct_flush(struct dpif *dpif, const uint16_t *zone,
+                     const struct ct_dpif_tuple *tuple)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
 
+    if (tuple) {
+        return EOPNOTSUPP;
+    }
     return conntrack_flush(&dp->conntrack, zone);
 }
 
@@ -5635,11 +5996,25 @@ dpcls_sort_subtable_vector(struct dpcls *cls)
 }
 
 static inline void
-dp_netdev_pmd_try_optimize(struct dp_netdev_pmd_thread *pmd)
+dp_netdev_pmd_try_optimize(struct dp_netdev_pmd_thread *pmd,
+                           struct polled_queue *poll_list, int poll_cnt)
 {
     struct dpcls *cls;
     long long int now = time_msec();
 
+    if (now > pmd->rxq_next_cycle_store) {
+        /* Get the cycles that were used to process each queue and store. */
+        for (unsigned i = 0; i < poll_cnt; i++) {
+            uint64_t rxq_cyc_curr = dp_netdev_rxq_get_cycles(poll_list[i].rxq,
+                                                        RXQ_CYCLES_PROC_CURR);
+            dp_netdev_rxq_set_intrvl_cycles(poll_list[i].rxq, rxq_cyc_curr);
+            dp_netdev_rxq_set_cycles(poll_list[i].rxq, RXQ_CYCLES_PROC_CURR,
+                                     0);
+        }
+        /* Start new measuring interval */
+        pmd->rxq_next_cycle_store = now + PMD_RXQ_INTERVAL_LEN;
+    }
+
     if (now > pmd->next_optimization) {
         /* Try to obtain the flow lock to block out revalidator threads.
          * If not possible, just try next time. */