]> git.proxmox.com Git - mirror_ovs.git/blob - ovsdb/replication.c
ovn-nbctl: Fix the ovn-nbctl test "LBs - daemon" which fails during rpm build
[mirror_ovs.git] / ovsdb / replication.c
1 /*
2 * (c) Copyright 2016, 2017 Hewlett Packard Enterprise Development LP
3 * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016, 2017 Nicira, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include <config.h>
19
20
21 #include "condition.h"
22 #include "jsonrpc.h"
23 #include "openvswitch/dynamic-string.h"
24 #include "openvswitch/hmap.h"
25 #include "openvswitch/json.h"
26 #include "openvswitch/vlog.h"
27 #include "ovsdb-error.h"
28 #include "ovsdb.h"
29 #include "query.h"
30 #include "replication.h"
31 #include "row.h"
32 #include "sset.h"
33 #include "stream.h"
34 #include "svec.h"
35 #include "table.h"
36 #include "transaction.h"
37 #include "uuid.h"
38
39 VLOG_DEFINE_THIS_MODULE(replication);
40
/* Address of the active server to replicate from.  Heap-allocated copy owned
 * by this module; set by replication_init(). */
static char *sync_from;
/* Copy of the local server's UUID, compared against the remote server's ID in
 * replication_run() to refuse self-replication. */
static struct uuid server_uuid;
/* JSON-RPC session to the active server, or NULL when there is none. */
static struct jsonrpc_session *session;
/* Session sequence number last seen by replication_run(); a different value
 * makes replication_run() restart the handshake. */
static unsigned int session_seqno = UINT_MAX;
45
/* Building "monitor" requests. */
static struct jsonrpc_msg *create_monitor_request(struct ovsdb *db);
static void add_monitored_table(struct ovsdb_table_schema *table,
                                struct json *monitor_requests);

/* Deleting the rows of all non-blacklisted tables of a local database. */
static struct ovsdb_error *reset_database(struct ovsdb *db);

/* Applying <table-updates> received from the active server. */
static struct ovsdb_error *process_notification(struct json *, struct ovsdb *);
static struct ovsdb_error *process_table_update(struct json *table_update,
                                                const char *table_name,
                                                struct ovsdb *database,
                                                struct ovsdb_txn *txn);

/* Per-row operations executed within a transaction. */
static struct ovsdb_error *execute_insert(struct ovsdb_txn *txn,
                                          const struct uuid *row_uuid,
                                          struct ovsdb_table *table,
                                          struct json *new);
static struct ovsdb_error *execute_delete(struct ovsdb_txn *txn,
                                          const struct uuid *row_uuid,
                                          struct ovsdb_table *table);
static struct ovsdb_error *execute_update(struct ovsdb_txn *txn,
                                          const struct uuid *row_uuid,
                                          struct ovsdb_table *table,
                                          struct json *new);
69 \f
/* Maps from db name to sset of table names.  Tables listed here are neither
 * monitored nor reset by replication. */
static struct shash blacklist_tables = SHASH_INITIALIZER(&blacklist_tables);

/* Helpers that maintain and query 'blacklist_tables'. */
static void blacklist_tables_clear(void);
static void blacklist_tables_add(const char *database, const char *table);
static bool blacklist_tables_find(const char *database, const char* table);
76
77 \f
/* Keep track of request IDs of all outstanding OVSDB requests. */
static struct hmap request_ids = HMAP_INITIALIZER(&request_ids);

/* One outstanding request, hashed in 'request_ids' by its JSON request id. */
struct request_ids_hmap_node {
    struct hmap_node hmap;      /* Node in 'request_ids'. */
    struct json *request_id;    /* Clone of the request's id, owned here. */
    struct ovsdb *db;           /* Associated database; NULL for requests that
                                 * are not tied to one database. */
};
void request_ids_add(const struct json *id, struct ovsdb *db);
bool request_ids_lookup_and_free(const struct json *id, struct ovsdb **db);
static void request_ids_destroy(void);
void request_ids_clear(void);
90 \f
/* States of the replication handshake driven by replication_run().  Each
 * "*_REQUESTED" state means the corresponding request has been sent and its
 * reply is awaited. */
enum ovsdb_replication_state {
    RPL_S_INIT,                 /* Set by replication_init(); nothing sent. */
    RPL_S_SERVER_ID_REQUESTED,  /* "get_server_id" sent. */
    RPL_S_DB_REQUESTED,         /* "list_dbs" sent. */
    RPL_S_SCHEMA_REQUESTED,     /* "get_schema" sent for matching dbs. */
    RPL_S_MONITOR_REQUESTED,    /* "monitor" sent for each replicated db. */
    RPL_S_REPLICATING,          /* All monitor replies received. */
    RPL_S_ERR /* Error, no longer replicating. */
};
/* Current state of the replication state machine. */
static enum ovsdb_replication_state state;
101
102 \f
/* All DBs known to ovsdb-server.  The actual replication dbs are stored
 * in 'replication_dbs', which is the subset of the local dbs that also exist
 * on the remote server with a matching schema.  'replication_dbs' is NULL
 * until replication_run() clones it from 'local_dbs'. */
static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs);
static struct shash *replication_dbs;

static struct shash *replication_db_clone(struct shash *dbs);
static void replication_dbs_destroy(void);
/* Find 'struct ovsdb' by name within 'replication_dbs' */
static struct ovsdb* find_db(const char *db_name);
113 \f
114
115 void
116 replication_init(const char *sync_from_, const char *exclude_tables,
117 const struct uuid *server)
118 {
119 free(sync_from);
120 sync_from = xstrdup(sync_from_);
121 /* Caller should have verified that the 'exclude_tables' is
122 * parseable. An error here is unexpected. */
123 ovs_assert(!set_blacklist_tables(exclude_tables, false));
124
125 replication_dbs_destroy();
126
127 shash_clear(&local_dbs);
128 if (session) {
129 jsonrpc_session_close(session);
130 }
131
132 session = jsonrpc_session_open(sync_from, true);
133 session_seqno = UINT_MAX;
134
135 /* Keep a copy of local server uuid. */
136 server_uuid = *server;
137
138 state = RPL_S_INIT;
139 }
140
141 void
142 replication_add_local_db(const char *database, struct ovsdb *db)
143 {
144 shash_add_assert(&local_dbs, database, db);
145 }
146
147 static void
148 send_schema_requests(const struct json *result)
149 {
150 for (size_t i = 0; i < result->array.n; i++) {
151 const struct json *name = result->array.elems[i];
152 if (name->type == JSON_STRING) {
153 /* Send one schema request for each remote DB. */
154 const char *db_name = json_string(name);
155 struct ovsdb *db = find_db(db_name);
156 if (db) {
157 struct jsonrpc_msg *request =
158 jsonrpc_create_request(
159 "get_schema",
160 json_array_create_1(
161 json_string_create(db_name)),
162 NULL);
163
164 request_ids_add(request->id, db);
165 jsonrpc_session_send(session, request);
166 }
167 }
168 }
169 }
170
171 void
172 replication_run(void)
173 {
174 if (!session) {
175 return;
176 }
177
178 jsonrpc_session_run(session);
179
180 for (int i = 0; jsonrpc_session_is_connected(session) && i < 50; i++) {
181 struct jsonrpc_msg *msg;
182 unsigned int seqno;
183
184 seqno = jsonrpc_session_get_seqno(session);
185 if (seqno != session_seqno || state == RPL_S_INIT) {
186 session_seqno = seqno;
187 request_ids_clear();
188 struct jsonrpc_msg *request;
189 request = jsonrpc_create_request("get_server_id",
190 json_array_create_empty(), NULL);
191 request_ids_add(request->id, NULL);
192 jsonrpc_session_send(session, request);
193
194 state = RPL_S_SERVER_ID_REQUESTED;
195 VLOG_DBG("send server ID request.");
196 }
197
198 msg = jsonrpc_session_recv(session);
199 if (!msg) {
200 continue;
201 }
202
203 if (msg->type == JSONRPC_NOTIFY && state != RPL_S_ERR
204 && !strcmp(msg->method, "update")) {
205 if (msg->params->type == JSON_ARRAY
206 && msg->params->array.n == 2
207 && msg->params->array.elems[0]->type == JSON_STRING) {
208 char *db_name = msg->params->array.elems[0]->string;
209 struct ovsdb *db = find_db(db_name);
210 if (db) {
211 struct ovsdb_error *error;
212 error = process_notification(msg->params->array.elems[1],
213 db);
214 if (error) {
215 ovsdb_error_assert(error);
216 state = RPL_S_ERR;
217 }
218 }
219 }
220 } else if (msg->type == JSONRPC_REPLY) {
221 struct ovsdb *db;
222 if (!request_ids_lookup_and_free(msg->id, &db)) {
223 VLOG_WARN("received unexpected reply");
224 goto next;
225 }
226
227 switch (state) {
228 case RPL_S_SERVER_ID_REQUESTED: {
229 struct uuid uuid;
230 if (msg->result->type != JSON_STRING ||
231 !uuid_from_string(&uuid, json_string(msg->result))) {
232 struct ovsdb_error *error;
233 error = ovsdb_error("get_server_id failed",
234 "Server ID is not valid UUID");
235
236 ovsdb_error_assert(error);
237 state = RPL_S_ERR;
238 break;
239 }
240
241 if (uuid_equals(&uuid, &server_uuid)) {
242 struct ovsdb_error *error;
243 error = ovsdb_error("Server ID check failed",
244 "Self replicating is not allowed");
245
246 ovsdb_error_assert(error);
247 state = RPL_S_ERR;
248 break;
249 }
250
251 struct jsonrpc_msg *request;
252 request = jsonrpc_create_request("list_dbs",
253 json_array_create_empty(),
254 NULL);
255 request_ids_add(request->id, NULL);
256 jsonrpc_session_send(session, request);
257
258 replication_dbs_destroy();
259 replication_dbs = replication_db_clone(&local_dbs);
260 state = RPL_S_DB_REQUESTED;
261 break;
262 }
263 case RPL_S_DB_REQUESTED:
264 if (msg->result->type != JSON_ARRAY) {
265 struct ovsdb_error *error;
266 error = ovsdb_error("list_dbs failed",
267 "list_dbs response is not array");
268 ovsdb_error_assert(error);
269 state = RPL_S_ERR;
270 } else {
271 send_schema_requests(msg->result);
272 VLOG_DBG("Send schema requests");
273 state = RPL_S_SCHEMA_REQUESTED;
274 }
275 break;
276
277 case RPL_S_SCHEMA_REQUESTED: {
278 struct ovsdb_schema *schema;
279 struct ovsdb_error *error;
280
281 error = ovsdb_schema_from_json(msg->result, &schema);
282 if (error) {
283 ovsdb_error_assert(error);
284 state = RPL_S_ERR;
285 }
286
287 if (db != find_db(schema->name)) {
288 /* Unexpected schema. */
289 VLOG_WARN("unexpected schema %s", schema->name);
290 state = RPL_S_ERR;
291 } else if (!ovsdb_schema_equal(schema, db->schema)) {
292 /* Schmea version mismatch. */
293 VLOG_INFO("Schema version mismatch, %s not replicated",
294 schema->name);
295 shash_find_and_delete(replication_dbs, schema->name);
296 }
297 ovsdb_schema_destroy(schema);
298
299 /* After receiving schemas, reset the local databases that
300 * will be monitored and send out monitor requests for them. */
301 if (hmap_is_empty(&request_ids)) {
302 struct shash_node *node;
303
304 if (shash_is_empty(replication_dbs)) {
305 VLOG_WARN("Nothing to replicate.");
306 state = RPL_S_ERR;
307 } else {
308 SHASH_FOR_EACH (node, replication_dbs) {
309 db = node->data;
310 struct jsonrpc_msg *request =
311 create_monitor_request(db);
312
313 request_ids_add(request->id, db);
314 jsonrpc_session_send(session, request);
315 VLOG_DBG("Send monitor requests");
316 state = RPL_S_MONITOR_REQUESTED;
317 }
318 }
319 }
320 break;
321 }
322
323 case RPL_S_MONITOR_REQUESTED: {
324 /* Reply to monitor requests. */
325 struct ovsdb_error *error;
326 VLOG_INFO("Monitor request received. Resetting the database");
327 /* Resetting the database here has few risks. If the
328 * process_notification() fails, the database is completely
329 * lost locally. In case that node becomes active, then
330 * there is a chance of complete data loss in the active/standy
331 * cluster. */
332 error = reset_database(db);
333 if (!error) {
334 error = process_notification(msg->result, db);
335 }
336 if (error) {
337 ovsdb_error_assert(error);
338 state = RPL_S_ERR;
339 } else {
340 /* Transition to replicating state after receiving
341 * all replies of "monitor" requests. */
342 if (hmap_is_empty(&request_ids)) {
343 VLOG_DBG("Listening to monitor updates");
344 state = RPL_S_REPLICATING;
345 }
346 }
347 break;
348 }
349
350 case RPL_S_ERR:
351 /* Ignore all messages */
352 break;
353
354 case RPL_S_INIT:
355 case RPL_S_REPLICATING:
356 default:
357 OVS_NOT_REACHED();
358 }
359 }
360 next:
361 jsonrpc_msg_destroy(msg);
362 }
363 }
364
365 void
366 replication_wait(void)
367 {
368 if (session) {
369 jsonrpc_session_wait(session);
370 jsonrpc_session_recv_wait(session);
371 }
372 }
373
374 /* Parse 'blacklist' to rebuild 'blacklist_tables'. If 'dryrun' is false, the
375 * current black list tables will be wiped out, regardless of whether
376 * 'blacklist' can be parsed. If 'dryrun' is true, only parses 'blacklist' and
377 * reports any errors, without modifying the blacklist.
378 *
379 * On error, returns the error string, which the caller is
380 * responsible for freeing. Returns NULL otherwise. */
381 char * OVS_WARN_UNUSED_RESULT
382 set_blacklist_tables(const char *blacklist, bool dryrun)
383 {
384 struct sset set = SSET_INITIALIZER(&set);
385 char *err = NULL;
386
387 if (blacklist) {
388 const char *longname;
389
390 if (!dryrun) {
391 /* Can only add to an empty shash. */
392 blacklist_tables_clear();
393 }
394
395 sset_from_delimited_string(&set, blacklist, " ,");
396 SSET_FOR_EACH (longname, &set) {
397 char *database = xstrdup(longname), *table = NULL;
398 strtok_r(database, ":", &table);
399 if (table && !dryrun) {
400 blacklist_tables_add(database, table);
401 }
402
403 free(database);
404 if (!table) {
405 err = xasprintf("Can't parse black list table: %s", longname);
406 goto done;
407 }
408 }
409 }
410
411 done:
412 sset_destroy(&set);
413 if (err && !dryrun) {
414 /* On error, destroy the partially built 'blacklist_tables'. */
415 blacklist_tables_clear();
416 }
417 return err;
418 }
419
420 char * OVS_WARN_UNUSED_RESULT
421 get_blacklist_tables(void)
422 {
423 struct shash_node *node;
424 struct sset set = SSET_INITIALIZER(&set);
425
426 SHASH_FOR_EACH (node, &blacklist_tables) {
427 const char *database = node->name;
428 const char *table;
429 struct sset *tables = node->data;
430
431 SSET_FOR_EACH (table, tables) {
432 sset_add_and_free(&set, xasprintf("%s:%s", database, table));
433 }
434 }
435
436 /* Output the table list in an sorted order, so that
437 * the output string will not depend on the hash function
438 * that used to implement the hmap data structure. This is
439 * only useful for writting unit tests. */
440 const char **sorted = sset_sort(&set);
441 struct ds ds = DS_EMPTY_INITIALIZER;
442 size_t i;
443 for (i = 0; i < sset_count(&set); i++) {
444 ds_put_format(&ds, "%s,", sorted[i]);
445 }
446
447 ds_chomp(&ds, ',');
448
449 free(sorted);
450 sset_destroy(&set);
451
452 return ds_steal_cstr(&ds);
453 }
454
455 static void
456 blacklist_tables_clear(void)
457 {
458 struct shash_node *node;
459 SHASH_FOR_EACH (node, &blacklist_tables) {
460 struct sset *tables = node->data;
461 sset_destroy(tables);
462 }
463
464 shash_clear_free_data(&blacklist_tables);
465 }
466
467 static void
468 blacklist_tables_add(const char *database, const char *table)
469 {
470 struct sset *tables = shash_find_data(&blacklist_tables, database);
471
472 if (!tables) {
473 tables = xmalloc(sizeof *tables);
474 sset_init(tables);
475 shash_add(&blacklist_tables, database, tables);
476 }
477
478 sset_add(tables, table);
479 }
480
481 static bool
482 blacklist_tables_find(const char *database, const char *table)
483 {
484 struct sset *tables = shash_find_data(&blacklist_tables, database);
485 return tables && sset_contains(tables, table);
486 }
487
488 void
489 disconnect_active_server(void)
490 {
491 jsonrpc_session_close(session);
492 session = NULL;
493 }
494
495 void
496 replication_destroy(void)
497 {
498 blacklist_tables_clear();
499 shash_destroy(&blacklist_tables);
500
501 if (sync_from) {
502 free(sync_from);
503 sync_from = NULL;
504 }
505
506 request_ids_destroy();
507 replication_dbs_destroy();
508
509 shash_destroy(&local_dbs);
510 }
511
512 static struct ovsdb *
513 find_db(const char *db_name)
514 {
515 return shash_find_data(replication_dbs, db_name);
516 }
517 \f
518 static struct ovsdb_error *
519 reset_database(struct ovsdb *db)
520 {
521 struct ovsdb_txn *txn = ovsdb_txn_create(db);
522 struct shash_node *table_node;
523
524 SHASH_FOR_EACH (table_node, &db->tables) {
525 /* Delete all rows if the table is not blacklisted. */
526 if (!blacklist_tables_find(db->schema->name, table_node->name)) {
527 struct ovsdb_table *table = table_node->data;
528 struct ovsdb_row *row, *next;
529 HMAP_FOR_EACH_SAFE (row, next, hmap_node, &table->rows) {
530 ovsdb_txn_row_delete(txn, row);
531 }
532 }
533 }
534
535 return ovsdb_txn_propose_commit_block(txn, false);
536 }
537
538 /* Create a monitor request for 'db'. The monitor request will include
539 * any tables from 'blacklisted_tables'
540 *
541 * Caller is responsible for disposing 'request'.
542 */
543 static struct jsonrpc_msg *
544 create_monitor_request(struct ovsdb *db)
545 {
546 struct jsonrpc_msg *request;
547 struct json *monitor;
548 struct ovsdb_schema *schema = db->schema;
549 const char *db_name = schema->name;
550
551 struct json *monitor_request = json_object_create();
552 size_t n = shash_count(&schema->tables);
553 const struct shash_node **nodes = shash_sort(&schema->tables);
554
555 for (int j = 0; j < n; j++) {
556 struct ovsdb_table_schema *table = nodes[j]->data;
557
558 /* Monitor all tables not blacklisted. */
559 if (!blacklist_tables_find(db_name, table->name)) {
560 add_monitored_table(table, monitor_request);
561 }
562 }
563 free(nodes);
564
565 /* Create a monitor request. */
566 monitor = json_array_create_3(
567 json_string_create(db_name),
568 json_string_create(db_name),
569 monitor_request);
570 request = jsonrpc_create_request("monitor", monitor, NULL);
571
572 return request;
573 }
574
575 static void
576 add_monitored_table(struct ovsdb_table_schema *table,
577 struct json *monitor_request)
578 {
579 struct json *monitor_request_array;
580
581 monitor_request_array = json_array_create_empty();
582 json_array_add(monitor_request_array, json_object_create());
583
584 json_object_put(monitor_request, table->name, monitor_request_array);
585 }
586
587 \f
588 static struct ovsdb_error *
589 process_notification(struct json *table_updates, struct ovsdb *db)
590 {
591 struct ovsdb_error *error = NULL;
592 struct ovsdb_txn *txn;
593
594 if (table_updates->type == JSON_OBJECT) {
595 txn = ovsdb_txn_create(db);
596
597 /* Process each table update. */
598 struct shash_node *node;
599 SHASH_FOR_EACH (node, json_object(table_updates)) {
600 struct json *table_update = node->data;
601 if (table_update) {
602 error = process_table_update(table_update, node->name, db, txn);
603 if (error) {
604 break;
605 }
606 }
607 }
608
609 if (error) {
610 ovsdb_txn_abort(txn);
611 return error;
612 } else {
613 /* Commit transaction. */
614 error = ovsdb_txn_propose_commit_block(txn, false);
615 }
616 }
617
618 return error;
619 }
620
621 static struct ovsdb_error *
622 process_table_update(struct json *table_update, const char *table_name,
623 struct ovsdb *database, struct ovsdb_txn *txn)
624 {
625 struct ovsdb_table *table = ovsdb_get_table(database, table_name);
626 if (!table) {
627 return ovsdb_error("unknown table", "unknown table %s", table_name);
628 }
629
630 if (table_update->type != JSON_OBJECT) {
631 return ovsdb_error("Not a JSON object",
632 "<table-update> for table is not object");
633 }
634
635 struct shash_node *node;
636 SHASH_FOR_EACH (node, json_object(table_update)) {
637 struct json *row_update = node->data;
638 struct json *old, *new;
639
640 if (row_update->type != JSON_OBJECT) {
641 return ovsdb_error("Not a JSON object",
642 "<row-update> is not object");
643 }
644
645 struct uuid uuid;
646 if (!uuid_from_string(&uuid, node->name)) {
647 return ovsdb_syntax_error(table_update, "bad row UUID",
648 "<table-update> names must be UUIDs");
649 }
650
651 old = shash_find_data(json_object(row_update), "old");
652 new = shash_find_data(json_object(row_update), "new");
653
654 struct ovsdb_error *error;
655 error = (!new ? execute_delete(txn, &uuid, table)
656 : !old ? execute_insert(txn, &uuid, table, new)
657 : execute_update(txn, &uuid, table, new));
658 if (error) {
659 return error;
660 }
661 }
662 return NULL;
663 }
664 \f
665 static struct ovsdb_error *
666 execute_insert(struct ovsdb_txn *txn, const struct uuid *row_uuid,
667 struct ovsdb_table *table, struct json *json_row)
668 {
669 struct ovsdb_row *row = ovsdb_row_create(table);
670 struct ovsdb_error *error = ovsdb_row_from_json(row, json_row, NULL, NULL);
671 if (!error) {
672 *ovsdb_row_get_uuid_rw(row) = *row_uuid;
673 ovsdb_txn_row_insert(txn, row);
674 } else {
675 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
676 VLOG_WARN_RL(&rl, "cannot add existing row "UUID_FMT" to table %s",
677 UUID_ARGS(row_uuid), table->schema->name);
678 ovsdb_row_destroy(row);
679 }
680
681 return error;
682 }
683
684 static struct ovsdb_error *
685 execute_delete(struct ovsdb_txn *txn, const struct uuid *row_uuid,
686 struct ovsdb_table *table)
687 {
688 const struct ovsdb_row *row = ovsdb_table_get_row(table, row_uuid);
689 if (row) {
690 ovsdb_txn_row_delete(txn, row);
691 } else {
692 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
693 VLOG_WARN_RL(&rl, "cannot delete missing row "UUID_FMT" from table %s",
694 UUID_ARGS(row_uuid), table->schema->name);
695 }
696 return NULL;
697 }
698
699 static struct ovsdb_error *
700 execute_update(struct ovsdb_txn *txn, const struct uuid *row_uuid,
701 struct ovsdb_table *table, struct json *json_row)
702 {
703 const struct ovsdb_row *row = ovsdb_table_get_row(table, row_uuid);
704 if (!row) {
705 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
706 VLOG_WARN_RL(&rl, "cannot modify missing row "UUID_FMT" in table %s",
707 UUID_ARGS(row_uuid), table->schema->name);
708 return NULL;
709 }
710
711 struct ovsdb_column_set columns = OVSDB_COLUMN_SET_INITIALIZER;
712 struct ovsdb_row *update = ovsdb_row_create(table);
713 struct ovsdb_error *error = ovsdb_row_from_json(update, json_row,
714 NULL, &columns);
715
716 if (!error && !ovsdb_row_equal_columns(row, update, &columns)) {
717 ovsdb_row_update_columns(ovsdb_txn_row_modify(txn, row),
718 update, &columns);
719 }
720
721 ovsdb_column_set_destroy(&columns);
722 ovsdb_row_destroy(update);
723 return error;
724 }
725
726 void
727 request_ids_add(const struct json *id, struct ovsdb *db)
728 {
729 struct request_ids_hmap_node *node = xmalloc(sizeof *node);
730
731 node->request_id = json_clone(id);
732 node->db = db;
733 hmap_insert(&request_ids, &node->hmap, json_hash(id, 0));
734 }
735
736 /* Look up 'id' from 'request_ids', if found, remove the found id from
737 * 'request_ids' and free its memory. If not found, 'request_ids' does
738 * not change. Sets '*db' to the database for the request (NULL if not
739 * found).
740 *
741 * Return true if 'id' is found, false otherwise.
742 */
743 bool
744 request_ids_lookup_and_free(const struct json *id, struct ovsdb **db)
745 {
746 struct request_ids_hmap_node *node;
747
748 HMAP_FOR_EACH_WITH_HASH (node, hmap, json_hash(id, 0), &request_ids) {
749 if (json_equal(id, node->request_id)) {
750 hmap_remove(&request_ids, &node->hmap);
751 *db = node->db;
752 json_destroy(node->request_id);
753 free(node);
754 return true;
755 }
756 }
757
758 *db = NULL;
759 return false;
760 }
761
762 static void
763 request_ids_destroy(void)
764 {
765 struct request_ids_hmap_node *node;
766
767 HMAP_FOR_EACH_POP (node, hmap, &request_ids) {
768 json_destroy(node->request_id);
769 free(node);
770 }
771 hmap_destroy(&request_ids);
772 }
773
774 void
775 request_ids_clear(void)
776 {
777 request_ids_destroy();
778 hmap_init(&request_ids);
779 }
780
781 static struct shash *
782 replication_db_clone(struct shash *dbs)
783 {
784 struct shash *new = xmalloc(sizeof *new);
785 shash_init(new);
786
787 struct shash_node *node;
788 SHASH_FOR_EACH (node, dbs) {
789 shash_add(new, node->name, node->data);
790 }
791
792 return new;
793 }
794
795 static void
796 replication_dbs_destroy(void)
797 {
798 shash_destroy(replication_dbs);
799 free(replication_dbs);
800 replication_dbs = NULL;
801 }
802
803 /* Return true if replication just started or is ongoing.
804 * Return false if the connection failed, or the replication
805 * was not able to start. */
806 bool
807 replication_is_alive(void)
808 {
809 if (session) {
810 return jsonrpc_session_is_alive(session) && state != RPL_S_ERR;
811 }
812 return false;
813 }
814
815 /* Return the last error reported on a connection by 'session'. The
816 * return value is 0 if replication is not currently running, or
817 * if replication session has not encountered any error.
818 *
819 * Return a negative value if replication session has error, or the
820 * replication was not able to start. */
821 int
822 replication_get_last_error(void)
823 {
824 int err = 0;
825
826 if (session) {
827 err = jsonrpc_session_get_last_error(session);
828 if (!err) {
829 err = (state == RPL_S_ERR) ? ENOENT : 0;
830 }
831 }
832
833 return err;
834 }
835
836 char *
837 replication_status(void)
838 {
839 bool alive = session && jsonrpc_session_is_alive(session);
840 struct ds ds = DS_EMPTY_INITIALIZER;
841
842 if (alive) {
843 switch(state) {
844 case RPL_S_INIT:
845 case RPL_S_SERVER_ID_REQUESTED:
846 case RPL_S_DB_REQUESTED:
847 case RPL_S_SCHEMA_REQUESTED:
848 case RPL_S_MONITOR_REQUESTED:
849 ds_put_format(&ds, "connecting: %s", sync_from);
850 break;
851 case RPL_S_REPLICATING: {
852 struct shash_node *node;
853
854 ds_put_format(&ds, "replicating: %s\n", sync_from);
855 ds_put_cstr(&ds, "database:");
856 SHASH_FOR_EACH (node, replication_dbs) {
857 ds_put_format(&ds, " %s,", node->name);
858 }
859 ds_chomp(&ds, ',');
860
861 if (!shash_is_empty(&blacklist_tables)) {
862 ds_put_char(&ds, '\n');
863 ds_put_cstr(&ds, "exclude: ");
864 ds_put_and_free_cstr(&ds, get_blacklist_tables());
865 }
866 break;
867 }
868 case RPL_S_ERR:
869 ds_put_format(&ds, "Replication to (%s) failed\n", sync_from);
870 break;
871 default:
872 OVS_NOT_REACHED();
873 }
874 } else {
875 ds_put_format(&ds, "not connected to %s", sync_from);
876 }
877 return ds_steal_cstr(&ds);
878 }
879
/* Prints the help text for the replication ("syncing") command-line options
 * to stdout. */
void
replication_usage(void)
{
    /* The literal is continued with backslash-newline, so any leading
     * whitespace on the following lines is part of the output. */
    printf("\n\
Syncing options:\n\
--sync-from=SERVER sync DATABASE from active SERVER and start in\n\
backup mode (except with --active)\n\
--sync-exclude-tables=DB:TABLE,...\n\
exclude the TABLE in DB from syncing\n\
--active with --sync-from, start in active mode\n");
}