1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2017 Nicira, Inc.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License. */
18 #include "transaction.h"
21 #include "openvswitch/dynamic-string.h"
25 #include "openvswitch/hmap.h"
26 #include "openvswitch/json.h"
27 #include "openvswitch/list.h"
28 #include "openvswitch/poll-loop.h"
29 #include "openvswitch/vlog.h"
30 #include "ovsdb-error.h"
37 VLOG_DEFINE_THIS_MODULE(transaction
);
41 struct ovs_list txn_tables
; /* Contains "struct ovsdb_txn_table"s. */
43 struct uuid txnid
; /* For clustered mode only. It is the eid. */
46 /* A table modified by a transaction. */
47 struct ovsdb_txn_table
{
48 struct ovs_list node
; /* Element in ovsdb_txn's txn_tables list. */
49 struct ovsdb_table
*table
;
50 struct hmap txn_rows
; /* Contains "struct ovsdb_txn_row"s. */
52 /* This has the same form as the 'indexes' member of struct ovsdb_table,
53 * but it is only used or updated at transaction commit time, from
54 * check_index_uniqueness(). */
55 struct hmap
*txn_indexes
;
57 /* Used by for_each_txn_row(). */
58 unsigned int serial
; /* Serial number of in-progress iteration. */
59 unsigned int n_processed
; /* Number of rows processed. */
62 /* A row modified by the transaction:
64 * - A row added by a transaction will have null 'old' and non-null 'new'.
66 * - A row deleted by a transaction will have non-null 'old' and null 'new'.
69 * - A row modified by a transaction will have non-null 'old' and 'new'.
71 * - 'old' and 'new' both null indicates that a row was added then deleted
72 * within a single transaction. Most of the time we instead delete the
73 * ovsdb_txn_row entirely, but inside a for_each_txn_row() callback
74 * there are restrictions that sometimes mean we have to leave the
75 * ovsdb_txn_row in place. */
77 struct ovsdb_txn_row
{
78 struct hmap_node hmap_node
; /* In ovsdb_txn_table's txn_rows hmap. */
79 struct ovsdb_row
*old
; /* The old row. */
80 struct ovsdb_row
*new; /* The new row. */
81 size_t n_refs
; /* Number of remaining references. */
83 /* These members are the same as the corresponding members of 'old' or
84 * 'new'. They are present here for convenience and because occasionally
85 * there can be an ovsdb_txn_row where both 'old' and 'new' are NULL. */
87 struct ovsdb_table
*table
;
89 /* Used by for_each_txn_row(). */
90 unsigned int serial
; /* Serial number of in-progress commit. */
92 unsigned long changed
[]; /* Bits set to 1 for columns that changed. */
95 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
96 delete_garbage_row(struct ovsdb_txn
*txn
, struct ovsdb_txn_row
*r
);
97 static void ovsdb_txn_row_prefree(struct ovsdb_txn_row
*);
98 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
99 for_each_txn_row(struct ovsdb_txn
*txn
,
100 struct ovsdb_error
*(*)(struct ovsdb_txn
*,
101 struct ovsdb_txn_row
*));
103 /* Used by for_each_txn_row() to track tables and rows that have been processed. */
105 static unsigned int serial
;
/* Allocates a zero-filled 'struct ovsdb_txn' with xzalloc() and initializes
 * its 'txn_tables' list (empty) and its 'comment' dynamic string.
 *
 * NOTE(review): the lines that consume 'db' and return the new transaction
 * are not visible in this view; presumably 'db' is stored in the transaction
 * and the transaction is returned -- confirm against the full file. */
108 ovsdb_txn_create(struct ovsdb
*db
)
110 struct ovsdb_txn
*txn
= xzalloc(sizeof *txn
);
112 ovs_list_init(&txn
->txn_tables
);
113 ds_init(&txn
->comment
);
118 ovsdb_txn_set_txnid(const struct uuid
*txnid
, struct ovsdb_txn
*txn
)
124 ovsdb_txn_get_txnid(const struct ovsdb_txn
*txn
)
130 ovsdb_txn_free(struct ovsdb_txn
*txn
)
132 ovs_assert(ovs_list_is_empty(&txn
->txn_tables
));
133 ds_destroy(&txn
->comment
);
137 static struct ovsdb_error
*
138 ovsdb_txn_row_abort(struct ovsdb_txn
*txn OVS_UNUSED
,
139 struct ovsdb_txn_row
*txn_row
)
141 struct ovsdb_row
*old
= txn_row
->old
;
142 struct ovsdb_row
*new = txn_row
->new;
144 ovsdb_txn_row_prefree(txn_row
);
147 hmap_remove(&new->table
->rows
, &new->hmap_node
);
150 hmap_insert(&old
->table
->rows
, &old
->hmap_node
, ovsdb_row_hash(old
));
152 hmap_replace(&new->table
->rows
, &new->hmap_node
, &old
->hmap_node
);
154 ovsdb_row_destroy(new);
160 /* Returns the offset in bytes from the start of an ovsdb_row for 'table' to
161 * the hmap_node for the index numbered 'i'. */
163 ovsdb_row_index_offset__(const struct ovsdb_table
*table
, size_t i
)
165 size_t n_fields
= shash_count(&table
->schema
->columns
);
166 return (offsetof(struct ovsdb_row
, fields
)
167 + n_fields
* sizeof(struct ovsdb_datum
)
168 + i
* sizeof(struct hmap_node
));
171 /* Returns the hmap_node in 'row' for the index numbered 'i'. */
172 static struct hmap_node
*
173 ovsdb_row_get_index_node(struct ovsdb_row
*row
, size_t i
)
175 return (void *) ((char *) row
+ ovsdb_row_index_offset__(row
->table
, i
));
178 /* Returns the ovsdb_row given 'index_node', which is a pointer to that row's
179 * hmap_node for the index numbered 'i' within 'table'. */
180 static struct ovsdb_row
*
181 ovsdb_row_from_index_node(struct hmap_node
*index_node
,
182 const struct ovsdb_table
*table
, size_t i
)
184 return (void *) ((char *) index_node
- ovsdb_row_index_offset__(table
, i
));
188 ovsdb_txn_abort(struct ovsdb_txn
*txn
)
190 ovsdb_error_assert(for_each_txn_row(txn
, ovsdb_txn_row_abort
));
194 static struct ovsdb_txn_row
*
195 find_txn_row(const struct ovsdb_table
*table
, const struct uuid
*uuid
)
197 struct ovsdb_txn_row
*txn_row
;
199 if (!table
->txn_table
) {
203 HMAP_FOR_EACH_WITH_HASH (txn_row
, hmap_node
,
204 uuid_hash(uuid
), &table
->txn_table
->txn_rows
) {
205 if (uuid_equals(uuid
, &txn_row
->uuid
)) {
213 static struct ovsdb_txn_row
*
214 find_or_make_txn_row(struct ovsdb_txn
*txn
, const struct ovsdb_table
*table
,
215 const struct uuid
*uuid
)
217 struct ovsdb_txn_row
*txn_row
= find_txn_row(table
, uuid
);
219 const struct ovsdb_row
*row
= ovsdb_table_get_row(table
, uuid
);
221 txn_row
= ovsdb_txn_row_modify(txn
, row
)->txn_row
;
227 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
228 ovsdb_txn_adjust_atom_refs(struct ovsdb_txn
*txn
, const struct ovsdb_row
*r
,
229 const struct ovsdb_column
*c
,
230 const struct ovsdb_base_type
*base
,
231 const union ovsdb_atom
*atoms
, unsigned int n
,
234 const struct ovsdb_table
*table
;
237 if (!ovsdb_base_type_is_strong_ref(base
)) {
241 table
= base
->uuid
.refTable
;
242 for (i
= 0; i
< n
; i
++) {
243 const struct uuid
*uuid
= &atoms
[i
].uuid
;
244 struct ovsdb_txn_row
*txn_row
;
246 if (uuid_equals(uuid
, ovsdb_row_get_uuid(r
))) {
247 /* Self-references don't count. */
251 txn_row
= find_or_make_txn_row(txn
, table
, uuid
);
253 return ovsdb_error("referential integrity violation",
254 "Table %s column %s row "UUID_FMT
" "
255 "references nonexistent row "UUID_FMT
" in "
257 r
->table
->schema
->name
, c
->name
,
258 UUID_ARGS(ovsdb_row_get_uuid(r
)),
259 UUID_ARGS(uuid
), table
->schema
->name
);
261 txn_row
->n_refs
+= delta
;
267 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
268 ovsdb_txn_adjust_row_refs(struct ovsdb_txn
*txn
, const struct ovsdb_row
*r
,
269 const struct ovsdb_column
*column
, int delta
)
271 const struct ovsdb_datum
*field
= &r
->fields
[column
->index
];
272 struct ovsdb_error
*error
;
274 error
= ovsdb_txn_adjust_atom_refs(txn
, r
, column
, &column
->type
.key
,
275 field
->keys
, field
->n
, delta
);
277 error
= ovsdb_txn_adjust_atom_refs(txn
, r
, column
, &column
->type
.value
,
278 field
->values
, field
->n
, delta
);
283 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
284 update_row_ref_count(struct ovsdb_txn
*txn
, struct ovsdb_txn_row
*r
)
286 struct ovsdb_table
*table
= r
->table
;
287 struct shash_node
*node
;
289 SHASH_FOR_EACH (node
, &table
->schema
->columns
) {
290 const struct ovsdb_column
*column
= node
->data
;
291 struct ovsdb_error
*error
;
293 if (bitmap_is_set(r
->changed
, column
->index
)) {
295 error
= ovsdb_txn_adjust_row_refs(txn
, r
->old
, column
, -1);
297 return OVSDB_WRAP_BUG("error decreasing refcount", error
);
301 error
= ovsdb_txn_adjust_row_refs(txn
, r
->new, column
, 1);
312 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
313 check_ref_count(struct ovsdb_txn
*txn OVS_UNUSED
, struct ovsdb_txn_row
*r
)
315 if (r
->new || !r
->n_refs
) {
318 return ovsdb_error("referential integrity violation",
319 "cannot delete %s row "UUID_FMT
" because "
320 "of %"PRIuSIZE
" remaining reference(s)",
321 r
->table
->schema
->name
, UUID_ARGS(&r
->uuid
),
326 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
327 delete_row_refs(struct ovsdb_txn
*txn
, const struct ovsdb_row
*row
,
328 const struct ovsdb_base_type
*base
,
329 const union ovsdb_atom
*atoms
, unsigned int n
)
331 const struct ovsdb_table
*table
;
334 if (!ovsdb_base_type_is_strong_ref(base
)) {
338 table
= base
->uuid
.refTable
;
339 for (i
= 0; i
< n
; i
++) {
340 const struct uuid
*uuid
= &atoms
[i
].uuid
;
341 struct ovsdb_txn_row
*txn_row
;
343 if (uuid_equals(uuid
, ovsdb_row_get_uuid(row
))) {
344 /* Self-references don't count. */
348 txn_row
= find_or_make_txn_row(txn
, table
, uuid
);
350 return OVSDB_BUG("strong ref target missing");
351 } else if (!txn_row
->n_refs
) {
352 return OVSDB_BUG("strong ref target has zero n_refs");
353 } else if (!txn_row
->new) {
354 return OVSDB_BUG("deleted strong ref target");
357 if (--txn_row
->n_refs
== 0) {
358 struct ovsdb_error
*error
= delete_garbage_row(txn
, txn_row
);
368 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
369 delete_garbage_row(struct ovsdb_txn
*txn
, struct ovsdb_txn_row
*txn_row
)
371 struct shash_node
*node
;
372 struct ovsdb_row
*row
;
374 if (txn_row
->table
->schema
->is_root
) {
380 hmap_remove(&txn_row
->table
->rows
, &row
->hmap_node
);
381 SHASH_FOR_EACH (node
, &txn_row
->table
->schema
->columns
) {
382 const struct ovsdb_column
*column
= node
->data
;
383 const struct ovsdb_datum
*field
= &row
->fields
[column
->index
];
384 struct ovsdb_error
*error
;
386 error
= delete_row_refs(txn
, row
,
387 &column
->type
.key
, field
->keys
, field
->n
);
392 error
= delete_row_refs(txn
, row
,
393 &column
->type
.value
, field
->values
, field
->n
);
398 ovsdb_row_destroy(row
);
403 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
404 collect_garbage(struct ovsdb_txn
*txn
, struct ovsdb_txn_row
*txn_row
)
406 if (txn_row
->new && !txn_row
->n_refs
) {
407 return delete_garbage_row(txn
, txn_row
);
412 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
413 update_ref_counts(struct ovsdb_txn
*txn
)
415 struct ovsdb_error
*error
;
417 error
= for_each_txn_row(txn
, update_row_ref_count
);
422 return for_each_txn_row(txn
, check_ref_count
);
425 static struct ovsdb_error
*
426 ovsdb_txn_row_commit(struct ovsdb_txn
*txn OVS_UNUSED
,
427 struct ovsdb_txn_row
*txn_row
)
429 size_t n_indexes
= txn_row
->table
->schema
->n_indexes
;
434 for (i
= 0; i
< n_indexes
; i
++) {
435 struct hmap_node
*node
= ovsdb_row_get_index_node(txn_row
->old
, i
);
436 hmap_remove(&txn_row
->table
->indexes
[i
], node
);
442 for (i
= 0; i
< n_indexes
; i
++) {
443 struct hmap_node
*node
= ovsdb_row_get_index_node(txn_row
->new, i
);
444 hmap_insert(&txn_row
->table
->indexes
[i
], node
, node
->hash
);
448 ovsdb_txn_row_prefree(txn_row
);
450 txn_row
->new->n_refs
= txn_row
->n_refs
;
452 ovsdb_row_destroy(txn_row
->old
);
458 static struct ovsdb_error
*
459 ovsdb_txn_update_weak_refs(struct ovsdb_txn
*txn OVS_UNUSED
,
460 struct ovsdb_txn_row
*txn_row
)
462 struct ovsdb_weak_ref
*weak
, *next
;
464 /* Remove the weak references originating in the old version of the row. */
466 LIST_FOR_EACH_SAFE (weak
, next
, src_node
, &txn_row
->old
->src_refs
) {
467 ovs_list_remove(&weak
->src_node
);
468 ovs_list_remove(&weak
->dst_node
);
473 /* Although the originating rows have the responsibility of updating the
474 * weak references in the dst, it is possible that some source rows aren't
475 * part of the transaction. In that situation this row needs to move the
476 * list of incoming weak references from the old row into the new one. */
478 if (txn_row
->old
&& txn_row
->new) {
479 /* Move the incoming weak references from old to new. */
480 ovs_list_push_back_all(&txn_row
->new->dst_refs
,
481 &txn_row
->old
->dst_refs
);
484 /* Insert the weak references originating in the new version of the row. */
485 struct ovsdb_row
*dst_row
;
487 LIST_FOR_EACH (weak
, src_node
, &txn_row
->new->src_refs
) {
488 /* dst_row MUST exist. */
489 dst_row
= CONST_CAST(struct ovsdb_row
*,
490 ovsdb_table_get_row(weak
->dst_table
, &weak
->dst
));
491 ovs_list_insert(&dst_row
->dst_refs
, &weak
->dst_node
);
499 add_weak_ref(const struct ovsdb_row
*src_
, const struct ovsdb_row
*dst_
)
501 struct ovsdb_row
*src
= CONST_CAST(struct ovsdb_row
*, src_
);
502 struct ovsdb_row
*dst
= CONST_CAST(struct ovsdb_row
*, dst_
);
503 struct ovsdb_weak_ref
*weak
;
509 if (!ovs_list_is_empty(&dst
->dst_refs
)) {
510 /* Omit duplicates. */
511 weak
= CONTAINER_OF(ovs_list_back(&dst
->dst_refs
),
512 struct ovsdb_weak_ref
, dst_node
);
513 if (weak
->src
== src
) {
518 weak
= xmalloc(sizeof *weak
);
520 weak
->dst_table
= dst
->table
;
521 weak
->dst
= *ovsdb_row_get_uuid(dst
);
522 /* The dst_refs list is updated at commit time. */
523 ovs_list_init(&weak
->dst_node
);
524 ovs_list_push_back(&src
->src_refs
, &weak
->src_node
);
527 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
528 assess_weak_refs(struct ovsdb_txn
*txn
, struct ovsdb_txn_row
*txn_row
)
530 struct ovsdb_table
*table
;
531 struct shash_node
*node
;
533 if (txn_row
->old
&& !txn_row
->new) {
534 /* Mark rows that have weak references to 'txn_row' as modified, so
535 * that their weak references will get reassessed. */
536 struct ovsdb_weak_ref
*weak
, *next
;
538 LIST_FOR_EACH_SAFE (weak
, next
, dst_node
, &txn_row
->old
->dst_refs
) {
539 if (!weak
->src
->txn_row
) {
540 ovsdb_txn_row_modify(txn
, weak
->src
);
546 /* We don't have to do anything about references that originate at
547 * 'txn_row', because ovsdb_row_destroy() will remove those weak references. */
552 table
= txn_row
->table
;
553 SHASH_FOR_EACH (node
, &table
->schema
->columns
) {
554 const struct ovsdb_column
*column
= node
->data
;
555 struct ovsdb_datum
*datum
= &txn_row
->new->fields
[column
->index
];
556 unsigned int orig_n
, i
;
561 if (ovsdb_base_type_is_weak_ref(&column
->type
.key
)) {
562 for (i
= 0; i
< datum
->n
; ) {
563 const struct ovsdb_row
*row
;
565 row
= ovsdb_table_get_row(column
->type
.key
.uuid
.refTable
,
566 &datum
->keys
[i
].uuid
);
568 add_weak_ref(txn_row
->new, row
);
571 if (uuid_is_zero(&datum
->keys
[i
].uuid
)) {
574 ovsdb_datum_remove_unsafe(datum
, i
, &column
->type
);
579 if (ovsdb_base_type_is_weak_ref(&column
->type
.value
)) {
580 for (i
= 0; i
< datum
->n
; ) {
581 const struct ovsdb_row
*row
;
583 row
= ovsdb_table_get_row(column
->type
.value
.uuid
.refTable
,
584 &datum
->values
[i
].uuid
);
586 add_weak_ref(txn_row
->new, row
);
589 if (uuid_is_zero(&datum
->values
[i
].uuid
)) {
592 ovsdb_datum_remove_unsafe(datum
, i
, &column
->type
);
597 if (datum
->n
!= orig_n
) {
598 bitmap_set1(txn_row
->changed
, column
->index
);
599 ovsdb_datum_sort_assert(datum
, column
->type
.key
.type
);
600 if (datum
->n
< column
->type
.n_min
) {
601 const struct uuid
*row_uuid
= ovsdb_row_get_uuid(txn_row
->new);
602 if (zero
&& !txn_row
->old
) {
604 "constraint violation",
605 "Weak reference column \"%s\" in \"%s\" row "UUID_FMT
606 " (inserted within this transaction) contained "
607 "all-zeros UUID (probably as the default value for "
608 "this column) but deleting this value caused a "
609 "constraint volation because this column is not "
610 "allowed to be empty.", column
->name
,
611 table
->schema
->name
, UUID_ARGS(row_uuid
));
614 "constraint violation",
615 "Deletion of %u weak reference(s) to deleted (or "
616 "never-existing) rows from column \"%s\" in \"%s\" "
617 "row "UUID_FMT
" %scaused this column to become empty, "
618 "but constraints on this column disallow an "
620 orig_n
- datum
->n
, column
->name
, table
->schema
->name
,
624 : "(inserted within this transaction) "));
633 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
634 determine_changes(struct ovsdb_txn
*txn
, struct ovsdb_txn_row
*txn_row
)
636 struct ovsdb_table
*table
= txn_row
->table
;
638 if (txn_row
->old
&& txn_row
->new) {
639 struct shash_node
*node
;
640 bool changed
= false;
642 SHASH_FOR_EACH (node
, &table
->schema
->columns
) {
643 const struct ovsdb_column
*column
= node
->data
;
644 const struct ovsdb_type
*type
= &column
->type
;
645 unsigned int idx
= column
->index
;
647 if (!ovsdb_datum_equals(&txn_row
->old
->fields
[idx
],
648 &txn_row
->new->fields
[idx
],
650 bitmap_set1(txn_row
->changed
, idx
);
656 /* Nothing actually changed in this row, so drop it. */
657 ovsdb_txn_row_abort(txn
, txn_row
);
660 bitmap_set_multiple(txn_row
->changed
, 0,
661 shash_count(&table
->schema
->columns
), 1);
/* Checks the schema-defined row limit of every table touched by 'txn'.
 *
 * Walks the ovsdb_txn_table entries on 'txn->txn_tables' and, for each one,
 * compares the number of rows currently in the table's 'rows' hmap against
 * the table schema's 'max_rows'.  Returns a "constraint violation" error that
 * names the first table found to exceed its limit.
 *
 * NOTE(review): the return statement for the case where no table exceeds its
 * limit is not visible in this view. */
667 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
668 check_max_rows(struct ovsdb_txn
*txn
)
670 struct ovsdb_txn_table
*t
;
672 LIST_FOR_EACH (t
, node
, &txn
->txn_tables
) {
/* The count is taken from the table's own 'rows' hmap, not from the
 * transaction's 'txn_rows'. */
673 size_t n_rows
= hmap_count(&t
->table
->rows
);
674 unsigned int max_rows
= t
->table
->schema
->max_rows
;
676 if (n_rows
> max_rows
) {
677 return ovsdb_error("constraint violation",
678 "transaction causes \"%s\" table to contain "
679 "%"PRIuSIZE
" rows, greater than the schema-defined "
680 "limit of %u row(s)",
681 t
->table
->schema
->name
, n_rows
, max_rows
);
688 static struct ovsdb_row
*
689 ovsdb_index_search(struct hmap
*index
, struct ovsdb_row
*row
, size_t i
,
692 const struct ovsdb_table
*table
= row
->table
;
693 const struct ovsdb_column_set
*columns
= &table
->schema
->indexes
[i
];
694 struct hmap_node
*node
;
696 for (node
= hmap_first_with_hash(index
, hash
); node
;
697 node
= hmap_next_with_hash(node
)) {
698 struct ovsdb_row
*irow
= ovsdb_row_from_index_node(node
, table
, i
);
699 if (ovsdb_row_equal_columns(row
, irow
, columns
)) {
708 duplicate_index_row__(const struct ovsdb_column_set
*index
,
709 const struct ovsdb_row
*row
,
713 size_t n_columns
= shash_count(&row
->table
->schema
->columns
);
715 ds_put_format(out
, "%s row, with UUID "UUID_FMT
", ",
716 title
, UUID_ARGS(ovsdb_row_get_uuid(row
)));
718 || bitmap_scan(row
->txn_row
->changed
, 1, 0, n_columns
) == n_columns
) {
719 ds_put_cstr(out
, "existed in the database before this "
720 "transaction and was not modified by the transaction.");
721 } else if (!row
->txn_row
->old
) {
722 ds_put_cstr(out
, "was inserted by this transaction.");
723 } else if (ovsdb_row_equal_columns(row
->txn_row
->old
,
724 row
->txn_row
->new, index
)) {
725 ds_put_cstr(out
, "existed in the database before this "
726 "transaction, which modified some of the row's columns "
727 "but not any columns in this index.");
729 ds_put_cstr(out
, "had the following index values before the "
731 ovsdb_row_columns_to_string(row
->txn_row
->old
, index
, out
);
732 ds_put_char(out
, '.');
736 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
737 duplicate_index_row(const struct ovsdb_column_set
*index
,
738 const struct ovsdb_row
*a
,
739 const struct ovsdb_row
*b
)
741 struct ovsdb_column_set all_columns
;
742 struct ovsdb_error
*error
;
746 /* Put 'a' and 'b' in a predictable order to make error messages
747 * reproducible for testing. */
748 ovsdb_column_set_init(&all_columns
);
749 ovsdb_column_set_add_all(&all_columns
, a
->table
);
750 if (ovsdb_row_compare_columns_3way(a
, b
, &all_columns
) < 0) {
751 const struct ovsdb_row
*tmp
= a
;
755 ovsdb_column_set_destroy(&all_columns
);
757 index_s
= ovsdb_column_set_to_string(index
);
760 ds_put_format(&s
, "Transaction causes multiple rows in \"%s\" table to "
761 "have identical values (", a
->table
->schema
->name
);
762 ovsdb_row_columns_to_string(a
, index
, &s
);
763 ds_put_format(&s
, ") for index on %s. ", index_s
);
764 duplicate_index_row__(index
, a
, "First", &s
);
765 ds_put_cstr(&s
, " ");
766 duplicate_index_row__(index
, b
, "Second", &s
);
770 error
= ovsdb_error("constraint violation", "%s", ds_cstr(&s
));
775 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
776 check_index_uniqueness(struct ovsdb_txn
*txn OVS_UNUSED
,
777 struct ovsdb_txn_row
*txn_row
)
779 /* Skip rows that are being deleted since they can't violate uniqueness. */
780 struct ovsdb_row
*row
= txn_row
->new;
785 struct ovsdb_txn_table
*txn_table
= txn_row
->table
->txn_table
;
786 struct ovsdb_table
*table
= txn_row
->table
;
787 for (size_t i
= 0; i
< table
->schema
->n_indexes
; i
++) {
788 const struct ovsdb_column_set
*index
= &table
->schema
->indexes
[i
];
789 uint32_t hash
= ovsdb_row_hash_columns(row
, index
, 0);
791 /* Check whether the row has a match in the temporary hash table that
792 * we're building. If we add two rows with the same index data, then
793 * there's a duplicate within the rows added or modified in this transaction. */
795 struct ovsdb_row
*irow
796 = ovsdb_index_search(&txn_table
->txn_indexes
[i
], row
, i
, hash
);
798 return duplicate_index_row(index
, irow
, row
);
801 /* Also check whether the row has a match in the table's real index
802 * (which won't be updated until transaction commit is certain). If
803 * there's a match, and it's for a row that wasn't pulled into the
804 * transaction, then it's a duplicate. (If it is for a row that is
805 * part of the transaction, then the first check has already handled it.) */
807 irow
= ovsdb_index_search(&table
->indexes
[i
], row
, i
, hash
);
808 if (irow
&& !irow
->txn_row
) {
809 return duplicate_index_row(index
, irow
, row
);
812 /* Add row to temporary hash table. */
813 hmap_insert(&txn_table
->txn_indexes
[i
],
814 ovsdb_row_get_index_node(row
, i
), hash
);
820 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
821 update_version(struct ovsdb_txn
*txn OVS_UNUSED
, struct ovsdb_txn_row
*txn_row
)
823 struct ovsdb_table
*table
= txn_row
->table
;
824 size_t n_columns
= shash_count(&table
->schema
->columns
);
826 if (txn_row
->old
&& txn_row
->new
827 && !bitmap_is_all_zeros(txn_row
->changed
, n_columns
)) {
828 bitmap_set1(txn_row
->changed
, OVSDB_COL_VERSION
);
829 uuid_generate(ovsdb_row_get_version_rw(txn_row
->new));
/* Returns the result of ovs_list_is_empty() on 'txn->txn_tables', that is,
 * whether 'txn' currently has no ovsdb_txn_table attached to it. */
836 ovsdb_txn_is_empty(const struct ovsdb_txn
*txn
)
838 return ovs_list_is_empty(&txn
->txn_tables
);
841 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
842 ovsdb_txn_precommit(struct ovsdb_txn
*txn
)
844 struct ovsdb_error
*error
;
846 /* Figure out what actually changed, and abort early if the transaction
847 * was really a no-op. */
848 error
= for_each_txn_row(txn
, determine_changes
);
850 ovsdb_txn_abort(txn
);
851 return OVSDB_WRAP_BUG("can't happen", error
);
853 if (ovs_list_is_empty(&txn
->txn_tables
)) {
857 /* Update reference counts and check referential integrity. */
858 error
= update_ref_counts(txn
);
863 /* Delete unreferenced, non-root rows. */
864 error
= for_each_txn_row(txn
, collect_garbage
);
866 return OVSDB_WRAP_BUG("can't happen", error
);
869 /* Check maximum rows table constraints. */
870 error
= check_max_rows(txn
);
875 /* Check reference counts and remove bad references for "weak" referential integrity. */
877 error
= for_each_txn_row(txn
, assess_weak_refs
);
882 /* Verify that the indexes will still be unique post-transaction. */
883 error
= for_each_txn_row(txn
, check_index_uniqueness
);
888 /* Update _version for rows that changed. */
889 error
= for_each_txn_row(txn
, update_version
);
891 return OVSDB_WRAP_BUG("can't happen", error
);
897 static struct ovsdb_txn
*
898 ovsdb_txn_clone(const struct ovsdb_txn
*txn
)
900 struct ovsdb_txn
*txn_cloned
= xzalloc(sizeof *txn_cloned
);
901 ovs_list_init(&txn_cloned
->txn_tables
);
902 txn_cloned
->txnid
= txn
->txnid
;
904 struct ovsdb_txn_table
*t
;
905 LIST_FOR_EACH (t
, node
, &txn
->txn_tables
) {
906 struct ovsdb_txn_table
*t_cloned
= xmalloc(sizeof *t_cloned
);
907 ovs_list_push_back(&txn_cloned
->txn_tables
, &t_cloned
->node
);
908 hmap_init(&t_cloned
->txn_rows
);
910 struct ovsdb_txn_row
*r
;
911 HMAP_FOR_EACH (r
, hmap_node
, &t
->txn_rows
) {
912 size_t n_columns
= shash_count(&t
->table
->schema
->columns
);
913 struct ovsdb_txn_row
*r_cloned
=
914 xzalloc(offsetof(struct ovsdb_txn_row
, changed
)
915 + bitmap_n_bytes(n_columns
));
917 r_cloned
->uuid
= r
->uuid
;
918 r_cloned
->table
= r
->table
;
919 r_cloned
->old
= r
->old
? ovsdb_row_clone(r
->old
) : NULL
;
920 r_cloned
->new = r
->new ? ovsdb_row_clone(r
->new) : NULL
;
921 memcpy(&r_cloned
->changed
, &r
->changed
, bitmap_n_bytes(n_columns
));
922 hmap_insert(&t_cloned
->txn_rows
, &r_cloned
->hmap_node
,
923 uuid_hash(&r_cloned
->uuid
));
930 ovsdb_txn_destroy_cloned(struct ovsdb_txn
*txn
)
932 ovs_assert(!txn
->db
);
933 struct ovsdb_txn_table
*t
, *next_txn_table
;
934 LIST_FOR_EACH_SAFE (t
, next_txn_table
, node
, &txn
->txn_tables
) {
935 struct ovsdb_txn_row
*r
, *next_txn_row
;
936 HMAP_FOR_EACH_SAFE (r
, next_txn_row
, hmap_node
, &t
->txn_rows
) {
938 ovsdb_row_destroy(r
->old
);
941 ovsdb_row_destroy(r
->new);
943 hmap_remove(&t
->txn_rows
, &r
->hmap_node
);
946 hmap_destroy(&t
->txn_rows
);
947 ovs_list_remove(&t
->node
);
/* Records 'txn' in its database's transaction history, if the database asks
 * for one.
 *
 * When 'txn->db->need_txn_history' is set, allocates a zero-filled
 * ovsdb_txn_history_node, stores a clone of 'txn' (made by ovsdb_txn_clone())
 * in it, appends the node to 'txn->db->txn_history' and increments
 * 'txn->db->n_txn_history'.  The history holds the clone, not 'txn' itself. */
954 ovsdb_txn_add_to_history(struct ovsdb_txn
*txn
)
956 if (txn
->db
->need_txn_history
) {
957 struct ovsdb_txn_history_node
*node
= xzalloc(sizeof *node
);
958 node
->txn
= ovsdb_txn_clone(txn
);
959 ovs_list_push_back(&txn
->db
->txn_history
, &node
->node
);
960 txn
->db
->n_txn_history
++;
964 /* Finalize commit. */
966 ovsdb_txn_complete(struct ovsdb_txn
*txn
)
968 if (!ovsdb_txn_is_empty(txn
)) {
970 txn
->db
->run_triggers
= true;
971 ovsdb_monitors_commit(txn
->db
, txn
);
972 ovsdb_error_assert(for_each_txn_row(txn
, ovsdb_txn_update_weak_refs
));
973 ovsdb_error_assert(for_each_txn_row(txn
, ovsdb_txn_row_commit
));
978 /* Applies 'txn' to the internal representation of the database. This is for
979 * transactions that don't need to be written to storage; probably, they came
980 * from storage. These transactions shouldn't ordinarily fail because storage
981 * should contain only consistent transactions. (One exception is for database
982 * conversion in ovsdb_convert().) */
983 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
984 ovsdb_txn_replay_commit(struct ovsdb_txn
*txn
)
986 struct ovsdb_error
*error
= ovsdb_txn_precommit(txn
);
988 ovsdb_txn_abort(txn
);
990 ovsdb_txn_add_to_history(txn
);
991 ovsdb_txn_complete(txn
);
996 /* If 'error' is nonnull, the transaction is complete, with the given error as the result.
999 * Otherwise, if 'write' is nonnull, then the transaction is waiting for
1000 * 'write' to complete.
1002 * Otherwise, if 'commit_index' is nonzero, then the transaction is waiting for
1003 * 'commit_index' to be applied to the storage.
1005 * Otherwise, the transaction is complete and successful. */
1006 struct ovsdb_txn_progress
{
1007 struct ovsdb_error
*error
;
1008 struct ovsdb_write
*write
;
1009 uint64_t commit_index
;
1011 struct ovsdb_storage
*storage
;
/* Compares the database's recorded prerequisite, 'db->prereq', with the entry
 * ID obtained from ovsdb_storage_peek_last_eid() on 'db->storage', and
 * returns the result of uuid_equals() on the two.
 *
 * NOTE(review): some lines between the lookup and the final return are not
 * visible in this view (possibly handling of a null 'eid'); confirm against
 * the full file before relying on this description. */
1015 ovsdb_txn_precheck_prereq(const struct ovsdb
*db
)
1017 const struct uuid
*eid
= ovsdb_storage_peek_last_eid(db
->storage
);
1021 return uuid_equals(&db
->prereq
, eid
);
1024 struct ovsdb_txn_progress
*
1025 ovsdb_txn_propose_schema_change(struct ovsdb
*db
,
1026 const struct json
*schema
,
1027 const struct json
*data
)
1029 struct ovsdb_txn_progress
*progress
= xzalloc(sizeof *progress
);
1030 progress
->storage
= db
->storage
;
1033 struct ovsdb_write
*write
= ovsdb_storage_write_schema_change(
1034 db
->storage
, schema
, data
, &db
->prereq
, &next
);
1035 if (!ovsdb_write_is_complete(write
)) {
1036 progress
->write
= write
;
1038 progress
->error
= ovsdb_error_clone(ovsdb_write_get_error(write
));
1039 ovsdb_write_destroy(write
);
/* Starts committing 'txn' and returns a newly allocated ovsdb_txn_progress
 * that tracks the outcome.
 *
 * Steps visible here:
 *
 *   - The progress object is zero-filled and remembers 'txn->db->storage'.
 *
 *   - ovsdb_txn_precommit() runs first and its result is stored in
 *     'progress->error'.
 *
 *   - The transaction is converted to JSON with ovsdb_file_txn_to_json() and
 *     annotated with the transaction's comment.
 *
 *   - The JSON is handed to ovsdb_storage_write() along with
 *     'txn->db->prereq' and 'durable', and is destroyed right after that
 *     call.
 *
 *   - If the write is not yet complete it is kept in 'progress->write';
 *     otherwise a clone of its error is stored in 'progress->error' and the
 *     write is destroyed.
 *
 * NOTE(review): the early-return paths (precommit failure, nothing to write),
 * the declaration of 'next' and the final return are not visible in this
 * view; confirm against the full file. */
1044 struct ovsdb_txn_progress
*
1045 ovsdb_txn_propose_commit(struct ovsdb_txn
*txn
, bool durable
)
1047 struct ovsdb_txn_progress
*progress
= xzalloc(sizeof *progress
);
1048 progress
->storage
= txn
->db
->storage
;
1049 progress
->error
= ovsdb_txn_precommit(txn
);
1050 if (progress
->error
) {
1054 /* Turn the commit into the format used for the storage logs. */
1055 struct json
*txn_json
= ovsdb_file_txn_to_json(txn
);
1057 /* Nothing to do, so success. */
1060 txn_json
= ovsdb_file_txn_annotate(txn_json
, ovsdb_txn_get_comment(txn
));
1063 struct ovsdb_write
*write
= ovsdb_storage_write(
1064 txn
->db
->storage
, txn_json
, &txn
->db
->prereq
, &next
, durable
);
/* 'txn_json' is not used again in the visible code after the write call. */
1065 json_destroy(txn_json
);
1066 if (!ovsdb_write_is_complete(write
)) {
1067 progress
->write
= write
;
1069 progress
->error
= ovsdb_error_clone(ovsdb_write_get_error(write
));
1070 ovsdb_write_destroy(write
);
1075 /* Proposes 'txn' for commitment and then waits for the commit to succeed or
1076 * fail. Returns null if successful, otherwise the error.
1078 * **In addition**, this function also completes or aborts the transaction if
1079 * the transaction succeeded or failed, respectively. */
1080 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
1081 ovsdb_txn_propose_commit_block(struct ovsdb_txn
*txn
, bool durable
)
1083 struct ovsdb_txn_progress
*p
= ovsdb_txn_propose_commit(txn
, durable
);
1085 ovsdb_storage_run(p
->storage
);
1086 if (ovsdb_txn_progress_is_complete(p
)) {
1087 struct ovsdb_error
*error
1088 = ovsdb_error_clone(ovsdb_txn_progress_get_error(p
));
1089 ovsdb_txn_progress_destroy(p
);
1092 ovsdb_txn_abort(txn
);
1094 ovsdb_txn_complete(txn
);
1099 ovsdb_storage_wait(p
->storage
);
1105 ovsdb_txn_progress_run(struct ovsdb_txn_progress
*p
)
1112 if (!ovsdb_write_is_complete(p
->write
)) {
1115 p
->error
= ovsdb_error_clone(ovsdb_write_get_error(p
->write
));
1116 p
->commit_index
= ovsdb_write_get_commit_index(p
->write
);
1117 ovsdb_write_destroy(p
->write
);
1125 if (p
->commit_index
) {
1126 if (ovsdb_storage_get_applied_index(p
->storage
) >= p
->commit_index
) {
1127 p
->commit_index
= 0;
/* Pure check of whether 'p' has finished: true if 'p->error' is set, or if
 * there is neither a pending 'p->write' nor a nonzero 'p->commit_index' left
 * to wait for.  Does not modify 'p'. */
1133 ovsdb_txn_progress_is_complete__(const struct ovsdb_txn_progress
*p
)
1135 return p
->error
|| (!p
->write
&& !p
->commit_index
);
/* Advances 'p' by calling ovsdb_txn_progress_run() and then reports whether
 * it is complete, using ovsdb_txn_progress_is_complete__().
 *
 * Although 'p' is declared const, constness is cast away for the
 * ovsdb_txn_progress_run() call, so this function can change 'p'. */
1139 ovsdb_txn_progress_is_complete(const struct ovsdb_txn_progress
*p
)
1141 ovsdb_txn_progress_run(CONST_CAST(struct ovsdb_txn_progress
*, p
));
1142 return ovsdb_txn_progress_is_complete__(p
);
1145 const struct ovsdb_error
*
1146 ovsdb_txn_progress_get_error(const struct ovsdb_txn_progress
*p
)
1148 ovs_assert(ovsdb_txn_progress_is_complete__(p
));
1153 ovsdb_txn_progress_destroy(struct ovsdb_txn_progress
*p
)
1156 ovsdb_error_destroy(p
->error
);
1157 ovsdb_write_destroy(p
->write
);
/* Invokes 'cb' for the rows changed by 'txn'.
 *
 * Iterates over every ovsdb_txn_table on 'txn->txn_tables' and every
 * ovsdb_txn_row in that table's 'txn_rows'.  Rows whose 'old' and 'new' are
 * both null are skipped; for every other row 'cb' is called with the row's
 * 'old', 'new', 'changed' bitmap and the caller's 'aux'.
 *
 * NOTE(review): the statement executed when 'cb' returns false is not visible
 * in this view; presumably iteration stops -- confirm against the full
 * file. */
1163 ovsdb_txn_for_each_change(const struct ovsdb_txn
*txn
,
1164 ovsdb_txn_row_cb_func
*cb
, void *aux
)
1166 struct ovsdb_txn_table
*t
;
1167 struct ovsdb_txn_row
*r
;
1169 LIST_FOR_EACH (t
, node
, &txn
->txn_tables
) {
1170 HMAP_FOR_EACH (r
, hmap_node
, &t
->txn_rows
) {
/* 'cb' is only evaluated when at least one of 'old'/'new' is non-null. */
1171 if ((r
->old
|| r
->new) && !cb(r
->old
, r
->new, r
->changed
, aux
)) {
1178 static struct ovsdb_txn_table
*
1179 ovsdb_txn_create_txn_table(struct ovsdb_txn
*txn
, struct ovsdb_table
*table
)
1181 if (!table
->txn_table
) {
1182 struct ovsdb_txn_table
*txn_table
;
1185 table
->txn_table
= txn_table
= xmalloc(sizeof *table
->txn_table
);
1186 txn_table
->table
= table
;
1187 hmap_init(&txn_table
->txn_rows
);
1188 txn_table
->serial
= serial
- 1;
1189 txn_table
->txn_indexes
= xmalloc(table
->schema
->n_indexes
1190 * sizeof *txn_table
->txn_indexes
);
1191 for (i
= 0; i
< table
->schema
->n_indexes
; i
++) {
1192 hmap_init(&txn_table
->txn_indexes
[i
]);
1194 ovs_list_push_back(&txn
->txn_tables
, &txn_table
->node
);
1196 return table
->txn_table
;
1199 static struct ovsdb_txn_row
*
1200 ovsdb_txn_row_create(struct ovsdb_txn
*txn
, struct ovsdb_table
*table
,
1201 const struct ovsdb_row
*old_
, struct ovsdb_row
*new)
1203 const struct ovsdb_row
*row
= old_
? old_
: new;
1204 struct ovsdb_row
*old
= CONST_CAST(struct ovsdb_row
*, old_
);
1205 size_t n_columns
= shash_count(&table
->schema
->columns
);
1206 struct ovsdb_txn_table
*txn_table
;
1207 struct ovsdb_txn_row
*txn_row
;
1209 txn_row
= xzalloc(offsetof(struct ovsdb_txn_row
, changed
)
1210 + bitmap_n_bytes(n_columns
));
1211 txn_row
->uuid
= *ovsdb_row_get_uuid(row
);
1212 txn_row
->table
= row
->table
;
1215 txn_row
->n_refs
= old
? old
->n_refs
: 0;
1216 txn_row
->serial
= serial
- 1;
1219 old
->txn_row
= txn_row
;
1222 new->txn_row
= txn_row
;
1225 txn_table
= ovsdb_txn_create_txn_table(txn
, table
);
1226 hmap_insert(&txn_table
->txn_rows
, &txn_row
->hmap_node
,
1227 ovsdb_row_hash(old
? old
: new));
1233 ovsdb_txn_row_modify(struct ovsdb_txn
*txn
, const struct ovsdb_row
*ro_row_
)
1235 struct ovsdb_row
*ro_row
= CONST_CAST(struct ovsdb_row
*, ro_row_
);
1237 if (ro_row
->txn_row
) {
1238 ovs_assert(ro_row
== ro_row
->txn_row
->new);
1241 struct ovsdb_table
*table
= ro_row
->table
;
1242 struct ovsdb_row
*rw_row
;
1244 rw_row
= ovsdb_row_clone(ro_row
);
1245 rw_row
->n_refs
= ro_row
->n_refs
;
1246 ovsdb_txn_row_create(txn
, table
, ro_row
, rw_row
);
1247 hmap_replace(&table
->rows
, &ro_row
->hmap_node
, &rw_row
->hmap_node
);
1254 ovsdb_txn_row_insert(struct ovsdb_txn
*txn
, struct ovsdb_row
*row
)
1256 uint32_t hash
= ovsdb_row_hash(row
);
1257 struct ovsdb_table
*table
= row
->table
;
1259 uuid_generate(ovsdb_row_get_version_rw(row
));
1261 ovsdb_txn_row_create(txn
, table
, NULL
, row
);
1262 hmap_insert(&table
->rows
, &row
->hmap_node
, hash
);
1265 /* 'row' must be assumed destroyed upon return; the caller must not reference
1268 ovsdb_txn_row_delete(struct ovsdb_txn
*txn
, const struct ovsdb_row
*row_
)
1270 struct ovsdb_row
*row
= CONST_CAST(struct ovsdb_row
*, row_
);
1271 struct ovsdb_table
*table
= row
->table
;
1272 struct ovsdb_txn_row
*txn_row
= row
->txn_row
;
1274 hmap_remove(&table
->rows
, &row
->hmap_node
);
1277 ovsdb_txn_row_create(txn
, table
, row
, NULL
);
1279 ovs_assert(txn_row
->new == row
);
1281 txn_row
->new = NULL
;
1283 hmap_remove(&table
->txn_table
->txn_rows
, &txn_row
->hmap_node
);
1286 ovsdb_row_destroy(row
);
1291 ovsdb_txn_add_comment(struct ovsdb_txn
*txn
, const char *s
)
1293 if (txn
->comment
.length
) {
1294 ds_put_char(&txn
->comment
, '\n');
1296 ds_put_cstr(&txn
->comment
, s
);
1300 ovsdb_txn_get_comment(const struct ovsdb_txn
*txn
)
1302 return txn
->comment
.length
? ds_cstr_ro(&txn
->comment
) : NULL
;
1306 ovsdb_txn_row_prefree(struct ovsdb_txn_row
*txn_row
)
1308 struct ovsdb_txn_table
*txn_table
= txn_row
->table
->txn_table
;
1310 txn_table
->n_processed
--;
1311 hmap_remove(&txn_table
->txn_rows
, &txn_row
->hmap_node
);
1314 txn_row
->old
->txn_row
= NULL
;
1317 txn_row
->new->txn_row
= NULL
;
1322 ovsdb_txn_table_destroy(struct ovsdb_txn_table
*txn_table
)
1326 ovs_assert(hmap_is_empty(&txn_table
->txn_rows
));
1328 for (i
= 0; i
< txn_table
->table
->schema
->n_indexes
; i
++) {
1329 hmap_destroy(&txn_table
->txn_indexes
[i
]);
1331 free(txn_table
->txn_indexes
);
1333 txn_table
->table
->txn_table
= NULL
;
1334 hmap_destroy(&txn_table
->txn_rows
);
1335 ovs_list_remove(&txn_table
->node
);
1339 /* Calls 'cb' for every txn_row within 'txn'. If 'cb' returns nonnull, this
1340 * aborts the iteration and for_each_txn_row() passes the error up. Otherwise,
1341 * returns a null pointer after iteration is complete.
1343 * 'cb' may insert new txn_rows and new txn_tables into 'txn'. It may delete
1344 * the txn_row that it is passed in, or txn_rows in txn_tables other than the
1345 * one passed to 'cb'. It may *not* delete txn_rows other than the one passed
1346 * in within the same txn_table. It may *not* delete any txn_tables. As long
1347 * as these rules are followed, 'cb' will be called exactly once for each
1348 * txn_row in 'txn', even those added by 'cb'.
1350 * (Even though 'cb' is not allowed to delete some txn_rows, it can still
1351 * delete any actual row by clearing a txn_row's 'new' member.)
1353 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
1354 for_each_txn_row(struct ovsdb_txn
*txn
,
1355 struct ovsdb_error
*(*cb
)(struct ovsdb_txn
*,
1356 struct ovsdb_txn_row
*))
1363 struct ovsdb_txn_table
*t
, *next_txn_table
;
1366 LIST_FOR_EACH_SAFE (t
, next_txn_table
, node
, &txn
->txn_tables
) {
1367 if (t
->serial
!= serial
) {
1372 while (t
->n_processed
< hmap_count(&t
->txn_rows
)) {
1373 struct ovsdb_txn_row
*r
, *next_txn_row
;
1375 HMAP_FOR_EACH_SAFE (r
, next_txn_row
, hmap_node
, &t
->txn_rows
) {
1376 if (r
->serial
!= serial
) {
1377 struct ovsdb_error
*error
;
1390 if (hmap_is_empty(&t
->txn_rows
)) {
1391 /* Table is empty. Drop it. */
1392 ovsdb_txn_table_destroy(t
);
1401 ovsdb_txn_history_run(struct ovsdb
*db
)
1403 if (!db
->need_txn_history
) {
1406 /* Remove old histories to limit the size of the history */
1407 while (db
->n_txn_history
> 100) {
1408 struct ovsdb_txn_history_node
*txn_h_node
= CONTAINER_OF(
1409 ovs_list_pop_front(&db
->txn_history
),
1410 struct ovsdb_txn_history_node
, node
);
1412 ovsdb_txn_destroy_cloned(txn_h_node
->txn
);
1414 db
->n_txn_history
--;
1419 ovsdb_txn_history_init(struct ovsdb
*db
, bool need_txn_history
)
1421 db
->need_txn_history
= need_txn_history
;
1422 db
->n_txn_history
= 0;
1423 ovs_list_init(&db
->txn_history
);
1427 ovsdb_txn_history_destroy(struct ovsdb
*db
)
1430 if (!db
->need_txn_history
) {
1434 struct ovsdb_txn_history_node
*txn_h_node
, *next
;
1435 LIST_FOR_EACH_SAFE (txn_h_node
, next
, node
, &db
->txn_history
) {
1436 ovs_list_remove(&txn_h_node
->node
);
1437 ovsdb_txn_destroy_cloned(txn_h_node
->txn
);
1440 db
->n_txn_history
= 0;