]> git.proxmox.com Git - mirror_ovs.git/blobdiff - lib/dpif-netdev.c
cirrus: Use FreeBSD 12.2.
[mirror_ovs.git] / lib / dpif-netdev.c
index 5ee8d0a4c8fff812418fbc9b5bc1c32e093f0b3a..e3fd0a07fd5b79152135bdbc800a5c815784a2a1 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016, 2017 Nicira, Inc.
+ * Copyright (c) 2009-2014, 2016-2018 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
 
 #include <config.h>
 #include "dpif-netdev.h"
+#include "dpif-netdev-private.h"
 
 #include <ctype.h>
 #include <errno.h>
 #include "bitmap.h"
 #include "cmap.h"
 #include "conntrack.h"
+#include "conntrack-tp.h"
 #include "coverage.h"
 #include "ct-dpif.h"
 #include "csum.h"
 #include "dp-packet.h"
 #include "dpif.h"
+#include "dpif-netdev-lookup.h"
 #include "dpif-netdev-perf.h"
 #include "dpif-provider.h"
 #include "dummy.h"
@@ -47,8 +50,9 @@
 #include "flow.h"
 #include "hmapx.h"
 #include "id-pool.h"
-#include "latch.h"
+#include "ipf.h"
 #include "netdev.h"
+#include "netdev-offload.h"
 #include "netdev-provider.h"
 #include "netdev-vport.h"
 #include "netlink.h"
 
 VLOG_DEFINE_THIS_MODULE(dpif_netdev);
 
+/* Auto Load Balancing Defaults */
+#define ALB_IMPROVEMENT_THRESHOLD    25
+#define ALB_LOAD_THRESHOLD           95
+#define ALB_REBALANCE_INTERVAL       1 /* 1 Min */
+#define MIN_TO_MSEC                  60000
+
 #define FLOW_DUMP_MAX_BATCH 50
 /* Use per thread recirc_depth to prevent recirculation loop. */
 #define MAX_RECIRC_DEPTH 6
@@ -89,11 +99,22 @@ DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
 #define DEFAULT_TX_FLUSH_INTERVAL 0
 
 /* Configuration parameters. */
-enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
 enum { MAX_METERS = 65536 };    /* Maximum number of meters. */
 enum { MAX_BANDS = 8 };         /* Maximum number of bands / meter. */
 enum { N_METER_LOCKS = 64 };    /* Maximum number of meters. */
 
+COVERAGE_DEFINE(datapath_drop_meter);
+COVERAGE_DEFINE(datapath_drop_upcall_error);
+COVERAGE_DEFINE(datapath_drop_lock_error);
+COVERAGE_DEFINE(datapath_drop_userspace_action_error);
+COVERAGE_DEFINE(datapath_drop_tunnel_push_error);
+COVERAGE_DEFINE(datapath_drop_tunnel_pop_error);
+COVERAGE_DEFINE(datapath_drop_recirc_error);
+COVERAGE_DEFINE(datapath_drop_invalid_port);
+COVERAGE_DEFINE(datapath_drop_invalid_bond);
+COVERAGE_DEFINE(datapath_drop_invalid_tnl_port);
+COVERAGE_DEFINE(datapath_drop_rx_invalid_packet);
+
 /* Protects against changes to 'dp_netdevs'. */
 static struct ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER;
 
@@ -121,15 +142,6 @@ static struct odp_support dp_netdev_support = {
     .ct_orig_tuple6 = true,
 };
 
-/* Stores a miniflow with inline values */
-
-struct netdev_flow_key {
-    uint32_t hash;       /* Hash function differs for different users. */
-    uint32_t len;        /* Length of the following miniflow (incl. map). */
-    struct miniflow mf;
-    uint64_t buf[FLOW_MAX_PACKET_U64S];
-};
-
 /* EMC cache and SMC cache compose the datapath flow cache (DFC)
  *
  * Exact match cache for frequently used flows
@@ -229,6 +241,9 @@ struct dfc_cache {
  * and used during rxq to pmd assignment. */
 #define PMD_RXQ_INTERVAL_MAX 6
 
+/* Time in microseconds to try RCU quiescing. */
+#define PMD_RCU_QUIESCE_INTERVAL 10000LL
+
 struct dpcls {
     struct cmap_node node;      /* Within dp_netdev_pmd_thread.classifiers */
     odp_port_t in_port;
@@ -236,17 +251,17 @@ struct dpcls {
     struct pvector subtables;
 };
 
-/* A rule to be inserted to the classifier. */
-struct dpcls_rule {
-    struct cmap_node cmap_node;   /* Within struct dpcls_subtable 'rules'. */
-    struct netdev_flow_key *mask; /* Subtable's mask. */
-    struct netdev_flow_key flow;  /* Matching key. */
-    /* 'flow' must be the last field, additional space is allocated here. */
+/* Data structure to keep packet order till fastpath processing. */
+struct dp_packet_flow_map {
+    struct dp_packet *packet;
+    struct dp_netdev_flow *flow;
+    uint16_t tcp_flags;
 };
 
 static void dpcls_init(struct dpcls *);
 static void dpcls_destroy(struct dpcls *);
 static void dpcls_sort_subtable_vector(struct dpcls *);
+static uint32_t dpcls_subtable_lookup_reprobe(struct dpcls *cls);
 static void dpcls_insert(struct dpcls *, struct dpcls_rule *,
                          const struct netdev_flow_key *mask);
 static void dpcls_remove(struct dpcls *, struct dpcls_rule *);
@@ -254,8 +269,7 @@ static bool dpcls_lookup(struct dpcls *cls,
                          const struct netdev_flow_key *keys[],
                          struct dpcls_rule **rules, size_t cnt,
                          int *num_lookups_p);
-static bool dpcls_rule_matches_key(const struct dpcls_rule *rule,
-                            const struct netdev_flow_key *target);
+
 /* Set of supported meter flags */
 #define DP_SUPPORTED_METER_FLAGS_MASK \
     (OFPMF13_STATS | OFPMF13_PKTPS | OFPMF13_KBPS | OFPMF13_BURST)
@@ -281,6 +295,15 @@ struct dp_meter {
     struct dp_meter_band bands[];
 };
 
+struct pmd_auto_lb {
+    bool auto_lb_requested;     /* Auto load balancing requested by user. */
+    bool is_enabled;            /* Current status of Auto load balancing. */
+    uint64_t rebalance_intvl;
+    uint64_t rebalance_poll_timer;
+    uint8_t rebalance_improve_thresh;
+    atomic_uint8_t rebalance_load_thresh;
+};
+
 /* Datapath based on the network device interface from netdev.h.
  *
  *
@@ -294,12 +317,12 @@ struct dp_meter {
  *
  *    dp_netdev_mutex (global)
  *    port_mutex
+ *    bond_mutex
  *    non_pmd_mutex
  */
 struct dp_netdev {
     const struct dpif_class *const class;
     const char *const name;
-    struct dpif *dpif;
     struct ovs_refcount ref_cnt;
     atomic_flag destroyed;
 
@@ -341,6 +364,8 @@ struct dp_netdev {
     /* id pool for per thread static_tx_qid. */
     struct id_pool *tx_qid_pool;
     struct ovs_mutex tx_qid_pool_mutex;
+    /* Use measured cycles for rxq to pmd assignment. */
+    bool pmd_rxq_assign_cyc;
 
     /* Protects the access of the 'struct dp_netdev_pmd_thread'
      * instance for non-pmd thread. */
@@ -358,7 +383,12 @@ struct dp_netdev {
 
     uint64_t last_tnl_conf_seq;
 
-    struct conntrack conntrack;
+    struct conntrack *conntrack;
+    struct pmd_auto_lb pmd_alb;
+
+    /* Bonds. */
+    struct ovs_mutex bond_mutex; /* Protects updates of 'tx_bonds'. */
+    struct cmap tx_bonds; /* Contains 'struct tx_bond'. */
 };
 
 static void meter_lock(const struct dp_netdev *dp, uint32_t meter_id)
@@ -451,6 +481,7 @@ struct dp_netdev_port {
     unsigned n_rxq;             /* Number of elements in 'rxqs' */
     unsigned *txq_used;         /* Number of threads that use each tx queue. */
     struct ovs_mutex txq_used_mutex;
+    bool emc_enabled;           /* If true EMC will be used. */
     char *type;                 /* Port type as requested by user. */
     char *rxq_affinity_list;    /* Requested affinity of rx queues. */
 };
@@ -463,6 +494,12 @@ struct dp_netdev_flow_stats {
     atomic_uint16_t tcp_flags;     /* Bitwise-OR of seen tcp_flags values. */
 };
 
+/* Contained by struct dp_netdev_flow's 'last_attrs' member.  */
+struct dp_netdev_flow_attrs {
+    atomic_bool offloaded;         /* True if flow is offloaded to HW. */
+    ATOMIC(const char *) dp_layer; /* DP layer the flow is handled in. */
+};
+
 /* A flow in 'dp_netdev_pmd_thread's 'flow_table'.
  *
  *
@@ -523,6 +560,11 @@ struct dp_netdev_flow {
     /* Statistics. */
     struct dp_netdev_flow_stats stats;
 
+    /* Statistics and attributes received from the netdev offload provider. */
+    atomic_int netdev_flow_get_result;
+    struct dp_netdev_flow_stats last_stats;
+    struct dp_netdev_flow_attrs last_attrs;
+
     /* Actions. */
     OVSRCU_TYPE(struct dp_netdev_actions *) actions;
 
@@ -533,6 +575,7 @@ struct dp_netdev_flow {
     struct packet_batch_per_flow *batch;
 
     /* Packet classification. */
+    char *dp_extra_info;         /* String to return in a flow dump/get. */
     struct dpcls_rule cr;        /* In owning dp_netdev's 'cls'. */
     /* 'cr' must be the last member. */
 };
@@ -565,6 +608,9 @@ static void dp_netdev_actions_free(struct dp_netdev_actions *);
 struct polled_queue {
     struct dp_netdev_rxq *rxq;
     odp_port_t port_no;
+    bool emc_enabled;
+    bool rxq_enabled;
+    uint64_t change_seq;
 };
 
 /* Contained by struct dp_netdev_pmd_thread's 'poll_list' member. */
@@ -585,6 +631,20 @@ struct tx_port {
     struct dp_netdev_rxq *output_pkts_rxqs[NETDEV_MAX_BURST];
 };
 
+/* Contained by struct tx_bond 'member_buckets'. */
+struct member_entry {
+    odp_port_t member_id;
+    atomic_ullong n_packets;
+    atomic_ullong n_bytes;
+};
+
+/* Contained by struct dp_netdev_pmd_thread's 'tx_bonds'. */
+struct tx_bond {
+    struct cmap_node node;
+    uint32_t bond_id;
+    struct member_entry member_buckets[BOND_BUCKETS];
+};
+
 /* A set of properties for the current processing loop that is not directly
  * associated with the pmd thread itself, but with the packets being
  * processed or the short-term system configuration (for example, time).
@@ -594,6 +654,8 @@ struct dp_netdev_pmd_thread_ctx {
     long long now;
     /* RX queue from which last packet was received. */
     struct dp_netdev_rxq *last_rxq;
+    /* EMC insertion probability context for the current processing cycle. */
+    uint32_t emc_insert_min;
 };
 
 /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
@@ -619,9 +681,6 @@ struct dp_netdev_pmd_thread {
     struct ovs_refcount ref_cnt;    /* Every reference must be refcount'ed. */
     struct cmap_node node;          /* In 'dp->poll_threads'. */
 
-    pthread_cond_t cond;            /* For synchronizing pmd thread reload. */
-    struct ovs_mutex cond_mutex;    /* Mutex for condition variable. */
-
     /* Per thread exact-match cache.  Note, the instance for cpu core
      * NON_PMD_CORE_ID can be accessed by multiple threads, and thusly
      * need to be protected by 'non_pmd_mutex'.  Every other instance
@@ -653,10 +712,51 @@ struct dp_netdev_pmd_thread {
     /* Current context of the PMD thread. */
     struct dp_netdev_pmd_thread_ctx ctx;
 
-    struct latch exit_latch;        /* For terminating the pmd thread. */
     struct seq *reload_seq;
     uint64_t last_reload_seq;
+
+    /* These are atomic variables used as a synchronization and configuration
+     * points for thread reload/exit.
+     *
+     * 'reload' atomic is the main one and it's used as a memory
+     * synchronization point for all other knobs and data.
+     *
+     * For a thread that requests PMD reload:
+     *
+     *   * All changes that should be visible to the PMD thread must be made
+     *     before setting the 'reload'.  These changes could use any memory
+     *     ordering model including 'relaxed'.
+     *   * Setting the 'reload' atomic should occur in the same thread where
+     *     all other PMD configuration options updated.
+     *   * Setting the 'reload' atomic should be done with 'release' memory
+     *     ordering model or stricter.  This will guarantee that all previous
+     *     changes (including non-atomic and 'relaxed') will be visible to
+     *     the PMD thread.
+     *   * To check that reload is done, thread should poll the 'reload' atomic
+     *     to become 'false'.  Polling should be done with 'acquire' memory
+     *     ordering model or stricter.  This ensures that PMD thread completed
+     *     the reload process.
+     *
+     * For the PMD thread:
+     *
+     *   * PMD thread should read 'reload' atomic with 'acquire' memory
+     *     ordering model or stricter.  This will guarantee that all changes
+     *     made before setting the 'reload' in the requesting thread will be
+     *     visible to the PMD thread.
+     *   * All other configuration data could be read with any memory
+     *     ordering model (including non-atomic and 'relaxed') but *only after*
+     *     reading the 'reload' atomic set to 'true'.
+     *   * When the PMD reload done, PMD should (optionally) set all the below
+     *     knobs except the 'reload' to their default ('false') values and
+     *     (mandatory), as the last step, set the 'reload' to 'false' using
+     *     'release' memory ordering model or stricter.  This will inform the
+     *     requesting thread that PMD has completed a reload cycle.
+     */
     atomic_bool reload;             /* Do we need to reload ports? */
+    atomic_bool wait_for_reload;    /* Can we busy wait for the next reload? */
+    atomic_bool reload_tx_qid;      /* Do we need to reload static_tx_qid? */
+    atomic_bool exit;               /* For terminating the pmd thread. */
+
     pthread_t thread;
     unsigned core_id;               /* CPU core id of this pmd thread. */
     int numa_id;                    /* numa node id of this pmd thread. */
@@ -677,6 +777,11 @@ struct dp_netdev_pmd_thread {
      * read by the pmd thread. */
     struct hmap tx_ports OVS_GUARDED;
 
+    struct ovs_mutex bond_mutex;    /* Protects updates of 'tx_bonds'. */
+    /* Map of 'tx_bond's used for transmission.  Written by the main thread
+     * and read by the pmd thread. */
+    struct cmap tx_bonds;
+
     /* These are thread-local copies of 'tx_ports'.  One contains only tunnel
      * ports (that support push_tunnel/pop_tunnel), the other contains ports
      * with at least one txq (that support send).  A port can be in both.
@@ -693,8 +798,16 @@ struct dp_netdev_pmd_thread {
     /* Keep track of detailed PMD performance statistics. */
     struct pmd_perf_stats perf_stats;
 
+    /* Stats from previous iteration used by automatic pmd
+     * load balance logic. */
+    uint64_t prev_stats[PMD_N_STATS];
+    atomic_count pmd_overloaded;
+
     /* Set to true if the pmd thread needs to be reloaded. */
     bool need_reload;
+
+    /* Next time when PMD should try RCU quiescing. */
+    long long next_rcu_quiesce;
 };
 
 /* Interface to netdev-based datapath. */
@@ -763,6 +876,12 @@ static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
 static int
 dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
                                    bool force);
+static void dp_netdev_add_bond_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
+                                         struct tx_bond *bond, bool update)
+    OVS_EXCLUDED(pmd->bond_mutex);
+static void dp_netdev_del_bond_tx_from_pmd(struct dp_netdev_pmd_thread *pmd,
+                                           uint32_t bond_id)
+    OVS_EXCLUDED(pmd->bond_mutex);
 
 static void reconfigure_datapath(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex);
@@ -791,6 +910,9 @@ dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
                                bool purge);
 static int dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
                                       struct tx_port *tx);
+static inline struct dpcls *
+dp_netdev_pmd_lookup_dpcls(struct dp_netdev_pmd_thread *pmd,
+                           odp_port_t in_port);
 
 static inline bool emc_entry_alive(struct emc_entry *ce);
 static void emc_clear_entry(struct emc_entry *ce);
@@ -1077,6 +1199,7 @@ compare_poll_list(const void *a_, const void *b_)
 static void
 sorted_poll_list(struct dp_netdev_pmd_thread *pmd, struct rxq_poll **list,
                  size_t *n)
+    OVS_REQUIRES(pmd->port_mutex)
 {
     struct rxq_poll *ret, *poll;
     size_t i;
@@ -1129,6 +1252,8 @@ pmd_info_show_rxq(struct ds *reply, struct dp_netdev_pmd_thread *pmd)
             }
             ds_put_format(reply, "  port: %-16s  queue-id: %2d", name,
                           netdev_rxq_get_queue_id(list[i].rxq->rx));
+            ds_put_format(reply, " %s", netdev_rxq_enabled(list[i].rxq->rx)
+                                        ? "(enabled) " : "(disabled)");
             ds_put_format(reply, "  pmd usage: ");
             if (total_cycles) {
                 ds_put_format(reply, "%2"PRIu64"",
@@ -1188,6 +1313,121 @@ sorted_poll_thread_list(struct dp_netdev *dp,
     *n = k;
 }
 
+static void
+dpif_netdev_subtable_lookup_get(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                                const char *argv[] OVS_UNUSED,
+                                void *aux OVS_UNUSED)
+{
+    /* Get a list of all lookup functions. */
+    struct dpcls_subtable_lookup_info_t *lookup_funcs = NULL;
+    int32_t count = dpcls_subtable_lookup_info_get(&lookup_funcs);
+    if (count < 0) {
+        unixctl_command_reply_error(conn, "error getting lookup names");
+        return;
+    }
+
+    /* Add all lookup functions to reply string. */
+    struct ds reply = DS_EMPTY_INITIALIZER;
+    ds_put_cstr(&reply, "Available lookup functions (priority : name)\n");
+    for (int i = 0; i < count; i++) {
+        ds_put_format(&reply, "  %d : %s\n", lookup_funcs[i].prio,
+                      lookup_funcs[i].name);
+    }
+    unixctl_command_reply(conn, ds_cstr(&reply));
+    ds_destroy(&reply);
+}
+
+static void
+dpif_netdev_subtable_lookup_set(struct unixctl_conn *conn, int argc,
+                                const char *argv[], void *aux OVS_UNUSED)
+{
+    /* This function requires 2 parameters (argv[1] and argv[2]) to execute.
+     *   argv[1] is subtable name
+     *   argv[2] is priority
+     *   argv[3] is the datapath name (optional if only 1 datapath exists)
+     */
+    const char *func_name = argv[1];
+
+    errno = 0;
+    char *err_char;
+    uint32_t new_prio = strtoul(argv[2], &err_char, 10);
+    if (errno != 0 || new_prio > UINT8_MAX) {
+        unixctl_command_reply_error(conn,
+            "error converting priority, use integer in range 0-255\n");
+        return;
+    }
+
+    int32_t err = dpcls_subtable_set_prio(func_name, new_prio);
+    if (err) {
+        unixctl_command_reply_error(conn,
+            "error, subtable lookup function not found\n");
+        return;
+    }
+
+    /* argv[3] is optional datapath instance. If no datapath name is provided
+     * and only one datapath exists, the one existing datapath is reprobed.
+     */
+    ovs_mutex_lock(&dp_netdev_mutex);
+    struct dp_netdev *dp = NULL;
+
+    if (argc == 4) {
+        dp = shash_find_data(&dp_netdevs, argv[3]);
+    } else if (shash_count(&dp_netdevs) == 1) {
+        dp = shash_first(&dp_netdevs)->data;
+    }
+
+    if (!dp) {
+        ovs_mutex_unlock(&dp_netdev_mutex);
+        unixctl_command_reply_error(conn,
+                                    "please specify an existing datapath");
+        return;
+    }
+
+    /* Get PMD threads list, required to get DPCLS instances. */
+    size_t n;
+    uint32_t lookup_dpcls_changed = 0;
+    uint32_t lookup_subtable_changed = 0;
+    struct dp_netdev_pmd_thread **pmd_list;
+    sorted_poll_thread_list(dp, &pmd_list, &n);
+
+    /* take port mutex as HMAP iters over them. */
+    ovs_mutex_lock(&dp->port_mutex);
+
+    for (size_t i = 0; i < n; i++) {
+        struct dp_netdev_pmd_thread *pmd = pmd_list[i];
+        if (pmd->core_id == NON_PMD_CORE_ID) {
+            continue;
+        }
+
+        struct dp_netdev_port *port = NULL;
+        HMAP_FOR_EACH (port, node, &dp->ports) {
+            odp_port_t in_port = port->port_no;
+            struct dpcls *cls = dp_netdev_pmd_lookup_dpcls(pmd, in_port);
+            if (!cls) {
+                continue;
+            }
+            uint32_t subtbl_changes = dpcls_subtable_lookup_reprobe(cls);
+            if (subtbl_changes) {
+                lookup_dpcls_changed++;
+                lookup_subtable_changed += subtbl_changes;
+            }
+        }
+    }
+
+    /* release port mutex before netdev mutex. */
+    ovs_mutex_unlock(&dp->port_mutex);
+    ovs_mutex_unlock(&dp_netdev_mutex);
+
+    struct ds reply = DS_EMPTY_INITIALIZER;
+    ds_put_format(&reply,
+        "Lookup priority change affected %d dpcls ports and %d subtables.\n",
+        lookup_dpcls_changed, lookup_subtable_changed);
+    const char *reply_str = ds_cstr(&reply);
+    unixctl_command_reply(conn, reply_str);
+    VLOG_INFO("%s", reply_str);
+    ds_destroy(&reply);
+}
+
 static void
 dpif_netdev_pmd_rebalance(struct unixctl_conn *conn, int argc,
                           const char *argv[], void *aux OVS_UNUSED)
@@ -1326,6 +1566,49 @@ pmd_perf_show_cmd(struct unixctl_conn *conn, int argc,
     par.command_type = PMD_INFO_PERF_SHOW;
     dpif_netdev_pmd_info(conn, argc, argv, &par);
 }
+
+static void
+dpif_netdev_bond_show(struct unixctl_conn *conn, int argc,
+                      const char *argv[], void *aux OVS_UNUSED)
+{
+    struct ds reply = DS_EMPTY_INITIALIZER;
+    struct dp_netdev *dp = NULL;
+
+    ovs_mutex_lock(&dp_netdev_mutex);
+    if (argc == 2) {
+        dp = shash_find_data(&dp_netdevs, argv[1]);
+    } else if (shash_count(&dp_netdevs) == 1) {
+        /* There's only one datapath. */
+        dp = shash_first(&dp_netdevs)->data;
+    }
+    if (!dp) {
+        ovs_mutex_unlock(&dp_netdev_mutex);
+        unixctl_command_reply_error(conn,
+                                    "please specify an existing datapath");
+        return;
+    }
+
+    if (cmap_count(&dp->tx_bonds) > 0) {
+        struct tx_bond *dp_bond_entry;
+
+        ds_put_cstr(&reply, "Bonds:\n");
+        CMAP_FOR_EACH (dp_bond_entry, node, &dp->tx_bonds) {
+            ds_put_format(&reply, "  bond-id %"PRIu32":\n",
+                          dp_bond_entry->bond_id);
+            for (int bucket = 0; bucket < BOND_BUCKETS; bucket++) {
+                uint32_t member_id = odp_to_u32(
+                    dp_bond_entry->member_buckets[bucket].member_id);
+                ds_put_format(&reply,
+                              "    bucket %d - member %"PRIu32"\n",
+                              bucket, member_id);
+            }
+        }
+    }
+    ovs_mutex_unlock(&dp_netdev_mutex);
+    unixctl_command_reply(conn, ds_cstr(&reply));
+    ds_destroy(&reply);
+}
+
 \f
 static int
 dpif_netdev_init(void)
@@ -1357,6 +1640,16 @@ dpif_netdev_init(void)
                              "[-us usec] [-q qlen]",
                              0, 10, pmd_perf_log_set_cmd,
                              NULL);
+    unixctl_command_register("dpif-netdev/bond-show", "[dp]",
+                             0, 1, dpif_netdev_bond_show,
+                             NULL);
+    unixctl_command_register("dpif-netdev/subtable-lookup-prio-set",
+                             "[lookup_func] [prio] [dp]",
+                             2, 3, dpif_netdev_subtable_lookup_set,
+                             NULL);
+    unixctl_command_register("dpif-netdev/subtable-lookup-prio-get", "",
+                             0, 0, dpif_netdev_subtable_lookup_get,
+                             NULL);
     return 0;
 }
 
@@ -1458,9 +1751,18 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
                  struct dp_netdev **dpp)
     OVS_REQUIRES(dp_netdev_mutex)
 {
+    static struct ovsthread_once tsc_freq_check = OVSTHREAD_ONCE_INITIALIZER;
     struct dp_netdev *dp;
     int error;
 
+    /* Avoid estimating TSC frequency for dummy datapath to not slow down
+     * unit tests. */
+    if (!dpif_netdev_class_is_dummy(class)
+        && ovsthread_once_start(&tsc_freq_check)) {
+        pmd_perf_estimate_tsc_frequency();
+        ovsthread_once_done(&tsc_freq_check);
+    }
+
     dp = xzalloc(sizeof *dp);
     shash_add(&dp_netdevs, name, dp);
 
@@ -1469,9 +1771,12 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     ovs_refcount_init(&dp->ref_cnt);
     atomic_flag_clear(&dp->destroyed);
 
-    ovs_mutex_init(&dp->port_mutex);
+    ovs_mutex_init_recursive(&dp->port_mutex);
     hmap_init(&dp->ports);
     dp->port_seq = seq_create();
+    ovs_mutex_init(&dp->bond_mutex);
+    cmap_init(&dp->tx_bonds);
+
     fat_rwlock_init(&dp->upcall_rwlock);
 
     dp->reconfigure_seq = seq_create();
@@ -1486,12 +1791,13 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     dp->upcall_aux = NULL;
     dp->upcall_cb = NULL;
 
-    conntrack_init(&dp->conntrack);
+    dp->conntrack = conntrack_init();
 
     atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
     atomic_init(&dp->tx_flush_interval, DEFAULT_TX_FLUSH_INTERVAL);
 
     cmap_init(&dp->poll_threads);
+    dp->pmd_rxq_assign_cyc = true;
 
     ovs_mutex_init(&dp->tx_qid_pool_mutex);
     /* We need 1 Tx queue for each possible core + 1 for non-PMD threads. */
@@ -1549,7 +1855,6 @@ dpif_netdev_open(const struct dpif_class *class, const char *name,
     }
     if (!error) {
         *dpifp = create_dpif_netdev(dp);
-        dp->dpif = *dpifp;
     }
     ovs_mutex_unlock(&dp_netdev_mutex);
 
@@ -1578,6 +1883,12 @@ dp_delete_meter(struct dp_netdev *dp, uint32_t meter_id)
     }
 }
 
+static uint32_t
+hash_bond_id(uint32_t bond_id)
+{
+    return hash_int(bond_id, 0);
+}
+
 /* Requires dp_netdev_mutex so that we can't get a new reference to 'dp'
  * through the 'dp_netdevs' shash while freeing 'dp'. */
 static void
@@ -1585,6 +1896,7 @@ dp_netdev_free(struct dp_netdev *dp)
     OVS_REQUIRES(dp_netdev_mutex)
 {
     struct dp_netdev_port *port, *next;
+    struct tx_bond *bond;
 
     shash_find_and_delete(&dp_netdevs, dp->name);
 
@@ -1594,6 +1906,13 @@ dp_netdev_free(struct dp_netdev *dp)
     }
     ovs_mutex_unlock(&dp->port_mutex);
 
+    ovs_mutex_lock(&dp->bond_mutex);
+    CMAP_FOR_EACH (bond, node, &dp->tx_bonds) {
+        cmap_remove(&dp->tx_bonds, &bond->node, hash_bond_id(bond->bond_id));
+        ovsrcu_postpone(free, bond);
+    }
+    ovs_mutex_unlock(&dp->bond_mutex);
+
     dp_netdev_destroy_all_pmds(dp, true);
     cmap_destroy(&dp->poll_threads);
 
@@ -1603,7 +1922,7 @@ dp_netdev_free(struct dp_netdev *dp)
     ovs_mutex_destroy(&dp->non_pmd_mutex);
     ovsthread_key_delete(dp->per_pmd_key);
 
-    conntrack_destroy(&dp->conntrack);
+    conntrack_destroy(dp->conntrack);
 
 
     seq_destroy(dp->reconfigure_seq);
@@ -1612,6 +1931,9 @@ dp_netdev_free(struct dp_netdev *dp)
     hmap_destroy(&dp->ports);
     ovs_mutex_destroy(&dp->port_mutex);
 
+    cmap_destroy(&dp->tx_bonds);
+    ovs_mutex_destroy(&dp->bond_mutex);
+
     /* Upcalls must be disabled at this point */
     dp_netdev_destroy_upcall_lock(dp);
 
@@ -1719,11 +2041,8 @@ dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
         return;
     }
 
-    ovs_mutex_lock(&pmd->cond_mutex);
     seq_change(pmd->reload_seq);
-    atomic_store_relaxed(&pmd->reload, true);
-    ovs_mutex_cond_wait(&pmd->cond, &pmd->cond_mutex);
-    ovs_mutex_unlock(&pmd->cond_mutex);
+    atomic_store_explicit(&pmd->reload, true, memory_order_release);
 }
 
 static uint32_t
@@ -1736,7 +2055,6 @@ static int
 port_create(const char *devname, const char *type,
             odp_port_t port_no, struct dp_netdev_port **portp)
 {
-    struct netdev_saved_flags *sf;
     struct dp_netdev_port *port;
     enum netdev_flags flags;
     struct netdev *netdev;
@@ -1758,17 +2076,12 @@ port_create(const char *devname, const char *type,
         goto out;
     }
 
-    error = netdev_turn_flags_on(netdev, NETDEV_PROMISC, &sf);
-    if (error) {
-        VLOG_ERR("%s: cannot set promisc flag", devname);
-        goto out;
-    }
-
     port = xzalloc(sizeof *port);
     port->port_no = port_no;
     port->netdev = netdev;
     port->type = xstrdup(type);
-    port->sf = sf;
+    port->sf = NULL;
+    port->emc_enabled = true;
     port->need_reconfigure = true;
     ovs_mutex_init(&port->txq_used_mutex);
 
@@ -1786,6 +2099,7 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
             odp_port_t port_no)
     OVS_REQUIRES(dp->port_mutex)
 {
+    struct netdev_saved_flags *sf;
     struct dp_netdev_port *port;
     int error;
 
@@ -1804,6 +2118,24 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
 
     reconfigure_datapath(dp);
 
+    /* Check that port was successfully configured. */
+    if (!dp_netdev_lookup_port(dp, port_no)) {
+        return EINVAL;
+    }
+
+    /* Updating device flags triggers an if_notifier, which triggers a bridge
+     * reconfiguration and another attempt to add this port, leading to an
+     * infinite loop if the device is configured incorrectly and cannot be
+     * added.  Setting the promisc mode after a successful reconfiguration,
+     * since we already know that the device is somehow properly configured. */
+    error = netdev_turn_flags_on(port->netdev, NETDEV_PROMISC, &sf);
+    if (error) {
+        VLOG_ERR("%s: cannot set promisc flag", devname);
+        do_del_port(dp, port);
+        return error;
+    }
+    port->sf = sf;
+
     return 0;
 }
 
@@ -1951,6 +2283,8 @@ static void
 do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     OVS_REQUIRES(dp->port_mutex)
 {
+    netdev_flow_flush(port->netdev);
+    netdev_uninit_flow_api(port->netdev);
     hmap_remove(&dp->ports, &port->node);
     seq_change(dp->port_seq);
 
@@ -2008,6 +2342,7 @@ static void
 dp_netdev_flow_free(struct dp_netdev_flow *flow)
 {
     dp_netdev_actions_free(dp_netdev_flow_get_actions(flow));
+    free(flow->dp_extra_info);
     free(flow);
 }
 
@@ -2059,7 +2394,11 @@ dp_netdev_pmd_find_dpcls(struct dp_netdev_pmd_thread *pmd,
 }
 
 #define MAX_FLOW_MARK       (UINT32_MAX - 1)
-#define INVALID_FLOW_MARK   (UINT32_MAX)
+#define INVALID_FLOW_MARK   0
+/* Zero flow mark is used to indicate the HW to remove the mark. A packet
+ * marked with zero mark is received in SW without a mark at all, so it
+ * cannot be used as a valid mark.
+ */
 
 struct megaflow_to_mark_data {
     const struct cmap_node node;
@@ -2085,7 +2424,7 @@ flow_mark_alloc(void)
 
     if (!flow_mark.pool) {
         /* Haven't initiated yet, do it here */
-        flow_mark.pool = id_pool_create(0, MAX_FLOW_MARK);
+        flow_mark.pool = id_pool_create(1, MAX_FLOW_MARK);
     }
 
     if (id_pool_alloc_id(flow_mark.pool, &mark)) {
@@ -2126,7 +2465,7 @@ megaflow_to_mark_disassociate(const ovs_u128 *mega_ufid)
         if (ovs_u128_equals(*mega_ufid, data->mega_ufid)) {
             cmap_remove(&flow_mark.megaflow_to_mark,
                         CONST_CAST(struct cmap_node *, &data->node), hash);
-            free(data);
+            ovsrcu_postpone(free, data);
             return;
         }
     }
@@ -2147,8 +2486,8 @@ megaflow_to_mark_find(const ovs_u128 *mega_ufid)
         }
     }
 
-    VLOG_WARN("Mark id for ufid "UUID_FMT" was not found\n",
-              UUID_ARGS((struct uuid *)mega_ufid));
+    VLOG_DBG("Mark id for ufid "UUID_FMT" was not found\n",
+             UUID_ARGS((struct uuid *)mega_ufid));
     return INVALID_FLOW_MARK;
 }
 
@@ -2163,7 +2502,8 @@ mark_to_flow_associate(const uint32_t mark, struct dp_netdev_flow *flow)
                 hash_int(mark, 0));
     flow->mark = mark;
 
-    VLOG_DBG("Associated dp_netdev flow %p with mark %u\n", flow, mark);
+    VLOG_DBG("Associated dp_netdev flow %p with mark %u mega_ufid "UUID_FMT,
+             flow, mark, UUID_ARGS((struct uuid *) &flow->mega_ufid));
 }
 
 static bool
@@ -2185,10 +2525,17 @@ static int
 mark_to_flow_disassociate(struct dp_netdev_pmd_thread *pmd,
                           struct dp_netdev_flow *flow)
 {
-    int ret = 0;
-    uint32_t mark = flow->mark;
+    const char *dpif_type_str = dpif_normalize_type(pmd->dp->class->type);
     struct cmap_node *mark_node = CONST_CAST(struct cmap_node *,
                                              &flow->mark_node);
+    uint32_t mark = flow->mark;
+    int ret = 0;
+
+    /* INVALID_FLOW_MARK may mean that the flow has been disassociated or
+     * never associated. */
+    if (OVS_UNLIKELY(mark == INVALID_FLOW_MARK)) {
+        return EINVAL;
+    }
 
     cmap_remove(&flow_mark.mark_to_flow, mark_node, hash_int(mark, 0));
     flow->mark = INVALID_FLOW_MARK;
@@ -2198,18 +2545,22 @@ mark_to_flow_disassociate(struct dp_netdev_pmd_thread *pmd,
      * remove the flow from hardware and free the mark.
      */
     if (flow_mark_has_no_ref(mark)) {
-        struct dp_netdev_port *port;
+        struct netdev *port;
         odp_port_t in_port = flow->flow.in_port.odp_port;
 
-        ovs_mutex_lock(&pmd->dp->port_mutex);
-        port = dp_netdev_lookup_port(pmd->dp, in_port);
+        port = netdev_ports_get(in_port, dpif_type_str);
         if (port) {
-            ret = netdev_flow_del(port->netdev, &flow->mega_ufid, NULL);
+            /* Taking a global 'port_mutex' to fulfill thread safety
+             * restrictions for the netdev-offload-dpdk module. */
+            ovs_mutex_lock(&pmd->dp->port_mutex);
+            ret = netdev_flow_del(port, &flow->mega_ufid, NULL);
+            ovs_mutex_unlock(&pmd->dp->port_mutex);
+            netdev_close(port);
         }
-        ovs_mutex_unlock(&pmd->dp->port_mutex);
 
         flow_mark_free(mark);
-        VLOG_DBG("Freed flow mark %u\n", mark);
+        VLOG_DBG("Freed flow mark %u mega_ufid "UUID_FMT, mark,
+                 UUID_ARGS((struct uuid *) &flow->mega_ufid));
 
         megaflow_to_mark_disassociate(&flow->mega_ufid);
     }
@@ -2304,12 +2655,13 @@ dp_netdev_flow_offload_del(struct dp_flow_offload_item *offload)
 static int
 dp_netdev_flow_offload_put(struct dp_flow_offload_item *offload)
 {
-    struct dp_netdev_port *port;
     struct dp_netdev_pmd_thread *pmd = offload->pmd;
     struct dp_netdev_flow *flow = offload->flow;
     odp_port_t in_port = flow->flow.in_port.odp_port;
+    const char *dpif_type_str = dpif_normalize_type(pmd->dp->class->type);
     bool modification = offload->op == DP_NETDEV_FLOW_OFFLOAD_OP_MOD;
     struct offload_info info;
+    struct netdev *port;
     uint32_t mark;
     int ret;
 
@@ -2339,37 +2691,43 @@ dp_netdev_flow_offload_put(struct dp_flow_offload_item *offload)
         mark = flow_mark_alloc();
         if (mark == INVALID_FLOW_MARK) {
             VLOG_ERR("Failed to allocate flow mark!\n");
+            return -1;
         }
     }
     info.flow_mark = mark;
 
-    ovs_mutex_lock(&pmd->dp->port_mutex);
-    port = dp_netdev_lookup_port(pmd->dp, in_port);
-    if (!port) {
-        ovs_mutex_unlock(&pmd->dp->port_mutex);
-        return -1;
+    port = netdev_ports_get(in_port, dpif_type_str);
+    if (!port || netdev_vport_is_vport_class(port->netdev_class)) {
+        netdev_close(port);
+        goto err_free;
     }
-    ret = netdev_flow_put(port->netdev, &offload->match,
+    /* Taking a global 'port_mutex' to fulfill thread safety restrictions for
+     * the netdev-offload-dpdk module. */
+    ovs_mutex_lock(&pmd->dp->port_mutex);
+    ret = netdev_flow_put(port, &offload->match,
                           CONST_CAST(struct nlattr *, offload->actions),
                           offload->actions_len, &flow->mega_ufid, &info,
                           NULL);
     ovs_mutex_unlock(&pmd->dp->port_mutex);
+    netdev_close(port);
 
     if (ret) {
-        if (!modification) {
-            flow_mark_free(mark);
-        } else {
-            mark_to_flow_disassociate(pmd, flow);
-        }
-        return -1;
+        goto err_free;
     }
 
     if (!modification) {
         megaflow_to_mark_associate(&flow->mega_ufid, mark);
         mark_to_flow_associate(mark, flow);
     }
-
     return 0;
+
+err_free:
+    if (!modification) {
+        flow_mark_free(mark);
+    } else {
+        mark_to_flow_disassociate(pmd, flow);
+    }
+    return -1;
 }
 
 static void *
@@ -2386,6 +2744,7 @@ dp_netdev_flow_offload_main(void *data OVS_UNUSED)
             ovsrcu_quiesce_start();
             ovs_mutex_cond_wait(&dp_flow_offload.cond,
                                 &dp_flow_offload.mutex);
+            ovsrcu_quiesce_end();
         }
         list = ovs_list_pop_front(&dp_flow_offload.list);
         offload = CONTAINER_OF(list, struct dp_flow_offload_item, node);
@@ -2408,9 +2767,11 @@ dp_netdev_flow_offload_main(void *data OVS_UNUSED)
             OVS_NOT_REACHED();
         }
 
-        VLOG_DBG("%s to %s netdev flow\n",
-                 ret == 0 ? "succeed" : "failed", op);
+        VLOG_DBG("%s to %s netdev flow "UUID_FMT,
+                 ret == 0 ? "succeed" : "failed", op,
+                 UUID_ARGS((struct uuid *) &offload->flow->mega_ufid));
         dp_netdev_free_flow_offload(offload);
+        ovsrcu_quiesce();
     }
 
     return NULL;
@@ -2706,27 +3067,6 @@ netdev_flow_key_init_masked(struct netdev_flow_key *dst,
                             (dst_u64 - miniflow_get_values(&dst->mf)) * 8);
 }
 
-/* Iterate through netdev_flow_key TNL u64 values specified by 'FLOWMAP'. */
-#define NETDEV_FLOW_KEY_FOR_EACH_IN_FLOWMAP(VALUE, KEY, FLOWMAP)   \
-    MINIFLOW_FOR_EACH_IN_FLOWMAP(VALUE, &(KEY)->mf, FLOWMAP)
-
-/* Returns a hash value for the bits of 'key' where there are 1-bits in
- * 'mask'. */
-static inline uint32_t
-netdev_flow_key_hash_in_mask(const struct netdev_flow_key *key,
-                             const struct netdev_flow_key *mask)
-{
-    const uint64_t *p = miniflow_get_values(&mask->mf);
-    uint32_t hash = 0;
-    uint64_t value;
-
-    NETDEV_FLOW_KEY_FOR_EACH_IN_FLOWMAP(value, key, mask->mf.map) {
-        hash = hash_add64(hash, value & *p++);
-    }
-
-    return hash_finish(hash, (p - miniflow_get_values(&mask->mf)) * 8);
-}
-
 static inline bool
 emc_entry_alive(struct emc_entry *ce)
 {
@@ -2800,9 +3140,7 @@ emc_probabilistic_insert(struct dp_netdev_pmd_thread *pmd,
      * default the value is UINT32_MAX / 100 which yields an insertion
      * probability of 1/100 ie. 1% */
 
-    uint32_t min;
-
-    atomic_read_relaxed(&pmd->dp->emc_insert_min, &min);
+    uint32_t min = pmd->ctx.emc_insert_min;
 
     if (min && random_uint32() <= min) {
         emc_insert(&(pmd->flow_cache).emc_cache, key, flow);
@@ -2938,7 +3276,7 @@ dp_netdev_pmd_find_flow(const struct dp_netdev_pmd_thread *pmd,
     /* If a UFID is not provided, determine one based on the key. */
     if (!ufidp && key && key_len
         && !dpif_netdev_flow_from_nlattrs(key, key_len, &flow, false)) {
-        dpif_flow_hash(pmd->dp->dpif, &flow, sizeof flow, &ufid);
+        odp_flow_key_hash(&flow, sizeof flow, &ufid);
         ufidp = &ufid;
     }
 
@@ -2955,9 +3293,118 @@ dp_netdev_pmd_find_flow(const struct dp_netdev_pmd_thread *pmd,
 }
 
 static void
-get_dpif_flow_stats(const struct dp_netdev_flow *netdev_flow_,
-                    struct dpif_flow_stats *stats)
+dp_netdev_flow_set_last_stats_attrs(struct dp_netdev_flow *netdev_flow,
+                                    const struct dpif_flow_stats *stats,
+                                    const struct dpif_flow_attrs *attrs,
+                                    int result)
 {
+    struct dp_netdev_flow_stats *last_stats = &netdev_flow->last_stats;
+    struct dp_netdev_flow_attrs *last_attrs = &netdev_flow->last_attrs;
+
+    atomic_store_relaxed(&netdev_flow->netdev_flow_get_result, result);
+    if (result) {
+        return;
+    }
+
+    atomic_store_relaxed(&last_stats->used,         stats->used);
+    atomic_store_relaxed(&last_stats->packet_count, stats->n_packets);
+    atomic_store_relaxed(&last_stats->byte_count,   stats->n_bytes);
+    atomic_store_relaxed(&last_stats->tcp_flags,    stats->tcp_flags);
+
+    atomic_store_relaxed(&last_attrs->offloaded,    attrs->offloaded);
+    atomic_store_relaxed(&last_attrs->dp_layer,     attrs->dp_layer);
+
+}
+
+static void
+dp_netdev_flow_get_last_stats_attrs(struct dp_netdev_flow *netdev_flow,
+                                    struct dpif_flow_stats *stats,
+                                    struct dpif_flow_attrs *attrs,
+                                    int *result)
+{
+    struct dp_netdev_flow_stats *last_stats = &netdev_flow->last_stats;
+    struct dp_netdev_flow_attrs *last_attrs = &netdev_flow->last_attrs;
+
+    atomic_read_relaxed(&netdev_flow->netdev_flow_get_result, result);
+    if (*result) {
+        return;
+    }
+
+    atomic_read_relaxed(&last_stats->used,         &stats->used);
+    atomic_read_relaxed(&last_stats->packet_count, &stats->n_packets);
+    atomic_read_relaxed(&last_stats->byte_count,   &stats->n_bytes);
+    atomic_read_relaxed(&last_stats->tcp_flags,    &stats->tcp_flags);
+
+    atomic_read_relaxed(&last_attrs->offloaded,    &attrs->offloaded);
+    atomic_read_relaxed(&last_attrs->dp_layer,     &attrs->dp_layer);
+}
+
+static bool
+dpif_netdev_get_flow_offload_status(const struct dp_netdev *dp,
+                                    struct dp_netdev_flow *netdev_flow,
+                                    struct dpif_flow_stats *stats,
+                                    struct dpif_flow_attrs *attrs)
+{
+    uint64_t act_buf[1024 / 8];
+    struct nlattr *actions;
+    struct netdev *netdev;
+    struct match match;
+    struct ofpbuf buf;
+
+    int ret = 0;
+
+    if (!netdev_is_flow_api_enabled()) {
+        return false;
+    }
+
+    netdev = netdev_ports_get(netdev_flow->flow.in_port.odp_port,
+                              dpif_normalize_type(dp->class->type));
+    if (!netdev) {
+        return false;
+    }
+    ofpbuf_use_stack(&buf, &act_buf, sizeof act_buf);
+    /* Taking a global 'port_mutex' to fulfill thread safety
+     * restrictions for the netdev-offload-dpdk module.
+     *
+     * XXX: Main thread will try to pause/stop all revalidators during datapath
+     *      reconfiguration via datapath purge callback (dp_purge_cb) while
+     *      holding 'dp->port_mutex'.  So we're not waiting for mutex here.
+     *      Otherwise, deadlock is possible, bcause revalidators might sleep
+     *      waiting for the main thread to release the lock and main thread
+     *      will wait for them to stop processing.
+     *      This workaround might make statistics less accurate. Especially
+     *      for flow deletion case, since there will be no other attempt.  */
+    if (!ovs_mutex_trylock(&dp->port_mutex)) {
+        ret = netdev_flow_get(netdev, &match, &actions,
+                              &netdev_flow->mega_ufid, stats, attrs, &buf);
+        /* Storing statistics and attributes from the last request for
+         * later use on mutex contention. */
+        dp_netdev_flow_set_last_stats_attrs(netdev_flow, stats, attrs, ret);
+        ovs_mutex_unlock(&dp->port_mutex);
+    } else {
+        dp_netdev_flow_get_last_stats_attrs(netdev_flow, stats, attrs, &ret);
+        if (!ret && !attrs->dp_layer) {
+            /* Flow was never reported as 'offloaded' so it's harmless
+             * to continue to think so. */
+            ret = EAGAIN;
+        }
+    }
+    netdev_close(netdev);
+    if (ret) {
+        return false;
+    }
+
+    return true;
+}
+
+static void
+get_dpif_flow_status(const struct dp_netdev *dp,
+                     const struct dp_netdev_flow *netdev_flow_,
+                     struct dpif_flow_stats *stats,
+                     struct dpif_flow_attrs *attrs)
+{
+    struct dpif_flow_stats offload_stats;
+    struct dpif_flow_attrs offload_attrs;
     struct dp_netdev_flow *netdev_flow;
     unsigned long long n;
     long long used;
@@ -2973,6 +3420,21 @@ get_dpif_flow_stats(const struct dp_netdev_flow *netdev_flow_,
     stats->used = used;
     atomic_read_relaxed(&netdev_flow->stats.tcp_flags, &flags);
     stats->tcp_flags = flags;
+
+    if (dpif_netdev_get_flow_offload_status(dp, netdev_flow,
+                                            &offload_stats, &offload_attrs)) {
+        stats->n_packets += offload_stats.n_packets;
+        stats->n_bytes += offload_stats.n_bytes;
+        stats->used = MAX(stats->used, offload_stats.used);
+        stats->tcp_flags |= offload_stats.tcp_flags;
+        if (attrs) {
+            attrs->offloaded = offload_attrs.offloaded;
+            attrs->dp_layer = offload_attrs.dp_layer;
+        }
+    } else if (attrs) {
+        attrs->offloaded = false;
+        attrs->dp_layer = "ovs";
+    }
 }
 
 /* Converts to the dpif_flow format, using 'key_buf' and 'mask_buf' for
@@ -2980,7 +3442,8 @@ get_dpif_flow_stats(const struct dp_netdev_flow *netdev_flow_,
  * 'mask_buf'. Actions will be returned without copying, by relying on RCU to
  * protect them. */
 static void
-dp_netdev_flow_to_dpif_flow(const struct dp_netdev_flow *netdev_flow,
+dp_netdev_flow_to_dpif_flow(const struct dp_netdev *dp,
+                            const struct dp_netdev_flow *netdev_flow,
                             struct ofpbuf *key_buf, struct ofpbuf *mask_buf,
                             struct dpif_flow *flow, bool terse)
 {
@@ -3023,7 +3486,9 @@ dp_netdev_flow_to_dpif_flow(const struct dp_netdev_flow *netdev_flow,
     flow->ufid = netdev_flow->ufid;
     flow->ufid_present = true;
     flow->pmd_id = netdev_flow->pmd_id;
-    get_dpif_flow_stats(netdev_flow, &flow->stats);
+
+    get_dpif_flow_status(dp, netdev_flow, &flow->stats, &flow->attrs);
+    flow->attrs.dp_extra_info = netdev_flow->dp_extra_info;
 }
 
 static int
@@ -3034,7 +3499,7 @@ dpif_netdev_mask_from_nlattrs(const struct nlattr *key, uint32_t key_len,
 {
     enum odp_key_fitness fitness;
 
-    fitness = odp_flow_key_to_mask(mask_key, mask_key_len, wc, flow);
+    fitness = odp_flow_key_to_mask(mask_key, mask_key_len, wc, flow, NULL);
     if (fitness) {
         if (!probe) {
             /* This should not happen: it indicates that
@@ -3065,7 +3530,7 @@ static int
 dpif_netdev_flow_from_nlattrs(const struct nlattr *key, uint32_t key_len,
                               struct flow *flow, bool probe)
 {
-    if (odp_flow_key_to_flow(key, key_len, flow)) {
+    if (odp_flow_key_to_flow(key, key_len, flow, NULL)) {
         if (!probe) {
             /* This should not happen: it indicates that
              * odp_flow_key_from_flow() and odp_flow_key_to_flow() disagree on
@@ -3126,8 +3591,8 @@ dpif_netdev_flow_get(const struct dpif *dpif, const struct dpif_flow_get *get)
         netdev_flow = dp_netdev_pmd_find_flow(pmd, get->ufid, get->key,
                                               get->key_len);
         if (netdev_flow) {
-            dp_netdev_flow_to_dpif_flow(netdev_flow, get->buffer, get->buffer,
-                                        get->flow, false);
+            dp_netdev_flow_to_dpif_flow(dp, netdev_flow, get->buffer,
+                                        get->buffer, get->flow, false);
             error = 0;
             break;
         } else {
@@ -3154,7 +3619,7 @@ dp_netdev_get_mega_ufid(const struct match *match, ovs_u128 *mega_ufid)
         ((uint8_t *)&masked_flow)[i] = ((uint8_t *)&match->flow)[i] &
                                        ((uint8_t *)&match->wc)[i];
     }
-    dpif_flow_hash(NULL, &masked_flow, sizeof(struct flow), mega_ufid);
+    odp_flow_key_hash(&masked_flow, sizeof masked_flow, mega_ufid);
 }
 
 static struct dp_netdev_flow *
@@ -3163,9 +3628,11 @@ dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd,
                    const struct nlattr *actions, size_t actions_len)
     OVS_REQUIRES(pmd->flow_mutex)
 {
+    struct ds extra_info = DS_EMPTY_INITIALIZER;
     struct dp_netdev_flow *flow;
     struct netdev_flow_key mask;
     struct dpcls *cls;
+    size_t unit;
 
     /* Make sure in_port is exact matched before we read it. */
     ovs_assert(match->wc.masks.in_port.odp_port == ODPP_NONE);
@@ -3190,6 +3657,9 @@ dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd,
     /* Do not allocate extra space. */
     flow = xmalloc(sizeof *flow - sizeof flow->cr.flow.mf + mask.len);
     memset(&flow->stats, 0, sizeof flow->stats);
+    atomic_init(&flow->netdev_flow_get_result, 0);
+    memset(&flow->last_stats, 0, sizeof flow->last_stats);
+    memset(&flow->last_attrs, 0, sizeof flow->last_attrs);
     flow->dead = false;
     flow->batch = NULL;
     flow->mark = INVALID_FLOW_MARK;
@@ -3206,6 +3676,18 @@ dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd,
     cls = dp_netdev_pmd_find_dpcls(pmd, in_port);
     dpcls_insert(cls, &flow->cr, &mask);
 
+    ds_put_cstr(&extra_info, "miniflow_bits(");
+    FLOWMAP_FOR_EACH_UNIT (unit) {
+        if (unit) {
+            ds_put_char(&extra_info, ',');
+        }
+        ds_put_format(&extra_info, "%d",
+                      count_1bits(flow->cr.mask->mf.map.bits[unit]));
+    }
+    ds_put_char(&extra_info, ')');
+    flow->dp_extra_info = ds_steal_cstr(&extra_info);
+    ds_destroy(&extra_info);
+
     cmap_insert(&pmd->flow_table, CONST_CAST(struct cmap_node *, &flow->node),
                 dp_netdev_flow_hash(&flow->ufid));
 
@@ -3229,6 +3711,8 @@ dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd,
 
         ds_put_cstr(&ds, "flow_add: ");
         odp_format_ufid(ufid, &ds);
+        ds_put_cstr(&ds, " mega_");
+        odp_format_ufid(&flow->mega_ufid, &ds);
         ds_put_cstr(&ds, " ");
         odp_flow_format(key_buf.data, key_buf.size,
                         mask_buf.data, mask_buf.size,
@@ -3277,13 +3761,8 @@ flow_put_on_pmd(struct dp_netdev_pmd_thread *pmd,
     netdev_flow = dp_netdev_pmd_lookup_flow(pmd, key, NULL);
     if (!netdev_flow) {
         if (put->flags & DPIF_FP_CREATE) {
-            if (cmap_count(&pmd->flow_table) < MAX_FLOWS) {
-                dp_netdev_flow_add(pmd, match, ufid, put->actions,
-                                   put->actions_len);
-                error = 0;
-            } else {
-                error = EFBIG;
-            }
+            dp_netdev_flow_add(pmd, match, ufid, put->actions,
+                               put->actions_len);
         } else {
             error = ENOENT;
         }
@@ -3302,7 +3781,7 @@ flow_put_on_pmd(struct dp_netdev_pmd_thread *pmd,
                                   put->actions, put->actions_len);
 
             if (stats) {
-                get_dpif_flow_stats(netdev_flow, stats);
+                get_dpif_flow_status(pmd->dp, netdev_flow, stats, NULL);
             }
             if (put->flags & DPIF_FP_ZERO_STATS) {
                 /* XXX: The userspace datapath uses thread local statistics
@@ -3358,7 +3837,17 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
     if (put->ufid) {
         ufid = *put->ufid;
     } else {
-        dpif_flow_hash(dpif, &match.flow, sizeof match.flow, &ufid);
+        odp_flow_key_hash(&match.flow, sizeof match.flow, &ufid);
+    }
+
+    /* The Netlink encoding of datapath flow keys cannot express
+     * wildcarding the presence of a VLAN tag. Instead, a missing VLAN
+     * tag is interpreted as exact match on the fact that there is no
+     * VLAN.  Unless we refactor a lot of code that translates between
+     * Netlink and struct flow representations, we have to do the same
+     * here.  This must be in sync with 'match' in handle_packet_upcall(). */
+    if (!match.wc.masks.vlans[0].tci) {
+        match.wc.masks.vlans[0].tci = htons(0xffff);
     }
 
     /* Must produce a netdev_flow_key for lookup.
@@ -3411,7 +3900,7 @@ flow_del_on_pmd(struct dp_netdev_pmd_thread *pmd,
                                           del->key_len);
     if (netdev_flow) {
         if (stats) {
-            get_dpif_flow_stats(netdev_flow, stats);
+            get_dpif_flow_status(pmd->dp, netdev_flow, stats, NULL);
         }
         dp_netdev_pmd_remove_flow(pmd, netdev_flow);
     } else {
@@ -3481,7 +3970,7 @@ dpif_netdev_flow_dump_cast(struct dpif_flow_dump *dump)
 
 static struct dpif_flow_dump *
 dpif_netdev_flow_dump_create(const struct dpif *dpif_, bool terse,
-                             char *type OVS_UNUSED)
+                             struct dpif_flow_dump_types *types OVS_UNUSED)
 {
     struct dpif_netdev_flow_dump *dump;
 
@@ -3545,13 +4034,13 @@ dpif_netdev_flow_dump_next(struct dpif_flow_dump_thread *thread_,
         = dpif_netdev_flow_dump_thread_cast(thread_);
     struct dpif_netdev_flow_dump *dump = thread->dump;
     struct dp_netdev_flow *netdev_flows[FLOW_DUMP_MAX_BATCH];
+    struct dpif_netdev *dpif = dpif_netdev_cast(thread->up.dpif);
+    struct dp_netdev *dp = get_dp_netdev(&dpif->dpif);
     int n_flows = 0;
     int i;
 
     ovs_mutex_lock(&dump->mutex);
     if (!dump->status) {
-        struct dpif_netdev *dpif = dpif_netdev_cast(thread->up.dpif);
-        struct dp_netdev *dp = get_dp_netdev(&dpif->dpif);
         struct dp_netdev_pmd_thread *pmd = dump->cur_pmd;
         int flow_limit = MIN(max_flows, FLOW_DUMP_MAX_BATCH);
 
@@ -3608,7 +4097,7 @@ dpif_netdev_flow_dump_next(struct dpif_flow_dump_thread *thread_,
 
         ofpbuf_use_stack(&key, keybuf, sizeof *keybuf);
         ofpbuf_use_stack(&mask, maskbuf, sizeof *maskbuf);
-        dp_netdev_flow_to_dpif_flow(netdev_flow, &key, &mask, f,
+        dp_netdev_flow_to_dpif_flow(dp, netdev_flow, &key, &mask, f,
                                     dump->up.terse);
     }
 
@@ -3655,7 +4144,8 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
         ovs_mutex_lock(&dp->non_pmd_mutex);
     }
 
-    /* Update current time in PMD context. */
+    /* Update current time in PMD context. We don't care about EMC insertion
+     * probability, because we are on a slow path. */
     pmd_thread_ctx_time_update(pmd);
 
     /* The action processing expects the RSS hash to be valid, because
@@ -3669,6 +4159,7 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
     }
 
     dp_packet_batch_init_packet(&pp, execute->packet);
+    pp.do_not_steal = true;
     dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
                               execute->actions, execute->actions_len);
     dp_netdev_pmd_flush_output_packets(pmd, true);
@@ -3682,7 +4173,8 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
 }
 
 static void
-dpif_netdev_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops)
+dpif_netdev_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops,
+                    enum dpif_offload_type offload_type OVS_UNUSED)
 {
     size_t i;
 
@@ -3709,18 +4201,78 @@ dpif_netdev_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops)
     }
 }
 
-/* Applies datapath configuration from the database. Some of the changes are
- * actually applied in dpif_netdev_run(). */
-static int
-dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
+/* Enable or Disable PMD auto load balancing. */
+static void
+set_pmd_auto_lb(struct dp_netdev *dp, bool always_log)
+{
+    unsigned int cnt = 0;
+    struct dp_netdev_pmd_thread *pmd;
+    struct pmd_auto_lb *pmd_alb = &dp->pmd_alb;
+    uint8_t rebalance_load_thresh;
+
+    bool enable_alb = false;
+    bool multi_rxq = false;
+    bool pmd_rxq_assign_cyc = dp->pmd_rxq_assign_cyc;
+
+    /* Ensure that there is at least 2 non-isolated PMDs and
+     * one of them is polling more than one rxq. */
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (pmd->core_id == NON_PMD_CORE_ID || pmd->isolated) {
+            continue;
+        }
+
+        if (hmap_count(&pmd->poll_list) > 1) {
+            multi_rxq = true;
+        }
+        if (cnt && multi_rxq) {
+                enable_alb = true;
+                break;
+        }
+        cnt++;
+    }
+
+    /* Enable auto LB if it is requested and cycle based assignment is true. */
+    enable_alb = enable_alb && pmd_rxq_assign_cyc &&
+                    pmd_alb->auto_lb_requested;
+
+    if (pmd_alb->is_enabled != enable_alb || always_log) {
+        pmd_alb->is_enabled = enable_alb;
+        if (pmd_alb->is_enabled) {
+            atomic_read_relaxed(&pmd_alb->rebalance_load_thresh,
+                                &rebalance_load_thresh);
+            VLOG_INFO("PMD auto load balance is enabled "
+                      "interval %"PRIu64" mins, "
+                      "pmd load threshold %"PRIu8"%%, "
+                      "improvement threshold %"PRIu8"%%",
+                       pmd_alb->rebalance_intvl / MIN_TO_MSEC,
+                       rebalance_load_thresh,
+                       pmd_alb->rebalance_improve_thresh);
+
+        } else {
+            pmd_alb->rebalance_poll_timer = 0;
+            VLOG_INFO("PMD auto load balance is disabled");
+        }
+    }
+}
+
+/* Applies datapath configuration from the database. Some of the changes are
+ * actually applied in dpif_netdev_run(). */
+static int
+dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     const char *cmask = smap_get(other_config, "pmd-cpu-mask");
+    const char *pmd_rxq_assign = smap_get_def(other_config, "pmd-rxq-assign",
+                                             "cycles");
     unsigned long long insert_prob =
         smap_get_ullong(other_config, "emc-insert-inv-prob",
                         DEFAULT_EM_FLOW_INSERT_INV_PROB);
     uint32_t insert_min, cur_min;
     uint32_t tx_flush_interval, cur_tx_flush_interval;
+    uint64_t rebalance_intvl;
+    uint8_t rebalance_load, cur_rebalance_load;
+    uint8_t rebalance_improve;
+    bool log_autolb = false;
 
     tx_flush_interval = smap_get_int(other_config, "tx-flush-interval",
                                      DEFAULT_TX_FLUSH_INTERVAL);
@@ -3748,7 +4300,7 @@ dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
     if (insert_min != cur_min) {
         atomic_store_relaxed(&dp->emc_insert_min, insert_min);
         if (insert_min == 0) {
-            VLOG_INFO("EMC has been disabled");
+            VLOG_INFO("EMC insertion probability changed to zero");
         } else {
             VLOG_INFO("EMC insertion probability changed to 1/%llu (~%.2f%%)",
                       insert_prob, (100 / (float)insert_prob));
@@ -3778,6 +4330,66 @@ dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
             VLOG_INFO("SMC cache is disabled");
         }
     }
+
+    bool pmd_rxq_assign_cyc = !strcmp(pmd_rxq_assign, "cycles");
+    if (!pmd_rxq_assign_cyc && strcmp(pmd_rxq_assign, "roundrobin")) {
+        VLOG_WARN("Unsupported Rxq to PMD assignment mode in pmd-rxq-assign. "
+                      "Defaulting to 'cycles'.");
+        pmd_rxq_assign_cyc = true;
+        pmd_rxq_assign = "cycles";
+    }
+    if (dp->pmd_rxq_assign_cyc != pmd_rxq_assign_cyc) {
+        dp->pmd_rxq_assign_cyc = pmd_rxq_assign_cyc;
+        VLOG_INFO("Rxq to PMD assignment mode changed to: \'%s\'.",
+                  pmd_rxq_assign);
+        dp_netdev_request_reconfigure(dp);
+    }
+
+    struct pmd_auto_lb *pmd_alb = &dp->pmd_alb;
+    pmd_alb->auto_lb_requested = smap_get_bool(other_config, "pmd-auto-lb",
+                              false);
+
+    rebalance_intvl = smap_get_int(other_config, "pmd-auto-lb-rebal-interval",
+                                   ALB_REBALANCE_INTERVAL);
+
+    /* Input is in min, convert it to msec. */
+    rebalance_intvl =
+        rebalance_intvl ? rebalance_intvl * MIN_TO_MSEC : MIN_TO_MSEC;
+
+    if (pmd_alb->rebalance_intvl != rebalance_intvl) {
+        pmd_alb->rebalance_intvl = rebalance_intvl;
+        VLOG_INFO("PMD auto load balance interval set to "
+                  "%"PRIu64" mins\n", rebalance_intvl / MIN_TO_MSEC);
+        log_autolb = true;
+    }
+
+    rebalance_improve = smap_get_int(other_config,
+                                     "pmd-auto-lb-improvement-threshold",
+                                     ALB_IMPROVEMENT_THRESHOLD);
+    if (rebalance_improve > 100) {
+        rebalance_improve = ALB_IMPROVEMENT_THRESHOLD;
+    }
+    if (rebalance_improve != pmd_alb->rebalance_improve_thresh) {
+        pmd_alb->rebalance_improve_thresh = rebalance_improve;
+        VLOG_INFO("PMD auto load balance improvement threshold set to "
+                  "%"PRIu8"%%", rebalance_improve);
+        log_autolb = true;
+    }
+
+    rebalance_load = smap_get_int(other_config, "pmd-auto-lb-load-threshold",
+                                  ALB_LOAD_THRESHOLD);
+    if (rebalance_load > 100) {
+        rebalance_load = ALB_LOAD_THRESHOLD;
+    }
+    atomic_read_relaxed(&pmd_alb->rebalance_load_thresh, &cur_rebalance_load);
+    if (rebalance_load != cur_rebalance_load) {
+        atomic_store_relaxed(&pmd_alb->rebalance_load_thresh,
+                             rebalance_load);
+        VLOG_INFO("PMD auto load balance load threshold set to %"PRIu8"%%",
+                  rebalance_load);
+        log_autolb = true;
+    }
+    set_pmd_auto_lb(dp, log_autolb);
     return 0;
 }
 
@@ -3840,8 +4452,29 @@ exit:
     return error;
 }
 
-/* Changes the affinity of port's rx queues.  The changes are actually applied
- * in dpif_netdev_run(). */
+/* Returns 'true' if one of the 'port's RX queues exists in 'poll_list'
+ * of given PMD thread. */
+static bool
+dpif_netdev_pmd_polls_port(struct dp_netdev_pmd_thread *pmd,
+                           struct dp_netdev_port *port)
+    OVS_EXCLUDED(pmd->port_mutex)
+{
+    struct rxq_poll *poll;
+    bool found = false;
+
+    ovs_mutex_lock(&pmd->port_mutex);
+    HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
+        if (port == poll->rxq->port) {
+            found = true;
+            break;
+        }
+    }
+    ovs_mutex_unlock(&pmd->port_mutex);
+    return found;
+}
+
+/* Updates port configuration from the database.  The changes are actually
+ * applied in dpif_netdev_run(). */
 static int
 dpif_netdev_port_set_config(struct dpif *dpif, odp_port_t port_no,
                             const struct smap *cfg)
@@ -3850,10 +4483,49 @@ dpif_netdev_port_set_config(struct dpif *dpif, odp_port_t port_no,
     struct dp_netdev_port *port;
     int error = 0;
     const char *affinity_list = smap_get(cfg, "pmd-rxq-affinity");
+    bool emc_enabled = smap_get_bool(cfg, "emc-enable", true);
 
     ovs_mutex_lock(&dp->port_mutex);
     error = get_port_by_number(dp, port_no, &port);
-    if (error || !netdev_is_pmd(port->netdev)
+    if (error) {
+        goto unlock;
+    }
+
+    if (emc_enabled != port->emc_enabled) {
+        struct dp_netdev_pmd_thread *pmd;
+        struct ds ds = DS_EMPTY_INITIALIZER;
+        uint32_t cur_min, insert_prob;
+
+        port->emc_enabled = emc_enabled;
+        /* Mark for reload all the threads that polls this port and request
+         * for reconfiguration for the actual reloading of threads. */
+        CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+            if (dpif_netdev_pmd_polls_port(pmd, port)) {
+                pmd->need_reload = true;
+            }
+        }
+        dp_netdev_request_reconfigure(dp);
+
+        ds_put_format(&ds, "%s: EMC has been %s.",
+                      netdev_get_name(port->netdev),
+                      (emc_enabled) ? "enabled" : "disabled");
+        if (emc_enabled) {
+            ds_put_cstr(&ds, " Current insertion probability is ");
+            atomic_read_relaxed(&dp->emc_insert_min, &cur_min);
+            if (!cur_min) {
+                ds_put_cstr(&ds, "zero.");
+            } else {
+                insert_prob = UINT32_MAX / cur_min;
+                ds_put_format(&ds, "1/%"PRIu32" (~%.2f%%).",
+                              insert_prob, 100 / (float) insert_prob);
+            }
+        }
+        VLOG_INFO("%s", ds_cstr(&ds));
+        ds_destroy(&ds);
+    }
+
+    /* Checking for RXq affinity changes. */
+    if (!netdev_is_pmd(port->netdev)
         || nullable_string_is_equal(affinity_list, port->rxq_affinity_list)) {
         goto unlock;
     }
@@ -4065,7 +4737,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
         /* At least one packet received. */
         *recirc_depth_get() = 0;
         pmd_thread_ctx_time_update(pmd);
-        batch_cnt = batch.count;
+        batch_cnt = dp_packet_batch_size(&batch);
         if (pmd_perf_metrics_enabled(pmd)) {
             /* Update batch histogram. */
             s->current.batches++;
@@ -4116,6 +4788,20 @@ tx_port_lookup(const struct hmap *hmap, odp_port_t port_no)
     return NULL;
 }
 
+static struct tx_bond *
+tx_bond_lookup(const struct cmap *tx_bonds, uint32_t bond_id)
+{
+    uint32_t hash = hash_bond_id(bond_id);
+    struct tx_bond *tx;
+
+    CMAP_FOR_EACH_WITH_HASH (tx, node, hash, tx_bonds) {
+        if (tx->bond_id == bond_id) {
+            return tx;
+        }
+    }
+    return NULL;
+}
+
 static int
 port_reconfigure(struct dp_netdev_port *port)
 {
@@ -4248,10 +4934,18 @@ rr_numa_list_populate(struct dp_netdev *dp, struct rr_numa_list *rr)
     }
 }
 
-/* Returns the next pmd from the numa node in
- * incrementing or decrementing order. */
+/*
+ * Returns the next pmd from the numa node.
+ *
+ * If 'updown' is 'true' it will alternate between selecting the next pmd in
+ * either an up or down walk, switching between up/down when the first or last
+ * core is reached. e.g. 1,2,3,3,2,1,1,2...
+ *
+ * If 'updown' is 'false' it will select the next pmd wrapping around when last
+ * core reached. e.g. 1,2,3,1,2,3,1,2...
+ */
 static struct dp_netdev_pmd_thread *
-rr_numa_get_pmd(struct rr_numa *numa)
+rr_numa_get_pmd(struct rr_numa *numa, bool updown)
 {
     int numa_idx = numa->cur_index;
 
@@ -4259,7 +4953,11 @@ rr_numa_get_pmd(struct rr_numa *numa)
         /* Incrementing through list of pmds. */
         if (numa->cur_index == numa->n_pmds-1) {
             /* Reached the last pmd. */
-            numa->idx_inc = false;
+            if (updown) {
+                numa->idx_inc = false;
+            } else {
+                numa->cur_index = 0;
+            }
         } else {
             numa->cur_index++;
         }
@@ -4322,9 +5020,6 @@ compare_rxq_cycles(const void *a, const void *b)
  * queues and marks the pmds as isolated.  Otherwise, assign non isolated
  * pmds to unpinned queues.
  *
- * If 'pinned' is false queues will be sorted by processing cycles they are
- * consuming and then assigned to pmds in round robin order.
- *
  * The function doesn't touch the pmd threads, it just stores the assignment
  * in the 'pmd' member of each rxq. */
 static void
@@ -4337,6 +5032,7 @@ rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
     int n_rxqs = 0;
     struct rr_numa *numa = NULL;
     int numa_id;
+    bool assign_cyc = dp->pmd_rxq_assign_cyc;
 
     HMAP_FOR_EACH (port, node, &dp->ports) {
         if (!netdev_is_pmd(port->netdev)) {
@@ -4357,6 +5053,10 @@ rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
                 } else {
                     q->pmd = pmd;
                     pmd->isolated = true;
+                    VLOG_INFO("Core %d on numa node %d assigned port \'%s\' "
+                              "rx queue %d.", pmd->core_id, pmd->numa_id,
+                              netdev_rxq_get_name(q->rx),
+                              netdev_rxq_get_queue_id(q->rx));
                     dp_netdev_pmd_unref(pmd);
                 }
             } else if (!pinned && q->core_id == OVS_CORE_UNSPEC) {
@@ -4367,19 +5067,22 @@ rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
                 } else {
                     rxqs = xrealloc(rxqs, sizeof *rxqs * (n_rxqs + 1));
                 }
-                /* Sum the queue intervals and store the cycle history. */
-                for (unsigned i = 0; i < PMD_RXQ_INTERVAL_MAX; i++) {
-                    cycle_hist += dp_netdev_rxq_get_intrvl_cycles(q, i);
-                }
-                dp_netdev_rxq_set_cycles(q, RXQ_CYCLES_PROC_HIST, cycle_hist);
 
+                if (assign_cyc) {
+                    /* Sum the queue intervals and store the cycle history. */
+                    for (unsigned i = 0; i < PMD_RXQ_INTERVAL_MAX; i++) {
+                        cycle_hist += dp_netdev_rxq_get_intrvl_cycles(q, i);
+                    }
+                    dp_netdev_rxq_set_cycles(q, RXQ_CYCLES_PROC_HIST,
+                                             cycle_hist);
+                }
                 /* Store the queue. */
                 rxqs[n_rxqs++] = q;
             }
         }
     }
 
-    if (n_rxqs > 1) {
+    if (n_rxqs > 1 && assign_cyc) {
         /* Sort the queues in order of the processing cycles
          * they consumed during their last pmd interval. */
         qsort(rxqs, n_rxqs, sizeof *rxqs, compare_rxq_cycles);
@@ -4403,7 +5106,7 @@ rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
                          netdev_rxq_get_queue_id(rxqs[i]->rx));
                 continue;
             }
-            rxqs[i]->pmd = rr_numa_get_pmd(non_local_numa);
+            rxqs[i]->pmd = rr_numa_get_pmd(non_local_numa, assign_cyc);
             VLOG_WARN("There's no available (non-isolated) pmd thread "
                       "on numa node %d. Queue %d on port \'%s\' will "
                       "be assigned to the pmd on core %d "
@@ -4412,13 +5115,22 @@ rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
                       netdev_rxq_get_name(rxqs[i]->rx),
                       rxqs[i]->pmd->core_id, rxqs[i]->pmd->numa_id);
         } else {
-        rxqs[i]->pmd = rr_numa_get_pmd(numa);
-        VLOG_INFO("Core %d on numa node %d assigned port \'%s\' "
-                  "rx queue %d (measured processing cycles %"PRIu64").",
-                  rxqs[i]->pmd->core_id, numa_id,
-                  netdev_rxq_get_name(rxqs[i]->rx),
-                  netdev_rxq_get_queue_id(rxqs[i]->rx),
-                  dp_netdev_rxq_get_cycles(rxqs[i], RXQ_CYCLES_PROC_HIST));
+            rxqs[i]->pmd = rr_numa_get_pmd(numa, assign_cyc);
+            if (assign_cyc) {
+                VLOG_INFO("Core %d on numa node %d assigned port \'%s\' "
+                          "rx queue %d "
+                          "(measured processing cycles %"PRIu64").",
+                          rxqs[i]->pmd->core_id, numa_id,
+                          netdev_rxq_get_name(rxqs[i]->rx),
+                          netdev_rxq_get_queue_id(rxqs[i]->rx),
+                          dp_netdev_rxq_get_cycles(rxqs[i],
+                                                   RXQ_CYCLES_PROC_HIST));
+            } else {
+                VLOG_INFO("Core %d on numa node %d assigned port \'%s\' "
+                          "rx queue %d.", rxqs[i]->pmd->core_id, numa_id,
+                          netdev_rxq_get_name(rxqs[i]->rx),
+                          netdev_rxq_get_queue_id(rxqs[i]->rx));
+            }
         }
     }
 
@@ -4435,6 +5147,19 @@ reload_affected_pmds(struct dp_netdev *dp)
         if (pmd->need_reload) {
             flow_mark_flush(pmd);
             dp_netdev_reload_pmd__(pmd);
+        }
+    }
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (pmd->need_reload) {
+            if (pmd->core_id != NON_PMD_CORE_ID) {
+                bool reload;
+
+                do {
+                    atomic_read_explicit(&pmd->reload, &reload,
+                                         memory_order_acquire);
+                } while (reload);
+            }
             pmd->need_reload = false;
         }
     }
@@ -4481,6 +5206,7 @@ reconfigure_pmd_threads(struct dp_netdev *dp)
                                                     pmd->core_id)) {
             hmapx_add(&to_delete, pmd);
         } else if (need_to_adjust_static_tx_qids) {
+            atomic_store_relaxed(&pmd->reload_tx_qid, true);
             pmd->need_reload = true;
         }
     }
@@ -4504,9 +5230,16 @@ reconfigure_pmd_threads(struct dp_netdev *dp)
     FOR_EACH_CORE_ON_DUMP(core, pmd_cores) {
         pmd = dp_netdev_get_pmd(dp, core->core_id);
         if (!pmd) {
+            struct ds name = DS_EMPTY_INITIALIZER;
+
             pmd = xzalloc(sizeof *pmd);
             dp_netdev_configure_pmd(pmd, dp, core->core_id, core->numa_id);
-            pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
+
+            ds_put_format(&name, "pmd-c%02d/id:", core->core_id);
+            pmd->thread = ovs_thread_create(ds_cstr(&name),
+                                            pmd_thread_main, pmd);
+            ds_destroy(&name);
+
             VLOG_INFO("PMD thread on numa_id: %d, core id: %2d created.",
                       pmd->numa_id, pmd->core_id);
             changed = true;
@@ -4564,6 +5297,7 @@ static void
 reconfigure_datapath(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex)
 {
+    struct hmapx busy_threads = HMAPX_INITIALIZER(&busy_threads);
     struct dp_netdev_pmd_thread *pmd;
     struct dp_netdev_port *port;
     int wanted_txqs;
@@ -4587,9 +5321,17 @@ reconfigure_datapath(struct dp_netdev *dp)
 
     /* Check for all the ports that need reconfiguration.  We cache this in
      * 'port->need_reconfigure', because netdev_is_reconf_required() can
-     * change at any time. */
+     * change at any time.
+     * Also mark for reconfiguration all ports which will likely change their
+     * 'dynamic_txqs' parameter.  It's required to stop using them before
+     * changing this setting and it's simpler to mark ports here and allow
+     * 'pmd_remove_stale_ports' to remove them from threads.  There will be
+     * no actual reconfiguration in 'port_reconfigure' because it's
+     * unnecessary.  */
     HMAP_FOR_EACH (port, node, &dp->ports) {
-        if (netdev_is_reconf_required(port->netdev)) {
+        if (netdev_is_reconf_required(port->netdev)
+            || (port->dynamic_txqs
+                != (netdev_n_txq(port->netdev) < wanted_txqs))) {
             port->need_reconfigure = true;
         }
     }
@@ -4651,6 +5393,18 @@ reconfigure_datapath(struct dp_netdev *dp)
     rxq_scheduling(dp, false);
 
     /* Step 5: Remove queues not compliant with new scheduling. */
+
+    /* Count all the threads that will have at least one queue to poll. */
+    HMAP_FOR_EACH (port, node, &dp->ports) {
+        for (int qid = 0; qid < port->n_rxq; qid++) {
+            struct dp_netdev_rxq *q = &port->rxqs[qid];
+
+            if (q->pmd) {
+                hmapx_add(&busy_threads, q->pmd);
+            }
+        }
+    }
+
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
         struct rxq_poll *poll, *poll_next;
 
@@ -4658,11 +5412,21 @@ reconfigure_datapath(struct dp_netdev *dp)
         HMAP_FOR_EACH_SAFE (poll, poll_next, node, &pmd->poll_list) {
             if (poll->rxq->pmd != pmd) {
                 dp_netdev_del_rxq_from_pmd(pmd, poll);
+
+                /* This pmd might sleep after this step if it has no rxq
+                 * remaining. Tell it to busy wait for new assignment if it
+                 * has at least one scheduled queue. */
+                if (hmap_count(&pmd->poll_list) == 0 &&
+                    hmapx_contains(&busy_threads, pmd)) {
+                    atomic_store_relaxed(&pmd->wait_for_reload, true);
+                }
             }
         }
         ovs_mutex_unlock(&pmd->port_mutex);
     }
 
+    hmapx_destroy(&busy_threads);
+
     /* Reload affected pmd threads.  We must wait for the pmd threads to remove
      * the old queues before readding them, otherwise a queue can be polled by
      * two threads at the same time. */
@@ -4685,20 +5449,31 @@ reconfigure_datapath(struct dp_netdev *dp)
         }
     }
 
-    /* Add every port to the tx cache of every pmd thread, if it's not
-     * there already and if this pmd has at least one rxq to poll. */
+    /* Add every port and bond to the tx port and bond caches of
+     * every pmd thread, if it's not there already and if this pmd
+     * has at least one rxq to poll.
+     */
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
         ovs_mutex_lock(&pmd->port_mutex);
         if (hmap_count(&pmd->poll_list) || pmd->core_id == NON_PMD_CORE_ID) {
+            struct tx_bond *bond;
+
             HMAP_FOR_EACH (port, node, &dp->ports) {
                 dp_netdev_add_port_tx_to_pmd(pmd, port);
             }
+
+            CMAP_FOR_EACH (bond, node, &dp->tx_bonds) {
+                dp_netdev_add_bond_tx_to_pmd(pmd, bond, false);
+            }
         }
         ovs_mutex_unlock(&pmd->port_mutex);
     }
 
     /* Reload affected pmd threads. */
     reload_affected_pmds(dp);
+
+    /* Check if PMD Auto LB is to be enabled */
+    set_pmd_auto_lb(dp, false);
 }
 
 /* Returns true if one of the netdevs in 'dp' requires a reconfiguration */
@@ -4717,127 +5492,404 @@ ports_require_restart(const struct dp_netdev *dp)
     return false;
 }
 
-/* Return true if needs to revalidate datapath flows. */
-static bool
-dpif_netdev_run(struct dpif *dpif)
+/* Calculates variance in the values stored in array 'a'. 'n' is the number
+ * of elements in array to be considered for calculating vairance.
+ * Usage example: data array 'a' contains the processing load of each pmd and
+ * 'n' is the number of PMDs. It returns the variance in processing load of
+ * PMDs*/
+static uint64_t
+variance(uint64_t a[], int n)
 {
-    struct dp_netdev_port *port;
-    struct dp_netdev *dp = get_dp_netdev(dpif);
-    struct dp_netdev_pmd_thread *non_pmd;
-    uint64_t new_tnl_seq;
-    bool need_to_flush = true;
-
-    ovs_mutex_lock(&dp->port_mutex);
-    non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
-    if (non_pmd) {
-        ovs_mutex_lock(&dp->non_pmd_mutex);
-        HMAP_FOR_EACH (port, node, &dp->ports) {
-            if (!netdev_is_pmd(port->netdev)) {
-                int i;
-
-                for (i = 0; i < port->n_rxq; i++) {
-                    if (dp_netdev_process_rxq_port(non_pmd,
-                                                   &port->rxqs[i],
-                                                   port->port_no)) {
-                        need_to_flush = false;
-                    }
-                }
-            }
-        }
-        if (need_to_flush) {
-            /* We didn't receive anything in the process loop.
-             * Check if we need to send something.
-             * There was no time updates on current iteration. */
-            pmd_thread_ctx_time_update(non_pmd);
-            dp_netdev_pmd_flush_output_packets(non_pmd, false);
-        }
-
-        dpif_netdev_xps_revalidate_pmd(non_pmd, false);
-        ovs_mutex_unlock(&dp->non_pmd_mutex);
+    /* Compute mean (average of elements). */
+    uint64_t sum = 0;
+    uint64_t mean = 0;
+    uint64_t sqDiff = 0;
 
-        dp_netdev_pmd_unref(non_pmd);
+    if (!n) {
+        return 0;
     }
 
-    if (dp_netdev_is_reconf_required(dp) || ports_require_restart(dp)) {
-        reconfigure_datapath(dp);
+    for (int i = 0; i < n; i++) {
+        sum += a[i];
     }
-    ovs_mutex_unlock(&dp->port_mutex);
 
-    tnl_neigh_cache_run();
-    tnl_port_map_run();
-    new_tnl_seq = seq_read(tnl_conf_seq);
+    if (sum) {
+        mean = sum / n;
 
-    if (dp->last_tnl_conf_seq != new_tnl_seq) {
-        dp->last_tnl_conf_seq = new_tnl_seq;
-        return true;
+        /* Compute sum squared differences with mean. */
+        for (int i = 0; i < n; i++) {
+            sqDiff += (a[i] - mean)*(a[i] - mean);
+        }
     }
-    return false;
+    return (sqDiff ? (sqDiff / n) : 0);
 }
 
-static void
-dpif_netdev_wait(struct dpif *dpif)
+
+/* Returns the variance in the PMDs usage as part of dry run of rxqs
+ * assignment to PMDs. */
+static bool
+get_dry_run_variance(struct dp_netdev *dp, uint32_t *core_list,
+                     uint32_t num_pmds, uint64_t *predicted_variance)
+    OVS_REQUIRES(dp->port_mutex)
 {
     struct dp_netdev_port *port;
-    struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct dp_netdev_pmd_thread *pmd;
+    struct dp_netdev_rxq **rxqs = NULL;
+    struct rr_numa *numa = NULL;
+    struct rr_numa_list rr;
+    int n_rxqs = 0;
+    bool ret = false;
+    uint64_t *pmd_usage;
+
+    if (!predicted_variance) {
+        return ret;
+    }
+
+    pmd_usage = xcalloc(num_pmds, sizeof(uint64_t));
 
-    ovs_mutex_lock(&dp_netdev_mutex);
-    ovs_mutex_lock(&dp->port_mutex);
     HMAP_FOR_EACH (port, node, &dp->ports) {
-        netdev_wait_reconf_required(port->netdev);
         if (!netdev_is_pmd(port->netdev)) {
-            int i;
-
-            for (i = 0; i < port->n_rxq; i++) {
-                netdev_rxq_wait(port->rxqs[i].rx);
-            }
+            continue;
         }
-    }
-    ovs_mutex_unlock(&dp->port_mutex);
-    ovs_mutex_unlock(&dp_netdev_mutex);
-    seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
-}
 
-static void
-pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)
-{
-    struct tx_port *tx_port_cached;
+        for (int qid = 0; qid < port->n_rxq; qid++) {
+            struct dp_netdev_rxq *q = &port->rxqs[qid];
+            uint64_t cycle_hist = 0;
 
-    /* Flush all the queued packets. */
-    dp_netdev_pmd_flush_output_packets(pmd, true);
-    /* Free all used tx queue ids. */
-    dpif_netdev_xps_revalidate_pmd(pmd, true);
+            if (q->pmd->isolated) {
+                continue;
+            }
 
-    HMAP_FOR_EACH_POP (tx_port_cached, node, &pmd->tnl_port_cache) {
-        free(tx_port_cached);
+            if (n_rxqs == 0) {
+                rxqs = xmalloc(sizeof *rxqs);
+            } else {
+                rxqs = xrealloc(rxqs, sizeof *rxqs * (n_rxqs + 1));
+            }
+
+            /* Sum the queue intervals and store the cycle history. */
+            for (unsigned i = 0; i < PMD_RXQ_INTERVAL_MAX; i++) {
+                cycle_hist += dp_netdev_rxq_get_intrvl_cycles(q, i);
+            }
+            dp_netdev_rxq_set_cycles(q, RXQ_CYCLES_PROC_HIST,
+                                         cycle_hist);
+            /* Store the queue. */
+            rxqs[n_rxqs++] = q;
+        }
     }
-    HMAP_FOR_EACH_POP (tx_port_cached, node, &pmd->send_port_cache) {
-        free(tx_port_cached);
+    if (n_rxqs > 1) {
+        /* Sort the queues in order of the processing cycles
+         * they consumed during their last pmd interval. */
+        qsort(rxqs, n_rxqs, sizeof *rxqs, compare_rxq_cycles);
     }
-}
+    rr_numa_list_populate(dp, &rr);
 
-/* Copies ports from 'pmd->tx_ports' (shared with the main thread) to
- * thread-local copies. Copy to 'pmd->tnl_port_cache' if it is a tunnel
- * device, otherwise to 'pmd->send_port_cache' if the port has at least
- * one txq. */
-static void
-pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
-    OVS_REQUIRES(pmd->port_mutex)
-{
-    struct tx_port *tx_port, *tx_port_cached;
+    for (int i = 0; i < n_rxqs; i++) {
+        int numa_id = netdev_get_numa_id(rxqs[i]->port->netdev);
+        numa = rr_numa_list_lookup(&rr, numa_id);
+        if (!numa) {
+            /* Abort if cross NUMA polling. */
+            VLOG_DBG("PMD auto lb dry run."
+                     " Aborting due to cross-numa polling.");
+            goto cleanup;
+        }
 
-    pmd_free_cached_ports(pmd);
-    hmap_shrink(&pmd->send_port_cache);
-    hmap_shrink(&pmd->tnl_port_cache);
+        pmd = rr_numa_get_pmd(numa, true);
+        VLOG_DBG("PMD auto lb dry run. Predicted: Core %d on numa node %d "
+                  "to be assigned port \'%s\' rx queue %d "
+                  "(measured processing cycles %"PRIu64").",
+                  pmd->core_id, numa_id,
+                  netdev_rxq_get_name(rxqs[i]->rx),
+                  netdev_rxq_get_queue_id(rxqs[i]->rx),
+                  dp_netdev_rxq_get_cycles(rxqs[i], RXQ_CYCLES_PROC_HIST));
 
-    HMAP_FOR_EACH (tx_port, node, &pmd->tx_ports) {
-        if (netdev_has_tunnel_push_pop(tx_port->port->netdev)) {
-            tx_port_cached = xmemdup(tx_port, sizeof *tx_port_cached);
-            hmap_insert(&pmd->tnl_port_cache, &tx_port_cached->node,
-                        hash_port_no(tx_port_cached->port->port_no));
+        for (int id = 0; id < num_pmds; id++) {
+            if (pmd->core_id == core_list[id]) {
+                /* Add the processing cycles of rxq to pmd polling it. */
+                pmd_usage[id] += dp_netdev_rxq_get_cycles(rxqs[i],
+                                        RXQ_CYCLES_PROC_HIST);
+            }
         }
+    }
 
-        if (netdev_n_txq(tx_port->port->netdev)) {
-            tx_port_cached = xmemdup(tx_port, sizeof *tx_port_cached);
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        uint64_t total_cycles = 0;
+
+        if ((pmd->core_id == NON_PMD_CORE_ID) || pmd->isolated) {
+            continue;
+        }
+
+        /* Get the total pmd cycles for an interval. */
+        atomic_read_relaxed(&pmd->intrvl_cycles, &total_cycles);
+        /* Estimate the cycles to cover all intervals. */
+        total_cycles *= PMD_RXQ_INTERVAL_MAX;
+        for (int id = 0; id < num_pmds; id++) {
+            if (pmd->core_id == core_list[id]) {
+                if (pmd_usage[id]) {
+                    pmd_usage[id] = (pmd_usage[id] * 100) / total_cycles;
+                }
+                VLOG_DBG("PMD auto lb dry run. Predicted: Core %d, "
+                         "usage %"PRIu64"", pmd->core_id, pmd_usage[id]);
+            }
+        }
+    }
+    *predicted_variance = variance(pmd_usage, num_pmds);
+    ret = true;
+
+cleanup:
+    rr_numa_list_destroy(&rr);
+    free(rxqs);
+    free(pmd_usage);
+    return ret;
+}
+
+/* Does the dry run of Rxq assignment to PMDs and returns true if it gives
+ * better distribution of load on PMDs. */
+static bool
+pmd_rebalance_dry_run(struct dp_netdev *dp)
+    OVS_REQUIRES(dp->port_mutex)
+{
+    struct dp_netdev_pmd_thread *pmd;
+    uint64_t *curr_pmd_usage;
+
+    uint64_t curr_variance;
+    uint64_t new_variance;
+    uint64_t improvement = 0;
+    uint32_t num_pmds;
+    uint32_t *pmd_corelist;
+    struct rxq_poll *poll;
+    bool ret;
+
+    num_pmds = cmap_count(&dp->poll_threads);
+
+    if (num_pmds > 1) {
+        curr_pmd_usage = xcalloc(num_pmds, sizeof(uint64_t));
+        pmd_corelist = xcalloc(num_pmds, sizeof(uint32_t));
+    } else {
+        return false;
+    }
+
+    num_pmds = 0;
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        uint64_t total_cycles = 0;
+        uint64_t total_proc = 0;
+
+        if ((pmd->core_id == NON_PMD_CORE_ID) || pmd->isolated) {
+            continue;
+        }
+
+        /* Get the total pmd cycles for an interval. */
+        atomic_read_relaxed(&pmd->intrvl_cycles, &total_cycles);
+        /* Estimate the cycles to cover all intervals. */
+        total_cycles *= PMD_RXQ_INTERVAL_MAX;
+
+        ovs_mutex_lock(&pmd->port_mutex);
+        HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
+            for (unsigned i = 0; i < PMD_RXQ_INTERVAL_MAX; i++) {
+                total_proc += dp_netdev_rxq_get_intrvl_cycles(poll->rxq, i);
+            }
+        }
+        ovs_mutex_unlock(&pmd->port_mutex);
+
+        if (total_proc) {
+            curr_pmd_usage[num_pmds] = (total_proc * 100) / total_cycles;
+        }
+
+        VLOG_DBG("PMD auto lb dry run. Current: Core %d, usage %"PRIu64"",
+                  pmd->core_id, curr_pmd_usage[num_pmds]);
+
+        if (atomic_count_get(&pmd->pmd_overloaded)) {
+            atomic_count_set(&pmd->pmd_overloaded, 0);
+        }
+
+        pmd_corelist[num_pmds] = pmd->core_id;
+        num_pmds++;
+    }
+
+    curr_variance = variance(curr_pmd_usage, num_pmds);
+    ret = get_dry_run_variance(dp, pmd_corelist, num_pmds, &new_variance);
+
+    if (ret) {
+        VLOG_DBG("PMD auto lb dry run. Current PMD variance: %"PRIu64","
+                  " Predicted PMD variance: %"PRIu64"",
+                  curr_variance, new_variance);
+
+        if (new_variance < curr_variance) {
+            improvement =
+                ((curr_variance - new_variance) * 100) / curr_variance;
+        }
+        if (improvement < dp->pmd_alb.rebalance_improve_thresh) {
+            ret = false;
+        }
+    }
+
+    free(curr_pmd_usage);
+    free(pmd_corelist);
+    return ret;
+}
+
+
+/* Return true if needs to revalidate datapath flows. */
+static bool
+dpif_netdev_run(struct dpif *dpif)
+{
+    struct dp_netdev_port *port;
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct dp_netdev_pmd_thread *non_pmd;
+    uint64_t new_tnl_seq;
+    bool need_to_flush = true;
+    bool pmd_rebalance = false;
+    long long int now = time_msec();
+    struct dp_netdev_pmd_thread *pmd;
+
+    ovs_mutex_lock(&dp->port_mutex);
+    non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
+    if (non_pmd) {
+        ovs_mutex_lock(&dp->non_pmd_mutex);
+        HMAP_FOR_EACH (port, node, &dp->ports) {
+            if (!netdev_is_pmd(port->netdev)) {
+                int i;
+
+                if (port->emc_enabled) {
+                    atomic_read_relaxed(&dp->emc_insert_min,
+                                        &non_pmd->ctx.emc_insert_min);
+                } else {
+                    non_pmd->ctx.emc_insert_min = 0;
+                }
+
+                for (i = 0; i < port->n_rxq; i++) {
+
+                    if (!netdev_rxq_enabled(port->rxqs[i].rx)) {
+                        continue;
+                    }
+
+                    if (dp_netdev_process_rxq_port(non_pmd,
+                                                   &port->rxqs[i],
+                                                   port->port_no)) {
+                        need_to_flush = false;
+                    }
+                }
+            }
+        }
+        if (need_to_flush) {
+            /* We didn't receive anything in the process loop.
+             * Check if we need to send something.
+             * There was no time updates on current iteration. */
+            pmd_thread_ctx_time_update(non_pmd);
+            dp_netdev_pmd_flush_output_packets(non_pmd, false);
+        }
+
+        dpif_netdev_xps_revalidate_pmd(non_pmd, false);
+        ovs_mutex_unlock(&dp->non_pmd_mutex);
+
+        dp_netdev_pmd_unref(non_pmd);
+    }
+
+    struct pmd_auto_lb *pmd_alb = &dp->pmd_alb;
+    if (pmd_alb->is_enabled) {
+        if (!pmd_alb->rebalance_poll_timer) {
+            pmd_alb->rebalance_poll_timer = now;
+        } else if ((pmd_alb->rebalance_poll_timer +
+                   pmd_alb->rebalance_intvl) < now) {
+            pmd_alb->rebalance_poll_timer = now;
+            CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+                if (atomic_count_get(&pmd->pmd_overloaded) >=
+                                    PMD_RXQ_INTERVAL_MAX) {
+                    pmd_rebalance = true;
+                    break;
+                }
+            }
+
+            if (pmd_rebalance &&
+                !dp_netdev_is_reconf_required(dp) &&
+                !ports_require_restart(dp) &&
+                pmd_rebalance_dry_run(dp)) {
+                VLOG_INFO("PMD auto lb dry run."
+                          " requesting datapath reconfigure.");
+                dp_netdev_request_reconfigure(dp);
+            }
+        }
+    }
+
+    if (dp_netdev_is_reconf_required(dp) || ports_require_restart(dp)) {
+        reconfigure_datapath(dp);
+    }
+    ovs_mutex_unlock(&dp->port_mutex);
+
+    tnl_neigh_cache_run();
+    tnl_port_map_run();
+    new_tnl_seq = seq_read(tnl_conf_seq);
+
+    if (dp->last_tnl_conf_seq != new_tnl_seq) {
+        dp->last_tnl_conf_seq = new_tnl_seq;
+        return true;
+    }
+    return false;
+}
+
+static void
+dpif_netdev_wait(struct dpif *dpif)
+{
+    struct dp_netdev_port *port;
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+
+    ovs_mutex_lock(&dp_netdev_mutex);
+    ovs_mutex_lock(&dp->port_mutex);
+    HMAP_FOR_EACH (port, node, &dp->ports) {
+        netdev_wait_reconf_required(port->netdev);
+        if (!netdev_is_pmd(port->netdev)) {
+            int i;
+
+            for (i = 0; i < port->n_rxq; i++) {
+                netdev_rxq_wait(port->rxqs[i].rx);
+            }
+        }
+    }
+    ovs_mutex_unlock(&dp->port_mutex);
+    ovs_mutex_unlock(&dp_netdev_mutex);
+    seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
+}
+
+static void
+pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)
+{
+    struct tx_port *tx_port_cached;
+
+    /* Flush all the queued packets. */
+    dp_netdev_pmd_flush_output_packets(pmd, true);
+    /* Free all used tx queue ids. */
+    dpif_netdev_xps_revalidate_pmd(pmd, true);
+
+    HMAP_FOR_EACH_POP (tx_port_cached, node, &pmd->tnl_port_cache) {
+        free(tx_port_cached);
+    }
+    HMAP_FOR_EACH_POP (tx_port_cached, node, &pmd->send_port_cache) {
+        free(tx_port_cached);
+    }
+}
+
+/* Copies ports from 'pmd->tx_ports' (shared with the main thread) to
+ * thread-local copies. Copy to 'pmd->tnl_port_cache' if it is a tunnel
+ * device, otherwise to 'pmd->send_port_cache' if the port has at least
+ * one txq. */
+static void
+pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
+    OVS_REQUIRES(pmd->port_mutex)
+{
+    struct tx_port *tx_port, *tx_port_cached;
+
+    pmd_free_cached_ports(pmd);
+    hmap_shrink(&pmd->send_port_cache);
+    hmap_shrink(&pmd->tnl_port_cache);
+
+    HMAP_FOR_EACH (tx_port, node, &pmd->tx_ports) {
+        if (netdev_has_tunnel_push_pop(tx_port->port->netdev)) {
+            tx_port_cached = xmemdup(tx_port, sizeof *tx_port_cached);
+            hmap_insert(&pmd->tnl_port_cache, &tx_port_cached->node,
+                        hash_port_no(tx_port_cached->port->port_no));
+        }
+
+        if (netdev_n_txq(tx_port->port->netdev)) {
+            tx_port_cached = xmemdup(tx_port, sizeof *tx_port_cached);
             hmap_insert(&pmd->send_port_cache, &tx_port_cached->node,
                         hash_port_no(tx_port_cached->port->port_no));
         }
@@ -4882,6 +5934,10 @@ pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
     HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
         poll_list[i].rxq = poll->rxq;
         poll_list[i].port_no = poll->rxq->port->port_no;
+        poll_list[i].emc_enabled = poll->rxq->port->emc_enabled;
+        poll_list[i].rxq_enabled = netdev_rxq_enabled(poll->rxq->rx);
+        poll_list[i].change_seq =
+                     netdev_get_change_seq(poll->rxq->port->netdev);
         i++;
     }
 
@@ -4900,7 +5956,10 @@ pmd_thread_main(void *f_)
     struct pmd_perf_stats *s = &pmd->perf_stats;
     unsigned int lc = 0;
     struct polled_queue *poll_list;
+    bool wait_for_reload = false;
+    bool reload_tx_qid;
     bool exiting;
+    bool reload;
     int poll_cnt;
     int i;
     int process_packets = 0;
@@ -4913,9 +5972,11 @@ pmd_thread_main(void *f_)
     dpdk_set_lcore_id(pmd->core_id);
     poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
     dfc_cache_init(&pmd->flow_cache);
-reload:
     pmd_alloc_static_tx_qid(pmd);
 
+reload:
+    atomic_count_init(&pmd->pmd_overloaded, 0);
+
     /* List port/core affinity */
     for (i = 0; i < poll_cnt; i++) {
        VLOG_DBG("Core %d processing port \'%s\' with queue-id %d\n",
@@ -4926,16 +5987,26 @@ reload:
     }
 
     if (!poll_cnt) {
-        while (seq_read(pmd->reload_seq) == pmd->last_reload_seq) {
-            seq_wait(pmd->reload_seq, pmd->last_reload_seq);
-            poll_block();
+        if (wait_for_reload) {
+            /* Don't sleep, control thread will ask for a reload shortly. */
+            do {
+                atomic_read_explicit(&pmd->reload, &reload,
+                                     memory_order_acquire);
+            } while (!reload);
+        } else {
+            while (seq_read(pmd->reload_seq) == pmd->last_reload_seq) {
+                seq_wait(pmd->reload_seq, pmd->last_reload_seq);
+                poll_block();
+            }
         }
-        lc = UINT_MAX;
     }
 
     pmd->intrvl_tsc_prev = 0;
     atomic_store_relaxed(&pmd->intrvl_cycles, 0);
     cycles_counter_update(s);
+
+    pmd->next_rcu_quiesce = pmd->ctx.now + PMD_RCU_QUIESCE_INTERVAL;
+
     /* Protect pmd stats from external clearing while polling. */
     ovs_mutex_lock(&pmd->perf_stats.stats_mutex);
     for (;;) {
@@ -4944,6 +6015,18 @@ reload:
         pmd_perf_start_iteration(s);
 
         for (i = 0; i < poll_cnt; i++) {
+
+            if (!poll_list[i].rxq_enabled) {
+                continue;
+            }
+
+            if (poll_list[i].emc_enabled) {
+                atomic_read_relaxed(&pmd->dp->emc_insert_min,
+                                    &pmd->ctx.emc_insert_min);
+            } else {
+                pmd->ctx.emc_insert_min = 0;
+            }
+
             process_packets =
                 dp_netdev_process_rxq_port(pmd, poll_list[i].rxq,
                                            poll_list[i].port_no);
@@ -4958,39 +6041,66 @@ reload:
             tx_packets = dp_netdev_pmd_flush_output_packets(pmd, false);
         }
 
-        if (lc++ > 1024) {
-            bool reload;
+        /* Do RCU synchronization at fixed interval.  This ensures that
+         * synchronization would not be delayed long even at high load of
+         * packet processing. */
+        if (pmd->ctx.now > pmd->next_rcu_quiesce) {
+            if (!ovsrcu_try_quiesce()) {
+                pmd->next_rcu_quiesce =
+                    pmd->ctx.now + PMD_RCU_QUIESCE_INTERVAL;
+            }
+        }
 
+        if (lc++ > 1024) {
             lc = 0;
 
             coverage_try_clear();
             dp_netdev_pmd_try_optimize(pmd, poll_list, poll_cnt);
             if (!ovsrcu_try_quiesce()) {
                 emc_cache_slow_sweep(&((pmd->flow_cache).emc_cache));
+                pmd->next_rcu_quiesce =
+                    pmd->ctx.now + PMD_RCU_QUIESCE_INTERVAL;
             }
 
-            atomic_read_relaxed(&pmd->reload, &reload);
-            if (reload) {
-                break;
+            for (i = 0; i < poll_cnt; i++) {
+                uint64_t current_seq =
+                         netdev_get_change_seq(poll_list[i].rxq->port->netdev);
+                if (poll_list[i].change_seq != current_seq) {
+                    poll_list[i].change_seq = current_seq;
+                    poll_list[i].rxq_enabled =
+                                 netdev_rxq_enabled(poll_list[i].rxq->rx);
+                }
             }
         }
+
+        atomic_read_explicit(&pmd->reload, &reload, memory_order_acquire);
+        if (OVS_UNLIKELY(reload)) {
+            break;
+        }
+
         pmd_perf_end_iteration(s, rx_packets, tx_packets,
                                pmd_perf_metrics_enabled(pmd));
     }
     ovs_mutex_unlock(&pmd->perf_stats.stats_mutex);
 
     poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
-    exiting = latch_is_set(&pmd->exit_latch);
+    atomic_read_relaxed(&pmd->wait_for_reload, &wait_for_reload);
+    atomic_read_relaxed(&pmd->reload_tx_qid, &reload_tx_qid);
+    atomic_read_relaxed(&pmd->exit, &exiting);
     /* Signal here to make sure the pmd finishes
      * reloading the updated configuration. */
     dp_netdev_pmd_reload_done(pmd);
 
-    pmd_free_static_tx_qid(pmd);
+    if (reload_tx_qid) {
+        pmd_free_static_tx_qid(pmd);
+        pmd_alloc_static_tx_qid(pmd);
+    }
 
     if (!exiting) {
         goto reload;
     }
 
+    pmd_free_static_tx_qid(pmd);
     dfc_cache_uninit(&pmd->flow_cache);
     free(poll_list);
     pmd_free_cached_ports(pmd);
@@ -5050,7 +6160,15 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_,
     memset(exceeded_rate, 0, cnt * sizeof *exceeded_rate);
 
     /* All packets will hit the meter at the same time. */
-    long_delta_t = (now - meter->used) / 1000; /* msec */
+    long_delta_t = now / 1000 - meter->used / 1000; /* msec */
+
+    if (long_delta_t < 0) {
+        /* This condition means that we have several threads fighting for a
+           meter lock, and the one who received the packets a bit later wins.
+           Assuming that all racing threads received packets at the same time
+           to avoid overflow. */
+        long_delta_t = 0;
+    }
 
     /* Make sure delta_t will not be too large, so that bucket will not
      * wrap around below. */
@@ -5147,7 +6265,7 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_,
             band = &meter->bands[exceeded_band[j]];
             band->packet_count += 1;
             band->byte_count += dp_packet_size(packet);
-
+            COVERAGE_INC(datapath_drop_meter);
             dp_packet_delete(packet);
         } else {
             /* Meter accepts packet. */
@@ -5160,11 +6278,11 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_,
 
 /* Meter set/get/del processing is still single-threaded. */
 static int
-dpif_netdev_meter_set(struct dpif *dpif, ofproto_meter_id *meter_id,
+dpif_netdev_meter_set(struct dpif *dpif, ofproto_meter_id meter_id,
                       struct ofputil_meter_config *config)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
-    uint32_t mid = meter_id->uint32;
+    uint32_t mid = meter_id.uint32;
     struct dp_meter *meter;
     int i;
 
@@ -5192,44 +6310,42 @@ dpif_netdev_meter_set(struct dpif *dpif, ofproto_meter_id *meter_id,
     /* Allocate meter */
     meter = xzalloc(sizeof *meter
                     + config->n_bands * sizeof(struct dp_meter_band));
-    if (meter) {
-        meter->flags = config->flags;
-        meter->n_bands = config->n_bands;
-        meter->max_delta_t = 0;
-        meter->used = time_usec();
-
-        /* set up bands */
-        for (i = 0; i < config->n_bands; ++i) {
-            uint32_t band_max_delta_t;
-
-            /* Set burst size to a workable value if none specified. */
-            if (config->bands[i].burst_size == 0) {
-                config->bands[i].burst_size = config->bands[i].rate;
-            }
 
-            meter->bands[i].up = config->bands[i];
-            /* Convert burst size to the bucket units: */
-            /* pkts => 1/1000 packets, kilobits => bits. */
-            meter->bands[i].up.burst_size *= 1000;
-            /* Initialize bucket to empty. */
-            meter->bands[i].bucket = 0;
-
-            /* Figure out max delta_t that is enough to fill any bucket. */
-            band_max_delta_t
-                = meter->bands[i].up.burst_size / meter->bands[i].up.rate;
-            if (band_max_delta_t > meter->max_delta_t) {
-                meter->max_delta_t = band_max_delta_t;
-            }
+    meter->flags = config->flags;
+    meter->n_bands = config->n_bands;
+    meter->max_delta_t = 0;
+    meter->used = time_usec();
+
+    /* set up bands */
+    for (i = 0; i < config->n_bands; ++i) {
+        uint32_t band_max_delta_t;
+
+        /* Set burst size to a workable value if none specified. */
+        if (config->bands[i].burst_size == 0) {
+            config->bands[i].burst_size = config->bands[i].rate;
         }
 
-        meter_lock(dp, mid);
-        dp_delete_meter(dp, mid); /* Free existing meter, if any */
-        dp->meters[mid] = meter;
-        meter_unlock(dp, mid);
+        meter->bands[i].up = config->bands[i];
+        /* Convert burst size to the bucket units: */
+        /* pkts => 1/1000 packets, kilobits => bits. */
+        meter->bands[i].up.burst_size *= 1000;
+        /* Initialize bucket to empty. */
+        meter->bands[i].bucket = 0;
 
-        return 0;
+        /* Figure out max delta_t that is enough to fill any bucket. */
+        band_max_delta_t
+            = meter->bands[i].up.burst_size / meter->bands[i].up.rate;
+        if (band_max_delta_t > meter->max_delta_t) {
+            meter->max_delta_t = band_max_delta_t;
+        }
     }
-    return ENOMEM;
+
+    meter_lock(dp, mid);
+    dp_delete_meter(dp, mid); /* Free existing meter, if any */
+    dp->meters[mid] = meter;
+    meter_unlock(dp, mid);
+
+    return 0;
 }
 
 static int
@@ -5238,20 +6354,22 @@ dpif_netdev_meter_get(const struct dpif *dpif,
                       struct ofputil_meter_stats *stats, uint16_t n_bands)
 {
     const struct dp_netdev *dp = get_dp_netdev(dpif);
-    const struct dp_meter *meter;
     uint32_t meter_id = meter_id_.uint32;
+    int retval = 0;
 
     if (meter_id >= MAX_METERS) {
         return EFBIG;
     }
-    meter = dp->meters[meter_id];
+
+    meter_lock(dp, meter_id);
+    const struct dp_meter *meter = dp->meters[meter_id];
     if (!meter) {
-        return ENOENT;
+        retval = ENOENT;
+        goto done;
     }
     if (stats) {
         int i = 0;
 
-        meter_lock(dp, meter_id);
         stats->packet_in_count = meter->packet_count;
         stats->byte_in_count = meter->byte_count;
 
@@ -5259,11 +6377,13 @@ dpif_netdev_meter_get(const struct dpif *dpif,
             stats->bands[i].packet_count = meter->bands[i].packet_count;
             stats->bands[i].byte_count = meter->bands[i].byte_count;
         }
-        meter_unlock(dp, meter_id);
 
         stats->n_bands = i;
     }
-    return 0;
+
+done:
+    meter_unlock(dp, meter_id);
+    return retval;
 }
 
 static int
@@ -5312,11 +6432,10 @@ dpif_netdev_enable_upcall(struct dpif *dpif)
 static void
 dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd)
 {
-    ovs_mutex_lock(&pmd->cond_mutex);
-    atomic_store_relaxed(&pmd->reload, false);
+    atomic_store_relaxed(&pmd->wait_for_reload, false);
+    atomic_store_relaxed(&pmd->reload_tx_qid, false);
     pmd->last_reload_seq = seq_read(pmd->reload_seq);
-    xpthread_cond_signal(&pmd->cond);
-    ovs_mutex_unlock(&pmd->cond_mutex);
+    atomic_store_explicit(&pmd->reload, false, memory_order_release);
 }
 
 /* Finds and refs the dp_netdev_pmd_thread on core 'core_id'.  Returns
@@ -5397,24 +6516,25 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     pmd->n_output_batches = 0;
 
     ovs_refcount_init(&pmd->ref_cnt);
-    latch_init(&pmd->exit_latch);
+    atomic_init(&pmd->exit, false);
     pmd->reload_seq = seq_create();
     pmd->last_reload_seq = seq_read(pmd->reload_seq);
     atomic_init(&pmd->reload, false);
-    xpthread_cond_init(&pmd->cond, NULL);
-    ovs_mutex_init(&pmd->cond_mutex);
     ovs_mutex_init(&pmd->flow_mutex);
     ovs_mutex_init(&pmd->port_mutex);
+    ovs_mutex_init(&pmd->bond_mutex);
     cmap_init(&pmd->flow_table);
     cmap_init(&pmd->classifiers);
     pmd->ctx.last_rxq = NULL;
     pmd_thread_ctx_time_update(pmd);
     pmd->next_optimization = pmd->ctx.now + DPCLS_OPTIMIZATION_INTERVAL;
+    pmd->next_rcu_quiesce = pmd->ctx.now + PMD_RCU_QUIESCE_INTERVAL;
     pmd->rxq_next_cycle_store = pmd->ctx.now + PMD_RXQ_INTERVAL_LEN;
     hmap_init(&pmd->poll_list);
     hmap_init(&pmd->tx_ports);
     hmap_init(&pmd->tnl_port_cache);
     hmap_init(&pmd->send_port_cache);
+    cmap_init(&pmd->tx_bonds);
     /* init the 'flow_cache' since there is no
      * actual thread created for NON_PMD_CORE_ID. */
     if (core_id == NON_PMD_CORE_ID) {
@@ -5435,6 +6555,7 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
     hmap_destroy(&pmd->send_port_cache);
     hmap_destroy(&pmd->tnl_port_cache);
     hmap_destroy(&pmd->tx_ports);
+    cmap_destroy(&pmd->tx_bonds);
     hmap_destroy(&pmd->poll_list);
     /* All flows (including their dpcls_rules) have been deleted already */
     CMAP_FOR_EACH (cls, node, &pmd->classifiers) {
@@ -5444,11 +6565,9 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
     cmap_destroy(&pmd->classifiers);
     cmap_destroy(&pmd->flow_table);
     ovs_mutex_destroy(&pmd->flow_mutex);
-    latch_destroy(&pmd->exit_latch);
     seq_destroy(pmd->reload_seq);
-    xpthread_cond_destroy(&pmd->cond);
-    ovs_mutex_destroy(&pmd->cond_mutex);
     ovs_mutex_destroy(&pmd->port_mutex);
+    ovs_mutex_destroy(&pmd->bond_mutex);
     free(pmd);
 }
 
@@ -5466,7 +6585,7 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
         pmd_free_static_tx_qid(pmd);
         ovs_mutex_unlock(&dp->non_pmd_mutex);
     } else {
-        latch_set(&pmd->exit_latch);
+        atomic_store_relaxed(&pmd->exit, true);
         dp_netdev_reload_pmd__(pmd);
         xpthread_join(pmd->thread, NULL);
     }
@@ -5518,6 +6637,7 @@ dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd)
 {
     struct rxq_poll *poll;
     struct tx_port *port;
+    struct tx_bond *tx;
 
     ovs_mutex_lock(&pmd->port_mutex);
     HMAP_FOR_EACH_POP (poll, node, &pmd->poll_list) {
@@ -5527,6 +6647,13 @@ dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd)
         free(port);
     }
     ovs_mutex_unlock(&pmd->port_mutex);
+
+    ovs_mutex_lock(&pmd->bond_mutex);
+    CMAP_FOR_EACH (tx, node, &pmd->tx_bonds) {
+        cmap_remove(&pmd->tx_bonds, &tx->node, hash_bond_id(tx->bond_id));
+        ovsrcu_postpone(free, tx);
+    }
+    ovs_mutex_unlock(&pmd->bond_mutex);
 }
 
 /* Adds rx queue to poll_list of PMD thread, if it's not there already. */
@@ -5602,6 +6729,62 @@ dp_netdev_del_port_tx_from_pmd(struct dp_netdev_pmd_thread *pmd,
     free(tx);
     pmd->need_reload = true;
 }
+
+/* Add bond to the tx bond cmap of 'pmd'. */
+static void
+dp_netdev_add_bond_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
+                             struct tx_bond *bond, bool update)
+    OVS_EXCLUDED(pmd->bond_mutex)
+{
+    struct tx_bond *tx;
+
+    ovs_mutex_lock(&pmd->bond_mutex);
+    tx = tx_bond_lookup(&pmd->tx_bonds, bond->bond_id);
+
+    if (tx && !update) {
+        /* It's not an update and the entry already exists.  Do nothing. */
+        goto unlock;
+    }
+
+    if (tx) {
+        struct tx_bond *new_tx = xmemdup(bond, sizeof *bond);
+
+        /* Copy the stats for each bucket. */
+        for (int i = 0; i < BOND_BUCKETS; i++) {
+            uint64_t n_packets, n_bytes;
+
+            atomic_read_relaxed(&tx->member_buckets[i].n_packets, &n_packets);
+            atomic_read_relaxed(&tx->member_buckets[i].n_bytes, &n_bytes);
+            atomic_init(&new_tx->member_buckets[i].n_packets, n_packets);
+            atomic_init(&new_tx->member_buckets[i].n_bytes, n_bytes);
+        }
+        cmap_replace(&pmd->tx_bonds, &tx->node, &new_tx->node,
+                     hash_bond_id(bond->bond_id));
+        ovsrcu_postpone(free, tx);
+    } else {
+        tx = xmemdup(bond, sizeof *bond);
+        cmap_insert(&pmd->tx_bonds, &tx->node, hash_bond_id(bond->bond_id));
+    }
+unlock:
+    ovs_mutex_unlock(&pmd->bond_mutex);
+}
+
+/* Delete bond from the tx bond cmap of 'pmd'. */
+static void
+dp_netdev_del_bond_tx_from_pmd(struct dp_netdev_pmd_thread *pmd,
+                               uint32_t bond_id)
+    OVS_EXCLUDED(pmd->bond_mutex)
+{
+    struct tx_bond *tx;
+
+    ovs_mutex_lock(&pmd->bond_mutex);
+    tx = tx_bond_lookup(&pmd->tx_bonds, bond_id);
+    if (tx) {
+        cmap_remove(&pmd->tx_bonds, &tx->node, hash_bond_id(tx->bond_id));
+        ovsrcu_postpone(free, tx);
+    }
+    ovs_mutex_unlock(&pmd->bond_mutex);
+}
 \f
 static char *
 dpif_netdev_get_datapath_version(void)
@@ -5698,7 +6881,6 @@ dpif_netdev_packet_get_rss_hash(struct dp_packet *packet,
     recirc_depth = *recirc_depth_get_unsafe();
     if (OVS_UNLIKELY(recirc_depth)) {
         hash = hash_finish(hash, recirc_depth);
-        dp_packet_set_rss_hash(packet, hash);
     }
     return hash;
 }
@@ -5718,7 +6900,7 @@ packet_batch_per_flow_update(struct packet_batch_per_flow *batch,
 {
     batch->byte_count += dp_packet_size(packet);
     batch->tcp_flags |= tcp_flags;
-    batch->array.packets[batch->array.count++] = packet;
+    dp_packet_batch_add(&batch->array, packet);
 }
 
 static inline void
@@ -5740,7 +6922,8 @@ packet_batch_per_flow_execute(struct packet_batch_per_flow *batch,
     struct dp_netdev_actions *actions;
     struct dp_netdev_flow *flow = batch->flow;
 
-    dp_netdev_flow_used(flow, batch->array.count, batch->byte_count,
+    dp_netdev_flow_used(flow, dp_packet_batch_size(&batch->array),
+                        batch->byte_count,
                         batch->tcp_flags, pmd->ctx.now / 1000);
 
     actions = dp_netdev_flow_get_actions(flow);
@@ -5765,6 +6948,19 @@ dp_netdev_queue_batches(struct dp_packet *pkt,
     packet_batch_per_flow_update(batch, pkt, tcp_flags);
 }
 
+static inline void
+packet_enqueue_to_flow_map(struct dp_packet *packet,
+                           struct dp_netdev_flow *flow,
+                           uint16_t tcp_flags,
+                           struct dp_packet_flow_map *flow_map,
+                           size_t index)
+{
+    struct dp_packet_flow_map *map = &flow_map[index];
+    map->flow = flow;
+    map->packet = packet;
+    map->tcp_flags = tcp_flags;
+}
+
 /* SMC lookup function for a batch of packets.
  * By doing batching SMC lookup, we can use prefetch
  * to hide memory access latency.
@@ -5774,8 +6970,9 @@ smc_lookup_batch(struct dp_netdev_pmd_thread *pmd,
             struct netdev_flow_key *keys,
             struct netdev_flow_key **missed_keys,
             struct dp_packet_batch *packets_,
-            struct packet_batch_per_flow batches[],
-            size_t *n_batches, const int cnt)
+            const int cnt,
+            struct dp_packet_flow_map *flow_map,
+            uint8_t *index_map)
 {
     int i;
     struct dp_packet *packet;
@@ -5783,6 +6980,8 @@ smc_lookup_batch(struct dp_netdev_pmd_thread *pmd,
     struct dfc_cache *cache = &pmd->flow_cache;
     struct smc_cache *smc_cache = &cache->smc_cache;
     const struct cmap_node *flow_node;
+    int recv_idx;
+    uint16_t tcp_flags;
 
     /* Prefetch buckets for all packets */
     for (i = 0; i < cnt; i++) {
@@ -5793,6 +6992,8 @@ smc_lookup_batch(struct dp_netdev_pmd_thread *pmd,
         struct dp_netdev_flow *flow = NULL;
         flow_node = smc_entry_get(pmd, keys[i].hash);
         bool hit = false;
+        /* Get the original order of this packet in received batch. */
+        recv_idx = index_map[i];
 
         if (OVS_LIKELY(flow_node != NULL)) {
             CMAP_NODE_FOR_EACH (flow, node, flow_node) {
@@ -5800,12 +7001,17 @@ smc_lookup_batch(struct dp_netdev_pmd_thread *pmd,
                  * number, we need to  verify that the input ports match. */
                 if (OVS_LIKELY(dpcls_rule_matches_key(&flow->cr, &keys[i]) &&
                 flow->flow.in_port.odp_port == packet->md.in_port.odp_port)) {
+                    tcp_flags = miniflow_get_tcp_flags(&keys[i].mf);
+
                     /* SMC hit and emc miss, we insert into EMC */
                     keys[i].len =
                         netdev_flow_key_size(miniflow_n_values(&keys[i].mf));
                     emc_probabilistic_insert(pmd, &keys[i], flow);
-                    dp_netdev_queue_batches(packet, flow,
-                    miniflow_get_tcp_flags(&keys[i].mf), batches, n_batches);
+                    /* Add these packets into the flow map in the same order
+                     * as received.
+                     */
+                    packet_enqueue_to_flow_map(packet, flow, tcp_flags,
+                                               flow_map, recv_idx);
                     n_smc_hit++;
                     hit = true;
                     break;
@@ -5819,6 +7025,10 @@ smc_lookup_batch(struct dp_netdev_pmd_thread *pmd,
         /* SMC missed. Group missed packets together at
          * the beginning of the 'packets' array. */
         dp_packet_batch_refill(packets_, packet, i);
+
+        /* Preserve the order of packet for flow batching. */
+        index_map[n_missed] = recv_idx;
+
         /* Put missed keys to the pointer arrays return to the caller */
         missed_keys[n_missed++] = &keys[i];
     }
@@ -5847,6 +7057,8 @@ dfc_processing(struct dp_netdev_pmd_thread *pmd,
                struct netdev_flow_key *keys,
                struct netdev_flow_key **missed_keys,
                struct packet_batch_per_flow batches[], size_t *n_batches,
+               struct dp_packet_flow_map *flow_map,
+               size_t *n_flows, uint8_t *index_map,
                bool md_is_valid, odp_port_t port_no)
 {
     struct netdev_flow_key *key = &keys[0];
@@ -5854,13 +7066,14 @@ dfc_processing(struct dp_netdev_pmd_thread *pmd,
     struct dfc_cache *cache = &pmd->flow_cache;
     struct dp_packet *packet;
     const size_t cnt = dp_packet_batch_size(packets_);
-    uint32_t cur_min;
+    uint32_t cur_min = pmd->ctx.emc_insert_min;
     int i;
     uint16_t tcp_flags;
     bool smc_enable_db;
+    size_t map_cnt = 0;
+    bool batch_enable = true;
 
     atomic_read_relaxed(&pmd->dp->smc_enable_db, &smc_enable_db);
-    atomic_read_relaxed(&pmd->dp->emc_insert_min, &cur_min);
     pmd_perf_update_counter(&pmd->perf_stats,
                             md_is_valid ? PMD_STAT_RECIRC : PMD_STAT_RECV,
                             cnt);
@@ -5871,6 +7084,7 @@ dfc_processing(struct dp_netdev_pmd_thread *pmd,
 
         if (OVS_UNLIKELY(dp_packet_size(packet) < ETH_HEADER_LEN)) {
             dp_packet_delete(packet);
+            COVERAGE_INC(datapath_drop_rx_invalid_packet);
             continue;
         }
 
@@ -5888,39 +7102,55 @@ dfc_processing(struct dp_netdev_pmd_thread *pmd,
         if ((*recirc_depth_get() == 0) &&
             dp_packet_has_flow_mark(packet, &mark)) {
             flow = mark_to_flow_find(pmd, mark);
-            if (flow) {
+            if (OVS_LIKELY(flow)) {
                 tcp_flags = parse_tcp_flags(packet);
-                dp_netdev_queue_batches(packet, flow, tcp_flags, batches,
-                                        n_batches);
+                if (OVS_LIKELY(batch_enable)) {
+                    dp_netdev_queue_batches(packet, flow, tcp_flags, batches,
+                                            n_batches);
+                } else {
+                    /* Flow batching should be performed only after fast-path
+                     * processing is also completed for packets with emc miss
+                     * or else it will result in reordering of packets with
+                     * same datapath flows. */
+                    packet_enqueue_to_flow_map(packet, flow, tcp_flags,
+                                               flow_map, map_cnt++);
+                }
                 continue;
             }
         }
 
         miniflow_extract(packet, &key->mf);
         key->len = 0; /* Not computed yet. */
-        /* If EMC and SMC disabled skip hash computation */
-        if (smc_enable_db == true || cur_min != 0) {
-            if (!md_is_valid) {
-                key->hash = dpif_netdev_packet_get_rss_hash_orig_pkt(packet,
-                        &key->mf);
-            } else {
-                key->hash = dpif_netdev_packet_get_rss_hash(packet, &key->mf);
-            }
-        }
-        if (cur_min) {
-            flow = emc_lookup(&cache->emc_cache, key);
-        } else {
-            flow = NULL;
-        }
+        key->hash =
+                (md_is_valid == false)
+                ? dpif_netdev_packet_get_rss_hash_orig_pkt(packet, &key->mf)
+                : dpif_netdev_packet_get_rss_hash(packet, &key->mf);
+
+        /* If EMC is disabled skip emc_lookup */
+        flow = (cur_min != 0) ? emc_lookup(&cache->emc_cache, key) : NULL;
         if (OVS_LIKELY(flow)) {
             tcp_flags = miniflow_get_tcp_flags(&key->mf);
-            dp_netdev_queue_batches(packet, flow, tcp_flags, batches,
-                                    n_batches);
             n_emc_hit++;
+            if (OVS_LIKELY(batch_enable)) {
+                dp_netdev_queue_batches(packet, flow, tcp_flags, batches,
+                                        n_batches);
+            } else {
+                /* Flow batching should be performed only after fast-path
+                 * processing is also completed for packets with emc miss
+                 * or else it will result in reordering of packets with
+                 * same datapath flows. */
+                packet_enqueue_to_flow_map(packet, flow, tcp_flags,
+                                           flow_map, map_cnt++);
+            }
         } else {
             /* Exact match cache missed. Group missed packets together at
              * the beginning of the 'packets' array. */
             dp_packet_batch_refill(packets_, packet, i);
+
+            /* Preserve the order of packet for flow batching. */
+            index_map[n_missed] = map_cnt;
+            flow_map[map_cnt++].flow = NULL;
+
             /* 'key[n_missed]' contains the key of the current packet and it
              * will be passed to SMC lookup. The next key should be extracted
              * to 'keys[n_missed + 1]'.
@@ -5928,8 +7158,13 @@ dfc_processing(struct dp_netdev_pmd_thread *pmd,
              * which will be returned to the caller for future processing. */
             missed_keys[n_missed] = key;
             key = &keys[++n_missed];
+
+            /* Skip batching for subsequent packets to avoid reordering. */
+            batch_enable = false;
         }
     }
+    /* Count of packets which are not flow batched. */
+    *n_flows = map_cnt;
 
     pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_EXACT_HIT, n_emc_hit);
 
@@ -5938,8 +7173,8 @@ dfc_processing(struct dp_netdev_pmd_thread *pmd,
     }
 
     /* Packets miss EMC will do a batch lookup in SMC if enabled */
-    smc_lookup_batch(pmd, keys, missed_keys, packets_, batches,
-                            n_batches, n_missed);
+    smc_lookup_batch(pmd, keys, missed_keys, packets_,
+                     n_missed, flow_map, index_map);
 
     return dp_packet_batch_size(packets_);
 }
@@ -5959,16 +7194,18 @@ handle_packet_upcall(struct dp_netdev_pmd_thread *pmd,
 
     match.tun_md.valid = false;
     miniflow_expand(&key->mf, &match.flow);
+    memset(&match.wc, 0, sizeof match.wc);
 
     ofpbuf_clear(actions);
     ofpbuf_clear(put_actions);
 
-    dpif_flow_hash(pmd->dp->dpif, &match.flow, sizeof match.flow, &ufid);
+    odp_flow_key_hash(&match.flow, sizeof match.flow, &ufid);
     error = dp_netdev_upcall(pmd, packet, &match.flow, &match.wc,
                              &ufid, DPIF_UC_MISS, NULL, actions,
                              put_actions);
     if (OVS_UNLIKELY(error && error != ENOSPC)) {
         dp_packet_delete(packet);
+        COVERAGE_INC(datapath_drop_upcall_error);
         return error;
     }
 
@@ -5977,7 +7214,7 @@ handle_packet_upcall(struct dp_netdev_pmd_thread *pmd,
      * tag is interpreted as exact match on the fact that there is no
      * VLAN.  Unless we refactor a lot of code that translates between
      * Netlink and struct flow representations, we have to do the same
-     * here. */
+     * here.  This must be in sync with 'match' in dpif_netdev_flow_put(). */
     if (!match.wc.masks.vlans[0].tci) {
         match.wc.masks.vlans[0].tci = htons(0xffff);
     }
@@ -5997,8 +7234,7 @@ handle_packet_upcall(struct dp_netdev_pmd_thread *pmd,
          * could have already been installed since we last did the flow
          * lookup before upcall.  This could be solved by moving the
          * mutex lock outside the loop, but that's an awful long time
-         * to be locking everyone out of making flow installs.  If we
-         * move to a per-core classifier, it would be reasonable. */
+         * to be locking revalidators out of making flow modifications. */
         ovs_mutex_lock(&pmd->flow_mutex);
         netdev_flow = dp_netdev_pmd_lookup_flow(pmd, key, NULL);
         if (OVS_LIKELY(!netdev_flow)) {
@@ -6026,8 +7262,8 @@ static inline void
 fast_path_processing(struct dp_netdev_pmd_thread *pmd,
                      struct dp_packet_batch *packets_,
                      struct netdev_flow_key **keys,
-                     struct packet_batch_per_flow batches[],
-                     size_t *n_batches,
+                     struct dp_packet_flow_map *flow_map,
+                     uint8_t *index_map,
                      odp_port_t in_port)
 {
     const size_t cnt = dp_packet_batch_size(packets_);
@@ -6100,6 +7336,7 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
         DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) {
             if (OVS_UNLIKELY(!rules[i])) {
                 dp_packet_delete(packet);
+                COVERAGE_INC(datapath_drop_lock_error);
                 upcall_fail_cnt++;
             }
         }
@@ -6107,6 +7344,9 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
 
     DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) {
         struct dp_netdev_flow *flow;
+        /* Get the original order of this packet in received batch. */
+        int recv_idx = index_map[i];
+        uint16_t tcp_flags;
 
         if (OVS_UNLIKELY(!rules[i])) {
             continue;
@@ -6117,9 +7357,12 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
         smc_insert(pmd, keys[i], hash);
 
         emc_probabilistic_insert(pmd, keys[i], flow);
-        dp_netdev_queue_batches(packet, flow,
-                                miniflow_get_tcp_flags(&keys[i]->mf),
-                                batches, n_batches);
+        /* Add these packets into the flow map in the same order
+         * as received.
+         */
+        tcp_flags = miniflow_get_tcp_flags(&keys[i]->mf);
+        packet_enqueue_to_flow_map(packet, flow, tcp_flags,
+                                   flow_map, recv_idx);
     }
 
     pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_MASKED_HIT,
@@ -6152,18 +7395,34 @@ dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
     struct netdev_flow_key *missed_keys[PKT_ARRAY_SIZE];
     struct packet_batch_per_flow batches[PKT_ARRAY_SIZE];
     size_t n_batches;
+    struct dp_packet_flow_map flow_map[PKT_ARRAY_SIZE];
+    uint8_t index_map[PKT_ARRAY_SIZE];
+    size_t n_flows, i;
+
     odp_port_t in_port;
 
     n_batches = 0;
     dfc_processing(pmd, packets, keys, missed_keys, batches, &n_batches,
-                            md_is_valid, port_no);
+                   flow_map, &n_flows, index_map, md_is_valid, port_no);
+
     if (!dp_packet_batch_is_empty(packets)) {
         /* Get ingress port from first packet's metadata. */
         in_port = packets->packets[0]->md.in_port.odp_port;
         fast_path_processing(pmd, packets, missed_keys,
-                             batches, &n_batches, in_port);
+                             flow_map, index_map, in_port);
     }
 
+    /* Batch rest of packets which are in flow map. */
+    for (i = 0; i < n_flows; i++) {
+        struct dp_packet_flow_map *map = &flow_map[i];
+
+        if (OVS_UNLIKELY(!map->flow)) {
+            continue;
+        }
+        dp_netdev_queue_batches(map->packet, map->flow, map->tcp_flags,
+                                batches, &n_batches);
+     }
+
     /* All the flow batches need to be reset before any call to
      * packet_batch_per_flow_execute() as it could potentially trigger
      * recirculation. When a packet matching flow â€˜j’ happens to be
@@ -6173,7 +7432,6 @@ dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
      * already its own batches[k] still waiting to be served.  So if its
      * â€˜batch’ member is not reset, the recirculated packet would be wrongly
      * appended to batches[k] of the 1st call to dp_netdev_input__(). */
-    size_t i;
     for (i = 0; i < n_batches; i++) {
         batches[i].flow->batch = NULL;
     }
@@ -6337,17 +7595,109 @@ dp_execute_userspace_action(struct dp_netdev_pmd_thread *pmd,
     struct dp_packet_batch b;
     int error;
 
-    ofpbuf_clear(actions);
+    ofpbuf_clear(actions);
+
+    error = dp_netdev_upcall(pmd, packet, flow, NULL, ufid,
+                             DPIF_UC_ACTION, userdata, actions,
+                             NULL);
+    if (!error || error == ENOSPC) {
+        dp_packet_batch_init_packet(&b, packet);
+        dp_netdev_execute_actions(pmd, &b, should_steal, flow,
+                                  actions->data, actions->size);
+    } else if (should_steal) {
+        dp_packet_delete(packet);
+        COVERAGE_INC(datapath_drop_userspace_action_error);
+    }
+}
+
+static bool
+dp_execute_output_action(struct dp_netdev_pmd_thread *pmd,
+                         struct dp_packet_batch *packets_,
+                         bool should_steal, odp_port_t port_no)
+{
+    struct tx_port *p = pmd_send_port_cache_lookup(pmd, port_no);
+    struct dp_packet_batch out;
+
+    if (!OVS_LIKELY(p)) {
+        COVERAGE_ADD(datapath_drop_invalid_port,
+                     dp_packet_batch_size(packets_));
+        dp_packet_delete_batch(packets_, should_steal);
+        return false;
+    }
+    if (!should_steal) {
+        dp_packet_batch_clone(&out, packets_);
+        dp_packet_batch_reset_cutlen(packets_);
+        packets_ = &out;
+    }
+    dp_packet_batch_apply_cutlen(packets_);
+#ifdef DPDK_NETDEV
+    if (OVS_UNLIKELY(!dp_packet_batch_is_empty(&p->output_pkts)
+                     && packets_->packets[0]->source
+                        != p->output_pkts.packets[0]->source)) {
+        /* XXX: netdev-dpdk assumes that all packets in a single
+         *      output batch has the same source. Flush here to
+         *      avoid memory access issues. */
+        dp_netdev_pmd_flush_output_on_port(pmd, p);
+    }
+#endif
+    if (dp_packet_batch_size(&p->output_pkts)
+        + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST) {
+        /* Flush here to avoid overflow. */
+        dp_netdev_pmd_flush_output_on_port(pmd, p);
+    }
+    if (dp_packet_batch_is_empty(&p->output_pkts)) {
+        pmd->n_output_batches++;
+    }
+
+    struct dp_packet *packet;
+    DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) {
+        p->output_pkts_rxqs[dp_packet_batch_size(&p->output_pkts)] =
+            pmd->ctx.last_rxq;
+        dp_packet_batch_add(&p->output_pkts, packet);
+    }
+    return true;
+}
+
+static void
+dp_execute_lb_output_action(struct dp_netdev_pmd_thread *pmd,
+                            struct dp_packet_batch *packets_,
+                            bool should_steal, uint32_t bond)
+{
+    struct tx_bond *p_bond = tx_bond_lookup(&pmd->tx_bonds, bond);
+    struct dp_packet_batch out;
+    struct dp_packet *packet;
+
+    if (!p_bond) {
+        COVERAGE_ADD(datapath_drop_invalid_bond,
+                     dp_packet_batch_size(packets_));
+        dp_packet_delete_batch(packets_, should_steal);
+        return;
+    }
+    if (!should_steal) {
+        dp_packet_batch_clone(&out, packets_);
+        dp_packet_batch_reset_cutlen(packets_);
+        packets_ = &out;
+    }
+    dp_packet_batch_apply_cutlen(packets_);
 
-    error = dp_netdev_upcall(pmd, packet, flow, NULL, ufid,
-                             DPIF_UC_ACTION, userdata, actions,
-                             NULL);
-    if (!error || error == ENOSPC) {
-        dp_packet_batch_init_packet(&b, packet);
-        dp_netdev_execute_actions(pmd, &b, should_steal, flow,
-                                  actions->data, actions->size);
-    } else if (should_steal) {
-        dp_packet_delete(packet);
+    DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) {
+        /*
+         * Lookup the bond-hash table using hash to get the member.
+         */
+        uint32_t hash = dp_packet_get_rss_hash(packet);
+        struct member_entry *s_entry
+            = &p_bond->member_buckets[hash & BOND_MASK];
+        odp_port_t bond_member = s_entry->member_id;
+        uint32_t size = dp_packet_size(packet);
+        struct dp_packet_batch output_pkt;
+
+        dp_packet_batch_init_packet(&output_pkt, packet);
+        if (OVS_LIKELY(dp_execute_output_action(pmd, &output_pkt, true,
+                                                bond_member))) {
+            /* Update member stats. */
+            non_atomic_ullong_add(&s_entry->n_packets, 1);
+            non_atomic_ullong_add(&s_entry->n_bytes, size);
+        }
     }
 }
 
@@ -6362,49 +7712,18 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
     struct dp_netdev *dp = pmd->dp;
     int type = nl_attr_type(a);
     struct tx_port *p;
+    uint32_t packet_count, packets_dropped;
 
     switch ((enum ovs_action_attr)type) {
     case OVS_ACTION_ATTR_OUTPUT:
-        p = pmd_send_port_cache_lookup(pmd, nl_attr_get_odp_port(a));
-        if (OVS_LIKELY(p)) {
-            struct dp_packet *packet;
-            struct dp_packet_batch out;
-
-            if (!should_steal) {
-                dp_packet_batch_clone(&out, packets_);
-                dp_packet_batch_reset_cutlen(packets_);
-                packets_ = &out;
-            }
-            dp_packet_batch_apply_cutlen(packets_);
-
-#ifdef DPDK_NETDEV
-            if (OVS_UNLIKELY(!dp_packet_batch_is_empty(&p->output_pkts)
-                             && packets_->packets[0]->source
-                                != p->output_pkts.packets[0]->source)) {
-                /* XXX: netdev-dpdk assumes that all packets in a single
-                 *      output batch has the same source. Flush here to
-                 *      avoid memory access issues. */
-                dp_netdev_pmd_flush_output_on_port(pmd, p);
-            }
-#endif
-            if (dp_packet_batch_size(&p->output_pkts)
-                + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST) {
-                /* Flush here to avoid overflow. */
-                dp_netdev_pmd_flush_output_on_port(pmd, p);
-            }
-
-            if (dp_packet_batch_is_empty(&p->output_pkts)) {
-                pmd->n_output_batches++;
-            }
+        dp_execute_output_action(pmd, packets_, should_steal,
+                                 nl_attr_get_odp_port(a));
+        return;
 
-            DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) {
-                p->output_pkts_rxqs[dp_packet_batch_size(&p->output_pkts)] =
-                                                             pmd->ctx.last_rxq;
-                dp_packet_batch_add(&p->output_pkts, packet);
-            }
-            return;
-        }
-        break;
+    case OVS_ACTION_ATTR_LB_OUTPUT:
+        dp_execute_lb_output_action(pmd, packets_, should_steal,
+                                    nl_attr_get_u32(a));
+        return;
 
     case OVS_ACTION_ATTR_TUNNEL_PUSH:
         if (should_steal) {
@@ -6415,7 +7734,11 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
             break;
         }
         dp_packet_batch_apply_cutlen(packets_);
-        push_tnl_action(pmd, a, packets_);
+        packet_count = dp_packet_batch_size(packets_);
+        if (push_tnl_action(pmd, a, packets_)) {
+            COVERAGE_ADD(datapath_drop_tunnel_push_error,
+                         packet_count);
+        }
         return;
 
     case OVS_ACTION_ATTR_TUNNEL_POP:
@@ -6435,7 +7758,14 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
 
                 dp_packet_batch_apply_cutlen(packets_);
 
+                packet_count = dp_packet_batch_size(packets_);
                 netdev_pop_header(p->port->netdev, packets_);
+                packets_dropped =
+                   packet_count - dp_packet_batch_size(packets_);
+                if (packets_dropped) {
+                    COVERAGE_ADD(datapath_drop_tunnel_pop_error,
+                                 packets_dropped);
+                }
                 if (dp_packet_batch_is_empty(packets_)) {
                     return;
                 }
@@ -6450,6 +7780,11 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
                 (*depth)--;
                 return;
             }
+            COVERAGE_ADD(datapath_drop_invalid_tnl_port,
+                         dp_packet_batch_size(packets_));
+        } else {
+            COVERAGE_ADD(datapath_drop_recirc_error,
+                         dp_packet_batch_size(packets_));
         }
         break;
 
@@ -6480,7 +7815,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
             struct dp_packet *packet;
             DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) {
                 flow_extract(packet, &flow);
-                dpif_flow_hash(dp->dpif, &flow, sizeof flow, &ufid);
+                odp_flow_key_hash(&flow, sizeof flow, &ufid);
                 dp_execute_userspace_action(pmd, packet, should_steal, &flow,
                                             &ufid, &actions, userdata);
             }
@@ -6494,6 +7829,8 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
 
             return;
         }
+        COVERAGE_ADD(datapath_drop_lock_error,
+                     dp_packet_batch_size(packets_));
         break;
 
     case OVS_ACTION_ATTR_RECIRC:
@@ -6517,6 +7854,8 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
             return;
         }
 
+        COVERAGE_ADD(datapath_drop_recirc_error,
+                     dp_packet_batch_size(packets_));
         VLOG_WARN("Packet dropped. Max recirculation depth exceeded.");
         break;
 
@@ -6526,6 +7865,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
         bool commit = false;
         unsigned int left;
         uint16_t zone = 0;
+        uint32_t tp_id = 0;
         const char *helper = NULL;
         const uint32_t *setmark = NULL;
         const struct ovs_key_ct_labels *setlabel = NULL;
@@ -6560,6 +7900,13 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
                 /* Silently ignored, as userspace datapath does not generate
                  * netlink events. */
                 break;
+            case OVS_CT_ATTR_TIMEOUT:
+                if (!str_to_uint(nl_attr_get_string(b), 10, &tp_id)) {
+                    VLOG_WARN("Invalid Timeout Policy ID: %s.",
+                              nl_attr_get_string(b));
+                    tp_id = DEFAULT_TP_ID;
+                }
+                break;
             case OVS_CT_ATTR_NAT: {
                 const struct nlattr *b_nest;
                 unsigned int left_nest;
@@ -6641,10 +7988,10 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
             VLOG_WARN_RL(&rl, "NAT specified without commit.");
         }
 
-        conntrack_execute(&dp->conntrack, packets_, aux->flow->dl_type, force,
+        conntrack_execute(dp->conntrack, packets_, aux->flow->dl_type, force,
                           commit, zone, setmark, setlabel, aux->flow->tp_src,
                           aux->flow->tp_dst, helper, nat_action_info_ref,
-                          pmd->ctx.now / 1000);
+                          pmd->ctx.now / 1000, tp_id);
         break;
     }
 
@@ -6669,6 +8016,8 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
     case OVS_ACTION_ATTR_PUSH_NSH:
     case OVS_ACTION_ATTR_POP_NSH:
     case OVS_ACTION_ATTR_CT_CLEAR:
+    case OVS_ACTION_ATTR_CHECK_PKT_LEN:
+    case OVS_ACTION_ATTR_DROP:
     case __OVS_ACTION_ATTR_MAX:
         OVS_NOT_REACHED();
     }
@@ -6704,9 +8053,9 @@ dpif_netdev_ct_dump_start(struct dpif *dpif, struct ct_dpif_dump_state **dump_,
 
     dump = xzalloc(sizeof *dump);
     dump->dp = dp;
-    dump->ct = &dp->conntrack;
+    dump->ct = dp->conntrack;
 
-    conntrack_dump_start(&dp->conntrack, &dump->dump, pzone, ptot_bkts);
+    conntrack_dump_start(dp->conntrack, &dump->dump, pzone, ptot_bkts);
 
     *dump_ = &dump->up;
 
@@ -6748,9 +8097,9 @@ dpif_netdev_ct_flush(struct dpif *dpif, const uint16_t *zone,
     struct dp_netdev *dp = get_dp_netdev(dpif);
 
     if (tuple) {
-        return conntrack_flush_tuple(&dp->conntrack, tuple, zone ? *zone : 0);
+        return conntrack_flush_tuple(dp->conntrack, tuple, zone ? *zone : 0);
     }
-    return conntrack_flush(&dp->conntrack, zone);
+    return conntrack_flush(dp->conntrack, zone);
 }
 
 static int
@@ -6758,7 +8107,7 @@ dpif_netdev_ct_set_maxconns(struct dpif *dpif, uint32_t maxconns)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
 
-    return conntrack_set_maxconns(&dp->conntrack, maxconns);
+    return conntrack_set_maxconns(dp->conntrack, maxconns);
 }
 
 static int
@@ -6766,7 +8115,7 @@ dpif_netdev_ct_get_maxconns(struct dpif *dpif, uint32_t *maxconns)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
 
-    return conntrack_get_maxconns(&dp->conntrack, maxconns);
+    return conntrack_get_maxconns(dp->conntrack, maxconns);
 }
 
 static int
@@ -6774,11 +8123,313 @@ dpif_netdev_ct_get_nconns(struct dpif *dpif, uint32_t *nconns)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
 
-    return conntrack_get_nconns(&dp->conntrack, nconns);
+    return conntrack_get_nconns(dp->conntrack, nconns);
+}
+
+static int
+dpif_netdev_ct_set_tcp_seq_chk(struct dpif *dpif, bool enabled)
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+
+    return conntrack_set_tcp_seq_chk(dp->conntrack, enabled);
+}
+
+static int
+dpif_netdev_ct_get_tcp_seq_chk(struct dpif *dpif, bool *enabled)
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    *enabled = conntrack_get_tcp_seq_chk(dp->conntrack);
+    return 0;
+}
+
+static int
+dpif_netdev_ct_set_limits(struct dpif *dpif OVS_UNUSED,
+                           const uint32_t *default_limits,
+                           const struct ovs_list *zone_limits)
+{
+    int err = 0;
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    if (default_limits) {
+        err = zone_limit_update(dp->conntrack, DEFAULT_ZONE, *default_limits);
+        if (err != 0) {
+            return err;
+        }
+    }
+
+    struct ct_dpif_zone_limit *zone_limit;
+    LIST_FOR_EACH (zone_limit, node, zone_limits) {
+        err = zone_limit_update(dp->conntrack, zone_limit->zone,
+                                zone_limit->limit);
+        if (err != 0) {
+            break;
+        }
+    }
+    return err;
+}
+
+static int
+dpif_netdev_ct_get_limits(struct dpif *dpif OVS_UNUSED,
+                           uint32_t *default_limit,
+                           const struct ovs_list *zone_limits_request,
+                           struct ovs_list *zone_limits_reply)
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct conntrack_zone_limit czl;
+
+    czl = zone_limit_get(dp->conntrack, DEFAULT_ZONE);
+    if (czl.zone == DEFAULT_ZONE) {
+        *default_limit = czl.limit;
+    } else {
+        return EINVAL;
+    }
+
+    if (!ovs_list_is_empty(zone_limits_request)) {
+        struct ct_dpif_zone_limit *zone_limit;
+        LIST_FOR_EACH (zone_limit, node, zone_limits_request) {
+            czl = zone_limit_get(dp->conntrack, zone_limit->zone);
+            if (czl.zone == zone_limit->zone || czl.zone == DEFAULT_ZONE) {
+                ct_dpif_push_zone_limit(zone_limits_reply, zone_limit->zone,
+                                        czl.limit, czl.count);
+            } else {
+                return EINVAL;
+            }
+        }
+    } else {
+        for (int z = MIN_ZONE; z <= MAX_ZONE; z++) {
+            czl = zone_limit_get(dp->conntrack, z);
+            if (czl.zone == z) {
+                ct_dpif_push_zone_limit(zone_limits_reply, z, czl.limit,
+                                        czl.count);
+            }
+        }
+    }
+
+    return 0;
+}
+
+static int
+dpif_netdev_ct_del_limits(struct dpif *dpif OVS_UNUSED,
+                           const struct ovs_list *zone_limits)
+{
+    int err = 0;
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct ct_dpif_zone_limit *zone_limit;
+    LIST_FOR_EACH (zone_limit, node, zone_limits) {
+        err = zone_limit_delete(dp->conntrack, zone_limit->zone);
+        if (err != 0) {
+            break;
+        }
+    }
+
+    return err;
+}
+
+static int
+dpif_netdev_ct_set_timeout_policy(struct dpif *dpif,
+                                  const struct ct_dpif_timeout_policy *dpif_tp)
+{
+    struct timeout_policy tp;
+    struct dp_netdev *dp;
+
+    dp = get_dp_netdev(dpif);
+    memcpy(&tp.policy, dpif_tp, sizeof tp.policy);
+    return timeout_policy_update(dp->conntrack, &tp);
+}
+
+static int
+dpif_netdev_ct_get_timeout_policy(struct dpif *dpif, uint32_t tp_id,
+                                  struct ct_dpif_timeout_policy *dpif_tp)
+{
+    struct timeout_policy *tp;
+    struct dp_netdev *dp;
+    int err = 0;
+
+    dp = get_dp_netdev(dpif);
+    tp = timeout_policy_get(dp->conntrack, tp_id);
+    if (!tp) {
+        return ENOENT;
+    }
+    memcpy(dpif_tp, &tp->policy, sizeof tp->policy);
+    return err;
+}
+
+static int
+dpif_netdev_ct_del_timeout_policy(struct dpif *dpif,
+                                  uint32_t tp_id)
+{
+    struct dp_netdev *dp;
+    int err = 0;
+
+    dp = get_dp_netdev(dpif);
+    err = timeout_policy_delete(dp->conntrack, tp_id);
+    return err;
+}
+
+static int
+dpif_netdev_ct_get_timeout_policy_name(struct dpif *dpif OVS_UNUSED,
+                                       uint32_t tp_id,
+                                       uint16_t dl_type OVS_UNUSED,
+                                       uint8_t nw_proto OVS_UNUSED,
+                                       char **tp_name, bool *is_generic)
+{
+    struct ds ds = DS_EMPTY_INITIALIZER;
+
+    ds_put_format(&ds, "%"PRIu32, tp_id);
+    *tp_name = ds_steal_cstr(&ds);
+    *is_generic = true;
+    return 0;
+}
+
+static int
+dpif_netdev_ipf_set_enabled(struct dpif *dpif, bool v6, bool enable)
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    return ipf_set_enabled(conntrack_ipf_ctx(dp->conntrack), v6, enable);
+}
+
+static int
+dpif_netdev_ipf_set_min_frag(struct dpif *dpif, bool v6, uint32_t min_frag)
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    return ipf_set_min_frag(conntrack_ipf_ctx(dp->conntrack), v6, min_frag);
+}
+
+static int
+dpif_netdev_ipf_set_max_nfrags(struct dpif *dpif, uint32_t max_frags)
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    return ipf_set_max_nfrags(conntrack_ipf_ctx(dp->conntrack), max_frags);
+}
+
+/* Adjust this function if 'dpif_ipf_status' and 'ipf_status' were to
+ * diverge. */
+static int
+dpif_netdev_ipf_get_status(struct dpif *dpif,
+                           struct dpif_ipf_status *dpif_ipf_status)
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    ipf_get_status(conntrack_ipf_ctx(dp->conntrack),
+                   (struct ipf_status *) dpif_ipf_status);
+    return 0;
+}
+
+static int
+dpif_netdev_ipf_dump_start(struct dpif *dpif OVS_UNUSED,
+                           struct ipf_dump_ctx **ipf_dump_ctx)
+{
+    return ipf_dump_start(ipf_dump_ctx);
+}
+
+static int
+dpif_netdev_ipf_dump_next(struct dpif *dpif, void *ipf_dump_ctx, char **dump)
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    return ipf_dump_next(conntrack_ipf_ctx(dp->conntrack), ipf_dump_ctx,
+                         dump);
+}
+
+static int
+dpif_netdev_ipf_dump_done(struct dpif *dpif OVS_UNUSED, void *ipf_dump_ctx)
+{
+    return ipf_dump_done(ipf_dump_ctx);
+
+}
+
+static int
+dpif_netdev_bond_add(struct dpif *dpif, uint32_t bond_id,
+                     odp_port_t *member_map)
+{
+    struct tx_bond *new_tx = xzalloc(sizeof *new_tx);
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct dp_netdev_pmd_thread *pmd;
+
+    /* Prepare new bond mapping. */
+    new_tx->bond_id = bond_id;
+    for (int bucket = 0; bucket < BOND_BUCKETS; bucket++) {
+        new_tx->member_buckets[bucket].member_id = member_map[bucket];
+    }
+
+    ovs_mutex_lock(&dp->bond_mutex);
+    /* Check if bond already existed. */
+    struct tx_bond *old_tx = tx_bond_lookup(&dp->tx_bonds, bond_id);
+    if (old_tx) {
+        cmap_replace(&dp->tx_bonds, &old_tx->node, &new_tx->node,
+                     hash_bond_id(bond_id));
+        ovsrcu_postpone(free, old_tx);
+    } else {
+        cmap_insert(&dp->tx_bonds, &new_tx->node, hash_bond_id(bond_id));
+    }
+    ovs_mutex_unlock(&dp->bond_mutex);
+
+    /* Update all PMDs with new bond mapping. */
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        dp_netdev_add_bond_tx_to_pmd(pmd, new_tx, true);
+    }
+    return 0;
+}
+
+static int
+dpif_netdev_bond_del(struct dpif *dpif, uint32_t bond_id)
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct dp_netdev_pmd_thread *pmd;
+    struct tx_bond *tx;
+
+    ovs_mutex_lock(&dp->bond_mutex);
+    /* Check if bond existed. */
+    tx = tx_bond_lookup(&dp->tx_bonds, bond_id);
+    if (tx) {
+        cmap_remove(&dp->tx_bonds, &tx->node, hash_bond_id(bond_id));
+        ovsrcu_postpone(free, tx);
+    } else {
+        /* Bond is not present. */
+        ovs_mutex_unlock(&dp->bond_mutex);
+        return ENOENT;
+    }
+    ovs_mutex_unlock(&dp->bond_mutex);
+
+    /* Remove the bond map in all pmds. */
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        dp_netdev_del_bond_tx_from_pmd(pmd, bond_id);
+    }
+    return 0;
+}
+
+static int
+dpif_netdev_bond_stats_get(struct dpif *dpif, uint32_t bond_id,
+                           uint64_t *n_bytes)
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct dp_netdev_pmd_thread *pmd;
+
+    if (!tx_bond_lookup(&dp->tx_bonds, bond_id)) {
+        return ENOENT;
+    }
+
+    /* Search the bond in all PMDs. */
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        struct tx_bond *pmd_bond_entry
+            = tx_bond_lookup(&pmd->tx_bonds, bond_id);
+
+        if (!pmd_bond_entry) {
+            continue;
+        }
+
+        /* Read bond stats. */
+        for (int i = 0; i < BOND_BUCKETS; i++) {
+            uint64_t pmd_n_bytes;
+
+            atomic_read_relaxed(&pmd_bond_entry->member_buckets[i].n_bytes,
+                                &pmd_n_bytes);
+            n_bytes[i] += pmd_n_bytes;
+        }
+    }
+    return 0;
 }
 
 const struct dpif_class dpif_netdev_class = {
     "netdev",
+    true,                       /* cleanup_required */
     dpif_netdev_init,
     dpif_netdev_enumerate,
     dpif_netdev_port_open_type,
@@ -6788,6 +8439,7 @@ const struct dpif_class dpif_netdev_class = {
     dpif_netdev_run,
     dpif_netdev_wait,
     dpif_netdev_get_stats,
+    NULL,                      /* set_features */
     dpif_netdev_port_add,
     dpif_netdev_port_del,
     dpif_netdev_port_set_config,
@@ -6825,10 +8477,32 @@ const struct dpif_class dpif_netdev_class = {
     dpif_netdev_ct_set_maxconns,
     dpif_netdev_ct_get_maxconns,
     dpif_netdev_ct_get_nconns,
+    dpif_netdev_ct_set_tcp_seq_chk,
+    dpif_netdev_ct_get_tcp_seq_chk,
+    dpif_netdev_ct_set_limits,
+    dpif_netdev_ct_get_limits,
+    dpif_netdev_ct_del_limits,
+    dpif_netdev_ct_set_timeout_policy,
+    dpif_netdev_ct_get_timeout_policy,
+    dpif_netdev_ct_del_timeout_policy,
+    NULL,                       /* ct_timeout_policy_dump_start */
+    NULL,                       /* ct_timeout_policy_dump_next */
+    NULL,                       /* ct_timeout_policy_dump_done */
+    dpif_netdev_ct_get_timeout_policy_name,
+    dpif_netdev_ipf_set_enabled,
+    dpif_netdev_ipf_set_min_frag,
+    dpif_netdev_ipf_set_max_nfrags,
+    dpif_netdev_ipf_get_status,
+    dpif_netdev_ipf_dump_start,
+    dpif_netdev_ipf_dump_next,
+    dpif_netdev_ipf_dump_done,
     dpif_netdev_meter_get_features,
     dpif_netdev_meter_set,
     dpif_netdev_meter_get,
     dpif_netdev_meter_del,
+    dpif_netdev_bond_add,
+    dpif_netdev_bond_del,
+    dpif_netdev_bond_stats_get,
 };
 
 static void
@@ -6934,18 +8608,13 @@ dpif_dummy_register(enum dummy_level level)
 \f
 /* Datapath Classifier. */
 
-/* A set of rules that all have the same fields wildcarded. */
-struct dpcls_subtable {
-    /* The fields are only used by writers. */
-    struct cmap_node cmap_node OVS_GUARDED; /* Within dpcls 'subtables_map'. */
-
-    /* These fields are accessed by readers. */
-    struct cmap rules;           /* Contains "struct dpcls_rule"s. */
-    uint32_t hit_cnt;            /* Number of match hits in subtable in current
-                                    optimization interval. */
-    struct netdev_flow_key mask; /* Wildcards for fields (const). */
-    /* 'mask' must be the last field, additional space is allocated here. */
-};
+static void
+dpcls_subtable_destroy_cb(struct dpcls_subtable *subtable)
+{
+    cmap_destroy(&subtable->rules);
+    ovsrcu_postpone(free, subtable->mf_masks);
+    ovsrcu_postpone(free, subtable);
+}
 
 /* Initializes 'cls' as a classifier that initially contains no classification
  * rules. */
@@ -6963,8 +8632,7 @@ dpcls_destroy_subtable(struct dpcls *cls, struct dpcls_subtable *subtable)
     pvector_remove(&cls->subtables, subtable);
     cmap_remove(&cls->subtables_map, &subtable->cmap_node,
                 subtable->mask.hash);
-    cmap_destroy(&subtable->rules);
-    ovsrcu_postpone(free, subtable);
+    ovsrcu_postpone(dpcls_subtable_destroy_cb, subtable);
 }
 
 /* Destroys 'cls'.  Rules within 'cls', if any, are not freed; this is the
@@ -6996,6 +8664,23 @@ dpcls_create_subtable(struct dpcls *cls, const struct netdev_flow_key *mask)
     cmap_init(&subtable->rules);
     subtable->hit_cnt = 0;
     netdev_flow_key_clone(&subtable->mask, mask);
+
+    /* The count of bits in the mask defines the space required for masks.
+     * Then call gen_masks() to create the appropriate masks, avoiding the cost
+     * of doing runtime calculations. */
+    uint32_t unit0 = count_1bits(mask->mf.map.bits[0]);
+    uint32_t unit1 = count_1bits(mask->mf.map.bits[1]);
+    subtable->mf_bits_set_unit0 = unit0;
+    subtable->mf_bits_set_unit1 = unit1;
+    subtable->mf_masks = xmalloc(sizeof(uint64_t) * (unit0 + unit1));
+    netdev_flow_key_gen_masks(mask, subtable->mf_masks, unit0, unit1);
+
+    /* Get the preferred subtable search function for this (u0,u1) subtable.
+     * The function is guaranteed to always return a valid implementation, and
+     * possibly an ISA optimized, and/or specialized implementation.
+     */
+    subtable->lookup_func = dpcls_subtable_get_best_impl(unit0, unit1);
+
     cmap_insert(&cls->subtables_map, &subtable->cmap_node, mask->hash);
     /* Add the new subtable at the end of the pvector (with no hits yet) */
     pvector_insert(&cls->subtables, subtable, 0);
@@ -7020,6 +8705,28 @@ dpcls_find_subtable(struct dpcls *cls, const struct netdev_flow_key *mask)
     return dpcls_create_subtable(cls, mask);
 }
 
+/* Checks for the best available implementation for each subtable lookup
+ * function, and assigns it as the lookup function pointer for each subtable.
+ * Returns the number of subtables that have changed lookup implementation.
+ */
+static uint32_t
+dpcls_subtable_lookup_reprobe(struct dpcls *cls)
+{
+    struct pvector *pvec = &cls->subtables;
+    uint32_t subtables_changed = 0;
+    struct dpcls_subtable *subtable = NULL;
+
+    PVECTOR_FOR_EACH (subtable, pvec) {
+        uint32_t u0_bits = subtable->mf_bits_set_unit0;
+        uint32_t u1_bits = subtable->mf_bits_set_unit1;
+        void *old_func = subtable->lookup_func;
+        subtable->lookup_func = dpcls_subtable_get_best_impl(u0_bits, u1_bits);
+        subtables_changed += (old_func != subtable->lookup_func);
+    }
+    pvector_publish(pvec);
+
+    return subtables_changed;
+}
 
 /* Periodically sort the dpcls subtable vectors according to hit counts */
 static void
@@ -7040,9 +8747,42 @@ dp_netdev_pmd_try_optimize(struct dp_netdev_pmd_thread *pmd,
                            struct polled_queue *poll_list, int poll_cnt)
 {
     struct dpcls *cls;
+    uint64_t tot_idle = 0, tot_proc = 0;
+    unsigned int pmd_load = 0;
 
     if (pmd->ctx.now > pmd->rxq_next_cycle_store) {
         uint64_t curr_tsc;
+        uint8_t rebalance_load_trigger;
+        struct pmd_auto_lb *pmd_alb = &pmd->dp->pmd_alb;
+        if (pmd_alb->is_enabled && !pmd->isolated
+            && (pmd->perf_stats.counters.n[PMD_CYCLES_ITER_IDLE] >=
+                                       pmd->prev_stats[PMD_CYCLES_ITER_IDLE])
+            && (pmd->perf_stats.counters.n[PMD_CYCLES_ITER_BUSY] >=
+                                        pmd->prev_stats[PMD_CYCLES_ITER_BUSY]))
+            {
+            tot_idle = pmd->perf_stats.counters.n[PMD_CYCLES_ITER_IDLE] -
+                       pmd->prev_stats[PMD_CYCLES_ITER_IDLE];
+            tot_proc = pmd->perf_stats.counters.n[PMD_CYCLES_ITER_BUSY] -
+                       pmd->prev_stats[PMD_CYCLES_ITER_BUSY];
+
+            if (tot_proc) {
+                pmd_load = ((tot_proc * 100) / (tot_idle + tot_proc));
+            }
+
+            atomic_read_relaxed(&pmd_alb->rebalance_load_thresh,
+                                &rebalance_load_trigger);
+            if (pmd_load >= rebalance_load_trigger) {
+                atomic_count_inc(&pmd->pmd_overloaded);
+            } else {
+                atomic_count_set(&pmd->pmd_overloaded, 0);
+            }
+        }
+
+        pmd->prev_stats[PMD_CYCLES_ITER_IDLE] =
+                        pmd->perf_stats.counters.n[PMD_CYCLES_ITER_IDLE];
+        pmd->prev_stats[PMD_CYCLES_ITER_BUSY] =
+                        pmd->perf_stats.counters.n[PMD_CYCLES_ITER_BUSY];
+
         /* Get the cycles that were used to process each queue and store. */
         for (unsigned i = 0; i < poll_cnt; i++) {
             uint64_t rxq_cyc_curr = dp_netdev_rxq_get_cycles(poll_list[i].rxq,
@@ -7108,9 +8848,46 @@ dpcls_remove(struct dpcls *cls, struct dpcls_rule *rule)
     }
 }
 
+/* Inner loop for mask generation of a unit, see netdev_flow_key_gen_masks. */
+static inline void
+netdev_flow_key_gen_mask_unit(uint64_t iter,
+                              const uint64_t count,
+                              uint64_t *mf_masks)
+{
+    int i;
+    for (i = 0; i < count; i++) {
+        uint64_t lowest_bit = (iter & -iter);
+        iter &= ~lowest_bit;
+        mf_masks[i] = (lowest_bit - 1);
+    }
+    /* Checks that count has covered all bits in the iter bitmap. */
+    ovs_assert(iter == 0);
+}
+
+/* Generate a mask for each block in the miniflow, based on the bits set. This
+ * allows easily masking packets with the generated array here, without
+ * calculations. This replaces runtime-calculating the masks.
+ * @param key The table to generate the mf_masks for
+ * @param mf_masks Pointer to a u64 array of at least *mf_bits* in size
+ * @param mf_bits_total Number of bits set in the whole miniflow (both units)
+ * @param mf_bits_unit0 Number of bits set in unit0 of the miniflow
+ */
+void
+netdev_flow_key_gen_masks(const struct netdev_flow_key *tbl,
+                          uint64_t *mf_masks,
+                          const uint32_t mf_bits_u0,
+                          const uint32_t mf_bits_u1)
+{
+    uint64_t iter_u0 = tbl->mf.map.bits[0];
+    uint64_t iter_u1 = tbl->mf.map.bits[1];
+
+    netdev_flow_key_gen_mask_unit(iter_u0, mf_bits_u0, &mf_masks[0]);
+    netdev_flow_key_gen_mask_unit(iter_u1, mf_bits_u1, &mf_masks[mf_bits_u0]);
+}
+
 /* Returns true if 'target' satisfies 'key' in 'mask', that is, if each 1-bit
  * in 'mask' the values in 'key' and 'target' are the same. */
-static bool
+bool
 dpcls_rule_matches_key(const struct dpcls_rule *rule,
                        const struct netdev_flow_key *target)
 {
@@ -7144,16 +8921,11 @@ dpcls_lookup(struct dpcls *cls, const struct netdev_flow_key *keys[],
     /* The received 'cnt' miniflows are the search-keys that will be processed
      * to find a matching entry into the available subtables.
      * The number of bits in map_type is equal to NETDEV_MAX_BURST. */
-    typedef uint32_t map_type;
-#define MAP_BITS (sizeof(map_type) * CHAR_BIT)
+#define MAP_BITS (sizeof(uint32_t) * CHAR_BIT)
     BUILD_ASSERT_DECL(MAP_BITS >= NETDEV_MAX_BURST);
 
     struct dpcls_subtable *subtable;
-
-    map_type keys_map = TYPE_MAXIMUM(map_type); /* Set all bits. */
-    map_type found_map;
-    uint32_t hashes[MAP_BITS];
-    const struct cmap_node *nodes[MAP_BITS];
+    uint32_t keys_map = TYPE_MAXIMUM(uint32_t); /* Set all bits. */
 
     if (cnt != MAP_BITS) {
         keys_map >>= MAP_BITS - cnt; /* Clear extra bits. */
@@ -7161,6 +8933,7 @@ dpcls_lookup(struct dpcls *cls, const struct netdev_flow_key *keys[],
     memset(rules, 0, cnt * sizeof *rules);
 
     int lookups_match = 0, subtable_pos = 1;
+    uint32_t found_map;
 
     /* The Datapath classifier - aka dpcls - is composed of subtables.
      * Subtables are dynamically created as needed when new rules are inserted.
@@ -7170,52 +8943,27 @@ dpcls_lookup(struct dpcls *cls, const struct netdev_flow_key *keys[],
      * search-key, the search for that key can stop because the rules are
      * non-overlapping. */
     PVECTOR_FOR_EACH (subtable, &cls->subtables) {
-        int i;
-
-        /* Compute hashes for the remaining keys.  Each search-key is
-         * masked with the subtable's mask to avoid hashing the wildcarded
-         * bits. */
-        ULLONG_FOR_EACH_1(i, keys_map) {
-            hashes[i] = netdev_flow_key_hash_in_mask(keys[i],
-                                                     &subtable->mask);
-        }
-        /* Lookup. */
-        found_map = cmap_find_batch(&subtable->rules, keys_map, hashes, nodes);
-        /* Check results.  When the i-th bit of found_map is set, it means
-         * that a set of nodes with a matching hash value was found for the
-         * i-th search-key.  Due to possible hash collisions we need to check
-         * which of the found rules, if any, really matches our masked
-         * search-key. */
-        ULLONG_FOR_EACH_1(i, found_map) {
-            struct dpcls_rule *rule;
-
-            CMAP_NODE_FOR_EACH (rule, cmap_node, nodes[i]) {
-                if (OVS_LIKELY(dpcls_rule_matches_key(rule, keys[i]))) {
-                    rules[i] = rule;
-                    /* Even at 20 Mpps the 32-bit hit_cnt cannot wrap
-                     * within one second optimization interval. */
-                    subtable->hit_cnt++;
-                    lookups_match += subtable_pos;
-                    goto next;
-                }
-            }
-            /* None of the found rules was a match.  Reset the i-th bit to
-             * keep searching this key in the next subtable. */
-            ULLONG_SET0(found_map, i);  /* Did not match. */
-        next:
-            ;                     /* Keep Sparse happy. */
-        }
-        keys_map &= ~found_map;             /* Clear the found rules. */
+        /* Call the subtable specific lookup function. */
+        found_map = subtable->lookup_func(subtable, keys_map, keys, rules);
+
+        /* Count the number of subtables searched for this packet match. This
+         * estimates the "spread" of subtables looked at per matched packet. */
+        uint32_t pkts_matched = count_1bits(found_map);
+        lookups_match += pkts_matched * subtable_pos;
+
+        /* Clear the found rules, and return early if all packets are found. */
+        keys_map &= ~found_map;
         if (!keys_map) {
             if (num_lookups_p) {
                 *num_lookups_p = lookups_match;
             }
-            return true;              /* All found. */
+            return true;
         }
         subtable_pos++;
     }
+
     if (num_lookups_p) {
         *num_lookups_p = lookups_match;
     }
-    return false;                     /* Some misses. */
+    return false;
 }