1/*
2 * Copyright (c) 2017, 2018 Nicira, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <config.h>
18
19#include "raft.h"
20#include "raft-private.h"
21
22#include <errno.h>
23#include <unistd.h>
24
25#include "hash.h"
26#include "jsonrpc.h"
27#include "lockfile.h"
28#include "openvswitch/dynamic-string.h"
29#include "openvswitch/hmap.h"
30#include "openvswitch/json.h"
31#include "openvswitch/list.h"
32#include "openvswitch/poll-loop.h"
33#include "openvswitch/vlog.h"
34#include "ovsdb-error.h"
35#include "ovsdb-parser.h"
36#include "ovsdb/log.h"
37#include "raft-rpc.h"
38#include "random.h"
39#include "socket-util.h"
40#include "stream.h"
41#include "timeval.h"
42#include "unicode.h"
43#include "unixctl.h"
44#include "util.h"
45#include "uuid.h"
46
47VLOG_DEFINE_THIS_MODULE(raft);
48
49/* Roles for a Raft server:
50 *
51 * - Followers: Servers in touch with the current leader.
52 *
53 * - Candidate: Servers unaware of a current leader and seeking election to
54 * leader.
55 *
56 * - Leader: Handles all client requests. At most one leader at a time.
57 *
58 * In normal operation there is exactly one leader and all of the other servers
59 * are followers. */
60enum raft_role {
61 RAFT_FOLLOWER,
62 RAFT_CANDIDATE,
63 RAFT_LEADER
64};
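
/* An illustrative sketch of the role transitions implemented below (see
 * raft_start_election(), raft_become_leader(), and raft_become_follower()):
 *
 *     RAFT_FOLLOWER  --(election timeout expires)---> RAFT_CANDIDATE
 *     RAFT_CANDIDATE --(votes from a majority)------> RAFT_LEADER
 *     RAFT_CANDIDATE --(current leader discovered)--> RAFT_FOLLOWER
 *     RAFT_LEADER    --(higher term observed)-------> RAFT_FOLLOWER
 */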
65
66/* A connection between this Raft server and another one. */
67struct raft_conn {
68 struct ovs_list list_node; /* In struct raft's 'conns' list. */
69 struct jsonrpc_session *js; /* JSON-RPC connection. */
70 struct uuid sid; /* The other server's unique ID. */
71 char *nickname; /* Short name for use in log messages. */
72 bool incoming; /* True if incoming, false if outgoing. */
73 unsigned int js_seqno; /* Seqno for noticing (re)connections. */
74};
75
76static void raft_conn_close(struct raft_conn *);
77
78/* A "command", that is, a request to append an entry to the log.
79 *
80 * The Raft specification only allows clients to issue commands to the leader.
81 * With this implementation, clients may issue a command on any server, which
82 * then relays the command to the leader if necessary.
83 *
84 * This structure is thus used in three cases:
85 *
86 * 1. We are the leader and the command was issued to us directly.
87 *
88 * 2. We are a follower and relayed the command to the leader.
89 *
90 * 3. We are the leader and a follower relayed the command to us.
91 */
92struct raft_command {
93 /* All cases. */
94 struct hmap_node hmap_node; /* In struct raft's 'commands' hmap. */
95 unsigned int n_refs; /* Reference count. */
96 enum raft_command_status status; /* Execution status. */
97
98 /* Case 1 only. */
99 uint64_t index; /* Index in log (0 if being relayed). */
100
101 /* Cases 2 and 3. */
102 struct uuid eid; /* Entry ID of result. */
103
104 /* Case 2 only. */
105 long long int timestamp; /* Issue or last ping time, for expiration. */
106
107 /* Case 3 only. */
108 struct uuid sid; /* The follower (otherwise UUID_ZERO). */
109};
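
/* An illustrative mapping of the three cases described above onto the
 * struct's fields, derived from the per-field comments (not an exhaustive
 * specification):
 *
 *     Case 1 (leader, direct):    'index' set; 'sid' all-zeros.
 *     Case 2 (follower, relayed): 'index' is 0; 'eid' and 'timestamp' set.
 *     Case 3 (leader, relayed):   'eid' set; 'sid' names the follower.
 */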
110
111static void raft_command_complete(struct raft *, struct raft_command *,
112 enum raft_command_status);
113
114static void raft_complete_all_commands(struct raft *,
115 enum raft_command_status);
116
117/* Type of deferred action, see struct raft_waiter. */
118enum raft_waiter_type {
119 RAFT_W_ENTRY,
120 RAFT_W_TERM,
121 RAFT_W_RPC,
122};
123
124/* An action deferred until a log write commits to disk. */
125struct raft_waiter {
126 struct ovs_list list_node;
127 uint64_t commit_ticket;
128
129 enum raft_waiter_type type;
130 union {
131 /* RAFT_W_ENTRY.
132 *
133 * Waits for a RAFT_REC_ENTRY write to our local log to commit. Upon
134 * completion, updates 'log_synced' to indicate that the new log entry
135 * or entries are committed and, if we are leader, also updates our
136 * local 'match_index'. */
137 struct {
138 uint64_t index;
139 } entry;
140
141 /* RAFT_W_TERM.
142 *
143 * Waits for a RAFT_REC_TERM or RAFT_REC_VOTE record write to commit.
144 * Upon completion, updates 'synced_term' and 'synced_vote', which
145 * triggers sending RPCs deferred by the uncommitted 'term' and
146 * 'vote'. */
147 struct {
148 uint64_t term;
149 struct uuid vote;
150 } term;
151
152 /* RAFT_W_RPC.
153 *
154 * Sometimes, sending an RPC to a peer must be delayed until an entry,
155 * a term, or a vote mentioned in the RPC is synced to disk. This
156 * waiter keeps a copy of such an RPC until the previous waiters have
157 * committed. */
158 union raft_rpc *rpc;
159 };
160};
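
/* A sketch of the usual deferred-action flow in this file: a record is
 * appended with ovsdb_log_write_and_free(), a waiter is queued with
 * raft_waiter_create() (which may take a commit ticket from
 * ovsdb_log_commit_start()), and raft_waiters_run() completes the waiter
 * once ovsdb_log_commit_progress() reaches that ticket. */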
161
162static struct raft_waiter *raft_waiter_create(struct raft *,
163 enum raft_waiter_type,
164 bool start_commit);
165static void raft_waiters_destroy(struct raft *);
166
167/* The Raft state machine. */
168struct raft {
169 struct hmap_node hmap_node; /* In 'all_rafts'. */
170 struct ovsdb_log *log;
171
172/* Persistent derived state.
173 *
174 * This must be updated on stable storage before responding to RPCs. It can be
175 * derived from the header, snapshot, and log in 'log'. */
176
177 struct uuid cid; /* Cluster ID (immutable for the cluster). */
178 struct uuid sid; /* Server ID (immutable for the server). */
179 char *local_address; /* Local address (immutable for the server). */
180 char *local_nickname; /* Used for local server in log messages. */
181 char *name; /* Schema name (immutable for the cluster). */
182
183 /* Contains "struct raft_server"s and represents the server configuration
184 * most recently added to 'log'. */
185 struct hmap servers;
186
187/* Persistent state on all servers.
188 *
189 * Must be updated on stable storage before responding to RPCs. */
190
191 /* Current term and the vote for that term. These might be on the way to
192 * disk now. */
193 uint64_t term; /* Initialized to 0 and only increases. */
194 struct uuid vote; /* All-zeros if no vote yet in 'term'. */
195
196 /* The term and vote that have been synced to disk. */
197 uint64_t synced_term;
198 struct uuid synced_vote;
199
200 /* The log.
201 *
202 * A log entry with index 1 never really exists; the initial snapshot for a
203 * Raft is considered to include this index. The first real log entry has
204 * index 2.
205 *
206 * A new Raft instance contains an empty log: log_start=2, log_end=2.
207 * Over time, the log grows: log_start=2, log_end=N.
208 * At some point, the server takes a snapshot: log_start=N, log_end=N.
209 * The log continues to grow: log_start=N, log_end=N+1...
210 *
211 * Must be updated on stable storage before responding to RPCs. */
212 struct raft_entry *entries; /* Log entry i is in entries[i - log_start]. */
213 uint64_t log_start; /* Index of first entry in log. */
214 uint64_t log_end; /* Index of last entry in log, plus 1. */
215 uint64_t log_synced; /* Index of last synced entry. */
216 size_t allocated_log; /* Allocated slots in 'entries'. */
217
218 /* Snapshot state (see Figure 5.1)
219 *
220 * This is the state of the cluster as of the last discarded log entry,
221 * that is, at log index 'log_start - 1' (called prevIndex in Figure 5.1).
222 * Only committed log entries can be included in a snapshot. */
223 struct raft_entry snap;
224
225/* Volatile state.
226 *
227 * The snapshot is always committed, but the rest of the log might not be yet.
228 * 'last_applied' tracks what entries have been passed to the client. If the
229 * client hasn't yet read the latest snapshot, then even the snapshot isn't
230 * applied yet. Thus, the invariants are different for these members:
231 *
232 * log_start - 2 <= last_applied <= commit_index < log_end.
233 * log_start - 1 <= commit_index < log_end.
234 */
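
/* A worked example: with log_start == 10, the snapshot covers log indexes 1
 * through 9, so commit_index is at least 9 and last_applied is at least 8,
 * where last_applied == 8 means that the client has not yet read the
 * snapshot itself. Both stay below log_end. */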
235
236 enum raft_role role; /* Current role. */
237 uint64_t commit_index; /* Max log index known to be committed. */
238 uint64_t last_applied; /* Max log index applied to state machine. */
239 struct uuid leader_sid; /* Server ID of leader (zero, if unknown). */
240
241 /* Followers and candidates only. */
242#define ELECTION_BASE_MSEC 1024
243#define ELECTION_RANGE_MSEC 1024
244 long long int election_base; /* Time of last heartbeat from leader. */
245 long long int election_timeout; /* Time at which we start an election. */
246
247 /* Used for joining a cluster. */
248 bool joining; /* Attempting to join the cluster? */
249 struct sset remote_addresses; /* Addresses to try to find other servers. */
250 long long int join_timeout; /* Time to re-send add server request. */
251
252 /* Used for leaving a cluster. */
253 bool leaving; /* True if we are leaving the cluster. */
254 bool left; /* True if we have finished leaving. */
255 long long int leave_timeout; /* Time to re-send remove server request. */
256
257 /* Failure. */
258 bool failed; /* True if unrecoverable error has occurred. */
259
260 /* File synchronization. */
261 struct ovs_list waiters; /* Contains "struct raft_waiter"s. */
262
263 /* Network connections. */
264 struct pstream *listener; /* For connections from other Raft servers. */
265 long long int listen_backoff; /* For retrying creating 'listener'. */
266 struct ovs_list conns; /* Contains "struct raft_conn"s. */
267
268 /* Leaders only. Reinitialized after becoming leader. */
269 struct hmap add_servers; /* Contains "struct raft_server"s to add. */
270 struct raft_server *remove_server; /* Server being removed. */
271 struct hmap commands; /* Contains "struct raft_command"s. */
272#define PING_TIME_MSEC (ELECTION_BASE_MSEC / 3)
273 long long int ping_timeout; /* Time at which to send a heartbeat. */
274
275 /* Candidates only. Reinitialized at start of election. */
276 int n_votes; /* Number of votes for me. */
277};
278
279/* All Raft structures. */
280static struct hmap all_rafts = HMAP_INITIALIZER(&all_rafts);
281
282static void raft_init(void);
283
284static struct ovsdb_error *raft_read_header(struct raft *)
285 OVS_WARN_UNUSED_RESULT;
286
287static void raft_send_execute_command_reply(struct raft *,
288 const struct uuid *sid,
289 const struct uuid *eid,
290 enum raft_command_status,
291 uint64_t commit_index);
292
293static void raft_update_our_match_index(struct raft *, uint64_t min_index);
294
295static void raft_send_remove_server_reply__(
296 struct raft *, const struct uuid *target_sid,
297 const struct uuid *requester_sid, struct unixctl_conn *requester_conn,
298 bool success, const char *comment);
299static void raft_finished_leaving_cluster(struct raft *);
300
301static void raft_server_init_leader(struct raft *, struct raft_server *);
302
303static bool raft_rpc_is_heartbeat(const union raft_rpc *);
304static bool raft_is_rpc_synced(const struct raft *, const union raft_rpc *);
305
306static void raft_handle_rpc(struct raft *, const union raft_rpc *);
307
308static bool raft_send_at(struct raft *, const union raft_rpc *,
309 int line_number);
310#define raft_send(raft, rpc) raft_send_at(raft, rpc, __LINE__)
311
312static bool raft_send_to_conn_at(struct raft *, const union raft_rpc *,
313 struct raft_conn *, int line_number);
314#define raft_send_to_conn(raft, rpc, conn) \
315 raft_send_to_conn_at(raft, rpc, conn, __LINE__)
316
317static void raft_send_append_request(struct raft *,
318 struct raft_server *, unsigned int n,
319 const char *comment);
320
321static void raft_become_leader(struct raft *);
322static void raft_become_follower(struct raft *);
323static void raft_reset_timer(struct raft *);
324static void raft_send_heartbeats(struct raft *);
325static void raft_start_election(struct raft *, bool leadership_transfer);
326static bool raft_truncate(struct raft *, uint64_t new_end);
327static void raft_get_servers_from_log(struct raft *, enum vlog_level);
328
329static bool raft_handle_write_error(struct raft *, struct ovsdb_error *);
330
331static void raft_run_reconfigure(struct raft *);
332
333static struct raft_server *
334raft_find_server(const struct raft *raft, const struct uuid *sid)
335{
336 return raft_server_find(&raft->servers, sid);
337}
338
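/* Returns the passive (listening) form of 'address_'. Example
 * transformations (the addresses are illustrative):
 *
 *     "unix:/tmp/db.sock"      -> "punix:/tmp/db.sock"
 *     "tcp:1.2.3.4:6644"       -> "ptcp:6644:1.2.3.4"
 *     "ssl:[2001:db8::1]:6644" -> "pssl:6644:[2001:db8::1]"
 */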
339static char *
340raft_make_address_passive(const char *address_)
341{
342 if (!strncmp(address_, "unix:", 5)) {
343 return xasprintf("p%s", address_);
344 } else {
345 char *address = xstrdup(address_);
346 char *host, *port;
347 inet_parse_host_port_tokens(strchr(address, ':') + 1, &host, &port);
348
349 struct ds paddr = DS_EMPTY_INITIALIZER;
350 ds_put_format(&paddr, "p%.3s:%s:", address, port);
351 if (strchr(host, ':')) {
352 ds_put_format(&paddr, "[%s]", host);
353 } else {
354 ds_put_cstr(&paddr, host);
355 }
356 free(address);
357 return ds_steal_cstr(&paddr);
358 }
359}
360
361static struct raft *
362raft_alloc(void)
363{
364 raft_init();
365
366 struct raft *raft = xzalloc(sizeof *raft);
367 hmap_node_nullify(&raft->hmap_node);
368 hmap_init(&raft->servers);
369 raft->log_start = raft->log_end = 1;
370 raft->role = RAFT_FOLLOWER;
371 sset_init(&raft->remote_addresses);
372 raft->join_timeout = LLONG_MAX;
373 ovs_list_init(&raft->waiters);
374 raft->listen_backoff = LLONG_MIN;
375 ovs_list_init(&raft->conns);
376 hmap_init(&raft->add_servers);
377 hmap_init(&raft->commands);
378
379 raft->ping_timeout = time_msec() + PING_TIME_MSEC;
380 raft_reset_timer(raft);
381
382 return raft;
383}
384
385/* Creates an on-disk file that represents a new Raft cluster and initializes
386 * it to consist of a single server, the one on which this function is called.
387 *
388 * Creates the local copy of the cluster's log in 'file_name', which must not
389 * already exist. Gives it the name 'name', which should be the database
390 * schema name and which is used only to match up this database with the server
391 * added to the cluster later if the cluster ID is unavailable.
392 *
393 * The new server is located at 'local_address', which must take one of the
394 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
395 * square bracket enclosed IPv6 address and PORT is a TCP port number.
396 *
397 * This only creates the on-disk file. Use raft_open() to start operating the
398 * new server.
399 *
400 * Returns null if successful, otherwise an ovsdb_error describing the
401 * problem. */
402struct ovsdb_error * OVS_WARN_UNUSED_RESULT
403raft_create_cluster(const char *file_name, const char *name,
404 const char *local_address, const struct json *data)
405{
406 /* Parse and verify validity of the local address. */
407 struct ovsdb_error *error = raft_address_validate(local_address);
408 if (error) {
409 return error;
410 }
411
412 /* Create log file. */
413 struct ovsdb_log *log;
414 error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
415 -1, &log);
416 if (error) {
417 return error;
418 }
419
420 /* Write log file. */
421 struct raft_header h = {
422 .sid = uuid_random(),
423 .cid = uuid_random(),
424 .name = xstrdup(name),
425 .local_address = xstrdup(local_address),
426 .joining = false,
427 .remote_addresses = SSET_INITIALIZER(&h.remote_addresses),
428 .snap_index = 1,
429 .snap = {
430 .term = 1,
431 .data = json_nullable_clone(data),
432 .eid = uuid_random(),
433 .servers = json_object_create(),
434 },
435 };
436 shash_add_nocopy(json_object(h.snap.servers),
437 xasprintf(UUID_FMT, UUID_ARGS(&h.sid)),
438 json_string_create(local_address));
439 error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
440 raft_header_uninit(&h);
441 if (!error) {
442 error = ovsdb_log_commit_block(log);
443 }
444 ovsdb_log_close(log);
445
446 return error;
447}
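
/* A minimal usage sketch; the file name, schema name, address, and
 * 'snapshot_json' are illustrative, not taken from real callers:
 *
 *     struct ovsdb_error *error = raft_create_cluster(
 *         "/var/lib/ovsdb/db.db", "OVN_Southbound",
 *         "tcp:10.0.0.1:6644", snapshot_json);
 *     if (error) {
 *         char *s = ovsdb_error_to_string_free(error);
 *         ...report the failure and free 's'...
 *     }
 */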
448
449/* Creates a database file that represents a new server in an existing Raft
450 * cluster.
451 *
452 * Creates the local copy of the cluster's log in 'file_name', which must not
453 * already exist. Gives it the name 'name', which must be the same name
454 * passed in to raft_create_cluster() earlier.
455 *
456 * 'cid' is optional. If specified, the new server will join only the cluster
457 * with the given cluster ID.
458 *
459 * The new server is located at 'local_address', which must take one of the
460 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
461 * square bracket enclosed IPv6 address and PORT is a TCP port number.
462 *
463 * Joining the cluster requires contacting it. Thus, 'remote_addresses'
464 * specifies the addresses of existing servers in the cluster. One server out
465 * of the existing cluster is sufficient, as long as that server is reachable
466 * and not partitioned from the current cluster leader. If multiple servers
467 * from the cluster are specified, then it is sufficient for any of them to
468 * meet this criterion.
469 *
470 * This only creates the on-disk file and does no network access. Use
471 * raft_open() to start operating the new server. (Until this happens, the
472 * new server has not joined the cluster.)
473 *
474 * Returns null if successful, otherwise an ovsdb_error describing the
475 * problem. */
476struct ovsdb_error * OVS_WARN_UNUSED_RESULT
477raft_join_cluster(const char *file_name,
478 const char *name, const char *local_address,
479 const struct sset *remote_addresses,
480 const struct uuid *cid)
481{
482 ovs_assert(!sset_is_empty(remote_addresses));
483
484 /* Parse and verify validity of the addresses. */
485 struct ovsdb_error *error = raft_address_validate(local_address);
486 if (error) {
487 return error;
488 }
489 const char *addr;
490 SSET_FOR_EACH (addr, remote_addresses) {
491 error = raft_address_validate(addr);
492 if (error) {
493 return error;
494 }
495 if (!strcmp(addr, local_address)) {
496 return ovsdb_error(NULL, "remote addresses cannot be the same "
497 "as the local address");
498 }
499 }
500
501 /* Verify validity of the cluster ID (if provided). */
502 if (cid && uuid_is_zero(cid)) {
503 return ovsdb_error(NULL, "all-zero UUID is not valid cluster ID");
504 }
505
506 /* Create log file. */
507 struct ovsdb_log *log;
508 error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
509 -1, &log);
510 if (error) {
511 return error;
512 }
513
514 /* Write log file. */
515 struct raft_header h = {
516 .sid = uuid_random(),
517 .cid = cid ? *cid : UUID_ZERO,
518 .name = xstrdup(name),
519 .local_address = xstrdup(local_address),
520 .joining = true,
521 /* No snapshot yet. */
522 };
523 sset_clone(&h.remote_addresses, remote_addresses);
524 error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
525 raft_header_uninit(&h);
526 if (!error) {
527 error = ovsdb_log_commit_block(log);
528 }
529 ovsdb_log_close(log);
530
531 return error;
532}
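
/* A usage sketch with illustrative names and addresses:
 *
 *     struct sset remotes = SSET_INITIALIZER(&remotes);
 *     sset_add(&remotes, "tcp:10.0.0.1:6644");
 *     struct ovsdb_error *error = raft_join_cluster(
 *         "/var/lib/ovsdb/db2.db", "OVN_Southbound",
 *         "tcp:10.0.0.2:6644", &remotes, NULL);
 *     sset_destroy(&remotes);
 */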
533
534/* Reads the initial header record from 'log', which must be a Raft clustered
535 * database log, and populates '*md' with the information read from it. The
536 * caller must eventually destroy 'md' with raft_metadata_destroy().
537 *
538 * On success, returns NULL. On failure, returns an error that the caller must
539 * eventually destroy and zeros '*md'. */
540struct ovsdb_error * OVS_WARN_UNUSED_RESULT
541raft_read_metadata(struct ovsdb_log *log, struct raft_metadata *md)
542{
543 struct raft *raft = raft_alloc();
544 raft->log = log;
545
546 struct ovsdb_error *error = raft_read_header(raft);
547 if (!error) {
548 md->sid = raft->sid;
549 md->name = xstrdup(raft->name);
550 md->local = xstrdup(raft->local_address);
551 md->cid = raft->cid;
552 } else {
553 memset(md, 0, sizeof *md);
554 }
555
556 raft->log = NULL;
557 raft_close(raft);
558 return error;
559}
560
561/* Frees the metadata in 'md'. */
562void
563raft_metadata_destroy(struct raft_metadata *md)
564{
565 if (md) {
566 free(md->name);
567 free(md->local);
568 }
569}
570
571static const struct raft_entry *
572raft_get_entry(const struct raft *raft, uint64_t index)
573{
574 ovs_assert(index >= raft->log_start);
575 ovs_assert(index < raft->log_end);
576 return &raft->entries[index - raft->log_start];
577}
578
579static uint64_t
580raft_get_term(const struct raft *raft, uint64_t index)
581{
582 return (index == raft->log_start - 1
583 ? raft->snap.term
584 : raft_get_entry(raft, index)->term);
585}
586
587static const struct json *
588raft_servers_for_index(const struct raft *raft, uint64_t index)
589{
590 ovs_assert(index >= raft->log_start - 1);
591 ovs_assert(index < raft->log_end);
592
593 const struct json *servers = raft->snap.servers;
594 for (uint64_t i = raft->log_start; i <= index; i++) {
595 const struct raft_entry *e = raft_get_entry(raft, i);
596 if (e->servers) {
597 servers = e->servers;
598 }
599 }
600 return servers;
601}
602
603static void
604raft_set_servers(struct raft *raft, const struct hmap *new_servers,
605 enum vlog_level level)
606{
607 struct raft_server *s, *next;
608 HMAP_FOR_EACH_SAFE (s, next, hmap_node, &raft->servers) {
609 if (!raft_server_find(new_servers, &s->sid)) {
610 ovs_assert(s != raft->remove_server);
611
612 hmap_remove(&raft->servers, &s->hmap_node);
613 VLOG(level, "server %s removed from configuration", s->nickname);
614 raft_server_destroy(s);
615 }
616 }
617
618 HMAP_FOR_EACH_SAFE (s, next, hmap_node, new_servers) {
619 if (!raft_find_server(raft, &s->sid)) {
620 VLOG(level, "server %s added to configuration", s->nickname);
621
622 struct raft_server *new
623 = raft_server_add(&raft->servers, &s->sid, s->address);
624 raft_server_init_leader(raft, new);
625 }
626 }
627}
628
629static uint64_t
630raft_add_entry(struct raft *raft,
631 uint64_t term, struct json *data, const struct uuid *eid,
632 struct json *servers)
633{
634 if (raft->log_end - raft->log_start >= raft->allocated_log) {
635 raft->entries = x2nrealloc(raft->entries, &raft->allocated_log,
636 sizeof *raft->entries);
637 }
638
639 uint64_t index = raft->log_end++;
640 struct raft_entry *entry = &raft->entries[index - raft->log_start];
641 entry->term = term;
642 entry->data = data;
643 entry->eid = eid ? *eid : UUID_ZERO;
644 entry->servers = servers;
645 return index;
646}
647
648/* Writes a RAFT_REC_ENTRY record for 'term', 'data', 'eid', 'servers' to
649 * 'raft''s log and returns an error indication. */
650static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
651raft_write_entry(struct raft *raft, uint64_t term, struct json *data,
652 const struct uuid *eid, struct json *servers)
653{
654 struct raft_record r = {
655 .type = RAFT_REC_ENTRY,
656 .term = term,
657 .entry = {
658 .index = raft_add_entry(raft, term, data, eid, servers),
659 .data = data,
660 .servers = servers,
661 .eid = eid ? *eid : UUID_ZERO,
662 },
663 };
664 return ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r));
665}
666
667static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
668raft_write_state(struct ovsdb_log *log,
669 uint64_t term, const struct uuid *vote)
670{
671 struct raft_record r = { .term = term };
672 if (vote && !uuid_is_zero(vote)) {
673 r.type = RAFT_REC_VOTE;
674 r.sid = *vote;
675 } else {
676 r.type = RAFT_REC_TERM;
677 }
678 return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
679}
680
681static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
682raft_apply_record(struct raft *raft, unsigned long long int rec_idx,
683 const struct raft_record *r)
684{
685 /* Apply "term", which is present in most kinds of records (and otherwise
686 * 0).
687 *
688 * A Raft leader can replicate entries from previous terms to the other
689 * servers in the cluster, retaining the original terms on those entries
690 * (see section 3.6.2 "Committing entries from previous terms" for more
691 * information), so it's OK for the term in a log record to precede the
692 * current term. */
693 if (r->term > raft->term) {
694 raft->term = raft->synced_term = r->term;
695 raft->vote = raft->synced_vote = UUID_ZERO;
696 }
697
698 switch (r->type) {
699 case RAFT_REC_ENTRY:
700 if (r->entry.index < raft->commit_index) {
701 return ovsdb_error(NULL, "record %llu attempts to truncate log "
702 "from %"PRIu64" to %"PRIu64" entries, but "
703 "commit index is already %"PRIu64,
704 rec_idx, raft->log_end, r->entry.index,
705 raft->commit_index);
706 } else if (r->entry.index > raft->log_end) {
707 return ovsdb_error(NULL, "record %llu with index %"PRIu64" skips "
708 "past expected index %"PRIu64,
709 rec_idx, r->entry.index, raft->log_end);
710 }
711
712 if (r->entry.index < raft->log_end) {
713 /* This can happen, but it is notable. */
714 VLOG_DBG("record %llu truncates log from %"PRIu64" to %"PRIu64
715 " entries", rec_idx, raft->log_end, r->entry.index);
716 raft_truncate(raft, r->entry.index);
717 }
718
719 uint64_t prev_term = (raft->log_end > raft->log_start
720 ? raft->entries[raft->log_end
721 - raft->log_start - 1].term
722 : raft->snap.term);
723 if (r->term < prev_term) {
724 return ovsdb_error(NULL, "record %llu with index %"PRIu64" term "
725 "%"PRIu64" precedes previous entry's term "
726 "%"PRIu64,
727 rec_idx, r->entry.index, r->term, prev_term);
728 }
729
730 raft->log_synced = raft_add_entry(
731 raft, r->term,
732 json_nullable_clone(r->entry.data), &r->entry.eid,
733 json_nullable_clone(r->entry.servers));
734 return NULL;
735
736 case RAFT_REC_TERM:
737 return NULL;
738
739 case RAFT_REC_VOTE:
740 if (r->term < raft->term) {
741 return ovsdb_error(NULL, "record %llu votes for term %"PRIu64" "
742 "but current term is %"PRIu64,
743 rec_idx, r->term, raft->term);
744 } else if (!uuid_is_zero(&raft->vote)
745 && !uuid_equals(&raft->vote, &r->sid)) {
746 return ovsdb_error(NULL, "record %llu votes for "SID_FMT" in term "
747 "%"PRIu64" but a previous record for the "
748 "same term voted for "SID_FMT, rec_idx,
749 SID_ARGS(&raft->vote), r->term,
750 SID_ARGS(&r->sid));
751 } else {
752 raft->vote = raft->synced_vote = r->sid;
753 return NULL;
754 }
755 break;
756
757 case RAFT_REC_NOTE:
758 if (!strcmp(r->note, "left")) {
759 return ovsdb_error(NULL, "record %llu indicates server has left "
760 "the cluster; it cannot be added back (use "
761 "\"ovsdb-tool join-cluster\" to add a new "
762 "server)", rec_idx);
763 }
764 return NULL;
765
766 case RAFT_REC_COMMIT_INDEX:
767 if (r->commit_index < raft->commit_index) {
768 return ovsdb_error(NULL, "record %llu regresses commit index "
769 "from %"PRIu64 " to %"PRIu64,
770 rec_idx, raft->commit_index, r->commit_index);
771 } else if (r->commit_index >= raft->log_end) {
772 return ovsdb_error(NULL, "record %llu advances commit index to "
773 "%"PRIu64 " but last log index is %"PRIu64,
774 rec_idx, r->commit_index, raft->log_end - 1);
775 } else {
776 raft->commit_index = r->commit_index;
777 return NULL;
778 }
779 break;
780
781 case RAFT_REC_LEADER:
782 /* XXX we could use this to take back leadership for quick restart */
783 return NULL;
784
785 default:
786 OVS_NOT_REACHED();
787 }
788}
789
790static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
791raft_read_header(struct raft *raft)
792{
793 /* Read header record. */
794 struct json *json;
795 struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
796 if (error || !json) {
797 /* Report error or end-of-file. */
798 return error;
799 }
800 ovsdb_log_mark_base(raft->log);
801
802 struct raft_header h;
803 error = raft_header_from_json(&h, json);
804 json_destroy(json);
805 if (error) {
806 return error;
807 }
808
809 raft->sid = h.sid;
810 raft->cid = h.cid;
811 raft->name = xstrdup(h.name);
812 raft->local_address = xstrdup(h.local_address);
813 raft->local_nickname = raft_address_to_nickname(h.local_address, &h.sid);
814 raft->joining = h.joining;
815
816 if (h.joining) {
817 sset_clone(&raft->remote_addresses, &h.remote_addresses);
818 } else {
819 raft_entry_clone(&raft->snap, &h.snap);
820 raft->log_start = raft->log_end = h.snap_index + 1;
821 raft->commit_index = h.snap_index;
822 raft->last_applied = h.snap_index - 1;
823 }
824
825 raft_header_uninit(&h);
826
827 return NULL;
828}
829
830static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
831raft_read_log(struct raft *raft)
832{
833 for (unsigned long long int i = 1; ; i++) {
834 struct json *json;
835 struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
836 if (!json) {
837 if (error) {
838 /* We assume that the error is due to a partial write while
839 * appending to the file before a crash, so log it and
840 * continue. */
841 char *error_string = ovsdb_error_to_string_free(error);
842 VLOG_WARN("%s", error_string);
843 free(error_string);
844 error = NULL;
845 }
846 break;
847 }
848
849 struct raft_record r;
850 error = raft_record_from_json(&r, json);
851 if (!error) {
852 error = raft_apply_record(raft, i, &r);
853 raft_record_uninit(&r);
854 }
855 if (error) {
856 return ovsdb_wrap_error(error, "error reading record %llu from "
857 "%s log", i, raft->name);
858 }
859 }
860
861 /* Set the most recent servers. */
862 raft_get_servers_from_log(raft, VLL_DBG);
863
864 return NULL;
865}
866
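/* Resets the election timer. With the ELECTION_*_MSEC defaults above, the
 * timeout fires a uniformly random 1024 to 2047 ms from now; the
 * randomization makes it unlikely that several followers time out and start
 * competing elections at the same moment. */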
867static void
868raft_reset_timer(struct raft *raft)
869{
870 unsigned int duration = (ELECTION_BASE_MSEC
871 + random_range(ELECTION_RANGE_MSEC));
872 raft->election_base = time_msec();
873 raft->election_timeout = raft->election_base + duration;
874}
875
876static void
877raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
878 const struct uuid *sid, bool incoming)
879{
880 struct raft_conn *conn = xzalloc(sizeof *conn);
881 ovs_list_push_back(&raft->conns, &conn->list_node);
882 conn->js = js;
883 if (sid) {
884 conn->sid = *sid;
885 }
886 conn->nickname = raft_address_to_nickname(jsonrpc_session_get_name(js),
887 &conn->sid);
888 conn->incoming = incoming;
889 conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
890}
891
892/* Starts the local server in an existing Raft cluster, using 'log', the local
893 * copy of the cluster's log. Takes ownership of 'log', whether successful or
894 * not. */
895struct ovsdb_error * OVS_WARN_UNUSED_RESULT
896raft_open(struct ovsdb_log *log, struct raft **raftp)
897{
898 struct raft *raft = raft_alloc();
899 raft->log = log;
900
901 struct ovsdb_error *error = raft_read_header(raft);
902 if (error) {
903 goto error;
904 }
905
906 if (!raft->joining) {
907 error = raft_read_log(raft);
908 if (error) {
909 goto error;
910 }
911
912 /* Find our own server. */
913 if (!raft_find_server(raft, &raft->sid)) {
914 error = ovsdb_error(NULL, "server does not belong to cluster");
915 goto error;
916 }
917
918 /* If there's only one server, start an election right away so that the
919 * cluster bootstraps quickly. */
920 if (hmap_count(&raft->servers) == 1) {
921 raft_start_election(raft, false);
922 }
923 } else {
924 raft->join_timeout = time_msec() + 1000;
925 }
926
927 *raftp = raft;
928 hmap_insert(&all_rafts, &raft->hmap_node, hash_string(raft->name, 0));
929 return NULL;
930
931error:
932 raft_close(raft);
933 *raftp = NULL;
934 return error;
935}
936
937/* Returns the name of 'raft', which in OVSDB is the database schema name. */
938const char *
939raft_get_name(const struct raft *raft)
940{
941 return raft->name;
942}
943
944/* Returns the cluster ID of 'raft'. If 'raft' has not yet completed joining
945 * its cluster, then 'cid' will be all-zeros (unless the administrator
946 * specified a cluster ID running "ovsdb-tool join-cluster").
947 *
948 * Each cluster has a unique cluster ID. */
949const struct uuid *
950raft_get_cid(const struct raft *raft)
951{
952 return &raft->cid;
953}
954
955/* Returns the server ID of 'raft'. Each server has a unique server ID. */
956const struct uuid *
957raft_get_sid(const struct raft *raft)
958{
959 return &raft->sid;
960}
961
962/* Returns true if 'raft' has completed joining its cluster, has not left or
963 * initiated leaving the cluster, does not have failed disk storage, and is
964 * apparently connected to the leader in a healthy way (or is itself the
965 * leader). */
966bool
967raft_is_connected(const struct raft *raft)
968{
969 return (raft->role != RAFT_CANDIDATE
970 && !raft->joining
971 && !raft->leaving
972 && !raft->left
973 && !raft->failed);
974}
975
976/* Returns true if 'raft' is the cluster leader. */
977bool
978raft_is_leader(const struct raft *raft)
979{
980 return raft->role == RAFT_LEADER;
981}
982
983/* Returns true if 'raft' is in the process of joining its cluster. */
984bool
985raft_is_joining(const struct raft *raft)
986{
987 return raft->joining;
988}
989
990/* Only returns *connected* connections. */
991static struct raft_conn *
992raft_find_conn_by_sid(struct raft *raft, const struct uuid *sid)
993{
994 if (!uuid_is_zero(sid)) {
995 struct raft_conn *conn;
996 LIST_FOR_EACH (conn, list_node, &raft->conns) {
997 if (uuid_equals(sid, &conn->sid)
998 && jsonrpc_session_is_connected(conn->js)) {
999 return conn;
1000 }
1001 }
1002 }
1003 return NULL;
1004}
1005
1006static struct raft_conn *
1007raft_find_conn_by_address(struct raft *raft, const char *address)
1008{
1009 struct raft_conn *conn;
1010 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1011 if (!strcmp(jsonrpc_session_get_name(conn->js), address)) {
1012 return conn;
1013 }
1014 }
1015 return NULL;
1016}
1017
1018static void OVS_PRINTF_FORMAT(3, 4)
1019raft_record_note(struct raft *raft, const char *note,
1020 const char *comment_format, ...)
1021{
1022 va_list args;
1023 va_start(args, comment_format);
1024 char *comment = xvasprintf(comment_format, args);
1025 va_end(args);
1026
1027 struct raft_record r = {
1028 .type = RAFT_REC_NOTE,
1029 .comment = comment,
1030 .note = CONST_CAST(char *, note),
1031 };
1032 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
1033
1034 free(comment);
1035}
1036
1037/* If we're leader, try to transfer leadership to another server, logging
1038 * 'reason' as the human-readable reason (it should be a phrase suitable for
1039 * following "because"). */
1040void
1041raft_transfer_leadership(struct raft *raft, const char *reason)
1042{
1043 if (raft->role != RAFT_LEADER) {
1044 return;
1045 }
1046
1047 struct raft_server *s;
1048 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
1049 if (!uuid_equals(&raft->sid, &s->sid)
1050 && s->phase == RAFT_PHASE_STABLE) {
1051 struct raft_conn *conn = raft_find_conn_by_sid(raft, &s->sid);
1052 if (!conn) {
1053 continue;
1054 }
1055
1056 union raft_rpc rpc = {
1057 .become_leader = {
1058 .common = {
1059 .comment = CONST_CAST(char *, reason),
1060 .type = RAFT_RPC_BECOME_LEADER,
1061 .sid = s->sid,
1062 },
1063 .term = raft->term,
1064 }
1065 };
1066 raft_send_to_conn(raft, &rpc, conn);
1067
1068 raft_record_note(raft, "transfer leadership",
1069 "transferring leadership to %s because %s",
1070 s->nickname, reason);
1071 break;
1072 }
1073 }
1074}
1075
1076/* Send a RemoveServerRequest to the rest of the servers in the cluster.
1077 *
1078 * If we know which server is the leader, we can just send the request to it.
1079 * However, we might not know which server is the leader, and we might never
1080 * find out if the remove request was actually previously committed by a
1081 * majority of the servers (because in that case the new leader will not send
1082 * AppendRequests or heartbeats to us). Therefore, we instead send
1083 * RemoveRequests to every server. This theoretically has the same problem, if
1084 * the current cluster leader was not previously a member of the cluster, but
1085 * it seems likely to be more robust in practice. */
1086static void
1087raft_send_remove_server_requests(struct raft *raft)
1088{
1089 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
1090 VLOG_INFO_RL(&rl, "sending remove request (joining=%s, leaving=%s)",
1091 raft->joining ? "true" : "false",
1092 raft->leaving ? "true" : "false");
1093 const struct raft_server *s;
1094 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
1095 if (!uuid_equals(&s->sid, &raft->sid)) {
1096 union raft_rpc rpc = (union raft_rpc) {
1097 .remove_server_request = {
1098 .common = {
1099 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
1100 .sid = s->sid,
1101 },
1102 .sid = raft->sid,
1103 },
1104 };
1105 raft_send(raft, &rpc);
1106 }
1107 }
1108
1109 raft->leave_timeout = time_msec() + ELECTION_BASE_MSEC;
1110}
1111
1112/* Attempts to start 'raft' leaving its cluster. The caller can check progress
1113 * using raft_is_leaving() and raft_left(). */
1114void
1115raft_leave(struct raft *raft)
1116{
1117 if (raft->joining || raft->failed || raft->leaving || raft->left) {
1118 return;
1119 }
1120 VLOG_INFO(SID_FMT": starting to leave cluster "CID_FMT,
1121 SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
1122 raft->leaving = true;
1123 raft_transfer_leadership(raft, "this server is leaving the cluster");
1124 raft_become_follower(raft);
1125 raft_send_remove_server_requests(raft);
1126 raft->leave_timeout = time_msec() + ELECTION_BASE_MSEC;
1127}
1128
1129/* Returns true if 'raft' is currently attempting to leave its cluster. */
1130bool
1131raft_is_leaving(const struct raft *raft)
1132{
1133 return raft->leaving;
1134}
1135
1136/* Returns true if 'raft' successfully left its cluster. */
1137bool
1138raft_left(const struct raft *raft)
1139{
1140 return raft->left;
1141}
1142
1143/* Returns true if 'raft' has experienced a disk I/O failure. When this
1144 * returns true, only closing and reopening 'raft' allows for recovery. */
1145bool
1146raft_failed(const struct raft *raft)
1147{
1148 return raft->failed;
1149}
1150
1151/* Forces 'raft' to attempt to take leadership of the cluster by deposing the
1152 * current cluster leader. */
1153void
1154raft_take_leadership(struct raft *raft)
1155{
1156 if (raft->role != RAFT_LEADER) {
1157 raft_start_election(raft, true);
1158 }
1159}
1160
1161/* Closes everything owned by 'raft' that might be visible outside the process:
1162 * network connections, commands, etc. This is part of closing 'raft'; it is
1163 * also used if 'raft' has failed in an unrecoverable way. */
1164static void
1165raft_close__(struct raft *raft)
1166{
1167 if (!hmap_node_is_null(&raft->hmap_node)) {
1168 hmap_remove(&all_rafts, &raft->hmap_node);
1169 hmap_node_nullify(&raft->hmap_node);
1170 }
1171
1172 raft_complete_all_commands(raft, RAFT_CMD_SHUTDOWN);
1173
1174 struct raft_server *rs = raft->remove_server;
1175 if (rs) {
1176 raft_send_remove_server_reply__(raft, &rs->sid, &rs->requester_sid,
1177 rs->requester_conn, false,
1178 RAFT_SERVER_SHUTDOWN);
1179 raft_server_destroy(raft->remove_server);
1180 raft->remove_server = NULL;
1181 }
1182
1183 struct raft_conn *conn, *next;
1184 LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
1185 raft_conn_close(conn);
1186 }
1187}
1188
1189/* Closes and frees 'raft'.
1190 *
1191 * A server's cluster membership is independent of whether the server is
1192 * actually running. When a server that is a member of a cluster closes, the
1193 * cluster treats this as a server failure. */
1194void
1195raft_close(struct raft *raft)
1196{
1197 if (!raft) {
1198 return;
1199 }
1200
1201 raft_transfer_leadership(raft, "this server is shutting down");
1202
1203 raft_close__(raft);
1204
1205 ovsdb_log_close(raft->log);
1206
1207 raft_servers_destroy(&raft->servers);
1208
1209 for (uint64_t index = raft->log_start; index < raft->log_end; index++) {
1210 struct raft_entry *e = &raft->entries[index - raft->log_start];
1211 raft_entry_uninit(e);
1212 }
1213 free(raft->entries);
1214
1215 raft_entry_uninit(&raft->snap);
1216
1217 raft_waiters_destroy(raft);
1218
1219 raft_servers_destroy(&raft->add_servers);
1220
1221 hmap_destroy(&raft->commands);
1222
1223 pstream_close(raft->listener);
1224
1225 sset_destroy(&raft->remote_addresses);
1226 free(raft->local_address);
1227 free(raft->local_nickname);
1228 free(raft->name);
1229
1230 free(raft);
1231}
1232
1233static bool
1234raft_conn_receive(struct raft *raft, struct raft_conn *conn,
1235 union raft_rpc *rpc)
1236{
1237 struct jsonrpc_msg *msg = jsonrpc_session_recv(conn->js);
1238 if (!msg) {
1239 return false;
1240 }
1241
1242 struct ovsdb_error *error = raft_rpc_from_jsonrpc(&raft->cid, &raft->sid,
1243 msg, rpc);
1244 jsonrpc_msg_destroy(msg);
1245 if (error) {
1246 char *s = ovsdb_error_to_string_free(error);
1247 VLOG_INFO("%s: %s", jsonrpc_session_get_name(conn->js), s);
1248 free(s);
1249 return false;
1250 }
1251
1252 if (uuid_is_zero(&conn->sid)) {
1253 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
1254 conn->sid = rpc->common.sid;
1255 VLOG_INFO_RL(&rl, "%s: learned server ID "SID_FMT,
1256 jsonrpc_session_get_name(conn->js), SID_ARGS(&conn->sid));
1257 } else if (!uuid_equals(&conn->sid, &rpc->common.sid)) {
1258 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1259 VLOG_WARN_RL(&rl, "%s: ignoring message with unexpected server ID "
1260 SID_FMT" (expected "SID_FMT")",
1261 jsonrpc_session_get_name(conn->js),
1262 SID_ARGS(&rpc->common.sid), SID_ARGS(&conn->sid));
1263 raft_rpc_uninit(rpc);
1264 return false;
1265 }
1266
1267 const char *address = (rpc->type == RAFT_RPC_HELLO_REQUEST
1268 ? rpc->hello_request.address
1269 : rpc->type == RAFT_RPC_ADD_SERVER_REQUEST
1270 ? rpc->add_server_request.address
1271 : NULL);
1272 if (address) {
1273 char *new_nickname = raft_address_to_nickname(address, &conn->sid);
1274 if (strcmp(conn->nickname, new_nickname)) {
1275 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
1276 VLOG_INFO_RL(&rl, "%s: learned remote address %s",
1277 jsonrpc_session_get_name(conn->js), address);
1278
1279 free(conn->nickname);
1280 conn->nickname = new_nickname;
1281 } else {
1282 free(new_nickname);
1283 }
1284 }
1285
1286 return true;
1287}
1288
1289static const char *
1290raft_get_nickname(const struct raft *raft, const struct uuid *sid,
1291 char buf[SID_LEN + 1], size_t bufsize)
1292{
1293 if (uuid_equals(sid, &raft->sid)) {
1294 return raft->local_nickname;
1295 }
1296
1297 const char *s = raft_servers_get_nickname__(&raft->servers, sid);
1298 if (s) {
1299 return s;
1300 }
1301
1302 return raft_servers_get_nickname(&raft->add_servers, sid, buf, bufsize);
1303}
1304
1305static void
1306log_rpc(const union raft_rpc *rpc, const char *direction,
1307 const struct raft_conn *conn, int line_number)
1308{
1309 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(600, 600);
1310 if (!raft_rpc_is_heartbeat(rpc) && !VLOG_DROP_DBG(&rl)) {
1311 struct ds s = DS_EMPTY_INITIALIZER;
1312 if (line_number) {
1313 ds_put_format(&s, "raft.c:%d ", line_number);
1314 }
1315 ds_put_format(&s, "%s%s ", direction, conn->nickname);
1316 raft_rpc_format(rpc, &s);
1317 VLOG_DBG("%s", ds_cstr(&s));
1318 ds_destroy(&s);
1319 }
1320}
1321
1322static void
1323raft_send_add_server_request(struct raft *raft, struct raft_conn *conn)
1324{
1325 union raft_rpc rq = {
1326 .add_server_request = {
1327 .common = {
1328 .type = RAFT_RPC_ADD_SERVER_REQUEST,
1329 .sid = UUID_ZERO,
1330 .comment = NULL,
1331 },
1332 .address = raft->local_address,
1333 },
1334 };
1335 raft_send_to_conn(raft, &rq, conn);
1336}
1337
1338static void
1339raft_conn_run(struct raft *raft, struct raft_conn *conn)
1340{
1341 jsonrpc_session_run(conn->js);
1342
1343 unsigned int new_seqno = jsonrpc_session_get_seqno(conn->js);
1344 bool just_connected = (new_seqno != conn->js_seqno
1345 && jsonrpc_session_is_connected(conn->js));
1346 conn->js_seqno = new_seqno;
1347 if (just_connected) {
1348 if (raft->joining) {
1349 raft_send_add_server_request(raft, conn);
1350 } else if (raft->leaving) {
1351 union raft_rpc rq = {
1352 .remove_server_request = {
1353 .common = {
1354 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
1355 .sid = conn->sid,
1356 },
1357 .sid = raft->sid,
1358 },
1359 };
1360 raft_send_to_conn(raft, &rq, conn);
1361 } else {
1362 union raft_rpc rq = (union raft_rpc) {
1363 .hello_request = {
1364 .common = {
1365 .type = RAFT_RPC_HELLO_REQUEST,
1366 .sid = conn->sid,
1367 },
1368 .address = raft->local_address,
1369 },
1370 };
1371 raft_send_to_conn(raft, &rq, conn);
1372 }
1373 }
1374
1375 for (size_t i = 0; i < 50; i++) {
1376 union raft_rpc rpc;
1377 if (!raft_conn_receive(raft, conn, &rpc)) {
1378 break;
1379 }
1380
1381 log_rpc(&rpc, "<--", conn, 0);
1382 raft_handle_rpc(raft, &rpc);
1383 raft_rpc_uninit(&rpc);
1384 }
1385}
1386
1387static void
1388raft_waiter_complete_rpc(struct raft *raft, const union raft_rpc *rpc)
1389{
1390 uint64_t term = raft_rpc_get_term(rpc);
1391 if (term && term < raft->term) {
1392 /* Drop the message because it's for an expired term. */
1393 return;
1394 }
1395
1396 if (!raft_is_rpc_synced(raft, rpc)) {
1397 /* This is a bug. A reply message is deferred because some state in
1398 * the message, such as a term or index, has not been committed to
1399 * disk, and it should only be completed when that commit is done.
1400 * But this message is being completed before the commit is finished.
1401 * Complain, and hope that someone reports the bug. */
1402 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
1403 if (VLOG_DROP_ERR(&rl)) {
1404 return;
1405 }
1406
1407 struct ds s = DS_EMPTY_INITIALIZER;
1408
1409 if (term > raft->synced_term) {
1410 ds_put_format(&s, " because message term %"PRIu64" is "
1411 "past synced term %"PRIu64,
1412 term, raft->synced_term);
1413 }
1414
1415 uint64_t index = raft_rpc_get_min_sync_index(rpc);
1416 if (index > raft->log_synced) {
1417 ds_put_format(&s, " %s message index %"PRIu64" is past last "
1418 "synced index %"PRIu64,
1419 s.length ? "and" : "because",
1420 index, raft->log_synced);
1421 }
1422
1423 const struct uuid *vote = raft_rpc_get_vote(rpc);
1424 if (vote && !uuid_equals(vote, &raft->synced_vote)) {
1425 char buf1[SID_LEN + 1];
1426 char buf2[SID_LEN + 1];
1427 ds_put_format(&s, " %s vote %s differs from synced vote %s",
1428 s.length ? "and" : "because",
1429 raft_get_nickname(raft, vote, buf1, sizeof buf1),
1430 raft_get_nickname(raft, &raft->synced_vote,
1431 buf2, sizeof buf2));
1432 }
1433
1434 char buf[SID_LEN + 1];
1435 ds_put_format(&s, ": %s ",
1436 raft_get_nickname(raft, &rpc->common.sid,
1437 buf, sizeof buf));
1438 raft_rpc_format(rpc, &s);
1439 VLOG_ERR("internal error: deferred %s message completed "
1440 "but not ready to send%s",
1441 raft_rpc_type_to_string(rpc->type), ds_cstr(&s));
1442 ds_destroy(&s);
1443
1444 return;
1445 }
1446
1447 struct raft_conn *dst = raft_find_conn_by_sid(raft, &rpc->common.sid);
1448 if (dst) {
1449 raft_send_to_conn(raft, rpc, dst);
1450 }
1451}
1452
1453static void
1454raft_waiter_complete(struct raft *raft, struct raft_waiter *w)
1455{
1456 VLOG_INFO("%s:%d", __FILE__, __LINE__);
1457 switch (w->type) {
1458 case RAFT_W_ENTRY:
1459 if (raft->role == RAFT_LEADER) {
1460 raft_update_our_match_index(raft, w->entry.index);
1461 }
1462 raft->log_synced = w->entry.index;
1463 break;
1464
1465 case RAFT_W_TERM:
1466 raft->synced_term = w->term.term;
1467 raft->synced_vote = w->term.vote;
1468 break;
1469
1470 case RAFT_W_RPC:
1471 raft_waiter_complete_rpc(raft, w->rpc);
1472 break;
1473 }
1474}
1475
1476static void
1477raft_waiter_destroy(struct raft_waiter *w)
1478{
1479 if (!w) {
1480 return;
1481 }
1482
1483 ovs_list_remove(&w->list_node);
1484
1485 switch (w->type) {
1486 case RAFT_W_ENTRY:
1487 case RAFT_W_TERM:
1488 break;
1489
1490 case RAFT_W_RPC:
1491 raft_rpc_uninit(w->rpc);
1492 free(w->rpc);
1493 break;
1494 }
1495 free(w);
1496}
1497
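/* Completes and destroys each waiter whose commit ticket the log has
 * reached. Waiters complete strictly in FIFO order: the scan stops at the
 * first waiter whose ticket has not yet committed, so a later waiter never
 * completes before an earlier one. */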
1498static void
1499raft_waiters_run(struct raft *raft)
1500{
1501 if (ovs_list_is_empty(&raft->waiters)) {
1502 return;
1503 }
1504
1505 uint64_t cur = ovsdb_log_commit_progress(raft->log);
1506 struct raft_waiter *w, *next;
1507 LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
1508 if (cur < w->commit_ticket) {
1509 break;
1510 }
1511 raft_waiter_complete(raft, w);
1512 raft_waiter_destroy(w);
1513 }
1514}
1515
1516static void
1517raft_waiters_wait(struct raft *raft)
1518{
1519 struct raft_waiter *w;
1520 LIST_FOR_EACH (w, list_node, &raft->waiters) {
1521 ovsdb_log_commit_wait(raft->log, w->commit_ticket);
1522 break;
1523 }
1524}
1525
1526static void
1527raft_waiters_destroy(struct raft *raft)
1528{
1529 struct raft_waiter *w, *next;
1530 LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
1531 raft_waiter_destroy(w);
1532 }
1533}
1534
1535static bool OVS_WARN_UNUSED_RESULT
1536raft_set_term(struct raft *raft, uint64_t term, const struct uuid *vote)
1537{
1538 struct ovsdb_error *error = raft_write_state(raft->log, term, vote);
1539 if (!raft_handle_write_error(raft, error)) {
1540 return false;
1541 }
1542
1543 struct raft_waiter *w = raft_waiter_create(raft, RAFT_W_TERM, true);
1544 raft->term = w->term.term = term;
1545 raft->vote = w->term.vote = vote ? *vote : UUID_ZERO;
1546 return true;
1547}
1548
1549static void
1550raft_accept_vote(struct raft *raft, struct raft_server *s,
1551 const struct uuid *vote)
1552{
1553 if (uuid_equals(&s->vote, vote)) {
1554 return;
1555 }
1556 if (!uuid_is_zero(&s->vote)) {
1557 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
1558 char buf1[SID_LEN + 1];
1559 char buf2[SID_LEN + 1];
1560 VLOG_WARN_RL(&rl, "server %s changed its vote from %s to %s",
1561 s->nickname,
1562 raft_get_nickname(raft, &s->vote, buf1, sizeof buf1),
1563 raft_get_nickname(raft, vote, buf2, sizeof buf2));
1564 }
1565 s->vote = *vote;
1566 if (uuid_equals(vote, &raft->sid)
1567 && ++raft->n_votes > hmap_count(&raft->servers) / 2) {
1568 raft_become_leader(raft);
1569 }
1570}
1571
1572static void
1573raft_start_election(struct raft *raft, bool leadership_transfer)
1574{
1575 if (raft->leaving) {
1576 return;
1577 }
1578
1579 struct raft_server *me = raft_find_server(raft, &raft->sid);
1580 if (!me) {
1581 return;
1582 }
1583
1584 if (!raft_set_term(raft, raft->term + 1, &raft->sid)) {
1585 return;
1586 }
1587
1588 raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
1589
1590 ovs_assert(raft->role != RAFT_LEADER);
1591 ovs_assert(hmap_is_empty(&raft->commands));
1592 raft->role = RAFT_CANDIDATE;
1593
1594 raft->n_votes = 0;
1595
1596 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1597 if (!VLOG_DROP_INFO(&rl)) {
1598 long long int now = time_msec();
1599 if (now >= raft->election_timeout) {
1600 VLOG_INFO("term %"PRIu64": %lld ms timeout expired, "
1601 "starting election",
1602 raft->term, now - raft->election_base);
1603 } else {
1604 VLOG_INFO("term %"PRIu64": starting election", raft->term);
1605 }
1606 }
1607 raft_reset_timer(raft);
1608
1609 struct raft_server *peer;
1610 HMAP_FOR_EACH (peer, hmap_node, &raft->servers) {
1611 peer->vote = UUID_ZERO;
1612 if (uuid_equals(&raft->sid, &peer->sid)) {
1613 continue;
1614 }
1615
1616 union raft_rpc rq = {
1617 .vote_request = {
1618 .common = {
1619 .type = RAFT_RPC_VOTE_REQUEST,
1620 .sid = peer->sid,
1621 },
1622 .term = raft->term,
1623 .last_log_index = raft->log_end - 1,
1624 .last_log_term = (
1625 raft->log_end > raft->log_start
1626 ? raft->entries[raft->log_end - raft->log_start - 1].term
1627 : raft->snap.term),
1628 .leadership_transfer = leadership_transfer,
1629 },
1630 };
1631 raft_send(raft, &rq);
1632 }
1633
1634 /* Vote for ourselves. */
1635 raft_accept_vote(raft, me, &raft->sid);
1636}
1637
1638static void
1639raft_open_conn(struct raft *raft, const char *address, const struct uuid *sid)
1640{
1641 if (strcmp(address, raft->local_address)
1642 && !raft_find_conn_by_address(raft, address)) {
1643 raft_add_conn(raft, jsonrpc_session_open(address, true), sid, false);
1644 }
1645}
1646
1647static void
1648raft_conn_close(struct raft_conn *conn)
1649{
1650 jsonrpc_session_close(conn->js);
1651 ovs_list_remove(&conn->list_node);
1652 free(conn->nickname);
1653 free(conn);
1654}
1655
1656/* Returns true if 'conn' should stay open, false if it should be closed. */
1657static bool
1658raft_conn_should_stay_open(struct raft *raft, struct raft_conn *conn)
1659{
1660 /* Close the connection if it's actually dead. If necessary, we'll
1661 * initiate a new session later. */
1662 if (!jsonrpc_session_is_alive(conn->js)) {
1663 return false;
1664 }
1665
1666 /* Keep incoming sessions. We trust the originator to decide to drop
1667 * it. */
1668 if (conn->incoming) {
1669 return true;
1670 }
1671
1672 /* If we are joining the cluster, keep sessions to the remote addresses
1673 * that are supposed to be part of the cluster we're joining. */
1674 if (raft->joining && sset_contains(&raft->remote_addresses,
1675 jsonrpc_session_get_name(conn->js))) {
1676 return true;
1677 }
1678
1679 /* We have joined the cluster. If we did that "recently", then there is a
1680 * chance that we do not have the most recent server configuration log
1681 * entry. If so, it's a waste to disconnect from the servers that were in
1682 * remote_addresses and that will probably appear in the configuration,
1683 * just to reconnect to them a moment later when we do get the
1684 * configuration update. If we are not ourselves in the configuration,
1685 * then we know that there must be a new configuration coming up, so in
1686 * that case keep the connection. */
1687 if (!raft_find_server(raft, &raft->sid)) {
1688 return true;
1689 }
1690
1691 /* Keep the connection only if the server is part of the configuration. */
1692 return raft_find_server(raft, &conn->sid);
1693}
1694
1695/* Allows 'raft' to maintain the distributed log. Call this function as part
1696 * of the process's main loop. */
1697void
1698raft_run(struct raft *raft)
1699{
1700 if (raft->left || raft->failed) {
1701 return;
1702 }
1703
1704 raft_waiters_run(raft);
1705
1706 if (!raft->listener && time_msec() >= raft->listen_backoff) {
1707 char *paddr = raft_make_address_passive(raft->local_address);
1708 int error = pstream_open(paddr, &raft->listener, DSCP_DEFAULT);
1709 if (error) {
1710 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1711 VLOG_WARN_RL(&rl, "%s: listen failed (%s)",
1712 paddr, ovs_strerror(error));
1713 raft->listen_backoff = time_msec() + 1000;
1714 }
1715 free(paddr);
1716 }
1717
1718 if (raft->listener) {
1719 struct stream *stream;
1720 int error = pstream_accept(raft->listener, &stream);
1721 if (!error) {
1722 raft_add_conn(raft, jsonrpc_session_open_unreliably(
1723 jsonrpc_open(stream), DSCP_DEFAULT), NULL,
1724 true);
1725 } else if (error != EAGAIN) {
1726 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
1727 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
1728 pstream_get_name(raft->listener),
1729 ovs_strerror(error));
1730 }
1731 }
1732
1733 /* Run RPCs for all open sessions. */
1734 struct raft_conn *conn;
1735 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1736 raft_conn_run(raft, conn);
1737 }
1738
1739 /* Close unneeded sessions. */
1740 struct raft_conn *next;
1741 LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
1742 if (!raft_conn_should_stay_open(raft, conn)) {
1743 raft_conn_close(conn);
1744 }
1745 }
1746
1747 /* Open needed sessions. */
1748 struct raft_server *server;
1749 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1750 raft_open_conn(raft, server->address, &server->sid);
1751 }
1752 if (raft->joining) {
1753 const char *address;
1754 SSET_FOR_EACH (address, &raft->remote_addresses) {
1755 raft_open_conn(raft, address, NULL);
1756 }
1757 }
1758
1759 if (!raft->joining && time_msec() >= raft->election_timeout) {
1760 raft_start_election(raft, false);
1761 }
1762
1763 if (raft->leaving && time_msec() >= raft->leave_timeout) {
1764 raft_send_remove_server_requests(raft);
1765 }
1766
1767 if (raft->joining && time_msec() >= raft->join_timeout) {
1768 raft->join_timeout = time_msec() + 1000;
1769 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1770 raft_send_add_server_request(raft, conn);
1771 }
1772 }
1773
1774 if (time_msec() >= raft->ping_timeout) {
1775 if (raft->role == RAFT_LEADER) {
1776 raft_send_heartbeats(raft);
1777 } else {
1778 long long int now = time_msec();
1779 struct raft_command *cmd, *next_cmd;
1780 HMAP_FOR_EACH_SAFE (cmd, next_cmd, hmap_node, &raft->commands) {
1781 if (cmd->timestamp
1782 && now - cmd->timestamp > ELECTION_BASE_MSEC) {
1783 raft_command_complete(raft, cmd, RAFT_CMD_TIMEOUT);
1784 }
1785 }
1786 }
1787 raft->ping_timeout = time_msec() + PING_TIME_MSEC;
1788 }
1789
1790 /* Do this only at the end; if we did it as soon as we set raft->left or
1791 * raft->failed in handling the RemoveServerReply, then it could easily
1792 * cause references to freed memory in RPC sessions, etc. */
1793 if (raft->left || raft->failed) {
1794 raft_close__(raft);
1795 }
1796}
1797
1798static void
1799raft_wait_session(struct jsonrpc_session *js)
1800{
1801 if (js) {
1802 jsonrpc_session_wait(js);
1803 jsonrpc_session_recv_wait(js);
1804 }
1805}
1806
1807/* Causes the next call to poll_block() to wake up when 'raft' needs to do
1808 * something. */
1809void
1810raft_wait(struct raft *raft)
1811{
1812 if (raft->left || raft->failed) {
1813 return;
1814 }
1815
1816 raft_waiters_wait(raft);
1817
1818 if (raft->listener) {
1819 pstream_wait(raft->listener);
1820 } else {
1821 poll_timer_wait_until(raft->listen_backoff);
1822 }
1823
1824 struct raft_conn *conn;
1825 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1826 raft_wait_session(conn->js);
1827 }
1828
1829 if (!raft->joining) {
1830 poll_timer_wait_until(raft->election_timeout);
1831 } else {
1832 poll_timer_wait_until(raft->join_timeout);
1833 }
1834 if (raft->leaving) {
1835 poll_timer_wait_until(raft->leave_timeout);
1836 }
1837 if (raft->role == RAFT_LEADER || !hmap_is_empty(&raft->commands)) {
1838 poll_timer_wait_until(raft->ping_timeout);
1839 }
1840}
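/* A minimal sketch of how a caller typically drives raft_run() and
 * raft_wait(), following the usual OVS run/wait/poll_block() pattern.  The
 * enclosing loop and the 'exiting' flag are hypothetical, for illustration
 * only: raft_run() makes progress, raft_wait() registers wakeup conditions,
 * and poll_block() sleeps until one of them fires.
 *
 *     while (!exiting) {
 *         raft_run(raft);
 *         raft_wait(raft);
 *         poll_block();
 *     }
 */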
1841
1842static struct raft_waiter *
1843raft_waiter_create(struct raft *raft, enum raft_waiter_type type,
1844 bool start_commit)
1845{
1846 struct raft_waiter *w = xzalloc(sizeof *w);
1847 ovs_list_push_back(&raft->waiters, &w->list_node);
1848 w->commit_ticket = start_commit ? ovsdb_log_commit_start(raft->log) : 0;
1849 w->type = type;
1850 return w;
1851}
1852
1853/* Returns a human-readable representation of 'status' (or NULL if 'status' is
1854 * invalid). */
1855const char *
1856raft_command_status_to_string(enum raft_command_status status)
1857{
1858 switch (status) {
1859 case RAFT_CMD_INCOMPLETE:
1860 return "operation still in progress";
1861 case RAFT_CMD_SUCCESS:
1862 return "success";
1863 case RAFT_CMD_NOT_LEADER:
1864 return "not leader";
1865 case RAFT_CMD_BAD_PREREQ:
1866 return "prerequisite check failed";
1867 case RAFT_CMD_LOST_LEADERSHIP:
1868 return "lost leadership";
1869 case RAFT_CMD_SHUTDOWN:
1870 return "server shutdown";
1871 case RAFT_CMD_IO_ERROR:
1872 return "I/O error";
1873 case RAFT_CMD_TIMEOUT:
1874 return "timeout";
1875 default:
1876 return NULL;
1877 }
1878}
1879
1880/* Converts human-readable status in 's' into status code in '*statusp'.
1881 * Returns true if successful, false if 's' is unknown. */
1882bool
1883raft_command_status_from_string(const char *s,
1884 enum raft_command_status *statusp)
1885{
1886 for (enum raft_command_status status = 0; ; status++) {
1887 const char *s2 = raft_command_status_to_string(status);
1888 if (!s2) {
1889 *statusp = 0;
1890 return false;
1891 } else if (!strcmp(s, s2)) {
1892 *statusp = status;
1893 return true;
1894 }
1895 }
1896}
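/* This simply inverts raft_command_status_to_string() by probing each status
 * value in turn until the strings match or the table runs out.  For example,
 * raft_command_status_from_string("not leader", &status) stores
 * RAFT_CMD_NOT_LEADER in 'status' and returns true, while an unknown string
 * such as "bogus" stores 0 and returns false. */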
1897
1898static const struct uuid *
1899raft_get_eid(const struct raft *raft, uint64_t index)
1900{
1901 for (; index >= raft->log_start; index--) {
1902 const struct raft_entry *e = raft_get_entry(raft, index);
1903 if (e->data) {
1904 return &e->eid;
1905 }
1906 }
1907 return &raft->snap.eid;
1908}
1909
1910static const struct uuid *
1911raft_current_eid(const struct raft *raft)
1912{
1913 return raft_get_eid(raft, raft->log_end - 1);
1914}
1915
1916static struct raft_command *
1917raft_command_create_completed(enum raft_command_status status)
1918{
1919 ovs_assert(status != RAFT_CMD_INCOMPLETE);
1920
1921 struct raft_command *cmd = xzalloc(sizeof *cmd);
1922 cmd->n_refs = 1;
1923 cmd->status = status;
1924 return cmd;
1925}
1926
1927static struct raft_command *
1928raft_command_create_incomplete(struct raft *raft, uint64_t index)
1929{
1930 struct raft_command *cmd = xzalloc(sizeof *cmd);
1931 cmd->n_refs = 2; /* One for client, one for raft->commands. */
1932 cmd->index = index;
1933 cmd->status = RAFT_CMD_INCOMPLETE;
1934 hmap_insert(&raft->commands, &cmd->hmap_node, cmd->index);
1935 return cmd;
1936}
1937
1938static struct raft_command * OVS_WARN_UNUSED_RESULT
1939raft_command_initiate(struct raft *raft,
1940 const struct json *data, const struct json *servers,
1941 const struct uuid *eid)
1942{
1943 /* Write to local log. */
1944 uint64_t index = raft->log_end;
1945 if (!raft_handle_write_error(
1946 raft, raft_write_entry(
1947 raft, raft->term, json_nullable_clone(data), eid,
1948 json_nullable_clone(servers)))) {
1949 return raft_command_create_completed(RAFT_CMD_IO_ERROR);
1950 }
1951
1952 struct raft_command *cmd = raft_command_create_incomplete(raft, index);
1953 if (eid) {
1954 cmd->eid = *eid;
1955 }
1956
1957 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index = cmd->index;
1958
1959 /* Write to remote logs. */
1960 struct raft_server *s;
1961 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
1962 if (!uuid_equals(&s->sid, &raft->sid) && s->next_index == index) {
1963 raft_send_append_request(raft, s, 1, "execute command");
1964 s->next_index++;
1965 }
1966 }
1967
1968 return cmd;
1969}
1970
1971static struct raft_command * OVS_WARN_UNUSED_RESULT
1972raft_command_execute__(struct raft *raft,
1973 const struct json *data, const struct json *servers,
1974 const struct uuid *prereq, struct uuid *result)
1975{
1976 if (raft->joining || raft->leaving || raft->left || raft->failed) {
1977 return raft_command_create_completed(RAFT_CMD_SHUTDOWN);
1978 }
1979
1980 if (raft->role != RAFT_LEADER) {
1981 /* Consider proxying the command to the leader. We can only do that if
1982 * we know the leader and the command does not change the set of
1983 * servers. We do not proxy commands without prerequisites, even
1984 * though we could, because in an OVSDB context a log entry doesn't
1985 * make sense without context. */
1986 if (servers || !data
1987 || raft->role != RAFT_FOLLOWER || uuid_is_zero(&raft->leader_sid)
1988 || !prereq) {
1989 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
1990 }
1991 }
1992
1993 struct uuid eid = data ? uuid_random() : UUID_ZERO;
1994 if (result) {
1995 *result = eid;
1996 }
1997
1998 if (raft->role != RAFT_LEADER) {
1999 const union raft_rpc rpc = {
2000 .execute_command_request = {
2001 .common = {
2002 .type = RAFT_RPC_EXECUTE_COMMAND_REQUEST,
2003 .sid = raft->leader_sid,
2004 },
2005 .data = CONST_CAST(struct json *, data),
2006 .prereq = *prereq,
2007 .result = eid,
2008 }
2009 };
2010 if (!raft_send(raft, &rpc)) {
2011 /* Couldn't send command, so it definitely failed. */
2012 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
2013 }
2014
2015 struct raft_command *cmd = raft_command_create_incomplete(raft, 0);
2016 cmd->timestamp = time_msec();
2017 cmd->eid = eid;
2018 return cmd;
2019 }
2020
2021 const struct uuid *current_eid = raft_current_eid(raft);
2022 if (prereq && !uuid_equals(prereq, current_eid)) {
2023 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
2024 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
2025 "prerequisite "UUID_FMT,
2026 UUID_ARGS(current_eid), UUID_ARGS(prereq));
2027 return raft_command_create_completed(RAFT_CMD_BAD_PREREQ);
2028 }
2029
2030 return raft_command_initiate(raft, data, servers, &eid);
2031}
2032
2033/* Initiates appending a log entry to 'raft'. The log entry consists of
2034 * 'data'. If 'prereq' is nonnull, the entry is only added to the log if
2035 * the previous entry in the log has entry ID 'prereq'. If 'result' is
2036 * nonnull, it is populated with the entry ID for the new log entry.
2037 *
2038 * Returns a "struct raft_command" that may be used to track progress adding
2039 * the log entry. The caller must eventually free the returned structure, with
2040 * raft_command_unref(). */
2041struct raft_command * OVS_WARN_UNUSED_RESULT
2042raft_command_execute(struct raft *raft, const struct json *data,
2043 const struct uuid *prereq, struct uuid *result)
2044{
2045 return raft_command_execute__(raft, data, NULL, prereq, result);
2046}
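/* A sketch of the full lifecycle of a command from a client's point of view.
 * The 'data' json and the shape of the surrounding loop are illustrative
 * assumptions, not code taken from elsewhere in this file:
 *
 *     struct uuid eid;
 *     struct raft_command *cmd = raft_command_execute(raft, data, NULL, &eid);
 *     while (raft_command_get_status(cmd) == RAFT_CMD_INCOMPLETE) {
 *         raft_run(raft);
 *         raft_command_wait(cmd);
 *         raft_wait(raft);
 *         poll_block();
 *     }
 *     enum raft_command_status status = raft_command_get_status(cmd);
 *     raft_command_unref(cmd);
 */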
2047
2048/* Returns the status of 'cmd'. */
2049enum raft_command_status
2050raft_command_get_status(const struct raft_command *cmd)
2051{
2052 ovs_assert(cmd->n_refs > 0);
2053 return cmd->status;
2054}
2055
2056/* Returns the index of the log entry at which 'cmd' was committed.
2057 *
2058 * This function works only with successful commands. */
2059uint64_t
2060raft_command_get_commit_index(const struct raft_command *cmd)
2061{
2062 ovs_assert(cmd->n_refs > 0);
2063 ovs_assert(cmd->status == RAFT_CMD_SUCCESS);
2064 return cmd->index;
2065}
2066
2067/* Frees 'cmd'. */
2068void
2069raft_command_unref(struct raft_command *cmd)
2070{
2071 if (cmd) {
2072 ovs_assert(cmd->n_refs > 0);
2073 if (!--cmd->n_refs) {
2074 free(cmd);
2075 }
2076 }
2077}
2078
2079/* Causes poll_block() to wake up when 'cmd' has status to report. */
2080void
2081raft_command_wait(const struct raft_command *cmd)
2082{
2083 if (cmd->status != RAFT_CMD_INCOMPLETE) {
2084 poll_immediate_wake();
2085 }
2086}
2087
2088static void
2089raft_command_complete(struct raft *raft,
2090 struct raft_command *cmd,
2091 enum raft_command_status status)
2092{
2093 if (!uuid_is_zero(&cmd->sid)) {
2094 uint64_t commit_index = status == RAFT_CMD_SUCCESS ? cmd->index : 0;
2095 raft_send_execute_command_reply(raft, &cmd->sid, &cmd->eid, status,
2096 commit_index);
2097 }
2098
2099 ovs_assert(cmd->status == RAFT_CMD_INCOMPLETE);
2100 ovs_assert(cmd->n_refs > 0);
2101 hmap_remove(&raft->commands, &cmd->hmap_node);
2102 cmd->status = status;
2103 raft_command_unref(cmd);
2104}
2105
2106static void
2107raft_complete_all_commands(struct raft *raft, enum raft_command_status status)
2108{
2109 struct raft_command *cmd, *next;
2110 HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
2111 raft_command_complete(raft, cmd, status);
2112 }
2113}
2114
2115static struct raft_command *
2116raft_find_command_by_index(struct raft *raft, uint64_t index)
2117{
2118 struct raft_command *cmd;
2119
2120 HMAP_FOR_EACH_IN_BUCKET (cmd, hmap_node, index, &raft->commands) {
2121 if (cmd->index == index) {
2122 return cmd;
2123 }
2124 }
2125 return NULL;
2126}
2127
2128static struct raft_command *
2129raft_find_command_by_eid(struct raft *raft, const struct uuid *eid)
2130{
2131 struct raft_command *cmd;
2132
2133 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2134 if (uuid_equals(&cmd->eid, eid)) {
2135 return cmd;
2136 }
2137 }
2138 return NULL;
2139}
2140\f
2141#define RAFT_RPC(ENUM, NAME) \
2142 static void raft_handle_##NAME(struct raft *, const struct raft_##NAME *);
2143RAFT_RPC_TYPES
2144#undef RAFT_RPC
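/* For example, assuming RAFT_RPC_TYPES contains an entry like
 * RAFT_RPC(RAFT_RPC_HELLO_REQUEST, hello_request), the block above expands
 * to a forward declaration of the handler defined just below:
 *
 *     static void raft_handle_hello_request(
 *         struct raft *, const struct raft_hello_request *);
 */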
2145
2146static void
2147raft_handle_hello_request(struct raft *raft OVS_UNUSED,
2148 const struct raft_hello_request *hello OVS_UNUSED)
2149{
2150}
2151
2152/* 'sid' is the server being added. */
2153static void
2154raft_send_add_server_reply__(struct raft *raft, const struct uuid *sid,
2155 const char *address,
2156 bool success, const char *comment)
2157{
2158 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2159 if (!VLOG_DROP_INFO(&rl)) {
2160 struct ds s = DS_EMPTY_INITIALIZER;
2161 char buf[SID_LEN + 1];
2162 ds_put_format(&s, "adding %s ("SID_FMT" at %s) "
2163 "to cluster "CID_FMT" %s",
2164 raft_get_nickname(raft, sid, buf, sizeof buf),
2165 SID_ARGS(sid), address, CID_ARGS(&raft->cid),
2166 success ? "succeeded" : "failed");
2167 if (comment) {
2168 ds_put_format(&s, " (%s)", comment);
2169 }
2170 VLOG_INFO("%s", ds_cstr(&s));
2171 ds_destroy(&s);
2172 }
2173
2174 union raft_rpc rpy = {
2175 .add_server_reply = {
2176 .common = {
2177 .type = RAFT_RPC_ADD_SERVER_REPLY,
2178 .sid = *sid,
2179 .comment = CONST_CAST(char *, comment),
2180 },
2181 .success = success,
2182 }
2183 };
2184
2185 struct sset *remote_addresses = &rpy.add_server_reply.remote_addresses;
2186 sset_init(remote_addresses);
2187 if (!raft->joining) {
2188 struct raft_server *s;
2189 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2190 if (!uuid_equals(&s->sid, &raft->sid)) {
2191 sset_add(remote_addresses, s->address);
2192 }
2193 }
2194 }
2195
2196 raft_send(raft, &rpy);
2197
2198 sset_destroy(remote_addresses);
2199}
2200
2201static void
2202raft_send_remove_server_reply_rpc(struct raft *raft,
2203 const struct uuid *dst_sid,
2204 const struct uuid *target_sid,
2205 bool success, const char *comment)
2206{
2207 if (uuid_equals(&raft->sid, dst_sid)) {
2208 if (success && uuid_equals(&raft->sid, target_sid)) {
2209 raft_finished_leaving_cluster(raft);
2210 }
2211 return;
2212 }
2213
2214 const union raft_rpc rpy = {
2215 .remove_server_reply = {
2216 .common = {
2217 .type = RAFT_RPC_REMOVE_SERVER_REPLY,
2218 .sid = *dst_sid,
2219 .comment = CONST_CAST(char *, comment),
2220 },
2221 .target_sid = (uuid_equals(dst_sid, target_sid)
2222 ? UUID_ZERO
2223 : *target_sid),
2224 .success = success,
2225 }
2226 };
2227 raft_send(raft, &rpy);
2228}
2229
2230static void
2231raft_send_remove_server_reply__(struct raft *raft,
2232 const struct uuid *target_sid,
2233 const struct uuid *requester_sid,
2234 struct unixctl_conn *requester_conn,
2235 bool success, const char *comment)
2236{
2237 struct ds s = DS_EMPTY_INITIALIZER;
2238 ds_put_format(&s, "request ");
2239 if (!uuid_is_zero(requester_sid)) {
2240 char buf[SID_LEN + 1];
2241 ds_put_format(&s, "by %s",
2242 raft_get_nickname(raft, requester_sid, buf, sizeof buf));
2243 } else {
2244 ds_put_cstr(&s, "via unixctl");
2245 }
2246 ds_put_cstr(&s, " to remove ");
2247 if (!requester_conn && uuid_equals(target_sid, requester_sid)) {
2248 ds_put_cstr(&s, "itself");
2249 } else {
2250 char buf[SID_LEN + 1];
2251 ds_put_cstr(&s, raft_get_nickname(raft, target_sid, buf, sizeof buf));
2252 if (uuid_equals(target_sid, &raft->sid)) {
2253 ds_put_cstr(&s, " (ourselves)");
2254 }
2255 }
2256 ds_put_format(&s, " from cluster "CID_FMT" %s",
2257 CID_ARGS(&raft->cid),
2258 success ? "succeeded" : "failed");
2259 if (comment) {
2260 ds_put_format(&s, " (%s)", comment);
2261 }
2262
2263 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2264 VLOG_INFO_RL(&rl, "%s", ds_cstr(&s));
2265
2266 /* Send RemoveServerReply to the requester (which could be a server or a
2267 * unixctl connection). Also always send it to the removed server; this
2268 * allows it to be sure that it's really removed and update its log and
2269 * disconnect permanently. */
2270 if (!uuid_is_zero(requester_sid)) {
2271 raft_send_remove_server_reply_rpc(raft, requester_sid, target_sid,
2272 success, comment);
2273 }
2274 if (!uuid_equals(requester_sid, target_sid)) {
2275 raft_send_remove_server_reply_rpc(raft, target_sid, target_sid,
2276 success, comment);
2277 }
2278 if (requester_conn) {
2279 if (success) {
2280 unixctl_command_reply(requester_conn, ds_cstr(&s));
2281 } else {
2282 unixctl_command_reply_error(requester_conn, ds_cstr(&s));
2283 }
2284 }
2285
2286 ds_destroy(&s);
2287}
2288
2289static void
2290raft_send_add_server_reply(struct raft *raft,
2291 const struct raft_add_server_request *rq,
2292 bool success, const char *comment)
2293{
2294 return raft_send_add_server_reply__(raft, &rq->common.sid, rq->address,
2295 success, comment);
2296}
2297
2298static void
2299raft_send_remove_server_reply(struct raft *raft,
2300 const struct raft_remove_server_request *rq,
2301 bool success, const char *comment)
2302{
2303 return raft_send_remove_server_reply__(raft, &rq->sid, &rq->common.sid,
2304 rq->requester_conn, success,
2305 comment);
2306}
2307
2308static void
2309raft_become_follower(struct raft *raft)
2310{
2311 raft->leader_sid = UUID_ZERO;
2312 if (raft->role == RAFT_FOLLOWER) {
2313 return;
2314 }
2315
2316 raft->role = RAFT_FOLLOWER;
2317 raft_reset_timer(raft);
2318
2319 /* Notify clients about lost leadership.
2320 *
2321 * We do not reverse our changes to 'raft->servers' because the new
2322 * configuration is already part of the log. Possibly the configuration
2323 * log entry will not be committed, but until we know that we must use the
2324 * new configuration. Our AppendEntries processing will properly update
2325 * the server configuration later, if necessary. */
2326 struct raft_server *s;
2327 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
2328 raft_send_add_server_reply__(raft, &s->sid, s->address, false,
2329 RAFT_SERVER_LOST_LEADERSHIP);
2330 }
2331 if (raft->remove_server) {
2332 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
2333 &raft->remove_server->requester_sid,
2334 raft->remove_server->requester_conn,
2335 false, RAFT_SERVER_LOST_LEADERSHIP);
2336 raft_server_destroy(raft->remove_server);
2337 raft->remove_server = NULL;
2338 }
2339
2340 raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
2341}
2342
2343static void
2344raft_send_append_request(struct raft *raft,
2345 struct raft_server *peer, unsigned int n,
2346 const char *comment)
2347{
2348 ovs_assert(raft->role == RAFT_LEADER);
2349
2350 const union raft_rpc rq = {
2351 .append_request = {
2352 .common = {
2353 .type = RAFT_RPC_APPEND_REQUEST,
2354 .sid = peer->sid,
2355 .comment = CONST_CAST(char *, comment),
2356 },
2357 .term = raft->term,
2358 .prev_log_index = peer->next_index - 1,
2359 .prev_log_term = (peer->next_index - 1 >= raft->log_start
2360 ? raft->entries[peer->next_index - 1
2361 - raft->log_start].term
2362 : raft->snap.term),
2363 .leader_commit = raft->commit_index,
2364 .entries = &raft->entries[peer->next_index - raft->log_start],
2365 .n_entries = n,
2366 },
2367 };
2368 raft_send(raft, &rq);
2369}
2370
2371static void
2372raft_send_heartbeats(struct raft *raft)
2373{
2374 struct raft_server *s;
2375 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2376 if (!uuid_equals(&raft->sid, &s->sid)) {
2377 raft_send_append_request(raft, s, 0, "heartbeat");
2378 }
2379 }
2380
2381 /* Send anyone waiting for a command to complete a ping to let them
2382 * know we're still working on it. */
2383 struct raft_command *cmd;
2384 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2385 if (!uuid_is_zero(&cmd->sid)) {
2386 raft_send_execute_command_reply(raft, &cmd->sid,
2387 &cmd->eid,
2388 RAFT_CMD_INCOMPLETE, 0);
2389 }
2390 }
2391}
2392
2393/* Initializes the fields in 's' that represent the leader's view of the
2394 * server. */
2395static void
2396raft_server_init_leader(struct raft *raft, struct raft_server *s)
2397{
2398 s->next_index = raft->log_end;
2399 s->match_index = 0;
2400 s->phase = RAFT_PHASE_STABLE;
2401}
2402
2403static void
2404raft_become_leader(struct raft *raft)
2405{
2406 raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
2407
2408 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
2409 VLOG_INFO_RL(&rl, "term %"PRIu64": elected leader by %d+ of "
2410 "%"PRIuSIZE" servers", raft->term,
2411 raft->n_votes, hmap_count(&raft->servers));
2412
2413 ovs_assert(raft->role != RAFT_LEADER);
2414 raft->role = RAFT_LEADER;
2415 raft->leader_sid = raft->sid;
2416 raft->election_timeout = LLONG_MAX;
2417 raft->ping_timeout = time_msec() + PING_TIME_MSEC;
2418
2419 struct raft_server *s;
2420 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2421 raft_server_init_leader(raft, s);
2422 }
2423
2424 raft_update_our_match_index(raft, raft->log_end - 1);
2425 raft_send_heartbeats(raft);
2426
2427 /* Write the fact that we are leader to the log. This is not used by the
2428 * algorithm (although it could be, for quick restart), but it is used for
2429 * offline analysis to check for conformance with the properties that Raft
2430 * guarantees. */
2431 struct raft_record r = {
2432 .type = RAFT_REC_LEADER,
2433 .term = raft->term,
2434 .sid = raft->sid,
2435 };
2436 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2437
2438 /* Initiate a no-op commit. Otherwise we might never find out what's in
2439 * the log. See section 6.4 item 1:
2440 *
2441 * The Leader Completeness Property guarantees that a leader has all
2442 * committed entries, but at the start of its term, it may not know
2443 * which those are. To find out, it needs to commit an entry from its
2444 * term. Raft handles this by having each leader commit a blank no-op
2445 * entry into the log at the start of its term. As soon as this no-op
2446 * entry is committed, the leader’s commit index will be at least as
2447 * large as any other servers’ during its term.
2448 */
2449 raft_command_unref(raft_command_execute__(raft, NULL, NULL, NULL, NULL));
2450}
2451
2452/* Processes term 'term' received as part of RPC 'common'. Returns true if the
2453 * caller should continue processing the RPC, false if the caller should reject
2454 * it due to a stale term. */
2455static bool
2456raft_receive_term__(struct raft *raft, const struct raft_rpc_common *common,
2457 uint64_t term)
2458{
2459 /* Section 3.3 says:
2460 *
2461 * Current terms are exchanged whenever servers communicate; if one
2462 * server’s current term is smaller than the other’s, then it updates
2463 * its current term to the larger value. If a candidate or leader
2464 * discovers that its term is out of date, it immediately reverts to
2465 * follower state. If a server receives a request with a stale term
2466 * number, it rejects the request.
2467 */
2468 if (term > raft->term) {
2469 if (!raft_set_term(raft, term, NULL)) {
2470 /* Failed to update the term to 'term'. */
2471 return false;
2472 }
2473 raft_become_follower(raft);
2474 } else if (term < raft->term) {
2475 char buf[SID_LEN + 1];
2476 VLOG_INFO("rejecting term %"PRIu64" < current term %"PRIu64" received "
2477 "in %s message from server %s",
2478 term, raft->term,
2479 raft_rpc_type_to_string(common->type),
2480 raft_get_nickname(raft, &common->sid, buf, sizeof buf));
2481 return false;
2482 }
2483 return true;
2484}
2485
2486static void
2487raft_get_servers_from_log(struct raft *raft, enum vlog_level level)
2488{
2489 const struct json *servers_json = raft->snap.servers;
2490 for (uint64_t index = raft->log_end - 1; index >= raft->log_start;
2491 index--) {
2492 struct raft_entry *e = &raft->entries[index - raft->log_start];
2493 if (e->servers) {
2494 servers_json = e->servers;
2495 break;
2496 }
2497 }
2498
2499 struct hmap servers;
2500 struct ovsdb_error *error = raft_servers_from_json(servers_json, &servers);
2501 ovs_assert(!error);
2502 raft_set_servers(raft, &servers, level);
2503 raft_servers_destroy(&servers);
2504}
2505
2506/* Truncates the log, so that raft->log_end becomes 'new_end'.
2507 *
2508 * Doesn't write anything to disk. In theory, we could truncate the on-disk
2509 * log file, but we don't have the right information to know how long it should
2510 * be. What we actually do is to append entries for older indexes to the
2511 * on-disk log; when we re-read it later, these entries truncate the log.
2512 *
2513 * Returns true if any of the removed log entries were server configuration
2514 * entries, false otherwise. */
2515static bool
2516raft_truncate(struct raft *raft, uint64_t new_end)
2517{
2518 ovs_assert(new_end >= raft->log_start);
2519 if (raft->log_end > new_end) {
2520 char buf[SID_LEN + 1];
2521 VLOG_INFO("%s truncating %"PRIu64 " entries from end of log",
2522 raft_get_nickname(raft, &raft->sid, buf, sizeof buf),
2523 raft->log_end - new_end);
2524 }
2525
2526 bool servers_changed = false;
2527 while (raft->log_end > new_end) {
2528 struct raft_entry *entry = &raft->entries[--raft->log_end
2529 - raft->log_start];
2530 if (entry->servers) {
2531 servers_changed = true;
2532 }
2533 raft_entry_uninit(entry);
2534 }
2535 return servers_changed;
2536}
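/* To illustrate the on-disk behavior described above: if the log holds
 * entries 4..8 and raft_truncate(raft, 6) discards entries 6..8, nothing is
 * removed from the file immediately.  The next entry written out is simply
 * appended with index 6, and a later re-read, on seeing index 6 again,
 * discards the stale records for indexes 6..8, reproducing the truncation.
 * (The indexes 4..8 here are arbitrary example values.) */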
2537
2538static const struct json *
2539raft_peek_next_entry(struct raft *raft, struct uuid *eid)
2540{
2541 /* Invariant: log_start - 2 <= last_applied <= commit_index < log_end. */
2542 ovs_assert(raft->log_start <= raft->last_applied + 2);
2543 ovs_assert(raft->last_applied <= raft->commit_index);
2544 ovs_assert(raft->commit_index < raft->log_end);
2545
2546 if (raft->joining || raft->failed) {
2547 return NULL;
2548 }
2549
2550 if (raft->log_start == raft->last_applied + 2) {
2551 *eid = raft->snap.eid;
2552 return raft->snap.data;
2553 }
2554
2555 while (raft->last_applied < raft->commit_index) {
2556 const struct raft_entry *e = raft_get_entry(raft,
2557 raft->last_applied + 1);
2558 if (e->data) {
2559 *eid = e->eid;
2560 return e->data;
2561 }
2562 raft->last_applied++;
2563 }
2564 return NULL;
2565}
2566
2567static const struct json *
2568raft_get_next_entry(struct raft *raft, struct uuid *eid)
2569{
2570 const struct json *data = raft_peek_next_entry(raft, eid);
2571 if (data) {
2572 raft->last_applied++;
2573 }
2574 return data;
2575}
2576
2577static void
2578raft_update_commit_index(struct raft *raft, uint64_t new_commit_index)
2579{
2580 if (new_commit_index <= raft->commit_index) {
2581 return;
2582 }
2583
2584 if (raft->role == RAFT_LEADER) {
2585 while (raft->commit_index < new_commit_index) {
2586 uint64_t index = ++raft->commit_index;
2587 const struct raft_entry *e = raft_get_entry(raft, index);
2588 if (e->data) {
2589 struct raft_command *cmd
2590 = raft_find_command_by_index(raft, index);
2591 if (cmd) {
2592 raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
2593 }
2594 }
2595 if (e->servers) {
2596 /* raft_run_reconfigure() can write a new Raft entry, which can
2597 * reallocate raft->entries, which would invalidate 'e', so
2598 * this case must be last, after the one for 'e->data'. */
2599 raft_run_reconfigure(raft);
2600 }
2601 }
2602 } else {
2603 raft->commit_index = new_commit_index;
2604 }
2605
2606 /* Write the commit index to the log. The next time we restart, this
2607 * allows us to start exporting a reasonably fresh log, instead of a log
2608 * that only contains the snapshot. */
2609 struct raft_record r = {
2610 .type = RAFT_REC_COMMIT_INDEX,
2611 .commit_index = raft->commit_index,
2612 };
2613 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2614}
2615
2616/* This doesn't use rq->entries (but it does use rq->n_entries). */
2617static void
2618raft_send_append_reply(struct raft *raft, const struct raft_append_request *rq,
2619 enum raft_append_result result, const char *comment)
2620{
2621 /* Figure 3.1: "If leaderCommit > commitIndex, set commitIndex =
2622 * min(leaderCommit, index of last new entry)" */
2623 if (result == RAFT_APPEND_OK && rq->leader_commit > raft->commit_index) {
2624 raft_update_commit_index(
2625 raft, MIN(rq->leader_commit, rq->prev_log_index + rq->n_entries));
2626 }
2627
2628 /* Send reply. */
2629 union raft_rpc reply = {
2630 .append_reply = {
2631 .common = {
2632 .type = RAFT_RPC_APPEND_REPLY,
2633 .sid = rq->common.sid,
2634 .comment = CONST_CAST(char *, comment),
2635 },
2636 .term = raft->term,
2637 .log_end = raft->log_end,
2638 .prev_log_index = rq->prev_log_index,
2639 .prev_log_term = rq->prev_log_term,
2640 .n_entries = rq->n_entries,
2641 .result = result,
2642 }
2643 };
2644 raft_send(raft, &reply);
2645}
2646
2647/* If 'prev_log_index' exists in 'raft''s log, in term 'prev_log_term', returns
2648 * NULL. Otherwise, returns an explanation for the mismatch. */
2649static const char *
2650match_index_and_term(const struct raft *raft,
2651 uint64_t prev_log_index, uint64_t prev_log_term)
2652{
2653 if (prev_log_index < raft->log_start - 1) {
2654 return "mismatch before start of log";
2655 } else if (prev_log_index == raft->log_start - 1) {
2656 if (prev_log_term != raft->snap.term) {
2657 return "prev_term mismatch";
2658 }
2659 } else if (prev_log_index < raft->log_end) {
2660 if (raft->entries[prev_log_index - raft->log_start].term
2661 != prev_log_term) {
2662 return "term mismatch";
2663 }
2664 } else {
2665 /* prev_log_index >= raft->log_end */
2666 return "mismatch past end of log";
2667 }
2668 return NULL;
2669}
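/* A worked example of the checks above, assuming log_start = 5, log_end = 8,
 * snap.term = 2, and entries 5..7 carrying terms 3, 3, 4:
 *
 *     prev_log_index = 3, any term  -> "mismatch before start of log"
 *     prev_log_index = 4, term 2    -> NULL (matches the snapshot)
 *     prev_log_index = 6, term 3    -> NULL (entry 6 has term 3)
 *     prev_log_index = 6, term 2    -> "term mismatch"
 *     prev_log_index = 9, any term  -> "mismatch past end of log"
 */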
2670
2671static void
2672raft_handle_append_entries(struct raft *raft,
2673 const struct raft_append_request *rq,
2674 uint64_t prev_log_index, uint64_t prev_log_term,
2675 const struct raft_entry *entries,
2676 unsigned int n_entries)
2677{
2678 /* Section 3.5: "When sending an AppendEntries RPC, the leader includes
2679 * the index and term of the entry in its log that immediately precedes
2680 * the new entries. If the follower does not find an entry in its log
2681 * with the same index and term, then it refuses the new entries." */
2682 const char *mismatch = match_index_and_term(raft, prev_log_index,
2683 prev_log_term);
2684 if (mismatch) {
2685 VLOG_INFO("rejecting append_request because previous entry "
2686 "%"PRIu64",%"PRIu64" not in local log (%s)",
2687 prev_log_term, prev_log_index, mismatch);
2688 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY, mismatch);
2689 return;
2690 }
2691
2692 /* Figure 3.1: "If an existing entry conflicts with a new one (same
2693 * index but different terms), delete the existing entry and all that
2694 * follow it." */
2695 unsigned int i;
2696 bool servers_changed = false;
2697 for (i = 0; ; i++) {
2698 if (i >= n_entries) {
2699 /* No change. */
2700 if (rq->common.comment
2701 && !strcmp(rq->common.comment, "heartbeat")) {
2702 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "heartbeat");
2703 } else {
2704 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
2705 }
2706 return;
2707 }
2708
2709 uint64_t log_index = (prev_log_index + 1) + i;
2710 if (log_index >= raft->log_end) {
2711 break;
2712 }
2713 if (raft->entries[log_index - raft->log_start].term
2714 != entries[i].term) {
2715 if (raft_truncate(raft, log_index)) {
2716 servers_changed = true;
2717 }
2718 break;
2719 }
2720 }
2721
2722 /* Figure 3.1: "Append any entries not already in the log." */
2723 struct ovsdb_error *error = NULL;
2724 bool any_written = false;
2725 for (; i < n_entries; i++) {
2726 const struct raft_entry *e = &entries[i];
2727 error = raft_write_entry(raft, e->term,
2728 json_nullable_clone(e->data), &e->eid,
2729 json_nullable_clone(e->servers));
2730 if (error) {
2731 break;
2732 }
2733 any_written = true;
2734 if (e->servers) {
2735 servers_changed = true;
2736 }
2737 }
2738
2739 if (any_written) {
2740 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index
2741 = raft->log_end - 1;
2742 }
2743 if (servers_changed) {
2744 /* The set of servers might have changed; check. */
2745 raft_get_servers_from_log(raft, VLL_INFO);
2746 }
2747
2748 if (error) {
2749 char *s = ovsdb_error_to_string_free(error);
2750 VLOG_ERR("%s", s);
2751 free(s);
2752 raft_send_append_reply(raft, rq, RAFT_APPEND_IO_ERROR, "I/O error");
2753 return;
2754 }
2755
2756 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "log updated");
2757}
2758
2759static bool
2760raft_update_leader(struct raft *raft, const struct uuid *sid)
2761{
2762 if (raft->role == RAFT_LEADER) {
2763 char buf[SID_LEN + 1];
2764 VLOG_ERR("this server is leader but server %s claims to be",
2765 raft_get_nickname(raft, sid, buf, sizeof buf));
2766 return false;
2767 } else if (!uuid_equals(sid, &raft->leader_sid)) {
2768 if (!uuid_is_zero(&raft->leader_sid)) {
2769 char buf1[SID_LEN + 1];
2770 char buf2[SID_LEN + 1];
2771 VLOG_ERR("leader for term %"PRIu64" changed from %s to %s",
2772 raft->term,
2773 raft_get_nickname(raft, &raft->leader_sid,
2774 buf1, sizeof buf1),
2775 raft_get_nickname(raft, sid, buf2, sizeof buf2));
2776 } else {
2777 char buf[SID_LEN + 1];
2778 VLOG_INFO("server %s is leader for term %"PRIu64,
2779 raft_get_nickname(raft, sid, buf, sizeof buf),
2780 raft->term);
2781 }
2782 raft->leader_sid = *sid;
2783
2784 /* Record the leader to the log. This is not used by the algorithm
2785 * (although it could be, for quick restart), but it is used for
2786 * offline analysis to check for conformance with the properties
2787 * that Raft guarantees. */
2788 struct raft_record r = {
2789 .type = RAFT_REC_LEADER,
2790 .term = raft->term,
2791 .sid = *sid,
2792 };
2793 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2794 }
2795 return true;
2796}
2797
2798static void
2799raft_handle_append_request(struct raft *raft,
2800 const struct raft_append_request *rq)
2801{
2802 /* We do not check whether the server that sent the request is part of the
2803 * cluster. As section 4.1 says, "A server accepts AppendEntries requests
2804 * from a leader that is not part of the server’s latest configuration.
2805 * Otherwise, a new server could never be added to the cluster (it would
2806 * never accept any log entries preceding the configuration entry that adds
2807 * the server)." */
2808 if (!raft_update_leader(raft, &rq->common.sid)) {
2809 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
2810 "usurped leadership");
2811 return;
2812 }
2813 raft_reset_timer(raft);
2814
2815 /* First check for the common case, where the AppendEntries request is
2816 * entirely for indexes covered by 'log_start' ... 'log_end - 1', something
2817 * like this:
2818 *
2819 * rq->prev_log_index
2820 * | first_entry_index
2821 * | | nth_entry_index
2822 * | | |
2823 * v v v
2824 * +---+---+---+---+
2825 * T | T | T | T | T |
2826 * +---+-------+---+
2827 * +---+---+---+---+
2828 * T | T | T | T | T |
2829 * +---+---+---+---+
2830 * ^ ^
2831 * | |
2832 * log_start log_end
2833 * */
2834 uint64_t first_entry_index = rq->prev_log_index + 1;
2835 uint64_t nth_entry_index = rq->prev_log_index + rq->n_entries;
2836 if (OVS_LIKELY(first_entry_index >= raft->log_start)) {
2837 raft_handle_append_entries(raft, rq,
2838 rq->prev_log_index, rq->prev_log_term,
2839 rq->entries, rq->n_entries);
2840 return;
2841 }
2842
2843 /* Now a series of checks for odd cases, where the AppendEntries request
2844 * extends earlier than the beginning of our log, into the log entries
2845 * discarded by the most recent snapshot. */
2846
2847 /*
2848 * Handle the case where the indexes covered by rq->entries[] are entirely
2849 * disjoint with 'log_start - 1' ... 'log_end - 1', as shown below. So,
2850 * everything in the AppendEntries request must already have been
2851 * committed, and we might as well return true.
2852 *
2853 * rq->prev_log_index
2854 * | first_entry_index
2855 * | | nth_entry_index
2856 * | | |
2857 * v v v
2858 * +---+---+---+---+
2859 * T | T | T | T | T |
2860 * +---+-------+---+
2861 * +---+---+---+---+
2862 * T | T | T | T | T |
2863 * +---+---+---+---+
2864 * ^ ^
2865 * | |
2866 * log_start log_end
2867 */
2868 if (nth_entry_index < raft->log_start - 1) {
2869 raft_send_append_reply(raft, rq, RAFT_APPEND_OK,
2870 "append before log start");
2871 return;
2872 }
2873
2874 /*
2875 * Handle the case where the last entry in rq->entries[] has the same index
2876 * as 'log_start - 1', so we can compare their terms:
2877 *
2878 * rq->prev_log_index
2879 * | first_entry_index
2880 * | | nth_entry_index
2881 * | | |
2882 * v v v
2883 * +---+---+---+---+
2884 * T | T | T | T | T |
2885 * +---+-------+---+
2886 * +---+---+---+---+
2887 * T | T | T | T | T |
2888 * +---+---+---+---+
2889 * ^ ^
2890 * | |
2891 * log_start log_end
2892 *
2893 * There's actually a sub-case where rq->n_entries == 0, in which we
2894 * compare rq->prev_log_term:
2895 *
2896 * rq->prev_log_index
2897 * |
2898 * |
2899 * |
2900 * v
2901 * T
2902 *
2903 * +---+---+---+---+
2904 * T | T | T | T | T |
2905 * +---+---+---+---+
2906 * ^ ^
2907 * | |
2908 * log_start log_end
2909 */
2910 if (nth_entry_index == raft->log_start - 1) {
2911 if (rq->n_entries
2912 ? raft->snap.term == rq->entries[rq->n_entries - 1].term
2913 : raft->snap.term == rq->prev_log_term) {
2914 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
2915 } else {
2916 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
2917 "term mismatch");
2918 }
2919 return;
2920 }
2921
2922 /*
2923 * We now know that the data in rq->entries[] overlaps the data in
2924 * raft->entries[], as shown below, with some positive 'ofs':
2925 *
2926 * rq->prev_log_index
2927 * | first_entry_index
2928 * | | nth_entry_index
2929 * | | |
2930 * v v v
2931 * +---+---+---+---+---+
2932 * T | T | T | T | T | T |
2933 * +---+-------+---+---+
2934 * +---+---+---+---+
2935 * T | T | T | T | T |
2936 * +---+---+---+---+
2937 * ^ ^
2938 * | |
2939 * log_start log_end
2940 *
2941 * |<-- ofs -->|
2942 *
2943 * We transform this into the following by trimming the first 'ofs'
2944 * elements off of rq->entries[], ending up with the following. Notice how
2945 * we retain the term but not the data for rq->entries[ofs - 1]:
2946 *
2947 * first_entry_index + ofs - 1
2948 * | first_entry_index + ofs
2949 * | | nth_entry_index + ofs
2950 * | | |
2951 * v v v
2952 * +---+---+
2953 * T | T | T |
2954 * +---+---+
2955 * +---+---+---+---+
2956 * T | T | T | T | T |
2957 * +---+---+---+---+
2958 * ^ ^
2959 * | |
2960 * log_start log_end
2961 */
2962 uint64_t ofs = raft->log_start - first_entry_index;
2963 raft_handle_append_entries(raft, rq,
2964 raft->log_start - 1, rq->entries[ofs - 1].term,
2965 &rq->entries[ofs], rq->n_entries - ofs);
2966}
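/* A concrete instance of the 'ofs' transformation above, assuming
 * log_start = 10 and a request with prev_log_index = 7 carrying entries for
 * indexes 8..12 (n_entries = 5): first_entry_index is 8, so ofs = 2, and we
 * call raft_handle_append_entries() with prev_log_index = 9, the term taken
 * from rq->entries[1], and rq->entries[2..4] covering indexes 10..12. */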
2967
2968/* Returns true if 'raft' has another log entry or snapshot to read. */
2969bool
2970raft_has_next_entry(const struct raft *raft_)
2971{
2972 struct raft *raft = CONST_CAST(struct raft *, raft_);
2973 struct uuid eid;
2974 return raft_peek_next_entry(raft, &eid) != NULL;
2975}
2976
2977/* Returns the next log entry or snapshot from 'raft', or NULL if there are
2978 * none left to read. Stores the entry ID of the log entry in '*eid'. Stores
2979 * true in '*is_snapshot' if the returned data is a snapshot, false if it is a
2980 * log entry. */
2981const struct json *
2982raft_next_entry(struct raft *raft, struct uuid *eid, bool *is_snapshot)
2983{
2984 const struct json *data = raft_get_next_entry(raft, eid);
2985 *is_snapshot = data == raft->snap.data;
2986 return data;
2987}
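/* A sketch of how a storage layer might drain the committed log, applying
 * each snapshot or entry in order.  apply_snapshot() and apply_entry() are
 * hypothetical callbacks, named here only for illustration:
 *
 *     while (raft_has_next_entry(raft)) {
 *         struct uuid eid;
 *         bool is_snapshot;
 *         const struct json *data = raft_next_entry(raft, &eid, &is_snapshot);
 *         if (is_snapshot) {
 *             apply_snapshot(data);
 *         } else {
 *             apply_entry(data, &eid);
 *         }
 *     }
 */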
2988
2989/* Returns the log index of the last-read snapshot or log entry. */
2990uint64_t
2991raft_get_applied_index(const struct raft *raft)
2992{
2993 return raft->last_applied;
2994}
2995
2996/* Returns the log index of the last snapshot or log entry that is available to
2997 * be read. */
2998uint64_t
2999raft_get_commit_index(const struct raft *raft)
3000{
3001 return raft->commit_index;
3002}
3003
3004static struct raft_server *
3005raft_find_peer(struct raft *raft, const struct uuid *uuid)
3006{
3007 struct raft_server *s = raft_find_server(raft, uuid);
3008 return s && !uuid_equals(&raft->sid, &s->sid) ? s : NULL;
3009}
3010
3011static struct raft_server *
3012raft_find_new_server(struct raft *raft, const struct uuid *uuid)
3013{
3014 return raft_server_find(&raft->add_servers, uuid);
3015}
3016
3017/* Figure 3.1: "If there exists an N such that N > commitIndex, a
3018 * majority of matchIndex[i] >= N, and log[N].term == currentTerm, set
3019 * commitIndex = N (sections 3.5 and 3.6)." */
3020static void
3021raft_consider_updating_commit_index(struct raft *raft)
3022{
3023 /* This loop cannot just bail out when it comes across a log entry that
3024 * does not match the criteria. For example, Figure 3.7(d2) shows a
3025 * case where the log entry for term 2 cannot be committed directly
3026 * (because it is not for the current term) but it can be committed as
3027 * a side effect of committing the entry for term 4 (the current term).
3028 * XXX Is there a more efficient way to do this? */
3029 ovs_assert(raft->role == RAFT_LEADER);
3030
3031 uint64_t new_commit_index = raft->commit_index;
3032 for (uint64_t idx = MAX(raft->commit_index + 1, raft->log_start);
3033 idx < raft->log_end; idx++) {
3034 if (raft->entries[idx - raft->log_start].term == raft->term) {
3035 size_t count = 0;
3036 struct raft_server *s2;
3037 HMAP_FOR_EACH (s2, hmap_node, &raft->servers) {
3038 if (s2->match_index >= idx) {
3039 count++;
3040 }
3041 }
3042 if (count > hmap_count(&raft->servers) / 2) {
3043 VLOG_DBG("index %"PRIu64" committed to %"PRIuSIZE" servers, "
3044 "applying", idx, count);
3045 new_commit_index = idx;
3046 }
3047 }
3048 }
3049 raft_update_commit_index(raft, new_commit_index);
3050}
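/* A worked example, assuming a 5-server cluster at current term 4 with
 * commit_index = 3 and entries 4..7 carrying terms { 2, 2, 4, 4 }.  If
 * matchIndex across the servers is { 7, 7, 6, 3, 2 }:
 *
 *   - Indexes 4 and 5 never match the term criterion (term 2 != 4).
 *   - Index 6 has term 4 and 3 of 5 servers match at >= 6, a majority, so
 *     new_commit_index becomes 6, committing indexes 4 and 5 as a side
 *     effect.
 *   - Index 7 falls short: only 2 of 5 servers match at >= 7.
 */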
3051
3052static void
3053raft_update_match_index(struct raft *raft, struct raft_server *s,
3054 uint64_t min_index)
3055{
3056 ovs_assert(raft->role == RAFT_LEADER);
3057 if (min_index > s->match_index) {
3058 s->match_index = min_index;
3059 raft_consider_updating_commit_index(raft);
3060 }
3061}
3062
3063static void
3064raft_update_our_match_index(struct raft *raft, uint64_t min_index)
3065{
3066 struct raft_server *server = raft_find_server(raft, &raft->sid);
3067 if (server) {
3068 raft_update_match_index(raft, server, min_index);
3069 }
3070}
3071
3072static void
3073raft_send_install_snapshot_request(struct raft *raft,
3074 const struct raft_server *s,
3075 const char *comment)
3076{
3077 union raft_rpc rpc = {
3078 .install_snapshot_request = {
3079 .common = {
3080 .type = RAFT_RPC_INSTALL_SNAPSHOT_REQUEST,
3081 .sid = s->sid,
3082 .comment = CONST_CAST(char *, comment),
3083 },
3084 .term = raft->term,
3085 .last_index = raft->log_start - 1,
3086 .last_term = raft->snap.term,
3087 .last_servers = raft->snap.servers,
3088 .last_eid = raft->snap.eid,
3089 .data = raft->snap.data,
3090 }
3091 };
3092 raft_send(raft, &rpc);
3093}
3094
3095static void
3096raft_handle_append_reply(struct raft *raft,
3097 const struct raft_append_reply *rpy)
3098{
3099 if (raft->role != RAFT_LEADER) {
3100 VLOG_INFO("rejected append_reply (not leader)");
3101 return;
3102 }
3103
3104 /* Most commonly we'd be getting an AppendEntries reply from a configured
3105 * server (e.g. a peer), but we can also get them from servers in the
3106 * process of being added. */
3107 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3108 if (!s) {
3109 s = raft_find_new_server(raft, &rpy->common.sid);
3110 if (!s) {
3111 VLOG_INFO("rejected append_reply from unknown server "SID_FMT,
3112 SID_ARGS(&rpy->common.sid));
3113 return;
3114 }
3115 }
3116
3117 if (rpy->result == RAFT_APPEND_OK) {
3118 /* Figure 3.1: "If successful, update nextIndex and matchIndex for
3119 * follower (section 3.5)." */
3120 uint64_t min_index = rpy->prev_log_index + rpy->n_entries + 1;
3121 if (s->next_index < min_index) {
3122 s->next_index = min_index;
3123 }
3124 raft_update_match_index(raft, s, min_index - 1);
3125 } else {
3126 /* Figure 3.1: "If AppendEntries fails because of log inconsistency,
3127 * decrement nextIndex and retry (section 3.5)."
3128 *
3129 * We also implement the optimization suggested in section 4.2.1:
3130 * "Various approaches can make nextIndex converge to its correct value
3131 * more quickly, including those described in Chapter 3. The simplest
3132 * approach to solving this particular problem of adding a new server,
3133 * however, is to have followers return the length of their logs in the
3134 * AppendEntries response; this allows the leader to cap the follower’s
3135 * nextIndex accordingly." */
3136 s->next_index = (s->next_index > 0
3137 ? MIN(s->next_index - 1, rpy->log_end)
3138 : 0);
3139
3140 if (rpy->result == RAFT_APPEND_IO_ERROR) {
3141 /* Append failed but not because of a log inconsistency. Because
3142 * of the I/O error, there's no point in re-sending the append
3143 * immediately. */
3144 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3145 VLOG_INFO_RL(&rl, "%s reported I/O error", s->nickname);
3146 return;
3147 }
3148 }
3149
3150 /*
3151 * Our behavior here must depend on the value of next_index relative to
3152 * log_start and log_end. There are three cases:
3153 *
3154 * Case 1 | Case 2 | Case 3
3155 * <---------------->|<------------->|<------------------>
3156 * | |
3157 *
3158 * +---+---+---+---+
3159 * T | T | T | T | T |
3160 * +---+---+---+---+
3161 * ^ ^
3162 * | |
3163 * log_start log_end
3164 */
3165 if (s->next_index < raft->log_start) {
3166 /* Case 1. */
3167 raft_send_install_snapshot_request(raft, s, NULL);
3168 } else if (s->next_index < raft->log_end) {
3169 /* Case 2. */
3170 raft_send_append_request(raft, s, 1, NULL);
3171 } else {
3172 /* Case 3. */
3173 if (s->phase == RAFT_PHASE_CATCHUP) {
3174 s->phase = RAFT_PHASE_CAUGHT_UP;
3175 raft_run_reconfigure(raft);
3176 }
3177 }
3178}
3179
3180static bool
3181raft_should_suppress_disruptive_server(struct raft *raft,
3182 const union raft_rpc *rpc)
3183{
3184 if (rpc->type != RAFT_RPC_VOTE_REQUEST) {
3185 return false;
3186 }
3187
3188 /* Section 4.2.3 "Disruptive Servers" says:
3189 *
3190 * ...if a server receives a RequestVote request within the minimum
3191 * election timeout of hearing from a current leader, it does not update
3192 * its term or grant its vote...
3193 *
3194 * ...This change conflicts with the leadership transfer mechanism as
3195 * described in Chapter 3, in which a server legitimately starts an
3196 * election without waiting an election timeout. In that case,
3197 * RequestVote messages should be processed by other servers even when
3198 * they believe a current cluster leader exists. Those RequestVote
3199 * requests can include a special flag to indicate this behavior (“I
3200 * have permission to disrupt the leader--it told me to!”).
3201 *
3202 * This clearly describes how the followers should act, but not the leader.
3203 * We just ignore vote requests that arrive at a current leader. This
3204 * seems to be fairly safe, since a majority other than the current leader
3205 * can still elect a new leader and the first AppendEntries from that new
3206 * leader will depose the current leader. */
3207 const struct raft_vote_request *rq = raft_vote_request_cast(rpc);
3208 if (rq->leadership_transfer) {
3209 return false;
3210 }
3211
3212 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3213 long long int now = time_msec();
3214 switch (raft->role) {
3215 case RAFT_LEADER:
3216 VLOG_WARN_RL(&rl, "ignoring vote request received as leader");
3217 return true;
3218
3219 case RAFT_FOLLOWER:
3220 if (now < raft->election_base + ELECTION_BASE_MSEC) {
3221 VLOG_WARN_RL(&rl, "ignoring vote request received after only "
3222 "%lld ms (minimum election time is %d ms)",
3223 now - raft->election_base, ELECTION_BASE_MSEC);
3224 return true;
3225 }
3226 return false;
3227
3228 case RAFT_CANDIDATE:
3229 return false;
3230
3231 default:
3232 OVS_NOT_REACHED();
3233 }
3234}
3235
3236/* Returns true if a reply should be sent. */
3237static bool
3238raft_handle_vote_request__(struct raft *raft,
3239 const struct raft_vote_request *rq)
3240{
3241 /* Figure 3.1: "If votedFor is null or candidateId, and candidate's log is
3242 * at least as up-to-date as receiver's log, grant vote (sections 3.4,
3243 * 3.6)." */
3244 if (uuid_equals(&raft->vote, &rq->common.sid)) {
3245 /* Already voted for this candidate in this term. Resend vote. */
3246 return true;
3247 } else if (!uuid_is_zero(&raft->vote)) {
3248 /* Already voted for different candidate in this term. Send a reply
3249 * saying what candidate we did vote for. This isn't a necessary part
3250 * of the Raft protocol but it can make debugging easier. */
3251 return true;
3252 }
3253
3254 /* Section 3.6.1: "The RequestVote RPC implements this restriction: the RPC
3255 * includes information about the candidate’s log, and the voter denies its
3256 * vote if its own log is more up-to-date than that of the candidate. Raft
3257 * determines which of two logs is more up-to-date by comparing the index
3258 * and term of the last entries in the logs. If the logs have last entries
3259 * with different terms, then the log with the later term is more
3260 * up-to-date. If the logs end with the same term, then whichever log is
3261 * longer is more up-to-date." */
3262 uint64_t last_term = (raft->log_end > raft->log_start
3263 ? raft->entries[raft->log_end - 1
3264 - raft->log_start].term
3265 : raft->snap.term);
3266 if (last_term > rq->last_log_term
3267 || (last_term == rq->last_log_term
3268 && raft->log_end - 1 > rq->last_log_index)) {
3269 /* Our log is more up-to-date than the peer's. Withhold vote. */
3270 return false;
3271 }
3272
3273 /* Record a vote for the peer. */
3274 if (!raft_set_term(raft, raft->term, &rq->common.sid)) {
3275 return false;
3276 }
3277
3278 raft_reset_timer(raft);
3279
3280 return true;
3281}
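/* Concretely, assuming our own last entry has term 5 and index 10, the
 * up-to-date comparison above resolves as follows:
 *
 *     candidate's (last_log_term, last_log_index)    vote granted?
 *     (4, 12)  older last term                       no
 *     (5, 9)   same last term, shorter log           no
 *     (5, 10)  logs equally up-to-date               yes
 *     (6, 3)   newer last term                       yes
 */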
3282
3283static void
3284raft_send_vote_reply(struct raft *raft, const struct uuid *dst,
3285 const struct uuid *vote)
3286{
3287 union raft_rpc rpy = {
3288 .vote_reply = {
3289 .common = {
3290 .type = RAFT_RPC_VOTE_REPLY,
3291 .sid = *dst,
3292 },
3293 .term = raft->term,
3294 .vote = *vote,
3295 },
3296 };
3297 raft_send(raft, &rpy);
3298}
3299
3300static void
3301raft_handle_vote_request(struct raft *raft,
3302 const struct raft_vote_request *rq)
3303{
3304 if (raft_handle_vote_request__(raft, rq)) {
3305 raft_send_vote_reply(raft, &rq->common.sid, &raft->vote);
3306 }
3307}
3308
3309static void
3310raft_handle_vote_reply(struct raft *raft,
3311 const struct raft_vote_reply *rpy)
3312{
3313 if (!raft_receive_term__(raft, &rpy->common, rpy->term)) {
3314 return;
3315 }
3316
3317 if (raft->role != RAFT_CANDIDATE) {
3318 return;
3319 }
3320
3321 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3322 if (s) {
3323 raft_accept_vote(raft, s, &rpy->vote);
3324 }
3325}
3326
3327/* Returns true if 'raft''s log contains reconfiguration entries that have not
3328 * yet been committed. */
3329static bool
3330raft_has_uncommitted_configuration(const struct raft *raft)
3331{
3332 for (uint64_t i = raft->commit_index + 1; i < raft->log_end; i++) {
3333 ovs_assert(i >= raft->log_start);
3334 const struct raft_entry *e = &raft->entries[i - raft->log_start];
3335 if (e->servers) {
3336 return true;
3337 }
3338 }
3339 return false;
3340}
3341
3342static void
3343raft_log_reconfiguration(struct raft *raft)
3344{
3345 struct json *servers_json = raft_servers_to_json(&raft->servers);
3346 raft_command_unref(raft_command_execute__(
3347 raft, NULL, servers_json, NULL, NULL));
3348 json_destroy(servers_json);
3349}
3350
3351static void
3352raft_run_reconfigure(struct raft *raft)
3353{
3354 ovs_assert(raft->role == RAFT_LEADER);
3355
3356 /* Reconfiguration only progresses when configuration changes commit. */
3357 if (raft_has_uncommitted_configuration(raft)) {
3358 return;
3359 }
3360
3361 /* If we were waiting for a configuration change to commit, it's done. */
3362 struct raft_server *s;
3363 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3364 if (s->phase == RAFT_PHASE_COMMITTING) {
3365 raft_send_add_server_reply__(raft, &s->sid, s->address,
3366 true, RAFT_SERVER_COMPLETED);
3367 s->phase = RAFT_PHASE_STABLE;
3368 }
3369 }
3370 if (raft->remove_server) {
3371 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
3372 &raft->remove_server->requester_sid,
3373 raft->remove_server->requester_conn,
3374 true, RAFT_SERVER_COMPLETED);
3375 raft_server_destroy(raft->remove_server);
3376 raft->remove_server = NULL;
3377 }
3378
3379 /* If a new server is caught up, add it to the configuration. */
3380 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
3381 if (s->phase == RAFT_PHASE_CAUGHT_UP) {
3382 /* Move 's' from 'raft->add_servers' to 'raft->servers'. */
3383 hmap_remove(&raft->add_servers, &s->hmap_node);
3384 hmap_insert(&raft->servers, &s->hmap_node, uuid_hash(&s->sid));
3385
3386 /* Mark 's' as waiting for commit. */
3387 s->phase = RAFT_PHASE_COMMITTING;
3388
3389 raft_log_reconfiguration(raft);
3390
3391 /* When commit completes we'll transition to RAFT_PHASE_STABLE and
3392 * send a RAFT_SERVER_OK reply. */
3393
3394 return;
3395 }
3396 }
3397
3398 /* Remove a server, if one is scheduled for removal. */
3399 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3400 if (s->phase == RAFT_PHASE_REMOVE) {
3401 hmap_remove(&raft->servers, &s->hmap_node);
3402 raft->remove_server = s;
3403
3404 raft_log_reconfiguration(raft);
3405
3406 return;
3407 }
3408 }
3409}
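/* Taken together with raft_handle_append_reply() and
 * raft_update_commit_index(), the phase sequence for an added server is:
 *
 *     RAFT_PHASE_CATCHUP     (leader streams snapshot and log entries)
 *     RAFT_PHASE_CAUGHT_UP   (log replicated; awaiting reconfiguration)
 *     RAFT_PHASE_COMMITTING  (new configuration entry in flight)
 *     RAFT_PHASE_STABLE      (configuration committed; reply sent)
 *
 * Because raft_has_uncommitted_configuration() gates each step, only one
 * configuration change is in flight at a time. */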
3410
3411static void
3412raft_handle_add_server_request(struct raft *raft,
3413 const struct raft_add_server_request *rq)
3414{
3415 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3416 if (raft->role != RAFT_LEADER) {
3417 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3418 return;
3419 }
3420
3421 /* Check for an existing server. */
3422 struct raft_server *s = raft_find_server(raft, &rq->common.sid);
3423 if (s) {
3424 /* If the server is scheduled to be removed, cancel it. */
3425 if (s->phase == RAFT_PHASE_REMOVE) {
3426 s->phase = RAFT_PHASE_STABLE;
3427 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_CANCELED);
3428 return;
3429 }
3430
3431 /* If the server is being added, then it's in progress. */
3432 if (s->phase != RAFT_PHASE_STABLE) {
3433 raft_send_add_server_reply(raft, rq,
3434 false, RAFT_SERVER_IN_PROGRESS);
return; /* Without this we would fall through and send a second reply. */
3435 }
3436
3437 /* Nothing to do--server is already part of the configuration. */
3438 raft_send_add_server_reply(raft, rq,
3439 true, RAFT_SERVER_ALREADY_PRESENT);
3440 return;
3441 }
3442
3443 /* Check for a server being removed. */
3444 if (raft->remove_server
3445 && uuid_equals(&rq->common.sid, &raft->remove_server->sid)) {
3446 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3447 return;
3448 }
3449
3450 /* Check for a server already being added. */
3451 if (raft_find_new_server(raft, &rq->common.sid)) {
3452 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_IN_PROGRESS);
3453 return;
3454 }
3455
3456 /* Add server to 'add_servers'. */
3457 s = raft_server_add(&raft->add_servers, &rq->common.sid, rq->address);
3458 raft_server_init_leader(raft, s);
3459 s->requester_sid = rq->common.sid;
3460 s->requester_conn = NULL;
3461 s->phase = RAFT_PHASE_CATCHUP;
3462
3463 /* Start sending the log. If this is the first time we've tried to add
3464 * this server, then this will quickly degenerate into an InstallSnapshot
3465 * followed by a series of AppendEntries requests, but if it's a retry of
3466 * an earlier AddServerRequest that was interrupted (e.g. by a timeout or a loss of
3467 * leadership) then it will gracefully resume populating the log.
3468 *
3469 * See the last few paragraphs of section 4.2.1 for further insight. */
3470 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
3471 VLOG_INFO_RL(&rl,
3472 "starting to add server %s ("SID_FMT" at %s) "
3473 "to cluster "CID_FMT, s->nickname, SID_ARGS(&s->sid),
3474 rq->address, CID_ARGS(&raft->cid));
3475 raft_send_append_request(raft, s, 0, "initialize new server");
3476}
3477
3478static void
3479raft_handle_add_server_reply(struct raft *raft,
3480 const struct raft_add_server_reply *rpy)
3481{
3482 if (!raft->joining) {
3483 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3484 VLOG_WARN_RL(&rl, "received add_server_reply even though we're "
3485 "already part of the cluster");
3486 return;
3487 }
3488
3489 if (rpy->success) {
3490 raft->joining = false;
3491
3492 /* It is tempting, at this point, to check that this server is part of
3493 * the current configuration. However, this is not necessarily the
3494 * case, because the log entry that added this server to the cluster
3495 * might have been committed by a majority of the cluster that does not
3496 * include this one. This actually happens in testing. */
3497 } else {
3498 const char *address;
3499 SSET_FOR_EACH (address, &rpy->remote_addresses) {
3500 if (sset_add(&raft->remote_addresses, address)) {
3501 VLOG_INFO("%s: learned new server address for joining cluster",
3502 address);
3503 }
3504 }
3505 }
3506}
3507
3508/* This is called by raft_unixctl_kick() as well as via RPC. */
3509static void
3510raft_handle_remove_server_request(struct raft *raft,
3511 const struct raft_remove_server_request *rq)
3512{
3513 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3514 if (raft->role != RAFT_LEADER) {
3515 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3516 return;
3517 }
3518
3519 /* If the server to remove is currently waiting to be added, cancel it. */
3520 struct raft_server *target = raft_find_new_server(raft, &rq->sid);
3521 if (target) {
3522 raft_send_add_server_reply__(raft, &target->sid, target->address,
3523 false, RAFT_SERVER_CANCELED);
3524 hmap_remove(&raft->add_servers, &target->hmap_node);
3525 raft_server_destroy(target);
3526 return;
3527 }
3528
3529 /* If the server isn't configured, report that. */
3530 target = raft_find_server(raft, &rq->sid);
3531 if (!target) {
3532 raft_send_remove_server_reply(raft, rq,
3533 true, RAFT_SERVER_ALREADY_GONE);
3534 return;
3535 }
3536
3537 /* Check whether we're waiting for the addition of the server to commit. */
3538 if (target->phase == RAFT_PHASE_COMMITTING) {
3539 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3540 return;
3541 }
3542
3543 /* Check whether the server is already scheduled for removal. */
3544 if (target->phase == RAFT_PHASE_REMOVE) {
3545 raft_send_remove_server_reply(raft, rq,
3546 false, RAFT_SERVER_IN_PROGRESS);
3547 return;
3548 }
3549
3550 /* Make sure that if we remove this server, at least one other server
3551 * will be left. We don't count servers currently being added (in
3552 * 'add_servers') since those could fail. */
3553 struct raft_server *s;
3554 int n = 0;
3555 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3556 if (s != target && s->phase != RAFT_PHASE_REMOVE) {
3557 n++;
3558 }
3559 }
3560 if (!n) {
3561 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_EMPTY);
3562 return;
3563 }
3564
3565 /* Mark the server for removal. */
3566 target->phase = RAFT_PHASE_REMOVE;
3567 if (rq->requester_conn) {
3568 target->requester_sid = UUID_ZERO;
3569 unixctl_command_reply(rq->requester_conn, "started removal");
3570 } else {
3571 target->requester_sid = rq->common.sid;
3572 target->requester_conn = NULL;
3573 }
3574
3575 raft_run_reconfigure(raft);
3576    /* Operation in progress; the reply will be sent later. */
3577}
3578
3579static void
3580raft_finished_leaving_cluster(struct raft *raft)
3581{
3582 VLOG_INFO(SID_FMT": finished leaving cluster "CID_FMT,
3583 SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
3584
3585 raft_record_note(raft, "left", "this server left the cluster");
3586
3587 raft->leaving = false;
3588 raft->left = true;
3589}
3590
3591static void
3592raft_handle_remove_server_reply(struct raft *raft,
3593 const struct raft_remove_server_reply *rpc)
3594{
3595 if (rpc->success
3596 && (uuid_is_zero(&rpc->target_sid)
3597 || uuid_equals(&rpc->target_sid, &raft->sid))) {
3598 raft_finished_leaving_cluster(raft);
3599 }
3600}
3601
3602static bool
3603raft_handle_write_error(struct raft *raft, struct ovsdb_error *error)
3604{
3605 if (error && !raft->failed) {
3606 raft->failed = true;
3607
3608 char *s = ovsdb_error_to_string_free(error);
3609 VLOG_WARN("%s: entering failure mode due to I/O error (%s)",
3610 raft->name, s);
3611 free(s);
3612 }
3613 return !raft->failed;
3614}
3615
3616static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3617raft_write_snapshot(struct raft *raft, struct ovsdb_log *log,
3618 uint64_t new_log_start,
3619 const struct raft_entry *new_snapshot)
3620{
3621 struct raft_header h = {
3622 .sid = raft->sid,
3623 .cid = raft->cid,
3624 .name = raft->name,
3625 .local_address = raft->local_address,
3626 .snap_index = new_log_start - 1,
3627 .snap = *new_snapshot,
3628 };
3629 struct ovsdb_error *error = ovsdb_log_write_and_free(
3630 log, raft_header_to_json(&h));
3631 if (error) {
3632 return error;
3633 }
3634 ovsdb_log_mark_base(raft->log);
3635
3636 /* Write log records. */
3637 for (uint64_t index = new_log_start; index < raft->log_end; index++) {
3638 const struct raft_entry *e = &raft->entries[index - raft->log_start];
3639 struct raft_record r = {
3640 .type = RAFT_REC_ENTRY,
3641 .term = e->term,
3642 .entry = {
3643 .index = index,
3644 .data = e->data,
3645 .servers = e->servers,
3646 .eid = e->eid,
3647 },
3648 };
3649 error = ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3650 if (error) {
3651 return error;
3652 }
3653 }
3654
3655 /* Write term and vote (if any).
3656 *
3657 * The term is redundant if we wrote a log record for that term above. The
3658 * vote, if any, is never redundant.
3659 */
3660 error = raft_write_state(log, raft->term, &raft->vote);
3661 if (error) {
3662 return error;
3663 }
3664
3665 /* Write commit_index if it's beyond the new start of the log. */
3666 if (raft->commit_index >= new_log_start) {
3667 struct raft_record r = {
3668 .type = RAFT_REC_COMMIT_INDEX,
3669 .commit_index = raft->commit_index,
3670 };
3671 return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3672 }
3673 return NULL;
3674}
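/* The replacement log that raft_write_snapshot() produces is thus laid out
 * as: a raft_header carrying the snapshot, one RAFT_REC_ENTRY record per
 * retained log entry, the current term and vote, and finally (only if it
 * falls within the retained tail) the commit index. */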
3675
3676static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3677raft_save_snapshot(struct raft *raft,
3678 uint64_t new_start, const struct raft_entry *new_snapshot)
3679
3680{
3681 struct ovsdb_log *new_log;
3682 struct ovsdb_error *error;
3683 error = ovsdb_log_replace_start(raft->log, &new_log);
3684 if (error) {
3685 return error;
3686 }
3687
3688 error = raft_write_snapshot(raft, new_log, new_start, new_snapshot);
3689 if (error) {
3690 ovsdb_log_replace_abort(new_log);
3691 return error;
3692 }
3693
3694 return ovsdb_log_replace_commit(raft->log, new_log);
3695}
3696
3697static bool
3698raft_handle_install_snapshot_request__(
3699 struct raft *raft, const struct raft_install_snapshot_request *rq)
3700{
3701 raft_reset_timer(raft);
3702
3703    /*
3704     * Our behavior here depends on new_log_start in the snapshot compared
3705     * to log_start and log_end.  There are three cases:
3706     *
3707     *         Case 1      |    Case 2     |      Case 3
3708     *   <---------------->|<------------->|<------------------>
3709     *                     |               |
3710     *
3711     *                     +---+---+---+---+
3712     *                   T | T | T | T | T |
3713     *                     +---+---+---+---+
3714     *                     ^               ^
3715     *                     |               |
3716     *                 log_start       log_end
3717     */
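    /* A worked example with illustrative values: suppose log_start == 10 and
     * log_end == 15.  A snapshot with last_index == 7 yields new_log_start
     * == 8, which is less than log_start (Case 1); last_index == 12 yields
     * new_log_start == 13, inside [log_start, log_end) (Case 2); and
     * last_index == 20 yields new_log_start == 21 >= log_end (Case 3). */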
3718 uint64_t new_log_start = rq->last_index + 1;
3719 if (new_log_start < raft->log_start) {
3720 /* Case 1: The new snapshot covers less than our current one. Nothing
3721 * to do. */
3722 return true;
3723 } else if (new_log_start < raft->log_end) {
3724 /* Case 2: The new snapshot starts in the middle of our log. We could
3725 * discard the first 'new_log_start - raft->log_start' entries in the
3726 * log. But there's not much value in that, since snapshotting is
3727 * supposed to be a local decision. Just skip it. */
3728 return true;
3729 }
3730
3731 /* Case 3: The new snapshot starts past the end of our current log, so
3732 * discard all of our current log. */
3733 const struct raft_entry new_snapshot = {
3734 .term = rq->last_term,
3735 .data = rq->data,
3736 .eid = rq->last_eid,
3737 .servers = rq->last_servers,
3738 };
3739 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
3740 &new_snapshot);
3741 if (error) {
3742 char *error_s = ovsdb_error_to_string(error);
3743 VLOG_WARN("could not save snapshot: %s", error_s);
3744 free(error_s);
3745 return false;
3746 }
3747
3748 for (size_t i = 0; i < raft->log_end - raft->log_start; i++) {
3749 raft_entry_uninit(&raft->entries[i]);
3750 }
3751 raft->log_start = raft->log_end = new_log_start;
3752 raft->log_synced = raft->log_end - 1;
3753 raft->commit_index = raft->log_start - 1;
3754 if (raft->last_applied < raft->commit_index) {
3755 raft->last_applied = raft->log_start - 2;
3756 }
3757
3758 raft_entry_uninit(&raft->snap);
3759 raft_entry_clone(&raft->snap, &new_snapshot);
3760
3761 raft_get_servers_from_log(raft, VLL_INFO);
3762
3763 return true;
3764}
3765
3766static void
3767raft_handle_install_snapshot_request(
3768 struct raft *raft, const struct raft_install_snapshot_request *rq)
3769{
3770 if (raft_handle_install_snapshot_request__(raft, rq)) {
3771 union raft_rpc rpy = {
3772 .install_snapshot_reply = {
3773 .common = {
3774 .type = RAFT_RPC_INSTALL_SNAPSHOT_REPLY,
3775 .sid = rq->common.sid,
3776 },
3777 .term = raft->term,
3778 .last_index = rq->last_index,
3779 .last_term = rq->last_term,
3780 },
3781 };
3782 raft_send(raft, &rpy);
3783 }
3784}
3785
3786static void
3787raft_handle_install_snapshot_reply(
3788 struct raft *raft, const struct raft_install_snapshot_reply *rpy)
3789{
3790 /* We might get an InstallSnapshot reply from a configured server (e.g. a
3791 * peer) or a server in the process of being added. */
3792 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3793 if (!s) {
3794 s = raft_find_new_server(raft, &rpy->common.sid);
3795 if (!s) {
3796 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3797 VLOG_INFO_RL(&rl, "cluster "CID_FMT": received %s from "
3798 "unknown server "SID_FMT, CID_ARGS(&raft->cid),
3799 raft_rpc_type_to_string(rpy->common.type),
3800 SID_ARGS(&rpy->common.sid));
3801 return;
3802 }
3803 }
3804
3805 if (rpy->last_index != raft->log_start - 1 ||
3806 rpy->last_term != raft->snap.term) {
3807 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3808 VLOG_INFO_RL(&rl, "cluster "CID_FMT": server %s installed "
3809 "out-of-date snapshot, starting over",
3810 CID_ARGS(&raft->cid), s->nickname);
3811 raft_send_install_snapshot_request(raft, s,
3812 "installed obsolete snapshot");
3813 return;
3814 }
3815
3816 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
3817    VLOG_INFO_RL(&rl, "cluster "CID_FMT": installed snapshot on server %s "
3818                 "up to %"PRIu64":%"PRIu64, CID_ARGS(&raft->cid),
3819 s->nickname, rpy->last_term, rpy->last_index);
3820 s->next_index = raft->log_end;
3821 raft_send_append_request(raft, s, 0, "snapshot installed");
3822}
3823
3824/* Returns true if 'raft' has grown enough since the last snapshot that
3825 * reducing the log to a snapshot would be valuable, false otherwise. */
3826bool
3827raft_grew_lots(const struct raft *raft)
3828{
3829 return ovsdb_log_grew_lots(raft->log);
3830}
3831
3832/* Returns the number of log entries that could be trimmed off the on-disk log
3833 * by snapshotting. */
3834uint64_t
3835raft_get_log_length(const struct raft *raft)
3836{
3837 return (raft->last_applied < raft->log_start
3838 ? 0
3839 : raft->last_applied - raft->log_start + 1);
3840}
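/* For example, with illustrative values log_start == 90 and last_applied ==
 * 100, raft_get_log_length() returns 11: entries 90 through 100 inclusive
 * could be folded into a snapshot. */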
3841
3842/* Returns true if taking a snapshot of 'raft', with raft_store_snapshot(), is
3843 * possible. */
3844bool
3845raft_may_snapshot(const struct raft *raft)
3846{
3847 return (!raft->joining
3848 && !raft->leaving
3849 && !raft->left
3850 && !raft->failed
3851 && raft->last_applied >= raft->log_start);
3852}
3853
3854/* Replaces the log for 'raft', up to the last log entry read, by
3855 * 'new_snapshot_data'. Returns NULL if successful, otherwise an error that
3856 * the caller must eventually free.
3857 *
3858 * This function can only succeed if raft_may_snapshot() returns true. It is
3859 * only valuable to call it if raft_get_log_length() is significant and
3860 * especially if raft_grew_lots() returns true. */
3861struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3862raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
3863{
3864 if (raft->joining) {
3865 return ovsdb_error(NULL,
3866 "cannot store a snapshot while joining cluster");
3867 } else if (raft->leaving) {
3868 return ovsdb_error(NULL,
3869 "cannot store a snapshot while leaving cluster");
3870 } else if (raft->left) {
3871 return ovsdb_error(NULL,
3872 "cannot store a snapshot after leaving cluster");
3873 } else if (raft->failed) {
3874 return ovsdb_error(NULL,
3875 "cannot store a snapshot following failure");
3876 }
3877
3878 if (raft->last_applied < raft->log_start) {
3879 return ovsdb_error(NULL, "not storing a duplicate snapshot");
3880 }
3881
3882 uint64_t new_log_start = raft->last_applied + 1;
3883    struct raft_entry new_snapshot = {
3884        .term = raft_get_term(raft, new_log_start - 1),
3885        .data = json_clone(new_snapshot_data),
3886        .eid = *raft_get_eid(raft, new_log_start - 1),
3887        .servers = json_clone(raft_servers_for_index(raft, new_log_start - 1)),
3888 };
3889 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
3890 &new_snapshot);
3891 if (error) {
3892        raft_entry_uninit(&new_snapshot);
3893 return error;
3894 }
3895
3896 raft->log_synced = raft->log_end - 1;
3897 raft_entry_uninit(&raft->snap);
3898    raft->snap = new_snapshot;
3899 for (size_t i = 0; i < new_log_start - raft->log_start; i++) {
3900 raft_entry_uninit(&raft->entries[i]);
3901 }
3902 memmove(&raft->entries[0], &raft->entries[new_log_start - raft->log_start],
3903 (raft->log_end - new_log_start) * sizeof *raft->entries);
3904 raft->log_start = new_log_start;
3905 return NULL;
3906}
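/* A minimal sketch of how a caller might drive compaction with the three
 * functions above.  'snapshot_json' stands in for the caller's serialized
 * database contents and the threshold is arbitrary; both are hypothetical,
 * not part of this file:
 *
 *     if (raft_may_snapshot(raft)
 *         && (raft_grew_lots(raft) || raft_get_log_length(raft) > 100)) {
 *         struct ovsdb_error *error = raft_store_snapshot(raft,
 *                                                         snapshot_json);
 *         if (error) {
 *             char *s = ovsdb_error_to_string_free(error);
 *             VLOG_WARN("snapshot failed (%s)", s);
 *             free(s);
 *         }
 *     }
 */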
3907
3908static void
3909raft_handle_become_leader(struct raft *raft,
3910 const struct raft_become_leader *rq)
3911{
3912 if (raft->role == RAFT_FOLLOWER) {
3913 char buf[SID_LEN + 1];
3914 VLOG_INFO("received leadership transfer from %s in term %"PRIu64,
3915 raft_get_nickname(raft, &rq->common.sid, buf, sizeof buf),
3916 rq->term);
3917 raft_start_election(raft, true);
3918 }
3919}
3920
3921static void
3922raft_send_execute_command_reply(struct raft *raft,
3923 const struct uuid *sid,
3924 const struct uuid *eid,
3925 enum raft_command_status status,
3926 uint64_t commit_index)
3927{
3928 union raft_rpc rpc = {
3929 .execute_command_reply = {
3930 .common = {
3931 .type = RAFT_RPC_EXECUTE_COMMAND_REPLY,
3932 .sid = *sid,
3933 },
3934 .result = *eid,
3935 .status = status,
3936 .commit_index = commit_index,
3937 },
3938 };
3939 raft_send(raft, &rpc);
3940}
3941
3942static enum raft_command_status
3943raft_handle_execute_command_request__(
3944 struct raft *raft, const struct raft_execute_command_request *rq)
3945{
3946 if (raft->role != RAFT_LEADER) {
3947 return RAFT_CMD_NOT_LEADER;
3948 }
3949
3950 const struct uuid *current_eid = raft_current_eid(raft);
3951 if (!uuid_equals(&rq->prereq, current_eid)) {
3952 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3953 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
3954 "prerequisite "UUID_FMT" in execute_command_request",
3955 UUID_ARGS(current_eid), UUID_ARGS(&rq->prereq));
3956 return RAFT_CMD_BAD_PREREQ;
3957 }
3958
3959 struct raft_command *cmd = raft_command_initiate(raft, rq->data,
3960 NULL, &rq->result);
3961 cmd->sid = rq->common.sid;
3962
3963 enum raft_command_status status = cmd->status;
3964 if (status != RAFT_CMD_INCOMPLETE) {
3965 raft_command_unref(cmd);
3966 }
3967 return status;
3968}
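/* The prerequisite check above serializes writes cluster-wide: a relayed
 * command applies only against the exact entry the sender saw as current.
 * If the leader's log has advanced in the meantime, the sender gets
 * RAFT_CMD_BAD_PREREQ back and can retry against the new entry ID. */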
3969
3970static void
3971raft_handle_execute_command_request(
3972 struct raft *raft, const struct raft_execute_command_request *rq)
3973{
3974 enum raft_command_status status
3975 = raft_handle_execute_command_request__(raft, rq);
3976 if (status != RAFT_CMD_INCOMPLETE) {
3977 raft_send_execute_command_reply(raft, &rq->common.sid, &rq->result,
3978 status, 0);
3979 }
3980}
3981
3982static void
3983raft_handle_execute_command_reply(
3984 struct raft *raft, const struct raft_execute_command_reply *rpy)
3985{
3986 struct raft_command *cmd = raft_find_command_by_eid(raft, &rpy->result);
3987 if (!cmd) {
3988 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3989 char buf[SID_LEN + 1];
3990 VLOG_INFO_RL(&rl,
3991 "%s received \"%s\" reply from %s for unknown command",
3992 raft->local_nickname,
3993 raft_command_status_to_string(rpy->status),
3994 raft_get_nickname(raft, &rpy->common.sid,
3995 buf, sizeof buf));
3996 return;
3997 }
3998
3999 if (rpy->status == RAFT_CMD_INCOMPLETE) {
4000 cmd->timestamp = time_msec();
4001 } else {
4002 cmd->index = rpy->commit_index;
4003 raft_command_complete(raft, cmd, rpy->status);
4004 }
4005}
4006
4007static void
4008raft_handle_rpc(struct raft *raft, const union raft_rpc *rpc)
4009{
4010 uint64_t term = raft_rpc_get_term(rpc);
4011 if (term
4012 && !raft_should_suppress_disruptive_server(raft, rpc)
4013 && !raft_receive_term__(raft, &rpc->common, term)) {
4014 if (rpc->type == RAFT_RPC_APPEND_REQUEST) {
4015 /* Section 3.3: "If a server receives a request with a stale term
4016 * number, it rejects the request." */
4017 raft_send_append_reply(raft, raft_append_request_cast(rpc),
4018 RAFT_APPEND_INCONSISTENCY, "stale term");
4019 }
4020 return;
4021 }
4022
4023 switch (rpc->type) {
4024#define RAFT_RPC(ENUM, NAME) \
4025 case ENUM: \
4026 raft_handle_##NAME(raft, &rpc->NAME); \
4027 break;
4028 RAFT_RPC_TYPES
4029#undef RAFT_RPC
4030 default:
4031 OVS_NOT_REACHED();
4032 }
4033}
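/* For illustration, each RAFT_RPC(ENUM, NAME) entry in RAFT_RPC_TYPES
 * (defined in raft-rpc.h) expands to one case in the switch above, so for
 * example the add_server_request entry becomes:
 *
 *     case RAFT_RPC_ADD_SERVER_REQUEST:
 *         raft_handle_add_server_request(raft, &rpc->add_server_request);
 *         break;
 */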
4034\f
4035static bool
4036raft_rpc_is_heartbeat(const union raft_rpc *rpc)
4037{
4038 return ((rpc->type == RAFT_RPC_APPEND_REQUEST
4039 || rpc->type == RAFT_RPC_APPEND_REPLY)
4040 && rpc->common.comment
4041 && !strcmp(rpc->common.comment, "heartbeat"));
4042}
4043
4044\f
4045static bool
4046raft_send_to_conn_at(struct raft *raft, const union raft_rpc *rpc,
4047                     struct raft_conn *conn, int line_number)
4048{
4049    log_rpc(rpc, "-->", conn, line_number);
4050 return !jsonrpc_session_send(
4051 conn->js, raft_rpc_to_jsonrpc(&raft->cid, &raft->sid, rpc));
4052}
4053
4054static bool
4055raft_is_rpc_synced(const struct raft *raft, const union raft_rpc *rpc)
4056{
4057 uint64_t term = raft_rpc_get_term(rpc);
4058 uint64_t index = raft_rpc_get_min_sync_index(rpc);
4059 const struct uuid *vote = raft_rpc_get_vote(rpc);
4060
4061 return (term <= raft->synced_term
4062 && index <= raft->log_synced
4063 && (!vote || uuid_equals(vote, &raft->synced_vote)));
4064}
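/* An RPC is "synced" when everything it implies about persistent state (the
 * term, any vote, and the log prefix it references) has already reached
 * disk.  raft_send_at() below queues unsynced RPCs on a RAFT_W_RPC waiter
 * rather than sending state the server could still forget in a crash. */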
4065
4066static bool
4067raft_send_at(struct raft *raft, const union raft_rpc *rpc, int line_number)
4068{
4069 const struct uuid *dst = &rpc->common.sid;
4070 if (uuid_equals(dst, &raft->sid)) {
4071 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4072 VLOG_WARN_RL(&rl, "attempted to send RPC to self from raft.c:%d",
4073 line_number);
4074 return false;
4075 }
4076
4077 struct raft_conn *conn = raft_find_conn_by_sid(raft, dst);
4078 if (!conn) {
4079 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4080 char buf[SID_LEN + 1];
4081 VLOG_DBG_RL(&rl, "%s: no connection to %s, cannot send RPC "
4082 "from raft.c:%d", raft->local_nickname,
4083 raft_get_nickname(raft, dst, buf, sizeof buf),
4084 line_number);
4085 return false;
4086 }
4087
4088 if (!raft_is_rpc_synced(raft, rpc)) {
4089 raft_waiter_create(raft, RAFT_W_RPC, false)->rpc = raft_rpc_clone(rpc);
4090 return true;
4091 }
4092
4093    return raft_send_to_conn_at(raft, rpc, conn, line_number);
4094}
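/* Callers reach raft_send_at() and raft_send_to_conn_at() through wrapper
 * macros that supply __LINE__, so the rate-limited messages above can point
 * at the originating call site, e.g. "... cannot send RPC from raft.c:1234"
 * (line number illustrative). */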
4095\f
4096static struct raft *
4097raft_lookup_by_name(const char *name)
4098{
4099 struct raft *raft;
4100
4101 HMAP_FOR_EACH_WITH_HASH (raft, hmap_node, hash_string(name, 0),
4102 &all_rafts) {
4103 if (!strcmp(raft->name, name)) {
4104 return raft;
4105 }
4106 }
4107 return NULL;
4108}
4109
4110static void
4111raft_unixctl_cid(struct unixctl_conn *conn,
4112 int argc OVS_UNUSED, const char *argv[],
4113 void *aux OVS_UNUSED)
4114{
4115 struct raft *raft = raft_lookup_by_name(argv[1]);
4116 if (!raft) {
4117 unixctl_command_reply_error(conn, "unknown cluster");
4118 } else if (uuid_is_zero(&raft->cid)) {
4119 unixctl_command_reply_error(conn, "cluster id not yet known");
4120 } else {
4121 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->cid));
4122 unixctl_command_reply(conn, uuid);
4123 free(uuid);
4124 }
4125}
4126
4127static void
4128raft_unixctl_sid(struct unixctl_conn *conn,
4129 int argc OVS_UNUSED, const char *argv[],
4130 void *aux OVS_UNUSED)
4131{
4132 struct raft *raft = raft_lookup_by_name(argv[1]);
4133 if (!raft) {
4134 unixctl_command_reply_error(conn, "unknown cluster");
4135 } else {
4136 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->sid));
4137 unixctl_command_reply(conn, uuid);
4138 free(uuid);
4139 }
4140}
4141
4142static void
4143raft_put_sid(const char *title, const struct uuid *sid,
4144 const struct raft *raft, struct ds *s)
4145{
4146 ds_put_format(s, "%s: ", title);
4147 if (uuid_equals(sid, &raft->sid)) {
4148 ds_put_cstr(s, "self");
4149 } else if (uuid_is_zero(sid)) {
4150 ds_put_cstr(s, "unknown");
4151 } else {
4152 char buf[SID_LEN + 1];
4153 ds_put_cstr(s, raft_get_nickname(raft, sid, buf, sizeof buf));
4154 }
4155 ds_put_char(s, '\n');
4156}
4157
4158static void
4159raft_unixctl_status(struct unixctl_conn *conn,
4160 int argc OVS_UNUSED, const char *argv[],
4161 void *aux OVS_UNUSED)
4162{
4163 struct raft *raft = raft_lookup_by_name(argv[1]);
4164 if (!raft) {
4165 unixctl_command_reply_error(conn, "unknown cluster");
4166 return;
4167 }
4168
4169 struct ds s = DS_EMPTY_INITIALIZER;
4170 ds_put_format(&s, "%s\n", raft->local_nickname);
4171 ds_put_format(&s, "Name: %s\n", raft->name);
4172 ds_put_format(&s, "Cluster ID: ");
4173 if (!uuid_is_zero(&raft->cid)) {
4174 ds_put_format(&s, CID_FMT" ("UUID_FMT")\n",
4175 CID_ARGS(&raft->cid), UUID_ARGS(&raft->cid));
4176 } else {
4177 ds_put_format(&s, "not yet known\n");
4178 }
4179 ds_put_format(&s, "Server ID: "SID_FMT" ("UUID_FMT")\n",
4180 SID_ARGS(&raft->sid), UUID_ARGS(&raft->sid));
4181 ds_put_format(&s, "Address: %s\n", raft->local_address);
4182 ds_put_format(&s, "Status: %s\n",
4183 raft->joining ? "joining cluster"
4184 : raft->leaving ? "leaving cluster"
4185 : raft->left ? "left cluster"
4186 : raft->failed ? "failed"
4187 : "cluster member");
4188 if (raft->joining) {
4189 ds_put_format(&s, "Remotes for joining:");
4190 const char *address;
4191 SSET_FOR_EACH (address, &raft->remote_addresses) {
4192 ds_put_format(&s, " %s", address);
4193 }
4194 ds_put_char(&s, '\n');
4195 }
4196 if (raft->role == RAFT_LEADER) {
4197 struct raft_server *as;
4198 HMAP_FOR_EACH (as, hmap_node, &raft->add_servers) {
4199 ds_put_format(&s, "Adding server %s ("SID_FMT" at %s) (%s)\n",
4200 as->nickname, SID_ARGS(&as->sid), as->address,
4201 raft_server_phase_to_string(as->phase));
4202 }
4203
4204 struct raft_server *rs = raft->remove_server;
4205 if (rs) {
4206 ds_put_format(&s, "Removing server %s ("SID_FMT" at %s) (%s)\n",
4207 rs->nickname, SID_ARGS(&rs->sid), rs->address,
4208 raft_server_phase_to_string(rs->phase));
4209 }
4210 }
4211
4212 ds_put_format(&s, "Role: %s\n",
4213 raft->role == RAFT_LEADER ? "leader"
4214 : raft->role == RAFT_CANDIDATE ? "candidate"
4215 : raft->role == RAFT_FOLLOWER ? "follower"
4216 : "<error>");
4217 ds_put_format(&s, "Term: %"PRIu64"\n", raft->term);
4218 raft_put_sid("Leader", &raft->leader_sid, raft, &s);
4219 raft_put_sid("Vote", &raft->vote, raft, &s);
4220 ds_put_char(&s, '\n');
4221
4222 ds_put_format(&s, "Log: [%"PRIu64", %"PRIu64"]\n",
4223 raft->log_start, raft->log_end);
4224
4225 uint64_t n_uncommitted = raft->log_end - raft->commit_index - 1;
4226 ds_put_format(&s, "Entries not yet committed: %"PRIu64"\n", n_uncommitted);
4227
4228 uint64_t n_unapplied = raft->log_end - raft->last_applied - 1;
4229 ds_put_format(&s, "Entries not yet applied: %"PRIu64"\n", n_unapplied);
4230
4231 const struct raft_conn *c;
4232 ds_put_cstr(&s, "Connections:");
4233 LIST_FOR_EACH (c, list_node, &raft->conns) {
4234 bool connected = jsonrpc_session_is_connected(c->js);
4235 ds_put_format(&s, " %s%s%s%s",
4236 connected ? "" : "(",
4237 c->incoming ? "<-" : "->", c->nickname,
4238 connected ? "" : ")");
4239 }
4240 ds_put_char(&s, '\n');
4241
4242 ds_put_cstr(&s, "Servers:\n");
4243 struct raft_server *server;
4244 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
4245 ds_put_format(&s, " %s ("SID_FMT" at %s)",
4246 server->nickname,
4247 SID_ARGS(&server->sid), server->address);
4248 if (uuid_equals(&server->sid, &raft->sid)) {
4249 ds_put_cstr(&s, " (self)");
4250 }
4251 if (server->phase != RAFT_PHASE_STABLE) {
4252            ds_put_format(&s, " (%s)",
4253 raft_server_phase_to_string(server->phase));
4254 }
4255 if (raft->role == RAFT_CANDIDATE) {
4256 if (!uuid_is_zero(&server->vote)) {
4257 char buf[SID_LEN + 1];
4258 ds_put_format(&s, " (voted for %s)",
4259 raft_get_nickname(raft, &server->vote,
4260 buf, sizeof buf));
4261 }
4262 } else if (raft->role == RAFT_LEADER) {
4263 ds_put_format(&s, " next_index=%"PRIu64" match_index=%"PRIu64,
4264 server->next_index, server->match_index);
4265 }
4266 ds_put_char(&s, '\n');
4267 }
4268
4269 unixctl_command_reply(conn, ds_cstr(&s));
4270 ds_destroy(&s);
4271}
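/* Abridged sample of the reply this builds, with illustrative values:
 *
 *     1b1d
 *     Name: my_db
 *     Cluster ID: f6a3 (f6a36e77-...)
 *     Server ID: 1b1d (1b1d2e6d-...)
 *     Address: tcp:10.0.0.1:6644
 *     Status: cluster member
 *     Role: leader
 *     Term: 3
 *     ...
 */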
4272
4273static void
4274raft_unixctl_leave__(struct unixctl_conn *conn, struct raft *raft)
4275{
4276 if (raft_is_leaving(raft)) {
4277 unixctl_command_reply_error(conn,
4278                                    "already leaving cluster");
4279 } else if (raft_is_joining(raft)) {
4280 unixctl_command_reply_error(conn,
4281 "can't leave while join in progress");
4282 } else if (raft_failed(raft)) {
4283 unixctl_command_reply_error(conn,
4284 "can't leave after failure");
4285 } else {
4286 raft_leave(raft);
4287 unixctl_command_reply(conn, NULL);
4288 }
4289}
4290
4291static void
4292raft_unixctl_leave(struct unixctl_conn *conn, int argc OVS_UNUSED,
4293 const char *argv[], void *aux OVS_UNUSED)
4294{
4295 struct raft *raft = raft_lookup_by_name(argv[1]);
4296 if (!raft) {
4297 unixctl_command_reply_error(conn, "unknown cluster");
4298 return;
4299 }
4300
4301 raft_unixctl_leave__(conn, raft);
4302}
4303
4304static struct raft_server *
4305raft_lookup_server_best_match(struct raft *raft, const char *id)
4306{
4307 struct raft_server *best = NULL;
4308 int best_score = -1;
4309 int n_best = 0;
4310
4311 struct raft_server *s;
4312 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
4313 int score = (!strcmp(id, s->address)
4314 ? INT_MAX
4315 : uuid_is_partial_match(&s->sid, id));
4316 if (score > best_score) {
4317 best = s;
4318 best_score = score;
4319 n_best = 1;
4320 } else if (score == best_score) {
4321 n_best++;
4322 }
4323 }
4324 return n_best == 1 ? best : NULL;
4325}
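/* For example (values illustrative), "tcp:10.0.0.2:6644" selects a server
 * by exact address, while "f1a2" selects the server whose SID starts with
 * that prefix; a prefix that matches two servers equally well returns NULL
 * rather than guessing. */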
4326
4327static void
4328raft_unixctl_kick(struct unixctl_conn *conn, int argc OVS_UNUSED,
4329 const char *argv[], void *aux OVS_UNUSED)
4330{
4331 const char *cluster_name = argv[1];
4332 const char *server_name = argv[2];
4333
4334 struct raft *raft = raft_lookup_by_name(cluster_name);
4335 if (!raft) {
4336 unixctl_command_reply_error(conn, "unknown cluster");
4337 return;
4338 }
4339
4340 struct raft_server *server = raft_lookup_server_best_match(raft,
4341 server_name);
4342 if (!server) {
4343 unixctl_command_reply_error(conn, "unknown server");
4344 return;
4345 }
4346
4347 if (uuid_equals(&server->sid, &raft->sid)) {
4348 raft_unixctl_leave__(conn, raft);
4349 } else if (raft->role == RAFT_LEADER) {
4350 const struct raft_remove_server_request rq = {
4351 .sid = server->sid,
4352 .requester_conn = conn,
4353 };
4354 raft_handle_remove_server_request(raft, &rq);
4355 } else {
4356 const union raft_rpc rpc = {
4357 .remove_server_request = {
4358 .common = {
4359 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
4360 .sid = raft->leader_sid,
4361 .comment = "via unixctl"
4362 },
4363 .sid = server->sid,
4364 }
4365 };
4366 if (raft_send(raft, &rpc)) {
4367 unixctl_command_reply(conn, "sent removal request to leader");
4368 } else {
4369 unixctl_command_reply_error(conn,
4370 "failed to send removal request");
4371 }
4372 }
4373}
4374
4375static void
4376raft_init(void)
4377{
4378 static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
4379 if (!ovsthread_once_start(&once)) {
4380 return;
4381 }
4382 unixctl_command_register("cluster/cid", "DB", 1, 1,
4383 raft_unixctl_cid, NULL);
4384 unixctl_command_register("cluster/sid", "DB", 1, 1,
4385 raft_unixctl_sid, NULL);
4386 unixctl_command_register("cluster/status", "DB", 1, 1,
4387 raft_unixctl_status, NULL);
4388 unixctl_command_register("cluster/leave", "DB", 1, 1,
4389 raft_unixctl_leave, NULL);
4390 unixctl_command_register("cluster/kick", "DB SERVER", 2, 2,
4391 raft_unixctl_kick, NULL);
4392 ovsthread_once_done(&once);
4393}
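/* Once registered, these commands are reachable through ovs-appctl against
 * the daemon serving the database, e.g. (socket path and database name are
 * illustrative):
 *
 *     ovs-appctl -t /var/run/openvswitch/ovsdb-server.1.ctl \
 *         cluster/status my_db
 */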