]> git.proxmox.com Git - mirror_ovs.git/blame - lib/ovsdb-cs.c
cirrus: Use FreeBSD 12.2.
[mirror_ovs.git] / lib / ovsdb-cs.c
CommitLineData
a5c067a8
BP
1/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017 Nicira, Inc.
2 * Copyright (C) 2016 Hewlett Packard Enterprise Development LP
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITION OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <config.h>
18
19#include "ovsdb-cs.h"
20
21#include <errno.h>
22
23#include "hash.h"
24#include "jsonrpc.h"
25#include "openvswitch/dynamic-string.h"
26#include "openvswitch/hmap.h"
27#include "openvswitch/json.h"
28#include "openvswitch/poll-loop.h"
29#include "openvswitch/shash.h"
30#include "openvswitch/vlog.h"
31#include "ovsdb-data.h"
32#include "ovsdb-error.h"
33#include "ovsdb-parser.h"
34#include "ovsdb-session.h"
35#include "ovsdb-types.h"
36#include "sset.h"
37#include "svec.h"
38#include "util.h"
39#include "uuid.h"
40
41VLOG_DEFINE_THIS_MODULE(ovsdb_cs);
1c337c43
BP
42
/* Connection state machine.
 *
 * Whenever the JSON-RPC session (re)connects, ovsdb_cs_run() restarts the
 * state machine: the CS layer sends a "get_schema" request for the _Server
 * database and transitions to the CS_S_SERVER_SCHEMA_REQUESTED state.  If the
 * CS receives a "monitor_canceled" notification for a table it is
 * monitoring, the CS starts over again in the same way. */
#define OVSDB_CS_STATES                                                 \
    /* Waits for "get_schema" reply for the _Server database.  On       \
     * success, sends a "monitor_cond" request for the Database table   \
     * in the _Server database, whose details are informed by the       \
     * schema, and transitions to CS_S_SERVER_MONITOR_REQUESTED.  On    \
     * failure, sends a "get_schema" request for the CS database and    \
     * transitions to CS_S_DATA_SCHEMA_REQUESTED. */                    \
    OVSDB_CS_STATE(SERVER_SCHEMA_REQUESTED)                             \
                                                                        \
    /* Waits for "monitor_cond" reply for the Database table:           \
     *                                                                  \
     *   - If the reply indicates success, and the Database table has a \
     *     row for the CS database:                                     \
     *                                                                  \
     *       * If the row indicates that this is a clustered database   \
     *         that is not connected to the cluster, closes the         \
     *         connection.  The next connection attempt has a chance at \
     *         picking a connected server.                              \
     *                                                                  \
     *       * Otherwise, sends a monitoring request for the CS         \
     *         database whose details are informed by the schema        \
     *         (obtained from the row), and transitions to              \
     *         CS_S_DATA_MONITOR_(COND_(SINCE_))REQUESTED.              \
     *                                                                  \
     *   - If the reply indicates success, but the Database table does  \
     *     not have a row for the CS database, transitions to           \
     *     CS_S_ERROR.                                                  \
     *                                                                  \
     *   - If the reply indicates failure, sends a "get_schema" request \
     *     for the CS database and transitions to                       \
     *     CS_S_DATA_SCHEMA_REQUESTED. */                               \
    OVSDB_CS_STATE(SERVER_MONITOR_REQUESTED)                            \
                                                                        \
    /* Waits for "get_schema" reply for the CS database.  If the        \
     * database's 'max_version' is at least 2, sends a "monitor_cond"   \
     * request whose details are informed by the schema and             \
     * transitions to CS_S_DATA_MONITOR_COND_REQUESTED; otherwise       \
     * sends a plain "monitor" request and transitions to               \
     * CS_S_DATA_MONITOR_REQUESTED.  An error reply is treated as       \
     * unexpected: it is logged and the connection is retried. */       \
    OVSDB_CS_STATE(DATA_SCHEMA_REQUESTED)                               \
                                                                        \
    /* Waits for "monitor_cond_since" reply.  If successful, replaces   \
     * the CS contents by the data carried in the reply and             \
     * transitions to CS_S_MONITORING.  On failure, sends a             \
     * "monitor_cond" request and transitions to                        \
     * CS_S_DATA_MONITOR_COND_REQUESTED. */                             \
    OVSDB_CS_STATE(DATA_MONITOR_COND_SINCE_REQUESTED)                   \
                                                                        \
    /* Waits for "monitor_cond" reply.  If successful, replaces the     \
     * CS contents by the data carried in the reply and transitions     \
     * to CS_S_MONITORING.  On failure, sends a "monitor" request       \
     * and transitions to CS_S_DATA_MONITOR_REQUESTED. */               \
    OVSDB_CS_STATE(DATA_MONITOR_COND_REQUESTED)                         \
                                                                        \
    /* Waits for "monitor" reply.  If successful, replaces the CS       \
     * contents by the data carried in the reply and transitions to     \
     * CS_S_MONITORING.  On failure, the error is logged as             \
     * unexpected, a reconnect is forced and the state becomes          \
     * CS_S_RETRY. */                                                   \
    OVSDB_CS_STATE(DATA_MONITOR_REQUESTED)                              \
                                                                        \
    /* State that processes "update", "update2" or "update3"            \
     * notifications for the main database (and the Database table      \
     * in _Server if available).  A reply received in this state is     \
     * the answer to a "monitor_cond_change" request.                   \
     *                                                                  \
     * If we're monitoring the Database table and we get notified       \
     * that the CS database has been deleted, we close the              \
     * connection (which will restart the state machine). */            \
    OVSDB_CS_STATE(MONITORING)                                          \
                                                                        \
    /* Terminal error state that indicates that nothing useful can be   \
     * done, for example because the database server doesn't actually  \
     * have the desired database.  We maintain the session with the     \
     * database server anyway.  If it starts serving the database       \
     * that we want, or if someone fixes and restarts the database,     \
     * then it will kill the session and we will automatically          \
     * reconnect and try again. */                                      \
    OVSDB_CS_STATE(ERROR)                                               \
                                                                        \
    /* Terminal state that indicates we connected to a useless server   \
     * in a cluster, e.g. one that is partitioned from the rest of      \
     * the cluster, or that ovsdb_cs_retry() was called after an        \
     * unexpected reply.  We're waiting to retry. */                    \
    OVSDB_CS_STATE(RETRY)
126
/* Connection states: one CS_S_<NAME> enumerator for each
 * OVSDB_CS_STATE(NAME) entry in OVSDB_CS_STATES, in the same order, so the
 * first state (CS_S_SERVER_SCHEMA_REQUESTED) has the value 0. */
enum ovsdb_cs_state {
#define OVSDB_CS_STATE(NAME) CS_S_##NAME,
    OVSDB_CS_STATES
#undef OVSDB_CS_STATE
};
132
133static const char *
134ovsdb_cs_state_to_string(enum ovsdb_cs_state state)
135{
136 switch (state) {
137#define OVSDB_CS_STATE(NAME) case CS_S_##NAME: return #NAME;
138 OVSDB_CS_STATES
139#undef OVSDB_CS_STATE
140 default: return "<unknown>";
141 }
142}
143
/* A database being monitored.
 *
 * There are two instances of this data structure for each CS instance, one for
 * the _Server database used for working with clusters, and the other one for
 * the actual database that the client is interested in. */
struct ovsdb_cs_db {
    struct ovsdb_cs *cs;        /* The CS instance that contains this. */

    /* Data. */
    const char *db_name;        /* Database's name.  Not copied: points to the
                                 * string passed to ovsdb_cs_db_init(). */
    struct hmap tables;         /* Contains "struct ovsdb_cs_db_table *"s.*/
    struct json *monitor_id;    /* JSON-RPC monitor ID, ["monid", db_name]. */
    struct json *schema;        /* Last "get_schema" result, or NULL. */

    /* Monitor version. */
    int max_version;            /* Maximum version of monitor request to use. */
    int monitor_version;        /* 0 if not monitoring, 1=monitor,
                                 * 2=monitor_cond, 3=monitor_cond_since. */

    /* Condition changes. */
    bool cond_changed;          /* Change not yet sent to server? */
    unsigned int cond_seqno;    /* Incremented when a "monitor_cond_change"
                                 * reply is received. */

    /* Database locking. */
    char *lock_name;            /* Name of lock we need, NULL if none. */
    bool has_lock;              /* Has db server told us we have the lock? */
    bool is_lock_contended;     /* Has db server told us we can't get lock? */
    struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */

    /* Last db txn id, used for fast resync through monitor_cond_since */
    struct uuid last_id;

    /* Client interface. */
    struct ovs_list events;     /* Queued "struct ovsdb_cs_event"s. */
    const struct ovsdb_cs_ops *ops;  /* Client callbacks for this database. */
    void *ops_aux;              /* Auxiliary pointer passed with 'ops'. */
};
181
/* Callbacks used for the internal _Server database (tentative definition;
 * presumably initialized later in this file). */
static const struct ovsdb_cs_ops ovsdb_cs_server_ops;

/* Per-database table condition management. */
static void ovsdb_cs_db_destroy_tables(struct ovsdb_cs_db *);
static unsigned int ovsdb_cs_db_set_condition(
    struct ovsdb_cs_db *, const char *db_name, const struct json *condition);

/* Requests sent while the connection state machine progresses. */
static void ovsdb_cs_send_schema_request(struct ovsdb_cs *,
                                         struct ovsdb_cs_db *);
static void ovsdb_cs_send_db_change_aware(struct ovsdb_cs *);
static bool ovsdb_cs_check_server_db(struct ovsdb_cs *);
static void ovsdb_cs_clear_server_rows(struct ovsdb_cs *);
static void ovsdb_cs_send_monitor_request(struct ovsdb_cs *,
                                          struct ovsdb_cs_db *, int version);

/* Condition acknowledgment and resynchronization. */
static void ovsdb_cs_db_ack_condition(struct ovsdb_cs_db *db);
static void ovsdb_cs_db_sync_condition(struct ovsdb_cs_db *db);
197
/* A client synchronization object: a connection to an OVSDB server that
 * monitors one database (plus _Server, when available). */
struct ovsdb_cs {
    struct ovsdb_cs_db server;  /* The _Server database. */
    struct ovsdb_cs_db data;    /* The database the client asked for. */

    /* Session state.
     *
     * 'state_seqno' is a snapshot of the session's sequence number as returned
     * by jsonrpc_session_get_seqno(session), so if it differs from the value
     * that function currently returns then the session has reconnected and the
     * state machine must restart. */
    struct jsonrpc_session *session; /* Connection to the server. */
    char *remote;                    /* 'session' remote name. */
    enum ovsdb_cs_state state;       /* Current session state. */
    unsigned int state_seqno;        /* See above. */
    struct json *request_id;         /* JSON ID for request awaiting reply. */

    /* IDs of outstanding transactions.  ovsdb_cs_run() forgets them when the
     * session reconnects. */
    struct json **txns;
    size_t n_txns, allocated_txns;

    /* Info for the _Server database. */
    struct uuid cid;            /* Filled in from the remote string by
                                 * ovsdb_session_parse_remote(). */
    struct hmap server_rows;

    /* Clustered servers. */
    uint64_t min_index;      /* Minimum allowed index, to avoid regression. */
    bool leader_only;        /* If true, do not connect to Raft followers. */
    bool shuffle_remotes;    /* If true, connect to servers in random order. */
};
227
/* Sets the state of the connection state machine, logging the old and new
 * states and 'where' at debug level.  Callers use ovsdb_cs_transition(),
 * which supplies the caller's source location. */
static void ovsdb_cs_transition_at(struct ovsdb_cs *, enum ovsdb_cs_state,
                                   const char *where);
#define ovsdb_cs_transition(CS, STATE) \
    ovsdb_cs_transition_at(CS, STATE, OVS_SOURCE_LOCATOR)

/* Forces a reconnect and moves to CS_S_RETRY.  Callers use ovsdb_cs_retry(),
 * which supplies the caller's source location. */
static void ovsdb_cs_retry_at(struct ovsdb_cs *, const char *where);
#define ovsdb_cs_retry(CS) ovsdb_cs_retry_at(CS, OVS_SOURCE_LOCATOR)

/* Shared log rate limit; presumably used when reporting malformed messages
 * elsewhere in this file. */
static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);

/* Monitor replies, update notifications and monitor cancellation. */
static void ovsdb_cs_db_parse_monitor_reply(struct ovsdb_cs_db *,
                                            const struct json *result,
                                            int version);
static bool ovsdb_cs_db_parse_update_rpc(struct ovsdb_cs_db *,
                                         const struct jsonrpc_msg *);
static bool ovsdb_cs_handle_monitor_canceled(struct ovsdb_cs *,
                                             struct ovsdb_cs_db *,
                                             const struct jsonrpc_msg *);

/* Database locking. */
static bool ovsdb_cs_db_process_lock_replies(struct ovsdb_cs_db *,
                                             const struct jsonrpc_msg *);
static struct jsonrpc_msg *ovsdb_cs_db_compose_lock_request(
    struct ovsdb_cs_db *);
static struct jsonrpc_msg *ovsdb_cs_db_compose_unlock_request(
    struct ovsdb_cs_db *);
static void ovsdb_cs_db_parse_lock_reply(struct ovsdb_cs_db *,
                                         const struct json *);
static bool ovsdb_cs_db_parse_lock_notify(struct ovsdb_cs_db *,
                                          const struct json *params,
                                          bool new_has_lock);

/* Sends a pending "monitor_cond_change" request, if any. */
static void ovsdb_cs_send_cond_change(struct ovsdb_cs *);

/* Transaction replies. */
static bool ovsdb_cs_db_txn_process_reply(struct ovsdb_cs *,
                                          const struct jsonrpc_msg *reply);
262\f
263/* Events. */
264
265void
266ovsdb_cs_event_destroy(struct ovsdb_cs_event *event)
267{
268 if (event) {
269 switch (event->type) {
270 case OVSDB_CS_EVENT_TYPE_RECONNECT:
271 case OVSDB_CS_EVENT_TYPE_LOCKED:
272 break;
273
274 case OVSDB_CS_EVENT_TYPE_UPDATE:
275 json_destroy(event->update.table_updates);
276 break;
277
278 case OVSDB_CS_EVENT_TYPE_TXN_REPLY:
279 jsonrpc_msg_destroy(event->txn_reply);
280 break;
281 }
282 free(event);
283 }
284}
285\f
286/* Lifecycle. */
287
288static void
289ovsdb_cs_db_init(struct ovsdb_cs_db *db, const char *db_name,
290 struct ovsdb_cs *parent, int max_version,
291 const struct ovsdb_cs_ops *ops, void *ops_aux)
292{
293 *db = (struct ovsdb_cs_db) {
294 .cs = parent,
295 .db_name = db_name,
296 .tables = HMAP_INITIALIZER(&db->tables),
297 .max_version = max_version,
298 .monitor_id = json_array_create_2(json_string_create("monid"),
299 json_string_create(db_name)),
300 .events = OVS_LIST_INITIALIZER(&db->events),
301 .ops = ops,
302 .ops_aux = ops_aux,
303 };
304}
305
306/* Creates and returns a new client synchronization object. The connection
307 * will monitor remote database 'db_name'. If 'retry' is true, then also
308 * reconnect if the connection fails.
309 *
310 * XXX 'max_version' should ordinarily be 3, to allow use of the most efficient
311 * "monitor_cond_since" method with the database. Currently there's some kind
312 * of bug in the DDlog Rust code that interfaces to that, so instead
313 * ovn-northd-ddlog passes 1 to use plain 'monitor' instead. Once the DDlog
314 * Rust code gets fixed, we might as well just delete 'max_version'
315 * entirely.
316 *
317 * 'ops' is a struct for northd_cs_run() to use, and 'ops_aux' is a pointer
318 * that gets passed into each call.
319 *
320 * Use ovsdb_cs_set_remote() to configure the database to which to connect.
321 * Until a remote is configured, no data can be retrieved.
322 */
323struct ovsdb_cs *
324ovsdb_cs_create(const char *db_name, int max_version,
325 const struct ovsdb_cs_ops *ops, void *ops_aux)
326{
327 struct ovsdb_cs *cs = xzalloc(sizeof *cs);
328 ovsdb_cs_db_init(&cs->server, "_Server", cs, 2, &ovsdb_cs_server_ops, cs);
329 ovsdb_cs_db_init(&cs->data, db_name, cs, max_version, ops, ops_aux);
330 cs->state_seqno = UINT_MAX;
331 cs->request_id = NULL;
332 cs->leader_only = true;
333 cs->shuffle_remotes = true;
334 hmap_init(&cs->server_rows);
335
336 return cs;
337}
338
339static void
340ovsdb_cs_db_destroy(struct ovsdb_cs_db *db)
341{
342 ovsdb_cs_db_destroy_tables(db);
343
344 json_destroy(db->monitor_id);
345 json_destroy(db->schema);
346
347 free(db->lock_name);
348
349 json_destroy(db->lock_request_id);
350
351 /* This list always gets flushed out at the end of ovsdb_cs_run(). */
352 ovs_assert(ovs_list_is_empty(&db->events));
353}
354
355/* Destroys 'cs' and all of the data structures that it manages. */
356void
357ovsdb_cs_destroy(struct ovsdb_cs *cs)
358{
359 if (cs) {
360 ovsdb_cs_db_destroy(&cs->server);
361 ovsdb_cs_db_destroy(&cs->data);
362 jsonrpc_session_close(cs->session);
363 free(cs->remote);
364 json_destroy(cs->request_id);
365
366 for (size_t i = 0; i < cs->n_txns; i++) {
367 json_destroy(cs->txns[i]);
368 }
369 free(cs->txns);
370
371 ovsdb_cs_clear_server_rows(cs);
372 hmap_destroy(&cs->server_rows);
373
374 free(cs);
375 }
376}
377
378static void
379ovsdb_cs_transition_at(struct ovsdb_cs *cs, enum ovsdb_cs_state new_state,
380 const char *where)
381{
382 VLOG_DBG("%s: %s -> %s at %s",
383 cs->session ? jsonrpc_session_get_name(cs->session) : "void",
384 ovsdb_cs_state_to_string(cs->state),
385 ovsdb_cs_state_to_string(new_state),
386 where);
387 cs->state = new_state;
388}
389
390static void
391ovsdb_cs_send_request(struct ovsdb_cs *cs, struct jsonrpc_msg *request)
392{
393 json_destroy(cs->request_id);
394 cs->request_id = json_clone(request->id);
395 if (cs->session) {
396 jsonrpc_session_send(cs->session, request);
397 } else {
398 jsonrpc_msg_destroy(request);
399 }
400}
401
402static void
403ovsdb_cs_retry_at(struct ovsdb_cs *cs, const char *where)
404{
405 ovsdb_cs_force_reconnect(cs);
406 ovsdb_cs_transition_at(cs, CS_S_RETRY, where);
407}
408
409static void
410ovsdb_cs_restart_fsm(struct ovsdb_cs *cs)
411{
412 /* Resync data DB table conditions to avoid missing updates due to
413 * conditions that were in flight or changed locally while the connection
414 * was down.
415 */
416 ovsdb_cs_db_sync_condition(&cs->data);
417
418 ovsdb_cs_send_schema_request(cs, &cs->server);
419 ovsdb_cs_transition(cs, CS_S_SERVER_SCHEMA_REQUESTED);
420 cs->data.monitor_version = 0;
421 cs->server.monitor_version = 0;
422}
423
/* Handles 'msg', the reply or error for the request whose ID was held in
 * cs->request_id (ovsdb_cs_process_msg() clears cs->request_id before calling
 * this), and advances the connection state machine accordingly.
 *
 * Error responses are accepted only in the states that probe for optional
 * server support (the _Server database, "monitor_cond" and
 * "monitor_cond_since").  In those states an error selects a fallback
 * request.  In every other state an error is logged (rate-limited) and the
 * connection is retried. */
static void
ovsdb_cs_process_response(struct ovsdb_cs *cs, struct jsonrpc_msg *msg)
{
    bool ok = msg->type == JSONRPC_REPLY;
    if (!ok
        && cs->state != CS_S_SERVER_SCHEMA_REQUESTED
        && cs->state != CS_S_SERVER_MONITOR_REQUESTED
        && cs->state != CS_S_DATA_MONITOR_COND_REQUESTED
        && cs->state != CS_S_DATA_MONITOR_COND_SINCE_REQUESTED) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        char *s = jsonrpc_msg_to_string(msg);
        VLOG_INFO_RL(&rl, "%s: received unexpected %s response in "
                     "%s state: %s", jsonrpc_session_get_name(cs->session),
                     jsonrpc_msg_type_to_string(msg->type),
                     ovsdb_cs_state_to_string(cs->state),
                     s);
        free(s);
        ovsdb_cs_retry(cs);
        return;
    }

    switch (cs->state) {
    case CS_S_SERVER_SCHEMA_REQUESTED:
        if (ok) {
            /* Keep the _Server schema and start monitoring its Database
             * table. */
            json_destroy(cs->server.schema);
            cs->server.schema = json_clone(msg->result);
            ovsdb_cs_send_monitor_request(cs, &cs->server,
                                          cs->server.max_version);
            ovsdb_cs_transition(cs, CS_S_SERVER_MONITOR_REQUESTED);
        } else {
            /* _Server is not usable: go straight to the data database. */
            ovsdb_cs_send_schema_request(cs, &cs->data);
            ovsdb_cs_transition(cs, CS_S_DATA_SCHEMA_REQUESTED);
        }
        break;

    case CS_S_SERVER_MONITOR_REQUESTED:
        if (ok) {
            cs->server.monitor_version = cs->server.max_version;
            ovsdb_cs_db_parse_monitor_reply(&cs->server, msg->result,
                                            cs->server.monitor_version);
            /* ovsdb_cs_check_server_db() is expected to take the next step
             * described for this state (request the data monitor, or close
             * the connection).  Ask for database change notifications only
             * if it reports success. */
            if (ovsdb_cs_check_server_db(cs)) {
                ovsdb_cs_send_db_change_aware(cs);
            }
        } else {
            ovsdb_cs_send_schema_request(cs, &cs->data);
            ovsdb_cs_transition(cs, CS_S_DATA_SCHEMA_REQUESTED);
        }
        break;

    case CS_S_DATA_SCHEMA_REQUESTED:
        /* Only successful replies get here; errors were rejected above.
         * Start with the best monitoring method that 'max_version'
         * allows. */
        json_destroy(cs->data.schema);
        cs->data.schema = json_clone(msg->result);
        if (cs->data.max_version >= 2) {
            ovsdb_cs_send_monitor_request(cs, &cs->data, 2);
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED);
        } else {
            ovsdb_cs_send_monitor_request(cs, &cs->data, 1);
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED);
        }
        break;

    case CS_S_DATA_MONITOR_COND_SINCE_REQUESTED:
        if (!ok) {
            /* "monitor_cond_since" not supported.  Try "monitor_cond". */
            ovsdb_cs_send_monitor_request(cs, &cs->data, 2);
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED);
        } else {
            cs->data.monitor_version = 3;
            ovsdb_cs_transition(cs, CS_S_MONITORING);
            ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 3);
        }
        break;

    case CS_S_DATA_MONITOR_COND_REQUESTED:
        if (!ok) {
            /* "monitor_cond" not supported.  Try "monitor". */
            ovsdb_cs_send_monitor_request(cs, &cs->data, 1);
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED);
        } else {
            cs->data.monitor_version = 2;
            ovsdb_cs_transition(cs, CS_S_MONITORING);
            ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 2);
        }
        break;

    case CS_S_DATA_MONITOR_REQUESTED:
        /* Only successful replies get here; errors were rejected above. */
        cs->data.monitor_version = 1;
        ovsdb_cs_transition(cs, CS_S_MONITORING);
        ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 1);
        break;

    case CS_S_MONITORING:
        /* We don't normally have a request outstanding in this state.  If we
         * do, it's a "monitor_cond_change", which means that the conditional
         * monitor clauses were updated.
         *
         * Mark the last requested conditions as acked and if further
         * condition changes were pending, send them now. */
        ovsdb_cs_db_ack_condition(&cs->data);
        ovsdb_cs_send_cond_change(cs);
        cs->data.cond_seqno++;
        break;

    case CS_S_ERROR:
    case CS_S_RETRY:
        /* Nothing to do in this state. */
        break;

    default:
        OVS_NOT_REACHED();
    }
}
536
/* Dispatches one message received on cs->session.  The message is offered,
 * in this order, to: the state machine (if it is the reply to the request in
 * cs->request_id), the data database's update parser, the _Server update
 * parser (only while _Server is monitored), the "monitor_canceled" handlers,
 * the data database's lock handling, and the transaction-reply handler.  The
 * first one that accepts it wins.  Anything left over is logged at debug
 * level.  The caller retains ownership of 'msg'. */
static void
ovsdb_cs_process_msg(struct ovsdb_cs *cs, struct jsonrpc_msg *msg)
{
    bool is_response = (msg->type == JSONRPC_REPLY ||
                        msg->type == JSONRPC_ERROR);

    /* Process a reply to an outstanding request. */
    if (is_response
        && cs->request_id && json_equal(cs->request_id, msg->id)) {
        json_destroy(cs->request_id);
        cs->request_id = NULL;
        ovsdb_cs_process_response(cs, msg);
        return;
    }

    /* Process database contents updates.  A _Server update may change what
     * we know about the data database, so re-check it afterward. */
    if (ovsdb_cs_db_parse_update_rpc(&cs->data, msg)) {
        return;
    }
    if (cs->server.monitor_version
        && ovsdb_cs_db_parse_update_rpc(&cs->server, msg)) {
        ovsdb_cs_check_server_db(cs);
        return;
    }

    if (ovsdb_cs_handle_monitor_canceled(cs, &cs->data, msg)
        || (cs->server.monitor_version
            && ovsdb_cs_handle_monitor_canceled(cs, &cs->server, msg))) {
        return;
    }

    /* Process "lock" replies and related notifications. */
    if (ovsdb_cs_db_process_lock_replies(&cs->data, msg)) {
        return;
    }

    /* Process response to a database transaction we submitted. */
    if (is_response && ovsdb_cs_db_txn_process_reply(cs, msg)) {
        return;
    }

    /* Unknown message.  Log at a low level because this can happen if
     * ovsdb_cs_txn_destroy() is called to destroy a transaction
     * before we receive the reply.
     *
     * (We could sort those out from other kinds of unknown messages by
     * using distinctive IDs for transactions, if it seems valuable to
     * do so, and then it would be possible to use different log
     * levels. XXX?) */
    char *s = jsonrpc_msg_to_string(msg);
    VLOG_DBG("%s: received unexpected %s message: %s",
             jsonrpc_session_get_name(cs->session),
             jsonrpc_msg_type_to_string(msg->type), s);
    free(s);
}
592
593static struct ovsdb_cs_event *
594ovsdb_cs_db_add_event(struct ovsdb_cs_db *db, enum ovsdb_cs_event_type type)
595{
596 struct ovsdb_cs_event *event = xmalloc(sizeof *event);
597 event->type = type;
598 ovs_list_push_back(&db->events, &event->list_node);
599 return event;
600}
601
602/* Processes a batch of messages from the database server on 'cs'. This may
603 * cause the CS's contents to change.
604 *
605 * Initializes 'events' with a list of events that occurred on 'cs'. The
606 * caller must process and destroy all of the events. */
607void
608ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list *events)
609{
610 ovs_list_init(events);
611 if (!cs->session) {
612 return;
613 }
614
615 ovsdb_cs_send_cond_change(cs);
616
617 jsonrpc_session_run(cs->session);
618
619 unsigned int seqno = jsonrpc_session_get_seqno(cs->session);
620 if (cs->state_seqno != seqno) {
621 cs->state_seqno = seqno;
622 ovsdb_cs_restart_fsm(cs);
623
624 for (size_t i = 0; i < cs->n_txns; i++) {
625 json_destroy(cs->txns[i]);
626 }
627 cs->n_txns = 0;
628
629 ovsdb_cs_db_add_event(&cs->data, OVSDB_CS_EVENT_TYPE_RECONNECT);
630
631 if (cs->data.lock_name) {
632 jsonrpc_session_send(
633 cs->session,
634 ovsdb_cs_db_compose_lock_request(&cs->data));
635 }
636 }
637
638 for (int i = 0; i < 50; i++) {
639 struct jsonrpc_msg *msg = jsonrpc_session_recv(cs->session);
640 if (!msg) {
641 break;
642 }
643 ovsdb_cs_process_msg(cs, msg);
644 jsonrpc_msg_destroy(msg);
645 }
646 ovs_list_push_back_all(events, &cs->data.events);
647}
648
649/* Arranges for poll_block() to wake up when ovsdb_cs_run() has something to
650 * do or when activity occurs on a transaction on 'cs'. */
651void
652ovsdb_cs_wait(struct ovsdb_cs *cs)
653{
654 if (!cs->session) {
655 return;
656 }
657 jsonrpc_session_wait(cs->session);
658 jsonrpc_session_recv_wait(cs->session);
659}
660\f
661/* Network connection. */
662
663/* Changes the remote and creates a new session.
664 *
665 * If 'retry' is true, the connection to the remote will automatically retry
666 * when it fails. If 'retry' is false, the connection is one-time. */
667void
668ovsdb_cs_set_remote(struct ovsdb_cs *cs, const char *remote, bool retry)
669{
670 if (cs
671 && ((remote != NULL) != (cs->remote != NULL)
672 || (remote && cs->remote && strcmp(remote, cs->remote)))) {
673 /* Close the old session, if any. */
674 if (cs->session) {
675 jsonrpc_session_close(cs->session);
676 cs->session = NULL;
677
678 free(cs->remote);
679 cs->remote = NULL;
680 }
681
682 /* Open new session, if any. */
683 if (remote) {
684 struct svec remotes = SVEC_EMPTY_INITIALIZER;
685 ovsdb_session_parse_remote(remote, &remotes, &cs->cid);
686 if (cs->shuffle_remotes) {
687 svec_shuffle(&remotes);
688 }
689 cs->session = jsonrpc_session_open_multiple(&remotes, retry);
690 svec_destroy(&remotes);
691
692 cs->state_seqno = UINT_MAX;
693
694 cs->remote = xstrdup(remote);
695 }
696 }
697}
698
699/* Reconfigures 'cs' so that it would reconnect to the database, if
700 * connection was dropped. */
701void
702ovsdb_cs_enable_reconnect(struct ovsdb_cs *cs)
703{
704 if (cs->session) {
705 jsonrpc_session_enable_reconnect(cs->session);
706 }
707}
708
709/* Forces 'cs' to drop its connection to the database and reconnect. In the
710 * meantime, the contents of 'cs' will not change. */
711void
712ovsdb_cs_force_reconnect(struct ovsdb_cs *cs)
713{
714 if (cs->session) {
715 jsonrpc_session_force_reconnect(cs->session);
716 }
717}
718
719/* Drops 'cs''s current connection and the cached session. This is useful if
720 * the client notices some kind of inconsistency. */
721void
722ovsdb_cs_flag_inconsistency(struct ovsdb_cs *cs)
723{
724 cs->data.last_id = UUID_ZERO;
725 ovsdb_cs_retry(cs);
726}
727
728/* Returns true if 'cs' is currently connected or will eventually try to
729 * reconnect. */
730bool
731ovsdb_cs_is_alive(const struct ovsdb_cs *cs)
732{
733 return (cs->session
734 && jsonrpc_session_is_alive(cs->session)
735 && cs->state != CS_S_ERROR);
736}
737
738/* Returns true if 'cs' is currently connected to a server. */
739bool
740ovsdb_cs_is_connected(const struct ovsdb_cs *cs)
741{
742 return cs->session && jsonrpc_session_is_connected(cs->session);
743}
744
745/* Returns the last error reported on a connection by 'cs'. The return value
746 * is 0 only if no connection made by 'cs' has ever encountered an error and
747 * a negative response to a schema request has never been received. See
748 * jsonrpc_get_status() for jsonrpc_session_get_last_error() return value
749 * interpretation. */
750int
751ovsdb_cs_get_last_error(const struct ovsdb_cs *cs)
752{
753 int err = cs->session ? jsonrpc_session_get_last_error(cs->session) : 0;
754 if (err) {
755 return err;
756 } else if (cs->state == CS_S_ERROR) {
757 return ENOENT;
758 } else {
759 return 0;
760 }
761}
762
763/* Sets the "probe interval" for 'cs''s current session to 'probe_interval', in
764 * milliseconds. */
765void
766ovsdb_cs_set_probe_interval(const struct ovsdb_cs *cs, int probe_interval)
767{
768 if (cs->session) {
769 jsonrpc_session_set_probe_interval(cs->session, probe_interval);
770 }
771}
772\f
773/* Conditional monitoring. */
774
/* A table being monitored.
 *
 * At the CS layer, the only thing we care about, table-wise, is the conditions
 * we're using for monitoring them, so there's little here.  We only create
 * these table structures at all for tables that have conditions.
 *
 * A condition moves through the slots new_cond -> req_cond (when a
 * "monitor_cond_change" carrying it is composed) -> ack_cond (when the server
 * replies). */
struct ovsdb_cs_db_table {
    struct hmap_node hmap_node; /* Indexed by 'name'. */
    char *name;                 /* Table name (owned copy). */

    /* Each of these is a null pointer if it is empty, or JSON [<condition>*]
     * or [true] or [false] otherwise.  [true] could be represented as a null
     * pointer, but we want to distinguish "empty slot" from "a condition that
     * is always true" in a slot. */
    struct json *ack_cond; /* Last condition acked by the server. */
    struct json *req_cond; /* Last condition requested to the server. */
    struct json *new_cond; /* Latest condition set by the IDL client. */
};
792
/* A kind of condition, so that we can treat equivalent JSON as equivalent.
 * See condition_classify(). */
enum condition_type {
    COND_FALSE,                 /* [] or [false] */
    COND_TRUE,                  /* Null pointer or [true] */
    COND_OTHER                  /* Anything else. */
};
799
800/* Returns the condition_type for 'condition'. */
801static enum condition_type
802condition_classify(const struct json *condition)
803{
804 if (condition) {
805 const struct json_array *a = json_array(condition);
806 switch (a->n) {
807 case 0:
808 return COND_FALSE;
809
810 case 1:
811 return (a->elems[0]->type == JSON_FALSE ? COND_FALSE
812 : a->elems[0]->type == JSON_TRUE ? COND_TRUE
813 : COND_OTHER);
814
815 default:
816 return COND_OTHER;
817 }
818 } else {
819 return COND_TRUE;
820 }
821}
822
823/* Returns true if 'a' and 'b' are the same condition (in an obvious way; we're
824 * not going to compare for boolean equivalence or anything). */
825static bool
826condition_equal(const struct json *a, const struct json *b)
827{
828 enum condition_type type = condition_classify(a);
829 return (type == condition_classify(b)
830 && (type != COND_OTHER || json_equal(a, b)));
831}
832
833/* Returns a clone of 'condition', translating always-true and always-false to
834 * [true] and [false], respectively. */
835static struct json *
836condition_clone(const struct json *condition)
837{
838 switch (condition_classify(condition)) {
839 case COND_TRUE:
840 return json_array_create_1(json_boolean_create(true));
841
842 case COND_FALSE:
843 return json_array_create_1(json_boolean_create(false));
844
845 case COND_OTHER:
846 return json_clone(condition);
847 }
848
849 OVS_NOT_REACHED();
850}
851
852/* Returns the ovsdb_cs_db_table associated with 'table' in 'db', creating an
853 * empty one if necessary. */
854static struct ovsdb_cs_db_table *
855ovsdb_cs_db_get_table(struct ovsdb_cs_db *db, const char *table)
856{
857 uint32_t hash = hash_string(table, 0);
858 struct ovsdb_cs_db_table *t;
859
860 HMAP_FOR_EACH_WITH_HASH (t, hmap_node, hash, &db->tables) {
861 if (!strcmp(t->name, table)) {
862 return t;
863 }
864 }
865
866 t = xzalloc(sizeof *t);
867 t->name = xstrdup(table);
868 t->new_cond = json_array_create_1(json_boolean_create(true));
869 hmap_insert(&db->tables, &t->hmap_node, hash);
870 return t;
871}
872
873static void
874ovsdb_cs_db_destroy_tables(struct ovsdb_cs_db *db)
875{
876 struct ovsdb_cs_db_table *table, *next;
877 HMAP_FOR_EACH_SAFE (table, next, hmap_node, &db->tables) {
878 json_destroy(table->ack_cond);
879 json_destroy(table->req_cond);
880 json_destroy(table->new_cond);
881 hmap_remove(&db->tables, &table->hmap_node);
882 free(table->name);
883 free(table);
884 }
885 hmap_destroy(&db->tables);
886}
887
888static unsigned int
889ovsdb_cs_db_set_condition(struct ovsdb_cs_db *db, const char *table,
890 const struct json *condition)
891{
892 /* Compare the new condition to the last known condition which can be
893 * either "new" (not sent yet), "requested" or "acked", in this order. */
894 struct ovsdb_cs_db_table *t = ovsdb_cs_db_get_table(db, table);
895 const struct json *table_cond = (t->new_cond ? t->new_cond
896 : t->req_cond ? t->req_cond
897 : t->ack_cond);
898 if (!condition_equal(condition, table_cond)) {
899 json_destroy(t->new_cond);
900 t->new_cond = condition_clone(condition);
901 db->cond_changed = true;
902 poll_immediate_wake();
903 }
904
905 /* Conditions will be up to date when we receive replies for already
906 * requested and new conditions, if any. */
907 return db->cond_seqno + (t->new_cond ? 1 : 0) + (t->req_cond ? 1 : 0);
908}
909
910/* Sets the replication condition for 'tc' in 'cs' to 'condition' and arranges
911 * to send the new condition to the database server.
912 *
913 * Return the next conditional update sequence number. When this value and
914 * ovsdb_cs_get_condition_seqno() matches, 'cs' contains rows that match the
915 * 'condition'. */
916unsigned int
917ovsdb_cs_set_condition(struct ovsdb_cs *cs, const char *table,
918 const struct json *condition)
919{
920 return ovsdb_cs_db_set_condition(&cs->data, table, condition);
921}
922
923/* Returns a "sequence number" that represents the number of conditional
924 * monitoring updates successfully received by the OVSDB server of a CS
925 * connection.
926 *
927 * ovsdb_cs_set_condition() sets a new condition that is different from the
928 * current condtion, the next expected "sequence number" is returned.
929 *
930 * Whenever ovsdb_cs_get_condition_seqno() returns a value that matches the
931 * return value of ovsdb_cs_set_condition(), the client is assured that:
932 *
933 * - The ovsdb_cs_set_condition() changes has been acknowledged by the OVSDB
934 * server.
935 *
936 * - 'cs' now contains the content matches the new conditions. */
937unsigned int
938ovsdb_cs_get_condition_seqno(const struct ovsdb_cs *cs)
939{
940 return cs->data.cond_seqno;
941}
942
/* Returns a new JSON object {"where": <copy of 'cond'>}, the per-table
 * request element used in a "monitor_cond_change" request. */
static struct json *
ovsdb_cs_create_cond_change_req(const struct json *cond)
{
    struct json *req = json_object_create();
    json_object_put(req, "where", json_clone(cond));
    return req;
}
950
951static struct jsonrpc_msg *
952ovsdb_cs_db_compose_cond_change(struct ovsdb_cs_db *db)
953{
954 if (!db->cond_changed) {
955 return NULL;
956 }
957
958 struct json *monitor_cond_change_requests = NULL;
959 struct ovsdb_cs_db_table *table;
960 HMAP_FOR_EACH (table, hmap_node, &db->tables) {
961 /* Always use the most recent conditions set by the CS client when
962 * requesting monitor_cond_change, i.e., table->new_cond.
963 */
964 if (table->new_cond) {
965 struct json *req =
966 ovsdb_cs_create_cond_change_req(table->new_cond);
967 if (req) {
968 if (!monitor_cond_change_requests) {
969 monitor_cond_change_requests = json_object_create();
970 }
971 json_object_put(monitor_cond_change_requests,
972 table->name,
973 json_array_create_1(req));
974 }
975 /* Mark the new condition as requested by moving it to req_cond.
976 * If there's already requested condition that's a bug.
977 */
978 ovs_assert(table->req_cond == NULL);
979 table->req_cond = table->new_cond;
980 table->new_cond = NULL;
981 }
982 }
983
984 if (!monitor_cond_change_requests) {
985 return NULL;
986 }
987
988 db->cond_changed = false;
989 struct json *params = json_array_create_3(json_clone(db->monitor_id),
990 json_clone(db->monitor_id),
991 monitor_cond_change_requests);
992 return jsonrpc_create_request("monitor_cond_change", params, NULL);
993}
994
995/* Marks all requested table conditions in 'db' as acked by the server.
996 * It should be called when the server replies to monitor_cond_change
997 * requests.
998 */
999static void
1000ovsdb_cs_db_ack_condition(struct ovsdb_cs_db *db)
1001{
1002 struct ovsdb_cs_db_table *table;
1003 HMAP_FOR_EACH (table, hmap_node, &db->tables) {
1004 if (table->req_cond) {
1005 json_destroy(table->ack_cond);
1006 table->ack_cond = table->req_cond;
1007 table->req_cond = NULL;
1008 }
1009 }
1010}
1011
/* Should be called when the CS fsm is restarted.  Resyncs the table
 * conditions in 'db' based on the state the DB is in:
 *
 *   - If a non-zero last_id is available for the DB, then upon reconnect the
 *     CS should first request the acked conditions (table->ack_cond), to
 *     avoid missing updates about records that were added before the
 *     transaction with txn-id == last_id.  If there was a condition change
 *     in flight (i.e., req_cond not NULL) and the CS client didn't set a new
 *     condition (i.e., new_cond is NULL), req_cond is moved to new_cond to
 *     trigger a follow-up monitor_cond_change request.  If the client did
 *     set a new condition, the in-flight one is discarded in its favor.
 *
 *   - If there's no last_id available for the DB, then it's safe to use the
 *     latest conditions set by the CS client even if they weren't acked yet:
 *     each table's newest condition is promoted directly to ack_cond and no
 *     monitor_cond_change is left pending.
 */
static void
ovsdb_cs_db_sync_condition(struct ovsdb_cs_db *db)
{
    /* A zero last_id means there is no transaction to resume from. */
    bool ack_all = uuid_is_zero(&db->last_id);
    if (ack_all) {
        db->cond_changed = false;
    }

    struct ovsdb_cs_db_table *table;
    HMAP_FOR_EACH (table, hmap_node, &db->tables) {
        /* When monitor_cond_since requests will be issued, the
         * table->ack_cond condition will be added to the "where" clause.
         * Follow up monitor_cond_change requests will use table->new_cond.
         */
        if (ack_all) {
            /* The newest condition wins: new_cond (if any) supersedes
             * req_cond, and whichever remains becomes ack_cond. */
            if (table->new_cond) {
                json_destroy(table->req_cond);
                table->req_cond = table->new_cond;
                table->new_cond = NULL;
            }

            if (table->req_cond) {
                json_destroy(table->ack_cond);
                table->ack_cond = table->req_cond;
                table->req_cond = NULL;
            }
        } else {
            if (table->req_cond) {
                /* There was an in-flight monitor_cond_change request. It's no
                 * longer relevant in the restarted FSM, so clear it. */
                if (table->new_cond) {
                    /* We will send a new monitor_cond_change with the new
                     * condition. The previously in-flight condition is
                     * irrelevant and we can just forget about it. */
                    json_destroy(table->req_cond);
                } else {
                    /* The restarted FSM needs to again send a request for the
                     * previously in-flight condition. */
                    table->new_cond = table->req_cond;
                }
                table->req_cond = NULL;
                /* Makes ovsdb_cs_db_compose_cond_change() pick up new_cond. */
                db->cond_changed = true;
            }
        }
    }
}
1070
1071static void
1072ovsdb_cs_send_cond_change(struct ovsdb_cs *cs)
1073{
1074 /* When 'cs->request_id' is not NULL, there is an outstanding
1075 * conditional monitoring update request that we have not heard
1076 * from the server yet. Don't generate another request in this case. */
1077 if (!jsonrpc_session_is_connected(cs->session)
1078 || cs->data.monitor_version == 1
1079 || cs->request_id) {
1080 return;
1081 }
1082
1083 struct jsonrpc_msg *msg = ovsdb_cs_db_compose_cond_change(&cs->data);
1084 if (msg) {
1085 cs->request_id = json_clone(msg->id);
1086 jsonrpc_session_send(cs->session, msg);
1087 }
1088}
1089\f
1090/* Clustered servers. */
1091
1092/* By default, or if 'leader_only' is true, when 'cs' connects to a clustered
1093 * database, the CS layer will avoid servers other than the cluster
1094 * leader. This ensures that any data that it reads and reports is up-to-date.
1095 * If 'leader_only' is false, the CS layer will accept any server in the
1096 * cluster, which means that for read-only transactions it can report and act
1097 * on stale data (transactions that modify the database are always serialized
1098 * even with false 'leader_only'). Refer to Understanding Cluster Consistency
1099 * in ovsdb(7) for more information. */
1100void
1101ovsdb_cs_set_leader_only(struct ovsdb_cs *cs, bool leader_only)
1102{
1103 cs->leader_only = leader_only;
1104 if (leader_only && cs->server.monitor_version) {
1105 ovsdb_cs_check_server_db(cs);
1106 }
1107}
1108
1109/* Set whether the order of remotes should be shuffled, when there is more than
1110 * one remote. The setting doesn't take effect until the next time when
1111 * ovsdb_cs_set_remote() is called. */
1112void
1113ovsdb_cs_set_shuffle_remotes(struct ovsdb_cs *cs, bool shuffle)
1114{
1115 cs->shuffle_remotes = shuffle;
1116}
1117
1118/* Reset min_index to 0. This prevents a situation where the client
1119 * thinks all databases have stale data, when they actually have all
1120 * been destroyed and rebuilt from scratch.
1121 */
1122void
1123ovsdb_cs_reset_min_index(struct ovsdb_cs *cs)
1124{
1125 cs->min_index = 0;
1126}
1127\f
1128/* Database locks. */
1129
1130static struct jsonrpc_msg *
1131ovsdb_cs_db_set_lock(struct ovsdb_cs_db *db, const char *lock_name)
1132{
1133 if (db->lock_name
1134 && (!lock_name || strcmp(lock_name, db->lock_name))) {
1135 /* Release previous lock. */
1136 struct jsonrpc_msg *msg = ovsdb_cs_db_compose_unlock_request(db);
1137 free(db->lock_name);
1138 db->lock_name = NULL;
1139 db->is_lock_contended = false;
1140 return msg;
1141 }
1142
1143 if (lock_name && !db->lock_name) {
1144 /* Acquire new lock. */
1145 db->lock_name = xstrdup(lock_name);
1146 return ovsdb_cs_db_compose_lock_request(db);
1147 }
1148
1149 return NULL;
1150}
1151
1152/* If 'lock_name' is nonnull, configures 'cs' to obtain the named lock from the
1153 * database server and to prevent modifying the database when the lock cannot
1154 * be acquired (that is, when another client has the same lock).
1155 *
1156 * If 'lock_name' is NULL, drops the locking requirement and releases the
1157 * lock. */
1158void
1159ovsdb_cs_set_lock(struct ovsdb_cs *cs, const char *lock_name)
1160{
1161 for (;;) {
1162 struct jsonrpc_msg *msg = ovsdb_cs_db_set_lock(&cs->data, lock_name);
1163 if (!msg) {
1164 break;
1165 }
1166 if (cs->session) {
1167 jsonrpc_session_send(cs->session, msg);
1168 } else {
1169 jsonrpc_msg_destroy(msg);
1170 }
1171 }
1172}
1173
1174/* Returns the name of the lock that 'cs' is trying to obtain, or NULL if none
1175 * is configured. */
1176const char *
1177ovsdb_cs_get_lock(const struct ovsdb_cs *cs)
1178{
1179 return cs->data.lock_name;
1180}
1181
1182/* Returns true if 'cs' is configured to obtain a lock and owns that lock,
1183 * false if it doesn't own the lock or isn't configured to obtain one.
1184 *
1185 * Locking and unlocking happens asynchronously from the database client's
1186 * point of view, so the information is only useful for optimization (e.g. if
1187 * the client doesn't have the lock then there's no point in trying to write to
1188 * the database). */
1189bool
1190ovsdb_cs_has_lock(const struct ovsdb_cs *cs)
1191{
1192 return cs->data.has_lock;
1193}
1194
1195/* Returns true if 'cs' is configured to obtain a lock but the database server
1196 * has indicated that some other client already owns the requested lock. */
1197bool
1198ovsdb_cs_is_lock_contended(const struct ovsdb_cs *cs)
1199{
1200 return cs->data.is_lock_contended;
1201}
1202
1203static void
1204ovsdb_cs_db_update_has_lock(struct ovsdb_cs_db *db, bool new_has_lock)
1205{
1206 if (new_has_lock && !db->has_lock) {
1207 ovsdb_cs_db_add_event(db, OVSDB_CS_EVENT_TYPE_LOCKED);
1208 db->is_lock_contended = false;
1209 }
1210 db->has_lock = new_has_lock;
1211}
1212
1213static bool
1214ovsdb_cs_db_process_lock_replies(struct ovsdb_cs_db *db,
1215 const struct jsonrpc_msg *msg)
1216{
1217 if (msg->type == JSONRPC_REPLY
1218 && db->lock_request_id
1219 && json_equal(db->lock_request_id, msg->id)) {
1220 /* Reply to our "lock" request. */
1221 ovsdb_cs_db_parse_lock_reply(db, msg->result);
1222 return true;
1223 }
1224
1225 if (msg->type == JSONRPC_NOTIFY) {
1226 if (!strcmp(msg->method, "locked")) {
1227 /* We got our lock. */
1228 return ovsdb_cs_db_parse_lock_notify(db, msg->params, true);
1229 } else if (!strcmp(msg->method, "stolen")) {
1230 /* Someone else stole our lock. */
1231 return ovsdb_cs_db_parse_lock_notify(db, msg->params, false);
1232 }
1233 }
1234
1235 return false;
1236}
1237
1238static struct jsonrpc_msg *
1239ovsdb_cs_db_compose_lock_request__(struct ovsdb_cs_db *db,
1240 const char *method)
1241{
1242 ovsdb_cs_db_update_has_lock(db, false);
1243
1244 json_destroy(db->lock_request_id);
1245 db->lock_request_id = NULL;
1246
1247 struct json *params = json_array_create_1(json_string_create(
1248 db->lock_name));
1249 return jsonrpc_create_request(method, params, NULL);
1250}
1251
1252static struct jsonrpc_msg *
1253ovsdb_cs_db_compose_lock_request(struct ovsdb_cs_db *db)
1254{
1255 struct jsonrpc_msg *msg = ovsdb_cs_db_compose_lock_request__(db, "lock");
1256 db->lock_request_id = json_clone(msg->id);
1257 return msg;
1258}
1259
/* Returns an "unlock" request for 'db->lock_name'.  No reply ID is tracked. */
static struct jsonrpc_msg *
ovsdb_cs_db_compose_unlock_request(struct ovsdb_cs_db *db)
{
    static const char method[] = "unlock";
    return ovsdb_cs_db_compose_lock_request__(db, method);
}
1265
1266static void
1267ovsdb_cs_db_parse_lock_reply(struct ovsdb_cs_db *db,
1268 const struct json *result)
1269{
1270 bool got_lock;
1271
1272 json_destroy(db->lock_request_id);
1273 db->lock_request_id = NULL;
1274
1275 if (result->type == JSON_OBJECT) {
1276 const struct json *locked;
1277
1278 locked = shash_find_data(json_object(result), "locked");
1279 got_lock = locked && locked->type == JSON_TRUE;
1280 } else {
1281 got_lock = false;
1282 }
1283
1284 ovsdb_cs_db_update_has_lock(db, got_lock);
1285 if (!got_lock) {
1286 db->is_lock_contended = true;
1287 }
1288}
1289
1290static bool
1291ovsdb_cs_db_parse_lock_notify(struct ovsdb_cs_db *db,
1292 const struct json *params,
1293 bool new_has_lock)
1294{
1295 if (db->lock_name
1296 && params->type == JSON_ARRAY
1297 && json_array(params)->n > 0
1298 && json_array(params)->elems[0]->type == JSON_STRING) {
1299 const char *lock_name = json_string(json_array(params)->elems[0]);
1300
1301 if (!strcmp(db->lock_name, lock_name)) {
1302 ovsdb_cs_db_update_has_lock(db, new_has_lock);
1303 if (!new_has_lock) {
1304 db->is_lock_contended = true;
1305 }
1306 return true;
1307 }
1308 }
1309 return false;
1310}
1311\f
1312/* Transactions. */
1313
1314static bool
1315ovsdb_cs_db_txn_process_reply(struct ovsdb_cs *cs,
1316 const struct jsonrpc_msg *reply)
1317{
1318 bool found = ovsdb_cs_forget_transaction(cs, reply->id);
1319 if (found) {
1320 struct ovsdb_cs_event *event
1321 = ovsdb_cs_db_add_event(&cs->data, OVSDB_CS_EVENT_TYPE_TXN_REPLY);
1322 event->txn_reply = jsonrpc_msg_clone(reply);
1323 }
1324 return found;
1325}
1326
1327/* Returns true if 'cs' can be sent a transaction now, false otherwise. This
1328 * is useful for optimization: there is no point in composing and sending a
1329 * transaction if it returns false. */
1330bool
1331ovsdb_cs_may_send_transaction(const struct ovsdb_cs *cs)
1332{
1333 return (cs->session != NULL
1334 && cs->state == CS_S_MONITORING
1335 && (!cs->data.lock_name || ovsdb_cs_has_lock(cs)));
1336}
1337
1338/* Attempts to send a transaction with the specified 'operations' to 'cs''s
1339 * server. On success, returns the request ID; the caller must eventually free
1340 * it. On failure, returns NULL. */
1341struct json * OVS_WARN_UNUSED_RESULT
1342ovsdb_cs_send_transaction(struct ovsdb_cs *cs, struct json *operations)
1343{
1344 if (!ovsdb_cs_may_send_transaction(cs)) {
1345 json_destroy(operations);
1346 return NULL;
1347 }
1348
1349 if (cs->data.lock_name) {
1350 struct json *assertion = json_object_create();
1351 json_object_put_string(assertion, "op", "assert");
1352 json_object_put_string(assertion, "lock", cs->data.lock_name);
1353 json_array_add(operations, assertion);
1354 }
1355
1356 struct json *request_id;
1357 struct jsonrpc_msg *request = jsonrpc_create_request(
1358 "transact", operations, &request_id);
1359 int error = jsonrpc_session_send(cs->session, request);
1360 if (error) {
1361 json_destroy(request_id);
1362 return NULL;
1363 }
1364
1365 if (cs->n_txns >= cs->allocated_txns) {
1366 cs->txns = x2nrealloc(cs->txns, &cs->allocated_txns,
1367 sizeof *cs->txns);
1368 }
1369 cs->txns[cs->n_txns++] = request_id;
1370 return request_id;
1371}
1372
1373/* Makes 'cs' drop its record of transaction 'request_id'. If a reply arrives
1374 * for it later (which it will, unless the connection drops in the meantime),
1375 * it won't be reported through an event.
1376 *
1377 * Returns true if 'request_id' was known, false otherwise. */
1378bool
1379ovsdb_cs_forget_transaction(struct ovsdb_cs *cs, const struct json *request_id)
1380{
1381 for (size_t i = 0; i < cs->n_txns; i++) {
1382 if (json_equal(request_id, cs->txns[i])) {
1383 cs->txns[i] = cs->txns[--cs->n_txns];
1384 return true;
1385 }
1386 }
1387 return false;
1388}
1389\f
1390static void
1391ovsdb_cs_send_schema_request(struct ovsdb_cs *cs,
1392 struct ovsdb_cs_db *db)
1393{
1394 ovsdb_cs_send_request(cs, jsonrpc_create_request(
1395 "get_schema",
1396 json_array_create_1(json_string_create(
1397 db->db_name)),
1398 NULL));
1399}
1400
1401static void
1402ovsdb_cs_send_db_change_aware(struct ovsdb_cs *cs)
1403{
1404 struct jsonrpc_msg *msg = jsonrpc_create_request(
1405 "set_db_change_aware", json_array_create_1(json_boolean_create(true)),
1406 NULL);
1407 jsonrpc_session_send(cs->session, msg);
1408}
1409
/* Sends a monitor request of the given 'version' for 'db' through 'cs':
 * "monitor" for version 1, "monitor_cond" for version 2, and
 * "monitor_cond_since" for version 3.
 *
 * The per-table monitor requests come from 'db->ops'.  For versions above 1,
 * each table's acknowledged condition (table->ack_cond), if any, is put into
 * the "where" member of that table's first monitor request.  If the table had
 * no monitor request, one with an empty "columns" array is created for it.
 * For version 3, 'db->last_id' is appended as the last parameter. */
static void
ovsdb_cs_send_monitor_request(struct ovsdb_cs *cs, struct ovsdb_cs_db *db,
                              int version)
{
    struct json *mrs = db->ops->compose_monitor_requests(
        db->schema, db->ops_aux);
    /* XXX handle failure */
    ovs_assert(mrs->type == JSON_OBJECT);

    if (version > 1) {
        struct ovsdb_cs_db_table *table;
        HMAP_FOR_EACH (table, hmap_node, &db->tables) {
            if (table->ack_cond) {
                /* Find, or create, this table's array of monitor requests. */
                struct json *mr = shash_find_data(json_object(mrs),
                                                  table->name);
                if (!mr) {
                    mr = json_array_create_empty();
                    json_object_put(mrs, table->name, mr);
                }
                ovs_assert(mr->type == JSON_ARRAY);

                /* The condition is attached to the first monitor request. */
                struct json *mr0;
                if (json_array(mr)->n == 0) {
                    mr0 = json_object_create();
                    json_object_put(mr0, "columns", json_array_create_empty());
                    json_array_add(mr, mr0);
                } else {
                    mr0 = json_array(mr)->elems[0];
                }
                ovs_assert(mr0->type == JSON_OBJECT);

                json_object_put(mr0, "where",
                                json_clone(table->ack_cond));
            }
        }
    }

    const char *method = (version == 1 ? "monitor"
                          : version == 2 ? "monitor_cond"
                          : "monitor_cond_since");
    struct json *params = json_array_create_3(
        json_string_create(db->db_name),
        json_clone(db->monitor_id),
        mrs);
    if (version == 3) {
        /* monitor_cond_since additionally carries the last known
         * transaction ID, formatted as a UUID string. */
        struct json *json_last_id = json_string_create_nocopy(
            xasprintf(UUID_FMT, UUID_ARGS(&db->last_id)));
        json_array_add(params, json_last_id);
    }
    ovsdb_cs_send_request(cs, jsonrpc_create_request(method, params, NULL));
}
1461
1462static void
1463log_parse_update_error(struct ovsdb_error *error)
1464{
1465 if (!VLOG_DROP_WARN(&syntax_rl)) {
1466 char *s = ovsdb_error_to_string(error);
1467 VLOG_WARN_RL(&syntax_rl, "%s", s);
1468 free(s);
1469 }
1470 ovsdb_error_destroy(error);
1471}
1472
1473static void
1474ovsdb_cs_db_add_update(struct ovsdb_cs_db *db,
1475 const struct json *table_updates, int version,
1476 bool clear, bool monitor_reply)
1477{
1478 struct ovsdb_cs_event *event = ovsdb_cs_db_add_event(
1479 db, OVSDB_CS_EVENT_TYPE_UPDATE);
1480 event->update = (struct ovsdb_cs_update_event) {
1481 .table_updates = json_clone(table_updates),
1482 .clear = clear,
1483 .monitor_reply = monitor_reply,
1484 .version = version,
1485 };
1486}
1487
1488static void
1489ovsdb_cs_db_parse_monitor_reply(struct ovsdb_cs_db *db,
1490 const struct json *result, int version)
1491{
1492 const struct json *table_updates;
1493 bool clear;
1494 if (version == 3) {
1495 struct uuid last_id;
1496 if (result->type != JSON_ARRAY || result->array.n != 3
1497 || (result->array.elems[0]->type != JSON_TRUE &&
1498 result->array.elems[0]->type != JSON_FALSE)
1499 || result->array.elems[1]->type != JSON_STRING
1500 || !uuid_from_string(&last_id,
1501 json_string(result->array.elems[1]))) {
1502 struct ovsdb_error *error = ovsdb_syntax_error(
1503 result, NULL, "bad monitor_cond_since reply format");
1504 log_parse_update_error(error);
1505 return;
1506 }
1507
1508 bool found = json_boolean(result->array.elems[0]);
1509 clear = !found;
1510 table_updates = result->array.elems[2];
1511 } else {
1512 clear = true;
1513 table_updates = result;
1514 }
1515
1516 ovsdb_cs_db_add_update(db, table_updates, version, clear, true);
1517}
1518
1519static bool
1520ovsdb_cs_db_parse_update_rpc(struct ovsdb_cs_db *db,
1521 const struct jsonrpc_msg *msg)
1522{
1523 if (msg->type != JSONRPC_NOTIFY) {
1524 return false;
1525 }
1526
1527 int version = (!strcmp(msg->method, "update") ? 1
1528 : !strcmp(msg->method, "update2") ? 2
1529 : !strcmp(msg->method, "update3") ? 3
1530 : 0);
1531 if (!version) {
1532 return false;
1533 }
1534
1535 struct json *params = msg->params;
1536 int n = version == 3 ? 3 : 2;
1537 if (params->type != JSON_ARRAY || params->array.n != n) {
1538 struct ovsdb_error *error = ovsdb_syntax_error(
1539 params, NULL, "%s must be an array with %u elements.",
1540 msg->method, n);
1541 log_parse_update_error(error);
1542 return false;
1543 }
1544
1545 if (!json_equal(params->array.elems[0], db->monitor_id)) {
1546 return false;
1547 }
1548
1549 if (version == 3) {
1550 const char *last_id = json_string(params->array.elems[1]);
1551 if (!uuid_from_string(&db->last_id, last_id)) {
1552 struct ovsdb_error *error = ovsdb_syntax_error(
1553 params, NULL, "Last-id %s is not in UUID format.", last_id);
1554 log_parse_update_error(error);
1555 return false;
1556 }
1557 }
1558
1559 struct json *table_updates = params->array.elems[version == 3 ? 2 : 1];
1560 ovsdb_cs_db_add_update(db, table_updates, version, false, false);
1561 return true;
1562}
1563
1564static bool
1565ovsdb_cs_handle_monitor_canceled(struct ovsdb_cs *cs,
1566 struct ovsdb_cs_db *db,
1567 const struct jsonrpc_msg *msg)
1568{
1569 if (msg->type != JSONRPC_NOTIFY
1570 || strcmp(msg->method, "monitor_canceled")
1571 || msg->params->type != JSON_ARRAY
1572 || msg->params->array.n != 1
1573 || !json_equal(msg->params->array.elems[0], db->monitor_id)) {
1574 return false;
1575 }
1576
1577 db->monitor_version = 0;
1578
1579 /* Cancel the other monitor and restart the FSM from the top.
1580 *
1581 * Maybe a more sophisticated response would be better in some cases, but
1582 * it doesn't seem worth optimizing yet. (Although this is already more
1583 * sophisticated than just dropping the connection and reconnecting.) */
1584 struct ovsdb_cs_db *other_db
1585 = db == &cs->data ? &cs->server : &cs->data;
1586 if (other_db->monitor_version) {
1587 jsonrpc_session_send(
1588 cs->session,
1589 jsonrpc_create_request(
1590 "monitor_cancel",
1591 json_array_create_1(json_clone(other_db->monitor_id)), NULL));
1592 other_db->monitor_version = 0;
1593 }
1594 ovsdb_cs_restart_fsm(cs);
1595
1596 return true;
1597}
1598\f
1599/* The _Server database.
1600 *
1601 * We replicate the Database table in the _Server database because this is the
1602 * only way to find out properties we need to know for clustering, such as
1603 * whether a database is clustered at all and whether this server is the
1604 * leader.
1605 *
1606 * This code implements a kind of simple IDL-like layer. */
1607
/* One column of the _Server database's Database table that is replicated:
 * its name and the OVSDB type used to parse its values. */
struct server_column {
    const char *name;
    struct ovsdb_type type;
};
/* Indexes into server_columns[] and into struct server_row's 'data'. */
enum server_column_index {
    COL_NAME,
    COL_MODEL,
    COL_CONNECTED,
    COL_LEADER,
    COL_SCHEMA,
    COL_CID,
    COL_INDEX,
};
/* Type initializer for an optional scalar column: a key of base type TYPE,
 * no value, and zero or one element.  An empty datum therefore means "not
 * set"; the server_column_get_*() helpers return a caller-supplied default
 * in that case. */
#define OPTIONAL_COLUMN(TYPE) \
    { \
        .key = OVSDB_BASE_##TYPE##_INIT, \
        .value = OVSDB_BASE_VOID_INIT, \
        .n_min = 0, \
        .n_max = 1 \
    }
/* The replicated columns, indexed by enum server_column_index. */
static const struct server_column server_columns[] = {
    [COL_NAME] = {"name", OPTIONAL_COLUMN(STRING) },
    [COL_MODEL] = {"model", OPTIONAL_COLUMN(STRING) },
    [COL_CONNECTED] = {"connected", OPTIONAL_COLUMN(BOOLEAN) },
    [COL_LEADER] = {"leader", OPTIONAL_COLUMN(BOOLEAN) },
    [COL_SCHEMA] = {"schema", OPTIONAL_COLUMN(STRING) },
    [COL_CID] = {"cid", OPTIONAL_COLUMN(UUID) },
    [COL_INDEX] = {"index", OPTIONAL_COLUMN(INTEGER) },
};
#define N_SERVER_COLUMNS ARRAY_SIZE(server_columns)
/* A replicated row of the Database table.  Rows live in 'cs->server_rows',
 * hashed by uuid_hash() of 'uuid'. */
struct server_row {
    struct hmap_node hmap_node;
    struct uuid uuid;
    struct ovsdb_datum data[N_SERVER_COLUMNS]; /* Indexed by column index. */
};
1643
1644static void
1645server_row_destroy(struct server_row *row)
1646{
1647 if (row) {
1648 for (size_t i = 0; i < N_SERVER_COLUMNS; i++) {
1649 ovsdb_datum_destroy(&row->data[i], &server_columns[i].type);
1650 }
1651 free(row);
1652 }
1653}
1654
1655static struct server_row *
1656ovsdb_cs_find_server_row(struct ovsdb_cs *cs, const struct uuid *uuid)
1657{
1658 struct server_row *row;
1659 HMAP_FOR_EACH (row, hmap_node, &cs->server_rows) {
1660 if (uuid_equals(uuid, &row->uuid)) {
1661 return row;
1662 }
1663 }
1664 return NULL;
1665}
1666
1667static void
1668ovsdb_cs_delete_server_row(struct ovsdb_cs *cs, struct server_row *row)
1669{
1670 hmap_remove(&cs->server_rows, &row->hmap_node);
1671 server_row_destroy(row);
1672}
1673
1674static struct server_row *
1675ovsdb_cs_insert_server_row(struct ovsdb_cs *cs, const struct uuid *uuid)
1676{
1677 struct server_row *row = xmalloc(sizeof *row);
1678 hmap_insert(&cs->server_rows, &row->hmap_node, uuid_hash(uuid));
1679 row->uuid = *uuid;
1680 for (size_t i = 0; i < N_SERVER_COLUMNS; i++) {
1681 ovsdb_datum_init_default(&row->data[i], &server_columns[i].type);
1682 }
1683 return row;
1684}
1685
1686static void
1687ovsdb_cs_update_server_row(struct server_row *row,
1688 const struct shash *update, bool xor)
1689{
1690 for (size_t i = 0; i < N_SERVER_COLUMNS; i++) {
1691 const struct server_column *column = &server_columns[i];
1692 struct shash_node *node = shash_find(update, column->name);
1693 if (!node) {
1694 continue;
1695 }
1696 const struct json *json = node->data;
1697
1698 struct ovsdb_datum *old = &row->data[i];
1699 struct ovsdb_datum new;
1700 if (!xor) {
1701 struct ovsdb_error *error = ovsdb_datum_from_json(
1702 &new, &column->type, json, NULL);
1703 if (error) {
1704 ovsdb_error_destroy(error);
1705 continue;
1706 }
1707 } else {
1708 struct ovsdb_datum diff;
1709 struct ovsdb_error *error = ovsdb_transient_datum_from_json(
1710 &diff, &column->type, json);
1711 if (error) {
1712 ovsdb_error_destroy(error);
1713 continue;
1714 }
1715
1716 error = ovsdb_datum_apply_diff(&new, old, &diff, &column->type);
1717 if (error) {
1718 ovsdb_error_destroy(error);
1719 ovsdb_datum_destroy(&new, &column->type);
1720 continue;
1721 }
1722 ovsdb_datum_destroy(&diff, &column->type);
1723 }
1724
1725 ovsdb_datum_destroy(&row->data[i], &column->type);
1726 row->data[i] = new;
1727 }
1728}
1729
1730static void
1731ovsdb_cs_clear_server_rows(struct ovsdb_cs *cs)
1732{
1733 struct server_row *row, *next;
1734 HMAP_FOR_EACH_SAFE (row, next, hmap_node, &cs->server_rows) {
1735 ovsdb_cs_delete_server_row(cs, row);
1736 }
1737}
1738
1739static void log_parse_update_error(struct ovsdb_error *);
1740
1741static void
1742ovsdb_cs_process_server_event(struct ovsdb_cs *cs,
1743 const struct ovsdb_cs_event *event)
1744{
1745 ovs_assert(event->type == OVSDB_CS_EVENT_TYPE_UPDATE);
1746
1747 const struct ovsdb_cs_update_event *update = &event->update;
1748 struct ovsdb_cs_db_update *du;
1749 struct ovsdb_error *error = ovsdb_cs_parse_db_update(
1750 update->table_updates, update->version, &du);
1751 if (error) {
1752 log_parse_update_error(error);
1753 return;
1754 }
1755
1756 if (update->clear) {
1757 ovsdb_cs_clear_server_rows(cs);
1758 }
1759
1760 const struct ovsdb_cs_table_update *tu = ovsdb_cs_db_update_find_table(
1761 du, "Database");
1762 if (tu) {
1763 for (size_t i = 0; i < tu->n; i++) {
1764 const struct ovsdb_cs_row_update *ru = &tu->row_updates[i];
1765 struct server_row *row
1766 = ovsdb_cs_find_server_row(cs, &ru->row_uuid);
1767 if (ru->type == OVSDB_CS_ROW_DELETE) {
1768 ovsdb_cs_delete_server_row(cs, row);
1769 } else {
1770 if (!row) {
1771 row = ovsdb_cs_insert_server_row(cs, &ru->row_uuid);
1772 }
1773 ovsdb_cs_update_server_row(row, ru->columns,
1774 ru->type == OVSDB_CS_ROW_XOR);
1775 }
1776 }
1777 }
1778
1779 ovsdb_cs_db_update_destroy(du);
1780}
1781
1782static const char *
1783server_column_get_string(const struct server_row *row,
1784 enum server_column_index index,
1785 const char *default_value)
1786{
1787 ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_STRING);
1788 const struct ovsdb_datum *d = &row->data[index];
1789 return d->n == 1 ? d->keys[0].string : default_value;
1790}
1791
1792static bool
1793server_column_get_bool(const struct server_row *row,
1794 enum server_column_index index,
1795 bool default_value)
1796{
1797 ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_BOOLEAN);
1798 const struct ovsdb_datum *d = &row->data[index];
1799 return d->n == 1 ? d->keys[0].boolean : default_value;
1800}
1801
1802static uint64_t
1803server_column_get_int(const struct server_row *row,
1804 enum server_column_index index,
1805 uint64_t default_value)
1806{
1807 ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_INTEGER);
1808 const struct ovsdb_datum *d = &row->data[index];
1809 return d->n == 1 ? d->keys[0].integer : default_value;
1810}
1811
1812static const struct uuid *
1813server_column_get_uuid(const struct server_row *row,
1814 enum server_column_index index,
1815 const struct uuid *default_value)
1816{
1817 ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_UUID);
1818 const struct ovsdb_datum *d = &row->data[index];
1819 return d->n == 1 ? &d->keys[0].uuid : default_value;
1820}
1821
1822static const struct server_row *
1823ovsdb_find_server_row(struct ovsdb_cs *cs)
1824{
1825 const struct server_row *row;
1826 HMAP_FOR_EACH (row, hmap_node, &cs->server_rows) {
1827 const struct uuid *cid = server_column_get_uuid(row, COL_CID, NULL);
1828 const char *name = server_column_get_string(row, COL_NAME, NULL);
1829 if (uuid_is_zero(&cs->cid)
1830 ? (name && !strcmp(cs->data.db_name, name))
1831 : (cid && uuid_equals(cid, &cs->cid))) {
1832 return row;
1833 }
1834 }
1835 return NULL;
1836}
1837
1838static void OVS_UNUSED
1839ovsdb_log_server_rows(const struct ovsdb_cs *cs)
1840{
1841 int row_num = 0;
1842 const struct server_row *row;
1843 HMAP_FOR_EACH (row, hmap_node, &cs->server_rows) {
1844 struct ds s = DS_EMPTY_INITIALIZER;
1845 for (size_t i = 0; i < N_SERVER_COLUMNS; i++) {
1846 ds_put_format(&s, " %s=", server_columns[i].name);
1847 if (i == COL_SCHEMA) {
1848 ds_put_format(&s, "...");
1849 } else {
1850 ovsdb_datum_to_string(&row->data[i], &server_columns[i].type,
1851 &s);
1852 }
1853 }
1854 VLOG_INFO("row %d:%s", row_num++, ds_cstr(&s));
1855 ds_destroy(&s);
1856 }
1857}
1858
/* Applies all pending _Server database events, then decides whether the
 * server that 'cs' is connected to is acceptable for the data database.
 *
 * The server is rejected, with a log message, in these cases:
 *   - it has no Database row for the data database;
 *   - the row has no schema;
 *   - for a "clustered" database, the server is not connected to the
 *     cluster, or is not the leader while 'cs->leader_only' is set, or its
 *     index is below 'cs->min_index'.
 * For an accepted clustered server, 'cs->min_index' is updated to the
 * server's index.
 *
 * If the server is accepted while the FSM is in
 * CS_S_SERVER_MONITOR_REQUESTED, stores the schema in 'cs->data.schema' and
 * sends the data database monitor request, using the highest version that
 * 'cs->data.max_version' allows.
 *
 * Returns true if the server is acceptable, false otherwise. */
static bool
ovsdb_cs_check_server_db__(struct ovsdb_cs *cs)
{
    struct ovsdb_cs_event *event;
    LIST_FOR_EACH_POP (event, list_node, &cs->server.events) {
        ovsdb_cs_process_server_event(cs, event);
        ovsdb_cs_event_destroy(event);
    }

    const struct server_row *db_row = ovsdb_find_server_row(cs);
    /* Only the "no such database" message below is rate-limited. */
    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
    const char *server_name = jsonrpc_session_get_name(cs->session);
    if (!db_row) {
        VLOG_INFO_RL(&rl, "%s: server does not have %s database",
                     server_name, cs->data.db_name);
        return false;
    }

    bool ok = false;
    const char *model = server_column_get_string(db_row, COL_MODEL, "");
    const char *schema = server_column_get_string(db_row, COL_SCHEMA, NULL);
    if (!strcmp(model, "clustered")) {
        bool connected = server_column_get_bool(db_row, COL_CONNECTED, false);
        bool leader = server_column_get_bool(db_row, COL_LEADER, false);
        uint64_t index = server_column_get_int(db_row, COL_INDEX, 0);

        if (!schema) {
            VLOG_INFO("%s: clustered database server has not yet joined "
                      "cluster; trying another server", server_name);
        } else if (!connected) {
            VLOG_INFO("%s: clustered database server is disconnected "
                      "from cluster; trying another server", server_name);
        } else if (cs->leader_only && !leader) {
            VLOG_INFO("%s: clustered database server is not cluster "
                      "leader; trying another server", server_name);
        } else if (index < cs->min_index) {
            VLOG_WARN("%s: clustered database server has stale data; "
                      "trying another server", server_name);
        } else {
            /* Never accept a server with older data than this one later. */
            cs->min_index = index;
            ok = true;
        }
    } else {
        if (!schema) {
            VLOG_INFO("%s: missing database schema", server_name);
        } else {
            ok = true;
        }
    }
    if (!ok) {
        return false;
    }

    /* If the FSM is still waiting on the _Server monitor, move on to
     * monitoring the data database with the best supported protocol. */
    if (cs->state == CS_S_SERVER_MONITOR_REQUESTED) {
        json_destroy(cs->data.schema);
        cs->data.schema = json_from_string(schema);
        if (cs->data.max_version >= 3) {
            ovsdb_cs_send_monitor_request(cs, &cs->data, 3);
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_SINCE_REQUESTED);
        } else if (cs->data.max_version >= 2) {
            ovsdb_cs_send_monitor_request(cs, &cs->data, 2);
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED);
        } else {
            ovsdb_cs_send_monitor_request(cs, &cs->data, 1);
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED);
        }
    }
    return true;
}
1928
/* Runs ovsdb_cs_check_server_db__() and, if the current server turns out to
 * be unacceptable, calls ovsdb_cs_retry().  Returns true if the server is
 * acceptable. */
static bool
ovsdb_cs_check_server_db(struct ovsdb_cs *cs)
{
    if (ovsdb_cs_check_server_db__(cs)) {
        return true;
    }
    ovsdb_cs_retry(cs);
    return false;
}
1938
1939static struct json *
1940ovsdb_cs_compose_server_monitor_request(const struct json *schema_json,
1941 void *cs_)
1942{
1943 struct ovsdb_cs *cs = cs_;
1944 struct shash *schema = ovsdb_cs_parse_schema(schema_json);
1945 struct json *monitor_requests = json_object_create();
1946
1947 const char *table_name = "Database";
1948 const struct sset *table_schema
1949 = schema ? shash_find_data(schema, table_name) : NULL;
1950 if (!table_schema) {
1951 VLOG_WARN("%s database lacks %s table "
1952 "(database needs upgrade?)",
1953 cs->server.db_name, table_name);
1954 /* XXX return failure? */
1955 } else {
1956 struct json *columns = json_array_create_empty();
1957 for (size_t j = 0; j < N_SERVER_COLUMNS; j++) {
1958 const struct server_column *column = &server_columns[j];
1959 bool db_has_column = (table_schema &&
1960 sset_contains(table_schema, column->name));
1961 if (table_schema && !db_has_column) {
1962 VLOG_WARN("%s table in %s database lacks %s column "
1963 "(database needs upgrade?)",
1964 table_name, cs->server.db_name, column->name);
1965 continue;
1966 }
1967 json_array_add(columns, json_string_create(column->name));
1968 }
1969
1970 struct json *monitor_request = json_object_create();
1971 json_object_put(monitor_request, "columns", columns);
1972 json_object_put(monitor_requests, table_name,
1973 json_array_create_1(monitor_request));
1974 }
1975 ovsdb_cs_free_schema(schema);
1976
1977 return monitor_requests;
1978}
1979
/* Operations table whose only entry is
 * ovsdb_cs_compose_server_monitor_request(), the hook that builds the monitor
 * request for the "Database" table of the _Server database. */
static const struct ovsdb_cs_ops ovsdb_cs_server_ops = {
    ovsdb_cs_compose_server_monitor_request
};
a5c067a8
BP
1983\f
/* Emits a warning that a database schema failed to parse, including the text
 * of 'error'.  'error' is handed to ovsdb_error_to_string_free(), so the
 * caller must not use it afterward. */
static void
log_error(struct ovsdb_error *error)
{
    char *message = ovsdb_error_to_string_free(error);

    VLOG_WARN("error parsing database schema: %s", message);
    free(message);
}
1991
1992/* Parses 'schema_json', an OVSDB schema in JSON format as described in RFC
1993 * 7047, to obtain the names of its rows and columns. If successful, returns
1994 * an shash whose keys are table names and whose values are ssets, where each
1995 * sset contains the names of its table's columns. On failure (due to a parse
1996 * error), returns NULL.
1997 *
1998 * It would also be possible to use the general-purpose OVSDB schema parser in
1999 * ovsdb-server, but that's overkill, possibly too strict for the current use
2000 * case, and would require restructuring ovsdb-server to separate the schema
2001 * code from the rest. */
2002struct shash *
2003ovsdb_cs_parse_schema(const struct json *schema_json)
2004{
2005 struct ovsdb_parser parser;
2006 const struct json *tables_json;
2007 struct ovsdb_error *error;
2008 struct shash_node *node;
2009 struct shash *schema;
2010
2011 ovsdb_parser_init(&parser, schema_json, "database schema");
2012 tables_json = ovsdb_parser_member(&parser, "tables", OP_OBJECT);
2013 error = ovsdb_parser_destroy(&parser);
2014 if (error) {
2015 log_error(error);
2016 return NULL;
2017 }
2018
2019 schema = xmalloc(sizeof *schema);
2020 shash_init(schema);
2021 SHASH_FOR_EACH (node, json_object(tables_json)) {
2022 const char *table_name = node->name;
2023 const struct json *json = node->data;
2024 const struct json *columns_json;
2025
2026 ovsdb_parser_init(&parser, json, "table schema for table %s",
2027 table_name);
2028 columns_json = ovsdb_parser_member(&parser, "columns", OP_OBJECT);
2029 error = ovsdb_parser_destroy(&parser);
2030 if (error) {
2031 log_error(error);
2032 ovsdb_cs_free_schema(schema);
2033 return NULL;
2034 }
2035
2036 struct sset *columns = xmalloc(sizeof *columns);
2037 sset_init(columns);
2038
2039 struct shash_node *node2;
2040 SHASH_FOR_EACH (node2, json_object(columns_json)) {
2041 const char *column_name = node2->name;
2042 sset_add(columns, column_name);
2043 }
2044 shash_add(schema, table_name, columns);
2045 }
2046 return schema;
2047}
2048
2049/* Frees 'schema', which is in the format returned by
2050 * ovsdb_cs_parse_schema(). */
2051void
2052ovsdb_cs_free_schema(struct shash *schema)
2053{
2054 if (schema) {
2055 struct shash_node *node, *next;
2056
2057 SHASH_FOR_EACH_SAFE (node, next, schema) {
2058 struct sset *sset = node->data;
2059 sset_destroy(sset);
2060 free(sset);
2061 shash_delete(schema, node);
2062 }
2063 shash_destroy(schema);
2064 free(schema);
2065 }
2066}
2067\f
2068static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
2069ovsdb_cs_parse_row_update1(const struct json *in,
2070 struct ovsdb_cs_row_update *out)
2071{
2072 const struct json *old_json, *new_json;
2073
2074 old_json = shash_find_data(json_object(in), "old");
2075 new_json = shash_find_data(json_object(in), "new");
2076 if (old_json && old_json->type != JSON_OBJECT) {
2077 return ovsdb_syntax_error(old_json, NULL,
2078 "\"old\" <row> is not object");
2079 } else if (new_json && new_json->type != JSON_OBJECT) {
2080 return ovsdb_syntax_error(new_json, NULL,
2081 "\"new\" <row> is not object");
2082 } else if ((old_json != NULL) + (new_json != NULL)
2083 != shash_count(json_object(in))) {
2084 return ovsdb_syntax_error(in, NULL,
2085 "<row-update> contains "
2086 "unexpected member");
2087 } else if (!old_json && !new_json) {
2088 return ovsdb_syntax_error(in, NULL,
2089 "<row-update> missing \"old\" "
2090 "and \"new\" members");
2091 }
2092
2093 if (!new_json) {
2094 out->type = OVSDB_CS_ROW_DELETE;
2095 out->columns = json_object(old_json);
2096 } else if (!old_json) {
2097 out->type = OVSDB_CS_ROW_INSERT;
2098 out->columns = json_object(new_json);
2099 } else {
2100 out->type = OVSDB_CS_ROW_UPDATE;
2101 out->columns = json_object(new_json);
2102 }
2103 return NULL;
2104}
2105
2106static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
2107ovsdb_cs_parse_row_update2(const struct json *in,
2108 struct ovsdb_cs_row_update *out)
2109{
2110 const struct shash *object = json_object(in);
2111 if (shash_count(object) != 1) {
2112 return ovsdb_syntax_error(
2113 in, NULL, "<row-update2> has %"PRIuSIZE" members "
2114 "instead of expected 1", shash_count(object));
2115 }
2116
2117 struct shash_node *node = shash_first(object);
2118 const struct json *columns = node->data;
2119 if (!strcmp(node->name, "insert") || !strcmp(node->name, "initial")) {
2120 out->type = OVSDB_CS_ROW_INSERT;
2121 } else if (!strcmp(node->name, "modify")) {
2122 out->type = OVSDB_CS_ROW_XOR;
2123 } else if (!strcmp(node->name, "delete")) {
2124 out->type = OVSDB_CS_ROW_DELETE;
2125 if (columns->type != JSON_NULL) {
2126 return ovsdb_syntax_error(
2127 in, NULL,
2128 "<row-update2> delete operation has unexpected value");
2129 }
2130 return NULL;
2131 } else {
2132 return ovsdb_syntax_error(in, NULL,
2133 "<row-update2> has unknown member \"%s\"",
2134 node->name);
2135 }
2136
2137 if (columns->type != JSON_OBJECT) {
2138 return ovsdb_syntax_error(
2139 in, NULL,
2140 "<row-update2> \"%s\" operation has unexpected value",
2141 node->name);
2142 }
2143 out->columns = json_object(columns);
2144
2145 return NULL;
2146}
2147
2148static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
2149ovsdb_cs_parse_row_update(const char *table_name,
2150 const struct json *in, int version,
2151 struct ovsdb_cs_row_update *out)
2152{
2153 if (in->type != JSON_OBJECT) {
2154 const char *suffix = version > 1 ? "2" : "";
2155 return ovsdb_syntax_error(
2156 in, NULL,
2157 "<table-update%s> for table \"%s\" contains <row-update%s> "
2158 "that is not an object",
2159 suffix, table_name, suffix);
2160 }
2161
2162 return (version == 1
2163 ? ovsdb_cs_parse_row_update1(in, out)
2164 : ovsdb_cs_parse_row_update2(in, out));
2165}
2166
2167static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
2168ovsdb_cs_parse_table_update(const char *table_name,
2169 const struct json *in, int version,
2170 struct ovsdb_cs_table_update *out)
2171{
2172 const char *suffix = version > 1 ? "2" : "";
2173
2174 if (in->type != JSON_OBJECT) {
2175 return ovsdb_syntax_error(
2176 in, NULL, "<table-update%s> for table \"%s\" is not an object",
2177 suffix, table_name);
2178 }
2179 struct shash *in_rows = json_object(in);
2180
2181 out->row_updates = xmalloc(shash_count(in_rows) * sizeof *out->row_updates);
2182
2183 const struct shash_node *node;
2184 SHASH_FOR_EACH (node, in_rows) {
2185 const char *row_uuid_string = node->name;
2186 struct uuid row_uuid;
2187 if (!uuid_from_string(&row_uuid, row_uuid_string)) {
2188 return ovsdb_syntax_error(
2189 in, NULL,
2190 "<table-update%s> for table \"%s\" contains "
2191 "bad UUID \"%s\" as member name",
2192 suffix, table_name, row_uuid_string);
2193 }
2194
2195 const struct json *in_ru = node->data;
2196 struct ovsdb_cs_row_update *out_ru = &out->row_updates[out->n++];
2197 *out_ru = (struct ovsdb_cs_row_update) { .row_uuid = row_uuid };
2198
2199 struct ovsdb_error *error = ovsdb_cs_parse_row_update(
2200 table_name, in_ru, version, out_ru);
2201 if (error) {
2202 return error;
2203 }
2204 }
2205
2206 return NULL;
2207}
2208
2209/* Parses OVSDB <table-updates> or <table-updates2> object 'in' into '*outp'.
2210 * If successful, sets '*outp' to the new object and returns NULL. On failure,
2211 * returns the error and sets '*outp' to NULL.
2212 *
2213 * On success, the caller must eventually free '*outp', with
2214 * ovsdb_cs_db_update_destroy().
2215 *
2216 * 'version' should be 1 if 'in' is a <table-updates>, 2 or 3 if it is a
2217 * <table-updates2>. */
2218struct ovsdb_error * OVS_WARN_UNUSED_RESULT
2219ovsdb_cs_parse_db_update(const struct json *in, int version,
2220 struct ovsdb_cs_db_update **outp)
2221{
2222 const char *suffix = version > 1 ? "2" : "";
2223
2224 *outp = NULL;
2225 if (in->type != JSON_OBJECT) {
2226 return ovsdb_syntax_error(in, NULL,
2227 "<table-updates%s> is not an object", suffix);
2228 }
2229
2230 struct ovsdb_cs_db_update *out = xzalloc(sizeof *out);
2231 out->table_updates = xmalloc(shash_count(json_object(in))
2232 * sizeof *out->table_updates);
2233 const struct shash_node *node;
2234 SHASH_FOR_EACH (node, json_object(in)) {
2235 const char *table_name = node->name;
2236 const struct json *in_tu = node->data;
2237
2238 struct ovsdb_cs_table_update *out_tu = &out->table_updates[out->n++];
2239 *out_tu = (struct ovsdb_cs_table_update) { .table_name = table_name };
2240
2241 struct ovsdb_error *error = ovsdb_cs_parse_table_update(
2242 table_name, in_tu, version, out_tu);
2243 if (error) {
2244 ovsdb_cs_db_update_destroy(out);
2245 return error;
2246 }
2247 }
2248
2249 *outp = out;
2250 return NULL;
2251}
2252
2253/* Frees 'du', which was presumably allocated by ovsdb_cs_parse_db_update(). */
2254void
2255ovsdb_cs_db_update_destroy(struct ovsdb_cs_db_update *du)
2256{
2257 if (!du) {
2258 return;
2259 }
2260
2261 for (size_t i = 0; i < du->n; i++) {
2262 struct ovsdb_cs_table_update *tu = &du->table_updates[i];
2263 free(tu->row_updates);
2264 }
2265 free(du->table_updates);
2266 free(du);
2267}
1c337c43
BP
2268
2269const struct ovsdb_cs_table_update *
2270ovsdb_cs_db_update_find_table(const struct ovsdb_cs_db_update *du,
2271 const char *table_name)
2272{
2273 for (size_t i = 0; i < du->n; i++) {
2274 const struct ovsdb_cs_table_update *tu = &du->table_updates[i];
2275 if (!strcmp(tu->table_name, table_name)) {
2276 return tu;
2277 }
2278 }
2279 return NULL;
2280}
2281