]> git.proxmox.com Git - mirror_ovs.git/blob - ovsdb/replication.c
raft: Send all missing logs in one single append_request.
[mirror_ovs.git] / ovsdb / replication.c
1 /*
2 * (c) Copyright 2016, 2017 Hewlett Packard Enterprise Development LP
3 * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016, 2017 Nicira, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include <config.h>
19
20
21 #include "condition.h"
22 #include "jsonrpc.h"
23 #include "openvswitch/dynamic-string.h"
24 #include "openvswitch/hmap.h"
25 #include "openvswitch/json.h"
26 #include "openvswitch/vlog.h"
27 #include "ovsdb-error.h"
28 #include "ovsdb.h"
29 #include "query.h"
30 #include "replication.h"
31 #include "row.h"
32 #include "sset.h"
33 #include "stream.h"
34 #include "svec.h"
35 #include "table.h"
36 #include "transaction.h"
37 #include "uuid.h"
38
39 VLOG_DEFINE_THIS_MODULE(replication);
40
/* Address of the active server to replicate from; a private copy made by
 * replication_init() with xstrdup(). */
static char *sync_from;
/* Copy of the local server's UUID, compared against the remote server's ID in
 * replication_run() so that a server never replicates from itself. */
static struct uuid server_uuid;
/* JSON-RPC session to the active server, or NULL when there is none. */
static struct jsonrpc_session *session;
/* Session sequence number last observed by replication_run().  A different
 * value from jsonrpc_session_get_seqno() restarts the handshake. */
static unsigned int session_seqno = UINT_MAX;
45
/* Construction of the "monitor" request sent for each replicated database. */
static struct jsonrpc_msg *create_monitor_request(struct ovsdb_schema *);
static void add_monitored_table(struct ovsdb_table_schema *table,
                                struct json *monitor_requests);

/* Deletes the rows of every non-blacklisted table of 'db'. */
static struct ovsdb_error *reset_database(struct ovsdb *db);

/* Application of table updates received from the active server. */
static struct ovsdb_error *process_notification(struct json *, struct ovsdb *);
static struct ovsdb_error *process_table_update(struct json *table_update,
                                                const char *table_name,
                                                struct ovsdb *database,
                                                struct ovsdb_txn *txn);

/* Per-row operations carried out inside the transaction 'txn'. */
static struct ovsdb_error *execute_insert(struct ovsdb_txn *txn,
                                          const struct uuid *row_uuid,
                                          struct ovsdb_table *table,
                                          struct json *new);
static struct ovsdb_error *execute_delete(struct ovsdb_txn *txn,
                                          const struct uuid *row_uuid,
                                          struct ovsdb_table *table);
static struct ovsdb_error *execute_update(struct ovsdb_txn *txn,
                                          const struct uuid *row_uuid,
                                          struct ovsdb_table *table,
                                          struct json *new);
69 \f
/* Maps from db name to sset of table names.  Tables listed here are neither
 * monitored on the active server nor wiped by reset_database(). */
static struct shash blacklist_tables = SHASH_INITIALIZER(&blacklist_tables);

/* Helpers that empty, extend and query 'blacklist_tables'. */
static void blacklist_tables_clear(void);
static void blacklist_tables_add(const char *database, const char *table);
static bool blacklist_tables_find(const char *database, const char* table);
76
77 \f
/* Keep track of request IDs of all outstanding OVSDB requests. */
static struct hmap request_ids = HMAP_INITIALIZER(&request_ids);

/* One outstanding request, stored in 'request_ids'. */
struct request_ids_hmap_node {
    struct hmap_node hmap;      /* Node in 'request_ids', hashed on the id. */
    struct json *request_id;    /* Clone of the request's JSON id (owned). */
    struct ovsdb *db; /* associated database */
};
/* Records a request id together with the database it concerns (may be NULL,
 * as for the "get_server_id" and "list_dbs" requests). */
void request_ids_add(const struct json *id, struct ovsdb *db);
/* Removes 'id' if it is outstanding and reports its database through 'db'. */
bool request_ids_lookup_and_free(const struct json *id, struct ovsdb **db);
/* Frees every entry and destroys the map itself. */
static void request_ids_destroy(void);
/* Frees every entry and leaves an empty, usable map behind. */
void request_ids_clear(void);
90 \f
/* Handshake/replication state machine driven by replication_run().  The
 * states are entered in the order listed, except RPL_S_ERR. */
enum ovsdb_replication_state {
    RPL_S_INIT,                 /* Nothing sent yet on this session. */
    RPL_S_SERVER_ID_REQUESTED,  /* "get_server_id" sent, awaiting reply. */
    RPL_S_DB_REQUESTED,         /* "list_dbs" sent, awaiting reply. */
    RPL_S_SCHEMA_REQUESTED,     /* "get_schema" sent, awaiting replies. */
    RPL_S_MONITOR_REQUESTED,    /* "monitor" sent, awaiting replies. */
    RPL_S_REPLICATING,          /* All monitor replies received. */
    RPL_S_ERR /* Error, no longer replicating. */
};
/* Current state of the replication state machine. */
static enum ovsdb_replication_state state;
101
102 \f
/* A local database that is a candidate for replication, as stored in
 * 'replication_dbs'. */
struct replication_db {
    struct ovsdb *db;               /* The local database. */
    /* Set when the active server's schema differs from the local one but
     * is_replication_possible() accepted it. */
    bool schema_version_higher;
     /* Points to the schema received from the active server if
      * the local db schema version is higher. NULL otherwise. */
    struct ovsdb_schema *active_db_schema;
};
110
/* Returns true if every table and column of 'active_db_schema' is present,
 * with an identical column definition, in 'local_db_schema'. */
static bool is_replication_possible(struct ovsdb_schema *local_db_schema,
                                    struct ovsdb_schema *active_db_schema);

/* All DBs known to ovsdb-server.  The actual replication dbs are stored
 * in 'replication_dbs', which is a subset of all dbs and remote dbs whose
 * schema matches. */
static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs);
/* Maps db name to 'struct replication_db'.  NULL until replication_run() has
 * handled the server-ID reply, and again after replication_dbs_destroy(). */
static struct shash *replication_dbs;

/* Builds a fresh 'replication_dbs'-style map from 'local_dbs'. */
static struct shash *replication_dbs_create(void);
/* Frees 'replication_dbs' and everything it owns; resets it to NULL. */
static void replication_dbs_destroy(void);
/* Find 'struct ovsdb' by name within 'replication_dbs' */
static struct replication_db *find_db(const char *db_name);
124 \f
125
126 void
127 replication_init(const char *sync_from_, const char *exclude_tables,
128 const struct uuid *server, int probe_interval)
129 {
130 free(sync_from);
131 sync_from = xstrdup(sync_from_);
132 /* Caller should have verified that the 'exclude_tables' is
133 * parseable. An error here is unexpected. */
134 ovs_assert(!set_blacklist_tables(exclude_tables, false));
135
136 replication_dbs_destroy();
137
138 shash_clear(&local_dbs);
139 if (session) {
140 jsonrpc_session_close(session);
141 }
142
143 session = jsonrpc_session_open(sync_from, true);
144 session_seqno = UINT_MAX;
145
146 jsonrpc_session_set_probe_interval(session, probe_interval);
147
148 /* Keep a copy of local server uuid. */
149 server_uuid = *server;
150
151 state = RPL_S_INIT;
152 }
153
/* Registers the local database 'db' under the name 'database' in 'local_dbs',
 * the pool from which replication candidates are later drawn.  Uses
 * shash_add_assert(), so each name is expected to be registered only once. */
void
replication_add_local_db(const char *database, struct ovsdb *db)
{
    shash_add_assert(&local_dbs, database, db);
}
159
160 static void
161 send_schema_requests(const struct json *result)
162 {
163 for (size_t i = 0; i < result->array.n; i++) {
164 const struct json *name = result->array.elems[i];
165 if (name->type == JSON_STRING) {
166 /* Send one schema request for each remote DB. */
167 const char *db_name = json_string(name);
168 struct replication_db *rdb = find_db(db_name);
169 if (rdb) {
170 struct jsonrpc_msg *request =
171 jsonrpc_create_request(
172 "get_schema",
173 json_array_create_1(
174 json_string_create(db_name)),
175 NULL);
176
177 request_ids_add(request->id, rdb->db);
178 jsonrpc_session_send(session, request);
179 }
180 }
181 }
182 }
183
184 void
185 replication_run(void)
186 {
187 if (!session) {
188 return;
189 }
190
191 jsonrpc_session_run(session);
192
193 for (int i = 0; jsonrpc_session_is_connected(session) && i < 50; i++) {
194 struct jsonrpc_msg *msg;
195 unsigned int seqno;
196
197 seqno = jsonrpc_session_get_seqno(session);
198 if (seqno != session_seqno || state == RPL_S_INIT) {
199 session_seqno = seqno;
200 request_ids_clear();
201 struct jsonrpc_msg *request;
202 request = jsonrpc_create_request("get_server_id",
203 json_array_create_empty(), NULL);
204 request_ids_add(request->id, NULL);
205 jsonrpc_session_send(session, request);
206
207 state = RPL_S_SERVER_ID_REQUESTED;
208 VLOG_DBG("send server ID request.");
209 }
210
211 msg = jsonrpc_session_recv(session);
212 if (!msg) {
213 continue;
214 }
215
216 if (msg->type == JSONRPC_NOTIFY && state != RPL_S_ERR
217 && !strcmp(msg->method, "update")) {
218 if (msg->params->type == JSON_ARRAY
219 && msg->params->array.n == 2
220 && msg->params->array.elems[0]->type == JSON_STRING) {
221 char *db_name = msg->params->array.elems[0]->string;
222 struct replication_db *rdb = find_db(db_name);
223 if (rdb) {
224 struct ovsdb_error *error;
225 error = process_notification(msg->params->array.elems[1],
226 rdb->db);
227 if (error) {
228 ovsdb_error_assert(error);
229 state = RPL_S_ERR;
230 }
231 }
232 }
233 } else if (msg->type == JSONRPC_REPLY) {
234 struct replication_db *rdb;
235 struct ovsdb *db;
236 if (!request_ids_lookup_and_free(msg->id, &db)) {
237 VLOG_WARN("received unexpected reply");
238 goto next;
239 }
240
241 switch (state) {
242 case RPL_S_SERVER_ID_REQUESTED: {
243 struct uuid uuid;
244 if (msg->result->type != JSON_STRING ||
245 !uuid_from_string(&uuid, json_string(msg->result))) {
246 struct ovsdb_error *error;
247 error = ovsdb_error("get_server_id failed",
248 "Server ID is not valid UUID");
249
250 ovsdb_error_assert(error);
251 state = RPL_S_ERR;
252 break;
253 }
254
255 if (uuid_equals(&uuid, &server_uuid)) {
256 struct ovsdb_error *error;
257 error = ovsdb_error("Server ID check failed",
258 "Self replicating is not allowed");
259
260 ovsdb_error_assert(error);
261 state = RPL_S_ERR;
262 break;
263 }
264
265 struct jsonrpc_msg *request;
266 request = jsonrpc_create_request("list_dbs",
267 json_array_create_empty(),
268 NULL);
269 request_ids_add(request->id, NULL);
270 jsonrpc_session_send(session, request);
271
272 replication_dbs_destroy();
273 replication_dbs = replication_dbs_create();
274 state = RPL_S_DB_REQUESTED;
275 break;
276 }
277 case RPL_S_DB_REQUESTED:
278 if (msg->result->type != JSON_ARRAY) {
279 struct ovsdb_error *error;
280 error = ovsdb_error("list_dbs failed",
281 "list_dbs response is not array");
282 ovsdb_error_assert(error);
283 state = RPL_S_ERR;
284 } else {
285 send_schema_requests(msg->result);
286 VLOG_DBG("Send schema requests");
287 state = RPL_S_SCHEMA_REQUESTED;
288 }
289 break;
290
291 case RPL_S_SCHEMA_REQUESTED: {
292 struct ovsdb_schema *schema;
293 struct ovsdb_error *error;
294
295 error = ovsdb_schema_from_json(msg->result, &schema);
296 if (error) {
297 ovsdb_error_assert(error);
298 state = RPL_S_ERR;
299 }
300
301 rdb = find_db(schema->name);
302 if (!rdb) {
303 /* Unexpected schema. */
304 VLOG_WARN("unexpected schema %s", schema->name);
305 state = RPL_S_ERR;
306 } else if (!ovsdb_schema_equal(schema, rdb->db->schema)) {
307 /* Schmea version mismatch. */
308 VLOG_INFO("Schema version mismatch, checking if %s can "
309 "still be replicated or not.",
310 schema->name);
311 if (is_replication_possible(rdb->db->schema, schema)) {
312 VLOG_INFO("%s can be replicated.", schema->name);
313 rdb->schema_version_higher = true;
314 if (rdb->active_db_schema) {
315 ovsdb_schema_destroy(rdb->active_db_schema);
316 }
317 rdb->active_db_schema = schema;
318 } else {
319 VLOG_INFO("%s cannot be replicated.", schema->name);
320 struct replication_db *r =
321 shash_find_and_delete(replication_dbs,
322 schema->name);
323 if (r->active_db_schema) {
324 ovsdb_schema_destroy(r->active_db_schema);
325 }
326 free(r);
327 ovsdb_schema_destroy(schema);
328 }
329 } else {
330 ovsdb_schema_destroy(schema);
331 }
332
333 /* After receiving schemas, reset the local databases that
334 * will be monitored and send out monitor requests for them. */
335 if (hmap_is_empty(&request_ids)) {
336 struct shash_node *node;
337
338 if (shash_is_empty(replication_dbs)) {
339 VLOG_WARN("Nothing to replicate.");
340 state = RPL_S_ERR;
341 } else {
342 SHASH_FOR_EACH (node, replication_dbs) {
343 rdb = node->data;
344 struct jsonrpc_msg *request =
345 create_monitor_request(
346 rdb->schema_version_higher ?
347 rdb->active_db_schema : rdb->db->schema);
348
349 request_ids_add(request->id, rdb->db);
350 jsonrpc_session_send(session, request);
351 VLOG_DBG("Send monitor requests");
352 state = RPL_S_MONITOR_REQUESTED;
353 }
354 }
355 }
356 break;
357 }
358
359 case RPL_S_MONITOR_REQUESTED: {
360 /* Reply to monitor requests. */
361 struct ovsdb_error *error;
362 VLOG_INFO("Monitor request received. Resetting the database");
363 /* Resetting the database here has few risks. If the
364 * process_notification() fails, the database is completely
365 * lost locally. In case that node becomes active, then
366 * there is a chance of complete data loss in the active/standy
367 * cluster. */
368 error = reset_database(db);
369 if (!error) {
370 error = process_notification(msg->result, db);
371 }
372 if (error) {
373 ovsdb_error_assert(error);
374 state = RPL_S_ERR;
375 } else {
376 /* Transition to replicating state after receiving
377 * all replies of "monitor" requests. */
378 if (hmap_is_empty(&request_ids)) {
379 VLOG_DBG("Listening to monitor updates");
380 state = RPL_S_REPLICATING;
381 }
382 }
383 break;
384 }
385
386 case RPL_S_ERR:
387 /* Ignore all messages */
388 break;
389
390 case RPL_S_INIT:
391 case RPL_S_REPLICATING:
392 default:
393 OVS_NOT_REACHED();
394 }
395 }
396 next:
397 jsonrpc_msg_destroy(msg);
398 }
399 }
400
/* Counterpart of replication_run(): when a session exists, calls
 * jsonrpc_session_wait() and jsonrpc_session_recv_wait() on it (presumably so
 * that the poll loop wakes up when the session needs service or has data to
 * receive -- confirm against the jsonrpc API).  Does nothing otherwise. */
void
replication_wait(void)
{
    if (session) {
        jsonrpc_session_wait(session);
        jsonrpc_session_recv_wait(session);
    }
}
409
410 /* Parse 'blacklist' to rebuild 'blacklist_tables'. If 'dryrun' is false, the
411 * current black list tables will be wiped out, regardless of whether
412 * 'blacklist' can be parsed. If 'dryrun' is true, only parses 'blacklist' and
413 * reports any errors, without modifying the blacklist.
414 *
415 * On error, returns the error string, which the caller is
416 * responsible for freeing. Returns NULL otherwise. */
417 char * OVS_WARN_UNUSED_RESULT
418 set_blacklist_tables(const char *blacklist, bool dryrun)
419 {
420 struct sset set = SSET_INITIALIZER(&set);
421 char *err = NULL;
422
423 if (blacklist) {
424 const char *longname;
425
426 if (!dryrun) {
427 /* Can only add to an empty shash. */
428 blacklist_tables_clear();
429 }
430
431 sset_from_delimited_string(&set, blacklist, " ,");
432 SSET_FOR_EACH (longname, &set) {
433 char *database = xstrdup(longname), *table = NULL;
434 strtok_r(database, ":", &table);
435 if (table && !dryrun) {
436 blacklist_tables_add(database, table);
437 }
438
439 free(database);
440 if (!table) {
441 err = xasprintf("Can't parse black list table: %s", longname);
442 goto done;
443 }
444 }
445 }
446
447 done:
448 sset_destroy(&set);
449 if (err && !dryrun) {
450 /* On error, destroy the partially built 'blacklist_tables'. */
451 blacklist_tables_clear();
452 }
453 return err;
454 }
455
456 char * OVS_WARN_UNUSED_RESULT
457 get_blacklist_tables(void)
458 {
459 struct shash_node *node;
460 struct sset set = SSET_INITIALIZER(&set);
461
462 SHASH_FOR_EACH (node, &blacklist_tables) {
463 const char *database = node->name;
464 const char *table;
465 struct sset *tables = node->data;
466
467 SSET_FOR_EACH (table, tables) {
468 sset_add_and_free(&set, xasprintf("%s:%s", database, table));
469 }
470 }
471
472 /* Output the table list in an sorted order, so that
473 * the output string will not depend on the hash function
474 * that used to implement the hmap data structure. This is
475 * only useful for writting unit tests. */
476 const char **sorted = sset_sort(&set);
477 struct ds ds = DS_EMPTY_INITIALIZER;
478 size_t i;
479 for (i = 0; i < sset_count(&set); i++) {
480 ds_put_format(&ds, "%s,", sorted[i]);
481 }
482
483 ds_chomp(&ds, ',');
484
485 free(sorted);
486 sset_destroy(&set);
487
488 return ds_steal_cstr(&ds);
489 }
490
491 static void
492 blacklist_tables_clear(void)
493 {
494 struct shash_node *node;
495 SHASH_FOR_EACH (node, &blacklist_tables) {
496 struct sset *tables = node->data;
497 sset_destroy(tables);
498 }
499
500 shash_clear_free_data(&blacklist_tables);
501 }
502
503 static void
504 blacklist_tables_add(const char *database, const char *table)
505 {
506 struct sset *tables = shash_find_data(&blacklist_tables, database);
507
508 if (!tables) {
509 tables = xmalloc(sizeof *tables);
510 sset_init(tables);
511 shash_add(&blacklist_tables, database, tables);
512 }
513
514 sset_add(tables, table);
515 }
516
517 static bool
518 blacklist_tables_find(const char *database, const char *table)
519 {
520 struct sset *tables = shash_find_data(&blacklist_tables, database);
521 return tables && sset_contains(tables, table);
522 }
523
/* Closes the JSON-RPC session to the active server and clears 'session', so
 * that replication_run() and replication_wait() become no-ops until
 * replication_init() opens a new session. */
void
disconnect_active_server(void)
{
    jsonrpc_session_close(session);
    session = NULL;
}
530
531 void
532 replication_destroy(void)
533 {
534 blacklist_tables_clear();
535 shash_destroy(&blacklist_tables);
536
537 if (sync_from) {
538 free(sync_from);
539 sync_from = NULL;
540 }
541
542 request_ids_destroy();
543 replication_dbs_destroy();
544
545 shash_destroy(&local_dbs);
546 }
547
548 static struct replication_db *
549 find_db(const char *db_name)
550 {
551 return shash_find_data(replication_dbs, db_name);
552 }
553 \f
554 static struct ovsdb_error *
555 reset_database(struct ovsdb *db)
556 {
557 struct ovsdb_txn *txn = ovsdb_txn_create(db);
558 struct shash_node *table_node;
559
560 SHASH_FOR_EACH (table_node, &db->tables) {
561 /* Delete all rows if the table is not blacklisted. */
562 if (!blacklist_tables_find(db->schema->name, table_node->name)) {
563 struct ovsdb_table *table = table_node->data;
564 struct ovsdb_row *row, *next;
565 HMAP_FOR_EACH_SAFE (row, next, hmap_node, &table->rows) {
566 ovsdb_txn_row_delete(txn, row);
567 }
568 }
569 }
570
571 return ovsdb_txn_propose_commit_block(txn, false);
572 }
573
574 /* Create a monitor request for 'db'. The monitor request will include
575 * any tables from 'blacklisted_tables'
576 *
577 * Caller is responsible for disposing 'request'.
578 */
579 static struct jsonrpc_msg *
580 create_monitor_request(struct ovsdb_schema *schema)
581 {
582 struct jsonrpc_msg *request;
583 struct json *monitor;
584 const char *db_name = schema->name;
585
586 struct json *monitor_request = json_object_create();
587 size_t n = shash_count(&schema->tables);
588 const struct shash_node **nodes = shash_sort(&schema->tables);
589
590 for (int j = 0; j < n; j++) {
591 struct ovsdb_table_schema *table = nodes[j]->data;
592
593 /* Monitor all tables not blacklisted. */
594 if (!blacklist_tables_find(db_name, table->name)) {
595 add_monitored_table(table, monitor_request);
596 }
597 }
598 free(nodes);
599
600 /* Create a monitor request. */
601 monitor = json_array_create_3(
602 json_string_create(db_name),
603 json_string_create(db_name),
604 monitor_request);
605 request = jsonrpc_create_request("monitor", monitor, NULL);
606
607 return request;
608 }
609
610 static void
611 add_monitored_table(struct ovsdb_table_schema *table,
612 struct json *monitor_request)
613 {
614 struct json *monitor_request_array;
615
616 monitor_request_array = json_array_create_empty();
617 json_array_add(monitor_request_array, json_object_create());
618
619 json_object_put(monitor_request, table->name, monitor_request_array);
620 }
621
622 \f
623 static struct ovsdb_error *
624 process_notification(struct json *table_updates, struct ovsdb *db)
625 {
626 struct ovsdb_error *error = NULL;
627 struct ovsdb_txn *txn;
628
629 if (table_updates->type == JSON_OBJECT) {
630 txn = ovsdb_txn_create(db);
631
632 /* Process each table update. */
633 struct shash_node *node;
634 SHASH_FOR_EACH (node, json_object(table_updates)) {
635 struct json *table_update = node->data;
636 if (table_update) {
637 error = process_table_update(table_update, node->name, db, txn);
638 if (error) {
639 break;
640 }
641 }
642 }
643
644 if (error) {
645 ovsdb_txn_abort(txn);
646 return error;
647 } else {
648 /* Commit transaction. */
649 error = ovsdb_txn_propose_commit_block(txn, false);
650 }
651 }
652
653 return error;
654 }
655
656 static struct ovsdb_error *
657 process_table_update(struct json *table_update, const char *table_name,
658 struct ovsdb *database, struct ovsdb_txn *txn)
659 {
660 struct ovsdb_table *table = ovsdb_get_table(database, table_name);
661 if (!table) {
662 return ovsdb_error("unknown table", "unknown table %s", table_name);
663 }
664
665 if (table_update->type != JSON_OBJECT) {
666 return ovsdb_error("Not a JSON object",
667 "<table-update> for table is not object");
668 }
669
670 struct shash_node *node;
671 SHASH_FOR_EACH (node, json_object(table_update)) {
672 struct json *row_update = node->data;
673 struct json *old, *new;
674
675 if (row_update->type != JSON_OBJECT) {
676 return ovsdb_error("Not a JSON object",
677 "<row-update> is not object");
678 }
679
680 struct uuid uuid;
681 if (!uuid_from_string(&uuid, node->name)) {
682 return ovsdb_syntax_error(table_update, "bad row UUID",
683 "<table-update> names must be UUIDs");
684 }
685
686 old = shash_find_data(json_object(row_update), "old");
687 new = shash_find_data(json_object(row_update), "new");
688
689 struct ovsdb_error *error;
690 error = (!new ? execute_delete(txn, &uuid, table)
691 : !old ? execute_insert(txn, &uuid, table, new)
692 : execute_update(txn, &uuid, table, new));
693 if (error) {
694 return error;
695 }
696 }
697 return NULL;
698 }
699 \f
700 static struct ovsdb_error *
701 execute_insert(struct ovsdb_txn *txn, const struct uuid *row_uuid,
702 struct ovsdb_table *table, struct json *json_row)
703 {
704 struct ovsdb_row *row = ovsdb_row_create(table);
705 struct ovsdb_error *error = ovsdb_row_from_json(row, json_row, NULL, NULL);
706 if (!error) {
707 *ovsdb_row_get_uuid_rw(row) = *row_uuid;
708 ovsdb_txn_row_insert(txn, row);
709 } else {
710 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
711 VLOG_WARN_RL(&rl, "cannot add existing row "UUID_FMT" to table %s",
712 UUID_ARGS(row_uuid), table->schema->name);
713 ovsdb_row_destroy(row);
714 }
715
716 return error;
717 }
718
719 static struct ovsdb_error *
720 execute_delete(struct ovsdb_txn *txn, const struct uuid *row_uuid,
721 struct ovsdb_table *table)
722 {
723 const struct ovsdb_row *row = ovsdb_table_get_row(table, row_uuid);
724 if (row) {
725 ovsdb_txn_row_delete(txn, row);
726 } else {
727 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
728 VLOG_WARN_RL(&rl, "cannot delete missing row "UUID_FMT" from table %s",
729 UUID_ARGS(row_uuid), table->schema->name);
730 }
731 return NULL;
732 }
733
734 static struct ovsdb_error *
735 execute_update(struct ovsdb_txn *txn, const struct uuid *row_uuid,
736 struct ovsdb_table *table, struct json *json_row)
737 {
738 const struct ovsdb_row *row = ovsdb_table_get_row(table, row_uuid);
739 if (!row) {
740 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
741 VLOG_WARN_RL(&rl, "cannot modify missing row "UUID_FMT" in table %s",
742 UUID_ARGS(row_uuid), table->schema->name);
743 return NULL;
744 }
745
746 struct ovsdb_column_set columns = OVSDB_COLUMN_SET_INITIALIZER;
747 struct ovsdb_row *update = ovsdb_row_create(table);
748 struct ovsdb_error *error = ovsdb_row_from_json(update, json_row,
749 NULL, &columns);
750
751 if (!error && !ovsdb_row_equal_columns(row, update, &columns)) {
752 ovsdb_row_update_columns(ovsdb_txn_row_modify(txn, row),
753 update, &columns);
754 }
755
756 ovsdb_column_set_destroy(&columns);
757 ovsdb_row_destroy(update);
758 return error;
759 }
760
761 void
762 request_ids_add(const struct json *id, struct ovsdb *db)
763 {
764 struct request_ids_hmap_node *node = xmalloc(sizeof *node);
765
766 node->request_id = json_clone(id);
767 node->db = db;
768 hmap_insert(&request_ids, &node->hmap, json_hash(id, 0));
769 }
770
771 /* Look up 'id' from 'request_ids', if found, remove the found id from
772 * 'request_ids' and free its memory. If not found, 'request_ids' does
773 * not change. Sets '*db' to the database for the request (NULL if not
774 * found).
775 *
776 * Return true if 'id' is found, false otherwise.
777 */
778 bool
779 request_ids_lookup_and_free(const struct json *id, struct ovsdb **db)
780 {
781 struct request_ids_hmap_node *node;
782
783 HMAP_FOR_EACH_WITH_HASH (node, hmap, json_hash(id, 0), &request_ids) {
784 if (json_equal(id, node->request_id)) {
785 hmap_remove(&request_ids, &node->hmap);
786 *db = node->db;
787 json_destroy(node->request_id);
788 free(node);
789 return true;
790 }
791 }
792
793 *db = NULL;
794 return false;
795 }
796
797 static void
798 request_ids_destroy(void)
799 {
800 struct request_ids_hmap_node *node;
801
802 HMAP_FOR_EACH_POP (node, hmap, &request_ids) {
803 json_destroy(node->request_id);
804 free(node);
805 }
806 hmap_destroy(&request_ids);
807 }
808
/* Forgets all outstanding requests: destroys 'request_ids' and then
 * initializes it again so that it is empty and ready for reuse. */
void
request_ids_clear(void)
{
    request_ids_destroy();
    hmap_init(&request_ids);
}
815
816 static struct shash *
817 replication_dbs_create(void)
818 {
819 struct shash *new = xmalloc(sizeof *new);
820 shash_init(new);
821
822 struct shash_node *node;
823 SHASH_FOR_EACH (node, &local_dbs) {
824 struct replication_db *repl_db = xmalloc(sizeof *repl_db);
825 repl_db->db = node->data;
826 repl_db->schema_version_higher = false;
827 repl_db->active_db_schema = NULL;
828 shash_add(new, node->name, repl_db);
829 }
830
831 return new;
832 }
833
834 static void
835 replication_dbs_destroy(void)
836 {
837 if (!replication_dbs) {
838 return;
839 }
840
841 struct shash_node *node, *next;
842
843 SHASH_FOR_EACH_SAFE (node, next, replication_dbs) {
844 hmap_remove(&replication_dbs->map, &node->node);
845 struct replication_db *rdb = node->data;
846 if (rdb->active_db_schema) {
847 ovsdb_schema_destroy(rdb->active_db_schema);
848 }
849 free(rdb);
850 free(node->name);
851 free(node);
852 }
853
854 hmap_destroy(&replication_dbs->map);
855 free(replication_dbs);
856 replication_dbs = NULL;
857 }
858
859 /* Return true if replication just started or is ongoing.
860 * Return false if the connection failed, or the replication
861 * was not able to start. */
862 bool
863 replication_is_alive(void)
864 {
865 if (session) {
866 return jsonrpc_session_is_alive(session) && state != RPL_S_ERR;
867 }
868 return false;
869 }
870
871 /* Return the last error reported on a connection by 'session'. The
872 * return value is 0 if replication is not currently running, or
873 * if replication session has not encountered any error.
874 *
875 * Return a negative value if replication session has error, or the
876 * replication was not able to start. */
877 int
878 replication_get_last_error(void)
879 {
880 int err = 0;
881
882 if (session) {
883 err = jsonrpc_session_get_last_error(session);
884 if (!err) {
885 err = (state == RPL_S_ERR) ? ENOENT : 0;
886 }
887 }
888
889 return err;
890 }
891
892 char *
893 replication_status(void)
894 {
895 bool alive = session && jsonrpc_session_is_alive(session);
896 struct ds ds = DS_EMPTY_INITIALIZER;
897
898 if (alive) {
899 switch(state) {
900 case RPL_S_INIT:
901 case RPL_S_SERVER_ID_REQUESTED:
902 case RPL_S_DB_REQUESTED:
903 case RPL_S_SCHEMA_REQUESTED:
904 case RPL_S_MONITOR_REQUESTED:
905 ds_put_format(&ds, "connecting: %s", sync_from);
906 break;
907 case RPL_S_REPLICATING: {
908 struct shash_node *node;
909
910 ds_put_format(&ds, "replicating: %s\n", sync_from);
911 ds_put_cstr(&ds, "database:");
912 SHASH_FOR_EACH (node, replication_dbs) {
913 ds_put_format(&ds, " %s,", node->name);
914 }
915 ds_chomp(&ds, ',');
916
917 if (!shash_is_empty(&blacklist_tables)) {
918 ds_put_char(&ds, '\n');
919 ds_put_cstr(&ds, "exclude: ");
920 ds_put_and_free_cstr(&ds, get_blacklist_tables());
921 }
922 break;
923 }
924 case RPL_S_ERR:
925 ds_put_format(&ds, "Replication to (%s) failed\n", sync_from);
926 break;
927 default:
928 OVS_NOT_REACHED();
929 }
930 } else {
931 ds_put_format(&ds, "not connected to %s", sync_from);
932 }
933 return ds_steal_cstr(&ds);
934 }
935
936 /* Checks if it's possible to replicate to the local db from the active db
937 * schema. Returns true, if 'local_db_schema' has all the tables and columns
938 * of 'active_db_schema', false otherwise.
939 */
940 static bool
941 is_replication_possible(struct ovsdb_schema *local_db_schema,
942 struct ovsdb_schema *active_db_schema)
943 {
944 struct shash_node *node;
945 SHASH_FOR_EACH (node, &active_db_schema->tables) {
946 struct ovsdb_table_schema *ldb_table_schema =
947 shash_find_data(&local_db_schema->tables, node->name);
948 if (!ldb_table_schema) {
949 VLOG_INFO("Table %s not present in the local db schema",
950 node->name);
951 return false;
952 }
953
954 /* Local schema table should have all the columns
955 * of active schema table. */
956 struct ovsdb_table_schema *adb_table_schema = node->data;
957 struct shash_node *n;
958 SHASH_FOR_EACH (n, &adb_table_schema->columns) {
959 struct ovsdb_column *ldb_col =
960 shash_find_data(&ldb_table_schema->columns, n->name);
961 if (!ldb_col) {
962 VLOG_INFO("Column %s not present in the local "
963 "db schema table %s.", n->name, node->name);
964 return false;
965 }
966
967 struct json *ldb_col_json = ovsdb_column_to_json(ldb_col);
968 struct json *adb_col_json = ovsdb_column_to_json(n->data);
969 bool cols_equal = json_equal(ldb_col_json, adb_col_json);
970 json_destroy(ldb_col_json);
971 json_destroy(adb_col_json);
972
973 if (!cols_equal) {
974 VLOG_INFO("Column %s mismatch in local "
975 "db schema table %s.", n->name, node->name);
976 return false;
977 }
978 }
979 }
980
981 return true;
982 }
983
/* Passes 'probe_interval' on to the replication JSON-RPC session through
 * jsonrpc_session_set_probe_interval().  Does nothing when there is no
 * session. */
void
replication_set_probe_interval(int probe_interval)
{
    if (session) {
        jsonrpc_session_set_probe_interval(session, probe_interval);
    }
}
991
/* Prints the help text for the replication ("syncing") command-line options
 * to stdout. */
void
replication_usage(void)
{
    /* NOTE(review): the text below is one string literal continued across
     * lines, so leading whitespace on each line is part of the output.  The
     * column alignment appears collapsed here -- confirm against the intended
     * layout. */
    printf("\n\
Syncing options:\n\
--sync-from=SERVER sync DATABASE from active SERVER and start in\n\
backup mode (except with --active)\n\
--sync-exclude-tables=DB:TABLE,...\n\
exclude the TABLE in DB from syncing\n\
--active with --sync-from, start in active mode\n");
}