/*
 * Copyright (c) 2017, 2018 Nicira, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <config.h>

#include "raft.h"
#include "raft-private.h"

#include <errno.h>
#include <unistd.h>

#include "hash.h"
#include "jsonrpc.h"
#include "lockfile.h"
#include "openvswitch/dynamic-string.h"
#include "openvswitch/hmap.h"
#include "openvswitch/json.h"
#include "openvswitch/list.h"
#include "openvswitch/poll-loop.h"
#include "openvswitch/vlog.h"
#include "ovsdb-error.h"
#include "ovsdb-parser.h"
#include "ovsdb/log.h"
#include "raft-rpc.h"
#include "random.h"
#include "socket-util.h"
#include "stream.h"
#include "timeval.h"
#include "unicode.h"
#include "unixctl.h"
#include "util.h"
#include "uuid.h"

VLOG_DEFINE_THIS_MODULE(raft);

/* Roles for a Raft server:
 *
 *    - Followers: Servers in touch with the current leader.
 *
 *    - Candidate: Servers unaware of a current leader and seeking election to
 *      leader.
 *
 *    - Leader: Handles all client requests. At most one at a time.
 *
 * In normal operation there is exactly one leader and all of the other servers
 * are followers. */
enum raft_role {
    RAFT_FOLLOWER,
    RAFT_CANDIDATE,
    RAFT_LEADER
};

/* A connection between this Raft server and another one. */
struct raft_conn {
    struct ovs_list list_node;  /* In struct raft's 'conns' list. */
    struct jsonrpc_session *js; /* JSON-RPC connection. */
    struct uuid sid;            /* This server's unique ID. */
    char *nickname;             /* Short name for use in log messages. */
    bool incoming;              /* True if incoming, false if outgoing. */
    unsigned int js_seqno;      /* Seqno for noticing (re)connections. */
};

static void raft_conn_close(struct raft_conn *);

/* A "command", that is, a request to append an entry to the log.
 *
 * The Raft specification only allows clients to issue commands to the leader.
 * With this implementation, clients may issue a command on any server, which
 * then relays the command to the leader if necessary.
 *
 * This structure is thus used in three cases:
 *
 *     1. We are the leader and the command was issued to us directly.
 *
 *     2. We are a follower and relayed the command to the leader.
 *
 *     3. We are the leader and a follower relayed the command to us.
 */
struct raft_command {
    /* All cases. */
    struct hmap_node hmap_node; /* In struct raft's 'commands' hmap. */
    unsigned int n_refs;        /* Reference count. */
    enum raft_command_status status; /* Execution status. */

    /* Case 1 only. */
    uint64_t index;             /* Index in log (0 if being relayed). */

    /* Cases 2 and 3. */
    struct uuid eid;            /* Entry ID of result. */

    /* Case 2 only. */
    long long int timestamp;    /* Issue or last ping time, for expiration. */

    /* Case 3 only. */
    struct uuid sid;            /* The follower (otherwise UUID_ZERO). */
};

static void raft_command_complete(struct raft *, struct raft_command *,
                                  enum raft_command_status);

static void raft_complete_all_commands(struct raft *,
                                       enum raft_command_status);

/* Type of deferred action, see struct raft_waiter. */
enum raft_waiter_type {
    RAFT_W_ENTRY,
    RAFT_W_TERM,
    RAFT_W_RPC,
};

/* An action deferred until a log write commits to disk. */
struct raft_waiter {
    struct ovs_list list_node;
    uint64_t commit_ticket;

    enum raft_waiter_type type;
    union {
        /* RAFT_W_ENTRY.
         *
         * Waits for a RAFT_REC_ENTRY write to our local log to commit. Upon
         * completion, updates 'log_synced' to indicate that the new log entry
         * or entries are committed and, if we are leader, also updates our
         * local 'match_index'. */
        struct {
            uint64_t index;
        } entry;

        /* RAFT_W_TERM.
         *
         * Waits for a RAFT_REC_TERM or RAFT_REC_VOTE record write to commit.
         * Upon completion, updates 'synced_term' and 'synced_vote', which
         * triggers sending RPCs deferred by the uncommitted 'term' and
         * 'vote'. */
        struct {
            uint64_t term;
            struct uuid vote;
        } term;

        /* RAFT_W_RPC.
         *
         * Sometimes, sending an RPC to a peer must be delayed until an entry,
         * a term, or a vote mentioned in the RPC is synced to disk. This
         * waiter keeps a copy of such an RPC until the previous waiters have
         * committed. */
        union raft_rpc *rpc;
    };
};
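
/* A sketch of the waiter lifecycle, for illustration only (it mirrors
 * raft_waiters_run() further below; it is not code from this file). A caller
 * that appends a log entry creates a waiter whose 'commit_ticket' comes from
 * ovsdb_log_commit_start(); the main loop later completes the waiter once the
 * log's commit progress catches up with that ticket:
 *
 *     struct raft_waiter *w = raft_waiter_create(raft, RAFT_W_ENTRY, true);
 *     w->entry.index = index;
 *     ...
 *     uint64_t cur = ovsdb_log_commit_progress(raft->log);
 *     if (cur >= w->commit_ticket) {
 *         raft_waiter_complete(raft, w);   // e.g. updates 'log_synced'
 *         raft_waiter_destroy(w);
 *     }
 */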

static struct raft_waiter *raft_waiter_create(struct raft *,
                                              enum raft_waiter_type,
                                              bool start_commit);
static void raft_waiters_destroy(struct raft *);

/* The Raft state machine. */
struct raft {
    struct hmap_node hmap_node; /* In 'all_rafts'. */
    struct ovsdb_log *log;

/* Persistent derived state.
 *
 * This must be updated on stable storage before responding to RPCs. It can be
 * derived from the header, snapshot, and log in 'log'. */

    struct uuid cid;            /* Cluster ID (immutable for the cluster). */
    struct uuid sid;            /* Server ID (immutable for the server). */
    char *local_address;        /* Local address (immutable for the server). */
    char *local_nickname;       /* Used for local server in log messages. */
    char *name;                 /* Schema name (immutable for the cluster). */

    /* Contains "struct raft_server"s and represents the server configuration
     * most recently added to 'log'. */
    struct hmap servers;

/* Persistent state on all servers.
 *
 * Must be updated on stable storage before responding to RPCs. */

    /* Current term and the vote for that term. These might be on the way to
     * disk now. */
    uint64_t term;              /* Initialized to 0 and only increases. */
    struct uuid vote;           /* All-zeros if no vote yet in 'term'. */

    /* The term and vote that have been synced to disk. */
    uint64_t synced_term;
    struct uuid synced_vote;

    /* The log.
     *
     * A log entry with index 1 never really exists; the initial snapshot for a
     * Raft is considered to include this index. The first real log entry has
     * index 2.
     *
     * A new Raft instance contains an empty log:  log_start=2, log_end=2.
     * Over time, the log grows:                   log_start=2, log_end=N.
     * At some point, the server takes a snapshot: log_start=N, log_end=N.
     * The log continues to grow:                  log_start=N, log_end=N+1...
     *
     * Must be updated on stable storage before responding to RPCs. */
    struct raft_entry *entries; /* Log entry i is in log[i - log_start]. */
    uint64_t log_start;         /* Index of first entry in log. */
    uint64_t log_end;           /* Index of last entry in log, plus 1. */
    uint64_t log_synced;        /* Index of last synced entry. */
    size_t allocated_log;       /* Allocated entries in 'log'. */

    /* Snapshot state (see Figure 5.1)
     *
     * This is the state of the cluster as of the last discarded log entry,
     * that is, at log index 'log_start - 1' (called prevIndex in Figure 5.1).
     * Only committed log entries can be included in a snapshot. */
    struct raft_entry snap;

/* Volatile state.
 *
 * The snapshot is always committed, but the rest of the log might not be yet.
 * 'last_applied' tracks what entries have been passed to the client. If the
 * client hasn't yet read the latest snapshot, then even the snapshot isn't
 * applied yet. Thus, the invariants are different for these members:
 *
 *     log_start - 2 <= last_applied <= commit_index < log_end.
 *     log_start - 1 <= commit_index < log_end.
 */
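
/* An illustrative walk-through of these invariants (example values, not code
 * from this file): after a snapshot at index 9, log_start=10. With
 * log_end=13, the log holds entries 10, 11, and 12 in entries[0..2]. If
 * entries through 11 are known committed and the client has consumed the
 * snapshot plus entry 10, then commit_index=11 and last_applied=10, so
 * log_start - 2 = 8 <= 10 <= 11 < 13 holds. */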

    enum raft_role role;        /* Current role. */
    uint64_t commit_index;      /* Max log index known to be committed. */
    uint64_t last_applied;      /* Max log index applied to state machine. */
    struct uuid leader_sid;     /* Server ID of leader (zero, if unknown). */

    /* Followers and candidates only. */
#define ELECTION_BASE_MSEC 1024
#define ELECTION_RANGE_MSEC 1024
    long long int election_base;    /* Time of last heartbeat from leader. */
    long long int election_timeout; /* Time at which we start an election. */

    /* Used for joining a cluster. */
    bool joining;                 /* Attempting to join the cluster? */
    struct sset remote_addresses; /* Addresses to try to find other servers. */
    long long int join_timeout;   /* Time to re-send add server request. */

    /* Used for leaving a cluster. */
    bool leaving;               /* True if we are leaving the cluster. */
    bool left;                  /* True if we have finished leaving. */
    long long int leave_timeout; /* Time to re-send remove server request. */

    /* Failure. */
    bool failed;                /* True if unrecoverable error has occurred. */

    /* File synchronization. */
    struct ovs_list waiters;    /* Contains "struct raft_waiter"s. */

    /* Network connections. */
    struct pstream *listener;   /* For connections from other Raft servers. */
    long long int listen_backoff; /* For retrying creating 'listener'. */
    struct ovs_list conns;      /* Contains struct raft_conns. */

    /* Leaders only. Reinitialized after becoming leader. */
    struct hmap add_servers;    /* Contains "struct raft_server"s to add. */
    struct raft_server *remove_server; /* Server being removed. */
    struct hmap commands;       /* Contains "struct raft_command"s. */
#define PING_TIME_MSEC (ELECTION_BASE_MSEC / 3)
    long long int ping_timeout; /* Time at which to send a heartbeat */

    /* Candidates only. Reinitialized at start of election. */
    int n_votes;                /* Number of votes for me. */
};

/* All Raft structures. */
static struct hmap all_rafts = HMAP_INITIALIZER(&all_rafts);

static void raft_init(void);

static struct ovsdb_error *raft_read_header(struct raft *)
    OVS_WARN_UNUSED_RESULT;

static void raft_send_execute_command_reply(struct raft *,
                                            const struct uuid *sid,
                                            const struct uuid *eid,
                                            enum raft_command_status,
                                            uint64_t commit_index);

static void raft_update_our_match_index(struct raft *, uint64_t min_index);

static void raft_send_remove_server_reply__(
    struct raft *, const struct uuid *target_sid,
    const struct uuid *requester_sid, struct unixctl_conn *requester_conn,
    bool success, const char *comment);
static void raft_finished_leaving_cluster(struct raft *);

static void raft_server_init_leader(struct raft *, struct raft_server *);

static bool raft_rpc_is_heartbeat(const union raft_rpc *);
static bool raft_is_rpc_synced(const struct raft *, const union raft_rpc *);

static void raft_handle_rpc(struct raft *, const union raft_rpc *);

static bool raft_send_at(struct raft *, const union raft_rpc *,
                         int line_number);
#define raft_send(raft, rpc) raft_send_at(raft, rpc, __LINE__)

static bool raft_send_to_conn_at(struct raft *, const union raft_rpc *,
                                 struct raft_conn *, int line_number);
#define raft_send_to_conn(raft, rpc, conn) \
    raft_send_to_conn_at(raft, rpc, conn, __LINE__)

static void raft_send_append_request(struct raft *,
                                     struct raft_server *, unsigned int n,
                                     const char *comment);

static void raft_become_leader(struct raft *);
static void raft_become_follower(struct raft *);
static void raft_reset_election_timer(struct raft *);
static void raft_reset_ping_timer(struct raft *);
static void raft_send_heartbeats(struct raft *);
static void raft_start_election(struct raft *, bool leadership_transfer);
static bool raft_truncate(struct raft *, uint64_t new_end);
static void raft_get_servers_from_log(struct raft *, enum vlog_level);

static bool raft_handle_write_error(struct raft *, struct ovsdb_error *);

static void raft_run_reconfigure(struct raft *);

static struct raft_server *
raft_find_server(const struct raft *raft, const struct uuid *sid)
{
    return raft_server_find(&raft->servers, sid);
}

static char *
raft_make_address_passive(const char *address_)
{
    if (!strncmp(address_, "unix:", 5)) {
        return xasprintf("p%s", address_);
    } else {
        char *address = xstrdup(address_);
        char *host, *port;
        inet_parse_host_port_tokens(strchr(address, ':') + 1, &host, &port);

        struct ds paddr = DS_EMPTY_INITIALIZER;
        ds_put_format(&paddr, "p%.3s:%s:", address, port);
        if (strchr(host, ':')) {
            ds_put_format(&paddr, "[%s]", host);
        } else {
            ds_put_cstr(&paddr, host);
        }
        free(address);
        return ds_steal_cstr(&paddr);
    }
}
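
/* For example (illustrative addresses): raft_make_address_passive() turns the
 * active address "tcp:10.1.2.3:6644" into the passive "ptcp:6644:10.1.2.3",
 * turns "ssl:[::1]:6644" into "pssl:6644:[::1]", and turns
 * "unix:/tmp/db.sock" into "punix:/tmp/db.sock". */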

static struct raft *
raft_alloc(void)
{
    raft_init();

    struct raft *raft = xzalloc(sizeof *raft);
    hmap_node_nullify(&raft->hmap_node);
    hmap_init(&raft->servers);
    raft->log_start = raft->log_end = 1;
    raft->role = RAFT_FOLLOWER;
    sset_init(&raft->remote_addresses);
    raft->join_timeout = LLONG_MAX;
    ovs_list_init(&raft->waiters);
    raft->listen_backoff = LLONG_MIN;
    ovs_list_init(&raft->conns);
    hmap_init(&raft->add_servers);
    hmap_init(&raft->commands);

    raft_reset_ping_timer(raft);
    raft_reset_election_timer(raft);

    return raft;
}

/* Creates an on-disk file that represents a new Raft cluster and initializes
 * it to consist of a single server, the one on which this function is called.
 *
 * Creates the local copy of the cluster's log in 'file_name', which must not
 * already exist. Gives it the name 'name', which should be the database
 * schema name and which is used only to match up this database with the server
 * added to the cluster later if the cluster ID is unavailable.
 *
 * The new server is located at 'local_address', which must take one of the
 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
 * square bracket enclosed IPv6 address and PORT is a TCP port number.
 *
 * This only creates the on-disk file. Use raft_open() to start operating the
 * new server.
 *
 * Returns null if successful, otherwise an ovsdb_error describing the
 * problem. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_create_cluster(const char *file_name, const char *name,
                    const char *local_address, const struct json *data)
{
    /* Parse and verify validity of the local address. */
    struct ovsdb_error *error = raft_address_validate(local_address);
    if (error) {
        return error;
    }

    /* Create log file. */
    struct ovsdb_log *log;
    error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
                           -1, &log);
    if (error) {
        return error;
    }

    /* Write log file. */
    struct raft_header h = {
        .sid = uuid_random(),
        .cid = uuid_random(),
        .name = xstrdup(name),
        .local_address = xstrdup(local_address),
        .joining = false,
        .remote_addresses = SSET_INITIALIZER(&h.remote_addresses),
        .snap_index = 1,
        .snap = {
            .term = 1,
            .data = json_nullable_clone(data),
            .eid = uuid_random(),
            .servers = json_object_create(),
        },
    };
    shash_add_nocopy(json_object(h.snap.servers),
                     xasprintf(UUID_FMT, UUID_ARGS(&h.sid)),
                     json_string_create(local_address));
    error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
    raft_header_uninit(&h);
    if (!error) {
        error = ovsdb_log_commit_block(log);
    }
    ovsdb_log_close(log);

    return error;
}
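
/* Example usage (an illustrative sketch; the path, schema name, and address
 * are hypothetical): bootstrapping a new single-server cluster and then
 * starting it with raft_open(), which takes ownership of the log.
 *
 *     struct ovsdb_error *error = raft_create_cluster(
 *         "/var/lib/example/db.db", "ExampleSchema",
 *         "tcp:10.1.2.3:6644", snapshot_json);
 *     if (!error) {
 *         struct ovsdb_log *log;
 *         error = ovsdb_log_open("/var/lib/example/db.db", RAFT_MAGIC,
 *                                OVSDB_LOG_READ_WRITE, -1, &log);
 *         if (!error) {
 *             struct raft *raft;
 *             error = raft_open(log, &raft);
 *         }
 *     }
 */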

/* Creates a database file that represents a new server in an existing Raft
 * cluster.
 *
 * Creates the local copy of the cluster's log in 'file_name', which must not
 * already exist. Gives it the name 'name', which must be the same name
 * passed in to raft_create_cluster() earlier.
 *
 * 'cid' is optional. If specified, the new server will join only the cluster
 * with the given cluster ID.
 *
 * The new server is located at 'local_address', which must take one of the
 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
 * square bracket enclosed IPv6 address and PORT is a TCP port number.
 *
 * Joining the cluster requires contacting it. Thus, 'remote_addresses'
 * specifies the addresses of existing servers in the cluster. One server out
 * of the existing cluster is sufficient, as long as that server is reachable
 * and not partitioned from the current cluster leader. If multiple servers
 * from the cluster are specified, then it is sufficient for any of them to
 * meet this criterion.
 *
 * This only creates the on-disk file and does no network access. Use
 * raft_open() to start operating the new server. (Until this happens, the
 * new server has not joined the cluster.)
 *
 * Returns null if successful, otherwise an ovsdb_error describing the
 * problem. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_join_cluster(const char *file_name,
                  const char *name, const char *local_address,
                  const struct sset *remote_addresses,
                  const struct uuid *cid)
{
    ovs_assert(!sset_is_empty(remote_addresses));

    /* Parse and verify validity of the addresses. */
    struct ovsdb_error *error = raft_address_validate(local_address);
    if (error) {
        return error;
    }
    const char *addr;
    SSET_FOR_EACH (addr, remote_addresses) {
        error = raft_address_validate(addr);
        if (error) {
            return error;
        }
        if (!strcmp(addr, local_address)) {
            return ovsdb_error(NULL, "remote addresses cannot be the same "
                               "as the local address");
        }
    }

    /* Verify validity of the cluster ID (if provided). */
    if (cid && uuid_is_zero(cid)) {
        return ovsdb_error(NULL, "all-zero UUID is not valid cluster ID");
    }

    /* Create log file. */
    struct ovsdb_log *log;
    error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
                           -1, &log);
    if (error) {
        return error;
    }

    /* Write log file. */
    struct raft_header h = {
        .sid = uuid_random(),
        .cid = cid ? *cid : UUID_ZERO,
        .name = xstrdup(name),
        .local_address = xstrdup(local_address),
        .joining = true,
        /* No snapshot yet. */
    };
    sset_clone(&h.remote_addresses, remote_addresses);
    error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
    raft_header_uninit(&h);
    if (!error) {
        error = ovsdb_log_commit_block(log);
    }
    ovsdb_log_close(log);

    return error;
}
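
/* Example usage (illustrative; the path and addresses are hypothetical):
 * preparing a second server that will join the cluster above the next time
 * raft_open() runs for it.
 *
 *     struct sset remotes = SSET_INITIALIZER(&remotes);
 *     sset_add(&remotes, "tcp:10.1.2.3:6644");
 *     struct ovsdb_error *error = raft_join_cluster(
 *         "/var/lib/example/db2.db", "ExampleSchema",
 *         "tcp:10.1.2.4:6644", &remotes, NULL);
 *     sset_destroy(&remotes);
 */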

/* Reads the initial header record from 'log', which must be a Raft clustered
 * database log, and populates '*md' with the information read from it. The
 * caller must eventually destroy 'md' with raft_metadata_destroy().
 *
 * On success, returns NULL. On failure, returns an error that the caller must
 * eventually destroy and zeros '*md'. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_read_metadata(struct ovsdb_log *log, struct raft_metadata *md)
{
    struct raft *raft = raft_alloc();
    raft->log = log;

    struct ovsdb_error *error = raft_read_header(raft);
    if (!error) {
        md->sid = raft->sid;
        md->name = xstrdup(raft->name);
        md->local = xstrdup(raft->local_address);
        md->cid = raft->cid;
    } else {
        memset(md, 0, sizeof *md);
    }

    raft->log = NULL;
    raft_close(raft);
    return error;
}

/* Frees the metadata in 'md'. */
void
raft_metadata_destroy(struct raft_metadata *md)
{
    if (md) {
        free(md->name);
        free(md->local);
    }
}

static const struct raft_entry *
raft_get_entry(const struct raft *raft, uint64_t index)
{
    ovs_assert(index >= raft->log_start);
    ovs_assert(index < raft->log_end);
    return &raft->entries[index - raft->log_start];
}

static uint64_t
raft_get_term(const struct raft *raft, uint64_t index)
{
    return (index == raft->log_start - 1
            ? raft->snap.term
            : raft_get_entry(raft, index)->term);
}

static const struct json *
raft_servers_for_index(const struct raft *raft, uint64_t index)
{
    ovs_assert(index >= raft->log_start - 1);
    ovs_assert(index < raft->log_end);

    const struct json *servers = raft->snap.servers;
    for (uint64_t i = raft->log_start; i <= index; i++) {
        const struct raft_entry *e = raft_get_entry(raft, i);
        if (e->servers) {
            servers = e->servers;
        }
    }
    return servers;
}

static void
raft_set_servers(struct raft *raft, const struct hmap *new_servers,
                 enum vlog_level level)
{
    struct raft_server *s, *next;
    HMAP_FOR_EACH_SAFE (s, next, hmap_node, &raft->servers) {
        if (!raft_server_find(new_servers, &s->sid)) {
            ovs_assert(s != raft->remove_server);

            hmap_remove(&raft->servers, &s->hmap_node);
            VLOG(level, "server %s removed from configuration", s->nickname);
            raft_server_destroy(s);
        }
    }

    HMAP_FOR_EACH_SAFE (s, next, hmap_node, new_servers) {
        if (!raft_find_server(raft, &s->sid)) {
            VLOG(level, "server %s added to configuration", s->nickname);

            struct raft_server *new
                = raft_server_add(&raft->servers, &s->sid, s->address);
            raft_server_init_leader(raft, new);
        }
    }
}

static uint64_t
raft_add_entry(struct raft *raft,
               uint64_t term, struct json *data, const struct uuid *eid,
               struct json *servers)
{
    if (raft->log_end - raft->log_start >= raft->allocated_log) {
        raft->entries = x2nrealloc(raft->entries, &raft->allocated_log,
                                   sizeof *raft->entries);
    }

    uint64_t index = raft->log_end++;
    struct raft_entry *entry = &raft->entries[index - raft->log_start];
    entry->term = term;
    entry->data = data;
    entry->eid = eid ? *eid : UUID_ZERO;
    entry->servers = servers;
    return index;
}

/* Writes a RAFT_REC_ENTRY record for 'term', 'data', 'eid', 'servers' to
 * 'raft''s log and returns an error indication. */
static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_write_entry(struct raft *raft, uint64_t term, struct json *data,
                 const struct uuid *eid, struct json *servers)
{
    struct raft_record r = {
        .type = RAFT_REC_ENTRY,
        .term = term,
        .entry = {
            .index = raft_add_entry(raft, term, data, eid, servers),
            .data = data,
            .servers = servers,
            .eid = eid ? *eid : UUID_ZERO,
        },
    };
    return ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r));
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_write_state(struct ovsdb_log *log,
                 uint64_t term, const struct uuid *vote)
{
    struct raft_record r = { .term = term };
    if (vote && !uuid_is_zero(vote)) {
        r.type = RAFT_REC_VOTE;
        r.sid = *vote;
    } else {
        r.type = RAFT_REC_TERM;
    }
    return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_apply_record(struct raft *raft, unsigned long long int rec_idx,
                  const struct raft_record *r)
{
    /* Apply "term", which is present in most kinds of records (and otherwise
     * 0).
     *
     * A Raft leader can replicate entries from previous terms to the other
     * servers in the cluster, retaining the original terms on those entries
     * (see section 3.6.2 "Committing entries from previous terms" for more
     * information), so it's OK for the term in a log record to precede the
     * current term. */
    if (r->term > raft->term) {
        raft->term = raft->synced_term = r->term;
        raft->vote = raft->synced_vote = UUID_ZERO;
    }

    switch (r->type) {
    case RAFT_REC_ENTRY:
        if (r->entry.index < raft->commit_index) {
            return ovsdb_error(NULL, "record %llu attempts to truncate log "
                               "from %"PRIu64" to %"PRIu64" entries, but "
                               "commit index is already %"PRIu64,
                               rec_idx, raft->log_end, r->entry.index,
                               raft->commit_index);
        } else if (r->entry.index > raft->log_end) {
            return ovsdb_error(NULL, "record %llu with index %"PRIu64" skips "
                               "past expected index %"PRIu64,
                               rec_idx, r->entry.index, raft->log_end);
        }

        if (r->entry.index < raft->log_end) {
            /* This can happen, but it is notable. */
            VLOG_DBG("record %llu truncates log from %"PRIu64" to %"PRIu64
                     " entries", rec_idx, raft->log_end, r->entry.index);
            raft_truncate(raft, r->entry.index);
        }

        uint64_t prev_term = (raft->log_end > raft->log_start
                              ? raft->entries[raft->log_end
                                              - raft->log_start - 1].term
                              : raft->snap.term);
        if (r->term < prev_term) {
            return ovsdb_error(NULL, "record %llu with index %"PRIu64" term "
                               "%"PRIu64" precedes previous entry's term "
                               "%"PRIu64,
                               rec_idx, r->entry.index, r->term, prev_term);
        }

        raft->log_synced = raft_add_entry(
            raft, r->term,
            json_nullable_clone(r->entry.data), &r->entry.eid,
            json_nullable_clone(r->entry.servers));
        return NULL;

    case RAFT_REC_TERM:
        return NULL;

    case RAFT_REC_VOTE:
        if (r->term < raft->term) {
            return ovsdb_error(NULL, "record %llu votes for term %"PRIu64" "
                               "but current term is %"PRIu64,
                               rec_idx, r->term, raft->term);
        } else if (!uuid_is_zero(&raft->vote)
                   && !uuid_equals(&raft->vote, &r->sid)) {
            return ovsdb_error(NULL, "record %llu votes for "SID_FMT" in term "
                               "%"PRIu64" but a previous record for the "
                               "same term voted for "SID_FMT, rec_idx,
                               SID_ARGS(&raft->vote), r->term,
                               SID_ARGS(&r->sid));
        } else {
            raft->vote = raft->synced_vote = r->sid;
            return NULL;
        }
        break;

    case RAFT_REC_NOTE:
        if (!strcmp(r->note, "left")) {
            return ovsdb_error(NULL, "record %llu indicates server has left "
                               "the cluster; it cannot be added back (use "
                               "\"ovsdb-tool join-cluster\" to add a new "
                               "server)", rec_idx);
        }
        return NULL;

    case RAFT_REC_COMMIT_INDEX:
        if (r->commit_index < raft->commit_index) {
            return ovsdb_error(NULL, "record %llu regresses commit index "
                               "from %"PRIu64" to %"PRIu64,
                               rec_idx, raft->commit_index, r->commit_index);
        } else if (r->commit_index >= raft->log_end) {
            return ovsdb_error(NULL, "record %llu advances commit index to "
                               "%"PRIu64" but last log index is %"PRIu64,
                               rec_idx, r->commit_index, raft->log_end - 1);
        } else {
            raft->commit_index = r->commit_index;
            return NULL;
        }
        break;

    case RAFT_REC_LEADER:
        /* XXX we could use this to take back leadership for quick restart */
        return NULL;

    default:
        OVS_NOT_REACHED();
    }
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_read_header(struct raft *raft)
{
    /* Read header record. */
    struct json *json;
    struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
    if (error || !json) {
        /* Report error or end-of-file. */
        return error;
    }
    ovsdb_log_mark_base(raft->log);

    struct raft_header h;
    error = raft_header_from_json(&h, json);
    json_destroy(json);
    if (error) {
        return error;
    }

    raft->sid = h.sid;
    raft->cid = h.cid;
    raft->name = xstrdup(h.name);
    raft->local_address = xstrdup(h.local_address);
    raft->local_nickname = raft_address_to_nickname(h.local_address, &h.sid);
    raft->joining = h.joining;

    if (h.joining) {
        sset_clone(&raft->remote_addresses, &h.remote_addresses);
    } else {
        raft_entry_clone(&raft->snap, &h.snap);
        raft->log_start = raft->log_end = h.snap_index + 1;
        raft->commit_index = h.snap_index;
        raft->last_applied = h.snap_index - 1;
    }

    raft_header_uninit(&h);

    return NULL;
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_read_log(struct raft *raft)
{
    for (unsigned long long int i = 1; ; i++) {
        struct json *json;
        struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
        if (!json) {
            if (error) {
                /* We assume that the error is due to a partial write while
                 * appending to the file before a crash, so log it and
                 * continue. */
                char *error_string = ovsdb_error_to_string_free(error);
                VLOG_WARN("%s", error_string);
                free(error_string);
                error = NULL;
            }
            break;
        }

        struct raft_record r;
        error = raft_record_from_json(&r, json);
        if (!error) {
            error = raft_apply_record(raft, i, &r);
            raft_record_uninit(&r);
        }
        if (error) {
            return ovsdb_wrap_error(error, "error reading record %llu from "
                                    "%s log", i, raft->name);
        }
    }

    /* Set the most recent servers. */
    raft_get_servers_from_log(raft, VLL_DBG);

    return NULL;
}

static void
raft_reset_election_timer(struct raft *raft)
{
    unsigned int duration = (ELECTION_BASE_MSEC
                             + random_range(ELECTION_RANGE_MSEC));
    raft->election_base = time_msec();
    raft->election_timeout = raft->election_base + duration;
}
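
/* For example, with ELECTION_BASE_MSEC and ELECTION_RANGE_MSEC both 1024, the
 * timeout above is drawn uniformly from [1024, 2048) ms, so a follower that
 * stops hearing from the leader starts an election after roughly 1 to 2
 * seconds, with the randomization making split votes unlikely. */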

static void
raft_reset_ping_timer(struct raft *raft)
{
    raft->ping_timeout = time_msec() + PING_TIME_MSEC;
}

static void
raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
              const struct uuid *sid, bool incoming)
{
    struct raft_conn *conn = xzalloc(sizeof *conn);
    ovs_list_push_back(&raft->conns, &conn->list_node);
    conn->js = js;
    if (sid) {
        conn->sid = *sid;
    }
    conn->nickname = raft_address_to_nickname(jsonrpc_session_get_name(js),
                                              &conn->sid);
    conn->incoming = incoming;
    conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
}

/* Starts the local server in an existing Raft cluster, using the local copy of
 * the cluster's log, 'log'. Takes ownership of 'log', whether successful
 * or not. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_open(struct ovsdb_log *log, struct raft **raftp)
{
    struct raft *raft = raft_alloc();
    raft->log = log;

    struct ovsdb_error *error = raft_read_header(raft);
    if (error) {
        goto error;
    }

    if (!raft->joining) {
        error = raft_read_log(raft);
        if (error) {
            goto error;
        }

        /* Find our own server. */
        if (!raft_find_server(raft, &raft->sid)) {
            error = ovsdb_error(NULL, "server does not belong to cluster");
            goto error;
        }

        /* If there's only one server, start an election right away so that the
         * cluster bootstraps quickly. */
        if (hmap_count(&raft->servers) == 1) {
            raft_start_election(raft, false);
        }
    } else {
        raft->join_timeout = time_msec() + 1000;
    }

    *raftp = raft;
    hmap_insert(&all_rafts, &raft->hmap_node, hash_string(raft->name, 0));
    return NULL;

error:
    raft_close(raft);
    *raftp = NULL;
    return error;
}

/* Returns the name of 'raft', which in OVSDB is the database schema name. */
const char *
raft_get_name(const struct raft *raft)
{
    return raft->name;
}

/* Returns the cluster ID of 'raft'. If 'raft' has not yet completed joining
 * its cluster, then 'cid' will be all-zeros (unless the administrator
 * specified a cluster ID running "ovsdb-tool join-cluster").
 *
 * Each cluster has a unique cluster ID. */
const struct uuid *
raft_get_cid(const struct raft *raft)
{
    return &raft->cid;
}

/* Returns the server ID of 'raft'. Each server has a unique server ID. */
const struct uuid *
raft_get_sid(const struct raft *raft)
{
    return &raft->sid;
}

/* Returns true if 'raft' has completed joining its cluster, has not left or
 * initiated leaving the cluster, does not have failed disk storage, and is
 * apparently connected to the leader in a healthy way (or is itself the
 * leader). */
bool
raft_is_connected(const struct raft *raft)
{
    return (raft->role != RAFT_CANDIDATE
            && !raft->joining
            && !raft->leaving
            && !raft->left
            && !raft->failed);
}

/* Returns true if 'raft' is the cluster leader. */
bool
raft_is_leader(const struct raft *raft)
{
    return raft->role == RAFT_LEADER;
}

/* Returns true if 'raft' is in the process of joining its cluster. */
bool
raft_is_joining(const struct raft *raft)
{
    return raft->joining;
}

/* Only returns *connected* connections. */
static struct raft_conn *
raft_find_conn_by_sid(struct raft *raft, const struct uuid *sid)
{
    if (!uuid_is_zero(sid)) {
        struct raft_conn *conn;
        LIST_FOR_EACH (conn, list_node, &raft->conns) {
            if (uuid_equals(sid, &conn->sid)
                && jsonrpc_session_is_connected(conn->js)) {
                return conn;
            }
        }
    }
    return NULL;
}

static struct raft_conn *
raft_find_conn_by_address(struct raft *raft, const char *address)
{
    struct raft_conn *conn;
    LIST_FOR_EACH (conn, list_node, &raft->conns) {
        if (!strcmp(jsonrpc_session_get_name(conn->js), address)) {
            return conn;
        }
    }
    return NULL;
}

static void OVS_PRINTF_FORMAT(3, 4)
raft_record_note(struct raft *raft, const char *note,
                 const char *comment_format, ...)
{
    va_list args;
    va_start(args, comment_format);
    char *comment = xvasprintf(comment_format, args);
    va_end(args);

    struct raft_record r = {
        .type = RAFT_REC_NOTE,
        .comment = comment,
        .note = CONST_CAST(char *, note),
    };
    ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));

    free(comment);
}

/* If we're leader, try to transfer leadership to another server, logging
 * 'reason' as the human-readable reason (it should be a phrase suitable for
 * following "because"). */
void
raft_transfer_leadership(struct raft *raft, const char *reason)
{
    if (raft->role != RAFT_LEADER) {
        return;
    }

    struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (!uuid_equals(&raft->sid, &s->sid)
            && s->phase == RAFT_PHASE_STABLE) {
            struct raft_conn *conn = raft_find_conn_by_sid(raft, &s->sid);
            if (!conn) {
                continue;
            }

            union raft_rpc rpc = {
                .become_leader = {
                    .common = {
                        .comment = CONST_CAST(char *, reason),
                        .type = RAFT_RPC_BECOME_LEADER,
                        .sid = s->sid,
                    },
                    .term = raft->term,
                }
            };
            raft_send_to_conn(raft, &rpc, conn);

            raft_record_note(raft, "transfer leadership",
                             "transferring leadership to %s because %s",
                             s->nickname, reason);
            break;
        }
    }
}

/* Send a RemoveServerRequest to the rest of the servers in the cluster.
 *
 * If we know which server is the leader, we can just send the request to it.
 * However, we might not know which server is the leader, and we might never
 * find out if the remove request was actually previously committed by a
 * majority of the servers (because in that case the new leader will not send
 * AppendRequests or heartbeats to us). Therefore, we instead send
 * RemoveServerRequests to every server. This theoretically has the same
 * problem, if the current cluster leader was not previously a member of the
 * cluster, but it seems likely to be more robust in practice. */
static void
raft_send_remove_server_requests(struct raft *raft)
{
    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
    VLOG_INFO_RL(&rl, "sending remove request (joining=%s, leaving=%s)",
                 raft->joining ? "true" : "false",
                 raft->leaving ? "true" : "false");
    const struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (!uuid_equals(&s->sid, &raft->sid)) {
            union raft_rpc rpc = (union raft_rpc) {
                .remove_server_request = {
                    .common = {
                        .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
                        .sid = s->sid,
                    },
                    .sid = raft->sid,
                },
            };
            raft_send(raft, &rpc);
        }
    }

    raft->leave_timeout = time_msec() + ELECTION_BASE_MSEC;
}

/* Attempts to start 'raft' leaving its cluster. The caller can check progress
 * using raft_is_leaving() and raft_left(). */
void
raft_leave(struct raft *raft)
{
    if (raft->joining || raft->failed || raft->leaving || raft->left) {
        return;
    }
    VLOG_INFO(SID_FMT": starting to leave cluster "CID_FMT,
              SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
    raft->leaving = true;
    raft_transfer_leadership(raft, "this server is leaving the cluster");
    raft_become_follower(raft);
    raft_send_remove_server_requests(raft);
    raft->leave_timeout = time_msec() + ELECTION_BASE_MSEC;
}

/* Returns true if 'raft' is currently attempting to leave its cluster. */
bool
raft_is_leaving(const struct raft *raft)
{
    return raft->leaving;
}

/* Returns true if 'raft' successfully left its cluster. */
bool
raft_left(const struct raft *raft)
{
    return raft->left;
}

/* Returns true if 'raft' has experienced a disk I/O failure. When this
 * returns true, only closing and reopening 'raft' allows for recovery. */
bool
raft_failed(const struct raft *raft)
{
    return raft->failed;
}

/* Forces 'raft' to attempt to take leadership of the cluster by deposing the
 * current leader. */
void
raft_take_leadership(struct raft *raft)
{
    if (raft->role != RAFT_LEADER) {
        raft_start_election(raft, true);
    }
}

/* Closes everything owned by 'raft' that might be visible outside the process:
 * network connections, commands, etc. This is part of closing 'raft'; it is
 * also used if 'raft' has failed in an unrecoverable way. */
static void
raft_close__(struct raft *raft)
{
    if (!hmap_node_is_null(&raft->hmap_node)) {
        hmap_remove(&all_rafts, &raft->hmap_node);
        hmap_node_nullify(&raft->hmap_node);
    }

    raft_complete_all_commands(raft, RAFT_CMD_SHUTDOWN);

    struct raft_server *rs = raft->remove_server;
    if (rs) {
        raft_send_remove_server_reply__(raft, &rs->sid, &rs->requester_sid,
                                        rs->requester_conn, false,
                                        RAFT_SERVER_SHUTDOWN);
        raft_server_destroy(raft->remove_server);
        raft->remove_server = NULL;
    }

    struct raft_conn *conn, *next;
    LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
        raft_conn_close(conn);
    }
}

/* Closes and frees 'raft'.
 *
 * A server's cluster membership is independent of whether the server is
 * actually running. When a server that is a member of a cluster closes, the
 * cluster treats this as a server failure. */
void
raft_close(struct raft *raft)
{
    if (!raft) {
        return;
    }

    raft_transfer_leadership(raft, "this server is shutting down");

    raft_close__(raft);

    ovsdb_log_close(raft->log);

    raft_servers_destroy(&raft->servers);

    for (uint64_t index = raft->log_start; index < raft->log_end; index++) {
        struct raft_entry *e = &raft->entries[index - raft->log_start];
        raft_entry_uninit(e);
    }
    free(raft->entries);

    raft_entry_uninit(&raft->snap);

    raft_waiters_destroy(raft);

    raft_servers_destroy(&raft->add_servers);

    hmap_destroy(&raft->commands);

    pstream_close(raft->listener);

    sset_destroy(&raft->remote_addresses);
    free(raft->local_address);
    free(raft->local_nickname);
    free(raft->name);

    free(raft);
}

static bool
raft_conn_receive(struct raft *raft, struct raft_conn *conn,
                  union raft_rpc *rpc)
{
    struct jsonrpc_msg *msg = jsonrpc_session_recv(conn->js);
    if (!msg) {
        return false;
    }

    struct ovsdb_error *error = raft_rpc_from_jsonrpc(&raft->cid, &raft->sid,
                                                      msg, rpc);
    jsonrpc_msg_destroy(msg);
    if (error) {
        char *s = ovsdb_error_to_string_free(error);
        VLOG_INFO("%s: %s", jsonrpc_session_get_name(conn->js), s);
        free(s);
        return false;
    }

    if (uuid_is_zero(&conn->sid)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
        conn->sid = rpc->common.sid;
        VLOG_INFO_RL(&rl, "%s: learned server ID "SID_FMT,
                     jsonrpc_session_get_name(conn->js), SID_ARGS(&conn->sid));
    } else if (!uuid_equals(&conn->sid, &rpc->common.sid)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
        VLOG_WARN_RL(&rl, "%s: ignoring message with unexpected server ID "
                     SID_FMT" (expected "SID_FMT")",
                     jsonrpc_session_get_name(conn->js),
                     SID_ARGS(&rpc->common.sid), SID_ARGS(&conn->sid));
        raft_rpc_uninit(rpc);
        return false;
    }

    const char *address = (rpc->type == RAFT_RPC_HELLO_REQUEST
                           ? rpc->hello_request.address
                           : rpc->type == RAFT_RPC_ADD_SERVER_REQUEST
                           ? rpc->add_server_request.address
                           : NULL);
    if (address) {
        char *new_nickname = raft_address_to_nickname(address, &conn->sid);
        if (strcmp(conn->nickname, new_nickname)) {
            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
            VLOG_INFO_RL(&rl, "%s: learned remote address %s",
                         jsonrpc_session_get_name(conn->js), address);

            free(conn->nickname);
            conn->nickname = new_nickname;
        } else {
            free(new_nickname);
        }
    }

    return true;
}

static const char *
raft_get_nickname(const struct raft *raft, const struct uuid *sid,
                  char buf[SID_LEN + 1], size_t bufsize)
{
    if (uuid_equals(sid, &raft->sid)) {
        return raft->local_nickname;
    }

    const char *s = raft_servers_get_nickname__(&raft->servers, sid);
    if (s) {
        return s;
    }

    return raft_servers_get_nickname(&raft->add_servers, sid, buf, bufsize);
}

static void
log_rpc(const union raft_rpc *rpc, const char *direction,
        const struct raft_conn *conn, int line_number)
{
    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(600, 600);
    if (!raft_rpc_is_heartbeat(rpc) && !VLOG_DROP_DBG(&rl)) {
        struct ds s = DS_EMPTY_INITIALIZER;
        if (line_number) {
            ds_put_format(&s, "raft.c:%d ", line_number);
        }
        ds_put_format(&s, "%s%s ", direction, conn->nickname);
        raft_rpc_format(rpc, &s);
        VLOG_DBG("%s", ds_cstr(&s));
        ds_destroy(&s);
    }
}

static void
raft_send_add_server_request(struct raft *raft, struct raft_conn *conn)
{
    union raft_rpc rq = {
        .add_server_request = {
            .common = {
                .type = RAFT_RPC_ADD_SERVER_REQUEST,
                .sid = UUID_ZERO,
                .comment = NULL,
            },
            .address = raft->local_address,
        },
    };
    raft_send_to_conn(raft, &rq, conn);
}

static void
raft_conn_run(struct raft *raft, struct raft_conn *conn)
{
    jsonrpc_session_run(conn->js);

    unsigned int new_seqno = jsonrpc_session_get_seqno(conn->js);
    bool just_connected = (new_seqno != conn->js_seqno
                           && jsonrpc_session_is_connected(conn->js));
    conn->js_seqno = new_seqno;
    if (just_connected) {
        if (raft->joining) {
            raft_send_add_server_request(raft, conn);
        } else if (raft->leaving) {
            union raft_rpc rq = {
                .remove_server_request = {
                    .common = {
                        .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
                        .sid = conn->sid,
                    },
                    .sid = raft->sid,
                },
            };
            raft_send_to_conn(raft, &rq, conn);
        } else {
            union raft_rpc rq = (union raft_rpc) {
                .hello_request = {
                    .common = {
                        .type = RAFT_RPC_HELLO_REQUEST,
                        .sid = conn->sid,
                    },
                    .address = raft->local_address,
                },
            };
            raft_send_to_conn(raft, &rq, conn);
        }
    }

    for (size_t i = 0; i < 50; i++) {
        union raft_rpc rpc;
        if (!raft_conn_receive(raft, conn, &rpc)) {
            break;
        }

        log_rpc(&rpc, "<--", conn, 0);
        raft_handle_rpc(raft, &rpc);
        raft_rpc_uninit(&rpc);
    }
}

static void
raft_waiter_complete_rpc(struct raft *raft, const union raft_rpc *rpc)
{
    uint64_t term = raft_rpc_get_term(rpc);
    if (term && term < raft->term) {
        /* Drop the message because it's for an expired term. */
        return;
    }

    if (!raft_is_rpc_synced(raft, rpc)) {
        /* This is a bug. A reply message is deferred because some state in
         * the message, such as a term or index, has not been committed to
         * disk, and it should only be completed when that commit is done.
         * But this message is being completed before the commit is finished.
         * Complain, and hope that someone reports the bug. */
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        if (VLOG_DROP_ERR(&rl)) {
            return;
        }

        struct ds s = DS_EMPTY_INITIALIZER;

        if (term > raft->synced_term) {
            ds_put_format(&s, " because message term %"PRIu64" is "
                          "past synced term %"PRIu64,
                          term, raft->synced_term);
        }

        uint64_t index = raft_rpc_get_min_sync_index(rpc);
        if (index > raft->log_synced) {
            ds_put_format(&s, " %s message index %"PRIu64" is past last "
                          "synced index %"PRIu64,
                          s.length ? "and" : "because",
                          index, raft->log_synced);
        }

        const struct uuid *vote = raft_rpc_get_vote(rpc);
        if (vote && !uuid_equals(vote, &raft->synced_vote)) {
            char buf1[SID_LEN + 1];
            char buf2[SID_LEN + 1];
            ds_put_format(&s, " %s vote %s differs from synced vote %s",
                          s.length ? "and" : "because",
                          raft_get_nickname(raft, vote, buf1, sizeof buf1),
                          raft_get_nickname(raft, &raft->synced_vote,
                                            buf2, sizeof buf2));
        }

        char buf[SID_LEN + 1];
        ds_put_format(&s, ": %s ",
                      raft_get_nickname(raft, &rpc->common.sid,
                                        buf, sizeof buf));
        raft_rpc_format(rpc, &s);
        VLOG_ERR("internal error: deferred %s message completed "
                 "but not ready to send%s",
                 raft_rpc_type_to_string(rpc->type), ds_cstr(&s));
        ds_destroy(&s);

        return;
    }

    struct raft_conn *dst = raft_find_conn_by_sid(raft, &rpc->common.sid);
    if (dst) {
        raft_send_to_conn(raft, rpc, dst);
    }
}

static void
raft_waiter_complete(struct raft *raft, struct raft_waiter *w)
{
    switch (w->type) {
    case RAFT_W_ENTRY:
        if (raft->role == RAFT_LEADER) {
            raft_update_our_match_index(raft, w->entry.index);
        }
        raft->log_synced = w->entry.index;
        break;

    case RAFT_W_TERM:
        raft->synced_term = w->term.term;
        raft->synced_vote = w->term.vote;
        break;

    case RAFT_W_RPC:
        raft_waiter_complete_rpc(raft, w->rpc);
        break;
    }
}

static void
raft_waiter_destroy(struct raft_waiter *w)
{
    if (!w) {
        return;
    }

    ovs_list_remove(&w->list_node);

    switch (w->type) {
    case RAFT_W_ENTRY:
    case RAFT_W_TERM:
        break;

    case RAFT_W_RPC:
        raft_rpc_uninit(w->rpc);
        free(w->rpc);
        break;
    }
    free(w);
}

static void
raft_waiters_run(struct raft *raft)
{
    if (ovs_list_is_empty(&raft->waiters)) {
        return;
    }

    uint64_t cur = ovsdb_log_commit_progress(raft->log);
    struct raft_waiter *w, *next;
    LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
        if (cur < w->commit_ticket) {
            break;
        }
        raft_waiter_complete(raft, w);
        raft_waiter_destroy(w);
    }
}

static void
raft_waiters_wait(struct raft *raft)
{
    struct raft_waiter *w;
    LIST_FOR_EACH (w, list_node, &raft->waiters) {
        ovsdb_log_commit_wait(raft->log, w->commit_ticket);
        break;
    }
}

static void
raft_waiters_destroy(struct raft *raft)
{
    struct raft_waiter *w, *next;
    LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
        raft_waiter_destroy(w);
    }
}

static bool OVS_WARN_UNUSED_RESULT
raft_set_term(struct raft *raft, uint64_t term, const struct uuid *vote)
{
    struct ovsdb_error *error = raft_write_state(raft->log, term, vote);
    if (!raft_handle_write_error(raft, error)) {
        return false;
    }

    struct raft_waiter *w = raft_waiter_create(raft, RAFT_W_TERM, true);
    raft->term = w->term.term = term;
    raft->vote = w->term.vote = vote ? *vote : UUID_ZERO;
    return true;
}

static void
raft_accept_vote(struct raft *raft, struct raft_server *s,
                 const struct uuid *vote)
{
    if (uuid_equals(&s->vote, vote)) {
        return;
    }
    if (!uuid_is_zero(&s->vote)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
        char buf1[SID_LEN + 1];
        char buf2[SID_LEN + 1];
        VLOG_WARN_RL(&rl, "server %s changed its vote from %s to %s",
                     s->nickname,
                     raft_get_nickname(raft, &s->vote, buf1, sizeof buf1),
                     raft_get_nickname(raft, vote, buf2, sizeof buf2));
    }
    s->vote = *vote;
    if (uuid_equals(vote, &raft->sid)
        && ++raft->n_votes > hmap_count(&raft->servers) / 2) {
        raft_become_leader(raft);
    }
}
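
/* For example, in a 5-server cluster hmap_count(&raft->servers) / 2 is 2, so
 * the test above makes a candidate the leader once n_votes reaches 3 (its own
 * vote plus two others), a strict majority. */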
1577
1578static void
1579raft_start_election(struct raft *raft, bool leadership_transfer)
1580{
1581 if (raft->leaving) {
1582 return;
1583 }
1584
1585 struct raft_server *me = raft_find_server(raft, &raft->sid);
1586 if (!me) {
1587 return;
1588 }
1589
1590 if (!raft_set_term(raft, raft->term + 1, &raft->sid)) {
1591 return;
1592 }
1593
1594 raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
1595
1596 ovs_assert(raft->role != RAFT_LEADER);
1597 ovs_assert(hmap_is_empty(&raft->commands));
1598 raft->role = RAFT_CANDIDATE;
1599
1600 raft->n_votes = 0;
1601
1602 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1603 if (!VLOG_DROP_INFO(&rl)) {
1604 long long int now = time_msec();
1605 if (now >= raft->election_timeout) {
1606 VLOG_INFO("term %"PRIu64": %lld ms timeout expired, "
1607 "starting election",
1608 raft->term, now - raft->election_base);
1609 } else {
1610 VLOG_INFO("term %"PRIu64": starting election", raft->term);
1611 }
1612 }
0f954f32 1613 raft_reset_election_timer(raft);
1b1d2e6d
BP
1614
1615 struct raft_server *peer;
1616 HMAP_FOR_EACH (peer, hmap_node, &raft->servers) {
1617 peer->vote = UUID_ZERO;
1618 if (uuid_equals(&raft->sid, &peer->sid)) {
1619 continue;
1620 }
1621
1622 union raft_rpc rq = {
1623 .vote_request = {
1624 .common = {
1625 .type = RAFT_RPC_VOTE_REQUEST,
1626 .sid = peer->sid,
1627 },
1628 .term = raft->term,
1629 .last_log_index = raft->log_end - 1,
1630 .last_log_term = (
1631 raft->log_end > raft->log_start
1632 ? raft->entries[raft->log_end - raft->log_start - 1].term
1633 : raft->snap.term),
1634 .leadership_transfer = leadership_transfer,
1635 },
1636 };
1637 raft_send(raft, &rq);
1638 }
1639
1640 /* Vote for ourselves. */
1641 raft_accept_vote(raft, me, &raft->sid);
1642}
1643
1644static void
1645raft_open_conn(struct raft *raft, const char *address, const struct uuid *sid)
1646{
1647 if (strcmp(address, raft->local_address)
1648 && !raft_find_conn_by_address(raft, address)) {
1649 raft_add_conn(raft, jsonrpc_session_open(address, true), sid, false);
1650 }
1651}
1652
1653static void
1654raft_conn_close(struct raft_conn *conn)
1655{
1656 jsonrpc_session_close(conn->js);
1657 ovs_list_remove(&conn->list_node);
1658 free(conn->nickname);
1659 free(conn);
1660}
1661
1662/* Returns true if 'conn' should stay open, false if it should be closed. */
1663static bool
1664raft_conn_should_stay_open(struct raft *raft, struct raft_conn *conn)
1665{
1666 /* Close the connection if it's actually dead. If necessary, we'll
1667 * initiate a new session later. */
1668 if (!jsonrpc_session_is_alive(conn->js)) {
1669 return false;
1670 }
1671
1672 /* Keep incoming sessions. We trust the originator to decide to drop
1673 * it. */
1674 if (conn->incoming) {
1675 return true;
1676 }
1677
1678 /* If we are joining the cluster, keep sessions to the remote addresses
1679 * that are supposed to be part of the cluster we're joining. */
1680 if (raft->joining && sset_contains(&raft->remote_addresses,
1681 jsonrpc_session_get_name(conn->js))) {
1682 return true;
1683 }
1684
1685 /* We have joined the cluster. If we did that "recently", then there is a
1686 * chance that we do not have the most recent server configuration log
1687 * entry. If so, it's a waste to disconnect from the servers that were in
1688 * remote_addresses and that will probably appear in the configuration,
1689 * just to reconnect to them a moment later when we do get the
1690 * configuration update. If we are not ourselves in the configuration,
1691 * then we know that there must be a new configuration coming up, so in
1692 * that case keep the connection. */
1693 if (!raft_find_server(raft, &raft->sid)) {
1694 return true;
1695 }
1696
1697 /* Keep the connection only if the server is part of the configuration. */
1698 return raft_find_server(raft, &conn->sid);
1699}
1700
1701/* Allows 'raft' to maintain the distributed log. Call this function as part
1702 * of the process's main loop. */
1703void
1704raft_run(struct raft *raft)
1705{
1706 if (raft->left || raft->failed) {
1707 return;
1708 }
1709
1710 raft_waiters_run(raft);
1711
1712 if (!raft->listener && time_msec() >= raft->listen_backoff) {
1713 char *paddr = raft_make_address_passive(raft->local_address);
1714 int error = pstream_open(paddr, &raft->listener, DSCP_DEFAULT);
1715 if (error) {
1716 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1717 VLOG_WARN_RL(&rl, "%s: listen failed (%s)",
1718 paddr, ovs_strerror(error));
1719 raft->listen_backoff = time_msec() + 1000;
1720 }
1721 free(paddr);
1722 }
1723
1724 if (raft->listener) {
1725 struct stream *stream;
1726 int error = pstream_accept(raft->listener, &stream);
1727 if (!error) {
1728 raft_add_conn(raft, jsonrpc_session_open_unreliably(
1729 jsonrpc_open(stream), DSCP_DEFAULT), NULL,
1730 true);
1731 } else if (error != EAGAIN) {
1732 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
1733 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
1734 pstream_get_name(raft->listener),
1735 ovs_strerror(error));
1736 }
1737 }
1738
1739 /* Run RPCs for all open sessions. */
1740 struct raft_conn *conn;
1741 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1742 raft_conn_run(raft, conn);
1743 }
1744
1745 /* Close unneeded sessions. */
1746 struct raft_conn *next;
1747 LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
1748 if (!raft_conn_should_stay_open(raft, conn)) {
1749 raft_conn_close(conn);
1750 }
1751 }
1752
1753 /* Open needed sessions. */
1754 struct raft_server *server;
1755 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1756 raft_open_conn(raft, server->address, &server->sid);
1757 }
1758 if (raft->joining) {
1759 const char *address;
1760 SSET_FOR_EACH (address, &raft->remote_addresses) {
1761 raft_open_conn(raft, address, NULL);
1762 }
1763 }
1764
1765 if (!raft->joining && time_msec() >= raft->election_timeout) {
1766 raft_start_election(raft, false);
1767 }
1768
1769 if (raft->leaving && time_msec() >= raft->leave_timeout) {
1770 raft_send_remove_server_requests(raft);
1771 }
1772
1773 if (raft->joining && time_msec() >= raft->join_timeout) {
1774 raft->join_timeout = time_msec() + 1000;
1775 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1776 raft_send_add_server_request(raft, conn);
1777 }
1778 }
1779
1780 if (time_msec() >= raft->ping_timeout) {
1781 if (raft->role == RAFT_LEADER) {
1782 raft_send_heartbeats(raft);
1783 } else {
1784 long long int now = time_msec();
1785 struct raft_command *cmd, *next_cmd;
1786 HMAP_FOR_EACH_SAFE (cmd, next_cmd, hmap_node, &raft->commands) {
1787 if (cmd->timestamp
1788 && now - cmd->timestamp > ELECTION_BASE_MSEC) {
1789 raft_command_complete(raft, cmd, RAFT_CMD_TIMEOUT);
1790 }
1791 }
0f954f32 1792 raft_reset_ping_timer(raft);
1b1d2e6d 1793 }
1b1d2e6d
BP
1794 }
1795
1796 /* Do this only at the end; if we did it as soon as we set raft->left or
1797 * raft->failed in handling the RemoveServerReply, then it could easily
1798 * cause references to freed memory in RPC sessions, etc. */
1799 if (raft->left || raft->failed) {
1800 raft_close__(raft);
1801 }
1802}
1803
1804static void
1805raft_wait_session(struct jsonrpc_session *js)
1806{
1807 if (js) {
1808 jsonrpc_session_wait(js);
1809 jsonrpc_session_recv_wait(js);
1810 }
1811}
1812
1813/* Causes the next call to poll_block() to wake up when 'raft' needs to do
1814 * something. */
1815void
1816raft_wait(struct raft *raft)
1817{
1818 if (raft->left || raft->failed) {
1819 return;
1820 }
1821
1822 raft_waiters_wait(raft);
1823
1824 if (raft->listener) {
1825 pstream_wait(raft->listener);
1826 } else {
1827 poll_timer_wait_until(raft->listen_backoff);
1828 }
1829
1830 struct raft_conn *conn;
1831 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1832 raft_wait_session(conn->js);
1833 }
1834
1835 if (!raft->joining) {
1836 poll_timer_wait_until(raft->election_timeout);
1837 } else {
1838 poll_timer_wait_until(raft->join_timeout);
1839 }
1840 if (raft->leaving) {
1841 poll_timer_wait_until(raft->leave_timeout);
1842 }
1843 if (raft->role == RAFT_LEADER || !hmap_is_empty(&raft->commands)) {
1844 poll_timer_wait_until(raft->ping_timeout);
1845 }
1846}
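/* Illustrative sketch (editor's addition, not in the original source): a
 * caller typically drives raft_run() and raft_wait() from its main loop in
 * the usual OVS poll-loop pattern, where 'raft' is a handle the application
 * obtained earlier:
 *
 *     for (;;) {
 *         raft_run(raft);
 *         ...apply newly committed entries, service clients...
 *         raft_wait(raft);
 *         poll_block();
 *     }
 */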
1847
1848static struct raft_waiter *
1849raft_waiter_create(struct raft *raft, enum raft_waiter_type type,
1850 bool start_commit)
1851{
1852 struct raft_waiter *w = xzalloc(sizeof *w);
1853 ovs_list_push_back(&raft->waiters, &w->list_node);
1854 w->commit_ticket = start_commit ? ovsdb_log_commit_start(raft->log) : 0;
1855 w->type = type;
1856 return w;
1857}
1858
1859/* Returns a human-readable representation of 'status' (or NULL if 'status' is
1860 * invalid). */
1861const char *
1862raft_command_status_to_string(enum raft_command_status status)
1863{
1864 switch (status) {
1865 case RAFT_CMD_INCOMPLETE:
1866 return "operation still in progress";
1867 case RAFT_CMD_SUCCESS:
1868 return "success";
1869 case RAFT_CMD_NOT_LEADER:
1870 return "not leader";
1871 case RAFT_CMD_BAD_PREREQ:
1872 return "prerequisite check failed";
1873 case RAFT_CMD_LOST_LEADERSHIP:
1874 return "lost leadership";
1875 case RAFT_CMD_SHUTDOWN:
1876 return "server shutdown";
1877 case RAFT_CMD_IO_ERROR:
1878 return "I/O error";
1879 case RAFT_CMD_TIMEOUT:
1880 return "timeout";
1881 default:
1882 return NULL;
1883 }
1884}
1885
1886/* Converts human-readable status in 's' into status code in '*statusp'.
1887 * Returns true if successful, false if 's' is unknown. */
1888bool
1889raft_command_status_from_string(const char *s,
1890 enum raft_command_status *statusp)
1891{
1892 for (enum raft_command_status status = 0; ; status++) {
1893 const char *s2 = raft_command_status_to_string(status);
1894 if (!s2) {
1895 *statusp = 0;
1896 return false;
1897 } else if (!strcmp(s, s2)) {
1898 *statusp = status;
1899 return true;
1900 }
1901 }
1902}
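/* Example (editor's sketch, not in the original source): the two functions
 * above are inverses, so a status round-trips through its string form:
 *
 *     enum raft_command_status status;
 *     if (raft_command_status_from_string("not leader", &status)) {
 *         ovs_assert(status == RAFT_CMD_NOT_LEADER);
 *         ovs_assert(!strcmp(raft_command_status_to_string(status),
 *                            "not leader"));
 *     }
 */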
1903
1904static const struct uuid *
1905raft_get_eid(const struct raft *raft, uint64_t index)
1906{
1907 for (; index >= raft->log_start; index--) {
1908 const struct raft_entry *e = raft_get_entry(raft, index);
1909 if (e->data) {
1910 return &e->eid;
1911 }
1912 }
1913 return &raft->snap.eid;
1914}
1915
1916const struct uuid *
1917raft_current_eid(const struct raft *raft)
1918{
1919 return raft_get_eid(raft, raft->log_end - 1);
1920}
1921
1922static struct raft_command *
1923raft_command_create_completed(enum raft_command_status status)
1924{
1925 ovs_assert(status != RAFT_CMD_INCOMPLETE);
1926
1927 struct raft_command *cmd = xzalloc(sizeof *cmd);
1928 cmd->n_refs = 1;
1929 cmd->status = status;
1930 return cmd;
1931}
1932
1933static struct raft_command *
1934raft_command_create_incomplete(struct raft *raft, uint64_t index)
1935{
1936 struct raft_command *cmd = xzalloc(sizeof *cmd);
1937 cmd->n_refs = 2; /* One for client, one for raft->commands. */
1938 cmd->index = index;
1939 cmd->status = RAFT_CMD_INCOMPLETE;
1940 hmap_insert(&raft->commands, &cmd->hmap_node, cmd->index);
1941 return cmd;
1942}
1943
1944static struct raft_command * OVS_WARN_UNUSED_RESULT
1945raft_command_initiate(struct raft *raft,
1946 const struct json *data, const struct json *servers,
1947 const struct uuid *eid)
1948{
1949 /* Write to local log. */
1950 uint64_t index = raft->log_end;
1951 if (!raft_handle_write_error(
1952 raft, raft_write_entry(
1953 raft, raft->term, json_nullable_clone(data), eid,
1954 json_nullable_clone(servers)))) {
1955 return raft_command_create_completed(RAFT_CMD_IO_ERROR);
1956 }
1957
1958 struct raft_command *cmd = raft_command_create_incomplete(raft, index);
1959 if (eid) {
1960 cmd->eid = *eid;
1961 }
1962
1963 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index = cmd->index;
1964
1965 /* Write to remote logs. */
1966 struct raft_server *s;
1967 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
1968 if (!uuid_equals(&s->sid, &raft->sid) && s->next_index == index) {
1969 raft_send_append_request(raft, s, 1, "execute command");
1970 s->next_index++;
1971 }
1972 }
1973 raft_reset_ping_timer(raft);
1974
1975 return cmd;
1976}
1977
1978static struct raft_command * OVS_WARN_UNUSED_RESULT
1979raft_command_execute__(struct raft *raft,
1980 const struct json *data, const struct json *servers,
1981 const struct uuid *prereq, struct uuid *result)
1982{
1983 if (raft->joining || raft->leaving || raft->left || raft->failed) {
1984 return raft_command_create_completed(RAFT_CMD_SHUTDOWN);
1985 }
1986
1987 if (raft->role != RAFT_LEADER) {
1988 /* Consider proxying the command to the leader. We can only do that if
1989 * we know the leader and the command does not change the set of
1990 * servers. We do not proxy commands without prerequisites, even
1991 * though we could, because in an OVSDB context a log entry doesn't
1992 * make sense without context. */
1993 if (servers || !data
1994 || raft->role != RAFT_FOLLOWER || uuid_is_zero(&raft->leader_sid)
1995 || !prereq) {
1996 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
1997 }
1998 }
1999
2000 struct uuid eid = data ? uuid_random() : UUID_ZERO;
2001 if (result) {
2002 *result = eid;
2003 }
2004
2005 if (raft->role != RAFT_LEADER) {
2006 const union raft_rpc rpc = {
2007 .execute_command_request = {
2008 .common = {
2009 .type = RAFT_RPC_EXECUTE_COMMAND_REQUEST,
2010 .sid = raft->leader_sid,
2011 },
2012 .data = CONST_CAST(struct json *, data),
2013 .prereq = *prereq,
2014 .result = eid,
2015 }
2016 };
2017 if (!raft_send(raft, &rpc)) {
2018 /* Couldn't send command, so it definitely failed. */
2019 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
2020 }
2021
2022 struct raft_command *cmd = raft_command_create_incomplete(raft, 0);
2023 cmd->timestamp = time_msec();
2024 cmd->eid = eid;
2025 return cmd;
2026 }
2027
2028 const struct uuid *current_eid = raft_current_eid(raft);
2029 if (prereq && !uuid_equals(prereq, current_eid)) {
2030 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
2031 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
2032 "prerequisite "UUID_FMT,
2033 UUID_ARGS(current_eid), UUID_ARGS(prereq));
2034 return raft_command_create_completed(RAFT_CMD_BAD_PREREQ);
2035 }
2036
2037 return raft_command_initiate(raft, data, servers, &eid);
2038}
2039
2040/* Initiates appending a log entry to 'raft'. The log entry consists of
2041 * 'data'. If 'prereq' is nonnull, the entry is only added to the log if the
2042 * previous entry in the log has entry ID 'prereq'. If 'result' is nonnull,
2043 * it is populated with the entry ID for the new log entry.
2044 *
2045 * Returns a "struct raft_command" that may be used to track progress adding
2046 * the log entry. The caller must eventually free the returned structure, with
2047 * raft_command_unref(). */
2048struct raft_command * OVS_WARN_UNUSED_RESULT
2049raft_command_execute(struct raft *raft, const struct json *data,
2050 const struct uuid *prereq, struct uuid *result)
2051{
2052 return raft_command_execute__(raft, data, NULL, prereq, result);
2053}
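/* Illustrative usage (editor's sketch, not in the original source): a caller
 * might append an entry and poll it to completion like this, where 'raft',
 * 'data', and 'prereq' come from the caller's context:
 *
 *     struct uuid eid;
 *     struct raft_command *cmd = raft_command_execute(raft, data,
 *                                                     prereq, &eid);
 *     while (raft_command_get_status(cmd) == RAFT_CMD_INCOMPLETE) {
 *         raft_run(raft);
 *         raft_command_wait(cmd);
 *         raft_wait(raft);
 *         poll_block();
 *     }
 *     if (raft_command_get_status(cmd) != RAFT_CMD_SUCCESS) {
 *         ...report raft_command_status_to_string(
 *                raft_command_get_status(cmd)) to the client...
 *     }
 *     raft_command_unref(cmd);
 */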
2054
2055/* Returns the status of 'cmd'. */
2056enum raft_command_status
2057raft_command_get_status(const struct raft_command *cmd)
2058{
2059 ovs_assert(cmd->n_refs > 0);
2060 return cmd->status;
2061}
2062
2063/* Returns the index of the log entry at which 'cmd' was committed.
2064 *
2065 * This function works only with successful commands. */
2066uint64_t
2067raft_command_get_commit_index(const struct raft_command *cmd)
2068{
2069 ovs_assert(cmd->n_refs > 0);
2070 ovs_assert(cmd->status == RAFT_CMD_SUCCESS);
2071 return cmd->index;
2072}
2073
2074/* Frees 'cmd'. */
2075void
2076raft_command_unref(struct raft_command *cmd)
2077{
2078 if (cmd) {
2079 ovs_assert(cmd->n_refs > 0);
2080 if (!--cmd->n_refs) {
2081 free(cmd);
2082 }
2083 }
2084}
2085
2086/* Causes poll_block() to wake up when 'cmd' has status to report. */
2087void
2088raft_command_wait(const struct raft_command *cmd)
2089{
2090 if (cmd->status != RAFT_CMD_INCOMPLETE) {
2091 poll_immediate_wake();
2092 }
2093}
2094
2095static void
2096raft_command_complete(struct raft *raft,
2097 struct raft_command *cmd,
2098 enum raft_command_status status)
2099{
2100 if (!uuid_is_zero(&cmd->sid)) {
2101 uint64_t commit_index = status == RAFT_CMD_SUCCESS ? cmd->index : 0;
2102 raft_send_execute_command_reply(raft, &cmd->sid, &cmd->eid, status,
2103 commit_index);
2104 }
2105
2106 ovs_assert(cmd->status == RAFT_CMD_INCOMPLETE);
2107 ovs_assert(cmd->n_refs > 0);
2108 hmap_remove(&raft->commands, &cmd->hmap_node);
2109 cmd->status = status;
2110 raft_command_unref(cmd);
2111}
2112
2113static void
2114raft_complete_all_commands(struct raft *raft, enum raft_command_status status)
2115{
2116 struct raft_command *cmd, *next;
2117 HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
2118 raft_command_complete(raft, cmd, status);
2119 }
2120}
2121
2122static struct raft_command *
2123raft_find_command_by_index(struct raft *raft, uint64_t index)
2124{
2125 struct raft_command *cmd;
2126
2127 HMAP_FOR_EACH_IN_BUCKET (cmd, hmap_node, index, &raft->commands) {
2128 if (cmd->index == index) {
2129 return cmd;
2130 }
2131 }
2132 return NULL;
2133}
2134
2135static struct raft_command *
2136raft_find_command_by_eid(struct raft *raft, const struct uuid *eid)
2137{
2138 struct raft_command *cmd;
2139
2140 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2141 if (uuid_equals(&cmd->eid, eid)) {
2142 return cmd;
2143 }
2144 }
2145 return NULL;
2146}
2147\f
2148#define RAFT_RPC(ENUM, NAME) \
2149 static void raft_handle_##NAME(struct raft *, const struct raft_##NAME *);
2150RAFT_RPC_TYPES
2151#undef RAFT_RPC
2152
2153static void
2154raft_handle_hello_request(struct raft *raft OVS_UNUSED,
2155 const struct raft_hello_request *hello OVS_UNUSED)
2156{
2157}
2158
2159/* 'sid' is the server being added. */
2160static void
2161raft_send_add_server_reply__(struct raft *raft, const struct uuid *sid,
2162 const char *address,
2163 bool success, const char *comment)
2164{
2165 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2166 if (!VLOG_DROP_INFO(&rl)) {
2167 struct ds s = DS_EMPTY_INITIALIZER;
2168 char buf[SID_LEN + 1];
2169 ds_put_format(&s, "adding %s ("SID_FMT" at %s) "
2170 "to cluster "CID_FMT" %s",
2171 raft_get_nickname(raft, sid, buf, sizeof buf),
2172 SID_ARGS(sid), address, CID_ARGS(&raft->cid),
2173 success ? "succeeded" : "failed");
2174 if (comment) {
2175 ds_put_format(&s, " (%s)", comment);
2176 }
2177 VLOG_INFO("%s", ds_cstr(&s));
2178 ds_destroy(&s);
2179 }
2180
2181 union raft_rpc rpy = {
2182 .add_server_reply = {
2183 .common = {
2184 .type = RAFT_RPC_ADD_SERVER_REPLY,
2185 .sid = *sid,
2186 .comment = CONST_CAST(char *, comment),
2187 },
2188 .success = success,
2189 }
2190 };
2191
2192 struct sset *remote_addresses = &rpy.add_server_reply.remote_addresses;
2193 sset_init(remote_addresses);
2194 if (!raft->joining) {
2195 struct raft_server *s;
2196 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2197 if (!uuid_equals(&s->sid, &raft->sid)) {
2198 sset_add(remote_addresses, s->address);
2199 }
2200 }
2201 }
2202
2203 raft_send(raft, &rpy);
2204
2205 sset_destroy(remote_addresses);
2206}
2207
2208static void
2209raft_send_remove_server_reply_rpc(struct raft *raft,
2210 const struct uuid *dst_sid,
2211 const struct uuid *target_sid,
2212 bool success, const char *comment)
2213{
2214 if (uuid_equals(&raft->sid, dst_sid)) {
2215 if (success && uuid_equals(&raft->sid, target_sid)) {
2216 raft_finished_leaving_cluster(raft);
2217 }
2218 return;
2219 }
2220
2221 const union raft_rpc rpy = {
2222 .remove_server_reply = {
2223 .common = {
2224 .type = RAFT_RPC_REMOVE_SERVER_REPLY,
2225 .sid = *dst_sid,
2226 .comment = CONST_CAST(char *, comment),
2227 },
2228 .target_sid = (uuid_equals(dst_sid, target_sid)
2229 ? UUID_ZERO
2230 : *target_sid),
2231 .success = success,
2232 }
2233 };
2234 raft_send(raft, &rpy);
2235}
2236
2237static void
2238raft_send_remove_server_reply__(struct raft *raft,
2239 const struct uuid *target_sid,
2240 const struct uuid *requester_sid,
2241 struct unixctl_conn *requester_conn,
2242 bool success, const char *comment)
2243{
2244 struct ds s = DS_EMPTY_INITIALIZER;
2245 ds_put_format(&s, "request ");
2246 if (!uuid_is_zero(requester_sid)) {
2247 char buf[SID_LEN + 1];
2248 ds_put_format(&s, "by %s",
2249 raft_get_nickname(raft, requester_sid, buf, sizeof buf));
2250 } else {
2251 ds_put_cstr(&s, "via unixctl");
2252 }
2253 ds_put_cstr(&s, " to remove ");
2254 if (!requester_conn && uuid_equals(target_sid, requester_sid)) {
2255 ds_put_cstr(&s, "itself");
2256 } else {
2257 char buf[SID_LEN + 1];
2258 ds_put_cstr(&s, raft_get_nickname(raft, target_sid, buf, sizeof buf));
2259 if (uuid_equals(target_sid, &raft->sid)) {
2260 ds_put_cstr(&s, " (ourselves)");
2261 }
2262 }
2263 ds_put_format(&s, " from cluster "CID_FMT" %s",
2264 CID_ARGS(&raft->cid),
2265 success ? "succeeded" : "failed");
2266 if (comment) {
2267 ds_put_format(&s, " (%s)", comment);
2268 }
2269
2270 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2271 VLOG_INFO_RL(&rl, "%s", ds_cstr(&s));
2272
2273 /* Send RemoveServerReply to the requester (which could be a server or a
2274 * unixctl connection). Also always send it to the removed server; this
2275 * allows it to be sure that it's really removed and update its log and
2276 * disconnect permanently. */
2277 if (!uuid_is_zero(requester_sid)) {
2278 raft_send_remove_server_reply_rpc(raft, requester_sid, target_sid,
2279 success, comment);
2280 }
2281 if (!uuid_equals(requester_sid, target_sid)) {
2282 raft_send_remove_server_reply_rpc(raft, target_sid, target_sid,
2283 success, comment);
2284 }
2285 if (requester_conn) {
2286 if (success) {
2287 unixctl_command_reply(requester_conn, ds_cstr(&s));
2288 } else {
2289 unixctl_command_reply_error(requester_conn, ds_cstr(&s));
2290 }
2291 }
2292
2293 ds_destroy(&s);
2294}
2295
2296static void
2297raft_send_add_server_reply(struct raft *raft,
2298 const struct raft_add_server_request *rq,
2299 bool success, const char *comment)
2300{
2301 return raft_send_add_server_reply__(raft, &rq->common.sid, rq->address,
2302 success, comment);
2303}
2304
2305static void
2306raft_send_remove_server_reply(struct raft *raft,
2307 const struct raft_remove_server_request *rq,
2308 bool success, const char *comment)
2309{
2310 return raft_send_remove_server_reply__(raft, &rq->sid, &rq->common.sid,
2311 rq->requester_conn, success,
2312 comment);
2313}
2314
2315static void
2316raft_become_follower(struct raft *raft)
2317{
2318 raft->leader_sid = UUID_ZERO;
2319 if (raft->role == RAFT_FOLLOWER) {
2320 return;
2321 }
2322
2323 raft->role = RAFT_FOLLOWER;
2324 raft_reset_election_timer(raft);
2325
2326 /* Notify clients about lost leadership.
2327 *
2328 * We do not reverse our changes to 'raft->servers' because the new
2329 * configuration is already part of the log. Possibly the configuration
2330 * log entry will not be committed, but until we know that we must use the
2331 * new configuration. Our AppendEntries processing will properly update
2332 * the server configuration later, if necessary. */
2333 struct raft_server *s;
2334 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
2335 raft_send_add_server_reply__(raft, &s->sid, s->address, false,
2336 RAFT_SERVER_LOST_LEADERSHIP);
2337 }
2338 if (raft->remove_server) {
2339 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
2340 &raft->remove_server->requester_sid,
2341 raft->remove_server->requester_conn,
2342 false, RAFT_SERVER_LOST_LEADERSHIP);
2343 raft_server_destroy(raft->remove_server);
2344 raft->remove_server = NULL;
2345 }
2346
2347 raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
2348}
2349
2350static void
2351raft_send_append_request(struct raft *raft,
2352 struct raft_server *peer, unsigned int n,
2353 const char *comment)
2354{
2355 ovs_assert(raft->role == RAFT_LEADER);
2356
2357 const union raft_rpc rq = {
2358 .append_request = {
2359 .common = {
2360 .type = RAFT_RPC_APPEND_REQUEST,
2361 .sid = peer->sid,
2362 .comment = CONST_CAST(char *, comment),
2363 },
2364 .term = raft->term,
2365 .prev_log_index = peer->next_index - 1,
2366 .prev_log_term = (peer->next_index - 1 >= raft->log_start
2367 ? raft->entries[peer->next_index - 1
2368 - raft->log_start].term
2369 : raft->snap.term),
2370 .leader_commit = raft->commit_index,
2371 .entries = &raft->entries[peer->next_index - raft->log_start],
2372 .n_entries = n,
2373 },
2374 };
2375 raft_send(raft, &rq);
2376}
2377
2378static void
2379raft_send_heartbeats(struct raft *raft)
2380{
2381 struct raft_server *s;
2382 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2383 if (!uuid_equals(&raft->sid, &s->sid)) {
2384 raft_send_append_request(raft, s, 0, "heartbeat");
2385 }
2386 }
2387
2388 /* Send anyone waiting for a command to complete a ping to let them
2389 * know we're still working on it. */
2390 struct raft_command *cmd;
2391 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2392 if (!uuid_is_zero(&cmd->sid)) {
2393 raft_send_execute_command_reply(raft, &cmd->sid,
2394 &cmd->eid,
2395 RAFT_CMD_INCOMPLETE, 0);
2396 }
2397 }
2398
2399 raft_reset_ping_timer(raft);
2400}
2401
2402/* Initializes the fields in 's' that represent the leader's view of the
2403 * server. */
2404static void
2405raft_server_init_leader(struct raft *raft, struct raft_server *s)
2406{
2407 s->next_index = raft->log_end;
2408 s->match_index = 0;
2409 s->phase = RAFT_PHASE_STABLE;
2410}
2411
2412static void
2413raft_become_leader(struct raft *raft)
2414{
2415 raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
2416
2417 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
2418 VLOG_INFO_RL(&rl, "term %"PRIu64": elected leader by %d+ of "
2419 "%"PRIuSIZE" servers", raft->term,
2420 raft->n_votes, hmap_count(&raft->servers));
2421
2422 ovs_assert(raft->role != RAFT_LEADER);
2423 raft->role = RAFT_LEADER;
2424 raft->leader_sid = raft->sid;
2425 raft->election_timeout = LLONG_MAX;
2426 raft_reset_ping_timer(raft);
2427
2428 struct raft_server *s;
2429 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2430 raft_server_init_leader(raft, s);
2431 }
2432
2433 raft_update_our_match_index(raft, raft->log_end - 1);
2434 raft_send_heartbeats(raft);
2435
2436 /* Write the fact that we are leader to the log. This is not used by the
2437 * algorithm (although it could be, for quick restart), but it is used for
2438 * offline analysis to check for conformance with the properties that Raft
2439 * guarantees. */
2440 struct raft_record r = {
2441 .type = RAFT_REC_LEADER,
2442 .term = raft->term,
2443 .sid = raft->sid,
2444 };
2445 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2446
2447 /* Initiate a no-op commit. Otherwise we might never find out what's in
2448 * the log. See section 6.4 item 1:
2449 *
2450 * The Leader Completeness Property guarantees that a leader has all
2451 * committed entries, but at the start of its term, it may not know
2452 * which those are. To find out, it needs to commit an entry from its
2453 * term. Raft handles this by having each leader commit a blank no-op
2454 * entry into the log at the start of its term. As soon as this no-op
2455 * entry is committed, the leader’s commit index will be at least as
2456 * large as any other servers’ during its term.
2457 */
2458 raft_command_unref(raft_command_execute__(raft, NULL, NULL, NULL, NULL));
2459}
2460
2461/* Processes term 'term' received as part of RPC 'common'. Returns true if the
2462 * caller should continue processing the RPC, false if the caller should reject
2463 * it due to a stale term. */
2464static bool
2465raft_receive_term__(struct raft *raft, const struct raft_rpc_common *common,
2466 uint64_t term)
2467{
2468 /* Section 3.3 says:
2469 *
2470 * Current terms are exchanged whenever servers communicate; if one
2471 * server’s current term is smaller than the other’s, then it updates
2472 * its current term to the larger value. If a candidate or leader
2473 * discovers that its term is out of date, it immediately reverts to
2474 * follower state. If a server receives a request with a stale term
2475 * number, it rejects the request.
2476 */
2477 if (term > raft->term) {
2478 if (!raft_set_term(raft, term, NULL)) {
2479 /* Failed to update the term to 'term'. */
2480 return false;
2481 }
2482 raft_become_follower(raft);
2483 } else if (term < raft->term) {
2484 char buf[SID_LEN + 1];
2485 VLOG_INFO("rejecting term %"PRIu64" < current term %"PRIu64" received "
2486 "in %s message from server %s",
2487 term, raft->term,
2488 raft_rpc_type_to_string(common->type),
2489 raft_get_nickname(raft, &common->sid, buf, sizeof buf));
2490 return false;
2491 }
2492 return true;
2493}
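/* Worked example (editor's note, not in the original source): a server at
 * term 5 that receives term 7 steps up to term 7 and, if it was a candidate
 * or leader, reverts to follower; receiving term 5 changes nothing; and
 * receiving term 4 causes the message to be rejected as stale, per the
 * Section 3.3 rules quoted above. */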
2494
2495static void
2496raft_get_servers_from_log(struct raft *raft, enum vlog_level level)
2497{
2498 const struct json *servers_json = raft->snap.servers;
2499 for (uint64_t index = raft->log_end - 1; index >= raft->log_start;
2500 index--) {
2501 struct raft_entry *e = &raft->entries[index - raft->log_start];
2502 if (e->servers) {
2503 servers_json = e->servers;
2504 break;
2505 }
2506 }
2507
2508 struct hmap servers;
2509 struct ovsdb_error *error = raft_servers_from_json(servers_json, &servers);
2510 ovs_assert(!error);
2511 raft_set_servers(raft, &servers, level);
2512 raft_servers_destroy(&servers);
2513}
2514
2515/* Truncates the log, so that raft->log_end becomes 'new_end'.
2516 *
2517 * Doesn't write anything to disk. In theory, we could truncate the on-disk
2518 * log file, but we don't have the right information to know how long it should
2519 * be. What we actually do is to append entries for older indexes to the
2520 * on-disk log; when we re-read it later, these entries truncate the log.
2521 *
2522 * Returns true if any of the removed log entries were server configuration
2523 * entries, false otherwise. */
2524static bool
2525raft_truncate(struct raft *raft, uint64_t new_end)
2526{
2527 ovs_assert(new_end >= raft->log_start);
2528 if (raft->log_end > new_end) {
2529 char buf[SID_LEN + 1];
2530 VLOG_INFO("%s truncating %"PRIu64 " entries from end of log",
2531 raft_get_nickname(raft, &raft->sid, buf, sizeof buf),
2532 raft->log_end - new_end);
2533 }
2534
2535 bool servers_changed = false;
2536 while (raft->log_end > new_end) {
2537 struct raft_entry *entry = &raft->entries[--raft->log_end
2538 - raft->log_start];
2539 if (entry->servers) {
2540 servers_changed = true;
2541 }
2542 raft_entry_uninit(entry);
2543 }
2544 return servers_changed;
2545}
2546
2547static const struct json *
2548raft_peek_next_entry(struct raft *raft, struct uuid *eid)
2549{
2550 /* Invariant: log_start - 2 <= last_applied <= commit_index < log_end. */
2551 ovs_assert(raft->log_start <= raft->last_applied + 2);
2552 ovs_assert(raft->last_applied <= raft->commit_index);
2553 ovs_assert(raft->commit_index < raft->log_end);
2554
2555 if (raft->joining || raft->failed) {
2556 return NULL;
2557 }
2558
2559 if (raft->log_start == raft->last_applied + 2) {
2560 *eid = raft->snap.eid;
2561 return raft->snap.data;
2562 }
2563
2564 while (raft->last_applied < raft->commit_index) {
2565 const struct raft_entry *e = raft_get_entry(raft,
2566 raft->last_applied + 1);
2567 if (e->data) {
2568 *eid = e->eid;
2569 return e->data;
2570 }
2571 raft->last_applied++;
2572 }
2573 return NULL;
2574}
2575
2576static const struct json *
2577raft_get_next_entry(struct raft *raft, struct uuid *eid)
2578{
2579 const struct json *data = raft_peek_next_entry(raft, eid);
2580 if (data) {
2581 raft->last_applied++;
2582 }
2583 return data;
2584}
2585
2586/* Updates the commit index in the raft log. Returns false if the commit
2587 * index is already up-to-date; otherwise updates it and returns true. */
2588static bool
2589raft_update_commit_index(struct raft *raft, uint64_t new_commit_index)
2590{
2591 if (new_commit_index <= raft->commit_index) {
2592 return false;
2593 }
2594
2595 if (raft->role == RAFT_LEADER) {
2596 while (raft->commit_index < new_commit_index) {
2597 uint64_t index = ++raft->commit_index;
2598 const struct raft_entry *e = raft_get_entry(raft, index);
2599 if (e->data) {
2600 struct raft_command *cmd
2601 = raft_find_command_by_index(raft, index);
2602 if (cmd) {
2603 raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
2604 }
2605 }
2606 if (e->servers) {
2607 /* raft_run_reconfigure() can write a new Raft entry, which can
2608 * reallocate raft->entries, which would invalidate 'e', so
2609 * this case must be last, after the one for 'e->data'. */
2610 raft_run_reconfigure(raft);
2611 }
2612 }
2613 } else {
2614 raft->commit_index = new_commit_index;
2615 }
2616
2617 /* Write the commit index to the log. The next time we restart, this
2618 * allows us to start exporting a reasonably fresh log, instead of a log
2619 * that only contains the snapshot. */
2620 struct raft_record r = {
2621 .type = RAFT_REC_COMMIT_INDEX,
2622 .commit_index = raft->commit_index,
2623 };
2624 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2625 return true;
2626}
2627
2628/* This doesn't use rq->entries (but it does use rq->n_entries). */
2629static void
2630raft_send_append_reply(struct raft *raft, const struct raft_append_request *rq,
2631 enum raft_append_result result, const char *comment)
2632{
2633 /* Figure 3.1: "If leaderCommit > commitIndex, set commitIndex =
2634 * min(leaderCommit, index of last new entry)" */
2635 if (result == RAFT_APPEND_OK && rq->leader_commit > raft->commit_index) {
2636 raft_update_commit_index(
2637 raft, MIN(rq->leader_commit, rq->prev_log_index + rq->n_entries));
2638 }
2639
2640 /* Send reply. */
2641 union raft_rpc reply = {
2642 .append_reply = {
2643 .common = {
2644 .type = RAFT_RPC_APPEND_REPLY,
2645 .sid = rq->common.sid,
2646 .comment = CONST_CAST(char *, comment),
2647 },
2648 .term = raft->term,
2649 .log_end = raft->log_end,
2650 .prev_log_index = rq->prev_log_index,
2651 .prev_log_term = rq->prev_log_term,
2652 .n_entries = rq->n_entries,
2653 .result = result,
2654 }
2655 };
2656 raft_send(raft, &reply);
2657}
2658
2659/* If 'prev_log_index' exists in 'raft''s log, in term 'prev_log_term', returns
2660 * NULL. Otherwise, returns an explanation for the mismatch. */
2661static const char *
2662match_index_and_term(const struct raft *raft,
2663 uint64_t prev_log_index, uint64_t prev_log_term)
2664{
2665 if (prev_log_index < raft->log_start - 1) {
2666 return "mismatch before start of log";
2667 } else if (prev_log_index == raft->log_start - 1) {
2668 if (prev_log_term != raft->snap.term) {
2669 return "prev_term mismatch";
2670 }
2671 } else if (prev_log_index < raft->log_end) {
2672 if (raft->entries[prev_log_index - raft->log_start].term
2673 != prev_log_term) {
2674 return "term mismatch";
2675 }
2676 } else {
2677 /* prev_log_index >= raft->log_end */
2678 return "mismatch past end of log";
2679 }
2680 return NULL;
2681}
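/* Worked example (editor's note, not in the original source): with
 * log_start == 10, log_end == 14, and snap.term == 3, a prev_log_index of 8
 * yields "mismatch before start of log"; (index 9, term 2) yields
 * "prev_term mismatch" because the snapshot's term is 3; (index 11, term t)
 * matches only if raft->entries[1].term == t; and index 14 or beyond yields
 * "mismatch past end of log". */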
2682
2683static void
2684raft_handle_append_entries(struct raft *raft,
2685 const struct raft_append_request *rq,
2686 uint64_t prev_log_index, uint64_t prev_log_term,
2687 const struct raft_entry *entries,
2688 unsigned int n_entries)
2689{
2690 /* Section 3.5: "When sending an AppendEntries RPC, the leader includes
2691 * the index and term of the entry in its log that immediately precedes
2692 * the new entries. If the follower does not find an entry in its log
2693 * with the same index and term, then it refuses the new entries." */
2694 const char *mismatch = match_index_and_term(raft, prev_log_index,
2695 prev_log_term);
2696 if (mismatch) {
2697 VLOG_INFO("rejecting append_request because previous entry "
2698 "%"PRIu64",%"PRIu64" not in local log (%s)",
2699 prev_log_term, prev_log_index, mismatch);
2700 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY, mismatch);
2701 return;
2702 }
2703
2704 /* Figure 3.1: "If an existing entry conflicts with a new one (same
2705 * index but different terms), delete the existing entry and all that
2706 * follow it." */
2707 unsigned int i;
2708 bool servers_changed = false;
2709 for (i = 0; ; i++) {
2710 if (i >= n_entries) {
2711 /* No change. */
2712 if (rq->common.comment
2713 && !strcmp(rq->common.comment, "heartbeat")) {
2714 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "heartbeat");
2715 } else {
2716 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
2717 }
2718 return;
2719 }
2720
2721 uint64_t log_index = (prev_log_index + 1) + i;
2722 if (log_index >= raft->log_end) {
2723 break;
2724 }
2725 if (raft->entries[log_index - raft->log_start].term
2726 != entries[i].term) {
2727 if (raft_truncate(raft, log_index)) {
2728 servers_changed = true;
2729 }
2730 break;
2731 }
2732 }
2733
2734 /* Figure 3.1: "Append any entries not already in the log." */
2735 struct ovsdb_error *error = NULL;
2736 bool any_written = false;
2737 for (; i < n_entries; i++) {
2738 const struct raft_entry *e = &entries[i];
2739 error = raft_write_entry(raft, e->term,
2740 json_nullable_clone(e->data), &e->eid,
2741 json_nullable_clone(e->servers));
2742 if (error) {
2743 break;
2744 }
2745 any_written = true;
2746 if (e->servers) {
2747 servers_changed = true;
2748 }
2749 }
2750
2751 if (any_written) {
2752 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index
2753 = raft->log_end - 1;
2754 }
2755 if (servers_changed) {
2756 /* The set of servers might have changed; check. */
2757 raft_get_servers_from_log(raft, VLL_INFO);
2758 }
2759
2760 if (error) {
2761 char *s = ovsdb_error_to_string_free(error);
2762 VLOG_ERR("%s", s);
2763 free(s);
2764 raft_send_append_reply(raft, rq, RAFT_APPEND_IO_ERROR, "I/O error");
2765 return;
2766 }
2767
2768 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "log updated");
2769}
2770
2771static bool
2772raft_update_leader(struct raft *raft, const struct uuid *sid)
2773{
2774 if (raft->role == RAFT_LEADER) {
2775 char buf[SID_LEN + 1];
2776 VLOG_ERR("this server is leader but server %s claims to be",
2777 raft_get_nickname(raft, sid, buf, sizeof buf));
2778 return false;
2779 } else if (!uuid_equals(sid, &raft->leader_sid)) {
2780 if (!uuid_is_zero(&raft->leader_sid)) {
2781 char buf1[SID_LEN + 1];
2782 char buf2[SID_LEN + 1];
2783 VLOG_ERR("leader for term %"PRIu64" changed from %s to %s",
2784 raft->term,
2785 raft_get_nickname(raft, &raft->leader_sid,
2786 buf1, sizeof buf1),
2787 raft_get_nickname(raft, sid, buf2, sizeof buf2));
2788 } else {
2789 char buf[SID_LEN + 1];
2790 VLOG_INFO("server %s is leader for term %"PRIu64,
2791 raft_get_nickname(raft, sid, buf, sizeof buf),
2792 raft->term);
2793 }
2794 raft->leader_sid = *sid;
2795
2796 /* Record the leader to the log. This is not used by the algorithm
2797 * (although it could be, for quick restart), but it is used for
2798 * offline analysis to check for conformance with the properties
2799 * that Raft guarantees. */
2800 struct raft_record r = {
2801 .type = RAFT_REC_LEADER,
2802 .term = raft->term,
2803 .sid = *sid,
2804 };
2805 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2806 }
2807 return true;
2808}
2809
2810static void
2811raft_handle_append_request(struct raft *raft,
2812 const struct raft_append_request *rq)
2813{
2814 /* We do not check whether the server that sent the request is part of the
2815 * cluster. As section 4.1 says, "A server accepts AppendEntries requests
2816 * from a leader that is not part of the server’s latest configuration.
2817 * Otherwise, a new server could never be added to the cluster (it would
2818 * never accept any log entries preceding the configuration entry that adds
2819 * the server)." */
2820 if (!raft_update_leader(raft, &rq->common.sid)) {
2821 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
2822 "usurped leadership");
2823 return;
2824 }
2825 raft_reset_election_timer(raft);
2826
2827 /* First check for the common case, where the AppendEntries request is
2828 * entirely for indexes covered by 'log_start' ... 'log_end - 1', something
2829 * like this:
2830 *
2831 * rq->prev_log_index
2832 * | first_entry_index
2833 * | | nth_entry_index
2834 * | | |
2835 * v v v
2836 * +---+---+---+---+
2837 * T | T | T | T | T |
2838 * +---+-------+---+
2839 * +---+---+---+---+
2840 * T | T | T | T | T |
2841 * +---+---+---+---+
2842 * ^ ^
2843 * | |
2844 * log_start log_end
2845 * */
2846 uint64_t first_entry_index = rq->prev_log_index + 1;
2847 uint64_t nth_entry_index = rq->prev_log_index + rq->n_entries;
2848 if (OVS_LIKELY(first_entry_index >= raft->log_start)) {
2849 raft_handle_append_entries(raft, rq,
2850 rq->prev_log_index, rq->prev_log_term,
2851 rq->entries, rq->n_entries);
2852 return;
2853 }
2854
2855 /* Now a series of checks for odd cases, where the AppendEntries request
2856 * extends earlier than the beginning of our log, into the log entries
2857 * discarded by the most recent snapshot. */
2858
2859 /*
2860 * Handle the case where the indexes covered by rq->entries[] are entirely
2861 * disjoint with 'log_start - 1' ... 'log_end - 1', as shown below. So,
2862 * everything in the AppendEntries request must already have been
2863 * committed, and we might as well return true.
2864 *
2865 * rq->prev_log_index
2866 * | first_entry_index
2867 * | | nth_entry_index
2868 * | | |
2869 * v v v
2870 * +---+---+---+---+
2871 * T | T | T | T | T |
2872 * +---+-------+---+
2873 * +---+---+---+---+
2874 * T | T | T | T | T |
2875 * +---+---+---+---+
2876 * ^ ^
2877 * | |
2878 * log_start log_end
2879 */
2880 if (nth_entry_index < raft->log_start - 1) {
2881 raft_send_append_reply(raft, rq, RAFT_APPEND_OK,
2882 "append before log start");
2883 return;
2884 }
2885
2886 /*
2887 * Handle the case where the last entry in rq->entries[] has the same index
2888 * as 'log_start - 1', so we can compare their terms:
2889 *
2890 * rq->prev_log_index
2891 * | first_entry_index
2892 * | | nth_entry_index
2893 * | | |
2894 * v v v
2895 * +---+---+---+---+
2896 * T | T | T | T | T |
2897 * +---+-------+---+
2898 * +---+---+---+---+
2899 * T | T | T | T | T |
2900 * +---+---+---+---+
2901 * ^ ^
2902 * | |
2903 * log_start log_end
2904 *
2905 * There's actually a sub-case where rq->n_entries == 0, in which we
2906 * compare rq->prev_log_term:
2907 *
2908 * rq->prev_log_index
2909 * |
2910 * |
2911 * |
2912 * v
2913 * T
2914 *
2915 * +---+---+---+---+
2916 * T | T | T | T | T |
2917 * +---+---+---+---+
2918 * ^ ^
2919 * | |
2920 * log_start log_end
2921 */
2922 if (nth_entry_index == raft->log_start - 1) {
2923 if (rq->n_entries
2924 ? raft->snap.term == rq->entries[rq->n_entries - 1].term
2925 : raft->snap.term == rq->prev_log_term) {
2926 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
2927 } else {
2928 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
2929 "term mismatch");
2930 }
2931 return;
2932 }
2933
2934 /*
2935 * We now know that the data in rq->entries[] overlaps the data in
2936 * raft->entries[], as shown below, with some positive 'ofs':
2937 *
2938 * rq->prev_log_index
2939 * | first_entry_index
2940 * | | nth_entry_index
2941 * | | |
2942 * v v v
2943 * +---+---+---+---+---+
2944 * T | T | T | T | T | T |
2945 * +---+-------+---+---+
2946 * +---+---+---+---+
2947 * T | T | T | T | T |
2948 * +---+---+---+---+
2949 * ^ ^
2950 * | |
2951 * log_start log_end
2952 *
2953 * |<-- ofs -->|
2954 *
2955 * We transform this into the following by trimming the first 'ofs'
2956 * elements off of rq->entries[], ending up with the following. Notice how
2957 * we retain the term but not the data for rq->entries[ofs - 1]:
2958 *
2959 * first_entry_index + ofs - 1
2960 * | first_entry_index + ofs
2961 * | | nth_entry_index + ofs
2962 * | | |
2963 * v v v
2964 * +---+---+
2965 * T | T | T |
2966 * +---+---+
2967 * +---+---+---+---+
2968 * T | T | T | T | T |
2969 * +---+---+---+---+
2970 * ^ ^
2971 * | |
2972 * log_start log_end
2973 */
2974 uint64_t ofs = raft->log_start - first_entry_index;
2975 raft_handle_append_entries(raft, rq,
2976 raft->log_start - 1, rq->entries[ofs - 1].term,
2977 &rq->entries[ofs], rq->n_entries - ofs);
2978}
2979
2980/* Returns true if 'raft' has another log entry or snapshot to read. */
2981bool
2982raft_has_next_entry(const struct raft *raft_)
2983{
2984 struct raft *raft = CONST_CAST(struct raft *, raft_);
2985 struct uuid eid;
2986 return raft_peek_next_entry(raft, &eid) != NULL;
2987}
2988
2989/* Returns the next log entry or snapshot from 'raft', or NULL if there are
2990 * none left to read. Stores the entry ID of the log entry in '*eid'. Stores
2991 * true in '*is_snapshot' if the returned data is a snapshot, false if it is a
2992 * log entry. */
2993const struct json *
2994raft_next_entry(struct raft *raft, struct uuid *eid, bool *is_snapshot)
2995{
2996 const struct json *data = raft_get_next_entry(raft, eid);
2997 *is_snapshot = data == raft->snap.data;
2998 return data;
2999}
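/* Illustrative sketch (editor's addition, not in the original source): a
 * reader drains committed-but-unapplied data like this, applying each item
 * to its copy of the state machine:
 *
 *     while (raft_has_next_entry(raft)) {
 *         struct uuid eid;
 *         bool is_snapshot;
 *         const struct json *data = raft_next_entry(raft, &eid,
 *                                                   &is_snapshot);
 *         ...if 'is_snapshot', replace the state machine's contents with
 *            'data'; otherwise apply 'data' as an incremental change...
 *     }
 */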
3000
3001/* Returns the log index of the last-read snapshot or log entry. */
3002uint64_t
3003raft_get_applied_index(const struct raft *raft)
3004{
3005 return raft->last_applied;
3006}
3007
3008/* Returns the log index of the last snapshot or log entry that is available to
3009 * be read. */
3010uint64_t
3011raft_get_commit_index(const struct raft *raft)
3012{
3013 return raft->commit_index;
3014}
3015
3016static struct raft_server *
3017raft_find_peer(struct raft *raft, const struct uuid *uuid)
3018{
3019 struct raft_server *s = raft_find_server(raft, uuid);
3020 return s && !uuid_equals(&raft->sid, &s->sid) ? s : NULL;
3021}
3022
3023static struct raft_server *
3024raft_find_new_server(struct raft *raft, const struct uuid *uuid)
3025{
3026 return raft_server_find(&raft->add_servers, uuid);
3027}
3028
3029/* Figure 3.1: "If there exists an N such that N > commitIndex, a
3030 * majority of matchIndex[i] >= N, and log[N].term == currentTerm, set
3031 * commitIndex = N (sections 3.5 and 3.6)." */
3032static void
3033raft_consider_updating_commit_index(struct raft *raft)
3034{
3035 /* This loop cannot just bail out when it comes across a log entry that
3036 * does not match the criteria. For example, Figure 3.7(d2) shows a
3037 * case where the log entry for term 2 cannot be committed directly
3038 * (because it is not for the current term) but it can be committed as
3039 * a side effect of committing the entry for term 4 (the current term).
3040 * XXX Is there a more efficient way to do this? */
3041 ovs_assert(raft->role == RAFT_LEADER);
3042
3043 uint64_t new_commit_index = raft->commit_index;
3044 for (uint64_t idx = MAX(raft->commit_index + 1, raft->log_start);
3045 idx < raft->log_end; idx++) {
3046 if (raft->entries[idx - raft->log_start].term == raft->term) {
3047 size_t count = 0;
3048 struct raft_server *s2;
3049 HMAP_FOR_EACH (s2, hmap_node, &raft->servers) {
3050 if (s2->match_index >= idx) {
3051 count++;
3052 }
3053 }
3054 if (count > hmap_count(&raft->servers) / 2) {
3055 VLOG_DBG("index %"PRIu64" committed to %"PRIuSIZE" servers, "
3056 "applying", idx, count);
3057 new_commit_index = idx;
3058 }
3059 }
3060 }
3061 if (raft_update_commit_index(raft, new_commit_index)) {
3062 raft_send_heartbeats(raft);
3063 }
3064}
3065
3066static void
3067raft_update_match_index(struct raft *raft, struct raft_server *s,
3068 uint64_t min_index)
3069{
3070 ovs_assert(raft->role == RAFT_LEADER);
3071 if (min_index > s->match_index) {
3072 s->match_index = min_index;
3073 raft_consider_updating_commit_index(raft);
3074 }
3075}
3076
3077static void
3078raft_update_our_match_index(struct raft *raft, uint64_t min_index)
3079{
3080 struct raft_server *server = raft_find_server(raft, &raft->sid);
3081 if (server) {
3082 raft_update_match_index(raft, server, min_index);
3083 }
3084}
3085
3086static void
3087raft_send_install_snapshot_request(struct raft *raft,
3088 const struct raft_server *s,
3089 const char *comment)
3090{
3091 union raft_rpc rpc = {
3092 .install_snapshot_request = {
3093 .common = {
3094 .type = RAFT_RPC_INSTALL_SNAPSHOT_REQUEST,
3095 .sid = s->sid,
3096 .comment = CONST_CAST(char *, comment),
3097 },
3098 .term = raft->term,
3099 .last_index = raft->log_start - 1,
3100 .last_term = raft->snap.term,
3101 .last_servers = raft->snap.servers,
3102 .last_eid = raft->snap.eid,
3103 .data = raft->snap.data,
3104 }
3105 };
3106 raft_send(raft, &rpc);
3107}
3108
3109static void
3110raft_handle_append_reply(struct raft *raft,
3111 const struct raft_append_reply *rpy)
3112{
3113 if (raft->role != RAFT_LEADER) {
3114 VLOG_INFO("rejected append_reply (not leader)");
3115 return;
3116 }
3117
3118 /* Most commonly we'd be getting an AppendEntries reply from a configured
3119 * server (e.g. a peer), but we can also get them from servers in the
3120 * process of being added. */
3121 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3122 if (!s) {
3123 s = raft_find_new_server(raft, &rpy->common.sid);
3124 if (!s) {
3125 VLOG_INFO("rejected append_reply from unknown server "SID_FMT,
3126 SID_ARGS(&rpy->common.sid));
3127 return;
3128 }
3129 }
3130
3131 if (rpy->result == RAFT_APPEND_OK) {
3132 /* Figure 3.1: "If successful, update nextIndex and matchIndex for
3133 * follower (section 3.5)." */
3134 uint64_t min_index = rpy->prev_log_index + rpy->n_entries + 1;
3135 if (s->next_index < min_index) {
3136 s->next_index = min_index;
3137 }
3138 raft_update_match_index(raft, s, min_index - 1);
3139 } else {
3140 /* Figure 3.1: "If AppendEntries fails because of log inconsistency,
3141 * decrement nextIndex and retry (section 3.5)."
3142 *
3143 * We also implement the optimization suggested in section 4.2.1:
3144 * "Various approaches can make nextIndex converge to its correct value
3145 * more quickly, including those described in Chapter 3. The simplest
3146 * approach to solving this particular problem of adding a new server,
3147 * however, is to have followers return the length of their logs in the
3148 * AppendEntries response; this allows the leader to cap the follower’s
3149 * nextIndex accordingly." */
3150 s->next_index = (s->next_index > 0
3151 ? MIN(s->next_index - 1, rpy->log_end)
3152 : 0);
3153
3154 if (rpy->result == RAFT_APPEND_IO_ERROR) {
3155 /* Append failed but not because of a log inconsistency. Because
3156 * of the I/O error, there's no point in re-sending the append
3157 * immediately. */
3158 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3159 VLOG_INFO_RL(&rl, "%s reported I/O error", s->nickname);
3160 return;
3161 }
3162 }
3163
3164 /*
3165 * Our behavior here must depend on the value of next_index relative to
3166 * log_start and log_end. There are three cases:
3167 *
3168 * Case 1 | Case 2 | Case 3
3169 * <---------------->|<------------->|<------------------>
3170 * | |
3171 *
3172 * +---+---+---+---+
3173 * T | T | T | T | T |
3174 * +---+---+---+---+
3175 * ^ ^
3176 * | |
3177 * log_start log_end
3178 */
3179 if (s->next_index < raft->log_start) {
3180 /* Case 1. */
3181 raft_send_install_snapshot_request(raft, s, NULL);
3182 } else if (s->next_index < raft->log_end) {
3183 /* Case 2. */
3184 raft_send_append_request(raft, s, 1, NULL);
3185 } else {
3186 /* Case 3. */
3187 if (s->phase == RAFT_PHASE_CATCHUP) {
3188 s->phase = RAFT_PHASE_CAUGHT_UP;
3189 raft_run_reconfigure(raft);
3190 }
3191 }
3192}
3193
3194static bool
3195raft_should_suppress_disruptive_server(struct raft *raft,
3196 const union raft_rpc *rpc)
3197{
3198 if (rpc->type != RAFT_RPC_VOTE_REQUEST) {
3199 return false;
3200 }
3201
3202 /* Section 4.2.3 "Disruptive Servers" says:
3203 *
3204 * ...if a server receives a RequestVote request within the minimum
3205 * election timeout of hearing from a current leader, it does not update
3206 * its term or grant its vote...
3207 *
3208 * ...This change conflicts with the leadership transfer mechanism as
3209 * described in Chapter 3, in which a server legitimately starts an
3210 * election without waiting an election timeout. In that case,
3211 * RequestVote messages should be processed by other servers even when
3212 * they believe a current cluster leader exists. Those RequestVote
3213 * requests can include a special flag to indicate this behavior (“I
3214 * have permission to disrupt the leader--it told me to!”).
3215 *
3216 * This clearly describes how the followers should act, but not the leader.
3217 * We just ignore vote requests that arrive at a current leader. This
3218 * seems to be fairly safe, since a majority other than the current leader
3219 * can still elect a new leader and the first AppendEntries from that new
3220 * leader will depose the current leader. */
3221 const struct raft_vote_request *rq = raft_vote_request_cast(rpc);
3222 if (rq->leadership_transfer) {
3223 return false;
3224 }
3225
3226 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3227 long long int now = time_msec();
3228 switch (raft->role) {
3229 case RAFT_LEADER:
3230 VLOG_WARN_RL(&rl, "ignoring vote request received as leader");
3231 return true;
3232
3233 case RAFT_FOLLOWER:
3234 if (now < raft->election_base + ELECTION_BASE_MSEC) {
3235 VLOG_WARN_RL(&rl, "ignoring vote request received after only "
3236 "%lld ms (minimum election time is %d ms)",
3237 now - raft->election_base, ELECTION_BASE_MSEC);
3238 return true;
3239 }
3240 return false;
3241
3242 case RAFT_CANDIDATE:
3243 return false;
3244
3245 default:
3246 OVS_NOT_REACHED();
3247 }
3248}
3249
3250/* Returns true if a reply should be sent. */
3251static bool
3252raft_handle_vote_request__(struct raft *raft,
3253 const struct raft_vote_request *rq)
3254{
3255 /* Figure 3.1: "If votedFor is null or candidateId, and candidate's log is
3256 * at least as up-to-date as receiver's log, grant vote (sections 3.4,
3257 * 3.6)." */
3258 if (uuid_equals(&raft->vote, &rq->common.sid)) {
3259 /* Already voted for this candidate in this term. Resend vote. */
3260 return true;
3261 } else if (!uuid_is_zero(&raft->vote)) {
3262 /* Already voted for different candidate in this term. Send a reply
3263 * saying what candidate we did vote for. This isn't a necessary part
3264 * of the Raft protocol but it can make debugging easier. */
3265 return true;
3266 }
3267
3268 /* Section 3.6.1: "The RequestVote RPC implements this restriction: the RPC
3269 * includes information about the candidate’s log, and the voter denies its
3270 * vote if its own log is more up-to-date than that of the candidate. Raft
3271 * determines which of two logs is more up-to-date by comparing the index
3272 * and term of the last entries in the logs. If the logs have last entries
3273 * with different terms, then the log with the later term is more
3274 * up-to-date. If the logs end with the same term, then whichever log is
3275 * longer is more up-to-date." */
3276 uint64_t last_term = (raft->log_end > raft->log_start
3277 ? raft->entries[raft->log_end - 1
3278 - raft->log_start].term
3279 : raft->snap.term);
3280 if (last_term > rq->last_log_term
3281 || (last_term == rq->last_log_term
3282 && raft->log_end - 1 > rq->last_log_index)) {
3283 /* Our log is more up-to-date than the peer's. Withhold vote. */
3284 return false;
3285 }
3286
3287 /* Record a vote for the peer. */
3288 if (!raft_set_term(raft, raft->term, &rq->common.sid)) {
3289 return false;
3290 }
3291
3292 raft_reset_election_timer(raft);
3293
3294 return true;
3295}
3296
3297static void
3298raft_send_vote_reply(struct raft *raft, const struct uuid *dst,
3299 const struct uuid *vote)
3300{
3301 union raft_rpc rpy = {
3302 .vote_reply = {
3303 .common = {
3304 .type = RAFT_RPC_VOTE_REPLY,
3305 .sid = *dst,
3306 },
3307 .term = raft->term,
3308 .vote = *vote,
3309 },
3310 };
3311 raft_send(raft, &rpy);
3312}
3313
3314static void
3315raft_handle_vote_request(struct raft *raft,
3316 const struct raft_vote_request *rq)
3317{
3318 if (raft_handle_vote_request__(raft, rq)) {
3319 raft_send_vote_reply(raft, &rq->common.sid, &raft->vote);
3320 }
3321}
3322
3323static void
3324raft_handle_vote_reply(struct raft *raft,
3325 const struct raft_vote_reply *rpy)
3326{
3327 if (!raft_receive_term__(raft, &rpy->common, rpy->term)) {
3328 return;
3329 }
3330
3331 if (raft->role != RAFT_CANDIDATE) {
3332 return;
3333 }
3334
3335 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3336 if (s) {
3337 raft_accept_vote(raft, s, &rpy->vote);
3338 }
3339}
3340
3341/* Returns true if 'raft''s log contains reconfiguration entries that have not
3342 * yet been committed. */
3343static bool
3344raft_has_uncommitted_configuration(const struct raft *raft)
3345{
3346 for (uint64_t i = raft->commit_index + 1; i < raft->log_end; i++) {
3347 ovs_assert(i >= raft->log_start);
3348 const struct raft_entry *e = &raft->entries[i - raft->log_start];
3349 if (e->servers) {
3350 return true;
3351 }
3352 }
3353 return false;
3354}
3355
3356static void
3357raft_log_reconfiguration(struct raft *raft)
3358{
3359 struct json *servers_json = raft_servers_to_json(&raft->servers);
3360 raft_command_unref(raft_command_execute__(
3361 raft, NULL, servers_json, NULL, NULL));
3362 json_destroy(servers_json);
3363}
3364
3365static void
3366raft_run_reconfigure(struct raft *raft)
3367{
3368 ovs_assert(raft->role == RAFT_LEADER);
3369
3370 /* Reconfiguration only progresses when configuration changes commit. */
3371 if (raft_has_uncommitted_configuration(raft)) {
3372 return;
3373 }
3374
3375 /* If we were waiting for a configuration change to commit, it's done. */
3376 struct raft_server *s;
3377 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3378 if (s->phase == RAFT_PHASE_COMMITTING) {
3379 raft_send_add_server_reply__(raft, &s->sid, s->address,
3380 true, RAFT_SERVER_COMPLETED);
3381 s->phase = RAFT_PHASE_STABLE;
3382 }
3383 }
3384 if (raft->remove_server) {
3385 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
3386 &raft->remove_server->requester_sid,
3387 raft->remove_server->requester_conn,
3388 true, RAFT_SERVER_COMPLETED);
3389 raft_server_destroy(raft->remove_server);
3390 raft->remove_server = NULL;
3391 }
3392
3393 /* If a new server is caught up, add it to the configuration. */
3394 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
3395 if (s->phase == RAFT_PHASE_CAUGHT_UP) {
3396 /* Move 's' from 'raft->add_servers' to 'raft->servers'. */
3397 hmap_remove(&raft->add_servers, &s->hmap_node);
3398 hmap_insert(&raft->servers, &s->hmap_node, uuid_hash(&s->sid));
3399
3400 /* Mark 's' as waiting for commit. */
3401 s->phase = RAFT_PHASE_COMMITTING;
3402
3403 raft_log_reconfiguration(raft);
3404
3405 /* When commit completes we'll transition to RAFT_PHASE_STABLE and
3406 * send a RAFT_SERVER_OK reply. */
3407
3408 return;
3409 }
3410 }
3411
3412 /* Remove a server, if one is scheduled for removal. */
3413 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3414 if (s->phase == RAFT_PHASE_REMOVE) {
3415 hmap_remove(&raft->servers, &s->hmap_node);
3416 raft->remove_server = s;
3417
3418 raft_log_reconfiguration(raft);
3419
3420 return;
3421 }
3422 }
3423}
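/* Editor's summary (not in the original source): together with
 * raft_handle_append_reply(), the phases used above progress as follows for
 * a server being added:
 *
 *     RAFT_PHASE_CATCHUP        (leader replicating log or snapshot to it)
 *      -> RAFT_PHASE_CAUGHT_UP  (log replicated; awaiting reconfiguration)
 *      -> RAFT_PHASE_COMMITTING (configuration entry appended, uncommitted)
 *      -> RAFT_PHASE_STABLE     (configuration committed; reply sent)
 *
 * A server being removed instead goes RAFT_PHASE_STABLE ->
 * RAFT_PHASE_REMOVE, then moves out of 'raft->servers' once its removal is
 * logged. */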
3424
3425static void
3426raft_handle_add_server_request(struct raft *raft,
3427 const struct raft_add_server_request *rq)
3428{
3429 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3430 if (raft->role != RAFT_LEADER) {
3431 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3432 return;
3433 }
3434
3435 /* Check for an existing server. */
3436 struct raft_server *s = raft_find_server(raft, &rq->common.sid);
3437 if (s) {
3438 /* If the server is scheduled to be removed, cancel it. */
3439 if (s->phase == RAFT_PHASE_REMOVE) {
3440 s->phase = RAFT_PHASE_STABLE;
3441 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_CANCELED);
3442 return;
3443 }
3444
3445 /* If the server is being added, then it's in progress. */
3446 if (s->phase != RAFT_PHASE_STABLE) {
3447 raft_send_add_server_reply(raft, rq,
3448 false, RAFT_SERVER_IN_PROGRESS);
3449 }
3450
3451 /* Nothing to do--server is already part of the configuration. */
3452 raft_send_add_server_reply(raft, rq,
3453 true, RAFT_SERVER_ALREADY_PRESENT);
3454 return;
3455 }
3456
3457 /* Check for a server being removed. */
3458 if (raft->remove_server
3459 && uuid_equals(&rq->common.sid, &raft->remove_server->sid)) {
3460 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3461 return;
3462 }
3463
3464 /* Check for a server already being added. */
3465 if (raft_find_new_server(raft, &rq->common.sid)) {
3466 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_IN_PROGRESS);
3467 return;
3468 }
3469
3470 /* Add server to 'add_servers'. */
3471 s = raft_server_add(&raft->add_servers, &rq->common.sid, rq->address);
3472 raft_server_init_leader(raft, s);
3473 s->requester_sid = rq->common.sid;
3474 s->requester_conn = NULL;
3475 s->phase = RAFT_PHASE_CATCHUP;
3476
3477 /* Start sending the log. If this is the first time we've tried to add
3478 * this server, then this will quickly degenerate into an InstallSnapshot
3479 * followed by a series of AppendEntries, but if it's a retry of an earlier
3480 * AddRequest that was interrupted (e.g. by a timeout or a loss of
3481 * leadership) then it will gracefully resume populating the log.
3482 *
3483 * See the last few paragraphs of section 4.2.1 for further insight. */
3484 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
3485 VLOG_INFO_RL(&rl,
3486 "starting to add server %s ("SID_FMT" at %s) "
3487 "to cluster "CID_FMT, s->nickname, SID_ARGS(&s->sid),
3488 rq->address, CID_ARGS(&raft->cid));
3489 raft_send_append_request(raft, s, 0, "initialize new server");
3490}
3491
3492static void
3493raft_handle_add_server_reply(struct raft *raft,
3494 const struct raft_add_server_reply *rpy)
3495{
3496 if (!raft->joining) {
3497 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3498 VLOG_WARN_RL(&rl, "received add_server_reply even though we're "
3499 "already part of the cluster");
3500 return;
3501 }
3502
3503 if (rpy->success) {
3504 raft->joining = false;
3505
3506 /* It is tempting, at this point, to check that this server is part of
3507 * the current configuration. However, this is not necessarily the
3508 * case, because the log entry that added this server to the cluster
3509 * might have been committed by a majority of the cluster that does not
3510 * include this one. This actually happens in testing. */
3511 } else {
3512 const char *address;
3513 SSET_FOR_EACH (address, &rpy->remote_addresses) {
3514 if (sset_add(&raft->remote_addresses, address)) {
3515 VLOG_INFO("%s: learned new server address for joining cluster",
3516 address);
3517 }
3518 }
3519 }
3520}
3521
3522/* This is called by raft_unixctl_kick() as well as via RPC. */
3523static void
3524raft_handle_remove_server_request(struct raft *raft,
3525 const struct raft_remove_server_request *rq)
3526{
3527 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3528 if (raft->role != RAFT_LEADER) {
3529 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3530 return;
3531 }
3532
3533 /* If the server to remove is currently waiting to be added, cancel it. */
3534 struct raft_server *target = raft_find_new_server(raft, &rq->sid);
3535 if (target) {
3536 raft_send_add_server_reply__(raft, &target->sid, target->address,
3537 false, RAFT_SERVER_CANCELED);
3538 hmap_remove(&raft->add_servers, &target->hmap_node);
3539 raft_server_destroy(target);
3540 return;
3541 }
3542
3543 /* If the server isn't configured, report that. */
3544 target = raft_find_server(raft, &rq->sid);
3545 if (!target) {
3546 raft_send_remove_server_reply(raft, rq,
3547 true, RAFT_SERVER_ALREADY_GONE);
3548 return;
3549 }
3550
3551 /* Check whether we're waiting for the addition of the server to commit. */
3552 if (target->phase == RAFT_PHASE_COMMITTING) {
3553 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3554 return;
3555 }
3556
3557 /* Check whether the server is already scheduled for removal. */
3558 if (target->phase == RAFT_PHASE_REMOVE) {
3559 raft_send_remove_server_reply(raft, rq,
3560 false, RAFT_SERVER_IN_PROGRESS);
3561 return;
3562 }
3563
3564 /* Make sure that if we remove this server, at least one other server
3565 * will be left. We don't count servers currently being added (in
3566 * 'add_servers') since those could fail. */
3567 struct raft_server *s;
3568 int n = 0;
3569 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3570 if (s != target && s->phase != RAFT_PHASE_REMOVE) {
3571 n++;
3572 }
3573 }
3574 if (!n) {
3575 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_EMPTY);
3576 return;
3577 }
3578
3579 /* Mark the server for removal. */
3580 target->phase = RAFT_PHASE_REMOVE;
3581 if (rq->requester_conn) {
3582 target->requester_sid = UUID_ZERO;
3583 unixctl_command_reply(rq->requester_conn, "started removal");
3584 } else {
3585 target->requester_sid = rq->common.sid;
3586 target->requester_conn = NULL;
3587 }
3588
3589 raft_run_reconfigure(raft);
3590 /* Operation in progress, reply will be sent later. */
3591}
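/* Worked example for the emptiness check above: in a three-server cluster
 * where one other server is already in RAFT_PHASE_REMOVE, removing 'target'
 * leaves n == 1, so the request proceeds; only when no other stable server
 * would remain (n == 0) does the leader reply RAFT_SERVER_EMPTY. */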
3592
3593static void
3594raft_finished_leaving_cluster(struct raft *raft)
3595{
3596 VLOG_INFO(SID_FMT": finished leaving cluster "CID_FMT,
3597 SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
3598
3599 raft_record_note(raft, "left", "this server left the cluster");
3600
3601 raft->leaving = false;
3602 raft->left = true;
3603}
3604
3605static void
3606raft_handle_remove_server_reply(struct raft *raft,
3607 const struct raft_remove_server_reply *rpc)
3608{
3609 if (rpc->success
3610 && (uuid_is_zero(&rpc->target_sid)
3611 || uuid_equals(&rpc->target_sid, &raft->sid))) {
3612 raft_finished_leaving_cluster(raft);
3613 }
3614}
3615
3616static bool
3617raft_handle_write_error(struct raft *raft, struct ovsdb_error *error)
3618{
3619 if (error && !raft->failed) {
3620 raft->failed = true;
3621
3622 char *s = ovsdb_error_to_string_free(error);
3623 VLOG_WARN("%s: entering failure mode due to I/O error (%s)",
3624 raft->name, s);
3625 free(s);
3626 }
3627 return !raft->failed;
3628}
3629
3630static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3631raft_write_snapshot(struct raft *raft, struct ovsdb_log *log,
3632 uint64_t new_log_start,
3633 const struct raft_entry *new_snapshot)
3634{
3635 struct raft_header h = {
3636 .sid = raft->sid,
3637 .cid = raft->cid,
3638 .name = raft->name,
3639 .local_address = raft->local_address,
3640 .snap_index = new_log_start - 1,
3641 .snap = *new_snapshot,
3642 };
3643 struct ovsdb_error *error = ovsdb_log_write_and_free(
3644 log, raft_header_to_json(&h));
3645 if (error) {
3646 return error;
3647 }
3648 ovsdb_log_mark_base(raft->log);
3649
3650 /* Write log records. */
3651 for (uint64_t index = new_log_start; index < raft->log_end; index++) {
3652 const struct raft_entry *e = &raft->entries[index - raft->log_start];
3653 struct raft_record r = {
3654 .type = RAFT_REC_ENTRY,
3655 .term = e->term,
3656 .entry = {
3657 .index = index,
3658 .data = e->data,
3659 .servers = e->servers,
3660 .eid = e->eid,
3661 },
3662 };
3663 error = ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3664 if (error) {
3665 return error;
3666 }
3667 }
3668
3669 /* Write term and vote (if any).
3670 *
3671 * The term is redundant if we wrote a log record for that term above. The
3672 * vote, if any, is never redundant.
3673 */
3674 error = raft_write_state(log, raft->term, &raft->vote);
3675 if (error) {
3676 return error;
3677 }
3678
3679 /* Write commit_index if it's beyond the new start of the log. */
3680 if (raft->commit_index >= new_log_start) {
3681 struct raft_record r = {
3682 .type = RAFT_REC_COMMIT_INDEX,
3683 .commit_index = raft->commit_index,
3684 };
3685 return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3686 }
3687 return NULL;
3688}
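/* A worked example of the layout written above, assuming new_log_start == 11
 * and log_end == 13: the replacement log holds, in order, a raft_header with
 * snap_index == 10, RAFT_REC_ENTRY records for indexes 11 and 12, a
 * term/vote record, and, if commit_index >= 11, a RAFT_REC_COMMIT_INDEX
 * record. */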
3689
3690static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3691raft_save_snapshot(struct raft *raft,
3692 uint64_t new_start, const struct raft_entry *new_snapshot)
3693
3695 struct ovsdb_log *new_log;
3696 struct ovsdb_error *error;
3697 error = ovsdb_log_replace_start(raft->log, &new_log);
3698 if (error) {
3699 return error;
3700 }
3701
3702 error = raft_write_snapshot(raft, new_log, new_start, new_snapshot);
3703 if (error) {
3704 ovsdb_log_replace_abort(new_log);
3705 return error;
3706 }
3707
3708 return ovsdb_log_replace_commit(raft->log, new_log);
3709}
3710
3711static bool
3712raft_handle_install_snapshot_request__(
3713 struct raft *raft, const struct raft_install_snapshot_request *rq)
3714{
3715 raft_reset_election_timer(raft);
3716
3717 /*
3718 * Our behavior here depends on how new_log_start in the snapshot compares
3719 * to log_start and log_end. There are three cases:
3720 *
3721 * Case 1 | Case 2 | Case 3
3722 * <---------------->|<------------->|<------------------>
3723 * | |
3724 *
3725 * +---+---+---+---+
3726 * T | T | T | T | T |
3727 * +---+---+---+---+
3728 * ^ ^
3729 * | |
3730 * log_start log_end
3731 */
3732 uint64_t new_log_start = rq->last_index + 1;
3733 if (new_log_start < raft->log_start) {
3734 /* Case 1: The new snapshot covers less than our current one. Nothing
3735 * to do. */
3736 return true;
3737 } else if (new_log_start < raft->log_end) {
3738 /* Case 2: The new snapshot starts in the middle of our log. We could
3739 * discard the first 'new_log_start - raft->log_start' entries in the
3740 * log. But there's not much value in that, since snapshotting is
3741 * supposed to be a local decision. Just skip it. */
3742 return true;
3743 }
3744
3745 /* Case 3: The new snapshot starts past the end of our current log, so
3746 * discard all of our current log. */
3747 const struct raft_entry new_snapshot = {
3748 .term = rq->last_term,
3749 .data = rq->data,
3750 .eid = rq->last_eid,
3751 .servers = rq->last_servers,
3752 };
3753 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
3754 &new_snapshot);
3755 if (error) {
3756 char *error_s = ovsdb_error_to_string(error);
3757 VLOG_WARN("could not save snapshot: %s", error_s);
3758 free(error_s);
3759 return false;
3760 }
3761
3762 for (size_t i = 0; i < raft->log_end - raft->log_start; i++) {
3763 raft_entry_uninit(&raft->entries[i]);
3764 }
3765 raft->log_start = raft->log_end = new_log_start;
3766 raft->log_synced = raft->log_end - 1;
3767 raft->commit_index = raft->log_start - 1;
3768 if (raft->last_applied < raft->commit_index) {
3769 raft->last_applied = raft->log_start - 2;
3770 }
3771
3772 raft_entry_uninit(&raft->snap);
3773 raft_entry_clone(&raft->snap, &new_snapshot);
3774
3775 raft_get_servers_from_log(raft, VLL_INFO);
3776
3777 return true;
3778}
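/* A minimal sketch of the case analysis above as a standalone predicate;
 * the function name is hypothetical and nothing in this file calls it. */
static int
raft_install_snapshot_case_example(const struct raft *raft,
                                   uint64_t new_log_start)
{
    if (new_log_start < raft->log_start) {
        return 1;               /* Snapshot covers less than ours: ignore. */
    } else if (new_log_start < raft->log_end) {
        return 2;               /* Starts inside our log: also ignore. */
    } else {
        return 3;               /* Starts past log_end: replace our log. */
    }
}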
3779
3780static void
3781raft_handle_install_snapshot_request(
3782 struct raft *raft, const struct raft_install_snapshot_request *rq)
3783{
3784 if (raft_handle_install_snapshot_request__(raft, rq)) {
3785 union raft_rpc rpy = {
3786 .install_snapshot_reply = {
3787 .common = {
3788 .type = RAFT_RPC_INSTALL_SNAPSHOT_REPLY,
3789 .sid = rq->common.sid,
3790 },
3791 .term = raft->term,
3792 .last_index = rq->last_index,
3793 .last_term = rq->last_term,
3794 },
3795 };
3796 raft_send(raft, &rpy);
3797 }
3798}
3799
3800static void
3801raft_handle_install_snapshot_reply(
3802 struct raft *raft, const struct raft_install_snapshot_reply *rpy)
3803{
3804 /* We might get an InstallSnapshot reply from a configured server (e.g. a
3805 * peer) or a server in the process of being added. */
3806 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3807 if (!s) {
3808 s = raft_find_new_server(raft, &rpy->common.sid);
3809 if (!s) {
3810 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3811 VLOG_INFO_RL(&rl, "cluster "CID_FMT": received %s from "
3812 "unknown server "SID_FMT, CID_ARGS(&raft->cid),
3813 raft_rpc_type_to_string(rpy->common.type),
3814 SID_ARGS(&rpy->common.sid));
3815 return;
3816 }
3817 }
3818
3819 if (rpy->last_index != raft->log_start - 1 ||
3820 rpy->last_term != raft->snap.term) {
3821 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3822 VLOG_INFO_RL(&rl, "cluster "CID_FMT": server %s installed "
3823 "out-of-date snapshot, starting over",
3824 CID_ARGS(&raft->cid), s->nickname);
3825 raft_send_install_snapshot_request(raft, s,
3826 "installed obsolete snapshot");
3827 return;
3828 }
3829
3830 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
3831 VLOG_INFO_RL(&rl, "cluster "CID_FMT": installed snapshot on server %s "
3832 " up to %"PRIu64":%"PRIu64, CID_ARGS(&raft->cid),
3833 s->nickname, rpy->last_term, rpy->last_index);
3834 s->next_index = raft->log_end;
3835 raft_send_append_request(raft, s, 0, "snapshot installed");
3836}
3837
3838/* Returns true if 'raft' has grown enough since the last snapshot that
3839 * reducing the log to a snapshot would be valuable, false otherwise. */
3840bool
3841raft_grew_lots(const struct raft *raft)
3842{
3843 return ovsdb_log_grew_lots(raft->log);
3844}
3845
3846/* Returns the number of log entries that could be trimmed off the on-disk log
3847 * by snapshotting. */
3848uint64_t
3849raft_get_log_length(const struct raft *raft)
3850{
3851 return (raft->last_applied < raft->log_start
3852 ? 0
3853 : raft->last_applied - raft->log_start + 1);
3854}
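/* Example: with log_start == 10 and last_applied == 14, entries 10 through
 * 14 could be folded into a snapshot, so this returns 5. */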
3855
3856/* Returns true if taking a snapshot of 'raft', with raft_store_snapshot(), is
3857 * possible. */
3858bool
3859raft_may_snapshot(const struct raft *raft)
3860{
3861 return (!raft->joining
3862 && !raft->leaving
3863 && !raft->left
3864 && !raft->failed
3865 && raft->last_applied >= raft->log_start);
3866}
3867
3868/* Replaces the log for 'raft', up to the last log entry read, by
3869 * 'new_snapshot_data'. Returns NULL if successful, otherwise an error that
3870 * the caller must eventually free.
3871 *
3872 * This function can only succeed if raft_may_snapshot() returns true. It is
3873 * only valuable to call it if raft_get_log_length() is significant and
3874 * especially if raft_grew_lots() returns true. */
3875struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3876raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
3877{
3878 if (raft->joining) {
3879 return ovsdb_error(NULL,
3880 "cannot store a snapshot while joining cluster");
3881 } else if (raft->leaving) {
3882 return ovsdb_error(NULL,
3883 "cannot store a snapshot while leaving cluster");
3884 } else if (raft->left) {
3885 return ovsdb_error(NULL,
3886 "cannot store a snapshot after leaving cluster");
3887 } else if (raft->failed) {
3888 return ovsdb_error(NULL,
3889 "cannot store a snapshot following failure");
3890 }
3891
3892 if (raft->last_applied < raft->log_start) {
3893 return ovsdb_error(NULL, "not storing a duplicate snapshot");
3894 }
3895
3896 uint64_t new_log_start = raft->last_applied + 1;
3897 struct raft_entry new_snapshot = {
3898 .term = raft_get_term(raft, new_log_start - 1),
3899 .data = json_clone(new_snapshot_data),
3900 .eid = *raft_get_eid(raft, new_log_start - 1),
3901 .servers = json_clone(raft_servers_for_index(raft, new_log_start - 1)),
3902 };
3903 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
3904 &new_snapshot);
3905 if (error) {
3906 raft_entry_uninit(&new_snapshot);
3907 return error;
3908 }
3909
3910 raft->log_synced = raft->log_end - 1;
3911 raft_entry_uninit(&raft->snap);
3912 raft->snap = new_snapshot;
3913 for (size_t i = 0; i < new_log_start - raft->log_start; i++) {
3914 raft_entry_uninit(&raft->entries[i]);
3915 }
3916 memmove(&raft->entries[0], &raft->entries[new_log_start - raft->log_start],
3917 (raft->log_end - new_log_start) * sizeof *raft->entries);
3918 raft->log_start = new_log_start;
3919 return NULL;
3920}
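/* A hedged usage sketch: a typical caller gates snapshotting on both
 * possibility and value. The length threshold is arbitrary, for
 * illustration only; 'data' stands in for whatever JSON the application
 * produces as its snapshot. */
static void
raft_maybe_snapshot_example(struct raft *raft, const struct json *data)
{
    if (raft_may_snapshot(raft)
        && (raft_grew_lots(raft) || raft_get_log_length(raft) > 100)) {
        struct ovsdb_error *error = raft_store_snapshot(raft, data);
        if (error) {
            char *s = ovsdb_error_to_string_free(error);
            VLOG_WARN("snapshot failed (%s)", s);
            free(s);
        }
    }
}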
3921
3922static void
3923raft_handle_become_leader(struct raft *raft,
3924 const struct raft_become_leader *rq)
3925{
3926 if (raft->role == RAFT_FOLLOWER) {
3927 char buf[SID_LEN + 1];
3928 VLOG_INFO("received leadership transfer from %s in term %"PRIu64,
3929 raft_get_nickname(raft, &rq->common.sid, buf, sizeof buf),
3930 rq->term);
3931 raft_start_election(raft, true);
3932 }
3933}
3934
3935static void
3936raft_send_execute_command_reply(struct raft *raft,
3937 const struct uuid *sid,
3938 const struct uuid *eid,
3939 enum raft_command_status status,
3940 uint64_t commit_index)
3941{
3942 union raft_rpc rpc = {
3943 .execute_command_reply = {
3944 .common = {
3945 .type = RAFT_RPC_EXECUTE_COMMAND_REPLY,
3946 .sid = *sid,
3947 },
3948 .result = *eid,
3949 .status = status,
3950 .commit_index = commit_index,
3951 },
3952 };
3953 raft_send(raft, &rpc);
3954}
3955
3956static enum raft_command_status
3957raft_handle_execute_command_request__(
3958 struct raft *raft, const struct raft_execute_command_request *rq)
3959{
3960 if (raft->role != RAFT_LEADER) {
3961 return RAFT_CMD_NOT_LEADER;
3962 }
3963
3964 const struct uuid *current_eid = raft_current_eid(raft);
3965 if (!uuid_equals(&rq->prereq, current_eid)) {
3966 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3967 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
3968 "prerequisite "UUID_FMT" in execute_command_request",
3969 UUID_ARGS(current_eid), UUID_ARGS(&rq->prereq));
3970 return RAFT_CMD_BAD_PREREQ;
3971 }
3972
3973 struct raft_command *cmd = raft_command_initiate(raft, rq->data,
3974 NULL, &rq->result);
3975 cmd->sid = rq->common.sid;
3976
3977 enum raft_command_status status = cmd->status;
3978 if (status != RAFT_CMD_INCOMPLETE) {
3979 raft_command_unref(cmd);
3980 }
3981 return status;
3982}
3983
3984static void
3985raft_handle_execute_command_request(
3986 struct raft *raft, const struct raft_execute_command_request *rq)
3987{
3988 enum raft_command_status status
3989 = raft_handle_execute_command_request__(raft, rq);
3990 if (status != RAFT_CMD_INCOMPLETE) {
3991 raft_send_execute_command_reply(raft, &rq->common.sid, &rq->result,
3992 status, 0);
3993 }
3994}
3995
3996static void
3997raft_handle_execute_command_reply(
3998 struct raft *raft, const struct raft_execute_command_reply *rpy)
3999{
4000 struct raft_command *cmd = raft_find_command_by_eid(raft, &rpy->result);
4001 if (!cmd) {
4002 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
4003 char buf[SID_LEN + 1];
4004 VLOG_INFO_RL(&rl,
4005 "%s received \"%s\" reply from %s for unknown command",
4006 raft->local_nickname,
4007 raft_command_status_to_string(rpy->status),
4008 raft_get_nickname(raft, &rpy->common.sid,
4009 buf, sizeof buf));
4010 return;
4011 }
4012
4013 if (rpy->status == RAFT_CMD_INCOMPLETE) {
4014 cmd->timestamp = time_msec();
4015 } else {
4016 cmd->index = rpy->commit_index;
4017 raft_command_complete(raft, cmd, rpy->status);
4018 }
4019}
4020
4021static void
4022raft_handle_rpc(struct raft *raft, const union raft_rpc *rpc)
4023{
4024 uint64_t term = raft_rpc_get_term(rpc);
4025 if (term
4026 && !raft_should_suppress_disruptive_server(raft, rpc)
4027 && !raft_receive_term__(raft, &rpc->common, term)) {
4028 if (rpc->type == RAFT_RPC_APPEND_REQUEST) {
4029 /* Section 3.3: "If a server receives a request with a stale term
4030 * number, it rejects the request." */
4031 raft_send_append_reply(raft, raft_append_request_cast(rpc),
4032 RAFT_APPEND_INCONSISTENCY, "stale term");
4033 }
4034 return;
4035 }
4036
4037 switch (rpc->type) {
4038#define RAFT_RPC(ENUM, NAME) \
4039 case ENUM: \
4040 raft_handle_##NAME(raft, &rpc->NAME); \
4041 break;
4042 RAFT_RPC_TYPES
4043#undef RAFT_RPC
4044 default:
4045 OVS_NOT_REACHED();
4046 }
4047}
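/* For one member of RAFT_RPC_TYPES, the X-macro above expands to roughly
 * the following (illustrative):
 *
 *     case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST:
 *         raft_handle_install_snapshot_request(
 *             raft, &rpc->install_snapshot_request);
 *         break;
 */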
4048\f
4049static bool
4050raft_rpc_is_heartbeat(const union raft_rpc *rpc)
4051{
4052 return ((rpc->type == RAFT_RPC_APPEND_REQUEST
4053 || rpc->type == RAFT_RPC_APPEND_REPLY)
4054 && rpc->common.comment
4055 && !strcmp(rpc->common.comment, "heartbeat"));
4056}
4057
4058\f
4059static bool
4060raft_send_to_conn_at(struct raft *raft, const union raft_rpc *rpc,
4061 struct raft_conn *conn, int line_number)
4062{
4063 log_rpc(rpc, "-->", conn, line_number);
4064 return !jsonrpc_session_send(
4065 conn->js, raft_rpc_to_jsonrpc(&raft->cid, &raft->sid, rpc));
4066}
4067
4068static bool
4069raft_is_rpc_synced(const struct raft *raft, const union raft_rpc *rpc)
4070{
4071 uint64_t term = raft_rpc_get_term(rpc);
4072 uint64_t index = raft_rpc_get_min_sync_index(rpc);
4073 const struct uuid *vote = raft_rpc_get_vote(rpc);
4074
4075 return (term <= raft->synced_term
4076 && index <= raft->log_synced
4077 && (!vote || uuid_equals(vote, &raft->synced_vote)));
4078}
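/* Example of this gating: if the latest appended entry has index 42 but
 * only indexes through 41 have reached disk (log_synced == 41), an RPC
 * whose minimum sync index is 42 is not yet safe to send; raft_send_at()
 * below queues it on a RAFT_W_RPC waiter rather than transmitting it
 * immediately. */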
4079
4080static bool
4081raft_send_at(struct raft *raft, const union raft_rpc *rpc, int line_number)
4082{
4083 const struct uuid *dst = &rpc->common.sid;
4084 if (uuid_equals(dst, &raft->sid)) {
4085 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4086 VLOG_WARN_RL(&rl, "attempted to send RPC to self from raft.c:%d",
4087 line_number);
4088 return false;
4089 }
4090
4091 struct raft_conn *conn = raft_find_conn_by_sid(raft, dst);
4092 if (!conn) {
4093 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4094 char buf[SID_LEN + 1];
4095 VLOG_DBG_RL(&rl, "%s: no connection to %s, cannot send RPC "
4096 "from raft.c:%d", raft->local_nickname,
4097 raft_get_nickname(raft, dst, buf, sizeof buf),
4098 line_number);
4099 return false;
4100 }
4101
4102 if (!raft_is_rpc_synced(raft, rpc)) {
4103 raft_waiter_create(raft, RAFT_W_RPC, false)->rpc = raft_rpc_clone(rpc);
4104 return true;
4105 }
4106
4107 return raft_send_to_conn_at(raft, rpc, conn, line_number);
4108}
4109\f
4110static struct raft *
4111raft_lookup_by_name(const char *name)
4112{
4113 struct raft *raft;
4114
4115 HMAP_FOR_EACH_WITH_HASH (raft, hmap_node, hash_string(name, 0),
4116 &all_rafts) {
4117 if (!strcmp(raft->name, name)) {
4118 return raft;
4119 }
4120 }
4121 return NULL;
4122}
4123
4124static void
4125raft_unixctl_cid(struct unixctl_conn *conn,
4126 int argc OVS_UNUSED, const char *argv[],
4127 void *aux OVS_UNUSED)
4128{
4129 struct raft *raft = raft_lookup_by_name(argv[1]);
4130 if (!raft) {
4131 unixctl_command_reply_error(conn, "unknown cluster");
4132 } else if (uuid_is_zero(&raft->cid)) {
4133 unixctl_command_reply_error(conn, "cluster id not yet known");
4134 } else {
4135 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->cid));
4136 unixctl_command_reply(conn, uuid);
4137 free(uuid);
4138 }
4139}
4140
4141static void
4142raft_unixctl_sid(struct unixctl_conn *conn,
4143 int argc OVS_UNUSED, const char *argv[],
4144 void *aux OVS_UNUSED)
4145{
4146 struct raft *raft = raft_lookup_by_name(argv[1]);
4147 if (!raft) {
4148 unixctl_command_reply_error(conn, "unknown cluster");
4149 } else {
4150 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->sid));
4151 unixctl_command_reply(conn, uuid);
4152 free(uuid);
4153 }
4154}
4155
4156static void
4157raft_put_sid(const char *title, const struct uuid *sid,
4158 const struct raft *raft, struct ds *s)
4159{
4160 ds_put_format(s, "%s: ", title);
4161 if (uuid_equals(sid, &raft->sid)) {
4162 ds_put_cstr(s, "self");
4163 } else if (uuid_is_zero(sid)) {
4164 ds_put_cstr(s, "unknown");
4165 } else {
4166 char buf[SID_LEN + 1];
4167 ds_put_cstr(s, raft_get_nickname(raft, sid, buf, sizeof buf));
4168 }
4169 ds_put_char(s, '\n');
4170}
4171
4172static void
4173raft_unixctl_status(struct unixctl_conn *conn,
4174 int argc OVS_UNUSED, const char *argv[],
4175 void *aux OVS_UNUSED)
4176{
4177 struct raft *raft = raft_lookup_by_name(argv[1]);
4178 if (!raft) {
4179 unixctl_command_reply_error(conn, "unknown cluster");
4180 return;
4181 }
4182
4183 struct ds s = DS_EMPTY_INITIALIZER;
4184 ds_put_format(&s, "%s\n", raft->local_nickname);
4185 ds_put_format(&s, "Name: %s\n", raft->name);
4186 ds_put_format(&s, "Cluster ID: ");
4187 if (!uuid_is_zero(&raft->cid)) {
4188 ds_put_format(&s, CID_FMT" ("UUID_FMT")\n",
4189 CID_ARGS(&raft->cid), UUID_ARGS(&raft->cid));
4190 } else {
4191 ds_put_format(&s, "not yet known\n");
4192 }
4193 ds_put_format(&s, "Server ID: "SID_FMT" ("UUID_FMT")\n",
4194 SID_ARGS(&raft->sid), UUID_ARGS(&raft->sid));
4195 ds_put_format(&s, "Address: %s\n", raft->local_address);
4196 ds_put_format(&s, "Status: %s\n",
4197 raft->joining ? "joining cluster"
4198 : raft->leaving ? "leaving cluster"
4199 : raft->left ? "left cluster"
4200 : raft->failed ? "failed"
4201 : "cluster member");
4202 if (raft->joining) {
4203 ds_put_format(&s, "Remotes for joining:");
4204 const char *address;
4205 SSET_FOR_EACH (address, &raft->remote_addresses) {
4206 ds_put_format(&s, " %s", address);
4207 }
4208 ds_put_char(&s, '\n');
4209 }
4210 if (raft->role == RAFT_LEADER) {
4211 struct raft_server *as;
4212 HMAP_FOR_EACH (as, hmap_node, &raft->add_servers) {
4213 ds_put_format(&s, "Adding server %s ("SID_FMT" at %s) (%s)\n",
4214 as->nickname, SID_ARGS(&as->sid), as->address,
4215 raft_server_phase_to_string(as->phase));
4216 }
4217
4218 struct raft_server *rs = raft->remove_server;
4219 if (rs) {
4220 ds_put_format(&s, "Removing server %s ("SID_FMT" at %s) (%s)\n",
4221 rs->nickname, SID_ARGS(&rs->sid), rs->address,
4222 raft_server_phase_to_string(rs->phase));
4223 }
4224 }
4225
4226 ds_put_format(&s, "Role: %s\n",
4227 raft->role == RAFT_LEADER ? "leader"
4228 : raft->role == RAFT_CANDIDATE ? "candidate"
4229 : raft->role == RAFT_FOLLOWER ? "follower"
4230 : "<error>");
4231 ds_put_format(&s, "Term: %"PRIu64"\n", raft->term);
4232 raft_put_sid("Leader", &raft->leader_sid, raft, &s);
4233 raft_put_sid("Vote", &raft->vote, raft, &s);
4234 ds_put_char(&s, '\n');
4235
4236 ds_put_format(&s, "Log: [%"PRIu64", %"PRIu64"]\n",
4237 raft->log_start, raft->log_end);
4238
4239 uint64_t n_uncommitted = raft->log_end - raft->commit_index - 1;
4240 ds_put_format(&s, "Entries not yet committed: %"PRIu64"\n", n_uncommitted);
4241
4242 uint64_t n_unapplied = raft->log_end - raft->last_applied - 1;
4243 ds_put_format(&s, "Entries not yet applied: %"PRIu64"\n", n_unapplied);
4244
4245 const struct raft_conn *c;
4246 ds_put_cstr(&s, "Connections:");
4247 LIST_FOR_EACH (c, list_node, &raft->conns) {
4248 bool connected = jsonrpc_session_is_connected(c->js);
4249 ds_put_format(&s, " %s%s%s%s",
4250 connected ? "" : "(",
4251 c->incoming ? "<-" : "->", c->nickname,
4252 connected ? "" : ")");
4253 }
4254 ds_put_char(&s, '\n');
4255
4256 ds_put_cstr(&s, "Servers:\n");
4257 struct raft_server *server;
4258 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
4259 ds_put_format(&s, " %s ("SID_FMT" at %s)",
4260 server->nickname,
4261 SID_ARGS(&server->sid), server->address);
4262 if (uuid_equals(&server->sid, &raft->sid)) {
4263 ds_put_cstr(&s, " (self)");
4264 }
4265 if (server->phase != RAFT_PHASE_STABLE) {
4266 ds_put_format(&s, " (%s)",
4267 raft_server_phase_to_string(server->phase));
4268 }
4269 if (raft->role == RAFT_CANDIDATE) {
4270 if (!uuid_is_zero(&server->vote)) {
4271 char buf[SID_LEN + 1];
4272 ds_put_format(&s, " (voted for %s)",
4273 raft_get_nickname(raft, &server->vote,
4274 buf, sizeof buf));
4275 }
4276 } else if (raft->role == RAFT_LEADER) {
4277 ds_put_format(&s, " next_index=%"PRIu64" match_index=%"PRIu64,
4278 server->next_index, server->match_index);
4279 }
4280 ds_put_char(&s, '\n');
4281 }
4282
4283 unixctl_command_reply(conn, ds_cstr(&s));
4284 ds_destroy(&s);
4285}
4286
4287static void
4288raft_unixctl_leave__(struct unixctl_conn *conn, struct raft *raft)
4289{
4290 if (raft_is_leaving(raft)) {
4291 unixctl_command_reply_error(conn,
4292 "already in progress leaving cluster");
4293 } else if (raft_is_joining(raft)) {
4294 unixctl_command_reply_error(conn,
4295 "can't leave while join in progress");
4296 } else if (raft_failed(raft)) {
4297 unixctl_command_reply_error(conn,
4298 "can't leave after failure");
4299 } else {
4300 raft_leave(raft);
4301 unixctl_command_reply(conn, NULL);
4302 }
4303}
4304
4305static void
4306raft_unixctl_leave(struct unixctl_conn *conn, int argc OVS_UNUSED,
4307 const char *argv[], void *aux OVS_UNUSED)
4308{
4309 struct raft *raft = raft_lookup_by_name(argv[1]);
4310 if (!raft) {
4311 unixctl_command_reply_error(conn, "unknown cluster");
4312 return;
4313 }
4314
4315 raft_unixctl_leave__(conn, raft);
4316}
4317
4318static struct raft_server *
4319raft_lookup_server_best_match(struct raft *raft, const char *id)
4320{
4321 struct raft_server *best = NULL;
4322 int best_score = -1;
4323 int n_best = 0;
4324
4325 struct raft_server *s;
4326 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
4327 int score = (!strcmp(id, s->address)
4328 ? INT_MAX
4329 : uuid_is_partial_match(&s->sid, id));
4330 if (score > best_score) {
4331 best = s;
4332 best_score = score;
4333 n_best = 1;
4334 } else if (score == best_score) {
4335 n_best++;
4336 }
4337 }
4338 return n_best == 1 ? best : NULL;
4339}
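/* Example: an 'id' equal to a server's full address matches exactly
 * (INT_MAX), while a partial server ID such as "f4bd" is scored by
 * uuid_is_partial_match(); if two servers tie for the best score
 * (n_best > 1), the match is ambiguous and NULL is returned. */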
4340
4341static void
4342raft_unixctl_kick(struct unixctl_conn *conn, int argc OVS_UNUSED,
4343 const char *argv[], void *aux OVS_UNUSED)
4344{
4345 const char *cluster_name = argv[1];
4346 const char *server_name = argv[2];
4347
4348 struct raft *raft = raft_lookup_by_name(cluster_name);
4349 if (!raft) {
4350 unixctl_command_reply_error(conn, "unknown cluster");
4351 return;
4352 }
4353
4354 struct raft_server *server = raft_lookup_server_best_match(raft,
4355 server_name);
4356 if (!server) {
4357 unixctl_command_reply_error(conn, "unknown server");
4358 return;
4359 }
4360
4361 if (uuid_equals(&server->sid, &raft->sid)) {
4362 raft_unixctl_leave__(conn, raft);
4363 } else if (raft->role == RAFT_LEADER) {
4364 const struct raft_remove_server_request rq = {
4365 .sid = server->sid,
4366 .requester_conn = conn,
4367 };
4368 raft_handle_remove_server_request(raft, &rq);
4369 } else {
4370 const union raft_rpc rpc = {
4371 .remove_server_request = {
4372 .common = {
4373 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
4374 .sid = raft->leader_sid,
4375 .comment = "via unixctl"
4376 },
4377 .sid = server->sid,
4378 }
4379 };
4380 if (raft_send(raft, &rpc)) {
4381 unixctl_command_reply(conn, "sent removal request to leader");
4382 } else {
4383 unixctl_command_reply_error(conn,
4384 "failed to send removal request");
4385 }
4386 }
4387}
4388
4389static void
4390raft_init(void)
4391{
4392 static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
4393 if (!ovsthread_once_start(&once)) {
4394 return;
4395 }
4396 unixctl_command_register("cluster/cid", "DB", 1, 1,
4397 raft_unixctl_cid, NULL);
4398 unixctl_command_register("cluster/sid", "DB", 1, 1,
4399 raft_unixctl_sid, NULL);
4400 unixctl_command_register("cluster/status", "DB", 1, 1,
4401 raft_unixctl_status, NULL);
4402 unixctl_command_register("cluster/leave", "DB", 1, 1,
4403 raft_unixctl_leave, NULL);
4404 unixctl_command_register("cluster/kick", "DB SERVER", 2, 2,
4405 raft_unixctl_kick, NULL);
4406 ovsthread_once_done(&once);
4407}
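/* Example invocations of the commands registered above (hedged; the unixctl
 * target depends on how ovsdb-server was started):
 *
 *     ovs-appctl -t ovsdb-server cluster/status DB
 *     ovs-appctl -t ovsdb-server cluster/kick DB SERVER
 */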