1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
18 #include "jsonrpc-server.h"
24 #include "openvswitch/dynamic-string.h"
26 #include "openvswitch/json.h"
28 #include "ovsdb-error.h"
29 #include "ovsdb-parser.h"
31 #include "condition.h"
32 #include "openvswitch/poll-loop.h"
33 #include "reconnect.h"
41 #include "transaction.h"
44 #include "openvswitch/vlog.h"
46 VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server
);
48 struct ovsdb_jsonrpc_remote
;
49 struct ovsdb_jsonrpc_session
;
51 /* Set to false to disable monitor_cond, causing jsonrpc to respond to
52 * the monitor_cond method with an error. */
53 static bool monitor_cond_enable__
= true;
55 /* Message rate-limiting. */
56 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 5);
59 static struct ovsdb_jsonrpc_session
*ovsdb_jsonrpc_session_create(
60 struct ovsdb_jsonrpc_remote
*, struct jsonrpc_session
*, bool);
61 static void ovsdb_jsonrpc_session_preremove_db(struct ovsdb_jsonrpc_remote
*,
63 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote
*);
64 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote
*);
65 static void ovsdb_jsonrpc_session_get_memory_usage_all(
66 const struct ovsdb_jsonrpc_remote
*, struct simap
*usage
);
67 static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote
*);
68 static void ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote
*,
71 static void ovsdb_jsonrpc_session_set_all_options(
72 struct ovsdb_jsonrpc_remote
*, const struct ovsdb_jsonrpc_options
*);
73 static bool ovsdb_jsonrpc_active_session_get_status(
74 const struct ovsdb_jsonrpc_remote
*,
75 struct ovsdb_jsonrpc_remote_status
*);
76 static void ovsdb_jsonrpc_session_get_status(
77 const struct ovsdb_jsonrpc_session
*,
78 struct ovsdb_jsonrpc_remote_status
*);
79 static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session
*);
80 static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter
*);
81 static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session
*,
82 struct jsonrpc_msg
*);
85 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session
*,
87 struct jsonrpc_msg
*request
);
88 static struct ovsdb_jsonrpc_trigger
*ovsdb_jsonrpc_trigger_find(
89 struct ovsdb_jsonrpc_session
*, const struct json
*id
, size_t hash
);
90 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger
*);
91 static void ovsdb_jsonrpc_trigger_preremove_db(struct ovsdb_jsonrpc_session
*,
93 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session
*);
94 static void ovsdb_jsonrpc_trigger_complete_done(
95 struct ovsdb_jsonrpc_session
*);
98 static struct jsonrpc_msg
*ovsdb_jsonrpc_monitor_create(
99 struct ovsdb_jsonrpc_session
*, struct ovsdb
*, struct json
*params
,
100 enum ovsdb_monitor_version
, const struct json
*request_id
);
101 static struct jsonrpc_msg
*ovsdb_jsonrpc_monitor_cond_change(
102 struct ovsdb_jsonrpc_session
*s
,
104 const struct json
*request_id
);
105 static struct jsonrpc_msg
*ovsdb_jsonrpc_monitor_cancel(
106 struct ovsdb_jsonrpc_session
*,
107 struct json_array
*params
,
108 const struct json
*request_id
);
109 static void ovsdb_jsonrpc_monitor_preremove_db(struct ovsdb_jsonrpc_session
*,
111 static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session
*);
112 static void ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session
*);
113 static bool ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session
*);
114 static struct json
*ovsdb_jsonrpc_monitor_compose_update(
115 struct ovsdb_jsonrpc_monitor
*monitor
, bool initial
);
116 static struct jsonrpc_msg
* ovsdb_jsonrpc_create_notify(
117 const struct ovsdb_jsonrpc_monitor
*m
,
118 struct json
*params
);
121 /* JSON-RPC database server. */
123 struct ovsdb_jsonrpc_server
{
124 struct ovsdb_server up
;
125 unsigned int n_sessions
;
126 bool read_only
; /* This server does not accept any
127 transactions that can modify the database. */
128 struct shash remotes
; /* Contains "struct ovsdb_jsonrpc_remote *"s. */
131 /* A configured remote. This is either a passive stream listener plus a list
132 * of the currently connected sessions, or a list of exactly one active
134 struct ovsdb_jsonrpc_remote
{
135 struct ovsdb_jsonrpc_server
*server
;
136 struct pstream
*listener
; /* Listener, if passive. */
137 struct ovs_list sessions
; /* List of "struct ovsdb_jsonrpc_session"s. */
143 static struct ovsdb_jsonrpc_remote
*ovsdb_jsonrpc_server_add_remote(
144 struct ovsdb_jsonrpc_server
*, const char *name
,
145 const struct ovsdb_jsonrpc_options
*options
147 static void ovsdb_jsonrpc_server_del_remote(struct shash_node
*);
149 /* Creates and returns a new server to provide JSON-RPC access to an OVSDB.
151 * The caller must call ovsdb_jsonrpc_server_add_db() for each database to
152 * which 'server' should provide access. */
153 struct ovsdb_jsonrpc_server
*
154 ovsdb_jsonrpc_server_create(bool read_only
)
156 struct ovsdb_jsonrpc_server
*server
= xzalloc(sizeof *server
);
157 ovsdb_server_init(&server
->up
);
158 shash_init(&server
->remotes
);
159 server
->read_only
= read_only
;
163 /* Adds 'db' to the set of databases served out by 'svr'. Returns true if
164 * successful, false if 'db''s name is the same as some database already in
167 ovsdb_jsonrpc_server_add_db(struct ovsdb_jsonrpc_server
*svr
, struct ovsdb
*db
)
169 ovsdb_jsonrpc_server_reconnect(
170 svr
, false, xasprintf("adding %s database", db
->name
));
171 return ovsdb_server_add_db(&svr
->up
, db
);
174 /* Removes 'db' from the set of databases served out by 'svr'.
176 * 'comment' should be a human-readable reason for removing the database, for
177 * use in log messages, or NULL to suppress logging. This function frees
180 ovsdb_jsonrpc_server_remove_db(struct ovsdb_jsonrpc_server
*svr
,
181 struct ovsdb
*db
, char *comment
)
183 struct shash_node
*node
;
184 SHASH_FOR_EACH (node
, &svr
->remotes
) {
185 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
187 ovsdb_jsonrpc_session_preremove_db(remote
, db
);
190 ovsdb_jsonrpc_server_reconnect(svr
, false, comment
);
192 ovsdb_server_remove_db(&svr
->up
, db
);
196 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server
*svr
)
198 struct shash_node
*node
, *next
;
200 SHASH_FOR_EACH_SAFE (node
, next
, &svr
->remotes
) {
201 ovsdb_jsonrpc_server_del_remote(node
);
203 shash_destroy(&svr
->remotes
);
204 ovsdb_server_destroy(&svr
->up
);
208 struct ovsdb_jsonrpc_options
*
209 ovsdb_jsonrpc_default_options(const char *target
)
211 struct ovsdb_jsonrpc_options
*options
= xzalloc(sizeof *options
);
212 options
->max_backoff
= RECONNECT_DEFAULT_MAX_BACKOFF
;
213 options
->probe_interval
= (stream_or_pstream_needs_probes(target
)
214 ? RECONNECT_DEFAULT_PROBE_INTERVAL
219 /* Sets 'svr''s current set of remotes to the names in 'new_remotes', with
220 * options in the struct ovsdb_jsonrpc_options supplied as the data values.
222 * A remote is an active or passive stream connection method, e.g. "pssl:" or
225 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server
*svr
,
226 const struct shash
*new_remotes
)
228 struct shash_node
*node
, *next
;
230 SHASH_FOR_EACH_SAFE (node
, next
, &svr
->remotes
) {
231 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
232 struct ovsdb_jsonrpc_options
*options
233 = shash_find_data(new_remotes
, node
->name
);
236 VLOG_INFO("%s: remote deconfigured", node
->name
);
237 ovsdb_jsonrpc_server_del_remote(node
);
238 } else if (options
->dscp
!= remote
->dscp
) {
239 ovsdb_jsonrpc_server_del_remote(node
);
242 SHASH_FOR_EACH (node
, new_remotes
) {
243 const struct ovsdb_jsonrpc_options
*options
= node
->data
;
244 struct ovsdb_jsonrpc_remote
*remote
;
246 remote
= shash_find_data(&svr
->remotes
, node
->name
);
248 remote
= ovsdb_jsonrpc_server_add_remote(svr
, node
->name
, options
);
254 ovsdb_jsonrpc_session_set_all_options(remote
, options
);
258 static struct ovsdb_jsonrpc_remote
*
259 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server
*svr
,
261 const struct ovsdb_jsonrpc_options
*options
)
263 struct ovsdb_jsonrpc_remote
*remote
;
264 struct pstream
*listener
;
267 error
= jsonrpc_pstream_open(name
, &listener
, options
->dscp
);
268 if (error
&& error
!= EAFNOSUPPORT
) {
269 VLOG_ERR_RL(&rl
, "%s: listen failed: %s", name
, ovs_strerror(error
));
273 remote
= xmalloc(sizeof *remote
);
274 remote
->server
= svr
;
275 remote
->listener
= listener
;
276 ovs_list_init(&remote
->sessions
);
277 remote
->dscp
= options
->dscp
;
278 remote
->read_only
= options
->read_only
;
279 remote
->role
= nullable_xstrdup(options
->role
);
280 shash_add(&svr
->remotes
, name
, remote
);
283 ovsdb_jsonrpc_session_create(remote
, jsonrpc_session_open(name
, true),
284 svr
->read_only
|| remote
->read_only
);
290 ovsdb_jsonrpc_server_del_remote(struct shash_node
*node
)
292 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
294 ovsdb_jsonrpc_session_close_all(remote
);
295 pstream_close(remote
->listener
);
296 shash_delete(&remote
->server
->remotes
, node
);
301 /* Stores status information for the remote named 'target', which should have
302 * been configured on 'svr' with a call to ovsdb_jsonrpc_server_set_remotes(),
303 * into '*status'. On success returns true, on failure (if 'svr' doesn't have
304 * a remote named 'target' or if that remote is an outbound remote that has no
305 * active connections) returns false. On failure, 'status' will be zeroed.
308 ovsdb_jsonrpc_server_get_remote_status(
309 const struct ovsdb_jsonrpc_server
*svr
, const char *target
,
310 struct ovsdb_jsonrpc_remote_status
*status
)
312 const struct ovsdb_jsonrpc_remote
*remote
;
314 memset(status
, 0, sizeof *status
);
316 remote
= shash_find_data(&svr
->remotes
, target
);
322 if (remote
->listener
) {
323 status
->bound_port
= pstream_get_bound_port(remote
->listener
);
324 status
->is_connected
= !ovs_list_is_empty(&remote
->sessions
);
325 status
->n_connections
= ovs_list_size(&remote
->sessions
);
329 return ovsdb_jsonrpc_active_session_get_status(remote
, status
);
333 ovsdb_jsonrpc_server_free_remote_status(
334 struct ovsdb_jsonrpc_remote_status
*status
)
336 free(status
->locks_held
);
337 free(status
->locks_waiting
);
338 free(status
->locks_lost
);
341 /* Makes all of the JSON-RPC sessions managed by 'svr' disconnect. (They
342 * will then generally reconnect.) Uses 'comment' as a human-readable comment
343 * for logging (it may be NULL to suppress logging). Frees 'comment'.
345 * If 'force' is true, disconnects all sessions. Otherwise, disconnects only
346 * sessions that aren't database change aware. */
348 ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server
*svr
, bool force
,
351 struct shash_node
*node
;
353 SHASH_FOR_EACH (node
, &svr
->remotes
) {
354 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
356 ovsdb_jsonrpc_session_reconnect_all(remote
, force
, comment
);
363 ovsdb_jsonrpc_server_set_read_only(struct ovsdb_jsonrpc_server
*svr
,
366 if (svr
->read_only
!= read_only
) {
367 svr
->read_only
= read_only
;
368 ovsdb_jsonrpc_server_reconnect(svr
, false,
370 ? "making server read-only"
371 : "making server read/write"));
376 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server
*svr
)
378 struct shash_node
*node
;
380 SHASH_FOR_EACH (node
, &svr
->remotes
) {
381 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
383 if (remote
->listener
) {
384 struct stream
*stream
;
387 error
= pstream_accept(remote
->listener
, &stream
);
389 struct jsonrpc_session
*js
;
390 js
= jsonrpc_session_open_unreliably(jsonrpc_open(stream
),
392 ovsdb_jsonrpc_session_create(remote
, js
, svr
->read_only
||
394 } else if (error
!= EAGAIN
) {
395 VLOG_WARN_RL(&rl
, "%s: accept failed: %s",
396 pstream_get_name(remote
->listener
),
397 ovs_strerror(error
));
401 ovsdb_jsonrpc_session_run_all(remote
);
406 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server
*svr
)
408 struct shash_node
*node
;
410 SHASH_FOR_EACH (node
, &svr
->remotes
) {
411 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
413 if (remote
->listener
) {
414 pstream_wait(remote
->listener
);
417 ovsdb_jsonrpc_session_wait_all(remote
);
421 /* Adds some memory usage statistics for 'svr' into 'usage', for use with
422 * memory_report(). */
424 ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server
*svr
,
427 struct shash_node
*node
;
429 simap_increase(usage
, "sessions", svr
->n_sessions
);
430 SHASH_FOR_EACH (node
, &svr
->remotes
) {
431 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
433 ovsdb_jsonrpc_session_get_memory_usage_all(remote
, usage
);
437 /* JSON-RPC database server session. */
439 struct ovsdb_jsonrpc_session
{
440 struct ovs_list node
; /* Element in remote's sessions list. */
441 struct ovsdb_session up
;
442 struct ovsdb_jsonrpc_remote
*remote
;
444 /* RFC 7047 does not contemplate how to alert clients to changes to the set
445 * of databases, e.g. databases that are added or removed while the
446 * database server is running. Traditionally, ovsdb-server disconnects all
447 * of its clients when this happens; a well-written client will reassess
448 * what is available from the server upon reconnection.
450 * OVS 2.9 introduces a way for clients to monitor changes to the databases
451 * being served, through the Database table in the _Server database that
452 * OVSDB adds in this version. ovsdb-server suppresses the connection
453 * close for clients that identify themselves as taking advantage of this
454 * mechanism. When this member is true, it indicates that the client
455 * requested such suppression. */
456 bool db_change_aware
;
459 struct hmap triggers
; /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
462 struct hmap monitors
; /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
464 /* Network connectivity. */
465 struct jsonrpc_session
*js
; /* JSON-RPC session. */
466 unsigned int js_seqno
; /* Last jsonrpc_session_get_seqno() value. */
469 bool read_only
; /* When true, not allow to modify the
473 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session
*);
474 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session
*);
475 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session
*);
476 static void ovsdb_jsonrpc_session_get_memory_usage(
477 const struct ovsdb_jsonrpc_session
*, struct simap
*usage
);
478 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session
*,
479 struct jsonrpc_msg
*);
480 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session
*,
481 struct jsonrpc_msg
*);
483 static struct ovsdb_jsonrpc_session
*
484 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote
*remote
,
485 struct jsonrpc_session
*js
, bool read_only
)
487 struct ovsdb_jsonrpc_session
*s
;
489 s
= xzalloc(sizeof *s
);
490 ovsdb_session_init(&s
->up
, &remote
->server
->up
);
492 ovs_list_push_back(&remote
->sessions
, &s
->node
);
493 hmap_init(&s
->triggers
);
494 hmap_init(&s
->monitors
);
496 s
->js_seqno
= jsonrpc_session_get_seqno(js
);
497 s
->read_only
= read_only
;
499 remote
->server
->n_sessions
++;
504 /* Database 'db' is about to be removed from the database server. To prepare,
505 * this function removes all references to 'db' from 'remote'. */
507 ovsdb_jsonrpc_session_preremove_db(struct ovsdb_jsonrpc_remote
*remote
,
510 struct ovsdb_jsonrpc_session
*s
;
512 LIST_FOR_EACH (s
, node
, &remote
->sessions
) {
513 ovsdb_jsonrpc_monitor_preremove_db(s
, db
);
514 ovsdb_jsonrpc_trigger_preremove_db(s
, db
);
519 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session
*s
)
521 ovsdb_jsonrpc_monitor_remove_all(s
);
522 ovsdb_jsonrpc_session_unlock_all(s
);
523 ovsdb_jsonrpc_trigger_complete_all(s
);
525 hmap_destroy(&s
->monitors
);
526 hmap_destroy(&s
->triggers
);
528 jsonrpc_session_close(s
->js
);
529 ovs_list_remove(&s
->node
);
530 s
->remote
->server
->n_sessions
--;
531 ovsdb_session_destroy(&s
->up
);
536 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session
*s
)
538 jsonrpc_session_run(s
->js
);
539 if (s
->js_seqno
!= jsonrpc_session_get_seqno(s
->js
)) {
540 s
->js_seqno
= jsonrpc_session_get_seqno(s
->js
);
541 ovsdb_jsonrpc_trigger_complete_all(s
);
542 ovsdb_jsonrpc_monitor_remove_all(s
);
543 ovsdb_jsonrpc_session_unlock_all(s
);
546 ovsdb_jsonrpc_trigger_complete_done(s
);
548 if (!jsonrpc_session_get_backlog(s
->js
)) {
549 struct jsonrpc_msg
*msg
;
551 ovsdb_jsonrpc_monitor_flush_all(s
);
553 msg
= jsonrpc_session_recv(s
->js
);
555 if (msg
->type
== JSONRPC_REQUEST
) {
556 ovsdb_jsonrpc_session_got_request(s
, msg
);
557 } else if (msg
->type
== JSONRPC_NOTIFY
) {
558 ovsdb_jsonrpc_session_got_notify(s
, msg
);
560 VLOG_WARN("%s: received unexpected %s message",
561 jsonrpc_session_get_name(s
->js
),
562 jsonrpc_msg_type_to_string(msg
->type
));
563 jsonrpc_session_force_reconnect(s
->js
);
564 jsonrpc_msg_destroy(msg
);
568 return jsonrpc_session_is_alive(s
->js
) ? 0 : ETIMEDOUT
;
572 ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session
*session
,
573 const struct ovsdb_jsonrpc_options
*options
)
575 jsonrpc_session_set_max_backoff(session
->js
, options
->max_backoff
);
576 jsonrpc_session_set_probe_interval(session
->js
, options
->probe_interval
);
577 jsonrpc_session_set_dscp(session
->js
, options
->dscp
);
581 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote
*remote
)
583 struct ovsdb_jsonrpc_session
*s
, *next
;
585 LIST_FOR_EACH_SAFE (s
, next
, node
, &remote
->sessions
) {
586 int error
= ovsdb_jsonrpc_session_run(s
);
588 ovsdb_jsonrpc_session_close(s
);
594 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session
*s
)
596 jsonrpc_session_wait(s
->js
);
597 if (!jsonrpc_session_get_backlog(s
->js
)) {
598 if (ovsdb_jsonrpc_monitor_needs_flush(s
)) {
599 poll_immediate_wake();
601 jsonrpc_session_recv_wait(s
->js
);
607 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote
*remote
)
609 struct ovsdb_jsonrpc_session
*s
;
611 LIST_FOR_EACH (s
, node
, &remote
->sessions
) {
612 ovsdb_jsonrpc_session_wait(s
);
617 ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session
*s
,
620 simap_increase(usage
, "triggers", hmap_count(&s
->triggers
));
621 simap_increase(usage
, "backlog", jsonrpc_session_get_backlog(s
->js
));
625 ovsdb_jsonrpc_session_get_memory_usage_all(
626 const struct ovsdb_jsonrpc_remote
*remote
,
629 struct ovsdb_jsonrpc_session
*s
;
631 LIST_FOR_EACH (s
, node
, &remote
->sessions
) {
632 ovsdb_jsonrpc_session_get_memory_usage(s
, usage
);
637 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote
*remote
)
639 struct ovsdb_jsonrpc_session
*s
, *next
;
641 LIST_FOR_EACH_SAFE (s
, next
, node
, &remote
->sessions
) {
642 ovsdb_jsonrpc_session_close(s
);
646 /* Makes all of the JSON-RPC sessions managed by 'remote' disconnect. (They
647 * will then generally reconnect.) 'comment' should be a human-readable
648 * explanation of the reason for disconnection, for use in log messages, or
649 * NULL to suppress logging.
651 * If 'force' is true, disconnects all sessions. Otherwise, disconnects only
652 * sessions that aren't database change aware. */
654 ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote
*remote
,
655 bool force
, const char *comment
)
657 struct ovsdb_jsonrpc_session
*s
, *next
;
659 LIST_FOR_EACH_SAFE (s
, next
, node
, &remote
->sessions
) {
660 if (force
|| !s
->db_change_aware
) {
661 jsonrpc_session_force_reconnect(s
->js
);
662 if (comment
&& jsonrpc_session_is_connected(s
->js
)) {
663 VLOG_INFO("%s: disconnecting (%s)",
664 jsonrpc_session_get_name(s
->js
), comment
);
666 if (!jsonrpc_session_is_alive(s
->js
)) {
667 ovsdb_jsonrpc_session_close(s
);
673 /* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
676 * (The dscp value can't be changed directly; the caller must instead close and
677 * re-open the session.) */
679 ovsdb_jsonrpc_session_set_all_options(
680 struct ovsdb_jsonrpc_remote
*remote
,
681 const struct ovsdb_jsonrpc_options
*options
)
683 struct ovsdb_jsonrpc_session
*s
;
685 LIST_FOR_EACH (s
, node
, &remote
->sessions
) {
686 ovsdb_jsonrpc_session_set_options(s
, options
);
690 /* Sets the 'status' for the 'remote' with an outgoing connection. */
692 ovsdb_jsonrpc_active_session_get_status(
693 const struct ovsdb_jsonrpc_remote
*remote
,
694 struct ovsdb_jsonrpc_remote_status
*status
)
696 const struct ovs_list
*sessions
= &remote
->sessions
;
697 const struct ovsdb_jsonrpc_session
*s
;
699 if (ovs_list_is_empty(sessions
)) {
703 ovs_assert(ovs_list_is_singleton(sessions
));
704 s
= CONTAINER_OF(ovs_list_front(sessions
), struct ovsdb_jsonrpc_session
, node
);
705 ovsdb_jsonrpc_session_get_status(s
, status
);
706 status
->n_connections
= 1;
712 ovsdb_jsonrpc_session_get_status(const struct ovsdb_jsonrpc_session
*session
,
713 struct ovsdb_jsonrpc_remote_status
*status
)
715 const struct ovsdb_jsonrpc_session
*s
= session
;
716 const struct jsonrpc_session
*js
;
717 struct ovsdb_lock_waiter
*waiter
;
718 struct reconnect_stats rstats
;
719 struct ds locks_held
, locks_waiting
, locks_lost
;
723 status
->is_connected
= jsonrpc_session_is_connected(js
);
724 status
->last_error
= jsonrpc_session_get_status(js
);
726 jsonrpc_session_get_reconnect_stats(js
, &rstats
);
727 status
->state
= rstats
.state
;
728 status
->sec_since_connect
= rstats
.msec_since_connect
== UINT_MAX
729 ? UINT_MAX
: rstats
.msec_since_connect
/ 1000;
730 status
->sec_since_disconnect
= rstats
.msec_since_disconnect
== UINT_MAX
731 ? UINT_MAX
: rstats
.msec_since_disconnect
/ 1000;
733 ds_init(&locks_held
);
734 ds_init(&locks_waiting
);
735 ds_init(&locks_lost
);
736 HMAP_FOR_EACH (waiter
, session_node
, &s
->up
.waiters
) {
739 string
= (ovsdb_lock_waiter_is_owner(waiter
) ? &locks_held
740 : waiter
->mode
== OVSDB_LOCK_WAIT
? &locks_waiting
742 if (string
->length
) {
743 ds_put_char(string
, ' ');
745 ds_put_cstr(string
, waiter
->lock_name
);
747 status
->locks_held
= ds_steal_cstr(&locks_held
);
748 status
->locks_waiting
= ds_steal_cstr(&locks_waiting
);
749 status
->locks_lost
= ds_steal_cstr(&locks_lost
);
752 /* Examines 'request' to determine the database to which it relates, and then
753 * searches 's' to find that database:
755 * - If successful, returns the database and sets '*replyp' to NULL.
757 * - If no such database exists, returns NULL and sets '*replyp' to an
758 * appropriate JSON-RPC error reply, owned by the caller. */
759 static struct ovsdb
*
760 ovsdb_jsonrpc_lookup_db(const struct ovsdb_jsonrpc_session
*s
,
761 const struct jsonrpc_msg
*request
,
762 struct jsonrpc_msg
**replyp
)
764 struct json_array
*params
;
765 struct ovsdb_error
*error
;
769 params
= json_array(request
->params
);
770 if (!params
->n
|| params
->elems
[0]->type
!= JSON_STRING
) {
771 error
= ovsdb_syntax_error(
772 request
->params
, NULL
,
773 "%s request params must begin with <db-name>", request
->method
);
777 db_name
= params
->elems
[0]->string
;
778 db
= shash_find_data(&s
->up
.server
->dbs
, db_name
);
780 error
= ovsdb_syntax_error(
781 request
->params
, "unknown database",
782 "%s request specifies unknown database %s",
783 request
->method
, db_name
);
788 error
= ovsdb_error("database not available",
789 "%s request specifies database %s which is not "
790 "yet available because it has not completed "
791 "joining its cluster",
792 request
->method
, db_name
);
800 *replyp
= jsonrpc_create_error(ovsdb_error_to_json_free(error
),
805 static struct ovsdb_error
*
806 ovsdb_jsonrpc_session_parse_lock_name(const struct jsonrpc_msg
*request
,
807 const char **lock_namep
)
809 const struct json_array
*params
;
811 params
= json_array(request
->params
);
812 if (params
->n
!= 1 || params
->elems
[0]->type
!= JSON_STRING
||
813 !ovsdb_parser_is_id(json_string(params
->elems
[0]))) {
815 return ovsdb_syntax_error(request
->params
, NULL
,
816 "%s request params must be <id>",
820 *lock_namep
= json_string(params
->elems
[0]);
825 ovsdb_jsonrpc_session_notify(struct ovsdb_session
*session
,
826 const char *lock_name
,
829 struct ovsdb_jsonrpc_session
*s
;
832 s
= CONTAINER_OF(session
, struct ovsdb_jsonrpc_session
, up
);
833 params
= json_array_create_1(json_string_create(lock_name
));
834 ovsdb_jsonrpc_session_send(s
, jsonrpc_create_notify(method
, params
));
837 static struct jsonrpc_msg
*
838 jsonrpc_create_readonly_lock_error(const struct json
*id
)
840 return jsonrpc_create_error(json_string_create(
841 "lock and unlock methods not allowed,"
842 " DB server is read only."), id
);
845 static struct jsonrpc_msg
*
846 ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session
*s
,
847 struct jsonrpc_msg
*request
,
848 enum ovsdb_lock_mode mode
)
850 struct ovsdb_lock_waiter
*waiter
;
851 struct ovsdb_error
*error
;
852 struct ovsdb_session
*victim
;
853 const char *lock_name
;
857 return jsonrpc_create_readonly_lock_error(request
->id
);
860 error
= ovsdb_jsonrpc_session_parse_lock_name(request
, &lock_name
);
865 /* Report error if this session has issued a "lock" or "steal" without a
866 * matching "unlock" for this lock. */
867 waiter
= ovsdb_session_get_lock_waiter(&s
->up
, lock_name
);
869 error
= ovsdb_syntax_error(
870 request
->params
, NULL
,
871 "must issue \"unlock\" before new \"%s\"", request
->method
);
875 /* Get the lock, add us as a waiter. */
876 waiter
= ovsdb_server_lock(&s
->remote
->server
->up
, &s
->up
, lock_name
, mode
,
879 ovsdb_jsonrpc_session_notify(victim
, lock_name
, "stolen");
882 result
= json_object_create();
883 json_object_put(result
, "locked",
884 json_boolean_create(ovsdb_lock_waiter_is_owner(waiter
)));
886 return jsonrpc_create_reply(result
, request
->id
);
889 return jsonrpc_create_error(ovsdb_error_to_json_free(error
), request
->id
);
893 ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session
*s
)
895 struct ovsdb_lock_waiter
*waiter
, *next
;
897 HMAP_FOR_EACH_SAFE (waiter
, next
, session_node
, &s
->up
.waiters
) {
898 ovsdb_jsonrpc_session_unlock__(waiter
);
903 ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter
*waiter
)
905 struct ovsdb_lock
*lock
= waiter
->lock
;
908 struct ovsdb_session
*new_owner
= ovsdb_lock_waiter_remove(waiter
);
910 ovsdb_jsonrpc_session_notify(new_owner
, lock
->name
, "locked");
912 /* ovsdb_server_lock() might have freed 'lock'. */
916 ovsdb_lock_waiter_destroy(waiter
);
919 static struct jsonrpc_msg
*
920 syntax_error_reply(const struct jsonrpc_msg
*request
, const char *details
)
922 struct ovsdb_error
*error
= ovsdb_syntax_error(
923 request
->params
, NULL
, "%s: %s", request
->method
, details
);
924 struct jsonrpc_msg
*msg
= jsonrpc_create_error(ovsdb_error_to_json(error
),
926 ovsdb_error_destroy(error
);
930 static struct jsonrpc_msg
*
931 ovsdb_jsonrpc_session_unlock(struct ovsdb_jsonrpc_session
*s
,
932 struct jsonrpc_msg
*request
)
934 struct ovsdb_lock_waiter
*waiter
;
935 struct ovsdb_error
*error
;
936 const char *lock_name
;
939 return jsonrpc_create_readonly_lock_error(request
->id
);
942 error
= ovsdb_jsonrpc_session_parse_lock_name(request
, &lock_name
);
944 return jsonrpc_create_error(ovsdb_error_to_json_free(error
),
948 /* Report error if this session has not issued a "lock" or "steal" for this
950 waiter
= ovsdb_session_get_lock_waiter(&s
->up
, lock_name
);
952 return syntax_error_reply(request
,
953 "\"unlock\" without \"lock\" or \"steal\"");
956 ovsdb_jsonrpc_session_unlock__(waiter
);
958 return jsonrpc_create_reply(json_object_create(), request
->id
);
961 static struct jsonrpc_msg
*
962 ovsdb_jsonrpc_session_set_db_change_aware(struct ovsdb_jsonrpc_session
*s
,
963 const struct jsonrpc_msg
*request
)
965 const struct json_array
*params
= json_array(request
->params
);
967 || (params
->elems
[0]->type
!= JSON_TRUE
&&
968 params
->elems
[0]->type
!= JSON_FALSE
)) {
969 return syntax_error_reply(request
, "true or false parameter expected");
972 s
->db_change_aware
= json_boolean(params
->elems
[0]);
973 return jsonrpc_create_reply(json_object_create(), request
->id
);
977 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session
*s
,
978 struct jsonrpc_msg
*request
)
980 struct jsonrpc_msg
*reply
;
982 if (!strcmp(request
->method
, "transact") ||
983 !strcmp(request
->method
, "convert")) {
984 struct ovsdb
*db
= ovsdb_jsonrpc_lookup_db(s
, request
, &reply
);
986 ovsdb_jsonrpc_trigger_create(s
, db
, request
);
988 } else if (!strcmp(request
->method
, "monitor") ||
989 (monitor_cond_enable__
&& !strcmp(request
->method
,
991 struct ovsdb
*db
= ovsdb_jsonrpc_lookup_db(s
, request
, &reply
);
993 int l
= strlen(request
->method
) - strlen("monitor");
994 enum ovsdb_monitor_version version
= l
? OVSDB_MONITOR_V2
996 reply
= ovsdb_jsonrpc_monitor_create(s
, db
, request
->params
,
997 version
, request
->id
);
999 } else if (!strcmp(request
->method
, "monitor_cond_change")) {
1000 reply
= ovsdb_jsonrpc_monitor_cond_change(s
, request
->params
,
1002 } else if (!strcmp(request
->method
, "monitor_cancel")) {
1003 reply
= ovsdb_jsonrpc_monitor_cancel(s
, json_array(request
->params
),
1005 } else if (!strcmp(request
->method
, "get_schema")) {
1006 struct ovsdb
*db
= ovsdb_jsonrpc_lookup_db(s
, request
, &reply
);
1008 reply
= jsonrpc_create_reply(ovsdb_schema_to_json(db
->schema
),
1011 } else if (!strcmp(request
->method
, "list_dbs")) {
1012 size_t n_dbs
= shash_count(&s
->up
.server
->dbs
);
1013 struct shash_node
*node
;
1017 dbs
= xmalloc(n_dbs
* sizeof *dbs
);
1019 SHASH_FOR_EACH (node
, &s
->up
.server
->dbs
) {
1020 dbs
[i
++] = json_string_create(node
->name
);
1022 reply
= jsonrpc_create_reply(json_array_create(dbs
, n_dbs
),
1024 } else if (!strcmp(request
->method
, "get_server_id")) {
1025 const struct uuid
*uuid
= &s
->up
.server
->uuid
;
1026 struct json
*result
;
1028 result
= json_string_create_nocopy(xasprintf(UUID_FMT
,
1030 reply
= jsonrpc_create_reply(result
, request
->id
);
1031 } else if (!strcmp(request
->method
, "lock")) {
1032 reply
= ovsdb_jsonrpc_session_lock(s
, request
, OVSDB_LOCK_WAIT
);
1033 } else if (!strcmp(request
->method
, "steal")) {
1034 reply
= ovsdb_jsonrpc_session_lock(s
, request
, OVSDB_LOCK_STEAL
);
1035 } else if (!strcmp(request
->method
, "unlock")) {
1036 reply
= ovsdb_jsonrpc_session_unlock(s
, request
);
1037 } else if (!strcmp(request
->method
, "set_db_change_aware")) {
1038 reply
= ovsdb_jsonrpc_session_set_db_change_aware(s
, request
);
1039 } else if (!strcmp(request
->method
, "echo")) {
1040 reply
= jsonrpc_create_reply(json_clone(request
->params
), request
->id
);
1042 reply
= jsonrpc_create_error(json_string_create("unknown method"),
1047 jsonrpc_msg_destroy(request
);
1048 ovsdb_jsonrpc_session_send(s
, reply
);
1053 execute_cancel(struct ovsdb_jsonrpc_session
*s
, struct jsonrpc_msg
*request
)
1055 if (json_array(request
->params
)->n
== 1) {
1056 struct ovsdb_jsonrpc_trigger
*t
;
1059 id
= request
->params
->array
.elems
[0];
1060 t
= ovsdb_jsonrpc_trigger_find(s
, id
, json_hash(id
, 0));
1062 ovsdb_jsonrpc_trigger_complete(t
);
1068 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session
*s
,
1069 struct jsonrpc_msg
*request
)
1071 if (!strcmp(request
->method
, "cancel")) {
1072 execute_cancel(s
, request
);
1074 jsonrpc_msg_destroy(request
);
1078 ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session
*s
,
1079 struct jsonrpc_msg
*msg
)
1081 ovsdb_jsonrpc_monitor_flush_all(s
);
1082 jsonrpc_session_send(s
->js
, msg
);
1085 /* JSON-RPC database server triggers.
1087 * (Every transaction is treated as a trigger even if it doesn't actually have
1088 * any "wait" operations.) */
1090 struct ovsdb_jsonrpc_trigger
{
1091 struct ovsdb_trigger trigger
;
1092 struct hmap_node hmap_node
; /* In session's "triggers" hmap. */
1097 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session
*s
, struct ovsdb
*db
,
1098 struct jsonrpc_msg
*request
)
1100 /* Check for duplicate ID. */
1101 size_t hash
= json_hash(request
->id
, 0);
1102 struct ovsdb_jsonrpc_trigger
*t
1103 = ovsdb_jsonrpc_trigger_find(s
, request
->id
, hash
);
1105 ovsdb_jsonrpc_session_send(
1106 s
, syntax_error_reply(request
, "duplicate request ID"));
1107 jsonrpc_msg_destroy(request
);
1111 /* Insert into trigger table. */
1112 t
= xmalloc(sizeof *t
);
1113 bool disconnect_all
= ovsdb_trigger_init(
1114 &s
->up
, db
, &t
->trigger
, request
, time_msec(), s
->read_only
,
1115 s
->remote
->role
, jsonrpc_session_get_id(s
->js
));
1116 t
->id
= json_clone(request
->id
);
1117 hmap_insert(&s
->triggers
, &t
->hmap_node
, hash
);
1119 /* Complete early if possible. */
1120 if (ovsdb_trigger_is_complete(&t
->trigger
)) {
1121 ovsdb_jsonrpc_trigger_complete(t
);
1124 if (disconnect_all
) {
1125 /* The message below is currently the only reason to disconnect all
1127 ovsdb_jsonrpc_server_reconnect(s
->remote
->server
, false,
1128 xasprintf("committed %s database "
1129 "schema conversion",
1134 static struct ovsdb_jsonrpc_trigger
*
1135 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session
*s
,
1136 const struct json
*id
, size_t hash
)
1138 struct ovsdb_jsonrpc_trigger
*t
;
1140 HMAP_FOR_EACH_WITH_HASH (t
, hmap_node
, hash
, &s
->triggers
) {
1141 if (json_equal(t
->id
, id
)) {
1150 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger
*t
)
1152 struct ovsdb_jsonrpc_session
*s
;
1154 s
= CONTAINER_OF(t
->trigger
.session
, struct ovsdb_jsonrpc_session
, up
);
1156 if (jsonrpc_session_is_connected(s
->js
)) {
1157 bool complete
= ovsdb_trigger_is_complete(&t
->trigger
);
1158 if (s
->db_change_aware
&& !complete
) {
1159 ovsdb_trigger_cancel(&t
->trigger
, "closing JSON-RPC session");
1163 struct jsonrpc_msg
*reply
= ovsdb_trigger_steal_reply(&t
->trigger
);
1164 ovsdb_jsonrpc_session_send(s
, reply
);
1168 json_destroy(t
->id
);
1169 ovsdb_trigger_destroy(&t
->trigger
);
1170 hmap_remove(&s
->triggers
, &t
->hmap_node
);
1175 ovsdb_jsonrpc_trigger_remove__(struct ovsdb_jsonrpc_session
*s
,
1178 struct ovsdb_jsonrpc_trigger
*t
, *next
;
1179 HMAP_FOR_EACH_SAFE (t
, next
, hmap_node
, &s
->triggers
) {
1180 if (!db
|| t
->trigger
.db
== db
) {
1181 ovsdb_jsonrpc_trigger_complete(t
);
/* Prepares 's' for the removal of database 'db' from the database server by
 * completing every trigger in 's' that refers to 'db'.
 *
 * 'db' must not be NULL. */
static void
ovsdb_jsonrpc_trigger_preremove_db(struct ovsdb_jsonrpc_session *s,
                                   struct ovsdb *db)
{
    /* A null 'db' would select every trigger in the helper below. */
    ovs_assert(db);

    ovsdb_jsonrpc_trigger_remove__(s, db);
}
/* Completes and removes every trigger in 's', regardless of which database it
 * refers to. */
static void
ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
{
    struct ovsdb *any_db = NULL;    /* NULL selects all databases. */

    ovsdb_jsonrpc_trigger_remove__(s, any_db);
}
1204 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session
*s
)
1206 struct ovsdb_jsonrpc_trigger
*trigger
, *next
;
1207 LIST_FOR_EACH_SAFE (trigger
, next
, trigger
.node
, &s
->up
.completions
) {
1208 ovsdb_jsonrpc_trigger_complete(trigger
);
1212 /* Jsonrpc front end monitor. */
1213 struct ovsdb_jsonrpc_monitor
{
1214 struct hmap_node node
; /* In ovsdb_jsonrpc_session's "monitors". */
1215 struct ovsdb_jsonrpc_session
*session
;
1217 struct json
*monitor_id
;
1218 struct ovsdb_monitor
*dbmon
;
1219 uint64_t unflushed
; /* The first transaction that has not been
1220 flushed to the jsonrpc remote client. */
1221 enum ovsdb_monitor_version version
;
1222 struct ovsdb_monitor_session_condition
*condition
;/* Session's condition */
1225 static struct ovsdb_jsonrpc_monitor
*
1226 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session
*s
,
1227 const struct json
*monitor_id
)
1229 struct ovsdb_jsonrpc_monitor
*m
;
1231 HMAP_FOR_EACH_WITH_HASH (m
, node
, json_hash(monitor_id
, 0), &s
->monitors
) {
1232 if (json_equal(m
->monitor_id
, monitor_id
)) {
1241 parse_bool(struct ovsdb_parser
*parser
, const char *name
, bool default_value
)
1243 const struct json
*json
;
1245 json
= ovsdb_parser_member(parser
, name
, OP_BOOLEAN
| OP_OPTIONAL
);
1246 return json
? json_boolean(json
) : default_value
;
1249 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
1250 ovsdb_jsonrpc_parse_monitor_request(
1251 struct ovsdb_monitor
*dbmon
,
1252 const struct ovsdb_table
*table
,
1253 struct ovsdb_monitor_session_condition
*cond
,
1254 const struct json
*monitor_request
)
1256 const struct ovsdb_table_schema
*ts
= table
->schema
;
1257 enum ovsdb_monitor_selection select
;
1258 const struct json
*columns
, *select_json
, *where
= NULL
;
1259 struct ovsdb_parser parser
;
1260 struct ovsdb_error
*error
;
1262 ovsdb_parser_init(&parser
, monitor_request
, "table %s", ts
->name
);
1264 where
= ovsdb_parser_member(&parser
, "where", OP_ARRAY
| OP_OPTIONAL
);
1266 columns
= ovsdb_parser_member(&parser
, "columns", OP_ARRAY
| OP_OPTIONAL
);
1268 select_json
= ovsdb_parser_member(&parser
, "select",
1269 OP_OBJECT
| OP_OPTIONAL
);
1271 error
= ovsdb_parser_finish(&parser
);
1278 ovsdb_parser_init(&parser
, select_json
, "table %s select", ts
->name
);
1279 if (parse_bool(&parser
, "initial", true)) {
1280 select
|= OJMS_INITIAL
;
1282 if (parse_bool(&parser
, "insert", true)) {
1283 select
|= OJMS_INSERT
;
1285 if (parse_bool(&parser
, "delete", true)) {
1286 select
|= OJMS_DELETE
;
1288 if (parse_bool(&parser
, "modify", true)) {
1289 select
|= OJMS_MODIFY
;
1291 error
= ovsdb_parser_finish(&parser
);
1296 select
= OJMS_INITIAL
| OJMS_INSERT
| OJMS_DELETE
| OJMS_MODIFY
;
1299 ovsdb_monitor_table_add_select(dbmon
, table
, select
);
1303 if (columns
->type
!= JSON_ARRAY
) {
1304 return ovsdb_syntax_error(columns
, NULL
,
1305 "array of column names expected");
1308 for (i
= 0; i
< columns
->array
.n
; i
++) {
1309 const struct ovsdb_column
*column
;
1312 if (columns
->array
.elems
[i
]->type
!= JSON_STRING
) {
1313 return ovsdb_syntax_error(columns
, NULL
,
1314 "array of column names expected");
1317 s
= columns
->array
.elems
[i
]->string
;
1318 column
= shash_find_data(&table
->schema
->columns
, s
);
1320 return ovsdb_syntax_error(columns
, NULL
, "%s is not a valid "
1323 if (ovsdb_monitor_add_column(dbmon
, table
, column
,
1325 return ovsdb_syntax_error(columns
, NULL
, "column %s "
1326 "mentioned more than once",
1331 struct shash_node
*node
;
1333 SHASH_FOR_EACH (node
, &ts
->columns
) {
1334 const struct ovsdb_column
*column
= node
->data
;
1335 if (column
->index
!= OVSDB_COL_UUID
) {
1336 if (ovsdb_monitor_add_column(dbmon
, table
, column
,
1338 return ovsdb_syntax_error(columns
, NULL
, "column %s "
1339 "mentioned more than once",
1346 error
= ovsdb_monitor_table_condition_create(cond
, table
, where
);
1355 static struct jsonrpc_msg
*
1356 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session
*s
, struct ovsdb
*db
,
1357 struct json
*params
,
1358 enum ovsdb_monitor_version version
,
1359 const struct json
*request_id
)
1361 struct ovsdb_jsonrpc_monitor
*m
= NULL
;
1362 struct ovsdb_monitor
*dbmon
= NULL
;
1363 struct json
*monitor_id
, *monitor_requests
;
1364 struct ovsdb_error
*error
= NULL
;
1365 struct shash_node
*node
;
1368 if (json_array(params
)->n
!= 3) {
1369 error
= ovsdb_syntax_error(params
, NULL
, "invalid parameters");
1372 monitor_id
= params
->array
.elems
[1];
1373 monitor_requests
= params
->array
.elems
[2];
1374 if (monitor_requests
->type
!= JSON_OBJECT
) {
1375 error
= ovsdb_syntax_error(monitor_requests
, NULL
,
1376 "monitor-requests must be object");
1380 if (ovsdb_jsonrpc_monitor_find(s
, monitor_id
)) {
1381 error
= ovsdb_syntax_error(monitor_id
, NULL
, "duplicate monitor ID");
1385 m
= xzalloc(sizeof *m
);
1388 m
->dbmon
= ovsdb_monitor_create(db
, m
);
1389 if (version
== OVSDB_MONITOR_V2
) {
1390 m
->condition
= ovsdb_monitor_session_condition_create();
1393 m
->version
= version
;
1394 hmap_insert(&s
->monitors
, &m
->node
, json_hash(monitor_id
, 0));
1395 m
->monitor_id
= json_clone(monitor_id
);
1397 SHASH_FOR_EACH (node
, json_object(monitor_requests
)) {
1398 const struct ovsdb_table
*table
;
1399 const struct json
*mr_value
;
1402 table
= ovsdb_get_table(m
->db
, node
->name
);
1404 error
= ovsdb_syntax_error(NULL
, NULL
,
1405 "no table named %s", node
->name
);
1409 ovsdb_monitor_add_table(m
->dbmon
, table
);
1411 /* Parse columns. */
1412 mr_value
= node
->data
;
1413 if (mr_value
->type
== JSON_ARRAY
) {
1414 const struct json_array
*array
= &mr_value
->array
;
1416 for (i
= 0; i
< array
->n
; i
++) {
1417 error
= ovsdb_jsonrpc_parse_monitor_request(m
->dbmon
,
1426 error
= ovsdb_jsonrpc_parse_monitor_request(m
->dbmon
,
1436 dbmon
= ovsdb_monitor_add(m
->dbmon
);
1437 if (dbmon
!= m
->dbmon
) {
1438 /* Found an exisiting dbmon, reuse the current one. */
1439 ovsdb_monitor_remove_jsonrpc_monitor(m
->dbmon
, m
, m
->unflushed
);
1440 ovsdb_monitor_add_jsonrpc_monitor(dbmon
, m
);
1444 /* Only now we can bind session's condition to ovsdb_monitor */
1446 ovsdb_monitor_condition_bind(m
->dbmon
, m
->condition
);
1449 ovsdb_monitor_get_initial(m
->dbmon
);
1450 json
= ovsdb_jsonrpc_monitor_compose_update(m
, true);
1451 json
= json
? json
: json_object_create();
1452 return jsonrpc_create_reply(json
, request_id
);
1456 ovsdb_jsonrpc_monitor_destroy(m
, false);
1459 return jsonrpc_create_error(ovsdb_error_to_json_free(error
), request_id
);
1462 static struct ovsdb_error
*
1463 ovsdb_jsonrpc_parse_monitor_cond_change_request(
1464 struct ovsdb_jsonrpc_monitor
*m
,
1465 const struct ovsdb_table
*table
,
1466 const struct json
*cond_change_req
)
1468 const struct ovsdb_table_schema
*ts
= table
->schema
;
1469 const struct json
*condition
, *columns
;
1470 struct ovsdb_parser parser
;
1471 struct ovsdb_error
*error
;
1473 ovsdb_parser_init(&parser
, cond_change_req
, "table %s", ts
->name
);
1474 columns
= ovsdb_parser_member(&parser
, "columns", OP_ARRAY
| OP_OPTIONAL
);
1475 condition
= ovsdb_parser_member(&parser
, "where", OP_ARRAY
| OP_OPTIONAL
);
1477 error
= ovsdb_parser_finish(&parser
);
1483 error
= ovsdb_syntax_error(cond_change_req
, NULL
, "changing columns "
1487 error
= ovsdb_monitor_table_condition_update(m
->dbmon
, m
->condition
, table
,
1493 static struct jsonrpc_msg
*
1494 ovsdb_jsonrpc_monitor_cond_change(struct ovsdb_jsonrpc_session
*s
,
1495 struct json
*params
,
1496 const struct json
*request_id
)
1498 struct ovsdb_error
*error
;
1499 struct ovsdb_jsonrpc_monitor
*m
;
1500 struct json
*monitor_cond_change_reqs
;
1501 struct shash_node
*node
;
1503 if (json_array(params
)->n
!= 3) {
1504 error
= ovsdb_syntax_error(params
, NULL
, "invalid parameters");
1508 m
= ovsdb_jsonrpc_monitor_find(s
, params
->array
.elems
[0]);
1510 error
= ovsdb_syntax_error(params
->array
.elems
[0], NULL
,
1511 "unknown monitor session");
1515 const struct json
*new_monitor_id
= params
->array
.elems
[1];
1516 bool changing_id
= !json_equal(m
->monitor_id
, new_monitor_id
);
1517 if (changing_id
&& ovsdb_jsonrpc_monitor_find(s
, new_monitor_id
)) {
1518 error
= ovsdb_syntax_error(new_monitor_id
, NULL
,
1519 "duplicate monitor ID");
1523 monitor_cond_change_reqs
= params
->array
.elems
[2];
1524 if (monitor_cond_change_reqs
->type
!= JSON_OBJECT
) {
1526 ovsdb_syntax_error(NULL
, NULL
,
1527 "monitor-cond-change-requests must be object");
1531 SHASH_FOR_EACH (node
, json_object(monitor_cond_change_reqs
)) {
1532 const struct ovsdb_table
*table
;
1533 const struct json
*mr_value
;
1536 table
= ovsdb_get_table(m
->db
, node
->name
);
1538 error
= ovsdb_syntax_error(NULL
, NULL
,
1539 "no table named %s", node
->name
);
1542 if (!ovsdb_monitor_table_exists(m
->dbmon
, table
)) {
1543 error
= ovsdb_syntax_error(NULL
, NULL
,
1544 "no table named %s in monitor session",
1549 mr_value
= node
->data
;
1550 if (mr_value
->type
== JSON_ARRAY
) {
1551 const struct json_array
*array
= &mr_value
->array
;
1553 for (i
= 0; i
< array
->n
; i
++) {
1554 error
= ovsdb_jsonrpc_parse_monitor_cond_change_request(
1555 m
, table
, array
->elems
[i
]);
1561 error
= ovsdb_syntax_error(
1563 "table %s no monitor-cond-change JSON array",
1570 hmap_remove(&s
->monitors
, &m
->node
);
1571 json_destroy(m
->monitor_id
);
1572 m
->monitor_id
= json_clone(new_monitor_id
);
1573 hmap_insert(&s
->monitors
, &m
->node
, json_hash(m
->monitor_id
, 0));
1576 /* Send the new update, if any, represents the difference from the old
1577 * condition and the new one. */
1578 struct json
*update_json
;
1580 update_json
= ovsdb_monitor_get_update(m
->dbmon
, false, true,
1581 m
->condition
, m
->version
, &m
->unflushed
);
1583 struct jsonrpc_msg
*msg
;
1586 p
= json_array_create_2(json_clone(m
->monitor_id
), update_json
);
1587 msg
= ovsdb_jsonrpc_create_notify(m
, p
);
1588 jsonrpc_session_send(s
->js
, msg
);
1591 return jsonrpc_create_reply(json_object_create(), request_id
);
1594 return jsonrpc_create_error(ovsdb_error_to_json_free(error
), request_id
);
1597 static struct jsonrpc_msg
*
1598 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session
*s
,
1599 struct json_array
*params
,
1600 const struct json
*request_id
)
1602 if (params
->n
!= 1) {
1603 return jsonrpc_create_error(json_string_create("invalid parameters"),
1606 struct ovsdb_jsonrpc_monitor
*m
;
1608 m
= ovsdb_jsonrpc_monitor_find(s
, params
->elems
[0]);
1610 return jsonrpc_create_error(json_string_create("unknown monitor"),
1613 ovsdb_jsonrpc_monitor_destroy(m
, false);
1614 return jsonrpc_create_reply(json_object_create(), request_id
);
1619 /* Database 'db' is about to be removed from the database server. To prepare,
1620 * this function removes all references from monitors in 's' to 'db'. */
1622 ovsdb_jsonrpc_monitor_preremove_db(struct ovsdb_jsonrpc_session
*s
,
1627 struct ovsdb_jsonrpc_monitor
*m
, *next
;
1628 HMAP_FOR_EACH_SAFE (m
, next
, node
, &s
->monitors
) {
1630 ovsdb_jsonrpc_monitor_destroy(m
, true);
1635 /* Cancels all monitors in 's'. */
1637 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session
*s
)
1639 struct ovsdb_jsonrpc_monitor
*m
, *next
;
1641 HMAP_FOR_EACH_SAFE (m
, next
, node
, &s
->monitors
) {
1642 ovsdb_jsonrpc_monitor_destroy(m
, false);
1646 static struct json
*
1647 ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor
*m
,
1651 if (!ovsdb_monitor_needs_flush(m
->dbmon
, m
->unflushed
)) {
1655 return ovsdb_monitor_get_update(m
->dbmon
, initial
, false,
1656 m
->condition
, m
->version
, &m
->unflushed
);
1660 ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session
*s
)
1662 struct ovsdb_jsonrpc_monitor
*m
;
1664 HMAP_FOR_EACH (m
, node
, &s
->monitors
) {
1665 if (ovsdb_monitor_needs_flush(m
->dbmon
, m
->unflushed
)) {
1674 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_jsonrpc_monitor
*m
,
1675 bool notify_cancellation
)
1677 if (notify_cancellation
) {
1678 struct ovsdb_jsonrpc_session
*s
= m
->session
;
1679 if (jsonrpc_session_is_connected(s
->js
) && s
->db_change_aware
) {
1680 struct jsonrpc_msg
*notify
= jsonrpc_create_notify(
1682 json_array_create_1(json_clone(m
->monitor_id
)));
1683 ovsdb_jsonrpc_session_send(s
, notify
);
1687 json_destroy(m
->monitor_id
);
1688 hmap_remove(&m
->session
->monitors
, &m
->node
);
1689 ovsdb_monitor_remove_jsonrpc_monitor(m
->dbmon
, m
, m
->unflushed
);
1690 ovsdb_monitor_session_condition_destroy(m
->condition
);
1694 static struct jsonrpc_msg
*
1695 ovsdb_jsonrpc_create_notify(const struct ovsdb_jsonrpc_monitor
*m
,
1696 struct json
*params
)
1700 switch(m
->version
) {
1701 case OVSDB_MONITOR_V1
:
1704 case OVSDB_MONITOR_V2
:
1707 case OVSDB_MONITOR_VERSION_MAX
:
1712 return jsonrpc_create_notify(method
, params
);
1716 ovsdb_jsonrpc_server_get_uuid(const struct ovsdb_jsonrpc_server
*s
)
1722 ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session
*s
)
1724 struct ovsdb_jsonrpc_monitor
*m
;
1726 HMAP_FOR_EACH (m
, node
, &s
->monitors
) {
1729 json
= ovsdb_jsonrpc_monitor_compose_update(m
, false);
1731 struct jsonrpc_msg
*msg
;
1732 struct json
*params
;
1734 params
= json_array_create_2(json_clone(m
->monitor_id
), json
);
1735 msg
= ovsdb_jsonrpc_create_notify(m
, params
);
1736 jsonrpc_session_send(s
->js
, msg
);
1742 ovsdb_jsonrpc_disable_monitor_cond(void)
1744 /* Once disabled, it is not possible to re-enable it. */
1745 monitor_cond_enable__
= false;