]> git.proxmox.com Git - mirror_ovs.git/blame - ovsdb/storage.c
stream: Allow timeout configuration for open_block.
[mirror_ovs.git] / ovsdb / storage.c
CommitLineData
1b1d2e6d
BP
1
2/* Copyright (c) 2009, 2010, 2011, 2016, 2017 Nicira, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this storage except in compliance with the License.
6 * You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <config.h>
18
19#include "storage.h"
20#include <string.h>
21#include "log.h"
22#include "ovsdb-error.h"
23#include "openvswitch/json.h"
24#include "openvswitch/poll-loop.h"
25#include "openvswitch/vlog.h"
26#include "ovsdb.h"
27#include "raft.h"
28#include "random.h"
29#include "timeval.h"
30#include "util.h"
31
32VLOG_DEFINE_THIS_MODULE(storage);
33
34struct ovsdb_storage {
35 /* There are three kinds of storage:
36 *
37 * - Standalone, backed by a disk file. 'log' is nonnull, 'raft' is
38 * null.
39 *
40 * - Clustered, backed by a Raft cluster. 'log' is null, 'raft' is
41 * nonnull.
42 *
43 * - Memory only, unbacked. 'log' and 'raft' are null. */
44 struct ovsdb_log *log;
45 struct raft *raft;
46
47 /* All kinds of storage. */
48 struct ovsdb_error *error; /* If nonnull, a permanent error. */
49 long long next_snapshot_min; /* Earliest time to take next snapshot. */
50 long long next_snapshot_max; /* Latest time to take next snapshot. */
51
52 /* Standalone only. */
53 unsigned int n_read;
54 unsigned int n_written;
55};
56
57static void schedule_next_snapshot(struct ovsdb_storage *, bool quick);
58
59static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
60ovsdb_storage_open__(const char *filename, bool rw, bool allow_clustered,
61 struct ovsdb_storage **storagep)
62{
63 *storagep = NULL;
64
65 struct ovsdb_log *log;
66 struct ovsdb_error *error;
67 error = ovsdb_log_open(filename, OVSDB_MAGIC"|"RAFT_MAGIC,
68 rw ? OVSDB_LOG_READ_WRITE : OVSDB_LOG_READ_ONLY,
69 -1, &log);
70 if (error) {
71 return error;
72 }
73
74 struct raft *raft = NULL;
75 if (!strcmp(ovsdb_log_get_magic(log), RAFT_MAGIC)) {
76 if (!allow_clustered) {
77 ovsdb_log_close(log);
78 return ovsdb_error(NULL, "%s: cannot apply this operation to "
79 "clustered database file", filename);
80 }
81 error = raft_open(log, &raft);
82 log = NULL;
83 if (error) {
84 return error;
85 }
86 }
87
88 struct ovsdb_storage *storage = xzalloc(sizeof *storage);
89 storage->log = log;
90 storage->raft = raft;
91 schedule_next_snapshot(storage, false);
92 *storagep = storage;
93 return NULL;
94}
95
96/* Opens 'filename' for use as storage. If 'rw', opens it for read/write
97 * access, otherwise read-only. If successful, stores the new storage in
98 * '*storagep' and returns NULL; on failure, stores NULL in '*storagep' and
99 * returns the error.
100 *
101 * The returned storage might be clustered or standalone, depending on what the
102 * disk file contains. */
103struct ovsdb_error * OVS_WARN_UNUSED_RESULT
104ovsdb_storage_open(const char *filename, bool rw,
105 struct ovsdb_storage **storagep)
106{
107 return ovsdb_storage_open__(filename, rw, true, storagep);
108}
109
110struct ovsdb_storage *
111ovsdb_storage_open_standalone(const char *filename, bool rw)
112{
113 struct ovsdb_storage *storage;
114 struct ovsdb_error *error = ovsdb_storage_open__(filename, rw, false,
115 &storage);
116 if (error) {
117 ovs_fatal(0, "%s", ovsdb_error_to_string_free(error));
118 }
119 return storage;
120}
121
122/* Creates and returns new storage without any backing. Nothing will be read
123 * from the storage, and writes are discarded. */
124struct ovsdb_storage *
125ovsdb_storage_create_unbacked(void)
126{
127 struct ovsdb_storage *storage = xzalloc(sizeof *storage);
128 schedule_next_snapshot(storage, false);
129 return storage;
130}
131
132void
133ovsdb_storage_close(struct ovsdb_storage *storage)
134{
135 if (storage) {
136 ovsdb_log_close(storage->log);
137 raft_close(storage->raft);
138 ovsdb_error_destroy(storage->error);
139 free(storage);
140 }
141}
142
143const char *
144ovsdb_storage_get_model(const struct ovsdb_storage *storage)
145{
146 return storage->raft ? "clustered" : "standalone";
147}
148
149bool
150ovsdb_storage_is_clustered(const struct ovsdb_storage *storage)
151{
152 return storage->raft != NULL;
153}
154
155bool
156ovsdb_storage_is_connected(const struct ovsdb_storage *storage)
157{
158 return !storage->raft || raft_is_connected(storage->raft);
159}
160
161bool
162ovsdb_storage_is_dead(const struct ovsdb_storage *storage)
163{
164 return storage->raft && raft_left(storage->raft);
165}
166
167bool
168ovsdb_storage_is_leader(const struct ovsdb_storage *storage)
169{
170 return !storage->raft || raft_is_leader(storage->raft);
171}
172
173const struct uuid *
174ovsdb_storage_get_cid(const struct ovsdb_storage *storage)
175{
176 return storage->raft ? raft_get_cid(storage->raft) : NULL;
177}
178
179const struct uuid *
180ovsdb_storage_get_sid(const struct ovsdb_storage *storage)
181{
182 return storage->raft ? raft_get_sid(storage->raft) : NULL;
183}
184
185uint64_t
186ovsdb_storage_get_applied_index(const struct ovsdb_storage *storage)
187{
188 return storage->raft ? raft_get_applied_index(storage->raft) : 0;
189}
190
191void
192ovsdb_storage_run(struct ovsdb_storage *storage)
193{
194 if (storage->raft) {
195 raft_run(storage->raft);
196 }
197}
198
199void
200ovsdb_storage_wait(struct ovsdb_storage *storage)
201{
202 if (storage->raft) {
203 raft_wait(storage->raft);
204 }
205}
206
207/* Returns 'storage''s embedded name, if it has one, otherwise null.
208 *
209 * Only clustered storage has a built-in name. */
210const char *
211ovsdb_storage_get_name(const struct ovsdb_storage *storage)
212{
213 return storage->raft ? raft_get_name(storage->raft) : NULL;
214}
215
216/* Attempts to read a log record from 'storage'.
217 *
218 * If successful, returns NULL and stores the transaction information in
219 * '*schemap', '*txnp', and '*txnid'. At least one of these will be nonnull.
220 * The caller owns the data and must eventually free it (with json_destroy()).
221 *
222 * If 'storage' is not clustered, 'txnid' may be null.
223 *
224 * If a read error occurs, returns the error and stores NULL in '*jsonp'.
225 *
226 * If the read reaches end of file, returns NULL and stores NULL in
227 * '*jsonp'. */
228struct ovsdb_error * OVS_WARN_UNUSED_RESULT
229ovsdb_storage_read(struct ovsdb_storage *storage,
230 struct ovsdb_schema **schemap,
231 struct json **txnp,
232 struct uuid *txnid)
233{
234 *schemap = NULL;
235 *txnp = NULL;
236 if (txnid) {
237 *txnid = UUID_ZERO;
238 }
239
240 struct json *json;
241 struct json *schema_json = NULL;
242 struct json *txn_json = NULL;
243 if (storage->raft) {
244 bool is_snapshot;
245 json = json_nullable_clone(
246 raft_next_entry(storage->raft, txnid, &is_snapshot));
247 if (!json) {
248 return NULL;
fa37affa 249 } else if (json->type != JSON_ARRAY || json->array.n != 2) {
1b1d2e6d
BP
250 json_destroy(json);
251 return ovsdb_error(NULL, "invalid commit format");
252 }
253
fa37affa 254 struct json **e = json->array.elems;
1b1d2e6d
BP
255 schema_json = e[0]->type != JSON_NULL ? e[0] : NULL;
256 txn_json = e[1]->type != JSON_NULL ? e[1] : NULL;
257 } else if (storage->log) {
258 struct ovsdb_error *error = ovsdb_log_read(storage->log, &json);
259 if (error || !json) {
260 return error;
261 }
262
263 unsigned int n = storage->n_read++;
264 struct json **jsonp = !n ? &schema_json : &txn_json;
265 *jsonp = json;
266 if (n == 1) {
267 ovsdb_log_mark_base(storage->log);
268 }
269 } else {
270 /* Unbacked. Nothing to do. */
271 return NULL;
272 }
273
274 /* If we got this far then we must have at least a schema or a
275 * transaction. */
276 ovs_assert(schema_json || txn_json);
277
278 if (schema_json) {
279 struct ovsdb_schema *schema;
280 struct ovsdb_error *error = ovsdb_schema_from_json(schema_json,
281 &schema);
282 if (error) {
283 json_destroy(json);
284 return error;
285 }
286
287 const char *storage_name = ovsdb_storage_get_name(storage);
288 const char *schema_name = schema->name;
289 if (storage_name && strcmp(storage_name, schema_name)) {
290 error = ovsdb_error(NULL, "name %s in header does not match "
291 "name %s in schema",
292 storage_name, schema_name);
293 json_destroy(json);
294 ovsdb_schema_destroy(schema);
295 return error;
296 }
297
298 *schemap = schema;
299 }
300
301 if (txn_json) {
302 *txnp = json_clone(txn_json);
303 }
304
305 json_destroy(json);
306 return NULL;
307}
308
309/* Reads and returns the schema from standalone storage 'storage'. Terminates
310 * with an error on failure. */
311struct ovsdb_schema *
312ovsdb_storage_read_schema(struct ovsdb_storage *storage)
313{
314 ovs_assert(storage->log);
315
316 struct json *txn_json;
317 struct ovsdb_schema *schema;
318 struct ovsdb_error *error = ovsdb_storage_read(storage, &schema,
319 &txn_json, NULL);
320 if (error) {
321 ovs_fatal(0, "%s", ovsdb_error_to_string_free(error));
322 }
323 if (!schema && !txn_json) {
324 ovs_fatal(0, "unexpected end of file reading schema");
325 }
326 ovs_assert(schema && !txn_json);
327
328 return schema;
329}
330
331bool
332ovsdb_storage_read_wait(struct ovsdb_storage *storage)
333{
334 return (storage->raft
335 ? raft_has_next_entry(storage->raft)
336 : false);
337}
338
339void
340ovsdb_storage_unread(struct ovsdb_storage *storage)
341{
342 if (storage->error) {
343 return;
344 }
345
346 if (storage->raft) {
347 if (!storage->error) {
348 storage->error = ovsdb_error(NULL, "inconsistent data");
349 }
350 } else if (storage->log) {
351 ovsdb_log_unread(storage->log);
352 }
353}
354
355struct ovsdb_write {
356 struct ovsdb_error *error;
357 struct raft_command *command;
358};
359
360/* Not suitable for writing transactions that change the schema. */
361struct ovsdb_write * OVS_WARN_UNUSED_RESULT
362ovsdb_storage_write(struct ovsdb_storage *storage, const struct json *data,
363 const struct uuid *prereq, struct uuid *resultp,
364 bool durable)
365{
366 struct ovsdb_write *w = xzalloc(sizeof *w);
367 struct uuid result = UUID_ZERO;
368 if (storage->error) {
369 w->error = ovsdb_error_clone(storage->error);
370 } else if (storage->raft) {
371 struct json *txn_json = json_array_create_2(json_null_create(),
372 json_clone(data));
373 w->command = raft_command_execute(storage->raft, txn_json,
374 prereq, &result);
375 json_destroy(txn_json);
376 } else if (storage->log) {
377 w->error = ovsdb_log_write(storage->log, data);
378 if (!w->error) {
379 storage->n_written++;
380 if (durable) {
381 w->error = ovsdb_log_commit_block(storage->log);
382 }
383 }
384 } else {
385 /* When 'error' and 'command' are both null, it indicates that the
386 * command is complete. This is fine since this unbacked storage drops
387 * writes. */
388 }
389 if (resultp) {
390 *resultp = result;
391 }
392 return w;
393}
394
395/* Not suitable for writing transactions that change the schema. */
396struct ovsdb_error * OVS_WARN_UNUSED_RESULT
397ovsdb_storage_write_block(struct ovsdb_storage *storage,
398 const struct json *data, const struct uuid *prereq,
399 struct uuid *resultp, bool durable)
400{
401 struct ovsdb_write *w = ovsdb_storage_write(storage, data,
402 prereq, resultp, durable);
403 while (!ovsdb_write_is_complete(w)) {
404 if (storage->raft) {
405 raft_run(storage->raft);
406 }
407
408 ovsdb_write_wait(w);
409 if (storage->raft) {
410 raft_wait(storage->raft);
411 }
412 poll_block();
413 }
414
415 struct ovsdb_error *error = ovsdb_error_clone(ovsdb_write_get_error(w));
416 ovsdb_write_destroy(w);
417 return error;
418}
419
420bool
421ovsdb_write_is_complete(const struct ovsdb_write *w)
422{
423 return (w->error
424 || !w->command
425 || raft_command_get_status(w->command) != RAFT_CMD_INCOMPLETE);
426}
427
428const struct ovsdb_error *
429ovsdb_write_get_error(const struct ovsdb_write *w_)
430{
431 struct ovsdb_write *w = CONST_CAST(struct ovsdb_write *, w_);
432 ovs_assert(ovsdb_write_is_complete(w));
433
434 if (w->command && !w->error) {
435 enum raft_command_status status = raft_command_get_status(w->command);
436 if (status != RAFT_CMD_SUCCESS) {
437 w->error = ovsdb_error("cluster error", "%s",
438 raft_command_status_to_string(status));
439 }
440 }
441
442 return w->error;
443}
444
445uint64_t
446ovsdb_write_get_commit_index(const struct ovsdb_write *w)
447{
448 ovs_assert(ovsdb_write_is_complete(w));
449 return (w->command && !w->error
450 ? raft_command_get_commit_index(w->command)
451 : 0);
452}
453
454void
455ovsdb_write_wait(const struct ovsdb_write *w)
456{
457 if (ovsdb_write_is_complete(w)) {
458 poll_immediate_wake();
459 }
460}
461
462void
463ovsdb_write_destroy(struct ovsdb_write *w)
464{
465 if (w) {
466 raft_command_unref(w->command);
467 ovsdb_error_destroy(w->error);
468 free(w);
469 }
470}
471
472static void
473schedule_next_snapshot(struct ovsdb_storage *storage, bool quick)
474{
475 if (storage->log || storage->raft) {
476 unsigned int base = 10 * 60 * 1000; /* 10 minutes */
477 unsigned int range = 10 * 60 * 1000; /* 10 minutes */
478 if (quick) {
479 base /= 10;
480 range /= 10;
481 }
482
483 long long int now = time_msec();
484 storage->next_snapshot_min = now + base + random_range(range);
485 storage->next_snapshot_max = now + 60LL * 60 * 24 * 1000; /* 1 day */
486 } else {
487 storage->next_snapshot_min = LLONG_MAX;
488 storage->next_snapshot_max = LLONG_MAX;
489 }
490}
491
492bool
493ovsdb_storage_should_snapshot(const struct ovsdb_storage *storage)
494{
495 if (storage->raft || storage->log) {
496 /* If we haven't reached the minimum snapshot time, don't snapshot. */
497 long long int now = time_msec();
498 if (now < storage->next_snapshot_min) {
499 return false;
500 }
501
502 /* If we can't snapshot right now, don't. */
503 if (storage->raft && !raft_may_snapshot(storage->raft)) {
504 return false;
505 }
506
507 uint64_t log_len = (storage->raft
508 ? raft_get_log_length(storage->raft)
509 : storage->n_read + storage->n_written);
510 if (now < storage->next_snapshot_max) {
511 /* Maximum snapshot time not yet reached. Take a snapshot if there
512 * have been at least 100 log entries and the log file size has
513 * grown a lot. */
514 bool grew_lots = (storage->raft
515 ? raft_grew_lots(storage->raft)
516 : ovsdb_log_grew_lots(storage->log));
517 return log_len >= 100 && grew_lots;
518 } else {
519 /* We have reached the maximum snapshot time. Take a snapshot if
520 * there have been any log entries at all. */
521 return log_len > 0;
522 }
523 }
524
525 return false;
526}
527
528static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
529ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage,
530 const struct json *schema,
531 const struct json *data)
532{
533 if (storage->raft) {
534 struct json *entries = json_array_create_empty();
535 if (schema) {
536 json_array_add(entries, json_clone(schema));
537 }
538 if (data) {
539 json_array_add(entries, json_clone(data));
540 }
541 struct ovsdb_error *error = raft_store_snapshot(storage->raft,
542 entries);
543 json_destroy(entries);
544 return error;
545 } else if (storage->log) {
546 struct json *entries[2];
547 size_t n = 0;
548 if (schema) {
549 entries[n++] = CONST_CAST(struct json *, schema);
550 }
551 if (data) {
552 entries[n++] = CONST_CAST(struct json *, data);
553 }
554 return ovsdb_log_replace(storage->log, entries, n);
555 } else {
556 return NULL;
557 }
558}
559
560/* 'schema' and 'data' should faithfully represent the current schema and data,
561 * otherwise the two storing backing formats will yield divergent results. Use
562 * ovsdb_storage_write_schema_change() to change the schema. */
563struct ovsdb_error * OVS_WARN_UNUSED_RESULT
564ovsdb_storage_store_snapshot(struct ovsdb_storage *storage,
565 const struct json *schema,
566 const struct json *data)
567{
568 struct ovsdb_error *error = ovsdb_storage_store_snapshot__(storage,
569 schema, data);
570 bool retry_quickly = error != NULL;
571 schedule_next_snapshot(storage, retry_quickly);
572 return error;
573}
574
575struct ovsdb_write * OVS_WARN_UNUSED_RESULT
576ovsdb_storage_write_schema_change(struct ovsdb_storage *storage,
577 const struct json *schema,
578 const struct json *data,
579 const struct uuid *prereq,
580 struct uuid *resultp)
581{
582 struct ovsdb_write *w = xzalloc(sizeof *w);
583 struct uuid result = UUID_ZERO;
584 if (storage->error) {
585 w->error = ovsdb_error_clone(storage->error);
586 } else if (storage->raft) {
587 struct json *txn_json = json_array_create_2(json_clone(schema),
588 json_clone(data));
589 w->command = raft_command_execute(storage->raft, txn_json,
590 prereq, &result);
591 json_destroy(txn_json);
592 } else if (storage->log) {
593 w->error = ovsdb_storage_store_snapshot__(storage, schema, data);
594 } else {
595 /* When 'error' and 'command' are both null, it indicates that the
596 * command is complete. This is fine since this unbacked storage drops
597 * writes. */
598 }
599 if (resultp) {
600 *resultp = result;
601 }
602 return w;
603}