2 /* Copyright (c) 2009, 2010, 2011, 2016, 2017 Nicira, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include "ovsdb-error.h"
23 #include "openvswitch/json.h"
24 #include "openvswitch/poll-loop.h"
25 #include "openvswitch/vlog.h"
33 VLOG_DEFINE_THIS_MODULE(storage
);
/* State for one database storage instance.
 *
 * 'log' and 'raft' together select the kind of storage; the remaining members
 * are bookkeeping used by the functions in this file. */
struct ovsdb_storage {
    /* There are three kinds of storage:
     *
     *    - Standalone, backed by a disk file.  'log' is nonnull, 'raft' is
     *      null.
     *
     *    - Clustered, backed by a Raft cluster.  'log' is null, 'raft' is
     *      nonnull.
     *
     *    - Memory only, unbacked.  'log' and 'raft' are null. */
    struct ovsdb_log *log;
    struct raft *raft;

    /* All kinds of storage. */
    struct ovsdb_error *error;   /* If nonnull, a permanent error. */
    long long next_snapshot_min; /* Earliest time to take next snapshot. */
    long long next_snapshot_max; /* Latest time to take next snapshot. */

    /* Standalone only. */
    unsigned int n_read;         /* Bumped by ovsdb_storage_read() for each
                                  * record it gets from 'log'. */
    unsigned int n_written;      /* Bumped by ovsdb_storage_write() when it
                                  * writes a record to 'log'. */
};
58 static void schedule_next_snapshot(struct ovsdb_storage
*, bool quick
);
60 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
61 ovsdb_storage_open__(const char *filename
, bool rw
, bool allow_clustered
,
62 struct ovsdb_storage
**storagep
)
66 struct ovsdb_log
*log
;
67 struct ovsdb_error
*error
;
68 error
= ovsdb_log_open(filename
, OVSDB_MAGIC
"|"RAFT_MAGIC
,
69 rw
? OVSDB_LOG_READ_WRITE
: OVSDB_LOG_READ_ONLY
,
75 struct raft
*raft
= NULL
;
76 if (!strcmp(ovsdb_log_get_magic(log
), RAFT_MAGIC
)) {
77 if (!allow_clustered
) {
79 return ovsdb_error(NULL
, "%s: cannot apply this operation to "
80 "clustered database file", filename
);
82 error
= raft_open(log
, &raft
);
89 struct ovsdb_storage
*storage
= xzalloc(sizeof *storage
);
92 schedule_next_snapshot(storage
, false);
97 /* Opens 'filename' for use as storage. If 'rw', opens it for read/write
98 * access, otherwise read-only. If successful, stores the new storage in
99 * '*storagep' and returns NULL; on failure, stores NULL in '*storagep' and
102 * The returned storage might be clustered or standalone, depending on what the
103 * disk file contains. */
104 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
105 ovsdb_storage_open(const char *filename
, bool rw
,
106 struct ovsdb_storage
**storagep
)
108 return ovsdb_storage_open__(filename
, rw
, true, storagep
);
/* Opens 'filename' for use as storage, passing false for 'allow_clustered' so
 * that a clustered database file is rejected.  If 'rw', opens it for
 * read/write access, otherwise read-only.
 *
 * Returns the new storage.  If opening fails, the error is reported through
 * ovs_fatal() rather than being returned to the caller. */
struct ovsdb_storage *
ovsdb_storage_open_standalone(const char *filename, bool rw)
{
    struct ovsdb_storage *storage;
    struct ovsdb_error *error = ovsdb_storage_open__(filename, rw, false,
                                                     &storage);
    if (error) {
        ovs_fatal(0, "%s", ovsdb_error_to_string_free(error));
    }
    return storage;
}
123 /* Creates and returns new storage without any backing. Nothing will be read
124 * from the storage, and writes are discarded. */
125 struct ovsdb_storage
*
126 ovsdb_storage_create_unbacked(void)
128 struct ovsdb_storage
*storage
= xzalloc(sizeof *storage
);
129 schedule_next_snapshot(storage
, false);
134 ovsdb_storage_close(struct ovsdb_storage
*storage
)
137 ovsdb_log_close(storage
->log
);
138 raft_close(storage
->raft
);
139 ovsdb_error_destroy(storage
->error
);
145 ovsdb_storage_get_model(const struct ovsdb_storage
*storage
)
147 return storage
->raft
? "clustered" : "standalone";
151 ovsdb_storage_is_clustered(const struct ovsdb_storage
*storage
)
153 return storage
->raft
!= NULL
;
157 ovsdb_storage_is_connected(const struct ovsdb_storage
*storage
)
159 return !storage
->raft
|| raft_is_connected(storage
->raft
);
163 ovsdb_storage_is_dead(const struct ovsdb_storage
*storage
)
165 return storage
->raft
&& raft_left(storage
->raft
);
169 ovsdb_storage_is_leader(const struct ovsdb_storage
*storage
)
171 return !storage
->raft
|| raft_is_leader(storage
->raft
);
/* Returns whatever raft_get_cid() reports for 'storage''s Raft instance, or
 * NULL if 'storage' has no Raft instance.
 *
 * NOTE(review): "cid" is presumably the cluster ID; confirm against the Raft
 * interface. */
175 ovsdb_storage_get_cid(const struct ovsdb_storage
*storage
)
177 return storage
->raft
? raft_get_cid(storage
->raft
) : NULL
;
/* Returns whatever raft_get_sid() reports for 'storage''s Raft instance, or
 * NULL if 'storage' has no Raft instance.
 *
 * NOTE(review): "sid" is presumably the server ID; confirm against the Raft
 * interface. */
181 ovsdb_storage_get_sid(const struct ovsdb_storage
*storage
)
183 return storage
->raft
? raft_get_sid(storage
->raft
) : NULL
;
/* Returns raft_get_applied_index() for 'storage''s Raft instance, or 0 if
 * 'storage' has no Raft instance. */
187 ovsdb_storage_get_applied_index(const struct ovsdb_storage
*storage
)
189 return storage
->raft
? raft_get_applied_index(storage
->raft
) : 0;
193 ovsdb_storage_get_memory_usage(const struct ovsdb_storage
*storage
,
197 raft_get_memory_usage(storage
->raft
, usage
);
202 ovsdb_storage_run(struct ovsdb_storage
*storage
)
205 raft_run(storage
->raft
);
210 ovsdb_storage_wait(struct ovsdb_storage
*storage
)
213 raft_wait(storage
->raft
);
217 /* Returns 'storage''s embedded name, if it has one, otherwise null.
219 * Only clustered storage has a built-in name. */
221 ovsdb_storage_get_name(const struct ovsdb_storage
*storage
)
223 return storage
->raft
? raft_get_name(storage
->raft
) : NULL
;
226 /* Attempts to read a log record from 'storage'.
228 * If successful, returns NULL and stores the transaction information in
229 * '*schemap', '*txnp', and '*txnid'. At least one of these will be nonnull.
230 * The caller owns the data and must eventually free it (with json_destroy()).
232 * If 'storage' is not clustered, 'txnid' may be null.
234 * If a read error occurs, returns the error and stores NULL in '*schemap' and '*txnp'.
236 * If the read reaches end of file, returns NULL and stores NULL in
238 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
239 ovsdb_storage_read(struct ovsdb_storage
*storage
,
240 struct ovsdb_schema
**schemap
,
251 struct json
*schema_json
= NULL
;
252 struct json
*txn_json
= NULL
;
255 json
= json_nullable_clone(
256 raft_next_entry(storage
->raft
, txnid
, &is_snapshot
));
259 } else if (json
->type
!= JSON_ARRAY
|| json
->array
.n
!= 2) {
261 return ovsdb_error(NULL
, "invalid commit format");
264 struct json
**e
= json
->array
.elems
;
265 schema_json
= e
[0]->type
!= JSON_NULL
? e
[0] : NULL
;
266 txn_json
= e
[1]->type
!= JSON_NULL
? e
[1] : NULL
;
267 } else if (storage
->log
) {
268 struct ovsdb_error
*error
= ovsdb_log_read(storage
->log
, &json
);
269 if (error
|| !json
) {
273 unsigned int n
= storage
->n_read
++;
274 struct json
**jsonp
= !n
? &schema_json
: &txn_json
;
277 ovsdb_log_mark_base(storage
->log
);
280 /* Unbacked. Nothing to do. */
284 /* If we got this far then we must have at least a schema or a
286 ovs_assert(schema_json
|| txn_json
);
289 struct ovsdb_schema
*schema
;
290 struct ovsdb_error
*error
= ovsdb_schema_from_json(schema_json
,
297 const char *storage_name
= ovsdb_storage_get_name(storage
);
298 const char *schema_name
= schema
->name
;
299 if (storage_name
&& strcmp(storage_name
, schema_name
)) {
300 error
= ovsdb_error(NULL
, "name %s in header does not match "
302 storage_name
, schema_name
);
304 ovsdb_schema_destroy(schema
);
312 *txnp
= json_clone(txn_json
);
319 /* Reads and returns the schema from standalone storage 'storage'. Terminates
320 * with an error on failure. */
321 struct ovsdb_schema
*
322 ovsdb_storage_read_schema(struct ovsdb_storage
*storage
)
324 ovs_assert(storage
->log
);
326 struct json
*txn_json
;
327 struct ovsdb_schema
*schema
;
328 struct ovsdb_error
*error
= ovsdb_storage_read(storage
, &schema
,
331 ovs_fatal(0, "%s", ovsdb_error_to_string_free(error
));
333 if (!schema
&& !txn_json
) {
334 ovs_fatal(0, "unexpected end of file reading schema");
336 ovs_assert(schema
&& !txn_json
);
342 ovsdb_storage_read_wait(struct ovsdb_storage
*storage
)
344 return (storage
->raft
345 ? raft_has_next_entry(storage
->raft
)
350 ovsdb_storage_unread(struct ovsdb_storage
*storage
)
352 if (storage
->error
) {
357 if (!storage
->error
) {
358 storage
->error
= ovsdb_error(NULL
, "inconsistent data");
360 } else if (storage
->log
) {
361 ovsdb_log_unread(storage
->log
);
/* State of one write to storage.
 *
 * When 'error' and 'command' are both null, the write is complete.  Both
 * members are released by ovsdb_write_destroy(). */
struct ovsdb_write {
    struct ovsdb_error *error;      /* If nonnull, why the write failed. */
    struct raft_command *command;   /* If nonnull, the command obtained from
                                     * raft_command_execute(). */
};
370 /* Not suitable for writing transactions that change the schema. */
371 struct ovsdb_write
* OVS_WARN_UNUSED_RESULT
372 ovsdb_storage_write(struct ovsdb_storage
*storage
, const struct json
*data
,
373 const struct uuid
*prereq
, struct uuid
*resultp
,
376 struct ovsdb_write
*w
= xzalloc(sizeof *w
);
377 struct uuid result
= UUID_ZERO
;
378 if (storage
->error
) {
379 w
->error
= ovsdb_error_clone(storage
->error
);
380 } else if (storage
->raft
) {
381 struct json
*txn_json
= json_array_create_2(json_null_create(),
383 w
->command
= raft_command_execute(storage
->raft
, txn_json
,
385 json_destroy(txn_json
);
386 } else if (storage
->log
) {
387 w
->error
= ovsdb_log_write(storage
->log
, data
);
389 storage
->n_written
++;
391 w
->error
= ovsdb_log_commit_block(storage
->log
);
395 /* When 'error' and 'command' are both null, it indicates that the
396 * command is complete. This is fine since this unbacked storage drops
405 /* Not suitable for writing transactions that change the schema. */
406 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
407 ovsdb_storage_write_block(struct ovsdb_storage
*storage
,
408 const struct json
*data
, const struct uuid
*prereq
,
409 struct uuid
*resultp
, bool durable
)
411 struct ovsdb_write
*w
= ovsdb_storage_write(storage
, data
,
412 prereq
, resultp
, durable
);
413 while (!ovsdb_write_is_complete(w
)) {
415 raft_run(storage
->raft
);
420 raft_wait(storage
->raft
);
425 struct ovsdb_error
*error
= ovsdb_error_clone(ovsdb_write_get_error(w
));
426 ovsdb_write_destroy(w
);
431 ovsdb_write_is_complete(const struct ovsdb_write
*w
)
435 || raft_command_get_status(w
->command
) != RAFT_CMD_INCOMPLETE
);
438 const struct ovsdb_error
*
439 ovsdb_write_get_error(const struct ovsdb_write
*w_
)
441 struct ovsdb_write
*w
= CONST_CAST(struct ovsdb_write
*, w_
);
442 ovs_assert(ovsdb_write_is_complete(w
));
444 if (w
->command
&& !w
->error
) {
445 enum raft_command_status status
= raft_command_get_status(w
->command
);
446 if (status
!= RAFT_CMD_SUCCESS
) {
447 w
->error
= ovsdb_error("cluster error", "%s",
448 raft_command_status_to_string(status
));
456 ovsdb_write_get_commit_index(const struct ovsdb_write
*w
)
458 ovs_assert(ovsdb_write_is_complete(w
));
459 return (w
->command
&& !w
->error
460 ? raft_command_get_commit_index(w
->command
)
/* If 'w' is already complete, calls poll_immediate_wake() so that the caller's
 * next poll loop iteration does not block.  Does nothing for a write that is
 * still in progress. */
void
ovsdb_write_wait(const struct ovsdb_write *w)
{
    if (ovsdb_write_is_complete(w)) {
        poll_immediate_wake();
    }
}
473 ovsdb_write_destroy(struct ovsdb_write
*w
)
476 raft_command_unref(w
->command
);
477 ovsdb_error_destroy(w
->error
);
483 schedule_next_snapshot(struct ovsdb_storage
*storage
, bool quick
)
485 if (storage
->log
|| storage
->raft
) {
486 unsigned int base
= 10 * 60 * 1000; /* 10 minutes */
487 unsigned int range
= 10 * 60 * 1000; /* 10 minutes */
493 long long int now
= time_msec();
494 storage
->next_snapshot_min
= now
+ base
+ random_range(range
);
495 storage
->next_snapshot_max
= now
+ 60LL * 60 * 24 * 1000; /* 1 day */
497 storage
->next_snapshot_min
= LLONG_MAX
;
498 storage
->next_snapshot_max
= LLONG_MAX
;
503 ovsdb_storage_should_snapshot(const struct ovsdb_storage
*storage
)
505 if (storage
->raft
|| storage
->log
) {
506 /* If we haven't reached the minimum snapshot time, don't snapshot. */
507 long long int now
= time_msec();
508 if (now
< storage
->next_snapshot_min
) {
512 /* If we can't snapshot right now, don't. */
513 if (storage
->raft
&& !raft_may_snapshot(storage
->raft
)) {
517 uint64_t log_len
= (storage
->raft
518 ? raft_get_log_length(storage
->raft
)
519 : storage
->n_read
+ storage
->n_written
);
520 if (now
< storage
->next_snapshot_max
) {
521 /* Maximum snapshot time not yet reached. Take a snapshot if there
522 * have been at least 100 log entries and the log file size has
524 bool grew_lots
= (storage
->raft
525 ? raft_grew_lots(storage
->raft
)
526 : ovsdb_log_grew_lots(storage
->log
));
527 return log_len
>= 100 && grew_lots
;
529 /* We have reached the maximum snapshot time. Take a snapshot if
530 * there have been any log entries at all. */
538 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
539 ovsdb_storage_store_snapshot__(struct ovsdb_storage
*storage
,
540 const struct json
*schema
,
541 const struct json
*data
)
544 struct json
*entries
= json_array_create_empty();
546 json_array_add(entries
, json_clone(schema
));
549 json_array_add(entries
, json_clone(data
));
551 struct ovsdb_error
*error
= raft_store_snapshot(storage
->raft
,
553 json_destroy(entries
);
555 } else if (storage
->log
) {
556 struct json
*entries
[2];
559 entries
[n
++] = CONST_CAST(struct json
*, schema
);
562 entries
[n
++] = CONST_CAST(struct json
*, data
);
564 return ovsdb_log_replace(storage
->log
, entries
, n
);
570 /* 'schema' and 'data' should faithfully represent the current schema and data,
571 * otherwise the two storing backing formats will yield divergent results. Use
572 * ovsdb_storage_write_schema_change() to change the schema. */
573 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
574 ovsdb_storage_store_snapshot(struct ovsdb_storage
*storage
,
575 const struct json
*schema
,
576 const struct json
*data
)
578 struct ovsdb_error
*error
= ovsdb_storage_store_snapshot__(storage
,
580 bool retry_quickly
= error
!= NULL
;
581 schedule_next_snapshot(storage
, retry_quickly
);
585 struct ovsdb_write
* OVS_WARN_UNUSED_RESULT
586 ovsdb_storage_write_schema_change(struct ovsdb_storage
*storage
,
587 const struct json
*schema
,
588 const struct json
*data
,
589 const struct uuid
*prereq
,
590 struct uuid
*resultp
)
592 struct ovsdb_write
*w
= xzalloc(sizeof *w
);
593 struct uuid result
= UUID_ZERO
;
594 if (storage
->error
) {
595 w
->error
= ovsdb_error_clone(storage
->error
);
596 } else if (storage
->raft
) {
597 struct json
*txn_json
= json_array_create_2(json_clone(schema
),
599 w
->command
= raft_command_execute(storage
->raft
, txn_json
,
601 json_destroy(txn_json
);
602 } else if (storage
->log
) {
603 w
->error
= ovsdb_storage_store_snapshot__(storage
, schema
, data
);
605 /* When 'error' and 'command' are both null, it indicates that the
606 * command is complete. This is fine since this unbacked storage drops
616 ovsdb_storage_peek_last_eid(struct ovsdb_storage
*storage
)
618 if (!storage
->raft
) {
621 return raft_current_eid(storage
->raft
);