1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
18 #include "jsonrpc-server.h"
24 #include "openvswitch/dynamic-string.h"
26 #include "openvswitch/json.h"
28 #include "ovsdb-error.h"
29 #include "ovsdb-parser.h"
31 #include "condition.h"
32 #include "openvswitch/poll-loop.h"
33 #include "reconnect.h"
41 #include "transaction.h"
44 #include "openvswitch/vlog.h"
46 VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server
);
48 struct ovsdb_jsonrpc_remote
;
49 struct ovsdb_jsonrpc_session
;
51 /* Set false to defeature monitor_cond, causing jsonrpc to respond to
52 * monitor_cond method with an error. */
53 static bool monitor_cond_enable__
= true;
55 /* Message rate-limiting. */
56 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 5);
59 static struct ovsdb_jsonrpc_session
*ovsdb_jsonrpc_session_create(
60 struct ovsdb_jsonrpc_remote
*, struct jsonrpc_session
*, bool);
61 static void ovsdb_jsonrpc_session_preremove_db(struct ovsdb_jsonrpc_remote
*,
63 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote
*);
64 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote
*);
65 static void ovsdb_jsonrpc_session_get_memory_usage_all(
66 const struct ovsdb_jsonrpc_remote
*, struct simap
*usage
);
67 static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote
*);
68 static void ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote
*,
71 static void ovsdb_jsonrpc_session_set_all_options(
72 struct ovsdb_jsonrpc_remote
*, const struct ovsdb_jsonrpc_options
*);
73 static bool ovsdb_jsonrpc_active_session_get_status(
74 const struct ovsdb_jsonrpc_remote
*,
75 struct ovsdb_jsonrpc_remote_status
*);
76 static void ovsdb_jsonrpc_session_get_status(
77 const struct ovsdb_jsonrpc_session
*,
78 struct ovsdb_jsonrpc_remote_status
*);
79 static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session
*);
80 static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter
*);
81 static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session
*,
82 struct jsonrpc_msg
*);
85 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session
*,
87 struct jsonrpc_msg
*request
);
88 static struct ovsdb_jsonrpc_trigger
*ovsdb_jsonrpc_trigger_find(
89 struct ovsdb_jsonrpc_session
*, const struct json
*id
, size_t hash
);
90 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger
*);
91 static void ovsdb_jsonrpc_trigger_preremove_db(struct ovsdb_jsonrpc_session
*,
93 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session
*);
94 static void ovsdb_jsonrpc_trigger_complete_done(
95 struct ovsdb_jsonrpc_session
*);
98 static struct jsonrpc_msg
*ovsdb_jsonrpc_monitor_create(
99 struct ovsdb_jsonrpc_session
*, struct ovsdb
*, struct json
*params
,
100 enum ovsdb_monitor_version
, const struct json
*request_id
);
101 static struct jsonrpc_msg
*ovsdb_jsonrpc_monitor_cond_change(
102 struct ovsdb_jsonrpc_session
*s
,
104 const struct json
*request_id
);
105 static struct jsonrpc_msg
*ovsdb_jsonrpc_monitor_cancel(
106 struct ovsdb_jsonrpc_session
*,
107 struct json_array
*params
,
108 const struct json
*request_id
);
109 static void ovsdb_jsonrpc_monitor_preremove_db(struct ovsdb_jsonrpc_session
*,
111 static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session
*);
112 static void ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session
*);
113 static bool ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session
*);
114 static struct json
*ovsdb_jsonrpc_monitor_compose_update(
115 struct ovsdb_jsonrpc_monitor
*monitor
, bool initial
);
116 static struct jsonrpc_msg
* ovsdb_jsonrpc_create_notify(
117 const struct ovsdb_jsonrpc_monitor
*m
,
118 struct json
*params
);
121 /* JSON-RPC database server. */
123 struct ovsdb_jsonrpc_server
{
124 struct ovsdb_server up
;
125 unsigned int n_sessions
;
126 bool read_only
; /* This server is does not accept any
127 transactions that can modify the database. */
128 struct shash remotes
; /* Contains "struct ovsdb_jsonrpc_remote *"s. */
131 /* A configured remote. This is either a passive stream listener plus a list
132 * of the currently connected sessions, or a list of exactly one active
134 struct ovsdb_jsonrpc_remote
{
135 struct ovsdb_jsonrpc_server
*server
;
136 struct pstream
*listener
; /* Listener, if passive. */
137 struct ovs_list sessions
; /* List of "struct ovsdb_jsonrpc_session"s. */
143 static struct ovsdb_jsonrpc_remote
*ovsdb_jsonrpc_server_add_remote(
144 struct ovsdb_jsonrpc_server
*, const char *name
,
145 const struct ovsdb_jsonrpc_options
*options
147 static void ovsdb_jsonrpc_server_del_remote(struct shash_node
*);
149 /* Creates and returns a new server to provide JSON-RPC access to an OVSDB.
151 * The caller must call ovsdb_jsonrpc_server_add_db() for each database to
152 * which 'server' should provide access. */
153 struct ovsdb_jsonrpc_server
*
154 ovsdb_jsonrpc_server_create(bool read_only
)
156 struct ovsdb_jsonrpc_server
*server
= xzalloc(sizeof *server
);
157 ovsdb_server_init(&server
->up
);
158 shash_init(&server
->remotes
);
159 server
->read_only
= read_only
;
163 /* Adds 'db' to the set of databases served out by 'svr'. Returns true if
164 * successful, false if 'db''s name is the same as some database already in
167 ovsdb_jsonrpc_server_add_db(struct ovsdb_jsonrpc_server
*svr
, struct ovsdb
*db
)
169 ovsdb_jsonrpc_server_reconnect(
170 svr
, false, xasprintf("adding %s database", db
->name
));
171 return ovsdb_server_add_db(&svr
->up
, db
);
174 /* Removes 'db' from the set of databases served out by 'svr'.
176 * 'comment' should be a human-readable reason for removing the database, for
177 * use in log messages, or NULL to suppress logging. This function frees
180 ovsdb_jsonrpc_server_remove_db(struct ovsdb_jsonrpc_server
*svr
,
181 struct ovsdb
*db
, char *comment
)
183 struct shash_node
*node
;
184 SHASH_FOR_EACH (node
, &svr
->remotes
) {
185 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
187 ovsdb_jsonrpc_session_preremove_db(remote
, db
);
190 ovsdb_jsonrpc_server_reconnect(svr
, false, comment
);
192 ovsdb_server_remove_db(&svr
->up
, db
);
196 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server
*svr
)
198 struct shash_node
*node
, *next
;
200 SHASH_FOR_EACH_SAFE (node
, next
, &svr
->remotes
) {
201 ovsdb_jsonrpc_server_del_remote(node
);
203 shash_destroy(&svr
->remotes
);
204 ovsdb_server_destroy(&svr
->up
);
208 struct ovsdb_jsonrpc_options
*
209 ovsdb_jsonrpc_default_options(const char *target
)
211 struct ovsdb_jsonrpc_options
*options
= xzalloc(sizeof *options
);
212 options
->max_backoff
= RECONNECT_DEFAULT_MAX_BACKOFF
;
213 options
->probe_interval
= (stream_or_pstream_needs_probes(target
)
214 ? RECONNECT_DEFAULT_PROBE_INTERVAL
219 /* Sets 'svr''s current set of remotes to the names in 'new_remotes', with
220 * options in the struct ovsdb_jsonrpc_options supplied as the data values.
222 * A remote is an active or passive stream connection method, e.g. "pssl:" or
225 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server
*svr
,
226 const struct shash
*new_remotes
)
228 struct shash_node
*node
, *next
;
230 SHASH_FOR_EACH_SAFE (node
, next
, &svr
->remotes
) {
231 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
232 struct ovsdb_jsonrpc_options
*options
233 = shash_find_data(new_remotes
, node
->name
);
236 VLOG_INFO("%s: remote deconfigured", node
->name
);
237 ovsdb_jsonrpc_server_del_remote(node
);
238 } else if (options
->dscp
!= remote
->dscp
) {
239 ovsdb_jsonrpc_server_del_remote(node
);
242 SHASH_FOR_EACH (node
, new_remotes
) {
243 const struct ovsdb_jsonrpc_options
*options
= node
->data
;
244 struct ovsdb_jsonrpc_remote
*remote
;
246 remote
= shash_find_data(&svr
->remotes
, node
->name
);
248 remote
= ovsdb_jsonrpc_server_add_remote(svr
, node
->name
, options
);
254 ovsdb_jsonrpc_session_set_all_options(remote
, options
);
258 static struct ovsdb_jsonrpc_remote
*
259 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server
*svr
,
261 const struct ovsdb_jsonrpc_options
*options
)
263 struct ovsdb_jsonrpc_remote
*remote
;
264 struct pstream
*listener
;
267 error
= jsonrpc_pstream_open(name
, &listener
, options
->dscp
);
268 if (error
&& error
!= EAFNOSUPPORT
) {
269 VLOG_ERR_RL(&rl
, "%s: listen failed: %s", name
, ovs_strerror(error
));
273 remote
= xmalloc(sizeof *remote
);
274 remote
->server
= svr
;
275 remote
->listener
= listener
;
276 ovs_list_init(&remote
->sessions
);
277 remote
->dscp
= options
->dscp
;
278 remote
->read_only
= options
->read_only
;
279 remote
->role
= nullable_xstrdup(options
->role
);
280 shash_add(&svr
->remotes
, name
, remote
);
283 ovsdb_jsonrpc_session_create(remote
, jsonrpc_session_open(name
, true),
284 svr
->read_only
|| remote
->read_only
);
290 ovsdb_jsonrpc_server_del_remote(struct shash_node
*node
)
292 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
294 ovsdb_jsonrpc_session_close_all(remote
);
295 pstream_close(remote
->listener
);
296 shash_delete(&remote
->server
->remotes
, node
);
301 /* Stores status information for the remote named 'target', which should have
302 * been configured on 'svr' with a call to ovsdb_jsonrpc_server_set_remotes(),
303 * into '*status'. On success returns true, on failure (if 'svr' doesn't have
304 * a remote named 'target' or if that remote is an outbound remote that has no
305 * active connections) returns false. On failure, 'status' will be zeroed.
308 ovsdb_jsonrpc_server_get_remote_status(
309 const struct ovsdb_jsonrpc_server
*svr
, const char *target
,
310 struct ovsdb_jsonrpc_remote_status
*status
)
312 const struct ovsdb_jsonrpc_remote
*remote
;
314 memset(status
, 0, sizeof *status
);
316 remote
= shash_find_data(&svr
->remotes
, target
);
322 if (remote
->listener
) {
323 status
->bound_port
= pstream_get_bound_port(remote
->listener
);
324 status
->is_connected
= !ovs_list_is_empty(&remote
->sessions
);
325 status
->n_connections
= ovs_list_size(&remote
->sessions
);
329 return ovsdb_jsonrpc_active_session_get_status(remote
, status
);
333 ovsdb_jsonrpc_server_free_remote_status(
334 struct ovsdb_jsonrpc_remote_status
*status
)
336 free(status
->locks_held
);
337 free(status
->locks_waiting
);
338 free(status
->locks_lost
);
341 /* Makes all of the JSON-RPC sessions managed by 'svr' disconnect. (They
342 * will then generally reconnect.). Uses 'comment' as a human-readable comment
343 * for logging (it may be NULL to suppress logging). Frees 'comment'.
345 * If 'force' is true, disconnects all sessions. Otherwise, disconnects only
346 * sesions that aren't database change aware. */
348 ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server
*svr
, bool force
,
351 struct shash_node
*node
;
353 SHASH_FOR_EACH (node
, &svr
->remotes
) {
354 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
356 ovsdb_jsonrpc_session_reconnect_all(remote
, force
, comment
);
363 ovsdb_jsonrpc_server_set_read_only(struct ovsdb_jsonrpc_server
*svr
,
366 if (svr
->read_only
!= read_only
) {
367 svr
->read_only
= read_only
;
368 ovsdb_jsonrpc_server_reconnect(svr
, false,
370 ? "making server read-only"
371 : "making server read/write"));
376 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server
*svr
)
378 struct shash_node
*node
;
380 SHASH_FOR_EACH (node
, &svr
->remotes
) {
381 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
383 if (remote
->listener
) {
384 struct stream
*stream
;
387 error
= pstream_accept(remote
->listener
, &stream
);
389 struct jsonrpc_session
*js
;
390 js
= jsonrpc_session_open_unreliably(jsonrpc_open(stream
),
392 ovsdb_jsonrpc_session_create(remote
, js
, svr
->read_only
||
394 } else if (error
!= EAGAIN
) {
395 VLOG_WARN_RL(&rl
, "%s: accept failed: %s",
396 pstream_get_name(remote
->listener
),
397 ovs_strerror(error
));
401 ovsdb_jsonrpc_session_run_all(remote
);
406 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server
*svr
)
408 struct shash_node
*node
;
410 SHASH_FOR_EACH (node
, &svr
->remotes
) {
411 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
413 if (remote
->listener
) {
414 pstream_wait(remote
->listener
);
417 ovsdb_jsonrpc_session_wait_all(remote
);
421 /* Adds some memory usage statistics for 'svr' into 'usage', for use with
422 * memory_report(). */
424 ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server
*svr
,
427 struct shash_node
*node
;
429 simap_increase(usage
, "sessions", svr
->n_sessions
);
430 SHASH_FOR_EACH (node
, &svr
->remotes
) {
431 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
433 ovsdb_jsonrpc_session_get_memory_usage_all(remote
, usage
);
437 /* JSON-RPC database server session. */
439 struct ovsdb_jsonrpc_session
{
440 struct ovs_list node
; /* Element in remote's sessions list. */
441 struct ovsdb_session up
;
442 struct ovsdb_jsonrpc_remote
*remote
;
444 /* RFC 7047 does not contemplate how to alert clients to changes to the set
445 * of databases, e.g. databases that are added or removed while the
446 * database server is running. Traditionally, ovsdb-server disconnects all
447 * of its clients when this happens; a well-written client will reassess
448 * what is available from the server upon reconnection.
450 * OVS 2.9 introduces a way for clients to monitor changes to the databases
451 * being served, through the Database table in the _Server database that
452 * OVSDB adds in this version. ovsdb-server suppresses the connection
453 * close for clients that identify themselves as taking advantage of this
454 * mechanism. When this member is true, it indicates that the client
455 * requested such suppression. */
456 bool db_change_aware
;
459 struct hmap triggers
; /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
462 struct hmap monitors
; /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
464 /* Network connectivity. */
465 struct jsonrpc_session
*js
; /* JSON-RPC session. */
466 unsigned int js_seqno
; /* Last jsonrpc_session_get_seqno() value. */
469 bool read_only
; /* When true, not allow to modify the
473 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session
*);
474 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session
*);
475 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session
*);
476 static void ovsdb_jsonrpc_session_get_memory_usage(
477 const struct ovsdb_jsonrpc_session
*, struct simap
*usage
);
478 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session
*,
479 struct jsonrpc_msg
*);
480 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session
*,
481 struct jsonrpc_msg
*);
483 static struct ovsdb_jsonrpc_session
*
484 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote
*remote
,
485 struct jsonrpc_session
*js
, bool read_only
)
487 struct ovsdb_jsonrpc_session
*s
;
489 s
= xzalloc(sizeof *s
);
490 ovsdb_session_init(&s
->up
, &remote
->server
->up
);
492 ovs_list_push_back(&remote
->sessions
, &s
->node
);
493 hmap_init(&s
->triggers
);
494 hmap_init(&s
->monitors
);
496 s
->js_seqno
= jsonrpc_session_get_seqno(js
);
497 s
->read_only
= read_only
;
499 remote
->server
->n_sessions
++;
504 /* Database 'db' is about to be removed from the database server. To prepare,
505 * this function removes all references to 'db' from 'remote'. */
507 ovsdb_jsonrpc_session_preremove_db(struct ovsdb_jsonrpc_remote
*remote
,
510 struct ovsdb_jsonrpc_session
*s
;
512 LIST_FOR_EACH (s
, node
, &remote
->sessions
) {
513 ovsdb_jsonrpc_monitor_preremove_db(s
, db
);
514 ovsdb_jsonrpc_trigger_preremove_db(s
, db
);
519 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session
*s
)
521 ovsdb_jsonrpc_monitor_remove_all(s
);
522 ovsdb_jsonrpc_session_unlock_all(s
);
523 ovsdb_jsonrpc_trigger_complete_all(s
);
525 hmap_destroy(&s
->monitors
);
526 hmap_destroy(&s
->triggers
);
528 jsonrpc_session_close(s
->js
);
529 ovs_list_remove(&s
->node
);
530 s
->remote
->server
->n_sessions
--;
531 ovsdb_session_destroy(&s
->up
);
536 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session
*s
)
538 jsonrpc_session_run(s
->js
);
539 if (s
->js_seqno
!= jsonrpc_session_get_seqno(s
->js
)) {
540 s
->js_seqno
= jsonrpc_session_get_seqno(s
->js
);
541 ovsdb_jsonrpc_trigger_complete_all(s
);
542 ovsdb_jsonrpc_monitor_remove_all(s
);
543 ovsdb_jsonrpc_session_unlock_all(s
);
546 ovsdb_jsonrpc_trigger_complete_done(s
);
548 if (!jsonrpc_session_get_backlog(s
->js
)) {
549 struct jsonrpc_msg
*msg
;
551 ovsdb_jsonrpc_monitor_flush_all(s
);
553 msg
= jsonrpc_session_recv(s
->js
);
555 if (msg
->type
== JSONRPC_REQUEST
) {
556 ovsdb_jsonrpc_session_got_request(s
, msg
);
557 } else if (msg
->type
== JSONRPC_NOTIFY
) {
558 ovsdb_jsonrpc_session_got_notify(s
, msg
);
560 VLOG_WARN("%s: received unexpected %s message",
561 jsonrpc_session_get_name(s
->js
),
562 jsonrpc_msg_type_to_string(msg
->type
));
563 jsonrpc_session_force_reconnect(s
->js
);
564 jsonrpc_msg_destroy(msg
);
568 return jsonrpc_session_is_alive(s
->js
) ? 0 : ETIMEDOUT
;
572 ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session
*session
,
573 const struct ovsdb_jsonrpc_options
*options
)
575 jsonrpc_session_set_max_backoff(session
->js
, options
->max_backoff
);
576 jsonrpc_session_set_probe_interval(session
->js
, options
->probe_interval
);
577 jsonrpc_session_set_dscp(session
->js
, options
->dscp
);
581 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote
*remote
)
583 struct ovsdb_jsonrpc_session
*s
, *next
;
585 LIST_FOR_EACH_SAFE (s
, next
, node
, &remote
->sessions
) {
586 int error
= ovsdb_jsonrpc_session_run(s
);
588 ovsdb_jsonrpc_session_close(s
);
594 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session
*s
)
596 jsonrpc_session_wait(s
->js
);
597 if (!jsonrpc_session_get_backlog(s
->js
)) {
598 if (ovsdb_jsonrpc_monitor_needs_flush(s
)) {
599 poll_immediate_wake();
601 jsonrpc_session_recv_wait(s
->js
);
607 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote
*remote
)
609 struct ovsdb_jsonrpc_session
*s
;
611 LIST_FOR_EACH (s
, node
, &remote
->sessions
) {
612 ovsdb_jsonrpc_session_wait(s
);
617 ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session
*s
,
620 simap_increase(usage
, "triggers", hmap_count(&s
->triggers
));
621 simap_increase(usage
, "backlog", jsonrpc_session_get_backlog(s
->js
));
625 ovsdb_jsonrpc_session_get_memory_usage_all(
626 const struct ovsdb_jsonrpc_remote
*remote
,
629 struct ovsdb_jsonrpc_session
*s
;
631 LIST_FOR_EACH (s
, node
, &remote
->sessions
) {
632 ovsdb_jsonrpc_session_get_memory_usage(s
, usage
);
637 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote
*remote
)
639 struct ovsdb_jsonrpc_session
*s
, *next
;
641 LIST_FOR_EACH_SAFE (s
, next
, node
, &remote
->sessions
) {
642 ovsdb_jsonrpc_session_close(s
);
646 /* Makes all of the JSON-RPC sessions managed by 'remote' disconnect. (They
647 * will then generally reconnect.). 'comment' should be a human-readable
648 * explanation of the reason for disconnection, for use in log messages, or
649 * NULL to suppress logging.
651 * If 'force' is true, disconnects all sessions. Otherwise, disconnects only
652 * sesions that aren't database change aware. */
654 ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote
*remote
,
655 bool force
, const char *comment
)
657 struct ovsdb_jsonrpc_session
*s
, *next
;
659 LIST_FOR_EACH_SAFE (s
, next
, node
, &remote
->sessions
) {
660 if (force
|| !s
->db_change_aware
) {
661 jsonrpc_session_force_reconnect(s
->js
);
662 if (comment
&& jsonrpc_session_is_connected(s
->js
)) {
663 VLOG_INFO("%s: disconnecting (%s)",
664 jsonrpc_session_get_name(s
->js
), comment
);
666 if (!jsonrpc_session_is_alive(s
->js
)) {
667 ovsdb_jsonrpc_session_close(s
);
673 /* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
676 * (The dscp value can't be changed directly; the caller must instead close and
677 * re-open the session.) */
679 ovsdb_jsonrpc_session_set_all_options(
680 struct ovsdb_jsonrpc_remote
*remote
,
681 const struct ovsdb_jsonrpc_options
*options
)
683 struct ovsdb_jsonrpc_session
*s
;
685 LIST_FOR_EACH (s
, node
, &remote
->sessions
) {
686 ovsdb_jsonrpc_session_set_options(s
, options
);
690 /* Sets the 'status' of for the 'remote' with an outgoing connection. */
692 ovsdb_jsonrpc_active_session_get_status(
693 const struct ovsdb_jsonrpc_remote
*remote
,
694 struct ovsdb_jsonrpc_remote_status
*status
)
696 const struct ovs_list
*sessions
= &remote
->sessions
;
697 const struct ovsdb_jsonrpc_session
*s
;
699 if (ovs_list_is_empty(sessions
)) {
703 ovs_assert(ovs_list_is_singleton(sessions
));
704 s
= CONTAINER_OF(ovs_list_front(sessions
), struct ovsdb_jsonrpc_session
, node
);
705 ovsdb_jsonrpc_session_get_status(s
, status
);
706 status
->n_connections
= 1;
712 ovsdb_jsonrpc_session_get_status(const struct ovsdb_jsonrpc_session
*session
,
713 struct ovsdb_jsonrpc_remote_status
*status
)
715 const struct ovsdb_jsonrpc_session
*s
= session
;
716 const struct jsonrpc_session
*js
;
717 struct ovsdb_lock_waiter
*waiter
;
718 struct reconnect_stats rstats
;
719 struct ds locks_held
, locks_waiting
, locks_lost
;
723 status
->is_connected
= jsonrpc_session_is_connected(js
);
724 status
->last_error
= jsonrpc_session_get_status(js
);
726 jsonrpc_session_get_reconnect_stats(js
, &rstats
);
727 status
->state
= rstats
.state
;
728 status
->sec_since_connect
= rstats
.msec_since_connect
== UINT_MAX
729 ? UINT_MAX
: rstats
.msec_since_connect
/ 1000;
730 status
->sec_since_disconnect
= rstats
.msec_since_disconnect
== UINT_MAX
731 ? UINT_MAX
: rstats
.msec_since_disconnect
/ 1000;
733 ds_init(&locks_held
);
734 ds_init(&locks_waiting
);
735 ds_init(&locks_lost
);
736 HMAP_FOR_EACH (waiter
, session_node
, &s
->up
.waiters
) {
739 string
= (ovsdb_lock_waiter_is_owner(waiter
) ? &locks_held
740 : waiter
->mode
== OVSDB_LOCK_WAIT
? &locks_waiting
742 if (string
->length
) {
743 ds_put_char(string
, ' ');
745 ds_put_cstr(string
, waiter
->lock_name
);
747 status
->locks_held
= ds_steal_cstr(&locks_held
);
748 status
->locks_waiting
= ds_steal_cstr(&locks_waiting
);
749 status
->locks_lost
= ds_steal_cstr(&locks_lost
);
752 /* Examines 'request' to determine the database to which it relates, and then
753 * searches 's' to find that database:
755 * - If successful, returns the database and sets '*replyp' to NULL.
757 * - If no such database exists, returns NULL and sets '*replyp' to an
758 * appropriate JSON-RPC error reply, owned by the caller. */
759 static struct ovsdb
*
760 ovsdb_jsonrpc_lookup_db(const struct ovsdb_jsonrpc_session
*s
,
761 const struct jsonrpc_msg
*request
,
762 struct jsonrpc_msg
**replyp
)
764 struct json_array
*params
;
765 struct ovsdb_error
*error
;
769 params
= json_array(request
->params
);
770 if (!params
->n
|| params
->elems
[0]->type
!= JSON_STRING
) {
771 error
= ovsdb_syntax_error(
772 request
->params
, NULL
,
773 "%s request params must begin with <db-name>", request
->method
);
777 db_name
= params
->elems
[0]->string
;
778 db
= shash_find_data(&s
->up
.server
->dbs
, db_name
);
780 error
= ovsdb_syntax_error(
781 request
->params
, "unknown database",
782 "%s request specifies unknown database %s",
783 request
->method
, db_name
);
788 error
= ovsdb_error("database not available",
789 "%s request specifies database %s which is not "
790 "yet available because it has not completed "
791 "joining its cluster",
792 request
->method
, db_name
);
800 *replyp
= jsonrpc_create_error(ovsdb_error_to_json_free(error
),
805 static struct ovsdb_error
*
806 ovsdb_jsonrpc_session_parse_lock_name(const struct jsonrpc_msg
*request
,
807 const char **lock_namep
)
809 const struct json_array
*params
;
811 params
= json_array(request
->params
);
812 if (params
->n
!= 1 || params
->elems
[0]->type
!= JSON_STRING
||
813 !ovsdb_parser_is_id(json_string(params
->elems
[0]))) {
815 return ovsdb_syntax_error(request
->params
, NULL
,
816 "%s request params must be <id>",
820 *lock_namep
= json_string(params
->elems
[0]);
825 ovsdb_jsonrpc_session_notify(struct ovsdb_session
*session
,
826 const char *lock_name
,
829 struct ovsdb_jsonrpc_session
*s
;
832 s
= CONTAINER_OF(session
, struct ovsdb_jsonrpc_session
, up
);
833 params
= json_array_create_1(json_string_create(lock_name
));
834 ovsdb_jsonrpc_session_send(s
, jsonrpc_create_notify(method
, params
));
837 static struct jsonrpc_msg
*
838 jsonrpc_create_readonly_lock_error(const struct json
*id
)
840 return jsonrpc_create_error(json_string_create(
841 "lock and unlock methods not allowed,"
842 " DB server is read only."), id
);
845 static struct jsonrpc_msg
*
846 ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session
*s
,
847 struct jsonrpc_msg
*request
,
848 enum ovsdb_lock_mode mode
)
850 struct ovsdb_lock_waiter
*waiter
;
851 struct ovsdb_error
*error
;
852 struct ovsdb_session
*victim
;
853 const char *lock_name
;
857 return jsonrpc_create_readonly_lock_error(request
->id
);
860 error
= ovsdb_jsonrpc_session_parse_lock_name(request
, &lock_name
);
865 /* Report error if this session has issued a "lock" or "steal" without a
866 * matching "unlock" for this lock. */
867 waiter
= ovsdb_session_get_lock_waiter(&s
->up
, lock_name
);
869 error
= ovsdb_syntax_error(
870 request
->params
, NULL
,
871 "must issue \"unlock\" before new \"%s\"", request
->method
);
875 /* Get the lock, add us as a waiter. */
876 waiter
= ovsdb_server_lock(&s
->remote
->server
->up
, &s
->up
, lock_name
, mode
,
879 ovsdb_jsonrpc_session_notify(victim
, lock_name
, "stolen");
882 result
= json_object_create();
883 json_object_put(result
, "locked",
884 json_boolean_create(ovsdb_lock_waiter_is_owner(waiter
)));
886 return jsonrpc_create_reply(result
, request
->id
);
889 return jsonrpc_create_error(ovsdb_error_to_json_free(error
), request
->id
);
893 ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session
*s
)
895 struct ovsdb_lock_waiter
*waiter
, *next
;
897 HMAP_FOR_EACH_SAFE (waiter
, next
, session_node
, &s
->up
.waiters
) {
898 ovsdb_jsonrpc_session_unlock__(waiter
);
903 ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter
*waiter
)
905 struct ovsdb_lock
*lock
= waiter
->lock
;
908 struct ovsdb_session
*new_owner
= ovsdb_lock_waiter_remove(waiter
);
910 ovsdb_jsonrpc_session_notify(new_owner
, lock
->name
, "locked");
912 /* ovsdb_server_lock() might have freed 'lock'. */
916 ovsdb_lock_waiter_destroy(waiter
);
919 static struct jsonrpc_msg
*
920 syntax_error_reply(const struct jsonrpc_msg
*request
, const char *details
)
922 struct ovsdb_error
*error
= ovsdb_syntax_error(
923 request
->params
, NULL
, "%s: %s", request
->method
, details
);
924 struct jsonrpc_msg
*msg
= jsonrpc_create_error(ovsdb_error_to_json(error
),
926 ovsdb_error_destroy(error
);
930 static struct jsonrpc_msg
*
931 ovsdb_jsonrpc_session_unlock(struct ovsdb_jsonrpc_session
*s
,
932 struct jsonrpc_msg
*request
)
934 struct ovsdb_lock_waiter
*waiter
;
935 struct ovsdb_error
*error
;
936 const char *lock_name
;
939 return jsonrpc_create_readonly_lock_error(request
->id
);
942 error
= ovsdb_jsonrpc_session_parse_lock_name(request
, &lock_name
);
944 return jsonrpc_create_error(ovsdb_error_to_json_free(error
),
948 /* Report error if this session has not issued a "lock" or "steal" for this
950 waiter
= ovsdb_session_get_lock_waiter(&s
->up
, lock_name
);
952 return syntax_error_reply(request
,
953 "\"unlock\" without \"lock\" or \"steal\"");
956 ovsdb_jsonrpc_session_unlock__(waiter
);
958 return jsonrpc_create_reply(json_object_create(), request
->id
);
961 static struct jsonrpc_msg
*
962 ovsdb_jsonrpc_session_set_db_change_aware(struct ovsdb_jsonrpc_session
*s
,
963 const struct jsonrpc_msg
*request
)
965 const struct json_array
*params
= json_array(request
->params
);
967 || (params
->elems
[0]->type
!= JSON_TRUE
&&
968 params
->elems
[0]->type
!= JSON_FALSE
)) {
969 return syntax_error_reply(request
, "true or false parameter expected");
972 s
->db_change_aware
= json_boolean(params
->elems
[0]);
973 return jsonrpc_create_reply(json_object_create(), request
->id
);
977 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session
*s
,
978 struct jsonrpc_msg
*request
)
980 struct jsonrpc_msg
*reply
;
982 if (!strcmp(request
->method
, "transact") ||
983 !strcmp(request
->method
, "convert")) {
984 struct ovsdb
*db
= ovsdb_jsonrpc_lookup_db(s
, request
, &reply
);
986 ovsdb_jsonrpc_trigger_create(s
, db
, request
);
988 } else if (!strcmp(request
->method
, "monitor") ||
989 (monitor_cond_enable__
&&
990 (!strcmp(request
->method
, "monitor_cond") ||
991 !strcmp(request
->method
, "monitor_cond_since")))) {
992 struct ovsdb
*db
= ovsdb_jsonrpc_lookup_db(s
, request
, &reply
);
994 enum ovsdb_monitor_version version
;
995 if (!strcmp(request
->method
, "monitor")) {
996 version
= OVSDB_MONITOR_V1
;
997 } else if (!strcmp(request
->method
, "monitor_cond")) {
998 version
= OVSDB_MONITOR_V2
;
1000 version
= OVSDB_MONITOR_V3
;
1002 reply
= ovsdb_jsonrpc_monitor_create(s
, db
, request
->params
,
1003 version
, request
->id
);
1005 } else if (!strcmp(request
->method
, "monitor_cond_change")) {
1006 reply
= ovsdb_jsonrpc_monitor_cond_change(s
, request
->params
,
1008 } else if (!strcmp(request
->method
, "monitor_cancel")) {
1009 reply
= ovsdb_jsonrpc_monitor_cancel(s
, json_array(request
->params
),
1011 } else if (!strcmp(request
->method
, "get_schema")) {
1012 struct ovsdb
*db
= ovsdb_jsonrpc_lookup_db(s
, request
, &reply
);
1014 reply
= jsonrpc_create_reply(ovsdb_schema_to_json(db
->schema
),
1017 } else if (!strcmp(request
->method
, "list_dbs")) {
1018 size_t n_dbs
= shash_count(&s
->up
.server
->dbs
);
1019 struct shash_node
*node
;
1023 dbs
= xmalloc(n_dbs
* sizeof *dbs
);
1025 SHASH_FOR_EACH (node
, &s
->up
.server
->dbs
) {
1026 dbs
[i
++] = json_string_create(node
->name
);
1028 reply
= jsonrpc_create_reply(json_array_create(dbs
, n_dbs
),
1030 } else if (!strcmp(request
->method
, "get_server_id")) {
1031 const struct uuid
*uuid
= &s
->up
.server
->uuid
;
1032 struct json
*result
;
1034 result
= json_string_create_nocopy(xasprintf(UUID_FMT
,
1036 reply
= jsonrpc_create_reply(result
, request
->id
);
1037 } else if (!strcmp(request
->method
, "lock")) {
1038 reply
= ovsdb_jsonrpc_session_lock(s
, request
, OVSDB_LOCK_WAIT
);
1039 } else if (!strcmp(request
->method
, "steal")) {
1040 reply
= ovsdb_jsonrpc_session_lock(s
, request
, OVSDB_LOCK_STEAL
);
1041 } else if (!strcmp(request
->method
, "unlock")) {
1042 reply
= ovsdb_jsonrpc_session_unlock(s
, request
);
1043 } else if (!strcmp(request
->method
, "set_db_change_aware")) {
1044 reply
= ovsdb_jsonrpc_session_set_db_change_aware(s
, request
);
1045 } else if (!strcmp(request
->method
, "echo")) {
1046 reply
= jsonrpc_create_reply(json_clone(request
->params
), request
->id
);
1048 reply
= jsonrpc_create_error(json_string_create("unknown method"),
1053 jsonrpc_msg_destroy(request
);
1054 ovsdb_jsonrpc_session_send(s
, reply
);
1059 execute_cancel(struct ovsdb_jsonrpc_session
*s
, struct jsonrpc_msg
*request
)
1061 if (json_array(request
->params
)->n
== 1) {
1062 struct ovsdb_jsonrpc_trigger
*t
;
1065 id
= request
->params
->array
.elems
[0];
1066 t
= ovsdb_jsonrpc_trigger_find(s
, id
, json_hash(id
, 0));
1068 ovsdb_jsonrpc_trigger_complete(t
);
1074 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session
*s
,
1075 struct jsonrpc_msg
*request
)
1077 if (!strcmp(request
->method
, "cancel")) {
1078 execute_cancel(s
, request
);
1080 jsonrpc_msg_destroy(request
);
1084 ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session
*s
,
1085 struct jsonrpc_msg
*msg
)
1087 ovsdb_jsonrpc_monitor_flush_all(s
);
1088 jsonrpc_session_send(s
->js
, msg
);
1091 /* JSON-RPC database server triggers.
1093 * (Every transaction is treated as a trigger even if it doesn't actually have
1094 * any "wait" operations.) */
1096 struct ovsdb_jsonrpc_trigger
{
1097 struct ovsdb_trigger trigger
;
1098 struct hmap_node hmap_node
; /* In session's "triggers" hmap. */
1103 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session
*s
, struct ovsdb
*db
,
1104 struct jsonrpc_msg
*request
)
1106 /* Check for duplicate ID. */
1107 size_t hash
= json_hash(request
->id
, 0);
1108 struct ovsdb_jsonrpc_trigger
*t
1109 = ovsdb_jsonrpc_trigger_find(s
, request
->id
, hash
);
1111 ovsdb_jsonrpc_session_send(
1112 s
, syntax_error_reply(request
, "duplicate request ID"));
1113 jsonrpc_msg_destroy(request
);
1117 /* Insert into trigger table. */
1118 t
= xmalloc(sizeof *t
);
1119 bool disconnect_all
= ovsdb_trigger_init(
1120 &s
->up
, db
, &t
->trigger
, request
, time_msec(), s
->read_only
,
1121 s
->remote
->role
, jsonrpc_session_get_id(s
->js
));
1122 t
->id
= json_clone(request
->id
);
1123 hmap_insert(&s
->triggers
, &t
->hmap_node
, hash
);
1125 /* Complete early if possible. */
1126 if (ovsdb_trigger_is_complete(&t
->trigger
)) {
1127 ovsdb_jsonrpc_trigger_complete(t
);
1130 if (disconnect_all
) {
1131 /* The message below is currently the only reason to disconnect all
1133 ovsdb_jsonrpc_server_reconnect(s
->remote
->server
, false,
1134 xasprintf("committed %s database "
1135 "schema conversion",
1140 static struct ovsdb_jsonrpc_trigger
*
1141 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session
*s
,
1142 const struct json
*id
, size_t hash
)
1144 struct ovsdb_jsonrpc_trigger
*t
;
1146 HMAP_FOR_EACH_WITH_HASH (t
, hmap_node
, hash
, &s
->triggers
) {
1147 if (json_equal(t
->id
, id
)) {
1156 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger
*t
)
1158 struct ovsdb_jsonrpc_session
*s
;
1160 s
= CONTAINER_OF(t
->trigger
.session
, struct ovsdb_jsonrpc_session
, up
);
1162 if (jsonrpc_session_is_connected(s
->js
)) {
1163 bool complete
= ovsdb_trigger_is_complete(&t
->trigger
);
1164 if (s
->db_change_aware
&& !complete
) {
1165 ovsdb_trigger_cancel(&t
->trigger
, "closing JSON-RPC session");
1169 struct jsonrpc_msg
*reply
= ovsdb_trigger_steal_reply(&t
->trigger
);
1170 ovsdb_jsonrpc_session_send(s
, reply
);
1174 json_destroy(t
->id
);
1175 ovsdb_trigger_destroy(&t
->trigger
);
1176 hmap_remove(&s
->triggers
, &t
->hmap_node
);
1181 ovsdb_jsonrpc_trigger_remove__(struct ovsdb_jsonrpc_session
*s
,
1184 struct ovsdb_jsonrpc_trigger
*t
, *next
;
1185 HMAP_FOR_EACH_SAFE (t
, next
, hmap_node
, &s
->triggers
) {
1186 if (!db
|| t
->trigger
.db
== db
) {
1187 ovsdb_jsonrpc_trigger_complete(t
);
1192 /* Database 'db' is about to be removed from the database server. To prepare,
1193 * this function removes all references from triggers in 's' to 'db'. */
1195 ovsdb_jsonrpc_trigger_preremove_db(struct ovsdb_jsonrpc_session
*s
,
1199 ovsdb_jsonrpc_trigger_remove__(s
, db
);
1202 /* Removes all triggers from 's'. */
1204 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session
*s
)
1206 ovsdb_jsonrpc_trigger_remove__(s
, NULL
);
1210 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session
*s
)
1212 struct ovsdb_jsonrpc_trigger
*trigger
, *next
;
1213 LIST_FOR_EACH_SAFE (trigger
, next
, trigger
.node
, &s
->up
.completions
) {
1214 ovsdb_jsonrpc_trigger_complete(trigger
);
1218 /* Jsonrpc front end monitor. */
1219 struct ovsdb_jsonrpc_monitor
{
1220 struct hmap_node node
; /* In ovsdb_jsonrpc_session's "monitors". */
1221 struct ovsdb_jsonrpc_session
*session
;
1223 struct json
*monitor_id
;
1224 struct ovsdb_monitor
*dbmon
;
1225 struct ovsdb_monitor_change_set
*change_set
;
1226 enum ovsdb_monitor_version version
;
1227 struct ovsdb_monitor_session_condition
*condition
;/* Session's condition */
1230 static struct ovsdb_jsonrpc_monitor
*
1231 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session
*s
,
1232 const struct json
*monitor_id
)
1234 struct ovsdb_jsonrpc_monitor
*m
;
1236 HMAP_FOR_EACH_WITH_HASH (m
, node
, json_hash(monitor_id
, 0), &s
->monitors
) {
1237 if (json_equal(m
->monitor_id
, monitor_id
)) {
1246 parse_bool(struct ovsdb_parser
*parser
, const char *name
, bool default_value
)
1248 const struct json
*json
;
1250 json
= ovsdb_parser_member(parser
, name
, OP_BOOLEAN
| OP_OPTIONAL
);
1251 return json
? json_boolean(json
) : default_value
;
1254 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
1255 ovsdb_jsonrpc_parse_monitor_request(
1256 struct ovsdb_monitor
*dbmon
,
1257 const struct ovsdb_table
*table
,
1258 struct ovsdb_monitor_session_condition
*cond
,
1259 const struct json
*monitor_request
)
1261 const struct ovsdb_table_schema
*ts
= table
->schema
;
1262 enum ovsdb_monitor_selection select
;
1263 const struct json
*columns
, *select_json
, *where
= NULL
;
1264 struct ovsdb_parser parser
;
1265 struct ovsdb_error
*error
;
1267 ovsdb_parser_init(&parser
, monitor_request
, "table %s", ts
->name
);
1269 where
= ovsdb_parser_member(&parser
, "where", OP_ARRAY
| OP_OPTIONAL
);
1271 columns
= ovsdb_parser_member(&parser
, "columns", OP_ARRAY
| OP_OPTIONAL
);
1273 select_json
= ovsdb_parser_member(&parser
, "select",
1274 OP_OBJECT
| OP_OPTIONAL
);
1276 error
= ovsdb_parser_finish(&parser
);
1283 ovsdb_parser_init(&parser
, select_json
, "table %s select", ts
->name
);
1284 if (parse_bool(&parser
, "initial", true)) {
1285 select
|= OJMS_INITIAL
;
1287 if (parse_bool(&parser
, "insert", true)) {
1288 select
|= OJMS_INSERT
;
1290 if (parse_bool(&parser
, "delete", true)) {
1291 select
|= OJMS_DELETE
;
1293 if (parse_bool(&parser
, "modify", true)) {
1294 select
|= OJMS_MODIFY
;
1296 error
= ovsdb_parser_finish(&parser
);
1301 select
= OJMS_INITIAL
| OJMS_INSERT
| OJMS_DELETE
| OJMS_MODIFY
;
1304 ovsdb_monitor_table_add_select(dbmon
, table
, select
);
1308 if (columns
->type
!= JSON_ARRAY
) {
1309 return ovsdb_syntax_error(columns
, NULL
,
1310 "array of column names expected");
1313 for (i
= 0; i
< columns
->array
.n
; i
++) {
1314 const struct ovsdb_column
*column
;
1317 if (columns
->array
.elems
[i
]->type
!= JSON_STRING
) {
1318 return ovsdb_syntax_error(columns
, NULL
,
1319 "array of column names expected");
1322 s
= columns
->array
.elems
[i
]->string
;
1323 column
= shash_find_data(&table
->schema
->columns
, s
);
1325 return ovsdb_syntax_error(columns
, NULL
, "%s is not a valid "
1328 if (ovsdb_monitor_add_column(dbmon
, table
, column
,
1330 return ovsdb_syntax_error(columns
, NULL
, "column %s "
1331 "mentioned more than once",
1336 struct shash_node
*node
;
1338 SHASH_FOR_EACH (node
, &ts
->columns
) {
1339 const struct ovsdb_column
*column
= node
->data
;
1340 if (column
->index
!= OVSDB_COL_UUID
) {
1341 if (ovsdb_monitor_add_column(dbmon
, table
, column
,
1343 return ovsdb_syntax_error(columns
, NULL
, "column %s "
1344 "mentioned more than once",
1351 error
= ovsdb_monitor_table_condition_create(cond
, table
, where
);
1360 static struct jsonrpc_msg
*
1361 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session
*s
, struct ovsdb
*db
,
1362 struct json
*params
,
1363 enum ovsdb_monitor_version version
,
1364 const struct json
*request_id
)
1366 struct ovsdb_jsonrpc_monitor
*m
= NULL
;
1367 struct ovsdb_monitor
*dbmon
= NULL
;
1368 struct json
*monitor_id
, *monitor_requests
;
1369 struct ovsdb_error
*error
= NULL
;
1370 struct shash_node
*node
;
1373 if ((version
== OVSDB_MONITOR_V2
&& json_array(params
)->n
!= 3) ||
1374 (version
== OVSDB_MONITOR_V3
&& json_array(params
)->n
!= 4)) {
1375 error
= ovsdb_syntax_error(params
, NULL
, "invalid parameters");
1378 monitor_id
= params
->array
.elems
[1];
1379 monitor_requests
= params
->array
.elems
[2];
1380 if (monitor_requests
->type
!= JSON_OBJECT
) {
1381 error
= ovsdb_syntax_error(monitor_requests
, NULL
,
1382 "monitor-requests must be object");
1386 if (ovsdb_jsonrpc_monitor_find(s
, monitor_id
)) {
1387 error
= ovsdb_syntax_error(monitor_id
, NULL
, "duplicate monitor ID");
1391 m
= xzalloc(sizeof *m
);
1394 m
->dbmon
= ovsdb_monitor_create(db
, m
);
1395 if (version
== OVSDB_MONITOR_V2
|| version
== OVSDB_MONITOR_V3
) {
1396 m
->condition
= ovsdb_monitor_session_condition_create();
1398 m
->version
= version
;
1399 hmap_insert(&s
->monitors
, &m
->node
, json_hash(monitor_id
, 0));
1400 m
->monitor_id
= json_clone(monitor_id
);
1402 SHASH_FOR_EACH (node
, json_object(monitor_requests
)) {
1403 const struct ovsdb_table
*table
;
1404 const struct json
*mr_value
;
1407 table
= ovsdb_get_table(m
->db
, node
->name
);
1409 error
= ovsdb_syntax_error(NULL
, NULL
,
1410 "no table named %s", node
->name
);
1414 ovsdb_monitor_add_table(m
->dbmon
, table
);
1416 /* Parse columns. */
1417 mr_value
= node
->data
;
1418 if (mr_value
->type
== JSON_ARRAY
) {
1419 const struct json_array
*array
= &mr_value
->array
;
1421 for (i
= 0; i
< array
->n
; i
++) {
1422 error
= ovsdb_jsonrpc_parse_monitor_request(m
->dbmon
,
1431 error
= ovsdb_jsonrpc_parse_monitor_request(m
->dbmon
,
1441 dbmon
= ovsdb_monitor_add(m
->dbmon
);
1442 if (dbmon
!= m
->dbmon
) {
1443 /* Found an exisiting dbmon, reuse the current one. */
1444 ovsdb_monitor_remove_jsonrpc_monitor(m
->dbmon
, m
, NULL
);
1445 ovsdb_monitor_add_jsonrpc_monitor(dbmon
, m
);
1449 /* Only now we can bind session's condition to ovsdb_monitor */
1451 ovsdb_monitor_condition_bind(m
->dbmon
, m
->condition
);
1454 bool initial
= false;
1455 if (version
== OVSDB_MONITOR_V3
) {
1456 struct json
*last_id
= params
->array
.elems
[3];
1457 if (last_id
->type
!= JSON_STRING
) {
1458 error
= ovsdb_syntax_error(last_id
, NULL
,
1459 "last-txn-id must be string");
1462 struct uuid txn_uuid
;
1463 if (!uuid_from_string(&txn_uuid
, last_id
->string
)) {
1464 error
= ovsdb_syntax_error(last_id
, NULL
,
1465 "last-txn-id must be UUID format.");
1468 if (!uuid_is_zero(&txn_uuid
)) {
1469 ovsdb_monitor_get_changes_after(&txn_uuid
, m
->dbmon
,
1473 if (!m
->change_set
) {
1474 ovsdb_monitor_get_initial(m
->dbmon
, &m
->change_set
);
1477 json
= ovsdb_jsonrpc_monitor_compose_update(m
, initial
);
1478 json
= json
? json
: json_object_create();
1480 if (m
->version
== OVSDB_MONITOR_V3
) {
1481 struct json
*json_last_id
= json_string_create_nocopy(
1483 UUID_ARGS(ovsdb_monitor_get_last_txnid(
1486 struct json
*json_found
= json_boolean_create(!initial
);
1487 json
= json_array_create_3(json_found
, json_last_id
, json
);
1490 return jsonrpc_create_reply(json
, request_id
);
1494 ovsdb_jsonrpc_monitor_destroy(m
, false);
1497 return jsonrpc_create_error(ovsdb_error_to_json_free(error
), request_id
);
1500 static struct ovsdb_error
*
1501 ovsdb_jsonrpc_parse_monitor_cond_change_request(
1502 struct ovsdb_jsonrpc_monitor
*m
,
1503 const struct ovsdb_table
*table
,
1504 const struct json
*cond_change_req
)
1506 const struct ovsdb_table_schema
*ts
= table
->schema
;
1507 const struct json
*condition
, *columns
;
1508 struct ovsdb_parser parser
;
1509 struct ovsdb_error
*error
;
1511 ovsdb_parser_init(&parser
, cond_change_req
, "table %s", ts
->name
);
1512 columns
= ovsdb_parser_member(&parser
, "columns", OP_ARRAY
| OP_OPTIONAL
);
1513 condition
= ovsdb_parser_member(&parser
, "where", OP_ARRAY
| OP_OPTIONAL
);
1515 error
= ovsdb_parser_finish(&parser
);
1521 error
= ovsdb_syntax_error(cond_change_req
, NULL
, "changing columns "
1525 error
= ovsdb_monitor_table_condition_update(m
->dbmon
, m
->condition
, table
,
1531 static struct jsonrpc_msg
*
1532 ovsdb_jsonrpc_monitor_cond_change(struct ovsdb_jsonrpc_session
*s
,
1533 struct json
*params
,
1534 const struct json
*request_id
)
1536 struct ovsdb_error
*error
;
1537 struct ovsdb_jsonrpc_monitor
*m
;
1538 struct json
*monitor_cond_change_reqs
;
1539 struct shash_node
*node
;
1541 if (json_array(params
)->n
!= 3) {
1542 error
= ovsdb_syntax_error(params
, NULL
, "invalid parameters");
1546 m
= ovsdb_jsonrpc_monitor_find(s
, params
->array
.elems
[0]);
1548 error
= ovsdb_syntax_error(params
->array
.elems
[0], NULL
,
1549 "unknown monitor session");
1553 const struct json
*new_monitor_id
= params
->array
.elems
[1];
1554 bool changing_id
= !json_equal(m
->monitor_id
, new_monitor_id
);
1555 if (changing_id
&& ovsdb_jsonrpc_monitor_find(s
, new_monitor_id
)) {
1556 error
= ovsdb_syntax_error(new_monitor_id
, NULL
,
1557 "duplicate monitor ID");
1561 monitor_cond_change_reqs
= params
->array
.elems
[2];
1562 if (monitor_cond_change_reqs
->type
!= JSON_OBJECT
) {
1564 ovsdb_syntax_error(NULL
, NULL
,
1565 "monitor-cond-change-requests must be object");
1569 SHASH_FOR_EACH (node
, json_object(monitor_cond_change_reqs
)) {
1570 const struct ovsdb_table
*table
;
1571 const struct json
*mr_value
;
1574 table
= ovsdb_get_table(m
->db
, node
->name
);
1576 error
= ovsdb_syntax_error(NULL
, NULL
,
1577 "no table named %s", node
->name
);
1580 if (!ovsdb_monitor_table_exists(m
->dbmon
, table
)) {
1581 error
= ovsdb_syntax_error(NULL
, NULL
,
1582 "no table named %s in monitor session",
1587 mr_value
= node
->data
;
1588 if (mr_value
->type
== JSON_ARRAY
) {
1589 const struct json_array
*array
= &mr_value
->array
;
1591 for (i
= 0; i
< array
->n
; i
++) {
1592 error
= ovsdb_jsonrpc_parse_monitor_cond_change_request(
1593 m
, table
, array
->elems
[i
]);
1599 error
= ovsdb_syntax_error(
1601 "table %s no monitor-cond-change JSON array",
1608 hmap_remove(&s
->monitors
, &m
->node
);
1609 json_destroy(m
->monitor_id
);
1610 m
->monitor_id
= json_clone(new_monitor_id
);
1611 hmap_insert(&s
->monitors
, &m
->node
, json_hash(m
->monitor_id
, 0));
1614 /* Send the new update, if any, represents the difference from the old
1615 * condition and the new one. */
1616 struct json
*update_json
;
1618 update_json
= ovsdb_monitor_get_update(m
->dbmon
, false, true,
1619 m
->condition
, m
->version
, &m
->change_set
);
1621 struct jsonrpc_msg
*msg
;
1623 if (m
->version
== OVSDB_MONITOR_V3
) {
1624 struct json
*json_last_id
= json_string_create_nocopy(
1626 UUID_ARGS(ovsdb_monitor_get_last_txnid(
1629 p
= json_array_create_3(json_clone(m
->monitor_id
), json_last_id
,
1632 p
= json_array_create_2(json_clone(m
->monitor_id
), update_json
);
1634 msg
= ovsdb_jsonrpc_create_notify(m
, p
);
1635 jsonrpc_session_send(s
->js
, msg
);
1638 return jsonrpc_create_reply(json_object_create(), request_id
);
1641 return jsonrpc_create_error(ovsdb_error_to_json_free(error
), request_id
);
1644 static struct jsonrpc_msg
*
1645 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session
*s
,
1646 struct json_array
*params
,
1647 const struct json
*request_id
)
1649 if (params
->n
!= 1) {
1650 return jsonrpc_create_error(json_string_create("invalid parameters"),
1653 struct ovsdb_jsonrpc_monitor
*m
;
1655 m
= ovsdb_jsonrpc_monitor_find(s
, params
->elems
[0]);
1657 return jsonrpc_create_error(json_string_create("unknown monitor"),
1660 ovsdb_jsonrpc_monitor_destroy(m
, false);
1661 return jsonrpc_create_reply(json_object_create(), request_id
);
1666 /* Database 'db' is about to be removed from the database server. To prepare,
1667 * this function removes all references from monitors in 's' to 'db'. */
1669 ovsdb_jsonrpc_monitor_preremove_db(struct ovsdb_jsonrpc_session
*s
,
1674 struct ovsdb_jsonrpc_monitor
*m
, *next
;
1675 HMAP_FOR_EACH_SAFE (m
, next
, node
, &s
->monitors
) {
1677 ovsdb_jsonrpc_monitor_destroy(m
, true);
1682 /* Cancels all monitors in 's'. */
1684 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session
*s
)
1686 struct ovsdb_jsonrpc_monitor
*m
, *next
;
1688 HMAP_FOR_EACH_SAFE (m
, next
, node
, &s
->monitors
) {
1689 ovsdb_jsonrpc_monitor_destroy(m
, false);
1693 static struct json
*
1694 ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor
*m
,
1698 if (!ovsdb_monitor_needs_flush(m
->dbmon
, m
->change_set
)) {
1702 return ovsdb_monitor_get_update(m
->dbmon
, initial
, false,
1703 m
->condition
, m
->version
, &m
->change_set
);
1707 ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session
*s
)
1709 struct ovsdb_jsonrpc_monitor
*m
;
1711 HMAP_FOR_EACH (m
, node
, &s
->monitors
) {
1712 if (ovsdb_monitor_needs_flush(m
->dbmon
, m
->change_set
)) {
1721 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_jsonrpc_monitor
*m
,
1722 bool notify_cancellation
)
1724 if (notify_cancellation
) {
1725 struct ovsdb_jsonrpc_session
*s
= m
->session
;
1726 if (jsonrpc_session_is_connected(s
->js
) && s
->db_change_aware
) {
1727 struct jsonrpc_msg
*notify
= jsonrpc_create_notify(
1729 json_array_create_1(json_clone(m
->monitor_id
)));
1730 ovsdb_jsonrpc_session_send(s
, notify
);
1734 json_destroy(m
->monitor_id
);
1735 hmap_remove(&m
->session
->monitors
, &m
->node
);
1736 ovsdb_monitor_remove_jsonrpc_monitor(m
->dbmon
, m
, m
->change_set
);
1737 ovsdb_monitor_session_condition_destroy(m
->condition
);
1741 static struct jsonrpc_msg
*
1742 ovsdb_jsonrpc_create_notify(const struct ovsdb_jsonrpc_monitor
*m
,
1743 struct json
*params
)
1747 switch(m
->version
) {
1748 case OVSDB_MONITOR_V1
:
1751 case OVSDB_MONITOR_V2
:
1754 case OVSDB_MONITOR_V3
:
1757 case OVSDB_MONITOR_VERSION_MAX
:
1762 return jsonrpc_create_notify(method
, params
);
1766 ovsdb_jsonrpc_server_get_uuid(const struct ovsdb_jsonrpc_server
*s
)
1772 ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session
*s
)
1774 struct ovsdb_jsonrpc_monitor
*m
;
1776 HMAP_FOR_EACH (m
, node
, &s
->monitors
) {
1779 json
= ovsdb_jsonrpc_monitor_compose_update(m
, false);
1781 struct jsonrpc_msg
*msg
;
1782 struct json
*params
;
1783 if (m
->version
== OVSDB_MONITOR_V3
) {
1784 struct json
*json_last_id
= json_string_create_nocopy(
1786 UUID_ARGS(ovsdb_monitor_get_last_txnid(
1788 params
= json_array_create_3(json_clone(m
->monitor_id
),
1789 json_last_id
, json
);
1791 params
= json_array_create_2(json_clone(m
->monitor_id
), json
);
1794 msg
= ovsdb_jsonrpc_create_notify(m
, params
);
1795 jsonrpc_session_send(s
->js
, msg
);
1801 ovsdb_jsonrpc_disable_monitor_cond(void)
1803 /* Once disabled, it is not possible to re-enable it. */
1804 monitor_cond_enable__
= false;