/*
 * Copyright (c) 2017, 2018 Nicira, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <config.h>

#include "raft.h"
#include "raft-private.h"

#include <errno.h>
#include <unistd.h>

#include "hash.h"
#include "jsonrpc.h"
#include "lockfile.h"
#include "openvswitch/dynamic-string.h"
#include "openvswitch/hmap.h"
#include "openvswitch/json.h"
#include "openvswitch/list.h"
#include "openvswitch/poll-loop.h"
#include "openvswitch/vlog.h"
#include "ovsdb-error.h"
#include "ovsdb-parser.h"
#include "ovsdb/log.h"
#include "raft-rpc.h"
#include "random.h"
#include "socket-util.h"
#include "stream.h"
#include "timeval.h"
#include "unicode.h"
#include "unixctl.h"
#include "util.h"
#include "uuid.h"

VLOG_DEFINE_THIS_MODULE(raft);

/* Roles for a Raft server:
 *
 *    - Followers: Servers in touch with the current leader.
 *
 *    - Candidate: Servers unaware of a current leader and seeking election to
 *      leader.
 *
 *    - Leader: Handles all client requests.  At most one at a time.
 *
 * In normal operation there is exactly one leader and all of the other servers
 * are followers. */
enum raft_role {
    RAFT_FOLLOWER,
    RAFT_CANDIDATE,
    RAFT_LEADER
};

/* Flags for unit tests. */
enum raft_failure_test {
    FT_NO_TEST,
    FT_CRASH_BEFORE_SEND_APPEND_REQ,
    FT_CRASH_AFTER_SEND_APPEND_REQ,
    FT_CRASH_BEFORE_SEND_EXEC_REP,
    FT_CRASH_AFTER_SEND_EXEC_REP,
    FT_CRASH_BEFORE_SEND_EXEC_REQ,
    FT_CRASH_AFTER_SEND_EXEC_REQ,
    FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE,
    FT_DELAY_ELECTION
};
static enum raft_failure_test failure_test;

/* A connection between this Raft server and another one. */
struct raft_conn {
    struct ovs_list list_node;  /* In struct raft's 'conns' list. */
    struct jsonrpc_session *js; /* JSON-RPC connection. */
    struct uuid sid;            /* The remote server's unique ID. */
    char *nickname;             /* Short name for use in log messages. */
    bool incoming;              /* True if incoming, false if outgoing. */
    unsigned int js_seqno;      /* Seqno for noticing (re)connections. */
};

static void raft_conn_close(struct raft_conn *);

/* A "command", that is, a request to append an entry to the log.
 *
 * The Raft specification only allows clients to issue commands to the leader.
 * With this implementation, clients may issue a command on any server, which
 * then relays the command to the leader if necessary.
 *
 * This structure is thus used in three cases:
 *
 *     1. We are the leader and the command was issued to us directly.
 *
 *     2. We are a follower and relayed the command to the leader.
 *
 *     3. We are the leader and a follower relayed the command to us.
 */
struct raft_command {
    /* All cases. */
    struct hmap_node hmap_node; /* In struct raft's 'commands' hmap. */
    unsigned int n_refs;        /* Reference count. */
    enum raft_command_status status; /* Execution status. */
    struct uuid eid;            /* Entry ID of result. */

    /* Case 1 only. */
    uint64_t index;             /* Index in log (0 if being relayed). */

    /* Case 2 only. */
    long long int timestamp;    /* Issue or last ping time, for expiration. */

    /* Case 3 only. */
    struct uuid sid;            /* The follower (otherwise UUID_ZERO). */
};
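
/* Illustrative walk-through (not from the original source): suppose a client
 * connected to follower F submits a transaction while L is leader.  F creates
 * a case-2 command, records the submission time in 'timestamp', and relays
 * the request to L.  L creates a corresponding case-3 command with 'sid' set
 * to F's server ID, appends the entry to its log, and replies to F once the
 * entry commits.  Had the client been connected to L directly, L would
 * instead create a case-1 command with 'index' set to the new entry's log
 * index. */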

static void raft_command_complete(struct raft *, struct raft_command *,
                                  enum raft_command_status);

static void raft_complete_all_commands(struct raft *,
                                       enum raft_command_status);

/* Type of deferred action, see struct raft_waiter. */
enum raft_waiter_type {
    RAFT_W_ENTRY,
    RAFT_W_TERM,
    RAFT_W_RPC,
};

/* An action deferred until a log write commits to disk. */
struct raft_waiter {
    struct ovs_list list_node;
    uint64_t commit_ticket;

    enum raft_waiter_type type;
    union {
        /* RAFT_W_ENTRY.
         *
         * Waits for a RAFT_REC_ENTRY write to our local log to commit.  Upon
         * completion, updates 'log_synced' to indicate that the new log entry
         * or entries are committed and, if we are leader, also updates our
         * local 'match_index'. */
        struct {
            uint64_t index;
        } entry;

        /* RAFT_W_TERM.
         *
         * Waits for a RAFT_REC_TERM or RAFT_REC_VOTE record write to commit.
         * Upon completion, updates 'synced_term' and 'synced_vote', which
         * triggers sending RPCs deferred by the uncommitted 'term' and
         * 'vote'. */
        struct {
            uint64_t term;
            struct uuid vote;
        } term;

        /* RAFT_W_RPC.
         *
         * Sometimes, sending an RPC to a peer must be delayed until an entry,
         * a term, or a vote mentioned in the RPC is synced to disk.  This
         * waiter keeps a copy of such an RPC until the previous waiters have
         * committed. */
        union raft_rpc *rpc;
    };
};

static struct raft_waiter *raft_waiter_create(struct raft *,
                                              enum raft_waiter_type,
                                              bool start_commit);
static void raft_waiters_destroy(struct raft *);

/* The Raft state machine. */
struct raft {
    struct hmap_node hmap_node; /* In 'all_rafts'. */
    struct ovsdb_log *log;

/* Persistent derived state.
 *
 * This must be updated on stable storage before responding to RPCs.  It can be
 * derived from the header, snapshot, and log in 'log'. */

    struct uuid cid;            /* Cluster ID (immutable for the cluster). */
    struct uuid sid;            /* Server ID (immutable for the server). */
    char *local_address;        /* Local address (immutable for the server). */
    char *local_nickname;       /* Used for local server in log messages. */
    char *name;                 /* Schema name (immutable for the cluster). */

    /* Contains "struct raft_server"s and represents the server configuration
     * most recently added to 'log'. */
    struct hmap servers;

/* Persistent state on all servers.
 *
 * Must be updated on stable storage before responding to RPCs. */

    /* Current term and the vote for that term.  These might be on the way to
     * disk now. */
    uint64_t term;              /* Initialized to 0 and only increases. */
    struct uuid vote;           /* All-zeros if no vote yet in 'term'. */

    /* The term and vote that have been synced to disk. */
    uint64_t synced_term;
    struct uuid synced_vote;

    /* The log.
     *
     * A log entry with index 1 never really exists; the initial snapshot for a
     * Raft is considered to include this index.  The first real log entry has
     * index 2.
     *
     * A new Raft instance contains an empty log:  log_start=2, log_end=2.
     * Over time, the log grows:                   log_start=2, log_end=N.
     * At some point, the server takes a snapshot: log_start=N, log_end=N.
     * The log continues to grow:                  log_start=N, log_end=N+1...
     *
     * Must be updated on stable storage before responding to RPCs. */
    struct raft_entry *entries; /* Log entry i is in log[i - log_start]. */
    uint64_t log_start;         /* Index of first entry in log. */
    uint64_t log_end;           /* Index of last entry in log, plus 1. */
    uint64_t log_synced;        /* Index of last synced entry. */
    size_t allocated_log;       /* Allocated entries in 'log'. */
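
    /* Worked example (illustrative): with log_start=10 and log_end=13, the
     * log holds entries 10, 11, and 12 in entries[0] through entries[2],
     * allocated_log is at least 3, and entry 9 is the newest entry folded
     * into the snapshot. */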

    /* Snapshot state (see Figure 5.1)
     *
     * This is the state of the cluster as of the last discarded log entry,
     * that is, at log index 'log_start - 1' (called prevIndex in Figure 5.1).
     * Only committed log entries can be included in a snapshot. */
    struct raft_entry snap;

/* Volatile state.
 *
 * The snapshot is always committed, but the rest of the log might not be yet.
 * 'last_applied' tracks what entries have been passed to the client.  If the
 * client hasn't yet read the latest snapshot, then even the snapshot isn't
 * applied yet.  Thus, the invariants are different for these members:
 *
 *     log_start - 2 <= last_applied <= commit_index < log_end.
 *     log_start - 1 <= commit_index < log_end.
 */
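
/* Continuing the worked example above (illustrative): with log_start=10, the
 * snapshot covers everything through index 9, so commit_index is at least 9
 * and last_applied is at least 8; last_applied stays at 8 only until the
 * client has read the snapshot itself. */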

    enum raft_role role;        /* Current role. */
    uint64_t commit_index;      /* Max log index known to be committed. */
    uint64_t last_applied;      /* Max log index applied to state machine. */
    struct uuid leader_sid;     /* Server ID of leader (zero, if unknown). */

#define ELECTION_BASE_MSEC 1024
#define ELECTION_RANGE_MSEC 1024
    long long int election_base;    /* Time of last heartbeat from leader. */
    long long int election_timeout; /* Time at which we start an election. */

    /* Used for joining a cluster. */
    bool joining;                 /* Attempting to join the cluster? */
    struct sset remote_addresses; /* Addresses to try to find other servers. */
    long long int join_timeout;   /* Time to re-send add server request. */

    /* Used for leaving a cluster. */
    bool leaving;               /* True if we are leaving the cluster. */
    bool left;                  /* True if we have finished leaving. */
    long long int leave_timeout; /* Time to re-send remove server request. */

    /* Failure. */
    bool failed;                /* True if unrecoverable error has occurred. */

    /* File synchronization. */
    struct ovs_list waiters;    /* Contains "struct raft_waiter"s. */

    /* Network connections. */
    struct pstream *listener;   /* For connections from other Raft servers. */
    long long int listen_backoff; /* For retrying creating 'listener'. */
    struct ovs_list conns;      /* Contains struct raft_conns. */

    /* Leaders only.  Reinitialized after becoming leader. */
    struct hmap add_servers;    /* Contains "struct raft_server"s to add. */
    struct raft_server *remove_server; /* Server being removed. */
    struct hmap commands;       /* Contains "struct raft_command"s. */
#define PING_TIME_MSEC (ELECTION_BASE_MSEC / 3)
    long long int ping_timeout; /* Time at which to send a heartbeat. */

    /* Candidates only.  Reinitialized at start of election. */
    int n_votes;                /* Number of votes for me. */

    /* Followers and candidates only. */
    bool candidate_retrying;    /* The earlier election timed out and we are
                                   now retrying. */
    bool had_leader;            /* A leader has been elected since the last
                                   election was initiated.  This helps to set
                                   candidate_retrying. */
};

/* All Raft structures. */
static struct hmap all_rafts = HMAP_INITIALIZER(&all_rafts);

static void raft_init(void);

static struct ovsdb_error *raft_read_header(struct raft *)
    OVS_WARN_UNUSED_RESULT;

static void raft_send_execute_command_reply(struct raft *,
                                            const struct uuid *sid,
                                            const struct uuid *eid,
                                            enum raft_command_status,
                                            uint64_t commit_index);

static void raft_update_our_match_index(struct raft *, uint64_t min_index);

static void raft_send_remove_server_reply__(
    struct raft *, const struct uuid *target_sid,
    const struct uuid *requester_sid, struct unixctl_conn *requester_conn,
    bool success, const char *comment);
static void raft_finished_leaving_cluster(struct raft *);

static void raft_server_init_leader(struct raft *, struct raft_server *);

static bool raft_rpc_is_heartbeat(const union raft_rpc *);
static bool raft_is_rpc_synced(const struct raft *, const union raft_rpc *);

static void raft_handle_rpc(struct raft *, const union raft_rpc *);

static bool raft_send_at(struct raft *, const union raft_rpc *,
                         int line_number);
#define raft_send(raft, rpc) raft_send_at(raft, rpc, __LINE__)

static bool raft_send_to_conn_at(struct raft *, const union raft_rpc *,
                                 struct raft_conn *, int line_number);
#define raft_send_to_conn(raft, rpc, conn) \
    raft_send_to_conn_at(raft, rpc, conn, __LINE__)

static void raft_send_append_request(struct raft *,
                                     struct raft_server *, unsigned int n,
                                     const char *comment);

static void raft_become_leader(struct raft *);
static void raft_become_follower(struct raft *);
static void raft_reset_election_timer(struct raft *);
static void raft_reset_ping_timer(struct raft *);
static void raft_send_heartbeats(struct raft *);
static void raft_start_election(struct raft *, bool leadership_transfer);
static bool raft_truncate(struct raft *, uint64_t new_end);
static void raft_get_servers_from_log(struct raft *, enum vlog_level);

static bool raft_handle_write_error(struct raft *, struct ovsdb_error *);

static void raft_run_reconfigure(struct raft *);

static void raft_set_leader(struct raft *, const struct uuid *sid);
static struct raft_server *
raft_find_server(const struct raft *raft, const struct uuid *sid)
{
    return raft_server_find(&raft->servers, sid);
}

static char *
raft_make_address_passive(const char *address_)
{
    if (!strncmp(address_, "unix:", 5)) {
        return xasprintf("p%s", address_);
    } else {
        char *address = xstrdup(address_);
        char *host, *port;
        inet_parse_host_port_tokens(strchr(address, ':') + 1, &host, &port);

        struct ds paddr = DS_EMPTY_INITIALIZER;
        ds_put_format(&paddr, "p%.3s:%s:", address, port);
        if (strchr(host, ':')) {
            ds_put_format(&paddr, "[%s]", host);
        } else {
            ds_put_cstr(&paddr, host);
        }
        free(address);
        return ds_steal_cstr(&paddr);
    }
}
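
/* For example (illustrative): the active address "tcp:192.0.2.1:6644"
 * becomes the passive address "ptcp:6644:192.0.2.1", and
 * "ssl:[2001:db8::1]:6644" becomes "pssl:6644:[2001:db8::1]": the same
 * transport prefixed with "p" and the port moved in front of the host. */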

static struct raft *
raft_alloc(void)
{
    raft_init();

    struct raft *raft = xzalloc(sizeof *raft);
    hmap_node_nullify(&raft->hmap_node);
    hmap_init(&raft->servers);
    raft->log_start = raft->log_end = 1;
    raft->role = RAFT_FOLLOWER;
    sset_init(&raft->remote_addresses);
    raft->join_timeout = LLONG_MAX;
    ovs_list_init(&raft->waiters);
    raft->listen_backoff = LLONG_MIN;
    ovs_list_init(&raft->conns);
    hmap_init(&raft->add_servers);
    hmap_init(&raft->commands);

    raft_reset_ping_timer(raft);
    raft_reset_election_timer(raft);

    return raft;
}

/* Creates an on-disk file that represents a new Raft cluster and initializes
 * it to consist of a single server, the one on which this function is called.
 *
 * Creates the local copy of the cluster's log in 'file_name', which must not
 * already exist.  Gives it the name 'name', which should be the database
 * schema name and which is used only to match up this database with the server
 * added to the cluster later if the cluster ID is unavailable.
 *
 * The new server is located at 'local_address', which must take one of the
 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
 * square bracket enclosed IPv6 address and PORT is a TCP port number.
 *
 * This only creates the on-disk file.  Use raft_open() to start operating the
 * new server.
 *
 * Returns null if successful, otherwise an ovsdb_error describing the
 * problem. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_create_cluster(const char *file_name, const char *name,
                    const char *local_address, const struct json *data)
{
    /* Parse and verify validity of the local address. */
    struct ovsdb_error *error = raft_address_validate(local_address);
    if (error) {
        return error;
    }

    /* Create log file. */
    struct ovsdb_log *log;
    error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
                           -1, &log);
    if (error) {
        return error;
    }

    /* Write log file. */
    struct raft_header h = {
        .sid = uuid_random(),
        .cid = uuid_random(),
        .name = xstrdup(name),
        .local_address = xstrdup(local_address),
        .joining = false,
        .remote_addresses = SSET_INITIALIZER(&h.remote_addresses),
        .snap_index = 1,
        .snap = {
            .term = 1,
            .data = json_nullable_clone(data),
            .eid = uuid_random(),
            .servers = json_object_create(),
        },
    };
    shash_add_nocopy(json_object(h.snap.servers),
                     xasprintf(UUID_FMT, UUID_ARGS(&h.sid)),
                     json_string_create(local_address));
    error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
    raft_header_uninit(&h);
    if (!error) {
        error = ovsdb_log_commit_block(log);
    }
    ovsdb_log_close(log);

    return error;
}
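
/* Example caller (an illustrative sketch; the file name, schema name, and
 * address are hypothetical):
 *
 *     struct ovsdb_error *error = raft_create_cluster(
 *         "/var/lib/db.db", "OVN_Southbound", "tcp:192.0.2.1:6644", data);
 *
 * On success the file exists on disk; the server starts operating only after
 * the caller reopens the file with ovsdb_log_open() and passes the log to
 * raft_open(). */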

/* Creates a database file that represents a new server in an existing Raft
 * cluster.
 *
 * Creates the local copy of the cluster's log in 'file_name', which must not
 * already exist.  Gives it the name 'name', which must be the same name
 * passed in to raft_create_cluster() earlier.
 *
 * 'cid' is optional.  If specified, the new server will join only the cluster
 * with the given cluster ID.
 *
 * The new server is located at 'local_address', which must take one of the
 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
 * square bracket enclosed IPv6 address and PORT is a TCP port number.
 *
 * Joining the cluster requires contacting it.  Thus, 'remote_addresses'
 * specifies the addresses of existing servers in the cluster.  One server out
 * of the existing cluster is sufficient, as long as that server is reachable
 * and not partitioned from the current cluster leader.  If multiple servers
 * from the cluster are specified, then it is sufficient for any of them to
 * meet this criterion.
 *
 * This only creates the on-disk file and does no network access.  Use
 * raft_open() to start operating the new server.  (Until this happens, the
 * new server has not joined the cluster.)
 *
 * Returns null if successful, otherwise an ovsdb_error describing the
 * problem. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_join_cluster(const char *file_name,
                  const char *name, const char *local_address,
                  const struct sset *remote_addresses,
                  const struct uuid *cid)
{
    ovs_assert(!sset_is_empty(remote_addresses));

    /* Parse and verify validity of the addresses. */
    struct ovsdb_error *error = raft_address_validate(local_address);
    if (error) {
        return error;
    }
    const char *addr;
    SSET_FOR_EACH (addr, remote_addresses) {
        error = raft_address_validate(addr);
        if (error) {
            return error;
        }
        if (!strcmp(addr, local_address)) {
            return ovsdb_error(NULL, "remote addresses cannot be the same "
                               "as the local address");
        }
    }

    /* Verify validity of the cluster ID (if provided). */
    if (cid && uuid_is_zero(cid)) {
        return ovsdb_error(NULL, "all-zero UUID is not valid cluster ID");
    }

    /* Create log file. */
    struct ovsdb_log *log;
    error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
                           -1, &log);
    if (error) {
        return error;
    }

    /* Write log file. */
    struct raft_header h = {
        .sid = uuid_random(),
        .cid = cid ? *cid : UUID_ZERO,
        .name = xstrdup(name),
        .local_address = xstrdup(local_address),
        .joining = true,
        /* No snapshot yet. */
    };
    sset_clone(&h.remote_addresses, remote_addresses);
    error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
    raft_header_uninit(&h);
    if (!error) {
        error = ovsdb_log_commit_block(log);
    }
    ovsdb_log_close(log);

    return error;
}
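
/* Example caller (an illustrative sketch; the addresses are hypothetical):
 *
 *     struct sset remotes = SSET_INITIALIZER(&remotes);
 *     sset_add(&remotes, "tcp:192.0.2.1:6644");
 *     struct ovsdb_error *error = raft_join_cluster(
 *         "/var/lib/db.db", "OVN_Southbound", "tcp:192.0.2.2:6644",
 *         &remotes, NULL);
 *
 * Passing NULL for 'cid' joins whichever cluster the remote servers belong
 * to; a nonzero 'cid' restricts the join to that specific cluster. */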

/* Reads the initial header record from 'log', which must be a Raft clustered
 * database log, and populates '*md' with the information read from it.  The
 * caller must eventually destroy 'md' with raft_metadata_destroy().
 *
 * On success, returns NULL.  On failure, returns an error that the caller must
 * eventually destroy and zeros '*md'. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_read_metadata(struct ovsdb_log *log, struct raft_metadata *md)
{
    struct raft *raft = raft_alloc();
    raft->log = log;

    struct ovsdb_error *error = raft_read_header(raft);
    if (!error) {
        md->sid = raft->sid;
        md->name = xstrdup(raft->name);
        md->local = xstrdup(raft->local_address);
        md->cid = raft->cid;
    } else {
        memset(md, 0, sizeof *md);
    }

    raft->log = NULL;
    raft_close(raft);
    return error;
}

/* Frees the metadata in 'md'. */
void
raft_metadata_destroy(struct raft_metadata *md)
{
    if (md) {
        free(md->name);
        free(md->local);
    }
}

static const struct raft_entry *
raft_get_entry(const struct raft *raft, uint64_t index)
{
    ovs_assert(index >= raft->log_start);
    ovs_assert(index < raft->log_end);
    return &raft->entries[index - raft->log_start];
}

static uint64_t
raft_get_term(const struct raft *raft, uint64_t index)
{
    return (index == raft->log_start - 1
            ? raft->snap.term
            : raft_get_entry(raft, index)->term);
}

static const struct json *
raft_servers_for_index(const struct raft *raft, uint64_t index)
{
    ovs_assert(index >= raft->log_start - 1);
    ovs_assert(index < raft->log_end);

    const struct json *servers = raft->snap.servers;
    for (uint64_t i = raft->log_start; i <= index; i++) {
        const struct raft_entry *e = raft_get_entry(raft, i);
        if (e->servers) {
            servers = e->servers;
        }
    }
    return servers;
}
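
/* For example (illustrative): if the snapshot's configuration is {A, B} and
 * log entry 7 is a configuration entry that adds server C, then
 * raft_servers_for_index() returns {A, B} for indexes below 7 and {A, B, C}
 * for index 7 up through log_end - 1. */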

static void
raft_set_servers(struct raft *raft, const struct hmap *new_servers,
                 enum vlog_level level)
{
    struct raft_server *s, *next;
    HMAP_FOR_EACH_SAFE (s, next, hmap_node, &raft->servers) {
        if (!raft_server_find(new_servers, &s->sid)) {
            ovs_assert(s != raft->remove_server);

            hmap_remove(&raft->servers, &s->hmap_node);
            VLOG(level, "server %s removed from configuration", s->nickname);
            raft_server_destroy(s);
        }
    }

    HMAP_FOR_EACH_SAFE (s, next, hmap_node, new_servers) {
        if (!raft_find_server(raft, &s->sid)) {
            VLOG(level, "server %s added to configuration", s->nickname);

            struct raft_server *new
                = raft_server_add(&raft->servers, &s->sid, s->address);
            raft_server_init_leader(raft, new);
        }
    }
}

static uint64_t
raft_add_entry(struct raft *raft,
               uint64_t term, struct json *data, const struct uuid *eid,
               struct json *servers)
{
    if (raft->log_end - raft->log_start >= raft->allocated_log) {
        raft->entries = x2nrealloc(raft->entries, &raft->allocated_log,
                                   sizeof *raft->entries);
    }

    uint64_t index = raft->log_end++;
    struct raft_entry *entry = &raft->entries[index - raft->log_start];
    entry->term = term;
    entry->data = data;
    entry->eid = eid ? *eid : UUID_ZERO;
    entry->servers = servers;
    return index;
}

/* Writes a RAFT_REC_ENTRY record for 'term', 'data', 'eid', 'servers' to
 * 'raft''s log and returns an error indication. */
static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_write_entry(struct raft *raft, uint64_t term, struct json *data,
                 const struct uuid *eid, struct json *servers)
{
    struct raft_record r = {
        .type = RAFT_REC_ENTRY,
        .term = term,
        .entry = {
            .index = raft_add_entry(raft, term, data, eid, servers),
            .data = data,
            .servers = servers,
            .eid = eid ? *eid : UUID_ZERO,
        },
    };
    return ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r));
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_write_state(struct ovsdb_log *log,
                 uint64_t term, const struct uuid *vote)
{
    struct raft_record r = { .term = term };
    if (vote && !uuid_is_zero(vote)) {
        r.type = RAFT_REC_VOTE;
        r.sid = *vote;
    } else {
        r.type = RAFT_REC_TERM;
    }
    return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_apply_record(struct raft *raft, unsigned long long int rec_idx,
                  const struct raft_record *r)
{
    /* Apply "term", which is present in most kinds of records (and otherwise
     * 0).
     *
     * A Raft leader can replicate entries from previous terms to the other
     * servers in the cluster, retaining the original terms on those entries
     * (see section 3.6.2 "Committing entries from previous terms" for more
     * information), so it's OK for the term in a log record to precede the
     * current term. */
    if (r->term > raft->term) {
        raft->term = raft->synced_term = r->term;
        raft->vote = raft->synced_vote = UUID_ZERO;
    }

    switch (r->type) {
    case RAFT_REC_ENTRY:
        if (r->entry.index < raft->commit_index) {
            return ovsdb_error(NULL, "record %llu attempts to truncate log "
                               "from %"PRIu64" to %"PRIu64" entries, but "
                               "commit index is already %"PRIu64,
                               rec_idx, raft->log_end, r->entry.index,
                               raft->commit_index);
        } else if (r->entry.index > raft->log_end) {
            return ovsdb_error(NULL, "record %llu with index %"PRIu64" skips "
                               "past expected index %"PRIu64,
                               rec_idx, r->entry.index, raft->log_end);
        }

        if (r->entry.index < raft->log_end) {
            /* This can happen, but it is notable. */
            VLOG_DBG("record %llu truncates log from %"PRIu64" to %"PRIu64
                     " entries", rec_idx, raft->log_end, r->entry.index);
            raft_truncate(raft, r->entry.index);
        }

        uint64_t prev_term = (raft->log_end > raft->log_start
                              ? raft->entries[raft->log_end
                                              - raft->log_start - 1].term
                              : raft->snap.term);
        if (r->term < prev_term) {
            return ovsdb_error(NULL, "record %llu with index %"PRIu64" term "
                               "%"PRIu64" precedes previous entry's term "
                               "%"PRIu64,
                               rec_idx, r->entry.index, r->term, prev_term);
        }

        raft->log_synced = raft_add_entry(
            raft, r->term,
            json_nullable_clone(r->entry.data), &r->entry.eid,
            json_nullable_clone(r->entry.servers));
        return NULL;

    case RAFT_REC_TERM:
        return NULL;

    case RAFT_REC_VOTE:
        if (r->term < raft->term) {
            return ovsdb_error(NULL, "record %llu votes for term %"PRIu64" "
                               "but current term is %"PRIu64,
                               rec_idx, r->term, raft->term);
        } else if (!uuid_is_zero(&raft->vote)
                   && !uuid_equals(&raft->vote, &r->sid)) {
            return ovsdb_error(NULL, "record %llu votes for "SID_FMT" in term "
                               "%"PRIu64" but a previous record for the "
                               "same term voted for "SID_FMT, rec_idx,
                               SID_ARGS(&raft->vote), r->term,
                               SID_ARGS(&r->sid));
        } else {
            raft->vote = raft->synced_vote = r->sid;
            return NULL;
        }
        break;

    case RAFT_REC_NOTE:
        if (!strcmp(r->note, "left")) {
            return ovsdb_error(NULL, "record %llu indicates server has left "
                               "the cluster; it cannot be added back (use "
                               "\"ovsdb-tool join-cluster\" to add a new "
                               "server)", rec_idx);
        }
        return NULL;

    case RAFT_REC_COMMIT_INDEX:
        if (r->commit_index < raft->commit_index) {
            return ovsdb_error(NULL, "record %llu regresses commit index "
                               "from %"PRIu64 " to %"PRIu64,
                               rec_idx, raft->commit_index, r->commit_index);
        } else if (r->commit_index >= raft->log_end) {
            return ovsdb_error(NULL, "record %llu advances commit index to "
                               "%"PRIu64 " but last log index is %"PRIu64,
                               rec_idx, r->commit_index, raft->log_end - 1);
        } else {
            raft->commit_index = r->commit_index;
            return NULL;
        }
        break;

    case RAFT_REC_LEADER:
        /* XXX we could use this to take back leadership for quick restart */
        return NULL;

    default:
        OVS_NOT_REACHED();
    }
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_read_header(struct raft *raft)
{
    /* Read header record. */
    struct json *json;
    struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
    if (error || !json) {
        /* Report error or end-of-file. */
        return error;
    }
    ovsdb_log_mark_base(raft->log);

    struct raft_header h;
    error = raft_header_from_json(&h, json);
    json_destroy(json);
    if (error) {
        return error;
    }

    raft->sid = h.sid;
    raft->cid = h.cid;
    raft->name = xstrdup(h.name);
    raft->local_address = xstrdup(h.local_address);
    raft->local_nickname = raft_address_to_nickname(h.local_address, &h.sid);
    raft->joining = h.joining;

    if (h.joining) {
        sset_clone(&raft->remote_addresses, &h.remote_addresses);
    } else {
        raft_entry_clone(&raft->snap, &h.snap);
        raft->log_start = raft->log_end = h.snap_index + 1;
        raft->commit_index = h.snap_index;
        raft->last_applied = h.snap_index - 1;
    }

    raft_header_uninit(&h);

    return NULL;
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_read_log(struct raft *raft)
{
    for (unsigned long long int i = 1; ; i++) {
        struct json *json;
        struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
        if (!json) {
            if (error) {
                /* We assume that the error is due to a partial write while
                 * appending to the file before a crash, so log it and
                 * continue. */
                char *error_string = ovsdb_error_to_string_free(error);
                VLOG_WARN("%s", error_string);
                free(error_string);
                error = NULL;
            }
            break;
        }

        struct raft_record r;
        error = raft_record_from_json(&r, json);
        json_destroy(json);     /* 'r' holds its own copies of the data. */
        if (!error) {
            error = raft_apply_record(raft, i, &r);
            raft_record_uninit(&r);
        }
        if (error) {
            return ovsdb_wrap_error(error, "error reading record %llu from "
                                    "%s log", i, raft->name);
        }
    }

    /* Set the most recent servers. */
    raft_get_servers_from_log(raft, VLL_DBG);

    return NULL;
}

static void
raft_reset_election_timer(struct raft *raft)
{
    unsigned int duration = (ELECTION_BASE_MSEC
                             + random_range(ELECTION_RANGE_MSEC));
    raft->election_base = time_msec();
    if (failure_test == FT_DELAY_ELECTION) {
        /* Slow down this node so that it won't win the next election. */
        duration += ELECTION_BASE_MSEC;
    }
    raft->election_timeout = raft->election_base + duration;
}
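
/* Timing example (illustrative): with ELECTION_BASE_MSEC and
 * ELECTION_RANGE_MSEC both 1024, each reset picks a timeout between 1024 and
 * 2047 ms in the future (2048 to 3071 ms under FT_DELAY_ELECTION).  Since
 * leaders send heartbeats every PING_TIME_MSEC = 341 ms, a healthy follower
 * normally has its timer reset several times before it can expire. */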

static void
raft_reset_ping_timer(struct raft *raft)
{
    raft->ping_timeout = time_msec() + PING_TIME_MSEC;
}

static void
raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
              const struct uuid *sid, bool incoming)
{
    struct raft_conn *conn = xzalloc(sizeof *conn);
    ovs_list_push_back(&raft->conns, &conn->list_node);
    conn->js = js;
    if (sid) {
        conn->sid = *sid;
    }
    conn->nickname = raft_address_to_nickname(jsonrpc_session_get_name(js),
                                              &conn->sid);
    conn->incoming = incoming;
    conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
}

/* Starts the local server in an existing Raft cluster, using the local copy
 * of the cluster's log that was opened as 'log'.  Takes ownership of 'log',
 * whether successful or not. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_open(struct ovsdb_log *log, struct raft **raftp)
{
    struct raft *raft = raft_alloc();
    raft->log = log;

    struct ovsdb_error *error = raft_read_header(raft);
    if (error) {
        goto error;
    }

    if (!raft->joining) {
        error = raft_read_log(raft);
        if (error) {
            goto error;
        }

        /* Find our own server. */
        if (!raft_find_server(raft, &raft->sid)) {
            error = ovsdb_error(NULL, "server does not belong to cluster");
            goto error;
        }

        /* If there's only one server, start an election right away so that the
         * cluster bootstraps quickly. */
        if (hmap_count(&raft->servers) == 1) {
            raft_start_election(raft, false);
        }
    } else {
        raft->join_timeout = time_msec() + 1000;
    }

    *raftp = raft;
    hmap_insert(&all_rafts, &raft->hmap_node, hash_string(raft->name, 0));
    return NULL;

error:
    raft_close(raft);
    *raftp = NULL;
    return error;
}

/* Returns the name of 'raft', which in OVSDB is the database schema name. */
const char *
raft_get_name(const struct raft *raft)
{
    return raft->name;
}

/* Returns the cluster ID of 'raft'.  If 'raft' has not yet completed joining
 * its cluster, then 'cid' will be all-zeros (unless the administrator
 * specified a cluster ID running "ovsdb-tool join-cluster").
 *
 * Each cluster has a unique cluster ID. */
const struct uuid *
raft_get_cid(const struct raft *raft)
{
    return &raft->cid;
}

/* Returns the server ID of 'raft'.  Each server has a unique server ID. */
const struct uuid *
raft_get_sid(const struct raft *raft)
{
    return &raft->sid;
}

/* Returns true if 'raft' has completed joining its cluster, has not left or
 * initiated leaving the cluster, does not have failed disk storage, and is
 * apparently connected to the leader in a healthy way (or is itself the
 * leader).
 *
 * If 'raft' is candidate:
 * a) if it is the first round of election, consider it as connected, hoping
 *    it will successfully elect a new leader soon.
 * b) if it is already retrying, consider it as disconnected (so that clients
 *    may decide to reconnect to other members). */
bool
raft_is_connected(const struct raft *raft)
{
    bool ret = (!raft->candidate_retrying
                && !raft->joining
                && !raft->leaving
                && !raft->left
                && !raft->failed);
    VLOG_DBG("raft_is_connected: %s", ret ? "true" : "false");
    return ret;
}
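
/* Example timeline (illustrative): a follower loses contact with its leader
 * and its election timer fires, so it becomes a candidate.  During this first
 * election candidate_retrying is false and raft_is_connected() still returns
 * true, in the hope that a new leader is about to emerge.  If the election
 * times out and a second one starts, candidate_retrying becomes true and
 * raft_is_connected() returns false, letting clients fail over to another
 * cluster member. */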

/* Returns true if 'raft' is the cluster leader. */
bool
raft_is_leader(const struct raft *raft)
{
    return raft->role == RAFT_LEADER;
}

/* Returns true if 'raft' is in the process of joining its cluster. */
bool
raft_is_joining(const struct raft *raft)
{
    return raft->joining;
}

/* Only returns *connected* connections. */
static struct raft_conn *
raft_find_conn_by_sid(struct raft *raft, const struct uuid *sid)
{
    if (!uuid_is_zero(sid)) {
        struct raft_conn *conn;
        LIST_FOR_EACH (conn, list_node, &raft->conns) {
            if (uuid_equals(sid, &conn->sid)
                && jsonrpc_session_is_connected(conn->js)) {
                return conn;
            }
        }
    }
    return NULL;
}

static struct raft_conn *
raft_find_conn_by_address(struct raft *raft, const char *address)
{
    struct raft_conn *conn;
    LIST_FOR_EACH (conn, list_node, &raft->conns) {
        if (!strcmp(jsonrpc_session_get_name(conn->js), address)) {
            return conn;
        }
    }
    return NULL;
}

static void OVS_PRINTF_FORMAT(3, 4)
raft_record_note(struct raft *raft, const char *note,
                 const char *comment_format, ...)
{
    va_list args;
    va_start(args, comment_format);
    char *comment = xvasprintf(comment_format, args);
    va_end(args);

    struct raft_record r = {
        .type = RAFT_REC_NOTE,
        .comment = comment,
        .note = CONST_CAST(char *, note),
    };
    ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));

    free(comment);
}

/* If we're leader, try to transfer leadership to another server, logging
 * 'reason' as the human-readable reason (it should be a phrase suitable for
 * following "because"). */
void
raft_transfer_leadership(struct raft *raft, const char *reason)
{
    if (raft->role != RAFT_LEADER) {
        return;
    }

    struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (!uuid_equals(&raft->sid, &s->sid)
            && s->phase == RAFT_PHASE_STABLE) {
            struct raft_conn *conn = raft_find_conn_by_sid(raft, &s->sid);
            if (!conn) {
                continue;
            }

            union raft_rpc rpc = {
                .become_leader = {
                    .common = {
                        .comment = CONST_CAST(char *, reason),
                        .type = RAFT_RPC_BECOME_LEADER,
                        .sid = s->sid,
                    },
                    .term = raft->term,
                }
            };
            raft_send_to_conn(raft, &rpc, conn);

            raft_record_note(raft, "transfer leadership",
                             "transferring leadership to %s because %s",
                             s->nickname, reason);
            break;
        }
    }
}

/* Send a RemoveServerRequest to the rest of the servers in the cluster.
 *
 * If we know which server is the leader, we can just send the request to it.
 * However, we might not know which server is the leader, and we might never
 * find out if the remove request was actually previously committed by a
 * majority of the servers (because in that case the new leader will not send
 * AppendRequests or heartbeats to us).  Therefore, we instead send
 * RemoveRequests to every server.  This theoretically has the same problem, if
 * the current cluster leader was not previously a member of the cluster, but
 * it seems likely to be more robust in practice. */
static void
raft_send_remove_server_requests(struct raft *raft)
{
    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
    VLOG_INFO_RL(&rl, "sending remove request (joining=%s, leaving=%s)",
                 raft->joining ? "true" : "false",
                 raft->leaving ? "true" : "false");
    const struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (!uuid_equals(&s->sid, &raft->sid)) {
            union raft_rpc rpc = (union raft_rpc) {
                .remove_server_request = {
                    .common = {
                        .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
                        .sid = s->sid,
                    },
                    .sid = raft->sid,
                },
            };
            raft_send(raft, &rpc);
        }
    }

    raft->leave_timeout = time_msec() + ELECTION_BASE_MSEC;
}

/* Attempts to start 'raft' leaving its cluster.  The caller can check progress
 * using raft_is_leaving() and raft_left(). */
void
raft_leave(struct raft *raft)
{
    if (raft->joining || raft->failed || raft->leaving || raft->left) {
        return;
    }
    VLOG_INFO(SID_FMT": starting to leave cluster "CID_FMT,
              SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
    raft->leaving = true;
    raft_transfer_leadership(raft, "this server is leaving the cluster");
    raft_become_follower(raft);
    raft_send_remove_server_requests(raft);
    raft->leave_timeout = time_msec() + ELECTION_BASE_MSEC;
}

/* Returns true if 'raft' is currently attempting to leave its cluster. */
bool
raft_is_leaving(const struct raft *raft)
{
    return raft->leaving;
}

/* Returns true if 'raft' successfully left its cluster. */
bool
raft_left(const struct raft *raft)
{
    return raft->left;
}

/* Returns true if 'raft' has experienced a disk I/O failure.  When this
 * returns true, only closing and reopening 'raft' allows for recovery. */
bool
raft_failed(const struct raft *raft)
{
    return raft->failed;
}

/* Forces 'raft' to attempt to take leadership of the cluster by deposing the
 * current leader. */
void
raft_take_leadership(struct raft *raft)
{
    if (raft->role != RAFT_LEADER) {
        raft_start_election(raft, true);
    }
}

/* Closes everything owned by 'raft' that might be visible outside the process:
 * network connections, commands, etc.  This is part of closing 'raft'; it is
 * also used if 'raft' has failed in an unrecoverable way. */
static void
raft_close__(struct raft *raft)
{
    if (!hmap_node_is_null(&raft->hmap_node)) {
        hmap_remove(&all_rafts, &raft->hmap_node);
        hmap_node_nullify(&raft->hmap_node);
    }

    raft_complete_all_commands(raft, RAFT_CMD_SHUTDOWN);

    struct raft_server *rs = raft->remove_server;
    if (rs) {
        raft_send_remove_server_reply__(raft, &rs->sid, &rs->requester_sid,
                                        rs->requester_conn, false,
                                        RAFT_SERVER_SHUTDOWN);
        raft_server_destroy(raft->remove_server);
        raft->remove_server = NULL;
    }

    struct raft_conn *conn, *next;
    LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
        raft_conn_close(conn);
    }
}

/* Closes and frees 'raft'.
 *
 * A server's cluster membership is independent of whether the server is
 * actually running.  When a server that is a member of a cluster closes, the
 * cluster treats this as a server failure. */
void
raft_close(struct raft *raft)
{
    if (!raft) {
        return;
    }

    raft_transfer_leadership(raft, "this server is shutting down");

    raft_close__(raft);

    ovsdb_log_close(raft->log);

    raft_servers_destroy(&raft->servers);

    for (uint64_t index = raft->log_start; index < raft->log_end; index++) {
        struct raft_entry *e = &raft->entries[index - raft->log_start];
        raft_entry_uninit(e);
    }
    free(raft->entries);

    raft_entry_uninit(&raft->snap);

    raft_waiters_destroy(raft);

    raft_servers_destroy(&raft->add_servers);

    hmap_destroy(&raft->commands);

    pstream_close(raft->listener);

    sset_destroy(&raft->remote_addresses);
    free(raft->local_address);
    free(raft->local_nickname);
    free(raft->name);

    free(raft);
}

static bool
raft_conn_receive(struct raft *raft, struct raft_conn *conn,
                  union raft_rpc *rpc)
{
    struct jsonrpc_msg *msg = jsonrpc_session_recv(conn->js);
    if (!msg) {
        return false;
    }

    struct ovsdb_error *error = raft_rpc_from_jsonrpc(&raft->cid, &raft->sid,
                                                      msg, rpc);
    jsonrpc_msg_destroy(msg);
    if (error) {
        char *s = ovsdb_error_to_string_free(error);
        VLOG_INFO("%s: %s", jsonrpc_session_get_name(conn->js), s);
        free(s);
        return false;
    }

    if (uuid_is_zero(&conn->sid)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
        conn->sid = rpc->common.sid;
        VLOG_INFO_RL(&rl, "%s: learned server ID "SID_FMT,
                     jsonrpc_session_get_name(conn->js), SID_ARGS(&conn->sid));
    } else if (!uuid_equals(&conn->sid, &rpc->common.sid)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
        VLOG_WARN_RL(&rl, "%s: ignoring message with unexpected server ID "
                     SID_FMT" (expected "SID_FMT")",
                     jsonrpc_session_get_name(conn->js),
                     SID_ARGS(&rpc->common.sid), SID_ARGS(&conn->sid));
        raft_rpc_uninit(rpc);
        return false;
    }

    const char *address = (rpc->type == RAFT_RPC_HELLO_REQUEST
                           ? rpc->hello_request.address
                           : rpc->type == RAFT_RPC_ADD_SERVER_REQUEST
                           ? rpc->add_server_request.address
                           : NULL);
    if (address) {
        char *new_nickname = raft_address_to_nickname(address, &conn->sid);
        if (strcmp(conn->nickname, new_nickname)) {
            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
            VLOG_INFO_RL(&rl, "%s: learned remote address %s",
                         jsonrpc_session_get_name(conn->js), address);

            free(conn->nickname);
            conn->nickname = new_nickname;
        } else {
            free(new_nickname);
        }
    }

    return true;
}

static const char *
raft_get_nickname(const struct raft *raft, const struct uuid *sid,
                  char buf[SID_LEN + 1], size_t bufsize)
{
    if (uuid_equals(sid, &raft->sid)) {
        return raft->local_nickname;
    }

    const char *s = raft_servers_get_nickname__(&raft->servers, sid);
    if (s) {
        return s;
    }

    return raft_servers_get_nickname(&raft->add_servers, sid, buf, bufsize);
}

static void
log_rpc(const union raft_rpc *rpc, const char *direction,
        const struct raft_conn *conn, int line_number)
{
    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(600, 600);
    if (!raft_rpc_is_heartbeat(rpc) && !VLOG_DROP_DBG(&rl)) {
        struct ds s = DS_EMPTY_INITIALIZER;
        if (line_number) {
            ds_put_format(&s, "raft.c:%d ", line_number);
        }
        ds_put_format(&s, "%s%s ", direction, conn->nickname);
        raft_rpc_format(rpc, &s);
        VLOG_DBG("%s", ds_cstr(&s));
        ds_destroy(&s);
    }
}

static void
raft_send_add_server_request(struct raft *raft, struct raft_conn *conn)
{
    union raft_rpc rq = {
        .add_server_request = {
            .common = {
                .type = RAFT_RPC_ADD_SERVER_REQUEST,
                .sid = UUID_ZERO,
                .comment = NULL,
            },
            .address = raft->local_address,
        },
    };
    raft_send_to_conn(raft, &rq, conn);
}

static void
raft_conn_run(struct raft *raft, struct raft_conn *conn)
{
    jsonrpc_session_run(conn->js);

    unsigned int new_seqno = jsonrpc_session_get_seqno(conn->js);
    bool just_connected = (new_seqno != conn->js_seqno
                           && jsonrpc_session_is_connected(conn->js));
    conn->js_seqno = new_seqno;
    if (just_connected) {
        if (raft->joining) {
            raft_send_add_server_request(raft, conn);
        } else if (raft->leaving) {
            union raft_rpc rq = {
                .remove_server_request = {
                    .common = {
                        .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
                        .sid = conn->sid,
                    },
                    .sid = raft->sid,
                },
            };
            raft_send_to_conn(raft, &rq, conn);
        } else {
            union raft_rpc rq = (union raft_rpc) {
                .hello_request = {
                    .common = {
                        .type = RAFT_RPC_HELLO_REQUEST,
                        .sid = conn->sid,
                    },
                    .address = raft->local_address,
                },
            };
            raft_send_to_conn(raft, &rq, conn);
        }
    }

    for (size_t i = 0; i < 50; i++) {
        union raft_rpc rpc;
        if (!raft_conn_receive(raft, conn, &rpc)) {
            break;
        }

        log_rpc(&rpc, "<--", conn, 0);
        raft_handle_rpc(raft, &rpc);
        raft_rpc_uninit(&rpc);
    }
}

static void
raft_waiter_complete_rpc(struct raft *raft, const union raft_rpc *rpc)
{
    uint64_t term = raft_rpc_get_term(rpc);
    if (term && term < raft->term) {
        /* Drop the message because it's for an expired term. */
        return;
    }

    if (!raft_is_rpc_synced(raft, rpc)) {
        /* This is a bug.  A reply message is deferred because some state in
         * the message, such as a term or index, has not been committed to
         * disk, and they should only be completed when that commit is done.
         * But this message is being completed before the commit is finished.
         * Complain, and hope that someone reports the bug. */
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        if (VLOG_DROP_ERR(&rl)) {
            return;
        }

        struct ds s = DS_EMPTY_INITIALIZER;

        if (term > raft->synced_term) {
            ds_put_format(&s, " because message term %"PRIu64" is "
                          "past synced term %"PRIu64,
                          term, raft->synced_term);
        }

        uint64_t index = raft_rpc_get_min_sync_index(rpc);
        if (index > raft->log_synced) {
            ds_put_format(&s, " %s message index %"PRIu64" is past last "
                          "synced index %"PRIu64,
                          s.length ? "and" : "because",
                          index, raft->log_synced);
        }

        const struct uuid *vote = raft_rpc_get_vote(rpc);
        if (vote && !uuid_equals(vote, &raft->synced_vote)) {
            char buf1[SID_LEN + 1];
            char buf2[SID_LEN + 1];
            ds_put_format(&s, " %s vote %s differs from synced vote %s",
                          s.length ? "and" : "because",
                          raft_get_nickname(raft, vote, buf1, sizeof buf1),
                          raft_get_nickname(raft, &raft->synced_vote,
                                            buf2, sizeof buf2));
        }

        char buf[SID_LEN + 1];
        ds_put_format(&s, ": %s ",
                      raft_get_nickname(raft, &rpc->common.sid,
                                        buf, sizeof buf));
        raft_rpc_format(rpc, &s);
        VLOG_ERR("internal error: deferred %s message completed "
                 "but not ready to send%s",
                 raft_rpc_type_to_string(rpc->type), ds_cstr(&s));
        ds_destroy(&s);

        return;
    }

    struct raft_conn *dst = raft_find_conn_by_sid(raft, &rpc->common.sid);
    if (dst) {
        raft_send_to_conn(raft, rpc, dst);
    }
}

static void
raft_waiter_complete(struct raft *raft, struct raft_waiter *w)
{
    switch (w->type) {
    case RAFT_W_ENTRY:
        if (raft->role == RAFT_LEADER) {
            raft_update_our_match_index(raft, w->entry.index);
        }
        raft->log_synced = w->entry.index;
        break;

    case RAFT_W_TERM:
        raft->synced_term = w->term.term;
        raft->synced_vote = w->term.vote;
        break;

    case RAFT_W_RPC:
        raft_waiter_complete_rpc(raft, w->rpc);
        break;
    }
}

static void
raft_waiter_destroy(struct raft_waiter *w)
{
    if (!w) {
        return;
    }

    ovs_list_remove(&w->list_node);

    switch (w->type) {
    case RAFT_W_ENTRY:
    case RAFT_W_TERM:
        break;

    case RAFT_W_RPC:
        raft_rpc_uninit(w->rpc);
        free(w->rpc);
        break;
    }
    free(w);
}

static void
raft_waiters_run(struct raft *raft)
{
    if (ovs_list_is_empty(&raft->waiters)) {
        return;
    }

    uint64_t cur = ovsdb_log_commit_progress(raft->log);
    struct raft_waiter *w, *next;
    LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
        if (cur < w->commit_ticket) {
            break;
        }
        raft_waiter_complete(raft, w);
        raft_waiter_destroy(w);
    }
}

static void
raft_waiters_wait(struct raft *raft)
{
    struct raft_waiter *w;
    LIST_FOR_EACH (w, list_node, &raft->waiters) {
        ovsdb_log_commit_wait(raft->log, w->commit_ticket);
        break;
    }
}

static void
raft_waiters_destroy(struct raft *raft)
{
    struct raft_waiter *w, *next;
    LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
        raft_waiter_destroy(w);
    }
}

static bool OVS_WARN_UNUSED_RESULT
raft_set_term(struct raft *raft, uint64_t term, const struct uuid *vote)
{
    struct ovsdb_error *error = raft_write_state(raft->log, term, vote);
    if (!raft_handle_write_error(raft, error)) {
        return false;
    }

    struct raft_waiter *w = raft_waiter_create(raft, RAFT_W_TERM, true);
    raft->term = w->term.term = term;
    raft->vote = w->term.vote = vote ? *vote : UUID_ZERO;
    return true;
}

static void
raft_accept_vote(struct raft *raft, struct raft_server *s,
                 const struct uuid *vote)
{
    if (uuid_equals(&s->vote, vote)) {
        return;
    }
    if (!uuid_is_zero(&s->vote)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
        char buf1[SID_LEN + 1];
        char buf2[SID_LEN + 1];
        VLOG_WARN_RL(&rl, "server %s changed its vote from %s to %s",
                     s->nickname,
                     raft_get_nickname(raft, &s->vote, buf1, sizeof buf1),
                     raft_get_nickname(raft, vote, buf2, sizeof buf2));
    }
    s->vote = *vote;
    if (uuid_equals(vote, &raft->sid)
        && ++raft->n_votes > hmap_count(&raft->servers) / 2) {
        raft_become_leader(raft);
    }
}
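
/* Majority arithmetic (illustrative): in a 5-server cluster,
 * hmap_count(&raft->servers) / 2 is 2, so a candidate becomes leader once
 * ++raft->n_votes reaches 3, a strict majority.  The candidate's own vote,
 * cast through raft_accept_vote() at the end of raft_start_election(),
 * counts toward that total. */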
1608
1609 static void
1610 raft_start_election(struct raft *raft, bool leadership_transfer)
1611 {
1612 if (raft->leaving) {
1613 return;
1614 }
1615
1616 struct raft_server *me = raft_find_server(raft, &raft->sid);
1617 if (!me) {
1618 return;
1619 }
1620
1621 if (!raft_set_term(raft, raft->term + 1, &raft->sid)) {
1622 return;
1623 }
1624
1625 ovs_assert(raft->role != RAFT_LEADER);
1626 raft->role = RAFT_CANDIDATE;
1627 /* If there was no leader elected since last election, we know we are
1628 * retrying now. */
1629 raft->candidate_retrying = !raft->had_leader;
1630 raft->had_leader = false;
1631
1632 raft->n_votes = 0;
1633
1634 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1635 if (!VLOG_DROP_INFO(&rl)) {
1636 long long int now = time_msec();
1637 if (now >= raft->election_timeout) {
1638 VLOG_INFO("term %"PRIu64": %lld ms timeout expired, "
1639 "starting election",
1640 raft->term, now - raft->election_base);
1641 } else {
1642 VLOG_INFO("term %"PRIu64": starting election", raft->term);
1643 }
1644 }
1645 raft_reset_election_timer(raft);
1646
1647 struct raft_server *peer;
1648 HMAP_FOR_EACH (peer, hmap_node, &raft->servers) {
1649 peer->vote = UUID_ZERO;
1650 if (uuid_equals(&raft->sid, &peer->sid)) {
1651 continue;
1652 }
1653
1654 union raft_rpc rq = {
1655 .vote_request = {
1656 .common = {
1657 .type = RAFT_RPC_VOTE_REQUEST,
1658 .sid = peer->sid,
1659 },
1660 .term = raft->term,
1661 .last_log_index = raft->log_end - 1,
1662 .last_log_term = (
1663 raft->log_end > raft->log_start
1664 ? raft->entries[raft->log_end - raft->log_start - 1].term
1665 : raft->snap.term),
1666 .leadership_transfer = leadership_transfer,
1667 },
1668 };
1669 raft_send(raft, &rq);
1670 }
1671
1672 /* Vote for ourselves. */
1673 raft_accept_vote(raft, me, &raft->sid);
1674 }
1675
1676 static void
1677 raft_open_conn(struct raft *raft, const char *address, const struct uuid *sid)
1678 {
1679 if (strcmp(address, raft->local_address)
1680 && !raft_find_conn_by_address(raft, address)) {
1681 raft_add_conn(raft, jsonrpc_session_open(address, true), sid, false);
1682 }
1683 }
1684
1685 static void
1686 raft_conn_close(struct raft_conn *conn)
1687 {
1688 jsonrpc_session_close(conn->js);
1689 ovs_list_remove(&conn->list_node);
1690 free(conn->nickname);
1691 free(conn);
1692 }
1693
1694 /* Returns true if 'conn' should stay open, false if it should be closed. */
1695 static bool
1696 raft_conn_should_stay_open(struct raft *raft, struct raft_conn *conn)
1697 {
1698 /* Close the connection if it's actually dead. If necessary, we'll
1699 * initiate a new session later. */
1700 if (!jsonrpc_session_is_alive(conn->js)) {
1701 return false;
1702 }
1703
1704 /* Keep incoming sessions. We trust the originator to decide to drop
1705 * it. */
1706 if (conn->incoming) {
1707 return true;
1708 }
1709
1710 /* If we are joining the cluster, keep sessions to the remote addresses
1711 * that are supposed to be part of the cluster we're joining. */
1712 if (raft->joining && sset_contains(&raft->remote_addresses,
1713 jsonrpc_session_get_name(conn->js))) {
1714 return true;
1715 }
1716
1717 /* We have joined the cluster. If we did that "recently", then there is a
1718 * chance that we do not have the most recent server configuration log
1719 * entry. If so, it's a waste to disconnect from the servers that were in
1720 * remote_addresses and that will probably appear in the configuration,
1721 * just to reconnect to them a moment later when we do get the
1722 * configuration update. If we are not ourselves in the configuration,
1723 * then we know that there must be a new configuration coming up, so in
1724 * that case keep the connection. */
1725 if (!raft_find_server(raft, &raft->sid)) {
1726 return true;
1727 }
1728
1729 /* Keep the connection only if the server is part of the configuration. */
1730 return raft_find_server(raft, &conn->sid);
1731 }
1732
1733 /* Allows 'raft' to maintain the distributed log. Call this function as part
1734 * of the process's main loop. */
1735 void
1736 raft_run(struct raft *raft)
1737 {
1738 if (raft->left || raft->failed) {
1739 return;
1740 }
1741
1742 raft_waiters_run(raft);
1743
1744 if (!raft->listener && time_msec() >= raft->listen_backoff) {
1745 char *paddr = raft_make_address_passive(raft->local_address);
1746 int error = pstream_open(paddr, &raft->listener, DSCP_DEFAULT);
1747 if (error) {
1748 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1749 VLOG_WARN_RL(&rl, "%s: listen failed (%s)",
1750 paddr, ovs_strerror(error));
1751 raft->listen_backoff = time_msec() + 1000;
1752 }
1753 free(paddr);
1754 }
1755
1756 if (raft->listener) {
1757 struct stream *stream;
1758 int error = pstream_accept(raft->listener, &stream);
1759 if (!error) {
1760 raft_add_conn(raft, jsonrpc_session_open_unreliably(
1761 jsonrpc_open(stream), DSCP_DEFAULT), NULL,
1762 true);
1763 } else if (error != EAGAIN) {
1764 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
1765 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
1766 pstream_get_name(raft->listener),
1767 ovs_strerror(error));
1768 }
1769 }
1770
1771 /* Run RPCs for all open sessions. */
1772 struct raft_conn *conn;
1773 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1774 raft_conn_run(raft, conn);
1775 }
1776
1777 /* Close unneeded sessions. */
1778 struct raft_conn *next;
1779 LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
1780 if (!raft_conn_should_stay_open(raft, conn)) {
1781 raft_conn_close(conn);
1782 }
1783 }
1784
1785 /* Open needed sessions. */
1786 struct raft_server *server;
1787 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1788 raft_open_conn(raft, server->address, &server->sid);
1789 }
1790 if (raft->joining) {
1791 const char *address;
1792 SSET_FOR_EACH (address, &raft->remote_addresses) {
1793 raft_open_conn(raft, address, NULL);
1794 }
1795 }
1796
1797 if (!raft->joining && time_msec() >= raft->election_timeout) {
1798 if (raft->role == RAFT_LEADER) {
1799 /* Check whether a majority of followers replied, then reset
1800 * election_timeout and each server's 'replied' flag. Otherwise,
1801 * become a follower.
1802 *
1803 * Raft paper section 6.2: Leaders: A server might be in the leader
1804 * state, but if it isn’t the current leader, it could be
1805 * needlessly delaying client requests. For example, suppose a
1806 * leader is partitioned from the rest of the cluster, but it can
1807 * still communicate with a particular client. Without additional
1808 * mechanism, it could delay a request from that client forever,
1809 * being unable to replicate a log entry to any other servers.
1810 * Meanwhile, there might be another leader of a newer term that is
1811 * able to communicate with a majority of the cluster and would be
1812 * able to commit the client’s request. Thus, a leader in Raft
1813 * steps down if an election timeout elapses without a successful
1814 * round of heartbeats to a majority of its cluster; this allows
1815 * clients to retry their requests with another server. */
1816 int count = 0;
1817 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1818 if (server->replied) {
1819 count++;
1820 }
1821 }
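            /* 'count' excludes the leader itself, so 'count >= n/2' means
             * that the leader plus the servers that replied form a majority
             * of the n servers in the cluster. */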
1822 if (count >= hmap_count(&raft->servers) / 2) {
1823 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1824 server->replied = false;
1825 }
1826 raft_reset_election_timer(raft);
1827 } else {
1828 raft_become_follower(raft);
1829 raft_start_election(raft, false);
1830 }
1831 } else {
1832 raft_start_election(raft, false);
1833 }
1834
1835 }
1836
1837 if (raft->leaving && time_msec() >= raft->leave_timeout) {
1838 raft_send_remove_server_requests(raft);
1839 }
1840
1841 if (raft->joining && time_msec() >= raft->join_timeout) {
1842 raft->join_timeout = time_msec() + 1000;
1843 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1844 raft_send_add_server_request(raft, conn);
1845 }
1846 }
1847
1848 long long int now = time_msec();
1849 if (now >= raft->ping_timeout) {
1850 if (raft->role == RAFT_LEADER) {
1851 raft_send_heartbeats(raft);
1852 }
1853 /* Check if any commands timed out. The timeout is set to twice the
1854 * election base time so that commands can complete properly during
1855 * leader election. E.g. a leader crashed and the current node with
1856 * pending commands becomes the new leader: the pending commands can
1857 * still complete if the crashed leader replicated the transactions to
1858 * a majority of followers before it crashed. */
1859 struct raft_command *cmd, *next_cmd;
1860 HMAP_FOR_EACH_SAFE (cmd, next_cmd, hmap_node, &raft->commands) {
1861 if (cmd->timestamp
1862 && now - cmd->timestamp > ELECTION_BASE_MSEC * 2) {
1863 raft_command_complete(raft, cmd, RAFT_CMD_TIMEOUT);
1864 }
1865 }
1866 raft_reset_ping_timer(raft);
1867 }
1868
1869 /* Do this only at the end; if we did it as soon as we set raft->left or
1870 * raft->failed in handling the RemoveServerReply, then it could easily
1871 * cause references to freed memory in RPC sessions, etc. */
1872 if (raft->left || raft->failed) {
1873 raft_close__(raft);
1874 }
1875 }
1876
1877 static void
1878 raft_wait_session(struct jsonrpc_session *js)
1879 {
1880 if (js) {
1881 jsonrpc_session_wait(js);
1882 jsonrpc_session_recv_wait(js);
1883 }
1884 }
1885
1886 /* Causes the next call to poll_block() to wake up when 'raft' needs to do
1887 * something. */
1888 void
1889 raft_wait(struct raft *raft)
1890 {
1891 if (raft->left || raft->failed) {
1892 return;
1893 }
1894
1895 raft_waiters_wait(raft);
1896
1897 if (raft->listener) {
1898 pstream_wait(raft->listener);
1899 } else {
1900 poll_timer_wait_until(raft->listen_backoff);
1901 }
1902
1903 struct raft_conn *conn;
1904 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1905 raft_wait_session(conn->js);
1906 }
1907
1908 if (!raft->joining) {
1909 poll_timer_wait_until(raft->election_timeout);
1910 } else {
1911 poll_timer_wait_until(raft->join_timeout);
1912 }
1913 if (raft->leaving) {
1914 poll_timer_wait_until(raft->leave_timeout);
1915 }
1916 if (raft->role == RAFT_LEADER || !hmap_is_empty(&raft->commands)) {
1917 poll_timer_wait_until(raft->ping_timeout);
1918 }
1919 }
1920
1921 static struct raft_waiter *
1922 raft_waiter_create(struct raft *raft, enum raft_waiter_type type,
1923 bool start_commit)
1924 {
1925 struct raft_waiter *w = xzalloc(sizeof *w);
1926 ovs_list_push_back(&raft->waiters, &w->list_node);
1927 w->commit_ticket = start_commit ? ovsdb_log_commit_start(raft->log) : 0;
1928 w->type = type;
1929 return w;
1930 }
1931
1932 /* Returns a human-readable representation of 'status' (or NULL if 'status' is
1933 * invalid). */
1934 const char *
1935 raft_command_status_to_string(enum raft_command_status status)
1936 {
1937 switch (status) {
1938 case RAFT_CMD_INCOMPLETE:
1939 return "operation still in progress";
1940 case RAFT_CMD_SUCCESS:
1941 return "success";
1942 case RAFT_CMD_NOT_LEADER:
1943 return "not leader";
1944 case RAFT_CMD_BAD_PREREQ:
1945 return "prerequisite check failed";
1946 case RAFT_CMD_LOST_LEADERSHIP:
1947 return "lost leadership";
1948 case RAFT_CMD_SHUTDOWN:
1949 return "server shutdown";
1950 case RAFT_CMD_IO_ERROR:
1951 return "I/O error";
1952 case RAFT_CMD_TIMEOUT:
1953 return "timeout";
1954 default:
1955 return NULL;
1956 }
1957 }
1958
1959 /* Converts human-readable status in 's' into status code in '*statusp'.
1960 * Returns true if successful, false if 's' is unknown. */
1961 bool
1962 raft_command_status_from_string(const char *s,
1963 enum raft_command_status *statusp)
1964 {
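    /* Walk the status values from 0 upward; raft_command_status_to_string()
     * returns NULL for the first value past the last valid status, which
     * terminates the loop. */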
1965 for (enum raft_command_status status = 0; ; status++) {
1966 const char *s2 = raft_command_status_to_string(status);
1967 if (!s2) {
1968 *statusp = 0;
1969 return false;
1970 } else if (!strcmp(s, s2)) {
1971 *statusp = status;
1972 return true;
1973 }
1974 }
1975 }
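
/* Usage (sketch): raft_command_status_from_string("not leader", &status)
 * sets 'status' to RAFT_CMD_NOT_LEADER and returns true; an unrecognized
 * string stores 0 and returns false. */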
1976
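/* Returns the entry ID of the most recent log entry at or before 'index'
 * that carries data, falling back to the snapshot's entry ID when no such
 * entry remains in the log. Entries without data (e.g. pure server
 * configuration changes) are skipped. */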
1977 static const struct uuid *
1978 raft_get_eid(const struct raft *raft, uint64_t index)
1979 {
1980 for (; index >= raft->log_start; index--) {
1981 const struct raft_entry *e = raft_get_entry(raft, index);
1982 if (e->data) {
1983 return &e->eid;
1984 }
1985 }
1986 return &raft->snap.eid;
1987 }
1988
1989 const struct uuid *
1990 raft_current_eid(const struct raft *raft)
1991 {
1992 return raft_get_eid(raft, raft->log_end - 1);
1993 }
1994
1995 static struct raft_command *
1996 raft_command_create_completed(enum raft_command_status status)
1997 {
1998 ovs_assert(status != RAFT_CMD_INCOMPLETE);
1999
2000 struct raft_command *cmd = xzalloc(sizeof *cmd);
2001 cmd->n_refs = 1;
2002 cmd->status = status;
2003 return cmd;
2004 }
2005
2006 static struct raft_command *
2007 raft_command_create_incomplete(struct raft *raft, uint64_t index)
2008 {
2009 struct raft_command *cmd = xzalloc(sizeof *cmd);
2010 cmd->n_refs = 2; /* One for client, one for raft->commands. */
2011 cmd->index = index;
2012 cmd->status = RAFT_CMD_INCOMPLETE;
2013 hmap_insert(&raft->commands, &cmd->hmap_node, cmd->index);
2014 return cmd;
2015 }
2016
2017 static struct raft_command * OVS_WARN_UNUSED_RESULT
2018 raft_command_initiate(struct raft *raft,
2019 const struct json *data, const struct json *servers,
2020 const struct uuid *eid)
2021 {
2022 /* Write to local log. */
2023 uint64_t index = raft->log_end;
2024 if (!raft_handle_write_error(
2025 raft, raft_write_entry(
2026 raft, raft->term, json_nullable_clone(data), eid,
2027 json_nullable_clone(servers)))) {
2028 return raft_command_create_completed(RAFT_CMD_IO_ERROR);
2029 }
2030
2031 struct raft_command *cmd = raft_command_create_incomplete(raft, index);
2032 ovs_assert(eid);
2033 cmd->eid = *eid;
2034 cmd->timestamp = time_msec();
2035
2036 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index = cmd->index;
2037
2038 if (failure_test == FT_CRASH_BEFORE_SEND_APPEND_REQ) {
2039 ovs_fatal(0, "Raft test: crash before sending append_request.");
2040 }
2041 /* Write to remote logs. */
2042 struct raft_server *s;
2043 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2044 if (!uuid_equals(&s->sid, &raft->sid) && s->next_index == index) {
2045 raft_send_append_request(raft, s, 1, "execute command");
2046 s->next_index++;
2047 }
2048 }
2049 if (failure_test == FT_CRASH_AFTER_SEND_APPEND_REQ) {
2050 ovs_fatal(0, "Raft test: crash after sending append_request.");
2051 }
2052 raft_reset_ping_timer(raft);
2053
2054 return cmd;
2055 }
2056
2057 static void
2058 log_all_commands(struct raft *raft)
2059 {
2060 struct raft_command *cmd, *next;
2061 HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
2062 VLOG_DBG("raft command eid: "UUID_FMT, UUID_ARGS(&cmd->eid));
2063 }
2064 }
2065
2066 static struct raft_command * OVS_WARN_UNUSED_RESULT
2067 raft_command_execute__(struct raft *raft,
2068 const struct json *data, const struct json *servers,
2069 const struct uuid *prereq, struct uuid *result)
2070 {
2071 if (raft->joining || raft->leaving || raft->left || raft->failed) {
2072 return raft_command_create_completed(RAFT_CMD_SHUTDOWN);
2073 }
2074
2075 if (raft->role != RAFT_LEADER) {
2076 /* Consider proxying the command to the leader. We can only do that if
2077 * we know the leader and the command does not change the set of
2078 * servers. We do not proxy commands without prerequisites, even
2079 * though we could, because in an OVSDB context a log entry doesn't
2080 * make sense without its prerequisite. */
2081 if (servers || !data
2082 || raft->role != RAFT_FOLLOWER || uuid_is_zero(&raft->leader_sid)
2083 || !prereq) {
2084 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
2085 }
2086 }
2087
2088 struct uuid eid = data ? uuid_random() : UUID_ZERO;
2089 if (result) {
2090 *result = eid;
2091 }
2092
2093 if (raft->role != RAFT_LEADER) {
2094 const union raft_rpc rpc = {
2095 .execute_command_request = {
2096 .common = {
2097 .type = RAFT_RPC_EXECUTE_COMMAND_REQUEST,
2098 .sid = raft->leader_sid,
2099 },
2100 .data = CONST_CAST(struct json *, data),
2101 .prereq = *prereq,
2102 .result = eid,
2103 }
2104 };
2105 if (failure_test == FT_CRASH_BEFORE_SEND_EXEC_REQ) {
2106 ovs_fatal(0, "Raft test: crash before sending "
2107 "execute_command_request");
2108 }
2109 if (!raft_send(raft, &rpc)) {
2110 /* Couldn't send command, so it definitely failed. */
2111 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
2112 }
2113 if (failure_test == FT_CRASH_AFTER_SEND_EXEC_REQ) {
2114 ovs_fatal(0, "Raft test: crash after sending "
2115 "execute_command_request");
2116 }
2117
2118 struct raft_command *cmd = raft_command_create_incomplete(raft, 0);
2119 cmd->timestamp = time_msec();
2120 cmd->eid = eid;
2121 log_all_commands(raft);
2122 return cmd;
2123 }
2124
2125 const struct uuid *current_eid = raft_current_eid(raft);
2126 if (prereq && !uuid_equals(prereq, current_eid)) {
2127 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
2128 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
2129 "prerequisite "UUID_FMT,
2130 UUID_ARGS(current_eid), UUID_ARGS(prereq));
2131 return raft_command_create_completed(RAFT_CMD_BAD_PREREQ);
2132 }
2133
2134 return raft_command_initiate(raft, data, servers, &eid);
2135 }
2136
2137 /* Initiates appending a log entry to 'raft'. The log entry consists of
2138 * 'data'. If 'prereq' is nonnull, the entry is added to the log only if
2139 * the previous entry in the log has entry ID 'prereq'. If 'result' is
2140 * nonnull, it is populated with the entry ID for the new log entry.
2141 *
2142 * Returns a "struct raft_command" that may be used to track progress adding
2143 * the log entry. The caller must eventually free the returned structure, with
2144 * raft_command_unref(). */
2145 struct raft_command * OVS_WARN_UNUSED_RESULT
2146 raft_command_execute(struct raft *raft, const struct json *data,
2147 const struct uuid *prereq, struct uuid *result)
2148 {
2149 return raft_command_execute__(raft, data, NULL, prereq, result);
2150 }
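
/* A minimal usage sketch (hypothetical caller, not part of this file):
 *
 *     struct uuid prereq = *raft_current_eid(raft);
 *     ...build 'data' as the JSON for the new log entry...
 *     struct raft_command *cmd = raft_command_execute(raft, data,
 *                                                     &prereq, NULL);
 *     while (raft_command_get_status(cmd) == RAFT_CMD_INCOMPLETE) {
 *         raft_run(raft);
 *         raft_command_wait(cmd);
 *         raft_wait(raft);
 *         poll_block();
 *     }
 *     ...act on raft_command_get_status(cmd)...
 *     raft_command_unref(cmd);
 */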
2151
2152 /* Returns the status of 'cmd'. */
2153 enum raft_command_status
2154 raft_command_get_status(const struct raft_command *cmd)
2155 {
2156 ovs_assert(cmd->n_refs > 0);
2157 return cmd->status;
2158 }
2159
2160 /* Returns the index of the log entry at which 'cmd' was committed.
2161 *
2162 * This function works only with successful commands. */
2163 uint64_t
2164 raft_command_get_commit_index(const struct raft_command *cmd)
2165 {
2166 ovs_assert(cmd->n_refs > 0);
2167 ovs_assert(cmd->status == RAFT_CMD_SUCCESS);
2168 return cmd->index;
2169 }
2170
2171 /* Frees 'cmd'. */
2172 void
2173 raft_command_unref(struct raft_command *cmd)
2174 {
2175 if (cmd) {
2176 ovs_assert(cmd->n_refs > 0);
2177 if (!--cmd->n_refs) {
2178 free(cmd);
2179 }
2180 }
2181 }
2182
2183 /* Causes poll_block() to wake up when 'cmd' has status to report. */
2184 void
2185 raft_command_wait(const struct raft_command *cmd)
2186 {
2187 if (cmd->status != RAFT_CMD_INCOMPLETE) {
2188 poll_immediate_wake();
2189 }
2190 }
2191
2192 static void
2193 raft_command_complete(struct raft *raft,
2194 struct raft_command *cmd,
2195 enum raft_command_status status)
2196 {
2197 VLOG_DBG("raft_command_complete eid "UUID_FMT" status: %s",
2198 UUID_ARGS(&cmd->eid), raft_command_status_to_string(status));
2199 if (!uuid_is_zero(&cmd->sid)) {
2200 uint64_t commit_index = status == RAFT_CMD_SUCCESS ? cmd->index : 0;
2201 raft_send_execute_command_reply(raft, &cmd->sid, &cmd->eid, status,
2202 commit_index);
2203 }
2204
2205 ovs_assert(cmd->status == RAFT_CMD_INCOMPLETE);
2206 ovs_assert(cmd->n_refs > 0);
2207 hmap_remove(&raft->commands, &cmd->hmap_node);
2208 cmd->status = status;
2209 raft_command_unref(cmd);
2210 }
2211
2212 static void
2213 raft_complete_all_commands(struct raft *raft, enum raft_command_status status)
2214 {
2215 struct raft_command *cmd, *next;
2216 HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
2217 raft_command_complete(raft, cmd, status);
2218 }
2219 }
2220
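/* Looks up a pending command by entry ID. The 'commands' hmap is hashed on
 * commit index, not on eid, so this is a linear scan; the map holds only
 * commands still in flight, so it is expected to stay small. */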
2221 static struct raft_command *
2222 raft_find_command_by_eid(struct raft *raft, const struct uuid *eid)
2223 {
2224 struct raft_command *cmd;
2225
2226 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2227 if (uuid_equals(&cmd->eid, eid)) {
2228 return cmd;
2229 }
2230 }
2231 return NULL;
2232 }
2233 \f
2234 #define RAFT_RPC(ENUM, NAME) \
2235 static void raft_handle_##NAME(struct raft *, const struct raft_##NAME *);
2236 RAFT_RPC_TYPES
2237 #undef RAFT_RPC
2238
2239 static void
2240 raft_handle_hello_request(struct raft *raft OVS_UNUSED,
2241 const struct raft_hello_request *hello OVS_UNUSED)
2242 {
2243 }
2244
2245 /* 'sid' is the server being added. */
2246 static void
2247 raft_send_add_server_reply__(struct raft *raft, const struct uuid *sid,
2248 const char *address,
2249 bool success, const char *comment)
2250 {
2251 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2252 if (!VLOG_DROP_INFO(&rl)) {
2253 struct ds s = DS_EMPTY_INITIALIZER;
2254 char buf[SID_LEN + 1];
2255 ds_put_format(&s, "adding %s ("SID_FMT" at %s) "
2256 "to cluster "CID_FMT" %s",
2257 raft_get_nickname(raft, sid, buf, sizeof buf),
2258 SID_ARGS(sid), address, CID_ARGS(&raft->cid),
2259 success ? "succeeded" : "failed");
2260 if (comment) {
2261 ds_put_format(&s, " (%s)", comment);
2262 }
2263 VLOG_INFO("%s", ds_cstr(&s));
2264 ds_destroy(&s);
2265 }
2266
2267 union raft_rpc rpy = {
2268 .add_server_reply = {
2269 .common = {
2270 .type = RAFT_RPC_ADD_SERVER_REPLY,
2271 .sid = *sid,
2272 .comment = CONST_CAST(char *, comment),
2273 },
2274 .success = success,
2275 }
2276 };
2277
2278 struct sset *remote_addresses = &rpy.add_server_reply.remote_addresses;
2279 sset_init(remote_addresses);
2280 if (!raft->joining) {
2281 struct raft_server *s;
2282 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2283 if (!uuid_equals(&s->sid, &raft->sid)) {
2284 sset_add(remote_addresses, s->address);
2285 }
2286 }
2287 }
2288
2289 raft_send(raft, &rpy);
2290
2291 sset_destroy(remote_addresses);
2292 }
2293
2294 static void
2295 raft_send_remove_server_reply_rpc(struct raft *raft,
2296 const struct uuid *dst_sid,
2297 const struct uuid *target_sid,
2298 bool success, const char *comment)
2299 {
2300 if (uuid_equals(&raft->sid, dst_sid)) {
2301 if (success && uuid_equals(&raft->sid, target_sid)) {
2302 raft_finished_leaving_cluster(raft);
2303 }
2304 return;
2305 }
2306
2307 const union raft_rpc rpy = {
2308 .remove_server_reply = {
2309 .common = {
2310 .type = RAFT_RPC_REMOVE_SERVER_REPLY,
2311 .sid = *dst_sid,
2312 .comment = CONST_CAST(char *, comment),
2313 },
2314 .target_sid = (uuid_equals(dst_sid, target_sid)
2315 ? UUID_ZERO
2316 : *target_sid),
2317 .success = success,
2318 }
2319 };
2320 raft_send(raft, &rpy);
2321 }
2322
2323 static void
2324 raft_send_remove_server_reply__(struct raft *raft,
2325 const struct uuid *target_sid,
2326 const struct uuid *requester_sid,
2327 struct unixctl_conn *requester_conn,
2328 bool success, const char *comment)
2329 {
2330 struct ds s = DS_EMPTY_INITIALIZER;
2331 ds_put_format(&s, "request ");
2332 if (!uuid_is_zero(requester_sid)) {
2333 char buf[SID_LEN + 1];
2334 ds_put_format(&s, "by %s",
2335 raft_get_nickname(raft, requester_sid, buf, sizeof buf));
2336 } else {
2337 ds_put_cstr(&s, "via unixctl");
2338 }
2339 ds_put_cstr(&s, " to remove ");
2340 if (!requester_conn && uuid_equals(target_sid, requester_sid)) {
2341 ds_put_cstr(&s, "itself");
2342 } else {
2343 char buf[SID_LEN + 1];
2344 ds_put_cstr(&s, raft_get_nickname(raft, target_sid, buf, sizeof buf));
2345 if (uuid_equals(target_sid, &raft->sid)) {
2346 ds_put_cstr(&s, " (ourselves)");
2347 }
2348 }
2349 ds_put_format(&s, " from cluster "CID_FMT" %s",
2350 CID_ARGS(&raft->cid),
2351 success ? "succeeded" : "failed");
2352 if (comment) {
2353 ds_put_format(&s, " (%s)", comment);
2354 }
2355
2356 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2357 VLOG_INFO_RL(&rl, "%s", ds_cstr(&s));
2358
2359 /* Send RemoveServerReply to the requester (which could be a server or a
2360 * unixctl connection). Also always send it to the removed server; this
2361 * allows it to be sure that it's really removed, to update its log, and
2362 * to disconnect permanently. */
2363 if (!uuid_is_zero(requester_sid)) {
2364 raft_send_remove_server_reply_rpc(raft, requester_sid, target_sid,
2365 success, comment);
2366 }
2367 if (!uuid_equals(requester_sid, target_sid)) {
2368 raft_send_remove_server_reply_rpc(raft, target_sid, target_sid,
2369 success, comment);
2370 }
2371 if (requester_conn) {
2372 if (success) {
2373 unixctl_command_reply(requester_conn, ds_cstr(&s));
2374 } else {
2375 unixctl_command_reply_error(requester_conn, ds_cstr(&s));
2376 }
2377 }
2378
2379 ds_destroy(&s);
2380 }
2381
2382 static void
2383 raft_send_add_server_reply(struct raft *raft,
2384 const struct raft_add_server_request *rq,
2385 bool success, const char *comment)
2386 {
2387 return raft_send_add_server_reply__(raft, &rq->common.sid, rq->address,
2388 success, comment);
2389 }
2390
2391 static void
2392 raft_send_remove_server_reply(struct raft *raft,
2393 const struct raft_remove_server_request *rq,
2394 bool success, const char *comment)
2395 {
2396 return raft_send_remove_server_reply__(raft, &rq->sid, &rq->common.sid,
2397 rq->requester_conn, success,
2398 comment);
2399 }
2400
2401 static void
2402 raft_become_follower(struct raft *raft)
2403 {
2404 raft->leader_sid = UUID_ZERO;
2405 if (raft->role == RAFT_FOLLOWER) {
2406 return;
2407 }
2408
2409 raft->role = RAFT_FOLLOWER;
2410 raft_reset_election_timer(raft);
2411
2412 /* Notify clients about lost leadership.
2413 *
2414 * We do not reverse our changes to 'raft->servers' because the new
2415 * configuration is already part of the log. Possibly the configuration
2416 * log entry will not be committed, but until we know that we must use the
2417 * new configuration. Our AppendEntries processing will properly update
2418 * the server configuration later, if necessary. */
2419 struct raft_server *s;
2420 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
2421 raft_send_add_server_reply__(raft, &s->sid, s->address, false,
2422 RAFT_SERVER_LOST_LEADERSHIP);
2423 }
2424 if (raft->remove_server) {
2425 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
2426 &raft->remove_server->requester_sid,
2427 raft->remove_server->requester_conn,
2428 false, RAFT_SERVER_LOST_LEADERSHIP);
2429 raft_server_destroy(raft->remove_server);
2430 raft->remove_server = NULL;
2431 }
2432
2433 raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
2434 }
2435
2436 static void
2437 raft_send_append_request(struct raft *raft,
2438 struct raft_server *peer, unsigned int n,
2439 const char *comment)
2440 {
2441 ovs_assert(raft->role == RAFT_LEADER);
2442
2443 const union raft_rpc rq = {
2444 .append_request = {
2445 .common = {
2446 .type = RAFT_RPC_APPEND_REQUEST,
2447 .sid = peer->sid,
2448 .comment = CONST_CAST(char *, comment),
2449 },
2450 .term = raft->term,
2451 .prev_log_index = peer->next_index - 1,
2452 .prev_log_term = (peer->next_index - 1 >= raft->log_start
2453 ? raft->entries[peer->next_index - 1
2454 - raft->log_start].term
2455 : raft->snap.term),
2456 .leader_commit = raft->commit_index,
2457 .entries = &raft->entries[peer->next_index - raft->log_start],
2458 .n_entries = n,
2459 },
2460 };
2461 raft_send(raft, &rq);
2462 }
2463
2464 static void
2465 raft_send_heartbeats(struct raft *raft)
2466 {
2467 struct raft_server *s;
2468 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2469 if (!uuid_equals(&raft->sid, &s->sid)) {
2470 raft_send_append_request(raft, s, 0, "heartbeat");
2471 }
2472 }
2473
2474 /* Send a ping to anyone waiting for a command to complete, to let
2475 * them know we're still working on it. */
2476 struct raft_command *cmd;
2477 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2478 if (!uuid_is_zero(&cmd->sid)) {
2479 raft_send_execute_command_reply(raft, &cmd->sid,
2480 &cmd->eid,
2481 RAFT_CMD_INCOMPLETE, 0);
2482 }
2483 }
2484
2485 raft_reset_ping_timer(raft);
2486 }
2487
2488 /* Initializes the fields in 's' that represent the leader's view of the
2489 * server. */
2490 static void
2491 raft_server_init_leader(struct raft *raft, struct raft_server *s)
2492 {
2493 s->next_index = raft->log_end;
2494 s->match_index = 0;
2495 s->phase = RAFT_PHASE_STABLE;
2496 s->replied = false;
2497 }
2498
2499 static void
2500 raft_set_leader(struct raft *raft, const struct uuid *sid)
2501 {
2502 raft->leader_sid = *sid;
2503 raft->had_leader = true;
2504 raft->candidate_retrying = false;
2505 }
2506
2507 static void
2508 raft_become_leader(struct raft *raft)
2509 {
2510 log_all_commands(raft);
2511
2512 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
2513 VLOG_INFO_RL(&rl, "term %"PRIu64": elected leader by %d+ of "
2514 "%"PRIuSIZE" servers", raft->term,
2515 raft->n_votes, hmap_count(&raft->servers));
2516
2517 ovs_assert(raft->role != RAFT_LEADER);
2518 raft->role = RAFT_LEADER;
2519 raft_set_leader(raft, &raft->sid);
2520 raft_reset_election_timer(raft);
2521 raft_reset_ping_timer(raft);
2522
2523 struct raft_server *s;
2524 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2525 raft_server_init_leader(raft, s);
2526 }
2527
2528 raft_update_our_match_index(raft, raft->log_end - 1);
2529 raft_send_heartbeats(raft);
2530
2531 /* Write the fact that we are leader to the log. This is not used by the
2532 * algorithm (although it could be, for quick restart), but it is used for
2533 * offline analysis to check for conformance with the properties that Raft
2534 * guarantees. */
2535 struct raft_record r = {
2536 .type = RAFT_REC_LEADER,
2537 .term = raft->term,
2538 .sid = raft->sid,
2539 };
2540 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2541
2542 /* Initiate a no-op commit. Otherwise we might never find out what's in
2543 * the log. See section 6.4 item 1:
2544 *
2545 * The Leader Completeness Property guarantees that a leader has all
2546 * committed entries, but at the start of its term, it may not know
2547 * which those are. To find out, it needs to commit an entry from its
2548 * term. Raft handles this by having each leader commit a blank no-op
2549 * entry into the log at the start of its term. As soon as this no-op
2550 * entry is committed, the leader’s commit index will be at least as
2551 * large as any other servers’ during its term.
2552 */
2553 raft_command_unref(raft_command_execute__(raft, NULL, NULL, NULL, NULL));
2554 }
2555
2556 /* Processes term 'term' received as part of RPC 'common'. Returns true if the
2557 * caller should continue processing the RPC, false if the caller should reject
2558 * it due to a stale term. */
2559 static bool
2560 raft_receive_term__(struct raft *raft, const struct raft_rpc_common *common,
2561 uint64_t term)
2562 {
2563 /* Section 3.3 says:
2564 *
2565 * Current terms are exchanged whenever servers communicate; if one
2566 * server’s current term is smaller than the other’s, then it updates
2567 * its current term to the larger value. If a candidate or leader
2568 * discovers that its term is out of date, it immediately reverts to
2569 * follower state. If a server receives a request with a stale term
2570 * number, it rejects the request.
2571 */
2572 if (term > raft->term) {
2573 if (!raft_set_term(raft, term, NULL)) {
2574 /* Failed to update the term to 'term'. */
2575 return false;
2576 }
2577 raft_become_follower(raft);
2578 } else if (term < raft->term) {
2579 char buf[SID_LEN + 1];
2580 VLOG_INFO("rejecting term %"PRIu64" < current term %"PRIu64" received "
2581 "in %s message from server %s",
2582 term, raft->term,
2583 raft_rpc_type_to_string(common->type),
2584 raft_get_nickname(raft, &common->sid, buf, sizeof buf));
2585 return false;
2586 }
2587 return true;
2588 }
2589
2590 static void
2591 raft_get_servers_from_log(struct raft *raft, enum vlog_level level)
2592 {
2593 const struct json *servers_json = raft->snap.servers;
2594 for (uint64_t index = raft->log_end - 1; index >= raft->log_start;
2595 index--) {
2596 struct raft_entry *e = &raft->entries[index - raft->log_start];
2597 if (e->servers) {
2598 servers_json = e->servers;
2599 break;
2600 }
2601 }
2602
2603 struct hmap servers;
2604 struct ovsdb_error *error = raft_servers_from_json(servers_json, &servers);
2605 ovs_assert(!error);
2606 raft_set_servers(raft, &servers, level);
2607 raft_servers_destroy(&servers);
2608 }
2609
2610 /* Truncates the log, so that raft->log_end becomes 'new_end'.
2611 *
2612 * Doesn't write anything to disk. In theory, we could truncate the on-disk
2613 * log file, but we don't have the right information to know how long it should
2614 * be. What we actually do is to append entries for older indexes to the
2615 * on-disk log; when we re-read it later, these entries truncate the log.
2616 *
2617 * Returns true if any of the removed log entries were server configuration
2618 * entries, false otherwise. */
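/* For example (illustrative numbers only): if the log covers indexes 10
 * through 19 and 'new_end' is 15, entries 15..19 are discarded from memory;
 * on disk they are superseded by whatever is appended next at index 15,
 * which takes precedence when the log is re-read. */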
2619 static bool
2620 raft_truncate(struct raft *raft, uint64_t new_end)
2621 {
2622 ovs_assert(new_end >= raft->log_start);
2623 if (raft->log_end > new_end) {
2624 char buf[SID_LEN + 1];
2625 VLOG_INFO("%s truncating %"PRIu64 " entries from end of log",
2626 raft_get_nickname(raft, &raft->sid, buf, sizeof buf),
2627 raft->log_end - new_end);
2628 }
2629
2630 bool servers_changed = false;
2631 while (raft->log_end > new_end) {
2632 struct raft_entry *entry = &raft->entries[--raft->log_end
2633 - raft->log_start];
2634 if (entry->servers) {
2635 servers_changed = true;
2636 }
2637 raft_entry_uninit(entry);
2638 }
2639 return servers_changed;
2640 }
2641
2642 static const struct json *
2643 raft_peek_next_entry(struct raft *raft, struct uuid *eid)
2644 {
2645 /* Invariant: log_start - 2 <= last_applied <= commit_index < log_end. */
2646 ovs_assert(raft->log_start <= raft->last_applied + 2);
2647 ovs_assert(raft->last_applied <= raft->commit_index);
2648 ovs_assert(raft->commit_index < raft->log_end);
2649
2650 if (raft->joining || raft->failed) {
2651 return NULL;
2652 }
2653
2654 if (raft->log_start == raft->last_applied + 2) {
2655 *eid = raft->snap.eid;
2656 return raft->snap.data;
2657 }
2658
2659 while (raft->last_applied < raft->commit_index) {
2660 const struct raft_entry *e = raft_get_entry(raft,
2661 raft->last_applied + 1);
2662 if (e->data) {
2663 *eid = e->eid;
2664 return e->data;
2665 }
2666 raft->last_applied++;
2667 }
2668 return NULL;
2669 }
2670
2671 static const struct json *
2672 raft_get_next_entry(struct raft *raft, struct uuid *eid)
2673 {
2674 const struct json *data = raft_peek_next_entry(raft, eid);
2675 if (data) {
2676 raft->last_applied++;
2677 }
2678 return data;
2679 }
2680
2681 /* Updates the commit index in the raft log. Returns false if the commit
2682 * index was already up-to-date, true if it was advanced. */
2683 static bool
2684 raft_update_commit_index(struct raft *raft, uint64_t new_commit_index)
2685 {
2686 if (new_commit_index <= raft->commit_index) {
2687 return false;
2688 }
2689
2690 if (raft->role == RAFT_LEADER) {
2691 while (raft->commit_index < new_commit_index) {
2692 uint64_t index = ++raft->commit_index;
2693 const struct raft_entry *e = raft_get_entry(raft, index);
2694 if (e->data) {
2695 struct raft_command *cmd
2696 = raft_find_command_by_eid(raft, &e->eid);
2697 if (cmd) {
2698 if (!cmd->index) {
2699 VLOG_DBG("Command completed after role change from"
2700 " follower to leader "UUID_FMT,
2701 UUID_ARGS(&e->eid));
2702 cmd->index = index;
2703 }
2704 raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
2705 }
2706 }
2707 if (e->servers) {
2708 /* raft_run_reconfigure() can write a new Raft entry, which can
2709 * reallocate raft->entries, which would invalidate 'e', so
2710 * this case must be last, after the one for 'e->data'. */
2711 raft_run_reconfigure(raft);
2712 }
2713 }
2714 } else {
2715 raft->commit_index = new_commit_index;
2716 /* Check if any pending command can be completed, and complete it.
2717 * This can happen when a leader fails over before sending an
2718 * execute_command_reply. */
2719 const struct uuid *eid = raft_get_eid(raft, new_commit_index);
2720 struct raft_command *cmd = raft_find_command_by_eid(raft, eid);
2721 if (cmd) {
2722 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
2723 VLOG_INFO_RL(&rl,
2724 "Command completed without reply (eid: "UUID_FMT", "
2725 "commit index: %"PRIu64")",
2726 UUID_ARGS(eid), new_commit_index);
2727 cmd->index = new_commit_index;
2728 raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
2729 }
2730 }
2731
2732 /* Write the commit index to the log. The next time we restart, this
2733 * allows us to start exporting a reasonably fresh log, instead of a log
2734 * that only contains the snapshot. */
2735 struct raft_record r = {
2736 .type = RAFT_REC_COMMIT_INDEX,
2737 .commit_index = raft->commit_index,
2738 };
2739 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2740 return true;
2741 }
2742
2743 /* This doesn't use rq->entries (but it does use rq->n_entries). */
2744 static void
2745 raft_send_append_reply(struct raft *raft, const struct raft_append_request *rq,
2746 enum raft_append_result result, const char *comment)
2747 {
2748 /* Figure 3.1: "If leaderCommit > commitIndex, set commitIndex =
2749 * min(leaderCommit, index of last new entry)" */
2750 if (result == RAFT_APPEND_OK && rq->leader_commit > raft->commit_index) {
2751 raft_update_commit_index(
2752 raft, MIN(rq->leader_commit, rq->prev_log_index + rq->n_entries));
2753 }
2754
2755 /* Send reply. */
2756 union raft_rpc reply = {
2757 .append_reply = {
2758 .common = {
2759 .type = RAFT_RPC_APPEND_REPLY,
2760 .sid = rq->common.sid,
2761 .comment = CONST_CAST(char *, comment),
2762 },
2763 .term = raft->term,
2764 .log_end = raft->log_end,
2765 .prev_log_index = rq->prev_log_index,
2766 .prev_log_term = rq->prev_log_term,
2767 .n_entries = rq->n_entries,
2768 .result = result,
2769 }
2770 };
2771 raft_send(raft, &reply);
2772 }
2773
2774 /* If 'prev_log_index' exists in 'raft''s log, in term 'prev_log_term', returns
2775 * NULL. Otherwise, returns an explanation for the mismatch. */
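/* For instance (illustrative numbers only): with log_start == 10,
 * log_end == 15, and snap.term == 3, the pair (9, 3) matches the snapshot,
 * (12, t) matches iff entry 12 has term 't', and (15, t) lies past the end
 * of the log, so it is always a mismatch. */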
2776 static const char *
2777 match_index_and_term(const struct raft *raft,
2778 uint64_t prev_log_index, uint64_t prev_log_term)
2779 {
2780 if (prev_log_index < raft->log_start - 1) {
2781 return "mismatch before start of log";
2782 } else if (prev_log_index == raft->log_start - 1) {
2783 if (prev_log_term != raft->snap.term) {
2784 return "prev_term mismatch";
2785 }
2786 } else if (prev_log_index < raft->log_end) {
2787 if (raft->entries[prev_log_index - raft->log_start].term
2788 != prev_log_term) {
2789 return "term mismatch";
2790 }
2791 } else {
2792 /* prev_log_index >= raft->log_end */
2793 return "mismatch past end of log";
2794 }
2795 return NULL;
2796 }
2797
2798 static void
2799 raft_handle_append_entries(struct raft *raft,
2800 const struct raft_append_request *rq,
2801 uint64_t prev_log_index, uint64_t prev_log_term,
2802 const struct raft_entry *entries,
2803 unsigned int n_entries)
2804 {
2805 /* Section 3.5: "When sending an AppendEntries RPC, the leader includes
2806 * the index and term of the entry in its log that immediately precedes
2807 * the new entries. If the follower does not find an entry in its log
2808 * with the same index and term, then it refuses the new entries." */
2809 const char *mismatch = match_index_and_term(raft, prev_log_index,
2810 prev_log_term);
2811 if (mismatch) {
2812 VLOG_INFO("rejecting append_request because previous entry "
2813 "%"PRIu64",%"PRIu64" not in local log (%s)",
2814 prev_log_term, prev_log_index, mismatch);
2815 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY, mismatch);
2816 return;
2817 }
2818
2819 /* Figure 3.1: "If an existing entry conflicts with a new one (same
2820 * index but different terms), delete the existing entry and all that
2821 * follow it." */
2822 unsigned int i;
2823 bool servers_changed = false;
2824 for (i = 0; ; i++) {
2825 if (i >= n_entries) {
2826 /* No change. */
2827 if (rq->common.comment
2828 && !strcmp(rq->common.comment, "heartbeat")) {
2829 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "heartbeat");
2830 } else {
2831 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
2832 }
2833 return;
2834 }
2835
2836 uint64_t log_index = (prev_log_index + 1) + i;
2837 if (log_index >= raft->log_end) {
2838 break;
2839 }
2840 if (raft->entries[log_index - raft->log_start].term
2841 != entries[i].term) {
2842 if (raft_truncate(raft, log_index)) {
2843 servers_changed = true;
2844 }
2845 break;
2846 }
2847 }
2848
2849 if (failure_test == FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE) {
2850 ovs_fatal(0, "Raft test: crash after receiving append_request with "
2851 "update.");
2852 }
2853 /* Figure 3.1: "Append any entries not already in the log." */
2854 struct ovsdb_error *error = NULL;
2855 bool any_written = false;
2856 for (; i < n_entries; i++) {
2857 const struct raft_entry *e = &entries[i];
2858 error = raft_write_entry(raft, e->term,
2859 json_nullable_clone(e->data), &e->eid,
2860 json_nullable_clone(e->servers));
2861 if (error) {
2862 break;
2863 }
2864 any_written = true;
2865 if (e->servers) {
2866 servers_changed = true;
2867 }
2868 }
2869
2870 if (any_written) {
2871 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index
2872 = raft->log_end - 1;
2873 }
2874 if (servers_changed) {
2875 /* The set of servers might have changed; check. */
2876 raft_get_servers_from_log(raft, VLL_INFO);
2877 }
2878
2879 if (error) {
2880 char *s = ovsdb_error_to_string_free(error);
2881 VLOG_ERR("%s", s);
2882 free(s);
2883 raft_send_append_reply(raft, rq, RAFT_APPEND_IO_ERROR, "I/O error");
2884 return;
2885 }
2886
2887 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "log updated");
2888 }
2889
2890 static bool
2891 raft_update_leader(struct raft *raft, const struct uuid *sid)
2892 {
2893 if (raft->role == RAFT_LEADER) {
2894 char buf[SID_LEN + 1];
2895 VLOG_ERR("this server is leader but server %s claims to be",
2896 raft_get_nickname(raft, sid, buf, sizeof buf));
2897 return false;
2898 } else if (!uuid_equals(sid, &raft->leader_sid)) {
2899 if (!uuid_is_zero(&raft->leader_sid)) {
2900 char buf1[SID_LEN + 1];
2901 char buf2[SID_LEN + 1];
2902 VLOG_ERR("leader for term %"PRIu64" changed from %s to %s",
2903 raft->term,
2904 raft_get_nickname(raft, &raft->leader_sid,
2905 buf1, sizeof buf1),
2906 raft_get_nickname(raft, sid, buf2, sizeof buf2));
2907 } else {
2908 char buf[SID_LEN + 1];
2909 VLOG_INFO("server %s is leader for term %"PRIu64,
2910 raft_get_nickname(raft, sid, buf, sizeof buf),
2911 raft->term);
2912 }
2913 raft_set_leader(raft, sid);
2914
2915 /* Record the leader to the log. This is not used by the algorithm
2916 * (although it could be, for quick restart), but it is used for
2917 * offline analysis to check for conformance with the properties
2918 * that Raft guarantees. */
2919 struct raft_record r = {
2920 .type = RAFT_REC_LEADER,
2921 .term = raft->term,
2922 .sid = *sid,
2923 };
2924 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2925 }
2926 return true;
2927 }
2928
2929 static void
2930 raft_handle_append_request(struct raft *raft,
2931 const struct raft_append_request *rq)
2932 {
2933 /* We do not check whether the server that sent the request is part of the
2934 * cluster. As section 4.1 says, "A server accepts AppendEntries requests
2935 * from a leader that is not part of the server’s latest configuration.
2936 * Otherwise, a new server could never be added to the cluster (it would
2937 * never accept any log entries preceding the configuration entry that adds
2938 * the server)." */
2939 if (!raft_update_leader(raft, &rq->common.sid)) {
2940 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
2941 "usurped leadership");
2942 return;
2943 }
2944 raft_reset_election_timer(raft);
2945
2946 /* First check for the common case, where the AppendEntries request is
2947 * entirely for indexes covered by 'log_start' ... 'log_end - 1', something
2948 * like this:
2949 *
2950 *   rq->prev_log_index
2951 *     | first_entry_index
2952 *     |   |         nth_entry_index
2953 *     |   |           |
2954 *     v   v           v
2955 *       +---+---+---+---+
2956 *     T | T | T | T | T |
2957 *       +---+-------+---+
2958 *       +---+---+---+---+
2959 *     T | T | T | T | T |
2960 *       +---+---+---+---+
2961 *         ^             ^
2962 *         |             |
2963 *     log_start      log_end
2964 */
2965 uint64_t first_entry_index = rq->prev_log_index + 1;
2966 uint64_t nth_entry_index = rq->prev_log_index + rq->n_entries;
2967 if (OVS_LIKELY(first_entry_index >= raft->log_start)) {
2968 raft_handle_append_entries(raft, rq,
2969 rq->prev_log_index, rq->prev_log_term,
2970 rq->entries, rq->n_entries);
2971 return;
2972 }
2973
2974 /* Now a series of checks for odd cases, where the AppendEntries request
2975 * extends earlier than the beginning of our log, into the log entries
2976 * discarded by the most recent snapshot. */
2977
2978 /*
2979 * Handle the case where the indexes covered by rq->entries[] are entirely
2980 * disjoint with 'log_start - 1' ... 'log_end - 1', as shown below. So,
2981 * everything in the AppendEntries request must already have been
2982 * committed, and we might as well return true.
2983 *
2984 *   rq->prev_log_index
2985 *     | first_entry_index
2986 *     |   |         nth_entry_index
2987 *     |   |           |
2988 *     v   v           v
2989 *       +---+---+---+---+
2990 *     T | T | T | T | T |
2991 *       +---+-------+---+
2992 *                             +---+---+---+---+
2993 *                           T | T | T | T | T |
2994 *                             +---+---+---+---+
2995 *                               ^             ^
2996 *                               |             |
2997 *                           log_start      log_end
2998 */
2999 if (nth_entry_index < raft->log_start - 1) {
3000 raft_send_append_reply(raft, rq, RAFT_APPEND_OK,
3001 "append before log start");
3002 return;
3003 }
3004
3005 /*
3006 * Handle the case where the last entry in rq->entries[] has the same index
3007 * as 'log_start - 1', so we can compare their terms:
3008 *
3009 *   rq->prev_log_index
3010 *     | first_entry_index
3011 *     |   |         nth_entry_index
3012 *     |   |           |
3013 *     v   v           v
3014 *       +---+---+---+---+
3015 *     T | T | T | T | T |
3016 *       +---+-------+---+
3017 *                       +---+---+---+---+
3018 *                     T | T | T | T | T |
3019 *                       +---+---+---+---+
3020 *                         ^             ^
3021 *                         |             |
3022 *                     log_start      log_end
3023 *
3024 * There's actually a sub-case where rq->n_entries == 0, in which we
3025 * compare rq->prev_log_term:
3026 *
3027 *   rq->prev_log_index
3028 *     |
3029 *     |
3030 *     |
3031 *     v
3032 *     T
3033 *
3034 *       +---+---+---+---+
3035 *     T | T | T | T | T |
3036 *       +---+---+---+---+
3037 *         ^             ^
3038 *         |             |
3039 *     log_start      log_end
3040 */
3041 if (nth_entry_index == raft->log_start - 1) {
3042 if (rq->n_entries
3043 ? raft->snap.term == rq->entries[rq->n_entries - 1].term
3044 : raft->snap.term == rq->prev_log_term) {
3045 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
3046 } else {
3047 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
3048 "term mismatch");
3049 }
3050 return;
3051 }
3052
3053 /*
3054 * We now know that the data in rq->entries[] overlaps the data in
3055 * raft->entries[], as shown below, with some positive 'ofs':
3056 *
3057 *   rq->prev_log_index
3058 *     | first_entry_index
3059 *     |   |             nth_entry_index
3060 *     |   |               |
3061 *     v   v               v
3062 *       +---+---+---+---+---+
3063 *     T | T | T | T | T | T |
3064 *       +---+-------+---+---+
3065 *                  +---+---+---+---+
3066 *                T | T | T | T | T |
3067 *                  +---+---+---+---+
3068 *                    ^             ^
3069 *                    |             |
3070 *                log_start      log_end
3071 *
3072 *        |<-- ofs -->|
3073 *
3074 * We transform this into the following by trimming the first 'ofs'
3075 * elements off of rq->entries[]. Notice how we retain the term but not
3076 * the data for rq->entries[ofs - 1]:
3077 *
3078 *                first_entry_index + ofs - 1
3079 *                | first_entry_index + ofs
3080 *                |   | nth_entry_index
3081 *                |   |   |
3082 *                v   v   v
3083 *                  +---+---+
3084 *                T | T | T |
3085 *                  +---+---+
3086 *                  +---+---+---+---+
3087 *                T | T | T | T | T |
3088 *                  +---+---+---+---+
3089 *                    ^             ^
3090 *                    |             |
3091 *                log_start      log_end
3092 */
3093 uint64_t ofs = raft->log_start - first_entry_index;
3094 raft_handle_append_entries(raft, rq,
3095 raft->log_start - 1, rq->entries[ofs - 1].term,
3096 &rq->entries[ofs], rq->n_entries - ofs);
3097 }
3098
3099 /* Returns true if 'raft' has another log entry or snapshot to read. */
3100 bool
3101 raft_has_next_entry(const struct raft *raft_)
3102 {
3103 struct raft *raft = CONST_CAST(struct raft *, raft_);
3104 struct uuid eid;
3105 return raft_peek_next_entry(raft, &eid) != NULL;
3106 }
3107
3108 /* Returns the next log entry or snapshot from 'raft', or NULL if there are
3109 * none left to read. Stores the entry ID of the log entry in '*eid'. Stores
3110 * true in '*is_snapshot' if the returned data is a snapshot, false if it is a
3111 * log entry. */
3112 const struct json *
3113 raft_next_entry(struct raft *raft, struct uuid *eid, bool *is_snapshot)
3114 {
3115 const struct json *data = raft_get_next_entry(raft, eid);
3116 *is_snapshot = data == raft->snap.data;
3117 return data;
3118 }
3119
3120 /* Returns the log index of the last-read snapshot or log entry. */
3121 uint64_t
3122 raft_get_applied_index(const struct raft *raft)
3123 {
3124 return raft->last_applied;
3125 }
3126
3127 /* Returns the log index of the last snapshot or log entry that is available to
3128 * be read. */
3129 uint64_t
3130 raft_get_commit_index(const struct raft *raft)
3131 {
3132 return raft->commit_index;
3133 }
3134
3135 static struct raft_server *
3136 raft_find_peer(struct raft *raft, const struct uuid *uuid)
3137 {
3138 struct raft_server *s = raft_find_server(raft, uuid);
3139 return s && !uuid_equals(&raft->sid, &s->sid) ? s : NULL;
3140 }
3141
3142 static struct raft_server *
3143 raft_find_new_server(struct raft *raft, const struct uuid *uuid)
3144 {
3145 return raft_server_find(&raft->add_servers, uuid);
3146 }
3147
3148 /* Figure 3.1: "If there exists an N such that N > commitIndex, a
3149 * majority of matchIndex[i] >= N, and log[N].term == currentTerm, set
3150 * commitIndex = N (sections 3.5 and 3.6)." */
3151 static void
3152 raft_consider_updating_commit_index(struct raft *raft)
3153 {
3154 /* This loop cannot just bail out when it comes across a log entry that
3155 * does not match the criteria. For example, Figure 3.7(d2) shows a
3156 * case where the log entry for term 2 cannot be committed directly
3157 * (because it is not for the current term) but it can be committed as
3158 * a side effect of committing the entry for term 4 (the current term).
3159 * XXX Is there a more efficient way to do this? */
3160 ovs_assert(raft->role == RAFT_LEADER);
3161
3162 uint64_t new_commit_index = raft->commit_index;
3163 for (uint64_t idx = MAX(raft->commit_index + 1, raft->log_start);
3164 idx < raft->log_end; idx++) {
3165 if (raft->entries[idx - raft->log_start].term == raft->term) {
3166 size_t count = 0;
3167 struct raft_server *s2;
3168 HMAP_FOR_EACH (s2, hmap_node, &raft->servers) {
3169 if (s2->match_index >= idx) {
3170 count++;
3171 }
3172 }
3173 if (count > hmap_count(&raft->servers) / 2) {
3174 VLOG_DBG("index %"PRIu64" committed to %"PRIuSIZE" servers, "
3175 "applying", idx, count);
3176 new_commit_index = idx;
3177 }
3178 }
3179 }
3180 if (raft_update_commit_index(raft, new_commit_index)) {
3181 raft_send_heartbeats(raft);
3182 }
3183 }
3184
3185 static void
3186 raft_update_match_index(struct raft *raft, struct raft_server *s,
3187 uint64_t min_index)
3188 {
3189 ovs_assert(raft->role == RAFT_LEADER);
3190 if (min_index > s->match_index) {
3191 s->match_index = min_index;
3192 raft_consider_updating_commit_index(raft);
3193 }
3194 }
3195
3196 static void
3197 raft_update_our_match_index(struct raft *raft, uint64_t min_index)
3198 {
3199 struct raft_server *server = raft_find_server(raft, &raft->sid);
3200 if (server) {
3201 raft_update_match_index(raft, server, min_index);
3202 }
3203 }
3204
3205 static void
3206 raft_send_install_snapshot_request(struct raft *raft,
3207 const struct raft_server *s,
3208 const char *comment)
3209 {
3210 union raft_rpc rpc = {
3211 .install_snapshot_request = {
3212 .common = {
3213 .type = RAFT_RPC_INSTALL_SNAPSHOT_REQUEST,
3214 .sid = s->sid,
3215 .comment = CONST_CAST(char *, comment),
3216 },
3217 .term = raft->term,
3218 .last_index = raft->log_start - 1,
3219 .last_term = raft->snap.term,
3220 .last_servers = raft->snap.servers,
3221 .last_eid = raft->snap.eid,
3222 .data = raft->snap.data,
3223 }
3224 };
3225 raft_send(raft, &rpc);
3226 }
3227
3228 static void
3229 raft_handle_append_reply(struct raft *raft,
3230 const struct raft_append_reply *rpy)
3231 {
3232 if (raft->role != RAFT_LEADER) {
3233 VLOG_INFO("rejected append_reply (not leader)");
3234 return;
3235 }
3236
3237 /* Most commonly we'd be getting an AppendEntries reply from a configured
3238 * server (e.g. a peer), but we can also get them from servers in the
3239 * process of being added. */
3240 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3241 if (!s) {
3242 s = raft_find_new_server(raft, &rpy->common.sid);
3243 if (!s) {
3244 VLOG_INFO("rejected append_reply from unknown server "SID_FMT,
3245 SID_ARGS(&rpy->common.sid));
3246 return;
3247 }
3248 }
3249
3250 s->replied = true;
3251 if (rpy->result == RAFT_APPEND_OK) {
3252 /* Figure 3.1: "If successful, update nextIndex and matchIndex for
3253 * follower (section 3.5)." */
3254 uint64_t min_index = rpy->prev_log_index + rpy->n_entries + 1;
3255 if (s->next_index < min_index) {
3256 s->next_index = min_index;
3257 }
3258 raft_update_match_index(raft, s, min_index - 1);
3259 } else {
3260 /* Figure 3.1: "If AppendEntries fails because of log inconsistency,
3261 * decrement nextIndex and retry (section 3.5)."
3262 *
3263 * We also implement the optimization suggested in section 4.2.1:
3264 * "Various approaches can make nextIndex converge to its correct value
3265 * more quickly, including those described in Chapter 3. The simplest
3266 * approach to solving this particular problem of adding a new server,
3267 * however, is to have followers return the length of their logs in the
3268 * AppendEntries response; this allows the leader to cap the follower’s
3269 * nextIndex accordingly." */
3270 s->next_index = (s->next_index > 0
3271 ? MIN(s->next_index - 1, rpy->log_end)
3272 : 0);
3273
3274 if (rpy->result == RAFT_APPEND_IO_ERROR) {
3275 /* Append failed but not because of a log inconsistency. Because
3276 * of the I/O error, there's no point in re-sending the append
3277 * immediately. */
3278 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3279 VLOG_INFO_RL(&rl, "%s reported I/O error", s->nickname);
3280 return;
3281 }
3282 }
3283
3284 /*
3285 * Our behavior here must depend on the value of next_index relative to
3286 * log_start and log_end. There are three cases:
3287 *
3288 *        Case 1       |    Case 2     |       Case 3
3289 *   <---------------->|<------------->|<------------------>
3290 *                     |               |
3291 *
3292 *                     +---+---+---+---+
3293 *                   T | T | T | T | T |
3294 *                     +---+---+---+---+
3295 *                     ^               ^
3296 *                     |               |
3297 *                 log_start        log_end
3298 */
3299 if (s->next_index < raft->log_start) {
3300 /* Case 1. */
3301 raft_send_install_snapshot_request(raft, s, NULL);
3302 } else if (s->next_index < raft->log_end) {
3303 /* Case 2. */
3304 raft_send_append_request(raft, s, 1, NULL);
3305 } else {
3306 /* Case 3. */
3307 if (s->phase == RAFT_PHASE_CATCHUP) {
3308 s->phase = RAFT_PHASE_CAUGHT_UP;
3309 raft_run_reconfigure(raft);
3310 }
3311 }
3312 }
3313
3314 static bool
3315 raft_should_suppress_disruptive_server(struct raft *raft,
3316 const union raft_rpc *rpc)
3317 {
3318 if (rpc->type != RAFT_RPC_VOTE_REQUEST) {
3319 return false;
3320 }
3321
3322 /* Section 4.2.3 "Disruptive Servers" says:
3323 *
3324 * ...if a server receives a RequestVote request within the minimum
3325 * election timeout of hearing from a current leader, it does not update
3326 * its term or grant its vote...
3327 *
3328 * ...This change conflicts with the leadership transfer mechanism as
3329 * described in Chapter 3, in which a server legitimately starts an
3330 * election without waiting an election timeout. In that case,
3331 * RequestVote messages should be processed by other servers even when
3332 * they believe a current cluster leader exists. Those RequestVote
3333 * requests can include a special flag to indicate this behavior (“I
3334 * have permission to disrupt the leader--it told me to!”).
3335 *
3336 * This clearly describes how the followers should act, but not the leader.
3337 * We just ignore vote requests that arrive at a current leader. This
3338 * seems to be fairly safe, since a majority other than the current leader
3339 * can still elect a new leader and the first AppendEntries from that new
3340 * leader will depose the current leader. */
3341 const struct raft_vote_request *rq = raft_vote_request_cast(rpc);
3342 if (rq->leadership_transfer) {
3343 return false;
3344 }
3345
3346 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3347 long long int now = time_msec();
3348 switch (raft->role) {
3349 case RAFT_LEADER:
3350 VLOG_WARN_RL(&rl, "ignoring vote request received as leader");
3351 return true;
3352
3353 case RAFT_FOLLOWER:
3354 if (now < raft->election_base + ELECTION_BASE_MSEC) {
3355 VLOG_WARN_RL(&rl, "ignoring vote request received after only "
3356 "%lld ms (minimum election time is %d ms)",
3357 now - raft->election_base, ELECTION_BASE_MSEC);
3358 return true;
3359 }
3360 return false;
3361
3362 case RAFT_CANDIDATE:
3363 return false;
3364
3365 default:
3366 OVS_NOT_REACHED();
3367 }
3368 }
3369
3370 /* Returns true if a reply should be sent. */
3371 static bool
3372 raft_handle_vote_request__(struct raft *raft,
3373 const struct raft_vote_request *rq)
3374 {
3375 /* Figure 3.1: "If votedFor is null or candidateId, and candidate's log is
3376 * at least as up-to-date as receiver's log, grant vote (sections 3.4,
3377 * 3.6)." */
3378 if (uuid_equals(&raft->vote, &rq->common.sid)) {
3379 /* Already voted for this candidate in this term. Resend vote. */
3380 return true;
3381 } else if (!uuid_is_zero(&raft->vote)) {
3382 /* Already voted for different candidate in this term. Send a reply
3383 * saying what candidate we did vote for. This isn't a necessary part
3384 * of the Raft protocol but it can make debugging easier. */
3385 return true;
3386 }
3387
3388 /* Section 3.6.1: "The RequestVote RPC implements this restriction: the RPC
3389 * includes information about the candidate’s log, and the voter denies its
3390 * vote if its own log is more up-to-date than that of the candidate. Raft
3391 * determines which of two logs is more up-to-date by comparing the index
3392 * and term of the last entries in the logs. If the logs have last entries
3393 * with different terms, then the log with the later term is more
3394 * up-to-date. If the logs end with the same term, then whichever log is
3395 * longer is more up-to-date." */
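    /* For example (illustrative numbers only): if our last entry has term 3
     * at index 7 and the candidate reports last_log_term 3 with
     * last_log_index 5, the terms match but our log is longer, so we
     * withhold the vote. */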
3396 uint64_t last_term = (raft->log_end > raft->log_start
3397 ? raft->entries[raft->log_end - 1
3398 - raft->log_start].term
3399 : raft->snap.term);
3400 if (last_term > rq->last_log_term
3401 || (last_term == rq->last_log_term
3402 && raft->log_end - 1 > rq->last_log_index)) {
3403 /* Our log is more up-to-date than the peer's. Withhold vote. */
3404 return false;
3405 }
3406
3407 /* Record a vote for the peer. */
3408 if (!raft_set_term(raft, raft->term, &rq->common.sid)) {
3409 return false;
3410 }
3411
3412 raft_reset_election_timer(raft);
3413
3414 return true;
3415 }
3416
3417 static void
3418 raft_send_vote_reply(struct raft *raft, const struct uuid *dst,
3419 const struct uuid *vote)
3420 {
3421 union raft_rpc rpy = {
3422 .vote_reply = {
3423 .common = {
3424 .type = RAFT_RPC_VOTE_REPLY,
3425 .sid = *dst,
3426 },
3427 .term = raft->term,
3428 .vote = *vote,
3429 },
3430 };
3431 raft_send(raft, &rpy);
3432 }
3433
3434 static void
3435 raft_handle_vote_request(struct raft *raft,
3436 const struct raft_vote_request *rq)
3437 {
3438 if (raft_handle_vote_request__(raft, rq)) {
3439 raft_send_vote_reply(raft, &rq->common.sid, &raft->vote);
3440 }
3441 }
3442
3443 static void
3444 raft_handle_vote_reply(struct raft *raft,
3445 const struct raft_vote_reply *rpy)
3446 {
3447 if (!raft_receive_term__(raft, &rpy->common, rpy->term)) {
3448 return;
3449 }
3450
3451 if (raft->role != RAFT_CANDIDATE) {
3452 return;
3453 }
3454
3455 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3456 if (s) {
3457 raft_accept_vote(raft, s, &rpy->vote);
3458 }
3459 }
3460
3461 /* Returns true if 'raft''s log contains reconfiguration entries that have not
3462 * yet been committed. */
3463 static bool
3464 raft_has_uncommitted_configuration(const struct raft *raft)
3465 {
3466 for (uint64_t i = raft->commit_index + 1; i < raft->log_end; i++) {
3467 ovs_assert(i >= raft->log_start);
3468 const struct raft_entry *e = &raft->entries[i - raft->log_start];
3469 if (e->servers) {
3470 return true;
3471 }
3472 }
3473 return false;
3474 }
3475
3476 static void
3477 raft_log_reconfiguration(struct raft *raft)
3478 {
3479 struct json *servers_json = raft_servers_to_json(&raft->servers);
3480 raft_command_unref(raft_command_execute__(
3481 raft, NULL, servers_json, NULL, NULL));
3482 json_destroy(servers_json);
3483 }
3484
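/* Advances the cluster reconfiguration state machine. An added server moves
 * through RAFT_PHASE_CATCHUP -> RAFT_PHASE_CAUGHT_UP -> RAFT_PHASE_COMMITTING
 * -> RAFT_PHASE_STABLE; a server being removed is marked RAFT_PHASE_REMOVE.
 * Only one configuration change makes progress at a time, since this
 * function returns early while an uncommitted configuration entry exists. */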
3485 static void
3486 raft_run_reconfigure(struct raft *raft)
3487 {
3488 ovs_assert(raft->role == RAFT_LEADER);
3489
3490 /* Reconfiguration only progresses when configuration changes commit. */
3491 if (raft_has_uncommitted_configuration(raft)) {
3492 return;
3493 }
3494
3495 /* If we were waiting for a configuration change to commit, it's done. */
3496 struct raft_server *s;
3497 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3498 if (s->phase == RAFT_PHASE_COMMITTING) {
3499 raft_send_add_server_reply__(raft, &s->sid, s->address,
3500 true, RAFT_SERVER_COMPLETED);
3501 s->phase = RAFT_PHASE_STABLE;
3502 }
3503 }
3504 if (raft->remove_server) {
3505 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
3506 &raft->remove_server->requester_sid,
3507 raft->remove_server->requester_conn,
3508 true, RAFT_SERVER_COMPLETED);
3509 raft_server_destroy(raft->remove_server);
3510 raft->remove_server = NULL;
3511 }
3512
3513 /* If a new server is caught up, add it to the configuration. */
3514 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
3515 if (s->phase == RAFT_PHASE_CAUGHT_UP) {
3516 /* Move 's' from 'raft->add_servers' to 'raft->servers'. */
3517 hmap_remove(&raft->add_servers, &s->hmap_node);
3518 hmap_insert(&raft->servers, &s->hmap_node, uuid_hash(&s->sid));
3519
3520 /* Mark 's' as waiting for commit. */
3521 s->phase = RAFT_PHASE_COMMITTING;
3522
3523 raft_log_reconfiguration(raft);
3524
3525 /* When commit completes we'll transition to RAFT_PHASE_STABLE and
3526 * send a RAFT_SERVER_OK reply. */
3527
3528 return;
3529 }
3530 }
3531
3532 /* Remove a server, if one is scheduled for removal. */
3533 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3534 if (s->phase == RAFT_PHASE_REMOVE) {
3535 hmap_remove(&raft->servers, &s->hmap_node);
3536 raft->remove_server = s;
3537
3538 raft_log_reconfiguration(raft);
3539
3540 return;
3541 }
3542 }
3543 }
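
/* For orientation, the phases above progress roughly as follows, one
 * configuration change at a time (gated by
 * raft_has_uncommitted_configuration()):
 *
 *    adding:    CATCHUP -> CAUGHT_UP -> COMMITTING -> STABLE
 *    removing:  STABLE -> REMOVE -> (destroyed once the change commits)
 */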
3544
3545 static void
3546 raft_handle_add_server_request(struct raft *raft,
3547 const struct raft_add_server_request *rq)
3548 {
3549 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3550 if (raft->role != RAFT_LEADER) {
3551 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3552 return;
3553 }
3554
3555 /* Check for an existing server. */
3556 struct raft_server *s = raft_find_server(raft, &rq->common.sid);
3557 if (s) {
3558 /* If the server is scheduled to be removed, cancel it. */
3559 if (s->phase == RAFT_PHASE_REMOVE) {
3560 s->phase = RAFT_PHASE_STABLE;
3561 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_CANCELED);
3562 return;
3563 }
3564
3565 /* If the server is being added, then it's in progress. */
3566         if (s->phase != RAFT_PHASE_STABLE) {
3567             raft_send_add_server_reply(raft, rq,
3568                                        false, RAFT_SERVER_IN_PROGRESS);
             return;
3569         }
3570
3571 /* Nothing to do--server is already part of the configuration. */
3572 raft_send_add_server_reply(raft, rq,
3573 true, RAFT_SERVER_ALREADY_PRESENT);
3574 return;
3575 }
3576
3577 /* Check for a server being removed. */
3578 if (raft->remove_server
3579 && uuid_equals(&rq->common.sid, &raft->remove_server->sid)) {
3580 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3581 return;
3582 }
3583
3584 /* Check for a server already being added. */
3585 if (raft_find_new_server(raft, &rq->common.sid)) {
3586 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_IN_PROGRESS);
3587 return;
3588 }
3589
3590 /* Add server to 'add_servers'. */
3591 s = raft_server_add(&raft->add_servers, &rq->common.sid, rq->address);
3592 raft_server_init_leader(raft, s);
3593 s->requester_sid = rq->common.sid;
3594 s->requester_conn = NULL;
3595 s->phase = RAFT_PHASE_CATCHUP;
3596
3597 /* Start sending the log. If this is the first time we've tried to add
3598 * this server, then this will quickly degenerate into an InstallSnapshot
3599      * followed by a series of AppendEntries, but if it's a retry of an earlier
3600      * AddServer request that was interrupted (e.g. by a timeout or a loss of
3601 * leadership) then it will gracefully resume populating the log.
3602 *
3603 * See the last few paragraphs of section 4.2.1 for further insight. */
3604 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
3605 VLOG_INFO_RL(&rl,
3606 "starting to add server %s ("SID_FMT" at %s) "
3607 "to cluster "CID_FMT, s->nickname, SID_ARGS(&s->sid),
3608 rq->address, CID_ARGS(&raft->cid));
3609 raft_send_append_request(raft, s, 0, "initialize new server");
3610 }
3611
3612 static void
3613 raft_handle_add_server_reply(struct raft *raft,
3614 const struct raft_add_server_reply *rpy)
3615 {
3616 if (!raft->joining) {
3617 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3618 VLOG_WARN_RL(&rl, "received add_server_reply even though we're "
3619 "already part of the cluster");
3620 return;
3621 }
3622
3623 if (rpy->success) {
3624 raft->joining = false;
3625
3626 /* It is tempting, at this point, to check that this server is part of
3627 * the current configuration. However, this is not necessarily the
3628 * case, because the log entry that added this server to the cluster
3629 * might have been committed by a majority of the cluster that does not
3630 * include this one. This actually happens in testing. */
3631 } else {
3632 const char *address;
3633 SSET_FOR_EACH (address, &rpy->remote_addresses) {
3634 if (sset_add(&raft->remote_addresses, address)) {
3635 VLOG_INFO("%s: learned new server address for joining cluster",
3636 address);
3637 }
3638 }
3639 }
3640 }
3641
3642 /* This is called by raft_unixctl_kick() as well as via RPC. */
3643 static void
3644 raft_handle_remove_server_request(struct raft *raft,
3645 const struct raft_remove_server_request *rq)
3646 {
3647 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3648 if (raft->role != RAFT_LEADER) {
3649 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3650 return;
3651 }
3652
3653 /* If the server to remove is currently waiting to be added, cancel it. */
3654 struct raft_server *target = raft_find_new_server(raft, &rq->sid);
3655 if (target) {
3656 raft_send_add_server_reply__(raft, &target->sid, target->address,
3657 false, RAFT_SERVER_CANCELED);
3658 hmap_remove(&raft->add_servers, &target->hmap_node);
3659 raft_server_destroy(target);
3660 return;
3661 }
3662
3663 /* If the server isn't configured, report that. */
3664 target = raft_find_server(raft, &rq->sid);
3665 if (!target) {
3666 raft_send_remove_server_reply(raft, rq,
3667 true, RAFT_SERVER_ALREADY_GONE);
3668 return;
3669 }
3670
3671 /* Check whether we're waiting for the addition of the server to commit. */
3672 if (target->phase == RAFT_PHASE_COMMITTING) {
3673 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3674 return;
3675 }
3676
3677 /* Check whether the server is already scheduled for removal. */
3678 if (target->phase == RAFT_PHASE_REMOVE) {
3679 raft_send_remove_server_reply(raft, rq,
3680 false, RAFT_SERVER_IN_PROGRESS);
3681 return;
3682 }
3683
3684     /* Make sure that, if we remove this server, at least one other server
3685      * will be left.  We don't count servers currently being added (in
3686 * 'add_servers') since those could fail. */
3687 struct raft_server *s;
3688 int n = 0;
3689 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3690 if (s != target && s->phase != RAFT_PHASE_REMOVE) {
3691 n++;
3692 }
3693 }
3694 if (!n) {
3695 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_EMPTY);
3696 return;
3697 }
3698
3699 /* Mark the server for removal. */
3700 target->phase = RAFT_PHASE_REMOVE;
3701 if (rq->requester_conn) {
3702 target->requester_sid = UUID_ZERO;
3703 unixctl_command_reply(rq->requester_conn, "started removal");
3704 } else {
3705 target->requester_sid = rq->common.sid;
3706 target->requester_conn = NULL;
3707 }
3708
3709 raft_run_reconfigure(raft);
3710     /* Operation in progress; the reply will be sent later. */
3711 }
3712
3713 static void
3714 raft_finished_leaving_cluster(struct raft *raft)
3715 {
3716 VLOG_INFO(SID_FMT": finished leaving cluster "CID_FMT,
3717 SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
3718
3719 raft_record_note(raft, "left", "this server left the cluster");
3720
3721 raft->leaving = false;
3722 raft->left = true;
3723 }
3724
3725 static void
3726 raft_handle_remove_server_reply(struct raft *raft,
3727 const struct raft_remove_server_reply *rpc)
3728 {
3729 if (rpc->success
3730 && (uuid_is_zero(&rpc->target_sid)
3731 || uuid_equals(&rpc->target_sid, &raft->sid))) {
3732 raft_finished_leaving_cluster(raft);
3733 }
3734 }
3735
3736 static bool
3737 raft_handle_write_error(struct raft *raft, struct ovsdb_error *error)
3738 {
3739 if (error && !raft->failed) {
3740 raft->failed = true;
3741
3742 char *s = ovsdb_error_to_string_free(error);
3743 VLOG_WARN("%s: entering failure mode due to I/O error (%s)",
3744 raft->name, s);
3745 free(s);
3746 }
3747 return !raft->failed;
3748 }
3749
3750 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3751 raft_write_snapshot(struct raft *raft, struct ovsdb_log *log,
3752 uint64_t new_log_start,
3753 const struct raft_entry *new_snapshot)
3754 {
3755 struct raft_header h = {
3756 .sid = raft->sid,
3757 .cid = raft->cid,
3758 .name = raft->name,
3759 .local_address = raft->local_address,
3760 .snap_index = new_log_start - 1,
3761 .snap = *new_snapshot,
3762 };
3763 struct ovsdb_error *error = ovsdb_log_write_and_free(
3764 log, raft_header_to_json(&h));
3765 if (error) {
3766 return error;
3767 }
3768 ovsdb_log_mark_base(raft->log);
3769
3770 /* Write log records. */
3771 for (uint64_t index = new_log_start; index < raft->log_end; index++) {
3772 const struct raft_entry *e = &raft->entries[index - raft->log_start];
3773 struct raft_record r = {
3774 .type = RAFT_REC_ENTRY,
3775 .term = e->term,
3776 .entry = {
3777 .index = index,
3778 .data = e->data,
3779 .servers = e->servers,
3780 .eid = e->eid,
3781 },
3782 };
3783 error = ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3784 if (error) {
3785 return error;
3786 }
3787 }
3788
3789 /* Write term and vote (if any).
3790 *
3791 * The term is redundant if we wrote a log record for that term above. The
3792 * vote, if any, is never redundant.
3793 */
3794 error = raft_write_state(log, raft->term, &raft->vote);
3795 if (error) {
3796 return error;
3797 }
3798
3799 /* Write commit_index if it's beyond the new start of the log. */
3800 if (raft->commit_index >= new_log_start) {
3801 struct raft_record r = {
3802 .type = RAFT_REC_COMMIT_INDEX,
3803 .commit_index = raft->commit_index,
3804 };
3805 return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3806 }
3807 return NULL;
3808 }
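
/* The replacement log written above thus contains, in order: a raft_header
 * record that embeds the new snapshot, one RAFT_REC_ENTRY record for each
 * retained log entry, a term/vote record, and, when the commit index falls
 * within the retained log, a RAFT_REC_COMMIT_INDEX record. */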
3809
3810 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3811 raft_save_snapshot(struct raft *raft,
3812                    uint64_t new_start, const struct raft_entry *new_snapshot)
3814 {
3815 struct ovsdb_log *new_log;
3816 struct ovsdb_error *error;
3817 error = ovsdb_log_replace_start(raft->log, &new_log);
3818 if (error) {
3819 return error;
3820 }
3821
3822 error = raft_write_snapshot(raft, new_log, new_start, new_snapshot);
3823 if (error) {
3824 ovsdb_log_replace_abort(new_log);
3825 return error;
3826 }
3827
3828 return ovsdb_log_replace_commit(raft->log, new_log);
3829 }
3830
3831 static bool
3832 raft_handle_install_snapshot_request__(
3833 struct raft *raft, const struct raft_install_snapshot_request *rq)
3834 {
3835 raft_reset_election_timer(raft);
3836
3837     /*
3838      * Our behavior here depends on new_log_start in the snapshot compared to
3839      * log_start and log_end.  There are three cases:
3840      *
3841      *        Case 1        Case 2            Case 3
3842      *   <-------------->|<----------->|<------------------>
3843      *                   |             |
3844      *
3845      *                 +---+---+---+---+
3846      *               T | T | T | T | T |
3847      *                 +---+---+---+---+
3848      *                   ^             ^
3849      *                   |             |
3850      *               log_start      log_end
3851      */
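    /* Concrete example (illustrative numbers only): with log_start == 10 and
     * log_end == 15, a snapshot with last_index == 8 yields new_log_start == 9,
     * which is less than 10 (case 1); last_index == 12 yields 13, which is
     * less than 15 (case 2); last_index == 20 yields 21, which is past the end
     * of our log (case 3). */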
3852 uint64_t new_log_start = rq->last_index + 1;
3853 if (new_log_start < raft->log_start) {
3854 /* Case 1: The new snapshot covers less than our current one. Nothing
3855 * to do. */
3856 return true;
3857 } else if (new_log_start < raft->log_end) {
3858 /* Case 2: The new snapshot starts in the middle of our log. We could
3859 * discard the first 'new_log_start - raft->log_start' entries in the
3860 * log. But there's not much value in that, since snapshotting is
3861 * supposed to be a local decision. Just skip it. */
3862 return true;
3863 }
3864
3865 /* Case 3: The new snapshot starts past the end of our current log, so
3866 * discard all of our current log. */
3867 const struct raft_entry new_snapshot = {
3868 .term = rq->last_term,
3869 .data = rq->data,
3870 .eid = rq->last_eid,
3871 .servers = rq->last_servers,
3872 };
3873 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
3874 &new_snapshot);
3875 if (error) {
3876 char *error_s = ovsdb_error_to_string(error);
3877 VLOG_WARN("could not save snapshot: %s", error_s);
3878 free(error_s);
3879 return false;
3880 }
3881
3882 for (size_t i = 0; i < raft->log_end - raft->log_start; i++) {
3883 raft_entry_uninit(&raft->entries[i]);
3884 }
3885 raft->log_start = raft->log_end = new_log_start;
3886 raft->log_synced = raft->log_end - 1;
3887 raft->commit_index = raft->log_start - 1;
3888 if (raft->last_applied < raft->commit_index) {
3889 raft->last_applied = raft->log_start - 2;
3890 }
3891
3892 raft_entry_uninit(&raft->snap);
3893 raft_entry_clone(&raft->snap, &new_snapshot);
3894
3895 raft_get_servers_from_log(raft, VLL_INFO);
3896
3897 return true;
3898 }
3899
3900 static void
3901 raft_handle_install_snapshot_request(
3902 struct raft *raft, const struct raft_install_snapshot_request *rq)
3903 {
3904 if (raft_handle_install_snapshot_request__(raft, rq)) {
3905 union raft_rpc rpy = {
3906 .install_snapshot_reply = {
3907 .common = {
3908 .type = RAFT_RPC_INSTALL_SNAPSHOT_REPLY,
3909 .sid = rq->common.sid,
3910 },
3911 .term = raft->term,
3912 .last_index = rq->last_index,
3913 .last_term = rq->last_term,
3914 },
3915 };
3916 raft_send(raft, &rpy);
3917 }
3918 }
3919
3920 static void
3921 raft_handle_install_snapshot_reply(
3922 struct raft *raft, const struct raft_install_snapshot_reply *rpy)
3923 {
3924 /* We might get an InstallSnapshot reply from a configured server (e.g. a
3925 * peer) or a server in the process of being added. */
3926 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3927 if (!s) {
3928 s = raft_find_new_server(raft, &rpy->common.sid);
3929 if (!s) {
3930 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3931 VLOG_INFO_RL(&rl, "cluster "CID_FMT": received %s from "
3932 "unknown server "SID_FMT, CID_ARGS(&raft->cid),
3933 raft_rpc_type_to_string(rpy->common.type),
3934 SID_ARGS(&rpy->common.sid));
3935 return;
3936 }
3937 }
3938
3939 if (rpy->last_index != raft->log_start - 1 ||
3940 rpy->last_term != raft->snap.term) {
3941 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3942 VLOG_INFO_RL(&rl, "cluster "CID_FMT": server %s installed "
3943 "out-of-date snapshot, starting over",
3944 CID_ARGS(&raft->cid), s->nickname);
3945 raft_send_install_snapshot_request(raft, s,
3946 "installed obsolete snapshot");
3947 return;
3948 }
3949
3950 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
3951 VLOG_INFO_RL(&rl, "cluster "CID_FMT": installed snapshot on server %s "
3952                  "up to %"PRIu64":%"PRIu64, CID_ARGS(&raft->cid),
3953 s->nickname, rpy->last_term, rpy->last_index);
3954 s->next_index = raft->log_end;
3955 raft_send_append_request(raft, s, 0, "snapshot installed");
3956 }
3957
3958 /* Returns true if 'raft' has grown enough since the last snapshot that
3959 * reducing the log to a snapshot would be valuable, false otherwise. */
3960 bool
3961 raft_grew_lots(const struct raft *raft)
3962 {
3963 return ovsdb_log_grew_lots(raft->log);
3964 }
3965
3966 /* Returns the number of log entries that could be trimmed off the on-disk log
3967 * by snapshotting. */
3968 uint64_t
3969 raft_get_log_length(const struct raft *raft)
3970 {
3971 return (raft->last_applied < raft->log_start
3972 ? 0
3973 : raft->last_applied - raft->log_start + 1);
3974 }
3975
3976 /* Returns true if taking a snapshot of 'raft', with raft_store_snapshot(), is
3977 * possible. */
3978 bool
3979 raft_may_snapshot(const struct raft *raft)
3980 {
3981 return (!raft->joining
3982 && !raft->leaving
3983 && !raft->left
3984 && !raft->failed
3985 && raft->last_applied >= raft->log_start);
3986 }
3987
3988 /* Replaces the log for 'raft', up to the last log entry read, by
3989 * 'new_snapshot_data'. Returns NULL if successful, otherwise an error that
3990 * the caller must eventually free.
3991 *
3992 * This function can only succeed if raft_may_snapshot() returns true. It is
3993 * only valuable to call it if raft_get_log_length() is significant and
3994 * especially if raft_grew_lots() returns true. */
3995 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3996 raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
3997 {
3998 if (raft->joining) {
3999 return ovsdb_error(NULL,
4000 "cannot store a snapshot while joining cluster");
4001 } else if (raft->leaving) {
4002 return ovsdb_error(NULL,
4003 "cannot store a snapshot while leaving cluster");
4004 } else if (raft->left) {
4005 return ovsdb_error(NULL,
4006 "cannot store a snapshot after leaving cluster");
4007 } else if (raft->failed) {
4008 return ovsdb_error(NULL,
4009 "cannot store a snapshot following failure");
4010 }
4011
4012 if (raft->last_applied < raft->log_start) {
4013 return ovsdb_error(NULL, "not storing a duplicate snapshot");
4014 }
4015
4016 uint64_t new_log_start = raft->last_applied + 1;
4017 struct raft_entry new_snapshot = {
4018 .term = raft_get_term(raft, new_log_start - 1),
4019 .data = json_clone(new_snapshot_data),
4020 .eid = *raft_get_eid(raft, new_log_start - 1),
4021 .servers = json_clone(raft_servers_for_index(raft, new_log_start - 1)),
4022 };
4023 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
4024 &new_snapshot);
4025 if (error) {
4026 raft_entry_uninit(&new_snapshot);
4027 return error;
4028 }
4029
4030 raft->log_synced = raft->log_end - 1;
4031 raft_entry_uninit(&raft->snap);
4032 raft->snap = new_snapshot;
4033 for (size_t i = 0; i < new_log_start - raft->log_start; i++) {
4034 raft_entry_uninit(&raft->entries[i]);
4035 }
4036 memmove(&raft->entries[0], &raft->entries[new_log_start - raft->log_start],
4037 (raft->log_end - new_log_start) * sizeof *raft->entries);
4038 raft->log_start = new_log_start;
4039 return NULL;
4040 }
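
/* A minimal caller-side sketch (names such as 'db_json' are illustrative,
 * not part of this API):
 *
 *     if (raft_may_snapshot(raft) && raft_grew_lots(raft)) {
 *         struct json *db_json = ...;   // serialized applied state
 *         struct ovsdb_error *error = raft_store_snapshot(raft, db_json);
 *         json_destroy(db_json);
 *         if (error) {
 *             char *msg = ovsdb_error_to_string_free(error);
 *             VLOG_WARN("snapshot failed: %s", msg);
 *             free(msg);
 *         }
 *     }
 */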
4041
4042 static void
4043 raft_handle_become_leader(struct raft *raft,
4044 const struct raft_become_leader *rq)
4045 {
4046 if (raft->role == RAFT_FOLLOWER) {
4047 char buf[SID_LEN + 1];
4048 VLOG_INFO("received leadership transfer from %s in term %"PRIu64,
4049 raft_get_nickname(raft, &rq->common.sid, buf, sizeof buf),
4050 rq->term);
4051 raft_start_election(raft, true);
4052 }
4053 }
4054
4055 static void
4056 raft_send_execute_command_reply(struct raft *raft,
4057 const struct uuid *sid,
4058 const struct uuid *eid,
4059 enum raft_command_status status,
4060 uint64_t commit_index)
4061 {
4062 if (failure_test == FT_CRASH_BEFORE_SEND_EXEC_REP) {
4063 ovs_fatal(0, "Raft test: crash before sending execute_command_reply");
4064 }
4065 union raft_rpc rpc = {
4066 .execute_command_reply = {
4067 .common = {
4068 .type = RAFT_RPC_EXECUTE_COMMAND_REPLY,
4069 .sid = *sid,
4070 },
4071 .result = *eid,
4072 .status = status,
4073 .commit_index = commit_index,
4074 },
4075 };
4076 raft_send(raft, &rpc);
4077 if (failure_test == FT_CRASH_AFTER_SEND_EXEC_REP) {
4078         ovs_fatal(0, "Raft test: crash after sending execute_command_reply");
4079 }
4080 }
4081
4082 static enum raft_command_status
4083 raft_handle_execute_command_request__(
4084 struct raft *raft, const struct raft_execute_command_request *rq)
4085 {
4086 if (raft->role != RAFT_LEADER) {
4087 return RAFT_CMD_NOT_LEADER;
4088 }
4089
4090 const struct uuid *current_eid = raft_current_eid(raft);
4091 if (!uuid_equals(&rq->prereq, current_eid)) {
4092 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
4093 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
4094 "prerequisite "UUID_FMT" in execute_command_request",
4095 UUID_ARGS(current_eid), UUID_ARGS(&rq->prereq));
4096 return RAFT_CMD_BAD_PREREQ;
4097 }
4098
4099 struct raft_command *cmd = raft_command_initiate(raft, rq->data,
4100 NULL, &rq->result);
4101 cmd->sid = rq->common.sid;
4102
4103 enum raft_command_status status = cmd->status;
4104 if (status != RAFT_CMD_INCOMPLETE) {
4105 raft_command_unref(cmd);
4106 }
4107 return status;
4108 }
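
/* The prerequisite check above is what serializes writers: for instance, if
 * two clients both observe entry eid E and submit commands with prereq E,
 * only the first can append; by then the current eid has changed, so the
 * second receives RAFT_CMD_BAD_PREREQ and must re-read and retry. */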
4109
4110 static void
4111 raft_handle_execute_command_request(
4112 struct raft *raft, const struct raft_execute_command_request *rq)
4113 {
4114 enum raft_command_status status
4115 = raft_handle_execute_command_request__(raft, rq);
4116 if (status != RAFT_CMD_INCOMPLETE) {
4117 raft_send_execute_command_reply(raft, &rq->common.sid, &rq->result,
4118 status, 0);
4119 }
4120 }
4121
4122 static void
4123 raft_handle_execute_command_reply(
4124 struct raft *raft, const struct raft_execute_command_reply *rpy)
4125 {
4126 struct raft_command *cmd = raft_find_command_by_eid(raft, &rpy->result);
4127 if (!cmd) {
4128 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
4129 char buf[SID_LEN + 1];
4130 VLOG_INFO_RL(&rl,
4131 "%s received \"%s\" reply from %s for unknown command",
4132 raft->local_nickname,
4133 raft_command_status_to_string(rpy->status),
4134 raft_get_nickname(raft, &rpy->common.sid,
4135 buf, sizeof buf));
4136 return;
4137 }
4138
4139 if (rpy->status == RAFT_CMD_INCOMPLETE) {
4140 cmd->timestamp = time_msec();
4141 } else {
4142 cmd->index = rpy->commit_index;
4143 raft_command_complete(raft, cmd, rpy->status);
4144 }
4145 }
4146
4147 static void
4148 raft_handle_rpc(struct raft *raft, const union raft_rpc *rpc)
4149 {
4150 uint64_t term = raft_rpc_get_term(rpc);
4151 if (term
4152 && !raft_should_suppress_disruptive_server(raft, rpc)
4153 && !raft_receive_term__(raft, &rpc->common, term)) {
4154 if (rpc->type == RAFT_RPC_APPEND_REQUEST) {
4155 /* Section 3.3: "If a server receives a request with a stale term
4156 * number, it rejects the request." */
4157 raft_send_append_reply(raft, raft_append_request_cast(rpc),
4158 RAFT_APPEND_INCONSISTENCY, "stale term");
4159 }
4160 return;
4161 }
4162
4163 switch (rpc->type) {
4164 #define RAFT_RPC(ENUM, NAME) \
4165 case ENUM: \
4166 raft_handle_##NAME(raft, &rpc->NAME); \
4167 break;
4168 RAFT_RPC_TYPES
4169 #undef RAFT_RPC
4170 default:
4171 OVS_NOT_REACHED();
4172 }
4173 }
4174 \f
4175 static bool
4176 raft_rpc_is_heartbeat(const union raft_rpc *rpc)
4177 {
4178 return ((rpc->type == RAFT_RPC_APPEND_REQUEST
4179 || rpc->type == RAFT_RPC_APPEND_REPLY)
4180 && rpc->common.comment
4181 && !strcmp(rpc->common.comment, "heartbeat"));
4182 }
4183
4184 \f
4185 static bool
4186 raft_send_to_conn_at(struct raft *raft, const union raft_rpc *rpc,
4187 struct raft_conn *conn, int line_number)
4188 {
4189 log_rpc(rpc, "-->", conn, line_number);
4190 return !jsonrpc_session_send(
4191 conn->js, raft_rpc_to_jsonrpc(&raft->cid, &raft->sid, rpc));
4192 }
4193
4194 static bool
4195 raft_is_rpc_synced(const struct raft *raft, const union raft_rpc *rpc)
4196 {
4197 uint64_t term = raft_rpc_get_term(rpc);
4198 uint64_t index = raft_rpc_get_min_sync_index(rpc);
4199 const struct uuid *vote = raft_rpc_get_vote(rpc);
4200
4201 return (term <= raft->synced_term
4202 && index <= raft->log_synced
4203 && (!vote || uuid_equals(vote, &raft->synced_vote)));
4204 }
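
/* Gating on persistence matters for safety: for example, a vote reply for
 * term T must not reach the wire before our term and vote are on disk,
 * since otherwise a crash and restart could let this server cast a second,
 * conflicting vote in term T.  raft_send_at() below queues unsynced RPCs on
 * a waiter instead of sending them immediately. */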
4205
4206 static bool
4207 raft_send_at(struct raft *raft, const union raft_rpc *rpc, int line_number)
4208 {
4209 const struct uuid *dst = &rpc->common.sid;
4210 if (uuid_equals(dst, &raft->sid)) {
4211 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4212 VLOG_WARN_RL(&rl, "attempted to send RPC to self from raft.c:%d",
4213 line_number);
4214 return false;
4215 }
4216
4217 struct raft_conn *conn = raft_find_conn_by_sid(raft, dst);
4218 if (!conn) {
4219 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4220 char buf[SID_LEN + 1];
4221 VLOG_DBG_RL(&rl, "%s: no connection to %s, cannot send RPC "
4222 "from raft.c:%d", raft->local_nickname,
4223 raft_get_nickname(raft, dst, buf, sizeof buf),
4224 line_number);
4225 return false;
4226 }
4227
4228 if (!raft_is_rpc_synced(raft, rpc)) {
4229 raft_waiter_create(raft, RAFT_W_RPC, false)->rpc = raft_rpc_clone(rpc);
4230 return true;
4231 }
4232
4233 return raft_send_to_conn_at(raft, rpc, conn, line_number);
4234 }
4235 \f
4236 static struct raft *
4237 raft_lookup_by_name(const char *name)
4238 {
4239 struct raft *raft;
4240
4241 HMAP_FOR_EACH_WITH_HASH (raft, hmap_node, hash_string(name, 0),
4242 &all_rafts) {
4243 if (!strcmp(raft->name, name)) {
4244 return raft;
4245 }
4246 }
4247 return NULL;
4248 }
4249
4250 static void
4251 raft_unixctl_cid(struct unixctl_conn *conn,
4252 int argc OVS_UNUSED, const char *argv[],
4253 void *aux OVS_UNUSED)
4254 {
4255 struct raft *raft = raft_lookup_by_name(argv[1]);
4256 if (!raft) {
4257 unixctl_command_reply_error(conn, "unknown cluster");
4258 } else if (uuid_is_zero(&raft->cid)) {
4259 unixctl_command_reply_error(conn, "cluster id not yet known");
4260 } else {
4261 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->cid));
4262 unixctl_command_reply(conn, uuid);
4263 free(uuid);
4264 }
4265 }
4266
4267 static void
4268 raft_unixctl_sid(struct unixctl_conn *conn,
4269 int argc OVS_UNUSED, const char *argv[],
4270 void *aux OVS_UNUSED)
4271 {
4272 struct raft *raft = raft_lookup_by_name(argv[1]);
4273 if (!raft) {
4274 unixctl_command_reply_error(conn, "unknown cluster");
4275 } else {
4276 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->sid));
4277 unixctl_command_reply(conn, uuid);
4278 free(uuid);
4279 }
4280 }
4281
4282 static void
4283 raft_put_sid(const char *title, const struct uuid *sid,
4284 const struct raft *raft, struct ds *s)
4285 {
4286 ds_put_format(s, "%s: ", title);
4287 if (uuid_equals(sid, &raft->sid)) {
4288 ds_put_cstr(s, "self");
4289 } else if (uuid_is_zero(sid)) {
4290 ds_put_cstr(s, "unknown");
4291 } else {
4292 char buf[SID_LEN + 1];
4293 ds_put_cstr(s, raft_get_nickname(raft, sid, buf, sizeof buf));
4294 }
4295 ds_put_char(s, '\n');
4296 }
4297
4298 static void
4299 raft_unixctl_status(struct unixctl_conn *conn,
4300 int argc OVS_UNUSED, const char *argv[],
4301 void *aux OVS_UNUSED)
4302 {
4303 struct raft *raft = raft_lookup_by_name(argv[1]);
4304 if (!raft) {
4305 unixctl_command_reply_error(conn, "unknown cluster");
4306 return;
4307 }
4308
4309 struct ds s = DS_EMPTY_INITIALIZER;
4310 ds_put_format(&s, "%s\n", raft->local_nickname);
4311 ds_put_format(&s, "Name: %s\n", raft->name);
4312 ds_put_format(&s, "Cluster ID: ");
4313 if (!uuid_is_zero(&raft->cid)) {
4314 ds_put_format(&s, CID_FMT" ("UUID_FMT")\n",
4315 CID_ARGS(&raft->cid), UUID_ARGS(&raft->cid));
4316 } else {
4317 ds_put_format(&s, "not yet known\n");
4318 }
4319 ds_put_format(&s, "Server ID: "SID_FMT" ("UUID_FMT")\n",
4320 SID_ARGS(&raft->sid), UUID_ARGS(&raft->sid));
4321 ds_put_format(&s, "Address: %s\n", raft->local_address);
4322 ds_put_format(&s, "Status: %s\n",
4323 raft->joining ? "joining cluster"
4324 : raft->leaving ? "leaving cluster"
4325 : raft->left ? "left cluster"
4326 : raft->failed ? "failed"
4327 : "cluster member");
4328 if (raft->joining) {
4329 ds_put_format(&s, "Remotes for joining:");
4330 const char *address;
4331 SSET_FOR_EACH (address, &raft->remote_addresses) {
4332 ds_put_format(&s, " %s", address);
4333 }
4334 ds_put_char(&s, '\n');
4335 }
4336 if (raft->role == RAFT_LEADER) {
4337 struct raft_server *as;
4338 HMAP_FOR_EACH (as, hmap_node, &raft->add_servers) {
4339 ds_put_format(&s, "Adding server %s ("SID_FMT" at %s) (%s)\n",
4340 as->nickname, SID_ARGS(&as->sid), as->address,
4341 raft_server_phase_to_string(as->phase));
4342 }
4343
4344 struct raft_server *rs = raft->remove_server;
4345 if (rs) {
4346 ds_put_format(&s, "Removing server %s ("SID_FMT" at %s) (%s)\n",
4347 rs->nickname, SID_ARGS(&rs->sid), rs->address,
4348 raft_server_phase_to_string(rs->phase));
4349 }
4350 }
4351
4352 ds_put_format(&s, "Role: %s\n",
4353 raft->role == RAFT_LEADER ? "leader"
4354 : raft->role == RAFT_CANDIDATE ? "candidate"
4355 : raft->role == RAFT_FOLLOWER ? "follower"
4356 : "<error>");
4357 ds_put_format(&s, "Term: %"PRIu64"\n", raft->term);
4358 raft_put_sid("Leader", &raft->leader_sid, raft, &s);
4359 raft_put_sid("Vote", &raft->vote, raft, &s);
4360 ds_put_char(&s, '\n');
4361
4362 ds_put_format(&s, "Log: [%"PRIu64", %"PRIu64"]\n",
4363 raft->log_start, raft->log_end);
4364
4365 uint64_t n_uncommitted = raft->log_end - raft->commit_index - 1;
4366 ds_put_format(&s, "Entries not yet committed: %"PRIu64"\n", n_uncommitted);
4367
4368 uint64_t n_unapplied = raft->log_end - raft->last_applied - 1;
4369 ds_put_format(&s, "Entries not yet applied: %"PRIu64"\n", n_unapplied);
4370
4371 const struct raft_conn *c;
4372 ds_put_cstr(&s, "Connections:");
4373 LIST_FOR_EACH (c, list_node, &raft->conns) {
4374 bool connected = jsonrpc_session_is_connected(c->js);
4375 ds_put_format(&s, " %s%s%s%s",
4376 connected ? "" : "(",
4377 c->incoming ? "<-" : "->", c->nickname,
4378 connected ? "" : ")");
4379 }
4380 ds_put_char(&s, '\n');
4381
4382 ds_put_cstr(&s, "Servers:\n");
4383 struct raft_server *server;
4384 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
4385 ds_put_format(&s, " %s ("SID_FMT" at %s)",
4386 server->nickname,
4387 SID_ARGS(&server->sid), server->address);
4388 if (uuid_equals(&server->sid, &raft->sid)) {
4389 ds_put_cstr(&s, " (self)");
4390 }
4391 if (server->phase != RAFT_PHASE_STABLE) {
4392             ds_put_format(&s, " (%s)",
4393                           raft_server_phase_to_string(server->phase));
4394 }
4395 if (raft->role == RAFT_CANDIDATE) {
4396 if (!uuid_is_zero(&server->vote)) {
4397 char buf[SID_LEN + 1];
4398 ds_put_format(&s, " (voted for %s)",
4399 raft_get_nickname(raft, &server->vote,
4400 buf, sizeof buf));
4401 }
4402 } else if (raft->role == RAFT_LEADER) {
4403 ds_put_format(&s, " next_index=%"PRIu64" match_index=%"PRIu64,
4404 server->next_index, server->match_index);
4405 }
4406 ds_put_char(&s, '\n');
4407 }
4408
4409 unixctl_command_reply(conn, ds_cstr(&s));
4410 ds_destroy(&s);
4411 }
4412
4413 static void
4414 raft_unixctl_leave__(struct unixctl_conn *conn, struct raft *raft)
4415 {
4416 if (raft_is_leaving(raft)) {
4417 unixctl_command_reply_error(conn,
4418                                     "already leaving cluster");
4419 } else if (raft_is_joining(raft)) {
4420 unixctl_command_reply_error(conn,
4421 "can't leave while join in progress");
4422 } else if (raft_failed(raft)) {
4423 unixctl_command_reply_error(conn,
4424 "can't leave after failure");
4425 } else {
4426 raft_leave(raft);
4427 unixctl_command_reply(conn, NULL);
4428 }
4429 }
4430
4431 static void
4432 raft_unixctl_leave(struct unixctl_conn *conn, int argc OVS_UNUSED,
4433 const char *argv[], void *aux OVS_UNUSED)
4434 {
4435 struct raft *raft = raft_lookup_by_name(argv[1]);
4436 if (!raft) {
4437 unixctl_command_reply_error(conn, "unknown cluster");
4438 return;
4439 }
4440
4441 raft_unixctl_leave__(conn, raft);
4442 }
4443
4444 static struct raft_server *
4445 raft_lookup_server_best_match(struct raft *raft, const char *id)
4446 {
4447 struct raft_server *best = NULL;
4448 int best_score = -1;
4449 int n_best = 0;
4450
4451 struct raft_server *s;
4452 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
4453 int score = (!strcmp(id, s->address)
4454 ? INT_MAX
4455 : uuid_is_partial_match(&s->sid, id));
4456 if (score > best_score) {
4457 best = s;
4458 best_score = score;
4459 n_best = 1;
4460 } else if (score == best_score) {
4461 n_best++;
4462 }
4463 }
4464 return n_best == 1 ? best : NULL;
4465 }
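
/* Example with hypothetical servers: given SIDs 5a31... and 5b02..., the id
 * "5a" matches the first server uniquely, a full address such as
 * "tcp:10.0.0.1:6644" matches exactly, and "5" is ambiguous between the two
 * and so yields NULL. */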
4466
4467 static void
4468 raft_unixctl_kick(struct unixctl_conn *conn, int argc OVS_UNUSED,
4469 const char *argv[], void *aux OVS_UNUSED)
4470 {
4471 const char *cluster_name = argv[1];
4472 const char *server_name = argv[2];
4473
4474 struct raft *raft = raft_lookup_by_name(cluster_name);
4475 if (!raft) {
4476 unixctl_command_reply_error(conn, "unknown cluster");
4477 return;
4478 }
4479
4480 struct raft_server *server = raft_lookup_server_best_match(raft,
4481 server_name);
4482 if (!server) {
4483 unixctl_command_reply_error(conn, "unknown server");
4484 return;
4485 }
4486
4487 if (uuid_equals(&server->sid, &raft->sid)) {
4488 raft_unixctl_leave__(conn, raft);
4489 } else if (raft->role == RAFT_LEADER) {
4490 const struct raft_remove_server_request rq = {
4491 .sid = server->sid,
4492 .requester_conn = conn,
4493 };
4494 raft_handle_remove_server_request(raft, &rq);
4495 } else {
4496 const union raft_rpc rpc = {
4497 .remove_server_request = {
4498 .common = {
4499 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
4500 .sid = raft->leader_sid,
4501 .comment = "via unixctl"
4502 },
4503 .sid = server->sid,
4504 }
4505 };
4506 if (raft_send(raft, &rpc)) {
4507 unixctl_command_reply(conn, "sent removal request to leader");
4508 } else {
4509 unixctl_command_reply_error(conn,
4510 "failed to send removal request");
4511 }
4512 }
4513 }
4514
4515 static void
4516 raft_unixctl_failure_test(struct unixctl_conn *conn,
4517 int argc OVS_UNUSED, const char *argv[],
4518 void *aux OVS_UNUSED)
4519 {
4520 const char *test = argv[1];
4521 if (!strcmp(test, "crash-before-sending-append-request")) {
4522 failure_test = FT_CRASH_BEFORE_SEND_APPEND_REQ;
4523 } else if (!strcmp(test, "crash-after-sending-append-request")) {
4524 failure_test = FT_CRASH_AFTER_SEND_APPEND_REQ;
4525 } else if (!strcmp(test, "crash-before-sending-execute-command-reply")) {
4526 failure_test = FT_CRASH_BEFORE_SEND_EXEC_REP;
4527 } else if (!strcmp(test, "crash-after-sending-execute-command-reply")) {
4528 failure_test = FT_CRASH_AFTER_SEND_EXEC_REP;
4529 } else if (!strcmp(test, "crash-before-sending-execute-command-request")) {
4530 failure_test = FT_CRASH_BEFORE_SEND_EXEC_REQ;
4531 } else if (!strcmp(test, "crash-after-sending-execute-command-request")) {
4532 failure_test = FT_CRASH_AFTER_SEND_EXEC_REQ;
4533 } else if (!strcmp(test, "crash-after-receiving-append-request-update")) {
4534 failure_test = FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE;
4535 } else if (!strcmp(test, "delay-election")) {
4536 failure_test = FT_DELAY_ELECTION;
4537 struct raft *raft;
4538 HMAP_FOR_EACH (raft, hmap_node, &all_rafts) {
4539 if (raft->role == RAFT_FOLLOWER) {
4540 raft_reset_election_timer(raft);
4541 }
4542 }
4543 } else if (!strcmp(test, "clear")) {
4544 failure_test = FT_NO_TEST;
4545 unixctl_command_reply(conn, "test dismissed");
4546 return;
4547 } else {
4548 unixctl_command_reply_error(conn, "unknown test scenario");
4549 return;
4550 }
4551 unixctl_command_reply(conn, "test engaged");
4552 }
4553
4554 static void
4555 raft_init(void)
4556 {
4557 static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
4558 if (!ovsthread_once_start(&once)) {
4559 return;
4560 }
4561 unixctl_command_register("cluster/cid", "DB", 1, 1,
4562 raft_unixctl_cid, NULL);
4563 unixctl_command_register("cluster/sid", "DB", 1, 1,
4564 raft_unixctl_sid, NULL);
4565 unixctl_command_register("cluster/status", "DB", 1, 1,
4566 raft_unixctl_status, NULL);
4567 unixctl_command_register("cluster/leave", "DB", 1, 1,
4568 raft_unixctl_leave, NULL);
4569 unixctl_command_register("cluster/kick", "DB SERVER", 2, 2,
4570 raft_unixctl_kick, NULL);
4571 unixctl_command_register("cluster/failure-test", "FAILURE SCENARIO", 1, 1,
4572 raft_unixctl_failure_test, NULL);
4573 ovsthread_once_done(&once);
4574 }
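
/* These commands are invoked through ovs-appctl, e.g. (the control socket
 * path and database name here are only examples):
 *
 *     ovs-appctl -t /var/run/ovn/ovnsb_db.ctl cluster/status OVN_Southbound
 *     ovs-appctl -t /var/run/ovn/ovnsb_db.ctl cluster/kick OVN_Southbound 5a31
 */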