1 /*
2 * Copyright (c) 2017, 2018 Nicira, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <config.h>
18
19 #include "raft.h"
20 #include "raft-private.h"
21
22 #include <errno.h>
23 #include <unistd.h>
24
25 #include "hash.h"
26 #include "jsonrpc.h"
27 #include "lockfile.h"
28 #include "openvswitch/dynamic-string.h"
29 #include "openvswitch/hmap.h"
30 #include "openvswitch/json.h"
31 #include "openvswitch/list.h"
32 #include "openvswitch/poll-loop.h"
33 #include "openvswitch/vlog.h"
34 #include "ovsdb-error.h"
35 #include "ovsdb-parser.h"
36 #include "ovsdb/log.h"
37 #include "raft-rpc.h"
38 #include "random.h"
39 #include "simap.h"
40 #include "socket-util.h"
41 #include "stream.h"
42 #include "timeval.h"
43 #include "unicode.h"
44 #include "unixctl.h"
45 #include "util.h"
46 #include "uuid.h"
47
48 VLOG_DEFINE_THIS_MODULE(raft);
49
50 /* Roles for a Raft server:
51 *
52 * - Followers: Servers in touch with the current leader.
53 *
54 * - Candidate: Servers unaware of a current leader and seeking election to
55 * leader.
56 *
57 * - Leader: Handles all client requests. At most one at a time.
58 *
59 * In normal operation there is exactly one leader and all of the other servers
60 * are followers. */
61 enum raft_role {
62 RAFT_FOLLOWER,
63 RAFT_CANDIDATE,
64 RAFT_LEADER
65 };
66
67 /* Flags for unit tests. */
68 enum raft_failure_test {
69 FT_NO_TEST,
70 FT_CRASH_BEFORE_SEND_APPEND_REQ,
71 FT_CRASH_AFTER_SEND_APPEND_REQ,
72 FT_CRASH_BEFORE_SEND_EXEC_REP,
73 FT_CRASH_AFTER_SEND_EXEC_REP,
74 FT_CRASH_BEFORE_SEND_EXEC_REQ,
75 FT_CRASH_AFTER_SEND_EXEC_REQ,
76 FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE,
77 FT_DELAY_ELECTION,
78 FT_DONT_SEND_VOTE_REQUEST
79 };
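/* Failure test currently in effect, used only by unit tests.  FT_NO_TEST
* (the zero value) means normal operation. */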
80 static enum raft_failure_test failure_test;
81
82 /* A connection between this Raft server and another one. */
83 struct raft_conn {
84 struct ovs_list list_node; /* In struct raft's 'conns' list. */
85 struct jsonrpc_session *js; /* JSON-RPC connection. */
86 struct uuid sid; /* Unique ID of the remote server. */
87 char *nickname; /* Short name for use in log messages. */
88 bool incoming; /* True if incoming, false if outgoing. */
89 unsigned int js_seqno; /* Seqno for noticing (re)connections. */
90 };
91
92 static void raft_conn_close(struct raft_conn *);
93
94 /* A "command", that is, a request to append an entry to the log.
95 *
96 * The Raft specification only allows clients to issue commands to the leader.
97 * With this implementation, clients may issue a command on any server, which
98 * then relays the command to the leader if necessary.
99 *
100 * This structure is thus used in three cases:
101 *
102 * 1. We are the leader and the command was issued to us directly.
103 *
104 * 2. We are a follower and relayed the command to the leader.
105 *
106 * 3. We are the leader and a follower relayed the command to us.
107 */
108 struct raft_command {
109 /* All cases. */
110 struct hmap_node hmap_node; /* In struct raft's 'commands' hmap. */
111 unsigned int n_refs; /* Reference count. */
112 enum raft_command_status status; /* Execution status. */
113 struct uuid eid; /* Entry ID of result. */
114
115 /* Case 1 only. */
116 uint64_t index; /* Index in log (0 if being relayed). */
117
118 /* Case 2 only. */
119 long long int timestamp; /* Issue or last ping time, for expiration. */
120
121 /* Case 3 only. */
122 struct uuid sid; /* The follower (otherwise UUID_ZERO). */
123 };
124
125 static void raft_command_complete(struct raft *, struct raft_command *,
126 enum raft_command_status);
127
128 static void raft_complete_all_commands(struct raft *,
129 enum raft_command_status);
130
131 /* Type of deferred action, see struct raft_waiter. */
132 enum raft_waiter_type {
133 RAFT_W_ENTRY,
134 RAFT_W_TERM,
135 RAFT_W_RPC,
136 };
137
138 /* An action deferred until a log write commits to disk. */
139 struct raft_waiter {
140 struct ovs_list list_node;
141 uint64_t commit_ticket;
142
143 enum raft_waiter_type type;
144 union {
145 /* RAFT_W_ENTRY.
146 *
147 * Waits for a RAFT_REC_ENTRY write to our local log to commit. Upon
148 * completion, updates 'log_synced' to indicate that the new log entry
149 * or entries are committed and, if we are leader, also updates our
150 * local 'match_index'. */
151 struct {
152 uint64_t index;
153 } entry;
154
155 /* RAFT_W_TERM.
156 *
157 * Waits for a RAFT_REC_TERM or RAFT_REC_VOTE record write to commit.
158 * Upon completion, updates 'synced_term' and 'synced_vote', which
159 * triggers sending RPCs deferred by the uncommitted 'term' and
160 * 'vote'. */
161 struct {
162 uint64_t term;
163 struct uuid vote;
164 } term;
165
166 /* RAFT_W_RPC.
167 *
168 * Sometimes, sending an RPC to a peer must be delayed until an entry,
169 * a term, or a vote mentioned in the RPC is synced to disk. This
170 * waiter keeps a copy of such an RPC until the previous waiters have
171 * committed. */
172 union raft_rpc *rpc;
173 };
174 };
175
176 static struct raft_waiter *raft_waiter_create(struct raft *,
177 enum raft_waiter_type,
178 bool start_commit);
179 static void raft_waiters_destroy(struct raft *);
180
181 /* The Raft state machine. */
182 struct raft {
183 struct hmap_node hmap_node; /* In 'all_rafts'. */
184 struct ovsdb_log *log;
185
186 /* Persistent derived state.
187 *
188 * This must be updated on stable storage before responding to RPCs. It can be
189 * derived from the header, snapshot, and log in 'log'. */
190
191 struct uuid cid; /* Cluster ID (immutable for the cluster). */
192 struct uuid sid; /* Server ID (immutable for the server). */
193 char *local_address; /* Local address (immutable for the server). */
194 char *local_nickname; /* Used for local server in log messages. */
195 char *name; /* Schema name (immutable for the cluster). */
196
197 /* Contains "struct raft_server"s and represents the server configuration
198 * most recently added to 'log'. */
199 struct hmap servers;
200
201 #define ELECTION_BASE_MSEC 1000
202 #define ELECTION_RANGE_MSEC 1000
203 /* The election timeout base value for leader election, in milliseconds.
204 * It can be set by unixctl cluster/change-election-timer. Default value is
205 * ELECTION_BASE_MSEC. */
206 uint64_t election_timer;
207 /* If not 0, it is the new value of election_timer being proposed. */
208 uint64_t election_timer_new;
209
210 /* Persistent state on all servers.
211 *
212 * Must be updated on stable storage before responding to RPCs. */
213
214 /* Current term and the vote for that term. These might be on the way to
215 * disk now. */
216 uint64_t term; /* Initialized to 0 and only increases. */
217 struct uuid vote; /* All-zeros if no vote yet in 'term'. */
218
219 /* The term and vote that have been synced to disk. */
220 uint64_t synced_term;
221 struct uuid synced_vote;
222
223 /* The log.
224 *
225 * A log entry with index 1 never really exists; the initial snapshot for a
226 * Raft is considered to include this index. The first real log entry has
227 * index 2.
228 *
229 * A new Raft instance contains an empty log: log_start=2, log_end=2.
230 * Over time, the log grows: log_start=2, log_end=N.
231 * At some point, the server takes a snapshot: log_start=N, log_end=N.
232 * The log continues to grow: log_start=N, log_end=N+1...
233 *
234 * Must be updated on stable storage before responding to RPCs. */
235 struct raft_entry *entries; /* Log entry i is in log[i - log_start]. */
236 uint64_t log_start; /* Index of first entry in log. */
237 uint64_t log_end; /* Index of last entry in log, plus 1. */
238 uint64_t log_synced; /* Index of last synced entry. */
239 size_t allocated_log; /* Allocated entries in 'log'. */
240
241 /* Snapshot state (see Figure 5.1)
242 *
243 * This is the state of the cluster as of the last discarded log entry,
244 * that is, at log index 'log_start - 1' (called prevIndex in Figure 5.1).
245 * Only committed log entries can be included in a snapshot. */
246 struct raft_entry snap;
247
248 /* Volatile state.
249 *
250 * The snapshot is always committed, but the rest of the log might not be yet.
251 * 'last_applied' tracks what entries have been passed to the client. If the
252 * client hasn't yet read the latest snapshot, then even the snapshot isn't
253 * applied yet. Thus, the invariants are different for these members:
254 *
255 * log_start - 2 <= last_applied <= commit_index < log_end.
256 * log_start - 1 <= commit_index < log_end.
257 */
258
259 enum raft_role role; /* Current role. */
260 uint64_t commit_index; /* Max log index known to be committed. */
261 uint64_t last_applied; /* Max log index applied to state machine. */
262 struct uuid leader_sid; /* Server ID of leader (zero, if unknown). */
263
264 long long int election_base; /* Time of last heartbeat from leader. */
265 long long int election_timeout; /* Time at which we start an election. */
266
267 /* Used for joining a cluster. */
268 bool joining; /* Attempting to join the cluster? */
269 struct sset remote_addresses; /* Addresses to try to find other servers. */
270 long long int join_timeout; /* Time to re-send add server request. */
271
272 /* Used for leaving a cluster. */
273 bool leaving; /* True if we are leaving the cluster. */
274 bool left; /* True if we have finished leaving. */
275 long long int leave_timeout; /* Time to re-send remove server request. */
276
277 /* Failure. */
278 bool failed; /* True if unrecoverable error has occurred. */
279
280 /* File synchronization. */
281 struct ovs_list waiters; /* Contains "struct raft_waiter"s. */
282
283 /* Network connections. */
284 struct pstream *listener; /* For connections from other Raft servers. */
285 long long int listen_backoff; /* For retrying creating 'listener'. */
286 struct ovs_list conns; /* Contains struct raft_conns. */
287
288 /* Leaders only. Reinitialized after becoming leader. */
289 struct hmap add_servers; /* Contains "struct raft_server"s to add. */
290 struct raft_server *remove_server; /* Server being removed. */
291 struct hmap commands; /* Contains "struct raft_command"s. */
292 long long int ping_timeout; /* Time at which to send a heartbeat */
293
294 /* Candidates only. Reinitialized at start of election. */
295 int n_votes; /* Number of votes for me. */
296
297 /* Followers and candidates only. */
298 bool candidate_retrying; /* The previous election timed out and we are
299 now retrying. */
300 bool had_leader; /* A leader has been elected since the last
301 election was initiated.  This helps in
302 setting candidate_retrying. */
303
304 /* For all. */
305 bool ever_had_leader; /* A leader has been elected at some point since
306 this raft was initialized, i.e. the server has
307 been connected to the cluster at least once. */
308 };
309
310 /* All Raft structures. */
311 static struct hmap all_rafts = HMAP_INITIALIZER(&all_rafts);
312
313 static void raft_init(void);
314
315 static struct ovsdb_error *raft_read_header(struct raft *)
316 OVS_WARN_UNUSED_RESULT;
317
318 static void raft_send_execute_command_reply(struct raft *,
319 const struct uuid *sid,
320 const struct uuid *eid,
321 enum raft_command_status,
322 uint64_t commit_index);
323
324 static void raft_update_our_match_index(struct raft *, uint64_t min_index);
325
326 static void raft_send_remove_server_reply__(
327 struct raft *, const struct uuid *target_sid,
328 const struct uuid *requester_sid, struct unixctl_conn *requester_conn,
329 bool success, const char *comment);
330 static void raft_finished_leaving_cluster(struct raft *);
331
332 static void raft_server_init_leader(struct raft *, struct raft_server *);
333
334 static bool raft_rpc_is_heartbeat(const union raft_rpc *);
335 static bool raft_is_rpc_synced(const struct raft *, const union raft_rpc *);
336
337 static void raft_handle_rpc(struct raft *, const union raft_rpc *);
338
339 static bool raft_send_at(struct raft *, const union raft_rpc *,
340 int line_number);
341 #define raft_send(raft, rpc) raft_send_at(raft, rpc, __LINE__)
342
343 static bool raft_send_to_conn_at(struct raft *, const union raft_rpc *,
344 struct raft_conn *, int line_number);
345 #define raft_send_to_conn(raft, rpc, conn) \
346 raft_send_to_conn_at(raft, rpc, conn, __LINE__)
347
348 static void raft_send_append_request(struct raft *,
349 struct raft_server *, unsigned int n,
350 const char *comment);
351
352 static void raft_become_leader(struct raft *);
353 static void raft_become_follower(struct raft *);
354 static void raft_reset_election_timer(struct raft *);
355 static void raft_reset_ping_timer(struct raft *);
356 static void raft_send_heartbeats(struct raft *);
357 static void raft_start_election(struct raft *, bool leadership_transfer);
358 static bool raft_truncate(struct raft *, uint64_t new_end);
359 static void raft_get_servers_from_log(struct raft *, enum vlog_level);
360 static void raft_get_election_timer_from_log(struct raft *);
361
362 static bool raft_handle_write_error(struct raft *, struct ovsdb_error *);
363
364 static void raft_run_reconfigure(struct raft *);
365
366 static void raft_set_leader(struct raft *, const struct uuid *sid);
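/* Returns the server with ID 'sid' in 'raft''s current configuration, or
* NULL if there is none. */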
367 static struct raft_server *
368 raft_find_server(const struct raft *raft, const struct uuid *sid)
369 {
370 return raft_server_find(&raft->servers, sid);
371 }
372
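/* Returns a passive (listening) form of 'address_': a "unix:" address simply
* gets a "p" prefix, otherwise the result has the form "pPROTO:PORT:HOST",
* e.g. "ptcp:6644:10.0.0.1".  The caller must free the returned string. */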
373 static char *
374 raft_make_address_passive(const char *address_)
375 {
376 if (!strncmp(address_, "unix:", 5)) {
377 return xasprintf("p%s", address_);
378 } else {
379 char *address = xstrdup(address_);
380 char *host, *port;
381 inet_parse_host_port_tokens(strchr(address, ':') + 1, &host, &port);
382
383 struct ds paddr = DS_EMPTY_INITIALIZER;
384 ds_put_format(&paddr, "p%.3s:%s:", address, port);
385 if (strchr(host, ':')) {
386 ds_put_format(&paddr, "[%s]", host);
387 } else {
388 ds_put_cstr(&paddr, host);
389 }
390 free(address);
391 return ds_steal_cstr(&paddr);
392 }
393 }
394
395 static struct raft *
396 raft_alloc(void)
397 {
398 raft_init();
399
400 struct raft *raft = xzalloc(sizeof *raft);
401 hmap_node_nullify(&raft->hmap_node);
402 hmap_init(&raft->servers);
403 raft->log_start = raft->log_end = 1;
404 raft->role = RAFT_FOLLOWER;
405 sset_init(&raft->remote_addresses);
406 raft->join_timeout = LLONG_MAX;
407 ovs_list_init(&raft->waiters);
408 raft->listen_backoff = LLONG_MIN;
409 ovs_list_init(&raft->conns);
410 hmap_init(&raft->add_servers);
411 hmap_init(&raft->commands);
412
413 raft->election_timer = ELECTION_BASE_MSEC;
414
415 return raft;
416 }
417
418 /* Creates an on-disk file that represents a new Raft cluster and initializes
419 * it to consist of a single server, the one on which this function is called.
420 *
421 * Creates the local copy of the cluster's log in 'file_name', which must not
422 * already exist. Gives it the name 'name', which should be the database
423 * schema name and which is used only to match up this database with the server
424 * added to the cluster later if the cluster ID is unavailable.
425 *
426 * The new server is located at 'local_address', which must take one of the
427 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
428 * square bracket enclosed IPv6 address and PORT is a TCP port number.
429 *
430 * This only creates the on-disk file. Use raft_open() to start operating the
431 * new server.
432 *
433 * Returns null if successful, otherwise an ovsdb_error describing the
434 * problem. */
435 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
436 raft_create_cluster(const char *file_name, const char *name,
437 const char *local_address, const struct json *data)
438 {
439 /* Parse and verify validity of the local address. */
440 struct ovsdb_error *error = raft_address_validate(local_address);
441 if (error) {
442 return error;
443 }
444
445 /* Create log file. */
446 struct ovsdb_log *log;
447 error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
448 -1, &log);
449 if (error) {
450 return error;
451 }
452
453 /* Write log file. */
454 struct raft_header h = {
455 .sid = uuid_random(),
456 .cid = uuid_random(),
457 .name = xstrdup(name),
458 .local_address = xstrdup(local_address),
459 .joining = false,
460 .remote_addresses = SSET_INITIALIZER(&h.remote_addresses),
461 .snap_index = 1,
462 .snap = {
463 .term = 1,
464 .data = json_nullable_clone(data),
465 .eid = uuid_random(),
466 .servers = json_object_create(),
467 },
468 };
469 shash_add_nocopy(json_object(h.snap.servers),
470 xasprintf(UUID_FMT, UUID_ARGS(&h.sid)),
471 json_string_create(local_address));
472 error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
473 raft_header_uninit(&h);
474 if (!error) {
475 error = ovsdb_log_commit_block(log);
476 }
477 ovsdb_log_close(log);
478
479 return error;
480 }
481
482 /* Creates a database file that represents a new server in an existing Raft
483 * cluster.
484 *
485 * Creates the local copy of the cluster's log in 'file_name', which must not
486 * already exist. Gives it the name 'name', which must be the same name
487 * passed in to raft_create_cluster() earlier.
488 *
489 * 'cid' is optional. If specified, the new server will join only the cluster
490 * with the given cluster ID.
491 *
492 * The new server is located at 'local_address', which must take one of the
493 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
494 * square bracket enclosed IPv6 address and PORT is a TCP port number.
495 *
496 * Joining the cluster requires contacting it. Thus, 'remote_addresses'
497 * specifies the addresses of existing servers in the cluster. One server out
498 * of the existing cluster is sufficient, as long as that server is reachable
499 * and not partitioned from the current cluster leader. If multiple servers
500 * from the cluster are specified, then it is sufficient for any of them to
501 * meet this criterion.
502 *
503 * This only creates the on-disk file and does no network access. Use
504 * raft_open() to start operating the new server. (Until this happens, the
505 * new server has not joined the cluster.)
506 *
507 * Returns null if successful, otherwise an ovsdb_error describing the
508 * problem. */
509 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
510 raft_join_cluster(const char *file_name,
511 const char *name, const char *local_address,
512 const struct sset *remote_addresses,
513 const struct uuid *cid)
514 {
515 ovs_assert(!sset_is_empty(remote_addresses));
516
517 /* Parse and verify validity of the addresses. */
518 struct ovsdb_error *error = raft_address_validate(local_address);
519 if (error) {
520 return error;
521 }
522 const char *addr;
523 SSET_FOR_EACH (addr, remote_addresses) {
524 error = raft_address_validate(addr);
525 if (error) {
526 return error;
527 }
528 if (!strcmp(addr, local_address)) {
529 return ovsdb_error(NULL, "remote addresses cannot be the same "
530 "as the local address");
531 }
532 }
533
534 /* Verify validity of the cluster ID (if provided). */
535 if (cid && uuid_is_zero(cid)) {
536 return ovsdb_error(NULL, "all-zero UUID is not valid cluster ID");
537 }
538
539 /* Create log file. */
540 struct ovsdb_log *log;
541 error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
542 -1, &log);
543 if (error) {
544 return error;
545 }
546
547 /* Write log file. */
548 struct raft_header h = {
549 .sid = uuid_random(),
550 .cid = cid ? *cid : UUID_ZERO,
551 .name = xstrdup(name),
552 .local_address = xstrdup(local_address),
553 .joining = true,
554 /* No snapshot yet. */
555 };
556 sset_clone(&h.remote_addresses, remote_addresses);
557 error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
558 raft_header_uninit(&h);
559 if (!error) {
560 error = ovsdb_log_commit_block(log);
561 }
562 ovsdb_log_close(log);
563
564 return error;
565 }
566
567 /* Reads the initial header record from 'log', which must be a Raft clustered
568 * database log, and populates '*md' with the information read from it. The
569 * caller must eventually destroy 'md' with raft_metadata_destroy().
570 *
571 * On success, returns NULL. On failure, returns an error that the caller must
572 * eventually destroy and zeros '*md'. */
573 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
574 raft_read_metadata(struct ovsdb_log *log, struct raft_metadata *md)
575 {
576 struct raft *raft = raft_alloc();
577 raft->log = log;
578
579 struct ovsdb_error *error = raft_read_header(raft);
580 if (!error) {
581 md->sid = raft->sid;
582 md->name = xstrdup(raft->name);
583 md->local = xstrdup(raft->local_address);
584 md->cid = raft->cid;
585 } else {
586 memset(md, 0, sizeof *md);
587 }
588
589 raft->log = NULL;
590 raft_close(raft);
591 return error;
592 }
593
594 /* Frees the metadata in 'md'. */
595 void
596 raft_metadata_destroy(struct raft_metadata *md)
597 {
598 if (md) {
599 free(md->name);
600 free(md->local);
601 }
602 }
603
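/* Returns the log entry with index 'index', which must be in the range
* [log_start, log_end). */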
604 static const struct raft_entry *
605 raft_get_entry(const struct raft *raft, uint64_t index)
606 {
607 ovs_assert(index >= raft->log_start);
608 ovs_assert(index < raft->log_end);
609 return &raft->entries[index - raft->log_start];
610 }
611
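/* Returns the term of the log entry with index 'index'.  'index' may also be
* log_start - 1, in which case the snapshot's term is returned. */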
612 static uint64_t
613 raft_get_term(const struct raft *raft, uint64_t index)
614 {
615 return (index == raft->log_start - 1
616 ? raft->snap.term
617 : raft_get_entry(raft, index)->term);
618 }
619
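/* Returns the server configuration in effect at log index 'index', that is,
* the 'servers' from the latest entry at or before 'index' that carries one,
* falling back to the snapshot's server configuration. */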
620 static const struct json *
621 raft_servers_for_index(const struct raft *raft, uint64_t index)
622 {
623 ovs_assert(index >= raft->log_start - 1);
624 ovs_assert(index < raft->log_end);
625
626 const struct json *servers = raft->snap.servers;
627 for (uint64_t i = raft->log_start; i <= index; i++) {
628 const struct raft_entry *e = raft_get_entry(raft, i);
629 if (e->servers) {
630 servers = e->servers;
631 }
632 }
633 return servers;
634 }
635
636 static void
637 raft_set_servers(struct raft *raft, const struct hmap *new_servers,
638 enum vlog_level level)
639 {
640 struct raft_server *s, *next;
641 HMAP_FOR_EACH_SAFE (s, next, hmap_node, &raft->servers) {
642 if (!raft_server_find(new_servers, &s->sid)) {
643 ovs_assert(s != raft->remove_server);
644
645 hmap_remove(&raft->servers, &s->hmap_node);
646 VLOG(level, "server %s removed from configuration", s->nickname);
647 raft_server_destroy(s);
648 }
649 }
650
651 HMAP_FOR_EACH_SAFE (s, next, hmap_node, new_servers) {
652 if (!raft_find_server(raft, &s->sid)) {
653 VLOG(level, "server %s added to configuration", s->nickname);
654
655 struct raft_server *new
656 = raft_server_add(&raft->servers, &s->sid, s->address);
657 raft_server_init_leader(raft, new);
658 }
659 }
660 }
661
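/* Appends an entry with the given 'term', 'data', 'eid', 'servers', and
* 'election_timer' to the in-memory log, growing the entry array as needed,
* and returns the new entry's index.  This does not write anything to disk;
* see raft_write_entry() for that. */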
662 static uint64_t
663 raft_add_entry(struct raft *raft,
664 uint64_t term, struct json *data, const struct uuid *eid,
665 struct json *servers, uint64_t election_timer)
666 {
667 if (raft->log_end - raft->log_start >= raft->allocated_log) {
668 raft->entries = x2nrealloc(raft->entries, &raft->allocated_log,
669 sizeof *raft->entries);
670 }
671
672 uint64_t index = raft->log_end++;
673 struct raft_entry *entry = &raft->entries[index - raft->log_start];
674 entry->term = term;
675 entry->data = data;
676 entry->eid = eid ? *eid : UUID_ZERO;
677 entry->servers = servers;
678 entry->election_timer = election_timer;
679 return index;
680 }
681
682 /* Writes a RAFT_REC_ENTRY record for 'term', 'data', 'eid', 'servers',
683 * 'election_timer' to 'raft''s log and returns an error indication. */
684 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
685 raft_write_entry(struct raft *raft, uint64_t term, struct json *data,
686 const struct uuid *eid, struct json *servers,
687 uint64_t election_timer)
688 {
689 struct raft_record r = {
690 .type = RAFT_REC_ENTRY,
691 .term = term,
692 .entry = {
693 .index = raft_add_entry(raft, term, data, eid, servers,
694 election_timer),
695 .data = data,
696 .servers = servers,
697 .election_timer = election_timer,
698 .eid = eid ? *eid : UUID_ZERO,
699 },
700 };
701 return ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r));
702 }
703
704 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
705 raft_write_state(struct ovsdb_log *log,
706 uint64_t term, const struct uuid *vote)
707 {
708 struct raft_record r = { .term = term };
709 if (vote && !uuid_is_zero(vote)) {
710 r.type = RAFT_REC_VOTE;
711 r.sid = *vote;
712 } else {
713 r.type = RAFT_REC_TERM;
714 }
715 return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
716 }
717
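/* Applies 'r', the 'rec_idx'th record read from 'raft''s log, to the
* in-memory state while loading the log.  Returns NULL if the record is
* acceptable, otherwise an error describing the problem. */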
718 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
719 raft_apply_record(struct raft *raft, unsigned long long int rec_idx,
720 const struct raft_record *r)
721 {
722 /* Apply "term", which is present in most kinds of records (and otherwise
723 * 0).
724 *
725 * A Raft leader can replicate entries from previous terms to the other
726 * servers in the cluster, retaining the original terms on those entries
727 * (see section 3.6.2 "Committing entries from previous terms" for more
728 * information), so it's OK for the term in a log record to precede the
729 * current term. */
730 if (r->term > raft->term) {
731 raft->term = raft->synced_term = r->term;
732 raft->vote = raft->synced_vote = UUID_ZERO;
733 }
734
735 switch (r->type) {
736 case RAFT_REC_ENTRY:
737 if (r->entry.index < raft->commit_index) {
738 return ovsdb_error(NULL, "record %llu attempts to truncate log "
739 "from %"PRIu64" to %"PRIu64" entries, but "
740 "commit index is already %"PRIu64,
741 rec_idx, raft->log_end, r->entry.index,
742 raft->commit_index);
743 } else if (r->entry.index > raft->log_end) {
744 return ovsdb_error(NULL, "record %llu with index %"PRIu64" skips "
745 "past expected index %"PRIu64,
746 rec_idx, r->entry.index, raft->log_end);
747 }
748
749 if (r->entry.index < raft->log_end) {
750 /* This can happen, but it is notable. */
751 VLOG_DBG("record %llu truncates log from %"PRIu64" to %"PRIu64
752 " entries", rec_idx, raft->log_end, r->entry.index);
753 raft_truncate(raft, r->entry.index);
754 }
755
756 uint64_t prev_term = (raft->log_end > raft->log_start
757 ? raft->entries[raft->log_end
758 - raft->log_start - 1].term
759 : raft->snap.term);
760 if (r->term < prev_term) {
761 return ovsdb_error(NULL, "record %llu with index %"PRIu64" term "
762 "%"PRIu64" precedes previous entry's term "
763 "%"PRIu64,
764 rec_idx, r->entry.index, r->term, prev_term);
765 }
766
767 raft->log_synced = raft_add_entry(
768 raft, r->term,
769 json_nullable_clone(r->entry.data), &r->entry.eid,
770 json_nullable_clone(r->entry.servers),
771 r->entry.election_timer);
772 return NULL;
773
774 case RAFT_REC_TERM:
775 return NULL;
776
777 case RAFT_REC_VOTE:
778 if (r->term < raft->term) {
779 return ovsdb_error(NULL, "record %llu votes for term %"PRIu64" "
780 "but current term is %"PRIu64,
781 rec_idx, r->term, raft->term);
782 } else if (!uuid_is_zero(&raft->vote)
783 && !uuid_equals(&raft->vote, &r->sid)) {
784 return ovsdb_error(NULL, "record %llu votes for "SID_FMT" in term "
785 "%"PRIu64" but a previous record for the "
786 "same term voted for "SID_FMT, rec_idx,
787 SID_ARGS(&raft->vote), r->term,
788 SID_ARGS(&r->sid));
789 } else {
790 raft->vote = raft->synced_vote = r->sid;
791 return NULL;
792 }
793 break;
794
795 case RAFT_REC_NOTE:
796 if (!strcmp(r->note, "left")) {
797 return ovsdb_error(NULL, "record %llu indicates server has left "
798 "the cluster; it cannot be added back (use "
799 "\"ovsdb-tool join-cluster\" to add a new "
800 "server)", rec_idx);
801 }
802 return NULL;
803
804 case RAFT_REC_COMMIT_INDEX:
805 if (r->commit_index < raft->commit_index) {
806 return ovsdb_error(NULL, "record %llu regresses commit index "
807 "from %"PRIu64 " to %"PRIu64,
808 rec_idx, raft->commit_index, r->commit_index);
809 } else if (r->commit_index >= raft->log_end) {
810 return ovsdb_error(NULL, "record %llu advances commit index to "
811 "%"PRIu64 " but last log index is %"PRIu64,
812 rec_idx, r->commit_index, raft->log_end - 1);
813 } else {
814 raft->commit_index = r->commit_index;
815 return NULL;
816 }
817 break;
818
819 case RAFT_REC_LEADER:
820 /* XXX we could use this to take back leadership for quick restart */
821 return NULL;
822
823 default:
824 OVS_NOT_REACHED();
825 }
826 }
827
828 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
829 raft_read_header(struct raft *raft)
830 {
831 /* Read header record. */
832 struct json *json;
833 struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
834 if (error || !json) {
835 /* Report error or end-of-file. */
836 return error;
837 }
838 ovsdb_log_mark_base(raft->log);
839
840 struct raft_header h;
841 error = raft_header_from_json(&h, json);
842 json_destroy(json);
843 if (error) {
844 return error;
845 }
846
847 raft->sid = h.sid;
848 raft->cid = h.cid;
849 raft->name = xstrdup(h.name);
850 raft->local_address = xstrdup(h.local_address);
851 raft->local_nickname = raft_address_to_nickname(h.local_address, &h.sid);
852 raft->joining = h.joining;
853
854 if (h.joining) {
855 sset_clone(&raft->remote_addresses, &h.remote_addresses);
856 } else {
857 raft_entry_clone(&raft->snap, &h.snap);
858 raft->log_start = raft->log_end = h.snap_index + 1;
859 raft->log_synced = raft->commit_index = h.snap_index;
860 raft->last_applied = h.snap_index - 1;
861 }
862
863 raft_header_uninit(&h);
864
865 return NULL;
866 }
867
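/* Reads and applies all of the records that follow the header in 'raft''s
* log.  A failure to read a record is assumed to be a partial write from a
* crash, so it is logged and startup continues with what was read. */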
868 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
869 raft_read_log(struct raft *raft)
870 {
871 for (unsigned long long int i = 1; ; i++) {
872 struct json *json;
873 struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
874 if (!json) {
875 if (error) {
876 /* We assume that the error is due to a partial write while
877 * appending to the file before a crash, so log it and
878 * continue. */
879 char *error_string = ovsdb_error_to_string_free(error);
880 VLOG_WARN("%s", error_string);
881 free(error_string);
882 error = NULL;
883 }
884 break;
885 }
886
887 struct raft_record r;
888 error = raft_record_from_json(&r, json);
889 if (!error) {
890 error = raft_apply_record(raft, i, &r);
891 raft_record_uninit(&r);
892 }
893 json_destroy(json);
894 if (error) {
895 return ovsdb_wrap_error(error, "error reading record %llu from "
896 "%s log", i, raft->name);
897 }
898 }
899
900 /* Set the most recent servers. */
901 raft_get_servers_from_log(raft, VLL_DBG);
902
903 /* Set the most recent election_timer. */
904 raft_get_election_timer_from_log(raft);
905
906 return NULL;
907 }
908
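/* Restarts the election timer: the next election will start between
* 'election_timer' and 'election_timer' + ELECTION_RANGE_MSEC ms from now
* (longer under the FT_DELAY_ELECTION failure test). */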
909 static void
910 raft_reset_election_timer(struct raft *raft)
911 {
912 unsigned int duration = (raft->election_timer
913 + random_range(ELECTION_RANGE_MSEC));
914 raft->election_base = time_msec();
915 if (failure_test == FT_DELAY_ELECTION) {
916 /* Slow down this node so that it won't win the next election. */
917 duration += raft->election_timer;
918 }
919 raft->election_timeout = raft->election_base + duration;
920 }
921
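/* Schedules the next heartbeat for one third of the election timer from
* now. */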
922 static void
923 raft_reset_ping_timer(struct raft *raft)
924 {
925 raft->ping_timeout = time_msec() + raft->election_timer / 3;
926 }
927
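/* Creates a raft_conn for JSON-RPC session 'js', taking ownership of it, and
* adds it to 'raft''s list of connections.  'sid' is the remote server's ID,
* if already known.  Also disables the session's inactivity probe. */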
928 static void
929 raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
930 const struct uuid *sid, bool incoming)
931 {
932 struct raft_conn *conn = xzalloc(sizeof *conn);
933 ovs_list_push_back(&raft->conns, &conn->list_node);
934 conn->js = js;
935 if (sid) {
936 conn->sid = *sid;
937 }
938 conn->nickname = raft_address_to_nickname(jsonrpc_session_get_name(js),
939 &conn->sid);
940 conn->incoming = incoming;
941 conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
942 jsonrpc_session_set_probe_interval(js, 0);
943 }
944
945 /* Starts the local server in an existing Raft cluster, using the local copy of
946 * the cluster's log in 'file_name'. Takes ownership of 'log', whether
947 * successful or not. */
948 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
949 raft_open(struct ovsdb_log *log, struct raft **raftp)
950 {
951 struct raft *raft = raft_alloc();
952 raft->log = log;
953
954 struct ovsdb_error *error = raft_read_header(raft);
955 if (error) {
956 goto error;
957 }
958
959 if (!raft->joining) {
960 error = raft_read_log(raft);
961 if (error) {
962 goto error;
963 }
964
965 /* Find our own server. */
966 if (!raft_find_server(raft, &raft->sid)) {
967 error = ovsdb_error(NULL, "server does not belong to cluster");
968 goto error;
969 }
970
971 /* If there's only one server, start an election right away so that the
972 * cluster bootstraps quickly. */
973 if (hmap_count(&raft->servers) == 1) {
974 raft_start_election(raft, false);
975 }
976 } else {
977 raft->join_timeout = time_msec() + 1000;
978 }
979
980 raft_reset_ping_timer(raft);
981 raft_reset_election_timer(raft);
982
983 *raftp = raft;
984 hmap_insert(&all_rafts, &raft->hmap_node, hash_string(raft->name, 0));
985 return NULL;
986
987 error:
988 raft_close(raft);
989 *raftp = NULL;
990 return error;
991 }
992
993 /* Returns the name of 'raft', which in OVSDB is the database schema name. */
994 const char *
995 raft_get_name(const struct raft *raft)
996 {
997 return raft->name;
998 }
999
1000 /* Returns the cluster ID of 'raft'. If 'raft' has not yet completed joining
1001 * its cluster, then 'cid' will be all-zeros (unless the administrator
1002 * specified a cluster ID when running "ovsdb-tool join-cluster").
1003 *
1004 * Each cluster has a unique cluster ID. */
1005 const struct uuid *
1006 raft_get_cid(const struct raft *raft)
1007 {
1008 return &raft->cid;
1009 }
1010
1011 /* Returns the server ID of 'raft'. Each server has a unique server ID. */
1012 const struct uuid *
1013 raft_get_sid(const struct raft *raft)
1014 {
1015 return &raft->sid;
1016 }
1017
1018 /* Adds memory consumption info to 'usage' for later use by memory_report(). */
1019 void
1020 raft_get_memory_usage(const struct raft *raft, struct simap *usage)
1021 {
1022 struct raft_conn *conn;
1023 int cnt = 0;
1024
1025 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1026 simap_increase(usage, "raft-backlog",
1027 jsonrpc_session_get_backlog(conn->js));
1028 cnt++;
1029 }
1030 simap_increase(usage, "raft-connections", cnt);
1031 }
1032
1033 /* Returns true if 'raft' has completed joining its cluster, has not left or
1034 * initiated leaving the cluster, does not have failed disk storage, and is
1035 * apparently connected to the leader in a healthy way (or is itself the
1036 * leader).
1037 *
1038 * If 'raft' is a candidate:
1039 * a) if this is the first round of the election, consider it connected, hoping
1040 * it will successfully elect a new leader soon.
1041 * b) if it is already retrying, consider it disconnected (so that clients
1042 * may decide to reconnect to other members). */
1043 bool
1044 raft_is_connected(const struct raft *raft)
1045 {
1046 bool ret = (!raft->candidate_retrying
1047 && !raft->joining
1048 && !raft->leaving
1049 && !raft->left
1050 && !raft->failed
1051 && raft->ever_had_leader);
1052 VLOG_DBG("raft_is_connected: %s\n", ret? "true": "false");
1053 return ret;
1054 }
1055
1056 /* Returns true if 'raft' is the cluster leader. */
1057 bool
1058 raft_is_leader(const struct raft *raft)
1059 {
1060 return raft->role == RAFT_LEADER;
1061 }
1062
1063 /* Returns true if 'raft' is in the process of joining its cluster. */
1064 bool
1065 raft_is_joining(const struct raft *raft)
1066 {
1067 return raft->joining;
1068 }
1069
1070 /* Only returns *connected* connections. */
1071 static struct raft_conn *
1072 raft_find_conn_by_sid(struct raft *raft, const struct uuid *sid)
1073 {
1074 if (!uuid_is_zero(sid)) {
1075 struct raft_conn *conn;
1076 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1077 if (uuid_equals(sid, &conn->sid)
1078 && jsonrpc_session_is_connected(conn->js)) {
1079 return conn;
1080 }
1081 }
1082 }
1083 return NULL;
1084 }
1085
1086 static struct raft_conn *
1087 raft_find_conn_by_address(struct raft *raft, const char *address)
1088 {
1089 struct raft_conn *conn;
1090 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1091 if (!strcmp(jsonrpc_session_get_name(conn->js), address)) {
1092 return conn;
1093 }
1094 }
1095 return NULL;
1096 }
1097
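/* Writes a RAFT_REC_NOTE record carrying 'note' and a printf()-formatted
* comment to 'raft''s log, ignoring any error from the write. */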
1098 static void OVS_PRINTF_FORMAT(3, 4)
1099 raft_record_note(struct raft *raft, const char *note,
1100 const char *comment_format, ...)
1101 {
1102 va_list args;
1103 va_start(args, comment_format);
1104 char *comment = xvasprintf(comment_format, args);
1105 va_end(args);
1106
1107 struct raft_record r = {
1108 .type = RAFT_REC_NOTE,
1109 .comment = comment,
1110 .note = CONST_CAST(char *, note),
1111 };
1112 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
1113
1114 free(comment);
1115 }
1116
1117 /* If we're leader, try to transfer leadership to another server, logging
1118 * 'reason' as the human-readable reason (it should be a phrase suitable for
1119 * following "because"). */
1120 void
1121 raft_transfer_leadership(struct raft *raft, const char *reason)
1122 {
1123 if (raft->role != RAFT_LEADER) {
1124 return;
1125 }
1126
1127 struct raft_server *s;
1128 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
1129 if (!uuid_equals(&raft->sid, &s->sid)
1130 && s->phase == RAFT_PHASE_STABLE) {
1131 struct raft_conn *conn = raft_find_conn_by_sid(raft, &s->sid);
1132 if (!conn) {
1133 continue;
1134 }
1135
1136 union raft_rpc rpc = {
1137 .become_leader = {
1138 .common = {
1139 .comment = CONST_CAST(char *, reason),
1140 .type = RAFT_RPC_BECOME_LEADER,
1141 .sid = s->sid,
1142 },
1143 .term = raft->term,
1144 }
1145 };
1146 raft_send_to_conn(raft, &rpc, conn);
1147
1148 raft_record_note(raft, "transfer leadership",
1149 "transferring leadership to %s because %s",
1150 s->nickname, reason);
1151 break;
1152 }
1153 }
1154 }
1155
1156 /* Send a RemoveServerRequest to the rest of the servers in the cluster.
1157 *
1158 * If we know which server is the leader, we can just send the request to it.
1159 * However, we might not know which server is the leader, and we might never
1160 * find out if the remove request was actually previously committed by a
1161 * majority of the servers (because in that case the new leader will not send
1162 * AppendRequests or heartbeats to us). Therefore, we instead send
1163 * RemoveRequests to every server. This theoretically has the same problem, if
1164 * the current cluster leader was not previously a member of the cluster, but
1165 * it seems likely to be more robust in practice. */
1166 static void
1167 raft_send_remove_server_requests(struct raft *raft)
1168 {
1169 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
1170 VLOG_INFO_RL(&rl, "sending remove request (joining=%s, leaving=%s)",
1171 raft->joining ? "true" : "false",
1172 raft->leaving ? "true" : "false");
1173 const struct raft_server *s;
1174 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
1175 if (!uuid_equals(&s->sid, &raft->sid)) {
1176 union raft_rpc rpc = (union raft_rpc) {
1177 .remove_server_request = {
1178 .common = {
1179 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
1180 .sid = s->sid,
1181 },
1182 .sid = raft->sid,
1183 },
1184 };
1185 raft_send(raft, &rpc);
1186 }
1187 }
1188
1189 raft->leave_timeout = time_msec() + raft->election_timer;
1190 }
1191
1192 /* Attempts to start 'raft' leaving its cluster. The caller can check progress
1193 * using raft_is_leaving() and raft_left(). */
1194 void
1195 raft_leave(struct raft *raft)
1196 {
1197 if (raft->joining || raft->failed || raft->leaving || raft->left) {
1198 return;
1199 }
1200 VLOG_INFO(SID_FMT": starting to leave cluster "CID_FMT,
1201 SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
1202 raft->leaving = true;
1203 raft_transfer_leadership(raft, "this server is leaving the cluster");
1204 raft_become_follower(raft);
1205 raft_send_remove_server_requests(raft);
1206 raft->leave_timeout = time_msec() + raft->election_timer;
1207 }
1208
1209 /* Returns true if 'raft' is currently attempting to leave its cluster. */
1210 bool
1211 raft_is_leaving(const struct raft *raft)
1212 {
1213 return raft->leaving;
1214 }
1215
1216 /* Returns true if 'raft' successfully left its cluster. */
1217 bool
1218 raft_left(const struct raft *raft)
1219 {
1220 return raft->left;
1221 }
1222
1223 /* Returns true if 'raft' has experienced a disk I/O failure. When this
1224 * returns true, only closing and reopening 'raft' allows for recovery. */
1225 bool
1226 raft_failed(const struct raft *raft)
1227 {
1228 return raft->failed;
1229 }
1230
1231 /* Forces 'raft' to attempt to take leadership of the cluster by deposing the
1232 * current cluster leader. */
1233 void
1234 raft_take_leadership(struct raft *raft)
1235 {
1236 if (raft->role != RAFT_LEADER) {
1237 raft_start_election(raft, true);
1238 }
1239 }
1240
1241 /* Closes everything owned by 'raft' that might be visible outside the process:
1242 * network connections, commands, etc. This is part of closing 'raft'; it is
1243 * also used if 'raft' has failed in an unrecoverable way. */
1244 static void
1245 raft_close__(struct raft *raft)
1246 {
1247 if (!hmap_node_is_null(&raft->hmap_node)) {
1248 hmap_remove(&all_rafts, &raft->hmap_node);
1249 hmap_node_nullify(&raft->hmap_node);
1250 }
1251
1252 raft_complete_all_commands(raft, RAFT_CMD_SHUTDOWN);
1253
1254 struct raft_server *rs = raft->remove_server;
1255 if (rs) {
1256 raft_send_remove_server_reply__(raft, &rs->sid, &rs->requester_sid,
1257 rs->requester_conn, false,
1258 RAFT_SERVER_SHUTDOWN);
1259 raft_server_destroy(raft->remove_server);
1260 raft->remove_server = NULL;
1261 }
1262
1263 struct raft_conn *conn, *next;
1264 LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
1265 raft_conn_close(conn);
1266 }
1267 }
1268
1269 /* Closes and frees 'raft'.
1270 *
1271 * A server's cluster membership is independent of whether the server is
1272 * actually running. When a server that is a member of a cluster closes, the
1273 * cluster treats this as a server failure. */
1274 void
1275 raft_close(struct raft *raft)
1276 {
1277 if (!raft) {
1278 return;
1279 }
1280
1281 raft_transfer_leadership(raft, "this server is shutting down");
1282
1283 raft_close__(raft);
1284
1285 ovsdb_log_close(raft->log);
1286
1287 raft_servers_destroy(&raft->servers);
1288
1289 for (uint64_t index = raft->log_start; index < raft->log_end; index++) {
1290 struct raft_entry *e = &raft->entries[index - raft->log_start];
1291 raft_entry_uninit(e);
1292 }
1293 free(raft->entries);
1294
1295 raft_entry_uninit(&raft->snap);
1296
1297 raft_waiters_destroy(raft);
1298
1299 raft_servers_destroy(&raft->add_servers);
1300
1301 hmap_destroy(&raft->commands);
1302
1303 pstream_close(raft->listener);
1304
1305 sset_destroy(&raft->remote_addresses);
1306 free(raft->local_address);
1307 free(raft->local_nickname);
1308 free(raft->name);
1309
1310 free(raft);
1311 }
1312
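/* Receives one RPC message from 'conn', if available, and parses it into
* '*rpc'.  Returns true on success, false if there is no message or it cannot
* be used.  As a side effect, learns the remote server's ID and address when
* they were not already known. */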
1313 static bool
1314 raft_conn_receive(struct raft *raft, struct raft_conn *conn,
1315 union raft_rpc *rpc)
1316 {
1317 struct jsonrpc_msg *msg = jsonrpc_session_recv(conn->js);
1318 if (!msg) {
1319 return false;
1320 }
1321
1322 struct ovsdb_error *error = raft_rpc_from_jsonrpc(&raft->cid, &raft->sid,
1323 msg, rpc);
1324 jsonrpc_msg_destroy(msg);
1325 if (error) {
1326 char *s = ovsdb_error_to_string_free(error);
1327 VLOG_INFO("%s: %s", jsonrpc_session_get_name(conn->js), s);
1328 free(s);
1329 return false;
1330 }
1331
1332 if (uuid_is_zero(&conn->sid)) {
1333 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
1334 conn->sid = rpc->common.sid;
1335 VLOG_INFO_RL(&rl, "%s: learned server ID "SID_FMT,
1336 jsonrpc_session_get_name(conn->js), SID_ARGS(&conn->sid));
1337 } else if (!uuid_equals(&conn->sid, &rpc->common.sid)) {
1338 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1339 VLOG_WARN_RL(&rl, "%s: ignoring message with unexpected server ID "
1340 SID_FMT" (expected "SID_FMT")",
1341 jsonrpc_session_get_name(conn->js),
1342 SID_ARGS(&rpc->common.sid), SID_ARGS(&conn->sid));
1343 raft_rpc_uninit(rpc);
1344 return false;
1345 }
1346
1347 const char *address = (rpc->type == RAFT_RPC_HELLO_REQUEST
1348 ? rpc->hello_request.address
1349 : rpc->type == RAFT_RPC_ADD_SERVER_REQUEST
1350 ? rpc->add_server_request.address
1351 : NULL);
1352 if (address) {
1353 char *new_nickname = raft_address_to_nickname(address, &conn->sid);
1354 if (strcmp(conn->nickname, new_nickname)) {
1355 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
1356 VLOG_INFO_RL(&rl, "%s: learned remote address %s",
1357 jsonrpc_session_get_name(conn->js), address);
1358
1359 free(conn->nickname);
1360 conn->nickname = new_nickname;
1361 } else {
1362 free(new_nickname);
1363 }
1364 }
1365
1366 return true;
1367 }
1368
1369 static const char *
1370 raft_get_nickname(const struct raft *raft, const struct uuid *sid,
1371 char buf[SID_LEN + 1], size_t bufsize)
1372 {
1373 if (uuid_equals(sid, &raft->sid)) {
1374 return raft->local_nickname;
1375 }
1376
1377 const char *s = raft_servers_get_nickname__(&raft->servers, sid);
1378 if (s) {
1379 return s;
1380 }
1381
1382 return raft_servers_get_nickname(&raft->add_servers, sid, buf, bufsize);
1383 }
1384
1385 static void
1386 log_rpc(const union raft_rpc *rpc, const char *direction,
1387 const struct raft_conn *conn, int line_number)
1388 {
1389 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(600, 600);
1390 if (!raft_rpc_is_heartbeat(rpc) && !VLOG_DROP_DBG(&rl)) {
1391 struct ds s = DS_EMPTY_INITIALIZER;
1392 if (line_number) {
1393 ds_put_format(&s, "raft.c:%d ", line_number);
1394 }
1395 ds_put_format(&s, "%s%s ", direction, conn->nickname);
1396 raft_rpc_format(rpc, &s);
1397 VLOG_DBG("%s", ds_cstr(&s));
1398 ds_destroy(&s);
1399 }
1400 }
1401
1402 static void
1403 raft_send_add_server_request(struct raft *raft, struct raft_conn *conn)
1404 {
1405 union raft_rpc rq = {
1406 .add_server_request = {
1407 .common = {
1408 .type = RAFT_RPC_ADD_SERVER_REQUEST,
1409 .sid = UUID_ZERO,
1410 .comment = NULL,
1411 },
1412 .address = raft->local_address,
1413 },
1414 };
1415 raft_send_to_conn(raft, &rq, conn);
1416 }
1417
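/* Performs periodic processing for 'conn': runs the JSON-RPC session, sends
* the appropriate initial request if the session just (re)connected, and
* receives and handles up to 50 incoming RPCs. */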
1418 static void
1419 raft_conn_run(struct raft *raft, struct raft_conn *conn)
1420 {
1421 jsonrpc_session_run(conn->js);
1422
1423 unsigned int new_seqno = jsonrpc_session_get_seqno(conn->js);
1424 bool reconnected = new_seqno != conn->js_seqno;
1425 bool just_connected = (reconnected
1426 && jsonrpc_session_is_connected(conn->js));
1427
1428 if (reconnected) {
1429 /* Clear 'last_install_snapshot_request' since it might not reach the
1430 * destination or server was restarted. */
1431 struct raft_server *server = raft_find_server(raft, &conn->sid);
1432 if (server) {
1433 free(server->last_install_snapshot_request);
1434 server->last_install_snapshot_request = NULL;
1435 }
1436 }
1437
1438 conn->js_seqno = new_seqno;
1439 if (just_connected) {
1440 if (raft->joining) {
1441 raft_send_add_server_request(raft, conn);
1442 } else if (raft->leaving) {
1443 union raft_rpc rq = {
1444 .remove_server_request = {
1445 .common = {
1446 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
1447 .sid = conn->sid,
1448 },
1449 .sid = raft->sid,
1450 },
1451 };
1452 raft_send_to_conn(raft, &rq, conn);
1453 } else {
1454 union raft_rpc rq = (union raft_rpc) {
1455 .hello_request = {
1456 .common = {
1457 .type = RAFT_RPC_HELLO_REQUEST,
1458 .sid = conn->sid,
1459 },
1460 .address = raft->local_address,
1461 },
1462 };
1463 raft_send_to_conn(raft, &rq, conn);
1464 }
1465 }
1466
1467 for (size_t i = 0; i < 50; i++) {
1468 union raft_rpc rpc;
1469 if (!raft_conn_receive(raft, conn, &rpc)) {
1470 break;
1471 }
1472
1473 log_rpc(&rpc, "<--", conn, 0);
1474 raft_handle_rpc(raft, &rpc);
1475 raft_rpc_uninit(&rpc);
1476 }
1477 }
1478
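/* Sends deferred RPC 'rpc' now that the disk writes it was waiting for have
* committed, unless its term has meanwhile expired.  Logs an internal error
* if the state that the RPC depends on is somehow still not synced. */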
1479 static void
1480 raft_waiter_complete_rpc(struct raft *raft, const union raft_rpc *rpc)
1481 {
1482 uint64_t term = raft_rpc_get_term(rpc);
1483 if (term && term < raft->term) {
1484 /* Drop the message because it's for an expired term. */
1485 return;
1486 }
1487
1488 if (!raft_is_rpc_synced(raft, rpc)) {
1489 /* This is a bug. A reply message is deferred because some state in
1490 * the message, such as a term or index, has not been committed to
1491 * disk, and they should only be completed when that commit is done.
1492 * But this message is being completed before the commit is finished.
1493 * Complain, and hope that someone reports the bug. */
1494 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
1495 if (VLOG_DROP_ERR(&rl)) {
1496 return;
1497 }
1498
1499 struct ds s = DS_EMPTY_INITIALIZER;
1500
1501 if (term > raft->synced_term) {
1502 ds_put_format(&s, " because message term %"PRIu64" is "
1503 "past synced term %"PRIu64,
1504 term, raft->synced_term);
1505 }
1506
1507 uint64_t index = raft_rpc_get_min_sync_index(rpc);
1508 if (index > raft->log_synced) {
1509 ds_put_format(&s, " %s message index %"PRIu64" is past last "
1510 "synced index %"PRIu64,
1511 s.length ? "and" : "because",
1512 index, raft->log_synced);
1513 }
1514
1515 const struct uuid *vote = raft_rpc_get_vote(rpc);
1516 if (vote && !uuid_equals(vote, &raft->synced_vote)) {
1517 char buf1[SID_LEN + 1];
1518 char buf2[SID_LEN + 1];
1519 ds_put_format(&s, " %s vote %s differs from synced vote %s",
1520 s.length ? "and" : "because",
1521 raft_get_nickname(raft, vote, buf1, sizeof buf1),
1522 raft_get_nickname(raft, &raft->synced_vote,
1523 buf2, sizeof buf2));
1524 }
1525
1526 char buf[SID_LEN + 1];
1527 ds_put_format(&s, ": %s ",
1528 raft_get_nickname(raft, &rpc->common.sid,
1529 buf, sizeof buf));
1530 raft_rpc_format(rpc, &s);
1531 VLOG_ERR("internal error: deferred %s message completed "
1532 "but not ready to send%s",
1533 raft_rpc_type_to_string(rpc->type), ds_cstr(&s));
1534 ds_destroy(&s);
1535
1536 return;
1537 }
1538
1539 struct raft_conn *dst = raft_find_conn_by_sid(raft, &rpc->common.sid);
1540 if (dst) {
1541 raft_send_to_conn(raft, rpc, dst);
1542 }
1543 }
1544
1545 static void
1546 raft_waiter_complete(struct raft *raft, struct raft_waiter *w)
1547 {
1548 switch (w->type) {
1549 case RAFT_W_ENTRY:
1550 if (raft->role == RAFT_LEADER) {
1551 raft_update_our_match_index(raft, w->entry.index);
1552 }
1553 raft->log_synced = w->entry.index;
1554 break;
1555
1556 case RAFT_W_TERM:
1557 raft->synced_term = w->term.term;
1558 raft->synced_vote = w->term.vote;
1559 break;
1560
1561 case RAFT_W_RPC:
1562 raft_waiter_complete_rpc(raft, w->rpc);
1563 break;
1564 }
1565 }
1566
1567 static void
1568 raft_waiter_destroy(struct raft_waiter *w)
1569 {
1570 if (!w) {
1571 return;
1572 }
1573
1574 ovs_list_remove(&w->list_node);
1575
1576 switch (w->type) {
1577 case RAFT_W_ENTRY:
1578 case RAFT_W_TERM:
1579 break;
1580
1581 case RAFT_W_RPC:
1582 raft_rpc_uninit(w->rpc);
1583 free(w->rpc);
1584 break;
1585 }
1586 free(w);
1587 }
1588
1589 static void
1590 raft_waiters_run(struct raft *raft)
1591 {
1592 if (ovs_list_is_empty(&raft->waiters)) {
1593 return;
1594 }
1595
1596 uint64_t cur = ovsdb_log_commit_progress(raft->log);
1597 struct raft_waiter *w, *next;
1598 LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
1599 if (cur < w->commit_ticket) {
1600 break;
1601 }
1602 raft_waiter_complete(raft, w);
1603 raft_waiter_destroy(w);
1604 }
1605 }
1606
1607 static void
1608 raft_waiters_wait(struct raft *raft)
1609 {
1610 struct raft_waiter *w;
1611 LIST_FOR_EACH (w, list_node, &raft->waiters) {
1612 ovsdb_log_commit_wait(raft->log, w->commit_ticket);
1613 break;
1614 }
1615 }
1616
1617 static void
1618 raft_waiters_destroy(struct raft *raft)
1619 {
1620 struct raft_waiter *w, *next;
1621 LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
1622 raft_waiter_destroy(w);
1623 }
1624 }
1625
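/* Writes 'term' and 'vote' to 'raft''s log and updates the in-memory copies;
* the synced copies are updated later, when the resulting RAFT_W_TERM waiter
* completes.  Returns false if the write failed. */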
1626 static bool OVS_WARN_UNUSED_RESULT
1627 raft_set_term(struct raft *raft, uint64_t term, const struct uuid *vote)
1628 {
1629 struct ovsdb_error *error = raft_write_state(raft->log, term, vote);
1630 if (!raft_handle_write_error(raft, error)) {
1631 return false;
1632 }
1633
1634 struct raft_waiter *w = raft_waiter_create(raft, RAFT_W_TERM, true);
1635 raft->term = w->term.term = term;
1636 raft->vote = w->term.vote = vote ? *vote : UUID_ZERO;
1637 return true;
1638 }
1639
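/* Records that server 's' voted for 'vote' in the current election and, if
* that vote is for us and now gives us a majority, makes us the leader. */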
1640 static void
1641 raft_accept_vote(struct raft *raft, struct raft_server *s,
1642 const struct uuid *vote)
1643 {
1644 if (uuid_equals(&s->vote, vote)) {
1645 return;
1646 }
1647 if (!uuid_is_zero(&s->vote)) {
1648 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
1649 char buf1[SID_LEN + 1];
1650 char buf2[SID_LEN + 1];
1651 VLOG_WARN_RL(&rl, "server %s changed its vote from %s to %s",
1652 s->nickname,
1653 raft_get_nickname(raft, &s->vote, buf1, sizeof buf1),
1654 raft_get_nickname(raft, vote, buf2, sizeof buf2));
1655 }
1656 s->vote = *vote;
1657 if (uuid_equals(vote, &raft->sid)
1658 && ++raft->n_votes > hmap_count(&raft->servers) / 2) {
1659 raft_become_leader(raft);
1660 }
1661 }
1662
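/* Starts a new leader election, unless we are leaving the cluster or are not
* part of its configuration: advances to the next term, becomes a candidate,
* votes for ourselves, and sends a VoteRequest (carrying
* 'leadership_transfer') to every other server. */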
1663 static void
1664 raft_start_election(struct raft *raft, bool leadership_transfer)
1665 {
1666 if (raft->leaving) {
1667 return;
1668 }
1669
1670 struct raft_server *me = raft_find_server(raft, &raft->sid);
1671 if (!me) {
1672 return;
1673 }
1674
1675 if (!raft_set_term(raft, raft->term + 1, &raft->sid)) {
1676 return;
1677 }
1678
1679 ovs_assert(raft->role != RAFT_LEADER);
1680
1681 raft->leader_sid = UUID_ZERO;
1682 raft->role = RAFT_CANDIDATE;
1683 /* If there was no leader elected since last election, we know we are
1684 * retrying now. */
1685 raft->candidate_retrying = !raft->had_leader;
1686 raft->had_leader = false;
1687
1688 raft->n_votes = 0;
1689
1690 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1691 if (!VLOG_DROP_INFO(&rl)) {
1692 long long int now = time_msec();
1693 if (now >= raft->election_timeout) {
1694 VLOG_INFO("term %"PRIu64": %lld ms timeout expired, "
1695 "starting election",
1696 raft->term, now - raft->election_base);
1697 } else {
1698 VLOG_INFO("term %"PRIu64": starting election", raft->term);
1699 }
1700 }
1701 raft_reset_election_timer(raft);
1702
1703 struct raft_server *peer;
1704 HMAP_FOR_EACH (peer, hmap_node, &raft->servers) {
1705 peer->vote = UUID_ZERO;
1706 if (uuid_equals(&raft->sid, &peer->sid)) {
1707 continue;
1708 }
1709
1710 union raft_rpc rq = {
1711 .vote_request = {
1712 .common = {
1713 .type = RAFT_RPC_VOTE_REQUEST,
1714 .sid = peer->sid,
1715 },
1716 .term = raft->term,
1717 .last_log_index = raft->log_end - 1,
1718 .last_log_term = (
1719 raft->log_end > raft->log_start
1720 ? raft->entries[raft->log_end - raft->log_start - 1].term
1721 : raft->snap.term),
1722 .leadership_transfer = leadership_transfer,
1723 },
1724 };
1725 if (failure_test != FT_DONT_SEND_VOTE_REQUEST) {
1726 raft_send(raft, &rq);
1727 }
1728 }
1729
1730 /* Vote for ourselves. */
1731 raft_accept_vote(raft, me, &raft->sid);
1732 }
1733
1734 static void
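/* Opens an outgoing connection to 'address' (which is expected to belong to
* the server with ID 'sid', if 'sid' is nonnull) unless 'address' is our own
* address or a connection to it already exists. */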
1735 raft_open_conn(struct raft *raft, const char *address, const struct uuid *sid)
1736 {
1737 if (strcmp(address, raft->local_address)
1738 && !raft_find_conn_by_address(raft, address)) {
1739 raft_add_conn(raft, jsonrpc_session_open(address, true), sid, false);
1740 }
1741 }
1742
1743 static void
1744 raft_conn_close(struct raft_conn *conn)
1745 {
1746 jsonrpc_session_close(conn->js);
1747 ovs_list_remove(&conn->list_node);
1748 free(conn->nickname);
1749 free(conn);
1750 }
1751
1752 /* Returns true if 'conn' should stay open, false if it should be closed. */
1753 static bool
1754 raft_conn_should_stay_open(struct raft *raft, struct raft_conn *conn)
1755 {
1756 /* Close the connection if it's actually dead. If necessary, we'll
1757 * initiate a new session later. */
1758 if (!jsonrpc_session_is_alive(conn->js)) {
1759 return false;
1760 }
1761
1762 /* Keep incoming sessions. We trust the originator to decide to drop
1763 * it. */
1764 if (conn->incoming) {
1765 return true;
1766 }
1767
1768 /* If we are joining the cluster, keep sessions to the remote addresses
1769 * that are supposed to be part of the cluster we're joining. */
1770 if (raft->joining && sset_contains(&raft->remote_addresses,
1771 jsonrpc_session_get_name(conn->js))) {
1772 return true;
1773 }
1774
1775 /* We have joined the cluster. If we did that "recently", then there is a
1776 * chance that we do not have the most recent server configuration log
1777 * entry. If so, it's a waste to disconnect from the servers that were in
1778 * remote_addresses and that will probably appear in the configuration,
1779 * just to reconnect to them a moment later when we do get the
1780 * configuration update. If we are not ourselves in the configuration,
1781 * then we know that there must be a new configuration coming up, so in
1782 * that case keep the connection. */
1783 if (!raft_find_server(raft, &raft->sid)) {
1784 return true;
1785 }
1786
1787 /* Keep the connection only if the server is part of the configuration. */
1788 return raft_find_server(raft, &conn->sid);
1789 }
1790
1791 /* Allows 'raft' to maintain the distributed log. Call this function as part
1792 * of the process's main loop. */
1793 void
1794 raft_run(struct raft *raft)
1795 {
1796 if (raft->left || raft->failed) {
1797 return;
1798 }
1799
1800 raft_waiters_run(raft);
1801
1802 if (!raft->listener && time_msec() >= raft->listen_backoff) {
1803 char *paddr = raft_make_address_passive(raft->local_address);
1804 int error = pstream_open(paddr, &raft->listener, DSCP_DEFAULT);
1805 if (error) {
1806 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1807 VLOG_WARN_RL(&rl, "%s: listen failed (%s)",
1808 paddr, ovs_strerror(error));
1809 raft->listen_backoff = time_msec() + 1000;
1810 }
1811 free(paddr);
1812 }
1813
1814 if (raft->listener) {
1815 struct stream *stream;
1816 int error = pstream_accept(raft->listener, &stream);
1817 if (!error) {
1818 raft_add_conn(raft, jsonrpc_session_open_unreliably(
1819 jsonrpc_open(stream), DSCP_DEFAULT), NULL,
1820 true);
1821 } else if (error != EAGAIN) {
1822 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
1823 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
1824 pstream_get_name(raft->listener),
1825 ovs_strerror(error));
1826 }
1827 }
1828
1829 /* Run RPCs for all open sessions. */
1830 struct raft_conn *conn;
1831 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1832 raft_conn_run(raft, conn);
1833 }
1834
1835 /* Close unneeded sessions. */
1836 struct raft_conn *next;
1837 LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
1838 if (!raft_conn_should_stay_open(raft, conn)) {
1839 raft_conn_close(conn);
1840 }
1841 }
1842
1843 /* Open needed sessions. */
1844 struct raft_server *server;
1845 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1846 raft_open_conn(raft, server->address, &server->sid);
1847 }
1848 if (raft->joining) {
1849 const char *address;
1850 SSET_FOR_EACH (address, &raft->remote_addresses) {
1851 raft_open_conn(raft, address, NULL);
1852 }
1853 }
1854
1855 if (!raft->joining && time_msec() >= raft->election_timeout) {
1856 if (raft->role == RAFT_LEADER) {
1857 /* If a majority of followers replied, reset election_timeout and
1858 * clear each server's 'replied' flag. Otherwise, become a
1859 * follower.
1860 *
1861 * Raft paper section 6.2: Leaders: A server might be in the leader
1862 * state, but if it isn’t the current leader, it could be
1863 * needlessly delaying client requests. For example, suppose a
1864 * leader is partitioned from the rest of the cluster, but it can
1865 * still communicate with a particular client. Without additional
1866 * mechanism, it could delay a request from that client forever,
1867 * being unable to replicate a log entry to any other servers.
1868 * Meanwhile, there might be another leader of a newer term that is
1869 * able to communicate with a majority of the cluster and would be
1870 * able to commit the client’s request. Thus, a leader in Raft
1871 * steps down if an election timeout elapses without a successful
1872 * round of heartbeats to a majority of its cluster; this allows
1873 * clients to retry their requests with another server. */
1874 int count = 0;
1875 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1876 if (server->replied) {
1877 count++;
1878 }
1879 }
1880 if (count >= hmap_count(&raft->servers) / 2) {
1881 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1882 server->replied = false;
1883 }
1884 raft_reset_election_timer(raft);
1885 } else {
1886 raft_become_follower(raft);
1887 raft_start_election(raft, false);
1888 }
1889 } else {
1890 raft_start_election(raft, false);
1891 }
1892
1893 }
1894
1895 if (raft->leaving && time_msec() >= raft->leave_timeout) {
1896 raft_send_remove_server_requests(raft);
1897 }
1898
1899 if (raft->joining && time_msec() >= raft->join_timeout) {
1900 raft->join_timeout = time_msec() + 1000;
1901 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1902 raft_send_add_server_request(raft, conn);
1903 }
1904 }
1905
1906 long long int now = time_msec();
1907 if (now >= raft->ping_timeout) {
1908 if (raft->role == RAFT_LEADER) {
1909 raft_send_heartbeats(raft);
1910 }
1911 /* Check whether any commands have timed out. The timeout is set to
1912 * twice the election timer so that commands can still complete during a
1913 * leader election. E.g. if a leader crashes and the current node, which
1914 * has pending commands, becomes the new leader, those pending commands
1915 * can still complete as long as the crashed leader replicated the
1916 * transactions to a majority of followers before it crashed. */
1917 struct raft_command *cmd, *next_cmd;
1918 HMAP_FOR_EACH_SAFE (cmd, next_cmd, hmap_node, &raft->commands) {
1919 if (cmd->timestamp
1920 && now - cmd->timestamp > raft->election_timer * 2) {
1921 raft_command_complete(raft, cmd, RAFT_CMD_TIMEOUT);
1922 }
1923 }
1924 raft_reset_ping_timer(raft);
1925 }
1926
1927 /* Do this only at the end; if we did it as soon as we set raft->left or
1928 * raft->failed in handling the RemoveServerReply, then it could easily
1929 * cause references to freed memory in RPC sessions, etc. */
1930 if (raft->left || raft->failed) {
1931 raft_close__(raft);
1932 }
1933 }
1934
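/* Illustrative sketch, not part of the implementation: how a caller (in
 * practice, ovsdb-server's storage layer) is expected to drive raft_run()
 * and raft_wait() from its main loop.  The function name
 * example_raft_main_loop() and the 'exiting' flag are hypothetical. */
static inline void
example_raft_main_loop(struct raft *raft, const bool *exiting)
{
    while (!*exiting) {
        raft_run(raft);     /* Make progress: RPCs, elections, log I/O. */
        raft_wait(raft);    /* Register the events to wake up for. */
        poll_block();       /* Sleep until one of them happens. */
    }
}
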
1935 static void
1936 raft_wait_session(struct jsonrpc_session *js)
1937 {
1938 if (js) {
1939 jsonrpc_session_wait(js);
1940 jsonrpc_session_recv_wait(js);
1941 }
1942 }
1943
1944 /* Causes the next call to poll_block() to wake up when 'raft' needs to do
1945 * something. */
1946 void
1947 raft_wait(struct raft *raft)
1948 {
1949 if (raft->left || raft->failed) {
1950 return;
1951 }
1952
1953 raft_waiters_wait(raft);
1954
1955 if (raft->listener) {
1956 pstream_wait(raft->listener);
1957 } else {
1958 poll_timer_wait_until(raft->listen_backoff);
1959 }
1960
1961 struct raft_conn *conn;
1962 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1963 raft_wait_session(conn->js);
1964 }
1965
1966 if (!raft->joining) {
1967 poll_timer_wait_until(raft->election_timeout);
1968 } else {
1969 poll_timer_wait_until(raft->join_timeout);
1970 }
1971 if (raft->leaving) {
1972 poll_timer_wait_until(raft->leave_timeout);
1973 }
1974 if (raft->role == RAFT_LEADER || !hmap_is_empty(&raft->commands)) {
1975 poll_timer_wait_until(raft->ping_timeout);
1976 }
1977 }
1978
1979 static struct raft_waiter *
1980 raft_waiter_create(struct raft *raft, enum raft_waiter_type type,
1981 bool start_commit)
1982 {
1983 struct raft_waiter *w = xzalloc(sizeof *w);
1984 ovs_list_push_back(&raft->waiters, &w->list_node);
1985 w->commit_ticket = start_commit ? ovsdb_log_commit_start(raft->log) : 0;
1986 w->type = type;
1987 return w;
1988 }
1989
1990 /* Returns a human-readable representation of 'status' (or NULL if 'status' is
1991 * invalid). */
1992 const char *
1993 raft_command_status_to_string(enum raft_command_status status)
1994 {
1995 switch (status) {
1996 case RAFT_CMD_INCOMPLETE:
1997 return "operation still in progress";
1998 case RAFT_CMD_SUCCESS:
1999 return "success";
2000 case RAFT_CMD_NOT_LEADER:
2001 return "not leader";
2002 case RAFT_CMD_BAD_PREREQ:
2003 return "prerequisite check failed";
2004 case RAFT_CMD_LOST_LEADERSHIP:
2005 return "lost leadership";
2006 case RAFT_CMD_SHUTDOWN:
2007 return "server shutdown";
2008 case RAFT_CMD_IO_ERROR:
2009 return "I/O error";
2010 case RAFT_CMD_TIMEOUT:
2011 return "timeout";
2012 default:
2013 return NULL;
2014 }
2015 }
2016
2017 /* Converts human-readable status in 's' into status code in '*statusp'.
2018 * Returns true if successful, false if 's' is unknown. */
2019 bool
2020 raft_command_status_from_string(const char *s,
2021 enum raft_command_status *statusp)
2022 {
2023 for (enum raft_command_status status = 0; ; status++) {
2024 const char *s2 = raft_command_status_to_string(status);
2025 if (!s2) {
2026 *statusp = 0;
2027 return false;
2028 } else if (!strcmp(s, s2)) {
2029 *statusp = status;
2030 return true;
2031 }
2032 }
2033 }
2034
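/* Illustrative sketch, not part of the implementation: for every valid
 * status, the two conversion functions above are inverses, which is what
 * allows a status to survive a round trip through a log message or a
 * unixctl reply.  The function name is hypothetical. */
static inline bool
example_status_round_trips(enum raft_command_status in)
{
    const char *s = raft_command_status_to_string(in);
    enum raft_command_status out;

    return s && raft_command_status_from_string(s, &out) && out == in;
}
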
2035 static const struct uuid *
2036 raft_get_eid(const struct raft *raft, uint64_t index)
2037 {
2038 for (; index >= raft->log_start; index--) {
2039 const struct raft_entry *e = raft_get_entry(raft, index);
2040 if (e->data) {
2041 return &e->eid;
2042 }
2043 }
2044 return &raft->snap.eid;
2045 }
2046
2047 const struct uuid *
2048 raft_current_eid(const struct raft *raft)
2049 {
2050 return raft_get_eid(raft, raft->log_end - 1);
2051 }
2052
2053 static struct raft_command *
2054 raft_command_create_completed(enum raft_command_status status)
2055 {
2056 ovs_assert(status != RAFT_CMD_INCOMPLETE);
2057
2058 struct raft_command *cmd = xzalloc(sizeof *cmd);
2059 cmd->n_refs = 1;
2060 cmd->status = status;
2061 return cmd;
2062 }
2063
2064 static struct raft_command *
2065 raft_command_create_incomplete(struct raft *raft, uint64_t index)
2066 {
2067 struct raft_command *cmd = xzalloc(sizeof *cmd);
2068 cmd->n_refs = 2; /* One for client, one for raft->commands. */
2069 cmd->index = index;
2070 cmd->status = RAFT_CMD_INCOMPLETE;
2071 hmap_insert(&raft->commands, &cmd->hmap_node, cmd->index);
2072 return cmd;
2073 }
2074
2075 static struct raft_command * OVS_WARN_UNUSED_RESULT
2076 raft_command_initiate(struct raft *raft,
2077 const struct json *data, const struct json *servers,
2078 uint64_t election_timer, const struct uuid *eid)
2079 {
2080 /* Write to local log. */
2081 uint64_t index = raft->log_end;
2082 if (!raft_handle_write_error(
2083 raft, raft_write_entry(
2084 raft, raft->term, json_nullable_clone(data), eid,
2085 json_nullable_clone(servers),
2086 election_timer))) {
2087 return raft_command_create_completed(RAFT_CMD_IO_ERROR);
2088 }
2089
2090 struct raft_command *cmd = raft_command_create_incomplete(raft, index);
2091 ovs_assert(eid);
2092 cmd->eid = *eid;
2093 cmd->timestamp = time_msec();
2094
2095 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index = cmd->index;
2096
2097 if (failure_test == FT_CRASH_BEFORE_SEND_APPEND_REQ) {
2098 ovs_fatal(0, "Raft test: crash before sending append_request.");
2099 }
2100 /* Write to remote logs. */
2101 struct raft_server *s;
2102 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2103 if (!uuid_equals(&s->sid, &raft->sid) && s->next_index == index) {
2104 raft_send_append_request(raft, s, 1, "execute command");
2105 s->next_index++;
2106 }
2107 }
2108 if (failure_test == FT_CRASH_AFTER_SEND_APPEND_REQ) {
2109 ovs_fatal(0, "Raft test: crash after sending append_request.");
2110 }
2111 raft_reset_ping_timer(raft);
2112
2113 return cmd;
2114 }
2115
2116 static void
2117 log_all_commands(struct raft *raft)
2118 {
2119 struct raft_command *cmd, *next;
2120 HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
2121 VLOG_DBG("raft command eid: "UUID_FMT, UUID_ARGS(&cmd->eid));
2122 }
2123 }
2124
2125 static struct raft_command * OVS_WARN_UNUSED_RESULT
2126 raft_command_execute__(struct raft *raft, const struct json *data,
2127 const struct json *servers, uint64_t election_timer,
2128 const struct uuid *prereq, struct uuid *result)
2129 {
2130 if (raft->joining || raft->leaving || raft->left || raft->failed) {
2131 return raft_command_create_completed(RAFT_CMD_SHUTDOWN);
2132 }
2133
2134 if (raft->role != RAFT_LEADER) {
2135 /* Consider proxying the command to the leader. We can only do that if
2136 * we know the leader and the command does not change the set of
2137 * servers. We do not proxy commands without prerequisites, even
2138 * though we could, because in an OVSDB context a log entry doesn't
2139 * make sense without its prerequisite. */
2140 if (servers || election_timer || !data
2141 || raft->role != RAFT_FOLLOWER || uuid_is_zero(&raft->leader_sid)
2142 || !prereq) {
2143 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
2144 }
2145 }
2146
2147 struct uuid eid = data ? uuid_random() : UUID_ZERO;
2148 if (result) {
2149 *result = eid;
2150 }
2151
2152 if (raft->role != RAFT_LEADER) {
2153 const union raft_rpc rpc = {
2154 .execute_command_request = {
2155 .common = {
2156 .type = RAFT_RPC_EXECUTE_COMMAND_REQUEST,
2157 .sid = raft->leader_sid,
2158 },
2159 .data = CONST_CAST(struct json *, data),
2160 .prereq = *prereq,
2161 .result = eid,
2162 }
2163 };
2164 if (failure_test == FT_CRASH_BEFORE_SEND_EXEC_REQ) {
2165 ovs_fatal(0, "Raft test: crash before sending "
2166 "execute_command_request");
2167 }
2168 if (!raft_send(raft, &rpc)) {
2169 /* Couldn't send command, so it definitely failed. */
2170 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
2171 }
2172 if (failure_test == FT_CRASH_AFTER_SEND_EXEC_REQ) {
2173 ovs_fatal(0, "Raft test: crash after sending "
2174 "execute_command_request");
2175 }
2176
2177 struct raft_command *cmd = raft_command_create_incomplete(raft, 0);
2178 cmd->timestamp = time_msec();
2179 cmd->eid = eid;
2180 log_all_commands(raft);
2181 return cmd;
2182 }
2183
2184 const struct uuid *current_eid = raft_current_eid(raft);
2185 if (prereq && !uuid_equals(prereq, current_eid)) {
2186 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
2187 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
2188 "prerequisite "UUID_FMT,
2189 UUID_ARGS(current_eid), UUID_ARGS(prereq));
2190 return raft_command_create_completed(RAFT_CMD_BAD_PREREQ);
2191 }
2192
2193 return raft_command_initiate(raft, data, servers, election_timer, &eid);
2194 }
2195
2196 /* Initiates appending a log entry to 'raft'. The log entry consists of
2197 * 'data'. If 'prereq' is nonnull, the entry is added only if the most
2198 * recent entry in the log has entry ID 'prereq'. If 'result' is nonnull,
2199 * it is populated with the entry ID for the new log entry.
2200 *
2201 * Returns a "struct raft_command" that may be used to track progress adding
2202 * the log entry. The caller must eventually free the returned structure, with
2203 * raft_command_unref(). */
2204 struct raft_command * OVS_WARN_UNUSED_RESULT
2205 raft_command_execute(struct raft *raft, const struct json *data,
2206 const struct uuid *prereq, struct uuid *result)
2207 {
2208 return raft_command_execute__(raft, data, NULL, 0, prereq, result);
2209 }
2210
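/* Illustrative sketch, not part of the implementation: a blocking wrapper
 * around the command interface above, showing the intended life cycle of a
 * "struct raft_command".  The function name example_execute_blocking() is
 * hypothetical; 'data' would be the JSON form of an OVSDB transaction. */
static inline enum raft_command_status
example_execute_blocking(struct raft *raft, const struct json *data)
{
    struct raft_command *cmd = raft_command_execute(raft, data, NULL, NULL);
    enum raft_command_status status;

    /* Drive the cluster until the command reaches a final status. */
    while ((status = raft_command_get_status(cmd)) == RAFT_CMD_INCOMPLETE) {
        raft_run(raft);
        raft_command_wait(cmd);
        raft_wait(raft);
        poll_block();
    }
    raft_command_unref(cmd);
    return status;
}
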
2211 /* Returns the status of 'cmd'. */
2212 enum raft_command_status
2213 raft_command_get_status(const struct raft_command *cmd)
2214 {
2215 ovs_assert(cmd->n_refs > 0);
2216 return cmd->status;
2217 }
2218
2219 /* Returns the index of the log entry at which 'cmd' was committed.
2220 *
2221 * This function works only with successful commands. */
2222 uint64_t
2223 raft_command_get_commit_index(const struct raft_command *cmd)
2224 {
2225 ovs_assert(cmd->n_refs > 0);
2226 ovs_assert(cmd->status == RAFT_CMD_SUCCESS);
2227 return cmd->index;
2228 }
2229
2230 /* Frees 'cmd'. */
2231 void
2232 raft_command_unref(struct raft_command *cmd)
2233 {
2234 if (cmd) {
2235 ovs_assert(cmd->n_refs > 0);
2236 if (!--cmd->n_refs) {
2237 free(cmd);
2238 }
2239 }
2240 }
2241
2242 /* Causes poll_block() to wake up when 'cmd' has status to report. */
2243 void
2244 raft_command_wait(const struct raft_command *cmd)
2245 {
2246 if (cmd->status != RAFT_CMD_INCOMPLETE) {
2247 poll_immediate_wake();
2248 }
2249 }
2250
2251 static void
2252 raft_command_complete(struct raft *raft,
2253 struct raft_command *cmd,
2254 enum raft_command_status status)
2255 {
2256 VLOG_DBG("raft_command_complete eid "UUID_FMT" status: %s",
2257 UUID_ARGS(&cmd->eid), raft_command_status_to_string(status));
2258 if (!uuid_is_zero(&cmd->sid)) {
2259 uint64_t commit_index = status == RAFT_CMD_SUCCESS ? cmd->index : 0;
2260 raft_send_execute_command_reply(raft, &cmd->sid, &cmd->eid, status,
2261 commit_index);
2262 }
2263
2264 ovs_assert(cmd->status == RAFT_CMD_INCOMPLETE);
2265 ovs_assert(cmd->n_refs > 0);
2266 hmap_remove(&raft->commands, &cmd->hmap_node);
2267 cmd->status = status;
2268 raft_command_unref(cmd);
2269 }
2270
2271 static void
2272 raft_complete_all_commands(struct raft *raft, enum raft_command_status status)
2273 {
2274 struct raft_command *cmd, *next;
2275 HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
2276 raft_command_complete(raft, cmd, status);
2277 }
2278 }
2279
2280 static struct raft_command *
2281 raft_find_command_by_eid(struct raft *raft, const struct uuid *eid)
2282 {
2283 struct raft_command *cmd;
2284
2285 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2286 if (uuid_equals(&cmd->eid, eid)) {
2287 return cmd;
2288 }
2289 }
2290 return NULL;
2291 }
2292 \f
2293 #define RAFT_RPC(ENUM, NAME) \
2294 static void raft_handle_##NAME(struct raft *, const struct raft_##NAME *);
2295 RAFT_RPC_TYPES
2296 #undef RAFT_RPC
2297
2298 static void
2299 raft_handle_hello_request(struct raft *raft OVS_UNUSED,
2300 const struct raft_hello_request *hello OVS_UNUSED)
2301 {
2302 }
2303
2304 /* 'sid' is the server being added. */
2305 static void
2306 raft_send_add_server_reply__(struct raft *raft, const struct uuid *sid,
2307 const char *address,
2308 bool success, const char *comment)
2309 {
2310 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2311 if (!VLOG_DROP_INFO(&rl)) {
2312 struct ds s = DS_EMPTY_INITIALIZER;
2313 char buf[SID_LEN + 1];
2314 ds_put_format(&s, "adding %s ("SID_FMT" at %s) "
2315 "to cluster "CID_FMT" %s",
2316 raft_get_nickname(raft, sid, buf, sizeof buf),
2317 SID_ARGS(sid), address, CID_ARGS(&raft->cid),
2318 success ? "succeeded" : "failed");
2319 if (comment) {
2320 ds_put_format(&s, " (%s)", comment);
2321 }
2322 VLOG_INFO("%s", ds_cstr(&s));
2323 ds_destroy(&s);
2324 }
2325
2326 union raft_rpc rpy = {
2327 .add_server_reply = {
2328 .common = {
2329 .type = RAFT_RPC_ADD_SERVER_REPLY,
2330 .sid = *sid,
2331 .comment = CONST_CAST(char *, comment),
2332 },
2333 .success = success,
2334 }
2335 };
2336
2337 struct sset *remote_addresses = &rpy.add_server_reply.remote_addresses;
2338 sset_init(remote_addresses);
2339 if (!raft->joining) {
2340 struct raft_server *s;
2341 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2342 if (!uuid_equals(&s->sid, &raft->sid)) {
2343 sset_add(remote_addresses, s->address);
2344 }
2345 }
2346 }
2347
2348 raft_send(raft, &rpy);
2349
2350 sset_destroy(remote_addresses);
2351 }
2352
2353 static void
2354 raft_send_remove_server_reply_rpc(struct raft *raft,
2355 const struct uuid *dst_sid,
2356 const struct uuid *target_sid,
2357 bool success, const char *comment)
2358 {
2359 if (uuid_equals(&raft->sid, dst_sid)) {
2360 if (success && uuid_equals(&raft->sid, target_sid)) {
2361 raft_finished_leaving_cluster(raft);
2362 }
2363 return;
2364 }
2365
2366 const union raft_rpc rpy = {
2367 .remove_server_reply = {
2368 .common = {
2369 .type = RAFT_RPC_REMOVE_SERVER_REPLY,
2370 .sid = *dst_sid,
2371 .comment = CONST_CAST(char *, comment),
2372 },
2373 .target_sid = (uuid_equals(dst_sid, target_sid)
2374 ? UUID_ZERO
2375 : *target_sid),
2376 .success = success,
2377 }
2378 };
2379 raft_send(raft, &rpy);
2380 }
2381
2382 static void
2383 raft_send_remove_server_reply__(struct raft *raft,
2384 const struct uuid *target_sid,
2385 const struct uuid *requester_sid,
2386 struct unixctl_conn *requester_conn,
2387 bool success, const char *comment)
2388 {
2389 struct ds s = DS_EMPTY_INITIALIZER;
2390 ds_put_format(&s, "request ");
2391 if (!uuid_is_zero(requester_sid)) {
2392 char buf[SID_LEN + 1];
2393 ds_put_format(&s, "by %s",
2394 raft_get_nickname(raft, requester_sid, buf, sizeof buf));
2395 } else {
2396 ds_put_cstr(&s, "via unixctl");
2397 }
2398 ds_put_cstr(&s, " to remove ");
2399 if (!requester_conn && uuid_equals(target_sid, requester_sid)) {
2400 ds_put_cstr(&s, "itself");
2401 } else {
2402 char buf[SID_LEN + 1];
2403 ds_put_cstr(&s, raft_get_nickname(raft, target_sid, buf, sizeof buf));
2404 if (uuid_equals(target_sid, &raft->sid)) {
2405 ds_put_cstr(&s, " (ourselves)");
2406 }
2407 }
2408 ds_put_format(&s, " from cluster "CID_FMT" %s",
2409 CID_ARGS(&raft->cid),
2410 success ? "succeeded" : "failed");
2411 if (comment) {
2412 ds_put_format(&s, " (%s)", comment);
2413 }
2414
2415 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2416 VLOG_INFO_RL(&rl, "%s", ds_cstr(&s));
2417
2418 /* Send RemoveServerReply to the requester (which could be a server or a
2419 * unixctl connection). Also always send it to the removed server; this
2420 * lets it be sure that it was really removed, so that it can update its
2421 * log and disconnect permanently. */
2422 if (!uuid_is_zero(requester_sid)) {
2423 raft_send_remove_server_reply_rpc(raft, requester_sid, target_sid,
2424 success, comment);
2425 }
2426 if (!uuid_equals(requester_sid, target_sid)) {
2427 raft_send_remove_server_reply_rpc(raft, target_sid, target_sid,
2428 success, comment);
2429 }
2430 if (requester_conn) {
2431 if (success) {
2432 unixctl_command_reply(requester_conn, ds_cstr(&s));
2433 } else {
2434 unixctl_command_reply_error(requester_conn, ds_cstr(&s));
2435 }
2436 }
2437
2438 ds_destroy(&s);
2439 }
2440
2441 static void
2442 raft_send_add_server_reply(struct raft *raft,
2443 const struct raft_add_server_request *rq,
2444 bool success, const char *comment)
2445 {
2446 return raft_send_add_server_reply__(raft, &rq->common.sid, rq->address,
2447 success, comment);
2448 }
2449
2450 static void
2451 raft_send_remove_server_reply(struct raft *raft,
2452 const struct raft_remove_server_request *rq,
2453 bool success, const char *comment)
2454 {
2455 return raft_send_remove_server_reply__(raft, &rq->sid, &rq->common.sid,
2456 rq->requester_conn, success,
2457 comment);
2458 }
2459
2460 static void
2461 raft_become_follower(struct raft *raft)
2462 {
2463 raft->leader_sid = UUID_ZERO;
2464 if (raft->role == RAFT_FOLLOWER) {
2465 return;
2466 }
2467
2468 raft->role = RAFT_FOLLOWER;
2469 raft_reset_election_timer(raft);
2470
2471 /* Notify clients about lost leadership.
2472 *
2473 * We do not reverse our changes to 'raft->servers' because the new
2474 * configuration is already part of the log. Possibly the configuration
2475 * log entry will not be committed, but until we know that we must use the
2476 * new configuration. Our AppendEntries processing will properly update
2477 * the server configuration later, if necessary. */
2478 struct raft_server *s;
2479 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
2480 raft_send_add_server_reply__(raft, &s->sid, s->address, false,
2481 RAFT_SERVER_LOST_LEADERSHIP);
2482 }
2483 if (raft->remove_server) {
2484 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
2485 &raft->remove_server->requester_sid,
2486 raft->remove_server->requester_conn,
2487 false, RAFT_SERVER_LOST_LEADERSHIP);
2488 raft_server_destroy(raft->remove_server);
2489 raft->remove_server = NULL;
2490 }
2491
2492 raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
2493 }
2494
2495 static void
2496 raft_send_append_request(struct raft *raft,
2497 struct raft_server *peer, unsigned int n,
2498 const char *comment)
2499 {
2500 ovs_assert(raft->role == RAFT_LEADER);
2501
2502 const union raft_rpc rq = {
2503 .append_request = {
2504 .common = {
2505 .type = RAFT_RPC_APPEND_REQUEST,
2506 .sid = peer->sid,
2507 .comment = CONST_CAST(char *, comment),
2508 },
2509 .term = raft->term,
2510 .prev_log_index = peer->next_index - 1,
2511 .prev_log_term = (peer->next_index - 1 >= raft->log_start
2512 ? raft->entries[peer->next_index - 1
2513 - raft->log_start].term
2514 : raft->snap.term),
2515 .leader_commit = raft->commit_index,
2516 .entries = &raft->entries[peer->next_index - raft->log_start],
2517 .n_entries = n,
2518 },
2519 };
2520 raft_send(raft, &rq);
2521 }
2522
2523 static void
2524 raft_send_heartbeats(struct raft *raft)
2525 {
2526 struct raft_server *s;
2527 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2528 if (!uuid_equals(&raft->sid, &s->sid)) {
2529 raft_send_append_request(raft, s, 0, "heartbeat");
2530 }
2531 }
2532
2533 /* Send a ping to anyone waiting for a command to complete, to let
2534 * them know we're still working on it. */
2535 struct raft_command *cmd;
2536 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2537 if (!uuid_is_zero(&cmd->sid)) {
2538 raft_send_execute_command_reply(raft, &cmd->sid,
2539 &cmd->eid,
2540 RAFT_CMD_INCOMPLETE, 0);
2541 }
2542 }
2543
2544 raft_reset_ping_timer(raft);
2545 }
2546
2547 /* Initializes the fields in 's' that represent the leader's view of the
2548 * server. */
2549 static void
2550 raft_server_init_leader(struct raft *raft, struct raft_server *s)
2551 {
2552 s->next_index = raft->log_end;
2553 s->match_index = 0;
2554 s->phase = RAFT_PHASE_STABLE;
2555 s->replied = false;
2556 }
2557
2558 static void
2559 raft_set_leader(struct raft *raft, const struct uuid *sid)
2560 {
2561 raft->leader_sid = *sid;
2562 raft->ever_had_leader = raft->had_leader = true;
2563 raft->candidate_retrying = false;
2564 }
2565
2566 static void
2567 raft_become_leader(struct raft *raft)
2568 {
2569 log_all_commands(raft);
2570
2571 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
2572 VLOG_INFO_RL(&rl, "term %"PRIu64": elected leader by %d+ of "
2573 "%"PRIuSIZE" servers", raft->term,
2574 raft->n_votes, hmap_count(&raft->servers));
2575
2576 ovs_assert(raft->role != RAFT_LEADER);
2577 raft->role = RAFT_LEADER;
2578 raft_set_leader(raft, &raft->sid);
2579 raft_reset_election_timer(raft);
2580 raft_reset_ping_timer(raft);
2581
2582 struct raft_server *s;
2583 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2584 raft_server_init_leader(raft, s);
2585 }
2586
2587 raft->election_timer_new = 0;
2588
2589 raft_update_our_match_index(raft, raft->log_end - 1);
2590
2591 /* Write the fact that we are leader to the log. This is not used by the
2592 * algorithm (although it could be, for quick restart), but it is used for
2593 * offline analysis to check for conformance with the properties that Raft
2594 * guarantees. */
2595 struct raft_record r = {
2596 .type = RAFT_REC_LEADER,
2597 .term = raft->term,
2598 .sid = raft->sid,
2599 };
2600 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2601
2602 /* Initiate a no-op commit. Otherwise we might never find out what's in
2603 * the log. See section 6.4 item 1:
2604 *
2605 * The Leader Completeness Property guarantees that a leader has all
2606 * committed entries, but at the start of its term, it may not know
2607 * which those are. To find out, it needs to commit an entry from its
2608 * term. Raft handles this by having each leader commit a blank no-op
2609 * entry into the log at the start of its term. As soon as this no-op
2610 * entry is committed, the leader’s commit index will be at least as
2611 * large as any other servers’ during its term.
2612 */
2613 raft_command_unref(raft_command_execute__(raft, NULL, NULL, 0, NULL,
2614 NULL));
2615 }
2616
2617 /* Processes term 'term' received as part of RPC 'common'. Returns true if the
2618 * caller should continue processing the RPC, false if the caller should reject
2619 * it due to a stale term. */
2620 static bool
2621 raft_receive_term__(struct raft *raft, const struct raft_rpc_common *common,
2622 uint64_t term)
2623 {
2624 /* Section 3.3 says:
2625 *
2626 * Current terms are exchanged whenever servers communicate; if one
2627 * server’s current term is smaller than the other’s, then it updates
2628 * its current term to the larger value. If a candidate or leader
2629 * discovers that its term is out of date, it immediately reverts to
2630 * follower state. If a server receives a request with a stale term
2631 * number, it rejects the request.
2632 */
2633 if (term > raft->term) {
2634 if (!raft_set_term(raft, term, NULL)) {
2635 /* Failed to update the term to 'term'. */
2636 return false;
2637 }
2638 raft_become_follower(raft);
2639 } else if (term < raft->term) {
2640 char buf[SID_LEN + 1];
2641 VLOG_INFO("rejecting term %"PRIu64" < current term %"PRIu64" received "
2642 "in %s message from server %s",
2643 term, raft->term,
2644 raft_rpc_type_to_string(common->type),
2645 raft_get_nickname(raft, &common->sid, buf, sizeof buf));
2646 return false;
2647 }
2648 return true;
2649 }
2650
2651 static void
2652 raft_get_servers_from_log(struct raft *raft, enum vlog_level level)
2653 {
2654 const struct json *servers_json = raft->snap.servers;
2655 for (uint64_t index = raft->log_end - 1; index >= raft->log_start;
2656 index--) {
2657 struct raft_entry *e = &raft->entries[index - raft->log_start];
2658 if (e->servers) {
2659 servers_json = e->servers;
2660 break;
2661 }
2662 }
2663
2664 struct hmap servers;
2665 struct ovsdb_error *error = raft_servers_from_json(servers_json, &servers);
2666 ovs_assert(!error);
2667 raft_set_servers(raft, &servers, level);
2668 raft_servers_destroy(&servers);
2669 }
2670
2671 /* Truncates the log, so that raft->log_end becomes 'new_end'.
2672 *
2673 * Doesn't write anything to disk. In theory, we could truncate the on-disk
2674 * log file, but we don't have the right information to know how long it should
2675 * be. What we actually do is to append entries for older indexes to the
2676 * on-disk log; when we re-read it later, these entries truncate the log.
2677 *
2678 * Returns true if any of the removed log entries were server configuration
2679 * entries, false otherwise. */
2680 static bool
2681 raft_truncate(struct raft *raft, uint64_t new_end)
2682 {
2683 ovs_assert(new_end >= raft->log_start);
2684 if (raft->log_end > new_end) {
2685 char buf[SID_LEN + 1];
2686 VLOG_INFO("%s truncating %"PRIu64 " entries from end of log",
2687 raft_get_nickname(raft, &raft->sid, buf, sizeof buf),
2688 raft->log_end - new_end);
2689 }
2690
2691 bool servers_changed = false;
2692 while (raft->log_end > new_end) {
2693 struct raft_entry *entry = &raft->entries[--raft->log_end
2694 - raft->log_start];
2695 if (entry->servers) {
2696 servers_changed = true;
2697 }
2698 raft_entry_uninit(entry);
2699 }
2700 return servers_changed;
2701 }
2702
2703 static const struct json *
2704 raft_peek_next_entry(struct raft *raft, struct uuid *eid)
2705 {
2706 /* Invariant: log_start - 2 <= last_applied <= commit_index < log_end. */
2707 ovs_assert(raft->log_start <= raft->last_applied + 2);
2708 ovs_assert(raft->last_applied <= raft->commit_index);
2709 ovs_assert(raft->commit_index < raft->log_end);
2710
2711 if (raft->joining || raft->failed) {
2712 return NULL;
2713 }
2714
2715 if (raft->log_start == raft->last_applied + 2) {
2716 *eid = raft->snap.eid;
2717 return raft->snap.data;
2718 }
2719
2720 while (raft->last_applied < raft->commit_index) {
2721 const struct raft_entry *e = raft_get_entry(raft,
2722 raft->last_applied + 1);
2723 if (e->data) {
2724 *eid = e->eid;
2725 return e->data;
2726 }
2727 raft->last_applied++;
2728 }
2729 return NULL;
2730 }
2731
2732 static const struct json *
2733 raft_get_next_entry(struct raft *raft, struct uuid *eid)
2734 {
2735 const struct json *data = raft_peek_next_entry(raft, eid);
2736 if (data) {
2737 raft->last_applied++;
2738 }
2739 return data;
2740 }
2741
2742 /* Updates the commit index in the raft log. If the commit index is already
2743 * up-to-date, this does nothing and returns false; otherwise, returns true. */
2744 static bool
2745 raft_update_commit_index(struct raft *raft, uint64_t new_commit_index)
2746 {
2747 if (new_commit_index <= raft->commit_index) {
2748 return false;
2749 }
2750
2751 if (raft->role == RAFT_LEADER) {
2752 while (raft->commit_index < new_commit_index) {
2753 uint64_t index = ++raft->commit_index;
2754 const struct raft_entry *e = raft_get_entry(raft, index);
2755 if (e->data) {
2756 struct raft_command *cmd
2757 = raft_find_command_by_eid(raft, &e->eid);
2758 if (cmd) {
2759 if (!cmd->index) {
2760 VLOG_DBG("Command completed after role change from"
2761 " follower to leader "UUID_FMT,
2762 UUID_ARGS(&e->eid));
2763 cmd->index = index;
2764 }
2765 raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
2766 }
2767 }
2768 if (e->election_timer) {
2769 VLOG_INFO("Election timer changed from %"PRIu64" to %"PRIu64,
2770 raft->election_timer, e->election_timer);
2771 raft->election_timer = e->election_timer;
2772 raft->election_timer_new = 0;
2773 }
2774 if (e->servers) {
2775 /* raft_run_reconfigure() can write a new Raft entry, which can
2776 * reallocate raft->entries, which would invalidate 'e', so
2777 * this case must be last, after the one for 'e->data'. */
2778 raft_run_reconfigure(raft);
2779 }
2780 }
2781 } else {
2782 while (raft->commit_index < new_commit_index) {
2783 uint64_t index = ++raft->commit_index;
2784 const struct raft_entry *e = raft_get_entry(raft, index);
2785 if (e->election_timer) {
2786 VLOG_INFO("Election timer changed from %"PRIu64" to %"PRIu64,
2787 raft->election_timer, e->election_timer);
2788 raft->election_timer = e->election_timer;
2789 }
2790 }
2791 /* Check if any pending command can be completed, and complete it.
2792 * This can happen when a leader fails over before sending an
2793 * execute_command_reply. */
2794 const struct uuid *eid = raft_get_eid(raft, new_commit_index);
2795 struct raft_command *cmd = raft_find_command_by_eid(raft, eid);
2796 if (cmd) {
2797 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
2798 VLOG_INFO_RL(&rl,
2799 "Command completed without reply (eid: "UUID_FMT", "
2800 "commit index: %"PRIu64")",
2801 UUID_ARGS(eid), new_commit_index);
2802 cmd->index = new_commit_index;
2803 raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
2804 }
2805 }
2806
2807 /* Write the commit index to the log. The next time we restart, this
2808 * allows us to start exporting a reasonably fresh log, instead of a log
2809 * that only contains the snapshot. */
2810 struct raft_record r = {
2811 .type = RAFT_REC_COMMIT_INDEX,
2812 .commit_index = raft->commit_index,
2813 };
2814 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2815 return true;
2816 }
2817
2818 /* This doesn't use rq->entries (but it does use rq->n_entries). */
2819 static void
2820 raft_send_append_reply(struct raft *raft, const struct raft_append_request *rq,
2821 enum raft_append_result result, const char *comment)
2822 {
2823 /* Figure 3.1: "If leaderCommit > commitIndex, set commitIndex =
2824 * min(leaderCommit, index of last new entry)" */
2825 if (result == RAFT_APPEND_OK && rq->leader_commit > raft->commit_index) {
2826 raft_update_commit_index(
2827 raft, MIN(rq->leader_commit, rq->prev_log_index + rq->n_entries));
2828 }
2829
2830 /* Send reply. */
2831 union raft_rpc reply = {
2832 .append_reply = {
2833 .common = {
2834 .type = RAFT_RPC_APPEND_REPLY,
2835 .sid = rq->common.sid,
2836 .comment = CONST_CAST(char *, comment),
2837 },
2838 .term = raft->term,
2839 .log_end = raft->log_end,
2840 .prev_log_index = rq->prev_log_index,
2841 .prev_log_term = rq->prev_log_term,
2842 .n_entries = rq->n_entries,
2843 .result = result,
2844 }
2845 };
2846 raft_send(raft, &reply);
2847 }
2848
2849 /* If 'prev_log_index' exists in 'raft''s log, in term 'prev_log_term', returns
2850 * NULL. Otherwise, returns an explanation for the mismatch. */
2851 static const char *
2852 match_index_and_term(const struct raft *raft,
2853 uint64_t prev_log_index, uint64_t prev_log_term)
2854 {
2855 if (prev_log_index < raft->log_start - 1) {
2856 return "mismatch before start of log";
2857 } else if (prev_log_index == raft->log_start - 1) {
2858 if (prev_log_term != raft->snap.term) {
2859 return "prev_term mismatch";
2860 }
2861 } else if (prev_log_index < raft->log_end) {
2862 if (raft->entries[prev_log_index - raft->log_start].term
2863 != prev_log_term) {
2864 return "term mismatch";
2865 }
2866 } else {
2867 /* prev_log_index >= raft->log_end */
2868 return "mismatch past end of log";
2869 }
2870 return NULL;
2871 }
2872
2873 static void
2874 raft_handle_append_entries(struct raft *raft,
2875 const struct raft_append_request *rq,
2876 uint64_t prev_log_index, uint64_t prev_log_term,
2877 const struct raft_entry *entries,
2878 unsigned int n_entries)
2879 {
2880 /* Section 3.5: "When sending an AppendEntries RPC, the leader includes
2881 * the index and term of the entry in its log that immediately precedes
2882 * the new entries. If the follower does not find an entry in its log
2883 * with the same index and term, then it refuses the new entries." */
2884 const char *mismatch = match_index_and_term(raft, prev_log_index,
2885 prev_log_term);
2886 if (mismatch) {
2887 VLOG_INFO("rejecting append_request because previous entry "
2888 "%"PRIu64",%"PRIu64" not in local log (%s)",
2889 prev_log_term, prev_log_index, mismatch);
2890 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY, mismatch);
2891 return;
2892 }
2893
2894 /* Figure 3.1: "If an existing entry conflicts with a new one (same
2895 * index but different terms), delete the existing entry and all that
2896 * follow it." */
2897 unsigned int i;
2898 bool servers_changed = false;
2899 for (i = 0; ; i++) {
2900 if (i >= n_entries) {
2901 /* No change. */
2902 if (rq->common.comment
2903 && !strcmp(rq->common.comment, "heartbeat")) {
2904 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "heartbeat");
2905 } else {
2906 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
2907 }
2908 return;
2909 }
2910
2911 uint64_t log_index = (prev_log_index + 1) + i;
2912 if (log_index >= raft->log_end) {
2913 break;
2914 }
2915 if (raft->entries[log_index - raft->log_start].term
2916 != entries[i].term) {
2917 if (raft_truncate(raft, log_index)) {
2918 servers_changed = true;
2919 }
2920 break;
2921 }
2922 }
2923
2924 if (failure_test == FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE) {
2925 ovs_fatal(0, "Raft test: crash after receiving append_request with "
2926 "update.");
2927 }
2928 /* Figure 3.1: "Append any entries not already in the log." */
2929 struct ovsdb_error *error = NULL;
2930 bool any_written = false;
2931 for (; i < n_entries; i++) {
2932 const struct raft_entry *e = &entries[i];
2933 error = raft_write_entry(raft, e->term,
2934 json_nullable_clone(e->data), &e->eid,
2935 json_nullable_clone(e->servers),
2936 e->election_timer);
2937 if (error) {
2938 break;
2939 }
2940 any_written = true;
2941 if (e->servers) {
2942 servers_changed = true;
2943 }
2944 }
2945
2946 if (any_written) {
2947 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index
2948 = raft->log_end - 1;
2949 }
2950 if (servers_changed) {
2951 /* The set of servers might have changed; check. */
2952 raft_get_servers_from_log(raft, VLL_INFO);
2953 }
2954
2955 if (error) {
2956 char *s = ovsdb_error_to_string_free(error);
2957 VLOG_ERR("%s", s);
2958 free(s);
2959 raft_send_append_reply(raft, rq, RAFT_APPEND_IO_ERROR, "I/O error");
2960 return;
2961 }
2962
2963 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "log updated");
2964 }
2965
2966 static bool
2967 raft_update_leader(struct raft *raft, const struct uuid *sid)
2968 {
2969 if (raft->role == RAFT_LEADER) {
2970 char buf[SID_LEN + 1];
2971 VLOG_ERR("this server is leader but server %s claims to be",
2972 raft_get_nickname(raft, sid, buf, sizeof buf));
2973 return false;
2974 } else if (!uuid_equals(sid, &raft->leader_sid)) {
2975 if (!uuid_is_zero(&raft->leader_sid)) {
2976 char buf1[SID_LEN + 1];
2977 char buf2[SID_LEN + 1];
2978 VLOG_ERR("leader for term %"PRIu64" changed from %s to %s",
2979 raft->term,
2980 raft_get_nickname(raft, &raft->leader_sid,
2981 buf1, sizeof buf1),
2982 raft_get_nickname(raft, sid, buf2, sizeof buf2));
2983 } else {
2984 char buf[SID_LEN + 1];
2985 VLOG_INFO("server %s is leader for term %"PRIu64,
2986 raft_get_nickname(raft, sid, buf, sizeof buf),
2987 raft->term);
2988 }
2989 raft_set_leader(raft, sid);
2990
2991 /* Record the leader to the log. This is not used by the algorithm
2992 * (although it could be, for quick restart), but it is used for
2993 * offline analysis to check for conformance with the properties
2994 * that Raft guarantees. */
2995 struct raft_record r = {
2996 .type = RAFT_REC_LEADER,
2997 .term = raft->term,
2998 .sid = *sid,
2999 };
3000 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
3001 }
3002 if (raft->role == RAFT_CANDIDATE) {
3003 /* Section 3.4: While waiting for votes, a candidate may
3004 * receive an AppendEntries RPC from another server claiming to
3005 * be leader. If the leader’s term (included in its RPC) is at
3006 * least as large as the candidate’s current term, then the
3007 * candidate recognizes the leader as legitimate and returns to
3008 * follower state. */
3009 raft->role = RAFT_FOLLOWER;
3010 }
3011 return true;
3012 }
3013
3014 static void
3015 raft_handle_append_request(struct raft *raft,
3016 const struct raft_append_request *rq)
3017 {
3018 /* We do not check whether the server that sent the request is part of the
3019 * cluster. As section 4.1 says, "A server accepts AppendEntries requests
3020 * from a leader that is not part of the server’s latest configuration.
3021 * Otherwise, a new server could never be added to the cluster (it would
3022 * never accept any log entries preceding the configuration entry that adds
3023 * the server)." */
3024 if (!raft_update_leader(raft, &rq->common.sid)) {
3025 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
3026 "usurped leadership");
3027 return;
3028 }
3029 raft_reset_election_timer(raft);
3030
3031 /* First check for the common case, where the AppendEntries request is
3032 * entirely for indexes covered by 'log_start' ... 'log_end - 1', something
3033 * like this:
3034 *
3035 * [Diagram: the request's entries run from first_entry_index
3036 * (rq->prev_log_index + 1) through nth_entry_index, and every one of
3037 * those indexes falls inside our own log, between log_start and
3038 * log_end - 1.]
3039 * */
3050 uint64_t first_entry_index = rq->prev_log_index + 1;
3051 uint64_t nth_entry_index = rq->prev_log_index + rq->n_entries;
3052 if (OVS_LIKELY(first_entry_index >= raft->log_start)) {
3053 raft_handle_append_entries(raft, rq,
3054 rq->prev_log_index, rq->prev_log_term,
3055 rq->entries, rq->n_entries);
3056 return;
3057 }
3058
3059 /* Now a series of checks for odd cases, where the AppendEntries request
3060 * extends earlier than the beginning of our log, into the log entries
3061 * discarded by the most recent snapshot. */
3062
3063 /*
3064 * Handle the case where the indexes covered by rq->entries[] are entirely
3065 * disjoint from 'log_start - 1' ... 'log_end - 1', as shown below. So,
3066 * everything in the AppendEntries request must already have been
3067 * committed, and we might as well reply with success.
3068 *
3069 * [Diagram: the request's range, rq->prev_log_index through
3070 * nth_entry_index, ends before log_start - 1, entirely disjoint from
3071 * the part of the log that we still have.]
3072 */
3084 if (nth_entry_index < raft->log_start - 1) {
3085 raft_send_append_reply(raft, rq, RAFT_APPEND_OK,
3086 "append before log start");
3087 return;
3088 }
3089
3090 /*
3091 * Handle the case where the last entry in rq->entries[] has the same index
3092 * as 'log_start - 1', so we can compare their terms:
3093 *
3094 * [Diagram: nth_entry_index lines up exactly with log_start - 1, the
3095 * last index covered by the snapshot, so the term of the request's
3096 * last entry can be compared against raft->snap.term.]
3097 *
3098 * There's actually a sub-case where rq->n_entries == 0, in which we
3099 * compare rq->prev_log_term:
3100 *
3101 * [Diagram: with no entries in the request, rq->prev_log_index itself
3102 * lines up with log_start - 1, so rq->prev_log_term is compared
3103 * against raft->snap.term instead.]
3104 */
3126 if (nth_entry_index == raft->log_start - 1) {
3127 if (rq->n_entries
3128 ? raft->snap.term == rq->entries[rq->n_entries - 1].term
3129 : raft->snap.term == rq->prev_log_term) {
3130 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
3131 } else {
3132 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
3133 "term mismatch");
3134 }
3135 return;
3136 }
3137
3138 /*
3139 * We now know that the data in rq->entries[] overlaps the data in
3140 * raft->entries[], as shown below, with some positive 'ofs':
3141 *
3142 * [Diagram: the request starts before log_start; the first 'ofs'
3143 * entries of rq->entries[] (ofs = log_start - first_entry_index)
3144 * cover indexes we have already discarded, and the rest overlap
3145 * raft->entries[].]
3146 *
3147 * We handle this by trimming the first 'ofs' elements off of
3148 * rq->entries[]. Notice how we retain the term but not the data for
3149 * rq->entries[ofs - 1]:
3150 *
3151 * [Diagram: after trimming, the request effectively starts at
3152 * log_start, with rq->entries[ofs - 1].term standing in as the new
3153 * "previous log term" for the consistency check.]
3154 */
3178 uint64_t ofs = raft->log_start - first_entry_index;
3179 raft_handle_append_entries(raft, rq,
3180 raft->log_start - 1, rq->entries[ofs - 1].term,
3181 &rq->entries[ofs], rq->n_entries - ofs);
3182 }
3183
3184 /* Returns true if 'raft' has another log entry or snapshot to read. */
3185 bool
3186 raft_has_next_entry(const struct raft *raft_)
3187 {
3188 struct raft *raft = CONST_CAST(struct raft *, raft_);
3189 struct uuid eid;
3190 return raft_peek_next_entry(raft, &eid) != NULL;
3191 }
3192
3193 /* Returns the next log entry or snapshot from 'raft', or NULL if there are
3194 * none left to read. Stores the entry ID of the log entry in '*eid'. Stores
3195 * true in '*is_snapshot' if the returned data is a snapshot, false if it is a
3196 * log entry. */
3197 const struct json *
3198 raft_next_entry(struct raft *raft, struct uuid *eid, bool *is_snapshot)
3199 {
3200 const struct json *data = raft_get_next_entry(raft, eid);
3201 *is_snapshot = data == raft->snap.data;
3202 return data;
3203 }
3204
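/* Illustrative sketch, not part of the implementation: how a reader (in
 * practice, ovsdb-server's storage layer) consumes committed entries through
 * raft_next_entry() above.  The function name example_apply_committed() is
 * hypothetical; "apply to the database" stands in for whatever the caller
 * does with each entry. */
static inline void
example_apply_committed(struct raft *raft)
{
    struct uuid eid;
    bool is_snapshot;
    const struct json *data;

    while ((data = raft_next_entry(raft, &eid, &is_snapshot)) != NULL) {
        /* Apply 'data' to the database here.  'eid' identifies the entry;
         * 'is_snapshot' distinguishes a full snapshot from an incremental
         * log entry. */
    }
}
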
3205 /* Returns the log index of the last-read snapshot or log entry. */
3206 uint64_t
3207 raft_get_applied_index(const struct raft *raft)
3208 {
3209 return raft->last_applied;
3210 }
3211
3212 /* Returns the log index of the last snapshot or log entry that is available to
3213 * be read. */
3214 uint64_t
3215 raft_get_commit_index(const struct raft *raft)
3216 {
3217 return raft->commit_index;
3218 }
3219
3220 static struct raft_server *
3221 raft_find_peer(struct raft *raft, const struct uuid *uuid)
3222 {
3223 struct raft_server *s = raft_find_server(raft, uuid);
3224 return s && !uuid_equals(&raft->sid, &s->sid) ? s : NULL;
3225 }
3226
3227 static struct raft_server *
3228 raft_find_new_server(struct raft *raft, const struct uuid *uuid)
3229 {
3230 return raft_server_find(&raft->add_servers, uuid);
3231 }
3232
3233 /* Figure 3.1: "If there exists an N such that N > commitIndex, a
3234 * majority of matchIndex[i] >= N, and log[N].term == currentTerm, set
3235 * commitIndex = N (sections 3.5 and 3.6)." */
3236 static void
3237 raft_consider_updating_commit_index(struct raft *raft)
3238 {
3239 /* This loop cannot just bail out when it comes across a log entry that
3240 * does not match the criteria. For example, Figure 3.7(d2) shows a
3241 * case where the log entry for term 2 cannot be committed directly
3242 * (because it is not for the current term) but it can be committed as
3243 * a side effect of committing the entry for term 4 (the current term).
3244 * XXX Is there a more efficient way to do this? */
3245 ovs_assert(raft->role == RAFT_LEADER);
3246
3247 uint64_t new_commit_index = raft->commit_index;
3248 for (uint64_t idx = MAX(raft->commit_index + 1, raft->log_start);
3249 idx < raft->log_end; idx++) {
3250 if (raft->entries[idx - raft->log_start].term == raft->term) {
3251 size_t count = 0;
3252 struct raft_server *s2;
3253 HMAP_FOR_EACH (s2, hmap_node, &raft->servers) {
3254 if (s2->match_index >= idx) {
3255 count++;
3256 }
3257 }
3258 if (count > hmap_count(&raft->servers) / 2) {
3259 VLOG_DBG("index %"PRIu64" committed to %"PRIuSIZE" servers, "
3260 "applying", idx, count);
3261 new_commit_index = idx;
3262 }
3263 }
3264 }
3265 if (raft_update_commit_index(raft, new_commit_index)) {
3266 raft_send_heartbeats(raft);
3267 }
3268 }
3269
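/* Illustrative sketch, not part of the implementation: the majority test
 * that gates commit advancement above.  An index may only become the new
 * commit index once strictly more than half of the cluster's servers (the
 * leader included) have it in their logs.  The helper name is
 * hypothetical. */
static inline bool
example_is_quorum(size_t n_matching, size_t n_servers)
{
    return n_matching > n_servers / 2;
}
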
3270 static void
3271 raft_update_match_index(struct raft *raft, struct raft_server *s,
3272 uint64_t min_index)
3273 {
3274 ovs_assert(raft->role == RAFT_LEADER);
3275 if (min_index > s->match_index) {
3276 s->match_index = min_index;
3277 raft_consider_updating_commit_index(raft);
3278 }
3279 }
3280
3281 static void
3282 raft_update_our_match_index(struct raft *raft, uint64_t min_index)
3283 {
3284 struct raft_server *server = raft_find_server(raft, &raft->sid);
3285 if (server) {
3286 raft_update_match_index(raft, server, min_index);
3287 }
3288 }
3289
3290 static void
3291 raft_send_install_snapshot_request(struct raft *raft,
3292 const struct raft_server *s,
3293 const char *comment)
3294 {
3295 union raft_rpc rpc = {
3296 .install_snapshot_request = {
3297 .common = {
3298 .type = RAFT_RPC_INSTALL_SNAPSHOT_REQUEST,
3299 .sid = s->sid,
3300 .comment = CONST_CAST(char *, comment),
3301 },
3302 .term = raft->term,
3303 .last_index = raft->log_start - 1,
3304 .last_term = raft->snap.term,
3305 .last_servers = raft->snap.servers,
3306 .last_eid = raft->snap.eid,
3307 .data = raft->snap.data,
3308 .election_timer = raft->election_timer, /* use latest value */
3309 }
3310 };
3311
3312 if (s->last_install_snapshot_request) {
3313 struct raft_install_snapshot_request *old, *new;
3314
3315 old = s->last_install_snapshot_request;
3316 new = &rpc.install_snapshot_request;
3317 if (old->term == new->term
3318 && old->last_index == new->last_index
3319 && old->last_term == new->last_term
3320 && old->last_servers == new->last_servers
3321 && old->data == new->data
3322 && old->election_timer == new->election_timer
3323 && uuid_equals(&old->last_eid, &new->last_eid)) {
3324 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3325
3326 VLOG_WARN_RL(&rl, "not sending exact same install_snapshot_request"
3327 " to server %s again", s->nickname);
3328 return;
3329 }
3330 }
3331 free(s->last_install_snapshot_request);
3332 CONST_CAST(struct raft_server *, s)->last_install_snapshot_request
3333 = xmemdup(&rpc.install_snapshot_request,
3334 sizeof rpc.install_snapshot_request);
3335
3336 raft_send(raft, &rpc);
3337 }
3338
3339 static void
3340 raft_handle_append_reply(struct raft *raft,
3341 const struct raft_append_reply *rpy)
3342 {
3343 if (raft->role != RAFT_LEADER) {
3344 VLOG_INFO("rejected append_reply (not leader)");
3345 return;
3346 }
3347
3348 /* Most commonly we'd be getting an AppendEntries reply from a configured
3349 * server (e.g. a peer), but we can also get them from servers in the
3350 * process of being added. */
3351 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3352 if (!s) {
3353 s = raft_find_new_server(raft, &rpy->common.sid);
3354 if (!s) {
3355 VLOG_INFO("rejected append_reply from unknown server "SID_FMT,
3356 SID_ARGS(&rpy->common.sid));
3357 return;
3358 }
3359 }
3360
3361 s->replied = true;
3362 if (rpy->result == RAFT_APPEND_OK) {
3363 /* Figure 3.1: "If successful, update nextIndex and matchIndex for
3364 * follower (section 3.5)." */
3365 uint64_t min_index = rpy->prev_log_index + rpy->n_entries + 1;
3366 if (s->next_index < min_index) {
3367 s->next_index = min_index;
3368 }
3369 raft_update_match_index(raft, s, min_index - 1);
3370 } else {
3371 /* Figure 3.1: "If AppendEntries fails because of log inconsistency,
3372 * decrement nextIndex and retry (section 3.5)."
3373 *
3374 * We also implement the optimization suggested in section 4.2.1:
3375 * "Various approaches can make nextIndex converge to its correct value
3376 * more quickly, including those described in Chapter 3. The simplest
3377 * approach to solving this particular problem of adding a new server,
3378 * however, is to have followers return the length of their logs in the
3379 * AppendEntries response; this allows the leader to cap the follower’s
3380 * nextIndex accordingly." */
3381 s->next_index = (s->next_index > 0
3382 ? MIN(s->next_index - 1, rpy->log_end)
3383 : 0);
3384
3385 if (rpy->result == RAFT_APPEND_IO_ERROR) {
3386 /* Append failed but not because of a log inconsistency. Because
3387 * of the I/O error, there's no point in re-sending the append
3388 * immediately. */
3389 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3390 VLOG_INFO_RL(&rl, "%s reported I/O error", s->nickname);
3391 return;
3392 }
3393 }
3394
3395 /*
3396 * Our behavior here must depend on the value of next_index relative to
3397 * log_start and log_end. There are three cases:
3398 *
3399 * Case 1: next_index < log_start.
3400 * Case 2: log_start <= next_index < log_end.
3401 * Case 3: next_index >= log_end.
3402 */
3410 if (s->next_index < raft->log_start) {
3411 /* Case 1. */
3412 raft_send_install_snapshot_request(raft, s, NULL);
3413 } else if (s->next_index < raft->log_end) {
3414 /* Case 2. */
3415 raft_send_append_request(raft, s, raft->log_end - s->next_index, NULL);
3416 } else {
3417 /* Case 3. */
3418 if (s->phase == RAFT_PHASE_CATCHUP) {
3419 s->phase = RAFT_PHASE_CAUGHT_UP;
3420 raft_run_reconfigure(raft);
3421 }
3422 }
3423 }
3424
3425 static bool
3426 raft_should_suppress_disruptive_server(struct raft *raft,
3427 const union raft_rpc *rpc)
3428 {
3429 if (rpc->type != RAFT_RPC_VOTE_REQUEST) {
3430 return false;
3431 }
3432
3433 /* Section 4.2.3 "Disruptive Servers" says:
3434 *
3435 * ...if a server receives a RequestVote request within the minimum
3436 * election timeout of hearing from a current leader, it does not update
3437 * its term or grant its vote...
3438 *
3439 * ...This change conflicts with the leadership transfer mechanism as
3440 * described in Chapter 3, in which a server legitimately starts an
3441 * election without waiting an election timeout. In that case,
3442 * RequestVote messages should be processed by other servers even when
3443 * they believe a current cluster leader exists. Those RequestVote
3444 * requests can include a special flag to indicate this behavior (“I
3445 * have permission to disrupt the leader--it told me to!”).
3446 *
3447 * This clearly describes how the followers should act, but not the leader.
3448 * We just ignore vote requests that arrive at a current leader. This
3449 * seems to be fairly safe, since a majority other than the current leader
3450 * can still elect a new leader and the first AppendEntries from that new
3451 * leader will depose the current leader. */
3452 const struct raft_vote_request *rq = raft_vote_request_cast(rpc);
3453 if (rq->leadership_transfer) {
3454 return false;
3455 }
3456
3457 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3458 long long int now = time_msec();
3459 switch (raft->role) {
3460 case RAFT_LEADER:
3461 VLOG_WARN_RL(&rl, "ignoring vote request received as leader");
3462 return true;
3463
3464 case RAFT_FOLLOWER:
3465 if (now < raft->election_base + raft->election_timer) {
3466 VLOG_WARN_RL(&rl, "ignoring vote request received after only "
3467 "%lld ms (minimum election time is %"PRIu64" ms)",
3468 now - raft->election_base, raft->election_timer);
3469 return true;
3470 }
3471 return false;
3472
3473 case RAFT_CANDIDATE:
3474 return false;
3475
3476 default:
3477 OVS_NOT_REACHED();
3478 }
3479 }
3480
3481 /* Returns true if a reply should be sent. */
3482 static bool
3483 raft_handle_vote_request__(struct raft *raft,
3484 const struct raft_vote_request *rq)
3485 {
3486 /* Figure 3.1: "If votedFor is null or candidateId, and candidate's log is
3487 * at least as up-to-date as receiver's log, grant vote (sections 3.4,
3488 * 3.6)." */
3489 if (uuid_equals(&raft->vote, &rq->common.sid)) {
3490 /* Already voted for this candidate in this term. Resend vote. */
3491 return true;
3492 } else if (!uuid_is_zero(&raft->vote)) {
3493 /* Already voted for different candidate in this term. Send a reply
3494 * saying what candidate we did vote for. This isn't a necessary part
3495 * of the Raft protocol but it can make debugging easier. */
3496 return true;
3497 }
3498
3499 /* Section 3.6.1: "The RequestVote RPC implements this restriction: the RPC
3500 * includes information about the candidate’s log, and the voter denies its
3501 * vote if its own log is more up-to-date than that of the candidate. Raft
3502 * determines which of two logs is more up-to-date by comparing the index
3503 * and term of the last entries in the logs. If the logs have last entries
3504 * with different terms, then the log with the later term is more
3505 * up-to-date. If the logs end with the same term, then whichever log is
3506 * longer is more up-to-date." */
3507 uint64_t last_term = (raft->log_end > raft->log_start
3508 ? raft->entries[raft->log_end - 1
3509 - raft->log_start].term
3510 : raft->snap.term);
3511 if (last_term > rq->last_log_term
3512 || (last_term == rq->last_log_term
3513 && raft->log_end - 1 > rq->last_log_index)) {
3514 /* Our log is more up-to-date than the peer's. Withhold vote. */
3515 return false;
3516 }
3517
3518 /* Record a vote for the peer. */
3519 if (!raft_set_term(raft, raft->term, &rq->common.sid)) {
3520 return false;
3521 }
3522
3523 raft_reset_election_timer(raft);
3524
3525 return true;
3526 }
3527
3528 static void
3529 raft_send_vote_reply(struct raft *raft, const struct uuid *dst,
3530 const struct uuid *vote)
3531 {
3532 union raft_rpc rpy = {
3533 .vote_reply = {
3534 .common = {
3535 .type = RAFT_RPC_VOTE_REPLY,
3536 .sid = *dst,
3537 },
3538 .term = raft->term,
3539 .vote = *vote,
3540 },
3541 };
3542 raft_send(raft, &rpy);
3543 }
3544
3545 static void
3546 raft_handle_vote_request(struct raft *raft,
3547 const struct raft_vote_request *rq)
3548 {
3549 if (raft_handle_vote_request__(raft, rq)) {
3550 raft_send_vote_reply(raft, &rq->common.sid, &raft->vote);
3551 }
3552 }
3553
3554 static void
3555 raft_handle_vote_reply(struct raft *raft,
3556 const struct raft_vote_reply *rpy)
3557 {
3558 if (!raft_receive_term__(raft, &rpy->common, rpy->term)) {
3559 return;
3560 }
3561
3562 if (raft->role != RAFT_CANDIDATE) {
3563 return;
3564 }
3565
3566 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3567 if (s) {
3568 raft_accept_vote(raft, s, &rpy->vote);
3569 }
3570 }
3571
3572 /* Returns true if 'raft''s log contains reconfiguration entries that have not
3573 * yet been committed. */
3574 static bool
3575 raft_has_uncommitted_configuration(const struct raft *raft)
3576 {
3577 for (uint64_t i = raft->commit_index + 1; i < raft->log_end; i++) {
3578 ovs_assert(i >= raft->log_start);
3579 const struct raft_entry *e = &raft->entries[i - raft->log_start];
3580 if (e->servers) {
3581 return true;
3582 }
3583 }
3584 return false;
3585 }
3586
3587 static void
3588 raft_log_reconfiguration(struct raft *raft)
3589 {
3590 struct json *servers_json = raft_servers_to_json(&raft->servers);
3591 raft_command_unref(raft_command_execute__(
3592 raft, NULL, servers_json, 0, NULL, NULL));
3593 json_destroy(servers_json);
3594 }
3595
3596 static void
3597 raft_run_reconfigure(struct raft *raft)
3598 {
3599 ovs_assert(raft->role == RAFT_LEADER);
3600
3601 /* Reconfiguration only progresses when configuration changes commit. */
3602 if (raft_has_uncommitted_configuration(raft)) {
3603 return;
3604 }
3605
3606 /* If we were waiting for a configuration change to commit, it's done. */
3607 struct raft_server *s;
3608 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3609 if (s->phase == RAFT_PHASE_COMMITTING) {
3610 raft_send_add_server_reply__(raft, &s->sid, s->address,
3611 true, RAFT_SERVER_COMPLETED);
3612 s->phase = RAFT_PHASE_STABLE;
3613 }
3614 }
3615 if (raft->remove_server) {
3616 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
3617 &raft->remove_server->requester_sid,
3618 raft->remove_server->requester_conn,
3619 true, RAFT_SERVER_COMPLETED);
3620 raft_server_destroy(raft->remove_server);
3621 raft->remove_server = NULL;
3622 }
3623
3624 /* If a new server is caught up, add it to the configuration. */
3625 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
3626 if (s->phase == RAFT_PHASE_CAUGHT_UP) {
3627 /* Move 's' from 'raft->add_servers' to 'raft->servers'. */
3628 hmap_remove(&raft->add_servers, &s->hmap_node);
3629 hmap_insert(&raft->servers, &s->hmap_node, uuid_hash(&s->sid));
3630
3631 /* Mark 's' as waiting for commit. */
3632 s->phase = RAFT_PHASE_COMMITTING;
3633
3634 raft_log_reconfiguration(raft);
3635
3636 /* When commit completes we'll transition to RAFT_PHASE_STABLE and
3637 * send a RAFT_SERVER_COMPLETED reply. */
3638
3639 return;
3640 }
3641 }
3642
3643 /* Remove a server, if one is scheduled for removal. */
3644 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3645 if (s->phase == RAFT_PHASE_REMOVE) {
3646 hmap_remove(&raft->servers, &s->hmap_node);
3647 raft->remove_server = s;
3648
3649 raft_log_reconfiguration(raft);
3650
3651 return;
3652 }
3653 }
3654 }
3655
3656 static void
3657 raft_handle_add_server_request(struct raft *raft,
3658 const struct raft_add_server_request *rq)
3659 {
3660 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3661 if (raft->role != RAFT_LEADER) {
3662 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3663 return;
3664 }
3665
3666 /* Check for an existing server. */
3667 struct raft_server *s = raft_find_server(raft, &rq->common.sid);
3668 if (s) {
3669 /* If the server is scheduled to be removed, cancel it. */
3670 if (s->phase == RAFT_PHASE_REMOVE) {
3671 s->phase = RAFT_PHASE_STABLE;
3672 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_CANCELED);
3673 return;
3674 }
3675
3676 /* If the server is being added, then it's in progress. */
3677 if (s->phase != RAFT_PHASE_STABLE) {
3678 raft_send_add_server_reply(raft, rq,
3679 false, RAFT_SERVER_IN_PROGRESS);
return;
3680 }
3681
3682 /* Nothing to do--server is already part of the configuration. */
3683 raft_send_add_server_reply(raft, rq,
3684 true, RAFT_SERVER_ALREADY_PRESENT);
3685 return;
3686 }
3687
3688 /* Check for a server being removed. */
3689 if (raft->remove_server
3690 && uuid_equals(&rq->common.sid, &raft->remove_server->sid)) {
3691 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3692 return;
3693 }
3694
3695 /* Check for a server already being added. */
3696 if (raft_find_new_server(raft, &rq->common.sid)) {
3697 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_IN_PROGRESS);
3698 return;
3699 }
3700
3701 /* Add server to 'add_servers'. */
3702 s = raft_server_add(&raft->add_servers, &rq->common.sid, rq->address);
3703 raft_server_init_leader(raft, s);
3704 s->requester_sid = rq->common.sid;
3705 s->requester_conn = NULL;
3706 s->phase = RAFT_PHASE_CATCHUP;
3707
3708 /* Start sending the log. If this is the first time we've tried to add
3709 * this server, then this will quickly degenerate into an InstallSnapshot
3710 * followed by a series of AppendEntries, but if it's a retry of an earlier
3711 * AddServer request that was interrupted (e.g. by a timeout or a loss of
3712 * leadership) then it will gracefully resume populating the log.
3713 *
3714 * See the last few paragraphs of section 4.2.1 for further insight. */
3715 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
3716 VLOG_INFO_RL(&rl,
3717 "starting to add server %s ("SID_FMT" at %s) "
3718 "to cluster "CID_FMT, s->nickname, SID_ARGS(&s->sid),
3719 rq->address, CID_ARGS(&raft->cid));
3720 raft_send_append_request(raft, s, 0, "initialize new server");
3721 }
3722
3723 static void
3724 raft_handle_add_server_reply(struct raft *raft,
3725 const struct raft_add_server_reply *rpy)
3726 {
3727 if (!raft->joining) {
3728 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3729 VLOG_WARN_RL(&rl, "received add_server_reply even though we're "
3730 "already part of the cluster");
3731 return;
3732 }
3733
3734 if (rpy->success) {
3735 raft->joining = false;
3736
3737 /* It is tempting, at this point, to check that this server is part of
3738 * the current configuration. However, this is not necessarily the
3739 * case, because the log entry that added this server to the cluster
3740 * might have been committed by a majority of the cluster that does not
3741 * include this one. This actually happens in testing. */
3742 } else {
3743 const char *address;
3744 SSET_FOR_EACH (address, &rpy->remote_addresses) {
3745 if (sset_add(&raft->remote_addresses, address)) {
3746 VLOG_INFO("%s: learned new server address for joining cluster",
3747 address);
3748 }
3749 }
3750 }
3751 }
3752
3753 /* This is called by raft_unixctl_kick() as well as via RPC. */
3754 static void
3755 raft_handle_remove_server_request(struct raft *raft,
3756 const struct raft_remove_server_request *rq)
3757 {
3758 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3759 if (raft->role != RAFT_LEADER) {
3760 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3761 return;
3762 }
3763
3764 /* If the server to remove is currently waiting to be added, cancel it. */
3765 struct raft_server *target = raft_find_new_server(raft, &rq->sid);
3766 if (target) {
3767 raft_send_add_server_reply__(raft, &target->sid, target->address,
3768 false, RAFT_SERVER_CANCELED);
3769 hmap_remove(&raft->add_servers, &target->hmap_node);
3770 raft_server_destroy(target);
3771 return;
3772 }
3773
3774 /* If the server isn't configured, report that. */
3775 target = raft_find_server(raft, &rq->sid);
3776 if (!target) {
3777 raft_send_remove_server_reply(raft, rq,
3778 true, RAFT_SERVER_ALREADY_GONE);
3779 return;
3780 }
3781
3782 /* Check whether we're waiting for the addition of the server to commit. */
3783 if (target->phase == RAFT_PHASE_COMMITTING) {
3784 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3785 return;
3786 }
3787
3788 /* Check whether the server is already scheduled for removal. */
3789 if (target->phase == RAFT_PHASE_REMOVE) {
3790 raft_send_remove_server_reply(raft, rq,
3791 false, RAFT_SERVER_IN_PROGRESS);
3792 return;
3793 }
3794
3795 /* Make sure that if we remove this server then at least one other
3796 * server will be left. We don't count servers currently being added (in
3797 * 'add_servers') since those could fail. */
3798 struct raft_server *s;
3799 int n = 0;
3800 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3801 if (s != target && s->phase != RAFT_PHASE_REMOVE) {
3802 n++;
3803 }
3804 }
3805 if (!n) {
3806 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_EMPTY);
3807 return;
3808 }
3809
3810 /* Mark the server for removal. */
3811 target->phase = RAFT_PHASE_REMOVE;
3812 if (rq->requester_conn) {
3813 target->requester_sid = UUID_ZERO;
3814 unixctl_command_reply(rq->requester_conn, "started removal");
3815 } else {
3816 target->requester_sid = rq->common.sid;
3817 target->requester_conn = NULL;
3818 }
3819
3820 raft_run_reconfigure(raft);
3821 /* Operation in progress, reply will be sent later. */
3822 }
3823
3824 static void
3825 raft_finished_leaving_cluster(struct raft *raft)
3826 {
3827 VLOG_INFO(SID_FMT": finished leaving cluster "CID_FMT,
3828 SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
3829
3830 raft_record_note(raft, "left", "this server left the cluster");
3831
3832 raft->leaving = false;
3833 raft->left = true;
3834 }
3835
3836 static void
3837 raft_handle_remove_server_reply(struct raft *raft,
3838 const struct raft_remove_server_reply *rpc)
3839 {
3840 if (rpc->success
3841 && (uuid_is_zero(&rpc->target_sid)
3842 || uuid_equals(&rpc->target_sid, &raft->sid))) {
3843 raft_finished_leaving_cluster(raft);
3844 }
3845 }
3846
3847 static bool
3848 raft_handle_write_error(struct raft *raft, struct ovsdb_error *error)
3849 {
3850 if (error && !raft->failed) {
3851 raft->failed = true;
3852
3853 char *s = ovsdb_error_to_string_free(error);
3854 VLOG_WARN("%s: entering failure mode due to I/O error (%s)",
3855 raft->name, s);
3856 free(s);
3857 }
3858 return !raft->failed;
3859 }
3860
3861 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3862 raft_write_snapshot(struct raft *raft, struct ovsdb_log *log,
3863 uint64_t new_log_start,
3864 const struct raft_entry *new_snapshot)
3865 {
3866 struct raft_header h = {
3867 .sid = raft->sid,
3868 .cid = raft->cid,
3869 .name = raft->name,
3870 .local_address = raft->local_address,
3871 .snap_index = new_log_start - 1,
3872 .snap = *new_snapshot,
3873 };
3874 struct ovsdb_error *error = ovsdb_log_write_and_free(
3875 log, raft_header_to_json(&h));
3876 if (error) {
3877 return error;
3878 }
3879 ovsdb_log_mark_base(raft->log);
3880
3881 /* Write log records. */
3882 for (uint64_t index = new_log_start; index < raft->log_end; index++) {
3883 const struct raft_entry *e = &raft->entries[index - raft->log_start];
3884 struct raft_record r = {
3885 .type = RAFT_REC_ENTRY,
3886 .term = e->term,
3887 .entry = {
3888 .index = index,
3889 .data = e->data,
3890 .servers = e->servers,
3891 .election_timer = e->election_timer,
3892 .eid = e->eid,
3893 },
3894 };
3895 error = ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3896 if (error) {
3897 return error;
3898 }
3899 }
3900
3901 /* Write term and vote (if any).
3902 *
3903 * The term is redundant if we wrote a log record for that term above. The
3904 * vote, if any, is never redundant.
3905 */
3906 error = raft_write_state(log, raft->term, &raft->vote);
3907 if (error) {
3908 return error;
3909 }
3910
3911 /* Write commit_index if it's beyond the new start of the log. */
3912 if (raft->commit_index >= new_log_start) {
3913 struct raft_record r = {
3914 .type = RAFT_REC_COMMIT_INDEX,
3915 .commit_index = raft->commit_index,
3916 };
3917 return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3918 }
3919 return NULL;
3920 }
3921
3922 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3923 raft_save_snapshot(struct raft *raft,
3924 uint64_t new_start, const struct raft_entry *new_snapshot)
3925
3926 {
3927 struct ovsdb_log *new_log;
3928 struct ovsdb_error *error;
3929 error = ovsdb_log_replace_start(raft->log, &new_log);
3930 if (error) {
3931 return error;
3932 }
3933
3934 error = raft_write_snapshot(raft, new_log, new_start, new_snapshot);
3935 if (error) {
3936 ovsdb_log_replace_abort(new_log);
3937 return error;
3938 }
3939
3940 return ovsdb_log_replace_commit(raft->log, new_log);
3941 }
3942
3943 static bool
3944 raft_handle_install_snapshot_request__(
3945 struct raft *raft, const struct raft_install_snapshot_request *rq)
3946 {
3947 raft_reset_election_timer(raft);
3948
3949 /*
3950 * Our behavior here depends on new_log_start in the snapshot compared to
3951 * log_start and log_end. There are three cases:
3952 *
3953 *        Case 1       |    Case 2     |      Case 3
3954 *   <---------------->|<------------->|<------------------>
3955 *                     |               |
3956 *
3957 *                  +---+---+---+---+
3958 *                T | T | T | T | T |
3959 *                  +---+---+---+---+
3960 *                    ^               ^
3961 *                    |               |
3962 *                  log_start       log_end
3963 */
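    /* Worked example (hypothetical indexes): with log_start = 10 and
     * log_end = 20, a snapshot whose last_index is 5 gives new_log_start = 6
     * (case 1); last_index = 14 gives new_log_start = 15 (case 2); and
     * last_index = 25 gives new_log_start = 26 (case 3), which replaces our
     * snapshot and discards the local log. */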
3964 uint64_t new_log_start = rq->last_index + 1;
3965 if (new_log_start < raft->log_start) {
3966 /* Case 1: The new snapshot covers less than our current one. Nothing
3967 * to do. */
3968 return true;
3969 } else if (new_log_start < raft->log_end) {
3970 /* Case 2: The new snapshot starts in the middle of our log. We could
3971 * discard the first 'new_log_start - raft->log_start' entries in the
3972 * log. But there's not much value in that, since snapshotting is
3973 * supposed to be a local decision. Just skip it. */
3974 return true;
3975 }
3976
3977 /* Case 3: The new snapshot starts past the end of our current log, so
3978 * discard all of our current log. */
3979 const struct raft_entry new_snapshot = {
3980 .term = rq->last_term,
3981 .data = rq->data,
3982 .eid = rq->last_eid,
3983 .servers = rq->last_servers,
3984 .election_timer = rq->election_timer,
3985 };
3986 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
3987 &new_snapshot);
3988 if (error) {
3989 char *error_s = ovsdb_error_to_string(error);
3990 VLOG_WARN("could not save snapshot: %s", error_s);
3991 free(error_s);
3992 return false;
3993 }
3994
3995 for (size_t i = 0; i < raft->log_end - raft->log_start; i++) {
3996 raft_entry_uninit(&raft->entries[i]);
3997 }
3998 raft->log_start = raft->log_end = new_log_start;
3999 raft->log_synced = raft->log_end - 1;
4000 raft->commit_index = raft->log_start - 1;
4001 if (raft->last_applied < raft->commit_index) {
4002 raft->last_applied = raft->log_start - 2;
4003 }
4004
4005 raft_entry_uninit(&raft->snap);
4006 raft_entry_clone(&raft->snap, &new_snapshot);
4007
4008 raft_get_servers_from_log(raft, VLL_INFO);
4009 raft_get_election_timer_from_log(raft);
4010
4011 return true;
4012 }
4013
4014 static void
4015 raft_handle_install_snapshot_request(
4016 struct raft *raft, const struct raft_install_snapshot_request *rq)
4017 {
4018 if (raft_handle_install_snapshot_request__(raft, rq)) {
4019 union raft_rpc rpy = {
4020 .install_snapshot_reply = {
4021 .common = {
4022 .type = RAFT_RPC_INSTALL_SNAPSHOT_REPLY,
4023 .sid = rq->common.sid,
4024 },
4025 .term = raft->term,
4026 .last_index = rq->last_index,
4027 .last_term = rq->last_term,
4028 },
4029 };
4030 raft_send(raft, &rpy);
4031 }
4032 }
4033
4034 static void
4035 raft_handle_install_snapshot_reply(
4036 struct raft *raft, const struct raft_install_snapshot_reply *rpy)
4037 {
4038 /* We might get an InstallSnapshot reply from a configured server (e.g. a
4039 * peer) or a server in the process of being added. */
4040 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
4041 if (!s) {
4042 s = raft_find_new_server(raft, &rpy->common.sid);
4043 if (!s) {
4044 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
4045 VLOG_INFO_RL(&rl, "cluster "CID_FMT": received %s from "
4046 "unknown server "SID_FMT, CID_ARGS(&raft->cid),
4047 raft_rpc_type_to_string(rpy->common.type),
4048 SID_ARGS(&rpy->common.sid));
4049 return;
4050 }
4051 }
4052
4053 if (rpy->last_index != raft->log_start - 1 ||
4054 rpy->last_term != raft->snap.term) {
4055 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
4056 VLOG_INFO_RL(&rl, "cluster "CID_FMT": server %s installed "
4057 "out-of-date snapshot, starting over",
4058 CID_ARGS(&raft->cid), s->nickname);
4059 raft_send_install_snapshot_request(raft, s,
4060 "installed obsolete snapshot");
4061 return;
4062 }
4063
4064 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
4065 VLOG_INFO_RL(&rl, "cluster "CID_FMT": installed snapshot on server %s "
4066 " up to %"PRIu64":%"PRIu64, CID_ARGS(&raft->cid),
4067 s->nickname, rpy->last_term, rpy->last_index);
4068 s->next_index = raft->log_start;
4069 raft_send_append_request(raft, s, raft->log_end - s->next_index,
4070 "snapshot installed");
4071 }
4072
4073 /* Returns true if 'raft' has grown enough since the last snapshot that
4074 * reducing the log to a snapshot would be valuable, false otherwise. */
4075 bool
4076 raft_grew_lots(const struct raft *raft)
4077 {
4078 return ovsdb_log_grew_lots(raft->log);
4079 }
4080
4081 /* Returns the number of log entries that could be trimmed off the on-disk log
4082 * by snapshotting. */
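/* For example (hypothetical values): with log_start = 100 and last_applied =
 * 115, snapshotting could trim 16 entries from the on-disk log. */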
4083 uint64_t
4084 raft_get_log_length(const struct raft *raft)
4085 {
4086 return (raft->last_applied < raft->log_start
4087 ? 0
4088 : raft->last_applied - raft->log_start + 1);
4089 }
4090
4091 /* Returns true if taking a snapshot of 'raft', with raft_store_snapshot(), is
4092 * possible. */
4093 bool
4094 raft_may_snapshot(const struct raft *raft)
4095 {
4096 return (!raft->joining
4097 && !raft->leaving
4098 && !raft->left
4099 && !raft->failed
4100 && raft->last_applied >= raft->log_start);
4101 }
4102
4103 /* Replaces the log for 'raft', up to the last log entry read, by
4104 * 'new_snapshot_data'. Returns NULL if successful, otherwise an error that
4105 * the caller must eventually free.
4106 *
4107 * This function can only succeed if raft_may_snapshot() returns true. It is
4108 * only valuable to call it if raft_get_log_length() is significant and
4109 * especially if raft_grew_lots() returns true. */
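/* A minimal usage sketch, under stated assumptions: the caller below and its
 * 'database_to_json()' helper are hypothetical and not part of this file;
 * only the raft_*() calls are real.
 *
 *     if (raft_may_snapshot(raft) && raft_grew_lots(raft)) {
 *         struct json *data = database_to_json();
 *         struct ovsdb_error *error = raft_store_snapshot(raft, data);
 *         json_destroy(data);
 *         if (error) {
 *             char *msg = ovsdb_error_to_string_free(error);
 *             VLOG_WARN("snapshot failed: %s", msg);
 *             free(msg);
 *         }
 *     }
 */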
4110 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
4111 raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
4112 {
4113 if (raft->joining) {
4114 return ovsdb_error(NULL,
4115 "cannot store a snapshot while joining cluster");
4116 } else if (raft->leaving) {
4117 return ovsdb_error(NULL,
4118 "cannot store a snapshot while leaving cluster");
4119 } else if (raft->left) {
4120 return ovsdb_error(NULL,
4121 "cannot store a snapshot after leaving cluster");
4122 } else if (raft->failed) {
4123 return ovsdb_error(NULL,
4124 "cannot store a snapshot following failure");
4125 }
4126
4127 if (raft->last_applied < raft->log_start) {
4128 return ovsdb_error(NULL, "not storing a duplicate snapshot");
4129 }
4130
4131 uint64_t new_log_start = raft->last_applied + 1;
4132 struct raft_entry new_snapshot = {
4133 .term = raft_get_term(raft, new_log_start - 1),
4134 .data = json_clone(new_snapshot_data),
4135 .eid = *raft_get_eid(raft, new_log_start - 1),
4136 .servers = json_clone(raft_servers_for_index(raft, new_log_start - 1)),
4137 .election_timer = raft->election_timer,
4138 };
4139 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
4140 &new_snapshot);
4141 if (error) {
4142 raft_entry_uninit(&new_snapshot);
4143 return error;
4144 }
4145
4146 raft->log_synced = raft->log_end - 1;
4147 raft_entry_uninit(&raft->snap);
4148 raft->snap = new_snapshot;
4149 for (size_t i = 0; i < new_log_start - raft->log_start; i++) {
4150 raft_entry_uninit(&raft->entries[i]);
4151 }
4152 memmove(&raft->entries[0], &raft->entries[new_log_start - raft->log_start],
4153 (raft->log_end - new_log_start) * sizeof *raft->entries);
4154 raft->log_start = new_log_start;
4155 return NULL;
4156 }
4157
4158 static void
4159 raft_handle_become_leader(struct raft *raft,
4160 const struct raft_become_leader *rq)
4161 {
4162 if (raft->role == RAFT_FOLLOWER) {
4163 char buf[SID_LEN + 1];
4164 VLOG_INFO("received leadership transfer from %s in term %"PRIu64,
4165 raft_get_nickname(raft, &rq->common.sid, buf, sizeof buf),
4166 rq->term);
4167 raft_start_election(raft, true);
4168 }
4169 }
4170
4171 static void
4172 raft_send_execute_command_reply(struct raft *raft,
4173 const struct uuid *sid,
4174 const struct uuid *eid,
4175 enum raft_command_status status,
4176 uint64_t commit_index)
4177 {
4178 if (failure_test == FT_CRASH_BEFORE_SEND_EXEC_REP) {
4179 ovs_fatal(0, "Raft test: crash before sending execute_command_reply");
4180 }
4181 union raft_rpc rpc = {
4182 .execute_command_reply = {
4183 .common = {
4184 .type = RAFT_RPC_EXECUTE_COMMAND_REPLY,
4185 .sid = *sid,
4186 },
4187 .result = *eid,
4188 .status = status,
4189 .commit_index = commit_index,
4190 },
4191 };
4192 raft_send(raft, &rpc);
4193 if (failure_test == FT_CRASH_AFTER_SEND_EXEC_REP) {
4194 ovs_fatal(0, "Raft test: crash after sending execute_command_reply.");
4195 }
4196 }
4197
4198 static enum raft_command_status
4199 raft_handle_execute_command_request__(
4200 struct raft *raft, const struct raft_execute_command_request *rq)
4201 {
4202 if (raft->role != RAFT_LEADER) {
4203 return RAFT_CMD_NOT_LEADER;
4204 }
4205
4206 const struct uuid *current_eid = raft_current_eid(raft);
4207 if (!uuid_equals(&rq->prereq, current_eid)) {
4208 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
4209 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
4210 "prerequisite "UUID_FMT" in execute_command_request",
4211 UUID_ARGS(current_eid), UUID_ARGS(&rq->prereq));
4212 return RAFT_CMD_BAD_PREREQ;
4213 }
4214
4215 struct raft_command *cmd = raft_command_initiate(raft, rq->data,
4216 NULL, 0, &rq->result);
4217 cmd->sid = rq->common.sid;
4218
4219 enum raft_command_status status = cmd->status;
4220 raft_command_unref(cmd);
4221 return status;
4222 }
4223
4224 static void
4225 raft_handle_execute_command_request(
4226 struct raft *raft, const struct raft_execute_command_request *rq)
4227 {
4228 enum raft_command_status status
4229 = raft_handle_execute_command_request__(raft, rq);
4230 if (status != RAFT_CMD_INCOMPLETE) {
4231 raft_send_execute_command_reply(raft, &rq->common.sid, &rq->result,
4232 status, 0);
4233 }
4234 }
4235
4236 static void
4237 raft_handle_execute_command_reply(
4238 struct raft *raft, const struct raft_execute_command_reply *rpy)
4239 {
4240 struct raft_command *cmd = raft_find_command_by_eid(raft, &rpy->result);
4241 if (!cmd) {
4242 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
4243 char buf[SID_LEN + 1];
4244 VLOG_INFO_RL(&rl,
4245 "%s received \"%s\" reply from %s for unknown command",
4246 raft->local_nickname,
4247 raft_command_status_to_string(rpy->status),
4248 raft_get_nickname(raft, &rpy->common.sid,
4249 buf, sizeof buf));
4250 return;
4251 }
4252
4253 if (rpy->status == RAFT_CMD_INCOMPLETE) {
4254 cmd->timestamp = time_msec();
4255 } else {
4256 cmd->index = rpy->commit_index;
4257 raft_command_complete(raft, cmd, rpy->status);
4258 }
4259 }
4260
4261 static void
4262 raft_handle_rpc(struct raft *raft, const union raft_rpc *rpc)
4263 {
4264 uint64_t term = raft_rpc_get_term(rpc);
4265 if (term
4266 && !raft_should_suppress_disruptive_server(raft, rpc)
4267 && !raft_receive_term__(raft, &rpc->common, term)) {
4268 if (rpc->type == RAFT_RPC_APPEND_REQUEST) {
4269 /* Section 3.3: "If a server receives a request with a stale term
4270 * number, it rejects the request." */
4271 raft_send_append_reply(raft, raft_append_request_cast(rpc),
4272 RAFT_APPEND_INCONSISTENCY, "stale term");
4273 }
4274 return;
4275 }
4276
4277 switch (rpc->type) {
4278 #define RAFT_RPC(ENUM, NAME) \
4279 case ENUM: \
4280 raft_handle_##NAME(raft, &rpc->NAME); \
4281 break;
4282 RAFT_RPC_TYPES
4283 #undef RAFT_RPC
4284 default:
4285 OVS_NOT_REACHED();
4286 }
4287 }
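/* A hedged sketch of what one case of the X-macro dispatch above expands to,
 * assuming RAFT_RPC_TYPES in raft-rpc.h lists entries of the form
 * RAFT_RPC(RAFT_RPC_VOTE_REQUEST, vote_request):
 *
 *     case RAFT_RPC_VOTE_REQUEST:
 *         raft_handle_vote_request(raft, &rpc->vote_request);
 *         break;
 */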
4288 \f
4289 static bool
4290 raft_rpc_is_heartbeat(const union raft_rpc *rpc)
4291 {
4292 return ((rpc->type == RAFT_RPC_APPEND_REQUEST
4293 || rpc->type == RAFT_RPC_APPEND_REPLY)
4294 && rpc->common.comment
4295 && !strcmp(rpc->common.comment, "heartbeat"));
4296 }
4297
4298 \f
4299 static bool
4300 raft_send_to_conn_at(struct raft *raft, const union raft_rpc *rpc,
4301 struct raft_conn *conn, int line_number)
4302 {
4303 log_rpc(rpc, "-->", conn, line_number);
4304 return !jsonrpc_session_send(
4305 conn->js, raft_rpc_to_jsonrpc(&raft->cid, &raft->sid, rpc));
4306 }
4307
4308 static bool
4309 raft_is_rpc_synced(const struct raft *raft, const union raft_rpc *rpc)
4310 {
4311 uint64_t term = raft_rpc_get_term(rpc);
4312 uint64_t index = raft_rpc_get_min_sync_index(rpc);
4313 const struct uuid *vote = raft_rpc_get_vote(rpc);
4314
4315 return (term <= raft->synced_term
4316 && index <= raft->log_synced
4317 && (!vote || uuid_equals(vote, &raft->synced_vote)));
4318 }
4319
4320 static bool
4321 raft_send_at(struct raft *raft, const union raft_rpc *rpc, int line_number)
4322 {
4323 const struct uuid *dst = &rpc->common.sid;
4324 if (uuid_equals(dst, &raft->sid)) {
4325 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4326 VLOG_WARN_RL(&rl, "attempted to send RPC to self from raft.c:%d",
4327 line_number);
4328 return false;
4329 }
4330
4331 struct raft_conn *conn = raft_find_conn_by_sid(raft, dst);
4332 if (!conn) {
4333 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4334 char buf[SID_LEN + 1];
4335 VLOG_DBG_RL(&rl, "%s: no connection to %s, cannot send RPC "
4336 "from raft.c:%d", raft->local_nickname,
4337 raft_get_nickname(raft, dst, buf, sizeof buf),
4338 line_number);
4339 return false;
4340 }
4341
4342 if (!raft_is_rpc_synced(raft, rpc)) {
4343 raft_waiter_create(raft, RAFT_W_RPC, false)->rpc = raft_rpc_clone(rpc);
4344 return true;
4345 }
4346
4347 return raft_send_to_conn_at(raft, rpc, conn, line_number);
4348 }
4349 \f
4350 static struct raft *
4351 raft_lookup_by_name(const char *name)
4352 {
4353 struct raft *raft;
4354
4355 HMAP_FOR_EACH_WITH_HASH (raft, hmap_node, hash_string(name, 0),
4356 &all_rafts) {
4357 if (!strcmp(raft->name, name)) {
4358 return raft;
4359 }
4360 }
4361 return NULL;
4362 }
4363
4364 static void
4365 raft_unixctl_cid(struct unixctl_conn *conn,
4366 int argc OVS_UNUSED, const char *argv[],
4367 void *aux OVS_UNUSED)
4368 {
4369 struct raft *raft = raft_lookup_by_name(argv[1]);
4370 if (!raft) {
4371 unixctl_command_reply_error(conn, "unknown cluster");
4372 } else if (uuid_is_zero(&raft->cid)) {
4373 unixctl_command_reply_error(conn, "cluster id not yet known");
4374 } else {
4375 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->cid));
4376 unixctl_command_reply(conn, uuid);
4377 free(uuid);
4378 }
4379 }
4380
4381 static void
4382 raft_unixctl_sid(struct unixctl_conn *conn,
4383 int argc OVS_UNUSED, const char *argv[],
4384 void *aux OVS_UNUSED)
4385 {
4386 struct raft *raft = raft_lookup_by_name(argv[1]);
4387 if (!raft) {
4388 unixctl_command_reply_error(conn, "unknown cluster");
4389 } else {
4390 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->sid));
4391 unixctl_command_reply(conn, uuid);
4392 free(uuid);
4393 }
4394 }
4395
4396 static void
4397 raft_put_sid(const char *title, const struct uuid *sid,
4398 const struct raft *raft, struct ds *s)
4399 {
4400 ds_put_format(s, "%s: ", title);
4401 if (uuid_equals(sid, &raft->sid)) {
4402 ds_put_cstr(s, "self");
4403 } else if (uuid_is_zero(sid)) {
4404 ds_put_cstr(s, "unknown");
4405 } else {
4406 char buf[SID_LEN + 1];
4407 ds_put_cstr(s, raft_get_nickname(raft, sid, buf, sizeof buf));
4408 }
4409 ds_put_char(s, '\n');
4410 }
4411
4412 static void
4413 raft_unixctl_status(struct unixctl_conn *conn,
4414 int argc OVS_UNUSED, const char *argv[],
4415 void *aux OVS_UNUSED)
4416 {
4417 struct raft *raft = raft_lookup_by_name(argv[1]);
4418 if (!raft) {
4419 unixctl_command_reply_error(conn, "unknown cluster");
4420 return;
4421 }
4422
4423 struct ds s = DS_EMPTY_INITIALIZER;
4424 ds_put_format(&s, "%s\n", raft->local_nickname);
4425 ds_put_format(&s, "Name: %s\n", raft->name);
4426 ds_put_format(&s, "Cluster ID: ");
4427 if (!uuid_is_zero(&raft->cid)) {
4428 ds_put_format(&s, CID_FMT" ("UUID_FMT")\n",
4429 CID_ARGS(&raft->cid), UUID_ARGS(&raft->cid));
4430 } else {
4431 ds_put_format(&s, "not yet known\n");
4432 }
4433 ds_put_format(&s, "Server ID: "SID_FMT" ("UUID_FMT")\n",
4434 SID_ARGS(&raft->sid), UUID_ARGS(&raft->sid));
4435 ds_put_format(&s, "Address: %s\n", raft->local_address);
4436 ds_put_format(&s, "Status: %s\n",
4437 raft->joining ? "joining cluster"
4438 : raft->leaving ? "leaving cluster"
4439 : raft->left ? "left cluster"
4440 : raft->failed ? "failed"
4441 : "cluster member");
4442 if (raft->joining) {
4443 ds_put_format(&s, "Remotes for joining:");
4444 const char *address;
4445 SSET_FOR_EACH (address, &raft->remote_addresses) {
4446 ds_put_format(&s, " %s", address);
4447 }
4448 ds_put_char(&s, '\n');
4449 }
4450 if (raft->role == RAFT_LEADER) {
4451 struct raft_server *as;
4452 HMAP_FOR_EACH (as, hmap_node, &raft->add_servers) {
4453 ds_put_format(&s, "Adding server %s ("SID_FMT" at %s) (%s)\n",
4454 as->nickname, SID_ARGS(&as->sid), as->address,
4455 raft_server_phase_to_string(as->phase));
4456 }
4457
4458 struct raft_server *rs = raft->remove_server;
4459 if (rs) {
4460 ds_put_format(&s, "Removing server %s ("SID_FMT" at %s) (%s)\n",
4461 rs->nickname, SID_ARGS(&rs->sid), rs->address,
4462 raft_server_phase_to_string(rs->phase));
4463 }
4464 }
4465
4466 ds_put_format(&s, "Role: %s\n",
4467 raft->role == RAFT_LEADER ? "leader"
4468 : raft->role == RAFT_CANDIDATE ? "candidate"
4469 : raft->role == RAFT_FOLLOWER ? "follower"
4470 : "<error>");
4471 ds_put_format(&s, "Term: %"PRIu64"\n", raft->term);
4472 raft_put_sid("Leader", &raft->leader_sid, raft, &s);
4473 raft_put_sid("Vote", &raft->vote, raft, &s);
4474 ds_put_char(&s, '\n');
4475
4476 ds_put_format(&s, "Election timer: %"PRIu64, raft->election_timer);
4477 if (raft->role == RAFT_LEADER && raft->election_timer_new) {
4478 ds_put_format(&s, " (changing to %"PRIu64")",
4479 raft->election_timer_new);
4480 }
4481 ds_put_char(&s, '\n');
4482
4483 ds_put_format(&s, "Log: [%"PRIu64", %"PRIu64"]\n",
4484 raft->log_start, raft->log_end);
4485
4486 uint64_t n_uncommitted = raft->log_end - raft->commit_index - 1;
4487 ds_put_format(&s, "Entries not yet committed: %"PRIu64"\n", n_uncommitted);
4488
4489 uint64_t n_unapplied = raft->log_end - raft->last_applied - 1;
4490 ds_put_format(&s, "Entries not yet applied: %"PRIu64"\n", n_unapplied);
4491
4492 const struct raft_conn *c;
4493 ds_put_cstr(&s, "Connections:");
4494 LIST_FOR_EACH (c, list_node, &raft->conns) {
4495 bool connected = jsonrpc_session_is_connected(c->js);
4496 ds_put_format(&s, " %s%s%s%s",
4497 connected ? "" : "(",
4498 c->incoming ? "<-" : "->", c->nickname,
4499 connected ? "" : ")");
4500 }
4501 ds_put_char(&s, '\n');
4502
4503 ds_put_cstr(&s, "Servers:\n");
4504 struct raft_server *server;
4505 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
4506 ds_put_format(&s, " %s ("SID_FMT" at %s)",
4507 server->nickname,
4508 SID_ARGS(&server->sid), server->address);
4509 if (uuid_equals(&server->sid, &raft->sid)) {
4510 ds_put_cstr(&s, " (self)");
4511 }
4512 if (server->phase != RAFT_PHASE_STABLE) {
4513 ds_put_format(&s, " (%s)",
4514 raft_server_phase_to_string(server->phase));
4515 }
4516 if (raft->role == RAFT_CANDIDATE) {
4517 if (!uuid_is_zero(&server->vote)) {
4518 char buf[SID_LEN + 1];
4519 ds_put_format(&s, " (voted for %s)",
4520 raft_get_nickname(raft, &server->vote,
4521 buf, sizeof buf));
4522 }
4523 } else if (raft->role == RAFT_LEADER) {
4524 ds_put_format(&s, " next_index=%"PRIu64" match_index=%"PRIu64,
4525 server->next_index, server->match_index);
4526 }
4527 ds_put_char(&s, '\n');
4528 }
4529
4530 unixctl_command_reply(conn, ds_cstr(&s));
4531 ds_destroy(&s);
4532 }
4533
4534 static void
4535 raft_unixctl_leave__(struct unixctl_conn *conn, struct raft *raft)
4536 {
4537 if (raft_is_leaving(raft)) {
4538 unixctl_command_reply_error(conn,
4539 "already in progress leaving cluster");
4540 } else if (raft_is_joining(raft)) {
4541 unixctl_command_reply_error(conn,
4542 "can't leave while join in progress");
4543 } else if (raft_failed(raft)) {
4544 unixctl_command_reply_error(conn,
4545 "can't leave after failure");
4546 } else {
4547 raft_leave(raft);
4548 unixctl_command_reply(conn, NULL);
4549 }
4550 }
4551
4552 static void
4553 raft_unixctl_leave(struct unixctl_conn *conn, int argc OVS_UNUSED,
4554 const char *argv[], void *aux OVS_UNUSED)
4555 {
4556 struct raft *raft = raft_lookup_by_name(argv[1]);
4557 if (!raft) {
4558 unixctl_command_reply_error(conn, "unknown cluster");
4559 return;
4560 }
4561
4562 raft_unixctl_leave__(conn, raft);
4563 }
4564
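/* Returns the server in 'raft''s configuration that best matches 'id', which
 * may be a server address or a (partial) server ID, or NULL if there is no
 * single best match. An exact address match always wins (it scores INT_MAX);
 * a tie between two or more servers for the best score yields NULL. */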
4565 static struct raft_server *
4566 raft_lookup_server_best_match(struct raft *raft, const char *id)
4567 {
4568 struct raft_server *best = NULL;
4569 int best_score = -1;
4570 int n_best = 0;
4571
4572 struct raft_server *s;
4573 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
4574 int score = (!strcmp(id, s->address)
4575 ? INT_MAX
4576 : uuid_is_partial_match(&s->sid, id));
4577 if (score > best_score) {
4578 best = s;
4579 best_score = score;
4580 n_best = 1;
4581 } else if (score == best_score) {
4582 n_best++;
4583 }
4584 }
4585 return n_best == 1 ? best : NULL;
4586 }
4587
4588 static void
4589 raft_unixctl_kick(struct unixctl_conn *conn, int argc OVS_UNUSED,
4590 const char *argv[], void *aux OVS_UNUSED)
4591 {
4592 const char *cluster_name = argv[1];
4593 const char *server_name = argv[2];
4594
4595 struct raft *raft = raft_lookup_by_name(cluster_name);
4596 if (!raft) {
4597 unixctl_command_reply_error(conn, "unknown cluster");
4598 return;
4599 }
4600
4601 struct raft_server *server = raft_lookup_server_best_match(raft,
4602 server_name);
4603 if (!server) {
4604 unixctl_command_reply_error(conn, "unknown server");
4605 return;
4606 }
4607
4608 if (uuid_equals(&server->sid, &raft->sid)) {
4609 raft_unixctl_leave__(conn, raft);
4610 } else if (raft->role == RAFT_LEADER) {
4611 const struct raft_remove_server_request rq = {
4612 .sid = server->sid,
4613 .requester_conn = conn,
4614 };
4615 raft_handle_remove_server_request(raft, &rq);
4616 } else {
4617 const union raft_rpc rpc = {
4618 .remove_server_request = {
4619 .common = {
4620 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
4621 .sid = raft->leader_sid,
4622 .comment = "via unixctl"
4623 },
4624 .sid = server->sid,
4625 }
4626 };
4627 if (raft_send(raft, &rpc)) {
4628 unixctl_command_reply(conn, "sent removal request to leader");
4629 } else {
4630 unixctl_command_reply_error(conn,
4631 "failed to send removal request");
4632 }
4633 }
4634 }
4635
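/* Sets 'raft->election_timer' from the most recent election timer value
 * recorded in the snapshot or in the committed portion of the log. */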
4636 static void
4637 raft_get_election_timer_from_log(struct raft *raft)
4638 {
4639 if (raft->snap.election_timer) {
4640 raft->election_timer = raft->snap.election_timer;
4641 }
4642 for (uint64_t index = raft->commit_index; index >= raft->log_start;
4643 index--) {
4644 struct raft_entry *e = &raft->entries[index - raft->log_start];
4645 if (e->election_timer) {
4646 raft->election_timer = e->election_timer;
4647 break;
4648 }
4649 }
4650 }
4651
4652 static void
4653 raft_log_election_timer(struct raft *raft)
4654 {
4655 raft_command_unref(raft_command_execute__(raft, NULL, NULL,
4656 raft->election_timer_new, NULL,
4657 NULL));
4658 }
4659
4660 static void
4661 raft_unixctl_change_election_timer(struct unixctl_conn *conn,
4662 int argc OVS_UNUSED, const char *argv[],
4663 void *aux OVS_UNUSED)
4664 {
4665 const char *cluster_name = argv[1];
4666 const char *election_timer_str = argv[2];
4667
4668 struct raft *raft = raft_lookup_by_name(cluster_name);
4669 if (!raft) {
4670 unixctl_command_reply_error(conn, "unknown cluster");
4671 return;
4672 }
4673
4674 if (raft->role != RAFT_LEADER) {
4675 unixctl_command_reply_error(conn, "election timer must be changed"
4676 " through leader.");
4677 return;
4678 }
4679
4680 /* If an election timer change is already pending, reject this one. */
4681 if (raft->election_timer_new) {
4682 unixctl_command_reply_error(conn, "election timer change pending.");
4683 return;
4684 }
4685
4686 uint64_t election_timer = atoll(election_timer_str);
4687 if (election_timer == raft->election_timer) {
4688 unixctl_command_reply(conn, "election timer is already set to the requested value.");
4689 return;
4690 }
4691
4692 /* An election timer smaller than 100 ms or larger than 10 minutes doesn't
4693 * make sense. */
4694 if (election_timer < 100 || election_timer > 600000) {
4695 unixctl_command_reply_error(conn, "election timer must be between "
4696 "100 and 600000, in msec.");
4697 return;
4698 }
4699
4700 /* If the election timer is to be increased, it should be done gradually, so
4701 * that the change does not cause an election timeout while the new value has
4702 * been applied on the leader but not yet on some of the followers. */
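    /* For example (hypothetical values): with the election timer currently at
     * 1000 ms, a request to change it to 4000 ms is rejected; changing it to
     * 2000 ms first and, once that change takes effect, to 4000 ms is
     * accepted. */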
4703 if (election_timer > raft->election_timer * 2) {
4704 unixctl_command_reply_error(conn, "election timer increase should not "
4705 "exceed the current value x 2.");
4706 return;
4707 }
4708
4709 raft->election_timer_new = election_timer;
4710 raft_log_election_timer(raft);
4711 unixctl_command_reply(conn, "change of election timer initiated.");
4712 }
4713
4714 static void
4715 raft_unixctl_failure_test(struct unixctl_conn *conn,
4716 int argc OVS_UNUSED, const char *argv[],
4717 void *aux OVS_UNUSED)
4718 {
4719 const char *test = argv[1];
4720 if (!strcmp(test, "crash-before-sending-append-request")) {
4721 failure_test = FT_CRASH_BEFORE_SEND_APPEND_REQ;
4722 } else if (!strcmp(test, "crash-after-sending-append-request")) {
4723 failure_test = FT_CRASH_AFTER_SEND_APPEND_REQ;
4724 } else if (!strcmp(test, "crash-before-sending-execute-command-reply")) {
4725 failure_test = FT_CRASH_BEFORE_SEND_EXEC_REP;
4726 } else if (!strcmp(test, "crash-after-sending-execute-command-reply")) {
4727 failure_test = FT_CRASH_AFTER_SEND_EXEC_REP;
4728 } else if (!strcmp(test, "crash-before-sending-execute-command-request")) {
4729 failure_test = FT_CRASH_BEFORE_SEND_EXEC_REQ;
4730 } else if (!strcmp(test, "crash-after-sending-execute-command-request")) {
4731 failure_test = FT_CRASH_AFTER_SEND_EXEC_REQ;
4732 } else if (!strcmp(test, "crash-after-receiving-append-request-update")) {
4733 failure_test = FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE;
4734 } else if (!strcmp(test, "delay-election")) {
4735 failure_test = FT_DELAY_ELECTION;
4736 struct raft *raft;
4737 HMAP_FOR_EACH (raft, hmap_node, &all_rafts) {
4738 if (raft->role == RAFT_FOLLOWER) {
4739 raft_reset_election_timer(raft);
4740 }
4741 }
4742 } else if (!strcmp(test, "dont-send-vote-request")) {
4743 failure_test = FT_DONT_SEND_VOTE_REQUEST;
4744 } else if (!strcmp(test, "clear")) {
4745 failure_test = FT_NO_TEST;
4746 unixctl_command_reply(conn, "test dismissed");
4747 return;
4748 } else {
4749 unixctl_command_reply_error(conn, "unknown test scenario");
4750 return;
4751 }
4752 unixctl_command_reply(conn, "test engaged");
4753 }
4754
4755 static void
4756 raft_init(void)
4757 {
4758 static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
4759 if (!ovsthread_once_start(&once)) {
4760 return;
4761 }
4762 unixctl_command_register("cluster/cid", "DB", 1, 1,
4763 raft_unixctl_cid, NULL);
4764 unixctl_command_register("cluster/sid", "DB", 1, 1,
4765 raft_unixctl_sid, NULL);
4766 unixctl_command_register("cluster/status", "DB", 1, 1,
4767 raft_unixctl_status, NULL);
4768 unixctl_command_register("cluster/leave", "DB", 1, 1,
4769 raft_unixctl_leave, NULL);
4770 unixctl_command_register("cluster/kick", "DB SERVER", 2, 2,
4771 raft_unixctl_kick, NULL);
4772 unixctl_command_register("cluster/change-election-timer", "DB TIME", 2, 2,
4773 raft_unixctl_change_election_timer, NULL);
4774 unixctl_command_register("cluster/failure-test", "FAILURE SCENARIO", 1, 1,
4775 raft_unixctl_failure_test, NULL);
4776 ovsthread_once_done(&once);
4777 }