]> git.proxmox.com Git - mirror_ovs.git/blobdiff - lib/dpif-netdev.c
cirrus: Use FreeBSD 12.2.
[mirror_ovs.git] / lib / dpif-netdev.c
index a798db45d9cb0653cd75e9cd19dc7c6e8cbd8e50..e3fd0a07fd5b79152135bdbc800a5c815784a2a1 100644 (file)
 #include "bitmap.h"
 #include "cmap.h"
 #include "conntrack.h"
+#include "conntrack-tp.h"
 #include "coverage.h"
 #include "ct-dpif.h"
 #include "csum.h"
 #include "dp-packet.h"
 #include "dpif.h"
+#include "dpif-netdev-lookup.h"
 #include "dpif-netdev-perf.h"
 #include "dpif-provider.h"
 #include "dummy.h"
@@ -83,9 +85,9 @@
 VLOG_DEFINE_THIS_MODULE(dpif_netdev);
 
 /* Auto Load Balancing Defaults */
-#define ALB_ACCEPTABLE_IMPROVEMENT       25
-#define ALB_PMD_LOAD_THRESHOLD           95
-#define ALB_PMD_REBALANCE_POLL_INTERVAL  1 /* 1 Min */
+#define ALB_IMPROVEMENT_THRESHOLD    25
+#define ALB_LOAD_THRESHOLD           95
+#define ALB_REBALANCE_INTERVAL       1 /* 1 Min */
 #define MIN_TO_MSEC                  60000
 
 #define FLOW_DUMP_MAX_BATCH 50
@@ -97,7 +99,6 @@ DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
 #define DEFAULT_TX_FLUSH_INTERVAL 0
 
 /* Configuration parameters. */
-enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
 enum { MAX_METERS = 65536 };    /* Maximum number of meters. */
 enum { MAX_BANDS = 8 };         /* Maximum number of bands / meter. */
 enum { N_METER_LOCKS = 64 };    /* Maximum number of meters. */
@@ -110,6 +111,7 @@ COVERAGE_DEFINE(datapath_drop_tunnel_push_error);
 COVERAGE_DEFINE(datapath_drop_tunnel_pop_error);
 COVERAGE_DEFINE(datapath_drop_recirc_error);
 COVERAGE_DEFINE(datapath_drop_invalid_port);
+COVERAGE_DEFINE(datapath_drop_invalid_bond);
 COVERAGE_DEFINE(datapath_drop_invalid_tnl_port);
 COVERAGE_DEFINE(datapath_drop_rx_invalid_packet);
 
@@ -239,6 +241,9 @@ struct dfc_cache {
  * and used during rxq to pmd assignment. */
 #define PMD_RXQ_INTERVAL_MAX 6
 
+/* Time in microseconds to try RCU quiescing. */
+#define PMD_RCU_QUIESCE_INTERVAL 10000LL
+
 struct dpcls {
     struct cmap_node node;      /* Within dp_netdev_pmd_thread.classifiers */
     odp_port_t in_port;
@@ -256,6 +261,7 @@ struct dp_packet_flow_map {
 static void dpcls_init(struct dpcls *);
 static void dpcls_destroy(struct dpcls *);
 static void dpcls_sort_subtable_vector(struct dpcls *);
+static uint32_t dpcls_subtable_lookup_reprobe(struct dpcls *cls);
 static void dpcls_insert(struct dpcls *, struct dpcls_rule *,
                          const struct netdev_flow_key *mask);
 static void dpcls_remove(struct dpcls *, struct dpcls_rule *);
@@ -294,6 +300,8 @@ struct pmd_auto_lb {
     bool is_enabled;            /* Current status of Auto load balancing. */
     uint64_t rebalance_intvl;
     uint64_t rebalance_poll_timer;
+    uint8_t rebalance_improve_thresh;
+    atomic_uint8_t rebalance_load_thresh;
 };
 
 /* Datapath based on the network device interface from netdev.h.
@@ -309,6 +317,7 @@ struct pmd_auto_lb {
  *
  *    dp_netdev_mutex (global)
  *    port_mutex
+ *    bond_mutex
  *    non_pmd_mutex
  */
 struct dp_netdev {
@@ -376,6 +385,10 @@ struct dp_netdev {
 
     struct conntrack *conntrack;
     struct pmd_auto_lb pmd_alb;
+
+    /* Bonds. */
+    struct ovs_mutex bond_mutex; /* Protects updates of 'tx_bonds'. */
+    struct cmap tx_bonds; /* Contains 'struct tx_bond'. */
 };
 
 static void meter_lock(const struct dp_netdev *dp, uint32_t meter_id)
@@ -481,6 +494,12 @@ struct dp_netdev_flow_stats {
     atomic_uint16_t tcp_flags;     /* Bitwise-OR of seen tcp_flags values. */
 };
 
+/* Contained by struct dp_netdev_flow's 'last_attrs' member.  */
+struct dp_netdev_flow_attrs {
+    atomic_bool offloaded;         /* True if flow is offloaded to HW. */
+    ATOMIC(const char *) dp_layer; /* DP layer the flow is handled in. */
+};
+
 /* A flow in 'dp_netdev_pmd_thread's 'flow_table'.
  *
  *
@@ -541,6 +560,11 @@ struct dp_netdev_flow {
     /* Statistics. */
     struct dp_netdev_flow_stats stats;
 
+    /* Statistics and attributes received from the netdev offload provider. */
+    atomic_int netdev_flow_get_result;
+    struct dp_netdev_flow_stats last_stats;
+    struct dp_netdev_flow_attrs last_attrs;
+
     /* Actions. */
     OVSRCU_TYPE(struct dp_netdev_actions *) actions;
 
@@ -607,6 +631,20 @@ struct tx_port {
     struct dp_netdev_rxq *output_pkts_rxqs[NETDEV_MAX_BURST];
 };
 
+/* Contained by struct tx_bond 'member_buckets'. */
+struct member_entry {
+    odp_port_t member_id;
+    atomic_ullong n_packets;
+    atomic_ullong n_bytes;
+};
+
+/* Contained by struct dp_netdev_pmd_thread's 'tx_bonds'. */
+struct tx_bond {
+    struct cmap_node node;
+    uint32_t bond_id;
+    struct member_entry member_buckets[BOND_BUCKETS];
+};
+
 /* A set of properties for the current processing loop that is not directly
  * associated with the pmd thread itself, but with the packets being
  * processed or the short-term system configuration (for example, time).
@@ -739,6 +777,11 @@ struct dp_netdev_pmd_thread {
      * read by the pmd thread. */
     struct hmap tx_ports OVS_GUARDED;
 
+    struct ovs_mutex bond_mutex;    /* Protects updates of 'tx_bonds'. */
+    /* Map of 'tx_bond's used for transmission.  Written by the main thread
+     * and read by the pmd thread. */
+    struct cmap tx_bonds;
+
     /* These are thread-local copies of 'tx_ports'.  One contains only tunnel
      * ports (that support push_tunnel/pop_tunnel), the other contains ports
      * with at least one txq (that support send).  A port can be in both.
@@ -762,6 +805,9 @@ struct dp_netdev_pmd_thread {
 
     /* Set to true if the pmd thread needs to be reloaded. */
     bool need_reload;
+
+    /* Next time when PMD should try RCU quiescing. */
+    long long next_rcu_quiesce;
 };
 
 /* Interface to netdev-based datapath. */
@@ -830,6 +876,12 @@ static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
 static int
 dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
                                    bool force);
+static void dp_netdev_add_bond_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
+                                         struct tx_bond *bond, bool update)
+    OVS_EXCLUDED(pmd->bond_mutex);
+static void dp_netdev_del_bond_tx_from_pmd(struct dp_netdev_pmd_thread *pmd,
+                                           uint32_t bond_id)
+    OVS_EXCLUDED(pmd->bond_mutex);
 
 static void reconfigure_datapath(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex);
@@ -858,6 +910,9 @@ dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
                                bool purge);
 static int dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
                                       struct tx_port *tx);
+static inline struct dpcls *
+dp_netdev_pmd_lookup_dpcls(struct dp_netdev_pmd_thread *pmd,
+                           odp_port_t in_port);
 
 static inline bool emc_entry_alive(struct emc_entry *ce);
 static void emc_clear_entry(struct emc_entry *ce);
@@ -1258,6 +1313,121 @@ sorted_poll_thread_list(struct dp_netdev *dp,
     *n = k;
 }
 
+static void
+dpif_netdev_subtable_lookup_get(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                                const char *argv[] OVS_UNUSED,
+                                void *aux OVS_UNUSED)
+{
+    /* Get a list of all lookup functions. */
+    struct dpcls_subtable_lookup_info_t *lookup_funcs = NULL;
+    int32_t count = dpcls_subtable_lookup_info_get(&lookup_funcs);
+    if (count < 0) {
+        unixctl_command_reply_error(conn, "error getting lookup names");
+        return;
+    }
+
+    /* Add all lookup functions to reply string. */
+    struct ds reply = DS_EMPTY_INITIALIZER;
+    ds_put_cstr(&reply, "Available lookup functions (priority : name)\n");
+    for (int i = 0; i < count; i++) {
+        ds_put_format(&reply, "  %d : %s\n", lookup_funcs[i].prio,
+                      lookup_funcs[i].name);
+    }
+    unixctl_command_reply(conn, ds_cstr(&reply));
+    ds_destroy(&reply);
+}
+
+static void
+dpif_netdev_subtable_lookup_set(struct unixctl_conn *conn, int argc,
+                                const char *argv[], void *aux OVS_UNUSED)
+{
+    /* This function requires 2 parameters (argv[1] and argv[2]) to execute.
+     *   argv[1] is subtable name
+     *   argv[2] is priority
+     *   argv[3] is the datapath name (optional if only 1 datapath exists)
+     */
+    const char *func_name = argv[1];
+
+    errno = 0;
+    char *err_char;
+    uint32_t new_prio = strtoul(argv[2], &err_char, 10);
+    if (errno != 0 || new_prio > UINT8_MAX) {
+        unixctl_command_reply_error(conn,
+            "error converting priority, use integer in range 0-255\n");
+        return;
+    }
+
+    int32_t err = dpcls_subtable_set_prio(func_name, new_prio);
+    if (err) {
+        unixctl_command_reply_error(conn,
+            "error, subtable lookup function not found\n");
+        return;
+    }
+
+    /* argv[3] is optional datapath instance. If no datapath name is provided
+     * and only one datapath exists, the one existing datapath is reprobed.
+     */
+    ovs_mutex_lock(&dp_netdev_mutex);
+    struct dp_netdev *dp = NULL;
+
+    if (argc == 4) {
+        dp = shash_find_data(&dp_netdevs, argv[3]);
+    } else if (shash_count(&dp_netdevs) == 1) {
+        dp = shash_first(&dp_netdevs)->data;
+    }
+
+    if (!dp) {
+        ovs_mutex_unlock(&dp_netdev_mutex);
+        unixctl_command_reply_error(conn,
+                                    "please specify an existing datapath");
+        return;
+    }
+
+    /* Get PMD threads list, required to get DPCLS instances. */
+    size_t n;
+    uint32_t lookup_dpcls_changed = 0;
+    uint32_t lookup_subtable_changed = 0;
+    struct dp_netdev_pmd_thread **pmd_list;
+    sorted_poll_thread_list(dp, &pmd_list, &n);
+
+    /* take port mutex as HMAP iters over them. */
+    ovs_mutex_lock(&dp->port_mutex);
+
+    for (size_t i = 0; i < n; i++) {
+        struct dp_netdev_pmd_thread *pmd = pmd_list[i];
+        if (pmd->core_id == NON_PMD_CORE_ID) {
+            continue;
+        }
+
+        struct dp_netdev_port *port = NULL;
+        HMAP_FOR_EACH (port, node, &dp->ports) {
+            odp_port_t in_port = port->port_no;
+            struct dpcls *cls = dp_netdev_pmd_lookup_dpcls(pmd, in_port);
+            if (!cls) {
+                continue;
+            }
+            uint32_t subtbl_changes = dpcls_subtable_lookup_reprobe(cls);
+            if (subtbl_changes) {
+                lookup_dpcls_changed++;
+                lookup_subtable_changed += subtbl_changes;
+            }
+        }
+    }
+
+    /* release port mutex before netdev mutex. */
+    ovs_mutex_unlock(&dp->port_mutex);
+    ovs_mutex_unlock(&dp_netdev_mutex);
+
+    struct ds reply = DS_EMPTY_INITIALIZER;
+    ds_put_format(&reply,
+        "Lookup priority change affected %d dpcls ports and %d subtables.\n",
+        lookup_dpcls_changed, lookup_subtable_changed);
+    const char *reply_str = ds_cstr(&reply);
+    unixctl_command_reply(conn, reply_str);
+    VLOG_INFO("%s", reply_str);
+    ds_destroy(&reply);
+}
+
 static void
 dpif_netdev_pmd_rebalance(struct unixctl_conn *conn, int argc,
                           const char *argv[], void *aux OVS_UNUSED)
@@ -1396,6 +1566,49 @@ pmd_perf_show_cmd(struct unixctl_conn *conn, int argc,
     par.command_type = PMD_INFO_PERF_SHOW;
     dpif_netdev_pmd_info(conn, argc, argv, &par);
 }
+
+static void
+dpif_netdev_bond_show(struct unixctl_conn *conn, int argc,
+                      const char *argv[], void *aux OVS_UNUSED)
+{
+    struct ds reply = DS_EMPTY_INITIALIZER;
+    struct dp_netdev *dp = NULL;
+
+    ovs_mutex_lock(&dp_netdev_mutex);
+    if (argc == 2) {
+        dp = shash_find_data(&dp_netdevs, argv[1]);
+    } else if (shash_count(&dp_netdevs) == 1) {
+        /* There's only one datapath. */
+        dp = shash_first(&dp_netdevs)->data;
+    }
+    if (!dp) {
+        ovs_mutex_unlock(&dp_netdev_mutex);
+        unixctl_command_reply_error(conn,
+                                    "please specify an existing datapath");
+        return;
+    }
+
+    if (cmap_count(&dp->tx_bonds) > 0) {
+        struct tx_bond *dp_bond_entry;
+
+        ds_put_cstr(&reply, "Bonds:\n");
+        CMAP_FOR_EACH (dp_bond_entry, node, &dp->tx_bonds) {
+            ds_put_format(&reply, "  bond-id %"PRIu32":\n",
+                          dp_bond_entry->bond_id);
+            for (int bucket = 0; bucket < BOND_BUCKETS; bucket++) {
+                uint32_t member_id = odp_to_u32(
+                    dp_bond_entry->member_buckets[bucket].member_id);
+                ds_put_format(&reply,
+                              "    bucket %d - member %"PRIu32"\n",
+                              bucket, member_id);
+            }
+        }
+    }
+    ovs_mutex_unlock(&dp_netdev_mutex);
+    unixctl_command_reply(conn, ds_cstr(&reply));
+    ds_destroy(&reply);
+}
+
 \f
 static int
 dpif_netdev_init(void)
@@ -1427,6 +1640,16 @@ dpif_netdev_init(void)
                              "[-us usec] [-q qlen]",
                              0, 10, pmd_perf_log_set_cmd,
                              NULL);
+    unixctl_command_register("dpif-netdev/bond-show", "[dp]",
+                             0, 1, dpif_netdev_bond_show,
+                             NULL);
+    unixctl_command_register("dpif-netdev/subtable-lookup-prio-set",
+                             "[lookup_func] [prio] [dp]",
+                             2, 3, dpif_netdev_subtable_lookup_set,
+                             NULL);
+    unixctl_command_register("dpif-netdev/subtable-lookup-prio-get", "",
+                             0, 0, dpif_netdev_subtable_lookup_get,
+                             NULL);
     return 0;
 }
 
@@ -1551,6 +1774,9 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     ovs_mutex_init_recursive(&dp->port_mutex);
     hmap_init(&dp->ports);
     dp->port_seq = seq_create();
+    ovs_mutex_init(&dp->bond_mutex);
+    cmap_init(&dp->tx_bonds);
+
     fat_rwlock_init(&dp->upcall_rwlock);
 
     dp->reconfigure_seq = seq_create();
@@ -1657,6 +1883,12 @@ dp_delete_meter(struct dp_netdev *dp, uint32_t meter_id)
     }
 }
 
+static uint32_t
+hash_bond_id(uint32_t bond_id)
+{
+    return hash_int(bond_id, 0);
+}
+
 /* Requires dp_netdev_mutex so that we can't get a new reference to 'dp'
  * through the 'dp_netdevs' shash while freeing 'dp'. */
 static void
@@ -1664,6 +1896,7 @@ dp_netdev_free(struct dp_netdev *dp)
     OVS_REQUIRES(dp_netdev_mutex)
 {
     struct dp_netdev_port *port, *next;
+    struct tx_bond *bond;
 
     shash_find_and_delete(&dp_netdevs, dp->name);
 
@@ -1673,6 +1906,13 @@ dp_netdev_free(struct dp_netdev *dp)
     }
     ovs_mutex_unlock(&dp->port_mutex);
 
+    ovs_mutex_lock(&dp->bond_mutex);
+    CMAP_FOR_EACH (bond, node, &dp->tx_bonds) {
+        cmap_remove(&dp->tx_bonds, &bond->node, hash_bond_id(bond->bond_id));
+        ovsrcu_postpone(free, bond);
+    }
+    ovs_mutex_unlock(&dp->bond_mutex);
+
     dp_netdev_destroy_all_pmds(dp, true);
     cmap_destroy(&dp->poll_threads);
 
@@ -1691,6 +1931,9 @@ dp_netdev_free(struct dp_netdev *dp)
     hmap_destroy(&dp->ports);
     ovs_mutex_destroy(&dp->port_mutex);
 
+    cmap_destroy(&dp->tx_bonds);
+    ovs_mutex_destroy(&dp->bond_mutex);
+
     /* Upcalls must be disabled at this point */
     dp_netdev_destroy_upcall_lock(dp);
 
@@ -2040,6 +2283,8 @@ static void
 do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     OVS_REQUIRES(dp->port_mutex)
 {
+    netdev_flow_flush(port->netdev);
+    netdev_uninit_flow_api(port->netdev);
     hmap_remove(&dp->ports, &port->node);
     seq_change(dp->port_seq);
 
@@ -2149,7 +2394,11 @@ dp_netdev_pmd_find_dpcls(struct dp_netdev_pmd_thread *pmd,
 }
 
 #define MAX_FLOW_MARK       (UINT32_MAX - 1)
-#define INVALID_FLOW_MARK   (UINT32_MAX)
+#define INVALID_FLOW_MARK   0
+/* Zero flow mark is used to indicate the HW to remove the mark. A packet
+ * marked with zero mark is received in SW without a mark at all, so it
+ * cannot be used as a valid mark.
+ */
 
 struct megaflow_to_mark_data {
     const struct cmap_node node;
@@ -2175,7 +2424,7 @@ flow_mark_alloc(void)
 
     if (!flow_mark.pool) {
         /* Haven't initiated yet, do it here */
-        flow_mark.pool = id_pool_create(0, MAX_FLOW_MARK);
+        flow_mark.pool = id_pool_create(1, MAX_FLOW_MARK);
     }
 
     if (id_pool_alloc_id(flow_mark.pool, &mark)) {
@@ -2253,7 +2502,8 @@ mark_to_flow_associate(const uint32_t mark, struct dp_netdev_flow *flow)
                 hash_int(mark, 0));
     flow->mark = mark;
 
-    VLOG_DBG("Associated dp_netdev flow %p with mark %u\n", flow, mark);
+    VLOG_DBG("Associated dp_netdev flow %p with mark %u mega_ufid "UUID_FMT,
+             flow, mark, UUID_ARGS((struct uuid *) &flow->mega_ufid));
 }
 
 static bool
@@ -2275,10 +2525,17 @@ static int
 mark_to_flow_disassociate(struct dp_netdev_pmd_thread *pmd,
                           struct dp_netdev_flow *flow)
 {
-    int ret = 0;
-    uint32_t mark = flow->mark;
+    const char *dpif_type_str = dpif_normalize_type(pmd->dp->class->type);
     struct cmap_node *mark_node = CONST_CAST(struct cmap_node *,
                                              &flow->mark_node);
+    uint32_t mark = flow->mark;
+    int ret = 0;
+
+    /* INVALID_FLOW_MARK may mean that the flow has been disassociated or
+     * never associated. */
+    if (OVS_UNLIKELY(mark == INVALID_FLOW_MARK)) {
+        return EINVAL;
+    }
 
     cmap_remove(&flow_mark.mark_to_flow, mark_node, hash_int(mark, 0));
     flow->mark = INVALID_FLOW_MARK;
@@ -2291,7 +2548,7 @@ mark_to_flow_disassociate(struct dp_netdev_pmd_thread *pmd,
         struct netdev *port;
         odp_port_t in_port = flow->flow.in_port.odp_port;
 
-        port = netdev_ports_get(in_port, pmd->dp->class);
+        port = netdev_ports_get(in_port, dpif_type_str);
         if (port) {
             /* Taking a global 'port_mutex' to fulfill thread safety
              * restrictions for the netdev-offload-dpdk module. */
@@ -2302,7 +2559,8 @@ mark_to_flow_disassociate(struct dp_netdev_pmd_thread *pmd,
         }
 
         flow_mark_free(mark);
-        VLOG_DBG("Freed flow mark %u\n", mark);
+        VLOG_DBG("Freed flow mark %u mega_ufid "UUID_FMT, mark,
+                 UUID_ARGS((struct uuid *) &flow->mega_ufid));
 
         megaflow_to_mark_disassociate(&flow->mega_ufid);
     }
@@ -2398,9 +2656,9 @@ static int
 dp_netdev_flow_offload_put(struct dp_flow_offload_item *offload)
 {
     struct dp_netdev_pmd_thread *pmd = offload->pmd;
-    const struct dpif_class *dpif_class = pmd->dp->class;
     struct dp_netdev_flow *flow = offload->flow;
     odp_port_t in_port = flow->flow.in_port.odp_port;
+    const char *dpif_type_str = dpif_normalize_type(pmd->dp->class->type);
     bool modification = offload->op == DP_NETDEV_FLOW_OFFLOAD_OP_MOD;
     struct offload_info info;
     struct netdev *port;
@@ -2433,12 +2691,12 @@ dp_netdev_flow_offload_put(struct dp_flow_offload_item *offload)
         mark = flow_mark_alloc();
         if (mark == INVALID_FLOW_MARK) {
             VLOG_ERR("Failed to allocate flow mark!\n");
+            return -1;
         }
     }
     info.flow_mark = mark;
-    info.dpif_class = dpif_class;
 
-    port = netdev_ports_get(in_port, pmd->dp->class);
+    port = netdev_ports_get(in_port, dpif_type_str);
     if (!port || netdev_vport_is_vport_class(port->netdev_class)) {
         netdev_close(port);
         goto err_free;
@@ -2509,8 +2767,9 @@ dp_netdev_flow_offload_main(void *data OVS_UNUSED)
             OVS_NOT_REACHED();
         }
 
-        VLOG_DBG("%s to %s netdev flow\n",
-                 ret == 0 ? "succeed" : "failed", op);
+        VLOG_DBG("%s to %s netdev flow "UUID_FMT,
+                 ret == 0 ? "succeed" : "failed", op,
+                 UUID_ARGS((struct uuid *) &offload->flow->mega_ufid));
         dp_netdev_free_flow_offload(offload);
         ovsrcu_quiesce();
     }
@@ -3033,9 +3292,56 @@ dp_netdev_pmd_find_flow(const struct dp_netdev_pmd_thread *pmd,
     return NULL;
 }
 
+static void
+dp_netdev_flow_set_last_stats_attrs(struct dp_netdev_flow *netdev_flow,
+                                    const struct dpif_flow_stats *stats,
+                                    const struct dpif_flow_attrs *attrs,
+                                    int result)
+{
+    struct dp_netdev_flow_stats *last_stats = &netdev_flow->last_stats;
+    struct dp_netdev_flow_attrs *last_attrs = &netdev_flow->last_attrs;
+
+    atomic_store_relaxed(&netdev_flow->netdev_flow_get_result, result);
+    if (result) {
+        return;
+    }
+
+    atomic_store_relaxed(&last_stats->used,         stats->used);
+    atomic_store_relaxed(&last_stats->packet_count, stats->n_packets);
+    atomic_store_relaxed(&last_stats->byte_count,   stats->n_bytes);
+    atomic_store_relaxed(&last_stats->tcp_flags,    stats->tcp_flags);
+
+    atomic_store_relaxed(&last_attrs->offloaded,    attrs->offloaded);
+    atomic_store_relaxed(&last_attrs->dp_layer,     attrs->dp_layer);
+
+}
+
+static void
+dp_netdev_flow_get_last_stats_attrs(struct dp_netdev_flow *netdev_flow,
+                                    struct dpif_flow_stats *stats,
+                                    struct dpif_flow_attrs *attrs,
+                                    int *result)
+{
+    struct dp_netdev_flow_stats *last_stats = &netdev_flow->last_stats;
+    struct dp_netdev_flow_attrs *last_attrs = &netdev_flow->last_attrs;
+
+    atomic_read_relaxed(&netdev_flow->netdev_flow_get_result, result);
+    if (*result) {
+        return;
+    }
+
+    atomic_read_relaxed(&last_stats->used,         &stats->used);
+    atomic_read_relaxed(&last_stats->packet_count, &stats->n_packets);
+    atomic_read_relaxed(&last_stats->byte_count,   &stats->n_bytes);
+    atomic_read_relaxed(&last_stats->tcp_flags,    &stats->tcp_flags);
+
+    atomic_read_relaxed(&last_attrs->offloaded,    &attrs->offloaded);
+    atomic_read_relaxed(&last_attrs->dp_layer,     &attrs->dp_layer);
+}
+
 static bool
 dpif_netdev_get_flow_offload_status(const struct dp_netdev *dp,
-                                    const struct dp_netdev_flow *netdev_flow,
+                                    struct dp_netdev_flow *netdev_flow,
                                     struct dpif_flow_stats *stats,
                                     struct dpif_flow_attrs *attrs)
 {
@@ -3051,17 +3357,38 @@ dpif_netdev_get_flow_offload_status(const struct dp_netdev *dp,
         return false;
     }
 
-    netdev = netdev_ports_get(netdev_flow->flow.in_port.odp_port, dp->class);
+    netdev = netdev_ports_get(netdev_flow->flow.in_port.odp_port,
+                              dpif_normalize_type(dp->class->type));
     if (!netdev) {
         return false;
     }
     ofpbuf_use_stack(&buf, &act_buf, sizeof act_buf);
     /* Taking a global 'port_mutex' to fulfill thread safety
-     * restrictions for the netdev-offload-dpdk module. */
-    ovs_mutex_lock(&dp->port_mutex);
-    ret = netdev_flow_get(netdev, &match, &actions, &netdev_flow->mega_ufid,
-                          stats, attrs, &buf);
-    ovs_mutex_unlock(&dp->port_mutex);
+     * restrictions for the netdev-offload-dpdk module.
+     *
+     * XXX: Main thread will try to pause/stop all revalidators during datapath
+     *      reconfiguration via datapath purge callback (dp_purge_cb) while
+     *      holding 'dp->port_mutex'.  So we're not waiting for mutex here.
+     *      Otherwise, deadlock is possible, bcause revalidators might sleep
+     *      waiting for the main thread to release the lock and main thread
+     *      will wait for them to stop processing.
+     *      This workaround might make statistics less accurate. Especially
+     *      for flow deletion case, since there will be no other attempt.  */
+    if (!ovs_mutex_trylock(&dp->port_mutex)) {
+        ret = netdev_flow_get(netdev, &match, &actions,
+                              &netdev_flow->mega_ufid, stats, attrs, &buf);
+        /* Storing statistics and attributes from the last request for
+         * later use on mutex contention. */
+        dp_netdev_flow_set_last_stats_attrs(netdev_flow, stats, attrs, ret);
+        ovs_mutex_unlock(&dp->port_mutex);
+    } else {
+        dp_netdev_flow_get_last_stats_attrs(netdev_flow, stats, attrs, &ret);
+        if (!ret && !attrs->dp_layer) {
+            /* Flow was never reported as 'offloaded' so it's harmless
+             * to continue to think so. */
+            ret = EAGAIN;
+        }
+    }
     netdev_close(netdev);
     if (ret) {
         return false;
@@ -3330,6 +3657,9 @@ dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd,
     /* Do not allocate extra space. */
     flow = xmalloc(sizeof *flow - sizeof flow->cr.flow.mf + mask.len);
     memset(&flow->stats, 0, sizeof flow->stats);
+    atomic_init(&flow->netdev_flow_get_result, 0);
+    memset(&flow->last_stats, 0, sizeof flow->last_stats);
+    memset(&flow->last_attrs, 0, sizeof flow->last_attrs);
     flow->dead = false;
     flow->batch = NULL;
     flow->mark = INVALID_FLOW_MARK;
@@ -3381,6 +3711,8 @@ dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd,
 
         ds_put_cstr(&ds, "flow_add: ");
         odp_format_ufid(ufid, &ds);
+        ds_put_cstr(&ds, " mega_");
+        odp_format_ufid(&flow->mega_ufid, &ds);
         ds_put_cstr(&ds, " ");
         odp_flow_format(key_buf.data, key_buf.size,
                         mask_buf.data, mask_buf.size,
@@ -3429,13 +3761,8 @@ flow_put_on_pmd(struct dp_netdev_pmd_thread *pmd,
     netdev_flow = dp_netdev_pmd_lookup_flow(pmd, key, NULL);
     if (!netdev_flow) {
         if (put->flags & DPIF_FP_CREATE) {
-            if (cmap_count(&pmd->flow_table) < MAX_FLOWS) {
-                dp_netdev_flow_add(pmd, match, ufid, put->actions,
-                                   put->actions_len);
-                error = 0;
-            } else {
-                error = EFBIG;
-            }
+            dp_netdev_flow_add(pmd, match, ufid, put->actions,
+                               put->actions_len);
         } else {
             error = ENOENT;
         }
@@ -3876,11 +4203,12 @@ dpif_netdev_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops,
 
 /* Enable or Disable PMD auto load balancing. */
 static void
-set_pmd_auto_lb(struct dp_netdev *dp)
+set_pmd_auto_lb(struct dp_netdev *dp, bool always_log)
 {
     unsigned int cnt = 0;
     struct dp_netdev_pmd_thread *pmd;
     struct pmd_auto_lb *pmd_alb = &dp->pmd_alb;
+    uint8_t rebalance_load_thresh;
 
     bool enable_alb = false;
     bool multi_rxq = false;
@@ -3907,18 +4235,24 @@ set_pmd_auto_lb(struct dp_netdev *dp)
     enable_alb = enable_alb && pmd_rxq_assign_cyc &&
                     pmd_alb->auto_lb_requested;
 
-    if (pmd_alb->is_enabled != enable_alb) {
+    if (pmd_alb->is_enabled != enable_alb || always_log) {
         pmd_alb->is_enabled = enable_alb;
         if (pmd_alb->is_enabled) {
+            atomic_read_relaxed(&pmd_alb->rebalance_load_thresh,
+                                &rebalance_load_thresh);
             VLOG_INFO("PMD auto load balance is enabled "
-                      "(with rebalance interval:%"PRIu64" msec)",
-                       pmd_alb->rebalance_intvl);
+                      "interval %"PRIu64" mins, "
+                      "pmd load threshold %"PRIu8"%%, "
+                      "improvement threshold %"PRIu8"%%",
+                       pmd_alb->rebalance_intvl / MIN_TO_MSEC,
+                       rebalance_load_thresh,
+                       pmd_alb->rebalance_improve_thresh);
+
         } else {
             pmd_alb->rebalance_poll_timer = 0;
             VLOG_INFO("PMD auto load balance is disabled");
         }
     }
-
 }
 
 /* Applies datapath configuration from the database. Some of the changes are
@@ -3936,6 +4270,9 @@ dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
     uint32_t insert_min, cur_min;
     uint32_t tx_flush_interval, cur_tx_flush_interval;
     uint64_t rebalance_intvl;
+    uint8_t rebalance_load, cur_rebalance_load;
+    uint8_t rebalance_improve;
+    bool log_autolb = false;
 
     tx_flush_interval = smap_get_int(other_config, "tx-flush-interval",
                                      DEFAULT_TX_FLUSH_INTERVAL);
@@ -4013,7 +4350,7 @@ dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
                               false);
 
     rebalance_intvl = smap_get_int(other_config, "pmd-auto-lb-rebal-interval",
-                              ALB_PMD_REBALANCE_POLL_INTERVAL);
+                                   ALB_REBALANCE_INTERVAL);
 
     /* Input is in min, convert it to msec. */
     rebalance_intvl =
@@ -4021,9 +4358,38 @@ dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
 
     if (pmd_alb->rebalance_intvl != rebalance_intvl) {
         pmd_alb->rebalance_intvl = rebalance_intvl;
-    }
-
-    set_pmd_auto_lb(dp);
+        VLOG_INFO("PMD auto load balance interval set to "
+                  "%"PRIu64" mins\n", rebalance_intvl / MIN_TO_MSEC);
+        log_autolb = true;
+    }
+
+    rebalance_improve = smap_get_int(other_config,
+                                     "pmd-auto-lb-improvement-threshold",
+                                     ALB_IMPROVEMENT_THRESHOLD);
+    if (rebalance_improve > 100) {
+        rebalance_improve = ALB_IMPROVEMENT_THRESHOLD;
+    }
+    if (rebalance_improve != pmd_alb->rebalance_improve_thresh) {
+        pmd_alb->rebalance_improve_thresh = rebalance_improve;
+        VLOG_INFO("PMD auto load balance improvement threshold set to "
+                  "%"PRIu8"%%", rebalance_improve);
+        log_autolb = true;
+    }
+
+    rebalance_load = smap_get_int(other_config, "pmd-auto-lb-load-threshold",
+                                  ALB_LOAD_THRESHOLD);
+    if (rebalance_load > 100) {
+        rebalance_load = ALB_LOAD_THRESHOLD;
+    }
+    atomic_read_relaxed(&pmd_alb->rebalance_load_thresh, &cur_rebalance_load);
+    if (rebalance_load != cur_rebalance_load) {
+        atomic_store_relaxed(&pmd_alb->rebalance_load_thresh,
+                             rebalance_load);
+        VLOG_INFO("PMD auto load balance load threshold set to %"PRIu8"%%",
+                  rebalance_load);
+        log_autolb = true;
+    }
+    set_pmd_auto_lb(dp, log_autolb);
     return 0;
 }
 
@@ -4422,6 +4788,20 @@ tx_port_lookup(const struct hmap *hmap, odp_port_t port_no)
     return NULL;
 }
 
+static struct tx_bond *
+tx_bond_lookup(const struct cmap *tx_bonds, uint32_t bond_id)
+{
+    uint32_t hash = hash_bond_id(bond_id);
+    struct tx_bond *tx;
+
+    CMAP_FOR_EACH_WITH_HASH (tx, node, hash, tx_bonds) {
+        if (tx->bond_id == bond_id) {
+            return tx;
+        }
+    }
+    return NULL;
+}
+
 static int
 port_reconfigure(struct dp_netdev_port *port)
 {
@@ -4941,9 +5321,17 @@ reconfigure_datapath(struct dp_netdev *dp)
 
     /* Check for all the ports that need reconfiguration.  We cache this in
      * 'port->need_reconfigure', because netdev_is_reconf_required() can
-     * change at any time. */
+     * change at any time.
+     * Also mark for reconfiguration all ports which will likely change their
+     * 'dynamic_txqs' parameter.  It's required to stop using them before
+     * changing this setting and it's simpler to mark ports here and allow
+     * 'pmd_remove_stale_ports' to remove them from threads.  There will be
+     * no actual reconfiguration in 'port_reconfigure' because it's
+     * unnecessary.  */
     HMAP_FOR_EACH (port, node, &dp->ports) {
-        if (netdev_is_reconf_required(port->netdev)) {
+        if (netdev_is_reconf_required(port->netdev)
+            || (port->dynamic_txqs
+                != (netdev_n_txq(port->netdev) < wanted_txqs))) {
             port->need_reconfigure = true;
         }
     }
@@ -5061,14 +5449,22 @@ reconfigure_datapath(struct dp_netdev *dp)
         }
     }
 
-    /* Add every port to the tx cache of every pmd thread, if it's not
-     * there already and if this pmd has at least one rxq to poll. */
+    /* Add every port and bond to the tx port and bond caches of
+     * every pmd thread, if it's not there already and if this pmd
+     * has at least one rxq to poll.
+     */
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
         ovs_mutex_lock(&pmd->port_mutex);
         if (hmap_count(&pmd->poll_list) || pmd->core_id == NON_PMD_CORE_ID) {
+            struct tx_bond *bond;
+
             HMAP_FOR_EACH (port, node, &dp->ports) {
                 dp_netdev_add_port_tx_to_pmd(pmd, port);
             }
+
+            CMAP_FOR_EACH (bond, node, &dp->tx_bonds) {
+                dp_netdev_add_bond_tx_to_pmd(pmd, bond, false);
+            }
         }
         ovs_mutex_unlock(&pmd->port_mutex);
     }
@@ -5077,7 +5473,7 @@ reconfigure_datapath(struct dp_netdev *dp)
     reload_affected_pmds(dp);
 
     /* Check if PMD Auto LB is to be enabled */
-    set_pmd_auto_lb(dp);
+    set_pmd_auto_lb(dp, false);
 }
 
 /* Returns true if one of the netdevs in 'dp' requires a reconfiguration */
@@ -5321,7 +5717,7 @@ pmd_rebalance_dry_run(struct dp_netdev *dp)
             improvement =
                 ((curr_variance - new_variance) * 100) / curr_variance;
         }
-        if (improvement < ALB_ACCEPTABLE_IMPROVEMENT) {
+        if (improvement < dp->pmd_alb.rebalance_improve_thresh) {
             ret = false;
         }
     }
@@ -5608,6 +6004,9 @@ reload:
     pmd->intrvl_tsc_prev = 0;
     atomic_store_relaxed(&pmd->intrvl_cycles, 0);
     cycles_counter_update(s);
+
+    pmd->next_rcu_quiesce = pmd->ctx.now + PMD_RCU_QUIESCE_INTERVAL;
+
     /* Protect pmd stats from external clearing while polling. */
     ovs_mutex_lock(&pmd->perf_stats.stats_mutex);
     for (;;) {
@@ -5642,6 +6041,16 @@ reload:
             tx_packets = dp_netdev_pmd_flush_output_packets(pmd, false);
         }
 
+        /* Do RCU synchronization at fixed interval.  This ensures that
+         * synchronization would not be delayed long even at high load of
+         * packet processing. */
+        if (pmd->ctx.now > pmd->next_rcu_quiesce) {
+            if (!ovsrcu_try_quiesce()) {
+                pmd->next_rcu_quiesce =
+                    pmd->ctx.now + PMD_RCU_QUIESCE_INTERVAL;
+            }
+        }
+
         if (lc++ > 1024) {
             lc = 0;
 
@@ -5649,6 +6058,8 @@ reload:
             dp_netdev_pmd_try_optimize(pmd, poll_list, poll_cnt);
             if (!ovsrcu_try_quiesce()) {
                 emc_cache_slow_sweep(&((pmd->flow_cache).emc_cache));
+                pmd->next_rcu_quiesce =
+                    pmd->ctx.now + PMD_RCU_QUIESCE_INTERVAL;
             }
 
             for (i = 0; i < poll_cnt; i++) {
@@ -6111,16 +6522,19 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     atomic_init(&pmd->reload, false);
     ovs_mutex_init(&pmd->flow_mutex);
     ovs_mutex_init(&pmd->port_mutex);
+    ovs_mutex_init(&pmd->bond_mutex);
     cmap_init(&pmd->flow_table);
     cmap_init(&pmd->classifiers);
     pmd->ctx.last_rxq = NULL;
     pmd_thread_ctx_time_update(pmd);
     pmd->next_optimization = pmd->ctx.now + DPCLS_OPTIMIZATION_INTERVAL;
+    pmd->next_rcu_quiesce = pmd->ctx.now + PMD_RCU_QUIESCE_INTERVAL;
     pmd->rxq_next_cycle_store = pmd->ctx.now + PMD_RXQ_INTERVAL_LEN;
     hmap_init(&pmd->poll_list);
     hmap_init(&pmd->tx_ports);
     hmap_init(&pmd->tnl_port_cache);
     hmap_init(&pmd->send_port_cache);
+    cmap_init(&pmd->tx_bonds);
     /* init the 'flow_cache' since there is no
      * actual thread created for NON_PMD_CORE_ID. */
     if (core_id == NON_PMD_CORE_ID) {
@@ -6141,6 +6555,7 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
     hmap_destroy(&pmd->send_port_cache);
     hmap_destroy(&pmd->tnl_port_cache);
     hmap_destroy(&pmd->tx_ports);
+    cmap_destroy(&pmd->tx_bonds);
     hmap_destroy(&pmd->poll_list);
     /* All flows (including their dpcls_rules) have been deleted already */
     CMAP_FOR_EACH (cls, node, &pmd->classifiers) {
@@ -6152,6 +6567,7 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
     ovs_mutex_destroy(&pmd->flow_mutex);
     seq_destroy(pmd->reload_seq);
     ovs_mutex_destroy(&pmd->port_mutex);
+    ovs_mutex_destroy(&pmd->bond_mutex);
     free(pmd);
 }
 
@@ -6221,6 +6637,7 @@ dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd)
 {
     struct rxq_poll *poll;
     struct tx_port *port;
+    struct tx_bond *tx;
 
     ovs_mutex_lock(&pmd->port_mutex);
     HMAP_FOR_EACH_POP (poll, node, &pmd->poll_list) {
@@ -6230,6 +6647,13 @@ dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd)
         free(port);
     }
     ovs_mutex_unlock(&pmd->port_mutex);
+
+    ovs_mutex_lock(&pmd->bond_mutex);
+    CMAP_FOR_EACH (tx, node, &pmd->tx_bonds) {
+        cmap_remove(&pmd->tx_bonds, &tx->node, hash_bond_id(tx->bond_id));
+        ovsrcu_postpone(free, tx);
+    }
+    ovs_mutex_unlock(&pmd->bond_mutex);
 }
 
 /* Adds rx queue to poll_list of PMD thread, if it's not there already. */
@@ -6305,6 +6729,62 @@ dp_netdev_del_port_tx_from_pmd(struct dp_netdev_pmd_thread *pmd,
     free(tx);
     pmd->need_reload = true;
 }
+
+/* Add bond to the tx bond cmap of 'pmd'. */
+static void
+dp_netdev_add_bond_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
+                             struct tx_bond *bond, bool update)
+    OVS_EXCLUDED(pmd->bond_mutex)
+{
+    struct tx_bond *tx;
+
+    ovs_mutex_lock(&pmd->bond_mutex);
+    tx = tx_bond_lookup(&pmd->tx_bonds, bond->bond_id);
+
+    if (tx && !update) {
+        /* It's not an update and the entry already exists.  Do nothing. */
+        goto unlock;
+    }
+
+    if (tx) {
+        struct tx_bond *new_tx = xmemdup(bond, sizeof *bond);
+
+        /* Copy the stats for each bucket. */
+        for (int i = 0; i < BOND_BUCKETS; i++) {
+            uint64_t n_packets, n_bytes;
+
+            atomic_read_relaxed(&tx->member_buckets[i].n_packets, &n_packets);
+            atomic_read_relaxed(&tx->member_buckets[i].n_bytes, &n_bytes);
+            atomic_init(&new_tx->member_buckets[i].n_packets, n_packets);
+            atomic_init(&new_tx->member_buckets[i].n_bytes, n_bytes);
+        }
+        cmap_replace(&pmd->tx_bonds, &tx->node, &new_tx->node,
+                     hash_bond_id(bond->bond_id));
+        ovsrcu_postpone(free, tx);
+    } else {
+        tx = xmemdup(bond, sizeof *bond);
+        cmap_insert(&pmd->tx_bonds, &tx->node, hash_bond_id(bond->bond_id));
+    }
+unlock:
+    ovs_mutex_unlock(&pmd->bond_mutex);
+}
+
+/* Delete bond from the tx bond cmap of 'pmd'. */
+static void
+dp_netdev_del_bond_tx_from_pmd(struct dp_netdev_pmd_thread *pmd,
+                               uint32_t bond_id)
+    OVS_EXCLUDED(pmd->bond_mutex)
+{
+    struct tx_bond *tx;
+
+    ovs_mutex_lock(&pmd->bond_mutex);
+    tx = tx_bond_lookup(&pmd->tx_bonds, bond_id);
+    if (tx) {
+        cmap_remove(&pmd->tx_bonds, &tx->node, hash_bond_id(tx->bond_id));
+        ovsrcu_postpone(free, tx);
+    }
+    ovs_mutex_unlock(&pmd->bond_mutex);
+}
 \f
 static char *
 dpif_netdev_get_datapath_version(void)
@@ -7130,6 +7610,97 @@ dp_execute_userspace_action(struct dp_netdev_pmd_thread *pmd,
     }
 }
 
+static bool
+dp_execute_output_action(struct dp_netdev_pmd_thread *pmd,
+                         struct dp_packet_batch *packets_,
+                         bool should_steal, odp_port_t port_no)
+{
+    struct tx_port *p = pmd_send_port_cache_lookup(pmd, port_no);
+    struct dp_packet_batch out;
+
+    if (!OVS_LIKELY(p)) {
+        COVERAGE_ADD(datapath_drop_invalid_port,
+                     dp_packet_batch_size(packets_));
+        dp_packet_delete_batch(packets_, should_steal);
+        return false;
+    }
+    if (!should_steal) {
+        dp_packet_batch_clone(&out, packets_);
+        dp_packet_batch_reset_cutlen(packets_);
+        packets_ = &out;
+    }
+    dp_packet_batch_apply_cutlen(packets_);
+#ifdef DPDK_NETDEV
+    if (OVS_UNLIKELY(!dp_packet_batch_is_empty(&p->output_pkts)
+                     && packets_->packets[0]->source
+                        != p->output_pkts.packets[0]->source)) {
+        /* XXX: netdev-dpdk assumes that all packets in a single
+         *      output batch has the same source. Flush here to
+         *      avoid memory access issues. */
+        dp_netdev_pmd_flush_output_on_port(pmd, p);
+    }
+#endif
+    if (dp_packet_batch_size(&p->output_pkts)
+        + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST) {
+        /* Flush here to avoid overflow. */
+        dp_netdev_pmd_flush_output_on_port(pmd, p);
+    }
+    if (dp_packet_batch_is_empty(&p->output_pkts)) {
+        pmd->n_output_batches++;
+    }
+
+    struct dp_packet *packet;
+    DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) {
+        p->output_pkts_rxqs[dp_packet_batch_size(&p->output_pkts)] =
+            pmd->ctx.last_rxq;
+        dp_packet_batch_add(&p->output_pkts, packet);
+    }
+    return true;
+}
+
+static void
+dp_execute_lb_output_action(struct dp_netdev_pmd_thread *pmd,
+                            struct dp_packet_batch *packets_,
+                            bool should_steal, uint32_t bond)
+{
+    struct tx_bond *p_bond = tx_bond_lookup(&pmd->tx_bonds, bond);
+    struct dp_packet_batch out;
+    struct dp_packet *packet;
+
+    if (!p_bond) {
+        COVERAGE_ADD(datapath_drop_invalid_bond,
+                     dp_packet_batch_size(packets_));
+        dp_packet_delete_batch(packets_, should_steal);
+        return;
+    }
+    if (!should_steal) {
+        dp_packet_batch_clone(&out, packets_);
+        dp_packet_batch_reset_cutlen(packets_);
+        packets_ = &out;
+    }
+    dp_packet_batch_apply_cutlen(packets_);
+
+    DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) {
+        /*
+         * Lookup the bond-hash table using hash to get the member.
+         */
+        uint32_t hash = dp_packet_get_rss_hash(packet);
+        struct member_entry *s_entry
+            = &p_bond->member_buckets[hash & BOND_MASK];
+        odp_port_t bond_member = s_entry->member_id;
+        uint32_t size = dp_packet_size(packet);
+        struct dp_packet_batch output_pkt;
+
+        dp_packet_batch_init_packet(&output_pkt, packet);
+        if (OVS_LIKELY(dp_execute_output_action(pmd, &output_pkt, true,
+                                                bond_member))) {
+            /* Update member stats. */
+            non_atomic_ullong_add(&s_entry->n_packets, 1);
+            non_atomic_ullong_add(&s_entry->n_bytes, size);
+        }
+    }
+}
+
 static void
 dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
               const struct nlattr *a, bool should_steal)
@@ -7145,49 +7716,14 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
 
     switch ((enum ovs_action_attr)type) {
     case OVS_ACTION_ATTR_OUTPUT:
-        p = pmd_send_port_cache_lookup(pmd, nl_attr_get_odp_port(a));
-        if (OVS_LIKELY(p)) {
-            struct dp_packet *packet;
-            struct dp_packet_batch out;
-
-            if (!should_steal) {
-                dp_packet_batch_clone(&out, packets_);
-                dp_packet_batch_reset_cutlen(packets_);
-                packets_ = &out;
-            }
-            dp_packet_batch_apply_cutlen(packets_);
-
-#ifdef DPDK_NETDEV
-            if (OVS_UNLIKELY(!dp_packet_batch_is_empty(&p->output_pkts)
-                             && packets_->packets[0]->source
-                                != p->output_pkts.packets[0]->source)) {
-                /* XXX: netdev-dpdk assumes that all packets in a single
-                 *      output batch has the same source. Flush here to
-                 *      avoid memory access issues. */
-                dp_netdev_pmd_flush_output_on_port(pmd, p);
-            }
-#endif
-            if (dp_packet_batch_size(&p->output_pkts)
-                + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST) {
-                /* Flush here to avoid overflow. */
-                dp_netdev_pmd_flush_output_on_port(pmd, p);
-            }
-
-            if (dp_packet_batch_is_empty(&p->output_pkts)) {
-                pmd->n_output_batches++;
-            }
+        dp_execute_output_action(pmd, packets_, should_steal,
+                                 nl_attr_get_odp_port(a));
+        return;
 
-            DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) {
-                p->output_pkts_rxqs[dp_packet_batch_size(&p->output_pkts)] =
-                                                             pmd->ctx.last_rxq;
-                dp_packet_batch_add(&p->output_pkts, packet);
-            }
-            return;
-        } else {
-            COVERAGE_ADD(datapath_drop_invalid_port,
-                         dp_packet_batch_size(packets_));
-        }
-        break;
+    case OVS_ACTION_ATTR_LB_OUTPUT:
+        dp_execute_lb_output_action(pmd, packets_, should_steal,
+                                    nl_attr_get_u32(a));
+        return;
 
     case OVS_ACTION_ATTR_TUNNEL_PUSH:
         if (should_steal) {
@@ -7329,6 +7865,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
         bool commit = false;
         unsigned int left;
         uint16_t zone = 0;
+        uint32_t tp_id = 0;
         const char *helper = NULL;
         const uint32_t *setmark = NULL;
         const struct ovs_key_ct_labels *setlabel = NULL;
@@ -7364,8 +7901,11 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
                  * netlink events. */
                 break;
             case OVS_CT_ATTR_TIMEOUT:
-                /* Userspace datapath does not support customized timeout
-                 * policy yet. */
+                if (!str_to_uint(nl_attr_get_string(b), 10, &tp_id)) {
+                    VLOG_WARN("Invalid Timeout Policy ID: %s.",
+                              nl_attr_get_string(b));
+                    tp_id = DEFAULT_TP_ID;
+                }
                 break;
             case OVS_CT_ATTR_NAT: {
                 const struct nlattr *b_nest;
@@ -7451,7 +7991,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
         conntrack_execute(dp->conntrack, packets_, aux->flow->dl_type, force,
                           commit, zone, setmark, setlabel, aux->flow->tp_src,
                           aux->flow->tp_dst, helper, nat_action_info_ref,
-                          pmd->ctx.now / 1000);
+                          pmd->ctx.now / 1000, tp_id);
         break;
     }
 
@@ -7684,6 +8224,62 @@ dpif_netdev_ct_del_limits(struct dpif *dpif OVS_UNUSED,
     return err;
 }
 
+static int
+dpif_netdev_ct_set_timeout_policy(struct dpif *dpif,
+                                  const struct ct_dpif_timeout_policy *dpif_tp)
+{
+    struct timeout_policy tp;
+    struct dp_netdev *dp;
+
+    dp = get_dp_netdev(dpif);
+    memcpy(&tp.policy, dpif_tp, sizeof tp.policy);
+    return timeout_policy_update(dp->conntrack, &tp);
+}
+
+static int
+dpif_netdev_ct_get_timeout_policy(struct dpif *dpif, uint32_t tp_id,
+                                  struct ct_dpif_timeout_policy *dpif_tp)
+{
+    struct timeout_policy *tp;
+    struct dp_netdev *dp;
+    int err = 0;
+
+    dp = get_dp_netdev(dpif);
+    tp = timeout_policy_get(dp->conntrack, tp_id);
+    if (!tp) {
+        return ENOENT;
+    }
+    memcpy(dpif_tp, &tp->policy, sizeof tp->policy);
+    return err;
+}
+
+static int
+dpif_netdev_ct_del_timeout_policy(struct dpif *dpif,
+                                  uint32_t tp_id)
+{
+    struct dp_netdev *dp;
+    int err = 0;
+
+    dp = get_dp_netdev(dpif);
+    err = timeout_policy_delete(dp->conntrack, tp_id);
+    return err;
+}
+
+static int
+dpif_netdev_ct_get_timeout_policy_name(struct dpif *dpif OVS_UNUSED,
+                                       uint32_t tp_id,
+                                       uint16_t dl_type OVS_UNUSED,
+                                       uint8_t nw_proto OVS_UNUSED,
+                                       char **tp_name, bool *is_generic)
+{
+    struct ds ds = DS_EMPTY_INITIALIZER;
+
+    ds_put_format(&ds, "%"PRIu32, tp_id);
+    *tp_name = ds_steal_cstr(&ds);
+    *is_generic = true;
+    return 0;
+}
+
 static int
 dpif_netdev_ipf_set_enabled(struct dpif *dpif, bool v6, bool enable)
 {
@@ -7739,6 +8335,98 @@ dpif_netdev_ipf_dump_done(struct dpif *dpif OVS_UNUSED, void *ipf_dump_ctx)
 
 }
 
+static int
+dpif_netdev_bond_add(struct dpif *dpif, uint32_t bond_id,
+                     odp_port_t *member_map)
+{
+    struct tx_bond *new_tx = xzalloc(sizeof *new_tx);
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct dp_netdev_pmd_thread *pmd;
+
+    /* Prepare new bond mapping. */
+    new_tx->bond_id = bond_id;
+    for (int bucket = 0; bucket < BOND_BUCKETS; bucket++) {
+        new_tx->member_buckets[bucket].member_id = member_map[bucket];
+    }
+
+    ovs_mutex_lock(&dp->bond_mutex);
+    /* Check if bond already existed. */
+    struct tx_bond *old_tx = tx_bond_lookup(&dp->tx_bonds, bond_id);
+    if (old_tx) {
+        cmap_replace(&dp->tx_bonds, &old_tx->node, &new_tx->node,
+                     hash_bond_id(bond_id));
+        ovsrcu_postpone(free, old_tx);
+    } else {
+        cmap_insert(&dp->tx_bonds, &new_tx->node, hash_bond_id(bond_id));
+    }
+    ovs_mutex_unlock(&dp->bond_mutex);
+
+    /* Update all PMDs with new bond mapping. */
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        dp_netdev_add_bond_tx_to_pmd(pmd, new_tx, true);
+    }
+    return 0;
+}
+
+static int
+dpif_netdev_bond_del(struct dpif *dpif, uint32_t bond_id)
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct dp_netdev_pmd_thread *pmd;
+    struct tx_bond *tx;
+
+    ovs_mutex_lock(&dp->bond_mutex);
+    /* Check if bond existed. */
+    tx = tx_bond_lookup(&dp->tx_bonds, bond_id);
+    if (tx) {
+        cmap_remove(&dp->tx_bonds, &tx->node, hash_bond_id(bond_id));
+        ovsrcu_postpone(free, tx);
+    } else {
+        /* Bond is not present. */
+        ovs_mutex_unlock(&dp->bond_mutex);
+        return ENOENT;
+    }
+    ovs_mutex_unlock(&dp->bond_mutex);
+
+    /* Remove the bond map in all pmds. */
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        dp_netdev_del_bond_tx_from_pmd(pmd, bond_id);
+    }
+    return 0;
+}
+
+static int
+dpif_netdev_bond_stats_get(struct dpif *dpif, uint32_t bond_id,
+                           uint64_t *n_bytes)
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct dp_netdev_pmd_thread *pmd;
+
+    if (!tx_bond_lookup(&dp->tx_bonds, bond_id)) {
+        return ENOENT;
+    }
+
+    /* Search the bond in all PMDs. */
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        struct tx_bond *pmd_bond_entry
+            = tx_bond_lookup(&pmd->tx_bonds, bond_id);
+
+        if (!pmd_bond_entry) {
+            continue;
+        }
+
+        /* Read bond stats. */
+        for (int i = 0; i < BOND_BUCKETS; i++) {
+            uint64_t pmd_n_bytes;
+
+            atomic_read_relaxed(&pmd_bond_entry->member_buckets[i].n_bytes,
+                                &pmd_n_bytes);
+            n_bytes[i] += pmd_n_bytes;
+        }
+    }
+    return 0;
+}
+
 const struct dpif_class dpif_netdev_class = {
     "netdev",
     true,                       /* cleanup_required */
@@ -7794,13 +8482,13 @@ const struct dpif_class dpif_netdev_class = {
     dpif_netdev_ct_set_limits,
     dpif_netdev_ct_get_limits,
     dpif_netdev_ct_del_limits,
-    NULL,                       /* ct_set_timeout_policy */
-    NULL,                       /* ct_get_timeout_policy */
-    NULL,                       /* ct_del_timeout_policy */
+    dpif_netdev_ct_set_timeout_policy,
+    dpif_netdev_ct_get_timeout_policy,
+    dpif_netdev_ct_del_timeout_policy,
     NULL,                       /* ct_timeout_policy_dump_start */
     NULL,                       /* ct_timeout_policy_dump_next */
     NULL,                       /* ct_timeout_policy_dump_done */
-    NULL,                       /* ct_get_timeout_policy_name */
+    dpif_netdev_ct_get_timeout_policy_name,
     dpif_netdev_ipf_set_enabled,
     dpif_netdev_ipf_set_min_frag,
     dpif_netdev_ipf_set_max_nfrags,
@@ -7812,6 +8500,9 @@ const struct dpif_class dpif_netdev_class = {
     dpif_netdev_meter_set,
     dpif_netdev_meter_get,
     dpif_netdev_meter_del,
+    dpif_netdev_bond_add,
+    dpif_netdev_bond_del,
+    dpif_netdev_bond_stats_get,
 };
 
 static void
@@ -7984,13 +8675,11 @@ dpcls_create_subtable(struct dpcls *cls, const struct netdev_flow_key *mask)
     subtable->mf_masks = xmalloc(sizeof(uint64_t) * (unit0 + unit1));
     netdev_flow_key_gen_masks(mask, subtable->mf_masks, unit0, unit1);
 
-    /* Probe for a specialized generic lookup function. */
-    subtable->lookup_func = dpcls_subtable_generic_probe(unit0, unit1);
-
-    /* If not set, assign generic lookup. Generic works for any miniflow. */
-    if (!subtable->lookup_func) {
-        subtable->lookup_func = dpcls_subtable_lookup_generic;
-    }
+    /* Get the preferred subtable search function for this (u0,u1) subtable.
+     * The function is guaranteed to always return a valid implementation, and
+     * possibly an ISA optimized, and/or specialized implementation.
+     */
+    subtable->lookup_func = dpcls_subtable_get_best_impl(unit0, unit1);
 
     cmap_insert(&cls->subtables_map, &subtable->cmap_node, mask->hash);
     /* Add the new subtable at the end of the pvector (with no hits yet) */
@@ -8016,6 +8705,28 @@ dpcls_find_subtable(struct dpcls *cls, const struct netdev_flow_key *mask)
     return dpcls_create_subtable(cls, mask);
 }
 
+/* Checks for the best available implementation for each subtable lookup
+ * function, and assigns it as the lookup function pointer for each subtable.
+ * Returns the number of subtables that have changed lookup implementation.
+ */
+static uint32_t
+dpcls_subtable_lookup_reprobe(struct dpcls *cls)
+{
+    struct pvector *pvec = &cls->subtables;
+    uint32_t subtables_changed = 0;
+    struct dpcls_subtable *subtable = NULL;
+
+    PVECTOR_FOR_EACH (subtable, pvec) {
+        uint32_t u0_bits = subtable->mf_bits_set_unit0;
+        uint32_t u1_bits = subtable->mf_bits_set_unit1;
+        void *old_func = subtable->lookup_func;
+        subtable->lookup_func = dpcls_subtable_get_best_impl(u0_bits, u1_bits);
+        subtables_changed += (old_func != subtable->lookup_func);
+    }
+    pvector_publish(pvec);
+
+    return subtables_changed;
+}
 
 /* Periodically sort the dpcls subtable vectors according to hit counts */
 static void
@@ -8041,6 +8752,7 @@ dp_netdev_pmd_try_optimize(struct dp_netdev_pmd_thread *pmd,
 
     if (pmd->ctx.now > pmd->rxq_next_cycle_store) {
         uint64_t curr_tsc;
+        uint8_t rebalance_load_trigger;
         struct pmd_auto_lb *pmd_alb = &pmd->dp->pmd_alb;
         if (pmd_alb->is_enabled && !pmd->isolated
             && (pmd->perf_stats.counters.n[PMD_CYCLES_ITER_IDLE] >=
@@ -8057,7 +8769,9 @@ dp_netdev_pmd_try_optimize(struct dp_netdev_pmd_thread *pmd,
                 pmd_load = ((tot_proc * 100) / (tot_idle + tot_proc));
             }
 
-            if (pmd_load >= ALB_PMD_LOAD_THRESHOLD) {
+            atomic_read_relaxed(&pmd_alb->rebalance_load_thresh,
+                                &rebalance_load_trigger);
+            if (pmd_load >= rebalance_load_trigger) {
                 atomic_count_inc(&pmd->pmd_overloaded);
             } else {
                 atomic_count_set(&pmd->pmd_overloaded, 0);