/*
 * Copyright (c) 2017, 2018 Nicira, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <config.h>

#include "raft.h"
#include "raft-private.h"

#include <errno.h>
#include <unistd.h>

#include "hash.h"
#include "jsonrpc.h"
#include "lockfile.h"
#include "openvswitch/dynamic-string.h"
#include "openvswitch/hmap.h"
#include "openvswitch/json.h"
#include "openvswitch/list.h"
#include "openvswitch/poll-loop.h"
#include "openvswitch/vlog.h"
#include "ovsdb-error.h"
#include "ovsdb-parser.h"
#include "ovsdb/log.h"
#include "raft-rpc.h"
#include "random.h"
#include "simap.h"
#include "socket-util.h"
#include "stream.h"
#include "timeval.h"
#include "unicode.h"
#include "unixctl.h"
#include "util.h"
#include "uuid.h"

VLOG_DEFINE_THIS_MODULE(raft);

/* Roles for a Raft server:
 *
 *    - Follower: A server in touch with the current leader.
 *
 *    - Candidate: A server unaware of a current leader and seeking election
 *      to leader.
 *
 *    - Leader: Handles all client requests.  At most one leader exists at a
 *      time.
 *
 * In normal operation there is exactly one leader and all of the other
 * servers are followers. */
enum raft_role {
    RAFT_FOLLOWER,
    RAFT_CANDIDATE,
    RAFT_LEADER
};

/* Flags for unit tests. */
enum raft_failure_test {
    FT_NO_TEST,
    FT_CRASH_BEFORE_SEND_APPEND_REQ,
    FT_CRASH_AFTER_SEND_APPEND_REQ,
    FT_CRASH_BEFORE_SEND_EXEC_REP,
    FT_CRASH_AFTER_SEND_EXEC_REP,
    FT_CRASH_BEFORE_SEND_EXEC_REQ,
    FT_CRASH_AFTER_SEND_EXEC_REQ,
    FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE,
    FT_DELAY_ELECTION,
    FT_DONT_SEND_VOTE_REQUEST
};
static enum raft_failure_test failure_test;

/* A connection between this Raft server and another one. */
struct raft_conn {
    struct ovs_list list_node;  /* In struct raft's 'conns' list. */
    struct jsonrpc_session *js; /* JSON-RPC connection. */
    struct uuid sid;            /* The remote server's unique ID. */
    char *nickname;             /* Short name for use in log messages. */
    bool incoming;              /* True if incoming, false if outgoing. */
    unsigned int js_seqno;      /* Seqno for noticing (re)connections. */
};

static void raft_conn_close(struct raft_conn *);

/* A "command", that is, a request to append an entry to the log.
 *
 * The Raft specification only allows clients to issue commands to the leader.
 * With this implementation, clients may issue a command on any server, which
 * then relays the command to the leader if necessary.
 *
 * This structure is thus used in three cases:
 *
 *     1. We are the leader and the command was issued to us directly.
 *
 *     2. We are a follower and relayed the command to the leader.
 *
 *     3. We are the leader and a follower relayed the command to us.
 */
struct raft_command {
    /* All cases. */
    struct hmap_node hmap_node; /* In struct raft's 'commands' hmap. */
    unsigned int n_refs;        /* Reference count. */
    enum raft_command_status status; /* Execution status. */
    struct uuid eid;            /* Entry ID of result. */

    /* Case 1 only. */
    uint64_t index;             /* Index in log (0 if being relayed). */

    /* Case 2 only. */
    long long int timestamp;    /* Issue or last ping time, for expiration. */

    /* Case 3 only. */
    struct uuid sid;            /* The follower (otherwise UUID_ZERO). */
};

static void raft_command_complete(struct raft *, struct raft_command *,
                                  enum raft_command_status);

static void raft_complete_all_commands(struct raft *,
                                       enum raft_command_status);

/* Type of deferred action, see struct raft_waiter. */
enum raft_waiter_type {
    RAFT_W_ENTRY,
    RAFT_W_TERM,
    RAFT_W_RPC,
};

/* An action deferred until a log write commits to disk. */
struct raft_waiter {
    struct ovs_list list_node;
    uint64_t commit_ticket;

    enum raft_waiter_type type;
    union {
        /* RAFT_W_ENTRY.
         *
         * Waits for a RAFT_REC_ENTRY write to our local log to commit.  Upon
         * completion, updates 'log_synced' to indicate that the new log entry
         * or entries are committed and, if we are leader, also updates our
         * local 'match_index'. */
        struct {
            uint64_t index;
        } entry;

        /* RAFT_W_TERM.
         *
         * Waits for a RAFT_REC_TERM or RAFT_REC_VOTE record write to commit.
         * Upon completion, updates 'synced_term' and 'synced_vote', which
         * triggers sending RPCs deferred by the uncommitted 'term' and
         * 'vote'. */
        struct {
            uint64_t term;
            struct uuid vote;
        } term;

        /* RAFT_W_RPC.
         *
         * Sometimes, sending an RPC to a peer must be delayed until an entry,
         * a term, or a vote mentioned in the RPC is synced to disk.  This
         * waiter keeps a copy of such an RPC until the previous waiters have
         * committed. */
        union raft_rpc *rpc;
    };
};

static struct raft_waiter *raft_waiter_create(struct raft *,
                                              enum raft_waiter_type,
                                              bool start_commit);
static void raft_waiters_destroy(struct raft *);

/* The Raft state machine. */
struct raft {
    struct hmap_node hmap_node; /* In 'all_rafts'. */
    struct ovsdb_log *log;

/* Persistent derived state.
 *
 * This must be updated on stable storage before responding to RPCs.  It can
 * be derived from the header, snapshot, and log in 'log'. */

    struct uuid cid;            /* Cluster ID (immutable for the cluster). */
    struct uuid sid;            /* Server ID (immutable for the server). */
    char *local_address;        /* Local address (immutable for the server). */
    char *local_nickname;       /* Used for local server in log messages. */
    char *name;                 /* Schema name (immutable for the cluster). */

    /* Contains "struct raft_server"s and represents the server configuration
     * most recently added to 'log'. */
    struct hmap servers;

#define ELECTION_BASE_MSEC 1000
#define ELECTION_RANGE_MSEC 1000
    /* The election timeout base value for leader election, in milliseconds.
     * It can be set via the unixctl command cluster/change-election-timer.
     * The default value is ELECTION_BASE_MSEC. */
    uint64_t election_timer;
    /* If not 0, the new value of 'election_timer' being proposed. */
    uint64_t election_timer_new;

/* Persistent state on all servers.
 *
 * Must be updated on stable storage before responding to RPCs. */

    /* Current term and the vote for that term.  These might be on the way to
     * disk now. */
    uint64_t term;              /* Initialized to 0 and only increases. */
    struct uuid vote;           /* All-zeros if no vote yet in 'term'. */

    /* The term and vote that have been synced to disk. */
    uint64_t synced_term;
    struct uuid synced_vote;

    /* The log.
     *
     * A log entry with index 1 never really exists; the initial snapshot for
     * a Raft is considered to include this index.  The first real log entry
     * has index 2.
     *
     * A new Raft instance contains an empty log:  log_start=2, log_end=2.
     * Over time, the log grows:                   log_start=2, log_end=N.
     * At some point, the server takes a snapshot: log_start=N, log_end=N.
     * The log continues to grow:                  log_start=N, log_end=N+1...
     *
     * Must be updated on stable storage before responding to RPCs. */
    struct raft_entry *entries; /* Log entry i is in log[i - log_start]. */
    uint64_t log_start;         /* Index of first entry in log. */
    uint64_t log_end;           /* Index of last entry in log, plus 1. */
    uint64_t log_synced;        /* Index of last synced entry. */
    size_t allocated_log;       /* Allocated entries in 'log'. */

    /* Snapshot state (see Figure 5.1).
     *
     * This is the state of the cluster as of the last discarded log entry,
     * that is, at log index 'log_start - 1' (called prevIndex in Figure 5.1).
     * Only committed log entries can be included in a snapshot. */
    struct raft_entry snap;

/* Volatile state.
 *
 * The snapshot is always committed, but the rest of the log might not be yet.
 * 'last_applied' tracks what entries have been passed to the client.  If the
 * client hasn't yet read the latest snapshot, then even the snapshot isn't
 * applied yet.  Thus, the invariants are different for these members:
 *
 *     log_start - 2 <= last_applied <= commit_index < log_end.
 *     log_start - 1 <= commit_index < log_end.
 */

    enum raft_role role;        /* Current role. */
    uint64_t commit_index;      /* Max log index known to be committed. */
    uint64_t last_applied;      /* Max log index applied to state machine. */
    struct uuid leader_sid;     /* Server ID of leader (zero, if unknown). */

    long long int election_base;    /* Time of last heartbeat from leader. */
    long long int election_timeout; /* Time at which we start an election. */

    /* Used for joining a cluster. */
    bool joining;                 /* Attempting to join the cluster? */
    struct sset remote_addresses; /* Addresses to try to find other servers. */
    long long int join_timeout;   /* Time to re-send add server request. */

    /* Used for leaving a cluster. */
    bool leaving;                /* True if we are leaving the cluster. */
    bool left;                   /* True if we have finished leaving. */
    long long int leave_timeout; /* Time to re-send remove server request. */

    /* Failure. */
    bool failed;                /* True if unrecoverable error has occurred. */

    /* File synchronization. */
    struct ovs_list waiters;    /* Contains "struct raft_waiter"s. */

    /* Network connections. */
    struct pstream *listener;     /* For connections from other Raft servers. */
    long long int listen_backoff; /* For retrying creating 'listener'. */
    struct ovs_list conns;        /* Contains "struct raft_conn"s. */

    /* Leaders only.  Reinitialized after becoming leader. */
    struct hmap add_servers;    /* Contains "struct raft_server"s to add. */
    struct raft_server *remove_server; /* Server being removed. */
    struct hmap commands;       /* Contains "struct raft_command"s. */
    long long int ping_timeout; /* Time at which to send a heartbeat. */

    /* Candidates only.  Reinitialized at start of election. */
    int n_votes;                /* Number of votes for me. */

    /* Followers and candidates only. */
    bool candidate_retrying;    /* The earlier election timed out and we are
                                   now retrying. */
    bool had_leader;            /* A leader has been elected since the last
                                   election was initiated.  Used to set
                                   'candidate_retrying'. */

    /* For all. */
    bool ever_had_leader;       /* A leader has been elected at some point
                                   since this raft instance initialized,
                                   i.e., it has ever been connected. */
};

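/* Example (illustrative, not part of the original code): how log indexes map
 * onto 'entries' and the snapshot.  Suppose log_start=10 and log_end=13.
 * Then the snapshot covers indexes 1 through 9 and:
 *
 *     index  9 -> the snapshot itself (term raft->snap.term)
 *     index 10 -> entries[0]
 *     index 12 -> entries[2]
 *     index 13 -> does not exist yet (the next index to be appended)
 *
 * This is exactly the arithmetic that raft_get_entry() and raft_get_term()
 * below implement. */
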
/* All Raft structures. */
static struct hmap all_rafts = HMAP_INITIALIZER(&all_rafts);

static void raft_init(void);

static struct ovsdb_error *raft_read_header(struct raft *)
    OVS_WARN_UNUSED_RESULT;

static void raft_send_execute_command_reply(struct raft *,
                                            const struct uuid *sid,
                                            const struct uuid *eid,
                                            enum raft_command_status,
                                            uint64_t commit_index);

static void raft_update_our_match_index(struct raft *, uint64_t min_index);

static void raft_send_remove_server_reply__(
    struct raft *, const struct uuid *target_sid,
    const struct uuid *requester_sid, struct unixctl_conn *requester_conn,
    bool success, const char *comment);
static void raft_finished_leaving_cluster(struct raft *);

static void raft_server_init_leader(struct raft *, struct raft_server *);

static bool raft_rpc_is_heartbeat(const union raft_rpc *);
static bool raft_is_rpc_synced(const struct raft *, const union raft_rpc *);

static void raft_handle_rpc(struct raft *, const union raft_rpc *);

static bool raft_send_at(struct raft *, const union raft_rpc *,
                         int line_number);
#define raft_send(raft, rpc) raft_send_at(raft, rpc, __LINE__)

static bool raft_send_to_conn_at(struct raft *, const union raft_rpc *,
                                 struct raft_conn *, int line_number);
#define raft_send_to_conn(raft, rpc, conn) \
    raft_send_to_conn_at(raft, rpc, conn, __LINE__)

static void raft_send_append_request(struct raft *,
                                     struct raft_server *, unsigned int n,
                                     const char *comment);

static void raft_become_leader(struct raft *);
static void raft_become_follower(struct raft *);
static void raft_reset_election_timer(struct raft *);
static void raft_reset_ping_timer(struct raft *);
static void raft_send_heartbeats(struct raft *);
static void raft_start_election(struct raft *, bool leadership_transfer);
static bool raft_truncate(struct raft *, uint64_t new_end);
static void raft_get_servers_from_log(struct raft *, enum vlog_level);
static void raft_get_election_timer_from_log(struct raft *);

static bool raft_handle_write_error(struct raft *, struct ovsdb_error *);

static void raft_run_reconfigure(struct raft *);

static void raft_set_leader(struct raft *, const struct uuid *sid);
static struct raft_server *
raft_find_server(const struct raft *raft, const struct uuid *sid)
{
    return raft_server_find(&raft->servers, sid);
}

static char *
raft_make_address_passive(const char *address_)
{
    if (!strncmp(address_, "unix:", 5)) {
        return xasprintf("p%s", address_);
    } else {
        char *address = xstrdup(address_);
        char *host, *port;
        inet_parse_host_port_tokens(strchr(address, ':') + 1, &host, &port);

        struct ds paddr = DS_EMPTY_INITIALIZER;
        ds_put_format(&paddr, "p%.3s:%s:", address, port);
        if (strchr(host, ':')) {
            ds_put_format(&paddr, "[%s]", host);
        } else {
            ds_put_cstr(&paddr, host);
        }
        free(address);
        return ds_steal_cstr(&paddr);
    }
}

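/* Examples (illustrative): given the active connection methods this file
 * uses, raft_make_address_passive() maps
 *
 *     "unix:/tmp/db.sock" -> "punix:/tmp/db.sock"
 *     "tcp:10.0.0.1:6643" -> "ptcp:6643:10.0.0.1"
 *     "ssl:[::1]:6643"    -> "pssl:6643:[::1]"
 *
 * The "%.3s" picks up the three-letter connection method from the front of
 * the active address. */
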
static struct raft *
raft_alloc(void)
{
    raft_init();

    struct raft *raft = xzalloc(sizeof *raft);
    hmap_node_nullify(&raft->hmap_node);
    hmap_init(&raft->servers);
    raft->log_start = raft->log_end = 1;
    raft->role = RAFT_FOLLOWER;
    sset_init(&raft->remote_addresses);
    raft->join_timeout = LLONG_MAX;
    ovs_list_init(&raft->waiters);
    raft->listen_backoff = LLONG_MIN;
    ovs_list_init(&raft->conns);
    hmap_init(&raft->add_servers);
    hmap_init(&raft->commands);

    raft->election_timer = ELECTION_BASE_MSEC;

    return raft;
}

/* Creates an on-disk file that represents a new Raft cluster and initializes
 * it to consist of a single server, the one on which this function is called.
 *
 * Creates the local copy of the cluster's log in 'file_name', which must not
 * already exist.  Gives it the name 'name', which should be the database
 * schema name and which is used only to match up this database with the
 * server added to the cluster later if the cluster ID is unavailable.
 *
 * The new server is located at 'local_address', which must take one of the
 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
 * square bracket enclosed IPv6 address and PORT is a TCP port number.
 *
 * This only creates the on-disk file.  Use raft_open() to start operating the
 * new server.
 *
 * Returns null if successful, otherwise an ovsdb_error describing the
 * problem. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_create_cluster(const char *file_name, const char *name,
                    const char *local_address, const struct json *data)
{
    /* Parse and verify validity of the local address. */
    struct ovsdb_error *error = raft_address_validate(local_address);
    if (error) {
        return error;
    }

    /* Create log file. */
    struct ovsdb_log *log;
    error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
                           -1, &log);
    if (error) {
        return error;
    }

    /* Write log file. */
    struct raft_header h = {
        .sid = uuid_random(),
        .cid = uuid_random(),
        .name = xstrdup(name),
        .local_address = xstrdup(local_address),
        .joining = false,
        .remote_addresses = SSET_INITIALIZER(&h.remote_addresses),
        .snap_index = 1,
        .snap = {
            .term = 1,
            .data = json_nullable_clone(data),
            .eid = uuid_random(),
            .servers = json_object_create(),
        },
    };
    shash_add_nocopy(json_object(h.snap.servers),
                     xasprintf(UUID_FMT, UUID_ARGS(&h.sid)),
                     json_string_create(local_address));
    error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
    raft_header_uninit(&h);
    if (!error) {
        error = ovsdb_log_commit_block(log);
    }
    ovsdb_log_close(log);

    return error;
}

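/* Example (illustrative sketch, not part of the original code): creating a
 * one-server cluster with an empty initial snapshot and then opening it.
 * The file name, schema name, and address are hypothetical, and error
 * handling is elided:
 *
 *     struct json *data = json_object_create();
 *     struct ovsdb_error *error = raft_create_cluster(
 *         "/var/lib/example/db.db", "ExampleSchema",
 *         "tcp:10.0.0.1:6643", data);
 *     json_destroy(data);
 *     if (!error) {
 *         struct ovsdb_log *log;
 *         error = ovsdb_log_open("/var/lib/example/db.db", RAFT_MAGIC,
 *                                OVSDB_LOG_READ_WRITE, -1, &log);
 *         if (!error) {
 *             struct raft *raft;
 *             error = raft_open(log, &raft);  // takes ownership of 'log'
 *         }
 *     }
 */
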
/* Creates a database file that represents a new server in an existing Raft
 * cluster.
 *
 * Creates the local copy of the cluster's log in 'file_name', which must not
 * already exist.  Gives it the name 'name', which must be the same name
 * passed in to raft_create_cluster() earlier.
 *
 * 'cid' is optional.  If specified, the new server will join only the cluster
 * with the given cluster ID.
 *
 * The new server is located at 'local_address', which must take one of the
 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
 * square bracket enclosed IPv6 address and PORT is a TCP port number.
 *
 * Joining the cluster requires contacting it.  Thus, 'remote_addresses'
 * specifies the addresses of existing servers in the cluster.  One server out
 * of the existing cluster is sufficient, as long as that server is reachable
 * and not partitioned from the current cluster leader.  If multiple servers
 * from the cluster are specified, then it is sufficient for any of them to
 * meet this criterion.
 *
 * This only creates the on-disk file and does no network access.  Use
 * raft_open() to start operating the new server.  (Until this happens, the
 * new server has not joined the cluster.)
 *
 * Returns null if successful, otherwise an ovsdb_error describing the
 * problem. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_join_cluster(const char *file_name,
                  const char *name, const char *local_address,
                  const struct sset *remote_addresses,
                  const struct uuid *cid)
{
    ovs_assert(!sset_is_empty(remote_addresses));

    /* Parse and verify validity of the addresses. */
    struct ovsdb_error *error = raft_address_validate(local_address);
    if (error) {
        return error;
    }
    const char *addr;
    SSET_FOR_EACH (addr, remote_addresses) {
        error = raft_address_validate(addr);
        if (error) {
            return error;
        }
        if (!strcmp(addr, local_address)) {
            return ovsdb_error(NULL, "remote addresses cannot be the same "
                               "as the local address");
        }
    }

    /* Verify validity of the cluster ID (if provided). */
    if (cid && uuid_is_zero(cid)) {
        return ovsdb_error(NULL, "all-zero UUID is not valid cluster ID");
    }

    /* Create log file. */
    struct ovsdb_log *log;
    error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
                           -1, &log);
    if (error) {
        return error;
    }

    /* Write log file. */
    struct raft_header h = {
        .sid = uuid_random(),
        .cid = cid ? *cid : UUID_ZERO,
        .name = xstrdup(name),
        .local_address = xstrdup(local_address),
        .joining = true,
        /* No snapshot yet. */
    };
    sset_clone(&h.remote_addresses, remote_addresses);
    error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
    raft_header_uninit(&h);
    if (!error) {
        error = ovsdb_log_commit_block(log);
    }
    ovsdb_log_close(log);

    return error;
}

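/* Example (illustrative sketch): preparing a second, hypothetical server to
 * join the cluster created above.  The join itself happens over the network
 * only once raft_open() starts running the new server:
 *
 *     struct sset remotes = SSET_INITIALIZER(&remotes);
 *     sset_add(&remotes, "tcp:10.0.0.1:6643");
 *     struct ovsdb_error *error = raft_join_cluster(
 *         "/var/lib/example/db2.db", "ExampleSchema",
 *         "tcp:10.0.0.2:6643", &remotes, NULL);
 *     sset_destroy(&remotes);
 */
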
/* Reads the initial header record from 'log', which must be a Raft clustered
 * database log, and populates '*md' with the information read from it.  The
 * caller must eventually destroy 'md' with raft_metadata_destroy().
 *
 * On success, returns NULL.  On failure, returns an error that the caller
 * must eventually destroy and zeros '*md'. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_read_metadata(struct ovsdb_log *log, struct raft_metadata *md)
{
    struct raft *raft = raft_alloc();
    raft->log = log;

    struct ovsdb_error *error = raft_read_header(raft);
    if (!error) {
        md->sid = raft->sid;
        md->name = xstrdup(raft->name);
        md->local = xstrdup(raft->local_address);
        md->cid = raft->cid;
    } else {
        memset(md, 0, sizeof *md);
    }

    raft->log = NULL;
    raft_close(raft);
    return error;
}

/* Frees the metadata in 'md'. */
void
raft_metadata_destroy(struct raft_metadata *md)
{
    if (md) {
        free(md->name);
        free(md->local);
    }
}

static const struct raft_entry *
raft_get_entry(const struct raft *raft, uint64_t index)
{
    ovs_assert(index >= raft->log_start);
    ovs_assert(index < raft->log_end);
    return &raft->entries[index - raft->log_start];
}

static uint64_t
raft_get_term(const struct raft *raft, uint64_t index)
{
    return (index == raft->log_start - 1
            ? raft->snap.term
            : raft_get_entry(raft, index)->term);
}

static const struct json *
raft_servers_for_index(const struct raft *raft, uint64_t index)
{
    ovs_assert(index >= raft->log_start - 1);
    ovs_assert(index < raft->log_end);

    const struct json *servers = raft->snap.servers;
    for (uint64_t i = raft->log_start; i <= index; i++) {
        const struct raft_entry *e = raft_get_entry(raft, i);
        if (e->servers) {
            servers = e->servers;
        }
    }
    return servers;
}

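/* Example (illustrative): if the snapshot's server configuration lists
 * servers {A, B, C} and the entry at index 12 is the only log entry that
 * carries a 'servers' JSON, listing {A, B, C, D}, then
 * raft_servers_for_index() returns the snapshot's three-server configuration
 * for any index below 12 and the four-server configuration for index 12 and
 * up. */
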
static void
raft_set_servers(struct raft *raft, const struct hmap *new_servers,
                 enum vlog_level level)
{
    struct raft_server *s, *next;
    HMAP_FOR_EACH_SAFE (s, next, hmap_node, &raft->servers) {
        if (!raft_server_find(new_servers, &s->sid)) {
            ovs_assert(s != raft->remove_server);

            hmap_remove(&raft->servers, &s->hmap_node);
            VLOG(level, "server %s removed from configuration", s->nickname);
            raft_server_destroy(s);
        }
    }

    HMAP_FOR_EACH_SAFE (s, next, hmap_node, new_servers) {
        if (!raft_find_server(raft, &s->sid)) {
            VLOG(level, "server %s added to configuration", s->nickname);

            struct raft_server *new
                = raft_server_add(&raft->servers, &s->sid, s->address);
            raft_server_init_leader(raft, new);
        }
    }
}

static uint64_t
raft_add_entry(struct raft *raft,
               uint64_t term, struct json *data, const struct uuid *eid,
               struct json *servers, uint64_t election_timer)
{
    if (raft->log_end - raft->log_start >= raft->allocated_log) {
        raft->entries = x2nrealloc(raft->entries, &raft->allocated_log,
                                   sizeof *raft->entries);
    }

    uint64_t index = raft->log_end++;
    struct raft_entry *entry = &raft->entries[index - raft->log_start];
    entry->term = term;
    entry->data = data;
    entry->eid = eid ? *eid : UUID_ZERO;
    entry->servers = servers;
    entry->election_timer = election_timer;
    return index;
}

/* Writes a RAFT_REC_ENTRY record for 'term', 'data', 'eid', 'servers', and
 * 'election_timer' to 'raft''s log and returns an error indication. */
static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_write_entry(struct raft *raft, uint64_t term, struct json *data,
                 const struct uuid *eid, struct json *servers,
                 uint64_t election_timer)
{
    struct raft_record r = {
        .type = RAFT_REC_ENTRY,
        .term = term,
        .entry = {
            .index = raft_add_entry(raft, term, data, eid, servers,
                                    election_timer),
            .data = data,
            .servers = servers,
            .election_timer = election_timer,
            .eid = eid ? *eid : UUID_ZERO,
        },
    };
    return ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r));
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_write_state(struct ovsdb_log *log,
                 uint64_t term, const struct uuid *vote)
{
    struct raft_record r = { .term = term };
    if (vote && !uuid_is_zero(vote)) {
        r.type = RAFT_REC_VOTE;
        r.sid = *vote;
    } else {
        r.type = RAFT_REC_TERM;
    }
    return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
}

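/* Example (illustrative): raft_write_state(log, 4, NULL) appends a
 * RAFT_REC_TERM record that just records term 4, whereas
 * raft_write_state(log, 4, &sid) appends a RAFT_REC_VOTE record that, on
 * replay in raft_apply_record() below, both advances the term to 4 and
 * restores the vote for 'sid' in that term. */
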
static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_apply_record(struct raft *raft, unsigned long long int rec_idx,
                  const struct raft_record *r)
{
    /* Apply "term", which is present in most kinds of records (and otherwise
     * 0).
     *
     * A Raft leader can replicate entries from previous terms to the other
     * servers in the cluster, retaining the original terms on those entries
     * (see section 3.6.2 "Committing entries from previous terms" for more
     * information), so it's OK for the term in a log record to precede the
     * current term. */
    if (r->term > raft->term) {
        raft->term = raft->synced_term = r->term;
        raft->vote = raft->synced_vote = UUID_ZERO;
    }

    switch (r->type) {
    case RAFT_REC_ENTRY:
        if (r->entry.index < raft->commit_index) {
            return ovsdb_error(NULL, "record %llu attempts to truncate log "
                               "from %"PRIu64" to %"PRIu64" entries, but "
                               "commit index is already %"PRIu64,
                               rec_idx, raft->log_end, r->entry.index,
                               raft->commit_index);
        } else if (r->entry.index > raft->log_end) {
            return ovsdb_error(NULL, "record %llu with index %"PRIu64" skips "
                               "past expected index %"PRIu64,
                               rec_idx, r->entry.index, raft->log_end);
        }

        if (r->entry.index < raft->log_end) {
            /* This can happen, but it is notable. */
            VLOG_DBG("record %llu truncates log from %"PRIu64" to %"PRIu64
                     " entries", rec_idx, raft->log_end, r->entry.index);
            raft_truncate(raft, r->entry.index);
        }

        uint64_t prev_term = (raft->log_end > raft->log_start
                              ? raft->entries[raft->log_end
                                              - raft->log_start - 1].term
                              : raft->snap.term);
        if (r->term < prev_term) {
            return ovsdb_error(NULL, "record %llu with index %"PRIu64" term "
                               "%"PRIu64" precedes previous entry's term "
                               "%"PRIu64,
                               rec_idx, r->entry.index, r->term, prev_term);
        }

        raft->log_synced = raft_add_entry(
            raft, r->term,
            json_nullable_clone(r->entry.data), &r->entry.eid,
            json_nullable_clone(r->entry.servers),
            r->entry.election_timer);
        return NULL;

    case RAFT_REC_TERM:
        return NULL;

    case RAFT_REC_VOTE:
        if (r->term < raft->term) {
            return ovsdb_error(NULL, "record %llu votes for term %"PRIu64" "
                               "but current term is %"PRIu64,
                               rec_idx, r->term, raft->term);
        } else if (!uuid_is_zero(&raft->vote)
                   && !uuid_equals(&raft->vote, &r->sid)) {
            return ovsdb_error(NULL, "record %llu votes for "SID_FMT" in term "
                               "%"PRIu64" but a previous record for the "
                               "same term voted for "SID_FMT, rec_idx,
                               SID_ARGS(&raft->vote), r->term,
                               SID_ARGS(&r->sid));
        } else {
            raft->vote = raft->synced_vote = r->sid;
            return NULL;
        }
        break;

    case RAFT_REC_NOTE:
        if (!strcmp(r->note, "left")) {
            return ovsdb_error(NULL, "record %llu indicates server has left "
                               "the cluster; it cannot be added back (use "
                               "\"ovsdb-tool join-cluster\" to add a new "
                               "server)", rec_idx);
        }
        return NULL;

    case RAFT_REC_COMMIT_INDEX:
        if (r->commit_index < raft->commit_index) {
            return ovsdb_error(NULL, "record %llu regresses commit index "
                               "from %"PRIu64" to %"PRIu64,
                               rec_idx, raft->commit_index, r->commit_index);
        } else if (r->commit_index >= raft->log_end) {
            return ovsdb_error(NULL, "record %llu advances commit index to "
                               "%"PRIu64" but last log index is %"PRIu64,
                               rec_idx, r->commit_index, raft->log_end - 1);
        } else {
            raft->commit_index = r->commit_index;
            return NULL;
        }
        break;

    case RAFT_REC_LEADER:
        /* XXX we could use this to take back leadership for quick restart */
        return NULL;

    default:
        OVS_NOT_REACHED();
    }
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_read_header(struct raft *raft)
{
    /* Read header record. */
    struct json *json;
    struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
    if (error || !json) {
        /* Report error or end-of-file. */
        return error;
    }
    ovsdb_log_mark_base(raft->log);

    struct raft_header h;
    error = raft_header_from_json(&h, json);
    json_destroy(json);
    if (error) {
        return error;
    }

    raft->sid = h.sid;
    raft->cid = h.cid;
    raft->name = xstrdup(h.name);
    raft->local_address = xstrdup(h.local_address);
    raft->local_nickname = raft_address_to_nickname(h.local_address, &h.sid);
    raft->joining = h.joining;

    if (h.joining) {
        sset_clone(&raft->remote_addresses, &h.remote_addresses);
    } else {
        raft_entry_clone(&raft->snap, &h.snap);
        raft->log_start = raft->log_end = h.snap_index + 1;
        raft->log_synced = raft->commit_index = h.snap_index;
        raft->last_applied = h.snap_index - 1;
    }

    raft_header_uninit(&h);

    return NULL;
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_read_log(struct raft *raft)
{
    for (unsigned long long int i = 1; ; i++) {
        struct json *json;
        struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
        if (!json) {
            if (error) {
                /* We assume that the error is due to a partial write while
                 * appending to the file before a crash, so log it and
                 * continue. */
                char *error_string = ovsdb_error_to_string_free(error);
                VLOG_WARN("%s", error_string);
                free(error_string);
                error = NULL;
            }
            break;
        }

        struct raft_record r;
        error = raft_record_from_json(&r, json);
        if (!error) {
            error = raft_apply_record(raft, i, &r);
            raft_record_uninit(&r);
        }
        json_destroy(json);
        if (error) {
            return ovsdb_wrap_error(error, "error reading record %llu from "
                                    "%s log", i, raft->name);
        }
    }

    /* Set the most recent servers. */
    raft_get_servers_from_log(raft, VLL_DBG);

    /* Set the most recent election_timer. */
    raft_get_election_timer_from_log(raft);

    return NULL;
}

static void
raft_reset_election_timer(struct raft *raft)
{
    unsigned int duration = (raft->election_timer
                             + random_range(ELECTION_RANGE_MSEC));
    raft->election_base = time_msec();
    if (failure_test == FT_DELAY_ELECTION) {
        /* Slow down this node so that it won't win the next election. */
        duration += raft->election_timer;
    }
    raft->election_timeout = raft->election_base + duration;
}

static void
raft_reset_ping_timer(struct raft *raft)
{
    raft->ping_timeout = time_msec() + raft->election_timer / 3;
}

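/* Worked example (illustrative): with the default election_timer of
 * ELECTION_BASE_MSEC (1000 ms), each call to raft_reset_election_timer()
 * schedules the next election between 1000 and 2000 ms in the future, since
 * random_range() returns a value in [0, ELECTION_RANGE_MSEC).  The
 * randomization makes it unlikely that two servers time out simultaneously
 * and split the vote.  A leader sends heartbeats every election_timer / 3
 * (about 333 ms by default), so a healthy follower's election timer is reset
 * long before it expires. */
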
static void
raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
              const struct uuid *sid, bool incoming)
{
    struct raft_conn *conn = xzalloc(sizeof *conn);
    ovs_list_push_back(&raft->conns, &conn->list_node);
    conn->js = js;
    if (sid) {
        conn->sid = *sid;
    }
    conn->nickname = raft_address_to_nickname(jsonrpc_session_get_name(js),
                                              &conn->sid);
    conn->incoming = incoming;
    conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
    jsonrpc_session_set_probe_interval(js, 0);
}

/* Starts the local server in an existing Raft cluster, using the local copy
 * of the cluster's log in 'log'.  Takes ownership of 'log', whether
 * successful or not. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_open(struct ovsdb_log *log, struct raft **raftp)
{
    struct raft *raft = raft_alloc();
    raft->log = log;

    struct ovsdb_error *error = raft_read_header(raft);
    if (error) {
        goto error;
    }

    if (!raft->joining) {
        error = raft_read_log(raft);
        if (error) {
            goto error;
        }

        /* Find our own server. */
        if (!raft_find_server(raft, &raft->sid)) {
            error = ovsdb_error(NULL, "server does not belong to cluster");
            goto error;
        }

        /* If there's only one server, start an election right away so that
         * the cluster bootstraps quickly. */
        if (hmap_count(&raft->servers) == 1) {
            raft_start_election(raft, false);
        }
    } else {
        raft->join_timeout = time_msec() + 1000;
    }

    raft_reset_ping_timer(raft);
    raft_reset_election_timer(raft);

    *raftp = raft;
    hmap_insert(&all_rafts, &raft->hmap_node, hash_string(raft->name, 0));
    return NULL;

error:
    raft_close(raft);
    *raftp = NULL;
    return error;
}

/* Returns the name of 'raft', which in OVSDB is the database schema name. */
const char *
raft_get_name(const struct raft *raft)
{
    return raft->name;
}

/* Returns the cluster ID of 'raft'.  If 'raft' has not yet completed joining
 * its cluster, then 'cid' will be all-zeros (unless the administrator
 * specified a cluster ID when running "ovsdb-tool join-cluster").
 *
 * Each cluster has a unique cluster ID. */
const struct uuid *
raft_get_cid(const struct raft *raft)
{
    return &raft->cid;
}

/* Returns the server ID of 'raft'.  Each server has a unique server ID. */
const struct uuid *
raft_get_sid(const struct raft *raft)
{
    return &raft->sid;
}

/* Adds memory consumption info to 'usage' for later use by memory_report(). */
void
raft_get_memory_usage(const struct raft *raft, struct simap *usage)
{
    struct raft_conn *conn;
    uint64_t backlog = 0;
    int cnt = 0;

    LIST_FOR_EACH (conn, list_node, &raft->conns) {
        backlog += jsonrpc_session_get_backlog(conn->js);
        cnt++;
    }
    simap_increase(usage, "raft-backlog-kB", backlog / 1000);
    simap_increase(usage, "raft-connections", cnt);
}

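/* Example (illustrative, assuming the enclosing daemon wires memory_report()
 * up to the usual "memory/show" unixctl command): a server with two healthy
 * peer connections and about a kilobyte of unsent JSON-RPC data would
 * contribute entries like "raft-backlog-kB:1 raft-connections:2" to that
 * command's output. */
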
/* Returns true if 'raft' has completed joining its cluster, has not left or
 * initiated leaving the cluster, does not have failed disk storage, and is
 * apparently connected to the leader in a healthy way (or is itself the
 * leader).
 *
 * If 'raft' is a candidate:
 *
 * a) if this is the first round of the election, consider it connected,
 *    hoping that it will successfully elect a new leader soon.
 *
 * b) if it is already retrying, consider it disconnected (so that clients
 *    may decide to reconnect to other members). */
bool
raft_is_connected(const struct raft *raft)
{
    bool ret = (!raft->candidate_retrying
                && !raft->joining
                && !raft->leaving
                && !raft->left
                && !raft->failed
                && raft->ever_had_leader);
    VLOG_DBG("raft_is_connected: %s", ret ? "true" : "false");
    return ret;
}

/* Returns true if 'raft' is the cluster leader. */
bool
raft_is_leader(const struct raft *raft)
{
    return raft->role == RAFT_LEADER;
}

/* Returns true if 'raft' is in the process of joining its cluster. */
bool
raft_is_joining(const struct raft *raft)
{
    return raft->joining;
}

/* Only returns *connected* connections. */
static struct raft_conn *
raft_find_conn_by_sid(struct raft *raft, const struct uuid *sid)
{
    if (!uuid_is_zero(sid)) {
        struct raft_conn *conn;
        LIST_FOR_EACH (conn, list_node, &raft->conns) {
            if (uuid_equals(sid, &conn->sid)
                && jsonrpc_session_is_connected(conn->js)) {
                return conn;
            }
        }
    }
    return NULL;
}

static struct raft_conn *
raft_find_conn_by_address(struct raft *raft, const char *address)
{
    struct raft_conn *conn;
    LIST_FOR_EACH (conn, list_node, &raft->conns) {
        if (!strcmp(jsonrpc_session_get_name(conn->js), address)) {
            return conn;
        }
    }
    return NULL;
}

static void OVS_PRINTF_FORMAT(3, 4)
raft_record_note(struct raft *raft, const char *note,
                 const char *comment_format, ...)
{
    va_list args;
    va_start(args, comment_format);
    char *comment = xvasprintf(comment_format, args);
    va_end(args);

    struct raft_record r = {
        .type = RAFT_REC_NOTE,
        .comment = comment,
        .note = CONST_CAST(char *, note),
    };
    ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));

    free(comment);
}

/* If we're leader, try to transfer leadership to another server, logging
 * 'reason' as the human-readable reason (it should be a phrase suitable for
 * following "because"). */
void
raft_transfer_leadership(struct raft *raft, const char *reason)
{
    if (raft->role != RAFT_LEADER) {
        return;
    }

    struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (!uuid_equals(&raft->sid, &s->sid)
            && s->phase == RAFT_PHASE_STABLE) {
            struct raft_conn *conn = raft_find_conn_by_sid(raft, &s->sid);
            if (!conn) {
                continue;
            }

            union raft_rpc rpc = {
                .become_leader = {
                    .common = {
                        .comment = CONST_CAST(char *, reason),
                        .type = RAFT_RPC_BECOME_LEADER,
                        .sid = s->sid,
                    },
                    .term = raft->term,
                }
            };
            raft_send_to_conn(raft, &rpc, conn);

            raft_record_note(raft, "transfer leadership",
                             "transferring leadership to %s because %s",
                             s->nickname, reason);
            break;
        }
    }
}

/* Send a RemoveServerRequest to the rest of the servers in the cluster.
 *
 * If we know which server is the leader, we can just send the request to it.
 * However, we might not know which server is the leader, and we might never
 * find out if the remove request was actually previously committed by a
 * majority of the servers (because in that case the new leader will not send
 * AppendRequests or heartbeats to us).  Therefore, we instead send
 * RemoveRequests to every server.  This theoretically has the same problem,
 * if the current cluster leader was not previously a member of the cluster,
 * but it seems likely to be more robust in practice. */
static void
raft_send_remove_server_requests(struct raft *raft)
{
    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
    VLOG_INFO_RL(&rl, "sending remove request (joining=%s, leaving=%s)",
                 raft->joining ? "true" : "false",
                 raft->leaving ? "true" : "false");
    const struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (!uuid_equals(&s->sid, &raft->sid)) {
            union raft_rpc rpc = (union raft_rpc) {
                .remove_server_request = {
                    .common = {
                        .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
                        .sid = s->sid,
                    },
                    .sid = raft->sid,
                },
            };
            raft_send(raft, &rpc);
        }
    }

    raft->leave_timeout = time_msec() + raft->election_timer;
}

/* Attempts to start 'raft' leaving its cluster.  The caller can check
 * progress using raft_is_leaving() and raft_left(). */
void
raft_leave(struct raft *raft)
{
    if (raft->joining || raft->failed || raft->leaving || raft->left) {
        return;
    }
    VLOG_INFO(SID_FMT": starting to leave cluster "CID_FMT,
              SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
    raft->leaving = true;
    raft_transfer_leadership(raft, "this server is leaving the cluster");
    raft_become_follower(raft);
    raft_send_remove_server_requests(raft);
    raft->leave_timeout = time_msec() + raft->election_timer;
}

/* Returns true if 'raft' is currently attempting to leave its cluster. */
bool
raft_is_leaving(const struct raft *raft)
{
    return raft->leaving;
}

/* Returns true if 'raft' successfully left its cluster. */
bool
raft_left(const struct raft *raft)
{
    return raft->left;
}

/* Returns true if 'raft' has experienced a disk I/O failure.  When this
 * returns true, only closing and reopening 'raft' allows for recovery. */
bool
raft_failed(const struct raft *raft)
{
    return raft->failed;
}

/* Forces 'raft' to attempt to take leadership of the cluster by deposing the
 * current leader. */
void
raft_take_leadership(struct raft *raft)
{
    if (raft->role != RAFT_LEADER) {
        raft_start_election(raft, true);
    }
}

/* Closes everything owned by 'raft' that might be visible outside the
 * process: network connections, commands, etc.  This is part of closing
 * 'raft'; it is also used if 'raft' has failed in an unrecoverable way. */
static void
raft_close__(struct raft *raft)
{
    if (!hmap_node_is_null(&raft->hmap_node)) {
        hmap_remove(&all_rafts, &raft->hmap_node);
        hmap_node_nullify(&raft->hmap_node);
    }

    raft_complete_all_commands(raft, RAFT_CMD_SHUTDOWN);

    struct raft_server *rs = raft->remove_server;
    if (rs) {
        raft_send_remove_server_reply__(raft, &rs->sid, &rs->requester_sid,
                                        rs->requester_conn, false,
                                        RAFT_SERVER_SHUTDOWN);
        raft_server_destroy(raft->remove_server);
        raft->remove_server = NULL;
    }

    struct raft_conn *conn, *next;
    LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
        raft_conn_close(conn);
    }
}

/* Closes and frees 'raft'.
 *
 * A server's cluster membership is independent of whether the server is
 * actually running.  When a server that is a member of a cluster closes, the
 * cluster treats this as a server failure. */
void
raft_close(struct raft *raft)
{
    if (!raft) {
        return;
    }

    raft_transfer_leadership(raft, "this server is shutting down");

    raft_close__(raft);

    ovsdb_log_close(raft->log);

    raft_servers_destroy(&raft->servers);

    for (uint64_t index = raft->log_start; index < raft->log_end; index++) {
        struct raft_entry *e = &raft->entries[index - raft->log_start];
        raft_entry_uninit(e);
    }
    free(raft->entries);

    raft_entry_uninit(&raft->snap);

    raft_waiters_destroy(raft);

    raft_servers_destroy(&raft->add_servers);

    hmap_destroy(&raft->commands);

    pstream_close(raft->listener);

    sset_destroy(&raft->remote_addresses);
    free(raft->local_address);
    free(raft->local_nickname);
    free(raft->name);

    free(raft);
}

static bool
raft_conn_receive(struct raft *raft, struct raft_conn *conn,
                  union raft_rpc *rpc)
{
    struct jsonrpc_msg *msg = jsonrpc_session_recv(conn->js);
    if (!msg) {
        return false;
    }

    struct ovsdb_error *error = raft_rpc_from_jsonrpc(&raft->cid, &raft->sid,
                                                      msg, rpc);
    jsonrpc_msg_destroy(msg);
    if (error) {
        char *s = ovsdb_error_to_string_free(error);
        VLOG_INFO("%s: %s", jsonrpc_session_get_name(conn->js), s);
        free(s);
        return false;
    }

    if (uuid_is_zero(&conn->sid)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
        conn->sid = rpc->common.sid;
        VLOG_INFO_RL(&rl, "%s: learned server ID "SID_FMT,
                     jsonrpc_session_get_name(conn->js), SID_ARGS(&conn->sid));
    } else if (!uuid_equals(&conn->sid, &rpc->common.sid)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
        VLOG_WARN_RL(&rl, "%s: ignoring message with unexpected server ID "
                     SID_FMT" (expected "SID_FMT")",
                     jsonrpc_session_get_name(conn->js),
                     SID_ARGS(&rpc->common.sid), SID_ARGS(&conn->sid));
        raft_rpc_uninit(rpc);
        return false;
    }

    const char *address = (rpc->type == RAFT_RPC_HELLO_REQUEST
                           ? rpc->hello_request.address
                           : rpc->type == RAFT_RPC_ADD_SERVER_REQUEST
                           ? rpc->add_server_request.address
                           : NULL);
    if (address) {
        char *new_nickname = raft_address_to_nickname(address, &conn->sid);
        if (strcmp(conn->nickname, new_nickname)) {
            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
            VLOG_INFO_RL(&rl, "%s: learned remote address %s",
                         jsonrpc_session_get_name(conn->js), address);

            free(conn->nickname);
            conn->nickname = new_nickname;
        } else {
            free(new_nickname);
        }
    }

    return true;
}

static const char *
raft_get_nickname(const struct raft *raft, const struct uuid *sid,
                  char buf[SID_LEN + 1], size_t bufsize)
{
    if (uuid_equals(sid, &raft->sid)) {
        return raft->local_nickname;
    }

    const char *s = raft_servers_get_nickname__(&raft->servers, sid);
    if (s) {
        return s;
    }

    return raft_servers_get_nickname(&raft->add_servers, sid, buf, bufsize);
}

static void
log_rpc(const union raft_rpc *rpc, const char *direction,
        const struct raft_conn *conn, int line_number)
{
    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(600, 600);
    if (!raft_rpc_is_heartbeat(rpc) && !VLOG_DROP_DBG(&rl)) {
        struct ds s = DS_EMPTY_INITIALIZER;
        if (line_number) {
            ds_put_format(&s, "raft.c:%d ", line_number);
        }
        ds_put_format(&s, "%s%s ", direction, conn->nickname);
        raft_rpc_format(rpc, &s);
        VLOG_DBG("%s", ds_cstr(&s));
        ds_destroy(&s);
    }
}

static void
raft_send_add_server_request(struct raft *raft, struct raft_conn *conn)
{
    union raft_rpc rq = {
        .add_server_request = {
            .common = {
                .type = RAFT_RPC_ADD_SERVER_REQUEST,
                .sid = UUID_ZERO,
                .comment = NULL,
            },
            .address = raft->local_address,
        },
    };
    raft_send_to_conn(raft, &rq, conn);
}

static void
raft_conn_run(struct raft *raft, struct raft_conn *conn)
{
    jsonrpc_session_run(conn->js);

    unsigned int new_seqno = jsonrpc_session_get_seqno(conn->js);
    bool reconnected = new_seqno != conn->js_seqno;
    bool just_connected = (reconnected
                           && jsonrpc_session_is_connected(conn->js));

    if (reconnected) {
        /* Clear 'last_install_snapshot_request' since it might not reach the
         * destination or the server was restarted. */
        struct raft_server *server = raft_find_server(raft, &conn->sid);
        if (server) {
            free(server->last_install_snapshot_request);
            server->last_install_snapshot_request = NULL;
        }
    }

    conn->js_seqno = new_seqno;
    if (just_connected) {
        if (raft->joining) {
            raft_send_add_server_request(raft, conn);
        } else if (raft->leaving) {
            union raft_rpc rq = {
                .remove_server_request = {
                    .common = {
                        .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
                        .sid = conn->sid,
                    },
                    .sid = raft->sid,
                },
            };
            raft_send_to_conn(raft, &rq, conn);
        } else {
            union raft_rpc rq = (union raft_rpc) {
                .hello_request = {
                    .common = {
                        .type = RAFT_RPC_HELLO_REQUEST,
                        .sid = conn->sid,
                    },
                    .address = raft->local_address,
                },
            };
            raft_send_to_conn(raft, &rq, conn);
        }
    }

    for (size_t i = 0; i < 50; i++) {
        union raft_rpc rpc;
        if (!raft_conn_receive(raft, conn, &rpc)) {
            break;
        }

        log_rpc(&rpc, "<--", conn, 0);
        raft_handle_rpc(raft, &rpc);
        raft_rpc_uninit(&rpc);
    }
}

static void
raft_waiter_complete_rpc(struct raft *raft, const union raft_rpc *rpc)
{
    uint64_t term = raft_rpc_get_term(rpc);
    if (term && term < raft->term) {
        /* Drop the message because it's for an expired term. */
        return;
    }

    if (!raft_is_rpc_synced(raft, rpc)) {
        /* This is a bug.  A reply message is deferred because some state in
         * the message, such as a term or index, has not been committed to
         * disk, and it should only be completed when that commit is done.
         * But this message is being completed before the commit is finished.
         * Complain, and hope that someone reports the bug. */
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        if (VLOG_DROP_ERR(&rl)) {
            return;
        }

        struct ds s = DS_EMPTY_INITIALIZER;

        if (term > raft->synced_term) {
            ds_put_format(&s, " because message term %"PRIu64" is "
                          "past synced term %"PRIu64,
                          term, raft->synced_term);
        }

        uint64_t index = raft_rpc_get_min_sync_index(rpc);
        if (index > raft->log_synced) {
            ds_put_format(&s, " %s message index %"PRIu64" is past last "
                          "synced index %"PRIu64,
                          s.length ? "and" : "because",
                          index, raft->log_synced);
        }

        const struct uuid *vote = raft_rpc_get_vote(rpc);
        if (vote && !uuid_equals(vote, &raft->synced_vote)) {
            char buf1[SID_LEN + 1];
            char buf2[SID_LEN + 1];
            ds_put_format(&s, " %s vote %s differs from synced vote %s",
                          s.length ? "and" : "because",
                          raft_get_nickname(raft, vote, buf1, sizeof buf1),
                          raft_get_nickname(raft, &raft->synced_vote,
                                            buf2, sizeof buf2));
        }

        char buf[SID_LEN + 1];
        ds_put_format(&s, ": %s ",
                      raft_get_nickname(raft, &rpc->common.sid,
                                        buf, sizeof buf));
        raft_rpc_format(rpc, &s);
        VLOG_ERR("internal error: deferred %s message completed "
                 "but not ready to send%s",
                 raft_rpc_type_to_string(rpc->type), ds_cstr(&s));
        ds_destroy(&s);

        return;
    }

    struct raft_conn *dst = raft_find_conn_by_sid(raft, &rpc->common.sid);
    if (dst) {
        raft_send_to_conn(raft, rpc, dst);
    }
}

static void
raft_waiter_complete(struct raft *raft, struct raft_waiter *w)
{
    switch (w->type) {
    case RAFT_W_ENTRY:
        if (raft->role == RAFT_LEADER) {
            raft_update_our_match_index(raft, w->entry.index);
        }
        raft->log_synced = w->entry.index;
        break;

    case RAFT_W_TERM:
        raft->synced_term = w->term.term;
        raft->synced_vote = w->term.vote;
        break;

    case RAFT_W_RPC:
        raft_waiter_complete_rpc(raft, w->rpc);
        break;
    }
}

static void
raft_waiter_destroy(struct raft_waiter *w)
{
    if (!w) {
        return;
    }

    ovs_list_remove(&w->list_node);

    switch (w->type) {
    case RAFT_W_ENTRY:
    case RAFT_W_TERM:
        break;

    case RAFT_W_RPC:
        raft_rpc_uninit(w->rpc);
        free(w->rpc);
        break;
    }
    free(w);
}

static void
raft_waiters_run(struct raft *raft)
{
    if (ovs_list_is_empty(&raft->waiters)) {
        return;
    }

    uint64_t cur = ovsdb_log_commit_progress(raft->log);
    struct raft_waiter *w, *next;
    LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
        if (cur < w->commit_ticket) {
            break;
        }
        raft_waiter_complete(raft, w);
        raft_waiter_destroy(w);
    }
}

static void
raft_waiters_wait(struct raft *raft)
{
    struct raft_waiter *w;
    LIST_FOR_EACH (w, list_node, &raft->waiters) {
        ovsdb_log_commit_wait(raft->log, w->commit_ticket);
        break;
    }
}

static void
raft_waiters_destroy(struct raft *raft)
{
    struct raft_waiter *w, *next;
    LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
        raft_waiter_destroy(w);
    }
}

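/* Example (illustrative): the commit-ticket mechanism above is a simple
 * ordering scheme.  Suppose three waiters were created with commit tickets
 * 7, 8, and 9, and ovsdb_log_commit_progress() now returns 8.  Then the
 * waiters holding tickets 7 and 8 complete, because their writes are
 * durable, while the waiter holding ticket 9 stays queued until a later
 * raft_waiters_run() call. */
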
static bool OVS_WARN_UNUSED_RESULT
raft_set_term(struct raft *raft, uint64_t term, const struct uuid *vote)
{
    struct ovsdb_error *error = raft_write_state(raft->log, term, vote);
    if (!raft_handle_write_error(raft, error)) {
        return false;
    }

    struct raft_waiter *w = raft_waiter_create(raft, RAFT_W_TERM, true);
    raft->term = w->term.term = term;
    raft->vote = w->term.vote = vote ? *vote : UUID_ZERO;
    return true;
}

static void
raft_accept_vote(struct raft *raft, struct raft_server *s,
                 const struct uuid *vote)
{
    if (uuid_equals(&s->vote, vote)) {
        return;
    }
    if (!uuid_is_zero(&s->vote)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
        char buf1[SID_LEN + 1];
        char buf2[SID_LEN + 1];
        VLOG_WARN_RL(&rl, "server %s changed its vote from %s to %s",
                     s->nickname,
                     raft_get_nickname(raft, &s->vote, buf1, sizeof buf1),
                     raft_get_nickname(raft, vote, buf2, sizeof buf2));
    }
    s->vote = *vote;
    if (uuid_equals(vote, &raft->sid)
        && ++raft->n_votes > hmap_count(&raft->servers) / 2) {
        raft_become_leader(raft);
    }
}

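/* Worked example (illustrative): in a 5-server cluster,
 * hmap_count(&raft->servers) / 2 is 2, so a candidate becomes leader once
 * n_votes exceeds 2, i.e. with 3 votes (a strict majority, counting its own
 * vote for itself).  With 4 servers the threshold is also 3 votes, so adding
 * a fourth server to a 3-server cluster does not improve fault tolerance. */
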
static void
raft_start_election(struct raft *raft, bool leadership_transfer)
{
    if (raft->leaving) {
        return;
    }

    struct raft_server *me = raft_find_server(raft, &raft->sid);
    if (!me) {
        return;
    }

    if (!raft_set_term(raft, raft->term + 1, &raft->sid)) {
        return;
    }

    ovs_assert(raft->role != RAFT_LEADER);

    raft->leader_sid = UUID_ZERO;
    raft->role = RAFT_CANDIDATE;
    /* If there was no leader elected since the last election, we know we are
     * retrying now. */
    raft->candidate_retrying = !raft->had_leader;
    raft->had_leader = false;

    raft->n_votes = 0;

    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
    if (!VLOG_DROP_INFO(&rl)) {
        long long int now = time_msec();
        if (now >= raft->election_timeout) {
            VLOG_INFO("term %"PRIu64": %lld ms timeout expired, "
                      "starting election",
                      raft->term, now - raft->election_base);
        } else {
            VLOG_INFO("term %"PRIu64": starting election", raft->term);
        }
    }
    raft_reset_election_timer(raft);

    struct raft_server *peer;
    HMAP_FOR_EACH (peer, hmap_node, &raft->servers) {
        peer->vote = UUID_ZERO;
        if (uuid_equals(&raft->sid, &peer->sid)) {
            continue;
        }

        union raft_rpc rq = {
            .vote_request = {
                .common = {
                    .type = RAFT_RPC_VOTE_REQUEST,
                    .sid = peer->sid,
                },
                .term = raft->term,
                .last_log_index = raft->log_end - 1,
                .last_log_term = (
                    raft->log_end > raft->log_start
                    ? raft->entries[raft->log_end - raft->log_start - 1].term
                    : raft->snap.term),
                .leadership_transfer = leadership_transfer,
            },
        };
        if (failure_test != FT_DONT_SEND_VOTE_REQUEST) {
            raft_send(raft, &rq);
        }
    }

    /* Vote for ourselves. */
    raft_accept_vote(raft, me, &raft->sid);
}

static void
raft_open_conn(struct raft *raft, const char *address, const struct uuid *sid)
{
    if (strcmp(address, raft->local_address)
        && !raft_find_conn_by_address(raft, address)) {
        raft_add_conn(raft, jsonrpc_session_open(address, true), sid, false);
    }
}

static void
raft_conn_close(struct raft_conn *conn)
{
    jsonrpc_session_close(conn->js);
    ovs_list_remove(&conn->list_node);
    free(conn->nickname);
    free(conn);
}

/* Returns true if 'conn' should stay open, false if it should be closed. */
static bool
raft_conn_should_stay_open(struct raft *raft, struct raft_conn *conn)
{
    /* Close the connection if it's actually dead.  If necessary, we'll
     * initiate a new session later. */
    if (!jsonrpc_session_is_alive(conn->js)) {
        return false;
    }

    /* Keep incoming sessions.  We trust the originator to decide to drop
     * it. */
    if (conn->incoming) {
        return true;
    }

    /* If we are joining the cluster, keep sessions to the remote addresses
     * that are supposed to be part of the cluster we're joining. */
    if (raft->joining && sset_contains(&raft->remote_addresses,
                                       jsonrpc_session_get_name(conn->js))) {
        return true;
    }

    /* We have joined the cluster.  If we did that "recently", then there is a
     * chance that we do not have the most recent server configuration log
     * entry.  If so, it's a waste to disconnect from the servers that were in
     * remote_addresses and that will probably appear in the configuration,
     * just to reconnect to them a moment later when we do get the
     * configuration update.  If we are not ourselves in the configuration,
     * then we know that there must be a new configuration coming up, so in
     * that case keep the connection. */
    if (!raft_find_server(raft, &raft->sid)) {
        return true;
    }

    /* Keep the connection only if the server is part of the configuration. */
    return raft_find_server(raft, &conn->sid);
}

1792/* Allows 'raft' to maintain the distributed log. Call this function as part
1793 * of the process's main loop. */
1794void
1795raft_run(struct raft *raft)
1796{
1797 if (raft->left || raft->failed) {
1798 return;
1799 }
1800
1801 raft_waiters_run(raft);
1802
1803 if (!raft->listener && time_msec() >= raft->listen_backoff) {
1804 char *paddr = raft_make_address_passive(raft->local_address);
1805 int error = pstream_open(paddr, &raft->listener, DSCP_DEFAULT);
1806 if (error) {
1807 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1808 VLOG_WARN_RL(&rl, "%s: listen failed (%s)",
1809 paddr, ovs_strerror(error));
1810 raft->listen_backoff = time_msec() + 1000;
1811 }
1812 free(paddr);
1813 }
1814
1815 if (raft->listener) {
1816 struct stream *stream;
1817 int error = pstream_accept(raft->listener, &stream);
1818 if (!error) {
1819 raft_add_conn(raft, jsonrpc_session_open_unreliably(
1820 jsonrpc_open(stream), DSCP_DEFAULT), NULL,
1821 true);
1822 } else if (error != EAGAIN) {
1823 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
1824 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
1825 pstream_get_name(raft->listener),
1826 ovs_strerror(error));
1827 }
1828 }
1829
1830 /* Run RPCs for all open sessions. */
1831 struct raft_conn *conn;
1832 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1833 raft_conn_run(raft, conn);
1834 }
1835
1836 /* Close unneeded sessions. */
1837 struct raft_conn *next;
1838 LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
1839 if (!raft_conn_should_stay_open(raft, conn)) {
1840 raft_conn_close(conn);
1841 }
1842 }
1843
1844 /* Open needed sessions. */
1845 struct raft_server *server;
1846 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1847 raft_open_conn(raft, server->address, &server->sid);
1848 }
1849 if (raft->joining) {
1850 const char *address;
1851 SSET_FOR_EACH (address, &raft->remote_addresses) {
1852 raft_open_conn(raft, address, NULL);
1853 }
1854 }
1855
1856 if (!raft->joining && time_msec() >= raft->election_timeout) {
1857 if (raft->role == RAFT_LEADER) {
1858 /* Check if majority of followers replied, then reset
1859 * election_timeout and reset s->replied. Otherwise, become
1860 * follower.
1861 *
1862 * Raft paper section 6.2: Leaders: A server might be in the leader
1863 * state, but if it isn’t the current leader, it could be
1864 * needlessly delaying client requests. For example, suppose a
1865 * leader is partitioned from the rest of the cluster, but it can
1866 * still communicate with a particular client. Without additional
1867 * mechanism, it could delay a request from that client forever,
1868 * being unable to replicate a log entry to any other servers.
1869 * Meanwhile, there might be another leader of a newer term that is
1870 * able to communicate with a majority of the cluster and would be
1871 * able to commit the client’s request. Thus, a leader in Raft
1872 * steps down if an election timeout elapses without a successful
1873 * round of heartbeats to a majority of its cluster; this allows
1874 * clients to retry their requests with another server. */
1875 int count = 0;
1876 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1877 if (server->replied) {
1878 count++;
1879 }
1880 }
1881 if (count >= hmap_count(&raft->servers) / 2) {
1882 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1883 server->replied = false;
1884 }
1885 raft_reset_election_timer(raft);
1886 } else {
1887 raft_become_follower(raft);
1888 raft_start_election(raft, false);
1889 }
1890 } else {
1891 raft_start_election(raft, false);
1892 }
1893
1894 }
1895
1896 if (raft->leaving && time_msec() >= raft->leave_timeout) {
1897 raft_send_remove_server_requests(raft);
1898 }
1899
1900 if (raft->joining && time_msec() >= raft->join_timeout) {
1901 raft->join_timeout = time_msec() + 1000;
1902 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1903 raft_send_add_server_request(raft, conn);
1904 }
1905 }
1906
1907 long long int now = time_msec();
1908 if (now >= raft->ping_timeout) {
1909 if (raft->role == RAFT_LEADER) {
1910 raft_send_heartbeats(raft);
1911 }
1912 /* Check whether any commands have timed out. The timeout is set to twice
1913 * the election timer so that commands can complete properly during a
1914 * leader election. E.g. a leader crashed and the current node with pending
1915 * commands becomes the new leader: the pending commands can still complete
1916 * if the crashed leader had replicated the transactions to a majority of
1917 * followers before it crashed. */
1918 struct raft_command *cmd, *next_cmd;
1919 HMAP_FOR_EACH_SAFE (cmd, next_cmd, hmap_node, &raft->commands) {
1920 if (cmd->timestamp
8e354614 1921 && now - cmd->timestamp > raft->election_timer * 2) {
5a9b53a5 1922 raft_command_complete(raft, cmd, RAFT_CMD_TIMEOUT);
1923 }
1924 }
f4bef8b3 1925 raft_reset_ping_timer(raft);
1926 }
1927
1928 /* Do this only at the end; if we did it as soon as we set raft->left or
1929 * raft->failed in handling the RemoveServerReply, then it could easily
1930 * cause references to freed memory in RPC sessions, etc. */
1931 if (raft->left || raft->failed) {
1932 raft_close__(raft);
1933 }
1934}
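/* Illustration of the command-timeout rule above, as a hypothetical helper
 * (not part of this file): with the default 1000 ms election timer, a
 * command pending for more than 2000 ms is completed with RAFT_CMD_TIMEOUT. */
static inline bool
example_command_timed_out(long long int now, long long int timestamp,
                          uint64_t election_timer)
{
    return timestamp && now - timestamp > election_timer * 2;
}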
1935
1936static void
1937raft_wait_session(struct jsonrpc_session *js)
1938{
1939 if (js) {
1940 jsonrpc_session_wait(js);
1941 jsonrpc_session_recv_wait(js);
1942 }
1943}
1944
1945/* Causes the next call to poll_block() to wake up when 'raft' needs to do
1946 * something. */
1947void
1948raft_wait(struct raft *raft)
1949{
1950 if (raft->left || raft->failed) {
1951 return;
1952 }
1953
1954 raft_waiters_wait(raft);
1955
1956 if (raft->listener) {
1957 pstream_wait(raft->listener);
1958 } else {
1959 poll_timer_wait_until(raft->listen_backoff);
1960 }
1961
1962 struct raft_conn *conn;
1963 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1964 raft_wait_session(conn->js);
1965 }
1966
1967 if (!raft->joining) {
1968 poll_timer_wait_until(raft->election_timeout);
1969 } else {
1970 poll_timer_wait_until(raft->join_timeout);
1971 }
1972 if (raft->leaving) {
1973 poll_timer_wait_until(raft->leave_timeout);
1974 }
1975 if (raft->role == RAFT_LEADER || !hmap_is_empty(&raft->commands)) {
1976 poll_timer_wait_until(raft->ping_timeout);
1977 }
1978}
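/* Sketch of how a caller is expected to drive raft_run() and raft_wait()
 * from a poll loop. This mirrors what ovsdb-server does in spirit, but the
 * function below is illustrative only and not part of this file. */
static void
example_raft_main_loop(struct raft *raft)
{
    for (;;) {
        raft_run(raft);     /* Make progress: RPCs, elections, commits. */
        raft_wait(raft);    /* Register all wakeup conditions. */
        poll_block();       /* Sleep until one of them fires. */
    }
}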
1979
1980static struct raft_waiter *
1981raft_waiter_create(struct raft *raft, enum raft_waiter_type type,
1982 bool start_commit)
1983{
1984 struct raft_waiter *w = xzalloc(sizeof *w);
1985 ovs_list_push_back(&raft->waiters, &w->list_node);
1986 w->commit_ticket = start_commit ? ovsdb_log_commit_start(raft->log) : 0;
1987 w->type = type;
1988 return w;
1989}
1990
1991/* Returns a human-readable representation of 'status' (or NULL if 'status' is
1992 * invalid). */
1993const char *
1994raft_command_status_to_string(enum raft_command_status status)
1995{
1996 switch (status) {
1997 case RAFT_CMD_INCOMPLETE:
1998 return "operation still in progress";
1999 case RAFT_CMD_SUCCESS:
2000 return "success";
2001 case RAFT_CMD_NOT_LEADER:
2002 return "not leader";
2003 case RAFT_CMD_BAD_PREREQ:
2004 return "prerequisite check failed";
2005 case RAFT_CMD_LOST_LEADERSHIP:
2006 return "lost leadership";
2007 case RAFT_CMD_SHUTDOWN:
2008 return "server shutdown";
2009 case RAFT_CMD_IO_ERROR:
2010 return "I/O error";
2011 case RAFT_CMD_TIMEOUT:
2012 return "timeout";
2013 default:
2014 return NULL;
2015 }
2016}
2017
2018/* Converts human-readable status in 's' into status code in '*statusp'.
2019 * Returns true if successful, false if 's' is unknown. */
2020bool
2021raft_command_status_from_string(const char *s,
2022 enum raft_command_status *statusp)
2023{
2024 for (enum raft_command_status status = 0; ; status++) {
2025 const char *s2 = raft_command_status_to_string(status);
2026 if (!s2) {
2027 *statusp = 0;
2028 return false;
2029 } else if (!strcmp(s, s2)) {
2030 *statusp = status;
2031 return true;
2032 }
2033 }
2034}
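/* A small illustrative round trip (not part of the original file): because
 * the linear scan above stops only when to_string() returns NULL, every
 * string produced by raft_command_status_to_string() parses back to the
 * same status code. */
static void
example_status_round_trip(void)
{
    enum raft_command_status status;
    ovs_assert(raft_command_status_from_string("not leader", &status));
    ovs_assert(status == RAFT_CMD_NOT_LEADER);
}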
2035
2036static const struct uuid *
2037raft_get_eid(const struct raft *raft, uint64_t index)
2038{
2039 for (; index >= raft->log_start; index--) {
2040 const struct raft_entry *e = raft_get_entry(raft, index);
2041 if (e->data) {
2042 return &e->eid;
2043 }
2044 }
2045 return &raft->snap.eid;
2046}
2047
2cd62f75 2048const struct uuid *
2049raft_current_eid(const struct raft *raft)
2050{
2051 return raft_get_eid(raft, raft->log_end - 1);
2052}
2053
2054static struct raft_command *
2055raft_command_create_completed(enum raft_command_status status)
2056{
2057 ovs_assert(status != RAFT_CMD_INCOMPLETE);
2058
2059 struct raft_command *cmd = xzalloc(sizeof *cmd);
2060 cmd->n_refs = 1;
2061 cmd->status = status;
2062 return cmd;
2063}
2064
2065static struct raft_command *
2066raft_command_create_incomplete(struct raft *raft, uint64_t index)
2067{
2068 struct raft_command *cmd = xzalloc(sizeof *cmd);
2069 cmd->n_refs = 2; /* One for client, one for raft->commands. */
2070 cmd->index = index;
2071 cmd->status = RAFT_CMD_INCOMPLETE;
2072 hmap_insert(&raft->commands, &cmd->hmap_node, cmd->index);
2073 return cmd;
2074}
2075
2076static struct raft_command * OVS_WARN_UNUSED_RESULT
2077raft_command_initiate(struct raft *raft,
2078 const struct json *data, const struct json *servers,
8e354614 2079 uint64_t election_timer, const struct uuid *eid)
2080{
2081 /* Write to local log. */
2082 uint64_t index = raft->log_end;
2083 if (!raft_handle_write_error(
2084 raft, raft_write_entry(
2085 raft, raft->term, json_nullable_clone(data), eid,
2086 json_nullable_clone(servers),
2087 election_timer))) {
2088 return raft_command_create_completed(RAFT_CMD_IO_ERROR);
2089 }
2090
2091 struct raft_command *cmd = raft_command_create_incomplete(raft, index);
2092 ovs_assert(eid);
2093 cmd->eid = *eid;
5a9b53a5 2094 cmd->timestamp = time_msec();
2095
2096 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index = cmd->index;
2097
2098 if (failure_test == FT_CRASH_BEFORE_SEND_APPEND_REQ) {
2099 ovs_fatal(0, "Raft test: crash before sending append_request.");
2100 }
2101 /* Write to remote logs. */
2102 struct raft_server *s;
2103 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2104 if (!uuid_equals(&s->sid, &raft->sid) && s->next_index == index) {
2105 raft_send_append_request(raft, s, 1, "execute command");
2106 s->next_index++;
2107 }
2108 }
2109 if (failure_test == FT_CRASH_AFTER_SEND_APPEND_REQ) {
2110 ovs_fatal(0, "Raft test: crash after sending append_request.");
2111 }
0f954f32 2112 raft_reset_ping_timer(raft);
2113
2114 return cmd;
2115}
2116
2117static void
2118log_all_commands(struct raft *raft)
2119{
2120 struct raft_command *cmd, *next;
2121 HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
2122 VLOG_DBG("raft command eid: "UUID_FMT, UUID_ARGS(&cmd->eid));
2123 }
2124}
2125
1b1d2e6d 2126static struct raft_command * OVS_WARN_UNUSED_RESULT
2127raft_command_execute__(struct raft *raft, const struct json *data,
2128 const struct json *servers, uint64_t election_timer,
2129 const struct uuid *prereq, struct uuid *result)
2130{
2131 if (raft->joining || raft->leaving || raft->left || raft->failed) {
2132 return raft_command_create_completed(RAFT_CMD_SHUTDOWN);
2133 }
2134
2135 if (raft->role != RAFT_LEADER) {
2136 /* Consider proxying the command to the leader. We can only do that if
2137 * we know the leader and the command does not change the set of
2138 * servers. We do not proxy commands without prerequisites, even
2139 * though we could, because in an OVSDB context a log entry doesn't
2140 * make sense without context. */
8e354614 2141 if (servers || election_timer || !data
2142 || raft->role != RAFT_FOLLOWER || uuid_is_zero(&raft->leader_sid)
2143 || !prereq) {
2144 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
2145 }
2146 }
2147
2148 struct uuid eid = data ? uuid_random() : UUID_ZERO;
2149 if (result) {
2150 *result = eid;
2151 }
2152
2153 if (raft->role != RAFT_LEADER) {
2154 const union raft_rpc rpc = {
2155 .execute_command_request = {
2156 .common = {
2157 .type = RAFT_RPC_EXECUTE_COMMAND_REQUEST,
2158 .sid = raft->leader_sid,
2159 },
2160 .data = CONST_CAST(struct json *, data),
2161 .prereq = *prereq,
2162 .result = eid,
2163 }
2164 };
2165 if (failure_test == FT_CRASH_BEFORE_SEND_EXEC_REQ) {
2166 ovs_fatal(0, "Raft test: crash before sending "
2167 "execute_command_request");
2168 }
2169 if (!raft_send(raft, &rpc)) {
2170 /* Couldn't send command, so it definitely failed. */
2171 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
2172 }
2173 if (failure_test == FT_CRASH_AFTER_SEND_EXEC_REQ) {
2174 ovs_fatal(0, "Raft test: crash after sending "
2175 "execute_command_request");
2176 }
2177
2178 struct raft_command *cmd = raft_command_create_incomplete(raft, 0);
2179 cmd->timestamp = time_msec();
2180 cmd->eid = eid;
5a9b53a5 2181 log_all_commands(raft);
2182 return cmd;
2183 }
2184
2185 const struct uuid *current_eid = raft_current_eid(raft);
2186 if (prereq && !uuid_equals(prereq, current_eid)) {
2187 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
2188 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
2189 "prerequisite "UUID_FMT,
2190 UUID_ARGS(current_eid), UUID_ARGS(prereq));
2191 return raft_command_create_completed(RAFT_CMD_BAD_PREREQ);
2192 }
2193
8e354614 2194 return raft_command_initiate(raft, data, servers, election_timer, &eid);
2195}
2196
2197/* Initiates appending a log entry to 'raft'. The log entry consists of
2198 * 'data'. If 'prereq' is nonnull, the entry is only added to the log if the
2199 * previous entry in the log has entry ID 'prereq'. If 'result' is nonnull,
2200 * it is populated with the entry ID for the new log entry.
2201 *
2202 * Returns a "struct raft_command" that may be used to track progress adding
2203 * the log entry. The caller must eventually free the returned structure, with
2204 * raft_command_unref(). */
2205struct raft_command * OVS_WARN_UNUSED_RESULT
2206raft_command_execute(struct raft *raft, const struct json *data,
2207 const struct uuid *prereq, struct uuid *result)
2208{
8e354614 2209 return raft_command_execute__(raft, data, NULL, 0, prereq, result);
2210}
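/* Illustrative client of the API above (not part of this file): append an
 * entry and poll until the command resolves. In ovsdb-server 'data' would
 * be a database transaction; here it is just an opaque json object. */
static enum raft_command_status
example_execute_and_wait(struct raft *raft, const struct json *data)
{
    struct raft_command *cmd = raft_command_execute(raft, data, NULL, NULL);
    while (raft_command_get_status(cmd) == RAFT_CMD_INCOMPLETE) {
        raft_run(raft);
        raft_command_wait(cmd);
        raft_wait(raft);
        poll_block();
    }
    enum raft_command_status status = raft_command_get_status(cmd);
    raft_command_unref(cmd);
    return status;
}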
2211
2212/* Returns the status of 'cmd'. */
2213enum raft_command_status
2214raft_command_get_status(const struct raft_command *cmd)
2215{
2216 ovs_assert(cmd->n_refs > 0);
2217 return cmd->status;
2218}
2219
2220/* Returns the index of the log entry at which 'cmd' was committed.
2221 *
2222 * This function works only with successful commands. */
2223uint64_t
2224raft_command_get_commit_index(const struct raft_command *cmd)
2225{
2226 ovs_assert(cmd->n_refs > 0);
2227 ovs_assert(cmd->status == RAFT_CMD_SUCCESS);
2228 return cmd->index;
2229}
2230
2231/* Frees 'cmd'. */
2232void
2233raft_command_unref(struct raft_command *cmd)
2234{
2235 if (cmd) {
2236 ovs_assert(cmd->n_refs > 0);
2237 if (!--cmd->n_refs) {
2238 free(cmd);
2239 }
2240 }
2241}
2242
2243/* Causes poll_block() to wake up when 'cmd' has status to report. */
2244void
2245raft_command_wait(const struct raft_command *cmd)
2246{
2247 if (cmd->status != RAFT_CMD_INCOMPLETE) {
2248 poll_immediate_wake();
2249 }
2250}
2251
2252static void
2253raft_command_complete(struct raft *raft,
2254 struct raft_command *cmd,
2255 enum raft_command_status status)
2256{
2257 VLOG_DBG("raft_command_complete eid "UUID_FMT" status: %s",
2258 UUID_ARGS(&cmd->eid), raft_command_status_to_string(status));
2259 if (!uuid_is_zero(&cmd->sid)) {
2260 uint64_t commit_index = status == RAFT_CMD_SUCCESS ? cmd->index : 0;
2261 raft_send_execute_command_reply(raft, &cmd->sid, &cmd->eid, status,
2262 commit_index);
2263 }
2264
2265 ovs_assert(cmd->status == RAFT_CMD_INCOMPLETE);
2266 ovs_assert(cmd->n_refs > 0);
2267 hmap_remove(&raft->commands, &cmd->hmap_node);
2268 cmd->status = status;
2269 raft_command_unref(cmd);
2270}
2271
2272static void
2273raft_complete_all_commands(struct raft *raft, enum raft_command_status status)
2274{
2275 struct raft_command *cmd, *next;
2276 HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
2277 raft_command_complete(raft, cmd, status);
2278 }
2279}
2280
2281static struct raft_command *
2282raft_find_command_by_eid(struct raft *raft, const struct uuid *eid)
2283{
2284 struct raft_command *cmd;
2285
2286 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2287 if (uuid_equals(&cmd->eid, eid)) {
2288 return cmd;
2289 }
2290 }
2291 return NULL;
2292}
2293\f
2294#define RAFT_RPC(ENUM, NAME) \
2295 static void raft_handle_##NAME(struct raft *, const struct raft_##NAME *);
2296RAFT_RPC_TYPES
2297#undef RAFT_RPC
2298
2299static void
2300raft_handle_hello_request(struct raft *raft OVS_UNUSED,
2301 const struct raft_hello_request *hello OVS_UNUSED)
2302{
2303}
2304
2305/* 'sid' is the server being added. */
2306static void
2307raft_send_add_server_reply__(struct raft *raft, const struct uuid *sid,
2308 const char *address,
2309 bool success, const char *comment)
2310{
2311 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2312 if (!VLOG_DROP_INFO(&rl)) {
2313 struct ds s = DS_EMPTY_INITIALIZER;
2314 char buf[SID_LEN + 1];
2315 ds_put_format(&s, "adding %s ("SID_FMT" at %s) "
2316 "to cluster "CID_FMT" %s",
2317 raft_get_nickname(raft, sid, buf, sizeof buf),
2318 SID_ARGS(sid), address, CID_ARGS(&raft->cid),
2319 success ? "succeeded" : "failed");
2320 if (comment) {
2321 ds_put_format(&s, " (%s)", comment);
2322 }
2323 VLOG_INFO("%s", ds_cstr(&s));
2324 ds_destroy(&s);
2325 }
2326
2327 union raft_rpc rpy = {
2328 .add_server_reply = {
2329 .common = {
2330 .type = RAFT_RPC_ADD_SERVER_REPLY,
2331 .sid = *sid,
2332 .comment = CONST_CAST(char *, comment),
2333 },
2334 .success = success,
2335 }
2336 };
2337
2338 struct sset *remote_addresses = &rpy.add_server_reply.remote_addresses;
2339 sset_init(remote_addresses);
2340 if (!raft->joining) {
2341 struct raft_server *s;
2342 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2343 if (!uuid_equals(&s->sid, &raft->sid)) {
2344 sset_add(remote_addresses, s->address);
2345 }
2346 }
2347 }
2348
2349 raft_send(raft, &rpy);
2350
2351 sset_destroy(remote_addresses);
2352}
2353
2354static void
2355raft_send_remove_server_reply_rpc(struct raft *raft,
2356 const struct uuid *dst_sid,
2357 const struct uuid *target_sid,
2358 bool success, const char *comment)
2359{
2360 if (uuid_equals(&raft->sid, dst_sid)) {
2361 if (success && uuid_equals(&raft->sid, target_sid)) {
2362 raft_finished_leaving_cluster(raft);
2363 }
2364 return;
2365 }
2366
2367 const union raft_rpc rpy = {
2368 .remove_server_reply = {
2369 .common = {
2370 .type = RAFT_RPC_REMOVE_SERVER_REPLY,
17bd4149 2371 .sid = *dst_sid,
1b1d2e6d
BP
2372 .comment = CONST_CAST(char *, comment),
2373 },
2374 .target_sid = (uuid_equals(dst_sid, target_sid)
2375 ? UUID_ZERO
2376 : *target_sid),
2377 .success = success,
2378 }
2379 };
2380 raft_send(raft, &rpy);
2381}
2382
2383static void
2384raft_send_remove_server_reply__(struct raft *raft,
2385 const struct uuid *target_sid,
2386 const struct uuid *requester_sid,
2387 struct unixctl_conn *requester_conn,
2388 bool success, const char *comment)
2389{
2390 struct ds s = DS_EMPTY_INITIALIZER;
2391 ds_put_format(&s, "request ");
2392 if (!uuid_is_zero(requester_sid)) {
2393 char buf[SID_LEN + 1];
2394 ds_put_format(&s, "by %s",
2395 raft_get_nickname(raft, requester_sid, buf, sizeof buf));
2396 } else {
2397 ds_put_cstr(&s, "via unixctl");
2398 }
2399 ds_put_cstr(&s, " to remove ");
2400 if (!requester_conn && uuid_equals(target_sid, requester_sid)) {
2401 ds_put_cstr(&s, "itself");
2402 } else {
2403 char buf[SID_LEN + 1];
2404 ds_put_cstr(&s, raft_get_nickname(raft, target_sid, buf, sizeof buf));
2405 if (uuid_equals(target_sid, &raft->sid)) {
2406 ds_put_cstr(&s, " (ourselves)");
2407 }
2408 }
2409 ds_put_format(&s, " from cluster "CID_FMT" %s",
2410 CID_ARGS(&raft->cid),
2411 success ? "succeeded" : "failed");
2412 if (comment) {
2413 ds_put_format(&s, " (%s)", comment);
2414 }
2415
2416 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2417 VLOG_INFO_RL(&rl, "%s", ds_cstr(&s));
2418
2419 /* Send RemoveServerReply to the requester (which could be a server or a
2420 * unixctl connection). Also always send it to the removed server; this
2421 * allows it to be sure that it's really removed and update its log and
2422 * disconnect permanently. */
2423 if (!uuid_is_zero(requester_sid)) {
17bd4149 2424 raft_send_remove_server_reply_rpc(raft, requester_sid, target_sid,
1b1d2e6d
BP
2425 success, comment);
2426 }
2427 if (!uuid_equals(requester_sid, target_sid)) {
2428 raft_send_remove_server_reply_rpc(raft, target_sid, target_sid,
2429 success, comment);
2430 }
2431 if (requester_conn) {
2432 if (success) {
2433 unixctl_command_reply(requester_conn, ds_cstr(&s));
2434 } else {
2435 unixctl_command_reply_error(requester_conn, ds_cstr(&s));
2436 }
2437 }
2438
2439 ds_destroy(&s);
2440}
2441
2442static void
2443raft_send_add_server_reply(struct raft *raft,
2444 const struct raft_add_server_request *rq,
2445 bool success, const char *comment)
2446{
2447 return raft_send_add_server_reply__(raft, &rq->common.sid, rq->address,
2448 success, comment);
2449}
2450
2451static void
2452raft_send_remove_server_reply(struct raft *raft,
2453 const struct raft_remove_server_request *rq,
2454 bool success, const char *comment)
2455{
2456 return raft_send_remove_server_reply__(raft, &rq->sid, &rq->common.sid,
2457 rq->requester_conn, success,
2458 comment);
2459}
2460
2461static void
2462raft_become_follower(struct raft *raft)
2463{
2464 raft->leader_sid = UUID_ZERO;
2465 if (raft->role == RAFT_FOLLOWER) {
2466 return;
2467 }
2468
2469 raft->role = RAFT_FOLLOWER;
0f954f32 2470 raft_reset_election_timer(raft);
2471
2472 /* Notify clients about lost leadership.
2473 *
2474 * We do not reverse our changes to 'raft->servers' because the new
2475 * configuration is already part of the log. Possibly the configuration
2476 * log entry will not be committed, but until we know that we must use the
2477 * new configuration. Our AppendEntries processing will properly update
2478 * the server configuration later, if necessary. */
2479 struct raft_server *s;
2480 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
2481 raft_send_add_server_reply__(raft, &s->sid, s->address, false,
2482 RAFT_SERVER_LOST_LEADERSHIP);
2483 }
2484 if (raft->remove_server) {
2485 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
2486 &raft->remove_server->requester_sid,
2487 raft->remove_server->requester_conn,
2488 false, RAFT_SERVER_LOST_LEADERSHIP);
2489 raft_server_destroy(raft->remove_server);
2490 raft->remove_server = NULL;
2491 }
2492
2493 raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
2494}
2495
2496static void
2497raft_send_append_request(struct raft *raft,
2498 struct raft_server *peer, unsigned int n,
2499 const char *comment)
2500{
2501 ovs_assert(raft->role == RAFT_LEADER);
2502
2503 const union raft_rpc rq = {
2504 .append_request = {
2505 .common = {
2506 .type = RAFT_RPC_APPEND_REQUEST,
2507 .sid = peer->sid,
2508 .comment = CONST_CAST(char *, comment),
2509 },
2510 .term = raft->term,
2511 .prev_log_index = peer->next_index - 1,
2512 .prev_log_term = (peer->next_index - 1 >= raft->log_start
2513 ? raft->entries[peer->next_index - 1
2514 - raft->log_start].term
2515 : raft->snap.term),
2516 .leader_commit = raft->commit_index,
2517 .entries = &raft->entries[peer->next_index - raft->log_start],
2518 .n_entries = n,
2519 },
2520 };
2521 raft_send(raft, &rq);
2522}
2523
2524static void
2525raft_send_heartbeats(struct raft *raft)
2526{
2527 struct raft_server *s;
2528 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2529 if (!uuid_equals(&raft->sid, &s->sid)) {
2530 raft_send_append_request(raft, s, 0, "heartbeat");
2531 }
2532 }
2533
2534 /* Send a ping to anyone waiting for a command to complete, to let
2535 * them know we're still working on it. */
2536 struct raft_command *cmd;
2537 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2538 if (!uuid_is_zero(&cmd->sid)) {
2539 raft_send_execute_command_reply(raft, &cmd->sid,
2540 &cmd->eid,
2541 RAFT_CMD_INCOMPLETE, 0);
2542 }
2543 }
2544
2545 raft_reset_ping_timer(raft);
2546}
2547
2548/* Initializes the fields in 's' that represent the leader's view of the
2549 * server. */
2550static void
2551raft_server_init_leader(struct raft *raft, struct raft_server *s)
2552{
2553 s->next_index = raft->log_end;
2554 s->match_index = 0;
2555 s->phase = RAFT_PHASE_STABLE;
89771c1e 2556 s->replied = false;
2557}
2558
2559static void
2560raft_set_leader(struct raft *raft, const struct uuid *sid)
2561{
2562 raft->leader_sid = *sid;
2833885f 2563 raft->ever_had_leader = raft->had_leader = true;
2564 raft->candidate_retrying = false;
2565}
2566
2567static void
2568raft_become_leader(struct raft *raft)
2569{
5a9b53a5 2570 log_all_commands(raft);
2571
2572 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
2573 VLOG_INFO_RL(&rl, "term %"PRIu64": elected leader by %d+ of "
2574 "%"PRIuSIZE" servers", raft->term,
2575 raft->n_votes, hmap_count(&raft->servers));
2576
2577 ovs_assert(raft->role != RAFT_LEADER);
2578 raft->role = RAFT_LEADER;
923f01ca 2579 raft_set_leader(raft, &raft->sid);
89771c1e 2580 raft_reset_election_timer(raft);
0f954f32 2581 raft_reset_ping_timer(raft);
2582
2583 struct raft_server *s;
2584 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2585 raft_server_init_leader(raft, s);
2586 }
2587
2588 raft->election_timer_new = 0;
2589
1b1d2e6d 2590 raft_update_our_match_index(raft, raft->log_end - 1);
2591
2592 /* Write the fact that we are leader to the log. This is not used by the
2593 * algorithm (although it could be, for quick restart), but it is used for
2594 * offline analysis to check for conformance with the properties that Raft
2595 * guarantees. */
2596 struct raft_record r = {
2597 .type = RAFT_REC_LEADER,
2598 .term = raft->term,
2599 .sid = raft->sid,
2600 };
2601 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2602
2603 /* Initiate a no-op commit. Otherwise we might never find out what's in
2604 * the log. See section 6.4 item 1:
2605 *
2606 * The Leader Completeness Property guarantees that a leader has all
2607 * committed entries, but at the start of its term, it may not know
2608 * which those are. To find out, it needs to commit an entry from its
2609 * term. Raft handles this by having each leader commit a blank no-op
2610 * entry into the log at the start of its term. As soon as this no-op
2611 * entry is committed, the leader’s commit index will be at least as
2612 * large as any other servers’ during its term.
2613 */
2614 raft_command_unref(raft_command_execute__(raft, NULL, NULL, 0, NULL,
2615 NULL));
2616}
2617
2618/* Processes term 'term' received as part of RPC 'common'. Returns true if the
2619 * caller should continue processing the RPC, false if the caller should reject
2620 * it due to a stale term. */
2621static bool
2622raft_receive_term__(struct raft *raft, const struct raft_rpc_common *common,
2623 uint64_t term)
2624{
2625 /* Section 3.3 says:
2626 *
2627 * Current terms are exchanged whenever servers communicate; if one
2628 * server’s current term is smaller than the other’s, then it updates
2629 * its current term to the larger value. If a candidate or leader
2630 * discovers that its term is out of date, it immediately reverts to
2631 * follower state. If a server receives a request with a stale term
2632 * number, it rejects the request.
2633 */
2634 if (term > raft->term) {
2635 if (!raft_set_term(raft, term, NULL)) {
2636 /* Failed to update the term to 'term'. */
2637 return false;
2638 }
2639 raft_become_follower(raft);
2640 } else if (term < raft->term) {
2641 char buf[SID_LEN + 1];
2642 VLOG_INFO("rejecting term %"PRIu64" < current term %"PRIu64" received "
2643 "in %s message from server %s",
2644 term, raft->term,
2645 raft_rpc_type_to_string(common->type),
2646 raft_get_nickname(raft, &common->sid, buf, sizeof buf));
2647 return false;
2648 }
2649 return true;
2650}
2651
2652static void
2653raft_get_servers_from_log(struct raft *raft, enum vlog_level level)
2654{
2655 const struct json *servers_json = raft->snap.servers;
2656 for (uint64_t index = raft->log_end - 1; index >= raft->log_start;
2657 index--) {
2658 struct raft_entry *e = &raft->entries[index - raft->log_start];
2659 if (e->servers) {
2660 servers_json = e->servers;
2661 break;
2662 }
2663 }
2664
2665 struct hmap servers;
2666 struct ovsdb_error *error = raft_servers_from_json(servers_json, &servers);
2667 ovs_assert(!error);
2668 raft_set_servers(raft, &servers, level);
2669 raft_servers_destroy(&servers);
2670}
2671
2672/* Truncates the log, so that raft->log_end becomes 'new_end'.
2673 *
2674 * Doesn't write anything to disk. In theory, we could truncate the on-disk
2675 * log file, but we don't have the right information to know how long it should
2676 * be. What we actually do is to append entries for older indexes to the
2677 * on-disk log; when we re-read it later, these entries truncate the log.
2678 *
2679 * Returns true if any of the removed log entries were server configuration
2680 * entries, false otherwise. */
2681static bool
2682raft_truncate(struct raft *raft, uint64_t new_end)
2683{
2684 ovs_assert(new_end >= raft->log_start);
2685 if (raft->log_end > new_end) {
2686 char buf[SID_LEN + 1];
2687 VLOG_INFO("%s truncating %"PRIu64 " entries from end of log",
2688 raft_get_nickname(raft, &raft->sid, buf, sizeof buf),
2689 raft->log_end - new_end);
2690 }
2691
2692 bool servers_changed = false;
2693 while (raft->log_end > new_end) {
2694 struct raft_entry *entry = &raft->entries[--raft->log_end
2695 - raft->log_start];
2696 if (entry->servers) {
2697 servers_changed = true;
2698 }
2699 raft_entry_uninit(entry);
2700 }
2701 return servers_changed;
2702}
2703
2704static const struct json *
2705raft_peek_next_entry(struct raft *raft, struct uuid *eid)
2706{
2707 /* Invariant: log_start - 2 <= last_applied <= commit_index < log_end. */
2708 ovs_assert(raft->log_start <= raft->last_applied + 2);
2709 ovs_assert(raft->last_applied <= raft->commit_index);
2710 ovs_assert(raft->commit_index < raft->log_end);
2711
2712 if (raft->joining || raft->failed) {
2713 return NULL;
2714 }
2715
2716 if (raft->log_start == raft->last_applied + 2) {
2717 *eid = raft->snap.eid;
2718 return raft->snap.data;
2719 }
2720
2721 while (raft->last_applied < raft->commit_index) {
2722 const struct raft_entry *e = raft_get_entry(raft,
2723 raft->last_applied + 1);
2724 if (e->data) {
2725 *eid = e->eid;
2726 return e->data;
2727 }
2728 raft->last_applied++;
2729 }
2730 return NULL;
2731}
2732
2733static const struct json *
2734raft_get_next_entry(struct raft *raft, struct uuid *eid)
2735{
2736 const struct json *data = raft_peek_next_entry(raft, eid);
2737 if (data) {
2738 raft->last_applied++;
2739 }
2740 return data;
2741}
2742
2743/* Updates the commit index in the raft log. If the commit index is already
2744 * up-to-date, this does nothing and returns false; otherwise, returns true. */
2745static bool
2746raft_update_commit_index(struct raft *raft, uint64_t new_commit_index)
2747{
2748 if (new_commit_index <= raft->commit_index) {
0f954f32 2749 return false;
2750 }
2751
2752 if (raft->role == RAFT_LEADER) {
2753 while (raft->commit_index < new_commit_index) {
2754 uint64_t index = ++raft->commit_index;
2755 const struct raft_entry *e = raft_get_entry(raft, index);
2756 if (e->data) {
2757 struct raft_command *cmd
5a9b53a5 2758 = raft_find_command_by_eid(raft, &e->eid);
1b1d2e6d 2759 if (cmd) {
2760 if (!cmd->index) {
2761 VLOG_DBG("Command completed after role change from"
2762 " follower to leader "UUID_FMT,
2763 UUID_ARGS(&e->eid));
2764 cmd->index = index;
2765 }
2766 raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
2767 }
2768 }
2769 if (e->election_timer) {
2770 VLOG_INFO("Election timer changed from %"PRIu64" to %"PRIu64,
2771 raft->election_timer, e->election_timer);
2772 raft->election_timer = e->election_timer;
2773 raft->election_timer_new = 0;
2774 }
2775 if (e->servers) {
2776 /* raft_run_reconfigure() can write a new Raft entry, which can
2777 * reallocate raft->entries, which would invalidate 'e', so
2778 * this case must be last, after the one for 'e->data'. */
2779 raft_run_reconfigure(raft);
2780 }
2781 }
2782 } else {
2783 while (raft->commit_index < new_commit_index) {
2784 uint64_t index = ++raft->commit_index;
2785 const struct raft_entry *e = raft_get_entry(raft, index);
2786 if (e->election_timer) {
2787 VLOG_INFO("Election timer changed from %"PRIu64" to %"PRIu64,
2788 raft->election_timer, e->election_timer);
2789 raft->election_timer = e->election_timer;
2790 }
2791 }
2792 /* Check if any pending command can be completed, and complete it.
2793 * This can happen when leader fail-over before sending
2794 * execute_command_reply. */
2795 const struct uuid *eid = raft_get_eid(raft, new_commit_index);
2796 struct raft_command *cmd = raft_find_command_by_eid(raft, eid);
2797 if (cmd) {
2798 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
2799 VLOG_INFO_RL(&rl,
2800 "Command completed without reply (eid: "UUID_FMT", "
2801 "commit index: %"PRIu64")",
2802 UUID_ARGS(eid), new_commit_index);
2803 cmd->index = new_commit_index;
2804 raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
2805 }
2806 }
2807
2808 /* Write the commit index to the log. The next time we restart, this
2809 * allows us to start exporting a reasonably fresh log, instead of a log
2810 * that only contains the snapshot. */
2811 struct raft_record r = {
2812 .type = RAFT_REC_COMMIT_INDEX,
2813 .commit_index = raft->commit_index,
2814 };
2815 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
0f954f32 2816 return true;
2817}
2818
2819/* This doesn't use rq->entries (but it does use rq->n_entries). */
2820static void
2821raft_send_append_reply(struct raft *raft, const struct raft_append_request *rq,
2822 enum raft_append_result result, const char *comment)
2823{
2824 /* Figure 3.1: "If leaderCommit > commitIndex, set commitIndex =
2825 * min(leaderCommit, index of last new entry)" */
2826 if (result == RAFT_APPEND_OK && rq->leader_commit > raft->commit_index) {
2827 raft_update_commit_index(
2828 raft, MIN(rq->leader_commit, rq->prev_log_index + rq->n_entries));
2829 }
2830
2831 /* Send reply. */
2832 union raft_rpc reply = {
2833 .append_reply = {
2834 .common = {
2835 .type = RAFT_RPC_APPEND_REPLY,
2836 .sid = rq->common.sid,
2837 .comment = CONST_CAST(char *, comment),
2838 },
2839 .term = raft->term,
2840 .log_end = raft->log_end,
2841 .prev_log_index = rq->prev_log_index,
2842 .prev_log_term = rq->prev_log_term,
2843 .n_entries = rq->n_entries,
2844 .result = result,
2845 }
2846 };
2847 raft_send(raft, &reply);
2848}
2849
2850/* If 'prev_log_index' exists in 'raft''s log, in term 'prev_log_term', returns
2851 * NULL. Otherwise, returns an explanation for the mismatch. */
2852static const char *
2853match_index_and_term(const struct raft *raft,
2854 uint64_t prev_log_index, uint64_t prev_log_term)
2855{
2856 if (prev_log_index < raft->log_start - 1) {
2857 return "mismatch before start of log";
2858 } else if (prev_log_index == raft->log_start - 1) {
2859 if (prev_log_term != raft->snap.term) {
2860 return "prev_term mismatch";
2861 }
2862 } else if (prev_log_index < raft->log_end) {
2863 if (raft->entries[prev_log_index - raft->log_start].term
2864 != prev_log_term) {
2865 return "term mismatch";
2866 }
2867 } else {
2868 /* prev_log_index >= raft->log_end */
2869 return "mismatch past end of log";
2870 }
2871 return NULL;
2872}
2873
2874static void
2875raft_handle_append_entries(struct raft *raft,
2876 const struct raft_append_request *rq,
2877 uint64_t prev_log_index, uint64_t prev_log_term,
2878 const struct raft_entry *entries,
2879 unsigned int n_entries)
2880{
2881 /* Section 3.5: "When sending an AppendEntries RPC, the leader includes
2882 * the index and term of the entry in its log that immediately precedes
2883 * the new entries. If the follower does not find an entry in its log
2884 * with the same index and term, then it refuses the new entries." */
2885 const char *mismatch = match_index_and_term(raft, prev_log_index,
2886 prev_log_term);
2887 if (mismatch) {
2888 VLOG_INFO("rejecting append_request because previous entry "
2889 "%"PRIu64",%"PRIu64" not in local log (%s)",
2890 prev_log_term, prev_log_index, mismatch);
2891 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY, mismatch);
2892 return;
2893 }
2894
2895 /* Figure 3.1: "If an existing entry conflicts with a new one (same
2896 * index but different terms), delete the existing entry and all that
2897 * follow it." */
2898 unsigned int i;
2899 bool servers_changed = false;
2900 for (i = 0; ; i++) {
2901 if (i >= n_entries) {
2902 /* No change. */
2903 if (rq->common.comment
2904 && !strcmp(rq->common.comment, "heartbeat")) {
2905 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "heartbeat");
2906 } else {
2907 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
2908 }
2909 return;
2910 }
2911
2912 uint64_t log_index = (prev_log_index + 1) + i;
2913 if (log_index >= raft->log_end) {
2914 break;
2915 }
2916 if (raft->entries[log_index - raft->log_start].term
2917 != entries[i].term) {
2918 if (raft_truncate(raft, log_index)) {
2919 servers_changed = true;
2920 }
2921 break;
2922 }
2923 }
2924
2925 if (failure_test == FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE) {
2926 ovs_fatal(0, "Raft test: crash after receiving append_request with "
2927 "update.");
2928 }
2929 /* Figure 3.1: "Append any entries not already in the log." */
2930 struct ovsdb_error *error = NULL;
2931 bool any_written = false;
2932 for (; i < n_entries; i++) {
2933 const struct raft_entry *e = &entries[i];
2934 error = raft_write_entry(raft, e->term,
2935 json_nullable_clone(e->data), &e->eid,
2936 json_nullable_clone(e->servers),
2937 e->election_timer);
2938 if (error) {
2939 break;
2940 }
2941 any_written = true;
2942 if (e->servers) {
2943 servers_changed = true;
2944 }
2945 }
2946
2947 if (any_written) {
2948 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index
2949 = raft->log_end - 1;
2950 }
2951 if (servers_changed) {
2952 /* The set of servers might have changed; check. */
2953 raft_get_servers_from_log(raft, VLL_INFO);
2954 }
2955
2956 if (error) {
2957 char *s = ovsdb_error_to_string_free(error);
2958 VLOG_ERR("%s", s);
2959 free(s);
2960 raft_send_append_reply(raft, rq, RAFT_APPEND_IO_ERROR, "I/O error");
2961 return;
2962 }
2963
2964 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "log updated");
2965}
2966
2967static bool
2968raft_update_leader(struct raft *raft, const struct uuid *sid)
2969{
2970 if (raft->role == RAFT_LEADER) {
2971 char buf[SID_LEN + 1];
2972 VLOG_ERR("this server is leader but server %s claims to be",
2973 raft_get_nickname(raft, sid, buf, sizeof buf));
2974 return false;
2975 } else if (!uuid_equals(sid, &raft->leader_sid)) {
2976 if (!uuid_is_zero(&raft->leader_sid)) {
2977 char buf1[SID_LEN + 1];
2978 char buf2[SID_LEN + 1];
2979 VLOG_ERR("leader for term %"PRIu64" changed from %s to %s",
2980 raft->term,
2981 raft_get_nickname(raft, &raft->leader_sid,
2982 buf1, sizeof buf1),
2983 raft_get_nickname(raft, sid, buf2, sizeof buf2));
2984 } else {
2985 char buf[SID_LEN + 1];
2986 VLOG_INFO("server %s is leader for term %"PRIu64,
2987 raft_get_nickname(raft, sid, buf, sizeof buf),
2988 raft->term);
2989 }
923f01ca 2990 raft_set_leader(raft, sid);
2991
2992 /* Record the leader to the log. This is not used by the algorithm
2993 * (although it could be, for quick restart), but it is used for
2994 * offline analysis to check for conformance with the properties
2995 * that Raft guarantees. */
2996 struct raft_record r = {
2997 .type = RAFT_REC_LEADER,
2998 .term = raft->term,
2999 .sid = *sid,
3000 };
3001 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
3002 }
3003 if (raft->role == RAFT_CANDIDATE) {
3004 /* Section 3.4: While waiting for votes, a candidate may
3005 * receive an AppendEntries RPC from another server claiming to
3006 * be leader. If the leader’s term (included in its RPC) is at
3007 * least as large as the candidate’s current term, then the
3008 * candidate recognizes the leader as legitimate and returns to
3009 * follower state. */
3010 raft->role = RAFT_FOLLOWER;
3011 }
3012 return true;
3013}
3014
3015static void
3016raft_handle_append_request(struct raft *raft,
3017 const struct raft_append_request *rq)
3018{
3019 /* We do not check whether the server that sent the request is part of the
3020 * cluster. As section 4.1 says, "A server accepts AppendEntries requests
3021 * from a leader that is not part of the server’s latest configuration.
3022 * Otherwise, a new server could never be added to the cluster (it would
3023 * never accept any log entries preceding the configuration entry that adds
3024 * the server)." */
3025 if (!raft_update_leader(raft, &rq->common.sid)) {
3026 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
3027 "usurped leadership");
3028 return;
3029 }
0f954f32 3030 raft_reset_election_timer(raft);
3031
3032 /* First check for the common case, where the AppendEntries request is
3033 * entirely for indexes covered by 'log_start' ... 'log_end - 1', something
3034 * like this:
3035 *
3036 * rq->prev_log_index
3037 * | first_entry_index
3038 * | | nth_entry_index
3039 * | | |
3040 * v v v
3041 * +---+---+---+---+
3042 * T | T | T | T | T |
3043 * +---+-------+---+
3044 * +---+---+---+---+
3045 * T | T | T | T | T |
3046 * +---+---+---+---+
3047 * ^ ^
3048 * | |
3049 * log_start log_end
3050 * */
3051 uint64_t first_entry_index = rq->prev_log_index + 1;
3052 uint64_t nth_entry_index = rq->prev_log_index + rq->n_entries;
3053 if (OVS_LIKELY(first_entry_index >= raft->log_start)) {
3054 raft_handle_append_entries(raft, rq,
3055 rq->prev_log_index, rq->prev_log_term,
3056 rq->entries, rq->n_entries);
3057 return;
3058 }
3059
3060 /* Now a series of checks for odd cases, where the AppendEntries request
3061 * extends earlier than the beginning of our log, into the log entries
3062 * discarded by the most recent snapshot. */
3063
3064 /*
3065 * Handle the case where the indexes covered by rq->entries[] are entirely
3066 * disjoint with 'log_start - 1' ... 'log_end - 1', as shown below. So,
3067 * everything in the AppendEntries request must already have been
3068 * committed, and we might as well reply with success.
3069 *
3070 * rq->prev_log_index
3071 * | first_entry_index
3072 * | | nth_entry_index
3073 * | | |
3074 * v v v
3075 * +---+---+---+---+
3076 * T | T | T | T | T |
3077 * +---+-------+---+
3078 * +---+---+---+---+
3079 * T | T | T | T | T |
3080 * +---+---+---+---+
3081 * ^ ^
3082 * | |
3083 * log_start log_end
3084 */
3085 if (nth_entry_index < raft->log_start - 1) {
3086 raft_send_append_reply(raft, rq, RAFT_APPEND_OK,
3087 "append before log start");
3088 return;
3089 }
3090
3091 /*
3092 * Handle the case where the last entry in rq->entries[] has the same index
3093 * as 'log_start - 1', so we can compare their terms:
3094 *
3095 * rq->prev_log_index
3096 * | first_entry_index
3097 * | | nth_entry_index
3098 * | | |
3099 * v v v
3100 * +---+---+---+---+
3101 * T | T | T | T | T |
3102 * +---+-------+---+
3103 * +---+---+---+---+
3104 * T | T | T | T | T |
3105 * +---+---+---+---+
3106 * ^ ^
3107 * | |
3108 * log_start log_end
3109 *
3110 * There's actually a sub-case where rq->n_entries == 0, in which we
3111 * compare rq->prev_log_term:
3112 *
3113 * rq->prev_log_index
3114 * |
3115 * |
3116 * |
3117 * v
3118 * T
3119 *
3120 * +---+---+---+---+
3121 * T | T | T | T | T |
3122 * +---+---+---+---+
3123 * ^ ^
3124 * | |
3125 * log_start log_end
3126 */
3127 if (nth_entry_index == raft->log_start - 1) {
3128 if (rq->n_entries
3129 ? raft->snap.term == rq->entries[rq->n_entries - 1].term
3130 : raft->snap.term == rq->prev_log_term) {
3131 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
3132 } else {
3133 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
3134 "term mismatch");
3135 }
3136 return;
3137 }
3138
3139 /*
3140 * We now know that the data in rq->entries[] overlaps the data in
3141 * raft->entries[], as shown below, with some positive 'ofs':
3142 *
3143 * rq->prev_log_index
3144 * | first_entry_index
3145 * | | nth_entry_index
3146 * | | |
3147 * v v v
3148 * +---+---+---+---+---+
3149 * T | T | T | T | T | T |
3150 * +---+-------+---+---+
3151 * +---+---+---+---+
3152 * T | T | T | T | T |
3153 * +---+---+---+---+
3154 * ^ ^
3155 * | |
3156 * log_start log_end
3157 *
3158 * |<-- ofs -->|
3159 *
3160 * We transform this into the following by trimming the first 'ofs'
3161 * elements off of rq->entries[], ending up with the following. Notice how
3162 * we retain the term but not the data for rq->entries[ofs - 1]:
3163 *
3164 * first_entry_index + ofs - 1
3165 * | first_entry_index + ofs
3166 * | | nth_entry_index + ofs
3167 * | | |
3168 * v v v
3169 * +---+---+
3170 * T | T | T |
3171 * +---+---+
3172 * +---+---+---+---+
3173 * T | T | T | T | T |
3174 * +---+---+---+---+
3175 * ^ ^
3176 * | |
3177 * log_start log_end
3178 */
3179 uint64_t ofs = raft->log_start - first_entry_index;
3180 raft_handle_append_entries(raft, rq,
3181 raft->log_start - 1, rq->entries[ofs - 1].term,
3182 &rq->entries[ofs], rq->n_entries - ofs);
3183}
3184
3185/* Returns true if 'raft' has another log entry or snapshot to read. */
3186bool
3187raft_has_next_entry(const struct raft *raft_)
3188{
3189 struct raft *raft = CONST_CAST(struct raft *, raft_);
3190 struct uuid eid;
3191 return raft_peek_next_entry(raft, &eid) != NULL;
3192}
3193
3194/* Returns the next log entry or snapshot from 'raft', or NULL if there are
3195 * none left to read. Stores the entry ID of the log entry in '*eid'. Stores
3196 * true in '*is_snapshot' if the returned data is a snapshot, false if it is a
3197 * log entry. */
3198const struct json *
3199raft_next_entry(struct raft *raft, struct uuid *eid, bool *is_snapshot)
3200{
3201 const struct json *data = raft_get_next_entry(raft, eid);
3202 *is_snapshot = data == raft->snap.data;
3203 return data;
3204}
3205
3206/* Returns the log index of the last-read snapshot or log entry. */
3207uint64_t
3208raft_get_applied_index(const struct raft *raft)
3209{
3210 return raft->last_applied;
3211}
3212
3213/* Returns the log index of the last snapshot or log entry that is available to
3214 * be read. */
3215uint64_t
3216raft_get_commit_index(const struct raft *raft)
3217{
3218 return raft->commit_index;
3219}
3220
3221static struct raft_server *
3222raft_find_peer(struct raft *raft, const struct uuid *uuid)
3223{
3224 struct raft_server *s = raft_find_server(raft, uuid);
3225 return s && !uuid_equals(&raft->sid, &s->sid) ? s : NULL;
3226}
3227
3228static struct raft_server *
3229raft_find_new_server(struct raft *raft, const struct uuid *uuid)
3230{
3231 return raft_server_find(&raft->add_servers, uuid);
3232}
3233
3234/* Figure 3.1: "If there exists an N such that N > commitIndex, a
3235 * majority of matchIndex[i] >= N, and log[N].term == currentTerm, set
3236 * commitIndex = N (sections 3.5 and 3.6)." */
3237static void
3238raft_consider_updating_commit_index(struct raft *raft)
3239{
3240 /* This loop cannot just bail out when it comes across a log entry that
3241 * does not match the criteria. For example, Figure 3.7(d2) shows a
3242 * case where the log entry for term 2 cannot be committed directly
3243 * (because it is not for the current term) but it can be committed as
3244 * a side effect of committing the entry for term 4 (the current term).
3245 * XXX Is there a more efficient way to do this? */
3246 ovs_assert(raft->role == RAFT_LEADER);
3247
3248 uint64_t new_commit_index = raft->commit_index;
3249 for (uint64_t idx = MAX(raft->commit_index + 1, raft->log_start);
3250 idx < raft->log_end; idx++) {
3251 if (raft->entries[idx - raft->log_start].term == raft->term) {
3252 size_t count = 0;
3253 struct raft_server *s2;
3254 HMAP_FOR_EACH (s2, hmap_node, &raft->servers) {
3255 if (s2->match_index >= idx) {
3256 count++;
3257 }
3258 }
3259 if (count > hmap_count(&raft->servers) / 2) {
3260 VLOG_DBG("index %"PRIu64" committed to %"PRIuSIZE" servers, "
3261 "applying", idx, count);
3262 new_commit_index = idx;
3263 }
3264 }
3265 }
3266 if (raft_update_commit_index(raft, new_commit_index)) {
3267 raft_send_heartbeats(raft);
3268 }
3269}
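/* Worked example for the Figure 3.1 rule above (illustrative only): in a
 * 5-server cluster with match indexes {9, 7, 7, 4, 3} and a current-term
 * entry at index 7, three servers have match_index >= 7, which is a
 * majority, so the commit index may advance to 7. */
static bool
example_index_has_majority(const uint64_t match_index[], size_t n_servers,
                           uint64_t idx)
{
    size_t count = 0;
    for (size_t i = 0; i < n_servers; i++) {
        count += match_index[i] >= idx;
    }
    return count > n_servers / 2;
}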
3270
3271static void
3272raft_update_match_index(struct raft *raft, struct raft_server *s,
3273 uint64_t min_index)
3274{
3275 ovs_assert(raft->role == RAFT_LEADER);
3276 if (min_index > s->match_index) {
3277 s->match_index = min_index;
3278 raft_consider_updating_commit_index(raft);
3279 }
3280}
3281
3282static void
3283raft_update_our_match_index(struct raft *raft, uint64_t min_index)
3284{
3285 struct raft_server *server = raft_find_server(raft, &raft->sid);
3286 if (server) {
3287 raft_update_match_index(raft, server, min_index);
3288 }
3289}
3290
3291static void
3292raft_send_install_snapshot_request(struct raft *raft,
3293 const struct raft_server *s,
3294 const char *comment)
3295{
3296 union raft_rpc rpc = {
3297 .install_snapshot_request = {
3298 .common = {
3299 .type = RAFT_RPC_INSTALL_SNAPSHOT_REQUEST,
3300 .sid = s->sid,
3301 .comment = CONST_CAST(char *, comment),
3302 },
3303 .term = raft->term,
3304 .last_index = raft->log_start - 1,
3305 .last_term = raft->snap.term,
3306 .last_servers = raft->snap.servers,
3307 .last_eid = raft->snap.eid,
3308 .data = raft->snap.data,
9bfb280a 3309 .election_timer = raft->election_timer, /* use latest value */
1b1d2e6d
BP
3310 }
3311 };
3312
3313 if (s->last_install_snapshot_request) {
3314 struct raft_install_snapshot_request *old, *new;
3315
3316 old = s->last_install_snapshot_request;
3317 new = &rpc.install_snapshot_request;
3318 if (old->term == new->term
3319 && old->last_index == new->last_index
3320 && old->last_term == new->last_term
3321 && old->last_servers == new->last_servers
3322 && old->data == new->data
3323 && old->election_timer == new->election_timer
3324 && uuid_equals(&old->last_eid, &new->last_eid)) {
3325 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3326
3327 VLOG_WARN_RL(&rl, "not sending exact same install_snapshot_request"
3328 " to server %s again", s->nickname);
3329 return;
3330 }
3331 }
3332 free(s->last_install_snapshot_request);
3333 CONST_CAST(struct raft_server *, s)->last_install_snapshot_request
3334 = xmemdup(&rpc.install_snapshot_request,
3335 sizeof rpc.install_snapshot_request);
3336
3337 raft_send(raft, &rpc);
3338}
3339
3340static void
3341raft_handle_append_reply(struct raft *raft,
3342 const struct raft_append_reply *rpy)
3343{
3344 if (raft->role != RAFT_LEADER) {
3345 VLOG_INFO("rejected append_reply (not leader)");
3346 return;
3347 }
3348
3349 /* Most commonly we'd be getting an AppendEntries reply from a configured
3350 * server (e.g. a peer), but we can also get them from servers in the
3351 * process of being added. */
3352 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3353 if (!s) {
3354 s = raft_find_new_server(raft, &rpy->common.sid);
3355 if (!s) {
3356 VLOG_INFO("rejected append_reply from unknown server "SID_FMT,
3357 SID_ARGS(&rpy->common.sid));
3358 return;
3359 }
3360 }
3361
89771c1e 3362 s->replied = true;
3363 if (rpy->result == RAFT_APPEND_OK) {
3364 /* Figure 3.1: "If successful, update nextIndex and matchIndex for
3365 * follower (section 3.5)." */
3366 uint64_t min_index = rpy->prev_log_index + rpy->n_entries + 1;
3367 if (s->next_index < min_index) {
3368 s->next_index = min_index;
3369 }
3370 raft_update_match_index(raft, s, min_index - 1);
3371 } else {
3372 /* Figure 3.1: "If AppendEntries fails because of log inconsistency,
3373 * decrement nextIndex and retry (section 3.5)."
3374 *
3375 * We also implement the optimization suggested in section 4.2.1:
3376 * "Various approaches can make nextIndex converge to its correct value
3377 * more quickly, including those described in Chapter 3. The simplest
3378 * approach to solving this particular problem of adding a new server,
3379 * however, is to have followers return the length of their logs in the
3380 * AppendEntries response; this allows the leader to cap the follower’s
3381 * nextIndex accordingly." */
3382 s->next_index = (s->next_index > 0
3383 ? MIN(s->next_index - 1, rpy->log_end)
3384 : 0);
3385
3386 if (rpy->result == RAFT_APPEND_IO_ERROR) {
3387 /* Append failed but not because of a log inconsistency. Because
3388 * of the I/O error, there's no point in re-sending the append
3389 * immediately. */
3390 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3391 VLOG_INFO_RL(&rl, "%s reported I/O error", s->nickname);
3392 return;
3393 }
3394 }
3395
3396 /*
3397 * Our behavior here must depend on the value of next_index relative to
3398 * log_start and log_end. There are three cases:
3399 *
3400 * Case 1 | Case 2 | Case 3
3401 * <---------------->|<------------->|<------------------>
3402 * | |
3403 *
3404 * +---+---+---+---+
3405 * T | T | T | T | T |
3406 * +---+---+---+---+
3407 * ^ ^
3408 * | |
3409 * log_start log_end
3410 */
3411 if (s->next_index < raft->log_start) {
3412 /* Case 1. */
3413 raft_send_install_snapshot_request(raft, s, NULL);
3414 } else if (s->next_index < raft->log_end) {
3415 /* Case 2. */
99c2dc8d 3416 raft_send_append_request(raft, s, raft->log_end - s->next_index, NULL);
3417 } else {
3418 /* Case 3. */
3419 if (s->phase == RAFT_PHASE_CATCHUP) {
3420 s->phase = RAFT_PHASE_CAUGHT_UP;
3421 raft_run_reconfigure(raft);
3422 }
3423 }
3424}
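/* Illustration of the section 4.2.1 optimization above (hypothetical helper,
 * not part of this file): if the leader believed next_index == 25 but the
 * follower reports log_end == 18, MIN(25 - 1, 18) == 18 jumps straight to
 * the follower's actual log length instead of probing back one index per
 * round trip. */
static inline uint64_t
example_cap_next_index(uint64_t next_index, uint64_t follower_log_end)
{
    return next_index > 0 ? MIN(next_index - 1, follower_log_end) : 0;
}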
3425
3426static bool
3427raft_should_suppress_disruptive_server(struct raft *raft,
3428 const union raft_rpc *rpc)
3429{
3430 if (rpc->type != RAFT_RPC_VOTE_REQUEST) {
3431 return false;
3432 }
3433
3434 /* Section 4.2.3 "Disruptive Servers" says:
3435 *
3436 * ...if a server receives a RequestVote request within the minimum
3437 * election timeout of hearing from a current leader, it does not update
3438 * its term or grant its vote...
3439 *
3440 * ...This change conflicts with the leadership transfer mechanism as
3441 * described in Chapter 3, in which a server legitimately starts an
3442 * election without waiting an election timeout. In that case,
3443 * RequestVote messages should be processed by other servers even when
3444 * they believe a current cluster leader exists. Those RequestVote
3445 * requests can include a special flag to indicate this behavior (“I
3446 * have permission to disrupt the leader--it told me to!”).
3447 *
3448 * This clearly describes how the followers should act, but not the leader.
3449 * We just ignore vote requests that arrive at a current leader. This
3450 * seems to be fairly safe, since a majority other than the current leader
3451 * can still elect a new leader and the first AppendEntries from that new
3452 * leader will depose the current leader. */
3453 const struct raft_vote_request *rq = raft_vote_request_cast(rpc);
3454 if (rq->leadership_transfer) {
3455 return false;
3456 }
3457
3458 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3459 long long int now = time_msec();
3460 switch (raft->role) {
3461 case RAFT_LEADER:
3462 VLOG_WARN_RL(&rl, "ignoring vote request received as leader");
3463 return true;
3464
3465 case RAFT_FOLLOWER:
3466 if (now < raft->election_base + raft->election_timer) {
3467 VLOG_WARN_RL(&rl, "ignoring vote request received after only "
3468 "%lld ms (minimum election time is %"PRIu64" ms)",
3469 now - raft->election_base, raft->election_timer);
3470 return true;
3471 }
3472 return false;
3473
3474 case RAFT_CANDIDATE:
3475 return false;
3476
3477 default:
3478 OVS_NOT_REACHED();
3479 }
3480}
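
/* The follower-side timing test above as a hypothetical pure predicate: a
 * RequestVote is "disruptive" if it arrives before one full election timer
 * has elapsed since 'election_base' was last reset (e.g. by
 * raft_reset_election_timer() on contact from a leader). */
static inline bool
sketch_within_min_election_timeout(long long int now,
                                   long long int election_base,
                                   uint64_t election_timer)
{
    return now < election_base + election_timer;
}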
3481
3482/* Returns true if a reply should be sent. */
3483static bool
3484raft_handle_vote_request__(struct raft *raft,
3485 const struct raft_vote_request *rq)
3486{
3487 /* Figure 3.1: "If votedFor is null or candidateId, and candidate's log is
3488 * at least as up-to-date as receiver's log, grant vote (sections 3.4,
3489 * 3.6)." */
3490 if (uuid_equals(&raft->vote, &rq->common.sid)) {
3491 /* Already voted for this candidate in this term. Resend vote. */
3492 return true;
3493 } else if (!uuid_is_zero(&raft->vote)) {
3494 /* Already voted for different candidate in this term. Send a reply
3495 * saying what candidate we did vote for. This isn't a necessary part
3496 * of the Raft protocol but it can make debugging easier. */
3497 return true;
3498 }
3499
3500 /* Section 3.6.1: "The RequestVote RPC implements this restriction: the RPC
3501 * includes information about the candidate’s log, and the voter denies its
3502 * vote if its own log is more up-to-date than that of the candidate. Raft
3503 * determines which of two logs is more up-to-date by comparing the index
3504 * and term of the last entries in the logs. If the logs have last entries
3505 * with different terms, then the log with the later term is more
3506 * up-to-date. If the logs end with the same term, then whichever log is
3507 * longer is more up-to-date." */
3508 uint64_t last_term = (raft->log_end > raft->log_start
3509 ? raft->entries[raft->log_end - 1
3510 - raft->log_start].term
3511 : raft->snap.term);
3512 if (last_term > rq->last_log_term
3513 || (last_term == rq->last_log_term
3514 && raft->log_end - 1 > rq->last_log_index)) {
3515 /* Our log is more up-to-date than the peer's. Withhold vote. */
3516 return false;
3517 }
3518
3519 /* Record a vote for the peer. */
3520 if (!raft_set_term(raft, raft->term, &rq->common.sid)) {
3521 return false;
3522 }
3523
3524 raft_reset_election_timer(raft);
3525
3526 return true;
3527}
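
/* The section 3.6.1 comparison above as a standalone predicate
 * (hypothetical helper): returns true if a log whose last entry is
 * (a_term, a_index) is strictly more up-to-date than one whose last entry
 * is (b_term, b_index). */
static inline bool
sketch_log_more_up_to_date(uint64_t a_term, uint64_t a_index,
                           uint64_t b_term, uint64_t b_index)
{
    /* A later last term wins; on equal terms, the longer log wins. */
    return a_term != b_term ? a_term > b_term : a_index > b_index;
}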
3528
3529static void
3530raft_send_vote_reply(struct raft *raft, const struct uuid *dst,
3531 const struct uuid *vote)
3532{
3533 union raft_rpc rpy = {
3534 .vote_reply = {
3535 .common = {
3536 .type = RAFT_RPC_VOTE_REPLY,
3537 .sid = *dst,
3538 },
3539 .term = raft->term,
3540 .vote = *vote,
3541 },
3542 };
3543 raft_send(raft, &rpy);
3544}
3545
3546static void
3547raft_handle_vote_request(struct raft *raft,
3548 const struct raft_vote_request *rq)
3549{
3550 if (raft_handle_vote_request__(raft, rq)) {
3551 raft_send_vote_reply(raft, &rq->common.sid, &raft->vote);
3552 }
3553}
3554
3555static void
3556raft_handle_vote_reply(struct raft *raft,
3557 const struct raft_vote_reply *rpy)
3558{
3559 if (!raft_receive_term__(raft, &rpy->common, rpy->term)) {
3560 return;
3561 }
3562
3563 if (raft->role != RAFT_CANDIDATE) {
3564 return;
3565 }
3566
3567 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3568 if (s) {
3569 raft_accept_vote(raft, s, &rpy->vote);
3570 }
3571}
3572
3573/* Returns true if 'raft''s log contains reconfiguration entries that have not
3574 * yet been committed. */
3575static bool
3576raft_has_uncommitted_configuration(const struct raft *raft)
3577{
3578 for (uint64_t i = raft->commit_index + 1; i < raft->log_end; i++) {
3579 ovs_assert(i >= raft->log_start);
3580 const struct raft_entry *e = &raft->entries[i - raft->log_start];
3581 if (e->servers) {
3582 return true;
3583 }
3584 }
3585 return false;
3586}
3587
3588static void
3589raft_log_reconfiguration(struct raft *raft)
3590{
3591 struct json *servers_json = raft_servers_to_json(&raft->servers);
3592 raft_command_unref(raft_command_execute__(
3593 raft, NULL, servers_json, 0, NULL, NULL));
3594 json_destroy(servers_json);
3595}
3596
3597static void
3598raft_run_reconfigure(struct raft *raft)
3599{
3600 ovs_assert(raft->role == RAFT_LEADER);
3601
3602 /* Reconfiguration only progresses when configuration changes commit. */
3603 if (raft_has_uncommitted_configuration(raft)) {
3604 return;
3605 }
3606
3607 /* If we were waiting for a configuration change to commit, it's done. */
3608 struct raft_server *s;
3609 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3610 if (s->phase == RAFT_PHASE_COMMITTING) {
3611 raft_send_add_server_reply__(raft, &s->sid, s->address,
3612 true, RAFT_SERVER_COMPLETED);
3613 s->phase = RAFT_PHASE_STABLE;
3614 }
3615 }
3616 if (raft->remove_server) {
3617 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
3618 &raft->remove_server->requester_sid,
3619 raft->remove_server->requester_conn,
3620 true, RAFT_SERVER_COMPLETED);
3621 raft_server_destroy(raft->remove_server);
3622 raft->remove_server = NULL;
3623 }
3624
3625 /* If a new server is caught up, add it to the configuration. */
3626 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
3627 if (s->phase == RAFT_PHASE_CAUGHT_UP) {
3628 /* Move 's' from 'raft->add_servers' to 'raft->servers'. */
3629 hmap_remove(&raft->add_servers, &s->hmap_node);
3630 hmap_insert(&raft->servers, &s->hmap_node, uuid_hash(&s->sid));
3631
3632 /* Mark 's' as waiting for commit. */
3633 s->phase = RAFT_PHASE_COMMITTING;
3634
3635 raft_log_reconfiguration(raft);
3636
3637 /* When commit completes we'll transition to RAFT_PHASE_STABLE and
3638 * send a RAFT_SERVER_OK reply. */
3639
3640 return;
3641 }
3642 }
3643
3644 /* Remove a server, if one is scheduled for removal. */
3645 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3646 if (s->phase == RAFT_PHASE_REMOVE) {
3647 hmap_remove(&raft->servers, &s->hmap_node);
3648 raft->remove_server = s;
3649
3650 raft_log_reconfiguration(raft);
3651
3652 return;
3653 }
3654 }
3655}
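
/* By returning early while raft_has_uncommitted_configuration() is true,
 * raft_run_reconfigure() guarantees that at most one configuration change
 * is in flight at a time.  A hypothetical checker for that invariant: */
static void
sketch_check_single_reconfiguration(const struct raft *raft)
{
    uint64_t n = 0;
    for (uint64_t i = raft->commit_index + 1; i < raft->log_end; i++) {
        n += raft->entries[i - raft->log_start].servers != NULL;
    }
    ovs_assert(n <= 1);
}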
3656
3657static void
3658raft_handle_add_server_request(struct raft *raft,
3659 const struct raft_add_server_request *rq)
3660{
3661 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3662 if (raft->role != RAFT_LEADER) {
3663 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3664 return;
3665 }
3666
3667 /* Check for an existing server. */
3668 struct raft_server *s = raft_find_server(raft, &rq->common.sid);
3669 if (s) {
3670 /* If the server is scheduled to be removed, cancel it. */
3671 if (s->phase == RAFT_PHASE_REMOVE) {
3672 s->phase = RAFT_PHASE_STABLE;
3673 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_CANCELED);
3674 return;
3675 }
3676
3677 /* If the server is being added, then it's in progress. */
3678 if (s->phase != RAFT_PHASE_STABLE) {
3679 raft_send_add_server_reply(raft, rq,
3680 false, RAFT_SERVER_IN_PROGRESS);
3681 }
3682
3683 /* Nothing to do--server is already part of the configuration. */
3684 raft_send_add_server_reply(raft, rq,
3685 true, RAFT_SERVER_ALREADY_PRESENT);
3686 return;
3687 }
3688
3689 /* Check for a server being removed. */
3690 if (raft->remove_server
3691 && uuid_equals(&rq->common.sid, &raft->remove_server->sid)) {
3692 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3693 return;
3694 }
3695
3696 /* Check for a server already being added. */
3697 if (raft_find_new_server(raft, &rq->common.sid)) {
3698 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_IN_PROGRESS);
3699 return;
3700 }
3701
3702 /* Add server to 'add_servers'. */
3703 s = raft_server_add(&raft->add_servers, &rq->common.sid, rq->address);
3704 raft_server_init_leader(raft, s);
3705 s->requester_sid = rq->common.sid;
3706 s->requester_conn = NULL;
3707 s->phase = RAFT_PHASE_CATCHUP;
3708
3709 /* Start sending the log. If this is the first time we've tried to add
3710 * this server, then this will quickly degenerate into an InstallSnapshot
3711 * followed by a series of AppendEntries, but if it's a retry of an earlier
3712 * AddServer request that was interrupted (e.g. by a timeout or a loss of
3713 * leadership) then it will gracefully resume populating the log.
3714 *
3715 * See the last few paragraphs of section 4.2.1 for further insight. */
3716 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
3717 VLOG_INFO_RL(&rl,
3718 "starting to add server %s ("SID_FMT" at %s) "
3719 "to cluster "CID_FMT, s->nickname, SID_ARGS(&s->sid),
3720 rq->address, CID_ARGS(&raft->cid));
3721 raft_send_append_request(raft, s, 0, "initialize new server");
3722}
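
/* For reference, the phases a successfully added server passes through, as
 * driven by raft_handle_append_reply() and raft_run_reconfigure() above
 * (hypothetical helper; the enum tag name is assumed):
 *
 *     CATCHUP -> CAUGHT_UP -> COMMITTING -> STABLE */
static enum raft_server_phase
sketch_next_add_server_phase(enum raft_server_phase phase)
{
    switch (phase) {
    case RAFT_PHASE_CATCHUP: return RAFT_PHASE_CAUGHT_UP;
    case RAFT_PHASE_CAUGHT_UP: return RAFT_PHASE_COMMITTING;
    case RAFT_PHASE_COMMITTING: return RAFT_PHASE_STABLE;
    default: return phase;
    }
}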
3723
3724static void
3725raft_handle_add_server_reply(struct raft *raft,
3726 const struct raft_add_server_reply *rpy)
3727{
3728 if (!raft->joining) {
3729 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3730 VLOG_WARN_RL(&rl, "received add_server_reply even though we're "
3731 "already part of the cluster");
3732 return;
3733 }
3734
3735 if (rpy->success) {
3736 raft->joining = false;
3737
3738 /* It is tempting, at this point, to check that this server is part of
3739 * the current configuration. However, this is not necessarily the
3740 * case, because the log entry that added this server to the cluster
3741 * might have been committed by a majority of the cluster that does not
3742 * include this one. This actually happens in testing. */
3743 } else {
3744 const char *address;
3745 SSET_FOR_EACH (address, &rpy->remote_addresses) {
3746 if (sset_add(&raft->remote_addresses, address)) {
3747 VLOG_INFO("%s: learned new server address for joining cluster",
3748 address);
3749 }
3750 }
3751 }
3752}
3753
3754/* This is called by raft_unixctl_kick() as well as via RPC. */
3755static void
3756raft_handle_remove_server_request(struct raft *raft,
3757 const struct raft_remove_server_request *rq)
3758{
3759 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3760 if (raft->role != RAFT_LEADER) {
3761 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3762 return;
3763 }
3764
3765 /* If the server to remove is currently waiting to be added, cancel it. */
3766 struct raft_server *target = raft_find_new_server(raft, &rq->sid);
3767 if (target) {
3768 raft_send_add_server_reply__(raft, &target->sid, target->address,
3769 false, RAFT_SERVER_CANCELED);
3770 hmap_remove(&raft->add_servers, &target->hmap_node);
3771 raft_server_destroy(target);
3772 return;
3773 }
3774
3775 /* If the server isn't configured, report that. */
3776 target = raft_find_server(raft, &rq->sid);
3777 if (!target) {
3778 raft_send_remove_server_reply(raft, rq,
3779 true, RAFT_SERVER_ALREADY_GONE);
3780 return;
3781 }
3782
3783 /* Check whether we're waiting for the addition of the server to commit. */
3784 if (target->phase == RAFT_PHASE_COMMITTING) {
3785 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3786 return;
3787 }
3788
3789 /* Check whether the server is already scheduled for removal. */
3790 if (target->phase == RAFT_PHASE_REMOVE) {
3791 raft_send_remove_server_reply(raft, rq,
3792 false, RAFT_SERVER_IN_PROGRESS);
3793 return;
3794 }
3795
3796 /* Make sure that if we remove this server, at least one other
3797 * server will be left. We don't count servers currently being added (in
3798 * 'add_servers') since those could fail. */
3799 struct raft_server *s;
3800 int n = 0;
3801 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3802 if (s != target && s->phase != RAFT_PHASE_REMOVE) {
3803 n++;
3804 }
3805 }
3806 if (!n) {
3807 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_EMPTY);
3808 return;
3809 }
3810
3811 /* Mark the server for removal. */
3812 target->phase = RAFT_PHASE_REMOVE;
3813 if (rq->requester_conn) {
3814 target->requester_sid = UUID_ZERO;
3815 unixctl_command_reply(rq->requester_conn, "started removal");
3816 } else {
3817 target->requester_sid = rq->common.sid;
3818 target->requester_conn = NULL;
3819 }
3820
3821 raft_run_reconfigure(raft);
3822 /* Operation in progress, reply will be sent later. */
3823}
3824
3825static void
3826 raft_finished_leaving_cluster(struct raft *raft)
3827 {
3828 VLOG_INFO(SID_FMT": finished leaving cluster "CID_FMT,
3829 SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
3830
3831 raft_record_note(raft, "left", "this server left the cluster");
3832
3833 raft->leaving = false;
3834 raft->left = true;
3835}
3836
3837static void
3838raft_handle_remove_server_reply(struct raft *raft,
3839 const struct raft_remove_server_reply *rpc)
3840{
3841 if (rpc->success
3842 && (uuid_is_zero(&rpc->target_sid)
3843 || uuid_equals(&rpc->target_sid, &raft->sid))) {
3844 raft_finished_leaving_cluster(raft);
3845 }
3846}
3847
3848static bool
3849raft_handle_write_error(struct raft *raft, struct ovsdb_error *error)
3850{
3851 if (error && !raft->failed) {
3852 raft->failed = true;
3853
3854 char *s = ovsdb_error_to_string_free(error);
3855 VLOG_WARN("%s: entering failure mode due to I/O error (%s)",
3856 raft->name, s);
3857 free(s);
3858 }
3859 return !raft->failed;
3860}
3861
3862static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3863raft_write_snapshot(struct raft *raft, struct ovsdb_log *log,
3864 uint64_t new_log_start,
3865 const struct raft_entry *new_snapshot)
3866{
3867 struct raft_header h = {
3868 .sid = raft->sid,
3869 .cid = raft->cid,
3870 .name = raft->name,
3871 .local_address = raft->local_address,
3872 .snap_index = new_log_start - 1,
3873 .snap = *new_snapshot,
3874 };
3875 struct ovsdb_error *error = ovsdb_log_write_and_free(
3876 log, raft_header_to_json(&h));
3877 if (error) {
3878 return error;
3879 }
3880 ovsdb_log_mark_base(raft->log);
3881
3882 /* Write log records. */
3883 for (uint64_t index = new_log_start; index < raft->log_end; index++) {
3884 const struct raft_entry *e = &raft->entries[index - raft->log_start];
3885 struct raft_record r = {
3886 .type = RAFT_REC_ENTRY,
3887 .term = e->term,
3888 .entry = {
3889 .index = index,
3890 .data = e->data,
3891 .servers = e->servers,
3892 .election_timer = e->election_timer,
3893 .eid = e->eid,
3894 },
3895 };
3896 error = ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3897 if (error) {
3898 return error;
3899 }
3900 }
3901
3902 /* Write term and vote (if any).
3903 *
3904 * The term is redundant if we wrote a log record for that term above. The
3905 * vote, if any, is never redundant.
3906 */
3907 error = raft_write_state(log, raft->term, &raft->vote);
3908 if (error) {
3909 return error;
3910 }
3911
3912 /* Write commit_index if it's beyond the new start of the log. */
3913 if (raft->commit_index >= new_log_start) {
3914 struct raft_record r = {
3915 .type = RAFT_REC_COMMIT_INDEX,
3916 .commit_index = raft->commit_index,
3917 };
3918 return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3919 }
3920 return NULL;
3921}
3922
3923static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3924raft_save_snapshot(struct raft *raft,
3925 uint64_t new_start, const struct raft_entry *new_snapshot)
3926
3927{
3928 struct ovsdb_log *new_log;
3929 struct ovsdb_error *error;
3930 error = ovsdb_log_replace_start(raft->log, &new_log);
3931 if (error) {
3932 return error;
3933 }
3934
3935 error = raft_write_snapshot(raft, new_log, new_start, new_snapshot);
3936 if (error) {
3937 ovsdb_log_replace_abort(new_log);
3938 return error;
3939 }
3940
3941 return ovsdb_log_replace_commit(raft->log, new_log);
3942}
3943
3944static bool
3945raft_handle_install_snapshot_request__(
3946 struct raft *raft, const struct raft_install_snapshot_request *rq)
3947{
3948 raft_reset_election_timer(raft);
3949
3950 /*
3951 * Our behavior here depends on how new_log_start in the snapshot compares
3952 * to log_start and log_end. There are three cases:
3953 *
3954 * Case 1 | Case 2 | Case 3
3955 * <---------------->|<------------->|<------------------>
3956 * | |
3957 *
3958 * +---+---+---+---+
3959 * T | T | T | T | T |
3960 * +---+---+---+---+
3961 * ^ ^
3962 * | |
3963 * log_start log_end
3964 */
3965 uint64_t new_log_start = rq->last_index + 1;
3966 if (new_log_start < raft->log_start) {
3967 /* Case 1: The new snapshot covers less than our current one. Nothing
3968 * to do. */
3969 return true;
3970 } else if (new_log_start < raft->log_end) {
3971 /* Case 2: The new snapshot starts in the middle of our log. We could
3972 * discard the first 'new_log_start - raft->log_start' entries in the
3973 * log. But there's not much value in that, since snapshotting is
3974 * supposed to be a local decision. Just skip it. */
3975 return true;
3976 }
3977
3978 /* Case 3: The new snapshot starts past the end of our current log, so
3979 * discard all of our current log. */
3980 const struct raft_entry new_snapshot = {
3981 .term = rq->last_term,
3982 .data = rq->data,
3983 .eid = rq->last_eid,
3984 .servers = rq->last_servers,
3985 .election_timer = rq->election_timer,
3986 };
3987 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
3988 &new_snapshot);
3989 if (error) {
3990 char *error_s = ovsdb_error_to_string(error);
3991 VLOG_WARN("could not save snapshot: %s", error_s);
3992 free(error_s);
3993 return false;
3994 }
3995
3996 for (size_t i = 0; i < raft->log_end - raft->log_start; i++) {
3997 raft_entry_uninit(&raft->entries[i]);
3998 }
3999 raft->log_start = raft->log_end = new_log_start;
4000 raft->log_synced = raft->log_end - 1;
4001 raft->commit_index = raft->log_start - 1;
4002 if (raft->last_applied < raft->commit_index) {
4003 raft->last_applied = raft->log_start - 2;
4004 }
4005
4006 raft_entry_uninit(&raft->snap);
4007 raft_entry_clone(&raft->snap, &new_snapshot);
4008
4009 raft_get_servers_from_log(raft, VLL_INFO);
4010 raft_get_election_timer_from_log(raft);
4011
4012 return true;
4013}
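
/* A hypothetical post-condition check for case 3 above, documenting where
 * the in-memory indexes land once the local log has been discarded in
 * favor of the installed snapshot: */
static void
sketch_check_post_install(const struct raft *raft, uint64_t new_log_start)
{
    ovs_assert(raft->log_start == new_log_start);
    ovs_assert(raft->log_end == new_log_start);   /* The log is now empty. */
    ovs_assert(raft->commit_index == new_log_start - 1);
    ovs_assert(raft->log_synced == raft->log_end - 1);
}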
4014
4015static void
4016raft_handle_install_snapshot_request(
4017 struct raft *raft, const struct raft_install_snapshot_request *rq)
4018{
4019 if (raft_handle_install_snapshot_request__(raft, rq)) {
4020 union raft_rpc rpy = {
4021 .install_snapshot_reply = {
4022 .common = {
4023 .type = RAFT_RPC_INSTALL_SNAPSHOT_REPLY,
4024 .sid = rq->common.sid,
4025 },
4026 .term = raft->term,
4027 .last_index = rq->last_index,
4028 .last_term = rq->last_term,
4029 },
4030 };
4031 raft_send(raft, &rpy);
4032 }
4033}
4034
4035static void
4036raft_handle_install_snapshot_reply(
4037 struct raft *raft, const struct raft_install_snapshot_reply *rpy)
4038{
4039 /* We might get an InstallSnapshot reply from a configured server (e.g. a
4040 * peer) or a server in the process of being added. */
4041 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
4042 if (!s) {
4043 s = raft_find_new_server(raft, &rpy->common.sid);
4044 if (!s) {
4045 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
4046 VLOG_INFO_RL(&rl, "cluster "CID_FMT": received %s from "
4047 "unknown server "SID_FMT, CID_ARGS(&raft->cid),
4048 raft_rpc_type_to_string(rpy->common.type),
4049 SID_ARGS(&rpy->common.sid));
4050 return;
4051 }
4052 }
4053
4054 if (rpy->last_index != raft->log_start - 1 ||
4055 rpy->last_term != raft->snap.term) {
4056 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
4057 VLOG_INFO_RL(&rl, "cluster "CID_FMT": server %s installed "
4058 "out-of-date snapshot, starting over",
4059 CID_ARGS(&raft->cid), s->nickname);
4060 raft_send_install_snapshot_request(raft, s,
4061 "installed obsolete snapshot");
4062 return;
4063 }
4064
4065 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
4066 VLOG_INFO_RL(&rl, "cluster "CID_FMT": installed snapshot on server %s "
4067 " up to %"PRIu64":%"PRIu64, CID_ARGS(&raft->cid),
4068 s->nickname, rpy->last_term, rpy->last_index);
4069 s->next_index = raft->log_start;
4070 raft_send_append_request(raft, s, raft->log_end - s->next_index,
4071 "snapshot installed");
4072}
4073
4074/* Returns true if 'raft' has grown enough since the last snapshot that
4075 * reducing the log to a snapshot would be valuable, false otherwise. */
4076bool
4077raft_grew_lots(const struct raft *raft)
4078{
4079 return ovsdb_log_grew_lots(raft->log);
4080}
4081
4082/* Returns the number of log entries that could be trimmed off the on-disk log
4083 * by snapshotting. */
4084uint64_t
4085raft_get_log_length(const struct raft *raft)
4086{
4087 return (raft->last_applied < raft->log_start
4088 ? 0
4089 : raft->last_applied - raft->log_start + 1);
4090}
4091
4092/* Returns true if taking a snapshot of 'raft', with raft_store_snapshot(), is
4093 * possible. */
4094bool
4095raft_may_snapshot(const struct raft *raft)
4096{
4097 return (!raft->joining
4098 && !raft->leaving
4099 && !raft->left
4100 && !raft->failed
4101 && raft->last_applied >= raft->log_start);
4102}
4103
4104/* Replaces the log for 'raft', up to the last log entry read, by
4105 * 'new_snapshot_data'. Returns NULL if successful, otherwise an error that
4106 * the caller must eventually free.
4107 *
4108 * This function can only succeed if raft_may_snapshot() returns true. It is
4109 * only valuable to call it if raft_get_log_length() is significant and
4110 * especially if raft_grew_lots() returns true. */
4111struct ovsdb_error * OVS_WARN_UNUSED_RESULT
4112raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
4113{
4114 if (raft->joining) {
4115 return ovsdb_error(NULL,
4116 "cannot store a snapshot while joining cluster");
4117 } else if (raft->leaving) {
4118 return ovsdb_error(NULL,
4119 "cannot store a snapshot while leaving cluster");
4120 } else if (raft->left) {
4121 return ovsdb_error(NULL,
4122 "cannot store a snapshot after leaving cluster");
4123 } else if (raft->failed) {
4124 return ovsdb_error(NULL,
4125 "cannot store a snapshot following failure");
4126 }
4127
4128 if (raft->last_applied < raft->log_start) {
4129 return ovsdb_error(NULL, "not storing a duplicate snapshot");
4130 }
4131
4132 uint64_t new_log_start = raft->last_applied + 1;
4133 struct raft_entry new_snapshot = {
4134 .term = raft_get_term(raft, new_log_start - 1),
4135 .data = json_clone(new_snapshot_data),
4136 .eid = *raft_get_eid(raft, new_log_start - 1),
4137 .servers = json_clone(raft_servers_for_index(raft, new_log_start - 1)),
4138 .election_timer = raft->election_timer,
4139 };
4140 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
4141 &new_snapshot);
4142 if (error) {
4143 raft_entry_uninit(&new_snapshot);
4144 return error;
4145 }
4146
4147 raft->log_synced = raft->log_end - 1;
4148 raft_entry_uninit(&raft->snap);
4149 raft->snap = new_snapshot;
4150 for (size_t i = 0; i < new_log_start - raft->log_start; i++) {
4151 raft_entry_uninit(&raft->entries[i]);
4152 }
4153 memmove(&raft->entries[0], &raft->entries[new_log_start - raft->log_start],
4154 (raft->log_end - new_log_start) * sizeof *raft->entries);
4155 raft->log_start = new_log_start;
4156 return NULL;
4157}
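
/* A sketch of the intended calling pattern for the snapshot API above
 * (hypothetical caller; the 100-entry threshold is arbitrary): */
static void
sketch_maybe_snapshot(struct raft *raft, const struct json *db_contents)
{
    if (raft_may_snapshot(raft)
        && raft_get_log_length(raft) >= 100
        && raft_grew_lots(raft)) {
        struct ovsdb_error *error = raft_store_snapshot(raft, db_contents);
        if (error) {
            char *s = ovsdb_error_to_string_free(error);
            VLOG_WARN("snapshot failed: %s", s);
            free(s);
        }
    }
}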
4158
4159static void
4160raft_handle_become_leader(struct raft *raft,
4161 const struct raft_become_leader *rq)
4162{
4163 if (raft->role == RAFT_FOLLOWER) {
4164 char buf[SID_LEN + 1];
4165 VLOG_INFO("received leadership transfer from %s in term %"PRIu64,
4166 raft_get_nickname(raft, &rq->common.sid, buf, sizeof buf),
4167 rq->term);
4168 raft_start_election(raft, true);
4169 }
4170}
4171
4172static void
4173raft_send_execute_command_reply(struct raft *raft,
4174 const struct uuid *sid,
4175 const struct uuid *eid,
4176 enum raft_command_status status,
4177 uint64_t commit_index)
4178{
4179 if (failure_test == FT_CRASH_BEFORE_SEND_EXEC_REP) {
4180 ovs_fatal(0, "Raft test: crash before sending execute_command_reply");
4181 }
4182 union raft_rpc rpc = {
4183 .execute_command_reply = {
4184 .common = {
4185 .type = RAFT_RPC_EXECUTE_COMMAND_REPLY,
4186 .sid = *sid,
4187 },
4188 .result = *eid,
4189 .status = status,
4190 .commit_index = commit_index,
4191 },
4192 };
4193 raft_send(raft, &rpc);
4194 if (failure_test == FT_CRASH_AFTER_SEND_EXEC_REP) {
4195 ovs_fatal(0, "Raft test: crash after sending execute_command_reply.");
4196 }
4197}
4198
4199static enum raft_command_status
4200raft_handle_execute_command_request__(
4201 struct raft *raft, const struct raft_execute_command_request *rq)
4202{
4203 if (raft->role != RAFT_LEADER) {
4204 return RAFT_CMD_NOT_LEADER;
4205 }
4206
4207 const struct uuid *current_eid = raft_current_eid(raft);
4208 if (!uuid_equals(&rq->prereq, current_eid)) {
4209 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
4210 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
4211 "prerequisite "UUID_FMT" in execute_command_request",
4212 UUID_ARGS(current_eid), UUID_ARGS(&rq->prereq));
4213 return RAFT_CMD_BAD_PREREQ;
4214 }
4215
4216 struct raft_command *cmd = raft_command_initiate(raft, rq->data,
4217 NULL, 0, &rq->result);
4218 cmd->sid = rq->common.sid;
4219
4220 enum raft_command_status status = cmd->status;
4221 raft_command_unref(cmd);
4222 return status;
4223}
4224
4225static void
4226raft_handle_execute_command_request(
4227 struct raft *raft, const struct raft_execute_command_request *rq)
4228{
4229 enum raft_command_status status
4230 = raft_handle_execute_command_request__(raft, rq);
4231 if (status != RAFT_CMD_INCOMPLETE) {
4232 raft_send_execute_command_reply(raft, &rq->common.sid, &rq->result,
4233 status, 0);
4234 }
4235}
4236
4237static void
4238raft_handle_execute_command_reply(
4239 struct raft *raft, const struct raft_execute_command_reply *rpy)
4240{
4241 struct raft_command *cmd = raft_find_command_by_eid(raft, &rpy->result);
4242 if (!cmd) {
4243 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
4244 char buf[SID_LEN + 1];
4245 VLOG_INFO_RL(&rl,
4246 "%s received \"%s\" reply from %s for unknown command",
4247 raft->local_nickname,
4248 raft_command_status_to_string(rpy->status),
4249 raft_get_nickname(raft, &rpy->common.sid,
4250 buf, sizeof buf));
4251 return;
4252 }
4253
4254 if (rpy->status == RAFT_CMD_INCOMPLETE) {
4255 cmd->timestamp = time_msec();
4256 } else {
4257 cmd->index = rpy->commit_index;
4258 raft_command_complete(raft, cmd, rpy->status);
4259 }
4260}
4261
4262static void
4263raft_handle_rpc(struct raft *raft, const union raft_rpc *rpc)
4264{
4265 uint64_t term = raft_rpc_get_term(rpc);
4266 if (term
4267 && !raft_should_suppress_disruptive_server(raft, rpc)
4268 && !raft_receive_term__(raft, &rpc->common, term)) {
4269 if (rpc->type == RAFT_RPC_APPEND_REQUEST) {
4270 /* Section 3.3: "If a server receives a request with a stale term
4271 * number, it rejects the request." */
4272 raft_send_append_reply(raft, raft_append_request_cast(rpc),
4273 RAFT_APPEND_INCONSISTENCY, "stale term");
4274 }
4275 return;
4276 }
4277
4278 switch (rpc->type) {
4279#define RAFT_RPC(ENUM, NAME) \
4280 case ENUM: \
4281 raft_handle_##NAME(raft, &rpc->NAME); \
4282 break;
4283 RAFT_RPC_TYPES
4284#undef RAFT_RPC
4285 default:
4286 OVS_NOT_REACHED();
4287 }
4288}
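
/* The dispatch above uses the "X macro" pattern: RAFT_RPC_TYPES, defined in
 * raft-rpc.h, expands RAFT_RPC(ENUM, NAME) once per RPC type, so the switch
 * gains one case per message with no hand-written boilerplate.  A minimal
 * illustration of the pattern (hypothetical names, not the real list):
 *
 * #define SKETCH_RPC_TYPES \
 *     SKETCH_RPC(SKETCH_RPC_HELLO, hello) \
 *     SKETCH_RPC(SKETCH_RPC_BYE, bye)
 */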
4289\f
4290static bool
4291raft_rpc_is_heartbeat(const union raft_rpc *rpc)
4292{
4293 return ((rpc->type == RAFT_RPC_APPEND_REQUEST
4294 || rpc->type == RAFT_RPC_APPEND_REPLY)
4295 && rpc->common.comment
4296 && !strcmp(rpc->common.comment, "heartbeat"));
4297}
4298
4299\f
4300static bool
4301raft_send_to_conn_at(struct raft *raft, const union raft_rpc *rpc,
4302 struct raft_conn *conn, int line_number)
4303 {
4304 log_rpc(rpc, "-->", conn, line_number);
4305 return !jsonrpc_session_send(
4306 conn->js, raft_rpc_to_jsonrpc(&raft->cid, &raft->sid, rpc));
4307}
4308
4309static bool
4310raft_is_rpc_synced(const struct raft *raft, const union raft_rpc *rpc)
4311{
4312 uint64_t term = raft_rpc_get_term(rpc);
4313 uint64_t index = raft_rpc_get_min_sync_index(rpc);
4314 const struct uuid *vote = raft_rpc_get_vote(rpc);
4315
4316 return (term <= raft->synced_term
4317 && index <= raft->log_synced
4318 && (!vote || uuid_equals(vote, &raft->synced_vote)));
4319}
4320
4321static bool
4322 raft_send_at(struct raft *raft, const union raft_rpc *rpc, int line_number)
4323{
4324 const struct uuid *dst = &rpc->common.sid;
4325 if (uuid_equals(dst, &raft->sid)) {
4326 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4327 VLOG_WARN_RL(&rl, "attempted to send RPC to self from raft.c:%d",
4328 line_number);
4329 return false;
4330 }
4331
4332 struct raft_conn *conn = raft_find_conn_by_sid(raft, dst);
4333 if (!conn) {
4334 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4335 char buf[SID_LEN + 1];
4336 VLOG_DBG_RL(&rl, "%s: no connection to %s, cannot send RPC "
4337 "from raft.c:%d", raft->local_nickname,
4338 raft_get_nickname(raft, dst, buf, sizeof buf),
4339 line_number);
4340 return false;
4341 }
4342
4343 if (!raft_is_rpc_synced(raft, rpc)) {
4344 raft_waiter_create(raft, RAFT_W_RPC, false)->rpc = raft_rpc_clone(rpc);
4345 return true;
4346 }
4347
4348 return raft_send_to_conn_at(raft, rpc, conn, line_number);
4349}
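
/* The 'line_number' parameters above are presumably supplied by wrapper
 * macros defined earlier in this file, along the lines of (assumed shape):
 *
 * #define raft_send(raft, rpc) raft_send_at(raft, rpc, __LINE__)
 * #define raft_send_to_conn(raft, rpc, conn) \
 *     raft_send_to_conn_at(raft, rpc, conn, __LINE__)
 *
 * so each logged RPC can name the raft.c line that sent it. */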
4350\f
4351static struct raft *
4352raft_lookup_by_name(const char *name)
4353{
4354 struct raft *raft;
4355
4356 HMAP_FOR_EACH_WITH_HASH (raft, hmap_node, hash_string(name, 0),
4357 &all_rafts) {
4358 if (!strcmp(raft->name, name)) {
4359 return raft;
4360 }
4361 }
4362 return NULL;
4363}
4364
4365static void
4366raft_unixctl_cid(struct unixctl_conn *conn,
4367 int argc OVS_UNUSED, const char *argv[],
4368 void *aux OVS_UNUSED)
4369{
4370 struct raft *raft = raft_lookup_by_name(argv[1]);
4371 if (!raft) {
4372 unixctl_command_reply_error(conn, "unknown cluster");
4373 } else if (uuid_is_zero(&raft->cid)) {
4374 unixctl_command_reply_error(conn, "cluster id not yet known");
4375 } else {
4376 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->cid));
4377 unixctl_command_reply(conn, uuid);
4378 free(uuid);
4379 }
4380}
4381
4382static void
4383raft_unixctl_sid(struct unixctl_conn *conn,
4384 int argc OVS_UNUSED, const char *argv[],
4385 void *aux OVS_UNUSED)
4386{
4387 struct raft *raft = raft_lookup_by_name(argv[1]);
4388 if (!raft) {
4389 unixctl_command_reply_error(conn, "unknown cluster");
4390 } else {
4391 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->sid));
4392 unixctl_command_reply(conn, uuid);
4393 free(uuid);
4394 }
4395}
4396
4397static void
4398raft_put_sid(const char *title, const struct uuid *sid,
4399 const struct raft *raft, struct ds *s)
4400{
4401 ds_put_format(s, "%s: ", title);
4402 if (uuid_equals(sid, &raft->sid)) {
4403 ds_put_cstr(s, "self");
4404 } else if (uuid_is_zero(sid)) {
4405 ds_put_cstr(s, "unknown");
4406 } else {
4407 char buf[SID_LEN + 1];
4408 ds_put_cstr(s, raft_get_nickname(raft, sid, buf, sizeof buf));
4409 }
4410 ds_put_char(s, '\n');
4411}
4412
4413static void
4414raft_unixctl_status(struct unixctl_conn *conn,
4415 int argc OVS_UNUSED, const char *argv[],
4416 void *aux OVS_UNUSED)
4417{
4418 struct raft *raft = raft_lookup_by_name(argv[1]);
4419 if (!raft) {
4420 unixctl_command_reply_error(conn, "unknown cluster");
4421 return;
4422 }
4423
4424 struct ds s = DS_EMPTY_INITIALIZER;
4425 ds_put_format(&s, "%s\n", raft->local_nickname);
4426 ds_put_format(&s, "Name: %s\n", raft->name);
4427 ds_put_format(&s, "Cluster ID: ");
4428 if (!uuid_is_zero(&raft->cid)) {
4429 ds_put_format(&s, CID_FMT" ("UUID_FMT")\n",
4430 CID_ARGS(&raft->cid), UUID_ARGS(&raft->cid));
4431 } else {
4432 ds_put_format(&s, "not yet known\n");
4433 }
4434 ds_put_format(&s, "Server ID: "SID_FMT" ("UUID_FMT")\n",
4435 SID_ARGS(&raft->sid), UUID_ARGS(&raft->sid));
4436 ds_put_format(&s, "Address: %s\n", raft->local_address);
4437 ds_put_format(&s, "Status: %s\n",
4438 raft->joining ? "joining cluster"
4439 : raft->leaving ? "leaving cluster"
4440 : raft->left ? "left cluster"
4441 : raft->failed ? "failed"
4442 : "cluster member");
4443 if (raft->joining) {
4444 ds_put_format(&s, "Remotes for joining:");
4445 const char *address;
4446 SSET_FOR_EACH (address, &raft->remote_addresses) {
4447 ds_put_format(&s, " %s", address);
4448 }
4449 ds_put_char(&s, '\n');
4450 }
4451 if (raft->role == RAFT_LEADER) {
4452 struct raft_server *as;
4453 HMAP_FOR_EACH (as, hmap_node, &raft->add_servers) {
4454 ds_put_format(&s, "Adding server %s ("SID_FMT" at %s) (%s)\n",
4455 as->nickname, SID_ARGS(&as->sid), as->address,
4456 raft_server_phase_to_string(as->phase));
4457 }
4458
4459 struct raft_server *rs = raft->remove_server;
4460 if (rs) {
4461 ds_put_format(&s, "Removing server %s ("SID_FMT" at %s) (%s)\n",
4462 rs->nickname, SID_ARGS(&rs->sid), rs->address,
4463 raft_server_phase_to_string(rs->phase));
4464 }
4465 }
4466
4467 ds_put_format(&s, "Role: %s\n",
4468 raft->role == RAFT_LEADER ? "leader"
4469 : raft->role == RAFT_CANDIDATE ? "candidate"
4470 : raft->role == RAFT_FOLLOWER ? "follower"
4471 : "<error>");
4472 ds_put_format(&s, "Term: %"PRIu64"\n", raft->term);
4473 raft_put_sid("Leader", &raft->leader_sid, raft, &s);
4474 raft_put_sid("Vote", &raft->vote, raft, &s);
4475 ds_put_char(&s, '\n');
4476
4477 ds_put_format(&s, "Election timer: %"PRIu64, raft->election_timer);
4478 if (raft->role == RAFT_LEADER && raft->election_timer_new) {
4479 ds_put_format(&s, " (changing to %"PRIu64")",
4480 raft->election_timer_new);
4481 }
4482 ds_put_char(&s, '\n');
4483
4484 ds_put_format(&s, "Log: [%"PRIu64", %"PRIu64"]\n",
4485 raft->log_start, raft->log_end);
4486
4487 uint64_t n_uncommitted = raft->log_end - raft->commit_index - 1;
4488 ds_put_format(&s, "Entries not yet committed: %"PRIu64"\n", n_uncommitted);
4489
4490 uint64_t n_unapplied = raft->log_end - raft->last_applied - 1;
4491 ds_put_format(&s, "Entries not yet applied: %"PRIu64"\n", n_unapplied);
4492
4493 const struct raft_conn *c;
4494 ds_put_cstr(&s, "Connections:");
4495 LIST_FOR_EACH (c, list_node, &raft->conns) {
4496 bool connected = jsonrpc_session_is_connected(c->js);
4497 ds_put_format(&s, " %s%s%s%s",
4498 connected ? "" : "(",
4499 c->incoming ? "<-" : "->", c->nickname,
4500 connected ? "" : ")");
4501 }
4502 ds_put_char(&s, '\n');
4503
4504 ds_put_cstr(&s, "Servers:\n");
4505 struct raft_server *server;
4506 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
4507 ds_put_format(&s, " %s ("SID_FMT" at %s)",
4508 server->nickname,
4509 SID_ARGS(&server->sid), server->address);
4510 if (uuid_equals(&server->sid, &raft->sid)) {
4511 ds_put_cstr(&s, " (self)");
4512 }
4513 if (server->phase != RAFT_PHASE_STABLE) {
4514 ds_put_format(&s, " (%s)",
4515 raft_server_phase_to_string(server->phase));
4516 }
4517 if (raft->role == RAFT_CANDIDATE) {
4518 if (!uuid_is_zero(&server->vote)) {
4519 char buf[SID_LEN + 1];
4520 ds_put_format(&s, " (voted for %s)",
4521 raft_get_nickname(raft, &server->vote,
4522 buf, sizeof buf));
4523 }
4524 } else if (raft->role == RAFT_LEADER) {
4525 ds_put_format(&s, " next_index=%"PRIu64" match_index=%"PRIu64,
4526 server->next_index, server->match_index);
4527 }
4528 ds_put_char(&s, '\n');
4529 }
4530
4531 unixctl_command_reply(conn, ds_cstr(&s));
4532 ds_destroy(&s);
4533}
4534
4535static void
4536raft_unixctl_leave__(struct unixctl_conn *conn, struct raft *raft)
4537{
4538 if (raft_is_leaving(raft)) {
4539 unixctl_command_reply_error(conn,
4540 "already in progress leaving cluster");
4541 } else if (raft_is_joining(raft)) {
4542 unixctl_command_reply_error(conn,
4543 "can't leave while join in progress");
4544 } else if (raft_failed(raft)) {
4545 unixctl_command_reply_error(conn,
4546 "can't leave after failure");
4547 } else {
4548 raft_leave(raft);
4549 unixctl_command_reply(conn, NULL);
4550 }
4551}
4552
4553static void
4554raft_unixctl_leave(struct unixctl_conn *conn, int argc OVS_UNUSED,
4555 const char *argv[], void *aux OVS_UNUSED)
4556{
4557 struct raft *raft = raft_lookup_by_name(argv[1]);
4558 if (!raft) {
4559 unixctl_command_reply_error(conn, "unknown cluster");
4560 return;
4561 }
4562
4563 raft_unixctl_leave__(conn, raft);
4564}
4565
4566static struct raft_server *
4567raft_lookup_server_best_match(struct raft *raft, const char *id)
4568{
4569 struct raft_server *best = NULL;
4570 int best_score = -1;
4571 int n_best = 0;
4572
4573 struct raft_server *s;
4574 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
4575 int score = (!strcmp(id, s->address)
4576 ? INT_MAX
4577 : uuid_is_partial_match(&s->sid, id));
4578 if (score > best_score) {
4579 best = s;
4580 best_score = score;
4581 n_best = 1;
4582 } else if (score == best_score) {
4583 n_best++;
4584 }
4585 }
4586 return n_best == 1 ? best : NULL;
4587}
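
/* Worked example (assuming uuid_is_partial_match() returns the length of
 * the matched SID prefix and 0 on a mismatch): with two servers whose SIDs
 * begin "1a2b..." and "1c3d...", looking up "1a" scores 2 vs. 0 and
 * resolves uniquely, while looking up "1" scores 1 vs. 1, so n_best == 2
 * and the function refuses to guess, returning NULL.  An exact address
 * match always wins outright via INT_MAX. */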
4588
4589static void
4590raft_unixctl_kick(struct unixctl_conn *conn, int argc OVS_UNUSED,
4591 const char *argv[], void *aux OVS_UNUSED)
4592{
4593 const char *cluster_name = argv[1];
4594 const char *server_name = argv[2];
4595
4596 struct raft *raft = raft_lookup_by_name(cluster_name);
4597 if (!raft) {
4598 unixctl_command_reply_error(conn, "unknown cluster");
4599 return;
4600 }
4601
4602 struct raft_server *server = raft_lookup_server_best_match(raft,
4603 server_name);
4604 if (!server) {
4605 unixctl_command_reply_error(conn, "unknown server");
4606 return;
4607 }
4608
4609 if (uuid_equals(&server->sid, &raft->sid)) {
4610 raft_unixctl_leave__(conn, raft);
4611 } else if (raft->role == RAFT_LEADER) {
4612 const struct raft_remove_server_request rq = {
4613 .sid = server->sid,
4614 .requester_conn = conn,
4615 };
4616 raft_handle_remove_server_request(raft, &rq);
4617 } else {
4618 const union raft_rpc rpc = {
4619 .remove_server_request = {
4620 .common = {
4621 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
4622 .sid = raft->leader_sid,
4623 .comment = "via unixctl"
4624 },
4625 .sid = server->sid,
4626 }
4627 };
4628 if (raft_send(raft, &rpc)) {
4629 unixctl_command_reply(conn, "sent removal request to leader");
4630 } else {
4631 unixctl_command_reply_error(conn,
4632 "failed to send removal request");
4633 }
4634 }
4635}
4636
4637static void
4638raft_get_election_timer_from_log(struct raft *raft)
4639{
4640 if (raft->snap.election_timer) {
4641 raft->election_timer = raft->snap.election_timer;
4642 }
4643 for (uint64_t index = raft->commit_index; index >= raft->log_start;
4644 index--) {
4645 struct raft_entry *e = &raft->entries[index - raft->log_start];
4646 if (e->election_timer) {
4647 raft->election_timer = e->election_timer;
4648 break;
4649 }
4650 }
4651}
4652
4653static void
4654raft_log_election_timer(struct raft *raft)
4655{
4656 raft_command_unref(raft_command_execute__(raft, NULL, NULL,
4657 raft->election_timer_new, NULL,
4658 NULL));
4659}
4660
4661static void
4662raft_unixctl_change_election_timer(struct unixctl_conn *conn,
4663 int argc OVS_UNUSED, const char *argv[],
4664 void *aux OVS_UNUSED)
4665{
4666 const char *cluster_name = argv[1];
4667 const char *election_timer_str = argv[2];
4668
4669 struct raft *raft = raft_lookup_by_name(cluster_name);
4670 if (!raft) {
4671 unixctl_command_reply_error(conn, "unknown cluster");
4672 return;
4673 }
4674
4675 if (raft->role != RAFT_LEADER) {
4676 unixctl_command_reply_error(conn, "election timer must be changed"
4677 " through leader.");
4678 return;
4679 }
4680
4681 /* If there are pending changes for election timer, reject it. */
4682 if (raft->election_timer_new) {
4683 unixctl_command_reply_error(conn, "election timer change pending.");
4684 return;
4685 }
4686
4687 uint64_t election_timer = atoll(election_timer_str);
4688 if (election_timer == raft->election_timer) {
4689 unixctl_command_reply(conn, "change election timer to current value.");
4690 return;
4691 }
4692
4693 /* Election timer smaller than 100ms or bigger than 10min doesn't make
4694 * sense. */
4695 if (election_timer < 100 || election_timer > 600000) {
4696 unixctl_command_reply_error(conn, "election timer must be between "
4697 "100 and 600000, in msec.");
4698 return;
4699 }
4700
4701 /* If the election timer is to be enlarged, it should be done gradually, so
4702 * that it won't cause a spurious timeout when the new value is applied on
4703 * the leader but not yet on some of the followers. */
4704 if (election_timer > raft->election_timer * 2) {
4705 unixctl_command_reply_error(conn, "election timer increase should not "
4706 "exceed the current value x 2.");
4707 return;
4708 }
4709
4710 raft->election_timer_new = election_timer;
4711 raft_log_election_timer(raft);
4712 unixctl_command_reply(conn, "change of election timer initiated.");
4713}
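
/* Worked example of the x2 rule above: growing the timer from 1000 ms to
 * 8000 ms takes three invocations, 1000 -> 2000 -> 4000 -> 8000, each
 * issued only after the previous change has committed (that is, after
 * election_timer_new clears), e.g. with the standard unixctl client:
 *
 *     ovs-appctl -t /path/to/db.ctl cluster/change-election-timer DB 2000
 */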
4714
4715static void
4716 raft_unixctl_failure_test(struct unixctl_conn *conn,
4717 int argc OVS_UNUSED, const char *argv[],
4718 void *aux OVS_UNUSED)
4719{
4720 const char *test = argv[1];
4721 if (!strcmp(test, "crash-before-sending-append-request")) {
4722 failure_test = FT_CRASH_BEFORE_SEND_APPEND_REQ;
4723 } else if (!strcmp(test, "crash-after-sending-append-request")) {
4724 failure_test = FT_CRASH_AFTER_SEND_APPEND_REQ;
4725 } else if (!strcmp(test, "crash-before-sending-execute-command-reply")) {
4726 failure_test = FT_CRASH_BEFORE_SEND_EXEC_REP;
4727 } else if (!strcmp(test, "crash-after-sending-execute-command-reply")) {
4728 failure_test = FT_CRASH_AFTER_SEND_EXEC_REP;
4729 } else if (!strcmp(test, "crash-before-sending-execute-command-request")) {
4730 failure_test = FT_CRASH_BEFORE_SEND_EXEC_REQ;
4731 } else if (!strcmp(test, "crash-after-sending-execute-command-request")) {
4732 failure_test = FT_CRASH_AFTER_SEND_EXEC_REQ;
4733 } else if (!strcmp(test, "crash-after-receiving-append-request-update")) {
4734 failure_test = FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE;
4735 } else if (!strcmp(test, "delay-election")) {
4736 failure_test = FT_DELAY_ELECTION;
4737 struct raft *raft;
4738 HMAP_FOR_EACH (raft, hmap_node, &all_rafts) {
4739 if (raft->role == RAFT_FOLLOWER) {
4740 raft_reset_election_timer(raft);
4741 }
4742 }
4743 } else if (!strcmp(test, "dont-send-vote-request")) {
4744 failure_test = FT_DONT_SEND_VOTE_REQUEST;
4745 } else if (!strcmp(test, "clear")) {
4746 failure_test = FT_NO_TEST;
4747 unixctl_command_reply(conn, "test dismissed");
4748 return;
4749 } else {
4750 unixctl_command_reply_error(conn, "unknown test scenario");
4751 return;
4752 }
4753 unixctl_command_reply(conn, "test engaged");
4754}
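
/* Usage sketch (illustrative): a scenario is armed through the unixctl
 * command registered in raft_init() below, e.g.
 *
 *     ovs-appctl -t /path/to/db.ctl cluster/failure-test delay-election
 *
 * and stays in effect until the "clear" scenario is issued. */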
4755
4756static void
4757raft_init(void)
4758{
4759 static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
4760 if (!ovsthread_once_start(&once)) {
4761 return;
4762 }
4763 unixctl_command_register("cluster/cid", "DB", 1, 1,
4764 raft_unixctl_cid, NULL);
4765 unixctl_command_register("cluster/sid", "DB", 1, 1,
4766 raft_unixctl_sid, NULL);
4767 unixctl_command_register("cluster/status", "DB", 1, 1,
4768 raft_unixctl_status, NULL);
4769 unixctl_command_register("cluster/leave", "DB", 1, 1,
4770 raft_unixctl_leave, NULL);
4771 unixctl_command_register("cluster/kick", "DB SERVER", 2, 2,
4772 raft_unixctl_kick, NULL);
4773 unixctl_command_register("cluster/change-election-timer", "DB TIME", 2, 2,
4774 raft_unixctl_change_election_timer, NULL);
4775 unixctl_command_register("cluster/failure-test", "FAILURE SCENARIO", 1, 1,
4776 raft_unixctl_failure_test, NULL);
4777 ovsthread_once_done(&once);
4778}