]> git.proxmox.com Git - mirror_ovs.git/blob - ovsdb/ovsdb-client.c
ovsdb_monitor: Fix style of prototypes.
[mirror_ovs.git] / ovsdb / ovsdb-client.c
1 /*
2 * Copyright (c) 2009-2017 Nicira, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <config.h>
18
19 #include <ctype.h>
20 #include <errno.h>
21 #include <fcntl.h>
22 #include <getopt.h>
23 #include <limits.h>
24 #include <signal.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <unistd.h>
28
29 #include "command-line.h"
30 #include "column.h"
31 #include "compiler.h"
32 #include "daemon.h"
33 #include "dirs.h"
34 #include "openvswitch/dynamic-string.h"
35 #include "fatal-signal.h"
36 #include "file.h"
37 #include "openvswitch/json.h"
38 #include "jsonrpc.h"
39 #include "lib/table.h"
40 #include "log.h"
41 #include "ovsdb.h"
42 #include "ovsdb-data.h"
43 #include "ovsdb-error.h"
44 #include "ovsdb-session.h"
45 #include "openvswitch/poll-loop.h"
46 #include "row.h"
47 #include "sort.h"
48 #include "svec.h"
49 #include "storage.h"
50 #include "stream.h"
51 #include "stream-ssl.h"
52 #include "table.h"
53 #include "transaction.h"
54 #include "monitor.h"
55 #include "condition.h"
56 #include "timeval.h"
57 #include "unixctl.h"
58 #include "util.h"
59 #include "openvswitch/vlog.h"
60
61 VLOG_DEFINE_THIS_MODULE(ovsdb_client);
62
63 enum args_needed {
64 NEED_NONE, /* No JSON-RPC connection or database name needed. */
65 NEED_RPC, /* JSON-RPC connection needed. */
66 NEED_DATABASE /* JSON-RPC connection and database name needed. */
67 };
68
69 struct ovsdb_client_command {
70 const char *name;
71 enum args_needed need;
72 int min_args;
73 int max_args;
74 void (*handler)(struct jsonrpc *rpc, const char *database,
75 int argc, char *argv[]);
76 };
77
78 /* --timestamp: Print a timestamp before each update on "monitor" command? */
79 static bool timestamp;
80
81 /* --db-change-aware, --no-db-change-aware: Enable db_change_aware feature for
82 * "monitor" command?
83 *
84 * -1 (default): Use db_change_aware if available.
85 * 0: Disable db_change_aware.
86 * 1: Require db_change_aware.
87 *
88 * (This option is undocumented because anything other than the default is
89 * expected to be useful only for testing that the db_change_aware feature
90 * actually works.) */
91 static int db_change_aware = -1;
92
93 /* --force: Ignore schema differences for "restore" command? */
94 static bool force;
95
96 /* --leader-only, --no-leader-only: Only accept the leader in a cluster. */
97 static bool leader_only = true;
98
99 /* Format for table output. */
100 static struct table_style table_style = TABLE_STYLE_DEFAULT;
101
102 static const struct ovsdb_client_command *get_all_commands(void);
103
104 static struct json *parse_json(const char *);
105
106 OVS_NO_RETURN static void usage(void);
107 static void parse_options(int argc, char *argv[]);
108 static struct jsonrpc *open_jsonrpc(const char *server);
109 static void fetch_dbs(struct jsonrpc *, struct svec *dbs);
110 static bool should_stay_connected(const char *server, const char *database,
111 const struct uuid *cid,
112 const struct jsonrpc_msg *reply);
113 struct jsonrpc_msg *create_database_info_request(const char *database);
114
115 static char *
116 default_remote(void)
117 {
118 return xasprintf("unix:%s/db.sock", ovs_rundir());
119 }
120
121 static int
122 open_rpc(int min_args, enum args_needed need,
123 int argc, char *argv[], struct jsonrpc **rpcp, char **databasep)
124 {
125 struct svec remotes = SVEC_EMPTY_INITIALIZER;
126 struct uuid cid = UUID_ZERO;
127
128 /* First figure out the remote(s). If the first command-line argument has
129 * the form of a remote, use it, otherwise use the default. */
130 int argidx = 0;
131 if (argc > min_args && (isalpha((unsigned char) argv[0][0])
132 && strchr(argv[0], ':'))) {
133 ovsdb_session_parse_remote(argv[argidx++], &remotes, &cid);
134 } else {
135 svec_add_nocopy(&remotes, default_remote());
136 }
137
138 /* Handle the case where there's one remote. In this case, if we need a
139 * database name, we try to figure out a default if none was specified
140 * explicitly. */
141 char *database = *databasep;
142 if (remotes.n == 1) {
143 struct jsonrpc *rpc = open_jsonrpc(remotes.names[0]);
144 svec_destroy(&remotes);
145
146 if (need == NEED_DATABASE && !database) {
147 struct svec dbs;
148
149 svec_init(&dbs);
150 fetch_dbs(rpc, &dbs);
151 if (argc - argidx > min_args
152 && svec_contains(&dbs, argv[argidx])) {
153 database = xstrdup(argv[argidx++]);
154 } else if (svec_contains(&dbs, "Open_vSwitch")) {
155 database = xstrdup("Open_vSwitch");
156 } else {
157 size_t n = 0;
158 const char *best = NULL;
159 for (size_t i = 0; i < dbs.n; i++) {
160 if (dbs.names[i][0] != '_') {
161 best = dbs.names[i];
162 n++;
163 }
164 }
165 if (n != 1) {
166 jsonrpc_close(rpc);
167 ovs_fatal(0, "could not find a default database, "
168 "please specify a database name");
169 }
170 database = xstrdup(best);
171 }
172 svec_destroy(&dbs);
173 }
174 *rpcp = rpc;
175 *databasep = database;
176
177 return argidx;
178 }
179
180 /* If there's more than one remote, and we need a database name, then it
181 * must be specified explicitly. It's too likely to cause surprising
182 * behavior if we try to pick a default across several servers. */
183 if (!database && need == NEED_DATABASE) {
184 if (argc - argidx > min_args) {
185 database = xstrdup(argv[argidx++]);
186 } else {
187 ovs_fatal(0, "database name is required with multiple remotes");
188 }
189 }
190
191 /* We have multiple remotes. Connect to them in a random order and choose
192 * the first one that is up and hosts the database we want (if any) in an
193 * acceptable state. */
194 struct jsonrpc_session *js = jsonrpc_session_open_multiple(
195 &remotes, false);
196 svec_destroy(&remotes);
197
198 unsigned int seqno = 0;
199 struct json *id = NULL;
200 for (;;) {
201 jsonrpc_session_run(js);
202 if (!jsonrpc_session_is_alive(js)) {
203 ovs_fatal(0, "no servers were available");
204 }
205
206 if (seqno != jsonrpc_session_get_seqno(js)
207 && jsonrpc_session_is_connected(js)) {
208 if (!database) {
209 break;
210 }
211
212 seqno = jsonrpc_session_get_seqno(js);
213 struct jsonrpc_msg *txn = create_database_info_request(database);
214 json_destroy(id);
215 id = json_clone(txn->id);
216 jsonrpc_session_send(js, txn);
217 }
218
219 struct jsonrpc_msg *reply = jsonrpc_session_recv(js);
220 if (reply && id && reply->id && json_equal(id, reply->id)) {
221 if (reply->type == JSONRPC_REPLY
222 && should_stay_connected(jsonrpc_session_get_name(js),
223 database, &cid, reply)) {
224 jsonrpc_msg_destroy(reply);
225 break;
226 }
227 jsonrpc_session_force_reconnect(js);
228 }
229 jsonrpc_msg_destroy(reply);
230
231 jsonrpc_session_recv_wait(js);
232 jsonrpc_session_wait(js);
233 poll_block();
234 }
235 json_destroy(id);
236
237 *rpcp = jsonrpc_session_steal(js);
238 *databasep = database;
239 return argidx;
240 }
241
242 int
243 main(int argc, char *argv[])
244 {
245 const struct ovsdb_client_command *command;
246 ovs_cmdl_proctitle_init(argc, argv);
247 set_program_name(argv[0]);
248 service_start(&argc, &argv);
249 parse_options(argc, argv);
250 fatal_ignore_sigpipe();
251
252 daemon_become_new_user(false);
253 if (optind >= argc) {
254 ovs_fatal(0, "missing command name; use --help for help");
255 }
256
257 for (command = get_all_commands(); ; command++) {
258 if (!command->name) {
259 VLOG_FATAL("unknown command '%s'; use --help for help",
260 argv[optind]);
261 } else if (!strcmp(command->name, argv[optind])) {
262 break;
263 }
264 }
265 optind++;
266
267 char *database = NULL;
268 struct jsonrpc *rpc = NULL;
269 if (command->need != NEED_NONE) {
270 optind += open_rpc(command->min_args, command->need,
271 argc - optind, argv + optind, &rpc, &database);
272 }
273
274
275 if (argc - optind < command->min_args ||
276 argc - optind > command->max_args) {
277 free(database);
278 VLOG_FATAL("invalid syntax for '%s' (use --help for help)",
279 command->name);
280 }
281
282 command->handler(rpc, database, argc - optind, argv + optind);
283
284 free(database);
285 jsonrpc_close(rpc);
286
287 if (ferror(stdout)) {
288 VLOG_FATAL("write to stdout failed");
289 }
290 if (ferror(stderr)) {
291 VLOG_FATAL("write to stderr failed");
292 }
293
294 return 0;
295 }
296
297 static void
298 parse_options(int argc, char *argv[])
299 {
300 enum {
301 OPT_BOOTSTRAP_CA_CERT = UCHAR_MAX + 1,
302 OPT_TIMESTAMP,
303 OPT_FORCE,
304 OPT_LEADER_ONLY,
305 OPT_NO_LEADER_ONLY,
306 VLOG_OPTION_ENUMS,
307 DAEMON_OPTION_ENUMS,
308 TABLE_OPTION_ENUMS,
309 SSL_OPTION_ENUMS,
310 };
311 static const struct option long_options[] = {
312 {"help", no_argument, NULL, 'h'},
313 {"version", no_argument, NULL, 'V'},
314 {"timestamp", no_argument, NULL, OPT_TIMESTAMP},
315 {"force", no_argument, NULL, OPT_FORCE},
316 {"timeout", required_argument, NULL, 't'},
317 {"db-change-aware", no_argument, &db_change_aware, 1},
318 {"no-db-change-aware", no_argument, &db_change_aware, 0},
319 {"leader-only", no_argument, NULL, OPT_LEADER_ONLY},
320 {"no-leader-only", no_argument, NULL, OPT_NO_LEADER_ONLY},
321 VLOG_LONG_OPTIONS,
322 DAEMON_LONG_OPTIONS,
323 #ifdef HAVE_OPENSSL
324 {"bootstrap-ca-cert", required_argument, NULL, OPT_BOOTSTRAP_CA_CERT},
325 STREAM_SSL_LONG_OPTIONS,
326 #endif
327 TABLE_LONG_OPTIONS,
328 {NULL, 0, NULL, 0},
329 };
330 char *short_options = ovs_cmdl_long_options_to_short_options(long_options);
331 unsigned int timeout = 0;
332
333 table_style.format = TF_TABLE;
334
335 for (;;) {
336 int c;
337
338 c = getopt_long(argc, argv, short_options, long_options, NULL);
339 if (c == -1) {
340 break;
341 }
342
343 switch (c) {
344 case 'h':
345 usage();
346
347 case 'V':
348 ovs_print_version(0, 0);
349 exit(EXIT_SUCCESS);
350
351 VLOG_OPTION_HANDLERS
352 DAEMON_OPTION_HANDLERS
353 TABLE_OPTION_HANDLERS(&table_style)
354 STREAM_SSL_OPTION_HANDLERS
355
356 case OPT_BOOTSTRAP_CA_CERT:
357 stream_ssl_set_ca_cert_file(optarg, true);
358 break;
359
360 case OPT_TIMESTAMP:
361 timestamp = true;
362 break;
363
364 case OPT_FORCE:
365 force = true;
366 break;
367
368 case 't':
369 if (!str_to_uint(optarg, 10, &timeout) || !timeout) {
370 ovs_fatal(0, "value %s on -t or --timeout is invalid", optarg);
371 }
372 break;
373
374 case OPT_LEADER_ONLY:
375 leader_only = true;
376 break;
377
378 case OPT_NO_LEADER_ONLY:
379 leader_only = false;
380 break;
381
382 case '?':
383 exit(EXIT_FAILURE);
384
385 case 0:
386 /* getopt_long() already set the value for us. */
387 break;
388
389 default:
390 abort();
391 }
392 }
393 free(short_options);
394
395 ctl_timeout_setup(timeout);
396 }
397
398 static void
399 usage(void)
400 {
401 printf("%s: Open vSwitch database JSON-RPC client\n"
402 "usage: %s [OPTIONS] COMMAND [ARG...]\n"
403 "\nValid commands are:\n"
404 "\n list-dbs [SERVER]\n"
405 " list databases available on SERVER\n"
406 "\n get-schema [SERVER] [DATABASE]\n"
407 " retrieve schema for DATABASE from SERVER\n"
408 "\n get-schema-version [SERVER] [DATABASE]\n"
409 " retrieve schema for DATABASE from SERVER and report only its\n"
410 " version number on stdout\n"
411 "\n get-schema-cksum [SERVER] [DATABASE]\n"
412 " retrieve schema for DATABASE from SERVER and report only its\n"
413 " checksum on stdout\n"
414 "\n list-tables [SERVER] [DATABASE]\n"
415 " list tables for DATABASE on SERVER\n"
416 "\n list-columns [SERVER] [DATABASE] [TABLE]\n"
417 " list columns in TABLE (or all tables) in DATABASE on SERVER\n"
418 "\n transact [SERVER] TRANSACTION\n"
419 " run TRANSACTION (params for \"transact\" request) on SERVER\n"
420 " and print the results as JSON on stdout\n"
421 "\n query [SERVER] TRANSACTION\n"
422 " run TRANSACTION (params for \"transact\" request) on SERVER,\n"
423 " as read-only, and print the results as JSON on stdout\n"
424 "\n monitor [SERVER] [DATABASE] TABLE [COLUMN,...]...\n"
425 " monitor contents of COLUMNs in TABLE in DATABASE on SERVER.\n"
426 " COLUMNs may include !initial, !insert, !delete, !modify\n"
427 " to avoid seeing the specified kinds of changes.\n"
428 "\n monitor-cond [SERVER] [DATABASE] CONDITION TABLE [COLUMN,...]...\n"
429 " monitor contents that match CONDITION of COLUMNs in TABLE in\n"
430 " DATABASE on SERVER.\n"
431 " COLUMNs may include !initial, !insert, !delete, !modify\n"
432 " to avoid seeing the specified kinds of changes.\n"
433 "\n convert [SERVER] SCHEMA\n"
434 " convert database on SERVER named in SCHEMA to SCHEMA.\n"
435 "\n needs-conversion [SERVER] SCHEMA\n"
436 " tests whether SCHEMA's db on SERVER needs conversion.\n"
437 "\n monitor [SERVER] [DATABASE] ALL\n"
438 " monitor all changes to all columns in all tables\n"
439 "\n wait [SERVER] DATABASE STATE\n"
440 " wait until DATABASE reaches STATE "
441 "(\"added\" or \"connected\" or \"removed\")\n"
442 " in DATBASE on SERVER.\n"
443 "\n dump [SERVER] [DATABASE]\n"
444 " dump contents of DATABASE on SERVER to stdout\n"
445 "\n backup [SERVER] [DATABASE] > SNAPSHOT\n"
446 " dump database contents in the form of a database file\n"
447 "\n [--force] restore [SERVER] [DATABASE] < SNAPSHOT\n"
448 " restore database contents from a database file\n"
449 "\n lock [SERVER] LOCK\n"
450 " create or wait for LOCK in SERVER\n"
451 "\n steal [SERVER] LOCK\n"
452 " steal LOCK from SERVER\n"
453 "\n unlock [SERVER] LOCK\n"
454 " unlock LOCK from SERVER\n"
455 "\nThe default SERVER is unix:%s/db.sock.\n"
456 "The default DATABASE is Open_vSwitch.\n",
457 program_name, program_name, ovs_rundir());
458 stream_usage("SERVER", true, true, true);
459 table_usage();
460 printf(" --timestamp timestamp \"monitor\" output");
461 daemon_usage();
462 vlog_usage();
463 printf("\nOther options:\n"
464 " -h, --help display this help message\n"
465 " -V, --version display version information\n");
466 exit(EXIT_SUCCESS);
467 }
468 \f
469 static void
470 check_txn(int error, struct jsonrpc_msg **reply_)
471 {
472 struct jsonrpc_msg *reply = *reply_;
473
474 if (error) {
475 ovs_fatal(error, "transaction failed");
476 }
477
478 if (reply->error) {
479 ovs_fatal(error, "transaction returned error: %s",
480 json_to_string(reply->error, table_style.json_flags));
481 }
482 }
483
484 static struct json *
485 parse_json(const char *s)
486 {
487 struct json *json = json_from_string(s);
488 if (json->type == JSON_STRING) {
489 ovs_fatal(0, "\"%s\": %s", s, json->string);
490 }
491 return json;
492 }
493
494 static struct jsonrpc *
495 open_jsonrpc(const char *server)
496 {
497 struct stream *stream;
498 int error;
499
500 error = stream_open_block(jsonrpc_stream_open(server, &stream,
501 DSCP_DEFAULT), -1, &stream);
502 if (error == EAFNOSUPPORT) {
503 struct pstream *pstream;
504
505 error = jsonrpc_pstream_open(server, &pstream, DSCP_DEFAULT);
506 if (error) {
507 ovs_fatal(error, "failed to connect or listen to \"%s\"", server);
508 }
509
510 VLOG_INFO("%s: waiting for connection...", server);
511 error = pstream_accept_block(pstream, &stream);
512 if (error) {
513 ovs_fatal(error, "failed to accept connection on \"%s\"", server);
514 }
515
516 pstream_close(pstream);
517 } else if (error) {
518 ovs_fatal(error, "failed to connect to \"%s\"", server);
519 }
520
521 return jsonrpc_open(stream);
522 }
523
524 static void
525 print_json(struct json *json)
526 {
527 char *string = json_to_string(json, table_style.json_flags);
528 puts(string);
529 free(string);
530 }
531
532 static void
533 print_and_free_json(struct json *json)
534 {
535 print_json(json);
536 json_destroy(json);
537 }
538
539 static void
540 check_ovsdb_error(struct ovsdb_error *error)
541 {
542 if (error) {
543 ovs_fatal(0, "%s", ovsdb_error_to_string(error));
544 }
545 }
546
547 static struct ovsdb_schema *
548 fetch_schema(struct jsonrpc *rpc, const char *database)
549 {
550 struct jsonrpc_msg *request, *reply;
551 struct ovsdb_schema *schema;
552
553 request = jsonrpc_create_request("get_schema",
554 json_array_create_1(
555 json_string_create(database)),
556 NULL);
557 check_txn(jsonrpc_transact_block(rpc, request, &reply), &reply);
558 check_ovsdb_error(ovsdb_schema_from_json(reply->result, &schema));
559 jsonrpc_msg_destroy(reply);
560
561 return schema;
562 }
563
564 static void
565 fetch_dbs(struct jsonrpc *rpc, struct svec *dbs)
566 {
567 struct jsonrpc_msg *request, *reply;
568 size_t i;
569
570 request = jsonrpc_create_request("list_dbs", json_array_create_empty(),
571 NULL);
572
573 check_txn(jsonrpc_transact_block(rpc, request, &reply), &reply);
574 if (reply->result->type != JSON_ARRAY) {
575 ovs_fatal(0, "list_dbs response is not array");
576 }
577
578 for (i = 0; i < reply->result->array.n; i++) {
579 const struct json *name = reply->result->array.elems[i];
580
581 if (name->type != JSON_STRING) {
582 ovs_fatal(0, "list_dbs response %"PRIuSIZE" is not string", i);
583 }
584 svec_add(dbs, name->string);
585 }
586 jsonrpc_msg_destroy(reply);
587 svec_sort(dbs);
588 }
589
590 static const char *
591 parse_string_column(const struct json *row, const char *column_name)
592 {
593 const struct json *column = shash_find_data(json_object(row), column_name);
594 return column && column->type == JSON_STRING ? json_string(column) : "";
595 }
596
597 static int
598 parse_boolean_column(const struct json *row, const char *column_name)
599 {
600 const struct json *column = shash_find_data(json_object(row), column_name);
601 return (!column ? -1
602 : column->type == JSON_TRUE ? true
603 : column->type == JSON_FALSE ? false
604 : -1);
605 }
606
607 static struct uuid
608 parse_uuid_column(const struct json *row, const char *column_name)
609 {
610 const struct json *column = shash_find_data(json_object(row), column_name);
611 if (!column) {
612 return UUID_ZERO;
613 }
614
615 struct ovsdb_type type = { OVSDB_BASE_UUID_INIT, OVSDB_BASE_VOID_INIT,
616 0, 1 };
617 struct ovsdb_datum datum;
618 struct ovsdb_error *error = ovsdb_datum_from_json(&datum, &type, column,
619 NULL);
620 if (error) {
621 ovsdb_error_destroy(error);
622 return UUID_ZERO;
623 }
624 struct uuid uuid = datum.n > 0 ? datum.keys[0].uuid : UUID_ZERO;
625 ovsdb_datum_destroy(&datum, &type);
626 return uuid;
627 }
628
629 struct jsonrpc_msg *
630 create_database_info_request(const char *database)
631 {
632 struct json *op = json_object_create();
633 json_object_put_string(op, "op", "select");
634 json_object_put_string(op, "table", "Database");
635 struct json *condition = json_array_create_3(
636 json_string_create("name"),
637 json_string_create("=="),
638 json_string_create(database));
639 json_object_put(op, "where", json_array_create_1(condition));
640 struct json *txn = json_array_create_2(
641 json_string_create("_Server"), op);
642 return jsonrpc_create_request("transact", txn, NULL);
643 }
644
645 static const struct json *
646 parse_database_info_reply(const struct jsonrpc_msg *reply, const char *server,
647 const char *database, const struct uuid *cid)
648 {
649 const struct json *result = reply->result;
650 if (result->type != JSON_ARRAY
651 || result->array.n != 1
652 || result->array.elems[0]->type != JSON_OBJECT) {
653 VLOG_WARN("%s: unexpected reply to _Server request for %s",
654 server, database);
655 return NULL;
656 }
657
658 const struct json *op_result = result->array.elems[0];
659 const struct json *rows = shash_find_data(json_object(op_result), "rows");
660 if (!rows || rows->type != JSON_ARRAY) {
661 VLOG_WARN("%s: missing \"rows\" member in _Server reply for %s",
662 server, database);
663 return NULL;
664 }
665
666 for (size_t i = 0; i < rows->array.n; i++) {
667 const struct json *row = rows->array.elems[i];
668 if (row->type != JSON_OBJECT) {
669 VLOG_WARN("%s: bad row in _Server reply for %s",
670 server, database);
671 continue;
672 }
673
674 if (strcmp(parse_string_column(row, "name"), database)) {
675 continue;
676 }
677
678 if (cid && !uuid_is_zero(cid)) {
679 struct uuid cid2 = parse_uuid_column(row, "cid");
680 if (!uuid_equals(cid, &cid2)) {
681 continue;
682 }
683 }
684
685 return row;
686 }
687
688 /* No such database. */
689 return NULL;
690 }
691
692 /* Parses 'reply', a JSON-RPC reply to our request asking for the status of
693 * 'database' on 'server'. Determines whether this server is acceptable for
694 * the transaction we want to make and returns true if so or false to
695 * disconnect and try a different server. */
696 static bool
697 should_stay_connected(const char *server, const char *database,
698 const struct uuid *cid, const struct jsonrpc_msg *reply)
699 {
700 const struct json *row = parse_database_info_reply(reply, server,
701 database, cid);
702 if (!row) {
703 /* No such database. */
704 return false;
705 }
706
707 if (strcmp(parse_string_column(row, "model"), "clustered")) {
708 /* Always accept standalone databases. */
709 return true;
710 }
711
712 if (!parse_boolean_column(row, "connected")) {
713 /* Reject disconnected servers. */
714 return false;
715 }
716
717 if (leader_only && !parse_boolean_column(row, "leader")) {
718 /* Reject if not leader.. */
719 return false;
720 }
721
722 return true;
723 }
724 \f
725 static void
726 do_list_dbs(struct jsonrpc *rpc, const char *database OVS_UNUSED,
727 int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
728 {
729 const char *db_name;
730 struct svec dbs;
731 size_t i;
732
733 svec_init(&dbs);
734 fetch_dbs(rpc, &dbs);
735 SVEC_FOR_EACH (i, db_name, &dbs) {
736 puts(db_name);
737 }
738 svec_destroy(&dbs);
739 }
740
741 static void
742 do_get_schema(struct jsonrpc *rpc, const char *database,
743 int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
744 {
745 struct ovsdb_schema *schema = fetch_schema(rpc, database);
746 print_and_free_json(ovsdb_schema_to_json(schema));
747 ovsdb_schema_destroy(schema);
748 }
749
750 static void
751 do_get_schema_version(struct jsonrpc *rpc, const char *database,
752 int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
753 {
754 struct ovsdb_schema *schema = fetch_schema(rpc, database);
755 puts(schema->version);
756 ovsdb_schema_destroy(schema);
757 }
758
759 static void
760 do_get_schema_cksum(struct jsonrpc *rpc, const char *database,
761 int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
762 {
763 struct ovsdb_schema *schema = fetch_schema(rpc, database);
764 puts(schema->cksum);
765 ovsdb_schema_destroy(schema);
766 }
767
768 static void
769 do_list_tables(struct jsonrpc *rpc, const char *database,
770 int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
771 {
772 struct ovsdb_schema *schema;
773 struct shash_node *node;
774 struct table t;
775
776 schema = fetch_schema(rpc, database);
777 table_init(&t);
778 table_add_column(&t, "Table");
779 SHASH_FOR_EACH (node, &schema->tables) {
780 struct ovsdb_table_schema *ts = node->data;
781
782 table_add_row(&t);
783 table_add_cell(&t)->text = xstrdup(ts->name);
784 }
785 ovsdb_schema_destroy(schema);
786 table_print(&t, &table_style);
787 table_destroy(&t);
788 }
789
790 static void
791 do_list_columns(struct jsonrpc *rpc, const char *database,
792 int argc OVS_UNUSED, char *argv[])
793 {
794 const char *table_name = argv[0];
795 struct ovsdb_schema *schema;
796 struct shash_node *table_node;
797 struct table t;
798
799 schema = fetch_schema(rpc, database);
800 table_init(&t);
801 if (!table_name) {
802 table_add_column(&t, "Table");
803 }
804 table_add_column(&t, "Column");
805 table_add_column(&t, "Type");
806 SHASH_FOR_EACH (table_node, &schema->tables) {
807 struct ovsdb_table_schema *ts = table_node->data;
808
809 if (!table_name || !strcmp(table_name, ts->name)) {
810 struct shash_node *column_node;
811
812 SHASH_FOR_EACH (column_node, &ts->columns) {
813 const struct ovsdb_column *column = column_node->data;
814
815 table_add_row(&t);
816 if (!table_name) {
817 table_add_cell(&t)->text = xstrdup(ts->name);
818 }
819 table_add_cell(&t)->text = xstrdup(column->name);
820 table_add_cell(&t)->json = ovsdb_type_to_json(&column->type);
821 }
822 }
823 }
824 ovsdb_schema_destroy(schema);
825 table_print(&t, &table_style);
826 table_destroy(&t);
827 }
828
829 static void
830 send_db_change_aware(struct jsonrpc *rpc)
831 {
832 if (db_change_aware != 0) {
833 struct jsonrpc_msg *request = jsonrpc_create_request(
834 "set_db_change_aware",
835 json_array_create_1(json_boolean_create(true)),
836 NULL);
837 struct jsonrpc_msg *reply;
838 int error = jsonrpc_transact_block(rpc, request, &reply);
839 if (error) {
840 ovs_fatal(error, "%s: error setting db_change_aware",
841 jsonrpc_get_name(rpc));
842 }
843 if (reply->type == JSONRPC_ERROR && db_change_aware == 1) {
844 ovs_fatal(0, "%s: set_db_change_aware failed (%s)",
845 jsonrpc_get_name(rpc), json_to_string(reply->error, 0));
846 }
847 jsonrpc_msg_destroy(reply);
848 }
849 }
850
851 static struct json *
852 do_transact__(int argc, char *argv[], struct json *transaction)
853 {
854 struct jsonrpc_msg *request, *reply;
855 if (transaction->type != JSON_ARRAY
856 || !transaction->array.n
857 || transaction->array.elems[0]->type != JSON_STRING) {
858 ovs_fatal(0, "not a valid OVSDB query");
859 }
860 const char *db_name = json_string(transaction->array.elems[0]);
861
862 struct jsonrpc *rpc;
863 char *database = CONST_CAST(char *, db_name);
864 open_rpc(1, NEED_DATABASE, argc, argv, &rpc, &database);
865
866 if (db_change_aware == 1) {
867 send_db_change_aware(rpc);
868 }
869 daemon_save_fd(STDOUT_FILENO);
870 daemon_save_fd(STDERR_FILENO);
871 daemonize();
872
873 request = jsonrpc_create_request("transact", transaction, NULL);
874 check_txn(jsonrpc_transact_block(rpc, request, &reply), &reply);
875 struct json *result = json_clone(reply->result);
876 jsonrpc_msg_destroy(reply);
877 jsonrpc_close(rpc);
878
879 return result;
880 }
881
882 static void
883 do_transact(struct jsonrpc *rpc OVS_UNUSED, const char *database OVS_UNUSED,
884 int argc, char *argv[])
885 {
886 print_and_free_json(do_transact__(argc, argv, parse_json(argv[argc - 1])));
887 }
888
889 static void
890 do_query(struct jsonrpc *rpc OVS_UNUSED, const char *database OVS_UNUSED,
891 int argc, char *argv[])
892 {
893 struct json *transaction = parse_json(argv[argc - 1]);
894
895 if (transaction->type != JSON_ARRAY) {
896 ovs_fatal(0, "not a valid OVSDB query");
897 }
898
899 /* Append an "abort" operation to the query. */
900 struct json *abort_op = json_object_create();
901 json_object_put_string(abort_op, "op", "abort");
902 json_array_add(transaction, abort_op);
903 size_t abort_idx = transaction->array.n - 2;
904
905 /* Run query. */
906 struct json *result = do_transact__(argc, argv, transaction);
907
908 /* If the "abort" operation ended the transaction, remove its result. */
909 if (result->type == JSON_ARRAY
910 && result->array.n == abort_idx + 1
911 && result->array.elems[abort_idx]->type == JSON_OBJECT) {
912 struct json *op_result = result->array.elems[abort_idx];
913 struct json *error = shash_find_data(json_object(op_result), "error");
914 if (error
915 && error->type == JSON_STRING
916 && !strcmp(json_string(error), "aborted")) {
917 result->array.n--;
918 json_destroy(op_result);
919 }
920 }
921
922 /* Print the result. */
923 print_and_free_json(result);
924 }
925 \f
926 /* "monitor" command. */
927
928 struct monitored_table {
929 struct ovsdb_table_schema *table;
930 struct ovsdb_column_set columns;
931 };
932
933 static void
934 monitor_print_row(struct json *row, const char *type, const char *uuid,
935 const struct ovsdb_column_set *columns, struct table *t)
936 {
937 size_t i;
938
939 if (!row) {
940 ovs_error(0, "missing %s row", type);
941 return;
942 } else if (row->type != JSON_OBJECT) {
943 ovs_error(0, "<row> is not object");
944 return;
945 }
946
947 table_add_row(t);
948 table_add_cell(t)->text = xstrdup(uuid);
949 table_add_cell(t)->text = xstrdup(type);
950 for (i = 0; i < columns->n_columns; i++) {
951 const struct ovsdb_column *column = columns->columns[i];
952 struct json *value = shash_find_data(json_object(row), column->name);
953 struct cell *cell = table_add_cell(t);
954 if (value) {
955 cell->json = json_clone(value);
956 cell->type = &column->type;
957 }
958 }
959 }
960
961 static void
962 monitor_print_table(struct json *table_update,
963 const struct monitored_table *mt, char *caption,
964 bool initial)
965 {
966 const struct ovsdb_table_schema *table = mt->table;
967 const struct ovsdb_column_set *columns = &mt->columns;
968 struct shash_node *node;
969 struct table t;
970 size_t i;
971
972 if (table_update->type != JSON_OBJECT) {
973 ovs_error(0, "<table-update> for table %s is not object", table->name);
974 return;
975 }
976
977 table_init(&t);
978 table_set_timestamp(&t, timestamp);
979 table_set_caption(&t, caption);
980
981 table_add_column(&t, "row");
982 table_add_column(&t, "action");
983 for (i = 0; i < columns->n_columns; i++) {
984 table_add_column(&t, "%s", columns->columns[i]->name);
985 }
986 SHASH_FOR_EACH (node, json_object(table_update)) {
987 struct json *row_update = node->data;
988 struct json *old, *new;
989
990 if (row_update->type != JSON_OBJECT) {
991 ovs_error(0, "<row-update> is not object");
992 continue;
993 }
994 old = shash_find_data(json_object(row_update), "old");
995 new = shash_find_data(json_object(row_update), "new");
996 if (initial) {
997 monitor_print_row(new, "initial", node->name, columns, &t);
998 } else if (!old) {
999 monitor_print_row(new, "insert", node->name, columns, &t);
1000 } else if (!new) {
1001 monitor_print_row(old, "delete", node->name, columns, &t);
1002 } else {
1003 monitor_print_row(old, "old", node->name, columns, &t);
1004 monitor_print_row(new, "new", "", columns, &t);
1005 }
1006 }
1007 table_print(&t, &table_style);
1008 table_destroy(&t);
1009 }
1010
1011 static void
1012 monitor_print(struct json *table_updates,
1013 const struct monitored_table *mts, size_t n_mts,
1014 bool initial)
1015 {
1016 size_t i;
1017
1018 if (table_updates->type != JSON_OBJECT) {
1019 ovs_error(0, "<table-updates> is not object");
1020 return;
1021 }
1022
1023 for (i = 0; i < n_mts; i++) {
1024 const struct monitored_table *mt = &mts[i];
1025 struct json *table_update = shash_find_data(json_object(table_updates),
1026 mt->table->name);
1027 if (table_update) {
1028 monitor_print_table(table_update, mt,
1029 n_mts > 1 ? xstrdup(mt->table->name) : NULL,
1030 initial);
1031 }
1032 }
1033 }
1034
1035 static void
1036 monitor2_print_row(struct json *row, const char *type, const char *uuid,
1037 const struct ovsdb_column_set *columns, struct table *t)
1038 {
1039 if (!strcmp(type, "delete")) {
1040 if (row->type != JSON_NULL) {
1041 ovs_error(0, "delete method does not expect <row>");
1042 return;
1043 }
1044
1045 table_add_row(t);
1046 table_add_cell(t)->text = xstrdup(uuid);
1047 table_add_cell(t)->text = xstrdup(type);
1048 } else {
1049 if (!row || row->type != JSON_OBJECT) {
1050 ovs_error(0, "<row> is not object");
1051 return;
1052 }
1053 monitor_print_row(row, type, uuid, columns, t);
1054 }
1055 }
1056
1057 static void
1058 monitor2_print_table(struct json *table_update2,
1059 const struct monitored_table *mt, char *caption)
1060 {
1061 const struct ovsdb_table_schema *table = mt->table;
1062 const struct ovsdb_column_set *columns = &mt->columns;
1063 struct shash_node *node;
1064 struct table t;
1065
1066 if (table_update2->type != JSON_OBJECT) {
1067 ovs_error(0, "<table-update> for table %s is not object", table->name);
1068 return;
1069 }
1070
1071 table_init(&t);
1072 table_set_timestamp(&t, timestamp);
1073 table_set_caption(&t, caption);
1074
1075 table_add_column(&t, "row");
1076 table_add_column(&t, "action");
1077 for (size_t i = 0; i < columns->n_columns; i++) {
1078 table_add_column(&t, "%s", columns->columns[i]->name);
1079 }
1080 SHASH_FOR_EACH (node, json_object(table_update2)) {
1081 struct json *row_update2 = node->data;
1082 const char *operation;
1083 struct json *row;
1084 const char *ops[] = {"delete", "initial", "modify", "insert"};
1085
1086 if (row_update2->type != JSON_OBJECT) {
1087 ovs_error(0, "<row-update2> is not object");
1088 continue;
1089 }
1090
1091 /* row_update2 contains one of objects indexed by ops[] */
1092 for (int i = 0; i < ARRAY_SIZE(ops); i++) {
1093 operation = ops[i];
1094 row = shash_find_data(json_object(row_update2), operation);
1095
1096 if (row) {
1097 monitor2_print_row(row, operation, node->name, columns, &t);
1098 break;
1099 }
1100 }
1101 }
1102 table_print(&t, &table_style);
1103 table_destroy(&t);
1104 }
1105
1106 static void
1107 monitor2_print(struct json *table_updates2,
1108 const struct monitored_table *mts, size_t n_mts)
1109 {
1110 size_t i;
1111
1112 if (table_updates2->type != JSON_OBJECT) {
1113 ovs_error(0, "<table-updates2> is not object");
1114 return;
1115 }
1116
1117 for (i = 0; i < n_mts; i++) {
1118 const struct monitored_table *mt = &mts[i];
1119 struct json *table_update = shash_find_data(
1120 json_object(table_updates2),
1121 mt->table->name);
1122 if (table_update) {
1123 monitor2_print_table(table_update, mt,
1124 n_mts > 1 ? xstrdup(mt->table->name) : NULL);
1125 }
1126 }
1127 }
1128
1129 static void
1130 add_column(const char *server, const struct ovsdb_column *column,
1131 struct ovsdb_column_set *columns, struct json *columns_json)
1132 {
1133 if (ovsdb_column_set_contains(columns, column->index)) {
1134 ovs_fatal(0, "%s: column \"%s\" mentioned multiple times",
1135 server, column->name);
1136 }
1137 ovsdb_column_set_add(columns, column);
1138 json_array_add(columns_json, json_string_create(column->name));
1139 }
1140
1141 static struct json *
1142 parse_monitor_columns(char *arg, const char *server, const char *database,
1143 const struct ovsdb_table_schema *table,
1144 struct ovsdb_column_set *columns)
1145 {
1146 bool initial, insert, delete, modify;
1147 struct json *mr, *columns_json;
1148 char *save_ptr = NULL;
1149 char *token;
1150
1151 mr = json_object_create();
1152 columns_json = json_array_create_empty();
1153 json_object_put(mr, "columns", columns_json);
1154
1155 initial = insert = delete = modify = true;
1156 for (token = strtok_r(arg, ",", &save_ptr); token != NULL;
1157 token = strtok_r(NULL, ",", &save_ptr)) {
1158 if (!strcmp(token, "!initial")) {
1159 initial = false;
1160 } else if (!strcmp(token, "!insert")) {
1161 insert = false;
1162 } else if (!strcmp(token, "!delete")) {
1163 delete = false;
1164 } else if (!strcmp(token, "!modify")) {
1165 modify = false;
1166 } else {
1167 const struct ovsdb_column *column;
1168
1169 column = ovsdb_table_schema_get_column(table, token);
1170 if (!column) {
1171 ovs_fatal(0, "%s: table \"%s\" in %s does not have a "
1172 "column named \"%s\"",
1173 server, table->name, database, token);
1174 }
1175 add_column(server, column, columns, columns_json);
1176 }
1177 }
1178
1179 if (columns_json->array.n == 0) {
1180 const struct shash_node **nodes;
1181 size_t i, n;
1182
1183 n = shash_count(&table->columns);
1184 nodes = shash_sort(&table->columns);
1185 for (i = 0; i < n; i++) {
1186 const struct ovsdb_column *column = nodes[i]->data;
1187 if (column->index != OVSDB_COL_UUID
1188 && column->index != OVSDB_COL_VERSION) {
1189 add_column(server, column, columns, columns_json);
1190 }
1191 }
1192 free(nodes);
1193
1194 add_column(server, ovsdb_table_schema_get_column(table, "_version"),
1195 columns, columns_json);
1196 }
1197
1198 if (!initial || !insert || !delete || !modify) {
1199 struct json *select = json_object_create();
1200 json_object_put(select, "initial", json_boolean_create(initial));
1201 json_object_put(select, "insert", json_boolean_create(insert));
1202 json_object_put(select, "delete", json_boolean_create(delete));
1203 json_object_put(select, "modify", json_boolean_create(modify));
1204 json_object_put(mr, "select", select);
1205 }
1206
1207 return mr;
1208 }
1209
1210 static void
1211 ovsdb_client_exit(struct unixctl_conn *conn, int argc OVS_UNUSED,
1212 const char *argv[] OVS_UNUSED, void *exiting_)
1213 {
1214 bool *exiting = exiting_;
1215 *exiting = true;
1216 unixctl_command_reply(conn, NULL);
1217 }
1218
1219 static void
1220 ovsdb_client_block(struct unixctl_conn *conn, int argc OVS_UNUSED,
1221 const char *argv[] OVS_UNUSED, void *blocked_)
1222 {
1223 bool *blocked = blocked_;
1224
1225 if (!*blocked) {
1226 *blocked = true;
1227 unixctl_command_reply(conn, NULL);
1228 } else {
1229 unixctl_command_reply(conn, "already blocking");
1230 }
1231 }
1232
1233 static void
1234 ovsdb_client_unblock(struct unixctl_conn *conn, int argc OVS_UNUSED,
1235 const char *argv[] OVS_UNUSED, void *blocked_)
1236 {
1237 bool *blocked = blocked_;
1238
1239 if (*blocked) {
1240 *blocked = false;
1241 unixctl_command_reply(conn, NULL);
1242 } else {
1243 unixctl_command_reply(conn, "already unblocked");
1244 }
1245 }
1246
1247 static void
1248 ovsdb_client_cond_change(struct unixctl_conn *conn, int argc OVS_UNUSED,
1249 const char *argv[], void *rpc_)
1250 {
1251 struct jsonrpc *rpc = rpc_;
1252 struct json *monitor_cond_update_requests = json_object_create();
1253 struct json *monitor_cond_update_request = json_object_create();
1254 struct json *params;
1255 struct jsonrpc_msg *request;
1256
1257 json_object_put(monitor_cond_update_request, "where",
1258 json_from_string(argv[2]));
1259 json_object_put(monitor_cond_update_requests,
1260 argv[1],
1261 json_array_create_1(monitor_cond_update_request));
1262
1263 params = json_array_create_3(json_null_create(),json_null_create(),
1264 monitor_cond_update_requests);
1265
1266 request = jsonrpc_create_request("monitor_cond_change", params, NULL);
1267 jsonrpc_send(rpc, request);
1268
1269 VLOG_DBG("cond change %s %s", argv[1], argv[2]);
1270 unixctl_command_reply(conn, "condition changed");
1271 }
1272
1273 static void
1274 add_monitored_table(int argc, char *argv[],
1275 const char *server, const char *database,
1276 struct json *condition,
1277 struct ovsdb_table_schema *table,
1278 struct json *monitor_requests,
1279 struct monitored_table **mts,
1280 size_t *n_mts, size_t *allocated_mts)
1281 {
1282 struct json *monitor_request_array, *mr;
1283 struct monitored_table *mt;
1284
1285 if (*n_mts >= *allocated_mts) {
1286 *mts = x2nrealloc(*mts, allocated_mts, sizeof **mts);
1287 }
1288 mt = &(*mts)[(*n_mts)++];
1289 mt->table = table;
1290 ovsdb_column_set_init(&mt->columns);
1291
1292 monitor_request_array = json_array_create_empty();
1293 if (argc > 1) {
1294 int i;
1295
1296 for (i = 1; i < argc; i++) {
1297 mr = parse_monitor_columns(argv[i], server, database, table,
1298 &mt->columns);
1299 if (i == 1 && condition) {
1300 json_object_put(mr, "where", condition);
1301 }
1302 json_array_add(monitor_request_array, mr);
1303 }
1304 } else {
1305 /* Allocate a writable empty string since parse_monitor_columns()
1306 * is going to strtok() it and that's risky with literal "". */
1307 char empty[] = "";
1308
1309 mr = parse_monitor_columns(empty, server, database,
1310 table, &mt->columns);
1311 if (condition) {
1312 json_object_put(mr, "where", condition);
1313 }
1314 json_array_add(monitor_request_array, mr);
1315 }
1316
1317 json_object_put(monitor_requests, table->name, monitor_request_array);
1318 }
1319
1320 static void
1321 destroy_monitored_table(struct monitored_table *mts, size_t n)
1322 {
1323 int i;
1324
1325 for (i = 0; i < n; i++) {
1326 struct monitored_table *mt = &mts[i];
1327 ovsdb_column_set_destroy(&mt->columns);
1328 }
1329
1330 free(mts);
1331 }
1332
1333 static void
1334 do_monitor__(struct jsonrpc *rpc, const char *database,
1335 enum ovsdb_monitor_version version,
1336 int argc, char *argv[], struct json *condition)
1337 {
1338 const char *server = jsonrpc_get_name(rpc);
1339 const char *table_name = argv[0];
1340 struct unixctl_server *unixctl;
1341 struct ovsdb_schema *schema;
1342 struct json *monitor, *monitor_requests, *request_id;
1343 bool exiting = false;
1344 bool blocked = false;
1345
1346 struct monitored_table *mts;
1347 size_t n_mts, allocated_mts;
1348
1349 ovs_assert(version < OVSDB_MONITOR_VERSION_MAX);
1350
1351 daemon_save_fd(STDOUT_FILENO);
1352 daemon_save_fd(STDERR_FILENO);
1353 daemonize_start(false);
1354 if (get_detach()) {
1355 int error;
1356
1357 error = unixctl_server_create(NULL, &unixctl);
1358 if (error) {
1359 ovs_fatal(error, "failed to create unixctl server");
1360 }
1361
1362 unixctl_command_register("exit", "", 0, 0,
1363 ovsdb_client_exit, &exiting);
1364 unixctl_command_register("ovsdb-client/block", "", 0, 0,
1365 ovsdb_client_block, &blocked);
1366 unixctl_command_register("ovsdb-client/unblock", "", 0, 0,
1367 ovsdb_client_unblock, &blocked);
1368 unixctl_command_register("ovsdb-client/cond_change", "TABLE COND", 2, 2,
1369 ovsdb_client_cond_change, rpc);
1370 } else {
1371 unixctl = NULL;
1372 }
1373
1374 schema = fetch_schema(rpc, database);
1375
1376 monitor_requests = json_object_create();
1377
1378 mts = NULL;
1379 n_mts = allocated_mts = 0;
1380 if (strcmp(table_name, "ALL")) {
1381 struct ovsdb_table_schema *table;
1382
1383 table = shash_find_data(&schema->tables, table_name);
1384 if (!table) {
1385 ovs_fatal(0, "%s: %s does not have a table named \"%s\"",
1386 server, database, table_name);
1387 }
1388
1389 add_monitored_table(argc, argv, server, database, condition, table,
1390 monitor_requests, &mts, &n_mts, &allocated_mts);
1391 } else {
1392 size_t n = shash_count(&schema->tables);
1393 const struct shash_node **nodes = shash_sort(&schema->tables);
1394 size_t i;
1395
1396 if (condition) {
1397 ovs_fatal(0, "ALL tables are not allowed with condition");
1398 }
1399
1400 for (i = 0; i < n; i++) {
1401 struct ovsdb_table_schema *table = nodes[i]->data;
1402
1403 add_monitored_table(argc, argv, server, database, NULL, table,
1404 monitor_requests,
1405 &mts, &n_mts, &allocated_mts);
1406 }
1407 free(nodes);
1408 }
1409
1410 send_db_change_aware(rpc);
1411
1412 monitor = json_array_create_3(json_string_create(database),
1413 json_null_create(), monitor_requests);
1414 const char *method = version == OVSDB_MONITOR_V2 ? "monitor_cond"
1415 : "monitor";
1416
1417 struct jsonrpc_msg *request;
1418 request = jsonrpc_create_request(method, monitor, NULL);
1419 request_id = json_clone(request->id);
1420 jsonrpc_send(rpc, request);
1421
1422 for (;;) {
1423 unixctl_server_run(unixctl);
1424 while (!blocked) {
1425 struct jsonrpc_msg *msg;
1426 int error;
1427
1428 error = jsonrpc_recv(rpc, &msg);
1429 if (error == EAGAIN) {
1430 break;
1431 } else if (error) {
1432 ovs_fatal(error, "%s: receive failed", server);
1433 }
1434
1435 if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
1436 jsonrpc_send(rpc, jsonrpc_create_reply(json_clone(msg->params),
1437 msg->id));
1438 } else if (msg->type == JSONRPC_REPLY
1439 && json_equal(msg->id, request_id)) {
1440 switch(version) {
1441 case OVSDB_MONITOR_V1:
1442 monitor_print(msg->result, mts, n_mts, true);
1443 break;
1444 case OVSDB_MONITOR_V2:
1445 monitor2_print(msg->result, mts, n_mts);
1446 break;
1447 case OVSDB_MONITOR_VERSION_MAX:
1448 default:
1449 OVS_NOT_REACHED();
1450 }
1451 fflush(stdout);
1452 daemonize_complete();
1453 } else if (msg->type == JSONRPC_NOTIFY
1454 && !strcmp(msg->method, "update")) {
1455 struct json *params = msg->params;
1456 if (params->type == JSON_ARRAY
1457 && params->array.n == 2
1458 && params->array.elems[0]->type == JSON_NULL) {
1459 monitor_print(params->array.elems[1], mts, n_mts, false);
1460 fflush(stdout);
1461 }
1462 } else if (msg->type == JSONRPC_NOTIFY
1463 && version == OVSDB_MONITOR_V2
1464 && !strcmp(msg->method, "update2")) {
1465 struct json *params = msg->params;
1466 if (params->type == JSON_ARRAY
1467 && params->array.n == 2
1468 && params->array.elems[0]->type == JSON_NULL) {
1469 monitor2_print(params->array.elems[1], mts, n_mts);
1470 fflush(stdout);
1471 }
1472 } else if (msg->type == JSONRPC_NOTIFY
1473 && !strcmp(msg->method, "monitor_canceled")) {
1474 ovs_fatal(0, "%s: %s database was removed",
1475 server, database);
1476 }
1477 jsonrpc_msg_destroy(msg);
1478 }
1479
1480 if (exiting) {
1481 break;
1482 }
1483
1484 jsonrpc_run(rpc);
1485 jsonrpc_wait(rpc);
1486 if (!blocked) {
1487 jsonrpc_recv_wait(rpc);
1488 }
1489 unixctl_server_wait(unixctl);
1490 poll_block();
1491 }
1492
1493 json_destroy(request_id);
1494 unixctl_server_destroy(unixctl);
1495 ovsdb_schema_destroy(schema);
1496 destroy_monitored_table(mts, n_mts);
1497 }
1498
1499 static void
1500 do_monitor(struct jsonrpc *rpc, const char *database,
1501 int argc, char *argv[])
1502 {
1503 do_monitor__(rpc, database, OVSDB_MONITOR_V1, argc, argv, NULL);
1504 }
1505
1506 static void
1507 do_monitor_cond(struct jsonrpc *rpc, const char *database,
1508 int argc, char *argv[])
1509 {
1510 struct ovsdb_condition cnd;
1511 struct json *condition = NULL;
1512 struct ovsdb_schema *schema;
1513 struct ovsdb_table_schema *table;
1514 const char *table_name = argv[1];
1515
1516 ovs_assert(argc > 1);
1517 schema = fetch_schema(rpc, database);
1518 table = shash_find_data(&schema->tables, table_name);
1519 if (!table) {
1520 ovs_fatal(0, "%s does not have a table named \"%s\"",
1521 database, table_name);
1522 }
1523 condition = parse_json(argv[0]);
1524 check_ovsdb_error(ovsdb_condition_from_json(table, condition,
1525 NULL, &cnd));
1526 ovsdb_condition_destroy(&cnd);
1527 do_monitor__(rpc, database, OVSDB_MONITOR_V2, --argc, ++argv, condition);
1528 ovsdb_schema_destroy(schema);
1529 }
1530
1531 static bool
1532 is_database_clustered(struct jsonrpc *rpc, const char *database)
1533 {
1534 struct jsonrpc_msg *reply;
1535 check_txn(jsonrpc_transact_block(rpc,
1536 create_database_info_request(database),
1537 &reply), &reply);
1538
1539 const struct json *row = parse_database_info_reply(
1540 reply, jsonrpc_get_name(rpc), database, NULL);
1541 return !strcmp(parse_string_column(row, "model"), "clustered");
1542 }
1543
1544 static void
1545 do_convert(struct jsonrpc *rpc, const char *database_ OVS_UNUSED,
1546 int argc, char *argv[])
1547 {
1548 const char *schema_file_name = argv[argc - 1];
1549 struct ovsdb_schema *new_schema;
1550 check_ovsdb_error(ovsdb_schema_from_file(schema_file_name, &new_schema));
1551
1552 char *database = new_schema->name;
1553 open_rpc(1, NEED_DATABASE, argc, argv, &rpc, &database);
1554
1555 if (is_database_clustered(rpc, database)) {
1556 ovsdb_schema_persist_ephemeral_columns(new_schema, schema_file_name);
1557 }
1558
1559 send_db_change_aware(rpc);
1560
1561 struct jsonrpc_msg *request, *reply;
1562 request = jsonrpc_create_request(
1563 "convert",
1564 json_array_create_2(json_string_create(new_schema->name),
1565 ovsdb_schema_to_json(new_schema)), NULL);
1566 check_txn(jsonrpc_transact_block(rpc, request, &reply), &reply);
1567 jsonrpc_msg_destroy(reply);
1568 }
1569
1570 static void
1571 do_needs_conversion(struct jsonrpc *rpc, const char *database_ OVS_UNUSED,
1572 int argc OVS_UNUSED, char *argv[])
1573 {
1574 struct ovsdb_schema *schema1;
1575 check_ovsdb_error(ovsdb_schema_from_file(argv[0], &schema1));
1576
1577 char *database = schema1->name;
1578 open_rpc(1, NEED_DATABASE, argc, argv, &rpc, &database);
1579
1580 if (is_database_clustered(rpc, database)) {
1581 ovsdb_schema_persist_ephemeral_columns(schema1, argv[0]);
1582 }
1583
1584 struct ovsdb_schema *schema2 = fetch_schema(rpc, schema1->name);
1585 puts(ovsdb_schema_equal(schema1, schema2) ? "no" : "yes");
1586 ovsdb_schema_destroy(schema1);
1587 ovsdb_schema_destroy(schema2);
1588 }
1589
1590 struct dump_table_aux {
1591 struct ovsdb_datum **data;
1592 const struct ovsdb_column **columns;
1593 size_t n_columns;
1594 };
1595
1596 static int
1597 compare_data(size_t a_y, size_t b_y, size_t x,
1598 const struct dump_table_aux *aux)
1599 {
1600 return ovsdb_datum_compare_3way(&aux->data[a_y][x],
1601 &aux->data[b_y][x],
1602 &aux->columns[x]->type);
1603 }
1604
1605 static int
1606 compare_rows(size_t a_y, size_t b_y, void *aux_)
1607 {
1608 struct dump_table_aux *aux = aux_;
1609 size_t x;
1610
1611 /* Skip UUID columns on the first pass, since their values tend to be
1612 * random and make our results less reproducible. */
1613 for (x = 0; x < aux->n_columns; x++) {
1614 if (aux->columns[x]->type.key.type != OVSDB_TYPE_UUID) {
1615 int cmp = compare_data(a_y, b_y, x, aux);
1616 if (cmp) {
1617 return cmp;
1618 }
1619 }
1620 }
1621
1622 /* Use UUID columns as tie-breakers. */
1623 for (x = 0; x < aux->n_columns; x++) {
1624 if (aux->columns[x]->type.key.type == OVSDB_TYPE_UUID) {
1625 int cmp = compare_data(a_y, b_y, x, aux);
1626 if (cmp) {
1627 return cmp;
1628 }
1629 }
1630 }
1631
1632 return 0;
1633 }
1634
1635 static void
1636 swap_rows(size_t a_y, size_t b_y, void *aux_)
1637 {
1638 struct dump_table_aux *aux = aux_;
1639 struct ovsdb_datum *tmp = aux->data[a_y];
1640 aux->data[a_y] = aux->data[b_y];
1641 aux->data[b_y] = tmp;
1642 }
1643
1644 static int
1645 compare_columns(const void *a_, const void *b_)
1646 {
1647 const struct ovsdb_column *const *ap = a_;
1648 const struct ovsdb_column *const *bp = b_;
1649 const struct ovsdb_column *a = *ap;
1650 const struct ovsdb_column *b = *bp;
1651
1652 return strcmp(a->name, b->name);
1653 }
1654
1655 static void
1656 dump_table(const char *table_name, const struct shash *cols,
1657 struct json_array *rows)
1658 {
1659 const struct ovsdb_column **columns;
1660 size_t n_columns;
1661
1662 struct ovsdb_datum **data;
1663
1664 struct dump_table_aux aux;
1665 struct shash_node *node;
1666 struct table t;
1667 size_t x, y;
1668
1669 /* Sort columns by name, for reproducibility. */
1670 columns = xmalloc(shash_count(cols) * sizeof *columns);
1671 n_columns = 0;
1672 SHASH_FOR_EACH (node, cols) {
1673 struct ovsdb_column *column = node->data;
1674 if (strcmp(column->name, "_version")) {
1675 columns[n_columns++] = column;
1676 }
1677 }
1678 qsort(columns, n_columns, sizeof *columns, compare_columns);
1679
1680 /* Extract data from table. */
1681 data = xmalloc(rows->n * sizeof *data);
1682 for (y = 0; y < rows->n; y++) {
1683 struct shash *row;
1684
1685 if (rows->elems[y]->type != JSON_OBJECT) {
1686 ovs_fatal(0, "row %"PRIuSIZE" in table %s response is not a JSON object: "
1687 "%s", y, table_name, json_to_string(rows->elems[y], 0));
1688 }
1689 row = json_object(rows->elems[y]);
1690
1691 data[y] = xmalloc(n_columns * sizeof **data);
1692 for (x = 0; x < n_columns; x++) {
1693 const struct json *json = shash_find_data(row, columns[x]->name);
1694 if (!json) {
1695 ovs_fatal(0, "row %"PRIuSIZE" in table %s response lacks %s column",
1696 y, table_name, columns[x]->name);
1697 }
1698
1699 check_ovsdb_error(ovsdb_unconstrained_datum_from_json(
1700 &data[y][x], &columns[x]->type, json));
1701 }
1702 }
1703
1704 /* Sort rows by column values, for reproducibility. */
1705 aux.data = data;
1706 aux.columns = columns;
1707 aux.n_columns = n_columns;
1708 sort(rows->n, compare_rows, swap_rows, &aux);
1709
1710 /* Add column headings. */
1711 table_init(&t);
1712 table_set_caption(&t, xasprintf("%s table", table_name));
1713 for (x = 0; x < n_columns; x++) {
1714 table_add_column(&t, "%s", columns[x]->name);
1715 }
1716
1717 /* Print rows. */
1718 for (y = 0; y < rows->n; y++) {
1719 table_add_row(&t);
1720 for (x = 0; x < n_columns; x++) {
1721 struct cell *cell = table_add_cell(&t);
1722 cell->json = ovsdb_datum_to_json(&data[y][x], &columns[x]->type);
1723 cell->type = &columns[x]->type;
1724 ovsdb_datum_destroy(&data[y][x], &columns[x]->type);
1725 }
1726 free(data[y]);
1727 }
1728 table_print(&t, &table_style);
1729 table_destroy(&t);
1730
1731 free(data);
1732 free(columns);
1733 }
1734
1735 static void
1736 do_dump(struct jsonrpc *rpc, const char *database,
1737 int argc, char *argv[])
1738 {
1739 struct jsonrpc_msg *request, *reply;
1740 struct ovsdb_schema *schema;
1741 struct json *transaction;
1742
1743 const struct shash_node *node, **tables;
1744 size_t n_tables;
1745 struct ovsdb_table_schema *tschema;
1746 const struct shash *columns;
1747 struct shash custom_columns;
1748
1749 size_t i;
1750
1751 shash_init(&custom_columns);
1752 schema = fetch_schema(rpc, database);
1753 if (argc) {
1754 node = shash_find(&schema->tables, argv[0]);
1755 if (!node) {
1756 ovs_fatal(0, "No table \"%s\" found.", argv[0]);
1757 }
1758 tables = xmemdup(&node, sizeof node);
1759 n_tables = 1;
1760 tschema = tables[0]->data;
1761 for (i = 1; i < argc; i++) {
1762 node = shash_find(&tschema->columns, argv[i]);
1763 if (!node) {
1764 ovs_fatal(0, "Table \"%s\" has no column %s.", argv[0], argv[i]);
1765 }
1766 shash_add(&custom_columns, argv[i], node->data);
1767 }
1768 } else {
1769 tables = shash_sort(&schema->tables);
1770 n_tables = shash_count(&schema->tables);
1771 }
1772
1773 /* Construct transaction to retrieve entire database. */
1774 transaction = json_array_create_1(json_string_create(database));
1775 for (i = 0; i < n_tables; i++) {
1776 const struct ovsdb_table_schema *ts = tables[i]->data;
1777 struct json *op, *jcolumns;
1778
1779 if (argc > 1) {
1780 columns = &custom_columns;
1781 } else {
1782 columns = &ts->columns;
1783 }
1784 jcolumns = json_array_create_empty();
1785 SHASH_FOR_EACH (node, columns) {
1786 const struct ovsdb_column *column = node->data;
1787
1788 if (strcmp(column->name, "_version")) {
1789 json_array_add(jcolumns, json_string_create(column->name));
1790 }
1791 }
1792
1793 op = json_object_create();
1794 json_object_put_string(op, "op", "select");
1795 json_object_put_string(op, "table", tables[i]->name);
1796 json_object_put(op, "where", json_array_create_empty());
1797 json_object_put(op, "columns", jcolumns);
1798 json_array_add(transaction, op);
1799 }
1800
1801 /* Send request, get reply. */
1802 request = jsonrpc_create_request("transact", transaction, NULL);
1803 check_txn(jsonrpc_transact_block(rpc, request, &reply), &reply);
1804
1805 /* Print database contents. */
1806 if (reply->result->type != JSON_ARRAY
1807 || reply->result->array.n != n_tables) {
1808 ovs_fatal(0, "reply is not array of %"PRIuSIZE" elements: %s",
1809 n_tables, json_to_string(reply->result, 0));
1810 }
1811 for (i = 0; i < n_tables; i++) {
1812 const struct ovsdb_table_schema *ts = tables[i]->data;
1813 const struct json *op_result = reply->result->array.elems[i];
1814 struct json *rows;
1815
1816 if (op_result->type != JSON_OBJECT
1817 || !(rows = shash_find_data(json_object(op_result), "rows"))
1818 || rows->type != JSON_ARRAY) {
1819 ovs_fatal(0, "%s table reply is not an object with a \"rows\" "
1820 "member array: %s",
1821 ts->name, json_to_string(op_result, 0));
1822 }
1823
1824 if (argc > 1) {
1825 dump_table(tables[i]->name, &custom_columns, &rows->array);
1826 } else {
1827 dump_table(tables[i]->name, &ts->columns, &rows->array);
1828 }
1829 }
1830
1831 jsonrpc_msg_destroy(reply);
1832 shash_destroy(&custom_columns);
1833 free(tables);
1834 ovsdb_schema_destroy(schema);
1835 }
1836
1837 static void
1838 print_and_free_log_record(struct json *record)
1839 {
1840 struct ds header = DS_EMPTY_INITIALIZER;
1841 struct ds data = DS_EMPTY_INITIALIZER;
1842 ovsdb_log_compose_record(record, OVSDB_MAGIC, &header, &data);
1843 fwrite(header.string, header.length, 1, stdout);
1844 fwrite(data.string, data.length, 1, stdout);
1845 ds_destroy(&data);
1846 ds_destroy(&header);
1847 json_destroy(record);
1848 }
1849
1850 static void
1851 set_binary_mode(FILE *stream OVS_UNUSED)
1852 {
1853 #ifdef _WIN32
1854 fflush(stream);
1855 /* On Windows set binary mode on the file descriptor to avoid
1856 * translation (i.e. CRLF line endings). */
1857 if (_setmode(_fileno(stream), O_BINARY) == -1) {
1858 ovs_fatal(errno, "could not set binary mode on fd %d",
1859 _fileno(stream));
1860 }
1861 #endif
1862 }
1863
1864 static void
1865 do_backup(struct jsonrpc *rpc, const char *database,
1866 int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
1867 {
1868 if (isatty(STDOUT_FILENO)) {
1869 ovs_fatal(0, "not writing backup to a terminal; "
1870 "please redirect stdout to a file");
1871 }
1872 set_binary_mode(stdout);
1873
1874 /* Get schema. */
1875 struct ovsdb_schema *schema = fetch_schema(rpc, database);
1876
1877 /* Construct transaction to retrieve all tables. */
1878 struct json *txn = json_array_create_1(json_string_create(database));
1879 struct shash_node *node;
1880 SHASH_FOR_EACH (node, &schema->tables) {
1881 const char *table_name = node->name;
1882 const struct ovsdb_table_schema *table = node->data;
1883
1884 /* Get all the columns except _version and the ephemeral ones.
1885 *
1886 * We don't omit tables that only have ephemeral columns because of the
1887 * possibility that other tables references rows in those tables; that
1888 * is, even if all the columns are ephemeral, the rows themselves are
1889 * not. */
1890 struct json *columns = json_array_create_empty();
1891 struct shash_node *node2;
1892 SHASH_FOR_EACH (node2, &table->columns) {
1893 const struct ovsdb_column *column = node2->data;
1894
1895 if (column->persistent) {
1896 if (!columns) {
1897 columns = json_array_create_empty();
1898 }
1899 json_array_add(columns, json_string_create(column->name));
1900 }
1901 }
1902
1903 struct json *op = json_object_create();
1904 json_object_put_string(op, "op", "select");
1905 json_object_put_string(op, "table", table_name);
1906 json_object_put(op, "where", json_array_create_empty());
1907 json_object_put(op, "columns", columns);
1908 json_array_add(txn, op);
1909 }
1910
1911 /* Send request, get reply. */
1912 struct jsonrpc_msg *rq = jsonrpc_create_request("transact", txn, NULL);
1913 struct jsonrpc_msg *reply;
1914 check_txn(jsonrpc_transact_block(rpc, rq, &reply), &reply);
1915
1916 /* Print schema record. */
1917 print_and_free_log_record(ovsdb_schema_to_json(schema));
1918
1919 /* Print database transaction record. */
1920 if (reply->result->type != JSON_ARRAY
1921 || reply->result->array.n != shash_count(&schema->tables)) {
1922 ovs_fatal(0, "reply is not array of %"PRIuSIZE" elements: %s",
1923 shash_count(&schema->tables),
1924 json_to_string(reply->result, 0));
1925 }
1926 struct json *output_txn = json_object_create();
1927
1928 size_t i = 0;
1929 SHASH_FOR_EACH (node, &schema->tables) {
1930 const char *table_name = node->name;
1931 const struct ovsdb_table_schema *table = node->data;
1932 const struct json *op_result = reply->result->array.elems[i++];
1933 struct json *rows;
1934
1935 if (op_result->type != JSON_OBJECT
1936 || !(rows = shash_find_data(json_object(op_result), "rows"))
1937 || rows->type != JSON_ARRAY) {
1938 ovs_fatal(0, "%s table reply is not an object with a \"rows\" "
1939 "member array: %s",
1940 table->name, json_to_string(op_result, 0));
1941 }
1942
1943 if (!rows->array.n) {
1944 continue;
1945 }
1946
1947 struct json *output_rows = json_object_create();
1948 for (size_t j = 0; j < rows->array.n; j++) {
1949 struct json *row = rows->array.elems[j];
1950 if (row->type != JSON_OBJECT) {
1951 ovs_fatal(0, "%s table reply row is not an object: %s",
1952 table_name, json_to_string(row, 0));
1953 }
1954
1955 struct json *uuid_json = shash_find_and_delete(json_object(row),
1956 "_uuid");
1957 if (!uuid_json) {
1958 ovs_fatal(0, "%s table reply row lacks _uuid member: %s",
1959 table_name, json_to_string(row, 0));
1960 }
1961
1962 const struct ovsdb_base_type uuid_base = OVSDB_BASE_UUID_INIT;
1963 union ovsdb_atom atom;
1964 check_ovsdb_error(ovsdb_atom_from_json(&atom, &uuid_base,
1965 uuid_json, NULL));
1966
1967 char uuid_s[UUID_LEN + 1];
1968 snprintf(uuid_s, sizeof uuid_s, UUID_FMT, UUID_ARGS(&atom.uuid));
1969 json_object_put(output_rows, uuid_s, json_clone(row));
1970 }
1971 json_object_put(output_txn, table_name, output_rows);
1972 }
1973 output_txn = ovsdb_file_txn_annotate(
1974 output_txn, "produced by \"ovsdb-client backup\"");
1975 print_and_free_log_record(output_txn);
1976
1977 ovsdb_schema_destroy(schema);
1978 jsonrpc_msg_destroy(reply);
1979 }
1980
1981 static void
1982 check_transaction_reply(struct jsonrpc_msg *reply)
1983 {
1984 if (reply->result->type != JSON_ARRAY) {
1985 ovs_fatal(0, "result is not array");
1986 }
1987 for (size_t i = 0; i < json_array(reply->result)->n; i++) {
1988 struct json *json = json_array(reply->result)->elems[i];
1989 if (json->type != JSON_OBJECT) {
1990 ovs_fatal(0, "result array element is not object");
1991 }
1992 struct shash *object = json_object(json);
1993 if (shash_find(object, "error")) {
1994 ovs_fatal(0, "server returned error reply: %s",
1995 json_to_string(json, JSSF_SORT));
1996 }
1997 }
1998 }
1999
2000 static void
2001 do_restore(struct jsonrpc *rpc, const char *database,
2002 int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
2003 {
2004 if (isatty(STDIN_FILENO)) {
2005 ovs_fatal(0, "not reading backup from a terminal; "
2006 "please redirect stdin from a file");
2007 }
2008 set_binary_mode(stdin);
2009
2010 struct ovsdb *backup = ovsdb_file_read("/dev/stdin", false);
2011 ovsdb_storage_close(backup->storage);
2012 backup->storage = NULL;
2013
2014 struct ovsdb_schema *online_schema = fetch_schema(rpc, database);
2015 if (!ovsdb_schema_equal(backup->schema, online_schema)) {
2016 struct ds s = DS_EMPTY_INITIALIZER;
2017 if (strcmp(backup->schema->version, online_schema->version)) {
2018 ds_put_format(&s, "backup schema has version \"%s\" but "
2019 "database schema has version \"%s\"",
2020 backup->schema->version, online_schema->version);
2021 } else {
2022 ds_put_format(&s, "backup schema and database schema are "
2023 "both version %s but still differ",
2024 backup->schema->version);
2025 }
2026 if (!force) {
2027 ovs_fatal(0, "%s (use --force to override differences, or "
2028 "\"ovsdb-client convert\" to change the schema)",
2029 ds_cstr(&s));
2030 }
2031 VLOG_INFO("%s", ds_cstr(&s));
2032 ds_destroy(&s);
2033 }
2034 ovsdb_schema_destroy(online_schema);
2035
2036 struct json *txn = json_array_create_empty();
2037 json_array_add(txn, json_string_create(backup->schema->name));
2038 struct shash_node *node;
2039 SHASH_FOR_EACH (node, &backup->tables) {
2040 const char *table_name = node->name;
2041 struct ovsdb_table *table = node->data;
2042
2043 struct json *del_op = json_object_create();
2044 json_object_put_string(del_op, "op", "delete");
2045 json_object_put_string(del_op, "table", table_name);
2046 json_object_put(del_op, "where", json_array_create_empty());
2047 json_array_add(txn, del_op);
2048
2049 const struct ovsdb_row *row;
2050 HMAP_FOR_EACH (row, hmap_node, &table->rows) {
2051 struct json *ins_op = json_object_create();
2052 json_object_put_string(ins_op, "op", "insert");
2053 json_object_put_string(ins_op, "table", table_name);
2054 json_object_put(ins_op, "uuid-name",
2055 json_string_create_nocopy(
2056 ovsdb_data_row_name(ovsdb_row_get_uuid(row))));
2057 struct json *row_json = json_object_create();
2058 json_object_put(ins_op, "row", row_json);
2059
2060 struct shash_node *node2;
2061 SHASH_FOR_EACH (node2, &table->schema->columns) {
2062 const struct ovsdb_column *column = node2->data;
2063 const struct ovsdb_datum *datum = &row->fields[column->index];
2064 const struct ovsdb_type *type = &column->type;
2065 if (column->persistent
2066 && column->index >= OVSDB_N_STD_COLUMNS
2067 && !ovsdb_datum_is_default(datum, type)) {
2068 struct json *value = ovsdb_datum_to_json_with_row_names(
2069 datum, type);
2070 json_object_put(row_json, column->name, value);
2071 }
2072 }
2073 json_array_add(txn, ins_op);
2074 }
2075 }
2076 ovsdb_destroy(backup);
2077 struct jsonrpc_msg *rq = jsonrpc_create_request("transact", txn, NULL);
2078 struct jsonrpc_msg *reply;
2079 check_txn(jsonrpc_transact_block(rpc, rq, &reply), &reply);
2080 check_transaction_reply(reply);
2081 jsonrpc_msg_destroy(reply);
2082 }
2083
2084
2085 static void
2086 do_help(struct jsonrpc *rpc OVS_UNUSED, const char *database OVS_UNUSED,
2087 int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
2088 {
2089 usage();
2090 }
2091
2092 \f
2093 /* "lock" command. */
2094
2095 struct ovsdb_client_lock_req {
2096 const char *method;
2097 char *lock;
2098 };
2099
2100 static void
2101 lock_req_init(struct ovsdb_client_lock_req *lock_req,
2102 const char *method, const char *lock_name)
2103 {
2104 if (lock_req->method || lock_req->lock) {
2105 return;
2106 }
2107 lock_req->method = method;
2108 lock_req->lock = xstrdup(lock_name);
2109 }
2110
2111 static bool
2112 lock_req_is_set(struct ovsdb_client_lock_req *lock_req)
2113 {
2114 return lock_req->method;
2115 }
2116
2117 static void
2118 lock_req_destroy(struct ovsdb_client_lock_req *lock_req)
2119 {
2120 free(lock_req->lock);
2121 lock_req->method = NULL;
2122 lock_req->lock = NULL;
2123 }
2124
2125 /* Create a lock class request. Caller is responsible for free
2126 * the 'request' message. */
2127 static struct jsonrpc_msg *
2128 create_lock_request(struct ovsdb_client_lock_req *lock_req)
2129 {
2130 struct json *locks, *lock;
2131
2132 locks = json_array_create_empty();
2133 lock = json_string_create(lock_req->lock);
2134 json_array_add(locks, lock);
2135
2136 return jsonrpc_create_request(lock_req->method, locks, NULL);
2137 }
2138
2139 static void
2140 ovsdb_client_lock(struct unixctl_conn *conn, int argc OVS_UNUSED,
2141 const char *argv[], void *lock_req_)
2142 {
2143 struct ovsdb_client_lock_req *lock_req = lock_req_;
2144 lock_req_init(lock_req, "lock", argv[1]);
2145 unixctl_command_reply(conn, NULL);
2146 }
2147
2148 static void
2149 ovsdb_client_unlock(struct unixctl_conn *conn, int argc OVS_UNUSED,
2150 const char *argv[], void *lock_req_)
2151 {
2152 struct ovsdb_client_lock_req *lock_req = lock_req_;
2153 lock_req_init(lock_req, "unlock", argv[1]);
2154 unixctl_command_reply(conn, NULL);
2155 }
2156
2157 static void
2158 ovsdb_client_steal(struct unixctl_conn *conn, int argc OVS_UNUSED,
2159 const char *argv[], void *lock_req_)
2160 {
2161 struct ovsdb_client_lock_req *lock_req = lock_req_;
2162 lock_req_init(lock_req, "steal", argv[1]);
2163 unixctl_command_reply(conn, NULL);
2164 }
2165
2166 static void
2167 do_lock(struct jsonrpc *rpc, const char *method, const char *lock)
2168 {
2169 struct ovsdb_client_lock_req lock_req = {NULL, NULL};
2170 struct unixctl_server *unixctl;
2171 struct jsonrpc_msg *request;
2172 struct json *request_id = NULL;
2173 bool exiting = false;
2174 bool enable_lock_request = true; /* Don't send another request before
2175 getting a reply of the previous
2176 request. */
2177 daemon_save_fd(STDOUT_FILENO);
2178 daemonize_start(false);
2179 lock_req_init(&lock_req, method, lock);
2180
2181 if (get_detach()) {
2182 int error;
2183
2184 error = unixctl_server_create(NULL, &unixctl);
2185 if (error) {
2186 ovs_fatal(error, "failed to create unixctl server");
2187 }
2188
2189 unixctl_command_register("unlock", "LOCK", 1, 1,
2190 ovsdb_client_unlock, &lock_req);
2191 unixctl_command_register("steal", "LOCK", 1, 1,
2192 ovsdb_client_steal, &lock_req);
2193 unixctl_command_register("lock", "LOCK", 1, 1,
2194 ovsdb_client_lock, &lock_req);
2195 unixctl_command_register("exit", "", 0, 0,
2196 ovsdb_client_exit, &exiting);
2197 } else {
2198 unixctl = NULL;
2199 }
2200
2201 for (;;) {
2202 struct jsonrpc_msg *msg;
2203 int error;
2204
2205 unixctl_server_run(unixctl);
2206 if (enable_lock_request && lock_req_is_set(&lock_req)) {
2207 request = create_lock_request(&lock_req);
2208 request_id = json_clone(request->id);
2209 jsonrpc_send(rpc, request);
2210 lock_req_destroy(&lock_req);
2211 }
2212
2213 error = jsonrpc_recv(rpc, &msg);
2214 if (error == EAGAIN) {
2215 goto no_msg;
2216 } else if (error) {
2217 ovs_fatal(error, "%s: receive failed", jsonrpc_get_name(rpc));
2218 }
2219
2220 if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
2221 jsonrpc_send(rpc, jsonrpc_create_reply(json_clone(msg->params),
2222 msg->id));
2223 } else if (msg->type == JSONRPC_REPLY
2224 && json_equal(msg->id, request_id)) {
2225 print_json(msg->result);
2226 fflush(stdout);
2227 enable_lock_request = true;
2228 json_destroy(request_id);
2229 request_id = NULL;
2230 daemonize_complete();
2231 } else if (msg->type == JSONRPC_NOTIFY) {
2232 puts(msg->method);
2233 print_json(msg->params);
2234 fflush(stdout);
2235 }
2236
2237 jsonrpc_msg_destroy(msg);
2238
2239 no_msg:
2240 if (exiting) {
2241 break;
2242 }
2243
2244 jsonrpc_run(rpc);
2245 jsonrpc_wait(rpc);
2246 jsonrpc_recv_wait(rpc);
2247
2248 unixctl_server_wait(unixctl);
2249 poll_block();
2250 }
2251
2252 json_destroy(request_id);
2253 unixctl_server_destroy(unixctl);
2254 }
2255
2256 static void
2257 do_lock_create(struct jsonrpc *rpc, const char *database OVS_UNUSED,
2258 int argc OVS_UNUSED, char *argv[])
2259 {
2260 do_lock(rpc, "lock", argv[0]);
2261 }
2262
2263 static void
2264 do_lock_steal(struct jsonrpc *rpc, const char *database OVS_UNUSED,
2265 int argc OVS_UNUSED, char *argv[])
2266 {
2267 do_lock(rpc, "steal", argv[0]);
2268 }
2269
2270 static void
2271 do_lock_unlock(struct jsonrpc *rpc, const char *database OVS_UNUSED,
2272 int argc OVS_UNUSED, char *argv[])
2273 {
2274 do_lock(rpc, "unlock", argv[0]);
2275 }
2276
2277 enum ovsdb_client_wait_type {
2278 WAIT_CONNECTED,
2279 WAIT_ADDED,
2280 WAIT_REMOVED
2281 };
2282
2283 static struct jsonrpc_msg *
2284 compose_wait_transaction(enum ovsdb_client_wait_type type,
2285 const char *database)
2286 {
2287 struct json *txn = json_array_create_empty();
2288 json_array_add(txn, json_string_create("_Server"));
2289
2290 struct json *op = json_object_create();
2291 json_array_add(txn, op);
2292 json_object_put_string(op, "op", "wait");
2293 json_object_put_string(op, "table", "Database");
2294 json_object_put(op, "where",
2295 json_array_create_1(
2296 json_array_create_3(
2297 json_string_create("name"),
2298 json_string_create("=="),
2299 json_string_create(database))));
2300
2301 if (type == WAIT_CONNECTED) {
2302 /* Wait until connected == true. */
2303 json_object_put(op, "columns",
2304 json_array_create_1(json_string_create("connected")));
2305 json_object_put_string(op, "until", "==");
2306
2307 struct json *row = json_object_create();
2308 json_object_put(row, "connected", json_boolean_create(true));
2309 json_object_put(op, "rows", json_array_create_1(row));
2310 } else {
2311 ovs_assert(type == WAIT_ADDED || type == WAIT_REMOVED);
2312
2313 /* Wait until such a row exists, or not, respectively. */
2314 json_object_put(op, "columns", json_array_create_empty());
2315 json_object_put_string(op, "until", "==");
2316 json_object_put(op, "rows",
2317 (type == WAIT_ADDED
2318 ? json_array_create_1(json_object_create())
2319 : json_array_create_empty()));
2320 }
2321 return jsonrpc_create_request("transact", txn, NULL);
2322 }
2323
2324 static void
2325 do_wait(struct jsonrpc *rpc_unused OVS_UNUSED,
2326 const char *database_unused OVS_UNUSED,
2327 int argc, char *argv[])
2328 {
2329 const char *database = argv[argc - 2];
2330 const char *state = argv[argc - 1];
2331
2332 enum ovsdb_client_wait_type type;
2333 if (!strcmp(state, "connected")) {
2334 type = WAIT_CONNECTED;
2335 } else if (!strcmp(state, "added")) {
2336 type = WAIT_ADDED;
2337 } else if (!strcmp(state, "removed")) {
2338 type = WAIT_REMOVED;
2339 } else {
2340 ovs_fatal(0, "%s: unknown state", state);
2341 }
2342
2343 char *remote = argc > 2 ? xstrdup(argv[0]) : default_remote();
2344 struct jsonrpc_session *js = jsonrpc_session_open(remote, true);
2345 free(remote);
2346
2347 unsigned int seqno = 0;
2348 struct json *sdca_id = NULL;
2349 struct json *txn_id = NULL;
2350 for (;;) {
2351 jsonrpc_session_run(js);
2352
2353 if (seqno != jsonrpc_session_get_seqno(js)
2354 && jsonrpc_session_is_connected(js)) {
2355 seqno = jsonrpc_session_get_seqno(js);
2356
2357 /* Send set_db_change_aware request. */
2358 struct jsonrpc_msg *rq = jsonrpc_create_request(
2359 "set_db_change_aware",
2360 json_array_create_1(json_boolean_create(true)),
2361 NULL);
2362 json_destroy(sdca_id);
2363 sdca_id = json_clone(rq->id);
2364 jsonrpc_session_send(js, rq);
2365
2366 /* Send transaction. */
2367 rq = compose_wait_transaction(type, database);
2368 json_destroy(txn_id);
2369 txn_id = json_clone(rq->id);
2370 jsonrpc_session_send(js, rq);
2371 }
2372
2373 struct jsonrpc_msg *reply = jsonrpc_session_recv(js);
2374 if (reply && reply->id) {
2375 if (sdca_id && json_equal(sdca_id, reply->id)) {
2376 if (reply->type == JSONRPC_ERROR) {
2377 ovs_fatal(0, "%s: set_db_change_aware failed (%s)",
2378 jsonrpc_session_get_name(js),
2379 json_to_string(reply->error, 0));
2380 }
2381 } else if (txn_id && json_equal(txn_id, reply->id)) {
2382 check_transaction_reply(reply);
2383 exit(0);
2384 }
2385 }
2386 jsonrpc_msg_destroy(reply);
2387
2388 jsonrpc_session_recv_wait(js);
2389 jsonrpc_session_wait(js);
2390 poll_block();
2391 }
2392 }
2393
2394 /* Command handlers may take an optional server socket name (e.g. "unix:...")
2395 * and an optional database name (e.g. Open_vSwitch) as their initial
2396 * arguments. The NEED_* element indicates what a particular command needs.
2397 * These optional arguments should not be included in min_args or max_args, and
2398 * they are not included in the argc and argv arguments passed to the handler:
2399 * the argv[0] passed to the handler is the first argument after the optional
2400 * server socket name. */
2401 static const struct ovsdb_client_command all_commands[] = {
2402 { "list-dbs", NEED_RPC, 0, 0, do_list_dbs },
2403 { "get-schema", NEED_DATABASE, 0, 0, do_get_schema },
2404 { "get-schema-version", NEED_DATABASE, 0, 0, do_get_schema_version },
2405 { "get-schema-cksum", NEED_DATABASE, 0, 0, do_get_schema_cksum },
2406 { "list-tables", NEED_DATABASE, 0, 0, do_list_tables },
2407 { "list-columns", NEED_DATABASE, 0, 1, do_list_columns },
2408 { "transact", NEED_NONE, 1, 2, do_transact },
2409 { "query", NEED_NONE, 1, 2, do_query },
2410 { "monitor", NEED_DATABASE, 1, INT_MAX, do_monitor },
2411 { "monitor-cond", NEED_DATABASE, 2, 3, do_monitor_cond },
2412 { "wait", NEED_NONE, 2, 3, do_wait },
2413 { "convert", NEED_NONE, 1, 2, do_convert },
2414 { "needs-conversion", NEED_NONE, 1, 2, do_needs_conversion },
2415 { "dump", NEED_DATABASE, 0, INT_MAX, do_dump },
2416 { "backup", NEED_DATABASE, 0, 0, do_backup },
2417 { "restore", NEED_DATABASE, 0, 0, do_restore },
2418 { "lock", NEED_RPC, 1, 1, do_lock_create },
2419 { "steal", NEED_RPC, 1, 1, do_lock_steal },
2420 { "unlock", NEED_RPC, 1, 1, do_lock_unlock },
2421 { "help", NEED_NONE, 0, INT_MAX, do_help },
2422
2423 { NULL, 0, 0, 0, NULL },
2424 };
2425
2426 static const struct ovsdb_client_command *get_all_commands(void)
2427 {
2428 return all_commands;
2429 }