]>
Commit | Line | Data |
---|---|---|
ae671c5f | 1 | /* |
1b1d2e6d | 2 | * (c) Copyright 2016, 2017 Hewlett Packard Enterprise Development LP |
05ac209a | 3 | * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016, 2017 Nicira, Inc. |
ae671c5f MC |
4 | * |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
6 | * you may not use this file except in compliance with the License. | |
7 | * You may obtain a copy of the License at: | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | */ | |
17 | ||
18 | #include <config.h> | |
19 | ||
ae671c5f MC |
20 | |
21 | #include "condition.h" | |
599c0f44 | 22 | #include "jsonrpc.h" |
3109b4e1 | 23 | #include "openvswitch/dynamic-string.h" |
599c0f44 | 24 | #include "openvswitch/hmap.h" |
ee89ea7b | 25 | #include "openvswitch/json.h" |
8be42087 | 26 | #include "openvswitch/vlog.h" |
ae671c5f | 27 | #include "ovsdb-error.h" |
599c0f44 | 28 | #include "ovsdb.h" |
ae671c5f | 29 | #include "query.h" |
599c0f44 | 30 | #include "replication.h" |
ae671c5f | 31 | #include "row.h" |
ae671c5f | 32 | #include "sset.h" |
599c0f44 | 33 | #include "stream.h" |
ae671c5f MC |
34 | #include "svec.h" |
35 | #include "table.h" | |
36 | #include "transaction.h" | |
05ac209a | 37 | #include "uuid.h" |
ae671c5f | 38 | |
8be42087 AZ |
39 | VLOG_DEFINE_THIS_MODULE(replication); |
40 | ||
60e0cd04 | 41 | static char *sync_from; |
05ac209a | 42 | static struct uuid server_uuid; |
5e8bc3c5 | 43 | static struct jsonrpc_session *session; |
23c16b51 AZ |
44 | static unsigned int session_seqno = UINT_MAX; |
45 | ||
cec7005b | 46 | static struct jsonrpc_msg *create_monitor_request(struct ovsdb_schema *); |
ae671c5f MC |
47 | static void add_monitored_table(struct ovsdb_table_schema *table, |
48 | struct json *monitor_requests); | |
49 | ||
23c16b51 | 50 | static struct ovsdb_error *reset_database(struct ovsdb *db); |
ae671c5f | 51 | |
23c16b51 | 52 | static struct ovsdb_error *process_notification(struct json *, struct ovsdb *); |
ae671c5f MC |
53 | static struct ovsdb_error *process_table_update(struct json *table_update, |
54 | const char *table_name, | |
55 | struct ovsdb *database, | |
56 | struct ovsdb_txn *txn); | |
57 | ||
58 | static struct ovsdb_error *execute_insert(struct ovsdb_txn *txn, | |
1cb0f53c | 59 | const struct uuid *row_uuid, |
ae671c5f MC |
60 | struct ovsdb_table *table, |
61 | struct json *new); | |
62 | static struct ovsdb_error *execute_delete(struct ovsdb_txn *txn, | |
1cb0f53c | 63 | const struct uuid *row_uuid, |
ae671c5f MC |
64 | struct ovsdb_table *table); |
65 | static struct ovsdb_error *execute_update(struct ovsdb_txn *txn, | |
1cb0f53c | 66 | const struct uuid *row_uuid, |
ae671c5f MC |
67 | struct ovsdb_table *table, |
68 | struct json *new); | |
3109b4e1 AZ |
69 | \f |
70 | /* Maps from db name to sset of table names. */ | |
71 | static struct shash blacklist_tables = SHASH_INITIALIZER(&blacklist_tables); | |
72 | ||
73 | static void blacklist_tables_clear(void); | |
74 | static void blacklist_tables_add(const char *database, const char *table); | |
75 | static bool blacklist_tables_find(const char *database, const char* table); | |
76 | ||
ae671c5f | 77 | \f |
599c0f44 AZ |
78 | /* Keep track of request IDs of all outstanding OVSDB requests. */ |
79 | static struct hmap request_ids = HMAP_INITIALIZER(&request_ids); | |
80 | ||
81 | struct request_ids_hmap_node { | |
82 | struct hmap_node hmap; | |
83 | struct json *request_id; | |
84 | struct ovsdb *db; /* associated database */ | |
85 | }; | |
86 | void request_ids_add(const struct json *id, struct ovsdb *db); | |
87 | bool request_ids_lookup_and_free(const struct json *id, struct ovsdb **db); | |
88 | static void request_ids_destroy(void); | |
89 | void request_ids_clear(void); | |
90 | \f | |
/* Steps of the replication handshake, in the order replication_run() walks
 * through them. */
enum ovsdb_replication_state {
    RPL_S_INIT,                 /* Set by replication_init(); nothing sent. */
    RPL_S_SERVER_ID_REQUESTED,  /* "get_server_id" request sent. */
    RPL_S_DB_REQUESTED,         /* "list_dbs" request sent. */
    RPL_S_SCHEMA_REQUESTED,     /* "get_schema" requests sent. */
    RPL_S_MONITOR_REQUESTED,    /* "monitor" requests sent. */
    RPL_S_REPLICATING,          /* Every "monitor" reply has been applied. */
    RPL_S_ERR                   /* Error, no longer replicating. */
};
static enum ovsdb_replication_state state;
101 | ||
102 | \f | |
/* A local database selected for replication. */
struct replication_db {
    struct ovsdb *db;               /* The local database. */

    /* Set to true when the remote schema differs from the local one but
     * is_replication_possible() still accepts it. */
    bool schema_version_higher;

    /* Schema received from the active server, kept only in the case described
     * for 'schema_version_higher'.  NULL otherwise. */
    struct ovsdb_schema *active_db_schema;
};

static bool is_replication_possible(struct ovsdb_schema *local_db_schema,
                                    struct ovsdb_schema *active_db_schema);
113 | ||
23c16b51 AZ |
114 | /* All DBs known to ovsdb-server. The actual replication dbs are stored |
115 | * in 'replication dbs', which is a subset of all dbs and remote dbs whose | |
116 | * schema matches. */ | |
117 | static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs); | |
5e8bc3c5 | 118 | static struct shash *replication_dbs; |
599c0f44 | 119 | |
cec7005b | 120 | static struct shash *replication_dbs_create(void); |
5e8bc3c5 | 121 | static void replication_dbs_destroy(void); |
6ab3dd96 | 122 | /* Find 'struct ovsdb' by name within 'replication_dbs' */ |
cec7005b | 123 | static struct replication_db *find_db(const char *db_name); |
6ab3dd96 AZ |
124 | \f |
125 | ||
7a9d65d2 | 126 | void |
05ac209a | 127 | replication_init(const char *sync_from_, const char *exclude_tables, |
e988b8ab | 128 | const struct uuid *server, int probe_interval) |
7a9d65d2 | 129 | { |
60e0cd04 AZ |
130 | free(sync_from); |
131 | sync_from = xstrdup(sync_from_); | |
60e0cd04 AZ |
132 | /* Caller should have verified that the 'exclude_tables' is |
133 | * parseable. An error here is unexpected. */ | |
500db308 | 134 | ovs_assert(!set_blacklist_tables(exclude_tables, false)); |
60e0cd04 | 135 | |
5e8bc3c5 | 136 | replication_dbs_destroy(); |
23c16b51 AZ |
137 | |
138 | shash_clear(&local_dbs); | |
139 | if (session) { | |
140 | jsonrpc_session_close(session); | |
3109b4e1 | 141 | } |
23c16b51 | 142 | |
60e0cd04 | 143 | session = jsonrpc_session_open(sync_from, true); |
23c16b51 | 144 | session_seqno = UINT_MAX; |
05ac209a | 145 | |
e988b8ab NS |
146 | jsonrpc_session_set_probe_interval(session, probe_interval); |
147 | ||
05ac209a AZ |
148 | /* Keep a copy of local server uuid. */ |
149 | server_uuid = *server; | |
150 | ||
60e0cd04 | 151 | state = RPL_S_INIT; |
7a9d65d2 MC |
152 | } |
153 | ||
ae671c5f | 154 | void |
23c16b51 | 155 | replication_add_local_db(const char *database, struct ovsdb *db) |
6ab3dd96 | 156 | { |
23c16b51 | 157 | shash_add_assert(&local_dbs, database, db); |
6ab3dd96 AZ |
158 | } |
159 | ||
71f21279 BP |
160 | static void |
161 | send_schema_requests(const struct json *result) | |
162 | { | |
fa37affa BP |
163 | for (size_t i = 0; i < result->array.n; i++) { |
164 | const struct json *name = result->array.elems[i]; | |
71f21279 BP |
165 | if (name->type == JSON_STRING) { |
166 | /* Send one schema request for each remote DB. */ | |
167 | const char *db_name = json_string(name); | |
cec7005b NS |
168 | struct replication_db *rdb = find_db(db_name); |
169 | if (rdb) { | |
71f21279 BP |
170 | struct jsonrpc_msg *request = |
171 | jsonrpc_create_request( | |
172 | "get_schema", | |
173 | json_array_create_1( | |
174 | json_string_create(db_name)), | |
175 | NULL); | |
176 | ||
cec7005b | 177 | request_ids_add(request->id, rdb->db); |
71f21279 BP |
178 | jsonrpc_session_send(session, request); |
179 | } | |
180 | } | |
181 | } | |
182 | } | |
183 | ||
6ab3dd96 AZ |
184 | void |
185 | replication_run(void) | |
ae671c5f | 186 | { |
23c16b51 AZ |
187 | if (!session) { |
188 | return; | |
189 | } | |
190 | ||
191 | jsonrpc_session_run(session); | |
192 | ||
193 | for (int i = 0; jsonrpc_session_is_connected(session) && i < 50; i++) { | |
194 | struct jsonrpc_msg *msg; | |
195 | unsigned int seqno; | |
196 | ||
197 | seqno = jsonrpc_session_get_seqno(session); | |
60e0cd04 | 198 | if (seqno != session_seqno || state == RPL_S_INIT) { |
23c16b51 AZ |
199 | session_seqno = seqno; |
200 | request_ids_clear(); | |
201 | struct jsonrpc_msg *request; | |
05ac209a | 202 | request = jsonrpc_create_request("get_server_id", |
23c16b51 AZ |
203 | json_array_create_empty(), NULL); |
204 | request_ids_add(request->id, NULL); | |
205 | jsonrpc_session_send(session, request); | |
206 | ||
05ac209a AZ |
207 | state = RPL_S_SERVER_ID_REQUESTED; |
208 | VLOG_DBG("send server ID request."); | |
ae671c5f MC |
209 | } |
210 | ||
23c16b51 AZ |
211 | msg = jsonrpc_session_recv(session); |
212 | if (!msg) { | |
213 | continue; | |
ae671c5f MC |
214 | } |
215 | ||
23c16b51 AZ |
216 | if (msg->type == JSONRPC_NOTIFY && state != RPL_S_ERR |
217 | && !strcmp(msg->method, "update")) { | |
218 | if (msg->params->type == JSON_ARRAY | |
fa37affa BP |
219 | && msg->params->array.n == 2 |
220 | && msg->params->array.elems[0]->type == JSON_STRING) { | |
221 | char *db_name = msg->params->array.elems[0]->string; | |
cec7005b NS |
222 | struct replication_db *rdb = find_db(db_name); |
223 | if (rdb) { | |
23c16b51 | 224 | struct ovsdb_error *error; |
fa37affa | 225 | error = process_notification(msg->params->array.elems[1], |
cec7005b | 226 | rdb->db); |
23c16b51 AZ |
227 | if (error) { |
228 | ovsdb_error_assert(error); | |
229 | state = RPL_S_ERR; | |
230 | } | |
231 | } | |
232 | } | |
233 | } else if (msg->type == JSONRPC_REPLY) { | |
cec7005b | 234 | struct replication_db *rdb; |
23c16b51 AZ |
235 | struct ovsdb *db; |
236 | if (!request_ids_lookup_and_free(msg->id, &db)) { | |
237 | VLOG_WARN("received unexpected reply"); | |
238 | goto next; | |
239 | } | |
240 | ||
241 | switch (state) { | |
05ac209a AZ |
242 | case RPL_S_SERVER_ID_REQUESTED: { |
243 | struct uuid uuid; | |
244 | if (msg->result->type != JSON_STRING || | |
245 | !uuid_from_string(&uuid, json_string(msg->result))) { | |
246 | struct ovsdb_error *error; | |
247 | error = ovsdb_error("get_server_id failed", | |
248 | "Server ID is not valid UUID"); | |
249 | ||
250 | ovsdb_error_assert(error); | |
251 | state = RPL_S_ERR; | |
252 | break; | |
253 | } | |
254 | ||
255 | if (uuid_equals(&uuid, &server_uuid)) { | |
256 | struct ovsdb_error *error; | |
257 | error = ovsdb_error("Server ID check failed", | |
258 | "Self replicating is not allowed"); | |
259 | ||
260 | ovsdb_error_assert(error); | |
261 | state = RPL_S_ERR; | |
262 | break; | |
263 | } | |
264 | ||
265 | struct jsonrpc_msg *request; | |
266 | request = jsonrpc_create_request("list_dbs", | |
267 | json_array_create_empty(), | |
268 | NULL); | |
269 | request_ids_add(request->id, NULL); | |
270 | jsonrpc_session_send(session, request); | |
271 | ||
272 | replication_dbs_destroy(); | |
cec7005b | 273 | replication_dbs = replication_dbs_create(); |
05ac209a AZ |
274 | state = RPL_S_DB_REQUESTED; |
275 | break; | |
276 | } | |
23c16b51 AZ |
277 | case RPL_S_DB_REQUESTED: |
278 | if (msg->result->type != JSON_ARRAY) { | |
279 | struct ovsdb_error *error; | |
05ac209a | 280 | error = ovsdb_error("list_dbs failed", |
23c16b51 AZ |
281 | "list_dbs response is not array"); |
282 | ovsdb_error_assert(error); | |
283 | state = RPL_S_ERR; | |
284 | } else { | |
71f21279 | 285 | send_schema_requests(msg->result); |
60e0cd04 | 286 | VLOG_DBG("Send schema requests"); |
23c16b51 AZ |
287 | state = RPL_S_SCHEMA_REQUESTED; |
288 | } | |
289 | break; | |
290 | ||
291 | case RPL_S_SCHEMA_REQUESTED: { | |
292 | struct ovsdb_schema *schema; | |
293 | struct ovsdb_error *error; | |
294 | ||
295 | error = ovsdb_schema_from_json(msg->result, &schema); | |
296 | if (error) { | |
297 | ovsdb_error_assert(error); | |
298 | state = RPL_S_ERR; | |
299 | } | |
300 | ||
cec7005b NS |
301 | rdb = find_db(schema->name); |
302 | if (!rdb) { | |
23c16b51 AZ |
303 | /* Unexpected schema. */ |
304 | VLOG_WARN("unexpected schema %s", schema->name); | |
305 | state = RPL_S_ERR; | |
cec7005b | 306 | } else if (!ovsdb_schema_equal(schema, rdb->db->schema)) { |
23c16b51 | 307 | /* Schmea version mismatch. */ |
cec7005b NS |
308 | VLOG_INFO("Schema version mismatch, checking if %s can " |
309 | "still be replicated or not.", | |
23c16b51 | 310 | schema->name); |
cec7005b NS |
311 | if (is_replication_possible(rdb->db->schema, schema)) { |
312 | VLOG_INFO("%s can be replicated.", schema->name); | |
313 | rdb->schema_version_higher = true; | |
314 | if (rdb->active_db_schema) { | |
315 | ovsdb_schema_destroy(rdb->active_db_schema); | |
316 | } | |
317 | rdb->active_db_schema = schema; | |
318 | } else { | |
319 | VLOG_INFO("%s cannot be replicated.", schema->name); | |
320 | struct replication_db *r = | |
321 | shash_find_and_delete(replication_dbs, | |
322 | schema->name); | |
323 | if (r->active_db_schema) { | |
324 | ovsdb_schema_destroy(r->active_db_schema); | |
325 | } | |
326 | free(r); | |
327 | ovsdb_schema_destroy(schema); | |
328 | } | |
329 | } else { | |
330 | ovsdb_schema_destroy(schema); | |
23c16b51 | 331 | } |
23c16b51 AZ |
332 | |
333 | /* After receiving schemas, reset the local databases that | |
334 | * will be monitored and send out monitor requests for them. */ | |
335 | if (hmap_is_empty(&request_ids)) { | |
ecf44dd3 | 336 | struct shash_node *node; |
23c16b51 AZ |
337 | |
338 | if (shash_is_empty(replication_dbs)) { | |
339 | VLOG_WARN("Nothing to replicate."); | |
340 | state = RPL_S_ERR; | |
341 | } else { | |
342 | SHASH_FOR_EACH (node, replication_dbs) { | |
cec7005b | 343 | rdb = node->data; |
23c16b51 | 344 | struct jsonrpc_msg *request = |
cec7005b NS |
345 | create_monitor_request( |
346 | rdb->schema_version_higher ? | |
347 | rdb->active_db_schema : rdb->db->schema); | |
23c16b51 | 348 | |
cec7005b | 349 | request_ids_add(request->id, rdb->db); |
23c16b51 | 350 | jsonrpc_session_send(session, request); |
60e0cd04 | 351 | VLOG_DBG("Send monitor requests"); |
23c16b51 AZ |
352 | state = RPL_S_MONITOR_REQUESTED; |
353 | } | |
354 | } | |
355 | } | |
356 | break; | |
357 | } | |
358 | ||
359 | case RPL_S_MONITOR_REQUESTED: { | |
360 | /* Reply to monitor requests. */ | |
361 | struct ovsdb_error *error; | |
ecf44dd3 NS |
362 | VLOG_INFO("Monitor request received. Resetting the database"); |
363 | /* Resetting the database here has few risks. If the | |
364 | * process_notification() fails, the database is completely | |
365 | * lost locally. In case that node becomes active, then | |
366 | * there is a chance of complete data loss in the active/standy | |
367 | * cluster. */ | |
368 | error = reset_database(db); | |
369 | if (!error) { | |
370 | error = process_notification(msg->result, db); | |
371 | } | |
23c16b51 AZ |
372 | if (error) { |
373 | ovsdb_error_assert(error); | |
374 | state = RPL_S_ERR; | |
375 | } else { | |
376 | /* Transition to replicating state after receiving | |
377 | * all replies of "monitor" requests. */ | |
378 | if (hmap_is_empty(&request_ids)) { | |
60e0cd04 | 379 | VLOG_DBG("Listening to monitor updates"); |
23c16b51 AZ |
380 | state = RPL_S_REPLICATING; |
381 | } | |
382 | } | |
383 | break; | |
384 | } | |
385 | ||
386 | case RPL_S_ERR: | |
387 | /* Ignore all messages */ | |
388 | break; | |
389 | ||
60e0cd04 | 390 | case RPL_S_INIT: |
23c16b51 AZ |
391 | case RPL_S_REPLICATING: |
392 | default: | |
393 | OVS_NOT_REACHED(); | |
394 | } | |
395 | } | |
396 | next: | |
397 | jsonrpc_msg_destroy(msg); | |
ae671c5f MC |
398 | } |
399 | } | |
400 | ||
8c945cec AZ |
401 | void |
402 | replication_wait(void) | |
403 | { | |
23c16b51 AZ |
404 | if (session) { |
405 | jsonrpc_session_wait(session); | |
406 | jsonrpc_session_recv_wait(session); | |
8c945cec AZ |
407 | } |
408 | } | |
409 | ||
3109b4e1 AZ |
410 | /* Parse 'blacklist' to rebuild 'blacklist_tables'. If 'dryrun' is false, the |
411 | * current black list tables will be wiped out, regardless of whether | |
412 | * 'blacklist' can be parsed. If 'dryrun' is true, only parses 'blacklist' and | |
413 | * reports any errors, without modifying the blacklist. | |
414 | * | |
415 | * On error, returns the error string, which the caller is | |
416 | * responsible for freeing. Returns NULL otherwise. */ | |
417 | char * OVS_WARN_UNUSED_RESULT | |
418 | set_blacklist_tables(const char *blacklist, bool dryrun) | |
7a9d65d2 | 419 | { |
3109b4e1 AZ |
420 | struct sset set = SSET_INITIALIZER(&set); |
421 | char *err = NULL; | |
422 | ||
7a9d65d2 | 423 | if (blacklist) { |
3109b4e1 AZ |
424 | const char *longname; |
425 | ||
426 | if (!dryrun) { | |
6ab3dd96 | 427 | /* Can only add to an empty shash. */ |
3109b4e1 AZ |
428 | blacklist_tables_clear(); |
429 | } | |
430 | ||
431 | sset_from_delimited_string(&set, blacklist, " ,"); | |
432 | SSET_FOR_EACH (longname, &set) { | |
433 | char *database = xstrdup(longname), *table = NULL; | |
434 | strtok_r(database, ":", &table); | |
435 | if (table && !dryrun) { | |
436 | blacklist_tables_add(database, table); | |
437 | } | |
438 | ||
439 | free(database); | |
440 | if (!table) { | |
441 | err = xasprintf("Can't parse black list table: %s", longname); | |
442 | goto done; | |
443 | } | |
444 | } | |
445 | } | |
446 | ||
447 | done: | |
448 | sset_destroy(&set); | |
449 | if (err && !dryrun) { | |
450 | /* On error, destroy the partially built 'blacklist_tables'. */ | |
451 | blacklist_tables_clear(); | |
452 | } | |
453 | return err; | |
454 | } | |
455 | ||
456 | char * OVS_WARN_UNUSED_RESULT | |
457 | get_blacklist_tables(void) | |
458 | { | |
459 | struct shash_node *node; | |
460 | struct sset set = SSET_INITIALIZER(&set); | |
461 | ||
462 | SHASH_FOR_EACH (node, &blacklist_tables) { | |
463 | const char *database = node->name; | |
464 | const char *table; | |
465 | struct sset *tables = node->data; | |
466 | ||
467 | SSET_FOR_EACH (table, tables) { | |
468 | sset_add_and_free(&set, xasprintf("%s:%s", database, table)); | |
469 | } | |
470 | } | |
471 | ||
472 | /* Output the table list in an sorted order, so that | |
473 | * the output string will not depend on the hash function | |
474 | * that used to implement the hmap data structure. This is | |
475 | * only useful for writting unit tests. */ | |
476 | const char **sorted = sset_sort(&set); | |
477 | struct ds ds = DS_EMPTY_INITIALIZER; | |
478 | size_t i; | |
479 | for (i = 0; i < sset_count(&set); i++) { | |
480 | ds_put_format(&ds, "%s,", sorted[i]); | |
7a9d65d2 | 481 | } |
3109b4e1 AZ |
482 | |
483 | ds_chomp(&ds, ','); | |
484 | ||
485 | free(sorted); | |
486 | sset_destroy(&set); | |
487 | ||
488 | return ds_steal_cstr(&ds); | |
7a9d65d2 MC |
489 | } |
490 | ||
3109b4e1 AZ |
491 | static void |
492 | blacklist_tables_clear(void) | |
9dc05cdc | 493 | { |
3109b4e1 AZ |
494 | struct shash_node *node; |
495 | SHASH_FOR_EACH (node, &blacklist_tables) { | |
496 | struct sset *tables = node->data; | |
497 | sset_destroy(tables); | |
498 | } | |
499 | ||
500 | shash_clear_free_data(&blacklist_tables); | |
501 | } | |
502 | ||
503 | static void | |
504 | blacklist_tables_add(const char *database, const char *table) | |
505 | { | |
506 | struct sset *tables = shash_find_data(&blacklist_tables, database); | |
507 | ||
508 | if (!tables) { | |
509 | tables = xmalloc(sizeof *tables); | |
510 | sset_init(tables); | |
511 | shash_add(&blacklist_tables, database, tables); | |
512 | } | |
513 | ||
514 | sset_add(tables, table); | |
515 | } | |
516 | ||
517 | static bool | |
518 | blacklist_tables_find(const char *database, const char *table) | |
519 | { | |
520 | struct sset *tables = shash_find_data(&blacklist_tables, database); | |
521 | return tables && sset_contains(tables, table); | |
9dc05cdc MC |
522 | } |
523 | ||
ae671c5f | 524 | void |
f53d7518 | 525 | disconnect_active_server(void) |
c9c5c9e2 | 526 | { |
23c16b51 AZ |
527 | jsonrpc_session_close(session); |
528 | session = NULL; | |
c9c5c9e2 MC |
529 | } |
530 | ||
531 | void | |
3109b4e1 | 532 | replication_destroy(void) |
ae671c5f | 533 | { |
3109b4e1 AZ |
534 | blacklist_tables_clear(); |
535 | shash_destroy(&blacklist_tables); | |
ae671c5f | 536 | |
60e0cd04 AZ |
537 | if (sync_from) { |
538 | free(sync_from); | |
539 | sync_from = NULL; | |
ae671c5f | 540 | } |
599c0f44 AZ |
541 | |
542 | request_ids_destroy(); | |
5e8bc3c5 | 543 | replication_dbs_destroy(); |
23c16b51 AZ |
544 | |
545 | shash_destroy(&local_dbs); | |
ae671c5f MC |
546 | } |
547 | ||
cec7005b | 548 | static struct replication_db * |
6ab3dd96 | 549 | find_db(const char *db_name) |
ae671c5f | 550 | { |
23c16b51 | 551 | return shash_find_data(replication_dbs, db_name); |
ae671c5f MC |
552 | } |
553 | \f | |
554 | static struct ovsdb_error * | |
23c16b51 | 555 | reset_database(struct ovsdb *db) |
ae671c5f | 556 | { |
23c16b51 | 557 | struct ovsdb_txn *txn = ovsdb_txn_create(db); |
ae671c5f MC |
558 | struct shash_node *table_node; |
559 | ||
560 | SHASH_FOR_EACH (table_node, &db->tables) { | |
3109b4e1 AZ |
561 | /* Delete all rows if the table is not blacklisted. */ |
562 | if (!blacklist_tables_find(db->schema->name, table_node->name)) { | |
563 | struct ovsdb_table *table = table_node->data; | |
45b4b20a BP |
564 | struct ovsdb_row *row, *next; |
565 | HMAP_FOR_EACH_SAFE (row, next, hmap_node, &table->rows) { | |
7a9d65d2 MC |
566 | ovsdb_txn_row_delete(txn, row); |
567 | } | |
ae671c5f MC |
568 | } |
569 | } | |
ae671c5f | 570 | |
1b1d2e6d | 571 | return ovsdb_txn_propose_commit_block(txn, false); |
ae671c5f MC |
572 | } |
573 | ||
23c16b51 AZ |
574 | /* Create a monitor request for 'db'. The monitor request will include |
575 | * any tables from 'blacklisted_tables' | |
576 | * | |
577 | * Caller is responsible for disposing 'request'. | |
578 | */ | |
579 | static struct jsonrpc_msg * | |
cec7005b | 580 | create_monitor_request(struct ovsdb_schema *schema) |
ae671c5f | 581 | { |
23c16b51 AZ |
582 | struct jsonrpc_msg *request; |
583 | struct json *monitor; | |
23c16b51 | 584 | const char *db_name = schema->name; |
ae671c5f | 585 | |
23c16b51 AZ |
586 | struct json *monitor_request = json_object_create(); |
587 | size_t n = shash_count(&schema->tables); | |
588 | const struct shash_node **nodes = shash_sort(&schema->tables); | |
ae671c5f | 589 | |
23c16b51 AZ |
590 | for (int j = 0; j < n; j++) { |
591 | struct ovsdb_table_schema *table = nodes[j]->data; | |
ae671c5f | 592 | |
23c16b51 AZ |
593 | /* Monitor all tables not blacklisted. */ |
594 | if (!blacklist_tables_find(db_name, table->name)) { | |
595 | add_monitored_table(table, monitor_request); | |
ae671c5f MC |
596 | } |
597 | } | |
23c16b51 | 598 | free(nodes); |
ae671c5f | 599 | |
23c16b51 AZ |
600 | /* Create a monitor request. */ |
601 | monitor = json_array_create_3( | |
602 | json_string_create(db_name), | |
603 | json_string_create(db_name), | |
604 | monitor_request); | |
605 | request = jsonrpc_create_request("monitor", monitor, NULL); | |
0cedc9db | 606 | |
23c16b51 | 607 | return request; |
ae671c5f MC |
608 | } |
609 | ||
610 | static void | |
611 | add_monitored_table(struct ovsdb_table_schema *table, | |
612 | struct json *monitor_request) | |
613 | { | |
614 | struct json *monitor_request_array; | |
615 | ||
ae671c5f MC |
616 | monitor_request_array = json_array_create_empty(); |
617 | json_array_add(monitor_request_array, json_object_create()); | |
618 | ||
619 | json_object_put(monitor_request, table->name, monitor_request_array); | |
620 | } | |
ae671c5f | 621 | |
23c16b51 AZ |
622 | \f |
623 | static struct ovsdb_error * | |
6ab3dd96 | 624 | process_notification(struct json *table_updates, struct ovsdb *db) |
ae671c5f | 625 | { |
23c16b51 | 626 | struct ovsdb_error *error = NULL; |
ae671c5f MC |
627 | struct ovsdb_txn *txn; |
628 | ||
23c16b51 AZ |
629 | if (table_updates->type == JSON_OBJECT) { |
630 | txn = ovsdb_txn_create(db); | |
631 | ||
632 | /* Process each table update. */ | |
633 | struct shash_node *node; | |
634 | SHASH_FOR_EACH (node, json_object(table_updates)) { | |
635 | struct json *table_update = node->data; | |
636 | if (table_update) { | |
637 | error = process_table_update(table_update, node->name, db, txn); | |
638 | if (error) { | |
639 | break; | |
640 | } | |
ae671c5f MC |
641 | } |
642 | } | |
ae671c5f | 643 | |
23c16b51 AZ |
644 | if (error) { |
645 | ovsdb_txn_abort(txn); | |
646 | return error; | |
647 | } else { | |
648 | /* Commit transaction. */ | |
1b1d2e6d | 649 | error = ovsdb_txn_propose_commit_block(txn, false); |
23c16b51 | 650 | } |
ae671c5f MC |
651 | } |
652 | ||
23c16b51 | 653 | return error; |
ae671c5f MC |
654 | } |
655 | ||
656 | static struct ovsdb_error * | |
657 | process_table_update(struct json *table_update, const char *table_name, | |
658 | struct ovsdb *database, struct ovsdb_txn *txn) | |
659 | { | |
1cb0f53c BP |
660 | struct ovsdb_table *table = ovsdb_get_table(database, table_name); |
661 | if (!table) { | |
662 | return ovsdb_error("unknown table", "unknown table %s", table_name); | |
663 | } | |
ae671c5f MC |
664 | |
665 | if (table_update->type != JSON_OBJECT) { | |
43898d4c WT |
666 | return ovsdb_error("Not a JSON object", |
667 | "<table-update> for table is not object"); | |
ae671c5f MC |
668 | } |
669 | ||
1cb0f53c | 670 | struct shash_node *node; |
ae671c5f MC |
671 | SHASH_FOR_EACH (node, json_object(table_update)) { |
672 | struct json *row_update = node->data; | |
673 | struct json *old, *new; | |
674 | ||
675 | if (row_update->type != JSON_OBJECT) { | |
1cb0f53c BP |
676 | return ovsdb_error("Not a JSON object", |
677 | "<row-update> is not object"); | |
678 | } | |
679 | ||
680 | struct uuid uuid; | |
681 | if (!uuid_from_string(&uuid, node->name)) { | |
682 | return ovsdb_syntax_error(table_update, "bad row UUID", | |
683 | "<table-update> names must be UUIDs"); | |
ae671c5f | 684 | } |
1cb0f53c | 685 | |
ae671c5f MC |
686 | old = shash_find_data(json_object(row_update), "old"); |
687 | new = shash_find_data(json_object(row_update), "new"); | |
688 | ||
1cb0f53c BP |
689 | struct ovsdb_error *error; |
690 | error = (!new ? execute_delete(txn, &uuid, table) | |
691 | : !old ? execute_insert(txn, &uuid, table, new) | |
692 | : execute_update(txn, &uuid, table, new)); | |
c9c5c9e2 | 693 | if (error) { |
1cb0f53c | 694 | return error; |
c9c5c9e2 | 695 | } |
ae671c5f | 696 | } |
1cb0f53c | 697 | return NULL; |
ae671c5f MC |
698 | } |
699 | \f | |
700 | static struct ovsdb_error * | |
1cb0f53c | 701 | execute_insert(struct ovsdb_txn *txn, const struct uuid *row_uuid, |
ae671c5f MC |
702 | struct ovsdb_table *table, struct json *json_row) |
703 | { | |
1cb0f53c BP |
704 | struct ovsdb_row *row = ovsdb_row_create(table); |
705 | struct ovsdb_error *error = ovsdb_row_from_json(row, json_row, NULL, NULL); | |
ae671c5f | 706 | if (!error) { |
1cb0f53c | 707 | *ovsdb_row_get_uuid_rw(row) = *row_uuid; |
ae671c5f MC |
708 | ovsdb_txn_row_insert(txn, row); |
709 | } else { | |
1cb0f53c BP |
710 | static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); |
711 | VLOG_WARN_RL(&rl, "cannot add existing row "UUID_FMT" to table %s", | |
712 | UUID_ARGS(row_uuid), table->schema->name); | |
ae671c5f MC |
713 | ovsdb_row_destroy(row); |
714 | } | |
715 | ||
716 | return error; | |
717 | } | |
718 | ||
ae671c5f | 719 | static struct ovsdb_error * |
1cb0f53c | 720 | execute_delete(struct ovsdb_txn *txn, const struct uuid *row_uuid, |
ae671c5f MC |
721 | struct ovsdb_table *table) |
722 | { | |
1cb0f53c BP |
723 | const struct ovsdb_row *row = ovsdb_table_get_row(table, row_uuid); |
724 | if (row) { | |
725 | ovsdb_txn_row_delete(txn, row); | |
726 | } else { | |
727 | static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); | |
728 | VLOG_WARN_RL(&rl, "cannot delete missing row "UUID_FMT" from table %s", | |
729 | UUID_ARGS(row_uuid), table->schema->name); | |
ae671c5f | 730 | } |
1cb0f53c | 731 | return NULL; |
ae671c5f MC |
732 | } |
733 | ||
734 | static struct ovsdb_error * | |
1cb0f53c | 735 | execute_update(struct ovsdb_txn *txn, const struct uuid *row_uuid, |
ae671c5f MC |
736 | struct ovsdb_table *table, struct json *json_row) |
737 | { | |
1cb0f53c BP |
738 | const struct ovsdb_row *row = ovsdb_table_get_row(table, row_uuid); |
739 | if (!row) { | |
740 | static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); | |
741 | VLOG_WARN_RL(&rl, "cannot modify missing row "UUID_FMT" in table %s", | |
742 | UUID_ARGS(row_uuid), table->schema->name); | |
743 | return NULL; | |
ae671c5f | 744 | } |
1cb0f53c BP |
745 | |
746 | struct ovsdb_column_set columns = OVSDB_COLUMN_SET_INITIALIZER; | |
747 | struct ovsdb_row *update = ovsdb_row_create(table); | |
748 | struct ovsdb_error *error = ovsdb_row_from_json(update, json_row, | |
749 | NULL, &columns); | |
750 | ||
751 | if (!error && !ovsdb_row_equal_columns(row, update, &columns)) { | |
752 | ovsdb_row_update_columns(ovsdb_txn_row_modify(txn, row), | |
753 | update, &columns); | |
ae671c5f MC |
754 | } |
755 | ||
ae671c5f | 756 | ovsdb_column_set_destroy(&columns); |
1cb0f53c | 757 | ovsdb_row_destroy(update); |
ae671c5f MC |
758 | return error; |
759 | } | |
760 | ||
599c0f44 AZ |
761 | void |
762 | request_ids_add(const struct json *id, struct ovsdb *db) | |
763 | { | |
764 | struct request_ids_hmap_node *node = xmalloc(sizeof *node); | |
765 | ||
766 | node->request_id = json_clone(id); | |
767 | node->db = db; | |
768 | hmap_insert(&request_ids, &node->hmap, json_hash(id, 0)); | |
769 | } | |
770 | ||
771 | /* Look up 'id' from 'request_ids', if found, remove the found id from | |
772 | * 'request_ids' and free its memory. If not found, 'request_ids' does | |
773 | * not change. Sets '*db' to the database for the request (NULL if not | |
774 | * found). | |
775 | * | |
776 | * Return true if 'id' is found, false otherwise. | |
777 | */ | |
778 | bool | |
779 | request_ids_lookup_and_free(const struct json *id, struct ovsdb **db) | |
780 | { | |
781 | struct request_ids_hmap_node *node; | |
782 | ||
783 | HMAP_FOR_EACH_WITH_HASH (node, hmap, json_hash(id, 0), &request_ids) { | |
784 | if (json_equal(id, node->request_id)) { | |
785 | hmap_remove(&request_ids, &node->hmap); | |
786 | *db = node->db; | |
787 | json_destroy(node->request_id); | |
788 | free(node); | |
789 | return true; | |
790 | } | |
791 | } | |
792 | ||
793 | *db = NULL; | |
794 | return false; | |
795 | } | |
796 | ||
797 | static void | |
798 | request_ids_destroy(void) | |
799 | { | |
800 | struct request_ids_hmap_node *node; | |
801 | ||
802 | HMAP_FOR_EACH_POP (node, hmap, &request_ids) { | |
803 | json_destroy(node->request_id); | |
804 | free(node); | |
805 | } | |
806 | hmap_destroy(&request_ids); | |
807 | } | |
808 | ||
809 | void | |
810 | request_ids_clear(void) | |
811 | { | |
812 | request_ids_destroy(); | |
813 | hmap_init(&request_ids); | |
814 | } | |
815 | ||
23c16b51 | 816 | static struct shash * |
cec7005b | 817 | replication_dbs_create(void) |
23c16b51 AZ |
818 | { |
819 | struct shash *new = xmalloc(sizeof *new); | |
820 | shash_init(new); | |
821 | ||
822 | struct shash_node *node; | |
cec7005b NS |
823 | SHASH_FOR_EACH (node, &local_dbs) { |
824 | struct replication_db *repl_db = xmalloc(sizeof *repl_db); | |
825 | repl_db->db = node->data; | |
826 | repl_db->schema_version_higher = false; | |
827 | repl_db->active_db_schema = NULL; | |
828 | shash_add(new, node->name, repl_db); | |
23c16b51 AZ |
829 | } |
830 | ||
831 | return new; | |
832 | } | |
833 | ||
5e8bc3c5 AZ |
834 | static void |
835 | replication_dbs_destroy(void) | |
836 | { | |
cec7005b NS |
837 | if (!replication_dbs) { |
838 | return; | |
839 | } | |
840 | ||
841 | struct shash_node *node, *next; | |
842 | ||
843 | SHASH_FOR_EACH_SAFE (node, next, replication_dbs) { | |
844 | hmap_remove(&replication_dbs->map, &node->node); | |
845 | struct replication_db *rdb = node->data; | |
846 | if (rdb->active_db_schema) { | |
847 | ovsdb_schema_destroy(rdb->active_db_schema); | |
848 | } | |
849 | free(rdb); | |
850 | free(node->name); | |
851 | free(node); | |
852 | } | |
853 | ||
854 | hmap_destroy(&replication_dbs->map); | |
5e8bc3c5 AZ |
855 | free(replication_dbs); |
856 | replication_dbs = NULL; | |
857 | } | |
858 | ||
23c16b51 AZ |
859 | /* Return true if replication just started or is ongoing. |
860 | * Return false if the connection failed, or the replication | |
861 | * was not able to start. */ | |
862 | bool | |
863 | replication_is_alive(void) | |
864 | { | |
865 | if (session) { | |
866 | return jsonrpc_session_is_alive(session) && state != RPL_S_ERR; | |
867 | } | |
868 | return false; | |
869 | } | |
870 | ||
871 | /* Return the last error reported on a connection by 'session'. The | |
872 | * return value is 0 if replication is not currently running, or | |
873 | * if replication session has not encountered any error. | |
874 | * | |
875 | * Return a negative value if replication session has error, or the | |
876 | * replication was not able to start. */ | |
877 | int | |
878 | replication_get_last_error(void) | |
879 | { | |
880 | int err = 0; | |
881 | ||
882 | if (session) { | |
883 | err = jsonrpc_session_get_last_error(session); | |
884 | if (!err) { | |
885 | err = (state == RPL_S_ERR) ? ENOENT : 0; | |
886 | } | |
887 | } | |
888 | ||
889 | return err; | |
890 | } | |
891 | ||
60e0cd04 AZ |
892 | char * |
893 | replication_status(void) | |
894 | { | |
895 | bool alive = session && jsonrpc_session_is_alive(session); | |
896 | struct ds ds = DS_EMPTY_INITIALIZER; | |
897 | ||
898 | if (alive) { | |
899 | switch(state) { | |
900 | case RPL_S_INIT: | |
05ac209a | 901 | case RPL_S_SERVER_ID_REQUESTED: |
60e0cd04 AZ |
902 | case RPL_S_DB_REQUESTED: |
903 | case RPL_S_SCHEMA_REQUESTED: | |
904 | case RPL_S_MONITOR_REQUESTED: | |
905 | ds_put_format(&ds, "connecting: %s", sync_from); | |
906 | break; | |
907 | case RPL_S_REPLICATING: { | |
908 | struct shash_node *node; | |
909 | ||
910 | ds_put_format(&ds, "replicating: %s\n", sync_from); | |
911 | ds_put_cstr(&ds, "database:"); | |
912 | SHASH_FOR_EACH (node, replication_dbs) { | |
913 | ds_put_format(&ds, " %s,", node->name); | |
914 | } | |
915 | ds_chomp(&ds, ','); | |
916 | ||
917 | if (!shash_is_empty(&blacklist_tables)) { | |
918 | ds_put_char(&ds, '\n'); | |
919 | ds_put_cstr(&ds, "exclude: "); | |
920 | ds_put_and_free_cstr(&ds, get_blacklist_tables()); | |
921 | } | |
922 | break; | |
923 | } | |
924 | case RPL_S_ERR: | |
925 | ds_put_format(&ds, "Replication to (%s) failed\n", sync_from); | |
926 | break; | |
927 | default: | |
928 | OVS_NOT_REACHED(); | |
60e0cd04 AZ |
929 | } |
930 | } else { | |
931 | ds_put_format(&ds, "not connected to %s", sync_from); | |
932 | } | |
933 | return ds_steal_cstr(&ds); | |
934 | } | |
935 | ||
cec7005b NS |
936 | /* Checks if it's possible to replicate to the local db from the active db |
937 | * schema. Returns true, if 'local_db_schema' has all the tables and columns | |
938 | * of 'active_db_schema', false otherwise. | |
939 | */ | |
940 | static bool | |
941 | is_replication_possible(struct ovsdb_schema *local_db_schema, | |
942 | struct ovsdb_schema *active_db_schema) | |
943 | { | |
944 | struct shash_node *node; | |
945 | SHASH_FOR_EACH (node, &active_db_schema->tables) { | |
946 | struct ovsdb_table_schema *ldb_table_schema = | |
947 | shash_find_data(&local_db_schema->tables, node->name); | |
948 | if (!ldb_table_schema) { | |
949 | VLOG_INFO("Table %s not present in the local db schema", | |
950 | node->name); | |
951 | return false; | |
952 | } | |
953 | ||
954 | /* Local schema table should have all the columns | |
955 | * of active schema table. */ | |
956 | struct ovsdb_table_schema *adb_table_schema = node->data; | |
957 | struct shash_node *n; | |
958 | SHASH_FOR_EACH (n, &adb_table_schema->columns) { | |
959 | struct ovsdb_column *ldb_col = | |
960 | shash_find_data(&ldb_table_schema->columns, n->name); | |
961 | if (!ldb_col) { | |
962 | VLOG_INFO("Column %s not present in the local " | |
963 | "db schema table %s.", n->name, node->name); | |
964 | return false; | |
965 | } | |
966 | ||
967 | struct json *ldb_col_json = ovsdb_column_to_json(ldb_col); | |
968 | struct json *adb_col_json = ovsdb_column_to_json(n->data); | |
969 | bool cols_equal = json_equal(ldb_col_json, adb_col_json); | |
970 | json_destroy(ldb_col_json); | |
971 | json_destroy(adb_col_json); | |
972 | ||
973 | if (!cols_equal) { | |
974 | VLOG_INFO("Column %s mismatch in local " | |
975 | "db schema table %s.", n->name, node->name); | |
976 | return false; | |
977 | } | |
978 | } | |
979 | } | |
980 | ||
981 | return true; | |
982 | } | |
983 | ||
e988b8ab NS |
984 | void |
985 | replication_set_probe_interval(int probe_interval) | |
986 | { | |
987 | if (session) { | |
988 | jsonrpc_session_set_probe_interval(session, probe_interval); | |
989 | } | |
990 | } | |
991 | ||
ae671c5f MC |
/* Prints the "Syncing options" section of the command-line help text to
 * stdout, covering --sync-from, --sync-exclude-tables and --active.
 *
 * NOTE(review): the help text is a single backslash-continued string
 * literal, so any whitespace at the start of a continuation line is part of
 * the printed output -- confirm column alignment before reformatting. */
void
replication_usage(void)
{
    printf("\n\
Syncing options:\n\
--sync-from=SERVER sync DATABASE from active SERVER and start in\n\
backup mode (except with --active)\n\
--sync-exclude-tables=DB:TABLE,...\n\
exclude the TABLE in DB from syncing\n\
--active with --sync-from, start in active mode\n");
}