]> git.proxmox.com Git - mirror_ovs.git/blobdiff - ofproto/ofproto-dpif-upcall.c
cirrus: Use FreeBSD 12.2.
[mirror_ovs.git] / ofproto / ofproto-dpif-upcall.c
index e4473080ad659bf985987deaca21ebe511803be0..5fae46adfc25b89a0a5836d1928c51912c96d8d1 100644 (file)
@@ -22,6 +22,7 @@
 #include "connmgr.h"
 #include "coverage.h"
 #include "cmap.h"
+#include "lib/dpif-provider.h"
 #include "dpif.h"
 #include "openvswitch/dynamic-string.h"
 #include "fail-open.h"
 #include "ofproto-dpif-ipfix.h"
 #include "ofproto-dpif-sflow.h"
 #include "ofproto-dpif-xlate.h"
+#include "ofproto-dpif-xlate-cache.h"
+#include "ofproto-dpif-trace.h"
 #include "ovs-rcu.h"
 #include "packets.h"
-#include "poll-loop.h"
+#include "openvswitch/poll-loop.h"
 #include "seq.h"
+#include "tunnel.h"
 #include "unixctl.h"
 #include "openvswitch/vlog.h"
+#include "lib/netdev-provider.h"
 
-#define MAX_QUEUE_LENGTH 512
 #define UPCALL_MAX_BATCH 64
 #define REVALIDATE_MAX_BATCH 50
 
@@ -50,7 +54,10 @@ COVERAGE_DEFINE(dumped_duplicate_flow);
 COVERAGE_DEFINE(dumped_new_flow);
 COVERAGE_DEFINE(handler_duplicate_upcall);
 COVERAGE_DEFINE(upcall_ukey_contention);
+COVERAGE_DEFINE(upcall_ukey_replace);
 COVERAGE_DEFINE(revalidate_missed_dp_flow);
+COVERAGE_DEFINE(upcall_flow_limit_hit);
+COVERAGE_DEFINE(upcall_flow_limit_kill);
 
 /* A thread that reads upcalls from dpif, forwards each upcall's packet,
  * and possibly sets up a kernel flow as a cache. */
@@ -176,14 +183,18 @@ struct udpif {
     uint64_t conn_seq;                 /* Corresponds to 'dump_seq' when
                                           conns[n_conns-1] was stored. */
     size_t n_conns;                    /* Number of connections waiting. */
+
+    long long int offload_rebalance_time;  /* Time of last offload rebalance */
 };
 
 enum upcall_type {
     BAD_UPCALL,                 /* Some kind of bug somewhere. */
     MISS_UPCALL,                /* A flow miss.  */
+    SLOW_PATH_UPCALL,           /* Slow path upcall.  */
     SFLOW_UPCALL,               /* sFlow sample. */
     FLOW_SAMPLE_UPCALL,         /* Per-flow sampling. */
-    IPFIX_UPCALL                /* Per-bridge sampling. */
+    IPFIX_UPCALL,               /* Per-bridge sampling. */
+    CONTROLLER_UPCALL           /* Destined for the controller. */
 };
 
 enum reval_result {
@@ -201,15 +212,16 @@ struct upcall {
      * dpif-netdev.  If a modification is absolutely necessary, a const cast
      * may be used with other datapaths. */
     const struct flow *flow;       /* Parsed representation of the packet. */
+    enum odp_key_fitness fitness;  /* Fitness of 'flow' relative to ODP key. */
     const ovs_u128 *ufid;          /* Unique identifier for 'flow'. */
     unsigned pmd_id;               /* Datapath poll mode driver id. */
     const struct dp_packet *packet;   /* Packet associated with this upcall. */
-    ofp_port_t in_port;            /* OpenFlow in port, or OFPP_NONE. */
+    ofp_port_t ofp_in_port;        /* OpenFlow in port, or OFPP_NONE. */
     uint16_t mru;                  /* If !0, Maximum receive unit of
                                       fragmented IP packet */
+    uint64_t hash;
 
-    enum dpif_upcall_type type;    /* Datapath type of the upcall. */
-    const struct nlattr *userdata; /* Userdata for DPIF_UC_ACTION Upcalls. */
+    enum upcall_type type;         /* Type of the upcall. */
     const struct nlattr *actions;  /* Flow actions in DPIF_UC_ACTION Upcalls. */
 
     bool xout_initialized;         /* True if 'xout' must be uninitialized. */
@@ -225,7 +237,6 @@ struct upcall {
     bool ukey_persists;            /* Set true to keep 'ukey' beyond the
                                       lifetime of this upcall. */
 
-    uint64_t dump_seq;             /* udpif->dump_seq at translation time. */
     uint64_t reval_seq;            /* udpif->reval_seq at translation time. */
 
     /* Not used by the upcall callback interface. */
@@ -233,9 +244,22 @@ struct upcall {
     size_t key_len;                /* Datapath flow key length. */
     const struct nlattr *out_tun_key;  /* Datapath output tunnel key. */
 
+    struct user_action_cookie cookie;
+
     uint64_t odp_actions_stub[1024 / 8]; /* Stub for odp_actions. */
 };
 
+/* Ukeys must transition through these states using transition_ukey(). */
+enum ukey_state {
+    UKEY_CREATED = 0,
+    UKEY_VISIBLE,       /* Ukey is in umap, datapath flow install is queued. */
+    UKEY_OPERATIONAL,   /* Ukey is in umap, datapath flow is installed. */
+    UKEY_EVICTING,      /* Ukey is in umap, datapath flow delete is queued. */
+    UKEY_EVICTED,       /* Ukey is in umap, datapath flow is deleted. */
+    UKEY_DELETED,       /* Ukey removed from umap, ukey free is deferred. */
+};
+#define N_UKEY_STATES (UKEY_DELETED + 1)
+
 /* 'udpif_key's are responsible for tracking the little bit of state udpif
  * needs to do flow expiration which can't be pulled directly from the
  * datapath.  They may be created by any handler or revalidator thread at any
@@ -266,8 +290,12 @@ struct udpif_key {
     long long int created OVS_GUARDED;        /* Estimate of creation time. */
     uint64_t dump_seq OVS_GUARDED;            /* Tracks udpif->dump_seq. */
     uint64_t reval_seq OVS_GUARDED;           /* Tracks udpif->reval_seq. */
-    bool flow_exists OVS_GUARDED;             /* Ensures flows are only deleted
-                                                 once. */
+    enum ukey_state state OVS_GUARDED;        /* Tracks ukey lifetime. */
+
+    /* 'state' debug information. */
+    unsigned int state_thread OVS_GUARDED;    /* Thread that transitions. */
+    const char *state_where OVS_GUARDED;      /* transition_ukey() locator. */
+
     /* Datapath flow actions as nlattrs.  Protected by RCU.  Read with
      * ukey_get_actions(), and write with ukey_set_actions(). */
     OVSRCU_TYPE(struct ofpbuf *) actions;
@@ -282,6 +310,14 @@ struct udpif_key {
 
     uint32_t key_recirc_id;   /* Non-zero if reference is held by the ukey. */
     struct recirc_refs recircs;  /* Action recirc IDs with references held. */
+
+#define OFFL_REBAL_INTVL_MSEC  3000    /* dynamic offload rebalance freq */
+    struct netdev *in_netdev;          /* in_odp_port's netdev */
+    bool offloaded;                    /* True if flow is offloaded */
+    uint64_t flow_pps_rate;            /* Packets-Per-Second rate */
+    long long int flow_time;           /* last pps update time */
+    uint64_t flow_packets;             /* #pkts seen in interval */
+    uint64_t flow_backlog_packets;     /* prev-mode #pkts (offl or kernel) */
 };
 
 /* Datapath operation with optional ukey attached. */
@@ -298,7 +334,7 @@ static size_t recv_upcalls(struct handler *);
 static int process_upcall(struct udpif *, struct upcall *,
                           struct ofpbuf *odp_actions, struct flow_wildcards *);
 static void handle_upcalls(struct udpif *, struct upcall *, size_t n_upcalls);
-static void udpif_stop_threads(struct udpif *);
+static void udpif_stop_threads(struct udpif *, bool delete_flows);
 static void udpif_start_threads(struct udpif *, size_t n_handlers,
                                 size_t n_revalidators);
 static void udpif_pause_revalidators(struct udpif *);
@@ -334,9 +370,14 @@ static int ukey_create_from_dpif_flow(const struct udpif *,
                                       struct udpif_key **);
 static void ukey_get_actions(struct udpif_key *, const struct nlattr **actions,
                              size_t *size);
-static bool ukey_install_start(struct udpif *, struct udpif_key *ukey);
-static bool ukey_install_finish(struct udpif_key *ukey, int error);
+static bool ukey_install__(struct udpif *, struct udpif_key *ukey)
+    OVS_TRY_LOCK(true, ukey->mutex);
 static bool ukey_install(struct udpif *udpif, struct udpif_key *ukey);
+static void transition_ukey_at(struct udpif_key *ukey, enum ukey_state dst,
+                               const char *where)
+    OVS_REQUIRES(ukey->mutex);
+#define transition_ukey(UKEY, DST) \
+    transition_ukey_at(UKEY, DST, OVS_SOURCE_LOCATOR)
 static struct udpif_key *ukey_lookup(struct udpif *udpif,
                                      const ovs_u128 *ufid,
                                      const unsigned pmd_id);
@@ -345,7 +386,13 @@ static int ukey_acquire(struct udpif *, const struct dpif_flow *,
 static void ukey_delete__(struct udpif_key *);
 static void ukey_delete(struct umap *, struct udpif_key *);
 static enum upcall_type classify_upcall(enum dpif_upcall_type type,
-                                        const struct nlattr *userdata);
+                                        const struct nlattr *userdata,
+                                        struct user_action_cookie *cookie);
+
+static void put_op_init(struct ukey_op *op, struct udpif_key *ukey,
+                        enum dpif_flow_put_flags flags);
+static void delete_op_init(struct udpif *udpif, struct ukey_op *op,
+                           struct udpif_key *ukey);
 
 static int upcall_receive(struct upcall *, const struct dpif_backer *,
                           const struct dp_packet *packet, enum dpif_upcall_type,
@@ -354,6 +401,12 @@ static int upcall_receive(struct upcall *, const struct dpif_backer *,
                           const ovs_u128 *ufid, const unsigned pmd_id);
 static void upcall_uninit(struct upcall *);
 
+static void udpif_flow_rebalance(struct udpif *udpif);
+static int udpif_flow_program(struct udpif *udpif, struct udpif_key *ukey,
+                              enum dpif_offload_type offload_type);
+static int udpif_flow_unprogram(struct udpif *udpif, struct udpif_key *ukey,
+                                enum dpif_offload_type offload_type);
+
 static upcall_callback upcall_cb;
 static dp_purge_callback dp_purge_cb;
 
@@ -375,8 +428,8 @@ udpif_init(void)
                                  upcall_unixctl_disable_ufid, NULL);
         unixctl_command_register("upcall/enable-ufid", "", 0, 0,
                                  upcall_unixctl_enable_ufid, NULL);
-        unixctl_command_register("upcall/set-flow-limit", "", 1, 1,
-                                 upcall_unixctl_set_flow_limit, NULL);
+        unixctl_command_register("upcall/set-flow-limit", "flow-limit-number",
+                                 1, 1, upcall_unixctl_set_flow_limit, NULL);
         unixctl_command_register("revalidator/wait", "", 0, 0,
                                  upcall_unixctl_dump_wait, NULL);
         unixctl_command_register("revalidator/purge", "", 0, 0,
@@ -432,7 +485,7 @@ udpif_run(struct udpif *udpif)
 void
 udpif_destroy(struct udpif *udpif)
 {
-    udpif_stop_threads(udpif);
+    udpif_stop_threads(udpif, false);
 
     dpif_register_dp_purge_cb(udpif->dpif, NULL, udpif);
     dpif_register_upcall_cb(udpif->dpif, NULL, udpif);
@@ -453,34 +506,38 @@ udpif_destroy(struct udpif *udpif)
     free(udpif);
 }
 
-/* Stops the handler and revalidator threads, must be enclosed in
- * ovsrcu quiescent state unless when destroying udpif. */
+/* Stops the handler and revalidator threads.
+ *
+ * If 'delete_flows' is true, we delete ukeys and delete all flows from the
+ * datapath.  Otherwise, we end up double-counting stats for flows that remain
+ * in the datapath.  If 'delete_flows' is false, we skip this step.  This is
+ * appropriate if OVS is about to exit anyway and it is desirable to let
+ * existing network connections continue being forwarded afterward. */
 static void
-udpif_stop_threads(struct udpif *udpif)
+udpif_stop_threads(struct udpif *udpif, bool delete_flows)
 {
     if (udpif && (udpif->n_handlers != 0 || udpif->n_revalidators != 0)) {
         size_t i;
 
+        /* Tell the threads to exit. */
         latch_set(&udpif->exit_latch);
 
+        /* Wait for the threads to exit.  Quiesce because this can take a long
+         * time.. */
+        ovsrcu_quiesce_start();
         for (i = 0; i < udpif->n_handlers; i++) {
-            struct handler *handler = &udpif->handlers[i];
-
-            xpthread_join(handler->thread, NULL);
+            xpthread_join(udpif->handlers[i].thread, NULL);
         }
-
         for (i = 0; i < udpif->n_revalidators; i++) {
             xpthread_join(udpif->revalidators[i].thread, NULL);
         }
-
         dpif_disable_upcall(udpif->dpif);
+        ovsrcu_quiesce_end();
 
-        for (i = 0; i < udpif->n_revalidators; i++) {
-            struct revalidator *revalidator = &udpif->revalidators[i];
-
-            /* Delete ukeys, and delete all flows from the datapath to prevent
-             * double-counting stats. */
-            revalidator_purge(revalidator);
+        if (delete_flows) {
+            for (i = 0; i < udpif->n_revalidators; i++) {
+                revalidator_purge(&udpif->revalidators[i]);
+            }
         }
 
         latch_poll(&udpif->exit_latch);
@@ -498,21 +555,21 @@ udpif_stop_threads(struct udpif *udpif)
     }
 }
 
-/* Starts the handler and revalidator threads, must be enclosed in
- * ovsrcu quiescent state. */
+/* Starts the handler and revalidator threads. */
 static void
-udpif_start_threads(struct udpif *udpif, size_t n_handlers,
-                    size_t n_revalidators)
+udpif_start_threads(struct udpif *udpif, size_t n_handlers_,
+                    size_t n_revalidators_)
 {
-    if (udpif && n_handlers && n_revalidators) {
-        size_t i;
-        bool enable_ufid;
+    if (udpif && n_handlers_ && n_revalidators_) {
+        /* Creating a thread can take a significant amount of time on some
+         * systems, even hundred of milliseconds, so quiesce around it. */
+        ovsrcu_quiesce_start();
 
-        udpif->n_handlers = n_handlers;
-        udpif->n_revalidators = n_revalidators;
+        udpif->n_handlers = n_handlers_;
+        udpif->n_revalidators = n_revalidators_;
 
         udpif->handlers = xzalloc(udpif->n_handlers * sizeof *udpif->handlers);
-        for (i = 0; i < udpif->n_handlers; i++) {
+        for (size_t i = 0; i < udpif->n_handlers; i++) {
             struct handler *handler = &udpif->handlers[i];
 
             handler->udpif = udpif;
@@ -521,23 +578,24 @@ udpif_start_threads(struct udpif *udpif, size_t n_handlers,
                 "handler", udpif_upcall_handler, handler);
         }
 
-        enable_ufid = ofproto_dpif_get_enable_ufid(udpif->backer);
-        atomic_init(&udpif->enable_ufid, enable_ufid);
+        atomic_init(&udpif->enable_ufid, udpif->backer->rt_support.ufid);
         dpif_enable_upcall(udpif->dpif);
 
         ovs_barrier_init(&udpif->reval_barrier, udpif->n_revalidators);
         ovs_barrier_init(&udpif->pause_barrier, udpif->n_revalidators + 1);
         udpif->reval_exit = false;
         udpif->pause = false;
+        udpif->offload_rebalance_time = time_msec();
         udpif->revalidators = xzalloc(udpif->n_revalidators
                                       * sizeof *udpif->revalidators);
-        for (i = 0; i < udpif->n_revalidators; i++) {
+        for (size_t i = 0; i < udpif->n_revalidators; i++) {
             struct revalidator *revalidator = &udpif->revalidators[i];
 
             revalidator->udpif = udpif;
             revalidator->thread = ovs_thread_create(
                 "revalidator", udpif_revalidator, revalidator);
         }
+        ovsrcu_quiesce_end();
     }
 }
 
@@ -547,7 +605,7 @@ udpif_start_threads(struct udpif *udpif, size_t n_handlers,
 static void
 udpif_pause_revalidators(struct udpif *udpif)
 {
-    if (ofproto_dpif_backer_enabled(udpif->backer)) {
+    if (udpif->backer->recv_set_enable) {
         latch_set(&udpif->pause_latch);
         ovs_barrier_block(&udpif->pause_barrier);
     }
@@ -558,61 +616,40 @@ udpif_pause_revalidators(struct udpif *udpif)
 static void
 udpif_resume_revalidators(struct udpif *udpif)
 {
-    if (ofproto_dpif_backer_enabled(udpif->backer)) {
+    if (udpif->backer->recv_set_enable) {
         latch_poll(&udpif->pause_latch);
         ovs_barrier_block(&udpif->pause_barrier);
     }
 }
 
 /* Tells 'udpif' how many threads it should use to handle upcalls.
- * 'n_handlers' and 'n_revalidators' can never be zero.  'udpif''s
+ * 'n_handlers_' and 'n_revalidators_' can never be zero.  'udpif''s
  * datapath handle must have packet reception enabled before starting
  * threads. */
 void
-udpif_set_threads(struct udpif *udpif, size_t n_handlers,
-                  size_t n_revalidators)
+udpif_set_threads(struct udpif *udpif, size_t n_handlers_,
+                  size_t n_revalidators_)
 {
     ovs_assert(udpif);
-    ovs_assert(n_handlers && n_revalidators);
+    ovs_assert(n_handlers_ && n_revalidators_);
 
-    ovsrcu_quiesce_start();
-    if (udpif->n_handlers != n_handlers
-        || udpif->n_revalidators != n_revalidators) {
-        udpif_stop_threads(udpif);
+    if (udpif->n_handlers != n_handlers_
+        || udpif->n_revalidators != n_revalidators_) {
+        udpif_stop_threads(udpif, true);
     }
 
     if (!udpif->handlers && !udpif->revalidators) {
         int error;
 
-        error = dpif_handlers_set(udpif->dpif, n_handlers);
+        error = dpif_handlers_set(udpif->dpif, n_handlers_);
         if (error) {
             VLOG_ERR("failed to configure handlers in dpif %s: %s",
                      dpif_name(udpif->dpif), ovs_strerror(error));
             return;
         }
 
-        udpif_start_threads(udpif, n_handlers, n_revalidators);
+        udpif_start_threads(udpif, n_handlers_, n_revalidators_);
     }
-    ovsrcu_quiesce_end();
-}
-
-/* Waits for all ongoing upcall translations to complete.  This ensures that
- * there are no transient references to any removed ofprotos (or other
- * objects).  In particular, this should be called after an ofproto is removed
- * (e.g. via xlate_remove_ofproto()) but before it is destroyed. */
-void
-udpif_synchronize(struct udpif *udpif)
-{
-    /* This is stronger than necessary.  It would be sufficient to ensure
-     * (somehow) that each handler and revalidator thread had passed through
-     * its main loop once. */
-    size_t n_handlers = udpif->n_handlers;
-    size_t n_revalidators = udpif->n_revalidators;
-
-    ovsrcu_quiesce_start();
-    udpif_stop_threads(udpif);
-    udpif_start_threads(udpif, n_handlers, n_revalidators);
-    ovsrcu_quiesce_end();
 }
 
 /* Notifies 'udpif' that something changed which may render previous
@@ -649,18 +686,12 @@ udpif_get_memory_usage(struct udpif *udpif, struct simap *usage)
 void
 udpif_flush(struct udpif *udpif)
 {
-    size_t n_handlers, n_revalidators;
-
-    n_handlers = udpif->n_handlers;
-    n_revalidators = udpif->n_revalidators;
-
-    ovsrcu_quiesce_start();
+    size_t n_handlers_ = udpif->n_handlers;
+    size_t n_revalidators_ = udpif->n_revalidators;
 
-    udpif_stop_threads(udpif);
+    udpif_stop_threads(udpif, true);
     dpif_flow_flush(udpif->dpif);
-    udpif_start_threads(udpif, n_handlers, n_revalidators);
-
-    ovsrcu_quiesce_end();
+    udpif_start_threads(udpif, n_handlers_, n_revalidators_);
 }
 
 /* Removes all flows from all datapaths. */
@@ -680,7 +711,7 @@ udpif_use_ufid(struct udpif *udpif)
     bool enable;
 
     atomic_read_relaxed(&enable_ufid, &enable);
-    return enable && ofproto_dpif_get_enable_ufid(udpif->backer);
+    return enable && udpif->backer->rt_support.ufid;
 }
 
 \f
@@ -745,7 +776,8 @@ recv_upcalls(struct handler *handler)
         struct dpif_upcall *dupcall = &dupcalls[n_upcalls];
         struct upcall *upcall = &upcalls[n_upcalls];
         struct flow *flow = &flows[n_upcalls];
-        unsigned int mru;
+        unsigned int mru = 0;
+        uint64_t hash = 0;
         int error;
 
         ofpbuf_use_stub(recv_buf, recv_stubs[n_upcalls],
@@ -755,15 +787,18 @@ recv_upcalls(struct handler *handler)
             break;
         }
 
-        if (odp_flow_key_to_flow(dupcall->key, dupcall->key_len, flow)
-            == ODP_FIT_ERROR) {
+        upcall->fitness = odp_flow_key_to_flow(dupcall->key, dupcall->key_len,
+                                               flow, NULL);
+        if (upcall->fitness == ODP_FIT_ERROR) {
             goto free_dupcall;
         }
 
         if (dupcall->mru) {
             mru = nl_attr_get_u16(dupcall->mru);
-        } else {
-            mru = 0;
+        }
+
+        if (dupcall->hash) {
+            hash = nl_attr_get_u64(dupcall->hash);
         }
 
         error = upcall_receive(upcall, udpif->backer, &dupcall->packet,
@@ -787,6 +822,7 @@ recv_upcalls(struct handler *handler)
         upcall->key = dupcall->key;
         upcall->key_len = dupcall->key_len;
         upcall->ufid = &dupcall->ufid;
+        upcall->hash = hash;
 
         upcall->out_tun_key = dupcall->out_tun_key;
         upcall->actions = dupcall->actions;
@@ -822,6 +858,26 @@ free_dupcall:
     return n_upcalls;
 }
 
+static void
+udpif_run_flow_rebalance(struct udpif *udpif)
+{
+    long long int now = 0;
+
+    /* Don't rebalance if OFFL_REBAL_INTVL_MSEC have not elapsed */
+    now = time_msec();
+    if (now < udpif->offload_rebalance_time + OFFL_REBAL_INTVL_MSEC) {
+        return;
+    }
+
+    if (!netdev_any_oor()) {
+        return;
+    }
+
+    VLOG_DBG("Offload rebalance: Found OOR netdevs");
+    udpif->offload_rebalance_time = now;
+    udpif_flow_rebalance(udpif);
+}
+
 static void *
 udpif_revalidator(void *arg)
 {
@@ -865,7 +921,8 @@ udpif_revalidator(void *arg)
                 bool terse_dump;
 
                 terse_dump = udpif_use_ufid(udpif);
-                udpif->dump = dpif_flow_dump_create(udpif->dpif, terse_dump);
+                udpif->dump = dpif_flow_dump_create(udpif->dpif, terse_dump,
+                                                    NULL);
             }
         }
 
@@ -895,6 +952,9 @@ udpif_revalidator(void *arg)
 
             dpif_flow_dump_destroy(udpif->dump);
             seq_change(udpif->dump_seq);
+            if (netdev_is_offload_rebalance_policy_enabled()) {
+                udpif_run_flow_rebalance(udpif);
+            }
 
             duration = MAX(time_msec() - start_time, 1);
             udpif->dump_duration = duration;
@@ -902,8 +962,8 @@ udpif_revalidator(void *arg)
                 flow_limit /= duration / 1000;
             } else if (duration > 1300) {
                 flow_limit = flow_limit * 3 / 4;
-            } else if (duration < 1000 && n_flows > 2000
-                       && flow_limit < n_flows * 1000 / duration) {
+            } else if (duration < 1000 &&
+                       flow_limit < n_flows * 1000 / duration) {
                 flow_limit += 1000;
             }
             flow_limit = MIN(ofproto_flow_limit, MAX(flow_limit, 1000));
@@ -914,11 +974,27 @@ udpif_revalidator(void *arg)
                           duration);
             }
 
-            poll_timer_wait_until(start_time + MIN(ofproto_max_idle, 500));
+            poll_timer_wait_until(start_time + MIN(ofproto_max_idle,
+                                                   ofproto_max_revalidator));
             seq_wait(udpif->reval_seq, last_reval_seq);
             latch_wait(&udpif->exit_latch);
             latch_wait(&udpif->pause_latch);
             poll_block();
+
+            if (!latch_is_set(&udpif->pause_latch) &&
+                !latch_is_set(&udpif->exit_latch)) {
+                long long int now = time_msec();
+                /* Block again if we are woken up within 5ms of the last start
+                 * time. */
+                start_time += 5;
+
+                if (now < start_time) {
+                    poll_timer_wait_until(start_time);
+                    latch_wait(&udpif->exit_latch);
+                    latch_wait(&udpif->pause_latch);
+                    poll_block();
+                }
+            }
         }
     }
 
@@ -926,11 +1002,9 @@ udpif_revalidator(void *arg)
 }
 \f
 static enum upcall_type
-classify_upcall(enum dpif_upcall_type type, const struct nlattr *userdata)
+classify_upcall(enum dpif_upcall_type type, const struct nlattr *userdata,
+                struct user_action_cookie *cookie)
 {
-    union user_action_cookie cookie;
-    size_t userdata_len;
-
     /* First look at the upcall type. */
     switch (type) {
     case DPIF_UC_ACTION:
@@ -950,30 +1024,27 @@ classify_upcall(enum dpif_upcall_type type, const struct nlattr *userdata)
         VLOG_WARN_RL(&rl, "action upcall missing cookie");
         return BAD_UPCALL;
     }
-    userdata_len = nl_attr_get_size(userdata);
-    if (userdata_len < sizeof cookie.type
-        || userdata_len > sizeof cookie) {
+
+    size_t userdata_len = nl_attr_get_size(userdata);
+    if (userdata_len != sizeof *cookie) {
         VLOG_WARN_RL(&rl, "action upcall cookie has unexpected size %"PRIuSIZE,
                      userdata_len);
         return BAD_UPCALL;
     }
-    memset(&cookie, 0, sizeof cookie);
-    memcpy(&cookie, nl_attr_get(userdata), userdata_len);
-    if (userdata_len == MAX(8, sizeof cookie.sflow)
-        && cookie.type == USER_ACTION_COOKIE_SFLOW) {
+    memcpy(cookie, nl_attr_get(userdata), sizeof *cookie);
+    if (cookie->type == USER_ACTION_COOKIE_SFLOW) {
         return SFLOW_UPCALL;
-    } else if (userdata_len == MAX(8, sizeof cookie.slow_path)
-               && cookie.type == USER_ACTION_COOKIE_SLOW_PATH) {
-        return MISS_UPCALL;
-    } else if (userdata_len == MAX(8, sizeof cookie.flow_sample)
-               && cookie.type == USER_ACTION_COOKIE_FLOW_SAMPLE) {
+    } else if (cookie->type == USER_ACTION_COOKIE_SLOW_PATH) {
+        return SLOW_PATH_UPCALL;
+    } else if (cookie->type == USER_ACTION_COOKIE_FLOW_SAMPLE) {
         return FLOW_SAMPLE_UPCALL;
-    } else if (userdata_len == MAX(8, sizeof cookie.ipfix)
-               && cookie.type == USER_ACTION_COOKIE_IPFIX) {
+    } else if (cookie->type == USER_ACTION_COOKIE_IPFIX) {
         return IPFIX_UPCALL;
+    } else if (cookie->type == USER_ACTION_COOKIE_CONTROLLER) {
+        return CONTROLLER_UPCALL;
     } else {
         VLOG_WARN_RL(&rl, "invalid user cookie of type %"PRIu16
-                     " and size %"PRIuSIZE, cookie.type, userdata_len);
+                     " and size %"PRIuSIZE, cookie->type, userdata_len);
         return BAD_UPCALL;
     }
 }
@@ -982,23 +1053,43 @@ classify_upcall(enum dpif_upcall_type type, const struct nlattr *userdata)
  * initialized with at least 128 bytes of space. */
 static void
 compose_slow_path(struct udpif *udpif, struct xlate_out *xout,
-                  const struct flow *flow, odp_port_t odp_in_port,
-                  struct ofpbuf *buf)
+                  odp_port_t odp_in_port, ofp_port_t ofp_in_port,
+                  struct ofpbuf *buf, uint32_t meter_id,
+                  struct uuid *ofproto_uuid)
 {
-    union user_action_cookie cookie;
+    struct user_action_cookie cookie;
     odp_port_t port;
     uint32_t pid;
 
+    memset(&cookie, 0, sizeof cookie);
     cookie.type = USER_ACTION_COOKIE_SLOW_PATH;
-    cookie.slow_path.unused = 0;
+    cookie.ofp_in_port = ofp_in_port;
+    cookie.ofproto_uuid = *ofproto_uuid;
     cookie.slow_path.reason = xout->slow;
 
     port = xout->slow & (SLOW_CFM | SLOW_BFD | SLOW_LACP | SLOW_STP)
         ? ODPP_NONE
         : odp_in_port;
-    pid = dpif_port_get_pid(udpif->dpif, port, flow_hash_5tuple(flow, 0));
-    odp_put_userspace_action(pid, &cookie, sizeof cookie.slow_path,
-                             ODPP_NONE, false, buf);
+    pid = dpif_port_get_pid(udpif->dpif, port);
+
+    size_t offset;
+    size_t ac_offset;
+    if (meter_id != UINT32_MAX) {
+        /* If slowpath meter is configured, generate clone(meter, userspace)
+         * action. */
+        offset = nl_msg_start_nested(buf, OVS_ACTION_ATTR_SAMPLE);
+        nl_msg_put_u32(buf, OVS_SAMPLE_ATTR_PROBABILITY, UINT32_MAX);
+        ac_offset = nl_msg_start_nested(buf, OVS_SAMPLE_ATTR_ACTIONS);
+        nl_msg_put_u32(buf, OVS_ACTION_ATTR_METER, meter_id);
+    }
+
+    odp_put_userspace_action(pid, &cookie, sizeof cookie,
+                             ODPP_NONE, false, buf, NULL);
+
+    if (meter_id != UINT32_MAX) {
+        nl_msg_end_nested(buf, ac_offset);
+        nl_msg_end_nested(buf, offset);
+    }
 }
 
 /* If there is no error, the upcall must be destroyed with upcall_uninit()
@@ -1014,10 +1105,26 @@ upcall_receive(struct upcall *upcall, const struct dpif_backer *backer,
 {
     int error;
 
-    error = xlate_lookup(backer, flow, &upcall->ofproto, &upcall->ipfix,
-                         &upcall->sflow, NULL, &upcall->in_port);
-    if (error) {
-        return error;
+    upcall->type = classify_upcall(type, userdata, &upcall->cookie);
+    if (upcall->type == BAD_UPCALL) {
+        return EAGAIN;
+    } else if (upcall->type == MISS_UPCALL) {
+        error = xlate_lookup(backer, flow, &upcall->ofproto, &upcall->ipfix,
+                             &upcall->sflow, NULL, &upcall->ofp_in_port);
+        if (error) {
+            return error;
+        }
+    } else {
+        struct ofproto_dpif *ofproto
+            = ofproto_dpif_lookup_by_uuid(&upcall->cookie.ofproto_uuid);
+        if (!ofproto) {
+            VLOG_INFO_RL(&rl, "upcall could not find ofproto");
+            return ENODEV;
+        }
+        upcall->ofproto = ofproto;
+        upcall->ipfix = ofproto->ipfix;
+        upcall->sflow = ofproto->sflow;
+        upcall->ofp_in_port = upcall->cookie.ofp_in_port;
     }
 
     upcall->recirc = NULL;
@@ -1026,8 +1133,6 @@ upcall_receive(struct upcall *upcall, const struct dpif_backer *backer,
     upcall->packet = packet;
     upcall->ufid = ufid;
     upcall->pmd_id = pmd_id;
-    upcall->type = type;
-    upcall->userdata = userdata;
     ofpbuf_use_stub(&upcall->odp_actions, upcall->odp_actions_stub,
                     sizeof upcall->odp_actions_stub);
     ofpbuf_init(&upcall->put_actions, 0);
@@ -1051,17 +1156,21 @@ upcall_xlate(struct udpif *udpif, struct upcall *upcall,
              struct ofpbuf *odp_actions, struct flow_wildcards *wc)
 {
     struct dpif_flow_stats stats;
+    enum xlate_error xerr;
     struct xlate_in xin;
+    struct ds output;
 
     stats.n_packets = 1;
     stats.n_bytes = dp_packet_size(upcall->packet);
     stats.used = time_msec();
     stats.tcp_flags = ntohs(upcall->flow->tcp_flags);
 
-    xlate_in_init(&xin, upcall->ofproto, upcall->flow, upcall->in_port, NULL,
+    xlate_in_init(&xin, upcall->ofproto,
+                  ofproto_dpif_get_tables_version(upcall->ofproto),
+                  upcall->flow, upcall->ofp_in_port, NULL,
                   stats.tcp_flags, upcall->packet, wc, odp_actions);
 
-    if (upcall->type == DPIF_UC_MISS) {
+    if (upcall->type == MISS_UPCALL) {
         xin.resubmit_stats = &stats;
 
         if (xin.frozen_state) {
@@ -1081,10 +1190,26 @@ upcall_xlate(struct udpif *udpif, struct upcall *upcall,
          * with pushing its stats eventually. */
     }
 
-    upcall->dump_seq = seq_read(udpif->dump_seq);
     upcall->reval_seq = seq_read(udpif->reval_seq);
 
-    xlate_actions(&xin, &upcall->xout);
+    xerr = xlate_actions(&xin, &upcall->xout);
+
+    /* Translate again and log the ofproto trace for
+     * these two error types. */
+    if (xerr == XLATE_RECURSION_TOO_DEEP ||
+        xerr == XLATE_TOO_MANY_RESUBMITS) {
+        static struct vlog_rate_limit rll = VLOG_RATE_LIMIT_INIT(1, 1);
+
+        /* This is a huge log, so be conservative. */
+        if (!VLOG_DROP_WARN(&rll)) {
+            ds_init(&output);
+            ofproto_trace(upcall->ofproto, upcall->flow,
+                          upcall->packet, NULL, 0, NULL, &output);
+            VLOG_WARN("%s", ds_cstr(&output));
+            ds_destroy(&output);
+        }
+    }
+
     if (wc) {
         /* Convert the input port wildcard from OFP to ODP format. There's no
          * real way to do this for arbitrary bitmasks since the numbering spaces
@@ -1095,20 +1220,25 @@ upcall_xlate(struct udpif *udpif, struct upcall *upcall,
 
     upcall->xout_initialized = true;
 
+    if (upcall->fitness == ODP_FIT_TOO_LITTLE) {
+        upcall->xout.slow |= SLOW_MATCH;
+    }
     if (!upcall->xout.slow) {
         ofpbuf_use_const(&upcall->put_actions,
                          odp_actions->data, odp_actions->size);
     } else {
         /* upcall->put_actions already initialized by upcall_receive(). */
-        compose_slow_path(udpif, &upcall->xout, upcall->flow,
-                          upcall->flow->in_port.odp_port,
-                          &upcall->put_actions);
+        compose_slow_path(udpif, &upcall->xout,
+                          upcall->flow->in_port.odp_port, upcall->ofp_in_port,
+                          &upcall->put_actions,
+                          upcall->ofproto->up.slowpath_meter_id,
+                          &upcall->ofproto->uuid);
     }
 
     /* This function is also called for slow-pathed flows.  As we are only
      * going to create new datapath flows for actual datapath misses, there is
      * no point in creating a ukey otherwise. */
-    if (upcall->type == DPIF_UC_MISS) {
+    if (upcall->type == MISS_UPCALL) {
         upcall->ukey = ukey_create_from_upcall(upcall, wc);
     }
 }
@@ -1144,7 +1274,7 @@ should_install_flow(struct udpif *udpif, struct upcall *upcall)
 {
     unsigned int flow_limit;
 
-    if (upcall->type != DPIF_UC_MISS) {
+    if (upcall->type != MISS_UPCALL) {
         return false;
     } else if (upcall->recirc && !upcall->have_recirc_ref) {
         VLOG_DBG_RL(&rl, "upcall: no reference for recirc flow");
@@ -1153,7 +1283,10 @@ should_install_flow(struct udpif *udpif, struct upcall *upcall)
 
     atomic_read_relaxed(&udpif->flow_limit, &flow_limit);
     if (udpif_get_n_flows(udpif) >= flow_limit) {
-        VLOG_WARN_RL(&rl, "upcall: datapath flow limit reached");
+        COVERAGE_INC(upcall_flow_limit_hit);
+        VLOG_WARN_RL(&rl,
+                     "upcall: datapath reached the dynamic limit of %u flows.",
+                     flow_limit);
         return false;
     }
 
@@ -1166,7 +1299,6 @@ upcall_cb(const struct dp_packet *packet, const struct flow *flow, ovs_u128 *ufi
           const struct nlattr *userdata, struct ofpbuf *actions,
           struct flow_wildcards *wc, struct ofpbuf *put_actions, void *aux)
 {
-    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
     struct udpif *udpif = aux;
     struct upcall upcall;
     bool megaflow;
@@ -1180,6 +1312,7 @@ upcall_cb(const struct dp_packet *packet, const struct flow *flow, ovs_u128 *ufi
         return error;
     }
 
+    upcall.fitness = ODP_FIT_PERFECT;
     error = process_upcall(udpif, &upcall, actions, wc);
     if (error) {
         goto out;
@@ -1190,7 +1323,7 @@ upcall_cb(const struct dp_packet *packet, const struct flow *flow, ovs_u128 *ufi
                    upcall.put_actions.size);
     }
 
-    if (OVS_UNLIKELY(!megaflow)) {
+    if (OVS_UNLIKELY(!megaflow && wc)) {
         flow_wildcards_init_for_packet(wc, flow);
     }
 
@@ -1200,7 +1333,8 @@ upcall_cb(const struct dp_packet *packet, const struct flow *flow, ovs_u128 *ufi
     }
 
     if (upcall.ukey && !ukey_install(udpif, upcall.ukey)) {
-        VLOG_WARN_RL(&rl, "upcall_cb failure: ukey installation fails");
+        static struct vlog_rate_limit rll = VLOG_RATE_LIMIT_INIT(1, 1);
+        VLOG_WARN_RL(&rll, "upcall_cb failure: ukey installation fails");
         error = ENOSPC;
     }
 out:
@@ -1211,92 +1345,205 @@ out:
     return error;
 }
 
+static size_t
+dpif_get_actions(struct udpif *udpif, struct upcall *upcall,
+                 const struct nlattr **actions)
+{
+    size_t actions_len = 0;
+
+    if (upcall->actions) {
+        /* Actions were passed up from datapath. */
+        *actions = nl_attr_get(upcall->actions);
+        actions_len = nl_attr_get_size(upcall->actions);
+    }
+
+    if (actions_len == 0) {
+        /* Lookup actions in userspace cache. */
+        struct udpif_key *ukey = ukey_lookup(udpif, upcall->ufid,
+                                             upcall->pmd_id);
+        if (ukey) {
+            ukey_get_actions(ukey, actions, &actions_len);
+        }
+    }
+
+    return actions_len;
+}
+
+static size_t
+dpif_read_actions(struct udpif *udpif, struct upcall *upcall,
+                  const struct flow *flow, enum upcall_type type,
+                  void *upcall_data)
+{
+    const struct nlattr *actions = NULL;
+    size_t actions_len = dpif_get_actions(udpif, upcall, &actions);
+
+    if (!actions || !actions_len) {
+        return 0;
+    }
+
+    switch (type) {
+    case SFLOW_UPCALL:
+        dpif_sflow_read_actions(flow, actions, actions_len, upcall_data, true);
+        break;
+    case FLOW_SAMPLE_UPCALL:
+    case IPFIX_UPCALL:
+        dpif_ipfix_read_actions(flow, actions, actions_len, upcall_data);
+        break;
+    case BAD_UPCALL:
+    case MISS_UPCALL:
+    case SLOW_PATH_UPCALL:
+    case CONTROLLER_UPCALL:
+    default:
+        break;
+    }
+
+    return actions_len;
+}
+
 static int
 process_upcall(struct udpif *udpif, struct upcall *upcall,
                struct ofpbuf *odp_actions, struct flow_wildcards *wc)
 {
-    const struct nlattr *userdata = upcall->userdata;
     const struct dp_packet *packet = upcall->packet;
     const struct flow *flow = upcall->flow;
+    size_t actions_len = 0;
 
-    switch (classify_upcall(upcall->type, userdata)) {
+    switch (upcall->type) {
     case MISS_UPCALL:
+    case SLOW_PATH_UPCALL:
         upcall_xlate(udpif, upcall, odp_actions, wc);
         return 0;
 
     case SFLOW_UPCALL:
         if (upcall->sflow) {
-            union user_action_cookie cookie;
-            const struct nlattr *actions;
-            size_t actions_len = 0;
             struct dpif_sflow_actions sflow_actions;
+
             memset(&sflow_actions, 0, sizeof sflow_actions);
-            memset(&cookie, 0, sizeof cookie);
-            memcpy(&cookie, nl_attr_get(userdata), sizeof cookie.sflow);
-            if (upcall->actions) {
-                /* Actions were passed up from datapath. */
-                actions = nl_attr_get(upcall->actions);
-                actions_len = nl_attr_get_size(upcall->actions);
-                if (actions && actions_len) {
-                    dpif_sflow_read_actions(flow, actions, actions_len,
-                                            &sflow_actions);
-                }
-            }
-            if (actions_len == 0) {
-                /* Lookup actions in userspace cache. */
-                struct udpif_key *ukey = ukey_lookup(udpif, upcall->ufid,
-                                                     upcall->pmd_id);
-                if (ukey) {
-                    ukey_get_actions(ukey, &actions, &actions_len);
-                    dpif_sflow_read_actions(flow, actions, actions_len,
-                                            &sflow_actions);
-                }
-            }
+
+            actions_len = dpif_read_actions(udpif, upcall, flow,
+                                            upcall->type, &sflow_actions);
             dpif_sflow_received(upcall->sflow, packet, flow,
-                                flow->in_port.odp_port, &cookie,
+                                flow->in_port.odp_port, &upcall->cookie,
                                 actions_len > 0 ? &sflow_actions : NULL);
         }
         break;
 
     case IPFIX_UPCALL:
+    case FLOW_SAMPLE_UPCALL:
         if (upcall->ipfix) {
-            union user_action_cookie cookie;
             struct flow_tnl output_tunnel_key;
+            struct dpif_ipfix_actions ipfix_actions;
 
-            memset(&cookie, 0, sizeof cookie);
-            memcpy(&cookie, nl_attr_get(userdata), sizeof cookie.ipfix);
+            memset(&ipfix_actions, 0, sizeof ipfix_actions);
 
             if (upcall->out_tun_key) {
-                odp_tun_key_from_attr(upcall->out_tun_key, false,
-                                      &output_tunnel_key);
+                odp_tun_key_from_attr(upcall->out_tun_key, &output_tunnel_key,
+                                      NULL);
+            }
+
+            actions_len = dpif_read_actions(udpif, upcall, flow,
+                                            upcall->type, &ipfix_actions);
+            if (upcall->type == IPFIX_UPCALL) {
+                dpif_ipfix_bridge_sample(upcall->ipfix, packet, flow,
+                                         flow->in_port.odp_port,
+                                         upcall->cookie.ipfix.output_odp_port,
+                                         upcall->out_tun_key ?
+                                             &output_tunnel_key : NULL,
+                                         actions_len > 0 ?
+                                             &ipfix_actions: NULL);
+            } else {
+                /* The flow reflects exactly the contents of the packet.
+                 * Sample the packet using it. */
+                dpif_ipfix_flow_sample(upcall->ipfix, packet, flow,
+                                       &upcall->cookie, flow->in_port.odp_port,
+                                       upcall->out_tun_key ?
+                                           &output_tunnel_key : NULL,
+                                       actions_len > 0 ? &ipfix_actions: NULL);
             }
-            dpif_ipfix_bridge_sample(upcall->ipfix, packet, flow,
-                                     flow->in_port.odp_port,
-                                     cookie.ipfix.output_odp_port,
-                                     upcall->out_tun_key ?
-                                         &output_tunnel_key : NULL);
         }
         break;
 
-    case FLOW_SAMPLE_UPCALL:
-        if (upcall->ipfix) {
-            union user_action_cookie cookie;
-            struct flow_tnl output_tunnel_key;
+    case CONTROLLER_UPCALL:
+        {
+            struct user_action_cookie *cookie = &upcall->cookie;
 
-            memset(&cookie, 0, sizeof cookie);
-            memcpy(&cookie, nl_attr_get(userdata), sizeof cookie.flow_sample);
+            if (cookie->controller.dont_send) {
+                return 0;
+            }
 
-            if (upcall->out_tun_key) {
-                odp_tun_key_from_attr(upcall->out_tun_key, false,
-                                      &output_tunnel_key);
+            uint32_t recirc_id = cookie->controller.recirc_id;
+            if (!recirc_id) {
+                break;
             }
 
-            /* The flow reflects exactly the contents of the packet.
-             * Sample the packet using it. */
-            dpif_ipfix_flow_sample(upcall->ipfix, packet, flow,
-                                   &cookie, flow->in_port.odp_port,
-                                   upcall->out_tun_key ?
-                                       &output_tunnel_key : NULL);
+            const struct recirc_id_node *recirc_node
+                                = recirc_id_node_find(recirc_id);
+            if (!recirc_node) {
+                break;
+            }
+
+            const struct frozen_state *state = &recirc_node->state;
+
+            struct ofproto_async_msg *am = xmalloc(sizeof *am);
+            *am = (struct ofproto_async_msg) {
+                .controller_id = cookie->controller.controller_id,
+                .oam = OAM_PACKET_IN,
+                .pin = {
+                    .up = {
+                        .base = {
+                            .packet = xmemdup(dp_packet_data(packet),
+                                              dp_packet_size(packet)),
+                            .packet_len = dp_packet_size(packet),
+                            .reason = cookie->controller.reason,
+                            .table_id = state->table_id,
+                            .cookie = get_32aligned_be64(
+                                         &cookie->controller.rule_cookie),
+                            .userdata = (recirc_node->state.userdata_len
+                                     ? xmemdup(recirc_node->state.userdata,
+                                               recirc_node->state.userdata_len)
+                                      : NULL),
+                            .userdata_len = recirc_node->state.userdata_len,
+                        },
+                    },
+                    .max_len = cookie->controller.max_len,
+                },
+            };
+
+            if (cookie->controller.continuation) {
+                am->pin.up.stack = (state->stack_size
+                          ? xmemdup(state->stack, state->stack_size)
+                          : NULL),
+                am->pin.up.stack_size = state->stack_size,
+                am->pin.up.mirrors = state->mirrors,
+                am->pin.up.conntracked = state->conntracked,
+                am->pin.up.actions = (state->ofpacts_len
+                            ? xmemdup(state->ofpacts,
+                                      state->ofpacts_len) : NULL),
+                am->pin.up.actions_len = state->ofpacts_len,
+                am->pin.up.action_set = (state->action_set_len
+                               ? xmemdup(state->action_set,
+                                         state->action_set_len)
+                               : NULL),
+                am->pin.up.action_set_len = state->action_set_len,
+                am->pin.up.bridge = upcall->ofproto->uuid;
+                am->pin.up.odp_port = upcall->packet->md.in_port.odp_port;
+            }
+
+            /* We don't want to use the upcall 'flow', since it may be
+             * more specific than the point at which the "controller"
+             * action was specified. */
+            struct flow frozen_flow;
+
+            frozen_flow = *flow;
+            if (!state->conntracked) {
+                flow_clear_conntrack(&frozen_flow);
+            }
+
+            frozen_metadata_to_flow(&upcall->ofproto->up, &state->metadata,
+                                    &frozen_flow);
+            flow_get_metadata(&frozen_flow, &am->pin.up.base.flow_metadata);
+
+            ofproto_dpif_send_async_msg(upcall->ofproto, am);
         }
         break;
 
@@ -1317,11 +1564,11 @@ handle_upcalls(struct udpif *udpif, struct upcall *upcalls,
 
     /* Handle the packets individually in order of arrival.
      *
-     *   - For SLOW_CFM, SLOW_LACP, SLOW_STP, and SLOW_BFD, translation is what
-     *     processes received packets for these protocols.
+     *   - For SLOW_CFM, SLOW_LACP, SLOW_STP, SLOW_BFD, and SLOW_LLDP,
+     *     translation is what processes received packets for these
+     *     protocols.
      *
-     *   - For SLOW_CONTROLLER, translation sends the packet to the OpenFlow
-     *     controller.
+     *   - For SLOW_ACTION, translation executes the actions directly.
      *
      * The loop fills 'ops' with an array of operations to execute in the
      * datapath. */
@@ -1334,62 +1581,46 @@ handle_upcalls(struct udpif *udpif, struct upcall *upcalls,
         if (should_install_flow(udpif, upcall)) {
             struct udpif_key *ukey = upcall->ukey;
 
-            upcall->ukey_persists = true;
-            op = &ops[n_ops++];
-
-            op->ukey = ukey;
-            op->dop.type = DPIF_OP_FLOW_PUT;
-            op->dop.u.flow_put.flags = DPIF_FP_CREATE;
-            op->dop.u.flow_put.key = ukey->key;
-            op->dop.u.flow_put.key_len = ukey->key_len;
-            op->dop.u.flow_put.mask = ukey->mask;
-            op->dop.u.flow_put.mask_len = ukey->mask_len;
-            op->dop.u.flow_put.ufid = upcall->ufid;
-            op->dop.u.flow_put.stats = NULL;
-            ukey_get_actions(ukey, &op->dop.u.flow_put.actions,
-                             &op->dop.u.flow_put.actions_len);
+            if (ukey_install(udpif, ukey)) {
+                upcall->ukey_persists = true;
+                put_op_init(&ops[n_ops++], ukey, DPIF_FP_CREATE);
+            }
         }
 
         if (upcall->odp_actions.size) {
             op = &ops[n_ops++];
             op->ukey = NULL;
             op->dop.type = DPIF_OP_EXECUTE;
-            op->dop.u.execute.packet = CONST_CAST(struct dp_packet *, packet);
-            op->dop.u.execute.flow = upcall->flow;
-            odp_key_to_pkt_metadata(upcall->key, upcall->key_len,
-                                    &op->dop.u.execute.packet->md);
-            op->dop.u.execute.actions = upcall->odp_actions.data;
-            op->dop.u.execute.actions_len = upcall->odp_actions.size;
-            op->dop.u.execute.needs_help = (upcall->xout.slow & SLOW_ACTION) != 0;
-            op->dop.u.execute.probe = false;
-            op->dop.u.execute.mtu = upcall->mru;
+            op->dop.execute.packet = CONST_CAST(struct dp_packet *, packet);
+            op->dop.execute.flow = upcall->flow;
+            odp_key_to_dp_packet(upcall->key, upcall->key_len,
+                                 op->dop.execute.packet);
+            op->dop.execute.actions = upcall->odp_actions.data;
+            op->dop.execute.actions_len = upcall->odp_actions.size;
+            op->dop.execute.needs_help = (upcall->xout.slow & SLOW_ACTION) != 0;
+            op->dop.execute.probe = false;
+            op->dop.execute.mtu = upcall->mru;
+            op->dop.execute.hash = upcall->hash;
         }
     }
 
-    /* Execute batch.
-     *
-     * We install ukeys before installing the flows, locking them for exclusive
-     * access by this thread for the period of installation. This ensures that
-     * other threads won't attempt to delete the flows as we are creating them.
-     */
+    /* Execute batch. */
     n_opsp = 0;
+    for (i = 0; i < n_ops; i++) {
+        opsp[n_opsp++] = &ops[i].dop;
+    }
+    dpif_operate(udpif->dpif, opsp, n_opsp, DPIF_OFFLOAD_AUTO);
     for (i = 0; i < n_ops; i++) {
         struct udpif_key *ukey = ops[i].ukey;
 
         if (ukey) {
-            /* If we can't install the ukey, don't install the flow. */
-            if (!ukey_install_start(udpif, ukey)) {
-                ukey_delete__(ukey);
-                ops[i].ukey = NULL;
-                continue;
+            ovs_mutex_lock(&ukey->mutex);
+            if (ops[i].dop.error) {
+                transition_ukey(ukey, UKEY_EVICTED);
+            } else if (ukey->state < UKEY_OPERATIONAL) {
+                transition_ukey(ukey, UKEY_OPERATIONAL);
             }
-        }
-        opsp[n_opsp++] = &ops[i].dop;
-    }
-    dpif_operate(udpif->dpif, opsp, n_opsp);
-    for (i = 0; i < n_ops; i++) {
-        if (ops[i].ukey) {
-            ukey_install_finish(ops[i].ukey, ops[i].dop.error);
+            ovs_mutex_unlock(&ukey->mutex);
         }
     }
 }
@@ -1429,8 +1660,13 @@ ukey_get_actions(struct udpif_key *ukey, const struct nlattr **actions, size_t *
 static void
 ukey_set_actions(struct udpif_key *ukey, const struct ofpbuf *actions)
 {
-    ovsrcu_postpone(ofpbuf_delete,
-                    ovsrcu_get_protected(struct ofpbuf *, &ukey->actions));
+    struct ofpbuf *old_actions = ovsrcu_get_protected(struct ofpbuf *,
+                                                      &ukey->actions);
+
+    if (old_actions) {
+        ovsrcu_postpone(ofpbuf_delete, old_actions);
+    }
+
     ovsrcu_set(&ukey->actions, ofpbuf_clone(actions));
 }
 
@@ -1439,7 +1675,7 @@ ukey_create__(const struct nlattr *key, size_t key_len,
               const struct nlattr *mask, size_t mask_len,
               bool ufid_present, const ovs_u128 *ufid,
               const unsigned pmd_id, const struct ofpbuf *actions,
-              uint64_t dump_seq, uint64_t reval_seq, long long int used,
+              uint64_t reval_seq, long long int used,
               uint32_t key_recirc_id, struct xlate_out *xout)
     OVS_NO_THREAD_SAFETY_ANALYSIS
 {
@@ -1460,14 +1696,20 @@ ukey_create__(const struct nlattr *key, size_t key_len,
     ukey_set_actions(ukey, actions);
 
     ovs_mutex_init(&ukey->mutex);
-    ukey->dump_seq = dump_seq;
+    ukey->dump_seq = 0;     /* Not yet dumped */
     ukey->reval_seq = reval_seq;
-    ukey->flow_exists = false;
-    ukey->created = time_msec();
+    ukey->state = UKEY_CREATED;
+    ukey->state_thread = ovsthread_id_self();
+    ukey->state_where = OVS_SOURCE_LOCATOR;
+    ukey->created = ukey->flow_time = time_msec();
     memset(&ukey->stats, 0, sizeof ukey->stats);
     ukey->stats.used = used;
     ukey->xcache = NULL;
 
+    ukey->offloaded = false;
+    ukey->in_netdev = NULL;
+    ukey->flow_packets = ukey->flow_backlog_packets = 0;
+
     ukey->key_recirc_id = key_recirc_id;
     recirc_refs_init(&ukey->recircs);
     if (xout) {
@@ -1486,10 +1728,10 @@ ukey_create_from_upcall(struct upcall *upcall, struct flow_wildcards *wc)
     bool megaflow;
     struct odp_flow_key_parms odp_parms = {
         .flow = upcall->flow,
-        .mask = &wc->masks,
+        .mask = wc ? &wc->masks : NULL,
     };
 
-    odp_parms.support = ofproto_dpif_get_support(upcall->ofproto)->odp;
+    odp_parms.support = upcall->ofproto->backer->rt_support.odp;
     if (upcall->key_len) {
         ofpbuf_use_const(&keybuf, upcall->key, upcall->key_len);
     } else {
@@ -1501,15 +1743,14 @@ ukey_create_from_upcall(struct upcall *upcall, struct flow_wildcards *wc)
 
     atomic_read_relaxed(&enable_megaflows, &megaflow);
     ofpbuf_use_stack(&maskbuf, &maskstub, sizeof maskstub);
-    if (megaflow) {
+    if (megaflow && wc) {
         odp_parms.key_buf = &keybuf;
         odp_flow_key_from_mask(&odp_parms, &maskbuf);
     }
 
     return ukey_create__(keybuf.data, keybuf.size, maskbuf.data, maskbuf.size,
                          true, upcall->ufid, upcall->pmd_id,
-                         &upcall->put_actions, upcall->dump_seq,
-                         upcall->reval_seq, 0,
+                         &upcall->put_actions, upcall->reval_seq, 0,
                          upcall->have_recirc_ref ? upcall->recirc->id : 0,
                          &upcall->xout);
 }
@@ -1521,7 +1762,7 @@ ukey_create_from_dpif_flow(const struct udpif *udpif,
 {
     struct dpif_flow full_flow;
     struct ofpbuf actions;
-    uint64_t dump_seq, reval_seq;
+    uint64_t reval_seq;
     uint64_t stub[DPIF_FLOW_BUFSIZE / 8];
     const struct nlattr *a;
     unsigned int left;
@@ -1546,35 +1787,67 @@ ukey_create_from_dpif_flow(const struct udpif *udpif,
      * relies on OVS userspace internal state, we need to delete all old
      * datapath flows with either a non-zero recirc_id in the key, or any
      * recirculation actions upon OVS restart. */
-    NL_ATTR_FOR_EACH_UNSAFE (a, left, flow->key, flow->key_len) {
+    NL_ATTR_FOR_EACH (a, left, flow->key, flow->key_len) {
         if (nl_attr_type(a) == OVS_KEY_ATTR_RECIRC_ID
             && nl_attr_get_u32(a) != 0) {
             return EINVAL;
         }
     }
-    NL_ATTR_FOR_EACH_UNSAFE (a, left, flow->actions, flow->actions_len) {
+    NL_ATTR_FOR_EACH (a, left, flow->actions, flow->actions_len) {
         if (nl_attr_type(a) == OVS_ACTION_ATTR_RECIRC) {
             return EINVAL;
         }
     }
 
-    dump_seq = seq_read(udpif->dump_seq);
-    reval_seq = seq_read(udpif->reval_seq);
-    ofpbuf_use_const(&actions, &flow->actions, flow->actions_len);
+    reval_seq = seq_read(udpif->reval_seq) - 1; /* Ensure revalidation. */
+    ofpbuf_use_const(&actions, flow->actions, flow->actions_len);
     *ukey = ukey_create__(flow->key, flow->key_len,
                           flow->mask, flow->mask_len, flow->ufid_present,
-                          &flow->ufid, flow->pmd_id, &actions, dump_seq,
+                          &flow->ufid, flow->pmd_id, &actions,
                           reval_seq, flow->stats.used, 0, NULL);
 
     return 0;
 }
 
+static bool
+try_ukey_replace(struct umap *umap, struct udpif_key *old_ukey,
+                 struct udpif_key *new_ukey)
+    OVS_REQUIRES(umap->mutex)
+    OVS_TRY_LOCK(true, new_ukey->mutex)
+{
+    bool replaced = false;
+
+    if (!ovs_mutex_trylock(&old_ukey->mutex)) {
+        if (old_ukey->state == UKEY_EVICTED) {
+            /* The flow was deleted during the current revalidator dump,
+             * but its ukey won't be fully cleaned up until the sweep phase.
+             * In the mean time, we are receiving upcalls for this traffic.
+             * Expedite the (new) flow install by replacing the ukey. */
+            ovs_mutex_lock(&new_ukey->mutex);
+            cmap_replace(&umap->cmap, &old_ukey->cmap_node,
+                         &new_ukey->cmap_node, new_ukey->hash);
+            ovsrcu_postpone(ukey_delete__, old_ukey);
+            transition_ukey(old_ukey, UKEY_DELETED);
+            transition_ukey(new_ukey, UKEY_VISIBLE);
+            replaced = true;
+        }
+        ovs_mutex_unlock(&old_ukey->mutex);
+    }
+
+    if (replaced) {
+        COVERAGE_INC(upcall_ukey_replace);
+    } else {
+        COVERAGE_INC(handler_duplicate_upcall);
+    }
+    return replaced;
+}
+
 /* Attempts to insert a ukey into the shared ukey maps.
  *
  * On success, returns true, installs the ukey and returns it in a locked
  * state. Otherwise, returns false. */
 static bool
-ukey_install_start(struct udpif *udpif, struct udpif_key *new_ukey)
+ukey_install__(struct udpif *udpif, struct udpif_key *new_ukey)
     OVS_TRY_LOCK(true, new_ukey->mutex)
 {
     struct umap *umap;
@@ -1590,7 +1863,7 @@ ukey_install_start(struct udpif *udpif, struct udpif_key *new_ukey)
         /* Uncommon case: A ukey is already installed with the same UFID. */
         if (old_ukey->key_len == new_ukey->key_len
             && !memcmp(old_ukey->key, new_ukey->key, new_ukey->key_len)) {
-            COVERAGE_INC(handler_duplicate_upcall);
+            locked = try_ukey_replace(umap, old_ukey, new_ukey);
         } else {
             struct ds ds = DS_EMPTY_INITIALIZER;
 
@@ -1608,6 +1881,7 @@ ukey_install_start(struct udpif *udpif, struct udpif_key *new_ukey)
     } else {
         ovs_mutex_lock(&new_ukey->mutex);
         cmap_insert(&umap->cmap, &new_ukey->cmap_node, new_ukey->hash);
+        transition_ukey(new_ukey, UKEY_VISIBLE);
         locked = true;
     }
     ovs_mutex_unlock(&umap->mutex);
@@ -1616,37 +1890,64 @@ ukey_install_start(struct udpif *udpif, struct udpif_key *new_ukey)
 }
 
 static void
-ukey_install_finish__(struct udpif_key *ukey) OVS_REQUIRES(ukey->mutex)
+transition_ukey_at(struct udpif_key *ukey, enum ukey_state dst,
+                   const char *where)
+    OVS_REQUIRES(ukey->mutex)
 {
-    ukey->flow_exists = true;
-}
+    if (dst < ukey->state) {
+        VLOG_ABORT("Invalid ukey transition %d->%d (last transitioned from "
+                   "thread %u at %s)", ukey->state, dst, ukey->state_thread,
+                   ukey->state_where);
+    }
+    if (ukey->state == dst && dst == UKEY_OPERATIONAL) {
+        return;
+    }
+
+    /* Valid state transitions:
+     * UKEY_CREATED -> UKEY_VISIBLE
+     *  Ukey is now visible in the umap.
+     * UKEY_VISIBLE -> UKEY_OPERATIONAL
+     *  A handler has installed the flow, and the flow is in the datapath.
+     * UKEY_VISIBLE -> UKEY_EVICTING
+     *  A handler installs the flow, then revalidator sweeps the ukey before
+     *  the flow is dumped. Most likely the flow was installed; start trying
+     *  to delete it.
+     * UKEY_VISIBLE -> UKEY_EVICTED
+     *  A handler attempts to install the flow, but the datapath rejects it.
+     *  Consider that the datapath has already destroyed it.
+     * UKEY_OPERATIONAL -> UKEY_EVICTING
+     *  A revalidator decides to evict the datapath flow.
+     * UKEY_EVICTING    -> UKEY_EVICTED
+     *  A revalidator has evicted the datapath flow.
+     * UKEY_EVICTED     -> UKEY_DELETED
+     *  A revalidator has removed the ukey from the umap and is deleting it.
+     */
+    if (ukey->state == dst - 1 || (ukey->state == UKEY_VISIBLE &&
+                                   dst < UKEY_DELETED)) {
+        ukey->state = dst;
+    } else {
+        struct ds ds = DS_EMPTY_INITIALIZER;
 
-static bool
-ukey_install_finish(struct udpif_key *ukey, int error)
-    OVS_RELEASES(ukey->mutex)
-{
-    if (!error) {
-        ukey_install_finish__(ukey);
+        odp_format_ufid(&ukey->ufid, &ds);
+        VLOG_WARN_RL(&rl, "Invalid state transition for ukey %s: %d -> %d",
+                     ds_cstr(&ds), ukey->state, dst);
+        ds_destroy(&ds);
     }
-    ovs_mutex_unlock(&ukey->mutex);
-
-    return !error;
+    ukey->state_thread = ovsthread_id_self();
+    ukey->state_where = where;
 }
 
 static bool
 ukey_install(struct udpif *udpif, struct udpif_key *ukey)
 {
-    /* The usual way to keep 'ukey->flow_exists' in sync with the datapath is
-     * to call ukey_install_start(), install the corresponding datapath flow,
-     * then call ukey_install_finish(). The netdev interface using upcall_cb()
-     * doesn't provide a function to separately finish the flow installation,
-     * so we perform the operations together here.
-     *
-     * This is fine currently, as revalidator threads will only delete this
-     * ukey during revalidator_sweep() and only if the dump_seq is mismatched.
-     * It is unlikely for a revalidator thread to advance dump_seq and reach
-     * the next GC phase between ukey creation and flow installation. */
-    return ukey_install_start(udpif, ukey) && ukey_install_finish(ukey, 0);
+    bool installed;
+
+    installed = ukey_install__(udpif, ukey);
+    if (installed) {
+        ovs_mutex_unlock(&ukey->mutex);
+    }
+
+    return installed;
 }
 
 /* Searches for a ukey in 'udpif->ukeys' that matches 'flow' and attempts to
@@ -1682,9 +1983,8 @@ ukey_acquire(struct udpif *udpif, const struct dpif_flow *flow,
         if (retval) {
             goto done;
         }
-        install = ukey_install_start(udpif, ukey);
+        install = ukey_install__(udpif, ukey);
         if (install) {
-            ukey_install_finish__(ukey);
             retval = 0;
         } else {
             ukey_delete__(ukey);
@@ -1722,8 +2022,13 @@ static void
 ukey_delete(struct umap *umap, struct udpif_key *ukey)
     OVS_REQUIRES(umap->mutex)
 {
-    cmap_remove(&umap->cmap, &ukey->cmap_node, ukey->hash);
-    ovsrcu_postpone(ukey_delete__, ukey);
+    ovs_mutex_lock(&ukey->mutex);
+    if (ukey->state < UKEY_DELETED) {
+        cmap_remove(&umap->cmap, &ukey->cmap_node, ukey->hash);
+        ovsrcu_postpone(ukey_delete__, ukey);
+        transition_ukey(ukey, UKEY_DELETED);
+    }
+    ovs_mutex_unlock(&ukey->mutex);
 }
 
 static bool
@@ -1732,7 +2037,12 @@ should_revalidate(const struct udpif *udpif, uint64_t packets,
 {
     long long int metric, now, duration;
 
-    if (udpif->dump_duration < 200) {
+    if (!used) {
+        /* Always revalidate the first time a flow is dumped. */
+        return true;
+    }
+
+    if (udpif->dump_duration < ofproto_max_revalidator / 2) {
         /* We are likely to handle full revalidation for the flows. */
         return true;
     }
@@ -1752,126 +2062,150 @@ should_revalidate(const struct udpif *udpif, uint64_t packets,
     duration = now - used;
     metric = duration / packets;
 
-    if (metric < 200) {
-        /* The flow is receiving more than ~5pps, so keep it. */
+    if (metric < 1000 / ofproto_min_revalidate_pps) {
+        /* The flow is receiving more than min-revalidate-pps, so keep it. */
         return true;
     }
     return false;
 }
 
-/* Verifies that the datapath actions of 'ukey' are still correct, and pushes
- * 'stats' for it.
- *
- * Returns a recommended action for 'ukey', options include:
- *      UKEY_DELETE The ukey should be deleted.
- *      UKEY_KEEP   The ukey is fine as is.
- *      UKEY_MODIFY The ukey's actions should be changed but is otherwise
- *                  fine.  Callers should change the actions to those found
- *                  in the caller supplied 'odp_actions' buffer.  The
- *                  recirculation references can be found in 'recircs' and
- *                  must be handled by the caller.
+struct reval_context {
+    /* Optional output parameters */
+    struct flow_wildcards *wc;
+    struct ofpbuf *odp_actions;
+    struct netflow **netflow;
+    struct xlate_cache *xcache;
+
+    /* Required output parameters */
+    struct xlate_out xout;
+    struct flow flow;
+};
+
+/* Translates 'key' into a flow, populating 'ctx' as it goes along.
  *
- * If the result is UKEY_MODIFY, then references to all recirc_ids used by the
- * new flow will be held within 'recircs' (which may be none).
+ * Returns 0 on success, otherwise a positive errno value.
  *
- * The caller is responsible for both initializing 'recircs' prior this call,
- * and ensuring any references are eventually freed.
+ * The caller is responsible for uninitializing ctx->xout on success.
  */
-static enum reval_result
-revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
-                const struct dpif_flow_stats *stats,
-                struct ofpbuf *odp_actions, uint64_t reval_seq,
-                struct recirc_refs *recircs)
-    OVS_REQUIRES(ukey->mutex)
+static int
+xlate_key(struct udpif *udpif, const struct nlattr *key, unsigned int len,
+          const struct dpif_flow_stats *push, struct reval_context *ctx)
 {
-    struct xlate_out xout, *xoutp;
-    struct netflow *netflow;
     struct ofproto_dpif *ofproto;
-    struct dpif_flow_stats push;
-    struct flow flow;
-    struct flow_wildcards dp_mask, wc;
-    enum reval_result result;
     ofp_port_t ofp_in_port;
+    enum odp_key_fitness fitness;
     struct xlate_in xin;
-    long long int last_used;
     int error;
-    bool need_revalidate;
-
-    result = UKEY_DELETE;
-    xoutp = NULL;
-    netflow = NULL;
-
-    ofpbuf_clear(odp_actions);
-    need_revalidate = (ukey->reval_seq != reval_seq);
-    last_used = ukey->stats.used;
-    push.used = stats->used;
-    push.tcp_flags = stats->tcp_flags;
-    push.n_packets = (stats->n_packets > ukey->stats.n_packets
-                      ? stats->n_packets - ukey->stats.n_packets
-                      : 0);
-    push.n_bytes = (stats->n_bytes > ukey->stats.n_bytes
-                    ? stats->n_bytes - ukey->stats.n_bytes
-                    : 0);
 
-    if (need_revalidate && last_used
-        && !should_revalidate(udpif, push.n_packets, last_used)) {
-        goto exit;
+    fitness = odp_flow_key_to_flow(key, len, &ctx->flow, NULL);
+    if (fitness == ODP_FIT_ERROR) {
+        return EINVAL;
     }
 
-    /* We will push the stats, so update the ukey stats cache. */
-    ukey->stats = *stats;
-    if (!push.n_packets && !need_revalidate) {
-        result = UKEY_KEEP;
-        goto exit;
+    error = xlate_lookup(udpif->backer, &ctx->flow, &ofproto, NULL, NULL,
+                         ctx->netflow, &ofp_in_port);
+    if (error) {
+        return error;
     }
 
-    if (ukey->xcache && !need_revalidate) {
-        xlate_push_stats(ukey->xcache, &push);
-        result = UKEY_KEEP;
-        goto exit;
+    xlate_in_init(&xin, ofproto, ofproto_dpif_get_tables_version(ofproto),
+                  &ctx->flow, ofp_in_port, NULL, push->tcp_flags,
+                  NULL, ctx->wc, ctx->odp_actions);
+    if (push->n_packets) {
+        xin.resubmit_stats = push;
+        xin.allow_side_effects = true;
     }
-
-    if (odp_flow_key_to_flow(ukey->key, ukey->key_len, &flow)
-        == ODP_FIT_ERROR) {
-        goto exit;
+    xin.xcache = ctx->xcache;
+    xlate_actions(&xin, &ctx->xout);
+    if (fitness == ODP_FIT_TOO_LITTLE) {
+        ctx->xout.slow |= SLOW_MATCH;
     }
 
-    error = xlate_lookup(udpif->backer, &flow, &ofproto, NULL, NULL, &netflow,
-                         &ofp_in_port);
+    return 0;
+}
+
+static int
+xlate_ukey(struct udpif *udpif, const struct udpif_key *ukey,
+           uint16_t tcp_flags, struct reval_context *ctx)
+{
+    struct dpif_flow_stats push = {
+        .tcp_flags = tcp_flags,
+    };
+    return xlate_key(udpif, ukey->key, ukey->key_len, &push, ctx);
+}
+
+static int
+populate_xcache(struct udpif *udpif, struct udpif_key *ukey,
+                uint16_t tcp_flags)
+    OVS_REQUIRES(ukey->mutex)
+{
+    struct reval_context ctx = {
+        .odp_actions = NULL,
+        .netflow = NULL,
+        .wc = NULL,
+    };
+    int error;
+
+    ovs_assert(!ukey->xcache);
+    ukey->xcache = ctx.xcache = xlate_cache_new();
+    error = xlate_ukey(udpif, ukey, tcp_flags, &ctx);
     if (error) {
-        goto exit;
+        return error;
     }
+    xlate_out_uninit(&ctx.xout);
 
-    if (need_revalidate) {
-        xlate_cache_clear(ukey->xcache);
-    }
-    if (!ukey->xcache) {
-        ukey->xcache = xlate_cache_new();
-    }
+    return 0;
+}
+
+static enum reval_result
+revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey,
+                  uint16_t tcp_flags, struct ofpbuf *odp_actions,
+                  struct recirc_refs *recircs, struct xlate_cache *xcache)
+{
+    struct xlate_out *xoutp;
+    struct netflow *netflow;
+    struct flow_wildcards dp_mask, wc;
+    enum reval_result result;
+    struct reval_context ctx = {
+        .odp_actions = odp_actions,
+        .netflow = &netflow,
+        .xcache = xcache,
+        .wc = &wc,
+    };
+
+    result = UKEY_DELETE;
+    xoutp = NULL;
+    netflow = NULL;
 
-    xlate_in_init(&xin, ofproto, &flow, ofp_in_port, NULL, push.tcp_flags,
-                  NULL, need_revalidate ? &wc : NULL, odp_actions);
-    if (push.n_packets) {
-        xin.resubmit_stats = &push;
-        xin.may_learn = true;
+    if (xlate_ukey(udpif, ukey, tcp_flags, &ctx)) {
+        goto exit;
     }
-    xin.xcache = ukey->xcache;
-    xlate_actions(&xin, &xout);
-    xoutp = &xout;
+    xoutp = &ctx.xout;
 
-    if (!need_revalidate) {
-        result = UKEY_KEEP;
+    if (xoutp->avoid_caching) {
         goto exit;
     }
 
-    if (xout.slow) {
+    if (xoutp->slow) {
+        struct ofproto_dpif *ofproto;
+        ofp_port_t ofp_in_port;
+
+        ofproto = xlate_lookup_ofproto(udpif->backer, &ctx.flow, &ofp_in_port,
+                                       NULL);
+
         ofpbuf_clear(odp_actions);
-        compose_slow_path(udpif, &xout, &flow, flow.in_port.odp_port,
-                          odp_actions);
+
+        if (!ofproto) {
+            goto exit;
+        }
+
+        compose_slow_path(udpif, xoutp, ctx.flow.in_port.odp_port,
+                          ofp_in_port, odp_actions,
+                          ofproto->up.slowpath_meter_id, &ofproto->uuid);
     }
 
-    if (odp_flow_key_to_mask(ukey->mask, ukey->mask_len, ukey->key,
-                             ukey->key_len, &dp_mask, &flow)
+    if (odp_flow_key_to_mask(ukey->mask, ukey->mask_len, &dp_mask, &ctx.flow,
+                             NULL)
         == ODP_FIT_ERROR) {
         goto exit;
     }
@@ -1880,8 +2214,8 @@ revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
      * but not the newly revalidated wildcard mask (wc), i.e., if revalidation
      * tells that the datapath flow is now too generic and must be narrowed
      * down.  Note that we do not know if the datapath has ignored any of the
-     * wildcarded bits, so we may be overtly conservative here. */
-    if (flow_wildcards_has_extra(&dp_mask, &wc)) {
+     * wildcarded bits, so we may be overly conservative here. */
+    if (flow_wildcards_has_extra(&dp_mask, ctx.wc)) {
         goto exit;
     }
 
@@ -1898,28 +2232,90 @@ revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
     result = UKEY_KEEP;
 
 exit:
-    if (result != UKEY_DELETE) {
-        ukey->reval_seq = reval_seq;
-    }
     if (netflow && result == UKEY_DELETE) {
-        netflow_flow_clear(netflow, &flow);
+        netflow_flow_clear(netflow, &ctx.flow);
     }
     xlate_out_uninit(xoutp);
     return result;
 }
 
+/* Verifies that the datapath actions of 'ukey' are still correct, and pushes
+ * 'stats' for it.
+ *
+ * Returns a recommended action for 'ukey', options include:
+ *      UKEY_DELETE The ukey should be deleted.
+ *      UKEY_KEEP   The ukey is fine as is.
+ *      UKEY_MODIFY The ukey's actions should be changed but is otherwise
+ *                  fine.  Callers should change the actions to those found
+ *                  in the caller supplied 'odp_actions' buffer.  The
+ *                  recirculation references can be found in 'recircs' and
+ *                  must be handled by the caller.
+ *
+ * If the result is UKEY_MODIFY, then references to all recirc_ids used by the
+ * new flow will be held within 'recircs' (which may be none).
+ *
+ * The caller is responsible for both initializing 'recircs' prior this call,
+ * and ensuring any references are eventually freed.
+ */
+static enum reval_result
+revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
+                const struct dpif_flow_stats *stats,
+                struct ofpbuf *odp_actions, uint64_t reval_seq,
+                struct recirc_refs *recircs, bool offloaded)
+    OVS_REQUIRES(ukey->mutex)
+{
+    bool need_revalidate = ukey->reval_seq != reval_seq;
+    enum reval_result result = UKEY_DELETE;
+    struct dpif_flow_stats push;
+
+    ofpbuf_clear(odp_actions);
+
+    push.used = stats->used;
+    push.tcp_flags = stats->tcp_flags;
+    push.n_packets = (stats->n_packets > ukey->stats.n_packets
+                      ? stats->n_packets - ukey->stats.n_packets
+                      : 0);
+    push.n_bytes = (stats->n_bytes > ukey->stats.n_bytes
+                    ? stats->n_bytes - ukey->stats.n_bytes
+                    : 0);
+
+    if (need_revalidate) {
+        if (should_revalidate(udpif, push.n_packets, ukey->stats.used)) {
+            if (!ukey->xcache) {
+                ukey->xcache = xlate_cache_new();
+            } else {
+                xlate_cache_clear(ukey->xcache);
+            }
+            result = revalidate_ukey__(udpif, ukey, push.tcp_flags,
+                                       odp_actions, recircs, ukey->xcache);
+        } /* else delete; too expensive to revalidate */
+    } else if (!push.n_packets || ukey->xcache
+               || !populate_xcache(udpif, ukey, push.tcp_flags)) {
+        result = UKEY_KEEP;
+    }
+
+    /* Stats for deleted flows will be attributed upon flow deletion. Skip. */
+    if (result != UKEY_DELETE) {
+        xlate_push_stats(ukey->xcache, &push, offloaded);
+        ukey->stats = *stats;
+        ukey->reval_seq = reval_seq;
+    }
+
+    return result;
+}
+
 static void
 delete_op_init__(struct udpif *udpif, struct ukey_op *op,
                  const struct dpif_flow *flow)
 {
     op->ukey = NULL;
     op->dop.type = DPIF_OP_FLOW_DEL;
-    op->dop.u.flow_del.key = flow->key;
-    op->dop.u.flow_del.key_len = flow->key_len;
-    op->dop.u.flow_del.ufid = flow->ufid_present ? &flow->ufid : NULL;
-    op->dop.u.flow_del.pmd_id = flow->pmd_id;
-    op->dop.u.flow_del.stats = &op->stats;
-    op->dop.u.flow_del.terse = udpif_use_ufid(udpif);
+    op->dop.flow_del.key = flow->key;
+    op->dop.flow_del.key_len = flow->key_len;
+    op->dop.flow_del.ufid = flow->ufid_present ? &flow->ufid : NULL;
+    op->dop.flow_del.pmd_id = flow->pmd_id;
+    op->dop.flow_del.stats = &op->stats;
+    op->dop.flow_del.terse = udpif_use_ufid(udpif);
 }
 
 static void
@@ -1927,29 +2323,30 @@ delete_op_init(struct udpif *udpif, struct ukey_op *op, struct udpif_key *ukey)
 {
     op->ukey = ukey;
     op->dop.type = DPIF_OP_FLOW_DEL;
-    op->dop.u.flow_del.key = ukey->key;
-    op->dop.u.flow_del.key_len = ukey->key_len;
-    op->dop.u.flow_del.ufid = ukey->ufid_present ? &ukey->ufid : NULL;
-    op->dop.u.flow_del.pmd_id = ukey->pmd_id;
-    op->dop.u.flow_del.stats = &op->stats;
-    op->dop.u.flow_del.terse = udpif_use_ufid(udpif);
+    op->dop.flow_del.key = ukey->key;
+    op->dop.flow_del.key_len = ukey->key_len;
+    op->dop.flow_del.ufid = ukey->ufid_present ? &ukey->ufid : NULL;
+    op->dop.flow_del.pmd_id = ukey->pmd_id;
+    op->dop.flow_del.stats = &op->stats;
+    op->dop.flow_del.terse = udpif_use_ufid(udpif);
 }
 
 static void
-modify_op_init(struct ukey_op *op, struct udpif_key *ukey)
+put_op_init(struct ukey_op *op, struct udpif_key *ukey,
+            enum dpif_flow_put_flags flags)
 {
     op->ukey = ukey;
     op->dop.type = DPIF_OP_FLOW_PUT;
-    op->dop.u.flow_put.flags = DPIF_FP_MODIFY;
-    op->dop.u.flow_put.key = ukey->key;
-    op->dop.u.flow_put.key_len = ukey->key_len;
-    op->dop.u.flow_put.mask = ukey->mask;
-    op->dop.u.flow_put.mask_len = ukey->mask_len;
-    op->dop.u.flow_put.ufid = ukey->ufid_present ? &ukey->ufid : NULL;
-    op->dop.u.flow_put.pmd_id = ukey->pmd_id;
-    op->dop.u.flow_put.stats = NULL;
-    ukey_get_actions(ukey, &op->dop.u.flow_put.actions,
-                     &op->dop.u.flow_put.actions_len);
+    op->dop.flow_put.flags = flags;
+    op->dop.flow_put.key = ukey->key;
+    op->dop.flow_put.key_len = ukey->key_len;
+    op->dop.flow_put.mask = ukey->mask;
+    op->dop.flow_put.mask_len = ukey->mask_len;
+    op->dop.flow_put.ufid = ukey->ufid_present ? &ukey->ufid : NULL;
+    op->dop.flow_put.pmd_id = ukey->pmd_id;
+    op->dop.flow_put.stats = NULL;
+    ukey_get_actions(ukey, &op->dop.flow_put.actions,
+                     &op->dop.flow_put.actions_len);
 }
 
 /* Executes datapath operations 'ops' and attributes stats retrieved from the
@@ -1964,13 +2361,13 @@ push_dp_ops(struct udpif *udpif, struct ukey_op *ops, size_t n_ops)
     for (i = 0; i < n_ops; i++) {
         opsp[i] = &ops[i].dop;
     }
-    dpif_operate(udpif->dpif, opsp, n_ops);
+    dpif_operate(udpif->dpif, opsp, n_ops, DPIF_OFFLOAD_AUTO);
 
     for (i = 0; i < n_ops; i++) {
         struct ukey_op *op = &ops[i];
         struct dpif_flow_stats *push, *stats, push_buf;
 
-        stats = op->dop.u.flow_del.stats;
+        stats = op->dop.flow_del.stats;
         push = &push_buf;
 
         if (op->dop.type != DPIF_OP_FLOW_DEL) {
@@ -1980,11 +2377,17 @@ push_dp_ops(struct udpif *udpif, struct ukey_op *ops, size_t n_ops)
 
         if (op->dop.error) {
             /* flow_del error, 'stats' is unusable. */
+            if (op->ukey) {
+                ovs_mutex_lock(&op->ukey->mutex);
+                transition_ukey(op->ukey, UKEY_EVICTED);
+                ovs_mutex_unlock(&op->ukey->mutex);
+            }
             continue;
         }
 
         if (op->ukey) {
             ovs_mutex_lock(&op->ukey->mutex);
+            transition_ukey(op->ukey, UKEY_EVICTED);
             push->used = MAX(stats->used, op->ukey->stats.used);
             push->tcp_flags = stats->tcp_flags | op->ukey->stats.tcp_flags;
             push->n_packets = stats->n_packets - op->ukey->stats.n_packets;
@@ -1995,18 +2398,18 @@ push_dp_ops(struct udpif *udpif, struct ukey_op *ops, size_t n_ops)
         }
 
         if (push->n_packets || netflow_exists()) {
-            const struct nlattr *key = op->dop.u.flow_del.key;
-            size_t key_len = op->dop.u.flow_del.key_len;
-            struct ofproto_dpif *ofproto;
+            const struct nlattr *key = op->dop.flow_del.key;
+            size_t key_len = op->dop.flow_del.key_len;
             struct netflow *netflow;
-            ofp_port_t ofp_in_port;
-            struct flow flow;
+            struct reval_context ctx = {
+                .netflow = &netflow,
+            };
             int error;
 
             if (op->ukey) {
                 ovs_mutex_lock(&op->ukey->mutex);
                 if (op->ukey->xcache) {
-                    xlate_push_stats(op->ukey->xcache, push);
+                    xlate_push_stats(op->ukey->xcache, push, false);
                     ovs_mutex_unlock(&op->ukey->mutex);
                     continue;
                 }
@@ -2015,24 +2418,15 @@ push_dp_ops(struct udpif *udpif, struct ukey_op *ops, size_t n_ops)
                 key_len = op->ukey->key_len;
             }
 
-            if (odp_flow_key_to_flow(key, key_len, &flow)
-                == ODP_FIT_ERROR) {
-                continue;
-            }
-
-            error = xlate_lookup(udpif->backer, &flow, &ofproto, NULL, NULL,
-                                 &netflow, &ofp_in_port);
-            if (!error) {
-                struct xlate_in xin;
-
-                xlate_in_init(&xin, ofproto, &flow, ofp_in_port, NULL,
-                              push->tcp_flags, NULL, NULL, NULL);
-                xin.resubmit_stats = push->n_packets ? push : NULL;
-                xin.may_learn = push->n_packets > 0;
-                xlate_actions_for_side_effects(&xin);
-
+            error = xlate_key(udpif, key, key_len, push, &ctx);
+            if (error) {
+                static struct vlog_rate_limit rll = VLOG_RATE_LIMIT_INIT(1, 5);
+                VLOG_WARN_RL(&rll, "xlate_key failed (%s)!",
+                             ovs_strerror(error));
+            } else {
+                xlate_out_uninit(&ctx.xout);
                 if (netflow) {
-                    netflow_flow_clear(netflow, &flow);
+                    netflow_flow_clear(netflow, &ctx.flow);
                 }
             }
         }
@@ -2060,13 +2454,15 @@ push_ukey_ops(struct udpif *udpif, struct umap *umap,
 static void
 log_unexpected_flow(const struct dpif_flow *flow, int error)
 {
-    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 60);
     struct ds ds = DS_EMPTY_INITIALIZER;
 
     ds_put_format(&ds, "Failed to acquire udpif_key corresponding to "
                   "unexpected flow (%s): ", ovs_strerror(error));
     odp_format_ufid(&flow->ufid, &ds);
-    VLOG_WARN_RL(&rl, "%s", ds_cstr(&ds));
+
+    static struct vlog_rate_limit rll = VLOG_RATE_LIMIT_INIT(10, 60);
+    VLOG_WARN_RL(&rll, "%s", ds_cstr(&ds));
+
     ds_destroy(&ds);
 }
 
@@ -2074,9 +2470,11 @@ static void
 reval_op_init(struct ukey_op *op, enum reval_result result,
               struct udpif *udpif, struct udpif_key *ukey,
               struct recirc_refs *recircs, struct ofpbuf *odp_actions)
+    OVS_REQUIRES(ukey->mutex)
 {
     if (result == UKEY_DELETE) {
         delete_op_init(udpif, op, ukey);
+        transition_ukey(ukey, UKEY_EVICTING);
     } else if (result == UKEY_MODIFY) {
         /* Store the new recircs. */
         recirc_refs_swap(&ukey->recircs, recircs);
@@ -2085,8 +2483,120 @@ reval_op_init(struct ukey_op *op, enum reval_result result,
         /* ukey->key_recirc_id remains, as the key is the same as before. */
 
         ukey_set_actions(ukey, odp_actions);
-        modify_op_init(op, ukey);
+        put_op_init(op, ukey, DPIF_FP_MODIFY);
+    }
+}
+
+static void
+ukey_netdev_unref(struct udpif_key *ukey)
+{
+    if (!ukey->in_netdev) {
+        return;
     }
+    netdev_close(ukey->in_netdev);
+    ukey->in_netdev = NULL;
+}
+
+/*
+ * Given a udpif_key, get its input port (netdev) by parsing the flow keys
+ * and actions. The flow may not contain flow attributes if it is a terse
+ * dump; read its attributes from the ukey and then parse the flow to get
+ * the port info. Save them in udpif_key.
+ */
+static void
+ukey_to_flow_netdev(struct udpif *udpif, struct udpif_key *ukey)
+{
+    const char *dpif_type_str = dpif_normalize_type(dpif_type(udpif->dpif));
+    const struct nlattr *k;
+    unsigned int left;
+
+    /* Remove existing references to netdev */
+    ukey_netdev_unref(ukey);
+
+    /* Find the input port and get a reference to its netdev */
+    NL_ATTR_FOR_EACH (k, left, ukey->key, ukey->key_len) {
+        enum ovs_key_attr type = nl_attr_type(k);
+
+        if (type == OVS_KEY_ATTR_IN_PORT) {
+            ukey->in_netdev = netdev_ports_get(nl_attr_get_odp_port(k),
+                                               dpif_type_str);
+        } else if (type == OVS_KEY_ATTR_TUNNEL) {
+            struct flow_tnl tnl;
+            enum odp_key_fitness res;
+
+            if (ukey->in_netdev) {
+                netdev_close(ukey->in_netdev);
+                ukey->in_netdev = NULL;
+            }
+            res = odp_tun_key_from_attr(k, &tnl, NULL);
+            if (res != ODP_FIT_ERROR) {
+                ukey->in_netdev = flow_get_tunnel_netdev(&tnl);
+                break;
+            }
+        }
+    }
+}
+
+static uint64_t
+udpif_flow_packet_delta(struct udpif_key *ukey, const struct dpif_flow *f)
+{
+    return f->stats.n_packets + ukey->flow_backlog_packets -
+                ukey->flow_packets;
+}
+
+static long long int
+udpif_flow_time_delta(struct udpif *udpif, struct udpif_key *ukey)
+{
+    return (udpif->dpif->current_ms - ukey->flow_time) / 1000;
+}
+
+/*
+ * Save backlog packet count while switching modes
+ * between offloaded and kernel datapaths.
+ */
+static void
+udpif_set_ukey_backlog_packets(struct udpif_key *ukey)
+{
+    ukey->flow_backlog_packets = ukey->flow_packets;
+}
+
+/* Gather pps-rate for the given dpif_flow and save it in its ukey */
+static void
+udpif_update_flow_pps(struct udpif *udpif, struct udpif_key *ukey,
+                      const struct dpif_flow *f)
+{
+    uint64_t pps;
+
+    /* Update pps-rate only when we are close to rebalance interval */
+    if (udpif->dpif->current_ms - ukey->flow_time < OFFL_REBAL_INTVL_MSEC) {
+        return;
+    }
+
+    ukey->offloaded = f->attrs.offloaded;
+    pps = udpif_flow_packet_delta(ukey, f) /
+                    udpif_flow_time_delta(udpif, ukey);
+    ukey->flow_pps_rate = pps;
+    ukey->flow_packets = ukey->flow_backlog_packets + f->stats.n_packets;
+    ukey->flow_time = udpif->dpif->current_ms;
+}
+
+static long long int
+udpif_update_used(struct udpif *udpif, struct udpif_key *ukey,
+                  struct dpif_flow_stats *stats)
+    OVS_REQUIRES(ukey->mutex)
+{
+    if (!udpif->dump->terse) {
+        return ukey->created;
+    }
+
+    if (stats->n_packets > ukey->stats.n_packets) {
+        stats->used = udpif->dpif->current_ms;
+    } else if (ukey->stats.used) {
+        stats->used = ukey->stats.used;
+    } else {
+        stats->used = ukey->created;
+    }
+    return stats->used;
 }
 
 static void
@@ -2098,6 +2608,7 @@ revalidate(struct revalidator *revalidator)
     struct udpif *udpif = revalidator->udpif;
     struct dpif_flow_dump_thread *dump_thread;
     uint64_t dump_seq, reval_seq;
+    bool kill_warn_print = true;
     unsigned int flow_limit;
 
     dump_seq = seq_read(udpif->dump_seq);
@@ -2114,6 +2625,7 @@ revalidate(struct revalidator *revalidator)
 
         long long int max_idle;
         long long int now;
+        size_t kill_all_limit;
         size_t n_dp_flows;
         bool kill_them_all;
 
@@ -2137,12 +2649,34 @@ revalidate(struct revalidator *revalidator)
          *       datapath flows, so we will recover before all the flows are
          *       gone.) */
         n_dp_flows = udpif_get_n_flows(udpif);
-        kill_them_all = n_dp_flows > flow_limit * 2;
+        if (n_dp_flows >= flow_limit) {
+            COVERAGE_INC(upcall_flow_limit_hit);
+        }
+
+        kill_them_all = false;
+        kill_all_limit = flow_limit * 2;
+        if (OVS_UNLIKELY(n_dp_flows > kill_all_limit)) {
+            static struct vlog_rate_limit rlem = VLOG_RATE_LIMIT_INIT(1, 1);
+
+            kill_them_all = true;
+            COVERAGE_INC(upcall_flow_limit_kill);
+            if (kill_warn_print) {
+                kill_warn_print = false;
+                VLOG_WARN_RL(&rlem,
+                    "Number of datapath flows (%"PRIuSIZE") twice as high as "
+                    "current dynamic flow limit (%"PRIuSIZE").  "
+                    "Starting to delete flows unconditionally "
+                    "as an emergency measure.", n_dp_flows, kill_all_limit);
+            }
+        }
+
         max_idle = n_dp_flows > flow_limit ? 100 : ofproto_max_idle;
 
+        udpif->dpif->current_ms = time_msec();
         for (f = flows; f < &flows[n_dumped]; f++) {
             long long int used = f->stats.used;
             struct recirc_refs recircs = RECIRC_REFS_EMPTY_INITIALIZER;
+            struct dpif_flow_stats stats = f->stats;
             enum reval_result result;
             struct udpif_key *ukey;
             bool already_dumped;
@@ -2175,17 +2709,33 @@ revalidate(struct revalidator *revalidator)
                 continue;
             }
 
+            if (ukey->state <= UKEY_OPERATIONAL) {
+                /* The flow is now confirmed to be in the datapath. */
+                transition_ukey(ukey, UKEY_OPERATIONAL);
+            } else {
+                VLOG_INFO("Unexpected ukey transition from state %d "
+                          "(last transitioned from thread %u at %s)",
+                          ukey->state, ukey->state_thread, ukey->state_where);
+                ovs_mutex_unlock(&ukey->mutex);
+                continue;
+            }
+
             if (!used) {
-                used = ukey->created;
+                used = udpif_update_used(udpif, ukey, &stats);
             }
             if (kill_them_all || (used && used < now - max_idle)) {
                 result = UKEY_DELETE;
             } else {
-                result = revalidate_ukey(udpif, ukey, &f->stats, &odp_actions,
-                                         reval_seq, &recircs);
+                result = revalidate_ukey(udpif, ukey, &stats, &odp_actions,
+                                         reval_seq, &recircs,
+                                         f->attrs.offloaded);
             }
             ukey->dump_seq = dump_seq;
-            ukey->flow_exists = result != UKEY_DELETE;
+
+            if (netdev_is_offload_rebalance_policy_enabled() &&
+                result != UKEY_DELETE) {
+                udpif_update_flow_pps(udpif, ukey, f);
+            }
 
             if (result != UKEY_KEEP) {
                 /* Takes ownership of 'recircs'. */
@@ -2239,15 +2789,16 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge)
         size_t n_ops = 0;
 
         CMAP_FOR_EACH(ukey, cmap_node, &umap->cmap) {
-            bool flow_exists;
+            enum ukey_state ukey_state;
 
             /* Handler threads could be holding a ukey lock while it installs a
              * new flow, so don't hang around waiting for access to it. */
             if (ovs_mutex_trylock(&ukey->mutex)) {
                 continue;
             }
-            flow_exists = ukey->flow_exists;
-            if (flow_exists) {
+            ukey_state = ukey->state;
+            if (ukey_state == UKEY_OPERATIONAL
+                || (ukey_state == UKEY_VISIBLE && purge)) {
                 struct recirc_refs recircs = RECIRC_REFS_EMPTY_INITIALIZER;
                 bool seq_mismatch = (ukey->dump_seq != dump_seq
                                      && ukey->reval_seq != reval_seq);
@@ -2262,7 +2813,7 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge)
                     COVERAGE_INC(revalidate_missed_dp_flow);
                     memset(&stats, 0, sizeof stats);
                     result = revalidate_ukey(udpif, ukey, &stats, &odp_actions,
-                                             reval_seq, &recircs);
+                                             reval_seq, &recircs, false);
                 }
                 if (result != UKEY_KEEP) {
                     /* Clears 'recircs' if filled by revalidate_ukey(). */
@@ -2272,7 +2823,7 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge)
             }
             ovs_mutex_unlock(&ukey->mutex);
 
-            if (!flow_exists) {
+            if (ukey_state == UKEY_EVICTED) {
                 /* The common flow deletion case involves deletion of the flow
                  * during the dump phase and ukey deletion here. */
                 ovs_mutex_lock(&umap->mutex);
@@ -2312,6 +2863,7 @@ revalidator_purge(struct revalidator *revalidator)
 /* In reaction to dpif purge, purges all 'ukey's with same 'pmd_id'. */
 static void
 dp_purge_cb(void *aux, unsigned pmd_id)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
 {
     struct udpif *udpif = aux;
     size_t i;
@@ -2324,8 +2876,10 @@ dp_purge_cb(void *aux, unsigned pmd_id)
         size_t n_ops = 0;
 
         CMAP_FOR_EACH(ukey, cmap_node, &umap->cmap) {
-             if (ukey->pmd_id == pmd_id) {
+            if (ukey->pmd_id == pmd_id) {
                 delete_op_init(udpif, &ops[n_ops++], ukey);
+                transition_ukey(ukey, UKEY_EVICTING);
+
                 if (n_ops == REVALIDATE_MAX_BATCH) {
                     push_ukey_ops(udpif, umap, ops, n_ops);
                     n_ops = 0;
@@ -2347,6 +2901,7 @@ upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
                     const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
 {
     struct ds ds = DS_EMPTY_INITIALIZER;
+    uint64_t n_offloaded_flows;
     struct udpif *udpif;
 
     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
@@ -2358,11 +2913,15 @@ upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
         ufid_enabled = udpif_use_ufid(udpif);
 
         ds_put_format(&ds, "%s:\n", dpif_name(udpif->dpif));
-        ds_put_format(&ds, "\tflows         : (current %lu)"
+        ds_put_format(&ds, "  flows         : (current %lu)"
             " (avg %u) (max %u) (limit %u)\n", udpif_get_n_flows(udpif),
             udpif->avg_n_flows, udpif->max_n_flows, flow_limit);
-        ds_put_format(&ds, "\tdump duration : %lldms\n", udpif->dump_duration);
-        ds_put_format(&ds, "\tufid enabled : ");
+        if (!dpif_get_n_offloaded_flows(udpif->dpif, &n_offloaded_flows)) {
+            ds_put_format(&ds, "  offloaded flows : %"PRIu64"\n",
+                          n_offloaded_flows);
+        }
+        ds_put_format(&ds, "  dump duration : %lldms\n", udpif->dump_duration);
+        ds_put_format(&ds, "  ufid enabled : ");
         if (ufid_enabled) {
             ds_put_format(&ds, "true\n");
         } else {
@@ -2377,7 +2936,7 @@ upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
             for (j = i; j < N_UMAPS; j += n_revalidators) {
                 elements += cmap_count(&udpif->ukeys[j].cmap);
             }
-            ds_put_format(&ds, "\t%u: (keys %d)\n", revalidator->id, elements);
+            ds_put_format(&ds, "  %u: (keys %d)\n", revalidator->id, elements);
         }
     }
 
@@ -2447,7 +3006,7 @@ upcall_unixctl_enable_ufid(struct unixctl_conn *conn, int argc OVS_UNUSED,
 static void
 upcall_unixctl_set_flow_limit(struct unixctl_conn *conn,
                               int argc OVS_UNUSED,
-                              const char *argv[] OVS_UNUSED,
+                              const char *argv[],
                               void *aux OVS_UNUSED)
 {
     struct ds ds = DS_EMPTY_INITIALIZER;
@@ -2497,3 +3056,342 @@ upcall_unixctl_purge(struct unixctl_conn *conn, int argc OVS_UNUSED,
     }
     unixctl_command_reply(conn, "");
 }
+
+/* Flows are sorted in the following order:
+ * netdev, flow state (offloaded/kernel path), flow_pps_rate.
+ */
+static int
+flow_compare_rebalance(const void *elem1, const void *elem2)
+{
+    const struct udpif_key *f1 = *(struct udpif_key **)elem1;
+    const struct udpif_key *f2 = *(struct udpif_key **)elem2;
+    int64_t diff;
+
+    if (f1->in_netdev < f2->in_netdev) {
+        return -1;
+    } else if (f1->in_netdev > f2->in_netdev) {
+        return 1;
+    }
+
+    if (f1->offloaded != f2->offloaded) {
+        return f2->offloaded - f1->offloaded;
+    }
+
+    diff = (f1->offloaded == true) ?
+        f1->flow_pps_rate - f2->flow_pps_rate :
+        f2->flow_pps_rate - f1->flow_pps_rate;
+
+    return (diff < 0) ? -1 : 1;
+}
+
+/* Insert flows from pending array during rebalancing */
+static int
+rebalance_insert_pending(struct udpif *udpif, struct udpif_key **pending_flows,
+                         int pending_count, int insert_count,
+                         uint64_t rate_threshold)
+{
+    int count = 0;
+
+    for (int i = 0; i < pending_count; i++) {
+        struct udpif_key *flow = pending_flows[i];
+        int err;
+
+        /* Stop offloading pending flows if the insert count is
+         * reached and the flow rate is less than the threshold
+         */
+        if (count >= insert_count && flow->flow_pps_rate < rate_threshold) {
+                break;
+        }
+
+        /* Offload the flow to netdev */
+        err = udpif_flow_program(udpif, flow, DPIF_OFFLOAD_ALWAYS);
+
+        if (err == ENOSPC) {
+            /* Stop if we are out of resources */
+            break;
+        }
+
+        if (err) {
+            continue;
+        }
+
+        /* Offload succeeded; delete it from the kernel datapath */
+        udpif_flow_unprogram(udpif, flow, DPIF_OFFLOAD_NEVER);
+
+        /* Change the state of the flow, adjust dpif counters */
+        flow->offloaded = true;
+
+        udpif_set_ukey_backlog_packets(flow);
+        count++;
+    }
+
+    return count;
+}
+
+/* Remove flows from offloaded array during rebalancing */
+static void
+rebalance_remove_offloaded(struct udpif *udpif,
+                           struct udpif_key **offloaded_flows,
+                           int offload_count)
+{
+    for (int i = 0; i < offload_count; i++) {
+        struct udpif_key *flow = offloaded_flows[i];
+        int err;
+
+        /* Install the flow into kernel path first */
+        err = udpif_flow_program(udpif, flow, DPIF_OFFLOAD_NEVER);
+        if (err) {
+            continue;
+        }
+
+        /* Success; now remove offloaded flow from netdev */
+        err = udpif_flow_unprogram(udpif, flow, DPIF_OFFLOAD_ALWAYS);
+        if (err) {
+            udpif_flow_unprogram(udpif, flow, DPIF_OFFLOAD_NEVER);
+            continue;
+        }
+        udpif_set_ukey_backlog_packets(flow);
+        flow->offloaded = false;
+    }
+}
+
+/*
+ * Rebalance offloaded flows on a netdev that's in OOR state.
+ *
+ * The rebalancing is done in two phases. In the first phase, we check if
+ * the pending flows can be offloaded (if some resources became available
+ * in the meantime) by trying to offload each pending flow. If all pending
+ * flows get successfully offloaded, the OOR state is cleared on the netdev
+ * and there's nothing to rebalance.
+ *
+ * If some of the pending flows could not be offloaded, i.e, we still see
+ * the OOR error, then we move to the second phase of rebalancing. In this
+ * phase, the rebalancer compares pps-rate of an offloaded flow with the
+ * least pps-rate with that of a pending flow with the highest pps-rate from
+ * their respective sorted arrays. If pps-rate of the offloaded flow is less
+ * than the pps-rate of the pending flow, then it deletes the offloaded flow
+ * from the HW/netdev and adds it to kernel datapath and then offloads pending
+ * to HW/netdev. This process is repeated for every pair of offloaded and
+ * pending flows in the ordered list. The process stops when we encounter an
+ * offloaded flow that has a higher pps-rate than the corresponding pending
+ * flow. The entire rebalancing process is repeated in the next iteration.
+ */
+static bool
+rebalance_device(struct udpif *udpif, struct udpif_key **offloaded_flows,
+                 int offload_count, struct udpif_key **pending_flows,
+                 int pending_count)
+{
+
+    /* Phase 1 */
+    int num_inserted = rebalance_insert_pending(udpif, pending_flows,
+                                                pending_count, pending_count,
+                                                0);
+    if (num_inserted) {
+        VLOG_DBG("Offload rebalance: Phase1: inserted %d pending flows",
+                  num_inserted);
+    }
+
+    /* Adjust pending array */
+    pending_flows = &pending_flows[num_inserted];
+    pending_count -= num_inserted;
+
+    if (!pending_count) {
+        /*
+         * Successfully offloaded all pending flows. The device
+         * is no longer in OOR state; done rebalancing this device.
+         */
+        return false;
+    }
+
+    /*
+     * Phase 2; determine how many offloaded flows to churn.
+     */
+#define        OFFL_REBAL_MAX_CHURN    1024
+    int churn_count = 0;
+    while (churn_count < OFFL_REBAL_MAX_CHURN && churn_count < offload_count
+           && churn_count < pending_count) {
+        if (pending_flows[churn_count]->flow_pps_rate <=
+            offloaded_flows[churn_count]->flow_pps_rate)
+                break;
+        churn_count++;
+    }
+
+    if (churn_count) {
+        VLOG_DBG("Offload rebalance: Phase2: removing %d offloaded flows",
+                  churn_count);
+    }
+
+    /* Bail early if nothing to churn */
+    if (!churn_count) {
+        return true;
+    }
+
+    /* Remove offloaded flows */
+    rebalance_remove_offloaded(udpif, offloaded_flows, churn_count);
+
+    /* Adjust offloaded array */
+    offloaded_flows = &offloaded_flows[churn_count];
+    offload_count -= churn_count;
+
+    /* Replace offloaded flows with pending flows */
+    num_inserted = rebalance_insert_pending(udpif, pending_flows,
+                                            pending_count, churn_count,
+                                            offload_count ?
+                                            offloaded_flows[0]->flow_pps_rate :
+                                            0);
+    if (num_inserted) {
+        VLOG_DBG("Offload rebalance: Phase2: inserted %d pending flows",
+                  num_inserted);
+    }
+
+    return true;
+}
+
+static struct udpif_key **
+udpif_add_oor_flows(struct udpif_key **sort_flows, size_t *total_flow_count,
+                    size_t *alloc_flow_count, struct udpif_key *ukey)
+{
+    if (*total_flow_count >= *alloc_flow_count) {
+        sort_flows = x2nrealloc(sort_flows, alloc_flow_count, sizeof ukey);
+    }
+    sort_flows[(*total_flow_count)++] = ukey;
+    return sort_flows;
+}
+
+/*
+ * Build sort_flows[] initially with flows that
+ * reference an 'OOR' netdev as their input port.
+ */
+static struct udpif_key **
+udpif_build_oor_flows(struct udpif_key **sort_flows, size_t *total_flow_count,
+                      size_t *alloc_flow_count, struct udpif_key *ukey,
+                      int *oor_netdev_count)
+{
+    struct netdev *netdev;
+    int count;
+
+    /* Input netdev must be available for the flow */
+    netdev = ukey->in_netdev;
+    if (!netdev) {
+        return sort_flows;
+    }
+
+    /* Is the in-netdev for this flow in OOR state ? */
+    if (!netdev_get_hw_info(netdev, HW_INFO_TYPE_OOR)) {
+        ukey_netdev_unref(ukey);
+        return sort_flows;
+    }
+
+    /* Add the flow to sort_flows[] */
+    sort_flows = udpif_add_oor_flows(sort_flows, total_flow_count,
+                                      alloc_flow_count, ukey);
+    if (ukey->offloaded) {
+        count = netdev_get_hw_info(netdev, HW_INFO_TYPE_OFFL_COUNT);
+        ovs_assert(count >= 0);
+        if (count++ == 0) {
+            (*oor_netdev_count)++;
+        }
+        netdev_set_hw_info(netdev, HW_INFO_TYPE_OFFL_COUNT, count);
+    } else {
+        count = netdev_get_hw_info(netdev, HW_INFO_TYPE_PEND_COUNT);
+        ovs_assert(count >= 0);
+        netdev_set_hw_info(netdev, HW_INFO_TYPE_PEND_COUNT, ++count);
+    }
+
+    return sort_flows;
+}
+
+/*
+ * Rebalance offloaded flows on HW netdevs that are in OOR state.
+ */
+static void
+udpif_flow_rebalance(struct udpif *udpif)
+{
+    struct udpif_key **sort_flows = NULL;
+    size_t alloc_flow_count = 0;
+    size_t total_flow_count = 0;
+    int oor_netdev_count = 0;
+    int offload_index = 0;
+    int pending_index;
+
+    /* Collect flows (offloaded and pending) that reference OOR netdevs */
+    for (size_t i = 0; i < N_UMAPS; i++) {
+        struct udpif_key *ukey;
+        struct umap *umap = &udpif->ukeys[i];
+
+        CMAP_FOR_EACH (ukey, cmap_node, &umap->cmap) {
+            ukey_to_flow_netdev(udpif, ukey);
+            sort_flows = udpif_build_oor_flows(sort_flows, &total_flow_count,
+                                               &alloc_flow_count, ukey,
+                                               &oor_netdev_count);
+        }
+    }
+
+    /* Sort flows by OOR netdevs, state (offloaded/pending) and pps-rate  */
+    qsort(sort_flows, total_flow_count, sizeof(struct udpif_key *),
+          flow_compare_rebalance);
+
+    /*
+     * We now have flows referencing OOR netdevs, that are sorted. We also
+     * have a count of offloaded and pending flows on each of the netdevs
+     * that are in OOR state. Now rebalance each oor-netdev.
+     */
+    while (oor_netdev_count) {
+        struct netdev *netdev;
+        int offload_count;
+        int pending_count;
+        bool oor;
+
+        netdev = sort_flows[offload_index]->in_netdev;
+        ovs_assert(netdev_get_hw_info(netdev, HW_INFO_TYPE_OOR) == true);
+        VLOG_DBG("Offload rebalance: netdev: %s is OOR", netdev->name);
+
+        offload_count = netdev_get_hw_info(netdev, HW_INFO_TYPE_OFFL_COUNT);
+        pending_count = netdev_get_hw_info(netdev, HW_INFO_TYPE_PEND_COUNT);
+        pending_index = offload_index + offload_count;
+
+        oor = rebalance_device(udpif,
+                               &sort_flows[offload_index], offload_count,
+                               &sort_flows[pending_index], pending_count);
+        netdev_set_hw_info(netdev, HW_INFO_TYPE_OOR, oor);
+
+        offload_index = pending_index + pending_count;
+        netdev_set_hw_info(netdev, HW_INFO_TYPE_OFFL_COUNT, 0);
+        netdev_set_hw_info(netdev, HW_INFO_TYPE_PEND_COUNT, 0);
+        oor_netdev_count--;
+    }
+
+    for (int i = 0; i < total_flow_count; i++) {
+        struct udpif_key *ukey = sort_flows[i];
+        ukey_netdev_unref(ukey);
+    }
+    free(sort_flows);
+}
+
+static int
+udpif_flow_program(struct udpif *udpif, struct udpif_key *ukey,
+                   enum dpif_offload_type offload_type)
+{
+    struct dpif_op *opsp;
+    struct ukey_op uop;
+
+    opsp = &uop.dop;
+    put_op_init(&uop, ukey, DPIF_FP_CREATE);
+    dpif_operate(udpif->dpif, &opsp, 1, offload_type);
+
+    return opsp->error;
+}
+
+static int
+udpif_flow_unprogram(struct udpif *udpif, struct udpif_key *ukey,
+                     enum dpif_offload_type offload_type)
+{
+    struct dpif_op *opsp;
+    struct ukey_op uop;
+
+    opsp = &uop.dop;
+    delete_op_init(udpif, &uop, ukey);
+    dpif_operate(udpif->dpif, &opsp, 1, offload_type);
+
+    return opsp->error;
+}