2 /* Copyright (c) 2009, 2010, 2011, 2016, 2017 Nicira, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
 */
22 #include "ovsdb-error.h"
23 #include "openvswitch/json.h"
24 #include "openvswitch/poll-loop.h"
25 #include "openvswitch/vlog.h"
/* Log messages emitted from this file are attributed to the "storage"
 * module. */
VLOG_DEFINE_THIS_MODULE(storage);
/* State for one open database storage backend. */
struct ovsdb_storage {
    /* There are three kinds of storage:
     *
     *    - Standalone, backed by a disk file.  'log' is nonnull, 'raft' is
     *      null.
     *
     *    - Clustered, backed by a Raft cluster.  'log' is null, 'raft' is
     *      nonnull.
     *
     *    - Memory only, unbacked.  'log' and 'raft' are null. */
    struct ovsdb_log *log;
    struct raft *raft;

    /* All kinds of storage. */
    struct ovsdb_error *error;   /* If nonnull, a permanent error. */
    long long next_snapshot_min; /* Earliest time to take next snapshot. */
    long long next_snapshot_max; /* Latest time to take next snapshot. */

    /* Standalone only. */
    unsigned int n_read;         /* Records read from 'log' so far. */
    unsigned int n_written;      /* Records written to 'log' so far. */
};
/* Forward declaration: used by the open/create functions before its
 * definition later in this file. */
static void schedule_next_snapshot(struct ovsdb_storage *storage, bool quick);
/* Internal helper that opens 'filename' as storage.
 *
 * From the code visible here: the file is opened as a log that accepts either
 * OVSDB_MAGIC or RAFT_MAGIC, read/write if 'rw' and read-only otherwise.  If
 * the log's magic is RAFT_MAGIC and 'allow_clustered' is false, an error is
 * returned saying the operation cannot be applied to a clustered database
 * file; otherwise the log is passed to raft_open().  A zero-initialized
 * 'struct ovsdb_storage' is then allocated and its first snapshot is
 * scheduled (not "quick").
 *
 * NOTE(review): this listing is missing lines (braces, the remaining
 * ovsdb_log_open() arguments, error checks, and the assignments into the new
 * storage and '*storagep') -- confirm against the complete file. */
59 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
60 ovsdb_storage_open__(const char *filename
, bool rw
, bool allow_clustered
,
61 struct ovsdb_storage
**storagep
)
65 struct ovsdb_log
*log
;
66 struct ovsdb_error
*error
;
67 error
= ovsdb_log_open(filename
, OVSDB_MAGIC
"|"RAFT_MAGIC
,
68 rw
? OVSDB_LOG_READ_WRITE
: OVSDB_LOG_READ_ONLY
,
74 struct raft
*raft
= NULL
;
75 if (!strcmp(ovsdb_log_get_magic(log
), RAFT_MAGIC
)) {
76 if (!allow_clustered
) {
78 return ovsdb_error(NULL
, "%s: cannot apply this operation to "
79 "clustered database file", filename
);
81 error
= raft_open(log
, &raft
);
88 struct ovsdb_storage
*storage
= xzalloc(sizeof *storage
);
91 schedule_next_snapshot(storage
, false);
96 /* Opens 'filename' for use as storage. If 'rw', opens it for read/write
97 * access, otherwise read-only. If successful, stores the new storage in
98 * '*storagep' and returns NULL; on failure, stores NULL in '*storagep' and
101 * The returned storage might be clustered or standalone, depending on what the
102 * disk file contains. */
103 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
104 ovsdb_storage_open(const char *filename
, bool rw
,
105 struct ovsdb_storage
**storagep
)
107 return ovsdb_storage_open__(filename
, rw
, true, storagep
);
/* Opens 'filename' for use as storage, read/write if 'rw' and read-only
 * otherwise, refusing clustered database files.  Returns the new storage.
 *
 * Any failure to open is reported through ovs_fatal() rather than returned
 * to the caller. */
struct ovsdb_storage *
ovsdb_storage_open_standalone(const char *filename, bool rw)
{
    struct ovsdb_storage *storage;
    struct ovsdb_error *error = ovsdb_storage_open__(filename, rw, false,
                                                     &storage);
    if (error) {
        ovs_fatal(0, "%s", ovsdb_error_to_string_free(error));
    }
    return storage;
}
122 /* Creates and returns new storage without any backing. Nothing will be read
123 * from the storage, and writes are discarded. */
124 struct ovsdb_storage
*
125 ovsdb_storage_create_unbacked(void)
127 struct ovsdb_storage
*storage
= xzalloc(sizeof *storage
);
128 schedule_next_snapshot(storage
, false);
/* Releases what 'storage' holds: closes its log, closes its Raft instance,
 * and destroys its stored error.
 *
 * NOTE(review): the return type, the braces, and any null check on 'storage'
 * or final release of 'storage' itself are not visible in this listing --
 * confirm against the complete file. */
133 ovsdb_storage_close(struct ovsdb_storage
*storage
)
136 ovsdb_log_close(storage
->log
);
137 raft_close(storage
->raft
);
138 ovsdb_error_destroy(storage
->error
);
144 ovsdb_storage_get_model(const struct ovsdb_storage
*storage
)
146 return storage
->raft
? "clustered" : "standalone";
150 ovsdb_storage_is_clustered(const struct ovsdb_storage
*storage
)
152 return storage
->raft
!= NULL
;
156 ovsdb_storage_is_connected(const struct ovsdb_storage
*storage
)
158 return !storage
->raft
|| raft_is_connected(storage
->raft
);
162 ovsdb_storage_is_dead(const struct ovsdb_storage
*storage
)
164 return storage
->raft
&& raft_left(storage
->raft
);
168 ovsdb_storage_is_leader(const struct ovsdb_storage
*storage
)
170 return !storage
->raft
|| raft_is_leader(storage
->raft
);
/* Returns raft_get_cid() for clustered storage and NULL for any other kind.
 *
 * NOTE(review): the return-type line and braces are not visible in this
 * listing; "cid" presumably stands for cluster ID -- confirm. */
174 ovsdb_storage_get_cid(const struct ovsdb_storage
*storage
)
176 return storage
->raft
? raft_get_cid(storage
->raft
) : NULL
;
/* Returns raft_get_sid() for clustered storage and NULL for any other kind.
 *
 * NOTE(review): the return-type line and braces are not visible in this
 * listing; "sid" presumably stands for server ID -- confirm. */
180 ovsdb_storage_get_sid(const struct ovsdb_storage
*storage
)
182 return storage
->raft
? raft_get_sid(storage
->raft
) : NULL
;
/* Returns raft_get_applied_index() for clustered storage and 0 for any other
 * kind.
 *
 * NOTE(review): the return-type line and braces are not visible in this
 * listing -- confirm the integer type against the complete file. */
186 ovsdb_storage_get_applied_index(const struct ovsdb_storage
*storage
)
188 return storage
->raft
? raft_get_applied_index(storage
->raft
) : 0;
/* Gives 'storage''s Raft instance a chance to do work by calling raft_run().
 *
 * NOTE(review): the return type, braces, and whatever condition guards the
 * call (presumably a check that 'storage' is clustered) are not visible in
 * this listing -- confirm. */
192 ovsdb_storage_run(struct ovsdb_storage
*storage
)
195 raft_run(storage
->raft
);
/* Calls raft_wait() on 'storage''s Raft instance.
 *
 * NOTE(review): the return type, braces, and whatever condition guards the
 * call (presumably a check that 'storage' is clustered) are not visible in
 * this listing -- confirm. */
200 ovsdb_storage_wait(struct ovsdb_storage
*storage
)
203 raft_wait(storage
->raft
);
207 /* Returns 'storage''s embedded name, if it has one, otherwise null.
209 * Only clustered storage has a built-in name. */
211 ovsdb_storage_get_name(const struct ovsdb_storage
*storage
)
213 return storage
->raft
? raft_get_name(storage
->raft
) : NULL
;
/* NOTE(review): summary of the code visible in this listing.  Several lines
 * (braces, part of the parameter list, early returns, cleanup) are missing,
 * so confirm every point against the complete file:
 *
 *   - Clustered storage takes a clone of the entry from raft_next_entry().
 *     An entry that is not a 2-element JSON array produces an
 *     "invalid commit format" error.  Element 0 is used as the schema and
 *     element 1 as the transaction; a JSON null element means "absent".
 *
 *   - Standalone storage reads one record from the log.  The first record
 *     read (when 'n_read' was 0) is treated as the schema and every later
 *     one as a transaction.
 *
 *   - A schema, when present, is parsed with ovsdb_schema_from_json() and
 *     its name is compared with ovsdb_storage_get_name(); a mismatch
 *     produces an error.
 *
 *   - A transaction, when present, is stored in '*txnp' as a clone.
 *
 * The comment that follows mentions '*jsonp', which is not among the
 * outputs it lists ('*schemap', '*txnp', '*txnid'); it presumably refers to
 * '*schemap' and '*txnp'. */
216 /* Attempts to read a log record from 'storage'.
218 * If successful, returns NULL and stores the transaction information in
219 * '*schemap', '*txnp', and '*txnid'. At least one of these will be nonnull.
220 * The caller owns the data and must eventually free it (with json_destroy()).
222 * If 'storage' is not clustered, 'txnid' may be null.
224 * If a read error occurs, returns the error and stores NULL in '*jsonp'.
226 * If the read reaches end of file, returns NULL and stores NULL in
228 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
229 ovsdb_storage_read(struct ovsdb_storage
*storage
,
230 struct ovsdb_schema
**schemap
,
241 struct json
*schema_json
= NULL
;
242 struct json
*txn_json
= NULL
;
245 json
= json_nullable_clone(
246 raft_next_entry(storage
->raft
, txnid
, &is_snapshot
));
249 } else if (json
->type
!= JSON_ARRAY
|| json
->array
.n
!= 2) {
251 return ovsdb_error(NULL
, "invalid commit format");
254 struct json
**e
= json
->array
.elems
;
255 schema_json
= e
[0]->type
!= JSON_NULL
? e
[0] : NULL
;
256 txn_json
= e
[1]->type
!= JSON_NULL
? e
[1] : NULL
;
257 } else if (storage
->log
) {
258 struct ovsdb_error
*error
= ovsdb_log_read(storage
->log
, &json
);
259 if (error
|| !json
) {
263 unsigned int n
= storage
->n_read
++;
264 struct json
**jsonp
= !n
? &schema_json
: &txn_json
;
267 ovsdb_log_mark_base(storage
->log
);
270 /* Unbacked. Nothing to do. */
274 /* If we got this far then we must have at least a schema or a
276 ovs_assert(schema_json
|| txn_json
);
279 struct ovsdb_schema
*schema
;
280 struct ovsdb_error
*error
= ovsdb_schema_from_json(schema_json
,
287 const char *storage_name
= ovsdb_storage_get_name(storage
);
288 const char *schema_name
= schema
->name
;
289 if (storage_name
&& strcmp(storage_name
, schema_name
)) {
290 error
= ovsdb_error(NULL
, "name %s in header does not match "
292 storage_name
, schema_name
);
294 ovsdb_schema_destroy(schema
);
302 *txnp
= json_clone(txn_json
);
/* NOTE(review): summary of the code visible in this listing (braces, the
 * remaining ovsdb_storage_read() arguments, and the final return are missing
 * -- confirm against the complete file):
 *
 *   - Asserts that 'storage' has a log, i.e. that it is standalone.
 *   - Reads one record with ovsdb_storage_read(); an error is reported with
 *     ovs_fatal().
 *   - Getting neither a schema nor a transaction is reported with ovs_fatal()
 *     as an unexpected end of file.
 *   - Asserts that the record produced a schema and no transaction. */
309 /* Reads and returns the schema from standalone storage 'storage'. Terminates
310 * with an error on failure. */
311 struct ovsdb_schema
*
312 ovsdb_storage_read_schema(struct ovsdb_storage
*storage
)
314 ovs_assert(storage
->log
);
316 struct json
*txn_json
;
317 struct ovsdb_schema
*schema
;
318 struct ovsdb_error
*error
= ovsdb_storage_read(storage
, &schema
,
321 ovs_fatal(0, "%s", ovsdb_error_to_string_free(error
));
323 if (!schema
&& !txn_json
) {
324 ovs_fatal(0, "unexpected end of file reading schema");
326 ovs_assert(schema
&& !txn_json
);
/* For clustered storage, the visible code evaluates raft_has_next_entry().
 *
 * NOTE(review): the return type, braces, and the branch taken for storage
 * that is not clustered are not visible in this listing -- confirm what is
 * returned in that case against the complete file. */
332 ovsdb_storage_read_wait(struct ovsdb_storage
*storage
)
334 return (storage
->raft
335 ? raft_has_next_entry(storage
->raft
)
/* NOTE(review): summary of the code visible in this listing (the return
 * type, braces, and at least one branch condition are missing -- confirm
 * against the complete file):
 *
 *   - The function first tests whether 'storage' already has a permanent
 *     error; the body of that test is not visible.
 *   - In one branch, if no permanent error is set yet, 'storage->error' is
 *     set to an "inconsistent data" error.
 *   - Otherwise, for storage with a log, ovsdb_log_unread() is called on
 *     that log. */
340 ovsdb_storage_unread(struct ovsdb_storage
*storage
)
342 if (storage
->error
) {
347 if (!storage
->error
) {
348 storage
->error
= ovsdb_error(NULL
, "inconsistent data");
350 } else if (storage
->log
) {
351 ovsdb_log_unread(storage
->log
);
/* Tracks one write to storage.
 *
 * When 'error' and 'command' are both null, the write is complete and
 * succeeded. */
struct ovsdb_write {
    struct ovsdb_error *error;      /* If nonnull, the write's error. */
    struct raft_command *command;   /* If nonnull, the Raft command that
                                     * carries a clustered write. */
};
/* NOTE(review): summary of the code visible in this listing (braces, part of
 * the parameter list, several call arguments, and the return are missing --
 * confirm against the complete file):
 *
 *   - Allocates a zero-initialized 'struct ovsdb_write'.
 *   - If 'storage' has a permanent error, the write gets a clone of it.
 *   - Clustered storage builds a 2-element JSON array whose first element is
 *     a JSON null, submits it with raft_command_execute(), and then destroys
 *     the array.
 *   - Standalone storage appends 'data' with ovsdb_log_write().  The visible
 *     code also increments 'n_written' and calls ovsdb_log_commit_block();
 *     the conditions guarding those two steps are not visible.
 *   - Unbacked storage leaves both 'error' and 'command' null. */
360 /* Not suitable for writing transactions that change the schema. */
361 struct ovsdb_write
* OVS_WARN_UNUSED_RESULT
362 ovsdb_storage_write(struct ovsdb_storage
*storage
, const struct json
*data
,
363 const struct uuid
*prereq
, struct uuid
*resultp
,
366 struct ovsdb_write
*w
= xzalloc(sizeof *w
);
367 struct uuid result
= UUID_ZERO
;
368 if (storage
->error
) {
369 w
->error
= ovsdb_error_clone(storage
->error
);
370 } else if (storage
->raft
) {
371 struct json
*txn_json
= json_array_create_2(json_null_create(),
373 w
->command
= raft_command_execute(storage
->raft
, txn_json
,
375 json_destroy(txn_json
);
376 } else if (storage
->log
) {
377 w
->error
= ovsdb_log_write(storage
->log
, data
);
379 storage
->n_written
++;
381 w
->error
= ovsdb_log_commit_block(storage
->log
);
385 /* When 'error' and 'command' are both null, it indicates that the
386 * command is complete. This is fine since this unbacked storage drops
/* Blocking form of ovsdb_storage_write().
 *
 * From the code visible here: starts the write with ovsdb_storage_write(),
 * then loops until ovsdb_write_is_complete() reports completion, calling
 * raft_run() and raft_wait() inside the loop.  Afterwards it takes a clone of
 * the write's error and destroys the write.
 *
 * NOTE(review): braces, the other statements of the loop body, and the
 * final return (presumably the cloned error) are not visible in this listing
 * -- confirm against the complete file. */
395 /* Not suitable for writing transactions that change the schema. */
396 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
397 ovsdb_storage_write_block(struct ovsdb_storage
*storage
,
398 const struct json
*data
, const struct uuid
*prereq
,
399 struct uuid
*resultp
, bool durable
)
401 struct ovsdb_write
*w
= ovsdb_storage_write(storage
, data
,
402 prereq
, resultp
, durable
);
403 while (!ovsdb_write_is_complete(w
)) {
405 raft_run(storage
->raft
);
410 raft_wait(storage
->raft
);
415 struct ovsdb_error
*error
= ovsdb_error_clone(ovsdb_write_get_error(w
));
416 ovsdb_write_destroy(w
);
/* Reports whether write 'w' has finished.
 *
 * The only operand visible here treats the write as complete once its Raft
 * command's status is no longer RAFT_CMD_INCOMPLETE.
 *
 * NOTE(review): the return type and the earlier operands of the expression
 * are not visible in this listing -- confirm against the complete file. */
421 ovsdb_write_is_complete(const struct ovsdb_write
*w
)
425 || raft_command_get_status(w
->command
) != RAFT_CMD_INCOMPLETE
);
428 const struct ovsdb_error
*
429 ovsdb_write_get_error(const struct ovsdb_write
*w_
)
431 struct ovsdb_write
*w
= CONST_CAST(struct ovsdb_write
*, w_
);
432 ovs_assert(ovsdb_write_is_complete(w
));
434 if (w
->command
&& !w
->error
) {
435 enum raft_command_status status
= raft_command_get_status(w
->command
);
436 if (status
!= RAFT_CMD_SUCCESS
) {
437 w
->error
= ovsdb_error("cluster error", "%s",
438 raft_command_status_to_string(status
));
/* For a completed write 'w' that was carried by a Raft command and has no
 * error, yields raft_command_get_commit_index() for that command.
 *
 * 'w' must already be complete.
 *
 * NOTE(review): the return type and the value yielded in every other case
 * are not visible in this listing -- confirm against the complete file. */
446 ovsdb_write_get_commit_index(const struct ovsdb_write
*w
)
448 ovs_assert(ovsdb_write_is_complete(w
));
449 return (w
->command
&& !w
->error
450 ? raft_command_get_commit_index(w
->command
)
/* If write 'w' is already complete, arranges an immediate wakeup with
 * poll_immediate_wake().
 *
 * NOTE(review): the return type and closing braces are not visible in this
 * listing -- confirm against the complete file. */
455 ovsdb_write_wait(const struct ovsdb_write
*w
)
457 if (ovsdb_write_is_complete(w
)) {
458 poll_immediate_wake();
/* Releases what write 'w' holds: drops its reference to the Raft command and
 * destroys its error.
 *
 * NOTE(review): the return type, the braces, and any null check on 'w' or
 * final release of 'w' itself are not visible in this listing -- confirm. */
463 ovsdb_write_destroy(struct ovsdb_write
*w
)
466 raft_command_unref(w
->command
);
467 ovsdb_error_destroy(w
->error
);
/* Sets the window in which 'storage' should next take a snapshot.
 *
 * For storage backed by a log or by Raft, the earliest time is the current
 * time plus a 10-minute base plus a random amount drawn from a 10-minute
 * range, and the latest time is one day from now.  The two assignments of
 * LLONG_MAX presumably belong to the branch for unbacked storage.
 *
 * NOTE(review): the return type, the braces, the "else", and the lines that
 * use 'quick' are not visible in this listing -- confirm how 'quick' changes
 * the base and range against the complete file. */
473 schedule_next_snapshot(struct ovsdb_storage
*storage
, bool quick
)
475 if (storage
->log
|| storage
->raft
) {
476 unsigned int base
= 10 * 60 * 1000; /* 10 minutes */
477 unsigned int range
= 10 * 60 * 1000; /* 10 minutes */
483 long long int now
= time_msec();
484 storage
->next_snapshot_min
= now
+ base
+ random_range(range
);
485 storage
->next_snapshot_max
= now
+ 60LL * 60 * 24 * 1000; /* 1 day */
487 storage
->next_snapshot_min
= LLONG_MAX
;
488 storage
->next_snapshot_max
= LLONG_MAX
;
/* Decides whether 'storage' should take a snapshot now.
 *
 * From the code visible here, for storage backed by Raft or by a log:
 *
 *   - Before 'next_snapshot_min', or while Raft reports that it may not
 *     snapshot, the existing comments say no snapshot is taken.
 *   - The log length is raft_get_log_length() for clustered storage and
 *     'n_read' + 'n_written' for standalone storage.
 *   - Before 'next_snapshot_max', a snapshot is wanted only if the log
 *     length is at least 100 and the backend reports that it grew a lot.
 *   - At or after 'next_snapshot_max', the existing comment says a snapshot
 *     is wanted if there have been any log entries at all.
 *
 * NOTE(review): the return type, the braces, and several return statements
 * (including the result for unbacked storage) are not visible in this
 * listing -- confirm against the complete file. */
493 ovsdb_storage_should_snapshot(const struct ovsdb_storage
*storage
)
495 if (storage
->raft
|| storage
->log
) {
496 /* If we haven't reached the minimum snapshot time, don't snapshot. */
497 long long int now
= time_msec();
498 if (now
< storage
->next_snapshot_min
) {
502 /* If we can't snapshot right now, don't. */
503 if (storage
->raft
&& !raft_may_snapshot(storage
->raft
)) {
507 uint64_t log_len
= (storage
->raft
508 ? raft_get_log_length(storage
->raft
)
509 : storage
->n_read
+ storage
->n_written
);
510 if (now
< storage
->next_snapshot_max
) {
511 /* Maximum snapshot time not yet reached. Take a snapshot if there
512 * have been at least 100 log entries and the log file size has
514 bool grew_lots
= (storage
->raft
515 ? raft_grew_lots(storage
->raft
)
516 : ovsdb_log_grew_lots(storage
->log
));
517 return log_len
>= 100 && grew_lots
;
519 /* We have reached the maximum snapshot time. Take a snapshot if
520 * there have been any log entries at all. */
/* Internal helper that stores a snapshot made of 'schema' and 'data'.
 *
 * From the code visible here:
 *
 *   - One branch (presumably clustered storage) builds a JSON array from
 *     clones of 'schema' and 'data', passes it to raft_store_snapshot(), and
 *     destroys the array afterwards.
 *   - Standalone storage collects 'schema' and 'data' into an array of at
 *     most two pointers, without cloning them, and replaces the log's
 *     contents with ovsdb_log_replace().
 *
 * NOTE(review): braces, the conditions under which each of 'schema' and
 * 'data' is added, and the remaining returns are not visible in this listing
 * -- confirm against the complete file. */
528 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
529 ovsdb_storage_store_snapshot__(struct ovsdb_storage
*storage
,
530 const struct json
*schema
,
531 const struct json
*data
)
534 struct json
*entries
= json_array_create_empty();
536 json_array_add(entries
, json_clone(schema
));
539 json_array_add(entries
, json_clone(data
));
541 struct ovsdb_error
*error
= raft_store_snapshot(storage
->raft
,
543 json_destroy(entries
);
545 } else if (storage
->log
) {
546 struct json
*entries
[2];
549 entries
[n
++] = CONST_CAST(struct json
*, schema
);
552 entries
[n
++] = CONST_CAST(struct json
*, data
);
554 return ovsdb_log_replace(storage
->log
, entries
, n
);
/* Stores a snapshot through ovsdb_storage_store_snapshot__() and then
 * schedules the next one, asking for a quick retry when storing failed.
 *
 * NOTE(review): braces, the remaining call arguments, and the final return
 * (presumably the error from storing) are not visible in this listing --
 * confirm against the complete file. */
560 /* 'schema' and 'data' should faithfully represent the current schema and data,
561 * otherwise the two storing backing formats will yield divergent results. Use
562 * ovsdb_storage_write_schema_change() to change the schema. */
563 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
564 ovsdb_storage_store_snapshot(struct ovsdb_storage
*storage
,
565 const struct json
*schema
,
566 const struct json
*data
)
568 struct ovsdb_error
*error
= ovsdb_storage_store_snapshot__(storage
,
570 bool retry_quickly
= error
!= NULL
;
571 schedule_next_snapshot(storage
, retry_quickly
);
575 struct ovsdb_write
* OVS_WARN_UNUSED_RESULT
576 ovsdb_storage_write_schema_change(struct ovsdb_storage
*storage
,
577 const struct json
*schema
,
578 const struct json
*data
,
579 const struct uuid
*prereq
,
580 struct uuid
*resultp
)
582 struct ovsdb_write
*w
= xzalloc(sizeof *w
);
583 struct uuid result
= UUID_ZERO
;
584 if (storage
->error
) {
585 w
->error
= ovsdb_error_clone(storage
->error
);
586 } else if (storage
->raft
) {
587 struct json
*txn_json
= json_array_create_2(json_clone(schema
),
589 w
->command
= raft_command_execute(storage
->raft
, txn_json
,
591 json_destroy(txn_json
);
592 } else if (storage
->log
) {
593 w
->error
= ovsdb_storage_store_snapshot__(storage
, schema
, data
);
595 /* When 'error' and 'command' are both null, it indicates that the
596 * command is complete. This is fine since this unbacked storage drops