]>
Commit | Line | Data |
---|---|---|
a5c067a8 BP |
1 | /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017 Nicira, Inc. |
2 | * Copyright (C) 2016 Hewlett Packard Enterprise Development LP | |
3 | * | |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | * you may not use this file except in compliance with the License. | |
6 | * You may obtain a copy of the License at: | |
7 | * | |
8 | * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | * | |
10 | * Unless required by applicable law or agreed to in writing, software | |
11 | * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | * WITHOUT WARRANTIES OR CONDITION OF ANY KIND, either express or implied. | |
13 | * See the License for the specific language governing permissions and | |
14 | * limitations under the License. | |
15 | */ | |
16 | ||
17 | #include <config.h> | |
18 | ||
19 | #include "ovsdb-cs.h" | |
20 | ||
21 | #include <errno.h> | |
22 | ||
23 | #include "hash.h" | |
24 | #include "jsonrpc.h" | |
25 | #include "openvswitch/dynamic-string.h" | |
26 | #include "openvswitch/hmap.h" | |
27 | #include "openvswitch/json.h" | |
28 | #include "openvswitch/poll-loop.h" | |
29 | #include "openvswitch/shash.h" | |
30 | #include "openvswitch/vlog.h" | |
31 | #include "ovsdb-data.h" | |
32 | #include "ovsdb-error.h" | |
33 | #include "ovsdb-parser.h" | |
34 | #include "ovsdb-session.h" | |
35 | #include "ovsdb-types.h" | |
36 | #include "sset.h" | |
37 | #include "svec.h" | |
38 | #include "util.h" | |
39 | #include "uuid.h" | |
40 | ||
41 | VLOG_DEFINE_THIS_MODULE(ovsdb_cs); | |
1c337c43 BP |
42 | |
/* Connection state machine.
 *
 * When a JSON-RPC session connects, the CS layer sends a "get_schema"
 * request for the _Server database and transitions to the
 * CS_S_SERVER_SCHEMA_REQUESTED state.  If the session drops and
 * reconnects, or if the CS receives a "monitor_canceled" notification for a
 * table it is monitoring, the CS starts over again in the same way.
 *
 * Each OVSDB_CS_STATE(NAME) entry below expands to one CS_S_<NAME> state. */
#define OVSDB_CS_STATES                                                 \
    /* Waits for "get_schema" reply for the _Server database:          \
     *                                                                  \
     *   - On success, sends "monitor_cond" request for the Database   \
     *     table in the _Server database, whose details are informed   \
     *     by the schema, and transitions to                           \
     *     CS_S_SERVER_MONITOR_REQUESTED.                              \
     *                                                                  \
     *   - On failure, sends a "get_schema" request for the CS         \
     *     database and transitions to CS_S_DATA_SCHEMA_REQUESTED. */  \
    OVSDB_CS_STATE(SERVER_SCHEMA_REQUESTED)                             \
                                                                        \
    /* Waits for "monitor_cond" reply for the Database table:          \
     *                                                                  \
     *   - If the reply indicates success, and the Database table has a \
     *     row for the CS database:                                    \
     *                                                                  \
     *       * If the row indicates that this is a clustered database  \
     *         that is not connected to the cluster, closes the        \
     *         connection.  The next connection attempt has a chance at \
     *         picking a connected server.                             \
     *                                                                  \
     *       * Otherwise, sends a monitoring request for the CS        \
     *         database whose details are informed by the schema       \
     *         (obtained from the row), and transitions to             \
     *         CS_S_DATA_MONITOR_(COND_(SINCE_))REQUESTED.             \
     *                                                                  \
     *   - If the reply indicates success, but the Database table does \
     *     not have a row for the CS database, transitions to          \
     *     CS_S_ERROR.                                                 \
     *                                                                  \
     *   - If the reply indicates failure, sends a "get_schema" request \
     *     for the CS database and transitions to                      \
     *     CS_S_DATA_SCHEMA_REQUESTED. */                              \
    OVSDB_CS_STATE(SERVER_MONITOR_REQUESTED)                            \
                                                                        \
    /* Waits for "get_schema" reply for the CS database, then sends a  \
     * "monitor_cond" request (or a plain "monitor" request if the     \
     * maximum monitor version allowed for the database is below 2)    \
     * whose details are informed by the schema, and transitions to    \
     * CS_S_DATA_MONITOR_COND_REQUESTED (or                            \
     * CS_S_DATA_MONITOR_REQUESTED).  On failure, forces a reconnect   \
     * and transitions to CS_S_RETRY. */                               \
    OVSDB_CS_STATE(DATA_SCHEMA_REQUESTED)                               \
                                                                        \
    /* Waits for "monitor_cond_since" reply.  If successful, replaces  \
     * the CS contents by the data carried in the reply and            \
     * transitions to CS_S_MONITORING.  On failure, sends a            \
     * "monitor_cond" request and transitions to                       \
     * CS_S_DATA_MONITOR_COND_REQUESTED. */                            \
    OVSDB_CS_STATE(DATA_MONITOR_COND_SINCE_REQUESTED)                   \
                                                                        \
    /* Waits for "monitor_cond" reply.  If successful, replaces the    \
     * CS contents by the data carried in the reply and transitions    \
     * to CS_S_MONITORING.  On failure, sends a "monitor" request      \
     * and transitions to CS_S_DATA_MONITOR_REQUESTED. */              \
    OVSDB_CS_STATE(DATA_MONITOR_COND_REQUESTED)                         \
                                                                        \
    /* Waits for "monitor" reply.  If successful, replaces the CS      \
     * contents by the data carried in the reply and transitions to    \
     * CS_S_MONITORING.  On failure, forces a reconnect and            \
     * transitions to CS_S_RETRY. */                                   \
    OVSDB_CS_STATE(DATA_MONITOR_REQUESTED)                              \
                                                                        \
    /* State that processes "update", "update2" or "update3"           \
     * notifications for the main database (and the Database table     \
     * in _Server if available).                                       \
     *                                                                  \
     * If we're monitoring the Database table and we get notified      \
     * that the CS database has been deleted, we close the             \
     * connection (which will restart the state machine). */           \
    OVSDB_CS_STATE(MONITORING)                                          \
                                                                        \
    /* Terminal error state that indicates that nothing useful can be  \
     * done, for example because the database server doesn't actually  \
     * have the desired database.  We maintain the session with the    \
     * database server anyway.  If it starts serving the database      \
     * that we want, or if someone fixes and restarts the database,    \
     * then it will kill the session and we will automatically         \
     * reconnect and try again. */                                     \
    OVSDB_CS_STATE(ERROR)                                               \
                                                                        \
    /* Terminal state that indicates we connected to a useless server  \
     * in a cluster, e.g. one that is partitioned from the rest of     \
     * the cluster. We're waiting to retry. */                         \
    OVSDB_CS_STATE(RETRY)
126 | ||
/* The connection states: one CS_S_<NAME> enumerator for each
 * OVSDB_CS_STATE(NAME) entry in OVSDB_CS_STATES, in the order listed there. */
enum ovsdb_cs_state {
#define OVSDB_CS_STATE(NAME) CS_S_##NAME,
    OVSDB_CS_STATES
#undef OVSDB_CS_STATE
};
132 | ||
133 | static const char * | |
134 | ovsdb_cs_state_to_string(enum ovsdb_cs_state state) | |
135 | { | |
136 | switch (state) { | |
137 | #define OVSDB_CS_STATE(NAME) case CS_S_##NAME: return #NAME; | |
138 | OVSDB_CS_STATES | |
139 | #undef OVSDB_CS_STATE | |
140 | default: return "<unknown>"; | |
141 | } | |
142 | } | |
143 | ||
/* A database being monitored.
 *
 * There are two instances of this data structure for each CS instance, one for
 * the _Server database used for working with clusters, and the other one for
 * the actual database that the client is interested in. */
struct ovsdb_cs_db {
    struct ovsdb_cs *cs;        /* The CS instance that embeds this db. */

    /* Data. */
    const char *db_name;        /* Database's name (stored, not copied). */
    struct hmap tables;         /* Contains "struct ovsdb_cs_db_table *"s.*/
    struct json *monitor_id;    /* JSON array ["monid", <db_name>]. */
    struct json *schema;        /* Clone of the last successful "get_schema"
                                 * result for this database, if any. */

    /* Monitor version. */
    int max_version;            /* Maximum version of monitor request to use. */
    int monitor_version;        /* 0 if not monitoring, 1=monitor,
                                 * 2=monitor_cond, 3=monitor_cond_since. */

    /* Condition changes. */
    bool cond_changed;          /* Change not yet sent to server? */
    unsigned int cond_seqno;    /* Increments when condition changes. */

    /* Database locking. */
    char *lock_name;            /* Name of lock we need, NULL if none. */
    bool has_lock;              /* Has db server told us we have the lock? */
    bool is_lock_contended;     /* Has db server told us we can't get lock? */
    struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */

    /* Last db txn id, used for fast resync through monitor_cond_since */
    struct uuid last_id;

    /* Client interface. */
    struct ovs_list events;     /* Queue of "struct ovsdb_cs_event"s; moved to
                                 * the caller's list by ovsdb_cs_run(). */
    const struct ovsdb_cs_ops *ops; /* Callbacks supplied at creation. */
    void *ops_aux;              /* Opaque pointer supplied along with 'ops'. */
};
181 | ||
/* Callbacks used for the _Server database (see ovsdb_cs_create()). */
static const struct ovsdb_cs_ops ovsdb_cs_server_ops;

/* Prototypes for file-local helpers that are used before their definition. */
static void ovsdb_cs_db_destroy_tables(struct ovsdb_cs_db *);
static unsigned int ovsdb_cs_db_set_condition(
    struct ovsdb_cs_db *, const char *db_name, const struct json *condition);

static void ovsdb_cs_send_schema_request(struct ovsdb_cs *,
                                         struct ovsdb_cs_db *);
static void ovsdb_cs_send_db_change_aware(struct ovsdb_cs *);
static bool ovsdb_cs_check_server_db(struct ovsdb_cs *);
static void ovsdb_cs_clear_server_rows(struct ovsdb_cs *);
static void ovsdb_cs_send_monitor_request(struct ovsdb_cs *,
                                          struct ovsdb_cs_db *, int version);
static void ovsdb_cs_db_ack_condition(struct ovsdb_cs_db *db);
static void ovsdb_cs_db_sync_condition(struct ovsdb_cs_db *db);
197 | ||
/* A client synchronization (CS) instance: one JSON-RPC session plus the state
 * for the two databases monitored through it. */
struct ovsdb_cs {
    struct ovsdb_cs_db server;  /* The _Server database. */
    struct ovsdb_cs_db data;    /* The database the client asked for. */

    /* Session state.
     *
     * 'state_seqno' is a snapshot of the session's sequence number as returned
     * by jsonrpc_session_get_seqno(session), so if it differs from the value
     * that function currently returns then the session has reconnected and the
     * state machine must restart. */
    struct jsonrpc_session *session; /* Connection to the server. */
    char *remote;                    /* 'session' remote name. */
    enum ovsdb_cs_state state;       /* Current session state. */
    unsigned int state_seqno;        /* See above. */
    struct json *request_id;         /* JSON ID for request awaiting reply. */

    /* IDs of outstanding transactions. */
    struct json **txns;
    size_t n_txns, allocated_txns;

    /* Info for the _Server database. */
    struct uuid cid;
    struct hmap server_rows;

    /* Clustered servers. */
    uint64_t min_index;    /* Minimum allowed index, to avoid regression. */
    bool leader_only;      /* If true, do not connect to Raft followers. */
    bool shuffle_remotes;  /* If true, connect to servers in random order. */
};
227 | ||
/* State transitions.  The ovsdb_cs_transition() wrapper passes the caller's
 * source location so that the debug log shows where a transition happened. */
static void ovsdb_cs_transition_at(struct ovsdb_cs *, enum ovsdb_cs_state,
                                   const char *where);
#define ovsdb_cs_transition(CS, STATE) \
    ovsdb_cs_transition_at(CS, STATE, OVS_SOURCE_LOCATOR)

/* Forced reconnection, likewise tagged with the caller's source location. */
static void ovsdb_cs_retry_at(struct ovsdb_cs *, const char *where);
#define ovsdb_cs_retry(CS) ovsdb_cs_retry_at(CS, OVS_SOURCE_LOCATOR)

/* Rate limiter shared by this file's log messages (its users are outside
 * this part of the file). */
static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);

/* Monitor replies, update notifications and monitor cancellation. */
static void ovsdb_cs_db_parse_monitor_reply(struct ovsdb_cs_db *,
                                            const struct json *result,
                                            int version);
static bool ovsdb_cs_db_parse_update_rpc(struct ovsdb_cs_db *,
                                         const struct jsonrpc_msg *);
static bool ovsdb_cs_handle_monitor_canceled(struct ovsdb_cs *,
                                             struct ovsdb_cs_db *,
                                             const struct jsonrpc_msg *);

/* Database locking and condition changes. */
static bool ovsdb_cs_db_process_lock_replies(struct ovsdb_cs_db *,
                                             const struct jsonrpc_msg *);
static struct jsonrpc_msg *ovsdb_cs_db_compose_lock_request(
    struct ovsdb_cs_db *);
static struct jsonrpc_msg *ovsdb_cs_db_compose_unlock_request(
    struct ovsdb_cs_db *);
static void ovsdb_cs_db_parse_lock_reply(struct ovsdb_cs_db *,
                                         const struct json *);
static bool ovsdb_cs_db_parse_lock_notify(struct ovsdb_cs_db *,
                                          const struct json *params,
                                          bool new_has_lock);
static void ovsdb_cs_send_cond_change(struct ovsdb_cs *);

/* Transaction replies. */
static bool ovsdb_cs_db_txn_process_reply(struct ovsdb_cs *,
                                          const struct jsonrpc_msg *reply);
262 | \f | |
263 | /* Events. */ | |
264 | ||
265 | void | |
266 | ovsdb_cs_event_destroy(struct ovsdb_cs_event *event) | |
267 | { | |
268 | if (event) { | |
269 | switch (event->type) { | |
270 | case OVSDB_CS_EVENT_TYPE_RECONNECT: | |
271 | case OVSDB_CS_EVENT_TYPE_LOCKED: | |
272 | break; | |
273 | ||
274 | case OVSDB_CS_EVENT_TYPE_UPDATE: | |
275 | json_destroy(event->update.table_updates); | |
276 | break; | |
277 | ||
278 | case OVSDB_CS_EVENT_TYPE_TXN_REPLY: | |
279 | jsonrpc_msg_destroy(event->txn_reply); | |
280 | break; | |
281 | } | |
282 | free(event); | |
283 | } | |
284 | } | |
285 | \f | |
286 | /* Lifecycle. */ | |
287 | ||
288 | static void | |
289 | ovsdb_cs_db_init(struct ovsdb_cs_db *db, const char *db_name, | |
290 | struct ovsdb_cs *parent, int max_version, | |
291 | const struct ovsdb_cs_ops *ops, void *ops_aux) | |
292 | { | |
293 | *db = (struct ovsdb_cs_db) { | |
294 | .cs = parent, | |
295 | .db_name = db_name, | |
296 | .tables = HMAP_INITIALIZER(&db->tables), | |
297 | .max_version = max_version, | |
298 | .monitor_id = json_array_create_2(json_string_create("monid"), | |
299 | json_string_create(db_name)), | |
300 | .events = OVS_LIST_INITIALIZER(&db->events), | |
301 | .ops = ops, | |
302 | .ops_aux = ops_aux, | |
303 | }; | |
304 | } | |
305 | ||
306 | /* Creates and returns a new client synchronization object. The connection | |
307 | * will monitor remote database 'db_name'. If 'retry' is true, then also | |
308 | * reconnect if the connection fails. | |
309 | * | |
310 | * XXX 'max_version' should ordinarily be 3, to allow use of the most efficient | |
311 | * "monitor_cond_since" method with the database. Currently there's some kind | |
312 | * of bug in the DDlog Rust code that interfaces to that, so instead | |
313 | * ovn-northd-ddlog passes 1 to use plain 'monitor' instead. Once the DDlog | |
314 | * Rust code gets fixed, we might as well just delete 'max_version' | |
315 | * entirely. | |
316 | * | |
317 | * 'ops' is a struct for northd_cs_run() to use, and 'ops_aux' is a pointer | |
318 | * that gets passed into each call. | |
319 | * | |
320 | * Use ovsdb_cs_set_remote() to configure the database to which to connect. | |
321 | * Until a remote is configured, no data can be retrieved. | |
322 | */ | |
323 | struct ovsdb_cs * | |
324 | ovsdb_cs_create(const char *db_name, int max_version, | |
325 | const struct ovsdb_cs_ops *ops, void *ops_aux) | |
326 | { | |
327 | struct ovsdb_cs *cs = xzalloc(sizeof *cs); | |
328 | ovsdb_cs_db_init(&cs->server, "_Server", cs, 2, &ovsdb_cs_server_ops, cs); | |
329 | ovsdb_cs_db_init(&cs->data, db_name, cs, max_version, ops, ops_aux); | |
330 | cs->state_seqno = UINT_MAX; | |
331 | cs->request_id = NULL; | |
332 | cs->leader_only = true; | |
333 | cs->shuffle_remotes = true; | |
334 | hmap_init(&cs->server_rows); | |
335 | ||
336 | return cs; | |
337 | } | |
338 | ||
339 | static void | |
340 | ovsdb_cs_db_destroy(struct ovsdb_cs_db *db) | |
341 | { | |
342 | ovsdb_cs_db_destroy_tables(db); | |
343 | ||
344 | json_destroy(db->monitor_id); | |
345 | json_destroy(db->schema); | |
346 | ||
347 | free(db->lock_name); | |
348 | ||
349 | json_destroy(db->lock_request_id); | |
350 | ||
351 | /* This list always gets flushed out at the end of ovsdb_cs_run(). */ | |
352 | ovs_assert(ovs_list_is_empty(&db->events)); | |
353 | } | |
354 | ||
355 | /* Destroys 'cs' and all of the data structures that it manages. */ | |
356 | void | |
357 | ovsdb_cs_destroy(struct ovsdb_cs *cs) | |
358 | { | |
359 | if (cs) { | |
360 | ovsdb_cs_db_destroy(&cs->server); | |
361 | ovsdb_cs_db_destroy(&cs->data); | |
362 | jsonrpc_session_close(cs->session); | |
363 | free(cs->remote); | |
364 | json_destroy(cs->request_id); | |
365 | ||
366 | for (size_t i = 0; i < cs->n_txns; i++) { | |
367 | json_destroy(cs->txns[i]); | |
368 | } | |
369 | free(cs->txns); | |
370 | ||
371 | ovsdb_cs_clear_server_rows(cs); | |
372 | hmap_destroy(&cs->server_rows); | |
373 | ||
374 | free(cs); | |
375 | } | |
376 | } | |
377 | ||
378 | static void | |
379 | ovsdb_cs_transition_at(struct ovsdb_cs *cs, enum ovsdb_cs_state new_state, | |
380 | const char *where) | |
381 | { | |
382 | VLOG_DBG("%s: %s -> %s at %s", | |
383 | cs->session ? jsonrpc_session_get_name(cs->session) : "void", | |
384 | ovsdb_cs_state_to_string(cs->state), | |
385 | ovsdb_cs_state_to_string(new_state), | |
386 | where); | |
387 | cs->state = new_state; | |
388 | } | |
389 | ||
390 | static void | |
391 | ovsdb_cs_send_request(struct ovsdb_cs *cs, struct jsonrpc_msg *request) | |
392 | { | |
393 | json_destroy(cs->request_id); | |
394 | cs->request_id = json_clone(request->id); | |
395 | if (cs->session) { | |
396 | jsonrpc_session_send(cs->session, request); | |
397 | } else { | |
398 | jsonrpc_msg_destroy(request); | |
399 | } | |
400 | } | |
401 | ||
/* Forces 'cs' to drop and re-establish its connection, then enters
 * CS_S_RETRY.  'where' identifies the call site for the debug log; callers
 * normally reach this function through the ovsdb_cs_retry() macro. */
static void
ovsdb_cs_retry_at(struct ovsdb_cs *cs, const char *where)
{
    ovsdb_cs_force_reconnect(cs);
    ovsdb_cs_transition_at(cs, CS_S_RETRY, where);
}
408 | ||
/* Restarts the connection state machine from its first state: requests the
 * schema of the _Server database, enters CS_S_SERVER_SCHEMA_REQUESTED, and
 * marks both databases as not being monitored. */
static void
ovsdb_cs_restart_fsm(struct ovsdb_cs *cs)
{
    /* Resync data DB table conditions to avoid missing updates due to
     * conditions that were in flight or changed locally while the connection
     * was down.
     */
    ovsdb_cs_db_sync_condition(&cs->data);

    ovsdb_cs_send_schema_request(cs, &cs->server);
    ovsdb_cs_transition(cs, CS_S_SERVER_SCHEMA_REQUESTED);

    /* A monitor version of 0 means "not monitoring". */
    cs->data.monitor_version = 0;
    cs->server.monitor_version = 0;
}
423 | ||
/* Advances the state machine of 'cs' in response to 'msg', which is the
 * reply (or error) to the request that 'cs' was waiting on.
 *
 * An error response is only tolerated in the states that have a fallback
 * (the two _Server states and the two conditional-monitor states).  In any
 * other state it is logged and the connection is retried. */
static void
ovsdb_cs_process_response(struct ovsdb_cs *cs, struct jsonrpc_msg *msg)
{
    bool ok = msg->type == JSONRPC_REPLY;
    if (!ok
        && cs->state != CS_S_SERVER_SCHEMA_REQUESTED
        && cs->state != CS_S_SERVER_MONITOR_REQUESTED
        && cs->state != CS_S_DATA_MONITOR_COND_REQUESTED
        && cs->state != CS_S_DATA_MONITOR_COND_SINCE_REQUESTED) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        char *s = jsonrpc_msg_to_string(msg);
        VLOG_INFO_RL(&rl, "%s: received unexpected %s response in "
                     "%s state: %s", jsonrpc_session_get_name(cs->session),
                     jsonrpc_msg_type_to_string(msg->type),
                     ovsdb_cs_state_to_string(cs->state),
                     s);
        free(s);
        ovsdb_cs_retry(cs);
        return;
    }

    switch (cs->state) {
    case CS_S_SERVER_SCHEMA_REQUESTED:
        if (ok) {
            /* Keep the _Server schema and start monitoring _Server. */
            json_destroy(cs->server.schema);
            cs->server.schema = json_clone(msg->result);
            ovsdb_cs_send_monitor_request(cs, &cs->server,
                                          cs->server.max_version);
            ovsdb_cs_transition(cs, CS_S_SERVER_MONITOR_REQUESTED);
        } else {
            /* No usable _Server schema: go straight to the data database. */
            ovsdb_cs_send_schema_request(cs, &cs->data);
            ovsdb_cs_transition(cs, CS_S_DATA_SCHEMA_REQUESTED);
        }
        break;

    case CS_S_SERVER_MONITOR_REQUESTED:
        if (ok) {
            cs->server.monitor_version = cs->server.max_version;
            ovsdb_cs_db_parse_monitor_reply(&cs->server, msg->result,
                                            cs->server.monitor_version);
            if (ovsdb_cs_check_server_db(cs)) {
                ovsdb_cs_send_db_change_aware(cs);
            }
        } else {
            /* Monitoring _Server failed: fall back to fetching the data
             * database's schema directly. */
            ovsdb_cs_send_schema_request(cs, &cs->data);
            ovsdb_cs_transition(cs, CS_S_DATA_SCHEMA_REQUESTED);
        }
        break;

    case CS_S_DATA_SCHEMA_REQUESTED:
        /* Only successful replies reach this point (see the check above). */
        json_destroy(cs->data.schema);
        cs->data.schema = json_clone(msg->result);
        if (cs->data.max_version >= 2) {
            ovsdb_cs_send_monitor_request(cs, &cs->data, 2);
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED);
        } else {
            ovsdb_cs_send_monitor_request(cs, &cs->data, 1);
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED);
        }
        break;

    case CS_S_DATA_MONITOR_COND_SINCE_REQUESTED:
        if (!ok) {
            /* "monitor_cond_since" not supported.  Try "monitor_cond". */
            ovsdb_cs_send_monitor_request(cs, &cs->data, 2);
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED);
        } else {
            cs->data.monitor_version = 3;
            ovsdb_cs_transition(cs, CS_S_MONITORING);
            ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 3);
        }
        break;

    case CS_S_DATA_MONITOR_COND_REQUESTED:
        if (!ok) {
            /* "monitor_cond" not supported.  Try "monitor". */
            ovsdb_cs_send_monitor_request(cs, &cs->data, 1);
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED);
        } else {
            cs->data.monitor_version = 2;
            ovsdb_cs_transition(cs, CS_S_MONITORING);
            ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 2);
        }
        break;

    case CS_S_DATA_MONITOR_REQUESTED:
        /* Only successful replies reach this point (see the check above). */
        cs->data.monitor_version = 1;
        ovsdb_cs_transition(cs, CS_S_MONITORING);
        ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 1);
        break;

    case CS_S_MONITORING:
        /* We don't normally have a request outstanding in this state.  If we
         * do, it's a "monitor_cond_change", which means that the conditional
         * monitor clauses were updated.
         *
         * Mark the last requested conditions as acked and if further
         * condition changes were pending, send them now. */
        ovsdb_cs_db_ack_condition(&cs->data);
        ovsdb_cs_send_cond_change(cs);
        cs->data.cond_seqno++;
        break;

    case CS_S_ERROR:
    case CS_S_RETRY:
        /* Nothing to do in this state. */
        break;

    default:
        OVS_NOT_REACHED();
    }
}
536 | ||
537 | static void | |
538 | ovsdb_cs_process_msg(struct ovsdb_cs *cs, struct jsonrpc_msg *msg) | |
539 | { | |
540 | bool is_response = (msg->type == JSONRPC_REPLY || | |
541 | msg->type == JSONRPC_ERROR); | |
542 | ||
543 | /* Process a reply to an outstanding request. */ | |
544 | if (is_response | |
545 | && cs->request_id && json_equal(cs->request_id, msg->id)) { | |
546 | json_destroy(cs->request_id); | |
547 | cs->request_id = NULL; | |
548 | ovsdb_cs_process_response(cs, msg); | |
549 | return; | |
550 | } | |
551 | ||
552 | /* Process database contents updates. */ | |
553 | if (ovsdb_cs_db_parse_update_rpc(&cs->data, msg)) { | |
554 | return; | |
555 | } | |
556 | if (cs->server.monitor_version | |
557 | && ovsdb_cs_db_parse_update_rpc(&cs->server, msg)) { | |
558 | ovsdb_cs_check_server_db(cs); | |
559 | return; | |
560 | } | |
561 | ||
562 | if (ovsdb_cs_handle_monitor_canceled(cs, &cs->data, msg) | |
563 | || (cs->server.monitor_version | |
564 | && ovsdb_cs_handle_monitor_canceled(cs, &cs->server, msg))) { | |
565 | return; | |
566 | } | |
567 | ||
568 | /* Process "lock" replies and related notifications. */ | |
569 | if (ovsdb_cs_db_process_lock_replies(&cs->data, msg)) { | |
570 | return; | |
571 | } | |
572 | ||
573 | /* Process response to a database transaction we submitted. */ | |
574 | if (is_response && ovsdb_cs_db_txn_process_reply(cs, msg)) { | |
575 | return; | |
576 | } | |
577 | ||
578 | /* Unknown message. Log at a low level because this can happen if | |
579 | * ovsdb_cs_txn_destroy() is called to destroy a transaction | |
580 | * before we receive the reply. | |
581 | * | |
582 | * (We could sort those out from other kinds of unknown messages by | |
583 | * using distinctive IDs for transactions, if it seems valuable to | |
584 | * do so, and then it would be possible to use different log | |
585 | * levels. XXX?) */ | |
586 | char *s = jsonrpc_msg_to_string(msg); | |
587 | VLOG_DBG("%s: received unexpected %s message: %s", | |
588 | jsonrpc_session_get_name(cs->session), | |
589 | jsonrpc_msg_type_to_string(msg->type), s); | |
590 | free(s); | |
591 | } | |
592 | ||
593 | static struct ovsdb_cs_event * | |
594 | ovsdb_cs_db_add_event(struct ovsdb_cs_db *db, enum ovsdb_cs_event_type type) | |
595 | { | |
596 | struct ovsdb_cs_event *event = xmalloc(sizeof *event); | |
597 | event->type = type; | |
598 | ovs_list_push_back(&db->events, &event->list_node); | |
599 | return event; | |
600 | } | |
601 | ||
602 | /* Processes a batch of messages from the database server on 'cs'. This may | |
603 | * cause the CS's contents to change. | |
604 | * | |
605 | * Initializes 'events' with a list of events that occurred on 'cs'. The | |
606 | * caller must process and destroy all of the events. */ | |
607 | void | |
608 | ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list *events) | |
609 | { | |
610 | ovs_list_init(events); | |
611 | if (!cs->session) { | |
612 | return; | |
613 | } | |
614 | ||
615 | ovsdb_cs_send_cond_change(cs); | |
616 | ||
617 | jsonrpc_session_run(cs->session); | |
618 | ||
619 | unsigned int seqno = jsonrpc_session_get_seqno(cs->session); | |
620 | if (cs->state_seqno != seqno) { | |
621 | cs->state_seqno = seqno; | |
622 | ovsdb_cs_restart_fsm(cs); | |
623 | ||
624 | for (size_t i = 0; i < cs->n_txns; i++) { | |
625 | json_destroy(cs->txns[i]); | |
626 | } | |
627 | cs->n_txns = 0; | |
628 | ||
629 | ovsdb_cs_db_add_event(&cs->data, OVSDB_CS_EVENT_TYPE_RECONNECT); | |
630 | ||
631 | if (cs->data.lock_name) { | |
632 | jsonrpc_session_send( | |
633 | cs->session, | |
634 | ovsdb_cs_db_compose_lock_request(&cs->data)); | |
635 | } | |
636 | } | |
637 | ||
638 | for (int i = 0; i < 50; i++) { | |
639 | struct jsonrpc_msg *msg = jsonrpc_session_recv(cs->session); | |
640 | if (!msg) { | |
641 | break; | |
642 | } | |
643 | ovsdb_cs_process_msg(cs, msg); | |
644 | jsonrpc_msg_destroy(msg); | |
645 | } | |
646 | ovs_list_push_back_all(events, &cs->data.events); | |
647 | } | |
648 | ||
649 | /* Arranges for poll_block() to wake up when ovsdb_cs_run() has something to | |
650 | * do or when activity occurs on a transaction on 'cs'. */ | |
651 | void | |
652 | ovsdb_cs_wait(struct ovsdb_cs *cs) | |
653 | { | |
654 | if (!cs->session) { | |
655 | return; | |
656 | } | |
657 | jsonrpc_session_wait(cs->session); | |
658 | jsonrpc_session_recv_wait(cs->session); | |
659 | } | |
660 | \f | |
661 | /* Network connection. */ | |
662 | ||
663 | /* Changes the remote and creates a new session. | |
664 | * | |
665 | * If 'retry' is true, the connection to the remote will automatically retry | |
666 | * when it fails. If 'retry' is false, the connection is one-time. */ | |
667 | void | |
668 | ovsdb_cs_set_remote(struct ovsdb_cs *cs, const char *remote, bool retry) | |
669 | { | |
670 | if (cs | |
671 | && ((remote != NULL) != (cs->remote != NULL) | |
672 | || (remote && cs->remote && strcmp(remote, cs->remote)))) { | |
673 | /* Close the old session, if any. */ | |
674 | if (cs->session) { | |
675 | jsonrpc_session_close(cs->session); | |
676 | cs->session = NULL; | |
677 | ||
678 | free(cs->remote); | |
679 | cs->remote = NULL; | |
680 | } | |
681 | ||
682 | /* Open new session, if any. */ | |
683 | if (remote) { | |
684 | struct svec remotes = SVEC_EMPTY_INITIALIZER; | |
685 | ovsdb_session_parse_remote(remote, &remotes, &cs->cid); | |
686 | if (cs->shuffle_remotes) { | |
687 | svec_shuffle(&remotes); | |
688 | } | |
689 | cs->session = jsonrpc_session_open_multiple(&remotes, retry); | |
690 | svec_destroy(&remotes); | |
691 | ||
692 | cs->state_seqno = UINT_MAX; | |
693 | ||
694 | cs->remote = xstrdup(remote); | |
695 | } | |
696 | } | |
697 | } | |
698 | ||
699 | /* Reconfigures 'cs' so that it would reconnect to the database, if | |
700 | * connection was dropped. */ | |
701 | void | |
702 | ovsdb_cs_enable_reconnect(struct ovsdb_cs *cs) | |
703 | { | |
704 | if (cs->session) { | |
705 | jsonrpc_session_enable_reconnect(cs->session); | |
706 | } | |
707 | } | |
708 | ||
709 | /* Forces 'cs' to drop its connection to the database and reconnect. In the | |
710 | * meantime, the contents of 'cs' will not change. */ | |
711 | void | |
712 | ovsdb_cs_force_reconnect(struct ovsdb_cs *cs) | |
713 | { | |
714 | if (cs->session) { | |
715 | jsonrpc_session_force_reconnect(cs->session); | |
716 | } | |
717 | } | |
718 | ||
719 | /* Drops 'cs''s current connection and the cached session. This is useful if | |
720 | * the client notices some kind of inconsistency. */ | |
721 | void | |
722 | ovsdb_cs_flag_inconsistency(struct ovsdb_cs *cs) | |
723 | { | |
724 | cs->data.last_id = UUID_ZERO; | |
725 | ovsdb_cs_retry(cs); | |
726 | } | |
727 | ||
728 | /* Returns true if 'cs' is currently connected or will eventually try to | |
729 | * reconnect. */ | |
730 | bool | |
731 | ovsdb_cs_is_alive(const struct ovsdb_cs *cs) | |
732 | { | |
733 | return (cs->session | |
734 | && jsonrpc_session_is_alive(cs->session) | |
735 | && cs->state != CS_S_ERROR); | |
736 | } | |
737 | ||
738 | /* Returns true if 'cs' is currently connected to a server. */ | |
739 | bool | |
740 | ovsdb_cs_is_connected(const struct ovsdb_cs *cs) | |
741 | { | |
742 | return cs->session && jsonrpc_session_is_connected(cs->session); | |
743 | } | |
744 | ||
745 | /* Returns the last error reported on a connection by 'cs'. The return value | |
746 | * is 0 only if no connection made by 'cs' has ever encountered an error and | |
747 | * a negative response to a schema request has never been received. See | |
748 | * jsonrpc_get_status() for jsonrpc_session_get_last_error() return value | |
749 | * interpretation. */ | |
750 | int | |
751 | ovsdb_cs_get_last_error(const struct ovsdb_cs *cs) | |
752 | { | |
753 | int err = cs->session ? jsonrpc_session_get_last_error(cs->session) : 0; | |
754 | if (err) { | |
755 | return err; | |
756 | } else if (cs->state == CS_S_ERROR) { | |
757 | return ENOENT; | |
758 | } else { | |
759 | return 0; | |
760 | } | |
761 | } | |
762 | ||
763 | /* Sets the "probe interval" for 'cs''s current session to 'probe_interval', in | |
764 | * milliseconds. */ | |
765 | void | |
766 | ovsdb_cs_set_probe_interval(const struct ovsdb_cs *cs, int probe_interval) | |
767 | { | |
768 | if (cs->session) { | |
769 | jsonrpc_session_set_probe_interval(cs->session, probe_interval); | |
770 | } | |
771 | } | |
772 | \f | |
773 | /* Conditional monitoring. */ | |
774 | ||
/* A table being monitored.
 *
 * At the CS layer, the only thing we care about, table-wise, is the conditions
 * we're using for monitoring them, so there's little here.  We only create
 * these table structures at all for tables that have conditions. */
struct ovsdb_cs_db_table {
    struct hmap_node hmap_node; /* In the owning db's 'tables' map, indexed by
                                 * 'name'. */
    char *name;                 /* Table name. */

    /* Each of these is a null pointer if it is empty, or JSON [<condition>*]
     * or [true] or [false] otherwise.  [true] could be represented as a null
     * pointer, but we want to distinguish "empty slot" from "a condition that
     * is always true" in a slot. */
    struct json *ack_cond; /* Last condition acked by the server. */
    struct json *req_cond; /* Last condition requested to the server. */
    struct json *new_cond; /* Latest condition set by the IDL client. */
};
792 | ||
/* A kind of condition, so that we can treat equivalent JSON as equivalent.
 * condition_classify() maps a JSON condition to one of these. */
enum condition_type {
    COND_FALSE,                 /* [] or [false] */
    COND_TRUE,                  /* Null pointer or [true] */
    COND_OTHER                  /* Anything else. */
};
799 | ||
800 | /* Returns the condition_type for 'condition'. */ | |
801 | static enum condition_type | |
802 | condition_classify(const struct json *condition) | |
803 | { | |
804 | if (condition) { | |
805 | const struct json_array *a = json_array(condition); | |
806 | switch (a->n) { | |
807 | case 0: | |
808 | return COND_FALSE; | |
809 | ||
810 | case 1: | |
811 | return (a->elems[0]->type == JSON_FALSE ? COND_FALSE | |
812 | : a->elems[0]->type == JSON_TRUE ? COND_TRUE | |
813 | : COND_OTHER); | |
814 | ||
815 | default: | |
816 | return COND_OTHER; | |
817 | } | |
818 | } else { | |
819 | return COND_TRUE; | |
820 | } | |
821 | } | |
822 | ||
823 | /* Returns true if 'a' and 'b' are the same condition (in an obvious way; we're | |
824 | * not going to compare for boolean equivalence or anything). */ | |
825 | static bool | |
826 | condition_equal(const struct json *a, const struct json *b) | |
827 | { | |
828 | enum condition_type type = condition_classify(a); | |
829 | return (type == condition_classify(b) | |
830 | && (type != COND_OTHER || json_equal(a, b))); | |
831 | } | |
832 | ||
833 | /* Returns a clone of 'condition', translating always-true and always-false to | |
834 | * [true] and [false], respectively. */ | |
835 | static struct json * | |
836 | condition_clone(const struct json *condition) | |
837 | { | |
838 | switch (condition_classify(condition)) { | |
839 | case COND_TRUE: | |
840 | return json_array_create_1(json_boolean_create(true)); | |
841 | ||
842 | case COND_FALSE: | |
843 | return json_array_create_1(json_boolean_create(false)); | |
844 | ||
845 | case COND_OTHER: | |
846 | return json_clone(condition); | |
847 | } | |
848 | ||
849 | OVS_NOT_REACHED(); | |
850 | } | |
851 | ||
852 | /* Returns the ovsdb_cs_db_table associated with 'table' in 'db', creating an | |
853 | * empty one if necessary. */ | |
854 | static struct ovsdb_cs_db_table * | |
855 | ovsdb_cs_db_get_table(struct ovsdb_cs_db *db, const char *table) | |
856 | { | |
857 | uint32_t hash = hash_string(table, 0); | |
858 | struct ovsdb_cs_db_table *t; | |
859 | ||
860 | HMAP_FOR_EACH_WITH_HASH (t, hmap_node, hash, &db->tables) { | |
861 | if (!strcmp(t->name, table)) { | |
862 | return t; | |
863 | } | |
864 | } | |
865 | ||
866 | t = xzalloc(sizeof *t); | |
867 | t->name = xstrdup(table); | |
868 | t->new_cond = json_array_create_1(json_boolean_create(true)); | |
869 | hmap_insert(&db->tables, &t->hmap_node, hash); | |
870 | return t; | |
871 | } | |
872 | ||
873 | static void | |
874 | ovsdb_cs_db_destroy_tables(struct ovsdb_cs_db *db) | |
875 | { | |
876 | struct ovsdb_cs_db_table *table, *next; | |
877 | HMAP_FOR_EACH_SAFE (table, next, hmap_node, &db->tables) { | |
878 | json_destroy(table->ack_cond); | |
879 | json_destroy(table->req_cond); | |
880 | json_destroy(table->new_cond); | |
881 | hmap_remove(&db->tables, &table->hmap_node); | |
882 | free(table->name); | |
883 | free(table); | |
884 | } | |
885 | hmap_destroy(&db->tables); | |
886 | } | |
887 | ||
888 | static unsigned int | |
889 | ovsdb_cs_db_set_condition(struct ovsdb_cs_db *db, const char *table, | |
890 | const struct json *condition) | |
891 | { | |
892 | /* Compare the new condition to the last known condition which can be | |
893 | * either "new" (not sent yet), "requested" or "acked", in this order. */ | |
894 | struct ovsdb_cs_db_table *t = ovsdb_cs_db_get_table(db, table); | |
895 | const struct json *table_cond = (t->new_cond ? t->new_cond | |
896 | : t->req_cond ? t->req_cond | |
897 | : t->ack_cond); | |
898 | if (!condition_equal(condition, table_cond)) { | |
899 | json_destroy(t->new_cond); | |
900 | t->new_cond = condition_clone(condition); | |
901 | db->cond_changed = true; | |
902 | poll_immediate_wake(); | |
903 | } | |
904 | ||
905 | /* Conditions will be up to date when we receive replies for already | |
906 | * requested and new conditions, if any. */ | |
907 | return db->cond_seqno + (t->new_cond ? 1 : 0) + (t->req_cond ? 1 : 0); | |
908 | } | |
909 | ||
910 | /* Sets the replication condition for 'tc' in 'cs' to 'condition' and arranges | |
911 | * to send the new condition to the database server. | |
912 | * | |
913 | * Return the next conditional update sequence number. When this value and | |
914 | * ovsdb_cs_get_condition_seqno() matches, 'cs' contains rows that match the | |
915 | * 'condition'. */ | |
916 | unsigned int | |
917 | ovsdb_cs_set_condition(struct ovsdb_cs *cs, const char *table, | |
918 | const struct json *condition) | |
919 | { | |
920 | return ovsdb_cs_db_set_condition(&cs->data, table, condition); | |
921 | } | |
922 | ||
923 | /* Returns a "sequence number" that represents the number of conditional | |
924 | * monitoring updates successfully received by the OVSDB server of a CS | |
925 | * connection. | |
926 | * | |
927 | * ovsdb_cs_set_condition() sets a new condition that is different from the | |
928 | * current condtion, the next expected "sequence number" is returned. | |
929 | * | |
930 | * Whenever ovsdb_cs_get_condition_seqno() returns a value that matches the | |
931 | * return value of ovsdb_cs_set_condition(), the client is assured that: | |
932 | * | |
933 | * - The ovsdb_cs_set_condition() changes has been acknowledged by the OVSDB | |
934 | * server. | |
935 | * | |
936 | * - 'cs' now contains the content matches the new conditions. */ | |
937 | unsigned int | |
938 | ovsdb_cs_get_condition_seqno(const struct ovsdb_cs *cs) | |
939 | { | |
940 | return cs->data.cond_seqno; | |
941 | } | |
942 | ||
/* Returns a new JSON object of the form {"where": <cond>}, holding a clone of
 * 'cond'.  The caller owns the result. */
static struct json *
ovsdb_cs_create_cond_change_req(const struct json *cond)
{
    struct json *request = json_object_create();
    json_object_put(request, "where", json_clone(cond));
    return request;
}
950 | ||
951 | static struct jsonrpc_msg * | |
952 | ovsdb_cs_db_compose_cond_change(struct ovsdb_cs_db *db) | |
953 | { | |
954 | if (!db->cond_changed) { | |
955 | return NULL; | |
956 | } | |
957 | ||
958 | struct json *monitor_cond_change_requests = NULL; | |
959 | struct ovsdb_cs_db_table *table; | |
960 | HMAP_FOR_EACH (table, hmap_node, &db->tables) { | |
961 | /* Always use the most recent conditions set by the CS client when | |
962 | * requesting monitor_cond_change, i.e., table->new_cond. | |
963 | */ | |
964 | if (table->new_cond) { | |
965 | struct json *req = | |
966 | ovsdb_cs_create_cond_change_req(table->new_cond); | |
967 | if (req) { | |
968 | if (!monitor_cond_change_requests) { | |
969 | monitor_cond_change_requests = json_object_create(); | |
970 | } | |
971 | json_object_put(monitor_cond_change_requests, | |
972 | table->name, | |
973 | json_array_create_1(req)); | |
974 | } | |
975 | /* Mark the new condition as requested by moving it to req_cond. | |
976 | * If there's already requested condition that's a bug. | |
977 | */ | |
978 | ovs_assert(table->req_cond == NULL); | |
979 | table->req_cond = table->new_cond; | |
980 | table->new_cond = NULL; | |
981 | } | |
982 | } | |
983 | ||
984 | if (!monitor_cond_change_requests) { | |
985 | return NULL; | |
986 | } | |
987 | ||
988 | db->cond_changed = false; | |
989 | struct json *params = json_array_create_3(json_clone(db->monitor_id), | |
990 | json_clone(db->monitor_id), | |
991 | monitor_cond_change_requests); | |
992 | return jsonrpc_create_request("monitor_cond_change", params, NULL); | |
993 | } | |
994 | ||
995 | /* Marks all requested table conditions in 'db' as acked by the server. | |
996 | * It should be called when the server replies to monitor_cond_change | |
997 | * requests. | |
998 | */ | |
999 | static void | |
1000 | ovsdb_cs_db_ack_condition(struct ovsdb_cs_db *db) | |
1001 | { | |
1002 | struct ovsdb_cs_db_table *table; | |
1003 | HMAP_FOR_EACH (table, hmap_node, &db->tables) { | |
1004 | if (table->req_cond) { | |
1005 | json_destroy(table->ack_cond); | |
1006 | table->ack_cond = table->req_cond; | |
1007 | table->req_cond = NULL; | |
1008 | } | |
1009 | } | |
1010 | } | |
1011 | ||
1012 | /* Should be called when the CS fsm is restarted and resyncs table conditions | |
1013 | * based on the state the DB is in: | |
1014 | * - if a non-zero last_id is available for the DB then upon reconnect | |
1015 | * the CS should first request acked conditions to avoid missing updates | |
1016 | * about records that were added before the transaction with | |
1017 | * txn-id == last_id. If there were requested condition changes in flight | |
1018 | * (i.e., req_cond not NULL) and the CS client didn't set new conditions | |
1019 | * (i.e., new_cond is NULL) then move req_cond to new_cond to trigger a | |
1020 | * follow up monitor_cond_change request. | |
1021 | * - if there's no last_id available for the DB then it's safe to use the | |
1022 | * latest conditions set by the CS client even if they weren't acked yet. | |
1023 | */ | |
1024 | static void | |
1025 | ovsdb_cs_db_sync_condition(struct ovsdb_cs_db *db) | |
1026 | { | |
1027 | bool ack_all = uuid_is_zero(&db->last_id); | |
1028 | if (ack_all) { | |
1029 | db->cond_changed = false; | |
1030 | } | |
1031 | ||
1032 | struct ovsdb_cs_db_table *table; | |
1033 | HMAP_FOR_EACH (table, hmap_node, &db->tables) { | |
1034 | /* When monitor_cond_since requests will be issued, the | |
1035 | * table->ack_cond condition will be added to the "where" clause". | |
1036 | * Follow up monitor_cond_change requests will use table->new_cond. | |
1037 | */ | |
1038 | if (ack_all) { | |
1039 | if (table->new_cond) { | |
1040 | json_destroy(table->req_cond); | |
1041 | table->req_cond = table->new_cond; | |
1042 | table->new_cond = NULL; | |
1043 | } | |
1044 | ||
1045 | if (table->req_cond) { | |
1046 | json_destroy(table->ack_cond); | |
1047 | table->ack_cond = table->req_cond; | |
1048 | table->req_cond = NULL; | |
1049 | } | |
1050 | } else { | |
1051 | if (table->req_cond) { | |
1052 | /* There was an in-flight monitor_cond_change request. It's no | |
1053 | * longer relevant in the restarted FSM, so clear it. */ | |
1054 | if (table->new_cond) { | |
1055 | /* We will send a new monitor_cond_change with the new | |
1056 | * condition. The previously in-flight condition is | |
1057 | * irrelevant and we can just forget about it. */ | |
1058 | json_destroy(table->req_cond); | |
1059 | } else { | |
1060 | /* The restarted FSM needs to again send a request for the | |
1061 | * previously in-flight condition. */ | |
1062 | table->new_cond = table->req_cond; | |
1063 | } | |
1064 | table->req_cond = NULL; | |
1065 | db->cond_changed = true; | |
1066 | } | |
1067 | } | |
1068 | } | |
1069 | } | |
1070 | ||
1071 | static void | |
1072 | ovsdb_cs_send_cond_change(struct ovsdb_cs *cs) | |
1073 | { | |
1074 | /* When 'cs->request_id' is not NULL, there is an outstanding | |
1075 | * conditional monitoring update request that we have not heard | |
1076 | * from the server yet. Don't generate another request in this case. */ | |
1077 | if (!jsonrpc_session_is_connected(cs->session) | |
1078 | || cs->data.monitor_version == 1 | |
1079 | || cs->request_id) { | |
1080 | return; | |
1081 | } | |
1082 | ||
1083 | struct jsonrpc_msg *msg = ovsdb_cs_db_compose_cond_change(&cs->data); | |
1084 | if (msg) { | |
1085 | cs->request_id = json_clone(msg->id); | |
1086 | jsonrpc_session_send(cs->session, msg); | |
1087 | } | |
1088 | } | |
1089 | \f | |
1090 | /* Clustered servers. */ | |
1091 | ||
1092 | /* By default, or if 'leader_only' is true, when 'cs' connects to a clustered | |
1093 | * database, the CS layer will avoid servers other than the cluster | |
1094 | * leader. This ensures that any data that it reads and reports is up-to-date. | |
1095 | * If 'leader_only' is false, the CS layer will accept any server in the | |
1096 | * cluster, which means that for read-only transactions it can report and act | |
1097 | * on stale data (transactions that modify the database are always serialized | |
1098 | * even with false 'leader_only'). Refer to Understanding Cluster Consistency | |
1099 | * in ovsdb(7) for more information. */ | |
1100 | void | |
1101 | ovsdb_cs_set_leader_only(struct ovsdb_cs *cs, bool leader_only) | |
1102 | { | |
1103 | cs->leader_only = leader_only; | |
1104 | if (leader_only && cs->server.monitor_version) { | |
1105 | ovsdb_cs_check_server_db(cs); | |
1106 | } | |
1107 | } | |
1108 | ||
1109 | /* Set whether the order of remotes should be shuffled, when there is more than | |
1110 | * one remote. The setting doesn't take effect until the next time when | |
1111 | * ovsdb_cs_set_remote() is called. */ | |
1112 | void | |
1113 | ovsdb_cs_set_shuffle_remotes(struct ovsdb_cs *cs, bool shuffle) | |
1114 | { | |
1115 | cs->shuffle_remotes = shuffle; | |
1116 | } | |
1117 | ||
1118 | /* Reset min_index to 0. This prevents a situation where the client | |
1119 | * thinks all databases have stale data, when they actually have all | |
1120 | * been destroyed and rebuilt from scratch. | |
1121 | */ | |
1122 | void | |
1123 | ovsdb_cs_reset_min_index(struct ovsdb_cs *cs) | |
1124 | { | |
1125 | cs->min_index = 0; | |
1126 | } | |
1127 | \f | |
1128 | /* Database locks. */ | |
1129 | ||
1130 | static struct jsonrpc_msg * | |
1131 | ovsdb_cs_db_set_lock(struct ovsdb_cs_db *db, const char *lock_name) | |
1132 | { | |
1133 | if (db->lock_name | |
1134 | && (!lock_name || strcmp(lock_name, db->lock_name))) { | |
1135 | /* Release previous lock. */ | |
1136 | struct jsonrpc_msg *msg = ovsdb_cs_db_compose_unlock_request(db); | |
1137 | free(db->lock_name); | |
1138 | db->lock_name = NULL; | |
1139 | db->is_lock_contended = false; | |
1140 | return msg; | |
1141 | } | |
1142 | ||
1143 | if (lock_name && !db->lock_name) { | |
1144 | /* Acquire new lock. */ | |
1145 | db->lock_name = xstrdup(lock_name); | |
1146 | return ovsdb_cs_db_compose_lock_request(db); | |
1147 | } | |
1148 | ||
1149 | return NULL; | |
1150 | } | |
1151 | ||
1152 | /* If 'lock_name' is nonnull, configures 'cs' to obtain the named lock from the | |
1153 | * database server and to prevent modifying the database when the lock cannot | |
1154 | * be acquired (that is, when another client has the same lock). | |
1155 | * | |
1156 | * If 'lock_name' is NULL, drops the locking requirement and releases the | |
1157 | * lock. */ | |
1158 | void | |
1159 | ovsdb_cs_set_lock(struct ovsdb_cs *cs, const char *lock_name) | |
1160 | { | |
1161 | for (;;) { | |
1162 | struct jsonrpc_msg *msg = ovsdb_cs_db_set_lock(&cs->data, lock_name); | |
1163 | if (!msg) { | |
1164 | break; | |
1165 | } | |
1166 | if (cs->session) { | |
1167 | jsonrpc_session_send(cs->session, msg); | |
1168 | } else { | |
1169 | jsonrpc_msg_destroy(msg); | |
1170 | } | |
1171 | } | |
1172 | } | |
1173 | ||
1174 | /* Returns the name of the lock that 'cs' is trying to obtain, or NULL if none | |
1175 | * is configured. */ | |
1176 | const char * | |
1177 | ovsdb_cs_get_lock(const struct ovsdb_cs *cs) | |
1178 | { | |
1179 | return cs->data.lock_name; | |
1180 | } | |
1181 | ||
1182 | /* Returns true if 'cs' is configured to obtain a lock and owns that lock, | |
1183 | * false if it doesn't own the lock or isn't configured to obtain one. | |
1184 | * | |
1185 | * Locking and unlocking happens asynchronously from the database client's | |
1186 | * point of view, so the information is only useful for optimization (e.g. if | |
1187 | * the client doesn't have the lock then there's no point in trying to write to | |
1188 | * the database). */ | |
1189 | bool | |
1190 | ovsdb_cs_has_lock(const struct ovsdb_cs *cs) | |
1191 | { | |
1192 | return cs->data.has_lock; | |
1193 | } | |
1194 | ||
1195 | /* Returns true if 'cs' is configured to obtain a lock but the database server | |
1196 | * has indicated that some other client already owns the requested lock. */ | |
1197 | bool | |
1198 | ovsdb_cs_is_lock_contended(const struct ovsdb_cs *cs) | |
1199 | { | |
1200 | return cs->data.is_lock_contended; | |
1201 | } | |
1202 | ||
1203 | static void | |
1204 | ovsdb_cs_db_update_has_lock(struct ovsdb_cs_db *db, bool new_has_lock) | |
1205 | { | |
1206 | if (new_has_lock && !db->has_lock) { | |
1207 | ovsdb_cs_db_add_event(db, OVSDB_CS_EVENT_TYPE_LOCKED); | |
1208 | db->is_lock_contended = false; | |
1209 | } | |
1210 | db->has_lock = new_has_lock; | |
1211 | } | |
1212 | ||
1213 | static bool | |
1214 | ovsdb_cs_db_process_lock_replies(struct ovsdb_cs_db *db, | |
1215 | const struct jsonrpc_msg *msg) | |
1216 | { | |
1217 | if (msg->type == JSONRPC_REPLY | |
1218 | && db->lock_request_id | |
1219 | && json_equal(db->lock_request_id, msg->id)) { | |
1220 | /* Reply to our "lock" request. */ | |
1221 | ovsdb_cs_db_parse_lock_reply(db, msg->result); | |
1222 | return true; | |
1223 | } | |
1224 | ||
1225 | if (msg->type == JSONRPC_NOTIFY) { | |
1226 | if (!strcmp(msg->method, "locked")) { | |
1227 | /* We got our lock. */ | |
1228 | return ovsdb_cs_db_parse_lock_notify(db, msg->params, true); | |
1229 | } else if (!strcmp(msg->method, "stolen")) { | |
1230 | /* Someone else stole our lock. */ | |
1231 | return ovsdb_cs_db_parse_lock_notify(db, msg->params, false); | |
1232 | } | |
1233 | } | |
1234 | ||
1235 | return false; | |
1236 | } | |
1237 | ||
1238 | static struct jsonrpc_msg * | |
1239 | ovsdb_cs_db_compose_lock_request__(struct ovsdb_cs_db *db, | |
1240 | const char *method) | |
1241 | { | |
1242 | ovsdb_cs_db_update_has_lock(db, false); | |
1243 | ||
1244 | json_destroy(db->lock_request_id); | |
1245 | db->lock_request_id = NULL; | |
1246 | ||
1247 | struct json *params = json_array_create_1(json_string_create( | |
1248 | db->lock_name)); | |
1249 | return jsonrpc_create_request(method, params, NULL); | |
1250 | } | |
1251 | ||
1252 | static struct jsonrpc_msg * | |
1253 | ovsdb_cs_db_compose_lock_request(struct ovsdb_cs_db *db) | |
1254 | { | |
1255 | struct jsonrpc_msg *msg = ovsdb_cs_db_compose_lock_request__(db, "lock"); | |
1256 | db->lock_request_id = json_clone(msg->id); | |
1257 | return msg; | |
1258 | } | |
1259 | ||
/* Composes and returns an "unlock" request for the lock of 'db'.  No request
 * ID is recorded for it. */
static struct jsonrpc_msg *
ovsdb_cs_db_compose_unlock_request(struct ovsdb_cs_db *db)
{
    struct jsonrpc_msg *request;

    request = ovsdb_cs_db_compose_lock_request__(db, "unlock");
    return request;
}
1265 | ||
1266 | static void | |
1267 | ovsdb_cs_db_parse_lock_reply(struct ovsdb_cs_db *db, | |
1268 | const struct json *result) | |
1269 | { | |
1270 | bool got_lock; | |
1271 | ||
1272 | json_destroy(db->lock_request_id); | |
1273 | db->lock_request_id = NULL; | |
1274 | ||
1275 | if (result->type == JSON_OBJECT) { | |
1276 | const struct json *locked; | |
1277 | ||
1278 | locked = shash_find_data(json_object(result), "locked"); | |
1279 | got_lock = locked && locked->type == JSON_TRUE; | |
1280 | } else { | |
1281 | got_lock = false; | |
1282 | } | |
1283 | ||
1284 | ovsdb_cs_db_update_has_lock(db, got_lock); | |
1285 | if (!got_lock) { | |
1286 | db->is_lock_contended = true; | |
1287 | } | |
1288 | } | |
1289 | ||
1290 | static bool | |
1291 | ovsdb_cs_db_parse_lock_notify(struct ovsdb_cs_db *db, | |
1292 | const struct json *params, | |
1293 | bool new_has_lock) | |
1294 | { | |
1295 | if (db->lock_name | |
1296 | && params->type == JSON_ARRAY | |
1297 | && json_array(params)->n > 0 | |
1298 | && json_array(params)->elems[0]->type == JSON_STRING) { | |
1299 | const char *lock_name = json_string(json_array(params)->elems[0]); | |
1300 | ||
1301 | if (!strcmp(db->lock_name, lock_name)) { | |
1302 | ovsdb_cs_db_update_has_lock(db, new_has_lock); | |
1303 | if (!new_has_lock) { | |
1304 | db->is_lock_contended = true; | |
1305 | } | |
1306 | return true; | |
1307 | } | |
1308 | } | |
1309 | return false; | |
1310 | } | |
1311 | \f | |
1312 | /* Transactions. */ | |
1313 | ||
1314 | static bool | |
1315 | ovsdb_cs_db_txn_process_reply(struct ovsdb_cs *cs, | |
1316 | const struct jsonrpc_msg *reply) | |
1317 | { | |
1318 | bool found = ovsdb_cs_forget_transaction(cs, reply->id); | |
1319 | if (found) { | |
1320 | struct ovsdb_cs_event *event | |
1321 | = ovsdb_cs_db_add_event(&cs->data, OVSDB_CS_EVENT_TYPE_TXN_REPLY); | |
1322 | event->txn_reply = jsonrpc_msg_clone(reply); | |
1323 | } | |
1324 | return found; | |
1325 | } | |
1326 | ||
1327 | /* Returns true if 'cs' can be sent a transaction now, false otherwise. This | |
1328 | * is useful for optimization: there is no point in composing and sending a | |
1329 | * transaction if it returns false. */ | |
1330 | bool | |
1331 | ovsdb_cs_may_send_transaction(const struct ovsdb_cs *cs) | |
1332 | { | |
1333 | return (cs->session != NULL | |
1334 | && cs->state == CS_S_MONITORING | |
1335 | && (!cs->data.lock_name || ovsdb_cs_has_lock(cs))); | |
1336 | } | |
1337 | ||
1338 | /* Attempts to send a transaction with the specified 'operations' to 'cs''s | |
1339 | * server. On success, returns the request ID; the caller must eventually free | |
1340 | * it. On failure, returns NULL. */ | |
1341 | struct json * OVS_WARN_UNUSED_RESULT | |
1342 | ovsdb_cs_send_transaction(struct ovsdb_cs *cs, struct json *operations) | |
1343 | { | |
1344 | if (!ovsdb_cs_may_send_transaction(cs)) { | |
1345 | json_destroy(operations); | |
1346 | return NULL; | |
1347 | } | |
1348 | ||
1349 | if (cs->data.lock_name) { | |
1350 | struct json *assertion = json_object_create(); | |
1351 | json_object_put_string(assertion, "op", "assert"); | |
1352 | json_object_put_string(assertion, "lock", cs->data.lock_name); | |
1353 | json_array_add(operations, assertion); | |
1354 | } | |
1355 | ||
1356 | struct json *request_id; | |
1357 | struct jsonrpc_msg *request = jsonrpc_create_request( | |
1358 | "transact", operations, &request_id); | |
1359 | int error = jsonrpc_session_send(cs->session, request); | |
1360 | if (error) { | |
1361 | json_destroy(request_id); | |
1362 | return NULL; | |
1363 | } | |
1364 | ||
1365 | if (cs->n_txns >= cs->allocated_txns) { | |
1366 | cs->txns = x2nrealloc(cs->txns, &cs->allocated_txns, | |
1367 | sizeof *cs->txns); | |
1368 | } | |
1369 | cs->txns[cs->n_txns++] = request_id; | |
1370 | return request_id; | |
1371 | } | |
1372 | ||
1373 | /* Makes 'cs' drop its record of transaction 'request_id'. If a reply arrives | |
1374 | * for it later (which it will, unless the connection drops in the meantime), | |
1375 | * it won't be reported through an event. | |
1376 | * | |
1377 | * Returns true if 'request_id' was known, false otherwise. */ | |
1378 | bool | |
1379 | ovsdb_cs_forget_transaction(struct ovsdb_cs *cs, const struct json *request_id) | |
1380 | { | |
1381 | for (size_t i = 0; i < cs->n_txns; i++) { | |
1382 | if (json_equal(request_id, cs->txns[i])) { | |
1383 | cs->txns[i] = cs->txns[--cs->n_txns]; | |
1384 | return true; | |
1385 | } | |
1386 | } | |
1387 | return false; | |
1388 | } | |
1389 | \f | |
1390 | static void | |
1391 | ovsdb_cs_send_schema_request(struct ovsdb_cs *cs, | |
1392 | struct ovsdb_cs_db *db) | |
1393 | { | |
1394 | ovsdb_cs_send_request(cs, jsonrpc_create_request( | |
1395 | "get_schema", | |
1396 | json_array_create_1(json_string_create( | |
1397 | db->db_name)), | |
1398 | NULL)); | |
1399 | } | |
1400 | ||
1401 | static void | |
1402 | ovsdb_cs_send_db_change_aware(struct ovsdb_cs *cs) | |
1403 | { | |
1404 | struct jsonrpc_msg *msg = jsonrpc_create_request( | |
1405 | "set_db_change_aware", json_array_create_1(json_boolean_create(true)), | |
1406 | NULL); | |
1407 | jsonrpc_session_send(cs->session, msg); | |
1408 | } | |
1409 | ||
1410 | static void | |
1411 | ovsdb_cs_send_monitor_request(struct ovsdb_cs *cs, struct ovsdb_cs_db *db, | |
1412 | int version) | |
1413 | { | |
1414 | struct json *mrs = db->ops->compose_monitor_requests( | |
1415 | db->schema, db->ops_aux); | |
1416 | /* XXX handle failure */ | |
1417 | ovs_assert(mrs->type == JSON_OBJECT); | |
1418 | ||
1419 | if (version > 1) { | |
1420 | struct ovsdb_cs_db_table *table; | |
1421 | HMAP_FOR_EACH (table, hmap_node, &db->tables) { | |
1422 | if (table->ack_cond) { | |
1423 | struct json *mr = shash_find_data(json_object(mrs), | |
1424 | table->name); | |
1425 | if (!mr) { | |
1426 | mr = json_array_create_empty(); | |
1427 | json_object_put(mrs, table->name, mr); | |
1428 | } | |
1429 | ovs_assert(mr->type == JSON_ARRAY); | |
1430 | ||
1431 | struct json *mr0; | |
1432 | if (json_array(mr)->n == 0) { | |
1433 | mr0 = json_object_create(); | |
1434 | json_object_put(mr0, "columns", json_array_create_empty()); | |
1435 | json_array_add(mr, mr0); | |
1436 | } else { | |
1437 | mr0 = json_array(mr)->elems[0]; | |
1438 | } | |
1439 | ovs_assert(mr0->type == JSON_OBJECT); | |
1440 | ||
1441 | json_object_put(mr0, "where", | |
1442 | json_clone(table->ack_cond)); | |
1443 | } | |
1444 | } | |
1445 | } | |
1446 | ||
1447 | const char *method = (version == 1 ? "monitor" | |
1448 | : version == 2 ? "monitor_cond" | |
1449 | : "monitor_cond_since"); | |
1450 | struct json *params = json_array_create_3( | |
1451 | json_string_create(db->db_name), | |
1452 | json_clone(db->monitor_id), | |
1453 | mrs); | |
1454 | if (version == 3) { | |
1455 | struct json *json_last_id = json_string_create_nocopy( | |
1456 | xasprintf(UUID_FMT, UUID_ARGS(&db->last_id))); | |
1457 | json_array_add(params, json_last_id); | |
1458 | } | |
1459 | ovsdb_cs_send_request(cs, jsonrpc_create_request(method, params, NULL)); | |
1460 | } | |
1461 | ||
1462 | static void | |
1463 | log_parse_update_error(struct ovsdb_error *error) | |
1464 | { | |
1465 | if (!VLOG_DROP_WARN(&syntax_rl)) { | |
1466 | char *s = ovsdb_error_to_string(error); | |
1467 | VLOG_WARN_RL(&syntax_rl, "%s", s); | |
1468 | free(s); | |
1469 | } | |
1470 | ovsdb_error_destroy(error); | |
1471 | } | |
1472 | ||
1473 | static void | |
1474 | ovsdb_cs_db_add_update(struct ovsdb_cs_db *db, | |
1475 | const struct json *table_updates, int version, | |
1476 | bool clear, bool monitor_reply) | |
1477 | { | |
1478 | struct ovsdb_cs_event *event = ovsdb_cs_db_add_event( | |
1479 | db, OVSDB_CS_EVENT_TYPE_UPDATE); | |
1480 | event->update = (struct ovsdb_cs_update_event) { | |
1481 | .table_updates = json_clone(table_updates), | |
1482 | .clear = clear, | |
1483 | .monitor_reply = monitor_reply, | |
1484 | .version = version, | |
1485 | }; | |
1486 | } | |
1487 | ||
1488 | static void | |
1489 | ovsdb_cs_db_parse_monitor_reply(struct ovsdb_cs_db *db, | |
1490 | const struct json *result, int version) | |
1491 | { | |
1492 | const struct json *table_updates; | |
1493 | bool clear; | |
1494 | if (version == 3) { | |
1495 | struct uuid last_id; | |
1496 | if (result->type != JSON_ARRAY || result->array.n != 3 | |
1497 | || (result->array.elems[0]->type != JSON_TRUE && | |
1498 | result->array.elems[0]->type != JSON_FALSE) | |
1499 | || result->array.elems[1]->type != JSON_STRING | |
1500 | || !uuid_from_string(&last_id, | |
1501 | json_string(result->array.elems[1]))) { | |
1502 | struct ovsdb_error *error = ovsdb_syntax_error( | |
1503 | result, NULL, "bad monitor_cond_since reply format"); | |
1504 | log_parse_update_error(error); | |
1505 | return; | |
1506 | } | |
1507 | ||
1508 | bool found = json_boolean(result->array.elems[0]); | |
1509 | clear = !found; | |
1510 | table_updates = result->array.elems[2]; | |
1511 | } else { | |
1512 | clear = true; | |
1513 | table_updates = result; | |
1514 | } | |
1515 | ||
1516 | ovsdb_cs_db_add_update(db, table_updates, version, clear, true); | |
1517 | } | |
1518 | ||
1519 | static bool | |
1520 | ovsdb_cs_db_parse_update_rpc(struct ovsdb_cs_db *db, | |
1521 | const struct jsonrpc_msg *msg) | |
1522 | { | |
1523 | if (msg->type != JSONRPC_NOTIFY) { | |
1524 | return false; | |
1525 | } | |
1526 | ||
1527 | int version = (!strcmp(msg->method, "update") ? 1 | |
1528 | : !strcmp(msg->method, "update2") ? 2 | |
1529 | : !strcmp(msg->method, "update3") ? 3 | |
1530 | : 0); | |
1531 | if (!version) { | |
1532 | return false; | |
1533 | } | |
1534 | ||
1535 | struct json *params = msg->params; | |
1536 | int n = version == 3 ? 3 : 2; | |
1537 | if (params->type != JSON_ARRAY || params->array.n != n) { | |
1538 | struct ovsdb_error *error = ovsdb_syntax_error( | |
1539 | params, NULL, "%s must be an array with %u elements.", | |
1540 | msg->method, n); | |
1541 | log_parse_update_error(error); | |
1542 | return false; | |
1543 | } | |
1544 | ||
1545 | if (!json_equal(params->array.elems[0], db->monitor_id)) { | |
1546 | return false; | |
1547 | } | |
1548 | ||
1549 | if (version == 3) { | |
1550 | const char *last_id = json_string(params->array.elems[1]); | |
1551 | if (!uuid_from_string(&db->last_id, last_id)) { | |
1552 | struct ovsdb_error *error = ovsdb_syntax_error( | |
1553 | params, NULL, "Last-id %s is not in UUID format.", last_id); | |
1554 | log_parse_update_error(error); | |
1555 | return false; | |
1556 | } | |
1557 | } | |
1558 | ||
1559 | struct json *table_updates = params->array.elems[version == 3 ? 2 : 1]; | |
1560 | ovsdb_cs_db_add_update(db, table_updates, version, false, false); | |
1561 | return true; | |
1562 | } | |
1563 | ||
1564 | static bool | |
1565 | ovsdb_cs_handle_monitor_canceled(struct ovsdb_cs *cs, | |
1566 | struct ovsdb_cs_db *db, | |
1567 | const struct jsonrpc_msg *msg) | |
1568 | { | |
1569 | if (msg->type != JSONRPC_NOTIFY | |
1570 | || strcmp(msg->method, "monitor_canceled") | |
1571 | || msg->params->type != JSON_ARRAY | |
1572 | || msg->params->array.n != 1 | |
1573 | || !json_equal(msg->params->array.elems[0], db->monitor_id)) { | |
1574 | return false; | |
1575 | } | |
1576 | ||
1577 | db->monitor_version = 0; | |
1578 | ||
1579 | /* Cancel the other monitor and restart the FSM from the top. | |
1580 | * | |
1581 | * Maybe a more sophisticated response would be better in some cases, but | |
1582 | * it doesn't seem worth optimizing yet. (Although this is already more | |
1583 | * sophisticated than just dropping the connection and reconnecting.) */ | |
1584 | struct ovsdb_cs_db *other_db | |
1585 | = db == &cs->data ? &cs->server : &cs->data; | |
1586 | if (other_db->monitor_version) { | |
1587 | jsonrpc_session_send( | |
1588 | cs->session, | |
1589 | jsonrpc_create_request( | |
1590 | "monitor_cancel", | |
1591 | json_array_create_1(json_clone(other_db->monitor_id)), NULL)); | |
1592 | other_db->monitor_version = 0; | |
1593 | } | |
1594 | ovsdb_cs_restart_fsm(cs); | |
1595 | ||
1596 | return true; | |
1597 | } | |
1598 | \f | |
1599 | /* The _Server database. | |
1600 | * | |
1601 | * We replicate the Database table in the _Server database because this is the | |
1602 | * only way to find out properties we need to know for clustering, such as | |
1603 | * whether a database is clustered at all and whether this server is the | |
1604 | * leader. | |
1605 | * | |
1606 | * This code implements a kind of simple IDL-like layer. */ | |
1607 | ||
1608 | struct server_column { | |
1609 | const char *name; | |
1610 | struct ovsdb_type type; | |
1611 | }; | |
1612 | enum server_column_index { | |
1613 | COL_NAME, | |
1614 | COL_MODEL, | |
1615 | COL_CONNECTED, | |
1616 | COL_LEADER, | |
1617 | COL_SCHEMA, | |
1618 | COL_CID, | |
1619 | COL_INDEX, | |
1620 | }; | |
1621 | #define OPTIONAL_COLUMN(TYPE) \ | |
1622 | { \ | |
1623 | .key = OVSDB_BASE_##TYPE##_INIT, \ | |
1624 | .value = OVSDB_BASE_VOID_INIT, \ | |
1625 | .n_min = 0, \ | |
1626 | .n_max = 1 \ | |
1627 | } | |
1628 | static const struct server_column server_columns[] = { | |
1629 | [COL_NAME] = {"name", OPTIONAL_COLUMN(STRING) }, | |
1630 | [COL_MODEL] = {"model", OPTIONAL_COLUMN(STRING) }, | |
1631 | [COL_CONNECTED] = {"connected", OPTIONAL_COLUMN(BOOLEAN) }, | |
1632 | [COL_LEADER] = {"leader", OPTIONAL_COLUMN(BOOLEAN) }, | |
1633 | [COL_SCHEMA] = {"schema", OPTIONAL_COLUMN(STRING) }, | |
1634 | [COL_CID] = {"cid", OPTIONAL_COLUMN(UUID) }, | |
1635 | [COL_INDEX] = {"index", OPTIONAL_COLUMN(INTEGER) }, | |
1636 | }; | |
1637 | #define N_SERVER_COLUMNS ARRAY_SIZE(server_columns) | |
1638 | struct server_row { | |
1639 | struct hmap_node hmap_node; | |
1640 | struct uuid uuid; | |
1641 | struct ovsdb_datum data[N_SERVER_COLUMNS]; | |
1642 | }; | |
1643 | ||
1644 | static void | |
1645 | server_row_destroy(struct server_row *row) | |
1646 | { | |
1647 | if (row) { | |
1648 | for (size_t i = 0; i < N_SERVER_COLUMNS; i++) { | |
1649 | ovsdb_datum_destroy(&row->data[i], &server_columns[i].type); | |
1650 | } | |
1651 | free(row); | |
1652 | } | |
1653 | } | |
1654 | ||
1655 | static struct server_row * | |
1656 | ovsdb_cs_find_server_row(struct ovsdb_cs *cs, const struct uuid *uuid) | |
1657 | { | |
1658 | struct server_row *row; | |
1659 | HMAP_FOR_EACH (row, hmap_node, &cs->server_rows) { | |
1660 | if (uuid_equals(uuid, &row->uuid)) { | |
1661 | return row; | |
1662 | } | |
1663 | } | |
1664 | return NULL; | |
1665 | } | |
1666 | ||
1667 | static void | |
1668 | ovsdb_cs_delete_server_row(struct ovsdb_cs *cs, struct server_row *row) | |
1669 | { | |
1670 | hmap_remove(&cs->server_rows, &row->hmap_node); | |
1671 | server_row_destroy(row); | |
1672 | } | |
1673 | ||
1674 | static struct server_row * | |
1675 | ovsdb_cs_insert_server_row(struct ovsdb_cs *cs, const struct uuid *uuid) | |
1676 | { | |
1677 | struct server_row *row = xmalloc(sizeof *row); | |
1678 | hmap_insert(&cs->server_rows, &row->hmap_node, uuid_hash(uuid)); | |
1679 | row->uuid = *uuid; | |
1680 | for (size_t i = 0; i < N_SERVER_COLUMNS; i++) { | |
1681 | ovsdb_datum_init_default(&row->data[i], &server_columns[i].type); | |
1682 | } | |
1683 | return row; | |
1684 | } | |
1685 | ||
1686 | static void | |
1687 | ovsdb_cs_update_server_row(struct server_row *row, | |
1688 | const struct shash *update, bool xor) | |
1689 | { | |
1690 | for (size_t i = 0; i < N_SERVER_COLUMNS; i++) { | |
1691 | const struct server_column *column = &server_columns[i]; | |
1692 | struct shash_node *node = shash_find(update, column->name); | |
1693 | if (!node) { | |
1694 | continue; | |
1695 | } | |
1696 | const struct json *json = node->data; | |
1697 | ||
1698 | struct ovsdb_datum *old = &row->data[i]; | |
1699 | struct ovsdb_datum new; | |
1700 | if (!xor) { | |
1701 | struct ovsdb_error *error = ovsdb_datum_from_json( | |
1702 | &new, &column->type, json, NULL); | |
1703 | if (error) { | |
1704 | ovsdb_error_destroy(error); | |
1705 | continue; | |
1706 | } | |
1707 | } else { | |
1708 | struct ovsdb_datum diff; | |
1709 | struct ovsdb_error *error = ovsdb_transient_datum_from_json( | |
1710 | &diff, &column->type, json); | |
1711 | if (error) { | |
1712 | ovsdb_error_destroy(error); | |
1713 | continue; | |
1714 | } | |
1715 | ||
1716 | error = ovsdb_datum_apply_diff(&new, old, &diff, &column->type); | |
1717 | if (error) { | |
1718 | ovsdb_error_destroy(error); | |
1719 | ovsdb_datum_destroy(&new, &column->type); | |
1720 | continue; | |
1721 | } | |
1722 | ovsdb_datum_destroy(&diff, &column->type); | |
1723 | } | |
1724 | ||
1725 | ovsdb_datum_destroy(&row->data[i], &column->type); | |
1726 | row->data[i] = new; | |
1727 | } | |
1728 | } | |
1729 | ||
1730 | static void | |
1731 | ovsdb_cs_clear_server_rows(struct ovsdb_cs *cs) | |
1732 | { | |
1733 | struct server_row *row, *next; | |
1734 | HMAP_FOR_EACH_SAFE (row, next, hmap_node, &cs->server_rows) { | |
1735 | ovsdb_cs_delete_server_row(cs, row); | |
1736 | } | |
1737 | } | |
1738 | ||
1739 | static void log_parse_update_error(struct ovsdb_error *); | |
1740 | ||
1741 | static void | |
1742 | ovsdb_cs_process_server_event(struct ovsdb_cs *cs, | |
1743 | const struct ovsdb_cs_event *event) | |
1744 | { | |
1745 | ovs_assert(event->type == OVSDB_CS_EVENT_TYPE_UPDATE); | |
1746 | ||
1747 | const struct ovsdb_cs_update_event *update = &event->update; | |
1748 | struct ovsdb_cs_db_update *du; | |
1749 | struct ovsdb_error *error = ovsdb_cs_parse_db_update( | |
1750 | update->table_updates, update->version, &du); | |
1751 | if (error) { | |
1752 | log_parse_update_error(error); | |
1753 | return; | |
1754 | } | |
1755 | ||
1756 | if (update->clear) { | |
1757 | ovsdb_cs_clear_server_rows(cs); | |
1758 | } | |
1759 | ||
1760 | const struct ovsdb_cs_table_update *tu = ovsdb_cs_db_update_find_table( | |
1761 | du, "Database"); | |
1762 | if (tu) { | |
1763 | for (size_t i = 0; i < tu->n; i++) { | |
1764 | const struct ovsdb_cs_row_update *ru = &tu->row_updates[i]; | |
1765 | struct server_row *row | |
1766 | = ovsdb_cs_find_server_row(cs, &ru->row_uuid); | |
1767 | if (ru->type == OVSDB_CS_ROW_DELETE) { | |
1768 | ovsdb_cs_delete_server_row(cs, row); | |
1769 | } else { | |
1770 | if (!row) { | |
1771 | row = ovsdb_cs_insert_server_row(cs, &ru->row_uuid); | |
1772 | } | |
1773 | ovsdb_cs_update_server_row(row, ru->columns, | |
1774 | ru->type == OVSDB_CS_ROW_XOR); | |
1775 | } | |
1776 | } | |
1777 | } | |
1778 | ||
1779 | ovsdb_cs_db_update_destroy(du); | |
1780 | } | |
1781 | ||
1782 | static const char * | |
1783 | server_column_get_string(const struct server_row *row, | |
1784 | enum server_column_index index, | |
1785 | const char *default_value) | |
1786 | { | |
1787 | ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_STRING); | |
1788 | const struct ovsdb_datum *d = &row->data[index]; | |
1789 | return d->n == 1 ? d->keys[0].string : default_value; | |
1790 | } | |
1791 | ||
1792 | static bool | |
1793 | server_column_get_bool(const struct server_row *row, | |
1794 | enum server_column_index index, | |
1795 | bool default_value) | |
1796 | { | |
1797 | ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_BOOLEAN); | |
1798 | const struct ovsdb_datum *d = &row->data[index]; | |
1799 | return d->n == 1 ? d->keys[0].boolean : default_value; | |
1800 | } | |
1801 | ||
1802 | static uint64_t | |
1803 | server_column_get_int(const struct server_row *row, | |
1804 | enum server_column_index index, | |
1805 | uint64_t default_value) | |
1806 | { | |
1807 | ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_INTEGER); | |
1808 | const struct ovsdb_datum *d = &row->data[index]; | |
1809 | return d->n == 1 ? d->keys[0].integer : default_value; | |
1810 | } | |
1811 | ||
1812 | static const struct uuid * | |
1813 | server_column_get_uuid(const struct server_row *row, | |
1814 | enum server_column_index index, | |
1815 | const struct uuid *default_value) | |
1816 | { | |
1817 | ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_UUID); | |
1818 | const struct ovsdb_datum *d = &row->data[index]; | |
1819 | return d->n == 1 ? &d->keys[0].uuid : default_value; | |
1820 | } | |
1821 | ||
1822 | static const struct server_row * | |
1823 | ovsdb_find_server_row(struct ovsdb_cs *cs) | |
1824 | { | |
1825 | const struct server_row *row; | |
1826 | HMAP_FOR_EACH (row, hmap_node, &cs->server_rows) { | |
1827 | const struct uuid *cid = server_column_get_uuid(row, COL_CID, NULL); | |
1828 | const char *name = server_column_get_string(row, COL_NAME, NULL); | |
1829 | if (uuid_is_zero(&cs->cid) | |
1830 | ? (name && !strcmp(cs->data.db_name, name)) | |
1831 | : (cid && uuid_equals(cid, &cs->cid))) { | |
1832 | return row; | |
1833 | } | |
1834 | } | |
1835 | return NULL; | |
1836 | } | |
1837 | ||
1838 | static void OVS_UNUSED | |
1839 | ovsdb_log_server_rows(const struct ovsdb_cs *cs) | |
1840 | { | |
1841 | int row_num = 0; | |
1842 | const struct server_row *row; | |
1843 | HMAP_FOR_EACH (row, hmap_node, &cs->server_rows) { | |
1844 | struct ds s = DS_EMPTY_INITIALIZER; | |
1845 | for (size_t i = 0; i < N_SERVER_COLUMNS; i++) { | |
1846 | ds_put_format(&s, " %s=", server_columns[i].name); | |
1847 | if (i == COL_SCHEMA) { | |
1848 | ds_put_format(&s, "..."); | |
1849 | } else { | |
1850 | ovsdb_datum_to_string(&row->data[i], &server_columns[i].type, | |
1851 | &s); | |
1852 | } | |
1853 | } | |
1854 | VLOG_INFO("row %d:%s", row_num++, ds_cstr(&s)); | |
1855 | ds_destroy(&s); | |
1856 | } | |
1857 | } | |
1858 | ||
1859 | static bool | |
1860 | ovsdb_cs_check_server_db__(struct ovsdb_cs *cs) | |
1861 | { | |
1862 | struct ovsdb_cs_event *event; | |
1863 | LIST_FOR_EACH_POP (event, list_node, &cs->server.events) { | |
1864 | ovsdb_cs_process_server_event(cs, event); | |
1865 | ovsdb_cs_event_destroy(event); | |
1866 | } | |
1867 | ||
1868 | const struct server_row *db_row = ovsdb_find_server_row(cs); | |
1869 | static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5); | |
1870 | const char *server_name = jsonrpc_session_get_name(cs->session); | |
1871 | if (!db_row) { | |
1872 | VLOG_INFO_RL(&rl, "%s: server does not have %s database", | |
1873 | server_name, cs->data.db_name); | |
1874 | return false; | |
1875 | } | |
1876 | ||
1877 | bool ok = false; | |
1878 | const char *model = server_column_get_string(db_row, COL_MODEL, ""); | |
1879 | const char *schema = server_column_get_string(db_row, COL_SCHEMA, NULL); | |
1880 | if (!strcmp(model, "clustered")) { | |
1881 | bool connected = server_column_get_bool(db_row, COL_CONNECTED, false); | |
1882 | bool leader = server_column_get_bool(db_row, COL_LEADER, false); | |
1883 | uint64_t index = server_column_get_int(db_row, COL_INDEX, 0); | |
1884 | ||
1885 | if (!schema) { | |
1886 | VLOG_INFO("%s: clustered database server has not yet joined " | |
1887 | "cluster; trying another server", server_name); | |
1888 | } else if (!connected) { | |
1889 | VLOG_INFO("%s: clustered database server is disconnected " | |
1890 | "from cluster; trying another server", server_name); | |
1891 | } else if (cs->leader_only && !leader) { | |
1892 | VLOG_INFO("%s: clustered database server is not cluster " | |
1893 | "leader; trying another server", server_name); | |
1894 | } else if (index < cs->min_index) { | |
1895 | VLOG_WARN("%s: clustered database server has stale data; " | |
1896 | "trying another server", server_name); | |
1897 | } else { | |
1898 | cs->min_index = index; | |
1899 | ok = true; | |
1900 | } | |
1901 | } else { | |
1902 | if (!schema) { | |
1903 | VLOG_INFO("%s: missing database schema", server_name); | |
1904 | } else { | |
1905 | ok = true; | |
1906 | } | |
1907 | } | |
1908 | if (!ok) { | |
1909 | return false; | |
1910 | } | |
1911 | ||
1912 | if (cs->state == CS_S_SERVER_MONITOR_REQUESTED) { | |
1913 | json_destroy(cs->data.schema); | |
1914 | cs->data.schema = json_from_string(schema); | |
1915 | if (cs->data.max_version >= 3) { | |
1916 | ovsdb_cs_send_monitor_request(cs, &cs->data, 3); | |
1917 | ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_SINCE_REQUESTED); | |
1918 | } else if (cs->data.max_version >= 2) { | |
1919 | ovsdb_cs_send_monitor_request(cs, &cs->data, 2); | |
1920 | ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED); | |
1921 | } else { | |
1922 | ovsdb_cs_send_monitor_request(cs, &cs->data, 1); | |
1923 | ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED); | |
1924 | } | |
1925 | } | |
1926 | return true; | |
1927 | } | |
1928 | ||
/* Runs ovsdb_cs_check_server_db__() on 'cs' and returns its result.  If the
 * check fails, calls ovsdb_cs_retry() on 'cs' before returning false. */
static bool
ovsdb_cs_check_server_db(struct ovsdb_cs *cs)
{
    if (ovsdb_cs_check_server_db__(cs)) {
        return true;
    }

    ovsdb_cs_retry(cs);
    return false;
}
1938 | ||
1939 | static struct json * | |
1940 | ovsdb_cs_compose_server_monitor_request(const struct json *schema_json, | |
1941 | void *cs_) | |
1942 | { | |
1943 | struct ovsdb_cs *cs = cs_; | |
1944 | struct shash *schema = ovsdb_cs_parse_schema(schema_json); | |
1945 | struct json *monitor_requests = json_object_create(); | |
1946 | ||
1947 | const char *table_name = "Database"; | |
1948 | const struct sset *table_schema | |
1949 | = schema ? shash_find_data(schema, table_name) : NULL; | |
1950 | if (!table_schema) { | |
1951 | VLOG_WARN("%s database lacks %s table " | |
1952 | "(database needs upgrade?)", | |
1953 | cs->server.db_name, table_name); | |
1954 | /* XXX return failure? */ | |
1955 | } else { | |
1956 | struct json *columns = json_array_create_empty(); | |
1957 | for (size_t j = 0; j < N_SERVER_COLUMNS; j++) { | |
1958 | const struct server_column *column = &server_columns[j]; | |
1959 | bool db_has_column = (table_schema && | |
1960 | sset_contains(table_schema, column->name)); | |
1961 | if (table_schema && !db_has_column) { | |
1962 | VLOG_WARN("%s table in %s database lacks %s column " | |
1963 | "(database needs upgrade?)", | |
1964 | table_name, cs->server.db_name, column->name); | |
1965 | continue; | |
1966 | } | |
1967 | json_array_add(columns, json_string_create(column->name)); | |
1968 | } | |
1969 | ||
1970 | struct json *monitor_request = json_object_create(); | |
1971 | json_object_put(monitor_request, "columns", columns); | |
1972 | json_object_put(monitor_requests, table_name, | |
1973 | json_array_create_1(monitor_request)); | |
1974 | } | |
1975 | ovsdb_cs_free_schema(schema); | |
1976 | ||
1977 | return monitor_requests; | |
1978 | } | |
1979 | ||
1980 | static const struct ovsdb_cs_ops ovsdb_cs_server_ops = { | |
1981 | ovsdb_cs_compose_server_monitor_request | |
1982 | }; | |
a5c067a8 BP |
1983 | \f |
/* Logs 'error' as a warning about a database schema that failed to parse.
 * Takes ownership of 'error'. */
static void
log_error(struct ovsdb_error *error)
{
    char *msg = ovsdb_error_to_string_free(error);

    VLOG_WARN("error parsing database schema: %s", msg);
    free(msg);
}
1991 | ||
1992 | /* Parses 'schema_json', an OVSDB schema in JSON format as described in RFC | |
1993 | * 7047, to obtain the names of its rows and columns. If successful, returns | |
1994 | * an shash whose keys are table names and whose values are ssets, where each | |
1995 | * sset contains the names of its table's columns. On failure (due to a parse | |
1996 | * error), returns NULL. | |
1997 | * | |
1998 | * It would also be possible to use the general-purpose OVSDB schema parser in | |
1999 | * ovsdb-server, but that's overkill, possibly too strict for the current use | |
2000 | * case, and would require restructuring ovsdb-server to separate the schema | |
2001 | * code from the rest. */ | |
2002 | struct shash * | |
2003 | ovsdb_cs_parse_schema(const struct json *schema_json) | |
2004 | { | |
2005 | struct ovsdb_parser parser; | |
2006 | const struct json *tables_json; | |
2007 | struct ovsdb_error *error; | |
2008 | struct shash_node *node; | |
2009 | struct shash *schema; | |
2010 | ||
2011 | ovsdb_parser_init(&parser, schema_json, "database schema"); | |
2012 | tables_json = ovsdb_parser_member(&parser, "tables", OP_OBJECT); | |
2013 | error = ovsdb_parser_destroy(&parser); | |
2014 | if (error) { | |
2015 | log_error(error); | |
2016 | return NULL; | |
2017 | } | |
2018 | ||
2019 | schema = xmalloc(sizeof *schema); | |
2020 | shash_init(schema); | |
2021 | SHASH_FOR_EACH (node, json_object(tables_json)) { | |
2022 | const char *table_name = node->name; | |
2023 | const struct json *json = node->data; | |
2024 | const struct json *columns_json; | |
2025 | ||
2026 | ovsdb_parser_init(&parser, json, "table schema for table %s", | |
2027 | table_name); | |
2028 | columns_json = ovsdb_parser_member(&parser, "columns", OP_OBJECT); | |
2029 | error = ovsdb_parser_destroy(&parser); | |
2030 | if (error) { | |
2031 | log_error(error); | |
2032 | ovsdb_cs_free_schema(schema); | |
2033 | return NULL; | |
2034 | } | |
2035 | ||
2036 | struct sset *columns = xmalloc(sizeof *columns); | |
2037 | sset_init(columns); | |
2038 | ||
2039 | struct shash_node *node2; | |
2040 | SHASH_FOR_EACH (node2, json_object(columns_json)) { | |
2041 | const char *column_name = node2->name; | |
2042 | sset_add(columns, column_name); | |
2043 | } | |
2044 | shash_add(schema, table_name, columns); | |
2045 | } | |
2046 | return schema; | |
2047 | } | |
2048 | ||
2049 | /* Frees 'schema', which is in the format returned by | |
2050 | * ovsdb_cs_parse_schema(). */ | |
2051 | void | |
2052 | ovsdb_cs_free_schema(struct shash *schema) | |
2053 | { | |
2054 | if (schema) { | |
2055 | struct shash_node *node, *next; | |
2056 | ||
2057 | SHASH_FOR_EACH_SAFE (node, next, schema) { | |
2058 | struct sset *sset = node->data; | |
2059 | sset_destroy(sset); | |
2060 | free(sset); | |
2061 | shash_delete(schema, node); | |
2062 | } | |
2063 | shash_destroy(schema); | |
2064 | free(schema); | |
2065 | } | |
2066 | } | |
2067 | \f | |
2068 | static struct ovsdb_error * OVS_WARN_UNUSED_RESULT | |
2069 | ovsdb_cs_parse_row_update1(const struct json *in, | |
2070 | struct ovsdb_cs_row_update *out) | |
2071 | { | |
2072 | const struct json *old_json, *new_json; | |
2073 | ||
2074 | old_json = shash_find_data(json_object(in), "old"); | |
2075 | new_json = shash_find_data(json_object(in), "new"); | |
2076 | if (old_json && old_json->type != JSON_OBJECT) { | |
2077 | return ovsdb_syntax_error(old_json, NULL, | |
2078 | "\"old\" <row> is not object"); | |
2079 | } else if (new_json && new_json->type != JSON_OBJECT) { | |
2080 | return ovsdb_syntax_error(new_json, NULL, | |
2081 | "\"new\" <row> is not object"); | |
2082 | } else if ((old_json != NULL) + (new_json != NULL) | |
2083 | != shash_count(json_object(in))) { | |
2084 | return ovsdb_syntax_error(in, NULL, | |
2085 | "<row-update> contains " | |
2086 | "unexpected member"); | |
2087 | } else if (!old_json && !new_json) { | |
2088 | return ovsdb_syntax_error(in, NULL, | |
2089 | "<row-update> missing \"old\" " | |
2090 | "and \"new\" members"); | |
2091 | } | |
2092 | ||
2093 | if (!new_json) { | |
2094 | out->type = OVSDB_CS_ROW_DELETE; | |
2095 | out->columns = json_object(old_json); | |
2096 | } else if (!old_json) { | |
2097 | out->type = OVSDB_CS_ROW_INSERT; | |
2098 | out->columns = json_object(new_json); | |
2099 | } else { | |
2100 | out->type = OVSDB_CS_ROW_UPDATE; | |
2101 | out->columns = json_object(new_json); | |
2102 | } | |
2103 | return NULL; | |
2104 | } | |
2105 | ||
2106 | static struct ovsdb_error * OVS_WARN_UNUSED_RESULT | |
2107 | ovsdb_cs_parse_row_update2(const struct json *in, | |
2108 | struct ovsdb_cs_row_update *out) | |
2109 | { | |
2110 | const struct shash *object = json_object(in); | |
2111 | if (shash_count(object) != 1) { | |
2112 | return ovsdb_syntax_error( | |
2113 | in, NULL, "<row-update2> has %"PRIuSIZE" members " | |
2114 | "instead of expected 1", shash_count(object)); | |
2115 | } | |
2116 | ||
2117 | struct shash_node *node = shash_first(object); | |
2118 | const struct json *columns = node->data; | |
2119 | if (!strcmp(node->name, "insert") || !strcmp(node->name, "initial")) { | |
2120 | out->type = OVSDB_CS_ROW_INSERT; | |
2121 | } else if (!strcmp(node->name, "modify")) { | |
2122 | out->type = OVSDB_CS_ROW_XOR; | |
2123 | } else if (!strcmp(node->name, "delete")) { | |
2124 | out->type = OVSDB_CS_ROW_DELETE; | |
2125 | if (columns->type != JSON_NULL) { | |
2126 | return ovsdb_syntax_error( | |
2127 | in, NULL, | |
2128 | "<row-update2> delete operation has unexpected value"); | |
2129 | } | |
2130 | return NULL; | |
2131 | } else { | |
2132 | return ovsdb_syntax_error(in, NULL, | |
2133 | "<row-update2> has unknown member \"%s\"", | |
2134 | node->name); | |
2135 | } | |
2136 | ||
2137 | if (columns->type != JSON_OBJECT) { | |
2138 | return ovsdb_syntax_error( | |
2139 | in, NULL, | |
2140 | "<row-update2> \"%s\" operation has unexpected value", | |
2141 | node->name); | |
2142 | } | |
2143 | out->columns = json_object(columns); | |
2144 | ||
2145 | return NULL; | |
2146 | } | |
2147 | ||
2148 | static struct ovsdb_error * OVS_WARN_UNUSED_RESULT | |
2149 | ovsdb_cs_parse_row_update(const char *table_name, | |
2150 | const struct json *in, int version, | |
2151 | struct ovsdb_cs_row_update *out) | |
2152 | { | |
2153 | if (in->type != JSON_OBJECT) { | |
2154 | const char *suffix = version > 1 ? "2" : ""; | |
2155 | return ovsdb_syntax_error( | |
2156 | in, NULL, | |
2157 | "<table-update%s> for table \"%s\" contains <row-update%s> " | |
2158 | "that is not an object", | |
2159 | suffix, table_name, suffix); | |
2160 | } | |
2161 | ||
2162 | return (version == 1 | |
2163 | ? ovsdb_cs_parse_row_update1(in, out) | |
2164 | : ovsdb_cs_parse_row_update2(in, out)); | |
2165 | } | |
2166 | ||
2167 | static struct ovsdb_error * OVS_WARN_UNUSED_RESULT | |
2168 | ovsdb_cs_parse_table_update(const char *table_name, | |
2169 | const struct json *in, int version, | |
2170 | struct ovsdb_cs_table_update *out) | |
2171 | { | |
2172 | const char *suffix = version > 1 ? "2" : ""; | |
2173 | ||
2174 | if (in->type != JSON_OBJECT) { | |
2175 | return ovsdb_syntax_error( | |
2176 | in, NULL, "<table-update%s> for table \"%s\" is not an object", | |
2177 | suffix, table_name); | |
2178 | } | |
2179 | struct shash *in_rows = json_object(in); | |
2180 | ||
2181 | out->row_updates = xmalloc(shash_count(in_rows) * sizeof *out->row_updates); | |
2182 | ||
2183 | const struct shash_node *node; | |
2184 | SHASH_FOR_EACH (node, in_rows) { | |
2185 | const char *row_uuid_string = node->name; | |
2186 | struct uuid row_uuid; | |
2187 | if (!uuid_from_string(&row_uuid, row_uuid_string)) { | |
2188 | return ovsdb_syntax_error( | |
2189 | in, NULL, | |
2190 | "<table-update%s> for table \"%s\" contains " | |
2191 | "bad UUID \"%s\" as member name", | |
2192 | suffix, table_name, row_uuid_string); | |
2193 | } | |
2194 | ||
2195 | const struct json *in_ru = node->data; | |
2196 | struct ovsdb_cs_row_update *out_ru = &out->row_updates[out->n++]; | |
2197 | *out_ru = (struct ovsdb_cs_row_update) { .row_uuid = row_uuid }; | |
2198 | ||
2199 | struct ovsdb_error *error = ovsdb_cs_parse_row_update( | |
2200 | table_name, in_ru, version, out_ru); | |
2201 | if (error) { | |
2202 | return error; | |
2203 | } | |
2204 | } | |
2205 | ||
2206 | return NULL; | |
2207 | } | |
2208 | ||
2209 | /* Parses OVSDB <table-updates> or <table-updates2> object 'in' into '*outp'. | |
2210 | * If successful, sets '*outp' to the new object and returns NULL. On failure, | |
2211 | * returns the error and sets '*outp' to NULL. | |
2212 | * | |
2213 | * On success, the caller must eventually free '*outp', with | |
2214 | * ovsdb_cs_db_update_destroy(). | |
2215 | * | |
2216 | * 'version' should be 1 if 'in' is a <table-updates>, 2 or 3 if it is a | |
2217 | * <table-updates2>. */ | |
2218 | struct ovsdb_error * OVS_WARN_UNUSED_RESULT | |
2219 | ovsdb_cs_parse_db_update(const struct json *in, int version, | |
2220 | struct ovsdb_cs_db_update **outp) | |
2221 | { | |
2222 | const char *suffix = version > 1 ? "2" : ""; | |
2223 | ||
2224 | *outp = NULL; | |
2225 | if (in->type != JSON_OBJECT) { | |
2226 | return ovsdb_syntax_error(in, NULL, | |
2227 | "<table-updates%s> is not an object", suffix); | |
2228 | } | |
2229 | ||
2230 | struct ovsdb_cs_db_update *out = xzalloc(sizeof *out); | |
2231 | out->table_updates = xmalloc(shash_count(json_object(in)) | |
2232 | * sizeof *out->table_updates); | |
2233 | const struct shash_node *node; | |
2234 | SHASH_FOR_EACH (node, json_object(in)) { | |
2235 | const char *table_name = node->name; | |
2236 | const struct json *in_tu = node->data; | |
2237 | ||
2238 | struct ovsdb_cs_table_update *out_tu = &out->table_updates[out->n++]; | |
2239 | *out_tu = (struct ovsdb_cs_table_update) { .table_name = table_name }; | |
2240 | ||
2241 | struct ovsdb_error *error = ovsdb_cs_parse_table_update( | |
2242 | table_name, in_tu, version, out_tu); | |
2243 | if (error) { | |
2244 | ovsdb_cs_db_update_destroy(out); | |
2245 | return error; | |
2246 | } | |
2247 | } | |
2248 | ||
2249 | *outp = out; | |
2250 | return NULL; | |
2251 | } | |
2252 | ||
2253 | /* Frees 'du', which was presumably allocated by ovsdb_cs_parse_db_update(). */ | |
2254 | void | |
2255 | ovsdb_cs_db_update_destroy(struct ovsdb_cs_db_update *du) | |
2256 | { | |
2257 | if (!du) { | |
2258 | return; | |
2259 | } | |
2260 | ||
2261 | for (size_t i = 0; i < du->n; i++) { | |
2262 | struct ovsdb_cs_table_update *tu = &du->table_updates[i]; | |
2263 | free(tu->row_updates); | |
2264 | } | |
2265 | free(du->table_updates); | |
2266 | free(du); | |
2267 | } | |
1c337c43 BP |
2268 | |
2269 | const struct ovsdb_cs_table_update * | |
2270 | ovsdb_cs_db_update_find_table(const struct ovsdb_cs_db_update *du, | |
2271 | const char *table_name) | |
2272 | { | |
2273 | for (size_t i = 0; i < du->n; i++) { | |
2274 | const struct ovsdb_cs_table_update *tu = &du->table_updates[i]; | |
2275 | if (!strcmp(tu->table_name, table_name)) { | |
2276 | return tu; | |
2277 | } | |
2278 | } | |
2279 | return NULL; | |
2280 | } | |
2281 |