]> git.proxmox.com Git - mirror_ovs.git/blob - ovsdb/jsonrpc-server.c
trigger: Free leaked ovsdb_schema
[mirror_ovs.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <errno.h>
21
22 #include "bitmap.h"
23 #include "column.h"
24 #include "openvswitch/dynamic-string.h"
25 #include "monitor.h"
26 #include "openvswitch/json.h"
27 #include "jsonrpc.h"
28 #include "ovsdb-error.h"
29 #include "ovsdb-parser.h"
30 #include "ovsdb.h"
31 #include "condition.h"
32 #include "openvswitch/poll-loop.h"
33 #include "reconnect.h"
34 #include "row.h"
35 #include "server.h"
36 #include "simap.h"
37 #include "storage.h"
38 #include "stream.h"
39 #include "table.h"
40 #include "timeval.h"
41 #include "transaction.h"
42 #include "trigger.h"
43 #include "util.h"
44 #include "openvswitch/vlog.h"
45
46 VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server);
47
48 struct ovsdb_jsonrpc_remote;
49 struct ovsdb_jsonrpc_session;
50
51 /* Set false to defeature monitor_cond, causing jsonrpc to respond to
52 * monitor_cond method with an error. */
53 static bool monitor_cond_enable__ = true;
54
55 /* Message rate-limiting. */
56 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
57
58 /* Sessions. */
59 static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
60 struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *, bool);
61 static void ovsdb_jsonrpc_session_preremove_db(struct ovsdb_jsonrpc_remote *,
62 struct ovsdb *);
63 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
64 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
65 static void ovsdb_jsonrpc_session_get_memory_usage_all(
66 const struct ovsdb_jsonrpc_remote *, struct simap *usage);
67 static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *);
68 static void ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *,
69 bool force,
70 const char *comment);
71 static void ovsdb_jsonrpc_session_set_all_options(
72 struct ovsdb_jsonrpc_remote *, const struct ovsdb_jsonrpc_options *);
73 static bool ovsdb_jsonrpc_active_session_get_status(
74 const struct ovsdb_jsonrpc_remote *,
75 struct ovsdb_jsonrpc_remote_status *);
76 static void ovsdb_jsonrpc_session_get_status(
77 const struct ovsdb_jsonrpc_session *,
78 struct ovsdb_jsonrpc_remote_status *);
79 static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *);
80 static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *);
81 static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *,
82 struct jsonrpc_msg *);
83
84 /* Triggers. */
85 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
86 struct ovsdb *,
87 struct jsonrpc_msg *request);
88 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
89 struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
90 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
91 static void ovsdb_jsonrpc_trigger_preremove_db(struct ovsdb_jsonrpc_session *,
92 struct ovsdb *);
93 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
94 static void ovsdb_jsonrpc_trigger_complete_done(
95 struct ovsdb_jsonrpc_session *);
96
97 /* Monitors. */
98 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_create(
99 struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params,
100 enum ovsdb_monitor_version, const struct json *request_id);
101 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cond_change(
102 struct ovsdb_jsonrpc_session *s,
103 struct json *params,
104 const struct json *request_id);
105 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
106 struct ovsdb_jsonrpc_session *,
107 struct json_array *params,
108 const struct json *request_id);
109 static void ovsdb_jsonrpc_monitor_preremove_db(struct ovsdb_jsonrpc_session *,
110 struct ovsdb *);
111 static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
112 static void ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *);
113 static bool ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *);
114 static struct json *ovsdb_jsonrpc_monitor_compose_update(
115 struct ovsdb_jsonrpc_monitor *monitor, bool initial);
116 static struct jsonrpc_msg * ovsdb_jsonrpc_create_notify(
117 const struct ovsdb_jsonrpc_monitor *m,
118 struct json *params);
119
120 \f
121 /* JSON-RPC database server. */
122
123 struct ovsdb_jsonrpc_server {
124 struct ovsdb_server up;
125 unsigned int n_sessions;
126 bool read_only; /* This server is does not accept any
127 transactions that can modify the database. */
128 struct shash remotes; /* Contains "struct ovsdb_jsonrpc_remote *"s. */
129 };
130
131 /* A configured remote. This is either a passive stream listener plus a list
132 * of the currently connected sessions, or a list of exactly one active
133 * session. */
134 struct ovsdb_jsonrpc_remote {
135 struct ovsdb_jsonrpc_server *server;
136 struct pstream *listener; /* Listener, if passive. */
137 struct ovs_list sessions; /* List of "struct ovsdb_jsonrpc_session"s. */
138 uint8_t dscp;
139 bool read_only;
140 char *role;
141 };
142
143 static struct ovsdb_jsonrpc_remote *ovsdb_jsonrpc_server_add_remote(
144 struct ovsdb_jsonrpc_server *, const char *name,
145 const struct ovsdb_jsonrpc_options *options
146 );
147 static void ovsdb_jsonrpc_server_del_remote(struct shash_node *);
148
149 /* Creates and returns a new server to provide JSON-RPC access to an OVSDB.
150 *
151 * The caller must call ovsdb_jsonrpc_server_add_db() for each database to
152 * which 'server' should provide access. */
153 struct ovsdb_jsonrpc_server *
154 ovsdb_jsonrpc_server_create(bool read_only)
155 {
156 struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
157 ovsdb_server_init(&server->up);
158 shash_init(&server->remotes);
159 server->read_only = read_only;
160 return server;
161 }
162
163 /* Adds 'db' to the set of databases served out by 'svr'. Returns true if
164 * successful, false if 'db''s name is the same as some database already in
165 * 'server'. */
166 bool
167 ovsdb_jsonrpc_server_add_db(struct ovsdb_jsonrpc_server *svr, struct ovsdb *db)
168 {
169 ovsdb_jsonrpc_server_reconnect(
170 svr, false, xasprintf("adding %s database", db->name));
171 return ovsdb_server_add_db(&svr->up, db);
172 }
173
174 /* Removes 'db' from the set of databases served out by 'svr'.
175 *
176 * 'comment' should be a human-readable reason for removing the database, for
177 * use in log messages, or NULL to suppress logging. This function frees
178 * it. */
179 void
180 ovsdb_jsonrpc_server_remove_db(struct ovsdb_jsonrpc_server *svr,
181 struct ovsdb *db, char *comment)
182 {
183 struct shash_node *node;
184 SHASH_FOR_EACH (node, &svr->remotes) {
185 struct ovsdb_jsonrpc_remote *remote = node->data;
186
187 ovsdb_jsonrpc_session_preremove_db(remote, db);
188 }
189
190 ovsdb_jsonrpc_server_reconnect(svr, false, comment);
191
192 ovsdb_server_remove_db(&svr->up, db);
193 }
194
195 void
196 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr)
197 {
198 struct shash_node *node, *next;
199
200 SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
201 ovsdb_jsonrpc_server_del_remote(node);
202 }
203 shash_destroy(&svr->remotes);
204 ovsdb_server_destroy(&svr->up);
205 free(svr);
206 }
207
208 struct ovsdb_jsonrpc_options *
209 ovsdb_jsonrpc_default_options(const char *target)
210 {
211 struct ovsdb_jsonrpc_options *options = xzalloc(sizeof *options);
212 options->max_backoff = RECONNECT_DEFAULT_MAX_BACKOFF;
213 options->probe_interval = (stream_or_pstream_needs_probes(target)
214 ? RECONNECT_DEFAULT_PROBE_INTERVAL
215 : 0);
216 return options;
217 }
218
219 /* Sets 'svr''s current set of remotes to the names in 'new_remotes', with
220 * options in the struct ovsdb_jsonrpc_options supplied as the data values.
221 *
222 * A remote is an active or passive stream connection method, e.g. "pssl:" or
223 * "tcp:1.2.3.4". */
224 void
225 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
226 const struct shash *new_remotes)
227 {
228 struct shash_node *node, *next;
229
230 SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
231 struct ovsdb_jsonrpc_remote *remote = node->data;
232 struct ovsdb_jsonrpc_options *options
233 = shash_find_data(new_remotes, node->name);
234
235 if (!options) {
236 VLOG_INFO("%s: remote deconfigured", node->name);
237 ovsdb_jsonrpc_server_del_remote(node);
238 } else if (options->dscp != remote->dscp) {
239 ovsdb_jsonrpc_server_del_remote(node);
240 }
241 }
242 SHASH_FOR_EACH (node, new_remotes) {
243 const struct ovsdb_jsonrpc_options *options = node->data;
244 struct ovsdb_jsonrpc_remote *remote;
245
246 remote = shash_find_data(&svr->remotes, node->name);
247 if (!remote) {
248 remote = ovsdb_jsonrpc_server_add_remote(svr, node->name, options);
249 if (!remote) {
250 continue;
251 }
252 }
253
254 ovsdb_jsonrpc_session_set_all_options(remote, options);
255 }
256 }
257
258 static struct ovsdb_jsonrpc_remote *
259 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
260 const char *name,
261 const struct ovsdb_jsonrpc_options *options)
262 {
263 struct ovsdb_jsonrpc_remote *remote;
264 struct pstream *listener;
265 int error;
266
267 error = jsonrpc_pstream_open(name, &listener, options->dscp);
268 if (error && error != EAFNOSUPPORT) {
269 VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, ovs_strerror(error));
270 return NULL;
271 }
272
273 remote = xmalloc(sizeof *remote);
274 remote->server = svr;
275 remote->listener = listener;
276 ovs_list_init(&remote->sessions);
277 remote->dscp = options->dscp;
278 remote->read_only = options->read_only;
279 remote->role = nullable_xstrdup(options->role);
280 shash_add(&svr->remotes, name, remote);
281
282 if (!listener) {
283 ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name, true),
284 svr->read_only || remote->read_only);
285 }
286 return remote;
287 }
288
289 static void
290 ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
291 {
292 struct ovsdb_jsonrpc_remote *remote = node->data;
293
294 ovsdb_jsonrpc_session_close_all(remote);
295 pstream_close(remote->listener);
296 shash_delete(&remote->server->remotes, node);
297 free(remote->role);
298 free(remote);
299 }
300
301 /* Stores status information for the remote named 'target', which should have
302 * been configured on 'svr' with a call to ovsdb_jsonrpc_server_set_remotes(),
303 * into '*status'. On success returns true, on failure (if 'svr' doesn't have
304 * a remote named 'target' or if that remote is an outbound remote that has no
305 * active connections) returns false. On failure, 'status' will be zeroed.
306 */
307 bool
308 ovsdb_jsonrpc_server_get_remote_status(
309 const struct ovsdb_jsonrpc_server *svr, const char *target,
310 struct ovsdb_jsonrpc_remote_status *status)
311 {
312 const struct ovsdb_jsonrpc_remote *remote;
313
314 memset(status, 0, sizeof *status);
315
316 remote = shash_find_data(&svr->remotes, target);
317
318 if (!remote) {
319 return false;
320 }
321
322 if (remote->listener) {
323 status->bound_port = pstream_get_bound_port(remote->listener);
324 status->is_connected = !ovs_list_is_empty(&remote->sessions);
325 status->n_connections = ovs_list_size(&remote->sessions);
326 return true;
327 }
328
329 return ovsdb_jsonrpc_active_session_get_status(remote, status);
330 }
331
332 void
333 ovsdb_jsonrpc_server_free_remote_status(
334 struct ovsdb_jsonrpc_remote_status *status)
335 {
336 free(status->locks_held);
337 free(status->locks_waiting);
338 free(status->locks_lost);
339 }
340
341 /* Makes all of the JSON-RPC sessions managed by 'svr' disconnect. (They
342 * will then generally reconnect.). Uses 'comment' as a human-readable comment
343 * for logging (it may be NULL to suppress logging). Frees 'comment'.
344 *
345 * If 'force' is true, disconnects all sessions. Otherwise, disconnects only
346 * sesions that aren't database change aware. */
347 void
348 ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *svr, bool force,
349 char *comment)
350 {
351 struct shash_node *node;
352
353 SHASH_FOR_EACH (node, &svr->remotes) {
354 struct ovsdb_jsonrpc_remote *remote = node->data;
355
356 ovsdb_jsonrpc_session_reconnect_all(remote, force, comment);
357 }
358
359 free(comment);
360 }
361
362 void
363 ovsdb_jsonrpc_server_set_read_only(struct ovsdb_jsonrpc_server *svr,
364 bool read_only)
365 {
366 if (svr->read_only != read_only) {
367 svr->read_only = read_only;
368 ovsdb_jsonrpc_server_reconnect(svr, true,
369 xstrdup(read_only
370 ? "making server read-only"
371 : "making server read/write"));
372 }
373 }
374
375 void
376 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
377 {
378 struct shash_node *node;
379
380 SHASH_FOR_EACH (node, &svr->remotes) {
381 struct ovsdb_jsonrpc_remote *remote = node->data;
382
383 if (remote->listener) {
384 struct stream *stream;
385 int error;
386
387 error = pstream_accept(remote->listener, &stream);
388 if (!error) {
389 struct jsonrpc_session *js;
390 js = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
391 remote->dscp);
392 ovsdb_jsonrpc_session_create(remote, js, svr->read_only ||
393 remote->read_only);
394 } else if (error != EAGAIN) {
395 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
396 pstream_get_name(remote->listener),
397 ovs_strerror(error));
398 }
399 }
400
401 ovsdb_jsonrpc_session_run_all(remote);
402 }
403 }
404
405 void
406 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
407 {
408 struct shash_node *node;
409
410 SHASH_FOR_EACH (node, &svr->remotes) {
411 struct ovsdb_jsonrpc_remote *remote = node->data;
412
413 if (remote->listener) {
414 pstream_wait(remote->listener);
415 }
416
417 ovsdb_jsonrpc_session_wait_all(remote);
418 }
419 }
420
421 /* Adds some memory usage statistics for 'svr' into 'usage', for use with
422 * memory_report(). */
423 void
424 ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server *svr,
425 struct simap *usage)
426 {
427 struct shash_node *node;
428
429 simap_increase(usage, "sessions", svr->n_sessions);
430 SHASH_FOR_EACH (node, &svr->remotes) {
431 struct ovsdb_jsonrpc_remote *remote = node->data;
432
433 ovsdb_jsonrpc_session_get_memory_usage_all(remote, usage);
434 }
435 }
436 \f
437 /* JSON-RPC database server session. */
438
439 struct ovsdb_jsonrpc_session {
440 struct ovs_list node; /* Element in remote's sessions list. */
441 struct ovsdb_session up;
442 struct ovsdb_jsonrpc_remote *remote;
443
444 /* RFC 7047 does not contemplate how to alert clients to changes to the set
445 * of databases, e.g. databases that are added or removed while the
446 * database server is running. Traditionally, ovsdb-server disconnects all
447 * of its clients when this happens; a well-written client will reassess
448 * what is available from the server upon reconnection.
449 *
450 * OVS 2.9 introduces a way for clients to monitor changes to the databases
451 * being served, through the Database table in the _Server database that
452 * OVSDB adds in this version. ovsdb-server suppresses the connection
453 * close for clients that identify themselves as taking advantage of this
454 * mechanism. When this member is true, it indicates that the client
455 * requested such suppression. */
456 bool db_change_aware;
457
458 /* Triggers. */
459 struct hmap triggers; /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
460
461 /* Monitors. */
462 struct hmap monitors; /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
463
464 /* Network connectivity. */
465 struct jsonrpc_session *js; /* JSON-RPC session. */
466 unsigned int js_seqno; /* Last jsonrpc_session_get_seqno() value. */
467
468 /* Read only. */
469 bool read_only; /* When true, not allow to modify the
470 database. */
471 };
472
473 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
474 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
475 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
476 static void ovsdb_jsonrpc_session_get_memory_usage(
477 const struct ovsdb_jsonrpc_session *, struct simap *usage);
478 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
479 struct jsonrpc_msg *);
480 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
481 struct jsonrpc_msg *);
482
483 static struct ovsdb_jsonrpc_session *
484 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
485 struct jsonrpc_session *js, bool read_only)
486 {
487 struct ovsdb_jsonrpc_session *s;
488
489 s = xzalloc(sizeof *s);
490 ovsdb_session_init(&s->up, &remote->server->up);
491 s->remote = remote;
492 ovs_list_push_back(&remote->sessions, &s->node);
493 hmap_init(&s->triggers);
494 hmap_init(&s->monitors);
495 s->js = js;
496 s->js_seqno = jsonrpc_session_get_seqno(js);
497 s->read_only = read_only;
498
499 remote->server->n_sessions++;
500
501 return s;
502 }
503
504 /* Database 'db' is about to be removed from the database server. To prepare,
505 * this function removes all references to 'db' from 'remote'. */
506 static void
507 ovsdb_jsonrpc_session_preremove_db(struct ovsdb_jsonrpc_remote *remote,
508 struct ovsdb *db)
509 {
510 struct ovsdb_jsonrpc_session *s;
511
512 LIST_FOR_EACH (s, node, &remote->sessions) {
513 ovsdb_jsonrpc_monitor_preremove_db(s, db);
514 ovsdb_jsonrpc_trigger_preremove_db(s, db);
515 }
516 }
517
518 static void
519 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
520 {
521 ovsdb_jsonrpc_monitor_remove_all(s);
522 ovsdb_jsonrpc_session_unlock_all(s);
523 ovsdb_jsonrpc_trigger_complete_all(s);
524
525 hmap_destroy(&s->monitors);
526 hmap_destroy(&s->triggers);
527
528 jsonrpc_session_close(s->js);
529 ovs_list_remove(&s->node);
530 s->remote->server->n_sessions--;
531 ovsdb_session_destroy(&s->up);
532 free(s);
533 }
534
535 static int
536 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
537 {
538 jsonrpc_session_run(s->js);
539 if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
540 s->js_seqno = jsonrpc_session_get_seqno(s->js);
541 ovsdb_jsonrpc_trigger_complete_all(s);
542 ovsdb_jsonrpc_monitor_remove_all(s);
543 ovsdb_jsonrpc_session_unlock_all(s);
544 }
545
546 ovsdb_jsonrpc_trigger_complete_done(s);
547
548 if (!jsonrpc_session_get_backlog(s->js)) {
549 struct jsonrpc_msg *msg;
550
551 ovsdb_jsonrpc_monitor_flush_all(s);
552
553 msg = jsonrpc_session_recv(s->js);
554 if (msg) {
555 if (msg->type == JSONRPC_REQUEST) {
556 ovsdb_jsonrpc_session_got_request(s, msg);
557 } else if (msg->type == JSONRPC_NOTIFY) {
558 ovsdb_jsonrpc_session_got_notify(s, msg);
559 } else {
560 VLOG_WARN("%s: received unexpected %s message",
561 jsonrpc_session_get_name(s->js),
562 jsonrpc_msg_type_to_string(msg->type));
563 jsonrpc_session_force_reconnect(s->js);
564 jsonrpc_msg_destroy(msg);
565 }
566 }
567 }
568 return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
569 }
570
571 static void
572 ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session,
573 const struct ovsdb_jsonrpc_options *options)
574 {
575 jsonrpc_session_set_max_backoff(session->js, options->max_backoff);
576 jsonrpc_session_set_probe_interval(session->js, options->probe_interval);
577 jsonrpc_session_set_dscp(session->js, options->dscp);
578 }
579
580 static void
581 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
582 {
583 struct ovsdb_jsonrpc_session *s, *next;
584
585 LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
586 int error = ovsdb_jsonrpc_session_run(s);
587 if (error) {
588 ovsdb_jsonrpc_session_close(s);
589 }
590 }
591 }
592
593 static void
594 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
595 {
596 jsonrpc_session_wait(s->js);
597 if (!jsonrpc_session_get_backlog(s->js)) {
598 if (ovsdb_jsonrpc_monitor_needs_flush(s)) {
599 poll_immediate_wake();
600 } else {
601 jsonrpc_session_recv_wait(s->js);
602 }
603 }
604 }
605
606 static void
607 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote)
608 {
609 struct ovsdb_jsonrpc_session *s;
610
611 LIST_FOR_EACH (s, node, &remote->sessions) {
612 ovsdb_jsonrpc_session_wait(s);
613 }
614 }
615
616 static void
617 ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session *s,
618 struct simap *usage)
619 {
620 simap_increase(usage, "triggers", hmap_count(&s->triggers));
621 simap_increase(usage, "backlog", jsonrpc_session_get_backlog(s->js));
622 }
623
624 static void
625 ovsdb_jsonrpc_session_get_memory_usage_all(
626 const struct ovsdb_jsonrpc_remote *remote,
627 struct simap *usage)
628 {
629 struct ovsdb_jsonrpc_session *s;
630
631 LIST_FOR_EACH (s, node, &remote->sessions) {
632 ovsdb_jsonrpc_session_get_memory_usage(s, usage);
633 }
634 }
635
636 static void
637 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
638 {
639 struct ovsdb_jsonrpc_session *s, *next;
640
641 LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
642 ovsdb_jsonrpc_session_close(s);
643 }
644 }
645
646 /* Makes all of the JSON-RPC sessions managed by 'remote' disconnect. (They
647 * will then generally reconnect.). 'comment' should be a human-readable
648 * explanation of the reason for disconnection, for use in log messages, or
649 * NULL to suppress logging.
650 *
651 * If 'force' is true, disconnects all sessions. Otherwise, disconnects only
652 * sesions that aren't database change aware. */
653 static void
654 ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote,
655 bool force, const char *comment)
656 {
657 struct ovsdb_jsonrpc_session *s, *next;
658
659 LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
660 if (force || !s->db_change_aware) {
661 jsonrpc_session_force_reconnect(s->js);
662 if (comment && jsonrpc_session_is_connected(s->js)) {
663 VLOG_INFO("%s: disconnecting (%s)",
664 jsonrpc_session_get_name(s->js), comment);
665 }
666 if (!jsonrpc_session_is_alive(s->js)) {
667 ovsdb_jsonrpc_session_close(s);
668 }
669 }
670 }
671 }
672
673 /* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
674 * 'options'.
675 *
676 * (The dscp value can't be changed directly; the caller must instead close and
677 * re-open the session.) */
678 static void
679 ovsdb_jsonrpc_session_set_all_options(
680 struct ovsdb_jsonrpc_remote *remote,
681 const struct ovsdb_jsonrpc_options *options)
682 {
683 struct ovsdb_jsonrpc_session *s;
684
685 LIST_FOR_EACH (s, node, &remote->sessions) {
686 ovsdb_jsonrpc_session_set_options(s, options);
687 }
688 }
689
690 /* Sets the 'status' of for the 'remote' with an outgoing connection. */
691 static bool
692 ovsdb_jsonrpc_active_session_get_status(
693 const struct ovsdb_jsonrpc_remote *remote,
694 struct ovsdb_jsonrpc_remote_status *status)
695 {
696 const struct ovs_list *sessions = &remote->sessions;
697 const struct ovsdb_jsonrpc_session *s;
698
699 if (ovs_list_is_empty(sessions)) {
700 return false;
701 }
702
703 ovs_assert(ovs_list_is_singleton(sessions));
704 s = CONTAINER_OF(ovs_list_front(sessions), struct ovsdb_jsonrpc_session, node);
705 ovsdb_jsonrpc_session_get_status(s, status);
706 status->n_connections = 1;
707
708 return true;
709 }
710
711 static void
712 ovsdb_jsonrpc_session_get_status(const struct ovsdb_jsonrpc_session *session,
713 struct ovsdb_jsonrpc_remote_status *status)
714 {
715 const struct ovsdb_jsonrpc_session *s = session;
716 const struct jsonrpc_session *js;
717 struct ovsdb_lock_waiter *waiter;
718 struct reconnect_stats rstats;
719 struct ds locks_held, locks_waiting, locks_lost;
720
721 js = s->js;
722
723 status->is_connected = jsonrpc_session_is_connected(js);
724 status->last_error = jsonrpc_session_get_status(js);
725
726 jsonrpc_session_get_reconnect_stats(js, &rstats);
727 status->state = rstats.state;
728 status->sec_since_connect = rstats.msec_since_connect == UINT_MAX
729 ? UINT_MAX : rstats.msec_since_connect / 1000;
730 status->sec_since_disconnect = rstats.msec_since_disconnect == UINT_MAX
731 ? UINT_MAX : rstats.msec_since_disconnect / 1000;
732
733 ds_init(&locks_held);
734 ds_init(&locks_waiting);
735 ds_init(&locks_lost);
736 HMAP_FOR_EACH (waiter, session_node, &s->up.waiters) {
737 struct ds *string;
738
739 string = (ovsdb_lock_waiter_is_owner(waiter) ? &locks_held
740 : waiter->mode == OVSDB_LOCK_WAIT ? &locks_waiting
741 : &locks_lost);
742 if (string->length) {
743 ds_put_char(string, ' ');
744 }
745 ds_put_cstr(string, waiter->lock_name);
746 }
747 status->locks_held = ds_steal_cstr(&locks_held);
748 status->locks_waiting = ds_steal_cstr(&locks_waiting);
749 status->locks_lost = ds_steal_cstr(&locks_lost);
750 }
751
752 /* Examines 'request' to determine the database to which it relates, and then
753 * searches 's' to find that database:
754 *
755 * - If successful, returns the database and sets '*replyp' to NULL.
756 *
757 * - If no such database exists, returns NULL and sets '*replyp' to an
758 * appropriate JSON-RPC error reply, owned by the caller. */
759 static struct ovsdb *
760 ovsdb_jsonrpc_lookup_db(const struct ovsdb_jsonrpc_session *s,
761 const struct jsonrpc_msg *request,
762 struct jsonrpc_msg **replyp)
763 {
764 struct json_array *params;
765 struct ovsdb_error *error;
766 const char *db_name;
767 struct ovsdb *db;
768
769 params = json_array(request->params);
770 if (!params->n || params->elems[0]->type != JSON_STRING) {
771 error = ovsdb_syntax_error(
772 request->params, NULL,
773 "%s request params must begin with <db-name>", request->method);
774 goto error;
775 }
776
777 db_name = params->elems[0]->string;
778 db = shash_find_data(&s->up.server->dbs, db_name);
779 if (!db) {
780 error = ovsdb_syntax_error(
781 request->params, "unknown database",
782 "%s request specifies unknown database %s",
783 request->method, db_name);
784 goto error;
785 }
786
787 if (!db->schema) {
788 error = ovsdb_error("database not available",
789 "%s request specifies database %s which is not "
790 "yet available because it has not completed "
791 "joining its cluster",
792 request->method, db_name);
793 goto error;
794 }
795
796 *replyp = NULL;
797 return db;
798
799 error:
800 *replyp = jsonrpc_create_error(ovsdb_error_to_json_free(error),
801 request->id);
802 return NULL;
803 }
804
805 static struct ovsdb_error *
806 ovsdb_jsonrpc_session_parse_lock_name(const struct jsonrpc_msg *request,
807 const char **lock_namep)
808 {
809 const struct json_array *params;
810
811 params = json_array(request->params);
812 if (params->n != 1 || params->elems[0]->type != JSON_STRING ||
813 !ovsdb_parser_is_id(json_string(params->elems[0]))) {
814 *lock_namep = NULL;
815 return ovsdb_syntax_error(request->params, NULL,
816 "%s request params must be <id>",
817 request->method);
818 }
819
820 *lock_namep = json_string(params->elems[0]);
821 return NULL;
822 }
823
824 static void
825 ovsdb_jsonrpc_session_notify(struct ovsdb_session *session,
826 const char *lock_name,
827 const char *method)
828 {
829 struct ovsdb_jsonrpc_session *s;
830 struct json *params;
831
832 s = CONTAINER_OF(session, struct ovsdb_jsonrpc_session, up);
833 params = json_array_create_1(json_string_create(lock_name));
834 ovsdb_jsonrpc_session_send(s, jsonrpc_create_notify(method, params));
835 }
836
837 static struct jsonrpc_msg *
838 jsonrpc_create_readonly_lock_error(const struct json *id)
839 {
840 return jsonrpc_create_error(json_string_create(
841 "lock and unlock methods not allowed,"
842 " DB server is read only."), id);
843 }
844
845 static struct jsonrpc_msg *
846 ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session *s,
847 struct jsonrpc_msg *request,
848 enum ovsdb_lock_mode mode)
849 {
850 struct ovsdb_lock_waiter *waiter;
851 struct ovsdb_error *error;
852 struct ovsdb_session *victim;
853 const char *lock_name;
854 struct json *result;
855
856 if (s->read_only) {
857 return jsonrpc_create_readonly_lock_error(request->id);
858 }
859
860 error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
861 if (error) {
862 goto error;
863 }
864
865 /* Report error if this session has issued a "lock" or "steal" without a
866 * matching "unlock" for this lock. */
867 waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
868 if (waiter) {
869 error = ovsdb_syntax_error(
870 request->params, NULL,
871 "must issue \"unlock\" before new \"%s\"", request->method);
872 goto error;
873 }
874
875 /* Get the lock, add us as a waiter. */
876 waiter = ovsdb_server_lock(&s->remote->server->up, &s->up, lock_name, mode,
877 &victim);
878 if (victim) {
879 ovsdb_jsonrpc_session_notify(victim, lock_name, "stolen");
880 }
881
882 result = json_object_create();
883 json_object_put(result, "locked",
884 json_boolean_create(ovsdb_lock_waiter_is_owner(waiter)));
885
886 return jsonrpc_create_reply(result, request->id);
887
888 error:
889 return jsonrpc_create_error(ovsdb_error_to_json_free(error), request->id);
890 }
891
892 static void
893 ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *s)
894 {
895 struct ovsdb_lock_waiter *waiter, *next;
896
897 HMAP_FOR_EACH_SAFE (waiter, next, session_node, &s->up.waiters) {
898 ovsdb_jsonrpc_session_unlock__(waiter);
899 }
900 }
901
902 static void
903 ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *waiter)
904 {
905 struct ovsdb_lock *lock = waiter->lock;
906
907 if (lock) {
908 struct ovsdb_session *new_owner = ovsdb_lock_waiter_remove(waiter);
909 if (new_owner) {
910 ovsdb_jsonrpc_session_notify(new_owner, lock->name, "locked");
911 } else {
912 /* ovsdb_server_lock() might have freed 'lock'. */
913 }
914 }
915
916 ovsdb_lock_waiter_destroy(waiter);
917 }
918
919 static struct jsonrpc_msg *
920 syntax_error_reply(const struct jsonrpc_msg *request, const char *details)
921 {
922 struct ovsdb_error *error = ovsdb_syntax_error(
923 request->params, NULL, "%s: %s", request->method, details);
924 struct jsonrpc_msg *msg = jsonrpc_create_error(ovsdb_error_to_json(error),
925 request->id);
926 ovsdb_error_destroy(error);
927 return msg;
928 }
929
930 static struct jsonrpc_msg *
931 ovsdb_jsonrpc_session_unlock(struct ovsdb_jsonrpc_session *s,
932 struct jsonrpc_msg *request)
933 {
934 struct ovsdb_lock_waiter *waiter;
935 struct ovsdb_error *error;
936 const char *lock_name;
937
938 if (s->read_only) {
939 return jsonrpc_create_readonly_lock_error(request->id);
940 }
941
942 error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
943 if (error) {
944 return jsonrpc_create_error(ovsdb_error_to_json_free(error),
945 request->id);
946 }
947
948 /* Report error if this session has not issued a "lock" or "steal" for this
949 * lock. */
950 waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
951 if (!waiter) {
952 return syntax_error_reply(request,
953 "\"unlock\" without \"lock\" or \"steal\"");
954 }
955
956 ovsdb_jsonrpc_session_unlock__(waiter);
957
958 return jsonrpc_create_reply(json_object_create(), request->id);
959 }
960
961 static struct jsonrpc_msg *
962 ovsdb_jsonrpc_session_set_db_change_aware(struct ovsdb_jsonrpc_session *s,
963 const struct jsonrpc_msg *request)
964 {
965 const struct json_array *params = json_array(request->params);
966 if (params->n != 1
967 || (params->elems[0]->type != JSON_TRUE &&
968 params->elems[0]->type != JSON_FALSE)) {
969 return syntax_error_reply(request, "true or false parameter expected");
970 }
971
972 s->db_change_aware = json_boolean(params->elems[0]);
973 return jsonrpc_create_reply(json_object_create(), request->id);
974 }
975
976 static void
977 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
978 struct jsonrpc_msg *request)
979 {
980 struct jsonrpc_msg *reply;
981
982 if (!strcmp(request->method, "transact") ||
983 !strcmp(request->method, "convert")) {
984 struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
985 if (!reply) {
986 ovsdb_jsonrpc_trigger_create(s, db, request);
987 }
988 } else if (!strcmp(request->method, "monitor") ||
989 (monitor_cond_enable__ &&
990 (!strcmp(request->method, "monitor_cond") ||
991 !strcmp(request->method, "monitor_cond_since")))) {
992 struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
993 if (!reply) {
994 enum ovsdb_monitor_version version;
995 if (!strcmp(request->method, "monitor")) {
996 version = OVSDB_MONITOR_V1;
997 } else if (!strcmp(request->method, "monitor_cond")) {
998 version = OVSDB_MONITOR_V2;
999 } else {
1000 version = OVSDB_MONITOR_V3;
1001 }
1002 reply = ovsdb_jsonrpc_monitor_create(s, db, request->params,
1003 version, request->id);
1004 }
1005 } else if (!strcmp(request->method, "monitor_cond_change")) {
1006 reply = ovsdb_jsonrpc_monitor_cond_change(s, request->params,
1007 request->id);
1008 } else if (!strcmp(request->method, "monitor_cancel")) {
1009 reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
1010 request->id);
1011 } else if (!strcmp(request->method, "get_schema")) {
1012 struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
1013 if (!reply) {
1014 reply = jsonrpc_create_reply(ovsdb_schema_to_json(db->schema),
1015 request->id);
1016 }
1017 } else if (!strcmp(request->method, "list_dbs")) {
1018 size_t n_dbs = shash_count(&s->up.server->dbs);
1019 struct shash_node *node;
1020 struct json **dbs;
1021 size_t i;
1022
1023 dbs = xmalloc(n_dbs * sizeof *dbs);
1024 i = 0;
1025 SHASH_FOR_EACH (node, &s->up.server->dbs) {
1026 dbs[i++] = json_string_create(node->name);
1027 }
1028 reply = jsonrpc_create_reply(json_array_create(dbs, n_dbs),
1029 request->id);
1030 } else if (!strcmp(request->method, "get_server_id")) {
1031 const struct uuid *uuid = &s->up.server->uuid;
1032 struct json *result;
1033
1034 result = json_string_create_nocopy(xasprintf(UUID_FMT,
1035 UUID_ARGS(uuid)));
1036 reply = jsonrpc_create_reply(result, request->id);
1037 } else if (!strcmp(request->method, "lock")) {
1038 reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_WAIT);
1039 } else if (!strcmp(request->method, "steal")) {
1040 reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_STEAL);
1041 } else if (!strcmp(request->method, "unlock")) {
1042 reply = ovsdb_jsonrpc_session_unlock(s, request);
1043 } else if (!strcmp(request->method, "set_db_change_aware")) {
1044 reply = ovsdb_jsonrpc_session_set_db_change_aware(s, request);
1045 } else if (!strcmp(request->method, "echo")) {
1046 reply = jsonrpc_create_reply(json_clone(request->params), request->id);
1047 } else {
1048 reply = jsonrpc_create_error(json_string_create("unknown method"),
1049 request->id);
1050 }
1051
1052 if (reply) {
1053 jsonrpc_msg_destroy(request);
1054 ovsdb_jsonrpc_session_send(s, reply);
1055 }
1056 }
1057
1058 static void
1059 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
1060 {
1061 if (json_array(request->params)->n == 1) {
1062 struct ovsdb_jsonrpc_trigger *t;
1063 struct json *id;
1064
1065 id = request->params->array.elems[0];
1066 t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
1067 if (t) {
1068 ovsdb_jsonrpc_trigger_complete(t);
1069 }
1070 }
1071 }
1072
1073 static void
1074 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
1075 struct jsonrpc_msg *request)
1076 {
1077 if (!strcmp(request->method, "cancel")) {
1078 execute_cancel(s, request);
1079 }
1080 jsonrpc_msg_destroy(request);
1081 }
1082
1083 static void
1084 ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *s,
1085 struct jsonrpc_msg *msg)
1086 {
1087 ovsdb_jsonrpc_monitor_flush_all(s);
1088 jsonrpc_session_send(s->js, msg);
1089 }
1090 \f
1091 /* JSON-RPC database server triggers.
1092 *
1093 * (Every transaction is treated as a trigger even if it doesn't actually have
1094 * any "wait" operations.) */
1095
1096 struct ovsdb_jsonrpc_trigger {
1097 struct ovsdb_trigger trigger;
1098 struct hmap_node hmap_node; /* In session's "triggers" hmap. */
1099 struct json *id;
1100 };
1101
1102 static void
1103 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
1104 struct jsonrpc_msg *request)
1105 {
1106 /* Check for duplicate ID. */
1107 size_t hash = json_hash(request->id, 0);
1108 struct ovsdb_jsonrpc_trigger *t
1109 = ovsdb_jsonrpc_trigger_find(s, request->id, hash);
1110 if (t) {
1111 ovsdb_jsonrpc_session_send(
1112 s, syntax_error_reply(request, "duplicate request ID"));
1113 jsonrpc_msg_destroy(request);
1114 return;
1115 }
1116
1117 /* Insert into trigger table. */
1118 t = xmalloc(sizeof *t);
1119 bool disconnect_all = ovsdb_trigger_init(
1120 &s->up, db, &t->trigger, request, time_msec(), s->read_only,
1121 s->remote->role, jsonrpc_session_get_id(s->js));
1122 t->id = json_clone(request->id);
1123 hmap_insert(&s->triggers, &t->hmap_node, hash);
1124
1125 /* Complete early if possible. */
1126 if (ovsdb_trigger_is_complete(&t->trigger)) {
1127 ovsdb_jsonrpc_trigger_complete(t);
1128 }
1129
1130 if (disconnect_all) {
1131 /* The message below is currently the only reason to disconnect all
1132 * clients. */
1133 ovsdb_jsonrpc_server_reconnect(s->remote->server, false,
1134 xasprintf("committed %s database "
1135 "schema conversion",
1136 db->name));
1137 }
1138 }
1139
1140 static struct ovsdb_jsonrpc_trigger *
1141 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
1142 const struct json *id, size_t hash)
1143 {
1144 struct ovsdb_jsonrpc_trigger *t;
1145
1146 HMAP_FOR_EACH_WITH_HASH (t, hmap_node, hash, &s->triggers) {
1147 if (json_equal(t->id, id)) {
1148 return t;
1149 }
1150 }
1151
1152 return NULL;
1153 }
1154
1155 static void
1156 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
1157 {
1158 struct ovsdb_jsonrpc_session *s;
1159
1160 s = CONTAINER_OF(t->trigger.session, struct ovsdb_jsonrpc_session, up);
1161
1162 if (jsonrpc_session_is_connected(s->js)) {
1163 bool complete = ovsdb_trigger_is_complete(&t->trigger);
1164 if (s->db_change_aware && !complete) {
1165 ovsdb_trigger_cancel(&t->trigger, "closing JSON-RPC session");
1166 complete = true;
1167 }
1168 if (complete) {
1169 struct jsonrpc_msg *reply = ovsdb_trigger_steal_reply(&t->trigger);
1170 ovsdb_jsonrpc_session_send(s, reply);
1171 }
1172 }
1173
1174 json_destroy(t->id);
1175 ovsdb_trigger_destroy(&t->trigger);
1176 hmap_remove(&s->triggers, &t->hmap_node);
1177 free(t);
1178 }
1179
1180 static void
1181 ovsdb_jsonrpc_trigger_remove__(struct ovsdb_jsonrpc_session *s,
1182 struct ovsdb *db)
1183 {
1184 struct ovsdb_jsonrpc_trigger *t, *next;
1185 HMAP_FOR_EACH_SAFE (t, next, hmap_node, &s->triggers) {
1186 if (!db || t->trigger.db == db) {
1187 ovsdb_jsonrpc_trigger_complete(t);
1188 }
1189 }
1190 }
1191
1192 /* Database 'db' is about to be removed from the database server. To prepare,
1193 * this function removes all references from triggers in 's' to 'db'. */
1194 static void
1195 ovsdb_jsonrpc_trigger_preremove_db(struct ovsdb_jsonrpc_session *s,
1196 struct ovsdb *db)
1197 {
1198 ovs_assert(db);
1199 ovsdb_jsonrpc_trigger_remove__(s, db);
1200 }
1201
1202 /* Removes all triggers from 's'. */
1203 static void
1204 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
1205 {
1206 ovsdb_jsonrpc_trigger_remove__(s, NULL);
1207 }
1208
1209 static void
1210 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
1211 {
1212 struct ovsdb_jsonrpc_trigger *trigger, *next;
1213 LIST_FOR_EACH_SAFE (trigger, next, trigger.node, &s->up.completions) {
1214 ovsdb_jsonrpc_trigger_complete(trigger);
1215 }
1216 }
1217 \f
1218 /* Jsonrpc front end monitor. */
1219 struct ovsdb_jsonrpc_monitor {
1220 struct hmap_node node; /* In ovsdb_jsonrpc_session's "monitors". */
1221 struct ovsdb_jsonrpc_session *session;
1222 struct ovsdb *db;
1223 struct json *monitor_id;
1224 struct ovsdb_monitor *dbmon;
1225 struct ovsdb_monitor_change_set *change_set;
1226 enum ovsdb_monitor_version version;
1227 struct ovsdb_monitor_session_condition *condition;/* Session's condition */
1228 };
1229
1230 static struct ovsdb_jsonrpc_monitor *
1231 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
1232 const struct json *monitor_id)
1233 {
1234 struct ovsdb_jsonrpc_monitor *m;
1235
1236 HMAP_FOR_EACH_WITH_HASH (m, node, json_hash(monitor_id, 0), &s->monitors) {
1237 if (json_equal(m->monitor_id, monitor_id)) {
1238 return m;
1239 }
1240 }
1241
1242 return NULL;
1243 }
1244
1245 static bool
1246 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
1247 {
1248 const struct json *json;
1249
1250 json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
1251 return json ? json_boolean(json) : default_value;
1252 }
1253
1254 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
1255 ovsdb_jsonrpc_parse_monitor_request(
1256 struct ovsdb_monitor *dbmon,
1257 const struct ovsdb_table *table,
1258 struct ovsdb_monitor_session_condition *cond,
1259 const struct json *monitor_request)
1260 {
1261 const struct ovsdb_table_schema *ts = table->schema;
1262 enum ovsdb_monitor_selection select;
1263 const struct json *columns, *select_json, *where = NULL;
1264 struct ovsdb_parser parser;
1265 struct ovsdb_error *error;
1266
1267 ovsdb_parser_init(&parser, monitor_request, "table %s", ts->name);
1268 if (cond) {
1269 where = ovsdb_parser_member(&parser, "where", OP_ARRAY | OP_OPTIONAL);
1270 }
1271 columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
1272
1273 select_json = ovsdb_parser_member(&parser, "select",
1274 OP_OBJECT | OP_OPTIONAL);
1275
1276 error = ovsdb_parser_finish(&parser);
1277 if (error) {
1278 return error;
1279 }
1280
1281 if (select_json) {
1282 select = 0;
1283 ovsdb_parser_init(&parser, select_json, "table %s select", ts->name);
1284 if (parse_bool(&parser, "initial", true)) {
1285 select |= OJMS_INITIAL;
1286 }
1287 if (parse_bool(&parser, "insert", true)) {
1288 select |= OJMS_INSERT;
1289 }
1290 if (parse_bool(&parser, "delete", true)) {
1291 select |= OJMS_DELETE;
1292 }
1293 if (parse_bool(&parser, "modify", true)) {
1294 select |= OJMS_MODIFY;
1295 }
1296 error = ovsdb_parser_finish(&parser);
1297 if (error) {
1298 return error;
1299 }
1300 } else {
1301 select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
1302 }
1303
1304 ovsdb_monitor_table_add_select(dbmon, table, select);
1305 if (columns) {
1306 size_t i;
1307
1308 if (columns->type != JSON_ARRAY) {
1309 return ovsdb_syntax_error(columns, NULL,
1310 "array of column names expected");
1311 }
1312
1313 for (i = 0; i < columns->array.n; i++) {
1314 const struct ovsdb_column *column;
1315 const char *s;
1316
1317 if (columns->array.elems[i]->type != JSON_STRING) {
1318 return ovsdb_syntax_error(columns, NULL,
1319 "array of column names expected");
1320 }
1321
1322 s = columns->array.elems[i]->string;
1323 column = shash_find_data(&table->schema->columns, s);
1324 if (!column) {
1325 return ovsdb_syntax_error(columns, NULL, "%s is not a valid "
1326 "column name", s);
1327 }
1328 if (ovsdb_monitor_add_column(dbmon, table, column,
1329 select, true)) {
1330 return ovsdb_syntax_error(columns, NULL, "column %s "
1331 "mentioned more than once",
1332 column->name);
1333 }
1334 }
1335 } else {
1336 struct shash_node *node;
1337
1338 SHASH_FOR_EACH (node, &ts->columns) {
1339 const struct ovsdb_column *column = node->data;
1340 if (column->index != OVSDB_COL_UUID) {
1341 if (ovsdb_monitor_add_column(dbmon, table, column,
1342 select, true)) {
1343 return ovsdb_syntax_error(columns, NULL, "column %s "
1344 "mentioned more than once",
1345 column->name);
1346 }
1347 }
1348 }
1349 }
1350 if (cond) {
1351 error = ovsdb_monitor_table_condition_create(cond, table, where);
1352 if (error) {
1353 return error;
1354 }
1355 }
1356
1357 return NULL;
1358 }
1359
1360 static struct jsonrpc_msg *
1361 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
1362 struct json *params,
1363 enum ovsdb_monitor_version version,
1364 const struct json *request_id)
1365 {
1366 struct ovsdb_jsonrpc_monitor *m = NULL;
1367 struct ovsdb_monitor *dbmon = NULL;
1368 struct json *monitor_id, *monitor_requests;
1369 struct ovsdb_error *error = NULL;
1370 struct shash_node *node;
1371 struct json *json;
1372
1373 if ((version == OVSDB_MONITOR_V2 && json_array(params)->n != 3) ||
1374 (version == OVSDB_MONITOR_V3 && json_array(params)->n != 4)) {
1375 error = ovsdb_syntax_error(params, NULL, "invalid parameters");
1376 goto error;
1377 }
1378 monitor_id = params->array.elems[1];
1379 monitor_requests = params->array.elems[2];
1380 if (monitor_requests->type != JSON_OBJECT) {
1381 error = ovsdb_syntax_error(monitor_requests, NULL,
1382 "monitor-requests must be object");
1383 goto error;
1384 }
1385
1386 if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
1387 error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
1388 goto error;
1389 }
1390
1391 m = xzalloc(sizeof *m);
1392 m->session = s;
1393 m->db = db;
1394 m->dbmon = ovsdb_monitor_create(db, m);
1395 if (version == OVSDB_MONITOR_V2 || version == OVSDB_MONITOR_V3) {
1396 m->condition = ovsdb_monitor_session_condition_create();
1397 }
1398 m->version = version;
1399 hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
1400 m->monitor_id = json_clone(monitor_id);
1401
1402 SHASH_FOR_EACH (node, json_object(monitor_requests)) {
1403 const struct ovsdb_table *table;
1404 const struct json *mr_value;
1405 size_t i;
1406
1407 table = ovsdb_get_table(m->db, node->name);
1408 if (!table) {
1409 error = ovsdb_syntax_error(NULL, NULL,
1410 "no table named %s", node->name);
1411 goto error;
1412 }
1413
1414 ovsdb_monitor_add_table(m->dbmon, table);
1415
1416 /* Parse columns. */
1417 mr_value = node->data;
1418 if (mr_value->type == JSON_ARRAY) {
1419 const struct json_array *array = &mr_value->array;
1420
1421 for (i = 0; i < array->n; i++) {
1422 error = ovsdb_jsonrpc_parse_monitor_request(m->dbmon,
1423 table,
1424 m->condition,
1425 array->elems[i]);
1426 if (error) {
1427 goto error;
1428 }
1429 }
1430 } else {
1431 error = ovsdb_jsonrpc_parse_monitor_request(m->dbmon,
1432 table,
1433 m->condition,
1434 mr_value);
1435 if (error) {
1436 goto error;
1437 }
1438 }
1439 }
1440
1441 dbmon = ovsdb_monitor_add(m->dbmon);
1442 if (dbmon != m->dbmon) {
1443 /* Found an exisiting dbmon, reuse the current one. */
1444 ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, NULL);
1445 ovsdb_monitor_add_jsonrpc_monitor(dbmon, m);
1446 m->dbmon = dbmon;
1447 }
1448
1449 /* Only now we can bind session's condition to ovsdb_monitor */
1450 if (m->condition) {
1451 ovsdb_monitor_condition_bind(m->dbmon, m->condition);
1452 }
1453
1454 bool initial = false;
1455 if (version == OVSDB_MONITOR_V3) {
1456 struct json *last_id = params->array.elems[3];
1457 if (last_id->type != JSON_STRING) {
1458 error = ovsdb_syntax_error(last_id, NULL,
1459 "last-txn-id must be string");
1460 goto error;
1461 }
1462 struct uuid txn_uuid;
1463 if (!uuid_from_string(&txn_uuid, last_id->string)) {
1464 error = ovsdb_syntax_error(last_id, NULL,
1465 "last-txn-id must be UUID format.");
1466 goto error;
1467 }
1468 if (!uuid_is_zero(&txn_uuid)) {
1469 ovsdb_monitor_get_changes_after(&txn_uuid, m->dbmon,
1470 &m->change_set);
1471 }
1472 }
1473 if (!m->change_set) {
1474 ovsdb_monitor_get_initial(m->dbmon, &m->change_set);
1475 initial = true;
1476 }
1477 json = ovsdb_jsonrpc_monitor_compose_update(m, initial);
1478 json = json ? json : json_object_create();
1479
1480 if (m->version == OVSDB_MONITOR_V3) {
1481 struct json *json_last_id = json_string_create_nocopy(
1482 xasprintf(UUID_FMT,
1483 UUID_ARGS(ovsdb_monitor_get_last_txnid(
1484 m->dbmon))));
1485
1486 struct json *json_found = json_boolean_create(!initial);
1487 json = json_array_create_3(json_found, json_last_id, json);
1488 }
1489
1490 return jsonrpc_create_reply(json, request_id);
1491
1492 error:
1493 if (m) {
1494 ovsdb_jsonrpc_monitor_destroy(m, false);
1495 }
1496
1497 return jsonrpc_create_error(ovsdb_error_to_json_free(error), request_id);
1498 }
1499
1500 static struct ovsdb_error *
1501 ovsdb_jsonrpc_parse_monitor_cond_change_request(
1502 struct ovsdb_jsonrpc_monitor *m,
1503 const struct ovsdb_table *table,
1504 const struct json *cond_change_req)
1505 {
1506 const struct ovsdb_table_schema *ts = table->schema;
1507 const struct json *condition, *columns;
1508 struct ovsdb_parser parser;
1509 struct ovsdb_error *error;
1510
1511 ovsdb_parser_init(&parser, cond_change_req, "table %s", ts->name);
1512 columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
1513 condition = ovsdb_parser_member(&parser, "where", OP_ARRAY | OP_OPTIONAL);
1514
1515 error = ovsdb_parser_finish(&parser);
1516 if (error) {
1517 return error;
1518 }
1519
1520 if (columns) {
1521 error = ovsdb_syntax_error(cond_change_req, NULL, "changing columns "
1522 "is unsupported");
1523 return error;
1524 }
1525 error = ovsdb_monitor_table_condition_update(m->dbmon, m->condition, table,
1526 condition);
1527
1528 return error;
1529 }
1530
1531 static struct jsonrpc_msg *
1532 ovsdb_jsonrpc_monitor_cond_change(struct ovsdb_jsonrpc_session *s,
1533 struct json *params,
1534 const struct json *request_id)
1535 {
1536 struct ovsdb_error *error;
1537 struct ovsdb_jsonrpc_monitor *m;
1538 struct json *monitor_cond_change_reqs;
1539 struct shash_node *node;
1540
1541 if (json_array(params)->n != 3) {
1542 error = ovsdb_syntax_error(params, NULL, "invalid parameters");
1543 goto error;
1544 }
1545
1546 m = ovsdb_jsonrpc_monitor_find(s, params->array.elems[0]);
1547 if (!m) {
1548 error = ovsdb_syntax_error(params->array.elems[0], NULL,
1549 "unknown monitor session");
1550 goto error;
1551 }
1552
1553 const struct json *new_monitor_id = params->array.elems[1];
1554 bool changing_id = !json_equal(m->monitor_id, new_monitor_id);
1555 if (changing_id && ovsdb_jsonrpc_monitor_find(s, new_monitor_id)) {
1556 error = ovsdb_syntax_error(new_monitor_id, NULL,
1557 "duplicate monitor ID");
1558 goto error;
1559 }
1560
1561 monitor_cond_change_reqs = params->array.elems[2];
1562 if (monitor_cond_change_reqs->type != JSON_OBJECT) {
1563 error =
1564 ovsdb_syntax_error(NULL, NULL,
1565 "monitor-cond-change-requests must be object");
1566 goto error;
1567 }
1568
1569 SHASH_FOR_EACH (node, json_object(monitor_cond_change_reqs)) {
1570 const struct ovsdb_table *table;
1571 const struct json *mr_value;
1572 size_t i;
1573
1574 table = ovsdb_get_table(m->db, node->name);
1575 if (!table) {
1576 error = ovsdb_syntax_error(NULL, NULL,
1577 "no table named %s", node->name);
1578 goto error;
1579 }
1580 if (!ovsdb_monitor_table_exists(m->dbmon, table)) {
1581 error = ovsdb_syntax_error(NULL, NULL,
1582 "no table named %s in monitor session",
1583 node->name);
1584 goto error;
1585 }
1586
1587 mr_value = node->data;
1588 if (mr_value->type == JSON_ARRAY) {
1589 const struct json_array *array = &mr_value->array;
1590
1591 for (i = 0; i < array->n; i++) {
1592 error = ovsdb_jsonrpc_parse_monitor_cond_change_request(
1593 m, table, array->elems[i]);
1594 if (error) {
1595 goto error;
1596 }
1597 }
1598 } else {
1599 error = ovsdb_syntax_error(
1600 NULL, NULL,
1601 "table %s no monitor-cond-change JSON array",
1602 node->name);
1603 goto error;
1604 }
1605 }
1606
1607 if (changing_id) {
1608 hmap_remove(&s->monitors, &m->node);
1609 json_destroy(m->monitor_id);
1610 m->monitor_id = json_clone(new_monitor_id);
1611 hmap_insert(&s->monitors, &m->node, json_hash(m->monitor_id, 0));
1612 }
1613
1614 /* Send the new update, if any, represents the difference from the old
1615 * condition and the new one. */
1616 struct json *update_json;
1617
1618 update_json = ovsdb_monitor_get_update(m->dbmon, false, true,
1619 m->condition, m->version, &m->change_set);
1620 if (update_json) {
1621 struct jsonrpc_msg *msg;
1622 struct json *p;
1623 if (m->version == OVSDB_MONITOR_V3) {
1624 struct json *json_last_id = json_string_create_nocopy(
1625 xasprintf(UUID_FMT,
1626 UUID_ARGS(ovsdb_monitor_get_last_txnid(
1627 m->dbmon))));
1628
1629 p = json_array_create_3(json_clone(m->monitor_id), json_last_id,
1630 update_json);
1631 } else {
1632 p = json_array_create_2(json_clone(m->monitor_id), update_json);
1633 }
1634 msg = ovsdb_jsonrpc_create_notify(m, p);
1635 jsonrpc_session_send(s->js, msg);
1636 }
1637
1638 return jsonrpc_create_reply(json_object_create(), request_id);
1639
1640 error:
1641 return jsonrpc_create_error(ovsdb_error_to_json_free(error), request_id);
1642 }
1643
1644 static struct jsonrpc_msg *
1645 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
1646 struct json_array *params,
1647 const struct json *request_id)
1648 {
1649 if (params->n != 1) {
1650 return jsonrpc_create_error(json_string_create("invalid parameters"),
1651 request_id);
1652 } else {
1653 struct ovsdb_jsonrpc_monitor *m;
1654
1655 m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
1656 if (!m) {
1657 return jsonrpc_create_error(json_string_create("unknown monitor"),
1658 request_id);
1659 } else {
1660 ovsdb_jsonrpc_monitor_destroy(m, false);
1661 return jsonrpc_create_reply(json_object_create(), request_id);
1662 }
1663 }
1664 }
1665
1666 /* Database 'db' is about to be removed from the database server. To prepare,
1667 * this function removes all references from monitors in 's' to 'db'. */
1668 static void
1669 ovsdb_jsonrpc_monitor_preremove_db(struct ovsdb_jsonrpc_session *s,
1670 struct ovsdb *db)
1671 {
1672 ovs_assert(db);
1673
1674 struct ovsdb_jsonrpc_monitor *m, *next;
1675 HMAP_FOR_EACH_SAFE (m, next, node, &s->monitors) {
1676 if (m->db == db) {
1677 ovsdb_jsonrpc_monitor_destroy(m, true);
1678 }
1679 }
1680 }
1681
1682 /* Cancels all monitors in 's'. */
1683 static void
1684 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
1685 {
1686 struct ovsdb_jsonrpc_monitor *m, *next;
1687
1688 HMAP_FOR_EACH_SAFE (m, next, node, &s->monitors) {
1689 ovsdb_jsonrpc_monitor_destroy(m, false);
1690 }
1691 }
1692
1693 static struct json *
1694 ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m,
1695 bool initial)
1696 {
1697
1698 if (!ovsdb_monitor_needs_flush(m->dbmon, m->change_set)) {
1699 return NULL;
1700 }
1701
1702 return ovsdb_monitor_get_update(m->dbmon, initial, false,
1703 m->condition, m->version, &m->change_set);
1704 }
1705
1706 static bool
1707 ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *s)
1708 {
1709 struct ovsdb_jsonrpc_monitor *m;
1710
1711 HMAP_FOR_EACH (m, node, &s->monitors) {
1712 if (ovsdb_monitor_needs_flush(m->dbmon, m->change_set)) {
1713 return true;
1714 }
1715 }
1716
1717 return false;
1718 }
1719
1720 void
1721 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_jsonrpc_monitor *m,
1722 bool notify_cancellation)
1723 {
1724 if (notify_cancellation) {
1725 struct ovsdb_jsonrpc_session *s = m->session;
1726 if (jsonrpc_session_is_connected(s->js) && s->db_change_aware) {
1727 struct jsonrpc_msg *notify = jsonrpc_create_notify(
1728 "monitor_canceled",
1729 json_array_create_1(json_clone(m->monitor_id)));
1730 ovsdb_jsonrpc_session_send(s, notify);
1731 }
1732 }
1733
1734 json_destroy(m->monitor_id);
1735 hmap_remove(&m->session->monitors, &m->node);
1736 ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->change_set);
1737 ovsdb_monitor_session_condition_destroy(m->condition);
1738 free(m);
1739 }
1740
1741 static struct jsonrpc_msg *
1742 ovsdb_jsonrpc_create_notify(const struct ovsdb_jsonrpc_monitor *m,
1743 struct json *params)
1744 {
1745 const char *method;
1746
1747 switch(m->version) {
1748 case OVSDB_MONITOR_V1:
1749 method = "update";
1750 break;
1751 case OVSDB_MONITOR_V2:
1752 method = "update2";
1753 break;
1754 case OVSDB_MONITOR_V3:
1755 method = "update3";
1756 break;
1757 case OVSDB_MONITOR_VERSION_MAX:
1758 default:
1759 OVS_NOT_REACHED();
1760 }
1761
1762 return jsonrpc_create_notify(method, params);
1763 }
1764
1765 const struct uuid *
1766 ovsdb_jsonrpc_server_get_uuid(const struct ovsdb_jsonrpc_server *s)
1767 {
1768 return &s->up.uuid;
1769 }
1770
1771 static void
1772 ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *s)
1773 {
1774 struct ovsdb_jsonrpc_monitor *m;
1775
1776 HMAP_FOR_EACH (m, node, &s->monitors) {
1777 struct json *json;
1778
1779 json = ovsdb_jsonrpc_monitor_compose_update(m, false);
1780 if (json) {
1781 struct jsonrpc_msg *msg;
1782 struct json *params;
1783 if (m->version == OVSDB_MONITOR_V3) {
1784 struct json *json_last_id = json_string_create_nocopy(
1785 xasprintf(UUID_FMT,
1786 UUID_ARGS(ovsdb_monitor_get_last_txnid(
1787 m->dbmon))));
1788 params = json_array_create_3(json_clone(m->monitor_id),
1789 json_last_id, json);
1790 } else {
1791 params = json_array_create_2(json_clone(m->monitor_id), json);
1792 }
1793
1794 msg = ovsdb_jsonrpc_create_notify(m, params);
1795 jsonrpc_session_send(s->js, msg);
1796 }
1797 }
1798 }
1799
1800 void
1801 ovsdb_jsonrpc_disable_monitor_cond(void)
1802 {
1803 /* Once disabled, it is not possible to re-enable it. */
1804 monitor_cond_enable__ = false;
1805 }