]> git.proxmox.com Git - mirror_ovs.git/blob - ovsdb/jsonrpc-server.c
77f15d95cd4082b2a1509c7734dde37623f28504
[mirror_ovs.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <errno.h>
21
22 #include "bitmap.h"
23 #include "column.h"
24 #include "openvswitch/dynamic-string.h"
25 #include "monitor.h"
26 #include "openvswitch/json.h"
27 #include "jsonrpc.h"
28 #include "ovsdb-error.h"
29 #include "ovsdb-parser.h"
30 #include "ovsdb.h"
31 #include "condition.h"
32 #include "openvswitch/poll-loop.h"
33 #include "reconnect.h"
34 #include "row.h"
35 #include "server.h"
36 #include "simap.h"
37 #include "storage.h"
38 #include "stream.h"
39 #include "table.h"
40 #include "timeval.h"
41 #include "transaction.h"
42 #include "trigger.h"
43 #include "util.h"
44 #include "openvswitch/vlog.h"
45
46 VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server);
47
48 struct ovsdb_jsonrpc_remote;
49 struct ovsdb_jsonrpc_session;
50
51 /* Set false to defeature monitor_cond, causing the server to respond to
52 * the "monitor_cond" method with an error.  The request dispatcher only
 * recognizes "monitor_cond" while this flag is true. */
53 static bool monitor_cond_enable__ = true;
54
55 /* Message rate-limiting: shared limiter for this file's rate-limited
 * VLOG_ERR_RL() and VLOG_WARN_RL() messages. */
56 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
57
58 /* Sessions.
 *
 * Forward declarations of the helpers that operate on a remote's sessions
 * (the "_all" variants walk every session of one remote) and on a single
 * session.  The 'bool' passed to ovsdb_jsonrpc_session_create() is the new
 * session's read-only flag. */
59 static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
60 struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *, bool);
61 static void ovsdb_jsonrpc_session_preremove_db(struct ovsdb_jsonrpc_remote *,
62 struct ovsdb *);
63 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
64 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
65 static void ovsdb_jsonrpc_session_get_memory_usage_all(
66 const struct ovsdb_jsonrpc_remote *, struct simap *usage);
67 static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *);
68 static void ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *,
69 bool force,
70 const char *comment);
71 static void ovsdb_jsonrpc_session_set_all_options(
72 struct ovsdb_jsonrpc_remote *, const struct ovsdb_jsonrpc_options *);
73 static bool ovsdb_jsonrpc_active_session_get_status(
74 const struct ovsdb_jsonrpc_remote *,
75 struct ovsdb_jsonrpc_remote_status *);
76 static void ovsdb_jsonrpc_session_get_status(
77 const struct ovsdb_jsonrpc_session *,
78 struct ovsdb_jsonrpc_remote_status *);
79 static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *);
80 static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *);
81 static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *,
82 struct jsonrpc_msg *);
83
84 /* Triggers.
 *
 * Forward declarations of the per-session trigger helpers.  A trigger is
 * created for each "transact" or "convert" request, and triggers are looked
 * up by JSON-RPC request id plus its hash. */
85 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
86 struct ovsdb *,
87 struct jsonrpc_msg *request);
88 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
89 struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
90 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
91 static void ovsdb_jsonrpc_trigger_preremove_db(struct ovsdb_jsonrpc_session *,
92 struct ovsdb *);
93 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
94 static void ovsdb_jsonrpc_trigger_complete_done(
95 struct ovsdb_jsonrpc_session *);
96
97 /* Monitors.
 *
 * Forward declarations of the per-session monitor helpers.  The functions
 * that return a "struct jsonrpc_msg *" build the reply to the request whose
 * id is passed as 'request_id'. */
98 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_create(
99 struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params,
100 enum ovsdb_monitor_version, const struct json *request_id);
101 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cond_change(
102 struct ovsdb_jsonrpc_session *s,
103 struct json *params,
104 const struct json *request_id);
105 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
106 struct ovsdb_jsonrpc_session *,
107 struct json_array *params,
108 const struct json *request_id);
109 static void ovsdb_jsonrpc_monitor_preremove_db(struct ovsdb_jsonrpc_session *,
110 struct ovsdb *);
111 static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
112 static void ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *);
113 static bool ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *);
114 static struct json *ovsdb_jsonrpc_monitor_compose_update(
115 struct ovsdb_jsonrpc_monitor *monitor, bool initial);
116 static struct jsonrpc_msg * ovsdb_jsonrpc_create_notify(
117 const struct ovsdb_jsonrpc_monitor *m,
118 struct json *params);
119
120 \f
121 /* JSON-RPC database server. */
122
/* Server-wide state: the generic OVSDB server plus the configured remotes. */
123 struct ovsdb_jsonrpc_server {
124 struct ovsdb_server up; /* Generic server; sessions are initialized against it. */
125 unsigned int n_sessions; /* Count of open sessions across all remotes. */
126 bool read_only; /* When true, this server does not accept any
127 transactions that can modify the database. */
128 struct shash remotes; /* Contains "struct ovsdb_jsonrpc_remote *"s. */
129 };
130
131 /* A configured remote. This is either a passive stream listener plus a list
132 * of the currently connected sessions, or a list of exactly one active
133 * session. */
134 struct ovsdb_jsonrpc_remote {
135 struct ovsdb_jsonrpc_server *server; /* Server that owns this remote. */
136 struct pstream *listener; /* Listener, if passive. */
137 struct ovs_list sessions; /* List of "struct ovsdb_jsonrpc_session"s. */
138 uint8_t dscp; /* DSCP value copied from the remote's options. */
139 bool read_only; /* Copied from the options; a session is read-only if
 * this or the server's flag is set. */
140 char *role; /* Copy of the options' role; freed with the remote. */
141 };
142
/* Forward declarations of the helpers that add and delete one remote in a
 * server's 'remotes' map. */
143 static struct ovsdb_jsonrpc_remote *ovsdb_jsonrpc_server_add_remote(
144 struct ovsdb_jsonrpc_server *, const char *name,
145 const struct ovsdb_jsonrpc_options *options
146 );
147 static void ovsdb_jsonrpc_server_del_remote(struct shash_node *);
148
149 /* Creates and returns a new server to provide JSON-RPC access to an OVSDB.
150 *
151 * The caller must call ovsdb_jsonrpc_server_add_db() for each database to
152 * which 'server' should provide access. */
153 struct ovsdb_jsonrpc_server *
154 ovsdb_jsonrpc_server_create(bool read_only)
155 {
156 struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
157 ovsdb_server_init(&server->up);
158 shash_init(&server->remotes);
159 server->read_only = read_only;
160 return server;
161 }
162
163 /* Adds 'db' to the set of databases served out by 'svr'. Returns true if
164 * successful, false if 'db''s name is the same as some database already in
165 * 'server'. */
166 bool
167 ovsdb_jsonrpc_server_add_db(struct ovsdb_jsonrpc_server *svr, struct ovsdb *db)
168 {
169 ovsdb_jsonrpc_server_reconnect(
170 svr, false, xasprintf("adding %s database", db->name));
171 return ovsdb_server_add_db(&svr->up, db);
172 }
173
174 /* Removes 'db' from the set of databases served out by 'svr'.
175 *
176 * 'comment' should be a human-readable reason for removing the database, for
177 * use in log messages, or NULL to suppress logging. This function frees
178 * it. */
179 void
180 ovsdb_jsonrpc_server_remove_db(struct ovsdb_jsonrpc_server *svr,
181 struct ovsdb *db, char *comment)
182 {
183 struct shash_node *node;
184 SHASH_FOR_EACH (node, &svr->remotes) {
185 struct ovsdb_jsonrpc_remote *remote = node->data;
186
187 ovsdb_jsonrpc_session_preremove_db(remote, db);
188 }
189
190 ovsdb_jsonrpc_server_reconnect(svr, false, comment);
191
192 ovsdb_server_remove_db(&svr->up, db);
193 }
194
195 void
196 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr)
197 {
198 struct shash_node *node, *next;
199
200 SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
201 ovsdb_jsonrpc_server_del_remote(node);
202 }
203 shash_destroy(&svr->remotes);
204 ovsdb_server_destroy(&svr->up);
205 free(svr);
206 }
207
208 struct ovsdb_jsonrpc_options *
209 ovsdb_jsonrpc_default_options(const char *target)
210 {
211 struct ovsdb_jsonrpc_options *options = xzalloc(sizeof *options);
212 options->max_backoff = RECONNECT_DEFAULT_MAX_BACKOFF;
213 options->probe_interval = (stream_or_pstream_needs_probes(target)
214 ? RECONNECT_DEFAULT_PROBE_INTERVAL
215 : 0);
216 return options;
217 }
218
219 /* Sets 'svr''s current set of remotes to the names in 'new_remotes', with
220 * options in the struct ovsdb_jsonrpc_options supplied as the data values.
221 *
222 * A remote is an active or passive stream connection method, e.g. "pssl:" or
223 * "tcp:1.2.3.4". */
224 void
225 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
226 const struct shash *new_remotes)
227 {
228 struct shash_node *node, *next;
229
230 SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
231 struct ovsdb_jsonrpc_remote *remote = node->data;
232 struct ovsdb_jsonrpc_options *options
233 = shash_find_data(new_remotes, node->name);
234
235 if (!options) {
236 VLOG_INFO("%s: remote deconfigured", node->name);
237 ovsdb_jsonrpc_server_del_remote(node);
238 } else if (options->dscp != remote->dscp) {
239 ovsdb_jsonrpc_server_del_remote(node);
240 }
241 }
242 SHASH_FOR_EACH (node, new_remotes) {
243 const struct ovsdb_jsonrpc_options *options = node->data;
244 struct ovsdb_jsonrpc_remote *remote;
245
246 remote = shash_find_data(&svr->remotes, node->name);
247 if (!remote) {
248 remote = ovsdb_jsonrpc_server_add_remote(svr, node->name, options);
249 if (!remote) {
250 continue;
251 }
252 }
253
254 ovsdb_jsonrpc_session_set_all_options(remote, options);
255 }
256 }
257
258 static struct ovsdb_jsonrpc_remote *
259 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
260 const char *name,
261 const struct ovsdb_jsonrpc_options *options)
262 {
263 struct ovsdb_jsonrpc_remote *remote;
264 struct pstream *listener;
265 int error;
266
267 error = jsonrpc_pstream_open(name, &listener, options->dscp);
268 if (error && error != EAFNOSUPPORT) {
269 VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, ovs_strerror(error));
270 return NULL;
271 }
272
273 remote = xmalloc(sizeof *remote);
274 remote->server = svr;
275 remote->listener = listener;
276 ovs_list_init(&remote->sessions);
277 remote->dscp = options->dscp;
278 remote->read_only = options->read_only;
279 remote->role = nullable_xstrdup(options->role);
280 shash_add(&svr->remotes, name, remote);
281
282 if (!listener) {
283 ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name, true),
284 svr->read_only || remote->read_only);
285 }
286 return remote;
287 }
288
289 static void
290 ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
291 {
292 struct ovsdb_jsonrpc_remote *remote = node->data;
293
294 ovsdb_jsonrpc_session_close_all(remote);
295 pstream_close(remote->listener);
296 shash_delete(&remote->server->remotes, node);
297 free(remote->role);
298 free(remote);
299 }
300
301 /* Stores status information for the remote named 'target', which should have
302 * been configured on 'svr' with a call to ovsdb_jsonrpc_server_set_remotes(),
303 * into '*status'. On success returns true, on failure (if 'svr' doesn't have
304 * a remote named 'target' or if that remote is an outbound remote that has no
305 * active connections) returns false. On failure, 'status' will be zeroed.
306 */
307 bool
308 ovsdb_jsonrpc_server_get_remote_status(
309 const struct ovsdb_jsonrpc_server *svr, const char *target,
310 struct ovsdb_jsonrpc_remote_status *status)
311 {
312 const struct ovsdb_jsonrpc_remote *remote;
313
314 memset(status, 0, sizeof *status);
315
316 remote = shash_find_data(&svr->remotes, target);
317
318 if (!remote) {
319 return false;
320 }
321
322 if (remote->listener) {
323 status->bound_port = pstream_get_bound_port(remote->listener);
324 status->is_connected = !ovs_list_is_empty(&remote->sessions);
325 status->n_connections = ovs_list_size(&remote->sessions);
326 return true;
327 }
328
329 return ovsdb_jsonrpc_active_session_get_status(remote, status);
330 }
331
332 void
333 ovsdb_jsonrpc_server_free_remote_status(
334 struct ovsdb_jsonrpc_remote_status *status)
335 {
336 free(status->locks_held);
337 free(status->locks_waiting);
338 free(status->locks_lost);
339 }
340
341 /* Makes all of the JSON-RPC sessions managed by 'svr' disconnect. (They
342 * will then generally reconnect.). Uses 'comment' as a human-readable comment
343 * for logging (it may be NULL to suppress logging). Frees 'comment'.
344 *
345 * If 'force' is true, disconnects all sessions. Otherwise, disconnects only
346 * sesions that aren't database change aware. */
347 void
348 ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *svr, bool force,
349 char *comment)
350 {
351 struct shash_node *node;
352
353 SHASH_FOR_EACH (node, &svr->remotes) {
354 struct ovsdb_jsonrpc_remote *remote = node->data;
355
356 ovsdb_jsonrpc_session_reconnect_all(remote, force, comment);
357 }
358
359 free(comment);
360 }
361
362 void
363 ovsdb_jsonrpc_server_set_read_only(struct ovsdb_jsonrpc_server *svr,
364 bool read_only)
365 {
366 if (svr->read_only != read_only) {
367 svr->read_only = read_only;
368 ovsdb_jsonrpc_server_reconnect(svr, false,
369 xstrdup(read_only
370 ? "making server read-only"
371 : "making server read/write"));
372 }
373 }
374
375 void
376 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
377 {
378 struct shash_node *node;
379
380 SHASH_FOR_EACH (node, &svr->remotes) {
381 struct ovsdb_jsonrpc_remote *remote = node->data;
382
383 if (remote->listener) {
384 struct stream *stream;
385 int error;
386
387 error = pstream_accept(remote->listener, &stream);
388 if (!error) {
389 struct jsonrpc_session *js;
390 js = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
391 remote->dscp);
392 ovsdb_jsonrpc_session_create(remote, js, svr->read_only ||
393 remote->read_only);
394 } else if (error != EAGAIN) {
395 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
396 pstream_get_name(remote->listener),
397 ovs_strerror(error));
398 }
399 }
400
401 ovsdb_jsonrpc_session_run_all(remote);
402 }
403 }
404
405 void
406 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
407 {
408 struct shash_node *node;
409
410 SHASH_FOR_EACH (node, &svr->remotes) {
411 struct ovsdb_jsonrpc_remote *remote = node->data;
412
413 if (remote->listener) {
414 pstream_wait(remote->listener);
415 }
416
417 ovsdb_jsonrpc_session_wait_all(remote);
418 }
419 }
420
421 /* Adds some memory usage statistics for 'svr' into 'usage', for use with
422 * memory_report(). */
423 void
424 ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server *svr,
425 struct simap *usage)
426 {
427 struct shash_node *node;
428
429 simap_increase(usage, "sessions", svr->n_sessions);
430 SHASH_FOR_EACH (node, &svr->remotes) {
431 struct ovsdb_jsonrpc_remote *remote = node->data;
432
433 ovsdb_jsonrpc_session_get_memory_usage_all(remote, usage);
434 }
435 }
436 \f
437 /* JSON-RPC database server session. */
438
/* One JSON-RPC connection (accepted or outgoing) that belongs to a remote. */
439 struct ovsdb_jsonrpc_session {
440 struct ovs_list node; /* Element in remote's sessions list. */
441 struct ovsdb_session up; /* Generic session state, including lock waiters. */
442 struct ovsdb_jsonrpc_remote *remote; /* Remote that owns this session. */
443
444 /* RFC 7047 does not contemplate how to alert clients to changes to the set
445 * of databases, e.g. databases that are added or removed while the
446 * database server is running. Traditionally, ovsdb-server disconnects all
447 * of its clients when this happens; a well-written client will reassess
448 * what is available from the server upon reconnection.
449 *
450 * OVS 2.9 introduces a way for clients to monitor changes to the databases
451 * being served, through the Database table in the _Server database that
452 * OVSDB adds in this version. ovsdb-server suppresses the connection
453 * close for clients that identify themselves as taking advantage of this
454 * mechanism. When this member is true, it indicates that the client
455 * requested such suppression. */
456 bool db_change_aware;
457
458 /* Triggers. */
459 struct hmap triggers; /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
460
461 /* Monitors. */
462 struct hmap monitors; /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
463
464 /* Network connectivity. */
465 struct jsonrpc_session *js; /* JSON-RPC session. */
466 unsigned int js_seqno; /* Last jsonrpc_session_get_seqno() value. */
467
468 /* Read only. */
469 bool read_only; /* When true, the session is not allowed to modify
470 the database; lock and unlock requests are rejected. */
471 };
472
/* Forward declarations of the single-session helpers.
 * ovsdb_jsonrpc_session_run() returns 0 while the session is alive and a
 * positive errno value once it should be closed. */
473 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
474 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
475 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
476 static void ovsdb_jsonrpc_session_get_memory_usage(
477 const struct ovsdb_jsonrpc_session *, struct simap *usage);
478 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
479 struct jsonrpc_msg *);
480 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
481 struct jsonrpc_msg *);
482
483 static struct ovsdb_jsonrpc_session *
484 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
485 struct jsonrpc_session *js, bool read_only)
486 {
487 struct ovsdb_jsonrpc_session *s;
488
489 s = xzalloc(sizeof *s);
490 ovsdb_session_init(&s->up, &remote->server->up);
491 s->remote = remote;
492 ovs_list_push_back(&remote->sessions, &s->node);
493 hmap_init(&s->triggers);
494 hmap_init(&s->monitors);
495 s->js = js;
496 s->js_seqno = jsonrpc_session_get_seqno(js);
497 s->read_only = read_only;
498
499 remote->server->n_sessions++;
500
501 return s;
502 }
503
504 /* Database 'db' is about to be removed from the database server. To prepare,
505 * this function removes all references to 'db' from 'remote'. */
506 static void
507 ovsdb_jsonrpc_session_preremove_db(struct ovsdb_jsonrpc_remote *remote,
508 struct ovsdb *db)
509 {
510 struct ovsdb_jsonrpc_session *s;
511
512 LIST_FOR_EACH (s, node, &remote->sessions) {
513 ovsdb_jsonrpc_monitor_preremove_db(s, db);
514 ovsdb_jsonrpc_trigger_preremove_db(s, db);
515 }
516 }
517
518 static void
519 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
520 {
521 ovsdb_jsonrpc_monitor_remove_all(s);
522 ovsdb_jsonrpc_session_unlock_all(s);
523 ovsdb_jsonrpc_trigger_complete_all(s);
524
525 hmap_destroy(&s->monitors);
526 hmap_destroy(&s->triggers);
527
528 jsonrpc_session_close(s->js);
529 ovs_list_remove(&s->node);
530 s->remote->server->n_sessions--;
531 ovsdb_session_destroy(&s->up);
532 free(s);
533 }
534
535 static int
536 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
537 {
538 jsonrpc_session_run(s->js);
539 if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
540 s->js_seqno = jsonrpc_session_get_seqno(s->js);
541 ovsdb_jsonrpc_trigger_complete_all(s);
542 ovsdb_jsonrpc_monitor_remove_all(s);
543 ovsdb_jsonrpc_session_unlock_all(s);
544 }
545
546 ovsdb_jsonrpc_trigger_complete_done(s);
547
548 if (!jsonrpc_session_get_backlog(s->js)) {
549 struct jsonrpc_msg *msg;
550
551 ovsdb_jsonrpc_monitor_flush_all(s);
552
553 msg = jsonrpc_session_recv(s->js);
554 if (msg) {
555 if (msg->type == JSONRPC_REQUEST) {
556 ovsdb_jsonrpc_session_got_request(s, msg);
557 } else if (msg->type == JSONRPC_NOTIFY) {
558 ovsdb_jsonrpc_session_got_notify(s, msg);
559 } else {
560 VLOG_WARN("%s: received unexpected %s message",
561 jsonrpc_session_get_name(s->js),
562 jsonrpc_msg_type_to_string(msg->type));
563 jsonrpc_session_force_reconnect(s->js);
564 jsonrpc_msg_destroy(msg);
565 }
566 }
567 }
568 return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
569 }
570
571 static void
572 ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session,
573 const struct ovsdb_jsonrpc_options *options)
574 {
575 jsonrpc_session_set_max_backoff(session->js, options->max_backoff);
576 jsonrpc_session_set_probe_interval(session->js, options->probe_interval);
577 jsonrpc_session_set_dscp(session->js, options->dscp);
578 }
579
580 static void
581 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
582 {
583 struct ovsdb_jsonrpc_session *s, *next;
584
585 LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
586 int error = ovsdb_jsonrpc_session_run(s);
587 if (error) {
588 ovsdb_jsonrpc_session_close(s);
589 }
590 }
591 }
592
593 static void
594 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
595 {
596 jsonrpc_session_wait(s->js);
597 if (!jsonrpc_session_get_backlog(s->js)) {
598 if (ovsdb_jsonrpc_monitor_needs_flush(s)) {
599 poll_immediate_wake();
600 } else {
601 jsonrpc_session_recv_wait(s->js);
602 }
603 }
604 }
605
606 static void
607 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote)
608 {
609 struct ovsdb_jsonrpc_session *s;
610
611 LIST_FOR_EACH (s, node, &remote->sessions) {
612 ovsdb_jsonrpc_session_wait(s);
613 }
614 }
615
616 static void
617 ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session *s,
618 struct simap *usage)
619 {
620 simap_increase(usage, "triggers", hmap_count(&s->triggers));
621 simap_increase(usage, "backlog", jsonrpc_session_get_backlog(s->js));
622 }
623
624 static void
625 ovsdb_jsonrpc_session_get_memory_usage_all(
626 const struct ovsdb_jsonrpc_remote *remote,
627 struct simap *usage)
628 {
629 struct ovsdb_jsonrpc_session *s;
630
631 LIST_FOR_EACH (s, node, &remote->sessions) {
632 ovsdb_jsonrpc_session_get_memory_usage(s, usage);
633 }
634 }
635
636 static void
637 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
638 {
639 struct ovsdb_jsonrpc_session *s, *next;
640
641 LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
642 ovsdb_jsonrpc_session_close(s);
643 }
644 }
645
646 /* Makes all of the JSON-RPC sessions managed by 'remote' disconnect. (They
647 * will then generally reconnect.). 'comment' should be a human-readable
648 * explanation of the reason for disconnection, for use in log messages, or
649 * NULL to suppress logging.
650 *
651 * If 'force' is true, disconnects all sessions. Otherwise, disconnects only
652 * sesions that aren't database change aware. */
653 static void
654 ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote,
655 bool force, const char *comment)
656 {
657 struct ovsdb_jsonrpc_session *s, *next;
658
659 LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
660 if (force || !s->db_change_aware) {
661 jsonrpc_session_force_reconnect(s->js);
662 if (comment && jsonrpc_session_is_connected(s->js)) {
663 VLOG_INFO("%s: disconnecting (%s)",
664 jsonrpc_session_get_name(s->js), comment);
665 }
666 if (!jsonrpc_session_is_alive(s->js)) {
667 ovsdb_jsonrpc_session_close(s);
668 }
669 }
670 }
671 }
672
673 /* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
674 * 'options'.
675 *
676 * (The dscp value can't be changed directly; the caller must instead close and
677 * re-open the session.) */
678 static void
679 ovsdb_jsonrpc_session_set_all_options(
680 struct ovsdb_jsonrpc_remote *remote,
681 const struct ovsdb_jsonrpc_options *options)
682 {
683 struct ovsdb_jsonrpc_session *s;
684
685 LIST_FOR_EACH (s, node, &remote->sessions) {
686 ovsdb_jsonrpc_session_set_options(s, options);
687 }
688 }
689
690 /* Sets the 'status' of for the 'remote' with an outgoing connection. */
691 static bool
692 ovsdb_jsonrpc_active_session_get_status(
693 const struct ovsdb_jsonrpc_remote *remote,
694 struct ovsdb_jsonrpc_remote_status *status)
695 {
696 const struct ovs_list *sessions = &remote->sessions;
697 const struct ovsdb_jsonrpc_session *s;
698
699 if (ovs_list_is_empty(sessions)) {
700 return false;
701 }
702
703 ovs_assert(ovs_list_is_singleton(sessions));
704 s = CONTAINER_OF(ovs_list_front(sessions), struct ovsdb_jsonrpc_session, node);
705 ovsdb_jsonrpc_session_get_status(s, status);
706 status->n_connections = 1;
707
708 return true;
709 }
710
711 static void
712 ovsdb_jsonrpc_session_get_status(const struct ovsdb_jsonrpc_session *session,
713 struct ovsdb_jsonrpc_remote_status *status)
714 {
715 const struct ovsdb_jsonrpc_session *s = session;
716 const struct jsonrpc_session *js;
717 struct ovsdb_lock_waiter *waiter;
718 struct reconnect_stats rstats;
719 struct ds locks_held, locks_waiting, locks_lost;
720
721 js = s->js;
722
723 status->is_connected = jsonrpc_session_is_connected(js);
724 status->last_error = jsonrpc_session_get_status(js);
725
726 jsonrpc_session_get_reconnect_stats(js, &rstats);
727 status->state = rstats.state;
728 status->sec_since_connect = rstats.msec_since_connect == UINT_MAX
729 ? UINT_MAX : rstats.msec_since_connect / 1000;
730 status->sec_since_disconnect = rstats.msec_since_disconnect == UINT_MAX
731 ? UINT_MAX : rstats.msec_since_disconnect / 1000;
732
733 ds_init(&locks_held);
734 ds_init(&locks_waiting);
735 ds_init(&locks_lost);
736 HMAP_FOR_EACH (waiter, session_node, &s->up.waiters) {
737 struct ds *string;
738
739 string = (ovsdb_lock_waiter_is_owner(waiter) ? &locks_held
740 : waiter->mode == OVSDB_LOCK_WAIT ? &locks_waiting
741 : &locks_lost);
742 if (string->length) {
743 ds_put_char(string, ' ');
744 }
745 ds_put_cstr(string, waiter->lock_name);
746 }
747 status->locks_held = ds_steal_cstr(&locks_held);
748 status->locks_waiting = ds_steal_cstr(&locks_waiting);
749 status->locks_lost = ds_steal_cstr(&locks_lost);
750 }
751
752 /* Examines 'request' to determine the database to which it relates, and then
753 * searches 's' to find that database:
754 *
755 * - If successful, returns the database and sets '*replyp' to NULL.
756 *
757 * - If no such database exists, returns NULL and sets '*replyp' to an
758 * appropriate JSON-RPC error reply, owned by the caller. */
759 static struct ovsdb *
760 ovsdb_jsonrpc_lookup_db(const struct ovsdb_jsonrpc_session *s,
761 const struct jsonrpc_msg *request,
762 struct jsonrpc_msg **replyp)
763 {
764 struct json_array *params;
765 struct ovsdb_error *error;
766 const char *db_name;
767 struct ovsdb *db;
768
769 params = json_array(request->params);
770 if (!params->n || params->elems[0]->type != JSON_STRING) {
771 error = ovsdb_syntax_error(
772 request->params, NULL,
773 "%s request params must begin with <db-name>", request->method);
774 goto error;
775 }
776
777 db_name = params->elems[0]->string;
778 db = shash_find_data(&s->up.server->dbs, db_name);
779 if (!db) {
780 error = ovsdb_syntax_error(
781 request->params, "unknown database",
782 "%s request specifies unknown database %s",
783 request->method, db_name);
784 goto error;
785 }
786
787 if (!db->schema) {
788 error = ovsdb_error("database not available",
789 "%s request specifies database %s which is not "
790 "yet available because it has not completed "
791 "joining its cluster",
792 request->method, db_name);
793 goto error;
794 }
795
796 *replyp = NULL;
797 return db;
798
799 error:
800 *replyp = jsonrpc_create_error(ovsdb_error_to_json_free(error),
801 request->id);
802 return NULL;
803 }
804
805 static struct ovsdb_error *
806 ovsdb_jsonrpc_session_parse_lock_name(const struct jsonrpc_msg *request,
807 const char **lock_namep)
808 {
809 const struct json_array *params;
810
811 params = json_array(request->params);
812 if (params->n != 1 || params->elems[0]->type != JSON_STRING ||
813 !ovsdb_parser_is_id(json_string(params->elems[0]))) {
814 *lock_namep = NULL;
815 return ovsdb_syntax_error(request->params, NULL,
816 "%s request params must be <id>",
817 request->method);
818 }
819
820 *lock_namep = json_string(params->elems[0]);
821 return NULL;
822 }
823
824 static void
825 ovsdb_jsonrpc_session_notify(struct ovsdb_session *session,
826 const char *lock_name,
827 const char *method)
828 {
829 struct ovsdb_jsonrpc_session *s;
830 struct json *params;
831
832 s = CONTAINER_OF(session, struct ovsdb_jsonrpc_session, up);
833 params = json_array_create_1(json_string_create(lock_name));
834 ovsdb_jsonrpc_session_send(s, jsonrpc_create_notify(method, params));
835 }
836
/* Returns a new JSON-RPC error reply to request 'id' which says that lock
 * and unlock methods are not allowed because the server is read-only. */
static struct jsonrpc_msg *
jsonrpc_create_readonly_lock_error(const struct json *id)
{
    struct json *message = json_string_create(
        "lock and unlock methods not allowed,"
        " DB server is read only.");

    return jsonrpc_create_error(message, id);
}
844
845 static struct jsonrpc_msg *
846 ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session *s,
847 struct jsonrpc_msg *request,
848 enum ovsdb_lock_mode mode)
849 {
850 struct ovsdb_lock_waiter *waiter;
851 struct ovsdb_error *error;
852 struct ovsdb_session *victim;
853 const char *lock_name;
854 struct json *result;
855
856 if (s->read_only) {
857 return jsonrpc_create_readonly_lock_error(request->id);
858 }
859
860 error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
861 if (error) {
862 goto error;
863 }
864
865 /* Report error if this session has issued a "lock" or "steal" without a
866 * matching "unlock" for this lock. */
867 waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
868 if (waiter) {
869 error = ovsdb_syntax_error(
870 request->params, NULL,
871 "must issue \"unlock\" before new \"%s\"", request->method);
872 goto error;
873 }
874
875 /* Get the lock, add us as a waiter. */
876 waiter = ovsdb_server_lock(&s->remote->server->up, &s->up, lock_name, mode,
877 &victim);
878 if (victim) {
879 ovsdb_jsonrpc_session_notify(victim, lock_name, "stolen");
880 }
881
882 result = json_object_create();
883 json_object_put(result, "locked",
884 json_boolean_create(ovsdb_lock_waiter_is_owner(waiter)));
885
886 return jsonrpc_create_reply(result, request->id);
887
888 error:
889 return jsonrpc_create_error(ovsdb_error_to_json_free(error), request->id);
890 }
891
892 static void
893 ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *s)
894 {
895 struct ovsdb_lock_waiter *waiter, *next;
896
897 HMAP_FOR_EACH_SAFE (waiter, next, session_node, &s->up.waiters) {
898 ovsdb_jsonrpc_session_unlock__(waiter);
899 }
900 }
901
902 static void
903 ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *waiter)
904 {
905 struct ovsdb_lock *lock = waiter->lock;
906
907 if (lock) {
908 struct ovsdb_session *new_owner = ovsdb_lock_waiter_remove(waiter);
909 if (new_owner) {
910 ovsdb_jsonrpc_session_notify(new_owner, lock->name, "locked");
911 } else {
912 /* ovsdb_server_lock() might have freed 'lock'. */
913 }
914 }
915
916 ovsdb_lock_waiter_destroy(waiter);
917 }
918
919 static struct jsonrpc_msg *
920 syntax_error_reply(const struct jsonrpc_msg *request, const char *details)
921 {
922 struct ovsdb_error *error = ovsdb_syntax_error(
923 request->params, NULL, "%s: %s", request->method, details);
924 struct jsonrpc_msg *msg = jsonrpc_create_error(ovsdb_error_to_json(error),
925 request->id);
926 ovsdb_error_destroy(error);
927 return msg;
928 }
929
930 static struct jsonrpc_msg *
931 ovsdb_jsonrpc_session_unlock(struct ovsdb_jsonrpc_session *s,
932 struct jsonrpc_msg *request)
933 {
934 struct ovsdb_lock_waiter *waiter;
935 struct ovsdb_error *error;
936 const char *lock_name;
937
938 if (s->read_only) {
939 return jsonrpc_create_readonly_lock_error(request->id);
940 }
941
942 error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
943 if (error) {
944 return jsonrpc_create_error(ovsdb_error_to_json_free(error),
945 request->id);
946 }
947
948 /* Report error if this session has not issued a "lock" or "steal" for this
949 * lock. */
950 waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
951 if (!waiter) {
952 return syntax_error_reply(request,
953 "\"unlock\" without \"lock\" or \"steal\"");
954 }
955
956 ovsdb_jsonrpc_session_unlock__(waiter);
957
958 return jsonrpc_create_reply(json_object_create(), request->id);
959 }
960
961 static struct jsonrpc_msg *
962 ovsdb_jsonrpc_session_set_db_change_aware(struct ovsdb_jsonrpc_session *s,
963 const struct jsonrpc_msg *request)
964 {
965 const struct json_array *params = json_array(request->params);
966 if (params->n != 1
967 || (params->elems[0]->type != JSON_TRUE &&
968 params->elems[0]->type != JSON_FALSE)) {
969 return syntax_error_reply(request, "true or false parameter expected");
970 }
971
972 s->db_change_aware = json_boolean(params->elems[0]);
973 return jsonrpc_create_reply(json_object_create(), request->id);
974 }
975
976 static void
977 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
978 struct jsonrpc_msg *request)
979 {
980 struct jsonrpc_msg *reply;
981
982 if (!strcmp(request->method, "transact") ||
983 !strcmp(request->method, "convert")) {
984 struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
985 if (!reply) {
986 ovsdb_jsonrpc_trigger_create(s, db, request);
987 }
988 } else if (!strcmp(request->method, "monitor") ||
989 (monitor_cond_enable__ && !strcmp(request->method,
990 "monitor_cond"))) {
991 struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
992 if (!reply) {
993 int l = strlen(request->method) - strlen("monitor");
994 enum ovsdb_monitor_version version = l ? OVSDB_MONITOR_V2
995 : OVSDB_MONITOR_V1;
996 reply = ovsdb_jsonrpc_monitor_create(s, db, request->params,
997 version, request->id);
998 }
999 } else if (!strcmp(request->method, "monitor_cond_change")) {
1000 reply = ovsdb_jsonrpc_monitor_cond_change(s, request->params,
1001 request->id);
1002 } else if (!strcmp(request->method, "monitor_cancel")) {
1003 reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
1004 request->id);
1005 } else if (!strcmp(request->method, "get_schema")) {
1006 struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
1007 if (!reply) {
1008 reply = jsonrpc_create_reply(ovsdb_schema_to_json(db->schema),
1009 request->id);
1010 }
1011 } else if (!strcmp(request->method, "list_dbs")) {
1012 size_t n_dbs = shash_count(&s->up.server->dbs);
1013 struct shash_node *node;
1014 struct json **dbs;
1015 size_t i;
1016
1017 dbs = xmalloc(n_dbs * sizeof *dbs);
1018 i = 0;
1019 SHASH_FOR_EACH (node, &s->up.server->dbs) {
1020 dbs[i++] = json_string_create(node->name);
1021 }
1022 reply = jsonrpc_create_reply(json_array_create(dbs, n_dbs),
1023 request->id);
1024 } else if (!strcmp(request->method, "get_server_id")) {
1025 const struct uuid *uuid = &s->up.server->uuid;
1026 struct json *result;
1027
1028 result = json_string_create_nocopy(xasprintf(UUID_FMT,
1029 UUID_ARGS(uuid)));
1030 reply = jsonrpc_create_reply(result, request->id);
1031 } else if (!strcmp(request->method, "lock")) {
1032 reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_WAIT);
1033 } else if (!strcmp(request->method, "steal")) {
1034 reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_STEAL);
1035 } else if (!strcmp(request->method, "unlock")) {
1036 reply = ovsdb_jsonrpc_session_unlock(s, request);
1037 } else if (!strcmp(request->method, "set_db_change_aware")) {
1038 reply = ovsdb_jsonrpc_session_set_db_change_aware(s, request);
1039 } else if (!strcmp(request->method, "echo")) {
1040 reply = jsonrpc_create_reply(json_clone(request->params), request->id);
1041 } else {
1042 reply = jsonrpc_create_error(json_string_create("unknown method"),
1043 request->id);
1044 }
1045
1046 if (reply) {
1047 jsonrpc_msg_destroy(request);
1048 ovsdb_jsonrpc_session_send(s, reply);
1049 }
1050 }
1051
1052 static void
1053 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
1054 {
1055 if (json_array(request->params)->n == 1) {
1056 struct ovsdb_jsonrpc_trigger *t;
1057 struct json *id;
1058
1059 id = request->params->array.elems[0];
1060 t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
1061 if (t) {
1062 ovsdb_jsonrpc_trigger_complete(t);
1063 }
1064 }
1065 }
1066
1067 static void
1068 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
1069 struct jsonrpc_msg *request)
1070 {
1071 if (!strcmp(request->method, "cancel")) {
1072 execute_cancel(s, request);
1073 }
1074 jsonrpc_msg_destroy(request);
1075 }
1076
1077 static void
1078 ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *s,
1079 struct jsonrpc_msg *msg)
1080 {
1081 ovsdb_jsonrpc_monitor_flush_all(s);
1082 jsonrpc_session_send(s->js, msg);
1083 }
1084 \f
1085 /* JSON-RPC database server triggers.
1086 *
1087 * (Every transaction is treated as a trigger even if it doesn't actually have
1088 * any "wait" operations.) */
1089
1090 struct ovsdb_jsonrpc_trigger {
1091 struct ovsdb_trigger trigger;
1092 struct hmap_node hmap_node; /* In session's "triggers" hmap. */
1093 struct json *id;
1094 };
1095
1096 static void
1097 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
1098 struct jsonrpc_msg *request)
1099 {
1100 /* Check for duplicate ID. */
1101 size_t hash = json_hash(request->id, 0);
1102 struct ovsdb_jsonrpc_trigger *t
1103 = ovsdb_jsonrpc_trigger_find(s, request->id, hash);
1104 if (t) {
1105 ovsdb_jsonrpc_session_send(
1106 s, syntax_error_reply(request, "duplicate request ID"));
1107 jsonrpc_msg_destroy(request);
1108 return;
1109 }
1110
1111 /* Insert into trigger table. */
1112 t = xmalloc(sizeof *t);
1113 bool disconnect_all = ovsdb_trigger_init(
1114 &s->up, db, &t->trigger, request, time_msec(), s->read_only,
1115 s->remote->role, jsonrpc_session_get_id(s->js));
1116 t->id = json_clone(request->id);
1117 hmap_insert(&s->triggers, &t->hmap_node, hash);
1118
1119 /* Complete early if possible. */
1120 if (ovsdb_trigger_is_complete(&t->trigger)) {
1121 ovsdb_jsonrpc_trigger_complete(t);
1122 }
1123
1124 if (disconnect_all) {
1125 /* The message below is currently the only reason to disconnect all
1126 * clients. */
1127 ovsdb_jsonrpc_server_reconnect(s->remote->server, false,
1128 xasprintf("committed %s database "
1129 "schema conversion",
1130 db->name));
1131 }
1132 }
1133
1134 static struct ovsdb_jsonrpc_trigger *
1135 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
1136 const struct json *id, size_t hash)
1137 {
1138 struct ovsdb_jsonrpc_trigger *t;
1139
1140 HMAP_FOR_EACH_WITH_HASH (t, hmap_node, hash, &s->triggers) {
1141 if (json_equal(t->id, id)) {
1142 return t;
1143 }
1144 }
1145
1146 return NULL;
1147 }
1148
1149 static void
1150 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
1151 {
1152 struct ovsdb_jsonrpc_session *s;
1153
1154 s = CONTAINER_OF(t->trigger.session, struct ovsdb_jsonrpc_session, up);
1155
1156 if (jsonrpc_session_is_connected(s->js)) {
1157 bool complete = ovsdb_trigger_is_complete(&t->trigger);
1158 if (s->db_change_aware && !complete) {
1159 ovsdb_trigger_cancel(&t->trigger, "closing JSON-RPC session");
1160 complete = true;
1161 }
1162 if (complete) {
1163 struct jsonrpc_msg *reply = ovsdb_trigger_steal_reply(&t->trigger);
1164 ovsdb_jsonrpc_session_send(s, reply);
1165 }
1166 }
1167
1168 json_destroy(t->id);
1169 ovsdb_trigger_destroy(&t->trigger);
1170 hmap_remove(&s->triggers, &t->hmap_node);
1171 free(t);
1172 }
1173
1174 static void
1175 ovsdb_jsonrpc_trigger_remove__(struct ovsdb_jsonrpc_session *s,
1176 struct ovsdb *db)
1177 {
1178 struct ovsdb_jsonrpc_trigger *t, *next;
1179 HMAP_FOR_EACH_SAFE (t, next, hmap_node, &s->triggers) {
1180 if (!db || t->trigger.db == db) {
1181 ovsdb_jsonrpc_trigger_complete(t);
1182 }
1183 }
1184 }
1185
/* Prepares session 's' for the removal of database 'db' from the database
 * server by completing every trigger in 's' that refers to 'db'.  'db' must
 * not be NULL. */
static void
ovsdb_jsonrpc_trigger_preremove_db(struct ovsdb_jsonrpc_session *s,
                                   struct ovsdb *db)
{
    /* The helper below treats NULL as "all triggers", which is not what a
     * caller of this function asks for. */
    ovs_assert(db);

    ovsdb_jsonrpc_trigger_remove__(s, db);
}
1195
/* Completes, and thereby removes, every trigger in 's' regardless of the
 * database it refers to. */
static void
ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
{
    struct ovsdb *any_db = NULL;    /* NULL selects triggers on all dbs. */

    ovsdb_jsonrpc_trigger_remove__(s, any_db);
}
1202
1203 static void
1204 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
1205 {
1206 struct ovsdb_jsonrpc_trigger *trigger, *next;
1207 LIST_FOR_EACH_SAFE (trigger, next, trigger.node, &s->up.completions) {
1208 ovsdb_jsonrpc_trigger_complete(trigger);
1209 }
1210 }
1211 \f
1212 /* Jsonrpc front end monitor. */
1213 struct ovsdb_jsonrpc_monitor {
1214 struct hmap_node node; /* In ovsdb_jsonrpc_session's "monitors". */
1215 struct ovsdb_jsonrpc_session *session;
1216 struct ovsdb *db;
1217 struct json *monitor_id;
1218 struct ovsdb_monitor *dbmon;
1219 uint64_t unflushed; /* The first transaction that has not been
1220 flushed to the jsonrpc remote client. */
1221 enum ovsdb_monitor_version version;
1222 struct ovsdb_monitor_session_condition *condition;/* Session's condition */
1223 };
1224
1225 static struct ovsdb_jsonrpc_monitor *
1226 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
1227 const struct json *monitor_id)
1228 {
1229 struct ovsdb_jsonrpc_monitor *m;
1230
1231 HMAP_FOR_EACH_WITH_HASH (m, node, json_hash(monitor_id, 0), &s->monitors) {
1232 if (json_equal(m->monitor_id, monitor_id)) {
1233 return m;
1234 }
1235 }
1236
1237 return NULL;
1238 }
1239
1240 static bool
1241 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
1242 {
1243 const struct json *json;
1244
1245 json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
1246 return json ? json_boolean(json) : default_value;
1247 }
1248
1249 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
1250 ovsdb_jsonrpc_parse_monitor_request(
1251 struct ovsdb_monitor *dbmon,
1252 const struct ovsdb_table *table,
1253 struct ovsdb_monitor_session_condition *cond,
1254 const struct json *monitor_request)
1255 {
1256 const struct ovsdb_table_schema *ts = table->schema;
1257 enum ovsdb_monitor_selection select;
1258 const struct json *columns, *select_json, *where = NULL;
1259 struct ovsdb_parser parser;
1260 struct ovsdb_error *error;
1261
1262 ovsdb_parser_init(&parser, monitor_request, "table %s", ts->name);
1263 if (cond) {
1264 where = ovsdb_parser_member(&parser, "where", OP_ARRAY | OP_OPTIONAL);
1265 }
1266 columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
1267
1268 select_json = ovsdb_parser_member(&parser, "select",
1269 OP_OBJECT | OP_OPTIONAL);
1270
1271 error = ovsdb_parser_finish(&parser);
1272 if (error) {
1273 return error;
1274 }
1275
1276 if (select_json) {
1277 select = 0;
1278 ovsdb_parser_init(&parser, select_json, "table %s select", ts->name);
1279 if (parse_bool(&parser, "initial", true)) {
1280 select |= OJMS_INITIAL;
1281 }
1282 if (parse_bool(&parser, "insert", true)) {
1283 select |= OJMS_INSERT;
1284 }
1285 if (parse_bool(&parser, "delete", true)) {
1286 select |= OJMS_DELETE;
1287 }
1288 if (parse_bool(&parser, "modify", true)) {
1289 select |= OJMS_MODIFY;
1290 }
1291 error = ovsdb_parser_finish(&parser);
1292 if (error) {
1293 return error;
1294 }
1295 } else {
1296 select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
1297 }
1298
1299 ovsdb_monitor_table_add_select(dbmon, table, select);
1300 if (columns) {
1301 size_t i;
1302
1303 if (columns->type != JSON_ARRAY) {
1304 return ovsdb_syntax_error(columns, NULL,
1305 "array of column names expected");
1306 }
1307
1308 for (i = 0; i < columns->array.n; i++) {
1309 const struct ovsdb_column *column;
1310 const char *s;
1311
1312 if (columns->array.elems[i]->type != JSON_STRING) {
1313 return ovsdb_syntax_error(columns, NULL,
1314 "array of column names expected");
1315 }
1316
1317 s = columns->array.elems[i]->string;
1318 column = shash_find_data(&table->schema->columns, s);
1319 if (!column) {
1320 return ovsdb_syntax_error(columns, NULL, "%s is not a valid "
1321 "column name", s);
1322 }
1323 if (ovsdb_monitor_add_column(dbmon, table, column,
1324 select, true)) {
1325 return ovsdb_syntax_error(columns, NULL, "column %s "
1326 "mentioned more than once",
1327 column->name);
1328 }
1329 }
1330 } else {
1331 struct shash_node *node;
1332
1333 SHASH_FOR_EACH (node, &ts->columns) {
1334 const struct ovsdb_column *column = node->data;
1335 if (column->index != OVSDB_COL_UUID) {
1336 if (ovsdb_monitor_add_column(dbmon, table, column,
1337 select, true)) {
1338 return ovsdb_syntax_error(columns, NULL, "column %s "
1339 "mentioned more than once",
1340 column->name);
1341 }
1342 }
1343 }
1344 }
1345 if (cond) {
1346 error = ovsdb_monitor_table_condition_create(cond, table, where);
1347 if (error) {
1348 return error;
1349 }
1350 }
1351
1352 return NULL;
1353 }
1354
1355 static struct jsonrpc_msg *
1356 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
1357 struct json *params,
1358 enum ovsdb_monitor_version version,
1359 const struct json *request_id)
1360 {
1361 struct ovsdb_jsonrpc_monitor *m = NULL;
1362 struct ovsdb_monitor *dbmon = NULL;
1363 struct json *monitor_id, *monitor_requests;
1364 struct ovsdb_error *error = NULL;
1365 struct shash_node *node;
1366 struct json *json;
1367
1368 if (json_array(params)->n != 3) {
1369 error = ovsdb_syntax_error(params, NULL, "invalid parameters");
1370 goto error;
1371 }
1372 monitor_id = params->array.elems[1];
1373 monitor_requests = params->array.elems[2];
1374 if (monitor_requests->type != JSON_OBJECT) {
1375 error = ovsdb_syntax_error(monitor_requests, NULL,
1376 "monitor-requests must be object");
1377 goto error;
1378 }
1379
1380 if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
1381 error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
1382 goto error;
1383 }
1384
1385 m = xzalloc(sizeof *m);
1386 m->session = s;
1387 m->db = db;
1388 m->dbmon = ovsdb_monitor_create(db, m);
1389 if (version == OVSDB_MONITOR_V2) {
1390 m->condition = ovsdb_monitor_session_condition_create();
1391 }
1392 m->unflushed = 0;
1393 m->version = version;
1394 hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
1395 m->monitor_id = json_clone(monitor_id);
1396
1397 SHASH_FOR_EACH (node, json_object(monitor_requests)) {
1398 const struct ovsdb_table *table;
1399 const struct json *mr_value;
1400 size_t i;
1401
1402 table = ovsdb_get_table(m->db, node->name);
1403 if (!table) {
1404 error = ovsdb_syntax_error(NULL, NULL,
1405 "no table named %s", node->name);
1406 goto error;
1407 }
1408
1409 ovsdb_monitor_add_table(m->dbmon, table);
1410
1411 /* Parse columns. */
1412 mr_value = node->data;
1413 if (mr_value->type == JSON_ARRAY) {
1414 const struct json_array *array = &mr_value->array;
1415
1416 for (i = 0; i < array->n; i++) {
1417 error = ovsdb_jsonrpc_parse_monitor_request(m->dbmon,
1418 table,
1419 m->condition,
1420 array->elems[i]);
1421 if (error) {
1422 goto error;
1423 }
1424 }
1425 } else {
1426 error = ovsdb_jsonrpc_parse_monitor_request(m->dbmon,
1427 table,
1428 m->condition,
1429 mr_value);
1430 if (error) {
1431 goto error;
1432 }
1433 }
1434 }
1435
1436 dbmon = ovsdb_monitor_add(m->dbmon);
1437 if (dbmon != m->dbmon) {
1438 /* Found an exisiting dbmon, reuse the current one. */
1439 ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->unflushed);
1440 ovsdb_monitor_add_jsonrpc_monitor(dbmon, m);
1441 m->dbmon = dbmon;
1442 }
1443
1444 /* Only now we can bind session's condition to ovsdb_monitor */
1445 if (m->condition) {
1446 ovsdb_monitor_condition_bind(m->dbmon, m->condition);
1447 }
1448
1449 ovsdb_monitor_get_initial(m->dbmon);
1450 json = ovsdb_jsonrpc_monitor_compose_update(m, true);
1451 json = json ? json : json_object_create();
1452 return jsonrpc_create_reply(json, request_id);
1453
1454 error:
1455 if (m) {
1456 ovsdb_jsonrpc_monitor_destroy(m, false);
1457 }
1458
1459 return jsonrpc_create_error(ovsdb_error_to_json_free(error), request_id);
1460 }
1461
1462 static struct ovsdb_error *
1463 ovsdb_jsonrpc_parse_monitor_cond_change_request(
1464 struct ovsdb_jsonrpc_monitor *m,
1465 const struct ovsdb_table *table,
1466 const struct json *cond_change_req)
1467 {
1468 const struct ovsdb_table_schema *ts = table->schema;
1469 const struct json *condition, *columns;
1470 struct ovsdb_parser parser;
1471 struct ovsdb_error *error;
1472
1473 ovsdb_parser_init(&parser, cond_change_req, "table %s", ts->name);
1474 columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
1475 condition = ovsdb_parser_member(&parser, "where", OP_ARRAY | OP_OPTIONAL);
1476
1477 error = ovsdb_parser_finish(&parser);
1478 if (error) {
1479 return error;
1480 }
1481
1482 if (columns) {
1483 error = ovsdb_syntax_error(cond_change_req, NULL, "changing columns "
1484 "is unsupported");
1485 return error;
1486 }
1487 error = ovsdb_monitor_table_condition_update(m->dbmon, m->condition, table,
1488 condition);
1489
1490 return error;
1491 }
1492
1493 static struct jsonrpc_msg *
1494 ovsdb_jsonrpc_monitor_cond_change(struct ovsdb_jsonrpc_session *s,
1495 struct json *params,
1496 const struct json *request_id)
1497 {
1498 struct ovsdb_error *error;
1499 struct ovsdb_jsonrpc_monitor *m;
1500 struct json *monitor_cond_change_reqs;
1501 struct shash_node *node;
1502
1503 if (json_array(params)->n != 3) {
1504 error = ovsdb_syntax_error(params, NULL, "invalid parameters");
1505 goto error;
1506 }
1507
1508 m = ovsdb_jsonrpc_monitor_find(s, params->array.elems[0]);
1509 if (!m) {
1510 error = ovsdb_syntax_error(params->array.elems[0], NULL,
1511 "unknown monitor session");
1512 goto error;
1513 }
1514
1515 const struct json *new_monitor_id = params->array.elems[1];
1516 bool changing_id = !json_equal(m->monitor_id, new_monitor_id);
1517 if (changing_id && ovsdb_jsonrpc_monitor_find(s, new_monitor_id)) {
1518 error = ovsdb_syntax_error(new_monitor_id, NULL,
1519 "duplicate monitor ID");
1520 goto error;
1521 }
1522
1523 monitor_cond_change_reqs = params->array.elems[2];
1524 if (monitor_cond_change_reqs->type != JSON_OBJECT) {
1525 error =
1526 ovsdb_syntax_error(NULL, NULL,
1527 "monitor-cond-change-requests must be object");
1528 goto error;
1529 }
1530
1531 SHASH_FOR_EACH (node, json_object(monitor_cond_change_reqs)) {
1532 const struct ovsdb_table *table;
1533 const struct json *mr_value;
1534 size_t i;
1535
1536 table = ovsdb_get_table(m->db, node->name);
1537 if (!table) {
1538 error = ovsdb_syntax_error(NULL, NULL,
1539 "no table named %s", node->name);
1540 goto error;
1541 }
1542 if (!ovsdb_monitor_table_exists(m->dbmon, table)) {
1543 error = ovsdb_syntax_error(NULL, NULL,
1544 "no table named %s in monitor session",
1545 node->name);
1546 goto error;
1547 }
1548
1549 mr_value = node->data;
1550 if (mr_value->type == JSON_ARRAY) {
1551 const struct json_array *array = &mr_value->array;
1552
1553 for (i = 0; i < array->n; i++) {
1554 error = ovsdb_jsonrpc_parse_monitor_cond_change_request(
1555 m, table, array->elems[i]);
1556 if (error) {
1557 goto error;
1558 }
1559 }
1560 } else {
1561 error = ovsdb_syntax_error(
1562 NULL, NULL,
1563 "table %s no monitor-cond-change JSON array",
1564 node->name);
1565 goto error;
1566 }
1567 }
1568
1569 if (changing_id) {
1570 hmap_remove(&s->monitors, &m->node);
1571 json_destroy(m->monitor_id);
1572 m->monitor_id = json_clone(new_monitor_id);
1573 hmap_insert(&s->monitors, &m->node, json_hash(m->monitor_id, 0));
1574 }
1575
1576 /* Send the new update, if any, represents the difference from the old
1577 * condition and the new one. */
1578 struct json *update_json;
1579
1580 update_json = ovsdb_monitor_get_update(m->dbmon, false, true,
1581 m->condition, m->version, &m->unflushed);
1582 if (update_json) {
1583 struct jsonrpc_msg *msg;
1584 struct json *p;
1585
1586 p = json_array_create_2(json_clone(m->monitor_id), update_json);
1587 msg = ovsdb_jsonrpc_create_notify(m, p);
1588 jsonrpc_session_send(s->js, msg);
1589 }
1590
1591 return jsonrpc_create_reply(json_object_create(), request_id);
1592
1593 error:
1594 return jsonrpc_create_error(ovsdb_error_to_json_free(error), request_id);
1595 }
1596
1597 static struct jsonrpc_msg *
1598 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
1599 struct json_array *params,
1600 const struct json *request_id)
1601 {
1602 if (params->n != 1) {
1603 return jsonrpc_create_error(json_string_create("invalid parameters"),
1604 request_id);
1605 } else {
1606 struct ovsdb_jsonrpc_monitor *m;
1607
1608 m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
1609 if (!m) {
1610 return jsonrpc_create_error(json_string_create("unknown monitor"),
1611 request_id);
1612 } else {
1613 ovsdb_jsonrpc_monitor_destroy(m, false);
1614 return jsonrpc_create_reply(json_object_create(), request_id);
1615 }
1616 }
1617 }
1618
1619 /* Database 'db' is about to be removed from the database server. To prepare,
1620 * this function removes all references from monitors in 's' to 'db'. */
1621 static void
1622 ovsdb_jsonrpc_monitor_preremove_db(struct ovsdb_jsonrpc_session *s,
1623 struct ovsdb *db)
1624 {
1625 ovs_assert(db);
1626
1627 struct ovsdb_jsonrpc_monitor *m, *next;
1628 HMAP_FOR_EACH_SAFE (m, next, node, &s->monitors) {
1629 if (m->db == db) {
1630 ovsdb_jsonrpc_monitor_destroy(m, true);
1631 }
1632 }
1633 }
1634
1635 /* Cancels all monitors in 's'. */
1636 static void
1637 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
1638 {
1639 struct ovsdb_jsonrpc_monitor *m, *next;
1640
1641 HMAP_FOR_EACH_SAFE (m, next, node, &s->monitors) {
1642 ovsdb_jsonrpc_monitor_destroy(m, false);
1643 }
1644 }
1645
1646 static struct json *
1647 ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m,
1648 bool initial)
1649 {
1650
1651 if (!ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
1652 return NULL;
1653 }
1654
1655 return ovsdb_monitor_get_update(m->dbmon, initial, false,
1656 m->condition, m->version, &m->unflushed);
1657 }
1658
1659 static bool
1660 ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *s)
1661 {
1662 struct ovsdb_jsonrpc_monitor *m;
1663
1664 HMAP_FOR_EACH (m, node, &s->monitors) {
1665 if (ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
1666 return true;
1667 }
1668 }
1669
1670 return false;
1671 }
1672
1673 void
1674 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_jsonrpc_monitor *m,
1675 bool notify_cancellation)
1676 {
1677 if (notify_cancellation) {
1678 struct ovsdb_jsonrpc_session *s = m->session;
1679 if (jsonrpc_session_is_connected(s->js) && s->db_change_aware) {
1680 struct jsonrpc_msg *notify = jsonrpc_create_notify(
1681 "monitor_canceled",
1682 json_array_create_1(json_clone(m->monitor_id)));
1683 ovsdb_jsonrpc_session_send(s, notify);
1684 }
1685 }
1686
1687 json_destroy(m->monitor_id);
1688 hmap_remove(&m->session->monitors, &m->node);
1689 ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->unflushed);
1690 ovsdb_monitor_session_condition_destroy(m->condition);
1691 free(m);
1692 }
1693
1694 static struct jsonrpc_msg *
1695 ovsdb_jsonrpc_create_notify(const struct ovsdb_jsonrpc_monitor *m,
1696 struct json *params)
1697 {
1698 const char *method;
1699
1700 switch(m->version) {
1701 case OVSDB_MONITOR_V1:
1702 method = "update";
1703 break;
1704 case OVSDB_MONITOR_V2:
1705 method = "update2";
1706 break;
1707 case OVSDB_MONITOR_VERSION_MAX:
1708 default:
1709 OVS_NOT_REACHED();
1710 }
1711
1712 return jsonrpc_create_notify(method, params);
1713 }
1714
1715 const struct uuid *
1716 ovsdb_jsonrpc_server_get_uuid(const struct ovsdb_jsonrpc_server *s)
1717 {
1718 return &s->up.uuid;
1719 }
1720
1721 static void
1722 ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *s)
1723 {
1724 struct ovsdb_jsonrpc_monitor *m;
1725
1726 HMAP_FOR_EACH (m, node, &s->monitors) {
1727 struct json *json;
1728
1729 json = ovsdb_jsonrpc_monitor_compose_update(m, false);
1730 if (json) {
1731 struct jsonrpc_msg *msg;
1732 struct json *params;
1733
1734 params = json_array_create_2(json_clone(m->monitor_id), json);
1735 msg = ovsdb_jsonrpc_create_notify(m, params);
1736 jsonrpc_session_send(s->js, msg);
1737 }
1738 }
1739 }
1740
1741 void
1742 ovsdb_jsonrpc_disable_monitor_cond(void)
1743 {
1744 /* Once disabled, it is not possible to re-enable it. */
1745 monitor_cond_enable__ = false;
1746 }