/*
 * Copyright (c) 2017, 2018 Nicira, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <config.h>

#include "raft.h"
#include "raft-private.h"

#include <errno.h>
#include <unistd.h>

#include "hash.h"
#include "jsonrpc.h"
#include "lockfile.h"
#include "openvswitch/dynamic-string.h"
#include "openvswitch/hmap.h"
#include "openvswitch/json.h"
#include "openvswitch/list.h"
#include "openvswitch/poll-loop.h"
#include "openvswitch/vlog.h"
#include "ovsdb-error.h"
#include "ovsdb-parser.h"
#include "ovsdb/log.h"
#include "raft-rpc.h"
#include "random.h"
#include "simap.h"
#include "socket-util.h"
#include "stream.h"
#include "timeval.h"
#include "unicode.h"
#include "unixctl.h"
#include "util.h"
#include "uuid.h"

VLOG_DEFINE_THIS_MODULE(raft);

/* Roles for a Raft server:
 *
 *    - Followers: Servers in touch with the current leader.
 *
 *    - Candidate: Servers unaware of a current leader and seeking election to
 *      leader.
 *
 *    - Leader: Handles all client requests.  At most one at a time.
 *
 * In normal operation there is exactly one leader and all of the other servers
 * are followers. */
enum raft_role {
    RAFT_FOLLOWER,
    RAFT_CANDIDATE,
    RAFT_LEADER
};

/* Flags for unit tests. */
enum raft_failure_test {
    FT_NO_TEST,
    FT_CRASH_BEFORE_SEND_APPEND_REQ,
    FT_CRASH_AFTER_SEND_APPEND_REQ,
    FT_CRASH_BEFORE_SEND_EXEC_REP,
    FT_CRASH_AFTER_SEND_EXEC_REP,
    FT_CRASH_BEFORE_SEND_EXEC_REQ,
    FT_CRASH_AFTER_SEND_EXEC_REQ,
    FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE,
    FT_DELAY_ELECTION,
    FT_DONT_SEND_VOTE_REQUEST
};
static enum raft_failure_test failure_test;

/* A connection between this Raft server and another one. */
struct raft_conn {
    struct ovs_list list_node;  /* In struct raft's 'conns' list. */
    struct jsonrpc_session *js; /* JSON-RPC connection. */
    struct uuid sid;            /* ID of the server at the other end. */
    char *nickname;             /* Short name for use in log messages. */
    bool incoming;              /* True if incoming, false if outgoing. */
    unsigned int js_seqno;      /* Seqno for noticing (re)connections. */
};

static void raft_conn_close(struct raft_conn *);

/* A "command", that is, a request to append an entry to the log.
 *
 * The Raft specification only allows clients to issue commands to the leader.
 * With this implementation, clients may issue a command on any server, which
 * then relays the command to the leader if necessary.
 *
 * This structure is thus used in three cases:
 *
 *    1. We are the leader and the command was issued to us directly.
 *
 *    2. We are a follower and relayed the command to the leader.
 *
 *    3. We are the leader and a follower relayed the command to us.
 */
struct raft_command {
    /* All cases. */
    struct hmap_node hmap_node; /* In struct raft's 'commands' hmap. */
    unsigned int n_refs;        /* Reference count. */
    enum raft_command_status status; /* Execution status. */
    struct uuid eid;            /* Entry ID of result. */

    /* Case 1 only. */
    uint64_t index;             /* Index in log (0 if being relayed). */

    /* Case 2 only. */
    long long int timestamp;    /* Issue or last ping time, for expiration. */

    /* Case 3 only. */
    struct uuid sid;            /* The follower (otherwise UUID_ZERO). */
};

static void raft_command_complete(struct raft *, struct raft_command *,
                                  enum raft_command_status);

static void raft_complete_all_commands(struct raft *,
                                       enum raft_command_status);

/* Type of deferred action, see struct raft_waiter. */
enum raft_waiter_type {
    RAFT_W_ENTRY,
    RAFT_W_TERM,
    RAFT_W_RPC,
};

/* An action deferred until a log write commits to disk. */
struct raft_waiter {
    struct ovs_list list_node;
    uint64_t commit_ticket;

    enum raft_waiter_type type;
    union {
        /* RAFT_W_ENTRY.
         *
         * Waits for a RAFT_REC_ENTRY write to our local log to commit.  Upon
         * completion, updates 'log_synced' to indicate that the new log entry
         * or entries are committed and, if we are leader, also updates our
         * local 'match_index'. */
        struct {
            uint64_t index;
        } entry;

        /* RAFT_W_TERM.
         *
         * Waits for a RAFT_REC_TERM or RAFT_REC_VOTE record write to commit.
         * Upon completion, updates 'synced_term' and 'synced_vote', which
         * triggers sending RPCs deferred by the uncommitted 'term' and
         * 'vote'. */
        struct {
            uint64_t term;
            struct uuid vote;
        } term;

        /* RAFT_W_RPC.
         *
         * Sometimes, sending an RPC to a peer must be delayed until an entry,
         * a term, or a vote mentioned in the RPC is synced to disk.  This
         * waiter keeps a copy of such an RPC until the previous waiters have
         * committed. */
        union raft_rpc *rpc;
    };
};
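
/* A typical flow (illustrative, pieced together from the functions below):
 * raft_set_term() writes a RAFT_REC_TERM or RAFT_REC_VOTE record and queues a
 * RAFT_W_TERM waiter carrying the log's commit ticket; later, once
 * ovsdb_log_commit_progress() reaches that ticket, raft_waiters_run()
 * completes the waiter, which updates 'synced_term' and 'synced_vote'. */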

static struct raft_waiter *raft_waiter_create(struct raft *,
                                              enum raft_waiter_type,
                                              bool start_commit);
static void raft_waiters_destroy(struct raft *);

/* The Raft state machine. */
struct raft {
    struct hmap_node hmap_node; /* In 'all_rafts'. */
    struct ovsdb_log *log;

/* Persistent derived state.
 *
 * This must be updated on stable storage before responding to RPCs.  It can be
 * derived from the header, snapshot, and log in 'log'. */

    struct uuid cid;            /* Cluster ID (immutable for the cluster). */
    struct uuid sid;            /* Server ID (immutable for the server). */
    char *local_address;        /* Local address (immutable for the server). */
    char *local_nickname;       /* Used for local server in log messages. */
    char *name;                 /* Schema name (immutable for the cluster). */

    /* Contains "struct raft_server"s and represents the server configuration
     * most recently added to 'log'. */
    struct hmap servers;

#define ELECTION_BASE_MSEC 1000
#define ELECTION_RANGE_MSEC 1000
    /* The election timeout base value for leader election, in milliseconds.
     * It can be set by unixctl cluster/change-election-timer.  Default value
     * is ELECTION_BASE_MSEC. */
    uint64_t election_timer;
    /* If not 0, it is the new value of election_timer being proposed. */
    uint64_t election_timer_new;

/* Persistent state on all servers.
 *
 * Must be updated on stable storage before responding to RPCs. */

    /* Current term and the vote for that term.  These might be on the way to
     * disk now. */
    uint64_t term;              /* Initialized to 0 and only increases. */
    struct uuid vote;           /* All-zeros if no vote yet in 'term'. */

    /* The term and vote that have been synced to disk. */
    uint64_t synced_term;
    struct uuid synced_vote;

    /* The log.
     *
     * A log entry with index 1 never really exists; the initial snapshot for
     * a Raft is considered to include this index.  The first real log entry
     * has index 2.
     *
     * A new Raft instance contains an empty log:  log_start=2, log_end=2.
     * Over time, the log grows:                   log_start=2, log_end=N.
     * At some point, the server takes a snapshot: log_start=N, log_end=N.
     * The log continues to grow:                  log_start=N, log_end=N+1...
     *
     * Must be updated on stable storage before responding to RPCs. */
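    /* For example (illustrative): with log_start == 10 and log_end == 13, the
     * log holds entries 10 through 12, entry 11 is stored at entries[1], and
     * the snapshot covers everything up to and including index 9. */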
    struct raft_entry *entries; /* Log entry i is in entries[i - log_start]. */
    uint64_t log_start;         /* Index of first entry in log. */
    uint64_t log_end;           /* Index of last entry in log, plus 1. */
    uint64_t log_synced;        /* Index of last synced entry. */
    size_t allocated_log;       /* Allocated entries in 'entries'. */

    /* Snapshot state (see Figure 5.1)
     *
     * This is the state of the cluster as of the last discarded log entry,
     * that is, at log index 'log_start - 1' (called prevIndex in Figure 5.1).
     * Only committed log entries can be included in a snapshot. */
    struct raft_entry snap;

/* Volatile state.
 *
 * The snapshot is always committed, but the rest of the log might not be yet.
 * 'last_applied' tracks what entries have been passed to the client.  If the
 * client hasn't yet read the latest snapshot, then even the snapshot isn't
 * applied yet.  Thus, the invariants are different for these members:
 *
 *     log_start - 2 <= last_applied <= commit_index < log_end.
 *     log_start - 1 <=                 commit_index < log_end.
 */
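/* For instance (illustrative): with log_start == 10, 'last_applied' may be as
 * low as 8 (the client has not yet read the snapshot) and 'commit_index' as
 * low as 9 (nothing past the snapshot has committed yet); both always stay
 * below 'log_end'. */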

    enum raft_role role;        /* Current role. */
    uint64_t commit_index;      /* Max log index known to be committed. */
    uint64_t last_applied;      /* Max log index applied to state machine. */
    struct uuid leader_sid;     /* Server ID of leader (zero, if unknown). */

    long long int election_base;    /* Time of last heartbeat from leader. */
    long long int election_timeout; /* Time at which we start an election. */

    long long int election_start;   /* Start election time. */
    long long int election_won;     /* Time of election completion. */
    bool leadership_transfer;       /* Was the leadership transferred? */

    unsigned int n_disconnections;

    /* Used for joining a cluster. */
    bool joining;                 /* Attempting to join the cluster? */
    struct sset remote_addresses; /* Addresses to try to find other servers. */
    long long int join_timeout;   /* Time to re-send add server request. */

    /* Used for leaving a cluster. */
    bool leaving;                /* True if we are leaving the cluster. */
    bool left;                   /* True if we have finished leaving. */
    long long int leave_timeout; /* Time to re-send remove server request. */

    /* Failure. */
    bool failed;                /* True if unrecoverable error has occurred. */

    /* File synchronization. */
    struct ovs_list waiters;    /* Contains "struct raft_waiter"s. */

    /* Network connections. */
    struct pstream *listener;     /* For connections from other Raft servers. */
    long long int listen_backoff; /* For retrying creating 'listener'. */
    struct ovs_list conns;        /* Contains "struct raft_conn"s. */

    /* Leaders only.  Reinitialized after becoming leader. */
    struct hmap add_servers;    /* Contains "struct raft_server"s to add. */
    struct raft_server *remove_server; /* Server being removed. */
    struct hmap commands;       /* Contains "struct raft_command"s. */
    long long int ping_timeout; /* Time at which to send a heartbeat. */

    /* Candidates only.  Reinitialized at start of election. */
    int n_votes;                /* Number of votes for me. */

    /* Followers and candidates only. */
    bool candidate_retrying;    /* The earlier election timed out and we are
                                   now retrying. */
    bool had_leader;            /* A leader has been elected since the last
                                   election was initiated; used to set
                                   'candidate_retrying'. */

    /* For all. */
    bool ever_had_leader;       /* A leader has been elected at least once
                                   since this raft was initialized, i.e. it
                                   has been connected to the cluster at some
                                   point. */

    /* Connection backlog limits. */
#define DEFAULT_MAX_BACKLOG_N_MSGS 500
#define DEFAULT_MAX_BACKLOG_N_BYTES UINT32_MAX
    size_t conn_backlog_max_n_msgs;  /* Number of messages. */
    size_t conn_backlog_max_n_bytes; /* Number of bytes. */
};

/* All Raft structures. */
static struct hmap all_rafts = HMAP_INITIALIZER(&all_rafts);

static void raft_init(void);

static struct ovsdb_error *raft_read_header(struct raft *)
    OVS_WARN_UNUSED_RESULT;

static void raft_send_execute_command_reply(struct raft *,
                                            const struct uuid *sid,
                                            const struct uuid *eid,
                                            enum raft_command_status,
                                            uint64_t commit_index);

static void raft_update_our_match_index(struct raft *, uint64_t min_index);

static void raft_send_remove_server_reply__(
    struct raft *, const struct uuid *target_sid,
    const struct uuid *requester_sid, struct unixctl_conn *requester_conn,
    bool success, const char *comment);
static void raft_finished_leaving_cluster(struct raft *);

static void raft_server_init_leader(struct raft *, struct raft_server *);

static bool raft_rpc_is_heartbeat(const union raft_rpc *);
static bool raft_is_rpc_synced(const struct raft *, const union raft_rpc *);

static void raft_handle_rpc(struct raft *, const union raft_rpc *);

static bool raft_send_at(struct raft *, const union raft_rpc *,
                         int line_number);
#define raft_send(raft, rpc) raft_send_at(raft, rpc, __LINE__)

static bool raft_send_to_conn_at(struct raft *, const union raft_rpc *,
                                 struct raft_conn *, int line_number);
#define raft_send_to_conn(raft, rpc, conn) \
    raft_send_to_conn_at(raft, rpc, conn, __LINE__)
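
/* The _at variants take the caller's line number, which the raft_send() and
 * raft_send_to_conn() macros capture via __LINE__, so that debug logs can
 * attribute each outgoing RPC to its call site (see log_rpc() below). */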

static void raft_send_append_request(struct raft *,
                                     struct raft_server *, unsigned int n,
                                     const char *comment);

static void raft_become_leader(struct raft *);
static void raft_become_follower(struct raft *);
static void raft_reset_election_timer(struct raft *);
static void raft_reset_ping_timer(struct raft *);
static void raft_send_heartbeats(struct raft *);
static void raft_start_election(struct raft *, bool leadership_transfer);
static bool raft_truncate(struct raft *, uint64_t new_end);
static void raft_get_servers_from_log(struct raft *, enum vlog_level);
static void raft_get_election_timer_from_log(struct raft *);

static bool raft_handle_write_error(struct raft *, struct ovsdb_error *);

static void raft_run_reconfigure(struct raft *);

static void raft_set_leader(struct raft *, const struct uuid *sid);
static struct raft_server *
raft_find_server(const struct raft *raft, const struct uuid *sid)
{
    return raft_server_find(&raft->servers, sid);
}

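/* Converts 'address_' into the corresponding passive (listening) address.
 * For example (illustrative): "tcp:1.2.3.4:6641" becomes "ptcp:6641:1.2.3.4"
 * and "ssl:[::1]:6641" becomes "pssl:6641:[::1]"; "unix:" addresses are
 * simply prefixed with "p". */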
static char *
raft_make_address_passive(const char *address_)
{
    if (!strncmp(address_, "unix:", 5)) {
        return xasprintf("p%s", address_);
    } else {
        char *address = xstrdup(address_);
        char *host, *port;
        inet_parse_host_port_tokens(strchr(address, ':') + 1, &host, &port);

        struct ds paddr = DS_EMPTY_INITIALIZER;
        ds_put_format(&paddr, "p%.3s:%s:", address, port);
        if (strchr(host, ':')) {
            ds_put_format(&paddr, "[%s]", host);
        } else {
            ds_put_cstr(&paddr, host);
        }
        free(address);
        return ds_steal_cstr(&paddr);
    }
}

static struct raft *
raft_alloc(void)
{
    raft_init();

    struct raft *raft = xzalloc(sizeof *raft);
    hmap_node_nullify(&raft->hmap_node);
    hmap_init(&raft->servers);
    raft->log_start = raft->log_end = 1;
    raft->role = RAFT_FOLLOWER;
    sset_init(&raft->remote_addresses);
    raft->join_timeout = LLONG_MAX;
    ovs_list_init(&raft->waiters);
    raft->listen_backoff = LLONG_MIN;
    ovs_list_init(&raft->conns);
    hmap_init(&raft->add_servers);
    hmap_init(&raft->commands);

    raft->election_timer = ELECTION_BASE_MSEC;

    raft->conn_backlog_max_n_msgs = DEFAULT_MAX_BACKLOG_N_MSGS;
    raft->conn_backlog_max_n_bytes = DEFAULT_MAX_BACKLOG_N_BYTES;

    return raft;
}

/* Creates an on-disk file that represents a new Raft cluster and initializes
 * it to consist of a single server, the one on which this function is called.
 *
 * Creates the local copy of the cluster's log in 'file_name', which must not
 * already exist.  Gives it the name 'name', which should be the database
 * schema name and which is used only to match up this database with the server
 * added to the cluster later if the cluster ID is unavailable.
 *
 * The new server is located at 'local_address', which must take one of the
 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
 * square bracket enclosed IPv6 address and PORT is a TCP port number.
 *
 * This only creates the on-disk file.  Use raft_open() to start operating the
 * new server.
 *
 * Returns null if successful, otherwise an ovsdb_error describing the
 * problem. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_create_cluster(const char *file_name, const char *name,
                    const char *local_address, const struct json *data)
{
    /* Parse and verify validity of the local address. */
    struct ovsdb_error *error = raft_address_validate(local_address);
    if (error) {
        return error;
    }

    /* Create log file. */
    struct ovsdb_log *log;
    error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
                           -1, &log);
    if (error) {
        return error;
    }

    /* Write log file. */
    struct raft_header h = {
        .sid = uuid_random(),
        .cid = uuid_random(),
        .name = xstrdup(name),
        .local_address = xstrdup(local_address),
        .joining = false,
        .remote_addresses = SSET_INITIALIZER(&h.remote_addresses),
        .snap_index = 1,
        .snap = {
            .term = 1,
            .data = json_nullable_clone(data),
            .eid = uuid_random(),
            .servers = json_object_create(),
        },
    };
    shash_add_nocopy(json_object(h.snap.servers),
                     xasprintf(UUID_FMT, UUID_ARGS(&h.sid)),
                     json_string_create(local_address));
    error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
    raft_header_uninit(&h);
    if (!error) {
        error = ovsdb_log_commit_block(log);
    }
    ovsdb_log_close(log);

    return error;
}
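
/* Hypothetical usage sketch (names and paths invented for illustration),
 * along the lines of what "ovsdb-tool create-cluster" does:
 *
 *     struct json *data = ...initial snapshot contents...;
 *     struct ovsdb_error *error = raft_create_cluster(
 *         "/var/lib/x/db.db", "MySchema", "tcp:10.0.0.1:6644", data);
 *     if (error) {
 *         ...report the error and bail out...
 *     }
 */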

/* Creates a database file that represents a new server in an existing Raft
 * cluster.
 *
 * Creates the local copy of the cluster's log in 'file_name', which must not
 * already exist.  Gives it the name 'name', which must be the same name
 * passed in to raft_create_cluster() earlier.
 *
 * 'cid' is optional.  If specified, the new server will join only the cluster
 * with the given cluster ID.
 *
 * The new server is located at 'local_address', which must take one of the
 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
 * square bracket enclosed IPv6 address and PORT is a TCP port number.
 *
 * Joining the cluster requires contacting it.  Thus, 'remote_addresses'
 * specifies the addresses of existing servers in the cluster.  One server out
 * of the existing cluster is sufficient, as long as that server is reachable
 * and not partitioned from the current cluster leader.  If multiple servers
 * from the cluster are specified, then it is sufficient for any of them to
 * meet this criterion.
 *
 * This only creates the on-disk file and does no network access.  Use
 * raft_open() to start operating the new server.  (Until this happens, the
 * new server has not joined the cluster.)
 *
 * Returns null if successful, otherwise an ovsdb_error describing the
 * problem. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_join_cluster(const char *file_name,
                  const char *name, const char *local_address,
                  const struct sset *remote_addresses,
                  const struct uuid *cid)
{
    ovs_assert(!sset_is_empty(remote_addresses));

    /* Parse and verify validity of the addresses. */
    struct ovsdb_error *error = raft_address_validate(local_address);
    if (error) {
        return error;
    }
    const char *addr;
    SSET_FOR_EACH (addr, remote_addresses) {
        error = raft_address_validate(addr);
        if (error) {
            return error;
        }
        if (!strcmp(addr, local_address)) {
            return ovsdb_error(NULL, "remote addresses cannot be the same "
                               "as the local address");
        }
    }

    /* Verify validity of the cluster ID (if provided). */
    if (cid && uuid_is_zero(cid)) {
        return ovsdb_error(NULL, "all-zero UUID is not valid cluster ID");
    }

    /* Create log file. */
    struct ovsdb_log *log;
    error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
                           -1, &log);
    if (error) {
        return error;
    }

    /* Write log file. */
    struct raft_header h = {
        .sid = uuid_random(),
        .cid = cid ? *cid : UUID_ZERO,
        .name = xstrdup(name),
        .local_address = xstrdup(local_address),
        .joining = true,
        /* No snapshot yet. */
    };
    sset_clone(&h.remote_addresses, remote_addresses);
    error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
    raft_header_uninit(&h);
    if (!error) {
        error = ovsdb_log_commit_block(log);
    }
    ovsdb_log_close(log);

    return error;
}
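
/* Hypothetical usage sketch (addresses invented for illustration), along the
 * lines of what "ovsdb-tool join-cluster" does:
 *
 *     struct sset remotes = SSET_INITIALIZER(&remotes);
 *     sset_add(&remotes, "tcp:10.0.0.1:6644");
 *     struct ovsdb_error *error = raft_join_cluster(
 *         "/var/lib/x/db.db", "MySchema", "tcp:10.0.0.2:6644",
 *         &remotes, NULL);
 */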

/* Reads the initial header record from 'log', which must be a Raft clustered
 * database log, and populates '*md' with the information read from it.  The
 * caller must eventually destroy 'md' with raft_metadata_destroy().
 *
 * On success, returns NULL.  On failure, returns an error that the caller
 * must eventually destroy and zeros '*md'. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_read_metadata(struct ovsdb_log *log, struct raft_metadata *md)
{
    struct raft *raft = raft_alloc();
    raft->log = log;

    struct ovsdb_error *error = raft_read_header(raft);
    if (!error) {
        md->sid = raft->sid;
        md->name = xstrdup(raft->name);
        md->local = xstrdup(raft->local_address);
        md->cid = raft->cid;
    } else {
        memset(md, 0, sizeof *md);
    }

    raft->log = NULL;
    raft_close(raft);
    return error;
}

/* Frees the metadata in 'md'. */
void
raft_metadata_destroy(struct raft_metadata *md)
{
    if (md) {
        free(md->name);
        free(md->local);
    }
}

static const struct raft_entry *
raft_get_entry(const struct raft *raft, uint64_t index)
{
    ovs_assert(index >= raft->log_start);
    ovs_assert(index < raft->log_end);
    return &raft->entries[index - raft->log_start];
}

static uint64_t
raft_get_term(const struct raft *raft, uint64_t index)
{
    return (index == raft->log_start - 1
            ? raft->snap.term
            : raft_get_entry(raft, index)->term);
}

static const struct json *
raft_servers_for_index(const struct raft *raft, uint64_t index)
{
    ovs_assert(index >= raft->log_start - 1);
    ovs_assert(index < raft->log_end);

    const struct json *servers = raft->snap.servers;
    for (uint64_t i = raft->log_start; i <= index; i++) {
        const struct raft_entry *e = raft_get_entry(raft, i);
        if (e->servers) {
            servers = e->servers;
        }
    }
    return servers;
}

static void
raft_set_servers(struct raft *raft, const struct hmap *new_servers,
                 enum vlog_level level)
{
    struct raft_server *s, *next;
    HMAP_FOR_EACH_SAFE (s, next, hmap_node, &raft->servers) {
        if (!raft_server_find(new_servers, &s->sid)) {
            ovs_assert(s != raft->remove_server);

            hmap_remove(&raft->servers, &s->hmap_node);
            VLOG(level, "server %s removed from configuration", s->nickname);
            raft_server_destroy(s);
        }
    }

    HMAP_FOR_EACH_SAFE (s, next, hmap_node, new_servers) {
        if (!raft_find_server(raft, &s->sid)) {
            VLOG(level, "server %s added to configuration", s->nickname);

            struct raft_server *new
                = raft_server_add(&raft->servers, &s->sid, s->address);
            raft_server_init_leader(raft, new);
        }
    }
}

static uint64_t
raft_add_entry(struct raft *raft,
               uint64_t term, struct json *data, const struct uuid *eid,
               struct json *servers, uint64_t election_timer)
{
    if (raft->log_end - raft->log_start >= raft->allocated_log) {
        raft->entries = x2nrealloc(raft->entries, &raft->allocated_log,
                                   sizeof *raft->entries);
    }

    uint64_t index = raft->log_end++;
    struct raft_entry *entry = &raft->entries[index - raft->log_start];
    entry->term = term;
    entry->data = data;
    entry->eid = eid ? *eid : UUID_ZERO;
    entry->servers = servers;
    entry->election_timer = election_timer;
    return index;
}

/* Writes a RAFT_REC_ENTRY record for 'term', 'data', 'eid', 'servers', and
 * 'election_timer' to 'raft''s log and returns an error indication. */
static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_write_entry(struct raft *raft, uint64_t term, struct json *data,
                 const struct uuid *eid, struct json *servers,
                 uint64_t election_timer)
{
    struct raft_record r = {
        .type = RAFT_REC_ENTRY,
        .term = term,
        .entry = {
            .index = raft_add_entry(raft, term, data, eid, servers,
                                    election_timer),
            .data = data,
            .servers = servers,
            .election_timer = election_timer,
            .eid = eid ? *eid : UUID_ZERO,
        },
    };
    return ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r));
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_write_state(struct ovsdb_log *log,
                 uint64_t term, const struct uuid *vote)
{
    struct raft_record r = { .term = term };
    if (vote && !uuid_is_zero(vote)) {
        r.type = RAFT_REC_VOTE;
        r.sid = *vote;
    } else {
        r.type = RAFT_REC_TERM;
    }
    return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_apply_record(struct raft *raft, unsigned long long int rec_idx,
                  const struct raft_record *r)
{
    /* Apply "term", which is present in most kinds of records (and otherwise
     * 0).
     *
     * A Raft leader can replicate entries from previous terms to the other
     * servers in the cluster, retaining the original terms on those entries
     * (see section 3.6.2 "Committing entries from previous terms" for more
     * information), so it's OK for the term in a log record to precede the
     * current term. */
    if (r->term > raft->term) {
        raft->term = raft->synced_term = r->term;
        raft->vote = raft->synced_vote = UUID_ZERO;
    }

    switch (r->type) {
    case RAFT_REC_ENTRY:
        if (r->entry.index < raft->commit_index) {
            return ovsdb_error(NULL, "record %llu attempts to truncate log "
                               "from %"PRIu64" to %"PRIu64" entries, but "
                               "commit index is already %"PRIu64,
                               rec_idx, raft->log_end, r->entry.index,
                               raft->commit_index);
        } else if (r->entry.index > raft->log_end) {
            return ovsdb_error(NULL, "record %llu with index %"PRIu64" skips "
                               "past expected index %"PRIu64,
                               rec_idx, r->entry.index, raft->log_end);
        }

        if (r->entry.index < raft->log_end) {
            /* This can happen, but it is notable. */
            VLOG_DBG("record %llu truncates log from %"PRIu64" to %"PRIu64
                     " entries", rec_idx, raft->log_end, r->entry.index);
            raft_truncate(raft, r->entry.index);
        }

        uint64_t prev_term = (raft->log_end > raft->log_start
                              ? raft->entries[raft->log_end
                                              - raft->log_start - 1].term
                              : raft->snap.term);
        if (r->term < prev_term) {
            return ovsdb_error(NULL, "record %llu with index %"PRIu64" term "
                               "%"PRIu64" precedes previous entry's term "
                               "%"PRIu64,
                               rec_idx, r->entry.index, r->term, prev_term);
        }

        raft->log_synced = raft_add_entry(
            raft, r->term,
            json_nullable_clone(r->entry.data), &r->entry.eid,
            json_nullable_clone(r->entry.servers),
            r->entry.election_timer);
        return NULL;

    case RAFT_REC_TERM:
        return NULL;

    case RAFT_REC_VOTE:
        if (r->term < raft->term) {
            return ovsdb_error(NULL, "record %llu votes for term %"PRIu64" "
                               "but current term is %"PRIu64,
                               rec_idx, r->term, raft->term);
        } else if (!uuid_is_zero(&raft->vote)
                   && !uuid_equals(&raft->vote, &r->sid)) {
            return ovsdb_error(NULL, "record %llu votes for "SID_FMT" in term "
                               "%"PRIu64" but a previous record for the "
                               "same term voted for "SID_FMT, rec_idx,
                               SID_ARGS(&raft->vote), r->term,
                               SID_ARGS(&r->sid));
        } else {
            raft->vote = raft->synced_vote = r->sid;
            return NULL;
        }
        break;

    case RAFT_REC_NOTE:
        if (!strcmp(r->note, "left")) {
            return ovsdb_error(NULL, "record %llu indicates server has left "
                               "the cluster; it cannot be added back (use "
                               "\"ovsdb-tool join-cluster\" to add a new "
                               "server)", rec_idx);
        }
        return NULL;

    case RAFT_REC_COMMIT_INDEX:
        if (r->commit_index < raft->commit_index) {
            return ovsdb_error(NULL, "record %llu regresses commit index "
                               "from %"PRIu64 " to %"PRIu64,
                               rec_idx, raft->commit_index, r->commit_index);
        } else if (r->commit_index >= raft->log_end) {
            return ovsdb_error(NULL, "record %llu advances commit index to "
                               "%"PRIu64 " but last log index is %"PRIu64,
                               rec_idx, r->commit_index, raft->log_end - 1);
        } else {
            raft->commit_index = r->commit_index;
            return NULL;
        }
        break;

    case RAFT_REC_LEADER:
        /* XXX we could use this to take back leadership for quick restart */
        return NULL;

    default:
        OVS_NOT_REACHED();
    }
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_read_header(struct raft *raft)
{
    /* Read header record. */
    struct json *json;
    struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
    if (error || !json) {
        /* Report error or end-of-file. */
        return error;
    }
    ovsdb_log_mark_base(raft->log);

    struct raft_header h;
    error = raft_header_from_json(&h, json);
    json_destroy(json);
    if (error) {
        return error;
    }

    raft->sid = h.sid;
    raft->cid = h.cid;
    raft->name = xstrdup(h.name);
    raft->local_address = xstrdup(h.local_address);
    raft->local_nickname = raft_address_to_nickname(h.local_address, &h.sid);
    raft->joining = h.joining;

    if (h.joining) {
        sset_clone(&raft->remote_addresses, &h.remote_addresses);
    } else {
        raft_entry_clone(&raft->snap, &h.snap);
        raft->log_start = raft->log_end = h.snap_index + 1;
        raft->log_synced = raft->commit_index = h.snap_index;
        raft->last_applied = h.snap_index - 1;
    }

    raft_header_uninit(&h);

    return NULL;
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_read_log(struct raft *raft)
{
    for (unsigned long long int i = 1; ; i++) {
        struct json *json;
        struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
        if (!json) {
            if (error) {
                /* We assume that the error is due to a partial write while
                 * appending to the file before a crash, so log it and
                 * continue. */
                char *error_string = ovsdb_error_to_string_free(error);
                VLOG_WARN("%s", error_string);
                free(error_string);
                error = NULL;
            }
            break;
        }

        struct raft_record r;
        error = raft_record_from_json(&r, json);
        if (!error) {
            error = raft_apply_record(raft, i, &r);
            raft_record_uninit(&r);
        }
        json_destroy(json);
        if (error) {
            return ovsdb_wrap_error(error, "error reading record %llu from "
                                    "%s log", i, raft->name);
        }
    }

    /* Set the most recent servers. */
    raft_get_servers_from_log(raft, VLL_DBG);

    /* Set the most recent election_timer. */
    raft_get_election_timer_from_log(raft);

    return NULL;
}

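/* Resets the election timeout.  Illustrative arithmetic with the defaults:
 * election_timer == ELECTION_BASE_MSEC == 1000, so the timeout fires between
 * 1000 and 2000 ms from now; the random spread keeps the servers from all
 * starting elections at the same moment. */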
static void
raft_reset_election_timer(struct raft *raft)
{
    unsigned int duration = (raft->election_timer
                             + random_range(ELECTION_RANGE_MSEC));
    raft->election_base = time_msec();
    if (failure_test == FT_DELAY_ELECTION) {
        /* Slow down this node so that it won't win the next election. */
        duration += raft->election_timer;
    }
    raft->election_timeout = raft->election_base + duration;
}

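/* Schedules the next heartbeat a third of the election timer from now, so a
 * healthy leader gets roughly three chances to ping each follower before a
 * follower's election timeout can expire. */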
static void
raft_reset_ping_timer(struct raft *raft)
{
    raft->ping_timeout = time_msec() + raft->election_timer / 3;
}

static void
raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
              const struct uuid *sid, bool incoming)
{
    struct raft_conn *conn = xzalloc(sizeof *conn);
    ovs_list_push_back(&raft->conns, &conn->list_node);
    conn->js = js;
    if (sid) {
        conn->sid = *sid;
    }
    conn->nickname = raft_address_to_nickname(jsonrpc_session_get_name(js),
                                              &conn->sid);
    conn->incoming = incoming;
    conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
    jsonrpc_session_set_probe_interval(js, 0);
    jsonrpc_session_set_backlog_threshold(js, raft->conn_backlog_max_n_msgs,
                                          raft->conn_backlog_max_n_bytes);
}

/* Starts the local server in an existing Raft cluster, using 'log', the local
 * copy of the cluster's log.  Takes ownership of 'log', whether successful or
 * not. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_open(struct ovsdb_log *log, struct raft **raftp)
{
    struct raft *raft = raft_alloc();
    raft->log = log;

    struct ovsdb_error *error = raft_read_header(raft);
    if (error) {
        goto error;
    }

    if (!raft->joining) {
        error = raft_read_log(raft);
        if (error) {
            goto error;
        }

        /* Find our own server. */
        if (!raft_find_server(raft, &raft->sid)) {
            error = ovsdb_error(NULL, "server does not belong to cluster");
            goto error;
        }

        /* If there's only one server, start an election right away so that
         * the cluster bootstraps quickly. */
        if (hmap_count(&raft->servers) == 1) {
            raft_start_election(raft, false);
        }
    } else {
        raft->join_timeout = time_msec() + 1000;
    }

    raft_reset_ping_timer(raft);
    raft_reset_election_timer(raft);

    *raftp = raft;
    hmap_insert(&all_rafts, &raft->hmap_node, hash_string(raft->name, 0));
    return NULL;

error:
    raft_close(raft);
    *raftp = NULL;
    return error;
}

/* Returns the name of 'raft', which in OVSDB is the database schema name. */
const char *
raft_get_name(const struct raft *raft)
{
    return raft->name;
}

/* Returns the cluster ID of 'raft'.  If 'raft' has not yet completed joining
 * its cluster, then 'cid' will be all-zeros (unless the administrator
 * specified a cluster ID when running "ovsdb-tool join-cluster").
 *
 * Each cluster has a unique cluster ID. */
const struct uuid *
raft_get_cid(const struct raft *raft)
{
    return &raft->cid;
}

/* Returns the server ID of 'raft'.  Each server has a unique server ID. */
const struct uuid *
raft_get_sid(const struct raft *raft)
{
    return &raft->sid;
}

/* Adds memory consumption info to 'usage' for later use by
 * memory_report(). */
void
raft_get_memory_usage(const struct raft *raft, struct simap *usage)
{
    struct raft_conn *conn;
    uint64_t backlog = 0;
    int cnt = 0;

    LIST_FOR_EACH (conn, list_node, &raft->conns) {
        backlog += jsonrpc_session_get_backlog(conn->js);
        cnt++;
    }
    simap_increase(usage, "raft-backlog-kB", backlog / 1000);
    simap_increase(usage, "raft-connections", cnt);
    simap_increase(usage, "raft-log", raft->log_end - raft->log_start);
}
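
/* Hypothetical caller sketch (for illustration only):
 *
 *     struct simap usage = SIMAP_INITIALIZER(&usage);
 *     raft_get_memory_usage(raft, &usage);
 *     ...pass 'usage' to memory_report() or log selected counters...
 *     simap_destroy(&usage);
 */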

/* Returns true if 'raft' has completed joining its cluster, has not left or
 * initiated leaving the cluster, does not have failed disk storage, and is
 * apparently connected to the leader in a healthy way (or is itself the
 * leader).
 *
 * If 'raft' is candidate:
 * a) if it is the first round of election, consider it as connected, hoping
 *    it will successfully elect a new leader soon.
 * b) if it is already retrying, consider it as disconnected (so that clients
 *    may decide to reconnect to other members). */
bool
raft_is_connected(const struct raft *raft)
{
    static bool last_state = false;
    bool ret = (!raft->candidate_retrying
                && !raft->joining
                && !raft->leaving
                && !raft->left
                && !raft->failed
                && raft->ever_had_leader);

    if (!ret) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        VLOG_DBG_RL(&rl, "raft_is_connected: false");
    } else if (!last_state) {
        VLOG_DBG("raft_is_connected: true");
    }
    last_state = ret;

    return ret;
}

/* Returns true if 'raft' is the cluster leader. */
bool
raft_is_leader(const struct raft *raft)
{
    return raft->role == RAFT_LEADER;
}

/* Returns true if 'raft' is in the process of joining its cluster. */
bool
raft_is_joining(const struct raft *raft)
{
    return raft->joining;
}

/* Only returns *connected* connections. */
static struct raft_conn *
raft_find_conn_by_sid(struct raft *raft, const struct uuid *sid)
{
    if (!uuid_is_zero(sid)) {
        struct raft_conn *conn;
        LIST_FOR_EACH (conn, list_node, &raft->conns) {
            if (uuid_equals(sid, &conn->sid)
                && jsonrpc_session_is_connected(conn->js)) {
                return conn;
            }
        }
    }
    return NULL;
}

static struct raft_conn *
raft_find_conn_by_address(struct raft *raft, const char *address)
{
    struct raft_conn *conn;
    LIST_FOR_EACH (conn, list_node, &raft->conns) {
        if (!strcmp(jsonrpc_session_get_name(conn->js), address)) {
            return conn;
        }
    }
    return NULL;
}

static void OVS_PRINTF_FORMAT(3, 4)
raft_record_note(struct raft *raft, const char *note,
                 const char *comment_format, ...)
{
    va_list args;
    va_start(args, comment_format);
    char *comment = xvasprintf(comment_format, args);
    va_end(args);

    struct raft_record r = {
        .type = RAFT_REC_NOTE,
        .comment = comment,
        .note = CONST_CAST(char *, note),
    };
    ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));

    free(comment);
}

/* If we're leader, try to transfer leadership to another server, logging
 * 'reason' as the human-readable reason (it should be a phrase suitable for
 * following "because"). */
void
raft_transfer_leadership(struct raft *raft, const char *reason)
{
    if (raft->role != RAFT_LEADER) {
        return;
    }

    struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (!uuid_equals(&raft->sid, &s->sid)
            && s->phase == RAFT_PHASE_STABLE) {
            struct raft_conn *conn = raft_find_conn_by_sid(raft, &s->sid);
            if (!conn) {
                continue;
            }

            union raft_rpc rpc = {
                .become_leader = {
                    .common = {
                        .comment = CONST_CAST(char *, reason),
                        .type = RAFT_RPC_BECOME_LEADER,
                        .sid = s->sid,
                    },
                    .term = raft->term,
                }
            };
            raft_send_to_conn(raft, &rpc, conn);

            raft_record_note(raft, "transfer leadership",
                             "transferring leadership to %s because %s",
                             s->nickname, reason);
            break;
        }
    }
}

/* Send a RemoveServerRequest to the rest of the servers in the cluster.
 *
 * If we know which server is the leader, we can just send the request to it.
 * However, we might not know which server is the leader, and we might never
 * find out if the remove request was actually previously committed by a
 * majority of the servers (because in that case the new leader will not send
 * AppendRequests or heartbeats to us).  Therefore, we instead send
 * RemoveRequests to every server.  This theoretically has the same problem,
 * if the current cluster leader was not previously a member of the cluster,
 * but it seems likely to be more robust in practice. */
static void
raft_send_remove_server_requests(struct raft *raft)
{
    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
    VLOG_INFO_RL(&rl, "sending remove request (joining=%s, leaving=%s)",
                 raft->joining ? "true" : "false",
                 raft->leaving ? "true" : "false");
    const struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (!uuid_equals(&s->sid, &raft->sid)) {
            union raft_rpc rpc = (union raft_rpc) {
                .remove_server_request = {
                    .common = {
                        .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
                        .sid = s->sid,
                    },
                    .sid = raft->sid,
                },
            };
            raft_send(raft, &rpc);
        }
    }

    raft->leave_timeout = time_msec() + raft->election_timer;
}

/* Attempts to start 'raft' leaving its cluster.  The caller can check
 * progress using raft_is_leaving() and raft_left(). */
void
raft_leave(struct raft *raft)
{
    if (raft->joining || raft->failed || raft->leaving || raft->left) {
        return;
    }
    VLOG_INFO(SID_FMT": starting to leave cluster "CID_FMT,
              SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
    raft->leaving = true;
    raft_transfer_leadership(raft, "this server is leaving the cluster");
    raft_become_follower(raft);
    raft_send_remove_server_requests(raft);
    raft->leave_timeout = time_msec() + raft->election_timer;
}

/* Returns true if 'raft' is currently attempting to leave its cluster. */
bool
raft_is_leaving(const struct raft *raft)
{
    return raft->leaving;
}

/* Returns true if 'raft' successfully left its cluster. */
bool
raft_left(const struct raft *raft)
{
    return raft->left;
}

/* Returns true if 'raft' has experienced a disk I/O failure.  When this
 * returns true, only closing and reopening 'raft' allows for recovery. */
bool
raft_failed(const struct raft *raft)
{
    return raft->failed;
}

/* Forces 'raft' to attempt to take leadership of the cluster by deposing the
 * current leader. */
void
raft_take_leadership(struct raft *raft)
{
    if (raft->role != RAFT_LEADER) {
        raft_start_election(raft, true);
    }
}

/* Closes everything owned by 'raft' that might be visible outside the
 * process: network connections, commands, etc.  This is part of closing
 * 'raft'; it is also used if 'raft' has failed in an unrecoverable way. */
static void
raft_close__(struct raft *raft)
{
    if (!hmap_node_is_null(&raft->hmap_node)) {
        hmap_remove(&all_rafts, &raft->hmap_node);
        hmap_node_nullify(&raft->hmap_node);
    }

    raft_complete_all_commands(raft, RAFT_CMD_SHUTDOWN);

    struct raft_server *rs = raft->remove_server;
    if (rs) {
        raft_send_remove_server_reply__(raft, &rs->sid, &rs->requester_sid,
                                        rs->requester_conn, false,
                                        RAFT_SERVER_SHUTDOWN);
        raft_server_destroy(raft->remove_server);
        raft->remove_server = NULL;
    }

    struct raft_conn *conn, *next;
    LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
        raft_conn_close(conn);
    }
}

/* Closes and frees 'raft'.
 *
 * A server's cluster membership is independent of whether the server is
 * actually running.  When a server that is a member of a cluster closes, the
 * cluster treats this as a server failure. */
void
raft_close(struct raft *raft)
{
    if (!raft) {
        return;
    }

    raft_transfer_leadership(raft, "this server is shutting down");

    raft_close__(raft);

    ovsdb_log_close(raft->log);

    raft_servers_destroy(&raft->servers);

    for (uint64_t index = raft->log_start; index < raft->log_end; index++) {
        struct raft_entry *e = &raft->entries[index - raft->log_start];
        raft_entry_uninit(e);
    }
    free(raft->entries);

    raft_entry_uninit(&raft->snap);

    raft_waiters_destroy(raft);

    raft_servers_destroy(&raft->add_servers);

    hmap_destroy(&raft->commands);

    pstream_close(raft->listener);

    sset_destroy(&raft->remote_addresses);
    free(raft->local_address);
    free(raft->local_nickname);
    free(raft->name);

    free(raft);
}

static bool
raft_conn_receive(struct raft *raft, struct raft_conn *conn,
                  union raft_rpc *rpc)
{
    struct jsonrpc_msg *msg = jsonrpc_session_recv(conn->js);
    if (!msg) {
        return false;
    }

    struct ovsdb_error *error = raft_rpc_from_jsonrpc(&raft->cid, &raft->sid,
                                                      msg, rpc);
    jsonrpc_msg_destroy(msg);
    if (error) {
        char *s = ovsdb_error_to_string_free(error);
        VLOG_INFO("%s: %s", jsonrpc_session_get_name(conn->js), s);
        free(s);
        return false;
    }

    if (uuid_is_zero(&conn->sid)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
        conn->sid = rpc->common.sid;
        VLOG_INFO_RL(&rl, "%s: learned server ID "SID_FMT,
                     jsonrpc_session_get_name(conn->js), SID_ARGS(&conn->sid));
    } else if (!uuid_equals(&conn->sid, &rpc->common.sid)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
        VLOG_WARN_RL(&rl, "%s: ignoring message with unexpected server ID "
                     SID_FMT" (expected "SID_FMT")",
                     jsonrpc_session_get_name(conn->js),
                     SID_ARGS(&rpc->common.sid), SID_ARGS(&conn->sid));
        raft_rpc_uninit(rpc);
        return false;
    }

    const char *address = (rpc->type == RAFT_RPC_HELLO_REQUEST
                           ? rpc->hello_request.address
                           : rpc->type == RAFT_RPC_ADD_SERVER_REQUEST
                           ? rpc->add_server_request.address
                           : NULL);
    if (address) {
        char *new_nickname = raft_address_to_nickname(address, &conn->sid);
        if (strcmp(conn->nickname, new_nickname)) {
            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
            VLOG_INFO_RL(&rl, "%s: learned remote address %s",
                         jsonrpc_session_get_name(conn->js), address);

            free(conn->nickname);
            conn->nickname = new_nickname;
        } else {
            free(new_nickname);
        }
    }

    return true;
}

static const char *
raft_get_nickname(const struct raft *raft, const struct uuid *sid,
                  char buf[SID_LEN + 1], size_t bufsize)
{
    if (uuid_equals(sid, &raft->sid)) {
        return raft->local_nickname;
    }

    const char *s = raft_servers_get_nickname__(&raft->servers, sid);
    if (s) {
        return s;
    }

    return raft_servers_get_nickname(&raft->add_servers, sid, buf, bufsize);
}

static void
log_rpc(const union raft_rpc *rpc, const char *direction,
        const struct raft_conn *conn, int line_number)
{
    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(600, 600);
    if (!raft_rpc_is_heartbeat(rpc) && !VLOG_DROP_DBG(&rl)) {
        struct ds s = DS_EMPTY_INITIALIZER;
        if (line_number) {
            ds_put_format(&s, "raft.c:%d ", line_number);
        }
        ds_put_format(&s, "%s%s ", direction, conn->nickname);
        raft_rpc_format(rpc, &s);
        VLOG_DBG("%s", ds_cstr(&s));
        ds_destroy(&s);
    }
}

static void
raft_send_add_server_request(struct raft *raft, struct raft_conn *conn)
{
    union raft_rpc rq = {
        .add_server_request = {
            .common = {
                .type = RAFT_RPC_ADD_SERVER_REQUEST,
                .sid = UUID_ZERO,
                .comment = NULL,
            },
            .address = raft->local_address,
        },
    };
    raft_send_to_conn(raft, &rq, conn);
}

static void
raft_conn_run(struct raft *raft, struct raft_conn *conn)
{
    jsonrpc_session_run(conn->js);

    unsigned int new_seqno = jsonrpc_session_get_seqno(conn->js);
    bool reconnected = new_seqno != conn->js_seqno;
    bool just_connected = (reconnected
                           && jsonrpc_session_is_connected(conn->js));

    if (reconnected) {
        /* Clear 'install_snapshot_request_in_progress' since the request
         * might not have reached the destination, or the server might have
         * been restarted. */
        struct raft_server *server = raft_find_server(raft, &conn->sid);
        if (server) {
            server->install_snapshot_request_in_progress = false;
        }
    }

    conn->js_seqno = new_seqno;
    if (just_connected) {
        if (raft->joining) {
            raft_send_add_server_request(raft, conn);
        } else if (raft->leaving) {
            union raft_rpc rq = {
                .remove_server_request = {
                    .common = {
                        .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
                        .sid = conn->sid,
                    },
                    .sid = raft->sid,
                },
            };
            raft_send_to_conn(raft, &rq, conn);
        } else {
            union raft_rpc rq = (union raft_rpc) {
                .hello_request = {
                    .common = {
                        .type = RAFT_RPC_HELLO_REQUEST,
                        .sid = conn->sid,
                    },
                    .address = raft->local_address,
                },
            };
            raft_send_to_conn(raft, &rq, conn);
        }
    }

    for (size_t i = 0; i < 50; i++) {
        union raft_rpc rpc;
        if (!raft_conn_receive(raft, conn, &rpc)) {
            break;
        }

        log_rpc(&rpc, "<--", conn, 0);
        raft_handle_rpc(raft, &rpc);
        raft_rpc_uninit(&rpc);
    }
}

static void
raft_waiter_complete_rpc(struct raft *raft, const union raft_rpc *rpc)
{
    uint64_t term = raft_rpc_get_term(rpc);
    if (term && term < raft->term) {
        /* Drop the message because it's for an expired term. */
        return;
    }

    if (!raft_is_rpc_synced(raft, rpc)) {
        /* This is a bug.  A reply message is deferred because some state in
         * the message, such as a term or index, has not been committed to
         * disk, and it should only be completed when that commit is done.
         * But this message is being completed before the commit is finished.
         * Complain, and hope that someone reports the bug. */
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        if (VLOG_DROP_ERR(&rl)) {
            return;
        }

        struct ds s = DS_EMPTY_INITIALIZER;

        if (term > raft->synced_term) {
            ds_put_format(&s, " because message term %"PRIu64" is "
                          "past synced term %"PRIu64,
                          term, raft->synced_term);
        }

        uint64_t index = raft_rpc_get_min_sync_index(rpc);
        if (index > raft->log_synced) {
            ds_put_format(&s, " %s message index %"PRIu64" is past last "
                          "synced index %"PRIu64,
                          s.length ? "and" : "because",
                          index, raft->log_synced);
        }

        const struct uuid *vote = raft_rpc_get_vote(rpc);
        if (vote && !uuid_equals(vote, &raft->synced_vote)) {
            char buf1[SID_LEN + 1];
            char buf2[SID_LEN + 1];
            ds_put_format(&s, " %s vote %s differs from synced vote %s",
                          s.length ? "and" : "because",
                          raft_get_nickname(raft, vote, buf1, sizeof buf1),
                          raft_get_nickname(raft, &raft->synced_vote,
                                            buf2, sizeof buf2));
        }

        char buf[SID_LEN + 1];
        ds_put_format(&s, ": %s ",
                      raft_get_nickname(raft, &rpc->common.sid,
                                        buf, sizeof buf));
        raft_rpc_format(rpc, &s);
        VLOG_ERR("internal error: deferred %s message completed "
                 "but not ready to send%s",
                 raft_rpc_type_to_string(rpc->type), ds_cstr(&s));
        ds_destroy(&s);

        return;
    }

    struct raft_conn *dst = raft_find_conn_by_sid(raft, &rpc->common.sid);
    if (dst) {
        raft_send_to_conn(raft, rpc, dst);
    }
}

static void
raft_waiter_complete(struct raft *raft, struct raft_waiter *w)
{
    switch (w->type) {
    case RAFT_W_ENTRY:
        if (raft->role == RAFT_LEADER) {
            raft_update_our_match_index(raft, w->entry.index);
        }
        raft->log_synced = w->entry.index;
        break;

    case RAFT_W_TERM:
        raft->synced_term = w->term.term;
        raft->synced_vote = w->term.vote;
        break;

    case RAFT_W_RPC:
        raft_waiter_complete_rpc(raft, w->rpc);
        break;
    }
}

static void
raft_waiter_destroy(struct raft_waiter *w)
{
    if (!w) {
        return;
    }

    ovs_list_remove(&w->list_node);

    switch (w->type) {
    case RAFT_W_ENTRY:
    case RAFT_W_TERM:
        break;

    case RAFT_W_RPC:
        raft_rpc_uninit(w->rpc);
        free(w->rpc);
        break;
    }
    free(w);
}

static void
raft_waiters_run(struct raft *raft)
{
    if (ovs_list_is_empty(&raft->waiters)) {
        return;
    }

    uint64_t cur = ovsdb_log_commit_progress(raft->log);
    struct raft_waiter *w, *next;
    LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
        if (cur < w->commit_ticket) {
            break;
        }
        raft_waiter_complete(raft, w);
        raft_waiter_destroy(w);
    }
}

static void
raft_waiters_wait(struct raft *raft)
{
    struct raft_waiter *w;
    LIST_FOR_EACH (w, list_node, &raft->waiters) {
        ovsdb_log_commit_wait(raft->log, w->commit_ticket);
        break;
    }
}

static void
raft_waiters_destroy(struct raft *raft)
{
    struct raft_waiter *w, *next;
    LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
        raft_waiter_destroy(w);
    }
}

static bool OVS_WARN_UNUSED_RESULT
raft_set_term(struct raft *raft, uint64_t term, const struct uuid *vote)
{
    struct ovsdb_error *error = raft_write_state(raft->log, term, vote);
    if (!raft_handle_write_error(raft, error)) {
        return false;
    }

    struct raft_waiter *w = raft_waiter_create(raft, RAFT_W_TERM, true);
    raft->term = w->term.term = term;
    raft->vote = w->term.vote = vote ? *vote : UUID_ZERO;
    return true;
}

static void
raft_accept_vote(struct raft *raft, struct raft_server *s,
                 const struct uuid *vote)
{
    if (uuid_equals(&s->vote, vote)) {
        return;
    }
    if (!uuid_is_zero(&s->vote)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
        char buf1[SID_LEN + 1];
        char buf2[SID_LEN + 1];
        VLOG_WARN_RL(&rl, "server %s changed its vote from %s to %s",
                     s->nickname,
                     raft_get_nickname(raft, &s->vote, buf1, sizeof buf1),
                     raft_get_nickname(raft, vote, buf2, sizeof buf2));
    }
    s->vote = *vote;
    if (uuid_equals(vote, &raft->sid)
        && ++raft->n_votes > hmap_count(&raft->servers) / 2) {
        raft_become_leader(raft);
    }
}
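
/* Majority arithmetic (illustrative): in a 3-server cluster,
 * hmap_count(&raft->servers) / 2 == 1, so a candidate wins once n_votes
 * reaches 2, counting its vote for itself; in a 5-server cluster it needs
 * 3 votes. */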
1689
static void
raft_start_election(struct raft *raft, bool leadership_transfer)
{
    if (raft->leaving) {
        return;
    }

    struct raft_server *me = raft_find_server(raft, &raft->sid);
    if (!me) {
        return;
    }

    if (!raft_set_term(raft, raft->term + 1, &raft->sid)) {
        return;
    }

    ovs_assert(raft->role != RAFT_LEADER);

    raft->leader_sid = UUID_ZERO;
    raft->role = RAFT_CANDIDATE;
    /* If there was no leader elected since last election, we know we are
     * retrying now. */
    raft->candidate_retrying = !raft->had_leader;
    raft->had_leader = false;

    raft->n_votes = 0;

    raft->election_start = time_msec();
    raft->election_won = 0;
    raft->leadership_transfer = leadership_transfer;

    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
    if (!VLOG_DROP_INFO(&rl)) {
        long long int now = time_msec();
        if (now >= raft->election_timeout) {
            VLOG_INFO("term %"PRIu64": %lld ms timeout expired, "
                      "starting election",
                      raft->term, now - raft->election_base);
        } else {
            VLOG_INFO("term %"PRIu64": starting election", raft->term);
        }
    }
    raft_reset_election_timer(raft);

    struct raft_server *peer;
    HMAP_FOR_EACH (peer, hmap_node, &raft->servers) {
        peer->vote = UUID_ZERO;
        if (uuid_equals(&raft->sid, &peer->sid)) {
            continue;
        }

        union raft_rpc rq = {
            .vote_request = {
                .common = {
                    .type = RAFT_RPC_VOTE_REQUEST,
                    .sid = peer->sid,
                },
                .term = raft->term,
                .last_log_index = raft->log_end - 1,
                .last_log_term = (
                    raft->log_end > raft->log_start
                    ? raft->entries[raft->log_end - raft->log_start - 1].term
                    : raft->snap.term),
                .leadership_transfer = leadership_transfer,
            },
        };
        if (failure_test != FT_DONT_SEND_VOTE_REQUEST) {
            raft_send(raft, &rq);
        }
    }

    /* Vote for ourselves. */
    raft_accept_vote(raft, me, &raft->sid);
}

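/* Opens an outgoing connection to 'address', with expected server ID 'sid'
 * (if known), unless 'address' is our own address or a connection to it
 * already exists. */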
static void
raft_open_conn(struct raft *raft, const char *address, const struct uuid *sid)
{
    if (strcmp(address, raft->local_address)
        && !raft_find_conn_by_address(raft, address)) {
        raft_add_conn(raft, jsonrpc_session_open(address, true), sid, false);
    }
}

static void
raft_conn_close(struct raft_conn *conn)
{
    jsonrpc_session_close(conn->js);
    ovs_list_remove(&conn->list_node);
    free(conn->nickname);
    free(conn);
}

/* Returns true if 'conn' should stay open, false if it should be closed. */
static bool
raft_conn_should_stay_open(struct raft *raft, struct raft_conn *conn)
{
    /* Close the connection if it's actually dead.  If necessary, we'll
     * initiate a new session later. */
    if (!jsonrpc_session_is_alive(conn->js)) {
        return false;
    }

    /* Keep incoming sessions.  We trust the originator to decide when to
     * drop them. */
    if (conn->incoming) {
        return true;
    }

    /* If we are joining the cluster, keep sessions to the remote addresses
     * that are supposed to be part of the cluster we're joining. */
    if (raft->joining && sset_contains(&raft->remote_addresses,
                                       jsonrpc_session_get_name(conn->js))) {
        return true;
    }

    /* We have joined the cluster.  If we did that "recently", then there is a
     * chance that we do not have the most recent server configuration log
     * entry.  If so, it's a waste to disconnect from the servers that were in
     * remote_addresses and that will probably appear in the configuration,
     * just to reconnect to them a moment later when we do get the
     * configuration update.  If we are not ourselves in the configuration,
     * then we know that there must be a new configuration coming up, so in
     * that case keep the connection. */
    if (!raft_find_server(raft, &raft->sid)) {
        return true;
    }

    /* Keep the connection only if the server is part of the configuration. */
    return raft_find_server(raft, &conn->sid);
}

/* Allows 'raft' to maintain the distributed log.  Call this function as part
 * of the process's main loop. */
void
raft_run(struct raft *raft)
{
    if (raft->left || raft->failed) {
        return;
    }

    raft_waiters_run(raft);

    if (!raft->listener && time_msec() >= raft->listen_backoff) {
        char *paddr = raft_make_address_passive(raft->local_address);
        int error = pstream_open(paddr, &raft->listener, DSCP_DEFAULT);
        if (error) {
            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
            VLOG_WARN_RL(&rl, "%s: listen failed (%s)",
                         paddr, ovs_strerror(error));
            raft->listen_backoff = time_msec() + 1000;
        }
        free(paddr);
    }

    if (raft->listener) {
        struct stream *stream;
        int error = pstream_accept(raft->listener, &stream);
        if (!error) {
            raft_add_conn(raft, jsonrpc_session_open_unreliably(
                              jsonrpc_open(stream), DSCP_DEFAULT), NULL,
                          true);
        } else if (error != EAGAIN) {
            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
            VLOG_WARN_RL(&rl, "%s: accept failed: %s",
                         pstream_get_name(raft->listener),
                         ovs_strerror(error));
        }
    }

    /* Run RPCs for all open sessions. */
    struct raft_conn *conn;
    LIST_FOR_EACH (conn, list_node, &raft->conns) {
        raft_conn_run(raft, conn);
    }

    /* Close unneeded sessions. */
    struct raft_conn *next;
    LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
        if (!raft_conn_should_stay_open(raft, conn)) {
            raft->n_disconnections++;
            raft_conn_close(conn);
        }
    }

    /* Open needed sessions. */
    struct raft_server *server;
    HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
        raft_open_conn(raft, server->address, &server->sid);
    }
    if (raft->joining) {
        const char *address;
        SSET_FOR_EACH (address, &raft->remote_addresses) {
            raft_open_conn(raft, address, NULL);
        }
    }

    if (!raft->joining && time_msec() >= raft->election_timeout) {
        if (raft->role == RAFT_LEADER) {
            /* Check whether a majority of followers replied; if so, reset
             * election_timeout and clear each server's 'replied' flag.
             * Otherwise, step down to follower and start a new election.
             *
             * Raft paper section 6.2: Leaders: A server might be in the leader
             * state, but if it isn’t the current leader, it could be
             * needlessly delaying client requests. For example, suppose a
             * leader is partitioned from the rest of the cluster, but it can
             * still communicate with a particular client. Without additional
             * mechanism, it could delay a request from that client forever,
             * being unable to replicate a log entry to any other servers.
             * Meanwhile, there might be another leader of a newer term that is
             * able to communicate with a majority of the cluster and would be
             * able to commit the client’s request. Thus, a leader in Raft
             * steps down if an election timeout elapses without a successful
             * round of heartbeats to a majority of its cluster; this allows
             * clients to retry their requests with another server. */
            int count = 0;
            HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
                if (server->replied) {
                    count++;
                }
            }
            if (count >= hmap_count(&raft->servers) / 2) {
                HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
                    server->replied = false;
                }
                raft_reset_election_timer(raft);
            } else {
                raft_become_follower(raft);
                raft_start_election(raft, false);
            }
        } else {
            raft_start_election(raft, false);
        }

    }

    if (raft->leaving && time_msec() >= raft->leave_timeout) {
        raft_send_remove_server_requests(raft);
    }

    if (raft->joining && time_msec() >= raft->join_timeout) {
        raft->join_timeout = time_msec() + 1000;
        LIST_FOR_EACH (conn, list_node, &raft->conns) {
            raft_send_add_server_request(raft, conn);
        }
    }

    long long int now = time_msec();
    if (now >= raft->ping_timeout) {
        if (raft->role == RAFT_LEADER) {
            raft_send_heartbeats(raft);
        }
        /* Check whether any commands have timed out.  The timeout is set to
         * twice the election base time so that commands can complete
         * properly during a leader election.  E.g., if a leader crashed and
         * the current node, with pending commands, becomes the new leader,
         * the pending commands can still complete if the crashed leader
         * replicated the transactions to a majority of followers before it
         * crashed. */
        struct raft_command *cmd, *next_cmd;
        HMAP_FOR_EACH_SAFE (cmd, next_cmd, hmap_node, &raft->commands) {
            if (cmd->timestamp
                && now - cmd->timestamp > raft->election_timer * 2) {
                raft_command_complete(raft, cmd, RAFT_CMD_TIMEOUT);
            }
        }
        raft_reset_ping_timer(raft);
    }

    /* Do this only at the end; if we did it as soon as we set raft->left or
     * raft->failed in handling the RemoveServerReply, then it could easily
     * cause references to freed memory in RPC sessions, etc. */
    if (raft->left || raft->failed) {
        raft_close__(raft);
    }
}

static void
raft_wait_session(struct jsonrpc_session *js)
{
    if (js) {
        jsonrpc_session_wait(js);
        jsonrpc_session_recv_wait(js);
    }
}

/* Causes the next call to poll_block() to wake up when 'raft' needs to do
 * something. */
void
raft_wait(struct raft *raft)
{
    if (raft->left || raft->failed) {
        return;
    }

    raft_waiters_wait(raft);

    if (raft->listener) {
        pstream_wait(raft->listener);
    } else {
        poll_timer_wait_until(raft->listen_backoff);
    }

    struct raft_conn *conn;
    LIST_FOR_EACH (conn, list_node, &raft->conns) {
        raft_wait_session(conn->js);
    }

    if (!raft->joining) {
        poll_timer_wait_until(raft->election_timeout);
    } else {
        poll_timer_wait_until(raft->join_timeout);
    }
    if (raft->leaving) {
        poll_timer_wait_until(raft->leave_timeout);
    }
    if (raft->role == RAFT_LEADER || !hmap_is_empty(&raft->commands)) {
        poll_timer_wait_until(raft->ping_timeout);
    }
}

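/* Appends a new waiter of 'type' to raft->waiters.  If 'start_commit' is
 * true, also starts committing the log and remembers the resulting ticket so
 * that the waiter completes only after that commit reaches disk. */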
static struct raft_waiter *
raft_waiter_create(struct raft *raft, enum raft_waiter_type type,
                   bool start_commit)
{
    struct raft_waiter *w = xzalloc(sizeof *w);
    ovs_list_push_back(&raft->waiters, &w->list_node);
    w->commit_ticket = start_commit ? ovsdb_log_commit_start(raft->log) : 0;
    w->type = type;
    return w;
}

/* Returns a human-readable representation of 'status' (or NULL if 'status' is
 * invalid). */
const char *
raft_command_status_to_string(enum raft_command_status status)
{
    switch (status) {
    case RAFT_CMD_INCOMPLETE:
        return "operation still in progress";
    case RAFT_CMD_SUCCESS:
        return "success";
    case RAFT_CMD_NOT_LEADER:
        return "not leader";
    case RAFT_CMD_BAD_PREREQ:
        return "prerequisite check failed";
    case RAFT_CMD_LOST_LEADERSHIP:
        return "lost leadership";
    case RAFT_CMD_SHUTDOWN:
        return "server shutdown";
    case RAFT_CMD_IO_ERROR:
        return "I/O error";
    case RAFT_CMD_TIMEOUT:
        return "timeout";
    default:
        return NULL;
    }
}

/* Converts human-readable status in 's' into status code in '*statusp'.
 * Returns true if successful, false if 's' is unknown. */
bool
raft_command_status_from_string(const char *s,
                                enum raft_command_status *statusp)
{
    for (enum raft_command_status status = 0; ; status++) {
        const char *s2 = raft_command_status_to_string(status);
        if (!s2) {
            *statusp = 0;
            return false;
        } else if (!strcmp(s, s2)) {
            *statusp = status;
            return true;
        }
    }
}

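/* Returns the entry ID of the most recent log entry at or before 'index'
 * that carries data, falling back to the snapshot's entry ID if no such
 * entry remains in the log. */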
static const struct uuid *
raft_get_eid(const struct raft *raft, uint64_t index)
{
    for (; index >= raft->log_start; index--) {
        const struct raft_entry *e = raft_get_entry(raft, index);
        if (e->data) {
            return &e->eid;
        }
    }
    return &raft->snap.eid;
}

const struct uuid *
raft_current_eid(const struct raft *raft)
{
    return raft_get_eid(raft, raft->log_end - 1);
}

static struct raft_command *
raft_command_create_completed(enum raft_command_status status)
{
    ovs_assert(status != RAFT_CMD_INCOMPLETE);

    struct raft_command *cmd = xzalloc(sizeof *cmd);
    cmd->n_refs = 1;
    cmd->status = status;
    return cmd;
}

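/* Creates a command for the log entry at 'index' ('index' is 0 when the
 * index is not yet known, e.g. for a command relayed to the leader) and
 * inserts it into raft->commands so that it can be completed later, when the
 * entry commits. */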
static struct raft_command *
raft_command_create_incomplete(struct raft *raft, uint64_t index)
{
    struct raft_command *cmd = xzalloc(sizeof *cmd);
    cmd->n_refs = 2; /* One for client, one for raft->commands. */
    cmd->index = index;
    cmd->status = RAFT_CMD_INCOMPLETE;
    hmap_insert(&raft->commands, &cmd->hmap_node, cmd->index);
    return cmd;
}

static struct raft_command * OVS_WARN_UNUSED_RESULT
raft_command_initiate(struct raft *raft,
                      const struct json *data, const struct json *servers,
                      uint64_t election_timer, const struct uuid *eid)
{
    /* Write to local log. */
    uint64_t index = raft->log_end;
    if (!raft_handle_write_error(
            raft, raft_write_entry(
                raft, raft->term, json_nullable_clone(data), eid,
                json_nullable_clone(servers),
                election_timer))) {
        return raft_command_create_completed(RAFT_CMD_IO_ERROR);
    }

    struct raft_command *cmd = raft_command_create_incomplete(raft, index);
    ovs_assert(eid);
    cmd->eid = *eid;
    cmd->timestamp = time_msec();

    raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index = cmd->index;

    if (failure_test == FT_CRASH_BEFORE_SEND_APPEND_REQ) {
        ovs_fatal(0, "Raft test: crash before sending append_request.");
    }
    /* Write to remote logs. */
    struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (!uuid_equals(&s->sid, &raft->sid) && s->next_index == index) {
            raft_send_append_request(raft, s, 1, "execute command");
            s->next_index++;
        }
    }
    if (failure_test == FT_CRASH_AFTER_SEND_APPEND_REQ) {
        ovs_fatal(0, "Raft test: crash after sending append_request.");
    }
    raft_reset_ping_timer(raft);

    return cmd;
}

static void
log_all_commands(struct raft *raft)
{
    struct raft_command *cmd, *next;
    HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
        VLOG_DBG("raft command eid: "UUID_FMT, UUID_ARGS(&cmd->eid));
    }
}

static struct raft_command * OVS_WARN_UNUSED_RESULT
raft_command_execute__(struct raft *raft, const struct json *data,
                       const struct json *servers, uint64_t election_timer,
                       const struct uuid *prereq, struct uuid *result)
{
    if (raft->joining || raft->leaving || raft->left || raft->failed) {
        return raft_command_create_completed(RAFT_CMD_SHUTDOWN);
    }

    if (raft->role != RAFT_LEADER) {
        /* Consider proxying the command to the leader.  We can only do that
         * if we know the leader and the command does not change the set of
         * servers.  We do not proxy commands without prerequisites, even
         * though we could, because in an OVSDB context a log entry doesn't
         * make sense without context. */
        if (servers || election_timer || !data
            || raft->role != RAFT_FOLLOWER || uuid_is_zero(&raft->leader_sid)
            || !prereq) {
            return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
        }
    }

    struct uuid eid = data ? uuid_random() : UUID_ZERO;
    if (result) {
        *result = eid;
    }

    if (raft->role != RAFT_LEADER) {
        const union raft_rpc rpc = {
            .execute_command_request = {
                .common = {
                    .type = RAFT_RPC_EXECUTE_COMMAND_REQUEST,
                    .sid = raft->leader_sid,
                },
                .data = CONST_CAST(struct json *, data),
                .prereq = *prereq,
                .result = eid,
            }
        };
        if (failure_test == FT_CRASH_BEFORE_SEND_EXEC_REQ) {
            ovs_fatal(0, "Raft test: crash before sending "
                      "execute_command_request");
        }
        if (!raft_send(raft, &rpc)) {
            /* Couldn't send command, so it definitely failed. */
            return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
        }
        if (failure_test == FT_CRASH_AFTER_SEND_EXEC_REQ) {
            ovs_fatal(0, "Raft test: crash after sending "
                      "execute_command_request");
        }

        struct raft_command *cmd = raft_command_create_incomplete(raft, 0);
        cmd->timestamp = time_msec();
        cmd->eid = eid;
        log_all_commands(raft);
        return cmd;
    }

    const struct uuid *current_eid = raft_current_eid(raft);
    if (prereq && !uuid_equals(prereq, current_eid)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
                     "prerequisite "UUID_FMT,
                     UUID_ARGS(current_eid), UUID_ARGS(prereq));
        return raft_command_create_completed(RAFT_CMD_BAD_PREREQ);
    }

    return raft_command_initiate(raft, data, servers, election_timer, &eid);
}

/* Initiates appending a log entry to 'raft'.  The log entry consists of
 * 'data' and, if 'prereq' is nonnull, it is only added to the log if the
 * previous entry in the log has entry ID 'prereq'.  If 'result' is nonnull,
 * it is populated with the entry ID for the new log entry.
 *
 * Returns a "struct raft_command" that may be used to track progress adding
 * the log entry.  The caller must eventually free the returned structure,
 * with raft_command_unref(). */
struct raft_command * OVS_WARN_UNUSED_RESULT
raft_command_execute(struct raft *raft, const struct json *data,
                     const struct uuid *prereq, struct uuid *result)
{
    return raft_command_execute__(raft, data, NULL, 0, prereq, result);
}
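
/* A minimal usage sketch for the command API above (hypothetical caller:
 * 'raft', 'data', and 'prereq' are assumed to exist already, and error
 * handling is elided):
 *
 *     struct uuid result;
 *     struct raft_command *cmd = raft_command_execute(raft, data,
 *                                                     &prereq, &result);
 *     while (raft_command_get_status(cmd) == RAFT_CMD_INCOMPLETE) {
 *         raft_run(raft);
 *         raft_wait(raft);
 *         raft_command_wait(cmd);
 *         poll_block();
 *     }
 *     raft_command_unref(cmd);
 */
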
2242
2243/* Returns the status of 'cmd'. */
2244enum raft_command_status
2245raft_command_get_status(const struct raft_command *cmd)
2246{
2247 ovs_assert(cmd->n_refs > 0);
2248 return cmd->status;
2249}
2250
2251/* Returns the index of the log entry at which 'cmd' was committed.
2252 *
2253 * This function works only with successful commands. */
2254uint64_t
2255raft_command_get_commit_index(const struct raft_command *cmd)
2256{
2257 ovs_assert(cmd->n_refs > 0);
2258 ovs_assert(cmd->status == RAFT_CMD_SUCCESS);
2259 return cmd->index;
2260}
2261
2262/* Frees 'cmd'. */
2263void
2264raft_command_unref(struct raft_command *cmd)
2265{
2266 if (cmd) {
2267 ovs_assert(cmd->n_refs > 0);
2268 if (!--cmd->n_refs) {
2269 free(cmd);
2270 }
2271 }
2272}
2273
2274/* Causes poll_block() to wake up when 'cmd' has status to report. */
2275void
2276raft_command_wait(const struct raft_command *cmd)
2277{
2278 if (cmd->status != RAFT_CMD_INCOMPLETE) {
2279 poll_immediate_wake();
2280 }
2281}
2282
2283static void
2284raft_command_complete(struct raft *raft,
2285 struct raft_command *cmd,
2286 enum raft_command_status status)
2287{
5a9b53a5
HZ
2288 VLOG_DBG("raft_command_complete eid "UUID_FMT" status: %s",
2289 UUID_ARGS(&cmd->eid), raft_command_status_to_string(status));
1b1d2e6d
BP
2290 if (!uuid_is_zero(&cmd->sid)) {
2291 uint64_t commit_index = status == RAFT_CMD_SUCCESS ? cmd->index : 0;
2292 raft_send_execute_command_reply(raft, &cmd->sid, &cmd->eid, status,
2293 commit_index);
2294 }
2295
2296 ovs_assert(cmd->status == RAFT_CMD_INCOMPLETE);
2297 ovs_assert(cmd->n_refs > 0);
2298 hmap_remove(&raft->commands, &cmd->hmap_node);
2299 cmd->status = status;
2300 raft_command_unref(cmd);
2301}
2302
2303static void
2304raft_complete_all_commands(struct raft *raft, enum raft_command_status status)
2305{
2306 struct raft_command *cmd, *next;
2307 HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
2308 raft_command_complete(raft, cmd, status);
2309 }
2310}
2311
1b1d2e6d
BP
2312static struct raft_command *
2313raft_find_command_by_eid(struct raft *raft, const struct uuid *eid)
2314{
2315 struct raft_command *cmd;
2316
2317 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2318 if (uuid_equals(&cmd->eid, eid)) {
2319 return cmd;
2320 }
2321 }
2322 return NULL;
2323}
2324\f
2325#define RAFT_RPC(ENUM, NAME) \
2326 static void raft_handle_##NAME(struct raft *, const struct raft_##NAME *);
2327RAFT_RPC_TYPES
2328#undef RAFT_RPC
2329
2330static void
2331raft_handle_hello_request(struct raft *raft OVS_UNUSED,
2332 const struct raft_hello_request *hello OVS_UNUSED)
2333{
2334}
2335
2336/* 'sid' is the server being added. */
2337static void
2338raft_send_add_server_reply__(struct raft *raft, const struct uuid *sid,
2339 const char *address,
2340 bool success, const char *comment)
2341{
2342 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2343 if (!VLOG_DROP_INFO(&rl)) {
2344 struct ds s = DS_EMPTY_INITIALIZER;
2345 char buf[SID_LEN + 1];
2346 ds_put_format(&s, "adding %s ("SID_FMT" at %s) "
2347 "to cluster "CID_FMT" %s",
2348 raft_get_nickname(raft, sid, buf, sizeof buf),
2349 SID_ARGS(sid), address, CID_ARGS(&raft->cid),
2350 success ? "succeeded" : "failed");
2351 if (comment) {
2352 ds_put_format(&s, " (%s)", comment);
2353 }
2354 VLOG_INFO("%s", ds_cstr(&s));
2355 ds_destroy(&s);
2356 }
2357
2358 union raft_rpc rpy = {
2359 .add_server_reply = {
2360 .common = {
2361 .type = RAFT_RPC_ADD_SERVER_REPLY,
2362 .sid = *sid,
2363 .comment = CONST_CAST(char *, comment),
2364 },
2365 .success = success,
2366 }
2367 };
2368
2369 struct sset *remote_addresses = &rpy.add_server_reply.remote_addresses;
2370 sset_init(remote_addresses);
2371 if (!raft->joining) {
2372 struct raft_server *s;
2373 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2374 if (!uuid_equals(&s->sid, &raft->sid)) {
2375 sset_add(remote_addresses, s->address);
2376 }
2377 }
2378 }
2379
2380 raft_send(raft, &rpy);
2381
2382 sset_destroy(remote_addresses);
2383}
2384
static void
raft_send_remove_server_reply_rpc(struct raft *raft,
                                  const struct uuid *dst_sid,
                                  const struct uuid *target_sid,
                                  bool success, const char *comment)
{
    if (uuid_equals(&raft->sid, dst_sid)) {
        if (success && uuid_equals(&raft->sid, target_sid)) {
            raft_finished_leaving_cluster(raft);
        }
        return;
    }

    const union raft_rpc rpy = {
        .remove_server_reply = {
            .common = {
                .type = RAFT_RPC_REMOVE_SERVER_REPLY,
                .sid = *dst_sid,
                .comment = CONST_CAST(char *, comment),
            },
            .target_sid = (uuid_equals(dst_sid, target_sid)
                           ? UUID_ZERO
                           : *target_sid),
            .success = success,
        }
    };
    raft_send(raft, &rpy);
}

static void
raft_send_remove_server_reply__(struct raft *raft,
                                const struct uuid *target_sid,
                                const struct uuid *requester_sid,
                                struct unixctl_conn *requester_conn,
                                bool success, const char *comment)
{
    struct ds s = DS_EMPTY_INITIALIZER;
    ds_put_format(&s, "request ");
    if (!uuid_is_zero(requester_sid)) {
        char buf[SID_LEN + 1];
        ds_put_format(&s, "by %s",
                      raft_get_nickname(raft, requester_sid, buf, sizeof buf));
    } else {
        ds_put_cstr(&s, "via unixctl");
    }
    ds_put_cstr(&s, " to remove ");
    if (!requester_conn && uuid_equals(target_sid, requester_sid)) {
        ds_put_cstr(&s, "itself");
    } else {
        char buf[SID_LEN + 1];
        ds_put_cstr(&s, raft_get_nickname(raft, target_sid, buf, sizeof buf));
        if (uuid_equals(target_sid, &raft->sid)) {
            ds_put_cstr(&s, " (ourselves)");
        }
    }
    ds_put_format(&s, " from cluster "CID_FMT" %s",
                  CID_ARGS(&raft->cid),
                  success ? "succeeded" : "failed");
    if (comment) {
        ds_put_format(&s, " (%s)", comment);
    }

    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
    VLOG_INFO_RL(&rl, "%s", ds_cstr(&s));

    /* Send RemoveServerReply to the requester (which could be a server or a
     * unixctl connection).  Also always send it to the removed server; this
     * allows it to be sure that it's really removed and update its log and
     * disconnect permanently. */
    if (!uuid_is_zero(requester_sid)) {
        raft_send_remove_server_reply_rpc(raft, requester_sid, target_sid,
                                          success, comment);
    }
    if (!uuid_equals(requester_sid, target_sid)) {
        raft_send_remove_server_reply_rpc(raft, target_sid, target_sid,
                                          success, comment);
    }
    if (requester_conn) {
        if (success) {
            unixctl_command_reply(requester_conn, ds_cstr(&s));
        } else {
            unixctl_command_reply_error(requester_conn, ds_cstr(&s));
        }
    }

    ds_destroy(&s);
}

static void
raft_send_add_server_reply(struct raft *raft,
                           const struct raft_add_server_request *rq,
                           bool success, const char *comment)
{
    return raft_send_add_server_reply__(raft, &rq->common.sid, rq->address,
                                        success, comment);
}

static void
raft_send_remove_server_reply(struct raft *raft,
                              const struct raft_remove_server_request *rq,
                              bool success, const char *comment)
{
    return raft_send_remove_server_reply__(raft, &rq->sid, &rq->common.sid,
                                           rq->requester_conn, success,
                                           comment);
}

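/* Steps down to follower state.  Any in-progress server additions or
 * removals and any pending commands are failed with "lost leadership",
 * since only a leader can complete them. */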
static void
raft_become_follower(struct raft *raft)
{
    raft->leader_sid = UUID_ZERO;
    if (raft->role == RAFT_FOLLOWER) {
        return;
    }

    raft->role = RAFT_FOLLOWER;
    raft_reset_election_timer(raft);

    /* Notify clients about lost leadership.
     *
     * We do not reverse our changes to 'raft->servers' because the new
     * configuration is already part of the log.  Possibly the configuration
     * log entry will not be committed, but until we know that we must use the
     * new configuration.  Our AppendEntries processing will properly update
     * the server configuration later, if necessary. */
    struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
        raft_send_add_server_reply__(raft, &s->sid, s->address, false,
                                     RAFT_SERVER_LOST_LEADERSHIP);
    }
    if (raft->remove_server) {
        raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
                                        &raft->remove_server->requester_sid,
                                        raft->remove_server->requester_conn,
                                        false, RAFT_SERVER_LOST_LEADERSHIP);
        raft_server_destroy(raft->remove_server);
        raft->remove_server = NULL;
    }

    raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
}

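/* Sends an AppendEntries RPC to 'peer' carrying the 'n' log entries that
 * start at the peer's next_index ('n' of 0 yields a plain heartbeat).  The
 * request includes the index and term of the entry just before those sent,
 * which lets the follower run the consistency check from section 3.5. */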
static void
raft_send_append_request(struct raft *raft,
                         struct raft_server *peer, unsigned int n,
                         const char *comment)
{
    ovs_assert(raft->role == RAFT_LEADER);

    const union raft_rpc rq = {
        .append_request = {
            .common = {
                .type = RAFT_RPC_APPEND_REQUEST,
                .sid = peer->sid,
                .comment = CONST_CAST(char *, comment),
            },
            .term = raft->term,
            .prev_log_index = peer->next_index - 1,
            .prev_log_term = (peer->next_index - 1 >= raft->log_start
                              ? raft->entries[peer->next_index - 1
                                              - raft->log_start].term
                              : raft->snap.term),
            .leader_commit = raft->commit_index,
            .entries = &raft->entries[peer->next_index - raft->log_start],
            .n_entries = n,
        },
    };
    raft_send(raft, &rq);
}

static void
raft_send_heartbeats(struct raft *raft)
{
    struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (!uuid_equals(&raft->sid, &s->sid)) {
            raft_send_append_request(raft, s, 0, "heartbeat");
        }
    }

    /* Send a ping to anyone waiting for a command to complete, to let them
     * know we're still working on it. */
    struct raft_command *cmd;
    HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
        if (!uuid_is_zero(&cmd->sid)) {
            raft_send_execute_command_reply(raft, &cmd->sid,
                                            &cmd->eid,
                                            RAFT_CMD_INCOMPLETE, 0);
        }
    }

    raft_reset_ping_timer(raft);
}

/* Initializes the fields in 's' that represent the leader's view of the
 * server. */
static void
raft_server_init_leader(struct raft *raft, struct raft_server *s)
{
    s->next_index = raft->log_end;
    s->match_index = 0;
    s->phase = RAFT_PHASE_STABLE;
    s->replied = false;
    s->install_snapshot_request_in_progress = false;
}

static void
raft_set_leader(struct raft *raft, const struct uuid *sid)
{
    raft->leader_sid = *sid;
    raft->ever_had_leader = raft->had_leader = true;
    raft->candidate_retrying = false;
}

static void
raft_become_leader(struct raft *raft)
{
    log_all_commands(raft);

    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
    VLOG_INFO_RL(&rl, "term %"PRIu64": elected leader by %d+ of "
                 "%"PRIuSIZE" servers", raft->term,
                 raft->n_votes, hmap_count(&raft->servers));

    ovs_assert(raft->role != RAFT_LEADER);
    raft->role = RAFT_LEADER;
    raft->election_won = time_msec();
    raft_set_leader(raft, &raft->sid);
    raft_reset_election_timer(raft);
    raft_reset_ping_timer(raft);

    struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        raft_server_init_leader(raft, s);
    }

    raft->election_timer_new = 0;

    raft_update_our_match_index(raft, raft->log_end - 1);

    /* Write the fact that we are leader to the log.  This is not used by the
     * algorithm (although it could be, for quick restart), but it is used for
     * offline analysis to check for conformance with the properties that Raft
     * guarantees. */
    struct raft_record r = {
        .type = RAFT_REC_LEADER,
        .term = raft->term,
        .sid = raft->sid,
    };
    ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));

    /* Initiate a no-op commit.  Otherwise we might never find out what's in
     * the log.  See section 6.4 item 1:
     *
     *     The Leader Completeness Property guarantees that a leader has all
     *     committed entries, but at the start of its term, it may not know
     *     which those are.  To find out, it needs to commit an entry from its
     *     term.  Raft handles this by having each leader commit a blank no-op
     *     entry into the log at the start of its term.  As soon as this no-op
     *     entry is committed, the leader’s commit index will be at least as
     *     large as any other servers’ during its term.
     */
    raft_command_unref(raft_command_execute__(raft, NULL, NULL, 0, NULL,
                                              NULL));
}

/* Processes term 'term' received as part of RPC 'common'.  Returns true if
 * the caller should continue processing the RPC, false if the caller should
 * reject it due to a stale term. */
static bool
raft_receive_term__(struct raft *raft, const struct raft_rpc_common *common,
                    uint64_t term)
{
    /* Section 3.3 says:
     *
     *     Current terms are exchanged whenever servers communicate; if one
     *     server’s current term is smaller than the other’s, then it updates
     *     its current term to the larger value.  If a candidate or leader
     *     discovers that its term is out of date, it immediately reverts to
     *     follower state.  If a server receives a request with a stale term
     *     number, it rejects the request.
     */
    if (term > raft->term) {
        if (!raft_set_term(raft, term, NULL)) {
            /* Failed to update the term to 'term'. */
            return false;
        }
        raft_become_follower(raft);
    } else if (term < raft->term) {
        char buf[SID_LEN + 1];
        VLOG_INFO("rejecting term %"PRIu64" < current term %"PRIu64" received "
                  "in %s message from server %s",
                  term, raft->term,
                  raft_rpc_type_to_string(common->type),
                  raft_get_nickname(raft, &common->sid, buf, sizeof buf));
        return false;
    }
    return true;
}

static void
raft_get_servers_from_log(struct raft *raft, enum vlog_level level)
{
    const struct json *servers_json = raft->snap.servers;
    for (uint64_t index = raft->log_end - 1; index >= raft->log_start;
         index--) {
        struct raft_entry *e = &raft->entries[index - raft->log_start];
        if (e->servers) {
            servers_json = e->servers;
            break;
        }
    }

    struct hmap servers;
    struct ovsdb_error *error = raft_servers_from_json(servers_json, &servers);
    ovs_assert(!error);
    raft_set_servers(raft, &servers, level);
    raft_servers_destroy(&servers);
}

/* Truncates the log, so that raft->log_end becomes 'new_end'.
 *
 * Doesn't write anything to disk.  In theory, we could truncate the on-disk
 * log file, but we don't have the right information to know how long it
 * should be.  What we actually do is to append entries for older indexes to
 * the on-disk log; when we re-read it later, these entries truncate the log.
 *
 * Returns true if any of the removed log entries were server configuration
 * entries, false otherwise. */
static bool
raft_truncate(struct raft *raft, uint64_t new_end)
{
    ovs_assert(new_end >= raft->log_start);
    if (raft->log_end > new_end) {
        char buf[SID_LEN + 1];
        VLOG_INFO("%s truncating %"PRIu64 " entries from end of log",
                  raft_get_nickname(raft, &raft->sid, buf, sizeof buf),
                  raft->log_end - new_end);
    }

    bool servers_changed = false;
    while (raft->log_end > new_end) {
        struct raft_entry *entry = &raft->entries[--raft->log_end
                                                  - raft->log_start];
        if (entry->servers) {
            servers_changed = true;
        }
        raft_entry_uninit(entry);
    }
    return servers_changed;
}

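/* Returns the data of the next log entry or snapshot that is ready to be
 * applied, storing its entry ID in '*eid', or NULL if nothing is ready.
 * Committed entries that carry no data (for example, pure configuration
 * changes) are skipped over, advancing 'last_applied' past them. */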
static const struct json *
raft_peek_next_entry(struct raft *raft, struct uuid *eid)
{
    /* Invariant: log_start - 2 <= last_applied <= commit_index < log_end. */
    ovs_assert(raft->log_start <= raft->last_applied + 2);
    ovs_assert(raft->last_applied <= raft->commit_index);
    ovs_assert(raft->commit_index < raft->log_end);

    if (raft->joining || raft->failed) {
        return NULL;
    }

    if (raft->log_start == raft->last_applied + 2) {
        *eid = raft->snap.eid;
        return raft->snap.data;
    }

    while (raft->last_applied < raft->commit_index) {
        const struct raft_entry *e = raft_get_entry(raft,
                                                    raft->last_applied + 1);
        if (e->data) {
            *eid = e->eid;
            return e->data;
        }
        raft->last_applied++;
    }
    return NULL;
}

static const struct json *
raft_get_next_entry(struct raft *raft, struct uuid *eid)
{
    const struct json *data = raft_peek_next_entry(raft, eid);
    if (data) {
        raft->last_applied++;
    }
    return data;
}

/* Updates the commit index in the raft log.  If the commit index is already
 * up-to-date, does nothing and returns false; otherwise, returns true. */
static bool
raft_update_commit_index(struct raft *raft, uint64_t new_commit_index)
{
    if (new_commit_index <= raft->commit_index) {
        return false;
    }

    if (raft->role == RAFT_LEADER) {
        while (raft->commit_index < new_commit_index) {
            uint64_t index = ++raft->commit_index;
            const struct raft_entry *e = raft_get_entry(raft, index);
            if (e->data) {
                struct raft_command *cmd
                    = raft_find_command_by_eid(raft, &e->eid);
                if (cmd) {
                    if (!cmd->index) {
                        VLOG_DBG("Command completed after role change from"
                                 " follower to leader "UUID_FMT,
                                 UUID_ARGS(&e->eid));
                        cmd->index = index;
                    }
                    raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
                }
            }
            if (e->election_timer) {
                VLOG_INFO("Election timer changed from %"PRIu64" to %"PRIu64,
                          raft->election_timer, e->election_timer);
                raft->election_timer = e->election_timer;
                raft->election_timer_new = 0;
            }
            if (e->servers) {
                /* raft_run_reconfigure() can write a new Raft entry, which
                 * can reallocate raft->entries, which would invalidate 'e',
                 * so this case must be last, after the one for 'e->data'. */
                raft_run_reconfigure(raft);
            }
        }
    } else {
        while (raft->commit_index < new_commit_index) {
            uint64_t index = ++raft->commit_index;
            const struct raft_entry *e = raft_get_entry(raft, index);
            if (e->election_timer) {
                VLOG_INFO("Election timer changed from %"PRIu64" to %"PRIu64,
                          raft->election_timer, e->election_timer);
                raft->election_timer = e->election_timer;
            }
        }
        /* Check if any pending command can be completed, and complete it.
         * This can happen when a leader fails over before sending
         * execute_command_reply. */
        const struct uuid *eid = raft_get_eid(raft, new_commit_index);
        struct raft_command *cmd = raft_find_command_by_eid(raft, eid);
        if (cmd) {
            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
            VLOG_INFO_RL(&rl,
                         "Command completed without reply (eid: "UUID_FMT", "
                         "commit index: %"PRIu64")",
                         UUID_ARGS(eid), new_commit_index);
            cmd->index = new_commit_index;
            raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
        }
    }

    /* Write the commit index to the log.  The next time we restart, this
     * allows us to start exporting a reasonably fresh log, instead of a log
     * that only contains the snapshot. */
    struct raft_record r = {
        .type = RAFT_REC_COMMIT_INDEX,
        .commit_index = raft->commit_index,
    };
    ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
    return true;
}

/* This doesn't use rq->entries (but it does use rq->n_entries). */
static void
raft_send_append_reply(struct raft *raft, const struct raft_append_request *rq,
                       enum raft_append_result result, const char *comment)
{
    /* Figure 3.1: "If leaderCommit > commitIndex, set commitIndex =
     * min(leaderCommit, index of last new entry)" */
    if (result == RAFT_APPEND_OK && rq->leader_commit > raft->commit_index) {
        raft_update_commit_index(
            raft, MIN(rq->leader_commit, rq->prev_log_index + rq->n_entries));
    }

    /* Send reply. */
    union raft_rpc reply = {
        .append_reply = {
            .common = {
                .type = RAFT_RPC_APPEND_REPLY,
                .sid = rq->common.sid,
                .comment = CONST_CAST(char *, comment),
            },
            .term = raft->term,
            .log_end = raft->log_end,
            .prev_log_index = rq->prev_log_index,
            .prev_log_term = rq->prev_log_term,
            .n_entries = rq->n_entries,
            .result = result,
        }
    };
    raft_send(raft, &reply);
}

/* If 'prev_log_index' exists in 'raft''s log, in term 'prev_log_term',
 * returns NULL.  Otherwise, returns an explanation for the mismatch. */
static const char *
match_index_and_term(const struct raft *raft,
                     uint64_t prev_log_index, uint64_t prev_log_term)
{
    if (prev_log_index < raft->log_start - 1) {
        return "mismatch before start of log";
    } else if (prev_log_index == raft->log_start - 1) {
        if (prev_log_term != raft->snap.term) {
            return "prev_term mismatch";
        }
    } else if (prev_log_index < raft->log_end) {
        if (raft->entries[prev_log_index - raft->log_start].term
            != prev_log_term) {
            return "term mismatch";
        }
    } else {
        /* prev_log_index >= raft->log_end */
        return "mismatch past end of log";
    }
    return NULL;
}

static void
raft_handle_append_entries(struct raft *raft,
                           const struct raft_append_request *rq,
                           uint64_t prev_log_index, uint64_t prev_log_term,
                           const struct raft_entry *entries,
                           unsigned int n_entries)
{
    /* Section 3.5: "When sending an AppendEntries RPC, the leader includes
     * the index and term of the entry in its log that immediately precedes
     * the new entries.  If the follower does not find an entry in its log
     * with the same index and term, then it refuses the new entries." */
    const char *mismatch = match_index_and_term(raft, prev_log_index,
                                                prev_log_term);
    if (mismatch) {
        VLOG_INFO("rejecting append_request because previous entry "
                  "%"PRIu64",%"PRIu64" not in local log (%s)",
                  prev_log_term, prev_log_index, mismatch);
        raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY, mismatch);
        return;
    }

    /* Figure 3.1: "If an existing entry conflicts with a new one (same
     * index but different terms), delete the existing entry and all that
     * follow it." */
    unsigned int i;
    bool servers_changed = false;
    for (i = 0; ; i++) {
        if (i >= n_entries) {
            /* No change. */
            if (rq->common.comment
                && !strcmp(rq->common.comment, "heartbeat")) {
                raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "heartbeat");
            } else {
                raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
            }
            return;
        }

        uint64_t log_index = (prev_log_index + 1) + i;
        if (log_index >= raft->log_end) {
            break;
        }
        if (raft->entries[log_index - raft->log_start].term
            != entries[i].term) {
            if (raft_truncate(raft, log_index)) {
                servers_changed = true;
            }
            break;
        }
    }

    if (failure_test == FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE) {
        ovs_fatal(0, "Raft test: crash after receiving append_request with "
                  "update.");
    }
    /* Figure 3.1: "Append any entries not already in the log." */
    struct ovsdb_error *error = NULL;
    bool any_written = false;
    for (; i < n_entries; i++) {
        const struct raft_entry *e = &entries[i];
        error = raft_write_entry(raft, e->term,
                                 json_nullable_clone(e->data), &e->eid,
                                 json_nullable_clone(e->servers),
                                 e->election_timer);
        if (error) {
            break;
        }
        any_written = true;
        if (e->servers) {
            servers_changed = true;
        }
    }

    if (any_written) {
        raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index
            = raft->log_end - 1;
    }
    if (servers_changed) {
        /* The set of servers might have changed; check. */
        raft_get_servers_from_log(raft, VLL_INFO);
    }

    if (error) {
        char *s = ovsdb_error_to_string_free(error);
        VLOG_ERR("%s", s);
        free(s);
        raft_send_append_reply(raft, rq, RAFT_APPEND_IO_ERROR, "I/O error");
        return;
    }

    raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "log updated");
}

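/* Records 'sid' as the leader for the current term.  Returns false if this
 * server itself believes it is the leader, which indicates a protocol error
 * somewhere and causes the request to be rejected.  A candidate that learns
 * of a legitimate leader this way reverts to follower (section 3.4). */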
static bool
raft_update_leader(struct raft *raft, const struct uuid *sid)
{
    if (raft->role == RAFT_LEADER) {
        char buf[SID_LEN + 1];
        VLOG_ERR("this server is leader but server %s claims to be",
                 raft_get_nickname(raft, sid, buf, sizeof buf));
        return false;
    } else if (!uuid_equals(sid, &raft->leader_sid)) {
        if (!uuid_is_zero(&raft->leader_sid)) {
            char buf1[SID_LEN + 1];
            char buf2[SID_LEN + 1];
            VLOG_ERR("leader for term %"PRIu64" changed from %s to %s",
                     raft->term,
                     raft_get_nickname(raft, &raft->leader_sid,
                                       buf1, sizeof buf1),
                     raft_get_nickname(raft, sid, buf2, sizeof buf2));
        } else {
            char buf[SID_LEN + 1];
            VLOG_INFO("server %s is leader for term %"PRIu64,
                      raft_get_nickname(raft, sid, buf, sizeof buf),
                      raft->term);
        }
        raft_set_leader(raft, sid);

        /* Record the leader to the log.  This is not used by the algorithm
         * (although it could be, for quick restart), but it is used for
         * offline analysis to check for conformance with the properties
         * that Raft guarantees. */
        struct raft_record r = {
            .type = RAFT_REC_LEADER,
            .term = raft->term,
            .sid = *sid,
        };
        ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
    }
    if (raft->role == RAFT_CANDIDATE) {
        /* Section 3.4: While waiting for votes, a candidate may
         * receive an AppendEntries RPC from another server claiming to
         * be leader.  If the leader’s term (included in its RPC) is at
         * least as large as the candidate’s current term, then the
         * candidate recognizes the leader as legitimate and returns to
         * follower state. */
        raft->role = RAFT_FOLLOWER;
    }
    return true;
}

static void
raft_handle_append_request(struct raft *raft,
                           const struct raft_append_request *rq)
{
    /* We do not check whether the server that sent the request is part of the
     * cluster.  As section 4.1 says, "A server accepts AppendEntries requests
     * from a leader that is not part of the server’s latest configuration.
     * Otherwise, a new server could never be added to the cluster (it would
     * never accept any log entries preceding the configuration entry that
     * adds the server)." */
    if (!raft_update_leader(raft, &rq->common.sid)) {
        raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
                               "usurped leadership");
        return;
    }
    raft_reset_election_timer(raft);

    /* First check for the common case, where the AppendEntries request is
     * entirely for indexes covered by 'log_start' ... 'log_end - 1', something
     * like this:
     *
     *     rq->prev_log_index
     *       | first_entry_index
     *       |   |          nth_entry_index
     *       |   |            |
     *       v   v            v
     *         +---+---+---+---+
     *       T | T | T | T | T |
     *         +---+-------+---+
     *     +---+---+---+---+
     *   T | T | T | T | T |
     *     +---+---+---+---+
     *       ^               ^
     *       |               |
     *   log_start        log_end
     * */
    uint64_t first_entry_index = rq->prev_log_index + 1;
    uint64_t nth_entry_index = rq->prev_log_index + rq->n_entries;
    if (OVS_LIKELY(first_entry_index >= raft->log_start)) {
        raft_handle_append_entries(raft, rq,
                                   rq->prev_log_index, rq->prev_log_term,
                                   rq->entries, rq->n_entries);
        return;
    }

    /* Now a series of checks for odd cases, where the AppendEntries request
     * extends earlier than the beginning of our log, into the log entries
     * discarded by the most recent snapshot. */

    /*
     * Handle the case where the indexes covered by rq->entries[] are entirely
     * disjoint with 'log_start - 1' ... 'log_end - 1', as shown below.  So,
     * everything in the AppendEntries request must already have been
     * committed, and we might as well return true.
     *
     *     rq->prev_log_index
     *       | first_entry_index
     *       |   |  nth_entry_index
     *       |   |    |
     *       v   v    v
     *         +---+---+---+---+
     *       T | T | T | T | T |
     *         +---+-------+---+
     *                           +---+---+---+---+
     *                         T | T | T | T | T |
     *                           +---+---+---+---+
     *                             ^               ^
     *                             |               |
     *                         log_start        log_end
     */
    if (nth_entry_index < raft->log_start - 1) {
        raft_send_append_reply(raft, rq, RAFT_APPEND_OK,
                               "append before log start");
        return;
    }

    /*
     * Handle the case where the last entry in rq->entries[] has the same
     * index as 'log_start - 1', so we can compare their terms:
     *
     *     rq->prev_log_index
     *       | first_entry_index
     *       |   |       nth_entry_index
     *       |   |         |
     *       v   v         v
     *         +---+---+---+---+
     *       T | T | T | T | T |
     *         +---+-------+---+
     *                     +---+---+---+---+
     *                   T | T | T | T | T |
     *                     +---+---+---+---+
     *                       ^               ^
     *                       |               |
     *                   log_start        log_end
     *
     * There's actually a sub-case where rq->n_entries == 0, in which we
     * compare rq->prev_term:
     *
     *     rq->prev_log_index
     *       |
     *       |
     *       |
     *       v
     *     T
     *
     *       +---+---+---+---+
     *     T | T | T | T | T |
     *       +---+---+---+---+
     *         ^               ^
     *         |               |
     *     log_start        log_end
     */
    if (nth_entry_index == raft->log_start - 1) {
        if (rq->n_entries
            ? raft->snap.term == rq->entries[rq->n_entries - 1].term
            : raft->snap.term == rq->prev_log_term) {
            raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
        } else {
            raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
                                   "term mismatch");
        }
        return;
    }

    /*
     * We now know that the data in rq->entries[] overlaps the data in
     * raft->entries[], as shown below, with some positive 'ofs':
     *
     *     rq->prev_log_index
     *       | first_entry_index
     *       |   |              nth_entry_index
     *       |   |                |
     *       v   v                v
     *         +---+---+---+---+---+
     *       T | T | T | T | T | T |
     *         +---+-------+---+---+
     *                 +---+---+---+---+
     *               T | T | T | T | T |
     *                 +---+---+---+---+
     *                   ^               ^
     *                   |               |
     *               log_start        log_end
     *
     *       |<-- ofs -->|
     *
     * We transform this into the following by trimming the first 'ofs'
     * elements off of rq->entries[], ending up with the following.  Notice
     * how we retain the term but not the data for rq->entries[ofs - 1]:
     *
     *             first_entry_index + ofs - 1
     *               | first_entry_index + ofs
     *               |   |  nth_entry_index + ofs
     *               |   |    |
     *               v   v    v
     *                 +---+---+
     *               T | T | T |
     *                 +---+---+
     *                 +---+---+---+---+
     *               T | T | T | T | T |
     *                 +---+---+---+---+
     *                   ^               ^
     *                   |               |
     *               log_start        log_end
     */
    uint64_t ofs = raft->log_start - first_entry_index;
    raft_handle_append_entries(raft, rq,
                               raft->log_start - 1, rq->entries[ofs - 1].term,
                               &rq->entries[ofs], rq->n_entries - ofs);
}

/* Returns true if 'raft' has another log entry or snapshot to read. */
bool
raft_has_next_entry(const struct raft *raft_)
{
    struct raft *raft = CONST_CAST(struct raft *, raft_);
    struct uuid eid;
    return raft_peek_next_entry(raft, &eid) != NULL;
}

/* Returns the next log entry or snapshot from 'raft', or NULL if there are
 * none left to read.  Stores the entry ID of the log entry in '*eid'.  Stores
 * true in '*is_snapshot' if the returned data is a snapshot, false if it is a
 * log entry. */
const struct json *
raft_next_entry(struct raft *raft, struct uuid *eid, bool *is_snapshot)
{
    const struct json *data = raft_get_next_entry(raft, eid);
    *is_snapshot = data == raft->snap.data;
    return data;
}

/* Returns the log index of the last-read snapshot or log entry. */
uint64_t
raft_get_applied_index(const struct raft *raft)
{
    return raft->last_applied;
}

/* Returns the log index of the last snapshot or log entry that is available
 * to be read. */
uint64_t
raft_get_commit_index(const struct raft *raft)
{
    return raft->commit_index;
}

static struct raft_server *
raft_find_peer(struct raft *raft, const struct uuid *uuid)
{
    struct raft_server *s = raft_find_server(raft, uuid);
    return s && !uuid_equals(&raft->sid, &s->sid) ? s : NULL;
}

static struct raft_server *
raft_find_new_server(struct raft *raft, const struct uuid *uuid)
{
    return raft_server_find(&raft->add_servers, uuid);
}

/* Figure 3.1: "If there exists an N such that N > commitIndex, a
 * majority of matchIndex[i] >= N, and log[N].term == currentTerm, set
 * commitIndex = N (sections 3.5 and 3.6)." */
static void
raft_consider_updating_commit_index(struct raft *raft)
{
    /* This loop cannot just bail out when it comes across a log entry that
     * does not match the criteria.  For example, Figure 3.7(d2) shows a
     * case where the log entry for term 2 cannot be committed directly
     * (because it is not for the current term) but it can be committed as
     * a side effect of committing the entry for term 4 (the current term).
     * XXX Is there a more efficient way to do this? */
    ovs_assert(raft->role == RAFT_LEADER);

    uint64_t new_commit_index = raft->commit_index;
    for (uint64_t idx = MAX(raft->commit_index + 1, raft->log_start);
         idx < raft->log_end; idx++) {
        if (raft->entries[idx - raft->log_start].term == raft->term) {
            size_t count = 0;
            struct raft_server *s2;
            HMAP_FOR_EACH (s2, hmap_node, &raft->servers) {
                if (s2->match_index >= idx) {
                    count++;
                }
            }
            if (count > hmap_count(&raft->servers) / 2) {
                VLOG_DBG("index %"PRIu64" committed to %"PRIuSIZE" servers, "
                         "applying", idx, count);
                new_commit_index = idx;
            }
        }
    }
    if (raft_update_commit_index(raft, new_commit_index)) {
        raft_send_heartbeats(raft);
    }
}

static void
raft_update_match_index(struct raft *raft, struct raft_server *s,
                        uint64_t min_index)
{
    ovs_assert(raft->role == RAFT_LEADER);
    if (min_index > s->match_index) {
        s->match_index = min_index;
        raft_consider_updating_commit_index(raft);
    }
}

static void
raft_update_our_match_index(struct raft *raft, uint64_t min_index)
{
    struct raft_server *server = raft_find_server(raft, &raft->sid);
    if (server) {
        raft_update_match_index(raft, server, min_index);
    }
}

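/* Sends an InstallSnapshot RPC carrying this server's current snapshot to
 * 's', for a follower whose next_index falls before the start of our log.
 * Skips the send if a previous snapshot transfer to 's' is still in
 * progress. */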
3324static void
3325raft_send_install_snapshot_request(struct raft *raft,
3326 const struct raft_server *s,
3327 const char *comment)
3328{
3329 union raft_rpc rpc = {
3330 .install_snapshot_request = {
3331 .common = {
3332 .type = RAFT_RPC_INSTALL_SNAPSHOT_REQUEST,
3333 .sid = s->sid,
3334 .comment = CONST_CAST(char *, comment),
3335 },
3336 .term = raft->term,
3337 .last_index = raft->log_start - 1,
3338 .last_term = raft->snap.term,
3339 .last_servers = raft->snap.servers,
3340 .last_eid = raft->snap.eid,
3341 .data = raft->snap.data,
9bfb280a 3342 .election_timer = raft->election_timer, /* use latest value */
1b1d2e6d
BP
3343 }
3344 };
8c2c503b 3345
83fbd2e9
IM
3346 if (s->install_snapshot_request_in_progress) {
3347 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
8c2c503b 3348
83fbd2e9
IM
3349 VLOG_INFO_RL(&rl, "not sending snapshot to server %s, "
3350 "already in progress", s->nickname);
3351 return;
8c2c503b 3352 }
8c2c503b 3353
83fbd2e9
IM
3354 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3355 VLOG_INFO_RL(&rl, "sending snapshot to server %s, %"PRIu64":%"PRIu64".",
3356 s->nickname, raft->term, raft->log_start - 1);
3357 CONST_CAST(struct raft_server *, s)->install_snapshot_request_in_progress
3358 = raft_send(raft, &rpc);
1b1d2e6d
BP
3359}
3360
3361static void
3362raft_handle_append_reply(struct raft *raft,
3363 const struct raft_append_reply *rpy)
3364{
3365 if (raft->role != RAFT_LEADER) {
3366 VLOG_INFO("rejected append_reply (not leader)");
3367 return;
3368 }
3369
3370 /* Most commonly we'd be getting an AppendEntries reply from a configured
3371 * server (e.g. a peer), but we can also get them from servers in the
3372 * process of being added. */
3373 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3374 if (!s) {
3375 s = raft_find_new_server(raft, &rpy->common.sid);
3376 if (!s) {
3377 VLOG_INFO("rejected append_reply from unknown server "SID_FMT,
3378 SID_ARGS(&rpy->common.sid));
3379 return;
3380 }
3381 }
3382
89771c1e 3383 s->replied = true;
1b1d2e6d
BP
3384 if (rpy->result == RAFT_APPEND_OK) {
3385 /* Figure 3.1: "If successful, update nextIndex and matchIndex for
3386 * follower (section 3.5)." */
3387 uint64_t min_index = rpy->prev_log_index + rpy->n_entries + 1;
3388 if (s->next_index < min_index) {
3389 s->next_index = min_index;
3390 }
3391 raft_update_match_index(raft, s, min_index - 1);
3392 } else {
3393 /* Figure 3.1: "If AppendEntries fails because of log inconsistency,
3394 * decrement nextIndex and retry (section 3.5)."
3395 *
3396 * We also implement the optimization suggested in section 4.2.1:
3397 * "Various approaches can make nextIndex converge to its correct value
3398 * more quickly, including those described in Chapter 3. The simplest
3399 * approach to solving this particular problem of adding a new server,
3400 * however, is to have followers return the length of their logs in the
3401 * AppendEntries response; this allows the leader to cap the follower’s
3402 * nextIndex accordingly." */
3403 s->next_index = (s->next_index > 0
3404 ? MIN(s->next_index - 1, rpy->log_end)
3405 : 0);
3406
3407 if (rpy->result == RAFT_APPEND_IO_ERROR) {
3408 /* Append failed but not because of a log inconsistency. Because
3409 * of the I/O error, there's no point in re-sending the append
3410 * immediately. */
3411 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3412 VLOG_INFO_RL(&rl, "%s reported I/O error", s->nickname);
3413 return;
3414 }
3415 }
3416
3417 /*
3418 * Our behavior here must depend on the value of next_index relative to
3419 * log_start and log_end. There are three cases:
3420 *
3421 * Case 1 | Case 2 | Case 3
3422 * <---------------->|<------------->|<------------------>
3423 * | |
3424 *
3425 * +---+---+---+---+
3426 * T | T | T | T | T |
3427 * +---+---+---+---+
3428 * ^ ^
3429 * | |
3430 * log_start log_end
3431 */
3432 if (s->next_index < raft->log_start) {
3433 /* Case 1. */
3434 raft_send_install_snapshot_request(raft, s, NULL);
3435 } else if (s->next_index < raft->log_end) {
3436 /* Case 2. */
99c2dc8d 3437 raft_send_append_request(raft, s, raft->log_end - s->next_index, NULL);
1b1d2e6d
BP
3438 } else {
3439 /* Case 3. */
3440 if (s->phase == RAFT_PHASE_CATCHUP) {
3441 s->phase = RAFT_PHASE_CAUGHT_UP;
3442 raft_run_reconfigure(raft);
3443 }
3444 }
3445}

static bool
raft_should_suppress_disruptive_server(struct raft *raft,
                                       const union raft_rpc *rpc)
{
    if (rpc->type != RAFT_RPC_VOTE_REQUEST) {
        return false;
    }

    /* Section 4.2.3 "Disruptive Servers" says:
     *
     *    ...if a server receives a RequestVote request within the minimum
     *    election timeout of hearing from a current leader, it does not
     *    update its term or grant its vote...
     *
     *    ...This change conflicts with the leadership transfer mechanism as
     *    described in Chapter 3, in which a server legitimately starts an
     *    election without waiting an election timeout.  In that case,
     *    RequestVote messages should be processed by other servers even when
     *    they believe a current cluster leader exists.  Those RequestVote
     *    requests can include a special flag to indicate this behavior (“I
     *    have permission to disrupt the leader--it told me to!”).
     *
     * This clearly describes how the followers should act, but not the
     * leader.  We just ignore vote requests that arrive at a current leader.
     * This seems to be fairly safe, since a majority other than the current
     * leader can still elect a new leader and the first AppendEntries from
     * that new leader will depose the current leader. */
    const struct raft_vote_request *rq = raft_vote_request_cast(rpc);
    if (rq->leadership_transfer) {
        return false;
    }

    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
    long long int now = time_msec();
    switch (raft->role) {
    case RAFT_LEADER:
        VLOG_WARN_RL(&rl, "ignoring vote request received as leader");
        return true;

    case RAFT_FOLLOWER:
        if (now < raft->election_base + raft->election_timer) {
            VLOG_WARN_RL(&rl, "ignoring vote request received after only "
                         "%lld ms (minimum election time is %"PRIu64" ms)",
                         now - raft->election_base, raft->election_timer);
            return true;
        }
        return false;

    case RAFT_CANDIDATE:
        return false;

    default:
        OVS_NOT_REACHED();
    }
}

/* Returns true if a reply should be sent. */
static bool
raft_handle_vote_request__(struct raft *raft,
                           const struct raft_vote_request *rq)
{
    /* Figure 3.1: "If votedFor is null or candidateId, and candidate's vote
     * is at least as up-to-date as receiver's log, grant vote (sections 3.4,
     * 3.6)." */
    if (uuid_equals(&raft->vote, &rq->common.sid)) {
        /* Already voted for this candidate in this term.  Resend vote. */
        return true;
    } else if (!uuid_is_zero(&raft->vote)) {
        /* Already voted for a different candidate in this term.  Send a
         * reply saying which candidate we did vote for.  This isn't a
         * necessary part of the Raft protocol but it can make debugging
         * easier. */
        return true;
    }

    /* Section 3.6.1: "The RequestVote RPC implements this restriction: the
     * RPC includes information about the candidate’s log, and the voter
     * denies its vote if its own log is more up-to-date than that of the
     * candidate.  Raft determines which of two logs is more up-to-date by
     * comparing the index and term of the last entries in the logs.  If the
     * logs have last entries with different terms, then the log with the
     * later term is more up-to-date.  If the logs end with the same term,
     * then whichever log is longer is more up-to-date." */
    uint64_t last_term = (raft->log_end > raft->log_start
                          ? raft->entries[raft->log_end - 1
                                          - raft->log_start].term
                          : raft->snap.term);
    if (last_term > rq->last_log_term
        || (last_term == rq->last_log_term
            && raft->log_end - 1 > rq->last_log_index)) {
        /* Our log is more up-to-date than the peer's.  Withhold vote. */
        return false;
    }

    /* Record a vote for the peer. */
    if (!raft_set_term(raft, raft->term, &rq->common.sid)) {
        return false;
    }

    raft_reset_election_timer(raft);

    return true;
}
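
/* Example of the up-to-date check above (the values are hypothetical): if our
 * last entry is (term 5, index 20), then a candidate reporting
 * last_log_term == 4 is denied regardless of its index, and a candidate
 * reporting (term 5, index 18) is denied because our log is longer.  Only a
 * candidate reporting last_log_term > 5, or term 5 with
 * last_log_index >= 20, can receive our vote. */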

static void
raft_send_vote_reply(struct raft *raft, const struct uuid *dst,
                     const struct uuid *vote)
{
    union raft_rpc rpy = {
        .vote_reply = {
            .common = {
                .type = RAFT_RPC_VOTE_REPLY,
                .sid = *dst,
            },
            .term = raft->term,
            .vote = *vote,
        },
    };
    raft_send(raft, &rpy);
}

static void
raft_handle_vote_request(struct raft *raft,
                         const struct raft_vote_request *rq)
{
    if (raft_handle_vote_request__(raft, rq)) {
        raft_send_vote_reply(raft, &rq->common.sid, &raft->vote);
    }
}

static void
raft_handle_vote_reply(struct raft *raft,
                       const struct raft_vote_reply *rpy)
{
    if (!raft_receive_term__(raft, &rpy->common, rpy->term)) {
        return;
    }

    if (raft->role != RAFT_CANDIDATE) {
        return;
    }

    struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
    if (s) {
        raft_accept_vote(raft, s, &rpy->vote);
    }
}

/* Returns true if 'raft''s log contains reconfiguration entries that have not
 * yet been committed. */
static bool
raft_has_uncommitted_configuration(const struct raft *raft)
{
    for (uint64_t i = raft->commit_index + 1; i < raft->log_end; i++) {
        ovs_assert(i >= raft->log_start);
        const struct raft_entry *e = &raft->entries[i - raft->log_start];
        if (e->servers) {
            return true;
        }
    }
    return false;
}

static void
raft_log_reconfiguration(struct raft *raft)
{
    struct json *servers_json = raft_servers_to_json(&raft->servers);
    raft_command_unref(raft_command_execute__(
                           raft, NULL, servers_json, 0, NULL, NULL));
    json_destroy(servers_json);
}

static void
raft_run_reconfigure(struct raft *raft)
{
    ovs_assert(raft->role == RAFT_LEADER);

    /* Reconfiguration only progresses when configuration changes commit. */
    if (raft_has_uncommitted_configuration(raft)) {
        return;
    }

    /* If we were waiting for a configuration change to commit, it's done. */
    struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (s->phase == RAFT_PHASE_COMMITTING) {
            raft_send_add_server_reply__(raft, &s->sid, s->address,
                                         true, RAFT_SERVER_COMPLETED);
            s->phase = RAFT_PHASE_STABLE;
        }
    }
    if (raft->remove_server) {
        raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
                                        &raft->remove_server->requester_sid,
                                        raft->remove_server->requester_conn,
                                        true, RAFT_SERVER_COMPLETED);
        raft_server_destroy(raft->remove_server);
        raft->remove_server = NULL;
    }

    /* If a new server is caught up, add it to the configuration. */
    HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
        if (s->phase == RAFT_PHASE_CAUGHT_UP) {
            /* Move 's' from 'raft->add_servers' to 'raft->servers'. */
            hmap_remove(&raft->add_servers, &s->hmap_node);
            hmap_insert(&raft->servers, &s->hmap_node, uuid_hash(&s->sid));

            /* Mark 's' as waiting for commit. */
            s->phase = RAFT_PHASE_COMMITTING;

            raft_log_reconfiguration(raft);

            /* When commit completes we'll transition to RAFT_PHASE_STABLE and
             * send a RAFT_SERVER_OK reply. */

            return;
        }
    }

    /* Remove a server, if one is scheduled for removal. */
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (s->phase == RAFT_PHASE_REMOVE) {
            hmap_remove(&raft->servers, &s->hmap_node);
            raft->remove_server = s;

            raft_log_reconfiguration(raft);

            return;
        }
    }
}

static void
raft_handle_add_server_request(struct raft *raft,
                               const struct raft_add_server_request *rq)
{
    /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
    if (raft->role != RAFT_LEADER) {
        raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
        return;
    }

    /* Check for an existing server. */
    struct raft_server *s = raft_find_server(raft, &rq->common.sid);
    if (s) {
        /* If the server is scheduled to be removed, cancel it. */
        if (s->phase == RAFT_PHASE_REMOVE) {
            s->phase = RAFT_PHASE_STABLE;
            raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_CANCELED);
            return;
        }

        /* If the server is being added, then it's in progress.  (Return
         * here, so that we send only this one reply.) */
        if (s->phase != RAFT_PHASE_STABLE) {
            raft_send_add_server_reply(raft, rq,
                                       false, RAFT_SERVER_IN_PROGRESS);
            return;
        }

        /* Nothing to do--server is already part of the configuration. */
        raft_send_add_server_reply(raft, rq,
                                   true, RAFT_SERVER_ALREADY_PRESENT);
        return;
    }

    /* Check for a server being removed. */
    if (raft->remove_server
        && uuid_equals(&rq->common.sid, &raft->remove_server->sid)) {
        raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
        return;
    }

    /* Check for a server already being added. */
    if (raft_find_new_server(raft, &rq->common.sid)) {
        raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_IN_PROGRESS);
        return;
    }

    /* Add server to 'add_servers'. */
    s = raft_server_add(&raft->add_servers, &rq->common.sid, rq->address);
    raft_server_init_leader(raft, s);
    s->requester_sid = rq->common.sid;
    s->requester_conn = NULL;
    s->phase = RAFT_PHASE_CATCHUP;
    s->last_msg_ts = time_msec();

    /* Start sending the log.  If this is the first time we've tried to add
     * this server, then this will quickly degenerate into an InstallSnapshot
     * followed by a series of AppendEntries, but if it's a retry of an
     * earlier AddServer request that was interrupted (e.g. by a timeout or a
     * loss of leadership) then it will gracefully resume populating the log.
     *
     * See the last few paragraphs of section 4.2.1 for further insight. */
    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
    VLOG_INFO_RL(&rl,
                 "starting to add server %s ("SID_FMT" at %s) "
                 "to cluster "CID_FMT, s->nickname, SID_ARGS(&s->sid),
                 rq->address, CID_ARGS(&raft->cid));
    raft_send_append_request(raft, s, 0, "initialize new server");
}

static void
raft_handle_add_server_reply(struct raft *raft,
                             const struct raft_add_server_reply *rpy)
{
    if (!raft->joining) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        VLOG_WARN_RL(&rl, "received add_server_reply even though we're "
                     "already part of the cluster");
        return;
    }

    if (rpy->success) {
        raft->joining = false;

        /* It is tempting, at this point, to check that this server is part
         * of the current configuration.  However, this is not necessarily
         * the case, because the log entry that added this server to the
         * cluster might have been committed by a majority of the cluster
         * that does not include this one.  This actually happens in
         * testing. */
    } else {
        const char *address;
        SSET_FOR_EACH (address, &rpy->remote_addresses) {
            if (sset_add(&raft->remote_addresses, address)) {
                VLOG_INFO("%s: learned new server address for joining cluster",
                          address);
            }
        }
    }
}

/* This is called by raft_unixctl_kick() as well as via RPC. */
static void
raft_handle_remove_server_request(struct raft *raft,
                                  const struct raft_remove_server_request *rq)
{
    /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
    if (raft->role != RAFT_LEADER) {
        raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
        return;
    }

    /* If the server to remove is currently waiting to be added, cancel it. */
    struct raft_server *target = raft_find_new_server(raft, &rq->sid);
    if (target) {
        raft_send_add_server_reply__(raft, &target->sid, target->address,
                                     false, RAFT_SERVER_CANCELED);
        hmap_remove(&raft->add_servers, &target->hmap_node);
        raft_server_destroy(target);
        return;
    }

    /* If the server isn't configured, report that. */
    target = raft_find_server(raft, &rq->sid);
    if (!target) {
        raft_send_remove_server_reply(raft, rq,
                                      true, RAFT_SERVER_ALREADY_GONE);
        return;
    }

    /* Check whether we're waiting for the addition of the server to commit. */
    if (target->phase == RAFT_PHASE_COMMITTING) {
        raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
        return;
    }

    /* Check whether the server is already scheduled for removal. */
    if (target->phase == RAFT_PHASE_REMOVE) {
        raft_send_remove_server_reply(raft, rq,
                                      false, RAFT_SERVER_IN_PROGRESS);
        return;
    }

    /* Make sure that if we remove this server then at least one other server
     * will be left.  We don't count servers currently being added (in
     * 'add_servers') since those could fail. */
    struct raft_server *s;
    int n = 0;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (s != target && s->phase != RAFT_PHASE_REMOVE) {
            n++;
        }
    }
    if (!n) {
        raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_EMPTY);
        return;
    }

    /* Mark the server for removal. */
    target->phase = RAFT_PHASE_REMOVE;
    if (rq->requester_conn) {
        target->requester_sid = UUID_ZERO;
        unixctl_command_reply(rq->requester_conn, "started removal");
    } else {
        target->requester_sid = rq->common.sid;
        target->requester_conn = NULL;
    }

    raft_run_reconfigure(raft);
    /* Operation in progress, reply will be sent later. */
}

static void
raft_finished_leaving_cluster(struct raft *raft)
{
    VLOG_INFO(SID_FMT": finished leaving cluster "CID_FMT,
              SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));

    raft_record_note(raft, "left", "this server left the cluster");

    raft->leaving = false;
    raft->left = true;
}

static void
raft_handle_remove_server_reply(struct raft *raft,
                                const struct raft_remove_server_reply *rpc)
{
    if (rpc->success
        && (uuid_is_zero(&rpc->target_sid)
            || uuid_equals(&rpc->target_sid, &raft->sid))) {
        raft_finished_leaving_cluster(raft);
    }
}

static bool
raft_handle_write_error(struct raft *raft, struct ovsdb_error *error)
{
    if (error && !raft->failed) {
        raft->failed = true;

        char *s = ovsdb_error_to_string_free(error);
        VLOG_WARN("%s: entering failure mode due to I/O error (%s)",
                  raft->name, s);
        free(s);
    }
    return !raft->failed;
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_write_snapshot(struct raft *raft, struct ovsdb_log *log,
                    uint64_t new_log_start,
                    const struct raft_entry *new_snapshot)
{
    struct raft_header h = {
        .sid = raft->sid,
        .cid = raft->cid,
        .name = raft->name,
        .local_address = raft->local_address,
        .snap_index = new_log_start - 1,
        .snap = *new_snapshot,
    };
    struct ovsdb_error *error = ovsdb_log_write_and_free(
        log, raft_header_to_json(&h));
    if (error) {
        return error;
    }
    ovsdb_log_mark_base(raft->log);

    /* Write log records. */
    for (uint64_t index = new_log_start; index < raft->log_end; index++) {
        const struct raft_entry *e = &raft->entries[index - raft->log_start];
        struct raft_record r = {
            .type = RAFT_REC_ENTRY,
            .term = e->term,
            .entry = {
                .index = index,
                .data = e->data,
                .servers = e->servers,
                .election_timer = e->election_timer,
                .eid = e->eid,
            },
        };
        error = ovsdb_log_write_and_free(log, raft_record_to_json(&r));
        if (error) {
            return error;
        }
    }

    /* Write term and vote (if any).
     *
     * The term is redundant if we wrote a log record for that term above.
     * The vote, if any, is never redundant. */
    error = raft_write_state(log, raft->term, &raft->vote);
    if (error) {
        return error;
    }

    /* Write commit_index if it's beyond the new start of the log. */
    if (raft->commit_index >= new_log_start) {
        struct raft_record r = {
            .type = RAFT_REC_COMMIT_INDEX,
            .commit_index = raft->commit_index,
        };
        return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
    }
    return NULL;
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_save_snapshot(struct raft *raft,
                   uint64_t new_start, const struct raft_entry *new_snapshot)
{
    struct ovsdb_log *new_log;
    struct ovsdb_error *error;
    error = ovsdb_log_replace_start(raft->log, &new_log);
    if (error) {
        return error;
    }

    error = raft_write_snapshot(raft, new_log, new_start, new_snapshot);
    if (error) {
        ovsdb_log_replace_abort(new_log);
        return error;
    }

    return ovsdb_log_replace_commit(raft->log, new_log);
}
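
/* A note on the pattern above: ovsdb_log_replace_start() prepares a fresh
 * replacement log, raft_write_snapshot() fills it, and then either
 * ovsdb_log_replace_commit() swaps it in or ovsdb_log_replace_abort()
 * discards it.  Assuming the usual temporary-file-plus-rename implementation
 * of the replace operations, a crash in the middle of snapshotting leaves
 * the original log intact. */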

static bool
raft_handle_install_snapshot_request__(
    struct raft *raft, const struct raft_install_snapshot_request *rq)
{
    raft_reset_election_timer(raft);

    /*
     * Our behavior here depends on new_log_start in the snapshot compared to
     * log_start and log_end.  There are three cases:
     *
     *        Case 1       |    Case 2     |      Case 3
     *   <---------------->|<------------->|<------------------>
     *                     |               |
     *
     *                     +---+---+---+---+
     *                   T | T | T | T | T |
     *                     +---+---+---+---+
     *                       ^               ^
     *                       |               |
     *                   log_start        log_end
     */
    uint64_t new_log_start = rq->last_index + 1;
    if (new_log_start < raft->log_start) {
        /* Case 1: The new snapshot covers less than our current one.  Nothing
         * to do. */
        return true;
    } else if (new_log_start < raft->log_end) {
        /* Case 2: The new snapshot starts in the middle of our log.  We could
         * discard the first 'new_log_start - raft->log_start' entries in the
         * log.  But there's not much value in that, since snapshotting is
         * supposed to be a local decision.  Just skip it. */
        return true;
    }

    /* Case 3: The new snapshot starts past the end of our current log, so
     * discard all of our current log. */
    const struct raft_entry new_snapshot = {
        .term = rq->last_term,
        .data = rq->data,
        .eid = rq->last_eid,
        .servers = rq->last_servers,
        .election_timer = rq->election_timer,
    };
    struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
                                                   &new_snapshot);
    if (error) {
        char *error_s = ovsdb_error_to_string_free(error);
        VLOG_WARN("could not save snapshot: %s", error_s);
        free(error_s);
        return false;
    }

    for (size_t i = 0; i < raft->log_end - raft->log_start; i++) {
        raft_entry_uninit(&raft->entries[i]);
    }
    raft->log_start = raft->log_end = new_log_start;
    raft->log_synced = raft->log_end - 1;
    raft->commit_index = raft->log_start - 1;
    if (raft->last_applied < raft->commit_index) {
        raft->last_applied = raft->log_start - 2;
    }

    raft_entry_uninit(&raft->snap);
    raft_entry_clone(&raft->snap, &new_snapshot);

    raft_get_servers_from_log(raft, VLL_INFO);
    raft_get_election_timer_from_log(raft);

    return true;
}

static void
raft_handle_install_snapshot_request(
    struct raft *raft, const struct raft_install_snapshot_request *rq)
{
    if (raft_handle_install_snapshot_request__(raft, rq)) {
        union raft_rpc rpy = {
            .install_snapshot_reply = {
                .common = {
                    .type = RAFT_RPC_INSTALL_SNAPSHOT_REPLY,
                    .sid = rq->common.sid,
                },
                .term = raft->term,
                .last_index = rq->last_index,
                .last_term = rq->last_term,
            },
        };
        raft_send(raft, &rpy);
    }
}

static void
raft_handle_install_snapshot_reply(
    struct raft *raft, const struct raft_install_snapshot_reply *rpy)
{
    /* We might get an InstallSnapshot reply from a configured server (e.g. a
     * peer) or a server in the process of being added. */
    struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
    if (!s) {
        s = raft_find_new_server(raft, &rpy->common.sid);
        if (!s) {
            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
            VLOG_INFO_RL(&rl, "cluster "CID_FMT": received %s from "
                         "unknown server "SID_FMT, CID_ARGS(&raft->cid),
                         raft_rpc_type_to_string(rpy->common.type),
                         SID_ARGS(&rpy->common.sid));
            return;
        }
    }

    s->install_snapshot_request_in_progress = false;

    if (rpy->last_index != raft->log_start - 1 ||
        rpy->last_term != raft->snap.term) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        VLOG_INFO_RL(&rl, "cluster "CID_FMT": server %s installed "
                     "out-of-date snapshot, starting over",
                     CID_ARGS(&raft->cid), s->nickname);
        raft_send_install_snapshot_request(raft, s,
                                           "installed obsolete snapshot");
        return;
    }

    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
    VLOG_INFO_RL(&rl, "cluster "CID_FMT": installed snapshot on server %s "
                 "up to %"PRIu64":%"PRIu64, CID_ARGS(&raft->cid),
                 s->nickname, rpy->last_term, rpy->last_index);
    s->next_index = raft->log_start;
    raft_send_append_request(raft, s, raft->log_end - s->next_index,
                             "snapshot installed");
}

/* Returns true if 'raft' has grown enough since the last snapshot that
 * reducing the log to a snapshot would be valuable, false otherwise. */
bool
raft_grew_lots(const struct raft *raft)
{
    return ovsdb_log_grew_lots(raft->log);
}

/* Returns the number of log entries that could be trimmed off the on-disk log
 * by snapshotting. */
uint64_t
raft_get_log_length(const struct raft *raft)
{
    return (raft->last_applied < raft->log_start
            ? 0
            : raft->last_applied - raft->log_start + 1);
}

/* Returns true if taking a snapshot of 'raft', with raft_store_snapshot(), is
 * possible. */
bool
raft_may_snapshot(const struct raft *raft)
{
    return (!raft->joining
            && !raft->leaving
            && !raft->left
            && !raft->failed
            && raft->last_applied >= raft->log_start);
}

/* Replaces the log for 'raft', up to the last log entry read, by
 * 'new_snapshot_data'.  Returns NULL if successful, otherwise an error that
 * the caller must eventually free.
 *
 * This function can only succeed if raft_may_snapshot() returns true.  It is
 * only valuable to call it if raft_get_log_length() is significant and
 * especially if raft_grew_lots() returns true. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
{
    if (raft->joining) {
        return ovsdb_error(NULL,
                           "cannot store a snapshot while joining cluster");
    } else if (raft->leaving) {
        return ovsdb_error(NULL,
                           "cannot store a snapshot while leaving cluster");
    } else if (raft->left) {
        return ovsdb_error(NULL,
                           "cannot store a snapshot after leaving cluster");
    } else if (raft->failed) {
        return ovsdb_error(NULL,
                           "cannot store a snapshot following failure");
    }

    if (raft->last_applied < raft->log_start) {
        return ovsdb_error(NULL, "not storing a duplicate snapshot");
    }

    uint64_t new_log_start = raft->last_applied + 1;
    struct raft_entry new_snapshot = {
        .term = raft_get_term(raft, new_log_start - 1),
        .data = json_clone(new_snapshot_data),
        .eid = *raft_get_eid(raft, new_log_start - 1),
        .servers = json_clone(raft_servers_for_index(raft, new_log_start - 1)),
        .election_timer = raft->election_timer,
    };
    struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
                                                   &new_snapshot);
    if (error) {
        raft_entry_uninit(&new_snapshot);
        return error;
    }

    raft->log_synced = raft->log_end - 1;
    raft_entry_uninit(&raft->snap);
    raft->snap = new_snapshot;
    for (size_t i = 0; i < new_log_start - raft->log_start; i++) {
        raft_entry_uninit(&raft->entries[i]);
    }
    memmove(&raft->entries[0], &raft->entries[new_log_start - raft->log_start],
            (raft->log_end - new_log_start) * sizeof *raft->entries);
    raft->log_start = new_log_start;
    return NULL;
}
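
/* A minimal sketch of the intended calling pattern (the real caller lives in
 * ovsdb-server's compaction logic; 'db_to_json' and the '> 100' threshold
 * below are hypothetical stand-ins for whatever produces the snapshot data
 * and decides when compaction is worthwhile):
 *
 *     if (raft_may_snapshot(raft)
 *         && (raft_grew_lots(raft) || raft_get_log_length(raft) > 100)) {
 *         struct json *data = db_to_json(db);
 *         struct ovsdb_error *error = raft_store_snapshot(raft, data);
 *         json_destroy(data);
 *         if (error) {
 *             char *s = ovsdb_error_to_string_free(error);
 *             VLOG_WARN("snapshot failed: %s", s);
 *             free(s);
 *         }
 *     }
 */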

static void
raft_handle_become_leader(struct raft *raft,
                          const struct raft_become_leader *rq)
{
    if (raft->role == RAFT_FOLLOWER) {
        char buf[SID_LEN + 1];
        VLOG_INFO("received leadership transfer from %s in term %"PRIu64,
                  raft_get_nickname(raft, &rq->common.sid, buf, sizeof buf),
                  rq->term);
        raft_start_election(raft, true);
    }
}

static void
raft_send_execute_command_reply(struct raft *raft,
                                const struct uuid *sid,
                                const struct uuid *eid,
                                enum raft_command_status status,
                                uint64_t commit_index)
{
    if (failure_test == FT_CRASH_BEFORE_SEND_EXEC_REP) {
        ovs_fatal(0, "Raft test: crash before sending execute_command_reply");
    }
    union raft_rpc rpc = {
        .execute_command_reply = {
            .common = {
                .type = RAFT_RPC_EXECUTE_COMMAND_REPLY,
                .sid = *sid,
            },
            .result = *eid,
            .status = status,
            .commit_index = commit_index,
        },
    };
    raft_send(raft, &rpc);
    if (failure_test == FT_CRASH_AFTER_SEND_EXEC_REP) {
        ovs_fatal(0, "Raft test: crash after sending execute_command_reply.");
    }
}

static enum raft_command_status
raft_handle_execute_command_request__(
    struct raft *raft, const struct raft_execute_command_request *rq)
{
    if (raft->role != RAFT_LEADER) {
        return RAFT_CMD_NOT_LEADER;
    }

    const struct uuid *current_eid = raft_current_eid(raft);
    if (!uuid_equals(&rq->prereq, current_eid)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
                     "prerequisite "UUID_FMT" in execute_command_request",
                     UUID_ARGS(current_eid), UUID_ARGS(&rq->prereq));
        return RAFT_CMD_BAD_PREREQ;
    }

    struct raft_command *cmd = raft_command_initiate(raft, rq->data,
                                                     NULL, 0, &rq->result);
    cmd->sid = rq->common.sid;

    enum raft_command_status status = cmd->status;
    raft_command_unref(cmd);
    return status;
}

static void
raft_handle_execute_command_request(
    struct raft *raft, const struct raft_execute_command_request *rq)
{
    enum raft_command_status status
        = raft_handle_execute_command_request__(raft, rq);
    if (status != RAFT_CMD_INCOMPLETE) {
        raft_send_execute_command_reply(raft, &rq->common.sid, &rq->result,
                                        status, 0);
    }
}

static void
raft_handle_execute_command_reply(
    struct raft *raft, const struct raft_execute_command_reply *rpy)
{
    struct raft_command *cmd = raft_find_command_by_eid(raft, &rpy->result);
    if (!cmd) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        char buf[SID_LEN + 1];
        VLOG_INFO_RL(&rl,
                     "%s received \"%s\" reply from %s for unknown command",
                     raft->local_nickname,
                     raft_command_status_to_string(rpy->status),
                     raft_get_nickname(raft, &rpy->common.sid,
                                       buf, sizeof buf));
        return;
    }

    if (rpy->status == RAFT_CMD_INCOMPLETE) {
        cmd->timestamp = time_msec();
    } else {
        cmd->index = rpy->commit_index;
        raft_command_complete(raft, cmd, rpy->status);
    }
}
4286static void
4287raft_handle_rpc(struct raft *raft, const union raft_rpc *rpc)
4288{
e8451e14
LB
4289 struct raft_server *s = raft_find_server(raft, &rpc->common.sid);
4290 if (s) {
4291 s->last_msg_ts = time_msec();
4292 }
4293
1b1d2e6d
BP
4294 uint64_t term = raft_rpc_get_term(rpc);
4295 if (term
4296 && !raft_should_suppress_disruptive_server(raft, rpc)
4297 && !raft_receive_term__(raft, &rpc->common, term)) {
4298 if (rpc->type == RAFT_RPC_APPEND_REQUEST) {
4299 /* Section 3.3: "If a server receives a request with a stale term
4300 * number, it rejects the request." */
4301 raft_send_append_reply(raft, raft_append_request_cast(rpc),
4302 RAFT_APPEND_INCONSISTENCY, "stale term");
4303 }
4304 return;
4305 }
4306
4307 switch (rpc->type) {
4308#define RAFT_RPC(ENUM, NAME) \
4309 case ENUM: \
4310 raft_handle_##NAME(raft, &rpc->NAME); \
4311 break;
4312 RAFT_RPC_TYPES
4313#undef RAFT_RPC
4314 default:
4315 OVS_NOT_REACHED();
4316 }
4317}
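
/* For reference, each RAFT_RPC(ENUM, NAME) invocation above expands to one
 * switch case; for example, for the vote request type it generates:
 *
 *     case RAFT_RPC_VOTE_REQUEST:
 *         raft_handle_vote_request(raft, &rpc->vote_request);
 *         break;
 */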

static bool
raft_rpc_is_heartbeat(const union raft_rpc *rpc)
{
    return ((rpc->type == RAFT_RPC_APPEND_REQUEST
             || rpc->type == RAFT_RPC_APPEND_REPLY)
            && rpc->common.comment
            && !strcmp(rpc->common.comment, "heartbeat"));
}

static bool
raft_send_to_conn_at(struct raft *raft, const union raft_rpc *rpc,
                     struct raft_conn *conn, int line_number)
{
    log_rpc(rpc, "-->", conn, line_number);
    return !jsonrpc_session_send(
        conn->js, raft_rpc_to_jsonrpc(&raft->cid, &raft->sid, rpc));
}

static bool
raft_is_rpc_synced(const struct raft *raft, const union raft_rpc *rpc)
{
    uint64_t term = raft_rpc_get_term(rpc);
    uint64_t index = raft_rpc_get_min_sync_index(rpc);
    const struct uuid *vote = raft_rpc_get_vote(rpc);

    return (term <= raft->synced_term
            && index <= raft->log_synced
            && (!vote || uuid_equals(vote, &raft->synced_vote)));
}

static bool
raft_send_at(struct raft *raft, const union raft_rpc *rpc, int line_number)
{
    const struct uuid *dst = &rpc->common.sid;
    if (uuid_equals(dst, &raft->sid)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
        VLOG_WARN_RL(&rl, "attempted to send RPC to self from raft.c:%d",
                     line_number);
        return false;
    }

    struct raft_conn *conn = raft_find_conn_by_sid(raft, dst);
    if (!conn) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
        char buf[SID_LEN + 1];
        VLOG_DBG_RL(&rl, "%s: no connection to %s, cannot send RPC "
                    "from raft.c:%d", raft->local_nickname,
                    raft_get_nickname(raft, dst, buf, sizeof buf),
                    line_number);
        return false;
    }

    if (!raft_is_rpc_synced(raft, rpc)) {
        raft_waiter_create(raft, RAFT_W_RPC, false)->rpc = raft_rpc_clone(rpc);
        return true;
    }

    return raft_send_to_conn_at(raft, rpc, conn, line_number);
}

static struct raft *
raft_lookup_by_name(const char *name)
{
    struct raft *raft;

    HMAP_FOR_EACH_WITH_HASH (raft, hmap_node, hash_string(name, 0),
                             &all_rafts) {
        if (!strcmp(raft->name, name)) {
            return raft;
        }
    }
    return NULL;
}

static void
raft_unixctl_cid(struct unixctl_conn *conn,
                 int argc OVS_UNUSED, const char *argv[],
                 void *aux OVS_UNUSED)
{
    struct raft *raft = raft_lookup_by_name(argv[1]);
    if (!raft) {
        unixctl_command_reply_error(conn, "unknown cluster");
    } else if (uuid_is_zero(&raft->cid)) {
        unixctl_command_reply_error(conn, "cluster id not yet known");
    } else {
        char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->cid));
        unixctl_command_reply(conn, uuid);
        free(uuid);
    }
}

static void
raft_unixctl_sid(struct unixctl_conn *conn,
                 int argc OVS_UNUSED, const char *argv[],
                 void *aux OVS_UNUSED)
{
    struct raft *raft = raft_lookup_by_name(argv[1]);
    if (!raft) {
        unixctl_command_reply_error(conn, "unknown cluster");
    } else {
        char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->sid));
        unixctl_command_reply(conn, uuid);
        free(uuid);
    }
}

static void
raft_put_sid(const char *title, const struct uuid *sid,
             const struct raft *raft, struct ds *s)
{
    ds_put_format(s, "%s: ", title);
    if (uuid_equals(sid, &raft->sid)) {
        ds_put_cstr(s, "self");
    } else if (uuid_is_zero(sid)) {
        ds_put_cstr(s, "unknown");
    } else {
        char buf[SID_LEN + 1];
        ds_put_cstr(s, raft_get_nickname(raft, sid, buf, sizeof buf));
    }
    ds_put_char(s, '\n');
}

static void
raft_unixctl_status(struct unixctl_conn *conn,
                    int argc OVS_UNUSED, const char *argv[],
                    void *aux OVS_UNUSED)
{
    struct raft *raft = raft_lookup_by_name(argv[1]);
    if (!raft) {
        unixctl_command_reply_error(conn, "unknown cluster");
        return;
    }

    struct ds s = DS_EMPTY_INITIALIZER;
    ds_put_format(&s, "%s\n", raft->local_nickname);
    ds_put_format(&s, "Name: %s\n", raft->name);
    ds_put_format(&s, "Cluster ID: ");
    if (!uuid_is_zero(&raft->cid)) {
        ds_put_format(&s, CID_FMT" ("UUID_FMT")\n",
                      CID_ARGS(&raft->cid), UUID_ARGS(&raft->cid));
    } else {
        ds_put_format(&s, "not yet known\n");
    }
    ds_put_format(&s, "Server ID: "SID_FMT" ("UUID_FMT")\n",
                  SID_ARGS(&raft->sid), UUID_ARGS(&raft->sid));
    ds_put_format(&s, "Address: %s\n", raft->local_address);
    ds_put_format(&s, "Status: %s\n",
                  raft->joining ? "joining cluster"
                  : raft->leaving ? "leaving cluster"
                  : raft->left ? "left cluster"
                  : raft->failed ? "failed"
                  : "cluster member");
    if (raft->joining) {
        ds_put_format(&s, "Remotes for joining:");
        const char *address;
        SSET_FOR_EACH (address, &raft->remote_addresses) {
            ds_put_format(&s, " %s", address);
        }
        ds_put_char(&s, '\n');
    }
    if (raft->role == RAFT_LEADER) {
        struct raft_server *as;
        HMAP_FOR_EACH (as, hmap_node, &raft->add_servers) {
            ds_put_format(&s, "Adding server %s ("SID_FMT" at %s) (%s)\n",
                          as->nickname, SID_ARGS(&as->sid), as->address,
                          raft_server_phase_to_string(as->phase));
        }

        struct raft_server *rs = raft->remove_server;
        if (rs) {
            ds_put_format(&s, "Removing server %s ("SID_FMT" at %s) (%s)\n",
                          rs->nickname, SID_ARGS(&rs->sid), rs->address,
                          raft_server_phase_to_string(rs->phase));
        }
    }

    ds_put_format(&s, "Role: %s\n",
                  raft->role == RAFT_LEADER ? "leader"
                  : raft->role == RAFT_CANDIDATE ? "candidate"
                  : raft->role == RAFT_FOLLOWER ? "follower"
                  : "<error>");
    ds_put_format(&s, "Term: %"PRIu64"\n", raft->term);
    raft_put_sid("Leader", &raft->leader_sid, raft, &s);
    raft_put_sid("Vote", &raft->vote, raft, &s);
    ds_put_char(&s, '\n');

    if (raft->election_start) {
        ds_put_format(&s,
                      "Last Election started %"PRIu64" ms ago, reason: %s\n",
                      (uint64_t) (time_msec() - raft->election_start),
                      raft->leadership_transfer
                      ? "leadership_transfer" : "timeout");
    }
    if (raft->election_won) {
        ds_put_format(&s, "Last Election won: %"PRIu64" ms ago\n",
                      (uint64_t) (time_msec() - raft->election_won));
    }
    ds_put_format(&s, "Election timer: %"PRIu64, raft->election_timer);
    if (raft->role == RAFT_LEADER && raft->election_timer_new) {
        ds_put_format(&s, " (changing to %"PRIu64")",
                      raft->election_timer_new);
    }
    ds_put_char(&s, '\n');

    ds_put_format(&s, "Log: [%"PRIu64", %"PRIu64"]\n",
                  raft->log_start, raft->log_end);

    uint64_t n_uncommitted = raft->log_end - raft->commit_index - 1;
    ds_put_format(&s, "Entries not yet committed: %"PRIu64"\n", n_uncommitted);

    uint64_t n_unapplied = raft->log_end - raft->last_applied - 1;
    ds_put_format(&s, "Entries not yet applied: %"PRIu64"\n", n_unapplied);

    const struct raft_conn *c;
    ds_put_cstr(&s, "Connections:");
    LIST_FOR_EACH (c, list_node, &raft->conns) {
        bool connected = jsonrpc_session_is_connected(c->js);
        ds_put_format(&s, " %s%s%s%s",
                      connected ? "" : "(",
                      c->incoming ? "<-" : "->", c->nickname,
                      connected ? "" : ")");
    }
    ds_put_char(&s, '\n');

    ds_put_format(&s, "Disconnections: %u\n", raft->n_disconnections);

    ds_put_cstr(&s, "Servers:\n");
    struct raft_server *server;
    HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
        ds_put_format(&s, "    %s ("SID_FMT" at %s)",
                      server->nickname,
                      SID_ARGS(&server->sid), server->address);
        if (uuid_equals(&server->sid, &raft->sid)) {
            ds_put_cstr(&s, " (self)");
        }
        if (server->phase != RAFT_PHASE_STABLE) {
            ds_put_format(&s, " (%s)",
                          raft_server_phase_to_string(server->phase));
        }
        if (raft->role == RAFT_CANDIDATE) {
            if (!uuid_is_zero(&server->vote)) {
                char buf[SID_LEN + 1];
                ds_put_format(&s, " (voted for %s)",
                              raft_get_nickname(raft, &server->vote,
                                                buf, sizeof buf));
            }
        } else if (raft->role == RAFT_LEADER) {
            ds_put_format(&s, " next_index=%"PRIu64" match_index=%"PRIu64,
                          server->next_index, server->match_index);
        }
        if (server->last_msg_ts) {
            ds_put_format(&s, " last msg %"PRIu64" ms ago",
                          (uint64_t) (time_msec() - server->last_msg_ts));
        }
        ds_put_char(&s, '\n');
    }

    unixctl_command_reply(conn, ds_cstr(&s));
    ds_destroy(&s);
}
4581static void
4582raft_unixctl_leave__(struct unixctl_conn *conn, struct raft *raft)
4583{
4584 if (raft_is_leaving(raft)) {
4585 unixctl_command_reply_error(conn,
4586 "already in progress leaving cluster");
4587 } else if (raft_is_joining(raft)) {
4588 unixctl_command_reply_error(conn,
4589 "can't leave while join in progress");
4590 } else if (raft_failed(raft)) {
4591 unixctl_command_reply_error(conn,
4592 "can't leave after failure");
4593 } else {
4594 raft_leave(raft);
4595 unixctl_command_reply(conn, NULL);
4596 }
4597}
4598
4599static void
4600raft_unixctl_leave(struct unixctl_conn *conn, int argc OVS_UNUSED,
4601 const char *argv[], void *aux OVS_UNUSED)
4602{
4603 struct raft *raft = raft_lookup_by_name(argv[1]);
4604 if (!raft) {
4605 unixctl_command_reply_error(conn, "unknown cluster");
4606 return;
4607 }
4608
4609 raft_unixctl_leave__(conn, raft);
4610}
4611
static struct raft_server *
raft_lookup_server_best_match(struct raft *raft, const char *id)
{
    struct raft_server *best = NULL;
    int best_score = -1;
    int n_best = 0;

    struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        int score = (!strcmp(id, s->address)
                     ? INT_MAX
                     : uuid_is_partial_match(&s->sid, id));
        if (score > best_score) {
            best = s;
            best_score = score;
            n_best = 1;
        } else if (score == best_score) {
            n_best++;
        }
    }
    return n_best == 1 ? best : NULL;
}
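
/* Matching example (the server IDs are hypothetical): given servers whose
 * SIDs begin "4b6a..." and "4bf0...", the query "4b" scores the same for
 * both, so n_best == 2 and the lookup fails as ambiguous; "4b6a" uniquely
 * matches the first server; and an exact address such as
 * "tcp:10.0.0.2:6644" always wins because it scores INT_MAX. */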

static void
raft_unixctl_kick(struct unixctl_conn *conn, int argc OVS_UNUSED,
                  const char *argv[], void *aux OVS_UNUSED)
{
    const char *cluster_name = argv[1];
    const char *server_name = argv[2];

    struct raft *raft = raft_lookup_by_name(cluster_name);
    if (!raft) {
        unixctl_command_reply_error(conn, "unknown cluster");
        return;
    }

    struct raft_server *server = raft_lookup_server_best_match(raft,
                                                               server_name);
    if (!server) {
        unixctl_command_reply_error(conn, "unknown server");
        return;
    }

    if (uuid_equals(&server->sid, &raft->sid)) {
        raft_unixctl_leave__(conn, raft);
    } else if (raft->role == RAFT_LEADER) {
        const struct raft_remove_server_request rq = {
            .sid = server->sid,
            .requester_conn = conn,
        };
        raft_handle_remove_server_request(raft, &rq);
    } else {
        const union raft_rpc rpc = {
            .remove_server_request = {
                .common = {
                    .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
                    .sid = raft->leader_sid,
                    .comment = "via unixctl"
                },
                .sid = server->sid,
            }
        };
        if (raft_send(raft, &rpc)) {
            unixctl_command_reply(conn, "sent removal request to leader");
        } else {
            unixctl_command_reply_error(conn,
                                        "failed to send removal request");
        }
    }
}
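
/* Example invocation (the control socket path varies by installation):
 *
 *     ovs-appctl -t <ctl-socket> cluster/kick DB 4b6a
 *
 * SERVER may be a full or abbreviated server ID or an exact server address;
 * an abbreviation that matches more than one server is rejected as
 * "unknown server" by raft_lookup_server_best_match() above. */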

static void
raft_get_election_timer_from_log(struct raft *raft)
{
    if (raft->snap.election_timer) {
        raft->election_timer = raft->snap.election_timer;
    }
    for (uint64_t index = raft->commit_index; index >= raft->log_start;
         index--) {
        struct raft_entry *e = &raft->entries[index - raft->log_start];
        if (e->election_timer) {
            raft->election_timer = e->election_timer;
            break;
        }
    }
}

static void
raft_log_election_timer(struct raft *raft)
{
    raft_command_unref(raft_command_execute__(raft, NULL, NULL,
                                              raft->election_timer_new, NULL,
                                              NULL));
}

static void
raft_unixctl_change_election_timer(struct unixctl_conn *conn,
                                   int argc OVS_UNUSED, const char *argv[],
                                   void *aux OVS_UNUSED)
{
    const char *cluster_name = argv[1];
    const char *election_timer_str = argv[2];

    struct raft *raft = raft_lookup_by_name(cluster_name);
    if (!raft) {
        unixctl_command_reply_error(conn, "unknown cluster");
        return;
    }

    if (raft->role != RAFT_LEADER) {
        unixctl_command_reply_error(conn, "election timer must be changed"
                                    " through leader.");
        return;
    }

    /* If an election timer change is already pending, reject this one. */
    if (raft->election_timer_new) {
        unixctl_command_reply_error(conn, "election timer change pending.");
        return;
    }

    uint64_t election_timer = atoll(election_timer_str);
    if (election_timer == raft->election_timer) {
        unixctl_command_reply(conn, "change election timer to current value.");
        return;
    }

    /* An election timer smaller than 100 ms or bigger than 10 min doesn't
     * make sense. */
    if (election_timer < 100 || election_timer > 600000) {
        unixctl_command_reply_error(conn, "election timer must be between "
                                    "100 and 600000, in msec.");
        return;
    }

    /* If the election timer is to be enlarged, it should be done gradually,
     * so that the new value doesn't cause a timeout when it has been applied
     * on the leader but not yet on some of the followers. */
    if (election_timer > raft->election_timer * 2) {
        unixctl_command_reply_error(conn, "election timer increase should not "
                                    "exceed the current value x 2.");
        return;
    }

    raft->election_timer_new = election_timer;
    raft_log_election_timer(raft);
    unixctl_command_reply(conn, "change of election timer initiated.");
}
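
/* Example: raising a 1000 ms election timer to 4000 ms takes two steps,
 * because each increase is capped at 2x the current value:
 *
 *     ovs-appctl -t <ctl-socket> cluster/change-election-timer DB 2000
 *     (wait for the change to commit)
 *     ovs-appctl -t <ctl-socket> cluster/change-election-timer DB 4000
 */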

static void
raft_unixctl_set_backlog_threshold(struct unixctl_conn *conn,
                                   int argc OVS_UNUSED, const char *argv[],
                                   void *aux OVS_UNUSED)
{
    const char *cluster_name = argv[1];
    unsigned long long n_msgs, n_bytes;
    struct raft_conn *r_conn;

    struct raft *raft = raft_lookup_by_name(cluster_name);
    if (!raft) {
        unixctl_command_reply_error(conn, "unknown cluster");
        return;
    }

    if (!str_to_ullong(argv[2], 10, &n_msgs)
        || !str_to_ullong(argv[3], 10, &n_bytes)) {
        unixctl_command_reply_error(conn, "invalid argument");
        return;
    }

    if (n_msgs < 50 || n_msgs > SIZE_MAX || n_bytes > SIZE_MAX) {
        unixctl_command_reply_error(conn, "values out of range");
        return;
    }

    raft->conn_backlog_max_n_msgs = n_msgs;
    raft->conn_backlog_max_n_bytes = n_bytes;

    LIST_FOR_EACH (r_conn, list_node, &raft->conns) {
        jsonrpc_session_set_backlog_threshold(r_conn->js, n_msgs, n_bytes);
    }

    unixctl_command_reply(conn, NULL);
}
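
/* Example (the values are illustrative): cap each Raft connection's send
 * backlog at 500 messages or 4 MB:
 *
 *     ovs-appctl -t <ctl-socket> cluster/set-backlog-threshold DB 500 4194304
 *
 * N_MSGS below 50 is rejected above, so the threshold cannot be made
 * uselessly small. */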

static void
raft_unixctl_failure_test(struct unixctl_conn *conn,
                          int argc OVS_UNUSED, const char *argv[],
                          void *aux OVS_UNUSED)
{
    const char *test = argv[1];
    if (!strcmp(test, "crash-before-sending-append-request")) {
        failure_test = FT_CRASH_BEFORE_SEND_APPEND_REQ;
    } else if (!strcmp(test, "crash-after-sending-append-request")) {
        failure_test = FT_CRASH_AFTER_SEND_APPEND_REQ;
    } else if (!strcmp(test, "crash-before-sending-execute-command-reply")) {
        failure_test = FT_CRASH_BEFORE_SEND_EXEC_REP;
    } else if (!strcmp(test, "crash-after-sending-execute-command-reply")) {
        failure_test = FT_CRASH_AFTER_SEND_EXEC_REP;
    } else if (!strcmp(test, "crash-before-sending-execute-command-request")) {
        failure_test = FT_CRASH_BEFORE_SEND_EXEC_REQ;
    } else if (!strcmp(test, "crash-after-sending-execute-command-request")) {
        failure_test = FT_CRASH_AFTER_SEND_EXEC_REQ;
    } else if (!strcmp(test, "crash-after-receiving-append-request-update")) {
        failure_test = FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE;
    } else if (!strcmp(test, "delay-election")) {
        failure_test = FT_DELAY_ELECTION;
        struct raft *raft;
        HMAP_FOR_EACH (raft, hmap_node, &all_rafts) {
            if (raft->role == RAFT_FOLLOWER) {
                raft_reset_election_timer(raft);
            }
        }
    } else if (!strcmp(test, "dont-send-vote-request")) {
        failure_test = FT_DONT_SEND_VOTE_REQUEST;
    } else if (!strcmp(test, "clear")) {
        failure_test = FT_NO_TEST;
        unixctl_command_reply(conn, "test dismissed");
        return;
    } else {
        unixctl_command_reply_error(conn, "unknown test scenario");
        return;
    }
    unixctl_command_reply(conn, "test engaged");
}
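
/* Example (test-only; most scenarios deliberately crash the server and are
 * meant for the OVSDB clustering test suite, not for production use):
 *
 *     ovs-appctl -t <ctl-socket> cluster/failure-test delay-election
 *     ovs-appctl -t <ctl-socket> cluster/failure-test clear
 */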

static void
raft_init(void)
{
    static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
    if (!ovsthread_once_start(&once)) {
        return;
    }
    unixctl_command_register("cluster/cid", "DB", 1, 1,
                             raft_unixctl_cid, NULL);
    unixctl_command_register("cluster/sid", "DB", 1, 1,
                             raft_unixctl_sid, NULL);
    unixctl_command_register("cluster/status", "DB", 1, 1,
                             raft_unixctl_status, NULL);
    unixctl_command_register("cluster/leave", "DB", 1, 1,
                             raft_unixctl_leave, NULL);
    unixctl_command_register("cluster/kick", "DB SERVER", 2, 2,
                             raft_unixctl_kick, NULL);
    unixctl_command_register("cluster/change-election-timer", "DB TIME", 2, 2,
                             raft_unixctl_change_election_timer, NULL);
    unixctl_command_register("cluster/set-backlog-threshold",
                             "DB N_MSGS N_BYTES", 3, 3,
                             raft_unixctl_set_backlog_threshold, NULL);
    unixctl_command_register("cluster/failure-test", "FAILURE SCENARIO", 1, 1,
                             raft_unixctl_failure_test, NULL);
    ovsthread_once_done(&once);
}