2 * (c) Copyright 2016, 2017 Hewlett Packard Enterprise Development LP
3 * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016, 2017 Nicira, Inc.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
21 #include "condition.h"
23 #include "openvswitch/dynamic-string.h"
24 #include "openvswitch/hmap.h"
25 #include "openvswitch/json.h"
26 #include "openvswitch/vlog.h"
27 #include "ovsdb-error.h"
30 #include "replication.h"
36 #include "transaction.h"
39 VLOG_DEFINE_THIS_MODULE(replication
);
41 static char *sync_from
;
42 static struct uuid server_uuid
;
43 static struct jsonrpc_session
*session
;
44 static unsigned int session_seqno
= UINT_MAX
;
46 static struct jsonrpc_msg
*create_monitor_request(struct ovsdb_schema
*);
47 static void add_monitored_table(struct ovsdb_table_schema
*table
,
48 struct json
*monitor_requests
);
50 static struct ovsdb_error
*reset_database(struct ovsdb
*db
);
52 static struct ovsdb_error
*process_notification(struct json
*, struct ovsdb
*);
53 static struct ovsdb_error
*process_table_update(struct json
*table_update
,
54 const char *table_name
,
55 struct ovsdb
*database
,
56 struct ovsdb_txn
*txn
);
58 static struct ovsdb_error
*execute_insert(struct ovsdb_txn
*txn
,
59 const struct uuid
*row_uuid
,
60 struct ovsdb_table
*table
,
62 static struct ovsdb_error
*execute_delete(struct ovsdb_txn
*txn
,
63 const struct uuid
*row_uuid
,
64 struct ovsdb_table
*table
);
65 static struct ovsdb_error
*execute_update(struct ovsdb_txn
*txn
,
66 const struct uuid
*row_uuid
,
67 struct ovsdb_table
*table
,
70 /* Maps from db name to sset of table names. */
71 static struct shash blacklist_tables
= SHASH_INITIALIZER(&blacklist_tables
);
73 static void blacklist_tables_clear(void);
74 static void blacklist_tables_add(const char *database
, const char *table
);
75 static bool blacklist_tables_find(const char *database
, const char* table
);
78 /* Keep track of request IDs of all outstanding OVSDB requests. */
79 static struct hmap request_ids
= HMAP_INITIALIZER(&request_ids
);
81 struct request_ids_hmap_node
{
82 struct hmap_node hmap
;
83 struct json
*request_id
;
84 struct ovsdb
*db
; /* associated database */
86 void request_ids_add(const struct json
*id
, struct ovsdb
*db
);
87 bool request_ids_lookup_and_free(const struct json
*id
, struct ovsdb
**db
);
88 static void request_ids_destroy(void);
89 void request_ids_clear(void);
/* States of the replication state machine driven by replication_run().
 * The normal progression is from top to bottom. */
enum ovsdb_replication_state {
    RPL_S_INIT,                 /* Nothing has been sent yet. */
    RPL_S_SERVER_ID_REQUESTED,  /* Waiting for the "get_server_id" reply. */
    RPL_S_DB_REQUESTED,         /* Waiting for the "list_dbs" reply. */
    RPL_S_SCHEMA_REQUESTED,     /* Waiting for "get_schema" replies. */
    RPL_S_MONITOR_REQUESTED,    /* Waiting for "monitor" replies. */
    RPL_S_REPLICATING,          /* Applying "update" notifications. */
    RPL_S_ERR                   /* Error, no longer replicating. */
};
100 static enum ovsdb_replication_state state
;
/* A local database that is a candidate for replication, stored by name in
 * 'replication_dbs'. */
struct replication_db {
    struct ovsdb *db;               /* The local database. */
    bool schema_version_higher;     /* Set once the active server's schema
                                     * differs from the local one but was
                                     * found to be replicable. */

    /* Points to the schema received from the active server if
     * the local db schema version is higher. NULL otherwise. */
    struct ovsdb_schema *active_db_schema;
};
111 static bool is_replication_possible(struct ovsdb_schema
*local_db_schema
,
112 struct ovsdb_schema
*active_db_schema
);
114 /* All DBs known to ovsdb-server. The actual replication dbs are stored
115 * in 'replication dbs', which is a subset of all dbs and remote dbs whose
117 static struct shash local_dbs
= SHASH_INITIALIZER(&local_dbs
);
118 static struct shash
*replication_dbs
;
120 static struct shash
*replication_dbs_create(void);
121 static void replication_dbs_destroy(void);
122 /* Find the 'struct replication_db' named 'db_name' within 'replication_dbs'. */
123 static struct replication_db
*find_db(const char *db_name
);
127 replication_init(const char *sync_from_
, const char *exclude_tables
,
128 const struct uuid
*server
, int probe_interval
)
131 sync_from
= xstrdup(sync_from_
);
132 /* Caller should have verified that the 'exclude_tables' is
133 * parseable. An error here is unexpected. */
134 ovs_assert(!set_blacklist_tables(exclude_tables
, false));
136 replication_dbs_destroy();
138 shash_clear(&local_dbs
);
140 jsonrpc_session_close(session
);
143 session
= jsonrpc_session_open(sync_from
, true);
144 session_seqno
= UINT_MAX
;
146 jsonrpc_session_set_probe_interval(session
, probe_interval
);
148 /* Keep a copy of local server uuid. */
149 server_uuid
= *server
;
155 replication_add_local_db(const char *database
, struct ovsdb
*db
)
157 shash_add_assert(&local_dbs
, database
, db
);
161 send_schema_requests(const struct json
*result
)
163 for (size_t i
= 0; i
< result
->array
.n
; i
++) {
164 const struct json
*name
= result
->array
.elems
[i
];
165 if (name
->type
== JSON_STRING
) {
166 /* Send one schema request for each remote DB. */
167 const char *db_name
= json_string(name
);
168 struct replication_db
*rdb
= find_db(db_name
);
170 struct jsonrpc_msg
*request
=
171 jsonrpc_create_request(
174 json_string_create(db_name
)),
177 request_ids_add(request
->id
, rdb
->db
);
178 jsonrpc_session_send(session
, request
);
185 replication_run(void)
191 jsonrpc_session_run(session
);
193 for (int i
= 0; jsonrpc_session_is_connected(session
) && i
< 50; i
++) {
194 struct jsonrpc_msg
*msg
;
197 seqno
= jsonrpc_session_get_seqno(session
);
198 if (seqno
!= session_seqno
|| state
== RPL_S_INIT
) {
199 session_seqno
= seqno
;
201 struct jsonrpc_msg
*request
;
202 request
= jsonrpc_create_request("get_server_id",
203 json_array_create_empty(), NULL
);
204 request_ids_add(request
->id
, NULL
);
205 jsonrpc_session_send(session
, request
);
207 state
= RPL_S_SERVER_ID_REQUESTED
;
208 VLOG_DBG("send server ID request.");
211 msg
= jsonrpc_session_recv(session
);
216 if (msg
->type
== JSONRPC_NOTIFY
&& state
!= RPL_S_ERR
217 && !strcmp(msg
->method
, "update")) {
218 if (msg
->params
->type
== JSON_ARRAY
219 && msg
->params
->array
.n
== 2
220 && msg
->params
->array
.elems
[0]->type
== JSON_STRING
) {
221 char *db_name
= msg
->params
->array
.elems
[0]->string
;
222 struct replication_db
*rdb
= find_db(db_name
);
224 struct ovsdb_error
*error
;
225 error
= process_notification(msg
->params
->array
.elems
[1],
228 ovsdb_error_assert(error
);
233 } else if (msg
->type
== JSONRPC_REPLY
) {
234 struct replication_db
*rdb
;
236 if (!request_ids_lookup_and_free(msg
->id
, &db
)) {
237 VLOG_WARN("received unexpected reply");
242 case RPL_S_SERVER_ID_REQUESTED
: {
244 if (msg
->result
->type
!= JSON_STRING
||
245 !uuid_from_string(&uuid
, json_string(msg
->result
))) {
246 struct ovsdb_error
*error
;
247 error
= ovsdb_error("get_server_id failed",
248 "Server ID is not valid UUID");
250 ovsdb_error_assert(error
);
255 if (uuid_equals(&uuid
, &server_uuid
)) {
256 struct ovsdb_error
*error
;
257 error
= ovsdb_error("Server ID check failed",
258 "Self replicating is not allowed");
260 ovsdb_error_assert(error
);
265 struct jsonrpc_msg
*request
;
266 request
= jsonrpc_create_request("list_dbs",
267 json_array_create_empty(),
269 request_ids_add(request
->id
, NULL
);
270 jsonrpc_session_send(session
, request
);
272 replication_dbs_destroy();
273 replication_dbs
= replication_dbs_create();
274 state
= RPL_S_DB_REQUESTED
;
277 case RPL_S_DB_REQUESTED
:
278 if (msg
->result
->type
!= JSON_ARRAY
) {
279 struct ovsdb_error
*error
;
280 error
= ovsdb_error("list_dbs failed",
281 "list_dbs response is not array");
282 ovsdb_error_assert(error
);
285 send_schema_requests(msg
->result
);
286 VLOG_DBG("Send schema requests");
287 state
= RPL_S_SCHEMA_REQUESTED
;
291 case RPL_S_SCHEMA_REQUESTED
: {
292 struct ovsdb_schema
*schema
;
293 struct ovsdb_error
*error
;
295 error
= ovsdb_schema_from_json(msg
->result
, &schema
);
297 ovsdb_error_assert(error
);
301 rdb
= find_db(schema
->name
);
303 /* Unexpected schema. */
304 VLOG_WARN("unexpected schema %s", schema
->name
);
306 } else if (!ovsdb_schema_equal(schema
, rdb
->db
->schema
)) {
307 /* Schmea version mismatch. */
308 VLOG_INFO("Schema version mismatch, checking if %s can "
309 "still be replicated or not.",
311 if (is_replication_possible(rdb
->db
->schema
, schema
)) {
312 VLOG_INFO("%s can be replicated.", schema
->name
);
313 rdb
->schema_version_higher
= true;
314 if (rdb
->active_db_schema
) {
315 ovsdb_schema_destroy(rdb
->active_db_schema
);
317 rdb
->active_db_schema
= schema
;
319 VLOG_INFO("%s cannot be replicated.", schema
->name
);
320 struct replication_db
*r
=
321 shash_find_and_delete(replication_dbs
,
323 if (r
->active_db_schema
) {
324 ovsdb_schema_destroy(r
->active_db_schema
);
327 ovsdb_schema_destroy(schema
);
330 ovsdb_schema_destroy(schema
);
333 /* After receiving schemas, reset the local databases that
334 * will be monitored and send out monitor requests for them. */
335 if (hmap_is_empty(&request_ids
)) {
336 struct shash_node
*node
;
338 if (shash_is_empty(replication_dbs
)) {
339 VLOG_WARN("Nothing to replicate.");
342 SHASH_FOR_EACH (node
, replication_dbs
) {
344 struct jsonrpc_msg
*request
=
345 create_monitor_request(
346 rdb
->schema_version_higher
?
347 rdb
->active_db_schema
: rdb
->db
->schema
);
349 request_ids_add(request
->id
, rdb
->db
);
350 jsonrpc_session_send(session
, request
);
351 VLOG_DBG("Send monitor requests");
352 state
= RPL_S_MONITOR_REQUESTED
;
359 case RPL_S_MONITOR_REQUESTED
: {
360 /* Reply to monitor requests. */
361 struct ovsdb_error
*error
;
362 VLOG_INFO("Monitor request received. Resetting the database");
363 /* Resetting the database here has few risks. If the
364 * process_notification() fails, the database is completely
365 * lost locally. In case that node becomes active, then
366 * there is a chance of complete data loss in the active/standy
368 error
= reset_database(db
);
370 error
= process_notification(msg
->result
, db
);
373 ovsdb_error_assert(error
);
376 /* Transition to replicating state after receiving
377 * all replies of "monitor" requests. */
378 if (hmap_is_empty(&request_ids
)) {
379 VLOG_DBG("Listening to monitor updates");
380 state
= RPL_S_REPLICATING
;
387 /* Ignore all messages */
391 case RPL_S_REPLICATING
:
397 jsonrpc_msg_destroy(msg
);
402 replication_wait(void)
405 jsonrpc_session_wait(session
);
406 jsonrpc_session_recv_wait(session
);
410 /* Parse 'blacklist' to rebuild 'blacklist_tables'. If 'dryrun' is false, the
411 * current black list tables will be wiped out, regardless of whether
412 * 'blacklist' can be parsed. If 'dryrun' is true, only parses 'blacklist' and
413 * reports any errors, without modifying the blacklist.
415 * On error, returns the error string, which the caller is
416 * responsible for freeing. Returns NULL otherwise. */
417 char * OVS_WARN_UNUSED_RESULT
418 set_blacklist_tables(const char *blacklist
, bool dryrun
)
420 struct sset set
= SSET_INITIALIZER(&set
);
424 const char *longname
;
427 /* Can only add to an empty shash. */
428 blacklist_tables_clear();
431 sset_from_delimited_string(&set
, blacklist
, " ,");
432 SSET_FOR_EACH (longname
, &set
) {
433 char *database
= xstrdup(longname
), *table
= NULL
;
434 strtok_r(database
, ":", &table
);
435 if (table
&& !dryrun
) {
436 blacklist_tables_add(database
, table
);
441 err
= xasprintf("Can't parse black list table: %s", longname
);
449 if (err
&& !dryrun
) {
450 /* On error, destroy the partially built 'blacklist_tables'. */
451 blacklist_tables_clear();
456 char * OVS_WARN_UNUSED_RESULT
457 get_blacklist_tables(void)
459 struct shash_node
*node
;
460 struct sset set
= SSET_INITIALIZER(&set
);
462 SHASH_FOR_EACH (node
, &blacklist_tables
) {
463 const char *database
= node
->name
;
465 struct sset
*tables
= node
->data
;
467 SSET_FOR_EACH (table
, tables
) {
468 sset_add_and_free(&set
, xasprintf("%s:%s", database
, table
));
472 /* Output the table list in an sorted order, so that
473 * the output string will not depend on the hash function
474 * that used to implement the hmap data structure. This is
475 * only useful for writting unit tests. */
476 const char **sorted
= sset_sort(&set
);
477 struct ds ds
= DS_EMPTY_INITIALIZER
;
479 for (i
= 0; i
< sset_count(&set
); i
++) {
480 ds_put_format(&ds
, "%s,", sorted
[i
]);
488 return ds_steal_cstr(&ds
);
492 blacklist_tables_clear(void)
494 struct shash_node
*node
;
495 SHASH_FOR_EACH (node
, &blacklist_tables
) {
496 struct sset
*tables
= node
->data
;
497 sset_destroy(tables
);
500 shash_clear_free_data(&blacklist_tables
);
504 blacklist_tables_add(const char *database
, const char *table
)
506 struct sset
*tables
= shash_find_data(&blacklist_tables
, database
);
509 tables
= xmalloc(sizeof *tables
);
511 shash_add(&blacklist_tables
, database
, tables
);
514 sset_add(tables
, table
);
518 blacklist_tables_find(const char *database
, const char *table
)
520 struct sset
*tables
= shash_find_data(&blacklist_tables
, database
);
521 return tables
&& sset_contains(tables
, table
);
525 disconnect_active_server(void)
527 jsonrpc_session_close(session
);
532 replication_destroy(void)
534 blacklist_tables_clear();
535 shash_destroy(&blacklist_tables
);
542 request_ids_destroy();
543 replication_dbs_destroy();
545 shash_destroy(&local_dbs
);
548 static struct replication_db
*
549 find_db(const char *db_name
)
551 return shash_find_data(replication_dbs
, db_name
);
554 static struct ovsdb_error
*
555 reset_database(struct ovsdb
*db
)
557 struct ovsdb_txn
*txn
= ovsdb_txn_create(db
);
558 struct shash_node
*table_node
;
560 SHASH_FOR_EACH (table_node
, &db
->tables
) {
561 /* Delete all rows if the table is not blacklisted. */
562 if (!blacklist_tables_find(db
->schema
->name
, table_node
->name
)) {
563 struct ovsdb_table
*table
= table_node
->data
;
564 struct ovsdb_row
*row
, *next
;
565 HMAP_FOR_EACH_SAFE (row
, next
, hmap_node
, &table
->rows
) {
566 ovsdb_txn_row_delete(txn
, row
);
571 return ovsdb_txn_propose_commit_block(txn
, false);
574 /* Create a monitor request for 'db'. The monitor request will include
575 * any tables from 'blacklisted_tables'
577 * Caller is responsible for disposing 'request'.
579 static struct jsonrpc_msg
*
580 create_monitor_request(struct ovsdb_schema
*schema
)
582 struct jsonrpc_msg
*request
;
583 struct json
*monitor
;
584 const char *db_name
= schema
->name
;
586 struct json
*monitor_request
= json_object_create();
587 size_t n
= shash_count(&schema
->tables
);
588 const struct shash_node
**nodes
= shash_sort(&schema
->tables
);
590 for (int j
= 0; j
< n
; j
++) {
591 struct ovsdb_table_schema
*table
= nodes
[j
]->data
;
593 /* Monitor all tables not blacklisted. */
594 if (!blacklist_tables_find(db_name
, table
->name
)) {
595 add_monitored_table(table
, monitor_request
);
600 /* Create a monitor request. */
601 monitor
= json_array_create_3(
602 json_string_create(db_name
),
603 json_string_create(db_name
),
605 request
= jsonrpc_create_request("monitor", monitor
, NULL
);
611 add_monitored_table(struct ovsdb_table_schema
*table
,
612 struct json
*monitor_request
)
614 struct json
*monitor_request_array
;
616 monitor_request_array
= json_array_create_empty();
617 json_array_add(monitor_request_array
, json_object_create());
619 json_object_put(monitor_request
, table
->name
, monitor_request_array
);
623 static struct ovsdb_error
*
624 process_notification(struct json
*table_updates
, struct ovsdb
*db
)
626 struct ovsdb_error
*error
= NULL
;
627 struct ovsdb_txn
*txn
;
629 if (table_updates
->type
== JSON_OBJECT
) {
630 txn
= ovsdb_txn_create(db
);
632 /* Process each table update. */
633 struct shash_node
*node
;
634 SHASH_FOR_EACH (node
, json_object(table_updates
)) {
635 struct json
*table_update
= node
->data
;
637 error
= process_table_update(table_update
, node
->name
, db
, txn
);
645 ovsdb_txn_abort(txn
);
648 /* Commit transaction. */
649 error
= ovsdb_txn_propose_commit_block(txn
, false);
656 static struct ovsdb_error
*
657 process_table_update(struct json
*table_update
, const char *table_name
,
658 struct ovsdb
*database
, struct ovsdb_txn
*txn
)
660 struct ovsdb_table
*table
= ovsdb_get_table(database
, table_name
);
662 return ovsdb_error("unknown table", "unknown table %s", table_name
);
665 if (table_update
->type
!= JSON_OBJECT
) {
666 return ovsdb_error("Not a JSON object",
667 "<table-update> for table is not object");
670 struct shash_node
*node
;
671 SHASH_FOR_EACH (node
, json_object(table_update
)) {
672 struct json
*row_update
= node
->data
;
673 struct json
*old
, *new;
675 if (row_update
->type
!= JSON_OBJECT
) {
676 return ovsdb_error("Not a JSON object",
677 "<row-update> is not object");
681 if (!uuid_from_string(&uuid
, node
->name
)) {
682 return ovsdb_syntax_error(table_update
, "bad row UUID",
683 "<table-update> names must be UUIDs");
686 old
= shash_find_data(json_object(row_update
), "old");
687 new = shash_find_data(json_object(row_update
), "new");
689 struct ovsdb_error
*error
;
690 error
= (!new ? execute_delete(txn
, &uuid
, table
)
691 : !old
? execute_insert(txn
, &uuid
, table
, new)
692 : execute_update(txn
, &uuid
, table
, new));
700 static struct ovsdb_error
*
701 execute_insert(struct ovsdb_txn
*txn
, const struct uuid
*row_uuid
,
702 struct ovsdb_table
*table
, struct json
*json_row
)
704 struct ovsdb_row
*row
= ovsdb_row_create(table
);
705 struct ovsdb_error
*error
= ovsdb_row_from_json(row
, json_row
, NULL
, NULL
);
707 *ovsdb_row_get_uuid_rw(row
) = *row_uuid
;
708 ovsdb_txn_row_insert(txn
, row
);
710 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 5);
711 VLOG_WARN_RL(&rl
, "cannot add existing row "UUID_FMT
" to table %s",
712 UUID_ARGS(row_uuid
), table
->schema
->name
);
713 ovsdb_row_destroy(row
);
719 static struct ovsdb_error
*
720 execute_delete(struct ovsdb_txn
*txn
, const struct uuid
*row_uuid
,
721 struct ovsdb_table
*table
)
723 const struct ovsdb_row
*row
= ovsdb_table_get_row(table
, row_uuid
);
725 ovsdb_txn_row_delete(txn
, row
);
727 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 5);
728 VLOG_WARN_RL(&rl
, "cannot delete missing row "UUID_FMT
" from table %s",
729 UUID_ARGS(row_uuid
), table
->schema
->name
);
734 static struct ovsdb_error
*
735 execute_update(struct ovsdb_txn
*txn
, const struct uuid
*row_uuid
,
736 struct ovsdb_table
*table
, struct json
*json_row
)
738 const struct ovsdb_row
*row
= ovsdb_table_get_row(table
, row_uuid
);
740 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 5);
741 VLOG_WARN_RL(&rl
, "cannot modify missing row "UUID_FMT
" in table %s",
742 UUID_ARGS(row_uuid
), table
->schema
->name
);
746 struct ovsdb_column_set columns
= OVSDB_COLUMN_SET_INITIALIZER
;
747 struct ovsdb_row
*update
= ovsdb_row_create(table
);
748 struct ovsdb_error
*error
= ovsdb_row_from_json(update
, json_row
,
751 if (!error
&& !ovsdb_row_equal_columns(row
, update
, &columns
)) {
752 ovsdb_row_update_columns(ovsdb_txn_row_modify(txn
, row
),
756 ovsdb_column_set_destroy(&columns
);
757 ovsdb_row_destroy(update
);
762 request_ids_add(const struct json
*id
, struct ovsdb
*db
)
764 struct request_ids_hmap_node
*node
= xmalloc(sizeof *node
);
766 node
->request_id
= json_clone(id
);
768 hmap_insert(&request_ids
, &node
->hmap
, json_hash(id
, 0));
771 /* Look up 'id' from 'request_ids', if found, remove the found id from
772 * 'request_ids' and free its memory. If not found, 'request_ids' does
773 * not change. Sets '*db' to the database for the request (NULL if not
776 * Return true if 'id' is found, false otherwise.
779 request_ids_lookup_and_free(const struct json
*id
, struct ovsdb
**db
)
781 struct request_ids_hmap_node
*node
;
783 HMAP_FOR_EACH_WITH_HASH (node
, hmap
, json_hash(id
, 0), &request_ids
) {
784 if (json_equal(id
, node
->request_id
)) {
785 hmap_remove(&request_ids
, &node
->hmap
);
787 json_destroy(node
->request_id
);
798 request_ids_destroy(void)
800 struct request_ids_hmap_node
*node
;
802 HMAP_FOR_EACH_POP (node
, hmap
, &request_ids
) {
803 json_destroy(node
->request_id
);
806 hmap_destroy(&request_ids
);
810 request_ids_clear(void)
812 request_ids_destroy();
813 hmap_init(&request_ids
);
816 static struct shash
*
817 replication_dbs_create(void)
819 struct shash
*new = xmalloc(sizeof *new);
822 struct shash_node
*node
;
823 SHASH_FOR_EACH (node
, &local_dbs
) {
824 struct replication_db
*repl_db
= xmalloc(sizeof *repl_db
);
825 repl_db
->db
= node
->data
;
826 repl_db
->schema_version_higher
= false;
827 repl_db
->active_db_schema
= NULL
;
828 shash_add(new, node
->name
, repl_db
);
835 replication_dbs_destroy(void)
837 if (!replication_dbs
) {
841 struct shash_node
*node
, *next
;
843 SHASH_FOR_EACH_SAFE (node
, next
, replication_dbs
) {
844 hmap_remove(&replication_dbs
->map
, &node
->node
);
845 struct replication_db
*rdb
= node
->data
;
846 if (rdb
->active_db_schema
) {
847 ovsdb_schema_destroy(rdb
->active_db_schema
);
854 hmap_destroy(&replication_dbs
->map
);
855 free(replication_dbs
);
856 replication_dbs
= NULL
;
859 /* Return true if replication just started or is ongoing.
860 * Return false if the connection failed, or the replication
861 * was not able to start. */
863 replication_is_alive(void)
866 return jsonrpc_session_is_alive(session
) && state
!= RPL_S_ERR
;
871 /* Return the last error reported on a connection by 'session'. The
872 * return value is 0 if replication is not currently running, or
873 * if replication session has not encountered any error.
875 * Return a negative value if replication session has error, or the
876 * replication was not able to start. */
878 replication_get_last_error(void)
883 err
= jsonrpc_session_get_last_error(session
);
885 err
= (state
== RPL_S_ERR
) ? ENOENT
: 0;
893 replication_status(void)
895 bool alive
= session
&& jsonrpc_session_is_alive(session
);
896 struct ds ds
= DS_EMPTY_INITIALIZER
;
901 case RPL_S_SERVER_ID_REQUESTED
:
902 case RPL_S_DB_REQUESTED
:
903 case RPL_S_SCHEMA_REQUESTED
:
904 case RPL_S_MONITOR_REQUESTED
:
905 ds_put_format(&ds
, "connecting: %s", sync_from
);
907 case RPL_S_REPLICATING
: {
908 struct shash_node
*node
;
910 ds_put_format(&ds
, "replicating: %s\n", sync_from
);
911 ds_put_cstr(&ds
, "database:");
912 SHASH_FOR_EACH (node
, replication_dbs
) {
913 ds_put_format(&ds
, " %s,", node
->name
);
917 if (!shash_is_empty(&blacklist_tables
)) {
918 ds_put_char(&ds
, '\n');
919 ds_put_cstr(&ds
, "exclude: ");
920 ds_put_and_free_cstr(&ds
, get_blacklist_tables());
925 ds_put_format(&ds
, "Replication to (%s) failed\n", sync_from
);
931 ds_put_format(&ds
, "not connected to %s", sync_from
);
933 return ds_steal_cstr(&ds
);
936 /* Checks if it's possible to replicate to the local db from the active db
937 * schema. Returns true, if 'local_db_schema' has all the tables and columns
938 * of 'active_db_schema', false otherwise.
941 is_replication_possible(struct ovsdb_schema
*local_db_schema
,
942 struct ovsdb_schema
*active_db_schema
)
944 struct shash_node
*node
;
945 SHASH_FOR_EACH (node
, &active_db_schema
->tables
) {
946 struct ovsdb_table_schema
*ldb_table_schema
=
947 shash_find_data(&local_db_schema
->tables
, node
->name
);
948 if (!ldb_table_schema
) {
949 VLOG_INFO("Table %s not present in the local db schema",
954 /* Local schema table should have all the columns
955 * of active schema table. */
956 struct ovsdb_table_schema
*adb_table_schema
= node
->data
;
957 struct shash_node
*n
;
958 SHASH_FOR_EACH (n
, &adb_table_schema
->columns
) {
959 struct ovsdb_column
*ldb_col
=
960 shash_find_data(&ldb_table_schema
->columns
, n
->name
);
962 VLOG_INFO("Column %s not present in the local "
963 "db schema table %s.", n
->name
, node
->name
);
967 struct json
*ldb_col_json
= ovsdb_column_to_json(ldb_col
);
968 struct json
*adb_col_json
= ovsdb_column_to_json(n
->data
);
969 bool cols_equal
= json_equal(ldb_col_json
, adb_col_json
);
970 json_destroy(ldb_col_json
);
971 json_destroy(adb_col_json
);
974 VLOG_INFO("Column %s mismatch in local "
975 "db schema table %s.", n
->name
, node
->name
);
985 replication_set_probe_interval(int probe_interval
)
988 jsonrpc_session_set_probe_interval(session
, probe_interval
);
993 replication_usage(void)
997 --sync-from=SERVER sync DATABASE from active SERVER and start in\n\
998 backup mode (except with --active)\n\
999 --sync-exclude-tables=DB:TABLE,...\n\
1000 exclude the TABLE in DB from syncing\n\
1001 --active with --sync-from, start in active mode\n");