]> git.proxmox.com Git - ovs.git/blob - ovsdb/transaction.c
dpctl: Fix dpctl process command parameter error.
[ovs.git] / ovsdb / transaction.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2017, 2019 Nicira, Inc.
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <config.h>
17
18 #include "transaction.h"
19
20 #include "bitmap.h"
21 #include "openvswitch/dynamic-string.h"
22 #include "file.h"
23 #include "hash.h"
24 #include "monitor.h"
25 #include "openvswitch/hmap.h"
26 #include "openvswitch/json.h"
27 #include "openvswitch/list.h"
28 #include "openvswitch/poll-loop.h"
29 #include "openvswitch/vlog.h"
30 #include "ovsdb-error.h"
31 #include "ovsdb.h"
32 #include "row.h"
33 #include "storage.h"
34 #include "table.h"
35 #include "uuid.h"
36
37 VLOG_DEFINE_THIS_MODULE(transaction);
38
39 struct ovsdb_txn {
40 struct ovsdb *db;
41 struct ovs_list txn_tables; /* Contains "struct ovsdb_txn_table"s. */
42 struct ds comment;
43 struct uuid txnid; /* For clustered mode only. It is the eid. */
44 };
45
46 /* A table modified by a transaction. */
47 struct ovsdb_txn_table {
48 struct ovs_list node; /* Element in ovsdb_txn's txn_tables list. */
49 struct ovsdb_table *table;
50 struct hmap txn_rows; /* Contains "struct ovsdb_txn_row"s. */
51
52 /* This has the same form as the 'indexes' member of struct ovsdb_table,
53 * but it is only used or updated at transaction commit time, from
54 * check_index_uniqueness(). */
55 struct hmap *txn_indexes;
56
57 /* Used by for_each_txn_row(). */
58 unsigned int serial; /* Serial number of in-progress iteration. */
59 unsigned int n_processed; /* Number of rows processed. */
60 };
61
62 /* A row modified by the transaction:
63 *
64 * - A row added by a transaction will have null 'old' and non-null 'new'.
65 *
66 * - A row deleted by a transaction will have non-null 'old' and null
67 * 'new'.
68 *
69 * - A row modified by a transaction will have non-null 'old' and 'new'.
70 *
71 * - 'old' and 'new' both null indicates that a row was added then deleted
72 * within a single transaction. Most of the time we instead delete the
73 * ovsdb_txn_row entirely, but inside a for_each_txn_row() callback
74 * there are restrictions that sometimes mean we have to leave the
75 * ovsdb_txn_row in place.
76 */
77 struct ovsdb_txn_row {
78 struct hmap_node hmap_node; /* In ovsdb_txn_table's txn_rows hmap. */
79 struct ovsdb_row *old; /* The old row. */
80 struct ovsdb_row *new; /* The new row. */
81 size_t n_refs; /* Number of remaining references. */
82
83 /* These members are the same as the corresponding members of 'old' or
84 * 'new'. They are present here for convenience and because occasionally
85 * there can be an ovsdb_txn_row where both 'old' and 'new' are NULL. */
86 struct uuid uuid;
87 struct ovsdb_table *table;
88
89 /* Used by for_each_txn_row(). */
90 unsigned int serial; /* Serial number of in-progress commit. */
91
92 unsigned long changed[]; /* Bits set to 1 for columns that changed. */
93 };
94
95 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
96 delete_garbage_row(struct ovsdb_txn *txn, struct ovsdb_txn_row *r);
97 static void ovsdb_txn_row_prefree(struct ovsdb_txn_row *);
98 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
99 for_each_txn_row(struct ovsdb_txn *txn,
100 struct ovsdb_error *(*)(struct ovsdb_txn *,
101 struct ovsdb_txn_row *));
102
103 /* Used by for_each_txn_row() to track tables and rows that have been
104 * processed. */
105 static unsigned int serial;
106
107 struct ovsdb_txn *
108 ovsdb_txn_create(struct ovsdb *db)
109 {
110 struct ovsdb_txn *txn = xzalloc(sizeof *txn);
111 txn->db = db;
112 ovs_list_init(&txn->txn_tables);
113 ds_init(&txn->comment);
114 return txn;
115 }
116
117 void
118 ovsdb_txn_set_txnid(const struct uuid *txnid, struct ovsdb_txn *txn)
119 {
120 txn->txnid = *txnid;
121 }
122
123 const struct uuid *
124 ovsdb_txn_get_txnid(const struct ovsdb_txn *txn)
125 {
126 return &txn->txnid;
127 }
128
129 static void
130 ovsdb_txn_free(struct ovsdb_txn *txn)
131 {
132 ovs_assert(ovs_list_is_empty(&txn->txn_tables));
133 ds_destroy(&txn->comment);
134 free(txn);
135 }
136
137 static struct ovsdb_error *
138 ovsdb_txn_row_abort(struct ovsdb_txn *txn OVS_UNUSED,
139 struct ovsdb_txn_row *txn_row)
140 {
141 struct ovsdb_row *old = txn_row->old;
142 struct ovsdb_row *new = txn_row->new;
143
144 ovsdb_txn_row_prefree(txn_row);
145 if (!old) {
146 if (new) {
147 hmap_remove(&new->table->rows, &new->hmap_node);
148 }
149 } else if (!new) {
150 hmap_insert(&old->table->rows, &old->hmap_node, ovsdb_row_hash(old));
151 } else {
152 hmap_replace(&new->table->rows, &new->hmap_node, &old->hmap_node);
153 }
154 ovsdb_row_destroy(new);
155 free(txn_row);
156
157 return NULL;
158 }
159
160 /* Returns the offset in bytes from the start of an ovsdb_row for 'table' to
161 * the hmap_node for the index numbered 'i'. */
162 static size_t
163 ovsdb_row_index_offset__(const struct ovsdb_table *table, size_t i)
164 {
165 size_t n_fields = shash_count(&table->schema->columns);
166 return (offsetof(struct ovsdb_row, fields)
167 + n_fields * sizeof(struct ovsdb_datum)
168 + i * sizeof(struct hmap_node));
169 }
170
171 /* Returns the hmap_node in 'row' for the index numbered 'i'. */
172 static struct hmap_node *
173 ovsdb_row_get_index_node(struct ovsdb_row *row, size_t i)
174 {
175 return (void *) ((char *) row + ovsdb_row_index_offset__(row->table, i));
176 }
177
/* Inverse of ovsdb_row_get_index_node(): given 'index_node', the hmap_node
 * for index number 'i' of some row in 'table', returns that row. */
static struct ovsdb_row *
ovsdb_row_from_index_node(struct hmap_node *index_node,
                          const struct ovsdb_table *table, size_t i)
{
    char *node_addr = (char *) index_node;
    size_t offset = ovsdb_row_index_offset__(table, i);

    return (void *) (node_addr - offset);
}
186
187 void
188 ovsdb_txn_abort(struct ovsdb_txn *txn)
189 {
190 ovsdb_error_assert(for_each_txn_row(txn, ovsdb_txn_row_abort));
191 ovsdb_txn_free(txn);
192 }
193
194 static struct ovsdb_txn_row *
195 find_txn_row(const struct ovsdb_table *table, const struct uuid *uuid)
196 {
197 struct ovsdb_txn_row *txn_row;
198
199 if (!table->txn_table) {
200 return NULL;
201 }
202
203 HMAP_FOR_EACH_WITH_HASH (txn_row, hmap_node,
204 uuid_hash(uuid), &table->txn_table->txn_rows) {
205 if (uuid_equals(uuid, &txn_row->uuid)) {
206 return txn_row;
207 }
208 }
209
210 return NULL;
211 }
212
213 static struct ovsdb_txn_row *
214 find_or_make_txn_row(struct ovsdb_txn *txn, const struct ovsdb_table *table,
215 const struct uuid *uuid)
216 {
217 struct ovsdb_txn_row *txn_row = find_txn_row(table, uuid);
218 if (!txn_row) {
219 const struct ovsdb_row *row = ovsdb_table_get_row(table, uuid);
220 if (row) {
221 txn_row = ovsdb_txn_row_modify(txn, row)->txn_row;
222 }
223 }
224 return txn_row;
225 }
226
227 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
228 ovsdb_txn_adjust_atom_refs(struct ovsdb_txn *txn, const struct ovsdb_row *r,
229 const struct ovsdb_column *c,
230 const struct ovsdb_base_type *base,
231 const union ovsdb_atom *atoms, unsigned int n,
232 int delta)
233 {
234 const struct ovsdb_table *table;
235 unsigned int i;
236
237 if (!ovsdb_base_type_is_strong_ref(base)) {
238 return NULL;
239 }
240
241 table = base->uuid.refTable;
242 for (i = 0; i < n; i++) {
243 const struct uuid *uuid = &atoms[i].uuid;
244 struct ovsdb_txn_row *txn_row;
245
246 if (uuid_equals(uuid, ovsdb_row_get_uuid(r))) {
247 /* Self-references don't count. */
248 continue;
249 }
250
251 txn_row = find_or_make_txn_row(txn, table, uuid);
252 if (!txn_row) {
253 return ovsdb_error("referential integrity violation",
254 "Table %s column %s row "UUID_FMT" "
255 "references nonexistent row "UUID_FMT" in "
256 "table %s.",
257 r->table->schema->name, c->name,
258 UUID_ARGS(ovsdb_row_get_uuid(r)),
259 UUID_ARGS(uuid), table->schema->name);
260 }
261 txn_row->n_refs += delta;
262 }
263
264 return NULL;
265 }
266
267 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
268 ovsdb_txn_adjust_row_refs(struct ovsdb_txn *txn, const struct ovsdb_row *r,
269 const struct ovsdb_column *column, int delta)
270 {
271 const struct ovsdb_datum *field = &r->fields[column->index];
272 struct ovsdb_error *error;
273
274 error = ovsdb_txn_adjust_atom_refs(txn, r, column, &column->type.key,
275 field->keys, field->n, delta);
276 if (!error) {
277 error = ovsdb_txn_adjust_atom_refs(txn, r, column, &column->type.value,
278 field->values, field->n, delta);
279 }
280 return error;
281 }
282
283 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
284 update_row_ref_count(struct ovsdb_txn *txn, struct ovsdb_txn_row *r)
285 {
286 struct ovsdb_table *table = r->table;
287 struct shash_node *node;
288
289 SHASH_FOR_EACH (node, &table->schema->columns) {
290 const struct ovsdb_column *column = node->data;
291 struct ovsdb_error *error;
292
293 if (bitmap_is_set(r->changed, column->index)) {
294 if (r->old) {
295 error = ovsdb_txn_adjust_row_refs(txn, r->old, column, -1);
296 if (error) {
297 return OVSDB_WRAP_BUG("error decreasing refcount", error);
298 }
299 }
300 if (r->new) {
301 error = ovsdb_txn_adjust_row_refs(txn, r->new, column, 1);
302 if (error) {
303 return error;
304 }
305 }
306 }
307 }
308
309 return NULL;
310 }
311
312 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
313 check_ref_count(struct ovsdb_txn *txn OVS_UNUSED, struct ovsdb_txn_row *r)
314 {
315 if (r->new || !r->n_refs) {
316 return NULL;
317 } else {
318 return ovsdb_error("referential integrity violation",
319 "cannot delete %s row "UUID_FMT" because "
320 "of %"PRIuSIZE" remaining reference(s)",
321 r->table->schema->name, UUID_ARGS(&r->uuid),
322 r->n_refs);
323 }
324 }
325
326 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
327 delete_row_refs(struct ovsdb_txn *txn, const struct ovsdb_row *row,
328 const struct ovsdb_base_type *base,
329 const union ovsdb_atom *atoms, unsigned int n)
330 {
331 const struct ovsdb_table *table;
332 unsigned int i;
333
334 if (!ovsdb_base_type_is_strong_ref(base)) {
335 return NULL;
336 }
337
338 table = base->uuid.refTable;
339 for (i = 0; i < n; i++) {
340 const struct uuid *uuid = &atoms[i].uuid;
341 struct ovsdb_txn_row *txn_row;
342
343 if (uuid_equals(uuid, ovsdb_row_get_uuid(row))) {
344 /* Self-references don't count. */
345 continue;
346 }
347
348 txn_row = find_or_make_txn_row(txn, table, uuid);
349 if (!txn_row) {
350 return OVSDB_BUG("strong ref target missing");
351 } else if (!txn_row->n_refs) {
352 return OVSDB_BUG("strong ref target has zero n_refs");
353 } else if (!txn_row->new) {
354 return OVSDB_BUG("deleted strong ref target");
355 }
356
357 if (--txn_row->n_refs == 0) {
358 struct ovsdb_error *error = delete_garbage_row(txn, txn_row);
359 if (error) {
360 return error;
361 }
362 }
363 }
364
365 return NULL;
366 }
367
368 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
369 delete_garbage_row(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
370 {
371 struct shash_node *node;
372 struct ovsdb_row *row;
373
374 if (txn_row->table->schema->is_root) {
375 return NULL;
376 }
377
378 row = txn_row->new;
379 txn_row->new = NULL;
380 hmap_remove(&txn_row->table->rows, &row->hmap_node);
381 SHASH_FOR_EACH (node, &txn_row->table->schema->columns) {
382 const struct ovsdb_column *column = node->data;
383 const struct ovsdb_datum *field = &row->fields[column->index];
384 struct ovsdb_error *error;
385
386 error = delete_row_refs(txn, row,
387 &column->type.key, field->keys, field->n);
388 if (error) {
389 return error;
390 }
391
392 error = delete_row_refs(txn, row,
393 &column->type.value, field->values, field->n);
394 if (error) {
395 return error;
396 }
397 }
398 ovsdb_row_destroy(row);
399
400 return NULL;
401 }
402
403 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
404 collect_garbage(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
405 {
406 if (txn_row->new && !txn_row->n_refs) {
407 return delete_garbage_row(txn, txn_row);
408 }
409 return NULL;
410 }
411
412 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
413 update_ref_counts(struct ovsdb_txn *txn)
414 {
415 struct ovsdb_error *error;
416
417 error = for_each_txn_row(txn, update_row_ref_count);
418 if (error) {
419 return error;
420 }
421
422 return for_each_txn_row(txn, check_ref_count);
423 }
424
425 static struct ovsdb_error *
426 ovsdb_txn_row_commit(struct ovsdb_txn *txn OVS_UNUSED,
427 struct ovsdb_txn_row *txn_row)
428 {
429 size_t n_indexes = txn_row->table->schema->n_indexes;
430
431 if (txn_row->old) {
432 size_t i;
433
434 for (i = 0; i < n_indexes; i++) {
435 struct hmap_node *node = ovsdb_row_get_index_node(txn_row->old, i);
436 hmap_remove(&txn_row->table->indexes[i], node);
437 }
438 }
439 if (txn_row->new) {
440 size_t i;
441
442 for (i = 0; i < n_indexes; i++) {
443 struct hmap_node *node = ovsdb_row_get_index_node(txn_row->new, i);
444 hmap_insert(&txn_row->table->indexes[i], node, node->hash);
445 }
446 }
447
448 ovsdb_txn_row_prefree(txn_row);
449 if (txn_row->new) {
450 txn_row->new->n_refs = txn_row->n_refs;
451 }
452 ovsdb_row_destroy(txn_row->old);
453 free(txn_row);
454
455 return NULL;
456 }
457
458 static struct ovsdb_error *
459 ovsdb_txn_update_weak_refs(struct ovsdb_txn *txn OVS_UNUSED,
460 struct ovsdb_txn_row *txn_row)
461 {
462 struct ovsdb_weak_ref *weak, *next;
463
464 /* Remove the weak references originating in the old version of the row. */
465 if (txn_row->old) {
466 LIST_FOR_EACH_SAFE (weak, next, src_node, &txn_row->old->src_refs) {
467 ovs_list_remove(&weak->src_node);
468 ovs_list_remove(&weak->dst_node);
469 free(weak);
470 }
471 }
472
473 /* Although the originating rows have the responsibility of updating the
474 * weak references in the dst, it is possible that some source rows aren't
475 * part of the transaction. In that situation this row needs to move the
476 * list of incoming weak references from the old row into the new one.
477 */
478 if (txn_row->old && txn_row->new) {
479 /* Move the incoming weak references from old to new. */
480 ovs_list_push_back_all(&txn_row->new->dst_refs,
481 &txn_row->old->dst_refs);
482 }
483
484 /* Insert the weak references originating in the new version of the row. */
485 struct ovsdb_row *dst_row;
486 if (txn_row->new) {
487 LIST_FOR_EACH (weak, src_node, &txn_row->new->src_refs) {
488 /* dst_row MUST exist. */
489 dst_row = CONST_CAST(struct ovsdb_row *,
490 ovsdb_table_get_row(weak->dst_table, &weak->dst));
491 ovs_list_insert(&dst_row->dst_refs, &weak->dst_node);
492 }
493 }
494
495 return NULL;
496 }
497
498 static void
499 add_weak_ref(const struct ovsdb_row *src_, const struct ovsdb_row *dst_)
500 {
501 struct ovsdb_row *src = CONST_CAST(struct ovsdb_row *, src_);
502 struct ovsdb_row *dst = CONST_CAST(struct ovsdb_row *, dst_);
503 struct ovsdb_weak_ref *weak;
504
505 if (src == dst) {
506 return;
507 }
508
509 if (!ovs_list_is_empty(&dst->dst_refs)) {
510 /* Omit duplicates. */
511 weak = CONTAINER_OF(ovs_list_back(&dst->dst_refs),
512 struct ovsdb_weak_ref, dst_node);
513 if (weak->src == src) {
514 return;
515 }
516 }
517
518 weak = xmalloc(sizeof *weak);
519 weak->src = src;
520 weak->dst_table = dst->table;
521 weak->dst = *ovsdb_row_get_uuid(dst);
522 /* The dst_refs list is updated at commit time. */
523 ovs_list_init(&weak->dst_node);
524 ovs_list_push_back(&src->src_refs, &weak->src_node);
525 }
526
527 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
528 assess_weak_refs(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
529 {
530 struct ovsdb_table *table;
531 struct shash_node *node;
532
533 if (txn_row->old && !txn_row->new) {
534 /* Mark rows that have weak references to 'txn_row' as modified, so
535 * that their weak references will get reassessed. */
536 struct ovsdb_weak_ref *weak, *next;
537
538 LIST_FOR_EACH_SAFE (weak, next, dst_node, &txn_row->old->dst_refs) {
539 if (!weak->src->txn_row) {
540 ovsdb_txn_row_modify(txn, weak->src);
541 }
542 }
543 }
544
545 if (!txn_row->new) {
546 /* We don't have to do anything about references that originate at
547 * 'txn_row', because ovsdb_row_destroy() will remove those weak
548 * references. */
549 return NULL;
550 }
551
552 table = txn_row->table;
553 SHASH_FOR_EACH (node, &table->schema->columns) {
554 const struct ovsdb_column *column = node->data;
555 struct ovsdb_datum *datum = &txn_row->new->fields[column->index];
556 unsigned int orig_n, i;
557 bool zero = false;
558
559 orig_n = datum->n;
560
561 if (ovsdb_base_type_is_weak_ref(&column->type.key)) {
562 for (i = 0; i < datum->n; ) {
563 const struct ovsdb_row *row;
564
565 row = ovsdb_table_get_row(column->type.key.uuid.refTable,
566 &datum->keys[i].uuid);
567 if (row) {
568 add_weak_ref(txn_row->new, row);
569 i++;
570 } else {
571 if (uuid_is_zero(&datum->keys[i].uuid)) {
572 zero = true;
573 }
574 ovsdb_datum_remove_unsafe(datum, i, &column->type);
575 }
576 }
577 }
578
579 if (ovsdb_base_type_is_weak_ref(&column->type.value)) {
580 for (i = 0; i < datum->n; ) {
581 const struct ovsdb_row *row;
582
583 row = ovsdb_table_get_row(column->type.value.uuid.refTable,
584 &datum->values[i].uuid);
585 if (row) {
586 add_weak_ref(txn_row->new, row);
587 i++;
588 } else {
589 if (uuid_is_zero(&datum->values[i].uuid)) {
590 zero = true;
591 }
592 ovsdb_datum_remove_unsafe(datum, i, &column->type);
593 }
594 }
595 }
596
597 if (datum->n != orig_n) {
598 bitmap_set1(txn_row->changed, column->index);
599 ovsdb_datum_sort_assert(datum, column->type.key.type);
600 if (datum->n < column->type.n_min) {
601 const struct uuid *row_uuid = ovsdb_row_get_uuid(txn_row->new);
602 if (zero && !txn_row->old) {
603 return ovsdb_error(
604 "constraint violation",
605 "Weak reference column \"%s\" in \"%s\" row "UUID_FMT
606 " (inserted within this transaction) contained "
607 "all-zeros UUID (probably as the default value for "
608 "this column) but deleting this value caused a "
609 "constraint volation because this column is not "
610 "allowed to be empty.", column->name,
611 table->schema->name, UUID_ARGS(row_uuid));
612 } else {
613 return ovsdb_error(
614 "constraint violation",
615 "Deletion of %u weak reference(s) to deleted (or "
616 "never-existing) rows from column \"%s\" in \"%s\" "
617 "row "UUID_FMT" %scaused this column to become empty, "
618 "but constraints on this column disallow an "
619 "empty column.",
620 orig_n - datum->n, column->name, table->schema->name,
621 UUID_ARGS(row_uuid),
622 (txn_row->old
623 ? ""
624 : "(inserted within this transaction) "));
625 }
626 }
627 }
628 }
629
630 return NULL;
631 }
632
633 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
634 determine_changes(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
635 {
636 struct ovsdb_table *table = txn_row->table;
637
638 if (txn_row->old && txn_row->new) {
639 struct shash_node *node;
640 bool changed = false;
641
642 SHASH_FOR_EACH (node, &table->schema->columns) {
643 const struct ovsdb_column *column = node->data;
644 const struct ovsdb_type *type = &column->type;
645 unsigned int idx = column->index;
646
647 if (!ovsdb_datum_equals(&txn_row->old->fields[idx],
648 &txn_row->new->fields[idx],
649 type)) {
650 bitmap_set1(txn_row->changed, idx);
651 changed = true;
652 }
653 }
654
655 if (!changed) {
656 /* Nothing actually changed in this row, so drop it. */
657 ovsdb_txn_row_abort(txn, txn_row);
658 }
659 } else {
660 bitmap_set_multiple(txn_row->changed, 0,
661 shash_count(&table->schema->columns), 1);
662 }
663
664 return NULL;
665 }
666
667 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
668 check_max_rows(struct ovsdb_txn *txn)
669 {
670 struct ovsdb_txn_table *t;
671
672 LIST_FOR_EACH (t, node, &txn->txn_tables) {
673 size_t n_rows = hmap_count(&t->table->rows);
674 unsigned int max_rows = t->table->schema->max_rows;
675
676 if (n_rows > max_rows) {
677 return ovsdb_error("constraint violation",
678 "transaction causes \"%s\" table to contain "
679 "%"PRIuSIZE" rows, greater than the schema-defined "
680 "limit of %u row(s)",
681 t->table->schema->name, n_rows, max_rows);
682 }
683 }
684
685 return NULL;
686 }
687
688 static struct ovsdb_row *
689 ovsdb_index_search(struct hmap *index, struct ovsdb_row *row, size_t i,
690 uint32_t hash)
691 {
692 const struct ovsdb_table *table = row->table;
693 const struct ovsdb_column_set *columns = &table->schema->indexes[i];
694 struct hmap_node *node;
695
696 for (node = hmap_first_with_hash(index, hash); node;
697 node = hmap_next_with_hash(node)) {
698 struct ovsdb_row *irow = ovsdb_row_from_index_node(node, table, i);
699 if (ovsdb_row_equal_columns(row, irow, columns)) {
700 return irow;
701 }
702 }
703
704 return NULL;
705 }
706
707 static void
708 duplicate_index_row__(const struct ovsdb_column_set *index,
709 const struct ovsdb_row *row,
710 const char *title,
711 struct ds *out)
712 {
713 size_t n_columns = shash_count(&row->table->schema->columns);
714
715 ds_put_format(out, "%s row, with UUID "UUID_FMT", ",
716 title, UUID_ARGS(ovsdb_row_get_uuid(row)));
717 if (!row->txn_row
718 || bitmap_scan(row->txn_row->changed, 1, 0, n_columns) == n_columns) {
719 ds_put_cstr(out, "existed in the database before this "
720 "transaction and was not modified by the transaction.");
721 } else if (!row->txn_row->old) {
722 ds_put_cstr(out, "was inserted by this transaction.");
723 } else if (ovsdb_row_equal_columns(row->txn_row->old,
724 row->txn_row->new, index)) {
725 ds_put_cstr(out, "existed in the database before this "
726 "transaction, which modified some of the row's columns "
727 "but not any columns in this index.");
728 } else {
729 ds_put_cstr(out, "had the following index values before the "
730 "transaction: ");
731 ovsdb_row_columns_to_string(row->txn_row->old, index, out);
732 ds_put_char(out, '.');
733 }
734 }
735
736 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
737 duplicate_index_row(const struct ovsdb_column_set *index,
738 const struct ovsdb_row *a,
739 const struct ovsdb_row *b)
740 {
741 struct ovsdb_column_set all_columns;
742 struct ovsdb_error *error;
743 char *index_s;
744 struct ds s;
745
746 /* Put 'a' and 'b' in a predictable order to make error messages
747 * reproducible for testing. */
748 ovsdb_column_set_init(&all_columns);
749 ovsdb_column_set_add_all(&all_columns, a->table);
750 if (ovsdb_row_compare_columns_3way(a, b, &all_columns) < 0) {
751 const struct ovsdb_row *tmp = a;
752 a = b;
753 b = tmp;
754 }
755 ovsdb_column_set_destroy(&all_columns);
756
757 index_s = ovsdb_column_set_to_string(index);
758
759 ds_init(&s);
760 ds_put_format(&s, "Transaction causes multiple rows in \"%s\" table to "
761 "have identical values (", a->table->schema->name);
762 ovsdb_row_columns_to_string(a, index, &s);
763 ds_put_format(&s, ") for index on %s. ", index_s);
764 duplicate_index_row__(index, a, "First", &s);
765 ds_put_cstr(&s, " ");
766 duplicate_index_row__(index, b, "Second", &s);
767
768 free(index_s);
769
770 error = ovsdb_error("constraint violation", "%s", ds_cstr(&s));
771 ds_destroy(&s);
772 return error;
773 }
774
775 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
776 check_index_uniqueness(struct ovsdb_txn *txn OVS_UNUSED,
777 struct ovsdb_txn_row *txn_row)
778 {
779 /* Skip rows that are being deleted since they can't violate uniqueness. */
780 struct ovsdb_row *row = txn_row->new;
781 if (!row) {
782 return NULL;
783 }
784
785 struct ovsdb_txn_table *txn_table = txn_row->table->txn_table;
786 struct ovsdb_table *table = txn_row->table;
787 for (size_t i = 0; i < table->schema->n_indexes; i++) {
788 const struct ovsdb_column_set *index = &table->schema->indexes[i];
789 uint32_t hash = ovsdb_row_hash_columns(row, index, 0);
790
791 /* Check whether the row has a match in the temporary hash table that
792 * we're building. If we add two rows with the same index data, then
793 * there's a duplicate within the rows added or modified in this
794 * transaction.*/
795 struct ovsdb_row *irow
796 = ovsdb_index_search(&txn_table->txn_indexes[i], row, i, hash);
797 if (irow) {
798 return duplicate_index_row(index, irow, row);
799 }
800
801 /* Also check whether the row has a match in the table's real index
802 * (which won't be updated until transaction commit is certain). If
803 * there's a match, and it's for a row that wasn't pulled into the
804 * transaction, then it's a duplicate. (If it is for a row that is
805 * part of the transaction, then the first check has already handled
806 * it.) */
807 irow = ovsdb_index_search(&table->indexes[i], row, i, hash);
808 if (irow && !irow->txn_row) {
809 return duplicate_index_row(index, irow, row);
810 }
811
812 /* Add row to temporary hash table. */
813 hmap_insert(&txn_table->txn_indexes[i],
814 ovsdb_row_get_index_node(row, i), hash);
815 }
816
817 return NULL;
818 }
819
820 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
821 update_version(struct ovsdb_txn *txn OVS_UNUSED, struct ovsdb_txn_row *txn_row)
822 {
823 struct ovsdb_table *table = txn_row->table;
824 size_t n_columns = shash_count(&table->schema->columns);
825
826 if (txn_row->old && txn_row->new
827 && !bitmap_is_all_zeros(txn_row->changed, n_columns)) {
828 bitmap_set1(txn_row->changed, OVSDB_COL_VERSION);
829 uuid_generate(ovsdb_row_get_version_rw(txn_row->new));
830 }
831
832 return NULL;
833 }
834
835 static bool
836 ovsdb_txn_is_empty(const struct ovsdb_txn *txn)
837 {
838 return ovs_list_is_empty(&txn->txn_tables);
839 }
840
841 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
842 ovsdb_txn_precommit(struct ovsdb_txn *txn)
843 {
844 struct ovsdb_error *error;
845
846 /* Figure out what actually changed, and abort early if the transaction
847 * was really a no-op. */
848 error = for_each_txn_row(txn, determine_changes);
849 if (error) {
850 ovsdb_txn_abort(txn);
851 return OVSDB_WRAP_BUG("can't happen", error);
852 }
853 if (ovs_list_is_empty(&txn->txn_tables)) {
854 return NULL;
855 }
856
857 /* Update reference counts and check referential integrity. */
858 error = update_ref_counts(txn);
859 if (error) {
860 return error;
861 }
862
863 /* Delete unreferenced, non-root rows. */
864 error = for_each_txn_row(txn, collect_garbage);
865 if (error) {
866 return OVSDB_WRAP_BUG("can't happen", error);
867 }
868
869 /* Check maximum rows table constraints. */
870 error = check_max_rows(txn);
871 if (error) {
872 return error;
873 }
874
875 /* Check reference counts and remove bad references for "weak" referential
876 * integrity. */
877 error = for_each_txn_row(txn, assess_weak_refs);
878 if (error) {
879 return error;
880 }
881
882 /* Verify that the indexes will still be unique post-transaction. */
883 error = for_each_txn_row(txn, check_index_uniqueness);
884 if (error) {
885 return error;
886 }
887
888 /* Update _version for rows that changed. */
889 error = for_each_txn_row(txn, update_version);
890 if (error) {
891 return OVSDB_WRAP_BUG("can't happen", error);
892 }
893
894 return error;
895 }
896
897 static struct ovsdb_txn*
898 ovsdb_txn_clone(const struct ovsdb_txn *txn)
899 {
900 struct ovsdb_txn *txn_cloned = xzalloc(sizeof *txn_cloned);
901 ovs_list_init(&txn_cloned->txn_tables);
902 txn_cloned->txnid = txn->txnid;
903
904 struct ovsdb_txn_table *t;
905 LIST_FOR_EACH (t, node, &txn->txn_tables) {
906 struct ovsdb_txn_table *t_cloned = xmalloc(sizeof *t_cloned);
907 ovs_list_push_back(&txn_cloned->txn_tables, &t_cloned->node);
908 hmap_init(&t_cloned->txn_rows);
909
910 struct ovsdb_txn_row *r;
911 HMAP_FOR_EACH (r, hmap_node, &t->txn_rows) {
912 size_t n_columns = shash_count(&t->table->schema->columns);
913 struct ovsdb_txn_row *r_cloned =
914 xzalloc(offsetof(struct ovsdb_txn_row, changed)
915 + bitmap_n_bytes(n_columns));
916
917 r_cloned->uuid = r->uuid;
918 r_cloned->table = r->table;
919 r_cloned->old = r->old ? ovsdb_row_clone(r->old) : NULL;
920 r_cloned->new = r->new ? ovsdb_row_clone(r->new) : NULL;
921 memcpy(&r_cloned->changed, &r->changed, bitmap_n_bytes(n_columns));
922 hmap_insert(&t_cloned->txn_rows, &r_cloned->hmap_node,
923 uuid_hash(&r_cloned->uuid));
924 }
925 }
926 return txn_cloned;
927 }
928
929 static void
930 ovsdb_txn_destroy_cloned(struct ovsdb_txn *txn)
931 {
932 ovs_assert(!txn->db);
933 struct ovsdb_txn_table *t, *next_txn_table;
934 LIST_FOR_EACH_SAFE (t, next_txn_table, node, &txn->txn_tables) {
935 struct ovsdb_txn_row *r, *next_txn_row;
936 HMAP_FOR_EACH_SAFE (r, next_txn_row, hmap_node, &t->txn_rows) {
937 if (r->old) {
938 ovsdb_row_destroy(r->old);
939 }
940 if (r->new) {
941 ovsdb_row_destroy(r->new);
942 }
943 hmap_remove(&t->txn_rows, &r->hmap_node);
944 free(r);
945 }
946 hmap_destroy(&t->txn_rows);
947 ovs_list_remove(&t->node);
948 free(t);
949 }
950 free(txn);
951 }
952
953 static void
954 ovsdb_txn_add_to_history(struct ovsdb_txn *txn)
955 {
956 if (txn->db->need_txn_history) {
957 struct ovsdb_txn_history_node *node = xzalloc(sizeof *node);
958 node->txn = ovsdb_txn_clone(txn);
959 ovs_list_push_back(&txn->db->txn_history, &node->node);
960 txn->db->n_txn_history++;
961 }
962 }
963
964 /* Finalize commit. */
965 void
966 ovsdb_txn_complete(struct ovsdb_txn *txn)
967 {
968 if (!ovsdb_txn_is_empty(txn)) {
969
970 txn->db->run_triggers_now = txn->db->run_triggers = true;
971 ovsdb_monitors_commit(txn->db, txn);
972 ovsdb_error_assert(for_each_txn_row(txn, ovsdb_txn_update_weak_refs));
973 ovsdb_error_assert(for_each_txn_row(txn, ovsdb_txn_row_commit));
974 }
975 ovsdb_txn_free(txn);
976 }
977
978 /* Applies 'txn' to the internal representation of the database. This is for
979 * transactions that don't need to be written to storage; probably, they came
980 * from storage. These transactions shouldn't ordinarily fail because storage
981 * should contain only consistent transactions. (One exception is for database
982 * conversion in ovsdb_convert().) */
983 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
984 ovsdb_txn_replay_commit(struct ovsdb_txn *txn)
985 {
986 struct ovsdb_error *error = ovsdb_txn_precommit(txn);
987 if (error) {
988 ovsdb_txn_abort(txn);
989 } else {
990 ovsdb_txn_add_to_history(txn);
991 ovsdb_txn_complete(txn);
992 }
993 return error;
994 }
995
/* Tracks how far a proposed transaction has come.  The state is determined by
 * checking the members in this order:
 *
 *   1. 'error' nonnull: the transaction is complete and failed with that
 *      error.
 *
 *   2. 'write' nonnull: the transaction is waiting for 'write' to complete.
 *
 *   3. 'commit_index' nonzero: the transaction is waiting for 'commit_index'
 *      to be applied to the storage.
 *
 *   4. Otherwise: the transaction is complete and successful. */
struct ovsdb_txn_progress {
    struct ovsdb_error *error;
    struct ovsdb_write *write;
    uint64_t commit_index;

    /* Storage that the transaction was proposed to. */
    struct ovsdb_storage *storage;
};
1013
1014 bool
1015 ovsdb_txn_precheck_prereq(const struct ovsdb *db)
1016 {
1017 const struct uuid *eid = ovsdb_storage_peek_last_eid(db->storage);
1018 if (!eid) {
1019 return true;
1020 }
1021 return uuid_equals(&db->prereq, eid);
1022 }
1023
1024 struct ovsdb_txn_progress *
1025 ovsdb_txn_propose_schema_change(struct ovsdb *db,
1026 const struct json *schema,
1027 const struct json *data)
1028 {
1029 struct ovsdb_txn_progress *progress = xzalloc(sizeof *progress);
1030 progress->storage = db->storage;
1031
1032 struct uuid next;
1033 struct ovsdb_write *write = ovsdb_storage_write_schema_change(
1034 db->storage, schema, data, &db->prereq, &next);
1035 if (!ovsdb_write_is_complete(write)) {
1036 progress->write = write;
1037 } else {
1038 progress->error = ovsdb_error_clone(ovsdb_write_get_error(write));
1039 ovsdb_write_destroy(write);
1040 }
1041 return progress;
1042 }
1043
1044 struct ovsdb_txn_progress *
1045 ovsdb_txn_propose_commit(struct ovsdb_txn *txn, bool durable)
1046 {
1047 struct ovsdb_txn_progress *progress = xzalloc(sizeof *progress);
1048 progress->storage = txn->db->storage;
1049 progress->error = ovsdb_txn_precommit(txn);
1050 if (progress->error) {
1051 return progress;
1052 }
1053
1054 /* Turn the commit into the format used for the storage logs.. */
1055 struct json *txn_json = ovsdb_file_txn_to_json(txn);
1056 if (!txn_json) {
1057 /* Nothing to do, so success. */
1058 return progress;
1059 }
1060 txn_json = ovsdb_file_txn_annotate(txn_json, ovsdb_txn_get_comment(txn));
1061
1062 struct uuid next;
1063 struct ovsdb_write *write = ovsdb_storage_write(
1064 txn->db->storage, txn_json, &txn->db->prereq, &next, durable);
1065 json_destroy(txn_json);
1066 if (!ovsdb_write_is_complete(write)) {
1067 progress->write = write;
1068 } else {
1069 progress->error = ovsdb_error_clone(ovsdb_write_get_error(write));
1070 ovsdb_write_destroy(write);
1071 }
1072 return progress;
1073 }
1074
1075 /* Proposes 'txn' for commitment and then waits for the commit to succeed or
1076 * fail. Returns null if successful, otherwise the error.
1077 *
1078 * **In addition**, this function also completes or aborts the transaction if
1079 * the transaction succeeded or failed, respectively. */
1080 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
1081 ovsdb_txn_propose_commit_block(struct ovsdb_txn *txn, bool durable)
1082 {
1083 struct ovsdb_txn_progress *p = ovsdb_txn_propose_commit(txn, durable);
1084 for (;;) {
1085 ovsdb_storage_run(p->storage);
1086 if (ovsdb_txn_progress_is_complete(p)) {
1087 struct ovsdb_error *error
1088 = ovsdb_error_clone(ovsdb_txn_progress_get_error(p));
1089 ovsdb_txn_progress_destroy(p);
1090
1091 if (error) {
1092 ovsdb_txn_abort(txn);
1093 } else {
1094 ovsdb_txn_complete(txn);
1095 }
1096
1097 return error;
1098 }
1099 ovsdb_storage_wait(p->storage);
1100 poll_block();
1101 }
1102 }
1103
1104 static void
1105 ovsdb_txn_progress_run(struct ovsdb_txn_progress *p)
1106 {
1107 if (p->error) {
1108 return;
1109 }
1110
1111 if (p->write) {
1112 if (!ovsdb_write_is_complete(p->write)) {
1113 return;
1114 }
1115 p->error = ovsdb_error_clone(ovsdb_write_get_error(p->write));
1116 p->commit_index = ovsdb_write_get_commit_index(p->write);
1117 ovsdb_write_destroy(p->write);
1118 p->write = NULL;
1119
1120 if (p->error) {
1121 return;
1122 }
1123 }
1124
1125 if (p->commit_index) {
1126 if (ovsdb_storage_get_applied_index(p->storage) >= p->commit_index) {
1127 p->commit_index = 0;
1128 }
1129 }
1130 }
1131
1132 static bool
1133 ovsdb_txn_progress_is_complete__(const struct ovsdb_txn_progress *p)
1134 {
1135 return p->error || (!p->write && !p->commit_index);
1136 }
1137
1138 bool
1139 ovsdb_txn_progress_is_complete(const struct ovsdb_txn_progress *p)
1140 {
1141 ovsdb_txn_progress_run(CONST_CAST(struct ovsdb_txn_progress *, p));
1142 return ovsdb_txn_progress_is_complete__(p);
1143 }
1144
1145 const struct ovsdb_error *
1146 ovsdb_txn_progress_get_error(const struct ovsdb_txn_progress *p)
1147 {
1148 ovs_assert(ovsdb_txn_progress_is_complete__(p));
1149 return p->error;
1150 }
1151
1152 void
1153 ovsdb_txn_progress_destroy(struct ovsdb_txn_progress *p)
1154 {
1155 if (p) {
1156 ovsdb_error_destroy(p->error);
1157 ovsdb_write_destroy(p->write);
1158 free(p);
1159 }
1160 }
1161
1162 void
1163 ovsdb_txn_for_each_change(const struct ovsdb_txn *txn,
1164 ovsdb_txn_row_cb_func *cb, void *aux)
1165 {
1166 struct ovsdb_txn_table *t;
1167 struct ovsdb_txn_row *r;
1168
1169 LIST_FOR_EACH (t, node, &txn->txn_tables) {
1170 HMAP_FOR_EACH (r, hmap_node, &t->txn_rows) {
1171 if ((r->old || r->new) && !cb(r->old, r->new, r->changed, aux)) {
1172 break;
1173 }
1174 }
1175 }
1176 }
1177
1178 static struct ovsdb_txn_table *
1179 ovsdb_txn_create_txn_table(struct ovsdb_txn *txn, struct ovsdb_table *table)
1180 {
1181 if (!table->txn_table) {
1182 struct ovsdb_txn_table *txn_table;
1183 size_t i;
1184
1185 table->txn_table = txn_table = xmalloc(sizeof *table->txn_table);
1186 txn_table->table = table;
1187 hmap_init(&txn_table->txn_rows);
1188 txn_table->serial = serial - 1;
1189 txn_table->txn_indexes = xmalloc(table->schema->n_indexes
1190 * sizeof *txn_table->txn_indexes);
1191 for (i = 0; i < table->schema->n_indexes; i++) {
1192 hmap_init(&txn_table->txn_indexes[i]);
1193 }
1194 ovs_list_push_back(&txn->txn_tables, &txn_table->node);
1195 }
1196 return table->txn_table;
1197 }
1198
1199 static struct ovsdb_txn_row *
1200 ovsdb_txn_row_create(struct ovsdb_txn *txn, struct ovsdb_table *table,
1201 const struct ovsdb_row *old_, struct ovsdb_row *new)
1202 {
1203 const struct ovsdb_row *row = old_ ? old_ : new;
1204 struct ovsdb_row *old = CONST_CAST(struct ovsdb_row *, old_);
1205 size_t n_columns = shash_count(&table->schema->columns);
1206 struct ovsdb_txn_table *txn_table;
1207 struct ovsdb_txn_row *txn_row;
1208
1209 txn_row = xzalloc(offsetof(struct ovsdb_txn_row, changed)
1210 + bitmap_n_bytes(n_columns));
1211 txn_row->uuid = *ovsdb_row_get_uuid(row);
1212 txn_row->table = row->table;
1213 txn_row->old = old;
1214 txn_row->new = new;
1215 txn_row->n_refs = old ? old->n_refs : 0;
1216 txn_row->serial = serial - 1;
1217
1218 if (old) {
1219 old->txn_row = txn_row;
1220 }
1221 if (new) {
1222 new->txn_row = txn_row;
1223 }
1224
1225 txn_table = ovsdb_txn_create_txn_table(txn, table);
1226 hmap_insert(&txn_table->txn_rows, &txn_row->hmap_node,
1227 ovsdb_row_hash(old ? old : new));
1228
1229 return txn_row;
1230 }
1231
1232 struct ovsdb_row *
1233 ovsdb_txn_row_modify(struct ovsdb_txn *txn, const struct ovsdb_row *ro_row_)
1234 {
1235 struct ovsdb_row *ro_row = CONST_CAST(struct ovsdb_row *, ro_row_);
1236
1237 if (ro_row->txn_row) {
1238 ovs_assert(ro_row == ro_row->txn_row->new);
1239 return ro_row;
1240 } else {
1241 struct ovsdb_table *table = ro_row->table;
1242 struct ovsdb_row *rw_row;
1243
1244 rw_row = ovsdb_row_clone(ro_row);
1245 rw_row->n_refs = ro_row->n_refs;
1246 ovsdb_txn_row_create(txn, table, ro_row, rw_row);
1247 hmap_replace(&table->rows, &ro_row->hmap_node, &rw_row->hmap_node);
1248
1249 return rw_row;
1250 }
1251 }
1252
1253 void
1254 ovsdb_txn_row_insert(struct ovsdb_txn *txn, struct ovsdb_row *row)
1255 {
1256 uint32_t hash = ovsdb_row_hash(row);
1257 struct ovsdb_table *table = row->table;
1258
1259 uuid_generate(ovsdb_row_get_version_rw(row));
1260
1261 ovsdb_txn_row_create(txn, table, NULL, row);
1262 hmap_insert(&table->rows, &row->hmap_node, hash);
1263 }
1264
1265 /* 'row' must be assumed destroyed upon return; the caller must not reference
1266 * it again. */
1267 void
1268 ovsdb_txn_row_delete(struct ovsdb_txn *txn, const struct ovsdb_row *row_)
1269 {
1270 struct ovsdb_row *row = CONST_CAST(struct ovsdb_row *, row_);
1271 struct ovsdb_table *table = row->table;
1272 struct ovsdb_txn_row *txn_row = row->txn_row;
1273
1274 hmap_remove(&table->rows, &row->hmap_node);
1275
1276 if (!txn_row) {
1277 ovsdb_txn_row_create(txn, table, row, NULL);
1278 } else {
1279 ovs_assert(txn_row->new == row);
1280 if (txn_row->old) {
1281 txn_row->new = NULL;
1282 } else {
1283 hmap_remove(&table->txn_table->txn_rows, &txn_row->hmap_node);
1284 free(txn_row);
1285 }
1286 ovsdb_row_destroy(row);
1287 }
1288 }
1289
/* Returns true if 'row_uuid' may be used as the UUID for a newly created row
 * in 'table' (that is, that it is unique within 'table'), false otherwise.
 *
 * Two things rule a UUID out:
 *
 *   - A row with 'row_uuid' currently exists in 'table'.
 *
 *   - A row with 'row_uuid' previously existed in this transaction. */
bool
ovsdb_txn_may_create_row(const struct ovsdb_table *table,
                         const struct uuid *row_uuid)
{
    return !ovsdb_table_get_row(table, row_uuid)
           && !find_txn_row(table, row_uuid);
}
1309
1310 void
1311 ovsdb_txn_add_comment(struct ovsdb_txn *txn, const char *s)
1312 {
1313 if (txn->comment.length) {
1314 ds_put_char(&txn->comment, '\n');
1315 }
1316 ds_put_cstr(&txn->comment, s);
1317 }
1318
1319 const char *
1320 ovsdb_txn_get_comment(const struct ovsdb_txn *txn)
1321 {
1322 return txn->comment.length ? ds_cstr_ro(&txn->comment) : NULL;
1323 }
1324 \f
1325 static void
1326 ovsdb_txn_row_prefree(struct ovsdb_txn_row *txn_row)
1327 {
1328 struct ovsdb_txn_table *txn_table = txn_row->table->txn_table;
1329
1330 txn_table->n_processed--;
1331 hmap_remove(&txn_table->txn_rows, &txn_row->hmap_node);
1332
1333 if (txn_row->old) {
1334 txn_row->old->txn_row = NULL;
1335 }
1336 if (txn_row->new) {
1337 txn_row->new->txn_row = NULL;
1338 }
1339 }
1340
1341 static void
1342 ovsdb_txn_table_destroy(struct ovsdb_txn_table *txn_table)
1343 {
1344 size_t i;
1345
1346 ovs_assert(hmap_is_empty(&txn_table->txn_rows));
1347
1348 for (i = 0; i < txn_table->table->schema->n_indexes; i++) {
1349 hmap_destroy(&txn_table->txn_indexes[i]);
1350 }
1351 free(txn_table->txn_indexes);
1352
1353 txn_table->table->txn_table = NULL;
1354 hmap_destroy(&txn_table->txn_rows);
1355 ovs_list_remove(&txn_table->node);
1356 free(txn_table);
1357 }
1358
1359 /* Calls 'cb' for every txn_row within 'txn'. If 'cb' returns nonnull, this
1360 * aborts the iteration and for_each_txn_row() passes the error up. Otherwise,
1361 * returns a null pointer after iteration is complete.
1362 *
1363 * 'cb' may insert new txn_rows and new txn_tables into 'txn'. It may delete
1364 * the txn_row that it is passed in, or txn_rows in txn_tables other than the
1365 * one passed to 'cb'. It may *not* delete txn_rows other than the one passed
1366 * in within the same txn_table. It may *not* delete any txn_tables. As long
1367 * as these rules are followed, 'cb' will be called exactly once for each
1368 * txn_row in 'txn', even those added by 'cb'.
1369 *
1370 * (Even though 'cb' is not allowed to delete some txn_rows, it can still
1371 * delete any actual row by clearing a txn_row's 'new' member.)
1372 */
1373 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
1374 for_each_txn_row(struct ovsdb_txn *txn,
1375 struct ovsdb_error *(*cb)(struct ovsdb_txn *,
1376 struct ovsdb_txn_row *))
1377 {
1378 bool any_work;
1379
1380 serial++;
1381
1382 do {
1383 struct ovsdb_txn_table *t, *next_txn_table;
1384
1385 any_work = false;
1386 LIST_FOR_EACH_SAFE (t, next_txn_table, node, &txn->txn_tables) {
1387 if (t->serial != serial) {
1388 t->serial = serial;
1389 t->n_processed = 0;
1390 }
1391
1392 while (t->n_processed < hmap_count(&t->txn_rows)) {
1393 struct ovsdb_txn_row *r, *next_txn_row;
1394
1395 HMAP_FOR_EACH_SAFE (r, next_txn_row, hmap_node, &t->txn_rows) {
1396 if (r->serial != serial) {
1397 struct ovsdb_error *error;
1398
1399 r->serial = serial;
1400 t->n_processed++;
1401 any_work = true;
1402
1403 error = cb(txn, r);
1404 if (error) {
1405 return error;
1406 }
1407 }
1408 }
1409 }
1410 if (hmap_is_empty(&t->txn_rows)) {
1411 /* Table is empty. Drop it. */
1412 ovsdb_txn_table_destroy(t);
1413 }
1414 }
1415 } while (any_work);
1416
1417 return NULL;
1418 }
1419
1420 void
1421 ovsdb_txn_history_run(struct ovsdb *db)
1422 {
1423 if (!db->need_txn_history) {
1424 return;
1425 }
1426 /* Remove old histories to limit the size of the history */
1427 while (db->n_txn_history > 100) {
1428 struct ovsdb_txn_history_node *txn_h_node = CONTAINER_OF(
1429 ovs_list_pop_front(&db->txn_history),
1430 struct ovsdb_txn_history_node, node);
1431
1432 ovsdb_txn_destroy_cloned(txn_h_node->txn);
1433 free(txn_h_node);
1434 db->n_txn_history--;
1435 }
1436 }
1437
1438 void
1439 ovsdb_txn_history_init(struct ovsdb *db, bool need_txn_history)
1440 {
1441 db->need_txn_history = need_txn_history;
1442 db->n_txn_history = 0;
1443 ovs_list_init(&db->txn_history);
1444 }
1445
1446 void
1447 ovsdb_txn_history_destroy(struct ovsdb *db)
1448 {
1449
1450 if (!db->need_txn_history) {
1451 return;
1452 }
1453
1454 struct ovsdb_txn_history_node *txn_h_node, *next;
1455 LIST_FOR_EACH_SAFE (txn_h_node, next, node, &db->txn_history) {
1456 ovs_list_remove(&txn_h_node->node);
1457 ovsdb_txn_destroy_cloned(txn_h_node->txn);
1458 free(txn_h_node);
1459 }
1460 db->n_txn_history = 0;
1461 }