]>
Commit | Line | Data |
---|---|---|
ae671c5f | 1 | /* |
1b1d2e6d | 2 | * (c) Copyright 2016, 2017 Hewlett Packard Enterprise Development LP |
05ac209a | 3 | * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016, 2017 Nicira, Inc. |
ae671c5f MC |
4 | * |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
6 | * you may not use this file except in compliance with the License. | |
7 | * You may obtain a copy of the License at: | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | */ | |
17 | ||
18 | #include <config.h> | |
19 | ||
ae671c5f MC |
20 | |
21 | #include "condition.h" | |
599c0f44 | 22 | #include "jsonrpc.h" |
3109b4e1 | 23 | #include "openvswitch/dynamic-string.h" |
599c0f44 | 24 | #include "openvswitch/hmap.h" |
ee89ea7b | 25 | #include "openvswitch/json.h" |
8be42087 | 26 | #include "openvswitch/vlog.h" |
ae671c5f | 27 | #include "ovsdb-error.h" |
599c0f44 | 28 | #include "ovsdb.h" |
ae671c5f | 29 | #include "query.h" |
599c0f44 | 30 | #include "replication.h" |
ae671c5f | 31 | #include "row.h" |
ae671c5f | 32 | #include "sset.h" |
599c0f44 | 33 | #include "stream.h" |
ae671c5f MC |
34 | #include "svec.h" |
35 | #include "table.h" | |
36 | #include "transaction.h" | |
05ac209a | 37 | #include "uuid.h" |
ae671c5f | 38 | |
8be42087 AZ |
39 | VLOG_DEFINE_THIS_MODULE(replication); |
40 | ||
60e0cd04 | 41 | static char *sync_from; |
05ac209a | 42 | static struct uuid server_uuid; |
5e8bc3c5 | 43 | static struct jsonrpc_session *session; |
23c16b51 AZ |
44 | static unsigned int session_seqno = UINT_MAX; |
45 | ||
46 | static struct jsonrpc_msg *create_monitor_request(struct ovsdb *db); | |
ae671c5f MC |
47 | static void add_monitored_table(struct ovsdb_table_schema *table, |
48 | struct json *monitor_requests); | |
49 | ||
23c16b51 | 50 | static struct ovsdb_error *reset_database(struct ovsdb *db); |
ae671c5f | 51 | |
23c16b51 | 52 | static struct ovsdb_error *process_notification(struct json *, struct ovsdb *); |
ae671c5f MC |
53 | static struct ovsdb_error *process_table_update(struct json *table_update, |
54 | const char *table_name, | |
55 | struct ovsdb *database, | |
56 | struct ovsdb_txn *txn); | |
57 | ||
58 | static struct ovsdb_error *execute_insert(struct ovsdb_txn *txn, | |
1cb0f53c | 59 | const struct uuid *row_uuid, |
ae671c5f MC |
60 | struct ovsdb_table *table, |
61 | struct json *new); | |
62 | static struct ovsdb_error *execute_delete(struct ovsdb_txn *txn, | |
1cb0f53c | 63 | const struct uuid *row_uuid, |
ae671c5f MC |
64 | struct ovsdb_table *table); |
65 | static struct ovsdb_error *execute_update(struct ovsdb_txn *txn, | |
1cb0f53c | 66 | const struct uuid *row_uuid, |
ae671c5f MC |
67 | struct ovsdb_table *table, |
68 | struct json *new); | |
3109b4e1 AZ |
69 | \f |
70 | /* Maps from db name to sset of table names. */ | |
71 | static struct shash blacklist_tables = SHASH_INITIALIZER(&blacklist_tables); | |
72 | ||
73 | static void blacklist_tables_clear(void); | |
74 | static void blacklist_tables_add(const char *database, const char *table); | |
75 | static bool blacklist_tables_find(const char *database, const char* table); | |
76 | ||
ae671c5f | 77 | \f |
599c0f44 AZ |
78 | /* Keep track of request IDs of all outstanding OVSDB requests. */ |
79 | static struct hmap request_ids = HMAP_INITIALIZER(&request_ids); | |
80 | ||
81 | struct request_ids_hmap_node { | |
82 | struct hmap_node hmap; | |
83 | struct json *request_id; | |
84 | struct ovsdb *db; /* associated database */ | |
85 | }; | |
86 | void request_ids_add(const struct json *id, struct ovsdb *db); | |
87 | bool request_ids_lookup_and_free(const struct json *id, struct ovsdb **db); | |
88 | static void request_ids_destroy(void); | |
89 | void request_ids_clear(void); | |
90 | \f | |
/* Steps of the replication handshake, listed in the order in which
 * replication_run() walks through them. */
enum ovsdb_replication_state {
    RPL_S_INIT,                 /* Initialized, nothing requested yet. */
    RPL_S_SERVER_ID_REQUESTED,  /* "get_server_id" sent, reply pending. */
    RPL_S_DB_REQUESTED,         /* "list_dbs" sent, reply pending. */
    RPL_S_SCHEMA_REQUESTED,     /* "get_schema" sent, replies pending. */
    RPL_S_MONITOR_REQUESTED,    /* "monitor" sent, replies pending. */
    RPL_S_REPLICATING,          /* Applying "update" notifications. */
    RPL_S_ERR                   /* Error, no longer replicating. */
};
100 | static enum ovsdb_replication_state state; | |
101 | ||
102 | \f | |
103 | /* All DBs known to ovsdb-server. The DBs actually being replicated are | |
104 | * stored in 'replication_dbs', the subset of 'local_dbs' whose schema | |
105 | * matches the schema of the same-named remote DB. */ | |
106 | static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs); | |
5e8bc3c5 | 107 | static struct shash *replication_dbs; |
599c0f44 | 108 | |
23c16b51 | 109 | static struct shash *replication_db_clone(struct shash *dbs); |
5e8bc3c5 | 110 | static void replication_dbs_destroy(void); |
6ab3dd96 AZ |
111 | /* Find 'struct ovsdb' by name within 'replication_dbs' */ |
112 | static struct ovsdb* find_db(const char *db_name); | |
113 | \f | |
114 | ||
7a9d65d2 | 115 | void |
05ac209a AZ |
116 | replication_init(const char *sync_from_, const char *exclude_tables, |
117 | const struct uuid *server) | |
7a9d65d2 | 118 | { |
60e0cd04 AZ |
119 | free(sync_from); |
120 | sync_from = xstrdup(sync_from_); | |
60e0cd04 AZ |
121 | /* Caller should have verified that the 'exclude_tables' is |
122 | * parseable. An error here is unexpected. */ | |
500db308 | 123 | ovs_assert(!set_blacklist_tables(exclude_tables, false)); |
60e0cd04 | 124 | |
5e8bc3c5 | 125 | replication_dbs_destroy(); |
23c16b51 AZ |
126 | |
127 | shash_clear(&local_dbs); | |
128 | if (session) { | |
129 | jsonrpc_session_close(session); | |
3109b4e1 | 130 | } |
23c16b51 | 131 | |
60e0cd04 | 132 | session = jsonrpc_session_open(sync_from, true); |
23c16b51 | 133 | session_seqno = UINT_MAX; |
05ac209a AZ |
134 | |
135 | /* Keep a copy of local server uuid. */ | |
136 | server_uuid = *server; | |
137 | ||
60e0cd04 | 138 | state = RPL_S_INIT; |
7a9d65d2 MC |
139 | } |
140 | ||
ae671c5f | 141 | void |
23c16b51 | 142 | replication_add_local_db(const char *database, struct ovsdb *db) |
6ab3dd96 | 143 | { |
23c16b51 | 144 | shash_add_assert(&local_dbs, database, db); |
6ab3dd96 AZ |
145 | } |
146 | ||
71f21279 BP |
147 | static void |
148 | send_schema_requests(const struct json *result) | |
149 | { | |
fa37affa BP |
150 | for (size_t i = 0; i < result->array.n; i++) { |
151 | const struct json *name = result->array.elems[i]; | |
71f21279 BP |
152 | if (name->type == JSON_STRING) { |
153 | /* Send one schema request for each remote DB. */ | |
154 | const char *db_name = json_string(name); | |
155 | struct ovsdb *db = find_db(db_name); | |
156 | if (db) { | |
157 | struct jsonrpc_msg *request = | |
158 | jsonrpc_create_request( | |
159 | "get_schema", | |
160 | json_array_create_1( | |
161 | json_string_create(db_name)), | |
162 | NULL); | |
163 | ||
164 | request_ids_add(request->id, db); | |
165 | jsonrpc_session_send(session, request); | |
166 | } | |
167 | } | |
168 | } | |
169 | } | |
170 | ||
6ab3dd96 AZ |
171 | void |
172 | replication_run(void) | |
ae671c5f | 173 | { |
23c16b51 AZ |
174 | if (!session) { |
175 | return; | |
176 | } | |
177 | ||
178 | jsonrpc_session_run(session); | |
179 | ||
180 | for (int i = 0; jsonrpc_session_is_connected(session) && i < 50; i++) { | |
181 | struct jsonrpc_msg *msg; | |
182 | unsigned int seqno; | |
183 | ||
184 | seqno = jsonrpc_session_get_seqno(session); | |
60e0cd04 | 185 | if (seqno != session_seqno || state == RPL_S_INIT) { |
23c16b51 AZ |
186 | session_seqno = seqno; |
187 | request_ids_clear(); | |
188 | struct jsonrpc_msg *request; | |
05ac209a | 189 | request = jsonrpc_create_request("get_server_id", |
23c16b51 AZ |
190 | json_array_create_empty(), NULL); |
191 | request_ids_add(request->id, NULL); | |
192 | jsonrpc_session_send(session, request); | |
193 | ||
05ac209a AZ |
194 | state = RPL_S_SERVER_ID_REQUESTED; |
195 | VLOG_DBG("send server ID request."); | |
ae671c5f MC |
196 | } |
197 | ||
23c16b51 AZ |
198 | msg = jsonrpc_session_recv(session); |
199 | if (!msg) { | |
200 | continue; | |
ae671c5f MC |
201 | } |
202 | ||
23c16b51 AZ |
203 | if (msg->type == JSONRPC_NOTIFY && state != RPL_S_ERR |
204 | && !strcmp(msg->method, "update")) { | |
205 | if (msg->params->type == JSON_ARRAY | |
fa37affa BP |
206 | && msg->params->array.n == 2 |
207 | && msg->params->array.elems[0]->type == JSON_STRING) { | |
208 | char *db_name = msg->params->array.elems[0]->string; | |
23c16b51 AZ |
209 | struct ovsdb *db = find_db(db_name); |
210 | if (db) { | |
211 | struct ovsdb_error *error; | |
fa37affa | 212 | error = process_notification(msg->params->array.elems[1], |
23c16b51 AZ |
213 | db); |
214 | if (error) { | |
215 | ovsdb_error_assert(error); | |
216 | state = RPL_S_ERR; | |
217 | } | |
218 | } | |
219 | } | |
220 | } else if (msg->type == JSONRPC_REPLY) { | |
221 | struct ovsdb *db; | |
222 | if (!request_ids_lookup_and_free(msg->id, &db)) { | |
223 | VLOG_WARN("received unexpected reply"); | |
224 | goto next; | |
225 | } | |
226 | ||
227 | switch (state) { | |
05ac209a AZ |
228 | case RPL_S_SERVER_ID_REQUESTED: { |
229 | struct uuid uuid; | |
230 | if (msg->result->type != JSON_STRING || | |
231 | !uuid_from_string(&uuid, json_string(msg->result))) { | |
232 | struct ovsdb_error *error; | |
233 | error = ovsdb_error("get_server_id failed", | |
234 | "Server ID is not valid UUID"); | |
235 | ||
236 | ovsdb_error_assert(error); | |
237 | state = RPL_S_ERR; | |
238 | break; | |
239 | } | |
240 | ||
241 | if (uuid_equals(&uuid, &server_uuid)) { | |
242 | struct ovsdb_error *error; | |
243 | error = ovsdb_error("Server ID check failed", | |
244 | "Self replicating is not allowed"); | |
245 | ||
246 | ovsdb_error_assert(error); | |
247 | state = RPL_S_ERR; | |
248 | break; | |
249 | } | |
250 | ||
251 | struct jsonrpc_msg *request; | |
252 | request = jsonrpc_create_request("list_dbs", | |
253 | json_array_create_empty(), | |
254 | NULL); | |
255 | request_ids_add(request->id, NULL); | |
256 | jsonrpc_session_send(session, request); | |
257 | ||
258 | replication_dbs_destroy(); | |
259 | replication_dbs = replication_db_clone(&local_dbs); | |
260 | state = RPL_S_DB_REQUESTED; | |
261 | break; | |
262 | } | |
23c16b51 AZ |
263 | case RPL_S_DB_REQUESTED: |
264 | if (msg->result->type != JSON_ARRAY) { | |
265 | struct ovsdb_error *error; | |
05ac209a | 266 | error = ovsdb_error("list_dbs failed", |
23c16b51 AZ |
267 | "list_dbs response is not array"); |
268 | ovsdb_error_assert(error); | |
269 | state = RPL_S_ERR; | |
270 | } else { | |
71f21279 | 271 | send_schema_requests(msg->result); |
60e0cd04 | 272 | VLOG_DBG("Send schema requests"); |
23c16b51 AZ |
273 | state = RPL_S_SCHEMA_REQUESTED; |
274 | } | |
275 | break; | |
276 | ||
277 | case RPL_S_SCHEMA_REQUESTED: { | |
278 | struct ovsdb_schema *schema; | |
279 | struct ovsdb_error *error; | |
280 | ||
281 | error = ovsdb_schema_from_json(msg->result, &schema); | |
282 | if (error) { | |
283 | ovsdb_error_assert(error); | |
284 | state = RPL_S_ERR; | |
285 | } | |
286 | ||
287 | if (db != find_db(schema->name)) { | |
288 | /* Unexpected schema. */ | |
289 | VLOG_WARN("unexpected schema %s", schema->name); | |
290 | state = RPL_S_ERR; | |
291 | } else if (!ovsdb_schema_equal(schema, db->schema)) { | |
292 | /* Schmea version mismatch. */ | |
293 | VLOG_INFO("Schema version mismatch, %s not replicated", | |
294 | schema->name); | |
295 | shash_find_and_delete(replication_dbs, schema->name); | |
296 | } | |
297 | ovsdb_schema_destroy(schema); | |
298 | ||
299 | /* After receiving schemas, reset the local databases that | |
300 | * will be monitored and send out monitor requests for them. */ | |
301 | if (hmap_is_empty(&request_ids)) { | |
ecf44dd3 | 302 | struct shash_node *node; |
23c16b51 AZ |
303 | |
304 | if (shash_is_empty(replication_dbs)) { | |
305 | VLOG_WARN("Nothing to replicate."); | |
306 | state = RPL_S_ERR; | |
307 | } else { | |
308 | SHASH_FOR_EACH (node, replication_dbs) { | |
309 | db = node->data; | |
23c16b51 AZ |
310 | struct jsonrpc_msg *request = |
311 | create_monitor_request(db); | |
312 | ||
313 | request_ids_add(request->id, db); | |
314 | jsonrpc_session_send(session, request); | |
60e0cd04 | 315 | VLOG_DBG("Send monitor requests"); |
23c16b51 AZ |
316 | state = RPL_S_MONITOR_REQUESTED; |
317 | } | |
318 | } | |
319 | } | |
320 | break; | |
321 | } | |
322 | ||
323 | case RPL_S_MONITOR_REQUESTED: { | |
324 | /* Reply to monitor requests. */ | |
325 | struct ovsdb_error *error; | |
ecf44dd3 NS |
326 | VLOG_INFO("Monitor request received. Resetting the database"); |
327 | /* Resetting the database here has few risks. If the | |
328 | * process_notification() fails, the database is completely | |
329 | * lost locally. In case that node becomes active, then | |
330 | * there is a chance of complete data loss in the active/standy | |
331 | * cluster. */ | |
332 | error = reset_database(db); | |
333 | if (!error) { | |
334 | error = process_notification(msg->result, db); | |
335 | } | |
23c16b51 AZ |
336 | if (error) { |
337 | ovsdb_error_assert(error); | |
338 | state = RPL_S_ERR; | |
339 | } else { | |
340 | /* Transition to replicating state after receiving | |
341 | * all replies of "monitor" requests. */ | |
342 | if (hmap_is_empty(&request_ids)) { | |
60e0cd04 | 343 | VLOG_DBG("Listening to monitor updates"); |
23c16b51 AZ |
344 | state = RPL_S_REPLICATING; |
345 | } | |
346 | } | |
347 | break; | |
348 | } | |
349 | ||
350 | case RPL_S_ERR: | |
351 | /* Ignore all messages */ | |
352 | break; | |
353 | ||
60e0cd04 | 354 | case RPL_S_INIT: |
23c16b51 AZ |
355 | case RPL_S_REPLICATING: |
356 | default: | |
357 | OVS_NOT_REACHED(); | |
358 | } | |
359 | } | |
360 | next: | |
361 | jsonrpc_msg_destroy(msg); | |
ae671c5f MC |
362 | } |
363 | } | |
364 | ||
8c945cec AZ |
365 | void |
366 | replication_wait(void) | |
367 | { | |
23c16b51 AZ |
368 | if (session) { |
369 | jsonrpc_session_wait(session); | |
370 | jsonrpc_session_recv_wait(session); | |
8c945cec AZ |
371 | } |
372 | } | |
373 | ||
3109b4e1 AZ |
374 | /* Parse 'blacklist' to rebuild 'blacklist_tables'. If 'dryrun' is false, the |
375 | * current black list tables will be wiped out, regardless of whether | |
376 | * 'blacklist' can be parsed. If 'dryrun' is true, only parses 'blacklist' and | |
377 | * reports any errors, without modifying the blacklist. | |
378 | * | |
379 | * On error, returns the error string, which the caller is | |
380 | * responsible for freeing. Returns NULL otherwise. */ | |
381 | char * OVS_WARN_UNUSED_RESULT | |
382 | set_blacklist_tables(const char *blacklist, bool dryrun) | |
7a9d65d2 | 383 | { |
3109b4e1 AZ |
384 | struct sset set = SSET_INITIALIZER(&set); |
385 | char *err = NULL; | |
386 | ||
7a9d65d2 | 387 | if (blacklist) { |
3109b4e1 AZ |
388 | const char *longname; |
389 | ||
390 | if (!dryrun) { | |
6ab3dd96 | 391 | /* Can only add to an empty shash. */ |
3109b4e1 AZ |
392 | blacklist_tables_clear(); |
393 | } | |
394 | ||
395 | sset_from_delimited_string(&set, blacklist, " ,"); | |
396 | SSET_FOR_EACH (longname, &set) { | |
397 | char *database = xstrdup(longname), *table = NULL; | |
398 | strtok_r(database, ":", &table); | |
399 | if (table && !dryrun) { | |
400 | blacklist_tables_add(database, table); | |
401 | } | |
402 | ||
403 | free(database); | |
404 | if (!table) { | |
405 | err = xasprintf("Can't parse black list table: %s", longname); | |
406 | goto done; | |
407 | } | |
408 | } | |
409 | } | |
410 | ||
411 | done: | |
412 | sset_destroy(&set); | |
413 | if (err && !dryrun) { | |
414 | /* On error, destroy the partially built 'blacklist_tables'. */ | |
415 | blacklist_tables_clear(); | |
416 | } | |
417 | return err; | |
418 | } | |
419 | ||
420 | char * OVS_WARN_UNUSED_RESULT | |
421 | get_blacklist_tables(void) | |
422 | { | |
423 | struct shash_node *node; | |
424 | struct sset set = SSET_INITIALIZER(&set); | |
425 | ||
426 | SHASH_FOR_EACH (node, &blacklist_tables) { | |
427 | const char *database = node->name; | |
428 | const char *table; | |
429 | struct sset *tables = node->data; | |
430 | ||
431 | SSET_FOR_EACH (table, tables) { | |
432 | sset_add_and_free(&set, xasprintf("%s:%s", database, table)); | |
433 | } | |
434 | } | |
435 | ||
436 | /* Output the table list in an sorted order, so that | |
437 | * the output string will not depend on the hash function | |
438 | * that used to implement the hmap data structure. This is | |
439 | * only useful for writting unit tests. */ | |
440 | const char **sorted = sset_sort(&set); | |
441 | struct ds ds = DS_EMPTY_INITIALIZER; | |
442 | size_t i; | |
443 | for (i = 0; i < sset_count(&set); i++) { | |
444 | ds_put_format(&ds, "%s,", sorted[i]); | |
7a9d65d2 | 445 | } |
3109b4e1 AZ |
446 | |
447 | ds_chomp(&ds, ','); | |
448 | ||
449 | free(sorted); | |
450 | sset_destroy(&set); | |
451 | ||
452 | return ds_steal_cstr(&ds); | |
7a9d65d2 MC |
453 | } |
454 | ||
3109b4e1 AZ |
455 | static void |
456 | blacklist_tables_clear(void) | |
9dc05cdc | 457 | { |
3109b4e1 AZ |
458 | struct shash_node *node; |
459 | SHASH_FOR_EACH (node, &blacklist_tables) { | |
460 | struct sset *tables = node->data; | |
461 | sset_destroy(tables); | |
462 | } | |
463 | ||
464 | shash_clear_free_data(&blacklist_tables); | |
465 | } | |
466 | ||
467 | static void | |
468 | blacklist_tables_add(const char *database, const char *table) | |
469 | { | |
470 | struct sset *tables = shash_find_data(&blacklist_tables, database); | |
471 | ||
472 | if (!tables) { | |
473 | tables = xmalloc(sizeof *tables); | |
474 | sset_init(tables); | |
475 | shash_add(&blacklist_tables, database, tables); | |
476 | } | |
477 | ||
478 | sset_add(tables, table); | |
479 | } | |
480 | ||
481 | static bool | |
482 | blacklist_tables_find(const char *database, const char *table) | |
483 | { | |
484 | struct sset *tables = shash_find_data(&blacklist_tables, database); | |
485 | return tables && sset_contains(tables, table); | |
9dc05cdc MC |
486 | } |
487 | ||
ae671c5f | 488 | void |
f53d7518 | 489 | disconnect_active_server(void) |
c9c5c9e2 | 490 | { |
23c16b51 AZ |
491 | jsonrpc_session_close(session); |
492 | session = NULL; | |
c9c5c9e2 MC |
493 | } |
494 | ||
495 | void | |
3109b4e1 | 496 | replication_destroy(void) |
ae671c5f | 497 | { |
3109b4e1 AZ |
498 | blacklist_tables_clear(); |
499 | shash_destroy(&blacklist_tables); | |
ae671c5f | 500 | |
60e0cd04 AZ |
501 | if (sync_from) { |
502 | free(sync_from); | |
503 | sync_from = NULL; | |
ae671c5f | 504 | } |
599c0f44 AZ |
505 | |
506 | request_ids_destroy(); | |
5e8bc3c5 | 507 | replication_dbs_destroy(); |
23c16b51 AZ |
508 | |
509 | shash_destroy(&local_dbs); | |
ae671c5f MC |
510 | } |
511 | ||
6ab3dd96 AZ |
512 | static struct ovsdb * |
513 | find_db(const char *db_name) | |
ae671c5f | 514 | { |
23c16b51 | 515 | return shash_find_data(replication_dbs, db_name); |
ae671c5f MC |
516 | } |
517 | \f | |
518 | static struct ovsdb_error * | |
23c16b51 | 519 | reset_database(struct ovsdb *db) |
ae671c5f | 520 | { |
23c16b51 | 521 | struct ovsdb_txn *txn = ovsdb_txn_create(db); |
ae671c5f MC |
522 | struct shash_node *table_node; |
523 | ||
524 | SHASH_FOR_EACH (table_node, &db->tables) { | |
3109b4e1 AZ |
525 | /* Delete all rows if the table is not blacklisted. */ |
526 | if (!blacklist_tables_find(db->schema->name, table_node->name)) { | |
527 | struct ovsdb_table *table = table_node->data; | |
45b4b20a BP |
528 | struct ovsdb_row *row, *next; |
529 | HMAP_FOR_EACH_SAFE (row, next, hmap_node, &table->rows) { | |
7a9d65d2 MC |
530 | ovsdb_txn_row_delete(txn, row); |
531 | } | |
ae671c5f MC |
532 | } |
533 | } | |
ae671c5f | 534 | |
1b1d2e6d | 535 | return ovsdb_txn_propose_commit_block(txn, false); |
ae671c5f MC |
536 | } |
537 | ||
23c16b51 AZ |
538 | /* Create a monitor request for 'db'. The monitor request will include |
539 | * any tables from 'blacklisted_tables' | |
540 | * | |
541 | * Caller is responsible for disposing 'request'. | |
542 | */ | |
543 | static struct jsonrpc_msg * | |
544 | create_monitor_request(struct ovsdb *db) | |
ae671c5f | 545 | { |
23c16b51 AZ |
546 | struct jsonrpc_msg *request; |
547 | struct json *monitor; | |
548 | struct ovsdb_schema *schema = db->schema; | |
549 | const char *db_name = schema->name; | |
ae671c5f | 550 | |
23c16b51 AZ |
551 | struct json *monitor_request = json_object_create(); |
552 | size_t n = shash_count(&schema->tables); | |
553 | const struct shash_node **nodes = shash_sort(&schema->tables); | |
ae671c5f | 554 | |
23c16b51 AZ |
555 | for (int j = 0; j < n; j++) { |
556 | struct ovsdb_table_schema *table = nodes[j]->data; | |
ae671c5f | 557 | |
23c16b51 AZ |
558 | /* Monitor all tables not blacklisted. */ |
559 | if (!blacklist_tables_find(db_name, table->name)) { | |
560 | add_monitored_table(table, monitor_request); | |
ae671c5f MC |
561 | } |
562 | } | |
23c16b51 | 563 | free(nodes); |
ae671c5f | 564 | |
23c16b51 AZ |
565 | /* Create a monitor request. */ |
566 | monitor = json_array_create_3( | |
567 | json_string_create(db_name), | |
568 | json_string_create(db_name), | |
569 | monitor_request); | |
570 | request = jsonrpc_create_request("monitor", monitor, NULL); | |
0cedc9db | 571 | |
23c16b51 | 572 | return request; |
ae671c5f MC |
573 | } |
574 | ||
575 | static void | |
576 | add_monitored_table(struct ovsdb_table_schema *table, | |
577 | struct json *monitor_request) | |
578 | { | |
579 | struct json *monitor_request_array; | |
580 | ||
ae671c5f MC |
581 | monitor_request_array = json_array_create_empty(); |
582 | json_array_add(monitor_request_array, json_object_create()); | |
583 | ||
584 | json_object_put(monitor_request, table->name, monitor_request_array); | |
585 | } | |
ae671c5f | 586 | |
23c16b51 AZ |
587 | \f |
588 | static struct ovsdb_error * | |
6ab3dd96 | 589 | process_notification(struct json *table_updates, struct ovsdb *db) |
ae671c5f | 590 | { |
23c16b51 | 591 | struct ovsdb_error *error = NULL; |
ae671c5f MC |
592 | struct ovsdb_txn *txn; |
593 | ||
23c16b51 AZ |
594 | if (table_updates->type == JSON_OBJECT) { |
595 | txn = ovsdb_txn_create(db); | |
596 | ||
597 | /* Process each table update. */ | |
598 | struct shash_node *node; | |
599 | SHASH_FOR_EACH (node, json_object(table_updates)) { | |
600 | struct json *table_update = node->data; | |
601 | if (table_update) { | |
602 | error = process_table_update(table_update, node->name, db, txn); | |
603 | if (error) { | |
604 | break; | |
605 | } | |
ae671c5f MC |
606 | } |
607 | } | |
ae671c5f | 608 | |
23c16b51 AZ |
609 | if (error) { |
610 | ovsdb_txn_abort(txn); | |
611 | return error; | |
612 | } else { | |
613 | /* Commit transaction. */ | |
1b1d2e6d | 614 | error = ovsdb_txn_propose_commit_block(txn, false); |
23c16b51 | 615 | } |
ae671c5f MC |
616 | } |
617 | ||
23c16b51 | 618 | return error; |
ae671c5f MC |
619 | } |
620 | ||
621 | static struct ovsdb_error * | |
622 | process_table_update(struct json *table_update, const char *table_name, | |
623 | struct ovsdb *database, struct ovsdb_txn *txn) | |
624 | { | |
1cb0f53c BP |
625 | struct ovsdb_table *table = ovsdb_get_table(database, table_name); |
626 | if (!table) { | |
627 | return ovsdb_error("unknown table", "unknown table %s", table_name); | |
628 | } | |
ae671c5f MC |
629 | |
630 | if (table_update->type != JSON_OBJECT) { | |
43898d4c WT |
631 | return ovsdb_error("Not a JSON object", |
632 | "<table-update> for table is not object"); | |
ae671c5f MC |
633 | } |
634 | ||
1cb0f53c | 635 | struct shash_node *node; |
ae671c5f MC |
636 | SHASH_FOR_EACH (node, json_object(table_update)) { |
637 | struct json *row_update = node->data; | |
638 | struct json *old, *new; | |
639 | ||
640 | if (row_update->type != JSON_OBJECT) { | |
1cb0f53c BP |
641 | return ovsdb_error("Not a JSON object", |
642 | "<row-update> is not object"); | |
643 | } | |
644 | ||
645 | struct uuid uuid; | |
646 | if (!uuid_from_string(&uuid, node->name)) { | |
647 | return ovsdb_syntax_error(table_update, "bad row UUID", | |
648 | "<table-update> names must be UUIDs"); | |
ae671c5f | 649 | } |
1cb0f53c | 650 | |
ae671c5f MC |
651 | old = shash_find_data(json_object(row_update), "old"); |
652 | new = shash_find_data(json_object(row_update), "new"); | |
653 | ||
1cb0f53c BP |
654 | struct ovsdb_error *error; |
655 | error = (!new ? execute_delete(txn, &uuid, table) | |
656 | : !old ? execute_insert(txn, &uuid, table, new) | |
657 | : execute_update(txn, &uuid, table, new)); | |
c9c5c9e2 | 658 | if (error) { |
1cb0f53c | 659 | return error; |
c9c5c9e2 | 660 | } |
ae671c5f | 661 | } |
1cb0f53c | 662 | return NULL; |
ae671c5f MC |
663 | } |
664 | \f | |
665 | static struct ovsdb_error * | |
1cb0f53c | 666 | execute_insert(struct ovsdb_txn *txn, const struct uuid *row_uuid, |
ae671c5f MC |
667 | struct ovsdb_table *table, struct json *json_row) |
668 | { | |
1cb0f53c BP |
669 | struct ovsdb_row *row = ovsdb_row_create(table); |
670 | struct ovsdb_error *error = ovsdb_row_from_json(row, json_row, NULL, NULL); | |
ae671c5f | 671 | if (!error) { |
1cb0f53c | 672 | *ovsdb_row_get_uuid_rw(row) = *row_uuid; |
ae671c5f MC |
673 | ovsdb_txn_row_insert(txn, row); |
674 | } else { | |
1cb0f53c BP |
675 | static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); |
676 | VLOG_WARN_RL(&rl, "cannot add existing row "UUID_FMT" to table %s", | |
677 | UUID_ARGS(row_uuid), table->schema->name); | |
ae671c5f MC |
678 | ovsdb_row_destroy(row); |
679 | } | |
680 | ||
681 | return error; | |
682 | } | |
683 | ||
ae671c5f | 684 | static struct ovsdb_error * |
1cb0f53c | 685 | execute_delete(struct ovsdb_txn *txn, const struct uuid *row_uuid, |
ae671c5f MC |
686 | struct ovsdb_table *table) |
687 | { | |
1cb0f53c BP |
688 | const struct ovsdb_row *row = ovsdb_table_get_row(table, row_uuid); |
689 | if (row) { | |
690 | ovsdb_txn_row_delete(txn, row); | |
691 | } else { | |
692 | static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); | |
693 | VLOG_WARN_RL(&rl, "cannot delete missing row "UUID_FMT" from table %s", | |
694 | UUID_ARGS(row_uuid), table->schema->name); | |
ae671c5f | 695 | } |
1cb0f53c | 696 | return NULL; |
ae671c5f MC |
697 | } |
698 | ||
699 | static struct ovsdb_error * | |
1cb0f53c | 700 | execute_update(struct ovsdb_txn *txn, const struct uuid *row_uuid, |
ae671c5f MC |
701 | struct ovsdb_table *table, struct json *json_row) |
702 | { | |
1cb0f53c BP |
703 | const struct ovsdb_row *row = ovsdb_table_get_row(table, row_uuid); |
704 | if (!row) { | |
705 | static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); | |
706 | VLOG_WARN_RL(&rl, "cannot modify missing row "UUID_FMT" in table %s", | |
707 | UUID_ARGS(row_uuid), table->schema->name); | |
708 | return NULL; | |
ae671c5f | 709 | } |
1cb0f53c BP |
710 | |
711 | struct ovsdb_column_set columns = OVSDB_COLUMN_SET_INITIALIZER; | |
712 | struct ovsdb_row *update = ovsdb_row_create(table); | |
713 | struct ovsdb_error *error = ovsdb_row_from_json(update, json_row, | |
714 | NULL, &columns); | |
715 | ||
716 | if (!error && !ovsdb_row_equal_columns(row, update, &columns)) { | |
717 | ovsdb_row_update_columns(ovsdb_txn_row_modify(txn, row), | |
718 | update, &columns); | |
ae671c5f MC |
719 | } |
720 | ||
ae671c5f | 721 | ovsdb_column_set_destroy(&columns); |
1cb0f53c | 722 | ovsdb_row_destroy(update); |
ae671c5f MC |
723 | return error; |
724 | } | |
725 | ||
599c0f44 AZ |
726 | void |
727 | request_ids_add(const struct json *id, struct ovsdb *db) | |
728 | { | |
729 | struct request_ids_hmap_node *node = xmalloc(sizeof *node); | |
730 | ||
731 | node->request_id = json_clone(id); | |
732 | node->db = db; | |
733 | hmap_insert(&request_ids, &node->hmap, json_hash(id, 0)); | |
734 | } | |
735 | ||
736 | /* Look up 'id' from 'request_ids', if found, remove the found id from | |
737 | * 'request_ids' and free its memory. If not found, 'request_ids' does | |
738 | * not change. Sets '*db' to the database for the request (NULL if not | |
739 | * found). | |
740 | * | |
741 | * Return true if 'id' is found, false otherwise. | |
742 | */ | |
743 | bool | |
744 | request_ids_lookup_and_free(const struct json *id, struct ovsdb **db) | |
745 | { | |
746 | struct request_ids_hmap_node *node; | |
747 | ||
748 | HMAP_FOR_EACH_WITH_HASH (node, hmap, json_hash(id, 0), &request_ids) { | |
749 | if (json_equal(id, node->request_id)) { | |
750 | hmap_remove(&request_ids, &node->hmap); | |
751 | *db = node->db; | |
752 | json_destroy(node->request_id); | |
753 | free(node); | |
754 | return true; | |
755 | } | |
756 | } | |
757 | ||
758 | *db = NULL; | |
759 | return false; | |
760 | } | |
761 | ||
762 | static void | |
763 | request_ids_destroy(void) | |
764 | { | |
765 | struct request_ids_hmap_node *node; | |
766 | ||
767 | HMAP_FOR_EACH_POP (node, hmap, &request_ids) { | |
768 | json_destroy(node->request_id); | |
769 | free(node); | |
770 | } | |
771 | hmap_destroy(&request_ids); | |
772 | } | |
773 | ||
774 | void | |
775 | request_ids_clear(void) | |
776 | { | |
777 | request_ids_destroy(); | |
778 | hmap_init(&request_ids); | |
779 | } | |
780 | ||
23c16b51 AZ |
781 | static struct shash * |
782 | replication_db_clone(struct shash *dbs) | |
783 | { | |
784 | struct shash *new = xmalloc(sizeof *new); | |
785 | shash_init(new); | |
786 | ||
787 | struct shash_node *node; | |
788 | SHASH_FOR_EACH (node, dbs) { | |
789 | shash_add(new, node->name, node->data); | |
790 | } | |
791 | ||
792 | return new; | |
793 | } | |
794 | ||
5e8bc3c5 AZ |
795 | static void |
796 | replication_dbs_destroy(void) | |
797 | { | |
798 | shash_destroy(replication_dbs); | |
799 | free(replication_dbs); | |
800 | replication_dbs = NULL; | |
801 | } | |
802 | ||
23c16b51 AZ |
803 | /* Return true if replication just started or is ongoing. |
804 | * Return false if the connection failed, or the replication | |
805 | * was not able to start. */ | |
806 | bool | |
807 | replication_is_alive(void) | |
808 | { | |
809 | if (session) { | |
810 | return jsonrpc_session_is_alive(session) && state != RPL_S_ERR; | |
811 | } | |
812 | return false; | |
813 | } | |
814 | ||
815 | /* Return the last error reported on a connection by 'session'. The | |
816 | * return value is 0 if replication is not currently running, or | |
817 | * if replication session has not encountered any error. | |
818 | * | |
819 | * Return a negative value if replication session has error, or the | |
820 | * replication was not able to start. */ | |
821 | int | |
822 | replication_get_last_error(void) | |
823 | { | |
824 | int err = 0; | |
825 | ||
826 | if (session) { | |
827 | err = jsonrpc_session_get_last_error(session); | |
828 | if (!err) { | |
829 | err = (state == RPL_S_ERR) ? ENOENT : 0; | |
830 | } | |
831 | } | |
832 | ||
833 | return err; | |
834 | } | |
835 | ||
60e0cd04 AZ |
836 | char * |
837 | replication_status(void) | |
838 | { | |
839 | bool alive = session && jsonrpc_session_is_alive(session); | |
840 | struct ds ds = DS_EMPTY_INITIALIZER; | |
841 | ||
842 | if (alive) { | |
843 | switch(state) { | |
844 | case RPL_S_INIT: | |
05ac209a | 845 | case RPL_S_SERVER_ID_REQUESTED: |
60e0cd04 AZ |
846 | case RPL_S_DB_REQUESTED: |
847 | case RPL_S_SCHEMA_REQUESTED: | |
848 | case RPL_S_MONITOR_REQUESTED: | |
849 | ds_put_format(&ds, "connecting: %s", sync_from); | |
850 | break; | |
851 | case RPL_S_REPLICATING: { | |
852 | struct shash_node *node; | |
853 | ||
854 | ds_put_format(&ds, "replicating: %s\n", sync_from); | |
855 | ds_put_cstr(&ds, "database:"); | |
856 | SHASH_FOR_EACH (node, replication_dbs) { | |
857 | ds_put_format(&ds, " %s,", node->name); | |
858 | } | |
859 | ds_chomp(&ds, ','); | |
860 | ||
861 | if (!shash_is_empty(&blacklist_tables)) { | |
862 | ds_put_char(&ds, '\n'); | |
863 | ds_put_cstr(&ds, "exclude: "); | |
864 | ds_put_and_free_cstr(&ds, get_blacklist_tables()); | |
865 | } | |
866 | break; | |
867 | } | |
868 | case RPL_S_ERR: | |
869 | ds_put_format(&ds, "Replication to (%s) failed\n", sync_from); | |
870 | break; | |
871 | default: | |
872 | OVS_NOT_REACHED(); | |
60e0cd04 AZ |
873 | } |
874 | } else { | |
875 | ds_put_format(&ds, "not connected to %s", sync_from); | |
876 | } | |
877 | return ds_steal_cstr(&ds); | |
878 | } | |
879 | ||
ae671c5f MC |
880 | /* Prints the "Syncing options" section of the command-line help to stdout. */ void |
881 | replication_usage(void) | |
882 | { | |
883 | printf("\n\ | |
884 | Syncing options:\n\ | |
60e0cd04 AZ |
885 | --sync-from=SERVER sync DATABASE from active SERVER and start in\n\ |
886 | backup mode (except with --active)\n\ | |
ae671c5f | 887 | --sync-exclude-tables=DB:TABLE,...\n\ |
60e0cd04 AZ |
888 | exclude the TABLE in DB from syncing\n\ |
889 | --active with --sync-from, start in active mode\n"); | |
ae671c5f | 890 | } |