2 * Copyright (c) 2015 Nicira, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
23 #include "dynamic-string.h"
26 #include "ovsdb-error.h"
27 #include "ovsdb-parser.h"
35 #include "transaction.h"
36 #include "jsonrpc-server.h"
38 #include "openvswitch/vlog.h"
41 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class
;
42 static struct hmap ovsdb_monitors
= HMAP_INITIALIZER(&ovsdb_monitors
);
46 * ovsdb_monitor keeps track of the ovsdb changes.
49 /* A collection of tables being monitored. */
50 struct ovsdb_monitor
{
51 struct ovsdb_replica replica
;
52 struct shash tables
; /* Holds "struct ovsdb_monitor_table"s. */
53 struct ovs_list jsonrpc_monitors
; /* Contains "jsonrpc_monitor_node"s. */
55 uint64_t n_transactions
; /* Count number of committed transactions. */
56 struct hmap_node hmap_node
; /* Elements within ovsdb_monitors. */
57 struct hmap json_cache
; /* Contains "ovsdb_monitor_json_cache_node"s.*/
60 /* A json object of updates between 'from_txn' and 'dbmon->n_transactions'
62 struct ovsdb_monitor_json_cache_node
{
63 struct hmap_node hmap_node
; /* Elements in json cache. */
64 enum ovsdb_monitor_version version
;
66 struct json
*json
; /* Null, or a cloned of json */
69 struct jsonrpc_monitor_node
{
70 struct ovsdb_jsonrpc_monitor
*jsonrpc_monitor
;
74 /* A particular column being monitored. */
75 struct ovsdb_monitor_column
{
76 const struct ovsdb_column
*column
;
77 enum ovsdb_monitor_selection select
;
80 /* A row that has changed in a monitored table. */
81 struct ovsdb_monitor_row
{
82 struct hmap_node hmap_node
; /* In ovsdb_jsonrpc_monitor_table.changes. */
83 struct uuid uuid
; /* UUID of row that changed. */
84 struct ovsdb_datum
*old
; /* Old data, NULL for an inserted row. */
85 struct ovsdb_datum
*new; /* New data, NULL for a deleted row. */
88 /* Contains 'struct ovsdb_monitor_row's for rows that have been
89 * updated but not yet flushed to all the jsonrpc connections.
91 * 'n_refs' represents the number of jsonrpc connections that have
92 * not received updates. Generating the update for the last jsonrpc
93 * connection will also destroy the whole "struct ovsdb_monitor_changes"
96 * 'transaction' stores the first update's transaction id.
98 struct ovsdb_monitor_changes
{
99 struct ovsdb_monitor_table
*mt
;
102 uint64_t transaction
;
103 struct hmap_node hmap_node
; /* Element in ovsdb_monitor_tables' changes
107 /* A particular table being monitored. */
108 struct ovsdb_monitor_table
{
109 const struct ovsdb_table
*table
;
111 /* This is the union (bitwise-OR) of the 'select' values in all of the
112 * members of 'columns' below. */
113 enum ovsdb_monitor_selection select
;
115 /* Columns being monitored. */
116 struct ovsdb_monitor_column
*columns
;
119 /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */
123 typedef struct json
*
124 (*compose_row_update_cb_func
)(const struct ovsdb_monitor_table
*mt
,
125 const struct ovsdb_monitor_row
*row
,
126 bool initial
, unsigned long int *changed
);
128 static void ovsdb_monitor_destroy(struct ovsdb_monitor
*dbmon
);
129 static struct ovsdb_monitor_changes
* ovsdb_monitor_table_add_changes(
130 struct ovsdb_monitor_table
*mt
, uint64_t next_txn
);
131 static struct ovsdb_monitor_changes
*ovsdb_monitor_table_find_changes(
132 struct ovsdb_monitor_table
*mt
, uint64_t unflushed
);
133 static void ovsdb_monitor_changes_destroy(
134 struct ovsdb_monitor_changes
*changes
);
135 static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table
*mt
,
139 json_cache_hash(enum ovsdb_monitor_version version
, uint64_t from_txn
)
143 hash
= hash_uint64(version
);
144 hash
= hash_uint64_basis(from_txn
, hash
);
149 static struct ovsdb_monitor_json_cache_node
*
150 ovsdb_monitor_json_cache_search(const struct ovsdb_monitor
*dbmon
,
151 enum ovsdb_monitor_version version
,
154 struct ovsdb_monitor_json_cache_node
*node
;
155 uint32_t hash
= json_cache_hash(version
, from_txn
);
157 HMAP_FOR_EACH_WITH_HASH(node
, hmap_node
, hash
, &dbmon
->json_cache
) {
158 if (node
->from_txn
== from_txn
&& node
->version
== version
) {
167 ovsdb_monitor_json_cache_insert(struct ovsdb_monitor
*dbmon
,
168 enum ovsdb_monitor_version version
,
169 uint64_t from_txn
, struct json
*json
)
171 struct ovsdb_monitor_json_cache_node
*node
;
172 uint32_t hash
= json_cache_hash(version
, from_txn
);
174 node
= xmalloc(sizeof *node
);
176 node
->version
= version
;
177 node
->from_txn
= from_txn
;
178 node
->json
= json
? json_clone(json
) : NULL
;
180 hmap_insert(&dbmon
->json_cache
, &node
->hmap_node
, hash
);
184 ovsdb_monitor_json_cache_flush(struct ovsdb_monitor
*dbmon
)
186 struct ovsdb_monitor_json_cache_node
*node
, *next
;
188 HMAP_FOR_EACH_SAFE(node
, next
, hmap_node
, &dbmon
->json_cache
) {
189 hmap_remove(&dbmon
->json_cache
, &node
->hmap_node
);
190 json_destroy(node
->json
);
196 compare_ovsdb_monitor_column(const void *a_
, const void *b_
)
198 const struct ovsdb_monitor_column
*a
= a_
;
199 const struct ovsdb_monitor_column
*b
= b_
;
201 return a
->column
< b
->column
? -1 : a
->column
> b
->column
;
204 static struct ovsdb_monitor
*
205 ovsdb_monitor_cast(struct ovsdb_replica
*replica
)
207 ovs_assert(replica
->class == &ovsdb_jsonrpc_replica_class
);
208 return CONTAINER_OF(replica
, struct ovsdb_monitor
, replica
);
211 /* Finds and returns the ovsdb_monitor_row in 'changes->rows' for the
212 * given 'uuid', or NULL if there is no such row. */
213 static struct ovsdb_monitor_row
*
214 ovsdb_monitor_changes_row_find(const struct ovsdb_monitor_changes
*changes
,
215 const struct uuid
*uuid
)
217 struct ovsdb_monitor_row
*row
;
219 HMAP_FOR_EACH_WITH_HASH (row
, hmap_node
, uuid_hash(uuid
),
221 if (uuid_equals(uuid
, &row
->uuid
)) {
228 /* Allocates an array of 'mt->n_columns' ovsdb_datums and initializes them as
229 * copies of the data in 'row' drawn from the columns represented by
230 * mt->columns[]. Returns the array.
232 * If 'row' is NULL, returns NULL. */
233 static struct ovsdb_datum
*
234 clone_monitor_row_data(const struct ovsdb_monitor_table
*mt
,
235 const struct ovsdb_row
*row
)
237 struct ovsdb_datum
*data
;
244 data
= xmalloc(mt
->n_columns
* sizeof *data
);
245 for (i
= 0; i
< mt
->n_columns
; i
++) {
246 const struct ovsdb_column
*c
= mt
->columns
[i
].column
;
247 const struct ovsdb_datum
*src
= &row
->fields
[c
->index
];
248 struct ovsdb_datum
*dst
= &data
[i
];
249 const struct ovsdb_type
*type
= &c
->type
;
251 ovsdb_datum_clone(dst
, src
, type
);
256 /* Replaces the mt->n_columns ovsdb_datums in data[] by copies of the data
257 * in 'row' drawn from the columns represented by mt->columns[]. */
259 update_monitor_row_data(const struct ovsdb_monitor_table
*mt
,
260 const struct ovsdb_row
*row
,
261 struct ovsdb_datum
*data
)
265 for (i
= 0; i
< mt
->n_columns
; i
++) {
266 const struct ovsdb_column
*c
= mt
->columns
[i
].column
;
267 const struct ovsdb_datum
*src
= &row
->fields
[c
->index
];
268 struct ovsdb_datum
*dst
= &data
[i
];
269 const struct ovsdb_type
*type
= &c
->type
;
271 if (!ovsdb_datum_equals(src
, dst
, type
)) {
272 ovsdb_datum_destroy(dst
, type
);
273 ovsdb_datum_clone(dst
, src
, type
);
278 /* Frees all of the mt->n_columns ovsdb_datums in data[], using the types taken
279 * from mt->columns[], plus 'data' itself. */
281 free_monitor_row_data(const struct ovsdb_monitor_table
*mt
,
282 struct ovsdb_datum
*data
)
287 for (i
= 0; i
< mt
->n_columns
; i
++) {
288 const struct ovsdb_column
*c
= mt
->columns
[i
].column
;
290 ovsdb_datum_destroy(&data
[i
], &c
->type
);
296 /* Frees 'row', which must have been created from 'mt'. */
298 ovsdb_monitor_row_destroy(const struct ovsdb_monitor_table
*mt
,
299 struct ovsdb_monitor_row
*row
)
302 free_monitor_row_data(mt
, row
->old
);
303 free_monitor_row_data(mt
, row
->new);
309 ovsdb_monitor_add_jsonrpc_monitor(struct ovsdb_monitor
*dbmon
,
310 struct ovsdb_jsonrpc_monitor
*jsonrpc_monitor
)
312 struct jsonrpc_monitor_node
*jm
;
314 jm
= xzalloc(sizeof *jm
);
315 jm
->jsonrpc_monitor
= jsonrpc_monitor
;
316 list_push_back(&dbmon
->jsonrpc_monitors
, &jm
->node
);
319 struct ovsdb_monitor
*
320 ovsdb_monitor_create(struct ovsdb
*db
,
321 struct ovsdb_jsonrpc_monitor
*jsonrpc_monitor
)
323 struct ovsdb_monitor
*dbmon
;
325 dbmon
= xzalloc(sizeof *dbmon
);
327 ovsdb_replica_init(&dbmon
->replica
, &ovsdb_jsonrpc_replica_class
);
328 ovsdb_add_replica(db
, &dbmon
->replica
);
329 list_init(&dbmon
->jsonrpc_monitors
);
331 dbmon
->n_transactions
= 0;
332 shash_init(&dbmon
->tables
);
333 hmap_node_nullify(&dbmon
->hmap_node
);
334 hmap_init(&dbmon
->json_cache
);
336 ovsdb_monitor_add_jsonrpc_monitor(dbmon
, jsonrpc_monitor
);
341 ovsdb_monitor_add_table(struct ovsdb_monitor
*m
,
342 const struct ovsdb_table
*table
)
344 struct ovsdb_monitor_table
*mt
;
346 mt
= xzalloc(sizeof *mt
);
348 shash_add(&m
->tables
, table
->schema
->name
, mt
);
349 hmap_init(&mt
->changes
);
353 ovsdb_monitor_add_column(struct ovsdb_monitor
*dbmon
,
354 const struct ovsdb_table
*table
,
355 const struct ovsdb_column
*column
,
356 enum ovsdb_monitor_selection select
,
357 size_t *allocated_columns
)
359 struct ovsdb_monitor_table
*mt
;
360 struct ovsdb_monitor_column
*c
;
362 mt
= shash_find_data(&dbmon
->tables
, table
->schema
->name
);
364 if (mt
->n_columns
>= *allocated_columns
) {
365 mt
->columns
= x2nrealloc(mt
->columns
, allocated_columns
,
366 sizeof *mt
->columns
);
369 mt
->select
|= select
;
370 c
= &mt
->columns
[mt
->n_columns
++];
375 /* Check for duplicated column names. Return the first
376 * duplicated column's name if found. Otherwise return
378 const char * OVS_WARN_UNUSED_RESULT
379 ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor
*m
,
380 const struct ovsdb_table
*table
)
382 struct ovsdb_monitor_table
*mt
;
385 mt
= shash_find_data(&m
->tables
, table
->schema
->name
);
388 /* Check for duplicate columns. */
389 qsort(mt
->columns
, mt
->n_columns
, sizeof *mt
->columns
,
390 compare_ovsdb_monitor_column
);
391 for (i
= 1; i
< mt
->n_columns
; i
++) {
392 if (mt
->columns
[i
].column
== mt
->columns
[i
- 1].column
) {
393 return mt
->columns
[i
].column
->name
;
401 static struct ovsdb_monitor_changes
*
402 ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table
*mt
,
405 struct ovsdb_monitor_changes
*changes
;
407 changes
= xzalloc(sizeof *changes
);
409 changes
->transaction
= next_txn
;
412 hmap_init(&changes
->rows
);
413 hmap_insert(&mt
->changes
, &changes
->hmap_node
, hash_uint64(next_txn
));
418 static struct ovsdb_monitor_changes
*
419 ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table
*mt
,
420 uint64_t transaction
)
422 struct ovsdb_monitor_changes
*changes
;
423 size_t hash
= hash_uint64(transaction
);
425 HMAP_FOR_EACH_WITH_HASH(changes
, hmap_node
, hash
, &mt
->changes
) {
426 if (changes
->transaction
== transaction
) {
434 /* Stop currently tracking changes to table 'mt' since 'transaction'. */
436 ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table
*mt
,
437 uint64_t transaction
)
439 struct ovsdb_monitor_changes
*changes
=
440 ovsdb_monitor_table_find_changes(mt
, transaction
);
442 if (--changes
->n_refs
== 0) {
443 hmap_remove(&mt
->changes
, &changes
->hmap_node
);
444 ovsdb_monitor_changes_destroy(changes
);
449 /* Start tracking changes to table 'mt' beginning from 'transaction', inclusive.
452 ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table
*mt
,
453 uint64_t transaction
)
455 struct ovsdb_monitor_changes
*changes
;
457 changes
= ovsdb_monitor_table_find_changes(mt
, transaction
);
461 ovsdb_monitor_table_add_changes(mt
, transaction
);
466 ovsdb_monitor_changes_destroy(struct ovsdb_monitor_changes
*changes
)
468 struct ovsdb_monitor_row
*row
, *next
;
470 HMAP_FOR_EACH_SAFE (row
, next
, hmap_node
, &changes
->rows
) {
471 hmap_remove(&changes
->rows
, &row
->hmap_node
);
472 ovsdb_monitor_row_destroy(changes
->mt
, row
);
474 hmap_destroy(&changes
->rows
);
478 static enum ovsdb_monitor_selection
479 ovsdb_monitor_row_update_type(bool initial
, const bool old
, const bool new)
481 return initial
? OJMS_INITIAL
487 ovsdb_monitor_row_skip_update(const struct ovsdb_monitor_table
*mt
,
488 const struct ovsdb_monitor_row
*row
,
489 enum ovsdb_monitor_selection type
,
490 unsigned long int *changed
)
492 if (!(mt
->select
& type
)) {
496 if (type
== OJMS_MODIFY
) {
500 memset(changed
, 0, bitmap_n_bytes(mt
->n_columns
));
501 for (i
= 0; i
< mt
->n_columns
; i
++) {
502 const struct ovsdb_column
*c
= mt
->columns
[i
].column
;
503 if (!ovsdb_datum_equals(&row
->old
[i
], &row
->new[i
], &c
->type
)) {
504 bitmap_set1(changed
, i
);
509 /* No actual changes: presumably a row changed and then
510 * changed back later. */
518 /* Returns JSON for a <row-update> (as described in RFC 7047) for 'row' within
519 * 'mt', or NULL if no row update should be sent.
521 * The caller should specify 'initial' as true if the returned JSON is going to
522 * be used as part of the initial reply to a "monitor" request, false if it is
523 * going to be used as part of an "update" notification.
525 * 'changed' must be a scratch buffer for internal use that is at least
526 * bitmap_n_bytes(mt->n_columns) bytes long. */
528 ovsdb_monitor_compose_row_update(
529 const struct ovsdb_monitor_table
*mt
,
530 const struct ovsdb_monitor_row
*row
,
531 bool initial
, unsigned long int *changed
)
533 enum ovsdb_monitor_selection type
;
534 struct json
*old_json
, *new_json
;
535 struct json
*row_json
;
538 type
= ovsdb_monitor_row_update_type(initial
, row
->old
, row
->new);
539 if (ovsdb_monitor_row_skip_update(mt
, row
, type
, changed
)) {
543 row_json
= json_object_create();
544 old_json
= new_json
= NULL
;
545 if (type
& (OJMS_DELETE
| OJMS_MODIFY
)) {
546 old_json
= json_object_create();
547 json_object_put(row_json
, "old", old_json
);
549 if (type
& (OJMS_INITIAL
| OJMS_INSERT
| OJMS_MODIFY
)) {
550 new_json
= json_object_create();
551 json_object_put(row_json
, "new", new_json
);
553 for (i
= 0; i
< mt
->n_columns
; i
++) {
554 const struct ovsdb_monitor_column
*c
= &mt
->columns
[i
];
556 if (!(type
& c
->select
)) {
557 /* We don't care about this type of change for this
558 * particular column (but we will care about it for some
563 if ((type
== OJMS_MODIFY
&& bitmap_is_set(changed
, i
))
564 || type
== OJMS_DELETE
) {
565 json_object_put(old_json
, c
->column
->name
,
566 ovsdb_datum_to_json(&row
->old
[i
],
569 if (type
& (OJMS_INITIAL
| OJMS_INSERT
| OJMS_MODIFY
)) {
570 json_object_put(new_json
, c
->column
->name
,
571 ovsdb_datum_to_json(&row
->new[i
],
579 /* Returns JSON for a <row-update2> (as described in ovsdb-server(1) manpage)
580 * for 'row' within 'mt', or NULL if no row update should be sent.
582 * The caller should specify 'initial' as true if the returned JSON is
583 * going to be used as part of the initial reply to a "monitor2" request,
584 * false if it is going to be used as part of an "update2" notification.
586 * 'changed' must be a scratch buffer for internal use that is at least
587 * bitmap_n_bytes(mt->n_columns) bytes long. */
589 ovsdb_monitor_compose_row_update2(
590 const struct ovsdb_monitor_table
*mt
,
591 const struct ovsdb_monitor_row
*row
,
592 bool initial
, unsigned long int *changed
)
594 enum ovsdb_monitor_selection type
;
595 struct json
*row_update2
, *diff_json
;
598 type
= ovsdb_monitor_row_update_type(initial
, row
->old
, row
->new);
599 if (ovsdb_monitor_row_skip_update(mt
, row
, type
, changed
)) {
603 row_update2
= json_object_create();
604 if (type
== OJMS_DELETE
) {
605 json_object_put(row_update2
, "delete", json_null_create());
607 diff_json
= json_object_create();
610 for (i
= 0; i
< mt
->n_columns
; i
++) {
611 const struct ovsdb_monitor_column
*c
= &mt
->columns
[i
];
613 if (!(type
& c
->select
)) {
614 /* We don't care about this type of change for this
615 * particular column (but we will care about it for some
620 if (type
== OJMS_MODIFY
) {
621 struct ovsdb_datum diff
;
623 if (!bitmap_is_set(changed
, i
)) {
627 ovsdb_datum_diff(&diff
,&row
->old
[i
], &row
->new[i
],
629 json_object_put(diff_json
, c
->column
->name
,
630 ovsdb_datum_to_json(&diff
, &c
->column
->type
));
631 ovsdb_datum_destroy(&diff
, &c
->column
->type
);
633 if (!ovsdb_datum_is_default(&row
->new[i
], &c
->column
->type
)) {
634 json_object_put(diff_json
, c
->column
->name
,
635 ovsdb_datum_to_json(&row
->new[i
],
641 op
= type
== OJMS_INITIAL
? "initial"
642 : type
== OJMS_MODIFY
? "modify" : "insert";
643 json_object_put(row_update2
, op
, diff_json
);
650 ovsdb_monitor_max_columns(struct ovsdb_monitor
*dbmon
)
652 struct shash_node
*node
;
653 size_t max_columns
= 0;
655 SHASH_FOR_EACH (node
, &dbmon
->tables
) {
656 struct ovsdb_monitor_table
*mt
= node
->data
;
658 max_columns
= MAX(max_columns
, mt
->n_columns
);
664 /* Constructs and returns JSON for a <table-updates> object (as described in
665 * RFC 7047) for all the outstanding changes within 'monitor', starting from
668 ovsdb_monitor_compose_update(struct ovsdb_monitor
*dbmon
,
669 bool initial
, uint64_t transaction
,
670 compose_row_update_cb_func row_update
)
672 struct shash_node
*node
;
674 size_t max_columns
= ovsdb_monitor_max_columns(dbmon
);
675 unsigned long int *changed
= xmalloc(bitmap_n_bytes(max_columns
));
678 SHASH_FOR_EACH (node
, &dbmon
->tables
) {
679 struct ovsdb_monitor_table
*mt
= node
->data
;
680 struct ovsdb_monitor_row
*row
, *next
;
681 struct ovsdb_monitor_changes
*changes
;
682 struct json
*table_json
= NULL
;
684 changes
= ovsdb_monitor_table_find_changes(mt
, transaction
);
689 HMAP_FOR_EACH_SAFE (row
, next
, hmap_node
, &changes
->rows
) {
690 struct json
*row_json
;
692 row_json
= (*row_update
)(mt
, row
, initial
, changed
);
694 char uuid
[UUID_LEN
+ 1];
696 /* Create JSON object for transaction overall. */
698 json
= json_object_create();
701 /* Create JSON object for transaction on this table. */
703 table_json
= json_object_create();
704 json_object_put(json
, mt
->table
->schema
->name
, table_json
);
707 /* Add JSON row to JSON table. */
708 snprintf(uuid
, sizeof uuid
, UUID_FMT
, UUID_ARGS(&row
->uuid
));
709 json_object_put(table_json
, uuid
, row_json
);
718 /* Returns JSON for a <table-updates> object (as described in RFC 7047)
719 * for all the outstanding changes within 'monitor' that starts from
720 * '*unflushed' transaction id.
722 * The caller should specify 'initial' as true if the returned JSON is going to
723 * be used as part of the initial reply to a "monitor" request, false if it is
724 * going to be used as part of an "update" notification. */
726 ovsdb_monitor_get_update(struct ovsdb_monitor
*dbmon
,
727 bool initial
, uint64_t *unflushed
,
728 enum ovsdb_monitor_version version
)
730 struct ovsdb_monitor_json_cache_node
*cache_node
;
731 struct shash_node
*node
;
733 uint64_t prev_txn
= *unflushed
;
734 uint64_t next_txn
= dbmon
->n_transactions
+ 1;
736 /* Return a clone of cached json if one exists. Otherwise,
737 * generate a new one and add it to the cache. */
738 cache_node
= ovsdb_monitor_json_cache_search(dbmon
, version
, prev_txn
);
740 json
= cache_node
->json
? json_clone(cache_node
->json
) : NULL
;
742 if (version
== OVSDB_MONITOR_V1
) {
743 json
= ovsdb_monitor_compose_update(dbmon
, initial
, prev_txn
,
744 ovsdb_monitor_compose_row_update
);
746 ovs_assert(version
== OVSDB_MONITOR_V2
);
747 json
= ovsdb_monitor_compose_update(dbmon
, initial
, prev_txn
,
748 ovsdb_monitor_compose_row_update2
);
750 ovsdb_monitor_json_cache_insert(dbmon
, version
, prev_txn
, json
);
753 /* Maintain transaction id of 'changes'. */
754 SHASH_FOR_EACH (node
, &dbmon
->tables
) {
755 struct ovsdb_monitor_table
*mt
= node
->data
;
757 ovsdb_monitor_table_untrack_changes(mt
, prev_txn
);
758 ovsdb_monitor_table_track_changes(mt
, next_txn
);
760 *unflushed
= next_txn
;
766 ovsdb_monitor_needs_flush(struct ovsdb_monitor
*dbmon
,
767 uint64_t next_transaction
)
769 ovs_assert(next_transaction
<= dbmon
->n_transactions
+ 1);
770 return (next_transaction
<= dbmon
->n_transactions
);
774 ovsdb_monitor_table_add_select(struct ovsdb_monitor
*dbmon
,
775 const struct ovsdb_table
*table
,
776 enum ovsdb_monitor_selection select
)
778 struct ovsdb_monitor_table
* mt
;
780 mt
= shash_find_data(&dbmon
->tables
, table
->schema
->name
);
781 mt
->select
|= select
;
785 * If a row's change type (insert, delete or modify) matches that of
786 * the monitor, it should be sent to the monitor's clients as an update.
787 * Of course, the monitor should also internally update with this change.
789 * When a change type does not require client side update, the monitor
790 * may still need to keep track of certain changes in order to generate
791 * correct future updates. For example, the monitor internal state should
792 * be updated whenever a new row is inserted, in order to generate the
793 * correct initial state, regardless of whether an insert change type is being
796 * On the other hand, if a transaction only contains changes to columns
797 * that are not monitored, this transaction can be safely ignored by the
800 * Thus, the order of the declaration is important:
801 * 'OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE' always implies
802 * 'OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE', but not vice versa. */
803 enum ovsdb_monitor_changes_efficacy
{
804 OVSDB_CHANGES_NO_EFFECT
, /* Monitor does not care about this
806 OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE
, /* Monitor internal updates. */
807 OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE
, /* Client needs to be updated. */
810 struct ovsdb_monitor_aux
{
811 const struct ovsdb_monitor
*monitor
;
812 struct ovsdb_monitor_table
*mt
;
813 enum ovsdb_monitor_changes_efficacy efficacy
;
817 ovsdb_monitor_init_aux(struct ovsdb_monitor_aux
*aux
,
818 const struct ovsdb_monitor
*m
)
822 aux
->efficacy
= OVSDB_CHANGES_NO_EFFECT
;
826 ovsdb_monitor_changes_update(const struct ovsdb_row
*old
,
827 const struct ovsdb_row
*new,
828 const struct ovsdb_monitor_table
*mt
,
829 struct ovsdb_monitor_changes
*changes
)
831 const struct uuid
*uuid
= ovsdb_row_get_uuid(new ? new : old
);
832 struct ovsdb_monitor_row
*change
;
834 change
= ovsdb_monitor_changes_row_find(changes
, uuid
);
836 change
= xzalloc(sizeof *change
);
837 hmap_insert(&changes
->rows
, &change
->hmap_node
, uuid_hash(uuid
));
838 change
->uuid
= *uuid
;
839 change
->old
= clone_monitor_row_data(mt
, old
);
840 change
->new = clone_monitor_row_data(mt
, new);
843 update_monitor_row_data(mt
, new, change
->new);
845 free_monitor_row_data(mt
, change
->new);
849 /* This row was added then deleted. Forget about it. */
850 hmap_remove(&changes
->rows
, &change
->hmap_node
);
858 ovsdb_monitor_columns_changed(const struct ovsdb_monitor_table
*mt
,
859 const unsigned long int *changed
)
863 for (i
= 0; i
< mt
->n_columns
; i
++) {
864 size_t column_index
= mt
->columns
[i
].column
->index
;
866 if (bitmap_is_set(changed
, column_index
)) {
874 /* Return the efficacy of a row's change to a monitor table.
876 * Please see the block comment above 'ovsdb_monitor_changes_efficacy'
877 * definition for more information. */
878 static enum ovsdb_monitor_changes_efficacy
879 ovsdb_monitor_changes_classify(enum ovsdb_monitor_selection type
,
880 const struct ovsdb_monitor_table
*mt
,
881 const unsigned long int *changed
)
883 if (type
== OJMS_MODIFY
&&
884 !ovsdb_monitor_columns_changed(mt
, changed
)) {
885 return OVSDB_CHANGES_NO_EFFECT
;
888 return (mt
->select
& type
)
889 ? OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE
890 : OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE
;
894 ovsdb_monitor_change_cb(const struct ovsdb_row
*old
,
895 const struct ovsdb_row
*new,
896 const unsigned long int *changed
,
899 struct ovsdb_monitor_aux
*aux
= aux_
;
900 const struct ovsdb_monitor
*m
= aux
->monitor
;
901 struct ovsdb_table
*table
= new ? new->table
: old
->table
;
902 struct ovsdb_monitor_table
*mt
;
903 struct ovsdb_monitor_changes
*changes
;
905 if (!aux
->mt
|| table
!= aux
->mt
->table
) {
906 aux
->mt
= shash_find_data(&m
->tables
, table
->schema
->name
);
908 /* We don't care about rows in this table at all. Tell the caller
915 HMAP_FOR_EACH(changes
, hmap_node
, &mt
->changes
) {
916 enum ovsdb_monitor_changes_efficacy efficacy
;
917 enum ovsdb_monitor_selection type
;
919 type
= ovsdb_monitor_row_update_type(false, old
, new);
920 efficacy
= ovsdb_monitor_changes_classify(type
, mt
, changed
);
921 if (efficacy
> OVSDB_CHANGES_NO_EFFECT
) {
922 ovsdb_monitor_changes_update(old
, new, mt
, changes
);
925 if (aux
->efficacy
< efficacy
) {
926 aux
->efficacy
= efficacy
;
934 ovsdb_monitor_get_initial(const struct ovsdb_monitor
*dbmon
)
936 struct ovsdb_monitor_aux aux
;
937 struct shash_node
*node
;
939 ovsdb_monitor_init_aux(&aux
, dbmon
);
940 SHASH_FOR_EACH (node
, &dbmon
->tables
) {
941 struct ovsdb_monitor_table
*mt
= node
->data
;
943 if (mt
->select
& OJMS_INITIAL
) {
944 struct ovsdb_row
*row
;
945 struct ovsdb_monitor_changes
*changes
;
947 changes
= ovsdb_monitor_table_find_changes(mt
, 0);
949 changes
= ovsdb_monitor_table_add_changes(mt
, 0);
950 HMAP_FOR_EACH (row
, hmap_node
, &mt
->table
->rows
) {
951 ovsdb_monitor_changes_update(NULL
, row
, mt
, changes
);
961 ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor
*dbmon
,
962 struct ovsdb_jsonrpc_monitor
*jsonrpc_monitor
)
964 struct jsonrpc_monitor_node
*jm
;
966 if (list_is_empty(&dbmon
->jsonrpc_monitors
)) {
967 ovsdb_monitor_destroy(dbmon
);
971 /* Find and remove the jsonrpc monitor from the list. */
972 LIST_FOR_EACH(jm
, node
, &dbmon
->jsonrpc_monitors
) {
973 if (jm
->jsonrpc_monitor
== jsonrpc_monitor
) {
974 list_remove(&jm
->node
);
977 /* Destroy ovsdb monitor if this is the last user. */
978 if (list_is_empty(&dbmon
->jsonrpc_monitors
)) {
979 ovsdb_monitor_destroy(dbmon
);
986 /* Should never reach here. jsonrpc_monitor should be on the list. */
991 ovsdb_monitor_table_equal(const struct ovsdb_monitor_table
*a
,
992 const struct ovsdb_monitor_table
*b
)
996 if ((a
->table
!= b
->table
) ||
997 (a
->select
!= b
->select
) ||
998 (a
->n_columns
!= b
->n_columns
)) {
1002 for (i
= 0; i
< a
->n_columns
; i
++) {
1003 if ((a
->columns
[i
].column
!= b
->columns
[i
].column
) ||
1004 (a
->columns
[i
].select
!= b
->columns
[i
].select
)) {
1013 ovsdb_monitor_equal(const struct ovsdb_monitor
*a
,
1014 const struct ovsdb_monitor
*b
)
1016 struct shash_node
*node
;
1018 if (shash_count(&a
->tables
) != shash_count(&b
->tables
)) {
1022 SHASH_FOR_EACH(node
, &a
->tables
) {
1023 const struct ovsdb_monitor_table
*mta
= node
->data
;
1024 const struct ovsdb_monitor_table
*mtb
;
1026 mtb
= shash_find_data(&b
->tables
, node
->name
);
1031 if (!ovsdb_monitor_table_equal(mta
, mtb
)) {
1040 ovsdb_monitor_hash(const struct ovsdb_monitor
*dbmon
, size_t basis
)
1042 const struct shash_node
**nodes
;
1045 nodes
= shash_sort(&dbmon
->tables
);
1046 n
= shash_count(&dbmon
->tables
);
1048 for (i
= 0; i
< n
; i
++) {
1049 struct ovsdb_monitor_table
*mt
= nodes
[i
]->data
;
1051 basis
= hash_pointer(mt
->table
, basis
);
1052 basis
= hash_3words(mt
->select
, mt
->n_columns
, basis
);
1054 for (j
= 0; j
< mt
->n_columns
; j
++) {
1055 basis
= hash_pointer(mt
->columns
[j
].column
, basis
);
1056 basis
= hash_2words(mt
->columns
[j
].select
, basis
);
1064 struct ovsdb_monitor
*
1065 ovsdb_monitor_add(struct ovsdb_monitor
*new_dbmon
)
1067 struct ovsdb_monitor
*dbmon
;
1070 /* New_dbmon should be associated with only one jsonrpc
1072 ovs_assert(list_is_singleton(&new_dbmon
->jsonrpc_monitors
));
1074 hash
= ovsdb_monitor_hash(new_dbmon
, 0);
1075 HMAP_FOR_EACH_WITH_HASH(dbmon
, hmap_node
, hash
, &ovsdb_monitors
) {
1076 if (ovsdb_monitor_equal(dbmon
, new_dbmon
)) {
1081 hmap_insert(&ovsdb_monitors
, &new_dbmon
->hmap_node
, hash
);
1086 ovsdb_monitor_destroy(struct ovsdb_monitor
*dbmon
)
1088 struct shash_node
*node
;
1090 list_remove(&dbmon
->replica
.node
);
1092 if (!hmap_node_is_null(&dbmon
->hmap_node
)) {
1093 hmap_remove(&ovsdb_monitors
, &dbmon
->hmap_node
);
1096 ovsdb_monitor_json_cache_flush(dbmon
);
1097 hmap_destroy(&dbmon
->json_cache
);
1099 SHASH_FOR_EACH (node
, &dbmon
->tables
) {
1100 struct ovsdb_monitor_table
*mt
= node
->data
;
1101 struct ovsdb_monitor_changes
*changes
, *next
;
1103 HMAP_FOR_EACH_SAFE (changes
, next
, hmap_node
, &mt
->changes
) {
1104 hmap_remove(&mt
->changes
, &changes
->hmap_node
);
1105 ovsdb_monitor_changes_destroy(changes
);
1107 hmap_destroy(&mt
->changes
);
1111 shash_destroy(&dbmon
->tables
);
1115 static struct ovsdb_error
*
1116 ovsdb_monitor_commit(struct ovsdb_replica
*replica
,
1117 const struct ovsdb_txn
*txn
,
1118 bool durable OVS_UNUSED
)
1120 struct ovsdb_monitor
*m
= ovsdb_monitor_cast(replica
);
1121 struct ovsdb_monitor_aux aux
;
1123 ovsdb_monitor_init_aux(&aux
, m
);
1124 ovsdb_txn_for_each_change(txn
, ovsdb_monitor_change_cb
, &aux
);
1126 if (aux
.efficacy
== OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE
) {
1127 ovsdb_monitor_json_cache_flush(m
);
1128 m
->n_transactions
++;
1135 ovsdb_monitor_destroy_callback(struct ovsdb_replica
*replica
)
1137 struct ovsdb_monitor
*dbmon
= ovsdb_monitor_cast(replica
);
1138 struct jsonrpc_monitor_node
*jm
, *next
;
1140 /* Delete all front end monitors. Removing the last front
1141 * end monitor will also destroy the corresponding 'ovsdb_monitor'.
1142 * ovsdb monitor will also be destroyed. */
1143 LIST_FOR_EACH_SAFE(jm
, next
, node
, &dbmon
->jsonrpc_monitors
) {
1144 ovsdb_jsonrpc_monitor_destroy(jm
->jsonrpc_monitor
);
1148 /* Add some memory usage statistics for monitors into 'usage', for use with
1149 * memory_report(). */
1151 ovsdb_monitor_get_memory_usage(struct simap
*usage
)
1153 simap_put(usage
, "monitors", hmap_count(&ovsdb_monitors
));
1156 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class
= {
1157 ovsdb_monitor_commit
,
1158 ovsdb_monitor_destroy_callback
,