1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
18 #include "jsonrpc-server.h"
24 #include "dynamic-string.h"
27 #include "ovsdb-error.h"
28 #include "ovsdb-parser.h"
30 #include "reconnect.h"
37 #include "transaction.h"
41 VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server
);
43 struct ovsdb_jsonrpc_remote
;
44 struct ovsdb_jsonrpc_session
;
46 /* Message rate-limiting. */
47 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 5);
50 static struct ovsdb_jsonrpc_session
*ovsdb_jsonrpc_session_create(
51 struct ovsdb_jsonrpc_remote
*, struct jsonrpc_session
*);
52 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote
*);
53 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote
*);
54 static void ovsdb_jsonrpc_session_get_memory_usage_all(
55 const struct ovsdb_jsonrpc_remote
*, struct simap
*usage
);
56 static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote
*);
57 static void ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote
*);
58 static void ovsdb_jsonrpc_session_set_all_options(
59 struct ovsdb_jsonrpc_remote
*, const struct ovsdb_jsonrpc_options
*);
60 static bool ovsdb_jsonrpc_session_get_status(
61 const struct ovsdb_jsonrpc_remote
*,
62 struct ovsdb_jsonrpc_remote_status
*);
63 static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session
*);
64 static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter
*);
67 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session
*,
69 struct json
*id
, struct json
*params
);
70 static struct ovsdb_jsonrpc_trigger
*ovsdb_jsonrpc_trigger_find(
71 struct ovsdb_jsonrpc_session
*, const struct json
*id
, size_t hash
);
72 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger
*);
73 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session
*);
74 static void ovsdb_jsonrpc_trigger_complete_done(
75 struct ovsdb_jsonrpc_session
*);
78 static struct json
*ovsdb_jsonrpc_monitor_create(
79 struct ovsdb_jsonrpc_session
*, struct ovsdb
*, struct json
*params
);
80 static struct jsonrpc_msg
*ovsdb_jsonrpc_monitor_cancel(
81 struct ovsdb_jsonrpc_session
*,
82 struct json_array
*params
,
83 const struct json
*request_id
);
84 static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session
*);
85 static size_t ovsdb_jsonrpc_monitor_json_length_all(
86 struct ovsdb_jsonrpc_session
*);
88 /* JSON-RPC database server. */
90 struct ovsdb_jsonrpc_server
{
91 struct ovsdb_server up
;
92 unsigned int n_sessions
, max_sessions
;
93 struct shash remotes
; /* Contains "struct ovsdb_jsonrpc_remote *"s. */
96 /* A configured remote. This is either a passive stream listener plus a list
97 * of the currently connected sessions, or a list of exactly one active
99 struct ovsdb_jsonrpc_remote
{
100 struct ovsdb_jsonrpc_server
*server
;
101 struct pstream
*listener
; /* Listener, if passive. */
102 struct list sessions
; /* List of "struct ovsdb_jsonrpc_session"s. */
106 static struct ovsdb_jsonrpc_remote
*ovsdb_jsonrpc_server_add_remote(
107 struct ovsdb_jsonrpc_server
*, const char *name
,
108 const struct ovsdb_jsonrpc_options
*options
110 static void ovsdb_jsonrpc_server_del_remote(struct shash_node
*);
112 /* Creates and returns a new server to provide JSON-RPC access to an OVSDB.
114 * The caller must call ovsdb_jsonrpc_server_add_db() for each database to
115 * which 'server' should provide access. */
116 struct ovsdb_jsonrpc_server
*
117 ovsdb_jsonrpc_server_create(void)
119 struct ovsdb_jsonrpc_server
*server
= xzalloc(sizeof *server
);
120 ovsdb_server_init(&server
->up
);
121 server
->max_sessions
= 64;
122 shash_init(&server
->remotes
);
126 /* Adds 'db' to the set of databases served out by 'svr'. Returns true if
127 * successful, false if 'db''s name is the same as some database already in
130 ovsdb_jsonrpc_server_add_db(struct ovsdb_jsonrpc_server
*svr
, struct ovsdb
*db
)
132 /* The OVSDB protocol doesn't have a way to notify a client that a
133 * database has been added. If some client tried to use the database
134 * that we're adding and failed, then forcing it to reconnect seems like
135 * a reasonable way to make it try again.
137 * If this is too big of a hammer in practice, we could be more selective,
138 * e.g. disconnect only connections that actually tried to use a database
139 * with 'db''s name. */
140 ovsdb_jsonrpc_server_reconnect(svr
);
142 return ovsdb_server_add_db(&svr
->up
, db
);
145 /* Removes 'db' from the set of databases served out by 'svr'. Returns
146 * true if successful, false if there is no database associated with 'db'. */
148 ovsdb_jsonrpc_server_remove_db(struct ovsdb_jsonrpc_server
*svr
,
151 /* There might be pointers to 'db' from 'svr', such as monitors or
152 * outstanding transactions. Disconnect all JSON-RPC connections to avoid
153 * accesses to freed memory.
155 * If this is too big of a hammer in practice, we could be more selective,
156 * e.g. disconnect only connections that actually reference 'db'. */
157 ovsdb_jsonrpc_server_reconnect(svr
);
159 return ovsdb_server_remove_db(&svr
->up
, db
);
163 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server
*svr
)
165 struct shash_node
*node
, *next
;
167 SHASH_FOR_EACH_SAFE (node
, next
, &svr
->remotes
) {
168 ovsdb_jsonrpc_server_del_remote(node
);
170 shash_destroy(&svr
->remotes
);
171 ovsdb_server_destroy(&svr
->up
);
175 struct ovsdb_jsonrpc_options
*
176 ovsdb_jsonrpc_default_options(const char *target
)
178 struct ovsdb_jsonrpc_options
*options
= xzalloc(sizeof *options
);
179 options
->max_backoff
= RECONNECT_DEFAULT_MAX_BACKOFF
;
180 options
->probe_interval
= (stream_or_pstream_needs_probes(target
)
181 ? RECONNECT_DEFAULT_PROBE_INTERVAL
186 /* Sets 'svr''s current set of remotes to the names in 'new_remotes', with
187 * options in the struct ovsdb_jsonrpc_options supplied as the data values.
189 * A remote is an active or passive stream connection method, e.g. "pssl:" or
192 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server
*svr
,
193 const struct shash
*new_remotes
)
195 struct shash_node
*node
, *next
;
197 SHASH_FOR_EACH_SAFE (node
, next
, &svr
->remotes
) {
198 if (!shash_find(new_remotes
, node
->name
)) {
199 VLOG_INFO("%s: remote deconfigured", node
->name
);
200 ovsdb_jsonrpc_server_del_remote(node
);
203 SHASH_FOR_EACH (node
, new_remotes
) {
204 const struct ovsdb_jsonrpc_options
*options
= node
->data
;
205 struct ovsdb_jsonrpc_remote
*remote
;
207 remote
= shash_find_data(&svr
->remotes
, node
->name
);
209 remote
= ovsdb_jsonrpc_server_add_remote(svr
, node
->name
, options
);
215 ovsdb_jsonrpc_session_set_all_options(remote
, options
);
219 static struct ovsdb_jsonrpc_remote
*
220 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server
*svr
,
222 const struct ovsdb_jsonrpc_options
*options
)
224 struct ovsdb_jsonrpc_remote
*remote
;
225 struct pstream
*listener
;
228 error
= jsonrpc_pstream_open(name
, &listener
, options
->dscp
);
229 if (error
&& error
!= EAFNOSUPPORT
) {
230 VLOG_ERR_RL(&rl
, "%s: listen failed: %s", name
, ovs_strerror(error
));
234 remote
= xmalloc(sizeof *remote
);
235 remote
->server
= svr
;
236 remote
->listener
= listener
;
237 list_init(&remote
->sessions
);
238 remote
->dscp
= options
->dscp
;
239 shash_add(&svr
->remotes
, name
, remote
);
242 ovsdb_jsonrpc_session_create(remote
, jsonrpc_session_open(name
, true));
248 ovsdb_jsonrpc_server_del_remote(struct shash_node
*node
)
250 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
252 ovsdb_jsonrpc_session_close_all(remote
);
253 pstream_close(remote
->listener
);
254 shash_delete(&remote
->server
->remotes
, node
);
258 /* Stores status information for the remote named 'target', which should have
259 * been configured on 'svr' with a call to ovsdb_jsonrpc_server_set_remotes(),
260 * into '*status'. On success returns true, on failure (if 'svr' doesn't have
261 * a remote named 'target' or if that remote is an inbound remote that has no
262 * active connections) returns false. On failure, 'status' will be zeroed.
265 ovsdb_jsonrpc_server_get_remote_status(
266 const struct ovsdb_jsonrpc_server
*svr
, const char *target
,
267 struct ovsdb_jsonrpc_remote_status
*status
)
269 const struct ovsdb_jsonrpc_remote
*remote
;
271 memset(status
, 0, sizeof *status
);
273 remote
= shash_find_data(&svr
->remotes
, target
);
274 return remote
&& ovsdb_jsonrpc_session_get_status(remote
, status
);
278 ovsdb_jsonrpc_server_free_remote_status(
279 struct ovsdb_jsonrpc_remote_status
*status
)
281 free(status
->locks_held
);
282 free(status
->locks_waiting
);
283 free(status
->locks_lost
);
286 /* Forces all of the JSON-RPC sessions managed by 'svr' to disconnect and
289 ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server
*svr
)
291 struct shash_node
*node
;
293 SHASH_FOR_EACH (node
, &svr
->remotes
) {
294 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
296 ovsdb_jsonrpc_session_reconnect_all(remote
);
301 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server
*svr
)
303 struct shash_node
*node
;
305 SHASH_FOR_EACH (node
, &svr
->remotes
) {
306 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
308 if (remote
->listener
&& svr
->n_sessions
< svr
->max_sessions
) {
309 struct stream
*stream
;
312 error
= pstream_accept(remote
->listener
, &stream
);
314 struct jsonrpc_session
*js
;
315 js
= jsonrpc_session_open_unreliably(jsonrpc_open(stream
),
317 ovsdb_jsonrpc_session_create(remote
, js
);
318 } else if (error
!= EAGAIN
) {
319 VLOG_WARN_RL(&rl
, "%s: accept failed: %s",
320 pstream_get_name(remote
->listener
),
321 ovs_strerror(error
));
325 ovsdb_jsonrpc_session_run_all(remote
);
330 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server
*svr
)
332 struct shash_node
*node
;
334 SHASH_FOR_EACH (node
, &svr
->remotes
) {
335 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
337 if (remote
->listener
&& svr
->n_sessions
< svr
->max_sessions
) {
338 pstream_wait(remote
->listener
);
341 ovsdb_jsonrpc_session_wait_all(remote
);
345 /* Adds some memory usage statistics for 'svr' into 'usage', for use with
346 * memory_report(). */
348 ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server
*svr
,
351 struct shash_node
*node
;
353 simap_increase(usage
, "sessions", svr
->n_sessions
);
354 SHASH_FOR_EACH (node
, &svr
->remotes
) {
355 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
357 ovsdb_jsonrpc_session_get_memory_usage_all(remote
, usage
);
361 /* JSON-RPC database server session. */
363 struct ovsdb_jsonrpc_session
{
364 struct list node
; /* Element in remote's sessions list. */
365 struct ovsdb_session up
;
366 struct ovsdb_jsonrpc_remote
*remote
;
367 size_t backlog_threshold
; /* See ovsdb_jsonrpc_session_run(). */
368 size_t reply_backlog
;
371 struct hmap triggers
; /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
374 struct hmap monitors
; /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
376 /* Network connectivity. */
377 struct jsonrpc_session
*js
; /* JSON-RPC session. */
378 unsigned int js_seqno
; /* Last jsonrpc_session_get_seqno() value. */
381 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session
*);
382 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session
*);
383 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session
*);
384 static void ovsdb_jsonrpc_session_get_memory_usage(
385 const struct ovsdb_jsonrpc_session
*, struct simap
*usage
);
386 static void ovsdb_jsonrpc_session_set_options(
387 struct ovsdb_jsonrpc_session
*, const struct ovsdb_jsonrpc_options
*);
388 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session
*,
389 struct jsonrpc_msg
*);
390 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session
*,
391 struct jsonrpc_msg
*);
393 static struct ovsdb_jsonrpc_session
*
394 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote
*remote
,
395 struct jsonrpc_session
*js
)
397 struct ovsdb_jsonrpc_session
*s
;
399 s
= xzalloc(sizeof *s
);
400 ovsdb_session_init(&s
->up
, &remote
->server
->up
);
402 list_push_back(&remote
->sessions
, &s
->node
);
403 hmap_init(&s
->triggers
);
404 hmap_init(&s
->monitors
);
405 s
->reply_backlog
= 0;
406 s
->backlog_threshold
= 1024 * 1024;
408 s
->js_seqno
= jsonrpc_session_get_seqno(js
);
410 remote
->server
->n_sessions
++;
416 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session
*s
)
418 ovsdb_jsonrpc_monitor_remove_all(s
);
419 ovsdb_jsonrpc_session_unlock_all(s
);
420 ovsdb_jsonrpc_trigger_complete_all(s
);
422 hmap_destroy(&s
->monitors
);
423 hmap_destroy(&s
->triggers
);
425 jsonrpc_session_close(s
->js
);
426 list_remove(&s
->node
);
427 s
->remote
->server
->n_sessions
--;
428 ovsdb_session_destroy(&s
->up
);
433 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session
*s
)
437 jsonrpc_session_run(s
->js
);
438 if (s
->js_seqno
!= jsonrpc_session_get_seqno(s
->js
)) {
439 s
->js_seqno
= jsonrpc_session_get_seqno(s
->js
);
440 ovsdb_jsonrpc_trigger_complete_all(s
);
441 ovsdb_jsonrpc_monitor_remove_all(s
);
442 ovsdb_jsonrpc_session_unlock_all(s
);
445 ovsdb_jsonrpc_trigger_complete_done(s
);
447 backlog
= jsonrpc_session_get_backlog(s
->js
);
449 struct jsonrpc_msg
*msg
= jsonrpc_session_recv(s
->js
);
451 if (msg
->type
== JSONRPC_REQUEST
) {
452 ovsdb_jsonrpc_session_got_request(s
, msg
);
453 } else if (msg
->type
== JSONRPC_NOTIFY
) {
454 ovsdb_jsonrpc_session_got_notify(s
, msg
);
456 VLOG_WARN("%s: received unexpected %s message",
457 jsonrpc_session_get_name(s
->js
),
458 jsonrpc_msg_type_to_string(msg
->type
));
459 jsonrpc_session_force_reconnect(s
->js
);
460 jsonrpc_msg_destroy(msg
);
463 s
->reply_backlog
= jsonrpc_session_get_backlog(s
->js
);
464 } else if (backlog
> s
->reply_backlog
+ s
->backlog_threshold
) {
465 /* We have a lot of data queued to send to the client. The data is
466 * likely to be mostly monitor updates. It is unlikely that the
467 * monitor updates are due to transactions by 's', because we will not
468 * let 's' make any more transactions until it drains its backlog to 0
469 * (see previous 'if' case). So the monitor updates are probably due
470 * to transactions made by database clients other than 's'. We can't
471 * fix that by preventing 's' from executing more transactions. We
472 * could fix it by preventing every client from executing transactions,
473 * but then one slow or hung client could prevent other clients from
476 * Our solution is to cap the maximum backlog to O(1) in the amount of
477 * data in the database. If the backlog exceeds that amount, then we
478 * disconnect the client. When it reconnects, it can fetch the entire
479 * contents of the database using less data than was previously
481 size_t monitor_length
;
483 monitor_length
= ovsdb_jsonrpc_monitor_json_length_all(s
);
484 if (backlog
> s
->reply_backlog
+ monitor_length
* 2) {
485 VLOG_INFO("%s: %zu bytes backlogged but a complete replica "
486 "would only take %zu bytes, disconnecting",
487 jsonrpc_session_get_name(s
->js
),
488 backlog
- s
->reply_backlog
, monitor_length
);
489 jsonrpc_session_force_reconnect(s
->js
);
491 /* The backlog is not unreasonably big. Only check again after it
492 * becomes much bigger. */
493 s
->backlog_threshold
= 2 * MAX(s
->backlog_threshold
* 2,
497 return jsonrpc_session_is_alive(s
->js
) ? 0 : ETIMEDOUT
;
501 ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session
*session
,
502 const struct ovsdb_jsonrpc_options
*options
)
504 jsonrpc_session_set_max_backoff(session
->js
, options
->max_backoff
);
505 jsonrpc_session_set_probe_interval(session
->js
, options
->probe_interval
);
506 jsonrpc_session_set_dscp(session
->js
, options
->dscp
);
510 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote
*remote
)
512 struct ovsdb_jsonrpc_session
*s
, *next
;
514 LIST_FOR_EACH_SAFE (s
, next
, node
, &remote
->sessions
) {
515 int error
= ovsdb_jsonrpc_session_run(s
);
517 ovsdb_jsonrpc_session_close(s
);
523 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session
*s
)
525 jsonrpc_session_wait(s
->js
);
526 if (!jsonrpc_session_get_backlog(s
->js
)) {
527 jsonrpc_session_recv_wait(s
->js
);
532 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote
*remote
)
534 struct ovsdb_jsonrpc_session
*s
;
536 LIST_FOR_EACH (s
, node
, &remote
->sessions
) {
537 ovsdb_jsonrpc_session_wait(s
);
542 ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session
*s
,
545 simap_increase(usage
, "triggers", hmap_count(&s
->triggers
));
546 simap_increase(usage
, "monitors", hmap_count(&s
->monitors
));
547 simap_increase(usage
, "backlog", jsonrpc_session_get_backlog(s
->js
));
551 ovsdb_jsonrpc_session_get_memory_usage_all(
552 const struct ovsdb_jsonrpc_remote
*remote
,
555 struct ovsdb_jsonrpc_session
*s
;
557 LIST_FOR_EACH (s
, node
, &remote
->sessions
) {
558 ovsdb_jsonrpc_session_get_memory_usage(s
, usage
);
563 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote
*remote
)
565 struct ovsdb_jsonrpc_session
*s
, *next
;
567 LIST_FOR_EACH_SAFE (s
, next
, node
, &remote
->sessions
) {
568 ovsdb_jsonrpc_session_close(s
);
572 /* Forces all of the JSON-RPC sessions managed by 'remote' to disconnect and
575 ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote
*remote
)
577 struct ovsdb_jsonrpc_session
*s
, *next
;
579 LIST_FOR_EACH_SAFE (s
, next
, node
, &remote
->sessions
) {
580 jsonrpc_session_force_reconnect(s
->js
);
581 if (!jsonrpc_session_is_alive(s
->js
)) {
582 ovsdb_jsonrpc_session_close(s
);
587 /* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
590 ovsdb_jsonrpc_session_set_all_options(
591 struct ovsdb_jsonrpc_remote
*remote
,
592 const struct ovsdb_jsonrpc_options
*options
)
594 struct ovsdb_jsonrpc_session
*s
;
596 if (remote
->listener
) {
599 error
= pstream_set_dscp(remote
->listener
, options
->dscp
);
601 VLOG_ERR("%s: set_dscp failed %s",
602 pstream_get_name(remote
->listener
), ovs_strerror(error
));
604 remote
->dscp
= options
->dscp
;
607 * XXX race window between setting dscp to listening socket
608 * and accepting socket. Accepted socket may have old dscp value.
609 * Ignore this race window for now.
612 LIST_FOR_EACH (s
, node
, &remote
->sessions
) {
613 ovsdb_jsonrpc_session_set_options(s
, options
);
618 ovsdb_jsonrpc_session_get_status(const struct ovsdb_jsonrpc_remote
*remote
,
619 struct ovsdb_jsonrpc_remote_status
*status
)
621 const struct ovsdb_jsonrpc_session
*s
;
622 const struct jsonrpc_session
*js
;
623 struct ovsdb_lock_waiter
*waiter
;
624 struct reconnect_stats rstats
;
625 struct ds locks_held
, locks_waiting
, locks_lost
;
627 status
->bound_port
= (remote
->listener
628 ? pstream_get_bound_port(remote
->listener
)
631 if (list_is_empty(&remote
->sessions
)) {
634 s
= CONTAINER_OF(remote
->sessions
.next
, struct ovsdb_jsonrpc_session
, node
);
637 status
->is_connected
= jsonrpc_session_is_connected(js
);
638 status
->last_error
= jsonrpc_session_get_status(js
);
640 jsonrpc_session_get_reconnect_stats(js
, &rstats
);
641 status
->state
= rstats
.state
;
642 status
->sec_since_connect
= rstats
.msec_since_connect
== UINT_MAX
643 ? UINT_MAX
: rstats
.msec_since_connect
/ 1000;
644 status
->sec_since_disconnect
= rstats
.msec_since_disconnect
== UINT_MAX
645 ? UINT_MAX
: rstats
.msec_since_disconnect
/ 1000;
647 ds_init(&locks_held
);
648 ds_init(&locks_waiting
);
649 ds_init(&locks_lost
);
650 HMAP_FOR_EACH (waiter
, session_node
, &s
->up
.waiters
) {
653 string
= (ovsdb_lock_waiter_is_owner(waiter
) ? &locks_held
654 : waiter
->mode
== OVSDB_LOCK_WAIT
? &locks_waiting
656 if (string
->length
) {
657 ds_put_char(string
, ' ');
659 ds_put_cstr(string
, waiter
->lock_name
);
661 status
->locks_held
= ds_steal_cstr(&locks_held
);
662 status
->locks_waiting
= ds_steal_cstr(&locks_waiting
);
663 status
->locks_lost
= ds_steal_cstr(&locks_lost
);
665 status
->n_connections
= list_size(&remote
->sessions
);
670 /* Examines 'request' to determine the database to which it relates, and then
671 * searches 's' to find that database:
673 * - If successful, returns the database and sets '*replyp' to NULL.
675 * - If no such database exists, returns NULL and sets '*replyp' to an
676 * appropriate JSON-RPC error reply, owned by the caller. */
677 static struct ovsdb
*
678 ovsdb_jsonrpc_lookup_db(const struct ovsdb_jsonrpc_session
*s
,
679 const struct jsonrpc_msg
*request
,
680 struct jsonrpc_msg
**replyp
)
682 struct json_array
*params
;
683 struct ovsdb_error
*error
;
687 params
= json_array(request
->params
);
688 if (!params
->n
|| params
->elems
[0]->type
!= JSON_STRING
) {
689 error
= ovsdb_syntax_error(
690 request
->params
, NULL
,
691 "%s request params must begin with <db-name>", request
->method
);
695 db_name
= params
->elems
[0]->u
.string
;
696 db
= shash_find_data(&s
->up
.server
->dbs
, db_name
);
698 error
= ovsdb_syntax_error(
699 request
->params
, "unknown database",
700 "%s request specifies unknown database %s",
701 request
->method
, db_name
);
709 *replyp
= jsonrpc_create_reply(ovsdb_error_to_json(error
), request
->id
);
710 ovsdb_error_destroy(error
);
714 static struct ovsdb_error
*
715 ovsdb_jsonrpc_session_parse_lock_name(const struct jsonrpc_msg
*request
,
716 const char **lock_namep
)
718 const struct json_array
*params
;
720 params
= json_array(request
->params
);
721 if (params
->n
!= 1 || params
->elems
[0]->type
!= JSON_STRING
||
722 !ovsdb_parser_is_id(json_string(params
->elems
[0]))) {
724 return ovsdb_syntax_error(request
->params
, NULL
,
725 "%s request params must be <id>",
729 *lock_namep
= json_string(params
->elems
[0]);
734 ovsdb_jsonrpc_session_notify(struct ovsdb_session
*session
,
735 const char *lock_name
,
738 struct ovsdb_jsonrpc_session
*s
;
741 s
= CONTAINER_OF(session
, struct ovsdb_jsonrpc_session
, up
);
742 params
= json_array_create_1(json_string_create(lock_name
));
743 jsonrpc_session_send(s
->js
, jsonrpc_create_notify(method
, params
));
746 static struct jsonrpc_msg
*
747 ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session
*s
,
748 struct jsonrpc_msg
*request
,
749 enum ovsdb_lock_mode mode
)
751 struct ovsdb_lock_waiter
*waiter
;
752 struct jsonrpc_msg
*reply
;
753 struct ovsdb_error
*error
;
754 struct ovsdb_session
*victim
;
755 const char *lock_name
;
758 error
= ovsdb_jsonrpc_session_parse_lock_name(request
, &lock_name
);
763 /* Report error if this session has issued a "lock" or "steal" without a
764 * matching "unlock" for this lock. */
765 waiter
= ovsdb_session_get_lock_waiter(&s
->up
, lock_name
);
767 error
= ovsdb_syntax_error(
768 request
->params
, NULL
,
769 "must issue \"unlock\" before new \"%s\"", request
->method
);
773 /* Get the lock, add us as a waiter. */
774 waiter
= ovsdb_server_lock(&s
->remote
->server
->up
, &s
->up
, lock_name
, mode
,
777 ovsdb_jsonrpc_session_notify(victim
, lock_name
, "stolen");
780 result
= json_object_create();
781 json_object_put(result
, "locked",
782 json_boolean_create(ovsdb_lock_waiter_is_owner(waiter
)));
784 return jsonrpc_create_reply(result
, request
->id
);
787 reply
= jsonrpc_create_reply(ovsdb_error_to_json(error
), request
->id
);
788 ovsdb_error_destroy(error
);
793 ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session
*s
)
795 struct ovsdb_lock_waiter
*waiter
, *next
;
797 HMAP_FOR_EACH_SAFE (waiter
, next
, session_node
, &s
->up
.waiters
) {
798 ovsdb_jsonrpc_session_unlock__(waiter
);
803 ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter
*waiter
)
805 struct ovsdb_lock
*lock
= waiter
->lock
;
808 struct ovsdb_session
*new_owner
= ovsdb_lock_waiter_remove(waiter
);
810 ovsdb_jsonrpc_session_notify(new_owner
, lock
->name
, "locked");
812 /* ovsdb_server_lock() might have freed 'lock'. */
816 ovsdb_lock_waiter_destroy(waiter
);
819 static struct jsonrpc_msg
*
820 ovsdb_jsonrpc_session_unlock(struct ovsdb_jsonrpc_session
*s
,
821 struct jsonrpc_msg
*request
)
823 struct ovsdb_lock_waiter
*waiter
;
824 struct jsonrpc_msg
*reply
;
825 struct ovsdb_error
*error
;
826 const char *lock_name
;
828 error
= ovsdb_jsonrpc_session_parse_lock_name(request
, &lock_name
);
833 /* Report error if this session has not issued a "lock" or "steal" for this
835 waiter
= ovsdb_session_get_lock_waiter(&s
->up
, lock_name
);
837 error
= ovsdb_syntax_error(
838 request
->params
, NULL
, "\"unlock\" without \"lock\" or \"steal\"");
842 ovsdb_jsonrpc_session_unlock__(waiter
);
844 return jsonrpc_create_reply(json_object_create(), request
->id
);
847 reply
= jsonrpc_create_reply(ovsdb_error_to_json(error
), request
->id
);
848 ovsdb_error_destroy(error
);
852 static struct jsonrpc_msg
*
853 execute_transaction(struct ovsdb_jsonrpc_session
*s
, struct ovsdb
*db
,
854 struct jsonrpc_msg
*request
)
856 ovsdb_jsonrpc_trigger_create(s
, db
, request
->id
, request
->params
);
858 request
->params
= NULL
;
859 jsonrpc_msg_destroy(request
);
864 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session
*s
,
865 struct jsonrpc_msg
*request
)
867 struct jsonrpc_msg
*reply
;
869 if (!strcmp(request
->method
, "transact")) {
870 struct ovsdb
*db
= ovsdb_jsonrpc_lookup_db(s
, request
, &reply
);
872 reply
= execute_transaction(s
, db
, request
);
874 } else if (!strcmp(request
->method
, "monitor")) {
875 struct ovsdb
*db
= ovsdb_jsonrpc_lookup_db(s
, request
, &reply
);
877 reply
= jsonrpc_create_reply(
878 ovsdb_jsonrpc_monitor_create(s
, db
, request
->params
),
881 } else if (!strcmp(request
->method
, "monitor_cancel")) {
882 reply
= ovsdb_jsonrpc_monitor_cancel(s
, json_array(request
->params
),
884 } else if (!strcmp(request
->method
, "get_schema")) {
885 struct ovsdb
*db
= ovsdb_jsonrpc_lookup_db(s
, request
, &reply
);
887 reply
= jsonrpc_create_reply(ovsdb_schema_to_json(db
->schema
),
890 } else if (!strcmp(request
->method
, "list_dbs")) {
891 size_t n_dbs
= shash_count(&s
->up
.server
->dbs
);
892 struct shash_node
*node
;
896 dbs
= xmalloc(n_dbs
* sizeof *dbs
);
898 SHASH_FOR_EACH (node
, &s
->up
.server
->dbs
) {
899 dbs
[i
++] = json_string_create(node
->name
);
901 reply
= jsonrpc_create_reply(json_array_create(dbs
, n_dbs
),
903 } else if (!strcmp(request
->method
, "lock")) {
904 reply
= ovsdb_jsonrpc_session_lock(s
, request
, OVSDB_LOCK_WAIT
);
905 } else if (!strcmp(request
->method
, "steal")) {
906 reply
= ovsdb_jsonrpc_session_lock(s
, request
, OVSDB_LOCK_STEAL
);
907 } else if (!strcmp(request
->method
, "unlock")) {
908 reply
= ovsdb_jsonrpc_session_unlock(s
, request
);
909 } else if (!strcmp(request
->method
, "echo")) {
910 reply
= jsonrpc_create_reply(json_clone(request
->params
), request
->id
);
912 reply
= jsonrpc_create_error(json_string_create("unknown method"),
917 jsonrpc_msg_destroy(request
);
918 jsonrpc_session_send(s
->js
, reply
);
923 execute_cancel(struct ovsdb_jsonrpc_session
*s
, struct jsonrpc_msg
*request
)
925 if (json_array(request
->params
)->n
== 1) {
926 struct ovsdb_jsonrpc_trigger
*t
;
929 id
= request
->params
->u
.array
.elems
[0];
930 t
= ovsdb_jsonrpc_trigger_find(s
, id
, json_hash(id
, 0));
932 ovsdb_jsonrpc_trigger_complete(t
);
938 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session
*s
,
939 struct jsonrpc_msg
*request
)
941 if (!strcmp(request
->method
, "cancel")) {
942 execute_cancel(s
, request
);
944 jsonrpc_msg_destroy(request
);
947 /* JSON-RPC database server triggers.
949 * (Every transaction is treated as a trigger even if it doesn't actually have
950 * any "wait" operations.) */
952 struct ovsdb_jsonrpc_trigger
{
953 struct ovsdb_trigger trigger
;
954 struct hmap_node hmap_node
; /* In session's "triggers" hmap. */
959 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session
*s
, struct ovsdb
*db
,
960 struct json
*id
, struct json
*params
)
962 struct ovsdb_jsonrpc_trigger
*t
;
965 /* Check for duplicate ID. */
966 hash
= json_hash(id
, 0);
967 t
= ovsdb_jsonrpc_trigger_find(s
, id
, hash
);
969 struct jsonrpc_msg
*msg
;
971 msg
= jsonrpc_create_error(json_string_create("duplicate request ID"),
973 jsonrpc_session_send(s
->js
, msg
);
975 json_destroy(params
);
979 /* Insert into trigger table. */
980 t
= xmalloc(sizeof *t
);
981 ovsdb_trigger_init(&s
->up
, db
, &t
->trigger
, params
, time_msec());
983 hmap_insert(&s
->triggers
, &t
->hmap_node
, hash
);
985 /* Complete early if possible. */
986 if (ovsdb_trigger_is_complete(&t
->trigger
)) {
987 ovsdb_jsonrpc_trigger_complete(t
);
991 static struct ovsdb_jsonrpc_trigger
*
992 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session
*s
,
993 const struct json
*id
, size_t hash
)
995 struct ovsdb_jsonrpc_trigger
*t
;
997 HMAP_FOR_EACH_WITH_HASH (t
, hmap_node
, hash
, &s
->triggers
) {
998 if (json_equal(t
->id
, id
)) {
1007 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger
*t
)
1009 struct ovsdb_jsonrpc_session
*s
;
1011 s
= CONTAINER_OF(t
->trigger
.session
, struct ovsdb_jsonrpc_session
, up
);
1013 if (jsonrpc_session_is_connected(s
->js
)) {
1014 struct jsonrpc_msg
*reply
;
1015 struct json
*result
;
1017 result
= ovsdb_trigger_steal_result(&t
->trigger
);
1019 reply
= jsonrpc_create_reply(result
, t
->id
);
1021 reply
= jsonrpc_create_error(json_string_create("canceled"),
1024 jsonrpc_session_send(s
->js
, reply
);
1027 json_destroy(t
->id
);
1028 ovsdb_trigger_destroy(&t
->trigger
);
1029 hmap_remove(&s
->triggers
, &t
->hmap_node
);
1034 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session
*s
)
1036 struct ovsdb_jsonrpc_trigger
*t
, *next
;
1037 HMAP_FOR_EACH_SAFE (t
, next
, hmap_node
, &s
->triggers
) {
1038 ovsdb_jsonrpc_trigger_complete(t
);
1043 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session
*s
)
1045 while (!list_is_empty(&s
->up
.completions
)) {
1046 struct ovsdb_jsonrpc_trigger
*t
1047 = CONTAINER_OF(s
->up
.completions
.next
,
1048 struct ovsdb_jsonrpc_trigger
, trigger
.node
);
1049 ovsdb_jsonrpc_trigger_complete(t
);
1053 /* JSON-RPC database table monitors. */
1055 enum ovsdb_jsonrpc_monitor_selection
{
1056 OJMS_INITIAL
= 1 << 0, /* All rows when monitor is created. */
1057 OJMS_INSERT
= 1 << 1, /* New rows. */
1058 OJMS_DELETE
= 1 << 2, /* Deleted rows. */
1059 OJMS_MODIFY
= 1 << 3 /* Modified rows. */
1062 /* A particular column being monitored. */
1063 struct ovsdb_jsonrpc_monitor_column
{
1064 const struct ovsdb_column
*column
;
1065 enum ovsdb_jsonrpc_monitor_selection select
;
1068 /* A particular table being monitored. */
1069 struct ovsdb_jsonrpc_monitor_table
{
1070 const struct ovsdb_table
*table
;
1072 /* This is the union (bitwise-OR) of the 'select' values in all of the
1073 * members of 'columns' below. */
1074 enum ovsdb_jsonrpc_monitor_selection select
;
1076 /* Columns being monitored. */
1077 struct ovsdb_jsonrpc_monitor_column
*columns
;
1081 /* A collection of tables being monitored. */
1082 struct ovsdb_jsonrpc_monitor
{
1083 struct ovsdb_replica replica
;
1084 struct ovsdb_jsonrpc_session
*session
;
1086 struct hmap_node node
; /* In ovsdb_jsonrpc_session's "monitors". */
1088 struct json
*monitor_id
;
1089 struct shash tables
; /* Holds "struct ovsdb_jsonrpc_monitor_table"s. */
1092 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class
;
1094 struct ovsdb_jsonrpc_monitor
*ovsdb_jsonrpc_monitor_find(
1095 struct ovsdb_jsonrpc_session
*, const struct json
*monitor_id
);
1096 static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica
*);
1097 static struct json
*ovsdb_jsonrpc_monitor_get_initial(
1098 const struct ovsdb_jsonrpc_monitor
*);
1099 static size_t ovsdb_jsonrpc_monitor_json_length(
1100 const struct ovsdb_jsonrpc_monitor
*);
1103 parse_bool(struct ovsdb_parser
*parser
, const char *name
, bool default_value
)
1105 const struct json
*json
;
1107 json
= ovsdb_parser_member(parser
, name
, OP_BOOLEAN
| OP_OPTIONAL
);
1108 return json
? json_boolean(json
) : default_value
;
1111 struct ovsdb_jsonrpc_monitor
*
1112 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session
*s
,
1113 const struct json
*monitor_id
)
1115 struct ovsdb_jsonrpc_monitor
*m
;
1117 HMAP_FOR_EACH_WITH_HASH (m
, node
, json_hash(monitor_id
, 0), &s
->monitors
) {
1118 if (json_equal(m
->monitor_id
, monitor_id
)) {
1127 ovsdb_jsonrpc_add_monitor_column(struct ovsdb_jsonrpc_monitor_table
*mt
,
1128 const struct ovsdb_column
*column
,
1129 enum ovsdb_jsonrpc_monitor_selection select
,
1130 size_t *allocated_columns
)
1132 struct ovsdb_jsonrpc_monitor_column
*c
;
1134 if (mt
->n_columns
>= *allocated_columns
) {
1135 mt
->columns
= x2nrealloc(mt
->columns
, allocated_columns
,
1136 sizeof *mt
->columns
);
1139 c
= &mt
->columns
[mt
->n_columns
++];
1145 compare_ovsdb_jsonrpc_monitor_column(const void *a_
, const void *b_
)
1147 const struct ovsdb_jsonrpc_monitor_column
*a
= a_
;
1148 const struct ovsdb_jsonrpc_monitor_column
*b
= b_
;
1150 return a
->column
< b
->column
? -1 : a
->column
> b
->column
;
1153 static struct ovsdb_error
* WARN_UNUSED_RESULT
1154 ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_jsonrpc_monitor_table
*mt
,
1155 const struct json
*monitor_request
,
1156 size_t *allocated_columns
)
1158 const struct ovsdb_table_schema
*ts
= mt
->table
->schema
;
1159 enum ovsdb_jsonrpc_monitor_selection select
;
1160 const struct json
*columns
, *select_json
;
1161 struct ovsdb_parser parser
;
1162 struct ovsdb_error
*error
;
1164 ovsdb_parser_init(&parser
, monitor_request
, "table %s", ts
->name
);
1165 columns
= ovsdb_parser_member(&parser
, "columns", OP_ARRAY
| OP_OPTIONAL
);
1166 select_json
= ovsdb_parser_member(&parser
, "select",
1167 OP_OBJECT
| OP_OPTIONAL
);
1168 error
= ovsdb_parser_finish(&parser
);
1175 ovsdb_parser_init(&parser
, select_json
, "table %s select", ts
->name
);
1176 if (parse_bool(&parser
, "initial", true)) {
1177 select
|= OJMS_INITIAL
;
1179 if (parse_bool(&parser
, "insert", true)) {
1180 select
|= OJMS_INSERT
;
1182 if (parse_bool(&parser
, "delete", true)) {
1183 select
|= OJMS_DELETE
;
1185 if (parse_bool(&parser
, "modify", true)) {
1186 select
|= OJMS_MODIFY
;
1188 error
= ovsdb_parser_finish(&parser
);
1193 select
= OJMS_INITIAL
| OJMS_INSERT
| OJMS_DELETE
| OJMS_MODIFY
;
1195 mt
->select
|= select
;
1200 if (columns
->type
!= JSON_ARRAY
) {
1201 return ovsdb_syntax_error(columns
, NULL
,
1202 "array of column names expected");
1205 for (i
= 0; i
< columns
->u
.array
.n
; i
++) {
1206 const struct ovsdb_column
*column
;
1209 if (columns
->u
.array
.elems
[i
]->type
!= JSON_STRING
) {
1210 return ovsdb_syntax_error(columns
, NULL
,
1211 "array of column names expected");
1214 s
= columns
->u
.array
.elems
[i
]->u
.string
;
1215 column
= shash_find_data(&mt
->table
->schema
->columns
, s
);
1217 return ovsdb_syntax_error(columns
, NULL
, "%s is not a valid "
1220 ovsdb_jsonrpc_add_monitor_column(mt
, column
, select
,
1224 struct shash_node
*node
;
1226 SHASH_FOR_EACH (node
, &ts
->columns
) {
1227 const struct ovsdb_column
*column
= node
->data
;
1228 if (column
->index
!= OVSDB_COL_UUID
) {
1229 ovsdb_jsonrpc_add_monitor_column(mt
, column
, select
,
1238 static struct json
*
1239 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session
*s
, struct ovsdb
*db
,
1240 struct json
*params
)
1242 struct ovsdb_jsonrpc_monitor
*m
= NULL
;
1243 struct json
*monitor_id
, *monitor_requests
;
1244 struct ovsdb_error
*error
= NULL
;
1245 struct shash_node
*node
;
1248 if (json_array(params
)->n
!= 3) {
1249 error
= ovsdb_syntax_error(params
, NULL
, "invalid parameters");
1252 monitor_id
= params
->u
.array
.elems
[1];
1253 monitor_requests
= params
->u
.array
.elems
[2];
1254 if (monitor_requests
->type
!= JSON_OBJECT
) {
1255 error
= ovsdb_syntax_error(monitor_requests
, NULL
,
1256 "monitor-requests must be object");
1260 if (ovsdb_jsonrpc_monitor_find(s
, monitor_id
)) {
1261 error
= ovsdb_syntax_error(monitor_id
, NULL
, "duplicate monitor ID");
1265 m
= xzalloc(sizeof *m
);
1266 ovsdb_replica_init(&m
->replica
, &ovsdb_jsonrpc_replica_class
);
1267 ovsdb_add_replica(db
, &m
->replica
);
1270 hmap_insert(&s
->monitors
, &m
->node
, json_hash(monitor_id
, 0));
1271 m
->monitor_id
= json_clone(monitor_id
);
1272 shash_init(&m
->tables
);
1274 SHASH_FOR_EACH (node
, json_object(monitor_requests
)) {
1275 const struct ovsdb_table
*table
;
1276 struct ovsdb_jsonrpc_monitor_table
*mt
;
1277 size_t allocated_columns
;
1278 const struct json
*mr_value
;
1281 table
= ovsdb_get_table(m
->db
, node
->name
);
1283 error
= ovsdb_syntax_error(NULL
, NULL
,
1284 "no table named %s", node
->name
);
1288 mt
= xzalloc(sizeof *mt
);
1290 shash_add(&m
->tables
, table
->schema
->name
, mt
);
1292 /* Parse columns. */
1293 mr_value
= node
->data
;
1294 allocated_columns
= 0;
1295 if (mr_value
->type
== JSON_ARRAY
) {
1296 const struct json_array
*array
= &mr_value
->u
.array
;
1298 for (i
= 0; i
< array
->n
; i
++) {
1299 error
= ovsdb_jsonrpc_parse_monitor_request(
1300 mt
, array
->elems
[i
], &allocated_columns
);
1306 error
= ovsdb_jsonrpc_parse_monitor_request(
1307 mt
, mr_value
, &allocated_columns
);
1313 /* Check for duplicate columns. */
1314 qsort(mt
->columns
, mt
->n_columns
, sizeof *mt
->columns
,
1315 compare_ovsdb_jsonrpc_monitor_column
);
1316 for (i
= 1; i
< mt
->n_columns
; i
++) {
1317 if (mt
->columns
[i
].column
== mt
->columns
[i
- 1].column
) {
1318 error
= ovsdb_syntax_error(mr_value
, NULL
, "column %s "
1319 "mentioned more than once",
1320 mt
->columns
[i
].column
->name
);
1326 return ovsdb_jsonrpc_monitor_get_initial(m
);
1330 ovsdb_remove_replica(m
->db
, &m
->replica
);
1333 json
= ovsdb_error_to_json(error
);
1334 ovsdb_error_destroy(error
);
1338 static struct jsonrpc_msg
*
1339 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session
*s
,
1340 struct json_array
*params
,
1341 const struct json
*request_id
)
1343 if (params
->n
!= 1) {
1344 return jsonrpc_create_error(json_string_create("invalid parameters"),
1347 struct ovsdb_jsonrpc_monitor
*m
;
1349 m
= ovsdb_jsonrpc_monitor_find(s
, params
->elems
[0]);
1351 return jsonrpc_create_error(json_string_create("unknown monitor"),
1354 ovsdb_remove_replica(m
->db
, &m
->replica
);
1355 return jsonrpc_create_reply(json_object_create(), request_id
);
1361 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session
*s
)
1363 struct ovsdb_jsonrpc_monitor
*m
, *next
;
1365 HMAP_FOR_EACH_SAFE (m
, next
, node
, &s
->monitors
) {
1366 ovsdb_remove_replica(m
->db
, &m
->replica
);
1370 /* Returns an overestimate of the number of bytes of JSON data required to
1371 * report the current contents of the database over all the monitors currently
1372 * configured in 's'. */
1374 ovsdb_jsonrpc_monitor_json_length_all(struct ovsdb_jsonrpc_session
*s
)
1376 struct ovsdb_jsonrpc_monitor
*m
;
1380 HMAP_FOR_EACH (m
, node
, &s
->monitors
) {
1381 length
+= ovsdb_jsonrpc_monitor_json_length(m
);
1386 static struct ovsdb_jsonrpc_monitor
*
1387 ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica
*replica
)
1389 ovs_assert(replica
->class == &ovsdb_jsonrpc_replica_class
);
1390 return CONTAINER_OF(replica
, struct ovsdb_jsonrpc_monitor
, replica
);
1393 struct ovsdb_jsonrpc_monitor_aux
{
1394 bool initial
; /* Sending initial contents of table? */
1395 const struct ovsdb_jsonrpc_monitor
*monitor
;
1396 struct json
*json
; /* JSON for the whole transaction. */
1398 /* Current table. */
1399 struct ovsdb_jsonrpc_monitor_table
*mt
;
1400 struct json
*table_json
; /* JSON for table's transaction. */
1404 any_reportable_change(const struct ovsdb_jsonrpc_monitor_table
*mt
,
1405 const unsigned long int *changed
)
1409 for (i
= 0; i
< mt
->n_columns
; i
++) {
1410 const struct ovsdb_jsonrpc_monitor_column
*c
= &mt
->columns
[i
];
1411 unsigned int idx
= c
->column
->index
;
1413 if (c
->select
& OJMS_MODIFY
&& bitmap_is_set(changed
, idx
)) {
1422 ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row
*old
,
1423 const struct ovsdb_row
*new,
1424 const unsigned long int *changed
,
1427 struct ovsdb_jsonrpc_monitor_aux
*aux
= aux_
;
1428 const struct ovsdb_jsonrpc_monitor
*m
= aux
->monitor
;
1429 struct ovsdb_table
*table
= new ? new->table
: old
->table
;
1430 enum ovsdb_jsonrpc_monitor_selection type
;
1431 struct json
*old_json
, *new_json
;
1432 struct json
*row_json
;
1433 char uuid
[UUID_LEN
+ 1];
1436 if (!aux
->mt
|| table
!= aux
->mt
->table
) {
1437 aux
->mt
= shash_find_data(&m
->tables
, table
->schema
->name
);
1438 aux
->table_json
= NULL
;
1440 /* We don't care about rows in this table at all. Tell the caller
1446 type
= (aux
->initial
? OJMS_INITIAL
1447 : !old
? OJMS_INSERT
1448 : !new ? OJMS_DELETE
1450 if (!(aux
->mt
->select
& type
)) {
1451 /* We don't care about this type of change (but do want to be called
1452 * back for changes to other rows in the same table). */
1456 if (type
== OJMS_MODIFY
&& !any_reportable_change(aux
->mt
, changed
)) {
1457 /* Nothing of interest changed. */
1461 old_json
= new_json
= NULL
;
1462 if (type
& (OJMS_DELETE
| OJMS_MODIFY
)) {
1463 old_json
= json_object_create();
1465 if (type
& (OJMS_INITIAL
| OJMS_INSERT
| OJMS_MODIFY
)) {
1466 new_json
= json_object_create();
1468 for (i
= 0; i
< aux
->mt
->n_columns
; i
++) {
1469 const struct ovsdb_jsonrpc_monitor_column
*c
= &aux
->mt
->columns
[i
];
1470 const struct ovsdb_column
*column
= c
->column
;
1471 unsigned int idx
= c
->column
->index
;
1473 if (!(type
& c
->select
)) {
1474 /* We don't care about this type of change for this particular
1475 * column (but we will care about it for some other column). */
1479 if ((type
== OJMS_MODIFY
&& bitmap_is_set(changed
, idx
))
1480 || type
== OJMS_DELETE
) {
1481 json_object_put(old_json
, column
->name
,
1482 ovsdb_datum_to_json(&old
->fields
[idx
],
1485 if (type
& (OJMS_INITIAL
| OJMS_INSERT
| OJMS_MODIFY
)) {
1486 json_object_put(new_json
, column
->name
,
1487 ovsdb_datum_to_json(&new->fields
[idx
],
1492 /* Create JSON object for transaction overall. */
1494 aux
->json
= json_object_create();
1497 /* Create JSON object for transaction on this table. */
1498 if (!aux
->table_json
) {
1499 aux
->table_json
= json_object_create();
1500 json_object_put(aux
->json
, aux
->mt
->table
->schema
->name
,
1504 /* Create JSON object for transaction on this row. */
1505 row_json
= json_object_create();
1507 json_object_put(row_json
, "old", old_json
);
1510 json_object_put(row_json
, "new", new_json
);
1513 /* Add JSON row to JSON table. */
1514 snprintf(uuid
, sizeof uuid
,
1515 UUID_FMT
, UUID_ARGS(ovsdb_row_get_uuid(new ? new : old
)));
1516 json_object_put(aux
->table_json
, uuid
, row_json
);
1521 /* Returns an overestimate of the number of bytes of JSON data required to
1522 * report the current contents of the database over monitor 'm'. */
1524 ovsdb_jsonrpc_monitor_json_length(const struct ovsdb_jsonrpc_monitor
*m
)
1526 const struct shash_node
*node
;
1529 /* Top-level overhead of monitor JSON. */
1532 SHASH_FOR_EACH (node
, &m
->tables
) {
1533 const struct ovsdb_jsonrpc_monitor_table
*mt
= node
->data
;
1534 const struct ovsdb_table
*table
= mt
->table
;
1535 const struct ovsdb_row
*row
;
1538 /* Per-table JSON overhead: "<table>":{...}. */
1539 length
+= strlen(table
->schema
->name
) + 32;
1541 /* Per-row JSON overhead: ,"<uuid>":{"old":{...},"new":{...}} */
1542 length
+= hmap_count(&table
->rows
) * (UUID_LEN
+ 32);
1544 /* Per-row, per-column JSON overhead: ,"<column>": */
1545 for (i
= 0; i
< mt
->n_columns
; i
++) {
1546 const struct ovsdb_jsonrpc_monitor_column
*c
= &mt
->columns
[i
];
1547 const struct ovsdb_column
*column
= c
->column
;
1549 length
+= hmap_count(&table
->rows
) * (8 + strlen(column
->name
));
1553 HMAP_FOR_EACH (row
, hmap_node
, &table
->rows
) {
1554 for (i
= 0; i
< mt
->n_columns
; i
++) {
1555 const struct ovsdb_jsonrpc_monitor_column
*c
= &mt
->columns
[i
];
1556 const struct ovsdb_column
*column
= c
->column
;
1558 length
+= ovsdb_datum_json_length(&row
->fields
[column
->index
],
1568 ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux
*aux
,
1569 const struct ovsdb_jsonrpc_monitor
*m
,
1572 aux
->initial
= initial
;
1576 aux
->table_json
= NULL
;
1579 static struct ovsdb_error
*
1580 ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica
*replica
,
1581 const struct ovsdb_txn
*txn
,
1582 bool durable OVS_UNUSED
)
1584 struct ovsdb_jsonrpc_monitor
*m
= ovsdb_jsonrpc_monitor_cast(replica
);
1585 struct ovsdb_jsonrpc_monitor_aux aux
;
1587 ovsdb_jsonrpc_monitor_init_aux(&aux
, m
, false);
1588 ovsdb_txn_for_each_change(txn
, ovsdb_jsonrpc_monitor_change_cb
, &aux
);
1590 struct jsonrpc_msg
*msg
;
1591 struct json
*params
;
1593 params
= json_array_create_2(json_clone(aux
.monitor
->monitor_id
),
1595 msg
= jsonrpc_create_notify("update", params
);
1596 jsonrpc_session_send(aux
.monitor
->session
->js
, msg
);
1602 static struct json
*
1603 ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor
*m
)
1605 struct ovsdb_jsonrpc_monitor_aux aux
;
1606 struct shash_node
*node
;
1608 ovsdb_jsonrpc_monitor_init_aux(&aux
, m
, true);
1609 SHASH_FOR_EACH (node
, &m
->tables
) {
1610 struct ovsdb_jsonrpc_monitor_table
*mt
= node
->data
;
1612 if (mt
->select
& OJMS_INITIAL
) {
1613 struct ovsdb_row
*row
;
1615 HMAP_FOR_EACH (row
, hmap_node
, &mt
->table
->rows
) {
1616 ovsdb_jsonrpc_monitor_change_cb(NULL
, row
, NULL
, &aux
);
1620 return aux
.json
? aux
.json
: json_object_create();
1624 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica
*replica
)
1626 struct ovsdb_jsonrpc_monitor
*m
= ovsdb_jsonrpc_monitor_cast(replica
);
1627 struct shash_node
*node
;
1629 json_destroy(m
->monitor_id
);
1630 SHASH_FOR_EACH (node
, &m
->tables
) {
1631 struct ovsdb_jsonrpc_monitor_table
*mt
= node
->data
;
1635 shash_destroy(&m
->tables
);
1636 hmap_remove(&m
->session
->monitors
, &m
->node
);
1640 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class
= {
1641 ovsdb_jsonrpc_monitor_commit
,
1642 ovsdb_jsonrpc_monitor_destroy