/*
 * Copyright (c) 2017, 2018 Nicira, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <config.h>

#include "raft.h"
#include "raft-private.h"

#include <errno.h>
#include <unistd.h>

#include "hash.h"
#include "jsonrpc.h"
#include "lockfile.h"
#include "openvswitch/dynamic-string.h"
#include "openvswitch/hmap.h"
#include "openvswitch/json.h"
#include "openvswitch/list.h"
#include "openvswitch/poll-loop.h"
#include "openvswitch/vlog.h"
#include "ovsdb-error.h"
#include "ovsdb-parser.h"
#include "ovsdb/log.h"
#include "raft-rpc.h"
#include "random.h"
#include "simap.h"
#include "socket-util.h"
#include "stream.h"
#include "timeval.h"
#include "unicode.h"
#include "unixctl.h"
#include "util.h"
#include "uuid.h"

VLOG_DEFINE_THIS_MODULE(raft);

/* Roles for a Raft server:
 *
 *    - Follower: A server in touch with the current leader.
 *
 *    - Candidate: A server unaware of a current leader and seeking election
 *      to leader.
 *
 *    - Leader: Handles all client requests.  At most one at a time.
 *
 * In normal operation there is exactly one leader and all of the other
 * servers are followers. */
enum raft_role {
    RAFT_FOLLOWER,
    RAFT_CANDIDATE,
    RAFT_LEADER
};

/* Flags for unit tests. */
enum raft_failure_test {
    FT_NO_TEST,
    FT_CRASH_BEFORE_SEND_APPEND_REQ,
    FT_CRASH_AFTER_SEND_APPEND_REQ,
    FT_CRASH_BEFORE_SEND_EXEC_REP,
    FT_CRASH_AFTER_SEND_EXEC_REP,
    FT_CRASH_BEFORE_SEND_EXEC_REQ,
    FT_CRASH_AFTER_SEND_EXEC_REQ,
    FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE,
    FT_DELAY_ELECTION,
    FT_DONT_SEND_VOTE_REQUEST
};
static enum raft_failure_test failure_test;

/* A connection between this Raft server and another one. */
struct raft_conn {
    struct ovs_list list_node;  /* In struct raft's 'conns' list. */
    struct jsonrpc_session *js; /* JSON-RPC connection. */
    struct uuid sid;            /* This server's unique ID. */
    char *nickname;             /* Short name for use in log messages. */
    bool incoming;              /* True if incoming, false if outgoing. */
    unsigned int js_seqno;      /* Seqno for noticing (re)connections. */
};

static void raft_conn_close(struct raft_conn *);

/* A "command", that is, a request to append an entry to the log.
 *
 * The Raft specification only allows clients to issue commands to the leader.
 * With this implementation, clients may issue a command on any server, which
 * then relays the command to the leader if necessary.
 *
 * This structure is thus used in three cases:
 *
 *     1. We are the leader and the command was issued to us directly.
 *
 *     2. We are a follower and relayed the command to the leader.
 *
 *     3. We are the leader and a follower relayed the command to us.
 */
struct raft_command {
    /* All cases. */
    struct hmap_node hmap_node; /* In struct raft's 'commands' hmap. */
    unsigned int n_refs;        /* Reference count. */
    enum raft_command_status status; /* Execution status. */
    struct uuid eid;            /* Entry ID of result. */

    /* Case 1 only. */
    uint64_t index;             /* Index in log (0 if being relayed). */

    /* Case 2 only. */
    long long int timestamp;    /* Issue or last ping time, for expiration. */

    /* Case 3 only. */
    struct uuid sid;            /* The follower (otherwise UUID_ZERO). */
};
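
/* Illustrative sketch (not part of the original file): how the three cases
 * described above map onto the fields of 'struct raft_command'.  The helper
 * below is hypothetical; only the field usage reflects this file. */
#if 0
static struct raft_command *
example_command_new(bool am_leader, const struct uuid *relaying_follower)
{
    struct raft_command *cmd = xzalloc(sizeof *cmd);
    cmd->n_refs = 1;
    cmd->status = RAFT_CMD_INCOMPLETE;
    if (am_leader) {
        cmd->index = 42;                    /* Cases 1 and 3: log index. */
        if (relaying_follower) {
            cmd->sid = *relaying_follower;  /* Case 3 only. */
        }
    } else {
        cmd->index = 0;                     /* Case 2: being relayed. */
        cmd->timestamp = time_msec();       /* Case 2: for expiration. */
    }
    return cmd;
}
#endif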

static void raft_command_complete(struct raft *, struct raft_command *,
                                  enum raft_command_status);

static void raft_complete_all_commands(struct raft *,
                                       enum raft_command_status);

/* Type of deferred action, see struct raft_waiter. */
enum raft_waiter_type {
    RAFT_W_ENTRY,
    RAFT_W_TERM,
    RAFT_W_RPC,
};

/* An action deferred until a log write commits to disk. */
struct raft_waiter {
    struct ovs_list list_node;
    uint64_t commit_ticket;

    enum raft_waiter_type type;
    union {
        /* RAFT_W_ENTRY.
         *
         * Waits for a RAFT_REC_ENTRY write to our local log to commit.  Upon
         * completion, updates 'log_synced' to indicate that the new log entry
         * or entries are committed and, if we are leader, also updates our
         * local 'match_index'. */
        struct {
            uint64_t index;
        } entry;

        /* RAFT_W_TERM.
         *
         * Waits for a RAFT_REC_TERM or RAFT_REC_VOTE record write to commit.
         * Upon completion, updates 'synced_term' and 'synced_vote', which
         * triggers sending RPCs deferred by the uncommitted 'term' and
         * 'vote'. */
        struct {
            uint64_t term;
            struct uuid vote;
        } term;

        /* RAFT_W_RPC.
         *
         * Sometimes, sending an RPC to a peer must be delayed until an entry,
         * a term, or a vote mentioned in the RPC is synced to disk.  This
         * waiter keeps a copy of such an RPC until the previous waiters have
         * committed. */
        union raft_rpc *rpc;
    };
};

static struct raft_waiter *raft_waiter_create(struct raft *,
                                              enum raft_waiter_type,
                                              bool start_commit);
static void raft_waiters_destroy(struct raft *);
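
/* Illustrative sketch (not part of the original file): the typical life
 * cycle of a waiter.  A caller appends a record to 'raft->log', creates a
 * waiter carrying the log's commit ticket, and raft_waiters_run() (defined
 * later in this file) completes the waiter once
 * ovsdb_log_commit_progress() reaches that ticket. */
#if 0
    struct raft_waiter *w = raft_waiter_create(raft, RAFT_W_ENTRY, true);
    w->entry.index = index;     /* Acted on once this index is on disk. */

    /* ... later, each time through raft_run(): */
    raft_waiters_run(raft);     /* Calls raft_waiter_complete() on every
                                 * waiter whose commit ticket has synced. */
#endif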

/* The Raft state machine. */
struct raft {
    struct hmap_node hmap_node; /* In 'all_rafts'. */
    struct ovsdb_log *log;

/* Persistent derived state.
 *
 * This must be updated on stable storage before responding to RPCs.  It can
 * be derived from the header, snapshot, and log in 'log'. */

    struct uuid cid;            /* Cluster ID (immutable for the cluster). */
    struct uuid sid;            /* Server ID (immutable for the server). */
    char *local_address;        /* Local address (immutable for the server). */
    char *local_nickname;       /* Used for local server in log messages. */
    char *name;                 /* Schema name (immutable for the cluster). */

    /* Contains "struct raft_server"s and represents the server configuration
     * most recently added to 'log'. */
    struct hmap servers;

#define ELECTION_BASE_MSEC 1000
#define ELECTION_RANGE_MSEC 1000
    /* The election timeout base value for leader election, in milliseconds.
     * It can be set by unixctl cluster/change-election-timer.  Default value
     * is ELECTION_BASE_MSEC. */
    uint64_t election_timer;
    /* If not 0, it is the new value of election_timer being proposed. */
    uint64_t election_timer_new;

/* Persistent state on all servers.
 *
 * Must be updated on stable storage before responding to RPCs. */

    /* Current term and the vote for that term.  These might be on the way to
     * disk now. */
    uint64_t term;              /* Initialized to 0 and only increases. */
    struct uuid vote;           /* All-zeros if no vote yet in 'term'. */

    /* The term and vote that have been synced to disk. */
    uint64_t synced_term;
    struct uuid synced_vote;

    /* The log.
     *
     * A log entry with index 1 never really exists; the initial snapshot for
     * a Raft is considered to include this index.  The first real log entry
     * has index 2.
     *
     * A new Raft instance contains an empty log:  log_start=2, log_end=2.
     * Over time, the log grows:                   log_start=2, log_end=N.
     * At some point, the server takes a snapshot: log_start=N, log_end=N.
     * The log continues to grow:                  log_start=N, log_end=N+1...
     * (A worked example of this indexing appears just after this struct.)
     *
     * Must be updated on stable storage before responding to RPCs. */
    struct raft_entry *entries; /* Log entry i is in log[i - log_start]. */
    uint64_t log_start;         /* Index of first entry in log. */
    uint64_t log_end;           /* Index of last entry in log, plus 1. */
    uint64_t log_synced;        /* Index of last synced entry. */
    size_t allocated_log;       /* Allocated entries in 'log'. */

    /* Snapshot state (see Figure 5.1)
     *
     * This is the state of the cluster as of the last discarded log entry,
     * that is, at log index 'log_start - 1' (called prevIndex in Figure 5.1).
     * Only committed log entries can be included in a snapshot. */
    struct raft_entry snap;

/* Volatile state.
 *
 * The snapshot is always committed, but the rest of the log might not be yet.
 * 'last_applied' tracks what entries have been passed to the client.  If the
 * client hasn't yet read the latest snapshot, then even the snapshot isn't
 * applied yet.  Thus, the invariants are different for these members:
 *
 *      log_start - 2 <= last_applied <= commit_index < log_end.
 *      log_start - 1 <= commit_index < log_end.
 */

    enum raft_role role;        /* Current role. */
    uint64_t commit_index;      /* Max log index known to be committed. */
    uint64_t last_applied;      /* Max log index applied to state machine. */
    struct uuid leader_sid;     /* Server ID of leader (zero, if unknown). */

    long long int election_base;    /* Time of last heartbeat from leader. */
    long long int election_timeout; /* Time at which we start an election. */

    /* Used for joining a cluster. */
    bool joining;                 /* Attempting to join the cluster? */
    struct sset remote_addresses; /* Addresses to try to find other servers. */
    long long int join_timeout;   /* Time to re-send add server request. */

    /* Used for leaving a cluster. */
    bool leaving;               /* True if we are leaving the cluster. */
    bool left;                  /* True if we have finished leaving. */
    long long int leave_timeout; /* Time to re-send remove server request. */

    /* Failure. */
    bool failed;                /* True if unrecoverable error has occurred. */

    /* File synchronization. */
    struct ovs_list waiters;    /* Contains "struct raft_waiter"s. */

    /* Network connections. */
    struct pstream *listener;     /* For connections from other Raft servers. */
    long long int listen_backoff; /* For retrying creating 'listener'. */
    struct ovs_list conns;        /* Contains struct raft_conns. */

    /* Leaders only.  Reinitialized after becoming leader. */
    struct hmap add_servers;    /* Contains "struct raft_server"s to add. */
    struct raft_server *remove_server; /* Server being removed. */
    struct hmap commands;       /* Contains "struct raft_command"s. */
    long long int ping_timeout; /* Time at which to send a heartbeat. */

    /* Candidates only.  Reinitialized at start of election. */
    int n_votes;                /* Number of votes for me. */

    /* Followers and candidates only. */
    bool candidate_retrying;    /* The earlier election timed out and we are
                                 * now retrying. */
    bool had_leader;            /* A leader has been elected since the last
                                 * election was initiated.  This helps set
                                 * 'candidate_retrying'. */

    /* For all. */
    bool ever_had_leader;       /* A leader has been elected at some point
                                 * since this raft was initialized, i.e. it
                                 * has ever been connected. */

    /* Connection backlog limits. */
#define DEFAULT_MAX_BACKLOG_N_MSGS 500
#define DEFAULT_MAX_BACKLOG_N_BYTES UINT32_MAX
    size_t conn_backlog_max_n_msgs;  /* Number of messages. */
    size_t conn_backlog_max_n_bytes; /* Number of bytes. */
};
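
/* Worked example (not part of the original file) of the log indexing
 * convention described above.  Suppose a server snapshots at index 9 and
 * then appends three entries:
 *
 *     log_start = 10, log_end = 13, the snapshot covers indexes 1..9,
 *     entries[0] holds index 10, entries[1] index 11, entries[2] index 12.
 *
 * So the arithmetic used throughout this file looks like: */
#if 0
    const struct raft_entry *e = &raft->entries[12 - raft->log_start];
    ovs_assert(raft->log_end - raft->log_start == 3); /* Entries in memory. */
#endif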

/* All Raft structures. */
static struct hmap all_rafts = HMAP_INITIALIZER(&all_rafts);

static void raft_init(void);

static struct ovsdb_error *raft_read_header(struct raft *)
    OVS_WARN_UNUSED_RESULT;

static void raft_send_execute_command_reply(struct raft *,
                                            const struct uuid *sid,
                                            const struct uuid *eid,
                                            enum raft_command_status,
                                            uint64_t commit_index);

static void raft_update_our_match_index(struct raft *, uint64_t min_index);

static void raft_send_remove_server_reply__(
    struct raft *, const struct uuid *target_sid,
    const struct uuid *requester_sid, struct unixctl_conn *requester_conn,
    bool success, const char *comment);
static void raft_finished_leaving_cluster(struct raft *);

static void raft_server_init_leader(struct raft *, struct raft_server *);

static bool raft_rpc_is_heartbeat(const union raft_rpc *);
static bool raft_is_rpc_synced(const struct raft *, const union raft_rpc *);

static void raft_handle_rpc(struct raft *, const union raft_rpc *);

static bool raft_send_at(struct raft *, const union raft_rpc *,
                         int line_number);
#define raft_send(raft, rpc) raft_send_at(raft, rpc, __LINE__)

static bool raft_send_to_conn_at(struct raft *, const union raft_rpc *,
                                 struct raft_conn *, int line_number);
#define raft_send_to_conn(raft, rpc, conn) \
    raft_send_to_conn_at(raft, rpc, conn, __LINE__)

static void raft_send_append_request(struct raft *,
                                     struct raft_server *, unsigned int n,
                                     const char *comment);

static void raft_become_leader(struct raft *);
static void raft_become_follower(struct raft *);
static void raft_reset_election_timer(struct raft *);
static void raft_reset_ping_timer(struct raft *);
static void raft_send_heartbeats(struct raft *);
static void raft_start_election(struct raft *, bool leadership_transfer);
static bool raft_truncate(struct raft *, uint64_t new_end);
static void raft_get_servers_from_log(struct raft *, enum vlog_level);
static void raft_get_election_timer_from_log(struct raft *);

static bool raft_handle_write_error(struct raft *, struct ovsdb_error *);

static void raft_run_reconfigure(struct raft *);

static void raft_set_leader(struct raft *, const struct uuid *sid);
static struct raft_server *
raft_find_server(const struct raft *raft, const struct uuid *sid)
{
    return raft_server_find(&raft->servers, sid);
}

static char *
raft_make_address_passive(const char *address_)
{
    if (!strncmp(address_, "unix:", 5)) {
        return xasprintf("p%s", address_);
    } else {
        char *address = xstrdup(address_);
        char *host, *port;
        inet_parse_host_port_tokens(strchr(address, ':') + 1, &host, &port);

        struct ds paddr = DS_EMPTY_INITIALIZER;
        ds_put_format(&paddr, "p%.3s:%s:", address, port);
        if (strchr(host, ':')) {
            ds_put_format(&paddr, "[%s]", host);
        } else {
            ds_put_cstr(&paddr, host);
        }
        free(address);
        return ds_steal_cstr(&paddr);
    }
}
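
/* Illustrative examples (not part of the original file) of the mapping that
 * raft_make_address_passive() performs, given OVS's usual active stream
 * address syntax:
 *
 *     "unix:/tmp/db.sock"    -> "punix:/tmp/db.sock"
 *     "tcp:10.1.2.3:6644"    -> "ptcp:6644:10.1.2.3"
 *     "ssl:[fc00::1]:6644"   -> "pssl:6644:[fc00::1]"
 */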

static struct raft *
raft_alloc(void)
{
    raft_init();

    struct raft *raft = xzalloc(sizeof *raft);
    hmap_node_nullify(&raft->hmap_node);
    hmap_init(&raft->servers);
    raft->log_start = raft->log_end = 1;
    raft->role = RAFT_FOLLOWER;
    sset_init(&raft->remote_addresses);
    raft->join_timeout = LLONG_MAX;
    ovs_list_init(&raft->waiters);
    raft->listen_backoff = LLONG_MIN;
    ovs_list_init(&raft->conns);
    hmap_init(&raft->add_servers);
    hmap_init(&raft->commands);

    raft->election_timer = ELECTION_BASE_MSEC;

    raft->conn_backlog_max_n_msgs = DEFAULT_MAX_BACKLOG_N_MSGS;
    raft->conn_backlog_max_n_bytes = DEFAULT_MAX_BACKLOG_N_BYTES;

    return raft;
}

/* Creates an on-disk file that represents a new Raft cluster and initializes
 * it to consist of a single server, the one on which this function is called.
 *
 * Creates the local copy of the cluster's log in 'file_name', which must not
 * already exist.  Gives it the name 'name', which should be the database
 * schema name and which is used only to match up this database with the
 * server added to the cluster later if the cluster ID is unavailable.
 *
 * The new server is located at 'local_address', which must take one of the
 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
 * square bracket enclosed IPv6 address and PORT is a TCP port number.
 *
 * This only creates the on-disk file.  Use raft_open() to start operating the
 * new server.
 *
 * Returns null if successful, otherwise an ovsdb_error describing the
 * problem. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_create_cluster(const char *file_name, const char *name,
                    const char *local_address, const struct json *data)
{
    /* Parse and verify validity of the local address. */
    struct ovsdb_error *error = raft_address_validate(local_address);
    if (error) {
        return error;
    }

    /* Create log file. */
    struct ovsdb_log *log;
    error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
                           -1, &log);
    if (error) {
        return error;
    }

    /* Write log file. */
    struct raft_header h = {
        .sid = uuid_random(),
        .cid = uuid_random(),
        .name = xstrdup(name),
        .local_address = xstrdup(local_address),
        .joining = false,
        .remote_addresses = SSET_INITIALIZER(&h.remote_addresses),
        .snap_index = 1,
        .snap = {
            .term = 1,
            .data = json_nullable_clone(data),
            .eid = uuid_random(),
            .servers = json_object_create(),
        },
    };
    shash_add_nocopy(json_object(h.snap.servers),
                     xasprintf(UUID_FMT, UUID_ARGS(&h.sid)),
                     json_string_create(local_address));
    error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
    raft_header_uninit(&h);
    if (!error) {
        error = ovsdb_log_commit_block(log);
    }
    ovsdb_log_close(log);

    return error;
}
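
/* Illustrative usage sketch (not part of the original file; the file name,
 * schema name, and address are hypothetical, and error handling is
 * elided).  The caller keeps ownership of 'data', since the function clones
 * it into the initial snapshot: */
#if 0
    struct json *data = json_object_create();   /* Empty initial snapshot. */
    struct ovsdb_error *error = raft_create_cluster(
        "/etc/ovn/db.db", "OVN_Southbound", "tcp:10.0.0.1:6644", data);
    json_destroy(data);
#endif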

/* Creates a database file that represents a new server in an existing Raft
 * cluster.
 *
 * Creates the local copy of the cluster's log in 'file_name', which must not
 * already exist.  Gives it the name 'name', which must be the same name
 * passed in to raft_create_cluster() earlier.
 *
 * 'cid' is optional.  If specified, the new server will join only the cluster
 * with the given cluster ID.
 *
 * The new server is located at 'local_address', which must take one of the
 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
 * square bracket enclosed IPv6 address and PORT is a TCP port number.
 *
 * Joining the cluster requires contacting it.  Thus, 'remote_addresses'
 * specifies the addresses of existing servers in the cluster.  One server out
 * of the existing cluster is sufficient, as long as that server is reachable
 * and not partitioned from the current cluster leader.  If multiple servers
 * from the cluster are specified, then it is sufficient for any of them to
 * meet this criterion.
 *
 * This only creates the on-disk file and does no network access.  Use
 * raft_open() to start operating the new server.  (Until this happens, the
 * new server has not joined the cluster.)
 *
 * Returns null if successful, otherwise an ovsdb_error describing the
 * problem. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_join_cluster(const char *file_name,
                  const char *name, const char *local_address,
                  const struct sset *remote_addresses,
                  const struct uuid *cid)
{
    ovs_assert(!sset_is_empty(remote_addresses));

    /* Parse and verify validity of the addresses. */
    struct ovsdb_error *error = raft_address_validate(local_address);
    if (error) {
        return error;
    }
    const char *addr;
    SSET_FOR_EACH (addr, remote_addresses) {
        error = raft_address_validate(addr);
        if (error) {
            return error;
        }
        if (!strcmp(addr, local_address)) {
            return ovsdb_error(NULL, "remote addresses cannot be the same "
                               "as the local address");
        }
    }

    /* Verify validity of the cluster ID (if provided). */
    if (cid && uuid_is_zero(cid)) {
        return ovsdb_error(NULL, "all-zero UUID is not valid cluster ID");
    }

    /* Create log file. */
    struct ovsdb_log *log;
    error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
                           -1, &log);
    if (error) {
        return error;
    }

    /* Write log file. */
    struct raft_header h = {
        .sid = uuid_random(),
        .cid = cid ? *cid : UUID_ZERO,
        .name = xstrdup(name),
        .local_address = xstrdup(local_address),
        .joining = true,
        /* No snapshot yet. */
    };
    sset_clone(&h.remote_addresses, remote_addresses);
    error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
    raft_header_uninit(&h);
    if (!error) {
        error = ovsdb_log_commit_block(log);
    }
    ovsdb_log_close(log);

    return error;
}
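
/* Illustrative usage sketch (not part of the original file; names and
 * addresses are hypothetical).  Passing NULL for 'cid' accepts whatever
 * cluster the remotes belong to: */
#if 0
    struct sset remotes = SSET_INITIALIZER(&remotes);
    sset_add(&remotes, "tcp:10.0.0.1:6644");
    struct ovsdb_error *error = raft_join_cluster(
        "/etc/ovn/db2.db", "OVN_Southbound", "tcp:10.0.0.2:6644",
        &remotes, NULL);
    sset_destroy(&remotes);
#endif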

/* Reads the initial header record from 'log', which must be a Raft clustered
 * database log, and populates '*md' with the information read from it.  The
 * caller must eventually destroy 'md' with raft_metadata_destroy().
 *
 * On success, returns NULL.  On failure, returns an error that the caller
 * must eventually destroy and zeros '*md'. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_read_metadata(struct ovsdb_log *log, struct raft_metadata *md)
{
    struct raft *raft = raft_alloc();
    raft->log = log;

    struct ovsdb_error *error = raft_read_header(raft);
    if (!error) {
        md->sid = raft->sid;
        md->name = xstrdup(raft->name);
        md->local = xstrdup(raft->local_address);
        md->cid = raft->cid;
    } else {
        memset(md, 0, sizeof *md);
    }

    raft->log = NULL;
    raft_close(raft);
    return error;
}
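
/* Illustrative usage sketch (not part of the original file; the file name is
 * hypothetical and error handling abbreviated).  The caller keeps ownership
 * of 'log': */
#if 0
    struct ovsdb_log *log;
    struct ovsdb_error *error = ovsdb_log_open("db.db", RAFT_MAGIC,
                                               OVSDB_LOG_READ_ONLY, -1, &log);
    if (!error) {
        struct raft_metadata md;
        error = raft_read_metadata(log, &md);
        if (!error) {
            VLOG_INFO("cluster "CID_FMT", server "SID_FMT,
                      CID_ARGS(&md.cid), SID_ARGS(&md.sid));
            raft_metadata_destroy(&md);
        }
        ovsdb_log_close(log);
    }
#endif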

/* Frees the metadata in 'md'. */
void
raft_metadata_destroy(struct raft_metadata *md)
{
    if (md) {
        free(md->name);
        free(md->local);
    }
}

static const struct raft_entry *
raft_get_entry(const struct raft *raft, uint64_t index)
{
    ovs_assert(index >= raft->log_start);
    ovs_assert(index < raft->log_end);
    return &raft->entries[index - raft->log_start];
}

static uint64_t
raft_get_term(const struct raft *raft, uint64_t index)
{
    return (index == raft->log_start - 1
            ? raft->snap.term
            : raft_get_entry(raft, index)->term);
}

static const struct json *
raft_servers_for_index(const struct raft *raft, uint64_t index)
{
    ovs_assert(index >= raft->log_start - 1);
    ovs_assert(index < raft->log_end);

    const struct json *servers = raft->snap.servers;
    for (uint64_t i = raft->log_start; i <= index; i++) {
        const struct raft_entry *e = raft_get_entry(raft, i);
        if (e->servers) {
            servers = e->servers;
        }
    }
    return servers;
}

static void
raft_set_servers(struct raft *raft, const struct hmap *new_servers,
                 enum vlog_level level)
{
    struct raft_server *s, *next;
    HMAP_FOR_EACH_SAFE (s, next, hmap_node, &raft->servers) {
        if (!raft_server_find(new_servers, &s->sid)) {
            ovs_assert(s != raft->remove_server);

            hmap_remove(&raft->servers, &s->hmap_node);
            VLOG(level, "server %s removed from configuration", s->nickname);
            raft_server_destroy(s);
        }
    }

    HMAP_FOR_EACH_SAFE (s, next, hmap_node, new_servers) {
        if (!raft_find_server(raft, &s->sid)) {
            VLOG(level, "server %s added to configuration", s->nickname);

            struct raft_server *new
                = raft_server_add(&raft->servers, &s->sid, s->address);
            raft_server_init_leader(raft, new);
        }
    }
}

static uint64_t
raft_add_entry(struct raft *raft,
               uint64_t term, struct json *data, const struct uuid *eid,
               struct json *servers, uint64_t election_timer)
{
    if (raft->log_end - raft->log_start >= raft->allocated_log) {
        raft->entries = x2nrealloc(raft->entries, &raft->allocated_log,
                                   sizeof *raft->entries);
    }

    uint64_t index = raft->log_end++;
    struct raft_entry *entry = &raft->entries[index - raft->log_start];
    entry->term = term;
    entry->data = data;
    entry->eid = eid ? *eid : UUID_ZERO;
    entry->servers = servers;
    entry->election_timer = election_timer;
    return index;
}

/* Writes a RAFT_REC_ENTRY record for 'term', 'data', 'eid', 'servers', and
 * 'election_timer' to 'raft''s log and returns an error indication. */
static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_write_entry(struct raft *raft, uint64_t term, struct json *data,
                 const struct uuid *eid, struct json *servers,
                 uint64_t election_timer)
{
    struct raft_record r = {
        .type = RAFT_REC_ENTRY,
        .term = term,
        .entry = {
            .index = raft_add_entry(raft, term, data, eid, servers,
                                    election_timer),
            .data = data,
            .servers = servers,
            .election_timer = election_timer,
            .eid = eid ? *eid : UUID_ZERO,
        },
    };
    return ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r));
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_write_state(struct ovsdb_log *log,
                 uint64_t term, const struct uuid *vote)
{
    struct raft_record r = { .term = term };
    if (vote && !uuid_is_zero(vote)) {
        r.type = RAFT_REC_VOTE;
        r.sid = *vote;
    } else {
        r.type = RAFT_REC_TERM;
    }
    return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_apply_record(struct raft *raft, unsigned long long int rec_idx,
                  const struct raft_record *r)
{
    /* Apply "term", which is present in most kinds of records (and otherwise
     * 0).
     *
     * A Raft leader can replicate entries from previous terms to the other
     * servers in the cluster, retaining the original terms on those entries
     * (see section 3.6.2 "Committing entries from previous terms" for more
     * information), so it's OK for the term in a log record to precede the
     * current term. */
    if (r->term > raft->term) {
        raft->term = raft->synced_term = r->term;
        raft->vote = raft->synced_vote = UUID_ZERO;
    }

    switch (r->type) {
    case RAFT_REC_ENTRY:
        if (r->entry.index < raft->commit_index) {
            return ovsdb_error(NULL, "record %llu attempts to truncate log "
                               "from %"PRIu64" to %"PRIu64" entries, but "
                               "commit index is already %"PRIu64,
                               rec_idx, raft->log_end, r->entry.index,
                               raft->commit_index);
        } else if (r->entry.index > raft->log_end) {
            return ovsdb_error(NULL, "record %llu with index %"PRIu64" skips "
                               "past expected index %"PRIu64,
                               rec_idx, r->entry.index, raft->log_end);
        }

        if (r->entry.index < raft->log_end) {
            /* This can happen, but it is notable. */
            VLOG_DBG("record %llu truncates log from %"PRIu64" to %"PRIu64
                     " entries", rec_idx, raft->log_end, r->entry.index);
            raft_truncate(raft, r->entry.index);
        }

        uint64_t prev_term = (raft->log_end > raft->log_start
                              ? raft->entries[raft->log_end
                                              - raft->log_start - 1].term
                              : raft->snap.term);
        if (r->term < prev_term) {
            return ovsdb_error(NULL, "record %llu with index %"PRIu64" term "
                               "%"PRIu64" precedes previous entry's term "
                               "%"PRIu64,
                               rec_idx, r->entry.index, r->term, prev_term);
        }

        raft->log_synced = raft_add_entry(
            raft, r->term,
            json_nullable_clone(r->entry.data), &r->entry.eid,
            json_nullable_clone(r->entry.servers),
            r->entry.election_timer);
        return NULL;

    case RAFT_REC_TERM:
        return NULL;

    case RAFT_REC_VOTE:
        if (r->term < raft->term) {
            return ovsdb_error(NULL, "record %llu votes for term %"PRIu64" "
                               "but current term is %"PRIu64,
                               rec_idx, r->term, raft->term);
        } else if (!uuid_is_zero(&raft->vote)
                   && !uuid_equals(&raft->vote, &r->sid)) {
            return ovsdb_error(NULL, "record %llu votes for "SID_FMT" in term "
                               "%"PRIu64" but a previous record for the "
                               "same term voted for "SID_FMT, rec_idx,
                               SID_ARGS(&raft->vote), r->term,
                               SID_ARGS(&r->sid));
        } else {
            raft->vote = raft->synced_vote = r->sid;
            return NULL;
        }
        break;

    case RAFT_REC_NOTE:
        if (!strcmp(r->note, "left")) {
            return ovsdb_error(NULL, "record %llu indicates server has left "
                               "the cluster; it cannot be added back (use "
                               "\"ovsdb-tool join-cluster\" to add a new "
                               "server)", rec_idx);
        }
        return NULL;

    case RAFT_REC_COMMIT_INDEX:
        if (r->commit_index < raft->commit_index) {
            return ovsdb_error(NULL, "record %llu regresses commit index "
                               "from %"PRIu64 " to %"PRIu64,
                               rec_idx, raft->commit_index, r->commit_index);
        } else if (r->commit_index >= raft->log_end) {
            return ovsdb_error(NULL, "record %llu advances commit index to "
                               "%"PRIu64 " but last log index is %"PRIu64,
                               rec_idx, r->commit_index, raft->log_end - 1);
        } else {
            raft->commit_index = r->commit_index;
            return NULL;
        }
        break;

    case RAFT_REC_LEADER:
        /* XXX we could use this to take back leadership for quick restart */
        return NULL;

    default:
        OVS_NOT_REACHED();
    }
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_read_header(struct raft *raft)
{
    /* Read header record. */
    struct json *json;
    struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
    if (error || !json) {
        /* Report error or end-of-file. */
        return error;
    }
    ovsdb_log_mark_base(raft->log);

    struct raft_header h;
    error = raft_header_from_json(&h, json);
    json_destroy(json);
    if (error) {
        return error;
    }

    raft->sid = h.sid;
    raft->cid = h.cid;
    raft->name = xstrdup(h.name);
    raft->local_address = xstrdup(h.local_address);
    raft->local_nickname = raft_address_to_nickname(h.local_address, &h.sid);
    raft->joining = h.joining;

    if (h.joining) {
        sset_clone(&raft->remote_addresses, &h.remote_addresses);
    } else {
        raft_entry_clone(&raft->snap, &h.snap);
        raft->log_start = raft->log_end = h.snap_index + 1;
        raft->log_synced = raft->commit_index = h.snap_index;
        raft->last_applied = h.snap_index - 1;
    }

    raft_header_uninit(&h);

    return NULL;
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_read_log(struct raft *raft)
{
    for (unsigned long long int i = 1; ; i++) {
        struct json *json;
        struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
        if (!json) {
            if (error) {
                /* We assume that the error is due to a partial write while
                 * appending to the file before a crash, so log it and
                 * continue. */
                char *error_string = ovsdb_error_to_string_free(error);
                VLOG_WARN("%s", error_string);
                free(error_string);
                error = NULL;
            }
            break;
        }

        struct raft_record r;
        error = raft_record_from_json(&r, json);
        if (!error) {
            error = raft_apply_record(raft, i, &r);
            raft_record_uninit(&r);
        }
        json_destroy(json);
        if (error) {
            return ovsdb_wrap_error(error, "error reading record %llu from "
                                    "%s log", i, raft->name);
        }
    }

    /* Set the most recent servers. */
    raft_get_servers_from_log(raft, VLL_DBG);

    /* Set the most recent election_timer. */
    raft_get_election_timer_from_log(raft);

    return NULL;
}

static void
raft_reset_election_timer(struct raft *raft)
{
    unsigned int duration = (raft->election_timer
                             + random_range(ELECTION_RANGE_MSEC));
    raft->election_base = time_msec();
    if (failure_test == FT_DELAY_ELECTION) {
        /* Slow down this node so that it won't win the next election. */
        duration += raft->election_timer;
    }
    raft->election_timeout = raft->election_base + duration;
}

static void
raft_reset_ping_timer(struct raft *raft)
{
    raft->ping_timeout = time_msec() + raft->election_timer / 3;
}
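
/* Worked example (not part of the original file): with the default
 * election_timer of ELECTION_BASE_MSEC (1000 ms), the election timeout lands
 * uniformly in [1000, 2000) ms after 'election_base', while a leader pings
 * every 1000 / 3 = 333 ms, so a healthy follower hears several heartbeats
 * within even the shortest possible timeout window. */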

static void
raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
              const struct uuid *sid, bool incoming)
{
    struct raft_conn *conn = xzalloc(sizeof *conn);
    ovs_list_push_back(&raft->conns, &conn->list_node);
    conn->js = js;
    if (sid) {
        conn->sid = *sid;
    }
    conn->nickname = raft_address_to_nickname(jsonrpc_session_get_name(js),
                                              &conn->sid);
    conn->incoming = incoming;
    conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
    jsonrpc_session_set_probe_interval(js, 0);
    jsonrpc_session_set_backlog_threshold(js, raft->conn_backlog_max_n_msgs,
                                          raft->conn_backlog_max_n_bytes);
}
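
/* Illustrative sketch (hypothetical, not part of the original file): if the
 * backlog limits in 'struct raft' were changed at run time, they would also
 * need to be re-applied to every existing connection, along these lines: */
#if 0
static void
example_update_backlog_thresholds(struct raft *raft,
                                  size_t n_msgs, size_t n_bytes)
{
    struct raft_conn *conn;

    raft->conn_backlog_max_n_msgs = n_msgs;
    raft->conn_backlog_max_n_bytes = n_bytes;
    LIST_FOR_EACH (conn, list_node, &raft->conns) {
        jsonrpc_session_set_backlog_threshold(conn->js, n_msgs, n_bytes);
    }
}
#endif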

/* Starts the local server in an existing Raft cluster, using the local copy
 * of the cluster's log in 'file_name'.  Takes ownership of 'log', whether
 * successful or not. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_open(struct ovsdb_log *log, struct raft **raftp)
{
    struct raft *raft = raft_alloc();
    raft->log = log;

    struct ovsdb_error *error = raft_read_header(raft);
    if (error) {
        goto error;
    }

    if (!raft->joining) {
        error = raft_read_log(raft);
        if (error) {
            goto error;
        }

        /* Find our own server. */
        if (!raft_find_server(raft, &raft->sid)) {
            error = ovsdb_error(NULL, "server does not belong to cluster");
            goto error;
        }

        /* If there's only one server, start an election right away so that
         * the cluster bootstraps quickly. */
        if (hmap_count(&raft->servers) == 1) {
            raft_start_election(raft, false);
        }
    } else {
        raft->join_timeout = time_msec() + 1000;
    }

    raft_reset_ping_timer(raft);
    raft_reset_election_timer(raft);

    *raftp = raft;
    hmap_insert(&all_rafts, &raft->hmap_node, hash_string(raft->name, 0));
    return NULL;

error:
    raft_close(raft);
    *raftp = NULL;
    return error;
}

/* Returns the name of 'raft', which in OVSDB is the database schema name. */
const char *
raft_get_name(const struct raft *raft)
{
    return raft->name;
}

/* Returns the cluster ID of 'raft'.  If 'raft' has not yet completed joining
 * its cluster, then 'cid' will be all-zeros (unless the administrator
 * specified a cluster ID when running "ovsdb-tool join-cluster").
 *
 * Each cluster has a unique cluster ID. */
const struct uuid *
raft_get_cid(const struct raft *raft)
{
    return &raft->cid;
}

/* Returns the server ID of 'raft'.  Each server has a unique server ID. */
const struct uuid *
raft_get_sid(const struct raft *raft)
{
    return &raft->sid;
}

/* Adds memory consumption info to 'usage' for later use by memory_report(). */
void
raft_get_memory_usage(const struct raft *raft, struct simap *usage)
{
    struct raft_conn *conn;
    uint64_t backlog = 0;
    int cnt = 0;

    LIST_FOR_EACH (conn, list_node, &raft->conns) {
        backlog += jsonrpc_session_get_backlog(conn->js);
        cnt++;
    }
    simap_increase(usage, "raft-backlog-kB", backlog / 1000);
    simap_increase(usage, "raft-connections", cnt);
    simap_increase(usage, "raft-log", raft->log_end - raft->log_start);
}
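
/* Illustrative usage sketch (not part of the original file): the counters
 * above end up in a simap that a caller can inspect or log, e.g.: */
#if 0
    struct simap usage = SIMAP_INITIALIZER(&usage);
    raft_get_memory_usage(raft, &usage);
    VLOG_INFO("raft-log: %u entries", simap_get(&usage, "raft-log"));
    simap_destroy(&usage);
#endif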

/* Returns true if 'raft' has completed joining its cluster, has not left or
 * initiated leaving the cluster, does not have failed disk storage, and is
 * apparently connected to the leader in a healthy way (or is itself the
 * leader).
 *
 * If 'raft' is a candidate:
 *
 *   a) If it is the first round of the election, consider it connected,
 *      hoping it will successfully elect a new leader soon.
 *
 *   b) If it is already retrying, consider it disconnected (so that clients
 *      may decide to reconnect to other members). */
1b1d2e6d
BP
1056bool
1057raft_is_connected(const struct raft *raft)
1058{
93023e80 1059 static bool last_state = false;
923f01ca 1060 bool ret = (!raft->candidate_retrying
1b1d2e6d
BP
1061 && !raft->joining
1062 && !raft->leaving
1063 && !raft->left
2833885f
HZ
1064 && !raft->failed
1065 && raft->ever_had_leader);
93023e80
IM
1066
1067 if (!ret) {
1068 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
1069 VLOG_DBG_RL(&rl, "raft_is_connected: false");
1070 } else if (!last_state) {
1071 VLOG_DBG("raft_is_connected: true");
1072 }
1073 last_state = ret;
1074
923f01ca 1075 return ret;
1b1d2e6d
BP
1076}
1077
1078/* Returns true if 'raft' is the cluster leader. */
1079bool
1080raft_is_leader(const struct raft *raft)
1081{
1082 return raft->role == RAFT_LEADER;
1083}
1084
/* Returns true if 'raft' is in the process of joining its cluster. */
bool
raft_is_joining(const struct raft *raft)
{
    return raft->joining;
}

/* Only returns *connected* connections. */
static struct raft_conn *
raft_find_conn_by_sid(struct raft *raft, const struct uuid *sid)
{
    if (!uuid_is_zero(sid)) {
        struct raft_conn *conn;
        LIST_FOR_EACH (conn, list_node, &raft->conns) {
            if (uuid_equals(sid, &conn->sid)
                && jsonrpc_session_is_connected(conn->js)) {
                return conn;
            }
        }
    }
    return NULL;
}

static struct raft_conn *
raft_find_conn_by_address(struct raft *raft, const char *address)
{
    struct raft_conn *conn;
    LIST_FOR_EACH (conn, list_node, &raft->conns) {
        if (!strcmp(jsonrpc_session_get_name(conn->js), address)) {
            return conn;
        }
    }
    return NULL;
}

static void OVS_PRINTF_FORMAT(3, 4)
raft_record_note(struct raft *raft, const char *note,
                 const char *comment_format, ...)
{
    va_list args;
    va_start(args, comment_format);
    char *comment = xvasprintf(comment_format, args);
    va_end(args);

    struct raft_record r = {
        .type = RAFT_REC_NOTE,
        .comment = comment,
        .note = CONST_CAST(char *, note),
    };
    ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));

    free(comment);
}

/* If we're leader, try to transfer leadership to another server, logging
 * 'reason' as the human-readable reason (it should be a phrase suitable for
 * following "because"). */
void
raft_transfer_leadership(struct raft *raft, const char *reason)
{
    if (raft->role != RAFT_LEADER) {
        return;
    }

    struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (!uuid_equals(&raft->sid, &s->sid)
            && s->phase == RAFT_PHASE_STABLE) {
            struct raft_conn *conn = raft_find_conn_by_sid(raft, &s->sid);
            if (!conn) {
                continue;
            }

            union raft_rpc rpc = {
                .become_leader = {
                    .common = {
                        .comment = CONST_CAST(char *, reason),
                        .type = RAFT_RPC_BECOME_LEADER,
                        .sid = s->sid,
                    },
                    .term = raft->term,
                }
            };
            raft_send_to_conn(raft, &rpc, conn);

            raft_record_note(raft, "transfer leadership",
                             "transferring leadership to %s because %s",
                             s->nickname, reason);
            break;
        }
    }
}

/* Send a RemoveServerRequest to the rest of the servers in the cluster.
 *
 * If we know which server is the leader, we can just send the request to it.
 * However, we might not know which server is the leader, and we might never
 * find out if the remove request was actually previously committed by a
 * majority of the servers (because in that case the new leader will not send
 * AppendRequests or heartbeats to us).  Therefore, we instead send
 * RemoveRequests to every server.  This theoretically has the same problem,
 * if the current cluster leader was not previously a member of the cluster,
 * but it seems likely to be more robust in practice. */
static void
raft_send_remove_server_requests(struct raft *raft)
{
    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
    VLOG_INFO_RL(&rl, "sending remove request (joining=%s, leaving=%s)",
                 raft->joining ? "true" : "false",
                 raft->leaving ? "true" : "false");
    const struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (!uuid_equals(&s->sid, &raft->sid)) {
            union raft_rpc rpc = (union raft_rpc) {
                .remove_server_request = {
                    .common = {
                        .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
                        .sid = s->sid,
                    },
                    .sid = raft->sid,
                },
            };
            raft_send(raft, &rpc);
        }
    }

    raft->leave_timeout = time_msec() + raft->election_timer;
}

/* Attempts to start 'raft' leaving its cluster.  The caller can check
 * progress using raft_is_leaving() and raft_left(). */
void
raft_leave(struct raft *raft)
{
    if (raft->joining || raft->failed || raft->leaving || raft->left) {
        return;
    }
    VLOG_INFO(SID_FMT": starting to leave cluster "CID_FMT,
              SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
    raft->leaving = true;
    raft_transfer_leadership(raft, "this server is leaving the cluster");
    raft_become_follower(raft);
    raft_send_remove_server_requests(raft);
    raft->leave_timeout = time_msec() + raft->election_timer;
}

/* Returns true if 'raft' is currently attempting to leave its cluster. */
bool
raft_is_leaving(const struct raft *raft)
{
    return raft->leaving;
}

/* Returns true if 'raft' successfully left its cluster. */
bool
raft_left(const struct raft *raft)
{
    return raft->left;
}

/* Returns true if 'raft' has experienced a disk I/O failure.  When this
 * returns true, only closing and reopening 'raft' allows for recovery. */
bool
raft_failed(const struct raft *raft)
{
    return raft->failed;
}

/* Forces 'raft' to attempt to take leadership of the cluster by deposing the
 * current cluster leader. */
void
raft_take_leadership(struct raft *raft)
{
    if (raft->role != RAFT_LEADER) {
        raft_start_election(raft, true);
    }
}

/* Closes everything owned by 'raft' that might be visible outside the
 * process: network connections, commands, etc.  This is part of closing
 * 'raft'; it is also used if 'raft' has failed in an unrecoverable way. */
static void
raft_close__(struct raft *raft)
{
    if (!hmap_node_is_null(&raft->hmap_node)) {
        hmap_remove(&all_rafts, &raft->hmap_node);
        hmap_node_nullify(&raft->hmap_node);
    }

    raft_complete_all_commands(raft, RAFT_CMD_SHUTDOWN);

    struct raft_server *rs = raft->remove_server;
    if (rs) {
        raft_send_remove_server_reply__(raft, &rs->sid, &rs->requester_sid,
                                        rs->requester_conn, false,
                                        RAFT_SERVER_SHUTDOWN);
        raft_server_destroy(raft->remove_server);
        raft->remove_server = NULL;
    }

    struct raft_conn *conn, *next;
    LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
        raft_conn_close(conn);
    }
}

/* Closes and frees 'raft'.
 *
 * A server's cluster membership is independent of whether the server is
 * actually running.  When a server that is a member of a cluster closes, the
 * cluster treats this as a server failure. */
void
raft_close(struct raft *raft)
{
    if (!raft) {
        return;
    }

    raft_transfer_leadership(raft, "this server is shutting down");

    raft_close__(raft);

    ovsdb_log_close(raft->log);

    raft_servers_destroy(&raft->servers);

    for (uint64_t index = raft->log_start; index < raft->log_end; index++) {
        struct raft_entry *e = &raft->entries[index - raft->log_start];
        raft_entry_uninit(e);
    }
    free(raft->entries);

    raft_entry_uninit(&raft->snap);

    raft_waiters_destroy(raft);

    raft_servers_destroy(&raft->add_servers);

    hmap_destroy(&raft->commands);

    pstream_close(raft->listener);

    sset_destroy(&raft->remote_addresses);
    free(raft->local_address);
    free(raft->local_nickname);
    free(raft->name);

    free(raft);
}

static bool
raft_conn_receive(struct raft *raft, struct raft_conn *conn,
                  union raft_rpc *rpc)
{
    struct jsonrpc_msg *msg = jsonrpc_session_recv(conn->js);
    if (!msg) {
        return false;
    }

    struct ovsdb_error *error = raft_rpc_from_jsonrpc(&raft->cid, &raft->sid,
                                                      msg, rpc);
    jsonrpc_msg_destroy(msg);
    if (error) {
        char *s = ovsdb_error_to_string_free(error);
        VLOG_INFO("%s: %s", jsonrpc_session_get_name(conn->js), s);
        free(s);
        return false;
    }

    if (uuid_is_zero(&conn->sid)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
        conn->sid = rpc->common.sid;
        VLOG_INFO_RL(&rl, "%s: learned server ID "SID_FMT,
                     jsonrpc_session_get_name(conn->js), SID_ARGS(&conn->sid));
    } else if (!uuid_equals(&conn->sid, &rpc->common.sid)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
        VLOG_WARN_RL(&rl, "%s: ignoring message with unexpected server ID "
                     SID_FMT" (expected "SID_FMT")",
                     jsonrpc_session_get_name(conn->js),
                     SID_ARGS(&rpc->common.sid), SID_ARGS(&conn->sid));
        raft_rpc_uninit(rpc);
        return false;
    }

    const char *address = (rpc->type == RAFT_RPC_HELLO_REQUEST
                           ? rpc->hello_request.address
                           : rpc->type == RAFT_RPC_ADD_SERVER_REQUEST
                           ? rpc->add_server_request.address
                           : NULL);
    if (address) {
        char *new_nickname = raft_address_to_nickname(address, &conn->sid);
        if (strcmp(conn->nickname, new_nickname)) {
            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
            VLOG_INFO_RL(&rl, "%s: learned remote address %s",
                         jsonrpc_session_get_name(conn->js), address);

            free(conn->nickname);
            conn->nickname = new_nickname;
        } else {
            free(new_nickname);
        }
    }

    return true;
}

static const char *
raft_get_nickname(const struct raft *raft, const struct uuid *sid,
                  char buf[SID_LEN + 1], size_t bufsize)
{
    if (uuid_equals(sid, &raft->sid)) {
        return raft->local_nickname;
    }

    const char *s = raft_servers_get_nickname__(&raft->servers, sid);
    if (s) {
        return s;
    }

    return raft_servers_get_nickname(&raft->add_servers, sid, buf, bufsize);
}

static void
log_rpc(const union raft_rpc *rpc, const char *direction,
        const struct raft_conn *conn, int line_number)
{
    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(600, 600);
    if (!raft_rpc_is_heartbeat(rpc) && !VLOG_DROP_DBG(&rl)) {
        struct ds s = DS_EMPTY_INITIALIZER;
        if (line_number) {
            ds_put_format(&s, "raft.c:%d ", line_number);
        }
        ds_put_format(&s, "%s%s ", direction, conn->nickname);
        raft_rpc_format(rpc, &s);
        VLOG_DBG("%s", ds_cstr(&s));
        ds_destroy(&s);
    }
}

static void
raft_send_add_server_request(struct raft *raft, struct raft_conn *conn)
{
    union raft_rpc rq = {
        .add_server_request = {
            .common = {
                .type = RAFT_RPC_ADD_SERVER_REQUEST,
                .sid = UUID_ZERO,
                .comment = NULL,
            },
            .address = raft->local_address,
        },
    };
    raft_send_to_conn(raft, &rq, conn);
}

static void
raft_conn_run(struct raft *raft, struct raft_conn *conn)
{
    jsonrpc_session_run(conn->js);

    unsigned int new_seqno = jsonrpc_session_get_seqno(conn->js);
    bool reconnected = new_seqno != conn->js_seqno;
    bool just_connected = (reconnected
                           && jsonrpc_session_is_connected(conn->js));

    if (reconnected) {
        /* Clear 'install_snapshot_request_in_progress' since the request
         * might not have reached the destination, or the server might have
         * been restarted. */
        struct raft_server *server = raft_find_server(raft, &conn->sid);
        if (server) {
            server->install_snapshot_request_in_progress = false;
        }
    }

    conn->js_seqno = new_seqno;
    if (just_connected) {
        if (raft->joining) {
            raft_send_add_server_request(raft, conn);
        } else if (raft->leaving) {
            union raft_rpc rq = {
                .remove_server_request = {
                    .common = {
                        .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
                        .sid = conn->sid,
                    },
                    .sid = raft->sid,
                },
            };
            raft_send_to_conn(raft, &rq, conn);
        } else {
            union raft_rpc rq = (union raft_rpc) {
                .hello_request = {
                    .common = {
                        .type = RAFT_RPC_HELLO_REQUEST,
                        .sid = conn->sid,
                    },
                    .address = raft->local_address,
                },
            };
            raft_send_to_conn(raft, &rq, conn);
        }
    }

    for (size_t i = 0; i < 50; i++) {
        union raft_rpc rpc;
        if (!raft_conn_receive(raft, conn, &rpc)) {
            break;
        }

        log_rpc(&rpc, "<--", conn, 0);
        raft_handle_rpc(raft, &rpc);
        raft_rpc_uninit(&rpc);
    }
}

static void
raft_waiter_complete_rpc(struct raft *raft, const union raft_rpc *rpc)
{
    uint64_t term = raft_rpc_get_term(rpc);
    if (term && term < raft->term) {
        /* Drop the message because it's for an expired term. */
        return;
    }

    if (!raft_is_rpc_synced(raft, rpc)) {
        /* This is a bug.  A reply message is deferred because some state in
         * the message, such as a term or index, has not been committed to
         * disk, and they should only be completed when that commit is done.
         * But this message is being completed before the commit is finished.
         * Complain, and hope that someone reports the bug. */
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        if (VLOG_DROP_ERR(&rl)) {
            return;
        }

        struct ds s = DS_EMPTY_INITIALIZER;

        if (term > raft->synced_term) {
            ds_put_format(&s, " because message term %"PRIu64" is "
                          "past synced term %"PRIu64,
                          term, raft->synced_term);
        }

        uint64_t index = raft_rpc_get_min_sync_index(rpc);
        if (index > raft->log_synced) {
            ds_put_format(&s, " %s message index %"PRIu64" is past last "
                          "synced index %"PRIu64,
                          s.length ? "and" : "because",
                          index, raft->log_synced);
        }

        const struct uuid *vote = raft_rpc_get_vote(rpc);
        if (vote && !uuid_equals(vote, &raft->synced_vote)) {
            char buf1[SID_LEN + 1];
            char buf2[SID_LEN + 1];
            ds_put_format(&s, " %s vote %s differs from synced vote %s",
                          s.length ? "and" : "because",
                          raft_get_nickname(raft, vote, buf1, sizeof buf1),
                          raft_get_nickname(raft, &raft->synced_vote,
                                            buf2, sizeof buf2));
        }

        char buf[SID_LEN + 1];
        ds_put_format(&s, ": %s ",
                      raft_get_nickname(raft, &rpc->common.sid,
                                        buf, sizeof buf));
        raft_rpc_format(rpc, &s);
        VLOG_ERR("internal error: deferred %s message completed "
                 "but not ready to send%s",
                 raft_rpc_type_to_string(rpc->type), ds_cstr(&s));
        ds_destroy(&s);

        return;
    }

    struct raft_conn *dst = raft_find_conn_by_sid(raft, &rpc->common.sid);
    if (dst) {
        raft_send_to_conn(raft, rpc, dst);
    }
}

static void
raft_waiter_complete(struct raft *raft, struct raft_waiter *w)
{
    switch (w->type) {
    case RAFT_W_ENTRY:
        if (raft->role == RAFT_LEADER) {
            raft_update_our_match_index(raft, w->entry.index);
        }
        raft->log_synced = w->entry.index;
        break;

    case RAFT_W_TERM:
        raft->synced_term = w->term.term;
        raft->synced_vote = w->term.vote;
        break;

    case RAFT_W_RPC:
        raft_waiter_complete_rpc(raft, w->rpc);
        break;
    }
}

static void
raft_waiter_destroy(struct raft_waiter *w)
{
    if (!w) {
        return;
    }

    ovs_list_remove(&w->list_node);

    switch (w->type) {
    case RAFT_W_ENTRY:
    case RAFT_W_TERM:
        break;

    case RAFT_W_RPC:
        raft_rpc_uninit(w->rpc);
        free(w->rpc);
        break;
    }
    free(w);
}

static void
raft_waiters_run(struct raft *raft)
{
    if (ovs_list_is_empty(&raft->waiters)) {
        return;
    }

    uint64_t cur = ovsdb_log_commit_progress(raft->log);
    struct raft_waiter *w, *next;
    LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
        if (cur < w->commit_ticket) {
            break;
        }
        raft_waiter_complete(raft, w);
        raft_waiter_destroy(w);
    }
}

static void
raft_waiters_wait(struct raft *raft)
{
    struct raft_waiter *w;
    LIST_FOR_EACH (w, list_node, &raft->waiters) {
        ovsdb_log_commit_wait(raft->log, w->commit_ticket);
        break;
    }
}

static void
raft_waiters_destroy(struct raft *raft)
{
    struct raft_waiter *w, *next;
    LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
        raft_waiter_destroy(w);
    }
}

static bool OVS_WARN_UNUSED_RESULT
raft_set_term(struct raft *raft, uint64_t term, const struct uuid *vote)
{
    struct ovsdb_error *error = raft_write_state(raft->log, term, vote);
    if (!raft_handle_write_error(raft, error)) {
        return false;
    }

    struct raft_waiter *w = raft_waiter_create(raft, RAFT_W_TERM, true);
    raft->term = w->term.term = term;
    raft->vote = w->term.vote = vote ? *vote : UUID_ZERO;
    return true;
}

static void
raft_accept_vote(struct raft *raft, struct raft_server *s,
                 const struct uuid *vote)
{
    if (uuid_equals(&s->vote, vote)) {
        return;
    }
    if (!uuid_is_zero(&s->vote)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
        char buf1[SID_LEN + 1];
        char buf2[SID_LEN + 1];
        VLOG_WARN_RL(&rl, "server %s changed its vote from %s to %s",
                     s->nickname,
                     raft_get_nickname(raft, &s->vote, buf1, sizeof buf1),
                     raft_get_nickname(raft, vote, buf2, sizeof buf2));
    }
    s->vote = *vote;
    if (uuid_equals(vote, &raft->sid)
        && ++raft->n_votes > hmap_count(&raft->servers) / 2) {
        raft_become_leader(raft);
    }
}
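
/* Worked example (not part of the original file): in a 5-server cluster,
 * hmap_count(&raft->servers) / 2 == 2, so a candidate becomes leader upon
 * receiving its 3rd vote (a strict majority, counting its vote for itself).
 * In a 2-server cluster, 2 / 2 == 1, so both votes are still required. */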

static void
raft_start_election(struct raft *raft, bool leadership_transfer)
{
    if (raft->leaving) {
        return;
    }

    struct raft_server *me = raft_find_server(raft, &raft->sid);
    if (!me) {
        return;
    }

    if (!raft_set_term(raft, raft->term + 1, &raft->sid)) {
        return;
    }

    ovs_assert(raft->role != RAFT_LEADER);

    raft->leader_sid = UUID_ZERO;
    raft->role = RAFT_CANDIDATE;
    /* If there was no leader elected since last election, we know we are
     * retrying now. */
    raft->candidate_retrying = !raft->had_leader;
    raft->had_leader = false;

    raft->n_votes = 0;

    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
    if (!VLOG_DROP_INFO(&rl)) {
        long long int now = time_msec();
        if (now >= raft->election_timeout) {
            VLOG_INFO("term %"PRIu64": %lld ms timeout expired, "
                      "starting election",
                      raft->term, now - raft->election_base);
        } else {
            VLOG_INFO("term %"PRIu64": starting election", raft->term);
        }
    }
    raft_reset_election_timer(raft);

    struct raft_server *peer;
    HMAP_FOR_EACH (peer, hmap_node, &raft->servers) {
        peer->vote = UUID_ZERO;
        if (uuid_equals(&raft->sid, &peer->sid)) {
            continue;
        }

        union raft_rpc rq = {
            .vote_request = {
                .common = {
                    .type = RAFT_RPC_VOTE_REQUEST,
                    .sid = peer->sid,
                },
                .term = raft->term,
                .last_log_index = raft->log_end - 1,
                .last_log_term = (
                    raft->log_end > raft->log_start
                    ? raft->entries[raft->log_end - raft->log_start - 1].term
                    : raft->snap.term),
                .leadership_transfer = leadership_transfer,
            },
        };
        if (failure_test != FT_DONT_SEND_VOTE_REQUEST) {
            raft_send(raft, &rq);
        }
    }

    /* Vote for ourselves. */
    raft_accept_vote(raft, me, &raft->sid);
}

static void
raft_open_conn(struct raft *raft, const char *address, const struct uuid *sid)
{
    if (strcmp(address, raft->local_address)
        && !raft_find_conn_by_address(raft, address)) {
        raft_add_conn(raft, jsonrpc_session_open(address, true), sid, false);
    }
}

static void
raft_conn_close(struct raft_conn *conn)
{
    jsonrpc_session_close(conn->js);
    ovs_list_remove(&conn->list_node);
    free(conn->nickname);
    free(conn);
}

/* Returns true if 'conn' should stay open, false if it should be closed. */
static bool
raft_conn_should_stay_open(struct raft *raft, struct raft_conn *conn)
{
    /* Close the connection if it's actually dead.  If necessary, we'll
     * initiate a new session later. */
    if (!jsonrpc_session_is_alive(conn->js)) {
        return false;
    }

1783 /* Keep incoming sessions. We trust the originator to decide to drop
1784 * it. */
1785 if (conn->incoming) {
1786 return true;
1787 }
1788
1789 /* If we are joining the cluster, keep sessions to the remote addresses
1790 * that are supposed to be part of the cluster we're joining. */
1791 if (raft->joining && sset_contains(&raft->remote_addresses,
1792 jsonrpc_session_get_name(conn->js))) {
1793 return true;
1794 }
1795
1796 /* We have joined the cluster. If we did that "recently", then there is a
1797 * chance that we do not have the most recent server configuration log
1798 * entry. If so, it's a waste to disconnect from the servers that were in
1799 * remote_addresses and that will probably appear in the configuration,
1800 * just to reconnect to them a moment later when we do get the
1801 * configuration update. If we are not ourselves in the configuration,
1802 * then we know that there must be a new configuration coming up, so in
1803 * that case keep the connection. */
1804 if (!raft_find_server(raft, &raft->sid)) {
1805 return true;
1806 }
1807
1808 /* Keep the connection only if the server is part of the configuration. */
1809 return raft_find_server(raft, &conn->sid);
1810}
1811
1812/* Allows 'raft' to maintain the distributed log. Call this function as part
1813 * of the process's main loop. */
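 *
 * A typical calling pattern looks like this (a sketch only, to show the
 * intended usage; the real driver lives elsewhere in ovsdb-server):
 *
 *     for (;;) {
 *         raft_run(raft);
 *         ...apply entries from raft_next_entry(), submit commands, etc....
 *         raft_wait(raft);
 *         poll_block();
 *     }
 */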
void
raft_run(struct raft *raft)
{
    if (raft->left || raft->failed) {
        return;
    }

    raft_waiters_run(raft);

    if (!raft->listener && time_msec() >= raft->listen_backoff) {
        char *paddr = raft_make_address_passive(raft->local_address);
        int error = pstream_open(paddr, &raft->listener, DSCP_DEFAULT);
        if (error) {
            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
            VLOG_WARN_RL(&rl, "%s: listen failed (%s)",
                         paddr, ovs_strerror(error));
            raft->listen_backoff = time_msec() + 1000;
        }
        free(paddr);
    }

    if (raft->listener) {
        struct stream *stream;
        int error = pstream_accept(raft->listener, &stream);
        if (!error) {
            raft_add_conn(raft, jsonrpc_session_open_unreliably(
                              jsonrpc_open(stream), DSCP_DEFAULT), NULL,
                          true);
        } else if (error != EAGAIN) {
            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
            VLOG_WARN_RL(&rl, "%s: accept failed: %s",
                         pstream_get_name(raft->listener),
                         ovs_strerror(error));
        }
    }

    /* Run RPCs for all open sessions. */
    struct raft_conn *conn;
    LIST_FOR_EACH (conn, list_node, &raft->conns) {
        raft_conn_run(raft, conn);
    }

    /* Close unneeded sessions. */
    struct raft_conn *next;
    LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
        if (!raft_conn_should_stay_open(raft, conn)) {
            raft_conn_close(conn);
        }
    }

    /* Open needed sessions. */
    struct raft_server *server;
    HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
        raft_open_conn(raft, server->address, &server->sid);
    }
    if (raft->joining) {
        const char *address;
        SSET_FOR_EACH (address, &raft->remote_addresses) {
            raft_open_conn(raft, address, NULL);
        }
    }

    if (!raft->joining && time_msec() >= raft->election_timeout) {
        if (raft->role == RAFT_LEADER) {
            /* Check whether a majority of followers replied; if so, reset
             * election_timeout and each server's 'replied' flag.  Otherwise,
             * become a follower.
             *
             * Raft paper section 6.2: Leaders: A server might be in the leader
             * state, but if it isn’t the current leader, it could be
             * needlessly delaying client requests.  For example, suppose a
             * leader is partitioned from the rest of the cluster, but it can
             * still communicate with a particular client.  Without additional
             * mechanism, it could delay a request from that client forever,
             * being unable to replicate a log entry to any other servers.
             * Meanwhile, there might be another leader of a newer term that is
             * able to communicate with a majority of the cluster and would be
             * able to commit the client’s request.  Thus, a leader in Raft
             * steps down if an election timeout elapses without a successful
             * round of heartbeats to a majority of its cluster; this allows
             * clients to retry their requests with another server. */
            int count = 0;
            HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
                if (server->replied) {
                    count++;
                }
            }
            if (count >= hmap_count(&raft->servers) / 2) {
                HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
                    server->replied = false;
                }
                raft_reset_election_timer(raft);
            } else {
                raft_become_follower(raft);
                raft_start_election(raft, false);
            }
        } else {
            raft_start_election(raft, false);
        }
    }

    if (raft->leaving && time_msec() >= raft->leave_timeout) {
        raft_send_remove_server_requests(raft);
    }

    if (raft->joining && time_msec() >= raft->join_timeout) {
        raft->join_timeout = time_msec() + 1000;
        LIST_FOR_EACH (conn, list_node, &raft->conns) {
            raft_send_add_server_request(raft, conn);
        }
    }

    long long int now = time_msec();
    if (now >= raft->ping_timeout) {
        if (raft->role == RAFT_LEADER) {
            raft_send_heartbeats(raft);
        }
        /* Check whether any commands have timed out.  The timeout is set to
         * twice the election timer so that commands can complete properly
         * across a leader election.  E.g. if a leader crashes and the current
         * node, which has pending commands, becomes the new leader, the
         * pending commands can still complete as long as the crashed leader
         * had replicated the transactions to a majority of followers before
         * it crashed. */
        struct raft_command *cmd, *next_cmd;
        HMAP_FOR_EACH_SAFE (cmd, next_cmd, hmap_node, &raft->commands) {
            if (cmd->timestamp
                && now - cmd->timestamp > raft->election_timer * 2) {
                raft_command_complete(raft, cmd, RAFT_CMD_TIMEOUT);
            }
        }
        raft_reset_ping_timer(raft);
    }

    /* Do this only at the end; if we did it as soon as we set raft->left or
     * raft->failed in handling the RemoveServerReply, then it could easily
     * cause references to freed memory in RPC sessions, etc. */
    if (raft->left || raft->failed) {
        raft_close__(raft);
    }
}

static void
raft_wait_session(struct jsonrpc_session *js)
{
    if (js) {
        jsonrpc_session_wait(js);
        jsonrpc_session_recv_wait(js);
    }
}

/* Causes the next call to poll_block() to wake up when 'raft' needs to do
 * something. */
void
raft_wait(struct raft *raft)
{
    if (raft->left || raft->failed) {
        return;
    }

    raft_waiters_wait(raft);

    if (raft->listener) {
        pstream_wait(raft->listener);
    } else {
        poll_timer_wait_until(raft->listen_backoff);
    }

    struct raft_conn *conn;
    LIST_FOR_EACH (conn, list_node, &raft->conns) {
        raft_wait_session(conn->js);
    }

    if (!raft->joining) {
        poll_timer_wait_until(raft->election_timeout);
    } else {
        poll_timer_wait_until(raft->join_timeout);
    }
    if (raft->leaving) {
        poll_timer_wait_until(raft->leave_timeout);
    }
    if (raft->role == RAFT_LEADER || !hmap_is_empty(&raft->commands)) {
        poll_timer_wait_until(raft->ping_timeout);
    }
}

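/* Creates and returns a new waiter of the given 'type'.  If 'start_commit'
 * is true, the waiter is also tied to an asynchronous commit of everything
 * written to the log so far, via the ticket from ovsdb_log_commit_start();
 * the waiter then runs only once that commit has reached disk. */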
static struct raft_waiter *
raft_waiter_create(struct raft *raft, enum raft_waiter_type type,
                   bool start_commit)
{
    struct raft_waiter *w = xzalloc(sizeof *w);
    ovs_list_push_back(&raft->waiters, &w->list_node);
    w->commit_ticket = start_commit ? ovsdb_log_commit_start(raft->log) : 0;
    w->type = type;
    return w;
}

/* Returns a human-readable representation of 'status' (or NULL if 'status'
 * is invalid). */
const char *
raft_command_status_to_string(enum raft_command_status status)
{
    switch (status) {
    case RAFT_CMD_INCOMPLETE:
        return "operation still in progress";
    case RAFT_CMD_SUCCESS:
        return "success";
    case RAFT_CMD_NOT_LEADER:
        return "not leader";
    case RAFT_CMD_BAD_PREREQ:
        return "prerequisite check failed";
    case RAFT_CMD_LOST_LEADERSHIP:
        return "lost leadership";
    case RAFT_CMD_SHUTDOWN:
        return "server shutdown";
    case RAFT_CMD_IO_ERROR:
        return "I/O error";
    case RAFT_CMD_TIMEOUT:
        return "timeout";
    default:
        return NULL;
    }
}

/* Converts the human-readable status in 's' into a status code in
 * '*statusp'.  Returns true if successful, false if 's' is unknown. */
bool
raft_command_status_from_string(const char *s,
                                enum raft_command_status *statusp)
{
    for (enum raft_command_status status = 0; ; status++) {
        const char *s2 = raft_command_status_to_string(status);
        if (!s2) {
            *statusp = 0;
            return false;
        } else if (!strcmp(s, s2)) {
            *statusp = status;
            return true;
        }
    }
}

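/* Returns the entry ID as of log entry 'index', skipping backward over
 * entries that carry no data (such as server configuration or election
 * timer changes) and falling back to the snapshot's entry ID. */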
static const struct uuid *
raft_get_eid(const struct raft *raft, uint64_t index)
{
    for (; index >= raft->log_start; index--) {
        const struct raft_entry *e = raft_get_entry(raft, index);
        if (e->data) {
            return &e->eid;
        }
    }
    return &raft->snap.eid;
}

const struct uuid *
raft_current_eid(const struct raft *raft)
{
    return raft_get_eid(raft, raft->log_end - 1);
}

static struct raft_command *
raft_command_create_completed(enum raft_command_status status)
{
    ovs_assert(status != RAFT_CMD_INCOMPLETE);

    struct raft_command *cmd = xzalloc(sizeof *cmd);
    cmd->n_refs = 1;
    cmd->status = status;
    return cmd;
}

static struct raft_command *
raft_command_create_incomplete(struct raft *raft, uint64_t index)
{
    struct raft_command *cmd = xzalloc(sizeof *cmd);
    cmd->n_refs = 2; /* One for client, one for raft->commands. */
    cmd->index = index;
    cmd->status = RAFT_CMD_INCOMPLETE;
    hmap_insert(&raft->commands, &cmd->hmap_node, cmd->index);
    return cmd;
}

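/* Leader only: appends a new entry carrying 'data', 'servers', and
 * 'election_timer' to the local log, then sends an AppendEntries RPC for it
 * to every follower that is already caught up to the previous end of our
 * log.  Returns an incomplete command that completes once the new entry is
 * committed. */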
static struct raft_command * OVS_WARN_UNUSED_RESULT
raft_command_initiate(struct raft *raft,
                      const struct json *data, const struct json *servers,
                      uint64_t election_timer, const struct uuid *eid)
{
    /* Write to local log. */
    uint64_t index = raft->log_end;
    if (!raft_handle_write_error(
            raft, raft_write_entry(
                raft, raft->term, json_nullable_clone(data), eid,
                json_nullable_clone(servers), election_timer))) {
        return raft_command_create_completed(RAFT_CMD_IO_ERROR);
    }

    struct raft_command *cmd = raft_command_create_incomplete(raft, index);
    ovs_assert(eid);
    cmd->eid = *eid;
    cmd->timestamp = time_msec();

    raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index = cmd->index;

    if (failure_test == FT_CRASH_BEFORE_SEND_APPEND_REQ) {
        ovs_fatal(0, "Raft test: crash before sending append_request.");
    }
    /* Write to remote logs. */
    struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (!uuid_equals(&s->sid, &raft->sid) && s->next_index == index) {
            raft_send_append_request(raft, s, 1, "execute command");
            s->next_index++;
        }
    }
    if (failure_test == FT_CRASH_AFTER_SEND_APPEND_REQ) {
        ovs_fatal(0, "Raft test: crash after sending append_request.");
    }
    raft_reset_ping_timer(raft);

    return cmd;
}

static void
log_all_commands(struct raft *raft)
{
    struct raft_command *cmd, *next;
    HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
        VLOG_DBG("raft command eid: "UUID_FMT, UUID_ARGS(&cmd->eid));
    }
}

static struct raft_command * OVS_WARN_UNUSED_RESULT
raft_command_execute__(struct raft *raft, const struct json *data,
                       const struct json *servers, uint64_t election_timer,
                       const struct uuid *prereq, struct uuid *result)
{
    if (raft->joining || raft->leaving || raft->left || raft->failed) {
        return raft_command_create_completed(RAFT_CMD_SHUTDOWN);
    }

    if (raft->role != RAFT_LEADER) {
        /* Consider proxying the command to the leader.  We can only do that
         * if we know the leader and the command does not change the set of
         * servers.  We do not proxy commands without prerequisites, even
         * though we could, because in an OVSDB context a log entry doesn't
         * make sense without context. */
        if (servers || election_timer || !data
            || raft->role != RAFT_FOLLOWER || uuid_is_zero(&raft->leader_sid)
            || !prereq) {
            return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
        }
    }

    struct uuid eid = data ? uuid_random() : UUID_ZERO;
    if (result) {
        *result = eid;
    }

    if (raft->role != RAFT_LEADER) {
        const union raft_rpc rpc = {
            .execute_command_request = {
                .common = {
                    .type = RAFT_RPC_EXECUTE_COMMAND_REQUEST,
                    .sid = raft->leader_sid,
                },
                .data = CONST_CAST(struct json *, data),
                .prereq = *prereq,
                .result = eid,
            }
        };
        if (failure_test == FT_CRASH_BEFORE_SEND_EXEC_REQ) {
            ovs_fatal(0, "Raft test: crash before sending "
                      "execute_command_request");
        }
        if (!raft_send(raft, &rpc)) {
            /* Couldn't send command, so it definitely failed. */
            return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
        }
        if (failure_test == FT_CRASH_AFTER_SEND_EXEC_REQ) {
            ovs_fatal(0, "Raft test: crash after sending "
                      "execute_command_request");
        }

        struct raft_command *cmd = raft_command_create_incomplete(raft, 0);
        cmd->timestamp = time_msec();
        cmd->eid = eid;
        log_all_commands(raft);
        return cmd;
    }

    const struct uuid *current_eid = raft_current_eid(raft);
    if (prereq && !uuid_equals(prereq, current_eid)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
                     "prerequisite "UUID_FMT,
                     UUID_ARGS(current_eid), UUID_ARGS(prereq));
        return raft_command_create_completed(RAFT_CMD_BAD_PREREQ);
    }

    return raft_command_initiate(raft, data, servers, election_timer, &eid);
}

/* Initiates appending a log entry to 'raft'.  The log entry consists of
 * 'data' and, if 'prereq' is nonnull, it is only added to the log if the
 * previous entry in the log has entry ID 'prereq'.  If 'result' is nonnull,
 * it is populated with the entry ID for the new log entry.
 *
 * Returns a "struct raft_command" that may be used to track progress adding
 * the log entry.  The caller must eventually free the returned structure,
 * with raft_command_unref(). */
struct raft_command * OVS_WARN_UNUSED_RESULT
raft_command_execute(struct raft *raft, const struct json *data,
                     const struct uuid *prereq, struct uuid *result)
{
    return raft_command_execute__(raft, data, NULL, 0, prereq, result);
}
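
/* A sketch of how a caller might drive a command to completion, assuming
 * the main-loop pattern shown above (hypothetical usage; ovsdb-server's own
 * transaction code is the real user of this API):
 *
 *     struct raft_command *cmd
 *         = raft_command_execute(raft, data, &prereq, NULL);
 *     while (raft_command_get_status(cmd) == RAFT_CMD_INCOMPLETE) {
 *         raft_run(raft);
 *         raft_command_wait(cmd);
 *         raft_wait(raft);
 *         poll_block();
 *     }
 *     ...on failure, report raft_command_status_to_string()...
 *     raft_command_unref(cmd);
 */
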
/* Returns the status of 'cmd'. */
enum raft_command_status
raft_command_get_status(const struct raft_command *cmd)
{
    ovs_assert(cmd->n_refs > 0);
    return cmd->status;
}

/* Returns the index of the log entry at which 'cmd' was committed.
 *
 * This function works only with successful commands. */
uint64_t
raft_command_get_commit_index(const struct raft_command *cmd)
{
    ovs_assert(cmd->n_refs > 0);
    ovs_assert(cmd->status == RAFT_CMD_SUCCESS);
    return cmd->index;
}

/* Frees 'cmd'. */
void
raft_command_unref(struct raft_command *cmd)
{
    if (cmd) {
        ovs_assert(cmd->n_refs > 0);
        if (!--cmd->n_refs) {
            free(cmd);
        }
    }
}

/* Causes poll_block() to wake up when 'cmd' has status to report. */
void
raft_command_wait(const struct raft_command *cmd)
{
    if (cmd->status != RAFT_CMD_INCOMPLETE) {
        poll_immediate_wake();
    }
}

static void
raft_command_complete(struct raft *raft,
                      struct raft_command *cmd,
                      enum raft_command_status status)
{
    VLOG_DBG("raft_command_complete eid "UUID_FMT" status: %s",
             UUID_ARGS(&cmd->eid), raft_command_status_to_string(status));
    if (!uuid_is_zero(&cmd->sid)) {
        uint64_t commit_index = status == RAFT_CMD_SUCCESS ? cmd->index : 0;
        raft_send_execute_command_reply(raft, &cmd->sid, &cmd->eid, status,
                                        commit_index);
    }

    ovs_assert(cmd->status == RAFT_CMD_INCOMPLETE);
    ovs_assert(cmd->n_refs > 0);
    hmap_remove(&raft->commands, &cmd->hmap_node);
    cmd->status = status;
    raft_command_unref(cmd);
}

static void
raft_complete_all_commands(struct raft *raft, enum raft_command_status status)
{
    struct raft_command *cmd, *next;
    HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
        raft_command_complete(raft, cmd, status);
    }
}

static struct raft_command *
raft_find_command_by_eid(struct raft *raft, const struct uuid *eid)
{
    struct raft_command *cmd;

    HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
        if (uuid_equals(&cmd->eid, eid)) {
            return cmd;
        }
    }
    return NULL;
}
\f
#define RAFT_RPC(ENUM, NAME) \
    static void raft_handle_##NAME(struct raft *, const struct raft_##NAME *);
RAFT_RPC_TYPES
#undef RAFT_RPC

static void
raft_handle_hello_request(struct raft *raft OVS_UNUSED,
                          const struct raft_hello_request *hello OVS_UNUSED)
{
}

/* 'sid' is the server being added. */
static void
raft_send_add_server_reply__(struct raft *raft, const struct uuid *sid,
                             const char *address,
                             bool success, const char *comment)
{
    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
    if (!VLOG_DROP_INFO(&rl)) {
        struct ds s = DS_EMPTY_INITIALIZER;
        char buf[SID_LEN + 1];
        ds_put_format(&s, "adding %s ("SID_FMT" at %s) "
                      "to cluster "CID_FMT" %s",
                      raft_get_nickname(raft, sid, buf, sizeof buf),
                      SID_ARGS(sid), address, CID_ARGS(&raft->cid),
                      success ? "succeeded" : "failed");
        if (comment) {
            ds_put_format(&s, " (%s)", comment);
        }
        VLOG_INFO("%s", ds_cstr(&s));
        ds_destroy(&s);
    }

    union raft_rpc rpy = {
        .add_server_reply = {
            .common = {
                .type = RAFT_RPC_ADD_SERVER_REPLY,
                .sid = *sid,
                .comment = CONST_CAST(char *, comment),
            },
            .success = success,
        }
    };

    struct sset *remote_addresses = &rpy.add_server_reply.remote_addresses;
    sset_init(remote_addresses);
    if (!raft->joining) {
        struct raft_server *s;
        HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
            if (!uuid_equals(&s->sid, &raft->sid)) {
                sset_add(remote_addresses, s->address);
            }
        }
    }

    raft_send(raft, &rpy);

    sset_destroy(remote_addresses);
}

static void
raft_send_remove_server_reply_rpc(struct raft *raft,
                                  const struct uuid *dst_sid,
                                  const struct uuid *target_sid,
                                  bool success, const char *comment)
{
    if (uuid_equals(&raft->sid, dst_sid)) {
        if (success && uuid_equals(&raft->sid, target_sid)) {
            raft_finished_leaving_cluster(raft);
        }
        return;
    }

    const union raft_rpc rpy = {
        .remove_server_reply = {
            .common = {
                .type = RAFT_RPC_REMOVE_SERVER_REPLY,
                .sid = *dst_sid,
                .comment = CONST_CAST(char *, comment),
            },
            .target_sid = (uuid_equals(dst_sid, target_sid)
                           ? UUID_ZERO
                           : *target_sid),
            .success = success,
        }
    };
    raft_send(raft, &rpy);
}

static void
raft_send_remove_server_reply__(struct raft *raft,
                                const struct uuid *target_sid,
                                const struct uuid *requester_sid,
                                struct unixctl_conn *requester_conn,
                                bool success, const char *comment)
{
    struct ds s = DS_EMPTY_INITIALIZER;
    ds_put_format(&s, "request ");
    if (!uuid_is_zero(requester_sid)) {
        char buf[SID_LEN + 1];
        ds_put_format(&s, "by %s",
                      raft_get_nickname(raft, requester_sid, buf, sizeof buf));
    } else {
        ds_put_cstr(&s, "via unixctl");
    }
    ds_put_cstr(&s, " to remove ");
    if (!requester_conn && uuid_equals(target_sid, requester_sid)) {
        ds_put_cstr(&s, "itself");
    } else {
        char buf[SID_LEN + 1];
        ds_put_cstr(&s, raft_get_nickname(raft, target_sid, buf, sizeof buf));
        if (uuid_equals(target_sid, &raft->sid)) {
            ds_put_cstr(&s, " (ourselves)");
        }
    }
    ds_put_format(&s, " from cluster "CID_FMT" %s",
                  CID_ARGS(&raft->cid),
                  success ? "succeeded" : "failed");
    if (comment) {
        ds_put_format(&s, " (%s)", comment);
    }

    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
    VLOG_INFO_RL(&rl, "%s", ds_cstr(&s));

    /* Send RemoveServerReply to the requester (which could be a server or a
     * unixctl connection).  Also always send it to the removed server; this
     * allows it to be sure that it's really removed and update its log and
     * disconnect permanently. */
    if (!uuid_is_zero(requester_sid)) {
        raft_send_remove_server_reply_rpc(raft, requester_sid, target_sid,
                                          success, comment);
    }
    if (!uuid_equals(requester_sid, target_sid)) {
        raft_send_remove_server_reply_rpc(raft, target_sid, target_sid,
                                          success, comment);
    }
    if (requester_conn) {
        if (success) {
            unixctl_command_reply(requester_conn, ds_cstr(&s));
        } else {
            unixctl_command_reply_error(requester_conn, ds_cstr(&s));
        }
    }

    ds_destroy(&s);
}

static void
raft_send_add_server_reply(struct raft *raft,
                           const struct raft_add_server_request *rq,
                           bool success, const char *comment)
{
    return raft_send_add_server_reply__(raft, &rq->common.sid, rq->address,
                                        success, comment);
}

static void
raft_send_remove_server_reply(struct raft *raft,
                              const struct raft_remove_server_request *rq,
                              bool success, const char *comment)
{
    return raft_send_remove_server_reply__(raft, &rq->sid, &rq->common.sid,
                                           rq->requester_conn, success,
                                           comment);
}

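/* Steps down to follower: forgets the current leader and fails everything
 * that requires leadership, i.e. pending add-server and remove-server
 * requests and all pending commands, each with a "lost leadership" status.
 * The election timer is reset so that we do not immediately start a new
 * election. */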
static void
raft_become_follower(struct raft *raft)
{
    raft->leader_sid = UUID_ZERO;
    if (raft->role == RAFT_FOLLOWER) {
        return;
    }

    raft->role = RAFT_FOLLOWER;
    raft_reset_election_timer(raft);

    /* Notify clients about lost leadership.
     *
     * We do not reverse our changes to 'raft->servers' because the new
     * configuration is already part of the log.  Possibly the configuration
     * log entry will not be committed, but until we know that we must use the
     * new configuration.  Our AppendEntries processing will properly update
     * the server configuration later, if necessary. */
    struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
        raft_send_add_server_reply__(raft, &s->sid, s->address, false,
                                     RAFT_SERVER_LOST_LEADERSHIP);
    }
    if (raft->remove_server) {
        raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
                                        &raft->remove_server->requester_sid,
                                        raft->remove_server->requester_conn,
                                        false, RAFT_SERVER_LOST_LEADERSHIP);
        raft_server_destroy(raft->remove_server);
        raft->remove_server = NULL;
    }

    raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
}

static void
raft_send_append_request(struct raft *raft,
                         struct raft_server *peer, unsigned int n,
                         const char *comment)
{
    ovs_assert(raft->role == RAFT_LEADER);

    const union raft_rpc rq = {
        .append_request = {
            .common = {
                .type = RAFT_RPC_APPEND_REQUEST,
                .sid = peer->sid,
                .comment = CONST_CAST(char *, comment),
            },
            .term = raft->term,
            .prev_log_index = peer->next_index - 1,
            .prev_log_term = (peer->next_index - 1 >= raft->log_start
                              ? raft->entries[peer->next_index - 1
                                              - raft->log_start].term
                              : raft->snap.term),
            .leader_commit = raft->commit_index,
            .entries = &raft->entries[peer->next_index - raft->log_start],
            .n_entries = n,
        },
    };
    raft_send(raft, &rq);
}

static void
raft_send_heartbeats(struct raft *raft)
{
    struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (!uuid_equals(&raft->sid, &s->sid)) {
            raft_send_append_request(raft, s, 0, "heartbeat");
        }
    }

    /* Send a ping to anyone waiting for a command to complete, to let them
     * know we're still working on it. */
    struct raft_command *cmd;
    HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
        if (!uuid_is_zero(&cmd->sid)) {
            raft_send_execute_command_reply(raft, &cmd->sid,
                                            &cmd->eid,
                                            RAFT_CMD_INCOMPLETE, 0);
        }
    }

    raft_reset_ping_timer(raft);
}

/* Initializes the fields in 's' that represent the leader's view of the
 * server. */
static void
raft_server_init_leader(struct raft *raft, struct raft_server *s)
{
    s->next_index = raft->log_end;
    s->match_index = 0;
    s->phase = RAFT_PHASE_STABLE;
    s->replied = false;
    s->install_snapshot_request_in_progress = false;
}

static void
raft_set_leader(struct raft *raft, const struct uuid *sid)
{
    raft->leader_sid = *sid;
    raft->ever_had_leader = raft->had_leader = true;
    raft->candidate_retrying = false;
}

static void
raft_become_leader(struct raft *raft)
{
    log_all_commands(raft);

    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
    VLOG_INFO_RL(&rl, "term %"PRIu64": elected leader by %d+ of "
                 "%"PRIuSIZE" servers", raft->term,
                 raft->n_votes, hmap_count(&raft->servers));

    ovs_assert(raft->role != RAFT_LEADER);
    raft->role = RAFT_LEADER;
    raft_set_leader(raft, &raft->sid);
    raft_reset_election_timer(raft);
    raft_reset_ping_timer(raft);

    struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        raft_server_init_leader(raft, s);
    }

    raft->election_timer_new = 0;

    raft_update_our_match_index(raft, raft->log_end - 1);

    /* Write the fact that we are leader to the log.  This is not used by the
     * algorithm (although it could be, for quick restart), but it is used for
     * offline analysis to check for conformance with the properties that Raft
     * guarantees. */
    struct raft_record r = {
        .type = RAFT_REC_LEADER,
        .term = raft->term,
        .sid = raft->sid,
    };
    ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));

    /* Initiate a no-op commit.  Otherwise we might never find out what's in
     * the log.  See section 6.4 item 1:
     *
     *     The Leader Completeness Property guarantees that a leader has all
     *     committed entries, but at the start of its term, it may not know
     *     which those are.  To find out, it needs to commit an entry from its
     *     term.  Raft handles this by having each leader commit a blank no-op
     *     entry into the log at the start of its term.  As soon as this no-op
     *     entry is committed, the leader’s commit index will be at least as
     *     large as any other servers’ during its term.
     */
    raft_command_unref(raft_command_execute__(raft, NULL, NULL, 0, NULL,
                                              NULL));
}

/* Processes term 'term' received as part of RPC 'common'.  Returns true if
 * the caller should continue processing the RPC, false if the caller should
 * reject it due to a stale term. */
static bool
raft_receive_term__(struct raft *raft, const struct raft_rpc_common *common,
                    uint64_t term)
{
    /* Section 3.3 says:
     *
     *     Current terms are exchanged whenever servers communicate; if one
     *     server’s current term is smaller than the other’s, then it updates
     *     its current term to the larger value.  If a candidate or leader
     *     discovers that its term is out of date, it immediately reverts to
     *     follower state.  If a server receives a request with a stale term
     *     number, it rejects the request.
     */
    if (term > raft->term) {
        if (!raft_set_term(raft, term, NULL)) {
            /* Failed to update the term to 'term'. */
            return false;
        }
        raft_become_follower(raft);
    } else if (term < raft->term) {
        char buf[SID_LEN + 1];
        VLOG_INFO("rejecting term %"PRIu64" < current term %"PRIu64" received "
                  "in %s message from server %s",
                  term, raft->term,
                  raft_rpc_type_to_string(common->type),
                  raft_get_nickname(raft, &common->sid, buf, sizeof buf));
        return false;
    }
    return true;
}

2674raft_get_servers_from_log(struct raft *raft, enum vlog_level level)
2675{
2676 const struct json *servers_json = raft->snap.servers;
2677 for (uint64_t index = raft->log_end - 1; index >= raft->log_start;
2678 index--) {
2679 struct raft_entry *e = &raft->entries[index - raft->log_start];
2680 if (e->servers) {
2681 servers_json = e->servers;
2682 break;
2683 }
2684 }
2685
2686 struct hmap servers;
2687 struct ovsdb_error *error = raft_servers_from_json(servers_json, &servers);
2688 ovs_assert(!error);
2689 raft_set_servers(raft, &servers, level);
2690 raft_servers_destroy(&servers);
2691}
2692
2693/* Truncates the log, so that raft->log_end becomes 'new_end'.
2694 *
2695 * Doesn't write anything to disk. In theory, we could truncate the on-disk
2696 * log file, but we don't have the right information to know how long it should
2697 * be. What we actually do is to append entries for older indexes to the
2698 * on-disk log; when we re-read it later, these entries truncate the log.
2699 *
2700 * Returns true if any of the removed log entries were server configuration
2701 * entries, false otherwise. */
2702static bool
2703raft_truncate(struct raft *raft, uint64_t new_end)
2704{
2705 ovs_assert(new_end >= raft->log_start);
2706 if (raft->log_end > new_end) {
2707 char buf[SID_LEN + 1];
2708 VLOG_INFO("%s truncating %"PRIu64 " entries from end of log",
2709 raft_get_nickname(raft, &raft->sid, buf, sizeof buf),
2710 raft->log_end - new_end);
2711 }
2712
2713 bool servers_changed = false;
2714 while (raft->log_end > new_end) {
2715 struct raft_entry *entry = &raft->entries[--raft->log_end
2716 - raft->log_start];
2717 if (entry->servers) {
2718 servers_changed = true;
2719 }
2720 raft_entry_uninit(entry);
2721 }
2722 return servers_changed;
2723}
2724
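/* Returns the data of the next committed log entry or snapshot to apply, and
 * its entry ID in '*eid', or NULL if there is nothing new to apply.  Entries
 * that carry no data are skipped.  The special case log_start ==
 * last_applied + 2 means that the snapshot itself has not been applied yet,
 * so it is returned first. */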
static const struct json *
raft_peek_next_entry(struct raft *raft, struct uuid *eid)
{
    /* Invariant: log_start - 2 <= last_applied <= commit_index < log_end. */
    ovs_assert(raft->log_start <= raft->last_applied + 2);
    ovs_assert(raft->last_applied <= raft->commit_index);
    ovs_assert(raft->commit_index < raft->log_end);

    if (raft->joining || raft->failed) {
        return NULL;
    }

    if (raft->log_start == raft->last_applied + 2) {
        *eid = raft->snap.eid;
        return raft->snap.data;
    }

    while (raft->last_applied < raft->commit_index) {
        const struct raft_entry *e = raft_get_entry(raft,
                                                    raft->last_applied + 1);
        if (e->data) {
            *eid = e->eid;
            return e->data;
        }
        raft->last_applied++;
    }
    return NULL;
}

static const struct json *
raft_get_next_entry(struct raft *raft, struct uuid *eid)
{
    const struct json *data = raft_peek_next_entry(raft, eid);
    if (data) {
        raft->last_applied++;
    }
    return data;
}

/* Updates the commit index in the raft log.  If the commit index is already
 * up to date, does nothing and returns false; otherwise, updates it and
 * returns true. */
static bool
raft_update_commit_index(struct raft *raft, uint64_t new_commit_index)
{
    if (new_commit_index <= raft->commit_index) {
        return false;
    }

    if (raft->role == RAFT_LEADER) {
        while (raft->commit_index < new_commit_index) {
            uint64_t index = ++raft->commit_index;
            const struct raft_entry *e = raft_get_entry(raft, index);
            if (e->data) {
                struct raft_command *cmd
                    = raft_find_command_by_eid(raft, &e->eid);
                if (cmd) {
                    if (!cmd->index) {
                        VLOG_DBG("Command completed after role change from"
                                 " follower to leader "UUID_FMT,
                                 UUID_ARGS(&e->eid));
                        cmd->index = index;
                    }
                    raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
                }
            }
            if (e->election_timer) {
                VLOG_INFO("Election timer changed from %"PRIu64" to %"PRIu64,
                          raft->election_timer, e->election_timer);
                raft->election_timer = e->election_timer;
                raft->election_timer_new = 0;
            }
            if (e->servers) {
                /* raft_run_reconfigure() can write a new Raft entry, which can
                 * reallocate raft->entries, which would invalidate 'e', so
                 * this case must be last, after the one for 'e->data'. */
                raft_run_reconfigure(raft);
            }
        }
    } else {
        while (raft->commit_index < new_commit_index) {
            uint64_t index = ++raft->commit_index;
            const struct raft_entry *e = raft_get_entry(raft, index);
            if (e->election_timer) {
                VLOG_INFO("Election timer changed from %"PRIu64" to %"PRIu64,
                          raft->election_timer, e->election_timer);
                raft->election_timer = e->election_timer;
            }
        }
        /* Check whether any pending command can be completed, and complete
         * it.  This can happen when the leader fails over before sending an
         * execute_command_reply. */
        const struct uuid *eid = raft_get_eid(raft, new_commit_index);
        struct raft_command *cmd = raft_find_command_by_eid(raft, eid);
        if (cmd) {
            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
            VLOG_INFO_RL(&rl,
                         "Command completed without reply (eid: "UUID_FMT", "
                         "commit index: %"PRIu64")",
                         UUID_ARGS(eid), new_commit_index);
            cmd->index = new_commit_index;
            raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
        }
    }

    /* Write the commit index to the log.  The next time we restart, this
     * allows us to start exporting a reasonably fresh log, instead of a log
     * that only contains the snapshot. */
    struct raft_record r = {
        .type = RAFT_REC_COMMIT_INDEX,
        .commit_index = raft->commit_index,
    };
    ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
    return true;
}

/* This doesn't use rq->entries (but it does use rq->n_entries). */
static void
raft_send_append_reply(struct raft *raft, const struct raft_append_request *rq,
                       enum raft_append_result result, const char *comment)
{
    /* Figure 3.1: "If leaderCommit > commitIndex, set commitIndex =
     * min(leaderCommit, index of last new entry)" */
    if (result == RAFT_APPEND_OK && rq->leader_commit > raft->commit_index) {
        raft_update_commit_index(
            raft, MIN(rq->leader_commit, rq->prev_log_index + rq->n_entries));
    }

    /* Send reply. */
    union raft_rpc reply = {
        .append_reply = {
            .common = {
                .type = RAFT_RPC_APPEND_REPLY,
                .sid = rq->common.sid,
                .comment = CONST_CAST(char *, comment),
            },
            .term = raft->term,
            .log_end = raft->log_end,
            .prev_log_index = rq->prev_log_index,
            .prev_log_term = rq->prev_log_term,
            .n_entries = rq->n_entries,
            .result = result,
        }
    };
    raft_send(raft, &reply);
}

/* If 'prev_log_index' exists in 'raft''s log, in term 'prev_log_term',
 * returns NULL.  Otherwise, returns an explanation for the mismatch. */
static const char *
match_index_and_term(const struct raft *raft,
                     uint64_t prev_log_index, uint64_t prev_log_term)
{
    if (prev_log_index < raft->log_start - 1) {
        return "mismatch before start of log";
    } else if (prev_log_index == raft->log_start - 1) {
        if (prev_log_term != raft->snap.term) {
            return "prev_term mismatch";
        }
    } else if (prev_log_index < raft->log_end) {
        if (raft->entries[prev_log_index - raft->log_start].term
            != prev_log_term) {
            return "term mismatch";
        }
    } else {
        /* prev_log_index >= raft->log_end */
        return "mismatch past end of log";
    }
    return NULL;
}

static void
raft_handle_append_entries(struct raft *raft,
                           const struct raft_append_request *rq,
                           uint64_t prev_log_index, uint64_t prev_log_term,
                           const struct raft_entry *entries,
                           unsigned int n_entries)
{
    /* Section 3.5: "When sending an AppendEntries RPC, the leader includes
     * the index and term of the entry in its log that immediately precedes
     * the new entries.  If the follower does not find an entry in its log
     * with the same index and term, then it refuses the new entries." */
    const char *mismatch = match_index_and_term(raft, prev_log_index,
                                                prev_log_term);
    if (mismatch) {
        VLOG_INFO("rejecting append_request because previous entry "
                  "%"PRIu64",%"PRIu64" not in local log (%s)",
                  prev_log_term, prev_log_index, mismatch);
        raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY, mismatch);
        return;
    }

    /* Figure 3.1: "If an existing entry conflicts with a new one (same
     * index but different terms), delete the existing entry and all that
     * follow it." */
    unsigned int i;
    bool servers_changed = false;
    for (i = 0; ; i++) {
        if (i >= n_entries) {
            /* No change. */
            if (rq->common.comment
                && !strcmp(rq->common.comment, "heartbeat")) {
                raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "heartbeat");
            } else {
                raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
            }
            return;
        }

        uint64_t log_index = (prev_log_index + 1) + i;
        if (log_index >= raft->log_end) {
            break;
        }
        if (raft->entries[log_index - raft->log_start].term
            != entries[i].term) {
            if (raft_truncate(raft, log_index)) {
                servers_changed = true;
            }
            break;
        }
    }

    if (failure_test == FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE) {
        ovs_fatal(0, "Raft test: crash after receiving append_request with "
                  "update.");
    }
    /* Figure 3.1: "Append any entries not already in the log." */
    struct ovsdb_error *error = NULL;
    bool any_written = false;
    for (; i < n_entries; i++) {
        const struct raft_entry *e = &entries[i];
        error = raft_write_entry(raft, e->term,
                                 json_nullable_clone(e->data), &e->eid,
                                 json_nullable_clone(e->servers),
                                 e->election_timer);
        if (error) {
            break;
        }
        any_written = true;
        if (e->servers) {
            servers_changed = true;
        }
    }

    if (any_written) {
        raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index
            = raft->log_end - 1;
    }
    if (servers_changed) {
        /* The set of servers might have changed; check. */
        raft_get_servers_from_log(raft, VLL_INFO);
    }

    if (error) {
        char *s = ovsdb_error_to_string_free(error);
        VLOG_ERR("%s", s);
        free(s);
        raft_send_append_reply(raft, rq, RAFT_APPEND_IO_ERROR, "I/O error");
        return;
    }

    raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "log updated");
}

static bool
raft_update_leader(struct raft *raft, const struct uuid *sid)
{
    if (raft->role == RAFT_LEADER) {
        char buf[SID_LEN + 1];
        VLOG_ERR("this server is leader but server %s claims to be",
                 raft_get_nickname(raft, sid, buf, sizeof buf));
        return false;
    } else if (!uuid_equals(sid, &raft->leader_sid)) {
        if (!uuid_is_zero(&raft->leader_sid)) {
            char buf1[SID_LEN + 1];
            char buf2[SID_LEN + 1];
            VLOG_ERR("leader for term %"PRIu64" changed from %s to %s",
                     raft->term,
                     raft_get_nickname(raft, &raft->leader_sid,
                                       buf1, sizeof buf1),
                     raft_get_nickname(raft, sid, buf2, sizeof buf2));
        } else {
            char buf[SID_LEN + 1];
            VLOG_INFO("server %s is leader for term %"PRIu64,
                      raft_get_nickname(raft, sid, buf, sizeof buf),
                      raft->term);
        }
        raft_set_leader(raft, sid);

        /* Record the leader to the log.  This is not used by the algorithm
         * (although it could be, for quick restart), but it is used for
         * offline analysis to check for conformance with the properties
         * that Raft guarantees. */
        struct raft_record r = {
            .type = RAFT_REC_LEADER,
            .term = raft->term,
            .sid = *sid,
        };
        ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
    }
    if (raft->role == RAFT_CANDIDATE) {
        /* Section 3.4: While waiting for votes, a candidate may
         * receive an AppendEntries RPC from another server claiming to
         * be leader.  If the leader’s term (included in its RPC) is at
         * least as large as the candidate’s current term, then the
         * candidate recognizes the leader as legitimate and returns to
         * follower state. */
        raft->role = RAFT_FOLLOWER;
    }
    return true;
}

static void
raft_handle_append_request(struct raft *raft,
                           const struct raft_append_request *rq)
{
    /* We do not check whether the server that sent the request is part of the
     * cluster.  As section 4.1 says, "A server accepts AppendEntries requests
     * from a leader that is not part of the server’s latest configuration.
     * Otherwise, a new server could never be added to the cluster (it would
     * never accept any log entries preceding the configuration entry that
     * adds the server)." */
    if (!raft_update_leader(raft, &rq->common.sid)) {
        raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
                               "usurped leadership");
        return;
    }
    raft_reset_election_timer(raft);

    /* First check for the common case, where the AppendEntries request is
     * entirely for indexes covered by 'log_start' ... 'log_end - 1', something
     * like this:
     *
     *      rq->prev_log_index
     *        | first_entry_index
     *        |   |       nth_entry_index
     *        |   |           |
     *        v   v           v
     *          +---+---+---+---+
     *        T | T | T | T | T |
     *          +---+-------+---+
     *      +---+---+---+---+
     *    T | T | T | T | T |
     *      +---+---+---+---+
     *        ^               ^
     *        |               |
     *    log_start        log_end
     */
    uint64_t first_entry_index = rq->prev_log_index + 1;
    uint64_t nth_entry_index = rq->prev_log_index + rq->n_entries;
    if (OVS_LIKELY(first_entry_index >= raft->log_start)) {
        raft_handle_append_entries(raft, rq,
                                   rq->prev_log_index, rq->prev_log_term,
                                   rq->entries, rq->n_entries);
        return;
    }

    /* Now a series of checks for odd cases, where the AppendEntries request
     * extends earlier than the beginning of our log, into the log entries
     * discarded by the most recent snapshot. */

    /*
     * Handle the case where the indexes covered by rq->entries[] are entirely
     * disjoint with 'log_start - 1' ... 'log_end - 1', as shown below.  So,
     * everything in the AppendEntries request must already have been
     * committed, and we might as well return true.
     *
     *      rq->prev_log_index
     *        | first_entry_index
     *        |   |       nth_entry_index
     *        |   |           |
     *        v   v           v
     *          +---+---+---+---+
     *        T | T | T | T | T |
     *          +---+-------+---+
     *                              +---+---+---+---+
     *                            T | T | T | T | T |
     *                              +---+---+---+---+
     *                                ^               ^
     *                                |               |
     *                            log_start        log_end
     */
    if (nth_entry_index < raft->log_start - 1) {
        raft_send_append_reply(raft, rq, RAFT_APPEND_OK,
                               "append before log start");
        return;
    }

    /*
     * Handle the case where the last entry in rq->entries[] has the same index
     * as 'log_start - 1', so we can compare their terms:
     *
     *      rq->prev_log_index
     *        | first_entry_index
     *        |   |       nth_entry_index
     *        |   |           |
     *        v   v           v
     *          +---+---+---+---+
     *        T | T | T | T | T |
     *          +---+-------+---+
     *                          +---+---+---+---+
     *                        T | T | T | T | T |
     *                          +---+---+---+---+
     *                            ^               ^
     *                            |               |
     *                        log_start        log_end
     *
     * There's actually a sub-case where rq->n_entries == 0, in which we
     * compare rq->prev_log_term:
     *
     *      rq->prev_log_index
     *        |
     *        |
     *        |
     *        v
     *        T
     *
     *          +---+---+---+---+
     *        T | T | T | T | T |
     *          +---+---+---+---+
     *            ^               ^
     *            |               |
     *        log_start        log_end
     */
    if (nth_entry_index == raft->log_start - 1) {
        if (rq->n_entries
            ? raft->snap.term == rq->entries[rq->n_entries - 1].term
            : raft->snap.term == rq->prev_log_term) {
            raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
        } else {
            raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
                                   "term mismatch");
        }
        return;
    }

    /*
     * We now know that the data in rq->entries[] overlaps the data in
     * raft->entries[], as shown below, with some positive 'ofs':
     *
     *      rq->prev_log_index
     *        | first_entry_index
     *        |   |           nth_entry_index
     *        |   |               |
     *        v   v               v
     *          +---+---+---+---+---+
     *        T | T | T | T | T | T |
     *          +---+-------+---+---+
     *                  +---+---+---+---+
     *                T | T | T | T | T |
     *                  +---+---+---+---+
     *                    ^               ^
     *                    |               |
     *                log_start        log_end
     *
     *        |<-- ofs -->|
     *
     * We transform this into the following by trimming the first 'ofs'
     * elements off of rq->entries[].  Notice how we retain the term but not
     * the data for rq->entries[ofs - 1]:
     *
     *          first_entry_index + ofs - 1
     *            | first_entry_index + ofs
     *            |   |   nth_entry_index + ofs
     *            |   |       |
     *            v   v       v
     *              +---+---+
     *            T | T | T |
     *              +---+---+
     *          +---+---+---+---+
     *        T | T | T | T | T |
     *          +---+---+---+---+
     *            ^               ^
     *            |               |
     *        log_start        log_end
     */
    uint64_t ofs = raft->log_start - first_entry_index;
    raft_handle_append_entries(raft, rq,
                               raft->log_start - 1, rq->entries[ofs - 1].term,
                               &rq->entries[ofs], rq->n_entries - ofs);
}

/* Returns true if 'raft' has another log entry or snapshot to read. */
bool
raft_has_next_entry(const struct raft *raft_)
{
    struct raft *raft = CONST_CAST(struct raft *, raft_);
    struct uuid eid;
    return raft_peek_next_entry(raft, &eid) != NULL;
}

/* Returns the next log entry or snapshot from 'raft', or NULL if there are
 * none left to read.  Stores the entry ID of the log entry in '*eid'.  Stores
 * true in '*is_snapshot' if the returned data is a snapshot, false if it is a
 * log entry. */
const struct json *
raft_next_entry(struct raft *raft, struct uuid *eid, bool *is_snapshot)
{
    const struct json *data = raft_get_next_entry(raft, eid);
    *is_snapshot = data == raft->snap.data;
    return data;
}

/* Returns the log index of the last-read snapshot or log entry. */
uint64_t
raft_get_applied_index(const struct raft *raft)
{
    return raft->last_applied;
}

/* Returns the log index of the last snapshot or log entry that is available
 * to be read. */
uint64_t
raft_get_commit_index(const struct raft *raft)
{
    return raft->commit_index;
}

static struct raft_server *
raft_find_peer(struct raft *raft, const struct uuid *uuid)
{
    struct raft_server *s = raft_find_server(raft, uuid);
    return s && !uuid_equals(&raft->sid, &s->sid) ? s : NULL;
}

static struct raft_server *
raft_find_new_server(struct raft *raft, const struct uuid *uuid)
{
    return raft_server_find(&raft->add_servers, uuid);
}

/* Figure 3.1: "If there exists an N such that N > commitIndex, a
 * majority of matchIndex[i] >= N, and log[N].term == currentTerm, set
 * commitIndex = N (sections 3.5 and 3.6)." */
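
/* For example, with five servers whose match indexes are 9, 7, 7, 4, and 3,
 * every index up to 7 is replicated on at least three servers (a majority),
 * so as long as the entry at index 7 is from the current term, commitIndex
 * can advance to 7.  (The numbers are purely illustrative.) */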
static void
raft_consider_updating_commit_index(struct raft *raft)
{
    /* This loop cannot just bail out when it comes across a log entry that
     * does not match the criteria.  For example, Figure 3.7(d2) shows a
     * case where the log entry for term 2 cannot be committed directly
     * (because it is not for the current term) but it can be committed as
     * a side effect of committing the entry for term 4 (the current term).
     * XXX Is there a more efficient way to do this? */
    ovs_assert(raft->role == RAFT_LEADER);

    uint64_t new_commit_index = raft->commit_index;
    for (uint64_t idx = MAX(raft->commit_index + 1, raft->log_start);
         idx < raft->log_end; idx++) {
        if (raft->entries[idx - raft->log_start].term == raft->term) {
            size_t count = 0;
            struct raft_server *s2;
            HMAP_FOR_EACH (s2, hmap_node, &raft->servers) {
                if (s2->match_index >= idx) {
                    count++;
                }
            }
            if (count > hmap_count(&raft->servers) / 2) {
                VLOG_DBG("index %"PRIu64" committed to %"PRIuSIZE" servers, "
                         "applying", idx, count);
                new_commit_index = idx;
            }
        }
    }
    if (raft_update_commit_index(raft, new_commit_index)) {
        raft_send_heartbeats(raft);
    }
}

static void
raft_update_match_index(struct raft *raft, struct raft_server *s,
                        uint64_t min_index)
{
    ovs_assert(raft->role == RAFT_LEADER);
    if (min_index > s->match_index) {
        s->match_index = min_index;
        raft_consider_updating_commit_index(raft);
    }
}

static void
raft_update_our_match_index(struct raft *raft, uint64_t min_index)
{
    struct raft_server *server = raft_find_server(raft, &raft->sid);
    if (server) {
        raft_update_match_index(raft, server, min_index);
    }
}

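/* Sends an InstallSnapshot RPC carrying our current snapshot to server 's',
 * for use when the entries that 's' still needs have already been compacted
 * out of our log.  At most one such request is kept in flight per server,
 * since snapshots can be large and resending one that the peer is still
 * processing only adds load. */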
3312static void
3313raft_send_install_snapshot_request(struct raft *raft,
3314 const struct raft_server *s,
3315 const char *comment)
3316{
3317 union raft_rpc rpc = {
3318 .install_snapshot_request = {
3319 .common = {
3320 .type = RAFT_RPC_INSTALL_SNAPSHOT_REQUEST,
3321 .sid = s->sid,
3322 .comment = CONST_CAST(char *, comment),
3323 },
3324 .term = raft->term,
3325 .last_index = raft->log_start - 1,
3326 .last_term = raft->snap.term,
3327 .last_servers = raft->snap.servers,
3328 .last_eid = raft->snap.eid,
3329 .data = raft->snap.data,
9bfb280a 3330 .election_timer = raft->election_timer, /* use latest value */
1b1d2e6d
BP
3331 }
3332 };
8c2c503b 3333
83fbd2e9
IM
3334 if (s->install_snapshot_request_in_progress) {
3335 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
8c2c503b 3336
83fbd2e9
IM
3337 VLOG_INFO_RL(&rl, "not sending snapshot to server %s, "
3338 "already in progress", s->nickname);
3339 return;
8c2c503b 3340 }
8c2c503b 3341
83fbd2e9
IM
3342 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3343 VLOG_INFO_RL(&rl, "sending snapshot to server %s, %"PRIu64":%"PRIu64".",
3344 s->nickname, raft->term, raft->log_start - 1);
3345 CONST_CAST(struct raft_server *, s)->install_snapshot_request_in_progress
3346 = raft_send(raft, &rpc);
1b1d2e6d
BP
3347}
3348
3349static void
3350raft_handle_append_reply(struct raft *raft,
3351 const struct raft_append_reply *rpy)
3352{
3353 if (raft->role != RAFT_LEADER) {
3354 VLOG_INFO("rejected append_reply (not leader)");
3355 return;
3356 }
3357
3358 /* Most commonly we'd be getting an AppendEntries reply from a configured
3359 * server (e.g. a peer), but we can also get them from servers in the
3360 * process of being added. */
3361 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3362 if (!s) {
3363 s = raft_find_new_server(raft, &rpy->common.sid);
3364 if (!s) {
3365 VLOG_INFO("rejected append_reply from unknown server "SID_FMT,
3366 SID_ARGS(&rpy->common.sid));
3367 return;
3368 }
3369 }
3370
89771c1e 3371 s->replied = true;
1b1d2e6d
BP
3372 if (rpy->result == RAFT_APPEND_OK) {
3373 /* Figure 3.1: "If successful, update nextIndex and matchIndex for
3374 * follower (section 3.5)." */
3375 uint64_t min_index = rpy->prev_log_index + rpy->n_entries + 1;
3376 if (s->next_index < min_index) {
3377 s->next_index = min_index;
3378 }
3379 raft_update_match_index(raft, s, min_index - 1);
3380 } else {
3381 /* Figure 3.1: "If AppendEntries fails because of log inconsistency,
3382 * decrement nextIndex and retry (section 3.5)."
3383 *
3384 * We also implement the optimization suggested in section 4.2.1:
3385 * "Various approaches can make nextIndex converge to its correct value
3386 * more quickly, including those described in Chapter 3. The simplest
3387 * approach to solving this particular problem of adding a new server,
3388 * however, is to have followers return the length of their logs in the
3389 * AppendEntries response; this allows the leader to cap the follower’s
3390 * nextIndex accordingly." */
3391 s->next_index = (s->next_index > 0
3392 ? MIN(s->next_index - 1, rpy->log_end)
3393 : 0);
3394
3395 if (rpy->result == RAFT_APPEND_IO_ERROR) {
3396 /* Append failed but not because of a log inconsistency. Because
3397 * of the I/O error, there's no point in re-sending the append
3398 * immediately. */
3399 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3400 VLOG_INFO_RL(&rl, "%s reported I/O error", s->nickname);
3401 return;
3402 }
3403 }
3404
3405 /*
3406 * Our behavior here must depend on the value of next_index relative to
3407 * log_start and log_end. There are three cases:
3408 *
3409 * Case 1 | Case 2 | Case 3
3410 * <---------------->|<------------->|<------------------>
3411 * | |
3412 *
3413 * +---+---+---+---+
3414 * T | T | T | T | T |
3415 * +---+---+---+---+
3416 * ^ ^
3417 * | |
3418 * log_start log_end
3419 */
3420 if (s->next_index < raft->log_start) {
3421 /* Case 1. */
3422 raft_send_install_snapshot_request(raft, s, NULL);
3423 } else if (s->next_index < raft->log_end) {
3424 /* Case 2. */
99c2dc8d 3425 raft_send_append_request(raft, s, raft->log_end - s->next_index, NULL);
1b1d2e6d
BP
3426 } else {
3427 /* Case 3. */
3428 if (s->phase == RAFT_PHASE_CATCHUP) {
3429 s->phase = RAFT_PHASE_CAUGHT_UP;
3430 raft_run_reconfigure(raft);
3431 }
3432 }
3433}
3434
3435static bool
3436raft_should_suppress_disruptive_server(struct raft *raft,
3437 const union raft_rpc *rpc)
3438{
3439 if (rpc->type != RAFT_RPC_VOTE_REQUEST) {
3440 return false;
3441 }
3442
3443 /* Section 4.2.3 "Disruptive Servers" says:
3444 *
3445 * ...if a server receives a RequestVote request within the minimum
3446 * election timeout of hearing from a current leader, it does not update
3447 * its term or grant its vote...
3448 *
3449 * ...This change conflicts with the leadership transfer mechanism as
3450 * described in Chapter 3, in which a server legitimately starts an
3451 * election without waiting an election timeout. In that case,
3452 * RequestVote messages should be processed by other servers even when
3453 * they believe a current cluster leader exists. Those RequestVote
3454 * requests can include a special flag to indicate this behavior (“I
3455 * have permission to disrupt the leader--it told me to!”).
3456 *
3457 * This clearly describes how the followers should act, but not the leader.
3458 * We just ignore vote requests that arrive at a current leader. This
3459 * seems to be fairly safe, since a majority other than the current leader
3460 * can still elect a new leader and the first AppendEntries from that new
3461 * leader will depose the current leader. */
3462 const struct raft_vote_request *rq = raft_vote_request_cast(rpc);
3463 if (rq->leadership_transfer) {
3464 return false;
3465 }
3466
3467 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3468 long long int now = time_msec();
3469 switch (raft->role) {
3470 case RAFT_LEADER:
3471 VLOG_WARN_RL(&rl, "ignoring vote request received as leader");
3472 return true;
3473
3474 case RAFT_FOLLOWER:
8e354614 3475 if (now < raft->election_base + raft->election_timer) {
1b1d2e6d 3476 VLOG_WARN_RL(&rl, "ignoring vote request received after only "
3477 "%lld ms (minimum election time is %"PRIu64" ms)",
3478 now - raft->election_base, raft->election_timer);
3479 return true;
3480 }
3481 return false;
3482
3483 case RAFT_CANDIDATE:
3484 return false;
3485
3486 default:
3487 OVS_NOT_REACHED();
3488 }
3489}
3490
3491/* Returns true if a reply should be sent. */
3492static bool
3493raft_handle_vote_request__(struct raft *raft,
3494 const struct raft_vote_request *rq)
3495{
3496 /* Figure 3.1: "If votedFor is null or candidateId, and candidate's vote is
3497 * at least as up-to-date as receiver's log, grant vote (sections 3.4,
3498 * 3.6)." */
3499 if (uuid_equals(&raft->vote, &rq->common.sid)) {
3500 /* Already voted for this candidate in this term. Resend vote. */
3501 return true;
3502 } else if (!uuid_is_zero(&raft->vote)) {
3503         /* Already voted for a different candidate in this term.  Send a reply
3504 * saying what candidate we did vote for. This isn't a necessary part
3505 * of the Raft protocol but it can make debugging easier. */
3506 return true;
3507 }
3508
3509 /* Section 3.6.1: "The RequestVote RPC implements this restriction: the RPC
3510 * includes information about the candidate’s log, and the voter denies its
3511 * vote if its own log is more up-to-date than that of the candidate. Raft
3512 * determines which of two logs is more up-to-date by comparing the index
3513 * and term of the last entries in the logs. If the logs have last entries
3514 * with different terms, then the log with the later term is more
3515 * up-to-date. If the logs end with the same term, then whichever log is
3516 * longer is more up-to-date." */
3517 uint64_t last_term = (raft->log_end > raft->log_start
3518 ? raft->entries[raft->log_end - 1
3519 - raft->log_start].term
3520 : raft->snap.term);
3521 if (last_term > rq->last_log_term
3522 || (last_term == rq->last_log_term
3523 && raft->log_end - 1 > rq->last_log_index)) {
3524 /* Our log is more up-to-date than the peer's. Withhold vote. */
3525 return false;
3526 }
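    /* For example: if our last entry has term 5 but the candidate's
     * last_log_term is 4, we withhold the vote; if the terms are equal but
     * our last index is 12 and the candidate's last_log_index is only 10, we
     * withhold as well.  Otherwise the candidate's log is at least as
     * up-to-date as ours. */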
3527
3528 /* Record a vote for the peer. */
3529 if (!raft_set_term(raft, raft->term, &rq->common.sid)) {
3530 return false;
3531 }
3532
0f954f32 3533 raft_reset_election_timer(raft);
3534
3535 return true;
3536}
3537
3538static void
3539raft_send_vote_reply(struct raft *raft, const struct uuid *dst,
3540 const struct uuid *vote)
3541{
3542 union raft_rpc rpy = {
3543 .vote_reply = {
3544 .common = {
3545 .type = RAFT_RPC_VOTE_REPLY,
3546 .sid = *dst,
3547 },
3548 .term = raft->term,
3549 .vote = *vote,
3550 },
3551 };
3552 raft_send(raft, &rpy);
3553}
3554
3555static void
3556raft_handle_vote_request(struct raft *raft,
3557 const struct raft_vote_request *rq)
3558{
3559 if (raft_handle_vote_request__(raft, rq)) {
3560 raft_send_vote_reply(raft, &rq->common.sid, &raft->vote);
3561 }
3562}
3563
3564static void
3565raft_handle_vote_reply(struct raft *raft,
3566 const struct raft_vote_reply *rpy)
3567{
3568 if (!raft_receive_term__(raft, &rpy->common, rpy->term)) {
3569 return;
3570 }
3571
3572 if (raft->role != RAFT_CANDIDATE) {
3573 return;
3574 }
3575
3576 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3577 if (s) {
3578 raft_accept_vote(raft, s, &rpy->vote);
3579 }
3580}
3581
3582/* Returns true if 'raft''s log contains reconfiguration entries that have not
3583 * yet been committed. */
3584static bool
3585raft_has_uncommitted_configuration(const struct raft *raft)
3586{
3587 for (uint64_t i = raft->commit_index + 1; i < raft->log_end; i++) {
3588 ovs_assert(i >= raft->log_start);
3589 const struct raft_entry *e = &raft->entries[i - raft->log_start];
3590 if (e->servers) {
3591 return true;
3592 }
3593 }
3594 return false;
3595}
3596
3597static void
3598raft_log_reconfiguration(struct raft *raft)
3599{
3600 struct json *servers_json = raft_servers_to_json(&raft->servers);
3601 raft_command_unref(raft_command_execute__(
8e354614 3602 raft, NULL, servers_json, 0, NULL, NULL));
3603 json_destroy(servers_json);
3604}
3605
3606static void
3607raft_run_reconfigure(struct raft *raft)
3608{
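    /* Sketch of the phase progression this function drives for a server
     * being added (derived from the code below and from
     * raft_handle_append_reply() above):
     *
     *    RAFT_PHASE_CATCHUP    -> receiving a snapshot and log entries
     *    RAFT_PHASE_CAUGHT_UP  -> moved into 'servers', new configuration
     *                             appended to the log
     *    RAFT_PHASE_COMMITTING -> waiting for that entry to commit
     *    RAFT_PHASE_STABLE     -> reconfiguration complete, reply sent. */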
3609 ovs_assert(raft->role == RAFT_LEADER);
3610
3611 /* Reconfiguration only progresses when configuration changes commit. */
3612 if (raft_has_uncommitted_configuration(raft)) {
3613 return;
3614 }
3615
3616 /* If we were waiting for a configuration change to commit, it's done. */
3617 struct raft_server *s;
3618 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3619 if (s->phase == RAFT_PHASE_COMMITTING) {
3620 raft_send_add_server_reply__(raft, &s->sid, s->address,
3621 true, RAFT_SERVER_COMPLETED);
3622 s->phase = RAFT_PHASE_STABLE;
3623 }
3624 }
3625 if (raft->remove_server) {
3626 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
3627 &raft->remove_server->requester_sid,
3628 raft->remove_server->requester_conn,
3629 true, RAFT_SERVER_COMPLETED);
3630 raft_server_destroy(raft->remove_server);
3631 raft->remove_server = NULL;
3632 }
3633
3634 /* If a new server is caught up, add it to the configuration. */
3635 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
3636 if (s->phase == RAFT_PHASE_CAUGHT_UP) {
3637 /* Move 's' from 'raft->add_servers' to 'raft->servers'. */
3638 hmap_remove(&raft->add_servers, &s->hmap_node);
3639 hmap_insert(&raft->servers, &s->hmap_node, uuid_hash(&s->sid));
3640
3641 /* Mark 's' as waiting for commit. */
3642 s->phase = RAFT_PHASE_COMMITTING;
3643
3644 raft_log_reconfiguration(raft);
3645
3646 /* When commit completes we'll transition to RAFT_PHASE_STABLE and
3647 * send a RAFT_SERVER_OK reply. */
3648
3649 return;
3650 }
3651 }
3652
3653 /* Remove a server, if one is scheduled for removal. */
3654 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3655 if (s->phase == RAFT_PHASE_REMOVE) {
3656 hmap_remove(&raft->servers, &s->hmap_node);
3657 raft->remove_server = s;
3658
3659 raft_log_reconfiguration(raft);
3660
3661 return;
3662 }
3663 }
3664}
3665
3666static void
3667raft_handle_add_server_request(struct raft *raft,
3668 const struct raft_add_server_request *rq)
3669{
3670 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3671 if (raft->role != RAFT_LEADER) {
3672 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3673 return;
3674 }
3675
3676 /* Check for an existing server. */
3677 struct raft_server *s = raft_find_server(raft, &rq->common.sid);
3678 if (s) {
3679 /* If the server is scheduled to be removed, cancel it. */
3680 if (s->phase == RAFT_PHASE_REMOVE) {
3681 s->phase = RAFT_PHASE_STABLE;
3682 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_CANCELED);
3683 return;
3684 }
3685
3686 /* If the server is being added, then it's in progress. */
3687 if (s->phase != RAFT_PHASE_STABLE) {
3688             raft_send_add_server_reply(raft, rq,
3689                                        false, RAFT_SERVER_IN_PROGRESS);
             return;
3690         }
3691
3692 /* Nothing to do--server is already part of the configuration. */
3693 raft_send_add_server_reply(raft, rq,
3694 true, RAFT_SERVER_ALREADY_PRESENT);
3695 return;
3696 }
3697
3698 /* Check for a server being removed. */
3699 if (raft->remove_server
3700 && uuid_equals(&rq->common.sid, &raft->remove_server->sid)) {
3701 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3702 return;
3703 }
3704
3705 /* Check for a server already being added. */
3706 if (raft_find_new_server(raft, &rq->common.sid)) {
3707 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_IN_PROGRESS);
3708 return;
3709 }
3710
3711 /* Add server to 'add_servers'. */
3712 s = raft_server_add(&raft->add_servers, &rq->common.sid, rq->address);
3713 raft_server_init_leader(raft, s);
3714 s->requester_sid = rq->common.sid;
3715 s->requester_conn = NULL;
3716 s->phase = RAFT_PHASE_CATCHUP;
3717
3718 /* Start sending the log. If this is the first time we've tried to add
3719 * this server, then this will quickly degenerate into an InstallSnapshot
3720  * followed by a series of AppendEntries, but if it's a retry of an earlier
3721  * AddServer request that was interrupted (e.g. by a timeout or a loss of
3722 * leadership) then it will gracefully resume populating the log.
3723 *
3724 * See the last few paragraphs of section 4.2.1 for further insight. */
3725 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
3726 VLOG_INFO_RL(&rl,
3727 "starting to add server %s ("SID_FMT" at %s) "
3728 "to cluster "CID_FMT, s->nickname, SID_ARGS(&s->sid),
3729 rq->address, CID_ARGS(&raft->cid));
3730 raft_send_append_request(raft, s, 0, "initialize new server");
3731}
3732
3733static void
3734raft_handle_add_server_reply(struct raft *raft,
3735 const struct raft_add_server_reply *rpy)
3736{
3737 if (!raft->joining) {
3738 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3739 VLOG_WARN_RL(&rl, "received add_server_reply even though we're "
3740 "already part of the cluster");
3741 return;
3742 }
3743
3744 if (rpy->success) {
3745 raft->joining = false;
3746
3747 /* It is tempting, at this point, to check that this server is part of
3748 * the current configuration. However, this is not necessarily the
3749 * case, because the log entry that added this server to the cluster
3750 * might have been committed by a majority of the cluster that does not
3751 * include this one. This actually happens in testing. */
3752 } else {
3753 const char *address;
3754 SSET_FOR_EACH (address, &rpy->remote_addresses) {
3755 if (sset_add(&raft->remote_addresses, address)) {
3756 VLOG_INFO("%s: learned new server address for joining cluster",
3757 address);
3758 }
3759 }
3760 }
3761}
3762
3763/* This is called by raft_unixctl_kick() as well as via RPC. */
3764static void
3765raft_handle_remove_server_request(struct raft *raft,
3766 const struct raft_remove_server_request *rq)
3767{
3768 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3769 if (raft->role != RAFT_LEADER) {
3770 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3771 return;
3772 }
3773
3774 /* If the server to remove is currently waiting to be added, cancel it. */
3775 struct raft_server *target = raft_find_new_server(raft, &rq->sid);
3776 if (target) {
3777 raft_send_add_server_reply__(raft, &target->sid, target->address,
3778 false, RAFT_SERVER_CANCELED);
3779 hmap_remove(&raft->add_servers, &target->hmap_node);
3780 raft_server_destroy(target);
3781 return;
3782 }
3783
3784 /* If the server isn't configured, report that. */
3785 target = raft_find_server(raft, &rq->sid);
3786 if (!target) {
3787 raft_send_remove_server_reply(raft, rq,
3788 true, RAFT_SERVER_ALREADY_GONE);
3789 return;
3790 }
3791
3792 /* Check whether we're waiting for the addition of the server to commit. */
3793 if (target->phase == RAFT_PHASE_COMMITTING) {
3794 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3795 return;
3796 }
3797
3798 /* Check whether the server is already scheduled for removal. */
3799 if (target->phase == RAFT_PHASE_REMOVE) {
3800 raft_send_remove_server_reply(raft, rq,
3801 false, RAFT_SERVER_IN_PROGRESS);
3802 return;
3803 }
3804
3805     /* Make sure that if we remove this server, at least one other server
3806      * will be left.  We don't count servers currently being added (in
3807 * 'add_servers') since those could fail. */
3808 struct raft_server *s;
3809 int n = 0;
3810 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3811 if (s != target && s->phase != RAFT_PHASE_REMOVE) {
3812 n++;
3813 }
3814 }
3815 if (!n) {
3816 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_EMPTY);
3817 return;
3818 }
3819
3820 /* Mark the server for removal. */
3821 target->phase = RAFT_PHASE_REMOVE;
3822 if (rq->requester_conn) {
3823 target->requester_sid = UUID_ZERO;
3824 unixctl_command_reply(rq->requester_conn, "started removal");
3825 } else {
3826 target->requester_sid = rq->common.sid;
3827 target->requester_conn = NULL;
3828 }
3829
3830 raft_run_reconfigure(raft);
3831 /* Operation in progress, reply will be sent later. */
3832}
3833
3834static void
17bd4149 3835raft_finished_leaving_cluster(struct raft *raft)
1b1d2e6d 3836{
3837 VLOG_INFO(SID_FMT": finished leaving cluster "CID_FMT,
3838 SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
1b1d2e6d 3839
3840 raft_record_note(raft, "left", "this server left the cluster");
3841
3842 raft->leaving = false;
3843 raft->left = true;
3844}
1b1d2e6d 3845
3846static void
3847raft_handle_remove_server_reply(struct raft *raft,
3848 const struct raft_remove_server_reply *rpc)
3849{
3850 if (rpc->success
3851 && (uuid_is_zero(&rpc->target_sid)
3852 || uuid_equals(&rpc->target_sid, &raft->sid))) {
3853 raft_finished_leaving_cluster(raft);
3854 }
3855}
3856
3857static bool
3858raft_handle_write_error(struct raft *raft, struct ovsdb_error *error)
3859{
3860 if (error && !raft->failed) {
3861 raft->failed = true;
3862
3863 char *s = ovsdb_error_to_string_free(error);
3864 VLOG_WARN("%s: entering failure mode due to I/O error (%s)",
3865 raft->name, s);
3866 free(s);
3867 }
3868 return !raft->failed;
3869}
3870
3871static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3872raft_write_snapshot(struct raft *raft, struct ovsdb_log *log,
3873 uint64_t new_log_start,
3874 const struct raft_entry *new_snapshot)
3875{
3876 struct raft_header h = {
3877 .sid = raft->sid,
3878 .cid = raft->cid,
3879 .name = raft->name,
3880 .local_address = raft->local_address,
3881 .snap_index = new_log_start - 1,
3882 .snap = *new_snapshot,
3883 };
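    /* The replacement log will consist of, in order: the header record above
     * (which embeds the snapshot), one RAFT_REC_ENTRY record per retained
     * log entry, the term and vote, and possibly a RAFT_REC_COMMIT_INDEX
     * record, as written out below. */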
3884 struct ovsdb_error *error = ovsdb_log_write_and_free(
3885 log, raft_header_to_json(&h));
3886 if (error) {
3887 return error;
3888 }
3889 ovsdb_log_mark_base(raft->log);
3890
3891 /* Write log records. */
3892 for (uint64_t index = new_log_start; index < raft->log_end; index++) {
3893 const struct raft_entry *e = &raft->entries[index - raft->log_start];
3894 struct raft_record r = {
3895 .type = RAFT_REC_ENTRY,
3896 .term = e->term,
3897 .entry = {
3898 .index = index,
3899 .data = e->data,
3900 .servers = e->servers,
8e354614 3901 .election_timer = e->election_timer,
3902 .eid = e->eid,
3903 },
3904 };
3905 error = ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3906 if (error) {
3907 return error;
3908 }
3909 }
3910
3911 /* Write term and vote (if any).
3912 *
3913 * The term is redundant if we wrote a log record for that term above. The
3914 * vote, if any, is never redundant.
3915 */
3916 error = raft_write_state(log, raft->term, &raft->vote);
3917 if (error) {
3918 return error;
3919 }
3920
3921 /* Write commit_index if it's beyond the new start of the log. */
3922 if (raft->commit_index >= new_log_start) {
3923 struct raft_record r = {
3924 .type = RAFT_REC_COMMIT_INDEX,
3925 .commit_index = raft->commit_index,
3926 };
3927 return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3928 }
3929 return NULL;
3930}
3931
3932static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3933raft_save_snapshot(struct raft *raft,
3934 uint64_t new_start, const struct raft_entry *new_snapshot)
3935
3936{
3937 struct ovsdb_log *new_log;
3938 struct ovsdb_error *error;
3939 error = ovsdb_log_replace_start(raft->log, &new_log);
3940 if (error) {
3941 return error;
3942 }
3943
3944 error = raft_write_snapshot(raft, new_log, new_start, new_snapshot);
3945 if (error) {
3946 ovsdb_log_replace_abort(new_log);
3947 return error;
3948 }
3949
3950 return ovsdb_log_replace_commit(raft->log, new_log);
3951}
3952
3953static bool
3954raft_handle_install_snapshot_request__(
3955 struct raft *raft, const struct raft_install_snapshot_request *rq)
3956{
0f954f32 3957 raft_reset_election_timer(raft);
3958
3959 /*
3960  * Our behavior here depends on new_log_start in the snapshot relative to
3961 * log_start and log_end. There are three cases:
3962 *
3963 * Case 1 | Case 2 | Case 3
3964 * <---------------->|<------------->|<------------------>
3965 * | |
3966 *
3967 * +---+---+---+---+
3968 * T | T | T | T | T |
3969 * +---+---+---+---+
3970 * ^ ^
3971 * | |
3972 * log_start log_end
3973 */
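    /* Worked example (illustrative): with log_start == 10 and log_end == 20,
     * a snapshot with last_index == 5 yields new_log_start == 6 (case 1),
     * last_index == 15 yields 16 (case 2), and last_index == 30 yields 31
     * (case 3). */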
3974 uint64_t new_log_start = rq->last_index + 1;
3975 if (new_log_start < raft->log_start) {
3976 /* Case 1: The new snapshot covers less than our current one. Nothing
3977 * to do. */
3978 return true;
3979 } else if (new_log_start < raft->log_end) {
3980 /* Case 2: The new snapshot starts in the middle of our log. We could
3981 * discard the first 'new_log_start - raft->log_start' entries in the
3982 * log. But there's not much value in that, since snapshotting is
3983 * supposed to be a local decision. Just skip it. */
3984 return true;
3985 }
3986
3987 /* Case 3: The new snapshot starts past the end of our current log, so
3988 * discard all of our current log. */
3989 const struct raft_entry new_snapshot = {
3990 .term = rq->last_term,
3991 .data = rq->data,
3992 .eid = rq->last_eid,
3993 .servers = rq->last_servers,
8e354614 3994 .election_timer = rq->election_timer,
3995 };
3996 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
3997 &new_snapshot);
3998 if (error) {
91bdb33e 3999 char *error_s = ovsdb_error_to_string_free(error);
4000 VLOG_WARN("could not save snapshot: %s", error_s);
4001 free(error_s);
4002 return false;
4003 }
4004
4005 for (size_t i = 0; i < raft->log_end - raft->log_start; i++) {
4006 raft_entry_uninit(&raft->entries[i]);
4007 }
4008 raft->log_start = raft->log_end = new_log_start;
4009 raft->log_synced = raft->log_end - 1;
4010 raft->commit_index = raft->log_start - 1;
4011 if (raft->last_applied < raft->commit_index) {
4012 raft->last_applied = raft->log_start - 2;
4013 }
4014
4015 raft_entry_uninit(&raft->snap);
4016 raft_entry_clone(&raft->snap, &new_snapshot);
4017
4018 raft_get_servers_from_log(raft, VLL_INFO);
8e354614 4019 raft_get_election_timer_from_log(raft);
4020
4021 return true;
4022}
4023
4024static void
4025raft_handle_install_snapshot_request(
4026 struct raft *raft, const struct raft_install_snapshot_request *rq)
4027{
4028 if (raft_handle_install_snapshot_request__(raft, rq)) {
4029 union raft_rpc rpy = {
4030 .install_snapshot_reply = {
4031 .common = {
4032 .type = RAFT_RPC_INSTALL_SNAPSHOT_REPLY,
4033 .sid = rq->common.sid,
4034 },
4035 .term = raft->term,
4036 .last_index = rq->last_index,
4037 .last_term = rq->last_term,
4038 },
4039 };
4040 raft_send(raft, &rpy);
4041 }
4042}
4043
4044static void
4045raft_handle_install_snapshot_reply(
4046 struct raft *raft, const struct raft_install_snapshot_reply *rpy)
4047{
4048 /* We might get an InstallSnapshot reply from a configured server (e.g. a
4049 * peer) or a server in the process of being added. */
4050 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
4051 if (!s) {
4052 s = raft_find_new_server(raft, &rpy->common.sid);
4053 if (!s) {
4054 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
4055 VLOG_INFO_RL(&rl, "cluster "CID_FMT": received %s from "
4056 "unknown server "SID_FMT, CID_ARGS(&raft->cid),
4057 raft_rpc_type_to_string(rpy->common.type),
4058 SID_ARGS(&rpy->common.sid));
4059 return;
4060 }
4061 }
4062
4063 s->install_snapshot_request_in_progress = false;
4064
4065 if (rpy->last_index != raft->log_start - 1 ||
4066 rpy->last_term != raft->snap.term) {
4067 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
4068 VLOG_INFO_RL(&rl, "cluster "CID_FMT": server %s installed "
4069 "out-of-date snapshot, starting over",
4070 CID_ARGS(&raft->cid), s->nickname);
4071 raft_send_install_snapshot_request(raft, s,
4072 "installed obsolete snapshot");
4073 return;
4074 }
4075
4076 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
4077 VLOG_INFO_RL(&rl, "cluster "CID_FMT": installed snapshot on server %s "
4078                  "up to %"PRIu64":%"PRIu64, CID_ARGS(&raft->cid),
4079 s->nickname, rpy->last_term, rpy->last_index);
4080 s->next_index = raft->log_start;
4081 raft_send_append_request(raft, s, raft->log_end - s->next_index,
4082 "snapshot installed");
4083}
4084
4085/* Returns true if 'raft' has grown enough since the last snapshot that
4086 * reducing the log to a snapshot would be valuable, false otherwise. */
4087bool
4088raft_grew_lots(const struct raft *raft)
4089{
4090 return ovsdb_log_grew_lots(raft->log);
4091}
4092
4093/* Returns the number of log entries that could be trimmed off the on-disk log
4094 * by snapshotting. */
4095uint64_t
4096raft_get_log_length(const struct raft *raft)
4097{
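    /* For example, with log_start == 10 and last_applied == 15, entries 10
     * through 15 could be folded into a snapshot, so this returns 6. */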
4098 return (raft->last_applied < raft->log_start
4099 ? 0
4100 : raft->last_applied - raft->log_start + 1);
4101}
4102
4103/* Returns true if taking a snapshot of 'raft', with raft_store_snapshot(), is
4104 * possible. */
4105bool
4106raft_may_snapshot(const struct raft *raft)
4107{
4108 return (!raft->joining
4109 && !raft->leaving
4110 && !raft->left
4111 && !raft->failed
4112 && raft->last_applied >= raft->log_start);
4113}
4114
4115/* Replaces the log for 'raft', up to the last log entry read, by
4116 * 'new_snapshot_data'. Returns NULL if successful, otherwise an error that
4117 * the caller must eventually free.
4118 *
4119 * This function can only succeed if raft_may_snapshot() returns true. It is
4120 * only valuable to call it if raft_get_log_length() is significant and
4121 * especially if raft_grew_lots() returns true. */
4122struct ovsdb_error * OVS_WARN_UNUSED_RESULT
4123raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
4124{
4125 if (raft->joining) {
4126 return ovsdb_error(NULL,
4127 "cannot store a snapshot while joining cluster");
4128 } else if (raft->leaving) {
4129 return ovsdb_error(NULL,
4130 "cannot store a snapshot while leaving cluster");
4131 } else if (raft->left) {
4132 return ovsdb_error(NULL,
4133 "cannot store a snapshot after leaving cluster");
4134 } else if (raft->failed) {
4135 return ovsdb_error(NULL,
4136 "cannot store a snapshot following failure");
4137 }
4138
4139 if (raft->last_applied < raft->log_start) {
4140 return ovsdb_error(NULL, "not storing a duplicate snapshot");
4141 }
4142
4143 uint64_t new_log_start = raft->last_applied + 1;
a521491b 4144 struct raft_entry new_snapshot = {
1b1d2e6d 4145 .term = raft_get_term(raft, new_log_start - 1),
a521491b 4146 .data = json_clone(new_snapshot_data),
1b1d2e6d 4147 .eid = *raft_get_eid(raft, new_log_start - 1),
a521491b 4148 .servers = json_clone(raft_servers_for_index(raft, new_log_start - 1)),
8e354614 4149 .election_timer = raft->election_timer,
4150 };
4151 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
4152 &new_snapshot);
4153 if (error) {
a521491b 4154 raft_entry_uninit(&new_snapshot);
4155 return error;
4156 }
4157
4158 raft->log_synced = raft->log_end - 1;
4159 raft_entry_uninit(&raft->snap);
a521491b 4160 raft->snap = new_snapshot;
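    /* Compact the in-memory log.  Illustrative example: with log_start == 10,
     * log_end == 20, and last_applied == 14, new_log_start is 15, so
     * entries[0] through entries[4] (indices 10..14) are freed and
     * entries[5..9] (indices 15..19) slide down to the front of the array. */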
4161 for (size_t i = 0; i < new_log_start - raft->log_start; i++) {
4162 raft_entry_uninit(&raft->entries[i]);
4163 }
4164 memmove(&raft->entries[0], &raft->entries[new_log_start - raft->log_start],
4165 (raft->log_end - new_log_start) * sizeof *raft->entries);
4166 raft->log_start = new_log_start;
4167 return NULL;
4168}
4169
4170static void
4171raft_handle_become_leader(struct raft *raft,
4172 const struct raft_become_leader *rq)
4173{
4174 if (raft->role == RAFT_FOLLOWER) {
4175 char buf[SID_LEN + 1];
4176 VLOG_INFO("received leadership transfer from %s in term %"PRIu64,
4177 raft_get_nickname(raft, &rq->common.sid, buf, sizeof buf),
4178 rq->term);
4179 raft_start_election(raft, true);
4180 }
4181}
4182
4183static void
4184raft_send_execute_command_reply(struct raft *raft,
4185 const struct uuid *sid,
4186 const struct uuid *eid,
4187 enum raft_command_status status,
4188 uint64_t commit_index)
4189{
4190 if (failure_test == FT_CRASH_BEFORE_SEND_EXEC_REP) {
4191 ovs_fatal(0, "Raft test: crash before sending execute_command_reply");
4192 }
4193 union raft_rpc rpc = {
4194 .execute_command_reply = {
4195 .common = {
4196 .type = RAFT_RPC_EXECUTE_COMMAND_REPLY,
4197 .sid = *sid,
4198 },
4199 .result = *eid,
4200 .status = status,
4201 .commit_index = commit_index,
4202 },
4203 };
4204 raft_send(raft, &rpc);
4205 if (failure_test == FT_CRASH_AFTER_SEND_EXEC_REP) {
4206 ovs_fatal(0, "Raft test: crash after sending execute_command_reply.");
4207 }
4208}
4209
4210static enum raft_command_status
4211raft_handle_execute_command_request__(
4212 struct raft *raft, const struct raft_execute_command_request *rq)
4213{
4214 if (raft->role != RAFT_LEADER) {
4215 return RAFT_CMD_NOT_LEADER;
4216 }
4217
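    /* The prerequisite check below makes a command conditional on the exact
     * database state it was computed against: if some other entry committed
     * first, the current eid will have changed and the client must recompute
     * and retry. */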
4218 const struct uuid *current_eid = raft_current_eid(raft);
4219 if (!uuid_equals(&rq->prereq, current_eid)) {
4220 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
4221 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
4222 "prerequisite "UUID_FMT" in execute_command_request",
4223 UUID_ARGS(current_eid), UUID_ARGS(&rq->prereq));
4224 return RAFT_CMD_BAD_PREREQ;
4225 }
4226
4227 struct raft_command *cmd = raft_command_initiate(raft, rq->data,
8e354614 4228 NULL, 0, &rq->result);
4229 cmd->sid = rq->common.sid;
4230
4231 enum raft_command_status status = cmd->status;
48b1c764 4232 raft_command_unref(cmd);
4233 return status;
4234}
4235
4236static void
4237raft_handle_execute_command_request(
4238 struct raft *raft, const struct raft_execute_command_request *rq)
4239{
4240 enum raft_command_status status
4241 = raft_handle_execute_command_request__(raft, rq);
4242 if (status != RAFT_CMD_INCOMPLETE) {
4243 raft_send_execute_command_reply(raft, &rq->common.sid, &rq->result,
4244 status, 0);
4245 }
4246}
4247
4248static void
4249raft_handle_execute_command_reply(
4250 struct raft *raft, const struct raft_execute_command_reply *rpy)
4251{
4252 struct raft_command *cmd = raft_find_command_by_eid(raft, &rpy->result);
4253 if (!cmd) {
4254 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
4255 char buf[SID_LEN + 1];
4256 VLOG_INFO_RL(&rl,
4257 "%s received \"%s\" reply from %s for unknown command",
4258 raft->local_nickname,
4259 raft_command_status_to_string(rpy->status),
4260 raft_get_nickname(raft, &rpy->common.sid,
4261 buf, sizeof buf));
4262 return;
4263 }
4264
4265 if (rpy->status == RAFT_CMD_INCOMPLETE) {
4266 cmd->timestamp = time_msec();
4267 } else {
4268 cmd->index = rpy->commit_index;
4269 raft_command_complete(raft, cmd, rpy->status);
4270 }
4271}
4272
4273static void
4274raft_handle_rpc(struct raft *raft, const union raft_rpc *rpc)
4275{
4276 uint64_t term = raft_rpc_get_term(rpc);
4277 if (term
4278 && !raft_should_suppress_disruptive_server(raft, rpc)
4279 && !raft_receive_term__(raft, &rpc->common, term)) {
4280 if (rpc->type == RAFT_RPC_APPEND_REQUEST) {
4281 /* Section 3.3: "If a server receives a request with a stale term
4282 * number, it rejects the request." */
4283 raft_send_append_reply(raft, raft_append_request_cast(rpc),
4284 RAFT_APPEND_INCONSISTENCY, "stale term");
4285 }
4286 return;
4287 }
4288
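    /* RAFT_RPC_TYPES is an X-macro list: the RAFT_RPC() expansion below
     * generates one case per RPC type, so that, for example, a
     * RAFT_RPC_VOTE_REQUEST message is dispatched to
     * raft_handle_vote_request(). */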
4289 switch (rpc->type) {
4290#define RAFT_RPC(ENUM, NAME) \
4291 case ENUM: \
4292 raft_handle_##NAME(raft, &rpc->NAME); \
4293 break;
4294 RAFT_RPC_TYPES
4295#undef RAFT_RPC
4296 default:
4297 OVS_NOT_REACHED();
4298 }
4299}
4300\f
4301static bool
4302raft_rpc_is_heartbeat(const union raft_rpc *rpc)
4303{
4304 return ((rpc->type == RAFT_RPC_APPEND_REQUEST
4305 || rpc->type == RAFT_RPC_APPEND_REPLY)
4306 && rpc->common.comment
4307 && !strcmp(rpc->common.comment, "heartbeat"));
4308}
4309
4310\f
4311static bool
4312raft_send_to_conn_at(struct raft *raft, const union raft_rpc *rpc,
4313 struct raft_conn *conn, int line_number)
1b1d2e6d 4314{
02acb41a 4315 log_rpc(rpc, "-->", conn, line_number);
4316 return !jsonrpc_session_send(
4317 conn->js, raft_rpc_to_jsonrpc(&raft->cid, &raft->sid, rpc));
4318}
4319
4320static bool
4321raft_is_rpc_synced(const struct raft *raft, const union raft_rpc *rpc)
4322{
4323 uint64_t term = raft_rpc_get_term(rpc);
4324 uint64_t index = raft_rpc_get_min_sync_index(rpc);
4325 const struct uuid *vote = raft_rpc_get_vote(rpc);
4326
4327 return (term <= raft->synced_term
4328 && index <= raft->log_synced
4329 && (!vote || uuid_equals(vote, &raft->synced_vote)));
4330}
4331
4332static bool
02acb41a 4333raft_send_at(struct raft *raft, const union raft_rpc *rpc, int line_number)
4334{
4335 const struct uuid *dst = &rpc->common.sid;
4336 if (uuid_equals(dst, &raft->sid)) {
4337 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4338 VLOG_WARN_RL(&rl, "attempted to send RPC to self from raft.c:%d",
4339 line_number);
4340 return false;
4341 }
4342
4343 struct raft_conn *conn = raft_find_conn_by_sid(raft, dst);
4344 if (!conn) {
4345 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4346 char buf[SID_LEN + 1];
4347 VLOG_DBG_RL(&rl, "%s: no connection to %s, cannot send RPC "
4348 "from raft.c:%d", raft->local_nickname,
4349 raft_get_nickname(raft, dst, buf, sizeof buf),
4350 line_number);
4351 return false;
4352 }
4353
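    /* Raft requires terms, votes, and log entries to be durable before they
     * are advertised to other servers.  If this RPC refers to state that is
     * not yet synced to disk, park it on a RAFT_W_RPC waiter; it will be
     * sent once the corresponding writes complete. */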
4354 if (!raft_is_rpc_synced(raft, rpc)) {
4355 raft_waiter_create(raft, RAFT_W_RPC, false)->rpc = raft_rpc_clone(rpc);
4356 return true;
4357 }
4358
02acb41a 4359 return raft_send_to_conn_at(raft, rpc, conn, line_number);
4360}
4361\f
4362static struct raft *
4363raft_lookup_by_name(const char *name)
4364{
4365 struct raft *raft;
4366
4367 HMAP_FOR_EACH_WITH_HASH (raft, hmap_node, hash_string(name, 0),
4368 &all_rafts) {
4369 if (!strcmp(raft->name, name)) {
4370 return raft;
4371 }
4372 }
4373 return NULL;
4374}
4375
4376static void
4377raft_unixctl_cid(struct unixctl_conn *conn,
4378 int argc OVS_UNUSED, const char *argv[],
4379 void *aux OVS_UNUSED)
4380{
4381 struct raft *raft = raft_lookup_by_name(argv[1]);
4382 if (!raft) {
4383 unixctl_command_reply_error(conn, "unknown cluster");
4384 } else if (uuid_is_zero(&raft->cid)) {
4385 unixctl_command_reply_error(conn, "cluster id not yet known");
4386 } else {
4387 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->cid));
4388 unixctl_command_reply(conn, uuid);
4389 free(uuid);
4390 }
4391}
4392
4393static void
4394raft_unixctl_sid(struct unixctl_conn *conn,
4395 int argc OVS_UNUSED, const char *argv[],
4396 void *aux OVS_UNUSED)
4397{
4398 struct raft *raft = raft_lookup_by_name(argv[1]);
4399 if (!raft) {
4400 unixctl_command_reply_error(conn, "unknown cluster");
4401 } else {
4402 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->sid));
4403 unixctl_command_reply(conn, uuid);
4404 free(uuid);
4405 }
4406}
4407
4408static void
4409raft_put_sid(const char *title, const struct uuid *sid,
4410 const struct raft *raft, struct ds *s)
4411{
4412 ds_put_format(s, "%s: ", title);
4413 if (uuid_equals(sid, &raft->sid)) {
4414 ds_put_cstr(s, "self");
4415 } else if (uuid_is_zero(sid)) {
4416 ds_put_cstr(s, "unknown");
4417 } else {
4418 char buf[SID_LEN + 1];
4419 ds_put_cstr(s, raft_get_nickname(raft, sid, buf, sizeof buf));
4420 }
4421 ds_put_char(s, '\n');
4422}
4423
4424static void
4425raft_unixctl_status(struct unixctl_conn *conn,
4426 int argc OVS_UNUSED, const char *argv[],
4427 void *aux OVS_UNUSED)
4428{
4429 struct raft *raft = raft_lookup_by_name(argv[1]);
4430 if (!raft) {
4431 unixctl_command_reply_error(conn, "unknown cluster");
4432 return;
4433 }
4434
4435 struct ds s = DS_EMPTY_INITIALIZER;
4436 ds_put_format(&s, "%s\n", raft->local_nickname);
4437 ds_put_format(&s, "Name: %s\n", raft->name);
4438 ds_put_format(&s, "Cluster ID: ");
4439 if (!uuid_is_zero(&raft->cid)) {
4440 ds_put_format(&s, CID_FMT" ("UUID_FMT")\n",
4441 CID_ARGS(&raft->cid), UUID_ARGS(&raft->cid));
4442 } else {
4443 ds_put_format(&s, "not yet known\n");
4444 }
4445 ds_put_format(&s, "Server ID: "SID_FMT" ("UUID_FMT")\n",
4446 SID_ARGS(&raft->sid), UUID_ARGS(&raft->sid));
4447 ds_put_format(&s, "Address: %s\n", raft->local_address);
4448 ds_put_format(&s, "Status: %s\n",
4449 raft->joining ? "joining cluster"
4450 : raft->leaving ? "leaving cluster"
4451 : raft->left ? "left cluster"
4452 : raft->failed ? "failed"
4453 : "cluster member");
4454 if (raft->joining) {
4455 ds_put_format(&s, "Remotes for joining:");
4456 const char *address;
4457 SSET_FOR_EACH (address, &raft->remote_addresses) {
4458 ds_put_format(&s, " %s", address);
4459 }
4460 ds_put_char(&s, '\n');
4461 }
4462 if (raft->role == RAFT_LEADER) {
4463 struct raft_server *as;
4464 HMAP_FOR_EACH (as, hmap_node, &raft->add_servers) {
4465 ds_put_format(&s, "Adding server %s ("SID_FMT" at %s) (%s)\n",
4466 as->nickname, SID_ARGS(&as->sid), as->address,
4467 raft_server_phase_to_string(as->phase));
4468 }
4469
4470 struct raft_server *rs = raft->remove_server;
4471 if (rs) {
4472 ds_put_format(&s, "Removing server %s ("SID_FMT" at %s) (%s)\n",
4473 rs->nickname, SID_ARGS(&rs->sid), rs->address,
4474 raft_server_phase_to_string(rs->phase));
4475 }
4476 }
4477
4478 ds_put_format(&s, "Role: %s\n",
4479 raft->role == RAFT_LEADER ? "leader"
4480 : raft->role == RAFT_CANDIDATE ? "candidate"
4481 : raft->role == RAFT_FOLLOWER ? "follower"
4482 : "<error>");
4483 ds_put_format(&s, "Term: %"PRIu64"\n", raft->term);
4484 raft_put_sid("Leader", &raft->leader_sid, raft, &s);
4485 raft_put_sid("Vote", &raft->vote, raft, &s);
4486 ds_put_char(&s, '\n');
4487
4488 ds_put_format(&s, "Election timer: %"PRIu64, raft->election_timer);
4489 if (raft->role == RAFT_LEADER && raft->election_timer_new) {
4490 ds_put_format(&s, " (changing to %"PRIu64")",
4491 raft->election_timer_new);
4492 }
4493 ds_put_char(&s, '\n');
4494
4495 ds_put_format(&s, "Log: [%"PRIu64", %"PRIu64"]\n",
4496 raft->log_start, raft->log_end);
4497
4498 uint64_t n_uncommitted = raft->log_end - raft->commit_index - 1;
4499 ds_put_format(&s, "Entries not yet committed: %"PRIu64"\n", n_uncommitted);
4500
4501 uint64_t n_unapplied = raft->log_end - raft->last_applied - 1;
4502 ds_put_format(&s, "Entries not yet applied: %"PRIu64"\n", n_unapplied);
4503
4504 const struct raft_conn *c;
4505 ds_put_cstr(&s, "Connections:");
4506 LIST_FOR_EACH (c, list_node, &raft->conns) {
4507 bool connected = jsonrpc_session_is_connected(c->js);
4508 ds_put_format(&s, " %s%s%s%s",
4509 connected ? "" : "(",
4510 c->incoming ? "<-" : "->", c->nickname,
4511 connected ? "" : ")");
4512 }
4513 ds_put_char(&s, '\n');
4514
4515 ds_put_cstr(&s, "Servers:\n");
4516 struct raft_server *server;
4517 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
4518 ds_put_format(&s, " %s ("SID_FMT" at %s)",
4519 server->nickname,
4520 SID_ARGS(&server->sid), server->address);
4521 if (uuid_equals(&server->sid, &raft->sid)) {
4522 ds_put_cstr(&s, " (self)");
4523 }
4524 if (server->phase != RAFT_PHASE_STABLE) {
4525             ds_put_format(&s, " (%s)",
4526                           raft_server_phase_to_string(server->phase));
4527 }
4528 if (raft->role == RAFT_CANDIDATE) {
4529 if (!uuid_is_zero(&server->vote)) {
4530 char buf[SID_LEN + 1];
4531 ds_put_format(&s, " (voted for %s)",
4532 raft_get_nickname(raft, &server->vote,
4533 buf, sizeof buf));
4534 }
4535 } else if (raft->role == RAFT_LEADER) {
4536 ds_put_format(&s, " next_index=%"PRIu64" match_index=%"PRIu64,
4537 server->next_index, server->match_index);
4538 }
4539 ds_put_char(&s, '\n');
4540 }
4541
4542 unixctl_command_reply(conn, ds_cstr(&s));
4543 ds_destroy(&s);
4544}
4545
4546static void
4547raft_unixctl_leave__(struct unixctl_conn *conn, struct raft *raft)
4548{
4549 if (raft_is_leaving(raft)) {
4550         unixctl_command_reply_error(conn,
4551                                     "already leaving the cluster");
4552 } else if (raft_is_joining(raft)) {
4553 unixctl_command_reply_error(conn,
4554 "can't leave while join in progress");
4555 } else if (raft_failed(raft)) {
4556 unixctl_command_reply_error(conn,
4557 "can't leave after failure");
4558 } else {
4559 raft_leave(raft);
4560 unixctl_command_reply(conn, NULL);
4561 }
4562}
4563
4564static void
4565raft_unixctl_leave(struct unixctl_conn *conn, int argc OVS_UNUSED,
4566 const char *argv[], void *aux OVS_UNUSED)
4567{
4568 struct raft *raft = raft_lookup_by_name(argv[1]);
4569 if (!raft) {
4570 unixctl_command_reply_error(conn, "unknown cluster");
4571 return;
4572 }
4573
4574 raft_unixctl_leave__(conn, raft);
4575}
4576
4577static struct raft_server *
4578raft_lookup_server_best_match(struct raft *raft, const char *id)
4579{
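    /* 'id' may be an exact server address or a (possibly partial) server
     * UUID.  For example (hypothetical values), "tcp:10.0.0.2:6641" matches
     * a server by address, while "f4bd" matches any server whose SID begins
     * with those digits; an ambiguous match returns NULL. */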
4580 struct raft_server *best = NULL;
4581 int best_score = -1;
4582 int n_best = 0;
4583
4584 struct raft_server *s;
4585 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
4586 int score = (!strcmp(id, s->address)
4587 ? INT_MAX
4588 : uuid_is_partial_match(&s->sid, id));
4589 if (score > best_score) {
4590 best = s;
4591 best_score = score;
4592 n_best = 1;
4593 } else if (score == best_score) {
4594 n_best++;
4595 }
4596 }
4597 return n_best == 1 ? best : NULL;
4598}
4599
4600static void
4601raft_unixctl_kick(struct unixctl_conn *conn, int argc OVS_UNUSED,
4602 const char *argv[], void *aux OVS_UNUSED)
4603{
4604 const char *cluster_name = argv[1];
4605 const char *server_name = argv[2];
4606
4607 struct raft *raft = raft_lookup_by_name(cluster_name);
4608 if (!raft) {
4609 unixctl_command_reply_error(conn, "unknown cluster");
4610 return;
4611 }
4612
4613 struct raft_server *server = raft_lookup_server_best_match(raft,
4614 server_name);
4615 if (!server) {
4616 unixctl_command_reply_error(conn, "unknown server");
4617 return;
4618 }
4619
4620 if (uuid_equals(&server->sid, &raft->sid)) {
4621 raft_unixctl_leave__(conn, raft);
4622 } else if (raft->role == RAFT_LEADER) {
4623 const struct raft_remove_server_request rq = {
4624 .sid = server->sid,
4625 .requester_conn = conn,
4626 };
4627 raft_handle_remove_server_request(raft, &rq);
4628 } else {
4629 const union raft_rpc rpc = {
4630 .remove_server_request = {
4631 .common = {
4632 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
4633 .sid = raft->leader_sid,
4634 .comment = "via unixctl"
4635 },
4636 .sid = server->sid,
4637 }
4638 };
4639 if (raft_send(raft, &rpc)) {
4640 unixctl_command_reply(conn, "sent removal request to leader");
4641 } else {
4642 unixctl_command_reply_error(conn,
4643 "failed to send removal request");
4644 }
4645 }
4646}
4647
4648static void
4649raft_get_election_timer_from_log(struct raft *raft)
4650{
4651 if (raft->snap.election_timer) {
4652 raft->election_timer = raft->snap.election_timer;
4653 }
4654 for (uint64_t index = raft->commit_index; index >= raft->log_start;
4655 index--) {
4656 struct raft_entry *e = &raft->entries[index - raft->log_start];
4657 if (e->election_timer) {
4658 raft->election_timer = e->election_timer;
4659 break;
4660 }
4661 }
4662}
4663
4664static void
4665raft_log_election_timer(struct raft *raft)
4666{
4667 raft_command_unref(raft_command_execute__(raft, NULL, NULL,
4668 raft->election_timer_new, NULL,
4669 NULL));
4670}
4671
4672static void
4673raft_unixctl_change_election_timer(struct unixctl_conn *conn,
4674 int argc OVS_UNUSED, const char *argv[],
4675 void *aux OVS_UNUSED)
4676{
4677 const char *cluster_name = argv[1];
4678 const char *election_timer_str = argv[2];
4679
4680 struct raft *raft = raft_lookup_by_name(cluster_name);
4681 if (!raft) {
4682 unixctl_command_reply_error(conn, "unknown cluster");
4683 return;
4684 }
4685
4686 if (raft->role != RAFT_LEADER) {
4687         unixctl_command_reply_error(conn, "election timer must be changed"
4688                                     " through the leader.");
4689 return;
4690 }
4691
4692 /* If there are pending changes for election timer, reject it. */
4693 if (raft->election_timer_new) {
4694 unixctl_command_reply_error(conn, "election timer change pending.");
4695 return;
4696 }
4697
4698 uint64_t election_timer = atoll(election_timer_str);
4699 if (election_timer == raft->election_timer) {
4700         unixctl_command_reply(conn, "election timer already set to that value.");
4701 return;
4702 }
4703
4704     /* An election timer smaller than 100 ms or larger than 10 minutes doesn't
4705      * make sense. */
4706 if (election_timer < 100 || election_timer > 600000) {
4707 unixctl_command_reply_error(conn, "election timer must be between "
4708 "100 and 600000, in msec.");
4709 return;
4710 }
4711
4712     /* If the election timer is to be increased, it should be done gradually, so
4713      * that the change won't cause a timeout when the new value is applied on the
4714      * leader but not yet on some of the followers. */
4715 if (election_timer > raft->election_timer * 2) {
4716         unixctl_command_reply_error(conn, "election timer increase must not "
4717                                     "exceed twice the current value.");
4718 return;
4719 }
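    /* As a consequence, a large increase takes several steps.  For example
     * (illustrative), raising the timer from 1000 ms to 10000 ms could be
     * done as 1000 -> 2000 -> 4000 -> 8000 -> 10000, letting each change
     * commit before issuing the next. */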
4720
4721 raft->election_timer_new = election_timer;
4722 raft_log_election_timer(raft);
4723 unixctl_command_reply(conn, "change of election timer initiated.");
4724}
4725
4726static void
4727raft_unixctl_set_backlog_threshold(struct unixctl_conn *conn,
4728 int argc OVS_UNUSED, const char *argv[],
4729 void *aux OVS_UNUSED)
4730{
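    /* Invoked as, for example (socket path is hypothetical):
     *
     *   ovs-appctl -t /var/run/ovn/ovnsb_db.ctl \
     *       cluster/set-backlog-threshold DB N_MSGS N_BYTES
     *
     * matching the registration in raft_init(). */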
4731 const char *cluster_name = argv[1];
4732 unsigned long long n_msgs, n_bytes;
4733 struct raft_conn *r_conn;
4734
4735 struct raft *raft = raft_lookup_by_name(cluster_name);
4736 if (!raft) {
4737 unixctl_command_reply_error(conn, "unknown cluster");
4738 return;
4739 }
4740
4741 if (!str_to_ullong(argv[2], 10, &n_msgs)
4742 || !str_to_ullong(argv[3], 10, &n_bytes)) {
4743 unixctl_command_reply_error(conn, "invalid argument");
4744 return;
4745 }
4746
4747 if (n_msgs < 50 || n_msgs > SIZE_MAX || n_bytes > SIZE_MAX) {
4748 unixctl_command_reply_error(conn, "values out of range");
4749 return;
4750 }
4751
4752 raft->conn_backlog_max_n_msgs = n_msgs;
4753 raft->conn_backlog_max_n_bytes = n_bytes;
4754
4755 LIST_FOR_EACH (r_conn, list_node, &raft->conns) {
4756 jsonrpc_session_set_backlog_threshold(r_conn->js, n_msgs, n_bytes);
4757 }
4758
4759 unixctl_command_reply(conn, NULL);
4760}
4761
4762static void
4763raft_unixctl_failure_test(struct unixctl_conn *conn OVS_UNUSED,
4764 int argc OVS_UNUSED, const char *argv[],
4765 void *aux OVS_UNUSED)
4766{
4767 const char *test = argv[1];
4768 if (!strcmp(test, "crash-before-sending-append-request")) {
4769 failure_test = FT_CRASH_BEFORE_SEND_APPEND_REQ;
4770 } else if (!strcmp(test, "crash-after-sending-append-request")) {
4771 failure_test = FT_CRASH_AFTER_SEND_APPEND_REQ;
4772 } else if (!strcmp(test, "crash-before-sending-execute-command-reply")) {
4773 failure_test = FT_CRASH_BEFORE_SEND_EXEC_REP;
4774 } else if (!strcmp(test, "crash-after-sending-execute-command-reply")) {
4775 failure_test = FT_CRASH_AFTER_SEND_EXEC_REP;
4776 } else if (!strcmp(test, "crash-before-sending-execute-command-request")) {
4777 failure_test = FT_CRASH_BEFORE_SEND_EXEC_REQ;
4778 } else if (!strcmp(test, "crash-after-sending-execute-command-request")) {
4779 failure_test = FT_CRASH_AFTER_SEND_EXEC_REQ;
4780 } else if (!strcmp(test, "crash-after-receiving-append-request-update")) {
4781 failure_test = FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE;
4782 } else if (!strcmp(test, "delay-election")) {
4783 failure_test = FT_DELAY_ELECTION;
4784 struct raft *raft;
4785 HMAP_FOR_EACH (raft, hmap_node, &all_rafts) {
4786 if (raft->role == RAFT_FOLLOWER) {
4787 raft_reset_election_timer(raft);
4788 }
4789 }
4790 } else if (!strcmp(test, "dont-send-vote-request")) {
4791 failure_test = FT_DONT_SEND_VOTE_REQUEST;
4792 } else if (!strcmp(test, "clear")) {
4793 failure_test = FT_NO_TEST;
4794 unixctl_command_reply(conn, "test dismissed");
4795 return;
4796 } else {
4797 unixctl_command_reply_error(conn, "unknown test scenario");
4798 return;
4799 }
4800 unixctl_command_reply(conn, "test engaged");
4801}
4802
4803static void
4804raft_init(void)
4805{
4806 static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
4807 if (!ovsthread_once_start(&once)) {
4808 return;
4809 }
4810 unixctl_command_register("cluster/cid", "DB", 1, 1,
4811 raft_unixctl_cid, NULL);
4812 unixctl_command_register("cluster/sid", "DB", 1, 1,
4813 raft_unixctl_sid, NULL);
4814 unixctl_command_register("cluster/status", "DB", 1, 1,
4815 raft_unixctl_status, NULL);
4816 unixctl_command_register("cluster/leave", "DB", 1, 1,
4817 raft_unixctl_leave, NULL);
4818 unixctl_command_register("cluster/kick", "DB SERVER", 2, 2,
4819 raft_unixctl_kick, NULL);
4820 unixctl_command_register("cluster/change-election-timer", "DB TIME", 2, 2,
4821 raft_unixctl_change_election_timer, NULL);
4822 unixctl_command_register("cluster/set-backlog-threshold",
4823 "DB N_MSGS N_BYTES", 3, 3,
4824 raft_unixctl_set_backlog_threshold, NULL);
4825 unixctl_command_register("cluster/failure-test", "FAILURE SCENARIO", 1, 1,
4826 raft_unixctl_failure_test, NULL);
4827 ovsthread_once_done(&once);
4828}