2 /* Copyright (c) 2009, 2010, 2011, 2016, 2017 Nicira, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include "ovsdb-error.h"
23 #include "openvswitch/json.h"
24 #include "openvswitch/poll-loop.h"
25 #include "openvswitch/vlog.h"
33 VLOG_DEFINE_THIS_MODULE(storage
);
35 struct ovsdb_storage
{
36 /* There are three kinds of storage:
38 * - Standalone, backed by a disk file. 'log' is nonnull, 'raft' is null.
41 * - Clustered, backed by a Raft cluster. 'log' is null, 'raft' is nonnull.
44 * - Memory only, unbacked. 'log' and 'raft' are null. */
45 struct ovsdb_log
*log
;
48 /* All kinds of storage. */
49 struct ovsdb_error
*error
; /* If nonnull, a permanent error. */
50 long long next_snapshot_min
; /* Earliest time to take next snapshot. */
51 long long next_snapshot_max
; /* Latest time to take next snapshot. */
53 /* Standalone only. */
55 unsigned int n_written
;
58 static void schedule_next_snapshot(struct ovsdb_storage
*, bool quick
);
60 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
61 ovsdb_storage_open__(const char *filename
, bool rw
, bool allow_clustered
,
62 struct ovsdb_storage
**storagep
)
66 struct ovsdb_log
*log
;
67 struct ovsdb_error
*error
;
68 error
= ovsdb_log_open(filename
, OVSDB_MAGIC
"|"RAFT_MAGIC
,
69 rw
? OVSDB_LOG_READ_WRITE
: OVSDB_LOG_READ_ONLY
,
75 struct raft
*raft
= NULL
;
76 if (!strcmp(ovsdb_log_get_magic(log
), RAFT_MAGIC
)) {
77 if (!allow_clustered
) {
79 return ovsdb_error(NULL
, "%s: cannot apply this operation to "
80 "clustered database file", filename
);
82 error
= raft_open(log
, &raft
);
89 struct ovsdb_storage
*storage
= xzalloc(sizeof *storage
);
92 schedule_next_snapshot(storage
, false);
97 /* Opens 'filename' for use as storage. If 'rw', opens it for read/write
98 * access, otherwise read-only. If successful, stores the new storage in
99 * '*storagep' and returns NULL; on failure, stores NULL in '*storagep' and
 * returns the error.
102 * The returned storage might be clustered or standalone, depending on what the
103 * disk file contains. */
104 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
105 ovsdb_storage_open(const char *filename
, bool rw
,
106 struct ovsdb_storage
**storagep
)
108 return ovsdb_storage_open__(filename
, rw
, true, storagep
);
111 struct ovsdb_storage
*
112 ovsdb_storage_open_standalone(const char *filename
, bool rw
)
114 struct ovsdb_storage
*storage
;
115 struct ovsdb_error
*error
= ovsdb_storage_open__(filename
, rw
, false,
118 ovs_fatal(0, "%s", ovsdb_error_to_string_free(error
));
123 /* Creates and returns new storage without any backing. Nothing will be read
124 * from the storage, and writes are discarded. */
125 struct ovsdb_storage
*
126 ovsdb_storage_create_unbacked(void)
128 struct ovsdb_storage
*storage
= xzalloc(sizeof *storage
);
129 schedule_next_snapshot(storage
, false);
134 ovsdb_storage_close(struct ovsdb_storage
*storage
)
137 ovsdb_log_close(storage
->log
);
138 raft_close(storage
->raft
);
139 ovsdb_error_destroy(storage
->error
);
145 ovsdb_storage_get_model(const struct ovsdb_storage
*storage
)
147 return storage
->raft
? "clustered" : "standalone";
151 ovsdb_storage_is_clustered(const struct ovsdb_storage
*storage
)
153 return storage
->raft
!= NULL
;
157 ovsdb_storage_is_connected(const struct ovsdb_storage
*storage
)
159 return !storage
->raft
|| raft_is_connected(storage
->raft
);
163 ovsdb_storage_is_dead(const struct ovsdb_storage
*storage
)
165 return storage
->raft
&& raft_left(storage
->raft
);
169 ovsdb_storage_is_leader(const struct ovsdb_storage
*storage
)
171 return !storage
->raft
|| raft_is_leader(storage
->raft
);
175 ovsdb_storage_get_cid(const struct ovsdb_storage
*storage
)
177 return storage
->raft
? raft_get_cid(storage
->raft
) : NULL
;
181 ovsdb_storage_get_sid(const struct ovsdb_storage
*storage
)
183 return storage
->raft
? raft_get_sid(storage
->raft
) : NULL
;
187 ovsdb_storage_get_applied_index(const struct ovsdb_storage
*storage
)
189 return storage
->raft
? raft_get_applied_index(storage
->raft
) : 0;
193 ovsdb_storage_get_memory_usage(const struct ovsdb_storage
*storage
,
197 raft_get_memory_usage(storage
->raft
, usage
);
202 ovsdb_storage_get_error(const struct ovsdb_storage
*storage
)
204 if (storage
->error
) {
205 return ovsdb_error_to_string(storage
->error
);
212 ovsdb_storage_run(struct ovsdb_storage
*storage
)
215 raft_run(storage
->raft
);
220 ovsdb_storage_wait(struct ovsdb_storage
*storage
)
223 raft_wait(storage
->raft
);
227 /* Returns 'storage''s embedded name, if it has one, otherwise null.
229 * Only clustered storage has a built-in name. */
231 ovsdb_storage_get_name(const struct ovsdb_storage
*storage
)
233 return storage
->raft
? raft_get_name(storage
->raft
) : NULL
;
236 /* Attempts to read a log record from 'storage'.
238 * If successful, returns NULL and stores the transaction information in
239 * '*schemap', '*txnp', and '*txnid'. At least one of these will be nonnull.
240 * The caller owns the data and must eventually free it (with json_destroy()).
242 * If 'storage' is not clustered, 'txnid' may be null.
244 * If a read error occurs, returns the error and stores NULL in '*schemap'
 * and '*txnp'.
246 * If the read reaches end of file, returns NULL and stores NULL in
 * '*schemap' and '*txnp'. */
248 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
249 ovsdb_storage_read(struct ovsdb_storage
*storage
,
250 struct ovsdb_schema
**schemap
,
261 struct json
*schema_json
= NULL
;
262 struct json
*txn_json
= NULL
;
265 json
= json_nullable_clone(
266 raft_next_entry(storage
->raft
, txnid
, &is_snapshot
));
269 } else if (json
->type
!= JSON_ARRAY
|| json
->array
.n
!= 2) {
271 return ovsdb_error(NULL
, "invalid commit format");
274 struct json
**e
= json
->array
.elems
;
275 schema_json
= e
[0]->type
!= JSON_NULL
? e
[0] : NULL
;
276 txn_json
= e
[1]->type
!= JSON_NULL
? e
[1] : NULL
;
277 } else if (storage
->log
) {
278 struct ovsdb_error
*error
= ovsdb_log_read(storage
->log
, &json
);
279 if (error
|| !json
) {
283 unsigned int n
= storage
->n_read
++;
284 struct json
**jsonp
= !n
? &schema_json
: &txn_json
;
287 ovsdb_log_mark_base(storage
->log
);
290 /* Unbacked. Nothing to do. */
294 /* If we got this far then we must have at least a schema or a
 * transaction. */
296 ovs_assert(schema_json
|| txn_json
);
299 struct ovsdb_schema
*schema
;
300 struct ovsdb_error
*error
= ovsdb_schema_from_json(schema_json
,
307 const char *storage_name
= ovsdb_storage_get_name(storage
);
308 const char *schema_name
= schema
->name
;
309 if (storage_name
&& strcmp(storage_name
, schema_name
)) {
310 error
= ovsdb_error(NULL
, "name %s in header does not match "
312 storage_name
, schema_name
);
314 ovsdb_schema_destroy(schema
);
322 *txnp
= json_clone(txn_json
);
329 /* Reads and returns the schema from standalone storage 'storage'. Terminates
330 * with an error on failure. */
331 struct ovsdb_schema
*
332 ovsdb_storage_read_schema(struct ovsdb_storage
*storage
)
334 ovs_assert(storage
->log
);
336 struct json
*txn_json
;
337 struct ovsdb_schema
*schema
;
338 struct ovsdb_error
*error
= ovsdb_storage_read(storage
, &schema
,
341 ovs_fatal(0, "%s", ovsdb_error_to_string_free(error
));
343 if (!schema
&& !txn_json
) {
344 ovs_fatal(0, "unexpected end of file reading schema");
346 ovs_assert(schema
&& !txn_json
);
352 ovsdb_storage_read_wait(struct ovsdb_storage
*storage
)
354 return (storage
->raft
355 ? raft_has_next_entry(storage
->raft
)
360 ovsdb_storage_unread(struct ovsdb_storage
*storage
)
362 if (storage
->error
) {
367 if (!storage
->error
) {
368 storage
->error
= ovsdb_error(NULL
, "inconsistent data");
370 } else if (storage
->log
) {
371 ovsdb_log_unread(storage
->log
);
376 struct ovsdb_error
*error
;
377 struct raft_command
*command
;
380 /* Not suitable for writing transactions that change the schema. */
381 struct ovsdb_write
* OVS_WARN_UNUSED_RESULT
382 ovsdb_storage_write(struct ovsdb_storage
*storage
, const struct json
*data
,
383 const struct uuid
*prereq
, struct uuid
*resultp
,
386 struct ovsdb_write
*w
= xzalloc(sizeof *w
);
387 struct uuid result
= UUID_ZERO
;
388 if (storage
->error
) {
389 w
->error
= ovsdb_error_clone(storage
->error
);
390 } else if (storage
->raft
) {
391 struct json
*txn_json
= json_array_create_2(json_null_create(),
393 w
->command
= raft_command_execute(storage
->raft
, txn_json
,
395 json_destroy(txn_json
);
396 } else if (storage
->log
) {
397 w
->error
= ovsdb_log_write(storage
->log
, data
);
399 storage
->n_written
++;
401 w
->error
= ovsdb_log_commit_block(storage
->log
);
405 /* When 'error' and 'command' are both null, it indicates that the
406 * command is complete. This is fine since this unbacked storage drops
 * writes. */
415 /* Not suitable for writing transactions that change the schema. */
416 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
417 ovsdb_storage_write_block(struct ovsdb_storage
*storage
,
418 const struct json
*data
, const struct uuid
*prereq
,
419 struct uuid
*resultp
, bool durable
)
421 struct ovsdb_write
*w
= ovsdb_storage_write(storage
, data
,
422 prereq
, resultp
, durable
);
423 while (!ovsdb_write_is_complete(w
)) {
425 raft_run(storage
->raft
);
430 raft_wait(storage
->raft
);
435 struct ovsdb_error
*error
= ovsdb_error_clone(ovsdb_write_get_error(w
));
436 ovsdb_write_destroy(w
);
441 ovsdb_write_is_complete(const struct ovsdb_write
*w
)
445 || raft_command_get_status(w
->command
) != RAFT_CMD_INCOMPLETE
);
448 const struct ovsdb_error
*
449 ovsdb_write_get_error(const struct ovsdb_write
*w_
)
451 struct ovsdb_write
*w
= CONST_CAST(struct ovsdb_write
*, w_
);
452 ovs_assert(ovsdb_write_is_complete(w
));
454 if (w
->command
&& !w
->error
) {
455 enum raft_command_status status
= raft_command_get_status(w
->command
);
456 if (status
!= RAFT_CMD_SUCCESS
) {
457 w
->error
= ovsdb_error("cluster error", "%s",
458 raft_command_status_to_string(status
));
466 ovsdb_write_get_commit_index(const struct ovsdb_write
*w
)
468 ovs_assert(ovsdb_write_is_complete(w
));
469 return (w
->command
&& !w
->error
470 ? raft_command_get_commit_index(w
->command
)
475 ovsdb_write_wait(const struct ovsdb_write
*w
)
477 if (ovsdb_write_is_complete(w
)) {
478 poll_immediate_wake();
483 ovsdb_write_destroy(struct ovsdb_write
*w
)
486 raft_command_unref(w
->command
);
487 ovsdb_error_destroy(w
->error
);
493 schedule_next_snapshot(struct ovsdb_storage
*storage
, bool quick
)
495 if (storage
->log
|| storage
->raft
) {
496 unsigned int base
= 10 * 60 * 1000; /* 10 minutes */
497 unsigned int range
= 10 * 60 * 1000; /* 10 minutes */
503 long long int now
= time_msec();
504 storage
->next_snapshot_min
= now
+ base
+ random_range(range
);
505 storage
->next_snapshot_max
= now
+ 60LL * 60 * 24 * 1000; /* 1 day */
507 storage
->next_snapshot_min
= LLONG_MAX
;
508 storage
->next_snapshot_max
= LLONG_MAX
;
513 ovsdb_storage_should_snapshot(const struct ovsdb_storage
*storage
)
515 if (storage
->raft
|| storage
->log
) {
516 /* If we haven't reached the minimum snapshot time, don't snapshot. */
517 long long int now
= time_msec();
518 if (now
< storage
->next_snapshot_min
) {
522 /* If we can't snapshot right now, don't. */
523 if (storage
->raft
&& !raft_may_snapshot(storage
->raft
)) {
527 uint64_t log_len
= (storage
->raft
528 ? raft_get_log_length(storage
->raft
)
529 : storage
->n_read
+ storage
->n_written
);
530 if (now
< storage
->next_snapshot_max
) {
531 /* Maximum snapshot time not yet reached. Take a snapshot if there
532 * have been at least 100 log entries and the log file size has
 * grown a lot. */
534 bool grew_lots
= (storage
->raft
535 ? raft_grew_lots(storage
->raft
)
536 : ovsdb_log_grew_lots(storage
->log
));
537 return log_len
>= 100 && grew_lots
;
539 /* We have reached the maximum snapshot time. Take a snapshot if
540 * there have been any log entries at all. */
548 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
549 ovsdb_storage_store_snapshot__(struct ovsdb_storage
*storage
,
550 const struct json
*schema
,
551 const struct json
*data
)
554 struct json
*entries
= json_array_create_empty();
556 json_array_add(entries
, json_clone(schema
));
559 json_array_add(entries
, json_clone(data
));
561 struct ovsdb_error
*error
= raft_store_snapshot(storage
->raft
,
563 json_destroy(entries
);
565 } else if (storage
->log
) {
566 struct json
*entries
[2];
569 entries
[n
++] = CONST_CAST(struct json
*, schema
);
572 entries
[n
++] = CONST_CAST(struct json
*, data
);
574 return ovsdb_log_replace(storage
->log
, entries
, n
);
580 /* 'schema' and 'data' should faithfully represent the current schema and data,
581 * otherwise the two storage backing formats will yield divergent results. Use
582 * ovsdb_storage_write_schema_change() to change the schema. */
583 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
584 ovsdb_storage_store_snapshot(struct ovsdb_storage
*storage
,
585 const struct json
*schema
,
586 const struct json
*data
)
588 struct ovsdb_error
*error
= ovsdb_storage_store_snapshot__(storage
,
590 bool retry_quickly
= error
!= NULL
;
591 schedule_next_snapshot(storage
, retry_quickly
);
595 struct ovsdb_write
* OVS_WARN_UNUSED_RESULT
596 ovsdb_storage_write_schema_change(struct ovsdb_storage
*storage
,
597 const struct json
*schema
,
598 const struct json
*data
,
599 const struct uuid
*prereq
,
600 struct uuid
*resultp
)
602 struct ovsdb_write
*w
= xzalloc(sizeof *w
);
603 struct uuid result
= UUID_ZERO
;
604 if (storage
->error
) {
605 w
->error
= ovsdb_error_clone(storage
->error
);
606 } else if (storage
->raft
) {
607 struct json
*txn_json
= json_array_create_2(json_clone(schema
),
609 w
->command
= raft_command_execute(storage
->raft
, txn_json
,
611 json_destroy(txn_json
);
612 } else if (storage
->log
) {
613 w
->error
= ovsdb_storage_store_snapshot__(storage
, schema
, data
);
615 /* When 'error' and 'command' are both null, it indicates that the
616 * command is complete. This is fine since this unbacked storage drops
 * writes. */
626 ovsdb_storage_peek_last_eid(struct ovsdb_storage
*storage
)
628 if (!storage
->raft
) {
631 return raft_current_eid(storage
->raft
);