]> git.proxmox.com Git - mirror_ovs.git/blob - ovsdb/replication.c
log: Allow client to specify magic.
[mirror_ovs.git] / ovsdb / replication.c
1 /*
2 * (c) Copyright 2016 Hewlett Packard Enterprise Development LP
3 * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016, 2017 Nicira, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include <config.h>
19
20
21 #include "condition.h"
22 #include "jsonrpc.h"
23 #include "openvswitch/dynamic-string.h"
24 #include "openvswitch/hmap.h"
25 #include "openvswitch/json.h"
26 #include "openvswitch/vlog.h"
27 #include "ovsdb-error.h"
28 #include "ovsdb.h"
29 #include "query.h"
30 #include "replication.h"
31 #include "row.h"
32 #include "sset.h"
33 #include "stream.h"
34 #include "svec.h"
35 #include "table.h"
36 #include "transaction.h"
37 #include "uuid.h"
38
39 VLOG_DEFINE_THIS_MODULE(replication);
40
41 static char *sync_from;
42 static struct uuid server_uuid;
43 static struct jsonrpc_session *session;
44 static unsigned int session_seqno = UINT_MAX;
45
46 static struct jsonrpc_msg *create_monitor_request(struct ovsdb *db);
47 static void add_monitored_table(struct ovsdb_table_schema *table,
48 struct json *monitor_requests);
49
50 static struct ovsdb_error *reset_database(struct ovsdb *db);
51
52 static struct ovsdb_error *process_notification(struct json *, struct ovsdb *);
53 static struct ovsdb_error *process_table_update(struct json *table_update,
54 const char *table_name,
55 struct ovsdb *database,
56 struct ovsdb_txn *txn);
57
58 static struct ovsdb_error *execute_insert(struct ovsdb_txn *txn,
59 const struct uuid *row_uuid,
60 struct ovsdb_table *table,
61 struct json *new);
62 static struct ovsdb_error *execute_delete(struct ovsdb_txn *txn,
63 const struct uuid *row_uuid,
64 struct ovsdb_table *table);
65 static struct ovsdb_error *execute_update(struct ovsdb_txn *txn,
66 const struct uuid *row_uuid,
67 struct ovsdb_table *table,
68 struct json *new);
69 \f
70 /* Maps from db name to sset of table names. */
71 static struct shash blacklist_tables = SHASH_INITIALIZER(&blacklist_tables);
72
73 static void blacklist_tables_clear(void);
74 static void blacklist_tables_add(const char *database, const char *table);
75 static bool blacklist_tables_find(const char *database, const char* table);
76
77 \f
78 /* Keep track of request IDs of all outstanding OVSDB requests. */
79 static struct hmap request_ids = HMAP_INITIALIZER(&request_ids);
80
81 struct request_ids_hmap_node {
82 struct hmap_node hmap;
83 struct json *request_id;
84 struct ovsdb *db; /* associated database */
85 };
86 void request_ids_add(const struct json *id, struct ovsdb *db);
87 bool request_ids_lookup_and_free(const struct json *id, struct ovsdb **db);
88 static void request_ids_destroy(void);
89 void request_ids_clear(void);
90 \f
/* Steps of the replication handshake, in the order replication_run() walks
 * through them. */
enum ovsdb_replication_state {
    RPL_S_INIT,                 /* Nothing has been sent yet. */
    RPL_S_SERVER_ID_REQUESTED,  /* Waiting for the "get_server_id" reply. */
    RPL_S_DB_REQUESTED,         /* Waiting for the "list_dbs" reply. */
    RPL_S_SCHEMA_REQUESTED,     /* Waiting for "get_schema" replies. */
    RPL_S_MONITOR_REQUESTED,    /* Waiting for "monitor" replies. */
    RPL_S_REPLICATING,          /* Applying "update" notifications. */
    RPL_S_ERR                   /* Error, no longer replicating. */
};

/* Current step of the handshake. */
static enum ovsdb_replication_state state;
101
102 \f
103 /* All DBs known to ovsdb-server. The actual replication dbs are stored
104 * in 'replication dbs', which is a subset of all dbs and remote dbs whose
105 * schema matches. */
106 static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs);
107 static struct shash *replication_dbs;
108
109 static struct shash *replication_db_clone(struct shash *dbs);
110 static void replication_dbs_destroy(void);
111 /* Find 'struct ovsdb' by name within 'replication_dbs' */
112 static struct ovsdb* find_db(const char *db_name);
113 \f
114
115 void
116 replication_init(const char *sync_from_, const char *exclude_tables,
117 const struct uuid *server)
118 {
119 free(sync_from);
120 sync_from = xstrdup(sync_from_);
121 char *err = set_blacklist_tables(exclude_tables, false);
122 /* Caller should have verified that the 'exclude_tables' is
123 * parseable. An error here is unexpected. */
124 ovs_assert(!err);
125
126 replication_dbs_destroy();
127
128 shash_clear(&local_dbs);
129 if (session) {
130 jsonrpc_session_close(session);
131 }
132
133 session = jsonrpc_session_open(sync_from, true);
134 session_seqno = UINT_MAX;
135
136 /* Keep a copy of local server uuid. */
137 server_uuid = *server;
138
139 state = RPL_S_INIT;
140 }
141
142 void
143 replication_add_local_db(const char *database, struct ovsdb *db)
144 {
145 shash_add_assert(&local_dbs, database, db);
146 }
147
148 static void
149 send_schema_requests(const struct json *result)
150 {
151 for (size_t i = 0; i < result->u.array.n; i++) {
152 const struct json *name = result->u.array.elems[i];
153 if (name->type == JSON_STRING) {
154 /* Send one schema request for each remote DB. */
155 const char *db_name = json_string(name);
156 struct ovsdb *db = find_db(db_name);
157 if (db) {
158 struct jsonrpc_msg *request =
159 jsonrpc_create_request(
160 "get_schema",
161 json_array_create_1(
162 json_string_create(db_name)),
163 NULL);
164
165 request_ids_add(request->id, db);
166 jsonrpc_session_send(session, request);
167 }
168 }
169 }
170 }
171
172 void
173 replication_run(void)
174 {
175 if (!session) {
176 return;
177 }
178
179 jsonrpc_session_run(session);
180
181 for (int i = 0; jsonrpc_session_is_connected(session) && i < 50; i++) {
182 struct jsonrpc_msg *msg;
183 unsigned int seqno;
184
185 seqno = jsonrpc_session_get_seqno(session);
186 if (seqno != session_seqno || state == RPL_S_INIT) {
187 session_seqno = seqno;
188 request_ids_clear();
189 struct jsonrpc_msg *request;
190 request = jsonrpc_create_request("get_server_id",
191 json_array_create_empty(), NULL);
192 request_ids_add(request->id, NULL);
193 jsonrpc_session_send(session, request);
194
195 state = RPL_S_SERVER_ID_REQUESTED;
196 VLOG_DBG("send server ID request.");
197 }
198
199 msg = jsonrpc_session_recv(session);
200 if (!msg) {
201 continue;
202 }
203
204 if (msg->type == JSONRPC_NOTIFY && state != RPL_S_ERR
205 && !strcmp(msg->method, "update")) {
206 if (msg->params->type == JSON_ARRAY
207 && msg->params->u.array.n == 2
208 && msg->params->u.array.elems[0]->type == JSON_STRING) {
209 char *db_name = msg->params->u.array.elems[0]->u.string;
210 struct ovsdb *db = find_db(db_name);
211 if (db) {
212 struct ovsdb_error *error;
213 error = process_notification(msg->params->u.array.elems[1],
214 db);
215 if (error) {
216 ovsdb_error_assert(error);
217 state = RPL_S_ERR;
218 }
219 }
220 }
221 } else if (msg->type == JSONRPC_REPLY) {
222 struct ovsdb *db;
223 if (!request_ids_lookup_and_free(msg->id, &db)) {
224 VLOG_WARN("received unexpected reply");
225 goto next;
226 }
227
228 switch (state) {
229 case RPL_S_SERVER_ID_REQUESTED: {
230 struct uuid uuid;
231 if (msg->result->type != JSON_STRING ||
232 !uuid_from_string(&uuid, json_string(msg->result))) {
233 struct ovsdb_error *error;
234 error = ovsdb_error("get_server_id failed",
235 "Server ID is not valid UUID");
236
237 ovsdb_error_assert(error);
238 state = RPL_S_ERR;
239 break;
240 }
241
242 if (uuid_equals(&uuid, &server_uuid)) {
243 struct ovsdb_error *error;
244 error = ovsdb_error("Server ID check failed",
245 "Self replicating is not allowed");
246
247 ovsdb_error_assert(error);
248 state = RPL_S_ERR;
249 break;
250 }
251
252 struct jsonrpc_msg *request;
253 request = jsonrpc_create_request("list_dbs",
254 json_array_create_empty(),
255 NULL);
256 request_ids_add(request->id, NULL);
257 jsonrpc_session_send(session, request);
258
259 replication_dbs_destroy();
260 replication_dbs = replication_db_clone(&local_dbs);
261 state = RPL_S_DB_REQUESTED;
262 break;
263 }
264 case RPL_S_DB_REQUESTED:
265 if (msg->result->type != JSON_ARRAY) {
266 struct ovsdb_error *error;
267 error = ovsdb_error("list_dbs failed",
268 "list_dbs response is not array");
269 ovsdb_error_assert(error);
270 state = RPL_S_ERR;
271 } else {
272 send_schema_requests(msg->result);
273 VLOG_DBG("Send schema requests");
274 state = RPL_S_SCHEMA_REQUESTED;
275 }
276 break;
277
278 case RPL_S_SCHEMA_REQUESTED: {
279 struct ovsdb_schema *schema;
280 struct ovsdb_error *error;
281
282 error = ovsdb_schema_from_json(msg->result, &schema);
283 if (error) {
284 ovsdb_error_assert(error);
285 state = RPL_S_ERR;
286 }
287
288 if (db != find_db(schema->name)) {
289 /* Unexpected schema. */
290 VLOG_WARN("unexpected schema %s", schema->name);
291 state = RPL_S_ERR;
292 } else if (!ovsdb_schema_equal(schema, db->schema)) {
293 /* Schmea version mismatch. */
294 VLOG_INFO("Schema version mismatch, %s not replicated",
295 schema->name);
296 shash_find_and_delete(replication_dbs, schema->name);
297 }
298 ovsdb_schema_destroy(schema);
299
300 /* After receiving schemas, reset the local databases that
301 * will be monitored and send out monitor requests for them. */
302 if (hmap_is_empty(&request_ids)) {
303 struct shash_node *node, *next;
304
305 SHASH_FOR_EACH_SAFE (node, next, replication_dbs) {
306 db = node->data;
307 error = reset_database(db);
308 if (error) {
309 const char *db_name = db->schema->name;
310 shash_find_and_delete(replication_dbs, db_name);
311 ovsdb_error_assert(error);
312 VLOG_WARN("Failed to reset database, "
313 "%s not replicated.", db_name);
314 }
315 }
316
317 if (shash_is_empty(replication_dbs)) {
318 VLOG_WARN("Nothing to replicate.");
319 state = RPL_S_ERR;
320 } else {
321 SHASH_FOR_EACH (node, replication_dbs) {
322 db = node->data;
323 struct jsonrpc_msg *request =
324 create_monitor_request(db);
325
326 request_ids_add(request->id, db);
327 jsonrpc_session_send(session, request);
328 VLOG_DBG("Send monitor requests");
329 state = RPL_S_MONITOR_REQUESTED;
330 }
331 }
332 }
333 break;
334 }
335
336 case RPL_S_MONITOR_REQUESTED: {
337 /* Reply to monitor requests. */
338 struct ovsdb_error *error;
339 error = process_notification(msg->result, db);
340 if (error) {
341 ovsdb_error_assert(error);
342 state = RPL_S_ERR;
343 } else {
344 /* Transition to replicating state after receiving
345 * all replies of "monitor" requests. */
346 if (hmap_is_empty(&request_ids)) {
347 VLOG_DBG("Listening to monitor updates");
348 state = RPL_S_REPLICATING;
349 }
350 }
351 break;
352 }
353
354 case RPL_S_ERR:
355 /* Ignore all messages */
356 break;
357
358 case RPL_S_INIT:
359 case RPL_S_REPLICATING:
360 default:
361 OVS_NOT_REACHED();
362 }
363 }
364 next:
365 jsonrpc_msg_destroy(msg);
366 }
367 }
368
369 void
370 replication_wait(void)
371 {
372 if (session) {
373 jsonrpc_session_wait(session);
374 jsonrpc_session_recv_wait(session);
375 }
376 }
377
378 /* Parse 'blacklist' to rebuild 'blacklist_tables'. If 'dryrun' is false, the
379 * current black list tables will be wiped out, regardless of whether
380 * 'blacklist' can be parsed. If 'dryrun' is true, only parses 'blacklist' and
381 * reports any errors, without modifying the blacklist.
382 *
383 * On error, returns the error string, which the caller is
384 * responsible for freeing. Returns NULL otherwise. */
385 char * OVS_WARN_UNUSED_RESULT
386 set_blacklist_tables(const char *blacklist, bool dryrun)
387 {
388 struct sset set = SSET_INITIALIZER(&set);
389 char *err = NULL;
390
391 if (blacklist) {
392 const char *longname;
393
394 if (!dryrun) {
395 /* Can only add to an empty shash. */
396 blacklist_tables_clear();
397 }
398
399 sset_from_delimited_string(&set, blacklist, " ,");
400 SSET_FOR_EACH (longname, &set) {
401 char *database = xstrdup(longname), *table = NULL;
402 strtok_r(database, ":", &table);
403 if (table && !dryrun) {
404 blacklist_tables_add(database, table);
405 }
406
407 free(database);
408 if (!table) {
409 err = xasprintf("Can't parse black list table: %s", longname);
410 goto done;
411 }
412 }
413 }
414
415 done:
416 sset_destroy(&set);
417 if (err && !dryrun) {
418 /* On error, destroy the partially built 'blacklist_tables'. */
419 blacklist_tables_clear();
420 }
421 return err;
422 }
423
424 char * OVS_WARN_UNUSED_RESULT
425 get_blacklist_tables(void)
426 {
427 struct shash_node *node;
428 struct sset set = SSET_INITIALIZER(&set);
429
430 SHASH_FOR_EACH (node, &blacklist_tables) {
431 const char *database = node->name;
432 const char *table;
433 struct sset *tables = node->data;
434
435 SSET_FOR_EACH (table, tables) {
436 sset_add_and_free(&set, xasprintf("%s:%s", database, table));
437 }
438 }
439
440 /* Output the table list in an sorted order, so that
441 * the output string will not depend on the hash function
442 * that used to implement the hmap data structure. This is
443 * only useful for writting unit tests. */
444 const char **sorted = sset_sort(&set);
445 struct ds ds = DS_EMPTY_INITIALIZER;
446 size_t i;
447 for (i = 0; i < sset_count(&set); i++) {
448 ds_put_format(&ds, "%s,", sorted[i]);
449 }
450
451 ds_chomp(&ds, ',');
452
453 free(sorted);
454 sset_destroy(&set);
455
456 return ds_steal_cstr(&ds);
457 }
458
459 static void
460 blacklist_tables_clear(void)
461 {
462 struct shash_node *node;
463 SHASH_FOR_EACH (node, &blacklist_tables) {
464 struct sset *tables = node->data;
465 sset_destroy(tables);
466 }
467
468 shash_clear_free_data(&blacklist_tables);
469 }
470
471 static void
472 blacklist_tables_add(const char *database, const char *table)
473 {
474 struct sset *tables = shash_find_data(&blacklist_tables, database);
475
476 if (!tables) {
477 tables = xmalloc(sizeof *tables);
478 sset_init(tables);
479 shash_add(&blacklist_tables, database, tables);
480 }
481
482 sset_add(tables, table);
483 }
484
485 static bool
486 blacklist_tables_find(const char *database, const char *table)
487 {
488 struct sset *tables = shash_find_data(&blacklist_tables, database);
489 return tables && sset_contains(tables, table);
490 }
491
492 void
493 disconnect_active_server(void)
494 {
495 jsonrpc_session_close(session);
496 session = NULL;
497 }
498
499 void
500 replication_destroy(void)
501 {
502 blacklist_tables_clear();
503 shash_destroy(&blacklist_tables);
504
505 if (sync_from) {
506 free(sync_from);
507 sync_from = NULL;
508 }
509
510 request_ids_destroy();
511 replication_dbs_destroy();
512
513 shash_destroy(&local_dbs);
514 }
515
516 static struct ovsdb *
517 find_db(const char *db_name)
518 {
519 return shash_find_data(replication_dbs, db_name);
520 }
521 \f
522 static struct ovsdb_error *
523 reset_database(struct ovsdb *db)
524 {
525 struct ovsdb_txn *txn = ovsdb_txn_create(db);
526 struct shash_node *table_node;
527
528 SHASH_FOR_EACH (table_node, &db->tables) {
529 /* Delete all rows if the table is not blacklisted. */
530 if (!blacklist_tables_find(db->schema->name, table_node->name)) {
531 struct ovsdb_table *table = table_node->data;
532 struct ovsdb_row *row, *next;
533 HMAP_FOR_EACH_SAFE (row, next, hmap_node, &table->rows) {
534 ovsdb_txn_row_delete(txn, row);
535 }
536 }
537 }
538
539 return ovsdb_txn_commit(txn, false);
540 }
541
542 /* Create a monitor request for 'db'. The monitor request will include
543 * any tables from 'blacklisted_tables'
544 *
545 * Caller is responsible for disposing 'request'.
546 */
547 static struct jsonrpc_msg *
548 create_monitor_request(struct ovsdb *db)
549 {
550 struct jsonrpc_msg *request;
551 struct json *monitor;
552 struct ovsdb_schema *schema = db->schema;
553 const char *db_name = schema->name;
554
555 struct json *monitor_request = json_object_create();
556 size_t n = shash_count(&schema->tables);
557 const struct shash_node **nodes = shash_sort(&schema->tables);
558
559 for (int j = 0; j < n; j++) {
560 struct ovsdb_table_schema *table = nodes[j]->data;
561
562 /* Monitor all tables not blacklisted. */
563 if (!blacklist_tables_find(db_name, table->name)) {
564 add_monitored_table(table, monitor_request);
565 }
566 }
567 free(nodes);
568
569 /* Create a monitor request. */
570 monitor = json_array_create_3(
571 json_string_create(db_name),
572 json_string_create(db_name),
573 monitor_request);
574 request = jsonrpc_create_request("monitor", monitor, NULL);
575
576 return request;
577 }
578
579 static void
580 add_monitored_table(struct ovsdb_table_schema *table,
581 struct json *monitor_request)
582 {
583 struct json *monitor_request_array;
584
585 monitor_request_array = json_array_create_empty();
586 json_array_add(monitor_request_array, json_object_create());
587
588 json_object_put(monitor_request, table->name, monitor_request_array);
589 }
590
591 \f
592 static struct ovsdb_error *
593 process_notification(struct json *table_updates, struct ovsdb *db)
594 {
595 struct ovsdb_error *error = NULL;
596 struct ovsdb_txn *txn;
597
598 if (table_updates->type == JSON_OBJECT) {
599 txn = ovsdb_txn_create(db);
600
601 /* Process each table update. */
602 struct shash_node *node;
603 SHASH_FOR_EACH (node, json_object(table_updates)) {
604 struct json *table_update = node->data;
605 if (table_update) {
606 error = process_table_update(table_update, node->name, db, txn);
607 if (error) {
608 break;
609 }
610 }
611 }
612
613 if (error) {
614 ovsdb_txn_abort(txn);
615 return error;
616 } else {
617 /* Commit transaction. */
618 error = ovsdb_txn_commit(txn, false);
619 }
620 }
621
622 return error;
623 }
624
625 static struct ovsdb_error *
626 process_table_update(struct json *table_update, const char *table_name,
627 struct ovsdb *database, struct ovsdb_txn *txn)
628 {
629 struct ovsdb_table *table = ovsdb_get_table(database, table_name);
630 if (!table) {
631 return ovsdb_error("unknown table", "unknown table %s", table_name);
632 }
633
634 if (table_update->type != JSON_OBJECT) {
635 return ovsdb_error("Not a JSON object",
636 "<table-update> for table is not object");
637 }
638
639 struct shash_node *node;
640 SHASH_FOR_EACH (node, json_object(table_update)) {
641 struct json *row_update = node->data;
642 struct json *old, *new;
643
644 if (row_update->type != JSON_OBJECT) {
645 return ovsdb_error("Not a JSON object",
646 "<row-update> is not object");
647 }
648
649 struct uuid uuid;
650 if (!uuid_from_string(&uuid, node->name)) {
651 return ovsdb_syntax_error(table_update, "bad row UUID",
652 "<table-update> names must be UUIDs");
653 }
654
655 old = shash_find_data(json_object(row_update), "old");
656 new = shash_find_data(json_object(row_update), "new");
657
658 struct ovsdb_error *error;
659 error = (!new ? execute_delete(txn, &uuid, table)
660 : !old ? execute_insert(txn, &uuid, table, new)
661 : execute_update(txn, &uuid, table, new));
662 if (error) {
663 return error;
664 }
665 }
666 return NULL;
667 }
668 \f
669 static struct ovsdb_error *
670 execute_insert(struct ovsdb_txn *txn, const struct uuid *row_uuid,
671 struct ovsdb_table *table, struct json *json_row)
672 {
673 struct ovsdb_row *row = ovsdb_row_create(table);
674 struct ovsdb_error *error = ovsdb_row_from_json(row, json_row, NULL, NULL);
675 if (!error) {
676 *ovsdb_row_get_uuid_rw(row) = *row_uuid;
677 ovsdb_txn_row_insert(txn, row);
678 } else {
679 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
680 VLOG_WARN_RL(&rl, "cannot add existing row "UUID_FMT" to table %s",
681 UUID_ARGS(row_uuid), table->schema->name);
682 ovsdb_row_destroy(row);
683 }
684
685 return error;
686 }
687
688 static struct ovsdb_error *
689 execute_delete(struct ovsdb_txn *txn, const struct uuid *row_uuid,
690 struct ovsdb_table *table)
691 {
692 const struct ovsdb_row *row = ovsdb_table_get_row(table, row_uuid);
693 if (row) {
694 ovsdb_txn_row_delete(txn, row);
695 } else {
696 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
697 VLOG_WARN_RL(&rl, "cannot delete missing row "UUID_FMT" from table %s",
698 UUID_ARGS(row_uuid), table->schema->name);
699 }
700 return NULL;
701 }
702
703 static struct ovsdb_error *
704 execute_update(struct ovsdb_txn *txn, const struct uuid *row_uuid,
705 struct ovsdb_table *table, struct json *json_row)
706 {
707 const struct ovsdb_row *row = ovsdb_table_get_row(table, row_uuid);
708 if (!row) {
709 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
710 VLOG_WARN_RL(&rl, "cannot modify missing row "UUID_FMT" in table %s",
711 UUID_ARGS(row_uuid), table->schema->name);
712 return NULL;
713 }
714
715 struct ovsdb_column_set columns = OVSDB_COLUMN_SET_INITIALIZER;
716 struct ovsdb_row *update = ovsdb_row_create(table);
717 struct ovsdb_error *error = ovsdb_row_from_json(update, json_row,
718 NULL, &columns);
719
720 if (!error && !ovsdb_row_equal_columns(row, update, &columns)) {
721 ovsdb_row_update_columns(ovsdb_txn_row_modify(txn, row),
722 update, &columns);
723 }
724
725 ovsdb_column_set_destroy(&columns);
726 ovsdb_row_destroy(update);
727 return error;
728 }
729
730 void
731 request_ids_add(const struct json *id, struct ovsdb *db)
732 {
733 struct request_ids_hmap_node *node = xmalloc(sizeof *node);
734
735 node->request_id = json_clone(id);
736 node->db = db;
737 hmap_insert(&request_ids, &node->hmap, json_hash(id, 0));
738 }
739
740 /* Look up 'id' from 'request_ids', if found, remove the found id from
741 * 'request_ids' and free its memory. If not found, 'request_ids' does
742 * not change. Sets '*db' to the database for the request (NULL if not
743 * found).
744 *
745 * Return true if 'id' is found, false otherwise.
746 */
747 bool
748 request_ids_lookup_and_free(const struct json *id, struct ovsdb **db)
749 {
750 struct request_ids_hmap_node *node;
751
752 HMAP_FOR_EACH_WITH_HASH (node, hmap, json_hash(id, 0), &request_ids) {
753 if (json_equal(id, node->request_id)) {
754 hmap_remove(&request_ids, &node->hmap);
755 *db = node->db;
756 json_destroy(node->request_id);
757 free(node);
758 return true;
759 }
760 }
761
762 *db = NULL;
763 return false;
764 }
765
766 static void
767 request_ids_destroy(void)
768 {
769 struct request_ids_hmap_node *node;
770
771 HMAP_FOR_EACH_POP (node, hmap, &request_ids) {
772 json_destroy(node->request_id);
773 free(node);
774 }
775 hmap_destroy(&request_ids);
776 }
777
778 void
779 request_ids_clear(void)
780 {
781 request_ids_destroy();
782 hmap_init(&request_ids);
783 }
784
785 static struct shash *
786 replication_db_clone(struct shash *dbs)
787 {
788 struct shash *new = xmalloc(sizeof *new);
789 shash_init(new);
790
791 struct shash_node *node;
792 SHASH_FOR_EACH (node, dbs) {
793 shash_add(new, node->name, node->data);
794 }
795
796 return new;
797 }
798
799 static void
800 replication_dbs_destroy(void)
801 {
802 shash_destroy(replication_dbs);
803 free(replication_dbs);
804 replication_dbs = NULL;
805 }
806
807 /* Return true if replication just started or is ongoing.
808 * Return false if the connection failed, or the replication
809 * was not able to start. */
810 bool
811 replication_is_alive(void)
812 {
813 if (session) {
814 return jsonrpc_session_is_alive(session) && state != RPL_S_ERR;
815 }
816 return false;
817 }
818
819 /* Return the last error reported on a connection by 'session'. The
820 * return value is 0 if replication is not currently running, or
821 * if replication session has not encountered any error.
822 *
823 * Return a negative value if replication session has error, or the
824 * replication was not able to start. */
825 int
826 replication_get_last_error(void)
827 {
828 int err = 0;
829
830 if (session) {
831 err = jsonrpc_session_get_last_error(session);
832 if (!err) {
833 err = (state == RPL_S_ERR) ? ENOENT : 0;
834 }
835 }
836
837 return err;
838 }
839
840 char *
841 replication_status(void)
842 {
843 bool alive = session && jsonrpc_session_is_alive(session);
844 struct ds ds = DS_EMPTY_INITIALIZER;
845
846 if (alive) {
847 switch(state) {
848 case RPL_S_INIT:
849 case RPL_S_SERVER_ID_REQUESTED:
850 case RPL_S_DB_REQUESTED:
851 case RPL_S_SCHEMA_REQUESTED:
852 case RPL_S_MONITOR_REQUESTED:
853 ds_put_format(&ds, "connecting: %s", sync_from);
854 break;
855 case RPL_S_REPLICATING: {
856 struct shash_node *node;
857
858 ds_put_format(&ds, "replicating: %s\n", sync_from);
859 ds_put_cstr(&ds, "database:");
860 SHASH_FOR_EACH (node, replication_dbs) {
861 ds_put_format(&ds, " %s,", node->name);
862 }
863 ds_chomp(&ds, ',');
864
865 if (!shash_is_empty(&blacklist_tables)) {
866 ds_put_char(&ds, '\n');
867 ds_put_cstr(&ds, "exclude: ");
868 ds_put_and_free_cstr(&ds, get_blacklist_tables());
869 }
870 break;
871 }
872 case RPL_S_ERR:
873 ds_put_format(&ds, "Replication to (%s) failed\n", sync_from);
874 break;
875 default:
876 OVS_NOT_REACHED();
877 }
878 } else {
879 ds_put_format(&ds, "not connected to %s", sync_from);
880 }
881 return ds_steal_cstr(&ds);
882 }
883
/* Prints the help text for the syncing command-line options on stdout. */
void
replication_usage(void)
{
    printf("\n"
           "Syncing options:\n"
           "--sync-from=SERVER sync DATABASE from active SERVER and start in\n"
           "backup mode (except with --active)\n"
           "--sync-exclude-tables=DB:TABLE,...\n"
           "exclude the TABLE in DB from syncing\n"
           "--active with --sync-from, start in active mode\n");
}