]> git.proxmox.com Git - mirror_ovs.git/blobdiff - lib/dpif-netdev.c
sparse: Add guards to prevent FreeBSD-incompatible #include order.
[mirror_ovs.git] / lib / dpif-netdev.c
index 6cc6e141cf738e238e1a1797f54d46bfff527912..6ba025b6697e8a2e1ab64ad65d9a3573b572ba88 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016 Nicira, Inc.
+ * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016, 2017 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
 #include <fcntl.h>
 #include <inttypes.h>
 #include <net/if.h>
+#include <sys/types.h>
 #include <netinet/in.h>
 #include <stdint.h>
 #include <stdlib.h>
@@ -48,6 +49,7 @@
 #include "fat-rwlock.h"
 #include "flow.h"
 #include "hmapx.h"
+#include "id-pool.h"
 #include "latch.h"
 #include "netdev.h"
 #include "netdev-vport.h"
@@ -65,7 +67,7 @@
 #include "ovs-numa.h"
 #include "ovs-rcu.h"
 #include "packets.h"
-#include "poll-loop.h"
+#include "openvswitch/poll-loop.h"
 #include "pvector.h"
 #include "random.h"
 #include "seq.h"
@@ -81,11 +83,14 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
 
 #define FLOW_DUMP_MAX_BATCH 50
 /* Use per thread recirc_depth to prevent recirculation loop. */
-#define MAX_RECIRC_DEPTH 5
+#define MAX_RECIRC_DEPTH 6
 DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
 
 /* Configuration parameters. */
 enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
+enum { MAX_METERS = 65536 };    /* Maximum number of meters. */
+enum { MAX_BANDS = 8 };         /* Maximum number of bands / meter. */
+enum { N_METER_LOCKS = 64 };    /* Maximum number of meters. */
 
 /* Protects against changes to 'dp_netdevs'. */
 static struct ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER;
@@ -97,16 +102,21 @@ static struct shash dp_netdevs OVS_GUARDED_BY(dp_netdev_mutex)
 static struct vlog_rate_limit upcall_rl = VLOG_RATE_LIMIT_INIT(600, 600);
 
 #define DP_NETDEV_CS_SUPPORTED_MASK (CS_NEW | CS_ESTABLISHED | CS_RELATED \
-                                     | CS_INVALID | CS_REPLY_DIR | CS_TRACKED)
+                                     | CS_INVALID | CS_REPLY_DIR | CS_TRACKED \
+                                     | CS_SRC_NAT | CS_DST_NAT)
 #define DP_NETDEV_CS_UNSUPPORTED_MASK (~(uint32_t)DP_NETDEV_CS_SUPPORTED_MASK)
 
 static struct odp_support dp_netdev_support = {
+    .max_vlan_headers = SIZE_MAX,
     .max_mpls_depth = SIZE_MAX,
     .recirc = true,
     .ct_state = true,
     .ct_zone = true,
     .ct_mark = true,
     .ct_label = true,
+    .ct_state_nat = true,
+    .ct_orig_tuple = true,
+    .ct_orig_tuple6 = true,
 };
 
 /* Stores a miniflow with inline values */
@@ -144,6 +154,11 @@ struct netdev_flow_key {
 #define EM_FLOW_HASH_MASK (EM_FLOW_HASH_ENTRIES - 1)
 #define EM_FLOW_HASH_SEGS 2
 
+/* Default EMC insert probability is 1 / DEFAULT_EM_FLOW_INSERT_INV_PROB */
+#define DEFAULT_EM_FLOW_INSERT_INV_PROB 100
+#define DEFAULT_EM_FLOW_INSERT_MIN (UINT32_MAX /                     \
+                                    DEFAULT_EM_FLOW_INSERT_INV_PROB)
+
 struct emc_entry {
     struct dp_netdev_flow *flow;
     struct netdev_flow_key key;   /* key.hash used for emc hash value. */
@@ -167,6 +182,14 @@ struct emc_cache {
 /* Time in ms between successive optimizations of the dpcls subtable vector */
 #define DPCLS_OPTIMIZATION_INTERVAL 1000
 
+/* Time in ms of the interval in which rxq processing cycles used in
+ * rxq to pmd assignments is measured and stored. */
+#define PMD_RXQ_INTERVAL_LEN 10000
+
+/* Number of intervals for which cycles are stored
+ * and used during rxq to pmd assignment. */
+#define PMD_RXQ_INTERVAL_MAX 6
+
 struct dpcls {
     struct cmap_node node;      /* Within dp_netdev_pmd_thread.classifiers */
     odp_port_t in_port;
@@ -193,6 +216,31 @@ static bool dpcls_lookup(struct dpcls *cls,
                          struct dpcls_rule **rules, size_t cnt,
                          int *num_lookups_p);
 \f
+/* Set of supported meter flags */
+#define DP_SUPPORTED_METER_FLAGS_MASK \
+    (OFPMF13_STATS | OFPMF13_PKTPS | OFPMF13_KBPS | OFPMF13_BURST)
+
+/* Set of supported meter band types */
+#define DP_SUPPORTED_METER_BAND_TYPES           \
+    ( 1 << OFPMBT13_DROP )
+
+struct dp_meter_band {
+    struct ofputil_meter_band up; /* type, prec_level, pad, rate, burst_size */
+    uint32_t bucket; /* In 1/1000 packets (for PKTPS), or in bits (for KBPS) */
+    uint64_t packet_count;
+    uint64_t byte_count;
+};
+
+struct dp_meter {
+    uint16_t flags;
+    uint16_t n_bands;
+    uint32_t max_delta_t;
+    uint64_t used;
+    uint64_t packet_count;
+    uint64_t byte_count;
+    struct dp_meter_band bands[];
+};
+
 /* Datapath based on the network device interface from netdev.h.
  *
  *
@@ -223,6 +271,13 @@ struct dp_netdev {
     struct hmap ports;
     struct seq *port_seq;       /* Incremented whenever a port changes. */
 
+    /* Meters. */
+    struct ovs_mutex meter_locks[N_METER_LOCKS];
+    struct dp_meter *meters[MAX_METERS]; /* Meter bands. */
+
+    /* Probability of EMC insertions is a factor of 'emc_insert_min'.*/
+    OVS_ALIGNED_VAR(CACHE_LINE_SIZE) atomic_uint32_t emc_insert_min;
+
     /* Protects access to ofproto-dpif-upcall interface during revalidator
      * thread synchronization. */
     struct fat_rwlock upcall_rwlock;
@@ -236,6 +291,9 @@ struct dp_netdev {
 
     /* Stores all 'struct dp_netdev_pmd_thread's. */
     struct cmap poll_threads;
+    /* id pool for per thread static_tx_qid. */
+    struct id_pool *tx_qid_pool;
+    struct ovs_mutex tx_qid_pool_mutex;
 
     /* Protects the access of the 'struct dp_netdev_pmd_thread'
      * instance for non-pmd thread. */
@@ -256,6 +314,19 @@ struct dp_netdev {
     struct conntrack conntrack;
 };
 
+static void meter_lock(const struct dp_netdev *dp, uint32_t meter_id)
+    OVS_ACQUIRES(dp->meter_locks[meter_id % N_METER_LOCKS])
+{
+    ovs_mutex_lock(&dp->meter_locks[meter_id % N_METER_LOCKS]);
+}
+
+static void meter_unlock(const struct dp_netdev *dp, uint32_t meter_id)
+    OVS_RELEASES(dp->meter_locks[meter_id % N_METER_LOCKS])
+{
+    ovs_mutex_unlock(&dp->meter_locks[meter_id % N_METER_LOCKS]);
+}
+
+
 static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
                                                     odp_port_t)
     OVS_REQUIRES(dp->port_mutex);
@@ -271,29 +342,52 @@ enum dp_stat_type {
 };
 
 enum pmd_cycles_counter_type {
-    PMD_CYCLES_POLLING,         /* Cycles spent polling NICs. */
-    PMD_CYCLES_PROCESSING,      /* Cycles spent processing packets */
+    PMD_CYCLES_IDLE,            /* Cycles spent idle or unsuccessful polling */
+    PMD_CYCLES_PROCESSING,      /* Cycles spent successfully polling and
+                                 * processing polled packets */
     PMD_N_CYCLES
 };
 
+enum rxq_cycles_counter_type {
+    RXQ_CYCLES_PROC_CURR,       /* Cycles spent successfully polling and
+                                   processing packets during the current
+                                   interval. */
+    RXQ_CYCLES_PROC_HIST,       /* Total cycles of all intervals that are used
+                                   during rxq to pmd assignment. */
+    RXQ_N_CYCLES
+};
+
 #define XPS_TIMEOUT_MS 500LL
 
 /* Contained by struct dp_netdev_port's 'rxqs' member.  */
 struct dp_netdev_rxq {
-    struct netdev_rxq *rxq;
-    unsigned core_id;           /* Сore to which this queue is pinned. */
+    struct dp_netdev_port *port;
+    struct netdev_rxq *rx;
+    unsigned core_id;                  /* Core to which this queue should be
+                                          pinned. OVS_CORE_UNSPEC if the
+                                          queue doesn't need to be pinned to a
+                                          particular core. */
+    unsigned intrvl_idx;               /* Write index for 'cycles_intrvl'. */
+    struct dp_netdev_pmd_thread *pmd;  /* pmd thread that polls this queue. */
+
+    /* Counters of cycles spent successfully polling and processing pkts. */
+    atomic_ullong cycles[RXQ_N_CYCLES];
+    /* We store PMD_RXQ_INTERVAL_MAX intervals of data for an rxq and then
+       sum them to yield the cycles used for an rxq. */
+    atomic_ullong cycles_intrvl[PMD_RXQ_INTERVAL_MAX];
 };
 
 /* A port in a netdev-based datapath. */
 struct dp_netdev_port {
     odp_port_t port_no;
+    bool dynamic_txqs;          /* If true XPS will be used. */
+    bool need_reconfigure;      /* True if we should reconfigure netdev. */
     struct netdev *netdev;
     struct hmap_node node;      /* Node in dp_netdev's 'ports'. */
     struct netdev_saved_flags *sf;
     struct dp_netdev_rxq *rxqs;
-    unsigned n_rxq;             /* Number of elements in 'rxq' */
-    bool dynamic_txqs;          /* If true XPS will be used. */
-    unsigned *txq_used;         /* Number of threads that uses each tx queue. */
+    unsigned n_rxq;             /* Number of elements in 'rxqs' */
+    unsigned *txq_used;         /* Number of threads that use each tx queue. */
     struct ovs_mutex txq_used_mutex;
     char *type;                 /* Port type as requested by user. */
     char *rxq_affinity_list;    /* Requested affinity of rx queues. */
@@ -381,7 +475,7 @@ struct dp_netdev_flow {
 static void dp_netdev_flow_unref(struct dp_netdev_flow *);
 static bool dp_netdev_flow_ref(struct dp_netdev_flow *);
 static int dpif_netdev_flow_from_nlattrs(const struct nlattr *, uint32_t,
-                                         struct flow *);
+                                         struct flow *, bool);
 
 /* A set of datapath actions within a "struct dp_netdev_flow".
  *
@@ -415,11 +509,15 @@ struct dp_netdev_pmd_cycles {
     atomic_ullong n[PMD_N_CYCLES];
 };
 
+struct polled_queue {
+    struct dp_netdev_rxq *rxq;
+    odp_port_t port_no;
+};
+
 /* Contained by struct dp_netdev_pmd_thread's 'poll_list' member. */
 struct rxq_poll {
-    struct dp_netdev_port *port;
-    struct netdev_rxq *rx;
-    struct ovs_list node;
+    struct dp_netdev_rxq *rxq;
+    struct hmap_node node;
 };
 
 /* Contained by struct dp_netdev_pmd_thread's 'send_port_cache',
@@ -443,9 +541,11 @@ struct tx_port {
  * I/O of all non-pmd threads.  There will be no actual thread created
  * for the instance.
  *
- * Each struct has its own flow table and classifier.  Packets received
- * from managed ports are looked up in the corresponding pmd thread's
- * flow table, and are executed with the found actions.
+ * Each struct has its own flow cache and classifier per managed ingress port.
+ * For packets received on ingress port, a look up is done on corresponding PMD
+ * thread's flow cache and in case of a miss, lookup is performed in the
+ * corresponding classifier of port.  Packets are executed with the found
+ * actions in either case.
  * */
 struct dp_netdev_pmd_thread {
     struct dp_netdev *dp;
@@ -474,6 +574,9 @@ struct dp_netdev_pmd_thread {
     struct cmap classifiers;
     /* Periodically sort subtable vectors according to hit frequencies */
     long long int next_optimization;
+    /* End of the next time interval for which processing cycles
+       are stored for each polled rxq. */
+    long long int rxq_next_cycle_store;
 
     /* Statistics. */
     struct dp_netdev_pmd_stats stats;
@@ -495,14 +598,12 @@ struct dp_netdev_pmd_thread {
 
     /* Queue id used by this pmd thread to send packets on all netdevs if
      * XPS disabled for this netdev. All static_tx_qid's are unique and less
-     * than 'ovs_numa_get_n_cores() + 1'. */
-    atomic_int static_tx_qid;
+     * than 'cmap_count(dp->poll_threads)'. */
+    uint32_t static_tx_qid;
 
     struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'. */
     /* List of rx queues to poll. */
-    struct ovs_list poll_list OVS_GUARDED;
-    /* Number of elements in 'poll_list' */
-    int poll_cnt;
+    struct hmap poll_list OVS_GUARDED;
     /* Map of 'tx_port's used for transmission.  Written by the main thread,
      * read by the pmd thread. */
     struct hmap tx_ports OVS_GUARDED;
@@ -526,6 +627,9 @@ struct dp_netdev_pmd_thread {
      * reporting to the user */
     unsigned long long stats_zero[DP_N_STATS];
     uint64_t cycles_zero[PMD_N_CYCLES];
+
+    /* Set to true if the pmd thread needs to be reloaded. */
+    bool need_reload;
 };
 
 /* Interface to netdev-based datapath. */
@@ -570,29 +674,28 @@ static void dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_set_nonpmd(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex);
 
+static void *pmd_thread_main(void *);
 static struct dp_netdev_pmd_thread *dp_netdev_get_pmd(struct dp_netdev *dp,
                                                       unsigned core_id);
 static struct dp_netdev_pmd_thread *
 dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
-static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
-static void dp_netdev_stop_pmds(struct dp_netdev *dp);
-static void dp_netdev_start_pmds(struct dp_netdev *dp)
-    OVS_REQUIRES(dp->port_mutex);
+static void dp_netdev_del_pmd(struct dp_netdev *dp,
+                              struct dp_netdev_pmd_thread *pmd);
+static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp, bool non_pmd);
 static void dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd);
-static void dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
-                                             struct dp_netdev_port *port);
-static void dp_netdev_add_port_to_pmds(struct dp_netdev *dp,
-                                       struct dp_netdev_port *port);
 static void dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
-                                         struct dp_netdev_port *port);
+                                         struct dp_netdev_port *port)
+    OVS_REQUIRES(pmd->port_mutex);
+static void dp_netdev_del_port_tx_from_pmd(struct dp_netdev_pmd_thread *pmd,
+                                           struct tx_port *tx)
+    OVS_REQUIRES(pmd->port_mutex);
 static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
-                                     struct dp_netdev_port *port,
-                                     struct netdev_rxq *rx);
-static struct dp_netdev_pmd_thread *
-dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id);
-static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
-    OVS_REQUIRES(dp->port_mutex);
-static void reconfigure_pmd_threads(struct dp_netdev *dp)
+                                     struct dp_netdev_rxq *rxq)
+    OVS_REQUIRES(pmd->port_mutex);
+static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
+                                       struct rxq_poll *poll)
+    OVS_REQUIRES(pmd->port_mutex);
+static void reconfigure_datapath(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex);
 static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd);
@@ -600,8 +703,20 @@ static void dp_netdev_pmd_flow_flush(struct dp_netdev_pmd_thread *pmd);
 static void pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
     OVS_REQUIRES(pmd->port_mutex);
 static inline void
-dp_netdev_pmd_try_optimize(struct dp_netdev_pmd_thread *pmd);
-
+dp_netdev_pmd_try_optimize(struct dp_netdev_pmd_thread *pmd,
+                           struct polled_queue *poll_list, int poll_cnt);
+static void
+dp_netdev_rxq_set_cycles(struct dp_netdev_rxq *rx,
+                         enum rxq_cycles_counter_type type,
+                         unsigned long long cycles);
+static uint64_t
+dp_netdev_rxq_get_cycles(struct dp_netdev_rxq *rx,
+                         enum rxq_cycles_counter_type type);
+static void
+dp_netdev_rxq_set_intrvl_cycles(struct dp_netdev_rxq *rx,
+                           unsigned long long cycles);
+static uint64_t
+dp_netdev_rxq_get_intrvl_cycles(struct dp_netdev_rxq *rx, unsigned idx);
 static void
 dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
                                long long now, bool purge);
@@ -611,6 +726,8 @@ static int dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
 static inline bool emc_entry_alive(struct emc_entry *ce);
 static void emc_clear_entry(struct emc_entry *ce);
 
+static void dp_netdev_request_reconfigure(struct dp_netdev *dp);
+
 static void
 emc_cache_init(struct emc_cache *flow_cache)
 {
@@ -680,7 +797,7 @@ pmd_info_show_stats(struct ds *reply,
                     unsigned long long stats[DP_N_STATS],
                     uint64_t cycles[PMD_N_CYCLES])
 {
-    unsigned long long total_packets = 0;
+    unsigned long long total_packets;
     uint64_t total_cycles = 0;
     int i;
 
@@ -696,13 +813,12 @@ pmd_info_show_stats(struct ds *reply,
         } else {
             stats[i] = 0;
         }
-
-        if (i != DP_STAT_LOST) {
-            /* Lost packets are already included in DP_STAT_MISS */
-            total_packets += stats[i];
-        }
     }
 
+    /* Sum of all the matched and not matched packets gives the total.  */
+    total_packets = stats[DP_STAT_EXACT_HIT] + stats[DP_STAT_MASKED_HIT]
+                    + stats[DP_STAT_MISS];
+
     for (i = 0; i < PMD_N_CYCLES; i++) {
         if (cycles[i] > pmd->cycles_zero[i]) {
            cycles[i] -= pmd->cycles_zero[i];
@@ -739,10 +855,10 @@ pmd_info_show_stats(struct ds *reply,
     }
 
     ds_put_format(reply,
-                  "\tpolling cycles:%"PRIu64" (%.02f%%)\n"
+                  "\tidle cycles:%"PRIu64" (%.02f%%)\n"
                   "\tprocessing cycles:%"PRIu64" (%.02f%%)\n",
-                  cycles[PMD_CYCLES_POLLING],
-                  cycles[PMD_CYCLES_POLLING] / (double)total_cycles * 100,
+                  cycles[PMD_CYCLES_IDLE],
+                  cycles[PMD_CYCLES_IDLE] / (double)total_cycles * 100,
                   cycles[PMD_CYCLES_PROCESSING],
                   cycles[PMD_CYCLES_PROCESSING] / (double)total_cycles * 100);
 
@@ -783,12 +899,55 @@ pmd_info_clear_stats(struct ds *reply OVS_UNUSED,
     }
 }
 
+static int
+compare_poll_list(const void *a_, const void *b_)
+{
+    const struct rxq_poll *a = a_;
+    const struct rxq_poll *b = b_;
+
+    const char *namea = netdev_rxq_get_name(a->rxq->rx);
+    const char *nameb = netdev_rxq_get_name(b->rxq->rx);
+
+    int cmp = strcmp(namea, nameb);
+    if (!cmp) {
+        return netdev_rxq_get_queue_id(a->rxq->rx)
+               - netdev_rxq_get_queue_id(b->rxq->rx);
+    } else {
+        return cmp;
+    }
+}
+
+static void
+sorted_poll_list(struct dp_netdev_pmd_thread *pmd, struct rxq_poll **list,
+                 size_t *n)
+{
+    struct rxq_poll *ret, *poll;
+    size_t i;
+
+    *n = hmap_count(&pmd->poll_list);
+    if (!*n) {
+        ret = NULL;
+    } else {
+        ret = xcalloc(*n, sizeof *ret);
+        i = 0;
+        HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
+            ret[i] = *poll;
+            i++;
+        }
+        ovs_assert(i == *n);
+        qsort(ret, *n, sizeof *ret, compare_poll_list);
+    }
+
+    *list = ret;
+}
+
 static void
 pmd_info_show_rxq(struct ds *reply, struct dp_netdev_pmd_thread *pmd)
 {
     if (pmd->core_id != NON_PMD_CORE_ID) {
-        struct rxq_poll *poll;
         const char *prev_name = NULL;
+        struct rxq_poll *list;
+        size_t i, n;
 
         ds_put_format(reply,
                       "pmd thread numa_id %d core_id %u:\n\tisolated : %s\n",
@@ -796,22 +955,98 @@ pmd_info_show_rxq(struct ds *reply, struct dp_netdev_pmd_thread *pmd)
                                                   ? "true" : "false");
 
         ovs_mutex_lock(&pmd->port_mutex);
-        LIST_FOR_EACH (poll, node, &pmd->poll_list) {
-            const char *name = netdev_get_name(poll->port->netdev);
+        sorted_poll_list(pmd, &list, &n);
+        for (i = 0; i < n; i++) {
+            const char *name = netdev_rxq_get_name(list[i].rxq->rx);
 
             if (!prev_name || strcmp(name, prev_name)) {
                 if (prev_name) {
                     ds_put_cstr(reply, "\n");
                 }
-                ds_put_format(reply, "\tport: %s\tqueue-id:",
-                              netdev_get_name(poll->port->netdev));
+                ds_put_format(reply, "\tport: %s\tqueue-id:", name);
             }
-            ds_put_format(reply, " %d", netdev_rxq_get_queue_id(poll->rx));
+            ds_put_format(reply, " %d",
+                          netdev_rxq_get_queue_id(list[i].rxq->rx));
             prev_name = name;
         }
         ovs_mutex_unlock(&pmd->port_mutex);
         ds_put_cstr(reply, "\n");
+        free(list);
+    }
+}
+
+static int
+compare_poll_thread_list(const void *a_, const void *b_)
+{
+    const struct dp_netdev_pmd_thread *a, *b;
+
+    a = *(struct dp_netdev_pmd_thread **)a_;
+    b = *(struct dp_netdev_pmd_thread **)b_;
+
+    if (a->core_id < b->core_id) {
+        return -1;
+    }
+    if (a->core_id > b->core_id) {
+        return 1;
+    }
+    return 0;
+}
+
+/* Create a sorted list of pmd's from the dp->poll_threads cmap. We can use
+ * this list, as long as we do not go to quiescent state. */
+static void
+sorted_poll_thread_list(struct dp_netdev *dp,
+                        struct dp_netdev_pmd_thread ***list,
+                        size_t *n)
+{
+    struct dp_netdev_pmd_thread *pmd;
+    struct dp_netdev_pmd_thread **pmd_list;
+    size_t k = 0, n_pmds;
+
+    n_pmds = cmap_count(&dp->poll_threads);
+    pmd_list = xcalloc(n_pmds, sizeof *pmd_list);
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (k >= n_pmds) {
+            break;
+        }
+        pmd_list[k++] = pmd;
+    }
+
+    qsort(pmd_list, k, sizeof *pmd_list, compare_poll_thread_list);
+
+    *list = pmd_list;
+    *n = k;
+}
+
+static void
+dpif_netdev_pmd_rebalance(struct unixctl_conn *conn, int argc,
+                          const char *argv[], void *aux OVS_UNUSED)
+{
+    struct ds reply = DS_EMPTY_INITIALIZER;
+    struct dp_netdev *dp = NULL;
+
+    ovs_mutex_lock(&dp_netdev_mutex);
+
+    if (argc == 2) {
+        dp = shash_find_data(&dp_netdevs, argv[1]);
+    } else if (shash_count(&dp_netdevs) == 1) {
+        /* There's only one datapath */
+        dp = shash_first(&dp_netdevs)->data;
+    }
+
+    if (!dp) {
+        ovs_mutex_unlock(&dp_netdev_mutex);
+        unixctl_command_reply_error(conn,
+                                    "please specify an existing datapath");
+        return;
     }
+
+    dp_netdev_request_reconfigure(dp);
+    ovs_mutex_unlock(&dp_netdev_mutex);
+    ds_put_cstr(&reply, "pmd rxq rebalance requested.\n");
+    unixctl_command_reply(conn, ds_cstr(&reply));
+    ds_destroy(&reply);
 }
 
 static void
@@ -819,8 +1054,9 @@ dpif_netdev_pmd_info(struct unixctl_conn *conn, int argc, const char *argv[],
                      void *aux)
 {
     struct ds reply = DS_EMPTY_INITIALIZER;
-    struct dp_netdev_pmd_thread *pmd;
+    struct dp_netdev_pmd_thread **pmd_list;
     struct dp_netdev *dp = NULL;
+    size_t n;
     enum pmd_info_type type = *(enum pmd_info_type *) aux;
 
     ovs_mutex_lock(&dp_netdev_mutex);
@@ -839,20 +1075,25 @@ dpif_netdev_pmd_info(struct unixctl_conn *conn, int argc, const char *argv[],
         return;
     }
 
-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+    sorted_poll_thread_list(dp, &pmd_list, &n);
+    for (size_t i = 0; i < n; i++) {
+        struct dp_netdev_pmd_thread *pmd = pmd_list[i];
+        if (!pmd) {
+            break;
+        }
+
         if (type == PMD_INFO_SHOW_RXQ) {
             pmd_info_show_rxq(&reply, pmd);
         } else {
             unsigned long long stats[DP_N_STATS];
             uint64_t cycles[PMD_N_CYCLES];
-            int i;
 
             /* Read current stats and cycle counters */
-            for (i = 0; i < ARRAY_SIZE(stats); i++) {
-                atomic_read_relaxed(&pmd->stats.n[i], &stats[i]);
+            for (size_t j = 0; j < ARRAY_SIZE(stats); j++) {
+                atomic_read_relaxed(&pmd->stats.n[j], &stats[j]);
             }
-            for (i = 0; i < ARRAY_SIZE(cycles); i++) {
-                atomic_read_relaxed(&pmd->cycles.n[i], &cycles[i]);
+            for (size_t j = 0; j < ARRAY_SIZE(cycles); j++) {
+                atomic_read_relaxed(&pmd->cycles.n[j], &cycles[j]);
             }
 
             if (type == PMD_INFO_CLEAR_STATS) {
@@ -862,6 +1103,7 @@ dpif_netdev_pmd_info(struct unixctl_conn *conn, int argc, const char *argv[],
             }
         }
     }
+    free(pmd_list);
 
     ovs_mutex_unlock(&dp_netdev_mutex);
 
@@ -885,6 +1127,9 @@ dpif_netdev_init(void)
     unixctl_command_register("dpif-netdev/pmd-rxq-show", "[dp]",
                              0, 1, dpif_netdev_pmd_info,
                              (void *)&poll_aux);
+    unixctl_command_register("dpif-netdev/pmd-rxq-rebalance", "[dp]",
+                             0, 1, dpif_netdev_pmd_rebalance,
+                             NULL);
     return 0;
 }
 
@@ -1005,6 +1250,10 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     dp->reconfigure_seq = seq_create();
     dp->last_reconfigure_seq = seq_read(dp->reconfigure_seq);
 
+    for (int i = 0; i < N_METER_LOCKS; ++i) {
+        ovs_mutex_init_adaptive(&dp->meter_locks[i]);
+    }
+
     /* Disable upcalls by default. */
     dp_netdev_disable_upcall(dp);
     dp->upcall_aux = NULL;
@@ -1012,11 +1261,20 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
 
     conntrack_init(&dp->conntrack);
 
+    atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
+
     cmap_init(&dp->poll_threads);
+
+    ovs_mutex_init(&dp->tx_qid_pool_mutex);
+    /* We need 1 Tx queue for each possible core + 1 for non-PMD threads. */
+    dp->tx_qid_pool = id_pool_create(0, ovs_numa_get_n_cores() + 1);
+
     ovs_mutex_init_recursive(&dp->non_pmd_mutex);
     ovsthread_key_create(&dp->per_pmd_key, NULL);
 
     ovs_mutex_lock(&dp->port_mutex);
+    /* non-PMD will be created before all other threads and will
+     * allocate static_tx_qid = 0. */
     dp_netdev_set_nonpmd(dp);
 
     error = do_add_port(dp, name, dpif_netdev_port_open_type(dp->class,
@@ -1082,6 +1340,16 @@ dp_netdev_destroy_upcall_lock(struct dp_netdev *dp)
     fat_rwlock_destroy(&dp->upcall_rwlock);
 }
 
+static void
+dp_delete_meter(struct dp_netdev *dp, uint32_t meter_id)
+    OVS_REQUIRES(dp->meter_locks[meter_id % N_METER_LOCKS])
+{
+    if (dp->meters[meter_id]) {
+        free(dp->meters[meter_id]);
+        dp->meters[meter_id] = NULL;
+    }
+}
+
 /* Requires dp_netdev_mutex so that we can't get a new reference to 'dp'
  * through the 'dp_netdevs' shash while freeing 'dp'. */
 static void
@@ -1097,9 +1365,13 @@ dp_netdev_free(struct dp_netdev *dp)
         do_del_port(dp, port);
     }
     ovs_mutex_unlock(&dp->port_mutex);
-    dp_netdev_destroy_all_pmds(dp);
+
+    dp_netdev_destroy_all_pmds(dp, true);
     cmap_destroy(&dp->poll_threads);
 
+    ovs_mutex_destroy(&dp->tx_qid_pool_mutex);
+    id_pool_destroy(dp->tx_qid_pool);
+
     ovs_mutex_destroy(&dp->non_pmd_mutex);
     ovsthread_key_delete(dp->per_pmd_key);
 
@@ -1115,6 +1387,17 @@ dp_netdev_free(struct dp_netdev *dp)
     /* Upcalls must be disabled at this point */
     dp_netdev_destroy_upcall_lock(dp);
 
+    int i;
+
+    for (i = 0; i < MAX_METERS; ++i) {
+        meter_lock(dp, i);
+        dp_delete_meter(dp, i);
+        meter_unlock(dp, i);
+    }
+    for (i = 0; i < N_METER_LOCKS; ++i) {
+        ovs_mutex_destroy(&dp->meter_locks[i]);
+    }
+
     free(dp->pmd_cmask);
     free(CONST_CAST(char *, dp->name));
     free(dp);
@@ -1232,10 +1515,7 @@ port_create(const char *devname, const char *type,
     struct dp_netdev_port *port;
     enum netdev_flags flags;
     struct netdev *netdev;
-    int n_open_rxqs = 0;
-    int n_cores = 0;
-    int i, error;
-    bool dynamic_txqs = false;
+    int error;
 
     *portp = NULL;
 
@@ -1253,78 +1533,24 @@ port_create(const char *devname, const char *type,
         goto out;
     }
 
-    if (netdev_is_pmd(netdev)) {
-        n_cores = ovs_numa_get_n_cores();
-
-        if (n_cores == OVS_CORE_UNSPEC) {
-            VLOG_ERR("%s, cannot get cpu core info", devname);
-            error = ENOENT;
-            goto out;
-        }
-        /* There can only be ovs_numa_get_n_cores() pmd threads,
-         * so creates a txq for each, and one extra for the non
-         * pmd threads. */
-        error = netdev_set_tx_multiq(netdev, n_cores + 1);
-        if (error && (error != EOPNOTSUPP)) {
-            VLOG_ERR("%s, cannot set multiq", devname);
-            goto out;
-        }
-    }
-
-    if (netdev_is_reconf_required(netdev)) {
-        error = netdev_reconfigure(netdev);
-        if (error) {
-            goto out;
-        }
-    }
-
-    if (netdev_is_pmd(netdev)) {
-        if (netdev_n_txq(netdev) < n_cores + 1) {
-            dynamic_txqs = true;
-        }
+    error = netdev_turn_flags_on(netdev, NETDEV_PROMISC, &sf);
+    if (error) {
+        VLOG_ERR("%s: cannot set promisc flag", devname);
+        goto out;
     }
 
     port = xzalloc(sizeof *port);
     port->port_no = port_no;
     port->netdev = netdev;
-    port->n_rxq = netdev_n_rxq(netdev);
-    port->rxqs = xcalloc(port->n_rxq, sizeof *port->rxqs);
-    port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
     port->type = xstrdup(type);
-    ovs_mutex_init(&port->txq_used_mutex);
-    port->dynamic_txqs = dynamic_txqs;
-
-    for (i = 0; i < port->n_rxq; i++) {
-        error = netdev_rxq_open(netdev, &port->rxqs[i].rxq, i);
-        if (error) {
-            VLOG_ERR("%s: cannot receive packets on this network device (%s)",
-                     devname, ovs_strerror(errno));
-            goto out_rxq_close;
-        }
-        port->rxqs[i].core_id = OVS_CORE_UNSPEC;
-        n_open_rxqs++;
-    }
-
-    error = netdev_turn_flags_on(netdev, NETDEV_PROMISC, &sf);
-    if (error) {
-        goto out_rxq_close;
-    }
     port->sf = sf;
+    port->need_reconfigure = true;
+    ovs_mutex_init(&port->txq_used_mutex);
 
     *portp = port;
 
     return 0;
 
-out_rxq_close:
-    for (i = 0; i < n_open_rxqs; i++) {
-        netdev_rxq_close(port->rxqs[i].rxq);
-    }
-    ovs_mutex_destroy(&port->txq_used_mutex);
-    free(port->type);
-    free(port->txq_used);
-    free(port->rxqs);
-    free(port);
-
 out:
     netdev_close(netdev);
     return error;
@@ -1348,15 +1574,11 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
         return error;
     }
 
-    if (netdev_is_pmd(port->netdev)) {
-        dp_netdev_start_pmds(dp);
-    }
-
-    dp_netdev_add_port_to_pmds(dp, port);
-
     hmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
     seq_change(dp->port_seq);
 
+    reconfigure_datapath(dp);
+
     return 0;
 }
 
@@ -1455,7 +1677,7 @@ port_destroy(struct dp_netdev_port *port)
     netdev_restore_flags(port->sf);
 
     for (unsigned i = 0; i < port->n_rxq; i++) {
-        netdev_rxq_close(port->rxqs[i].rxq);
+        netdev_rxq_close(port->rxqs[i].rx);
     }
     ovs_mutex_destroy(&port->txq_used_mutex);
     free(port->rxq_affinity_list);
@@ -1484,13 +1706,6 @@ get_port_by_name(struct dp_netdev *dp,
     return ENODEV;
 }
 
-static int
-get_n_pmd_threads(struct dp_netdev *dp)
-{
-    /* There is one non pmd thread in dp->poll_threads */
-    return cmap_count(&dp->poll_threads) - 1;
-}
-
 /* Returns 'true' if there is a port with pmd netdev. */
 static bool
 has_pmd_port(struct dp_netdev *dp)
@@ -1507,7 +1722,6 @@ has_pmd_port(struct dp_netdev *dp)
     return false;
 }
 
-
 static void
 do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     OVS_REQUIRES(dp->port_mutex)
@@ -1515,14 +1729,7 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     hmap_remove(&dp->ports, &port->node);
     seq_change(dp->port_seq);
 
-    dp_netdev_del_port_from_all_pmds(dp, port);
-
-    if (netdev_is_pmd(port->netdev)) {
-        /* If there is no pmd netdev, delete the pmd threads */
-        if (!has_pmd_port(dp)) {
-            dp_netdev_stop_pmds(dp);
-        }
-    }
+    reconfigure_datapath(dp);
 
     port_destroy(port);
 }
@@ -1804,24 +2011,6 @@ netdev_flow_key_clone(struct netdev_flow_key *dst,
            offsetof(struct netdev_flow_key, mf) + src->len);
 }
 
-/* Slow. */
-static void
-netdev_flow_key_from_flow(struct netdev_flow_key *dst,
-                          const struct flow *src)
-{
-    struct dp_packet packet;
-    uint64_t buf_stub[512 / 8];
-
-    dp_packet_use_stub(&packet, buf_stub, sizeof buf_stub);
-    pkt_metadata_from_flow(&packet.md, src);
-    flow_compose(&packet, src);
-    miniflow_extract(&packet, &dst->mf);
-    dp_packet_uninit(&packet);
-
-    dst->len = netdev_flow_key_size(miniflow_n_values(&dst->mf));
-    dst->hash = 0; /* Not computed yet. */
-}
-
 /* Initialize a netdev_flow_key 'mask' from 'match'. */
 static inline void
 netdev_flow_mask_init(struct netdev_flow_key *mask,
@@ -1965,6 +2154,23 @@ emc_insert(struct emc_cache *cache, const struct netdev_flow_key *key,
     emc_change_entry(to_be_replaced, flow, key);
 }
 
+static inline void
+emc_probabilistic_insert(struct dp_netdev_pmd_thread *pmd,
+                         const struct netdev_flow_key *key,
+                         struct dp_netdev_flow *flow)
+{
+    /* Insert an entry into the EMC based on probability value 'min'. By
+     * default the value is UINT32_MAX / 100 which yields an insertion
+     * probability of 1/100 ie. 1% */
+
+    uint32_t min;
+    atomic_read_relaxed(&pmd->dp->emc_insert_min, &min);
+
+    if (min && random_uint32() <= min) {
+        emc_insert(&pmd->flow_cache, key, flow);
+    }
+}
+
 static inline struct dp_netdev_flow *
 emc_lookup(struct emc_cache *cache, const struct netdev_flow_key *key)
 {
@@ -2012,7 +2218,7 @@ dp_netdev_pmd_find_flow(const struct dp_netdev_pmd_thread *pmd,
 
     /* If a UFID is not provided, determine one based on the key. */
     if (!ufidp && key && key_len
-        && !dpif_netdev_flow_from_nlattrs(key, key_len, &flow)) {
+        && !dpif_netdev_flow_from_nlattrs(key, key_len, &flow, false)) {
         dpif_flow_hash(pmd->dp->dpif, &flow, sizeof flow, &ufid);
         ufidp = &ufid;
     }
@@ -2105,27 +2311,29 @@ static int
 dpif_netdev_mask_from_nlattrs(const struct nlattr *key, uint32_t key_len,
                               const struct nlattr *mask_key,
                               uint32_t mask_key_len, const struct flow *flow,
-                              struct flow_wildcards *wc)
+                              struct flow_wildcards *wc, bool probe)
 {
     enum odp_key_fitness fitness;
 
     fitness = odp_flow_key_to_mask(mask_key, mask_key_len, wc, flow);
     if (fitness) {
-        /* This should not happen: it indicates that
-         * odp_flow_key_from_mask() and odp_flow_key_to_mask()
-         * disagree on the acceptable form of a mask.  Log the problem
-         * as an error, with enough details to enable debugging. */
-        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
-
-        if (!VLOG_DROP_ERR(&rl)) {
-            struct ds s;
-
-            ds_init(&s);
-            odp_flow_format(key, key_len, mask_key, mask_key_len, NULL, &s,
-                            true);
-            VLOG_ERR("internal error parsing flow mask %s (%s)",
-                     ds_cstr(&s), odp_key_fitness_to_string(fitness));
-            ds_destroy(&s);
+        if (!probe) {
+            /* This should not happen: it indicates that
+             * odp_flow_key_from_mask() and odp_flow_key_to_mask()
+             * disagree on the acceptable form of a mask.  Log the problem
+             * as an error, with enough details to enable debugging. */
+            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+
+            if (!VLOG_DROP_ERR(&rl)) {
+                struct ds s;
+
+                ds_init(&s);
+                odp_flow_format(key, key_len, mask_key, mask_key_len, NULL, &s,
+                                true);
+                VLOG_ERR("internal error parsing flow mask %s (%s)",
+                ds_cstr(&s), odp_key_fitness_to_string(fitness));
+                ds_destroy(&s);
+            }
         }
 
         return EINVAL;
@@ -2136,34 +2344,29 @@ dpif_netdev_mask_from_nlattrs(const struct nlattr *key, uint32_t key_len,
 
 static int
 dpif_netdev_flow_from_nlattrs(const struct nlattr *key, uint32_t key_len,
-                              struct flow *flow)
+                              struct flow *flow, bool probe)
 {
-    odp_port_t in_port;
-
     if (odp_flow_key_to_flow(key, key_len, flow)) {
-        /* This should not happen: it indicates that odp_flow_key_from_flow()
-         * and odp_flow_key_to_flow() disagree on the acceptable form of a
-         * flow.  Log the problem as an error, with enough details to enable
-         * debugging. */
-        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
-
-        if (!VLOG_DROP_ERR(&rl)) {
-            struct ds s;
-
-            ds_init(&s);
-            odp_flow_format(key, key_len, NULL, 0, NULL, &s, true);
-            VLOG_ERR("internal error parsing flow key %s", ds_cstr(&s));
-            ds_destroy(&s);
+        if (!probe) {
+            /* This should not happen: it indicates that
+             * odp_flow_key_from_flow() and odp_flow_key_to_flow() disagree on
+             * the acceptable form of a flow.  Log the problem as an error,
+             * with enough details to enable debugging. */
+            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+
+            if (!VLOG_DROP_ERR(&rl)) {
+                struct ds s;
+
+                ds_init(&s);
+                odp_flow_format(key, key_len, NULL, 0, NULL, &s, true);
+                VLOG_ERR("internal error parsing flow key %s", ds_cstr(&s));
+                ds_destroy(&s);
+            }
         }
 
         return EINVAL;
     }
 
-    in_port = flow->in_port.odp_port;
-    if (!is_valid_port_number(in_port) && in_port != ODPP_NONE) {
-        return EINVAL;
-    }
-
     if (flow->ct_state & DP_NETDEV_CS_UNSUPPORTED_MASK) {
         return EINVAL;
     }
@@ -2272,7 +2475,7 @@ dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd,
     cmap_insert(&pmd->flow_table, CONST_CAST(struct cmap_node *, &flow->node),
                 dp_netdev_flow_hash(&flow->ufid));
 
-    if (OVS_UNLIKELY(VLOG_IS_DBG_ENABLED())) {
+    if (OVS_UNLIKELY(!VLOG_DROP_DBG((&upcall_rl)))) {
         struct ds ds = DS_EMPTY_INITIALIZER;
         struct ofpbuf key_buf, mask_buf;
         struct odp_flow_key_parms odp_parms = {
@@ -2295,12 +2498,24 @@ dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd,
                         mask_buf.data, mask_buf.size,
                         NULL, &ds, false);
         ds_put_cstr(&ds, ", actions:");
-        format_odp_actions(&ds, actions, actions_len);
+        format_odp_actions(&ds, actions, actions_len, NULL);
 
-        VLOG_DBG_RL(&upcall_rl, "%s", ds_cstr(&ds));
+        VLOG_DBG("%s", ds_cstr(&ds));
 
         ofpbuf_uninit(&key_buf);
         ofpbuf_uninit(&mask_buf);
+
+        /* Add a printout of the actual match installed. */
+        struct match m;
+        ds_clear(&ds);
+        ds_put_cstr(&ds, "flow match: ");
+        miniflow_expand(&flow->cr.flow.mf, &m.flow);
+        miniflow_expand(&flow->cr.mask->mf, &m.wc.masks);
+        memset(&m.tun_md, 0, sizeof m.tun_md);
+        match_format(&m, NULL, &ds, OFP_DEFAULT_PRIORITY);
+
+        VLOG_DBG("%s", ds_cstr(&ds));
+
         ds_destroy(&ds);
     }
 
@@ -2308,54 +2523,26 @@ dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd,
 }
 
 static int
-dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
+flow_put_on_pmd(struct dp_netdev_pmd_thread *pmd,
+                struct netdev_flow_key *key,
+                struct match *match,
+                ovs_u128 *ufid,
+                const struct dpif_flow_put *put,
+                struct dpif_flow_stats *stats)
 {
-    struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_flow *netdev_flow;
-    struct netdev_flow_key key;
-    struct dp_netdev_pmd_thread *pmd;
-    struct match match;
-    ovs_u128 ufid;
-    unsigned pmd_id = put->pmd_id == PMD_ID_NULL
-                      ? NON_PMD_CORE_ID : put->pmd_id;
-    int error;
-
-    error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &match.flow);
-    if (error) {
-        return error;
-    }
-    error = dpif_netdev_mask_from_nlattrs(put->key, put->key_len,
-                                          put->mask, put->mask_len,
-                                          &match.flow, &match.wc);
-    if (error) {
-        return error;
-    }
-
-    pmd = dp_netdev_get_pmd(dp, pmd_id);
-    if (!pmd) {
-        return EINVAL;
-    }
-
-    /* Must produce a netdev_flow_key for lookup.
-     * This interface is no longer performance critical, since it is not used
-     * for upcall processing any more. */
-    netdev_flow_key_from_flow(&key, &match.flow);
+    int error = 0;
 
-    if (put->ufid) {
-        ufid = *put->ufid;
-    } else {
-        dpif_flow_hash(dpif, &match.flow, sizeof match.flow, &ufid);
+    if (stats) {
+        memset(stats, 0, sizeof *stats);
     }
 
     ovs_mutex_lock(&pmd->flow_mutex);
-    netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &key, NULL);
+    netdev_flow = dp_netdev_pmd_lookup_flow(pmd, key, NULL);
     if (!netdev_flow) {
         if (put->flags & DPIF_FP_CREATE) {
             if (cmap_count(&pmd->flow_table) < MAX_FLOWS) {
-                if (put->stats) {
-                    memset(put->stats, 0, sizeof *put->stats);
-                }
-                dp_netdev_flow_add(pmd, &match, &ufid, put->actions,
+                dp_netdev_flow_add(pmd, match, ufid, put->actions,
                                    put->actions_len);
                 error = 0;
             } else {
@@ -2365,8 +2552,7 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
             error = ENOENT;
         }
     } else {
-        if (put->flags & DPIF_FP_MODIFY
-            && flow_equal(&match.flow, &netdev_flow->flow)) {
+        if (put->flags & DPIF_FP_MODIFY) {
             struct dp_netdev_actions *new_actions;
             struct dp_netdev_actions *old_actions;
 
@@ -2376,8 +2562,8 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
             old_actions = dp_netdev_flow_get_actions(netdev_flow);
             ovsrcu_set(&netdev_flow->actions, new_actions);
 
-            if (put->stats) {
-                get_dpif_flow_stats(netdev_flow, put->stats);
+            if (stats) {
+                get_dpif_flow_stats(netdev_flow, stats);
             }
             if (put->flags & DPIF_FP_ZERO_STATS) {
                 /* XXX: The userspace datapath uses thread local statistics
@@ -2401,39 +2587,140 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
         }
     }
     ovs_mutex_unlock(&pmd->flow_mutex);
-    dp_netdev_pmd_unref(pmd);
-
     return error;
 }
 
 static int
-dpif_netdev_flow_del(struct dpif *dpif, const struct dpif_flow_del *del)
+dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
-    struct dp_netdev_flow *netdev_flow;
+    struct netdev_flow_key key, mask;
     struct dp_netdev_pmd_thread *pmd;
-    unsigned pmd_id = del->pmd_id == PMD_ID_NULL
-                      ? NON_PMD_CORE_ID : del->pmd_id;
-    int error = 0;
-
-    pmd = dp_netdev_get_pmd(dp, pmd_id);
-    if (!pmd) {
-        return EINVAL;
+    struct match match;
+    ovs_u128 ufid;
+    int error;
+    bool probe = put->flags & DPIF_FP_PROBE;
+
+    if (put->stats) {
+        memset(put->stats, 0, sizeof *put->stats);
+    }
+    error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &match.flow,
+                                          probe);
+    if (error) {
+        return error;
+    }
+    error = dpif_netdev_mask_from_nlattrs(put->key, put->key_len,
+                                          put->mask, put->mask_len,
+                                          &match.flow, &match.wc, probe);
+    if (error) {
+        return error;
+    }
+
+    if (put->ufid) {
+        ufid = *put->ufid;
+    } else {
+        dpif_flow_hash(dpif, &match.flow, sizeof match.flow, &ufid);
+    }
+
+    /* Must produce a netdev_flow_key for lookup.
+     * Use the same method as employed to create the key when adding
+     * the flow to the dplcs to make sure they match. */
+    netdev_flow_mask_init(&mask, &match);
+    netdev_flow_key_init_masked(&key, &match.flow, &mask);
+
+    if (put->pmd_id == PMD_ID_NULL) {
+        if (cmap_count(&dp->poll_threads) == 0) {
+            return EINVAL;
+        }
+        CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+            struct dpif_flow_stats pmd_stats;
+            int pmd_error;
+
+            pmd_error = flow_put_on_pmd(pmd, &key, &match, &ufid, put,
+                                        &pmd_stats);
+            if (pmd_error) {
+                error = pmd_error;
+            } else if (put->stats) {
+                put->stats->n_packets += pmd_stats.n_packets;
+                put->stats->n_bytes += pmd_stats.n_bytes;
+                put->stats->used = MAX(put->stats->used, pmd_stats.used);
+                put->stats->tcp_flags |= pmd_stats.tcp_flags;
+            }
+        }
+    } else {
+        pmd = dp_netdev_get_pmd(dp, put->pmd_id);
+        if (!pmd) {
+            return EINVAL;
+        }
+        error = flow_put_on_pmd(pmd, &key, &match, &ufid, put, put->stats);
+        dp_netdev_pmd_unref(pmd);
     }
 
+    return error;
+}
+
+static int
+flow_del_on_pmd(struct dp_netdev_pmd_thread *pmd,
+                struct dpif_flow_stats *stats,
+                const struct dpif_flow_del *del)
+{
+    struct dp_netdev_flow *netdev_flow;
+    int error = 0;
+
     ovs_mutex_lock(&pmd->flow_mutex);
     netdev_flow = dp_netdev_pmd_find_flow(pmd, del->ufid, del->key,
                                           del->key_len);
     if (netdev_flow) {
-        if (del->stats) {
-            get_dpif_flow_stats(netdev_flow, del->stats);
+        if (stats) {
+            get_dpif_flow_stats(netdev_flow, stats);
         }
         dp_netdev_pmd_remove_flow(pmd, netdev_flow);
     } else {
         error = ENOENT;
     }
     ovs_mutex_unlock(&pmd->flow_mutex);
-    dp_netdev_pmd_unref(pmd);
+
+    return error;
+}
+
+static int
+dpif_netdev_flow_del(struct dpif *dpif, const struct dpif_flow_del *del)
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct dp_netdev_pmd_thread *pmd;
+    int error = 0;
+
+    if (del->stats) {
+        memset(del->stats, 0, sizeof *del->stats);
+    }
+
+    if (del->pmd_id == PMD_ID_NULL) {
+        if (cmap_count(&dp->poll_threads) == 0) {
+            return EINVAL;
+        }
+        CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+            struct dpif_flow_stats pmd_stats;
+            int pmd_error;
+
+            pmd_error = flow_del_on_pmd(pmd, &pmd_stats, del);
+            if (pmd_error) {
+                error = pmd_error;
+            } else if (del->stats) {
+                del->stats->n_packets += pmd_stats.n_packets;
+                del->stats->n_bytes += pmd_stats.n_bytes;
+                del->stats->used = MAX(del->stats->used, pmd_stats.used);
+                del->stats->tcp_flags |= pmd_stats.tcp_flags;
+            }
+        }
+    } else {
+        pmd = dp_netdev_get_pmd(dp, del->pmd_id);
+        if (!pmd) {
+            return EINVAL;
+        }
+        error = flow_del_on_pmd(pmd, del->stats, del);
+        dp_netdev_pmd_unref(pmd);
+    }
+
 
     return error;
 }
@@ -2454,7 +2741,8 @@ dpif_netdev_flow_dump_cast(struct dpif_flow_dump *dump)
 }
 
 static struct dpif_flow_dump *
-dpif_netdev_flow_dump_create(const struct dpif *dpif_, bool terse)
+dpif_netdev_flow_dump_create(const struct dpif *dpif_, bool terse,
+                             char *type OVS_UNUSED)
 {
     struct dpif_netdev_flow_dump *dump;
 
@@ -2612,6 +2900,16 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
         }
     }
 
+    if (execute->probe) {
+        /* If this is part of a probe, Drop the packet, since executing
+         * the action may actually cause spurious packets be sent into
+         * the network. */
+        if (pmd->core_id == NON_PMD_CORE_ID) {
+            dp_netdev_pmd_unref(pmd);
+        }
+        return 0;
+    }
+
     /* If the current thread is non-pmd thread, acquires
      * the 'non_pmd_mutex'. */
     if (pmd->core_id == NON_PMD_CORE_ID) {
@@ -2628,7 +2926,7 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
                                flow_hash_5tuple(execute->flow, 0));
     }
 
-    packet_batch_init_packet(&pp, execute->packet);
+    dp_packet_batch_init_packet(&pp, execute->packet);
     dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
                               execute->actions, execute->actions_len,
                               time_msec());
@@ -2669,12 +2967,17 @@ dpif_netdev_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops)
     }
 }
 
-/* Changes the number or the affinity of pmd threads.  The changes are actually
- * applied in dpif_netdev_run(). */
+/* Applies datapath configuration from the database. Some of the changes are
+ * actually applied in dpif_netdev_run(). */
 static int
-dpif_netdev_pmd_set(struct dpif *dpif, const char *cmask)
+dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
+    const char *cmask = smap_get(other_config, "pmd-cpu-mask");
+    unsigned long long insert_prob =
+        smap_get_ullong(other_config, "emc-insert-inv-prob",
+                        DEFAULT_EM_FLOW_INSERT_INV_PROB);
+    uint32_t insert_min, cur_min;
 
     if (!nullable_string_is_equal(dp->pmd_cmask, cmask)) {
         free(dp->pmd_cmask);
@@ -2682,6 +2985,24 @@ dpif_netdev_pmd_set(struct dpif *dpif, const char *cmask)
         dp_netdev_request_reconfigure(dp);
     }
 
+    atomic_read_relaxed(&dp->emc_insert_min, &cur_min);
+    if (insert_prob <= UINT32_MAX) {
+        insert_min = insert_prob == 0 ? 0 : UINT32_MAX / insert_prob;
+    } else {
+        insert_min = DEFAULT_EM_FLOW_INSERT_MIN;
+        insert_prob = DEFAULT_EM_FLOW_INSERT_INV_PROB;
+    }
+
+    if (insert_min != cur_min) {
+        atomic_store_relaxed(&dp->emc_insert_min, insert_min);
+        if (insert_min == 0) {
+            VLOG_INFO("EMC has been disabled");
+        } else {
+            VLOG_INFO("EMC insertion probability changed to 1/%llu (~%.2f%%)",
+                      insert_prob, (100 / (float)insert_prob));
+        }
+    }
+
     return 0;
 }
 
@@ -2785,7 +3106,7 @@ dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
 
 \f
 /* Creates and returns a new 'struct dp_netdev_actions', whose actions are
- * a copy of the 'ofpacts_len' bytes of 'ofpacts'. */
+ * a copy of the 'size' bytes of 'actions' input parameters. */
 struct dp_netdev_actions *
 dp_netdev_actions_create(const struct nlattr *actions, size_t size)
 {
@@ -2844,30 +3165,95 @@ cycles_count_end(struct dp_netdev_pmd_thread *pmd,
     non_atomic_ullong_add(&pmd->cycles.n[type], interval);
 }
 
+/* Calculate the intermediate cycle result and add to the counter 'type' */
+static inline void
+cycles_count_intermediate(struct dp_netdev_pmd_thread *pmd,
+                          struct dp_netdev_rxq *rxq,
+                          enum pmd_cycles_counter_type type)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
+{
+    unsigned long long new_cycles = cycles_counter();
+    unsigned long long interval = new_cycles - pmd->last_cycles;
+    pmd->last_cycles = new_cycles;
+
+    non_atomic_ullong_add(&pmd->cycles.n[type], interval);
+    if (rxq && (type == PMD_CYCLES_PROCESSING)) {
+        /* Add to the amount of current processing cycles. */
+        non_atomic_ullong_add(&rxq->cycles[RXQ_CYCLES_PROC_CURR], interval);
+    }
+}
+
 static void
+dp_netdev_rxq_set_cycles(struct dp_netdev_rxq *rx,
+                         enum rxq_cycles_counter_type type,
+                         unsigned long long cycles)
+{
+   atomic_store_relaxed(&rx->cycles[type], cycles);
+}
+
+static uint64_t
+dp_netdev_rxq_get_cycles(struct dp_netdev_rxq *rx,
+                         enum rxq_cycles_counter_type type)
+{
+    unsigned long long processing_cycles;
+    atomic_read_relaxed(&rx->cycles[type], &processing_cycles);
+    return processing_cycles;
+}
+
+static void
+dp_netdev_rxq_set_intrvl_cycles(struct dp_netdev_rxq *rx,
+                                unsigned long long cycles)
+{
+    unsigned int idx = rx->intrvl_idx++ % PMD_RXQ_INTERVAL_MAX;
+    atomic_store_relaxed(&rx->cycles_intrvl[idx], cycles);
+}
+
+static uint64_t
+dp_netdev_rxq_get_intrvl_cycles(struct dp_netdev_rxq *rx, unsigned idx)
+{
+    unsigned long long processing_cycles;
+    atomic_read_relaxed(&rx->cycles_intrvl[idx], &processing_cycles);
+    return processing_cycles;
+}
+
+static int
 dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
-                           struct dp_netdev_port *port,
-                           struct netdev_rxq *rxq)
+                           struct netdev_rxq *rx,
+                           odp_port_t port_no)
 {
     struct dp_packet_batch batch;
     int error;
+    int batch_cnt = 0;
 
     dp_packet_batch_init(&batch);
-    cycles_count_start(pmd);
-    error = netdev_rxq_recv(rxq, &batch);
-    cycles_count_end(pmd, PMD_CYCLES_POLLING);
+    error = netdev_rxq_recv(rx, &batch);
     if (!error) {
         *recirc_depth_get() = 0;
 
-        cycles_count_start(pmd);
-        dp_netdev_input(pmd, &batch, port->port_no);
-        cycles_count_end(pmd, PMD_CYCLES_PROCESSING);
+        batch_cnt = batch.count;
+        dp_netdev_input(pmd, &batch, port_no);
     } else if (error != EAGAIN && error != EOPNOTSUPP) {
         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
         VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
-                    netdev_get_name(port->netdev), ovs_strerror(error));
+                    netdev_rxq_get_name(rx), ovs_strerror(error));
+    }
+
+    return batch_cnt;
+}
+
+static struct tx_port *
+tx_port_lookup(const struct hmap *hmap, odp_port_t port_no)
+{
+    struct tx_port *tx;
+
+    HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) {
+        if (tx->port->port_no == port_no) {
+            return tx;
+        }
     }
+
+    return NULL;
 }
 
 static int
@@ -2876,23 +3262,24 @@ port_reconfigure(struct dp_netdev_port *port)
     struct netdev *netdev = port->netdev;
     int i, err;
 
-    if (!netdev_is_reconf_required(netdev)) {
-        return 0;
-    }
+    port->need_reconfigure = false;
 
     /* Closes the existing 'rxq's. */
     for (i = 0; i < port->n_rxq; i++) {
-        netdev_rxq_close(port->rxqs[i].rxq);
-        port->rxqs[i].rxq = NULL;
+        netdev_rxq_close(port->rxqs[i].rx);
+        port->rxqs[i].rx = NULL;
     }
+    unsigned last_nrxq = port->n_rxq;
     port->n_rxq = 0;
 
     /* Allows 'netdev' to apply the pending configuration changes. */
-    err = netdev_reconfigure(netdev);
-    if (err && (err != EOPNOTSUPP)) {
-        VLOG_ERR("Failed to set interface %s new configuration",
-                 netdev_get_name(netdev));
-        return err;
+    if (netdev_is_reconf_required(netdev)) {
+        err = netdev_reconfigure(netdev);
+        if (err && (err != EOPNOTSUPP)) {
+            VLOG_ERR("Failed to set interface %s new configuration",
+                     netdev_get_name(netdev));
+            return err;
+        }
     }
     /* If the netdev_reconfigure() above succeeds, reopens the 'rxq's. */
     port->rxqs = xrealloc(port->rxqs,
@@ -2902,7 +3289,14 @@ port_reconfigure(struct dp_netdev_port *port)
     port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
 
     for (i = 0; i < netdev_n_rxq(netdev); i++) {
-        err = netdev_rxq_open(netdev, &port->rxqs[i].rxq, i);
+        bool new_queue = i >= last_nrxq;
+        if (new_queue) {
+            memset(&port->rxqs[i], 0, sizeof port->rxqs[i]);
+        }
+
+        port->rxqs[i].port = port;
+
+        err = netdev_rxq_open(netdev, &port->rxqs[i].rx, i);
         if (err) {
             return err;
         }
@@ -2915,42 +3309,533 @@ port_reconfigure(struct dp_netdev_port *port)
     return 0;
 }
 
+struct rr_numa_list {
+    struct hmap numas;  /* Contains 'struct rr_numa' */
+};
+
+struct rr_numa {
+    struct hmap_node node;
+
+    int numa_id;
+
+    /* Non isolated pmds on numa node 'numa_id' */
+    struct dp_netdev_pmd_thread **pmds;
+    int n_pmds;
+
+    int cur_index;
+    bool idx_inc;
+};
+
+static struct rr_numa *
+rr_numa_list_lookup(struct rr_numa_list *rr, int numa_id)
+{
+    struct rr_numa *numa;
+
+    HMAP_FOR_EACH_WITH_HASH (numa, node, hash_int(numa_id, 0), &rr->numas) {
+        if (numa->numa_id == numa_id) {
+            return numa;
+        }
+    }
+
+    return NULL;
+}
+
+/* Returns the next node in numa list following 'numa' in round-robin fashion.
+ * Returns first node if 'numa' is a null pointer or the last node in 'rr'.
+ * Returns NULL if 'rr' numa list is empty. */
+static struct rr_numa *
+rr_numa_list_next(struct rr_numa_list *rr, const struct rr_numa *numa)
+{
+    struct hmap_node *node = NULL;
+
+    if (numa) {
+        node = hmap_next(&rr->numas, &numa->node);
+    }
+    if (!node) {
+        node = hmap_first(&rr->numas);
+    }
+
+    return (node) ? CONTAINER_OF(node, struct rr_numa, node) : NULL;
+}
+
+static void
+rr_numa_list_populate(struct dp_netdev *dp, struct rr_numa_list *rr)
+{
+    struct dp_netdev_pmd_thread *pmd;
+    struct rr_numa *numa;
+
+    hmap_init(&rr->numas);
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (pmd->core_id == NON_PMD_CORE_ID || pmd->isolated) {
+            continue;
+        }
+
+        numa = rr_numa_list_lookup(rr, pmd->numa_id);
+        if (!numa) {
+            numa = xzalloc(sizeof *numa);
+            numa->numa_id = pmd->numa_id;
+            hmap_insert(&rr->numas, &numa->node, hash_int(pmd->numa_id, 0));
+        }
+        numa->n_pmds++;
+        numa->pmds = xrealloc(numa->pmds, numa->n_pmds * sizeof *numa->pmds);
+        numa->pmds[numa->n_pmds - 1] = pmd;
+        /* At least one pmd so initialise curr_idx and idx_inc. */
+        numa->cur_index = 0;
+        numa->idx_inc = true;
+    }
+}
+
+/* Returns the next pmd from the numa node in
+ * incrementing or decrementing order. */
+static struct dp_netdev_pmd_thread *
+rr_numa_get_pmd(struct rr_numa *numa)
+{
+    int numa_idx = numa->cur_index;
+
+    if (numa->idx_inc == true) {
+        /* Incrementing through list of pmds. */
+        if (numa->cur_index == numa->n_pmds-1) {
+            /* Reached the last pmd. */
+            numa->idx_inc = false;
+        } else {
+            numa->cur_index++;
+        }
+    } else {
+        /* Decrementing through list of pmds. */
+        if (numa->cur_index == 0) {
+            /* Reached the first pmd. */
+            numa->idx_inc = true;
+        } else {
+            numa->cur_index--;
+        }
+    }
+    return numa->pmds[numa_idx];
+}
+
+static void
+rr_numa_list_destroy(struct rr_numa_list *rr)
+{
+    struct rr_numa *numa;
+
+    HMAP_FOR_EACH_POP (numa, node, &rr->numas) {
+        free(numa->pmds);
+        free(numa);
+    }
+    hmap_destroy(&rr->numas);
+}
+
+/* Sort Rx Queues by the processing cycles they are consuming. */
+static int
+compare_rxq_cycles(const void *a, const void *b)
+{
+    struct dp_netdev_rxq *qa;
+    struct dp_netdev_rxq *qb;
+    uint64_t cycles_qa, cycles_qb;
+
+    qa = *(struct dp_netdev_rxq **) a;
+    qb = *(struct dp_netdev_rxq **) b;
+
+    cycles_qa = dp_netdev_rxq_get_cycles(qa, RXQ_CYCLES_PROC_HIST);
+    cycles_qb = dp_netdev_rxq_get_cycles(qb, RXQ_CYCLES_PROC_HIST);
+
+    if (cycles_qa != cycles_qb) {
+        return (cycles_qa < cycles_qb) ? 1 : -1;
+    } else {
+        /* Cycles are the same so tiebreak on port/queue id.
+         * Tiebreaking (as opposed to return 0) ensures consistent
+         * sort results across multiple OS's. */
+        uint32_t port_qa = odp_to_u32(qa->port->port_no);
+        uint32_t port_qb = odp_to_u32(qb->port->port_no);
+        if (port_qa != port_qb) {
+            return port_qa > port_qb ? 1 : -1;
+        } else {
+            return netdev_rxq_get_queue_id(qa->rx)
+                    - netdev_rxq_get_queue_id(qb->rx);
+        }
+    }
+}
+
+/* Assign pmds to queues.  If 'pinned' is true, assign pmds to pinned
+ * queues and marks the pmds as isolated.  Otherwise, assign non isolated
+ * pmds to unpinned queues.
+ *
+ * If 'pinned' is false queues will be sorted by processing cycles they are
+ * consuming and then assigned to pmds in round robin order.
+ *
+ * The function doesn't touch the pmd threads, it just stores the assignment
+ * in the 'pmd' member of each rxq. */
+static void
+rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
+{
+    struct dp_netdev_port *port;
+    struct rr_numa_list rr;
+    struct rr_numa *non_local_numa = NULL;
+    struct dp_netdev_rxq ** rxqs = NULL;
+    int i, n_rxqs = 0;
+    struct rr_numa *numa = NULL;
+    int numa_id;
+
+    HMAP_FOR_EACH (port, node, &dp->ports) {
+        if (!netdev_is_pmd(port->netdev)) {
+            continue;
+        }
+
+        for (int qid = 0; qid < port->n_rxq; qid++) {
+            struct dp_netdev_rxq *q = &port->rxqs[qid];
+
+            if (pinned && q->core_id != OVS_CORE_UNSPEC) {
+                struct dp_netdev_pmd_thread *pmd;
+
+                pmd = dp_netdev_get_pmd(dp, q->core_id);
+                if (!pmd) {
+                    VLOG_WARN("There is no PMD thread on core %d. Queue "
+                              "%d on port \'%s\' will not be polled.",
+                              q->core_id, qid, netdev_get_name(port->netdev));
+                } else {
+                    q->pmd = pmd;
+                    pmd->isolated = true;
+                    dp_netdev_pmd_unref(pmd);
+                }
+            } else if (!pinned && q->core_id == OVS_CORE_UNSPEC) {
+                uint64_t cycle_hist = 0;
+
+                if (n_rxqs == 0) {
+                    rxqs = xmalloc(sizeof *rxqs);
+                } else {
+                    rxqs = xrealloc(rxqs, sizeof *rxqs * (n_rxqs + 1));
+                }
+                /* Sum the queue intervals and store the cycle history. */
+                for (unsigned i = 0; i < PMD_RXQ_INTERVAL_MAX; i++) {
+                    cycle_hist += dp_netdev_rxq_get_intrvl_cycles(q, i);
+                }
+                dp_netdev_rxq_set_cycles(q, RXQ_CYCLES_PROC_HIST, cycle_hist);
+
+                /* Store the queue. */
+                rxqs[n_rxqs++] = q;
+            }
+        }
+    }
+
+    if (n_rxqs > 1) {
+        /* Sort the queues in order of the processing cycles
+         * they consumed during their last pmd interval. */
+        qsort(rxqs, n_rxqs, sizeof *rxqs, compare_rxq_cycles);
+    }
+
+    rr_numa_list_populate(dp, &rr);
+    /* Assign the sorted queues to pmds in round robin. */
+    for (i = 0; i < n_rxqs; i++) {
+        numa_id = netdev_get_numa_id(rxqs[i]->port->netdev);
+        numa = rr_numa_list_lookup(&rr, numa_id);
+        if (!numa) {
+            /* There are no pmds on the queue's local NUMA node.
+               Round robin on the NUMA nodes that do have pmds. */
+            non_local_numa = rr_numa_list_next(&rr, non_local_numa);
+            if (!non_local_numa) {
+                VLOG_ERR("There is no available (non-isolated) pmd "
+                         "thread for port \'%s\' queue %d. This queue "
+                         "will not be polled. Is pmd-cpu-mask set to "
+                         "zero? Or are all PMDs isolated to other "
+                         "queues?", netdev_rxq_get_name(rxqs[i]->rx),
+                         netdev_rxq_get_queue_id(rxqs[i]->rx));
+                continue;
+            }
+            rxqs[i]->pmd = rr_numa_get_pmd(non_local_numa);
+            VLOG_WARN("There's no available (non-isolated) pmd thread "
+                      "on numa node %d. Queue %d on port \'%s\' will "
+                      "be assigned to the pmd on core %d "
+                      "(numa node %d). Expect reduced performance.",
+                      numa_id, netdev_rxq_get_queue_id(rxqs[i]->rx),
+                      netdev_rxq_get_name(rxqs[i]->rx),
+                      rxqs[i]->pmd->core_id, rxqs[i]->pmd->numa_id);
+        } else {
+        rxqs[i]->pmd = rr_numa_get_pmd(numa);
+        VLOG_INFO("Core %d on numa node %d assigned port \'%s\' "
+                  "rx queue %d (measured processing cycles %"PRIu64").",
+                  rxqs[i]->pmd->core_id, numa_id,
+                  netdev_rxq_get_name(rxqs[i]->rx),
+                  netdev_rxq_get_queue_id(rxqs[i]->rx),
+                  dp_netdev_rxq_get_cycles(rxqs[i], RXQ_CYCLES_PROC_HIST));
+        }
+    }
+
+    rr_numa_list_destroy(&rr);
+    free(rxqs);
+}
+
+static void
+reload_affected_pmds(struct dp_netdev *dp)
+{
+    struct dp_netdev_pmd_thread *pmd;
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (pmd->need_reload) {
+            dp_netdev_reload_pmd__(pmd);
+            pmd->need_reload = false;
+        }
+    }
+}
+
 static void
 reconfigure_pmd_threads(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex)
 {
-    struct dp_netdev_port *port, *next;
-    int n_cores;
+    struct dp_netdev_pmd_thread *pmd;
+    struct ovs_numa_dump *pmd_cores;
+    struct ovs_numa_info_core *core;
+    struct hmapx to_delete = HMAPX_INITIALIZER(&to_delete);
+    struct hmapx_node *node;
+    bool changed = false;
+    bool need_to_adjust_static_tx_qids = false;
+
+    /* The pmd threads should be started only if there's a pmd port in the
+     * datapath.  If the user didn't provide any "pmd-cpu-mask", we start
+     * NR_PMD_THREADS per numa node. */
+    if (!has_pmd_port(dp)) {
+        pmd_cores = ovs_numa_dump_n_cores_per_numa(0);
+    } else if (dp->pmd_cmask && dp->pmd_cmask[0]) {
+        pmd_cores = ovs_numa_dump_cores_with_cmask(dp->pmd_cmask);
+    } else {
+        pmd_cores = ovs_numa_dump_n_cores_per_numa(NR_PMD_THREADS);
+    }
+
+    /* We need to adjust 'static_tx_qid's only if we're reducing number of
+     * PMD threads. Otherwise, new threads will allocate all the freed ids. */
+    if (ovs_numa_dump_count(pmd_cores) < cmap_count(&dp->poll_threads) - 1) {
+        /* Adjustment is required to keep 'static_tx_qid's sequential and
+         * avoid possible issues, for example, imbalanced tx queue usage
+         * and unnecessary locking caused by remapping on netdev level. */
+        need_to_adjust_static_tx_qids = true;
+    }
+
+    /* Check for unwanted pmd threads */
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (pmd->core_id == NON_PMD_CORE_ID) {
+            continue;
+        }
+        if (!ovs_numa_dump_contains_core(pmd_cores, pmd->numa_id,
+                                                    pmd->core_id)) {
+            hmapx_add(&to_delete, pmd);
+        } else if (need_to_adjust_static_tx_qids) {
+            pmd->need_reload = true;
+        }
+    }
+
+    HMAPX_FOR_EACH (node, &to_delete) {
+        pmd = (struct dp_netdev_pmd_thread *) node->data;
+        VLOG_INFO("PMD thread on numa_id: %d, core id: %2d destroyed.",
+                  pmd->numa_id, pmd->core_id);
+        dp_netdev_del_pmd(dp, pmd);
+    }
+    changed = !hmapx_is_empty(&to_delete);
+    hmapx_destroy(&to_delete);
+
+    if (need_to_adjust_static_tx_qids) {
+        /* 'static_tx_qid's are not sequential now.
+         * Reload remaining threads to fix this. */
+        reload_affected_pmds(dp);
+    }
+
+    /* Check for required new pmd threads */
+    FOR_EACH_CORE_ON_DUMP(core, pmd_cores) {
+        pmd = dp_netdev_get_pmd(dp, core->core_id);
+        if (!pmd) {
+            pmd = xzalloc(sizeof *pmd);
+            dp_netdev_configure_pmd(pmd, dp, core->core_id, core->numa_id);
+            pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
+            VLOG_INFO("PMD thread on numa_id: %d, core id: %2d created.",
+                      pmd->numa_id, pmd->core_id);
+            changed = true;
+        } else {
+            dp_netdev_pmd_unref(pmd);
+        }
+    }
+
+    if (changed) {
+        struct ovs_numa_info_numa *numa;
+
+        /* Log the number of pmd threads per numa node. */
+        FOR_EACH_NUMA_ON_DUMP (numa, pmd_cores) {
+            VLOG_INFO("There are %"PRIuSIZE" pmd threads on numa node %d",
+                      numa->n_cores, numa->numa_id);
+        }
+    }
+
+    ovs_numa_dump_destroy(pmd_cores);
+}
+
+static void
+pmd_remove_stale_ports(struct dp_netdev *dp,
+                       struct dp_netdev_pmd_thread *pmd)
+    OVS_EXCLUDED(pmd->port_mutex)
+    OVS_REQUIRES(dp->port_mutex)
+{
+    struct rxq_poll *poll, *poll_next;
+    struct tx_port *tx, *tx_next;
+
+    ovs_mutex_lock(&pmd->port_mutex);
+    HMAP_FOR_EACH_SAFE (poll, poll_next, node, &pmd->poll_list) {
+        struct dp_netdev_port *port = poll->rxq->port;
+
+        if (port->need_reconfigure
+            || !hmap_contains(&dp->ports, &port->node)) {
+            dp_netdev_del_rxq_from_pmd(pmd, poll);
+        }
+    }
+    HMAP_FOR_EACH_SAFE (tx, tx_next, node, &pmd->tx_ports) {
+        struct dp_netdev_port *port = tx->port;
+
+        if (port->need_reconfigure
+            || !hmap_contains(&dp->ports, &port->node)) {
+            dp_netdev_del_port_tx_from_pmd(pmd, tx);
+        }
+    }
+    ovs_mutex_unlock(&pmd->port_mutex);
+}
+
+/* Must be called each time a port is added/removed or the cmask changes.
+ * This creates and destroys pmd threads, reconfigures ports, opens their
+ * rxqs and assigns all rxqs/txqs to pmd threads. */
+static void
+reconfigure_datapath(struct dp_netdev *dp)
+    OVS_REQUIRES(dp->port_mutex)
+{
+    struct dp_netdev_pmd_thread *pmd;
+    struct dp_netdev_port *port;
+    int wanted_txqs;
 
     dp->last_reconfigure_seq = seq_read(dp->reconfigure_seq);
 
-    dp_netdev_destroy_all_pmds(dp);
+    /* Step 1: Adjust the pmd threads based on the datapath ports, the cores
+     * on the system and the user configuration. */
+    reconfigure_pmd_threads(dp);
 
-    /* Reconfigures the cpu mask. */
-    ovs_numa_set_cpu_mask(dp->pmd_cmask);
+    wanted_txqs = cmap_count(&dp->poll_threads);
 
-    n_cores = ovs_numa_get_n_cores();
-    if (n_cores == OVS_CORE_UNSPEC) {
-        VLOG_ERR("Cannot get cpu core info");
-        return;
+    /* The number of pmd threads might have changed, or a port can be new:
+     * adjust the txqs. */
+    HMAP_FOR_EACH (port, node, &dp->ports) {
+        netdev_set_tx_multiq(port->netdev, wanted_txqs);
     }
 
-    HMAP_FOR_EACH_SAFE (port, next, node, &dp->ports) {
+    /* Step 2: Remove from the pmd threads ports that have been removed or
+     * need reconfiguration. */
+
+    /* Check for all the ports that need reconfiguration.  We cache this in
+     * 'port->need_reconfigure', because netdev_is_reconf_required() can
+     * change at any time. */
+    HMAP_FOR_EACH (port, node, &dp->ports) {
+        if (netdev_is_reconf_required(port->netdev)) {
+            port->need_reconfigure = true;
+        }
+    }
+
+    /* Remove from the pmd threads all the ports that have been deleted or
+     * need reconfiguration. */
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        pmd_remove_stale_ports(dp, pmd);
+    }
+
+    /* Reload affected pmd threads.  We must wait for the pmd threads before
+     * reconfiguring the ports, because a port cannot be reconfigured while
+     * it's being used. */
+    reload_affected_pmds(dp);
+
+    /* Step 3: Reconfigure ports. */
+
+    /* We only reconfigure the ports that we determined above, because they're
+     * not being used by any pmd thread at the moment.  If a port fails to
+     * reconfigure we remove it from the datapath. */
+    struct dp_netdev_port *next_port;
+    HMAP_FOR_EACH_SAFE (port, next_port, node, &dp->ports) {
         int err;
 
+        if (!port->need_reconfigure) {
+            continue;
+        }
+
         err = port_reconfigure(port);
         if (err) {
             hmap_remove(&dp->ports, &port->node);
             seq_change(dp->port_seq);
             port_destroy(port);
         } else {
-            port->dynamic_txqs = netdev_n_txq(port->netdev) < n_cores + 1;
+            port->dynamic_txqs = netdev_n_txq(port->netdev) < wanted_txqs;
         }
     }
-    /* Restores the non-pmd. */
-    dp_netdev_set_nonpmd(dp);
-    /* Restores all pmd threads. */
-    dp_netdev_reset_pmd_threads(dp);
+
+    /* Step 4: Compute new rxq scheduling.  We don't touch the pmd threads
+     * for now, we just update the 'pmd' pointer in each rxq to point to the
+     * wanted thread according to the scheduling policy. */
+
+    /* Reset all the pmd threads to non isolated. */
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        pmd->isolated = false;
+    }
+
+    /* Reset all the queues to unassigned */
+    HMAP_FOR_EACH (port, node, &dp->ports) {
+        for (int i = 0; i < port->n_rxq; i++) {
+            port->rxqs[i].pmd = NULL;
+        }
+    }
+
+    /* Add pinned queues and mark pmd threads isolated. */
+    rxq_scheduling(dp, true);
+
+    /* Add non-pinned queues. */
+    rxq_scheduling(dp, false);
+
+    /* Step 5: Remove queues not compliant with new scheduling. */
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        struct rxq_poll *poll, *poll_next;
+
+        ovs_mutex_lock(&pmd->port_mutex);
+        HMAP_FOR_EACH_SAFE (poll, poll_next, node, &pmd->poll_list) {
+            if (poll->rxq->pmd != pmd) {
+                dp_netdev_del_rxq_from_pmd(pmd, poll);
+            }
+        }
+        ovs_mutex_unlock(&pmd->port_mutex);
+    }
+
+    /* Reload affected pmd threads.  We must wait for the pmd threads to remove
+     * the old queues before readding them, otherwise a queue can be polled by
+     * two threads at the same time. */
+    reload_affected_pmds(dp);
+
+    /* Step 6: Add queues from scheduling, if they're not there already. */
+    HMAP_FOR_EACH (port, node, &dp->ports) {
+        if (!netdev_is_pmd(port->netdev)) {
+            continue;
+        }
+
+        for (int qid = 0; qid < port->n_rxq; qid++) {
+            struct dp_netdev_rxq *q = &port->rxqs[qid];
+
+            if (q->pmd) {
+                ovs_mutex_lock(&q->pmd->port_mutex);
+                dp_netdev_add_rxq_to_pmd(q->pmd, q);
+                ovs_mutex_unlock(&q->pmd->port_mutex);
+            }
+        }
+    }
+
+    /* Add every port to the tx cache of every pmd thread, if it's not
+     * there already and if this pmd has at least one rxq to poll. */
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        ovs_mutex_lock(&pmd->port_mutex);
+        if (hmap_count(&pmd->poll_list) || pmd->core_id == NON_PMD_CORE_ID) {
+            HMAP_FOR_EACH (port, node, &dp->ports) {
+                dp_netdev_add_port_tx_to_pmd(pmd, port);
+            }
+        }
+        ovs_mutex_unlock(&pmd->port_mutex);
+    }
+
+    /* Reload affected pmd threads. */
+    reload_affected_pmds(dp);
 }
 
 /* Returns true if one of the netdevs in 'dp' requires a reconfiguration */
@@ -2977,21 +3862,30 @@ dpif_netdev_run(struct dpif *dpif)
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_pmd_thread *non_pmd;
     uint64_t new_tnl_seq;
+    int process_packets = 0;
 
     ovs_mutex_lock(&dp->port_mutex);
     non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
     if (non_pmd) {
         ovs_mutex_lock(&dp->non_pmd_mutex);
+        cycles_count_start(non_pmd);
         HMAP_FOR_EACH (port, node, &dp->ports) {
             if (!netdev_is_pmd(port->netdev)) {
                 int i;
 
                 for (i = 0; i < port->n_rxq; i++) {
-                    dp_netdev_process_rxq_port(non_pmd, port,
-                                               port->rxqs[i].rxq);
+                    process_packets =
+                        dp_netdev_process_rxq_port(non_pmd,
+                                                   port->rxqs[i].rx,
+                                                   port->port_no);
+                    cycles_count_intermediate(non_pmd, NULL,
+                                              process_packets
+                                              ? PMD_CYCLES_PROCESSING
+                                              : PMD_CYCLES_IDLE);
                 }
             }
         }
+        cycles_count_end(non_pmd, PMD_CYCLES_IDLE);
         dpif_netdev_xps_revalidate_pmd(non_pmd, time_msec(), false);
         ovs_mutex_unlock(&dp->non_pmd_mutex);
 
@@ -2999,7 +3893,7 @@ dpif_netdev_run(struct dpif *dpif)
     }
 
     if (dp_netdev_is_reconf_required(dp) || ports_require_restart(dp)) {
-        reconfigure_pmd_threads(dp);
+        reconfigure_datapath(dp);
     }
     ovs_mutex_unlock(&dp->port_mutex);
 
@@ -3028,7 +3922,7 @@ dpif_netdev_wait(struct dpif *dpif)
             int i;
 
             for (i = 0; i < port->n_rxq; i++) {
-                netdev_rxq_wait(port->rxqs[i].rxq);
+                netdev_rxq_wait(port->rxqs[i].rx);
             }
         }
     }
@@ -3054,7 +3948,9 @@ pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)
 }
 
 /* Copies ports from 'pmd->tx_ports' (shared with the main thread) to
- * 'pmd->port_cache' (thread local) */
+ * thread-local copies. Copy to 'pmd->tnl_port_cache' if it is a tunnel
+ * device, otherwise to 'pmd->send_port_cache' if the port has at least
+ * one txq. */
 static void
 pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
     OVS_REQUIRES(pmd->port_mutex)
@@ -3080,20 +3976,45 @@ pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
     }
 }
 
+static void
+pmd_alloc_static_tx_qid(struct dp_netdev_pmd_thread *pmd)
+{
+    ovs_mutex_lock(&pmd->dp->tx_qid_pool_mutex);
+    if (!id_pool_alloc_id(pmd->dp->tx_qid_pool, &pmd->static_tx_qid)) {
+        VLOG_ABORT("static_tx_qid allocation failed for PMD on core %2d"
+                   ", numa_id %d.", pmd->core_id, pmd->numa_id);
+    }
+    ovs_mutex_unlock(&pmd->dp->tx_qid_pool_mutex);
+
+    VLOG_DBG("static_tx_qid = %d allocated for PMD thread on core %2d"
+             ", numa_id %d.", pmd->static_tx_qid, pmd->core_id, pmd->numa_id);
+}
+
+static void
+pmd_free_static_tx_qid(struct dp_netdev_pmd_thread *pmd)
+{
+    ovs_mutex_lock(&pmd->dp->tx_qid_pool_mutex);
+    id_pool_free_id(pmd->dp->tx_qid_pool, pmd->static_tx_qid);
+    ovs_mutex_unlock(&pmd->dp->tx_qid_pool_mutex);
+}
+
 static int
 pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
-                          struct rxq_poll **ppoll_list)
+                          struct polled_queue **ppoll_list)
 {
-    struct rxq_poll *poll_list = *ppoll_list;
+    struct polled_queue *poll_list = *ppoll_list;
     struct rxq_poll *poll;
     int i;
 
     ovs_mutex_lock(&pmd->port_mutex);
-    poll_list = xrealloc(poll_list, pmd->poll_cnt * sizeof *poll_list);
+    poll_list = xrealloc(poll_list, hmap_count(&pmd->poll_list)
+                                    * sizeof *poll_list);
 
     i = 0;
-    LIST_FOR_EACH (poll, node, &pmd->poll_list) {
-        poll_list[i++] = *poll;
+    HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
+        poll_list[i].rxq = poll->rxq;
+        poll_list[i].port_no = poll->rxq->port->port_no;
+        i++;
     }
 
     pmd_load_cached_ports(pmd);
@@ -3109,10 +4030,11 @@ pmd_thread_main(void *f_)
 {
     struct dp_netdev_pmd_thread *pmd = f_;
     unsigned int lc = 0;
-    struct rxq_poll *poll_list;
+    struct polled_queue *poll_list;
     bool exiting;
     int poll_cnt;
     int i;
+    int process_packets = 0;
 
     poll_list = NULL;
 
@@ -3121,14 +4043,15 @@ pmd_thread_main(void *f_)
     ovs_numa_thread_setaffinity_core(pmd->core_id);
     dpdk_set_lcore_id(pmd->core_id);
     poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
-reload:
     emc_cache_init(&pmd->flow_cache);
+reload:
+    pmd_alloc_static_tx_qid(pmd);
 
     /* List port/core affinity */
     for (i = 0; i < poll_cnt; i++) {
        VLOG_DBG("Core %d processing port \'%s\' with queue-id %d\n",
-                pmd->core_id, netdev_get_name(poll_list[i].port->netdev),
-                netdev_rxq_get_queue_id(poll_list[i].rx));
+                pmd->core_id, netdev_rxq_get_name(poll_list[i].rxq->rx),
+                netdev_rxq_get_queue_id(poll_list[i].rxq->rx));
     }
 
     if (!poll_cnt) {
@@ -3139,9 +4062,15 @@ reload:
         lc = UINT_MAX;
     }
 
+    cycles_count_start(pmd);
     for (;;) {
         for (i = 0; i < poll_cnt; i++) {
-            dp_netdev_process_rxq_port(pmd, poll_list[i].port, poll_list[i].rx);
+            process_packets =
+                dp_netdev_process_rxq_port(pmd, poll_list[i].rxq->rx,
+                                           poll_list[i].port_no);
+            cycles_count_intermediate(pmd, poll_list[i].rxq,
+                                      process_packets ? PMD_CYCLES_PROCESSING
+                                                      : PMD_CYCLES_IDLE);
         }
 
         if (lc++ > 1024) {
@@ -3150,7 +4079,7 @@ reload:
             lc = 0;
 
             coverage_try_clear();
-            dp_netdev_pmd_try_optimize(pmd);
+            dp_netdev_pmd_try_optimize(pmd, poll_list, poll_cnt);
             if (!ovsrcu_try_quiesce()) {
                 emc_cache_slow_sweep(&pmd->flow_cache);
             }
@@ -3162,18 +4091,21 @@ reload:
         }
     }
 
+    cycles_count_end(pmd, PMD_CYCLES_IDLE);
+
     poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
     exiting = latch_is_set(&pmd->exit_latch);
     /* Signal here to make sure the pmd finishes
      * reloading the updated configuration. */
     dp_netdev_pmd_reload_done(pmd);
 
-    emc_cache_uninit(&pmd->flow_cache);
+    pmd_free_static_tx_qid(pmd);
 
     if (!exiting) {
         goto reload;
     }
 
+    emc_cache_uninit(&pmd->flow_cache);
     free(poll_list);
     pmd_free_cached_ports(pmd);
     return NULL;
@@ -3186,6 +4118,298 @@ dp_netdev_disable_upcall(struct dp_netdev *dp)
     fat_rwlock_wrlock(&dp->upcall_rwlock);
 }
 
+\f
+/* Meters */
+static void
+dpif_netdev_meter_get_features(const struct dpif * dpif OVS_UNUSED,
+                               struct ofputil_meter_features *features)
+{
+    features->max_meters = MAX_METERS;
+    features->band_types = DP_SUPPORTED_METER_BAND_TYPES;
+    features->capabilities = DP_SUPPORTED_METER_FLAGS_MASK;
+    features->max_bands = MAX_BANDS;
+    features->max_color = 0;
+}
+
+/* Returns false when packet needs to be dropped. */
+static void
+dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_,
+                    uint32_t meter_id, long long int now)
+{
+    struct dp_meter *meter;
+    struct dp_meter_band *band;
+    struct dp_packet *packet;
+    long long int long_delta_t; /* msec */
+    uint32_t delta_t; /* msec */
+    int i;
+    const size_t cnt = dp_packet_batch_size(packets_);
+    uint32_t bytes, volume;
+    int exceeded_band[NETDEV_MAX_BURST];
+    uint32_t exceeded_rate[NETDEV_MAX_BURST];
+    int exceeded_pkt = cnt; /* First packet that exceeded a band rate. */
+
+    if (meter_id >= MAX_METERS) {
+        return;
+    }
+
+    meter_lock(dp, meter_id);
+    meter = dp->meters[meter_id];
+    if (!meter) {
+        goto out;
+    }
+
+    /* Initialize as negative values. */
+    memset(exceeded_band, 0xff, cnt * sizeof *exceeded_band);
+    /* Initialize as zeroes. */
+    memset(exceeded_rate, 0, cnt * sizeof *exceeded_rate);
+
+    /* All packets will hit the meter at the same time. */
+    long_delta_t = (now - meter->used); /* msec */
+
+    /* Make sure delta_t will not be too large, so that bucket will not
+     * wrap around below. */
+    delta_t = (long_delta_t > (long long int)meter->max_delta_t)
+        ? meter->max_delta_t : (uint32_t)long_delta_t;
+
+    /* Update meter stats. */
+    meter->used = now;
+    meter->packet_count += cnt;
+    bytes = 0;
+    DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
+        bytes += dp_packet_size(packet);
+    }
+    meter->byte_count += bytes;
+
+    /* Meters can operate in terms of packets per second or kilobits per
+     * second. */
+    if (meter->flags & OFPMF13_PKTPS) {
+        /* Rate in packets/second, bucket 1/1000 packets. */
+        /* msec * packets/sec = 1/1000 packets. */
+        volume = cnt * 1000; /* Take 'cnt' packets from the bucket. */
+    } else {
+        /* Rate in kbps, bucket in bits. */
+        /* msec * kbps = bits */
+        volume = bytes * 8;
+    }
+
+    /* Update all bands and find the one hit with the highest rate for each
+     * packet (if any). */
+    for (int m = 0; m < meter->n_bands; ++m) {
+        band = &meter->bands[m];
+
+        /* Update band's bucket. */
+        band->bucket += delta_t * band->up.rate;
+        if (band->bucket > band->up.burst_size) {
+            band->bucket = band->up.burst_size;
+        }
+
+        /* Drain the bucket for all the packets, if possible. */
+        if (band->bucket >= volume) {
+            band->bucket -= volume;
+        } else {
+            int band_exceeded_pkt;
+
+            /* Band limit hit, must process packet-by-packet. */
+            if (meter->flags & OFPMF13_PKTPS) {
+                band_exceeded_pkt = band->bucket / 1000;
+                band->bucket %= 1000; /* Remainder stays in bucket. */
+
+                /* Update the exceeding band for each exceeding packet.
+                 * (Only one band will be fired by a packet, and that
+                 * can be different for each packet.) */
+                for (i = band_exceeded_pkt; i < cnt; i++) {
+                    if (band->up.rate > exceeded_rate[i]) {
+                        exceeded_rate[i] = band->up.rate;
+                        exceeded_band[i] = m;
+                    }
+                }
+            } else {
+                /* Packet sizes differ, must process one-by-one. */
+                band_exceeded_pkt = cnt;
+                DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
+                    uint32_t bits = dp_packet_size(packet) * 8;
+
+                    if (band->bucket >= bits) {
+                        band->bucket -= bits;
+                    } else {
+                        if (i < band_exceeded_pkt) {
+                            band_exceeded_pkt = i;
+                        }
+                        /* Update the exceeding band for the exceeding packet.
+                         * (Only one band will be fired by a packet, and that
+                         * can be different for each packet.) */
+                        if (band->up.rate > exceeded_rate[i]) {
+                            exceeded_rate[i] = band->up.rate;
+                            exceeded_band[i] = m;
+                        }
+                    }
+                }
+            }
+            /* Remember the first exceeding packet. */
+            if (exceeded_pkt > band_exceeded_pkt) {
+                exceeded_pkt = band_exceeded_pkt;
+            }
+        }
+    }
+
+    /* Fire the highest rate band exceeded by each packet.
+     * Drop packets if needed, by swapping packet to the end that will be
+     * ignored. */
+    size_t j;
+    DP_PACKET_BATCH_REFILL_FOR_EACH (j, cnt, packet, packets_) {
+        if (exceeded_band[j] >= 0) {
+            /* Meter drop packet. */
+            band = &meter->bands[exceeded_band[j]];
+            band->packet_count += 1;
+            band->byte_count += dp_packet_size(packet);
+
+            dp_packet_delete(packet);
+        } else {
+            /* Meter accepts packet. */
+            dp_packet_batch_refill(packets_, packet, j);
+        }
+    }
+ out:
+    meter_unlock(dp, meter_id);
+}
+
+/* Meter set/get/del processing is still single-threaded. */
+static int
+dpif_netdev_meter_set(struct dpif *dpif, ofproto_meter_id *meter_id,
+                      struct ofputil_meter_config *config)
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    uint32_t mid = meter_id->uint32;
+    struct dp_meter *meter;
+    int i;
+
+    if (mid >= MAX_METERS) {
+        return EFBIG; /* Meter_id out of range. */
+    }
+
+    if (config->flags & ~DP_SUPPORTED_METER_FLAGS_MASK ||
+        !(config->flags & (OFPMF13_KBPS | OFPMF13_PKTPS))) {
+        return EBADF; /* Unsupported flags set */
+    }
+
+    /* Validate bands */
+    if (config->n_bands == 0 || config->n_bands > MAX_BANDS) {
+        return EINVAL; /* Too many bands */
+    }
+
+    /* Validate rates */
+    for (i = 0; i < config->n_bands; i++) {
+        if (config->bands[i].rate == 0) {
+            return EDOM; /* rate must be non-zero */
+        }
+    }
+
+    for (i = 0; i < config->n_bands; ++i) {
+        switch (config->bands[i].type) {
+        case OFPMBT13_DROP:
+            break;
+        default:
+            return ENODEV; /* Unsupported band type */
+        }
+    }
+
+    /* Allocate meter */
+    meter = xzalloc(sizeof *meter
+                    + config->n_bands * sizeof(struct dp_meter_band));
+    if (meter) {
+        meter->flags = config->flags;
+        meter->n_bands = config->n_bands;
+        meter->max_delta_t = 0;
+        meter->used = time_msec();
+
+        /* set up bands */
+        for (i = 0; i < config->n_bands; ++i) {
+            uint32_t band_max_delta_t;
+
+            /* Set burst size to a workable value if none specified. */
+            if (config->bands[i].burst_size == 0) {
+                config->bands[i].burst_size = config->bands[i].rate;
+            }
+
+            meter->bands[i].up = config->bands[i];
+            /* Convert burst size to the bucket units: */
+            /* pkts => 1/1000 packets, kilobits => bits. */
+            meter->bands[i].up.burst_size *= 1000;
+            /* Initialize bucket to empty. */
+            meter->bands[i].bucket = 0;
+
+            /* Figure out max delta_t that is enough to fill any bucket. */
+            band_max_delta_t
+                = meter->bands[i].up.burst_size / meter->bands[i].up.rate;
+            if (band_max_delta_t > meter->max_delta_t) {
+                meter->max_delta_t = band_max_delta_t;
+            }
+        }
+
+        meter_lock(dp, mid);
+        dp_delete_meter(dp, mid); /* Free existing meter, if any */
+        dp->meters[mid] = meter;
+        meter_unlock(dp, mid);
+
+        return 0;
+    }
+    return ENOMEM;
+}
+
+static int
+dpif_netdev_meter_get(const struct dpif *dpif,
+                      ofproto_meter_id meter_id_,
+                      struct ofputil_meter_stats *stats, uint16_t n_bands)
+{
+    const struct dp_netdev *dp = get_dp_netdev(dpif);
+    const struct dp_meter *meter;
+    uint32_t meter_id = meter_id_.uint32;
+
+    if (meter_id >= MAX_METERS) {
+        return EFBIG;
+    }
+    meter = dp->meters[meter_id];
+    if (!meter) {
+        return ENOENT;
+    }
+    if (stats) {
+        int i = 0;
+
+        meter_lock(dp, meter_id);
+        stats->packet_in_count = meter->packet_count;
+        stats->byte_in_count = meter->byte_count;
+
+        for (i = 0; i < n_bands && i < meter->n_bands; ++i) {
+            stats->bands[i].packet_count = meter->bands[i].packet_count;
+            stats->bands[i].byte_count = meter->bands[i].byte_count;
+        }
+        meter_unlock(dp, meter_id);
+
+        stats->n_bands = i;
+    }
+    return 0;
+}
+
+static int
+dpif_netdev_meter_del(struct dpif *dpif,
+                      ofproto_meter_id meter_id_,
+                      struct ofputil_meter_stats *stats, uint16_t n_bands)
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    int error;
+
+    error = dpif_netdev_meter_get(dpif, meter_id_, stats, n_bands);
+    if (!error) {
+        uint32_t meter_id = meter_id_.uint32;
+
+        meter_lock(dp, meter_id);
+        dp_delete_meter(dp, meter_id);
+        meter_unlock(dp, meter_id);
+    }
+    return error;
+}
+
+\f
 static void
 dpif_netdev_disable_upcall(struct dpif *dpif)
     OVS_NO_THREAD_SAFETY_ANALYSIS
@@ -3245,16 +4469,9 @@ dp_netdev_set_nonpmd(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex)
 {
     struct dp_netdev_pmd_thread *non_pmd;
-    struct dp_netdev_port *port;
 
     non_pmd = xzalloc(sizeof *non_pmd);
     dp_netdev_configure_pmd(non_pmd, dp, NON_PMD_CORE_ID, OVS_NUMA_UNSPEC);
-
-    HMAP_FOR_EACH (port, node, &dp->ports) {
-        dp_netdev_add_port_tx_to_pmd(non_pmd, port);
-    }
-
-    dp_netdev_reload_pmd__(non_pmd);
 }
 
 /* Caller must have valid pointer to 'pmd'. */
@@ -3300,12 +4517,7 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     pmd->dp = dp;
     pmd->core_id = core_id;
     pmd->numa_id = numa_id;
-    pmd->poll_cnt = 0;
-
-    atomic_init(&pmd->static_tx_qid,
-                (core_id == NON_PMD_CORE_ID)
-                ? ovs_numa_get_n_cores()
-                : get_n_pmd_threads(dp));
+    pmd->need_reload = false;
 
     ovs_refcount_init(&pmd->ref_cnt);
     latch_init(&pmd->exit_latch);
@@ -3319,7 +4531,8 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     cmap_init(&pmd->flow_table);
     cmap_init(&pmd->classifiers);
     pmd->next_optimization = time_msec() + DPCLS_OPTIMIZATION_INTERVAL;
-    ovs_list_init(&pmd->poll_list);
+    pmd->rxq_next_cycle_store = time_msec() + PMD_RXQ_INTERVAL_LEN;
+    hmap_init(&pmd->poll_list);
     hmap_init(&pmd->tx_ports);
     hmap_init(&pmd->tnl_port_cache);
     hmap_init(&pmd->send_port_cache);
@@ -3327,6 +4540,7 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
      * actual thread created for NON_PMD_CORE_ID. */
     if (core_id == NON_PMD_CORE_ID) {
         emc_cache_init(&pmd->flow_cache);
+        pmd_alloc_static_tx_qid(pmd);
     }
     cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
                 hash_int(core_id, 0));
@@ -3341,6 +4555,7 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
     hmap_destroy(&pmd->send_port_cache);
     hmap_destroy(&pmd->tnl_port_cache);
     hmap_destroy(&pmd->tx_ports);
+    hmap_destroy(&pmd->poll_list);
     /* All flows (including their dpcls_rules) have been deleted already */
     CMAP_FOR_EACH (cls, node, &pmd->classifiers) {
         dpcls_destroy(cls);
@@ -3368,11 +4583,11 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
         ovs_mutex_lock(&dp->non_pmd_mutex);
         emc_cache_uninit(&pmd->flow_cache);
         pmd_free_cached_ports(pmd);
+        pmd_free_static_tx_qid(pmd);
         ovs_mutex_unlock(&dp->non_pmd_mutex);
     } else {
         latch_set(&pmd->exit_latch);
         dp_netdev_reload_pmd__(pmd);
-        ovs_numa_unpin_core(pmd->core_id);
         xpthread_join(pmd->thread, NULL);
     }
 
@@ -3387,20 +4602,20 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
     dp_netdev_pmd_unref(pmd);
 }
 
-/* Destroys all pmd threads, but not the non pmd thread. */
+/* Destroys all pmd threads. If 'non_pmd' is true it also destroys the non pmd
+ * thread. */
 static void
-dp_netdev_stop_pmds(struct dp_netdev *dp)
+dp_netdev_destroy_all_pmds(struct dp_netdev *dp, bool non_pmd)
 {
     struct dp_netdev_pmd_thread *pmd;
     struct dp_netdev_pmd_thread **pmd_list;
     size_t k = 0, n_pmds;
 
-    n_pmds = get_n_pmd_threads(dp);
+    n_pmds = cmap_count(&dp->poll_threads);
     pmd_list = xcalloc(n_pmds, sizeof *pmd_list);
 
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        /* We don't need to destroy the non pmd thread */
-        if (pmd->core_id == NON_PMD_CORE_ID) {
+        if (!non_pmd && pmd->core_id == NON_PMD_CORE_ID) {
             continue;
         }
         /* We cannot call dp_netdev_del_pmd(), since it alters
@@ -3416,32 +4631,6 @@ dp_netdev_stop_pmds(struct dp_netdev *dp)
     free(pmd_list);
 }
 
-/* Destroys all pmd threads, including the non pmd thread. */
-static void
-dp_netdev_destroy_all_pmds(struct dp_netdev *dp)
-{
-    struct dp_netdev_pmd_thread *pmd;
-    struct dp_netdev_pmd_thread **pmd_list;
-    size_t k = 0, n_pmds;
-
-    n_pmds = cmap_count(&dp->poll_threads);
-    pmd_list = xcalloc(n_pmds, sizeof *pmd_list);
-
-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        /* We cannot call dp_netdev_del_pmd(), since it alters
-         * 'dp->poll_threads' (while we're iterating it) and it
-         * might quiesce. */
-        ovs_assert(k < n_pmds);
-        pmd_list[k++] = pmd;
-    }
-
-    for (size_t i = 0; i < k; i++) {
-        dp_netdev_del_pmd(dp, pmd_list[i]);
-    }
-
-    free(pmd_list);
-}
-
 /* Deletes all rx queues from pmd->poll_list and all the ports from
  * pmd->tx_ports. */
 static void
@@ -3451,137 +4640,49 @@ dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd)
     struct tx_port *port;
 
     ovs_mutex_lock(&pmd->port_mutex);
-    LIST_FOR_EACH_POP (poll, node, &pmd->poll_list) {
+    HMAP_FOR_EACH_POP (poll, node, &pmd->poll_list) {
         free(poll);
     }
-    pmd->poll_cnt = 0;
     HMAP_FOR_EACH_POP (port, node, &pmd->tx_ports) {
         free(port);
     }
     ovs_mutex_unlock(&pmd->port_mutex);
 }
 
-static struct tx_port *
-tx_port_lookup(const struct hmap *hmap, odp_port_t port_no)
-{
-    struct tx_port *tx;
-
-    HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) {
-        if (tx->port->port_no == port_no) {
-            return tx;
-        }
-    }
-
-    return NULL;
-}
-
-/* Deletes all rx queues of 'port' from 'poll_list', and the 'port' from
- * 'tx_ports' of 'pmd' thread.  Returns true if 'port' was found in 'pmd'
- * (therefore a restart is required). */
-static bool
-dp_netdev_del_port_from_pmd__(struct dp_netdev_port *port,
-                              struct dp_netdev_pmd_thread *pmd)
-{
-    struct rxq_poll *poll, *next;
-    struct tx_port *tx;
-    bool found = false;
-
-    ovs_mutex_lock(&pmd->port_mutex);
-    LIST_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
-        if (poll->port == port) {
-            found = true;
-            ovs_list_remove(&poll->node);
-            pmd->poll_cnt--;
-            free(poll);
-        }
-    }
-
-    tx = tx_port_lookup(&pmd->tx_ports, port->port_no);
-    if (tx) {
-        hmap_remove(&pmd->tx_ports, &tx->node);
-        free(tx);
-        found = true;
-    }
-    ovs_mutex_unlock(&pmd->port_mutex);
-
-    return found;
-}
-
-/* Deletes 'port' from the 'poll_list' and from the 'tx_ports' of all the pmd
- * threads.  The pmd threads that need to be restarted are inserted in
- * 'to_reload'. */
+/* Adds rx queue to poll_list of PMD thread, if it's not there already. */
 static void
-dp_netdev_del_port_from_all_pmds__(struct dp_netdev *dp,
-                                   struct dp_netdev_port *port,
-                                   struct hmapx *to_reload)
+dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
+                         struct dp_netdev_rxq *rxq)
+    OVS_REQUIRES(pmd->port_mutex)
 {
-    struct dp_netdev_pmd_thread *pmd;
-
-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        bool found;
-
-        found = dp_netdev_del_port_from_pmd__(port, pmd);
+    int qid = netdev_rxq_get_queue_id(rxq->rx);
+    uint32_t hash = hash_2words(odp_to_u32(rxq->port->port_no), qid);
+    struct rxq_poll *poll;
 
-        if (found) {
-            hmapx_add(to_reload, pmd);
+    HMAP_FOR_EACH_WITH_HASH (poll, node, hash, &pmd->poll_list) {
+        if (poll->rxq == rxq) {
+            /* 'rxq' is already polled by this thread. Do nothing. */
+            return;
         }
     }
-}
-
-/* Deletes 'port' from the 'poll_list' and from the 'tx_ports' of all the pmd
- * threads. Reloads the threads if needed. */
-static void
-dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
-                                 struct dp_netdev_port *port)
-{
-    struct dp_netdev_pmd_thread *pmd;
-    struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
-    struct hmapx_node *node;
-
-    dp_netdev_del_port_from_all_pmds__(dp, port, &to_reload);
-
-    HMAPX_FOR_EACH (node, &to_reload) {
-        pmd = (struct dp_netdev_pmd_thread *) node->data;
-        dp_netdev_reload_pmd__(pmd);
-    }
-
-    hmapx_destroy(&to_reload);
-}
-
-
-/* Returns non-isolated PMD thread from this numa node with fewer
- * rx queues to poll. Returns NULL if there is no non-isolated  PMD threads
- * on this numa node. Can be called safely only by main thread. */
-static struct dp_netdev_pmd_thread *
-dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
-{
-    int min_cnt = -1;
-    struct dp_netdev_pmd_thread *pmd, *res = NULL;
 
-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        if (!pmd->isolated && pmd->numa_id == numa_id
-            && (min_cnt > pmd->poll_cnt || res == NULL)) {
-            min_cnt = pmd->poll_cnt;
-            res = pmd;
-        }
-    }
+    poll = xmalloc(sizeof *poll);
+    poll->rxq = rxq;
+    hmap_insert(&pmd->poll_list, &poll->node, hash);
 
-    return res;
+    pmd->need_reload = true;
 }
 
-/* Adds rx queue to poll_list of PMD thread. */
+/* Delete 'poll' from poll_list of PMD thread. */
 static void
-dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
-                         struct dp_netdev_port *port, struct netdev_rxq *rx)
+dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
+                           struct rxq_poll *poll)
     OVS_REQUIRES(pmd->port_mutex)
 {
-    struct rxq_poll *poll = xmalloc(sizeof *poll);
+    hmap_remove(&pmd->poll_list, &poll->node);
+    free(poll);
 
-    poll->port = port;
-    poll->rx = rx;
-
-    ovs_list_push_back(&pmd->poll_list, &poll->node);
-    pmd->poll_cnt++;
+    pmd->need_reload = true;
 }
 
 /* Add 'port' to the tx port cache of 'pmd', which must be reloaded for the
@@ -3589,190 +4690,37 @@ dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
 static void
 dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
                              struct dp_netdev_port *port)
+    OVS_REQUIRES(pmd->port_mutex)
 {
     struct tx_port *tx;
 
+    tx = tx_port_lookup(&pmd->tx_ports, port->port_no);
+    if (tx) {
+        /* 'port' is already on this thread tx cache. Do nothing. */
+        return;
+    }
+
     tx = xzalloc(sizeof *tx);
 
     tx->port = port;
     tx->qid = -1;
 
-    ovs_mutex_lock(&pmd->port_mutex);
     hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
-    ovs_mutex_unlock(&pmd->port_mutex);
-}
-
-/* Distribute all {pinned|non-pinned} rx queues of 'port' between PMD
- * threads in 'dp'. The pmd threads that need to be restarted are inserted
- * in 'to_reload'. PMD threads with pinned queues marked as isolated. */
-static void
-dp_netdev_add_port_rx_to_pmds(struct dp_netdev *dp,
-                              struct dp_netdev_port *port,
-                              struct hmapx *to_reload, bool pinned)
-{
-    int numa_id = netdev_get_numa_id(port->netdev);
-    struct dp_netdev_pmd_thread *pmd;
-    int i;
-
-    if (!netdev_is_pmd(port->netdev)) {
-        return;
-    }
-
-    for (i = 0; i < port->n_rxq; i++) {
-        if (pinned) {
-            if (port->rxqs[i].core_id == OVS_CORE_UNSPEC) {
-                continue;
-            }
-            pmd = dp_netdev_get_pmd(dp, port->rxqs[i].core_id);
-            if (!pmd) {
-                VLOG_WARN("There is no PMD thread on core %d. "
-                          "Queue %d on port \'%s\' will not be polled.",
-                          port->rxqs[i].core_id, i,
-                          netdev_get_name(port->netdev));
-                continue;
-            }
-            pmd->isolated = true;
-            dp_netdev_pmd_unref(pmd);
-        } else {
-            if (port->rxqs[i].core_id != OVS_CORE_UNSPEC) {
-                continue;
-            }
-            pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
-            if (!pmd) {
-                VLOG_WARN("There's no available pmd thread on numa node %d",
-                          numa_id);
-                break;
-            }
-        }
-
-        ovs_mutex_lock(&pmd->port_mutex);
-        dp_netdev_add_rxq_to_pmd(pmd, port, port->rxqs[i].rxq);
-        ovs_mutex_unlock(&pmd->port_mutex);
-
-        hmapx_add(to_reload, pmd);
-    }
-}
-
-/* Distributes all non-pinned rx queues of 'port' between all PMD threads
- * in 'dp' and inserts 'port' in the PMD threads 'tx_ports'. The pmd threads
- * that need to be restarted are inserted in 'to_reload'. */
-static void
-dp_netdev_add_port_to_pmds__(struct dp_netdev *dp, struct dp_netdev_port *port,
-                             struct hmapx *to_reload)
-{
-    struct dp_netdev_pmd_thread *pmd;
-
-    dp_netdev_add_port_rx_to_pmds(dp, port, to_reload, false);
-
-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        dp_netdev_add_port_tx_to_pmd(pmd, port);
-        hmapx_add(to_reload, pmd);
-    }
-}
-
-/* Distributes all non-pinned rx queues of 'port' between all PMD threads
- * in 'dp', inserts 'port' in the PMD threads 'tx_ports' and reloads them,
- * if needed. */
-static void
-dp_netdev_add_port_to_pmds(struct dp_netdev *dp, struct dp_netdev_port *port)
-{
-    struct dp_netdev_pmd_thread *pmd;
-    struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
-    struct hmapx_node *node;
-
-    dp_netdev_add_port_to_pmds__(dp, port, &to_reload);
-
-    HMAPX_FOR_EACH (node, &to_reload) {
-        pmd = (struct dp_netdev_pmd_thread *) node->data;
-        dp_netdev_reload_pmd__(pmd);
-    }
-
-    hmapx_destroy(&to_reload);
-}
-
-static void
-dp_netdev_start_pmds_on_numa(struct dp_netdev *dp, int numa_id)
-{
-    int can_have, n_unpinned, i;
-
-    n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
-    if (!n_unpinned) {
-        VLOG_WARN("Cannot create pmd threads due to out of unpinned "
-                  "cores on numa node %d", numa_id);
-        return;
-    }
-
-    /* If cpu mask is specified, uses all unpinned cores, otherwise
-     * tries creating NR_PMD_THREADS pmd threads. */
-    can_have = dp->pmd_cmask ? n_unpinned : MIN(n_unpinned, NR_PMD_THREADS);
-    for (i = 0; i < can_have; i++) {
-        unsigned core_id = ovs_numa_get_unpinned_core_on_numa(numa_id);
-        struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
-        struct dp_netdev_port *port;
-
-        dp_netdev_configure_pmd(pmd, dp, core_id, numa_id);
-
-        HMAP_FOR_EACH (port, node, &dp->ports) {
-            dp_netdev_add_port_tx_to_pmd(pmd, port);
-        }
-
-        pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
-    }
-    VLOG_INFO("Created %d pmd threads on numa node %d", can_have, numa_id);
+    pmd->need_reload = true;
 }
 
-/* Starts pmd threads, if not already started. The function takes care of
- * filling the threads tx port cache. */
+/* Del 'tx' from the tx port cache of 'pmd', which must be reloaded for the
+ * changes to take effect. */
 static void
-dp_netdev_start_pmds(struct dp_netdev *dp)
-    OVS_REQUIRES(dp->port_mutex)
+dp_netdev_del_port_tx_from_pmd(struct dp_netdev_pmd_thread *pmd,
+                               struct tx_port *tx)
+    OVS_REQUIRES(pmd->port_mutex)
 {
-    int n_pmds;
-
-    n_pmds = get_n_pmd_threads(dp);
-
-    /* If there are already pmd threads created for the datapath, do nothing.
-     * Else, creates the pmd threads. */
-    if (!n_pmds) {
-        int n_numas = ovs_numa_get_n_numas();
-
-        for (int numa_id = 0; numa_id < n_numas; numa_id++) {
-            dp_netdev_start_pmds_on_numa(dp, numa_id);
-        }
-    }
+    hmap_remove(&pmd->tx_ports, &tx->node);
+    free(tx);
+    pmd->need_reload = true;
 }
-
 \f
-/* Called after pmd threads config change.  Restarts pmd threads with
- * new configuration. */
-static void
-dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
-    OVS_REQUIRES(dp->port_mutex)
-{
-    struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
-    struct dp_netdev_pmd_thread *pmd;
-    struct dp_netdev_port *port;
-    struct hmapx_node *node;
-
-    dp_netdev_start_pmds(dp);
-    /* Distribute only pinned rx queues first to mark threads as isolated */
-    HMAP_FOR_EACH (port, node, &dp->ports) {
-        dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload, true);
-    }
-
-    /* Distribute remaining non-pinned rx queues to non-isolated PMD threads. */
-    HMAP_FOR_EACH (port, node, &dp->ports) {
-        dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload, false);
-    }
-
-    HMAPX_FOR_EACH (node, &to_reload) {
-        pmd = (struct dp_netdev_pmd_thread *) node->data;
-        dp_netdev_reload_pmd__(pmd);
-    }
-
-    hmapx_destroy(&to_reload);
-}
-
 static char *
 dpif_netdev_get_datapath_version(void)
 {
@@ -3824,8 +4772,7 @@ dp_netdev_upcall(struct dp_netdev_pmd_thread *pmd, struct dp_packet *packet_,
 
         ofpbuf_init(&key, 0);
         odp_flow_key_from_flow(&odp_parms, &key);
-        packet_str = ofp_packet_to_string(dp_packet_data(packet_),
-                                          dp_packet_size(packet_));
+        packet_str = ofp_dp_packet_to_string(packet_);
 
         odp_flow_key_format(key.data, key.size, &ds);
 
@@ -3842,6 +4789,22 @@ dp_netdev_upcall(struct dp_netdev_pmd_thread *pmd, struct dp_packet *packet_,
                          actions, wc, put_actions, dp->upcall_aux);
 }
 
+static inline uint32_t
+dpif_netdev_packet_get_rss_hash_orig_pkt(struct dp_packet *packet,
+                                const struct miniflow *mf)
+{
+    uint32_t hash;
+
+    if (OVS_LIKELY(dp_packet_rss_valid(packet))) {
+        hash = dp_packet_get_rss_hash(packet);
+    } else {
+        hash = miniflow_hash_5tuple(mf, 0);
+        dp_packet_set_rss_hash(packet, hash);
+    }
+
+    return hash;
+}
+
 static inline uint32_t
 dpif_netdev_packet_get_rss_hash(struct dp_packet *packet,
                                 const struct miniflow *mf)
@@ -3915,7 +4878,8 @@ packet_batch_per_flow_execute(struct packet_batch_per_flow *batch,
 static inline void
 dp_netdev_queue_batches(struct dp_packet *pkt,
                         struct dp_netdev_flow *flow, const struct miniflow *mf,
-                        struct packet_batch_per_flow *batches, size_t *n_batches)
+                        struct packet_batch_per_flow *batches,
+                        size_t *n_batches)
 {
     struct packet_batch_per_flow *batch = flow->batch;
 
@@ -3935,24 +4899,31 @@ dp_netdev_queue_batches(struct dp_packet *pkt,
  * The function returns the number of packets that needs to be processed in the
  * 'packets' array (they have been moved to the beginning of the vector).
  *
- * If 'md_is_valid' is false, the metadata in 'packets' is not valid and must be
- * initialized by this function using 'port_no'.
+ * For performance reasons a caller may choose not to initialize the metadata
+ * in 'packets_'.  If 'md_is_valid' is false, the metadata in 'packets'
+ * is not valid and must be initialized by this function using 'port_no'.
+ * If 'md_is_valid' is true, the metadata is already valid and 'port_no'
+ * will be ignored.
  */
 static inline size_t
-emc_processing(struct dp_netdev_pmd_thread *pmd, struct dp_packet_batch *packets_,
+emc_processing(struct dp_netdev_pmd_thread *pmd,
+               struct dp_packet_batch *packets_,
                struct netdev_flow_key *keys,
                struct packet_batch_per_flow batches[], size_t *n_batches,
                bool md_is_valid, odp_port_t port_no)
 {
     struct emc_cache *flow_cache = &pmd->flow_cache;
     struct netdev_flow_key *key = &keys[0];
-    size_t i, n_missed = 0, n_dropped = 0;
-    struct dp_packet **packets = packets_->packets;
-    int cnt = packets_->count;
+    size_t n_missed = 0, n_dropped = 0;
+    struct dp_packet *packet;
+    const size_t cnt = dp_packet_batch_size(packets_);
+    uint32_t cur_min;
+    int i;
 
-    for (i = 0; i < cnt; i++) {
+    atomic_read_relaxed(&pmd->dp->emc_insert_min, &cur_min);
+
+    DP_PACKET_BATCH_REFILL_FOR_EACH (i, cnt, packet, packets_) {
         struct dp_netdev_flow *flow;
-        struct dp_packet *packet = packets[i];
 
         if (OVS_UNLIKELY(dp_packet_size(packet) < ETH_HEADER_LEN)) {
             dp_packet_delete(packet);
@@ -3961,6 +4932,7 @@ emc_processing(struct dp_netdev_pmd_thread *pmd, struct dp_packet_batch *packets
         }
 
         if (i != cnt - 1) {
+            struct dp_packet **packets = packets_->packets;
             /* Prefetch next packet data and metadata. */
             OVS_PREFETCH(dp_packet_data(packets[i+1]));
             pkt_metadata_prefetch_init(&packets[i+1]->md);
@@ -3971,16 +4943,25 @@ emc_processing(struct dp_netdev_pmd_thread *pmd, struct dp_packet_batch *packets
         }
         miniflow_extract(packet, &key->mf);
         key->len = 0; /* Not computed yet. */
-        key->hash = dpif_netdev_packet_get_rss_hash(packet, &key->mf);
-
-        flow = emc_lookup(flow_cache, key);
+        /* If EMC is disabled skip hash computation and emc_lookup */
+        if (cur_min) {
+            if (!md_is_valid) {
+                key->hash = dpif_netdev_packet_get_rss_hash_orig_pkt(packet,
+                        &key->mf);
+            } else {
+                key->hash = dpif_netdev_packet_get_rss_hash(packet, &key->mf);
+            }
+            flow = emc_lookup(flow_cache, key);
+        } else {
+            flow = NULL;
+        }
         if (OVS_LIKELY(flow)) {
             dp_netdev_queue_batches(packet, flow, &key->mf, batches,
                                     n_batches);
         } else {
             /* Exact match cache missed. Group missed packets together at
-             * the beginning of the 'packets' array.  */
-            packets[n_missed] = packet;
+             * the beginning of the 'packets' array. */
+            dp_packet_batch_refill(packets_, packet, i);
             /* 'key[n_missed]' contains the key of the current packet and it
              * must be returned to the caller. The next key should be extracted
              * to 'keys[n_missed + 1]'. */
@@ -3988,13 +4969,15 @@ emc_processing(struct dp_netdev_pmd_thread *pmd, struct dp_packet_batch *packets
         }
     }
 
-    dp_netdev_count_packet(pmd, DP_STAT_EXACT_HIT, cnt - n_dropped - n_missed);
+    dp_netdev_count_packet(pmd, DP_STAT_EXACT_HIT,
+                           cnt - n_dropped - n_missed);
 
-    return n_missed;
+    return dp_packet_batch_size(packets_);
 }
 
 static inline void
-handle_packet_upcall(struct dp_netdev_pmd_thread *pmd, struct dp_packet *packet,
+handle_packet_upcall(struct dp_netdev_pmd_thread *pmd,
+                     struct dp_packet *packet,
                      const struct netdev_flow_key *key,
                      struct ofpbuf *actions, struct ofpbuf *put_actions,
                      int *lost_cnt, long long now)
@@ -4027,14 +5010,14 @@ handle_packet_upcall(struct dp_netdev_pmd_thread *pmd, struct dp_packet *packet,
      * VLAN.  Unless we refactor a lot of code that translates between
      * Netlink and struct flow representations, we have to do the same
      * here. */
-    if (!match.wc.masks.vlan_tci) {
-        match.wc.masks.vlan_tci = htons(0xffff);
+    if (!match.wc.masks.vlans[0].tci) {
+        match.wc.masks.vlans[0].tci = htons(0xffff);
     }
 
     /* We can't allow the packet batching in the next loop to execute
      * the actions.  Otherwise, if there are any slow path actions,
      * we'll send the packet up twice. */
-    packet_batch_init_packet(&b, packet);
+    dp_packet_batch_init_packet(&b, packet);
     dp_netdev_execute_actions(pmd, &b, true, &match.flow,
                               actions->data, actions->size, now);
 
@@ -4056,8 +5039,7 @@ handle_packet_upcall(struct dp_netdev_pmd_thread *pmd, struct dp_packet *packet,
                                              add_actions->size);
         }
         ovs_mutex_unlock(&pmd->flow_mutex);
-
-        emc_insert(&pmd->flow_cache, key, netdev_flow);
+        emc_probabilistic_insert(pmd, key, netdev_flow);
     }
 }
 
@@ -4069,18 +5051,17 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
                      odp_port_t in_port,
                      long long now)
 {
-    int cnt = packets_->count;
+    const size_t cnt = dp_packet_batch_size(packets_);
 #if !defined(__CHECKER__) && !defined(_WIN32)
     const size_t PKT_ARRAY_SIZE = cnt;
 #else
     /* Sparse or MSVC doesn't like variable length array. */
     enum { PKT_ARRAY_SIZE = NETDEV_MAX_BURST };
 #endif
-    struct dp_packet **packets = packets_->packets;
+    struct dp_packet *packet;
     struct dpcls *cls;
     struct dpcls_rule *rules[PKT_ARRAY_SIZE];
     struct dp_netdev *dp = pmd->dp;
-    struct emc_cache *flow_cache = &pmd->flow_cache;
     int miss_cnt = 0, lost_cnt = 0;
     int lookup_cnt = 0, add_lookup_cnt;
     bool any_miss;
@@ -4105,7 +5086,7 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
         ofpbuf_use_stub(&actions, actions_stub, sizeof actions_stub);
         ofpbuf_use_stub(&put_actions, slow_stub, sizeof slow_stub);
 
-        for (i = 0; i < cnt; i++) {
+        DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
             struct dp_netdev_flow *netdev_flow;
 
             if (OVS_LIKELY(rules[i])) {
@@ -4124,26 +5105,24 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
             }
 
             miss_cnt++;
-            handle_packet_upcall(pmd, packets[i], &keys[i], &actions,
+            handle_packet_upcall(pmd, packet, &keys[i], &actions,
                                  &put_actions, &lost_cnt, now);
         }
 
         ofpbuf_uninit(&actions);
         ofpbuf_uninit(&put_actions);
         fat_rwlock_unlock(&dp->upcall_rwlock);
-        dp_netdev_count_packet(pmd, DP_STAT_LOST, lost_cnt);
     } else if (OVS_UNLIKELY(any_miss)) {
-        for (i = 0; i < cnt; i++) {
+        DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
             if (OVS_UNLIKELY(!rules[i])) {
-                dp_packet_delete(packets[i]);
+                dp_packet_delete(packet);
                 lost_cnt++;
                 miss_cnt++;
             }
         }
     }
 
-    for (i = 0; i < cnt; i++) {
-        struct dp_packet *packet = packets[i];
+    DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
         struct dp_netdev_flow *flow;
 
         if (OVS_UNLIKELY(!rules[i])) {
@@ -4152,7 +5131,7 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
 
         flow = dp_netdev_flow_cast(rules[i]);
 
-        emc_insert(flow_cache, &keys[i], flow);
+        emc_probabilistic_insert(pmd, &keys[i], flow);
         dp_netdev_queue_batches(packet, flow, &keys[i].mf, batches, n_batches);
     }
 
@@ -4164,36 +5143,34 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
 
 /* Packets enter the datapath from a port (or from recirculation) here.
  *
- * For performance reasons a caller may choose not to initialize the metadata
- * in 'packets': in this case 'mdinit' is false and this function needs to
- * initialize it using 'port_no'.  If the metadata in 'packets' is already
- * valid, 'md_is_valid' must be true and 'port_no' will be ignored. */
+ * When 'md_is_valid' is true the metadata in 'packets' are already valid.
+ * When false the metadata in 'packets' need to be initialized. */
 static void
 dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
                   struct dp_packet_batch *packets,
                   bool md_is_valid, odp_port_t port_no)
 {
-    int cnt = packets->count;
 #if !defined(__CHECKER__) && !defined(_WIN32)
-    const size_t PKT_ARRAY_SIZE = cnt;
+    const size_t PKT_ARRAY_SIZE = dp_packet_batch_size(packets);
 #else
     /* Sparse or MSVC doesn't like variable length array. */
     enum { PKT_ARRAY_SIZE = NETDEV_MAX_BURST };
 #endif
-    OVS_ALIGNED_VAR(CACHE_LINE_SIZE) struct netdev_flow_key keys[PKT_ARRAY_SIZE];
+    OVS_ALIGNED_VAR(CACHE_LINE_SIZE)
+        struct netdev_flow_key keys[PKT_ARRAY_SIZE];
     struct packet_batch_per_flow batches[PKT_ARRAY_SIZE];
     long long now = time_msec();
-    size_t newcnt, n_batches, i;
+    size_t n_batches;
     odp_port_t in_port;
 
     n_batches = 0;
-    newcnt = emc_processing(pmd, packets, keys, batches, &n_batches,
+    emc_processing(pmd, packets, keys, batches, &n_batches,
                             md_is_valid, port_no);
-    if (OVS_UNLIKELY(newcnt)) {
-        packets->count = newcnt;
+    if (!dp_packet_batch_is_empty(packets)) {
         /* Get ingress port from first packet's metadata. */
         in_port = packets->packets[0]->md.in_port.odp_port;
-        fast_path_processing(pmd, packets, keys, batches, &n_batches, in_port, now);
+        fast_path_processing(pmd, packets, keys, batches, &n_batches,
+                             in_port, now);
     }
 
     /* All the flow batches need to be reset before any call to
@@ -4205,6 +5182,7 @@ dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
      * already its own batches[k] still waiting to be served.  So if its
      * ‘batch’ member is not reset, the recirculated packet would be wrongly
      * appended to batches[k] of the 1st call to dp_netdev_input__(). */
+    size_t i;
     for (i = 0; i < n_batches; i++) {
         batches[i].flow->batch = NULL;
     }
@@ -4349,7 +5327,7 @@ push_tnl_action(const struct dp_netdev_pmd_thread *pmd,
 
     data = nl_attr_get(attr);
 
-    tun_port = pmd_tnl_port_cache_lookup(pmd, u32_to_odp(data->tnl_port));
+    tun_port = pmd_tnl_port_cache_lookup(pmd, data->tnl_port);
     if (!tun_port) {
         err = -EINVAL;
         goto error;
@@ -4379,7 +5357,7 @@ dp_execute_userspace_action(struct dp_netdev_pmd_thread *pmd,
                              DPIF_UC_ACTION, userdata, actions,
                              NULL);
     if (!error || error == ENOSPC) {
-        packet_batch_init_packet(&b, packet);
+        dp_packet_batch_init_packet(&b, packet);
         dp_netdev_execute_actions(pmd, &b, may_steal, flow,
                                   actions->data, actions->size, now);
     } else if (may_steal) {
@@ -4390,6 +5368,7 @@ dp_execute_userspace_action(struct dp_netdev_pmd_thread *pmd,
 static void
 dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
               const struct nlattr *a, bool may_steal)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
 {
     struct dp_netdev_execute_aux *aux = aux_;
     uint32_t *depth = recirc_depth_get();
@@ -4410,7 +5389,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
             if (dynamic_txqs) {
                 tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, now);
             } else {
-                atomic_read_relaxed(&pmd->static_tx_qid, &tx_qid);
+                tx_qid = pmd->static_tx_qid;
             }
 
             netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
@@ -4421,24 +5400,8 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
 
     case OVS_ACTION_ATTR_TUNNEL_PUSH:
         if (*depth < MAX_RECIRC_DEPTH) {
-            struct dp_packet_batch tnl_pkt;
-            struct dp_packet_batch *orig_packets_ = packets_;
-            int err;
-
-            if (!may_steal) {
-                dp_packet_batch_clone(&tnl_pkt, packets_);
-                packets_ = &tnl_pkt;
-                dp_packet_batch_reset_cutlen(orig_packets_);
-            }
-
             dp_packet_batch_apply_cutlen(packets_);
-
-            err = push_tnl_action(pmd, a, packets_);
-            if (!err) {
-                (*depth)++;
-                dp_netdev_recirculate(pmd, packets_);
-                (*depth)--;
-            }
+            push_tnl_action(pmd, a, packets_);
             return;
         }
         break;
@@ -4451,7 +5414,6 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
             p = pmd_tnl_port_cache_lookup(pmd, portno);
             if (p) {
                 struct dp_packet_batch tnl_pkt;
-                int i;
 
                 if (!may_steal) {
                     dp_packet_batch_clone(&tnl_pkt, packets_);
@@ -4462,12 +5424,13 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
                 dp_packet_batch_apply_cutlen(packets_);
 
                 netdev_pop_header(p->port->netdev, packets_);
-                if (!packets_->count) {
+                if (dp_packet_batch_is_empty(packets_)) {
                     return;
                 }
 
-                for (i = 0; i < packets_->count; i++) {
-                    packets_->packets[i]->md.in_port.odp_port = portno;
+                struct dp_packet *packet;
+                DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
+                    packet->md.in_port.odp_port = portno;
                 }
 
                 (*depth)++;
@@ -4481,14 +5444,12 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
     case OVS_ACTION_ATTR_USERSPACE:
         if (!fat_rwlock_tryrdlock(&dp->upcall_rwlock)) {
             struct dp_packet_batch *orig_packets_ = packets_;
-            struct dp_packet **packets = packets_->packets;
             const struct nlattr *userdata;
             struct dp_packet_batch usr_pkt;
             struct ofpbuf actions;
             struct flow flow;
             ovs_u128 ufid;
             bool clone = false;
-            int i;
 
             userdata = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
             ofpbuf_init(&actions, 0);
@@ -4497,7 +5458,6 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
                 if (!may_steal) {
                     dp_packet_batch_clone(&usr_pkt, packets_);
                     packets_ = &usr_pkt;
-                    packets = packets_->packets;
                     clone = true;
                     dp_packet_batch_reset_cutlen(orig_packets_);
                 }
@@ -4505,10 +5465,11 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
                 dp_packet_batch_apply_cutlen(packets_);
             }
 
-            for (i = 0; i < packets_->count; i++) {
-                flow_extract(packets[i], &flow);
+            struct dp_packet *packet;
+            DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
+                flow_extract(packet, &flow);
                 dpif_flow_hash(dp->dpif, &flow, sizeof flow, &ufid);
-                dp_execute_userspace_action(pmd, packets[i], may_steal, &flow,
+                dp_execute_userspace_action(pmd, packet, may_steal, &flow,
                                             &ufid, &actions, userdata, now);
             }
 
@@ -4526,15 +5487,15 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
     case OVS_ACTION_ATTR_RECIRC:
         if (*depth < MAX_RECIRC_DEPTH) {
             struct dp_packet_batch recirc_pkts;
-            int i;
 
             if (!may_steal) {
                dp_packet_batch_clone(&recirc_pkts, packets_);
                packets_ = &recirc_pkts;
             }
 
-            for (i = 0; i < packets_->count; i++) {
-                packets_->packets[i]->md.recirc_id = nl_attr_get_u32(a);
+            struct dp_packet *packet;
+            DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
+                packet->md.recirc_id = nl_attr_get_u32(a);
             }
 
             (*depth)++;
@@ -4549,18 +5510,25 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
 
     case OVS_ACTION_ATTR_CT: {
         const struct nlattr *b;
+        bool force = false;
         bool commit = false;
         unsigned int left;
         uint16_t zone = 0;
         const char *helper = NULL;
         const uint32_t *setmark = NULL;
         const struct ovs_key_ct_labels *setlabel = NULL;
+        struct nat_action_info_t nat_action_info;
+        struct nat_action_info_t *nat_action_info_ref = NULL;
+        bool nat_config = false;
 
         NL_ATTR_FOR_EACH_UNSAFE (b, left, nl_attr_get(a),
                                  nl_attr_get_size(a)) {
             enum ovs_ct_attr sub_type = nl_attr_type(b);
 
             switch(sub_type) {
+            case OVS_CT_ATTR_FORCE_COMMIT:
+                force = true;
+                /* fall through. */
             case OVS_CT_ATTR_COMMIT:
                 commit = true;
                 break;
@@ -4576,18 +5544,102 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
             case OVS_CT_ATTR_LABELS:
                 setlabel = nl_attr_get(b);
                 break;
-            case OVS_CT_ATTR_NAT:
+            case OVS_CT_ATTR_EVENTMASK:
+                /* Silently ignored, as userspace datapath does not generate
+                 * netlink events. */
+                break;
+            case OVS_CT_ATTR_NAT: {
+                const struct nlattr *b_nest;
+                unsigned int left_nest;
+                bool ip_min_specified = false;
+                bool proto_num_min_specified = false;
+                bool ip_max_specified = false;
+                bool proto_num_max_specified = false;
+                memset(&nat_action_info, 0, sizeof nat_action_info);
+                nat_action_info_ref = &nat_action_info;
+
+                NL_NESTED_FOR_EACH_UNSAFE (b_nest, left_nest, b) {
+                    enum ovs_nat_attr sub_type_nest = nl_attr_type(b_nest);
+
+                    switch (sub_type_nest) {
+                    case OVS_NAT_ATTR_SRC:
+                    case OVS_NAT_ATTR_DST:
+                        nat_config = true;
+                        nat_action_info.nat_action |=
+                            ((sub_type_nest == OVS_NAT_ATTR_SRC)
+                                ? NAT_ACTION_SRC : NAT_ACTION_DST);
+                        break;
+                    case OVS_NAT_ATTR_IP_MIN:
+                        memcpy(&nat_action_info.min_addr,
+                               nl_attr_get(b_nest),
+                               nl_attr_get_size(b_nest));
+                        ip_min_specified = true;
+                        break;
+                    case OVS_NAT_ATTR_IP_MAX:
+                        memcpy(&nat_action_info.max_addr,
+                               nl_attr_get(b_nest),
+                               nl_attr_get_size(b_nest));
+                        ip_max_specified = true;
+                        break;
+                    case OVS_NAT_ATTR_PROTO_MIN:
+                        nat_action_info.min_port =
+                            nl_attr_get_u16(b_nest);
+                        proto_num_min_specified = true;
+                        break;
+                    case OVS_NAT_ATTR_PROTO_MAX:
+                        nat_action_info.max_port =
+                            nl_attr_get_u16(b_nest);
+                        proto_num_max_specified = true;
+                        break;
+                    case OVS_NAT_ATTR_PERSISTENT:
+                    case OVS_NAT_ATTR_PROTO_HASH:
+                    case OVS_NAT_ATTR_PROTO_RANDOM:
+                        break;
+                    case OVS_NAT_ATTR_UNSPEC:
+                    case __OVS_NAT_ATTR_MAX:
+                        OVS_NOT_REACHED();
+                    }
+                }
+
+                if (ip_min_specified && !ip_max_specified) {
+                    nat_action_info.max_addr = nat_action_info.min_addr;
+                }
+                if (proto_num_min_specified && !proto_num_max_specified) {
+                    nat_action_info.max_port = nat_action_info.min_port;
+                }
+                if (proto_num_min_specified || proto_num_max_specified) {
+                    if (nat_action_info.nat_action & NAT_ACTION_SRC) {
+                        nat_action_info.nat_action |= NAT_ACTION_SRC_PORT;
+                    } else if (nat_action_info.nat_action & NAT_ACTION_DST) {
+                        nat_action_info.nat_action |= NAT_ACTION_DST_PORT;
+                    }
+                }
+                break;
+            }
             case OVS_CT_ATTR_UNSPEC:
             case __OVS_CT_ATTR_MAX:
                 OVS_NOT_REACHED();
             }
         }
 
-        conntrack_execute(&dp->conntrack, packets_, aux->flow->dl_type, commit,
-                          zone, setmark, setlabel, helper);
+        /* We won't be able to function properly in this case, hence
+         * complain loudly. */
+        if (nat_config && !commit) {
+            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
+            VLOG_WARN_RL(&rl, "NAT specified without commit.");
+        }
+
+        conntrack_execute(&dp->conntrack, packets_, aux->flow->dl_type, force,
+                          commit, zone, setmark, setlabel, aux->flow->tp_src,
+                          aux->flow->tp_dst, helper, nat_action_info_ref, now);
         break;
     }
 
+    case OVS_ACTION_ATTR_METER:
+        dp_netdev_run_meter(pmd->dp, packets_, nl_attr_get_u32(a),
+                            time_msec());
+        break;
+
     case OVS_ACTION_ATTR_PUSH_VLAN:
     case OVS_ACTION_ATTR_POP_VLAN:
     case OVS_ACTION_ATTR_PUSH_MPLS:
@@ -4598,6 +5650,11 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
     case OVS_ACTION_ATTR_HASH:
     case OVS_ACTION_ATTR_UNSPEC:
     case OVS_ACTION_ATTR_TRUNC:
+    case OVS_ACTION_ATTR_PUSH_ETH:
+    case OVS_ACTION_ATTR_POP_ETH:
+    case OVS_ACTION_ATTR_CLONE:
+    case OVS_ACTION_ATTR_ENCAP_NSH:
+    case OVS_ACTION_ATTR_DECAP_NSH:
     case __OVS_ACTION_ATTR_MAX:
         OVS_NOT_REACHED();
     }
@@ -4627,7 +5684,7 @@ struct dp_netdev_ct_dump {
 
 static int
 dpif_netdev_ct_dump_start(struct dpif *dpif, struct ct_dpif_dump_state **dump_,
-                          const uint16_t *pzone)
+                          const uint16_t *pzone, int *ptot_bkts)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_ct_dump *dump;
@@ -4636,7 +5693,7 @@ dpif_netdev_ct_dump_start(struct dpif *dpif, struct ct_dpif_dump_state **dump_,
     dump->dp = dp;
     dump->ct = &dp->conntrack;
 
-    conntrack_dump_start(&dp->conntrack, &dump->dump, pzone);
+    conntrack_dump_start(&dp->conntrack, &dump->dump, pzone, ptot_bkts);
 
     *dump_ = &dump->up;
 
@@ -4672,10 +5729,14 @@ dpif_netdev_ct_dump_done(struct dpif *dpif OVS_UNUSED,
 }
 
 static int
-dpif_netdev_ct_flush(struct dpif *dpif, const uint16_t *zone)
+dpif_netdev_ct_flush(struct dpif *dpif, const uint16_t *zone,
+                     const struct ct_dpif_tuple *tuple)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
 
+    if (tuple) {
+        return EOPNOTSUPP;
+    }
     return conntrack_flush(&dp->conntrack, zone);
 }
 
@@ -4710,7 +5771,7 @@ const struct dpif_class dpif_netdev_class = {
     dpif_netdev_operate,
     NULL,                       /* recv_set */
     NULL,                       /* handlers_set */
-    dpif_netdev_pmd_set,
+    dpif_netdev_set_config,
     dpif_netdev_queue_to_priority,
     NULL,                       /* recv */
     NULL,                       /* recv_wait */
@@ -4724,6 +5785,10 @@ const struct dpif_class dpif_netdev_class = {
     dpif_netdev_ct_dump_next,
     dpif_netdev_ct_dump_done,
     dpif_netdev_ct_flush,
+    dpif_netdev_meter_get_features,
+    dpif_netdev_meter_set,
+    dpif_netdev_meter_get,
+    dpif_netdev_meter_del,
 };
 
 static void
@@ -4762,12 +5827,12 @@ dpif_dummy_change_port_number(struct unixctl_conn *conn, int argc OVS_UNUSED,
 
     /* Remove port. */
     hmap_remove(&dp->ports, &port->node);
-    dp_netdev_del_port_from_all_pmds(dp, port);
+    reconfigure_datapath(dp);
 
     /* Reinsert with new port number. */
     port->port_no = port_no;
     hmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
-    dp_netdev_add_port_to_pmds(dp, port);
+    reconfigure_datapath(dp);
 
     seq_change(dp->port_seq);
     unixctl_command_reply(conn, NULL);
@@ -4931,11 +5996,25 @@ dpcls_sort_subtable_vector(struct dpcls *cls)
 }
 
 static inline void
-dp_netdev_pmd_try_optimize(struct dp_netdev_pmd_thread *pmd)
+dp_netdev_pmd_try_optimize(struct dp_netdev_pmd_thread *pmd,
+                           struct polled_queue *poll_list, int poll_cnt)
 {
     struct dpcls *cls;
     long long int now = time_msec();
 
+    if (now > pmd->rxq_next_cycle_store) {
+        /* Get the cycles that were used to process each queue and store. */
+        for (unsigned i = 0; i < poll_cnt; i++) {
+            uint64_t rxq_cyc_curr = dp_netdev_rxq_get_cycles(poll_list[i].rxq,
+                                                        RXQ_CYCLES_PROC_CURR);
+            dp_netdev_rxq_set_intrvl_cycles(poll_list[i].rxq, rxq_cyc_curr);
+            dp_netdev_rxq_set_cycles(poll_list[i].rxq, RXQ_CYCLES_PROC_CURR,
+                                     0);
+        }
+        /* Start new measuring interval */
+        pmd->rxq_next_cycle_store = now + PMD_RXQ_INTERVAL_LEN;
+    }
+
     if (now > pmd->next_optimization) {
         /* Try to obtain the flow lock to block out revalidator threads.
          * If not possible, just try next time. */