]> git.proxmox.com Git - mirror_ovs.git/blame - ovsdb/replication.c
ovsdb replication: Provide option to configure probe interval.
[mirror_ovs.git] / ovsdb / replication.c
CommitLineData
ae671c5f 1/*
1b1d2e6d 2 * (c) Copyright 2016, 2017 Hewlett Packard Enterprise Development LP
05ac209a 3 * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016, 2017 Nicira, Inc.
ae671c5f
MC
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#include <config.h>
19
ae671c5f
MC
20
21#include "condition.h"
599c0f44 22#include "jsonrpc.h"
3109b4e1 23#include "openvswitch/dynamic-string.h"
599c0f44 24#include "openvswitch/hmap.h"
ee89ea7b 25#include "openvswitch/json.h"
8be42087 26#include "openvswitch/vlog.h"
ae671c5f 27#include "ovsdb-error.h"
599c0f44 28#include "ovsdb.h"
ae671c5f 29#include "query.h"
599c0f44 30#include "replication.h"
ae671c5f 31#include "row.h"
ae671c5f 32#include "sset.h"
599c0f44 33#include "stream.h"
ae671c5f
MC
34#include "svec.h"
35#include "table.h"
36#include "transaction.h"
05ac209a 37#include "uuid.h"
ae671c5f 38
8be42087
AZ
39VLOG_DEFINE_THIS_MODULE(replication);
40
60e0cd04 41static char *sync_from;
05ac209a 42static struct uuid server_uuid;
5e8bc3c5 43static struct jsonrpc_session *session;
23c16b51
AZ
44static unsigned int session_seqno = UINT_MAX;
45
cec7005b 46static struct jsonrpc_msg *create_monitor_request(struct ovsdb_schema *);
ae671c5f
MC
47static void add_monitored_table(struct ovsdb_table_schema *table,
48 struct json *monitor_requests);
49
23c16b51 50static struct ovsdb_error *reset_database(struct ovsdb *db);
ae671c5f 51
23c16b51 52static struct ovsdb_error *process_notification(struct json *, struct ovsdb *);
ae671c5f
MC
53static struct ovsdb_error *process_table_update(struct json *table_update,
54 const char *table_name,
55 struct ovsdb *database,
56 struct ovsdb_txn *txn);
57
58static struct ovsdb_error *execute_insert(struct ovsdb_txn *txn,
1cb0f53c 59 const struct uuid *row_uuid,
ae671c5f
MC
60 struct ovsdb_table *table,
61 struct json *new);
62static struct ovsdb_error *execute_delete(struct ovsdb_txn *txn,
1cb0f53c 63 const struct uuid *row_uuid,
ae671c5f
MC
64 struct ovsdb_table *table);
65static struct ovsdb_error *execute_update(struct ovsdb_txn *txn,
1cb0f53c 66 const struct uuid *row_uuid,
ae671c5f
MC
67 struct ovsdb_table *table,
68 struct json *new);
3109b4e1
AZ
69\f
70/* Maps from db name to sset of table names. */
71static struct shash blacklist_tables = SHASH_INITIALIZER(&blacklist_tables);
72
73static void blacklist_tables_clear(void);
74static void blacklist_tables_add(const char *database, const char *table);
75static bool blacklist_tables_find(const char *database, const char* table);
76
ae671c5f 77\f
599c0f44
AZ
78/* Keep track of request IDs of all outstanding OVSDB requests. */
79static struct hmap request_ids = HMAP_INITIALIZER(&request_ids);
80
81struct request_ids_hmap_node {
82 struct hmap_node hmap;
83 struct json *request_id;
84 struct ovsdb *db; /* associated database */
85};
86void request_ids_add(const struct json *id, struct ovsdb *db);
87bool request_ids_lookup_and_free(const struct json *id, struct ovsdb **db);
88static void request_ids_destroy(void);
89void request_ids_clear(void);
90\f
23c16b51 91enum ovsdb_replication_state {
60e0cd04 92 RPL_S_INIT,
05ac209a 93 RPL_S_SERVER_ID_REQUESTED,
23c16b51
AZ
94 RPL_S_DB_REQUESTED,
95 RPL_S_SCHEMA_REQUESTED,
96 RPL_S_MONITOR_REQUESTED,
97 RPL_S_REPLICATING,
98 RPL_S_ERR /* Error, no longer replicating. */
99};
100static enum ovsdb_replication_state state;
101
102\f
cec7005b
NS
103struct replication_db {
104 struct ovsdb *db;
105 bool schema_version_higher;
106 /* Points to the schema received from the active server if
107 * the local db schema version is higher. NULL otherwise. */
108 struct ovsdb_schema *active_db_schema;
109};
110
111static bool is_replication_possible(struct ovsdb_schema *local_db_schema,
112 struct ovsdb_schema *active_db_schema);
113
23c16b51
AZ
114/* All DBs known to ovsdb-server. The actual replication dbs are stored
115 * in 'replication dbs', which is a subset of all dbs and remote dbs whose
116 * schema matches. */
117static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs);
5e8bc3c5 118static struct shash *replication_dbs;
599c0f44 119
cec7005b 120static struct shash *replication_dbs_create(void);
5e8bc3c5 121static void replication_dbs_destroy(void);
6ab3dd96 122/* Find 'struct ovsdb' by name within 'replication_dbs' */
cec7005b 123static struct replication_db *find_db(const char *db_name);
6ab3dd96
AZ
124\f
125
7a9d65d2 126void
05ac209a 127replication_init(const char *sync_from_, const char *exclude_tables,
e988b8ab 128 const struct uuid *server, int probe_interval)
7a9d65d2 129{
60e0cd04
AZ
130 free(sync_from);
131 sync_from = xstrdup(sync_from_);
60e0cd04
AZ
132 /* Caller should have verified that the 'exclude_tables' is
133 * parseable. An error here is unexpected. */
500db308 134 ovs_assert(!set_blacklist_tables(exclude_tables, false));
60e0cd04 135
5e8bc3c5 136 replication_dbs_destroy();
23c16b51
AZ
137
138 shash_clear(&local_dbs);
139 if (session) {
140 jsonrpc_session_close(session);
3109b4e1 141 }
23c16b51 142
60e0cd04 143 session = jsonrpc_session_open(sync_from, true);
23c16b51 144 session_seqno = UINT_MAX;
05ac209a 145
e988b8ab
NS
146 jsonrpc_session_set_probe_interval(session, probe_interval);
147
05ac209a
AZ
148 /* Keep a copy of local server uuid. */
149 server_uuid = *server;
150
60e0cd04 151 state = RPL_S_INIT;
7a9d65d2
MC
152}
153
ae671c5f 154void
23c16b51 155replication_add_local_db(const char *database, struct ovsdb *db)
6ab3dd96 156{
23c16b51 157 shash_add_assert(&local_dbs, database, db);
6ab3dd96
AZ
158}
159
71f21279
BP
160static void
161send_schema_requests(const struct json *result)
162{
fa37affa
BP
163 for (size_t i = 0; i < result->array.n; i++) {
164 const struct json *name = result->array.elems[i];
71f21279
BP
165 if (name->type == JSON_STRING) {
166 /* Send one schema request for each remote DB. */
167 const char *db_name = json_string(name);
cec7005b
NS
168 struct replication_db *rdb = find_db(db_name);
169 if (rdb) {
71f21279
BP
170 struct jsonrpc_msg *request =
171 jsonrpc_create_request(
172 "get_schema",
173 json_array_create_1(
174 json_string_create(db_name)),
175 NULL);
176
cec7005b 177 request_ids_add(request->id, rdb->db);
71f21279
BP
178 jsonrpc_session_send(session, request);
179 }
180 }
181 }
182}
183
6ab3dd96
AZ
184void
185replication_run(void)
ae671c5f 186{
23c16b51
AZ
187 if (!session) {
188 return;
189 }
190
191 jsonrpc_session_run(session);
192
193 for (int i = 0; jsonrpc_session_is_connected(session) && i < 50; i++) {
194 struct jsonrpc_msg *msg;
195 unsigned int seqno;
196
197 seqno = jsonrpc_session_get_seqno(session);
60e0cd04 198 if (seqno != session_seqno || state == RPL_S_INIT) {
23c16b51
AZ
199 session_seqno = seqno;
200 request_ids_clear();
201 struct jsonrpc_msg *request;
05ac209a 202 request = jsonrpc_create_request("get_server_id",
23c16b51
AZ
203 json_array_create_empty(), NULL);
204 request_ids_add(request->id, NULL);
205 jsonrpc_session_send(session, request);
206
05ac209a
AZ
207 state = RPL_S_SERVER_ID_REQUESTED;
208 VLOG_DBG("send server ID request.");
ae671c5f
MC
209 }
210
23c16b51
AZ
211 msg = jsonrpc_session_recv(session);
212 if (!msg) {
213 continue;
ae671c5f
MC
214 }
215
23c16b51
AZ
216 if (msg->type == JSONRPC_NOTIFY && state != RPL_S_ERR
217 && !strcmp(msg->method, "update")) {
218 if (msg->params->type == JSON_ARRAY
fa37affa
BP
219 && msg->params->array.n == 2
220 && msg->params->array.elems[0]->type == JSON_STRING) {
221 char *db_name = msg->params->array.elems[0]->string;
cec7005b
NS
222 struct replication_db *rdb = find_db(db_name);
223 if (rdb) {
23c16b51 224 struct ovsdb_error *error;
fa37affa 225 error = process_notification(msg->params->array.elems[1],
cec7005b 226 rdb->db);
23c16b51
AZ
227 if (error) {
228 ovsdb_error_assert(error);
229 state = RPL_S_ERR;
230 }
231 }
232 }
233 } else if (msg->type == JSONRPC_REPLY) {
cec7005b 234 struct replication_db *rdb;
23c16b51
AZ
235 struct ovsdb *db;
236 if (!request_ids_lookup_and_free(msg->id, &db)) {
237 VLOG_WARN("received unexpected reply");
238 goto next;
239 }
240
241 switch (state) {
05ac209a
AZ
242 case RPL_S_SERVER_ID_REQUESTED: {
243 struct uuid uuid;
244 if (msg->result->type != JSON_STRING ||
245 !uuid_from_string(&uuid, json_string(msg->result))) {
246 struct ovsdb_error *error;
247 error = ovsdb_error("get_server_id failed",
248 "Server ID is not valid UUID");
249
250 ovsdb_error_assert(error);
251 state = RPL_S_ERR;
252 break;
253 }
254
255 if (uuid_equals(&uuid, &server_uuid)) {
256 struct ovsdb_error *error;
257 error = ovsdb_error("Server ID check failed",
258 "Self replicating is not allowed");
259
260 ovsdb_error_assert(error);
261 state = RPL_S_ERR;
262 break;
263 }
264
265 struct jsonrpc_msg *request;
266 request = jsonrpc_create_request("list_dbs",
267 json_array_create_empty(),
268 NULL);
269 request_ids_add(request->id, NULL);
270 jsonrpc_session_send(session, request);
271
272 replication_dbs_destroy();
cec7005b 273 replication_dbs = replication_dbs_create();
05ac209a
AZ
274 state = RPL_S_DB_REQUESTED;
275 break;
276 }
23c16b51
AZ
277 case RPL_S_DB_REQUESTED:
278 if (msg->result->type != JSON_ARRAY) {
279 struct ovsdb_error *error;
05ac209a 280 error = ovsdb_error("list_dbs failed",
23c16b51
AZ
281 "list_dbs response is not array");
282 ovsdb_error_assert(error);
283 state = RPL_S_ERR;
284 } else {
71f21279 285 send_schema_requests(msg->result);
60e0cd04 286 VLOG_DBG("Send schema requests");
23c16b51
AZ
287 state = RPL_S_SCHEMA_REQUESTED;
288 }
289 break;
290
291 case RPL_S_SCHEMA_REQUESTED: {
292 struct ovsdb_schema *schema;
293 struct ovsdb_error *error;
294
295 error = ovsdb_schema_from_json(msg->result, &schema);
296 if (error) {
297 ovsdb_error_assert(error);
298 state = RPL_S_ERR;
299 }
300
cec7005b
NS
301 rdb = find_db(schema->name);
302 if (!rdb) {
23c16b51
AZ
303 /* Unexpected schema. */
304 VLOG_WARN("unexpected schema %s", schema->name);
305 state = RPL_S_ERR;
cec7005b 306 } else if (!ovsdb_schema_equal(schema, rdb->db->schema)) {
23c16b51 307 /* Schmea version mismatch. */
cec7005b
NS
308 VLOG_INFO("Schema version mismatch, checking if %s can "
309 "still be replicated or not.",
23c16b51 310 schema->name);
cec7005b
NS
311 if (is_replication_possible(rdb->db->schema, schema)) {
312 VLOG_INFO("%s can be replicated.", schema->name);
313 rdb->schema_version_higher = true;
314 if (rdb->active_db_schema) {
315 ovsdb_schema_destroy(rdb->active_db_schema);
316 }
317 rdb->active_db_schema = schema;
318 } else {
319 VLOG_INFO("%s cannot be replicated.", schema->name);
320 struct replication_db *r =
321 shash_find_and_delete(replication_dbs,
322 schema->name);
323 if (r->active_db_schema) {
324 ovsdb_schema_destroy(r->active_db_schema);
325 }
326 free(r);
327 ovsdb_schema_destroy(schema);
328 }
329 } else {
330 ovsdb_schema_destroy(schema);
23c16b51 331 }
23c16b51
AZ
332
333 /* After receiving schemas, reset the local databases that
334 * will be monitored and send out monitor requests for them. */
335 if (hmap_is_empty(&request_ids)) {
ecf44dd3 336 struct shash_node *node;
23c16b51
AZ
337
338 if (shash_is_empty(replication_dbs)) {
339 VLOG_WARN("Nothing to replicate.");
340 state = RPL_S_ERR;
341 } else {
342 SHASH_FOR_EACH (node, replication_dbs) {
cec7005b 343 rdb = node->data;
23c16b51 344 struct jsonrpc_msg *request =
cec7005b
NS
345 create_monitor_request(
346 rdb->schema_version_higher ?
347 rdb->active_db_schema : rdb->db->schema);
23c16b51 348
cec7005b 349 request_ids_add(request->id, rdb->db);
23c16b51 350 jsonrpc_session_send(session, request);
60e0cd04 351 VLOG_DBG("Send monitor requests");
23c16b51
AZ
352 state = RPL_S_MONITOR_REQUESTED;
353 }
354 }
355 }
356 break;
357 }
358
359 case RPL_S_MONITOR_REQUESTED: {
360 /* Reply to monitor requests. */
361 struct ovsdb_error *error;
ecf44dd3
NS
362 VLOG_INFO("Monitor request received. Resetting the database");
363 /* Resetting the database here has few risks. If the
364 * process_notification() fails, the database is completely
365 * lost locally. In case that node becomes active, then
366 * there is a chance of complete data loss in the active/standy
367 * cluster. */
368 error = reset_database(db);
369 if (!error) {
370 error = process_notification(msg->result, db);
371 }
23c16b51
AZ
372 if (error) {
373 ovsdb_error_assert(error);
374 state = RPL_S_ERR;
375 } else {
376 /* Transition to replicating state after receiving
377 * all replies of "monitor" requests. */
378 if (hmap_is_empty(&request_ids)) {
60e0cd04 379 VLOG_DBG("Listening to monitor updates");
23c16b51
AZ
380 state = RPL_S_REPLICATING;
381 }
382 }
383 break;
384 }
385
386 case RPL_S_ERR:
387 /* Ignore all messages */
388 break;
389
60e0cd04 390 case RPL_S_INIT:
23c16b51
AZ
391 case RPL_S_REPLICATING:
392 default:
393 OVS_NOT_REACHED();
394 }
395 }
396 next:
397 jsonrpc_msg_destroy(msg);
ae671c5f
MC
398 }
399}
400
8c945cec
AZ
401void
402replication_wait(void)
403{
23c16b51
AZ
404 if (session) {
405 jsonrpc_session_wait(session);
406 jsonrpc_session_recv_wait(session);
8c945cec
AZ
407 }
408}
409
3109b4e1
AZ
410/* Parse 'blacklist' to rebuild 'blacklist_tables'. If 'dryrun' is false, the
411 * current black list tables will be wiped out, regardless of whether
412 * 'blacklist' can be parsed. If 'dryrun' is true, only parses 'blacklist' and
413 * reports any errors, without modifying the blacklist.
414 *
415 * On error, returns the error string, which the caller is
416 * responsible for freeing. Returns NULL otherwise. */
417char * OVS_WARN_UNUSED_RESULT
418set_blacklist_tables(const char *blacklist, bool dryrun)
7a9d65d2 419{
3109b4e1
AZ
420 struct sset set = SSET_INITIALIZER(&set);
421 char *err = NULL;
422
7a9d65d2 423 if (blacklist) {
3109b4e1
AZ
424 const char *longname;
425
426 if (!dryrun) {
6ab3dd96 427 /* Can only add to an empty shash. */
3109b4e1
AZ
428 blacklist_tables_clear();
429 }
430
431 sset_from_delimited_string(&set, blacklist, " ,");
432 SSET_FOR_EACH (longname, &set) {
433 char *database = xstrdup(longname), *table = NULL;
434 strtok_r(database, ":", &table);
435 if (table && !dryrun) {
436 blacklist_tables_add(database, table);
437 }
438
439 free(database);
440 if (!table) {
441 err = xasprintf("Can't parse black list table: %s", longname);
442 goto done;
443 }
444 }
445 }
446
447done:
448 sset_destroy(&set);
449 if (err && !dryrun) {
450 /* On error, destroy the partially built 'blacklist_tables'. */
451 blacklist_tables_clear();
452 }
453 return err;
454}
455
456char * OVS_WARN_UNUSED_RESULT
457get_blacklist_tables(void)
458{
459 struct shash_node *node;
460 struct sset set = SSET_INITIALIZER(&set);
461
462 SHASH_FOR_EACH (node, &blacklist_tables) {
463 const char *database = node->name;
464 const char *table;
465 struct sset *tables = node->data;
466
467 SSET_FOR_EACH (table, tables) {
468 sset_add_and_free(&set, xasprintf("%s:%s", database, table));
469 }
470 }
471
472 /* Output the table list in an sorted order, so that
473 * the output string will not depend on the hash function
474 * that used to implement the hmap data structure. This is
475 * only useful for writting unit tests. */
476 const char **sorted = sset_sort(&set);
477 struct ds ds = DS_EMPTY_INITIALIZER;
478 size_t i;
479 for (i = 0; i < sset_count(&set); i++) {
480 ds_put_format(&ds, "%s,", sorted[i]);
7a9d65d2 481 }
3109b4e1
AZ
482
483 ds_chomp(&ds, ',');
484
485 free(sorted);
486 sset_destroy(&set);
487
488 return ds_steal_cstr(&ds);
7a9d65d2
MC
489}
490
3109b4e1
AZ
491static void
492blacklist_tables_clear(void)
9dc05cdc 493{
3109b4e1
AZ
494 struct shash_node *node;
495 SHASH_FOR_EACH (node, &blacklist_tables) {
496 struct sset *tables = node->data;
497 sset_destroy(tables);
498 }
499
500 shash_clear_free_data(&blacklist_tables);
501}
502
503static void
504blacklist_tables_add(const char *database, const char *table)
505{
506 struct sset *tables = shash_find_data(&blacklist_tables, database);
507
508 if (!tables) {
509 tables = xmalloc(sizeof *tables);
510 sset_init(tables);
511 shash_add(&blacklist_tables, database, tables);
512 }
513
514 sset_add(tables, table);
515}
516
517static bool
518blacklist_tables_find(const char *database, const char *table)
519{
520 struct sset *tables = shash_find_data(&blacklist_tables, database);
521 return tables && sset_contains(tables, table);
9dc05cdc
MC
522}
523
ae671c5f 524void
f53d7518 525disconnect_active_server(void)
c9c5c9e2 526{
23c16b51
AZ
527 jsonrpc_session_close(session);
528 session = NULL;
c9c5c9e2
MC
529}
530
531void
3109b4e1 532replication_destroy(void)
ae671c5f 533{
3109b4e1
AZ
534 blacklist_tables_clear();
535 shash_destroy(&blacklist_tables);
ae671c5f 536
60e0cd04
AZ
537 if (sync_from) {
538 free(sync_from);
539 sync_from = NULL;
ae671c5f 540 }
599c0f44
AZ
541
542 request_ids_destroy();
5e8bc3c5 543 replication_dbs_destroy();
23c16b51
AZ
544
545 shash_destroy(&local_dbs);
ae671c5f
MC
546}
547
cec7005b 548static struct replication_db *
6ab3dd96 549find_db(const char *db_name)
ae671c5f 550{
23c16b51 551 return shash_find_data(replication_dbs, db_name);
ae671c5f
MC
552}
553\f
554static struct ovsdb_error *
23c16b51 555reset_database(struct ovsdb *db)
ae671c5f 556{
23c16b51 557 struct ovsdb_txn *txn = ovsdb_txn_create(db);
ae671c5f
MC
558 struct shash_node *table_node;
559
560 SHASH_FOR_EACH (table_node, &db->tables) {
3109b4e1
AZ
561 /* Delete all rows if the table is not blacklisted. */
562 if (!blacklist_tables_find(db->schema->name, table_node->name)) {
563 struct ovsdb_table *table = table_node->data;
45b4b20a
BP
564 struct ovsdb_row *row, *next;
565 HMAP_FOR_EACH_SAFE (row, next, hmap_node, &table->rows) {
7a9d65d2
MC
566 ovsdb_txn_row_delete(txn, row);
567 }
ae671c5f
MC
568 }
569 }
ae671c5f 570
1b1d2e6d 571 return ovsdb_txn_propose_commit_block(txn, false);
ae671c5f
MC
572}
573
23c16b51
AZ
574/* Create a monitor request for 'db'. The monitor request will include
575 * any tables from 'blacklisted_tables'
576 *
577 * Caller is responsible for disposing 'request'.
578 */
579static struct jsonrpc_msg *
cec7005b 580create_monitor_request(struct ovsdb_schema *schema)
ae671c5f 581{
23c16b51
AZ
582 struct jsonrpc_msg *request;
583 struct json *monitor;
23c16b51 584 const char *db_name = schema->name;
ae671c5f 585
23c16b51
AZ
586 struct json *monitor_request = json_object_create();
587 size_t n = shash_count(&schema->tables);
588 const struct shash_node **nodes = shash_sort(&schema->tables);
ae671c5f 589
23c16b51
AZ
590 for (int j = 0; j < n; j++) {
591 struct ovsdb_table_schema *table = nodes[j]->data;
ae671c5f 592
23c16b51
AZ
593 /* Monitor all tables not blacklisted. */
594 if (!blacklist_tables_find(db_name, table->name)) {
595 add_monitored_table(table, monitor_request);
ae671c5f
MC
596 }
597 }
23c16b51 598 free(nodes);
ae671c5f 599
23c16b51
AZ
600 /* Create a monitor request. */
601 monitor = json_array_create_3(
602 json_string_create(db_name),
603 json_string_create(db_name),
604 monitor_request);
605 request = jsonrpc_create_request("monitor", monitor, NULL);
0cedc9db 606
23c16b51 607 return request;
ae671c5f
MC
608}
609
610static void
611add_monitored_table(struct ovsdb_table_schema *table,
612 struct json *monitor_request)
613{
614 struct json *monitor_request_array;
615
ae671c5f
MC
616 monitor_request_array = json_array_create_empty();
617 json_array_add(monitor_request_array, json_object_create());
618
619 json_object_put(monitor_request, table->name, monitor_request_array);
620}
ae671c5f 621
23c16b51
AZ
622\f
623static struct ovsdb_error *
6ab3dd96 624process_notification(struct json *table_updates, struct ovsdb *db)
ae671c5f 625{
23c16b51 626 struct ovsdb_error *error = NULL;
ae671c5f
MC
627 struct ovsdb_txn *txn;
628
23c16b51
AZ
629 if (table_updates->type == JSON_OBJECT) {
630 txn = ovsdb_txn_create(db);
631
632 /* Process each table update. */
633 struct shash_node *node;
634 SHASH_FOR_EACH (node, json_object(table_updates)) {
635 struct json *table_update = node->data;
636 if (table_update) {
637 error = process_table_update(table_update, node->name, db, txn);
638 if (error) {
639 break;
640 }
ae671c5f
MC
641 }
642 }
ae671c5f 643
23c16b51
AZ
644 if (error) {
645 ovsdb_txn_abort(txn);
646 return error;
647 } else {
648 /* Commit transaction. */
1b1d2e6d 649 error = ovsdb_txn_propose_commit_block(txn, false);
23c16b51 650 }
ae671c5f
MC
651 }
652
23c16b51 653 return error;
ae671c5f
MC
654}
655
656static struct ovsdb_error *
657process_table_update(struct json *table_update, const char *table_name,
658 struct ovsdb *database, struct ovsdb_txn *txn)
659{
1cb0f53c
BP
660 struct ovsdb_table *table = ovsdb_get_table(database, table_name);
661 if (!table) {
662 return ovsdb_error("unknown table", "unknown table %s", table_name);
663 }
ae671c5f
MC
664
665 if (table_update->type != JSON_OBJECT) {
43898d4c
WT
666 return ovsdb_error("Not a JSON object",
667 "<table-update> for table is not object");
ae671c5f
MC
668 }
669
1cb0f53c 670 struct shash_node *node;
ae671c5f
MC
671 SHASH_FOR_EACH (node, json_object(table_update)) {
672 struct json *row_update = node->data;
673 struct json *old, *new;
674
675 if (row_update->type != JSON_OBJECT) {
1cb0f53c
BP
676 return ovsdb_error("Not a JSON object",
677 "<row-update> is not object");
678 }
679
680 struct uuid uuid;
681 if (!uuid_from_string(&uuid, node->name)) {
682 return ovsdb_syntax_error(table_update, "bad row UUID",
683 "<table-update> names must be UUIDs");
ae671c5f 684 }
1cb0f53c 685
ae671c5f
MC
686 old = shash_find_data(json_object(row_update), "old");
687 new = shash_find_data(json_object(row_update), "new");
688
1cb0f53c
BP
689 struct ovsdb_error *error;
690 error = (!new ? execute_delete(txn, &uuid, table)
691 : !old ? execute_insert(txn, &uuid, table, new)
692 : execute_update(txn, &uuid, table, new));
c9c5c9e2 693 if (error) {
1cb0f53c 694 return error;
c9c5c9e2 695 }
ae671c5f 696 }
1cb0f53c 697 return NULL;
ae671c5f
MC
698}
699\f
700static struct ovsdb_error *
1cb0f53c 701execute_insert(struct ovsdb_txn *txn, const struct uuid *row_uuid,
ae671c5f
MC
702 struct ovsdb_table *table, struct json *json_row)
703{
1cb0f53c
BP
704 struct ovsdb_row *row = ovsdb_row_create(table);
705 struct ovsdb_error *error = ovsdb_row_from_json(row, json_row, NULL, NULL);
ae671c5f 706 if (!error) {
1cb0f53c 707 *ovsdb_row_get_uuid_rw(row) = *row_uuid;
ae671c5f
MC
708 ovsdb_txn_row_insert(txn, row);
709 } else {
1cb0f53c
BP
710 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
711 VLOG_WARN_RL(&rl, "cannot add existing row "UUID_FMT" to table %s",
712 UUID_ARGS(row_uuid), table->schema->name);
ae671c5f
MC
713 ovsdb_row_destroy(row);
714 }
715
716 return error;
717}
718
ae671c5f 719static struct ovsdb_error *
1cb0f53c 720execute_delete(struct ovsdb_txn *txn, const struct uuid *row_uuid,
ae671c5f
MC
721 struct ovsdb_table *table)
722{
1cb0f53c
BP
723 const struct ovsdb_row *row = ovsdb_table_get_row(table, row_uuid);
724 if (row) {
725 ovsdb_txn_row_delete(txn, row);
726 } else {
727 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
728 VLOG_WARN_RL(&rl, "cannot delete missing row "UUID_FMT" from table %s",
729 UUID_ARGS(row_uuid), table->schema->name);
ae671c5f 730 }
1cb0f53c 731 return NULL;
ae671c5f
MC
732}
733
734static struct ovsdb_error *
1cb0f53c 735execute_update(struct ovsdb_txn *txn, const struct uuid *row_uuid,
ae671c5f
MC
736 struct ovsdb_table *table, struct json *json_row)
737{
1cb0f53c
BP
738 const struct ovsdb_row *row = ovsdb_table_get_row(table, row_uuid);
739 if (!row) {
740 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
741 VLOG_WARN_RL(&rl, "cannot modify missing row "UUID_FMT" in table %s",
742 UUID_ARGS(row_uuid), table->schema->name);
743 return NULL;
ae671c5f 744 }
1cb0f53c
BP
745
746 struct ovsdb_column_set columns = OVSDB_COLUMN_SET_INITIALIZER;
747 struct ovsdb_row *update = ovsdb_row_create(table);
748 struct ovsdb_error *error = ovsdb_row_from_json(update, json_row,
749 NULL, &columns);
750
751 if (!error && !ovsdb_row_equal_columns(row, update, &columns)) {
752 ovsdb_row_update_columns(ovsdb_txn_row_modify(txn, row),
753 update, &columns);
ae671c5f
MC
754 }
755
ae671c5f 756 ovsdb_column_set_destroy(&columns);
1cb0f53c 757 ovsdb_row_destroy(update);
ae671c5f
MC
758 return error;
759}
760
599c0f44
AZ
761void
762request_ids_add(const struct json *id, struct ovsdb *db)
763{
764 struct request_ids_hmap_node *node = xmalloc(sizeof *node);
765
766 node->request_id = json_clone(id);
767 node->db = db;
768 hmap_insert(&request_ids, &node->hmap, json_hash(id, 0));
769}
770
771/* Look up 'id' from 'request_ids', if found, remove the found id from
772 * 'request_ids' and free its memory. If not found, 'request_ids' does
773 * not change. Sets '*db' to the database for the request (NULL if not
774 * found).
775 *
776 * Return true if 'id' is found, false otherwise.
777 */
778bool
779request_ids_lookup_and_free(const struct json *id, struct ovsdb **db)
780{
781 struct request_ids_hmap_node *node;
782
783 HMAP_FOR_EACH_WITH_HASH (node, hmap, json_hash(id, 0), &request_ids) {
784 if (json_equal(id, node->request_id)) {
785 hmap_remove(&request_ids, &node->hmap);
786 *db = node->db;
787 json_destroy(node->request_id);
788 free(node);
789 return true;
790 }
791 }
792
793 *db = NULL;
794 return false;
795}
796
797static void
798request_ids_destroy(void)
799{
800 struct request_ids_hmap_node *node;
801
802 HMAP_FOR_EACH_POP (node, hmap, &request_ids) {
803 json_destroy(node->request_id);
804 free(node);
805 }
806 hmap_destroy(&request_ids);
807}
808
809void
810request_ids_clear(void)
811{
812 request_ids_destroy();
813 hmap_init(&request_ids);
814}
815
23c16b51 816static struct shash *
cec7005b 817replication_dbs_create(void)
23c16b51
AZ
818{
819 struct shash *new = xmalloc(sizeof *new);
820 shash_init(new);
821
822 struct shash_node *node;
cec7005b
NS
823 SHASH_FOR_EACH (node, &local_dbs) {
824 struct replication_db *repl_db = xmalloc(sizeof *repl_db);
825 repl_db->db = node->data;
826 repl_db->schema_version_higher = false;
827 repl_db->active_db_schema = NULL;
828 shash_add(new, node->name, repl_db);
23c16b51
AZ
829 }
830
831 return new;
832}
833
5e8bc3c5
AZ
834static void
835replication_dbs_destroy(void)
836{
cec7005b
NS
837 if (!replication_dbs) {
838 return;
839 }
840
841 struct shash_node *node, *next;
842
843 SHASH_FOR_EACH_SAFE (node, next, replication_dbs) {
844 hmap_remove(&replication_dbs->map, &node->node);
845 struct replication_db *rdb = node->data;
846 if (rdb->active_db_schema) {
847 ovsdb_schema_destroy(rdb->active_db_schema);
848 }
849 free(rdb);
850 free(node->name);
851 free(node);
852 }
853
854 hmap_destroy(&replication_dbs->map);
5e8bc3c5
AZ
855 free(replication_dbs);
856 replication_dbs = NULL;
857}
858
23c16b51
AZ
859/* Return true if replication just started or is ongoing.
860 * Return false if the connection failed, or the replication
861 * was not able to start. */
862bool
863replication_is_alive(void)
864{
865 if (session) {
866 return jsonrpc_session_is_alive(session) && state != RPL_S_ERR;
867 }
868 return false;
869}
870
871/* Return the last error reported on a connection by 'session'. The
872 * return value is 0 if replication is not currently running, or
873 * if replication session has not encountered any error.
874 *
875 * Return a negative value if replication session has error, or the
876 * replication was not able to start. */
877int
878replication_get_last_error(void)
879{
880 int err = 0;
881
882 if (session) {
883 err = jsonrpc_session_get_last_error(session);
884 if (!err) {
885 err = (state == RPL_S_ERR) ? ENOENT : 0;
886 }
887 }
888
889 return err;
890}
891
60e0cd04
AZ
892char *
893replication_status(void)
894{
895 bool alive = session && jsonrpc_session_is_alive(session);
896 struct ds ds = DS_EMPTY_INITIALIZER;
897
898 if (alive) {
899 switch(state) {
900 case RPL_S_INIT:
05ac209a 901 case RPL_S_SERVER_ID_REQUESTED:
60e0cd04
AZ
902 case RPL_S_DB_REQUESTED:
903 case RPL_S_SCHEMA_REQUESTED:
904 case RPL_S_MONITOR_REQUESTED:
905 ds_put_format(&ds, "connecting: %s", sync_from);
906 break;
907 case RPL_S_REPLICATING: {
908 struct shash_node *node;
909
910 ds_put_format(&ds, "replicating: %s\n", sync_from);
911 ds_put_cstr(&ds, "database:");
912 SHASH_FOR_EACH (node, replication_dbs) {
913 ds_put_format(&ds, " %s,", node->name);
914 }
915 ds_chomp(&ds, ',');
916
917 if (!shash_is_empty(&blacklist_tables)) {
918 ds_put_char(&ds, '\n');
919 ds_put_cstr(&ds, "exclude: ");
920 ds_put_and_free_cstr(&ds, get_blacklist_tables());
921 }
922 break;
923 }
924 case RPL_S_ERR:
925 ds_put_format(&ds, "Replication to (%s) failed\n", sync_from);
926 break;
927 default:
928 OVS_NOT_REACHED();
60e0cd04
AZ
929 }
930 } else {
931 ds_put_format(&ds, "not connected to %s", sync_from);
932 }
933 return ds_steal_cstr(&ds);
934}
935
cec7005b
NS
936/* Checks if it's possible to replicate to the local db from the active db
937 * schema. Returns true, if 'local_db_schema' has all the tables and columns
938 * of 'active_db_schema', false otherwise.
939 */
940static bool
941is_replication_possible(struct ovsdb_schema *local_db_schema,
942 struct ovsdb_schema *active_db_schema)
943{
944 struct shash_node *node;
945 SHASH_FOR_EACH (node, &active_db_schema->tables) {
946 struct ovsdb_table_schema *ldb_table_schema =
947 shash_find_data(&local_db_schema->tables, node->name);
948 if (!ldb_table_schema) {
949 VLOG_INFO("Table %s not present in the local db schema",
950 node->name);
951 return false;
952 }
953
954 /* Local schema table should have all the columns
955 * of active schema table. */
956 struct ovsdb_table_schema *adb_table_schema = node->data;
957 struct shash_node *n;
958 SHASH_FOR_EACH (n, &adb_table_schema->columns) {
959 struct ovsdb_column *ldb_col =
960 shash_find_data(&ldb_table_schema->columns, n->name);
961 if (!ldb_col) {
962 VLOG_INFO("Column %s not present in the local "
963 "db schema table %s.", n->name, node->name);
964 return false;
965 }
966
967 struct json *ldb_col_json = ovsdb_column_to_json(ldb_col);
968 struct json *adb_col_json = ovsdb_column_to_json(n->data);
969 bool cols_equal = json_equal(ldb_col_json, adb_col_json);
970 json_destroy(ldb_col_json);
971 json_destroy(adb_col_json);
972
973 if (!cols_equal) {
974 VLOG_INFO("Column %s mismatch in local "
975 "db schema table %s.", n->name, node->name);
976 return false;
977 }
978 }
979 }
980
981 return true;
982}
983
e988b8ab
NS
984void
985replication_set_probe_interval(int probe_interval)
986{
987 if (session) {
988 jsonrpc_session_set_probe_interval(session, probe_interval);
989 }
990}
991
ae671c5f
MC
992void
993replication_usage(void)
994{
995 printf("\n\
996Syncing options:\n\
60e0cd04
AZ
997 --sync-from=SERVER sync DATABASE from active SERVER and start in\n\
998 backup mode (except with --active)\n\
ae671c5f 999 --sync-exclude-tables=DB:TABLE,...\n\
60e0cd04
AZ
1000 exclude the TABLE in DB from syncing\n\
1001 --active with --sync-from, start in active mode\n");
ae671c5f 1002}