]> git.proxmox.com Git - mirror_ovs.git/blob - ovsdb/storage.c
stream: Allow timeout configuration for open_block.
[mirror_ovs.git] / ovsdb / storage.c
1
2 /* Copyright (c) 2009, 2010, 2011, 2016, 2017 Nicira, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <config.h>
18
19 #include "storage.h"
20 #include <string.h>
21 #include "log.h"
22 #include "ovsdb-error.h"
23 #include "openvswitch/json.h"
24 #include "openvswitch/poll-loop.h"
25 #include "openvswitch/vlog.h"
26 #include "ovsdb.h"
27 #include "raft.h"
28 #include "random.h"
29 #include "timeval.h"
30 #include "util.h"
31
/* Declares this file's logging module, named "storage". */
VLOG_DEFINE_THIS_MODULE(storage);
33
/* Storage backing an OVSDB database: a standalone log file, a Raft cluster,
 * or nothing at all (memory only). */
struct ovsdb_storage {
    /* There are three kinds of storage:
     *
     *    - Standalone, backed by a disk file.  'log' is nonnull, 'raft' is
     *      null.
     *
     *    - Clustered, backed by a Raft cluster.  'log' is null, 'raft' is
     *      nonnull.
     *
     *    - Memory only, unbacked.  'log' and 'raft' are null. */
    struct ovsdb_log *log;
    struct raft *raft;

    /* All kinds of storage. */
    struct ovsdb_error *error;   /* If nonnull, a permanent error. */
    long long next_snapshot_min; /* Earliest time to take next snapshot, in
                                  * time_msec() units. */
    long long next_snapshot_max; /* Latest time to take next snapshot, in
                                  * time_msec() units. */

    /* Standalone only. */
    unsigned int n_read;         /* Log records read so far; the first one
                                  * read is treated as the schema. */
    unsigned int n_written;      /* Log records successfully written by
                                  * ovsdb_storage_write(). */
};
56
/* Sets the snapshot window of the given storage; defined later in this file.
 * 'quick' requests a shorter delay before the next snapshot. */
static void schedule_next_snapshot(struct ovsdb_storage *, bool quick);
58
59 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
60 ovsdb_storage_open__(const char *filename, bool rw, bool allow_clustered,
61 struct ovsdb_storage **storagep)
62 {
63 *storagep = NULL;
64
65 struct ovsdb_log *log;
66 struct ovsdb_error *error;
67 error = ovsdb_log_open(filename, OVSDB_MAGIC"|"RAFT_MAGIC,
68 rw ? OVSDB_LOG_READ_WRITE : OVSDB_LOG_READ_ONLY,
69 -1, &log);
70 if (error) {
71 return error;
72 }
73
74 struct raft *raft = NULL;
75 if (!strcmp(ovsdb_log_get_magic(log), RAFT_MAGIC)) {
76 if (!allow_clustered) {
77 ovsdb_log_close(log);
78 return ovsdb_error(NULL, "%s: cannot apply this operation to "
79 "clustered database file", filename);
80 }
81 error = raft_open(log, &raft);
82 log = NULL;
83 if (error) {
84 return error;
85 }
86 }
87
88 struct ovsdb_storage *storage = xzalloc(sizeof *storage);
89 storage->log = log;
90 storage->raft = raft;
91 schedule_next_snapshot(storage, false);
92 *storagep = storage;
93 return NULL;
94 }
95
96 /* Opens 'filename' for use as storage. If 'rw', opens it for read/write
97 * access, otherwise read-only. If successful, stores the new storage in
98 * '*storagep' and returns NULL; on failure, stores NULL in '*storagep' and
99 * returns the error.
100 *
101 * The returned storage might be clustered or standalone, depending on what the
102 * disk file contains. */
103 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
104 ovsdb_storage_open(const char *filename, bool rw,
105 struct ovsdb_storage **storagep)
106 {
107 return ovsdb_storage_open__(filename, rw, true, storagep);
108 }
109
/* Opens 'filename' as standalone storage, read/write if 'rw' and read-only
 * otherwise, and returns the new storage.
 *
 * A clustered database file is rejected.  This function does not report
 * errors to its caller: any failure terminates the program via ovs_fatal(). */
struct ovsdb_storage *
ovsdb_storage_open_standalone(const char *filename, bool rw)
{
    struct ovsdb_storage *standalone;
    struct ovsdb_error *error;

    error = ovsdb_storage_open__(filename, rw, false, &standalone);
    if (error) {
        ovs_fatal(0, "%s", ovsdb_error_to_string_free(error));
    }
    return standalone;
}
121
122 /* Creates and returns new storage without any backing. Nothing will be read
123 * from the storage, and writes are discarded. */
124 struct ovsdb_storage *
125 ovsdb_storage_create_unbacked(void)
126 {
127 struct ovsdb_storage *storage = xzalloc(sizeof *storage);
128 schedule_next_snapshot(storage, false);
129 return storage;
130 }
131
132 void
133 ovsdb_storage_close(struct ovsdb_storage *storage)
134 {
135 if (storage) {
136 ovsdb_log_close(storage->log);
137 raft_close(storage->raft);
138 ovsdb_error_destroy(storage->error);
139 free(storage);
140 }
141 }
142
143 const char *
144 ovsdb_storage_get_model(const struct ovsdb_storage *storage)
145 {
146 return storage->raft ? "clustered" : "standalone";
147 }
148
149 bool
150 ovsdb_storage_is_clustered(const struct ovsdb_storage *storage)
151 {
152 return storage->raft != NULL;
153 }
154
155 bool
156 ovsdb_storage_is_connected(const struct ovsdb_storage *storage)
157 {
158 return !storage->raft || raft_is_connected(storage->raft);
159 }
160
161 bool
162 ovsdb_storage_is_dead(const struct ovsdb_storage *storage)
163 {
164 return storage->raft && raft_left(storage->raft);
165 }
166
167 bool
168 ovsdb_storage_is_leader(const struct ovsdb_storage *storage)
169 {
170 return !storage->raft || raft_is_leader(storage->raft);
171 }
172
173 const struct uuid *
174 ovsdb_storage_get_cid(const struct ovsdb_storage *storage)
175 {
176 return storage->raft ? raft_get_cid(storage->raft) : NULL;
177 }
178
179 const struct uuid *
180 ovsdb_storage_get_sid(const struct ovsdb_storage *storage)
181 {
182 return storage->raft ? raft_get_sid(storage->raft) : NULL;
183 }
184
185 uint64_t
186 ovsdb_storage_get_applied_index(const struct ovsdb_storage *storage)
187 {
188 return storage->raft ? raft_get_applied_index(storage->raft) : 0;
189 }
190
191 void
192 ovsdb_storage_run(struct ovsdb_storage *storage)
193 {
194 if (storage->raft) {
195 raft_run(storage->raft);
196 }
197 }
198
199 void
200 ovsdb_storage_wait(struct ovsdb_storage *storage)
201 {
202 if (storage->raft) {
203 raft_wait(storage->raft);
204 }
205 }
206
207 /* Returns 'storage''s embedded name, if it has one, otherwise null.
208 *
209 * Only clustered storage has a built-in name. */
210 const char *
211 ovsdb_storage_get_name(const struct ovsdb_storage *storage)
212 {
213 return storage->raft ? raft_get_name(storage->raft) : NULL;
214 }
215
216 /* Attempts to read a log record from 'storage'.
217 *
218 * If successful, returns NULL and stores the transaction information in
219 * '*schemap', '*txnp', and '*txnid'. At least one of these will be nonnull.
220 * The caller owns the data and must eventually free it (with json_destroy()).
221 *
222 * If 'storage' is not clustered, 'txnid' may be null.
223 *
224 * If a read error occurs, returns the error and stores NULL in '*jsonp'.
225 *
226 * If the read reaches end of file, returns NULL and stores NULL in
227 * '*jsonp'. */
228 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
229 ovsdb_storage_read(struct ovsdb_storage *storage,
230 struct ovsdb_schema **schemap,
231 struct json **txnp,
232 struct uuid *txnid)
233 {
234 *schemap = NULL;
235 *txnp = NULL;
236 if (txnid) {
237 *txnid = UUID_ZERO;
238 }
239
240 struct json *json;
241 struct json *schema_json = NULL;
242 struct json *txn_json = NULL;
243 if (storage->raft) {
244 bool is_snapshot;
245 json = json_nullable_clone(
246 raft_next_entry(storage->raft, txnid, &is_snapshot));
247 if (!json) {
248 return NULL;
249 } else if (json->type != JSON_ARRAY || json->array.n != 2) {
250 json_destroy(json);
251 return ovsdb_error(NULL, "invalid commit format");
252 }
253
254 struct json **e = json->array.elems;
255 schema_json = e[0]->type != JSON_NULL ? e[0] : NULL;
256 txn_json = e[1]->type != JSON_NULL ? e[1] : NULL;
257 } else if (storage->log) {
258 struct ovsdb_error *error = ovsdb_log_read(storage->log, &json);
259 if (error || !json) {
260 return error;
261 }
262
263 unsigned int n = storage->n_read++;
264 struct json **jsonp = !n ? &schema_json : &txn_json;
265 *jsonp = json;
266 if (n == 1) {
267 ovsdb_log_mark_base(storage->log);
268 }
269 } else {
270 /* Unbacked. Nothing to do. */
271 return NULL;
272 }
273
274 /* If we got this far then we must have at least a schema or a
275 * transaction. */
276 ovs_assert(schema_json || txn_json);
277
278 if (schema_json) {
279 struct ovsdb_schema *schema;
280 struct ovsdb_error *error = ovsdb_schema_from_json(schema_json,
281 &schema);
282 if (error) {
283 json_destroy(json);
284 return error;
285 }
286
287 const char *storage_name = ovsdb_storage_get_name(storage);
288 const char *schema_name = schema->name;
289 if (storage_name && strcmp(storage_name, schema_name)) {
290 error = ovsdb_error(NULL, "name %s in header does not match "
291 "name %s in schema",
292 storage_name, schema_name);
293 json_destroy(json);
294 ovsdb_schema_destroy(schema);
295 return error;
296 }
297
298 *schemap = schema;
299 }
300
301 if (txn_json) {
302 *txnp = json_clone(txn_json);
303 }
304
305 json_destroy(json);
306 return NULL;
307 }
308
309 /* Reads and returns the schema from standalone storage 'storage'. Terminates
310 * with an error on failure. */
311 struct ovsdb_schema *
312 ovsdb_storage_read_schema(struct ovsdb_storage *storage)
313 {
314 ovs_assert(storage->log);
315
316 struct json *txn_json;
317 struct ovsdb_schema *schema;
318 struct ovsdb_error *error = ovsdb_storage_read(storage, &schema,
319 &txn_json, NULL);
320 if (error) {
321 ovs_fatal(0, "%s", ovsdb_error_to_string_free(error));
322 }
323 if (!schema && !txn_json) {
324 ovs_fatal(0, "unexpected end of file reading schema");
325 }
326 ovs_assert(schema && !txn_json);
327
328 return schema;
329 }
330
331 bool
332 ovsdb_storage_read_wait(struct ovsdb_storage *storage)
333 {
334 return (storage->raft
335 ? raft_has_next_entry(storage->raft)
336 : false);
337 }
338
339 void
340 ovsdb_storage_unread(struct ovsdb_storage *storage)
341 {
342 if (storage->error) {
343 return;
344 }
345
346 if (storage->raft) {
347 if (!storage->error) {
348 storage->error = ovsdb_error(NULL, "inconsistent data");
349 }
350 } else if (storage->log) {
351 ovsdb_log_unread(storage->log);
352 }
353 }
354
/* A write to storage, as returned by ovsdb_storage_write() and
 * ovsdb_storage_write_schema_change().  It counts as complete once 'error' is
 * nonnull, 'command' is null, or the Raft command's status is no longer
 * RAFT_CMD_INCOMPLETE. */
struct ovsdb_write {
    struct ovsdb_error *error;     /* If nonnull, the write failed. */
    struct raft_command *command;  /* Clustered only: the proposed Raft
                                    * command; null for other storage. */
};
359
360 /* Not suitable for writing transactions that change the schema. */
361 struct ovsdb_write * OVS_WARN_UNUSED_RESULT
362 ovsdb_storage_write(struct ovsdb_storage *storage, const struct json *data,
363 const struct uuid *prereq, struct uuid *resultp,
364 bool durable)
365 {
366 struct ovsdb_write *w = xzalloc(sizeof *w);
367 struct uuid result = UUID_ZERO;
368 if (storage->error) {
369 w->error = ovsdb_error_clone(storage->error);
370 } else if (storage->raft) {
371 struct json *txn_json = json_array_create_2(json_null_create(),
372 json_clone(data));
373 w->command = raft_command_execute(storage->raft, txn_json,
374 prereq, &result);
375 json_destroy(txn_json);
376 } else if (storage->log) {
377 w->error = ovsdb_log_write(storage->log, data);
378 if (!w->error) {
379 storage->n_written++;
380 if (durable) {
381 w->error = ovsdb_log_commit_block(storage->log);
382 }
383 }
384 } else {
385 /* When 'error' and 'command' are both null, it indicates that the
386 * command is complete. This is fine since this unbacked storage drops
387 * writes. */
388 }
389 if (resultp) {
390 *resultp = result;
391 }
392 return w;
393 }
394
395 /* Not suitable for writing transactions that change the schema. */
396 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
397 ovsdb_storage_write_block(struct ovsdb_storage *storage,
398 const struct json *data, const struct uuid *prereq,
399 struct uuid *resultp, bool durable)
400 {
401 struct ovsdb_write *w = ovsdb_storage_write(storage, data,
402 prereq, resultp, durable);
403 while (!ovsdb_write_is_complete(w)) {
404 if (storage->raft) {
405 raft_run(storage->raft);
406 }
407
408 ovsdb_write_wait(w);
409 if (storage->raft) {
410 raft_wait(storage->raft);
411 }
412 poll_block();
413 }
414
415 struct ovsdb_error *error = ovsdb_error_clone(ovsdb_write_get_error(w));
416 ovsdb_write_destroy(w);
417 return error;
418 }
419
420 bool
421 ovsdb_write_is_complete(const struct ovsdb_write *w)
422 {
423 return (w->error
424 || !w->command
425 || raft_command_get_status(w->command) != RAFT_CMD_INCOMPLETE);
426 }
427
428 const struct ovsdb_error *
429 ovsdb_write_get_error(const struct ovsdb_write *w_)
430 {
431 struct ovsdb_write *w = CONST_CAST(struct ovsdb_write *, w_);
432 ovs_assert(ovsdb_write_is_complete(w));
433
434 if (w->command && !w->error) {
435 enum raft_command_status status = raft_command_get_status(w->command);
436 if (status != RAFT_CMD_SUCCESS) {
437 w->error = ovsdb_error("cluster error", "%s",
438 raft_command_status_to_string(status));
439 }
440 }
441
442 return w->error;
443 }
444
445 uint64_t
446 ovsdb_write_get_commit_index(const struct ovsdb_write *w)
447 {
448 ovs_assert(ovsdb_write_is_complete(w));
449 return (w->command && !w->error
450 ? raft_command_get_commit_index(w->command)
451 : 0);
452 }
453
/* Wakes the poll loop immediately if 'w' is already complete.  For an
 * incomplete write, nothing is registered here; ovsdb_storage_write_block()
 * pairs this call with raft_wait() for clustered storage. */
void
ovsdb_write_wait(const struct ovsdb_write *w)
{
    if (!ovsdb_write_is_complete(w)) {
        return;
    }
    poll_immediate_wake();
}
461
462 void
463 ovsdb_write_destroy(struct ovsdb_write *w)
464 {
465 if (w) {
466 raft_command_unref(w->command);
467 ovsdb_error_destroy(w->error);
468 free(w);
469 }
470 }
471
472 static void
473 schedule_next_snapshot(struct ovsdb_storage *storage, bool quick)
474 {
475 if (storage->log || storage->raft) {
476 unsigned int base = 10 * 60 * 1000; /* 10 minutes */
477 unsigned int range = 10 * 60 * 1000; /* 10 minutes */
478 if (quick) {
479 base /= 10;
480 range /= 10;
481 }
482
483 long long int now = time_msec();
484 storage->next_snapshot_min = now + base + random_range(range);
485 storage->next_snapshot_max = now + 60LL * 60 * 24 * 1000; /* 1 day */
486 } else {
487 storage->next_snapshot_min = LLONG_MAX;
488 storage->next_snapshot_max = LLONG_MAX;
489 }
490 }
491
492 bool
493 ovsdb_storage_should_snapshot(const struct ovsdb_storage *storage)
494 {
495 if (storage->raft || storage->log) {
496 /* If we haven't reached the minimum snapshot time, don't snapshot. */
497 long long int now = time_msec();
498 if (now < storage->next_snapshot_min) {
499 return false;
500 }
501
502 /* If we can't snapshot right now, don't. */
503 if (storage->raft && !raft_may_snapshot(storage->raft)) {
504 return false;
505 }
506
507 uint64_t log_len = (storage->raft
508 ? raft_get_log_length(storage->raft)
509 : storage->n_read + storage->n_written);
510 if (now < storage->next_snapshot_max) {
511 /* Maximum snapshot time not yet reached. Take a snapshot if there
512 * have been at least 100 log entries and the log file size has
513 * grown a lot. */
514 bool grew_lots = (storage->raft
515 ? raft_grew_lots(storage->raft)
516 : ovsdb_log_grew_lots(storage->log));
517 return log_len >= 100 && grew_lots;
518 } else {
519 /* We have reached the maximum snapshot time. Take a snapshot if
520 * there have been any log entries at all. */
521 return log_len > 0;
522 }
523 }
524
525 return false;
526 }
527
528 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
529 ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage,
530 const struct json *schema,
531 const struct json *data)
532 {
533 if (storage->raft) {
534 struct json *entries = json_array_create_empty();
535 if (schema) {
536 json_array_add(entries, json_clone(schema));
537 }
538 if (data) {
539 json_array_add(entries, json_clone(data));
540 }
541 struct ovsdb_error *error = raft_store_snapshot(storage->raft,
542 entries);
543 json_destroy(entries);
544 return error;
545 } else if (storage->log) {
546 struct json *entries[2];
547 size_t n = 0;
548 if (schema) {
549 entries[n++] = CONST_CAST(struct json *, schema);
550 }
551 if (data) {
552 entries[n++] = CONST_CAST(struct json *, data);
553 }
554 return ovsdb_log_replace(storage->log, entries, n);
555 } else {
556 return NULL;
557 }
558 }
559
560 /* 'schema' and 'data' should faithfully represent the current schema and data,
561 * otherwise the two storing backing formats will yield divergent results. Use
562 * ovsdb_storage_write_schema_change() to change the schema. */
563 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
564 ovsdb_storage_store_snapshot(struct ovsdb_storage *storage,
565 const struct json *schema,
566 const struct json *data)
567 {
568 struct ovsdb_error *error = ovsdb_storage_store_snapshot__(storage,
569 schema, data);
570 bool retry_quickly = error != NULL;
571 schedule_next_snapshot(storage, retry_quickly);
572 return error;
573 }
574
575 struct ovsdb_write * OVS_WARN_UNUSED_RESULT
576 ovsdb_storage_write_schema_change(struct ovsdb_storage *storage,
577 const struct json *schema,
578 const struct json *data,
579 const struct uuid *prereq,
580 struct uuid *resultp)
581 {
582 struct ovsdb_write *w = xzalloc(sizeof *w);
583 struct uuid result = UUID_ZERO;
584 if (storage->error) {
585 w->error = ovsdb_error_clone(storage->error);
586 } else if (storage->raft) {
587 struct json *txn_json = json_array_create_2(json_clone(schema),
588 json_clone(data));
589 w->command = raft_command_execute(storage->raft, txn_json,
590 prereq, &result);
591 json_destroy(txn_json);
592 } else if (storage->log) {
593 w->error = ovsdb_storage_store_snapshot__(storage, schema, data);
594 } else {
595 /* When 'error' and 'command' are both null, it indicates that the
596 * command is complete. This is fine since this unbacked storage drops
597 * writes. */
598 }
599 if (resultp) {
600 *resultp = result;
601 }
602 return w;
603 }