1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
18 #include "jsonrpc-server.h"
24 #include "dynamic-string.h"
27 #include "ovsdb-error.h"
28 #include "ovsdb-parser.h"
30 #include "reconnect.h"
37 #include "transaction.h"
41 VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server
);
43 struct ovsdb_jsonrpc_remote
;
44 struct ovsdb_jsonrpc_session
;
46 /* Message rate-limiting. */
47 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 5);
/* Sessions. */
static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
    struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *);
static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
static void ovsdb_jsonrpc_session_get_memory_usage_all(
    const struct ovsdb_jsonrpc_remote *, struct simap *usage);
static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *);
static void ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *);
static void ovsdb_jsonrpc_session_set_all_options(
    struct ovsdb_jsonrpc_remote *, const struct ovsdb_jsonrpc_options *);
static bool ovsdb_jsonrpc_session_get_status(
    const struct ovsdb_jsonrpc_remote *,
    struct ovsdb_jsonrpc_remote_status *);
static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *);
static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *);

/* Triggers. */
static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
                                         struct ovsdb *,
                                         struct json *id, struct json *params);
static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
    struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
static void ovsdb_jsonrpc_trigger_complete_done(
    struct ovsdb_jsonrpc_session *);

/* Monitors. */
static struct json *ovsdb_jsonrpc_monitor_create(
    struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params);
static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
    struct ovsdb_jsonrpc_session *,
    struct json_array *params,
    const struct json *request_id);
static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
86 /* JSON-RPC database server. */
88 struct ovsdb_jsonrpc_server
{
89 struct ovsdb_server up
;
90 unsigned int n_sessions
, max_sessions
;
91 struct shash remotes
; /* Contains "struct ovsdb_jsonrpc_remote *"s. */
94 /* A configured remote. This is either a passive stream listener plus a list
95 * of the currently connected sessions, or a list of exactly one active
97 struct ovsdb_jsonrpc_remote
{
98 struct ovsdb_jsonrpc_server
*server
;
99 struct pstream
*listener
; /* Listener, if passive. */
100 struct list sessions
; /* List of "struct ovsdb_jsonrpc_session"s. */
/* Remote management helpers, defined below. */
static struct ovsdb_jsonrpc_remote *ovsdb_jsonrpc_server_add_remote(
    struct ovsdb_jsonrpc_server *, const char *name,
    const struct ovsdb_jsonrpc_options *options
);
static void ovsdb_jsonrpc_server_del_remote(struct shash_node *);
110 /* Creates and returns a new server to provide JSON-RPC access to an OVSDB.
112 * The caller must call ovsdb_jsonrpc_server_add_db() for each database to
113 * which 'server' should provide access. */
114 struct ovsdb_jsonrpc_server
*
115 ovsdb_jsonrpc_server_create(void)
117 struct ovsdb_jsonrpc_server
*server
= xzalloc(sizeof *server
);
118 ovsdb_server_init(&server
->up
);
119 server
->max_sessions
= 64;
120 shash_init(&server
->remotes
);
124 /* Adds 'db' to the set of databases served out by 'svr'. Returns true if
125 * successful, false if 'db''s name is the same as some database already in
128 ovsdb_jsonrpc_server_add_db(struct ovsdb_jsonrpc_server
*svr
, struct ovsdb
*db
)
130 return ovsdb_server_add_db(&svr
->up
, db
);
134 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server
*svr
)
136 struct shash_node
*node
, *next
;
138 SHASH_FOR_EACH_SAFE (node
, next
, &svr
->remotes
) {
139 ovsdb_jsonrpc_server_del_remote(node
);
141 shash_destroy(&svr
->remotes
);
142 ovsdb_server_destroy(&svr
->up
);
146 struct ovsdb_jsonrpc_options
*
147 ovsdb_jsonrpc_default_options(const char *target
)
149 struct ovsdb_jsonrpc_options
*options
= xzalloc(sizeof *options
);
150 options
->max_backoff
= RECONNECT_DEFAULT_MAX_BACKOFF
;
151 options
->probe_interval
= (stream_or_pstream_needs_probes(target
)
152 ? RECONNECT_DEFAULT_PROBE_INTERVAL
157 /* Sets 'svr''s current set of remotes to the names in 'new_remotes', with
158 * options in the struct ovsdb_jsonrpc_options supplied as the data values.
160 * A remote is an active or passive stream connection method, e.g. "pssl:" or
163 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server
*svr
,
164 const struct shash
*new_remotes
)
166 struct shash_node
*node
, *next
;
168 SHASH_FOR_EACH_SAFE (node
, next
, &svr
->remotes
) {
169 if (!shash_find(new_remotes
, node
->name
)) {
170 VLOG_INFO("%s: remote deconfigured", node
->name
);
171 ovsdb_jsonrpc_server_del_remote(node
);
174 SHASH_FOR_EACH (node
, new_remotes
) {
175 const struct ovsdb_jsonrpc_options
*options
= node
->data
;
176 struct ovsdb_jsonrpc_remote
*remote
;
178 remote
= shash_find_data(&svr
->remotes
, node
->name
);
180 remote
= ovsdb_jsonrpc_server_add_remote(svr
, node
->name
, options
);
186 ovsdb_jsonrpc_session_set_all_options(remote
, options
);
190 static struct ovsdb_jsonrpc_remote
*
191 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server
*svr
,
193 const struct ovsdb_jsonrpc_options
*options
)
195 struct ovsdb_jsonrpc_remote
*remote
;
196 struct pstream
*listener
;
199 error
= jsonrpc_pstream_open(name
, &listener
, options
->dscp
);
200 if (error
&& error
!= EAFNOSUPPORT
) {
201 VLOG_ERR_RL(&rl
, "%s: listen failed: %s", name
, strerror(error
));
205 remote
= xmalloc(sizeof *remote
);
206 remote
->server
= svr
;
207 remote
->listener
= listener
;
208 list_init(&remote
->sessions
);
209 remote
->dscp
= options
->dscp
;
210 shash_add(&svr
->remotes
, name
, remote
);
213 ovsdb_jsonrpc_session_create(remote
, jsonrpc_session_open(name
));
219 ovsdb_jsonrpc_server_del_remote(struct shash_node
*node
)
221 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
223 ovsdb_jsonrpc_session_close_all(remote
);
224 pstream_close(remote
->listener
);
225 shash_delete(&remote
->server
->remotes
, node
);
229 /* Stores status information for the remote named 'target', which should have
230 * been configured on 'svr' with a call to ovsdb_jsonrpc_server_set_remotes(),
231 * into '*status'. On success returns true, on failure (if 'svr' doesn't have
232 * a remote named 'target' or if that remote is an inbound remote that has no
233 * active connections) returns false. On failure, 'status' will be zeroed.
236 ovsdb_jsonrpc_server_get_remote_status(
237 const struct ovsdb_jsonrpc_server
*svr
, const char *target
,
238 struct ovsdb_jsonrpc_remote_status
*status
)
240 const struct ovsdb_jsonrpc_remote
*remote
;
242 memset(status
, 0, sizeof *status
);
244 remote
= shash_find_data(&svr
->remotes
, target
);
245 return remote
&& ovsdb_jsonrpc_session_get_status(remote
, status
);
249 ovsdb_jsonrpc_server_free_remote_status(
250 struct ovsdb_jsonrpc_remote_status
*status
)
252 free(status
->locks_held
);
253 free(status
->locks_waiting
);
254 free(status
->locks_lost
);
257 /* Forces all of the JSON-RPC sessions managed by 'svr' to disconnect and
260 ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server
*svr
)
262 struct shash_node
*node
;
264 SHASH_FOR_EACH (node
, &svr
->remotes
) {
265 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
267 ovsdb_jsonrpc_session_reconnect_all(remote
);
272 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server
*svr
)
274 struct shash_node
*node
;
276 SHASH_FOR_EACH (node
, &svr
->remotes
) {
277 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
279 if (remote
->listener
&& svr
->n_sessions
< svr
->max_sessions
) {
280 struct stream
*stream
;
283 error
= pstream_accept(remote
->listener
, &stream
);
285 struct jsonrpc_session
*js
;
286 js
= jsonrpc_session_open_unreliably(jsonrpc_open(stream
),
288 ovsdb_jsonrpc_session_create(remote
, js
);
289 } else if (error
!= EAGAIN
) {
290 VLOG_WARN_RL(&rl
, "%s: accept failed: %s",
291 pstream_get_name(remote
->listener
),
296 ovsdb_jsonrpc_session_run_all(remote
);
301 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server
*svr
)
303 struct shash_node
*node
;
305 SHASH_FOR_EACH (node
, &svr
->remotes
) {
306 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
308 if (remote
->listener
&& svr
->n_sessions
< svr
->max_sessions
) {
309 pstream_wait(remote
->listener
);
312 ovsdb_jsonrpc_session_wait_all(remote
);
316 /* Adds some memory usage statistics for 'svr' into 'usage', for use with
317 * memory_report(). */
319 ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server
*svr
,
322 struct shash_node
*node
;
324 simap_increase(usage
, "sessions", svr
->n_sessions
);
325 SHASH_FOR_EACH (node
, &svr
->remotes
) {
326 struct ovsdb_jsonrpc_remote
*remote
= node
->data
;
328 ovsdb_jsonrpc_session_get_memory_usage_all(remote
, usage
);
332 /* JSON-RPC database server session. */
334 struct ovsdb_jsonrpc_session
{
335 struct list node
; /* Element in remote's sessions list. */
336 struct ovsdb_session up
;
337 struct ovsdb_jsonrpc_remote
*remote
;
340 struct hmap triggers
; /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
343 struct hmap monitors
; /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
345 /* Network connectivity. */
346 struct jsonrpc_session
*js
; /* JSON-RPC session. */
347 unsigned int js_seqno
; /* Last jsonrpc_session_get_seqno() value. */
/* Per-session helpers, defined below. */
static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
static void ovsdb_jsonrpc_session_get_memory_usage(
    const struct ovsdb_jsonrpc_session *, struct simap *usage);
static void ovsdb_jsonrpc_session_set_options(
    struct ovsdb_jsonrpc_session *, const struct ovsdb_jsonrpc_options *);
static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
                                             struct jsonrpc_msg *);
static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
                                             struct jsonrpc_msg *);
362 static struct ovsdb_jsonrpc_session
*
363 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote
*remote
,
364 struct jsonrpc_session
*js
)
366 struct ovsdb_jsonrpc_session
*s
;
368 s
= xzalloc(sizeof *s
);
369 ovsdb_session_init(&s
->up
, &remote
->server
->up
);
371 list_push_back(&remote
->sessions
, &s
->node
);
372 hmap_init(&s
->triggers
);
373 hmap_init(&s
->monitors
);
375 s
->js_seqno
= jsonrpc_session_get_seqno(js
);
377 remote
->server
->n_sessions
++;
383 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session
*s
)
385 ovsdb_jsonrpc_monitor_remove_all(s
);
386 ovsdb_jsonrpc_session_unlock_all(s
);
387 ovsdb_jsonrpc_trigger_complete_all(s
);
389 hmap_destroy(&s
->monitors
);
390 hmap_destroy(&s
->triggers
);
392 jsonrpc_session_close(s
->js
);
393 list_remove(&s
->node
);
394 s
->remote
->server
->n_sessions
--;
395 ovsdb_session_destroy(&s
->up
);
400 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session
*s
)
402 jsonrpc_session_run(s
->js
);
403 if (s
->js_seqno
!= jsonrpc_session_get_seqno(s
->js
)) {
404 s
->js_seqno
= jsonrpc_session_get_seqno(s
->js
);
405 ovsdb_jsonrpc_trigger_complete_all(s
);
406 ovsdb_jsonrpc_monitor_remove_all(s
);
407 ovsdb_jsonrpc_session_unlock_all(s
);
410 ovsdb_jsonrpc_trigger_complete_done(s
);
412 if (!jsonrpc_session_get_backlog(s
->js
)) {
413 struct jsonrpc_msg
*msg
= jsonrpc_session_recv(s
->js
);
415 if (msg
->type
== JSONRPC_REQUEST
) {
416 ovsdb_jsonrpc_session_got_request(s
, msg
);
417 } else if (msg
->type
== JSONRPC_NOTIFY
) {
418 ovsdb_jsonrpc_session_got_notify(s
, msg
);
420 VLOG_WARN("%s: received unexpected %s message",
421 jsonrpc_session_get_name(s
->js
),
422 jsonrpc_msg_type_to_string(msg
->type
));
423 jsonrpc_session_force_reconnect(s
->js
);
424 jsonrpc_msg_destroy(msg
);
428 return jsonrpc_session_is_alive(s
->js
) ? 0 : ETIMEDOUT
;
432 ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session
*session
,
433 const struct ovsdb_jsonrpc_options
*options
)
435 jsonrpc_session_set_max_backoff(session
->js
, options
->max_backoff
);
436 jsonrpc_session_set_probe_interval(session
->js
, options
->probe_interval
);
437 jsonrpc_session_set_dscp(session
->js
, options
->dscp
);
441 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote
*remote
)
443 struct ovsdb_jsonrpc_session
*s
, *next
;
445 LIST_FOR_EACH_SAFE (s
, next
, node
, &remote
->sessions
) {
446 int error
= ovsdb_jsonrpc_session_run(s
);
448 ovsdb_jsonrpc_session_close(s
);
454 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session
*s
)
456 jsonrpc_session_wait(s
->js
);
457 if (!jsonrpc_session_get_backlog(s
->js
)) {
458 jsonrpc_session_recv_wait(s
->js
);
463 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote
*remote
)
465 struct ovsdb_jsonrpc_session
*s
;
467 LIST_FOR_EACH (s
, node
, &remote
->sessions
) {
468 ovsdb_jsonrpc_session_wait(s
);
473 ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session
*s
,
476 simap_increase(usage
, "triggers", hmap_count(&s
->triggers
));
477 simap_increase(usage
, "monitors", hmap_count(&s
->monitors
));
478 simap_increase(usage
, "backlog", jsonrpc_session_get_backlog(s
->js
));
482 ovsdb_jsonrpc_session_get_memory_usage_all(
483 const struct ovsdb_jsonrpc_remote
*remote
,
486 struct ovsdb_jsonrpc_session
*s
;
488 LIST_FOR_EACH (s
, node
, &remote
->sessions
) {
489 ovsdb_jsonrpc_session_get_memory_usage(s
, usage
);
494 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote
*remote
)
496 struct ovsdb_jsonrpc_session
*s
, *next
;
498 LIST_FOR_EACH_SAFE (s
, next
, node
, &remote
->sessions
) {
499 ovsdb_jsonrpc_session_close(s
);
503 /* Forces all of the JSON-RPC sessions managed by 'remote' to disconnect and
506 ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote
*remote
)
508 struct ovsdb_jsonrpc_session
*s
, *next
;
510 LIST_FOR_EACH_SAFE (s
, next
, node
, &remote
->sessions
) {
511 jsonrpc_session_force_reconnect(s
->js
);
512 if (!jsonrpc_session_is_alive(s
->js
)) {
513 ovsdb_jsonrpc_session_close(s
);
518 /* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
521 ovsdb_jsonrpc_session_set_all_options(
522 struct ovsdb_jsonrpc_remote
*remote
,
523 const struct ovsdb_jsonrpc_options
*options
)
525 struct ovsdb_jsonrpc_session
*s
;
527 if (remote
->listener
) {
530 error
= pstream_set_dscp(remote
->listener
, options
->dscp
);
532 VLOG_ERR("%s: set_dscp failed %s",
533 pstream_get_name(remote
->listener
), strerror(error
));
535 remote
->dscp
= options
->dscp
;
538 * XXX race window between setting dscp to listening socket
539 * and accepting socket. Accepted socket may have old dscp value.
540 * Ignore this race window for now.
543 LIST_FOR_EACH (s
, node
, &remote
->sessions
) {
544 ovsdb_jsonrpc_session_set_options(s
, options
);
549 ovsdb_jsonrpc_session_get_status(const struct ovsdb_jsonrpc_remote
*remote
,
550 struct ovsdb_jsonrpc_remote_status
*status
)
552 const struct ovsdb_jsonrpc_session
*s
;
553 const struct jsonrpc_session
*js
;
554 struct ovsdb_lock_waiter
*waiter
;
555 struct reconnect_stats rstats
;
556 struct ds locks_held
, locks_waiting
, locks_lost
;
558 if (list_is_empty(&remote
->sessions
)) {
561 s
= CONTAINER_OF(remote
->sessions
.next
, struct ovsdb_jsonrpc_session
, node
);
564 status
->is_connected
= jsonrpc_session_is_connected(js
);
565 status
->last_error
= jsonrpc_session_get_status(js
);
567 jsonrpc_session_get_reconnect_stats(js
, &rstats
);
568 status
->state
= rstats
.state
;
569 status
->sec_since_connect
= rstats
.msec_since_connect
== UINT_MAX
570 ? UINT_MAX
: rstats
.msec_since_connect
/ 1000;
571 status
->sec_since_disconnect
= rstats
.msec_since_disconnect
== UINT_MAX
572 ? UINT_MAX
: rstats
.msec_since_disconnect
/ 1000;
574 ds_init(&locks_held
);
575 ds_init(&locks_waiting
);
576 ds_init(&locks_lost
);
577 HMAP_FOR_EACH (waiter
, session_node
, &s
->up
.waiters
) {
580 string
= (ovsdb_lock_waiter_is_owner(waiter
) ? &locks_held
581 : waiter
->mode
== OVSDB_LOCK_WAIT
? &locks_waiting
583 if (string
->length
) {
584 ds_put_char(string
, ' ');
586 ds_put_cstr(string
, waiter
->lock_name
);
588 status
->locks_held
= ds_steal_cstr(&locks_held
);
589 status
->locks_waiting
= ds_steal_cstr(&locks_waiting
);
590 status
->locks_lost
= ds_steal_cstr(&locks_lost
);
592 status
->n_connections
= list_size(&remote
->sessions
);
597 /* Examines 'request' to determine the database to which it relates, and then
598 * searches 's' to find that database:
600 * - If successful, returns the database and sets '*replyp' to NULL.
602 * - If no such database exists, returns NULL and sets '*replyp' to an
603 * appropriate JSON-RPC error reply, owned by the caller. */
604 static struct ovsdb
*
605 ovsdb_jsonrpc_lookup_db(const struct ovsdb_jsonrpc_session
*s
,
606 const struct jsonrpc_msg
*request
,
607 struct jsonrpc_msg
**replyp
)
609 struct json_array
*params
;
610 struct ovsdb_error
*error
;
614 params
= json_array(request
->params
);
615 if (!params
->n
|| params
->elems
[0]->type
!= JSON_STRING
) {
616 error
= ovsdb_syntax_error(
617 request
->params
, NULL
,
618 "%s request params must begin with <db-name>", request
->method
);
622 db_name
= params
->elems
[0]->u
.string
;
623 db
= shash_find_data(&s
->up
.server
->dbs
, db_name
);
625 error
= ovsdb_syntax_error(
626 request
->params
, "unknown database",
627 "%s request specifies unknown database %s",
628 request
->method
, db_name
);
636 *replyp
= jsonrpc_create_reply(ovsdb_error_to_json(error
), request
->id
);
637 ovsdb_error_destroy(error
);
641 static struct ovsdb_error
*
642 ovsdb_jsonrpc_session_parse_lock_name(const struct jsonrpc_msg
*request
,
643 const char **lock_namep
)
645 const struct json_array
*params
;
647 params
= json_array(request
->params
);
648 if (params
->n
!= 1 || params
->elems
[0]->type
!= JSON_STRING
||
649 !ovsdb_parser_is_id(json_string(params
->elems
[0]))) {
651 return ovsdb_syntax_error(request
->params
, NULL
,
652 "%s request params must be <id>",
656 *lock_namep
= json_string(params
->elems
[0]);
661 ovsdb_jsonrpc_session_notify(struct ovsdb_session
*session
,
662 const char *lock_name
,
665 struct ovsdb_jsonrpc_session
*s
;
668 s
= CONTAINER_OF(session
, struct ovsdb_jsonrpc_session
, up
);
669 params
= json_array_create_1(json_string_create(lock_name
));
670 jsonrpc_session_send(s
->js
, jsonrpc_create_notify(method
, params
));
673 static struct jsonrpc_msg
*
674 ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session
*s
,
675 struct jsonrpc_msg
*request
,
676 enum ovsdb_lock_mode mode
)
678 struct ovsdb_lock_waiter
*waiter
;
679 struct jsonrpc_msg
*reply
;
680 struct ovsdb_error
*error
;
681 struct ovsdb_session
*victim
;
682 const char *lock_name
;
685 error
= ovsdb_jsonrpc_session_parse_lock_name(request
, &lock_name
);
690 /* Report error if this session has issued a "lock" or "steal" without a
691 * matching "unlock" for this lock. */
692 waiter
= ovsdb_session_get_lock_waiter(&s
->up
, lock_name
);
694 error
= ovsdb_syntax_error(
695 request
->params
, NULL
,
696 "must issue \"unlock\" before new \"%s\"", request
->method
);
700 /* Get the lock, add us as a waiter. */
701 waiter
= ovsdb_server_lock(&s
->remote
->server
->up
, &s
->up
, lock_name
, mode
,
704 ovsdb_jsonrpc_session_notify(victim
, lock_name
, "stolen");
707 result
= json_object_create();
708 json_object_put(result
, "locked",
709 json_boolean_create(ovsdb_lock_waiter_is_owner(waiter
)));
711 return jsonrpc_create_reply(result
, request
->id
);
714 reply
= jsonrpc_create_reply(ovsdb_error_to_json(error
), request
->id
);
715 ovsdb_error_destroy(error
);
720 ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session
*s
)
722 struct ovsdb_lock_waiter
*waiter
, *next
;
724 HMAP_FOR_EACH_SAFE (waiter
, next
, session_node
, &s
->up
.waiters
) {
725 ovsdb_jsonrpc_session_unlock__(waiter
);
730 ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter
*waiter
)
732 struct ovsdb_lock
*lock
= waiter
->lock
;
735 struct ovsdb_session
*new_owner
= ovsdb_lock_waiter_remove(waiter
);
737 ovsdb_jsonrpc_session_notify(new_owner
, lock
->name
, "locked");
739 /* ovsdb_server_lock() might have freed 'lock'. */
743 ovsdb_lock_waiter_destroy(waiter
);
746 static struct jsonrpc_msg
*
747 ovsdb_jsonrpc_session_unlock(struct ovsdb_jsonrpc_session
*s
,
748 struct jsonrpc_msg
*request
)
750 struct ovsdb_lock_waiter
*waiter
;
751 struct jsonrpc_msg
*reply
;
752 struct ovsdb_error
*error
;
753 const char *lock_name
;
755 error
= ovsdb_jsonrpc_session_parse_lock_name(request
, &lock_name
);
760 /* Report error if this session has not issued a "lock" or "steal" for this
762 waiter
= ovsdb_session_get_lock_waiter(&s
->up
, lock_name
);
764 error
= ovsdb_syntax_error(
765 request
->params
, NULL
, "\"unlock\" without \"lock\" or \"steal\"");
769 ovsdb_jsonrpc_session_unlock__(waiter
);
771 return jsonrpc_create_reply(json_object_create(), request
->id
);
774 reply
= jsonrpc_create_reply(ovsdb_error_to_json(error
), request
->id
);
775 ovsdb_error_destroy(error
);
779 static struct jsonrpc_msg
*
780 execute_transaction(struct ovsdb_jsonrpc_session
*s
, struct ovsdb
*db
,
781 struct jsonrpc_msg
*request
)
783 ovsdb_jsonrpc_trigger_create(s
, db
, request
->id
, request
->params
);
785 request
->params
= NULL
;
786 jsonrpc_msg_destroy(request
);
791 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session
*s
,
792 struct jsonrpc_msg
*request
)
794 struct jsonrpc_msg
*reply
;
796 if (!strcmp(request
->method
, "transact")) {
797 struct ovsdb
*db
= ovsdb_jsonrpc_lookup_db(s
, request
, &reply
);
799 reply
= execute_transaction(s
, db
, request
);
801 } else if (!strcmp(request
->method
, "monitor")) {
802 struct ovsdb
*db
= ovsdb_jsonrpc_lookup_db(s
, request
, &reply
);
804 reply
= jsonrpc_create_reply(
805 ovsdb_jsonrpc_monitor_create(s
, db
, request
->params
),
808 } else if (!strcmp(request
->method
, "monitor_cancel")) {
809 reply
= ovsdb_jsonrpc_monitor_cancel(s
, json_array(request
->params
),
811 } else if (!strcmp(request
->method
, "get_schema")) {
812 struct ovsdb
*db
= ovsdb_jsonrpc_lookup_db(s
, request
, &reply
);
814 reply
= jsonrpc_create_reply(ovsdb_schema_to_json(db
->schema
),
817 } else if (!strcmp(request
->method
, "list_dbs")) {
818 size_t n_dbs
= shash_count(&s
->up
.server
->dbs
);
819 struct shash_node
*node
;
823 dbs
= xmalloc(n_dbs
* sizeof *dbs
);
825 SHASH_FOR_EACH (node
, &s
->up
.server
->dbs
) {
826 dbs
[i
++] = json_string_create(node
->name
);
828 reply
= jsonrpc_create_reply(json_array_create(dbs
, n_dbs
),
830 } else if (!strcmp(request
->method
, "lock")) {
831 reply
= ovsdb_jsonrpc_session_lock(s
, request
, OVSDB_LOCK_WAIT
);
832 } else if (!strcmp(request
->method
, "steal")) {
833 reply
= ovsdb_jsonrpc_session_lock(s
, request
, OVSDB_LOCK_STEAL
);
834 } else if (!strcmp(request
->method
, "unlock")) {
835 reply
= ovsdb_jsonrpc_session_unlock(s
, request
);
836 } else if (!strcmp(request
->method
, "echo")) {
837 reply
= jsonrpc_create_reply(json_clone(request
->params
), request
->id
);
839 reply
= jsonrpc_create_error(json_string_create("unknown method"),
844 jsonrpc_msg_destroy(request
);
845 jsonrpc_session_send(s
->js
, reply
);
850 execute_cancel(struct ovsdb_jsonrpc_session
*s
, struct jsonrpc_msg
*request
)
852 if (json_array(request
->params
)->n
== 1) {
853 struct ovsdb_jsonrpc_trigger
*t
;
856 id
= request
->params
->u
.array
.elems
[0];
857 t
= ovsdb_jsonrpc_trigger_find(s
, id
, json_hash(id
, 0));
859 ovsdb_jsonrpc_trigger_complete(t
);
865 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session
*s
,
866 struct jsonrpc_msg
*request
)
868 if (!strcmp(request
->method
, "cancel")) {
869 execute_cancel(s
, request
);
871 jsonrpc_msg_destroy(request
);
874 /* JSON-RPC database server triggers.
876 * (Every transaction is treated as a trigger even if it doesn't actually have
877 * any "wait" operations.) */
879 struct ovsdb_jsonrpc_trigger
{
880 struct ovsdb_trigger trigger
;
881 struct hmap_node hmap_node
; /* In session's "triggers" hmap. */
886 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session
*s
, struct ovsdb
*db
,
887 struct json
*id
, struct json
*params
)
889 struct ovsdb_jsonrpc_trigger
*t
;
892 /* Check for duplicate ID. */
893 hash
= json_hash(id
, 0);
894 t
= ovsdb_jsonrpc_trigger_find(s
, id
, hash
);
896 struct jsonrpc_msg
*msg
;
898 msg
= jsonrpc_create_error(json_string_create("duplicate request ID"),
900 jsonrpc_session_send(s
->js
, msg
);
902 json_destroy(params
);
906 /* Insert into trigger table. */
907 t
= xmalloc(sizeof *t
);
908 ovsdb_trigger_init(&s
->up
, db
, &t
->trigger
, params
, time_msec());
910 hmap_insert(&s
->triggers
, &t
->hmap_node
, hash
);
912 /* Complete early if possible. */
913 if (ovsdb_trigger_is_complete(&t
->trigger
)) {
914 ovsdb_jsonrpc_trigger_complete(t
);
918 static struct ovsdb_jsonrpc_trigger
*
919 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session
*s
,
920 const struct json
*id
, size_t hash
)
922 struct ovsdb_jsonrpc_trigger
*t
;
924 HMAP_FOR_EACH_WITH_HASH (t
, hmap_node
, hash
, &s
->triggers
) {
925 if (json_equal(t
->id
, id
)) {
934 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger
*t
)
936 struct ovsdb_jsonrpc_session
*s
;
938 s
= CONTAINER_OF(t
->trigger
.session
, struct ovsdb_jsonrpc_session
, up
);
940 if (jsonrpc_session_is_connected(s
->js
)) {
941 struct jsonrpc_msg
*reply
;
944 result
= ovsdb_trigger_steal_result(&t
->trigger
);
946 reply
= jsonrpc_create_reply(result
, t
->id
);
948 reply
= jsonrpc_create_error(json_string_create("canceled"),
951 jsonrpc_session_send(s
->js
, reply
);
955 ovsdb_trigger_destroy(&t
->trigger
);
956 hmap_remove(&s
->triggers
, &t
->hmap_node
);
961 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session
*s
)
963 struct ovsdb_jsonrpc_trigger
*t
, *next
;
964 HMAP_FOR_EACH_SAFE (t
, next
, hmap_node
, &s
->triggers
) {
965 ovsdb_jsonrpc_trigger_complete(t
);
970 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session
*s
)
972 while (!list_is_empty(&s
->up
.completions
)) {
973 struct ovsdb_jsonrpc_trigger
*t
974 = CONTAINER_OF(s
->up
.completions
.next
,
975 struct ovsdb_jsonrpc_trigger
, trigger
.node
);
976 ovsdb_jsonrpc_trigger_complete(t
);
/* JSON-RPC database table monitors. */

/* Bit flags selecting which kinds of row changes a monitor reports. */
enum ovsdb_jsonrpc_monitor_selection {
    OJMS_INITIAL = 1 << 0,      /* All rows when monitor is created. */
    OJMS_INSERT = 1 << 1,       /* New rows. */
    OJMS_DELETE = 1 << 2,       /* Deleted rows. */
    OJMS_MODIFY = 1 << 3        /* Modified rows. */
};
989 /* A particular column being monitored. */
990 struct ovsdb_jsonrpc_monitor_column
{
991 const struct ovsdb_column
*column
;
992 enum ovsdb_jsonrpc_monitor_selection select
;
995 /* A particular table being monitored. */
996 struct ovsdb_jsonrpc_monitor_table
{
997 const struct ovsdb_table
*table
;
999 /* This is the union (bitwise-OR) of the 'select' values in all of the
1000 * members of 'columns' below. */
1001 enum ovsdb_jsonrpc_monitor_selection select
;
1003 /* Columns being monitored. */
1004 struct ovsdb_jsonrpc_monitor_column
*columns
;
1008 /* A collection of tables being monitored. */
1009 struct ovsdb_jsonrpc_monitor
{
1010 struct ovsdb_replica replica
;
1011 struct ovsdb_jsonrpc_session
*session
;
1013 struct hmap_node node
; /* In ovsdb_jsonrpc_session's "monitors". */
1015 struct json
*monitor_id
;
1016 struct shash tables
; /* Holds "struct ovsdb_jsonrpc_monitor_table"s. */
1019 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class
;
1021 struct ovsdb_jsonrpc_monitor
*ovsdb_jsonrpc_monitor_find(
1022 struct ovsdb_jsonrpc_session
*, const struct json
*monitor_id
);
1023 static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica
*);
1024 static struct json
*ovsdb_jsonrpc_monitor_get_initial(
1025 const struct ovsdb_jsonrpc_monitor
*);
1028 parse_bool(struct ovsdb_parser
*parser
, const char *name
, bool default_value
)
1030 const struct json
*json
;
1032 json
= ovsdb_parser_member(parser
, name
, OP_BOOLEAN
| OP_OPTIONAL
);
1033 return json
? json_boolean(json
) : default_value
;
1036 struct ovsdb_jsonrpc_monitor
*
1037 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session
*s
,
1038 const struct json
*monitor_id
)
1040 struct ovsdb_jsonrpc_monitor
*m
;
1042 HMAP_FOR_EACH_WITH_HASH (m
, node
, json_hash(monitor_id
, 0), &s
->monitors
) {
1043 if (json_equal(m
->monitor_id
, monitor_id
)) {
1052 ovsdb_jsonrpc_add_monitor_column(struct ovsdb_jsonrpc_monitor_table
*mt
,
1053 const struct ovsdb_column
*column
,
1054 enum ovsdb_jsonrpc_monitor_selection select
,
1055 size_t *allocated_columns
)
1057 struct ovsdb_jsonrpc_monitor_column
*c
;
1059 if (mt
->n_columns
>= *allocated_columns
) {
1060 mt
->columns
= x2nrealloc(mt
->columns
, allocated_columns
,
1061 sizeof *mt
->columns
);
1064 c
= &mt
->columns
[mt
->n_columns
++];
1070 compare_ovsdb_jsonrpc_monitor_column(const void *a_
, const void *b_
)
1072 const struct ovsdb_jsonrpc_monitor_column
*a
= a_
;
1073 const struct ovsdb_jsonrpc_monitor_column
*b
= b_
;
1075 return a
->column
< b
->column
? -1 : a
->column
> b
->column
;
1078 static struct ovsdb_error
* WARN_UNUSED_RESULT
1079 ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_jsonrpc_monitor_table
*mt
,
1080 const struct json
*monitor_request
,
1081 size_t *allocated_columns
)
1083 const struct ovsdb_table_schema
*ts
= mt
->table
->schema
;
1084 enum ovsdb_jsonrpc_monitor_selection select
;
1085 const struct json
*columns
, *select_json
;
1086 struct ovsdb_parser parser
;
1087 struct ovsdb_error
*error
;
1089 ovsdb_parser_init(&parser
, monitor_request
, "table %s", ts
->name
);
1090 columns
= ovsdb_parser_member(&parser
, "columns", OP_ARRAY
| OP_OPTIONAL
);
1091 select_json
= ovsdb_parser_member(&parser
, "select",
1092 OP_OBJECT
| OP_OPTIONAL
);
1093 error
= ovsdb_parser_finish(&parser
);
1100 ovsdb_parser_init(&parser
, select_json
, "table %s select", ts
->name
);
1101 if (parse_bool(&parser
, "initial", true)) {
1102 select
|= OJMS_INITIAL
;
1104 if (parse_bool(&parser
, "insert", true)) {
1105 select
|= OJMS_INSERT
;
1107 if (parse_bool(&parser
, "delete", true)) {
1108 select
|= OJMS_DELETE
;
1110 if (parse_bool(&parser
, "modify", true)) {
1111 select
|= OJMS_MODIFY
;
1113 error
= ovsdb_parser_finish(&parser
);
1118 select
= OJMS_INITIAL
| OJMS_INSERT
| OJMS_DELETE
| OJMS_MODIFY
;
1120 mt
->select
|= select
;
1125 if (columns
->type
!= JSON_ARRAY
) {
1126 return ovsdb_syntax_error(columns
, NULL
,
1127 "array of column names expected");
1130 for (i
= 0; i
< columns
->u
.array
.n
; i
++) {
1131 const struct ovsdb_column
*column
;
1134 if (columns
->u
.array
.elems
[i
]->type
!= JSON_STRING
) {
1135 return ovsdb_syntax_error(columns
, NULL
,
1136 "array of column names expected");
1139 s
= columns
->u
.array
.elems
[i
]->u
.string
;
1140 column
= shash_find_data(&mt
->table
->schema
->columns
, s
);
1142 return ovsdb_syntax_error(columns
, NULL
, "%s is not a valid "
1145 ovsdb_jsonrpc_add_monitor_column(mt
, column
, select
,
1149 struct shash_node
*node
;
1151 SHASH_FOR_EACH (node
, &ts
->columns
) {
1152 const struct ovsdb_column
*column
= node
->data
;
1153 if (column
->index
!= OVSDB_COL_UUID
) {
1154 ovsdb_jsonrpc_add_monitor_column(mt
, column
, select
,
1163 static struct json
*
1164 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session
*s
, struct ovsdb
*db
,
1165 struct json
*params
)
1167 struct ovsdb_jsonrpc_monitor
*m
= NULL
;
1168 struct json
*monitor_id
, *monitor_requests
;
1169 struct ovsdb_error
*error
= NULL
;
1170 struct shash_node
*node
;
1173 if (json_array(params
)->n
!= 3) {
1174 error
= ovsdb_syntax_error(params
, NULL
, "invalid parameters");
1177 monitor_id
= params
->u
.array
.elems
[1];
1178 monitor_requests
= params
->u
.array
.elems
[2];
1179 if (monitor_requests
->type
!= JSON_OBJECT
) {
1180 error
= ovsdb_syntax_error(monitor_requests
, NULL
,
1181 "monitor-requests must be object");
1185 if (ovsdb_jsonrpc_monitor_find(s
, monitor_id
)) {
1186 error
= ovsdb_syntax_error(monitor_id
, NULL
, "duplicate monitor ID");
1190 m
= xzalloc(sizeof *m
);
1191 ovsdb_replica_init(&m
->replica
, &ovsdb_jsonrpc_replica_class
);
1192 ovsdb_add_replica(db
, &m
->replica
);
1195 hmap_insert(&s
->monitors
, &m
->node
, json_hash(monitor_id
, 0));
1196 m
->monitor_id
= json_clone(monitor_id
);
1197 shash_init(&m
->tables
);
1199 SHASH_FOR_EACH (node
, json_object(monitor_requests
)) {
1200 const struct ovsdb_table
*table
;
1201 struct ovsdb_jsonrpc_monitor_table
*mt
;
1202 size_t allocated_columns
;
1203 const struct json
*mr_value
;
1206 table
= ovsdb_get_table(m
->db
, node
->name
);
1208 error
= ovsdb_syntax_error(NULL
, NULL
,
1209 "no table named %s", node
->name
);
1213 mt
= xzalloc(sizeof *mt
);
1215 shash_add(&m
->tables
, table
->schema
->name
, mt
);
1217 /* Parse columns. */
1218 mr_value
= node
->data
;
1219 allocated_columns
= 0;
1220 if (mr_value
->type
== JSON_ARRAY
) {
1221 const struct json_array
*array
= &mr_value
->u
.array
;
1223 for (i
= 0; i
< array
->n
; i
++) {
1224 error
= ovsdb_jsonrpc_parse_monitor_request(
1225 mt
, array
->elems
[i
], &allocated_columns
);
1231 error
= ovsdb_jsonrpc_parse_monitor_request(
1232 mt
, mr_value
, &allocated_columns
);
1238 /* Check for duplicate columns. */
1239 qsort(mt
->columns
, mt
->n_columns
, sizeof *mt
->columns
,
1240 compare_ovsdb_jsonrpc_monitor_column
);
1241 for (i
= 1; i
< mt
->n_columns
; i
++) {
1242 if (mt
->columns
[i
].column
== mt
->columns
[i
- 1].column
) {
1243 error
= ovsdb_syntax_error(mr_value
, NULL
, "column %s "
1244 "mentioned more than once",
1245 mt
->columns
[i
].column
->name
);
1251 return ovsdb_jsonrpc_monitor_get_initial(m
);
1255 ovsdb_remove_replica(m
->db
, &m
->replica
);
1258 json
= ovsdb_error_to_json(error
);
1259 ovsdb_error_destroy(error
);
1263 static struct jsonrpc_msg
*
1264 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session
*s
,
1265 struct json_array
*params
,
1266 const struct json
*request_id
)
1268 if (params
->n
!= 1) {
1269 return jsonrpc_create_error(json_string_create("invalid parameters"),
1272 struct ovsdb_jsonrpc_monitor
*m
;
1274 m
= ovsdb_jsonrpc_monitor_find(s
, params
->elems
[0]);
1276 return jsonrpc_create_error(json_string_create("unknown monitor"),
1279 ovsdb_remove_replica(m
->db
, &m
->replica
);
1280 return jsonrpc_create_reply(json_object_create(), request_id
);
1286 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session
*s
)
1288 struct ovsdb_jsonrpc_monitor
*m
, *next
;
1290 HMAP_FOR_EACH_SAFE (m
, next
, node
, &s
->monitors
) {
1291 ovsdb_remove_replica(m
->db
, &m
->replica
);
1295 static struct ovsdb_jsonrpc_monitor
*
1296 ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica
*replica
)
1298 ovs_assert(replica
->class == &ovsdb_jsonrpc_replica_class
);
1299 return CONTAINER_OF(replica
, struct ovsdb_jsonrpc_monitor
, replica
);
/* State carried across calls to ovsdb_jsonrpc_monitor_change_cb() while the
 * JSON describing one set of row changes is accumulated. */
struct ovsdb_jsonrpc_monitor_aux {
    bool initial;               /* Sending initial contents of table? */
    const struct ovsdb_jsonrpc_monitor *monitor;
    struct json *json;          /* JSON for the whole transaction. */

    /* Current table. */
    struct ovsdb_jsonrpc_monitor_table *mt;
    struct json *table_json;    /* JSON for table's transaction. */
};
1313 any_reportable_change(const struct ovsdb_jsonrpc_monitor_table
*mt
,
1314 const unsigned long int *changed
)
1318 for (i
= 0; i
< mt
->n_columns
; i
++) {
1319 const struct ovsdb_jsonrpc_monitor_column
*c
= &mt
->columns
[i
];
1320 unsigned int idx
= c
->column
->index
;
1322 if (c
->select
& OJMS_MODIFY
&& bitmap_is_set(changed
, idx
)) {
1331 ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row
*old
,
1332 const struct ovsdb_row
*new,
1333 const unsigned long int *changed
,
1336 struct ovsdb_jsonrpc_monitor_aux
*aux
= aux_
;
1337 const struct ovsdb_jsonrpc_monitor
*m
= aux
->monitor
;
1338 struct ovsdb_table
*table
= new ? new->table
: old
->table
;
1339 enum ovsdb_jsonrpc_monitor_selection type
;
1340 struct json
*old_json
, *new_json
;
1341 struct json
*row_json
;
1342 char uuid
[UUID_LEN
+ 1];
1345 if (!aux
->mt
|| table
!= aux
->mt
->table
) {
1346 aux
->mt
= shash_find_data(&m
->tables
, table
->schema
->name
);
1347 aux
->table_json
= NULL
;
1349 /* We don't care about rows in this table at all. Tell the caller
1355 type
= (aux
->initial
? OJMS_INITIAL
1356 : !old
? OJMS_INSERT
1357 : !new ? OJMS_DELETE
1359 if (!(aux
->mt
->select
& type
)) {
1360 /* We don't care about this type of change (but do want to be called
1361 * back for changes to other rows in the same table). */
1365 if (type
== OJMS_MODIFY
&& !any_reportable_change(aux
->mt
, changed
)) {
1366 /* Nothing of interest changed. */
1370 old_json
= new_json
= NULL
;
1371 if (type
& (OJMS_DELETE
| OJMS_MODIFY
)) {
1372 old_json
= json_object_create();
1374 if (type
& (OJMS_INITIAL
| OJMS_INSERT
| OJMS_MODIFY
)) {
1375 new_json
= json_object_create();
1377 for (i
= 0; i
< aux
->mt
->n_columns
; i
++) {
1378 const struct ovsdb_jsonrpc_monitor_column
*c
= &aux
->mt
->columns
[i
];
1379 const struct ovsdb_column
*column
= c
->column
;
1380 unsigned int idx
= c
->column
->index
;
1382 if (!(type
& c
->select
)) {
1383 /* We don't care about this type of change for this particular
1384 * column (but we will care about it for some other column). */
1388 if ((type
== OJMS_MODIFY
&& bitmap_is_set(changed
, idx
))
1389 || type
== OJMS_DELETE
) {
1390 json_object_put(old_json
, column
->name
,
1391 ovsdb_datum_to_json(&old
->fields
[idx
],
1394 if (type
& (OJMS_INITIAL
| OJMS_INSERT
| OJMS_MODIFY
)) {
1395 json_object_put(new_json
, column
->name
,
1396 ovsdb_datum_to_json(&new->fields
[idx
],
1401 /* Create JSON object for transaction overall. */
1403 aux
->json
= json_object_create();
1406 /* Create JSON object for transaction on this table. */
1407 if (!aux
->table_json
) {
1408 aux
->table_json
= json_object_create();
1409 json_object_put(aux
->json
, aux
->mt
->table
->schema
->name
,
1413 /* Create JSON object for transaction on this row. */
1414 row_json
= json_object_create();
1416 json_object_put(row_json
, "old", old_json
);
1419 json_object_put(row_json
, "new", new_json
);
1422 /* Add JSON row to JSON table. */
1423 snprintf(uuid
, sizeof uuid
,
1424 UUID_FMT
, UUID_ARGS(ovsdb_row_get_uuid(new ? new : old
)));
1425 json_object_put(aux
->table_json
, uuid
, row_json
);
1431 ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux
*aux
,
1432 const struct ovsdb_jsonrpc_monitor
*m
,
1435 aux
->initial
= initial
;
1439 aux
->table_json
= NULL
;
1442 static struct ovsdb_error
*
1443 ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica
*replica
,
1444 const struct ovsdb_txn
*txn
,
1445 bool durable OVS_UNUSED
)
1447 struct ovsdb_jsonrpc_monitor
*m
= ovsdb_jsonrpc_monitor_cast(replica
);
1448 struct ovsdb_jsonrpc_monitor_aux aux
;
1450 ovsdb_jsonrpc_monitor_init_aux(&aux
, m
, false);
1451 ovsdb_txn_for_each_change(txn
, ovsdb_jsonrpc_monitor_change_cb
, &aux
);
1453 struct jsonrpc_msg
*msg
;
1454 struct json
*params
;
1456 params
= json_array_create_2(json_clone(aux
.monitor
->monitor_id
),
1458 msg
= jsonrpc_create_notify("update", params
);
1459 jsonrpc_session_send(aux
.monitor
->session
->js
, msg
);
1465 static struct json
*
1466 ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor
*m
)
1468 struct ovsdb_jsonrpc_monitor_aux aux
;
1469 struct shash_node
*node
;
1471 ovsdb_jsonrpc_monitor_init_aux(&aux
, m
, true);
1472 SHASH_FOR_EACH (node
, &m
->tables
) {
1473 struct ovsdb_jsonrpc_monitor_table
*mt
= node
->data
;
1475 if (mt
->select
& OJMS_INITIAL
) {
1476 struct ovsdb_row
*row
;
1478 HMAP_FOR_EACH (row
, hmap_node
, &mt
->table
->rows
) {
1479 ovsdb_jsonrpc_monitor_change_cb(NULL
, row
, NULL
, &aux
);
1483 return aux
.json
? aux
.json
: json_object_create();
1487 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica
*replica
)
1489 struct ovsdb_jsonrpc_monitor
*m
= ovsdb_jsonrpc_monitor_cast(replica
);
1490 struct shash_node
*node
;
1492 json_destroy(m
->monitor_id
);
1493 SHASH_FOR_EACH (node
, &m
->tables
) {
1494 struct ovsdb_jsonrpc_monitor_table
*mt
= node
->data
;
1498 shash_destroy(&m
->tables
);
1499 hmap_remove(&m
->session
->monitors
, &m
->node
);
1503 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class
= {
1504 ovsdb_jsonrpc_monitor_commit
,
1505 ovsdb_jsonrpc_monitor_destroy