]> git.proxmox.com Git - mirror_ovs.git/blob - ovsdb/jsonrpc-server.c
Windows: Extend support for binaries which allow detach
[mirror_ovs.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <errno.h>
21
22 #include "bitmap.h"
23 #include "column.h"
24 #include "openvswitch/dynamic-string.h"
25 #include "monitor.h"
26 #include "openvswitch/json.h"
27 #include "jsonrpc.h"
28 #include "ovsdb-error.h"
29 #include "ovsdb-parser.h"
30 #include "ovsdb.h"
31 #include "condition.h"
32 #include "poll-loop.h"
33 #include "reconnect.h"
34 #include "row.h"
35 #include "server.h"
36 #include "simap.h"
37 #include "stream.h"
38 #include "table.h"
39 #include "timeval.h"
40 #include "transaction.h"
41 #include "trigger.h"
42 #include "util.h"
43 #include "openvswitch/vlog.h"
44
/* Declares the vlog module under which this file's VLOG_*() calls log. */
VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server);

/* Forward declarations; both structures are defined later in this file. */
struct ovsdb_jsonrpc_remote;
struct ovsdb_jsonrpc_session;

/* Set false to defeature monitor_cond, causing jsonrpc to respond to
 * monitor_cond method with an error. */
static bool monitor_cond_enable__ = true;

/* Message rate-limiting.  Shared by the VLOG_*_RL() calls in this file. */
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
56
/* Sessions.
 *
 * The "_all" variants operate on every session that belongs to the given
 * remote. */
static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
    struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *, bool);
static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
static void ovsdb_jsonrpc_session_get_memory_usage_all(
    const struct ovsdb_jsonrpc_remote *, struct simap *usage);
static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *);
static void ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *);
static void ovsdb_jsonrpc_session_set_all_options(
    struct ovsdb_jsonrpc_remote *, const struct ovsdb_jsonrpc_options *);
static bool ovsdb_jsonrpc_active_session_get_status(
    const struct ovsdb_jsonrpc_remote *,
    struct ovsdb_jsonrpc_remote_status *);
static void ovsdb_jsonrpc_session_get_status(
    const struct ovsdb_jsonrpc_session *,
    struct ovsdb_jsonrpc_remote_status *);
static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *);
static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *);
static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *,
                                       struct jsonrpc_msg *);
78
/* Triggers.
 *
 * Each "transact" request is tracked as a trigger, keyed by its request
 * ID within the owning session. */
static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
                                         struct ovsdb *,
                                         struct json *id, struct json *params);
static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
    struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
static void ovsdb_jsonrpc_trigger_complete_done(
    struct ovsdb_jsonrpc_session *);
89
/* Monitors.
 *
 * The functions that return a "struct jsonrpc_msg *" produce the reply to
 * the request identified by 'request_id'. */
static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_create(
    struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params,
    enum ovsdb_monitor_version, const struct json *request_id);
static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cond_change(
    struct ovsdb_jsonrpc_session *s,
    struct json *params,
    const struct json *request_id);
static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
    struct ovsdb_jsonrpc_session *,
    struct json_array *params,
    const struct json *request_id);
static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
static void ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *);
static bool ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *);
static struct json *ovsdb_jsonrpc_monitor_compose_update(
    struct ovsdb_jsonrpc_monitor *monitor, bool initial);
static struct jsonrpc_msg * ovsdb_jsonrpc_create_notify(
    const struct ovsdb_jsonrpc_monitor *m,
    struct json *params);
110
111 \f
112 /* JSON-RPC database server. */
113
struct ovsdb_jsonrpc_server {
    struct ovsdb_server up;     /* Generic OVSDB server state. */
    unsigned int n_sessions;    /* Number of currently open sessions, summed
                                   over all remotes. */
    bool read_only;             /* This server does not accept any
                                   transactions that can modify the
                                   database. */
    struct shash remotes;       /* Contains "struct ovsdb_jsonrpc_remote *"s. */
};
121
/* A configured remote.  This is either a passive stream listener plus a list
 * of the currently connected sessions, or a list of exactly one active
 * session. */
struct ovsdb_jsonrpc_remote {
    struct ovsdb_jsonrpc_server *server; /* Server that owns this remote. */
    struct pstream *listener;   /* Listener, if passive. */
    struct ovs_list sessions;   /* List of "struct ovsdb_jsonrpc_session"s. */
    uint8_t dscp;               /* DSCP value the remote was opened with; a
                                   remote whose configured value changes is
                                   deleted and re-added. */
};
131
/* Remote management helpers, defined further down in this file. */
static struct ovsdb_jsonrpc_remote *ovsdb_jsonrpc_server_add_remote(
    struct ovsdb_jsonrpc_server *, const char *name,
    const struct ovsdb_jsonrpc_options *options
);
static void ovsdb_jsonrpc_server_del_remote(struct shash_node *);
137
138 /* Creates and returns a new server to provide JSON-RPC access to an OVSDB.
139 *
140 * The caller must call ovsdb_jsonrpc_server_add_db() for each database to
141 * which 'server' should provide access. */
142 struct ovsdb_jsonrpc_server *
143 ovsdb_jsonrpc_server_create(bool read_only)
144 {
145 struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
146 ovsdb_server_init(&server->up);
147 shash_init(&server->remotes);
148 server->read_only = read_only;
149 return server;
150 }
151
152 /* Adds 'db' to the set of databases served out by 'svr'. Returns true if
153 * successful, false if 'db''s name is the same as some database already in
154 * 'server'. */
155 bool
156 ovsdb_jsonrpc_server_add_db(struct ovsdb_jsonrpc_server *svr, struct ovsdb *db)
157 {
158 /* The OVSDB protocol doesn't have a way to notify a client that a
159 * database has been added. If some client tried to use the database
160 * that we're adding and failed, then forcing it to reconnect seems like
161 * a reasonable way to make it try again.
162 *
163 * If this is too big of a hammer in practice, we could be more selective,
164 * e.g. disconnect only connections that actually tried to use a database
165 * with 'db''s name. */
166 ovsdb_jsonrpc_server_reconnect(svr, svr->read_only);
167
168 return ovsdb_server_add_db(&svr->up, db);
169 }
170
171 /* Removes 'db' from the set of databases served out by 'svr'. Returns
172 * true if successful, false if there is no database associated with 'db'. */
173 bool
174 ovsdb_jsonrpc_server_remove_db(struct ovsdb_jsonrpc_server *svr,
175 struct ovsdb *db)
176 {
177 /* There might be pointers to 'db' from 'svr', such as monitors or
178 * outstanding transactions. Disconnect all JSON-RPC connections to avoid
179 * accesses to freed memory.
180 *
181 * If this is too big of a hammer in practice, we could be more selective,
182 * e.g. disconnect only connections that actually reference 'db'. */
183 ovsdb_jsonrpc_server_reconnect(svr, svr->read_only);
184
185 return ovsdb_server_remove_db(&svr->up, db);
186 }
187
188 void
189 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr)
190 {
191 struct shash_node *node, *next;
192
193 SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
194 ovsdb_jsonrpc_server_del_remote(node);
195 }
196 shash_destroy(&svr->remotes);
197 ovsdb_server_destroy(&svr->up);
198 free(svr);
199 }
200
201 struct ovsdb_jsonrpc_options *
202 ovsdb_jsonrpc_default_options(const char *target)
203 {
204 struct ovsdb_jsonrpc_options *options = xzalloc(sizeof *options);
205 options->max_backoff = RECONNECT_DEFAULT_MAX_BACKOFF;
206 options->probe_interval = (stream_or_pstream_needs_probes(target)
207 ? RECONNECT_DEFAULT_PROBE_INTERVAL
208 : 0);
209 return options;
210 }
211
212 /* Sets 'svr''s current set of remotes to the names in 'new_remotes', with
213 * options in the struct ovsdb_jsonrpc_options supplied as the data values.
214 *
215 * A remote is an active or passive stream connection method, e.g. "pssl:" or
216 * "tcp:1.2.3.4". */
217 void
218 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
219 const struct shash *new_remotes)
220 {
221 struct shash_node *node, *next;
222
223 SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
224 struct ovsdb_jsonrpc_remote *remote = node->data;
225 struct ovsdb_jsonrpc_options *options
226 = shash_find_data(new_remotes, node->name);
227
228 if (!options) {
229 VLOG_INFO("%s: remote deconfigured", node->name);
230 ovsdb_jsonrpc_server_del_remote(node);
231 } else if (options->dscp != remote->dscp) {
232 ovsdb_jsonrpc_server_del_remote(node);
233 }
234 }
235 SHASH_FOR_EACH (node, new_remotes) {
236 const struct ovsdb_jsonrpc_options *options = node->data;
237 struct ovsdb_jsonrpc_remote *remote;
238
239 remote = shash_find_data(&svr->remotes, node->name);
240 if (!remote) {
241 remote = ovsdb_jsonrpc_server_add_remote(svr, node->name, options);
242 if (!remote) {
243 continue;
244 }
245 }
246
247 ovsdb_jsonrpc_session_set_all_options(remote, options);
248 }
249 }
250
251 static struct ovsdb_jsonrpc_remote *
252 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
253 const char *name,
254 const struct ovsdb_jsonrpc_options *options)
255 {
256 struct ovsdb_jsonrpc_remote *remote;
257 struct pstream *listener;
258 int error;
259
260 error = jsonrpc_pstream_open(name, &listener, options->dscp);
261 if (error && error != EAFNOSUPPORT) {
262 VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, ovs_strerror(error));
263 return NULL;
264 }
265
266 remote = xmalloc(sizeof *remote);
267 remote->server = svr;
268 remote->listener = listener;
269 ovs_list_init(&remote->sessions);
270 remote->dscp = options->dscp;
271 shash_add(&svr->remotes, name, remote);
272
273 if (!listener) {
274 ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name, true),
275 svr->read_only);
276 }
277 return remote;
278 }
279
280 static void
281 ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
282 {
283 struct ovsdb_jsonrpc_remote *remote = node->data;
284
285 ovsdb_jsonrpc_session_close_all(remote);
286 pstream_close(remote->listener);
287 shash_delete(&remote->server->remotes, node);
288 free(remote);
289 }
290
291 /* Stores status information for the remote named 'target', which should have
292 * been configured on 'svr' with a call to ovsdb_jsonrpc_server_set_remotes(),
293 * into '*status'. On success returns true, on failure (if 'svr' doesn't have
294 * a remote named 'target' or if that remote is an outbound remote that has no
295 * active connections) returns false. On failure, 'status' will be zeroed.
296 */
297 bool
298 ovsdb_jsonrpc_server_get_remote_status(
299 const struct ovsdb_jsonrpc_server *svr, const char *target,
300 struct ovsdb_jsonrpc_remote_status *status)
301 {
302 const struct ovsdb_jsonrpc_remote *remote;
303
304 memset(status, 0, sizeof *status);
305
306 remote = shash_find_data(&svr->remotes, target);
307
308 if (!remote) {
309 return false;
310 }
311
312 if (remote->listener) {
313 status->bound_port = pstream_get_bound_port(remote->listener);
314 status->is_connected = !ovs_list_is_empty(&remote->sessions);
315 status->n_connections = ovs_list_size(&remote->sessions);
316 return true;
317 }
318
319 return ovsdb_jsonrpc_active_session_get_status(remote, status);
320 }
321
322 void
323 ovsdb_jsonrpc_server_free_remote_status(
324 struct ovsdb_jsonrpc_remote_status *status)
325 {
326 free(status->locks_held);
327 free(status->locks_waiting);
328 free(status->locks_lost);
329 }
330
331 /* Forces all of the JSON-RPC sessions managed by 'svr' to disconnect and
332 * reconnect. */
333 void
334 ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *svr, bool read_only)
335 {
336 struct shash_node *node;
337
338 svr->read_only = read_only;
339 SHASH_FOR_EACH (node, &svr->remotes) {
340 struct ovsdb_jsonrpc_remote *remote = node->data;
341
342 ovsdb_jsonrpc_session_reconnect_all(remote);
343 }
344 }
345
346 bool
347 ovsdb_jsonrpc_server_is_read_only(struct ovsdb_jsonrpc_server *svr)
348 {
349 return svr->read_only;
350 }
351
352 void
353 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
354 {
355 struct shash_node *node;
356
357 SHASH_FOR_EACH (node, &svr->remotes) {
358 struct ovsdb_jsonrpc_remote *remote = node->data;
359
360 if (remote->listener) {
361 struct stream *stream;
362 int error;
363
364 error = pstream_accept(remote->listener, &stream);
365 if (!error) {
366 struct jsonrpc_session *js;
367 js = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
368 remote->dscp);
369 ovsdb_jsonrpc_session_create(remote, js, svr->read_only);
370 } else if (error != EAGAIN) {
371 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
372 pstream_get_name(remote->listener),
373 ovs_strerror(error));
374 }
375 }
376
377 ovsdb_jsonrpc_session_run_all(remote);
378 }
379 }
380
381 void
382 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
383 {
384 struct shash_node *node;
385
386 SHASH_FOR_EACH (node, &svr->remotes) {
387 struct ovsdb_jsonrpc_remote *remote = node->data;
388
389 if (remote->listener) {
390 pstream_wait(remote->listener);
391 }
392
393 ovsdb_jsonrpc_session_wait_all(remote);
394 }
395 }
396
397 /* Adds some memory usage statistics for 'svr' into 'usage', for use with
398 * memory_report(). */
399 void
400 ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server *svr,
401 struct simap *usage)
402 {
403 struct shash_node *node;
404
405 simap_increase(usage, "sessions", svr->n_sessions);
406 SHASH_FOR_EACH (node, &svr->remotes) {
407 struct ovsdb_jsonrpc_remote *remote = node->data;
408
409 ovsdb_jsonrpc_session_get_memory_usage_all(remote, usage);
410 }
411 }
412 \f
413 /* JSON-RPC database server session. */
414
struct ovsdb_jsonrpc_session {
    struct ovs_list node;       /* Element in remote's sessions list. */
    struct ovsdb_session up;    /* Generic OVSDB session state, including the
                                   session's lock waiters. */
    struct ovsdb_jsonrpc_remote *remote; /* Remote this session belongs to. */

    /* Triggers. */
    struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */

    /* Monitors. */
    struct hmap monitors;       /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */

    /* Network connectivity. */
    struct jsonrpc_session *js; /* JSON-RPC session. */
    unsigned int js_seqno;      /* Last jsonrpc_session_get_seqno() value. */

    /* Read only. */
    bool read_only;             /* When true, the session is not allowed to
                                   modify the database. */
};
434
/* Per-session helpers, defined below. */
static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
static void ovsdb_jsonrpc_session_get_memory_usage(
    const struct ovsdb_jsonrpc_session *, struct simap *usage);
static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
                                              struct jsonrpc_msg *);
static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
                                             struct jsonrpc_msg *);
444
445 static struct ovsdb_jsonrpc_session *
446 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
447 struct jsonrpc_session *js, bool read_only)
448 {
449 struct ovsdb_jsonrpc_session *s;
450
451 s = xzalloc(sizeof *s);
452 ovsdb_session_init(&s->up, &remote->server->up);
453 s->remote = remote;
454 ovs_list_push_back(&remote->sessions, &s->node);
455 hmap_init(&s->triggers);
456 hmap_init(&s->monitors);
457 s->js = js;
458 s->js_seqno = jsonrpc_session_get_seqno(js);
459 s->read_only = read_only;
460
461 remote->server->n_sessions++;
462
463 return s;
464 }
465
466 static void
467 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
468 {
469 ovsdb_jsonrpc_monitor_remove_all(s);
470 ovsdb_jsonrpc_session_unlock_all(s);
471 ovsdb_jsonrpc_trigger_complete_all(s);
472
473 hmap_destroy(&s->monitors);
474 hmap_destroy(&s->triggers);
475
476 jsonrpc_session_close(s->js);
477 ovs_list_remove(&s->node);
478 s->remote->server->n_sessions--;
479 ovsdb_session_destroy(&s->up);
480 free(s);
481 }
482
483 static int
484 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
485 {
486 jsonrpc_session_run(s->js);
487 if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
488 s->js_seqno = jsonrpc_session_get_seqno(s->js);
489 ovsdb_jsonrpc_trigger_complete_all(s);
490 ovsdb_jsonrpc_monitor_remove_all(s);
491 ovsdb_jsonrpc_session_unlock_all(s);
492 }
493
494 ovsdb_jsonrpc_trigger_complete_done(s);
495
496 if (!jsonrpc_session_get_backlog(s->js)) {
497 struct jsonrpc_msg *msg;
498
499 ovsdb_jsonrpc_monitor_flush_all(s);
500
501 msg = jsonrpc_session_recv(s->js);
502 if (msg) {
503 if (msg->type == JSONRPC_REQUEST) {
504 ovsdb_jsonrpc_session_got_request(s, msg);
505 } else if (msg->type == JSONRPC_NOTIFY) {
506 ovsdb_jsonrpc_session_got_notify(s, msg);
507 } else {
508 VLOG_WARN("%s: received unexpected %s message",
509 jsonrpc_session_get_name(s->js),
510 jsonrpc_msg_type_to_string(msg->type));
511 jsonrpc_session_force_reconnect(s->js);
512 jsonrpc_msg_destroy(msg);
513 }
514 }
515 }
516 return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
517 }
518
519 static void
520 ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session,
521 const struct ovsdb_jsonrpc_options *options)
522 {
523 jsonrpc_session_set_max_backoff(session->js, options->max_backoff);
524 jsonrpc_session_set_probe_interval(session->js, options->probe_interval);
525 jsonrpc_session_set_dscp(session->js, options->dscp);
526 }
527
528 static void
529 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
530 {
531 struct ovsdb_jsonrpc_session *s, *next;
532
533 LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
534 int error = ovsdb_jsonrpc_session_run(s);
535 if (error) {
536 ovsdb_jsonrpc_session_close(s);
537 }
538 }
539 }
540
541 static void
542 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
543 {
544 jsonrpc_session_wait(s->js);
545 if (!jsonrpc_session_get_backlog(s->js)) {
546 if (ovsdb_jsonrpc_monitor_needs_flush(s)) {
547 poll_immediate_wake();
548 } else {
549 jsonrpc_session_recv_wait(s->js);
550 }
551 }
552 }
553
554 static void
555 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote)
556 {
557 struct ovsdb_jsonrpc_session *s;
558
559 LIST_FOR_EACH (s, node, &remote->sessions) {
560 ovsdb_jsonrpc_session_wait(s);
561 }
562 }
563
564 static void
565 ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session *s,
566 struct simap *usage)
567 {
568 simap_increase(usage, "triggers", hmap_count(&s->triggers));
569 simap_increase(usage, "backlog", jsonrpc_session_get_backlog(s->js));
570 }
571
572 static void
573 ovsdb_jsonrpc_session_get_memory_usage_all(
574 const struct ovsdb_jsonrpc_remote *remote,
575 struct simap *usage)
576 {
577 struct ovsdb_jsonrpc_session *s;
578
579 LIST_FOR_EACH (s, node, &remote->sessions) {
580 ovsdb_jsonrpc_session_get_memory_usage(s, usage);
581 }
582 }
583
584 static void
585 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
586 {
587 struct ovsdb_jsonrpc_session *s, *next;
588
589 LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
590 ovsdb_jsonrpc_session_close(s);
591 }
592 }
593
594 /* Forces all of the JSON-RPC sessions managed by 'remote' to disconnect and
595 * reconnect. */
596 static void
597 ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote)
598 {
599 struct ovsdb_jsonrpc_session *s, *next;
600
601 LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
602 jsonrpc_session_force_reconnect(s->js);
603 if (!jsonrpc_session_is_alive(s->js)) {
604 ovsdb_jsonrpc_session_close(s);
605 }
606 }
607 }
608
609 /* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
610 * 'options'.
611 *
612 * (The dscp value can't be changed directly; the caller must instead close and
613 * re-open the session.) */
614 static void
615 ovsdb_jsonrpc_session_set_all_options(
616 struct ovsdb_jsonrpc_remote *remote,
617 const struct ovsdb_jsonrpc_options *options)
618 {
619 struct ovsdb_jsonrpc_session *s;
620
621 LIST_FOR_EACH (s, node, &remote->sessions) {
622 ovsdb_jsonrpc_session_set_options(s, options);
623 }
624 }
625
626 /* Sets the 'status' of for the 'remote' with an outgoing connection. */
627 static bool
628 ovsdb_jsonrpc_active_session_get_status(
629 const struct ovsdb_jsonrpc_remote *remote,
630 struct ovsdb_jsonrpc_remote_status *status)
631 {
632 const struct ovs_list *sessions = &remote->sessions;
633 const struct ovsdb_jsonrpc_session *s;
634
635 if (ovs_list_is_empty(sessions)) {
636 return false;
637 }
638
639 ovs_assert(ovs_list_is_singleton(sessions));
640 s = CONTAINER_OF(ovs_list_front(sessions), struct ovsdb_jsonrpc_session, node);
641 ovsdb_jsonrpc_session_get_status(s, status);
642 status->n_connections = 1;
643
644 return true;
645 }
646
647 static void
648 ovsdb_jsonrpc_session_get_status(const struct ovsdb_jsonrpc_session *session,
649 struct ovsdb_jsonrpc_remote_status *status)
650 {
651 const struct ovsdb_jsonrpc_session *s = session;
652 const struct jsonrpc_session *js;
653 struct ovsdb_lock_waiter *waiter;
654 struct reconnect_stats rstats;
655 struct ds locks_held, locks_waiting, locks_lost;
656
657 js = s->js;
658
659 status->is_connected = jsonrpc_session_is_connected(js);
660 status->last_error = jsonrpc_session_get_status(js);
661
662 jsonrpc_session_get_reconnect_stats(js, &rstats);
663 status->state = rstats.state;
664 status->sec_since_connect = rstats.msec_since_connect == UINT_MAX
665 ? UINT_MAX : rstats.msec_since_connect / 1000;
666 status->sec_since_disconnect = rstats.msec_since_disconnect == UINT_MAX
667 ? UINT_MAX : rstats.msec_since_disconnect / 1000;
668
669 ds_init(&locks_held);
670 ds_init(&locks_waiting);
671 ds_init(&locks_lost);
672 HMAP_FOR_EACH (waiter, session_node, &s->up.waiters) {
673 struct ds *string;
674
675 string = (ovsdb_lock_waiter_is_owner(waiter) ? &locks_held
676 : waiter->mode == OVSDB_LOCK_WAIT ? &locks_waiting
677 : &locks_lost);
678 if (string->length) {
679 ds_put_char(string, ' ');
680 }
681 ds_put_cstr(string, waiter->lock_name);
682 }
683 status->locks_held = ds_steal_cstr(&locks_held);
684 status->locks_waiting = ds_steal_cstr(&locks_waiting);
685 status->locks_lost = ds_steal_cstr(&locks_lost);
686 }
687
688 /* Examines 'request' to determine the database to which it relates, and then
689 * searches 's' to find that database:
690 *
691 * - If successful, returns the database and sets '*replyp' to NULL.
692 *
693 * - If no such database exists, returns NULL and sets '*replyp' to an
694 * appropriate JSON-RPC error reply, owned by the caller. */
695 static struct ovsdb *
696 ovsdb_jsonrpc_lookup_db(const struct ovsdb_jsonrpc_session *s,
697 const struct jsonrpc_msg *request,
698 struct jsonrpc_msg **replyp)
699 {
700 struct json_array *params;
701 struct ovsdb_error *error;
702 const char *db_name;
703 struct ovsdb *db;
704
705 params = json_array(request->params);
706 if (!params->n || params->elems[0]->type != JSON_STRING) {
707 error = ovsdb_syntax_error(
708 request->params, NULL,
709 "%s request params must begin with <db-name>", request->method);
710 goto error;
711 }
712
713 db_name = params->elems[0]->u.string;
714 db = shash_find_data(&s->up.server->dbs, db_name);
715 if (!db) {
716 error = ovsdb_syntax_error(
717 request->params, "unknown database",
718 "%s request specifies unknown database %s",
719 request->method, db_name);
720 goto error;
721 }
722
723 *replyp = NULL;
724 return db;
725
726 error:
727 *replyp = jsonrpc_create_error(ovsdb_error_to_json(error), request->id);
728 ovsdb_error_destroy(error);
729 return NULL;
730 }
731
732 static struct ovsdb_error *
733 ovsdb_jsonrpc_session_parse_lock_name(const struct jsonrpc_msg *request,
734 const char **lock_namep)
735 {
736 const struct json_array *params;
737
738 params = json_array(request->params);
739 if (params->n != 1 || params->elems[0]->type != JSON_STRING ||
740 !ovsdb_parser_is_id(json_string(params->elems[0]))) {
741 *lock_namep = NULL;
742 return ovsdb_syntax_error(request->params, NULL,
743 "%s request params must be <id>",
744 request->method);
745 }
746
747 *lock_namep = json_string(params->elems[0]);
748 return NULL;
749 }
750
751 static void
752 ovsdb_jsonrpc_session_notify(struct ovsdb_session *session,
753 const char *lock_name,
754 const char *method)
755 {
756 struct ovsdb_jsonrpc_session *s;
757 struct json *params;
758
759 s = CONTAINER_OF(session, struct ovsdb_jsonrpc_session, up);
760 params = json_array_create_1(json_string_create(lock_name));
761 ovsdb_jsonrpc_session_send(s, jsonrpc_create_notify(method, params));
762 }
763
/* Returns a JSON-RPC error reply for request 'id' that explains that lock
 * and unlock methods are refused because the server is read only. */
static struct jsonrpc_msg *
jsonrpc_create_readonly_lock_error(const struct json *id)
{
    struct json *text = json_string_create(
        "lock and unlock methods not allowed,"
        " DB server is read only.");

    return jsonrpc_create_error(text, id);
}
771
772 static struct jsonrpc_msg *
773 ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session *s,
774 struct jsonrpc_msg *request,
775 enum ovsdb_lock_mode mode)
776 {
777 struct ovsdb_lock_waiter *waiter;
778 struct jsonrpc_msg *reply;
779 struct ovsdb_error *error;
780 struct ovsdb_session *victim;
781 const char *lock_name;
782 struct json *result;
783
784 if (s->read_only) {
785 return jsonrpc_create_readonly_lock_error(request->id);
786 }
787
788 error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
789 if (error) {
790 goto error;
791 }
792
793 /* Report error if this session has issued a "lock" or "steal" without a
794 * matching "unlock" for this lock. */
795 waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
796 if (waiter) {
797 error = ovsdb_syntax_error(
798 request->params, NULL,
799 "must issue \"unlock\" before new \"%s\"", request->method);
800 goto error;
801 }
802
803 /* Get the lock, add us as a waiter. */
804 waiter = ovsdb_server_lock(&s->remote->server->up, &s->up, lock_name, mode,
805 &victim);
806 if (victim) {
807 ovsdb_jsonrpc_session_notify(victim, lock_name, "stolen");
808 }
809
810 result = json_object_create();
811 json_object_put(result, "locked",
812 json_boolean_create(ovsdb_lock_waiter_is_owner(waiter)));
813
814 return jsonrpc_create_reply(result, request->id);
815
816 error:
817 reply = jsonrpc_create_error(ovsdb_error_to_json(error), request->id);
818 ovsdb_error_destroy(error);
819 return reply;
820 }
821
822 static void
823 ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *s)
824 {
825 struct ovsdb_lock_waiter *waiter, *next;
826
827 HMAP_FOR_EACH_SAFE (waiter, next, session_node, &s->up.waiters) {
828 ovsdb_jsonrpc_session_unlock__(waiter);
829 }
830 }
831
832 static void
833 ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *waiter)
834 {
835 struct ovsdb_lock *lock = waiter->lock;
836
837 if (lock) {
838 struct ovsdb_session *new_owner = ovsdb_lock_waiter_remove(waiter);
839 if (new_owner) {
840 ovsdb_jsonrpc_session_notify(new_owner, lock->name, "locked");
841 } else {
842 /* ovsdb_server_lock() might have freed 'lock'. */
843 }
844 }
845
846 ovsdb_lock_waiter_destroy(waiter);
847 }
848
849 static struct jsonrpc_msg *
850 ovsdb_jsonrpc_session_unlock(struct ovsdb_jsonrpc_session *s,
851 struct jsonrpc_msg *request)
852 {
853 struct ovsdb_lock_waiter *waiter;
854 struct jsonrpc_msg *reply;
855 struct ovsdb_error *error;
856 const char *lock_name;
857
858 if (s->read_only) {
859 return jsonrpc_create_readonly_lock_error(request->id);
860 }
861
862 error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
863 if (error) {
864 goto error;
865 }
866
867 /* Report error if this session has not issued a "lock" or "steal" for this
868 * lock. */
869 waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
870 if (!waiter) {
871 error = ovsdb_syntax_error(
872 request->params, NULL, "\"unlock\" without \"lock\" or \"steal\"");
873 goto error;
874 }
875
876 ovsdb_jsonrpc_session_unlock__(waiter);
877
878 return jsonrpc_create_reply(json_object_create(), request->id);
879
880 error:
881 reply = jsonrpc_create_error(ovsdb_error_to_json(error), request->id);
882 ovsdb_error_destroy(error);
883 return reply;
884 }
885
886 static struct jsonrpc_msg *
887 execute_transaction(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
888 struct jsonrpc_msg *request)
889 {
890 ovsdb_jsonrpc_trigger_create(s, db, request->id, request->params);
891 request->id = NULL;
892 request->params = NULL;
893 jsonrpc_msg_destroy(request);
894 return NULL;
895 }
896
897 static void
898 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
899 struct jsonrpc_msg *request)
900 {
901 struct jsonrpc_msg *reply;
902
903 if (!strcmp(request->method, "transact")) {
904 struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
905 if (!reply) {
906 reply = execute_transaction(s, db, request);
907 }
908 } else if (!strcmp(request->method, "monitor") ||
909 (monitor_cond_enable__ && !strcmp(request->method,
910 "monitor_cond"))) {
911 struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
912 if (!reply) {
913 int l = strlen(request->method) - strlen("monitor");
914 enum ovsdb_monitor_version version = l ? OVSDB_MONITOR_V2
915 : OVSDB_MONITOR_V1;
916 reply = ovsdb_jsonrpc_monitor_create(s, db, request->params,
917 version, request->id);
918 }
919 } else if (!strcmp(request->method, "monitor_cond_change")) {
920 reply = ovsdb_jsonrpc_monitor_cond_change(s, request->params,
921 request->id);
922 } else if (!strcmp(request->method, "monitor_cancel")) {
923 reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
924 request->id);
925 } else if (!strcmp(request->method, "get_schema")) {
926 struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
927 if (!reply) {
928 reply = jsonrpc_create_reply(ovsdb_schema_to_json(db->schema),
929 request->id);
930 }
931 } else if (!strcmp(request->method, "list_dbs")) {
932 size_t n_dbs = shash_count(&s->up.server->dbs);
933 struct shash_node *node;
934 struct json **dbs;
935 size_t i;
936
937 dbs = xmalloc(n_dbs * sizeof *dbs);
938 i = 0;
939 SHASH_FOR_EACH (node, &s->up.server->dbs) {
940 dbs[i++] = json_string_create(node->name);
941 }
942 reply = jsonrpc_create_reply(json_array_create(dbs, n_dbs),
943 request->id);
944 } else if (!strcmp(request->method, "lock")) {
945 reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_WAIT);
946 } else if (!strcmp(request->method, "steal")) {
947 reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_STEAL);
948 } else if (!strcmp(request->method, "unlock")) {
949 reply = ovsdb_jsonrpc_session_unlock(s, request);
950 } else if (!strcmp(request->method, "echo")) {
951 reply = jsonrpc_create_reply(json_clone(request->params), request->id);
952 } else {
953 reply = jsonrpc_create_error(json_string_create("unknown method"),
954 request->id);
955 }
956
957 if (reply) {
958 jsonrpc_msg_destroy(request);
959 ovsdb_jsonrpc_session_send(s, reply);
960 }
961 }
962
963 static void
964 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
965 {
966 if (json_array(request->params)->n == 1) {
967 struct ovsdb_jsonrpc_trigger *t;
968 struct json *id;
969
970 id = request->params->u.array.elems[0];
971 t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
972 if (t) {
973 ovsdb_jsonrpc_trigger_complete(t);
974 }
975 }
976 }
977
978 static void
979 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
980 struct jsonrpc_msg *request)
981 {
982 if (!strcmp(request->method, "cancel")) {
983 execute_cancel(s, request);
984 }
985 jsonrpc_msg_destroy(request);
986 }
987
988 static void
989 ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *s,
990 struct jsonrpc_msg *msg)
991 {
992 ovsdb_jsonrpc_monitor_flush_all(s);
993 jsonrpc_session_send(s->js, msg);
994 }
995 \f
996 /* JSON-RPC database server triggers.
997 *
998 * (Every transaction is treated as a trigger even if it doesn't actually have
999 * any "wait" operations.) */
1000
/* One transaction request being tracked on behalf of a JSON-RPC session. */
struct ovsdb_jsonrpc_trigger {
    struct ovsdb_trigger trigger;   /* Generic trigger state; its 'session'
                                     * member is used to get back to the
                                     * owning ovsdb_jsonrpc_session. */
    struct hmap_node hmap_node; /* In session's "triggers" hmap. */
    struct json *id;            /* Request ID, used as the hmap key and echoed
                                 * in the reply.  Destroyed together with the
                                 * trigger. */
};
1006
1007 static void
1008 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
1009 struct json *id, struct json *params)
1010 {
1011 struct ovsdb_jsonrpc_trigger *t;
1012 size_t hash;
1013
1014 /* Check for duplicate ID. */
1015 hash = json_hash(id, 0);
1016 t = ovsdb_jsonrpc_trigger_find(s, id, hash);
1017 if (t) {
1018 struct jsonrpc_msg *msg;
1019
1020 msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
1021 id);
1022 ovsdb_jsonrpc_session_send(s, msg);
1023 json_destroy(id);
1024 json_destroy(params);
1025 return;
1026 }
1027
1028 /* Insert into trigger table. */
1029 t = xmalloc(sizeof *t);
1030 ovsdb_trigger_init(&s->up, db, &t->trigger, params, time_msec(),
1031 s->read_only);
1032 t->id = id;
1033 hmap_insert(&s->triggers, &t->hmap_node, hash);
1034
1035 /* Complete early if possible. */
1036 if (ovsdb_trigger_is_complete(&t->trigger)) {
1037 ovsdb_jsonrpc_trigger_complete(t);
1038 }
1039 }
1040
1041 static struct ovsdb_jsonrpc_trigger *
1042 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
1043 const struct json *id, size_t hash)
1044 {
1045 struct ovsdb_jsonrpc_trigger *t;
1046
1047 HMAP_FOR_EACH_WITH_HASH (t, hmap_node, hash, &s->triggers) {
1048 if (json_equal(t->id, id)) {
1049 return t;
1050 }
1051 }
1052
1053 return NULL;
1054 }
1055
1056 static void
1057 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
1058 {
1059 struct ovsdb_jsonrpc_session *s;
1060
1061 s = CONTAINER_OF(t->trigger.session, struct ovsdb_jsonrpc_session, up);
1062
1063 if (jsonrpc_session_is_connected(s->js)) {
1064 struct jsonrpc_msg *reply;
1065 struct json *result;
1066
1067 result = ovsdb_trigger_steal_result(&t->trigger);
1068 if (result) {
1069 reply = jsonrpc_create_reply(result, t->id);
1070 } else {
1071 reply = jsonrpc_create_error(json_string_create("canceled"),
1072 t->id);
1073 }
1074 ovsdb_jsonrpc_session_send(s, reply);
1075 }
1076
1077 json_destroy(t->id);
1078 ovsdb_trigger_destroy(&t->trigger);
1079 hmap_remove(&s->triggers, &t->hmap_node);
1080 free(t);
1081 }
1082
1083 static void
1084 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
1085 {
1086 struct ovsdb_jsonrpc_trigger *t, *next;
1087 HMAP_FOR_EACH_SAFE (t, next, hmap_node, &s->triggers) {
1088 ovsdb_jsonrpc_trigger_complete(t);
1089 }
1090 }
1091
1092 static void
1093 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
1094 {
1095 while (!ovs_list_is_empty(&s->up.completions)) {
1096 struct ovsdb_jsonrpc_trigger *t
1097 = CONTAINER_OF(s->up.completions.next,
1098 struct ovsdb_jsonrpc_trigger, trigger.node);
1099 ovsdb_jsonrpc_trigger_complete(t);
1100 }
1101 }
1102 \f
/* Jsonrpc front end monitor.
 *
 * Represents one monitor registered by a client on a JSON-RPC session.  The
 * underlying 'dbmon' may be replaced by an equivalent, already existing
 * ovsdb_monitor when the monitor is created. */
struct ovsdb_jsonrpc_monitor {
    struct hmap_node node;      /* In ovsdb_jsonrpc_session's "monitors". */
    struct ovsdb_jsonrpc_session *session;  /* Session that owns this
                                             * monitor. */
    struct ovsdb *db;           /* Database being monitored. */
    struct json *monitor_id;    /* Client-chosen ID; hmap key.  Owned by the
                                 * monitor. */
    struct ovsdb_monitor *dbmon;    /* Backend monitor. */
    uint64_t unflushed;         /* The first transaction that has not been
                                       flushed to the jsonrpc remote client. */
    enum ovsdb_monitor_version version;     /* Selects the notification
                                             * method ("update" or
                                             * "update2"). */
    struct ovsdb_monitor_session_condition *condition;/* Session's condition */
                                /* Created only for OVSDB_MONITOR_V2 monitors;
                                 * NULL otherwise. */
};
1115
1116 static struct ovsdb_jsonrpc_monitor *
1117 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
1118 const struct json *monitor_id)
1119 {
1120 struct ovsdb_jsonrpc_monitor *m;
1121
1122 HMAP_FOR_EACH_WITH_HASH (m, node, json_hash(monitor_id, 0), &s->monitors) {
1123 if (json_equal(m->monitor_id, monitor_id)) {
1124 return m;
1125 }
1126 }
1127
1128 return NULL;
1129 }
1130
1131 static bool
1132 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
1133 {
1134 const struct json *json;
1135
1136 json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
1137 return json ? json_boolean(json) : default_value;
1138 }
1139
1140 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
1141 ovsdb_jsonrpc_parse_monitor_request(
1142 struct ovsdb_monitor *dbmon,
1143 const struct ovsdb_table *table,
1144 struct ovsdb_monitor_session_condition *cond,
1145 const struct json *monitor_request)
1146 {
1147 const struct ovsdb_table_schema *ts = table->schema;
1148 enum ovsdb_monitor_selection select;
1149 const struct json *columns, *select_json, *where = NULL;
1150 struct ovsdb_parser parser;
1151 struct ovsdb_error *error;
1152
1153 ovsdb_parser_init(&parser, monitor_request, "table %s", ts->name);
1154 if (cond) {
1155 where = ovsdb_parser_member(&parser, "where", OP_ARRAY | OP_OPTIONAL);
1156 }
1157 columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
1158
1159 select_json = ovsdb_parser_member(&parser, "select",
1160 OP_OBJECT | OP_OPTIONAL);
1161
1162 error = ovsdb_parser_finish(&parser);
1163 if (error) {
1164 return error;
1165 }
1166
1167 if (select_json) {
1168 select = 0;
1169 ovsdb_parser_init(&parser, select_json, "table %s select", ts->name);
1170 if (parse_bool(&parser, "initial", true)) {
1171 select |= OJMS_INITIAL;
1172 }
1173 if (parse_bool(&parser, "insert", true)) {
1174 select |= OJMS_INSERT;
1175 }
1176 if (parse_bool(&parser, "delete", true)) {
1177 select |= OJMS_DELETE;
1178 }
1179 if (parse_bool(&parser, "modify", true)) {
1180 select |= OJMS_MODIFY;
1181 }
1182 error = ovsdb_parser_finish(&parser);
1183 if (error) {
1184 return error;
1185 }
1186 } else {
1187 select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
1188 }
1189
1190 ovsdb_monitor_table_add_select(dbmon, table, select);
1191 if (columns) {
1192 size_t i;
1193
1194 if (columns->type != JSON_ARRAY) {
1195 return ovsdb_syntax_error(columns, NULL,
1196 "array of column names expected");
1197 }
1198
1199 for (i = 0; i < columns->u.array.n; i++) {
1200 const struct ovsdb_column *column;
1201 const char *s;
1202
1203 if (columns->u.array.elems[i]->type != JSON_STRING) {
1204 return ovsdb_syntax_error(columns, NULL,
1205 "array of column names expected");
1206 }
1207
1208 s = columns->u.array.elems[i]->u.string;
1209 column = shash_find_data(&table->schema->columns, s);
1210 if (!column) {
1211 return ovsdb_syntax_error(columns, NULL, "%s is not a valid "
1212 "column name", s);
1213 }
1214 if (ovsdb_monitor_add_column(dbmon, table, column,
1215 select, true)) {
1216 return ovsdb_syntax_error(columns, NULL, "column %s "
1217 "mentioned more than once",
1218 column->name);
1219 }
1220 }
1221 } else {
1222 struct shash_node *node;
1223
1224 SHASH_FOR_EACH (node, &ts->columns) {
1225 const struct ovsdb_column *column = node->data;
1226 if (column->index != OVSDB_COL_UUID) {
1227 if (ovsdb_monitor_add_column(dbmon, table, column,
1228 select, true)) {
1229 return ovsdb_syntax_error(columns, NULL, "column %s "
1230 "mentioned more than once",
1231 column->name);
1232 }
1233 }
1234 }
1235 }
1236 if (cond) {
1237 error = ovsdb_monitor_table_condition_create(cond, table, where);
1238 if (error) {
1239 return error;
1240 }
1241 }
1242
1243 return NULL;
1244 }
1245
1246 static struct jsonrpc_msg *
1247 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
1248 struct json *params,
1249 enum ovsdb_monitor_version version,
1250 const struct json *request_id)
1251 {
1252 struct ovsdb_jsonrpc_monitor *m = NULL;
1253 struct ovsdb_monitor *dbmon = NULL;
1254 struct json *monitor_id, *monitor_requests;
1255 struct ovsdb_error *error = NULL;
1256 struct shash_node *node;
1257 struct json *json;
1258
1259 if (json_array(params)->n != 3) {
1260 error = ovsdb_syntax_error(params, NULL, "invalid parameters");
1261 goto error;
1262 }
1263 monitor_id = params->u.array.elems[1];
1264 monitor_requests = params->u.array.elems[2];
1265 if (monitor_requests->type != JSON_OBJECT) {
1266 error = ovsdb_syntax_error(monitor_requests, NULL,
1267 "monitor-requests must be object");
1268 goto error;
1269 }
1270
1271 if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
1272 error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
1273 goto error;
1274 }
1275
1276 m = xzalloc(sizeof *m);
1277 m->session = s;
1278 m->db = db;
1279 m->dbmon = ovsdb_monitor_create(db, m);
1280 if (version == OVSDB_MONITOR_V2) {
1281 m->condition = ovsdb_monitor_session_condition_create();
1282 }
1283 m->unflushed = 0;
1284 m->version = version;
1285 hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
1286 m->monitor_id = json_clone(monitor_id);
1287
1288 SHASH_FOR_EACH (node, json_object(monitor_requests)) {
1289 const struct ovsdb_table *table;
1290 const struct json *mr_value;
1291 size_t i;
1292
1293 table = ovsdb_get_table(m->db, node->name);
1294 if (!table) {
1295 error = ovsdb_syntax_error(NULL, NULL,
1296 "no table named %s", node->name);
1297 goto error;
1298 }
1299
1300 ovsdb_monitor_add_table(m->dbmon, table);
1301
1302 /* Parse columns. */
1303 mr_value = node->data;
1304 if (mr_value->type == JSON_ARRAY) {
1305 const struct json_array *array = &mr_value->u.array;
1306
1307 for (i = 0; i < array->n; i++) {
1308 error = ovsdb_jsonrpc_parse_monitor_request(m->dbmon,
1309 table,
1310 m->condition,
1311 array->elems[i]);
1312 if (error) {
1313 goto error;
1314 }
1315 }
1316 } else {
1317 error = ovsdb_jsonrpc_parse_monitor_request(m->dbmon,
1318 table,
1319 m->condition,
1320 mr_value);
1321 if (error) {
1322 goto error;
1323 }
1324 }
1325 }
1326
1327 dbmon = ovsdb_monitor_add(m->dbmon);
1328 if (dbmon != m->dbmon) {
1329 /* Found an exisiting dbmon, reuse the current one. */
1330 ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->unflushed);
1331 ovsdb_monitor_add_jsonrpc_monitor(dbmon, m);
1332 m->dbmon = dbmon;
1333 }
1334
1335 /* Only now we can bind session's condition to ovsdb_monitor */
1336 if (m->condition) {
1337 ovsdb_monitor_condition_bind(m->dbmon, m->condition);
1338 }
1339
1340 ovsdb_monitor_get_initial(m->dbmon);
1341 json = ovsdb_jsonrpc_monitor_compose_update(m, true);
1342 json = json ? json : json_object_create();
1343 return jsonrpc_create_reply(json, request_id);
1344
1345 error:
1346 if (m) {
1347 ovsdb_jsonrpc_monitor_destroy(m);
1348 }
1349
1350 json = ovsdb_error_to_json(error);
1351 ovsdb_error_destroy(error);
1352 return jsonrpc_create_error(json, request_id);
1353 }
1354
1355 static struct ovsdb_error *
1356 ovsdb_jsonrpc_parse_monitor_cond_change_request(
1357 struct ovsdb_jsonrpc_monitor *m,
1358 const struct ovsdb_table *table,
1359 const struct json *cond_change_req)
1360 {
1361 const struct ovsdb_table_schema *ts = table->schema;
1362 const struct json *condition, *columns;
1363 struct ovsdb_parser parser;
1364 struct ovsdb_error *error;
1365
1366 ovsdb_parser_init(&parser, cond_change_req, "table %s", ts->name);
1367 columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
1368 condition = ovsdb_parser_member(&parser, "where", OP_ARRAY | OP_OPTIONAL);
1369
1370 error = ovsdb_parser_finish(&parser);
1371 if (error) {
1372 return error;
1373 }
1374
1375 if (columns) {
1376 error = ovsdb_syntax_error(cond_change_req, NULL, "changing columns "
1377 "is unsupported");
1378 return error;
1379 }
1380 error = ovsdb_monitor_table_condition_update(m->dbmon, m->condition, table,
1381 condition);
1382
1383 return error;
1384 }
1385
1386 static struct jsonrpc_msg *
1387 ovsdb_jsonrpc_monitor_cond_change(struct ovsdb_jsonrpc_session *s,
1388 struct json *params,
1389 const struct json *request_id)
1390 {
1391 struct ovsdb_error *error;
1392 struct ovsdb_jsonrpc_monitor *m;
1393 struct json *monitor_cond_change_reqs;
1394 struct shash_node *node;
1395 struct json *json;
1396
1397 if (json_array(params)->n != 3) {
1398 error = ovsdb_syntax_error(params, NULL, "invalid parameters");
1399 goto error;
1400 }
1401
1402 m = ovsdb_jsonrpc_monitor_find(s, params->u.array.elems[0]);
1403 if (!m) {
1404 error = ovsdb_syntax_error(request_id, NULL,
1405 "unknown monitor session");
1406 goto error;
1407 }
1408
1409 monitor_cond_change_reqs = params->u.array.elems[2];
1410 if (monitor_cond_change_reqs->type != JSON_OBJECT) {
1411 error =
1412 ovsdb_syntax_error(NULL, NULL,
1413 "monitor-cond-change-requests must be object");
1414 goto error;
1415 }
1416
1417 SHASH_FOR_EACH (node, json_object(monitor_cond_change_reqs)) {
1418 const struct ovsdb_table *table;
1419 const struct json *mr_value;
1420 size_t i;
1421
1422 table = ovsdb_get_table(m->db, node->name);
1423 if (!table) {
1424 error = ovsdb_syntax_error(NULL, NULL,
1425 "no table named %s", node->name);
1426 goto error;
1427 }
1428 if (!ovsdb_monitor_table_exists(m->dbmon, table)) {
1429 error = ovsdb_syntax_error(NULL, NULL,
1430 "no table named %s in monitor session",
1431 node->name);
1432 goto error;
1433 }
1434
1435 mr_value = node->data;
1436 if (mr_value->type == JSON_ARRAY) {
1437 const struct json_array *array = &mr_value->u.array;
1438
1439 for (i = 0; i < array->n; i++) {
1440 error = ovsdb_jsonrpc_parse_monitor_cond_change_request(
1441 m, table, array->elems[i]);
1442 if (error) {
1443 goto error;
1444 }
1445 }
1446 } else {
1447 error = ovsdb_syntax_error(
1448 NULL, NULL,
1449 "table %s no monitor-cond-change JSON array",
1450 node->name);
1451 goto error;
1452 }
1453 }
1454
1455 /* Change monitor id */
1456 hmap_remove(&s->monitors, &m->node);
1457 json_destroy(m->monitor_id);
1458 m->monitor_id = json_clone(params->u.array.elems[1]);
1459 hmap_insert(&s->monitors, &m->node, json_hash(m->monitor_id, 0));
1460
1461 /* Send the new update, if any, represents the difference from the old
1462 * condition and the new one. */
1463 struct json *update_json;
1464
1465 update_json = ovsdb_monitor_get_update(m->dbmon, false, true,
1466 &m->unflushed, m->condition, m->version);
1467 if (update_json) {
1468 struct jsonrpc_msg *msg;
1469 struct json *params;
1470
1471 params = json_array_create_2(json_clone(m->monitor_id), update_json);
1472 msg = ovsdb_jsonrpc_create_notify(m, params);
1473 jsonrpc_session_send(s->js, msg);
1474 }
1475
1476 return jsonrpc_create_reply(json_object_create(), request_id);
1477
1478 error:
1479
1480 json = ovsdb_error_to_json(error);
1481 ovsdb_error_destroy(error);
1482 return jsonrpc_create_error(json, request_id);
1483 }
1484
1485 static struct jsonrpc_msg *
1486 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
1487 struct json_array *params,
1488 const struct json *request_id)
1489 {
1490 if (params->n != 1) {
1491 return jsonrpc_create_error(json_string_create("invalid parameters"),
1492 request_id);
1493 } else {
1494 struct ovsdb_jsonrpc_monitor *m;
1495
1496 m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
1497 if (!m) {
1498 return jsonrpc_create_error(json_string_create("unknown monitor"),
1499 request_id);
1500 } else {
1501 ovsdb_jsonrpc_monitor_destroy(m);
1502 return jsonrpc_create_reply(json_object_create(), request_id);
1503 }
1504 }
1505 }
1506
1507 static void
1508 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
1509 {
1510 struct ovsdb_jsonrpc_monitor *m, *next;
1511
1512 HMAP_FOR_EACH_SAFE (m, next, node, &s->monitors) {
1513 ovsdb_jsonrpc_monitor_destroy(m);
1514 }
1515 }
1516
1517 static struct json *
1518 ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m,
1519 bool initial)
1520 {
1521
1522 if (!ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
1523 return NULL;
1524 }
1525
1526 return ovsdb_monitor_get_update(m->dbmon, initial, false,
1527 &m->unflushed, m->condition, m->version);
1528 }
1529
1530 static bool
1531 ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *s)
1532 {
1533 struct ovsdb_jsonrpc_monitor *m;
1534
1535 HMAP_FOR_EACH (m, node, &s->monitors) {
1536 if (ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
1537 return true;
1538 }
1539 }
1540
1541 return false;
1542 }
1543
1544 void
1545 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_jsonrpc_monitor *m)
1546 {
1547 json_destroy(m->monitor_id);
1548 hmap_remove(&m->session->monitors, &m->node);
1549 ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->unflushed);
1550 ovsdb_monitor_session_condition_destroy(m->condition);
1551 free(m);
1552 }
1553
1554 static struct jsonrpc_msg *
1555 ovsdb_jsonrpc_create_notify(const struct ovsdb_jsonrpc_monitor *m,
1556 struct json *params)
1557 {
1558 const char *method;
1559
1560 switch(m->version) {
1561 case OVSDB_MONITOR_V1:
1562 method = "update";
1563 break;
1564 case OVSDB_MONITOR_V2:
1565 method = "update2";
1566 break;
1567 case OVSDB_MONITOR_VERSION_MAX:
1568 default:
1569 OVS_NOT_REACHED();
1570 }
1571
1572 return jsonrpc_create_notify(method, params);
1573 }
1574
1575 static void
1576 ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *s)
1577 {
1578 struct ovsdb_jsonrpc_monitor *m;
1579
1580 HMAP_FOR_EACH (m, node, &s->monitors) {
1581 struct json *json;
1582
1583 json = ovsdb_jsonrpc_monitor_compose_update(m, false);
1584 if (json) {
1585 struct jsonrpc_msg *msg;
1586 struct json *params;
1587
1588 params = json_array_create_2(json_clone(m->monitor_id), json);
1589 msg = ovsdb_jsonrpc_create_notify(m, params);
1590 jsonrpc_session_send(s->js, msg);
1591 }
1592 }
1593 }
1594
1595 void
1596 ovsdb_jsonrpc_disable_monitor_cond(void)
1597 {
1598 /* Once disabled, it is not possible to re-enable it. */
1599 monitor_cond_enable__ = false;
1600 }