1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
18 #include "jsonrpc-server.h"
24 #include "openvswitch/dynamic-string.h"
26 #include "openvswitch/json.h"
28 #include "ovsdb-error.h"
29 #include "ovsdb-parser.h"
31 #include "condition.h"
32 #include "poll-loop.h"
33 #include "reconnect.h"
40 #include "transaction.h"
43 #include "openvswitch/vlog.h"
45 VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server
);
47 struct ovsdb_jsonrpc_remote
;
48 struct ovsdb_jsonrpc_session
;
50 /* Set to false to disable monitor_cond, causing jsonrpc to respond to the
51 * monitor_cond method with an error. */
52 static bool monitor_cond_enable__
= true;
54 /* Message rate-limiting. */
55 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 5);
58 static struct ovsdb_jsonrpc_session
*ovsdb_jsonrpc_session_create(
59 struct ovsdb_jsonrpc_remote
*, struct jsonrpc_session
*, bool);
60 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote
*);
61 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote
*);
62 static void ovsdb_jsonrpc_session_get_memory_usage_all(
63 const struct ovsdb_jsonrpc_remote
*, struct simap
*usage
);
64 static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote
*);
65 static void ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote
*);
66 static void ovsdb_jsonrpc_session_set_all_options(
67 struct ovsdb_jsonrpc_remote
*, const struct ovsdb_jsonrpc_options
*);
68 static bool ovsdb_jsonrpc_active_session_get_status(
69 const struct ovsdb_jsonrpc_remote
*,
70 struct ovsdb_jsonrpc_remote_status
*);
71 static void ovsdb_jsonrpc_session_get_status(
72 const struct ovsdb_jsonrpc_session
*,
73 struct ovsdb_jsonrpc_remote_status
*);
74 static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session
*);
75 static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter
*);
76 static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session
*,
77 struct jsonrpc_msg
*);
80 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session
*,
82 struct json
*id
, struct json
*params
);
83 static struct ovsdb_jsonrpc_trigger
*ovsdb_jsonrpc_trigger_find(
84 struct ovsdb_jsonrpc_session
*, const struct json
*id
, size_t hash
);
85 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger
*);
86 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session
*);
87 static void ovsdb_jsonrpc_trigger_complete_done(
88 struct ovsdb_jsonrpc_session
*);
91 static struct jsonrpc_msg
*ovsdb_jsonrpc_monitor_create(
92 struct ovsdb_jsonrpc_session
*, struct ovsdb
*, struct json
*params
,
93 enum ovsdb_monitor_version
, const struct json
*request_id
);
94 static struct jsonrpc_msg
*ovsdb_jsonrpc_monitor_cond_change(
95 struct ovsdb_jsonrpc_session
*s
,
97 const struct json
*request_id
);
98 static struct jsonrpc_msg
*ovsdb_jsonrpc_monitor_cancel(
99 struct ovsdb_jsonrpc_session
*,
100 struct json_array
*params
,
101 const struct json
*request_id
);
102 static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session
*);
103 static void ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session
*);
104 static bool ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session
*);
105 static struct json
*ovsdb_jsonrpc_monitor_compose_update(
106 struct ovsdb_jsonrpc_monitor
*monitor
, bool initial
);
107 static struct jsonrpc_msg
* ovsdb_jsonrpc_create_notify(
108 const struct ovsdb_jsonrpc_monitor
*m
,
109 struct json
*params
);
112 /* JSON-RPC database server. */
114 struct ovsdb_jsonrpc_server
{
115 struct ovsdb_server up
;
116 unsigned int n_sessions
;
117 bool read_only
; /* This server does not accept any
118 transactions that can modify the database. */
119 struct shash remotes
; /* Contains "struct ovsdb_jsonrpc_remote *"s. */
122 /* A configured remote. This is either a passive stream listener plus a list
123 * of the currently connected sessions, or a list of exactly one active
125 struct ovsdb_jsonrpc_remote
{
126 struct ovsdb_jsonrpc_server
*server
;
127 struct pstream
*listener
; /* Listener, if passive. */
128 struct ovs_list sessions
; /* List of "struct ovsdb_jsonrpc_session"s. */
132 static struct ovsdb_jsonrpc_remote
*ovsdb_jsonrpc_server_add_remote(
133 struct ovsdb_jsonrpc_server
*, const char *name
,
134 const struct ovsdb_jsonrpc_options
*options
136 static void ovsdb_jsonrpc_server_del_remote(struct shash_node
*);
138 /* Creates and returns a new server to provide JSON-RPC access to an OVSDB.
140 * The caller must call ovsdb_jsonrpc_server_add_db() for each database to
141 * which 'server' should provide access. */
142 struct ovsdb_jsonrpc_server
*
143 ovsdb_jsonrpc_server_create(bool read_only
)
145 struct ovsdb_jsonrpc_server
*server
= xzalloc(sizeof *server
);
146 ovsdb_server_init(&server
->up
);
147 shash_init(&server
->remotes
);
148 server
->read_only
= read_only
;
152 /* Adds 'db' to the set of databases served out by 'svr'. Returns true if
153 * successful, false if 'db''s name is the same as some database already in
156 ovsdb_jsonrpc_server_add_db(struct ovsdb_jsonrpc_server
*svr
, struct ovsdb
*db
)
158 /* The OVSDB protocol doesn't have a way to notify a client that a
159 * database has been added. If some client tried to use the database
160 * that we're adding and failed, then forcing it to reconnect seems like
161 * a reasonable way to make it try again.
163 * If this is too big of a hammer in practice, we could be more selective,
164 * e.g. disconnect only connections that actually tried to use a database
165 * with 'db''s name. */
166 ovsdb_jsonrpc_server_reconnect(svr
, svr
->read_only
);
168 return ovsdb_server_add_db(&svr
->up
, db
);
171 /* Removes 'db' from the set of databases served out by 'svr'. Returns
172 * true if successful, false if there is no database associated with 'db'. */
174 ovsdb_jsonrpc_server_remove_db(struct ovsdb_jsonrpc_server
*svr
,
177 /* There might be pointers to 'db' from 'svr', such as monitors or
178 * outstanding transactions. Disconnect all JSON-RPC connections to avoid
179 * accesses to freed memory.
181 * If this is too big of a hammer in practice, we could be more selective,
182 * e.g. disconnect only connections that actually reference 'db'. */
183 ovsdb_jsonrpc_server_reconnect(svr
, svr
->read_only
);
185 return ovsdb_server_remove_db(&svr
->up
, db
);
/* Tears down 'svr': deletes every remote in 'svr->remotes', then destroys the
 * 'remotes' table and the embedded ovsdb_server 'svr->up'. */
189 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server
*svr
)
191 struct shash_node
*node
, *next
;
/* The _SAFE iterator is needed because ovsdb_jsonrpc_server_del_remote()
 * deletes 'node' from the table being iterated. */
193 SHASH_FOR_EACH_SAFE (node
, next
, &svr
->remotes
) {
194 ovsdb_jsonrpc_server_del_remote(node
);
196 shash_destroy(&svr
->remotes
);
197 ovsdb_server_destroy(&svr
->up
);
201 struct ovsdb_jsonrpc_options
*
202 ovsdb_jsonrpc_default_options(const char *target
)
204 struct ovsdb_jsonrpc_options
*options
= xzalloc(sizeof *options
);
205 options
->max_backoff
= RECONNECT_DEFAULT_MAX_BACKOFF
;
206 options
->probe_interval
= (stream_or_pstream_needs_probes(target
)
207 ? RECONNECT_DEFAULT_PROBE_INTERVAL
212 /* Sets 'svr''s current set of remotes to the names in 'new_remotes', with
213 * options in the struct ovsdb_jsonrpc_options supplied as the data values.
215 * A remote is an active or passive stream connection method, e.g. "pssl:" or
218 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server
*svr
,
219 const struct shash
*new_remotes
)
221 struct shash_node
*node
, *next
;
223 SHASH_FOR_EACH_SAFE (node
, next
, &svr
->remotes
) {
224 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
225 struct ovsdb_jsonrpc_options
*options
226 = shash_find_data(new_remotes
, node
->name
);
229 VLOG_INFO("%s: remote deconfigured", node
->name
);
230 ovsdb_jsonrpc_server_del_remote(node
);
231 } else if (options
->dscp
!= remote
->dscp
) {
232 ovsdb_jsonrpc_server_del_remote(node
);
235 SHASH_FOR_EACH (node
, new_remotes
) {
236 const struct ovsdb_jsonrpc_options
*options
= node
->data
;
237 struct ovsdb_jsonrpc_remote
*remote
;
239 remote
= shash_find_data(&svr
->remotes
, node
->name
);
241 remote
= ovsdb_jsonrpc_server_add_remote(svr
, node
->name
, options
);
247 ovsdb_jsonrpc_session_set_all_options(remote
, options
);
251 static struct ovsdb_jsonrpc_remote
*
252 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server
*svr
,
254 const struct ovsdb_jsonrpc_options
*options
)
256 struct ovsdb_jsonrpc_remote
*remote
;
257 struct pstream
*listener
;
260 error
= jsonrpc_pstream_open(name
, &listener
, options
->dscp
);
261 if (error
&& error
!= EAFNOSUPPORT
) {
262 VLOG_ERR_RL(&rl
, "%s: listen failed: %s", name
, ovs_strerror(error
));
266 remote
= xmalloc(sizeof *remote
);
267 remote
->server
= svr
;
268 remote
->listener
= listener
;
269 ovs_list_init(&remote
->sessions
);
270 remote
->dscp
= options
->dscp
;
271 shash_add(&svr
->remotes
, name
, remote
);
274 ovsdb_jsonrpc_session_create(remote
, jsonrpc_session_open(name
, true),
/* Removes the remote stored as the data of 'node': closes all of the remote's
 * sessions, closes its listener, and deletes 'node' from the owning server's
 * 'remotes' table. */
281 ovsdb_jsonrpc_server_del_remote(struct shash_node
*node
)
283 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
285 ovsdb_jsonrpc_session_close_all(remote
);
/* NOTE(review): 'listener' is only set "if passive" per the struct comment;
 * presumably pstream_close() tolerates a null listener -- confirm. */
286 pstream_close(remote
->listener
);
287 shash_delete(&remote
->server
->remotes
, node
);
291 /* Stores status information for the remote named 'target', which should have
292 * been configured on 'svr' with a call to ovsdb_jsonrpc_server_set_remotes(),
293 * into '*status'. On success returns true, on failure (if 'svr' doesn't have
294 * a remote named 'target' or if that remote is an outbound remote that has no
295 * active connections) returns false. On failure, 'status' will be zeroed.
298 ovsdb_jsonrpc_server_get_remote_status(
299 const struct ovsdb_jsonrpc_server
*svr
, const char *target
,
300 struct ovsdb_jsonrpc_remote_status
*status
)
302 const struct ovsdb_jsonrpc_remote
*remote
;
304 memset(status
, 0, sizeof *status
);
306 remote
= shash_find_data(&svr
->remotes
, target
);
312 if (remote
->listener
) {
313 status
->bound_port
= pstream_get_bound_port(remote
->listener
);
314 status
->is_connected
= !ovs_list_is_empty(&remote
->sessions
);
315 status
->n_connections
= ovs_list_size(&remote
->sessions
);
319 return ovsdb_jsonrpc_active_session_get_status(remote
, status
);
/* Frees the three lock-name strings owned by 'status' ('locks_held',
 * 'locks_waiting' and 'locks_lost').  'status' itself is not freed here. */
323 ovsdb_jsonrpc_server_free_remote_status(
324 struct ovsdb_jsonrpc_remote_status
*status
)
326 free(status
->locks_held
);
327 free(status
->locks_waiting
);
328 free(status
->locks_lost
);
331 /* Forces all of the JSON-RPC sessions managed by 'svr' to disconnect and
334 ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server
*svr
, bool read_only
)
336 struct shash_node
*node
;
/* Record the new read-only setting on the server before touching any
 * session. */
338 svr
->read_only
= read_only
;
/* Ask every configured remote to force-reconnect all of its sessions. */
339 SHASH_FOR_EACH (node
, &svr
->remotes
) {
340 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
342 ovsdb_jsonrpc_session_reconnect_all(remote
);
347 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server
*svr
)
349 struct shash_node
*node
;
351 SHASH_FOR_EACH (node
, &svr
->remotes
) {
352 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
354 if (remote
->listener
) {
355 struct stream
*stream
;
358 error
= pstream_accept(remote
->listener
, &stream
);
360 struct jsonrpc_session
*js
;
361 js
= jsonrpc_session_open_unreliably(jsonrpc_open(stream
),
363 ovsdb_jsonrpc_session_create(remote
, js
, svr
->read_only
);
364 } else if (error
!= EAGAIN
) {
365 VLOG_WARN_RL(&rl
, "%s: accept failed: %s",
366 pstream_get_name(remote
->listener
),
367 ovs_strerror(error
));
371 ovsdb_jsonrpc_session_run_all(remote
);
/* Wait-side counterpart of the server's run function: for each remote of
 * 'svr', waits on the remote's listener when it has one and then calls
 * ovsdb_jsonrpc_session_wait_all() for the remote's sessions. */
376 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server
*svr
)
378 struct shash_node
*node
;
380 SHASH_FOR_EACH (node
, &svr
->remotes
) {
381 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
/* Only remotes with a listener have one to wait on. */
383 if (remote
->listener
) {
384 pstream_wait(remote
->listener
);
387 ovsdb_jsonrpc_session_wait_all(remote
);
391 /* Adds some memory usage statistics for 'svr' into 'usage', for use with
392 * memory_report(). */
394 ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server
*svr
,
397 struct shash_node
*node
;
399 simap_increase(usage
, "sessions", svr
->n_sessions
);
400 SHASH_FOR_EACH (node
, &svr
->remotes
) {
401 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
403 ovsdb_jsonrpc_session_get_memory_usage_all(remote
, usage
);
407 /* JSON-RPC database server session. */
409 struct ovsdb_jsonrpc_session
{
410 struct ovs_list node
; /* Element in remote's sessions list. */
411 struct ovsdb_session up
;
412 struct ovsdb_jsonrpc_remote
*remote
;
415 struct hmap triggers
; /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
418 struct hmap monitors
; /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
420 /* Network connectivity. */
421 struct jsonrpc_session
*js
; /* JSON-RPC session. */
422 unsigned int js_seqno
; /* Last jsonrpc_session_get_seqno() value. */
425 bool read_only
; /* When true, not allow to modify the
429 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session
*);
430 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session
*);
431 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session
*);
432 static void ovsdb_jsonrpc_session_get_memory_usage(
433 const struct ovsdb_jsonrpc_session
*, struct simap
*usage
);
434 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session
*,
435 struct jsonrpc_msg
*);
436 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session
*,
437 struct jsonrpc_msg
*);
/* Allocates a zeroed session for 'remote' on top of the JSON-RPC session
 * 'js', initializes its embedded ovsdb_session against the remote's server,
 * appends it to 'remote->sessions', and increments the server's session count.
 *
 * 'read_only' is stored in the new session.
 *
 * NOTE(review): some lines of this function appear to be missing from this
 * extract (e.g. the assignments of 's->remote' and 's->js' and the return
 * statement are not visible) -- confirm against the full source. */
439 static struct ovsdb_jsonrpc_session
*
440 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote
*remote
,
441 struct jsonrpc_session
*js
, bool read_only
)
443 struct ovsdb_jsonrpc_session
*s
;
445 s
= xzalloc(sizeof *s
);
446 ovsdb_session_init(&s
->up
, &remote
->server
->up
);
448 ovs_list_push_back(&remote
->sessions
, &s
->node
);
449 hmap_init(&s
->triggers
);
450 hmap_init(&s
->monitors
);
/* Remember the current seqno so that ovsdb_jsonrpc_session_run() can detect
 * when it changes. */
452 s
->js_seqno
= jsonrpc_session_get_seqno(js
);
453 s
->read_only
= read_only
;
455 remote
->server
->n_sessions
++;
/* Closes session 's' and releases what it holds: its monitors, its locks and
 * its triggers, then the underlying JSON-RPC session.  Also unlinks 's' from
 * its remote's session list and decrements the server's session count. */
461 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session
*s
)
/* Empty the per-session containers before destroying them below. */
463 ovsdb_jsonrpc_monitor_remove_all(s
);
464 ovsdb_jsonrpc_session_unlock_all(s
);
465 ovsdb_jsonrpc_trigger_complete_all(s
);
467 hmap_destroy(&s
->monitors
);
468 hmap_destroy(&s
->triggers
);
470 jsonrpc_session_close(s
->js
);
471 ovs_list_remove(&s
->node
);
472 s
->remote
->server
->n_sessions
--;
473 ovsdb_session_destroy(&s
->up
);
/* Runs one iteration of session 's'.
 *
 * Returns 0 while the underlying JSON-RPC session is alive, otherwise
 * ETIMEDOUT.
 *
 * NOTE(review): some branch lines of this function appear to be missing from
 * this extract (e.g. the check on 'msg' after jsonrpc_session_recv() and the
 * final "else" before the warning) -- confirm against the full source. */
478 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session
*s
)
480 jsonrpc_session_run(s
->js
);
/* The JSON-RPC session's seqno differs from the one recorded last time
 * (presumably because the connection was dropped or re-established --
 * confirm): record the new value and discard all per-connection state, that
 * is, triggers, monitors and locks. */
481 if (s
->js_seqno
!= jsonrpc_session_get_seqno(s
->js
)) {
482 s
->js_seqno
= jsonrpc_session_get_seqno(s
->js
);
483 ovsdb_jsonrpc_trigger_complete_all(s
);
484 ovsdb_jsonrpc_monitor_remove_all(s
);
485 ovsdb_jsonrpc_session_unlock_all(s
);
/* Complete the triggers queued on the session's 'completions' list. */
488 ovsdb_jsonrpc_trigger_complete_done(s
);
/* Only flush monitors and receive a new message when nothing is backlogged
 * for sending on this session. */
490 if (!jsonrpc_session_get_backlog(s
->js
)) {
491 struct jsonrpc_msg
*msg
;
493 ovsdb_jsonrpc_monitor_flush_all(s
);
495 msg
= jsonrpc_session_recv(s
->js
);
/* Dispatch on the message type: requests and notifications are passed to
 * their handlers. */
497 if (msg
->type
== JSONRPC_REQUEST
) {
498 ovsdb_jsonrpc_session_got_request(s
, msg
);
499 } else if (msg
->type
== JSONRPC_NOTIFY
) {
500 ovsdb_jsonrpc_session_got_notify(s
, msg
);
/* Any other message type is logged, the session is forced to reconnect, and
 * the message is destroyed here. */
502 VLOG_WARN("%s: received unexpected %s message",
503 jsonrpc_session_get_name(s
->js
),
504 jsonrpc_msg_type_to_string(msg
->type
));
505 jsonrpc_session_force_reconnect(s
->js
);
506 jsonrpc_msg_destroy(msg
);
510 return jsonrpc_session_is_alive(s
->js
) ? 0 : ETIMEDOUT
;
/* Applies 'options' to the JSON-RPC session underlying 'session': its
 * 'max_backoff', 'probe_interval' and 'dscp' members are each passed to the
 * corresponding jsonrpc_session_set_*() function. */
514 ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session
*session
,
515 const struct ovsdb_jsonrpc_options
*options
)
517 jsonrpc_session_set_max_backoff(session
->js
, options
->max_backoff
);
518 jsonrpc_session_set_probe_interval(session
->js
, options
->probe_interval
);
519 jsonrpc_session_set_dscp(session
->js
, options
->dscp
);
523 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote
*remote
)
525 struct ovsdb_jsonrpc_session
*s
, *next
;
527 LIST_FOR_EACH_SAFE (s
, next
, node
, &remote
->sessions
) {
528 int error
= ovsdb_jsonrpc_session_run(s
);
530 ovsdb_jsonrpc_session_close(s
);
/* Wait-side counterpart of ovsdb_jsonrpc_session_run() for session 's'. */
536 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session
*s
)
538 jsonrpc_session_wait(s
->js
);
/* Mirrors the run function: monitors are flushed and messages received only
 * when there is no send backlog, so only then wait for either. */
539 if (!jsonrpc_session_get_backlog(s
->js
)) {
/* A monitor that needs flushing is work that can be done right away, so
 * request an immediate wakeup. */
540 if (ovsdb_jsonrpc_monitor_needs_flush(s
)) {
541 poll_immediate_wake();
543 jsonrpc_session_recv_wait(s
->js
);
/* Calls ovsdb_jsonrpc_session_wait() on every session in 'remote->sessions'. */
549 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote
*remote
)
551 struct ovsdb_jsonrpc_session
*s
;
553 LIST_FOR_EACH (s
, node
, &remote
->sessions
) {
554 ovsdb_jsonrpc_session_wait(s
);
559 ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session
*s
,
562 simap_increase(usage
, "triggers", hmap_count(&s
->triggers
));
563 simap_increase(usage
, "backlog", jsonrpc_session_get_backlog(s
->js
));
567 ovsdb_jsonrpc_session_get_memory_usage_all(
568 const struct ovsdb_jsonrpc_remote
*remote
,
571 struct ovsdb_jsonrpc_session
*s
;
573 LIST_FOR_EACH (s
, node
, &remote
->sessions
) {
574 ovsdb_jsonrpc_session_get_memory_usage(s
, usage
);
/* Closes every session in 'remote->sessions'. */
579 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote
*remote
)
581 struct ovsdb_jsonrpc_session
*s
, *next
;
/* The _SAFE iterator is required because ovsdb_jsonrpc_session_close()
 * removes 's' from the list being walked. */
583 LIST_FOR_EACH_SAFE (s
, next
, node
, &remote
->sessions
) {
584 ovsdb_jsonrpc_session_close(s
);
588 /* Forces all of the JSON-RPC sessions managed by 'remote' to disconnect and
591 ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote
*remote
)
593 struct ovsdb_jsonrpc_session
*s
, *next
;
595 LIST_FOR_EACH_SAFE (s
, next
, node
, &remote
->sessions
) {
596 jsonrpc_session_force_reconnect(s
->js
);
597 if (!jsonrpc_session_is_alive(s
->js
)) {
598 ovsdb_jsonrpc_session_close(s
);
603 /* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
606 * (The dscp value can't be changed directly; the caller must instead close and
607 * re-open the session.) */
609 ovsdb_jsonrpc_session_set_all_options(
610 struct ovsdb_jsonrpc_remote
*remote
,
611 const struct ovsdb_jsonrpc_options
*options
)
613 struct ovsdb_jsonrpc_session
*s
;
615 LIST_FOR_EACH (s
, node
, &remote
->sessions
) {
616 ovsdb_jsonrpc_session_set_options(s
, options
);
620 /* Sets 'status' for the 'remote' with an outgoing connection. */
622 ovsdb_jsonrpc_active_session_get_status(
623 const struct ovsdb_jsonrpc_remote
*remote
,
624 struct ovsdb_jsonrpc_remote_status
*status
)
626 const struct ovs_list
*sessions
= &remote
->sessions
;
627 const struct ovsdb_jsonrpc_session
*s
;
629 if (ovs_list_is_empty(sessions
)) {
633 ovs_assert(ovs_list_is_singleton(sessions
));
634 s
= CONTAINER_OF(ovs_list_front(sessions
), struct ovsdb_jsonrpc_session
, node
);
635 ovsdb_jsonrpc_session_get_status(s
, status
);
636 status
->n_connections
= 1;
642 ovsdb_jsonrpc_session_get_status(const struct ovsdb_jsonrpc_session
*session
,
643 struct ovsdb_jsonrpc_remote_status
*status
)
645 const struct ovsdb_jsonrpc_session
*s
= session
;
646 const struct jsonrpc_session
*js
;
647 struct ovsdb_lock_waiter
*waiter
;
648 struct reconnect_stats rstats
;
649 struct ds locks_held
, locks_waiting
, locks_lost
;
653 status
->is_connected
= jsonrpc_session_is_connected(js
);
654 status
->last_error
= jsonrpc_session_get_status(js
);
656 jsonrpc_session_get_reconnect_stats(js
, &rstats
);
657 status
->state
= rstats
.state
;
658 status
->sec_since_connect
= rstats
.msec_since_connect
== UINT_MAX
659 ? UINT_MAX
: rstats
.msec_since_connect
/ 1000;
660 status
->sec_since_disconnect
= rstats
.msec_since_disconnect
== UINT_MAX
661 ? UINT_MAX
: rstats
.msec_since_disconnect
/ 1000;
663 ds_init(&locks_held
);
664 ds_init(&locks_waiting
);
665 ds_init(&locks_lost
);
666 HMAP_FOR_EACH (waiter
, session_node
, &s
->up
.waiters
) {
669 string
= (ovsdb_lock_waiter_is_owner(waiter
) ? &locks_held
670 : waiter
->mode
== OVSDB_LOCK_WAIT
? &locks_waiting
672 if (string
->length
) {
673 ds_put_char(string
, ' ');
675 ds_put_cstr(string
, waiter
->lock_name
);
677 status
->locks_held
= ds_steal_cstr(&locks_held
);
678 status
->locks_waiting
= ds_steal_cstr(&locks_waiting
);
679 status
->locks_lost
= ds_steal_cstr(&locks_lost
);
682 /* Examines 'request' to determine the database to which it relates, and then
683 * searches 's' to find that database:
685 * - If successful, returns the database and sets '*replyp' to NULL.
687 * - If no such database exists, returns NULL and sets '*replyp' to an
688 * appropriate JSON-RPC error reply, owned by the caller. */
689 static struct ovsdb
*
690 ovsdb_jsonrpc_lookup_db(const struct ovsdb_jsonrpc_session
*s
,
691 const struct jsonrpc_msg
*request
,
692 struct jsonrpc_msg
**replyp
)
694 struct json_array
*params
;
695 struct ovsdb_error
*error
;
699 params
= json_array(request
->params
);
700 if (!params
->n
|| params
->elems
[0]->type
!= JSON_STRING
) {
701 error
= ovsdb_syntax_error(
702 request
->params
, NULL
,
703 "%s request params must begin with <db-name>", request
->method
);
707 db_name
= params
->elems
[0]->u
.string
;
708 db
= shash_find_data(&s
->up
.server
->dbs
, db_name
);
710 error
= ovsdb_syntax_error(
711 request
->params
, "unknown database",
712 "%s request specifies unknown database %s",
713 request
->method
, db_name
);
721 *replyp
= jsonrpc_create_error(ovsdb_error_to_json(error
), request
->id
);
722 ovsdb_error_destroy(error
);
726 static struct ovsdb_error
*
727 ovsdb_jsonrpc_session_parse_lock_name(const struct jsonrpc_msg
*request
,
728 const char **lock_namep
)
730 const struct json_array
*params
;
732 params
= json_array(request
->params
);
733 if (params
->n
!= 1 || params
->elems
[0]->type
!= JSON_STRING
||
734 !ovsdb_parser_is_id(json_string(params
->elems
[0]))) {
736 return ovsdb_syntax_error(request
->params
, NULL
,
737 "%s request params must be <id>",
741 *lock_namep
= json_string(params
->elems
[0]);
746 ovsdb_jsonrpc_session_notify(struct ovsdb_session
*session
,
747 const char *lock_name
,
750 struct ovsdb_jsonrpc_session
*s
;
753 s
= CONTAINER_OF(session
, struct ovsdb_jsonrpc_session
, up
);
754 params
= json_array_create_1(json_string_create(lock_name
));
755 ovsdb_jsonrpc_session_send(s
, jsonrpc_create_notify(method
, params
));
/* Returns a new JSON-RPC error message for request 'id' whose error value is
 * a string explaining that "lock" and "unlock" are rejected because the
 * database server is read only. */
758 static struct jsonrpc_msg
*
759 jsonrpc_create_readonly_lock_error(const struct json
*id
)
761 return jsonrpc_create_error(json_string_create(
762 "lock and unlock methods not allowed,"
763 " DB server is read only."), id
);
766 static struct jsonrpc_msg
*
767 ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session
*s
,
768 struct jsonrpc_msg
*request
,
769 enum ovsdb_lock_mode mode
)
771 struct ovsdb_lock_waiter
*waiter
;
772 struct jsonrpc_msg
*reply
;
773 struct ovsdb_error
*error
;
774 struct ovsdb_session
*victim
;
775 const char *lock_name
;
779 return jsonrpc_create_readonly_lock_error(request
->id
);
782 error
= ovsdb_jsonrpc_session_parse_lock_name(request
, &lock_name
);
787 /* Report error if this session has issued a "lock" or "steal" without a
788 * matching "unlock" for this lock. */
789 waiter
= ovsdb_session_get_lock_waiter(&s
->up
, lock_name
);
791 error
= ovsdb_syntax_error(
792 request
->params
, NULL
,
793 "must issue \"unlock\" before new \"%s\"", request
->method
);
797 /* Get the lock, add us as a waiter. */
798 waiter
= ovsdb_server_lock(&s
->remote
->server
->up
, &s
->up
, lock_name
, mode
,
801 ovsdb_jsonrpc_session_notify(victim
, lock_name
, "stolen");
804 result
= json_object_create();
805 json_object_put(result
, "locked",
806 json_boolean_create(ovsdb_lock_waiter_is_owner(waiter
)));
808 return jsonrpc_create_reply(result
, request
->id
);
811 reply
= jsonrpc_create_error(ovsdb_error_to_json(error
), request
->id
);
812 ovsdb_error_destroy(error
);
817 ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session
*s
)
819 struct ovsdb_lock_waiter
*waiter
, *next
;
821 HMAP_FOR_EACH_SAFE (waiter
, next
, session_node
, &s
->up
.waiters
) {
822 ovsdb_jsonrpc_session_unlock__(waiter
);
/* Releases 'waiter': removes it from its lock, sends a "locked" notification
 * to the session returned as the lock's new owner, and destroys 'waiter'.
 *
 * NOTE(review): the conditions guarding the removal and the notification are
 * not visible in this extract -- confirm against the full source. */
827 ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter
*waiter
)
/* Save the lock pointer first; it is needed for 'lock->name' below. */
829 struct ovsdb_lock
*lock
= waiter
->lock
;
832 struct ovsdb_session
*new_owner
= ovsdb_lock_waiter_remove(waiter
);
834 ovsdb_jsonrpc_session_notify(new_owner
, lock
->name
, "locked");
836 /* NOTE(review): this comment used to name ovsdb_server_lock(), which is not
 * called in this function; presumably ovsdb_lock_waiter_remove() is the call
 * that might have freed 'lock' -- confirm.  Either way, 'lock' must not be
 * dereferenced on this path. */
840 ovsdb_lock_waiter_destroy(waiter
);
843 static struct jsonrpc_msg
*
844 ovsdb_jsonrpc_session_unlock(struct ovsdb_jsonrpc_session
*s
,
845 struct jsonrpc_msg
*request
)
847 struct ovsdb_lock_waiter
*waiter
;
848 struct jsonrpc_msg
*reply
;
849 struct ovsdb_error
*error
;
850 const char *lock_name
;
853 return jsonrpc_create_readonly_lock_error(request
->id
);
856 error
= ovsdb_jsonrpc_session_parse_lock_name(request
, &lock_name
);
861 /* Report error if this session has not issued a "lock" or "steal" for this
863 waiter
= ovsdb_session_get_lock_waiter(&s
->up
, lock_name
);
865 error
= ovsdb_syntax_error(
866 request
->params
, NULL
, "\"unlock\" without \"lock\" or \"steal\"");
870 ovsdb_jsonrpc_session_unlock__(waiter
);
872 return jsonrpc_create_reply(json_object_create(), request
->id
);
875 reply
= jsonrpc_create_error(ovsdb_error_to_json(error
), request
->id
);
876 ovsdb_error_destroy(error
);
/* Starts a transaction for 'request' against 'db' on session 's' by creating
 * a trigger from the request's 'id' and 'params', then destroys 'request'.
 *
 * NOTE(review): the return statement and possibly other lines are missing
 * from this extract -- confirm against the full source. */
880 static struct jsonrpc_msg
*
881 execute_transaction(struct ovsdb_jsonrpc_session
*s
, struct ovsdb
*db
,
882 struct jsonrpc_msg
*request
)
884 ovsdb_jsonrpc_trigger_create(s
, db
, request
->id
, request
->params
);
/* 'params' was handed to ovsdb_jsonrpc_trigger_create() above; clear the
 * request's pointer so that, presumably, jsonrpc_msg_destroy() does not free
 * it a second time -- confirm. */
886 request
->params
= NULL
;
887 jsonrpc_msg_destroy(request
);
892 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session
*s
,
893 struct jsonrpc_msg
*request
)
895 struct jsonrpc_msg
*reply
;
897 if (!strcmp(request
->method
, "transact")) {
898 struct ovsdb
*db
= ovsdb_jsonrpc_lookup_db(s
, request
, &reply
);
900 reply
= execute_transaction(s
, db
, request
);
902 } else if (!strcmp(request
->method
, "monitor") ||
903 (monitor_cond_enable__
&& !strcmp(request
->method
,
905 struct ovsdb
*db
= ovsdb_jsonrpc_lookup_db(s
, request
, &reply
);
907 int l
= strlen(request
->method
) - strlen("monitor");
908 enum ovsdb_monitor_version version
= l
? OVSDB_MONITOR_V2
910 reply
= ovsdb_jsonrpc_monitor_create(s
, db
, request
->params
,
911 version
, request
->id
);
913 } else if (!strcmp(request
->method
, "monitor_cond_change")) {
914 reply
= ovsdb_jsonrpc_monitor_cond_change(s
, request
->params
,
916 } else if (!strcmp(request
->method
, "monitor_cancel")) {
917 reply
= ovsdb_jsonrpc_monitor_cancel(s
, json_array(request
->params
),
919 } else if (!strcmp(request
->method
, "get_schema")) {
920 struct ovsdb
*db
= ovsdb_jsonrpc_lookup_db(s
, request
, &reply
);
922 reply
= jsonrpc_create_reply(ovsdb_schema_to_json(db
->schema
),
925 } else if (!strcmp(request
->method
, "list_dbs")) {
926 size_t n_dbs
= shash_count(&s
->up
.server
->dbs
);
927 struct shash_node
*node
;
931 dbs
= xmalloc(n_dbs
* sizeof *dbs
);
933 SHASH_FOR_EACH (node
, &s
->up
.server
->dbs
) {
934 dbs
[i
++] = json_string_create(node
->name
);
936 reply
= jsonrpc_create_reply(json_array_create(dbs
, n_dbs
),
938 } else if (!strcmp(request
->method
, "lock")) {
939 reply
= ovsdb_jsonrpc_session_lock(s
, request
, OVSDB_LOCK_WAIT
);
940 } else if (!strcmp(request
->method
, "steal")) {
941 reply
= ovsdb_jsonrpc_session_lock(s
, request
, OVSDB_LOCK_STEAL
);
942 } else if (!strcmp(request
->method
, "unlock")) {
943 reply
= ovsdb_jsonrpc_session_unlock(s
, request
);
944 } else if (!strcmp(request
->method
, "echo")) {
945 reply
= jsonrpc_create_reply(json_clone(request
->params
), request
->id
);
947 reply
= jsonrpc_create_error(json_string_create("unknown method"),
952 jsonrpc_msg_destroy(request
);
953 ovsdb_jsonrpc_session_send(s
, reply
);
958 execute_cancel(struct ovsdb_jsonrpc_session
*s
, struct jsonrpc_msg
*request
)
960 if (json_array(request
->params
)->n
== 1) {
961 struct ovsdb_jsonrpc_trigger
*t
;
964 id
= request
->params
->u
.array
.elems
[0];
965 t
= ovsdb_jsonrpc_trigger_find(s
, id
, json_hash(id
, 0));
967 ovsdb_jsonrpc_trigger_complete(t
);
973 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session
*s
,
974 struct jsonrpc_msg
*request
)
976 if (!strcmp(request
->method
, "cancel")) {
977 execute_cancel(s
, request
);
979 jsonrpc_msg_destroy(request
);
/* Sends 'msg' on the JSON-RPC session underlying 's', after first flushing all
 * of the session's monitors. */
983 ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session
*s
,
984 struct jsonrpc_msg
*msg
)
/* Flush first, presumably so that pending monitor updates reach the client
 * before 'msg' does -- confirm. */
986 ovsdb_jsonrpc_monitor_flush_all(s
);
987 jsonrpc_session_send(s
->js
, msg
);
990 /* JSON-RPC database server triggers.
992 * (Every transaction is treated as a trigger even if it doesn't actually have
993 * any "wait" operations.) */
995 struct ovsdb_jsonrpc_trigger
{
996 struct ovsdb_trigger trigger
;
997 struct hmap_node hmap_node
; /* In session's "triggers" hmap. */
1002 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session
*s
, struct ovsdb
*db
,
1003 struct json
*id
, struct json
*params
)
1005 struct ovsdb_jsonrpc_trigger
*t
;
1008 /* Check for duplicate ID. */
1009 hash
= json_hash(id
, 0);
1010 t
= ovsdb_jsonrpc_trigger_find(s
, id
, hash
);
1012 struct jsonrpc_msg
*msg
;
1014 msg
= jsonrpc_create_error(json_string_create("duplicate request ID"),
1016 ovsdb_jsonrpc_session_send(s
, msg
);
1018 json_destroy(params
);
1022 /* Insert into trigger table. */
1023 t
= xmalloc(sizeof *t
);
1024 ovsdb_trigger_init(&s
->up
, db
, &t
->trigger
, params
, time_msec(),
1027 hmap_insert(&s
->triggers
, &t
->hmap_node
, hash
);
1029 /* Complete early if possible. */
1030 if (ovsdb_trigger_is_complete(&t
->trigger
)) {
1031 ovsdb_jsonrpc_trigger_complete(t
);
1035 static struct ovsdb_jsonrpc_trigger
*
1036 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session
*s
,
1037 const struct json
*id
, size_t hash
)
1039 struct ovsdb_jsonrpc_trigger
*t
;
1041 HMAP_FOR_EACH_WITH_HASH (t
, hmap_node
, hash
, &s
->triggers
) {
1042 if (json_equal(t
->id
, id
)) {
1051 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger
*t
)
1053 struct ovsdb_jsonrpc_session
*s
;
1055 s
= CONTAINER_OF(t
->trigger
.session
, struct ovsdb_jsonrpc_session
, up
);
1057 if (jsonrpc_session_is_connected(s
->js
)) {
1058 struct jsonrpc_msg
*reply
;
1059 struct json
*result
;
1061 result
= ovsdb_trigger_steal_result(&t
->trigger
);
1063 reply
= jsonrpc_create_reply(result
, t
->id
);
1065 reply
= jsonrpc_create_error(json_string_create("canceled"),
1068 ovsdb_jsonrpc_session_send(s
, reply
);
1071 json_destroy(t
->id
);
1072 ovsdb_trigger_destroy(&t
->trigger
);
1073 hmap_remove(&s
->triggers
, &t
->hmap_node
);
/* Completes every trigger in 's->triggers'. */
1078 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session
*s
)
1080 struct ovsdb_jsonrpc_trigger
*t
, *next
;
/* The _SAFE iterator is required because ovsdb_jsonrpc_trigger_complete()
 * removes 't' from 's->triggers'. */
1081 HMAP_FOR_EACH_SAFE (t
, next
, hmap_node
, &s
->triggers
) {
1082 ovsdb_jsonrpc_trigger_complete(t
);
/* Drains the 'completions' list of the ovsdb_session embedded in 's': while
 * the list is non-empty, takes its first element, recovers the enclosing
 * ovsdb_jsonrpc_trigger, and completes it. */
1087 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session
*s
)
/* NOTE(review): the loop ends only if completing a trigger unlinks it from
 * 'completions'; presumably ovsdb_trigger_destroy(), called from
 * ovsdb_jsonrpc_trigger_complete(), does that -- confirm. */
1089 while (!ovs_list_is_empty(&s
->up
.completions
)) {
1090 struct ovsdb_jsonrpc_trigger
*t
1091 = CONTAINER_OF(s
->up
.completions
.next
,
1092 struct ovsdb_jsonrpc_trigger
, trigger
.node
);
1093 ovsdb_jsonrpc_trigger_complete(t
);
1097 /* Jsonrpc front end monitor. */
1098 struct ovsdb_jsonrpc_monitor
{
1099 struct ovsdb_jsonrpc_session
*session
;
1101 struct hmap_node node
; /* In ovsdb_jsonrpc_session's "monitors". */
1102 struct json
*monitor_id
;
1103 struct ovsdb_monitor
*dbmon
;
1104 uint64_t unflushed
; /* The first transaction that has not been
1105 flushed to the jsonrpc remote client. */
1106 enum ovsdb_monitor_version version
;
1107 struct ovsdb_monitor_session_condition
*condition
;/* Session's condition */
1110 static struct ovsdb_jsonrpc_monitor
*
1111 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session
*s
,
1112 const struct json
*monitor_id
)
1114 struct ovsdb_jsonrpc_monitor
*m
;
1116 HMAP_FOR_EACH_WITH_HASH (m
, node
, json_hash(monitor_id
, 0), &s
->monitors
) {
1117 if (json_equal(m
->monitor_id
, monitor_id
)) {
/* Looks up the optional boolean member 'name' through 'parser'.  Returns the
 * member's value if ovsdb_parser_member() yields one, otherwise
 * 'default_value'. */
1126 parse_bool(struct ovsdb_parser
*parser
, const char *name
, bool default_value
)
1128 const struct json
*json
;
1130 json
= ovsdb_parser_member(parser
, name
, OP_BOOLEAN
| OP_OPTIONAL
);
1131 return json
? json_boolean(json
) : default_value
;
1134 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
1135 ovsdb_jsonrpc_parse_monitor_request(
1136 struct ovsdb_monitor
*dbmon
,
1137 const struct ovsdb_table
*table
,
1138 struct ovsdb_monitor_session_condition
*cond
,
1139 const struct json
*monitor_request
)
1141 const struct ovsdb_table_schema
*ts
= table
->schema
;
1142 enum ovsdb_monitor_selection select
;
1143 const struct json
*columns
, *select_json
, *where
= NULL
;
1144 struct ovsdb_parser parser
;
1145 struct ovsdb_error
*error
;
1147 ovsdb_parser_init(&parser
, monitor_request
, "table %s", ts
->name
);
1149 where
= ovsdb_parser_member(&parser
, "where", OP_ARRAY
| OP_OPTIONAL
);
1151 columns
= ovsdb_parser_member(&parser
, "columns", OP_ARRAY
| OP_OPTIONAL
);
1153 select_json
= ovsdb_parser_member(&parser
, "select",
1154 OP_OBJECT
| OP_OPTIONAL
);
1156 error
= ovsdb_parser_finish(&parser
);
1163 ovsdb_parser_init(&parser
, select_json
, "table %s select", ts
->name
);
1164 if (parse_bool(&parser
, "initial", true)) {
1165 select
|= OJMS_INITIAL
;
1167 if (parse_bool(&parser
, "insert", true)) {
1168 select
|= OJMS_INSERT
;
1170 if (parse_bool(&parser
, "delete", true)) {
1171 select
|= OJMS_DELETE
;
1173 if (parse_bool(&parser
, "modify", true)) {
1174 select
|= OJMS_MODIFY
;
1176 error
= ovsdb_parser_finish(&parser
);
1181 select
= OJMS_INITIAL
| OJMS_INSERT
| OJMS_DELETE
| OJMS_MODIFY
;
1184 ovsdb_monitor_table_add_select(dbmon
, table
, select
);
1188 if (columns
->type
!= JSON_ARRAY
) {
1189 return ovsdb_syntax_error(columns
, NULL
,
1190 "array of column names expected");
1193 for (i
= 0; i
< columns
->u
.array
.n
; i
++) {
1194 const struct ovsdb_column
*column
;
1197 if (columns
->u
.array
.elems
[i
]->type
!= JSON_STRING
) {
1198 return ovsdb_syntax_error(columns
, NULL
,
1199 "array of column names expected");
1202 s
= columns
->u
.array
.elems
[i
]->u
.string
;
1203 column
= shash_find_data(&table
->schema
->columns
, s
);
1205 return ovsdb_syntax_error(columns
, NULL
, "%s is not a valid "
1208 if (ovsdb_monitor_add_column(dbmon
, table
, column
,
1210 return ovsdb_syntax_error(columns
, NULL
, "column %s "
1211 "mentioned more than once",
1216 struct shash_node
*node
;
1218 SHASH_FOR_EACH (node
, &ts
->columns
) {
1219 const struct ovsdb_column
*column
= node
->data
;
1220 if (column
->index
!= OVSDB_COL_UUID
) {
1221 if (ovsdb_monitor_add_column(dbmon
, table
, column
,
1223 return ovsdb_syntax_error(columns
, NULL
, "column %s "
1224 "mentioned more than once",
1231 error
= ovsdb_monitor_table_condition_create(cond
, table
, where
);
1240 static struct jsonrpc_msg
*
1241 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session
*s
, struct ovsdb
*db
,
1242 struct json
*params
,
1243 enum ovsdb_monitor_version version
,
1244 const struct json
*request_id
)
1246 struct ovsdb_jsonrpc_monitor
*m
= NULL
;
1247 struct ovsdb_monitor
*dbmon
= NULL
;
1248 struct json
*monitor_id
, *monitor_requests
;
1249 struct ovsdb_error
*error
= NULL
;
1250 struct shash_node
*node
;
1253 if (json_array(params
)->n
!= 3) {
1254 error
= ovsdb_syntax_error(params
, NULL
, "invalid parameters");
1257 monitor_id
= params
->u
.array
.elems
[1];
1258 monitor_requests
= params
->u
.array
.elems
[2];
1259 if (monitor_requests
->type
!= JSON_OBJECT
) {
1260 error
= ovsdb_syntax_error(monitor_requests
, NULL
,
1261 "monitor-requests must be object");
1265 if (ovsdb_jsonrpc_monitor_find(s
, monitor_id
)) {
1266 error
= ovsdb_syntax_error(monitor_id
, NULL
, "duplicate monitor ID");
1270 m
= xzalloc(sizeof *m
);
1273 m
->dbmon
= ovsdb_monitor_create(db
, m
);
1274 if (version
== OVSDB_MONITOR_V2
) {
1275 m
->condition
= ovsdb_monitor_session_condition_create();
1278 m
->version
= version
;
1279 hmap_insert(&s
->monitors
, &m
->node
, json_hash(monitor_id
, 0));
1280 m
->monitor_id
= json_clone(monitor_id
);
1282 SHASH_FOR_EACH (node
, json_object(monitor_requests
)) {
1283 const struct ovsdb_table
*table
;
1284 const struct json
*mr_value
;
1287 table
= ovsdb_get_table(m
->db
, node
->name
);
1289 error
= ovsdb_syntax_error(NULL
, NULL
,
1290 "no table named %s", node
->name
);
1294 ovsdb_monitor_add_table(m
->dbmon
, table
);
1296 /* Parse columns. */
1297 mr_value
= node
->data
;
1298 if (mr_value
->type
== JSON_ARRAY
) {
1299 const struct json_array
*array
= &mr_value
->u
.array
;
1301 for (i
= 0; i
< array
->n
; i
++) {
1302 error
= ovsdb_jsonrpc_parse_monitor_request(m
->dbmon
,
1311 error
= ovsdb_jsonrpc_parse_monitor_request(m
->dbmon
,
1321 dbmon
= ovsdb_monitor_add(m
->dbmon
);
1322 if (dbmon
!= m
->dbmon
) {
1323 /* Found an exisiting dbmon, reuse the current one. */
1324 ovsdb_monitor_remove_jsonrpc_monitor(m
->dbmon
, m
, m
->unflushed
);
1325 ovsdb_monitor_add_jsonrpc_monitor(dbmon
, m
);
1329 /* Only now we can bind session's condition to ovsdb_monitor */
1331 ovsdb_monitor_condition_bind(m
->dbmon
, m
->condition
);
1334 ovsdb_monitor_get_initial(m
->dbmon
);
1335 json
= ovsdb_jsonrpc_monitor_compose_update(m
, true);
1336 json
= json
? json
: json_object_create();
1337 return jsonrpc_create_reply(json
, request_id
);
1341 ovsdb_jsonrpc_monitor_destroy(m
);
1344 json
= ovsdb_error_to_json(error
);
1345 ovsdb_error_destroy(error
);
1346 return jsonrpc_create_error(json
, request_id
);
1349 static struct ovsdb_error
*
1350 ovsdb_jsonrpc_parse_monitor_cond_change_request(
1351 struct ovsdb_jsonrpc_monitor
*m
,
1352 const struct ovsdb_table
*table
,
1353 const struct json
*cond_change_req
)
1355 const struct ovsdb_table_schema
*ts
= table
->schema
;
1356 const struct json
*condition
, *columns
;
1357 struct ovsdb_parser parser
;
1358 struct ovsdb_error
*error
;
1360 ovsdb_parser_init(&parser
, cond_change_req
, "table %s", ts
->name
);
1361 columns
= ovsdb_parser_member(&parser
, "columns", OP_ARRAY
| OP_OPTIONAL
);
1362 condition
= ovsdb_parser_member(&parser
, "where", OP_ARRAY
| OP_OPTIONAL
);
1364 error
= ovsdb_parser_finish(&parser
);
1370 error
= ovsdb_syntax_error(cond_change_req
, NULL
, "changing columns "
1374 error
= ovsdb_monitor_table_condition_update(m
->dbmon
, m
->condition
, table
,
1380 static struct jsonrpc_msg
*
1381 ovsdb_jsonrpc_monitor_cond_change(struct ovsdb_jsonrpc_session
*s
,
1382 struct json
*params
,
1383 const struct json
*request_id
)
1385 struct ovsdb_error
*error
;
1386 struct ovsdb_jsonrpc_monitor
*m
;
1387 struct json
*monitor_cond_change_reqs
;
1388 struct shash_node
*node
;
1391 if (json_array(params
)->n
!= 3) {
1392 error
= ovsdb_syntax_error(params
, NULL
, "invalid parameters");
1396 m
= ovsdb_jsonrpc_monitor_find(s
, params
->u
.array
.elems
[0]);
1398 error
= ovsdb_syntax_error(request_id
, NULL
,
1399 "unknown monitor session");
1403 monitor_cond_change_reqs
= params
->u
.array
.elems
[2];
1404 if (monitor_cond_change_reqs
->type
!= JSON_OBJECT
) {
1406 ovsdb_syntax_error(NULL
, NULL
,
1407 "monitor-cond-change-requests must be object");
1411 SHASH_FOR_EACH (node
, json_object(monitor_cond_change_reqs
)) {
1412 const struct ovsdb_table
*table
;
1413 const struct json
*mr_value
;
1416 table
= ovsdb_get_table(m
->db
, node
->name
);
1418 error
= ovsdb_syntax_error(NULL
, NULL
,
1419 "no table named %s", node
->name
);
1422 if (!ovsdb_monitor_table_exists(m
->dbmon
, table
)) {
1423 error
= ovsdb_syntax_error(NULL
, NULL
,
1424 "no table named %s in monitor session",
1429 mr_value
= node
->data
;
1430 if (mr_value
->type
== JSON_ARRAY
) {
1431 const struct json_array
*array
= &mr_value
->u
.array
;
1433 for (i
= 0; i
< array
->n
; i
++) {
1434 error
= ovsdb_jsonrpc_parse_monitor_cond_change_request(
1435 m
, table
, array
->elems
[i
]);
1441 error
= ovsdb_syntax_error(
1443 "table %s no monitor-cond-change JSON array",
1449 /* Change monitor id */
1450 hmap_remove(&s
->monitors
, &m
->node
);
1451 json_destroy(m
->monitor_id
);
1452 m
->monitor_id
= json_clone(params
->u
.array
.elems
[1]);
1453 hmap_insert(&s
->monitors
, &m
->node
, json_hash(m
->monitor_id
, 0));
1455 /* Send the new update, if any, represents the difference from the old
1456 * condition and the new one. */
1457 struct json
*update_json
;
1459 update_json
= ovsdb_monitor_get_update(m
->dbmon
, false, true,
1460 &m
->unflushed
, m
->condition
, m
->version
);
1462 struct jsonrpc_msg
*msg
;
1463 struct json
*params
;
1465 params
= json_array_create_2(json_clone(m
->monitor_id
), update_json
);
1466 msg
= ovsdb_jsonrpc_create_notify(m
, params
);
1467 jsonrpc_session_send(s
->js
, msg
);
1470 return jsonrpc_create_reply(json_object_create(), request_id
);
1474 json
= ovsdb_error_to_json(error
);
1475 ovsdb_error_destroy(error
);
1476 return jsonrpc_create_error(json
, request_id
);
1479 static struct jsonrpc_msg
*
1480 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session
*s
,
1481 struct json_array
*params
,
1482 const struct json
*request_id
)
1484 if (params
->n
!= 1) {
1485 return jsonrpc_create_error(json_string_create("invalid parameters"),
1488 struct ovsdb_jsonrpc_monitor
*m
;
1490 m
= ovsdb_jsonrpc_monitor_find(s
, params
->elems
[0]);
1492 return jsonrpc_create_error(json_string_create("unknown monitor"),
1495 ovsdb_jsonrpc_monitor_destroy(m
);
1496 return jsonrpc_create_reply(json_object_create(), request_id
);
1502 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session
*s
)
1504 struct ovsdb_jsonrpc_monitor
*m
, *next
;
1506 HMAP_FOR_EACH_SAFE (m
, next
, node
, &s
->monitors
) {
1507 ovsdb_jsonrpc_monitor_destroy(m
);
1511 static struct json
*
1512 ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor
*m
,
1516 if (!ovsdb_monitor_needs_flush(m
->dbmon
, m
->unflushed
)) {
1520 return ovsdb_monitor_get_update(m
->dbmon
, initial
, false,
1521 &m
->unflushed
, m
->condition
, m
->version
);
1525 ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session
*s
)
1527 struct ovsdb_jsonrpc_monitor
*m
;
1529 HMAP_FOR_EACH (m
, node
, &s
->monitors
) {
1530 if (ovsdb_monitor_needs_flush(m
->dbmon
, m
->unflushed
)) {
1539 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_jsonrpc_monitor
*m
)
1541 json_destroy(m
->monitor_id
);
1542 hmap_remove(&m
->session
->monitors
, &m
->node
);
1543 ovsdb_monitor_remove_jsonrpc_monitor(m
->dbmon
, m
, m
->unflushed
);
1544 ovsdb_monitor_session_condition_destroy(m
->condition
);
1548 static struct jsonrpc_msg
*
1549 ovsdb_jsonrpc_create_notify(const struct ovsdb_jsonrpc_monitor
*m
,
1550 struct json
*params
)
1554 switch(m
->version
) {
1555 case OVSDB_MONITOR_V1
:
1558 case OVSDB_MONITOR_V2
:
1561 case OVSDB_MONITOR_VERSION_MAX
:
1566 return jsonrpc_create_notify(method
, params
);
1570 ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session
*s
)
1572 struct ovsdb_jsonrpc_monitor
*m
;
1574 HMAP_FOR_EACH (m
, node
, &s
->monitors
) {
1577 json
= ovsdb_jsonrpc_monitor_compose_update(m
, false);
1579 struct jsonrpc_msg
*msg
;
1580 struct json
*params
;
1582 params
= json_array_create_2(json_clone(m
->monitor_id
), json
);
1583 msg
= ovsdb_jsonrpc_create_notify(m
, params
);
1584 jsonrpc_session_send(s
->js
, msg
);
1590 ovsdb_jsonrpc_disable_monitor_cond(void)
1592 /* Once disabled, it is not possible to re-enable it. */
1593 monitor_cond_enable__
= false;