1 /*
2 * Copyright (c) 2017, 2018 Nicira, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <config.h>
18
19 #include "raft.h"
20 #include "raft-private.h"
21
22 #include <errno.h>
23 #include <unistd.h>
24
25 #include "hash.h"
26 #include "jsonrpc.h"
27 #include "lockfile.h"
28 #include "openvswitch/dynamic-string.h"
29 #include "openvswitch/hmap.h"
30 #include "openvswitch/json.h"
31 #include "openvswitch/list.h"
32 #include "openvswitch/poll-loop.h"
33 #include "openvswitch/vlog.h"
34 #include "ovsdb-error.h"
35 #include "ovsdb-parser.h"
36 #include "ovsdb/log.h"
37 #include "raft-rpc.h"
38 #include "random.h"
39 #include "socket-util.h"
40 #include "stream.h"
41 #include "timeval.h"
42 #include "unicode.h"
43 #include "unixctl.h"
44 #include "util.h"
45 #include "uuid.h"
46
47 VLOG_DEFINE_THIS_MODULE(raft);
48
49 /* Roles for a Raft server:
50 *
51 * - Followers: Servers in touch with the current leader.
52 *
53 * - Candidate: Servers unaware of a current leader and seeking election to
54 * leader.
55 *
56 * - Leader: Handles all client requests. At most one at a time.
57 *
58 * In normal operation there is exactly one leader and all of the other servers
59 * are followers. */
60 enum raft_role {
61 RAFT_FOLLOWER,
62 RAFT_CANDIDATE,
63 RAFT_LEADER
64 };
65
66 /* Flags for unit tests. */
67 enum raft_failure_test {
68 FT_NO_TEST,
69 FT_CRASH_BEFORE_SEND_APPEND_REQ,
70 FT_CRASH_AFTER_SEND_APPEND_REQ,
71 FT_CRASH_BEFORE_SEND_EXEC_REP,
72 FT_CRASH_AFTER_SEND_EXEC_REP,
73 FT_CRASH_BEFORE_SEND_EXEC_REQ,
74 FT_CRASH_AFTER_SEND_EXEC_REQ,
75 FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE,
76 FT_DELAY_ELECTION
77 };
78 static enum raft_failure_test failure_test;
79
80 /* A connection between this Raft server and another one. */
81 struct raft_conn {
82 struct ovs_list list_node; /* In struct raft's 'conns' list. */
83 struct jsonrpc_session *js; /* JSON-RPC connection. */
84 struct uuid sid; /* Unique ID of the server at the other end. */
85 char *nickname; /* Short name for use in log messages. */
86 bool incoming; /* True if incoming, false if outgoing. */
87 unsigned int js_seqno; /* Seqno for noticing (re)connections. */
88 };
89
90 static void raft_conn_close(struct raft_conn *);
91
92 /* A "command", that is, a request to append an entry to the log.
93 *
94 * The Raft specification only allows clients to issue commands to the leader.
95 * With this implementation, clients may issue a command on any server, which
96 * then relays the command to the leader if necessary.
97 *
98 * This structure is thus used in three cases:
99 *
100 * 1. We are the leader and the command was issued to us directly.
101 *
102 * 2. We are a follower and relayed the command to the leader.
103 *
104 * 3. We are the leader and a follower relayed the command to us.
105 */
106 struct raft_command {
107 /* All cases. */
108 struct hmap_node hmap_node; /* In struct raft's 'commands' hmap. */
109 unsigned int n_refs; /* Reference count. */
110 enum raft_command_status status; /* Execution status. */
111 struct uuid eid; /* Entry ID of result. */
112
113 /* Case 1 only. */
114 uint64_t index; /* Index in log (0 if being relayed). */
115
116 /* Case 2 only. */
117 long long int timestamp; /* Issue or last ping time, for expiration. */
118
119 /* Case 3 only. */
120 struct uuid sid; /* The follower (otherwise UUID_ZERO). */
121 };
122
123 static void raft_command_complete(struct raft *, struct raft_command *,
124 enum raft_command_status);
125
126 static void raft_complete_all_commands(struct raft *,
127 enum raft_command_status);
128
129 /* Type of deferred action, see struct raft_waiter. */
130 enum raft_waiter_type {
131 RAFT_W_ENTRY,
132 RAFT_W_TERM,
133 RAFT_W_RPC,
134 };
135
136 /* An action deferred until a log write commits to disk. */
137 struct raft_waiter {
138 struct ovs_list list_node;
139 uint64_t commit_ticket;
140
141 enum raft_waiter_type type;
142 union {
143 /* RAFT_W_ENTRY.
144 *
145 * Waits for a RAFT_REC_ENTRY write to our local log to commit. Upon
146 * completion, updates 'log_synced' to indicate that the new log entry
147 * or entries are committed and, if we are leader, also updates our
148 * local 'match_index'. */
149 struct {
150 uint64_t index;
151 } entry;
152
153 /* RAFT_W_TERM.
154 *
155 * Waits for a RAFT_REC_TERM or RAFT_REC_VOTE record write to commit.
156 * Upon completion, updates 'synced_term' and 'synced_vote', which
157 * triggers sending RPCs deferred by the uncommitted 'term' and
158 * 'vote'. */
159 struct {
160 uint64_t term;
161 struct uuid vote;
162 } term;
163
164 /* RAFT_W_RPC.
165 *
166 * Sometimes, sending an RPC to a peer must be delayed until an entry,
167 * a term, or a vote mentioned in the RPC is synced to disk. This
168 * waiter keeps a copy of such an RPC until the previous waiters have
169 * committed. */
170 union raft_rpc *rpc;
171 };
172 };
173
174 static struct raft_waiter *raft_waiter_create(struct raft *,
175 enum raft_waiter_type,
176 bool start_commit);
177 static void raft_waiters_destroy(struct raft *);
178
179 /* The Raft state machine. */
180 struct raft {
181 struct hmap_node hmap_node; /* In 'all_rafts'. */
182 struct ovsdb_log *log;
183
184 /* Persistent derived state.
185 *
186 * This must be updated on stable storage before responding to RPCs. It can be
187 * derived from the header, snapshot, and log in 'log'. */
188
189 struct uuid cid; /* Cluster ID (immutable for the cluster). */
190 struct uuid sid; /* Server ID (immutable for the server). */
191 char *local_address; /* Local address (immutable for the server). */
192 char *local_nickname; /* Used for local server in log messages. */
193 char *name; /* Schema name (immutable for the cluster). */
194
195 /* Contains "struct raft_server"s and represents the server configuration
196 * most recently added to 'log'. */
197 struct hmap servers;
198
199 /* Persistent state on all servers.
200 *
201 * Must be updated on stable storage before responding to RPCs. */
202
203 /* Current term and the vote for that term. These might be on the way to
204 * disk now. */
205 uint64_t term; /* Initialized to 0 and only increases. */
206 struct uuid vote; /* All-zeros if no vote yet in 'term'. */
207
208 /* The term and vote that have been synced to disk. */
209 uint64_t synced_term;
210 struct uuid synced_vote;
211
212 /* The log.
213 *
214 * A log entry with index 1 never really exists; the initial snapshot for a
215 * Raft is considered to include this index. The first real log entry has
216 * index 2.
217 *
218 * A new Raft instance contains an empty log: log_start=2, log_end=2.
219 * Over time, the log grows: log_start=2, log_end=N.
220 * At some point, the server takes a snapshot: log_start=N, log_end=N.
221 * The log continues to grow: log_start=N, log_end=N+1...
222 *
223 * Must be updated on stable storage before responding to RPCs. */
224 struct raft_entry *entries; /* Log entry i is in entries[i - log_start]. */
225 uint64_t log_start; /* Index of first entry in log. */
226 uint64_t log_end; /* Index of last entry in log, plus 1. */
227 uint64_t log_synced; /* Index of last synced entry. */
228 size_t allocated_log; /* Allocated entries in 'log'. */
229
230 /* Snapshot state (see Figure 5.1)
231 *
232 * This is the state of the cluster as of the last discarded log entry,
233 * that is, at log index 'log_start - 1' (called prevIndex in Figure 5.1).
234 * Only committed log entries can be included in a snapshot. */
235 struct raft_entry snap;
236
237 /* Volatile state.
238 *
239 * The snapshot is always committed, but the rest of the log might not be yet.
240 * 'last_applied' tracks what entries have been passed to the client. If the
241 * client hasn't yet read the latest snapshot, then even the snapshot isn't
242 * applied yet. Thus, the invariants are different for these members:
243 *
244 * log_start - 2 <= last_applied <= commit_index < log_end.
245 * log_start - 1 <= commit_index < log_end.
246 */
247
248 enum raft_role role; /* Current role. */
249 uint64_t commit_index; /* Max log index known to be committed. */
250 uint64_t last_applied; /* Max log index applied to state machine. */
251 struct uuid leader_sid; /* Server ID of leader (zero, if unknown). */
252
253 #define ELECTION_BASE_MSEC 1024
254 #define ELECTION_RANGE_MSEC 1024
255 long long int election_base; /* Time of last heartbeat from leader. */
256 long long int election_timeout; /* Time at which we start an election. */
257
258 /* Used for joining a cluster. */
259 bool joining; /* Attempting to join the cluster? */
260 struct sset remote_addresses; /* Addresses to try to find other servers. */
261 long long int join_timeout; /* Time to re-send add server request. */
262
263 /* Used for leaving a cluster. */
264 bool leaving; /* True if we are leaving the cluster. */
265 bool left; /* True if we have finished leaving. */
266 long long int leave_timeout; /* Time to re-send remove server request. */
267
268 /* Failure. */
269 bool failed; /* True if unrecoverable error has occurred. */
270
271 /* File synchronization. */
272 struct ovs_list waiters; /* Contains "struct raft_waiter"s. */
273
274 /* Network connections. */
275 struct pstream *listener; /* For connections from other Raft servers. */
276 long long int listen_backoff; /* For retrying creating 'listener'. */
277 struct ovs_list conns; /* Contains struct raft_conns. */
278
279 /* Leaders only. Reinitialized after becoming leader. */
280 struct hmap add_servers; /* Contains "struct raft_server"s to add. */
281 struct raft_server *remove_server; /* Server being removed. */
282 struct hmap commands; /* Contains "struct raft_command"s. */
283 #define PING_TIME_MSEC (ELECTION_BASE_MSEC / 3)
284 long long int ping_timeout; /* Time at which to send a heartbeat */
285
286 /* Candidates only. Reinitialized at start of election. */
287 int n_votes; /* Number of votes for me. */
288 bool candidate_retrying; /* The first round of the election timed out and
289 it is now retrying. */
290 };
291
292 /* All Raft structures. */
293 static struct hmap all_rafts = HMAP_INITIALIZER(&all_rafts);
294
295 static void raft_init(void);
296
297 static struct ovsdb_error *raft_read_header(struct raft *)
298 OVS_WARN_UNUSED_RESULT;
299
300 static void raft_send_execute_command_reply(struct raft *,
301 const struct uuid *sid,
302 const struct uuid *eid,
303 enum raft_command_status,
304 uint64_t commit_index);
305
306 static void raft_update_our_match_index(struct raft *, uint64_t min_index);
307
308 static void raft_send_remove_server_reply__(
309 struct raft *, const struct uuid *target_sid,
310 const struct uuid *requester_sid, struct unixctl_conn *requester_conn,
311 bool success, const char *comment);
312 static void raft_finished_leaving_cluster(struct raft *);
313
314 static void raft_server_init_leader(struct raft *, struct raft_server *);
315
316 static bool raft_rpc_is_heartbeat(const union raft_rpc *);
317 static bool raft_is_rpc_synced(const struct raft *, const union raft_rpc *);
318
319 static void raft_handle_rpc(struct raft *, const union raft_rpc *);
320
321 static bool raft_send_at(struct raft *, const union raft_rpc *,
322 int line_number);
323 #define raft_send(raft, rpc) raft_send_at(raft, rpc, __LINE__)
324
325 static bool raft_send_to_conn_at(struct raft *, const union raft_rpc *,
326 struct raft_conn *, int line_number);
327 #define raft_send_to_conn(raft, rpc, conn) \
328 raft_send_to_conn_at(raft, rpc, conn, __LINE__)
329
330 static void raft_send_append_request(struct raft *,
331 struct raft_server *, unsigned int n,
332 const char *comment);
333
334 static void raft_become_leader(struct raft *);
335 static void raft_become_follower(struct raft *);
336 static void raft_reset_election_timer(struct raft *);
337 static void raft_reset_ping_timer(struct raft *);
338 static void raft_send_heartbeats(struct raft *);
339 static void raft_start_election(struct raft *, bool leadership_transfer);
340 static bool raft_truncate(struct raft *, uint64_t new_end);
341 static void raft_get_servers_from_log(struct raft *, enum vlog_level);
342
343 static bool raft_handle_write_error(struct raft *, struct ovsdb_error *);
344
345 static void raft_run_reconfigure(struct raft *);
346
347 static struct raft_server *
348 raft_find_server(const struct raft *raft, const struct uuid *sid)
349 {
350 return raft_server_find(&raft->servers, sid);
351 }
352
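/* Returns a passive form of 'address_' suitable for listening: "unix:PATH"
 * becomes "punix:PATH", while an active "tcp:HOST:PORT" or "ssl:HOST:PORT"
 * becomes "ptcp:PORT:HOST" or "pssl:PORT:HOST" (an IPv6 host is enclosed in
 * brackets).  The caller is responsible for freeing the returned string. */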
353 static char *
354 raft_make_address_passive(const char *address_)
355 {
356 if (!strncmp(address_, "unix:", 5)) {
357 return xasprintf("p%s", address_);
358 } else {
359 char *address = xstrdup(address_);
360 char *host, *port;
361 inet_parse_host_port_tokens(strchr(address, ':') + 1, &host, &port);
362
363 struct ds paddr = DS_EMPTY_INITIALIZER;
364 ds_put_format(&paddr, "p%.3s:%s:", address, port);
365 if (strchr(host, ':')) {
366 ds_put_format(&paddr, "[%s]", host);
367 } else {
368 ds_put_cstr(&paddr, host);
369 }
370 free(address);
371 return ds_steal_cstr(&paddr);
372 }
373 }
374
375 static struct raft *
376 raft_alloc(void)
377 {
378 raft_init();
379
380 struct raft *raft = xzalloc(sizeof *raft);
381 hmap_node_nullify(&raft->hmap_node);
382 hmap_init(&raft->servers);
383 raft->log_start = raft->log_end = 1;
384 raft->role = RAFT_FOLLOWER;
385 sset_init(&raft->remote_addresses);
386 raft->join_timeout = LLONG_MAX;
387 ovs_list_init(&raft->waiters);
388 raft->listen_backoff = LLONG_MIN;
389 ovs_list_init(&raft->conns);
390 hmap_init(&raft->add_servers);
391 hmap_init(&raft->commands);
392
393 raft_reset_ping_timer(raft);
394 raft_reset_election_timer(raft);
395
396 return raft;
397 }
398
399 /* Creates an on-disk file that represents a new Raft cluster and initializes
400 * it to consist of a single server, the one on which this function is called.
401 *
402 * Creates the local copy of the cluster's log in 'file_name', which must not
403 * already exist. Gives it the name 'name', which should be the database
404 * schema name and which is used only to match up this database with the server
405 * added to the cluster later if the cluster ID is unavailable.
406 *
407 * The new server is located at 'local_address', which must take one of the
408 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
409 * square bracket enclosed IPv6 address and PORT is a TCP port number.
410 *
411 * This only creates the on-disk file. Use raft_open() to start operating the
412 * new server.
413 *
414 * Returns null if successful, otherwise an ovsdb_error describing the
415 * problem. */
416 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
417 raft_create_cluster(const char *file_name, const char *name,
418 const char *local_address, const struct json *data)
419 {
420 /* Parse and verify validity of the local address. */
421 struct ovsdb_error *error = raft_address_validate(local_address);
422 if (error) {
423 return error;
424 }
425
426 /* Create log file. */
427 struct ovsdb_log *log;
428 error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
429 -1, &log);
430 if (error) {
431 return error;
432 }
433
434 /* Write log file. */
435 struct raft_header h = {
436 .sid = uuid_random(),
437 .cid = uuid_random(),
438 .name = xstrdup(name),
439 .local_address = xstrdup(local_address),
440 .joining = false,
441 .remote_addresses = SSET_INITIALIZER(&h.remote_addresses),
442 .snap_index = 1,
443 .snap = {
444 .term = 1,
445 .data = json_nullable_clone(data),
446 .eid = uuid_random(),
447 .servers = json_object_create(),
448 },
449 };
450 shash_add_nocopy(json_object(h.snap.servers),
451 xasprintf(UUID_FMT, UUID_ARGS(&h.sid)),
452 json_string_create(local_address));
453 error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
454 raft_header_uninit(&h);
455 if (!error) {
456 error = ovsdb_log_commit_block(log);
457 }
458 ovsdb_log_close(log);
459
460 return error;
461 }
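
/* Illustrative usage sketch (not part of the original source): how a tool
 * might create a new single-server cluster.  The file name, schema name, and
 * address below are hypothetical examples. */
#if 0
static void
example_create_cluster(void)
{
    struct json *data = json_object_create();   /* Placeholder initial data. */
    struct ovsdb_error *error
        = raft_create_cluster("/var/lib/example/db.raft", "ExampleSchema",
                              "tcp:192.168.0.1:6641", data);
    json_destroy(data);
    if (error) {
        char *s = ovsdb_error_to_string_free(error);
        VLOG_ERR("cluster creation failed: %s", s);
        free(s);
    }
}
#endif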
462
463 /* Creates a database file that represents a new server in an existing Raft
464 * cluster.
465 *
466 * Creates the local copy of the cluster's log in 'file_name', which must not
467 * already exist. Gives it the name 'name', which must be the same name
468 * passed in to raft_create_cluster() earlier.
469 *
470 * 'cid' is optional. If specified, the new server will join only the cluster
471 * with the given cluster ID.
472 *
473 * The new server is located at 'local_address', which must take one of the
474 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
475 * square bracket enclosed IPv6 address and PORT is a TCP port number.
476 *
477 * Joining the cluster requires contacting it. Thus, 'remote_addresses'
478 * specifies the addresses of existing servers in the cluster. One server out
479 * of the existing cluster is sufficient, as long as that server is reachable
480 * and not partitioned from the current cluster leader. If multiple servers
481 * from the cluster are specified, then it is sufficient for any of them to
482 * meet this criterion.
483 *
484 * This only creates the on-disk file and does no network access. Use
485 * raft_open() to start operating the new server. (Until this happens, the
486 * new server has not joined the cluster.)
487 *
488 * Returns null if successful, otherwise an ovsdb_error describing the
489 * problem. */
490 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
491 raft_join_cluster(const char *file_name,
492 const char *name, const char *local_address,
493 const struct sset *remote_addresses,
494 const struct uuid *cid)
495 {
496 ovs_assert(!sset_is_empty(remote_addresses));
497
498 /* Parse and verify validity of the addresses. */
499 struct ovsdb_error *error = raft_address_validate(local_address);
500 if (error) {
501 return error;
502 }
503 const char *addr;
504 SSET_FOR_EACH (addr, remote_addresses) {
505 error = raft_address_validate(addr);
506 if (error) {
507 return error;
508 }
509 if (!strcmp(addr, local_address)) {
510 return ovsdb_error(NULL, "remote addresses cannot be the same "
511 "as the local address");
512 }
513 }
514
515 /* Verify validity of the cluster ID (if provided). */
516 if (cid && uuid_is_zero(cid)) {
517 return ovsdb_error(NULL, "all-zero UUID is not valid cluster ID");
518 }
519
520 /* Create log file. */
521 struct ovsdb_log *log;
522 error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
523 -1, &log);
524 if (error) {
525 return error;
526 }
527
528 /* Write log file. */
529 struct raft_header h = {
530 .sid = uuid_random(),
531 .cid = cid ? *cid : UUID_ZERO,
532 .name = xstrdup(name),
533 .local_address = xstrdup(local_address),
534 .joining = true,
535 /* No snapshot yet. */
536 };
537 sset_clone(&h.remote_addresses, remote_addresses);
538 error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
539 raft_header_uninit(&h);
540 if (!error) {
541 error = ovsdb_log_commit_block(log);
542 }
543 ovsdb_log_close(log);
544
545 return error;
546 }
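
/* Illustrative usage sketch (not part of the original source): joining an
 * existing cluster through one known remote.  The addresses are hypothetical
 * examples. */
#if 0
static struct ovsdb_error *
example_join_cluster(void)
{
    struct sset remotes = SSET_INITIALIZER(&remotes);
    sset_add(&remotes, "tcp:192.168.0.1:6641");
    struct ovsdb_error *error
        = raft_join_cluster("/var/lib/example/db2.raft", "ExampleSchema",
                            "tcp:192.168.0.2:6641", &remotes, NULL);
    sset_destroy(&remotes);
    return error;
}
#endif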
547
548 /* Reads the initial header record from 'log', which must be a Raft clustered
549 * database log, and populates '*md' with the information read from it. The
550 * caller must eventually destroy 'md' with raft_metadata_destroy().
551 *
552 * On success, returns NULL. On failure, returns an error that the caller must
553 * eventually destroy and zeros '*md'. */
554 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
555 raft_read_metadata(struct ovsdb_log *log, struct raft_metadata *md)
556 {
557 struct raft *raft = raft_alloc();
558 raft->log = log;
559
560 struct ovsdb_error *error = raft_read_header(raft);
561 if (!error) {
562 md->sid = raft->sid;
563 md->name = xstrdup(raft->name);
564 md->local = xstrdup(raft->local_address);
565 md->cid = raft->cid;
566 } else {
567 memset(md, 0, sizeof *md);
568 }
569
570 raft->log = NULL;
571 raft_close(raft);
572 return error;
573 }
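
/* Illustrative usage sketch (not part of the original source): reading and
 * printing cluster metadata from an already-opened clustered database log. */
#if 0
static void
example_show_metadata(struct ovsdb_log *log)
{
    struct raft_metadata md;
    struct ovsdb_error *error = raft_read_metadata(log, &md);
    if (error) {
        char *s = ovsdb_error_to_string_free(error);
        VLOG_ERR("%s", s);
        free(s);
        return;
    }
    VLOG_INFO("cluster "CID_FMT", server "SID_FMT", schema %s",
              CID_ARGS(&md.cid), SID_ARGS(&md.sid), md.name);
    raft_metadata_destroy(&md);
}
#endif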
574
575 /* Frees the metadata in 'md'. */
576 void
577 raft_metadata_destroy(struct raft_metadata *md)
578 {
579 if (md) {
580 free(md->name);
581 free(md->local);
582 }
583 }
584
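/* Returns the log entry with index 'index', which must currently be stored in
 * memory, that is, log_start <= index < log_end. */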
585 static const struct raft_entry *
586 raft_get_entry(const struct raft *raft, uint64_t index)
587 {
588 ovs_assert(index >= raft->log_start);
589 ovs_assert(index < raft->log_end);
590 return &raft->entries[index - raft->log_start];
591 }
592
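/* Returns the term of the log entry with index 'index'.  'index' may also be
 * 'log_start - 1', the entry covered by the latest snapshot, in which case the
 * snapshot's term is returned. */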
593 static uint64_t
594 raft_get_term(const struct raft *raft, uint64_t index)
595 {
596 return (index == raft->log_start - 1
597 ? raft->snap.term
598 : raft_get_entry(raft, index)->term);
599 }
600
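/* Returns the server configuration in effect at log index 'index': the
 * 'servers' from the most recent entry at or before 'index' that carried a
 * configuration, or the snapshot's configuration if none does. */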
601 static const struct json *
602 raft_servers_for_index(const struct raft *raft, uint64_t index)
603 {
604 ovs_assert(index >= raft->log_start - 1);
605 ovs_assert(index < raft->log_end);
606
607 const struct json *servers = raft->snap.servers;
608 for (uint64_t i = raft->log_start; i <= index; i++) {
609 const struct raft_entry *e = raft_get_entry(raft, i);
610 if (e->servers) {
611 servers = e->servers;
612 }
613 }
614 return servers;
615 }
616
617 static void
618 raft_set_servers(struct raft *raft, const struct hmap *new_servers,
619 enum vlog_level level)
620 {
621 struct raft_server *s, *next;
622 HMAP_FOR_EACH_SAFE (s, next, hmap_node, &raft->servers) {
623 if (!raft_server_find(new_servers, &s->sid)) {
624 ovs_assert(s != raft->remove_server);
625
626 hmap_remove(&raft->servers, &s->hmap_node);
627 VLOG(level, "server %s removed from configuration", s->nickname);
628 raft_server_destroy(s);
629 }
630 }
631
632 HMAP_FOR_EACH_SAFE (s, next, hmap_node, new_servers) {
633 if (!raft_find_server(raft, &s->sid)) {
634 VLOG(level, "server %s added to configuration", s->nickname);
635
636 struct raft_server *new
637 = raft_server_add(&raft->servers, &s->sid, s->address);
638 raft_server_init_leader(raft, new);
639 }
640 }
641 }
642
643 static uint64_t
644 raft_add_entry(struct raft *raft,
645 uint64_t term, struct json *data, const struct uuid *eid,
646 struct json *servers)
647 {
648 if (raft->log_end - raft->log_start >= raft->allocated_log) {
649 raft->entries = x2nrealloc(raft->entries, &raft->allocated_log,
650 sizeof *raft->entries);
651 }
652
653 uint64_t index = raft->log_end++;
654 struct raft_entry *entry = &raft->entries[index - raft->log_start];
655 entry->term = term;
656 entry->data = data;
657 entry->eid = eid ? *eid : UUID_ZERO;
658 entry->servers = servers;
659 return index;
660 }
661
662 /* Writes a RAFT_REC_ENTRY record for 'term', 'data', 'eid', 'servers' to
663 * 'raft''s log and returns an error indication. */
664 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
665 raft_write_entry(struct raft *raft, uint64_t term, struct json *data,
666 const struct uuid *eid, struct json *servers)
667 {
668 struct raft_record r = {
669 .type = RAFT_REC_ENTRY,
670 .term = term,
671 .entry = {
672 .index = raft_add_entry(raft, term, data, eid, servers),
673 .data = data,
674 .servers = servers,
675 .eid = eid ? *eid : UUID_ZERO,
676 },
677 };
678 return ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r));
679 }
680
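/* Writes a RAFT_REC_VOTE record for 'term' and 'vote' to 'log', or a plain
 * RAFT_REC_TERM record if 'vote' is null or all-zeros, and returns an error
 * indication. */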
681 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
682 raft_write_state(struct ovsdb_log *log,
683 uint64_t term, const struct uuid *vote)
684 {
685 struct raft_record r = { .term = term };
686 if (vote && !uuid_is_zero(vote)) {
687 r.type = RAFT_REC_VOTE;
688 r.sid = *vote;
689 } else {
690 r.type = RAFT_REC_TERM;
691 }
692 return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
693 }
694
695 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
696 raft_apply_record(struct raft *raft, unsigned long long int rec_idx,
697 const struct raft_record *r)
698 {
699 /* Apply "term", which is present in most kinds of records (and otherwise
700 * 0).
701 *
702 * A Raft leader can replicate entries from previous terms to the other
703 * servers in the cluster, retaining the original terms on those entries
704 * (see section 3.6.2 "Committing entries from previous terms" for more
705 * information), so it's OK for the term in a log record to precede the
706 * current term. */
707 if (r->term > raft->term) {
708 raft->term = raft->synced_term = r->term;
709 raft->vote = raft->synced_vote = UUID_ZERO;
710 }
711
712 switch (r->type) {
713 case RAFT_REC_ENTRY:
714 if (r->entry.index < raft->commit_index) {
715 return ovsdb_error(NULL, "record %llu attempts to truncate log "
716 "from %"PRIu64" to %"PRIu64" entries, but "
717 "commit index is already %"PRIu64,
718 rec_idx, raft->log_end, r->entry.index,
719 raft->commit_index);
720 } else if (r->entry.index > raft->log_end) {
721 return ovsdb_error(NULL, "record %llu with index %"PRIu64" skips "
722 "past expected index %"PRIu64,
723 rec_idx, r->entry.index, raft->log_end);
724 }
725
726 if (r->entry.index < raft->log_end) {
727 /* This can happen, but it is notable. */
728 VLOG_DBG("record %llu truncates log from %"PRIu64" to %"PRIu64
729 " entries", rec_idx, raft->log_end, r->entry.index);
730 raft_truncate(raft, r->entry.index);
731 }
732
733 uint64_t prev_term = (raft->log_end > raft->log_start
734 ? raft->entries[raft->log_end
735 - raft->log_start - 1].term
736 : raft->snap.term);
737 if (r->term < prev_term) {
738 return ovsdb_error(NULL, "record %llu with index %"PRIu64" term "
739 "%"PRIu64" precedes previous entry's term "
740 "%"PRIu64,
741 rec_idx, r->entry.index, r->term, prev_term);
742 }
743
744 raft->log_synced = raft_add_entry(
745 raft, r->term,
746 json_nullable_clone(r->entry.data), &r->entry.eid,
747 json_nullable_clone(r->entry.servers));
748 return NULL;
749
750 case RAFT_REC_TERM:
751 return NULL;
752
753 case RAFT_REC_VOTE:
754 if (r->term < raft->term) {
755 return ovsdb_error(NULL, "record %llu votes for term %"PRIu64" "
756 "but current term is %"PRIu64,
757 rec_idx, r->term, raft->term);
758 } else if (!uuid_is_zero(&raft->vote)
759 && !uuid_equals(&raft->vote, &r->sid)) {
760 return ovsdb_error(NULL, "record %llu votes for "SID_FMT" in term "
761 "%"PRIu64" but a previous record for the "
762 "same term voted for "SID_FMT, rec_idx,
763 SID_ARGS(&raft->vote), r->term,
764 SID_ARGS(&r->sid));
765 } else {
766 raft->vote = raft->synced_vote = r->sid;
767 return NULL;
768 }
769 break;
770
771 case RAFT_REC_NOTE:
772 if (!strcmp(r->note, "left")) {
773 return ovsdb_error(NULL, "record %llu indicates server has left "
774 "the cluster; it cannot be added back (use "
775 "\"ovsdb-tool join-cluster\" to add a new "
776 "server)", rec_idx);
777 }
778 return NULL;
779
780 case RAFT_REC_COMMIT_INDEX:
781 if (r->commit_index < raft->commit_index) {
782 return ovsdb_error(NULL, "record %llu regresses commit index "
783 "from %"PRIu64 " to %"PRIu64,
784 rec_idx, raft->commit_index, r->commit_index);
785 } else if (r->commit_index >= raft->log_end) {
786 return ovsdb_error(NULL, "record %llu advances commit index to "
787 "%"PRIu64 " but last log index is %"PRIu64,
788 rec_idx, r->commit_index, raft->log_end - 1);
789 } else {
790 raft->commit_index = r->commit_index;
791 return NULL;
792 }
793 break;
794
795 case RAFT_REC_LEADER:
796 /* XXX we could use this to take back leadership for quick restart */
797 return NULL;
798
799 default:
800 OVS_NOT_REACHED();
801 }
802 }
803
804 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
805 raft_read_header(struct raft *raft)
806 {
807 /* Read header record. */
808 struct json *json;
809 struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
810 if (error || !json) {
811 /* Report error or end-of-file. */
812 return error;
813 }
814 ovsdb_log_mark_base(raft->log);
815
816 struct raft_header h;
817 error = raft_header_from_json(&h, json);
818 json_destroy(json);
819 if (error) {
820 return error;
821 }
822
823 raft->sid = h.sid;
824 raft->cid = h.cid;
825 raft->name = xstrdup(h.name);
826 raft->local_address = xstrdup(h.local_address);
827 raft->local_nickname = raft_address_to_nickname(h.local_address, &h.sid);
828 raft->joining = h.joining;
829
830 if (h.joining) {
831 sset_clone(&raft->remote_addresses, &h.remote_addresses);
832 } else {
833 raft_entry_clone(&raft->snap, &h.snap);
834 raft->log_start = raft->log_end = h.snap_index + 1;
835 raft->commit_index = h.snap_index;
836 raft->last_applied = h.snap_index - 1;
837 }
838
839 raft_header_uninit(&h);
840
841 return NULL;
842 }
843
844 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
845 raft_read_log(struct raft *raft)
846 {
847 for (unsigned long long int i = 1; ; i++) {
848 struct json *json;
849 struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
850 if (!json) {
851 if (error) {
852 /* We assume that the error is due to a partial write while
853 * appending to the file before a crash, so log it and
854 * continue. */
855 char *error_string = ovsdb_error_to_string_free(error);
856 VLOG_WARN("%s", error_string);
857 free(error_string);
858 error = NULL;
859 }
860 break;
861 }
862
863 struct raft_record r;
864 error = raft_record_from_json(&r, json);
865 if (!error) {
866 error = raft_apply_record(raft, i, &r);
867 raft_record_uninit(&r);
868 }
869 if (error) {
870 return ovsdb_wrap_error(error, "error reading record %llu from "
871 "%s log", i, raft->name);
872 }
873 }
874
875 /* Set the most recent servers. */
876 raft_get_servers_from_log(raft, VLL_DBG);
877
878 return NULL;
879 }
880
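/* Resets the election timer to fire at a randomized time between
 * ELECTION_BASE_MSEC and ELECTION_BASE_MSEC + ELECTION_RANGE_MSEC ms from now.
 * Randomizing the timeout makes it unlikely that several servers time out at
 * the same moment and split the vote. */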
881 static void
882 raft_reset_election_timer(struct raft *raft)
883 {
884 unsigned int duration = (ELECTION_BASE_MSEC
885 + random_range(ELECTION_RANGE_MSEC));
886 raft->election_base = time_msec();
887 if (failure_test == FT_DELAY_ELECTION) {
888 /* Slow down this node so that it won't win the next election. */
889 duration += ELECTION_BASE_MSEC;
890 }
891 raft->election_timeout = raft->election_base + duration;
892 }
893
894 static void
895 raft_reset_ping_timer(struct raft *raft)
896 {
897 raft->ping_timeout = time_msec() + PING_TIME_MSEC;
898 }
899
900 static void
901 raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
902 const struct uuid *sid, bool incoming)
903 {
904 struct raft_conn *conn = xzalloc(sizeof *conn);
905 ovs_list_push_back(&raft->conns, &conn->list_node);
906 conn->js = js;
907 if (sid) {
908 conn->sid = *sid;
909 }
910 conn->nickname = raft_address_to_nickname(jsonrpc_session_get_name(js),
911 &conn->sid);
912 conn->incoming = incoming;
913 conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
914 }
915
916 /* Starts the local server in an existing Raft cluster, using the local copy of
917 * the cluster's log that is already open as 'log'. Takes ownership of 'log',
918 * whether successful or not. */
919 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
920 raft_open(struct ovsdb_log *log, struct raft **raftp)
921 {
922 struct raft *raft = raft_alloc();
923 raft->log = log;
924
925 struct ovsdb_error *error = raft_read_header(raft);
926 if (error) {
927 goto error;
928 }
929
930 if (!raft->joining) {
931 error = raft_read_log(raft);
932 if (error) {
933 goto error;
934 }
935
936 /* Find our own server. */
937 if (!raft_find_server(raft, &raft->sid)) {
938 error = ovsdb_error(NULL, "server does not belong to cluster");
939 goto error;
940 }
941
942 /* If there's only one server, start an election right away so that the
943 * cluster bootstraps quickly. */
944 if (hmap_count(&raft->servers) == 1) {
945 raft_start_election(raft, false);
946 }
947 } else {
948 raft->join_timeout = time_msec() + 1000;
949 }
950
951 *raftp = raft;
952 hmap_insert(&all_rafts, &raft->hmap_node, hash_string(raft->name, 0));
953 return NULL;
954
955 error:
956 raft_close(raft);
957 *raftp = NULL;
958 return error;
959 }
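
/* Illustrative usage sketch (not part of the original source): opening an
 * existing clustered database and driving it from a poll loop.  The file name
 * is a hypothetical example and error handling is abbreviated. */
#if 0
static void
example_run_server(void)
{
    struct ovsdb_log *log;
    struct ovsdb_error *error = ovsdb_log_open("/var/lib/example/db.raft",
                                               RAFT_MAGIC,
                                               OVSDB_LOG_READ_WRITE, -1, &log);
    if (error) {
        ovs_fatal(0, "%s", ovsdb_error_to_string_free(error));
    }

    struct raft *raft;
    error = raft_open(log, &raft);      /* Takes ownership of 'log'. */
    if (error) {
        ovs_fatal(0, "%s", ovsdb_error_to_string_free(error));
    }

    for (;;) {
        raft_run(raft);
        raft_wait(raft);
        poll_block();
    }
}
#endif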
960
961 /* Returns the name of 'raft', which in OVSDB is the database schema name. */
962 const char *
963 raft_get_name(const struct raft *raft)
964 {
965 return raft->name;
966 }
967
968 /* Returns the cluster ID of 'raft'. If 'raft' has not yet completed joining
969 * its cluster, then 'cid' will be all-zeros (unless the administrator
970 * specified a cluster ID when running "ovsdb-tool join-cluster").
971 *
972 * Each cluster has a unique cluster ID. */
973 const struct uuid *
974 raft_get_cid(const struct raft *raft)
975 {
976 return &raft->cid;
977 }
978
979 /* Returns the server ID of 'raft'. Each server has a unique server ID. */
980 const struct uuid *
981 raft_get_sid(const struct raft *raft)
982 {
983 return &raft->sid;
984 }
985
986 /* Returns true if 'raft' has completed joining its cluster, has not left or
987 * initiated leaving the cluster, does not have failed disk storage, and is
988 * apparently connected to the leader in a healthy way (or is itself the
989 * leader).
990 *
991 * If 'raft' is candidate:
992 * a) if it is the first round of election, consider it as connected, hoping
993 * it will successfully elect a new leader soon.
994 * b) if it is already retrying, consider it as disconnected (so that clients
995 * may decide to reconnect to other members). */
996 bool
997 raft_is_connected(const struct raft *raft)
998 {
999 return (!(raft->role == RAFT_CANDIDATE && raft->candidate_retrying)
1000 && !raft->joining
1001 && !raft->leaving
1002 && !raft->left
1003 && !raft->failed);
1004 }
1005
1006 /* Returns true if 'raft' is the cluster leader. */
1007 bool
1008 raft_is_leader(const struct raft *raft)
1009 {
1010 return raft->role == RAFT_LEADER;
1011 }
1012
1013 /* Returns true if 'raft' is in the process of joining its cluster. */
1014 bool
1015 raft_is_joining(const struct raft *raft)
1016 {
1017 return raft->joining;
1018 }
1019
1020 /* Only returns *connected* connections. */
1021 static struct raft_conn *
1022 raft_find_conn_by_sid(struct raft *raft, const struct uuid *sid)
1023 {
1024 if (!uuid_is_zero(sid)) {
1025 struct raft_conn *conn;
1026 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1027 if (uuid_equals(sid, &conn->sid)
1028 && jsonrpc_session_is_connected(conn->js)) {
1029 return conn;
1030 }
1031 }
1032 }
1033 return NULL;
1034 }
1035
1036 static struct raft_conn *
1037 raft_find_conn_by_address(struct raft *raft, const char *address)
1038 {
1039 struct raft_conn *conn;
1040 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1041 if (!strcmp(jsonrpc_session_get_name(conn->js), address)) {
1042 return conn;
1043 }
1044 }
1045 return NULL;
1046 }
1047
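/* Appends a RAFT_REC_NOTE record carrying 'note' and a printf()-formatted
 * comment to the local log, ignoring any write error. */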
1048 static void OVS_PRINTF_FORMAT(3, 4)
1049 raft_record_note(struct raft *raft, const char *note,
1050 const char *comment_format, ...)
1051 {
1052 va_list args;
1053 va_start(args, comment_format);
1054 char *comment = xvasprintf(comment_format, args);
1055 va_end(args);
1056
1057 struct raft_record r = {
1058 .type = RAFT_REC_NOTE,
1059 .comment = comment,
1060 .note = CONST_CAST(char *, note),
1061 };
1062 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
1063
1064 free(comment);
1065 }
1066
1067 /* If we're leader, try to transfer leadership to another server, logging
1068 * 'reason' as the human-readable reason (it should be a phrase suitable for
1069 * following "because") . */
1070 void
1071 raft_transfer_leadership(struct raft *raft, const char *reason)
1072 {
1073 if (raft->role != RAFT_LEADER) {
1074 return;
1075 }
1076
1077 struct raft_server *s;
1078 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
1079 if (!uuid_equals(&raft->sid, &s->sid)
1080 && s->phase == RAFT_PHASE_STABLE) {
1081 struct raft_conn *conn = raft_find_conn_by_sid(raft, &s->sid);
1082 if (!conn) {
1083 continue;
1084 }
1085
1086 union raft_rpc rpc = {
1087 .become_leader = {
1088 .common = {
1089 .comment = CONST_CAST(char *, reason),
1090 .type = RAFT_RPC_BECOME_LEADER,
1091 .sid = s->sid,
1092 },
1093 .term = raft->term,
1094 }
1095 };
1096 raft_send_to_conn(raft, &rpc, conn);
1097
1098 raft_record_note(raft, "transfer leadership",
1099 "transferring leadership to %s because %s",
1100 s->nickname, reason);
1101 break;
1102 }
1103 }
1104 }
1105
1106 /* Send a RemoveServerRequest to the rest of the servers in the cluster.
1107 *
1108 * If we know which server is the leader, we can just send the request to it.
1109 * However, we might not know which server is the leader, and we might never
1110 * find out if the remove request was actually previously committed by a
1111 * majority of the servers (because in that case the new leader will not send
1112 * AppendRequests or heartbeats to us). Therefore, we instead send
1113 * RemoveRequests to every server. This theoretically has the same problem, if
1114 * the current cluster leader was not previously a member of the cluster, but
1115 * it seems likely to be more robust in practice. */
1116 static void
1117 raft_send_remove_server_requests(struct raft *raft)
1118 {
1119 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
1120 VLOG_INFO_RL(&rl, "sending remove request (joining=%s, leaving=%s)",
1121 raft->joining ? "true" : "false",
1122 raft->leaving ? "true" : "false");
1123 const struct raft_server *s;
1124 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
1125 if (!uuid_equals(&s->sid, &raft->sid)) {
1126 union raft_rpc rpc = (union raft_rpc) {
1127 .remove_server_request = {
1128 .common = {
1129 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
1130 .sid = s->sid,
1131 },
1132 .sid = raft->sid,
1133 },
1134 };
1135 raft_send(raft, &rpc);
1136 }
1137 }
1138
1139 raft->leave_timeout = time_msec() + ELECTION_BASE_MSEC;
1140 }
1141
1142 /* Attempts to start 'raft' leaving its cluster. The caller can check progress
1143 * using raft_is_leaving() and raft_left(). */
1144 void
1145 raft_leave(struct raft *raft)
1146 {
1147 if (raft->joining || raft->failed || raft->leaving || raft->left) {
1148 return;
1149 }
1150 VLOG_INFO(SID_FMT": starting to leave cluster "CID_FMT,
1151 SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
1152 raft->leaving = true;
1153 raft_transfer_leadership(raft, "this server is leaving the cluster");
1154 raft_become_follower(raft);
1155 raft_send_remove_server_requests(raft);
1156 raft->leave_timeout = time_msec() + ELECTION_BASE_MSEC;
1157 }
1158
1159 /* Returns true if 'raft' is currently attempting to leave its cluster. */
1160 bool
1161 raft_is_leaving(const struct raft *raft)
1162 {
1163 return raft->leaving;
1164 }
1165
1166 /* Returns true if 'raft' successfully left its cluster. */
1167 bool
1168 raft_left(const struct raft *raft)
1169 {
1170 return raft->left;
1171 }
1172
1173 /* Returns true if 'raft' has experienced a disk I/O failure. When this
1174 * returns true, only closing and reopening 'raft' allows for recovery. */
1175 bool
1176 raft_failed(const struct raft *raft)
1177 {
1178 return raft->failed;
1179 }
1180
1181 /* Forces 'raft' to attempt to take leadership of the cluster by deposing the
1182 * current cluster leader. */
1183 void
1184 raft_take_leadership(struct raft *raft)
1185 {
1186 if (raft->role != RAFT_LEADER) {
1187 raft_start_election(raft, true);
1188 }
1189 }
1190
1191 /* Closes everything owned by 'raft' that might be visible outside the process:
1192 * network connections, commands, etc. This is part of closing 'raft'; it is
1193 * also used if 'raft' has failed in an unrecoverable way. */
1194 static void
1195 raft_close__(struct raft *raft)
1196 {
1197 if (!hmap_node_is_null(&raft->hmap_node)) {
1198 hmap_remove(&all_rafts, &raft->hmap_node);
1199 hmap_node_nullify(&raft->hmap_node);
1200 }
1201
1202 raft_complete_all_commands(raft, RAFT_CMD_SHUTDOWN);
1203
1204 struct raft_server *rs = raft->remove_server;
1205 if (rs) {
1206 raft_send_remove_server_reply__(raft, &rs->sid, &rs->requester_sid,
1207 rs->requester_conn, false,
1208 RAFT_SERVER_SHUTDOWN);
1209 raft_server_destroy(raft->remove_server);
1210 raft->remove_server = NULL;
1211 }
1212
1213 struct raft_conn *conn, *next;
1214 LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
1215 raft_conn_close(conn);
1216 }
1217 }
1218
1219 /* Closes and frees 'raft'.
1220 *
1221 * A server's cluster membership is independent of whether the server is
1222 * actually running. When a server that is a member of a cluster closes, the
1223 * cluster treats this as a server failure. */
1224 void
1225 raft_close(struct raft *raft)
1226 {
1227 if (!raft) {
1228 return;
1229 }
1230
1231 raft_transfer_leadership(raft, "this server is shutting down");
1232
1233 raft_close__(raft);
1234
1235 ovsdb_log_close(raft->log);
1236
1237 raft_servers_destroy(&raft->servers);
1238
1239 for (uint64_t index = raft->log_start; index < raft->log_end; index++) {
1240 struct raft_entry *e = &raft->entries[index - raft->log_start];
1241 raft_entry_uninit(e);
1242 }
1243 free(raft->entries);
1244
1245 raft_entry_uninit(&raft->snap);
1246
1247 raft_waiters_destroy(raft);
1248
1249 raft_servers_destroy(&raft->add_servers);
1250
1251 hmap_destroy(&raft->commands);
1252
1253 pstream_close(raft->listener);
1254
1255 sset_destroy(&raft->remote_addresses);
1256 free(raft->local_address);
1257 free(raft->local_nickname);
1258 free(raft->name);
1259
1260 free(raft);
1261 }
1262
1263 static bool
1264 raft_conn_receive(struct raft *raft, struct raft_conn *conn,
1265 union raft_rpc *rpc)
1266 {
1267 struct jsonrpc_msg *msg = jsonrpc_session_recv(conn->js);
1268 if (!msg) {
1269 return false;
1270 }
1271
1272 struct ovsdb_error *error = raft_rpc_from_jsonrpc(&raft->cid, &raft->sid,
1273 msg, rpc);
1274 jsonrpc_msg_destroy(msg);
1275 if (error) {
1276 char *s = ovsdb_error_to_string_free(error);
1277 VLOG_INFO("%s: %s", jsonrpc_session_get_name(conn->js), s);
1278 free(s);
1279 return false;
1280 }
1281
1282 if (uuid_is_zero(&conn->sid)) {
1283 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
1284 conn->sid = rpc->common.sid;
1285 VLOG_INFO_RL(&rl, "%s: learned server ID "SID_FMT,
1286 jsonrpc_session_get_name(conn->js), SID_ARGS(&conn->sid));
1287 } else if (!uuid_equals(&conn->sid, &rpc->common.sid)) {
1288 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1289 VLOG_WARN_RL(&rl, "%s: ignoring message with unexpected server ID "
1290 SID_FMT" (expected "SID_FMT")",
1291 jsonrpc_session_get_name(conn->js),
1292 SID_ARGS(&rpc->common.sid), SID_ARGS(&conn->sid));
1293 raft_rpc_uninit(rpc);
1294 return false;
1295 }
1296
1297 const char *address = (rpc->type == RAFT_RPC_HELLO_REQUEST
1298 ? rpc->hello_request.address
1299 : rpc->type == RAFT_RPC_ADD_SERVER_REQUEST
1300 ? rpc->add_server_request.address
1301 : NULL);
1302 if (address) {
1303 char *new_nickname = raft_address_to_nickname(address, &conn->sid);
1304 if (strcmp(conn->nickname, new_nickname)) {
1305 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
1306 VLOG_INFO_RL(&rl, "%s: learned remote address %s",
1307 jsonrpc_session_get_name(conn->js), address);
1308
1309 free(conn->nickname);
1310 conn->nickname = new_nickname;
1311 } else {
1312 free(new_nickname);
1313 }
1314 }
1315
1316 return true;
1317 }
1318
1319 static const char *
1320 raft_get_nickname(const struct raft *raft, const struct uuid *sid,
1321 char buf[SID_LEN + 1], size_t bufsize)
1322 {
1323 if (uuid_equals(sid, &raft->sid)) {
1324 return raft->local_nickname;
1325 }
1326
1327 const char *s = raft_servers_get_nickname__(&raft->servers, sid);
1328 if (s) {
1329 return s;
1330 }
1331
1332 return raft_servers_get_nickname(&raft->add_servers, sid, buf, bufsize);
1333 }
1334
1335 static void
1336 log_rpc(const union raft_rpc *rpc, const char *direction,
1337 const struct raft_conn *conn, int line_number)
1338 {
1339 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(600, 600);
1340 if (!raft_rpc_is_heartbeat(rpc) && !VLOG_DROP_DBG(&rl)) {
1341 struct ds s = DS_EMPTY_INITIALIZER;
1342 if (line_number) {
1343 ds_put_format(&s, "raft.c:%d ", line_number);
1344 }
1345 ds_put_format(&s, "%s%s ", direction, conn->nickname);
1346 raft_rpc_format(rpc, &s);
1347 VLOG_DBG("%s", ds_cstr(&s));
1348 ds_destroy(&s);
1349 }
1350 }
1351
1352 static void
1353 raft_send_add_server_request(struct raft *raft, struct raft_conn *conn)
1354 {
1355 union raft_rpc rq = {
1356 .add_server_request = {
1357 .common = {
1358 .type = RAFT_RPC_ADD_SERVER_REQUEST,
1359 .sid = UUID_ZERO,
1360 .comment = NULL,
1361 },
1362 .address = raft->local_address,
1363 },
1364 };
1365 raft_send_to_conn(raft, &rq, conn);
1366 }
1367
1368 static void
1369 raft_conn_run(struct raft *raft, struct raft_conn *conn)
1370 {
1371 jsonrpc_session_run(conn->js);
1372
1373 unsigned int new_seqno = jsonrpc_session_get_seqno(conn->js);
1374 bool just_connected = (new_seqno != conn->js_seqno
1375 && jsonrpc_session_is_connected(conn->js));
1376 conn->js_seqno = new_seqno;
1377 if (just_connected) {
1378 if (raft->joining) {
1379 raft_send_add_server_request(raft, conn);
1380 } else if (raft->leaving) {
1381 union raft_rpc rq = {
1382 .remove_server_request = {
1383 .common = {
1384 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
1385 .sid = conn->sid,
1386 },
1387 .sid = raft->sid,
1388 },
1389 };
1390 raft_send_to_conn(raft, &rq, conn);
1391 } else {
1392 union raft_rpc rq = (union raft_rpc) {
1393 .hello_request = {
1394 .common = {
1395 .type = RAFT_RPC_HELLO_REQUEST,
1396 .sid = conn->sid,
1397 },
1398 .address = raft->local_address,
1399 },
1400 };
1401 raft_send_to_conn(raft, &rq, conn);
1402 }
1403 }
1404
1405 for (size_t i = 0; i < 50; i++) {
1406 union raft_rpc rpc;
1407 if (!raft_conn_receive(raft, conn, &rpc)) {
1408 break;
1409 }
1410
1411 log_rpc(&rpc, "<--", conn, 0);
1412 raft_handle_rpc(raft, &rpc);
1413 raft_rpc_uninit(&rpc);
1414 }
1415 }
1416
1417 static void
1418 raft_waiter_complete_rpc(struct raft *raft, const union raft_rpc *rpc)
1419 {
1420 uint64_t term = raft_rpc_get_term(rpc);
1421 if (term && term < raft->term) {
1422 /* Drop the message because it's for an expired term. */
1423 return;
1424 }
1425
1426 if (!raft_is_rpc_synced(raft, rpc)) {
1427 /* This is a bug. A reply message is deferred because some state in
1428 * the message, such as a term or index, has not been committed to
1429 * disk, and it should only be completed when that commit is done.
1430 * But this message is being completed before the commit is finished.
1431 * Complain, and hope that someone reports the bug. */
1432 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
1433 if (VLOG_DROP_ERR(&rl)) {
1434 return;
1435 }
1436
1437 struct ds s = DS_EMPTY_INITIALIZER;
1438
1439 if (term > raft->synced_term) {
1440 ds_put_format(&s, " because message term %"PRIu64" is "
1441 "past synced term %"PRIu64,
1442 term, raft->synced_term);
1443 }
1444
1445 uint64_t index = raft_rpc_get_min_sync_index(rpc);
1446 if (index > raft->log_synced) {
1447 ds_put_format(&s, " %s message index %"PRIu64" is past last "
1448 "synced index %"PRIu64,
1449 s.length ? "and" : "because",
1450 index, raft->log_synced);
1451 }
1452
1453 const struct uuid *vote = raft_rpc_get_vote(rpc);
1454 if (vote && !uuid_equals(vote, &raft->synced_vote)) {
1455 char buf1[SID_LEN + 1];
1456 char buf2[SID_LEN + 1];
1457 ds_put_format(&s, " %s vote %s differs from synced vote %s",
1458 s.length ? "and" : "because",
1459 raft_get_nickname(raft, vote, buf1, sizeof buf1),
1460 raft_get_nickname(raft, &raft->synced_vote,
1461 buf2, sizeof buf2));
1462 }
1463
1464 char buf[SID_LEN + 1];
1465 ds_put_format(&s, ": %s ",
1466 raft_get_nickname(raft, &rpc->common.sid,
1467 buf, sizeof buf));
1468 raft_rpc_format(rpc, &s);
1469 VLOG_ERR("internal error: deferred %s message completed "
1470 "but not ready to send%s",
1471 raft_rpc_type_to_string(rpc->type), ds_cstr(&s));
1472 ds_destroy(&s);
1473
1474 return;
1475 }
1476
1477 struct raft_conn *dst = raft_find_conn_by_sid(raft, &rpc->common.sid);
1478 if (dst) {
1479 raft_send_to_conn(raft, rpc, dst);
1480 }
1481 }
1482
1483 static void
1484 raft_waiter_complete(struct raft *raft, struct raft_waiter *w)
1485 {
1486 switch (w->type) {
1487 case RAFT_W_ENTRY:
1488 if (raft->role == RAFT_LEADER) {
1489 raft_update_our_match_index(raft, w->entry.index);
1490 }
1491 raft->log_synced = w->entry.index;
1492 break;
1493
1494 case RAFT_W_TERM:
1495 raft->synced_term = w->term.term;
1496 raft->synced_vote = w->term.vote;
1497 break;
1498
1499 case RAFT_W_RPC:
1500 raft_waiter_complete_rpc(raft, w->rpc);
1501 break;
1502 }
1503 }
1504
1505 static void
1506 raft_waiter_destroy(struct raft_waiter *w)
1507 {
1508 if (!w) {
1509 return;
1510 }
1511
1512 ovs_list_remove(&w->list_node);
1513
1514 switch (w->type) {
1515 case RAFT_W_ENTRY:
1516 case RAFT_W_TERM:
1517 break;
1518
1519 case RAFT_W_RPC:
1520 raft_rpc_uninit(w->rpc);
1521 free(w->rpc);
1522 break;
1523 }
1524 free(w);
1525 }
1526
1527 static void
1528 raft_waiters_run(struct raft *raft)
1529 {
1530 if (ovs_list_is_empty(&raft->waiters)) {
1531 return;
1532 }
1533
1534 uint64_t cur = ovsdb_log_commit_progress(raft->log);
1535 struct raft_waiter *w, *next;
1536 LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
1537 if (cur < w->commit_ticket) {
1538 break;
1539 }
1540 raft_waiter_complete(raft, w);
1541 raft_waiter_destroy(w);
1542 }
1543 }
1544
1545 static void
1546 raft_waiters_wait(struct raft *raft)
1547 {
1548 struct raft_waiter *w;
1549 LIST_FOR_EACH (w, list_node, &raft->waiters) {
1550 ovsdb_log_commit_wait(raft->log, w->commit_ticket);
1551 break;
1552 }
1553 }
1554
1555 static void
1556 raft_waiters_destroy(struct raft *raft)
1557 {
1558 struct raft_waiter *w, *next;
1559 LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
1560 raft_waiter_destroy(w);
1561 }
1562 }
1563
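/* Writes 'term' and 'vote' to the local log and starts committing them to
 * disk, updating the in-memory 'term' and 'vote' immediately ('synced_term'
 * and 'synced_vote' follow once the write commits).  Returns false if the
 * write failed. */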
1564 static bool OVS_WARN_UNUSED_RESULT
1565 raft_set_term(struct raft *raft, uint64_t term, const struct uuid *vote)
1566 {
1567 struct ovsdb_error *error = raft_write_state(raft->log, term, vote);
1568 if (!raft_handle_write_error(raft, error)) {
1569 return false;
1570 }
1571
1572 struct raft_waiter *w = raft_waiter_create(raft, RAFT_W_TERM, true);
1573 raft->term = w->term.term = term;
1574 raft->vote = w->term.vote = vote ? *vote : UUID_ZERO;
1575 return true;
1576 }
1577
1578 static void
1579 raft_accept_vote(struct raft *raft, struct raft_server *s,
1580 const struct uuid *vote)
1581 {
1582 if (uuid_equals(&s->vote, vote)) {
1583 return;
1584 }
1585 if (!uuid_is_zero(&s->vote)) {
1586 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
1587 char buf1[SID_LEN + 1];
1588 char buf2[SID_LEN + 1];
1589 VLOG_WARN_RL(&rl, "server %s changed its vote from %s to %s",
1590 s->nickname,
1591 raft_get_nickname(raft, &s->vote, buf1, sizeof buf1),
1592 raft_get_nickname(raft, vote, buf2, sizeof buf2));
1593 }
1594 s->vote = *vote;
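/* A majority requires strictly more than half of the servers in the cluster,
 * e.g. 2 of 3 or 3 of 4; the candidate's vote for itself counts as well. */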
1595 if (uuid_equals(vote, &raft->sid)
1596 && ++raft->n_votes > hmap_count(&raft->servers) / 2) {
1597 raft_become_leader(raft);
1598 }
1599 }
1600
1601 static void
1602 raft_start_election(struct raft *raft, bool leadership_transfer)
1603 {
1604 if (raft->leaving) {
1605 return;
1606 }
1607
1608 struct raft_server *me = raft_find_server(raft, &raft->sid);
1609 if (!me) {
1610 return;
1611 }
1612
1613 if (!raft_set_term(raft, raft->term + 1, &raft->sid)) {
1614 return;
1615 }
1616
1617 ovs_assert(raft->role != RAFT_LEADER);
1618 raft->candidate_retrying = (raft->role == RAFT_CANDIDATE);
1619 raft->role = RAFT_CANDIDATE;
1620
1621 raft->n_votes = 0;
1622
1623 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1624 if (!VLOG_DROP_INFO(&rl)) {
1625 long long int now = time_msec();
1626 if (now >= raft->election_timeout) {
1627 VLOG_INFO("term %"PRIu64": %lld ms timeout expired, "
1628 "starting election",
1629 raft->term, now - raft->election_base);
1630 } else {
1631 VLOG_INFO("term %"PRIu64": starting election", raft->term);
1632 }
1633 }
1634 raft_reset_election_timer(raft);
1635
1636 struct raft_server *peer;
1637 HMAP_FOR_EACH (peer, hmap_node, &raft->servers) {
1638 peer->vote = UUID_ZERO;
1639 if (uuid_equals(&raft->sid, &peer->sid)) {
1640 continue;
1641 }
1642
1643 union raft_rpc rq = {
1644 .vote_request = {
1645 .common = {
1646 .type = RAFT_RPC_VOTE_REQUEST,
1647 .sid = peer->sid,
1648 },
1649 .term = raft->term,
1650 .last_log_index = raft->log_end - 1,
1651 .last_log_term = (
1652 raft->log_end > raft->log_start
1653 ? raft->entries[raft->log_end - raft->log_start - 1].term
1654 : raft->snap.term),
1655 .leadership_transfer = leadership_transfer,
1656 },
1657 };
1658 raft_send(raft, &rq);
1659 }
1660
1661 /* Vote for ourselves. */
1662 raft_accept_vote(raft, me, &raft->sid);
1663 }
1664
1665 static void
1666 raft_open_conn(struct raft *raft, const char *address, const struct uuid *sid)
1667 {
1668 if (strcmp(address, raft->local_address)
1669 && !raft_find_conn_by_address(raft, address)) {
1670 raft_add_conn(raft, jsonrpc_session_open(address, true), sid, false);
1671 }
1672 }
1673
1674 static void
1675 raft_conn_close(struct raft_conn *conn)
1676 {
1677 jsonrpc_session_close(conn->js);
1678 ovs_list_remove(&conn->list_node);
1679 free(conn->nickname);
1680 free(conn);
1681 }
1682
1683 /* Returns true if 'conn' should stay open, false if it should be closed. */
1684 static bool
1685 raft_conn_should_stay_open(struct raft *raft, struct raft_conn *conn)
1686 {
1687 /* Close the connection if it's actually dead. If necessary, we'll
1688 * initiate a new session later. */
1689 if (!jsonrpc_session_is_alive(conn->js)) {
1690 return false;
1691 }
1692
1693 /* Keep incoming sessions. We trust the originator to decide to drop
1694 * them. */
1695 if (conn->incoming) {
1696 return true;
1697 }
1698
1699 /* If we are joining the cluster, keep sessions to the remote addresses
1700 * that are supposed to be part of the cluster we're joining. */
1701 if (raft->joining && sset_contains(&raft->remote_addresses,
1702 jsonrpc_session_get_name(conn->js))) {
1703 return true;
1704 }
1705
1706 /* We have joined the cluster. If we did that "recently", then there is a
1707 * chance that we do not have the most recent server configuration log
1708 * entry. If so, it's a waste to disconnect from the servers that were in
1709 * remote_addresses and that will probably appear in the configuration,
1710 * just to reconnect to them a moment later when we do get the
1711 * configuration update. If we are not ourselves in the configuration,
1712 * then we know that there must be a new configuration coming up, so in
1713 * that case keep the connection. */
1714 if (!raft_find_server(raft, &raft->sid)) {
1715 return true;
1716 }
1717
1718 /* Keep the connection only if the server is part of the configuration. */
1719 return raft_find_server(raft, &conn->sid);
1720 }
1721
1722 /* Allows 'raft' to maintain the distributed log. Call this function as part
1723 * of the process's main loop. */
1724 void
1725 raft_run(struct raft *raft)
1726 {
1727 if (raft->left || raft->failed) {
1728 return;
1729 }
1730
1731 raft_waiters_run(raft);
1732
1733 if (!raft->listener && time_msec() >= raft->listen_backoff) {
1734 char *paddr = raft_make_address_passive(raft->local_address);
1735 int error = pstream_open(paddr, &raft->listener, DSCP_DEFAULT);
1736 if (error) {
1737 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1738 VLOG_WARN_RL(&rl, "%s: listen failed (%s)",
1739 paddr, ovs_strerror(error));
1740 raft->listen_backoff = time_msec() + 1000;
1741 }
1742 free(paddr);
1743 }
1744
1745 if (raft->listener) {
1746 struct stream *stream;
1747 int error = pstream_accept(raft->listener, &stream);
1748 if (!error) {
1749 raft_add_conn(raft, jsonrpc_session_open_unreliably(
1750 jsonrpc_open(stream), DSCP_DEFAULT), NULL,
1751 true);
1752 } else if (error != EAGAIN) {
1753 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
1754 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
1755 pstream_get_name(raft->listener),
1756 ovs_strerror(error));
1757 }
1758 }
1759
1760 /* Run RPCs for all open sessions. */
1761 struct raft_conn *conn;
1762 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1763 raft_conn_run(raft, conn);
1764 }
1765
1766 /* Close unneeded sessions. */
1767 struct raft_conn *next;
1768 LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
1769 if (!raft_conn_should_stay_open(raft, conn)) {
1770 raft_conn_close(conn);
1771 }
1772 }
1773
1774 /* Open needed sessions. */
1775 struct raft_server *server;
1776 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1777 raft_open_conn(raft, server->address, &server->sid);
1778 }
1779 if (raft->joining) {
1780 const char *address;
1781 SSET_FOR_EACH (address, &raft->remote_addresses) {
1782 raft_open_conn(raft, address, NULL);
1783 }
1784 }
1785
1786 if (!raft->joining && time_msec() >= raft->election_timeout) {
1787 if (raft->role == RAFT_LEADER) {
1788 /* Check if a majority of followers replied; if so, reset
1789 * election_timeout and clear each server's 'replied' flag.
1790 * Otherwise, become a follower.
1791 *
1792 * Raft paper section 6.2: Leaders: A server might be in the leader
1793 * state, but if it isn’t the current leader, it could be
1794 * needlessly delaying client requests. For example, suppose a
1795 * leader is partitioned from the rest of the cluster, but it can
1796 * still communicate with a particular client. Without additional
1797 * mechanism, it could delay a request from that client forever,
1798 * being unable to replicate a log entry to any other servers.
1799 * Meanwhile, there might be another leader of a newer term that is
1800 * able to communicate with a majority of the cluster and would be
1801 * able to commit the client’s request. Thus, a leader in Raft
1802 * steps down if an election timeout elapses without a successful
1803 * round of heartbeats to a majority of its cluster; this allows
1804 * clients to retry their requests with another server. */
1805 int count = 0;
1806 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1807 if (server->replied) {
1808 count++;
1809 }
1810 }
1811 if (count >= hmap_count(&raft->servers) / 2) {
1812 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1813 server->replied = false;
1814 }
1815 raft_reset_election_timer(raft);
1816 } else {
1817 raft_become_follower(raft);
1818 raft_start_election(raft, false);
1819 }
1820 } else {
1821 raft_start_election(raft, false);
1822 }
1823
1824 }
1825
1826 if (raft->leaving && time_msec() >= raft->leave_timeout) {
1827 raft_send_remove_server_requests(raft);
1828 }
1829
1830 if (raft->joining && time_msec() >= raft->join_timeout) {
1831 raft->join_timeout = time_msec() + 1000;
1832 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1833 raft_send_add_server_request(raft, conn);
1834 }
1835 }
1836
1837 long long int now = time_msec();
1838 if (now >= raft->ping_timeout) {
1839 if (raft->role == RAFT_LEADER) {
1840 raft_send_heartbeats(raft);
1841 }
1842 /* Check whether any commands have timed out. The timeout is set to
1843 * twice the election base time so that commands can complete properly
1844 * during leader election. E.g. if a leader crashes and the current
1845 * node with pending commands becomes the new leader, the pending
1846 * commands can still complete, provided the crashed leader replicated
1847 * the transactions to a majority of followers before it crashed. */
1848 struct raft_command *cmd, *next_cmd;
1849 HMAP_FOR_EACH_SAFE (cmd, next_cmd, hmap_node, &raft->commands) {
1850 if (cmd->timestamp
1851 && now - cmd->timestamp > ELECTION_BASE_MSEC * 2) {
1852 raft_command_complete(raft, cmd, RAFT_CMD_TIMEOUT);
1853 }
1854 }
1855 raft_reset_ping_timer(raft);
1856 }
1857
1858 /* Do this only at the end; if we did it as soon as we set raft->left or
1859 * raft->failed in handling the RemoveServerReply, then it could easily
1860 * cause references to freed memory in RPC sessions, etc. */
1861 if (raft->left || raft->failed) {
1862 raft_close__(raft);
1863 }
1864 }
1865
1866 static void
1867 raft_wait_session(struct jsonrpc_session *js)
1868 {
1869 if (js) {
1870 jsonrpc_session_wait(js);
1871 jsonrpc_session_recv_wait(js);
1872 }
1873 }
1874
1875 /* Causes the next call to poll_block() to wake up when 'raft' needs to do
1876 * something. */
1877 void
1878 raft_wait(struct raft *raft)
1879 {
1880 if (raft->left || raft->failed) {
1881 return;
1882 }
1883
1884 raft_waiters_wait(raft);
1885
1886 if (raft->listener) {
1887 pstream_wait(raft->listener);
1888 } else {
1889 poll_timer_wait_until(raft->listen_backoff);
1890 }
1891
1892 struct raft_conn *conn;
1893 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1894 raft_wait_session(conn->js);
1895 }
1896
1897 if (!raft->joining) {
1898 poll_timer_wait_until(raft->election_timeout);
1899 } else {
1900 poll_timer_wait_until(raft->join_timeout);
1901 }
1902 if (raft->leaving) {
1903 poll_timer_wait_until(raft->leave_timeout);
1904 }
1905 if (raft->role == RAFT_LEADER || !hmap_is_empty(&raft->commands)) {
1906 poll_timer_wait_until(raft->ping_timeout);
1907 }
1908 }
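
/* Illustrative sketch (not part of this file) of how a caller such as
 * ovsdb-server might drive raft_run() and raft_wait() from its poll loop:
 * raft_run() advances the Raft state machine, raft_wait() registers wakeup
 * conditions, and poll_block() sleeps until there is something to do.
 * do_database_work() is a hypothetical placeholder for whatever processing
 * the caller performs between runs:
 *
 *     for (;;) {
 *         raft_run(raft);
 *         do_database_work(raft);
 *         raft_wait(raft);
 *         poll_block();
 *     }
 */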
1909
1910 static struct raft_waiter *
1911 raft_waiter_create(struct raft *raft, enum raft_waiter_type type,
1912 bool start_commit)
1913 {
1914 struct raft_waiter *w = xzalloc(sizeof *w);
1915 ovs_list_push_back(&raft->waiters, &w->list_node);
1916 w->commit_ticket = start_commit ? ovsdb_log_commit_start(raft->log) : 0;
1917 w->type = type;
1918 return w;
1919 }
1920
1921 /* Returns a human-readable representation of 'status' (or NULL if 'status' is
1922 * invalid). */
1923 const char *
1924 raft_command_status_to_string(enum raft_command_status status)
1925 {
1926 switch (status) {
1927 case RAFT_CMD_INCOMPLETE:
1928 return "operation still in progress";
1929 case RAFT_CMD_SUCCESS:
1930 return "success";
1931 case RAFT_CMD_NOT_LEADER:
1932 return "not leader";
1933 case RAFT_CMD_BAD_PREREQ:
1934 return "prerequisite check failed";
1935 case RAFT_CMD_LOST_LEADERSHIP:
1936 return "lost leadership";
1937 case RAFT_CMD_SHUTDOWN:
1938 return "server shutdown";
1939 case RAFT_CMD_IO_ERROR:
1940 return "I/O error";
1941 case RAFT_CMD_TIMEOUT:
1942 return "timeout";
1943 default:
1944 return NULL;
1945 }
1946 }
1947
1948 /* Converts human-readable status in 's' into status code in '*statusp'.
1949 * Returns true if successful, false if 's' is unknown. */
1950 bool
1951 raft_command_status_from_string(const char *s,
1952 enum raft_command_status *statusp)
1953 {
1954 for (enum raft_command_status status = 0; ; status++) {
1955 const char *s2 = raft_command_status_to_string(status);
1956 if (!s2) {
1957 *statusp = 0;
1958 return false;
1959 } else if (!strcmp(s, s2)) {
1960 *statusp = status;
1961 return true;
1962 }
1963 }
1964 }
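
/* For example (an illustrative sanity check, not code used elsewhere in this
 * file), the two functions above round-trip each other:
 *
 *     enum raft_command_status status;
 *     ovs_assert(raft_command_status_from_string("not leader", &status));
 *     ovs_assert(status == RAFT_CMD_NOT_LEADER);
 *     ovs_assert(!raft_command_status_from_string("no such status", &status));
 */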
1965
1966 static const struct uuid *
1967 raft_get_eid(const struct raft *raft, uint64_t index)
1968 {
1969 for (; index >= raft->log_start; index--) {
1970 const struct raft_entry *e = raft_get_entry(raft, index);
1971 if (e->data) {
1972 return &e->eid;
1973 }
1974 }
1975 return &raft->snap.eid;
1976 }
1977
1978 const struct uuid *
1979 raft_current_eid(const struct raft *raft)
1980 {
1981 return raft_get_eid(raft, raft->log_end - 1);
1982 }
1983
1984 static struct raft_command *
1985 raft_command_create_completed(enum raft_command_status status)
1986 {
1987 ovs_assert(status != RAFT_CMD_INCOMPLETE);
1988
1989 struct raft_command *cmd = xzalloc(sizeof *cmd);
1990 cmd->n_refs = 1;
1991 cmd->status = status;
1992 return cmd;
1993 }
1994
1995 static struct raft_command *
1996 raft_command_create_incomplete(struct raft *raft, uint64_t index)
1997 {
1998 struct raft_command *cmd = xzalloc(sizeof *cmd);
1999 cmd->n_refs = 2; /* One for client, one for raft->commands. */
2000 cmd->index = index;
2001 cmd->status = RAFT_CMD_INCOMPLETE;
2002 hmap_insert(&raft->commands, &cmd->hmap_node, cmd->index);
2003 return cmd;
2004 }
2005
2006 static struct raft_command * OVS_WARN_UNUSED_RESULT
2007 raft_command_initiate(struct raft *raft,
2008 const struct json *data, const struct json *servers,
2009 const struct uuid *eid)
2010 {
2011 /* Write to local log. */
2012 uint64_t index = raft->log_end;
2013 if (!raft_handle_write_error(
2014 raft, raft_write_entry(
2015 raft, raft->term, json_nullable_clone(data), eid,
2016 json_nullable_clone(servers)))) {
2017 return raft_command_create_completed(RAFT_CMD_IO_ERROR);
2018 }
2019
2020 struct raft_command *cmd = raft_command_create_incomplete(raft, index);
2021 ovs_assert(eid);
2022 cmd->eid = *eid;
2023 cmd->timestamp = time_msec();
2024
2025 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index = cmd->index;
2026
2027 if (failure_test == FT_CRASH_BEFORE_SEND_APPEND_REQ) {
2028 ovs_fatal(0, "Raft test: crash before sending append_request.");
2029 }
2030 /* Write to remote logs. */
2031 struct raft_server *s;
2032 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2033 if (!uuid_equals(&s->sid, &raft->sid) && s->next_index == index) {
2034 raft_send_append_request(raft, s, 1, "execute command");
2035 s->next_index++;
2036 }
2037 }
2038 if (failure_test == FT_CRASH_AFTER_SEND_APPEND_REQ) {
2039 ovs_fatal(0, "Raft test: crash after sending append_request.");
2040 }
2041 raft_reset_ping_timer(raft);
2042
2043 return cmd;
2044 }
2045
2046 static void
2047 log_all_commands(struct raft *raft)
2048 {
2049 struct raft_command *cmd, *next;
2050 HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
2051 VLOG_DBG("raft command eid: "UUID_FMT, UUID_ARGS(&cmd->eid));
2052 }
2053 }
2054
2055 static struct raft_command * OVS_WARN_UNUSED_RESULT
2056 raft_command_execute__(struct raft *raft,
2057 const struct json *data, const struct json *servers,
2058 const struct uuid *prereq, struct uuid *result)
2059 {
2060 if (raft->joining || raft->leaving || raft->left || raft->failed) {
2061 return raft_command_create_completed(RAFT_CMD_SHUTDOWN);
2062 }
2063
2064 if (raft->role != RAFT_LEADER) {
2065 /* Consider proxying the command to the leader. We can only do that if
2066 * we know the leader and the command does not change the set of
2067 * servers. We do not proxy commands without prerequisites, even
2068 * though we could, because in an OVSDB context a log entry doesn't
2069 * make sense without context. */
2070 if (servers || !data
2071 || raft->role != RAFT_FOLLOWER || uuid_is_zero(&raft->leader_sid)
2072 || !prereq) {
2073 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
2074 }
2075 }
2076
2077 struct uuid eid = data ? uuid_random() : UUID_ZERO;
2078 if (result) {
2079 *result = eid;
2080 }
2081
2082 if (raft->role != RAFT_LEADER) {
2083 const union raft_rpc rpc = {
2084 .execute_command_request = {
2085 .common = {
2086 .type = RAFT_RPC_EXECUTE_COMMAND_REQUEST,
2087 .sid = raft->leader_sid,
2088 },
2089 .data = CONST_CAST(struct json *, data),
2090 .prereq = *prereq,
2091 .result = eid,
2092 }
2093 };
2094 if (failure_test == FT_CRASH_BEFORE_SEND_EXEC_REQ) {
2095 ovs_fatal(0, "Raft test: crash before sending "
2096 "execute_command_request");
2097 }
2098 if (!raft_send(raft, &rpc)) {
2099 /* Couldn't send command, so it definitely failed. */
2100 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
2101 }
2102 if (failure_test == FT_CRASH_AFTER_SEND_EXEC_REQ) {
2103 ovs_fatal(0, "Raft test: crash after sending "
2104 "execute_command_request");
2105 }
2106
2107 struct raft_command *cmd = raft_command_create_incomplete(raft, 0);
2108 cmd->timestamp = time_msec();
2109 cmd->eid = eid;
2110 log_all_commands(raft);
2111 return cmd;
2112 }
2113
2114 const struct uuid *current_eid = raft_current_eid(raft);
2115 if (prereq && !uuid_equals(prereq, current_eid)) {
2116 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
2117 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
2118 "prerequisite "UUID_FMT,
2119 UUID_ARGS(current_eid), UUID_ARGS(prereq));
2120 return raft_command_create_completed(RAFT_CMD_BAD_PREREQ);
2121 }
2122
2123 return raft_command_initiate(raft, data, servers, &eid);
2124 }
2125
2126 /* Initiates appending a log entry to 'raft'. The log entry consists of
2127 * 'data'. If 'prereq' is nonnull, the entry is only added to the log if the
2128 * previous entry in the log has entry ID 'prereq'. If 'result' is nonnull,
2129 * it is populated with the entry ID for the new log entry.
2130 *
2131 * Returns a "struct raft_command" that may be used to track progress adding
2132 * the log entry. The caller must eventually free the returned structure, with
2133 * raft_command_unref(). */
2134 struct raft_command * OVS_WARN_UNUSED_RESULT
2135 raft_command_execute(struct raft *raft, const struct json *data,
2136 const struct uuid *prereq, struct uuid *result)
2137 {
2138 return raft_command_execute__(raft, data, NULL, prereq, result);
2139 }
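
/* Illustrative sketch (not a verbatim excerpt from any caller) of how the
 * command interface above is typically used: execute a command, drive the
 * poll loop until it completes, then check the status and drop the
 * reference. 'data' is assumed to be a json object prepared by the caller
 * and handle_failure() is a hypothetical placeholder:
 *
 *     struct raft_command *cmd = raft_command_execute(raft, data, NULL, NULL);
 *     while (raft_command_get_status(cmd) == RAFT_CMD_INCOMPLETE) {
 *         raft_run(raft);
 *         raft_command_wait(cmd);
 *         raft_wait(raft);
 *         poll_block();
 *     }
 *     enum raft_command_status status = raft_command_get_status(cmd);
 *     raft_command_unref(cmd);
 *     if (status != RAFT_CMD_SUCCESS) {
 *         handle_failure(raft_command_status_to_string(status));
 *     }
 */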
2140
2141 /* Returns the status of 'cmd'. */
2142 enum raft_command_status
2143 raft_command_get_status(const struct raft_command *cmd)
2144 {
2145 ovs_assert(cmd->n_refs > 0);
2146 return cmd->status;
2147 }
2148
2149 /* Returns the index of the log entry at which 'cmd' was committed.
2150 *
2151 * This function works only with successful commands. */
2152 uint64_t
2153 raft_command_get_commit_index(const struct raft_command *cmd)
2154 {
2155 ovs_assert(cmd->n_refs > 0);
2156 ovs_assert(cmd->status == RAFT_CMD_SUCCESS);
2157 return cmd->index;
2158 }
2159
2160 /* Frees 'cmd'. */
2161 void
2162 raft_command_unref(struct raft_command *cmd)
2163 {
2164 if (cmd) {
2165 ovs_assert(cmd->n_refs > 0);
2166 if (!--cmd->n_refs) {
2167 free(cmd);
2168 }
2169 }
2170 }
2171
2172 /* Causes poll_block() to wake up when 'cmd' has status to report. */
2173 void
2174 raft_command_wait(const struct raft_command *cmd)
2175 {
2176 if (cmd->status != RAFT_CMD_INCOMPLETE) {
2177 poll_immediate_wake();
2178 }
2179 }
2180
2181 static void
2182 raft_command_complete(struct raft *raft,
2183 struct raft_command *cmd,
2184 enum raft_command_status status)
2185 {
2186 VLOG_DBG("raft_command_complete eid "UUID_FMT" status: %s",
2187 UUID_ARGS(&cmd->eid), raft_command_status_to_string(status));
2188 if (!uuid_is_zero(&cmd->sid)) {
2189 uint64_t commit_index = status == RAFT_CMD_SUCCESS ? cmd->index : 0;
2190 raft_send_execute_command_reply(raft, &cmd->sid, &cmd->eid, status,
2191 commit_index);
2192 }
2193
2194 ovs_assert(cmd->status == RAFT_CMD_INCOMPLETE);
2195 ovs_assert(cmd->n_refs > 0);
2196 hmap_remove(&raft->commands, &cmd->hmap_node);
2197 cmd->status = status;
2198 raft_command_unref(cmd);
2199 }
2200
2201 static void
2202 raft_complete_all_commands(struct raft *raft, enum raft_command_status status)
2203 {
2204 struct raft_command *cmd, *next;
2205 HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
2206 raft_command_complete(raft, cmd, status);
2207 }
2208 }
2209
2210 static struct raft_command *
2211 raft_find_command_by_eid(struct raft *raft, const struct uuid *eid)
2212 {
2213 struct raft_command *cmd;
2214
2215 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2216 if (uuid_equals(&cmd->eid, eid)) {
2217 return cmd;
2218 }
2219 }
2220 return NULL;
2221 }
2222 \f
2223 #define RAFT_RPC(ENUM, NAME) \
2224 static void raft_handle_##NAME(struct raft *, const struct raft_##NAME *);
2225 RAFT_RPC_TYPES
2226 #undef RAFT_RPC
2227
2228 static void
2229 raft_handle_hello_request(struct raft *raft OVS_UNUSED,
2230 const struct raft_hello_request *hello OVS_UNUSED)
2231 {
2232 }
2233
2234 /* 'sid' is the server being added. */
2235 static void
2236 raft_send_add_server_reply__(struct raft *raft, const struct uuid *sid,
2237 const char *address,
2238 bool success, const char *comment)
2239 {
2240 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2241 if (!VLOG_DROP_INFO(&rl)) {
2242 struct ds s = DS_EMPTY_INITIALIZER;
2243 char buf[SID_LEN + 1];
2244 ds_put_format(&s, "adding %s ("SID_FMT" at %s) "
2245 "to cluster "CID_FMT" %s",
2246 raft_get_nickname(raft, sid, buf, sizeof buf),
2247 SID_ARGS(sid), address, CID_ARGS(&raft->cid),
2248 success ? "succeeded" : "failed");
2249 if (comment) {
2250 ds_put_format(&s, " (%s)", comment);
2251 }
2252 VLOG_INFO("%s", ds_cstr(&s));
2253 ds_destroy(&s);
2254 }
2255
2256 union raft_rpc rpy = {
2257 .add_server_reply = {
2258 .common = {
2259 .type = RAFT_RPC_ADD_SERVER_REPLY,
2260 .sid = *sid,
2261 .comment = CONST_CAST(char *, comment),
2262 },
2263 .success = success,
2264 }
2265 };
2266
2267 struct sset *remote_addresses = &rpy.add_server_reply.remote_addresses;
2268 sset_init(remote_addresses);
2269 if (!raft->joining) {
2270 struct raft_server *s;
2271 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2272 if (!uuid_equals(&s->sid, &raft->sid)) {
2273 sset_add(remote_addresses, s->address);
2274 }
2275 }
2276 }
2277
2278 raft_send(raft, &rpy);
2279
2280 sset_destroy(remote_addresses);
2281 }
2282
2283 static void
2284 raft_send_remove_server_reply_rpc(struct raft *raft,
2285 const struct uuid *dst_sid,
2286 const struct uuid *target_sid,
2287 bool success, const char *comment)
2288 {
2289 if (uuid_equals(&raft->sid, dst_sid)) {
2290 if (success && uuid_equals(&raft->sid, target_sid)) {
2291 raft_finished_leaving_cluster(raft);
2292 }
2293 return;
2294 }
2295
2296 const union raft_rpc rpy = {
2297 .remove_server_reply = {
2298 .common = {
2299 .type = RAFT_RPC_REMOVE_SERVER_REPLY,
2300 .sid = *dst_sid,
2301 .comment = CONST_CAST(char *, comment),
2302 },
2303 .target_sid = (uuid_equals(dst_sid, target_sid)
2304 ? UUID_ZERO
2305 : *target_sid),
2306 .success = success,
2307 }
2308 };
2309 raft_send(raft, &rpy);
2310 }
2311
2312 static void
2313 raft_send_remove_server_reply__(struct raft *raft,
2314 const struct uuid *target_sid,
2315 const struct uuid *requester_sid,
2316 struct unixctl_conn *requester_conn,
2317 bool success, const char *comment)
2318 {
2319 struct ds s = DS_EMPTY_INITIALIZER;
2320 ds_put_format(&s, "request ");
2321 if (!uuid_is_zero(requester_sid)) {
2322 char buf[SID_LEN + 1];
2323 ds_put_format(&s, "by %s",
2324 raft_get_nickname(raft, requester_sid, buf, sizeof buf));
2325 } else {
2326 ds_put_cstr(&s, "via unixctl");
2327 }
2328 ds_put_cstr(&s, " to remove ");
2329 if (!requester_conn && uuid_equals(target_sid, requester_sid)) {
2330 ds_put_cstr(&s, "itself");
2331 } else {
2332 char buf[SID_LEN + 1];
2333 ds_put_cstr(&s, raft_get_nickname(raft, target_sid, buf, sizeof buf));
2334 if (uuid_equals(target_sid, &raft->sid)) {
2335 ds_put_cstr(&s, " (ourselves)");
2336 }
2337 }
2338 ds_put_format(&s, " from cluster "CID_FMT" %s",
2339 CID_ARGS(&raft->cid),
2340 success ? "succeeded" : "failed");
2341 if (comment) {
2342 ds_put_format(&s, " (%s)", comment);
2343 }
2344
2345 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2346 VLOG_INFO_RL(&rl, "%s", ds_cstr(&s));
2347
2348 /* Send RemoveServerReply to the requester (which could be a server or a
2349 * unixctl connection). Also always send it to the removed server; this
2350 * allows it to be sure that it has really been removed, update its log,
2351 * and disconnect permanently. */
2352 if (!uuid_is_zero(requester_sid)) {
2353 raft_send_remove_server_reply_rpc(raft, requester_sid, target_sid,
2354 success, comment);
2355 }
2356 if (!uuid_equals(requester_sid, target_sid)) {
2357 raft_send_remove_server_reply_rpc(raft, target_sid, target_sid,
2358 success, comment);
2359 }
2360 if (requester_conn) {
2361 if (success) {
2362 unixctl_command_reply(requester_conn, ds_cstr(&s));
2363 } else {
2364 unixctl_command_reply_error(requester_conn, ds_cstr(&s));
2365 }
2366 }
2367
2368 ds_destroy(&s);
2369 }
2370
2371 static void
2372 raft_send_add_server_reply(struct raft *raft,
2373 const struct raft_add_server_request *rq,
2374 bool success, const char *comment)
2375 {
2376 return raft_send_add_server_reply__(raft, &rq->common.sid, rq->address,
2377 success, comment);
2378 }
2379
2380 static void
2381 raft_send_remove_server_reply(struct raft *raft,
2382 const struct raft_remove_server_request *rq,
2383 bool success, const char *comment)
2384 {
2385 return raft_send_remove_server_reply__(raft, &rq->sid, &rq->common.sid,
2386 rq->requester_conn, success,
2387 comment);
2388 }
2389
2390 static void
2391 raft_become_follower(struct raft *raft)
2392 {
2393 raft->leader_sid = UUID_ZERO;
2394 if (raft->role == RAFT_FOLLOWER) {
2395 return;
2396 }
2397
2398 raft->role = RAFT_FOLLOWER;
2399 raft_reset_election_timer(raft);
2400
2401 /* Notify clients about lost leadership.
2402 *
2403 * We do not reverse our changes to 'raft->servers' because the new
2404 * configuration is already part of the log. Possibly the configuration
2405 * log entry will not be committed, but until we know that we must use the
2406 * new configuration. Our AppendEntries processing will properly update
2407 * the server configuration later, if necessary. */
2408 struct raft_server *s;
2409 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
2410 raft_send_add_server_reply__(raft, &s->sid, s->address, false,
2411 RAFT_SERVER_LOST_LEADERSHIP);
2412 }
2413 if (raft->remove_server) {
2414 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
2415 &raft->remove_server->requester_sid,
2416 raft->remove_server->requester_conn,
2417 false, RAFT_SERVER_LOST_LEADERSHIP);
2418 raft_server_destroy(raft->remove_server);
2419 raft->remove_server = NULL;
2420 }
2421
2422 raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
2423 }
2424
2425 static void
2426 raft_send_append_request(struct raft *raft,
2427 struct raft_server *peer, unsigned int n,
2428 const char *comment)
2429 {
2430 ovs_assert(raft->role == RAFT_LEADER);
2431
2432 const union raft_rpc rq = {
2433 .append_request = {
2434 .common = {
2435 .type = RAFT_RPC_APPEND_REQUEST,
2436 .sid = peer->sid,
2437 .comment = CONST_CAST(char *, comment),
2438 },
2439 .term = raft->term,
2440 .prev_log_index = peer->next_index - 1,
2441 .prev_log_term = (peer->next_index - 1 >= raft->log_start
2442 ? raft->entries[peer->next_index - 1
2443 - raft->log_start].term
2444 : raft->snap.term),
2445 .leader_commit = raft->commit_index,
2446 .entries = &raft->entries[peer->next_index - raft->log_start],
2447 .n_entries = n,
2448 },
2449 };
2450 raft_send(raft, &rq);
2451 }
2452
2453 static void
2454 raft_send_heartbeats(struct raft *raft)
2455 {
2456 struct raft_server *s;
2457 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2458 if (!uuid_equals(&raft->sid, &s->sid)) {
2459 raft_send_append_request(raft, s, 0, "heartbeat");
2460 }
2461 }
2462
2463 /* Send a ping to anyone waiting for a command to complete, to let
2464 * them know we're still working on it. */
2465 struct raft_command *cmd;
2466 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2467 if (!uuid_is_zero(&cmd->sid)) {
2468 raft_send_execute_command_reply(raft, &cmd->sid,
2469 &cmd->eid,
2470 RAFT_CMD_INCOMPLETE, 0);
2471 }
2472 }
2473
2474 raft_reset_ping_timer(raft);
2475 }
2476
2477 /* Initializes the fields in 's' that represent the leader's view of the
2478 * server. */
2479 static void
2480 raft_server_init_leader(struct raft *raft, struct raft_server *s)
2481 {
2482 s->next_index = raft->log_end;
2483 s->match_index = 0;
2484 s->phase = RAFT_PHASE_STABLE;
2485 s->replied = false;
2486 }
2487
2488 static void
2489 raft_become_leader(struct raft *raft)
2490 {
2491 log_all_commands(raft);
2492
2493 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
2494 VLOG_INFO_RL(&rl, "term %"PRIu64": elected leader by %d+ of "
2495 "%"PRIuSIZE" servers", raft->term,
2496 raft->n_votes, hmap_count(&raft->servers));
2497
2498 ovs_assert(raft->role != RAFT_LEADER);
2499 raft->role = RAFT_LEADER;
2500 raft->leader_sid = raft->sid;
2501 raft_reset_election_timer(raft);
2502 raft_reset_ping_timer(raft);
2503
2504 struct raft_server *s;
2505 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2506 raft_server_init_leader(raft, s);
2507 }
2508
2509 raft_update_our_match_index(raft, raft->log_end - 1);
2510 raft_send_heartbeats(raft);
2511
2512 /* Write the fact that we are leader to the log. This is not used by the
2513 * algorithm (although it could be, for quick restart), but it is used for
2514 * offline analysis to check for conformance with the properties that Raft
2515 * guarantees. */
2516 struct raft_record r = {
2517 .type = RAFT_REC_LEADER,
2518 .term = raft->term,
2519 .sid = raft->sid,
2520 };
2521 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2522
2523 /* Initiate a no-op commit. Otherwise we might never find out what's in
2524 * the log. See section 6.4 item 1:
2525 *
2526 * The Leader Completeness Property guarantees that a leader has all
2527 * committed entries, but at the start of its term, it may not know
2528 * which those are. To find out, it needs to commit an entry from its
2529 * term. Raft handles this by having each leader commit a blank no-op
2530 * entry into the log at the start of its term. As soon as this no-op
2531 * entry is committed, the leader’s commit index will be at least as
2532 * large as any other servers’ during its term.
2533 */
2534 raft_command_unref(raft_command_execute__(raft, NULL, NULL, NULL, NULL));
2535 }
2536
2537 /* Processes term 'term' received as part of RPC 'common'. Returns true if the
2538 * caller should continue processing the RPC, false if the caller should reject
2539 * it due to a stale term. */
2540 static bool
2541 raft_receive_term__(struct raft *raft, const struct raft_rpc_common *common,
2542 uint64_t term)
2543 {
2544 /* Section 3.3 says:
2545 *
2546 * Current terms are exchanged whenever servers communicate; if one
2547 * server’s current term is smaller than the other’s, then it updates
2548 * its current term to the larger value. If a candidate or leader
2549 * discovers that its term is out of date, it immediately reverts to
2550 * follower state. If a server receives a request with a stale term
2551 * number, it rejects the request.
2552 */
2553 if (term > raft->term) {
2554 if (!raft_set_term(raft, term, NULL)) {
2555 /* Failed to update the term to 'term'. */
2556 return false;
2557 }
2558 raft_become_follower(raft);
2559 } else if (term < raft->term) {
2560 char buf[SID_LEN + 1];
2561 VLOG_INFO("rejecting term %"PRIu64" < current term %"PRIu64" received "
2562 "in %s message from server %s",
2563 term, raft->term,
2564 raft_rpc_type_to_string(common->type),
2565 raft_get_nickname(raft, &common->sid, buf, sizeof buf));
2566 return false;
2567 }
2568 return true;
2569 }
2570
2571 static void
2572 raft_get_servers_from_log(struct raft *raft, enum vlog_level level)
2573 {
2574 const struct json *servers_json = raft->snap.servers;
2575 for (uint64_t index = raft->log_end - 1; index >= raft->log_start;
2576 index--) {
2577 struct raft_entry *e = &raft->entries[index - raft->log_start];
2578 if (e->servers) {
2579 servers_json = e->servers;
2580 break;
2581 }
2582 }
2583
2584 struct hmap servers;
2585 struct ovsdb_error *error = raft_servers_from_json(servers_json, &servers);
2586 ovs_assert(!error);
2587 raft_set_servers(raft, &servers, level);
2588 raft_servers_destroy(&servers);
2589 }
2590
2591 /* Truncates the log, so that raft->log_end becomes 'new_end'.
2592 *
2593 * Doesn't write anything to disk. In theory, we could truncate the on-disk
2594 * log file, but we don't have the right information to know how long it should
2595 * be. What we actually do is to append entries for older indexes to the
2596 * on-disk log; when we re-read it later, these entries truncate the log.
2597 *
2598 * Returns true if any of the removed log entries were server configuration
2599 * entries, false otherwise. */
2600 static bool
2601 raft_truncate(struct raft *raft, uint64_t new_end)
2602 {
2603 ovs_assert(new_end >= raft->log_start);
2604 if (raft->log_end > new_end) {
2605 char buf[SID_LEN + 1];
2606 VLOG_INFO("%s truncating %"PRIu64 " entries from end of log",
2607 raft_get_nickname(raft, &raft->sid, buf, sizeof buf),
2608 raft->log_end - new_end);
2609 }
2610
2611 bool servers_changed = false;
2612 while (raft->log_end > new_end) {
2613 struct raft_entry *entry = &raft->entries[--raft->log_end
2614 - raft->log_start];
2615 if (entry->servers) {
2616 servers_changed = true;
2617 }
2618 raft_entry_uninit(entry);
2619 }
2620 return servers_changed;
2621 }
2622
2623 static const struct json *
2624 raft_peek_next_entry(struct raft *raft, struct uuid *eid)
2625 {
2626 /* Invariant: log_start - 2 <= last_applied <= commit_index < log_end. */
2627 ovs_assert(raft->log_start <= raft->last_applied + 2);
2628 ovs_assert(raft->last_applied <= raft->commit_index);
2629 ovs_assert(raft->commit_index < raft->log_end);
2630
2631 if (raft->joining || raft->failed) {
2632 return NULL;
2633 }
2634
2635 if (raft->log_start == raft->last_applied + 2) {
2636 *eid = raft->snap.eid;
2637 return raft->snap.data;
2638 }
2639
2640 while (raft->last_applied < raft->commit_index) {
2641 const struct raft_entry *e = raft_get_entry(raft,
2642 raft->last_applied + 1);
2643 if (e->data) {
2644 *eid = e->eid;
2645 return e->data;
2646 }
2647 raft->last_applied++;
2648 }
2649 return NULL;
2650 }
2651
2652 static const struct json *
2653 raft_get_next_entry(struct raft *raft, struct uuid *eid)
2654 {
2655 const struct json *data = raft_peek_next_entry(raft, eid);
2656 if (data) {
2657 raft->last_applied++;
2658 }
2659 return data;
2660 }
2661
2662 /* Updates the commit index in the raft log. If the commit index is already
2663 * up-to-date, it does nothing and returns false; otherwise, returns true. */
2664 static bool
2665 raft_update_commit_index(struct raft *raft, uint64_t new_commit_index)
2666 {
2667 if (new_commit_index <= raft->commit_index) {
2668 return false;
2669 }
2670
2671 if (raft->role == RAFT_LEADER) {
2672 while (raft->commit_index < new_commit_index) {
2673 uint64_t index = ++raft->commit_index;
2674 const struct raft_entry *e = raft_get_entry(raft, index);
2675 if (e->data) {
2676 struct raft_command *cmd
2677 = raft_find_command_by_eid(raft, &e->eid);
2678 if (cmd) {
2679 if (!cmd->index) {
2680 VLOG_DBG("Command completed after role change from"
2681 " follower to leader "UUID_FMT,
2682 UUID_ARGS(&e->eid));
2683 cmd->index = index;
2684 }
2685 raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
2686 }
2687 }
2688 if (e->servers) {
2689 /* raft_run_reconfigure() can write a new Raft entry, which can
2690 * reallocate raft->entries, which would invalidate 'e', so
2691 * this case must be last, after the one for 'e->data'. */
2692 raft_run_reconfigure(raft);
2693 }
2694 }
2695 } else {
2696 raft->commit_index = new_commit_index;
2697 /* Check if any pending command can be completed, and complete it.
2698 * This can happen when the leader fails over before sending the
2699 * execute_command_reply. */
2700 const struct uuid *eid = raft_get_eid(raft, new_commit_index);
2701 struct raft_command *cmd = raft_find_command_by_eid(raft, eid);
2702 if (cmd) {
2703 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
2704 VLOG_INFO_RL(&rl,
2705 "Command completed without reply (eid: "UUID_FMT", "
2706 "commit index: %"PRIu64")",
2707 UUID_ARGS(eid), new_commit_index);
2708 cmd->index = new_commit_index;
2709 raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
2710 }
2711 }
2712
2713 /* Write the commit index to the log. The next time we restart, this
2714 * allows us to start exporting a reasonably fresh log, instead of a log
2715 * that only contains the snapshot. */
2716 struct raft_record r = {
2717 .type = RAFT_REC_COMMIT_INDEX,
2718 .commit_index = raft->commit_index,
2719 };
2720 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2721 return true;
2722 }
2723
2724 /* This doesn't use rq->entries (but it does use rq->n_entries). */
2725 static void
2726 raft_send_append_reply(struct raft *raft, const struct raft_append_request *rq,
2727 enum raft_append_result result, const char *comment)
2728 {
2729 /* Figure 3.1: "If leaderCommit > commitIndex, set commitIndex =
2730 * min(leaderCommit, index of last new entry)" */
2731 if (result == RAFT_APPEND_OK && rq->leader_commit > raft->commit_index) {
2732 raft_update_commit_index(
2733 raft, MIN(rq->leader_commit, rq->prev_log_index + rq->n_entries));
2734 }
2735
2736 /* Send reply. */
2737 union raft_rpc reply = {
2738 .append_reply = {
2739 .common = {
2740 .type = RAFT_RPC_APPEND_REPLY,
2741 .sid = rq->common.sid,
2742 .comment = CONST_CAST(char *, comment),
2743 },
2744 .term = raft->term,
2745 .log_end = raft->log_end,
2746 .prev_log_index = rq->prev_log_index,
2747 .prev_log_term = rq->prev_log_term,
2748 .n_entries = rq->n_entries,
2749 .result = result,
2750 }
2751 };
2752 raft_send(raft, &reply);
2753 }
2754
2755 /* If 'prev_log_index' exists in 'raft''s log, in term 'prev_log_term', returns
2756 * NULL. Otherwise, returns an explanation for the mismatch. */
2757 static const char *
2758 match_index_and_term(const struct raft *raft,
2759 uint64_t prev_log_index, uint64_t prev_log_term)
2760 {
2761 if (prev_log_index < raft->log_start - 1) {
2762 return "mismatch before start of log";
2763 } else if (prev_log_index == raft->log_start - 1) {
2764 if (prev_log_term != raft->snap.term) {
2765 return "prev_term mismatch";
2766 }
2767 } else if (prev_log_index < raft->log_end) {
2768 if (raft->entries[prev_log_index - raft->log_start].term
2769 != prev_log_term) {
2770 return "term mismatch";
2771 }
2772 } else {
2773 /* prev_log_index >= raft->log_end */
2774 return "mismatch past end of log";
2775 }
2776 return NULL;
2777 }
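
/* For example, with hypothetical values log_start == 5, log_end == 9, and
 * snap.term == 3 in the function above: prev_log_index 3 is a "mismatch
 * before start of log"; prev_log_index 4 matches only if prev_log_term == 3
 * (the snapshot's term); prev_log_index 6 matches only if prev_log_term
 * equals entries[1].term; and prev_log_index 9 or above is a "mismatch past
 * end of log". */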
2778
2779 static void
2780 raft_handle_append_entries(struct raft *raft,
2781 const struct raft_append_request *rq,
2782 uint64_t prev_log_index, uint64_t prev_log_term,
2783 const struct raft_entry *entries,
2784 unsigned int n_entries)
2785 {
2786 /* Section 3.5: "When sending an AppendEntries RPC, the leader includes
2787 * the index and term of the entry in its log that immediately precedes
2788 * the new entries. If the follower does not find an entry in its log
2789 * with the same index and term, then it refuses the new entries." */
2790 const char *mismatch = match_index_and_term(raft, prev_log_index,
2791 prev_log_term);
2792 if (mismatch) {
2793 VLOG_INFO("rejecting append_request because previous entry "
2794 "%"PRIu64",%"PRIu64" not in local log (%s)",
2795 prev_log_term, prev_log_index, mismatch);
2796 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY, mismatch);
2797 return;
2798 }
2799
2800 /* Figure 3.1: "If an existing entry conflicts with a new one (same
2801 * index but different terms), delete the existing entry and all that
2802 * follow it." */
2803 unsigned int i;
2804 bool servers_changed = false;
2805 for (i = 0; ; i++) {
2806 if (i >= n_entries) {
2807 /* No change. */
2808 if (rq->common.comment
2809 && !strcmp(rq->common.comment, "heartbeat")) {
2810 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "heartbeat");
2811 } else {
2812 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
2813 }
2814 return;
2815 }
2816
2817 uint64_t log_index = (prev_log_index + 1) + i;
2818 if (log_index >= raft->log_end) {
2819 break;
2820 }
2821 if (raft->entries[log_index - raft->log_start].term
2822 != entries[i].term) {
2823 if (raft_truncate(raft, log_index)) {
2824 servers_changed = true;
2825 }
2826 break;
2827 }
2828 }
2829
2830 if (failure_test == FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE) {
2831 ovs_fatal(0, "Raft test: crash after receiving append_request with "
2832 "update.");
2833 }
2834 /* Figure 3.1: "Append any entries not already in the log." */
2835 struct ovsdb_error *error = NULL;
2836 bool any_written = false;
2837 for (; i < n_entries; i++) {
2838 const struct raft_entry *e = &entries[i];
2839 error = raft_write_entry(raft, e->term,
2840 json_nullable_clone(e->data), &e->eid,
2841 json_nullable_clone(e->servers));
2842 if (error) {
2843 break;
2844 }
2845 any_written = true;
2846 if (e->servers) {
2847 servers_changed = true;
2848 }
2849 }
2850
2851 if (any_written) {
2852 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index
2853 = raft->log_end - 1;
2854 }
2855 if (servers_changed) {
2856 /* The set of servers might have changed; check. */
2857 raft_get_servers_from_log(raft, VLL_INFO);
2858 }
2859
2860 if (error) {
2861 char *s = ovsdb_error_to_string_free(error);
2862 VLOG_ERR("%s", s);
2863 free(s);
2864 raft_send_append_reply(raft, rq, RAFT_APPEND_IO_ERROR, "I/O error");
2865 return;
2866 }
2867
2868 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "log updated");
2869 }
2870
2871 static bool
2872 raft_update_leader(struct raft *raft, const struct uuid *sid)
2873 {
2874 if (raft->role == RAFT_LEADER) {
2875 char buf[SID_LEN + 1];
2876 VLOG_ERR("this server is leader but server %s claims to be",
2877 raft_get_nickname(raft, sid, buf, sizeof buf));
2878 return false;
2879 } else if (!uuid_equals(sid, &raft->leader_sid)) {
2880 if (!uuid_is_zero(&raft->leader_sid)) {
2881 char buf1[SID_LEN + 1];
2882 char buf2[SID_LEN + 1];
2883 VLOG_ERR("leader for term %"PRIu64" changed from %s to %s",
2884 raft->term,
2885 raft_get_nickname(raft, &raft->leader_sid,
2886 buf1, sizeof buf1),
2887 raft_get_nickname(raft, sid, buf2, sizeof buf2));
2888 } else {
2889 char buf[SID_LEN + 1];
2890 VLOG_INFO("server %s is leader for term %"PRIu64,
2891 raft_get_nickname(raft, sid, buf, sizeof buf),
2892 raft->term);
2893 }
2894 raft->leader_sid = *sid;
2895
2896 /* Record the leader to the log. This is not used by the algorithm
2897 * (although it could be, for quick restart), but it is used for
2898 * offline analysis to check for conformance with the properties
2899 * that Raft guarantees. */
2900 struct raft_record r = {
2901 .type = RAFT_REC_LEADER,
2902 .term = raft->term,
2903 .sid = *sid,
2904 };
2905 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2906 }
2907 return true;
2908 }
2909
2910 static void
2911 raft_handle_append_request(struct raft *raft,
2912 const struct raft_append_request *rq)
2913 {
2914 /* We do not check whether the server that sent the request is part of the
2915 * cluster. As section 4.1 says, "A server accepts AppendEntries requests
2916 * from a leader that is not part of the server’s latest configuration.
2917 * Otherwise, a new server could never be added to the cluster (it would
2918 * never accept any log entries preceding the configuration entry that adds
2919 * the server)." */
2920 if (!raft_update_leader(raft, &rq->common.sid)) {
2921 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
2922 "usurped leadership");
2923 return;
2924 }
2925 raft_reset_election_timer(raft);
2926
2927 /* First check for the common case, where the AppendEntries request is
2928 * entirely for indexes covered by 'log_start' ... 'log_end - 1', something
2929 * like this:
2930 *
2931 * rq->prev_log_index
2932 * | first_entry_index
2933 * | | nth_entry_index
2934 * | | |
2935 * v v v
2936 * +---+---+---+---+
2937 * T | T | T | T | T |
2938 * +---+-------+---+
2939 * +---+---+---+---+
2940 * T | T | T | T | T |
2941 * +---+---+---+---+
2942 * ^ ^
2943 * | |
2944 * log_start log_end
2945 * */
2946 uint64_t first_entry_index = rq->prev_log_index + 1;
2947 uint64_t nth_entry_index = rq->prev_log_index + rq->n_entries;
2948 if (OVS_LIKELY(first_entry_index >= raft->log_start)) {
2949 raft_handle_append_entries(raft, rq,
2950 rq->prev_log_index, rq->prev_log_term,
2951 rq->entries, rq->n_entries);
2952 return;
2953 }
2954
2955 /* Now a series of checks for odd cases, where the AppendEntries request
2956 * extends earlier than the beginning of our log, into the log entries
2957 * discarded by the most recent snapshot. */
2958
2959 /*
2960 * Handle the case where the indexes covered by rq->entries[] are entirely
2961 * disjoint from 'log_start - 1' ... 'log_end - 1', as shown below. So,
2962 * everything in the AppendEntries request must already have been
2963 * committed, and we might as well reply with success.
2964 *
2965 * rq->prev_log_index
2966 * | first_entry_index
2967 * | | nth_entry_index
2968 * | | |
2969 * v v v
2970 * +---+---+---+---+
2971 * T | T | T | T | T |
2972 * +---+-------+---+
2973 * +---+---+---+---+
2974 * T | T | T | T | T |
2975 * +---+---+---+---+
2976 * ^ ^
2977 * | |
2978 * log_start log_end
2979 */
2980 if (nth_entry_index < raft->log_start - 1) {
2981 raft_send_append_reply(raft, rq, RAFT_APPEND_OK,
2982 "append before log start");
2983 return;
2984 }
2985
2986 /*
2987 * Handle the case where the last entry in rq->entries[] has the same index
2988 * as 'log_start - 1', so we can compare their terms:
2989 *
2990 * rq->prev_log_index
2991 * | first_entry_index
2992 * | | nth_entry_index
2993 * | | |
2994 * v v v
2995 * +---+---+---+---+
2996 * T | T | T | T | T |
2997 * +---+-------+---+
2998 * +---+---+---+---+
2999 * T | T | T | T | T |
3000 * +---+---+---+---+
3001 * ^ ^
3002 * | |
3003 * log_start log_end
3004 *
3005 * There's actually a sub-case where rq->n_entries == 0, in which case we
3006 * compare rq->prev_log_term instead:
3007 *
3008 * rq->prev_log_index
3009 * |
3010 * |
3011 * |
3012 * v
3013 * T
3014 *
3015 * +---+---+---+---+
3016 * T | T | T | T | T |
3017 * +---+---+---+---+
3018 * ^ ^
3019 * | |
3020 * log_start log_end
3021 */
3022 if (nth_entry_index == raft->log_start - 1) {
3023 if (rq->n_entries
3024 ? raft->snap.term == rq->entries[rq->n_entries - 1].term
3025 : raft->snap.term == rq->prev_log_term) {
3026 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
3027 } else {
3028 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
3029 "term mismatch");
3030 }
3031 return;
3032 }
3033
3034 /*
3035 * We now know that the data in rq->entries[] overlaps the data in
3036 * raft->entries[], as shown below, with some positive 'ofs':
3037 *
3038 * rq->prev_log_index
3039 * | first_entry_index
3040 * | | nth_entry_index
3041 * | | |
3042 * v v v
3043 * +---+---+---+---+---+
3044 * T | T | T | T | T | T |
3045 * +---+-------+---+---+
3046 * +---+---+---+---+
3047 * T | T | T | T | T |
3048 * +---+---+---+---+
3049 * ^ ^
3050 * | |
3051 * log_start log_end
3052 *
3053 * |<-- ofs -->|
3054 *
3055 * We transform this into the following by trimming the first 'ofs'
3056 * elements off of rq->entries[]. Notice how we retain the term but not
3057 * the data for rq->entries[ofs - 1]:
3058 *
3059 * first_entry_index + ofs - 1
3060 * | first_entry_index + ofs
3061 * | | nth_entry_index + ofs
3062 * | | |
3063 * v v v
3064 * +---+---+
3065 * T | T | T |
3066 * +---+---+
3067 * +---+---+---+---+
3068 * T | T | T | T | T |
3069 * +---+---+---+---+
3070 * ^ ^
3071 * | |
3072 * log_start log_end
3073 */
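/* For example (hypothetical numbers, for illustration only): if
 * raft->log_start == 10 and rq->prev_log_index == 6, then
 * first_entry_index == 7 and ofs == 3, so we call
 * raft_handle_append_entries() with prev_log_index == 9 (log_start - 1),
 * prev_log_term taken from rq->entries[2].term, and the remaining
 * rq->n_entries - 3 entries starting at rq->entries[3]. */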
3074 uint64_t ofs = raft->log_start - first_entry_index;
3075 raft_handle_append_entries(raft, rq,
3076 raft->log_start - 1, rq->entries[ofs - 1].term,
3077 &rq->entries[ofs], rq->n_entries - ofs);
3078 }
3079
3080 /* Returns true if 'raft' has another log entry or snapshot to read. */
3081 bool
3082 raft_has_next_entry(const struct raft *raft_)
3083 {
3084 struct raft *raft = CONST_CAST(struct raft *, raft_);
3085 struct uuid eid;
3086 return raft_peek_next_entry(raft, &eid) != NULL;
3087 }
3088
3089 /* Returns the next log entry or snapshot from 'raft', or NULL if there are
3090 * none left to read. Stores the entry ID of the log entry in '*eid'. Stores
3091 * true in '*is_snapshot' if the returned data is a snapshot, false if it is a
3092 * log entry. */
3093 const struct json *
3094 raft_next_entry(struct raft *raft, struct uuid *eid, bool *is_snapshot)
3095 {
3096 const struct json *data = raft_get_next_entry(raft, eid);
3097 *is_snapshot = data == raft->snap.data;
3098 return data;
3099 }
3100
3101 /* Returns the log index of the last-read snapshot or log entry. */
3102 uint64_t
3103 raft_get_applied_index(const struct raft *raft)
3104 {
3105 return raft->last_applied;
3106 }
3107
3108 /* Returns the log index of the last snapshot or log entry that is available to
3109 * be read. */
3110 uint64_t
3111 raft_get_commit_index(const struct raft *raft)
3112 {
3113 return raft->commit_index;
3114 }
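
/* Illustrative sketch (hypothetical caller code, not an excerpt from
 * ovsdb-server) of how the read interface above is typically consumed:
 * drain everything committed so far and apply it to the caller's own state,
 * here represented by the placeholder apply_entry():
 *
 *     while (raft_has_next_entry(raft)) {
 *         struct uuid eid;
 *         bool is_snapshot;
 *         const struct json *data = raft_next_entry(raft, &eid, &is_snapshot);
 *         apply_entry(data, &eid, is_snapshot);
 *     }
 */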
3115
3116 static struct raft_server *
3117 raft_find_peer(struct raft *raft, const struct uuid *uuid)
3118 {
3119 struct raft_server *s = raft_find_server(raft, uuid);
3120 return s && !uuid_equals(&raft->sid, &s->sid) ? s : NULL;
3121 }
3122
3123 static struct raft_server *
3124 raft_find_new_server(struct raft *raft, const struct uuid *uuid)
3125 {
3126 return raft_server_find(&raft->add_servers, uuid);
3127 }
3128
3129 /* Figure 3.1: "If there exists an N such that N > commitIndex, a
3130 * majority of matchIndex[i] >= N, and log[N].term == currentTerm, set
3131 * commitIndex = N (sections 3.5 and 3.6)." */
3132 static void
3133 raft_consider_updating_commit_index(struct raft *raft)
3134 {
3135 /* This loop cannot just bail out when it comes across a log entry that
3136 * does not match the criteria. For example, Figure 3.7(d2) shows a
3137 * case where the log entry for term 2 cannot be committed directly
3138 * (because it is not for the current term) but it can be committed as
3139 * a side effect of committing the entry for term 4 (the current term).
3140 * XXX Is there a more efficient way to do this? */
3141 ovs_assert(raft->role == RAFT_LEADER);
3142
3143 uint64_t new_commit_index = raft->commit_index;
3144 for (uint64_t idx = MAX(raft->commit_index + 1, raft->log_start);
3145 idx < raft->log_end; idx++) {
3146 if (raft->entries[idx - raft->log_start].term == raft->term) {
3147 size_t count = 0;
3148 struct raft_server *s2;
3149 HMAP_FOR_EACH (s2, hmap_node, &raft->servers) {
3150 if (s2->match_index >= idx) {
3151 count++;
3152 }
3153 }
3154 if (count > hmap_count(&raft->servers) / 2) {
3155 VLOG_DBG("index %"PRIu64" committed to %"PRIuSIZE" servers, "
3156 "applying", idx, count);
3157 new_commit_index = idx;
3158 }
3159 }
3160 }
3161 if (raft_update_commit_index(raft, new_commit_index)) {
3162 raft_send_heartbeats(raft);
3163 }
3164 }
3165
3166 static void
3167 raft_update_match_index(struct raft *raft, struct raft_server *s,
3168 uint64_t min_index)
3169 {
3170 ovs_assert(raft->role == RAFT_LEADER);
3171 if (min_index > s->match_index) {
3172 s->match_index = min_index;
3173 raft_consider_updating_commit_index(raft);
3174 }
3175 }
3176
3177 static void
3178 raft_update_our_match_index(struct raft *raft, uint64_t min_index)
3179 {
3180 struct raft_server *server = raft_find_server(raft, &raft->sid);
3181 if (server) {
3182 raft_update_match_index(raft, server, min_index);
3183 }
3184 }
3185
3186 static void
3187 raft_send_install_snapshot_request(struct raft *raft,
3188 const struct raft_server *s,
3189 const char *comment)
3190 {
3191 union raft_rpc rpc = {
3192 .install_snapshot_request = {
3193 .common = {
3194 .type = RAFT_RPC_INSTALL_SNAPSHOT_REQUEST,
3195 .sid = s->sid,
3196 .comment = CONST_CAST(char *, comment),
3197 },
3198 .term = raft->term,
3199 .last_index = raft->log_start - 1,
3200 .last_term = raft->snap.term,
3201 .last_servers = raft->snap.servers,
3202 .last_eid = raft->snap.eid,
3203 .data = raft->snap.data,
3204 }
3205 };
3206 raft_send(raft, &rpc);
3207 }
3208
3209 static void
3210 raft_handle_append_reply(struct raft *raft,
3211 const struct raft_append_reply *rpy)
3212 {
3213 if (raft->role != RAFT_LEADER) {
3214 VLOG_INFO("rejected append_reply (not leader)");
3215 return;
3216 }
3217
3218 /* Most commonly we'd be getting an AppendEntries reply from a configured
3219 * server (e.g. a peer), but we can also get them from servers in the
3220 * process of being added. */
3221 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3222 if (!s) {
3223 s = raft_find_new_server(raft, &rpy->common.sid);
3224 if (!s) {
3225 VLOG_INFO("rejected append_reply from unknown server "SID_FMT,
3226 SID_ARGS(&rpy->common.sid));
3227 return;
3228 }
3229 }
3230
3231 s->replied = true;
3232 if (rpy->result == RAFT_APPEND_OK) {
3233 /* Figure 3.1: "If successful, update nextIndex and matchIndex for
3234 * follower (section 3.5)." */
3235 uint64_t min_index = rpy->prev_log_index + rpy->n_entries + 1;
3236 if (s->next_index < min_index) {
3237 s->next_index = min_index;
3238 }
3239 raft_update_match_index(raft, s, min_index - 1);
3240 } else {
3241 /* Figure 3.1: "If AppendEntries fails because of log inconsistency,
3242 * decrement nextIndex and retry (section 3.5)."
3243 *
3244 * We also implement the optimization suggested in section 4.2.1:
3245 * "Various approaches can make nextIndex converge to its correct value
3246 * more quickly, including those described in Chapter 3. The simplest
3247 * approach to solving this particular problem of adding a new server,
3248 * however, is to have followers return the length of their logs in the
3249 * AppendEntries response; this allows the leader to cap the follower’s
3250 * nextIndex accordingly." */
3251 s->next_index = (s->next_index > 0
3252 ? MIN(s->next_index - 1, rpy->log_end)
3253 : 0);
3254
3255 if (rpy->result == RAFT_APPEND_IO_ERROR) {
3256 /* Append failed but not because of a log inconsistency. Because
3257 * of the I/O error, there's no point in re-sending the append
3258 * immediately. */
3259 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3260 VLOG_INFO_RL(&rl, "%s reported I/O error", s->nickname);
3261 return;
3262 }
3263 }
3264
3265 /*
3266 * Our behavior here must depend on the value of next_index relative to
3267 * log_start and log_end. There are three cases:
3268 *
3269 * Case 1 | Case 2 | Case 3
3270 * <---------------->|<------------->|<------------------>
3271 * | |
3272 *
3273 * +---+---+---+---+
3274 * T | T | T | T | T |
3275 * +---+---+---+---+
3276 * ^ ^
3277 * | |
3278 * log_start log_end
3279 */
3280 if (s->next_index < raft->log_start) {
3281 /* Case 1. */
3282 raft_send_install_snapshot_request(raft, s, NULL);
3283 } else if (s->next_index < raft->log_end) {
3284 /* Case 2. */
3285 raft_send_append_request(raft, s, 1, NULL);
3286 } else {
3287 /* Case 3. */
3288 if (s->phase == RAFT_PHASE_CATCHUP) {
3289 s->phase = RAFT_PHASE_CAUGHT_UP;
3290 raft_run_reconfigure(raft);
3291 }
3292 }
3293 }
3294
3295 static bool
3296 raft_should_suppress_disruptive_server(struct raft *raft,
3297 const union raft_rpc *rpc)
3298 {
3299 if (rpc->type != RAFT_RPC_VOTE_REQUEST) {
3300 return false;
3301 }
3302
3303 /* Section 4.2.3 "Disruptive Servers" says:
3304 *
3305 * ...if a server receives a RequestVote request within the minimum
3306 * election timeout of hearing from a current leader, it does not update
3307 * its term or grant its vote...
3308 *
3309 * ...This change conflicts with the leadership transfer mechanism as
3310 * described in Chapter 3, in which a server legitimately starts an
3311 * election without waiting an election timeout. In that case,
3312 * RequestVote messages should be processed by other servers even when
3313 * they believe a current cluster leader exists. Those RequestVote
3314 * requests can include a special flag to indicate this behavior (“I
3315 * have permission to disrupt the leader--it told me to!”).
3316 *
3317 * This clearly describes how the followers should act, but not the leader.
3318 * We just ignore vote requests that arrive at a current leader. This
3319 * seems to be fairly safe, since a majority other than the current leader
3320 * can still elect a new leader and the first AppendEntries from that new
3321 * leader will depose the current leader. */
3322 const struct raft_vote_request *rq = raft_vote_request_cast(rpc);
3323 if (rq->leadership_transfer) {
3324 return false;
3325 }
3326
3327 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3328 long long int now = time_msec();
3329 switch (raft->role) {
3330 case RAFT_LEADER:
3331 VLOG_WARN_RL(&rl, "ignoring vote request received as leader");
3332 return true;
3333
3334 case RAFT_FOLLOWER:
3335 if (now < raft->election_base + ELECTION_BASE_MSEC) {
3336 VLOG_WARN_RL(&rl, "ignoring vote request received after only "
3337 "%lld ms (minimum election time is %d ms)",
3338 now - raft->election_base, ELECTION_BASE_MSEC);
3339 return true;
3340 }
3341 return false;
3342
3343 case RAFT_CANDIDATE:
3344 return false;
3345
3346 default:
3347 OVS_NOT_REACHED();
3348 }
3349 }
3350
3351 /* Returns true if a reply should be sent. */
3352 static bool
3353 raft_handle_vote_request__(struct raft *raft,
3354 const struct raft_vote_request *rq)
3355 {
3356 /* Figure 3.1: "If votedFor is null or candidateId, and candidate's log is
3357 * at least as up-to-date as receiver's log, grant vote (sections 3.4,
3358 * 3.6)." */
3359 if (uuid_equals(&raft->vote, &rq->common.sid)) {
3360 /* Already voted for this candidate in this term. Resend vote. */
3361 return true;
3362 } else if (!uuid_is_zero(&raft->vote)) {
3363 /* Already voted for a different candidate in this term. Send a reply
3364 * saying what candidate we did vote for. This isn't a necessary part
3365 * of the Raft protocol but it can make debugging easier. */
3366 return true;
3367 }
3368
3369 /* Section 3.6.1: "The RequestVote RPC implements this restriction: the RPC
3370 * includes information about the candidate’s log, and the voter denies its
3371 * vote if its own log is more up-to-date than that of the candidate. Raft
3372 * determines which of two logs is more up-to-date by comparing the index
3373 * and term of the last entries in the logs. If the logs have last entries
3374 * with different terms, then the log with the later term is more
3375 * up-to-date. If the logs end with the same term, then whichever log is
3376 * longer is more up-to-date." */
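/* For example (hypothetical values): a local log ending in (term 5, index 7)
 * is more up-to-date than a candidate reporting last_log_term 4 and
 * last_log_index 9, so the vote is withheld; if both logs end in term 5,
 * the one with the larger last index is more up-to-date. */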
3377 uint64_t last_term = (raft->log_end > raft->log_start
3378 ? raft->entries[raft->log_end - 1
3379 - raft->log_start].term
3380 : raft->snap.term);
3381 if (last_term > rq->last_log_term
3382 || (last_term == rq->last_log_term
3383 && raft->log_end - 1 > rq->last_log_index)) {
3384 /* Our log is more up-to-date than the peer's. Withhold vote. */
3385 return false;
3386 }
3387
3388 /* Record a vote for the peer. */
3389 if (!raft_set_term(raft, raft->term, &rq->common.sid)) {
3390 return false;
3391 }
3392
3393 raft_reset_election_timer(raft);
3394
3395 return true;
3396 }
3397
3398 static void
3399 raft_send_vote_reply(struct raft *raft, const struct uuid *dst,
3400 const struct uuid *vote)
3401 {
3402 union raft_rpc rpy = {
3403 .vote_reply = {
3404 .common = {
3405 .type = RAFT_RPC_VOTE_REPLY,
3406 .sid = *dst,
3407 },
3408 .term = raft->term,
3409 .vote = *vote,
3410 },
3411 };
3412 raft_send(raft, &rpy);
3413 }
3414
3415 static void
3416 raft_handle_vote_request(struct raft *raft,
3417 const struct raft_vote_request *rq)
3418 {
3419 if (raft_handle_vote_request__(raft, rq)) {
3420 raft_send_vote_reply(raft, &rq->common.sid, &raft->vote);
3421 }
3422 }
3423
3424 static void
3425 raft_handle_vote_reply(struct raft *raft,
3426 const struct raft_vote_reply *rpy)
3427 {
3428 if (!raft_receive_term__(raft, &rpy->common, rpy->term)) {
3429 return;
3430 }
3431
3432 if (raft->role != RAFT_CANDIDATE) {
3433 return;
3434 }
3435
3436 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3437 if (s) {
3438 raft_accept_vote(raft, s, &rpy->vote);
3439 }
3440 }
3441
3442 /* Returns true if 'raft''s log contains reconfiguration entries that have not
3443 * yet been committed. */
3444 static bool
3445 raft_has_uncommitted_configuration(const struct raft *raft)
3446 {
3447 for (uint64_t i = raft->commit_index + 1; i < raft->log_end; i++) {
3448 ovs_assert(i >= raft->log_start);
3449 const struct raft_entry *e = &raft->entries[i - raft->log_start];
3450 if (e->servers) {
3451 return true;
3452 }
3453 }
3454 return false;
3455 }
3456
3457 static void
3458 raft_log_reconfiguration(struct raft *raft)
3459 {
3460 struct json *servers_json = raft_servers_to_json(&raft->servers);
3461 raft_command_unref(raft_command_execute__(
3462 raft, NULL, servers_json, NULL, NULL));
3463 json_destroy(servers_json);
3464 }
3465
3466 static void
3467 raft_run_reconfigure(struct raft *raft)
3468 {
3469 ovs_assert(raft->role == RAFT_LEADER);
3470
3471 /* Reconfiguration only progresses when configuration changes commit. */
3472 if (raft_has_uncommitted_configuration(raft)) {
3473 return;
3474 }
3475
3476 /* If we were waiting for a configuration change to commit, it's done. */
3477 struct raft_server *s;
3478 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3479 if (s->phase == RAFT_PHASE_COMMITTING) {
3480 raft_send_add_server_reply__(raft, &s->sid, s->address,
3481 true, RAFT_SERVER_COMPLETED);
3482 s->phase = RAFT_PHASE_STABLE;
3483 }
3484 }
3485 if (raft->remove_server) {
3486 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
3487 &raft->remove_server->requester_sid,
3488 raft->remove_server->requester_conn,
3489 true, RAFT_SERVER_COMPLETED);
3490 raft_server_destroy(raft->remove_server);
3491 raft->remove_server = NULL;
3492 }
3493
3494 /* If a new server is caught up, add it to the configuration. */
3495 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
3496 if (s->phase == RAFT_PHASE_CAUGHT_UP) {
3497 /* Move 's' from 'raft->add_servers' to 'raft->servers'. */
3498 hmap_remove(&raft->add_servers, &s->hmap_node);
3499 hmap_insert(&raft->servers, &s->hmap_node, uuid_hash(&s->sid));
3500
3501 /* Mark 's' as waiting for commit. */
3502 s->phase = RAFT_PHASE_COMMITTING;
3503
3504 raft_log_reconfiguration(raft);
3505
3506 /* When commit completes we'll transition to RAFT_PHASE_STABLE and
3507 * send a RAFT_SERVER_COMPLETED reply. */
3508
3509 return;
3510 }
3511 }
3512
3513 /* Remove a server, if one is scheduled for removal. */
3514 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3515 if (s->phase == RAFT_PHASE_REMOVE) {
3516 hmap_remove(&raft->servers, &s->hmap_node);
3517 raft->remove_server = s;
3518
3519 raft_log_reconfiguration(raft);
3520
3521 return;
3522 }
3523 }
3524 }
3525
3526 static void
3527 raft_handle_add_server_request(struct raft *raft,
3528 const struct raft_add_server_request *rq)
3529 {
3530 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3531 if (raft->role != RAFT_LEADER) {
3532 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3533 return;
3534 }
3535
3536 /* Check for an existing server. */
3537 struct raft_server *s = raft_find_server(raft, &rq->common.sid);
3538 if (s) {
3539 /* If the server is scheduled to be removed, cancel it. */
3540 if (s->phase == RAFT_PHASE_REMOVE) {
3541 s->phase = RAFT_PHASE_STABLE;
3542 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_CANCELED);
3543 return;
3544 }
3545
3546 /* If the server is being added, then it's in progress. */
3547 if (s->phase != RAFT_PHASE_STABLE) {
3548 raft_send_add_server_reply(raft, rq,
3549 false, RAFT_SERVER_IN_PROGRESS);
return;
3550 }
3551
3552 /* Nothing to do--server is already part of the configuration. */
3553 raft_send_add_server_reply(raft, rq,
3554 true, RAFT_SERVER_ALREADY_PRESENT);
3555 return;
3556 }
3557
3558 /* Check for a server being removed. */
3559 if (raft->remove_server
3560 && uuid_equals(&rq->common.sid, &raft->remove_server->sid)) {
3561 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3562 return;
3563 }
3564
3565 /* Check for a server already being added. */
3566 if (raft_find_new_server(raft, &rq->common.sid)) {
3567 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_IN_PROGRESS);
3568 return;
3569 }
3570
3571 /* Add server to 'add_servers'. */
3572 s = raft_server_add(&raft->add_servers, &rq->common.sid, rq->address);
3573 raft_server_init_leader(raft, s);
3574 s->requester_sid = rq->common.sid;
3575 s->requester_conn = NULL;
3576 s->phase = RAFT_PHASE_CATCHUP;
3577
3578 /* Start sending the log. If this is the first time we've tried to add
3579 * this server, then this will quickly degenerate into an InstallSnapshot
3580 * followed by a series of AppendEntries, but if it's a retry of an earlier
3581 * AddServer request that was interrupted (e.g. by a timeout or a loss of
3582 * leadership) then it will gracefully resume populating the log.
3583 *
3584 * See the last few paragraphs of section 4.2.1 for further insight. */
3585 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
3586 VLOG_INFO_RL(&rl,
3587 "starting to add server %s ("SID_FMT" at %s) "
3588 "to cluster "CID_FMT, s->nickname, SID_ARGS(&s->sid),
3589 rq->address, CID_ARGS(&raft->cid));
3590 raft_send_append_request(raft, s, 0, "initialize new server");
3591 }
3592
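/* Handles the reply to our own AddServer request while this server is joining
* a cluster. On success we consider ourselves joined; on failure we record
* any additional remote addresses the leader reported, so that later join
* attempts can try them. */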
3593 static void
3594 raft_handle_add_server_reply(struct raft *raft,
3595 const struct raft_add_server_reply *rpy)
3596 {
3597 if (!raft->joining) {
3598 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3599 VLOG_WARN_RL(&rl, "received add_server_reply even though we're "
3600 "already part of the cluster");
3601 return;
3602 }
3603
3604 if (rpy->success) {
3605 raft->joining = false;
3606
3607 /* It is tempting, at this point, to check that this server is part of
3608 * the current configuration. However, this is not necessarily the
3609 * case, because the log entry that added this server to the cluster
3610 * might have been committed by a majority of the cluster that does not
3611 * include this one. This actually happens in testing. */
3612 } else {
3613 const char *address;
3614 SSET_FOR_EACH (address, &rpy->remote_addresses) {
3615 if (sset_add(&raft->remote_addresses, address)) {
3616 VLOG_INFO("%s: learned new server address for joining cluster",
3617 address);
3618 }
3619 }
3620 }
3621 }
3622
3623 /* This is called by raft_unixctl_kick() as well as via RPC. */
3624 static void
3625 raft_handle_remove_server_request(struct raft *raft,
3626 const struct raft_remove_server_request *rq)
3627 {
3628 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3629 if (raft->role != RAFT_LEADER) {
3630 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3631 return;
3632 }
3633
3634 /* If the server to remove is currently waiting to be added, cancel it. */
3635 struct raft_server *target = raft_find_new_server(raft, &rq->sid);
3636 if (target) {
3637 raft_send_add_server_reply__(raft, &target->sid, target->address,
3638 false, RAFT_SERVER_CANCELED);
3639 hmap_remove(&raft->add_servers, &target->hmap_node);
3640 raft_server_destroy(target);
3641 return;
3642 }
3643
3644 /* If the server isn't configured, report that. */
3645 target = raft_find_server(raft, &rq->sid);
3646 if (!target) {
3647 raft_send_remove_server_reply(raft, rq,
3648 true, RAFT_SERVER_ALREADY_GONE);
3649 return;
3650 }
3651
3652 /* Check whether we're waiting for the addition of the server to commit. */
3653 if (target->phase == RAFT_PHASE_COMMITTING) {
3654 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3655 return;
3656 }
3657
3658 /* Check whether the server is already scheduled for removal. */
3659 if (target->phase == RAFT_PHASE_REMOVE) {
3660 raft_send_remove_server_reply(raft, rq,
3661 false, RAFT_SERVER_IN_PROGRESS);
3662 return;
3663 }
3664
3665 /* Make sure that, if we remove this server, at least one other server
3666 * will be left. We don't count servers currently being added (in
3667 * 'add_servers') since those could fail. */
3668 struct raft_server *s;
3669 int n = 0;
3670 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3671 if (s != target && s->phase != RAFT_PHASE_REMOVE) {
3672 n++;
3673 }
3674 }
3675 if (!n) {
3676 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_EMPTY);
3677 return;
3678 }
3679
3680 /* Mark the server for removal. */
3681 target->phase = RAFT_PHASE_REMOVE;
3682 if (rq->requester_conn) {
3683 target->requester_sid = UUID_ZERO;
3684 unixctl_command_reply(rq->requester_conn, "started removal");
3685 } else {
3686 target->requester_sid = rq->common.sid;
3687 target->requester_conn = NULL;
3688 }
3689
3690 raft_run_reconfigure(raft);
3691 /* Operation in progress, reply will be sent later. */
3692 }
3693
3694 static void
3695 raft_finished_leaving_cluster(struct raft *raft)
3696 {
3697 VLOG_INFO(SID_FMT": finished leaving cluster "CID_FMT,
3698 SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
3699
3700 raft_record_note(raft, "left", "this server left the cluster");
3701
3702 raft->leaving = false;
3703 raft->left = true;
3704 }
3705
3706 static void
3707 raft_handle_remove_server_reply(struct raft *raft,
3708 const struct raft_remove_server_reply *rpc)
3709 {
3710 if (rpc->success
3711 && (uuid_is_zero(&rpc->target_sid)
3712 || uuid_equals(&rpc->target_sid, &raft->sid))) {
3713 raft_finished_leaving_cluster(raft);
3714 }
3715 }
3716
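/* Records disk write 'error', if any, putting 'raft' into failure mode the
* first time an error is seen. Returns true if 'raft' has not failed (so the
* caller may proceed), false otherwise. */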
3717 static bool
3718 raft_handle_write_error(struct raft *raft, struct ovsdb_error *error)
3719 {
3720 if (error && !raft->failed) {
3721 raft->failed = true;
3722
3723 char *s = ovsdb_error_to_string_free(error);
3724 VLOG_WARN("%s: entering failure mode due to I/O error (%s)",
3725 raft->name, s);
3726 free(s);
3727 }
3728 return !raft->failed;
3729 }
3730
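/* Writes a complete replacement log to 'log': a header whose snapshot is
* 'new_snapshot', covering everything before 'new_log_start', followed by the
* retained log entries, the current term and vote, and the commit index (if
* it falls within the retained log). Returns NULL if successful, otherwise an
* error. */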
3731 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3732 raft_write_snapshot(struct raft *raft, struct ovsdb_log *log,
3733 uint64_t new_log_start,
3734 const struct raft_entry *new_snapshot)
3735 {
3736 struct raft_header h = {
3737 .sid = raft->sid,
3738 .cid = raft->cid,
3739 .name = raft->name,
3740 .local_address = raft->local_address,
3741 .snap_index = new_log_start - 1,
3742 .snap = *new_snapshot,
3743 };
3744 struct ovsdb_error *error = ovsdb_log_write_and_free(
3745 log, raft_header_to_json(&h));
3746 if (error) {
3747 return error;
3748 }
3749 ovsdb_log_mark_base(raft->log);
3750
3751 /* Write log records. */
3752 for (uint64_t index = new_log_start; index < raft->log_end; index++) {
3753 const struct raft_entry *e = &raft->entries[index - raft->log_start];
3754 struct raft_record r = {
3755 .type = RAFT_REC_ENTRY,
3756 .term = e->term,
3757 .entry = {
3758 .index = index,
3759 .data = e->data,
3760 .servers = e->servers,
3761 .eid = e->eid,
3762 },
3763 };
3764 error = ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3765 if (error) {
3766 return error;
3767 }
3768 }
3769
3770 /* Write term and vote (if any).
3771 *
3772 * The term is redundant if we wrote a log record for that term above. The
3773 * vote, if any, is never redundant.
3774 */
3775 error = raft_write_state(log, raft->term, &raft->vote);
3776 if (error) {
3777 return error;
3778 }
3779
3780 /* Write commit_index if it's beyond the new start of the log. */
3781 if (raft->commit_index >= new_log_start) {
3782 struct raft_record r = {
3783 .type = RAFT_REC_COMMIT_INDEX,
3784 .commit_index = raft->commit_index,
3785 };
3786 return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3787 }
3788 return NULL;
3789 }
3790
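/* Atomically replaces 'raft''s on-disk log with one that begins with
* 'new_snapshot' at 'new_start', using the ovsdb_log replace operations so
* that a crash partway through leaves the original log intact. Returns NULL
* if successful, otherwise an error. */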
3791 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3792 raft_save_snapshot(struct raft *raft,
3793 uint64_t new_start, const struct raft_entry *new_snapshot)
3794
3795 {
3796 struct ovsdb_log *new_log;
3797 struct ovsdb_error *error;
3798 error = ovsdb_log_replace_start(raft->log, &new_log);
3799 if (error) {
3800 return error;
3801 }
3802
3803 error = raft_write_snapshot(raft, new_log, new_start, new_snapshot);
3804 if (error) {
3805 ovsdb_log_replace_abort(new_log);
3806 return error;
3807 }
3808
3809 return ovsdb_log_replace_commit(raft->log, new_log);
3810 }
3811
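/* Does the real work of handling an InstallSnapshot request. Returns true if
* the request was handled (including the cases where it requires no action),
* so that the caller should send a reply, or false if saving the new snapshot
* failed. */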
3812 static bool
3813 raft_handle_install_snapshot_request__(
3814 struct raft *raft, const struct raft_install_snapshot_request *rq)
3815 {
3816 raft_reset_election_timer(raft);
3817
3818 /*
3819 * Our behavior here depends on new_log_start in the snapshot compared to
3820 * log_start and log_end. There are three cases:
3821 *
3822 *        Case 1       |    Case 2     |      Case 3
3823 * <------------------>|<------------->|<------------------>
3824 *                     |               |
3825 *
3826 *                     +---+---+---+---+
3827 *                   T | T | T | T | T |
3828 *                     +---+---+---+---+
3829 *                       ^             ^
3830 *                       |             |
3831 *                   log_start      log_end
3832 */
3833 uint64_t new_log_start = rq->last_index + 1;
3834 if (new_log_start < raft->log_start) {
3835 /* Case 1: The new snapshot covers less than our current one. Nothing
3836 * to do. */
3837 return true;
3838 } else if (new_log_start < raft->log_end) {
3839 /* Case 2: The new snapshot starts in the middle of our log. We could
3840 * discard the first 'new_log_start - raft->log_start' entries in the
3841 * log. But there's not much value in that, since snapshotting is
3842 * supposed to be a local decision. Just skip it. */
3843 return true;
3844 }
3845
3846 /* Case 3: The new snapshot starts past the end of our current log, so
3847 * discard all of our current log. */
3848 const struct raft_entry new_snapshot = {
3849 .term = rq->last_term,
3850 .data = rq->data,
3851 .eid = rq->last_eid,
3852 .servers = rq->last_servers,
3853 };
3854 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
3855 &new_snapshot);
3856 if (error) {
3857 char *error_s = ovsdb_error_to_string(error);
3858 VLOG_WARN("could not save snapshot: %s", error_s);
3859 free(error_s);
3860 return false;
3861 }
3862
3863 for (size_t i = 0; i < raft->log_end - raft->log_start; i++) {
3864 raft_entry_uninit(&raft->entries[i]);
3865 }
3866 raft->log_start = raft->log_end = new_log_start;
3867 raft->log_synced = raft->log_end - 1;
3868 raft->commit_index = raft->log_start - 1;
3869 if (raft->last_applied < raft->commit_index) {
3870 raft->last_applied = raft->log_start - 2;
3871 }
3872
3873 raft_entry_uninit(&raft->snap);
3874 raft_entry_clone(&raft->snap, &new_snapshot);
3875
3876 raft_get_servers_from_log(raft, VLL_INFO);
3877
3878 return true;
3879 }
3880
3881 static void
3882 raft_handle_install_snapshot_request(
3883 struct raft *raft, const struct raft_install_snapshot_request *rq)
3884 {
3885 if (raft_handle_install_snapshot_request__(raft, rq)) {
3886 union raft_rpc rpy = {
3887 .install_snapshot_reply = {
3888 .common = {
3889 .type = RAFT_RPC_INSTALL_SNAPSHOT_REPLY,
3890 .sid = rq->common.sid,
3891 },
3892 .term = raft->term,
3893 .last_index = rq->last_index,
3894 .last_term = rq->last_term,
3895 },
3896 };
3897 raft_send(raft, &rpy);
3898 }
3899 }
3900
3901 static void
3902 raft_handle_install_snapshot_reply(
3903 struct raft *raft, const struct raft_install_snapshot_reply *rpy)
3904 {
3905 /* We might get an InstallSnapshot reply from a configured server (e.g. a
3906 * peer) or a server in the process of being added. */
3907 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3908 if (!s) {
3909 s = raft_find_new_server(raft, &rpy->common.sid);
3910 if (!s) {
3911 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3912 VLOG_INFO_RL(&rl, "cluster "CID_FMT": received %s from "
3913 "unknown server "SID_FMT, CID_ARGS(&raft->cid),
3914 raft_rpc_type_to_string(rpy->common.type),
3915 SID_ARGS(&rpy->common.sid));
3916 return;
3917 }
3918 }
3919
3920 if (rpy->last_index != raft->log_start - 1 ||
3921 rpy->last_term != raft->snap.term) {
3922 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3923 VLOG_INFO_RL(&rl, "cluster "CID_FMT": server %s installed "
3924 "out-of-date snapshot, starting over",
3925 CID_ARGS(&raft->cid), s->nickname);
3926 raft_send_install_snapshot_request(raft, s,
3927 "installed obsolete snapshot");
3928 return;
3929 }
3930
3931 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
3932 VLOG_INFO_RL(&rl, "cluster "CID_FMT": installed snapshot on server %s"
3933 " up to %"PRIu64":%"PRIu64, CID_ARGS(&raft->cid),
3934 s->nickname, rpy->last_term, rpy->last_index);
3935 s->next_index = raft->log_end;
3936 raft_send_append_request(raft, s, 0, "snapshot installed");
3937 }
3938
3939 /* Returns true if 'raft' has grown enough since the last snapshot that
3940 * reducing the log to a snapshot would be valuable, false otherwise. */
3941 bool
3942 raft_grew_lots(const struct raft *raft)
3943 {
3944 return ovsdb_log_grew_lots(raft->log);
3945 }
3946
3947 /* Returns the number of log entries that could be trimmed off the on-disk log
3948 * by snapshotting. */
3949 uint64_t
3950 raft_get_log_length(const struct raft *raft)
3951 {
3952 return (raft->last_applied < raft->log_start
3953 ? 0
3954 : raft->last_applied - raft->log_start + 1);
3955 }
3956
3957 /* Returns true if taking a snapshot of 'raft', with raft_store_snapshot(), is
3958 * possible. */
3959 bool
3960 raft_may_snapshot(const struct raft *raft)
3961 {
3962 return (!raft->joining
3963 && !raft->leaving
3964 && !raft->left
3965 && !raft->failed
3966 && raft->last_applied >= raft->log_start);
3967 }
3968
3969 /* Replaces the log for 'raft', up to the last log entry read, by
3970 * 'new_snapshot_data'. Returns NULL if successful, otherwise an error that
3971 * the caller must eventually free.
3972 *
3973 * This function can only succeed if raft_may_snapshot() returns true. It is
3974 * only valuable to call it if raft_get_log_length() is significant and
3975 * especially if raft_grew_lots() returns true. */
3976 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3977 raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
3978 {
3979 if (raft->joining) {
3980 return ovsdb_error(NULL,
3981 "cannot store a snapshot while joining cluster");
3982 } else if (raft->leaving) {
3983 return ovsdb_error(NULL,
3984 "cannot store a snapshot while leaving cluster");
3985 } else if (raft->left) {
3986 return ovsdb_error(NULL,
3987 "cannot store a snapshot after leaving cluster");
3988 } else if (raft->failed) {
3989 return ovsdb_error(NULL,
3990 "cannot store a snapshot following failure");
3991 }
3992
3993 if (raft->last_applied < raft->log_start) {
3994 return ovsdb_error(NULL, "not storing a duplicate snapshot");
3995 }
3996
3997 uint64_t new_log_start = raft->last_applied + 1;
3998 struct raft_entry new_snapshot = {
3999 .term = raft_get_term(raft, new_log_start - 1),
4000 .data = json_clone(new_snapshot_data),
4001 .eid = *raft_get_eid(raft, new_log_start - 1),
4002 .servers = json_clone(raft_servers_for_index(raft, new_log_start - 1)),
4003 };
4004 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
4005 &new_snapshot);
4006 if (error) {
4007 raft_entry_uninit(&new_snapshot);
4008 return error;
4009 }
4010
4011 raft->log_synced = raft->log_end - 1;
4012 raft_entry_uninit(&raft->snap);
4013 raft->snap = new_snapshot;
4014 for (size_t i = 0; i < new_log_start - raft->log_start; i++) {
4015 raft_entry_uninit(&raft->entries[i]);
4016 }
4017 memmove(&raft->entries[0], &raft->entries[new_log_start - raft->log_start],
4018 (raft->log_end - new_log_start) * sizeof *raft->entries);
4019 raft->log_start = new_log_start;
4020 return NULL;
4021 }
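
/* Putting the snapshot API together, a caller (in OVS this is the ovsdb
* storage layer; the helper names below are only illustrative) might
* periodically do something like:
*
*     if (raft_may_snapshot(raft)
*         && (raft_grew_lots(raft) || raft_get_log_length(raft) > 100)) {
*         struct json *data = capture_database_contents();  // hypothetical
*         struct ovsdb_error *error = raft_store_snapshot(raft, data);
*         json_destroy(data);
*         if (error) {
*             char *msg = ovsdb_error_to_string_free(error);
*             VLOG_WARN("snapshot failed: %s", msg);
*             free(msg);
*         }
*     }
*
* The threshold of 100 entries is arbitrary; the real caller applies its own
* policy. */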
4022
4023 static void
4024 raft_handle_become_leader(struct raft *raft,
4025 const struct raft_become_leader *rq)
4026 {
4027 if (raft->role == RAFT_FOLLOWER) {
4028 char buf[SID_LEN + 1];
4029 VLOG_INFO("received leadership transfer from %s in term %"PRIu64,
4030 raft_get_nickname(raft, &rq->common.sid, buf, sizeof buf),
4031 rq->term);
4032 raft_start_election(raft, true);
4033 }
4034 }
4035
4036 static void
4037 raft_send_execute_command_reply(struct raft *raft,
4038 const struct uuid *sid,
4039 const struct uuid *eid,
4040 enum raft_command_status status,
4041 uint64_t commit_index)
4042 {
4043 if (failure_test == FT_CRASH_BEFORE_SEND_EXEC_REP) {
4044 ovs_fatal(0, "Raft test: crash before sending execute_command_reply");
4045 }
4046 union raft_rpc rpc = {
4047 .execute_command_reply = {
4048 .common = {
4049 .type = RAFT_RPC_EXECUTE_COMMAND_REPLY,
4050 .sid = *sid,
4051 },
4052 .result = *eid,
4053 .status = status,
4054 .commit_index = commit_index,
4055 },
4056 };
4057 raft_send(raft, &rpc);
4058 if (failure_test == FT_CRASH_AFTER_SEND_EXEC_REP) {
4059 ovs_fatal(0, "Raft test: crash after sending execute_command_reply.");
4060 }
4061 }
4062
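/* Does the real work of handling an ExecuteCommand request: checks that this
* server is the leader and that the client's prerequisite entry ID matches
* the current one, then initiates the command. Returns RAFT_CMD_INCOMPLETE if
* the command was started and its reply will be sent later, otherwise an
* immediate status. */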
4063 static enum raft_command_status
4064 raft_handle_execute_command_request__(
4065 struct raft *raft, const struct raft_execute_command_request *rq)
4066 {
4067 if (raft->role != RAFT_LEADER) {
4068 return RAFT_CMD_NOT_LEADER;
4069 }
4070
4071 const struct uuid *current_eid = raft_current_eid(raft);
4072 if (!uuid_equals(&rq->prereq, current_eid)) {
4073 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
4074 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
4075 "prerequisite "UUID_FMT" in execute_command_request",
4076 UUID_ARGS(current_eid), UUID_ARGS(&rq->prereq));
4077 return RAFT_CMD_BAD_PREREQ;
4078 }
4079
4080 struct raft_command *cmd = raft_command_initiate(raft, rq->data,
4081 NULL, &rq->result);
4082 cmd->sid = rq->common.sid;
4083
4084 enum raft_command_status status = cmd->status;
4085 if (status != RAFT_CMD_INCOMPLETE) {
4086 raft_command_unref(cmd);
4087 }
4088 return status;
4089 }
4090
4091 static void
4092 raft_handle_execute_command_request(
4093 struct raft *raft, const struct raft_execute_command_request *rq)
4094 {
4095 enum raft_command_status status
4096 = raft_handle_execute_command_request__(raft, rq);
4097 if (status != RAFT_CMD_INCOMPLETE) {
4098 raft_send_execute_command_reply(raft, &rq->common.sid, &rq->result,
4099 status, 0);
4100 }
4101 }
4102
4103 static void
4104 raft_handle_execute_command_reply(
4105 struct raft *raft, const struct raft_execute_command_reply *rpy)
4106 {
4107 struct raft_command *cmd = raft_find_command_by_eid(raft, &rpy->result);
4108 if (!cmd) {
4109 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
4110 char buf[SID_LEN + 1];
4111 VLOG_INFO_RL(&rl,
4112 "%s received \"%s\" reply from %s for unknown command",
4113 raft->local_nickname,
4114 raft_command_status_to_string(rpy->status),
4115 raft_get_nickname(raft, &rpy->common.sid,
4116 buf, sizeof buf));
4117 return;
4118 }
4119
4120 if (rpy->status == RAFT_CMD_INCOMPLETE) {
4121 cmd->timestamp = time_msec();
4122 } else {
4123 cmd->index = rpy->commit_index;
4124 raft_command_complete(raft, cmd, rpy->status);
4125 }
4126 }
4127
4128 static void
4129 raft_handle_rpc(struct raft *raft, const union raft_rpc *rpc)
4130 {
4131 uint64_t term = raft_rpc_get_term(rpc);
4132 if (term
4133 && !raft_should_suppress_disruptive_server(raft, rpc)
4134 && !raft_receive_term__(raft, &rpc->common, term)) {
4135 if (rpc->type == RAFT_RPC_APPEND_REQUEST) {
4136 /* Section 3.3: "If a server receives a request with a stale term
4137 * number, it rejects the request." */
4138 raft_send_append_reply(raft, raft_append_request_cast(rpc),
4139 RAFT_APPEND_INCONSISTENCY, "stale term");
4140 }
4141 return;
4142 }
4143
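/* Dispatch on the RPC type using the RAFT_RPC_TYPES X-macro: each
* (ENUM, NAME) pair expands into a case that calls the corresponding
* raft_handle_<NAME>() function. */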
4144 switch (rpc->type) {
4145 #define RAFT_RPC(ENUM, NAME) \
4146 case ENUM: \
4147 raft_handle_##NAME(raft, &rpc->NAME); \
4148 break;
4149 RAFT_RPC_TYPES
4150 #undef RAFT_RPC
4151 default:
4152 OVS_NOT_REACHED();
4153 }
4154 }
4155 \f
4156 static bool
4157 raft_rpc_is_heartbeat(const union raft_rpc *rpc)
4158 {
4159 return ((rpc->type == RAFT_RPC_APPEND_REQUEST
4160 || rpc->type == RAFT_RPC_APPEND_REPLY)
4161 && rpc->common.comment
4162 && !strcmp(rpc->common.comment, "heartbeat"));
4163 }
4164
4165 \f
4166 static bool
4167 raft_send_to_conn_at(struct raft *raft, const union raft_rpc *rpc,
4168 struct raft_conn *conn, int line_number)
4169 {
4170 log_rpc(rpc, "-->", conn, line_number);
4171 return !jsonrpc_session_send(
4172 conn->js, raft_rpc_to_jsonrpc(&raft->cid, &raft->sid, rpc));
4173 }
4174
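/* Returns true if everything that 'rpc' depends on--its term, the highest log
* index it refers to, and any vote it carries--has already been synced to
* disk, so that 'rpc' may be sent immediately; otherwise the sender must wait
* for the corresponding disk writes to complete. */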
4175 static bool
4176 raft_is_rpc_synced(const struct raft *raft, const union raft_rpc *rpc)
4177 {
4178 uint64_t term = raft_rpc_get_term(rpc);
4179 uint64_t index = raft_rpc_get_min_sync_index(rpc);
4180 const struct uuid *vote = raft_rpc_get_vote(rpc);
4181
4182 return (term <= raft->synced_term
4183 && index <= raft->log_synced
4184 && (!vote || uuid_equals(vote, &raft->synced_vote)));
4185 }
4186
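/* Sends 'rpc' to the server identified by its common.sid member. If the
* state that 'rpc' depends on has not yet been synced to disk, the RPC is
* instead queued on a waiter and sent once the sync completes. Returns true
* if the RPC was sent or queued, false if it could not be sent (no connection
* to the destination, or the RPC is addressed to this server). 'line_number'
* is the caller's source line, for use in log messages. */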
4187 static bool
4188 raft_send_at(struct raft *raft, const union raft_rpc *rpc, int line_number)
4189 {
4190 const struct uuid *dst = &rpc->common.sid;
4191 if (uuid_equals(dst, &raft->sid)) {
4192 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4193 VLOG_WARN_RL(&rl, "attempted to send RPC to self from raft.c:%d",
4194 line_number);
4195 return false;
4196 }
4197
4198 struct raft_conn *conn = raft_find_conn_by_sid(raft, dst);
4199 if (!conn) {
4200 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4201 char buf[SID_LEN + 1];
4202 VLOG_DBG_RL(&rl, "%s: no connection to %s, cannot send RPC "
4203 "from raft.c:%d", raft->local_nickname,
4204 raft_get_nickname(raft, dst, buf, sizeof buf),
4205 line_number);
4206 return false;
4207 }
4208
4209 if (!raft_is_rpc_synced(raft, rpc)) {
4210 raft_waiter_create(raft, RAFT_W_RPC, false)->rpc = raft_rpc_clone(rpc);
4211 return true;
4212 }
4213
4214 return raft_send_to_conn_at(raft, rpc, conn, line_number);
4215 }
4216 \f
4217 static struct raft *
4218 raft_lookup_by_name(const char *name)
4219 {
4220 struct raft *raft;
4221
4222 HMAP_FOR_EACH_WITH_HASH (raft, hmap_node, hash_string(name, 0),
4223 &all_rafts) {
4224 if (!strcmp(raft->name, name)) {
4225 return raft;
4226 }
4227 }
4228 return NULL;
4229 }
4230
4231 static void
4232 raft_unixctl_cid(struct unixctl_conn *conn,
4233 int argc OVS_UNUSED, const char *argv[],
4234 void *aux OVS_UNUSED)
4235 {
4236 struct raft *raft = raft_lookup_by_name(argv[1]);
4237 if (!raft) {
4238 unixctl_command_reply_error(conn, "unknown cluster");
4239 } else if (uuid_is_zero(&raft->cid)) {
4240 unixctl_command_reply_error(conn, "cluster id not yet known");
4241 } else {
4242 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->cid));
4243 unixctl_command_reply(conn, uuid);
4244 free(uuid);
4245 }
4246 }
4247
4248 static void
4249 raft_unixctl_sid(struct unixctl_conn *conn,
4250 int argc OVS_UNUSED, const char *argv[],
4251 void *aux OVS_UNUSED)
4252 {
4253 struct raft *raft = raft_lookup_by_name(argv[1]);
4254 if (!raft) {
4255 unixctl_command_reply_error(conn, "unknown cluster");
4256 } else {
4257 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->sid));
4258 unixctl_command_reply(conn, uuid);
4259 free(uuid);
4260 }
4261 }
4262
4263 static void
4264 raft_put_sid(const char *title, const struct uuid *sid,
4265 const struct raft *raft, struct ds *s)
4266 {
4267 ds_put_format(s, "%s: ", title);
4268 if (uuid_equals(sid, &raft->sid)) {
4269 ds_put_cstr(s, "self");
4270 } else if (uuid_is_zero(sid)) {
4271 ds_put_cstr(s, "unknown");
4272 } else {
4273 char buf[SID_LEN + 1];
4274 ds_put_cstr(s, raft_get_nickname(raft, sid, buf, sizeof buf));
4275 }
4276 ds_put_char(s, '\n');
4277 }
4278
4279 static void
4280 raft_unixctl_status(struct unixctl_conn *conn,
4281 int argc OVS_UNUSED, const char *argv[],
4282 void *aux OVS_UNUSED)
4283 {
4284 struct raft *raft = raft_lookup_by_name(argv[1]);
4285 if (!raft) {
4286 unixctl_command_reply_error(conn, "unknown cluster");
4287 return;
4288 }
4289
4290 struct ds s = DS_EMPTY_INITIALIZER;
4291 ds_put_format(&s, "%s\n", raft->local_nickname);
4292 ds_put_format(&s, "Name: %s\n", raft->name);
4293 ds_put_format(&s, "Cluster ID: ");
4294 if (!uuid_is_zero(&raft->cid)) {
4295 ds_put_format(&s, CID_FMT" ("UUID_FMT")\n",
4296 CID_ARGS(&raft->cid), UUID_ARGS(&raft->cid));
4297 } else {
4298 ds_put_format(&s, "not yet known\n");
4299 }
4300 ds_put_format(&s, "Server ID: "SID_FMT" ("UUID_FMT")\n",
4301 SID_ARGS(&raft->sid), UUID_ARGS(&raft->sid));
4302 ds_put_format(&s, "Address: %s\n", raft->local_address);
4303 ds_put_format(&s, "Status: %s\n",
4304 raft->joining ? "joining cluster"
4305 : raft->leaving ? "leaving cluster"
4306 : raft->left ? "left cluster"
4307 : raft->failed ? "failed"
4308 : "cluster member");
4309 if (raft->joining) {
4310 ds_put_format(&s, "Remotes for joining:");
4311 const char *address;
4312 SSET_FOR_EACH (address, &raft->remote_addresses) {
4313 ds_put_format(&s, " %s", address);
4314 }
4315 ds_put_char(&s, '\n');
4316 }
4317 if (raft->role == RAFT_LEADER) {
4318 struct raft_server *as;
4319 HMAP_FOR_EACH (as, hmap_node, &raft->add_servers) {
4320 ds_put_format(&s, "Adding server %s ("SID_FMT" at %s) (%s)\n",
4321 as->nickname, SID_ARGS(&as->sid), as->address,
4322 raft_server_phase_to_string(as->phase));
4323 }
4324
4325 struct raft_server *rs = raft->remove_server;
4326 if (rs) {
4327 ds_put_format(&s, "Removing server %s ("SID_FMT" at %s) (%s)\n",
4328 rs->nickname, SID_ARGS(&rs->sid), rs->address,
4329 raft_server_phase_to_string(rs->phase));
4330 }
4331 }
4332
4333 ds_put_format(&s, "Role: %s\n",
4334 raft->role == RAFT_LEADER ? "leader"
4335 : raft->role == RAFT_CANDIDATE ? "candidate"
4336 : raft->role == RAFT_FOLLOWER ? "follower"
4337 : "<error>");
4338 ds_put_format(&s, "Term: %"PRIu64"\n", raft->term);
4339 raft_put_sid("Leader", &raft->leader_sid, raft, &s);
4340 raft_put_sid("Vote", &raft->vote, raft, &s);
4341 ds_put_char(&s, '\n');
4342
4343 ds_put_format(&s, "Log: [%"PRIu64", %"PRIu64"]\n",
4344 raft->log_start, raft->log_end);
4345
4346 uint64_t n_uncommitted = raft->log_end - raft->commit_index - 1;
4347 ds_put_format(&s, "Entries not yet committed: %"PRIu64"\n", n_uncommitted);
4348
4349 uint64_t n_unapplied = raft->log_end - raft->last_applied - 1;
4350 ds_put_format(&s, "Entries not yet applied: %"PRIu64"\n", n_unapplied);
4351
4352 const struct raft_conn *c;
4353 ds_put_cstr(&s, "Connections:");
4354 LIST_FOR_EACH (c, list_node, &raft->conns) {
4355 bool connected = jsonrpc_session_is_connected(c->js);
4356 ds_put_format(&s, " %s%s%s%s",
4357 connected ? "" : "(",
4358 c->incoming ? "<-" : "->", c->nickname,
4359 connected ? "" : ")");
4360 }
4361 ds_put_char(&s, '\n');
4362
4363 ds_put_cstr(&s, "Servers:\n");
4364 struct raft_server *server;
4365 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
4366 ds_put_format(&s, " %s ("SID_FMT" at %s)",
4367 server->nickname,
4368 SID_ARGS(&server->sid), server->address);
4369 if (uuid_equals(&server->sid, &raft->sid)) {
4370 ds_put_cstr(&s, " (self)");
4371 }
4372 if (server->phase != RAFT_PHASE_STABLE) {
4373 ds_put_format(&s, " (%s)",
4374 raft_server_phase_to_string(server->phase));
4375 }
4376 if (raft->role == RAFT_CANDIDATE) {
4377 if (!uuid_is_zero(&server->vote)) {
4378 char buf[SID_LEN + 1];
4379 ds_put_format(&s, " (voted for %s)",
4380 raft_get_nickname(raft, &server->vote,
4381 buf, sizeof buf));
4382 }
4383 } else if (raft->role == RAFT_LEADER) {
4384 ds_put_format(&s, " next_index=%"PRIu64" match_index=%"PRIu64,
4385 server->next_index, server->match_index);
4386 }
4387 ds_put_char(&s, '\n');
4388 }
4389
4390 unixctl_command_reply(conn, ds_cstr(&s));
4391 ds_destroy(&s);
4392 }
4393
4394 static void
4395 raft_unixctl_leave__(struct unixctl_conn *conn, struct raft *raft)
4396 {
4397 if (raft_is_leaving(raft)) {
4398 unixctl_command_reply_error(conn,
4399 "already in progress leaving cluster");
4400 } else if (raft_is_joining(raft)) {
4401 unixctl_command_reply_error(conn,
4402 "can't leave while join in progress");
4403 } else if (raft_failed(raft)) {
4404 unixctl_command_reply_error(conn,
4405 "can't leave after failure");
4406 } else {
4407 raft_leave(raft);
4408 unixctl_command_reply(conn, NULL);
4409 }
4410 }
4411
4412 static void
4413 raft_unixctl_leave(struct unixctl_conn *conn, int argc OVS_UNUSED,
4414 const char *argv[], void *aux OVS_UNUSED)
4415 {
4416 struct raft *raft = raft_lookup_by_name(argv[1]);
4417 if (!raft) {
4418 unixctl_command_reply_error(conn, "unknown cluster");
4419 return;
4420 }
4421
4422 raft_unixctl_leave__(conn, raft);
4423 }
4424
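/* Looks up the server in 'raft''s configuration that best matches 'id',
* which may be a server address or all or part of a server ID. Returns the
* match only if it is unique, otherwise NULL. */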
4425 static struct raft_server *
4426 raft_lookup_server_best_match(struct raft *raft, const char *id)
4427 {
4428 struct raft_server *best = NULL;
4429 int best_score = -1;
4430 int n_best = 0;
4431
4432 struct raft_server *s;
4433 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
4434 int score = (!strcmp(id, s->address)
4435 ? INT_MAX
4436 : uuid_is_partial_match(&s->sid, id));
4437 if (score > best_score) {
4438 best = s;
4439 best_score = score;
4440 n_best = 1;
4441 } else if (score == best_score) {
4442 n_best++;
4443 }
4444 }
4445 return n_best == 1 ? best : NULL;
4446 }
4447
4448 static void
4449 raft_unixctl_kick(struct unixctl_conn *conn, int argc OVS_UNUSED,
4450 const char *argv[], void *aux OVS_UNUSED)
4451 {
4452 const char *cluster_name = argv[1];
4453 const char *server_name = argv[2];
4454
4455 struct raft *raft = raft_lookup_by_name(cluster_name);
4456 if (!raft) {
4457 unixctl_command_reply_error(conn, "unknown cluster");
4458 return;
4459 }
4460
4461 struct raft_server *server = raft_lookup_server_best_match(raft,
4462 server_name);
4463 if (!server) {
4464 unixctl_command_reply_error(conn, "unknown server");
4465 return;
4466 }
4467
4468 if (uuid_equals(&server->sid, &raft->sid)) {
4469 raft_unixctl_leave__(conn, raft);
4470 } else if (raft->role == RAFT_LEADER) {
4471 const struct raft_remove_server_request rq = {
4472 .sid = server->sid,
4473 .requester_conn = conn,
4474 };
4475 raft_handle_remove_server_request(raft, &rq);
4476 } else {
4477 const union raft_rpc rpc = {
4478 .remove_server_request = {
4479 .common = {
4480 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
4481 .sid = raft->leader_sid,
4482 .comment = "via unixctl"
4483 },
4484 .sid = server->sid,
4485 }
4486 };
4487 if (raft_send(raft, &rpc)) {
4488 unixctl_command_reply(conn, "sent removal request to leader");
4489 } else {
4490 unixctl_command_reply_error(conn,
4491 "failed to send removal request");
4492 }
4493 }
4494 }
4495
4496 static void
4497 raft_unixctl_failure_test(struct unixctl_conn *conn,
4498 int argc OVS_UNUSED, const char *argv[],
4499 void *aux OVS_UNUSED)
4500 {
4501 const char *test = argv[1];
4502 if (!strcmp(test, "crash-before-sending-append-request")) {
4503 failure_test = FT_CRASH_BEFORE_SEND_APPEND_REQ;
4504 } else if (!strcmp(test, "crash-after-sending-append-request")) {
4505 failure_test = FT_CRASH_AFTER_SEND_APPEND_REQ;
4506 } else if (!strcmp(test, "crash-before-sending-execute-command-reply")) {
4507 failure_test = FT_CRASH_BEFORE_SEND_EXEC_REP;
4508 } else if (!strcmp(test, "crash-after-sending-execute-command-reply")) {
4509 failure_test = FT_CRASH_AFTER_SEND_EXEC_REP;
4510 } else if (!strcmp(test, "crash-before-sending-execute-command-request")) {
4511 failure_test = FT_CRASH_BEFORE_SEND_EXEC_REQ;
4512 } else if (!strcmp(test, "crash-after-sending-execute-command-request")) {
4513 failure_test = FT_CRASH_AFTER_SEND_EXEC_REQ;
4514 } else if (!strcmp(test, "crash-after-receiving-append-request-update")) {
4515 failure_test = FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE;
4516 } else if (!strcmp(test, "delay-election")) {
4517 failure_test = FT_DELAY_ELECTION;
4518 struct raft *raft;
4519 HMAP_FOR_EACH (raft, hmap_node, &all_rafts) {
4520 if (raft->role == RAFT_FOLLOWER) {
4521 raft_reset_election_timer(raft);
4522 }
4523 }
4524 } else if (!strcmp(test, "clear")) {
4525 failure_test = FT_NO_TEST;
4526 unixctl_command_reply(conn, "test dismissed");
4527 return;
4528 } else {
4529 unixctl_command_reply_error(conn, "unknown test scenario");
4530 return;
4531 }
4532 unixctl_command_reply(conn, "test engaged");
4533 }
4534
4535 static void
4536 raft_init(void)
4537 {
4538 static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
4539 if (!ovsthread_once_start(&once)) {
4540 return;
4541 }
4542 unixctl_command_register("cluster/cid", "DB", 1, 1,
4543 raft_unixctl_cid, NULL);
4544 unixctl_command_register("cluster/sid", "DB", 1, 1,
4545 raft_unixctl_sid, NULL);
4546 unixctl_command_register("cluster/status", "DB", 1, 1,
4547 raft_unixctl_status, NULL);
4548 unixctl_command_register("cluster/leave", "DB", 1, 1,
4549 raft_unixctl_leave, NULL);
4550 unixctl_command_register("cluster/kick", "DB SERVER", 2, 2,
4551 raft_unixctl_kick, NULL);
4552 unixctl_command_register("cluster/failure-test", "FAILURE SCENARIO", 1, 1,
4553 raft_unixctl_failure_test, NULL);
4554 ovsthread_once_done(&once);
4555 }