]> git.proxmox.com Git - mirror_ovs.git/blobdiff - lib/db-ctl-base.c
cirrus: Use FreeBSD 12.2.
[mirror_ovs.git] / lib / db-ctl-base.c
index e3c0373d29c0b8cfca182432628f8af3d34cd3a7..e95c77da2cf60a03e6024c89ae92d1b21ee8b525 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015 Nicira, Inc.
+ * Copyright (c) 2015, 2016, 2017 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 #include "command-line.h"
 #include "compiler.h"
 #include "dirs.h"
-#include "dynamic-string.h"
+#include "openvswitch/dynamic-string.h"
 #include "fatal-signal.h"
 #include "hash.h"
-#include "json.h"
+#include "openvswitch/json.h"
 #include "openvswitch/vlog.h"
 #include "ovsdb-data.h"
 #include "ovsdb-idl.h"
 #include "ovsdb-idl-provider.h"
-#include "shash.h"
+#include "openvswitch/shash.h"
+#include "sset.h"
+#include "svec.h"
 #include "string.h"
 #include "table.h"
 #include "util.h"
@@ -51,7 +53,7 @@ VLOG_DEFINE_THIS_MODULE(db_ctl_base);
  * when ctl_init() is called.
  *
  * */
-extern struct cmd_show_table cmd_show_tables[];
+static const struct cmd_show_table *cmd_show_tables;
 
 /* ctl_exit() is called by ctl_fatal(). User can optionally supply an exit
  * function ctl_exit_func() via ctl_init. If supplied, this function will
@@ -60,16 +62,20 @@ extern struct cmd_show_table cmd_show_tables[];
 static void (*ctl_exit_func)(int status) = NULL;
 OVS_NO_RETURN static void ctl_exit(int status);
 
-/* Represents all tables in the schema.  User must define 'tables'
- * in implementation and supply via clt_init().  The definition must end
- * with an all-NULL entry. */
-static const struct ctl_table_class *tables;
+/* IDL class. */
+static const struct ovsdb_idl_class *idl_class;
+
+/* Two arrays with 'n_classes' elements, which represent the tables in this
+ * database and how the user can refer to their rows. */
+static const struct ctl_table_class *ctl_classes;
+static const struct ovsdb_idl_table_class *idl_classes;
+static size_t n_classes;
 
 static struct shash all_commands = SHASH_INITIALIZER(&all_commands);
-static const struct ctl_table_class *get_table(const char *table_name);
-static void set_column(const struct ctl_table_class *,
-                       const struct ovsdb_idl_row *, const char *,
-                       struct ovsdb_symbol_table *);
+static char *get_table(const char *, const struct ovsdb_idl_table_class **);
+static char *set_column(const struct ovsdb_idl_table_class *,
+                        const struct ovsdb_idl_row *, const char *,
+                        struct ovsdb_symbol_table *);
 
 \f
 static struct option *
@@ -180,20 +186,16 @@ print_command_arguments(const struct ctl_command_syntax *command)
     free(output);
 }
 
-static void
-die_if_error(char *error)
-{
-    if (error) {
-        ctl_fatal("%s", error);
-    }
-}
-
 static int
 to_lower_and_underscores(unsigned c)
 {
     return c == '-' ? '_' : tolower(c);
 }
 
+/* Returns a score representing how well 's' matches 'name'.  Higher return
+ * values indicate a better match.  The order of the arguments is important:
+ * 'name' is the name of an entity such as a table or a column, and 's' is user
+ * input. */
 static unsigned int
 score_partial_match(const char *name, const char *s)
 {
@@ -212,13 +214,18 @@ score_partial_match(const char *name, const char *s)
     return *s == '\0' ? score : 0;
 }
 
-static struct ovsdb_symbol *
-create_symbol(struct ovsdb_symbol_table *symtab, const char *id, bool *newp)
+/* Returns NULL and sets 'symbolp' and 'newp' if symbol was created
+ * successfully. Otherwise returns a malloc()'ed error message on failure. */
+static char * OVS_WARN_UNUSED_RESULT
+create_symbol(struct ovsdb_symbol_table *symtab, const char *id,
+              struct ovsdb_symbol **symbolp, bool *newp)
 {
     struct ovsdb_symbol *symbol;
 
+    ovs_assert(symbolp);
+
     if (id[0] != '@') {
-        ctl_fatal("row id \"%s\" does not begin with \"@\"", id);
+        return xasprintf("row id \"%s\" does not begin with \"@\"", id);
     }
 
     if (newp) {
@@ -227,57 +234,114 @@ create_symbol(struct ovsdb_symbol_table *symtab, const char *id, bool *newp)
 
     symbol = ovsdb_symbol_table_insert(symtab, id);
     if (symbol->created) {
-        ctl_fatal("row id \"%s\" may only be specified on one --id option",
-                  id);
+        return xasprintf("row id \"%s\" may only be specified on one --id "
+                         "option", id);
     }
     symbol->created = true;
-    return symbol;
+    *symbolp = symbol;
+    return NULL;
+}
+
+static bool
+record_id_equals(const union ovsdb_atom *name, enum ovsdb_atomic_type type,
+                 const char *record_id)
+{
+    if (type == OVSDB_TYPE_STRING) {
+        if (!strcmp(name->string, record_id)) {
+            return true;
+        }
+
+        struct uuid uuid;
+        size_t len = strlen(record_id);
+        if (len >= 4
+            && uuid_from_string(&uuid, name->string)
+            && !strncmp(name->string, record_id, len)) {
+            return true;
+        }
+
+        return false;
+    } else {
+        ovs_assert(type == OVSDB_TYPE_INTEGER);
+        return name->integer == strtoll(record_id, NULL, 10);
+    }
 }
 
 static const struct ovsdb_idl_row *
-get_row_by_id(struct ctl_context *ctx, const struct ctl_table_class *table,
-              const struct ctl_row_id *id, const char *record_id)
+get_row_by_id(struct ctl_context *ctx,
+              const struct ovsdb_idl_table_class *table,
+              const struct ctl_row_id *id, const char *record_id,
+              bool *multiple_matches)
 {
-    const struct ovsdb_idl_row *referrer, *final;
+    ovs_assert(multiple_matches);
+    *multiple_matches = false;
 
-    if (!id->table) {
+    if (!id->name_column) {
         return NULL;
     }
 
-    if (!id->name_column) {
-        if (strcmp(record_id, ".")) {
-            return NULL;
-        }
-        referrer = ovsdb_idl_first_row(ctx->idl, id->table);
-        if (!referrer || ovsdb_idl_next_row(referrer)) {
+    const struct ovsdb_idl_row *referrer = NULL;
+
+    /* Figure out the 'key' and 'value' types for the column that we're going
+     * to look at.  One of these ('name_type') is the type of the name we're
+     * going to compare against 'record_id'.  */
+    enum ovsdb_atomic_type key, value, name_type;
+    if (!id->key) {
+        name_type = key = id->name_column->type.key.type;
+        value = OVSDB_TYPE_VOID;
+    } else {
+        key = OVSDB_TYPE_STRING;
+        name_type = value = id->name_column->type.value.type;
+    }
+
+    /* We only support integer and string names (so far). */
+    if (name_type == OVSDB_TYPE_INTEGER) {
+        if (!record_id[0] || record_id[strspn(record_id, "0123456789")]) {
             return NULL;
         }
     } else {
-        const struct ovsdb_idl_row *row;
+        ovs_assert(name_type == OVSDB_TYPE_STRING);
+    }
 
-        referrer = NULL;
-        for (row = ovsdb_idl_first_row(ctx->idl, id->table);
-             row != NULL;
-             row = ovsdb_idl_next_row(row))
-            {
-                const struct ovsdb_datum *name;
-
-                name = ovsdb_idl_get(row, id->name_column,
-                                     OVSDB_TYPE_STRING, OVSDB_TYPE_VOID);
-                if (name->n == 1 && !strcmp(name->keys[0].string, record_id)) {
-                    if (referrer) {
-                        ctl_fatal("multiple rows in %s match \"%s\"",
-                                  table->class->name, record_id);
-                    }
-                    referrer = row;
-                }
+    const struct ovsdb_idl_class *class = ovsdb_idl_get_class(ctx->idl);
+    const struct ovsdb_idl_table_class *id_table
+        = ovsdb_idl_table_class_from_column(class, id->name_column);
+    for (const struct ovsdb_idl_row *row = ovsdb_idl_first_row(ctx->idl,
+                                                               id_table);
+         row != NULL;
+         row = ovsdb_idl_next_row(row)) {
+        /* Pick out the name column's data. */
+        const struct ovsdb_datum *datum = ovsdb_idl_get(
+            row, id->name_column, key, value);
+
+        /* Extract the name from the column. */
+        const union ovsdb_atom *name;
+        if (!id->key) {
+            name = datum->n == 1 ? &datum->keys[0] : NULL;
+        } else {
+            const union ovsdb_atom key_atom
+                = { .string = CONST_CAST(char *, id->key) };
+            unsigned int i = ovsdb_datum_find_key(datum, &key_atom,
+                                                  OVSDB_TYPE_STRING);
+            name = i == UINT_MAX ? NULL : &datum->values[i];
+        }
+        if (!name) {
+            continue;
+        }
+
+        /* If the name equals 'record_id', take it. */
+        if (record_id_equals(name, name_type, record_id)) {
+            if (referrer) {
+                *multiple_matches = true;
+                return NULL;
             }
+            referrer = row;
+        }
     }
     if (!referrer) {
         return NULL;
     }
 
-    final = NULL;
+    const struct ovsdb_idl_row *final = referrer;
     if (id->uuid_column) {
         const struct ovsdb_datum *uuid;
 
@@ -285,99 +349,150 @@ get_row_by_id(struct ctl_context *ctx, const struct ctl_table_class *table,
         uuid = ovsdb_idl_get(referrer, id->uuid_column,
                              OVSDB_TYPE_UUID, OVSDB_TYPE_VOID);
         if (uuid->n == 1) {
-            final = ovsdb_idl_get_row_for_uuid(ctx->idl, table->class,
+            final = ovsdb_idl_get_row_for_uuid(ctx->idl, table,
                                                &uuid->keys[0].uuid);
+        } else {
+            final = NULL;
         }
-    } else {
-        final = referrer;
     }
-
     return final;
 }
 
-static const struct ovsdb_idl_row *
-get_row(struct ctl_context *ctx,
-        const struct ctl_table_class *table, const char *record_id,
-        bool must_exist)
+char * OVS_WARN_UNUSED_RESULT
+ctl_get_row(struct ctl_context *ctx,
+            const struct ovsdb_idl_table_class *table, const char *record_id,
+            bool must_exist, const struct ovsdb_idl_row **rowp)
 {
-    const struct ovsdb_idl_row *row;
+    const struct ovsdb_idl_row *row = NULL;
     struct uuid uuid;
 
-    row = NULL;
+    ovs_assert(rowp);
+
     if (uuid_from_string(&uuid, record_id)) {
-        row = ovsdb_idl_get_row_for_uuid(ctx->idl, table->class, &uuid);
+        row = ovsdb_idl_get_row_for_uuid(ctx->idl, table, &uuid);
     }
     if (!row) {
-        int i;
-
-        for (i = 0; i < ARRAY_SIZE(table->row_ids); i++) {
-            row = get_row_by_id(ctx, table, &table->row_ids[i], record_id);
+        if (!strcmp(record_id, ".")) {
+            row = ovsdb_idl_first_row(ctx->idl, table);
+            if (row && ovsdb_idl_next_row(row)) {
+                row = NULL;
+            }
+        }
+    }
+    if (!row) {
+        const struct ctl_table_class *ctl_class
+            = &ctl_classes[table - idl_classes];
+        for (int i = 0; i < ARRAY_SIZE(ctl_class->row_ids); i++) {
+            const struct ctl_row_id *id = &ctl_class->row_ids[i];
+            bool multiple_matches;
+
+            row = get_row_by_id(ctx, table, id, record_id, &multiple_matches);
+            if (multiple_matches) {
+                const struct ovsdb_idl_class *class =
+                    ovsdb_idl_get_class(ctx->idl);
+                const struct ovsdb_idl_table_class *table_class =
+                    ovsdb_idl_table_class_from_column(class, id->name_column);
+                return xasprintf("multiple rows in %s match \"%s\"",
+                                 table_class->name, record_id);
+            }
             if (row) {
                 break;
             }
         }
     }
+    if (!row && uuid_is_partial_string(record_id) >= 4) {
+        for (const struct ovsdb_idl_row *r = ovsdb_idl_first_row(ctx->idl,
+                                                                 table);
+             r != NULL;
+             r = ovsdb_idl_next_row(r)) {
+            if (uuid_is_partial_match(&r->uuid, record_id)) {
+                if (!row) {
+                    row = r;
+                } else {
+                    return xasprintf("%s contains 2 or more rows whose UUIDs "
+                                     "begin with %s: at least "UUID_FMT" "
+                                     "and "UUID_FMT, table->name, record_id,
+                                     UUID_ARGS(&row->uuid),
+                                     UUID_ARGS(&r->uuid));
+                }
+            }
+        }
+    }
     if (must_exist && !row) {
-        ctl_fatal("no row \"%s\" in table %s",
-                  record_id, table->class->name);
+        return xasprintf("no row \"%s\" in table %s", record_id, table->name);
     }
-    return row;
+
+    *rowp = row;
+    return NULL;
 }
 
 static char *
-get_column(const struct ctl_table_class *table, const char *column_name,
+get_column(const struct ovsdb_idl_table_class *table, const char *column_name,
            const struct ovsdb_idl_column **columnp)
 {
+    struct sset best_matches = SSET_INITIALIZER(&best_matches);
     const struct ovsdb_idl_column *best_match = NULL;
     unsigned int best_score = 0;
-    size_t i;
 
-    for (i = 0; i < table->class->n_columns; i++) {
-        const struct ovsdb_idl_column *column = &table->class->columns[i];
+    for (size_t i = 0; i < table->n_columns; i++) {
+        const struct ovsdb_idl_column *column = &table->columns[i];
         unsigned int score = score_partial_match(column->name, column_name);
-        if (score > best_score) {
+        if (score && score >= best_score) {
+            if (score > best_score) {
+                sset_clear(&best_matches);
+            }
+            sset_add(&best_matches, column->name);
             best_match = column;
             best_score = score;
-        } else if (score == best_score) {
-            best_match = NULL;
         }
     }
 
-    *columnp = best_match;
-    if (best_match) {
-        return NULL;
-    } else if (best_score) {
-        return xasprintf("%s contains more than one column whose name "
-                         "matches \"%s\"", table->class->name, column_name);
+    char *error = NULL;
+    *columnp = NULL;
+    if (!best_match) {
+        error = xasprintf("%s does not contain a column whose name matches "
+                          "\"%s\"", table->name, column_name);
+    } else if (sset_count(&best_matches) == 1) {
+        *columnp = best_match;
     } else {
-        return xasprintf("%s does not contain a column whose name matches "
-                         "\"%s\"", table->class->name, column_name);
+        char *matches = sset_join(&best_matches, ", ", "");
+        error = xasprintf("%s contains more than one column "
+                          "whose name matches \"%s\": %s",
+                          table->name, column_name, matches);
+        free(matches);
     }
+    sset_destroy(&best_matches);
+    return error;
 }
 
-static void
+static char * OVS_WARN_UNUSED_RESULT
 pre_get_column(struct ctl_context *ctx,
-               const struct ctl_table_class *table, const char *column_name,
+               const struct ovsdb_idl_table_class *table,
+               const char *column_name,
                const struct ovsdb_idl_column **columnp)
 {
-    die_if_error(get_column(table, column_name, columnp));
+    char *error = get_column(table, column_name, columnp);
+    if (error) {
+        return error;
+    }
     ovsdb_idl_add_column(ctx->idl, *columnp);
+    return NULL;
 }
 
-static const struct ctl_table_class *
-pre_get_table(struct ctl_context *ctx, const char *table_name)
+static char * OVS_WARN_UNUSED_RESULT
+pre_get_table(struct ctl_context *ctx, const char *table_name,
+              const struct ovsdb_idl_table_class **tablep)
 {
-    const struct ctl_table_class *table_class;
-    int i;
-
-    table_class = get_table(table_name);
-    ovsdb_idl_add_table(ctx->idl, table_class->class);
+    const struct ovsdb_idl_table_class *table;
+    char *error = get_table(table_name, &table);
+    if (error) {
+        return error;
+    }
+    ovsdb_idl_add_table(ctx->idl, table);
 
-    for (i = 0; i < ARRAY_SIZE(table_class->row_ids); i++) {
-        const struct ctl_row_id *id = &table_class->row_ids[i];
-        if (id->table) {
-            ovsdb_idl_add_table(ctx->idl, id->table);
-        }
+    const struct ctl_table_class *ctl = &ctl_classes[table - idl_classes];
+    for (int i = 0; i < ARRAY_SIZE(ctl->row_ids); i++) {
+        const struct ctl_row_id *id = &ctl->row_ids[i];
         if (id->name_column) {
             ovsdb_idl_add_column(ctx->idl, id->name_column);
         }
@@ -386,7 +501,10 @@ pre_get_table(struct ctl_context *ctx, const char *table_name)
         }
     }
 
-    return table_class;
+    if (tablep) {
+        *tablep = table;
+    }
+    return NULL;
 }
 
 static char *
@@ -433,7 +551,7 @@ missing_operator_error(const char *arg, const char **allowed_operators,
  * message and stores NULL into all of the nonnull output arguments. */
 static char * OVS_WARN_UNUSED_RESULT
 parse_column_key_value(const char *arg,
-                       const struct ctl_table_class *table,
+                       const struct ovsdb_idl_table_class *table,
                        const struct ovsdb_idl_column **columnp, char **keyp,
                        int *operatorp,
                        const char **allowed_operators, size_t n_allowed,
@@ -529,35 +647,43 @@ parse_column_key_value(const char *arg,
     return error;
 }
 
-static const struct ovsdb_idl_column *
-pre_parse_column_key_value(struct ctl_context *ctx,
-                           const char *arg,
-                           const struct ctl_table_class *table)
+static char * OVS_WARN_UNUSED_RESULT
+pre_parse_column_key_value(struct ctl_context *ctx, const char *arg,
+                           const struct ovsdb_idl_table_class *table)
 {
     const struct ovsdb_idl_column *column;
     const char *p;
-    char *column_name;
+    char *column_name = NULL;
+    char *error;
 
     p = arg;
-    die_if_error(ovsdb_token_parse(&p, &column_name));
+    error = ovsdb_token_parse(&p, &column_name);
+    if (error) {
+        goto out;
+    }
     if (column_name[0] == '\0') {
-        ctl_fatal("%s: missing column name", arg);
+        error = xasprintf("%s: missing column name", arg);
+        goto out;
     }
 
-    pre_get_column(ctx, table, column_name, &column);
+    error = pre_get_column(ctx, table, column_name, &column);
+out:
     free(column_name);
 
-    return column;
+    return error;
 }
 
-static void
+/* Checks if the 'column' is mutable. Returns NULL if it is mutable, or a
+ * malloc()'ed error message otherwise. */
+static char * OVS_WARN_UNUSED_RESULT
 check_mutable(const struct ovsdb_idl_row *row,
               const struct ovsdb_idl_column *column)
 {
     if (!ovsdb_idl_is_mutable(row, column)) {
-        ctl_fatal("cannot modify read-only column %s in table %s",
-                  column->name, row->table->class->name);
+        return xasprintf("cannot modify read-only column %s in table %s",
+                         column->name, row->table->class_->name);
     }
+    return NULL;
 }
 
 #define RELOPS                                  \
@@ -572,7 +698,9 @@ check_mutable(const struct ovsdb_idl_row *row,
     RELOP(RELOP_SET_LT, "{<}")                  \
     RELOP(RELOP_SET_GT, "{>}")                  \
     RELOP(RELOP_SET_LE, "{<=}")                 \
-    RELOP(RELOP_SET_GE, "{>=}")
+    RELOP(RELOP_SET_GE, "{>=}")                 \
+    RELOP(RELOP_SET_IN, "{in}")                 \
+    RELOP(RELOP_SET_NOT_IN, "{not-in}")
 
 enum relop {
 #define RELOP(ENUM, STRING) ENUM,
@@ -585,7 +713,8 @@ is_set_operator(enum relop op)
 {
     return (op == RELOP_SET_EQ || op == RELOP_SET_NE ||
             op == RELOP_SET_LT || op == RELOP_SET_GT ||
-            op == RELOP_SET_LE || op == RELOP_SET_GE);
+            op == RELOP_SET_LE || op == RELOP_SET_GE ||
+            op == RELOP_SET_IN || op == RELOP_SET_NOT_IN);
 }
 
 static bool
@@ -613,19 +742,26 @@ evaluate_relop(const struct ovsdb_datum *a, const struct ovsdb_datum *b,
     case RELOP_SET_GT:
         return a->n > b->n && ovsdb_datum_includes_all(b, a, type);
     case RELOP_SET_LE:
+    case RELOP_SET_IN:
         return ovsdb_datum_includes_all(a, b, type);
     case RELOP_SET_GE:
         return ovsdb_datum_includes_all(b, a, type);
+    case RELOP_SET_NOT_IN:
+        return ovsdb_datum_excludes_all(a, b, type);
 
     default:
         OVS_NOT_REACHED();
     }
 }
 
-static bool
-is_condition_satisfied(const struct ctl_table_class *table,
-                       const struct ovsdb_idl_row *row, const char *arg,
-                       struct ovsdb_symbol_table *symtab)
+/* Checks if given row satisfies the specified condition. Returns the result of
+ * evaluating the condition in 'satisfied' flag and NULL as a return value on
+ * success. On failure returns a malloc()'ed error message and 'satisfied'
+ * value is not modified. */
+static char * OVS_WARN_UNUSED_RESULT
+check_condition(const struct ovsdb_idl_table_class *table,
+                const struct ovsdb_idl_row *row, const char *arg,
+                struct ovsdb_symbol_table *symtab, bool *satisfied)
 {
     static const char *operators[] = {
 #define RELOP(ENUM, STRING) STRING,
@@ -635,18 +771,24 @@ is_condition_satisfied(const struct ctl_table_class *table,
 
     const struct ovsdb_idl_column *column;
     const struct ovsdb_datum *have_datum;
-    char *key_string, *value_string;
+    char *key_string = NULL;
+    char *value_string = NULL;
     struct ovsdb_type type;
     int operator;
     bool retval;
     char *error;
 
+    ovs_assert(satisfied);
+
     error = parse_column_key_value(arg, table, &column, &key_string,
                                    &operator, operators, ARRAY_SIZE(operators),
                                    &value_string);
-    die_if_error(error);
+    if (error) {
+        goto out;
+    }
     if (!value_string) {
-        ctl_fatal("%s: missing value", arg);
+        error = xasprintf("%s: missing value", arg);
+        goto out;
     }
 
     type = column->type;
@@ -659,16 +801,23 @@ is_condition_satisfied(const struct ctl_table_class *table,
         unsigned int idx;
 
         if (column->type.value.type == OVSDB_TYPE_VOID) {
-            ctl_fatal("cannot specify key to check for non-map column %s",
-                      column->name);
+            error = xasprintf("cannot specify key to check for non-map column "
+                              "%s", column->name);
+            goto out;
         }
 
-        die_if_error(ovsdb_atom_from_string(&want_key, &column->type.key,
-                                            key_string, symtab));
+        error = ovsdb_atom_from_string(&want_key, NULL, &column->type.key,
+                                       key_string, symtab);
+        if (error) {
+            goto out;
+        }
 
         type.key = type.value;
         type.value.type = OVSDB_TYPE_VOID;
-        die_if_error(ovsdb_datum_from_string(&b, &type, value_string, symtab));
+        error = ovsdb_datum_from_string(&b, &type, value_string, symtab);
+        if (error) {
+            goto out;
+        }
 
         idx = ovsdb_datum_find_key(have_datum,
                                    &want_key, column->type.key.type);
@@ -695,23 +844,28 @@ is_condition_satisfied(const struct ctl_table_class *table,
     } else {
         struct ovsdb_datum want_datum;
 
-        die_if_error(ovsdb_datum_from_string(&want_datum, &column->type,
-                                             value_string, symtab));
+        error = ovsdb_datum_from_string(&want_datum, &column->type,
+                                        value_string, symtab);
+        if (error) {
+            goto out;
+        }
         retval = evaluate_relop(have_datum, &want_datum, &type, operator);
         ovsdb_datum_destroy(&want_datum, &column->type);
     }
 
+    *satisfied = retval;
+out:
     free(key_string);
     free(value_string);
 
-    return retval;
+    return error;
 }
 
 static void
 invalidate_cache(struct ctl_context *ctx)
 {
-    if (ctx->invalidate_cache) {
-        (ctx->invalidate_cache)(ctx);
+    if (ctx->invalidate_cache_cb) {
+        (ctx->invalidate_cache_cb)(ctx);
     }
 }
 \f
@@ -720,7 +874,7 @@ pre_cmd_get(struct ctl_context *ctx)
 {
     const char *id = shash_find_data(&ctx->options, "--id");
     const char *table_name = ctx->argv[1];
-    const struct ctl_table_class *table;
+    const struct ovsdb_idl_table_class *table;
     int i;
 
     /* Using "get" without --id or a column name could possibly make sense.
@@ -732,14 +886,20 @@ pre_cmd_get(struct ctl_context *ctx)
                   "possibly erroneous");
     }
 
-    table = pre_get_table(ctx, table_name);
+    ctx->error = pre_get_table(ctx, table_name, &table);
+    if (ctx->error) {
+        return;
+    }
     for (i = 3; i < ctx->argc; i++) {
         if (!strcasecmp(ctx->argv[i], "_uuid")
             || !strcasecmp(ctx->argv[i], "-uuid")) {
             continue;
         }
 
-        pre_parse_column_key_value(ctx, ctx->argv[i], table);
+        ctx->error = pre_parse_column_key_value(ctx, ctx->argv[i], table);
+        if (ctx->error) {
+            return;
+        }
     }
 }
 
@@ -750,29 +910,40 @@ cmd_get(struct ctl_context *ctx)
     bool must_exist = !shash_find(&ctx->options, "--if-exists");
     const char *table_name = ctx->argv[1];
     const char *record_id = ctx->argv[2];
-    const struct ctl_table_class *table;
+    const struct ovsdb_idl_table_class *table;
     const struct ovsdb_idl_row *row;
     struct ds *out = &ctx->output;
     int i;
 
     if (id && !must_exist) {
-        ctl_fatal("--if-exists and --id may not be specified together");
+        ctl_error(ctx, "--if-exists and --id may not be specified together");
+        return;
     }
 
-    table = get_table(table_name);
-    row = get_row(ctx, table, record_id, must_exist);
+    ctx->error = get_table(table_name, &table);
+    if (ctx->error) {
+        return;
+    }
+    ctx->error = ctl_get_row(ctx, table, record_id, must_exist, &row);
+    if (ctx->error) {
+        return;
+    }
     if (!row) {
         return;
     }
 
     if (id) {
-        struct ovsdb_symbol *symbol;
-        bool new;
+        struct ovsdb_symbol *symbol = NULL;
+        bool new = false;
 
-        symbol = create_symbol(ctx->symtab, id, &new);
+        ctx->error = create_symbol(ctx->symtab, id, &symbol, &new);
+        if (ctx->error) {
+            return;
+        }
         if (!new) {
-            ctl_fatal("row id \"%s\" specified on \"get\" command was used "
-                      "before it was defined", id);
+            ctl_error(ctx, "row id \"%s\" specified on \"get\" command was "
+                      "used before it was defined", id);
+            return;
         }
         symbol->uuid = row->uuid;
 
@@ -794,9 +965,11 @@ cmd_get(struct ctl_context *ctx)
             continue;
         }
 
-        die_if_error(parse_column_key_value(ctx->argv[i], table,
-                                            &column, &key_string,
-                                            NULL, NULL, 0, NULL));
+        ctx->error = parse_column_key_value(ctx->argv[i], table, &column,
+                                            &key_string, NULL, NULL, 0, NULL);
+        if (ctx->error) {
+            return;
+        }
 
         ovsdb_idl_txn_verify(row, column);
         datum = ovsdb_idl_read(row, column);
@@ -805,21 +978,30 @@ cmd_get(struct ctl_context *ctx)
             unsigned int idx;
 
             if (column->type.value.type == OVSDB_TYPE_VOID) {
-                ctl_fatal("cannot specify key to get for non-map column %s",
+                ctl_error(ctx,
+                          "cannot specify key to get for non-map column %s",
                           column->name);
+                free(key_string);
+                return;
             }
 
-            die_if_error(ovsdb_atom_from_string(&key,
-                                                &column->type.key,
-                                                key_string, ctx->symtab));
+            ctx->error = ovsdb_atom_from_string(&key, NULL, &column->type.key,
+                                                key_string, ctx->symtab);
+            if (ctx->error) {
+                free(key_string);
+                return;
+            }
 
             idx = ovsdb_datum_find_key(datum, &key,
                                        column->type.key.type);
             if (idx == UINT_MAX) {
                 if (must_exist) {
-                    ctl_fatal("no key \"%s\" in %s record \"%s\" column %s",
-                              key_string, table->class->name, record_id,
-                              column->name);
+                    ctl_error(
+                        ctx, "no key \"%s\" in %s record \"%s\" column %s",
+                        key_string, table->name, record_id, column->name);
+                    free(key_string);
+                    ovsdb_atom_destroy(&key, column->type.key.type);
+                    return;
                 }
             } else {
                 ovsdb_atom_to_string(&datum->values[idx],
@@ -835,9 +1017,10 @@ cmd_get(struct ctl_context *ctx)
     }
 }
 
-static void
+/* Returns NULL on success or malloc()'ed error message on failure. */
+static char * OVS_WARN_UNUSED_RESULT
 parse_column_names(const char *column_names,
-                   const struct ctl_table_class *table,
+                   const struct ovsdb_idl_table_class *table,
                    const struct ovsdb_idl_column ***columnsp,
                    size_t *n_columnsp)
 {
@@ -847,11 +1030,11 @@ parse_column_names(const char *column_names,
     if (!column_names) {
         size_t i;
 
-        n_columns = table->class->n_columns + 1;
+        n_columns = table->n_columns + 1;
         columns = xmalloc(n_columns * sizeof *columns);
         columns[0] = NULL;
-        for (i = 0; i < table->class->n_columns; i++) {
-            columns[i + 1] = &table->class->columns[i];
+        for (i = 0; i < table->n_columns; i++) {
+            columns[i + 1] = &table->columns[i];
         }
     } else {
         char *s = xstrdup(column_names);
@@ -868,7 +1051,12 @@ parse_column_names(const char *column_names,
             if (!strcasecmp(column_name, "_uuid")) {
                 column = NULL;
             } else {
-                die_if_error(get_column(table, column_name, &column));
+                char *error = get_column(table, column_name, &column);
+                if (error) {
+                    free(columns);
+                    free(s);
+                    return error;
+                }
             }
             if (n_columns >= allocated_columns) {
                 columns = x2nrealloc(columns, &allocated_columns,
@@ -879,29 +1067,35 @@ parse_column_names(const char *column_names,
         free(s);
 
         if (!n_columns) {
-            ctl_fatal("must specify at least one column name");
+            return xstrdup("must specify at least one column name");
         }
     }
     *columnsp = columns;
     *n_columnsp = n_columns;
+    return NULL;
 }
 
-static void
+static char * OVS_WARN_UNUSED_RESULT
 pre_list_columns(struct ctl_context *ctx,
-                 const struct ctl_table_class *table,
+                 const struct ovsdb_idl_table_class *table,
                  const char *column_names)
 {
     const struct ovsdb_idl_column **columns;
     size_t n_columns;
     size_t i;
+    char *error;
 
-    parse_column_names(column_names, table, &columns, &n_columns);
+    error = parse_column_names(column_names, table, &columns, &n_columns);
+    if (error) {
+        return error;
+    }
     for (i = 0; i < n_columns; i++) {
         if (columns[i]) {
             ovsdb_idl_add_column(ctx->idl, columns[i]);
         }
     }
     free(columns);
+    return NULL;
 }
 
 static void
@@ -909,10 +1103,16 @@ pre_cmd_list(struct ctl_context *ctx)
 {
     const char *column_names = shash_find_data(&ctx->options, "--columns");
     const char *table_name = ctx->argv[1];
-    const struct ctl_table_class *table;
+    const struct ovsdb_idl_table_class *table;
 
-    table = pre_get_table(ctx, table_name);
-    pre_list_columns(ctx, table, column_names);
+    ctx->error = pre_get_table(ctx, table_name, &table);
+    if (ctx->error) {
+        return;
+    }
+    ctx->error = pre_list_columns(ctx, table, column_names);
+    if (ctx->error) {
+        return;
+    }
 }
 
 static struct table *
@@ -978,23 +1178,36 @@ cmd_list(struct ctl_context *ctx)
     bool must_exist = !shash_find(&ctx->options, "--if-exists");
     const struct ovsdb_idl_column **columns;
     const char *table_name = ctx->argv[1];
-    const struct ctl_table_class *table;
+    const struct ovsdb_idl_table_class *table;
     struct table *out;
     size_t n_columns;
     int i;
 
-    table = get_table(table_name);
-    parse_column_names(column_names, table, &columns, &n_columns);
+    ctx->error = get_table(table_name, &table);
+    if (ctx->error) {
+        return;
+    }
+    ctx->error = parse_column_names(column_names, table, &columns, &n_columns);
+    if (ctx->error) {
+        return;
+    }
     out = ctx->table = list_make_table(columns, n_columns);
     if (ctx->argc > 2) {
         for (i = 2; i < ctx->argc; i++) {
-            list_record(get_row(ctx, table, ctx->argv[i], must_exist),
-                        columns, n_columns, out);
+            const struct ovsdb_idl_row *row;
+
+            ctx->error = ctl_get_row(ctx, table, ctx->argv[i], must_exist,
+                                     &row);
+            if (ctx->error) {
+                free(columns);
+                return;
+            }
+            list_record(row, columns, n_columns, out);
         }
     } else {
         const struct ovsdb_idl_row *row;
 
-        for (row = ovsdb_idl_first_row(ctx->idl, table->class); row != NULL;
+        for (row = ovsdb_idl_first_row(ctx->idl, table); row != NULL;
              row = ovsdb_idl_next_row(row)) {
             list_record(row, columns, n_columns, out);
         }
@@ -1002,33 +1215,42 @@ cmd_list(struct ctl_context *ctx)
     free(columns);
 }
 
-/* Finds and returns the "struct ctl_table_class *" with 'table_name' by
- * searching the 'tables'. */
-static const struct ctl_table_class *
-get_table(const char *table_name)
+/* Finds the "struct ovsdb_idl_table_class *" with 'table_name' by searching
+ * the tables in these schema. Returns NULL and sets 'tablep' on success, or a
+ * malloc()'ed error message on failure. */
+static char * OVS_WARN_UNUSED_RESULT
+get_table(const char *table_name, const struct ovsdb_idl_table_class **tablep)
 {
-    const struct ctl_table_class *table;
-    const struct ctl_table_class *best_match = NULL;
+    struct sset best_matches = SSET_INITIALIZER(&best_matches);
+    const struct ovsdb_idl_table_class *best_match = NULL;
     unsigned int best_score = 0;
 
-    for (table = tables; table->class; table++) {
-        unsigned int score = score_partial_match(table->class->name,
-                                                 table_name);
-        if (score > best_score) {
+    for (const struct ovsdb_idl_table_class *table = idl_classes;
+         table < &idl_classes[n_classes]; table++) {
+        unsigned int score = score_partial_match(table->name, table_name);
+        if (score && score >= best_score) {
+            if (score > best_score) {
+                sset_clear(&best_matches);
+            }
+            sset_add(&best_matches, table->name);
             best_match = table;
             best_score = score;
-        } else if (score == best_score) {
-            best_match = NULL;
         }
     }
-    if (best_match) {
-        return best_match;
-    } else if (best_score) {
-        ctl_fatal("multiple table names match \"%s\"", table_name);
+
+    char *error = NULL;
+    if (!best_match) {
+        error = xasprintf("unknown table \"%s\"", table_name);
+    } else if (sset_count(&best_matches) == 1) {
+        *tablep = best_match;
     } else {
-        ctl_fatal("unknown table \"%s\"", table_name);
+        char *matches = sset_join(&best_matches, ", ", "");
+        error = xasprintf("\"%s\" matches multiple table names: %s",
+                          table_name, matches);
+        free(matches);
     }
-    return NULL;
+    sset_destroy(&best_matches);
+    return error;
 }
 
 static void
@@ -1036,13 +1258,22 @@ pre_cmd_find(struct ctl_context *ctx)
 {
     const char *column_names = shash_find_data(&ctx->options, "--columns");
     const char *table_name = ctx->argv[1];
-    const struct ctl_table_class *table;
+    const struct ovsdb_idl_table_class *table;
     int i;
 
-    table = pre_get_table(ctx, table_name);
-    pre_list_columns(ctx, table, column_names);
+    ctx->error = pre_get_table(ctx, table_name, &table);
+    if (ctx->error) {
+        return;
+    }
+    ctx->error = pre_list_columns(ctx, table, column_names);
+    if (ctx->error) {
+        return;
+    }
     for (i = 2; i < ctx->argc; i++) {
-        pre_parse_column_key_value(ctx, ctx->argv[i], table);
+        ctx->error = pre_parse_column_key_value(ctx, ctx->argv[i], table);
+        if (ctx->error) {
+            return;
+        }
     }
 }
 
@@ -1052,21 +1283,34 @@ cmd_find(struct ctl_context *ctx)
     const char *column_names = shash_find_data(&ctx->options, "--columns");
     const struct ovsdb_idl_column **columns;
     const char *table_name = ctx->argv[1];
-    const struct ctl_table_class *table;
+    const struct ovsdb_idl_table_class *table;
     const struct ovsdb_idl_row *row;
     struct table *out;
     size_t n_columns;
 
-    table = get_table(table_name);
-    parse_column_names(column_names, table, &columns, &n_columns);
+    ctx->error = get_table(table_name, &table);
+    if (ctx->error) {
+        return;
+    }
+    ctx->error = parse_column_names(column_names, table, &columns, &n_columns);
+    if (ctx->error) {
+        return;
+    }
     out = ctx->table = list_make_table(columns, n_columns);
-    for (row = ovsdb_idl_first_row(ctx->idl, table->class); row;
+    for (row = ovsdb_idl_first_row(ctx->idl, table); row;
          row = ovsdb_idl_next_row(row)) {
         int i;
 
         for (i = 2; i < ctx->argc; i++) {
-            if (!is_condition_satisfied(table, row, ctx->argv[i],
-                                        ctx->symtab)) {
+            bool satisfied = false;
+
+            ctx->error = check_condition(table, row, ctx->argv[i],
+                                         ctx->symtab, &satisfied);
+            if (ctx->error) {
+                free(columns);
+                return;
+            }
+            if (!satisfied) {
                 goto next_row;
             }
         }
@@ -1077,40 +1321,55 @@ cmd_find(struct ctl_context *ctx)
     free(columns);
 }
 
-/* Sets the column of 'row' in 'table'. */
-static void
-set_column(const struct ctl_table_class *table,
+/* Sets the column of 'row' in 'table'. Returns NULL on success or a
+ * malloc()'ed error message on failure. */
+static char * OVS_WARN_UNUSED_RESULT
+set_column(const struct ovsdb_idl_table_class *table,
            const struct ovsdb_idl_row *row, const char *arg,
            struct ovsdb_symbol_table *symtab)
 {
     const struct ovsdb_idl_column *column;
-    char *key_string, *value_string;
+    char *key_string = NULL;
+    char *value_string = NULL;
     char *error;
 
     error = parse_column_key_value(arg, table, &column, &key_string,
                                    NULL, NULL, 0, &value_string);
-    die_if_error(error);
+    if (error) {
+        goto out;
+    }
     if (!value_string) {
-        ctl_fatal("%s: missing value", arg);
+        error = xasprintf("%s: missing value", arg);
+        goto out;
+    }
+    error = check_mutable(row, column);
+    if (error) {
+        goto out;
     }
-    check_mutable(row, column);
 
     if (key_string) {
         union ovsdb_atom key, value;
         struct ovsdb_datum datum;
 
         if (column->type.value.type == OVSDB_TYPE_VOID) {
-            ctl_fatal("cannot specify key to set for non-map column %s",
-                      column->name);
+            error = xasprintf("cannot specify key to set for non-map column "
+                              "%s", column->name);
+            goto out;
         }
 
-        die_if_error(ovsdb_atom_from_string(&key, &column->type.key,
-                                            key_string, symtab));
-        die_if_error(ovsdb_atom_from_string(&value, &column->type.value,
-                                            value_string, symtab));
+        error = ovsdb_atom_from_string(&key, NULL, &column->type.key,
+                                       key_string, symtab);
+        if (error) {
+            goto out;
+        }
+        error = ovsdb_atom_from_string(&value, NULL, &column->type.value,
+                                       value_string, symtab);
+        if (error) {
+            goto out;
+        }
 
         ovsdb_datum_init_empty(&datum);
-        ovsdb_datum_add_unsafe(&datum, &key, &value, &column->type);
+        ovsdb_datum_add_unsafe(&datum, &key, &value, &column->type, NULL);
 
         ovsdb_atom_destroy(&key, column->type.key.type);
         ovsdb_atom_destroy(&value, column->type.value.type);
@@ -1122,25 +1381,37 @@ set_column(const struct ctl_table_class *table,
     } else {
         struct ovsdb_datum datum;
 
-        die_if_error(ovsdb_datum_from_string(&datum, &column->type,
-                                             value_string, symtab));
+        error = ovsdb_datum_from_string(&datum, &column->type,
+                                        value_string, symtab);
+        if (error) {
+            goto out;
+        }
         ovsdb_idl_txn_write(row, column, &datum);
     }
 
+out:
     free(key_string);
     free(value_string);
+
+    return error;
 }
 
 static void
 pre_cmd_set(struct ctl_context *ctx)
 {
     const char *table_name = ctx->argv[1];
-    const struct ctl_table_class *table;
+    const struct ovsdb_idl_table_class *table;
     int i;
 
-    table = pre_get_table(ctx, table_name);
+    ctx->error = pre_get_table(ctx, table_name, &table);
+    if (ctx->error) {
+        return;
+    }
     for (i = 3; i < ctx->argc; i++) {
-        pre_parse_column_key_value(ctx, ctx->argv[i], table);
+        ctx->error = pre_parse_column_key_value(ctx, ctx->argv[i], table);
+        if (ctx->error) {
+            return;
+        }
     }
 }
 
@@ -1150,18 +1421,27 @@ cmd_set(struct ctl_context *ctx)
     bool must_exist = !shash_find(&ctx->options, "--if-exists");
     const char *table_name = ctx->argv[1];
     const char *record_id = ctx->argv[2];
-    const struct ctl_table_class *table;
+    const struct ovsdb_idl_table_class *table;
     const struct ovsdb_idl_row *row;
     int i;
 
-    table = get_table(table_name);
-    row = get_row(ctx, table, record_id, must_exist);
+    ctx->error = get_table(table_name, &table);
+    if (ctx->error) {
+        return;
+    }
+    ctx->error = ctl_get_row(ctx, table, record_id, must_exist, &row);
+    if (ctx->error) {
+        return;
+    }
     if (!row) {
         return;
     }
 
     for (i = 3; i < ctx->argc; i++) {
-        set_column(table, row, ctx->argv[i], ctx->symtab);
+        ctx->error = set_column(table, row, ctx->argv[i], ctx->symtab);
+        if (ctx->error) {
+            return;
+        }
     }
 
     invalidate_cache(ctx);
@@ -1172,11 +1452,17 @@ pre_cmd_add(struct ctl_context *ctx)
 {
     const char *table_name = ctx->argv[1];
     const char *column_name = ctx->argv[3];
-    const struct ctl_table_class *table;
+    const struct ovsdb_idl_table_class *table;
     const struct ovsdb_idl_column *column;
 
-    table = pre_get_table(ctx, table_name);
-    pre_get_column(ctx, table, column_name, &column);
+    ctx->error = pre_get_table(ctx, table_name, &table);
+    if (ctx->error) {
+        return;
+    }
+    ctx->error = pre_get_column(ctx, table, column_name, &column);
+    if (ctx->error) {
+        return;
+    }
 }
 
 static void
@@ -1186,20 +1472,32 @@ cmd_add(struct ctl_context *ctx)
     const char *table_name = ctx->argv[1];
     const char *record_id = ctx->argv[2];
     const char *column_name = ctx->argv[3];
-    const struct ctl_table_class *table;
+    const struct ovsdb_idl_table_class *table;
     const struct ovsdb_idl_column *column;
     const struct ovsdb_idl_row *row;
     const struct ovsdb_type *type;
     struct ovsdb_datum old;
     int i;
 
-    table = get_table(table_name);
-    die_if_error(get_column(table, column_name, &column));
-    row = get_row(ctx, table, record_id, must_exist);
+    ctx->error = get_table(table_name, &table);
+    if (ctx->error) {
+        return;
+    }
+    ctx->error = get_column(table, column_name, &column);
+    if (ctx->error) {
+        return;
+    }
+    ctx->error = ctl_get_row(ctx, table, record_id, must_exist, &row);
+    if (ctx->error) {
+        return;
+    }
     if (!row) {
         return;
     }
-    check_mutable(row, column);
+    ctx->error = check_mutable(row, column);
+    if (ctx->error) {
+        return;
+    }
 
     type = &column->type;
     ovsdb_datum_clone(&old, ovsdb_idl_read(row, column), &column->type);
@@ -1210,17 +1508,23 @@ cmd_add(struct ctl_context *ctx)
         add_type = *type;
         add_type.n_min = 1;
         add_type.n_max = UINT_MAX;
-        die_if_error(ovsdb_datum_from_string(&add, &add_type, ctx->argv[i],
-                                             ctx->symtab));
+        ctx->error = ovsdb_datum_from_string(&add, &add_type, ctx->argv[i],
+                                             ctx->symtab);
+        if (ctx->error) {
+            ovsdb_datum_destroy(&old, &column->type);
+            return;
+        }
         ovsdb_datum_union(&old, &add, type, false);
         ovsdb_datum_destroy(&add, type);
     }
     if (old.n > type->n_max) {
-        ctl_fatal("\"add\" operation would put %u %s in column %s of "
+        ctl_error(ctx, "\"add\" operation would put %u %s in column %s of "
                   "table %s but the maximum number is %u",
                   old.n,
                   type->value.type == OVSDB_TYPE_VOID ? "values" : "pairs",
-                  column->name, table->class->name, type->n_max);
+                  column->name, table->name, type->n_max);
+        ovsdb_datum_destroy(&old, &column->type);
+        return;
     }
     ovsdb_idl_txn_verify(row, column);
     ovsdb_idl_txn_write(row, column, &old);
@@ -1233,11 +1537,17 @@ pre_cmd_remove(struct ctl_context *ctx)
 {
     const char *table_name = ctx->argv[1];
     const char *column_name = ctx->argv[3];
-    const struct ctl_table_class *table;
+    const struct ovsdb_idl_table_class *table;
     const struct ovsdb_idl_column *column;
 
-    table = pre_get_table(ctx, table_name);
-    pre_get_column(ctx, table, column_name, &column);
+    ctx->error = pre_get_table(ctx, table_name, &table);
+    if (ctx->error) {
+        return;
+    }
+    ctx->error = pre_get_column(ctx, table, column_name, &column);
+    if (ctx->error) {
+        return;
+    }
 }
 
 static void
@@ -1247,20 +1557,32 @@ cmd_remove(struct ctl_context *ctx)
     const char *table_name = ctx->argv[1];
     const char *record_id = ctx->argv[2];
     const char *column_name = ctx->argv[3];
-    const struct ctl_table_class *table;
+    const struct ovsdb_idl_table_class *table;
     const struct ovsdb_idl_column *column;
     const struct ovsdb_idl_row *row;
     const struct ovsdb_type *type;
     struct ovsdb_datum old;
     int i;
 
-    table = get_table(table_name);
-    die_if_error(get_column(table, column_name, &column));
-    row = get_row(ctx, table, record_id, must_exist);
+    ctx->error = get_table(table_name, &table);
+    if (ctx->error) {
+        return;
+    }
+    ctx->error = get_column(table, column_name, &column);
+    if (ctx->error) {
+        return;
+    }
+    ctx->error = ctl_get_row(ctx, table, record_id, must_exist, &row);
+    if (ctx->error) {
+        return;
+    }
     if (!row) {
         return;
     }
-    check_mutable(row, column);
+    ctx->error = check_mutable(row, column);
+    if (ctx->error) {
+        return;
+    }
 
     type = &column->type;
     ovsdb_datum_clone(&old, ovsdb_idl_read(row, column), &column->type);
@@ -1279,21 +1601,29 @@ cmd_remove(struct ctl_context *ctx)
             if (ovsdb_type_is_map(&rm_type)) {
                 rm_type.value.type = OVSDB_TYPE_VOID;
                 free(error);
-                die_if_error(ovsdb_datum_from_string(
-                                                     &rm, &rm_type, ctx->argv[i], ctx->symtab));
+                ctx->error = ovsdb_datum_from_string(&rm, &rm_type,
+                                                     ctx->argv[i],
+                                                     ctx->symtab);
+                if (ctx->error) {
+                    ovsdb_datum_destroy(&old, &column->type);
+                    return;
+                }
             } else {
-                ctl_fatal("%s", error);
+                ctx->error = error;
+                ovsdb_datum_destroy(&old, &column->type);
+                return;
             }
         }
         ovsdb_datum_subtract(&old, type, &rm, &rm_type);
         ovsdb_datum_destroy(&rm, &rm_type);
     }
     if (old.n < type->n_min) {
-        ctl_fatal("\"remove\" operation would put %u %s in column %s of "
-                  "table %s but the minimum number is %u",
-                  old.n,
+        ctl_error(ctx, "\"remove\" operation would put %u %s in column %s of "
+                  "table %s but the minimum number is %u", old.n,
                   type->value.type == OVSDB_TYPE_VOID ? "values" : "pairs",
-                  column->name, table->class->name, type->n_min);
+                  column->name, table->name, type->n_min);
+        ovsdb_datum_destroy(&old, &column->type);
+        return;
     }
     ovsdb_idl_txn_verify(row, column);
     ovsdb_idl_txn_write(row, column, &old);
@@ -1305,14 +1635,20 @@ static void
 pre_cmd_clear(struct ctl_context *ctx)
 {
     const char *table_name = ctx->argv[1];
-    const struct ctl_table_class *table;
+    const struct ovsdb_idl_table_class *table;
     int i;
 
-    table = pre_get_table(ctx, table_name);
+    ctx->error = pre_get_table(ctx, table_name, &table);
+    if (ctx->error) {
+        return;
+    }
     for (i = 3; i < ctx->argc; i++) {
         const struct ovsdb_idl_column *column;
 
-        pre_get_column(ctx, table, ctx->argv[i], &column);
+        ctx->error = pre_get_column(ctx, table, ctx->argv[i], &column);
+        if (ctx->error) {
+            return;
+        }
     }
 }
 
@@ -1322,12 +1658,18 @@ cmd_clear(struct ctl_context *ctx)
     bool must_exist = !shash_find(&ctx->options, "--if-exists");
     const char *table_name = ctx->argv[1];
     const char *record_id = ctx->argv[2];
-    const struct ctl_table_class *table;
+    const struct ovsdb_idl_table_class *table;
     const struct ovsdb_idl_row *row;
     int i;
 
-    table = get_table(table_name);
-    row = get_row(ctx, table, record_id, must_exist);
+    ctx->error = get_table(table_name, &table);
+    if (ctx->error) {
+        return;
+    }
+    ctx->error = ctl_get_row(ctx, table, record_id, must_exist, &row);
+    if (ctx->error) {
+        return;
+    }
     if (!row) {
         return;
     }
@@ -1337,14 +1679,21 @@ cmd_clear(struct ctl_context *ctx)
         const struct ovsdb_type *type;
         struct ovsdb_datum datum;
 
-        die_if_error(get_column(table, ctx->argv[i], &column));
-        check_mutable(row, column);
+        ctx->error = get_column(table, ctx->argv[i], &column);
+        if (ctx->error) {
+            return;
+        }
+        ctx->error = check_mutable(row, column);
+        if (ctx->error) {
+            return;
+        }
 
         type = &column->type;
         if (type->n_min > 0) {
-            ctl_fatal("\"clear\" operation cannot be applied to column %s "
-                      "of table %s, which is not allowed to be empty",
-                      column->name, table->class->name);
+            ctl_error(ctx, "\"clear\" operation cannot be applied to column "
+                      "%s of table %s, which is not allowed to be empty",
+                      column->name, table->name);
+            return;
         }
 
         ovsdb_datum_init_empty(&datum);
@@ -1359,12 +1708,15 @@ pre_create(struct ctl_context *ctx)
 {
     const char *id = shash_find_data(&ctx->options, "--id");
     const char *table_name = ctx->argv[1];
-    const struct ctl_table_class *table;
+    const struct ovsdb_idl_table_class *table;
 
-    table = get_table(table_name);
-    if (!id && !table->class->is_root) {
+    ctx->error = get_table(table_name, &table);
+    if (ctx->error) {
+        return;
+    }
+    if (!id && !table->is_root) {
         VLOG_WARN("applying \"create\" command to table %s without --id "
-                  "option will have no effect", table->class->name);
+                  "option will have no effect", table->name);
     }
 }
 
@@ -1373,27 +1725,37 @@ cmd_create(struct ctl_context *ctx)
 {
     const char *id = shash_find_data(&ctx->options, "--id");
     const char *table_name = ctx->argv[1];
-    const struct ctl_table_class *table = get_table(table_name);
+    const struct ovsdb_idl_table_class *table;
     const struct ovsdb_idl_row *row;
-    const struct uuid *uuid;
+    const struct uuid *uuid = NULL;
     int i;
 
+    ctx->error = get_table(table_name, &table);
+    if (ctx->error) {
+        return;
+    }
     if (id) {
-        struct ovsdb_symbol *symbol = create_symbol(ctx->symtab, id, NULL);
-        if (table->class->is_root) {
+        struct ovsdb_symbol *symbol = NULL;
+
+        ctx->error = create_symbol(ctx->symtab, id, &symbol, NULL);
+        if (ctx->error) {
+            return;
+        }
+        if (table->is_root) {
             /* This table is in the root set, meaning that rows created in it
              * won't disappear even if they are unreferenced, so disable
              * warnings about that by pretending that there is a reference. */
             symbol->strong_ref = true;
         }
         uuid = &symbol->uuid;
-    } else {
-        uuid = NULL;
     }
 
-    row = ovsdb_idl_txn_insert(ctx->txn, table->class, uuid);
+    row = ovsdb_idl_txn_insert(ctx->txn, table, uuid);
     for (i = 2; i < ctx->argc; i++) {
-        set_column(table, row, ctx->argv[i], ctx->symtab);
+        ctx->error = set_column(table, row, ctx->argv[i], ctx->symtab);
+        if (ctx->error) {
+            return;
+        }
     }
     ds_put_format(&ctx->output, UUID_FMT, UUID_ARGS(&row->uuid));
 }
@@ -1429,7 +1791,10 @@ pre_cmd_destroy(struct ctl_context *ctx)
 {
     const char *table_name = ctx->argv[1];
 
-    pre_get_table(ctx, table_name);
+    ctx->error = pre_get_table(ctx, table_name, NULL);
+    if (ctx->error) {
+        return;
+    }
 }
 
 static void
@@ -1438,24 +1803,31 @@ cmd_destroy(struct ctl_context *ctx)
     bool must_exist = !shash_find(&ctx->options, "--if-exists");
     bool delete_all = shash_find(&ctx->options, "--all");
     const char *table_name = ctx->argv[1];
-    const struct ctl_table_class *table;
+    const struct ovsdb_idl_table_class *table;
     int i;
 
-    table = get_table(table_name);
+    ctx->error = get_table(table_name, &table);
+    if (ctx->error) {
+        return;
+    }
 
     if (delete_all && ctx->argc > 2) {
-        ctl_fatal("--all and records argument should not be specified together");
+        ctl_error(ctx, "--all and records argument should not be specified "
+                  "together");
+        return;
     }
 
     if (delete_all && !must_exist) {
-        ctl_fatal("--all and --if-exists should not be specified together");
+        ctl_error(ctx, "--all and --if-exists should not be specified "
+                  "together");
+        return;
     }
 
     if (delete_all) {
         const struct ovsdb_idl_row *row;
         const struct ovsdb_idl_row *next_row;
 
-        for (row = ovsdb_idl_first_row(ctx->idl, table->class);
+        for (row = ovsdb_idl_first_row(ctx->idl, table);
              row;) {
             next_row = ovsdb_idl_next_row(row);
             ovsdb_idl_txn_delete(row);
@@ -1465,7 +1837,11 @@ cmd_destroy(struct ctl_context *ctx)
         for (i = 2; i < ctx->argc; i++) {
             const struct ovsdb_idl_row *row;
 
-            row = get_row(ctx, table, ctx->argv[i], must_exist);
+            ctx->error = ctl_get_row(ctx, table, ctx->argv[i], must_exist,
+                                     &row);
+            if (ctx->error) {
+                return;
+            }
             if (row) {
                 ovsdb_idl_txn_delete(row);
             }
@@ -1478,13 +1854,19 @@ static void
 pre_cmd_wait_until(struct ctl_context *ctx)
 {
     const char *table_name = ctx->argv[1];
-    const struct ctl_table_class *table;
+    const struct ovsdb_idl_table_class *table;
     int i;
 
-    table = pre_get_table(ctx, table_name);
+    ctx->error = pre_get_table(ctx, table_name, &table);
+    if (ctx->error) {
+        return;
+    }
 
     for (i = 3; i < ctx->argc; i++) {
-        pre_parse_column_key_value(ctx, ctx->argv[i], table);
+        ctx->error = pre_parse_column_key_value(ctx, ctx->argv[i], table);
+        if (ctx->error) {
+            return;
+        }
     }
 }
 
@@ -1493,20 +1875,32 @@ cmd_wait_until(struct ctl_context *ctx)
 {
     const char *table_name = ctx->argv[1];
     const char *record_id = ctx->argv[2];
-    const struct ctl_table_class *table;
+    const struct ovsdb_idl_table_class *table;
     const struct ovsdb_idl_row *row;
     int i;
 
-    table = get_table(table_name);
-
-    row = get_row(ctx, table, record_id, false);
+    ctx->error = get_table(table_name, &table);
+    if (ctx->error) {
+        return;
+    }
+    ctx->error = ctl_get_row(ctx, table, record_id, false, &row);
+    if (ctx->error) {
+        return;
+    }
     if (!row) {
         ctx->try_again = true;
         return;
     }
 
     for (i = 3; i < ctx->argc; i++) {
-        if (!is_condition_satisfied(table, row, ctx->argv[i], ctx->symtab)) {
+        bool satisfied;
+
+        ctx->error = check_condition(table, row, ctx->argv[i], ctx->symtab,
+                                     &satisfied);
+        if (ctx->error) {
+            return;
+        }
+        if (!satisfied) {
             ctx->try_again = true;
             return;
         }
@@ -1514,7 +1908,7 @@ cmd_wait_until(struct ctl_context *ctx)
 }
 
 /* Parses one command. */
-static void
+static char * OVS_WARN_UNUSED_RESULT
 parse_command(int argc, char *argv[], struct shash *local_options,
               struct ctl_command *command)
 {
@@ -1522,6 +1916,7 @@ parse_command(int argc, char *argv[], struct shash *local_options,
     struct shash_node *node;
     int n_arg;
     int i;
+    char *error;
 
     shash_init(&command->options);
     shash_swap(local_options, &command->options);
@@ -1544,67 +1939,84 @@ parse_command(int argc, char *argv[], struct shash *local_options,
         }
 
         if (shash_find(&command->options, key)) {
-            ctl_fatal("'%s' option specified multiple times", argv[i]);
+            free(key);
+            free(value);
+            error = xasprintf("'%s' option specified multiple times", argv[i]);
+            goto error;
         }
         shash_add_nocopy(&command->options, key, value);
     }
     if (i == argc) {
-        ctl_fatal("missing command name (use --help for help)");
+        error = xstrdup("missing command name (use --help for help)");
+        goto error;
     }
 
     p = shash_find_data(&all_commands, argv[i]);
     if (!p) {
-        ctl_fatal("unknown command '%s'; use --help for help", argv[i]);
+        error = xasprintf("unknown command '%s'; use --help for help",
+                          argv[i]);
+        goto error;
     }
 
     SHASH_FOR_EACH (node, &command->options) {
         const char *s = strstr(p->options, node->name);
         int end = s ? s[strlen(node->name)] : EOF;
 
-        if (end != '=' && end != ',' && end != ' ' && end != '\0') {
-            ctl_fatal("'%s' command has no '%s' option",
-                      argv[i], node->name);
+        if (!strchr("=,? ", end)) {
+            error = xasprintf("'%s' command has no '%s' option",
+                              argv[i], node->name);
+            goto error;
         }
-        if ((end == '=') != (node->data != NULL)) {
+        if (end != '?' && (end == '=') != (node->data != NULL)) {
             if (end == '=') {
-                ctl_fatal("missing argument to '%s' option on '%s' "
-                          "command", node->name, argv[i]);
+                error = xasprintf("missing argument to '%s' option on '%s' "
+                                  "command", node->name, argv[i]);
+                goto error;
             } else {
-                ctl_fatal("'%s' option on '%s' does not accept an "
-                          "argument", node->name, argv[i]);
+                error = xasprintf("'%s' option on '%s' does not accept an "
+                                  "argument", node->name, argv[i]);
+                goto error;
             }
         }
     }
 
     n_arg = argc - i - 1;
     if (n_arg < p->min_args) {
-        ctl_fatal("'%s' command requires at least %d arguments",
-                  p->name, p->min_args);
+        error = xasprintf("'%s' command requires at least %d arguments",
+                          p->name, p->min_args);
+        goto error;
     } else if (n_arg > p->max_args) {
         int j;
 
         for (j = i + 1; j < argc; j++) {
             if (argv[j][0] == '-') {
-                ctl_fatal("'%s' command takes at most %d arguments "
-                          "(note that options must precede command "
-                          "names and follow a \"--\" argument)",
-                          p->name, p->max_args);
+                error = xasprintf("'%s' command takes at most %d arguments "
+                                  "(note that options must precede command "
+                                  "names and follow a \"--\" argument)",
+                                  p->name, p->max_args);
+                goto error;
             }
         }
 
-        ctl_fatal("'%s' command takes at most %d arguments",
-                  p->name, p->max_args);
+        error = xasprintf("'%s' command takes at most %d arguments",
+                          p->name, p->max_args);
+        goto error;
     }
 
     command->syntax = p;
     command->argc = n_arg + 1;
     command->argv = &argv[i];
+    return NULL;
+
+error:
+    shash_destroy_free_data(&command->options);
+    return error;
 }
 
 static void
 pre_cmd_show(struct ctl_context *ctx)
 {
-    struct cmd_show_table *show;
+    const struct cmd_show_table *show;
 
     for (show = cmd_show_tables; show->table; show++) {
         size_t i;
@@ -1619,26 +2031,35 @@ pre_cmd_show(struct ctl_context *ctx)
                 ovsdb_idl_add_column(ctx->idl, column);
             }
         }
+        if (show->wref_table.table) {
+            ovsdb_idl_add_table(ctx->idl, show->wref_table.table);
+        }
+        if (show->wref_table.name_column) {
+            ovsdb_idl_add_column(ctx->idl, show->wref_table.name_column);
+        }
+        if (show->wref_table.wref_column) {
+            ovsdb_idl_add_column(ctx->idl, show->wref_table.wref_column);
+        }
     }
 }
 
-static struct cmd_show_table *
+static const struct cmd_show_table *
 cmd_show_find_table_by_row(const struct ovsdb_idl_row *row)
 {
-    struct cmd_show_table *show;
+    const struct cmd_show_table *show;
 
     for (show = cmd_show_tables; show->table; show++) {
-        if (show->table == row->table->class) {
+        if (show->table == row->table->class_) {
             return show;
         }
     }
     return NULL;
 }
 
-static struct cmd_show_table *
+static const struct cmd_show_table *
 cmd_show_find_table_by_name(const char *name)
 {
-    struct cmd_show_table *show;
+    const struct cmd_show_table *show;
 
     for (show = cmd_show_tables; show->table; show++) {
         if (!strcmp(show->table->name, name)) {
@@ -1648,11 +2069,47 @@ cmd_show_find_table_by_name(const char *name)
     return NULL;
 }
 
+/*  Prints table entries that weak reference the 'cur_row'. */
+static void
+cmd_show_weak_ref(struct ctl_context *ctx, const struct cmd_show_table *show,
+                  const struct ovsdb_idl_row *cur_row, int level)
+{
+    const struct ovsdb_idl_row *row_wref;
+    const struct ovsdb_idl_table_class *table = show->wref_table.table;
+    const struct ovsdb_idl_column *name_column
+        = show->wref_table.name_column;
+    const struct ovsdb_idl_column *wref_column
+        = show->wref_table.wref_column;
+
+    if (!table || !name_column || !wref_column) {
+        return;
+    }
+
+    for (row_wref = ovsdb_idl_first_row(ctx->idl, table); row_wref;
+         row_wref = ovsdb_idl_next_row(row_wref)) {
+        const struct ovsdb_datum *wref_datum
+            = ovsdb_idl_read(row_wref, wref_column);
+        /* If weak reference refers to the 'cur_row', prints it. */
+        if (wref_datum->n
+            && uuid_equals(&cur_row->uuid, &wref_datum->keys[0].uuid)) {
+            const struct ovsdb_datum *name_datum
+                = ovsdb_idl_read(row_wref, name_column);
+            ds_put_char_multiple(&ctx->output, ' ', (level + 1) * 4);
+            ds_put_format(&ctx->output, "%s ", table->name);
+            ovsdb_datum_to_string(name_datum, &name_column->type, &ctx->output);
+            ds_put_char(&ctx->output, '\n');
+        }
+    }
+}
+
+/* 'shown' records the tables that has been displayed by the current
+ * command to avoid duplicated prints.
+ */
 static void
 cmd_show_row(struct ctl_context *ctx, const struct ovsdb_idl_row *row,
-             int level)
+             int level, struct sset *shown)
 {
-    struct cmd_show_table *show = cmd_show_find_table_by_row(row);
+    const struct cmd_show_table *show = cmd_show_find_table_by_row(row);
     size_t i;
 
     ds_put_char_multiple(&ctx->output, ' ', level * 4);
@@ -1667,11 +2124,11 @@ cmd_show_row(struct ctl_context *ctx, const struct ovsdb_idl_row *row,
     }
     ds_put_char(&ctx->output, '\n');
 
-    if (!show || show->recurse) {
+    if (!show || sset_find(shown, show->table->name)) {
         return;
     }
 
-    show->recurse = true;
+    sset_add(shown, show->table->name);
     for (i = 0; i < ARRAY_SIZE(show->columns); i++) {
         const struct ovsdb_idl_column *column = show->columns[i];
         const struct ovsdb_datum *datum;
@@ -1682,12 +2139,12 @@ cmd_show_row(struct ctl_context *ctx, const struct ovsdb_idl_row *row,
 
         datum = ovsdb_idl_read(row, column);
         if (column->type.key.type == OVSDB_TYPE_UUID &&
-            column->type.key.u.uuid.refTableName) {
-            struct cmd_show_table *ref_show;
+            column->type.key.uuid.refTableName) {
+            const struct cmd_show_table *ref_show;
             size_t j;
 
             ref_show = cmd_show_find_table_by_name(
-                column->type.key.u.uuid.refTableName);
+                column->type.key.uuid.refTableName);
             if (ref_show) {
                 for (j = 0; j < datum->n; j++) {
                     const struct ovsdb_idl_row *ref_row;
@@ -1696,21 +2153,21 @@ cmd_show_row(struct ctl_context *ctx, const struct ovsdb_idl_row *row,
                                                          ref_show->table,
                                                          &datum->keys[j].uuid);
                     if (ref_row) {
-                        cmd_show_row(ctx, ref_row, level + 1);
+                        cmd_show_row(ctx, ref_row, level + 1, shown);
                     }
                 }
                 continue;
             }
         } else if (ovsdb_type_is_map(&column->type) &&
                    column->type.value.type == OVSDB_TYPE_UUID &&
-                   column->type.value.u.uuid.refTableName) {
-            struct cmd_show_table *ref_show;
+                   column->type.value.uuid.refTableName) {
+            const struct cmd_show_table *ref_show;
             size_t j;
 
             /* Prints the key to ref'ed table name map if the ref'ed table
              * is also defined in 'cmd_show_tables'.  */
             ref_show = cmd_show_find_table_by_name(
-                column->type.value.u.uuid.refTableName);
+                column->type.value.uuid.refTableName);
             if (ref_show && ref_show->name_column) {
                 ds_put_char_multiple(&ctx->output, ' ', (level + 1) * 4);
                 ds_put_format(&ctx->output, "%s:\n", column->name);
@@ -1749,18 +2206,23 @@ cmd_show_row(struct ctl_context *ctx, const struct ovsdb_idl_row *row,
             ds_put_char(&ctx->output, '\n');
         }
     }
-    show->recurse = false;
+    cmd_show_weak_ref(ctx, show, row, level);
+    sset_find_and_delete_assert(shown, show->table->name);
 }
 
 static void
 cmd_show(struct ctl_context *ctx)
 {
     const struct ovsdb_idl_row *row;
+    struct sset shown = SSET_INITIALIZER(&shown);
 
     for (row = ovsdb_idl_first_row(ctx->idl, cmd_show_tables[0].table);
          row; row = ovsdb_idl_next_row(row)) {
-        cmd_show_row(ctx, row, 0);
+        cmd_show_row(ctx, row, 0, &shown);
     }
+
+    ovs_assert(sset_is_empty(&shown));
+    sset_destroy(&shown);
 }
 
 \f
@@ -1787,19 +2249,14 @@ ctl_add_cmd_options(struct option **options_p, size_t *n_options_p,
             s = xstrdup(p->options);
             for (name = strtok_r(s, ",", &save_ptr); name != NULL;
                  name = strtok_r(NULL, ",", &save_ptr)) {
-                char *equals;
-                int has_arg;
-
                 ovs_assert(name[0] == '-' && name[1] == '-' && name[2]);
                 name += 2;
 
-                equals = strchr(name, '=');
-                if (equals) {
-                    has_arg = required_argument;
-                    *equals = '\0';
-                } else {
-                    has_arg = no_argument;
-                }
+                size_t n = strcspn(name, "=?");
+                int has_arg = (name[n] == '\0' ? no_argument
+                               : name[n] == '=' ? required_argument
+                               : optional_argument);
+                name[n] = '\0';
 
                 o = find_option(name, *options_p, *n_options_p);
                 if (o) {
@@ -1822,17 +2279,21 @@ ctl_add_cmd_options(struct option **options_p, size_t *n_options_p,
 }
 
 /* Parses command-line input for commands. */
-struct ctl_command *
+char *
 ctl_parse_commands(int argc, char *argv[], struct shash *local_options,
-                   size_t *n_commandsp)
+                   struct ctl_command **commandsp, size_t *n_commandsp)
 {
     struct ctl_command *commands;
     size_t n_commands, allocated_commands;
     int i, start;
+    char *error;
 
     commands = NULL;
     n_commands = allocated_commands = 0;
 
+    *commandsp = NULL;
+    *n_commandsp = 0;
+
     for (start = i = 0; i <= argc; i++) {
         if (i == argc || !strcmp(argv[i], "--")) {
             if (i > start) {
@@ -1845,19 +2306,32 @@ ctl_parse_commands(int argc, char *argv[], struct shash *local_options,
                         shash_moved(&c->options);
                     }
                 }
-                parse_command(i - start, &argv[start], local_options,
-                              &commands[n_commands++]);
+                error = parse_command(i - start, &argv[start], local_options,
+                                      &commands[n_commands]);
+                if (error) {
+                    struct ctl_command *c;
+
+                    for (c = commands; c < &commands[n_commands]; c++) {
+                        shash_destroy_free_data(&c->options);
+                    }
+                    free(commands);
+
+                    return error;
+                }
+
+                n_commands++;
             } else if (!shash_is_empty(local_options)) {
-                ctl_fatal("missing command name (use --help for help)");
+                return xstrdup("missing command name (use --help for help)");
             }
             start = i + 1;
         }
     }
     if (!n_commands) {
-        ctl_fatal("missing command name (use --help for help)");
+        return xstrdup("missing command name (use --help for help)");
     }
+    *commandsp = commands;
     *n_commandsp = n_commands;
-    return commands;
+    return NULL;
 }
 
 /* Prints all registered commands. */
@@ -1919,18 +2393,35 @@ ctl_default_db(void)
  * database, otherwise false.  (Not very smart, so it's prone to false
  * positives.) */
 bool
-ctl_might_write_to_db(char **argv)
+ctl_might_write_to_db(const struct ctl_command *commands, size_t n)
 {
-    for (; *argv; argv++) {
-        const struct ctl_command_syntax *p = shash_find_data(&all_commands,
-                                                             *argv);
-        if (p && p->mode == RW) {
+    for (size_t i = 0; i < n; i++) {
+        if (commands[i].syntax->mode == RW) {
             return true;
         }
     }
     return false;
 }
 
+/* Report an error while running in the command context. Caller should return
+ * to its caller immediately after reporting the error. */
+void
+ctl_error(struct ctl_context *ctx, const char *format, ...)
+{
+    va_list args;
+
+    ovs_assert(ctx);
+
+    if (ctx->error) {
+        VLOG_ERR("Discarding unhandled error: %s", ctx->error);
+        free(ctx->error);
+    }
+
+    va_start(args, format);
+    ctx->error = xvasprintf(format, args);
+    va_end(args);
+}
+
 void
 ctl_fatal(const char *format, ...)
 {
@@ -1941,7 +2432,7 @@ ctl_fatal(const char *format, ...)
     message = xvasprintf(format, args);
     va_end(args);
 
-    vlog_set_levels(&VLM_db_ctl_base, VLF_CONSOLE, VLL_OFF);
+    vlog_set_levels(&this_module, VLF_CONSOLE, VLL_OFF);
     VLOG_ERR("%s", message);
     ovs_error(0, "%s", message);
     ctl_exit(EXIT_FAILURE);
@@ -1953,7 +2444,7 @@ ctl_fatal(const char *format, ...)
  * Freeing the transaction and the IDL is not strictly necessary, but it makes
  * for a clean memory leak report from valgrind in the normal case.  That makes
  * it easier to notice real memory leaks. */
-void
+static void
 ctl_exit(int status)
 {
     if (ctl_exit_func) {
@@ -1985,10 +2476,15 @@ static const struct ctl_command_syntax db_ctl_commands[] = {
      NULL, "--if-exists,--all", RW},
     {"wait-until", 2, INT_MAX, "TABLE RECORD [COLUMN[:KEY]=VALUE]...",
      pre_cmd_wait_until, cmd_wait_until, NULL, "", RO},
-    {"show", 0, 0, "", pre_cmd_show, cmd_show, NULL, "", RO},
     {NULL, 0, 0, NULL, NULL, NULL, NULL, NULL, RO},
 };
 
+static void
+ctl_register_command(const struct ctl_command_syntax *command)
+{
+    shash_add_assert(&all_commands, command->name, command);
+}
+
 /* Registers commands represented by 'struct ctl_command_syntax's to
  * 'all_commands'.  The last element of 'commands' must be an all-NULL
  * element. */
@@ -1998,18 +2494,30 @@ ctl_register_commands(const struct ctl_command_syntax *commands)
     const struct ctl_command_syntax *p;
 
     for (p = commands; p->name; p++) {
-        shash_add_assert(&all_commands, p->name, p);
+        ctl_register_command(p);
     }
 }
 
 /* Registers the 'db_ctl_commands' to 'all_commands'. */
 void
-ctl_init(const struct ctl_table_class tables_[],
-         void (*ctl_exit_func_)(int status))
-{
-    tables = tables_;
+ctl_init__(const struct ovsdb_idl_class *idl_class_,
+           const struct ctl_table_class *ctl_classes_,
+           const struct cmd_show_table cmd_show_tables_[],
+           void (*ctl_exit_func_)(int status))
+{
+    idl_class = idl_class_;
+    idl_classes = idl_class_->tables;
+    ctl_classes = ctl_classes_;
+    n_classes = idl_class->n_tables;
     ctl_exit_func = ctl_exit_func_;
     ctl_register_commands(db_ctl_commands);
+
+    cmd_show_tables = cmd_show_tables_;
+    if (cmd_show_tables) {
+        static const struct ctl_command_syntax show =
+            {"show", 0, 0, "", pre_cmd_show, cmd_show, NULL, "", RO};
+        ctl_register_command(&show);
+    }
 }
 
 /* Returns the text for the database commands usage.  */
@@ -2030,6 +2538,63 @@ ctl_get_db_cmd_usage(void)
 Potentially unsafe database commands require --force option.\n";
 }
 
+const char *
+ctl_list_db_tables_usage(void)
+{
+    static struct ds s = DS_EMPTY_INITIALIZER;
+    if (s.length) {
+        return ds_cstr(&s);
+    }
+
+    ds_put_cstr(&s, "Database commands may reference a row in each table in the following ways:\n");
+    for (int i = 0; i < n_classes; i++) {
+        struct svec options = SVEC_EMPTY_INITIALIZER;
+
+        svec_add(&options, "by UUID");
+        if (idl_classes[i].is_singleton) {
+            svec_add(&options, "as \".\"");
+        }
+
+        for (int j = 0; j < ARRAY_SIZE(ctl_classes[i].row_ids); j++) {
+            const struct ctl_row_id *id = &ctl_classes[i].row_ids[j];
+            if (!id->name_column) {
+                continue;
+            }
+
+            struct ds o = DS_EMPTY_INITIALIZER;
+            if (id->uuid_column) {
+                ds_put_format(&o, "via \"%s\"", id->uuid_column->name);
+                const struct ovsdb_idl_table_class *referrer
+                    = ovsdb_idl_table_class_from_column(idl_class,
+                                                        id->uuid_column);
+                if (referrer != &idl_classes[i]) {
+                    ds_put_format(&o, " of %s", referrer->name);
+                }
+                if (id->key) {
+                    ds_put_format(&o, " with matching \"%s:%s\"",
+                                  id->name_column->name, id->key);
+                } else {
+                    ds_put_format(&o, " with matching \"%s\"", id->name_column->name);
+                }
+            } else if (id->key) {
+                ds_put_format(&o, "by \"%s:%s\"", id->name_column->name, id->key);
+            } else {
+                ds_put_format(&o, "by \"%s\"", id->name_column->name);
+            }
+            svec_add_nocopy(&options, ds_steal_cstr(&o));
+        }
+
+        ds_put_format(&s, "  %s:", idl_classes[i].name);
+        for (int j = 0; j < options.n; j++) {
+            ds_put_format(&s, "\n    %s", options.names[j]);
+        }
+        ds_put_char(&s, '\n');
+        svec_destroy(&options);
+    }
+
+    return ds_cstr(&s);
+}
+
 /* Initializes 'ctx' from 'command'. */
 void
 ctl_context_init_command(struct ctl_context *ctx,
@@ -2042,6 +2607,7 @@ ctl_context_init_command(struct ctl_context *ctx,
     ds_swap(&ctx->output, &command->output);
     ctx->table = command->table;
     ctx->try_again = false;
+    ctx->error = NULL;
 }
 
 /* Initializes the entire 'ctx'. */
@@ -2049,7 +2615,7 @@ void
 ctl_context_init(struct ctl_context *ctx, struct ctl_command *command,
                  struct ovsdb_idl *idl, struct ovsdb_idl_txn *txn,
                  struct ovsdb_symbol_table *symtab,
-                 void (*invalidate_cache)(struct ctl_context *))
+                 void (*invalidate_cache_cb)(struct ctl_context *))
 {
     if (command) {
         ctl_context_init_command(ctx, command);
@@ -2057,7 +2623,7 @@ ctl_context_init(struct ctl_context *ctx, struct ctl_command *command,
     ctx->idl = idl;
     ctx->txn = txn;
     ctx->symtab = symtab;
-    ctx->invalidate_cache = invalidate_cache;
+    ctx->invalidate_cache_cb = invalidate_cache_cb;
 }
 
 /* Completes processing of 'command' within 'ctx'. */
@@ -2067,6 +2633,8 @@ ctl_context_done_command(struct ctl_context *ctx,
 {
     ds_swap(&ctx->output, &command->output);
     command->table = ctx->table;
+    free(ctx->error);
+    ctx->error = NULL;
 }
 
 /* Finishes up with 'ctx'.
@@ -2083,9 +2651,21 @@ ctl_context_done(struct ctl_context *ctx,
     invalidate_cache(ctx);
 }
 
-void ctl_set_column(const char *table_name,
-                    const struct ovsdb_idl_row *row, const char *arg,
-                    struct ovsdb_symbol_table *symtab)
+char * OVS_WARN_UNUSED_RESULT
+ctl_set_column(const char *table_name, const struct ovsdb_idl_row *row,
+               const char *arg, struct ovsdb_symbol_table *symtab)
 {
-    set_column(get_table(table_name), row, arg, symtab);
+    const struct ovsdb_idl_table_class *table;
+    char *error;
+
+    error = get_table(table_name, &table);
+    if (error) {
+        return error;
+    }
+    error = set_column(table, row, arg, symtab);
+    if (error) {
+        return error;
+    }
+
+    return NULL;
 }