]> git.proxmox.com Git - mirror_ovs.git/blame - ovsdb/replication.c
stream: Allow timeout configuration for open_block.
[mirror_ovs.git] / ovsdb / replication.c
CommitLineData
ae671c5f 1/*
1b1d2e6d 2 * (c) Copyright 2016, 2017 Hewlett Packard Enterprise Development LP
05ac209a 3 * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016, 2017 Nicira, Inc.
ae671c5f
MC
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#include <config.h>
19
ae671c5f
MC
20
21#include "condition.h"
599c0f44 22#include "jsonrpc.h"
3109b4e1 23#include "openvswitch/dynamic-string.h"
599c0f44 24#include "openvswitch/hmap.h"
ee89ea7b 25#include "openvswitch/json.h"
8be42087 26#include "openvswitch/vlog.h"
ae671c5f 27#include "ovsdb-error.h"
599c0f44 28#include "ovsdb.h"
ae671c5f 29#include "query.h"
599c0f44 30#include "replication.h"
ae671c5f 31#include "row.h"
ae671c5f 32#include "sset.h"
599c0f44 33#include "stream.h"
ae671c5f
MC
34#include "svec.h"
35#include "table.h"
36#include "transaction.h"
05ac209a 37#include "uuid.h"
ae671c5f 38
8be42087
AZ
39VLOG_DEFINE_THIS_MODULE(replication);
40
60e0cd04 41static char *sync_from;
05ac209a 42static struct uuid server_uuid;
5e8bc3c5 43static struct jsonrpc_session *session;
23c16b51
AZ
44static unsigned int session_seqno = UINT_MAX;
45
46static struct jsonrpc_msg *create_monitor_request(struct ovsdb *db);
ae671c5f
MC
47static void add_monitored_table(struct ovsdb_table_schema *table,
48 struct json *monitor_requests);
49
23c16b51 50static struct ovsdb_error *reset_database(struct ovsdb *db);
ae671c5f 51
23c16b51 52static struct ovsdb_error *process_notification(struct json *, struct ovsdb *);
ae671c5f
MC
53static struct ovsdb_error *process_table_update(struct json *table_update,
54 const char *table_name,
55 struct ovsdb *database,
56 struct ovsdb_txn *txn);
57
58static struct ovsdb_error *execute_insert(struct ovsdb_txn *txn,
1cb0f53c 59 const struct uuid *row_uuid,
ae671c5f
MC
60 struct ovsdb_table *table,
61 struct json *new);
62static struct ovsdb_error *execute_delete(struct ovsdb_txn *txn,
1cb0f53c 63 const struct uuid *row_uuid,
ae671c5f
MC
64 struct ovsdb_table *table);
65static struct ovsdb_error *execute_update(struct ovsdb_txn *txn,
1cb0f53c 66 const struct uuid *row_uuid,
ae671c5f
MC
67 struct ovsdb_table *table,
68 struct json *new);
3109b4e1
AZ
69\f
70/* Maps from db name to sset of table names. */
71static struct shash blacklist_tables = SHASH_INITIALIZER(&blacklist_tables);
72
73static void blacklist_tables_clear(void);
74static void blacklist_tables_add(const char *database, const char *table);
75static bool blacklist_tables_find(const char *database, const char* table);
76
ae671c5f 77\f
599c0f44
AZ
78/* Keep track of request IDs of all outstanding OVSDB requests. */
79static struct hmap request_ids = HMAP_INITIALIZER(&request_ids);
80
81struct request_ids_hmap_node {
82 struct hmap_node hmap;
83 struct json *request_id;
84 struct ovsdb *db; /* associated database */
85};
86void request_ids_add(const struct json *id, struct ovsdb *db);
87bool request_ids_lookup_and_free(const struct json *id, struct ovsdb **db);
88static void request_ids_destroy(void);
89void request_ids_clear(void);
90\f
23c16b51 91enum ovsdb_replication_state {
60e0cd04 92 RPL_S_INIT,
05ac209a 93 RPL_S_SERVER_ID_REQUESTED,
23c16b51
AZ
94 RPL_S_DB_REQUESTED,
95 RPL_S_SCHEMA_REQUESTED,
96 RPL_S_MONITOR_REQUESTED,
97 RPL_S_REPLICATING,
98 RPL_S_ERR /* Error, no longer replicating. */
99};
100static enum ovsdb_replication_state state;
101
102\f
103/* All DBs known to ovsdb-server. The actual replication dbs are stored
104 * in 'replication dbs', which is a subset of all dbs and remote dbs whose
105 * schema matches. */
106static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs);
5e8bc3c5 107static struct shash *replication_dbs;
599c0f44 108
23c16b51 109static struct shash *replication_db_clone(struct shash *dbs);
5e8bc3c5 110static void replication_dbs_destroy(void);
6ab3dd96
AZ
111/* Find 'struct ovsdb' by name within 'replication_dbs' */
112static struct ovsdb* find_db(const char *db_name);
113\f
114
7a9d65d2 115void
05ac209a
AZ
116replication_init(const char *sync_from_, const char *exclude_tables,
117 const struct uuid *server)
7a9d65d2 118{
60e0cd04
AZ
119 free(sync_from);
120 sync_from = xstrdup(sync_from_);
60e0cd04
AZ
121 /* Caller should have verified that the 'exclude_tables' is
122 * parseable. An error here is unexpected. */
500db308 123 ovs_assert(!set_blacklist_tables(exclude_tables, false));
60e0cd04 124
5e8bc3c5 125 replication_dbs_destroy();
23c16b51
AZ
126
127 shash_clear(&local_dbs);
128 if (session) {
129 jsonrpc_session_close(session);
3109b4e1 130 }
23c16b51 131
60e0cd04 132 session = jsonrpc_session_open(sync_from, true);
23c16b51 133 session_seqno = UINT_MAX;
05ac209a
AZ
134
135 /* Keep a copy of local server uuid. */
136 server_uuid = *server;
137
60e0cd04 138 state = RPL_S_INIT;
7a9d65d2
MC
139}
140
ae671c5f 141void
23c16b51 142replication_add_local_db(const char *database, struct ovsdb *db)
6ab3dd96 143{
23c16b51 144 shash_add_assert(&local_dbs, database, db);
6ab3dd96
AZ
145}
146
71f21279
BP
147static void
148send_schema_requests(const struct json *result)
149{
fa37affa
BP
150 for (size_t i = 0; i < result->array.n; i++) {
151 const struct json *name = result->array.elems[i];
71f21279
BP
152 if (name->type == JSON_STRING) {
153 /* Send one schema request for each remote DB. */
154 const char *db_name = json_string(name);
155 struct ovsdb *db = find_db(db_name);
156 if (db) {
157 struct jsonrpc_msg *request =
158 jsonrpc_create_request(
159 "get_schema",
160 json_array_create_1(
161 json_string_create(db_name)),
162 NULL);
163
164 request_ids_add(request->id, db);
165 jsonrpc_session_send(session, request);
166 }
167 }
168 }
169}
170
6ab3dd96
AZ
171void
172replication_run(void)
ae671c5f 173{
23c16b51
AZ
174 if (!session) {
175 return;
176 }
177
178 jsonrpc_session_run(session);
179
180 for (int i = 0; jsonrpc_session_is_connected(session) && i < 50; i++) {
181 struct jsonrpc_msg *msg;
182 unsigned int seqno;
183
184 seqno = jsonrpc_session_get_seqno(session);
60e0cd04 185 if (seqno != session_seqno || state == RPL_S_INIT) {
23c16b51
AZ
186 session_seqno = seqno;
187 request_ids_clear();
188 struct jsonrpc_msg *request;
05ac209a 189 request = jsonrpc_create_request("get_server_id",
23c16b51
AZ
190 json_array_create_empty(), NULL);
191 request_ids_add(request->id, NULL);
192 jsonrpc_session_send(session, request);
193
05ac209a
AZ
194 state = RPL_S_SERVER_ID_REQUESTED;
195 VLOG_DBG("send server ID request.");
ae671c5f
MC
196 }
197
23c16b51
AZ
198 msg = jsonrpc_session_recv(session);
199 if (!msg) {
200 continue;
ae671c5f
MC
201 }
202
23c16b51
AZ
203 if (msg->type == JSONRPC_NOTIFY && state != RPL_S_ERR
204 && !strcmp(msg->method, "update")) {
205 if (msg->params->type == JSON_ARRAY
fa37affa
BP
206 && msg->params->array.n == 2
207 && msg->params->array.elems[0]->type == JSON_STRING) {
208 char *db_name = msg->params->array.elems[0]->string;
23c16b51
AZ
209 struct ovsdb *db = find_db(db_name);
210 if (db) {
211 struct ovsdb_error *error;
fa37affa 212 error = process_notification(msg->params->array.elems[1],
23c16b51
AZ
213 db);
214 if (error) {
215 ovsdb_error_assert(error);
216 state = RPL_S_ERR;
217 }
218 }
219 }
220 } else if (msg->type == JSONRPC_REPLY) {
221 struct ovsdb *db;
222 if (!request_ids_lookup_and_free(msg->id, &db)) {
223 VLOG_WARN("received unexpected reply");
224 goto next;
225 }
226
227 switch (state) {
05ac209a
AZ
228 case RPL_S_SERVER_ID_REQUESTED: {
229 struct uuid uuid;
230 if (msg->result->type != JSON_STRING ||
231 !uuid_from_string(&uuid, json_string(msg->result))) {
232 struct ovsdb_error *error;
233 error = ovsdb_error("get_server_id failed",
234 "Server ID is not valid UUID");
235
236 ovsdb_error_assert(error);
237 state = RPL_S_ERR;
238 break;
239 }
240
241 if (uuid_equals(&uuid, &server_uuid)) {
242 struct ovsdb_error *error;
243 error = ovsdb_error("Server ID check failed",
244 "Self replicating is not allowed");
245
246 ovsdb_error_assert(error);
247 state = RPL_S_ERR;
248 break;
249 }
250
251 struct jsonrpc_msg *request;
252 request = jsonrpc_create_request("list_dbs",
253 json_array_create_empty(),
254 NULL);
255 request_ids_add(request->id, NULL);
256 jsonrpc_session_send(session, request);
257
258 replication_dbs_destroy();
259 replication_dbs = replication_db_clone(&local_dbs);
260 state = RPL_S_DB_REQUESTED;
261 break;
262 }
23c16b51
AZ
263 case RPL_S_DB_REQUESTED:
264 if (msg->result->type != JSON_ARRAY) {
265 struct ovsdb_error *error;
05ac209a 266 error = ovsdb_error("list_dbs failed",
23c16b51
AZ
267 "list_dbs response is not array");
268 ovsdb_error_assert(error);
269 state = RPL_S_ERR;
270 } else {
71f21279 271 send_schema_requests(msg->result);
60e0cd04 272 VLOG_DBG("Send schema requests");
23c16b51
AZ
273 state = RPL_S_SCHEMA_REQUESTED;
274 }
275 break;
276
277 case RPL_S_SCHEMA_REQUESTED: {
278 struct ovsdb_schema *schema;
279 struct ovsdb_error *error;
280
281 error = ovsdb_schema_from_json(msg->result, &schema);
282 if (error) {
283 ovsdb_error_assert(error);
284 state = RPL_S_ERR;
285 }
286
287 if (db != find_db(schema->name)) {
288 /* Unexpected schema. */
289 VLOG_WARN("unexpected schema %s", schema->name);
290 state = RPL_S_ERR;
291 } else if (!ovsdb_schema_equal(schema, db->schema)) {
292 /* Schmea version mismatch. */
293 VLOG_INFO("Schema version mismatch, %s not replicated",
294 schema->name);
295 shash_find_and_delete(replication_dbs, schema->name);
296 }
297 ovsdb_schema_destroy(schema);
298
299 /* After receiving schemas, reset the local databases that
300 * will be monitored and send out monitor requests for them. */
301 if (hmap_is_empty(&request_ids)) {
ecf44dd3 302 struct shash_node *node;
23c16b51
AZ
303
304 if (shash_is_empty(replication_dbs)) {
305 VLOG_WARN("Nothing to replicate.");
306 state = RPL_S_ERR;
307 } else {
308 SHASH_FOR_EACH (node, replication_dbs) {
309 db = node->data;
23c16b51
AZ
310 struct jsonrpc_msg *request =
311 create_monitor_request(db);
312
313 request_ids_add(request->id, db);
314 jsonrpc_session_send(session, request);
60e0cd04 315 VLOG_DBG("Send monitor requests");
23c16b51
AZ
316 state = RPL_S_MONITOR_REQUESTED;
317 }
318 }
319 }
320 break;
321 }
322
323 case RPL_S_MONITOR_REQUESTED: {
324 /* Reply to monitor requests. */
325 struct ovsdb_error *error;
ecf44dd3
NS
326 VLOG_INFO("Monitor request received. Resetting the database");
327 /* Resetting the database here has few risks. If the
328 * process_notification() fails, the database is completely
329 * lost locally. In case that node becomes active, then
330 * there is a chance of complete data loss in the active/standy
331 * cluster. */
332 error = reset_database(db);
333 if (!error) {
334 error = process_notification(msg->result, db);
335 }
23c16b51
AZ
336 if (error) {
337 ovsdb_error_assert(error);
338 state = RPL_S_ERR;
339 } else {
340 /* Transition to replicating state after receiving
341 * all replies of "monitor" requests. */
342 if (hmap_is_empty(&request_ids)) {
60e0cd04 343 VLOG_DBG("Listening to monitor updates");
23c16b51
AZ
344 state = RPL_S_REPLICATING;
345 }
346 }
347 break;
348 }
349
350 case RPL_S_ERR:
351 /* Ignore all messages */
352 break;
353
60e0cd04 354 case RPL_S_INIT:
23c16b51
AZ
355 case RPL_S_REPLICATING:
356 default:
357 OVS_NOT_REACHED();
358 }
359 }
360 next:
361 jsonrpc_msg_destroy(msg);
ae671c5f
MC
362 }
363}
364
8c945cec
AZ
365void
366replication_wait(void)
367{
23c16b51
AZ
368 if (session) {
369 jsonrpc_session_wait(session);
370 jsonrpc_session_recv_wait(session);
8c945cec
AZ
371 }
372}
373
3109b4e1
AZ
374/* Parse 'blacklist' to rebuild 'blacklist_tables'. If 'dryrun' is false, the
375 * current black list tables will be wiped out, regardless of whether
376 * 'blacklist' can be parsed. If 'dryrun' is true, only parses 'blacklist' and
377 * reports any errors, without modifying the blacklist.
378 *
379 * On error, returns the error string, which the caller is
380 * responsible for freeing. Returns NULL otherwise. */
381char * OVS_WARN_UNUSED_RESULT
382set_blacklist_tables(const char *blacklist, bool dryrun)
7a9d65d2 383{
3109b4e1
AZ
384 struct sset set = SSET_INITIALIZER(&set);
385 char *err = NULL;
386
7a9d65d2 387 if (blacklist) {
3109b4e1
AZ
388 const char *longname;
389
390 if (!dryrun) {
6ab3dd96 391 /* Can only add to an empty shash. */
3109b4e1
AZ
392 blacklist_tables_clear();
393 }
394
395 sset_from_delimited_string(&set, blacklist, " ,");
396 SSET_FOR_EACH (longname, &set) {
397 char *database = xstrdup(longname), *table = NULL;
398 strtok_r(database, ":", &table);
399 if (table && !dryrun) {
400 blacklist_tables_add(database, table);
401 }
402
403 free(database);
404 if (!table) {
405 err = xasprintf("Can't parse black list table: %s", longname);
406 goto done;
407 }
408 }
409 }
410
411done:
412 sset_destroy(&set);
413 if (err && !dryrun) {
414 /* On error, destroy the partially built 'blacklist_tables'. */
415 blacklist_tables_clear();
416 }
417 return err;
418}
419
420char * OVS_WARN_UNUSED_RESULT
421get_blacklist_tables(void)
422{
423 struct shash_node *node;
424 struct sset set = SSET_INITIALIZER(&set);
425
426 SHASH_FOR_EACH (node, &blacklist_tables) {
427 const char *database = node->name;
428 const char *table;
429 struct sset *tables = node->data;
430
431 SSET_FOR_EACH (table, tables) {
432 sset_add_and_free(&set, xasprintf("%s:%s", database, table));
433 }
434 }
435
436 /* Output the table list in an sorted order, so that
437 * the output string will not depend on the hash function
438 * that used to implement the hmap data structure. This is
439 * only useful for writting unit tests. */
440 const char **sorted = sset_sort(&set);
441 struct ds ds = DS_EMPTY_INITIALIZER;
442 size_t i;
443 for (i = 0; i < sset_count(&set); i++) {
444 ds_put_format(&ds, "%s,", sorted[i]);
7a9d65d2 445 }
3109b4e1
AZ
446
447 ds_chomp(&ds, ',');
448
449 free(sorted);
450 sset_destroy(&set);
451
452 return ds_steal_cstr(&ds);
7a9d65d2
MC
453}
454
3109b4e1
AZ
455static void
456blacklist_tables_clear(void)
9dc05cdc 457{
3109b4e1
AZ
458 struct shash_node *node;
459 SHASH_FOR_EACH (node, &blacklist_tables) {
460 struct sset *tables = node->data;
461 sset_destroy(tables);
462 }
463
464 shash_clear_free_data(&blacklist_tables);
465}
466
467static void
468blacklist_tables_add(const char *database, const char *table)
469{
470 struct sset *tables = shash_find_data(&blacklist_tables, database);
471
472 if (!tables) {
473 tables = xmalloc(sizeof *tables);
474 sset_init(tables);
475 shash_add(&blacklist_tables, database, tables);
476 }
477
478 sset_add(tables, table);
479}
480
481static bool
482blacklist_tables_find(const char *database, const char *table)
483{
484 struct sset *tables = shash_find_data(&blacklist_tables, database);
485 return tables && sset_contains(tables, table);
9dc05cdc
MC
486}
487
ae671c5f 488void
f53d7518 489disconnect_active_server(void)
c9c5c9e2 490{
23c16b51
AZ
491 jsonrpc_session_close(session);
492 session = NULL;
c9c5c9e2
MC
493}
494
495void
3109b4e1 496replication_destroy(void)
ae671c5f 497{
3109b4e1
AZ
498 blacklist_tables_clear();
499 shash_destroy(&blacklist_tables);
ae671c5f 500
60e0cd04
AZ
501 if (sync_from) {
502 free(sync_from);
503 sync_from = NULL;
ae671c5f 504 }
599c0f44
AZ
505
506 request_ids_destroy();
5e8bc3c5 507 replication_dbs_destroy();
23c16b51
AZ
508
509 shash_destroy(&local_dbs);
ae671c5f
MC
510}
511
6ab3dd96
AZ
512static struct ovsdb *
513find_db(const char *db_name)
ae671c5f 514{
23c16b51 515 return shash_find_data(replication_dbs, db_name);
ae671c5f
MC
516}
517\f
518static struct ovsdb_error *
23c16b51 519reset_database(struct ovsdb *db)
ae671c5f 520{
23c16b51 521 struct ovsdb_txn *txn = ovsdb_txn_create(db);
ae671c5f
MC
522 struct shash_node *table_node;
523
524 SHASH_FOR_EACH (table_node, &db->tables) {
3109b4e1
AZ
525 /* Delete all rows if the table is not blacklisted. */
526 if (!blacklist_tables_find(db->schema->name, table_node->name)) {
527 struct ovsdb_table *table = table_node->data;
45b4b20a
BP
528 struct ovsdb_row *row, *next;
529 HMAP_FOR_EACH_SAFE (row, next, hmap_node, &table->rows) {
7a9d65d2
MC
530 ovsdb_txn_row_delete(txn, row);
531 }
ae671c5f
MC
532 }
533 }
ae671c5f 534
1b1d2e6d 535 return ovsdb_txn_propose_commit_block(txn, false);
ae671c5f
MC
536}
537
23c16b51
AZ
538/* Create a monitor request for 'db'. The monitor request will include
539 * any tables from 'blacklisted_tables'
540 *
541 * Caller is responsible for disposing 'request'.
542 */
543static struct jsonrpc_msg *
544create_monitor_request(struct ovsdb *db)
ae671c5f 545{
23c16b51
AZ
546 struct jsonrpc_msg *request;
547 struct json *monitor;
548 struct ovsdb_schema *schema = db->schema;
549 const char *db_name = schema->name;
ae671c5f 550
23c16b51
AZ
551 struct json *monitor_request = json_object_create();
552 size_t n = shash_count(&schema->tables);
553 const struct shash_node **nodes = shash_sort(&schema->tables);
ae671c5f 554
23c16b51
AZ
555 for (int j = 0; j < n; j++) {
556 struct ovsdb_table_schema *table = nodes[j]->data;
ae671c5f 557
23c16b51
AZ
558 /* Monitor all tables not blacklisted. */
559 if (!blacklist_tables_find(db_name, table->name)) {
560 add_monitored_table(table, monitor_request);
ae671c5f
MC
561 }
562 }
23c16b51 563 free(nodes);
ae671c5f 564
23c16b51
AZ
565 /* Create a monitor request. */
566 monitor = json_array_create_3(
567 json_string_create(db_name),
568 json_string_create(db_name),
569 monitor_request);
570 request = jsonrpc_create_request("monitor", monitor, NULL);
0cedc9db 571
23c16b51 572 return request;
ae671c5f
MC
573}
574
575static void
576add_monitored_table(struct ovsdb_table_schema *table,
577 struct json *monitor_request)
578{
579 struct json *monitor_request_array;
580
ae671c5f
MC
581 monitor_request_array = json_array_create_empty();
582 json_array_add(monitor_request_array, json_object_create());
583
584 json_object_put(monitor_request, table->name, monitor_request_array);
585}
ae671c5f 586
23c16b51
AZ
587\f
588static struct ovsdb_error *
6ab3dd96 589process_notification(struct json *table_updates, struct ovsdb *db)
ae671c5f 590{
23c16b51 591 struct ovsdb_error *error = NULL;
ae671c5f
MC
592 struct ovsdb_txn *txn;
593
23c16b51
AZ
594 if (table_updates->type == JSON_OBJECT) {
595 txn = ovsdb_txn_create(db);
596
597 /* Process each table update. */
598 struct shash_node *node;
599 SHASH_FOR_EACH (node, json_object(table_updates)) {
600 struct json *table_update = node->data;
601 if (table_update) {
602 error = process_table_update(table_update, node->name, db, txn);
603 if (error) {
604 break;
605 }
ae671c5f
MC
606 }
607 }
ae671c5f 608
23c16b51
AZ
609 if (error) {
610 ovsdb_txn_abort(txn);
611 return error;
612 } else {
613 /* Commit transaction. */
1b1d2e6d 614 error = ovsdb_txn_propose_commit_block(txn, false);
23c16b51 615 }
ae671c5f
MC
616 }
617
23c16b51 618 return error;
ae671c5f
MC
619}
620
621static struct ovsdb_error *
622process_table_update(struct json *table_update, const char *table_name,
623 struct ovsdb *database, struct ovsdb_txn *txn)
624{
1cb0f53c
BP
625 struct ovsdb_table *table = ovsdb_get_table(database, table_name);
626 if (!table) {
627 return ovsdb_error("unknown table", "unknown table %s", table_name);
628 }
ae671c5f
MC
629
630 if (table_update->type != JSON_OBJECT) {
43898d4c
WT
631 return ovsdb_error("Not a JSON object",
632 "<table-update> for table is not object");
ae671c5f
MC
633 }
634
1cb0f53c 635 struct shash_node *node;
ae671c5f
MC
636 SHASH_FOR_EACH (node, json_object(table_update)) {
637 struct json *row_update = node->data;
638 struct json *old, *new;
639
640 if (row_update->type != JSON_OBJECT) {
1cb0f53c
BP
641 return ovsdb_error("Not a JSON object",
642 "<row-update> is not object");
643 }
644
645 struct uuid uuid;
646 if (!uuid_from_string(&uuid, node->name)) {
647 return ovsdb_syntax_error(table_update, "bad row UUID",
648 "<table-update> names must be UUIDs");
ae671c5f 649 }
1cb0f53c 650
ae671c5f
MC
651 old = shash_find_data(json_object(row_update), "old");
652 new = shash_find_data(json_object(row_update), "new");
653
1cb0f53c
BP
654 struct ovsdb_error *error;
655 error = (!new ? execute_delete(txn, &uuid, table)
656 : !old ? execute_insert(txn, &uuid, table, new)
657 : execute_update(txn, &uuid, table, new));
c9c5c9e2 658 if (error) {
1cb0f53c 659 return error;
c9c5c9e2 660 }
ae671c5f 661 }
1cb0f53c 662 return NULL;
ae671c5f
MC
663}
664\f
665static struct ovsdb_error *
1cb0f53c 666execute_insert(struct ovsdb_txn *txn, const struct uuid *row_uuid,
ae671c5f
MC
667 struct ovsdb_table *table, struct json *json_row)
668{
1cb0f53c
BP
669 struct ovsdb_row *row = ovsdb_row_create(table);
670 struct ovsdb_error *error = ovsdb_row_from_json(row, json_row, NULL, NULL);
ae671c5f 671 if (!error) {
1cb0f53c 672 *ovsdb_row_get_uuid_rw(row) = *row_uuid;
ae671c5f
MC
673 ovsdb_txn_row_insert(txn, row);
674 } else {
1cb0f53c
BP
675 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
676 VLOG_WARN_RL(&rl, "cannot add existing row "UUID_FMT" to table %s",
677 UUID_ARGS(row_uuid), table->schema->name);
ae671c5f
MC
678 ovsdb_row_destroy(row);
679 }
680
681 return error;
682}
683
ae671c5f 684static struct ovsdb_error *
1cb0f53c 685execute_delete(struct ovsdb_txn *txn, const struct uuid *row_uuid,
ae671c5f
MC
686 struct ovsdb_table *table)
687{
1cb0f53c
BP
688 const struct ovsdb_row *row = ovsdb_table_get_row(table, row_uuid);
689 if (row) {
690 ovsdb_txn_row_delete(txn, row);
691 } else {
692 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
693 VLOG_WARN_RL(&rl, "cannot delete missing row "UUID_FMT" from table %s",
694 UUID_ARGS(row_uuid), table->schema->name);
ae671c5f 695 }
1cb0f53c 696 return NULL;
ae671c5f
MC
697}
698
699static struct ovsdb_error *
1cb0f53c 700execute_update(struct ovsdb_txn *txn, const struct uuid *row_uuid,
ae671c5f
MC
701 struct ovsdb_table *table, struct json *json_row)
702{
1cb0f53c
BP
703 const struct ovsdb_row *row = ovsdb_table_get_row(table, row_uuid);
704 if (!row) {
705 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
706 VLOG_WARN_RL(&rl, "cannot modify missing row "UUID_FMT" in table %s",
707 UUID_ARGS(row_uuid), table->schema->name);
708 return NULL;
ae671c5f 709 }
1cb0f53c
BP
710
711 struct ovsdb_column_set columns = OVSDB_COLUMN_SET_INITIALIZER;
712 struct ovsdb_row *update = ovsdb_row_create(table);
713 struct ovsdb_error *error = ovsdb_row_from_json(update, json_row,
714 NULL, &columns);
715
716 if (!error && !ovsdb_row_equal_columns(row, update, &columns)) {
717 ovsdb_row_update_columns(ovsdb_txn_row_modify(txn, row),
718 update, &columns);
ae671c5f
MC
719 }
720
ae671c5f 721 ovsdb_column_set_destroy(&columns);
1cb0f53c 722 ovsdb_row_destroy(update);
ae671c5f
MC
723 return error;
724}
725
599c0f44
AZ
726void
727request_ids_add(const struct json *id, struct ovsdb *db)
728{
729 struct request_ids_hmap_node *node = xmalloc(sizeof *node);
730
731 node->request_id = json_clone(id);
732 node->db = db;
733 hmap_insert(&request_ids, &node->hmap, json_hash(id, 0));
734}
735
736/* Look up 'id' from 'request_ids', if found, remove the found id from
737 * 'request_ids' and free its memory. If not found, 'request_ids' does
738 * not change. Sets '*db' to the database for the request (NULL if not
739 * found).
740 *
741 * Return true if 'id' is found, false otherwise.
742 */
743bool
744request_ids_lookup_and_free(const struct json *id, struct ovsdb **db)
745{
746 struct request_ids_hmap_node *node;
747
748 HMAP_FOR_EACH_WITH_HASH (node, hmap, json_hash(id, 0), &request_ids) {
749 if (json_equal(id, node->request_id)) {
750 hmap_remove(&request_ids, &node->hmap);
751 *db = node->db;
752 json_destroy(node->request_id);
753 free(node);
754 return true;
755 }
756 }
757
758 *db = NULL;
759 return false;
760}
761
762static void
763request_ids_destroy(void)
764{
765 struct request_ids_hmap_node *node;
766
767 HMAP_FOR_EACH_POP (node, hmap, &request_ids) {
768 json_destroy(node->request_id);
769 free(node);
770 }
771 hmap_destroy(&request_ids);
772}
773
774void
775request_ids_clear(void)
776{
777 request_ids_destroy();
778 hmap_init(&request_ids);
779}
780
23c16b51
AZ
781static struct shash *
782replication_db_clone(struct shash *dbs)
783{
784 struct shash *new = xmalloc(sizeof *new);
785 shash_init(new);
786
787 struct shash_node *node;
788 SHASH_FOR_EACH (node, dbs) {
789 shash_add(new, node->name, node->data);
790 }
791
792 return new;
793}
794
5e8bc3c5
AZ
795static void
796replication_dbs_destroy(void)
797{
798 shash_destroy(replication_dbs);
799 free(replication_dbs);
800 replication_dbs = NULL;
801}
802
23c16b51
AZ
803/* Return true if replication just started or is ongoing.
804 * Return false if the connection failed, or the replication
805 * was not able to start. */
806bool
807replication_is_alive(void)
808{
809 if (session) {
810 return jsonrpc_session_is_alive(session) && state != RPL_S_ERR;
811 }
812 return false;
813}
814
815/* Return the last error reported on a connection by 'session'. The
816 * return value is 0 if replication is not currently running, or
817 * if replication session has not encountered any error.
818 *
819 * Return a negative value if replication session has error, or the
820 * replication was not able to start. */
821int
822replication_get_last_error(void)
823{
824 int err = 0;
825
826 if (session) {
827 err = jsonrpc_session_get_last_error(session);
828 if (!err) {
829 err = (state == RPL_S_ERR) ? ENOENT : 0;
830 }
831 }
832
833 return err;
834}
835
60e0cd04
AZ
836char *
837replication_status(void)
838{
839 bool alive = session && jsonrpc_session_is_alive(session);
840 struct ds ds = DS_EMPTY_INITIALIZER;
841
842 if (alive) {
843 switch(state) {
844 case RPL_S_INIT:
05ac209a 845 case RPL_S_SERVER_ID_REQUESTED:
60e0cd04
AZ
846 case RPL_S_DB_REQUESTED:
847 case RPL_S_SCHEMA_REQUESTED:
848 case RPL_S_MONITOR_REQUESTED:
849 ds_put_format(&ds, "connecting: %s", sync_from);
850 break;
851 case RPL_S_REPLICATING: {
852 struct shash_node *node;
853
854 ds_put_format(&ds, "replicating: %s\n", sync_from);
855 ds_put_cstr(&ds, "database:");
856 SHASH_FOR_EACH (node, replication_dbs) {
857 ds_put_format(&ds, " %s,", node->name);
858 }
859 ds_chomp(&ds, ',');
860
861 if (!shash_is_empty(&blacklist_tables)) {
862 ds_put_char(&ds, '\n');
863 ds_put_cstr(&ds, "exclude: ");
864 ds_put_and_free_cstr(&ds, get_blacklist_tables());
865 }
866 break;
867 }
868 case RPL_S_ERR:
869 ds_put_format(&ds, "Replication to (%s) failed\n", sync_from);
870 break;
871 default:
872 OVS_NOT_REACHED();
60e0cd04
AZ
873 }
874 } else {
875 ds_put_format(&ds, "not connected to %s", sync_from);
876 }
877 return ds_steal_cstr(&ds);
878}
879
ae671c5f
MC
880void
881replication_usage(void)
882{
883 printf("\n\
884Syncing options:\n\
60e0cd04
AZ
885 --sync-from=SERVER sync DATABASE from active SERVER and start in\n\
886 backup mode (except with --active)\n\
ae671c5f 887 --sync-exclude-tables=DB:TABLE,...\n\
60e0cd04
AZ
888 exclude the TABLE in DB from syncing\n\
889 --active with --sync-from, start in active mode\n");
ae671c5f 890}