]> git.proxmox.com Git - mirror_ovs.git/blame - ovsdb/storage.c
ovsdb: Add raft memory usage to memory report.
[mirror_ovs.git] / ovsdb / storage.c
CommitLineData
1b1d2e6d
BP
1
2/* Copyright (c) 2009, 2010, 2011, 2016, 2017 Nicira, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <config.h>
18
19#include "storage.h"
20#include <string.h>
21#include "log.h"
22#include "ovsdb-error.h"
23#include "openvswitch/json.h"
24#include "openvswitch/poll-loop.h"
25#include "openvswitch/vlog.h"
26#include "ovsdb.h"
27#include "raft.h"
28#include "random.h"
3423cd97 29#include "simap.h"
1b1d2e6d
BP
30#include "timeval.h"
31#include "util.h"
32
33VLOG_DEFINE_THIS_MODULE(storage);
34
/* State for one database's storage. */
struct ovsdb_storage {
    /* Which of 'log' and 'raft' is set determines the kind of storage:
     *
     *   - 'log' nonnull, 'raft' null: standalone, backed by a disk file.
     *
     *   - 'log' null, 'raft' nonnull: clustered, backed by a Raft cluster.
     *
     *   - Both null: memory only, unbacked. */
    struct ovsdb_log *log;
    struct raft *raft;

    /* Used by every kind of storage. */
    struct ovsdb_error *error;   /* Permanent error, if nonnull. */
    long long next_snapshot_min; /* Earliest time to take next snapshot. */
    long long next_snapshot_max; /* Latest time to take next snapshot. */

    /* Used by standalone storage only. */
    unsigned int n_read;         /* Records read from 'log' so far. */
    unsigned int n_written;      /* Records written to 'log' so far. */
};
57
58static void schedule_next_snapshot(struct ovsdb_storage *, bool quick);
59
60static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
61ovsdb_storage_open__(const char *filename, bool rw, bool allow_clustered,
62 struct ovsdb_storage **storagep)
63{
64 *storagep = NULL;
65
66 struct ovsdb_log *log;
67 struct ovsdb_error *error;
68 error = ovsdb_log_open(filename, OVSDB_MAGIC"|"RAFT_MAGIC,
69 rw ? OVSDB_LOG_READ_WRITE : OVSDB_LOG_READ_ONLY,
70 -1, &log);
71 if (error) {
72 return error;
73 }
74
75 struct raft *raft = NULL;
76 if (!strcmp(ovsdb_log_get_magic(log), RAFT_MAGIC)) {
77 if (!allow_clustered) {
78 ovsdb_log_close(log);
79 return ovsdb_error(NULL, "%s: cannot apply this operation to "
80 "clustered database file", filename);
81 }
82 error = raft_open(log, &raft);
83 log = NULL;
84 if (error) {
85 return error;
86 }
87 }
88
89 struct ovsdb_storage *storage = xzalloc(sizeof *storage);
90 storage->log = log;
91 storage->raft = raft;
92 schedule_next_snapshot(storage, false);
93 *storagep = storage;
94 return NULL;
95}
96
97/* Opens 'filename' for use as storage. If 'rw', opens it for read/write
98 * access, otherwise read-only. If successful, stores the new storage in
99 * '*storagep' and returns NULL; on failure, stores NULL in '*storagep' and
100 * returns the error.
101 *
102 * The returned storage might be clustered or standalone, depending on what the
103 * disk file contains. */
104struct ovsdb_error * OVS_WARN_UNUSED_RESULT
105ovsdb_storage_open(const char *filename, bool rw,
106 struct ovsdb_storage **storagep)
107{
108 return ovsdb_storage_open__(filename, rw, true, storagep);
109}
110
/* Opens 'filename' as standalone storage, with read/write access if 'rw' is
 * true and read-only access otherwise, and returns it.  Clustered database
 * files are not accepted.  Any failure is reported through ovs_fatal(). */
struct ovsdb_storage *
ovsdb_storage_open_standalone(const char *filename, bool rw)
{
    const bool allow_clustered = false;
    struct ovsdb_storage *storage;
    struct ovsdb_error *error;

    error = ovsdb_storage_open__(filename, rw, allow_clustered, &storage);
    if (error) {
        ovs_fatal(0, "%s", ovsdb_error_to_string_free(error));
    }
    return storage;
}
122
123/* Creates and returns new storage without any backing. Nothing will be read
124 * from the storage, and writes are discarded. */
125struct ovsdb_storage *
126ovsdb_storage_create_unbacked(void)
127{
128 struct ovsdb_storage *storage = xzalloc(sizeof *storage);
129 schedule_next_snapshot(storage, false);
130 return storage;
131}
132
133void
134ovsdb_storage_close(struct ovsdb_storage *storage)
135{
136 if (storage) {
137 ovsdb_log_close(storage->log);
138 raft_close(storage->raft);
139 ovsdb_error_destroy(storage->error);
140 free(storage);
141 }
142}
143
144const char *
145ovsdb_storage_get_model(const struct ovsdb_storage *storage)
146{
147 return storage->raft ? "clustered" : "standalone";
148}
149
150bool
151ovsdb_storage_is_clustered(const struct ovsdb_storage *storage)
152{
153 return storage->raft != NULL;
154}
155
156bool
157ovsdb_storage_is_connected(const struct ovsdb_storage *storage)
158{
159 return !storage->raft || raft_is_connected(storage->raft);
160}
161
162bool
163ovsdb_storage_is_dead(const struct ovsdb_storage *storage)
164{
165 return storage->raft && raft_left(storage->raft);
166}
167
168bool
169ovsdb_storage_is_leader(const struct ovsdb_storage *storage)
170{
171 return !storage->raft || raft_is_leader(storage->raft);
172}
173
174const struct uuid *
175ovsdb_storage_get_cid(const struct ovsdb_storage *storage)
176{
177 return storage->raft ? raft_get_cid(storage->raft) : NULL;
178}
179
180const struct uuid *
181ovsdb_storage_get_sid(const struct ovsdb_storage *storage)
182{
183 return storage->raft ? raft_get_sid(storage->raft) : NULL;
184}
185
186uint64_t
187ovsdb_storage_get_applied_index(const struct ovsdb_storage *storage)
188{
189 return storage->raft ? raft_get_applied_index(storage->raft) : 0;
190}
191
3423cd97
IM
192void
193ovsdb_storage_get_memory_usage(const struct ovsdb_storage *storage,
194 struct simap *usage)
195{
196 if (storage->raft) {
197 raft_get_memory_usage(storage->raft, usage);
198 }
199}
200
1b1d2e6d
BP
201void
202ovsdb_storage_run(struct ovsdb_storage *storage)
203{
204 if (storage->raft) {
205 raft_run(storage->raft);
206 }
207}
208
209void
210ovsdb_storage_wait(struct ovsdb_storage *storage)
211{
212 if (storage->raft) {
213 raft_wait(storage->raft);
214 }
215}
216
217/* Returns 'storage''s embedded name, if it has one, otherwise null.
218 *
219 * Only clustered storage has a built-in name. */
220const char *
221ovsdb_storage_get_name(const struct ovsdb_storage *storage)
222{
223 return storage->raft ? raft_get_name(storage->raft) : NULL;
224}
225
226/* Attempts to read a log record from 'storage'.
227 *
228 * If successful, returns NULL and stores the transaction information in
229 * '*schemap', '*txnp', and '*txnid'. At least one of these will be nonnull.
230 * The caller owns the data and must eventually free it (with json_destroy()).
231 *
232 * If 'storage' is not clustered, 'txnid' may be null.
233 *
234 * If a read error occurs, returns the error and stores NULL in '*jsonp'.
235 *
236 * If the read reaches end of file, returns NULL and stores NULL in
237 * '*jsonp'. */
238struct ovsdb_error * OVS_WARN_UNUSED_RESULT
239ovsdb_storage_read(struct ovsdb_storage *storage,
240 struct ovsdb_schema **schemap,
241 struct json **txnp,
242 struct uuid *txnid)
243{
244 *schemap = NULL;
245 *txnp = NULL;
246 if (txnid) {
247 *txnid = UUID_ZERO;
248 }
249
250 struct json *json;
251 struct json *schema_json = NULL;
252 struct json *txn_json = NULL;
253 if (storage->raft) {
254 bool is_snapshot;
255 json = json_nullable_clone(
256 raft_next_entry(storage->raft, txnid, &is_snapshot));
257 if (!json) {
258 return NULL;
fa37affa 259 } else if (json->type != JSON_ARRAY || json->array.n != 2) {
1b1d2e6d
BP
260 json_destroy(json);
261 return ovsdb_error(NULL, "invalid commit format");
262 }
263
fa37affa 264 struct json **e = json->array.elems;
1b1d2e6d
BP
265 schema_json = e[0]->type != JSON_NULL ? e[0] : NULL;
266 txn_json = e[1]->type != JSON_NULL ? e[1] : NULL;
267 } else if (storage->log) {
268 struct ovsdb_error *error = ovsdb_log_read(storage->log, &json);
269 if (error || !json) {
270 return error;
271 }
272
273 unsigned int n = storage->n_read++;
274 struct json **jsonp = !n ? &schema_json : &txn_json;
275 *jsonp = json;
276 if (n == 1) {
277 ovsdb_log_mark_base(storage->log);
278 }
279 } else {
280 /* Unbacked. Nothing to do. */
281 return NULL;
282 }
283
284 /* If we got this far then we must have at least a schema or a
285 * transaction. */
286 ovs_assert(schema_json || txn_json);
287
288 if (schema_json) {
289 struct ovsdb_schema *schema;
290 struct ovsdb_error *error = ovsdb_schema_from_json(schema_json,
291 &schema);
292 if (error) {
293 json_destroy(json);
294 return error;
295 }
296
297 const char *storage_name = ovsdb_storage_get_name(storage);
298 const char *schema_name = schema->name;
299 if (storage_name && strcmp(storage_name, schema_name)) {
300 error = ovsdb_error(NULL, "name %s in header does not match "
301 "name %s in schema",
302 storage_name, schema_name);
303 json_destroy(json);
304 ovsdb_schema_destroy(schema);
305 return error;
306 }
307
308 *schemap = schema;
309 }
310
311 if (txn_json) {
312 *txnp = json_clone(txn_json);
313 }
314
315 json_destroy(json);
316 return NULL;
317}
318
319/* Reads and returns the schema from standalone storage 'storage'. Terminates
320 * with an error on failure. */
321struct ovsdb_schema *
322ovsdb_storage_read_schema(struct ovsdb_storage *storage)
323{
324 ovs_assert(storage->log);
325
326 struct json *txn_json;
327 struct ovsdb_schema *schema;
328 struct ovsdb_error *error = ovsdb_storage_read(storage, &schema,
329 &txn_json, NULL);
330 if (error) {
331 ovs_fatal(0, "%s", ovsdb_error_to_string_free(error));
332 }
333 if (!schema && !txn_json) {
334 ovs_fatal(0, "unexpected end of file reading schema");
335 }
336 ovs_assert(schema && !txn_json);
337
338 return schema;
339}
340
341bool
342ovsdb_storage_read_wait(struct ovsdb_storage *storage)
343{
344 return (storage->raft
345 ? raft_has_next_entry(storage->raft)
346 : false);
347}
348
349void
350ovsdb_storage_unread(struct ovsdb_storage *storage)
351{
352 if (storage->error) {
353 return;
354 }
355
356 if (storage->raft) {
357 if (!storage->error) {
358 storage->error = ovsdb_error(NULL, "inconsistent data");
359 }
360 } else if (storage->log) {
361 ovsdb_log_unread(storage->log);
362 }
363}
364
/* Tracks one write issued to storage.
 *
 * The write is complete when 'error' is nonnull, when 'command' is null, or
 * when 'command' no longer has status RAFT_CMD_INCOMPLETE. */
struct ovsdb_write {
    struct ovsdb_error *error;      /* Failure, if nonnull. */
    struct raft_command *command;   /* Raft command, for clustered storage. */
};
369
370/* Not suitable for writing transactions that change the schema. */
371struct ovsdb_write * OVS_WARN_UNUSED_RESULT
372ovsdb_storage_write(struct ovsdb_storage *storage, const struct json *data,
373 const struct uuid *prereq, struct uuid *resultp,
374 bool durable)
375{
376 struct ovsdb_write *w = xzalloc(sizeof *w);
377 struct uuid result = UUID_ZERO;
378 if (storage->error) {
379 w->error = ovsdb_error_clone(storage->error);
380 } else if (storage->raft) {
381 struct json *txn_json = json_array_create_2(json_null_create(),
382 json_clone(data));
383 w->command = raft_command_execute(storage->raft, txn_json,
384 prereq, &result);
385 json_destroy(txn_json);
386 } else if (storage->log) {
387 w->error = ovsdb_log_write(storage->log, data);
388 if (!w->error) {
389 storage->n_written++;
390 if (durable) {
391 w->error = ovsdb_log_commit_block(storage->log);
392 }
393 }
394 } else {
395 /* When 'error' and 'command' are both null, it indicates that the
396 * command is complete. This is fine since this unbacked storage drops
397 * writes. */
398 }
399 if (resultp) {
400 *resultp = result;
401 }
402 return w;
403}
404
405/* Not suitable for writing transactions that change the schema. */
406struct ovsdb_error * OVS_WARN_UNUSED_RESULT
407ovsdb_storage_write_block(struct ovsdb_storage *storage,
408 const struct json *data, const struct uuid *prereq,
409 struct uuid *resultp, bool durable)
410{
411 struct ovsdb_write *w = ovsdb_storage_write(storage, data,
412 prereq, resultp, durable);
413 while (!ovsdb_write_is_complete(w)) {
414 if (storage->raft) {
415 raft_run(storage->raft);
416 }
417
418 ovsdb_write_wait(w);
419 if (storage->raft) {
420 raft_wait(storage->raft);
421 }
422 poll_block();
423 }
424
425 struct ovsdb_error *error = ovsdb_error_clone(ovsdb_write_get_error(w));
426 ovsdb_write_destroy(w);
427 return error;
428}
429
430bool
431ovsdb_write_is_complete(const struct ovsdb_write *w)
432{
433 return (w->error
434 || !w->command
435 || raft_command_get_status(w->command) != RAFT_CMD_INCOMPLETE);
436}
437
438const struct ovsdb_error *
439ovsdb_write_get_error(const struct ovsdb_write *w_)
440{
441 struct ovsdb_write *w = CONST_CAST(struct ovsdb_write *, w_);
442 ovs_assert(ovsdb_write_is_complete(w));
443
444 if (w->command && !w->error) {
445 enum raft_command_status status = raft_command_get_status(w->command);
446 if (status != RAFT_CMD_SUCCESS) {
447 w->error = ovsdb_error("cluster error", "%s",
448 raft_command_status_to_string(status));
449 }
450 }
451
452 return w->error;
453}
454
455uint64_t
456ovsdb_write_get_commit_index(const struct ovsdb_write *w)
457{
458 ovs_assert(ovsdb_write_is_complete(w));
459 return (w->command && !w->error
460 ? raft_command_get_commit_index(w->command)
461 : 0);
462}
463
/* Calls poll_immediate_wake() if write 'w' is already complete.  Does nothing
 * for a write that is still in progress. */
void
ovsdb_write_wait(const struct ovsdb_write *w)
{
    if (!ovsdb_write_is_complete(w)) {
        return;
    }
    poll_immediate_wake();
}
471
472void
473ovsdb_write_destroy(struct ovsdb_write *w)
474{
475 if (w) {
476 raft_command_unref(w->command);
477 ovsdb_error_destroy(w->error);
478 free(w);
479 }
480}
481
482static void
483schedule_next_snapshot(struct ovsdb_storage *storage, bool quick)
484{
485 if (storage->log || storage->raft) {
486 unsigned int base = 10 * 60 * 1000; /* 10 minutes */
487 unsigned int range = 10 * 60 * 1000; /* 10 minutes */
488 if (quick) {
489 base /= 10;
490 range /= 10;
491 }
492
493 long long int now = time_msec();
494 storage->next_snapshot_min = now + base + random_range(range);
495 storage->next_snapshot_max = now + 60LL * 60 * 24 * 1000; /* 1 day */
496 } else {
497 storage->next_snapshot_min = LLONG_MAX;
498 storage->next_snapshot_max = LLONG_MAX;
499 }
500}
501
502bool
503ovsdb_storage_should_snapshot(const struct ovsdb_storage *storage)
504{
505 if (storage->raft || storage->log) {
506 /* If we haven't reached the minimum snapshot time, don't snapshot. */
507 long long int now = time_msec();
508 if (now < storage->next_snapshot_min) {
509 return false;
510 }
511
512 /* If we can't snapshot right now, don't. */
513 if (storage->raft && !raft_may_snapshot(storage->raft)) {
514 return false;
515 }
516
517 uint64_t log_len = (storage->raft
518 ? raft_get_log_length(storage->raft)
519 : storage->n_read + storage->n_written);
520 if (now < storage->next_snapshot_max) {
521 /* Maximum snapshot time not yet reached. Take a snapshot if there
522 * have been at least 100 log entries and the log file size has
523 * grown a lot. */
524 bool grew_lots = (storage->raft
525 ? raft_grew_lots(storage->raft)
526 : ovsdb_log_grew_lots(storage->log));
527 return log_len >= 100 && grew_lots;
528 } else {
529 /* We have reached the maximum snapshot time. Take a snapshot if
530 * there have been any log entries at all. */
531 return log_len > 0;
532 }
533 }
534
535 return false;
536}
537
538static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
539ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage,
540 const struct json *schema,
541 const struct json *data)
542{
543 if (storage->raft) {
544 struct json *entries = json_array_create_empty();
545 if (schema) {
546 json_array_add(entries, json_clone(schema));
547 }
548 if (data) {
549 json_array_add(entries, json_clone(data));
550 }
551 struct ovsdb_error *error = raft_store_snapshot(storage->raft,
552 entries);
553 json_destroy(entries);
554 return error;
555 } else if (storage->log) {
556 struct json *entries[2];
557 size_t n = 0;
558 if (schema) {
559 entries[n++] = CONST_CAST(struct json *, schema);
560 }
561 if (data) {
562 entries[n++] = CONST_CAST(struct json *, data);
563 }
564 return ovsdb_log_replace(storage->log, entries, n);
565 } else {
566 return NULL;
567 }
568}
569
570/* 'schema' and 'data' should faithfully represent the current schema and data,
571 * otherwise the two storing backing formats will yield divergent results. Use
572 * ovsdb_storage_write_schema_change() to change the schema. */
573struct ovsdb_error * OVS_WARN_UNUSED_RESULT
574ovsdb_storage_store_snapshot(struct ovsdb_storage *storage,
575 const struct json *schema,
576 const struct json *data)
577{
578 struct ovsdb_error *error = ovsdb_storage_store_snapshot__(storage,
579 schema, data);
580 bool retry_quickly = error != NULL;
581 schedule_next_snapshot(storage, retry_quickly);
582 return error;
583}
584
585struct ovsdb_write * OVS_WARN_UNUSED_RESULT
586ovsdb_storage_write_schema_change(struct ovsdb_storage *storage,
587 const struct json *schema,
588 const struct json *data,
589 const struct uuid *prereq,
590 struct uuid *resultp)
591{
592 struct ovsdb_write *w = xzalloc(sizeof *w);
593 struct uuid result = UUID_ZERO;
594 if (storage->error) {
595 w->error = ovsdb_error_clone(storage->error);
596 } else if (storage->raft) {
597 struct json *txn_json = json_array_create_2(json_clone(schema),
598 json_clone(data));
599 w->command = raft_command_execute(storage->raft, txn_json,
600 prereq, &result);
601 json_destroy(txn_json);
602 } else if (storage->log) {
603 w->error = ovsdb_storage_store_snapshot__(storage, schema, data);
604 } else {
605 /* When 'error' and 'command' are both null, it indicates that the
606 * command is complete. This is fine since this unbacked storage drops
607 * writes. */
608 }
609 if (resultp) {
610 *resultp = result;
611 }
612 return w;
613}
2cd62f75
HZ
614
615const struct uuid *
616ovsdb_storage_peek_last_eid(struct ovsdb_storage *storage)
617{
618 if (!storage->raft) {
619 return NULL;
620 }
621 return raft_current_eid(storage->raft);
622}