ovsdb/raft.c
1 /*
2 * Copyright (c) 2017, 2018 Nicira, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <config.h>
18
19 #include "raft.h"
20 #include "raft-private.h"
21
22 #include <errno.h>
23 #include <unistd.h>
24
25 #include "hash.h"
26 #include "jsonrpc.h"
27 #include "lockfile.h"
28 #include "openvswitch/dynamic-string.h"
29 #include "openvswitch/hmap.h"
30 #include "openvswitch/json.h"
31 #include "openvswitch/list.h"
32 #include "openvswitch/poll-loop.h"
33 #include "openvswitch/vlog.h"
34 #include "ovsdb-error.h"
35 #include "ovsdb-parser.h"
36 #include "ovsdb/log.h"
37 #include "raft-rpc.h"
38 #include "random.h"
39 #include "socket-util.h"
40 #include "stream.h"
41 #include "timeval.h"
42 #include "unicode.h"
43 #include "unixctl.h"
44 #include "util.h"
45 #include "uuid.h"
46
47 VLOG_DEFINE_THIS_MODULE(raft);
48
49 /* Roles for a Raft server:
50 *
51 * - Followers: Servers in touch with the current leader.
52 *
53 * - Candidate: Servers unaware of a current leader and seeking election to
54 * leader.
55 *
56 * - Leader: Handles all client requests. At most one leader at a time.
57 *
58 * In normal operation there is exactly one leader and all of the other servers
59 * are followers. */
60 enum raft_role {
61 RAFT_FOLLOWER,
62 RAFT_CANDIDATE,
63 RAFT_LEADER
64 };
65
66 /* Flags for unit tests. */
67 enum raft_failure_test {
68 FT_NO_TEST,
69 FT_CRASH_BEFORE_SEND_APPEND_REQ,
70 FT_CRASH_AFTER_SEND_APPEND_REQ,
71 FT_CRASH_BEFORE_SEND_EXEC_REP,
72 FT_CRASH_AFTER_SEND_EXEC_REP,
73 FT_CRASH_BEFORE_SEND_EXEC_REQ,
74 FT_CRASH_AFTER_SEND_EXEC_REQ,
75 FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE,
76 FT_DELAY_ELECTION
77 };
78 static enum raft_failure_test failure_test;
79
80 /* A connection between this Raft server and another one. */
81 struct raft_conn {
82 struct ovs_list list_node; /* In struct raft's 'conns' list. */
83 struct jsonrpc_session *js; /* JSON-RPC connection. */
84 struct uuid sid; /* The remote server's unique ID. */
85 char *nickname; /* Short name for use in log messages. */
86 bool incoming; /* True if incoming, false if outgoing. */
87 unsigned int js_seqno; /* Seqno for noticing (re)connections. */
88 };
89
90 static void raft_conn_close(struct raft_conn *);
91
92 /* A "command", that is, a request to append an entry to the log.
93 *
94 * The Raft specification only allows clients to issue commands to the leader.
95 * With this implementation, clients may issue a command on any server, which
96 * then relays the command to the leader if necessary.
97 *
98 * This structure is thus used in three cases:
99 *
100 * 1. We are the leader and the command was issued to us directly.
101 *
102 * 2. We are a follower and relayed the command to the leader.
103 *
104 * 3. We are the leader and a follower relayed the command to us.
105 */
106 struct raft_command {
107 /* All cases. */
108 struct hmap_node hmap_node; /* In struct raft's 'commands' hmap. */
109 unsigned int n_refs; /* Reference count. */
110 enum raft_command_status status; /* Execution status. */
111 struct uuid eid; /* Entry ID of result. */
112
113 /* Case 1 only. */
114 uint64_t index; /* Index in log (0 if being relayed). */
115
116 /* Case 2 only. */
117 long long int timestamp; /* Issue or last ping time, for expiration. */
118
119 /* Case 3 only. */
120 struct uuid sid; /* The follower (otherwise UUID_ZERO). */
121 };
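
/* Illustrative sketch, not part of the original source: one way the three
 * cases above can be told apart from the fields alone, relying only on the
 * field conventions documented in the struct. The helper name is hypothetical
 * and exists purely for exposition. */
static inline const char *
raft_command_case_example(const struct raft_command *cmd)
{
    if (!uuid_is_zero(&cmd->sid)) {
        /* 'sid' is set only when a follower relayed the command to us. */
        return "case 3: we are the leader; a follower relayed the command";
    } else if (cmd->index) {
        /* 'index' is nonzero only once the leader has appended the entry. */
        return "case 1: we are the leader; the command was issued directly";
    } else {
        /* Otherwise we are a follower relaying the command to the leader. */
        return "case 2: we are a follower relaying the command";
    }
}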
122
123 static void raft_command_complete(struct raft *, struct raft_command *,
124 enum raft_command_status);
125
126 static void raft_complete_all_commands(struct raft *,
127 enum raft_command_status);
128
129 /* Type of deferred action, see struct raft_waiter. */
130 enum raft_waiter_type {
131 RAFT_W_ENTRY,
132 RAFT_W_TERM,
133 RAFT_W_RPC,
134 };
135
136 /* An action deferred until a log write commits to disk. */
137 struct raft_waiter {
138 struct ovs_list list_node;
139 uint64_t commit_ticket;
140
141 enum raft_waiter_type type;
142 union {
143 /* RAFT_W_ENTRY.
144 *
145 * Waits for a RAFT_REC_ENTRY write to our local log to commit. Upon
146 * completion, updates 'log_synced' to indicate that the new log entry
147 * or entries are committed and, if we are leader, also updates our
148 * local 'match_index'. */
149 struct {
150 uint64_t index;
151 } entry;
152
153 /* RAFT_W_TERM.
154 *
155 * Waits for a RAFT_REC_TERM or RAFT_REC_VOTE record write to commit.
156 * Upon completion, updates 'synced_term' and 'synced_vote', which
157 * triggers sending RPCs deferred by the uncommitted 'term' and
158 * 'vote'. */
159 struct {
160 uint64_t term;
161 struct uuid vote;
162 } term;
163
164 /* RAFT_W_RPC.
165 *
166 * Sometimes, sending an RPC to a peer must be delayed until an entry,
167 * a term, or a vote mentioned in the RPC is synced to disk. This
168 * waiter keeps a copy of such an RPC until the previous waiters have
169 * committed. */
170 union raft_rpc *rpc;
171 };
172 };
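
/* Illustrative summary, not from the original source, of how a waiter moves
 * through the code below:
 *
 * 1. A record is written with ovsdb_log_write_and_free() and a waiter is
 * queued with raft_waiter_create(), which associates the waiter with a
 * commit ticket for the pending write.
 *
 * 2. raft_waiters_run() compares each waiter's ticket against
 * ovsdb_log_commit_progress() and completes the waiter once the write has
 * reached disk: updating 'log_synced' (RAFT_W_ENTRY), 'synced_term' and
 * 'synced_vote' (RAFT_W_TERM), or sending the deferred RPC (RAFT_W_RPC).
 *
 * 3. raft_waiters_wait() asks the log to wake the poll loop when the oldest
 * outstanding ticket commits. */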
173
174 static struct raft_waiter *raft_waiter_create(struct raft *,
175 enum raft_waiter_type,
176 bool start_commit);
177 static void raft_waiters_destroy(struct raft *);
178
179 /* The Raft state machine. */
180 struct raft {
181 struct hmap_node hmap_node; /* In 'all_rafts'. */
182 struct ovsdb_log *log;
183
184 /* Persistent derived state.
185 *
186 * This must be updated on stable storage before responding to RPCs. It can be
187 * derived from the header, snapshot, and log in 'log'. */
188
189 struct uuid cid; /* Cluster ID (immutable for the cluster). */
190 struct uuid sid; /* Server ID (immutable for the server). */
191 char *local_address; /* Local address (immutable for the server). */
192 char *local_nickname; /* Used for local server in log messages. */
193 char *name; /* Schema name (immutable for the cluster). */
194
195 /* Contains "struct raft_server"s and represents the server configuration
196 * most recently added to 'log'. */
197 struct hmap servers;
198
199 #define ELECTION_BASE_MSEC 1000
200 #define ELECTION_RANGE_MSEC 1000
201 /* The election timeout base value for leader election, in milliseconds.
202 * It can be set by unixctl cluster/change-election-timer. Default value is
203 * ELECTION_BASE_MSEC. */
204 uint64_t election_timer;
205 /* If not 0, it is the new value of election_timer being proposed. */
206 uint64_t election_timer_new;
207
208 /* Persistent state on all servers.
209 *
210 * Must be updated on stable storage before responding to RPCs. */
211
212 /* Current term and the vote for that term. These might be on the way to
213 * disk now. */
214 uint64_t term; /* Initialized to 0 and only increases. */
215 struct uuid vote; /* All-zeros if no vote yet in 'term'. */
216
217 /* The term and vote that have been synced to disk. */
218 uint64_t synced_term;
219 struct uuid synced_vote;
220
221 /* The log.
222 *
223 * A log entry with index 1 never really exists; the initial snapshot for a
224 * Raft is considered to include this index. The first real log entry has
225 * index 2.
226 *
227 * A new Raft instance contains an empty log: log_start=2, log_end=2.
228 * Over time, the log grows: log_start=2, log_end=N.
229 * At some point, the server takes a snapshot: log_start=N, log_end=N.
230 * The log continues to grow: log_start=N, log_end=N+1...
231 *
232 * Must be updated on stable storage before responding to RPCs. */
233 struct raft_entry *entries; /* Log entry i is in entries[i - log_start]. */
234 uint64_t log_start; /* Index of first entry in log. */
235 uint64_t log_end; /* Index of last entry in log, plus 1. */
236 uint64_t log_synced; /* Index of last synced entry. */
237 size_t allocated_log; /* Allocated entries in 'log'. */
238
239 /* Snapshot state (see Figure 5.1)
240 *
241 * This is the state of the cluster as of the last discarded log entry,
242 * that is, at log index 'log_start - 1' (called prevIndex in Figure 5.1).
243 * Only committed log entries can be included in a snapshot. */
244 struct raft_entry snap;
245
246 /* Volatile state.
247 *
248 * The snapshot is always committed, but the rest of the log might not be yet.
249 * 'last_applied' tracks what entries have been passed to the client. If the
250 * client hasn't yet read the latest snapshot, then even the snapshot isn't
251 * applied yet. Thus, the invariants are different for these members:
252 *
253 * log_start - 2 <= last_applied <= commit_index < log_end.
254 * log_start - 1 <= commit_index < log_end.
255 */
256
257 enum raft_role role; /* Current role. */
258 uint64_t commit_index; /* Max log index known to be committed. */
259 uint64_t last_applied; /* Max log index applied to state machine. */
260 struct uuid leader_sid; /* Server ID of leader (zero, if unknown). */
261
262 long long int election_base; /* Time of last heartbeat from leader. */
263 long long int election_timeout; /* Time at which we start an election. */
264
265 /* Used for joining a cluster. */
266 bool joining; /* Attempting to join the cluster? */
267 struct sset remote_addresses; /* Addresses to try to find other servers. */
268 long long int join_timeout; /* Time to re-send add server request. */
269
270 /* Used for leaving a cluster. */
271 bool leaving; /* True if we are leaving the cluster. */
272 bool left; /* True if we have finished leaving. */
273 long long int leave_timeout; /* Time to re-send remove server request. */
274
275 /* Failure. */
276 bool failed; /* True if unrecoverable error has occurred. */
277
278 /* File synchronization. */
279 struct ovs_list waiters; /* Contains "struct raft_waiter"s. */
280
281 /* Network connections. */
282 struct pstream *listener; /* For connections from other Raft servers. */
283 long long int listen_backoff; /* For retrying creating 'listener'. */
284 struct ovs_list conns; /* Contains "struct raft_conn"s. */
285
286 /* Leaders only. Reinitialized after becoming leader. */
287 struct hmap add_servers; /* Contains "struct raft_server"s to add. */
288 struct raft_server *remove_server; /* Server being removed. */
289 struct hmap commands; /* Contains "struct raft_command"s. */
290 long long int ping_timeout; /* Time at which to send a heartbeat */
291
292 /* Candidates only. Reinitialized at start of election. */
293 int n_votes; /* Number of votes for me. */
294
295 /* Followers and candidates only. */
296 bool candidate_retrying; /* The earlier election timed out and we are
297 now retrying. */
298 bool had_leader; /* A leader has been elected since the last
299 election was initiated. This helps set
300 candidate_retrying. */
301 };
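
/* Worked example, illustrative rather than from the original source, of the
 * log indexing invariants documented above:
 *
 * A fresh instance has log_start = log_end = 2; the implicit snapshot
 * covers index 1 (log_start - 1). After three entries are appended and the
 * first two of them commit:
 *
 *     log_start = 2, log_end = 5     (entries 2, 3, 4 held in memory)
 *     commit_index = 3               (entries 2 and 3 are committed)
 *     0 <= last_applied <= 3         (whatever the client has consumed)
 *
 * Snapshotting at the commit point then discards entries 2 and 3:
 *
 *     log_start = 4, log_end = 5, 'snap' describes the state at index 3,
 *     commit_index remains 3, and log_start - 1 <= commit_index < log_end
 *     still holds (3 <= 3 < 5). */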
302
303 /* All Raft structures. */
304 static struct hmap all_rafts = HMAP_INITIALIZER(&all_rafts);
305
306 static void raft_init(void);
307
308 static struct ovsdb_error *raft_read_header(struct raft *)
309 OVS_WARN_UNUSED_RESULT;
310
311 static void raft_send_execute_command_reply(struct raft *,
312 const struct uuid *sid,
313 const struct uuid *eid,
314 enum raft_command_status,
315 uint64_t commit_index);
316
317 static void raft_update_our_match_index(struct raft *, uint64_t min_index);
318
319 static void raft_send_remove_server_reply__(
320 struct raft *, const struct uuid *target_sid,
321 const struct uuid *requester_sid, struct unixctl_conn *requester_conn,
322 bool success, const char *comment);
323 static void raft_finished_leaving_cluster(struct raft *);
324
325 static void raft_server_init_leader(struct raft *, struct raft_server *);
326
327 static bool raft_rpc_is_heartbeat(const union raft_rpc *);
328 static bool raft_is_rpc_synced(const struct raft *, const union raft_rpc *);
329
330 static void raft_handle_rpc(struct raft *, const union raft_rpc *);
331
332 static bool raft_send_at(struct raft *, const union raft_rpc *,
333 int line_number);
334 #define raft_send(raft, rpc) raft_send_at(raft, rpc, __LINE__)
335
336 static bool raft_send_to_conn_at(struct raft *, const union raft_rpc *,
337 struct raft_conn *, int line_number);
338 #define raft_send_to_conn(raft, rpc, conn) \
339 raft_send_to_conn_at(raft, rpc, conn, __LINE__)
340
341 static void raft_send_append_request(struct raft *,
342 struct raft_server *, unsigned int n,
343 const char *comment);
344
345 static void raft_become_leader(struct raft *);
346 static void raft_become_follower(struct raft *);
347 static void raft_reset_election_timer(struct raft *);
348 static void raft_reset_ping_timer(struct raft *);
349 static void raft_send_heartbeats(struct raft *);
350 static void raft_start_election(struct raft *, bool leadership_transfer);
351 static bool raft_truncate(struct raft *, uint64_t new_end);
352 static void raft_get_servers_from_log(struct raft *, enum vlog_level);
353 static void raft_get_election_timer_from_log(struct raft *);
354
355 static bool raft_handle_write_error(struct raft *, struct ovsdb_error *);
356
357 static void raft_run_reconfigure(struct raft *);
358
359 static void raft_set_leader(struct raft *, const struct uuid *sid);
360 static struct raft_server *
361 raft_find_server(const struct raft *raft, const struct uuid *sid)
362 {
363 return raft_server_find(&raft->servers, sid);
364 }
365
366 static char *
367 raft_make_address_passive(const char *address_)
368 {
369 if (!strncmp(address_, "unix:", 5)) {
370 return xasprintf("p%s", address_);
371 } else {
372 char *address = xstrdup(address_);
373 char *host, *port;
374 inet_parse_host_port_tokens(strchr(address, ':') + 1, &host, &port);
375
376 struct ds paddr = DS_EMPTY_INITIALIZER;
377 ds_put_format(&paddr, "p%.3s:%s:", address, port);
378 if (strchr(host, ':')) {
379 ds_put_format(&paddr, "[%s]", host);
380 } else {
381 ds_put_cstr(&paddr, host);
382 }
383 free(address);
384 return ds_steal_cstr(&paddr);
385 }
386 }
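
/* Examples of the mapping performed by raft_make_address_passive(), derived
 * from the code above (illustrative):
 *
 *     "unix:/tmp/db.sock"  -> "punix:/tmp/db.sock"
 *     "tcp:10.0.0.1:6644"  -> "ptcp:6644:10.0.0.1"
 *     "ssl:10.0.0.1:6644"  -> "pssl:6644:10.0.0.1"
 *     "tcp:[::1]:6644"     -> "ptcp:6644:[::1]"
 */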
387
388 static struct raft *
389 raft_alloc(void)
390 {
391 raft_init();
392
393 struct raft *raft = xzalloc(sizeof *raft);
394 hmap_node_nullify(&raft->hmap_node);
395 hmap_init(&raft->servers);
396 raft->log_start = raft->log_end = 1;
397 raft->role = RAFT_FOLLOWER;
398 sset_init(&raft->remote_addresses);
399 raft->join_timeout = LLONG_MAX;
400 ovs_list_init(&raft->waiters);
401 raft->listen_backoff = LLONG_MIN;
402 ovs_list_init(&raft->conns);
403 hmap_init(&raft->add_servers);
404 hmap_init(&raft->commands);
405
406 raft->election_timer = ELECTION_BASE_MSEC;
407
408 return raft;
409 }
410
411 /* Creates an on-disk file that represents a new Raft cluster and initializes
412 * it to consist of a single server, the one on which this function is called.
413 *
414 * Creates the local copy of the cluster's log in 'file_name', which must not
415 * already exist. Gives it the name 'name', which should be the database
416 * schema name and which is used only to match up this database with the server
417 * added to the cluster later if the cluster ID is unavailable.
418 *
419 * The new server is located at 'local_address', which must take one of the
420 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
421 * square bracket enclosed IPv6 address and PORT is a TCP port number.
422 *
423 * This only creates the on-disk file. Use raft_open() to start operating the
424 * new server.
425 *
426 * Returns null if successful, otherwise an ovsdb_error describing the
427 * problem. */
428 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
429 raft_create_cluster(const char *file_name, const char *name,
430 const char *local_address, const struct json *data)
431 {
432 /* Parse and verify validity of the local address. */
433 struct ovsdb_error *error = raft_address_validate(local_address);
434 if (error) {
435 return error;
436 }
437
438 /* Create log file. */
439 struct ovsdb_log *log;
440 error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
441 -1, &log);
442 if (error) {
443 return error;
444 }
445
446 /* Write log file. */
447 struct raft_header h = {
448 .sid = uuid_random(),
449 .cid = uuid_random(),
450 .name = xstrdup(name),
451 .local_address = xstrdup(local_address),
452 .joining = false,
453 .remote_addresses = SSET_INITIALIZER(&h.remote_addresses),
454 .snap_index = 1,
455 .snap = {
456 .term = 1,
457 .data = json_nullable_clone(data),
458 .eid = uuid_random(),
459 .servers = json_object_create(),
460 },
461 };
462 shash_add_nocopy(json_object(h.snap.servers),
463 xasprintf(UUID_FMT, UUID_ARGS(&h.sid)),
464 json_string_create(local_address));
465 error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
466 raft_header_uninit(&h);
467 if (!error) {
468 error = ovsdb_log_commit_block(log);
469 }
470 ovsdb_log_close(log);
471
472 return error;
473 }
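
/* Illustrative use of raft_create_cluster(), a sketch under assumptions
 * rather than code taken from this file; the file name, schema name, address,
 * and the make_initial_data() helper are hypothetical. Note that
 * raft_create_cluster() clones 'data', so the caller still owns it:
 *
 *     struct json *data = make_initial_data();
 *     struct ovsdb_error *error
 *         = raft_create_cluster("db.clustered", "MY_SCHEMA",
 *                               "tcp:10.0.0.1:6644", data);
 *     json_destroy(data);
 *     if (error) {
 *         ...report the error...
 *     }
 */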
474
475 /* Creates a database file that represents a new server in an existing Raft
476 * cluster.
477 *
478 * Creates the local copy of the cluster's log in 'file_name', which must not
479 * already exist. Gives it the name 'name', which must be the same name
480 * passed in to raft_create_cluster() earlier.
481 *
482 * 'cid' is optional. If specified, the new server will join only the cluster
483 * with the given cluster ID.
484 *
485 * The new server is located at 'local_address', which must take one of the
486 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
487 * square bracket enclosed IPv6 address and PORT is a TCP port number.
488 *
489 * Joining the cluster requires contacting it. Thus, 'remote_addresses'
490 * specifies the addresses of existing servers in the cluster. One server out
491 * of the existing cluster is sufficient, as long as that server is reachable
492 * and not partitioned from the current cluster leader. If multiple servers
493 * from the cluster are specified, then it is sufficient for any of them to
494 * meet this criterion.
495 *
496 * This only creates the on-disk file and does no network access. Use
497 * raft_open() to start operating the new server. (Until this happens, the
498 * new server has not joined the cluster.)
499 *
500 * Returns null if successful, otherwise an ovsdb_error describing the
501 * problem. */
502 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
503 raft_join_cluster(const char *file_name,
504 const char *name, const char *local_address,
505 const struct sset *remote_addresses,
506 const struct uuid *cid)
507 {
508 ovs_assert(!sset_is_empty(remote_addresses));
509
510 /* Parse and verify validity of the addresses. */
511 struct ovsdb_error *error = raft_address_validate(local_address);
512 if (error) {
513 return error;
514 }
515 const char *addr;
516 SSET_FOR_EACH (addr, remote_addresses) {
517 error = raft_address_validate(addr);
518 if (error) {
519 return error;
520 }
521 if (!strcmp(addr, local_address)) {
522 return ovsdb_error(NULL, "remote addresses cannot be the same "
523 "as the local address");
524 }
525 }
526
527 /* Verify validity of the cluster ID (if provided). */
528 if (cid && uuid_is_zero(cid)) {
529 return ovsdb_error(NULL, "all-zero UUID is not valid cluster ID");
530 }
531
532 /* Create log file. */
533 struct ovsdb_log *log;
534 error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
535 -1, &log);
536 if (error) {
537 return error;
538 }
539
540 /* Write log file. */
541 struct raft_header h = {
542 .sid = uuid_random(),
543 .cid = cid ? *cid : UUID_ZERO,
544 .name = xstrdup(name),
545 .local_address = xstrdup(local_address),
546 .joining = true,
547 /* No snapshot yet. */
548 };
549 sset_clone(&h.remote_addresses, remote_addresses);
550 error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
551 raft_header_uninit(&h);
552 if (!error) {
553 error = ovsdb_log_commit_block(log);
554 }
555 ovsdb_log_close(log);
556
557 return error;
558 }
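
/* Illustrative use of raft_join_cluster(), likewise a hypothetical sketch
 * (addresses and names are made up). Passing NULL for 'cid' joins whichever
 * cluster the remotes belong to:
 *
 *     struct sset remotes = SSET_INITIALIZER(&remotes);
 *     sset_add(&remotes, "tcp:10.0.0.1:6644");
 *     sset_add(&remotes, "tcp:10.0.0.2:6644");
 *     struct ovsdb_error *error
 *         = raft_join_cluster("db2.clustered", "MY_SCHEMA",
 *                             "tcp:10.0.0.3:6644", &remotes, NULL);
 *     sset_destroy(&remotes);
 */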
559
560 /* Reads the initial header record from 'log', which must be a Raft clustered
561 * database log, and populates '*md' with the information read from it. The
562 * caller must eventually destroy 'md' with raft_metadata_destroy().
563 *
564 * On success, returns NULL. On failure, returns an error that the caller must
565 * eventually destroy and zeros '*md'. */
566 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
567 raft_read_metadata(struct ovsdb_log *log, struct raft_metadata *md)
568 {
569 struct raft *raft = raft_alloc();
570 raft->log = log;
571
572 struct ovsdb_error *error = raft_read_header(raft);
573 if (!error) {
574 md->sid = raft->sid;
575 md->name = xstrdup(raft->name);
576 md->local = xstrdup(raft->local_address);
577 md->cid = raft->cid;
578 } else {
579 memset(md, 0, sizeof *md);
580 }
581
582 raft->log = NULL;
583 raft_close(raft);
584 return error;
585 }
586
587 /* Frees the metadata in 'md'. */
588 void
589 raft_metadata_destroy(struct raft_metadata *md)
590 {
591 if (md) {
592 free(md->name);
593 free(md->local);
594 }
595 }
596
597 static const struct raft_entry *
598 raft_get_entry(const struct raft *raft, uint64_t index)
599 {
600 ovs_assert(index >= raft->log_start);
601 ovs_assert(index < raft->log_end);
602 return &raft->entries[index - raft->log_start];
603 }
604
605 static uint64_t
606 raft_get_term(const struct raft *raft, uint64_t index)
607 {
608 return (index == raft->log_start - 1
609 ? raft->snap.term
610 : raft_get_entry(raft, index)->term);
611 }
612
613 static const struct json *
614 raft_servers_for_index(const struct raft *raft, uint64_t index)
615 {
616 ovs_assert(index >= raft->log_start - 1);
617 ovs_assert(index < raft->log_end);
618
619 const struct json *servers = raft->snap.servers;
620 for (uint64_t i = raft->log_start; i <= index; i++) {
621 const struct raft_entry *e = raft_get_entry(raft, i);
622 if (e->servers) {
623 servers = e->servers;
624 }
625 }
626 return servers;
627 }
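
/* Worked example (illustrative): if the snapshot's configuration is C0 and
 * log entries 3 and 7 carry new server configurations C1 and C2, then
 * raft_servers_for_index(raft, 5) returns C1, the latest configuration at or
 * before index 5, while raft_servers_for_index(raft, 7) returns C2. */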
628
629 static void
630 raft_set_servers(struct raft *raft, const struct hmap *new_servers,
631 enum vlog_level level)
632 {
633 struct raft_server *s, *next;
634 HMAP_FOR_EACH_SAFE (s, next, hmap_node, &raft->servers) {
635 if (!raft_server_find(new_servers, &s->sid)) {
636 ovs_assert(s != raft->remove_server);
637
638 hmap_remove(&raft->servers, &s->hmap_node);
639 VLOG(level, "server %s removed from configuration", s->nickname);
640 raft_server_destroy(s);
641 }
642 }
643
644 HMAP_FOR_EACH_SAFE (s, next, hmap_node, new_servers) {
645 if (!raft_find_server(raft, &s->sid)) {
646 VLOG(level, "server %s added to configuration", s->nickname);
647
648 struct raft_server *new
649 = raft_server_add(&raft->servers, &s->sid, s->address);
650 raft_server_init_leader(raft, new);
651 }
652 }
653 }
654
655 static uint64_t
656 raft_add_entry(struct raft *raft,
657 uint64_t term, struct json *data, const struct uuid *eid,
658 struct json *servers, uint64_t election_timer)
659 {
660 if (raft->log_end - raft->log_start >= raft->allocated_log) {
661 raft->entries = x2nrealloc(raft->entries, &raft->allocated_log,
662 sizeof *raft->entries);
663 }
664
665 uint64_t index = raft->log_end++;
666 struct raft_entry *entry = &raft->entries[index - raft->log_start];
667 entry->term = term;
668 entry->data = data;
669 entry->eid = eid ? *eid : UUID_ZERO;
670 entry->servers = servers;
671 entry->election_timer = election_timer;
672 return index;
673 }
674
675 /* Writes a RAFT_REC_ENTRY record for 'term', 'data', 'eid', 'servers',
676 * 'election_timer' to 'raft''s log and returns an error indication. */
677 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
678 raft_write_entry(struct raft *raft, uint64_t term, struct json *data,
679 const struct uuid *eid, struct json *servers,
680 uint64_t election_timer)
681 {
682 struct raft_record r = {
683 .type = RAFT_REC_ENTRY,
684 .term = term,
685 .entry = {
686 .index = raft_add_entry(raft, term, data, eid, servers,
687 election_timer),
688 .data = data,
689 .servers = servers,
690 .election_timer = election_timer,
691 .eid = eid ? *eid : UUID_ZERO,
692 },
693 };
694 return ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r));
695 }
696
697 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
698 raft_write_state(struct ovsdb_log *log,
699 uint64_t term, const struct uuid *vote)
700 {
701 struct raft_record r = { .term = term };
702 if (vote && !uuid_is_zero(vote)) {
703 r.type = RAFT_REC_VOTE;
704 r.sid = *vote;
705 } else {
706 r.type = RAFT_REC_TERM;
707 }
708 return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
709 }
710
711 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
712 raft_apply_record(struct raft *raft, unsigned long long int rec_idx,
713 const struct raft_record *r)
714 {
715 /* Apply "term", which is present in most kinds of records (and otherwise
716 * 0).
717 *
718 * A Raft leader can replicate entries from previous terms to the other
719 * servers in the cluster, retaining the original terms on those entries
720 * (see section 3.6.2 "Committing entries from previous terms" for more
721 * information), so it's OK for the term in a log record to precede the
722 * current term. */
723 if (r->term > raft->term) {
724 raft->term = raft->synced_term = r->term;
725 raft->vote = raft->synced_vote = UUID_ZERO;
726 }
727
728 switch (r->type) {
729 case RAFT_REC_ENTRY:
730 if (r->entry.index < raft->commit_index) {
731 return ovsdb_error(NULL, "record %llu attempts to truncate log "
732 "from %"PRIu64" to %"PRIu64" entries, but "
733 "commit index is already %"PRIu64,
734 rec_idx, raft->log_end, r->entry.index,
735 raft->commit_index);
736 } else if (r->entry.index > raft->log_end) {
737 return ovsdb_error(NULL, "record %llu with index %"PRIu64" skips "
738 "past expected index %"PRIu64,
739 rec_idx, r->entry.index, raft->log_end);
740 }
741
742 if (r->entry.index < raft->log_end) {
743 /* This can happen, but it is notable. */
744 VLOG_DBG("record %llu truncates log from %"PRIu64" to %"PRIu64
745 " entries", rec_idx, raft->log_end, r->entry.index);
746 raft_truncate(raft, r->entry.index);
747 }
748
749 uint64_t prev_term = (raft->log_end > raft->log_start
750 ? raft->entries[raft->log_end
751 - raft->log_start - 1].term
752 : raft->snap.term);
753 if (r->term < prev_term) {
754 return ovsdb_error(NULL, "record %llu with index %"PRIu64" term "
755 "%"PRIu64" precedes previous entry's term "
756 "%"PRIu64,
757 rec_idx, r->entry.index, r->term, prev_term);
758 }
759
760 raft->log_synced = raft_add_entry(
761 raft, r->term,
762 json_nullable_clone(r->entry.data), &r->entry.eid,
763 json_nullable_clone(r->entry.servers),
764 r->entry.election_timer);
765 return NULL;
766
767 case RAFT_REC_TERM:
768 return NULL;
769
770 case RAFT_REC_VOTE:
771 if (r->term < raft->term) {
772 return ovsdb_error(NULL, "record %llu votes for term %"PRIu64" "
773 "but current term is %"PRIu64,
774 rec_idx, r->term, raft->term);
775 } else if (!uuid_is_zero(&raft->vote)
776 && !uuid_equals(&raft->vote, &r->sid)) {
777 return ovsdb_error(NULL, "record %llu votes for "SID_FMT" in term "
778 "%"PRIu64" but a previous record for the "
779 "same term voted for "SID_FMT, rec_idx,
780 SID_ARGS(&raft->vote), r->term,
781 SID_ARGS(&r->sid));
782 } else {
783 raft->vote = raft->synced_vote = r->sid;
784 return NULL;
785 }
786 break;
787
788 case RAFT_REC_NOTE:
789 if (!strcmp(r->note, "left")) {
790 return ovsdb_error(NULL, "record %llu indicates server has left "
791 "the cluster; it cannot be added back (use "
792 "\"ovsdb-tool join-cluster\" to add a new "
793 "server)", rec_idx);
794 }
795 return NULL;
796
797 case RAFT_REC_COMMIT_INDEX:
798 if (r->commit_index < raft->commit_index) {
799 return ovsdb_error(NULL, "record %llu regresses commit index "
800 "from %"PRIu64 " to %"PRIu64,
801 rec_idx, raft->commit_index, r->commit_index);
802 } else if (r->commit_index >= raft->log_end) {
803 return ovsdb_error(NULL, "record %llu advances commit index to "
804 "%"PRIu64 " but last log index is %"PRIu64,
805 rec_idx, r->commit_index, raft->log_end - 1);
806 } else {
807 raft->commit_index = r->commit_index;
808 return NULL;
809 }
810 break;
811
812 case RAFT_REC_LEADER:
813 /* XXX we could use this to take back leadership for quick restart */
814 return NULL;
815
816 default:
817 OVS_NOT_REACHED();
818 }
819 }
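
/* Summary of the RAFT_REC_ENTRY replay rules above (illustrative):
 *
 *     r->entry.index <  commit_index   -> error (would truncate committed data)
 *     r->entry.index >  log_end        -> error (gap in the log)
 *     r->entry.index <  log_end        -> truncate the tail, then append
 *     r->entry.index == log_end        -> plain append
 *
 * plus a check that the entry's term does not precede the term of the entry
 * before it. */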
820
821 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
822 raft_read_header(struct raft *raft)
823 {
824 /* Read header record. */
825 struct json *json;
826 struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
827 if (error || !json) {
828 /* Report error or end-of-file. */
829 return error;
830 }
831 ovsdb_log_mark_base(raft->log);
832
833 struct raft_header h;
834 error = raft_header_from_json(&h, json);
835 json_destroy(json);
836 if (error) {
837 return error;
838 }
839
840 raft->sid = h.sid;
841 raft->cid = h.cid;
842 raft->name = xstrdup(h.name);
843 raft->local_address = xstrdup(h.local_address);
844 raft->local_nickname = raft_address_to_nickname(h.local_address, &h.sid);
845 raft->joining = h.joining;
846
847 if (h.joining) {
848 sset_clone(&raft->remote_addresses, &h.remote_addresses);
849 } else {
850 raft_entry_clone(&raft->snap, &h.snap);
851 raft->log_start = raft->log_end = h.snap_index + 1;
852 raft->commit_index = h.snap_index;
853 raft->last_applied = h.snap_index - 1;
854 }
855
856 raft_header_uninit(&h);
857
858 return NULL;
859 }
860
861 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
862 raft_read_log(struct raft *raft)
863 {
864 for (unsigned long long int i = 1; ; i++) {
865 struct json *json;
866 struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
867 if (!json) {
868 if (error) {
869 /* We assume that the error is due to a partial write while
870 * appending to the file before a crash, so log it and
871 * continue. */
872 char *error_string = ovsdb_error_to_string_free(error);
873 VLOG_WARN("%s", error_string);
874 free(error_string);
875 error = NULL;
876 }
877 break;
878 }
879
880 struct raft_record r;
881 error = raft_record_from_json(&r, json);
882 if (!error) {
883 error = raft_apply_record(raft, i, &r);
884 raft_record_uninit(&r);
885 }
886 if (error) {
887 return ovsdb_wrap_error(error, "error reading record %llu from "
888 "%s log", i, raft->name);
889 }
890 }
891
892 /* Set the most recent servers. */
893 raft_get_servers_from_log(raft, VLL_DBG);
894
895 /* Set the most recent election_timer. */
896 raft_get_election_timer_from_log(raft);
897
898 return NULL;
899 }
900
901 static void
902 raft_reset_election_timer(struct raft *raft)
903 {
904 unsigned int duration = (raft->election_timer
905 + random_range(ELECTION_RANGE_MSEC));
906 raft->election_base = time_msec();
907 if (failure_test == FT_DELAY_ELECTION) {
908 /* Slow down this node so that it won't win the next election. */
909 duration += raft->election_timer;
910 }
911 raft->election_timeout = raft->election_base + duration;
912 }
913
914 static void
915 raft_reset_ping_timer(struct raft *raft)
916 {
917 raft->ping_timeout = time_msec() + raft->election_timer / 3;
918 }
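
/* Worked example of the two timers above, using the default election_timer
 * of ELECTION_BASE_MSEC (1000 ms); illustrative only:
 *
 *     - raft_reset_election_timer(): the next election is scheduled at a
 *       random point between 1000 ms and 2000 ms after 'election_base' (the
 *       last heartbeat), since random_range(ELECTION_RANGE_MSEC) adds up to
 *       another 1000 ms. Under the FT_DELAY_ELECTION failure test a further
 *       1000 ms is added so that this node tends to lose the next election.
 *
 *     - raft_reset_ping_timer(): a leader sends heartbeats roughly every
 *       election_timer / 3 = 333 ms, so followers normally hear from the
 *       leader well before their own election timeout expires. */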
919
920 static void
921 raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
922 const struct uuid *sid, bool incoming)
923 {
924 struct raft_conn *conn = xzalloc(sizeof *conn);
925 ovs_list_push_back(&raft->conns, &conn->list_node);
926 conn->js = js;
927 if (sid) {
928 conn->sid = *sid;
929 }
930 conn->nickname = raft_address_to_nickname(jsonrpc_session_get_name(js),
931 &conn->sid);
932 conn->incoming = incoming;
933 conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
934 }
935
936 /* Starts the local server in an existing Raft cluster, using the local copy of
937 * the cluster's log in 'file_name'. Takes ownership of 'log', whether
938 * successful or not. */
939 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
940 raft_open(struct ovsdb_log *log, struct raft **raftp)
941 {
942 struct raft *raft = raft_alloc();
943 raft->log = log;
944
945 struct ovsdb_error *error = raft_read_header(raft);
946 if (error) {
947 goto error;
948 }
949
950 if (!raft->joining) {
951 error = raft_read_log(raft);
952 if (error) {
953 goto error;
954 }
955
956 /* Find our own server. */
957 if (!raft_find_server(raft, &raft->sid)) {
958 error = ovsdb_error(NULL, "server does not belong to cluster");
959 goto error;
960 }
961
962 /* If there's only one server, start an election right away so that the
963 * cluster bootstraps quickly. */
964 if (hmap_count(&raft->servers) == 1) {
965 raft_start_election(raft, false);
966 }
967 } else {
968 raft->join_timeout = time_msec() + 1000;
969 }
970
971 raft_reset_ping_timer(raft);
972 raft_reset_election_timer(raft);
973
974 *raftp = raft;
975 hmap_insert(&all_rafts, &raft->hmap_node, hash_string(raft->name, 0));
976 return NULL;
977
978 error:
979 raft_close(raft);
980 *raftp = NULL;
981 return error;
982 }
983
984 /* Returns the name of 'raft', which in OVSDB is the database schema name. */
985 const char *
986 raft_get_name(const struct raft *raft)
987 {
988 return raft->name;
989 }
990
991 /* Returns the cluster ID of 'raft'. If 'raft' has not yet completed joining
992 * its cluster, then 'cid' will be all-zeros (unless the administrator
993 * specified a cluster ID when running "ovsdb-tool join-cluster").
994 *
995 * Each cluster has a unique cluster ID. */
996 const struct uuid *
997 raft_get_cid(const struct raft *raft)
998 {
999 return &raft->cid;
1000 }
1001
1002 /* Returns the server ID of 'raft'. Each server has a unique server ID. */
1003 const struct uuid *
1004 raft_get_sid(const struct raft *raft)
1005 {
1006 return &raft->sid;
1007 }
1008
1009 /* Returns true if 'raft' has completed joining its cluster, has not left or
1010 * initiated leaving the cluster, does not have failed disk storage, and is
1011 * apparently connected to the leader in a healthy way (or is itself the
1012 * leader).
1013 *
1014 * If 'raft' is candidate:
1015 * a) if it is the first round of election, consider it as connected, hoping
1016 * it will successfully elect a new leader soon.
1017 * b) if it is already retrying, consider it as disconnected (so that clients
1018 * may decide to reconnect to other members). */
1019 bool
1020 raft_is_connected(const struct raft *raft)
1021 {
1022 bool ret = (!raft->candidate_retrying
1023 && !raft->joining
1024 && !raft->leaving
1025 && !raft->left
1026 && !raft->failed);
1027 VLOG_DBG("raft_is_connected: %s", ret ? "true" : "false");
1028 return ret;
1029 }
1030
1031 /* Returns true if 'raft' is the cluster leader. */
1032 bool
1033 raft_is_leader(const struct raft *raft)
1034 {
1035 return raft->role == RAFT_LEADER;
1036 }
1037
1038 /* Returns true if 'raft' is in the process of joining its cluster. */
1039 bool
1040 raft_is_joining(const struct raft *raft)
1041 {
1042 return raft->joining;
1043 }
1044
1045 /* Only returns *connected* connections. */
1046 static struct raft_conn *
1047 raft_find_conn_by_sid(struct raft *raft, const struct uuid *sid)
1048 {
1049 if (!uuid_is_zero(sid)) {
1050 struct raft_conn *conn;
1051 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1052 if (uuid_equals(sid, &conn->sid)
1053 && jsonrpc_session_is_connected(conn->js)) {
1054 return conn;
1055 }
1056 }
1057 }
1058 return NULL;
1059 }
1060
1061 static struct raft_conn *
1062 raft_find_conn_by_address(struct raft *raft, const char *address)
1063 {
1064 struct raft_conn *conn;
1065 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1066 if (!strcmp(jsonrpc_session_get_name(conn->js), address)) {
1067 return conn;
1068 }
1069 }
1070 return NULL;
1071 }
1072
1073 static void OVS_PRINTF_FORMAT(3, 4)
1074 raft_record_note(struct raft *raft, const char *note,
1075 const char *comment_format, ...)
1076 {
1077 va_list args;
1078 va_start(args, comment_format);
1079 char *comment = xvasprintf(comment_format, args);
1080 va_end(args);
1081
1082 struct raft_record r = {
1083 .type = RAFT_REC_NOTE,
1084 .comment = comment,
1085 .note = CONST_CAST(char *, note),
1086 };
1087 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
1088
1089 free(comment);
1090 }
1091
1092 /* If we're leader, try to transfer leadership to another server, logging
1093 * 'reason' as the human-readable reason (it should be a phrase suitable for
1094 * following "because"). */
1095 void
1096 raft_transfer_leadership(struct raft *raft, const char *reason)
1097 {
1098 if (raft->role != RAFT_LEADER) {
1099 return;
1100 }
1101
1102 struct raft_server *s;
1103 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
1104 if (!uuid_equals(&raft->sid, &s->sid)
1105 && s->phase == RAFT_PHASE_STABLE) {
1106 struct raft_conn *conn = raft_find_conn_by_sid(raft, &s->sid);
1107 if (!conn) {
1108 continue;
1109 }
1110
1111 union raft_rpc rpc = {
1112 .become_leader = {
1113 .common = {
1114 .comment = CONST_CAST(char *, reason),
1115 .type = RAFT_RPC_BECOME_LEADER,
1116 .sid = s->sid,
1117 },
1118 .term = raft->term,
1119 }
1120 };
1121 raft_send_to_conn(raft, &rpc, conn);
1122
1123 raft_record_note(raft, "transfer leadership",
1124 "transferring leadership to %s because %s",
1125 s->nickname, reason);
1126 break;
1127 }
1128 }
1129 }
1130
1131 /* Send a RemoveServerRequest to the rest of the servers in the cluster.
1132 *
1133 * If we know which server is the leader, we can just send the request to it.
1134 * However, we might not know which server is the leader, and we might never
1135 * find out if the remove request was actually previously committed by a
1136 * majority of the servers (because in that case the new leader will not send
1137 * AppendRequests or heartbeats to us). Therefore, we instead send
1138 * RemoveRequests to every server. This theoretically has the same problem, if
1139 * the current cluster leader was not previously a member of the cluster, but
1140 * it seems likely to be more robust in practice. */
1141 static void
1142 raft_send_remove_server_requests(struct raft *raft)
1143 {
1144 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
1145 VLOG_INFO_RL(&rl, "sending remove request (joining=%s, leaving=%s)",
1146 raft->joining ? "true" : "false",
1147 raft->leaving ? "true" : "false");
1148 const struct raft_server *s;
1149 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
1150 if (!uuid_equals(&s->sid, &raft->sid)) {
1151 union raft_rpc rpc = (union raft_rpc) {
1152 .remove_server_request = {
1153 .common = {
1154 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
1155 .sid = s->sid,
1156 },
1157 .sid = raft->sid,
1158 },
1159 };
1160 raft_send(raft, &rpc);
1161 }
1162 }
1163
1164 raft->leave_timeout = time_msec() + raft->election_timer;
1165 }
1166
1167 /* Attempts to start 'raft' leaving its cluster. The caller can check progress
1168 * using raft_is_leaving() and raft_left(). */
1169 void
1170 raft_leave(struct raft *raft)
1171 {
1172 if (raft->joining || raft->failed || raft->leaving || raft->left) {
1173 return;
1174 }
1175 VLOG_INFO(SID_FMT": starting to leave cluster "CID_FMT,
1176 SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
1177 raft->leaving = true;
1178 raft_transfer_leadership(raft, "this server is leaving the cluster");
1179 raft_become_follower(raft);
1180 raft_send_remove_server_requests(raft);
1181 raft->leave_timeout = time_msec() + raft->election_timer;
1182 }
1183
1184 /* Returns true if 'raft' is currently attempting to leave its cluster. */
1185 bool
1186 raft_is_leaving(const struct raft *raft)
1187 {
1188 return raft->leaving;
1189 }
1190
1191 /* Returns true if 'raft' successfully left its cluster. */
1192 bool
1193 raft_left(const struct raft *raft)
1194 {
1195 return raft->left;
1196 }
1197
1198 /* Returns true if 'raft' has experienced a disk I/O failure. When this
1199 * returns true, only closing and reopening 'raft' allows for recovery. */
1200 bool
1201 raft_failed(const struct raft *raft)
1202 {
1203 return raft->failed;
1204 }
1205
1206 /* Forces 'raft' to attempt to take leadership of the cluster by deposing the
1207 * current leader. */
1208 void
1209 raft_take_leadership(struct raft *raft)
1210 {
1211 if (raft->role != RAFT_LEADER) {
1212 raft_start_election(raft, true);
1213 }
1214 }
1215
1216 /* Closes everything owned by 'raft' that might be visible outside the process:
1217 * network connections, commands, etc. This is part of closing 'raft'; it is
1218 * also used if 'raft' has failed in an unrecoverable way. */
1219 static void
1220 raft_close__(struct raft *raft)
1221 {
1222 if (!hmap_node_is_null(&raft->hmap_node)) {
1223 hmap_remove(&all_rafts, &raft->hmap_node);
1224 hmap_node_nullify(&raft->hmap_node);
1225 }
1226
1227 raft_complete_all_commands(raft, RAFT_CMD_SHUTDOWN);
1228
1229 struct raft_server *rs = raft->remove_server;
1230 if (rs) {
1231 raft_send_remove_server_reply__(raft, &rs->sid, &rs->requester_sid,
1232 rs->requester_conn, false,
1233 RAFT_SERVER_SHUTDOWN);
1234 raft_server_destroy(raft->remove_server);
1235 raft->remove_server = NULL;
1236 }
1237
1238 struct raft_conn *conn, *next;
1239 LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
1240 raft_conn_close(conn);
1241 }
1242 }
1243
1244 /* Closes and frees 'raft'.
1245 *
1246 * A server's cluster membership is independent of whether the server is
1247 * actually running. When a server that is a member of a cluster closes, the
1248 * cluster treats this as a server failure. */
1249 void
1250 raft_close(struct raft *raft)
1251 {
1252 if (!raft) {
1253 return;
1254 }
1255
1256 raft_transfer_leadership(raft, "this server is shutting down");
1257
1258 raft_close__(raft);
1259
1260 ovsdb_log_close(raft->log);
1261
1262 raft_servers_destroy(&raft->servers);
1263
1264 for (uint64_t index = raft->log_start; index < raft->log_end; index++) {
1265 struct raft_entry *e = &raft->entries[index - raft->log_start];
1266 raft_entry_uninit(e);
1267 }
1268 free(raft->entries);
1269
1270 raft_entry_uninit(&raft->snap);
1271
1272 raft_waiters_destroy(raft);
1273
1274 raft_servers_destroy(&raft->add_servers);
1275
1276 hmap_destroy(&raft->commands);
1277
1278 pstream_close(raft->listener);
1279
1280 sset_destroy(&raft->remote_addresses);
1281 free(raft->local_address);
1282 free(raft->local_nickname);
1283 free(raft->name);
1284
1285 free(raft);
1286 }
1287
1288 static bool
1289 raft_conn_receive(struct raft *raft, struct raft_conn *conn,
1290 union raft_rpc *rpc)
1291 {
1292 struct jsonrpc_msg *msg = jsonrpc_session_recv(conn->js);
1293 if (!msg) {
1294 return false;
1295 }
1296
1297 struct ovsdb_error *error = raft_rpc_from_jsonrpc(&raft->cid, &raft->sid,
1298 msg, rpc);
1299 jsonrpc_msg_destroy(msg);
1300 if (error) {
1301 char *s = ovsdb_error_to_string_free(error);
1302 VLOG_INFO("%s: %s", jsonrpc_session_get_name(conn->js), s);
1303 free(s);
1304 return false;
1305 }
1306
1307 if (uuid_is_zero(&conn->sid)) {
1308 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
1309 conn->sid = rpc->common.sid;
1310 VLOG_INFO_RL(&rl, "%s: learned server ID "SID_FMT,
1311 jsonrpc_session_get_name(conn->js), SID_ARGS(&conn->sid));
1312 } else if (!uuid_equals(&conn->sid, &rpc->common.sid)) {
1313 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1314 VLOG_WARN_RL(&rl, "%s: ignoring message with unexpected server ID "
1315 SID_FMT" (expected "SID_FMT")",
1316 jsonrpc_session_get_name(conn->js),
1317 SID_ARGS(&rpc->common.sid), SID_ARGS(&conn->sid));
1318 raft_rpc_uninit(rpc);
1319 return false;
1320 }
1321
1322 const char *address = (rpc->type == RAFT_RPC_HELLO_REQUEST
1323 ? rpc->hello_request.address
1324 : rpc->type == RAFT_RPC_ADD_SERVER_REQUEST
1325 ? rpc->add_server_request.address
1326 : NULL);
1327 if (address) {
1328 char *new_nickname = raft_address_to_nickname(address, &conn->sid);
1329 if (strcmp(conn->nickname, new_nickname)) {
1330 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
1331 VLOG_INFO_RL(&rl, "%s: learned remote address %s",
1332 jsonrpc_session_get_name(conn->js), address);
1333
1334 free(conn->nickname);
1335 conn->nickname = new_nickname;
1336 } else {
1337 free(new_nickname);
1338 }
1339 }
1340
1341 return true;
1342 }
1343
1344 static const char *
1345 raft_get_nickname(const struct raft *raft, const struct uuid *sid,
1346 char buf[SID_LEN + 1], size_t bufsize)
1347 {
1348 if (uuid_equals(sid, &raft->sid)) {
1349 return raft->local_nickname;
1350 }
1351
1352 const char *s = raft_servers_get_nickname__(&raft->servers, sid);
1353 if (s) {
1354 return s;
1355 }
1356
1357 return raft_servers_get_nickname(&raft->add_servers, sid, buf, bufsize);
1358 }
1359
1360 static void
1361 log_rpc(const union raft_rpc *rpc, const char *direction,
1362 const struct raft_conn *conn, int line_number)
1363 {
1364 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(600, 600);
1365 if (!raft_rpc_is_heartbeat(rpc) && !VLOG_DROP_DBG(&rl)) {
1366 struct ds s = DS_EMPTY_INITIALIZER;
1367 if (line_number) {
1368 ds_put_format(&s, "raft.c:%d ", line_number);
1369 }
1370 ds_put_format(&s, "%s%s ", direction, conn->nickname);
1371 raft_rpc_format(rpc, &s);
1372 VLOG_DBG("%s", ds_cstr(&s));
1373 ds_destroy(&s);
1374 }
1375 }
1376
1377 static void
1378 raft_send_add_server_request(struct raft *raft, struct raft_conn *conn)
1379 {
1380 union raft_rpc rq = {
1381 .add_server_request = {
1382 .common = {
1383 .type = RAFT_RPC_ADD_SERVER_REQUEST,
1384 .sid = UUID_ZERO,
1385 .comment = NULL,
1386 },
1387 .address = raft->local_address,
1388 },
1389 };
1390 raft_send_to_conn(raft, &rq, conn);
1391 }
1392
1393 static void
1394 raft_conn_run(struct raft *raft, struct raft_conn *conn)
1395 {
1396 jsonrpc_session_run(conn->js);
1397
1398 unsigned int new_seqno = jsonrpc_session_get_seqno(conn->js);
1399 bool just_connected = (new_seqno != conn->js_seqno
1400 && jsonrpc_session_is_connected(conn->js));
1401 conn->js_seqno = new_seqno;
1402 if (just_connected) {
1403 if (raft->joining) {
1404 raft_send_add_server_request(raft, conn);
1405 } else if (raft->leaving) {
1406 union raft_rpc rq = {
1407 .remove_server_request = {
1408 .common = {
1409 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
1410 .sid = conn->sid,
1411 },
1412 .sid = raft->sid,
1413 },
1414 };
1415 raft_send_to_conn(raft, &rq, conn);
1416 } else {
1417 union raft_rpc rq = (union raft_rpc) {
1418 .hello_request = {
1419 .common = {
1420 .type = RAFT_RPC_HELLO_REQUEST,
1421 .sid = conn->sid,
1422 },
1423 .address = raft->local_address,
1424 },
1425 };
1426 raft_send_to_conn(raft, &rq, conn);
1427 }
1428 }
1429
1430 for (size_t i = 0; i < 50; i++) {
1431 union raft_rpc rpc;
1432 if (!raft_conn_receive(raft, conn, &rpc)) {
1433 break;
1434 }
1435
1436 log_rpc(&rpc, "<--", conn, 0);
1437 raft_handle_rpc(raft, &rpc);
1438 raft_rpc_uninit(&rpc);
1439 }
1440 }
1441
1442 static void
1443 raft_waiter_complete_rpc(struct raft *raft, const union raft_rpc *rpc)
1444 {
1445 uint64_t term = raft_rpc_get_term(rpc);
1446 if (term && term < raft->term) {
1447 /* Drop the message because it's for an expired term. */
1448 return;
1449 }
1450
1451 if (!raft_is_rpc_synced(raft, rpc)) {
1452 /* This is a bug. A reply message is deferred because some state in
1453 * the message, such as a term or index, has not been committed to
1454 * disk, and they should only be completed when that commit is done.
1455 * But this message is being completed before the commit is finished.
1456 * Complain, and hope that someone reports the bug. */
1457 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
1458 if (VLOG_DROP_ERR(&rl)) {
1459 return;
1460 }
1461
1462 struct ds s = DS_EMPTY_INITIALIZER;
1463
1464 if (term > raft->synced_term) {
1465 ds_put_format(&s, " because message term %"PRIu64" is "
1466 "past synced term %"PRIu64,
1467 term, raft->synced_term);
1468 }
1469
1470 uint64_t index = raft_rpc_get_min_sync_index(rpc);
1471 if (index > raft->log_synced) {
1472 ds_put_format(&s, " %s message index %"PRIu64" is past last "
1473 "synced index %"PRIu64,
1474 s.length ? "and" : "because",
1475 index, raft->log_synced);
1476 }
1477
1478 const struct uuid *vote = raft_rpc_get_vote(rpc);
1479 if (vote && !uuid_equals(vote, &raft->synced_vote)) {
1480 char buf1[SID_LEN + 1];
1481 char buf2[SID_LEN + 1];
1482 ds_put_format(&s, " %s vote %s differs from synced vote %s",
1483 s.length ? "and" : "because",
1484 raft_get_nickname(raft, vote, buf1, sizeof buf1),
1485 raft_get_nickname(raft, &raft->synced_vote,
1486 buf2, sizeof buf2));
1487 }
1488
1489 char buf[SID_LEN + 1];
1490 ds_put_format(&s, ": %s ",
1491 raft_get_nickname(raft, &rpc->common.sid,
1492 buf, sizeof buf));
1493 raft_rpc_format(rpc, &s);
1494 VLOG_ERR("internal error: deferred %s message completed "
1495 "but not ready to send%s",
1496 raft_rpc_type_to_string(rpc->type), ds_cstr(&s));
1497 ds_destroy(&s);
1498
1499 return;
1500 }
1501
1502 struct raft_conn *dst = raft_find_conn_by_sid(raft, &rpc->common.sid);
1503 if (dst) {
1504 raft_send_to_conn(raft, rpc, dst);
1505 }
1506 }
1507
1508 static void
1509 raft_waiter_complete(struct raft *raft, struct raft_waiter *w)
1510 {
1511 switch (w->type) {
1512 case RAFT_W_ENTRY:
1513 if (raft->role == RAFT_LEADER) {
1514 raft_update_our_match_index(raft, w->entry.index);
1515 }
1516 raft->log_synced = w->entry.index;
1517 break;
1518
1519 case RAFT_W_TERM:
1520 raft->synced_term = w->term.term;
1521 raft->synced_vote = w->term.vote;
1522 break;
1523
1524 case RAFT_W_RPC:
1525 raft_waiter_complete_rpc(raft, w->rpc);
1526 break;
1527 }
1528 }
1529
1530 static void
1531 raft_waiter_destroy(struct raft_waiter *w)
1532 {
1533 if (!w) {
1534 return;
1535 }
1536
1537 ovs_list_remove(&w->list_node);
1538
1539 switch (w->type) {
1540 case RAFT_W_ENTRY:
1541 case RAFT_W_TERM:
1542 break;
1543
1544 case RAFT_W_RPC:
1545 raft_rpc_uninit(w->rpc);
1546 free(w->rpc);
1547 break;
1548 }
1549 free(w);
1550 }
1551
1552 static void
1553 raft_waiters_run(struct raft *raft)
1554 {
1555 if (ovs_list_is_empty(&raft->waiters)) {
1556 return;
1557 }
1558
1559 uint64_t cur = ovsdb_log_commit_progress(raft->log);
1560 struct raft_waiter *w, *next;
1561 LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
1562 if (cur < w->commit_ticket) {
1563 break;
1564 }
1565 raft_waiter_complete(raft, w);
1566 raft_waiter_destroy(w);
1567 }
1568 }
1569
1570 static void
1571 raft_waiters_wait(struct raft *raft)
1572 {
1573 struct raft_waiter *w;
1574 LIST_FOR_EACH (w, list_node, &raft->waiters) {
1575 ovsdb_log_commit_wait(raft->log, w->commit_ticket);
1576 break;
1577 }
1578 }
1579
1580 static void
1581 raft_waiters_destroy(struct raft *raft)
1582 {
1583 struct raft_waiter *w, *next;
1584 LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
1585 raft_waiter_destroy(w);
1586 }
1587 }
1588
1589 static bool OVS_WARN_UNUSED_RESULT
1590 raft_set_term(struct raft *raft, uint64_t term, const struct uuid *vote)
1591 {
1592 struct ovsdb_error *error = raft_write_state(raft->log, term, vote);
1593 if (!raft_handle_write_error(raft, error)) {
1594 return false;
1595 }
1596
1597 struct raft_waiter *w = raft_waiter_create(raft, RAFT_W_TERM, true);
1598 raft->term = w->term.term = term;
1599 raft->vote = w->term.vote = vote ? *vote : UUID_ZERO;
1600 return true;
1601 }
1602
1603 static void
1604 raft_accept_vote(struct raft *raft, struct raft_server *s,
1605 const struct uuid *vote)
1606 {
1607 if (uuid_equals(&s->vote, vote)) {
1608 return;
1609 }
1610 if (!uuid_is_zero(&s->vote)) {
1611 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
1612 char buf1[SID_LEN + 1];
1613 char buf2[SID_LEN + 1];
1614 VLOG_WARN_RL(&rl, "server %s changed its vote from %s to %s",
1615 s->nickname,
1616 raft_get_nickname(raft, &s->vote, buf1, sizeof buf1),
1617 raft_get_nickname(raft, vote, buf2, sizeof buf2));
1618 }
1619 s->vote = *vote;
1620 if (uuid_equals(vote, &raft->sid)
1621 && ++raft->n_votes > hmap_count(&raft->servers) / 2) {
1622 raft_become_leader(raft);
1623 }
1624 }
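
/* Majority arithmetic used above (illustrative): with N servers in
 * 'raft->servers', becoming leader requires n_votes > N / 2 (integer
 * division), counting our own vote:
 *
 *     N = 1 -> 1 vote      N = 3 -> 2 votes
 *     N = 4 -> 3 votes     N = 5 -> 3 votes */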
1625
1626 static void
1627 raft_start_election(struct raft *raft, bool leadership_transfer)
1628 {
1629 if (raft->leaving) {
1630 return;
1631 }
1632
1633 struct raft_server *me = raft_find_server(raft, &raft->sid);
1634 if (!me) {
1635 return;
1636 }
1637
1638 if (!raft_set_term(raft, raft->term + 1, &raft->sid)) {
1639 return;
1640 }
1641
1642 ovs_assert(raft->role != RAFT_LEADER);
1643 raft->role = RAFT_CANDIDATE;
1644 /* If there was no leader elected since last election, we know we are
1645 * retrying now. */
1646 raft->candidate_retrying = !raft->had_leader;
1647 raft->had_leader = false;
1648
1649 raft->n_votes = 0;
1650
1651 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1652 if (!VLOG_DROP_INFO(&rl)) {
1653 long long int now = time_msec();
1654 if (now >= raft->election_timeout) {
1655 VLOG_INFO("term %"PRIu64": %lld ms timeout expired, "
1656 "starting election",
1657 raft->term, now - raft->election_base);
1658 } else {
1659 VLOG_INFO("term %"PRIu64": starting election", raft->term);
1660 }
1661 }
1662 raft_reset_election_timer(raft);
1663
1664 struct raft_server *peer;
1665 HMAP_FOR_EACH (peer, hmap_node, &raft->servers) {
1666 peer->vote = UUID_ZERO;
1667 if (uuid_equals(&raft->sid, &peer->sid)) {
1668 continue;
1669 }
1670
1671 union raft_rpc rq = {
1672 .vote_request = {
1673 .common = {
1674 .type = RAFT_RPC_VOTE_REQUEST,
1675 .sid = peer->sid,
1676 },
1677 .term = raft->term,
1678 .last_log_index = raft->log_end - 1,
1679 .last_log_term = (
1680 raft->log_end > raft->log_start
1681 ? raft->entries[raft->log_end - raft->log_start - 1].term
1682 : raft->snap.term),
1683 .leadership_transfer = leadership_transfer,
1684 },
1685 };
1686 raft_send(raft, &rq);
1687 }
1688
1689 /* Vote for ourselves. */
1690 raft_accept_vote(raft, me, &raft->sid);
1691 }
1692
1693 static void
1694 raft_open_conn(struct raft *raft, const char *address, const struct uuid *sid)
1695 {
1696 if (strcmp(address, raft->local_address)
1697 && !raft_find_conn_by_address(raft, address)) {
1698 raft_add_conn(raft, jsonrpc_session_open(address, true), sid, false);
1699 }
1700 }
1701
1702 static void
1703 raft_conn_close(struct raft_conn *conn)
1704 {
1705 jsonrpc_session_close(conn->js);
1706 ovs_list_remove(&conn->list_node);
1707 free(conn->nickname);
1708 free(conn);
1709 }
1710
1711 /* Returns true if 'conn' should stay open, false if it should be closed. */
1712 static bool
1713 raft_conn_should_stay_open(struct raft *raft, struct raft_conn *conn)
1714 {
1715 /* Close the connection if it's actually dead. If necessary, we'll
1716 * initiate a new session later. */
1717 if (!jsonrpc_session_is_alive(conn->js)) {
1718 return false;
1719 }
1720
1721 /* Keep incoming sessions. We trust each originator to decide when to
1722 * drop its own session. */
1723 if (conn->incoming) {
1724 return true;
1725 }
1726
1727 /* If we are joining the cluster, keep sessions to the remote addresses
1728 * that are supposed to be part of the cluster we're joining. */
1729 if (raft->joining && sset_contains(&raft->remote_addresses,
1730 jsonrpc_session_get_name(conn->js))) {
1731 return true;
1732 }
1733
1734 /* We have joined the cluster. If we did that "recently", then there is a
1735 * chance that we do not have the most recent server configuration log
1736 * entry. If so, it's a waste to disconnect from the servers that were in
1737 * remote_addresses and that will probably appear in the configuration,
1738 * just to reconnect to them a moment later when we do get the
1739 * configuration update. If we are not ourselves in the configuration,
1740 * then we know that there must be a new configuration coming up, so in
1741 * that case keep the connection. */
1742 if (!raft_find_server(raft, &raft->sid)) {
1743 return true;
1744 }
1745
1746 /* Keep the connection only if the server is part of the configuration. */
1747 return raft_find_server(raft, &conn->sid);
1748 }
1749
1750 /* Allows 'raft' to maintain the distributed log. Call this function as part
1751 * of the process's main loop. */
1752 void
1753 raft_run(struct raft *raft)
1754 {
1755 if (raft->left || raft->failed) {
1756 return;
1757 }
1758
1759 raft_waiters_run(raft);
1760
1761 if (!raft->listener && time_msec() >= raft->listen_backoff) {
1762 char *paddr = raft_make_address_passive(raft->local_address);
1763 int error = pstream_open(paddr, &raft->listener, DSCP_DEFAULT);
1764 if (error) {
1765 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1766 VLOG_WARN_RL(&rl, "%s: listen failed (%s)",
1767 paddr, ovs_strerror(error));
1768 raft->listen_backoff = time_msec() + 1000;
1769 }
1770 free(paddr);
1771 }
1772
1773 if (raft->listener) {
1774 struct stream *stream;
1775 int error = pstream_accept(raft->listener, &stream);
1776 if (!error) {
1777 raft_add_conn(raft, jsonrpc_session_open_unreliably(
1778 jsonrpc_open(stream), DSCP_DEFAULT), NULL,
1779 true);
1780 } else if (error != EAGAIN) {
1781 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
1782 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
1783 pstream_get_name(raft->listener),
1784 ovs_strerror(error));
1785 }
1786 }
1787
1788 /* Run RPCs for all open sessions. */
1789 struct raft_conn *conn;
1790 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1791 raft_conn_run(raft, conn);
1792 }
1793
1794 /* Close unneeded sessions. */
1795 struct raft_conn *next;
1796 LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
1797 if (!raft_conn_should_stay_open(raft, conn)) {
1798 raft_conn_close(conn);
1799 }
1800 }
1801
1802 /* Open needed sessions. */
1803 struct raft_server *server;
1804 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1805 raft_open_conn(raft, server->address, &server->sid);
1806 }
1807 if (raft->joining) {
1808 const char *address;
1809 SSET_FOR_EACH (address, &raft->remote_addresses) {
1810 raft_open_conn(raft, address, NULL);
1811 }
1812 }
1813
1814 if (!raft->joining && time_msec() >= raft->election_timeout) {
1815 if (raft->role == RAFT_LEADER) {
1816 /* If a majority of the followers replied, reset election_timeout
1817 * and clear each server's 'replied' flag. Otherwise, step down and
1818 * become a follower.
1819 *
1820 * Raft paper section 6.2: Leaders: A server might be in the leader
1821 * state, but if it isn’t the current leader, it could be
1822 * needlessly delaying client requests. For example, suppose a
1823 * leader is partitioned from the rest of the cluster, but it can
1824 * still communicate with a particular client. Without additional
1825 * mechanism, it could delay a request from that client forever,
1826 * being unable to replicate a log entry to any other servers.
1827 * Meanwhile, there might be another leader of a newer term that is
1828 * able to communicate with a majority of the cluster and would be
1829 * able to commit the client’s request. Thus, a leader in Raft
1830 * steps down if an election timeout elapses without a successful
1831 * round of heartbeats to a majority of its cluster; this allows
1832 * clients to retry their requests with another server. */
1833 int count = 0;
1834 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1835 if (server->replied) {
1836 count++;
1837 }
1838 }
1839 if (count >= hmap_count(&raft->servers) / 2) {
1840 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1841 server->replied = false;
1842 }
1843 raft_reset_election_timer(raft);
1844 } else {
1845 raft_become_follower(raft);
1846 raft_start_election(raft, false);
1847 }
1848 } else {
1849 raft_start_election(raft, false);
1850 }
1851
1852 }
1853
1854 if (raft->leaving && time_msec() >= raft->leave_timeout) {
1855 raft_send_remove_server_requests(raft);
1856 }
1857
1858 if (raft->joining && time_msec() >= raft->join_timeout) {
1859 raft->join_timeout = time_msec() + 1000;
1860 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1861 raft_send_add_server_request(raft, conn);
1862 }
1863 }
1864
1865 long long int now = time_msec();
1866 if (now >= raft->ping_timeout) {
1867 if (raft->role == RAFT_LEADER) {
1868 raft_send_heartbeats(raft);
1869 }
1870 /* Check whether any commands have timed out. The timeout is set to
1871 * twice the election base time so that commands can complete properly
1872 * during a leader election. E.g. if a leader crashes and the current
1873 * node with pending commands becomes the new leader, the pending
1874 * commands can still complete, as long as the crashed leader replicated
1875 * the transactions to a majority of followers before it crashed. */
1876 struct raft_command *cmd, *next_cmd;
1877 HMAP_FOR_EACH_SAFE (cmd, next_cmd, hmap_node, &raft->commands) {
1878 if (cmd->timestamp
1879 && now - cmd->timestamp > raft->election_timer * 2) {
1880 raft_command_complete(raft, cmd, RAFT_CMD_TIMEOUT);
1881 }
1882 }
1883 raft_reset_ping_timer(raft);
1884 }
1885
1886 /* Do this only at the end; if we did it as soon as we set raft->left or
1887 * raft->failed in handling the RemoveServerReply, then it could easily
1888 * cause references to freed memory in RPC sessions, etc. */
1889 if (raft->left || raft->failed) {
1890 raft_close__(raft);
1891 }
1892 }
1893
1894 static void
1895 raft_wait_session(struct jsonrpc_session *js)
1896 {
1897 if (js) {
1898 jsonrpc_session_wait(js);
1899 jsonrpc_session_recv_wait(js);
1900 }
1901 }
1902
1903 /* Causes the next call to poll_block() to wake up when 'raft' needs to do
1904 * something. */
1905 void
1906 raft_wait(struct raft *raft)
1907 {
1908 if (raft->left || raft->failed) {
1909 return;
1910 }
1911
1912 raft_waiters_wait(raft);
1913
1914 if (raft->listener) {
1915 pstream_wait(raft->listener);
1916 } else {
1917 poll_timer_wait_until(raft->listen_backoff);
1918 }
1919
1920 struct raft_conn *conn;
1921 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1922 raft_wait_session(conn->js);
1923 }
1924
1925 if (!raft->joining) {
1926 poll_timer_wait_until(raft->election_timeout);
1927 } else {
1928 poll_timer_wait_until(raft->join_timeout);
1929 }
1930 if (raft->leaving) {
1931 poll_timer_wait_until(raft->leave_timeout);
1932 }
1933 if (raft->role == RAFT_LEADER || !hmap_is_empty(&raft->commands)) {
1934 poll_timer_wait_until(raft->ping_timeout);
1935 }
1936 }
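/* Illustrative sketch of how a caller typically drives these functions from
 * its main loop (this is not code from this file; it assumes 'raft' was
 * obtained from raft_open() elsewhere):
 *
 *     for (;;) {
 *         raft_run(raft);
 *         ...do other per-iteration work...
 *         raft_wait(raft);
 *         poll_block();
 *     }
 */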
1937
1938 static struct raft_waiter *
1939 raft_waiter_create(struct raft *raft, enum raft_waiter_type type,
1940 bool start_commit)
1941 {
1942 struct raft_waiter *w = xzalloc(sizeof *w);
1943 ovs_list_push_back(&raft->waiters, &w->list_node);
1944 w->commit_ticket = start_commit ? ovsdb_log_commit_start(raft->log) : 0;
1945 w->type = type;
1946 return w;
1947 }
1948
1949 /* Returns a human-readable representation of 'status' (or NULL if 'status' is
1950 * invalid). */
1951 const char *
1952 raft_command_status_to_string(enum raft_command_status status)
1953 {
1954 switch (status) {
1955 case RAFT_CMD_INCOMPLETE:
1956 return "operation still in progress";
1957 case RAFT_CMD_SUCCESS:
1958 return "success";
1959 case RAFT_CMD_NOT_LEADER:
1960 return "not leader";
1961 case RAFT_CMD_BAD_PREREQ:
1962 return "prerequisite check failed";
1963 case RAFT_CMD_LOST_LEADERSHIP:
1964 return "lost leadership";
1965 case RAFT_CMD_SHUTDOWN:
1966 return "server shutdown";
1967 case RAFT_CMD_IO_ERROR:
1968 return "I/O error";
1969 case RAFT_CMD_TIMEOUT:
1970 return "timeout";
1971 default:
1972 return NULL;
1973 }
1974 }
1975
1976 /* Converts human-readable status in 's' into status code in '*statusp'.
1977 * Returns true if successful, false if 's' is unknown. */
1978 bool
1979 raft_command_status_from_string(const char *s,
1980 enum raft_command_status *statusp)
1981 {
1982 for (enum raft_command_status status = 0; ; status++) {
1983 const char *s2 = raft_command_status_to_string(status);
1984 if (!s2) {
1985 *statusp = 0;
1986 return false;
1987 } else if (!strcmp(s, s2)) {
1988 *statusp = status;
1989 return true;
1990 }
1991 }
1992 }
1993
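/* Returns the entry ID of the most recent log entry, at or before 'index',
 * that carries data, falling back to the snapshot's entry ID if no such log
 * entry remains. */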
1994 static const struct uuid *
1995 raft_get_eid(const struct raft *raft, uint64_t index)
1996 {
1997 for (; index >= raft->log_start; index--) {
1998 const struct raft_entry *e = raft_get_entry(raft, index);
1999 if (e->data) {
2000 return &e->eid;
2001 }
2002 }
2003 return &raft->snap.eid;
2004 }
2005
2006 const struct uuid *
2007 raft_current_eid(const struct raft *raft)
2008 {
2009 return raft_get_eid(raft, raft->log_end - 1);
2010 }
2011
2012 static struct raft_command *
2013 raft_command_create_completed(enum raft_command_status status)
2014 {
2015 ovs_assert(status != RAFT_CMD_INCOMPLETE);
2016
2017 struct raft_command *cmd = xzalloc(sizeof *cmd);
2018 cmd->n_refs = 1;
2019 cmd->status = status;
2020 return cmd;
2021 }
2022
2023 static struct raft_command *
2024 raft_command_create_incomplete(struct raft *raft, uint64_t index)
2025 {
2026 struct raft_command *cmd = xzalloc(sizeof *cmd);
2027 cmd->n_refs = 2; /* One for client, one for raft->commands. */
2028 cmd->index = index;
2029 cmd->status = RAFT_CMD_INCOMPLETE;
2030 hmap_insert(&raft->commands, &cmd->hmap_node, cmd->index);
2031 return cmd;
2032 }
2033
2034 static struct raft_command * OVS_WARN_UNUSED_RESULT
2035 raft_command_initiate(struct raft *raft,
2036 const struct json *data, const struct json *servers,
2037 uint64_t election_timer, const struct uuid *eid)
2038 {
2039 /* Write to local log. */
2040 uint64_t index = raft->log_end;
2041 if (!raft_handle_write_error(
2042 raft, raft_write_entry(
2043 raft, raft->term, json_nullable_clone(data), eid,
2044 json_nullable_clone(servers),
2045 election_timer))) {
2046 return raft_command_create_completed(RAFT_CMD_IO_ERROR);
2047 }
2048
2049 struct raft_command *cmd = raft_command_create_incomplete(raft, index);
2050 ovs_assert(eid);
2051 cmd->eid = *eid;
2052 cmd->timestamp = time_msec();
2053
2054 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index = cmd->index;
2055
2056 if (failure_test == FT_CRASH_BEFORE_SEND_APPEND_REQ) {
2057 ovs_fatal(0, "Raft test: crash before sending append_request.");
2058 }
2059 /* Write to remote logs. */
2060 struct raft_server *s;
2061 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2062 if (!uuid_equals(&s->sid, &raft->sid) && s->next_index == index) {
2063 raft_send_append_request(raft, s, 1, "execute command");
2064 s->next_index++;
2065 }
2066 }
2067 if (failure_test == FT_CRASH_AFTER_SEND_APPEND_REQ) {
2068 ovs_fatal(0, "Raft test: crash after sending append_request.");
2069 }
2070 raft_reset_ping_timer(raft);
2071
2072 return cmd;
2073 }
2074
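/* Logs, at debug level, the entry ID of every command still pending in
 * 'raft->commands'. */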
2075 static void
2076 log_all_commands(struct raft *raft)
2077 {
2078 struct raft_command *cmd, *next;
2079 HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
2080 VLOG_DBG("raft command eid: "UUID_FMT, UUID_ARGS(&cmd->eid));
2081 }
2082 }
2083
2084 static struct raft_command * OVS_WARN_UNUSED_RESULT
2085 raft_command_execute__(struct raft *raft, const struct json *data,
2086 const struct json *servers, uint64_t election_timer,
2087 const struct uuid *prereq, struct uuid *result)
2088 {
2089 if (raft->joining || raft->leaving || raft->left || raft->failed) {
2090 return raft_command_create_completed(RAFT_CMD_SHUTDOWN);
2091 }
2092
2093 if (raft->role != RAFT_LEADER) {
2094 /* Consider proxying the command to the leader. We can only do that if
2095 * we know the leader and the command does not change the set of
2096 * servers. We do not proxy commands without prerequisites, even
2097 * though we could, because in an OVSDB context a log entry doesn't
2098 * make sense without the prerequisite that ties it to a known state. */
2099 if (servers || election_timer || !data
2100 || raft->role != RAFT_FOLLOWER || uuid_is_zero(&raft->leader_sid)
2101 || !prereq) {
2102 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
2103 }
2104 }
2105
2106 struct uuid eid = data ? uuid_random() : UUID_ZERO;
2107 if (result) {
2108 *result = eid;
2109 }
2110
2111 if (raft->role != RAFT_LEADER) {
2112 const union raft_rpc rpc = {
2113 .execute_command_request = {
2114 .common = {
2115 .type = RAFT_RPC_EXECUTE_COMMAND_REQUEST,
2116 .sid = raft->leader_sid,
2117 },
2118 .data = CONST_CAST(struct json *, data),
2119 .prereq = *prereq,
2120 .result = eid,
2121 }
2122 };
2123 if (failure_test == FT_CRASH_BEFORE_SEND_EXEC_REQ) {
2124 ovs_fatal(0, "Raft test: crash before sending "
2125 "execute_command_request");
2126 }
2127 if (!raft_send(raft, &rpc)) {
2128 /* Couldn't send command, so it definitely failed. */
2129 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
2130 }
2131 if (failure_test == FT_CRASH_AFTER_SEND_EXEC_REQ) {
2132 ovs_fatal(0, "Raft test: crash after sending "
2133 "execute_command_request");
2134 }
2135
2136 struct raft_command *cmd = raft_command_create_incomplete(raft, 0);
2137 cmd->timestamp = time_msec();
2138 cmd->eid = eid;
2139 log_all_commands(raft);
2140 return cmd;
2141 }
2142
2143 const struct uuid *current_eid = raft_current_eid(raft);
2144 if (prereq && !uuid_equals(prereq, current_eid)) {
2145 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
2146 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
2147 "prerequisite "UUID_FMT,
2148 UUID_ARGS(current_eid), UUID_ARGS(prereq));
2149 return raft_command_create_completed(RAFT_CMD_BAD_PREREQ);
2150 }
2151
2152 return raft_command_initiate(raft, data, servers, election_timer, &eid);
2153 }
2154
2155 /* Initiates appending a log entry to 'raft'. The log entry consists of 'data'
2156 * and, if 'prereq' is nonnull, it is only added to the log if the previous
2157 * entry in the log has entry ID 'prereq'. If 'result' is nonnull, it is
2158 * populated with the entry ID for the new log entry.
2159 *
2160 * Returns a "struct raft_command" that may be used to track progress adding
2161 * the log entry. The caller must eventually free the returned structure, with
2162 * raft_command_unref(). */
2163 struct raft_command * OVS_WARN_UNUSED_RESULT
2164 raft_command_execute(struct raft *raft, const struct json *data,
2165 const struct uuid *prereq, struct uuid *result)
2166 {
2167 return raft_command_execute__(raft, data, NULL, 0, prereq, result);
2168 }
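/* Illustrative sketch (not code from this file) of driving a command to
 * completion; 'data' stands in for a caller-provided JSON log entry:
 *
 *     struct raft_command *cmd = raft_command_execute(raft, data, NULL, NULL);
 *     while (raft_command_get_status(cmd) == RAFT_CMD_INCOMPLETE) {
 *         raft_run(raft);
 *         raft_command_wait(cmd);
 *         raft_wait(raft);
 *         poll_block();
 *     }
 *     ...inspect raft_command_get_status(cmd)...
 *     raft_command_unref(cmd);
 */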
2169
2170 /* Returns the status of 'cmd'. */
2171 enum raft_command_status
2172 raft_command_get_status(const struct raft_command *cmd)
2173 {
2174 ovs_assert(cmd->n_refs > 0);
2175 return cmd->status;
2176 }
2177
2178 /* Returns the index of the log entry at which 'cmd' was committed.
2179 *
2180 * This function works only with successful commands. */
2181 uint64_t
2182 raft_command_get_commit_index(const struct raft_command *cmd)
2183 {
2184 ovs_assert(cmd->n_refs > 0);
2185 ovs_assert(cmd->status == RAFT_CMD_SUCCESS);
2186 return cmd->index;
2187 }
2188
2189 /* Frees 'cmd'. */
2190 void
2191 raft_command_unref(struct raft_command *cmd)
2192 {
2193 if (cmd) {
2194 ovs_assert(cmd->n_refs > 0);
2195 if (!--cmd->n_refs) {
2196 free(cmd);
2197 }
2198 }
2199 }
2200
2201 /* Causes poll_block() to wake up when 'cmd' has status to report. */
2202 void
2203 raft_command_wait(const struct raft_command *cmd)
2204 {
2205 if (cmd->status != RAFT_CMD_INCOMPLETE) {
2206 poll_immediate_wake();
2207 }
2208 }
2209
2210 static void
2211 raft_command_complete(struct raft *raft,
2212 struct raft_command *cmd,
2213 enum raft_command_status status)
2214 {
2215 VLOG_DBG("raft_command_complete eid "UUID_FMT" status: %s",
2216 UUID_ARGS(&cmd->eid), raft_command_status_to_string(status));
2217 if (!uuid_is_zero(&cmd->sid)) {
2218 uint64_t commit_index = status == RAFT_CMD_SUCCESS ? cmd->index : 0;
2219 raft_send_execute_command_reply(raft, &cmd->sid, &cmd->eid, status,
2220 commit_index);
2221 }
2222
2223 ovs_assert(cmd->status == RAFT_CMD_INCOMPLETE);
2224 ovs_assert(cmd->n_refs > 0);
2225 hmap_remove(&raft->commands, &cmd->hmap_node);
2226 cmd->status = status;
2227 raft_command_unref(cmd);
2228 }
2229
2230 static void
2231 raft_complete_all_commands(struct raft *raft, enum raft_command_status status)
2232 {
2233 struct raft_command *cmd, *next;
2234 HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
2235 raft_command_complete(raft, cmd, status);
2236 }
2237 }
2238
2239 static struct raft_command *
2240 raft_find_command_by_eid(struct raft *raft, const struct uuid *eid)
2241 {
2242 struct raft_command *cmd;
2243
2244 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2245 if (uuid_equals(&cmd->eid, eid)) {
2246 return cmd;
2247 }
2248 }
2249 return NULL;
2250 }
2251 \f
2252 #define RAFT_RPC(ENUM, NAME) \
2253 static void raft_handle_##NAME(struct raft *, const struct raft_##NAME *);
2254 RAFT_RPC_TYPES
2255 #undef RAFT_RPC
2256
2257 static void
2258 raft_handle_hello_request(struct raft *raft OVS_UNUSED,
2259 const struct raft_hello_request *hello OVS_UNUSED)
2260 {
2261 }
2262
2263 /* 'sid' is the server being added. */
2264 static void
2265 raft_send_add_server_reply__(struct raft *raft, const struct uuid *sid,
2266 const char *address,
2267 bool success, const char *comment)
2268 {
2269 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2270 if (!VLOG_DROP_INFO(&rl)) {
2271 struct ds s = DS_EMPTY_INITIALIZER;
2272 char buf[SID_LEN + 1];
2273 ds_put_format(&s, "adding %s ("SID_FMT" at %s) "
2274 "to cluster "CID_FMT" %s",
2275 raft_get_nickname(raft, sid, buf, sizeof buf),
2276 SID_ARGS(sid), address, CID_ARGS(&raft->cid),
2277 success ? "succeeded" : "failed");
2278 if (comment) {
2279 ds_put_format(&s, " (%s)", comment);
2280 }
2281 VLOG_INFO("%s", ds_cstr(&s));
2282 ds_destroy(&s);
2283 }
2284
2285 union raft_rpc rpy = {
2286 .add_server_reply = {
2287 .common = {
2288 .type = RAFT_RPC_ADD_SERVER_REPLY,
2289 .sid = *sid,
2290 .comment = CONST_CAST(char *, comment),
2291 },
2292 .success = success,
2293 }
2294 };
2295
2296 struct sset *remote_addresses = &rpy.add_server_reply.remote_addresses;
2297 sset_init(remote_addresses);
2298 if (!raft->joining) {
2299 struct raft_server *s;
2300 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2301 if (!uuid_equals(&s->sid, &raft->sid)) {
2302 sset_add(remote_addresses, s->address);
2303 }
2304 }
2305 }
2306
2307 raft_send(raft, &rpy);
2308
2309 sset_destroy(remote_addresses);
2310 }
2311
2312 static void
2313 raft_send_remove_server_reply_rpc(struct raft *raft,
2314 const struct uuid *dst_sid,
2315 const struct uuid *target_sid,
2316 bool success, const char *comment)
2317 {
2318 if (uuid_equals(&raft->sid, dst_sid)) {
2319 if (success && uuid_equals(&raft->sid, target_sid)) {
2320 raft_finished_leaving_cluster(raft);
2321 }
2322 return;
2323 }
2324
2325 const union raft_rpc rpy = {
2326 .remove_server_reply = {
2327 .common = {
2328 .type = RAFT_RPC_REMOVE_SERVER_REPLY,
2329 .sid = *dst_sid,
2330 .comment = CONST_CAST(char *, comment),
2331 },
2332 .target_sid = (uuid_equals(dst_sid, target_sid)
2333 ? UUID_ZERO
2334 : *target_sid),
2335 .success = success,
2336 }
2337 };
2338 raft_send(raft, &rpy);
2339 }
2340
2341 static void
2342 raft_send_remove_server_reply__(struct raft *raft,
2343 const struct uuid *target_sid,
2344 const struct uuid *requester_sid,
2345 struct unixctl_conn *requester_conn,
2346 bool success, const char *comment)
2347 {
2348 struct ds s = DS_EMPTY_INITIALIZER;
2349 ds_put_format(&s, "request ");
2350 if (!uuid_is_zero(requester_sid)) {
2351 char buf[SID_LEN + 1];
2352 ds_put_format(&s, "by %s",
2353 raft_get_nickname(raft, requester_sid, buf, sizeof buf));
2354 } else {
2355 ds_put_cstr(&s, "via unixctl");
2356 }
2357 ds_put_cstr(&s, " to remove ");
2358 if (!requester_conn && uuid_equals(target_sid, requester_sid)) {
2359 ds_put_cstr(&s, "itself");
2360 } else {
2361 char buf[SID_LEN + 1];
2362 ds_put_cstr(&s, raft_get_nickname(raft, target_sid, buf, sizeof buf));
2363 if (uuid_equals(target_sid, &raft->sid)) {
2364 ds_put_cstr(&s, " (ourselves)");
2365 }
2366 }
2367 ds_put_format(&s, " from cluster "CID_FMT" %s",
2368 CID_ARGS(&raft->cid),
2369 success ? "succeeded" : "failed");
2370 if (comment) {
2371 ds_put_format(&s, " (%s)", comment);
2372 }
2373
2374 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2375 VLOG_INFO_RL(&rl, "%s", ds_cstr(&s));
2376
2377 /* Send RemoveServerReply to the requester (which could be a server or a
2378 * unixctl connection). Also always send it to the removed server; this
2379 * allows it to be sure that it's really removed, update its log, and
2380 * disconnect permanently. */
2381 if (!uuid_is_zero(requester_sid)) {
2382 raft_send_remove_server_reply_rpc(raft, requester_sid, target_sid,
2383 success, comment);
2384 }
2385 if (!uuid_equals(requester_sid, target_sid)) {
2386 raft_send_remove_server_reply_rpc(raft, target_sid, target_sid,
2387 success, comment);
2388 }
2389 if (requester_conn) {
2390 if (success) {
2391 unixctl_command_reply(requester_conn, ds_cstr(&s));
2392 } else {
2393 unixctl_command_reply_error(requester_conn, ds_cstr(&s));
2394 }
2395 }
2396
2397 ds_destroy(&s);
2398 }
2399
2400 static void
2401 raft_send_add_server_reply(struct raft *raft,
2402 const struct raft_add_server_request *rq,
2403 bool success, const char *comment)
2404 {
2405 return raft_send_add_server_reply__(raft, &rq->common.sid, rq->address,
2406 success, comment);
2407 }
2408
2409 static void
2410 raft_send_remove_server_reply(struct raft *raft,
2411 const struct raft_remove_server_request *rq,
2412 bool success, const char *comment)
2413 {
2414 return raft_send_remove_server_reply__(raft, &rq->sid, &rq->common.sid,
2415 rq->requester_conn, success,
2416 comment);
2417 }
2418
2419 static void
2420 raft_become_follower(struct raft *raft)
2421 {
2422 raft->leader_sid = UUID_ZERO;
2423 if (raft->role == RAFT_FOLLOWER) {
2424 return;
2425 }
2426
2427 raft->role = RAFT_FOLLOWER;
2428 raft_reset_election_timer(raft);
2429
2430 /* Notify clients about lost leadership.
2431 *
2432 * We do not reverse our changes to 'raft->servers' because the new
2433 * configuration is already part of the log. Possibly the configuration
2434 * log entry will not be committed, but until we know that we must use the
2435 * new configuration. Our AppendEntries processing will properly update
2436 * the server configuration later, if necessary. */
2437 struct raft_server *s;
2438 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
2439 raft_send_add_server_reply__(raft, &s->sid, s->address, false,
2440 RAFT_SERVER_LOST_LEADERSHIP);
2441 }
2442 if (raft->remove_server) {
2443 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
2444 &raft->remove_server->requester_sid,
2445 raft->remove_server->requester_conn,
2446 false, RAFT_SERVER_LOST_LEADERSHIP);
2447 raft_server_destroy(raft->remove_server);
2448 raft->remove_server = NULL;
2449 }
2450
2451 raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
2452 }
2453
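/* Sends an AppendEntries request to 'peer' carrying 'n' log entries starting
 * at 'peer->next_index' ('n' may be 0, which makes the request a heartbeat). */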
2454 static void
2455 raft_send_append_request(struct raft *raft,
2456 struct raft_server *peer, unsigned int n,
2457 const char *comment)
2458 {
2459 ovs_assert(raft->role == RAFT_LEADER);
2460
2461 const union raft_rpc rq = {
2462 .append_request = {
2463 .common = {
2464 .type = RAFT_RPC_APPEND_REQUEST,
2465 .sid = peer->sid,
2466 .comment = CONST_CAST(char *, comment),
2467 },
2468 .term = raft->term,
2469 .prev_log_index = peer->next_index - 1,
2470 .prev_log_term = (peer->next_index - 1 >= raft->log_start
2471 ? raft->entries[peer->next_index - 1
2472 - raft->log_start].term
2473 : raft->snap.term),
2474 .leader_commit = raft->commit_index,
2475 .entries = &raft->entries[peer->next_index - raft->log_start],
2476 .n_entries = n,
2477 },
2478 };
2479 raft_send(raft, &rq);
2480 }
2481
2482 static void
2483 raft_send_heartbeats(struct raft *raft)
2484 {
2485 struct raft_server *s;
2486 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2487 if (!uuid_equals(&raft->sid, &s->sid)) {
2488 raft_send_append_request(raft, s, 0, "heartbeat");
2489 }
2490 }
2491
2492 /* Send a ping to anyone waiting for a command to complete, to let them
2493 * know we're still working on it. */
2494 struct raft_command *cmd;
2495 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2496 if (!uuid_is_zero(&cmd->sid)) {
2497 raft_send_execute_command_reply(raft, &cmd->sid,
2498 &cmd->eid,
2499 RAFT_CMD_INCOMPLETE, 0);
2500 }
2501 }
2502
2503 raft_reset_ping_timer(raft);
2504 }
2505
2506 /* Initializes the fields in 's' that represent the leader's view of the
2507 * server. */
2508 static void
2509 raft_server_init_leader(struct raft *raft, struct raft_server *s)
2510 {
2511 s->next_index = raft->log_end;
2512 s->match_index = 0;
2513 s->phase = RAFT_PHASE_STABLE;
2514 s->replied = false;
2515 }
2516
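/* Records 'sid' as the cluster leader, noting that this term has had a leader
 * and clearing the candidate-retry flag. */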
2517 static void
2518 raft_set_leader(struct raft *raft, const struct uuid *sid)
2519 {
2520 raft->leader_sid = *sid;
2521 raft->had_leader = true;
2522 raft->candidate_retrying = false;
2523 }
2524
2525 static void
2526 raft_become_leader(struct raft *raft)
2527 {
2528 log_all_commands(raft);
2529
2530 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
2531 VLOG_INFO_RL(&rl, "term %"PRIu64": elected leader by %d+ of "
2532 "%"PRIuSIZE" servers", raft->term,
2533 raft->n_votes, hmap_count(&raft->servers));
2534
2535 ovs_assert(raft->role != RAFT_LEADER);
2536 raft->role = RAFT_LEADER;
2537 raft_set_leader(raft, &raft->sid);
2538 raft_reset_election_timer(raft);
2539 raft_reset_ping_timer(raft);
2540
2541 struct raft_server *s;
2542 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2543 raft_server_init_leader(raft, s);
2544 }
2545
2546 raft->election_timer_new = 0;
2547
2548 raft_update_our_match_index(raft, raft->log_end - 1);
2549 raft_send_heartbeats(raft);
2550
2551 /* Write the fact that we are leader to the log. This is not used by the
2552 * algorithm (although it could be, for quick restart), but it is used for
2553 * offline analysis to check for conformance with the properties that Raft
2554 * guarantees. */
2555 struct raft_record r = {
2556 .type = RAFT_REC_LEADER,
2557 .term = raft->term,
2558 .sid = raft->sid,
2559 };
2560 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2561
2562 /* Initiate a no-op commit. Otherwise we might never find out what's in
2563 * the log. See section 6.4 item 1:
2564 *
2565 * The Leader Completeness Property guarantees that a leader has all
2566 * committed entries, but at the start of its term, it may not know
2567 * which those are. To find out, it needs to commit an entry from its
2568 * term. Raft handles this by having each leader commit a blank no-op
2569 * entry into the log at the start of its term. As soon as this no-op
2570 * entry is committed, the leader’s commit index will be at least as
2571 * large as any other servers’ during its term.
2572 */
2573 raft_command_unref(raft_command_execute__(raft, NULL, NULL, 0, NULL,
2574 NULL));
2575 }
2576
2577 /* Processes term 'term' received as part of RPC 'common'. Returns true if the
2578 * caller should continue processing the RPC, false if the caller should reject
2579 * it due to a stale term. */
2580 static bool
2581 raft_receive_term__(struct raft *raft, const struct raft_rpc_common *common,
2582 uint64_t term)
2583 {
2584 /* Section 3.3 says:
2585 *
2586 * Current terms are exchanged whenever servers communicate; if one
2587 * server’s current term is smaller than the other’s, then it updates
2588 * its current term to the larger value. If a candidate or leader
2589 * discovers that its term is out of date, it immediately reverts to
2590 * follower state. If a server receives a request with a stale term
2591 * number, it rejects the request.
2592 */
2593 if (term > raft->term) {
2594 if (!raft_set_term(raft, term, NULL)) {
2595 /* Failed to update the term to 'term'. */
2596 return false;
2597 }
2598 raft_become_follower(raft);
2599 } else if (term < raft->term) {
2600 char buf[SID_LEN + 1];
2601 VLOG_INFO("rejecting term %"PRIu64" < current term %"PRIu64" received "
2602 "in %s message from server %s",
2603 term, raft->term,
2604 raft_rpc_type_to_string(common->type),
2605 raft_get_nickname(raft, &common->sid, buf, sizeof buf));
2606 return false;
2607 }
2608 return true;
2609 }
2610
2611 static void
2612 raft_get_servers_from_log(struct raft *raft, enum vlog_level level)
2613 {
2614 const struct json *servers_json = raft->snap.servers;
2615 for (uint64_t index = raft->log_end - 1; index >= raft->log_start;
2616 index--) {
2617 struct raft_entry *e = &raft->entries[index - raft->log_start];
2618 if (e->servers) {
2619 servers_json = e->servers;
2620 break;
2621 }
2622 }
2623
2624 struct hmap servers;
2625 struct ovsdb_error *error = raft_servers_from_json(servers_json, &servers);
2626 ovs_assert(!error);
2627 raft_set_servers(raft, &servers, level);
2628 raft_servers_destroy(&servers);
2629 }
2630
2631 /* Truncates the log, so that raft->log_end becomes 'new_end'.
2632 *
2633 * Doesn't write anything to disk. In theory, we could truncate the on-disk
2634 * log file, but we don't have the right information to know how long it should
2635 * be. What we actually do is to append entries for older indexes to the
2636 * on-disk log; when we re-read it later, these entries truncate the log.
2637 *
2638 * Returns true if any of the removed log entries were server configuration
2639 * entries, false otherwise. */
2640 static bool
2641 raft_truncate(struct raft *raft, uint64_t new_end)
2642 {
2643 ovs_assert(new_end >= raft->log_start);
2644 if (raft->log_end > new_end) {
2645 char buf[SID_LEN + 1];
2646 VLOG_INFO("%s truncating %"PRIu64 " entries from end of log",
2647 raft_get_nickname(raft, &raft->sid, buf, sizeof buf),
2648 raft->log_end - new_end);
2649 }
2650
2651 bool servers_changed = false;
2652 while (raft->log_end > new_end) {
2653 struct raft_entry *entry = &raft->entries[--raft->log_end
2654 - raft->log_start];
2655 if (entry->servers) {
2656 servers_changed = true;
2657 }
2658 raft_entry_uninit(entry);
2659 }
2660 return servers_changed;
2661 }
2662
2663 static const struct json *
2664 raft_peek_next_entry(struct raft *raft, struct uuid *eid)
2665 {
2666 /* Invariant: log_start - 2 <= last_applied <= commit_index < log_end. */
2667 ovs_assert(raft->log_start <= raft->last_applied + 2);
2668 ovs_assert(raft->last_applied <= raft->commit_index);
2669 ovs_assert(raft->commit_index < raft->log_end);
2670
2671 if (raft->joining || raft->failed) {
2672 return NULL;
2673 }
2674
2675 if (raft->log_start == raft->last_applied + 2) {
2676 *eid = raft->snap.eid;
2677 return raft->snap.data;
2678 }
2679
2680 while (raft->last_applied < raft->commit_index) {
2681 const struct raft_entry *e = raft_get_entry(raft,
2682 raft->last_applied + 1);
2683 if (e->data) {
2684 *eid = e->eid;
2685 return e->data;
2686 }
2687 raft->last_applied++;
2688 }
2689 return NULL;
2690 }
2691
2692 static const struct json *
2693 raft_get_next_entry(struct raft *raft, struct uuid *eid)
2694 {
2695 const struct json *data = raft_peek_next_entry(raft, eid);
2696 if (data) {
2697 raft->last_applied++;
2698 }
2699 return data;
2700 }
2701
2702 /* Updates the commit index in the raft log. If the commit index is already
2703 * up-to-date, this does nothing and returns false; otherwise, returns true. */
2704 static bool
2705 raft_update_commit_index(struct raft *raft, uint64_t new_commit_index)
2706 {
2707 if (new_commit_index <= raft->commit_index) {
2708 return false;
2709 }
2710
2711 if (raft->role == RAFT_LEADER) {
2712 while (raft->commit_index < new_commit_index) {
2713 uint64_t index = ++raft->commit_index;
2714 const struct raft_entry *e = raft_get_entry(raft, index);
2715 if (e->data) {
2716 struct raft_command *cmd
2717 = raft_find_command_by_eid(raft, &e->eid);
2718 if (cmd) {
2719 if (!cmd->index) {
2720 VLOG_DBG("Command completed after role change from"
2721 " follower to leader "UUID_FMT,
2722 UUID_ARGS(&e->eid));
2723 cmd->index = index;
2724 }
2725 raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
2726 }
2727 }
2728 if (e->election_timer) {
2729 VLOG_INFO("Election timer changed from %"PRIu64" to %"PRIu64,
2730 raft->election_timer, e->election_timer);
2731 raft->election_timer = e->election_timer;
2732 raft->election_timer_new = 0;
2733 }
2734 if (e->servers) {
2735 /* raft_run_reconfigure() can write a new Raft entry, which can
2736 * reallocate raft->entries, which would invalidate 'e', so
2737 * this case must be last, after the one for 'e->data'. */
2738 raft_run_reconfigure(raft);
2739 }
2740 }
2741 } else {
2742 while (raft->commit_index < new_commit_index) {
2743 uint64_t index = ++raft->commit_index;
2744 const struct raft_entry *e = raft_get_entry(raft, index);
2745 if (e->election_timer) {
2746 VLOG_INFO("Election timer changed from %"PRIu64" to %"PRIu64,
2747 raft->election_timer, e->election_timer);
2748 raft->election_timer = e->election_timer;
2749 }
2750 }
2751 /* Check whether any pending command can be completed, and complete it.
2752 * This can happen when a leader fails over before sending the
2753 * execute_command_reply. */
2754 const struct uuid *eid = raft_get_eid(raft, new_commit_index);
2755 struct raft_command *cmd = raft_find_command_by_eid(raft, eid);
2756 if (cmd) {
2757 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
2758 VLOG_INFO_RL(&rl,
2759 "Command completed without reply (eid: "UUID_FMT", "
2760 "commit index: %"PRIu64")",
2761 UUID_ARGS(eid), new_commit_index);
2762 cmd->index = new_commit_index;
2763 raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
2764 }
2765 }
2766
2767 /* Write the commit index to the log. The next time we restart, this
2768 * allows us to start exporting a reasonably fresh log, instead of a log
2769 * that only contains the snapshot. */
2770 struct raft_record r = {
2771 .type = RAFT_REC_COMMIT_INDEX,
2772 .commit_index = raft->commit_index,
2773 };
2774 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2775 return true;
2776 }
2777
2778 /* This doesn't use rq->entries (but it does use rq->n_entries). */
2779 static void
2780 raft_send_append_reply(struct raft *raft, const struct raft_append_request *rq,
2781 enum raft_append_result result, const char *comment)
2782 {
2783 /* Figure 3.1: "If leaderCommit > commitIndex, set commitIndex =
2784 * min(leaderCommit, index of last new entry)" */
2785 if (result == RAFT_APPEND_OK && rq->leader_commit > raft->commit_index) {
2786 raft_update_commit_index(
2787 raft, MIN(rq->leader_commit, rq->prev_log_index + rq->n_entries));
2788 }
2789
2790 /* Send reply. */
2791 union raft_rpc reply = {
2792 .append_reply = {
2793 .common = {
2794 .type = RAFT_RPC_APPEND_REPLY,
2795 .sid = rq->common.sid,
2796 .comment = CONST_CAST(char *, comment),
2797 },
2798 .term = raft->term,
2799 .log_end = raft->log_end,
2800 .prev_log_index = rq->prev_log_index,
2801 .prev_log_term = rq->prev_log_term,
2802 .n_entries = rq->n_entries,
2803 .result = result,
2804 }
2805 };
2806 raft_send(raft, &reply);
2807 }
2808
2809 /* If 'prev_log_index' exists in 'raft''s log, in term 'prev_log_term', returns
2810 * NULL. Otherwise, returns an explanation for the mismatch. */
2811 static const char *
2812 match_index_and_term(const struct raft *raft,
2813 uint64_t prev_log_index, uint64_t prev_log_term)
2814 {
2815 if (prev_log_index < raft->log_start - 1) {
2816 return "mismatch before start of log";
2817 } else if (prev_log_index == raft->log_start - 1) {
2818 if (prev_log_term != raft->snap.term) {
2819 return "prev_term mismatch";
2820 }
2821 } else if (prev_log_index < raft->log_end) {
2822 if (raft->entries[prev_log_index - raft->log_start].term
2823 != prev_log_term) {
2824 return "term mismatch";
2825 }
2826 } else {
2827 /* prev_log_index >= raft->log_end */
2828 return "mismatch past end of log";
2829 }
2830 return NULL;
2831 }
2832
2833 static void
2834 raft_handle_append_entries(struct raft *raft,
2835 const struct raft_append_request *rq,
2836 uint64_t prev_log_index, uint64_t prev_log_term,
2837 const struct raft_entry *entries,
2838 unsigned int n_entries)
2839 {
2840 /* Section 3.5: "When sending an AppendEntries RPC, the leader includes
2841 * the index and term of the entry in its log that immediately precedes
2842 * the new entries. If the follower does not find an entry in its log
2843 * with the same index and term, then it refuses the new entries." */
2844 const char *mismatch = match_index_and_term(raft, prev_log_index,
2845 prev_log_term);
2846 if (mismatch) {
2847 VLOG_INFO("rejecting append_request because previous entry "
2848 "%"PRIu64",%"PRIu64" not in local log (%s)",
2849 prev_log_term, prev_log_index, mismatch);
2850 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY, mismatch);
2851 return;
2852 }
2853
2854 /* Figure 3.1: "If an existing entry conflicts with a new one (same
2855 * index but different terms), delete the existing entry and all that
2856 * follow it." */
2857 unsigned int i;
2858 bool servers_changed = false;
2859 for (i = 0; ; i++) {
2860 if (i >= n_entries) {
2861 /* No change. */
2862 if (rq->common.comment
2863 && !strcmp(rq->common.comment, "heartbeat")) {
2864 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "heartbeat");
2865 } else {
2866 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
2867 }
2868 return;
2869 }
2870
2871 uint64_t log_index = (prev_log_index + 1) + i;
2872 if (log_index >= raft->log_end) {
2873 break;
2874 }
2875 if (raft->entries[log_index - raft->log_start].term
2876 != entries[i].term) {
2877 if (raft_truncate(raft, log_index)) {
2878 servers_changed = true;
2879 }
2880 break;
2881 }
2882 }
2883
2884 if (failure_test == FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE) {
2885 ovs_fatal(0, "Raft test: crash after receiving append_request with "
2886 "update.");
2887 }
2888 /* Figure 3.1: "Append any entries not already in the log." */
2889 struct ovsdb_error *error = NULL;
2890 bool any_written = false;
2891 for (; i < n_entries; i++) {
2892 const struct raft_entry *e = &entries[i];
2893 error = raft_write_entry(raft, e->term,
2894 json_nullable_clone(e->data), &e->eid,
2895 json_nullable_clone(e->servers),
2896 e->election_timer);
2897 if (error) {
2898 break;
2899 }
2900 any_written = true;
2901 if (e->servers) {
2902 servers_changed = true;
2903 }
2904 }
2905
2906 if (any_written) {
2907 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index
2908 = raft->log_end - 1;
2909 }
2910 if (servers_changed) {
2911 /* The set of servers might have changed; check. */
2912 raft_get_servers_from_log(raft, VLL_INFO);
2913 }
2914
2915 if (error) {
2916 char *s = ovsdb_error_to_string_free(error);
2917 VLOG_ERR("%s", s);
2918 free(s);
2919 raft_send_append_reply(raft, rq, RAFT_APPEND_IO_ERROR, "I/O error");
2920 return;
2921 }
2922
2923 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "log updated");
2924 }
2925
2926 static bool
2927 raft_update_leader(struct raft *raft, const struct uuid *sid)
2928 {
2929 if (raft->role == RAFT_LEADER) {
2930 char buf[SID_LEN + 1];
2931 VLOG_ERR("this server is leader but server %s claims to be",
2932 raft_get_nickname(raft, sid, buf, sizeof buf));
2933 return false;
2934 } else if (!uuid_equals(sid, &raft->leader_sid)) {
2935 if (!uuid_is_zero(&raft->leader_sid)) {
2936 char buf1[SID_LEN + 1];
2937 char buf2[SID_LEN + 1];
2938 VLOG_ERR("leader for term %"PRIu64" changed from %s to %s",
2939 raft->term,
2940 raft_get_nickname(raft, &raft->leader_sid,
2941 buf1, sizeof buf1),
2942 raft_get_nickname(raft, sid, buf2, sizeof buf2));
2943 } else {
2944 char buf[SID_LEN + 1];
2945 VLOG_INFO("server %s is leader for term %"PRIu64,
2946 raft_get_nickname(raft, sid, buf, sizeof buf),
2947 raft->term);
2948 }
2949 raft_set_leader(raft, sid);
2950
2951 /* Record the leader to the log. This is not used by the algorithm
2952 * (although it could be, for quick restart), but it is used for
2953 * offline analysis to check for conformance with the properties
2954 * that Raft guarantees. */
2955 struct raft_record r = {
2956 .type = RAFT_REC_LEADER,
2957 .term = raft->term,
2958 .sid = *sid,
2959 };
2960 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2961 }
2962 return true;
2963 }
2964
2965 static void
2966 raft_handle_append_request(struct raft *raft,
2967 const struct raft_append_request *rq)
2968 {
2969 /* We do not check whether the server that sent the request is part of the
2970 * cluster. As section 4.1 says, "A server accepts AppendEntries requests
2971 * from a leader that is not part of the server’s latest configuration.
2972 * Otherwise, a new server could never be added to the cluster (it would
2973 * never accept any log entries preceding the configuration entry that adds
2974 * the server)." */
2975 if (!raft_update_leader(raft, &rq->common.sid)) {
2976 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
2977 "usurped leadership");
2978 return;
2979 }
2980 raft_reset_election_timer(raft);
2981
2982 /* First check for the common case, where the AppendEntries request is
2983 * entirely for indexes covered by 'log_start' ... 'log_end - 1', something
2984 * like this:
2985 *
2986 * rq->prev_log_index
2987 * | first_entry_index
2988 * | | nth_entry_index
2989 * | | |
2990 * v v v
2991 * +---+---+---+---+
2992 * T | T | T | T | T |
2993 * +---+-------+---+
2994 * +---+---+---+---+
2995 * T | T | T | T | T |
2996 * +---+---+---+---+
2997 * ^ ^
2998 * | |
2999 * log_start log_end
3000 * */
3001 uint64_t first_entry_index = rq->prev_log_index + 1;
3002 uint64_t nth_entry_index = rq->prev_log_index + rq->n_entries;
3003 if (OVS_LIKELY(first_entry_index >= raft->log_start)) {
3004 raft_handle_append_entries(raft, rq,
3005 rq->prev_log_index, rq->prev_log_term,
3006 rq->entries, rq->n_entries);
3007 return;
3008 }
3009
3010 /* Now a series of checks for odd cases, where the AppendEntries request
3011 * extends earlier than the beginning of our log, into the log entries
3012 * discarded by the most recent snapshot. */
3013
3014 /*
3015 * Handle the case where the indexes covered by rq->entries[] are entirely
3016 * disjoint with 'log_start - 1' ... 'log_end - 1', as shown below. So,
3017 * everything in the AppendEntries request must already have been
3018 * committed, and we might as well acknowledge it as successful.
3019 *
3020 * rq->prev_log_index
3021 * | first_entry_index
3022 * | | nth_entry_index
3023 * | | |
3024 * v v v
3025 * +---+---+---+---+
3026 * T | T | T | T | T |
3027 * +---+-------+---+
3028 * +---+---+---+---+
3029 * T | T | T | T | T |
3030 * +---+---+---+---+
3031 * ^ ^
3032 * | |
3033 * log_start log_end
3034 */
3035 if (nth_entry_index < raft->log_start - 1) {
3036 raft_send_append_reply(raft, rq, RAFT_APPEND_OK,
3037 "append before log start");
3038 return;
3039 }
3040
3041 /*
3042 * Handle the case where the last entry in rq->entries[] has the same index
3043 * as 'log_start - 1', so we can compare their terms:
3044 *
3045 * rq->prev_log_index
3046 * | first_entry_index
3047 * | | nth_entry_index
3048 * | | |
3049 * v v v
3050 * +---+---+---+---+
3051 * T | T | T | T | T |
3052 * +---+-------+---+
3053 * +---+---+---+---+
3054 * T | T | T | T | T |
3055 * +---+---+---+---+
3056 * ^ ^
3057 * | |
3058 * log_start log_end
3059 *
3060 * There's actually a sub-case where rq->n_entries == 0, in which we
3061 * compare rq->prev_log_term:
3062 *
3063 * rq->prev_log_index
3064 * |
3065 * |
3066 * |
3067 * v
3068 * T
3069 *
3070 * +---+---+---+---+
3071 * T | T | T | T | T |
3072 * +---+---+---+---+
3073 * ^ ^
3074 * | |
3075 * log_start log_end
3076 */
3077 if (nth_entry_index == raft->log_start - 1) {
3078 if (rq->n_entries
3079 ? raft->snap.term == rq->entries[rq->n_entries - 1].term
3080 : raft->snap.term == rq->prev_log_term) {
3081 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
3082 } else {
3083 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
3084 "term mismatch");
3085 }
3086 return;
3087 }
3088
3089 /*
3090 * We now know that the data in rq->entries[] overlaps the data in
3091 * raft->entries[], as shown below, with some positive 'ofs':
3092 *
3093 * rq->prev_log_index
3094 * | first_entry_index
3095 * | | nth_entry_index
3096 * | | |
3097 * v v v
3098 * +---+---+---+---+---+
3099 * T | T | T | T | T | T |
3100 * +---+-------+---+---+
3101 * +---+---+---+---+
3102 * T | T | T | T | T |
3103 * +---+---+---+---+
3104 * ^ ^
3105 * | |
3106 * log_start log_end
3107 *
3108 * |<-- ofs -->|
3109 *
3110 * We transform this into the following by trimming the first 'ofs'
3111 * elements off of rq->entries[], ending up with the following. Notice how
3112 * we retain the term but not the data for rq->entries[ofs - 1]:
3113 *
3114 * first_entry_index + ofs - 1
3115 * | first_entry_index + ofs
3116 * | | nth_entry_index + ofs
3117 * | | |
3118 * v v v
3119 * +---+---+
3120 * T | T | T |
3121 * +---+---+
3122 * +---+---+---+---+
3123 * T | T | T | T | T |
3124 * +---+---+---+---+
3125 * ^ ^
3126 * | |
3127 * log_start log_end
3128 */
3129 uint64_t ofs = raft->log_start - first_entry_index;
3130 raft_handle_append_entries(raft, rq,
3131 raft->log_start - 1, rq->entries[ofs - 1].term,
3132 &rq->entries[ofs], rq->n_entries - ofs);
3133 }
3134
3135 /* Returns true if 'raft' has another log entry or snapshot to read. */
3136 bool
3137 raft_has_next_entry(const struct raft *raft_)
3138 {
3139 struct raft *raft = CONST_CAST(struct raft *, raft_);
3140 struct uuid eid;
3141 return raft_peek_next_entry(raft, &eid) != NULL;
3142 }
3143
3144 /* Returns the next log entry or snapshot from 'raft', or NULL if there are
3145 * none left to read. Stores the entry ID of the log entry in '*eid'. Stores
3146 * true in '*is_snapshot' if the returned data is a snapshot, false if it is a
3147 * log entry. */
3148 const struct json *
3149 raft_next_entry(struct raft *raft, struct uuid *eid, bool *is_snapshot)
3150 {
3151 const struct json *data = raft_get_next_entry(raft, eid);
3152 *is_snapshot = data == raft->snap.data;
3153 return data;
3154 }
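/* Illustrative sketch (not code from this file) of reading everything that is
 * currently committed; apply_entry() is a hypothetical caller-side function:
 *
 *     struct uuid eid;
 *     bool is_snapshot;
 *     const struct json *data;
 *     while ((data = raft_next_entry(raft, &eid, &is_snapshot)) != NULL) {
 *         apply_entry(data, &eid, is_snapshot);
 *     }
 */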
3155
3156 /* Returns the log index of the last-read snapshot or log entry. */
3157 uint64_t
3158 raft_get_applied_index(const struct raft *raft)
3159 {
3160 return raft->last_applied;
3161 }
3162
3163 /* Returns the log index of the last snapshot or log entry that is available to
3164 * be read. */
3165 uint64_t
3166 raft_get_commit_index(const struct raft *raft)
3167 {
3168 return raft->commit_index;
3169 }
3170
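/* Returns the server in 'raft''s current configuration whose ID is 'uuid',
 * excluding this server itself, or NULL if there is no such server. */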
3171 static struct raft_server *
3172 raft_find_peer(struct raft *raft, const struct uuid *uuid)
3173 {
3174 struct raft_server *s = raft_find_server(raft, uuid);
3175 return s && !uuid_equals(&raft->sid, &s->sid) ? s : NULL;
3176 }
3177
3178 static struct raft_server *
3179 raft_find_new_server(struct raft *raft, const struct uuid *uuid)
3180 {
3181 return raft_server_find(&raft->add_servers, uuid);
3182 }
3183
3184 /* Figure 3.1: "If there exists an N such that N > commitIndex, a
3185 * majority of matchIndex[i] >= N, and log[N].term == currentTerm, set
3186 * commitIndex = N (sections 3.5 and 3.6)." */
3187 static void
3188 raft_consider_updating_commit_index(struct raft *raft)
3189 {
3190 /* This loop cannot just bail out when it comes across a log entry that
3191 * does not match the criteria. For example, Figure 3.7(d2) shows a
3192 * case where the log entry for term 2 cannot be committed directly
3193 * (because it is not for the current term) but it can be committed as
3194 * a side effect of committing the entry for term 4 (the current term).
3195 * XXX Is there a more efficient way to do this? */
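/* Illustrative example (not from the Raft paper): with five servers whose
 * match indexes are 7, 7, 6, 5, and 3, index 6 has been replicated to three
 * servers (a majority), so commit_index may advance to 6 provided that log
 * entry 6 belongs to the current term. */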
3196 ovs_assert(raft->role == RAFT_LEADER);
3197
3198 uint64_t new_commit_index = raft->commit_index;
3199 for (uint64_t idx = MAX(raft->commit_index + 1, raft->log_start);
3200 idx < raft->log_end; idx++) {
3201 if (raft->entries[idx - raft->log_start].term == raft->term) {
3202 size_t count = 0;
3203 struct raft_server *s2;
3204 HMAP_FOR_EACH (s2, hmap_node, &raft->servers) {
3205 if (s2->match_index >= idx) {
3206 count++;
3207 }
3208 }
3209 if (count > hmap_count(&raft->servers) / 2) {
3210 VLOG_DBG("index %"PRIu64" committed to %"PRIuSIZE" servers, "
3211 "applying", idx, count);
3212 new_commit_index = idx;
3213 }
3214 }
3215 }
3216 if (raft_update_commit_index(raft, new_commit_index)) {
3217 raft_send_heartbeats(raft);
3218 }
3219 }
3220
3221 static void
3222 raft_update_match_index(struct raft *raft, struct raft_server *s,
3223 uint64_t min_index)
3224 {
3225 ovs_assert(raft->role == RAFT_LEADER);
3226 if (min_index > s->match_index) {
3227 s->match_index = min_index;
3228 raft_consider_updating_commit_index(raft);
3229 }
3230 }
3231
3232 static void
3233 raft_update_our_match_index(struct raft *raft, uint64_t min_index)
3234 {
3235 struct raft_server *server = raft_find_server(raft, &raft->sid);
3236 if (server) {
3237 raft_update_match_index(raft, server, min_index);
3238 }
3239 }
3240
3241 static void
3242 raft_send_install_snapshot_request(struct raft *raft,
3243 const struct raft_server *s,
3244 const char *comment)
3245 {
3246 union raft_rpc rpc = {
3247 .install_snapshot_request = {
3248 .common = {
3249 .type = RAFT_RPC_INSTALL_SNAPSHOT_REQUEST,
3250 .sid = s->sid,
3251 .comment = CONST_CAST(char *, comment),
3252 },
3253 .term = raft->term,
3254 .last_index = raft->log_start - 1,
3255 .last_term = raft->snap.term,
3256 .last_servers = raft->snap.servers,
3257 .last_eid = raft->snap.eid,
3258 .data = raft->snap.data,
3259 .election_timer = raft->election_timer,
3260 }
3261 };
3262 raft_send(raft, &rpc);
3263 }
3264
3265 static void
3266 raft_handle_append_reply(struct raft *raft,
3267 const struct raft_append_reply *rpy)
3268 {
3269 if (raft->role != RAFT_LEADER) {
3270 VLOG_INFO("rejected append_reply (not leader)");
3271 return;
3272 }
3273
3274 /* Most commonly we'd be getting an AppendEntries reply from a configured
3275 * server (e.g. a peer), but we can also get them from servers in the
3276 * process of being added. */
3277 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3278 if (!s) {
3279 s = raft_find_new_server(raft, &rpy->common.sid);
3280 if (!s) {
3281 VLOG_INFO("rejected append_reply from unknown server "SID_FMT,
3282 SID_ARGS(&rpy->common.sid));
3283 return;
3284 }
3285 }
3286
3287 s->replied = true;
3288 if (rpy->result == RAFT_APPEND_OK) {
3289 /* Figure 3.1: "If successful, update nextIndex and matchIndex for
3290 * follower (section 3.5)." */
3291 uint64_t min_index = rpy->prev_log_index + rpy->n_entries + 1;
3292 if (s->next_index < min_index) {
3293 s->next_index = min_index;
3294 }
3295 raft_update_match_index(raft, s, min_index - 1);
3296 } else {
3297 /* Figure 3.1: "If AppendEntries fails because of log inconsistency,
3298 * decrement nextIndex and retry (section 3.5)."
3299 *
3300 * We also implement the optimization suggested in section 4.2.1:
3301 * "Various approaches can make nextIndex converge to its correct value
3302 * more quickly, including those described in Chapter 3. The simplest
3303 * approach to solving this particular problem of adding a new server,
3304 * however, is to have followers return the length of their logs in the
3305 * AppendEntries response; this allows the leader to cap the follower’s
3306 * nextIndex accordingly." */
3307 s->next_index = (s->next_index > 0
3308 ? MIN(s->next_index - 1, rpy->log_end)
3309 : 0);
3310
3311 if (rpy->result == RAFT_APPEND_IO_ERROR) {
3312 /* Append failed but not because of a log inconsistency. Because
3313 * of the I/O error, there's no point in re-sending the append
3314 * immediately. */
3315 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3316 VLOG_INFO_RL(&rl, "%s reported I/O error", s->nickname);
3317 return;
3318 }
3319 }
3320
3321 /*
3322 * Our behavior here must depend on the value of next_index relative to
3323 * log_start and log_end. There are three cases:
3324 *
3325 * Case 1 | Case 2 | Case 3
3326 * <---------------->|<------------->|<------------------>
3327 * | |
3328 *
3329 * +---+---+---+---+
3330 * T | T | T | T | T |
3331 * +---+---+---+---+
3332 * ^ ^
3333 * | |
3334 * log_start log_end
3335 */
3336 if (s->next_index < raft->log_start) {
3337 /* Case 1. */
3338 raft_send_install_snapshot_request(raft, s, NULL);
3339 } else if (s->next_index < raft->log_end) {
3340 /* Case 2. */
3341 raft_send_append_request(raft, s, 1, NULL);
3342 } else {
3343 /* Case 3. */
3344 if (s->phase == RAFT_PHASE_CATCHUP) {
3345 s->phase = RAFT_PHASE_CAUGHT_UP;
3346 raft_run_reconfigure(raft);
3347 }
3348 }
3349 }
3350
3351 static bool
3352 raft_should_suppress_disruptive_server(struct raft *raft,
3353 const union raft_rpc *rpc)
3354 {
3355 if (rpc->type != RAFT_RPC_VOTE_REQUEST) {
3356 return false;
3357 }
3358
3359 /* Section 4.2.3 "Disruptive Servers" says:
3360 *
3361 * ...if a server receives a RequestVote request within the minimum
3362 * election timeout of hearing from a current leader, it does not update
3363 * its term or grant its vote...
3364 *
3365 * ...This change conflicts with the leadership transfer mechanism as
3366 * described in Chapter 3, in which a server legitimately starts an
3367 * election without waiting an election timeout. In that case,
3368 * RequestVote messages should be processed by other servers even when
3369 * they believe a current cluster leader exists. Those RequestVote
3370 * requests can include a special flag to indicate this behavior (“I
3371 * have permission to disrupt the leader--it told me to!”).
3372 *
3373 * This clearly describes how the followers should act, but not the leader.
3374 * We just ignore vote requests that arrive at a current leader. This
3375 * seems to be fairly safe, since a majority other than the current leader
3376 * can still elect a new leader and the first AppendEntries from that new
3377 * leader will depose the current leader. */
3378 const struct raft_vote_request *rq = raft_vote_request_cast(rpc);
3379 if (rq->leadership_transfer) {
3380 return false;
3381 }
3382
3383 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3384 long long int now = time_msec();
3385 switch (raft->role) {
3386 case RAFT_LEADER:
3387 VLOG_WARN_RL(&rl, "ignoring vote request received as leader");
3388 return true;
3389
3390 case RAFT_FOLLOWER:
3391 if (now < raft->election_base + raft->election_timer) {
3392 VLOG_WARN_RL(&rl, "ignoring vote request received after only "
3393 "%lld ms (minimum election time is %"PRIu64" ms)",
3394 now - raft->election_base, raft->election_timer);
3395 return true;
3396 }
3397 return false;
3398
3399 case RAFT_CANDIDATE:
3400 return false;
3401
3402 default:
3403 OVS_NOT_REACHED();
3404 }
3405 }
3406
3407 /* Returns true if a reply should be sent. */
3408 static bool
3409 raft_handle_vote_request__(struct raft *raft,
3410 const struct raft_vote_request *rq)
3411 {
3412 /* Figure 3.1: "If votedFor is null or candidateId, and candidate's log is
3413 * at least as up-to-date as receiver's log, grant vote (sections 3.4,
3414 * 3.6)." */
3415 if (uuid_equals(&raft->vote, &rq->common.sid)) {
3416 /* Already voted for this candidate in this term. Resend vote. */
3417 return true;
3418 } else if (!uuid_is_zero(&raft->vote)) {
3419 /* Already voted for different candidate in this term. Send a reply
3420 * saying what candidate we did vote for. This isn't a necessary part
3421 * of the Raft protocol but it can make debugging easier. */
3422 return true;
3423 }
3424
3425 /* Section 3.6.1: "The RequestVote RPC implements this restriction: the RPC
3426 * includes information about the candidate’s log, and the voter denies its
3427 * vote if its own log is more up-to-date than that of the candidate. Raft
3428 * determines which of two logs is more up-to-date by comparing the index
3429 * and term of the last entries in the logs. If the logs have last entries
3430 * with different terms, then the log with the later term is more
3431 * up-to-date. If the logs end with the same term, then whichever log is
3432 * longer is more up-to-date." */
3433 uint64_t last_term = (raft->log_end > raft->log_start
3434 ? raft->entries[raft->log_end - 1
3435 - raft->log_start].term
3436 : raft->snap.term);
3437 if (last_term > rq->last_log_term
3438 || (last_term == rq->last_log_term
3439 && raft->log_end - 1 > rq->last_log_index)) {
3440 /* Our log is more up-to-date than the peer's. Withhold vote. */
3441 return false;
3442 }
3443
3444 /* Record a vote for the peer. */
3445 if (!raft_set_term(raft, raft->term, &rq->common.sid)) {
3446 return false;
3447 }
3448
3449 raft_reset_election_timer(raft);
3450
3451 return true;
3452 }
3453
3454 static void
3455 raft_send_vote_reply(struct raft *raft, const struct uuid *dst,
3456 const struct uuid *vote)
3457 {
3458 union raft_rpc rpy = {
3459 .vote_reply = {
3460 .common = {
3461 .type = RAFT_RPC_VOTE_REPLY,
3462 .sid = *dst,
3463 },
3464 .term = raft->term,
3465 .vote = *vote,
3466 },
3467 };
3468 raft_send(raft, &rpy);
3469 }
3470
3471 static void
3472 raft_handle_vote_request(struct raft *raft,
3473 const struct raft_vote_request *rq)
3474 {
3475 if (raft_handle_vote_request__(raft, rq)) {
3476 raft_send_vote_reply(raft, &rq->common.sid, &raft->vote);
3477 }
3478 }
3479
3480 static void
3481 raft_handle_vote_reply(struct raft *raft,
3482 const struct raft_vote_reply *rpy)
3483 {
3484 if (!raft_receive_term__(raft, &rpy->common, rpy->term)) {
3485 return;
3486 }
3487
3488 if (raft->role != RAFT_CANDIDATE) {
3489 return;
3490 }
3491
3492 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3493 if (s) {
3494 raft_accept_vote(raft, s, &rpy->vote);
3495 }
3496 }
3497
3498 /* Returns true if 'raft''s log contains reconfiguration entries that have not
3499 * yet been committed. */
3500 static bool
3501 raft_has_uncommitted_configuration(const struct raft *raft)
3502 {
3503 for (uint64_t i = raft->commit_index + 1; i < raft->log_end; i++) {
3504 ovs_assert(i >= raft->log_start);
3505 const struct raft_entry *e = &raft->entries[i - raft->log_start];
3506 if (e->servers) {
3507 return true;
3508 }
3509 }
3510 return false;
3511 }
3512
3513 static void
3514 raft_log_reconfiguration(struct raft *raft)
3515 {
3516 struct json *servers_json = raft_servers_to_json(&raft->servers);
3517 raft_command_unref(raft_command_execute__(
3518 raft, NULL, servers_json, 0, NULL, NULL));
3519 json_destroy(servers_json);
3520 }
3521
3522 static void
3523 raft_run_reconfigure(struct raft *raft)
3524 {
3525 ovs_assert(raft->role == RAFT_LEADER);
3526
3527 /* Reconfiguration only progresses when configuration changes commit. */
3528 if (raft_has_uncommitted_configuration(raft)) {
3529 return;
3530 }
3531
3532 /* If we were waiting for a configuration change to commit, it's done. */
3533 struct raft_server *s;
3534 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3535 if (s->phase == RAFT_PHASE_COMMITTING) {
3536 raft_send_add_server_reply__(raft, &s->sid, s->address,
3537 true, RAFT_SERVER_COMPLETED);
3538 s->phase = RAFT_PHASE_STABLE;
3539 }
3540 }
3541 if (raft->remove_server) {
3542 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
3543 &raft->remove_server->requester_sid,
3544 raft->remove_server->requester_conn,
3545 true, RAFT_SERVER_COMPLETED);
3546 raft_server_destroy(raft->remove_server);
3547 raft->remove_server = NULL;
3548 }
3549
3550 /* If a new server is caught up, add it to the configuration. */
3551 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
3552 if (s->phase == RAFT_PHASE_CAUGHT_UP) {
3553 /* Move 's' from 'raft->add_servers' to 'raft->servers'. */
3554 hmap_remove(&raft->add_servers, &s->hmap_node);
3555 hmap_insert(&raft->servers, &s->hmap_node, uuid_hash(&s->sid));
3556
3557 /* Mark 's' as waiting for commit. */
3558 s->phase = RAFT_PHASE_COMMITTING;
3559
3560 raft_log_reconfiguration(raft);
3561
3562 /* When commit completes we'll transition to RAFT_PHASE_STABLE and
3563 * send a RAFT_SERVER_COMPLETED reply. */
3564
3565 return;
3566 }
3567 }
3568
3569 /* Remove a server, if one is scheduled for removal. */
3570 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3571 if (s->phase == RAFT_PHASE_REMOVE) {
3572 hmap_remove(&raft->servers, &s->hmap_node);
3573 raft->remove_server = s;
3574
3575 raft_log_reconfiguration(raft);
3576
3577 return;
3578 }
3579 }
3580 }
3581
3582 static void
3583 raft_handle_add_server_request(struct raft *raft,
3584 const struct raft_add_server_request *rq)
3585 {
3586 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3587 if (raft->role != RAFT_LEADER) {
3588 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3589 return;
3590 }
3591
3592 /* Check for an existing server. */
3593 struct raft_server *s = raft_find_server(raft, &rq->common.sid);
3594 if (s) {
3595 /* If the server is scheduled to be removed, cancel it. */
3596 if (s->phase == RAFT_PHASE_REMOVE) {
3597 s->phase = RAFT_PHASE_STABLE;
3598 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_CANCELED);
3599 return;
3600 }
3601
3602 /* If the server is being added, then it's in progress. */
3603 if (s->phase != RAFT_PHASE_STABLE) {
3604 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_IN_PROGRESS);
3605 return;
3606 }
3607
3608 /* Nothing to do--server is already part of the configuration. */
3609 raft_send_add_server_reply(raft, rq,
3610 true, RAFT_SERVER_ALREADY_PRESENT);
3611 return;
3612 }
3613
3614 /* Check for a server being removed. */
3615 if (raft->remove_server
3616 && uuid_equals(&rq->common.sid, &raft->remove_server->sid)) {
3617 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3618 return;
3619 }
3620
3621 /* Check for a server already being added. */
3622 if (raft_find_new_server(raft, &rq->common.sid)) {
3623 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_IN_PROGRESS);
3624 return;
3625 }
3626
3627 /* Add server to 'add_servers'. */
3628 s = raft_server_add(&raft->add_servers, &rq->common.sid, rq->address);
3629 raft_server_init_leader(raft, s);
3630 s->requester_sid = rq->common.sid;
3631 s->requester_conn = NULL;
3632 s->phase = RAFT_PHASE_CATCHUP;
3633
3634 /* Start sending the log. If this is the first time we've tried to add
3635 * this server, then this will quickly degenerate into an InstallSnapshot
3636 * followed by a series of AppendEntries, but if it's a retry of an earlier
3637 * AddServer request that was interrupted (e.g. by a timeout or a loss of
3638 * leadership) then it will gracefully resume populating the log.
3639 *
3640 * See the last few paragraphs of section 4.2.1 for further insight. */
3641 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
3642 VLOG_INFO_RL(&rl,
3643 "starting to add server %s ("SID_FMT" at %s) "
3644 "to cluster "CID_FMT, s->nickname, SID_ARGS(&s->sid),
3645 rq->address, CID_ARGS(&raft->cid));
3646 raft_send_append_request(raft, s, 0, "initialize new server");
3647 }
3648
3649 static void
3650 raft_handle_add_server_reply(struct raft *raft,
3651 const struct raft_add_server_reply *rpy)
3652 {
3653 if (!raft->joining) {
3654 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3655 VLOG_WARN_RL(&rl, "received add_server_reply even though we're "
3656 "already part of the cluster");
3657 return;
3658 }
3659
3660 if (rpy->success) {
3661 raft->joining = false;
3662
3663 /* It is tempting, at this point, to check that this server is part of
3664 * the current configuration. However, this is not necessarily the
3665 * case, because the log entry that added this server to the cluster
3666 * might have been committed by a majority of the cluster that does not
3667 * include this one. This actually happens in testing. */
3668 } else {
3669 const char *address;
3670 SSET_FOR_EACH (address, &rpy->remote_addresses) {
3671 if (sset_add(&raft->remote_addresses, address)) {
3672 VLOG_INFO("%s: learned new server address for joining cluster",
3673 address);
3674 }
3675 }
3676 }
3677 }
3678
3679 /* This is called by raft_unixctl_kick() as well as via RPC. */
3680 static void
3681 raft_handle_remove_server_request(struct raft *raft,
3682 const struct raft_remove_server_request *rq)
3683 {
3684 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3685 if (raft->role != RAFT_LEADER) {
3686 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3687 return;
3688 }
3689
3690 /* If the server to remove is currently waiting to be added, cancel it. */
3691 struct raft_server *target = raft_find_new_server(raft, &rq->sid);
3692 if (target) {
3693 raft_send_add_server_reply__(raft, &target->sid, target->address,
3694 false, RAFT_SERVER_CANCELED);
3695 hmap_remove(&raft->add_servers, &target->hmap_node);
3696 raft_server_destroy(target);
3697 return;
3698 }
3699
3700 /* If the server isn't configured, report that. */
3701 target = raft_find_server(raft, &rq->sid);
3702 if (!target) {
3703 raft_send_remove_server_reply(raft, rq,
3704 true, RAFT_SERVER_ALREADY_GONE);
3705 return;
3706 }
3707
3708 /* Check whether we're waiting for the addition of the server to commit. */
3709 if (target->phase == RAFT_PHASE_COMMITTING) {
3710 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3711 return;
3712 }
3713
3714 /* Check whether the server is already scheduled for removal. */
3715 if (target->phase == RAFT_PHASE_REMOVE) {
3716 raft_send_remove_server_reply(raft, rq,
3717 false, RAFT_SERVER_IN_PROGRESS);
3718 return;
3719 }
3720
3721 /* Make sure that if we remove this server, then at least one other
3722 * server will be left. We don't count servers currently being added (in
3723 * 'add_servers') since those could fail. */
3724 struct raft_server *s;
3725 int n = 0;
3726 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3727 if (s != target && s->phase != RAFT_PHASE_REMOVE) {
3728 n++;
3729 }
3730 }
3731 if (!n) {
3732 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_EMPTY);
3733 return;
3734 }
3735
3736 /* Mark the server for removal. */
3737 target->phase = RAFT_PHASE_REMOVE;
3738 if (rq->requester_conn) {
3739 target->requester_sid = UUID_ZERO;
3740 unixctl_command_reply(rq->requester_conn, "started removal");
3741 } else {
3742 target->requester_sid = rq->common.sid;
3743 target->requester_conn = NULL;
3744 }
3745
3746 raft_run_reconfigure(raft);
3747 /* Operation in progress, reply will be sent later. */
3748 }
3749
3750 static void
3751 raft_finished_leaving_cluster(struct raft *raft)
3752 {
3753 VLOG_INFO(SID_FMT": finished leaving cluster "CID_FMT,
3754 SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
3755
3756 raft_record_note(raft, "left", "this server left the cluster");
3757
3758 raft->leaving = false;
3759 raft->left = true;
3760 }
3761
3762 static void
3763 raft_handle_remove_server_reply(struct raft *raft,
3764 const struct raft_remove_server_reply *rpc)
3765 {
3766 if (rpc->success
3767 && (uuid_is_zero(&rpc->target_sid)
3768 || uuid_equals(&rpc->target_sid, &raft->sid))) {
3769 raft_finished_leaving_cluster(raft);
3770 }
3771 }
3772
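/* Puts 'raft' into failure mode, logging the error, the first time a write
 * error occurs.  Returns true if 'raft' has not failed, that is, whether the
 * caller may proceed. */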
3773 static bool
3774 raft_handle_write_error(struct raft *raft, struct ovsdb_error *error)
3775 {
3776 if (error && !raft->failed) {
3777 raft->failed = true;
3778
3779 char *s = ovsdb_error_to_string_free(error);
3780 VLOG_WARN("%s: entering failure mode due to I/O error (%s)",
3781 raft->name, s);
3782 free(s);
3783 }
3784 return !raft->failed;
3785 }
3786
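/* Writes a complete replacement log to 'log': a header record that embeds
 * 'new_snapshot' (whose index becomes 'new_log_start' - 1), the log entries
 * from 'new_log_start' up to raft->log_end, the current term and vote, and
 * the commit index if it lies at or beyond 'new_log_start'.  Returns NULL on
 * success, otherwise an error that the caller must free. */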
3787 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3788 raft_write_snapshot(struct raft *raft, struct ovsdb_log *log,
3789 uint64_t new_log_start,
3790 const struct raft_entry *new_snapshot)
3791 {
3792 struct raft_header h = {
3793 .sid = raft->sid,
3794 .cid = raft->cid,
3795 .name = raft->name,
3796 .local_address = raft->local_address,
3797 .snap_index = new_log_start - 1,
3798 .snap = *new_snapshot,
3799 };
3800 struct ovsdb_error *error = ovsdb_log_write_and_free(
3801 log, raft_header_to_json(&h));
3802 if (error) {
3803 return error;
3804 }
3805 ovsdb_log_mark_base(raft->log);
3806
3807 /* Write log records. */
3808 for (uint64_t index = new_log_start; index < raft->log_end; index++) {
3809 const struct raft_entry *e = &raft->entries[index - raft->log_start];
3810 struct raft_record r = {
3811 .type = RAFT_REC_ENTRY,
3812 .term = e->term,
3813 .entry = {
3814 .index = index,
3815 .data = e->data,
3816 .servers = e->servers,
3817 .election_timer = e->election_timer,
3818 .eid = e->eid,
3819 },
3820 };
3821 error = ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3822 if (error) {
3823 return error;
3824 }
3825 }
3826
3827 /* Write term and vote (if any).
3828 *
3829 * The term is redundant if we wrote a log record for that term above. The
3830 * vote, if any, is never redundant.
3831 */
3832 error = raft_write_state(log, raft->term, &raft->vote);
3833 if (error) {
3834 return error;
3835 }
3836
3837 /* Write commit_index if it's beyond the new start of the log. */
3838 if (raft->commit_index >= new_log_start) {
3839 struct raft_record r = {
3840 .type = RAFT_REC_COMMIT_INDEX,
3841 .commit_index = raft->commit_index,
3842 };
3843 return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3844 }
3845 return NULL;
3846 }
3847
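/* Atomically replaces the on-disk log for 'raft' with one that begins with
 * 'new_snapshot' and continues from log index 'new_start': a replacement file
 * is populated via raft_write_snapshot() and then committed in place of the
 * old log, or abandoned if any write fails.  Returns NULL on success,
 * otherwise an error that the caller must free. */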
3848 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3849 raft_save_snapshot(struct raft *raft,
3850 uint64_t new_start, const struct raft_entry *new_snapshot)
3851
3852 {
3853 struct ovsdb_log *new_log;
3854 struct ovsdb_error *error;
3855 error = ovsdb_log_replace_start(raft->log, &new_log);
3856 if (error) {
3857 return error;
3858 }
3859
3860 error = raft_write_snapshot(raft, new_log, new_start, new_snapshot);
3861 if (error) {
3862 ovsdb_log_replace_abort(new_log);
3863 return error;
3864 }
3865
3866 return ovsdb_log_replace_commit(raft->log, new_log);
3867 }
3868
3869 static bool
3870 raft_handle_install_snapshot_request__(
3871 struct raft *raft, const struct raft_install_snapshot_request *rq)
3872 {
3873 raft_reset_election_timer(raft);
3874
3875 /*
3876 * Our behavior here depends on new_log_start in the snapshot compared to
3877 * log_start and log_end. There are three cases:
3878 *
3879 *        Case 1       |    Case 2     |      Case 3
3880 *   <---------------->|<------------->|<------------------>
3881 *                     |               |
3882 *
3883 *                     +---+---+---+---+
3884 *                   T | T | T | T | T |
3885 *                     +---+---+---+---+
3886 *                       ^               ^
3887 *                       |               |
3888 *                   log_start        log_end
3889 */
3890 uint64_t new_log_start = rq->last_index + 1;
3891 if (new_log_start < raft->log_start) {
3892 /* Case 1: The new snapshot covers less than our current one. Nothing
3893 * to do. */
3894 return true;
3895 } else if (new_log_start < raft->log_end) {
3896 /* Case 2: The new snapshot starts in the middle of our log. We could
3897 * discard the first 'new_log_start - raft->log_start' entries in the
3898 * log. But there's not much value in that, since snapshotting is
3899 * supposed to be a local decision. Just skip it. */
3900 return true;
3901 }
3902
3903 /* Case 3: The new snapshot starts past the end of our current log, so
3904 * discard all of our current log. */
3905 const struct raft_entry new_snapshot = {
3906 .term = rq->last_term,
3907 .data = rq->data,
3908 .eid = rq->last_eid,
3909 .servers = rq->last_servers,
3910 .election_timer = rq->election_timer,
3911 };
3912 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
3913 &new_snapshot);
3914 if (error) {
3915 char *error_s = ovsdb_error_to_string(error);
3916 VLOG_WARN("could not save snapshot: %s", error_s);
3917 free(error_s);
3918 return false;
3919 }
3920
3921 for (size_t i = 0; i < raft->log_end - raft->log_start; i++) {
3922 raft_entry_uninit(&raft->entries[i]);
3923 }
3924 raft->log_start = raft->log_end = new_log_start;
3925 raft->log_synced = raft->log_end - 1;
3926 raft->commit_index = raft->log_start - 1;
3927 if (raft->last_applied < raft->commit_index) {
3928 raft->last_applied = raft->log_start - 2;
3929 }
3930
3931 raft_entry_uninit(&raft->snap);
3932 raft_entry_clone(&raft->snap, &new_snapshot);
3933
3934 raft_get_servers_from_log(raft, VLL_INFO);
3935 raft_get_election_timer_from_log(raft);
3936
3937 return true;
3938 }
3939
3940 static void
3941 raft_handle_install_snapshot_request(
3942 struct raft *raft, const struct raft_install_snapshot_request *rq)
3943 {
3944 if (raft_handle_install_snapshot_request__(raft, rq)) {
3945 union raft_rpc rpy = {
3946 .install_snapshot_reply = {
3947 .common = {
3948 .type = RAFT_RPC_INSTALL_SNAPSHOT_REPLY,
3949 .sid = rq->common.sid,
3950 },
3951 .term = raft->term,
3952 .last_index = rq->last_index,
3953 .last_term = rq->last_term,
3954 },
3955 };
3956 raft_send(raft, &rpy);
3957 }
3958 }
3959
3960 static void
3961 raft_handle_install_snapshot_reply(
3962 struct raft *raft, const struct raft_install_snapshot_reply *rpy)
3963 {
3964 /* We might get an InstallSnapshot reply from a configured server (e.g. a
3965 * peer) or a server in the process of being added. */
3966 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3967 if (!s) {
3968 s = raft_find_new_server(raft, &rpy->common.sid);
3969 if (!s) {
3970 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3971 VLOG_INFO_RL(&rl, "cluster "CID_FMT": received %s from "
3972 "unknown server "SID_FMT, CID_ARGS(&raft->cid),
3973 raft_rpc_type_to_string(rpy->common.type),
3974 SID_ARGS(&rpy->common.sid));
3975 return;
3976 }
3977 }
3978
3979 if (rpy->last_index != raft->log_start - 1 ||
3980 rpy->last_term != raft->snap.term) {
3981 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3982 VLOG_INFO_RL(&rl, "cluster "CID_FMT": server %s installed "
3983 "out-of-date snapshot, starting over",
3984 CID_ARGS(&raft->cid), s->nickname);
3985 raft_send_install_snapshot_request(raft, s,
3986 "installed obsolete snapshot");
3987 return;
3988 }
3989
3990 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
3991 VLOG_INFO_RL(&rl, "cluster "CID_FMT": installed snapshot on server %s "
3992 " up to %"PRIu64":%"PRIu64, CID_ARGS(&raft->cid),
3993 s->nickname, rpy->last_term, rpy->last_index);
3994 s->next_index = raft->log_end;
3995 raft_send_append_request(raft, s, 0, "snapshot installed");
3996 }
3997
3998 /* Returns true if 'raft' has grown enough since the last snapshot that
3999 * reducing the log to a snapshot would be valuable, false otherwise. */
4000 bool
4001 raft_grew_lots(const struct raft *raft)
4002 {
4003 return ovsdb_log_grew_lots(raft->log);
4004 }
4005
4006 /* Returns the number of log entries that could be trimmed off the on-disk log
4007 * by snapshotting. */
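/* For example, with illustrative numbers only: if log_start is 10 and
 * last_applied is 14, then entries 10 through 14 have already been applied,
 * so 5 entries could be trimmed. */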
4008 uint64_t
4009 raft_get_log_length(const struct raft *raft)
4010 {
4011 return (raft->last_applied < raft->log_start
4012 ? 0
4013 : raft->last_applied - raft->log_start + 1);
4014 }
4015
4016 /* Returns true if taking a snapshot of 'raft', with raft_store_snapshot(), is
4017 * possible. */
4018 bool
4019 raft_may_snapshot(const struct raft *raft)
4020 {
4021 return (!raft->joining
4022 && !raft->leaving
4023 && !raft->left
4024 && !raft->failed
4025 && raft->last_applied >= raft->log_start);
4026 }
4027
4028 /* Replaces the log for 'raft', up to the last log entry read, by
4029 * 'new_snapshot_data'. Returns NULL if successful, otherwise an error that
4030 * the caller must eventually free.
4031 *
4032 * This function can only succeed if raft_may_snapshot() returns true. It is
4033 * only valuable to call it if raft_get_log_length() is significant and
4034 * especially if raft_grew_lots() returns true. */
4035 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
4036 raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
4037 {
4038 if (raft->joining) {
4039 return ovsdb_error(NULL,
4040 "cannot store a snapshot while joining cluster");
4041 } else if (raft->leaving) {
4042 return ovsdb_error(NULL,
4043 "cannot store a snapshot while leaving cluster");
4044 } else if (raft->left) {
4045 return ovsdb_error(NULL,
4046 "cannot store a snapshot after leaving cluster");
4047 } else if (raft->failed) {
4048 return ovsdb_error(NULL,
4049 "cannot store a snapshot following failure");
4050 }
4051
4052 if (raft->last_applied < raft->log_start) {
4053 return ovsdb_error(NULL, "not storing a duplicate snapshot");
4054 }
4055
4056 uint64_t new_log_start = raft->last_applied + 1;
4057 struct raft_entry new_snapshot = {
4058 .term = raft_get_term(raft, new_log_start - 1),
4059 .data = json_clone(new_snapshot_data),
4060 .eid = *raft_get_eid(raft, new_log_start - 1),
4061 .servers = json_clone(raft_servers_for_index(raft, new_log_start - 1)),
4062 .election_timer = raft->election_timer,
4063 };
4064 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
4065 &new_snapshot);
4066 if (error) {
4067 raft_entry_uninit(&new_snapshot);
4068 return error;
4069 }
4070
4071 raft->log_synced = raft->log_end - 1;
4072 raft_entry_uninit(&raft->snap);
4073 raft->snap = new_snapshot;
4074 for (size_t i = 0; i < new_log_start - raft->log_start; i++) {
4075 raft_entry_uninit(&raft->entries[i]);
4076 }
4077 memmove(&raft->entries[0], &raft->entries[new_log_start - raft->log_start],
4078 (raft->log_end - new_log_start) * sizeof *raft->entries);
4079 raft->log_start = new_log_start;
4080 return NULL;
4081 }
4082
4083 static void
4084 raft_handle_become_leader(struct raft *raft,
4085 const struct raft_become_leader *rq)
4086 {
4087 if (raft->role == RAFT_FOLLOWER) {
4088 char buf[SID_LEN + 1];
4089 VLOG_INFO("received leadership transfer from %s in term %"PRIu64,
4090 raft_get_nickname(raft, &rq->common.sid, buf, sizeof buf),
4091 rq->term);
4092 raft_start_election(raft, true);
4093 }
4094 }
4095
4096 static void
4097 raft_send_execute_command_reply(struct raft *raft,
4098 const struct uuid *sid,
4099 const struct uuid *eid,
4100 enum raft_command_status status,
4101 uint64_t commit_index)
4102 {
4103 if (failure_test == FT_CRASH_BEFORE_SEND_EXEC_REP) {
4104 ovs_fatal(0, "Raft test: crash before sending execute_command_reply");
4105 }
4106 union raft_rpc rpc = {
4107 .execute_command_reply = {
4108 .common = {
4109 .type = RAFT_RPC_EXECUTE_COMMAND_REPLY,
4110 .sid = *sid,
4111 },
4112 .result = *eid,
4113 .status = status,
4114 .commit_index = commit_index,
4115 },
4116 };
4117 raft_send(raft, &rpc);
4118 if (failure_test == FT_CRASH_AFTER_SEND_EXEC_REP) {
4119 ovs_fatal(0, "Raft test: crash after sending execute_command_reply.");
4120 }
4121 }
4122
4123 static enum raft_command_status
4124 raft_handle_execute_command_request__(
4125 struct raft *raft, const struct raft_execute_command_request *rq)
4126 {
4127 if (raft->role != RAFT_LEADER) {
4128 return RAFT_CMD_NOT_LEADER;
4129 }
4130
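/* 'prereq' names the entry ID that the client's transaction was prepared
 * against.  If it no longer matches the leader's current entry ID, the data
 * has moved on and the client has to retry against the newer contents, so the
 * command is rejected with RAFT_CMD_BAD_PREREQ below. */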
4131 const struct uuid *current_eid = raft_current_eid(raft);
4132 if (!uuid_equals(&rq->prereq, current_eid)) {
4133 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
4134 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
4135 "prerequisite "UUID_FMT" in execute_command_request",
4136 UUID_ARGS(current_eid), UUID_ARGS(&rq->prereq));
4137 return RAFT_CMD_BAD_PREREQ;
4138 }
4139
4140 struct raft_command *cmd = raft_command_initiate(raft, rq->data,
4141 NULL, 0, &rq->result);
4142 cmd->sid = rq->common.sid;
4143
4144 enum raft_command_status status = cmd->status;
4145 if (status != RAFT_CMD_INCOMPLETE) {
4146 raft_command_unref(cmd);
4147 }
4148 return status;
4149 }
4150
4151 static void
4152 raft_handle_execute_command_request(
4153 struct raft *raft, const struct raft_execute_command_request *rq)
4154 {
4155 enum raft_command_status status
4156 = raft_handle_execute_command_request__(raft, rq);
4157 if (status != RAFT_CMD_INCOMPLETE) {
4158 raft_send_execute_command_reply(raft, &rq->common.sid, &rq->result,
4159 status, 0);
4160 }
4161 }
4162
4163 static void
4164 raft_handle_execute_command_reply(
4165 struct raft *raft, const struct raft_execute_command_reply *rpy)
4166 {
4167 struct raft_command *cmd = raft_find_command_by_eid(raft, &rpy->result);
4168 if (!cmd) {
4169 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
4170 char buf[SID_LEN + 1];
4171 VLOG_INFO_RL(&rl,
4172 "%s received \"%s\" reply from %s for unknown command",
4173 raft->local_nickname,
4174 raft_command_status_to_string(rpy->status),
4175 raft_get_nickname(raft, &rpy->common.sid,
4176 buf, sizeof buf));
4177 return;
4178 }
4179
4180 if (rpy->status == RAFT_CMD_INCOMPLETE) {
4181 cmd->timestamp = time_msec();
4182 } else {
4183 cmd->index = rpy->commit_index;
4184 raft_command_complete(raft, cmd, rpy->status);
4185 }
4186 }
4187
4188 static void
4189 raft_handle_rpc(struct raft *raft, const union raft_rpc *rpc)
4190 {
4191 uint64_t term = raft_rpc_get_term(rpc);
4192 if (term
4193 && !raft_should_suppress_disruptive_server(raft, rpc)
4194 && !raft_receive_term__(raft, &rpc->common, term)) {
4195 if (rpc->type == RAFT_RPC_APPEND_REQUEST) {
4196 /* Section 3.3: "If a server receives a request with a stale term
4197 * number, it rejects the request." */
4198 raft_send_append_reply(raft, raft_append_request_cast(rpc),
4199 RAFT_APPEND_INCONSISTENCY, "stale term");
4200 }
4201 return;
4202 }
4203
4204 switch (rpc->type) {
4205 #define RAFT_RPC(ENUM, NAME) \
4206 case ENUM: \
4207 raft_handle_##NAME(raft, &rpc->NAME); \
4208 break;
4209 RAFT_RPC_TYPES
4210 #undef RAFT_RPC
4211 default:
4212 OVS_NOT_REACHED();
4213 }
4214 }
4215 \f
4216 static bool
4217 raft_rpc_is_heartbeat(const union raft_rpc *rpc)
4218 {
4219 return ((rpc->type == RAFT_RPC_APPEND_REQUEST
4220 || rpc->type == RAFT_RPC_APPEND_REPLY)
4221 && rpc->common.comment
4222 && !strcmp(rpc->common.comment, "heartbeat"));
4223 }
4224
4225 \f
4226 static bool
4227 raft_send_to_conn_at(struct raft *raft, const union raft_rpc *rpc,
4228 struct raft_conn *conn, int line_number)
4229 {
4230 log_rpc(rpc, "-->", conn, line_number);
4231 return !jsonrpc_session_send(
4232 conn->js, raft_rpc_to_jsonrpc(&raft->cid, &raft->sid, rpc));
4233 }
4234
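/* Returns true if everything that 'rpc' refers to--its term, the log entries
 * up to its minimum sync index, and its vote, if any--has already been synced
 * to disk, so that it is safe to send immediately.  Otherwise raft_send_at()
 * defers the RPC on a RAFT_W_RPC waiter until the sync completes. */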
4235 static bool
4236 raft_is_rpc_synced(const struct raft *raft, const union raft_rpc *rpc)
4237 {
4238 uint64_t term = raft_rpc_get_term(rpc);
4239 uint64_t index = raft_rpc_get_min_sync_index(rpc);
4240 const struct uuid *vote = raft_rpc_get_vote(rpc);
4241
4242 return (term <= raft->synced_term
4243 && index <= raft->log_synced
4244 && (!vote || uuid_equals(vote, &raft->synced_vote)));
4245 }
4246
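/* Sends 'rpc' to the server named by rpc->common.sid, first deferring it on a
 * disk-sync waiter if it refers to not-yet-synced state.  'line_number' is the
 * caller's source line, used only in log messages; it is presumably supplied
 * by the raft_send() macro defined earlier in this file, which passes
 * __LINE__.  Returns false if the RPC is addressed to this server itself or
 * there is no connection to the destination. */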
4247 static bool
4248 raft_send_at(struct raft *raft, const union raft_rpc *rpc, int line_number)
4249 {
4250 const struct uuid *dst = &rpc->common.sid;
4251 if (uuid_equals(dst, &raft->sid)) {
4252 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4253 VLOG_WARN_RL(&rl, "attempted to send RPC to self from raft.c:%d",
4254 line_number);
4255 return false;
4256 }
4257
4258 struct raft_conn *conn = raft_find_conn_by_sid(raft, dst);
4259 if (!conn) {
4260 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4261 char buf[SID_LEN + 1];
4262 VLOG_DBG_RL(&rl, "%s: no connection to %s, cannot send RPC "
4263 "from raft.c:%d", raft->local_nickname,
4264 raft_get_nickname(raft, dst, buf, sizeof buf),
4265 line_number);
4266 return false;
4267 }
4268
4269 if (!raft_is_rpc_synced(raft, rpc)) {
4270 raft_waiter_create(raft, RAFT_W_RPC, false)->rpc = raft_rpc_clone(rpc);
4271 return true;
4272 }
4273
4274 return raft_send_to_conn_at(raft, rpc, conn, line_number);
4275 }
4276 \f
4277 static struct raft *
4278 raft_lookup_by_name(const char *name)
4279 {
4280 struct raft *raft;
4281
4282 HMAP_FOR_EACH_WITH_HASH (raft, hmap_node, hash_string(name, 0),
4283 &all_rafts) {
4284 if (!strcmp(raft->name, name)) {
4285 return raft;
4286 }
4287 }
4288 return NULL;
4289 }
4290
4291 static void
4292 raft_unixctl_cid(struct unixctl_conn *conn,
4293 int argc OVS_UNUSED, const char *argv[],
4294 void *aux OVS_UNUSED)
4295 {
4296 struct raft *raft = raft_lookup_by_name(argv[1]);
4297 if (!raft) {
4298 unixctl_command_reply_error(conn, "unknown cluster");
4299 } else if (uuid_is_zero(&raft->cid)) {
4300 unixctl_command_reply_error(conn, "cluster id not yet known");
4301 } else {
4302 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->cid));
4303 unixctl_command_reply(conn, uuid);
4304 free(uuid);
4305 }
4306 }
4307
4308 static void
4309 raft_unixctl_sid(struct unixctl_conn *conn,
4310 int argc OVS_UNUSED, const char *argv[],
4311 void *aux OVS_UNUSED)
4312 {
4313 struct raft *raft = raft_lookup_by_name(argv[1]);
4314 if (!raft) {
4315 unixctl_command_reply_error(conn, "unknown cluster");
4316 } else {
4317 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->sid));
4318 unixctl_command_reply(conn, uuid);
4319 free(uuid);
4320 }
4321 }
4322
4323 static void
4324 raft_put_sid(const char *title, const struct uuid *sid,
4325 const struct raft *raft, struct ds *s)
4326 {
4327 ds_put_format(s, "%s: ", title);
4328 if (uuid_equals(sid, &raft->sid)) {
4329 ds_put_cstr(s, "self");
4330 } else if (uuid_is_zero(sid)) {
4331 ds_put_cstr(s, "unknown");
4332 } else {
4333 char buf[SID_LEN + 1];
4334 ds_put_cstr(s, raft_get_nickname(raft, sid, buf, sizeof buf));
4335 }
4336 ds_put_char(s, '\n');
4337 }
4338
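/* Implements the "cluster/status DB" unixctl command registered in
 * raft_init() below.  A typical invocation, with the control socket path and
 * database name as illustrative assumptions only, might be:
 *
 *     ovs-appctl -t /var/run/ovn/ovnsb_db.ctl cluster/status OVN_Southbound
 */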
4339 static void
4340 raft_unixctl_status(struct unixctl_conn *conn,
4341 int argc OVS_UNUSED, const char *argv[],
4342 void *aux OVS_UNUSED)
4343 {
4344 struct raft *raft = raft_lookup_by_name(argv[1]);
4345 if (!raft) {
4346 unixctl_command_reply_error(conn, "unknown cluster");
4347 return;
4348 }
4349
4350 struct ds s = DS_EMPTY_INITIALIZER;
4351 ds_put_format(&s, "%s\n", raft->local_nickname);
4352 ds_put_format(&s, "Name: %s\n", raft->name);
4353 ds_put_format(&s, "Cluster ID: ");
4354 if (!uuid_is_zero(&raft->cid)) {
4355 ds_put_format(&s, CID_FMT" ("UUID_FMT")\n",
4356 CID_ARGS(&raft->cid), UUID_ARGS(&raft->cid));
4357 } else {
4358 ds_put_format(&s, "not yet known\n");
4359 }
4360 ds_put_format(&s, "Server ID: "SID_FMT" ("UUID_FMT")\n",
4361 SID_ARGS(&raft->sid), UUID_ARGS(&raft->sid));
4362 ds_put_format(&s, "Address: %s\n", raft->local_address);
4363 ds_put_format(&s, "Status: %s\n",
4364 raft->joining ? "joining cluster"
4365 : raft->leaving ? "leaving cluster"
4366 : raft->left ? "left cluster"
4367 : raft->failed ? "failed"
4368 : "cluster member");
4369 if (raft->joining) {
4370 ds_put_format(&s, "Remotes for joining:");
4371 const char *address;
4372 SSET_FOR_EACH (address, &raft->remote_addresses) {
4373 ds_put_format(&s, " %s", address);
4374 }
4375 ds_put_char(&s, '\n');
4376 }
4377 if (raft->role == RAFT_LEADER) {
4378 struct raft_server *as;
4379 HMAP_FOR_EACH (as, hmap_node, &raft->add_servers) {
4380 ds_put_format(&s, "Adding server %s ("SID_FMT" at %s) (%s)\n",
4381 as->nickname, SID_ARGS(&as->sid), as->address,
4382 raft_server_phase_to_string(as->phase));
4383 }
4384
4385 struct raft_server *rs = raft->remove_server;
4386 if (rs) {
4387 ds_put_format(&s, "Removing server %s ("SID_FMT" at %s) (%s)\n",
4388 rs->nickname, SID_ARGS(&rs->sid), rs->address,
4389 raft_server_phase_to_string(rs->phase));
4390 }
4391 }
4392
4393 ds_put_format(&s, "Role: %s\n",
4394 raft->role == RAFT_LEADER ? "leader"
4395 : raft->role == RAFT_CANDIDATE ? "candidate"
4396 : raft->role == RAFT_FOLLOWER ? "follower"
4397 : "<error>");
4398 ds_put_format(&s, "Term: %"PRIu64"\n", raft->term);
4399 raft_put_sid("Leader", &raft->leader_sid, raft, &s);
4400 raft_put_sid("Vote", &raft->vote, raft, &s);
4401 ds_put_char(&s, '\n');
4402
4403 ds_put_format(&s, "Election timer: %"PRIu64, raft->election_timer);
4404 if (raft->role == RAFT_LEADER && raft->election_timer_new) {
4405 ds_put_format(&s, " (changing to %"PRIu64")",
4406 raft->election_timer_new);
4407 }
4408 ds_put_char(&s, '\n');
4409
4410 ds_put_format(&s, "Log: [%"PRIu64", %"PRIu64"]\n",
4411 raft->log_start, raft->log_end);
4412
4413 uint64_t n_uncommitted = raft->log_end - raft->commit_index - 1;
4414 ds_put_format(&s, "Entries not yet committed: %"PRIu64"\n", n_uncommitted);
4415
4416 uint64_t n_unapplied = raft->log_end - raft->last_applied - 1;
4417 ds_put_format(&s, "Entries not yet applied: %"PRIu64"\n", n_unapplied);
4418
4419 const struct raft_conn *c;
4420 ds_put_cstr(&s, "Connections:");
4421 LIST_FOR_EACH (c, list_node, &raft->conns) {
4422 bool connected = jsonrpc_session_is_connected(c->js);
4423 ds_put_format(&s, " %s%s%s%s",
4424 connected ? "" : "(",
4425 c->incoming ? "<-" : "->", c->nickname,
4426 connected ? "" : ")");
4427 }
4428 ds_put_char(&s, '\n');
4429
4430 ds_put_cstr(&s, "Servers:\n");
4431 struct raft_server *server;
4432 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
4433 ds_put_format(&s, " %s ("SID_FMT" at %s)",
4434 server->nickname,
4435 SID_ARGS(&server->sid), server->address);
4436 if (uuid_equals(&server->sid, &raft->sid)) {
4437 ds_put_cstr(&s, " (self)");
4438 }
4439 if (server->phase != RAFT_PHASE_STABLE) {
4440 ds_put_format(&s, " (%s)",
4441 raft_server_phase_to_string(server->phase));
4442 }
4443 if (raft->role == RAFT_CANDIDATE) {
4444 if (!uuid_is_zero(&server->vote)) {
4445 char buf[SID_LEN + 1];
4446 ds_put_format(&s, " (voted for %s)",
4447 raft_get_nickname(raft, &server->vote,
4448 buf, sizeof buf));
4449 }
4450 } else if (raft->role == RAFT_LEADER) {
4451 ds_put_format(&s, " next_index=%"PRIu64" match_index=%"PRIu64,
4452 server->next_index, server->match_index);
4453 }
4454 ds_put_char(&s, '\n');
4455 }
4456
4457 unixctl_command_reply(conn, ds_cstr(&s));
4458 ds_destroy(&s);
4459 }
4460
4461 static void
4462 raft_unixctl_leave__(struct unixctl_conn *conn, struct raft *raft)
4463 {
4464 if (raft_is_leaving(raft)) {
4465 unixctl_command_reply_error(conn,
4466 "already in progress leaving cluster");
4467 } else if (raft_is_joining(raft)) {
4468 unixctl_command_reply_error(conn,
4469 "can't leave while join in progress");
4470 } else if (raft_failed(raft)) {
4471 unixctl_command_reply_error(conn,
4472 "can't leave after failure");
4473 } else {
4474 raft_leave(raft);
4475 unixctl_command_reply(conn, NULL);
4476 }
4477 }
4478
4479 static void
4480 raft_unixctl_leave(struct unixctl_conn *conn, int argc OVS_UNUSED,
4481 const char *argv[], void *aux OVS_UNUSED)
4482 {
4483 struct raft *raft = raft_lookup_by_name(argv[1]);
4484 if (!raft) {
4485 unixctl_command_reply_error(conn, "unknown cluster");
4486 return;
4487 }
4488
4489 raft_unixctl_leave__(conn, raft);
4490 }
4491
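/* Looks up a server by the unixctl-supplied string 'id', which may be either
 * a server address (matched exactly) or a full or partial server UUID.
 * Returns NULL if nothing matches or if the match is ambiguous. */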
4492 static struct raft_server *
4493 raft_lookup_server_best_match(struct raft *raft, const char *id)
4494 {
4495 struct raft_server *best = NULL;
4496 int best_score = -1;
4497 int n_best = 0;
4498
4499 struct raft_server *s;
4500 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
4501 int score = (!strcmp(id, s->address)
4502 ? INT_MAX
4503 : uuid_is_partial_match(&s->sid, id));
4504 if (score > best_score) {
4505 best = s;
4506 best_score = score;
4507 n_best = 1;
4508 } else if (score == best_score) {
4509 n_best++;
4510 }
4511 }
4512 return n_best == 1 ? best : NULL;
4513 }
4514
4515 static void
4516 raft_unixctl_kick(struct unixctl_conn *conn, int argc OVS_UNUSED,
4517 const char *argv[], void *aux OVS_UNUSED)
4518 {
4519 const char *cluster_name = argv[1];
4520 const char *server_name = argv[2];
4521
4522 struct raft *raft = raft_lookup_by_name(cluster_name);
4523 if (!raft) {
4524 unixctl_command_reply_error(conn, "unknown cluster");
4525 return;
4526 }
4527
4528 struct raft_server *server = raft_lookup_server_best_match(raft,
4529 server_name);
4530 if (!server) {
4531 unixctl_command_reply_error(conn, "unknown server");
4532 return;
4533 }
4534
4535 if (uuid_equals(&server->sid, &raft->sid)) {
4536 raft_unixctl_leave__(conn, raft);
4537 } else if (raft->role == RAFT_LEADER) {
4538 const struct raft_remove_server_request rq = {
4539 .sid = server->sid,
4540 .requester_conn = conn,
4541 };
4542 raft_handle_remove_server_request(raft, &rq);
4543 } else {
4544 const union raft_rpc rpc = {
4545 .remove_server_request = {
4546 .common = {
4547 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
4548 .sid = raft->leader_sid,
4549 .comment = "via unixctl"
4550 },
4551 .sid = server->sid,
4552 }
4553 };
4554 if (raft_send(raft, &rpc)) {
4555 unixctl_command_reply(conn, "sent removal request to leader");
4556 } else {
4557 unixctl_command_reply_error(conn,
4558 "failed to send removal request");
4559 }
4560 }
4561 }
4562
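/* Derives raft->election_timer from the log: the value from the most recent
 * committed entry that changed it, or, failing that, the value stored in the
 * snapshot. */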
4563 static void
4564 raft_get_election_timer_from_log(struct raft *raft)
4565 {
4566 if (raft->snap.election_timer) {
4567 raft->election_timer = raft->snap.election_timer;
4568 }
4569 for (uint64_t index = raft->commit_index; index >= raft->log_start;
4570 index--) {
4571 struct raft_entry *e = &raft->entries[index - raft->log_start];
4572 if (e->election_timer) {
4573 raft->election_timer = e->election_timer;
4574 break;
4575 }
4576 }
4577 }
4578
4579 static void
4580 raft_log_election_timer(struct raft *raft)
4581 {
4582 raft_command_unref(raft_command_execute__(raft, NULL, NULL,
4583 raft->election_timer_new, NULL,
4584 NULL));
4585 }
4586
4587 static void
4588 raft_unixctl_change_election_timer(struct unixctl_conn *conn,
4589 int argc OVS_UNUSED, const char *argv[],
4590 void *aux OVS_UNUSED)
4591 {
4592 const char *cluster_name = argv[1];
4593 const char *election_timer_str = argv[2];
4594
4595 struct raft *raft = raft_lookup_by_name(cluster_name);
4596 if (!raft) {
4597 unixctl_command_reply_error(conn, "unknown cluster");
4598 return;
4599 }
4600
4601 if (raft->role != RAFT_LEADER) {
4602 unixctl_command_reply_error(conn, "election timer must be changed"
4603 " through leader.");
4604 return;
4605 }
4606
4607 /* If there is already a pending election timer change, reject this one. */
4608 if (raft->election_timer_new) {
4609 unixctl_command_reply_error(conn, "election timer change pending.");
4610 return;
4611 }
4612
4613 uint64_t election_timer = atoll(election_timer_str);
4614 if (election_timer == raft->election_timer) {
4615 unixctl_command_reply(conn, "election timer already at the requested value.");
4616 return;
4617 }
4618
4619 /* An election timer smaller than 100 ms or larger than 10 minutes
4620 * doesn't make sense. */
4621 if (election_timer < 100 || election_timer > 600000) {
4622 unixctl_command_reply_error(conn, "election timer must be between "
4623 "100 and 600000, in msec.");
4624 return;
4625 }
4626
4627 /* If the election timer is being increased, it should be done gradually, so
4628 * that the new value does not cause a timeout while it is applied on the
4629 * leader but not yet on some of the followers. */
4630 if (election_timer > raft->election_timer * 2) {
4631 unixctl_command_reply_error(conn, "election timer increase should not "
4632 "exceed the current value x 2.");
4633 return;
4634 }
4635
4636 raft->election_timer_new = election_timer;
4637 raft_log_election_timer(raft);
4638 unixctl_command_reply(conn, "change of election timer initiated.");
4639 }
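/* A typical way to trigger the command above, with the socket path and
 * database name as illustrative assumptions only, might be:
 *
 *     ovs-appctl -t /var/run/ovn/ovnsb_db.ctl \
 *         cluster/change-election-timer OVN_Southbound 4000
 *
 * Larger increases have to be applied in several steps, since each step may
 * at most double the current value. */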
4640
4641 static void
4642 raft_unixctl_failure_test(struct unixctl_conn *conn,
4643 int argc OVS_UNUSED, const char *argv[],
4644 void *aux OVS_UNUSED)
4645 {
4646 const char *test = argv[1];
4647 if (!strcmp(test, "crash-before-sending-append-request")) {
4648 failure_test = FT_CRASH_BEFORE_SEND_APPEND_REQ;
4649 } else if (!strcmp(test, "crash-after-sending-append-request")) {
4650 failure_test = FT_CRASH_AFTER_SEND_APPEND_REQ;
4651 } else if (!strcmp(test, "crash-before-sending-execute-command-reply")) {
4652 failure_test = FT_CRASH_BEFORE_SEND_EXEC_REP;
4653 } else if (!strcmp(test, "crash-after-sending-execute-command-reply")) {
4654 failure_test = FT_CRASH_AFTER_SEND_EXEC_REP;
4655 } else if (!strcmp(test, "crash-before-sending-execute-command-request")) {
4656 failure_test = FT_CRASH_BEFORE_SEND_EXEC_REQ;
4657 } else if (!strcmp(test, "crash-after-sending-execute-command-request")) {
4658 failure_test = FT_CRASH_AFTER_SEND_EXEC_REQ;
4659 } else if (!strcmp(test, "crash-after-receiving-append-request-update")) {
4660 failure_test = FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE;
4661 } else if (!strcmp(test, "delay-election")) {
4662 failure_test = FT_DELAY_ELECTION;
4663 struct raft *raft;
4664 HMAP_FOR_EACH (raft, hmap_node, &all_rafts) {
4665 if (raft->role == RAFT_FOLLOWER) {
4666 raft_reset_election_timer(raft);
4667 }
4668 }
4669 } else if (!strcmp(test, "clear")) {
4670 failure_test = FT_NO_TEST;
4671 unixctl_command_reply(conn, "test dismissed");
4672 return;
4673 } else {
4674 unixctl_command_reply_error(conn, "unknown test scenario");
4675 return;
4676 }
4677 unixctl_command_reply(conn, "test engaged");
4678 }
4679
4680 static void
4681 raft_init(void)
4682 {
4683 static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
4684 if (!ovsthread_once_start(&once)) {
4685 return;
4686 }
4687 unixctl_command_register("cluster/cid", "DB", 1, 1,
4688 raft_unixctl_cid, NULL);
4689 unixctl_command_register("cluster/sid", "DB", 1, 1,
4690 raft_unixctl_sid, NULL);
4691 unixctl_command_register("cluster/status", "DB", 1, 1,
4692 raft_unixctl_status, NULL);
4693 unixctl_command_register("cluster/leave", "DB", 1, 1,
4694 raft_unixctl_leave, NULL);
4695 unixctl_command_register("cluster/kick", "DB SERVER", 2, 2,
4696 raft_unixctl_kick, NULL);
4697 unixctl_command_register("cluster/change-election-timer", "DB TIME", 2, 2,
4698 raft_unixctl_change_election_timer, NULL);
4699 unixctl_command_register("cluster/failure-test", "FAILURE SCENARIO", 1, 1,
4700 raft_unixctl_failure_test, NULL);
4701 ovsthread_once_done(&once);
4702 }