]> git.proxmox.com Git - mirror_ovs.git/blobdiff - ovsdb/transaction.c
vconn: Allow timeout configuration for blocking connection.
[mirror_ovs.git] / ovsdb / transaction.c
index 9e12a622572fdc13bdfdbbd2cf9806a52a52abcd..5a43132e4ab863e9c5c730051e61645aad8a885c 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
+/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2017 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 #include "bitmap.h"
 #include "openvswitch/dynamic-string.h"
+#include "file.h"
 #include "hash.h"
-#include "hmap.h"
-#include "json.h"
+#include "monitor.h"
+#include "openvswitch/hmap.h"
+#include "openvswitch/json.h"
 #include "openvswitch/list.h"
+#include "openvswitch/poll-loop.h"
+#include "openvswitch/vlog.h"
 #include "ovsdb-error.h"
 #include "ovsdb.h"
 #include "row.h"
+#include "storage.h"
 #include "table.h"
-#include "perf-counter.h"
 #include "uuid.h"
 
+VLOG_DEFINE_THIS_MODULE(transaction);
+
 struct ovsdb_txn {
     struct ovsdb *db;
     struct ovs_list txn_tables; /* Contains "struct ovsdb_txn_table"s. */
@@ -219,7 +225,7 @@ ovsdb_txn_adjust_atom_refs(struct ovsdb_txn *txn, const struct ovsdb_row *r,
         return NULL;
     }
 
-    table = base->u.uuid.refTable;
+    table = base->uuid.refTable;
     for (i = 0; i < n; i++) {
         const struct uuid *uuid = &atoms[i].uuid;
         struct ovsdb_txn_row *txn_row;
@@ -316,7 +322,7 @@ delete_row_refs(struct ovsdb_txn *txn, const struct ovsdb_row *row,
         return NULL;
     }
 
-    table = base->u.uuid.refTable;
+    table = base->uuid.refTable;
     for (i = 0; i < n; i++) {
         const struct uuid *uuid = &atoms[i].uuid;
         struct ovsdb_txn_row *txn_row;
@@ -436,9 +442,48 @@ ovsdb_txn_row_commit(struct ovsdb_txn *txn OVS_UNUSED,
     return NULL;
 }
 
+static struct ovsdb_error *
+ovsdb_txn_update_weak_refs(struct ovsdb_txn *txn OVS_UNUSED,
+                           struct ovsdb_txn_row *txn_row)
+{
+    struct ovsdb_weak_ref *weak, *next;
+
+    /* Remove the weak references originating in the old version of the row. */
+    if (txn_row->old) {
+        LIST_FOR_EACH_SAFE (weak, next, src_node, &txn_row->old->src_refs) {
+            ovs_list_remove(&weak->src_node);
+            ovs_list_remove(&weak->dst_node);
+            free(weak);
+        }
+    }
+
+    /* Although the originating rows have the responsibility of updating the
+     * weak references in the dst, it is possible that some source rows aren't
+     * part of the transaction.  In that situation this row needs to move the
+     * list of incoming weak references from the old row into the new one.
+     */
+    if (txn_row->old && txn_row->new) {
+        /* Move the incoming weak references from old to new. */
+        ovs_list_push_back_all(&txn_row->new->dst_refs,
+                               &txn_row->old->dst_refs);
+    }
+
+    /* Insert the weak references originating in the new version of the row. */
+    struct ovsdb_row *dst_row;
+    if (txn_row->new) {
+        LIST_FOR_EACH (weak, src_node, &txn_row->new->src_refs) {
+            /* dst_row MUST exist. */
+            dst_row = CONST_CAST(struct ovsdb_row *,
+                    ovsdb_table_get_row(weak->dst_table, &weak->dst));
+            ovs_list_insert(&dst_row->dst_refs, &weak->dst_node);
+        }
+    }
+
+    return NULL;
+}
+
 static void
-add_weak_ref(struct ovsdb_txn *txn,
-             const struct ovsdb_row *src_, const struct ovsdb_row *dst_)
+add_weak_ref(const struct ovsdb_row *src_, const struct ovsdb_row *dst_)
 {
     struct ovsdb_row *src = CONST_CAST(struct ovsdb_row *, src_);
     struct ovsdb_row *dst = CONST_CAST(struct ovsdb_row *, dst_);
@@ -448,8 +493,6 @@ add_weak_ref(struct ovsdb_txn *txn,
         return;
     }
 
-    dst = ovsdb_txn_row_modify(txn, dst);
-
     if (!ovs_list_is_empty(&dst->dst_refs)) {
         /* Omit duplicates. */
         weak = CONTAINER_OF(ovs_list_back(&dst->dst_refs),
@@ -461,7 +504,10 @@ add_weak_ref(struct ovsdb_txn *txn,
 
     weak = xmalloc(sizeof *weak);
     weak->src = src;
-    ovs_list_push_back(&dst->dst_refs, &weak->dst_node);
+    weak->dst_table = dst->table;
+    weak->dst = *ovsdb_row_get_uuid(dst);
+    /* The dst_refs list is updated at commit time. */
+    ovs_list_init(&weak->dst_node);
     ovs_list_push_back(&src->src_refs, &weak->src_node);
 }
 
@@ -471,7 +517,7 @@ assess_weak_refs(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
     struct ovsdb_table *table;
     struct shash_node *node;
 
-    if (txn_row->old) {
+    if (txn_row->old && !txn_row->new) {
         /* Mark rows that have weak references to 'txn_row' as modified, so
          * that their weak references will get reassessed. */
         struct ovsdb_weak_ref *weak, *next;
@@ -503,10 +549,10 @@ assess_weak_refs(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
             for (i = 0; i < datum->n; ) {
                 const struct ovsdb_row *row;
 
-                row = ovsdb_table_get_row(column->type.key.u.uuid.refTable,
+                row = ovsdb_table_get_row(column->type.key.uuid.refTable,
                                           &datum->keys[i].uuid);
                 if (row) {
-                    add_weak_ref(txn, txn_row->new, row);
+                    add_weak_ref(txn_row->new, row);
                     i++;
                 } else {
                     if (uuid_is_zero(&datum->keys[i].uuid)) {
@@ -521,10 +567,10 @@ assess_weak_refs(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
             for (i = 0; i < datum->n; ) {
                 const struct ovsdb_row *row;
 
-                row = ovsdb_table_get_row(column->type.value.u.uuid.refTable,
+                row = ovsdb_table_get_row(column->type.value.uuid.refTable,
                                           &datum->values[i].uuid);
                 if (row) {
-                    add_weak_ref(txn, txn_row->new, row);
+                    add_weak_ref(txn_row->new, row);
                     i++;
                 } else {
                     if (uuid_is_zero(&datum->values[i].uuid)) {
@@ -717,31 +763,40 @@ static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
 check_index_uniqueness(struct ovsdb_txn *txn OVS_UNUSED,
                        struct ovsdb_txn_row *txn_row)
 {
-    struct ovsdb_txn_table *txn_table = txn_row->table->txn_table;
-    struct ovsdb_table *table = txn_row->table;
+    /* Skip rows that are being deleted since they can't violate uniqueness. */
     struct ovsdb_row *row = txn_row->new;
-    size_t i;
-
     if (!row) {
         return NULL;
     }
 
-    for (i = 0; i < table->schema->n_indexes; i++) {
+    struct ovsdb_txn_table *txn_table = txn_row->table->txn_table;
+    struct ovsdb_table *table = txn_row->table;
+    for (size_t i = 0; i < table->schema->n_indexes; i++) {
         const struct ovsdb_column_set *index = &table->schema->indexes[i];
-        struct ovsdb_row *irow;
-        uint32_t hash;
-
-        hash = ovsdb_row_hash_columns(row, index, 0);
-        irow = ovsdb_index_search(&txn_table->txn_indexes[i], row, i, hash);
+        uint32_t hash = ovsdb_row_hash_columns(row, index, 0);
+
+        /* Check whether the row has a match in the temporary hash table that
+         * we're building.  If we add two rows with the same index data, then
+         * there's a duplicate within the rows added or modified in this
+         * transaction.*/
+        struct ovsdb_row *irow
+            = ovsdb_index_search(&txn_table->txn_indexes[i], row, i, hash);
         if (irow) {
             return duplicate_index_row(index, irow, row);
         }
 
+        /* Also check whether the row has a match in the table's real index
+         * (which won't be updated until transaction commit is certain).  If
+         * there's a match, and it's for a row that wasn't pulled into the
+         * transaction, then it's a duplicate.  (If it is for a row that is
+         * part of the transaction, then the first check has already handled
+         * it.) */
         irow = ovsdb_index_search(&table->indexes[i], row, i, hash);
         if (irow && !irow->txn_row) {
             return duplicate_index_row(index, irow, row);
         }
 
+        /* Add row to temporary hash table. */
         hmap_insert(&txn_table->txn_indexes[i],
                     ovsdb_row_get_index_node(row, i), hash);
     }
@@ -764,41 +819,43 @@ update_version(struct ovsdb_txn *txn OVS_UNUSED, struct ovsdb_txn_row *txn_row)
     return NULL;
 }
 
-static struct ovsdb_error *
-ovsdb_txn_commit_(struct ovsdb_txn *txn, bool durable)
+static bool
+ovsdb_txn_is_empty(const struct ovsdb_txn *txn)
+{
+    return ovs_list_is_empty(&txn->txn_tables);
+}
+
+static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+ovsdb_txn_precommit(struct ovsdb_txn *txn)
 {
-    struct ovsdb_replica *replica;
     struct ovsdb_error *error;
 
     /* Figure out what actually changed, and abort early if the transaction
      * was really a no-op. */
     error = for_each_txn_row(txn, determine_changes);
     if (error) {
+        ovsdb_txn_abort(txn);
         return OVSDB_WRAP_BUG("can't happen", error);
     }
     if (ovs_list_is_empty(&txn->txn_tables)) {
-        ovsdb_txn_abort(txn);
         return NULL;
     }
 
     /* Update reference counts and check referential integrity. */
     error = update_ref_counts(txn);
     if (error) {
-        ovsdb_txn_abort(txn);
         return error;
     }
 
     /* Delete unreferenced, non-root rows. */
     error = for_each_txn_row(txn, collect_garbage);
     if (error) {
-        ovsdb_txn_abort(txn);
         return OVSDB_WRAP_BUG("can't happen", error);
     }
 
     /* Check maximum rows table constraints. */
     error = check_max_rows(txn);
     if (error) {
-        ovsdb_txn_abort(txn);
         return error;
     }
 
@@ -806,14 +863,12 @@ ovsdb_txn_commit_(struct ovsdb_txn *txn, bool durable)
      * integrity. */
     error = for_each_txn_row(txn, assess_weak_refs);
     if (error) {
-        ovsdb_txn_abort(txn);
         return error;
     }
 
     /* Verify that the indexes will still be unique post-transaction. */
     error = for_each_txn_row(txn, check_index_uniqueness);
     if (error) {
-        ovsdb_txn_abort(txn);
         return error;
     }
 
@@ -823,34 +878,193 @@ ovsdb_txn_commit_(struct ovsdb_txn *txn, bool durable)
         return OVSDB_WRAP_BUG("can't happen", error);
     }
 
-    /* Send the commit to each replica. */
-    LIST_FOR_EACH (replica, node, &txn->db->replicas) {
-        error = (replica->class->commit)(replica, txn, durable);
-        if (error) {
-            /* We don't support two-phase commit so only the first replica is
-             * allowed to report an error. */
-            ovs_assert(&replica->node == txn->db->replicas.next);
+    return error;
+}
+
+/* Finalize commit. */
+void
+ovsdb_txn_complete(struct ovsdb_txn *txn)
+{
+    if (!ovsdb_txn_is_empty(txn)) {
+        txn->db->run_triggers = true;
+        ovsdb_monitors_commit(txn->db, txn);
+        ovsdb_error_assert(for_each_txn_row(txn, ovsdb_txn_update_weak_refs));
+        ovsdb_error_assert(for_each_txn_row(txn, ovsdb_txn_row_commit));
+    }
+    ovsdb_txn_free(txn);
+}
+
+/* Applies 'txn' to the internal representation of the database.  This is for
+ * transactions that don't need to be written to storage; probably, they came
+ * from storage.  These transactions shouldn't ordinarily fail because storage
+ * should contain only consistent transactions.  (One exception is for database
+ * conversion in ovsdb_convert().) */
+struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+ovsdb_txn_replay_commit(struct ovsdb_txn *txn)
+{
+    struct ovsdb_error *error = ovsdb_txn_precommit(txn);
+    if (error) {
+        ovsdb_txn_abort(txn);
+    } else {
+        ovsdb_txn_complete(txn);
+    }
+    return error;
+}
+
+/* If 'error' is nonnull, the transaction is complete, with the given error as
+ * the result.
+ *
+ * Otherwise, if 'write' is nonnull, then the transaction is waiting for
+ * 'write' to complete.
+ *
+ * Otherwise, if 'commit_index' is nonzero, then the transaction is waiting for
+ * 'commit_index' to be applied to the storage.
+ *
+ * Otherwise, the transaction is complete and successful. */
+struct ovsdb_txn_progress {
+    struct ovsdb_error *error;
+    struct ovsdb_write *write;
+    uint64_t commit_index;
+
+    struct ovsdb_storage *storage;
+};
+
+struct ovsdb_txn_progress *
+ovsdb_txn_propose_schema_change(struct ovsdb *db,
+                                const struct json *schema,
+                                const struct json *data)
+{
+    struct ovsdb_txn_progress *progress = xzalloc(sizeof *progress);
+    progress->storage = db->storage;
+
+    struct uuid next;
+    struct ovsdb_write *write = ovsdb_storage_write_schema_change(
+        db->storage, schema, data, &db->prereq, &next);
+    if (!ovsdb_write_is_complete(write)) {
+        progress->write = write;
+    } else {
+        progress->error = ovsdb_error_clone(ovsdb_write_get_error(write));
+        ovsdb_write_destroy(write);
+    }
+    return progress;
+}
+
+struct ovsdb_txn_progress *
+ovsdb_txn_propose_commit(struct ovsdb_txn *txn, bool durable)
+{
+    struct ovsdb_txn_progress *progress = xzalloc(sizeof *progress);
+    progress->storage = txn->db->storage;
+    progress->error = ovsdb_txn_precommit(txn);
+    if (progress->error) {
+        return progress;
+    }
+
+    /* Turn the commit into the format used for the storage logs.. */
+    struct json *txn_json = ovsdb_file_txn_to_json(txn);
+    if (!txn_json) {
+        /* Nothing to do, so success. */
+        return progress;
+    }
+    txn_json = ovsdb_file_txn_annotate(txn_json, ovsdb_txn_get_comment(txn));
+
+    struct uuid next;
+    struct ovsdb_write *write = ovsdb_storage_write(
+        txn->db->storage, txn_json, &txn->db->prereq, &next, durable);
+    json_destroy(txn_json);
+    if (!ovsdb_write_is_complete(write)) {
+        progress->write = write;
+    } else {
+        progress->error = ovsdb_error_clone(ovsdb_write_get_error(write));
+        ovsdb_write_destroy(write);
+    }
+    return progress;
+}
+
+/* Proposes 'txn' for commitment and then waits for the commit to succeed or
+ * fail.  Returns null if successful, otherwise the error.
+ *
+ * **In addition**, this function also completes or aborts the transaction if
+ * the transaction succeeded or failed, respectively. */
+struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+ovsdb_txn_propose_commit_block(struct ovsdb_txn *txn, bool durable)
+{
+    struct ovsdb_txn_progress *p = ovsdb_txn_propose_commit(txn, durable);
+    for (;;) {
+        ovsdb_storage_run(p->storage);
+        if (ovsdb_txn_progress_is_complete(p)) {
+            struct ovsdb_error *error
+                = ovsdb_error_clone(ovsdb_txn_progress_get_error(p));
+            ovsdb_txn_progress_destroy(p);
+
+            if (error) {
+                ovsdb_txn_abort(txn);
+            } else {
+                ovsdb_txn_complete(txn);
+            }
 
-            ovsdb_txn_abort(txn);
             return error;
         }
+        ovsdb_storage_wait(p->storage);
+        poll_block();
     }
+}
 
-    /* Finalize commit. */
-    txn->db->run_triggers = true;
-    ovsdb_error_assert(for_each_txn_row(txn, ovsdb_txn_row_commit));
-    ovsdb_txn_free(txn);
+static void
+ovsdb_txn_progress_run(struct ovsdb_txn_progress *p)
+{
+    if (p->error) {
+        return;
+    }
 
-    return NULL;
+    if (p->write) {
+        if (!ovsdb_write_is_complete(p->write)) {
+            return;
+        }
+        p->error = ovsdb_error_clone(ovsdb_write_get_error(p->write));
+        p->commit_index = ovsdb_write_get_commit_index(p->write);
+        ovsdb_write_destroy(p->write);
+        p->write = NULL;
+
+        if (p->error) {
+            return;
+        }
+    }
+
+    if (p->commit_index) {
+        if (ovsdb_storage_get_applied_index(p->storage) >= p->commit_index) {
+            p->commit_index = 0;
+        }
+    }
 }
 
-struct ovsdb_error *
-ovsdb_txn_commit(struct ovsdb_txn *txn, bool durable)
+static bool
+ovsdb_txn_progress_is_complete__(const struct ovsdb_txn_progress *p)
 {
-   struct ovsdb_error *err;
+    return p->error || (!p->write && !p->commit_index);
+}
+
+bool
+ovsdb_txn_progress_is_complete(const struct ovsdb_txn_progress *p)
+{
+    ovsdb_txn_progress_run(CONST_CAST(struct ovsdb_txn_progress *, p));
+    return ovsdb_txn_progress_is_complete__(p);
+}
 
-   PERF(__func__, err = ovsdb_txn_commit_(txn, durable));
-   return err;
+const struct ovsdb_error *
+ovsdb_txn_progress_get_error(const struct ovsdb_txn_progress *p)
+{
+    ovs_assert(ovsdb_txn_progress_is_complete__(p));
+    return p->error;
+}
+
+void
+ovsdb_txn_progress_destroy(struct ovsdb_txn_progress *p)
+{
+    if (p) {
+        ovsdb_error_destroy(p->error);
+        ovsdb_write_destroy(p->write);
+        free(p);
+    }
 }
 
 void