]> git.proxmox.com Git - mirror_ovs.git/blob - ovsdb/monitor.c
stream: Allow timeout configuration for open_block.
[mirror_ovs.git] / ovsdb / monitor.c
1 /*
2 * Copyright (c) 2015, 2017 Nicira, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <config.h>
18
19 #include <errno.h>
20
21 #include "bitmap.h"
22 #include "column.h"
23 #include "openvswitch/dynamic-string.h"
24 #include "openvswitch/json.h"
25 #include "jsonrpc.h"
26 #include "ovsdb-error.h"
27 #include "ovsdb-parser.h"
28 #include "ovsdb.h"
29 #include "row.h"
30 #include "condition.h"
31 #include "simap.h"
32 #include "hash.h"
33 #include "table.h"
34 #include "hash.h"
35 #include "timeval.h"
36 #include "transaction.h"
37 #include "jsonrpc-server.h"
38 #include "monitor.h"
39 #include "util.h"
40 #include "openvswitch/vlog.h"
41
42 VLOG_DEFINE_THIS_MODULE(ovsdb_monitor);
43
44 static struct hmap ovsdb_monitors = HMAP_INITIALIZER(&ovsdb_monitors);
45
46 /* Keep state of session's conditions */
47 struct ovsdb_monitor_session_condition {
48 bool conditional; /* True iff every table's condition is true. */
49 struct shash tables; /* Contains
50 * "struct ovsdb_monitor_table_condition *"s. */
51 };
52
53 /* Monitored table session's conditions */
54 struct ovsdb_monitor_table_condition {
55 const struct ovsdb_table *table;
56 struct ovsdb_monitor_table *mt;
57 struct ovsdb_condition old_condition;
58 struct ovsdb_condition new_condition;
59 };
60
61 /* Backend monitor.
62 *
63 * ovsdb_monitor keep track of the ovsdb changes.
64 */
65
66 /* A collection of tables being monitored. */
67 struct ovsdb_monitor {
68 struct ovs_list list_node; /* In struct ovsdb's "monitors" list. */
69 struct shash tables; /* Holds "struct ovsdb_monitor_table"s. */
70 struct ovs_list jsonrpc_monitors; /* Contains "jsonrpc_monitor_node"s. */
71 struct ovsdb *db;
72 uint64_t n_transactions; /* Count number of committed transactions. */
73 struct hmap_node hmap_node; /* Elements within ovsdb_monitors. */
74 struct hmap json_cache; /* Contains "ovsdb_monitor_json_cache_node"s.*/
75 };
76
77 /* A json object of updates between 'from_txn' and 'dbmon->n_transactions'
78 * inclusive. */
79 struct ovsdb_monitor_json_cache_node {
80 struct hmap_node hmap_node; /* Elements in json cache. */
81 enum ovsdb_monitor_version version;
82 uint64_t from_txn;
83 struct json *json; /* Null, or a cloned of json */
84 };
85
86 struct jsonrpc_monitor_node {
87 struct ovs_list node;
88 struct ovsdb_jsonrpc_monitor *jsonrpc_monitor;
89 };
90
91 /* A particular column being monitored. */
92 struct ovsdb_monitor_column {
93 const struct ovsdb_column *column;
94 enum ovsdb_monitor_selection select;
95 bool monitored;
96 };
97
98 /* A row that has changed in a monitored table. */
99 struct ovsdb_monitor_row {
100 struct hmap_node hmap_node; /* In ovsdb_jsonrpc_monitor_table.changes. */
101 struct uuid uuid; /* UUID of row that changed. */
102 struct ovsdb_datum *old; /* Old data, NULL for an inserted row. */
103 struct ovsdb_datum *new; /* New data, NULL for a deleted row. */
104 };
105
106 /* Contains 'struct ovsdb_monitor_row's for rows that have been
107 * updated but not yet flushed to all the jsonrpc connection.
108 *
109 * 'n_refs' represent the number of jsonrpc connections that have
110 * not received updates. Generate the update for the last jsonprc
111 * connection will also destroy the whole "struct ovsdb_monitor_changes"
112 * object.
113 *
114 * 'transaction' stores the first update's transaction id.
115 * */
116 struct ovsdb_monitor_changes {
117 struct hmap_node hmap_node; /* Element in ovsdb_monitor_tables' changes
118 hmap. */
119 struct ovsdb_monitor_table *mt;
120 struct hmap rows;
121 int n_refs;
122 uint64_t transaction;
123 };
124
125 /* A particular table being monitored. */
126 struct ovsdb_monitor_table {
127 const struct ovsdb_table *table;
128
129 /* This is the union (bitwise-OR) of the 'select' values in all of the
130 * members of 'columns' below. */
131 enum ovsdb_monitor_selection select;
132
133 /* Columns being monitored. */
134 struct ovsdb_monitor_column *columns;
135 size_t n_columns;
136 size_t n_monitored_columns;
137 size_t allocated_columns;
138
139 /* Columns in ovsdb_monitor_row have different indexes then in
140 * ovsdb_row. This field maps between column->index to the index in the
141 * ovsdb_monitor_row. It is used for condition evaluation. */
142 unsigned int *columns_index_map;
143
144 /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */
145 struct hmap changes;
146 };
147
148 enum ovsdb_monitor_row_type {
149 OVSDB_ROW,
150 OVSDB_MONITOR_ROW
151 };
152
153 typedef struct json *
154 (*compose_row_update_cb_func)
155 (const struct ovsdb_monitor_table *mt,
156 const struct ovsdb_monitor_session_condition * condition,
157 enum ovsdb_monitor_row_type row_type,
158 const void *,
159 bool initial, unsigned long int *changed);
160
161 static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
162 static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes(
163 struct ovsdb_monitor_table *mt, uint64_t next_txn);
164 static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes(
165 struct ovsdb_monitor_table *mt, uint64_t unflushed);
166 static void ovsdb_monitor_changes_destroy(
167 struct ovsdb_monitor_changes *changes);
168 static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
169 uint64_t unflushed);
170
171 static uint32_t
172 json_cache_hash(enum ovsdb_monitor_version version, uint64_t from_txn)
173 {
174 uint32_t hash;
175
176 hash = hash_uint64(version);
177 hash = hash_uint64_basis(from_txn, hash);
178
179 return hash;
180 }
181
182 static struct ovsdb_monitor_json_cache_node *
183 ovsdb_monitor_json_cache_search(const struct ovsdb_monitor *dbmon,
184 enum ovsdb_monitor_version version,
185 uint64_t from_txn)
186 {
187 struct ovsdb_monitor_json_cache_node *node;
188 uint32_t hash = json_cache_hash(version, from_txn);
189
190 HMAP_FOR_EACH_WITH_HASH(node, hmap_node, hash, &dbmon->json_cache) {
191 if (node->from_txn == from_txn && node->version == version) {
192 return node;
193 }
194 }
195
196 return NULL;
197 }
198
199 static void
200 ovsdb_monitor_json_cache_insert(struct ovsdb_monitor *dbmon,
201 enum ovsdb_monitor_version version,
202 uint64_t from_txn, struct json *json)
203 {
204 struct ovsdb_monitor_json_cache_node *node;
205 uint32_t hash = json_cache_hash(version, from_txn);
206
207 node = xmalloc(sizeof *node);
208
209 node->version = version;
210 node->from_txn = from_txn;
211 node->json = json ? json_clone(json) : NULL;
212
213 hmap_insert(&dbmon->json_cache, &node->hmap_node, hash);
214 }
215
216 static void
217 ovsdb_monitor_json_cache_flush(struct ovsdb_monitor *dbmon)
218 {
219 struct ovsdb_monitor_json_cache_node *node;
220
221 HMAP_FOR_EACH_POP(node, hmap_node, &dbmon->json_cache) {
222 json_destroy(node->json);
223 free(node);
224 }
225 }
226
227 static int
228 compare_ovsdb_monitor_column(const void *a_, const void *b_)
229 {
230 const struct ovsdb_monitor_column *a = a_;
231 const struct ovsdb_monitor_column *b = b_;
232
233 /* put all monitored columns at the begining */
234 if (a->monitored != b->monitored) {
235 return a->monitored ? -1 : 1;
236 }
237
238 return a->column < b->column ? -1 : a->column > b->column;
239 }
240
241 /* Finds and returns the ovsdb_monitor_row in 'mt->changes->rows' for the
242 * given 'uuid', or NULL if there is no such row. */
243 static struct ovsdb_monitor_row *
244 ovsdb_monitor_changes_row_find(const struct ovsdb_monitor_changes *changes,
245 const struct uuid *uuid)
246 {
247 struct ovsdb_monitor_row *row;
248
249 HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid),
250 &changes->rows) {
251 if (uuid_equals(uuid, &row->uuid)) {
252 return row;
253 }
254 }
255 return NULL;
256 }
257
258 /* Allocates an array of 'mt->n_columns' ovsdb_datums and initializes them as
259 * copies of the data in 'row' drawn from the columns represented by
260 * mt->columns[]. Returns the array.
261 *
262 * If 'row' is NULL, returns NULL. */
263 static struct ovsdb_datum *
264 clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
265 const struct ovsdb_row *row)
266 {
267 struct ovsdb_datum *data;
268 size_t i;
269
270 if (!row) {
271 return NULL;
272 }
273
274 data = xmalloc(mt->n_columns * sizeof *data);
275 for (i = 0; i < mt->n_columns; i++) {
276 const struct ovsdb_column *c = mt->columns[i].column;
277 const struct ovsdb_datum *src = &row->fields[c->index];
278 struct ovsdb_datum *dst = &data[i];
279 const struct ovsdb_type *type = &c->type;
280
281 ovsdb_datum_clone(dst, src, type);
282 }
283 return data;
284 }
285
286 /* Replaces the mt->n_columns ovsdb_datums in row[] by copies of the data from
287 * in 'row' drawn from the columns represented by mt->columns[]. */
288 static void
289 update_monitor_row_data(const struct ovsdb_monitor_table *mt,
290 const struct ovsdb_row *row,
291 struct ovsdb_datum *data)
292 {
293 size_t i;
294
295 for (i = 0; i < mt->n_columns; i++) {
296 const struct ovsdb_column *c = mt->columns[i].column;
297 const struct ovsdb_datum *src = &row->fields[c->index];
298 struct ovsdb_datum *dst = &data[i];
299 const struct ovsdb_type *type = &c->type;
300
301 if (!ovsdb_datum_equals(src, dst, type)) {
302 ovsdb_datum_destroy(dst, type);
303 ovsdb_datum_clone(dst, src, type);
304 }
305 }
306 }
307
308 /* Frees all of the mt->n_columns ovsdb_datums in data[], using the types taken
309 * from mt->columns[], plus 'data' itself. */
310 static void
311 free_monitor_row_data(const struct ovsdb_monitor_table *mt,
312 struct ovsdb_datum *data)
313 {
314 if (data) {
315 size_t i;
316
317 for (i = 0; i < mt->n_columns; i++) {
318 const struct ovsdb_column *c = mt->columns[i].column;
319
320 ovsdb_datum_destroy(&data[i], &c->type);
321 }
322 free(data);
323 }
324 }
325
326 /* Frees 'row', which must have been created from 'mt'. */
327 static void
328 ovsdb_monitor_row_destroy(const struct ovsdb_monitor_table *mt,
329 struct ovsdb_monitor_row *row)
330 {
331 if (row) {
332 free_monitor_row_data(mt, row->old);
333 free_monitor_row_data(mt, row->new);
334 free(row);
335 }
336 }
337
338 static void
339 ovsdb_monitor_columns_sort(struct ovsdb_monitor *dbmon)
340 {
341 int i;
342 struct shash_node *node;
343
344 SHASH_FOR_EACH (node, &dbmon->tables) {
345 struct ovsdb_monitor_table *mt = node->data;
346
347 qsort(mt->columns, mt->n_columns, sizeof *mt->columns,
348 compare_ovsdb_monitor_column);
349 for (i = 0; i < mt->n_columns; i++) {
350 /* re-set index map due to sort */
351 mt->columns_index_map[mt->columns[i].column->index] = i;
352 }
353 }
354 }
355
356 void
357 ovsdb_monitor_add_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
358 struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
359 {
360 struct jsonrpc_monitor_node *jm;
361
362 jm = xzalloc(sizeof *jm);
363 jm->jsonrpc_monitor = jsonrpc_monitor;
364 ovs_list_push_back(&dbmon->jsonrpc_monitors, &jm->node);
365 }
366
367 struct ovsdb_monitor *
368 ovsdb_monitor_create(struct ovsdb *db,
369 struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
370 {
371 struct ovsdb_monitor *dbmon;
372
373 dbmon = xzalloc(sizeof *dbmon);
374
375 ovs_list_push_back(&db->monitors, &dbmon->list_node);
376 ovs_list_init(&dbmon->jsonrpc_monitors);
377 dbmon->db = db;
378 dbmon->n_transactions = 0;
379 shash_init(&dbmon->tables);
380 hmap_node_nullify(&dbmon->hmap_node);
381 hmap_init(&dbmon->json_cache);
382
383 ovsdb_monitor_add_jsonrpc_monitor(dbmon, jsonrpc_monitor);
384 return dbmon;
385 }
386
387 void
388 ovsdb_monitor_add_table(struct ovsdb_monitor *m,
389 const struct ovsdb_table *table)
390 {
391 struct ovsdb_monitor_table *mt;
392 int i;
393 size_t n_columns = shash_count(&table->schema->columns);
394
395 mt = xzalloc(sizeof *mt);
396 mt->table = table;
397 shash_add(&m->tables, table->schema->name, mt);
398 hmap_init(&mt->changes);
399 mt->columns_index_map =
400 xmalloc(sizeof *mt->columns_index_map * n_columns);
401 for (i = 0; i < n_columns; i++) {
402 mt->columns_index_map[i] = -1;
403 }
404 }
405
406 const char *
407 ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon,
408 const struct ovsdb_table *table,
409 const struct ovsdb_column *column,
410 enum ovsdb_monitor_selection select,
411 bool monitored)
412 {
413 struct ovsdb_monitor_table *mt;
414 struct ovsdb_monitor_column *c;
415
416 mt = shash_find_data(&dbmon->tables, table->schema->name);
417
418 /* Check for column duplication. Return duplicated column name. */
419 if (mt->columns_index_map[column->index] != -1) {
420 return column->name;
421 }
422
423 if (mt->n_columns >= mt->allocated_columns) {
424 mt->columns = x2nrealloc(mt->columns, &mt->allocated_columns,
425 sizeof *mt->columns);
426 }
427
428 mt->select |= select;
429 mt->columns_index_map[column->index] = mt->n_columns;
430 c = &mt->columns[mt->n_columns++];
431 c->column = column;
432 c->select = select;
433 c->monitored = monitored;
434 if (monitored) {
435 mt->n_monitored_columns++;
436 }
437
438 return NULL;
439 }
440
441 static void
442 ovsdb_monitor_condition_add_columns(struct ovsdb_monitor *dbmon,
443 const struct ovsdb_table *table,
444 struct ovsdb_condition *condition)
445 {
446 size_t n_columns;
447 int i;
448 const struct ovsdb_column **columns =
449 ovsdb_condition_get_columns(condition, &n_columns);
450
451 for (i = 0; i < n_columns; i++) {
452 ovsdb_monitor_add_column(dbmon, table, columns[i],
453 OJMS_NONE, false);
454 }
455
456 free(columns);
457 }
458
459 /* Bind this session's condition to ovsdb_monitor */
460 void
461 ovsdb_monitor_condition_bind(struct ovsdb_monitor *dbmon,
462 struct ovsdb_monitor_session_condition *cond)
463 {
464 struct shash_node *node;
465
466 SHASH_FOR_EACH(node, &cond->tables) {
467 struct ovsdb_monitor_table_condition *mtc = node->data;
468 struct ovsdb_monitor_table *mt =
469 shash_find_data(&dbmon->tables, mtc->table->schema->name);
470
471 mtc->mt = mt;
472 ovsdb_monitor_condition_add_columns(dbmon, mtc->table,
473 &mtc->new_condition);
474 }
475 }
476
477 bool
478 ovsdb_monitor_table_exists(struct ovsdb_monitor *m,
479 const struct ovsdb_table *table)
480 {
481 return shash_find_data(&m->tables, table->schema->name);
482 }
483
484 static struct ovsdb_monitor_changes *
485 ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
486 uint64_t next_txn)
487 {
488 struct ovsdb_monitor_changes *changes;
489
490 changes = xzalloc(sizeof *changes);
491
492 changes->transaction = next_txn;
493 changes->mt = mt;
494 changes->n_refs = 1;
495 hmap_init(&changes->rows);
496 hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn));
497
498 return changes;
499 };
500
501 static struct ovsdb_monitor_changes *
502 ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
503 uint64_t transaction)
504 {
505 struct ovsdb_monitor_changes *changes;
506 size_t hash = hash_uint64(transaction);
507
508 HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) {
509 if (changes->transaction == transaction) {
510 return changes;
511 }
512 }
513
514 return NULL;
515 }
516
517 /* Stop currently tracking changes to table 'mt' since 'transaction'. */
518 static void
519 ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
520 uint64_t transaction)
521 {
522 struct ovsdb_monitor_changes *changes =
523 ovsdb_monitor_table_find_changes(mt, transaction);
524 if (changes) {
525 if (--changes->n_refs == 0) {
526 hmap_remove(&mt->changes, &changes->hmap_node);
527 ovsdb_monitor_changes_destroy(changes);
528 }
529 }
530 }
531
532 /* Start tracking changes to table 'mt' begins from 'transaction' inclusive.
533 */
534 static void
535 ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
536 uint64_t transaction)
537 {
538 struct ovsdb_monitor_changes *changes;
539
540 changes = ovsdb_monitor_table_find_changes(mt, transaction);
541 if (changes) {
542 changes->n_refs++;
543 } else {
544 ovsdb_monitor_table_add_changes(mt, transaction);
545 }
546 }
547
548 static void
549 ovsdb_monitor_changes_destroy(struct ovsdb_monitor_changes *changes)
550 {
551 struct ovsdb_monitor_row *row, *next;
552
553 HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
554 hmap_remove(&changes->rows, &row->hmap_node);
555 ovsdb_monitor_row_destroy(changes->mt, row);
556 }
557 hmap_destroy(&changes->rows);
558 free(changes);
559 }
560
561 static enum ovsdb_monitor_selection
562 ovsdb_monitor_row_update_type(bool initial, const bool old, const bool new)
563 {
564 return initial ? OJMS_INITIAL
565 : !old ? OJMS_INSERT
566 : !new ? OJMS_DELETE
567 : OJMS_MODIFY;
568 }
569
570 /* Set conditional monitoring mode only if we have non-empty condition in one
571 * of the tables at least */
572 static inline void
573 ovsdb_monitor_session_condition_set_mode(
574 struct ovsdb_monitor_session_condition *cond)
575 {
576 struct shash_node *node;
577
578 SHASH_FOR_EACH (node, &cond->tables) {
579 struct ovsdb_monitor_table_condition *mtc = node->data;
580
581 if (!ovsdb_condition_is_true(&mtc->new_condition)) {
582 cond->conditional = true;
583 return;
584 }
585 }
586 cond->conditional = false;
587 }
588
589 /* Returnes an empty allocated session's condition state holder */
590 struct ovsdb_monitor_session_condition *
591 ovsdb_monitor_session_condition_create(void)
592 {
593 struct ovsdb_monitor_session_condition *condition =
594 xzalloc(sizeof *condition);
595
596 condition->conditional = false;
597 shash_init(&condition->tables);
598 return condition;
599 }
600
601 void
602 ovsdb_monitor_session_condition_destroy(
603 struct ovsdb_monitor_session_condition *condition)
604 {
605 struct shash_node *node, *next;
606
607 if (!condition) {
608 return;
609 }
610
611 SHASH_FOR_EACH_SAFE (node, next, &condition->tables) {
612 struct ovsdb_monitor_table_condition *mtc = node->data;
613
614 ovsdb_condition_destroy(&mtc->new_condition);
615 ovsdb_condition_destroy(&mtc->old_condition);
616 shash_delete(&condition->tables, node);
617 free(mtc);
618 }
619 shash_destroy(&condition->tables);
620 free(condition);
621 }
622
623 struct ovsdb_error *
624 ovsdb_monitor_table_condition_create(
625 struct ovsdb_monitor_session_condition *condition,
626 const struct ovsdb_table *table,
627 const struct json *json_cnd)
628 {
629 struct ovsdb_monitor_table_condition *mtc;
630 struct ovsdb_error *error;
631
632 mtc = xzalloc(sizeof *mtc);
633 mtc->table = table;
634 ovsdb_condition_init(&mtc->old_condition);
635 ovsdb_condition_init(&mtc->new_condition);
636
637 if (json_cnd) {
638 error = ovsdb_condition_from_json(table->schema,
639 json_cnd,
640 NULL,
641 &mtc->old_condition);
642 if (error) {
643 free(mtc);
644 return error;
645 }
646 }
647
648 shash_add(&condition->tables, table->schema->name, mtc);
649 /* On session startup old == new condition */
650 ovsdb_condition_clone(&mtc->new_condition, &mtc->old_condition);
651 ovsdb_monitor_session_condition_set_mode(condition);
652
653 return NULL;
654 }
655
656 static bool
657 ovsdb_monitor_get_table_conditions(
658 const struct ovsdb_monitor_table *mt,
659 const struct ovsdb_monitor_session_condition *condition,
660 struct ovsdb_condition **old_condition,
661 struct ovsdb_condition **new_condition)
662 {
663 if (!condition) {
664 return false;
665 }
666
667 struct ovsdb_monitor_table_condition *mtc =
668 shash_find_data(&condition->tables, mt->table->schema->name);
669
670 if (!mtc) {
671 return false;
672 }
673 *old_condition = &mtc->old_condition;
674 *new_condition = &mtc->new_condition;
675
676 return true;
677 }
678
679 struct ovsdb_error *
680 ovsdb_monitor_table_condition_update(
681 struct ovsdb_monitor *dbmon,
682 struct ovsdb_monitor_session_condition *condition,
683 const struct ovsdb_table *table,
684 const struct json *cond_json)
685 {
686 if (!condition) {
687 return NULL;
688 }
689
690 struct ovsdb_monitor_table_condition *mtc =
691 shash_find_data(&condition->tables, table->schema->name);
692 struct ovsdb_error *error;
693 struct ovsdb_condition cond = OVSDB_CONDITION_INITIALIZER(&cond);
694
695 error = ovsdb_condition_from_json(table->schema, cond_json,
696 NULL, &cond);
697 if (error) {
698 return error;
699 }
700 ovsdb_condition_destroy(&mtc->new_condition);
701 ovsdb_condition_clone(&mtc->new_condition, &cond);
702 ovsdb_condition_destroy(&cond);
703 ovsdb_monitor_condition_add_columns(dbmon,
704 table,
705 &mtc->new_condition);
706
707 return NULL;
708 }
709
710 static void
711 ovsdb_monitor_table_condition_updated(struct ovsdb_monitor_table *mt,
712 struct ovsdb_monitor_session_condition *condition)
713 {
714 struct ovsdb_monitor_table_condition *mtc =
715 shash_find_data(&condition->tables, mt->table->schema->name);
716
717 if (mtc) {
718 /* If conditional monitoring - set old condition to new condition */
719 if (ovsdb_condition_cmp_3way(&mtc->old_condition,
720 &mtc->new_condition)) {
721 ovsdb_condition_destroy(&mtc->old_condition);
722 ovsdb_condition_clone(&mtc->old_condition, &mtc->new_condition);
723 ovsdb_monitor_session_condition_set_mode(condition);
724 }
725 }
726 }
727
728 static enum ovsdb_monitor_selection
729 ovsdb_monitor_row_update_type_condition(
730 const struct ovsdb_monitor_table *mt,
731 const struct ovsdb_monitor_session_condition *condition,
732 bool initial,
733 enum ovsdb_monitor_row_type row_type,
734 const struct ovsdb_datum *old,
735 const struct ovsdb_datum *new)
736 {
737 struct ovsdb_condition *old_condition, *new_condition;
738 enum ovsdb_monitor_selection type =
739 ovsdb_monitor_row_update_type(initial, old, new);
740
741 if (ovsdb_monitor_get_table_conditions(mt,
742 condition,
743 &old_condition,
744 &new_condition)) {
745 bool old_cond = !old ? false
746 : ovsdb_condition_empty_or_match_any(old,
747 old_condition,
748 row_type == OVSDB_MONITOR_ROW ?
749 mt->columns_index_map :
750 NULL);
751 bool new_cond = !new ? false
752 : ovsdb_condition_empty_or_match_any(new,
753 new_condition,
754 row_type == OVSDB_MONITOR_ROW ?
755 mt->columns_index_map :
756 NULL);
757
758 if (!old_cond && !new_cond) {
759 type = OJMS_NONE;
760 }
761
762 switch (type) {
763 case OJMS_INITIAL:
764 case OJMS_INSERT:
765 if (!new_cond) {
766 type = OJMS_NONE;
767 }
768 break;
769 case OJMS_MODIFY:
770 type = !old_cond ? OJMS_INSERT : !new_cond
771 ? OJMS_DELETE : OJMS_MODIFY;
772 break;
773 case OJMS_DELETE:
774 if (!old_cond) {
775 type = OJMS_NONE;
776 }
777 break;
778 case OJMS_NONE:
779 break;
780 }
781 }
782 return type;
783 }
784
785 static bool
786 ovsdb_monitor_row_skip_update(const struct ovsdb_monitor_table *mt,
787 enum ovsdb_monitor_row_type row_type,
788 const struct ovsdb_datum *old,
789 const struct ovsdb_datum *new,
790 enum ovsdb_monitor_selection type,
791 unsigned long int *changed)
792 {
793 if (!(mt->select & type)) {
794 return true;
795 }
796
797 if (type == OJMS_MODIFY) {
798 size_t i, n_changes;
799
800 n_changes = 0;
801 memset(changed, 0, bitmap_n_bytes(mt->n_columns));
802 for (i = 0; i < mt->n_columns; i++) {
803 const struct ovsdb_column *c = mt->columns[i].column;
804 size_t index = row_type == OVSDB_ROW ? c->index : i;
805 if (!ovsdb_datum_equals(&old[index], &new[index], &c->type)) {
806 bitmap_set1(changed, i);
807 n_changes++;
808 }
809 }
810 if (!n_changes) {
811 /* No actual changes: presumably a row changed and then
812 * changed back later. */
813 return true;
814 }
815 }
816
817 return false;
818 }
819
820 /* Returns JSON for a <row-update> (as described in RFC 7047) for 'row' within
821 * 'mt', or NULL if no row update should be sent.
822 *
823 * The caller should specify 'initial' as true if the returned JSON is going to
824 * be used as part of the initial reply to a "monitor" request, false if it is
825 * going to be used as part of an "update" notification.
826 *
827 * 'changed' must be a scratch buffer for internal use that is at least
828 * bitmap_n_bytes(mt->n_columns) bytes long. */
829 static struct json *
830 ovsdb_monitor_compose_row_update(
831 const struct ovsdb_monitor_table *mt,
832 const struct ovsdb_monitor_session_condition *condition OVS_UNUSED,
833 enum ovsdb_monitor_row_type row_type OVS_UNUSED,
834 const void *_row,
835 bool initial, unsigned long int *changed)
836 {
837 const struct ovsdb_monitor_row *row = _row;
838 enum ovsdb_monitor_selection type;
839 struct json *old_json, *new_json;
840 struct json *row_json;
841 size_t i;
842
843 ovs_assert(row_type == OVSDB_MONITOR_ROW);
844 type = ovsdb_monitor_row_update_type(initial, row->old, row->new);
845 if (ovsdb_monitor_row_skip_update(mt, row_type, row->old,
846 row->new, type, changed)) {
847 return NULL;
848 }
849
850 row_json = json_object_create();
851 old_json = new_json = NULL;
852 if (type & (OJMS_DELETE | OJMS_MODIFY)) {
853 old_json = json_object_create();
854 json_object_put(row_json, "old", old_json);
855 }
856 if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
857 new_json = json_object_create();
858 json_object_put(row_json, "new", new_json);
859 }
860 for (i = 0; i < mt->n_monitored_columns; i++) {
861 const struct ovsdb_monitor_column *c = &mt->columns[i];
862
863 if (!c->monitored || !(type & c->select)) {
864 /* We don't care about this type of change for this
865 * particular column (but we will care about it for some
866 * other column). */
867 continue;
868 }
869
870 if ((type == OJMS_MODIFY && bitmap_is_set(changed, i))
871 || type == OJMS_DELETE) {
872 json_object_put(old_json, c->column->name,
873 ovsdb_datum_to_json(&row->old[i],
874 &c->column->type));
875 }
876 if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
877 json_object_put(new_json, c->column->name,
878 ovsdb_datum_to_json(&row->new[i],
879 &c->column->type));
880 }
881 }
882
883 return row_json;
884 }
885
886 /* Returns JSON for a <row-update2> (as described in ovsdb-server(1) mapage)
887 * for 'row' within * 'mt', or NULL if no row update should be sent.
888 *
889 * The caller should specify 'initial' as true if the returned JSON is
890 * going to be used as part of the initial reply to a "monitor_cond" request,
891 * false if it is going to be used as part of an "update2" notification.
892 *
893 * 'changed' must be a scratch buffer for internal use that is at least
894 * bitmap_n_bytes(mt->n_columns) bytes long. */
895 static struct json *
896 ovsdb_monitor_compose_row_update2(
897 const struct ovsdb_monitor_table *mt,
898 const struct ovsdb_monitor_session_condition *condition,
899 enum ovsdb_monitor_row_type row_type,
900 const void *_row,
901 bool initial, unsigned long int *changed)
902 {
903 enum ovsdb_monitor_selection type;
904 struct json *row_update2, *diff_json;
905 const struct ovsdb_datum *old, *new;
906 size_t i;
907
908 if (row_type == OVSDB_MONITOR_ROW) {
909 old = ((const struct ovsdb_monitor_row *)_row)->old;;
910 new = ((const struct ovsdb_monitor_row *)_row)->new;
911 } else {
912 old = new = ((const struct ovsdb_row *)_row)->fields;
913 }
914
915 type = ovsdb_monitor_row_update_type_condition(mt, condition, initial,
916 row_type, old, new);
917 if (ovsdb_monitor_row_skip_update(mt, row_type, old, new, type, changed)) {
918 return NULL;
919 }
920
921 row_update2 = json_object_create();
922 if (type == OJMS_DELETE) {
923 json_object_put(row_update2, "delete", json_null_create());
924 } else {
925 diff_json = json_object_create();
926 const char *op;
927
928 for (i = 0; i < mt->n_monitored_columns; i++) {
929 const struct ovsdb_monitor_column *c = &mt->columns[i];
930 size_t index = row_type == OVSDB_ROW ? c->column->index : i;
931 if (!c->monitored || !(type & c->select)) {
932 /* We don't care about this type of change for this
933 * particular column (but we will care about it for some
934 * other column). */
935 continue;
936 }
937
938 if (type == OJMS_MODIFY) {
939 struct ovsdb_datum diff;
940
941 if (!bitmap_is_set(changed, i)) {
942 continue;
943 }
944
945 ovsdb_datum_diff(&diff ,&old[index], &new[index],
946 &c->column->type);
947 json_object_put(diff_json, c->column->name,
948 ovsdb_datum_to_json(&diff, &c->column->type));
949 ovsdb_datum_destroy(&diff, &c->column->type);
950 } else {
951 if (!ovsdb_datum_is_default(&new[index], &c->column->type)) {
952 json_object_put(diff_json, c->column->name,
953 ovsdb_datum_to_json(&new[index],
954 &c->column->type));
955 }
956 }
957 }
958
959 op = type == OJMS_INITIAL ? "initial"
960 : type == OJMS_MODIFY ? "modify" : "insert";
961 json_object_put(row_update2, op, diff_json);
962 }
963
964 return row_update2;
965 }
966
967 static size_t
968 ovsdb_monitor_max_columns(struct ovsdb_monitor *dbmon)
969 {
970 struct shash_node *node;
971 size_t max_columns = 0;
972
973 SHASH_FOR_EACH (node, &dbmon->tables) {
974 struct ovsdb_monitor_table *mt = node->data;
975
976 max_columns = MAX(max_columns, mt->n_columns);
977 }
978
979 return max_columns;
980 }
981
982 static void
983 ovsdb_monitor_add_json_row(struct json **json, const char *table_name,
984 struct json **table_json, struct json *row_json,
985 const struct uuid *row_uuid)
986 {
987 char uuid[UUID_LEN + 1];
988
989 /* Create JSON object for transaction overall. */
990 if (!*json) {
991 *json = json_object_create();
992 }
993
994 /* Create JSON object for transaction on this table. */
995 if (!*table_json) {
996 *table_json = json_object_create();
997 json_object_put(*json, table_name, *table_json);
998 }
999
1000 /* Add JSON row to JSON table. */
1001 snprintf(uuid, sizeof uuid, UUID_FMT, UUID_ARGS(row_uuid));
1002 json_object_put(*table_json, uuid, row_json);
1003 }
1004
1005 /* Constructs and returns JSON for a <table-updates> object (as described in
1006 * RFC 7047) for all the outstanding changes within 'monitor', starting from
1007 * 'transaction'. */
1008 static struct json*
1009 ovsdb_monitor_compose_update(
1010 struct ovsdb_monitor *dbmon,
1011 bool initial, uint64_t transaction,
1012 const struct ovsdb_monitor_session_condition *condition,
1013 compose_row_update_cb_func row_update)
1014 {
1015 struct shash_node *node;
1016 struct json *json;
1017 size_t max_columns = ovsdb_monitor_max_columns(dbmon);
1018 unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
1019
1020 json = NULL;
1021 SHASH_FOR_EACH (node, &dbmon->tables) {
1022 struct ovsdb_monitor_table *mt = node->data;
1023 struct ovsdb_monitor_row *row, *next;
1024 struct ovsdb_monitor_changes *changes;
1025 struct json *table_json = NULL;
1026
1027 changes = ovsdb_monitor_table_find_changes(mt, transaction);
1028 if (!changes) {
1029 continue;
1030 }
1031
1032 HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
1033 struct json *row_json;
1034 row_json = (*row_update)(mt, condition, OVSDB_MONITOR_ROW, row,
1035 initial, changed);
1036 if (row_json) {
1037 ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
1038 &table_json, row_json,
1039 &row->uuid);
1040 }
1041 }
1042 }
1043 free(changed);
1044
1045 return json;
1046 }
1047
1048 static struct json*
1049 ovsdb_monitor_compose_cond_change_update(
1050 struct ovsdb_monitor *dbmon,
1051 struct ovsdb_monitor_session_condition *condition)
1052 {
1053 struct shash_node *node;
1054 struct json *json = NULL;
1055 size_t max_columns = ovsdb_monitor_max_columns(dbmon);
1056 unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
1057
1058 SHASH_FOR_EACH (node, &dbmon->tables) {
1059 struct ovsdb_monitor_table *mt = node->data;
1060 struct ovsdb_row *row;
1061 struct json *table_json = NULL;
1062 struct ovsdb_condition *old_condition, *new_condition;
1063
1064 if (!ovsdb_monitor_get_table_conditions(mt,
1065 condition,
1066 &old_condition,
1067 &new_condition) ||
1068 !ovsdb_condition_cmp_3way(old_condition, new_condition)) {
1069 /* Nothing to update on this table */
1070 continue;
1071 }
1072
1073 /* Iterate over all rows in table */
1074 HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
1075 struct json *row_json;
1076
1077 row_json = ovsdb_monitor_compose_row_update2(mt, condition,
1078 OVSDB_ROW, row,
1079 false, changed);
1080 if (row_json) {
1081 ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
1082 &table_json, row_json,
1083 ovsdb_row_get_uuid(row));
1084 }
1085 }
1086 ovsdb_monitor_table_condition_updated(mt, condition);
1087 }
1088 free(changed);
1089
1090 return json;
1091 }
1092
1093 /* Returns JSON for a <table-updates> object (as described in RFC 7047)
1094 * for all the outstanding changes within 'monitor' that starts from
1095 * '*unflushed'.
1096 * If cond_updated is true all rows in the db that match conditions will be
1097 * sent.
1098 *
1099 * The caller should specify 'initial' as true if the returned JSON is going to
1100 * be used as part of the initial reply to a "monitor" request, false if it is
1101 * going to be used as part of an "update" notification. */
1102 struct json *
1103 ovsdb_monitor_get_update(
1104 struct ovsdb_monitor *dbmon,
1105 bool initial, bool cond_updated,
1106 uint64_t *unflushed_,
1107 struct ovsdb_monitor_session_condition *condition,
1108 enum ovsdb_monitor_version version)
1109 {
1110 struct ovsdb_monitor_json_cache_node *cache_node = NULL;
1111 struct shash_node *node;
1112 struct json *json;
1113 const uint64_t unflushed = *unflushed_;
1114 const uint64_t next_unflushed = dbmon->n_transactions + 1;
1115
1116 ovs_assert(cond_updated ? unflushed == next_unflushed : true);
1117
1118 /* Return a clone of cached json if one exists. Otherwise,
1119 * generate a new one and add it to the cache. */
1120 if (!condition || (!condition->conditional && !cond_updated)) {
1121 cache_node = ovsdb_monitor_json_cache_search(dbmon, version,
1122 unflushed);
1123 }
1124 if (cache_node) {
1125 json = cache_node->json ? json_clone(cache_node->json) : NULL;
1126 } else {
1127 if (version == OVSDB_MONITOR_V1) {
1128 json =
1129 ovsdb_monitor_compose_update(dbmon, initial, unflushed,
1130 condition,
1131 ovsdb_monitor_compose_row_update);
1132 } else {
1133 ovs_assert(version == OVSDB_MONITOR_V2);
1134 if (!cond_updated) {
1135 json = ovsdb_monitor_compose_update(dbmon, initial, unflushed,
1136 condition,
1137 ovsdb_monitor_compose_row_update2);
1138
1139 if (!condition || !condition->conditional) {
1140 ovsdb_monitor_json_cache_insert(dbmon, version, unflushed,
1141 json);
1142 }
1143 } else {
1144 /* Compose update on whole db due to condition update.
1145 Session must be flushed (change list is empty)*/
1146 json =
1147 ovsdb_monitor_compose_cond_change_update(dbmon, condition);
1148 }
1149 }
1150 }
1151
1152 /* Maintain transaction id of 'changes'. */
1153 SHASH_FOR_EACH (node, &dbmon->tables) {
1154 struct ovsdb_monitor_table *mt = node->data;
1155
1156 ovsdb_monitor_table_untrack_changes(mt, unflushed);
1157 ovsdb_monitor_table_track_changes(mt, next_unflushed);
1158 }
1159 *unflushed_ = next_unflushed;
1160
1161 return json;
1162 }
1163
1164 bool
1165 ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon,
1166 uint64_t next_transaction)
1167 {
1168 ovs_assert(next_transaction <= dbmon->n_transactions + 1);
1169 return (next_transaction <= dbmon->n_transactions);
1170 }
1171
1172 void
1173 ovsdb_monitor_table_add_select(struct ovsdb_monitor *dbmon,
1174 const struct ovsdb_table *table,
1175 enum ovsdb_monitor_selection select)
1176 {
1177 struct ovsdb_monitor_table * mt;
1178
1179 mt = shash_find_data(&dbmon->tables, table->schema->name);
1180 mt->select |= select;
1181 }
1182
1183 /*
1184 * If a row's change type (insert, delete or modify) matches that of
1185 * the monitor, they should be sent to the monitor's clients as updates.
1186 * Of cause, the monitor should also internally update with this change.
1187 *
1188 * When a change type does not require client side update, the monitor
1189 * may still need to keep track of certain changes in order to generate
1190 * correct future updates. For example, the monitor internal state should
1191 * be updated whenever a new row is inserted, in order to generate the
1192 * correct initial state, regardless if a insert change type is being
1193 * monitored.
1194 *
1195 * On the other hand, if a transaction only contains changes to columns
1196 * that are not monitored, this transaction can be safely ignored by the
1197 * monitor.
1198 *
1199 * Thus, the order of the declaration is important:
1200 * 'OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE' always implies
1201 * 'OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE', but not vice versa. */
1202 enum ovsdb_monitor_changes_efficacy {
1203 OVSDB_CHANGES_NO_EFFECT, /* Monitor does not care about this
1204 change. */
1205 OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE, /* Monitor internal updates. */
1206 OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE, /* Client needs to be updated. */
1207 };
1208
1209 struct ovsdb_monitor_aux {
1210 const struct ovsdb_monitor *monitor;
1211 struct ovsdb_monitor_table *mt;
1212 enum ovsdb_monitor_changes_efficacy efficacy;
1213 };
1214
1215 static void
1216 ovsdb_monitor_init_aux(struct ovsdb_monitor_aux *aux,
1217 const struct ovsdb_monitor *m)
1218 {
1219 aux->monitor = m;
1220 aux->mt = NULL;
1221 aux->efficacy = OVSDB_CHANGES_NO_EFFECT;
1222 }
1223
1224 static void
1225 ovsdb_monitor_changes_update(const struct ovsdb_row *old,
1226 const struct ovsdb_row *new,
1227 const struct ovsdb_monitor_table *mt,
1228 struct ovsdb_monitor_changes *changes)
1229 {
1230 const struct uuid *uuid = ovsdb_row_get_uuid(new ? new : old);
1231 struct ovsdb_monitor_row *change;
1232
1233 change = ovsdb_monitor_changes_row_find(changes, uuid);
1234 if (!change) {
1235 change = xzalloc(sizeof *change);
1236 hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid));
1237 change->uuid = *uuid;
1238 change->old = clone_monitor_row_data(mt, old);
1239 change->new = clone_monitor_row_data(mt, new);
1240 } else {
1241 if (new) {
1242 if (!change->new) {
1243 /* Reinsert the row that was just deleted.
1244 *
1245 * This path won't be hit without replication. Whenever OVSDB
1246 * server inserts a new row, It always generates a new UUID
1247 * that is different from the row just deleted.
1248 *
1249 * With replication, this path can be hit in a corner
1250 * case when two OVSDB servers are set up to replicate
1251 * each other. Not that is a useful set up, but can
1252 * happen in practice.
1253 *
1254 * An example of how this path can be hit is documented below.
1255 * The details is not as important to the correctness of the
1256 * logic, but added here to convince ourselves that this path
1257 * can be hit.
1258 *
1259 * Imagine two OVSDB servers that replicates from each
1260 * other. For each replication session, there is a
1261 * corresponding monitor at the other end of the replication
1262 * JSONRPC connection.
1263 *
1264 * The events can lead to a back to back deletion and
1265 * insertion operation of the same row for the monitor of
1266 * the first server are:
1267 *
1268 * 1. A row is inserted in the first OVSDB server.
1269 * 2. The row is then replicated to the remote OVSDB server.
1270 * 3. The row is now deleted by the local OVSDB server. This
1271 * deletion operation is replicated to the local monitor
1272 * of the OVSDB server.
1273 * 4. The monitor now receives the same row, as an insertion,
1274 * from the replication server. Because of
1275 * replication, the row carries the same UUID as the row
1276 * just deleted.
1277 */
1278 change->new = clone_monitor_row_data(mt, new);
1279 } else {
1280 update_monitor_row_data(mt, new, change->new);
1281 }
1282 } else {
1283 free_monitor_row_data(mt, change->new);
1284 change->new = NULL;
1285
1286 if (!change->old) {
1287 /* This row was added then deleted. Forget about it. */
1288 hmap_remove(&changes->rows, &change->hmap_node);
1289 free(change);
1290 }
1291 }
1292 }
1293 }
1294
1295 static bool
1296 ovsdb_monitor_columns_changed(const struct ovsdb_monitor_table *mt,
1297 const unsigned long int *changed)
1298 {
1299 size_t i;
1300
1301 for (i = 0; i < mt->n_columns; i++) {
1302 size_t column_index = mt->columns[i].column->index;
1303
1304 if (bitmap_is_set(changed, column_index)) {
1305 return true;
1306 }
1307 }
1308
1309 return false;
1310 }
1311
1312 /* Return the efficacy of a row's change to a monitor table.
1313 *
1314 * Please see the block comment above 'ovsdb_monitor_changes_efficacy'
1315 * definition form more information. */
1316 static enum ovsdb_monitor_changes_efficacy
1317 ovsdb_monitor_changes_classify(enum ovsdb_monitor_selection type,
1318 const struct ovsdb_monitor_table *mt,
1319 const unsigned long int *changed)
1320 {
1321 if (type == OJMS_MODIFY &&
1322 !ovsdb_monitor_columns_changed(mt, changed)) {
1323 return OVSDB_CHANGES_NO_EFFECT;
1324 }
1325
1326 if (type == OJMS_MODIFY) {
1327 /* Condition might turn a modify operation to insert or delete */
1328 type |= OJMS_INSERT | OJMS_DELETE;
1329 }
1330
1331 return (mt->select & type)
1332 ? OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE
1333 : OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE;
1334 }
1335
1336 static bool
1337 ovsdb_monitor_change_cb(const struct ovsdb_row *old,
1338 const struct ovsdb_row *new,
1339 const unsigned long int *changed,
1340 void *aux_)
1341 {
1342 struct ovsdb_monitor_aux *aux = aux_;
1343 const struct ovsdb_monitor *m = aux->monitor;
1344 struct ovsdb_table *table = new ? new->table : old->table;
1345 struct ovsdb_monitor_table *mt;
1346 struct ovsdb_monitor_changes *changes;
1347
1348 if (!aux->mt || table != aux->mt->table) {
1349 aux->mt = shash_find_data(&m->tables, table->schema->name);
1350 if (!aux->mt) {
1351 /* We don't care about rows in this table at all. Tell the caller
1352 * to skip it. */
1353 return false;
1354 }
1355 }
1356 mt = aux->mt;
1357
1358 enum ovsdb_monitor_selection type =
1359 ovsdb_monitor_row_update_type(false, old, new);
1360 enum ovsdb_monitor_changes_efficacy efficacy =
1361 ovsdb_monitor_changes_classify(type, mt, changed);
1362
1363 HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
1364 if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
1365 ovsdb_monitor_changes_update(old, new, mt, changes);
1366 }
1367 }
1368 if (aux->efficacy < efficacy) {
1369 aux->efficacy = efficacy;
1370 }
1371
1372 return true;
1373 }
1374
1375 void
1376 ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
1377 {
1378 struct shash_node *node;
1379
1380 SHASH_FOR_EACH (node, &dbmon->tables) {
1381 struct ovsdb_monitor_table *mt = node->data;
1382
1383 if (mt->select & OJMS_INITIAL) {
1384 struct ovsdb_row *row;
1385 struct ovsdb_monitor_changes *changes;
1386
1387 changes = ovsdb_monitor_table_find_changes(mt, 0);
1388 if (!changes) {
1389 changes = ovsdb_monitor_table_add_changes(mt, 0);
1390 HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
1391 ovsdb_monitor_changes_update(NULL, row, mt, changes);
1392 }
1393 } else {
1394 changes->n_refs++;
1395 }
1396 }
1397 }
1398 }
1399
1400 void
1401 ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
1402 struct ovsdb_jsonrpc_monitor *jsonrpc_monitor,
1403 uint64_t unflushed)
1404 {
1405 struct jsonrpc_monitor_node *jm;
1406
1407 if (ovs_list_is_empty(&dbmon->jsonrpc_monitors)) {
1408 ovsdb_monitor_destroy(dbmon);
1409 return;
1410 }
1411
1412 /* Find and remove the jsonrpc monitor from the list. */
1413 LIST_FOR_EACH(jm, node, &dbmon->jsonrpc_monitors) {
1414 if (jm->jsonrpc_monitor == jsonrpc_monitor) {
1415 /* Release the tracked changes. */
1416 struct shash_node *node;
1417 SHASH_FOR_EACH (node, &dbmon->tables) {
1418 struct ovsdb_monitor_table *mt = node->data;
1419 ovsdb_monitor_table_untrack_changes(mt, unflushed);
1420 }
1421 ovs_list_remove(&jm->node);
1422 free(jm);
1423
1424 /* Destroy ovsdb monitor if this is the last user. */
1425 if (ovs_list_is_empty(&dbmon->jsonrpc_monitors)) {
1426 ovsdb_monitor_destroy(dbmon);
1427 }
1428
1429 return;
1430 };
1431 }
1432
1433 /* Should never reach here. jsonrpc_monitor should be on the list. */
1434 OVS_NOT_REACHED();
1435 }
1436
1437 static bool
1438 ovsdb_monitor_table_equal(const struct ovsdb_monitor_table *a,
1439 const struct ovsdb_monitor_table *b)
1440 {
1441 size_t i;
1442
1443 ovs_assert(b->n_columns == b->n_monitored_columns);
1444
1445 if ((a->table != b->table) ||
1446 (a->select != b->select) ||
1447 (a->n_monitored_columns != b->n_monitored_columns)) {
1448 return false;
1449 }
1450
1451 /* Compare only monitored columns that must be sorted already */
1452 for (i = 0; i < a->n_monitored_columns; i++) {
1453 if ((a->columns[i].column != b->columns[i].column) ||
1454 (a->columns[i].select != b->columns[i].select)) {
1455 return false;
1456 }
1457 }
1458 return true;
1459 }
1460
1461 static bool
1462 ovsdb_monitor_equal(const struct ovsdb_monitor *a,
1463 const struct ovsdb_monitor *b)
1464 {
1465 struct shash_node *node;
1466
1467 if (shash_count(&a->tables) != shash_count(&b->tables)) {
1468 return false;
1469 }
1470
1471 SHASH_FOR_EACH(node, &a->tables) {
1472 const struct ovsdb_monitor_table *mta = node->data;
1473 const struct ovsdb_monitor_table *mtb;
1474
1475 mtb = shash_find_data(&b->tables, node->name);
1476 if (!mtb) {
1477 return false;
1478 }
1479
1480 if (!ovsdb_monitor_table_equal(mta, mtb)) {
1481 return false;
1482 }
1483 }
1484
1485 return true;
1486 }
1487
1488 static size_t
1489 ovsdb_monitor_hash(const struct ovsdb_monitor *dbmon, size_t basis)
1490 {
1491 const struct shash_node **nodes;
1492 size_t i, j, n;
1493
1494 nodes = shash_sort(&dbmon->tables);
1495 n = shash_count(&dbmon->tables);
1496
1497 for (i = 0; i < n; i++) {
1498 struct ovsdb_monitor_table *mt = nodes[i]->data;
1499
1500 basis = hash_pointer(mt->table, basis);
1501 basis = hash_3words(mt->select, mt->n_columns, basis);
1502
1503 for (j = 0; j < mt->n_columns; j++) {
1504 basis = hash_pointer(mt->columns[j].column, basis);
1505 basis = hash_2words(mt->columns[j].select, basis);
1506 }
1507 }
1508 free(nodes);
1509
1510 return basis;
1511 }
1512
1513 struct ovsdb_monitor *
1514 ovsdb_monitor_add(struct ovsdb_monitor *new_dbmon)
1515 {
1516 struct ovsdb_monitor *dbmon;
1517 size_t hash;
1518
1519 /* New_dbmon should be associated with only one jsonrpc
1520 * connections. */
1521 ovs_assert(ovs_list_is_singleton(&new_dbmon->jsonrpc_monitors));
1522
1523 ovsdb_monitor_columns_sort(new_dbmon);
1524
1525 hash = ovsdb_monitor_hash(new_dbmon, 0);
1526 HMAP_FOR_EACH_WITH_HASH(dbmon, hmap_node, hash, &ovsdb_monitors) {
1527 if (ovsdb_monitor_equal(dbmon, new_dbmon)) {
1528 return dbmon;
1529 }
1530 }
1531
1532 hmap_insert(&ovsdb_monitors, &new_dbmon->hmap_node, hash);
1533 return new_dbmon;
1534 }
1535
1536 static void
1537 ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
1538 {
1539 struct shash_node *node;
1540
1541 ovs_list_remove(&dbmon->list_node);
1542
1543 if (!hmap_node_is_null(&dbmon->hmap_node)) {
1544 hmap_remove(&ovsdb_monitors, &dbmon->hmap_node);
1545 }
1546
1547 ovsdb_monitor_json_cache_flush(dbmon);
1548 hmap_destroy(&dbmon->json_cache);
1549
1550 SHASH_FOR_EACH (node, &dbmon->tables) {
1551 struct ovsdb_monitor_table *mt = node->data;
1552 struct ovsdb_monitor_changes *changes, *next;
1553
1554 HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->changes) {
1555 hmap_remove(&mt->changes, &changes->hmap_node);
1556 ovsdb_monitor_changes_destroy(changes);
1557 }
1558 hmap_destroy(&mt->changes);
1559 free(mt->columns);
1560 free(mt->columns_index_map);
1561 free(mt);
1562 }
1563 shash_destroy(&dbmon->tables);
1564 free(dbmon);
1565 }
1566
1567 static void
1568 ovsdb_monitor_commit(struct ovsdb_monitor *m, const struct ovsdb_txn *txn)
1569 {
1570 struct ovsdb_monitor_aux aux;
1571
1572 ovsdb_monitor_init_aux(&aux, m);
1573 /* Update ovsdb_monitor's transaction number for
1574 * each transaction, before calling ovsdb_monitor_change_cb(). */
1575 m->n_transactions++;
1576 ovsdb_txn_for_each_change(txn, ovsdb_monitor_change_cb, &aux);
1577
1578 switch(aux.efficacy) {
1579 case OVSDB_CHANGES_NO_EFFECT:
1580 /* The transaction is ignored by the monitor.
1581 * Roll back the 'n_transactions' as if the transaction
1582 * has never happened. */
1583 m->n_transactions--;
1584 break;
1585 case OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE:
1586 /* Nothing. */
1587 break;
1588 case OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE:
1589 ovsdb_monitor_json_cache_flush(m);
1590 break;
1591 }
1592 }
1593
1594 void
1595 ovsdb_monitors_commit(struct ovsdb *db, const struct ovsdb_txn *txn)
1596 {
1597 struct ovsdb_monitor *m;
1598
1599 LIST_FOR_EACH (m, list_node, &db->monitors) {
1600 ovsdb_monitor_commit(m, txn);
1601 }
1602 }
1603
1604 void
1605 ovsdb_monitors_remove(struct ovsdb *db)
1606 {
1607 struct ovsdb_monitor *m, *next_m;
1608
1609 LIST_FOR_EACH_SAFE (m, next_m, list_node, &db->monitors) {
1610 struct jsonrpc_monitor_node *jm, *next_jm;
1611
1612 /* Delete all front-end monitors. Removing the last front-end monitor
1613 * will also destroy the corresponding ovsdb_monitor. */
1614 LIST_FOR_EACH_SAFE (jm, next_jm, node, &m->jsonrpc_monitors) {
1615 ovsdb_jsonrpc_monitor_destroy(jm->jsonrpc_monitor, false);
1616 }
1617 }
1618 }
1619
1620 /* Add some memory usage statics for monitors into 'usage', for use with
1621 * memory_report(). */
1622 void
1623 ovsdb_monitor_get_memory_usage(struct simap *usage)
1624 {
1625 struct ovsdb_monitor *dbmon;
1626 simap_put(usage, "monitors", hmap_count(&ovsdb_monitors));
1627
1628 HMAP_FOR_EACH(dbmon, hmap_node, &ovsdb_monitors) {
1629 simap_increase(usage, "json-caches", hmap_count(&dbmon->json_cache));
1630 }
1631 }
1632
1633 void
1634 ovsdb_monitor_prereplace_db(struct ovsdb *db)
1635 {
1636 struct ovsdb_monitor *m, *next_m;
1637
1638 LIST_FOR_EACH_SAFE (m, next_m, list_node, &db->monitors) {
1639 struct jsonrpc_monitor_node *jm, *next_jm;
1640
1641 /* Delete all front-end monitors. Removing the last front-end monitor
1642 * will also destroy the corresponding ovsdb_monitor. */
1643 LIST_FOR_EACH_SAFE (jm, next_jm, node, &m->jsonrpc_monitors) {
1644 ovsdb_jsonrpc_monitor_destroy(jm->jsonrpc_monitor, true);
1645 }
1646 }
1647 }