]> git.proxmox.com Git - ovs.git/blob - ovsdb/storage.c
bump version to 2.15.0+ds1-2+deb11u3.1
[ovs.git] / ovsdb / storage.c
1
2 /* Copyright (c) 2009, 2010, 2011, 2016, 2017 Nicira, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <config.h>
18
19 #include "storage.h"
20 #include <string.h>
21 #include "log.h"
22 #include "ovsdb-error.h"
23 #include "openvswitch/json.h"
24 #include "openvswitch/poll-loop.h"
25 #include "openvswitch/vlog.h"
26 #include "ovsdb.h"
27 #include "raft.h"
28 #include "random.h"
29 #include "simap.h"
30 #include "timeval.h"
31 #include "util.h"
32
33 VLOG_DEFINE_THIS_MODULE(storage);
34
/* State for one database storage instance. */
struct ovsdb_storage {
    /* Backend selection.  Three combinations are used:
     *
     *    - 'log' nonnull, 'raft' null: standalone storage backed by a disk
     *      file.
     *
     *    - 'log' null, 'raft' nonnull: clustered storage backed by a Raft
     *      cluster.
     *
     *    - both null: memory-only storage without any backing. */
    struct ovsdb_log *log;
    struct raft *raft;

    /* Used by every kind of storage. */
    struct ovsdb_error *error;   /* Permanent error, or null if none. */
    long long next_snapshot_min; /* Earliest time to take next snapshot. */
    long long next_snapshot_max; /* Latest time to take next snapshot. */

    /* Used by standalone storage only. */
    unsigned int n_read;         /* Log records read successfully. */
    unsigned int n_written;      /* Log records written successfully. */
};
57
58 static void schedule_next_snapshot(struct ovsdb_storage *, bool quick);
59
60 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
61 ovsdb_storage_open__(const char *filename, bool rw, bool allow_clustered,
62 struct ovsdb_storage **storagep)
63 {
64 *storagep = NULL;
65
66 struct ovsdb_log *log;
67 struct ovsdb_error *error;
68 error = ovsdb_log_open(filename, OVSDB_MAGIC"|"RAFT_MAGIC,
69 rw ? OVSDB_LOG_READ_WRITE : OVSDB_LOG_READ_ONLY,
70 -1, &log);
71 if (error) {
72 return error;
73 }
74
75 struct raft *raft = NULL;
76 if (!strcmp(ovsdb_log_get_magic(log), RAFT_MAGIC)) {
77 if (!allow_clustered) {
78 ovsdb_log_close(log);
79 return ovsdb_error(NULL, "%s: cannot apply this operation to "
80 "clustered database file", filename);
81 }
82 error = raft_open(log, &raft);
83 log = NULL;
84 if (error) {
85 return error;
86 }
87 }
88
89 struct ovsdb_storage *storage = xzalloc(sizeof *storage);
90 storage->log = log;
91 storage->raft = raft;
92 schedule_next_snapshot(storage, false);
93 *storagep = storage;
94 return NULL;
95 }
96
97 /* Opens 'filename' for use as storage. If 'rw', opens it for read/write
98 * access, otherwise read-only. If successful, stores the new storage in
99 * '*storagep' and returns NULL; on failure, stores NULL in '*storagep' and
100 * returns the error.
101 *
102 * The returned storage might be clustered or standalone, depending on what the
103 * disk file contains. */
104 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
105 ovsdb_storage_open(const char *filename, bool rw,
106 struct ovsdb_storage **storagep)
107 {
108 return ovsdb_storage_open__(filename, rw, true, storagep);
109 }
110
/* Opens 'filename' as standalone storage, for read/write access if 'rw' is
 * true and read-only access otherwise, and returns the new storage.
 *
 * Clustered database files are rejected.  Any failure is reported through
 * ovs_fatal() rather than returned to the caller. */
struct ovsdb_storage *
ovsdb_storage_open_standalone(const char *filename, bool rw)
{
    struct ovsdb_storage *opened;
    struct ovsdb_error *err;

    err = ovsdb_storage_open__(filename, rw, false, &opened);
    if (err) {
        ovs_fatal(0, "%s", ovsdb_error_to_string_free(err));
    }

    return opened;
}
122
123 /* Creates and returns new storage without any backing. Nothing will be read
124 * from the storage, and writes are discarded. */
125 struct ovsdb_storage *
126 ovsdb_storage_create_unbacked(void)
127 {
128 struct ovsdb_storage *storage = xzalloc(sizeof *storage);
129 schedule_next_snapshot(storage, false);
130 return storage;
131 }
132
133 void
134 ovsdb_storage_close(struct ovsdb_storage *storage)
135 {
136 if (storage) {
137 ovsdb_log_close(storage->log);
138 raft_close(storage->raft);
139 ovsdb_error_destroy(storage->error);
140 free(storage);
141 }
142 }
143
144 const char *
145 ovsdb_storage_get_model(const struct ovsdb_storage *storage)
146 {
147 return storage->raft ? "clustered" : "standalone";
148 }
149
150 bool
151 ovsdb_storage_is_clustered(const struct ovsdb_storage *storage)
152 {
153 return storage->raft != NULL;
154 }
155
156 bool
157 ovsdb_storage_is_connected(const struct ovsdb_storage *storage)
158 {
159 return !storage->raft || raft_is_connected(storage->raft);
160 }
161
162 bool
163 ovsdb_storage_is_dead(const struct ovsdb_storage *storage)
164 {
165 return storage->raft && raft_left(storage->raft);
166 }
167
168 bool
169 ovsdb_storage_is_leader(const struct ovsdb_storage *storage)
170 {
171 return !storage->raft || raft_is_leader(storage->raft);
172 }
173
174 const struct uuid *
175 ovsdb_storage_get_cid(const struct ovsdb_storage *storage)
176 {
177 return storage->raft ? raft_get_cid(storage->raft) : NULL;
178 }
179
180 const struct uuid *
181 ovsdb_storage_get_sid(const struct ovsdb_storage *storage)
182 {
183 return storage->raft ? raft_get_sid(storage->raft) : NULL;
184 }
185
186 uint64_t
187 ovsdb_storage_get_applied_index(const struct ovsdb_storage *storage)
188 {
189 return storage->raft ? raft_get_applied_index(storage->raft) : 0;
190 }
191
192 void
193 ovsdb_storage_get_memory_usage(const struct ovsdb_storage *storage,
194 struct simap *usage)
195 {
196 if (storage->raft) {
197 raft_get_memory_usage(storage->raft, usage);
198 }
199 }
200
201 char *
202 ovsdb_storage_get_error(const struct ovsdb_storage *storage)
203 {
204 if (storage->error) {
205 return ovsdb_error_to_string(storage->error);
206 }
207
208 return NULL;
209 }
210
211 void
212 ovsdb_storage_run(struct ovsdb_storage *storage)
213 {
214 if (storage->raft) {
215 raft_run(storage->raft);
216 }
217 }
218
219 void
220 ovsdb_storage_wait(struct ovsdb_storage *storage)
221 {
222 if (storage->raft) {
223 raft_wait(storage->raft);
224 }
225 }
226
227 /* Returns 'storage''s embedded name, if it has one, otherwise null.
228 *
229 * Only clustered storage has a built-in name. */
230 const char *
231 ovsdb_storage_get_name(const struct ovsdb_storage *storage)
232 {
233 return storage->raft ? raft_get_name(storage->raft) : NULL;
234 }
235
236 /* Attempts to read a log record from 'storage'.
237 *
238 * If successful, returns NULL and stores the transaction information in
239 * '*schemap', '*txnp', and '*txnid'. At least one of these will be nonnull.
240 * The caller owns the data and must eventually free it (with json_destroy()).
241 *
242 * If 'storage' is not clustered, 'txnid' may be null.
243 *
244 * If a read error occurs, returns the error and stores NULL in '*jsonp'.
245 *
246 * If the read reaches end of file, returns NULL and stores NULL in
247 * '*jsonp'. */
248 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
249 ovsdb_storage_read(struct ovsdb_storage *storage,
250 struct ovsdb_schema **schemap,
251 struct json **txnp,
252 struct uuid *txnid)
253 {
254 *schemap = NULL;
255 *txnp = NULL;
256 if (txnid) {
257 *txnid = UUID_ZERO;
258 }
259
260 struct json *json;
261 struct json *schema_json = NULL;
262 struct json *txn_json = NULL;
263 if (storage->raft) {
264 bool is_snapshot;
265 json = json_nullable_clone(
266 raft_next_entry(storage->raft, txnid, &is_snapshot));
267 if (!json) {
268 return NULL;
269 } else if (json->type != JSON_ARRAY || json->array.n != 2) {
270 json_destroy(json);
271 return ovsdb_error(NULL, "invalid commit format");
272 }
273
274 struct json **e = json->array.elems;
275 schema_json = e[0]->type != JSON_NULL ? e[0] : NULL;
276 txn_json = e[1]->type != JSON_NULL ? e[1] : NULL;
277 } else if (storage->log) {
278 struct ovsdb_error *error = ovsdb_log_read(storage->log, &json);
279 if (error || !json) {
280 return error;
281 }
282
283 unsigned int n = storage->n_read++;
284 struct json **jsonp = !n ? &schema_json : &txn_json;
285 *jsonp = json;
286 if (n == 1) {
287 ovsdb_log_mark_base(storage->log);
288 }
289 } else {
290 /* Unbacked. Nothing to do. */
291 return NULL;
292 }
293
294 /* If we got this far then we must have at least a schema or a
295 * transaction. */
296 ovs_assert(schema_json || txn_json);
297
298 if (schema_json) {
299 struct ovsdb_schema *schema;
300 struct ovsdb_error *error = ovsdb_schema_from_json(schema_json,
301 &schema);
302 if (error) {
303 json_destroy(json);
304 return error;
305 }
306
307 const char *storage_name = ovsdb_storage_get_name(storage);
308 const char *schema_name = schema->name;
309 if (storage_name && strcmp(storage_name, schema_name)) {
310 error = ovsdb_error(NULL, "name %s in header does not match "
311 "name %s in schema",
312 storage_name, schema_name);
313 json_destroy(json);
314 ovsdb_schema_destroy(schema);
315 return error;
316 }
317
318 *schemap = schema;
319 }
320
321 if (txn_json) {
322 *txnp = json_clone(txn_json);
323 }
324
325 json_destroy(json);
326 return NULL;
327 }
328
329 /* Reads and returns the schema from standalone storage 'storage'. Terminates
330 * with an error on failure. */
331 struct ovsdb_schema *
332 ovsdb_storage_read_schema(struct ovsdb_storage *storage)
333 {
334 ovs_assert(storage->log);
335
336 struct json *txn_json;
337 struct ovsdb_schema *schema;
338 struct ovsdb_error *error = ovsdb_storage_read(storage, &schema,
339 &txn_json, NULL);
340 if (error) {
341 ovs_fatal(0, "%s", ovsdb_error_to_string_free(error));
342 }
343 if (!schema && !txn_json) {
344 ovs_fatal(0, "unexpected end of file reading schema");
345 }
346 ovs_assert(schema && !txn_json);
347
348 return schema;
349 }
350
351 bool
352 ovsdb_storage_read_wait(struct ovsdb_storage *storage)
353 {
354 return (storage->raft
355 ? raft_has_next_entry(storage->raft)
356 : false);
357 }
358
359 void
360 ovsdb_storage_unread(struct ovsdb_storage *storage)
361 {
362 if (storage->error) {
363 return;
364 }
365
366 if (storage->raft) {
367 if (!storage->error) {
368 storage->error = ovsdb_error(NULL, "inconsistent data");
369 }
370 } else if (storage->log) {
371 ovsdb_log_unread(storage->log);
372 }
373 }
374
/* Tracks one write submitted to storage.
 *
 * The write is complete when 'error' is nonnull, when 'command' is null, or
 * when 'command' no longer has status RAFT_CMD_INCOMPLETE. */
struct ovsdb_write {
    struct ovsdb_error *error;      /* Failure, or null if none so far. */
    struct raft_command *command;   /* Raft command, clustered writes only. */
};
379
380 /* Not suitable for writing transactions that change the schema. */
381 struct ovsdb_write * OVS_WARN_UNUSED_RESULT
382 ovsdb_storage_write(struct ovsdb_storage *storage, const struct json *data,
383 const struct uuid *prereq, struct uuid *resultp,
384 bool durable)
385 {
386 struct ovsdb_write *w = xzalloc(sizeof *w);
387 struct uuid result = UUID_ZERO;
388 if (storage->error) {
389 w->error = ovsdb_error_clone(storage->error);
390 } else if (storage->raft) {
391 struct json *txn_json = json_array_create_2(json_null_create(),
392 json_clone(data));
393 w->command = raft_command_execute(storage->raft, txn_json,
394 prereq, &result);
395 json_destroy(txn_json);
396 } else if (storage->log) {
397 w->error = ovsdb_log_write(storage->log, data);
398 if (!w->error) {
399 storage->n_written++;
400 if (durable) {
401 w->error = ovsdb_log_commit_block(storage->log);
402 }
403 }
404 } else {
405 /* When 'error' and 'command' are both null, it indicates that the
406 * command is complete. This is fine since this unbacked storage drops
407 * writes. */
408 }
409 if (resultp) {
410 *resultp = result;
411 }
412 return w;
413 }
414
415 /* Not suitable for writing transactions that change the schema. */
416 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
417 ovsdb_storage_write_block(struct ovsdb_storage *storage,
418 const struct json *data, const struct uuid *prereq,
419 struct uuid *resultp, bool durable)
420 {
421 struct ovsdb_write *w = ovsdb_storage_write(storage, data,
422 prereq, resultp, durable);
423 while (!ovsdb_write_is_complete(w)) {
424 if (storage->raft) {
425 raft_run(storage->raft);
426 }
427
428 ovsdb_write_wait(w);
429 if (storage->raft) {
430 raft_wait(storage->raft);
431 }
432 poll_block();
433 }
434
435 struct ovsdb_error *error = ovsdb_error_clone(ovsdb_write_get_error(w));
436 ovsdb_write_destroy(w);
437 return error;
438 }
439
440 bool
441 ovsdb_write_is_complete(const struct ovsdb_write *w)
442 {
443 return (w->error
444 || !w->command
445 || raft_command_get_status(w->command) != RAFT_CMD_INCOMPLETE);
446 }
447
448 const struct ovsdb_error *
449 ovsdb_write_get_error(const struct ovsdb_write *w_)
450 {
451 struct ovsdb_write *w = CONST_CAST(struct ovsdb_write *, w_);
452 ovs_assert(ovsdb_write_is_complete(w));
453
454 if (w->command && !w->error) {
455 enum raft_command_status status = raft_command_get_status(w->command);
456 if (status != RAFT_CMD_SUCCESS) {
457 w->error = ovsdb_error("cluster error", "%s",
458 raft_command_status_to_string(status));
459 }
460 }
461
462 return w->error;
463 }
464
465 uint64_t
466 ovsdb_write_get_commit_index(const struct ovsdb_write *w)
467 {
468 ovs_assert(ovsdb_write_is_complete(w));
469 return (w->command && !w->error
470 ? raft_command_get_commit_index(w->command)
471 : 0);
472 }
473
/* If write 'w' is already complete, calls poll_immediate_wake().  Otherwise
 * does nothing. */
void
ovsdb_write_wait(const struct ovsdb_write *w)
{
    if (!ovsdb_write_is_complete(w)) {
        return;
    }
    poll_immediate_wake();
}
481
482 void
483 ovsdb_write_destroy(struct ovsdb_write *w)
484 {
485 if (w) {
486 raft_command_unref(w->command);
487 ovsdb_error_destroy(w->error);
488 free(w);
489 }
490 }
491
492 static void
493 schedule_next_snapshot(struct ovsdb_storage *storage, bool quick)
494 {
495 if (storage->log || storage->raft) {
496 unsigned int base = 10 * 60 * 1000; /* 10 minutes */
497 unsigned int range = 10 * 60 * 1000; /* 10 minutes */
498 if (quick) {
499 base /= 10;
500 range /= 10;
501 }
502
503 long long int now = time_msec();
504 storage->next_snapshot_min = now + base + random_range(range);
505 storage->next_snapshot_max = now + 60LL * 60 * 24 * 1000; /* 1 day */
506 } else {
507 storage->next_snapshot_min = LLONG_MAX;
508 storage->next_snapshot_max = LLONG_MAX;
509 }
510 }
511
512 bool
513 ovsdb_storage_should_snapshot(const struct ovsdb_storage *storage)
514 {
515 if (storage->raft || storage->log) {
516 /* If we haven't reached the minimum snapshot time, don't snapshot. */
517 long long int now = time_msec();
518 if (now < storage->next_snapshot_min) {
519 return false;
520 }
521
522 /* If we can't snapshot right now, don't. */
523 if (storage->raft && !raft_may_snapshot(storage->raft)) {
524 return false;
525 }
526
527 uint64_t log_len = (storage->raft
528 ? raft_get_log_length(storage->raft)
529 : storage->n_read + storage->n_written);
530 if (now < storage->next_snapshot_max) {
531 /* Maximum snapshot time not yet reached. Take a snapshot if there
532 * have been at least 100 log entries and the log file size has
533 * grown a lot. */
534 bool grew_lots = (storage->raft
535 ? raft_grew_lots(storage->raft)
536 : ovsdb_log_grew_lots(storage->log));
537 return log_len >= 100 && grew_lots;
538 } else {
539 /* We have reached the maximum snapshot time. Take a snapshot if
540 * there have been any log entries at all. */
541 return log_len > 0;
542 }
543 }
544
545 return false;
546 }
547
548 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
549 ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage,
550 const struct json *schema,
551 const struct json *data)
552 {
553 if (storage->raft) {
554 struct json *entries = json_array_create_empty();
555 if (schema) {
556 json_array_add(entries, json_clone(schema));
557 }
558 if (data) {
559 json_array_add(entries, json_clone(data));
560 }
561 struct ovsdb_error *error = raft_store_snapshot(storage->raft,
562 entries);
563 json_destroy(entries);
564 return error;
565 } else if (storage->log) {
566 struct json *entries[2];
567 size_t n = 0;
568 if (schema) {
569 entries[n++] = CONST_CAST(struct json *, schema);
570 }
571 if (data) {
572 entries[n++] = CONST_CAST(struct json *, data);
573 }
574 return ovsdb_log_replace(storage->log, entries, n);
575 } else {
576 return NULL;
577 }
578 }
579
580 /* 'schema' and 'data' should faithfully represent the current schema and data,
581 * otherwise the two storing backing formats will yield divergent results. Use
582 * ovsdb_storage_write_schema_change() to change the schema. */
583 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
584 ovsdb_storage_store_snapshot(struct ovsdb_storage *storage,
585 const struct json *schema,
586 const struct json *data)
587 {
588 struct ovsdb_error *error = ovsdb_storage_store_snapshot__(storage,
589 schema, data);
590 bool retry_quickly = error != NULL;
591 schedule_next_snapshot(storage, retry_quickly);
592 return error;
593 }
594
595 struct ovsdb_write * OVS_WARN_UNUSED_RESULT
596 ovsdb_storage_write_schema_change(struct ovsdb_storage *storage,
597 const struct json *schema,
598 const struct json *data,
599 const struct uuid *prereq,
600 struct uuid *resultp)
601 {
602 struct ovsdb_write *w = xzalloc(sizeof *w);
603 struct uuid result = UUID_ZERO;
604 if (storage->error) {
605 w->error = ovsdb_error_clone(storage->error);
606 } else if (storage->raft) {
607 struct json *txn_json = json_array_create_2(json_clone(schema),
608 json_clone(data));
609 w->command = raft_command_execute(storage->raft, txn_json,
610 prereq, &result);
611 json_destroy(txn_json);
612 } else if (storage->log) {
613 w->error = ovsdb_storage_store_snapshot__(storage, schema, data);
614 } else {
615 /* When 'error' and 'command' are both null, it indicates that the
616 * command is complete. This is fine since this unbacked storage drops
617 * writes. */
618 }
619 if (resultp) {
620 *resultp = result;
621 }
622 return w;
623 }
624
625 const struct uuid *
626 ovsdb_storage_peek_last_eid(struct ovsdb_storage *storage)
627 {
628 if (!storage->raft) {
629 return NULL;
630 }
631 return raft_current_eid(storage->raft);
632 }