]> git.proxmox.com Git - mirror_ovs.git/blobdiff - lib/ovsdb-idl.c
ovsdb-idl: Use modern form of <monitor-requests>.
[mirror_ovs.git] / lib / ovsdb-idl.c
index 2a109a948e92d68ce6ff5172477d489fe08b6748..642ce66f9e0ff2fac7f2dbd1890a106bde93b3e2 100644 (file)
@@ -1,4 +1,5 @@
-/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
+/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017 Nicira, Inc.
+ * Copyright (C) 2016 Hewlett Packard Enterprise Development LP
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 #include "bitmap.h"
 #include "coverage.h"
-#include "dynamic-string.h"
+#include "hash.h"
+#include "openvswitch/dynamic-string.h"
 #include "fatal-signal.h"
-#include "json.h"
+#include "openvswitch/json.h"
 #include "jsonrpc.h"
 #include "ovsdb/ovsdb.h"
 #include "ovsdb/table.h"
 #include "ovsdb-error.h"
 #include "ovsdb-idl-provider.h"
 #include "ovsdb-parser.h"
-#include "poll-loop.h"
-#include "shash.h"
+#include "openvswitch/poll-loop.h"
+#include "openvswitch/shash.h"
+#include "skiplist.h"
 #include "sset.h"
 #include "util.h"
+#include "uuid.h"
 #include "openvswitch/vlog.h"
 
 VLOG_DEFINE_THIS_MODULE(ovsdb_idl);
@@ -75,28 +79,62 @@ struct ovsdb_idl_arc {
     struct ovsdb_idl_row *dst;  /* Destination row. */
 };
 
+/* Connection state machine.
+ *
+ * When a JSON-RPC session connects, the IDL sends a "get_schema" request and
+ * transitions to IDL_S_SCHEMA_REQUESTED.  If the session drops and reconnects,
+ * the IDL starts over again in the same way. */
 enum ovsdb_idl_state {
+    /* Waits for "get_schema" reply, then sends a "monitor_cond" request whose
+     * details are informed by the schema and transitions to
+     * IDL_S_MONITOR_COND_REQUESTED. */
     IDL_S_SCHEMA_REQUESTED,
+
+    /* Waits for "monitor_cond" reply:
+     *
+     *    - If the reply indicates success, replaces the IDL contents by the
+     *      data carried in the reply and transitions to IDL_S_MONITORING_COND.
+     *
+     *    - If the reply indicates failure because the database is too old to
+     *      support monitor_cond, sends a "monitor" request and transitions to
+     *      IDl_S_MONITOR_REQUESTED.  */
+    IDL_S_MONITOR_COND_REQUESTED,
+
+    /* Waits for "monitor" reply, then replaces the IDL contents by the data
+     * carried in the reply and transitions to IDL_S_MONITORING.  */
     IDL_S_MONITOR_REQUESTED,
+
+    /* Terminal states that process "update2" (IDL_S_MONITORING_COND) or
+     * "update" (IDL_S_MONITORING) notifications. */
+    IDL_S_MONITORING_COND,
     IDL_S_MONITORING,
-    IDL_S_MONITOR2_REQUESTED,
-    IDL_S_MONITORING2,
+
+    /* Terminal error state that indicates that nothing useful can be done.
+     * The most likely reason is that the database server doesn't have the
+     * desired database.  We maintain the session with the database server
+     * anyway.  If it starts serving the database that we want, then it will
+     * kill the session and we will automatically reconnect and try again. */
     IDL_S_NO_SCHEMA
 };
 
 struct ovsdb_idl {
-    const struct ovsdb_idl_class *class;
-    struct jsonrpc_session *session;
-    struct shash table_by_name;
-    struct ovsdb_idl_table *tables; /* Contains "struct ovsdb_idl_table *"s.*/
+    /* Data. */
+    const struct ovsdb_idl_class *class_;
+    struct shash table_by_name; /* Contains "struct ovsdb_idl_table *"s.*/
+    struct ovsdb_idl_table *tables; /* Array of ->class_->n_tables elements. */
     unsigned int change_seqno;
-    bool verify_write_only;
 
-    /* Session state. */
-    unsigned int state_seqno;
-    enum ovsdb_idl_state state;
-    struct json *request_id;
-    struct json *schema;
+    /* Session state.
+     *
+     *'state_seqno' is a snapshot of the session's sequence number as returned
+     * jsonrpc_session_get_seqno(session), so if it differs from the value that
+     * function currently returns then the session has reconnected and the
+     * state machine must restart.  */
+    struct jsonrpc_session *session; /* Connection to the server. */
+    enum ovsdb_idl_state state;      /* Current session state. */
+    unsigned int state_seqno;        /* See above. */
+    struct json *request_id;         /* JSON ID for request awaiting reply. */
+    struct json *schema;             /* Temporary copy of database schema. */
 
     /* Database locking. */
     char *lock_name;            /* Name of lock we need, NULL if none. */
@@ -107,6 +145,14 @@ struct ovsdb_idl {
     /* Transaction support. */
     struct ovsdb_idl_txn *txn;
     struct hmap outstanding_txns;
+    bool verify_write_only;
+
+    /* Conditional monitoring. */
+    bool cond_changed;
+    unsigned int cond_seqno;   /* Keep track of condition clauses changes
+                                  over a single conditional monitoring session.
+                                  Reverts to zero when idl session
+                                  reconnects.  */
 };
 
 struct ovsdb_idl_txn {
@@ -123,6 +169,7 @@ struct ovsdb_idl_txn {
     const char *inc_table;
     const char *inc_column;
     struct uuid inc_row;
+    bool inc_force;
     unsigned int inc_index;
     int64_t inc_new_value;
 
@@ -150,11 +197,12 @@ static const char *row_update_names[] = {"row_update", "row_update2"};
 
 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
+static struct vlog_rate_limit other_rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
 static void ovsdb_idl_clear(struct ovsdb_idl *);
 static void ovsdb_idl_send_schema_request(struct ovsdb_idl *);
 static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
-static void ovsdb_idl_send_monitor2_request(struct ovsdb_idl *);
+static void ovsdb_idl_send_monitor_cond_request(struct ovsdb_idl *);
 static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *,
                                    enum ovsdb_update_version);
 static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
@@ -181,6 +229,8 @@ static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
                                                   const struct uuid *);
 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
 static void ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *);
+static void ovsdb_idl_destroy_all_map_op_lists(struct ovsdb_idl_row *);
+static void ovsdb_idl_destroy_all_set_op_lists(struct ovsdb_idl_row *);
 
 static void ovsdb_idl_row_parse(struct ovsdb_idl_row *);
 static void ovsdb_idl_row_unparse(struct ovsdb_idl_row *);
@@ -191,6 +241,16 @@ static void ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *, bool destroy_dsts);
 static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
 static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
                                         const struct jsonrpc_msg *msg);
+static bool ovsdb_idl_txn_extract_mutations(struct ovsdb_idl_row *,
+                                            struct json *);
+static void ovsdb_idl_txn_add_map_op(struct ovsdb_idl_row *,
+                                     const struct ovsdb_idl_column *,
+                                     struct ovsdb_datum *,
+                                     enum map_op_type);
+static void ovsdb_idl_txn_add_set_op(struct ovsdb_idl_row *,
+                                     const struct ovsdb_idl_column *,
+                                     struct ovsdb_datum *,
+                                     enum set_op_type);
 
 static void ovsdb_idl_send_lock_request(struct ovsdb_idl *);
 static void ovsdb_idl_send_unlock_request(struct ovsdb_idl *);
@@ -203,6 +263,15 @@ static struct ovsdb_idl_table *
 ovsdb_idl_table_from_class(const struct ovsdb_idl *,
                            const struct ovsdb_idl_table_class *);
 static bool ovsdb_idl_track_is_set(struct ovsdb_idl_table *table);
+static void ovsdb_idl_send_cond_change(struct ovsdb_idl *idl);
+
+static struct ovsdb_idl_index *ovsdb_idl_create_index_(const struct
+                                                       ovsdb_idl_table *table,
+                                                       size_t allocated_cols);
+static void
+ ovsdb_idl_destroy_indexes(struct ovsdb_idl_table *table);
+static void ovsdb_idl_add_to_indexes(const struct ovsdb_idl_row *);
+static void ovsdb_idl_remove_from_indexes(const struct ovsdb_idl_row *);
 
 /* Creates and returns a connection to database 'remote', which should be in a
  * form acceptable to jsonrpc_session_open().  The connection will maintain an
@@ -235,7 +304,7 @@ ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class,
                     : 0);
 
     idl = xzalloc(sizeof *idl);
-    idl->class = class;
+    idl->class_ = class;
     idl->session = jsonrpc_session_open(remote, retry);
     shash_init(&idl->table_by_name);
     idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
@@ -245,24 +314,30 @@ ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class,
         size_t j;
 
         shash_add_assert(&idl->table_by_name, tc->name, table);
-        table->class = tc;
+        table->class_ = tc;
         table->modes = xmalloc(tc->n_columns);
         memset(table->modes, default_mode, tc->n_columns);
         table->need_table = false;
         shash_init(&table->columns);
+        shash_init(&table->indexes);
         for (j = 0; j < tc->n_columns; j++) {
             const struct ovsdb_idl_column *column = &tc->columns[j];
 
             shash_add_assert(&table->columns, column->name, column);
         }
         hmap_init(&table->rows);
-        list_init(&table->track_list);
+        ovs_list_init(&table->track_list);
         table->change_seqno[OVSDB_IDL_CHANGE_INSERT]
             = table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]
             = table->change_seqno[OVSDB_IDL_CHANGE_DELETE] = 0;
         table->idl = idl;
+        ovsdb_idl_condition_init(&table->condition);
+        ovsdb_idl_condition_add_clause_true(&table->condition);
+        table->cond_changed = false;
     }
 
+    idl->cond_changed = false;
+    idl->cond_seqno = 0;
     idl->state_seqno = UINT_MAX;
     idl->request_id = NULL;
     idl->schema = NULL;
@@ -272,6 +347,19 @@ ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class,
     return idl;
 }
 
+/* Changes the remote and creates a new session. */
+void
+ovsdb_idl_set_remote(struct ovsdb_idl *idl, const char *remote,
+                     bool retry)
+{
+    if (idl) {
+        ovs_assert(!idl->txn);
+        jsonrpc_session_close(idl->session);
+        idl->session = jsonrpc_session_open(remote, retry);
+        idl->state_seqno = UINT_MAX;
+    }
+}
+
 /* Destroys 'idl' and all of the data structures that it manages. */
 void
 ovsdb_idl_destroy(struct ovsdb_idl *idl)
@@ -283,8 +371,10 @@ ovsdb_idl_destroy(struct ovsdb_idl *idl)
         ovsdb_idl_clear(idl);
         jsonrpc_session_close(idl->session);
 
-        for (i = 0; i < idl->class->n_tables; i++) {
+        for (i = 0; i < idl->class_->n_tables; i++) {
             struct ovsdb_idl_table *table = &idl->tables[i];
+            ovsdb_idl_condition_destroy(&table->condition);
+            ovsdb_idl_destroy_indexes(table);
             shash_destroy(&table->columns);
             hmap_destroy(&table->rows);
             free(table->modes);
@@ -306,10 +396,11 @@ ovsdb_idl_clear(struct ovsdb_idl *idl)
     bool changed = false;
     size_t i;
 
-    for (i = 0; i < idl->class->n_tables; i++) {
+    for (i = 0; i < idl->class_->n_tables; i++) {
         struct ovsdb_idl_table *table = &idl->tables[i];
         struct ovsdb_idl_row *row, *next_row;
 
+        table->cond_changed = false;
         if (hmap_is_empty(&table->rows)) {
             continue;
         }
@@ -319,6 +410,7 @@ ovsdb_idl_clear(struct ovsdb_idl *idl)
             struct ovsdb_idl_arc *arc, *next_arc;
 
             if (!ovsdb_idl_row_is_orphan(row)) {
+                ovsdb_idl_remove_from_indexes(row);
                 ovsdb_idl_row_unparse(row);
             }
             LIST_FOR_EACH_SAFE (arc, next_arc, src_node, &row->src_arcs) {
@@ -327,14 +419,16 @@ ovsdb_idl_clear(struct ovsdb_idl *idl)
             /* No need to do anything with dst_arcs: some node has those arcs
              * as forward arcs and will destroy them itself. */
 
-            if (!list_is_empty(&row->track_node)) {
-                list_remove(&row->track_node);
+            if (!ovs_list_is_empty(&row->track_node)) {
+                ovs_list_remove(&row->track_node);
             }
 
             ovsdb_idl_row_destroy(row);
         }
     }
 
+    idl->cond_changed = false;
+    idl->cond_seqno = 0;
     ovsdb_idl_track_clear(idl);
 
     if (changed) {
@@ -351,6 +445,9 @@ ovsdb_idl_run(struct ovsdb_idl *idl)
     int i;
 
     ovs_assert(!idl->txn);
+
+    ovsdb_idl_send_cond_change(idl);
+
     jsonrpc_session_run(idl->session);
     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
         struct jsonrpc_msg *msg;
@@ -379,7 +476,7 @@ ovsdb_idl_run(struct ovsdb_idl *idl)
             && !strcmp(msg->method, "update2")
             && msg->params->type == JSON_ARRAY
             && msg->params->u.array.n == 2
-            && msg->params->u.array.elems[0]->type == JSON_NULL) {
+            && msg->params->u.array.elems[0]->type == JSON_STRING) {
             /* Database contents changed. */
             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
                                    OVSDB_UPDATE2);
@@ -393,20 +490,20 @@ ovsdb_idl_run(struct ovsdb_idl *idl)
             case IDL_S_SCHEMA_REQUESTED:
                 /* Reply to our "get_schema" request. */
                 idl->schema = json_clone(msg->result);
-                ovsdb_idl_send_monitor2_request(idl);
-                idl->state = IDL_S_MONITOR2_REQUESTED;
+                ovsdb_idl_send_monitor_cond_request(idl);
+                idl->state = IDL_S_MONITOR_COND_REQUESTED;
                 break;
 
             case IDL_S_MONITOR_REQUESTED:
-            case IDL_S_MONITOR2_REQUESTED:
-                /* Reply to our "monitor" or "monitor2" request. */
+            case IDL_S_MONITOR_COND_REQUESTED:
+                /* Reply to our "monitor" or "monitor_cond" request. */
                 idl->change_seqno++;
                 ovsdb_idl_clear(idl);
                 if (idl->state == IDL_S_MONITOR_REQUESTED) {
                     idl->state = IDL_S_MONITORING;
                     ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE);
-                } else { /* IDL_S_MONITOR2_REQUESTED. */
-                    idl->state = IDL_S_MONITORING2;
+                } else { /* IDL_S_MONITOR_COND_REQUESTED. */
+                    idl->state = IDL_S_MONITORING_COND;
                     ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE2);
                 }
 
@@ -416,17 +513,23 @@ ovsdb_idl_run(struct ovsdb_idl *idl)
                 idl->schema = NULL;
                 break;
 
+            case IDL_S_MONITORING_COND:
+                /* Conditional monitor clauses were updated. Send out
+                 * the next condition changes, in any, immediately. */
+                ovsdb_idl_send_cond_change(idl);
+                idl->cond_seqno++;
+                break;
+
             case IDL_S_MONITORING:
-            case IDL_S_MONITORING2:
             case IDL_S_NO_SCHEMA:
             default:
                 OVS_NOT_REACHED();
             }
         } else if (msg->type == JSONRPC_NOTIFY
-            && !strcmp(msg->method, "update")
-            && msg->params->type == JSON_ARRAY
-            && msg->params->u.array.n == 2
-            && msg->params->u.array.elems[0]->type == JSON_NULL) {
+                   && !strcmp(msg->method, "update")
+                   && msg->params->type == JSON_ARRAY
+                   && msg->params->u.array.n == 2
+                   && msg->params->u.array.elems[0]->type == JSON_STRING) {
             /* Database contents changed. */
             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
                                    OVSDB_UPDATE);
@@ -444,26 +547,35 @@ ovsdb_idl_run(struct ovsdb_idl *idl)
             /* Someone else stole our lock. */
             ovsdb_idl_parse_lock_notify(idl, msg->params, false);
         } else if (msg->type == JSONRPC_ERROR
-                   && idl->state == IDL_S_MONITOR2_REQUESTED
+                   && idl->state == IDL_S_MONITOR_COND_REQUESTED
                    && idl->request_id
                    && json_equal(idl->request_id, msg->id)) {
-            if (msg->error && !strcmp(json_string(msg->error),
-                                      "unknown method")) {
+            if (msg->error && msg->error->type == JSON_STRING
+                && !strcmp(json_string(msg->error), "unknown method")) {
                 /* Fall back to using "monitor" method.  */
                 json_destroy(idl->request_id);
                 idl->request_id = NULL;
                 ovsdb_idl_send_monitor_request(idl);
                 idl->state = IDL_S_MONITOR_REQUESTED;
             }
+        } else if (msg->type == JSONRPC_ERROR
+                   && idl->state == IDL_S_MONITORING_COND
+                   && idl->request_id
+                   && json_equal(idl->request_id, msg->id)) {
+            json_destroy(idl->request_id);
+            idl->request_id = NULL;
+            VLOG_ERR("%s: conditional monitor update failed",
+                     jsonrpc_session_get_name(idl->session));
+            idl->state = IDL_S_NO_SCHEMA;
         } else if (msg->type == JSONRPC_ERROR
                    && idl->state == IDL_S_SCHEMA_REQUESTED
                    && idl->request_id
                    && json_equal(idl->request_id, msg->id)) {
-                json_destroy(idl->request_id);
-                idl->request_id = NULL;
-                VLOG_ERR("%s: requested schema not found",
-                         jsonrpc_session_get_name(idl->session));
-                idl->state = IDL_S_NO_SCHEMA;
+            json_destroy(idl->request_id);
+            idl->request_id = NULL;
+            VLOG_ERR("%s: requested schema not found",
+                     jsonrpc_session_get_name(idl->session));
+            idl->state = IDL_S_NO_SCHEMA;
         } else if ((msg->type == JSONRPC_ERROR
                     || msg->type == JSONRPC_REPLY)
                    && ovsdb_idl_txn_process_reply(idl, msg)) {
@@ -513,6 +625,26 @@ ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
     return idl->change_seqno;
 }
 
+/* Returns a "sequence number" that represents the number of conditional
+ * monitoring updates successfully received by the OVSDB server of an IDL
+ * connection.
+ *
+ * ovsdb_idl_set_condition() sets a new condition that is different from
+ * the current condtion, the next expected "sequence number" is returned.
+ *
+ * Whenever ovsdb_idl_get_cond_seqno() returns a value that matches
+ * the return value of ovsdb_idl_set_condition(),  The client is
+ * assured that:
+ *   -  The ovsdb_idl_set_condition() changes has been acknowledged by
+ *      the OVSDB sever.
+ *
+ *   -  'idl' now contains the content matches the new conditions.   */
+unsigned int
+ovsdb_idl_get_condition_seqno(const struct ovsdb_idl *idl)
+{
+    return idl->cond_seqno;
+}
+
 /* Returns true if 'idl' successfully connected to the remote database and
  * retrieved its contents (even if the connection subsequently dropped and is
  * in the process of reconnecting).  If so, then 'idl' contains an atomic
@@ -582,27 +714,180 @@ ovsdb_idl_get_last_error(const struct ovsdb_idl *idl)
         return 0;
     }
 }
-\f
-static unsigned char *
-ovsdb_idl_get_mode(struct ovsdb_idl *idl,
-                   const struct ovsdb_idl_column *column)
+
+/* Sets the "probe interval" for 'idl->session' to 'probe_interval', in
+ * milliseconds.
+ */
+void
+ovsdb_idl_set_probe_interval(const struct ovsdb_idl *idl, int probe_interval)
 {
-    size_t i;
+    jsonrpc_session_set_probe_interval(idl->session, probe_interval);
+}
 
-    ovs_assert(!idl->change_seqno);
+static size_t
+find_uuid_in_array(const struct uuid *target,
+                   const struct uuid *array, size_t n)
+{
+    for (size_t i = 0; i < n; i++) {
+        if (uuid_equals(&array[i], target)) {
+            return i;
+        }
+    }
+    return SIZE_MAX;
+}
+
+static size_t
+array_contains_uuid(const struct uuid *target,
+                    const struct uuid *array, size_t n)
+{
+    return find_uuid_in_array(target, array, n) != SIZE_MAX;
+}
+
+static bool
+remove_uuid_from_array(const struct uuid *target,
+                       struct uuid *array, size_t *n)
+{
+    size_t i = find_uuid_in_array(target, array, *n);
+    if (i != SIZE_MAX) {
+        array[i] = array[--*n];
+        return true;
+    } else {
+        return false;
+    }
+}
+
+static void
+add_row_references(const struct ovsdb_base_type *type,
+                   const union ovsdb_atom *atoms, size_t n_atoms,
+                   const struct uuid *exclude_uuid,
+                   struct uuid **dstsp, size_t *n_dstsp,
+                   size_t *allocated_dstsp)
+{
+    if (type->type != OVSDB_TYPE_UUID || !type->u.uuid.refTableName) {
+        return;
+    }
+
+    for (size_t i = 0; i < n_atoms; i++) {
+        const struct uuid *uuid = &atoms[i].uuid;
+        if (!uuid_equals(uuid, exclude_uuid)
+            && !array_contains_uuid(uuid, *dstsp, *n_dstsp)) {
+            if (*n_dstsp >= *allocated_dstsp) {
+                *dstsp = x2nrealloc(*dstsp, allocated_dstsp,
+                                    sizeof **dstsp);
+
+            }
+            (*dstsp)[*n_dstsp] = *uuid;
+            ++*n_dstsp;
+        }
+    }
+}
+
+/* Checks for consistency in 'idl''s graph of arcs between database rows.  Each
+ * reference from one row to a different row should be reflected as a "struct
+ * ovsdb_idl_arc" between those rows.
+ *
+ * This function is slow, big-O wise, and aborts if it finds an inconsistency,
+ * thus it is only for use in test programs. */
+void
+ovsdb_idl_check_consistency(const struct ovsdb_idl *idl)
+{
+    /* Consistency is broken while a transaction is in progress. */
+    if (!idl->txn) {
+        return;
+    }
+
+    bool ok = true;
+
+    struct uuid *dsts = NULL;
+    size_t allocated_dsts = 0;
 
-    for (i = 0; i < idl->class->n_tables; i++) {
+    for (size_t i = 0; i < idl->class_->n_tables; i++) {
         const struct ovsdb_idl_table *table = &idl->tables[i];
-        const struct ovsdb_idl_table_class *tc = table->class;
+        const struct ovsdb_idl_table_class *class = table->class_;
+
+        const struct ovsdb_idl_row *row;
+        HMAP_FOR_EACH (row, hmap_node, &table->rows) {
+            size_t n_dsts = 0;
+            if (row->new_datum) {
+                size_t n_columns = shash_count(&row->table->columns);
+                for (size_t j = 0; j < n_columns; j++) {
+                    const struct ovsdb_type *type = &class->columns[j].type;
+                    const struct ovsdb_datum *datum = &row->new_datum[j];
+                    add_row_references(&type->key,
+                                       datum->keys, datum->n, &row->uuid,
+                                       &dsts, &n_dsts, &allocated_dsts);
+                    add_row_references(&type->value,
+                                       datum->values, datum->n, &row->uuid,
+                                       &dsts, &n_dsts, &allocated_dsts);
+                }
+            }
+            const struct ovsdb_idl_arc *arc;
+            LIST_FOR_EACH (arc, src_node, &row->src_arcs) {
+                if (!remove_uuid_from_array(&arc->dst->uuid,
+                                            dsts, &n_dsts)) {
+                    VLOG_ERR("unexpected arc from %s row "UUID_FMT" to %s "
+                             "row "UUID_FMT,
+                             table->class_->name,
+                             UUID_ARGS(&row->uuid),
+                             arc->dst->table->class_->name,
+                             UUID_ARGS(&arc->dst->uuid));
+                    ok = false;
+                }
+            }
+            for (size_t j = 0; j < n_dsts; j++) {
+                VLOG_ERR("%s row "UUID_FMT" missing arc to row "UUID_FMT,
+                         table->class_->name, UUID_ARGS(&row->uuid),
+                         UUID_ARGS(&dsts[j]));
+                ok = false;
+            }
+        }
+    }
+    free(dsts);
+    ovs_assert(ok);
+}
+\f
+const struct ovsdb_idl_class *
+ovsdb_idl_get_class(const struct ovsdb_idl *idl)
+{
+    return idl->class_;
+}
 
+/* Given 'column' in some table in 'class', returns the table's class. */
+const struct ovsdb_idl_table_class *
+ovsdb_idl_table_class_from_column(const struct ovsdb_idl_class *class,
+                                  const struct ovsdb_idl_column *column)
+{
+    for (size_t i = 0; i < class->n_tables; i++) {
+        const struct ovsdb_idl_table_class *tc = &class->tables[i];
         if (column >= tc->columns && column < &tc->columns[tc->n_columns]) {
-            return &table->modes[column - tc->columns];
+            return tc;
         }
     }
 
     OVS_NOT_REACHED();
 }
 
+/* Given 'column' in some table in 'idl', returns the table. */
+static struct ovsdb_idl_table *
+ovsdb_idl_table_from_column(struct ovsdb_idl *idl,
+                            const struct ovsdb_idl_column *column)
+{
+    const struct ovsdb_idl_table_class *tc =
+        ovsdb_idl_table_class_from_column(idl->class_, column);
+    return &idl->tables[tc - idl->class_->tables];
+}
+
+static unsigned char *
+ovsdb_idl_get_mode(struct ovsdb_idl *idl,
+                   const struct ovsdb_idl_column *column)
+{
+    ovs_assert(!idl->change_seqno);
+
+    const struct ovsdb_idl_table *table = ovsdb_idl_table_from_column(idl,
+                                                                      column);
+    return &table->modes[column - table->class_->columns];
+}
+
 static void
 add_ref_table(struct ovsdb_idl *idl, const struct ovsdb_base_type *base)
 {
@@ -615,7 +900,7 @@ add_ref_table(struct ovsdb_idl *idl, const struct ovsdb_base_type *base)
             table->need_table = true;
         } else {
             VLOG_WARN("%s IDL class missing referenced table %s",
-                      idl->class->database, base->u.uuid.refTableName);
+                      idl->class_->database, base->u.uuid.refTableName);
         }
     }
 }
@@ -657,10 +942,10 @@ ovsdb_idl_add_table(struct ovsdb_idl *idl,
 {
     size_t i;
 
-    for (i = 0; i < idl->class->n_tables; i++) {
+    for (i = 0; i < idl->class_->n_tables; i++) {
         struct ovsdb_idl_table *table = &idl->tables[i];
 
-        if (table->class == tc) {
+        if (table->class_ == tc) {
             table->need_table = true;
             return;
         }
@@ -668,6 +953,304 @@ ovsdb_idl_add_table(struct ovsdb_idl *idl,
 
     OVS_NOT_REACHED();
 }
+\f
+/* A single clause within an ovsdb_idl_condition. */
+struct ovsdb_idl_clause {
+    struct hmap_node hmap_node;   /* In struct ovsdb_idl_condition. */
+    enum ovsdb_function function; /* Never OVSDB_F_TRUE or OVSDB_F_FALSE. */
+    const struct ovsdb_idl_column *column; /* Must be nonnull. */
+    struct ovsdb_datum arg;       /* Has ovsdb_type ->column->type. */
+};
+
+static uint32_t
+ovsdb_idl_clause_hash(const struct ovsdb_idl_clause *clause)
+{
+    uint32_t hash = hash_pointer(clause->column, clause->function);
+    return ovsdb_datum_hash(&clause->arg, &clause->column->type, hash);
+}
+
+static int
+ovsdb_idl_clause_equals(const struct ovsdb_idl_clause *a,
+                        const struct ovsdb_idl_clause *b)
+{
+    return (a->function == b->function
+            && a->column == b->column
+            && ovsdb_datum_equals(&a->arg, &b->arg, &a->column->type));
+}
+
+static struct json *
+ovsdb_idl_clause_to_json(const struct ovsdb_idl_clause *clause)
+{
+    const char *function = ovsdb_function_to_string(clause->function);
+    return json_array_create_3(json_string_create(clause->column->name),
+                               json_string_create(function),
+                               ovsdb_datum_to_json(&clause->arg,
+                                                   &clause->column->type));
+}
+
+static void
+ovsdb_idl_clause_destroy(struct ovsdb_idl_clause *clause)
+{
+    if (clause) {
+        ovsdb_datum_destroy(&clause->arg, &clause->column->type);
+        free(clause);
+    }
+}
+\f
+/* ovsdb_idl_condition. */
+
+void
+ovsdb_idl_condition_init(struct ovsdb_idl_condition *cnd)
+{
+    hmap_init(&cnd->clauses);
+    cnd->is_true = false;
+}
+
+void
+ovsdb_idl_condition_destroy(struct ovsdb_idl_condition *cond)
+{
+    if (cond) {
+        ovsdb_idl_condition_clear(cond);
+        hmap_destroy(&cond->clauses);
+    }
+}
+
+void
+ovsdb_idl_condition_clear(struct ovsdb_idl_condition *cond)
+{
+    struct ovsdb_idl_clause *clause, *next;
+    HMAP_FOR_EACH_SAFE (clause, next, hmap_node, &cond->clauses) {
+        hmap_remove(&cond->clauses, &clause->hmap_node);
+        ovsdb_idl_clause_destroy(clause);
+    }
+    cond->is_true = false;
+}
+
+bool
+ovsdb_idl_condition_is_true(const struct ovsdb_idl_condition *condition)
+{
+    return condition->is_true;
+}
+
+static struct ovsdb_idl_clause *
+ovsdb_idl_condition_find_clause(const struct ovsdb_idl_condition *condition,
+                                const struct ovsdb_idl_clause *target,
+                                uint32_t hash)
+{
+    struct ovsdb_idl_clause *clause;
+    HMAP_FOR_EACH_WITH_HASH (clause, hmap_node, hash, &condition->clauses) {
+        if (ovsdb_idl_clause_equals(clause, target)) {
+            return clause;
+        }
+    }
+    return NULL;
+}
+
+static void
+ovsdb_idl_condition_add_clause__(struct ovsdb_idl_condition *condition,
+                                 const struct ovsdb_idl_clause *src,
+                                 uint32_t hash)
+{
+    struct ovsdb_idl_clause *clause = xmalloc(sizeof *clause);
+    clause->function = src->function;
+    clause->column = src->column;
+    ovsdb_datum_clone(&clause->arg, &src->arg, &src->column->type);
+    hmap_insert(&condition->clauses, &clause->hmap_node, hash);
+}
+
+/* Adds a clause to the condition for replicating the table with class 'tc' in
+ * 'idl'.
+ *
+ * The IDL replicates only rows in a table that satisfy at least one clause in
+ * the table's condition.  The default condition for a table has a single
+ * clause with function OVSDB_F_TRUE, so that the IDL replicates all rows in
+ * the table.  When the IDL client replaces the default condition by one of its
+ * own, the condition can have any number of clauses.  If it has no conditions,
+ * then no rows are replicated.
+ *
+ * Two distinct of clauses can usefully be added:
+ *
+ *    - A 'function' of OVSDB_F_TRUE.  A "true" clause causes every row to be
+ *      replicated, regardless of whether other clauses exist.  'column' and
+ *      'arg' are ignored.
+ *
+ *    - Binary 'functions' add a clause of the form "<column> <function>
+ *      <arg>", e.g. "column == 5" or "column <= 10".  In this case, 'arg' must
+ *      have a type that is compatible with 'column'.
+ */
+void
+ovsdb_idl_condition_add_clause(struct ovsdb_idl_condition *condition,
+                               enum ovsdb_function function,
+                               const struct ovsdb_idl_column *column,
+                               const struct ovsdb_datum *arg)
+{
+    if (condition->is_true) {
+        /* Adding a clause to an always-true condition has no effect.  */
+    } else if (function == OVSDB_F_TRUE) {
+        ovsdb_idl_condition_add_clause_true(condition);
+    } else if (function == OVSDB_F_FALSE) {
+        /* Adding a "false" clause never has any effect. */
+    } else {
+        struct ovsdb_idl_clause clause = {
+            .function = function,
+            .column = column,
+            .arg = *arg,
+        };
+        uint32_t hash = ovsdb_idl_clause_hash(&clause);
+        if (!ovsdb_idl_condition_find_clause(condition, &clause, hash)) {
+            ovsdb_idl_condition_add_clause__(condition, &clause, hash);
+        }
+    }
+}
+
+void
+ovsdb_idl_condition_add_clause_true(struct ovsdb_idl_condition *condition)
+{
+    if (!condition->is_true) {
+        ovsdb_idl_condition_clear(condition);
+        condition->is_true = true;
+    }
+}
+
+static bool
+ovsdb_idl_condition_equals(const struct ovsdb_idl_condition *a,
+                           const struct ovsdb_idl_condition *b)
+{
+    if (hmap_count(&a->clauses) != hmap_count(&b->clauses)) {
+        return false;
+    }
+    if (a->is_true != b->is_true) {
+        return false;
+    }
+
+    const struct ovsdb_idl_clause *clause;
+    HMAP_FOR_EACH (clause, hmap_node, &a->clauses) {
+        if (!ovsdb_idl_condition_find_clause(b, clause,
+                                             clause->hmap_node.hash)) {
+            return false;
+        }
+    }
+    return true;
+}
+
+static void
+ovsdb_idl_condition_clone(struct ovsdb_idl_condition *dst,
+                          const struct ovsdb_idl_condition *src)
+{
+    ovsdb_idl_condition_init(dst);
+
+    dst->is_true = src->is_true;
+
+    const struct ovsdb_idl_clause *clause;
+    HMAP_FOR_EACH (clause, hmap_node, &src->clauses) {
+        ovsdb_idl_condition_add_clause__(dst, clause, clause->hmap_node.hash);
+    }
+}
+
+/* Sets the replication condition for 'tc' in 'idl' to 'condition' and
+ * arranges to send the new condition to the database server.
+ *
+ * Return the next conditional update sequence number.  When this
+ * value and ovsdb_idl_get_condition_seqno() matches, the 'idl'
+ * contains rows that match the 'condition'. */
+unsigned int
+ovsdb_idl_set_condition(struct ovsdb_idl *idl,
+                        const struct ovsdb_idl_table_class *tc,
+                        const struct ovsdb_idl_condition *condition)
+{
+    struct ovsdb_idl_table *table = ovsdb_idl_table_from_class(idl, tc);
+    unsigned int seqno = idl->cond_seqno;
+    if (!ovsdb_idl_condition_equals(condition, &table->condition)) {
+        ovsdb_idl_condition_destroy(&table->condition);
+        ovsdb_idl_condition_clone(&table->condition, condition);
+        idl->cond_changed = table->cond_changed = true;
+        poll_immediate_wake();
+        return seqno + 1;
+    }
+
+    return seqno;
+}
+
+static struct json *
+ovsdb_idl_condition_to_json(const struct ovsdb_idl_condition *cnd)
+{
+    if (cnd->is_true) {
+        return json_array_create_empty();
+    }
+
+    size_t n = hmap_count(&cnd->clauses);
+    if (!n) {
+        return json_array_create_1(json_boolean_create(false));
+    }
+
+    struct json **clauses = xmalloc(n * sizeof *clauses);
+    const struct ovsdb_idl_clause *clause;
+    size_t i = 0;
+    HMAP_FOR_EACH (clause, hmap_node, &cnd->clauses) {
+        clauses[i++] = ovsdb_idl_clause_to_json(clause);
+    }
+    ovs_assert(i == n);
+    return json_array_create(clauses, n);
+}
+\f
+static struct json *
+ovsdb_idl_create_cond_change_req(struct ovsdb_idl_table *table)
+{
+    const struct ovsdb_idl_condition *cond = &table->condition;
+    struct json *monitor_cond_change_request = json_object_create();
+    struct json *cond_json = ovsdb_idl_condition_to_json(cond);
+
+    json_object_put(monitor_cond_change_request, "where", cond_json);
+
+    return monitor_cond_change_request;
+}
+
+static void
+ovsdb_idl_send_cond_change(struct ovsdb_idl *idl)
+{
+    int i;
+    struct json *params;
+    struct jsonrpc_msg *request;
+
+    /* When 'idl-request_id' is not NULL, there is an outstanding
+     * conditional monitoring update request that we have not heard
+     * from the server yet. Don't generate another request in this case.  */
+    if (!idl->cond_changed || !jsonrpc_session_is_connected(idl->session) ||
+        idl->state != IDL_S_MONITORING_COND || idl->request_id) {
+        return;
+    }
+
+    struct json *monitor_cond_change_requests = NULL;
+
+    for (i = 0; i < idl->class_->n_tables; i++) {
+        struct ovsdb_idl_table *table = &idl->tables[i];
+
+        if (table->cond_changed) {
+            struct json *req = ovsdb_idl_create_cond_change_req(table);
+            if (req) {
+                if (!monitor_cond_change_requests) {
+                    monitor_cond_change_requests = json_object_create();
+                }
+                json_object_put(monitor_cond_change_requests,
+                             table->class_->name,
+                             json_array_create_1(req));
+            }
+            table->cond_changed = false;
+        }
+    }
+
+    /* Send request if not empty. */
+    if (monitor_cond_change_requests) {
+        params = json_array_create_3(json_string_create("monid"),
+                                     json_string_create("monid"),
+                                     monitor_cond_change_requests);
+
+        request = jsonrpc_create_request("monitor_cond_change", params,
+                                         &idl->request_id);
+        jsonrpc_session_send(idl->session, request);
+    }
+    idl->cond_changed = false;
+}
 
 /* Turns off OVSDB_IDL_ALERT for 'column' in 'idl'.
  *
@@ -749,8 +1332,8 @@ ovsdb_idl_track_add_all(struct ovsdb_idl *idl)
 {
     size_t i, j;
 
-    for (i = 0; i < idl->class->n_tables; i++) {
-        const struct ovsdb_idl_table_class *tc = &idl->class->tables[i];
+    for (i = 0; i < idl->class_->n_tables; i++) {
+        const struct ovsdb_idl_table_class *tc = &idl->class_->tables[i];
 
         for (j = 0; j < tc->n_columns; j++) {
             const struct ovsdb_idl_column *column = &tc->columns[j];
@@ -765,7 +1348,7 @@ ovsdb_idl_track_is_set(struct ovsdb_idl_table *table)
 {
     size_t i;
 
-    for (i = 0; i < table->class->n_columns; i++) {
+    for (i = 0; i < table->class_->n_columns; i++) {
         if (table->modes[i] & OVSDB_IDL_TRACK) {
             return true;
         }
@@ -782,8 +1365,8 @@ ovsdb_idl_track_get_first(const struct ovsdb_idl *idl,
     struct ovsdb_idl_table *table
         = ovsdb_idl_table_from_class(idl, table_class);
 
-    if (!list_is_empty(&table->track_list)) {
-        return CONTAINER_OF(list_front(&table->track_list), struct ovsdb_idl_row, track_node);
+    if (!ovs_list_is_empty(&table->track_list)) {
+        return CONTAINER_OF(ovs_list_front(&table->track_list), struct ovsdb_idl_row, track_node);
     }
     return NULL;
 }
@@ -813,7 +1396,7 @@ ovsdb_idl_track_is_updated(const struct ovsdb_idl_row *row,
     const struct ovsdb_idl_table_class *class;
     size_t column_idx;
 
-    class = row->table->class;
+    class = row->table->class_;
     column_idx = column - class->columns;
 
     if (row->updated && bitmap_is_set(row->updated, column_idx)) {
@@ -833,10 +1416,10 @@ ovsdb_idl_track_clear(const struct ovsdb_idl *idl)
 {
     size_t i;
 
-    for (i = 0; i < idl->class->n_tables; i++) {
+    for (i = 0; i < idl->class_->n_tables; i++) {
         struct ovsdb_idl_table *table = &idl->tables[i];
 
-        if (!list_is_empty(&table->track_list)) {
+        if (!ovs_list_is_empty(&table->track_list)) {
             struct ovsdb_idl_row *row, *next;
 
             LIST_FOR_EACH_SAFE(row, next, track_node, &table->track_list) {
@@ -844,8 +1427,8 @@ ovsdb_idl_track_clear(const struct ovsdb_idl *idl)
                     free(row->updated);
                     row->updated = NULL;
                 }
-                list_remove(&row->track_node);
-                list_init(&row->track_node);
+                ovs_list_remove(&row->track_node);
+                ovs_list_init(&row->track_node);
                 if (ovsdb_idl_row_is_orphan(row)) {
                     ovsdb_idl_row_clear_old(row);
                     free(row);
@@ -864,7 +1447,7 @@ ovsdb_idl_send_schema_request(struct ovsdb_idl *idl)
     json_destroy(idl->request_id);
     msg = jsonrpc_create_request(
         "get_schema",
-        json_array_create_1(json_string_create(idl->class->database)),
+        json_array_create_1(json_string_create(idl->class_->database)),
         &idl->request_id);
     jsonrpc_session_send(idl->session, msg);
 }
@@ -872,10 +1455,9 @@ ovsdb_idl_send_schema_request(struct ovsdb_idl *idl)
 static void
 log_error(struct ovsdb_error *error)
 {
-    char *s = ovsdb_error_to_string(error);
+    char *s = ovsdb_error_to_string_free(error);
     VLOG_WARN("error parsing database schema: %s", s);
     free(s);
-    ovsdb_error_destroy(error);
 }
 
 /* Frees 'schema', which is in the format returned by parse_schema(). */
@@ -964,26 +1546,33 @@ ovsdb_idl_send_monitor_request__(struct ovsdb_idl *idl,
 
     schema = parse_schema(idl->schema);
     monitor_requests = json_object_create();
-    for (i = 0; i < idl->class->n_tables; i++) {
-        const struct ovsdb_idl_table *table = &idl->tables[i];
-        const struct ovsdb_idl_table_class *tc = table->class;
-        struct json *monitor_request, *columns;
+    for (i = 0; i < idl->class_->n_tables; i++) {
+        struct ovsdb_idl_table *table = &idl->tables[i];
+        const struct ovsdb_idl_table_class *tc = table->class_;
+        struct json *monitor_request, *columns, *where;
         const struct sset *table_schema;
         size_t j;
 
         table_schema = (schema
-                        ? shash_find_data(schema, table->class->name)
+                        ? shash_find_data(schema, table->class_->name)
                         : NULL);
 
         columns = table->need_table ? json_array_create_empty() : NULL;
         for (j = 0; j < tc->n_columns; j++) {
             const struct ovsdb_idl_column *column = &tc->columns[j];
-            if (table->modes[j] & OVSDB_IDL_MONITOR) {
-                if (table_schema
-                    && !sset_contains(table_schema, column->name)) {
+            bool db_has_column = (table_schema &&
+                                  sset_contains(table_schema, column->name));
+            if (column->is_synthetic) {
+                if (db_has_column) {
+                    VLOG_WARN("%s table in %s database has synthetic "
+                              "column %s", table->class_->name,
+                              idl->class_->database, column->name);
+                }
+            } else if (table->modes[j] & OVSDB_IDL_MONITOR) {
+                if (table_schema && !db_has_column) {
                     VLOG_WARN("%s table in %s database lacks %s column "
                               "(database needs upgrade?)",
-                              table->class->name, idl->class->database,
+                              table->class_->name, idl->class_->database,
                               column->name);
                     continue;
                 }
@@ -998,25 +1587,34 @@ ovsdb_idl_send_monitor_request__(struct ovsdb_idl *idl,
             if (schema && !table_schema) {
                 VLOG_WARN("%s database lacks %s table "
                           "(database needs upgrade?)",
-                          idl->class->database, table->class->name);
+                          idl->class_->database, table->class_->name);
                 json_destroy(columns);
                 continue;
             }
 
             monitor_request = json_object_create();
             json_object_put(monitor_request, "columns", columns);
-            json_object_put(monitor_requests, tc->name, monitor_request);
+            if (!strcmp(method, "monitor_cond")
+                && !ovsdb_idl_condition_is_true(&table->condition)) {
+                where = ovsdb_idl_condition_to_json(&table->condition);
+                json_object_put(monitor_request, "where", where);
+                table->cond_changed = false;
+            }
+            json_object_put(monitor_requests, tc->name,
+                            json_array_create_1(monitor_request));
         }
     }
     free_schema(schema);
 
     json_destroy(idl->request_id);
+
     msg = jsonrpc_create_request(
         method,
-        json_array_create_3(json_string_create(idl->class->database),
-                            json_null_create(), monitor_requests),
+        json_array_create_3(json_string_create(idl->class_->database),
+                            json_string_create("monid"), monitor_requests),
         &idl->request_id);
     jsonrpc_session_send(idl->session, msg);
+    idl->cond_changed = false;
 }
 
 static void
@@ -1028,18 +1626,18 @@ ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
 static void
 log_parse_update_error(struct ovsdb_error *error)
 {
-        if (!VLOG_DROP_WARN(&syntax_rl)) {
-            char *s = ovsdb_error_to_string(error);
-            VLOG_WARN_RL(&syntax_rl, "%s", s);
-            free(s);
-        }
-        ovsdb_error_destroy(error);
+    if (!VLOG_DROP_WARN(&syntax_rl)) {
+        char *s = ovsdb_error_to_string(error);
+        VLOG_WARN_RL(&syntax_rl, "%s", s);
+        free(s);
+    }
+    ovsdb_error_destroy(error);
 }
 
 static void
-ovsdb_idl_send_monitor2_request(struct ovsdb_idl *idl)
+ovsdb_idl_send_monitor_cond_request(struct ovsdb_idl *idl)
 {
-    ovsdb_idl_send_monitor_request__(idl, "monitor2");
+    ovsdb_idl_send_monitor_request__(idl, "monitor_cond");
 }
 
 static void
@@ -1088,7 +1686,7 @@ ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
                                       "<%s> for table \"%s\" is "
                                       "not an object",
                                       table_update_name,
-                                      table->class->name);
+                                      table->class_->name);
         }
         SHASH_FOR_EACH (table_node, json_object(table_update)) {
             const struct json *row_update = table_node->data;
@@ -1101,7 +1699,7 @@ ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
                                           "contains bad UUID "
                                           "\"%s\" as member name",
                                           table_update_name,
-                                          table->class->name,
+                                          table->class_->name,
                                           table_node->name);
             }
             if (row_update->type != JSON_OBJECT) {
@@ -1110,7 +1708,7 @@ ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
                                           "contains <%s> for %s that "
                                           "is not an object",
                                           table_update_name,
-                                          table->class->name,
+                                          table->class_->name,
                                           row_update_name,
                                           table_node->name);
             }
@@ -1210,7 +1808,7 @@ ovsdb_idl_process_update(struct ovsdb_idl_table *table,
         } else {
             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
                          "from table %s",
-                         UUID_ARGS(uuid), table->class->name);
+                         UUID_ARGS(uuid), table->class_->name);
             return false;
         }
     } else if (!old) {
@@ -1221,7 +1819,7 @@ ovsdb_idl_process_update(struct ovsdb_idl_table *table,
             ovsdb_idl_insert_row(row, new);
         } else {
             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
-                         "table %s", UUID_ARGS(uuid), table->class->name);
+                         "table %s", UUID_ARGS(uuid), table->class_->name);
             return ovsdb_idl_modify_row(row, new);
         }
     } else {
@@ -1233,12 +1831,12 @@ ovsdb_idl_process_update(struct ovsdb_idl_table *table,
             } else {
                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
                              "referenced row "UUID_FMT" in table %s",
-                             UUID_ARGS(uuid), table->class->name);
+                             UUID_ARGS(uuid), table->class_->name);
                 ovsdb_idl_insert_row(row, new);
             }
         } else {
             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
-                         "in table %s", UUID_ARGS(uuid), table->class->name);
+                         "in table %s", UUID_ARGS(uuid), table->class_->name);
             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
         }
     }
@@ -1264,7 +1862,7 @@ ovsdb_idl_process_update2(struct ovsdb_idl_table *table,
         } else {
             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
                          "from table %s",
-                         UUID_ARGS(uuid), table->class->name);
+                         UUID_ARGS(uuid), table->class_->name);
             return false;
         }
     } else if (!strcmp(operation, "insert") || !strcmp(operation, "initial")) {
@@ -1275,7 +1873,7 @@ ovsdb_idl_process_update2(struct ovsdb_idl_table *table,
             ovsdb_idl_insert_row(row, json_row);
         } else {
             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
-                         "table %s", UUID_ARGS(uuid), table->class->name);
+                         "table %s", UUID_ARGS(uuid), table->class_->name);
             ovsdb_idl_delete_row(row);
             ovsdb_idl_insert_row(row, json_row);
         }
@@ -1287,18 +1885,18 @@ ovsdb_idl_process_update2(struct ovsdb_idl_table *table,
             } else {
                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
                              "referenced row "UUID_FMT" in table %s",
-                             UUID_ARGS(uuid), table->class->name);
+                             UUID_ARGS(uuid), table->class_->name);
                 return false;
             }
         } else {
             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
-                         "in table %s", UUID_ARGS(uuid), table->class->name);
+                         "in table %s", UUID_ARGS(uuid), table->class_->name);
             return false;
         }
     } else {
-            VLOG_WARN_RL(&semantic_rl, "unknown operation %s to "
-                         "table %s", operation, table->class->name);
-            return false;
+        VLOG_WARN_RL(&semantic_rl, "unknown operation %s to "
+                     "table %s", operation, table->class_->name);
+        return false;
     }
 
     return true;
@@ -1316,7 +1914,7 @@ ovsdb_idl_row_change__(struct ovsdb_idl_row *row, const struct json *row_json,
                        enum ovsdb_idl_change change)
 {
     struct ovsdb_idl_table *table = row->table;
-    const struct ovsdb_idl_table_class *class = table->class;
+    const struct ovsdb_idl_table_class *class = table->class_;
     struct shash_node *node;
     bool changed = false;
     bool apply_diff = diff_json != NULL;
@@ -1337,8 +1935,8 @@ ovsdb_idl_row_change__(struct ovsdb_idl_row *row, const struct json *row_json,
             continue;
         }
 
-        column_idx = column - table->class->columns;
-        old = &row->old[column_idx];
+        column_idx = column - table->class_->columns;
+        old = &row->old_datum[column_idx];
 
         error = NULL;
         if (apply_diff) {
@@ -1367,10 +1965,11 @@ ovsdb_idl_row_change__(struct ovsdb_idl_row *row, const struct json *row_json,
                         = row->table->change_seqno[change]
                         = row->table->idl->change_seqno + 1;
                     if (table->modes[column_idx] & OVSDB_IDL_TRACK) {
-                        if (list_is_empty(&row->track_node)) {
-                            list_push_front(&row->table->track_list,
-                                            &row->track_node);
+                        if (!ovs_list_is_empty(&row->track_node)) {
+                            ovs_list_remove(&row->track_node);
                         }
+                        ovs_list_push_back(&row->table->track_list,
+                                       &row->track_node);
                         if (!row->updated) {
                             row->updated = bitmap_allocate(class->n_columns);
                         }
@@ -1384,12 +1983,11 @@ ovsdb_idl_row_change__(struct ovsdb_idl_row *row, const struct json *row_json,
 
             ovsdb_datum_destroy(&datum, &column->type);
         } else {
-            char *s = ovsdb_error_to_string(error);
+            char *s = ovsdb_error_to_string_free(error);
             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
                          " in table %s: %s", column_name,
-                         UUID_ARGS(&row->uuid), table->class->name, s);
+                         UUID_ARGS(&row->uuid), table->class_->name, s);
             free(s);
-            ovsdb_error_destroy(error);
         }
     }
     return changed;
@@ -1422,89 +2020,494 @@ ovsdb_idl_row_apply_diff(struct ovsdb_idl_row *row,
 static bool
 ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *row)
 {
-    return !row->old && !row->new;
+    return !row->old_datum && !row->new_datum;
+}
+
+/* Returns true if 'row' is conceptually part of the database as modified by
+ * the current transaction (if any), false otherwise.
+ *
+ * This function will return true if 'row' is not an orphan (see the comment on
+ * ovsdb_idl_row_is_orphan()) and:
+ *
+ *   - 'row' exists in the database and has not been deleted within the
+ *     current transaction (if any).
+ *
+ *   - 'row' was inserted within the current transaction and has not been
+ *     deleted.  (In the latter case you should not have passed 'row' in at
+ *     all, because ovsdb_idl_txn_delete() freed it.)
+ *
+ * This function will return false if 'row' is an orphan or if 'row' was
+ * deleted within the current transaction.
+ */
+static bool
+ovsdb_idl_row_exists(const struct ovsdb_idl_row *row)
+{
+    return row->new_datum != NULL;
+}
+
+static void
+ovsdb_idl_row_parse(struct ovsdb_idl_row *row)
+{
+    const struct ovsdb_idl_table_class *class = row->table->class_;
+    size_t i;
+
+    for (i = 0; i < class->n_columns; i++) {
+        const struct ovsdb_idl_column *c = &class->columns[i];
+        (c->parse)(row, &row->old_datum[i]);
+    }
+}
+
+static void
+ovsdb_idl_row_unparse(struct ovsdb_idl_row *row)
+{
+    const struct ovsdb_idl_table_class *class = row->table->class_;
+    size_t i;
+
+    for (i = 0; i < class->n_columns; i++) {
+        const struct ovsdb_idl_column *c = &class->columns[i];
+        (c->unparse)(row);
+    }
+}
+\f
+/* The OVSDB-IDL Compound Indexes feature allows for the creation of custom
+ * table indexes over one or more columns in the IDL. These indexes provide
+ * the ability to retrieve rows matching a particular search criteria and to
+ * iterate over a subset of rows in a defined order.
+ */
+
+/* Creates a new index with the provided name, attached to the given idl and
+ * table. Note that all indexes must be created and indexing columns added
+ * before the first call to ovsdb_idl_run() is made.
+ */
+struct ovsdb_idl_index *
+ovsdb_idl_create_index(struct ovsdb_idl *idl,
+                       const struct ovsdb_idl_table_class *tc,
+                       const char *index_name)
+{
+    struct ovsdb_idl_index *index;
+    size_t i;
+
+    for (i = 0; i < idl->class_->n_tables; i++) {
+        struct ovsdb_idl_table *table = &idl->tables[i];
+
+        if (table->class_ == tc) {
+            index = ovsdb_idl_create_index_(table, 1);
+            if (!shash_add_once(&table->indexes, index_name, index)) {
+                VLOG_ERR("Duplicate index name '%s' in table %s",
+                         index_name, table->class_->name);
+                return NULL;
+            }
+            index->index_name = index_name;
+            return index;
+        }
+    }
+    OVS_NOT_REACHED();
+    return NULL;
+}
+
+/* Generic comparator that can compare each index, using the custom
+ * configuration (an struct ovsdb_idl_index) passed to it.
+ * Not intended for direct usage.
+ */
+static int
+ovsdb_idl_index_generic_comparer(const void *a,
+                                 const void *b, const void *conf)
+{
+    const struct ovsdb_idl_column *column;
+    const struct ovsdb_idl_index *index;
+    size_t i;
+
+    index = CONST_CAST(struct ovsdb_idl_index *, conf);
+
+    if (a == b) {
+        return 0;
+    }
+
+    for (i = 0; i < index->n_columns; i++) {
+        int val;
+        if (index->columns[i].comparer) {
+            val = index->columns[i].comparer(a, b);
+        } else {
+            column = index->columns[i].column;
+            const struct ovsdb_idl_row *row_a, *row_b;
+            row_a = CONST_CAST(struct ovsdb_idl_row *, a);
+            row_b = CONST_CAST(struct ovsdb_idl_row *, b);
+            const struct ovsdb_datum *datum_a, *datum_b;
+            datum_a = ovsdb_idl_read(row_a, column);
+            datum_b = ovsdb_idl_read(row_b, column);
+            val = ovsdb_datum_compare_3way(datum_a, datum_b, &column->type);
+        }
+
+        if (val) {
+            return val * index->columns[i].sorting_order;
+        }
+    }
+
+    /* If ins_del is true then a row is being inserted into or deleted from
+     * the index list. In this case, we augment the search key with
+     * additional values (row UUID and memory address) to create a unique
+     * search key in order to locate the correct entry efficiently and to
+     * ensure that the correct entry is deleted in the case of a "delete"
+     * operation.
+     */
+    if (index->ins_del) {
+        const struct ovsdb_idl_row *row_a, *row_b;
+
+        row_a = (const struct ovsdb_idl_row *) a;
+        row_b = (const struct ovsdb_idl_row *) b;
+        int value = uuid_compare_3way(&row_a->uuid, &row_b->uuid);
+
+        return value ? value : (a < b) - (a > b);
+    } else {
+        return 0;
+    }
+}
+
+static struct ovsdb_idl_index *
+ovsdb_idl_create_index_(const struct ovsdb_idl_table *table,
+                        size_t allocated_cols)
+{
+    struct ovsdb_idl_index *index;
+
+    index = xmalloc(sizeof (struct ovsdb_idl_index));
+    index->n_columns = 0;
+    index->alloc_columns = allocated_cols;
+    index->skiplist = skiplist_create(ovsdb_idl_index_generic_comparer, index);
+    index->columns = xmalloc(allocated_cols *
+                             sizeof (struct ovsdb_idl_index_column));
+    index->ins_del = false;
+    index->table = table;
+    return index;
+}
+
+static void
+ovsdb_idl_destroy_indexes(struct ovsdb_idl_table *table)
+{
+    struct ovsdb_idl_index *index;
+    struct shash_node *node;
+
+    SHASH_FOR_EACH (node, &(table->indexes)) {
+        index = node->data;
+        skiplist_destroy(index->skiplist, NULL);
+        free(index->columns);
+    }
+    shash_destroy_free_data(&table->indexes);
+}
+
+static void
+ovsdb_idl_add_to_indexes(const struct ovsdb_idl_row *row)
+{
+    struct ovsdb_idl_table *table = row->table;
+    struct ovsdb_idl_index *index;
+    struct shash_node *node;
+
+    SHASH_FOR_EACH (node, &(table->indexes)) {
+        index = node->data;
+        index->ins_del = true;
+        skiplist_insert(index->skiplist, row);
+        index->ins_del = false;
+    }
+}
+
+static void
+ovsdb_idl_remove_from_indexes(const struct ovsdb_idl_row *row)
+{
+    struct ovsdb_idl_table *table = row->table;
+    struct ovsdb_idl_index *index;
+    struct shash_node *node;
+
+    SHASH_FOR_EACH (node, &(table->indexes)) {
+        index = node->data;
+        index->ins_del = true;
+        skiplist_delete(index->skiplist, row);
+        index->ins_del = false;
+    }
+}
+
+/* Adds a column to an existing index (note that columns can only be added to
+ * an index before the first call to ovsdb_idl_run()). The 'order' parameter
+ * specifies whether the sort order should be ascending (OVSDB_INDEX_ASC) or
+ * descending (OVSDB_INDEX_DESC). The 'custom_comparer' parameter, if non-NULL,
+ * contains a pointer to a custom comparison function. A default comparison
+ * function is used if a custom comparison function is not provided (the
+ * default comparison function can only be used for columns of type string,
+ * uuid, integer, real, or boolean).
+ */
+void
+ovsdb_idl_index_add_column(struct ovsdb_idl_index *index,
+                           const struct ovsdb_idl_column *column,
+                           int order, column_comparator *custom_comparer)
+{
+    /* Check that the column or table is tracked */
+    if (!index->table->need_table &&
+        !((OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT) &
+          *ovsdb_idl_get_mode(index->table->idl, column))) {
+        VLOG_ERR("Can't add unmonitored column '%s' at index '%s' in "
+                 "table '%s'.",
+                 column->name, index->index_name, index->table->class_->name);
+    }
+    if (!ovsdb_type_is_scalar(&column->type) && !custom_comparer) {
+        VLOG_WARN("Comparing non-scalar values.");
+    }
+
+    /* Allocate more memory for column configuration */
+    if (index->n_columns == index->alloc_columns) {
+        index->alloc_columns++;
+        index->columns = xrealloc(index->columns,
+                                  index->alloc_columns *
+                                  sizeof(struct ovsdb_idl_index_column));
+    }
+
+    /* Append column to index */
+    int i = index->n_columns;
+
+    index->columns[i].column = column;
+    index->columns[i].comparer = custom_comparer ? custom_comparer : NULL;
+    if (order == OVSDB_INDEX_ASC) {
+        index->columns[i].sorting_order = OVSDB_INDEX_ASC;
+    } else {
+        index->columns[i].sorting_order = OVSDB_INDEX_DESC;
+    }
+    index->n_columns++;
+}
+
+bool
+ovsdb_idl_initialize_cursor(struct ovsdb_idl *idl,
+                            const struct ovsdb_idl_table_class *tc,
+                            const char *index_name,
+                            struct ovsdb_idl_index_cursor *cursor)
+{
+    size_t i;
+
+    for (i = 0; i < idl->class_->n_tables; i++) {
+        struct ovsdb_idl_table *table = &idl->tables[i];
+
+        if (table->class_ == tc) {
+            struct shash_node *node = shash_find(&table->indexes, index_name);
+
+            if (!node || !node->data) {
+                VLOG_ERR("Cursor initialization failed, "
+                         "index %s at table %s does not exist.",
+                         index_name, tc->name);
+                cursor->index = NULL;
+                cursor->position = NULL;
+                return false;
+            }
+            cursor->index = node->data;
+            cursor->position = skiplist_first(cursor->index->skiplist);
+            return true;
+        }
+    }
+    VLOG_ERR("Cursor initialization failed, "
+             "index %s at table %s does not exist.", index_name, tc->name);
+    return false;
+}
+
+/* ovsdb_idl_index_write_ writes a datum in an ovsdb_idl_row,
+ * and updates the corresponding field in the table record.
+ * Not intended for direct usage.
+ */
+void
+ovsdb_idl_index_write_(struct ovsdb_idl_row *const_row,
+                       const struct ovsdb_idl_column *column,
+                       struct ovsdb_datum *datum,
+                       const struct ovsdb_idl_table_class *class)
+{
+    struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, const_row);
+    size_t column_idx = column - class->columns;
+
+    if (bitmap_is_set(row->written, column_idx)) {
+        free(row->new_datum[column_idx].values);
+        free(row->new_datum[column_idx].keys);
+    } else {
+        bitmap_set1(row->written, column_idx);
+     }
+    row->new_datum[column_idx] = *datum;
+    (column->unparse)(row);
+    (column->parse)(row, &row->new_datum[column_idx]);
+}
+
+/* Magic UUID for index rows */
+static const struct uuid index_row_uuid = {
+        .parts = {0xdeadbeef,
+                  0xdeadbeef,
+                  0xdeadbeef,
+                  0xdeadbeef}};
+
+/* Check if a row is an index row */
+static bool
+is_index_row(struct ovsdb_idl_row *row)
+{
+    return uuid_equals(&row->uuid, &index_row_uuid);
+}
+
+/* Initializes a row for use in an indexed query.
+ * Not intended for direct usage.
+ */
+struct ovsdb_idl_row *
+ovsdb_idl_index_init_row(struct ovsdb_idl * idl,
+                         const struct ovsdb_idl_table_class *class)
+{
+    struct ovsdb_idl_row *row = xzalloc(class->allocation_size);
+    class->row_init(row);
+    row->uuid = index_row_uuid;
+    row->new_datum = xmalloc(class->n_columns * sizeof *row->new_datum);
+    row->written = bitmap_allocate(class->n_columns);
+    row->table = ovsdb_idl_table_from_class(idl, class);
+    /* arcs are not used for index row, but it doesn't harm to initialize */
+    ovs_list_init(&row->src_arcs);
+    ovs_list_init(&row->dst_arcs);
+    return row;
+}
+
+/* Destroys 'row_' and frees all associated memory. This function is intended
+ * to be used indirectly through one of the "index_destroy_row" functions
+ * generated by ovsdb-idlc.
+ */
+void
+ovsdb_idl_index_destroy_row__(const struct ovsdb_idl_row *row_)
+{
+    struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
+    const struct ovsdb_idl_table_class *class = row->table->class_;
+    const struct ovsdb_idl_column *c;
+    size_t i;
+
+    ovs_assert(ovs_list_is_empty(&row_->src_arcs));
+    ovs_assert(ovs_list_is_empty(&row_->dst_arcs));
+    BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) {
+        c = &class->columns[i];
+        (c->unparse) (row);
+        free(row->new_datum[i].values);
+        free(row->new_datum[i].keys);
+    }
+    free(row->new_datum);
+    free(row->written);
+    free(row);
+}
+
+/* Moves the cursor to the first entry in the index. Returns a pointer to the
+ * corresponding ovsdb_idl_row, or NULL if the index list is empy.
+ */
+struct ovsdb_idl_row *
+ovsdb_idl_index_first(struct ovsdb_idl_index_cursor *cursor)
+{
+    cursor->position = skiplist_first(cursor->index->skiplist);
+    return ovsdb_idl_index_data(cursor);
+}
+
+/* Moves the cursor to the next record in the index list.
+ */
+struct ovsdb_idl_row *
+ovsdb_idl_index_next(struct ovsdb_idl_index_cursor *cursor)
+{
+    if (!cursor->position) {
+        return NULL;
+    }
+    cursor->position = skiplist_next(cursor->position);
+    return ovsdb_idl_index_data(cursor);
+ }
+
+/* Returns the ovsdb_idl_row pointer corresponding to the record at the
+ * current cursor location.
+ */
+struct ovsdb_idl_row *
+ovsdb_idl_index_data(struct ovsdb_idl_index_cursor *cursor)
+{
+    return skiplist_get_data(cursor->position);
 }
 
-/* Returns true if 'row' is conceptually part of the database as modified by
- * the current transaction (if any), false otherwise.
- *
- * This function will return true if 'row' is not an orphan (see the comment on
- * ovsdb_idl_row_is_orphan()) and:
- *
- *   - 'row' exists in the database and has not been deleted within the
- *     current transaction (if any).
- *
- *   - 'row' was inserted within the current transaction and has not been
- *     deleted.  (In the latter case you should not have passed 'row' in at
- *     all, because ovsdb_idl_txn_delete() freed it.)
- *
- * This function will return false if 'row' is an orphan or if 'row' was
- * deleted within the current transaction.
+/* Moves the cursor to the first entry in the index matching the specified
+ * value. If 'value' is NULL, the cursor is moved to the last entry in the
+ * list. Returns a pointer to the corresponding ovsdb_idl_row or NULL.
  */
-static bool
-ovsdb_idl_row_exists(const struct ovsdb_idl_row *row)
+struct ovsdb_idl_row *
+ovsdb_idl_index_find(struct ovsdb_idl_index_cursor *cursor,
+                     struct ovsdb_idl_row *value)
 {
-    return row->new != NULL;
+    if (value) {
+        cursor->position = skiplist_find(cursor->index->skiplist, value);
+    } else {
+        cursor->position = skiplist_first(cursor->index->skiplist);
+    }
+    return ovsdb_idl_index_data(cursor);
 }
 
-static void
-ovsdb_idl_row_parse(struct ovsdb_idl_row *row)
+/* Moves the cursor to the first entry in the index with a value greater than
+ * or equal to the given value. If 'value' is NULL, the cursor is moved to the
+ * first entry in the index.  Returns a pointer to the corresponding
+ * ovsdb_idl_row or NULL if such a row does not exist.
+ */
+struct ovsdb_idl_row *
+ovsdb_idl_index_forward_to(struct ovsdb_idl_index_cursor *cursor,
+                           struct ovsdb_idl_row *value)
 {
-    const struct ovsdb_idl_table_class *class = row->table->class;
-    size_t i;
-
-    for (i = 0; i < class->n_columns; i++) {
-        const struct ovsdb_idl_column *c = &class->columns[i];
-        (c->parse)(row, &row->old[i]);
+    if (value) {
+        cursor->position = skiplist_forward_to(cursor->index->skiplist, value);
+    } else {
+        cursor->position = skiplist_first(cursor->index->skiplist);
     }
+    return ovsdb_idl_index_data(cursor);
 }
 
-static void
-ovsdb_idl_row_unparse(struct ovsdb_idl_row *row)
+/* Returns the result of comparing two rows using the comparison function
+ * for this index.
+ * Returns:
+ * < 0 if a < b
+ * 0 if a == b
+ * > 0 if a > b
+ * When the pointer to either row is NULL, this function considers NULL to be
+ * greater than any other value, and NULL == NULL.
+ */
+int
+ovsdb_idl_index_compare(struct ovsdb_idl_index_cursor *cursor,
+                        struct ovsdb_idl_row *a, struct ovsdb_idl_row *b)
 {
-    const struct ovsdb_idl_table_class *class = row->table->class;
-    size_t i;
-
-    for (i = 0; i < class->n_columns; i++) {
-        const struct ovsdb_idl_column *c = &class->columns[i];
-        (c->unparse)(row);
+    if (a && b) {
+        return ovsdb_idl_index_generic_comparer(a, b, cursor->index);
+    } else if (!a && !b) {
+        return 0;
+    } else if (a) {
+        return -1;
+    } else {
+        return 1;
     }
 }
 
 static void
 ovsdb_idl_row_clear_old(struct ovsdb_idl_row *row)
 {
-    ovs_assert(row->old == row->new);
+    ovs_assert(row->old_datum == row->new_datum);
     if (!ovsdb_idl_row_is_orphan(row)) {
-        const struct ovsdb_idl_table_class *class = row->table->class;
+        const struct ovsdb_idl_table_class *class = row->table->class_;
         size_t i;
 
         for (i = 0; i < class->n_columns; i++) {
-            ovsdb_datum_destroy(&row->old[i], &class->columns[i].type);
+            ovsdb_datum_destroy(&row->old_datum[i], &class->columns[i].type);
         }
-        free(row->old);
-        row->old = row->new = NULL;
+        free(row->old_datum);
+        row->old_datum = row->new_datum = NULL;
     }
 }
 
 static void
 ovsdb_idl_row_clear_new(struct ovsdb_idl_row *row)
 {
-    if (row->old != row->new) {
-        if (row->new) {
-            const struct ovsdb_idl_table_class *class = row->table->class;
+    if (row->old_datum != row->new_datum) {
+        if (row->new_datum) {
+            const struct ovsdb_idl_table_class *class = row->table->class_;
             size_t i;
 
             if (row->written) {
                 BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) {
-                    ovsdb_datum_destroy(&row->new[i], &class->columns[i].type);
+                    ovsdb_datum_destroy(&row->new_datum[i],
+                                        &class->columns[i].type);
                 }
             }
-            free(row->new);
+            free(row->new_datum);
             free(row->written);
             row->written = NULL;
         }
-        row->new = row->old;
+        row->new_datum = row->old_datum;
     }
 }
 
@@ -1519,15 +2522,15 @@ ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *row, bool destroy_dsts)
      * freed.
      */
     LIST_FOR_EACH_SAFE (arc, next, src_node, &row->src_arcs) {
-        list_remove(&arc->dst_node);
+        ovs_list_remove(&arc->dst_node);
         if (destroy_dsts
             && ovsdb_idl_row_is_orphan(arc->dst)
-            && list_is_empty(&arc->dst->dst_arcs)) {
+            && ovs_list_is_empty(&arc->dst->dst_arcs)) {
             ovsdb_idl_row_destroy(arc->dst);
         }
         free(arc);
     }
-    list_init(&row->src_arcs);
+    ovs_list_init(&row->src_arcs);
 }
 
 /* Force nodes that reference 'row' to reparse. */
@@ -1560,20 +2563,24 @@ ovsdb_idl_row_create__(const struct ovsdb_idl_table_class *class)
 {
     struct ovsdb_idl_row *row = xzalloc(class->allocation_size);
     class->row_init(row);
-    list_init(&row->src_arcs);
-    list_init(&row->dst_arcs);
+    ovs_list_init(&row->src_arcs);
+    ovs_list_init(&row->dst_arcs);
     hmap_node_nullify(&row->txn_node);
-    list_init(&row->track_node);
+    ovs_list_init(&row->track_node);
     return row;
 }
 
 static struct ovsdb_idl_row *
 ovsdb_idl_row_create(struct ovsdb_idl_table *table, const struct uuid *uuid)
 {
-    struct ovsdb_idl_row *row = ovsdb_idl_row_create__(table->class);
+    struct ovsdb_idl_row *row = ovsdb_idl_row_create__(table->class_);
     hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
     row->uuid = *uuid;
     row->table = table;
+    row->map_op_written = NULL;
+    row->map_op_lists = NULL;
+    row->set_op_written = NULL;
+    row->set_op_lists = NULL;
     return row;
 }
 
@@ -1583,14 +2590,59 @@ ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
     if (row) {
         ovsdb_idl_row_clear_old(row);
         hmap_remove(&row->table->rows, &row->hmap_node);
+        ovsdb_idl_destroy_all_map_op_lists(row);
+        ovsdb_idl_destroy_all_set_op_lists(row);
         if (ovsdb_idl_track_is_set(row->table)) {
             row->change_seqno[OVSDB_IDL_CHANGE_DELETE]
                 = row->table->change_seqno[OVSDB_IDL_CHANGE_DELETE]
                 = row->table->idl->change_seqno + 1;
         }
-        if (list_is_empty(&row->track_node)) {
-            list_push_front(&row->table->track_list, &row->track_node);
+        if (!ovs_list_is_empty(&row->track_node)) {
+            ovs_list_remove(&row->track_node);
+        }
+        ovs_list_push_back(&row->table->track_list, &row->track_node);
+    }
+}
+
+static void
+ovsdb_idl_destroy_all_map_op_lists(struct ovsdb_idl_row *row)
+{
+    if (row->map_op_written) {
+        /* Clear Map Operation Lists */
+        size_t idx, n_columns;
+        const struct ovsdb_idl_column *columns;
+        const struct ovsdb_type *type;
+        n_columns = row->table->class_->n_columns;
+        columns = row->table->class_->columns;
+        BITMAP_FOR_EACH_1 (idx, n_columns, row->map_op_written) {
+            type = &columns[idx].type;
+            map_op_list_destroy(row->map_op_lists[idx], type);
+        }
+        free(row->map_op_lists);
+        bitmap_free(row->map_op_written);
+        row->map_op_lists = NULL;
+        row->map_op_written = NULL;
+    }
+}
+
+static void
+ovsdb_idl_destroy_all_set_op_lists(struct ovsdb_idl_row *row)
+{
+    if (row->set_op_written) {
+        /* Clear Set Operation Lists */
+        size_t idx, n_columns;
+        const struct ovsdb_idl_column *columns;
+        const struct ovsdb_type *type;
+        n_columns = row->table->class_->n_columns;
+        columns = row->table->class_->columns;
+        BITMAP_FOR_EACH_1 (idx, n_columns, row->set_op_written) {
+            type = &columns[idx].type;
+            set_op_list_destroy(row->set_op_lists[idx], type);
         }
+        free(row->set_op_lists);
+        bitmap_free(row->set_op_written);
+        row->set_op_lists = NULL;
+        row->set_op_written = NULL;
     }
 }
 
@@ -1599,15 +2651,15 @@ ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *idl)
 {
     size_t i;
 
-    for (i = 0; i < idl->class->n_tables; i++) {
+    for (i = 0; i < idl->class_->n_tables; i++) {
         struct ovsdb_idl_table *table = &idl->tables[i];
 
-        if (!list_is_empty(&table->track_list)) {
+        if (!ovs_list_is_empty(&table->track_list)) {
             struct ovsdb_idl_row *row, *next;
 
             LIST_FOR_EACH_SAFE(row, next, track_node, &table->track_list) {
                 if (!ovsdb_idl_track_is_set(row->table)) {
-                    list_remove(&row->track_node);
+                    ovs_list_remove(&row->track_node);
                     free(row);
                 }
             }
@@ -1618,27 +2670,30 @@ ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *idl)
 static void
 ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json)
 {
-    const struct ovsdb_idl_table_class *class = row->table->class;
-    size_t i;
+    const struct ovsdb_idl_table_class *class = row->table->class_;
+    size_t i, datum_size;
 
-    ovs_assert(!row->old && !row->new);
-    row->old = row->new = xmalloc(class->n_columns * sizeof *row->old);
+    ovs_assert(!row->old_datum && !row->new_datum);
+    datum_size = class->n_columns * sizeof *row->old_datum;
+    row->old_datum = row->new_datum = xmalloc(datum_size);
     for (i = 0; i < class->n_columns; i++) {
-        ovsdb_datum_init_default(&row->old[i], &class->columns[i].type);
+        ovsdb_datum_init_default(&row->old_datum[i], &class->columns[i].type);
     }
     ovsdb_idl_row_update(row, row_json, OVSDB_IDL_CHANGE_INSERT);
     ovsdb_idl_row_parse(row);
 
     ovsdb_idl_row_reparse_backrefs(row);
+    ovsdb_idl_add_to_indexes(row);
 }
 
 static void
 ovsdb_idl_delete_row(struct ovsdb_idl_row *row)
 {
+    ovsdb_idl_remove_from_indexes(row);
     ovsdb_idl_row_unparse(row);
     ovsdb_idl_row_clear_arcs(row, true);
     ovsdb_idl_row_clear_old(row);
-    if (list_is_empty(&row->dst_arcs)) {
+    if (ovs_list_is_empty(&row->dst_arcs)) {
         ovsdb_idl_row_destroy(row);
     } else {
         ovsdb_idl_row_reparse_backrefs(row);
@@ -1652,10 +2707,12 @@ ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json)
 {
     bool changed;
 
+    ovsdb_idl_remove_from_indexes(row);
     ovsdb_idl_row_unparse(row);
     ovsdb_idl_row_clear_arcs(row, true);
     changed = ovsdb_idl_row_update(row, row_json, OVSDB_IDL_CHANGE_MODIFY);
     ovsdb_idl_row_parse(row);
+    ovsdb_idl_add_to_indexes(row);
 
     return changed;
 }
@@ -1691,7 +2748,7 @@ may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
      * at 'src', since we add all of the arcs from a given source in a clump
      * (in a single call to ovsdb_idl_row_parse()) and new arcs are always
      * added at the front of the dst_arcs list. */
-    if (list_is_empty(&dst->dst_arcs)) {
+    if (ovs_list_is_empty(&dst->dst_arcs)) {
         return true;
     }
     arc = CONTAINER_OF(dst->dst_arcs.next, struct ovsdb_idl_arc, dst_node);
@@ -1702,13 +2759,13 @@ static struct ovsdb_idl_table *
 ovsdb_idl_table_from_class(const struct ovsdb_idl *idl,
                            const struct ovsdb_idl_table_class *table_class)
 {
-    return &idl->tables[table_class - idl->class->tables];
+    return &idl->tables[table_class - idl->class_->tables];
 }
 
 /* Called by ovsdb-idlc generated code. */
 struct ovsdb_idl_row *
 ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
-                      struct ovsdb_idl_table_class *dst_table_class,
+                      const struct ovsdb_idl_table_class *dst_table_class,
                       const struct uuid *dst_uuid)
 {
     struct ovsdb_idl *idl = src->table->idl;
@@ -1718,14 +2775,18 @@ ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
 
     dst_table = ovsdb_idl_table_from_class(idl, dst_table_class);
     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
-    if (idl->txn) {
-        /* We're being called from ovsdb_idl_txn_write().  We must not update
+    if (idl->txn || is_index_row(src)) {
+        /* There are two cases we should not update any arcs:
+         *
+         * 1. We're being called from ovsdb_idl_txn_write(). We must not update
          * any arcs, because the transaction will be backed out at commit or
          * abort time and we don't want our graph screwed up.
          *
-         * Just return the destination row, if there is one and it has not been
-         * deleted. */
-        if (dst && (hmap_node_is_null(&dst->txn_node) || dst->new)) {
+         * 2. The row is used as an index for querying purpose only.
+         *
+         * In these cases, just return the destination row, if there is one and
+         * it has not been deleted. */
+        if (dst && (hmap_node_is_null(&dst->txn_node) || dst->new_datum)) {
             return dst;
         }
         return NULL;
@@ -1740,8 +2801,8 @@ ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
             /* The arc *must* be added at the front of the dst_arcs list.  See
              * ovsdb_idl_row_reparse_backrefs() for details. */
             arc = xmalloc(sizeof *arc);
-            list_push_front(&src->src_arcs, &arc->src_node);
-            list_push_front(&dst->dst_arcs, &arc->dst_node);
+            ovs_list_push_front(&src->src_arcs, &arc->src_node);
+            ovs_list_push_front(&dst->dst_arcs, &arc->dst_node);
             arc->src = src;
             arc->dst = dst;
         }
@@ -1819,16 +2880,16 @@ ovsdb_idl_read(const struct ovsdb_idl_row *row,
 
     ovs_assert(!ovsdb_idl_row_is_synthetic(row));
 
-    class = row->table->class;
+    class = row->table->class_;
     column_idx = column - class->columns;
 
-    ovs_assert(row->new != NULL);
+    ovs_assert(row->new_datum != NULL);
     ovs_assert(column_idx < class->n_columns);
 
     if (row->written && bitmap_is_set(row->written, column_idx)) {
-        return &row->new[column_idx];
-    } else if (row->old) {
-        return &row->old[column_idx];
+        return &row->new_datum[column_idx];
+    } else if (row->old_datum) {
+        return &row->old_datum[column_idx];
     } else {
         return ovsdb_datum_default(&column->type);
     }
@@ -1865,7 +2926,7 @@ bool
 ovsdb_idl_is_mutable(const struct ovsdb_idl_row *row,
                      const struct ovsdb_idl_column *column)
 {
-    return column->mutable || (row->new && !row->old);
+    return column->is_mutable || (row->new_datum && !row->old_datum);
 }
 
 /* Returns false if 'row' was obtained from the IDL, true if it was initialized
@@ -1973,6 +3034,12 @@ ovsdb_idl_txn_set_dry_run(struct ovsdb_idl_txn *txn)
  * successfully, the client may retrieve the final (incremented) value of
  * 'column' with ovsdb_idl_txn_get_increment_new_value().
  *
+ * If at time of commit the transaction is otherwise empty, that is, it doesn't
+ * change the database, then 'force' is important.  If 'force' is false in this
+ * case, the IDL suppresses the increment and skips a round trip to the
+ * database server.  If 'force' is true, the IDL will still increment the
+ * column.
+ *
  * The client could accomplish something similar with ovsdb_idl_read(),
  * ovsdb_idl_txn_verify() and ovsdb_idl_txn_write(), or with ovsdb-idlc
  * generated wrappers for these functions.  However, ovsdb_idl_txn_increment()
@@ -1983,15 +3050,17 @@ ovsdb_idl_txn_set_dry_run(struct ovsdb_idl_txn *txn)
 void
 ovsdb_idl_txn_increment(struct ovsdb_idl_txn *txn,
                         const struct ovsdb_idl_row *row,
-                        const struct ovsdb_idl_column *column)
+                        const struct ovsdb_idl_column *column,
+                        bool force)
 {
     ovs_assert(!txn->inc_table);
     ovs_assert(column->type.key.type == OVSDB_TYPE_INTEGER);
     ovs_assert(column->type.value.type == OVSDB_TYPE_VOID);
 
-    txn->inc_table = row->table->class->name;
+    txn->inc_table = row->table->class_->name;
     txn->inc_column = column->name;
     txn->inc_row = row->uuid;
+    txn->inc_force = force;
 }
 
 /* Destroys 'txn' and frees all associated memory.  If ovsdb_idl_txn_commit()
@@ -2041,22 +3110,6 @@ where_uuid_equals(const struct uuid *uuid)
                         xasprintf(UUID_FMT, UUID_ARGS(uuid))))));
 }
 
-static char *
-uuid_name_from_uuid(const struct uuid *uuid)
-{
-    char *name;
-    char *p;
-
-    name = xasprintf("row"UUID_FMT, UUID_ARGS(uuid));
-    for (p = name; *p != '\0'; p++) {
-        if (*p == '-') {
-            *p = '_';
-        }
-    }
-
-    return name;
-}
-
 static const struct ovsdb_idl_row *
 ovsdb_idl_txn_get_row(const struct ovsdb_idl_txn *txn, const struct uuid *uuid)
 {
@@ -2086,12 +3139,12 @@ substitute_uuids(struct json *json, const struct ovsdb_idl_txn *txn)
             const struct ovsdb_idl_row *row;
 
             row = ovsdb_idl_txn_get_row(txn, &uuid);
-            if (row && !row->old && row->new) {
+            if (row && !row->old_datum && row->new_datum) {
                 json_destroy(json);
 
                 return json_array_create_2(
                     json_string_create("named-uuid"),
-                    json_string_create_nocopy(uuid_name_from_uuid(&uuid)));
+                    json_string_create_nocopy(ovsdb_data_row_name(&uuid)));
             }
         }
 
@@ -2121,7 +3174,9 @@ ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
     txn->idl->txn = NULL;
 
     HMAP_FOR_EACH_SAFE (row, next, txn_node, &txn->txn_rows) {
-        if (row->old) {
+        ovsdb_idl_destroy_all_map_op_lists(row);
+        ovsdb_idl_destroy_all_set_op_lists(row);
+        if (row->old_datum) {
             if (row->written) {
                 ovsdb_idl_row_unparse(row);
                 ovsdb_idl_row_clear_arcs(row, false);
@@ -2140,7 +3195,7 @@ ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
 
         hmap_remove(&txn->txn_rows, &row->txn_node);
         hmap_node_nullify(&row->txn_node);
-        if (!row->old) {
+        if (!row->old_datum) {
             hmap_remove(&row->table->rows, &row->hmap_node);
             free(row);
         }
@@ -2149,6 +3204,203 @@ ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
     hmap_init(&txn->txn_rows);
 }
 
+static bool
+ovsdb_idl_txn_extract_mutations(struct ovsdb_idl_row *row,
+                                struct json *mutations)
+{
+    const struct ovsdb_idl_table_class *class = row->table->class_;
+    size_t idx;
+    bool any_mutations = false;
+
+    if (row->map_op_written) {
+        BITMAP_FOR_EACH_1(idx, class->n_columns, row->map_op_written) {
+            struct map_op_list *map_op_list;
+            const struct ovsdb_idl_column *column;
+            const struct ovsdb_datum *old_datum;
+            enum ovsdb_atomic_type key_type, value_type;
+            struct json *mutation, *map, *col_name, *mutator;
+            struct json *del_set, *ins_map;
+            bool any_del, any_ins;
+
+            map_op_list = row->map_op_lists[idx];
+            column = &class->columns[idx];
+            key_type = column->type.key.type;
+            value_type = column->type.value.type;
+
+            /* Get the value to be changed */
+            if (row->new_datum && row->written
+                && bitmap_is_set(row->written,idx)) {
+                old_datum = &row->new_datum[idx];
+            } else if (row->old_datum != NULL) {
+                old_datum = &row->old_datum[idx];
+            } else {
+                old_datum = ovsdb_datum_default(&column->type);
+            }
+
+            del_set = json_array_create_empty();
+            ins_map = json_array_create_empty();
+            any_del = false;
+            any_ins = false;
+
+            for (struct map_op *map_op = map_op_list_first(map_op_list); map_op;
+                 map_op = map_op_list_next(map_op_list, map_op)) {
+
+                if (map_op_type(map_op) == MAP_OP_UPDATE) {
+                    /* Find out if value really changed. */
+                    struct ovsdb_datum *new_datum;
+                    unsigned int pos;
+                    new_datum = map_op_datum(map_op);
+                    pos = ovsdb_datum_find_key(old_datum,
+                                               &new_datum->keys[0],
+                                               key_type);
+                    if (ovsdb_atom_equals(&new_datum->values[0],
+                                          &old_datum->values[pos],
+                                          value_type)) {
+                        /* No change in value. Move on to next update. */
+                        continue;
+                    }
+                } else if (map_op_type(map_op) == MAP_OP_DELETE){
+                    /* Verify that there is a key to delete. */
+                    unsigned int pos;
+                    pos = ovsdb_datum_find_key(old_datum,
+                                               &map_op_datum(map_op)->keys[0],
+                                               key_type);
+                    if (pos == UINT_MAX) {
+                        /* No key to delete.  Move on to next update. */
+                        VLOG_WARN("Trying to delete a key that doesn't "
+                                  "exist in the map.");
+                        continue;
+                    }
+                }
+
+                if (map_op_type(map_op) == MAP_OP_INSERT) {
+                    map = json_array_create_2(
+                        ovsdb_atom_to_json(&map_op_datum(map_op)->keys[0],
+                                           key_type),
+                        ovsdb_atom_to_json(&map_op_datum(map_op)->values[0],
+                                           value_type));
+                    json_array_add(ins_map, map);
+                    any_ins = true;
+                } else { /* MAP_OP_UPDATE or MAP_OP_DELETE */
+                    map = ovsdb_atom_to_json(&map_op_datum(map_op)->keys[0],
+                                             key_type);
+                    json_array_add(del_set, map);
+                    any_del = true;
+                }
+
+                /* Generate an additional insert mutate for updates. */
+                if (map_op_type(map_op) == MAP_OP_UPDATE) {
+                    map = json_array_create_2(
+                        ovsdb_atom_to_json(&map_op_datum(map_op)->keys[0],
+                                           key_type),
+                        ovsdb_atom_to_json(&map_op_datum(map_op)->values[0],
+                                           value_type));
+                    json_array_add(ins_map, map);
+                    any_ins = true;
+                }
+            }
+
+            if (any_del) {
+                col_name = json_string_create(column->name);
+                mutator = json_string_create("delete");
+                map = json_array_create_2(json_string_create("set"), del_set);
+                mutation = json_array_create_3(col_name, mutator, map);
+                json_array_add(mutations, mutation);
+                any_mutations = true;
+            } else {
+                json_destroy(del_set);
+            }
+            if (any_ins) {
+                col_name = json_string_create(column->name);
+                mutator = json_string_create("insert");
+                map = json_array_create_2(json_string_create("map"), ins_map);
+                mutation = json_array_create_3(col_name, mutator, map);
+                json_array_add(mutations, mutation);
+                any_mutations = true;
+            } else {
+                json_destroy(ins_map);
+            }
+        }
+    }
+    if (row->set_op_written) {
+        BITMAP_FOR_EACH_1(idx, class->n_columns, row->set_op_written) {
+            struct set_op_list *set_op_list;
+            const struct ovsdb_idl_column *column;
+            const struct ovsdb_datum *old_datum;
+            enum ovsdb_atomic_type key_type;
+            struct json *mutation, *set, *col_name, *mutator;
+            struct json *del_set, *ins_set;
+            bool any_del, any_ins;
+
+            set_op_list = row->set_op_lists[idx];
+            column = &class->columns[idx];
+            key_type = column->type.key.type;
+
+            /* Get the value to be changed */
+            if (row->new_datum && row->written
+                && bitmap_is_set(row->written,idx)) {
+                old_datum = &row->new_datum[idx];
+            } else if (row->old_datum != NULL) {
+                old_datum = &row->old_datum[idx];
+            } else {
+                old_datum = ovsdb_datum_default(&column->type);
+            }
+
+            del_set = json_array_create_empty();
+            ins_set = json_array_create_empty();
+            any_del = false;
+            any_ins = false;
+
+            for (struct set_op *set_op = set_op_list_first(set_op_list); set_op;
+                 set_op = set_op_list_next(set_op_list, set_op)) {
+                if (set_op_type(set_op) == SET_OP_INSERT) {
+                    set = ovsdb_atom_to_json(&set_op_datum(set_op)->keys[0],
+                                             key_type);
+                    json_array_add(ins_set, set);
+                    any_ins = true;
+                } else { /* SETP_OP_DELETE */
+                    /* Verify that there is a key to delete. */
+                    unsigned int pos;
+                    pos = ovsdb_datum_find_key(old_datum,
+                                               &set_op_datum(set_op)->keys[0],
+                                               key_type);
+                    if (pos == UINT_MAX) {
+                        /* No key to delete.  Move on to next update. */
+                        VLOG_WARN("Trying to delete a key that doesn't "
+                                  "exist in the set.");
+                        continue;
+                    }
+                    set = ovsdb_atom_to_json(&set_op_datum(set_op)->keys[0],
+                                             key_type);
+                    json_array_add(del_set, set);
+                    any_del = true;
+                }
+            }
+            if (any_del) {
+                col_name = json_string_create(column->name);
+                mutator = json_string_create("delete");
+                set = json_array_create_2(json_string_create("set"), del_set);
+                mutation = json_array_create_3(col_name, mutator, set);
+                json_array_add(mutations, mutation);
+                any_mutations = true;
+            } else {
+                json_destroy(del_set);
+            }
+            if (any_ins) {
+                col_name = json_string_create(column->name);
+                mutator = json_string_create("insert");
+                set = json_array_create_2(json_string_create("set"), ins_set);
+                mutation = json_array_create_3(col_name, mutator, set);
+                json_array_add(mutations, mutation);
+                any_mutations = true;
+            } else {
+                json_destroy(ins_set);
+            }
+        }
+    }
+    return any_mutations;
+}
+
 /* Attempts to commit 'txn'.  Returns the status of the commit operation, one
  * of the following TXN_* constants:
  *
@@ -2216,7 +3468,7 @@ ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
     }
 
     operations = json_array_create_1(
-        json_string_create(txn->idl->class->database));
+        json_string_create(txn->idl->class_->database));
 
     /* Assert that we have the required lock (avoiding a race). */
     if (txn->idl->lock_name) {
@@ -2230,7 +3482,7 @@ ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
         /* XXX check that deleted rows exist even if no prereqs? */
         if (row->prereqs) {
-            const struct ovsdb_idl_table_class *class = row->table->class;
+            const struct ovsdb_idl_table_class *class = row->table->class_;
             size_t n_columns = class->n_columns;
             struct json *op, *columns, *row_json;
             size_t idx;
@@ -2251,7 +3503,7 @@ ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
                 const struct ovsdb_idl_column *column = &class->columns[idx];
                 json_array_add(columns, json_string_create(column->name));
                 json_object_put(row_json, column->name,
-                                ovsdb_datum_to_json(&row->old[idx],
+                                ovsdb_datum_to_json(&row->old_datum[idx],
                                                     &column->type));
             }
         }
@@ -2260,9 +3512,9 @@ ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
     /* Add updates. */
     any_updates = false;
     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
-        const struct ovsdb_idl_table_class *class = row->table->class;
+        const struct ovsdb_idl_table_class *class = row->table->class_;
 
-        if (!row->new) {
+        if (!row->new_datum) {
             if (class->is_root) {
                 struct json *op = json_object_create();
                 json_object_put_string(op, "op", "delete");
@@ -2273,15 +3525,32 @@ ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
             } else {
                 /* Let ovsdb-server decide whether to really delete it. */
             }
-        } else if (row->old != row->new) {
+        } else if (row->old_datum != row->new_datum) {
             struct json *row_json;
-            struct json *op;
             size_t idx;
 
-            op = json_object_create();
-            json_object_put_string(op, "op", row->old ? "update" : "insert");
+            if (!row->old_datum && class->is_singleton) {
+                /* We're inserting a row into a table that allows only a
+                 * single row.  (This is a fairly common OVSDB pattern for
+                 * storing global data.)  Verify that the table is empty
+                 * before inserting the row, so that we get a clear
+                 * verification-related failure if there was an insertion
+                 * race with another client. */
+                struct json *op = json_object_create();
+                json_array_add(operations, op);
+                json_object_put_string(op, "op", "wait");
+                json_object_put_string(op, "table", class->name);
+                json_object_put(op, "where", json_array_create_empty());
+                json_object_put(op, "timeout", json_integer_create(0));
+                json_object_put_string(op, "until", "==");
+                json_object_put(op, "rows", json_array_create_empty());
+            }
+
+            struct json *op = json_object_create();
+            json_object_put_string(op, "op",
+                                   row->old_datum ? "update" : "insert");
             json_object_put_string(op, "table", class->name);
-            if (row->old) {
+            if (row->old_datum) {
                 json_object_put(op, "where", where_uuid_equals(&row->uuid));
             } else {
                 struct ovsdb_idl_txn_insert *insert;
@@ -2290,7 +3559,7 @@ ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
 
                 json_object_put(op, "uuid-name",
                                 json_string_create_nocopy(
-                                    uuid_name_from_uuid(&row->uuid)));
+                                    ovsdb_data_row_name(&row->uuid)));
 
                 insert = xmalloc(sizeof *insert);
                 insert->dummy = row->uuid;
@@ -2307,21 +3576,23 @@ ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
                     const struct ovsdb_idl_column *column =
                                                         &class->columns[idx];
 
-                    if (row->old
-                        || !ovsdb_datum_is_default(&row->new[idx],
+                    if (row->old_datum
+                        || !ovsdb_datum_is_default(&row->new_datum[idx],
                                                   &column->type)) {
+                        struct json *value;
+
+                        value = ovsdb_datum_to_json(&row->new_datum[idx],
+                                                    &column->type);
                         json_object_put(row_json, column->name,
-                                        substitute_uuids(
-                                            ovsdb_datum_to_json(&row->new[idx],
-                                                                &column->type),
-                                            txn));
+                                        substitute_uuids(value, txn));
 
                         /* If anything really changed, consider it an update.
                          * We can't suppress not-really-changed values earlier
                          * or transactions would become nonatomic (see the big
                          * comment inside ovsdb_idl_txn_write()). */
-                        if (!any_updates && row->old &&
-                            !ovsdb_datum_equals(&row->old[idx], &row->new[idx],
+                        if (!any_updates && row->old_datum &&
+                            !ovsdb_datum_equals(&row->old_datum[idx],
+                                                &row->new_datum[idx],
                                                 &column->type)) {
                             any_updates = true;
                         }
@@ -2329,8 +3600,30 @@ ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
                 }
             }
 
-            if (!row->old || !shash_is_empty(json_object(row_json))) {
+            if (!row->old_datum || !shash_is_empty(json_object(row_json))) {
+                json_array_add(operations, op);
+            } else {
+                json_destroy(op);
+            }
+        }
+
+        /* Add mutate operation, for partial map or partial set updates. */
+        if (row->map_op_written || row->set_op_written) {
+            struct json *op, *mutations;
+            bool any_mutations;
+
+            op = json_object_create();
+            json_object_put_string(op, "op", "mutate");
+            json_object_put_string(op, "table", class->name);
+            json_object_put(op, "where", where_uuid_equals(&row->uuid));
+            mutations = json_array_create_empty();
+            any_mutations = ovsdb_idl_txn_extract_mutations(row, mutations);
+            json_object_put(op, "mutations", mutations);
+
+            if (any_mutations) {
+                op = substitute_uuids(op, txn);
                 json_array_add(operations, op);
+                any_updates = true;
             } else {
                 json_destroy(op);
             }
@@ -2338,12 +3631,11 @@ ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
     }
 
     /* Add increment. */
-    if (txn->inc_table && any_updates) {
-        struct json *op;
-
+    if (txn->inc_table && (any_updates || txn->inc_force)) {
+        any_updates = true;
         txn->inc_index = operations->u.array.n - 1;
 
-        op = json_object_create();
+        struct json *op = json_object_create();
         json_object_put_string(op, "op", "mutate");
         json_object_put_string(op, "table", txn->inc_table);
         json_object_put(op, "where",
@@ -2521,23 +3813,6 @@ ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
     hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
 }
 
-/* Writes 'datum' to the specified 'column' in 'row_'.  Updates both 'row_'
- * itself and the structs derived from it (e.g. the "struct ovsrec_*", for
- * ovs-vswitchd).
- *
- * 'datum' must have the correct type for its column.  The IDL does not check
- * that it meets schema constraints, but ovsdb-server will do so at commit time
- * so it had better be correct.
- *
- * A transaction must be in progress.  Replication of 'column' must not have
- * been disabled (by calling ovsdb_idl_omit()).
- *
- * Usually this function is used indirectly through one of the "set" functions
- * generated by ovsdb-idlc.
- *
- * Takes ownership of what 'datum' points to (and in some cases destroys that
- * data before returning) but makes a copy of 'datum' itself.  (Commonly
- * 'datum' is on the caller's stack.) */
 static void
 ovsdb_idl_txn_write__(const struct ovsdb_idl_row *row_,
                       const struct ovsdb_idl_column *column,
@@ -2548,17 +3823,18 @@ ovsdb_idl_txn_write__(const struct ovsdb_idl_row *row_,
     size_t column_idx;
     bool write_only;
 
+    ovs_assert(!column->is_synthetic);
     if (ovsdb_idl_row_is_synthetic(row)) {
         goto discard_datum;
     }
 
-    class = row->table->class;
+    class = row->table->class_;
     column_idx = column - class->columns;
     write_only = row->table->modes[column_idx] == OVSDB_IDL_MONITOR;
 
-    ovs_assert(row->new != NULL);
+    ovs_assert(row->new_datum != NULL);
     ovs_assert(column_idx < class->n_columns);
-    ovs_assert(row->old == NULL ||
+    ovs_assert(row->old_datum == NULL ||
                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
 
     if (row->table->idl->verify_write_only && !write_only) {
@@ -2587,24 +3863,24 @@ ovsdb_idl_txn_write__(const struct ovsdb_idl_row *row_,
         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
                     uuid_hash(&row->uuid));
     }
-    if (row->old == row->new) {
-        row->new = xmalloc(class->n_columns * sizeof *row->new);
+    if (row->old_datum == row->new_datum) {
+        row->new_datum = xmalloc(class->n_columns * sizeof *row->new_datum);
     }
     if (!row->written) {
         row->written = bitmap_allocate(class->n_columns);
     }
     if (bitmap_is_set(row->written, column_idx)) {
-        ovsdb_datum_destroy(&row->new[column_idx], &column->type);
+        ovsdb_datum_destroy(&row->new_datum[column_idx], &column->type);
     } else {
         bitmap_set1(row->written, column_idx);
     }
     if (owns_datum) {
-        row->new[column_idx] = *datum;
+        row->new_datum[column_idx] = *datum;
     } else {
-        ovsdb_datum_clone(&row->new[column_idx], datum, &column->type);
+        ovsdb_datum_clone(&row->new_datum[column_idx], datum, &column->type);
     }
     (column->unparse)(row);
-    (column->parse)(row, &row->new[column_idx]);
+    (column->parse)(row, &row->new_datum[column_idx]);
     return;
 
 discard_datum:
@@ -2613,14 +3889,40 @@ discard_datum:
     }
 }
 
+/* Writes 'datum' to the specified 'column' in 'row_'.  Updates both 'row_'
+ * itself and the structs derived from it (e.g. the "struct ovsrec_*", for
+ * ovs-vswitchd).
+ *
+ * 'datum' must have the correct type for its column, but it needs not be
+ * sorted or unique because this function will take care of that.  The IDL does
+ * not check that it meets schema constraints, but ovsdb-server will do so at
+ * commit time so it had better be correct.
+ *
+ * A transaction must be in progress.  Replication of 'column' must not have
+ * been disabled (by calling ovsdb_idl_omit()).
+ *
+ * Usually this function is used indirectly through one of the "set" functions
+ * generated by ovsdb-idlc.
+ *
+ * Takes ownership of what 'datum' points to (and in some cases destroys that
+ * data before returning) but makes a copy of 'datum' itself.  (Commonly
+ * 'datum' is on the caller's stack.) */
 void
 ovsdb_idl_txn_write(const struct ovsdb_idl_row *row,
                     const struct ovsdb_idl_column *column,
                     struct ovsdb_datum *datum)
 {
+    ovsdb_datum_sort_unique(datum,
+                            column->type.key.type, column->type.value.type);
     ovsdb_idl_txn_write__(row, column, datum, true);
 }
 
+/* Similar to ovsdb_idl_txn_write(), except:
+ *
+ *     - The caller retains ownership of 'datum' and what it points to.
+ *
+ *     - The caller must ensure that 'datum' is sorted and unique (e.g. via
+ *       ovsdb_datum_sort_unique().) */
 void
 ovsdb_idl_txn_write_clone(const struct ovsdb_idl_row *row,
                           const struct ovsdb_idl_column *column,
@@ -2634,9 +3936,7 @@ ovsdb_idl_txn_write_clone(const struct ovsdb_idl_row *row,
  * prerequisite to completing the transaction.  That is, if 'column' in 'row_'
  * changed (or if 'row_' was deleted) between the time that the IDL originally
  * read its contents and the time that the transaction commits, then the
- * transaction aborts and ovsdb_idl_txn_commit() returns TXN_AGAIN_WAIT or
- * TXN_AGAIN_NOW (depending on whether the database change has already been
- * received).
+ * transaction aborts and ovsdb_idl_txn_commit() returns TXN_TRY_AGAIN.
  *
  * The intention is that, to ensure that no transaction commits based on dirty
  * reads, an application should call ovsdb_idl_txn_verify() on each data item
@@ -2670,13 +3970,13 @@ ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
         return;
     }
 
-    class = row->table->class;
+    class = row->table->class_;
     column_idx = column - class->columns;
 
-    ovs_assert(row->new != NULL);
-    ovs_assert(row->old == NULL ||
+    ovs_assert(row->new_datum != NULL);
+    ovs_assert(row->old_datum == NULL ||
                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
-    if (!row->old
+    if (!row->old_datum
         || (row->written && bitmap_is_set(row->written, column_idx))) {
         return;
     }
@@ -2707,8 +4007,8 @@ ovsdb_idl_txn_delete(const struct ovsdb_idl_row *row_)
         return;
     }
 
-    ovs_assert(row->new != NULL);
-    if (!row->old) {
+    ovs_assert(row->new_datum != NULL);
+    if (!row->old_datum) {
         ovsdb_idl_row_unparse(row);
         ovsdb_idl_row_clear_new(row);
         ovs_assert(!row->prereqs);
@@ -2722,7 +4022,7 @@ ovsdb_idl_txn_delete(const struct ovsdb_idl_row *row_)
                     uuid_hash(&row->uuid));
     }
     ovsdb_idl_row_clear_new(row);
-    row->new = NULL;
+    row->new_datum = NULL;
 }
 
 /* Inserts and returns a new row in the table with the specified 'class' in the
@@ -2752,7 +4052,7 @@ ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
     }
 
     row->table = ovsdb_idl_table_from_class(txn->idl, class);
-    row->new = xmalloc(class->n_columns * sizeof *row->new);
+    row->new_datum = xmalloc(class->n_columns * sizeof *row->new_datum);
     hmap_insert(&row->table->rows, &row->hmap_node, uuid_hash(&row->uuid));
     hmap_insert(&txn->txn_rows, &row->txn_node, uuid_hash(&row->uuid));
     return row;
@@ -2877,11 +4177,10 @@ ovsdb_idl_txn_process_insert_reply(struct ovsdb_idl_txn_insert *insert,
 
     error = ovsdb_atom_from_json(&uuid, &uuid_type, json_uuid, NULL);
     if (error) {
-        char *s = ovsdb_error_to_string(error);
+        char *s = ovsdb_error_to_string_free(error);
         VLOG_WARN_RL(&syntax_rl, "\"insert\" reply \"uuid\" is not a JSON "
                      "UUID: %s", s);
         free(s);
-        ovsdb_error_destroy(error);
         return false;
     }
 
@@ -2931,9 +4230,14 @@ ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
                             soft_errors++;
                         } else if (!strcmp(error->u.string, "not owner")) {
                             lock_errors++;
+                        } else if (!strcmp(error->u.string, "not allowed")) {
+                            hard_errors++;
+                            ovsdb_idl_txn_set_error_json(txn, op);
                         } else if (strcmp(error->u.string, "aborted")) {
                             hard_errors++;
                             ovsdb_idl_txn_set_error_json(txn, op);
+                            VLOG_WARN_RL(&other_rl,
+                                         "transaction error: %s", txn->error);
                         }
                     } else {
                         hard_errors++;
@@ -3058,7 +4362,7 @@ ovsdb_idl_update_has_lock(struct ovsdb_idl *idl, bool new_has_lock)
 {
     if (new_has_lock && !idl->has_lock) {
         if (idl->state == IDL_S_MONITORING ||
-            idl->state == IDL_S_MONITORING2) {
+            idl->state == IDL_S_MONITORING_COND) {
             idl->change_seqno++;
         } else {
             /* We're setting up a session, so don't signal that the database
@@ -3143,6 +4447,205 @@ ovsdb_idl_parse_lock_notify(struct ovsdb_idl *idl,
     }
 }
 
+/* Inserts a new Map Operation into current transaction. */
+static void
+ovsdb_idl_txn_add_map_op(struct ovsdb_idl_row *row,
+                         const struct ovsdb_idl_column *column,
+                         struct ovsdb_datum *datum,
+                         enum map_op_type op_type)
+{
+    const struct ovsdb_idl_table_class *class;
+    size_t column_idx;
+    struct map_op *map_op;
+
+    class = row->table->class_;
+    column_idx = column - class->columns;
+
+    /* Check if a map operation list exists for this column. */
+    if (!row->map_op_written) {
+        row->map_op_written = bitmap_allocate(class->n_columns);
+        row->map_op_lists = xzalloc(class->n_columns *
+                                    sizeof *row->map_op_lists);
+    }
+    if (!row->map_op_lists[column_idx]) {
+        row->map_op_lists[column_idx] = map_op_list_create();
+    }
+
+    /* Add a map operation to the corresponding list. */
+    map_op = map_op_create(datum, op_type);
+    bitmap_set1(row->map_op_written, column_idx);
+    map_op_list_add(row->map_op_lists[column_idx], map_op, &column->type);
+
+    /* Add this row to transaction's list of rows. */
+    if (hmap_node_is_null(&row->txn_node)) {
+        hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
+                    uuid_hash(&row->uuid));
+    }
+}
+
+/* Inserts a new Set Operation into current transaction. */
+static void
+ovsdb_idl_txn_add_set_op(struct ovsdb_idl_row *row,
+                         const struct ovsdb_idl_column *column,
+                         struct ovsdb_datum *datum,
+                         enum set_op_type op_type)
+{
+    const struct ovsdb_idl_table_class *class;
+    size_t column_idx;
+    struct set_op *set_op;
+
+    class = row->table->class_;
+    column_idx = column - class->columns;
+
+    /* Check if a set operation list exists for this column. */
+    if (!row->set_op_written) {
+        row->set_op_written = bitmap_allocate(class->n_columns);
+        row->set_op_lists = xzalloc(class->n_columns *
+                                    sizeof *row->set_op_lists);
+    }
+    if (!row->set_op_lists[column_idx]) {
+        row->set_op_lists[column_idx] = set_op_list_create();
+    }
+
+    /* Add a set operation to the corresponding list. */
+    set_op = set_op_create(datum, op_type);
+    bitmap_set1(row->set_op_written, column_idx);
+    set_op_list_add(row->set_op_lists[column_idx], set_op, &column->type);
+
+    /* Add this row to the transactions's list of rows. */
+    if (hmap_node_is_null(&row->txn_node)) {
+        hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
+                    uuid_hash(&row->uuid));
+    }
+}
+
+static bool
+is_valid_partial_update(const struct ovsdb_idl_row *row,
+                        const struct ovsdb_idl_column *column,
+                        struct ovsdb_datum *datum)
+{
+    /* Verify that this column is being monitored. */
+    unsigned int column_idx = column - row->table->class_->columns;
+    if (!(row->table->modes[column_idx] & OVSDB_IDL_MONITOR)) {
+        VLOG_WARN("cannot partially update non-monitored column");
+        return false;
+    }
+
+    /* Verify that the update affects a single element. */
+    if (datum->n != 1) {
+        VLOG_WARN("invalid datum for partial update");
+        return false;
+    }
+
+    return true;
+}
+
+/* Inserts the value described in 'datum' into the map in 'column' in
+ * 'row_'. If the value doesn't already exist in 'column' then it's value
+ * is added.  The value in 'datum' must be of the same type as the values
+ * in 'column'.  This function takes ownership of 'datum'.
+ *
+ * Usually this function is used indirectly through one of the "update"
+ * functions generated by vswitch-idl. */
+void
+ovsdb_idl_txn_write_partial_set(const struct ovsdb_idl_row *row_,
+                                const struct ovsdb_idl_column *column,
+                                struct ovsdb_datum *datum)
+{
+    struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
+    enum set_op_type op_type;
+
+    if (!is_valid_partial_update(row, column, datum)) {
+        ovsdb_datum_destroy(datum, &column->type);
+        free(datum);
+        return;
+    }
+
+    op_type = SET_OP_INSERT;
+
+    ovsdb_idl_txn_add_set_op(row, column, datum, op_type);
+}
+
+/* Deletes the value specified in 'datum' from the set in 'column' in 'row_'.
+ * The value in 'datum' must be of the same type as the keys in 'column'.
+ * This function takes ownership of 'datum'.
+ *
+ * Usually this function is used indirectly through one of the "update"
+ * functions generated by vswitch-idl. */
+void
+ovsdb_idl_txn_delete_partial_set(const struct ovsdb_idl_row *row_,
+                                 const struct ovsdb_idl_column *column,
+                                 struct ovsdb_datum *datum)
+{
+    struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
+
+    if (!is_valid_partial_update(row, column, datum)) {
+        struct ovsdb_type type_ = column->type;
+        type_.value.type = OVSDB_TYPE_VOID;
+        ovsdb_datum_destroy(datum, &type_);
+        free(datum);
+        return;
+    }
+    ovsdb_idl_txn_add_set_op(row, column, datum, SET_OP_DELETE);
+}
+
+/* Inserts the key-value specified in 'datum' into the map in 'column' in
+ * 'row_'. If the key already exist in 'column', then it's value is updated
+ * with the value in 'datum'. The key-value in 'datum' must be of the same type
+ * as the keys-values in 'column'. This function takes ownership of 'datum'.
+ *
+ * Usually this function is used indirectly through one of the "update"
+ * functions generated by vswitch-idl. */
+void
+ovsdb_idl_txn_write_partial_map(const struct ovsdb_idl_row *row_,
+                                const struct ovsdb_idl_column *column,
+                                struct ovsdb_datum *datum)
+{
+    struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
+    enum ovsdb_atomic_type key_type;
+    enum map_op_type op_type;
+    unsigned int pos;
+    const struct ovsdb_datum *old_datum;
+
+    if (!is_valid_partial_update(row, column, datum)) {
+        ovsdb_datum_destroy(datum, &column->type);
+        free(datum);
+        return;
+    }
+
+    /* Find out if this is an insert or an update. */
+    key_type = column->type.key.type;
+    old_datum = ovsdb_idl_read(row, column);
+    pos = ovsdb_datum_find_key(old_datum, &datum->keys[0], key_type);
+    op_type = pos == UINT_MAX ? MAP_OP_INSERT : MAP_OP_UPDATE;
+
+    ovsdb_idl_txn_add_map_op(row, column, datum, op_type);
+}
+
+/* Deletes the key specified in 'datum' from the map in 'column' in 'row_'.
+ * The key in 'datum' must be of the same type as the keys in 'column'.
+ * The value in 'datum' must be NULL. This function takes ownership of
+ * 'datum'.
+ *
+ * Usually this function is used indirectly through one of the "update"
+ * functions generated by vswitch-idl. */
+void
+ovsdb_idl_txn_delete_partial_map(const struct ovsdb_idl_row *row_,
+                                 const struct ovsdb_idl_column *column,
+                                 struct ovsdb_datum *datum)
+{
+    struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_);
+
+    if (!is_valid_partial_update(row, column, datum)) {
+        struct ovsdb_type type_ = column->type;
+        type_.value.type = OVSDB_TYPE_VOID;
+        ovsdb_datum_destroy(datum, &type_);
+        free(datum);
+        return;
+    }
+    ovsdb_idl_txn_add_map_op(row, column, datum, MAP_OP_DELETE);
+}
+
 void
 ovsdb_idl_loop_destroy(struct ovsdb_idl_loop *loop)
 {
@@ -3162,7 +4665,27 @@ ovsdb_idl_loop_run(struct ovsdb_idl_loop *loop)
     return loop->open_txn;
 }
 
-void
+/* Attempts to commit the current transaction, if one is open, and sets up the
+ * poll loop to wake up when some more work might be needed.
+ *
+ * If a transaction was open, in this or a previous iteration of the main loop,
+ * and had not before finished committing (successfully or unsuccessfully), the
+ * return value is one of:
+ *
+ *  1: The transaction committed successfully (or it did not change anything in
+ *     the database).
+ *  0: The transaction failed.
+ * -1: The commit is still in progress.
+ *
+ * Thus, the return value is -1 if the transaction is in progress and otherwise
+ * true for success, false for failure.
+ *
+ * (In the corner case where the IDL sends a transaction to the database and
+ * the database commits it, and the connection between the IDL and the database
+ * drops before the IDL receives the message confirming the commit, this
+ * function can return 0 even though the transaction succeeded.)
+ */
+int
 ovsdb_idl_loop_commit_and_wait(struct ovsdb_idl_loop *loop)
 {
     if (loop->open_txn) {
@@ -3173,6 +4696,7 @@ ovsdb_idl_loop_commit_and_wait(struct ovsdb_idl_loop *loop)
     }
 
     struct ovsdb_idl_txn *txn = loop->committing_txn;
+    int retval;
     if (txn) {
         enum ovsdb_idl_txn_status status = ovsdb_idl_txn_commit(txn);
         if (status != TXN_INCOMPLETE) {
@@ -3185,32 +4709,44 @@ ovsdb_idl_loop_commit_and_wait(struct ovsdb_idl_loop *loop)
                 if (ovsdb_idl_get_seqno(loop->idl) != loop->skip_seqno) {
                     poll_immediate_wake();
                 }
+                retval = 0;
                 break;
 
             case TXN_SUCCESS:
-                /* If the database has already changed since we started the
-                 * commit, re-evaluate it immediately to avoid missing a change
-                 * for a while. */
-                if (ovsdb_idl_get_seqno(loop->idl) != loop->precommit_seqno) {
-                    poll_immediate_wake();
-                }
+                /* Possibly some work on the database was deferred because no
+                 * further transaction could proceed.  Wake up again. */
+                retval = 1;
+                loop->cur_cfg = loop->next_cfg;
+                poll_immediate_wake();
                 break;
 
             case TXN_UNCHANGED:
+                retval = 1;
+                loop->cur_cfg = loop->next_cfg;
+                break;
+
             case TXN_ABORTED:
             case TXN_NOT_LOCKED:
             case TXN_ERROR:
+                retval = 0;
                 break;
 
             case TXN_UNCOMMITTED:
             case TXN_INCOMPLETE:
+            default:
                 OVS_NOT_REACHED();
-
             }
             ovsdb_idl_txn_destroy(txn);
             loop->committing_txn = NULL;
+        } else {
+            retval = -1;
         }
+    } else {
+        /* Not a meaningful return value: no transaction was in progress. */
+        retval = 1;
     }
 
     ovsdb_idl_wait(loop->idl);
+
+    return retval;
 }