1 /*
2 * Copyright (c) 2017, 2018 Nicira, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <config.h>
18
19 #include "raft.h"
20 #include "raft-private.h"
21
22 #include <errno.h>
23 #include <unistd.h>
24
25 #include "hash.h"
26 #include "jsonrpc.h"
27 #include "lockfile.h"
28 #include "openvswitch/dynamic-string.h"
29 #include "openvswitch/hmap.h"
30 #include "openvswitch/json.h"
31 #include "openvswitch/list.h"
32 #include "openvswitch/poll-loop.h"
33 #include "openvswitch/vlog.h"
34 #include "ovsdb-error.h"
35 #include "ovsdb-parser.h"
36 #include "ovsdb/log.h"
37 #include "raft-rpc.h"
38 #include "random.h"
39 #include "socket-util.h"
40 #include "stream.h"
41 #include "timeval.h"
42 #include "unicode.h"
43 #include "unixctl.h"
44 #include "util.h"
45 #include "uuid.h"
46
47 VLOG_DEFINE_THIS_MODULE(raft);
48
49 /* Roles for a Raft server:
50 *
51 * - Follower: Servers in touch with the current leader.
52 *
53 * - Candidate: Servers unaware of a current leader and seeking election to
54 * leader.
55 *
56 * - Leader: Handles all client requests. At most one at a time.
57 *
58 * In normal operation there is exactly one leader and all of the other servers
59 * are followers. */
60 enum raft_role {
61 RAFT_FOLLOWER,
62 RAFT_CANDIDATE,
63 RAFT_LEADER
64 };
65
66 /* Flags for unit tests. */
67 enum raft_failure_test {
68 FT_NO_TEST,
69 FT_CRASH_BEFORE_SEND_APPEND_REQ,
70 FT_CRASH_AFTER_SEND_APPEND_REQ,
71 FT_CRASH_BEFORE_SEND_EXEC_REP,
72 FT_CRASH_AFTER_SEND_EXEC_REP,
73 FT_CRASH_BEFORE_SEND_EXEC_REQ,
74 FT_CRASH_AFTER_SEND_EXEC_REQ,
75 FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE,
76 FT_DELAY_ELECTION
77 };
78 static enum raft_failure_test failure_test;
79
80 /* A connection between this Raft server and another one. */
81 struct raft_conn {
82 struct ovs_list list_node; /* In struct raft's 'conns' list. */
83 struct jsonrpc_session *js; /* JSON-RPC connection. */
84 struct uuid sid; /* Server ID of the remote server. */
85 char *nickname; /* Short name for use in log messages. */
86 bool incoming; /* True if incoming, false if outgoing. */
87 unsigned int js_seqno; /* Seqno for noticing (re)connections. */
88 };
89
90 static void raft_conn_close(struct raft_conn *);
91
92 /* A "command", that is, a request to append an entry to the log.
93 *
94 * The Raft specification only allows clients to issue commands to the leader.
95 * With this implementation, clients may issue a command on any server, which
96 * then relays the command to the leader if necessary.
97 *
98 * This structure is thus used in three cases:
99 *
100 * 1. We are the leader and the command was issued to us directly.
101 *
102 * 2. We are a follower and relayed the command to the leader.
103 *
104 * 3. We are the leader and a follower relayed the command to us.
105 */
106 struct raft_command {
107 /* All cases. */
108 struct hmap_node hmap_node; /* In struct raft's 'commands' hmap. */
109 unsigned int n_refs; /* Reference count. */
110 enum raft_command_status status; /* Execution status. */
111 struct uuid eid; /* Entry ID of result. */
112
113 /* Case 1 only. */
114 uint64_t index; /* Index in log (0 if being relayed). */
115
116 /* Case 2 only. */
117 long long int timestamp; /* Issue or last ping time, for expiration. */
118
119 /* Case 3 only. */
120 struct uuid sid; /* The follower (otherwise UUID_ZERO). */
121 };
122
123 static void raft_command_complete(struct raft *, struct raft_command *,
124 enum raft_command_status);
125
126 static void raft_complete_all_commands(struct raft *,
127 enum raft_command_status);
128
129 /* Type of deferred action, see struct raft_waiter. */
130 enum raft_waiter_type {
131 RAFT_W_ENTRY,
132 RAFT_W_TERM,
133 RAFT_W_RPC,
134 };
135
136 /* An action deferred until a log write commits to disk. */
137 struct raft_waiter {
138 struct ovs_list list_node;
139 uint64_t commit_ticket;
140
141 enum raft_waiter_type type;
142 union {
143 /* RAFT_W_ENTRY.
144 *
145 * Waits for a RAFT_REC_ENTRY write to our local log to commit. Upon
146 * completion, updates 'log_synced' to indicate that the new log entry
147 * or entries are committed and, if we are leader, also updates our
148 * local 'match_index'. */
149 struct {
150 uint64_t index;
151 } entry;
152
153 /* RAFT_W_TERM.
154 *
155 * Waits for a RAFT_REC_TERM or RAFT_REC_VOTE record write to commit.
156 * Upon completion, updates 'synced_term' and 'synced_vote', which
157 * triggers sending RPCs deferred by the uncommitted 'term' and
158 * 'vote'. */
159 struct {
160 uint64_t term;
161 struct uuid vote;
162 } term;
163
164 /* RAFT_W_RPC.
165 *
166 * Sometimes, sending an RPC to a peer must be delayed until an entry,
167 * a term, or a vote mentioned in the RPC is synced to disk. This
168 * waiter keeps a copy of such an RPC until the previous waiters have
169 * committed. */
170 union raft_rpc *rpc;
171 };
172 };
173
174 static struct raft_waiter *raft_waiter_create(struct raft *,
175 enum raft_waiter_type,
176 bool start_commit);
177 static void raft_waiters_destroy(struct raft *);
178
179 /* The Raft state machine. */
180 struct raft {
181 struct hmap_node hmap_node; /* In 'all_rafts'. */
182 struct ovsdb_log *log;
183
184 /* Persistent derived state.
185 *
186 * This must be updated on stable storage before responding to RPCs. It can be
187 * derived from the header, snapshot, and log in 'log'. */
188
189 struct uuid cid; /* Cluster ID (immutable for the cluster). */
190 struct uuid sid; /* Server ID (immutable for the server). */
191 char *local_address; /* Local address (immutable for the server). */
192 char *local_nickname; /* Used for local server in log messages. */
193 char *name; /* Schema name (immutable for the cluster). */
194
195 /* Contains "struct raft_server"s and represents the server configuration
196 * most recently added to 'log'. */
197 struct hmap servers;
198
199 #define ELECTION_BASE_MSEC 1000
200 #define ELECTION_RANGE_MSEC 1000
201 /* The election timeout base value for leader election, in milliseconds.
202 * It can be set by unixctl cluster/change-election-timer. Default value is
203 * ELECTION_BASE_MSEC. */
204 uint64_t election_timer;
205 /* If not 0, it is the new value of election_timer being proposed. */
206 uint64_t election_timer_new;
207
208 /* Persistent state on all servers.
209 *
210 * Must be updated on stable storage before responding to RPCs. */
211
212 /* Current term and the vote for that term. These might be on the way to
213 * disk now. */
214 uint64_t term; /* Initialized to 0 and only increases. */
215 struct uuid vote; /* All-zeros if no vote yet in 'term'. */
216
217 /* The term and vote that have been synced to disk. */
218 uint64_t synced_term;
219 struct uuid synced_vote;
220
221 /* The log.
222 *
223 * A log entry with index 1 never really exists; the initial snapshot for a
224 * Raft is considered to include this index. The first real log entry has
225 * index 2.
226 *
227 * A new Raft instance contains an empty log: log_start=2, log_end=2.
228 * Over time, the log grows: log_start=2, log_end=N.
229 * At some point, the server takes a snapshot: log_start=N, log_end=N.
230 * The log continues to grow: log_start=N, log_end=N+1...
231 *
232 * Must be updated on stable storage before responding to RPCs. */
233 struct raft_entry *entries; /* Log entry i is in entries[i - log_start]. */
234 uint64_t log_start; /* Index of first entry in log. */
235 uint64_t log_end; /* Index of last entry in log, plus 1. */
236 uint64_t log_synced; /* Index of last synced entry. */
237 size_t allocated_log; /* Number of entries allocated in 'entries'. */
238
239 /* Snapshot state (see Figure 5.1)
240 *
241 * This is the state of the cluster as of the last discarded log entry,
242 * that is, at log index 'log_start - 1' (called prevIndex in Figure 5.1).
243 * Only committed log entries can be included in a snapshot. */
244 struct raft_entry snap;
245
246 /* Volatile state.
247 *
248 * The snapshot is always committed, but the rest of the log might not be yet.
249 * 'last_applied' tracks what entries have been passed to the client. If the
250 * client hasn't yet read the latest snapshot, then even the snapshot isn't
251 * applied yet. Thus, the invariants are different for these members:
252 *
253 * log_start - 2 <= last_applied <= commit_index < log_end.
254 * log_start - 1 <= commit_index < log_end.
255 */
256
257 enum raft_role role; /* Current role. */
258 uint64_t commit_index; /* Max log index known to be committed. */
259 uint64_t last_applied; /* Max log index applied to state machine. */
260 struct uuid leader_sid; /* Server ID of leader (zero, if unknown). */
261
262 long long int election_base; /* Time of last heartbeat from leader. */
263 long long int election_timeout; /* Time at which we start an election. */
264
265 /* Used for joining a cluster. */
266 bool joining; /* Attempting to join the cluster? */
267 struct sset remote_addresses; /* Addresses to try to find other servers. */
268 long long int join_timeout; /* Time to re-send add server request. */
269
270 /* Used for leaving a cluster. */
271 bool leaving; /* True if we are leaving the cluster. */
272 bool left; /* True if we have finished leaving. */
273 long long int leave_timeout; /* Time to re-send remove server request. */
274
275 /* Failure. */
276 bool failed; /* True if unrecoverable error has occurred. */
277
278 /* File synchronization. */
279 struct ovs_list waiters; /* Contains "struct raft_waiter"s. */
280
281 /* Network connections. */
282 struct pstream *listener; /* For connections from other Raft servers. */
283 long long int listen_backoff; /* For retrying creating 'listener'. */
284 struct ovs_list conns; /* Contains struct raft_conns. */
285
286 /* Leaders only. Reinitialized after becoming leader. */
287 struct hmap add_servers; /* Contains "struct raft_server"s to add. */
288 struct raft_server *remove_server; /* Server being removed. */
289 struct hmap commands; /* Contains "struct raft_command"s. */
290 long long int ping_timeout; /* Time at which to send a heartbeat. */
291
292 /* Candidates only. Reinitialized at start of election. */
293 int n_votes; /* Number of votes for me. */
294
295 /* Followers and candidates only. */
296 bool candidate_retrying; /* The earlier election timed out and we are
297 now retrying. */
298 bool had_leader; /* A leader has been elected since the last
299 election was initiated. This helps in setting
300 candidate_retrying. */
301 };
302
303 /* All Raft structures. */
304 static struct hmap all_rafts = HMAP_INITIALIZER(&all_rafts);
305
306 static void raft_init(void);
307
308 static struct ovsdb_error *raft_read_header(struct raft *)
309 OVS_WARN_UNUSED_RESULT;
310
311 static void raft_send_execute_command_reply(struct raft *,
312 const struct uuid *sid,
313 const struct uuid *eid,
314 enum raft_command_status,
315 uint64_t commit_index);
316
317 static void raft_update_our_match_index(struct raft *, uint64_t min_index);
318
319 static void raft_send_remove_server_reply__(
320 struct raft *, const struct uuid *target_sid,
321 const struct uuid *requester_sid, struct unixctl_conn *requester_conn,
322 bool success, const char *comment);
323 static void raft_finished_leaving_cluster(struct raft *);
324
325 static void raft_server_init_leader(struct raft *, struct raft_server *);
326
327 static bool raft_rpc_is_heartbeat(const union raft_rpc *);
328 static bool raft_is_rpc_synced(const struct raft *, const union raft_rpc *);
329
330 static void raft_handle_rpc(struct raft *, const union raft_rpc *);
331
332 static bool raft_send_at(struct raft *, const union raft_rpc *,
333 int line_number);
334 #define raft_send(raft, rpc) raft_send_at(raft, rpc, __LINE__)
335
336 static bool raft_send_to_conn_at(struct raft *, const union raft_rpc *,
337 struct raft_conn *, int line_number);
338 #define raft_send_to_conn(raft, rpc, conn) \
339 raft_send_to_conn_at(raft, rpc, conn, __LINE__)
340
341 static void raft_send_append_request(struct raft *,
342 struct raft_server *, unsigned int n,
343 const char *comment);
344
345 static void raft_become_leader(struct raft *);
346 static void raft_become_follower(struct raft *);
347 static void raft_reset_election_timer(struct raft *);
348 static void raft_reset_ping_timer(struct raft *);
349 static void raft_send_heartbeats(struct raft *);
350 static void raft_start_election(struct raft *, bool leadership_transfer);
351 static bool raft_truncate(struct raft *, uint64_t new_end);
352 static void raft_get_servers_from_log(struct raft *, enum vlog_level);
353 static void raft_get_election_timer_from_log(struct raft *);
354
355 static bool raft_handle_write_error(struct raft *, struct ovsdb_error *);
356
357 static void raft_run_reconfigure(struct raft *);
358
359 static void raft_set_leader(struct raft *, const struct uuid *sid);
360 static struct raft_server *
361 raft_find_server(const struct raft *raft, const struct uuid *sid)
362 {
363 return raft_server_find(&raft->servers, sid);
364 }
365
366 static char *
367 raft_make_address_passive(const char *address_)
368 {
369 if (!strncmp(address_, "unix:", 5)) {
370 return xasprintf("p%s", address_);
371 } else {
372 char *address = xstrdup(address_);
373 char *host, *port;
374 inet_parse_host_port_tokens(strchr(address, ':') + 1, &host, &port);
375
376 struct ds paddr = DS_EMPTY_INITIALIZER;
377 ds_put_format(&paddr, "p%.3s:%s:", address, port);
378 if (strchr(host, ':')) {
379 ds_put_format(&paddr, "[%s]", host);
380 } else {
381 ds_put_cstr(&paddr, host);
382 }
383 free(address);
384 return ds_steal_cstr(&paddr);
385 }
386 }
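
/* For illustration, with made-up addresses, the mapping implemented above
 * works out as follows (a sketch, not code that is called anywhere):
 *
 *     raft_make_address_passive("unix:/tmp/db.sock")  => "punix:/tmp/db.sock"
 *     raft_make_address_passive("tcp:10.0.0.1:6641")  => "ptcp:6641:10.0.0.1"
 *     raft_make_address_passive("ssl:[::1]:6644")     => "pssl:6644:[::1]"
 */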
387
388 static struct raft *
389 raft_alloc(void)
390 {
391 raft_init();
392
393 struct raft *raft = xzalloc(sizeof *raft);
394 hmap_node_nullify(&raft->hmap_node);
395 hmap_init(&raft->servers);
396 raft->log_start = raft->log_end = 1;
397 raft->role = RAFT_FOLLOWER;
398 sset_init(&raft->remote_addresses);
399 raft->join_timeout = LLONG_MAX;
400 ovs_list_init(&raft->waiters);
401 raft->listen_backoff = LLONG_MIN;
402 ovs_list_init(&raft->conns);
403 hmap_init(&raft->add_servers);
404 hmap_init(&raft->commands);
405
406 raft->election_timer = ELECTION_BASE_MSEC;
407 raft_reset_ping_timer(raft);
408 raft_reset_election_timer(raft);
409
410 return raft;
411 }
412
413 /* Creates an on-disk file that represents a new Raft cluster and initializes
414 * it to consist of a single server, the one on which this function is called.
415 *
416 * Creates the local copy of the cluster's log in 'file_name', which must not
417 * already exist. Gives it the name 'name', which should be the database
418 * schema name and which is used only to match up this database with the server
419 * added to the cluster later if the cluster ID is unavailable.
420 *
421 * The new server is located at 'local_address', which must take one of the
422 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
423 * square bracket enclosed IPv6 address and PORT is a TCP port number.
424 *
425 * This only creates the on-disk file. Use raft_open() to start operating the
426 * new server.
427 *
428 * Returns null if successful, otherwise an ovsdb_error describing the
429 * problem. */
430 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
431 raft_create_cluster(const char *file_name, const char *name,
432 const char *local_address, const struct json *data)
433 {
434 /* Parse and verify validity of the local address. */
435 struct ovsdb_error *error = raft_address_validate(local_address);
436 if (error) {
437 return error;
438 }
439
440 /* Create log file. */
441 struct ovsdb_log *log;
442 error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
443 -1, &log);
444 if (error) {
445 return error;
446 }
447
448 /* Write log file. */
449 struct raft_header h = {
450 .sid = uuid_random(),
451 .cid = uuid_random(),
452 .name = xstrdup(name),
453 .local_address = xstrdup(local_address),
454 .joining = false,
455 .remote_addresses = SSET_INITIALIZER(&h.remote_addresses),
456 .snap_index = 1,
457 .snap = {
458 .term = 1,
459 .data = json_nullable_clone(data),
460 .eid = uuid_random(),
461 .servers = json_object_create(),
462 },
463 };
464 shash_add_nocopy(json_object(h.snap.servers),
465 xasprintf(UUID_FMT, UUID_ARGS(&h.sid)),
466 json_string_create(local_address));
467 error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
468 raft_header_uninit(&h);
469 if (!error) {
470 error = ovsdb_log_commit_block(log);
471 }
472 ovsdb_log_close(log);
473
474 return error;
475 }
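
/* For illustration only: a minimal bootstrap sketch, not part of this file's
 * API.  It follows the sequence described above: create the on-disk file with
 * raft_create_cluster(), reopen the log, and hand it to raft_open().  The
 * helper name, file name handling and fatal-error handling are hypothetical;
 * OVSDB_LOG_READ_WRITE is the ordinary ovsdb/log.h open mode. */
static struct raft *
raft_bootstrap_sketch(const char *file_name, const char *schema_name,
                      const char *local_address,
                      const struct json *initial_data)
{
    /* Create the on-disk file for a brand-new single-server cluster. */
    struct ovsdb_error *error = raft_create_cluster(file_name, schema_name,
                                                    local_address,
                                                    initial_data);
    if (error) {
        ovs_fatal(0, "%s", ovsdb_error_to_string(error));
    }

    /* Reopen the log read/write and start operating the new server. */
    struct ovsdb_log *log;
    error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_READ_WRITE,
                           -1, &log);
    if (error) {
        ovs_fatal(0, "%s", ovsdb_error_to_string(error));
    }

    struct raft *raft;
    error = raft_open(log, &raft);  /* raft_open() takes ownership of 'log'. */
    if (error) {
        ovs_fatal(0, "%s", ovsdb_error_to_string(error));
    }
    return raft;                    /* The caller then drives it with raft_run(). */
}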
476
477 /* Creates a database file that represents a new server in an existing Raft
478 * cluster.
479 *
480 * Creates the local copy of the cluster's log in 'file_name', which must not
481 * already exist. Gives it the name 'name', which must be the same name
482 * passed in to raft_create_cluster() earlier.
483 *
484 * 'cid' is optional. If specified, the new server will join only the cluster
485 * with the given cluster ID.
486 *
487 * The new server is located at 'local_address', which must take one of the
488 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
489 * square bracket enclosed IPv6 address and PORT is a TCP port number.
490 *
491 * Joining the cluster requires contacting it. Thus, 'remote_addresses'
492 * specifies the addresses of existing servers in the cluster. One server out
493 * of the existing cluster is sufficient, as long as that server is reachable
494 * and not partitioned from the current cluster leader. If multiple servers
495 * from the cluster are specified, then it is sufficient for any of them to
496 * meet this criterion.
497 *
498 * This only creates the on-disk file and does no network access. Use
499 * raft_open() to start operating the new server. (Until this happens, the
500 * new server has not joined the cluster.)
501 *
502 * Returns null if successful, otherwise an ovsdb_error describing the
503 * problem. */
504 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
505 raft_join_cluster(const char *file_name,
506 const char *name, const char *local_address,
507 const struct sset *remote_addresses,
508 const struct uuid *cid)
509 {
510 ovs_assert(!sset_is_empty(remote_addresses));
511
512 /* Parse and verify validity of the addresses. */
513 struct ovsdb_error *error = raft_address_validate(local_address);
514 if (error) {
515 return error;
516 }
517 const char *addr;
518 SSET_FOR_EACH (addr, remote_addresses) {
519 error = raft_address_validate(addr);
520 if (error) {
521 return error;
522 }
523 if (!strcmp(addr, local_address)) {
524 return ovsdb_error(NULL, "remote addresses cannot be the same "
525 "as the local address");
526 }
527 }
528
529 /* Verify validity of the cluster ID (if provided). */
530 if (cid && uuid_is_zero(cid)) {
531 return ovsdb_error(NULL, "all-zero UUID is not valid cluster ID");
532 }
533
534 /* Create log file. */
535 struct ovsdb_log *log;
536 error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
537 -1, &log);
538 if (error) {
539 return error;
540 }
541
542 /* Write log file. */
543 struct raft_header h = {
544 .sid = uuid_random(),
545 .cid = cid ? *cid : UUID_ZERO,
546 .name = xstrdup(name),
547 .local_address = xstrdup(local_address),
548 .joining = true,
549 /* No snapshot yet. */
550 };
551 sset_clone(&h.remote_addresses, remote_addresses);
552 error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
553 raft_header_uninit(&h);
554 if (!error) {
555 error = ovsdb_log_commit_block(log);
556 }
557 ovsdb_log_close(log);
558
559 return error;
560 }
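
/* For illustration only: a hypothetical caller preparing to join an existing
 * cluster.  The file name, database name and addresses are made up;
 * SSET_INITIALIZER, sset_add() and sset_destroy() are the usual lib/sset.h
 * helpers already used elsewhere in this file. */
static void
raft_join_sketch(void)
{
    struct sset remotes = SSET_INITIALIZER(&remotes);
    sset_add(&remotes, "tcp:10.0.0.2:6641");
    sset_add(&remotes, "tcp:10.0.0.3:6641");

    /* A null 'cid' leaves the cluster ID unrestricted; passing a specific ID
     * restricts the join to that cluster. */
    struct ovsdb_error *error = raft_join_cluster("db2.raft", "mydb",
                                                  "tcp:10.0.0.1:6641",
                                                  &remotes, NULL);
    sset_destroy(&remotes);
    if (error) {
        ovs_fatal(0, "%s", ovsdb_error_to_string(error));
    }

    /* As with raft_create_cluster(), nothing contacts the cluster yet; the
     * join only proceeds once raft_open() starts running this server. */
}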
561
562 /* Reads the initial header record from 'log', which must be a Raft clustered
563 * database log, and populates '*md' with the information read from it. The
564 * caller must eventually destroy 'md' with raft_metadata_destroy().
565 *
566 * On success, returns NULL. On failure, returns an error that the caller must
567 * eventually destroy and zeros '*md'. */
568 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
569 raft_read_metadata(struct ovsdb_log *log, struct raft_metadata *md)
570 {
571 struct raft *raft = raft_alloc();
572 raft->log = log;
573
574 struct ovsdb_error *error = raft_read_header(raft);
575 if (!error) {
576 md->sid = raft->sid;
577 md->name = xstrdup(raft->name);
578 md->local = xstrdup(raft->local_address);
579 md->cid = raft->cid;
580 } else {
581 memset(md, 0, sizeof *md);
582 }
583
584 raft->log = NULL;
585 raft_close(raft);
586 return error;
587 }
588
589 /* Frees the metadata in 'md'. */
590 void
591 raft_metadata_destroy(struct raft_metadata *md)
592 {
593 if (md) {
594 free(md->name);
595 free(md->local);
596 }
597 }
598
599 static const struct raft_entry *
600 raft_get_entry(const struct raft *raft, uint64_t index)
601 {
602 ovs_assert(index >= raft->log_start);
603 ovs_assert(index < raft->log_end);
604 return &raft->entries[index - raft->log_start];
605 }
606
607 static uint64_t
608 raft_get_term(const struct raft *raft, uint64_t index)
609 {
610 return (index == raft->log_start - 1
611 ? raft->snap.term
612 : raft_get_entry(raft, index)->term);
613 }
614
615 static const struct json *
616 raft_servers_for_index(const struct raft *raft, uint64_t index)
617 {
618 ovs_assert(index >= raft->log_start - 1);
619 ovs_assert(index < raft->log_end);
620
621 const struct json *servers = raft->snap.servers;
622 for (uint64_t i = raft->log_start; i <= index; i++) {
623 const struct raft_entry *e = raft_get_entry(raft, i);
624 if (e->servers) {
625 servers = e->servers;
626 }
627 }
628 return servers;
629 }
630
631 static void
632 raft_set_servers(struct raft *raft, const struct hmap *new_servers,
633 enum vlog_level level)
634 {
635 struct raft_server *s, *next;
636 HMAP_FOR_EACH_SAFE (s, next, hmap_node, &raft->servers) {
637 if (!raft_server_find(new_servers, &s->sid)) {
638 ovs_assert(s != raft->remove_server);
639
640 hmap_remove(&raft->servers, &s->hmap_node);
641 VLOG(level, "server %s removed from configuration", s->nickname);
642 raft_server_destroy(s);
643 }
644 }
645
646 HMAP_FOR_EACH_SAFE (s, next, hmap_node, new_servers) {
647 if (!raft_find_server(raft, &s->sid)) {
648 VLOG(level, "server %s added to configuration", s->nickname);
649
650 struct raft_server *new
651 = raft_server_add(&raft->servers, &s->sid, s->address);
652 raft_server_init_leader(raft, new);
653 }
654 }
655 }
656
657 static uint64_t
658 raft_add_entry(struct raft *raft,
659 uint64_t term, struct json *data, const struct uuid *eid,
660 struct json *servers, uint64_t election_timer)
661 {
662 if (raft->log_end - raft->log_start >= raft->allocated_log) {
663 raft->entries = x2nrealloc(raft->entries, &raft->allocated_log,
664 sizeof *raft->entries);
665 }
666
667 uint64_t index = raft->log_end++;
668 struct raft_entry *entry = &raft->entries[index - raft->log_start];
669 entry->term = term;
670 entry->data = data;
671 entry->eid = eid ? *eid : UUID_ZERO;
672 entry->servers = servers;
673 entry->election_timer = election_timer;
674 return index;
675 }
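
/* Worked example for the index arithmetic above (illustrative values only):
 * with log_start == 2 and log_end == 5, the log already holds indexes 2, 3
 * and 4 in entries[0..2]; raft_add_entry() then stores the new entry at
 * entries[3] (that is, 5 - log_start), returns index 5, and leaves
 * log_end == 6. */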
676
677 /* Writes a RAFT_REC_ENTRY record for 'term', 'data', 'eid', 'servers',
678 * 'election_timer' to 'raft''s log and returns an error indication. */
679 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
680 raft_write_entry(struct raft *raft, uint64_t term, struct json *data,
681 const struct uuid *eid, struct json *servers,
682 uint64_t election_timer)
683 {
684 struct raft_record r = {
685 .type = RAFT_REC_ENTRY,
686 .term = term,
687 .entry = {
688 .index = raft_add_entry(raft, term, data, eid, servers,
689 election_timer),
690 .data = data,
691 .servers = servers,
692 .election_timer = election_timer,
693 .eid = eid ? *eid : UUID_ZERO,
694 },
695 };
696 return ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r));
697 }
698
699 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
700 raft_write_state(struct ovsdb_log *log,
701 uint64_t term, const struct uuid *vote)
702 {
703 struct raft_record r = { .term = term };
704 if (vote && !uuid_is_zero(vote)) {
705 r.type = RAFT_REC_VOTE;
706 r.sid = *vote;
707 } else {
708 r.type = RAFT_REC_TERM;
709 }
710 return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
711 }
712
713 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
714 raft_apply_record(struct raft *raft, unsigned long long int rec_idx,
715 const struct raft_record *r)
716 {
717 /* Apply "term", which is present in most kinds of records (and otherwise
718 * 0).
719 *
720 * A Raft leader can replicate entries from previous terms to the other
721 * servers in the cluster, retaining the original terms on those entries
722 * (see section 3.6.2 "Committing entries from previous terms" for more
723 * information), so it's OK for the term in a log record to precede the
724 * current term. */
725 if (r->term > raft->term) {
726 raft->term = raft->synced_term = r->term;
727 raft->vote = raft->synced_vote = UUID_ZERO;
728 }
729
730 switch (r->type) {
731 case RAFT_REC_ENTRY:
732 if (r->entry.index < raft->commit_index) {
733 return ovsdb_error(NULL, "record %llu attempts to truncate log "
734 "from %"PRIu64" to %"PRIu64" entries, but "
735 "commit index is already %"PRIu64,
736 rec_idx, raft->log_end, r->entry.index,
737 raft->commit_index);
738 } else if (r->entry.index > raft->log_end) {
739 return ovsdb_error(NULL, "record %llu with index %"PRIu64" skips "
740 "past expected index %"PRIu64,
741 rec_idx, r->entry.index, raft->log_end);
742 }
743
744 if (r->entry.index < raft->log_end) {
745 /* This can happen, but it is notable. */
746 VLOG_DBG("record %llu truncates log from %"PRIu64" to %"PRIu64
747 " entries", rec_idx, raft->log_end, r->entry.index);
748 raft_truncate(raft, r->entry.index);
749 }
750
751 uint64_t prev_term = (raft->log_end > raft->log_start
752 ? raft->entries[raft->log_end
753 - raft->log_start - 1].term
754 : raft->snap.term);
755 if (r->term < prev_term) {
756 return ovsdb_error(NULL, "record %llu with index %"PRIu64" term "
757 "%"PRIu64" precedes previous entry's term "
758 "%"PRIu64,
759 rec_idx, r->entry.index, r->term, prev_term);
760 }
761
762 raft->log_synced = raft_add_entry(
763 raft, r->term,
764 json_nullable_clone(r->entry.data), &r->entry.eid,
765 json_nullable_clone(r->entry.servers),
766 r->entry.election_timer);
767 return NULL;
768
769 case RAFT_REC_TERM:
770 return NULL;
771
772 case RAFT_REC_VOTE:
773 if (r->term < raft->term) {
774 return ovsdb_error(NULL, "record %llu votes for term %"PRIu64" "
775 "but current term is %"PRIu64,
776 rec_idx, r->term, raft->term);
777 } else if (!uuid_is_zero(&raft->vote)
778 && !uuid_equals(&raft->vote, &r->sid)) {
779 return ovsdb_error(NULL, "record %llu votes for "SID_FMT" in term "
780 "%"PRIu64" but a previous record for the "
781 "same term voted for "SID_FMT, rec_idx,
782 SID_ARGS(&raft->vote), r->term,
783 SID_ARGS(&r->sid));
784 } else {
785 raft->vote = raft->synced_vote = r->sid;
786 return NULL;
787 }
788 break;
789
790 case RAFT_REC_NOTE:
791 if (!strcmp(r->note, "left")) {
792 return ovsdb_error(NULL, "record %llu indicates server has left "
793 "the cluster; it cannot be added back (use "
794 "\"ovsdb-tool join-cluster\" to add a new "
795 "server)", rec_idx);
796 }
797 return NULL;
798
799 case RAFT_REC_COMMIT_INDEX:
800 if (r->commit_index < raft->commit_index) {
801 return ovsdb_error(NULL, "record %llu regresses commit index "
802 "from %"PRIu64 " to %"PRIu64,
803 rec_idx, raft->commit_index, r->commit_index);
804 } else if (r->commit_index >= raft->log_end) {
805 return ovsdb_error(NULL, "record %llu advances commit index to "
806 "%"PRIu64 " but last log index is %"PRIu64,
807 rec_idx, r->commit_index, raft->log_end - 1);
808 } else {
809 raft->commit_index = r->commit_index;
810 return NULL;
811 }
812 break;
813
814 case RAFT_REC_LEADER:
815 /* XXX we could use this to take back leadership for quick restart */
816 return NULL;
817
818 default:
819 OVS_NOT_REACHED();
820 }
821 }
822
823 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
824 raft_read_header(struct raft *raft)
825 {
826 /* Read header record. */
827 struct json *json;
828 struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
829 if (error || !json) {
830 /* Report error or end-of-file. */
831 return error;
832 }
833 ovsdb_log_mark_base(raft->log);
834
835 struct raft_header h;
836 error = raft_header_from_json(&h, json);
837 json_destroy(json);
838 if (error) {
839 return error;
840 }
841
842 raft->sid = h.sid;
843 raft->cid = h.cid;
844 raft->name = xstrdup(h.name);
845 raft->local_address = xstrdup(h.local_address);
846 raft->local_nickname = raft_address_to_nickname(h.local_address, &h.sid);
847 raft->joining = h.joining;
848
849 if (h.joining) {
850 sset_clone(&raft->remote_addresses, &h.remote_addresses);
851 } else {
852 raft_entry_clone(&raft->snap, &h.snap);
853 raft->log_start = raft->log_end = h.snap_index + 1;
854 raft->commit_index = h.snap_index;
855 raft->last_applied = h.snap_index - 1;
856 }
857
858 raft_header_uninit(&h);
859
860 return NULL;
861 }
862
863 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
864 raft_read_log(struct raft *raft)
865 {
866 for (unsigned long long int i = 1; ; i++) {
867 struct json *json;
868 struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
869 if (!json) {
870 if (error) {
871 /* We assume that the error is due to a partial write while
872 * appending to the file before a crash, so log it and
873 * continue. */
874 char *error_string = ovsdb_error_to_string_free(error);
875 VLOG_WARN("%s", error_string);
876 free(error_string);
877 error = NULL;
878 }
879 break;
880 }
881
882 struct raft_record r;
883 error = raft_record_from_json(&r, json);
884 if (!error) {
885 error = raft_apply_record(raft, i, &r);
886 raft_record_uninit(&r);
887 }
888 if (error) {
889 return ovsdb_wrap_error(error, "error reading record %llu from "
890 "%s log", i, raft->name);
891 }
892 }
893
894 /* Set the most recent servers. */
895 raft_get_servers_from_log(raft, VLL_DBG);
896
897 /* Set the most recent election_timer. */
898 raft_get_election_timer_from_log(raft);
899
900 return NULL;
901 }
902
903 static void
904 raft_reset_election_timer(struct raft *raft)
905 {
906 unsigned int duration = (raft->election_timer
907 + random_range(ELECTION_RANGE_MSEC));
908 raft->election_base = time_msec();
909 if (failure_test == FT_DELAY_ELECTION) {
910 /* Slow down this node so that it won't win the next election. */
911 duration += raft->election_timer;
912 }
913 raft->election_timeout = raft->election_base + duration;
914 }
915
916 static void
917 raft_reset_ping_timer(struct raft *raft)
918 {
919 raft->ping_timeout = time_msec() + raft->election_timer / 3;
920 }
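
/* Worked example for the two timers above, assuming the default
 * election_timer of ELECTION_BASE_MSEC (1000 ms): each reset picks an
 * election timeout between 1000 and 2000 ms after election_base (the random
 * part adds up to ELECTION_RANGE_MSEC), while a leader's ping timeout fires
 * every 1000 / 3 = 333 ms, so several heartbeats normally arrive before any
 * follower can time out.  Under FT_DELAY_ELECTION another full election_timer
 * is added, pushing the timeout out to 2000..3000 ms. */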
921
922 static void
923 raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
924 const struct uuid *sid, bool incoming)
925 {
926 struct raft_conn *conn = xzalloc(sizeof *conn);
927 ovs_list_push_back(&raft->conns, &conn->list_node);
928 conn->js = js;
929 if (sid) {
930 conn->sid = *sid;
931 }
932 conn->nickname = raft_address_to_nickname(jsonrpc_session_get_name(js),
933 &conn->sid);
934 conn->incoming = incoming;
935 conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
936 }
937
938 /* Starts the local server in an existing Raft cluster, using the local copy of
939 * the cluster's log in 'file_name'. Takes ownership of 'log', whether
940 * successful or not. */
941 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
942 raft_open(struct ovsdb_log *log, struct raft **raftp)
943 {
944 struct raft *raft = raft_alloc();
945 raft->log = log;
946
947 struct ovsdb_error *error = raft_read_header(raft);
948 if (error) {
949 goto error;
950 }
951
952 if (!raft->joining) {
953 error = raft_read_log(raft);
954 if (error) {
955 goto error;
956 }
957
958 /* Find our own server. */
959 if (!raft_find_server(raft, &raft->sid)) {
960 error = ovsdb_error(NULL, "server does not belong to cluster");
961 goto error;
962 }
963
964 /* If there's only one server, start an election right away so that the
965 * cluster bootstraps quickly. */
966 if (hmap_count(&raft->servers) == 1) {
967 raft_start_election(raft, false);
968 }
969 } else {
970 raft->join_timeout = time_msec() + 1000;
971 }
972
973 *raftp = raft;
974 hmap_insert(&all_rafts, &raft->hmap_node, hash_string(raft->name, 0));
975 return NULL;
976
977 error:
978 raft_close(raft);
979 *raftp = NULL;
980 return error;
981 }
982
983 /* Returns the name of 'raft', which in OVSDB is the database schema name. */
984 const char *
985 raft_get_name(const struct raft *raft)
986 {
987 return raft->name;
988 }
989
990 /* Returns the cluster ID of 'raft'. If 'raft' has not yet completed joining
991 * its cluster, then 'cid' will be all-zeros (unless the administrator
992 * specified a cluster ID running "ovsdb-tool join-cluster").
993 *
994 * Each cluster has a unique cluster ID. */
995 const struct uuid *
996 raft_get_cid(const struct raft *raft)
997 {
998 return &raft->cid;
999 }
1000
1001 /* Returns the server ID of 'raft'. Each server has a unique server ID. */
1002 const struct uuid *
1003 raft_get_sid(const struct raft *raft)
1004 {
1005 return &raft->sid;
1006 }
1007
1008 /* Returns true if 'raft' has completed joining its cluster, has not left or
1009 * initiated leaving the cluster, does not have failed disk storage, and is
1010 * apparently connected to the leader in a healthy way (or is itself the
1011 * leader).
1012 *
1013 * If 'raft' is candidate:
1014 * a) if it is the first round of election, consider it as connected, hoping
1015 * it will successfully elect a new leader soon.
1016 * b) if it is already retrying, consider it as disconnected (so that clients
1017 * may decide to reconnect to other members). */
1018 bool
1019 raft_is_connected(const struct raft *raft)
1020 {
1021 bool ret = (!raft->candidate_retrying
1022 && !raft->joining
1023 && !raft->leaving
1024 && !raft->left
1025 && !raft->failed);
1026 VLOG_DBG("raft_is_connected: %s\n", ret? "true": "false");
1027 return ret;
1028 }
1029
1030 /* Returns true if 'raft' is the cluster leader. */
1031 bool
1032 raft_is_leader(const struct raft *raft)
1033 {
1034 return raft->role == RAFT_LEADER;
1035 }
1036
1037 /* Returns true if 'raft' is in the process of joining its cluster. */
1038 bool
1039 raft_is_joining(const struct raft *raft)
1040 {
1041 return raft->joining;
1042 }
1043
1044 /* Only returns *connected* connections. */
1045 static struct raft_conn *
1046 raft_find_conn_by_sid(struct raft *raft, const struct uuid *sid)
1047 {
1048 if (!uuid_is_zero(sid)) {
1049 struct raft_conn *conn;
1050 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1051 if (uuid_equals(sid, &conn->sid)
1052 && jsonrpc_session_is_connected(conn->js)) {
1053 return conn;
1054 }
1055 }
1056 }
1057 return NULL;
1058 }
1059
1060 static struct raft_conn *
1061 raft_find_conn_by_address(struct raft *raft, const char *address)
1062 {
1063 struct raft_conn *conn;
1064 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1065 if (!strcmp(jsonrpc_session_get_name(conn->js), address)) {
1066 return conn;
1067 }
1068 }
1069 return NULL;
1070 }
1071
1072 static void OVS_PRINTF_FORMAT(3, 4)
1073 raft_record_note(struct raft *raft, const char *note,
1074 const char *comment_format, ...)
1075 {
1076 va_list args;
1077 va_start(args, comment_format);
1078 char *comment = xvasprintf(comment_format, args);
1079 va_end(args);
1080
1081 struct raft_record r = {
1082 .type = RAFT_REC_NOTE,
1083 .comment = comment,
1084 .note = CONST_CAST(char *, note),
1085 };
1086 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
1087
1088 free(comment);
1089 }
1090
1091 /* If we're leader, try to transfer leadership to another server, logging
1092 * 'reason' as the human-readable reason (it should be a phrase suitable for
1093 * following "because"). */
1094 void
1095 raft_transfer_leadership(struct raft *raft, const char *reason)
1096 {
1097 if (raft->role != RAFT_LEADER) {
1098 return;
1099 }
1100
1101 struct raft_server *s;
1102 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
1103 if (!uuid_equals(&raft->sid, &s->sid)
1104 && s->phase == RAFT_PHASE_STABLE) {
1105 struct raft_conn *conn = raft_find_conn_by_sid(raft, &s->sid);
1106 if (!conn) {
1107 continue;
1108 }
1109
1110 union raft_rpc rpc = {
1111 .become_leader = {
1112 .common = {
1113 .comment = CONST_CAST(char *, reason),
1114 .type = RAFT_RPC_BECOME_LEADER,
1115 .sid = s->sid,
1116 },
1117 .term = raft->term,
1118 }
1119 };
1120 raft_send_to_conn(raft, &rpc, conn);
1121
1122 raft_record_note(raft, "transfer leadership",
1123 "transferring leadership to %s because %s",
1124 s->nickname, reason);
1125 break;
1126 }
1127 }
1128 }
1129
1130 /* Send a RemoveServerRequest to the rest of the servers in the cluster.
1131 *
1132 * If we know which server is the leader, we can just send the request to it.
1133 * However, we might not know which server is the leader, and we might never
1134 * find out if the remove request was actually previously committed by a
1135 * majority of the servers (because in that case the new leader will not send
1136 * AppendRequests or heartbeats to us). Therefore, we instead send
1137 * RemoveRequests to every server. This theoretically has the same problem, if
1138 * the current cluster leader was not previously a member of the cluster, but
1139 * it seems likely to be more robust in practice. */
1140 static void
1141 raft_send_remove_server_requests(struct raft *raft)
1142 {
1143 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
1144 VLOG_INFO_RL(&rl, "sending remove request (joining=%s, leaving=%s)",
1145 raft->joining ? "true" : "false",
1146 raft->leaving ? "true" : "false");
1147 const struct raft_server *s;
1148 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
1149 if (!uuid_equals(&s->sid, &raft->sid)) {
1150 union raft_rpc rpc = (union raft_rpc) {
1151 .remove_server_request = {
1152 .common = {
1153 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
1154 .sid = s->sid,
1155 },
1156 .sid = raft->sid,
1157 },
1158 };
1159 raft_send(raft, &rpc);
1160 }
1161 }
1162
1163 raft->leave_timeout = time_msec() + raft->election_timer;
1164 }
1165
1166 /* Attempts to start 'raft' leaving its cluster. The caller can check progress
1167 * using raft_is_leaving() and raft_left(). */
1168 void
1169 raft_leave(struct raft *raft)
1170 {
1171 if (raft->joining || raft->failed || raft->leaving || raft->left) {
1172 return;
1173 }
1174 VLOG_INFO(SID_FMT": starting to leave cluster "CID_FMT,
1175 SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
1176 raft->leaving = true;
1177 raft_transfer_leadership(raft, "this server is leaving the cluster");
1178 raft_become_follower(raft);
1179 raft_send_remove_server_requests(raft);
1180 raft->leave_timeout = time_msec() + raft->election_timer;
1181 }
1182
1183 /* Returns true if 'raft' is currently attempting to leave its cluster. */
1184 bool
1185 raft_is_leaving(const struct raft *raft)
1186 {
1187 return raft->leaving;
1188 }
1189
1190 /* Returns true if 'raft' successfully left its cluster. */
1191 bool
1192 raft_left(const struct raft *raft)
1193 {
1194 return raft->left;
1195 }
1196
1197 /* Returns true if 'raft' has experienced a disk I/O failure. When this
1198 * returns true, only closing and reopening 'raft' allows for recovery. */
1199 bool
1200 raft_failed(const struct raft *raft)
1201 {
1202 return raft->failed;
1203 }
1204
1205 /* Forces 'raft' to attempt to take leadership of the cluster by deposing the
1206 * current leader. */
1207 void
1208 raft_take_leadership(struct raft *raft)
1209 {
1210 if (raft->role != RAFT_LEADER) {
1211 raft_start_election(raft, true);
1212 }
1213 }
1214
1215 /* Closes everything owned by 'raft' that might be visible outside the process:
1216 * network connections, commands, etc. This is part of closing 'raft'; it is
1217 * also used if 'raft' has failed in an unrecoverable way. */
1218 static void
1219 raft_close__(struct raft *raft)
1220 {
1221 if (!hmap_node_is_null(&raft->hmap_node)) {
1222 hmap_remove(&all_rafts, &raft->hmap_node);
1223 hmap_node_nullify(&raft->hmap_node);
1224 }
1225
1226 raft_complete_all_commands(raft, RAFT_CMD_SHUTDOWN);
1227
1228 struct raft_server *rs = raft->remove_server;
1229 if (rs) {
1230 raft_send_remove_server_reply__(raft, &rs->sid, &rs->requester_sid,
1231 rs->requester_conn, false,
1232 RAFT_SERVER_SHUTDOWN);
1233 raft_server_destroy(raft->remove_server);
1234 raft->remove_server = NULL;
1235 }
1236
1237 struct raft_conn *conn, *next;
1238 LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
1239 raft_conn_close(conn);
1240 }
1241 }
1242
1243 /* Closes and frees 'raft'.
1244 *
1245 * A server's cluster membership is independent of whether the server is
1246 * actually running. When a server that is a member of a cluster closes, the
1247 * cluster treats this as a server failure. */
1248 void
1249 raft_close(struct raft *raft)
1250 {
1251 if (!raft) {
1252 return;
1253 }
1254
1255 raft_transfer_leadership(raft, "this server is shutting down");
1256
1257 raft_close__(raft);
1258
1259 ovsdb_log_close(raft->log);
1260
1261 raft_servers_destroy(&raft->servers);
1262
1263 for (uint64_t index = raft->log_start; index < raft->log_end; index++) {
1264 struct raft_entry *e = &raft->entries[index - raft->log_start];
1265 raft_entry_uninit(e);
1266 }
1267 free(raft->entries);
1268
1269 raft_entry_uninit(&raft->snap);
1270
1271 raft_waiters_destroy(raft);
1272
1273 raft_servers_destroy(&raft->add_servers);
1274
1275 hmap_destroy(&raft->commands);
1276
1277 pstream_close(raft->listener);
1278
1279 sset_destroy(&raft->remote_addresses);
1280 free(raft->local_address);
1281 free(raft->local_nickname);
1282 free(raft->name);
1283
1284 free(raft);
1285 }
1286
1287 static bool
1288 raft_conn_receive(struct raft *raft, struct raft_conn *conn,
1289 union raft_rpc *rpc)
1290 {
1291 struct jsonrpc_msg *msg = jsonrpc_session_recv(conn->js);
1292 if (!msg) {
1293 return false;
1294 }
1295
1296 struct ovsdb_error *error = raft_rpc_from_jsonrpc(&raft->cid, &raft->sid,
1297 msg, rpc);
1298 jsonrpc_msg_destroy(msg);
1299 if (error) {
1300 char *s = ovsdb_error_to_string_free(error);
1301 VLOG_INFO("%s: %s", jsonrpc_session_get_name(conn->js), s);
1302 free(s);
1303 return false;
1304 }
1305
1306 if (uuid_is_zero(&conn->sid)) {
1307 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
1308 conn->sid = rpc->common.sid;
1309 VLOG_INFO_RL(&rl, "%s: learned server ID "SID_FMT,
1310 jsonrpc_session_get_name(conn->js), SID_ARGS(&conn->sid));
1311 } else if (!uuid_equals(&conn->sid, &rpc->common.sid)) {
1312 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1313 VLOG_WARN_RL(&rl, "%s: ignoring message with unexpected server ID "
1314 SID_FMT" (expected "SID_FMT")",
1315 jsonrpc_session_get_name(conn->js),
1316 SID_ARGS(&rpc->common.sid), SID_ARGS(&conn->sid));
1317 raft_rpc_uninit(rpc);
1318 return false;
1319 }
1320
1321 const char *address = (rpc->type == RAFT_RPC_HELLO_REQUEST
1322 ? rpc->hello_request.address
1323 : rpc->type == RAFT_RPC_ADD_SERVER_REQUEST
1324 ? rpc->add_server_request.address
1325 : NULL);
1326 if (address) {
1327 char *new_nickname = raft_address_to_nickname(address, &conn->sid);
1328 if (strcmp(conn->nickname, new_nickname)) {
1329 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
1330 VLOG_INFO_RL(&rl, "%s: learned remote address %s",
1331 jsonrpc_session_get_name(conn->js), address);
1332
1333 free(conn->nickname);
1334 conn->nickname = new_nickname;
1335 } else {
1336 free(new_nickname);
1337 }
1338 }
1339
1340 return true;
1341 }
1342
1343 static const char *
1344 raft_get_nickname(const struct raft *raft, const struct uuid *sid,
1345 char buf[SID_LEN + 1], size_t bufsize)
1346 {
1347 if (uuid_equals(sid, &raft->sid)) {
1348 return raft->local_nickname;
1349 }
1350
1351 const char *s = raft_servers_get_nickname__(&raft->servers, sid);
1352 if (s) {
1353 return s;
1354 }
1355
1356 return raft_servers_get_nickname(&raft->add_servers, sid, buf, bufsize);
1357 }
1358
1359 static void
1360 log_rpc(const union raft_rpc *rpc, const char *direction,
1361 const struct raft_conn *conn, int line_number)
1362 {
1363 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(600, 600);
1364 if (!raft_rpc_is_heartbeat(rpc) && !VLOG_DROP_DBG(&rl)) {
1365 struct ds s = DS_EMPTY_INITIALIZER;
1366 if (line_number) {
1367 ds_put_format(&s, "raft.c:%d ", line_number);
1368 }
1369 ds_put_format(&s, "%s%s ", direction, conn->nickname);
1370 raft_rpc_format(rpc, &s);
1371 VLOG_DBG("%s", ds_cstr(&s));
1372 ds_destroy(&s);
1373 }
1374 }
1375
1376 static void
1377 raft_send_add_server_request(struct raft *raft, struct raft_conn *conn)
1378 {
1379 union raft_rpc rq = {
1380 .add_server_request = {
1381 .common = {
1382 .type = RAFT_RPC_ADD_SERVER_REQUEST,
1383 .sid = UUID_ZERO,
1384 .comment = NULL,
1385 },
1386 .address = raft->local_address,
1387 },
1388 };
1389 raft_send_to_conn(raft, &rq, conn);
1390 }
1391
1392 static void
1393 raft_conn_run(struct raft *raft, struct raft_conn *conn)
1394 {
1395 jsonrpc_session_run(conn->js);
1396
1397 unsigned int new_seqno = jsonrpc_session_get_seqno(conn->js);
1398 bool just_connected = (new_seqno != conn->js_seqno
1399 && jsonrpc_session_is_connected(conn->js));
1400 conn->js_seqno = new_seqno;
1401 if (just_connected) {
1402 if (raft->joining) {
1403 raft_send_add_server_request(raft, conn);
1404 } else if (raft->leaving) {
1405 union raft_rpc rq = {
1406 .remove_server_request = {
1407 .common = {
1408 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
1409 .sid = conn->sid,
1410 },
1411 .sid = raft->sid,
1412 },
1413 };
1414 raft_send_to_conn(raft, &rq, conn);
1415 } else {
1416 union raft_rpc rq = (union raft_rpc) {
1417 .hello_request = {
1418 .common = {
1419 .type = RAFT_RPC_HELLO_REQUEST,
1420 .sid = conn->sid,
1421 },
1422 .address = raft->local_address,
1423 },
1424 };
1425 raft_send_to_conn(raft, &rq, conn);
1426 }
1427 }
1428
1429 for (size_t i = 0; i < 50; i++) {
1430 union raft_rpc rpc;
1431 if (!raft_conn_receive(raft, conn, &rpc)) {
1432 break;
1433 }
1434
1435 log_rpc(&rpc, "<--", conn, 0);
1436 raft_handle_rpc(raft, &rpc);
1437 raft_rpc_uninit(&rpc);
1438 }
1439 }
1440
1441 static void
1442 raft_waiter_complete_rpc(struct raft *raft, const union raft_rpc *rpc)
1443 {
1444 uint64_t term = raft_rpc_get_term(rpc);
1445 if (term && term < raft->term) {
1446 /* Drop the message because it's for an expired term. */
1447 return;
1448 }
1449
1450 if (!raft_is_rpc_synced(raft, rpc)) {
1451 /* This is a bug. A reply message is deferred because some state in
1452 * the message, such as a term or index, has not been committed to
1453 * disk, and it should only be completed when that commit is done.
1454 * But this message is being completed before the commit is finished.
1455 * Complain, and hope that someone reports the bug. */
1456 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
1457 if (VLOG_DROP_ERR(&rl)) {
1458 return;
1459 }
1460
1461 struct ds s = DS_EMPTY_INITIALIZER;
1462
1463 if (term > raft->synced_term) {
1464 ds_put_format(&s, " because message term %"PRIu64" is "
1465 "past synced term %"PRIu64,
1466 term, raft->synced_term);
1467 }
1468
1469 uint64_t index = raft_rpc_get_min_sync_index(rpc);
1470 if (index > raft->log_synced) {
1471 ds_put_format(&s, " %s message index %"PRIu64" is past last "
1472 "synced index %"PRIu64,
1473 s.length ? "and" : "because",
1474 index, raft->log_synced);
1475 }
1476
1477 const struct uuid *vote = raft_rpc_get_vote(rpc);
1478 if (vote && !uuid_equals(vote, &raft->synced_vote)) {
1479 char buf1[SID_LEN + 1];
1480 char buf2[SID_LEN + 1];
1481 ds_put_format(&s, " %s vote %s differs from synced vote %s",
1482 s.length ? "and" : "because",
1483 raft_get_nickname(raft, vote, buf1, sizeof buf1),
1484 raft_get_nickname(raft, &raft->synced_vote,
1485 buf2, sizeof buf2));
1486 }
1487
1488 char buf[SID_LEN + 1];
1489 ds_put_format(&s, ": %s ",
1490 raft_get_nickname(raft, &rpc->common.sid,
1491 buf, sizeof buf));
1492 raft_rpc_format(rpc, &s);
1493 VLOG_ERR("internal error: deferred %s message completed "
1494 "but not ready to send%s",
1495 raft_rpc_type_to_string(rpc->type), ds_cstr(&s));
1496 ds_destroy(&s);
1497
1498 return;
1499 }
1500
1501 struct raft_conn *dst = raft_find_conn_by_sid(raft, &rpc->common.sid);
1502 if (dst) {
1503 raft_send_to_conn(raft, rpc, dst);
1504 }
1505 }
1506
1507 static void
1508 raft_waiter_complete(struct raft *raft, struct raft_waiter *w)
1509 {
1510 switch (w->type) {
1511 case RAFT_W_ENTRY:
1512 if (raft->role == RAFT_LEADER) {
1513 raft_update_our_match_index(raft, w->entry.index);
1514 }
1515 raft->log_synced = w->entry.index;
1516 break;
1517
1518 case RAFT_W_TERM:
1519 raft->synced_term = w->term.term;
1520 raft->synced_vote = w->term.vote;
1521 break;
1522
1523 case RAFT_W_RPC:
1524 raft_waiter_complete_rpc(raft, w->rpc);
1525 break;
1526 }
1527 }
1528
1529 static void
1530 raft_waiter_destroy(struct raft_waiter *w)
1531 {
1532 if (!w) {
1533 return;
1534 }
1535
1536 ovs_list_remove(&w->list_node);
1537
1538 switch (w->type) {
1539 case RAFT_W_ENTRY:
1540 case RAFT_W_TERM:
1541 break;
1542
1543 case RAFT_W_RPC:
1544 raft_rpc_uninit(w->rpc);
1545 free(w->rpc);
1546 break;
1547 }
1548 free(w);
1549 }
1550
1551 static void
1552 raft_waiters_run(struct raft *raft)
1553 {
1554 if (ovs_list_is_empty(&raft->waiters)) {
1555 return;
1556 }
1557
1558 uint64_t cur = ovsdb_log_commit_progress(raft->log);
1559 struct raft_waiter *w, *next;
1560 LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
1561 if (cur < w->commit_ticket) {
1562 break;
1563 }
1564 raft_waiter_complete(raft, w);
1565 raft_waiter_destroy(w);
1566 }
1567 }
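
/* Worked example for the ticket check above (illustrative values only): if
 * waiters were queued with commit tickets 7, 8 and 9 and
 * ovsdb_log_commit_progress() now reports 8, the first two waiters complete
 * and are destroyed, while the ticket-9 waiter stays queued until a later
 * call observes progress of at least 9. */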
1568
1569 static void
1570 raft_waiters_wait(struct raft *raft)
1571 {
1572 struct raft_waiter *w;
1573 LIST_FOR_EACH (w, list_node, &raft->waiters) {
1574 ovsdb_log_commit_wait(raft->log, w->commit_ticket);
1575 break;
1576 }
1577 }
1578
1579 static void
1580 raft_waiters_destroy(struct raft *raft)
1581 {
1582 struct raft_waiter *w, *next;
1583 LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
1584 raft_waiter_destroy(w);
1585 }
1586 }
1587
1588 static bool OVS_WARN_UNUSED_RESULT
1589 raft_set_term(struct raft *raft, uint64_t term, const struct uuid *vote)
1590 {
1591 struct ovsdb_error *error = raft_write_state(raft->log, term, vote);
1592 if (!raft_handle_write_error(raft, error)) {
1593 return false;
1594 }
1595
1596 struct raft_waiter *w = raft_waiter_create(raft, RAFT_W_TERM, true);
1597 raft->term = w->term.term = term;
1598 raft->vote = w->term.vote = vote ? *vote : UUID_ZERO;
1599 return true;
1600 }
1601
1602 static void
1603 raft_accept_vote(struct raft *raft, struct raft_server *s,
1604 const struct uuid *vote)
1605 {
1606 if (uuid_equals(&s->vote, vote)) {
1607 return;
1608 }
1609 if (!uuid_is_zero(&s->vote)) {
1610 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
1611 char buf1[SID_LEN + 1];
1612 char buf2[SID_LEN + 1];
1613 VLOG_WARN_RL(&rl, "server %s changed its vote from %s to %s",
1614 s->nickname,
1615 raft_get_nickname(raft, &s->vote, buf1, sizeof buf1),
1616 raft_get_nickname(raft, vote, buf2, sizeof buf2));
1617 }
1618 s->vote = *vote;
1619 if (uuid_equals(vote, &raft->sid)
1620 && ++raft->n_votes > hmap_count(&raft->servers) / 2) {
1621 raft_become_leader(raft);
1622 }
1623 }
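
/* Worked example for the majority test above: hmap_count(&raft->servers) / 2
 * is 1 for a 3-server cluster and 2 for a 5-server cluster, so a candidate
 * needs 2 of 3 or 3 of 5 votes before raft_become_leader() runs.  The
 * candidate's own vote counts toward this total, since raft_start_election()
 * ends by calling raft_accept_vote() for itself. */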
1624
1625 static void
1626 raft_start_election(struct raft *raft, bool leadership_transfer)
1627 {
1628 if (raft->leaving) {
1629 return;
1630 }
1631
1632 struct raft_server *me = raft_find_server(raft, &raft->sid);
1633 if (!me) {
1634 return;
1635 }
1636
1637 if (!raft_set_term(raft, raft->term + 1, &raft->sid)) {
1638 return;
1639 }
1640
1641 ovs_assert(raft->role != RAFT_LEADER);
1642 raft->role = RAFT_CANDIDATE;
1643 /* If there was no leader elected since last election, we know we are
1644 * retrying now. */
1645 raft->candidate_retrying = !raft->had_leader;
1646 raft->had_leader = false;
1647
1648 raft->n_votes = 0;
1649
1650 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1651 if (!VLOG_DROP_INFO(&rl)) {
1652 long long int now = time_msec();
1653 if (now >= raft->election_timeout) {
1654 VLOG_INFO("term %"PRIu64": %lld ms timeout expired, "
1655 "starting election",
1656 raft->term, now - raft->election_base);
1657 } else {
1658 VLOG_INFO("term %"PRIu64": starting election", raft->term);
1659 }
1660 }
1661 raft_reset_election_timer(raft);
1662
1663 struct raft_server *peer;
1664 HMAP_FOR_EACH (peer, hmap_node, &raft->servers) {
1665 peer->vote = UUID_ZERO;
1666 if (uuid_equals(&raft->sid, &peer->sid)) {
1667 continue;
1668 }
1669
1670 union raft_rpc rq = {
1671 .vote_request = {
1672 .common = {
1673 .type = RAFT_RPC_VOTE_REQUEST,
1674 .sid = peer->sid,
1675 },
1676 .term = raft->term,
1677 .last_log_index = raft->log_end - 1,
1678 .last_log_term = (
1679 raft->log_end > raft->log_start
1680 ? raft->entries[raft->log_end - raft->log_start - 1].term
1681 : raft->snap.term),
1682 .leadership_transfer = leadership_transfer,
1683 },
1684 };
1685 raft_send(raft, &rq);
1686 }
1687
1688 /* Vote for ourselves. */
1689 raft_accept_vote(raft, me, &raft->sid);
1690 }
1691
1692 static void
1693 raft_open_conn(struct raft *raft, const char *address, const struct uuid *sid)
1694 {
1695 if (strcmp(address, raft->local_address)
1696 && !raft_find_conn_by_address(raft, address)) {
1697 raft_add_conn(raft, jsonrpc_session_open(address, true), sid, false);
1698 }
1699 }
1700
1701 static void
1702 raft_conn_close(struct raft_conn *conn)
1703 {
1704 jsonrpc_session_close(conn->js);
1705 ovs_list_remove(&conn->list_node);
1706 free(conn->nickname);
1707 free(conn);
1708 }
1709
1710 /* Returns true if 'conn' should stay open, false if it should be closed. */
1711 static bool
1712 raft_conn_should_stay_open(struct raft *raft, struct raft_conn *conn)
1713 {
1714 /* Close the connection if it's actually dead. If necessary, we'll
1715 * initiate a new session later. */
1716 if (!jsonrpc_session_is_alive(conn->js)) {
1717 return false;
1718 }
1719
1720 /* Keep incoming sessions. We trust the originator to decide when to
1721 * drop them. */
1722 if (conn->incoming) {
1723 return true;
1724 }
1725
1726 /* If we are joining the cluster, keep sessions to the remote addresses
1727 * that are supposed to be part of the cluster we're joining. */
1728 if (raft->joining && sset_contains(&raft->remote_addresses,
1729 jsonrpc_session_get_name(conn->js))) {
1730 return true;
1731 }
1732
1733 /* We have joined the cluster. If we did that "recently", then there is a
1734 * chance that we do not have the most recent server configuration log
1735 * entry. If so, it's a waste to disconnect from the servers that were in
1736 * remote_addresses and that will probably appear in the configuration,
1737 * just to reconnect to them a moment later when we do get the
1738 * configuration update. If we are not ourselves in the configuration,
1739 * then we know that there must be a new configuration coming up, so in
1740 * that case keep the connection. */
1741 if (!raft_find_server(raft, &raft->sid)) {
1742 return true;
1743 }
1744
1745 /* Keep the connection only if the server is part of the configuration. */
1746 return raft_find_server(raft, &conn->sid);
1747 }
1748
1749 /* Allows 'raft' to maintain the distributed log. Call this function as part
1750 * of the process's main loop. */
1751 void
1752 raft_run(struct raft *raft)
1753 {
1754 if (raft->left || raft->failed) {
1755 return;
1756 }
1757
1758 raft_waiters_run(raft);
1759
1760 if (!raft->listener && time_msec() >= raft->listen_backoff) {
1761 char *paddr = raft_make_address_passive(raft->local_address);
1762 int error = pstream_open(paddr, &raft->listener, DSCP_DEFAULT);
1763 if (error) {
1764 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1765 VLOG_WARN_RL(&rl, "%s: listen failed (%s)",
1766 paddr, ovs_strerror(error));
1767 raft->listen_backoff = time_msec() + 1000;
1768 }
1769 free(paddr);
1770 }
1771
1772 if (raft->listener) {
1773 struct stream *stream;
1774 int error = pstream_accept(raft->listener, &stream);
1775 if (!error) {
1776 raft_add_conn(raft, jsonrpc_session_open_unreliably(
1777 jsonrpc_open(stream), DSCP_DEFAULT), NULL,
1778 true);
1779 } else if (error != EAGAIN) {
1780 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
1781 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
1782 pstream_get_name(raft->listener),
1783 ovs_strerror(error));
1784 }
1785 }
1786
1787 /* Run RPCs for all open sessions. */
1788 struct raft_conn *conn;
1789 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1790 raft_conn_run(raft, conn);
1791 }
1792
1793 /* Close unneeded sessions. */
1794 struct raft_conn *next;
1795 LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
1796 if (!raft_conn_should_stay_open(raft, conn)) {
1797 raft_conn_close(conn);
1798 }
1799 }
1800
1801 /* Open needed sessions. */
1802 struct raft_server *server;
1803 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1804 raft_open_conn(raft, server->address, &server->sid);
1805 }
1806 if (raft->joining) {
1807 const char *address;
1808 SSET_FOR_EACH (address, &raft->remote_addresses) {
1809 raft_open_conn(raft, address, NULL);
1810 }
1811 }
1812
1813 if (!raft->joining && time_msec() >= raft->election_timeout) {
1814 if (raft->role == RAFT_LEADER) {
1815 /* If a majority of followers replied since the last reset, reset the
1816 * election timer and clear each server's 'replied' flag. Otherwise,
1817 * step down to follower.
1818 *
1819 * Raft paper section 6.2: Leaders: A server might be in the leader
1820 * state, but if it isn’t the current leader, it could be
1821 * needlessly delaying client requests. For example, suppose a
1822 * leader is partitioned from the rest of the cluster, but it can
1823 * still communicate with a particular client. Without additional
1824 * mechanism, it could delay a request from that client forever,
1825 * being unable to replicate a log entry to any other servers.
1826 * Meanwhile, there might be another leader of a newer term that is
1827 * able to communicate with a majority of the cluster and would be
1828 * able to commit the client’s request. Thus, a leader in Raft
1829 * steps down if an election timeout elapses without a successful
1830 * round of heartbeats to a majority of its cluster; this allows
1831 * clients to retry their requests with another server. */
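/* (Editor's note, illustrative example.) In a 3-server cluster,
 * hmap_count(&raft->servers) / 2 == 1, so the leader keeps its role as long
 * as at least one follower replied since the last timer reset; that follower
 * plus the leader itself form a majority of 2 out of 3. */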
1832 int count = 0;
1833 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1834 if (server->replied) {
1835 count++;
1836 }
1837 }
1838 if (count >= hmap_count(&raft->servers) / 2) {
1839 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1840 server->replied = false;
1841 }
1842 raft_reset_election_timer(raft);
1843 } else {
1844 raft_become_follower(raft);
1845 raft_start_election(raft, false);
1846 }
1847 } else {
1848 raft_start_election(raft, false);
1849 }
1850
1851 }
1852
1853 if (raft->leaving && time_msec() >= raft->leave_timeout) {
1854 raft_send_remove_server_requests(raft);
1855 }
1856
1857 if (raft->joining && time_msec() >= raft->join_timeout) {
1858 raft->join_timeout = time_msec() + 1000;
1859 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1860 raft_send_add_server_request(raft, conn);
1861 }
1862 }
1863
1864 long long int now = time_msec();
1865 if (now >= raft->ping_timeout) {
1866 if (raft->role == RAFT_LEADER) {
1867 raft_send_heartbeats(raft);
1868 }
1869 /* Check whether any commands have timed out. The timeout is set to twice
1870 * the election base time so that commands can complete properly during
1871 * leader election. E.g. if a leader crashes and the current node, with
1872 * pending commands, becomes the new leader, the pending commands can still
1873 * complete if the crashed leader had replicated the transactions to a
1874 * majority of followers before it crashed. */
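/* (Editor's note, illustrative example.) With an election timer of 1000 ms,
 * a pending command whose timestamp is more than 2000 ms old is completed
 * with RAFT_CMD_TIMEOUT below. */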
1875 struct raft_command *cmd, *next_cmd;
1876 HMAP_FOR_EACH_SAFE (cmd, next_cmd, hmap_node, &raft->commands) {
1877 if (cmd->timestamp
1878 && now - cmd->timestamp > raft->election_timer * 2) {
1879 raft_command_complete(raft, cmd, RAFT_CMD_TIMEOUT);
1880 }
1881 }
1882 raft_reset_ping_timer(raft);
1883 }
1884
1885 /* Do this only at the end; if we did it as soon as we set raft->left or
1886 * raft->failed in handling the RemoveServerReply, then it could easily
1887 * cause references to freed memory in RPC sessions, etc. */
1888 if (raft->left || raft->failed) {
1889 raft_close__(raft);
1890 }
1891 }
1892
1893 static void
1894 raft_wait_session(struct jsonrpc_session *js)
1895 {
1896 if (js) {
1897 jsonrpc_session_wait(js);
1898 jsonrpc_session_recv_wait(js);
1899 }
1900 }
1901
1902 /* Causes the next call to poll_block() to wake up when 'raft' needs to do
1903 * something. */
1904 void
1905 raft_wait(struct raft *raft)
1906 {
1907 if (raft->left || raft->failed) {
1908 return;
1909 }
1910
1911 raft_waiters_wait(raft);
1912
1913 if (raft->listener) {
1914 pstream_wait(raft->listener);
1915 } else {
1916 poll_timer_wait_until(raft->listen_backoff);
1917 }
1918
1919 struct raft_conn *conn;
1920 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1921 raft_wait_session(conn->js);
1922 }
1923
1924 if (!raft->joining) {
1925 poll_timer_wait_until(raft->election_timeout);
1926 } else {
1927 poll_timer_wait_until(raft->join_timeout);
1928 }
1929 if (raft->leaving) {
1930 poll_timer_wait_until(raft->leave_timeout);
1931 }
1932 if (raft->role == RAFT_LEADER || !hmap_is_empty(&raft->commands)) {
1933 poll_timer_wait_until(raft->ping_timeout);
1934 }
1935 }
1936
1937 static struct raft_waiter *
1938 raft_waiter_create(struct raft *raft, enum raft_waiter_type type,
1939 bool start_commit)
1940 {
1941 struct raft_waiter *w = xzalloc(sizeof *w);
1942 ovs_list_push_back(&raft->waiters, &w->list_node);
1943 w->commit_ticket = start_commit ? ovsdb_log_commit_start(raft->log) : 0;
1944 w->type = type;
1945 return w;
1946 }
1947
1948 /* Returns a human-readable representation of 'status' (or NULL if 'status' is
1949 * invalid). */
1950 const char *
1951 raft_command_status_to_string(enum raft_command_status status)
1952 {
1953 switch (status) {
1954 case RAFT_CMD_INCOMPLETE:
1955 return "operation still in progress";
1956 case RAFT_CMD_SUCCESS:
1957 return "success";
1958 case RAFT_CMD_NOT_LEADER:
1959 return "not leader";
1960 case RAFT_CMD_BAD_PREREQ:
1961 return "prerequisite check failed";
1962 case RAFT_CMD_LOST_LEADERSHIP:
1963 return "lost leadership";
1964 case RAFT_CMD_SHUTDOWN:
1965 return "server shutdown";
1966 case RAFT_CMD_IO_ERROR:
1967 return "I/O error";
1968 case RAFT_CMD_TIMEOUT:
1969 return "timeout";
1970 default:
1971 return NULL;
1972 }
1973 }
1974
1975 /* Converts human-readable status in 's' into status code in '*statusp'.
1976 * Returns true if successful, false if 's' is unknown. */
1977 bool
1978 raft_command_status_from_string(const char *s,
1979 enum raft_command_status *statusp)
1980 {
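    /* (Editor's note.) This relies on the status values being contiguous and
     * starting at 0, and on raft_command_status_to_string() returning NULL
     * for the first value past the last valid status. */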
1981 for (enum raft_command_status status = 0; ; status++) {
1982 const char *s2 = raft_command_status_to_string(status);
1983 if (!s2) {
1984 *statusp = 0;
1985 return false;
1986 } else if (!strcmp(s, s2)) {
1987 *statusp = status;
1988 return true;
1989 }
1990 }
1991 }
1992
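/* (Editor's note.) Returns the entry ID of the newest data-carrying log entry
 * at or below 'index', falling back to the snapshot's entry ID if no such
 * entry remains in the log. */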
1993 static const struct uuid *
1994 raft_get_eid(const struct raft *raft, uint64_t index)
1995 {
1996 for (; index >= raft->log_start; index--) {
1997 const struct raft_entry *e = raft_get_entry(raft, index);
1998 if (e->data) {
1999 return &e->eid;
2000 }
2001 }
2002 return &raft->snap.eid;
2003 }
2004
2005 const struct uuid *
2006 raft_current_eid(const struct raft *raft)
2007 {
2008 return raft_get_eid(raft, raft->log_end - 1);
2009 }
2010
2011 static struct raft_command *
2012 raft_command_create_completed(enum raft_command_status status)
2013 {
2014 ovs_assert(status != RAFT_CMD_INCOMPLETE);
2015
2016 struct raft_command *cmd = xzalloc(sizeof *cmd);
2017 cmd->n_refs = 1;
2018 cmd->status = status;
2019 return cmd;
2020 }
2021
2022 static struct raft_command *
2023 raft_command_create_incomplete(struct raft *raft, uint64_t index)
2024 {
2025 struct raft_command *cmd = xzalloc(sizeof *cmd);
2026 cmd->n_refs = 2; /* One for client, one for raft->commands. */
2027 cmd->index = index;
2028 cmd->status = RAFT_CMD_INCOMPLETE;
2029 hmap_insert(&raft->commands, &cmd->hmap_node, cmd->index);
2030 return cmd;
2031 }
2032
2033 static struct raft_command * OVS_WARN_UNUSED_RESULT
2034 raft_command_initiate(struct raft *raft,
2035 const struct json *data, const struct json *servers,
2036 uint64_t election_timer, const struct uuid *eid)
2037 {
2038 /* Write to local log. */
2039 uint64_t index = raft->log_end;
2040 if (!raft_handle_write_error(
2041 raft, raft_write_entry(
2042 raft, raft->term, json_nullable_clone(data), eid,
2043 json_nullable_clone(servers),
2044 election_timer))) {
2045 return raft_command_create_completed(RAFT_CMD_IO_ERROR);
2046 }
2047
2048 struct raft_command *cmd = raft_command_create_incomplete(raft, index);
2049 ovs_assert(eid);
2050 cmd->eid = *eid;
2051 cmd->timestamp = time_msec();
2052
2053 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index = cmd->index;
2054
2055 if (failure_test == FT_CRASH_BEFORE_SEND_APPEND_REQ) {
2056 ovs_fatal(0, "Raft test: crash before sending append_request.");
2057 }
2058 /* Write to remote logs. */
2059 struct raft_server *s;
2060 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2061 if (!uuid_equals(&s->sid, &raft->sid) && s->next_index == index) {
2062 raft_send_append_request(raft, s, 1, "execute command");
2063 s->next_index++;
2064 }
2065 }
2066 if (failure_test == FT_CRASH_AFTER_SEND_APPEND_REQ) {
2067 ovs_fatal(0, "Raft test: crash after sending append_request.");
2068 }
2069 raft_reset_ping_timer(raft);
2070
2071 return cmd;
2072 }
2073
2074 static void
2075 log_all_commands(struct raft *raft)
2076 {
2077 struct raft_command *cmd, *next;
2078 HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
2079 VLOG_DBG("raft command eid: "UUID_FMT, UUID_ARGS(&cmd->eid));
2080 }
2081 }
2082
2083 static struct raft_command * OVS_WARN_UNUSED_RESULT
2084 raft_command_execute__(struct raft *raft, const struct json *data,
2085 const struct json *servers, uint64_t election_timer,
2086 const struct uuid *prereq, struct uuid *result)
2087 {
2088 if (raft->joining || raft->leaving || raft->left || raft->failed) {
2089 return raft_command_create_completed(RAFT_CMD_SHUTDOWN);
2090 }
2091
2092 if (raft->role != RAFT_LEADER) {
2093 /* Consider proxying the command to the leader. We can only do that if
2094 * we know the leader and the command does not change the set of
2095 * servers. We do not proxy commands without prerequisites, even
2096 * though we could, because in an OVSDB context a log entry doesn't
2097 * make sense without context. */
2098 if (servers || election_timer || !data
2099 || raft->role != RAFT_FOLLOWER || uuid_is_zero(&raft->leader_sid)
2100 || !prereq) {
2101 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
2102 }
2103 }
2104
2105 struct uuid eid = data ? uuid_random() : UUID_ZERO;
2106 if (result) {
2107 *result = eid;
2108 }
2109
2110 if (raft->role != RAFT_LEADER) {
2111 const union raft_rpc rpc = {
2112 .execute_command_request = {
2113 .common = {
2114 .type = RAFT_RPC_EXECUTE_COMMAND_REQUEST,
2115 .sid = raft->leader_sid,
2116 },
2117 .data = CONST_CAST(struct json *, data),
2118 .prereq = *prereq,
2119 .result = eid,
2120 }
2121 };
2122 if (failure_test == FT_CRASH_BEFORE_SEND_EXEC_REQ) {
2123 ovs_fatal(0, "Raft test: crash before sending "
2124 "execute_command_request");
2125 }
2126 if (!raft_send(raft, &rpc)) {
2127 /* Couldn't send command, so it definitely failed. */
2128 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
2129 }
2130 if (failure_test == FT_CRASH_AFTER_SEND_EXEC_REQ) {
2131 ovs_fatal(0, "Raft test: crash after sending "
2132 "execute_command_request");
2133 }
2134
2135 struct raft_command *cmd = raft_command_create_incomplete(raft, 0);
2136 cmd->timestamp = time_msec();
2137 cmd->eid = eid;
2138 log_all_commands(raft);
2139 return cmd;
2140 }
2141
2142 const struct uuid *current_eid = raft_current_eid(raft);
2143 if (prereq && !uuid_equals(prereq, current_eid)) {
2144 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
2145 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
2146 "prerequisite "UUID_FMT,
2147 UUID_ARGS(current_eid), UUID_ARGS(prereq));
2148 return raft_command_create_completed(RAFT_CMD_BAD_PREREQ);
2149 }
2150
2151 return raft_command_initiate(raft, data, servers, election_timer, &eid);
2152 }
2153
2154 /* Initiates appending a log entry to 'raft'. The log entry consists of 'data'
2155 * and, if 'prereq' is nonnull, it is only added to the log if the previous
2156 * entry in the log has entry ID 'prereq'. If 'result' is nonnull, it is
2157 * populated with the entry ID for the new log entry.
2158 *
2159 * Returns a "struct raft_command" that may be used to track progress adding
2160 * the log entry. The caller must eventually free the returned structure, with
2161 * raft_command_unref(). */
2162 struct raft_command * OVS_WARN_UNUSED_RESULT
2163 raft_command_execute(struct raft *raft, const struct json *data,
2164 const struct uuid *prereq, struct uuid *result)
2165 {
2166 return raft_command_execute__(raft, data, NULL, 0, prereq, result);
2167 }
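/* (Editor's note.) Illustrative usage sketch only; it is not part of the
 * original file, and the surrounding poll loop and the construction of 'data'
 * are assumptions:
 *
 *     struct raft_command *cmd = raft_command_execute(raft, data, NULL, NULL);
 *     for (;;) {
 *         raft_run(raft);
 *         enum raft_command_status status = raft_command_get_status(cmd);
 *         if (status != RAFT_CMD_INCOMPLETE) {
 *             VLOG_INFO("command finished: %s",
 *                       raft_command_status_to_string(status));
 *             raft_command_unref(cmd);
 *             break;
 *         }
 *         raft_command_wait(cmd);
 *         raft_wait(raft);
 *         poll_block();
 *     }
 */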
2168
2169 /* Returns the status of 'cmd'. */
2170 enum raft_command_status
2171 raft_command_get_status(const struct raft_command *cmd)
2172 {
2173 ovs_assert(cmd->n_refs > 0);
2174 return cmd->status;
2175 }
2176
2177 /* Returns the index of the log entry at which 'cmd' was committed.
2178 *
2179 * This function works only with successful commands. */
2180 uint64_t
2181 raft_command_get_commit_index(const struct raft_command *cmd)
2182 {
2183 ovs_assert(cmd->n_refs > 0);
2184 ovs_assert(cmd->status == RAFT_CMD_SUCCESS);
2185 return cmd->index;
2186 }
2187
2188 /* Frees 'cmd'. */
2189 void
2190 raft_command_unref(struct raft_command *cmd)
2191 {
2192 if (cmd) {
2193 ovs_assert(cmd->n_refs > 0);
2194 if (!--cmd->n_refs) {
2195 free(cmd);
2196 }
2197 }
2198 }
2199
2200 /* Causes poll_block() to wake up when 'cmd' has status to report. */
2201 void
2202 raft_command_wait(const struct raft_command *cmd)
2203 {
2204 if (cmd->status != RAFT_CMD_INCOMPLETE) {
2205 poll_immediate_wake();
2206 }
2207 }
2208
2209 static void
2210 raft_command_complete(struct raft *raft,
2211 struct raft_command *cmd,
2212 enum raft_command_status status)
2213 {
2214 VLOG_DBG("raft_command_complete eid "UUID_FMT" status: %s",
2215 UUID_ARGS(&cmd->eid), raft_command_status_to_string(status));
2216 if (!uuid_is_zero(&cmd->sid)) {
2217 uint64_t commit_index = status == RAFT_CMD_SUCCESS ? cmd->index : 0;
2218 raft_send_execute_command_reply(raft, &cmd->sid, &cmd->eid, status,
2219 commit_index);
2220 }
2221
2222 ovs_assert(cmd->status == RAFT_CMD_INCOMPLETE);
2223 ovs_assert(cmd->n_refs > 0);
2224 hmap_remove(&raft->commands, &cmd->hmap_node);
2225 cmd->status = status;
2226 raft_command_unref(cmd);
2227 }
2228
2229 static void
2230 raft_complete_all_commands(struct raft *raft, enum raft_command_status status)
2231 {
2232 struct raft_command *cmd, *next;
2233 HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
2234 raft_command_complete(raft, cmd, status);
2235 }
2236 }
2237
2238 static struct raft_command *
2239 raft_find_command_by_eid(struct raft *raft, const struct uuid *eid)
2240 {
2241 struct raft_command *cmd;
2242
2243 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2244 if (uuid_equals(&cmd->eid, eid)) {
2245 return cmd;
2246 }
2247 }
2248 return NULL;
2249 }
2250 \f
2251 #define RAFT_RPC(ENUM, NAME) \
2252 static void raft_handle_##NAME(struct raft *, const struct raft_##NAME *);
2253 RAFT_RPC_TYPES
2254 #undef RAFT_RPC
2255
2256 static void
2257 raft_handle_hello_request(struct raft *raft OVS_UNUSED,
2258 const struct raft_hello_request *hello OVS_UNUSED)
2259 {
2260 }
2261
2262 /* 'sid' is the server being added. */
2263 static void
2264 raft_send_add_server_reply__(struct raft *raft, const struct uuid *sid,
2265 const char *address,
2266 bool success, const char *comment)
2267 {
2268 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2269 if (!VLOG_DROP_INFO(&rl)) {
2270 struct ds s = DS_EMPTY_INITIALIZER;
2271 char buf[SID_LEN + 1];
2272 ds_put_format(&s, "adding %s ("SID_FMT" at %s) "
2273 "to cluster "CID_FMT" %s",
2274 raft_get_nickname(raft, sid, buf, sizeof buf),
2275 SID_ARGS(sid), address, CID_ARGS(&raft->cid),
2276 success ? "succeeded" : "failed");
2277 if (comment) {
2278 ds_put_format(&s, " (%s)", comment);
2279 }
2280 VLOG_INFO("%s", ds_cstr(&s));
2281 ds_destroy(&s);
2282 }
2283
2284 union raft_rpc rpy = {
2285 .add_server_reply = {
2286 .common = {
2287 .type = RAFT_RPC_ADD_SERVER_REPLY,
2288 .sid = *sid,
2289 .comment = CONST_CAST(char *, comment),
2290 },
2291 .success = success,
2292 }
2293 };
2294
2295 struct sset *remote_addresses = &rpy.add_server_reply.remote_addresses;
2296 sset_init(remote_addresses);
2297 if (!raft->joining) {
2298 struct raft_server *s;
2299 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2300 if (!uuid_equals(&s->sid, &raft->sid)) {
2301 sset_add(remote_addresses, s->address);
2302 }
2303 }
2304 }
2305
2306 raft_send(raft, &rpy);
2307
2308 sset_destroy(remote_addresses);
2309 }
2310
2311 static void
2312 raft_send_remove_server_reply_rpc(struct raft *raft,
2313 const struct uuid *dst_sid,
2314 const struct uuid *target_sid,
2315 bool success, const char *comment)
2316 {
2317 if (uuid_equals(&raft->sid, dst_sid)) {
2318 if (success && uuid_equals(&raft->sid, target_sid)) {
2319 raft_finished_leaving_cluster(raft);
2320 }
2321 return;
2322 }
2323
2324 const union raft_rpc rpy = {
2325 .remove_server_reply = {
2326 .common = {
2327 .type = RAFT_RPC_REMOVE_SERVER_REPLY,
2328 .sid = *dst_sid,
2329 .comment = CONST_CAST(char *, comment),
2330 },
2331 .target_sid = (uuid_equals(dst_sid, target_sid)
2332 ? UUID_ZERO
2333 : *target_sid),
2334 .success = success,
2335 }
2336 };
2337 raft_send(raft, &rpy);
2338 }
2339
2340 static void
2341 raft_send_remove_server_reply__(struct raft *raft,
2342 const struct uuid *target_sid,
2343 const struct uuid *requester_sid,
2344 struct unixctl_conn *requester_conn,
2345 bool success, const char *comment)
2346 {
2347 struct ds s = DS_EMPTY_INITIALIZER;
2348 ds_put_format(&s, "request ");
2349 if (!uuid_is_zero(requester_sid)) {
2350 char buf[SID_LEN + 1];
2351 ds_put_format(&s, "by %s",
2352 raft_get_nickname(raft, requester_sid, buf, sizeof buf));
2353 } else {
2354 ds_put_cstr(&s, "via unixctl");
2355 }
2356 ds_put_cstr(&s, " to remove ");
2357 if (!requester_conn && uuid_equals(target_sid, requester_sid)) {
2358 ds_put_cstr(&s, "itself");
2359 } else {
2360 char buf[SID_LEN + 1];
2361 ds_put_cstr(&s, raft_get_nickname(raft, target_sid, buf, sizeof buf));
2362 if (uuid_equals(target_sid, &raft->sid)) {
2363 ds_put_cstr(&s, " (ourselves)");
2364 }
2365 }
2366 ds_put_format(&s, " from cluster "CID_FMT" %s",
2367 CID_ARGS(&raft->cid),
2368 success ? "succeeded" : "failed");
2369 if (comment) {
2370 ds_put_format(&s, " (%s)", comment);
2371 }
2372
2373 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2374 VLOG_INFO_RL(&rl, "%s", ds_cstr(&s));
2375
2376 /* Send RemoveServerReply to the requester (which could be a server or a
2377 * unixctl connection). Also always send it to the removed server; this
2378 * allows it to be sure that it's really removed, update its log, and
2379 * disconnect permanently. */
2380 if (!uuid_is_zero(requester_sid)) {
2381 raft_send_remove_server_reply_rpc(raft, requester_sid, target_sid,
2382 success, comment);
2383 }
2384 if (!uuid_equals(requester_sid, target_sid)) {
2385 raft_send_remove_server_reply_rpc(raft, target_sid, target_sid,
2386 success, comment);
2387 }
2388 if (requester_conn) {
2389 if (success) {
2390 unixctl_command_reply(requester_conn, ds_cstr(&s));
2391 } else {
2392 unixctl_command_reply_error(requester_conn, ds_cstr(&s));
2393 }
2394 }
2395
2396 ds_destroy(&s);
2397 }
2398
2399 static void
2400 raft_send_add_server_reply(struct raft *raft,
2401 const struct raft_add_server_request *rq,
2402 bool success, const char *comment)
2403 {
2404 return raft_send_add_server_reply__(raft, &rq->common.sid, rq->address,
2405 success, comment);
2406 }
2407
2408 static void
2409 raft_send_remove_server_reply(struct raft *raft,
2410 const struct raft_remove_server_request *rq,
2411 bool success, const char *comment)
2412 {
2413 return raft_send_remove_server_reply__(raft, &rq->sid, &rq->common.sid,
2414 rq->requester_conn, success,
2415 comment);
2416 }
2417
2418 static void
2419 raft_become_follower(struct raft *raft)
2420 {
2421 raft->leader_sid = UUID_ZERO;
2422 if (raft->role == RAFT_FOLLOWER) {
2423 return;
2424 }
2425
2426 raft->role = RAFT_FOLLOWER;
2427 raft_reset_election_timer(raft);
2428
2429 /* Notify clients about lost leadership.
2430 *
2431 * We do not reverse our changes to 'raft->servers' because the new
2432 * configuration is already part of the log. Possibly the configuration
2433 * log entry will not be committed, but until we know that we must use the
2434 * new configuration. Our AppendEntries processing will properly update
2435 * the server configuration later, if necessary. */
2436 struct raft_server *s;
2437 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
2438 raft_send_add_server_reply__(raft, &s->sid, s->address, false,
2439 RAFT_SERVER_LOST_LEADERSHIP);
2440 }
2441 if (raft->remove_server) {
2442 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
2443 &raft->remove_server->requester_sid,
2444 raft->remove_server->requester_conn,
2445 false, RAFT_SERVER_LOST_LEADERSHIP);
2446 raft_server_destroy(raft->remove_server);
2447 raft->remove_server = NULL;
2448 }
2449
2450 raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
2451 }
2452
2453 static void
2454 raft_send_append_request(struct raft *raft,
2455 struct raft_server *peer, unsigned int n,
2456 const char *comment)
2457 {
2458 ovs_assert(raft->role == RAFT_LEADER);
2459
2460 const union raft_rpc rq = {
2461 .append_request = {
2462 .common = {
2463 .type = RAFT_RPC_APPEND_REQUEST,
2464 .sid = peer->sid,
2465 .comment = CONST_CAST(char *, comment),
2466 },
2467 .term = raft->term,
2468 .prev_log_index = peer->next_index - 1,
2469 .prev_log_term = (peer->next_index - 1 >= raft->log_start
2470 ? raft->entries[peer->next_index - 1
2471 - raft->log_start].term
2472 : raft->snap.term),
2473 .leader_commit = raft->commit_index,
2474 .entries = &raft->entries[peer->next_index - raft->log_start],
2475 .n_entries = n,
2476 },
2477 };
2478 raft_send(raft, &rq);
2479 }
2480
2481 static void
2482 raft_send_heartbeats(struct raft *raft)
2483 {
2484 struct raft_server *s;
2485 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2486 if (!uuid_equals(&raft->sid, &s->sid)) {
2487 raft_send_append_request(raft, s, 0, "heartbeat");
2488 }
2489 }
2490
2491 /* Send a ping to anyone waiting for a command to complete, to let them
2492 * know we're still working on it. */
2493 struct raft_command *cmd;
2494 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2495 if (!uuid_is_zero(&cmd->sid)) {
2496 raft_send_execute_command_reply(raft, &cmd->sid,
2497 &cmd->eid,
2498 RAFT_CMD_INCOMPLETE, 0);
2499 }
2500 }
2501
2502 raft_reset_ping_timer(raft);
2503 }
2504
2505 /* Initializes the fields in 's' that represent the leader's view of the
2506 * server. */
2507 static void
2508 raft_server_init_leader(struct raft *raft, struct raft_server *s)
2509 {
2510 s->next_index = raft->log_end;
2511 s->match_index = 0;
2512 s->phase = RAFT_PHASE_STABLE;
2513 s->replied = false;
2514 }
2515
2516 static void
2517 raft_set_leader(struct raft *raft, const struct uuid *sid)
2518 {
2519 raft->leader_sid = *sid;
2520 raft->had_leader = true;
2521 raft->candidate_retrying = false;
2522 }
2523
2524 static void
2525 raft_become_leader(struct raft *raft)
2526 {
2527 log_all_commands(raft);
2528
2529 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
2530 VLOG_INFO_RL(&rl, "term %"PRIu64": elected leader by %d+ of "
2531 "%"PRIuSIZE" servers", raft->term,
2532 raft->n_votes, hmap_count(&raft->servers));
2533
2534 ovs_assert(raft->role != RAFT_LEADER);
2535 raft->role = RAFT_LEADER;
2536 raft_set_leader(raft, &raft->sid);
2537 raft_reset_election_timer(raft);
2538 raft_reset_ping_timer(raft);
2539
2540 struct raft_server *s;
2541 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2542 raft_server_init_leader(raft, s);
2543 }
2544
2545 raft->election_timer_new = 0;
2546
2547 raft_update_our_match_index(raft, raft->log_end - 1);
2548 raft_send_heartbeats(raft);
2549
2550 /* Write the fact that we are leader to the log. This is not used by the
2551 * algorithm (although it could be, for quick restart), but it is used for
2552 * offline analysis to check for conformance with the properties that Raft
2553 * guarantees. */
2554 struct raft_record r = {
2555 .type = RAFT_REC_LEADER,
2556 .term = raft->term,
2557 .sid = raft->sid,
2558 };
2559 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2560
2561 /* Initiate a no-op commit. Otherwise we might never find out what's in
2562 * the log. See section 6.4 item 1:
2563 *
2564 * The Leader Completeness Property guarantees that a leader has all
2565 * committed entries, but at the start of its term, it may not know
2566 * which those are. To find out, it needs to commit an entry from its
2567 * term. Raft handles this by having each leader commit a blank no-op
2568 * entry into the log at the start of its term. As soon as this no-op
2569 * entry is committed, the leader’s commit index will be at least as
2570 * large as any other servers’ during its term.
2571 */
2572 raft_command_unref(raft_command_execute__(raft, NULL, NULL, 0, NULL,
2573 NULL));
2574 }
2575
2576 /* Processes term 'term' received as part of RPC 'common'. Returns true if the
2577 * caller should continue processing the RPC, false if the caller should reject
2578 * it due to a stale term. */
2579 static bool
2580 raft_receive_term__(struct raft *raft, const struct raft_rpc_common *common,
2581 uint64_t term)
2582 {
2583 /* Section 3.3 says:
2584 *
2585 * Current terms are exchanged whenever servers communicate; if one
2586 * server’s current term is smaller than the other’s, then it updates
2587 * its current term to the larger value. If a candidate or leader
2588 * discovers that its term is out of date, it immediately reverts to
2589 * follower state. If a server receives a request with a stale term
2590 * number, it rejects the request.
2591 */
2592 if (term > raft->term) {
2593 if (!raft_set_term(raft, term, NULL)) {
2594 /* Failed to update the term to 'term'. */
2595 return false;
2596 }
2597 raft_become_follower(raft);
2598 } else if (term < raft->term) {
2599 char buf[SID_LEN + 1];
2600 VLOG_INFO("rejecting term %"PRIu64" < current term %"PRIu64" received "
2601 "in %s message from server %s",
2602 term, raft->term,
2603 raft_rpc_type_to_string(common->type),
2604 raft_get_nickname(raft, &common->sid, buf, sizeof buf));
2605 return false;
2606 }
2607 return true;
2608 }
2609
2610 static void
2611 raft_get_servers_from_log(struct raft *raft, enum vlog_level level)
2612 {
2613 const struct json *servers_json = raft->snap.servers;
2614 for (uint64_t index = raft->log_end - 1; index >= raft->log_start;
2615 index--) {
2616 struct raft_entry *e = &raft->entries[index - raft->log_start];
2617 if (e->servers) {
2618 servers_json = e->servers;
2619 break;
2620 }
2621 }
2622
2623 struct hmap servers;
2624 struct ovsdb_error *error = raft_servers_from_json(servers_json, &servers);
2625 ovs_assert(!error);
2626 raft_set_servers(raft, &servers, level);
2627 raft_servers_destroy(&servers);
2628 }
2629
2630 /* Truncates the log, so that raft->log_end becomes 'new_end'.
2631 *
2632 * Doesn't write anything to disk. In theory, we could truncate the on-disk
2633 * log file, but we don't have the right information to know how long it should
2634 * be. What we actually do is to append entries for older indexes to the
2635 * on-disk log; when we re-read it later, these entries truncate the log.
2636 *
2637 * Returns true if any of the removed log entries were server configuration
2638 * entries, false otherwise. */
2639 static bool
2640 raft_truncate(struct raft *raft, uint64_t new_end)
2641 {
2642 ovs_assert(new_end >= raft->log_start);
2643 if (raft->log_end > new_end) {
2644 char buf[SID_LEN + 1];
2645 VLOG_INFO("%s truncating %"PRIu64 " entries from end of log",
2646 raft_get_nickname(raft, &raft->sid, buf, sizeof buf),
2647 raft->log_end - new_end);
2648 }
2649
2650 bool servers_changed = false;
2651 while (raft->log_end > new_end) {
2652 struct raft_entry *entry = &raft->entries[--raft->log_end
2653 - raft->log_start];
2654 if (entry->servers) {
2655 servers_changed = true;
2656 }
2657 raft_entry_uninit(entry);
2658 }
2659 return servers_changed;
2660 }
2661
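/* (Editor's note.) Returns the data of the next committed-but-unapplied entry
 * (or of the snapshot, if it has not been applied yet) and stores its entry
 * ID in '*eid'; returns NULL if there is nothing ready to apply. Committed
 * entries that carry no data are skipped by advancing 'last_applied' past
 * them. */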
2662 static const struct json *
2663 raft_peek_next_entry(struct raft *raft, struct uuid *eid)
2664 {
2665 /* Invariant: log_start - 2 <= last_applied <= commit_index < log_end. */
2666 ovs_assert(raft->log_start <= raft->last_applied + 2);
2667 ovs_assert(raft->last_applied <= raft->commit_index);
2668 ovs_assert(raft->commit_index < raft->log_end);
2669
2670 if (raft->joining || raft->failed) {
2671 return NULL;
2672 }
2673
2674 if (raft->log_start == raft->last_applied + 2) {
2675 *eid = raft->snap.eid;
2676 return raft->snap.data;
2677 }
2678
2679 while (raft->last_applied < raft->commit_index) {
2680 const struct raft_entry *e = raft_get_entry(raft,
2681 raft->last_applied + 1);
2682 if (e->data) {
2683 *eid = e->eid;
2684 return e->data;
2685 }
2686 raft->last_applied++;
2687 }
2688 return NULL;
2689 }
2690
2691 static const struct json *
2692 raft_get_next_entry(struct raft *raft, struct uuid *eid)
2693 {
2694 const struct json *data = raft_peek_next_entry(raft, eid);
2695 if (data) {
2696 raft->last_applied++;
2697 }
2698 return data;
2699 }
2700
2701 /* Updates the commit index in the raft log. If the commit index is already
2702 * up-to-date, does nothing and returns false; otherwise, returns true. */
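/* (Editor's note.) On the leader, advancing the commit index also completes
 * any pending commands for the newly committed entries, applies committed
 * election timer changes, and runs raft_run_reconfigure() for committed
 * server configuration entries. On a follower, it applies election timer
 * changes and completes a relayed command whose entry just got committed.
 * In both cases a RAFT_REC_COMMIT_INDEX record is appended to the on-disk
 * log. */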
2703 static bool
2704 raft_update_commit_index(struct raft *raft, uint64_t new_commit_index)
2705 {
2706 if (new_commit_index <= raft->commit_index) {
2707 return false;
2708 }
2709
2710 if (raft->role == RAFT_LEADER) {
2711 while (raft->commit_index < new_commit_index) {
2712 uint64_t index = ++raft->commit_index;
2713 const struct raft_entry *e = raft_get_entry(raft, index);
2714 if (e->data) {
2715 struct raft_command *cmd
2716 = raft_find_command_by_eid(raft, &e->eid);
2717 if (cmd) {
2718 if (!cmd->index) {
2719 VLOG_DBG("Command completed after role change from"
2720 " follower to leader "UUID_FMT,
2721 UUID_ARGS(&e->eid));
2722 cmd->index = index;
2723 }
2724 raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
2725 }
2726 }
2727 if (e->election_timer) {
2728 VLOG_INFO("Election timer changed from %"PRIu64" to %"PRIu64,
2729 raft->election_timer, e->election_timer);
2730 raft->election_timer = e->election_timer;
2731 raft->election_timer_new = 0;
2732 }
2733 if (e->servers) {
2734 /* raft_run_reconfigure() can write a new Raft entry, which can
2735 * reallocate raft->entries, which would invalidate 'e', so
2736 * this case must be last, after the one for 'e->data'. */
2737 raft_run_reconfigure(raft);
2738 }
2739 }
2740 } else {
2741 while (raft->commit_index < new_commit_index) {
2742 uint64_t index = ++raft->commit_index;
2743 const struct raft_entry *e = raft_get_entry(raft, index);
2744 if (e->election_timer) {
2745 VLOG_INFO("Election timer changed from %"PRIu64" to %"PRIu64,
2746 raft->election_timer, e->election_timer);
2747 raft->election_timer = e->election_timer;
2748 }
2749 }
2750 /* Check whether any pending command can be completed, and complete it.
2751 * This can happen when a leader fails over before sending an
2752 * execute_command_reply. */
2753 const struct uuid *eid = raft_get_eid(raft, new_commit_index);
2754 struct raft_command *cmd = raft_find_command_by_eid(raft, eid);
2755 if (cmd) {
2756 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
2757 VLOG_INFO_RL(&rl,
2758 "Command completed without reply (eid: "UUID_FMT", "
2759 "commit index: %"PRIu64")",
2760 UUID_ARGS(eid), new_commit_index);
2761 cmd->index = new_commit_index;
2762 raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
2763 }
2764 }
2765
2766 /* Write the commit index to the log. The next time we restart, this
2767 * allows us to start exporting a reasonably fresh log, instead of a log
2768 * that only contains the snapshot. */
2769 struct raft_record r = {
2770 .type = RAFT_REC_COMMIT_INDEX,
2771 .commit_index = raft->commit_index,
2772 };
2773 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2774 return true;
2775 }
2776
2777 /* This doesn't use rq->entries (but it does use rq->n_entries). */
2778 static void
2779 raft_send_append_reply(struct raft *raft, const struct raft_append_request *rq,
2780 enum raft_append_result result, const char *comment)
2781 {
2782 /* Figure 3.1: "If leaderCommit > commitIndex, set commitIndex =
2783 * min(leaderCommit, index of last new entry)" */
2784 if (result == RAFT_APPEND_OK && rq->leader_commit > raft->commit_index) {
2785 raft_update_commit_index(
2786 raft, MIN(rq->leader_commit, rq->prev_log_index + rq->n_entries));
2787 }
2788
2789 /* Send reply. */
2790 union raft_rpc reply = {
2791 .append_reply = {
2792 .common = {
2793 .type = RAFT_RPC_APPEND_REPLY,
2794 .sid = rq->common.sid,
2795 .comment = CONST_CAST(char *, comment),
2796 },
2797 .term = raft->term,
2798 .log_end = raft->log_end,
2799 .prev_log_index = rq->prev_log_index,
2800 .prev_log_term = rq->prev_log_term,
2801 .n_entries = rq->n_entries,
2802 .result = result,
2803 }
2804 };
2805 raft_send(raft, &reply);
2806 }
2807
2808 /* If 'prev_log_index' exists in 'raft''s log, in term 'prev_log_term', returns
2809 * NULL. Otherwise, returns an explanation for the mismatch. */
2810 static const char *
2811 match_index_and_term(const struct raft *raft,
2812 uint64_t prev_log_index, uint64_t prev_log_term)
2813 {
2814 if (prev_log_index < raft->log_start - 1) {
2815 return "mismatch before start of log";
2816 } else if (prev_log_index == raft->log_start - 1) {
2817 if (prev_log_term != raft->snap.term) {
2818 return "prev_term mismatch";
2819 }
2820 } else if (prev_log_index < raft->log_end) {
2821 if (raft->entries[prev_log_index - raft->log_start].term
2822 != prev_log_term) {
2823 return "term mismatch";
2824 }
2825 } else {
2826 /* prev_log_index >= raft->log_end */
2827 return "mismatch past end of log";
2828 }
2829 return NULL;
2830 }
2831
2832 static void
2833 raft_handle_append_entries(struct raft *raft,
2834 const struct raft_append_request *rq,
2835 uint64_t prev_log_index, uint64_t prev_log_term,
2836 const struct raft_entry *entries,
2837 unsigned int n_entries)
2838 {
2839 /* Section 3.5: "When sending an AppendEntries RPC, the leader includes
2840 * the index and term of the entry in its log that immediately precedes
2841 * the new entries. If the follower does not find an entry in its log
2842 * with the same index and term, then it refuses the new entries." */
2843 const char *mismatch = match_index_and_term(raft, prev_log_index,
2844 prev_log_term);
2845 if (mismatch) {
2846 VLOG_INFO("rejecting append_request because previous entry "
2847 "%"PRIu64",%"PRIu64" not in local log (%s)",
2848 prev_log_term, prev_log_index, mismatch);
2849 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY, mismatch);
2850 return;
2851 }
2852
2853 /* Figure 3.1: "If an existing entry conflicts with a new one (same
2854 * index but different terms), delete the existing entry and all that
2855 * follow it." */
2856 unsigned int i;
2857 bool servers_changed = false;
2858 for (i = 0; ; i++) {
2859 if (i >= n_entries) {
2860 /* No change. */
2861 if (rq->common.comment
2862 && !strcmp(rq->common.comment, "heartbeat")) {
2863 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "heartbeat");
2864 } else {
2865 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
2866 }
2867 return;
2868 }
2869
2870 uint64_t log_index = (prev_log_index + 1) + i;
2871 if (log_index >= raft->log_end) {
2872 break;
2873 }
2874 if (raft->entries[log_index - raft->log_start].term
2875 != entries[i].term) {
2876 if (raft_truncate(raft, log_index)) {
2877 servers_changed = true;
2878 }
2879 break;
2880 }
2881 }
2882
2883 if (failure_test == FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE) {
2884 ovs_fatal(0, "Raft test: crash after receiving append_request with "
2885 "update.");
2886 }
2887 /* Figure 3.1: "Append any entries not already in the log." */
2888 struct ovsdb_error *error = NULL;
2889 bool any_written = false;
2890 for (; i < n_entries; i++) {
2891 const struct raft_entry *e = &entries[i];
2892 error = raft_write_entry(raft, e->term,
2893 json_nullable_clone(e->data), &e->eid,
2894 json_nullable_clone(e->servers),
2895 e->election_timer);
2896 if (error) {
2897 break;
2898 }
2899 any_written = true;
2900 if (e->servers) {
2901 servers_changed = true;
2902 }
2903 }
2904
2905 if (any_written) {
2906 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index
2907 = raft->log_end - 1;
2908 }
2909 if (servers_changed) {
2910 /* The set of servers might have changed; check. */
2911 raft_get_servers_from_log(raft, VLL_INFO);
2912 }
2913
2914 if (error) {
2915 char *s = ovsdb_error_to_string_free(error);
2916 VLOG_ERR("%s", s);
2917 free(s);
2918 raft_send_append_reply(raft, rq, RAFT_APPEND_IO_ERROR, "I/O error");
2919 return;
2920 }
2921
2922 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "log updated");
2923 }
2924
2925 static bool
2926 raft_update_leader(struct raft *raft, const struct uuid *sid)
2927 {
2928 if (raft->role == RAFT_LEADER) {
2929 char buf[SID_LEN + 1];
2930 VLOG_ERR("this server is leader but server %s claims to be",
2931 raft_get_nickname(raft, sid, buf, sizeof buf));
2932 return false;
2933 } else if (!uuid_equals(sid, &raft->leader_sid)) {
2934 if (!uuid_is_zero(&raft->leader_sid)) {
2935 char buf1[SID_LEN + 1];
2936 char buf2[SID_LEN + 1];
2937 VLOG_ERR("leader for term %"PRIu64" changed from %s to %s",
2938 raft->term,
2939 raft_get_nickname(raft, &raft->leader_sid,
2940 buf1, sizeof buf1),
2941 raft_get_nickname(raft, sid, buf2, sizeof buf2));
2942 } else {
2943 char buf[SID_LEN + 1];
2944 VLOG_INFO("server %s is leader for term %"PRIu64,
2945 raft_get_nickname(raft, sid, buf, sizeof buf),
2946 raft->term);
2947 }
2948 raft_set_leader(raft, sid);
2949
2950 /* Record the leader to the log. This is not used by the algorithm
2951 * (although it could be, for quick restart), but it is used for
2952 * offline analysis to check for conformance with the properties
2953 * that Raft guarantees. */
2954 struct raft_record r = {
2955 .type = RAFT_REC_LEADER,
2956 .term = raft->term,
2957 .sid = *sid,
2958 };
2959 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2960 }
2961 return true;
2962 }
2963
2964 static void
2965 raft_handle_append_request(struct raft *raft,
2966 const struct raft_append_request *rq)
2967 {
2968 /* We do not check whether the server that sent the request is part of the
2969 * cluster. As section 4.1 says, "A server accepts AppendEntries requests
2970 * from a leader that is not part of the server’s latest configuration.
2971 * Otherwise, a new server could never be added to the cluster (it would
2972 * never accept any log entries preceding the configuration entry that adds
2973 * the server)." */
2974 if (!raft_update_leader(raft, &rq->common.sid)) {
2975 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
2976 "usurped leadership");
2977 return;
2978 }
2979 raft_reset_election_timer(raft);
2980
2981 /* First check for the common case, where the AppendEntries request is
2982 * entirely for indexes covered by 'log_start' ... 'log_end - 1', something
2983 * like this:
2984 *
2985 * rq->prev_log_index
2986 * | first_entry_index
2987 * | | nth_entry_index
2988 * | | |
2989 * v v v
2990 * +---+---+---+---+
2991 * T | T | T | T | T |
2992 * +---+-------+---+
2993 * +---+---+---+---+
2994 * T | T | T | T | T |
2995 * +---+---+---+---+
2996 * ^ ^
2997 * | |
2998 * log_start log_end
2999 * */
3000 uint64_t first_entry_index = rq->prev_log_index + 1;
3001 uint64_t nth_entry_index = rq->prev_log_index + rq->n_entries;
3002 if (OVS_LIKELY(first_entry_index >= raft->log_start)) {
3003 raft_handle_append_entries(raft, rq,
3004 rq->prev_log_index, rq->prev_log_term,
3005 rq->entries, rq->n_entries);
3006 return;
3007 }
3008
3009 /* Now a series of checks for odd cases, where the AppendEntries request
3010 * extends earlier than the beginning of our log, into the log entries
3011 * discarded by the most recent snapshot. */
3012
3013 /*
3014 * Handle the case where the indexes covered by rq->entries[] are entirely
3015 * disjoint with 'log_start - 1' ... 'log_end - 1', as shown below. So,
3016 * everything in the AppendEntries request must already have been
3017 * committed, and we might as well return true.
3018 *
3019 * rq->prev_log_index
3020 * | first_entry_index
3021 * | | nth_entry_index
3022 * | | |
3023 * v v v
3024 * +---+---+---+---+
3025 * T | T | T | T | T |
3026 * +---+-------+---+
3027 * +---+---+---+---+
3028 * T | T | T | T | T |
3029 * +---+---+---+---+
3030 * ^ ^
3031 * | |
3032 * log_start log_end
3033 */
3034 if (nth_entry_index < raft->log_start - 1) {
3035 raft_send_append_reply(raft, rq, RAFT_APPEND_OK,
3036 "append before log start");
3037 return;
3038 }
3039
3040 /*
3041 * Handle the case where the last entry in rq->entries[] has the same index
3042 * as 'log_start - 1', so we can compare their terms:
3043 *
3044 * rq->prev_log_index
3045 * | first_entry_index
3046 * | | nth_entry_index
3047 * | | |
3048 * v v v
3049 * +---+---+---+---+
3050 * T | T | T | T | T |
3051 * +---+-------+---+
3052 * +---+---+---+---+
3053 * T | T | T | T | T |
3054 * +---+---+---+---+
3055 * ^ ^
3056 * | |
3057 * log_start log_end
3058 *
3059 * There's actually a sub-case where rq->n_entries == 0, in which we
3060 * compare rq->prev_log_term:
3061 *
3062 * rq->prev_log_index
3063 * |
3064 * |
3065 * |
3066 * v
3067 * T
3068 *
3069 * +---+---+---+---+
3070 * T | T | T | T | T |
3071 * +---+---+---+---+
3072 * ^ ^
3073 * | |
3074 * log_start log_end
3075 */
3076 if (nth_entry_index == raft->log_start - 1) {
3077 if (rq->n_entries
3078 ? raft->snap.term == rq->entries[rq->n_entries - 1].term
3079 : raft->snap.term == rq->prev_log_term) {
3080 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
3081 } else {
3082 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
3083 "term mismatch");
3084 }
3085 return;
3086 }
3087
3088 /*
3089 * We now know that the data in rq->entries[] overlaps the data in
3090 * raft->entries[], as shown below, with some positive 'ofs':
3091 *
3092 * rq->prev_log_index
3093 * | first_entry_index
3094 * | | nth_entry_index
3095 * | | |
3096 * v v v
3097 * +---+---+---+---+---+
3098 * T | T | T | T | T | T |
3099 * +---+-------+---+---+
3100 * +---+---+---+---+
3101 * T | T | T | T | T |
3102 * +---+---+---+---+
3103 * ^ ^
3104 * | |
3105 * log_start log_end
3106 *
3107 * |<-- ofs -->|
3108 *
3109 * We transform this into the following by trimming the first 'ofs'
3110 * elements off of rq->entries[], ending up with the following. Notice how
3111 * we retain the term but not the data for rq->entries[ofs - 1]:
3112 *
3113 * first_entry_index + ofs - 1
3114 * | first_entry_index + ofs
3115 * | | nth_entry_index + ofs
3116 * | | |
3117 * v v v
3118 * +---+---+
3119 * T | T | T |
3120 * +---+---+
3121 * +---+---+---+---+
3122 * T | T | T | T | T |
3123 * +---+---+---+---+
3124 * ^ ^
3125 * | |
3126 * log_start log_end
3127 */
3128 uint64_t ofs = raft->log_start - first_entry_index;
3129 raft_handle_append_entries(raft, rq,
3130 raft->log_start - 1, rq->entries[ofs - 1].term,
3131 &rq->entries[ofs], rq->n_entries - ofs);
3132 }
3133
3134 /* Returns true if 'raft' has another log entry or snapshot to read. */
3135 bool
3136 raft_has_next_entry(const struct raft *raft_)
3137 {
3138 struct raft *raft = CONST_CAST(struct raft *, raft_);
3139 struct uuid eid;
3140 return raft_peek_next_entry(raft, &eid) != NULL;
3141 }
3142
3143 /* Returns the next log entry or snapshot from 'raft', or NULL if there are
3144 * none left to read. Stores the entry ID of the log entry in '*eid'. Stores
3145 * true in '*is_snapshot' if the returned data is a snapshot, false if it is a
3146 * log entry. */
3147 const struct json *
3148 raft_next_entry(struct raft *raft, struct uuid *eid, bool *is_snapshot)
3149 {
3150 const struct json *data = raft_get_next_entry(raft, eid);
3151 *is_snapshot = data == raft->snap.data;
3152 return data;
3153 }
3154
3155 /* Returns the log index of the last-read snapshot or log entry. */
3156 uint64_t
3157 raft_get_applied_index(const struct raft *raft)
3158 {
3159 return raft->last_applied;
3160 }
3161
3162 /* Returns the log index of the last snapshot or log entry that is available to
3163 * be read. */
3164 uint64_t
3165 raft_get_commit_index(const struct raft *raft)
3166 {
3167 return raft->commit_index;
3168 }
3169
3170 static struct raft_server *
3171 raft_find_peer(struct raft *raft, const struct uuid *uuid)
3172 {
3173 struct raft_server *s = raft_find_server(raft, uuid);
3174 return s && !uuid_equals(&raft->sid, &s->sid) ? s : NULL;
3175 }
3176
3177 static struct raft_server *
3178 raft_find_new_server(struct raft *raft, const struct uuid *uuid)
3179 {
3180 return raft_server_find(&raft->add_servers, uuid);
3181 }
3182
3183 /* Figure 3.1: "If there exists an N such that N > commitIndex, a
3184 * majority of matchIndex[i] >= N, and log[N].term == currentTerm, set
3185 * commitIndex = N (sections 3.5 and 3.6)." */
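/* (Editor's note, illustrative example.) In a 5-server cluster, an entry at
 * index 'idx' becomes committed once match_index >= idx on more than
 * 5 / 2 == 2 servers, i.e. on at least 3 of them, counting the leader itself
 * (see raft_update_our_match_index()). */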
3186 static void
3187 raft_consider_updating_commit_index(struct raft *raft)
3188 {
3189 /* This loop cannot just bail out when it comes across a log entry that
3190 * does not match the criteria. For example, Figure 3.7(d2) shows a
3191 * case where the log entry for term 2 cannot be committed directly
3192 * (because it is not for the current term) but it can be committed as
3193 * a side effect of committing the entry for term 4 (the current term).
3194 * XXX Is there a more efficient way to do this? */
3195 ovs_assert(raft->role == RAFT_LEADER);
3196
3197 uint64_t new_commit_index = raft->commit_index;
3198 for (uint64_t idx = MAX(raft->commit_index + 1, raft->log_start);
3199 idx < raft->log_end; idx++) {
3200 if (raft->entries[idx - raft->log_start].term == raft->term) {
3201 size_t count = 0;
3202 struct raft_server *s2;
3203 HMAP_FOR_EACH (s2, hmap_node, &raft->servers) {
3204 if (s2->match_index >= idx) {
3205 count++;
3206 }
3207 }
3208 if (count > hmap_count(&raft->servers) / 2) {
3209 VLOG_DBG("index %"PRIu64" committed to %"PRIuSIZE" servers, "
3210 "applying", idx, count);
3211 new_commit_index = idx;
3212 }
3213 }
3214 }
3215 if (raft_update_commit_index(raft, new_commit_index)) {
3216 raft_send_heartbeats(raft);
3217 }
3218 }
3219
3220 static void
3221 raft_update_match_index(struct raft *raft, struct raft_server *s,
3222 uint64_t min_index)
3223 {
3224 ovs_assert(raft->role == RAFT_LEADER);
3225 if (min_index > s->match_index) {
3226 s->match_index = min_index;
3227 raft_consider_updating_commit_index(raft);
3228 }
3229 }
3230
3231 static void
3232 raft_update_our_match_index(struct raft *raft, uint64_t min_index)
3233 {
3234 struct raft_server *server = raft_find_server(raft, &raft->sid);
3235 if (server) {
3236 raft_update_match_index(raft, server, min_index);
3237 }
3238 }
3239
3240 static void
3241 raft_send_install_snapshot_request(struct raft *raft,
3242 const struct raft_server *s,
3243 const char *comment)
3244 {
3245 union raft_rpc rpc = {
3246 .install_snapshot_request = {
3247 .common = {
3248 .type = RAFT_RPC_INSTALL_SNAPSHOT_REQUEST,
3249 .sid = s->sid,
3250 .comment = CONST_CAST(char *, comment),
3251 },
3252 .term = raft->term,
3253 .last_index = raft->log_start - 1,
3254 .last_term = raft->snap.term,
3255 .last_servers = raft->snap.servers,
3256 .last_eid = raft->snap.eid,
3257 .data = raft->snap.data,
3258 .election_timer = raft->election_timer,
3259 }
3260 };
3261 raft_send(raft, &rpc);
3262 }
3263
3264 static void
3265 raft_handle_append_reply(struct raft *raft,
3266 const struct raft_append_reply *rpy)
3267 {
3268 if (raft->role != RAFT_LEADER) {
3269 VLOG_INFO("rejected append_reply (not leader)");
3270 return;
3271 }
3272
3273 /* Most commonly we'd be getting an AppendEntries reply from a configured
3274 * server (e.g. a peer), but we can also get them from servers in the
3275 * process of being added. */
3276 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3277 if (!s) {
3278 s = raft_find_new_server(raft, &rpy->common.sid);
3279 if (!s) {
3280 VLOG_INFO("rejected append_reply from unknown server "SID_FMT,
3281 SID_ARGS(&rpy->common.sid));
3282 return;
3283 }
3284 }
3285
3286 s->replied = true;
3287 if (rpy->result == RAFT_APPEND_OK) {
3288 /* Figure 3.1: "If successful, update nextIndex and matchIndex for
3289 * follower (section 3.5)." */
3290 uint64_t min_index = rpy->prev_log_index + rpy->n_entries + 1;
3291 if (s->next_index < min_index) {
3292 s->next_index = min_index;
3293 }
3294 raft_update_match_index(raft, s, min_index - 1);
3295 } else {
3296 /* Figure 3.1: "If AppendEntries fails because of log inconsistency,
3297 * decrement nextIndex and retry (section 3.5)."
3298 *
3299 * We also implement the optimization suggested in section 4.2.1:
3300 * "Various approaches can make nextIndex converge to its correct value
3301 * more quickly, including those described in Chapter 3. The simplest
3302 * approach to solving this particular problem of adding a new server,
3303 * however, is to have followers return the length of their logs in the
3304 * AppendEntries response; this allows the leader to cap the follower’s
3305 * nextIndex accordingly." */
3306 s->next_index = (s->next_index > 0
3307 ? MIN(s->next_index - 1, rpy->log_end)
3308 : 0);
3309
3310 if (rpy->result == RAFT_APPEND_IO_ERROR) {
3311 /* Append failed but not because of a log inconsistency. Because
3312 * of the I/O error, there's no point in re-sending the append
3313 * immediately. */
3314 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3315 VLOG_INFO_RL(&rl, "%s reported I/O error", s->nickname);
3316 return;
3317 }
3318 }
3319
3320 /*
3321 * Our behavior here must depend on the value of next_index relative to
3322 * log_start and log_end. There are three cases:
3323 *
3324 * Case 1 | Case 2 | Case 3
3325 * <---------------->|<------------->|<------------------>
3326 * | |
3327 *
3328 * +---+---+---+---+
3329 * T | T | T | T | T |
3330 * +---+---+---+---+
3331 * ^ ^
3332 * | |
3333 * log_start log_end
3334 */
3335 if (s->next_index < raft->log_start) {
3336 /* Case 1. */
3337 raft_send_install_snapshot_request(raft, s, NULL);
3338 } else if (s->next_index < raft->log_end) {
3339 /* Case 2. */
3340 raft_send_append_request(raft, s, 1, NULL);
3341 } else {
3342 /* Case 3. */
3343 if (s->phase == RAFT_PHASE_CATCHUP) {
3344 s->phase = RAFT_PHASE_CAUGHT_UP;
3345 raft_run_reconfigure(raft);
3346 }
3347 }
3348 }
3349
3350 static bool
3351 raft_should_suppress_disruptive_server(struct raft *raft,
3352 const union raft_rpc *rpc)
3353 {
3354 if (rpc->type != RAFT_RPC_VOTE_REQUEST) {
3355 return false;
3356 }
3357
3358 /* Section 4.2.3 "Disruptive Servers" says:
3359 *
3360 * ...if a server receives a RequestVote request within the minimum
3361 * election timeout of hearing from a current leader, it does not update
3362 * its term or grant its vote...
3363 *
3364 * ...This change conflicts with the leadership transfer mechanism as
3365 * described in Chapter 3, in which a server legitimately starts an
3366 * election without waiting an election timeout. In that case,
3367 * RequestVote messages should be processed by other servers even when
3368 * they believe a current cluster leader exists. Those RequestVote
3369 * requests can include a special flag to indicate this behavior (“I
3370 * have permission to disrupt the leader--it told me to!”).
3371 *
3372 * This clearly describes how the followers should act, but not the leader.
3373 * We just ignore vote requests that arrive at a current leader. This
3374 * seems to be fairly safe, since a majority other than the current leader
3375 * can still elect a new leader and the first AppendEntries from that new
3376 * leader will depose the current leader. */
3377 const struct raft_vote_request *rq = raft_vote_request_cast(rpc);
3378 if (rq->leadership_transfer) {
3379 return false;
3380 }
3381
3382 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3383 long long int now = time_msec();
3384 switch (raft->role) {
3385 case RAFT_LEADER:
3386 VLOG_WARN_RL(&rl, "ignoring vote request received as leader");
3387 return true;
3388
3389 case RAFT_FOLLOWER:
3390 if (now < raft->election_base + raft->election_timer) {
3391 VLOG_WARN_RL(&rl, "ignoring vote request received after only "
3392 "%lld ms (minimum election time is %"PRIu64" ms)",
3393 now - raft->election_base, raft->election_timer);
3394 return true;
3395 }
3396 return false;
3397
3398 case RAFT_CANDIDATE:
3399 return false;
3400
3401 default:
3402 OVS_NOT_REACHED();
3403 }
3404 }
3405
3406 /* Returns true if a reply should be sent. */
3407 static bool
3408 raft_handle_vote_request__(struct raft *raft,
3409 const struct raft_vote_request *rq)
3410 {
3411 /* Figure 3.1: "If votedFor is null or candidateId, and candidate's log is
3412 * at least as up-to-date as receiver's log, grant vote (sections 3.4,
3413 * 3.6)." */
3414 if (uuid_equals(&raft->vote, &rq->common.sid)) {
3415 /* Already voted for this candidate in this term. Resend vote. */
3416 return true;
3417 } else if (!uuid_is_zero(&raft->vote)) {
3418 /* Already voted for different candidate in this term. Send a reply
3419 * saying what candidate we did vote for. This isn't a necessary part
3420 * of the Raft protocol but it can make debugging easier. */
3421 return true;
3422 }
3423
3424 /* Section 3.6.1: "The RequestVote RPC implements this restriction: the RPC
3425 * includes information about the candidate’s log, and the voter denies its
3426 * vote if its own log is more up-to-date than that of the candidate. Raft
3427 * determines which of two logs is more up-to-date by comparing the index
3428 * and term of the last entries in the logs. If the logs have last entries
3429 * with different terms, then the log with the later term is more
3430 * up-to-date. If the logs end with the same term, then whichever log is
3431 * longer is more up-to-date." */
3432 uint64_t last_term = (raft->log_end > raft->log_start
3433 ? raft->entries[raft->log_end - 1
3434 - raft->log_start].term
3435 : raft->snap.term);
3436 if (last_term > rq->last_log_term
3437 || (last_term == rq->last_log_term
3438 && raft->log_end - 1 > rq->last_log_index)) {
3439 /* Our log is more up-to-date than the peer's. Withhold vote. */
3440 return false;
3441 }
3442
3443 /* Record a vote for the peer. */
3444 if (!raft_set_term(raft, raft->term, &rq->common.sid)) {
3445 return false;
3446 }
3447
3448 raft_reset_election_timer(raft);
3449
3450 return true;
3451 }
3452
3453 static void
3454 raft_send_vote_reply(struct raft *raft, const struct uuid *dst,
3455 const struct uuid *vote)
3456 {
3457 union raft_rpc rpy = {
3458 .vote_reply = {
3459 .common = {
3460 .type = RAFT_RPC_VOTE_REPLY,
3461 .sid = *dst,
3462 },
3463 .term = raft->term,
3464 .vote = *vote,
3465 },
3466 };
3467 raft_send(raft, &rpy);
3468 }
3469
3470 static void
3471 raft_handle_vote_request(struct raft *raft,
3472 const struct raft_vote_request *rq)
3473 {
3474 if (raft_handle_vote_request__(raft, rq)) {
3475 raft_send_vote_reply(raft, &rq->common.sid, &raft->vote);
3476 }
3477 }
3478
3479 static void
3480 raft_handle_vote_reply(struct raft *raft,
3481 const struct raft_vote_reply *rpy)
3482 {
3483 if (!raft_receive_term__(raft, &rpy->common, rpy->term)) {
3484 return;
3485 }
3486
3487 if (raft->role != RAFT_CANDIDATE) {
3488 return;
3489 }
3490
3491 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3492 if (s) {
3493 raft_accept_vote(raft, s, &rpy->vote);
3494 }
3495 }
3496
3497 /* Returns true if 'raft''s log contains reconfiguration entries that have not
3498 * yet been committed. */
3499 static bool
3500 raft_has_uncommitted_configuration(const struct raft *raft)
3501 {
3502 for (uint64_t i = raft->commit_index + 1; i < raft->log_end; i++) {
3503 ovs_assert(i >= raft->log_start);
3504 const struct raft_entry *e = &raft->entries[i - raft->log_start];
3505 if (e->servers) {
3506 return true;
3507 }
3508 }
3509 return false;
3510 }
3511
3512 static void
3513 raft_log_reconfiguration(struct raft *raft)
3514 {
3515 struct json *servers_json = raft_servers_to_json(&raft->servers);
3516 raft_command_unref(raft_command_execute__(
3517 raft, NULL, servers_json, 0, NULL, NULL));
3518 json_destroy(servers_json);
3519 }
3520
3521 static void
3522 raft_run_reconfigure(struct raft *raft)
3523 {
3524 ovs_assert(raft->role == RAFT_LEADER);
3525
3526 /* Reconfiguration only progresses when configuration changes commit. */
3527 if (raft_has_uncommitted_configuration(raft)) {
3528 return;
3529 }
3530
3531 /* If we were waiting for a configuration change to commit, it's done. */
3532 struct raft_server *s;
3533 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3534 if (s->phase == RAFT_PHASE_COMMITTING) {
3535 raft_send_add_server_reply__(raft, &s->sid, s->address,
3536 true, RAFT_SERVER_COMPLETED);
3537 s->phase = RAFT_PHASE_STABLE;
3538 }
3539 }
3540 if (raft->remove_server) {
3541 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
3542 &raft->remove_server->requester_sid,
3543 raft->remove_server->requester_conn,
3544 true, RAFT_SERVER_COMPLETED);
3545 raft_server_destroy(raft->remove_server);
3546 raft->remove_server = NULL;
3547 }
3548
3549 /* If a new server is caught up, add it to the configuration. */
3550 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
3551 if (s->phase == RAFT_PHASE_CAUGHT_UP) {
3552 /* Move 's' from 'raft->add_servers' to 'raft->servers'. */
3553 hmap_remove(&raft->add_servers, &s->hmap_node);
3554 hmap_insert(&raft->servers, &s->hmap_node, uuid_hash(&s->sid));
3555
3556 /* Mark 's' as waiting for commit. */
3557 s->phase = RAFT_PHASE_COMMITTING;
3558
3559 raft_log_reconfiguration(raft);
3560
3561 /* When commit completes we'll transition to RAFT_PHASE_STABLE and
3562 * send a RAFT_SERVER_COMPLETED reply. */
3563
3564 return;
3565 }
3566 }
3567
3568 /* Remove a server, if one is scheduled for removal. */
3569 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3570 if (s->phase == RAFT_PHASE_REMOVE) {
3571 hmap_remove(&raft->servers, &s->hmap_node);
3572 raft->remove_server = s;
3573
3574 raft_log_reconfiguration(raft);
3575
3576 return;
3577 }
3578 }
3579 }
3580
3581 static void
3582 raft_handle_add_server_request(struct raft *raft,
3583 const struct raft_add_server_request *rq)
3584 {
3585 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3586 if (raft->role != RAFT_LEADER) {
3587 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3588 return;
3589 }
3590
3591 /* Check for an existing server. */
3592 struct raft_server *s = raft_find_server(raft, &rq->common.sid);
3593 if (s) {
3594 /* If the server is scheduled to be removed, cancel it. */
3595 if (s->phase == RAFT_PHASE_REMOVE) {
3596 s->phase = RAFT_PHASE_STABLE;
3597 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_CANCELED);
3598 return;
3599 }
3600
3601 /* If the server is being added, then it's in progress. */
3602 if (s->phase != RAFT_PHASE_STABLE) {
3603 raft_send_add_server_reply(raft, rq,
3604 false, RAFT_SERVER_IN_PROGRESS);
return;
3605 }
3606
3607 /* Nothing to do--server is already part of the configuration. */
3608 raft_send_add_server_reply(raft, rq,
3609 true, RAFT_SERVER_ALREADY_PRESENT);
3610 return;
3611 }
3612
3613 /* Check for a server being removed. */
3614 if (raft->remove_server
3615 && uuid_equals(&rq->common.sid, &raft->remove_server->sid)) {
3616 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3617 return;
3618 }
3619
3620 /* Check for a server already being added. */
3621 if (raft_find_new_server(raft, &rq->common.sid)) {
3622 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_IN_PROGRESS);
3623 return;
3624 }
3625
3626 /* Add server to 'add_servers'. */
3627 s = raft_server_add(&raft->add_servers, &rq->common.sid, rq->address);
3628 raft_server_init_leader(raft, s);
3629 s->requester_sid = rq->common.sid;
3630 s->requester_conn = NULL;
3631 s->phase = RAFT_PHASE_CATCHUP;
3632
3633 /* Start sending the log. If this is the first time we've tried to add
3634 * this server, then this will quickly degenerate into an InstallSnapshot
3635 * followed by a series of AppendEntries, but if it's a retry of an earlier
3636 * AddServer request that was interrupted (e.g. by a timeout or a loss of
3637 * leadership) then it will gracefully resume populating the log.
3638 *
3639 * See the last few paragraphs of section 4.2.1 for further insight. */
3640 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
3641 VLOG_INFO_RL(&rl,
3642 "starting to add server %s ("SID_FMT" at %s) "
3643 "to cluster "CID_FMT, s->nickname, SID_ARGS(&s->sid),
3644 rq->address, CID_ARGS(&raft->cid));
3645 raft_send_append_request(raft, s, 0, "initialize new server");
3646 }
3647
3648 static void
3649 raft_handle_add_server_reply(struct raft *raft,
3650 const struct raft_add_server_reply *rpy)
3651 {
3652 if (!raft->joining) {
3653 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3654 VLOG_WARN_RL(&rl, "received add_server_reply even though we're "
3655 "already part of the cluster");
3656 return;
3657 }
3658
3659 if (rpy->success) {
3660 raft->joining = false;
3661
3662 /* It is tempting, at this point, to check that this server is part of
3663 * the current configuration. However, this is not necessarily the
3664 * case, because the log entry that added this server to the cluster
3665 * might have been committed by a majority of the cluster that does not
3666 * include this one. This actually happens in testing. */
3667 } else {
3668 const char *address;
3669 SSET_FOR_EACH (address, &rpy->remote_addresses) {
3670 if (sset_add(&raft->remote_addresses, address)) {
3671 VLOG_INFO("%s: learned new server address for joining cluster",
3672 address);
3673 }
3674 }
3675 }
3676 }
3677
3678 /* This is called by raft_unixctl_kick() as well as via RPC. */
3679 static void
3680 raft_handle_remove_server_request(struct raft *raft,
3681 const struct raft_remove_server_request *rq)
3682 {
3683 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3684 if (raft->role != RAFT_LEADER) {
3685 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3686 return;
3687 }
3688
3689 /* If the server to remove is currently waiting to be added, cancel it. */
3690 struct raft_server *target = raft_find_new_server(raft, &rq->sid);
3691 if (target) {
3692 raft_send_add_server_reply__(raft, &target->sid, target->address,
3693 false, RAFT_SERVER_CANCELED);
3694 hmap_remove(&raft->add_servers, &target->hmap_node);
3695 raft_server_destroy(target);
3696 return;
3697 }
3698
3699 /* If the server isn't configured, report that. */
3700 target = raft_find_server(raft, &rq->sid);
3701 if (!target) {
3702 raft_send_remove_server_reply(raft, rq,
3703 true, RAFT_SERVER_ALREADY_GONE);
3704 return;
3705 }
3706
3707 /* Check whether we're waiting for the addition of the server to commit. */
3708 if (target->phase == RAFT_PHASE_COMMITTING) {
3709 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3710 return;
3711 }
3712
3713 /* Check whether the server is already scheduled for removal. */
3714 if (target->phase == RAFT_PHASE_REMOVE) {
3715 raft_send_remove_server_reply(raft, rq,
3716 false, RAFT_SERVER_IN_PROGRESS);
3717 return;
3718 }
3719
3720 /* Make sure that if we remove this server, at least one other
3721 * server will be left. We don't count servers currently being added (in
3722 * 'add_servers') since those could fail. */
3723 struct raft_server *s;
3724 int n = 0;
3725 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3726 if (s != target && s->phase != RAFT_PHASE_REMOVE) {
3727 n++;
3728 }
3729 }
3730 if (!n) {
3731 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_EMPTY);
3732 return;
3733 }
3734
3735 /* Mark the server for removal. */
3736 target->phase = RAFT_PHASE_REMOVE;
3737 if (rq->requester_conn) {
3738 target->requester_sid = UUID_ZERO;
3739 unixctl_command_reply(rq->requester_conn, "started removal");
3740 } else {
3741 target->requester_sid = rq->common.sid;
3742 target->requester_conn = NULL;
3743 }
3744
3745 raft_run_reconfigure(raft);
3746 /* Operation in progress, reply will be sent later. */
3747 }
3748
3749 static void
3750 raft_finished_leaving_cluster(struct raft *raft)
3751 {
3752 VLOG_INFO(SID_FMT": finished leaving cluster "CID_FMT,
3753 SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
3754
3755 raft_record_note(raft, "left", "this server left the cluster");
3756
3757 raft->leaving = false;
3758 raft->left = true;
3759 }
3760
3761 static void
3762 raft_handle_remove_server_reply(struct raft *raft,
3763 const struct raft_remove_server_reply *rpc)
3764 {
3765 if (rpc->success
3766 && (uuid_is_zero(&rpc->target_sid)
3767 || uuid_equals(&rpc->target_sid, &raft->sid))) {
3768 raft_finished_leaving_cluster(raft);
3769 }
3770 }
3771
3772 static bool
3773 raft_handle_write_error(struct raft *raft, struct ovsdb_error *error)
3774 {
3775 if (error && !raft->failed) {
3776 raft->failed = true;
3777
3778 char *s = ovsdb_error_to_string_free(error);
3779 VLOG_WARN("%s: entering failure mode due to I/O error (%s)",
3780 raft->name, s);
3781 free(s);
3782 }
3783 return !raft->failed;
3784 }
3785
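/* Writes a complete replacement log to 'log': a header that embeds
 * 'new_snapshot' (covering everything before 'new_log_start'), the retained
 * entries from 'new_log_start' onward, the current term and vote, and, if it
 * falls within the retained entries, the commit index. */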
3786 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3787 raft_write_snapshot(struct raft *raft, struct ovsdb_log *log,
3788 uint64_t new_log_start,
3789 const struct raft_entry *new_snapshot)
3790 {
3791 struct raft_header h = {
3792 .sid = raft->sid,
3793 .cid = raft->cid,
3794 .name = raft->name,
3795 .local_address = raft->local_address,
3796 .snap_index = new_log_start - 1,
3797 .snap = *new_snapshot,
3798 };
3799 struct ovsdb_error *error = ovsdb_log_write_and_free(
3800 log, raft_header_to_json(&h));
3801 if (error) {
3802 return error;
3803 }
3804 ovsdb_log_mark_base(raft->log);
3805
3806 /* Write log records. */
3807 for (uint64_t index = new_log_start; index < raft->log_end; index++) {
3808 const struct raft_entry *e = &raft->entries[index - raft->log_start];
3809 struct raft_record r = {
3810 .type = RAFT_REC_ENTRY,
3811 .term = e->term,
3812 .entry = {
3813 .index = index,
3814 .data = e->data,
3815 .servers = e->servers,
3816 .election_timer = e->election_timer,
3817 .eid = e->eid,
3818 },
3819 };
3820 error = ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3821 if (error) {
3822 return error;
3823 }
3824 }
3825
3826 /* Write term and vote (if any).
3827 *
3828 * The term is redundant if we wrote a log record for that term above. The
3829 * vote, if any, is never redundant.
3830 */
3831 error = raft_write_state(log, raft->term, &raft->vote);
3832 if (error) {
3833 return error;
3834 }
3835
3836 /* Write commit_index if it's beyond the new start of the log. */
3837 if (raft->commit_index >= new_log_start) {
3838 struct raft_record r = {
3839 .type = RAFT_REC_COMMIT_INDEX,
3840 .commit_index = raft->commit_index,
3841 };
3842 return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3843 }
3844 return NULL;
3845 }
3846
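/* Atomically replaces the on-disk log for 'raft' by one containing only
 * 'new_snapshot' plus the entries from 'new_start' onward.  If writing the
 * replacement fails, it is abandoned and the existing log is kept intact. */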
3847 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3848 raft_save_snapshot(struct raft *raft,
3849 uint64_t new_start, const struct raft_entry *new_snapshot)
3850
3851 {
3852 struct ovsdb_log *new_log;
3853 struct ovsdb_error *error;
3854 error = ovsdb_log_replace_start(raft->log, &new_log);
3855 if (error) {
3856 return error;
3857 }
3858
3859 error = raft_write_snapshot(raft, new_log, new_start, new_snapshot);
3860 if (error) {
3861 ovsdb_log_replace_abort(new_log);
3862 return error;
3863 }
3864
3865 return ovsdb_log_replace_commit(raft->log, new_log);
3866 }
3867
3868 static bool
3869 raft_handle_install_snapshot_request__(
3870 struct raft *raft, const struct raft_install_snapshot_request *rq)
3871 {
3872 raft_reset_election_timer(raft);
3873
3874 /*
3875 * Our behavior here depends on new_log_start in the snapshot compared to
3876 * log_start and log_end. There are three cases:
3877 *
3878 *         Case 1      |    Case 2     |       Case 3
3879 *    <---------------->|<------------->|<------------------>
3880 *                      |               |
3881 *
3882 *                      +---+---+---+---+
3883 *                    T | T | T | T | T |
3884 *                      +---+---+---+---+
3885 *                        ^             ^
3886 *                        |             |
3887 *                    log_start      log_end
3888 */
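/* For example (hypothetical numbers): with log_start 10 and log_end 15, a
 * snapshot whose last_index is 20 yields new_log_start 21, which is Case 3:
 * every local entry is already covered by the snapshot, so the whole log is
 * replaced below. */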
3889 uint64_t new_log_start = rq->last_index + 1;
3890 if (new_log_start < raft->log_start) {
3891 /* Case 1: The new snapshot covers less than our current one. Nothing
3892 * to do. */
3893 return true;
3894 } else if (new_log_start < raft->log_end) {
3895 /* Case 2: The new snapshot starts in the middle of our log. We could
3896 * discard the first 'new_log_start - raft->log_start' entries in the
3897 * log. But there's not much value in that, since snapshotting is
3898 * supposed to be a local decision. Just skip it. */
3899 return true;
3900 }
3901
3902 /* Case 3: The new snapshot starts at or past the end of our current log, so
3903 * discard all of our current log. */
3904 const struct raft_entry new_snapshot = {
3905 .term = rq->last_term,
3906 .data = rq->data,
3907 .eid = rq->last_eid,
3908 .servers = rq->last_servers,
3909 .election_timer = rq->election_timer,
3910 };
3911 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
3912 &new_snapshot);
3913 if (error) {
3914 char *error_s = ovsdb_error_to_string(error);
3915 VLOG_WARN("could not save snapshot: %s", error_s);
3916 free(error_s);
3917 return false;
3918 }
3919
3920 for (size_t i = 0; i < raft->log_end - raft->log_start; i++) {
3921 raft_entry_uninit(&raft->entries[i]);
3922 }
3923 raft->log_start = raft->log_end = new_log_start;
3924 raft->log_synced = raft->log_end - 1;
3925 raft->commit_index = raft->log_start - 1;
3926 if (raft->last_applied < raft->commit_index) {
3927 raft->last_applied = raft->log_start - 2;
3928 }
3929
3930 raft_entry_uninit(&raft->snap);
3931 raft_entry_clone(&raft->snap, &new_snapshot);
3932
3933 raft_get_servers_from_log(raft, VLL_INFO);
3934 raft_get_election_timer_from_log(raft);
3935
3936 return true;
3937 }
3938
3939 static void
3940 raft_handle_install_snapshot_request(
3941 struct raft *raft, const struct raft_install_snapshot_request *rq)
3942 {
3943 if (raft_handle_install_snapshot_request__(raft, rq)) {
3944 union raft_rpc rpy = {
3945 .install_snapshot_reply = {
3946 .common = {
3947 .type = RAFT_RPC_INSTALL_SNAPSHOT_REPLY,
3948 .sid = rq->common.sid,
3949 },
3950 .term = raft->term,
3951 .last_index = rq->last_index,
3952 .last_term = rq->last_term,
3953 },
3954 };
3955 raft_send(raft, &rpy);
3956 }
3957 }
3958
3959 static void
3960 raft_handle_install_snapshot_reply(
3961 struct raft *raft, const struct raft_install_snapshot_reply *rpy)
3962 {
3963 /* We might get an InstallSnapshot reply from a configured server (e.g. a
3964 * peer) or a server in the process of being added. */
3965 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3966 if (!s) {
3967 s = raft_find_new_server(raft, &rpy->common.sid);
3968 if (!s) {
3969 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3970 VLOG_INFO_RL(&rl, "cluster "CID_FMT": received %s from "
3971 "unknown server "SID_FMT, CID_ARGS(&raft->cid),
3972 raft_rpc_type_to_string(rpy->common.type),
3973 SID_ARGS(&rpy->common.sid));
3974 return;
3975 }
3976 }
3977
3978 if (rpy->last_index != raft->log_start - 1 ||
3979 rpy->last_term != raft->snap.term) {
3980 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3981 VLOG_INFO_RL(&rl, "cluster "CID_FMT": server %s installed "
3982 "out-of-date snapshot, starting over",
3983 CID_ARGS(&raft->cid), s->nickname);
3984 raft_send_install_snapshot_request(raft, s,
3985 "installed obsolete snapshot");
3986 return;
3987 }
3988
3989 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
3990 VLOG_INFO_RL(&rl, "cluster "CID_FMT": installed snapshot on server %s "
3991 " up to %"PRIu64":%"PRIu64, CID_ARGS(&raft->cid),
3992 s->nickname, rpy->last_term, rpy->last_index);
3993 s->next_index = raft->log_end;
3994 raft_send_append_request(raft, s, 0, "snapshot installed");
3995 }
3996
3997 /* Returns true if 'raft' has grown enough since the last snapshot that
3998 * reducing the log to a snapshot would be valuable, false otherwise. */
3999 bool
4000 raft_grew_lots(const struct raft *raft)
4001 {
4002 return ovsdb_log_grew_lots(raft->log);
4003 }
4004
4005 /* Returns the number of log entries that could be trimmed off the on-disk log
4006 * by snapshotting. */
4007 uint64_t
4008 raft_get_log_length(const struct raft *raft)
4009 {
4010 return (raft->last_applied < raft->log_start
4011 ? 0
4012 : raft->last_applied - raft->log_start + 1);
4013 }
4014
4015 /* Returns true if taking a snapshot of 'raft', with raft_store_snapshot(), is
4016 * possible. */
4017 bool
4018 raft_may_snapshot(const struct raft *raft)
4019 {
4020 return (!raft->joining
4021 && !raft->leaving
4022 && !raft->left
4023 && !raft->failed
4024 && raft->last_applied >= raft->log_start);
4025 }
4026
4027 /* Replaces the log for 'raft', up to the last log entry read, by
4028 * 'new_snapshot_data'. Returns NULL if successful, otherwise an error that
4029 * the caller must eventually free.
4030 *
4031 * This function can only succeed if raft_may_snapshot() returns true. It is
4032 * only valuable to call it if raft_get_log_length() is significant and
4033 * especially if raft_grew_lots() returns true. */
4034 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
4035 raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
4036 {
4037 if (raft->joining) {
4038 return ovsdb_error(NULL,
4039 "cannot store a snapshot while joining cluster");
4040 } else if (raft->leaving) {
4041 return ovsdb_error(NULL,
4042 "cannot store a snapshot while leaving cluster");
4043 } else if (raft->left) {
4044 return ovsdb_error(NULL,
4045 "cannot store a snapshot after leaving cluster");
4046 } else if (raft->failed) {
4047 return ovsdb_error(NULL,
4048 "cannot store a snapshot following failure");
4049 }
4050
4051 if (raft->last_applied < raft->log_start) {
4052 return ovsdb_error(NULL, "not storing a duplicate snapshot");
4053 }
4054
4055 uint64_t new_log_start = raft->last_applied + 1;
4056 struct raft_entry new_snapshot = {
4057 .term = raft_get_term(raft, new_log_start - 1),
4058 .data = json_clone(new_snapshot_data),
4059 .eid = *raft_get_eid(raft, new_log_start - 1),
4060 .servers = json_clone(raft_servers_for_index(raft, new_log_start - 1)),
4061 .election_timer = raft->election_timer,
4062 };
4063 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
4064 &new_snapshot);
4065 if (error) {
4066 raft_entry_uninit(&new_snapshot);
4067 return error;
4068 }
4069
4070 raft->log_synced = raft->log_end - 1;
4071 raft_entry_uninit(&raft->snap);
4072 raft->snap = new_snapshot;
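/* Compact the in-memory log: free the entries now covered by the snapshot
 * and slide the remaining ones down to the front of the array. */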
4073 for (size_t i = 0; i < new_log_start - raft->log_start; i++) {
4074 raft_entry_uninit(&raft->entries[i]);
4075 }
4076 memmove(&raft->entries[0], &raft->entries[new_log_start - raft->log_start],
4077 (raft->log_end - new_log_start) * sizeof *raft->entries);
4078 raft->log_start = new_log_start;
4079 return NULL;
4080 }
4081
4082 static void
4083 raft_handle_become_leader(struct raft *raft,
4084 const struct raft_become_leader *rq)
4085 {
4086 if (raft->role == RAFT_FOLLOWER) {
4087 char buf[SID_LEN + 1];
4088 VLOG_INFO("received leadership transfer from %s in term %"PRIu64,
4089 raft_get_nickname(raft, &rq->common.sid, buf, sizeof buf),
4090 rq->term);
4091 raft_start_election(raft, true);
4092 }
4093 }
4094
4095 static void
4096 raft_send_execute_command_reply(struct raft *raft,
4097 const struct uuid *sid,
4098 const struct uuid *eid,
4099 enum raft_command_status status,
4100 uint64_t commit_index)
4101 {
4102 if (failure_test == FT_CRASH_BEFORE_SEND_EXEC_REP) {
4103 ovs_fatal(0, "Raft test: crash before sending execute_command_reply");
4104 }
4105 union raft_rpc rpc = {
4106 .execute_command_reply = {
4107 .common = {
4108 .type = RAFT_RPC_EXECUTE_COMMAND_REPLY,
4109 .sid = *sid,
4110 },
4111 .result = *eid,
4112 .status = status,
4113 .commit_index = commit_index,
4114 },
4115 };
4116 raft_send(raft, &rpc);
4117 if (failure_test == FT_CRASH_AFTER_SEND_EXEC_REP) {
4118 ovs_fatal(0, "Raft test: crash after sending execute_command_reply.");
4119 }
4120 }
4121
4122 static enum raft_command_status
4123 raft_handle_execute_command_request__(
4124 struct raft *raft, const struct raft_execute_command_request *rq)
4125 {
4126 if (raft->role != RAFT_LEADER) {
4127 return RAFT_CMD_NOT_LEADER;
4128 }
4129
4130 const struct uuid *current_eid = raft_current_eid(raft);
4131 if (!uuid_equals(&rq->prereq, current_eid)) {
4132 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
4133 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
4134 "prerequisite "UUID_FMT" in execute_command_request",
4135 UUID_ARGS(current_eid), UUID_ARGS(&rq->prereq));
4136 return RAFT_CMD_BAD_PREREQ;
4137 }
4138
4139 struct raft_command *cmd = raft_command_initiate(raft, rq->data,
4140 NULL, 0, &rq->result);
4141 cmd->sid = rq->common.sid;
4142
4143 enum raft_command_status status = cmd->status;
4144 if (status != RAFT_CMD_INCOMPLETE) {
4145 raft_command_unref(cmd);
4146 }
4147 return status;
4148 }
4149
4150 static void
4151 raft_handle_execute_command_request(
4152 struct raft *raft, const struct raft_execute_command_request *rq)
4153 {
4154 enum raft_command_status status
4155 = raft_handle_execute_command_request__(raft, rq);
4156 if (status != RAFT_CMD_INCOMPLETE) {
4157 raft_send_execute_command_reply(raft, &rq->common.sid, &rq->result,
4158 status, 0);
4159 }
4160 }
4161
4162 static void
4163 raft_handle_execute_command_reply(
4164 struct raft *raft, const struct raft_execute_command_reply *rpy)
4165 {
4166 struct raft_command *cmd = raft_find_command_by_eid(raft, &rpy->result);
4167 if (!cmd) {
4168 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
4169 char buf[SID_LEN + 1];
4170 VLOG_INFO_RL(&rl,
4171 "%s received \"%s\" reply from %s for unknown command",
4172 raft->local_nickname,
4173 raft_command_status_to_string(rpy->status),
4174 raft_get_nickname(raft, &rpy->common.sid,
4175 buf, sizeof buf));
4176 return;
4177 }
4178
4179 if (rpy->status == RAFT_CMD_INCOMPLETE) {
4180 cmd->timestamp = time_msec();
4181 } else {
4182 cmd->index = rpy->commit_index;
4183 raft_command_complete(raft, cmd, rpy->status);
4184 }
4185 }
4186
4187 static void
4188 raft_handle_rpc(struct raft *raft, const union raft_rpc *rpc)
4189 {
4190 uint64_t term = raft_rpc_get_term(rpc);
4191 if (term
4192 && !raft_should_suppress_disruptive_server(raft, rpc)
4193 && !raft_receive_term__(raft, &rpc->common, term)) {
4194 if (rpc->type == RAFT_RPC_APPEND_REQUEST) {
4195 /* Section 3.3: "If a server receives a request with a stale term
4196 * number, it rejects the request." */
4197 raft_send_append_reply(raft, raft_append_request_cast(rpc),
4198 RAFT_APPEND_INCONSISTENCY, "stale term");
4199 }
4200 return;
4201 }
4202
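/* RAFT_RPC_TYPES expands the RAFT_RPC macro once per RPC type, so this
 * switch dispatches every message to its raft_handle_<name>() handler. */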
4203 switch (rpc->type) {
4204 #define RAFT_RPC(ENUM, NAME) \
4205 case ENUM: \
4206 raft_handle_##NAME(raft, &rpc->NAME); \
4207 break;
4208 RAFT_RPC_TYPES
4209 #undef RAFT_RPC
4210 default:
4211 OVS_NOT_REACHED();
4212 }
4213 }
4214 \f
4215 static bool
4216 raft_rpc_is_heartbeat(const union raft_rpc *rpc)
4217 {
4218 return ((rpc->type == RAFT_RPC_APPEND_REQUEST
4219 || rpc->type == RAFT_RPC_APPEND_REPLY)
4220 && rpc->common.comment
4221 && !strcmp(rpc->common.comment, "heartbeat"));
4222 }
4223
4224 \f
4225 static bool
4226 raft_send_to_conn_at(struct raft *raft, const union raft_rpc *rpc,
4227 struct raft_conn *conn, int line_number)
4228 {
4229 log_rpc(rpc, "-->", conn, line_number);
4230 return !jsonrpc_session_send(
4231 conn->js, raft_rpc_to_jsonrpc(&raft->cid, &raft->sid, rpc));
4232 }
4233
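/* Returns true if everything that 'rpc' relies upon is already durable on
 * disk: its term, any log entries up to its minimum sync index, and any vote
 * it carries.  RPCs that are not yet synced are queued on a waiter by
 * raft_send_at() instead of being sent immediately. */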
4234 static bool
4235 raft_is_rpc_synced(const struct raft *raft, const union raft_rpc *rpc)
4236 {
4237 uint64_t term = raft_rpc_get_term(rpc);
4238 uint64_t index = raft_rpc_get_min_sync_index(rpc);
4239 const struct uuid *vote = raft_rpc_get_vote(rpc);
4240
4241 return (term <= raft->synced_term
4242 && index <= raft->log_synced
4243 && (!vote || uuid_equals(vote, &raft->synced_vote)));
4244 }
4245
4246 static bool
4247 raft_send_at(struct raft *raft, const union raft_rpc *rpc, int line_number)
4248 {
4249 const struct uuid *dst = &rpc->common.sid;
4250 if (uuid_equals(dst, &raft->sid)) {
4251 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4252 VLOG_WARN_RL(&rl, "attempted to send RPC to self from raft.c:%d",
4253 line_number);
4254 return false;
4255 }
4256
4257 struct raft_conn *conn = raft_find_conn_by_sid(raft, dst);
4258 if (!conn) {
4259 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4260 char buf[SID_LEN + 1];
4261 VLOG_DBG_RL(&rl, "%s: no connection to %s, cannot send RPC "
4262 "from raft.c:%d", raft->local_nickname,
4263 raft_get_nickname(raft, dst, buf, sizeof buf),
4264 line_number);
4265 return false;
4266 }
4267
4268 if (!raft_is_rpc_synced(raft, rpc)) {
4269 raft_waiter_create(raft, RAFT_W_RPC, false)->rpc = raft_rpc_clone(rpc);
4270 return true;
4271 }
4272
4273 return raft_send_to_conn_at(raft, rpc, conn, line_number);
4274 }
4275 \f
4276 static struct raft *
4277 raft_lookup_by_name(const char *name)
4278 {
4279 struct raft *raft;
4280
4281 HMAP_FOR_EACH_WITH_HASH (raft, hmap_node, hash_string(name, 0),
4282 &all_rafts) {
4283 if (!strcmp(raft->name, name)) {
4284 return raft;
4285 }
4286 }
4287 return NULL;
4288 }
4289
4290 static void
4291 raft_unixctl_cid(struct unixctl_conn *conn,
4292 int argc OVS_UNUSED, const char *argv[],
4293 void *aux OVS_UNUSED)
4294 {
4295 struct raft *raft = raft_lookup_by_name(argv[1]);
4296 if (!raft) {
4297 unixctl_command_reply_error(conn, "unknown cluster");
4298 } else if (uuid_is_zero(&raft->cid)) {
4299 unixctl_command_reply_error(conn, "cluster id not yet known");
4300 } else {
4301 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->cid));
4302 unixctl_command_reply(conn, uuid);
4303 free(uuid);
4304 }
4305 }
4306
4307 static void
4308 raft_unixctl_sid(struct unixctl_conn *conn,
4309 int argc OVS_UNUSED, const char *argv[],
4310 void *aux OVS_UNUSED)
4311 {
4312 struct raft *raft = raft_lookup_by_name(argv[1]);
4313 if (!raft) {
4314 unixctl_command_reply_error(conn, "unknown cluster");
4315 } else {
4316 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->sid));
4317 unixctl_command_reply(conn, uuid);
4318 free(uuid);
4319 }
4320 }
4321
4322 static void
4323 raft_put_sid(const char *title, const struct uuid *sid,
4324 const struct raft *raft, struct ds *s)
4325 {
4326 ds_put_format(s, "%s: ", title);
4327 if (uuid_equals(sid, &raft->sid)) {
4328 ds_put_cstr(s, "self");
4329 } else if (uuid_is_zero(sid)) {
4330 ds_put_cstr(s, "unknown");
4331 } else {
4332 char buf[SID_LEN + 1];
4333 ds_put_cstr(s, raft_get_nickname(raft, sid, buf, sizeof buf));
4334 }
4335 ds_put_char(s, '\n');
4336 }
4337
4338 static void
4339 raft_unixctl_status(struct unixctl_conn *conn,
4340 int argc OVS_UNUSED, const char *argv[],
4341 void *aux OVS_UNUSED)
4342 {
4343 struct raft *raft = raft_lookup_by_name(argv[1]);
4344 if (!raft) {
4345 unixctl_command_reply_error(conn, "unknown cluster");
4346 return;
4347 }
4348
4349 struct ds s = DS_EMPTY_INITIALIZER;
4350 ds_put_format(&s, "%s\n", raft->local_nickname);
4351 ds_put_format(&s, "Name: %s\n", raft->name);
4352 ds_put_format(&s, "Cluster ID: ");
4353 if (!uuid_is_zero(&raft->cid)) {
4354 ds_put_format(&s, CID_FMT" ("UUID_FMT")\n",
4355 CID_ARGS(&raft->cid), UUID_ARGS(&raft->cid));
4356 } else {
4357 ds_put_format(&s, "not yet known\n");
4358 }
4359 ds_put_format(&s, "Server ID: "SID_FMT" ("UUID_FMT")\n",
4360 SID_ARGS(&raft->sid), UUID_ARGS(&raft->sid));
4361 ds_put_format(&s, "Address: %s\n", raft->local_address);
4362 ds_put_format(&s, "Status: %s\n",
4363 raft->joining ? "joining cluster"
4364 : raft->leaving ? "leaving cluster"
4365 : raft->left ? "left cluster"
4366 : raft->failed ? "failed"
4367 : "cluster member");
4368 if (raft->joining) {
4369 ds_put_format(&s, "Remotes for joining:");
4370 const char *address;
4371 SSET_FOR_EACH (address, &raft->remote_addresses) {
4372 ds_put_format(&s, " %s", address);
4373 }
4374 ds_put_char(&s, '\n');
4375 }
4376 if (raft->role == RAFT_LEADER) {
4377 struct raft_server *as;
4378 HMAP_FOR_EACH (as, hmap_node, &raft->add_servers) {
4379 ds_put_format(&s, "Adding server %s ("SID_FMT" at %s) (%s)\n",
4380 as->nickname, SID_ARGS(&as->sid), as->address,
4381 raft_server_phase_to_string(as->phase));
4382 }
4383
4384 struct raft_server *rs = raft->remove_server;
4385 if (rs) {
4386 ds_put_format(&s, "Removing server %s ("SID_FMT" at %s) (%s)\n",
4387 rs->nickname, SID_ARGS(&rs->sid), rs->address,
4388 raft_server_phase_to_string(rs->phase));
4389 }
4390 }
4391
4392 ds_put_format(&s, "Role: %s\n",
4393 raft->role == RAFT_LEADER ? "leader"
4394 : raft->role == RAFT_CANDIDATE ? "candidate"
4395 : raft->role == RAFT_FOLLOWER ? "follower"
4396 : "<error>");
4397 ds_put_format(&s, "Term: %"PRIu64"\n", raft->term);
4398 raft_put_sid("Leader", &raft->leader_sid, raft, &s);
4399 raft_put_sid("Vote", &raft->vote, raft, &s);
4400 ds_put_char(&s, '\n');
4401
4402 ds_put_format(&s, "Election timer: %"PRIu64, raft->election_timer);
4403 if (raft->role == RAFT_LEADER && raft->election_timer_new) {
4404 ds_put_format(&s, " (changing to %"PRIu64")",
4405 raft->election_timer_new);
4406 }
4407 ds_put_char(&s, '\n');
4408
4409 ds_put_format(&s, "Log: [%"PRIu64", %"PRIu64"]\n",
4410 raft->log_start, raft->log_end);
4411
4412 uint64_t n_uncommitted = raft->log_end - raft->commit_index - 1;
4413 ds_put_format(&s, "Entries not yet committed: %"PRIu64"\n", n_uncommitted);
4414
4415 uint64_t n_unapplied = raft->log_end - raft->last_applied - 1;
4416 ds_put_format(&s, "Entries not yet applied: %"PRIu64"\n", n_unapplied);
4417
4418 const struct raft_conn *c;
4419 ds_put_cstr(&s, "Connections:");
4420 LIST_FOR_EACH (c, list_node, &raft->conns) {
4421 bool connected = jsonrpc_session_is_connected(c->js);
4422 ds_put_format(&s, " %s%s%s%s",
4423 connected ? "" : "(",
4424 c->incoming ? "<-" : "->", c->nickname,
4425 connected ? "" : ")");
4426 }
4427 ds_put_char(&s, '\n');
4428
4429 ds_put_cstr(&s, "Servers:\n");
4430 struct raft_server *server;
4431 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
4432 ds_put_format(&s, " %s ("SID_FMT" at %s)",
4433 server->nickname,
4434 SID_ARGS(&server->sid), server->address);
4435 if (uuid_equals(&server->sid, &raft->sid)) {
4436 ds_put_cstr(&s, " (self)");
4437 }
4438 if (server->phase != RAFT_PHASE_STABLE) {
4439 ds_put_format(&s, " (%s)",
4440 raft_server_phase_to_string(server->phase));
4441 }
4442 if (raft->role == RAFT_CANDIDATE) {
4443 if (!uuid_is_zero(&server->vote)) {
4444 char buf[SID_LEN + 1];
4445 ds_put_format(&s, " (voted for %s)",
4446 raft_get_nickname(raft, &server->vote,
4447 buf, sizeof buf));
4448 }
4449 } else if (raft->role == RAFT_LEADER) {
4450 ds_put_format(&s, " next_index=%"PRIu64" match_index=%"PRIu64,
4451 server->next_index, server->match_index);
4452 }
4453 ds_put_char(&s, '\n');
4454 }
4455
4456 unixctl_command_reply(conn, ds_cstr(&s));
4457 ds_destroy(&s);
4458 }
4459
4460 static void
4461 raft_unixctl_leave__(struct unixctl_conn *conn, struct raft *raft)
4462 {
4463 if (raft_is_leaving(raft)) {
4464 unixctl_command_reply_error(conn,
4465 "already in progress leaving cluster");
4466 } else if (raft_is_joining(raft)) {
4467 unixctl_command_reply_error(conn,
4468 "can't leave while join in progress");
4469 } else if (raft_failed(raft)) {
4470 unixctl_command_reply_error(conn,
4471 "can't leave after failure");
4472 } else {
4473 raft_leave(raft);
4474 unixctl_command_reply(conn, NULL);
4475 }
4476 }
4477
4478 static void
4479 raft_unixctl_leave(struct unixctl_conn *conn, int argc OVS_UNUSED,
4480 const char *argv[], void *aux OVS_UNUSED)
4481 {
4482 struct raft *raft = raft_lookup_by_name(argv[1]);
4483 if (!raft) {
4484 unixctl_command_reply_error(conn, "unknown cluster");
4485 return;
4486 }
4487
4488 raft_unixctl_leave__(conn, raft);
4489 }
4490
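/* Returns the server in 'raft' whose address exactly matches 'id' or whose
 * SID best matches 'id' as a partial UUID, or NULL if there is no match or
 * the best match is ambiguous. */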
4491 static struct raft_server *
4492 raft_lookup_server_best_match(struct raft *raft, const char *id)
4493 {
4494 struct raft_server *best = NULL;
4495 int best_score = -1;
4496 int n_best = 0;
4497
4498 struct raft_server *s;
4499 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
4500 int score = (!strcmp(id, s->address)
4501 ? INT_MAX
4502 : uuid_is_partial_match(&s->sid, id));
4503 if (score > best_score) {
4504 best = s;
4505 best_score = score;
4506 n_best = 1;
4507 } else if (score == best_score) {
4508 n_best++;
4509 }
4510 }
4511 return n_best == 1 ? best : NULL;
4512 }
4513
4514 static void
4515 raft_unixctl_kick(struct unixctl_conn *conn, int argc OVS_UNUSED,
4516 const char *argv[], void *aux OVS_UNUSED)
4517 {
4518 const char *cluster_name = argv[1];
4519 const char *server_name = argv[2];
4520
4521 struct raft *raft = raft_lookup_by_name(cluster_name);
4522 if (!raft) {
4523 unixctl_command_reply_error(conn, "unknown cluster");
4524 return;
4525 }
4526
4527 struct raft_server *server = raft_lookup_server_best_match(raft,
4528 server_name);
4529 if (!server) {
4530 unixctl_command_reply_error(conn, "unknown server");
4531 return;
4532 }
4533
4534 if (uuid_equals(&server->sid, &raft->sid)) {
4535 raft_unixctl_leave__(conn, raft);
4536 } else if (raft->role == RAFT_LEADER) {
4537 const struct raft_remove_server_request rq = {
4538 .sid = server->sid,
4539 .requester_conn = conn,
4540 };
4541 raft_handle_remove_server_request(raft, &rq);
4542 } else {
4543 const union raft_rpc rpc = {
4544 .remove_server_request = {
4545 .common = {
4546 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
4547 .sid = raft->leader_sid,
4548 .comment = "via unixctl"
4549 },
4550 .sid = server->sid,
4551 }
4552 };
4553 if (raft_send(raft, &rpc)) {
4554 unixctl_command_reply(conn, "sent removal request to leader");
4555 } else {
4556 unixctl_command_reply_error(conn,
4557 "failed to send removal request");
4558 }
4559 }
4560 }
4561
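/* Sets raft->election_timer from the log: start from the value stored in the
 * snapshot, if any, then prefer the most recent committed entry that carries
 * an election timer change. */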
4562 static void
4563 raft_get_election_timer_from_log(struct raft *raft)
4564 {
4565 if (raft->snap.election_timer) {
4566 raft->election_timer = raft->snap.election_timer;
4567 }
4568 for (uint64_t index = raft->commit_index; index >= raft->log_start;
4569 index--) {
4570 struct raft_entry *e = &raft->entries[index - raft->log_start];
4571 if (e->election_timer) {
4572 raft->election_timer = e->election_timer;
4573 break;
4574 }
4575 }
4576 }
4577
4578 static void
4579 raft_log_election_timer(struct raft *raft)
4580 {
4581 raft_command_unref(raft_command_execute__(raft, NULL, NULL,
4582 raft->election_timer_new, NULL,
4583 NULL));
4584 }
4585
4586 static void
4587 raft_unixctl_change_election_timer(struct unixctl_conn *conn,
4588 int argc OVS_UNUSED, const char *argv[],
4589 void *aux OVS_UNUSED)
4590 {
4591 const char *cluster_name = argv[1];
4592 const char *election_timer_str = argv[2];
4593
4594 struct raft *raft = raft_lookup_by_name(cluster_name);
4595 if (!raft) {
4596 unixctl_command_reply_error(conn, "unknown cluster");
4597 return;
4598 }
4599
4600 if (raft->role != RAFT_LEADER) {
4601 unixctl_command_reply_error(conn, "election timer must be changed"
4602 " through leader.");
4603 return;
4604 }
4605
4606 /* If an election timer change is already pending, reject this request. */
4607 if (raft->election_timer_new) {
4608 unixctl_command_reply_error(conn, "election timer change pending.");
4609 return;
4610 }
4611
4612 uint64_t election_timer = atoll(election_timer_str);
4613 if (election_timer == raft->election_timer) {
4614 unixctl_command_reply(conn, "election timer is already set to that value.");
4615 return;
4616 }
4617
4618 /* An election timer smaller than 100 ms or larger than 10 minutes doesn't
4619 * make sense. */
4620 if (election_timer < 100 || election_timer > 600000) {
4621 unixctl_command_reply_error(conn, "election timer must be between "
4622 "100 and 600000, in msec.");
4623 return;
4624 }
4625
4626 /* If the election timer is to be increased, it should be done gradually, so
4627 * that the new value does not cause election timeouts while it is applied on
4628 * the leader but not yet applied on some of the followers. */
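/* For example, growing the timer from 1000 ms to 4000 ms takes two changes:
 * first to at most 2000 ms and then, once that change has been applied, up to
 * 4000 ms. */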
4629 if (election_timer > raft->election_timer * 2) {
4630 unixctl_command_reply_error(conn, "election timer increase should not "
4631 "exceed the current value x 2.");
4632 return;
4633 }
4634
4635 raft->election_timer_new = election_timer;
4636 raft_log_election_timer(raft);
4637 unixctl_command_reply(conn, "change of election timer initiated.");
4638 }
4639
4640 static void
4641 raft_unixctl_failure_test(struct unixctl_conn *conn,
4642 int argc OVS_UNUSED, const char *argv[],
4643 void *aux OVS_UNUSED)
4644 {
4645 const char *test = argv[1];
4646 if (!strcmp(test, "crash-before-sending-append-request")) {
4647 failure_test = FT_CRASH_BEFORE_SEND_APPEND_REQ;
4648 } else if (!strcmp(test, "crash-after-sending-append-request")) {
4649 failure_test = FT_CRASH_AFTER_SEND_APPEND_REQ;
4650 } else if (!strcmp(test, "crash-before-sending-execute-command-reply")) {
4651 failure_test = FT_CRASH_BEFORE_SEND_EXEC_REP;
4652 } else if (!strcmp(test, "crash-after-sending-execute-command-reply")) {
4653 failure_test = FT_CRASH_AFTER_SEND_EXEC_REP;
4654 } else if (!strcmp(test, "crash-before-sending-execute-command-request")) {
4655 failure_test = FT_CRASH_BEFORE_SEND_EXEC_REQ;
4656 } else if (!strcmp(test, "crash-after-sending-execute-command-request")) {
4657 failure_test = FT_CRASH_AFTER_SEND_EXEC_REQ;
4658 } else if (!strcmp(test, "crash-after-receiving-append-request-update")) {
4659 failure_test = FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE;
4660 } else if (!strcmp(test, "delay-election")) {
4661 failure_test = FT_DELAY_ELECTION;
4662 struct raft *raft;
4663 HMAP_FOR_EACH (raft, hmap_node, &all_rafts) {
4664 if (raft->role == RAFT_FOLLOWER) {
4665 raft_reset_election_timer(raft);
4666 }
4667 }
4668 } else if (!strcmp(test, "clear")) {
4669 failure_test = FT_NO_TEST;
4670 unixctl_command_reply(conn, "test dismissed");
4671 return;
4672 } else {
4673 unixctl_command_reply_error(conn, "unknown test scenario");
4674 return;
4675 }
4676 unixctl_command_reply(conn, "test engaged");
4677 }
4678
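/* Registers the cluster/* unixctl commands below.  They are reached through
 * ovs-appctl against the process that hosts the database, for example
 * (hypothetical socket path):
 *
 *     ovs-appctl -t /var/run/openvswitch/ovsdb-server.ctl cluster/status DB
 */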
4679 static void
4680 raft_init(void)
4681 {
4682 static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
4683 if (!ovsthread_once_start(&once)) {
4684 return;
4685 }
4686 unixctl_command_register("cluster/cid", "DB", 1, 1,
4687 raft_unixctl_cid, NULL);
4688 unixctl_command_register("cluster/sid", "DB", 1, 1,
4689 raft_unixctl_sid, NULL);
4690 unixctl_command_register("cluster/status", "DB", 1, 1,
4691 raft_unixctl_status, NULL);
4692 unixctl_command_register("cluster/leave", "DB", 1, 1,
4693 raft_unixctl_leave, NULL);
4694 unixctl_command_register("cluster/kick", "DB SERVER", 2, 2,
4695 raft_unixctl_kick, NULL);
4696 unixctl_command_register("cluster/change-election-timer", "DB TIME", 2, 2,
4697 raft_unixctl_change_election_timer, NULL);
4698 unixctl_command_register("cluster/failure-test", "FAILURE SCENARIO", 1, 1,
4699 raft_unixctl_failure_test, NULL);
4700 ovsthread_once_done(&once);
4701 }