ovsdb/raft.c
1/*
2 * Copyright (c) 2017, 2018 Nicira, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <config.h>
18
19#include "raft.h"
20#include "raft-private.h"
21
22#include <errno.h>
23#include <unistd.h>
24
25#include "hash.h"
26#include "jsonrpc.h"
27#include "lockfile.h"
28#include "openvswitch/dynamic-string.h"
29#include "openvswitch/hmap.h"
30#include "openvswitch/json.h"
31#include "openvswitch/list.h"
32#include "openvswitch/poll-loop.h"
33#include "openvswitch/vlog.h"
34#include "ovsdb-error.h"
35#include "ovsdb-parser.h"
36#include "ovsdb/log.h"
37#include "raft-rpc.h"
38#include "random.h"
39#include "socket-util.h"
40#include "stream.h"
41#include "timeval.h"
42#include "unicode.h"
43#include "unixctl.h"
44#include "util.h"
45#include "uuid.h"
46
47VLOG_DEFINE_THIS_MODULE(raft);
48
49/* Roles for a Raft server:
50 *
51 * - Followers: Servers in touch with the current leader.
52 *
53 * - Candidate: Servers unaware of a current leader and seeking election to
54 * leader.
55 *
56 * - Leader: Handles all client requests. At most one at a time.
57 *
58 * In normal operation there is exactly one leader and all of the other servers
59 * are followers. */
60enum raft_role {
61 RAFT_FOLLOWER,
62 RAFT_CANDIDATE,
63 RAFT_LEADER
64};
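
/* A sketch of the role transitions (the rules live in raft_become_follower(),
 * raft_start_election(), and raft_become_leader() below): a follower that
 * hears nothing from a leader before its election timeout becomes a
 * candidate; a candidate that collects votes from a majority of the servers
 * becomes leader; and any server that observes a higher term reverts to
 * follower.
 *
 *     RAFT_FOLLOWER --timeout--> RAFT_CANDIDATE --majority--> RAFT_LEADER
 *            ^                         |                           |
 *            +--------- higher term observed ----------------------+
 */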
65
66/* A connection between this Raft server and another one. */
67struct raft_conn {
68 struct ovs_list list_node; /* In struct raft's 'conns' list. */
69 struct jsonrpc_session *js; /* JSON-RPC connection. */
 70 struct uuid sid; /* The remote server's unique ID. */
71 char *nickname; /* Short name for use in log messages. */
72 bool incoming; /* True if incoming, false if outgoing. */
73 unsigned int js_seqno; /* Seqno for noticing (re)connections. */
74};
75
76static void raft_conn_close(struct raft_conn *);
77
78/* A "command", that is, a request to append an entry to the log.
79 *
80 * The Raft specification only allows clients to issue commands to the leader.
81 * With this implementation, clients may issue a command on any server, which
82 * then relays the command to the leader if necessary.
83 *
84 * This structure is thus used in three cases:
85 *
86 * 1. We are the leader and the command was issued to us directly.
87 *
88 * 2. We are a follower and relayed the command to the leader.
89 *
90 * 3. We are the leader and a follower relayed the command to us.
91 */
92struct raft_command {
93 /* All cases. */
94 struct hmap_node hmap_node; /* In struct raft's 'commands' hmap. */
95 unsigned int n_refs; /* Reference count. */
96 enum raft_command_status status; /* Execution status. */
97
98 /* Case 1 only. */
99 uint64_t index; /* Index in log (0 if being relayed). */
100
101 /* Cases 2 and 3. */
102 struct uuid eid; /* Entry ID of result. */
103
104 /* Case 2 only. */
105 long long int timestamp; /* Issue or last ping time, for expiration. */
106
107 /* Case 3 only. */
108 struct uuid sid; /* The follower (otherwise UUID_ZERO). */
109};
110
111static void raft_command_complete(struct raft *, struct raft_command *,
112 enum raft_command_status);
113
114static void raft_complete_all_commands(struct raft *,
115 enum raft_command_status);
116
117/* Type of deferred action, see struct raft_waiter. */
118enum raft_waiter_type {
119 RAFT_W_ENTRY,
120 RAFT_W_TERM,
121 RAFT_W_RPC,
122};
123
124/* An action deferred until a log write commits to disk. */
125struct raft_waiter {
126 struct ovs_list list_node;
127 uint64_t commit_ticket;
128
129 enum raft_waiter_type type;
130 union {
131 /* RAFT_W_ENTRY.
132 *
133 * Waits for a RAFT_REC_ENTRY write to our local log to commit. Upon
134 * completion, updates 'log_synced' to indicate that the new log entry
135 * or entries are committed and, if we are leader, also updates our
136 * local 'match_index'. */
137 struct {
138 uint64_t index;
139 } entry;
140
141 /* RAFT_W_TERM.
142 *
143 * Waits for a RAFT_REC_TERM or RAFT_REC_VOTE record write to commit.
144 * Upon completion, updates 'synced_term' and 'synced_vote', which
145 * triggers sending RPCs deferred by the uncommitted 'term' and
146 * 'vote'. */
147 struct {
148 uint64_t term;
149 struct uuid vote;
150 } term;
151
152 /* RAFT_W_RPC.
153 *
154 * Sometimes, sending an RPC to a peer must be delayed until an entry,
155 * a term, or a vote mentioned in the RPC is synced to disk. This
156 * waiter keeps a copy of such an RPC until the previous waiters have
157 * committed. */
158 union raft_rpc *rpc;
159 };
160};
161
162static struct raft_waiter *raft_waiter_create(struct raft *,
163 enum raft_waiter_type,
164 bool start_commit);
165static void raft_waiters_destroy(struct raft *);
166
167/* The Raft state machine. */
168struct raft {
169 struct hmap_node hmap_node; /* In 'all_rafts'. */
170 struct ovsdb_log *log;
171
172/* Persistent derived state.
173 *
174 * This must be updated on stable storage before responding to RPCs. It can be
175 * derived from the header, snapshot, and log in 'log'. */
176
177 struct uuid cid; /* Cluster ID (immutable for the cluster). */
178 struct uuid sid; /* Server ID (immutable for the server). */
179 char *local_address; /* Local address (immutable for the server). */
180 char *local_nickname; /* Used for local server in log messages. */
181 char *name; /* Schema name (immutable for the cluster). */
182
183 /* Contains "struct raft_server"s and represents the server configuration
184 * most recently added to 'log'. */
185 struct hmap servers;
186
187/* Persistent state on all servers.
188 *
189 * Must be updated on stable storage before responding to RPCs. */
190
191 /* Current term and the vote for that term. These might be on the way to
192 * disk now. */
193 uint64_t term; /* Initialized to 0 and only increases. */
194 struct uuid vote; /* All-zeros if no vote yet in 'term'. */
195
196 /* The term and vote that have been synced to disk. */
197 uint64_t synced_term;
198 struct uuid synced_vote;
199
200 /* The log.
201 *
202 * A log entry with index 1 never really exists; the initial snapshot for a
203 * Raft is considered to include this index. The first real log entry has
204 * index 2.
205 *
206 * A new Raft instance contains an empty log: log_start=2, log_end=2.
207 * Over time, the log grows: log_start=2, log_end=N.
208 * At some point, the server takes a snapshot: log_start=N, log_end=N.
209 * The log continues to grow: log_start=N, log_end=N+1...
210 *
211 * Must be updated on stable storage before responding to RPCs. */
 212 struct raft_entry *entries; /* Log entry i is in entries[i - log_start]. */
213 uint64_t log_start; /* Index of first entry in log. */
214 uint64_t log_end; /* Index of last entry in log, plus 1. */
215 uint64_t log_synced; /* Index of last synced entry. */
216 size_t allocated_log; /* Allocated entries in 'log'. */
217
218 /* Snapshot state (see Figure 5.1)
219 *
220 * This is the state of the cluster as of the last discarded log entry,
221 * that is, at log index 'log_start - 1' (called prevIndex in Figure 5.1).
222 * Only committed log entries can be included in a snapshot. */
223 struct raft_entry snap;
224
225/* Volatile state.
226 *
227 * The snapshot is always committed, but the rest of the log might not be yet.
228 * 'last_applied' tracks what entries have been passed to the client. If the
229 * client hasn't yet read the latest snapshot, then even the snapshot isn't
230 * applied yet. Thus, the invariants are different for these members:
231 *
232 * log_start - 2 <= last_applied <= commit_index < log_end.
233 * log_start - 1 <= commit_index < log_end.
234 */
235
236 enum raft_role role; /* Current role. */
237 uint64_t commit_index; /* Max log index known to be committed. */
238 uint64_t last_applied; /* Max log index applied to state machine. */
239 struct uuid leader_sid; /* Server ID of leader (zero, if unknown). */
240
241 /* Followers and candidates only. */
242#define ELECTION_BASE_MSEC 1024
243#define ELECTION_RANGE_MSEC 1024
244 long long int election_base; /* Time of last heartbeat from leader. */
245 long long int election_timeout; /* Time at which we start an election. */
246
247 /* Used for joining a cluster. */
248 bool joining; /* Attempting to join the cluster? */
249 struct sset remote_addresses; /* Addresses to try to find other servers. */
250 long long int join_timeout; /* Time to re-send add server request. */
251
252 /* Used for leaving a cluster. */
253 bool leaving; /* True if we are leaving the cluster. */
254 bool left; /* True if we have finished leaving. */
255 long long int leave_timeout; /* Time to re-send remove server request. */
256
257 /* Failure. */
258 bool failed; /* True if unrecoverable error has occurred. */
259
260 /* File synchronization. */
261 struct ovs_list waiters; /* Contains "struct raft_waiter"s. */
262
263 /* Network connections. */
264 struct pstream *listener; /* For connections from other Raft servers. */
265 long long int listen_backoff; /* For retrying creating 'listener'. */
266 struct ovs_list conns; /* Contains struct raft_conns. */
267
268 /* Leaders only. Reinitialized after becoming leader. */
269 struct hmap add_servers; /* Contains "struct raft_server"s to add. */
270 struct raft_server *remove_server; /* Server being removed. */
271 struct hmap commands; /* Contains "struct raft_command"s. */
272#define PING_TIME_MSEC (ELECTION_BASE_MSEC / 3)
 273 long long int ping_timeout; /* Time at which to send a heartbeat. */
274
275 /* Candidates only. Reinitialized at start of election. */
276 int n_votes; /* Number of votes for me. */
277};
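
/* A worked example of the indexing above, with illustrative numbers: suppose
 * a server snapshotted through index 10 and then appended three more entries.
 * Then:
 *
 *     log_start = 11        (first entry still held in 'entries')
 *     log_end   = 14        (one past the last entry; entries 11..13 exist)
 *     entries[0..2]         hold log entries 11, 12, and 13
 *     snap                  describes the state as of index 10
 *
 * and the volatile invariants allow, e.g., commit_index = 12 with
 * last_applied anywhere in [9, 12]. */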
278
279/* All Raft structures. */
280static struct hmap all_rafts = HMAP_INITIALIZER(&all_rafts);
281
282static void raft_init(void);
283
284static struct ovsdb_error *raft_read_header(struct raft *)
285 OVS_WARN_UNUSED_RESULT;
286
287static void raft_send_execute_command_reply(struct raft *,
288 const struct uuid *sid,
289 const struct uuid *eid,
290 enum raft_command_status,
291 uint64_t commit_index);
292
293static void raft_update_our_match_index(struct raft *, uint64_t min_index);
294
295static void raft_send_remove_server_reply__(
296 struct raft *, const struct uuid *target_sid,
297 const struct uuid *requester_sid, struct unixctl_conn *requester_conn,
298 bool success, const char *comment);
299
300static void raft_server_init_leader(struct raft *, struct raft_server *);
301
302static bool raft_rpc_is_heartbeat(const union raft_rpc *);
303static bool raft_is_rpc_synced(const struct raft *, const union raft_rpc *);
304
305static void raft_handle_rpc(struct raft *, const union raft_rpc *);
306static bool raft_send(struct raft *, const union raft_rpc *);
307static bool raft_send__(struct raft *, const union raft_rpc *,
308 struct raft_conn *);
309static void raft_send_append_request(struct raft *,
310 struct raft_server *, unsigned int n,
311 const char *comment);
312
313static void raft_become_leader(struct raft *);
314static void raft_become_follower(struct raft *);
315static void raft_reset_timer(struct raft *);
316static void raft_send_heartbeats(struct raft *);
317static void raft_start_election(struct raft *, bool leadership_transfer);
318static bool raft_truncate(struct raft *, uint64_t new_end);
319static void raft_get_servers_from_log(struct raft *, enum vlog_level);
320
321static bool raft_handle_write_error(struct raft *, struct ovsdb_error *);
322
323static void raft_run_reconfigure(struct raft *);
324
325static struct raft_server *
326raft_find_server(const struct raft *raft, const struct uuid *sid)
327{
328 return raft_server_find(&raft->servers, sid);
329}
330
331static char *
332raft_make_address_passive(const char *address_)
333{
334 if (!strncmp(address_, "unix:", 5)) {
335 return xasprintf("p%s", address_);
336 } else {
337 char *address = xstrdup(address_);
338 char *host, *port;
339 inet_parse_host_port_tokens(strchr(address, ':') + 1, &host, &port);
340
341 struct ds paddr = DS_EMPTY_INITIALIZER;
342 ds_put_format(&paddr, "p%.3s:%s:", address, port);
343 if (strchr(host, ':')) {
344 ds_put_format(&paddr, "[%s]", host);
345 } else {
346 ds_put_cstr(&paddr, host);
347 }
348 free(address);
349 return ds_steal_cstr(&paddr);
350 }
351}
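
/* For illustration, raft_make_address_passive() maps active addresses to
 * passive ones like so (example addresses, not taken from a real cluster):
 *
 *     "unix:/tmp/db.sock"  ->  "punix:/tmp/db.sock"
 *     "tcp:10.0.0.1:6641"  ->  "ptcp:6641:10.0.0.1"
 *     "ssl:[::1]:6641"     ->  "pssl:6641:[::1]"
 *
 * that is, "p" plus the protocol, then the port, then the host, with an IPv6
 * host kept in square brackets. */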
352
353static struct raft *
354raft_alloc(void)
355{
356 raft_init();
357
358 struct raft *raft = xzalloc(sizeof *raft);
359 hmap_node_nullify(&raft->hmap_node);
360 hmap_init(&raft->servers);
361 raft->log_start = raft->log_end = 1;
362 raft->role = RAFT_FOLLOWER;
363 sset_init(&raft->remote_addresses);
364 raft->join_timeout = LLONG_MAX;
365 ovs_list_init(&raft->waiters);
366 raft->listen_backoff = LLONG_MIN;
367 ovs_list_init(&raft->conns);
368 hmap_init(&raft->add_servers);
369 hmap_init(&raft->commands);
370
371 raft->ping_timeout = time_msec() + PING_TIME_MSEC;
372 raft_reset_timer(raft);
373
374 return raft;
375}
376
377/* Creates an on-disk file that represents a new Raft cluster and initializes
378 * it to consist of a single server, the one on which this function is called.
379 *
380 * Creates the local copy of the cluster's log in 'file_name', which must not
381 * already exist. Gives it the name 'name', which should be the database
382 * schema name and which is used only to match up this database with the server
383 * added to the cluster later if the cluster ID is unavailable.
384 *
385 * The new server is located at 'local_address', which must take one of the
386 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
387 * square bracket enclosed IPv6 address and PORT is a TCP port number.
388 *
389 * This only creates the on-disk file. Use raft_open() to start operating the
390 * new server.
391 *
392 * Returns null if successful, otherwise an ovsdb_error describing the
393 * problem. */
394struct ovsdb_error * OVS_WARN_UNUSED_RESULT
395raft_create_cluster(const char *file_name, const char *name,
396 const char *local_address, const struct json *data)
397{
398 /* Parse and verify validity of the local address. */
399 struct ovsdb_error *error = raft_address_validate(local_address);
400 if (error) {
401 return error;
402 }
403
404 /* Create log file. */
405 struct ovsdb_log *log;
406 error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
407 -1, &log);
408 if (error) {
409 return error;
410 }
411
412 /* Write log file. */
413 struct raft_header h = {
414 .sid = uuid_random(),
415 .cid = uuid_random(),
416 .name = xstrdup(name),
417 .local_address = xstrdup(local_address),
418 .joining = false,
419 .remote_addresses = SSET_INITIALIZER(&h.remote_addresses),
420 .snap_index = 1,
421 .snap = {
422 .term = 1,
423 .data = json_nullable_clone(data),
424 .eid = uuid_random(),
425 .servers = json_object_create(),
426 },
427 };
428 shash_add_nocopy(json_object(h.snap.servers),
429 xasprintf(UUID_FMT, UUID_ARGS(&h.sid)),
430 json_string_create(local_address));
431 error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
432 raft_header_uninit(&h);
433 if (!error) {
434 error = ovsdb_log_commit_block(log);
435 }
436 ovsdb_log_close(log);
437
438 return error;
439}
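
/* A minimal usage sketch for bootstrapping a single-server cluster (the file
 * name, schema name, and address here are hypothetical):
 *
 *     struct json *data = json_object_create();      // empty initial data
 *     struct ovsdb_error *error = raft_create_cluster(
 *         "/var/lib/mydb.db", "MyDB", "tcp:10.0.0.1:6641", data);
 *     json_destroy(data);                            // caller keeps ownership
 *     if (error) {
 *         ...report the error...
 *     }
 *
 * followed by raft_open() on the new file to start the server. */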
440
441/* Creates a database file that represents a new server in an existing Raft
442 * cluster.
443 *
444 * Creates the local copy of the cluster's log in 'file_name', which must not
445 * already exist. Gives it the name 'name', which must be the same name
446 * passed in to raft_create_cluster() earlier.
447 *
448 * 'cid' is optional. If specified, the new server will join only the cluster
449 * with the given cluster ID.
450 *
451 * The new server is located at 'local_address', which must take one of the
452 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
453 * square bracket enclosed IPv6 address and PORT is a TCP port number.
454 *
455 * Joining the cluster requires contacting it. Thus, 'remote_addresses'
456 * specifies the addresses of existing servers in the cluster. One server out
457 * of the existing cluster is sufficient, as long as that server is reachable
458 * and not partitioned from the current cluster leader. If multiple servers
459 * from the cluster are specified, then it is sufficient for any of them to
460 * meet this criterion.
461 *
462 * This only creates the on-disk file and does no network access. Use
463 * raft_open() to start operating the new server. (Until this happens, the
464 * new server has not joined the cluster.)
465 *
466 * Returns null if successful, otherwise an ovsdb_error describing the
467 * problem. */
468struct ovsdb_error * OVS_WARN_UNUSED_RESULT
469raft_join_cluster(const char *file_name,
470 const char *name, const char *local_address,
471 const struct sset *remote_addresses,
472 const struct uuid *cid)
473{
474 ovs_assert(!sset_is_empty(remote_addresses));
475
476 /* Parse and verify validity of the addresses. */
477 struct ovsdb_error *error = raft_address_validate(local_address);
478 if (error) {
479 return error;
480 }
481 const char *addr;
482 SSET_FOR_EACH (addr, remote_addresses) {
483 error = raft_address_validate(addr);
484 if (error) {
485 return error;
486 }
487 if (!strcmp(addr, local_address)) {
488 return ovsdb_error(NULL, "remote addresses cannot be the same "
489 "as the local address");
490 }
491 }
492
493 /* Verify validity of the cluster ID (if provided). */
494 if (cid && uuid_is_zero(cid)) {
495 return ovsdb_error(NULL, "all-zero UUID is not valid cluster ID");
496 }
497
498 /* Create log file. */
499 struct ovsdb_log *log;
500 error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
501 -1, &log);
502 if (error) {
503 return error;
504 }
505
506 /* Write log file. */
507 struct raft_header h = {
508 .sid = uuid_random(),
509 .cid = cid ? *cid : UUID_ZERO,
510 .name = xstrdup(name),
511 .local_address = xstrdup(local_address),
512 .joining = true,
513 /* No snapshot yet. */
514 };
515 sset_clone(&h.remote_addresses, remote_addresses);
516 error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
517 raft_header_uninit(&h);
518 if (!error) {
519 error = ovsdb_log_commit_block(log);
520 }
521 ovsdb_log_close(log);
522
523 return error;
524}
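
/* A corresponding sketch for joining an existing cluster (addresses are
 * hypothetical; pass NULL for 'cid' to accept any cluster ID):
 *
 *     struct sset remotes = SSET_INITIALIZER(&remotes);
 *     sset_add(&remotes, "tcp:10.0.0.1:6641");
 *     struct ovsdb_error *error = raft_join_cluster(
 *         "/var/lib/mydb2.db", "MyDB", "tcp:10.0.0.2:6641", &remotes, NULL);
 *     sset_destroy(&remotes);
 *
 * As with raft_create_cluster(), this only writes the file; the join itself
 * begins once the file is passed to raft_open(). */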
525
526/* Reads the initial header record from 'log', which must be a Raft clustered
527 * database log, and populates '*md' with the information read from it. The
528 * caller must eventually destroy 'md' with raft_metadata_destroy().
529 *
530 * On success, returns NULL. On failure, returns an error that the caller must
531 * eventually destroy and zeros '*md'. */
532struct ovsdb_error * OVS_WARN_UNUSED_RESULT
533raft_read_metadata(struct ovsdb_log *log, struct raft_metadata *md)
534{
535 struct raft *raft = raft_alloc();
536 raft->log = log;
537
538 struct ovsdb_error *error = raft_read_header(raft);
539 if (!error) {
540 md->sid = raft->sid;
541 md->name = xstrdup(raft->name);
542 md->local = xstrdup(raft->local_address);
543 md->cid = raft->cid;
544 } else {
545 memset(md, 0, sizeof *md);
546 }
547
548 raft->log = NULL;
549 raft_close(raft);
550 return error;
551}
552
553/* Frees the metadata in 'md'. */
554void
555raft_metadata_destroy(struct raft_metadata *md)
556{
557 if (md) {
558 free(md->name);
559 free(md->local);
560 }
561}
562
563static const struct raft_entry *
564raft_get_entry(const struct raft *raft, uint64_t index)
565{
566 ovs_assert(index >= raft->log_start);
567 ovs_assert(index < raft->log_end);
568 return &raft->entries[index - raft->log_start];
569}
570
571static uint64_t
572raft_get_term(const struct raft *raft, uint64_t index)
573{
574 return (index == raft->log_start - 1
575 ? raft->snap.term
576 : raft_get_entry(raft, index)->term);
577}
578
579static const struct json *
580raft_servers_for_index(const struct raft *raft, uint64_t index)
581{
582 ovs_assert(index >= raft->log_start - 1);
583 ovs_assert(index < raft->log_end);
584
585 const struct json *servers = raft->snap.servers;
586 for (uint64_t i = raft->log_start; i <= index; i++) {
587 const struct raft_entry *e = raft_get_entry(raft, i);
588 if (e->servers) {
589 servers = e->servers;
590 }
591 }
592 return servers;
593}
594
595static void
596raft_set_servers(struct raft *raft, const struct hmap *new_servers,
597 enum vlog_level level)
598{
599 struct raft_server *s, *next;
600 HMAP_FOR_EACH_SAFE (s, next, hmap_node, &raft->servers) {
601 if (!raft_server_find(new_servers, &s->sid)) {
602 ovs_assert(s != raft->remove_server);
603
604 hmap_remove(&raft->servers, &s->hmap_node);
605 VLOG(level, "server %s removed from configuration", s->nickname);
606 raft_server_destroy(s);
607 }
608 }
609
610 HMAP_FOR_EACH_SAFE (s, next, hmap_node, new_servers) {
611 if (!raft_find_server(raft, &s->sid)) {
612 VLOG(level, "server %s added to configuration", s->nickname);
613
614 struct raft_server *new
615 = raft_server_add(&raft->servers, &s->sid, s->address);
616 raft_server_init_leader(raft, new);
617 }
618 }
619}
620
621static uint64_t
622raft_add_entry(struct raft *raft,
623 uint64_t term, struct json *data, const struct uuid *eid,
624 struct json *servers)
625{
626 if (raft->log_end - raft->log_start >= raft->allocated_log) {
627 raft->entries = x2nrealloc(raft->entries, &raft->allocated_log,
628 sizeof *raft->entries);
629 }
630
631 uint64_t index = raft->log_end++;
632 struct raft_entry *entry = &raft->entries[index - raft->log_start];
633 entry->term = term;
634 entry->data = data;
635 entry->eid = eid ? *eid : UUID_ZERO;
636 entry->servers = servers;
637 return index;
638}
639
640/* Writes a RAFT_REC_ENTRY record for 'term', 'data', 'eid', 'servers' to
641 * 'raft''s log and returns an error indication. */
642static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
643raft_write_entry(struct raft *raft, uint64_t term, struct json *data,
644 const struct uuid *eid, struct json *servers)
645{
646 struct raft_record r = {
647 .type = RAFT_REC_ENTRY,
648 .term = term,
649 .entry = {
650 .index = raft_add_entry(raft, term, data, eid, servers),
651 .data = data,
652 .servers = servers,
653 .eid = eid ? *eid : UUID_ZERO,
654 },
655 };
656 return ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r));
657}
658
659static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
660raft_write_state(struct ovsdb_log *log,
661 uint64_t term, const struct uuid *vote)
662{
663 struct raft_record r = { .term = term };
664 if (vote && !uuid_is_zero(vote)) {
665 r.type = RAFT_REC_VOTE;
666 r.sid = *vote;
667 } else {
668 r.type = RAFT_REC_TERM;
669 }
670 return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
671}
672
673static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
674raft_apply_record(struct raft *raft, unsigned long long int rec_idx,
675 const struct raft_record *r)
676{
677 /* Apply "term", which is present in most kinds of records (and otherwise
678 * 0).
679 *
680 * A Raft leader can replicate entries from previous terms to the other
681 * servers in the cluster, retaining the original terms on those entries
682 * (see section 3.6.2 "Committing entries from previous terms" for more
683 * information), so it's OK for the term in a log record to precede the
684 * current term. */
685 if (r->term > raft->term) {
686 raft->term = raft->synced_term = r->term;
687 raft->vote = raft->synced_vote = UUID_ZERO;
688 }
689
690 switch (r->type) {
691 case RAFT_REC_ENTRY:
692 if (r->entry.index < raft->commit_index) {
693 return ovsdb_error(NULL, "record %llu attempts to truncate log "
694 "from %"PRIu64" to %"PRIu64" entries, but "
695 "commit index is already %"PRIu64,
696 rec_idx, raft->log_end, r->entry.index,
697 raft->commit_index);
698 } else if (r->entry.index > raft->log_end) {
699 return ovsdb_error(NULL, "record %llu with index %"PRIu64" skips "
700 "past expected index %"PRIu64,
701 rec_idx, r->entry.index, raft->log_end);
702 }
703
704 if (r->entry.index < raft->log_end) {
705 /* This can happen, but it is notable. */
706 VLOG_DBG("record %llu truncates log from %"PRIu64" to %"PRIu64
707 " entries", rec_idx, raft->log_end, r->entry.index);
708 raft_truncate(raft, r->entry.index);
709 }
710
711 uint64_t prev_term = (raft->log_end > raft->log_start
712 ? raft->entries[raft->log_end
713 - raft->log_start - 1].term
714 : raft->snap.term);
715 if (r->term < prev_term) {
716 return ovsdb_error(NULL, "record %llu with index %"PRIu64" term "
717 "%"PRIu64" precedes previous entry's term "
718 "%"PRIu64,
719 rec_idx, r->entry.index, r->term, prev_term);
720 }
721
722 raft->log_synced = raft_add_entry(
723 raft, r->term,
724 json_nullable_clone(r->entry.data), &r->entry.eid,
725 json_nullable_clone(r->entry.servers));
726 return NULL;
727
728 case RAFT_REC_TERM:
729 return NULL;
730
731 case RAFT_REC_VOTE:
732 if (r->term < raft->term) {
733 return ovsdb_error(NULL, "record %llu votes for term %"PRIu64" "
734 "but current term is %"PRIu64,
735 rec_idx, r->term, raft->term);
736 } else if (!uuid_is_zero(&raft->vote)
737 && !uuid_equals(&raft->vote, &r->sid)) {
738 return ovsdb_error(NULL, "record %llu votes for "SID_FMT" in term "
739 "%"PRIu64" but a previous record for the "
740 "same term voted for "SID_FMT, rec_idx,
741 SID_ARGS(&raft->vote), r->term,
742 SID_ARGS(&r->sid));
743 } else {
744 raft->vote = raft->synced_vote = r->sid;
745 return NULL;
746 }
747 break;
748
749 case RAFT_REC_NOTE:
750 if (!strcmp(r->note, "left")) {
751 return ovsdb_error(NULL, "record %llu indicates server has left "
752 "the cluster; it cannot be added back (use "
753 "\"ovsdb-tool join-cluster\" to add a new "
754 "server)", rec_idx);
755 }
756 return NULL;
757
758 case RAFT_REC_COMMIT_INDEX:
759 if (r->commit_index < raft->commit_index) {
760 return ovsdb_error(NULL, "record %llu regresses commit index "
761 "from %"PRIu64 " to %"PRIu64,
762 rec_idx, raft->commit_index, r->commit_index);
763 } else if (r->commit_index >= raft->log_end) {
764 return ovsdb_error(NULL, "record %llu advances commit index to "
765 "%"PRIu64 " but last log index is %"PRIu64,
766 rec_idx, r->commit_index, raft->log_end - 1);
767 } else {
768 raft->commit_index = r->commit_index;
769 return NULL;
770 }
771 break;
772
773 case RAFT_REC_LEADER:
774 /* XXX we could use this to take back leadership for quick restart */
775 return NULL;
776
777 default:
778 OVS_NOT_REACHED();
779 }
780}
781
782static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
783raft_read_header(struct raft *raft)
784{
785 /* Read header record. */
786 struct json *json;
787 struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
788 if (error || !json) {
789 /* Report error or end-of-file. */
790 return error;
791 }
792 ovsdb_log_mark_base(raft->log);
793
794 struct raft_header h;
795 error = raft_header_from_json(&h, json);
796 json_destroy(json);
797 if (error) {
798 return error;
799 }
800
801 raft->sid = h.sid;
802 raft->cid = h.cid;
803 raft->name = xstrdup(h.name);
804 raft->local_address = xstrdup(h.local_address);
805 raft->local_nickname = raft_address_to_nickname(h.local_address, &h.sid);
806 raft->joining = h.joining;
807
808 if (h.joining) {
809 sset_clone(&raft->remote_addresses, &h.remote_addresses);
810 } else {
811 raft_entry_clone(&raft->snap, &h.snap);
812 raft->log_start = raft->log_end = h.snap_index + 1;
813 raft->commit_index = h.snap_index;
814 raft->last_applied = h.snap_index - 1;
815 }
816
817 raft_header_uninit(&h);
818
819 return NULL;
820}
821
822static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
823raft_read_log(struct raft *raft)
824{
825 for (unsigned long long int i = 1; ; i++) {
826 struct json *json;
827 struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
828 if (!json) {
829 if (error) {
830 /* We assume that the error is due to a partial write while
831 * appending to the file before a crash, so log it and
832 * continue. */
833 char *error_string = ovsdb_error_to_string_free(error);
834 VLOG_WARN("%s", error_string);
835 free(error_string);
836 error = NULL;
837 }
838 break;
839 }
840
841 struct raft_record r;
842 error = raft_record_from_json(&r, json);
843 if (!error) {
844 error = raft_apply_record(raft, i, &r);
845 raft_record_uninit(&r);
846 }
847 if (error) {
848 return ovsdb_wrap_error(error, "error reading record %llu from "
849 "%s log", i, raft->name);
850 }
851 }
852
853 /* Set the most recent servers. */
854 raft_get_servers_from_log(raft, VLL_DBG);
855
856 return NULL;
857}
858
859static void
860raft_reset_timer(struct raft *raft)
861{
862 unsigned int duration = (ELECTION_BASE_MSEC
863 + random_range(ELECTION_RANGE_MSEC));
864 raft->election_base = time_msec();
865 raft->election_timeout = raft->election_base + duration;
866}
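
/* With the constants above, each reset draws a fresh timeout uniformly from
 * [ELECTION_BASE_MSEC, ELECTION_BASE_MSEC + ELECTION_RANGE_MSEC), that is,
 * 1024..2047 ms.  The randomization makes it unlikely that two followers time
 * out at the same moment and split the vote. */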
867
868static void
869raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
870 const struct uuid *sid, bool incoming)
871{
872 struct raft_conn *conn = xzalloc(sizeof *conn);
873 ovs_list_push_back(&raft->conns, &conn->list_node);
874 conn->js = js;
875 if (sid) {
876 conn->sid = *sid;
877 }
878 conn->nickname = raft_address_to_nickname(jsonrpc_session_get_name(js),
879 &conn->sid);
880 conn->incoming = incoming;
881 conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
882}
883
884/* Starts the local server in an existing Raft cluster, using the local copy of
885 * the cluster's log in 'file_name'. Takes ownership of 'log', whether
886 * successful or not. */
887struct ovsdb_error * OVS_WARN_UNUSED_RESULT
888raft_open(struct ovsdb_log *log, struct raft **raftp)
889{
890 struct raft *raft = raft_alloc();
891 raft->log = log;
892
893 struct ovsdb_error *error = raft_read_header(raft);
894 if (error) {
895 goto error;
896 }
897
898 if (!raft->joining) {
899 error = raft_read_log(raft);
900 if (error) {
901 goto error;
902 }
903
904 /* Find our own server. */
905 if (!raft_find_server(raft, &raft->sid)) {
906 error = ovsdb_error(NULL, "server does not belong to cluster");
907 goto error;
908 }
909
910 /* If there's only one server, start an election right away so that the
911 * cluster bootstraps quickly. */
912 if (hmap_count(&raft->servers) == 1) {
913 raft_start_election(raft, false);
914 }
915 } else {
916 raft->join_timeout = time_msec() + 1000;
917 }
918
919 *raftp = raft;
920 hmap_insert(&all_rafts, &raft->hmap_node, hash_string(raft->name, 0));
921 return NULL;
922
923error:
924 raft_close(raft);
925 *raftp = NULL;
926 return error;
927}
928
929/* Returns the name of 'raft', which in OVSDB is the database schema name. */
930const char *
931raft_get_name(const struct raft *raft)
932{
933 return raft->name;
934}
935
936/* Returns the cluster ID of 'raft'. If 'raft' has not yet completed joining
937 * its cluster, then 'cid' will be all-zeros (unless the administrator
 938 * specified a cluster ID when running "ovsdb-tool join-cluster").
939 *
940 * Each cluster has a unique cluster ID. */
941const struct uuid *
942raft_get_cid(const struct raft *raft)
943{
944 return &raft->cid;
945}
946
947/* Returns the server ID of 'raft'. Each server has a unique server ID. */
948const struct uuid *
949raft_get_sid(const struct raft *raft)
950{
951 return &raft->sid;
952}
953
954/* Returns true if 'raft' has completed joining its cluster, has not left or
955 * initiated leaving the cluster, does not have failed disk storage, and is
956 * apparently connected to the leader in a healthy way (or is itself the
 957 * leader). */
958bool
959raft_is_connected(const struct raft *raft)
960{
961 return (raft->role != RAFT_CANDIDATE
962 && !raft->joining
963 && !raft->leaving
964 && !raft->left
965 && !raft->failed);
966}
967
968/* Returns true if 'raft' is the cluster leader. */
969bool
970raft_is_leader(const struct raft *raft)
971{
972 return raft->role == RAFT_LEADER;
973}
974
 975/* Returns true if 'raft' is in the process of joining its cluster. */
976bool
977raft_is_joining(const struct raft *raft)
978{
979 return raft->joining;
980}
981
982/* Only returns *connected* connections. */
983static struct raft_conn *
984raft_find_conn_by_sid(struct raft *raft, const struct uuid *sid)
985{
986 if (!uuid_is_zero(sid)) {
987 struct raft_conn *conn;
988 LIST_FOR_EACH (conn, list_node, &raft->conns) {
989 if (uuid_equals(sid, &conn->sid)
990 && jsonrpc_session_is_connected(conn->js)) {
991 return conn;
992 }
993 }
994 }
995 return NULL;
996}
997
998static struct raft_conn *
999raft_find_conn_by_address(struct raft *raft, const char *address)
1000{
1001 struct raft_conn *conn;
1002 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1003 if (!strcmp(jsonrpc_session_get_name(conn->js), address)) {
1004 return conn;
1005 }
1006 }
1007 return NULL;
1008}
1009
1010static void OVS_PRINTF_FORMAT(3, 4)
1011raft_record_note(struct raft *raft, const char *note,
1012 const char *comment_format, ...)
1013{
1014 va_list args;
1015 va_start(args, comment_format);
1016 char *comment = xvasprintf(comment_format, args);
1017 va_end(args);
1018
1019 struct raft_record r = {
1020 .type = RAFT_REC_NOTE,
1021 .comment = comment,
1022 .note = CONST_CAST(char *, note),
1023 };
1024 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
1025
1026 free(comment);
1027}
1028
1029/* If we're leader, try to transfer leadership to another server, logging
1030 * 'reason' as the human-readable reason (it should be a phrase suitable for
 1031 * following "because"). */
1032void
1033raft_transfer_leadership(struct raft *raft, const char *reason)
1034{
1035 if (raft->role != RAFT_LEADER) {
1036 return;
1037 }
1038
1039 struct raft_server *s;
1040 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
1041 if (!uuid_equals(&raft->sid, &s->sid)
1042 && s->phase == RAFT_PHASE_STABLE) {
1043 struct raft_conn *conn = raft_find_conn_by_sid(raft, &s->sid);
1044 if (!conn) {
1045 continue;
1046 }
1047
1048 union raft_rpc rpc = {
1049 .become_leader = {
1050 .common = {
1051 .comment = CONST_CAST(char *, reason),
1052 .type = RAFT_RPC_BECOME_LEADER,
1053 .sid = s->sid,
1054 },
1055 .term = raft->term,
1056 }
1057 };
1058 raft_send__(raft, &rpc, conn);
1059
1060 raft_record_note(raft, "transfer leadership",
1061 "transferring leadership to %s because %s",
1062 s->nickname, reason);
1063 break;
1064 }
1065 }
1066}
1067
1068/* Send a RemoveServerRequest to the rest of the servers in the cluster.
1069 *
1070 * If we know which server is the leader, we can just send the request to it.
1071 * However, we might not know which server is the leader, and we might never
1072 * find out if the remove request was actually previously committed by a
1073 * majority of the servers (because in that case the new leader will not send
1074 * AppendRequests or heartbeats to us). Therefore, we instead send
 1075 * RemoveServerRequests to every server. This theoretically has the same problem, if
1076 * the current cluster leader was not previously a member of the cluster, but
1077 * it seems likely to be more robust in practice. */
1078static void
1079raft_send_remove_server_requests(struct raft *raft)
1080{
1081 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
1082 VLOG_INFO_RL(&rl, "sending remove request (joining=%s, leaving=%s)",
1083 raft->joining ? "true" : "false",
1084 raft->leaving ? "true" : "false");
1085 const struct raft_server *s;
1086 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
1087 if (!uuid_equals(&s->sid, &raft->sid)) {
1088 union raft_rpc rpc = (union raft_rpc) {
1089 .remove_server_request = {
1090 .common = {
1091 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
1092 .sid = s->sid,
1093 },
1094 .sid = raft->sid,
1095 },
1096 };
1097 raft_send(raft, &rpc);
1098 }
1099 }
1100
1101 raft->leave_timeout = time_msec() + ELECTION_BASE_MSEC;
1102}
1103
1104/* Attempts to start 'raft' leaving its cluster. The caller can check progress
1105 * using raft_is_leaving() and raft_left(). */
1106void
1107raft_leave(struct raft *raft)
1108{
1109 if (raft->joining || raft->failed || raft->leaving || raft->left) {
1110 return;
1111 }
1112 VLOG_INFO(SID_FMT": starting to leave cluster "CID_FMT,
1113 SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
1114 raft->leaving = true;
1115 raft_transfer_leadership(raft, "this server is leaving the cluster");
1116 raft_become_follower(raft);
1117 raft_send_remove_server_requests(raft);
1118 raft->leave_timeout = time_msec() + ELECTION_BASE_MSEC;
1119}
1120
1121/* Returns true if 'raft' is currently attempting to leave its cluster. */
1122bool
1123raft_is_leaving(const struct raft *raft)
1124{
1125 return raft->leaving;
1126}
1127
1128/* Returns true if 'raft' successfully left its cluster. */
1129bool
1130raft_left(const struct raft *raft)
1131{
1132 return raft->left;
1133}
1134
1135/* Returns true if 'raft' has experienced a disk I/O failure. When this
1136 * returns true, only closing and reopening 'raft' allows for recovery. */
1137bool
1138raft_failed(const struct raft *raft)
1139{
1140 return raft->failed;
1141}
1142
1143/* Forces 'raft' to attempt to take leadership of the cluster by deposing the
 1144 * current leader. */
1145void
1146raft_take_leadership(struct raft *raft)
1147{
1148 if (raft->role != RAFT_LEADER) {
1149 raft_start_election(raft, true);
1150 }
1151}
1152
1153/* Closes everything owned by 'raft' that might be visible outside the process:
1154 * network connections, commands, etc. This is part of closing 'raft'; it is
1155 * also used if 'raft' has failed in an unrecoverable way. */
1156static void
1157raft_close__(struct raft *raft)
1158{
1159 if (!hmap_node_is_null(&raft->hmap_node)) {
1160 hmap_remove(&all_rafts, &raft->hmap_node);
1161 hmap_node_nullify(&raft->hmap_node);
1162 }
1163
1164 raft_complete_all_commands(raft, RAFT_CMD_SHUTDOWN);
1165
1166 struct raft_server *rs = raft->remove_server;
1167 if (rs) {
1168 raft_send_remove_server_reply__(raft, &rs->sid, &rs->requester_sid,
1169 rs->requester_conn, false,
1170 RAFT_SERVER_SHUTDOWN);
1171 raft_server_destroy(raft->remove_server);
1172 raft->remove_server = NULL;
1173 }
1174
1175 struct raft_conn *conn, *next;
1176 LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
1177 raft_conn_close(conn);
1178 }
1179}
1180
1181/* Closes and frees 'raft'.
1182 *
1183 * A server's cluster membership is independent of whether the server is
1184 * actually running. When a server that is a member of a cluster closes, the
1185 * cluster treats this as a server failure. */
1186void
1187raft_close(struct raft *raft)
1188{
1189 if (!raft) {
1190 return;
1191 }
1192
1193 raft_transfer_leadership(raft, "this server is shutting down");
1194
1195 raft_close__(raft);
1196
1197 ovsdb_log_close(raft->log);
1198
1199 raft_servers_destroy(&raft->servers);
1200
1201 for (uint64_t index = raft->log_start; index < raft->log_end; index++) {
1202 struct raft_entry *e = &raft->entries[index - raft->log_start];
1203 raft_entry_uninit(e);
1204 }
1205 free(raft->entries);
1206
1207 raft_entry_uninit(&raft->snap);
1208
1209 raft_waiters_destroy(raft);
1210
1211 raft_servers_destroy(&raft->add_servers);
1212
1213 hmap_destroy(&raft->commands);
1214
1215 pstream_close(raft->listener);
1216
1217 sset_destroy(&raft->remote_addresses);
1218 free(raft->local_address);
1219 free(raft->local_nickname);
1220 free(raft->name);
1221
1222 free(raft);
1223}
1224
1225static bool
1226raft_conn_receive(struct raft *raft, struct raft_conn *conn,
1227 union raft_rpc *rpc)
1228{
1229 struct jsonrpc_msg *msg = jsonrpc_session_recv(conn->js);
1230 if (!msg) {
1231 return false;
1232 }
1233
1234 struct ovsdb_error *error = raft_rpc_from_jsonrpc(&raft->cid, &raft->sid,
1235 msg, rpc);
1236 jsonrpc_msg_destroy(msg);
1237 if (error) {
1238 char *s = ovsdb_error_to_string_free(error);
1239 VLOG_INFO("%s: %s", jsonrpc_session_get_name(conn->js), s);
1240 free(s);
1241 return false;
1242 }
1243
1244 if (uuid_is_zero(&conn->sid)) {
1245 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
1246 conn->sid = rpc->common.sid;
1247 VLOG_INFO_RL(&rl, "%s: learned server ID "SID_FMT,
1248 jsonrpc_session_get_name(conn->js), SID_ARGS(&conn->sid));
1249 } else if (!uuid_equals(&conn->sid, &rpc->common.sid)) {
1250 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1251 VLOG_WARN_RL(&rl, "%s: ignoring message with unexpected server ID "
1252 SID_FMT" (expected "SID_FMT")",
1253 jsonrpc_session_get_name(conn->js),
1254 SID_ARGS(&rpc->common.sid), SID_ARGS(&conn->sid));
1255 raft_rpc_uninit(rpc);
1256 return false;
1257 }
1258
1259 const char *address = (rpc->type == RAFT_RPC_HELLO_REQUEST
1260 ? rpc->hello_request.address
1261 : rpc->type == RAFT_RPC_ADD_SERVER_REQUEST
1262 ? rpc->add_server_request.address
1263 : NULL);
1264 if (address) {
1265 char *new_nickname = raft_address_to_nickname(address, &conn->sid);
1266 if (strcmp(conn->nickname, new_nickname)) {
1267 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
1268 VLOG_INFO_RL(&rl, "%s: learned remote address %s",
1269 jsonrpc_session_get_name(conn->js), address);
1270
1271 free(conn->nickname);
1272 conn->nickname = new_nickname;
1273 } else {
1274 free(new_nickname);
1275 }
1276 }
1277
1278 return true;
1279}
1280
1281static const char *
1282raft_get_nickname(const struct raft *raft, const struct uuid *sid,
1283 char buf[SID_LEN + 1], size_t bufsize)
1284{
1285 if (uuid_equals(sid, &raft->sid)) {
1286 return raft->local_nickname;
1287 }
1288
1289 const char *s = raft_servers_get_nickname__(&raft->servers, sid);
1290 if (s) {
1291 return s;
1292 }
1293
1294 return raft_servers_get_nickname(&raft->add_servers, sid, buf, bufsize);
1295}
1296
1297static void
1298log_rpc(const union raft_rpc *rpc,
1299 const char *direction, const struct raft_conn *conn)
1300{
1301 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(600, 600);
1302 if (!raft_rpc_is_heartbeat(rpc) && !VLOG_DROP_DBG(&rl)) {
1303 struct ds s = DS_EMPTY_INITIALIZER;
1304 raft_rpc_format(rpc, &s);
1305 VLOG_DBG("%s%s %s", direction, conn->nickname, ds_cstr(&s));
1306 ds_destroy(&s);
1307 }
1308}
1309
1310static void
1311raft_send_add_server_request(struct raft *raft, struct raft_conn *conn)
1312{
1313 union raft_rpc rq = {
1314 .add_server_request = {
1315 .common = {
1316 .type = RAFT_RPC_ADD_SERVER_REQUEST,
1317 .sid = UUID_ZERO,
1318 .comment = NULL,
1319 },
1320 .address = raft->local_address,
1321 },
1322 };
1323 raft_send__(raft, &rq, conn);
1324}
1325
1326static void
1327raft_conn_run(struct raft *raft, struct raft_conn *conn)
1328{
1329 jsonrpc_session_run(conn->js);
1330
1331 unsigned int new_seqno = jsonrpc_session_get_seqno(conn->js);
1332 bool just_connected = (new_seqno != conn->js_seqno
1333 && jsonrpc_session_is_connected(conn->js));
1334 conn->js_seqno = new_seqno;
1335 if (just_connected) {
1336 if (raft->joining) {
1337 raft_send_add_server_request(raft, conn);
1338 } else if (raft->leaving) {
1339 union raft_rpc rq = {
1340 .remove_server_request = {
1341 .common = {
1342 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
1343 .sid = conn->sid,
1344 },
1345 .sid = raft->sid,
1346 },
1347 };
1348 raft_send__(raft, &rq, conn);
1349 } else {
1350 union raft_rpc rq = (union raft_rpc) {
1351 .hello_request = {
1352 .common = {
1353 .type = RAFT_RPC_HELLO_REQUEST,
1354 .sid = conn->sid,
1355 },
1356 .address = raft->local_address,
1357 },
1358 };
1359 raft_send__(raft, &rq, conn);
1360 }
1361 }
1362
1363 for (size_t i = 0; i < 50; i++) {
1364 union raft_rpc rpc;
1365 if (!raft_conn_receive(raft, conn, &rpc)) {
1366 break;
1367 }
1368
1369 log_rpc(&rpc, "<--", conn);
1370 raft_handle_rpc(raft, &rpc);
1371 raft_rpc_uninit(&rpc);
1372 }
1373}
1374
1375static void
1376raft_waiter_complete_rpc(struct raft *raft, const union raft_rpc *rpc)
1377{
1378 uint64_t term = raft_rpc_get_term(rpc);
1379 if (term && term < raft->term) {
1380 /* Drop the message because it's for an expired term. */
1381 return;
1382 }
1383
1384 if (!raft_is_rpc_synced(raft, rpc)) {
1385 /* This is a bug. A reply message is deferred because some state in
1386 * the message, such as a term or index, has not been committed to
 1387 * disk, and it should only be completed when that commit is done.
1388 * But this message is being completed before the commit is finished.
1389 * Complain, and hope that someone reports the bug. */
1390 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
1391 if (VLOG_DROP_ERR(&rl)) {
1392 return;
1393 }
1394
1395 struct ds s = DS_EMPTY_INITIALIZER;
1396
1397 if (term > raft->synced_term) {
1398 ds_put_format(&s, " because message term %"PRIu64" is "
1399 "past synced term %"PRIu64,
1400 term, raft->synced_term);
1401 }
1402
1403 uint64_t index = raft_rpc_get_min_sync_index(rpc);
1404 if (index > raft->log_synced) {
1405 ds_put_format(&s, " %s message index %"PRIu64" is past last "
1406 "synced index %"PRIu64,
1407 s.length ? "and" : "because",
1408 index, raft->log_synced);
1409 }
1410
1411 const struct uuid *vote = raft_rpc_get_vote(rpc);
1412 if (vote && !uuid_equals(vote, &raft->synced_vote)) {
1413 char buf1[SID_LEN + 1];
1414 char buf2[SID_LEN + 1];
1415 ds_put_format(&s, " %s vote %s differs from synced vote %s",
1416 s.length ? "and" : "because",
1417 raft_get_nickname(raft, vote, buf1, sizeof buf1),
1418 raft_get_nickname(raft, &raft->synced_vote,
1419 buf2, sizeof buf2));
1420 }
1421
1422 char buf[SID_LEN + 1];
1423 ds_put_format(&s, ": %s ",
1424 raft_get_nickname(raft, &rpc->common.sid,
1425 buf, sizeof buf));
1426 raft_rpc_format(rpc, &s);
1427 VLOG_ERR("internal error: deferred %s message completed "
1428 "but not ready to send%s",
1429 raft_rpc_type_to_string(rpc->type), ds_cstr(&s));
1430 ds_destroy(&s);
1431
1432 return;
1433 }
1434
1435 struct raft_conn *dst = raft_find_conn_by_sid(raft, &rpc->common.sid);
1436 if (dst) {
1437 raft_send__(raft, rpc, dst);
1438 }
1439}
1440
1441static void
1442raft_waiter_complete(struct raft *raft, struct raft_waiter *w)
1443{
1444 switch (w->type) {
1445 case RAFT_W_ENTRY:
1446 if (raft->role == RAFT_LEADER) {
1447 raft_update_our_match_index(raft, w->entry.index);
1448 }
1449 raft->log_synced = w->entry.index;
1450 break;
1451
1452 case RAFT_W_TERM:
1453 raft->synced_term = w->term.term;
1454 raft->synced_vote = w->term.vote;
1455 break;
1456
1457 case RAFT_W_RPC:
1458 raft_waiter_complete_rpc(raft, w->rpc);
1459 break;
1460 }
1461}
1462
1463static void
1464raft_waiter_destroy(struct raft_waiter *w)
1465{
1466 if (!w) {
1467 return;
1468 }
1469
1470 ovs_list_remove(&w->list_node);
1471
1472 switch (w->type) {
1473 case RAFT_W_ENTRY:
1474 case RAFT_W_TERM:
1475 break;
1476
1477 case RAFT_W_RPC:
1478 raft_rpc_uninit(w->rpc);
1479 free(w->rpc);
1480 break;
1481 }
1482 free(w);
1483}
1484
1485static void
1486raft_waiters_run(struct raft *raft)
1487{
1488 if (ovs_list_is_empty(&raft->waiters)) {
1489 return;
1490 }
1491
1492 uint64_t cur = ovsdb_log_commit_progress(raft->log);
1493 struct raft_waiter *w, *next;
1494 LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
1495 if (cur < w->commit_ticket) {
1496 break;
1497 }
1498 raft_waiter_complete(raft, w);
1499 raft_waiter_destroy(w);
1500 }
1501}
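
/* A sketch of the waiter life cycle, using functions defined in this file:
 * raft_waiter_create(raft, type, true) takes a ticket from
 * ovsdb_log_commit_start(), which starts an asynchronous commit of the log.
 * Each pass through raft_waiters_run() compares the tickets against
 * ovsdb_log_commit_progress() and completes, in order, every waiter whose
 * commit has reached disk.  For example, appending a log entry looks roughly
 * like:
 *
 *     ...raft_write_entry(raft, ...)...                // append the record
 *     raft_waiter_create(raft, RAFT_W_ENTRY, true)
 *         ->entry.index = index;                       // complete on sync
 *     // later, raft_run() -> raft_waiters_run() advances 'log_synced'
 */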
1502
1503static void
1504raft_waiters_wait(struct raft *raft)
1505{
1506 struct raft_waiter *w;
1507 LIST_FOR_EACH (w, list_node, &raft->waiters) {
1508 ovsdb_log_commit_wait(raft->log, w->commit_ticket);
1509 break;
1510 }
1511}
1512
1513static void
1514raft_waiters_destroy(struct raft *raft)
1515{
1516 struct raft_waiter *w, *next;
1517 LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
1518 raft_waiter_destroy(w);
1519 }
1520}
1521
1522static bool OVS_WARN_UNUSED_RESULT
1523raft_set_term(struct raft *raft, uint64_t term, const struct uuid *vote)
1524{
1525 struct ovsdb_error *error = raft_write_state(raft->log, term, vote);
1526 if (!raft_handle_write_error(raft, error)) {
1527 return false;
1528 }
1529
1530 struct raft_waiter *w = raft_waiter_create(raft, RAFT_W_TERM, true);
1531 raft->term = w->term.term = term;
1532 raft->vote = w->term.vote = vote ? *vote : UUID_ZERO;
1533 return true;
1534}
1535
1536static void
1537raft_accept_vote(struct raft *raft, struct raft_server *s,
1538 const struct uuid *vote)
1539{
1540 if (uuid_equals(&s->vote, vote)) {
1541 return;
1542 }
1543 if (!uuid_is_zero(&s->vote)) {
1544 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
1545 char buf1[SID_LEN + 1];
1546 char buf2[SID_LEN + 1];
1547 VLOG_WARN_RL(&rl, "server %s changed its vote from %s to %s",
1548 s->nickname,
1549 raft_get_nickname(raft, &s->vote, buf1, sizeof buf1),
1550 raft_get_nickname(raft, vote, buf2, sizeof buf2));
1551 }
1552 s->vote = *vote;
1553 if (uuid_equals(vote, &raft->sid)
1554 && ++raft->n_votes > hmap_count(&raft->servers) / 2) {
1555 raft_become_leader(raft);
1556 }
1557}
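
/* Majority arithmetic: a candidate wins once its vote count strictly exceeds
 * half of the configured servers.  In a 3-server cluster,
 * hmap_count(&raft->servers) / 2 == 1, so the election is won on the second
 * vote (the candidate's own plus one peer); a 5-server cluster needs 3. */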
1558
1559static void
1560raft_start_election(struct raft *raft, bool leadership_transfer)
1561{
1562 if (raft->leaving) {
1563 return;
1564 }
1565
1566 struct raft_server *me = raft_find_server(raft, &raft->sid);
1567 if (!me) {
1568 return;
1569 }
1570
1571 if (!raft_set_term(raft, raft->term + 1, &raft->sid)) {
1572 return;
1573 }
1574
1575 raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
1576
1577 ovs_assert(raft->role != RAFT_LEADER);
1578 ovs_assert(hmap_is_empty(&raft->commands));
1579 raft->role = RAFT_CANDIDATE;
1580
1581 raft->n_votes = 0;
1582
1583 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1584 if (!VLOG_DROP_INFO(&rl)) {
1585 long long int now = time_msec();
1586 if (now >= raft->election_timeout) {
1587 VLOG_INFO("term %"PRIu64": %lld ms timeout expired, "
1588 "starting election",
1589 raft->term, now - raft->election_base);
1590 } else {
1591 VLOG_INFO("term %"PRIu64": starting election", raft->term);
1592 }
1593 }
1594 raft_reset_timer(raft);
1595
1596 struct raft_server *peer;
1597 HMAP_FOR_EACH (peer, hmap_node, &raft->servers) {
1598 peer->vote = UUID_ZERO;
1599 if (uuid_equals(&raft->sid, &peer->sid)) {
1600 continue;
1601 }
1602
1603 union raft_rpc rq = {
1604 .vote_request = {
1605 .common = {
1606 .type = RAFT_RPC_VOTE_REQUEST,
1607 .sid = peer->sid,
1608 },
1609 .term = raft->term,
1610 .last_log_index = raft->log_end - 1,
1611 .last_log_term = (
1612 raft->log_end > raft->log_start
1613 ? raft->entries[raft->log_end - raft->log_start - 1].term
1614 : raft->snap.term),
1615 .leadership_transfer = leadership_transfer,
1616 },
1617 };
1618 raft_send(raft, &rq);
1619 }
1620
1621 /* Vote for ourselves. */
1622 raft_accept_vote(raft, me, &raft->sid);
1623}
1624
1625static void
1626raft_open_conn(struct raft *raft, const char *address, const struct uuid *sid)
1627{
1628 if (strcmp(address, raft->local_address)
1629 && !raft_find_conn_by_address(raft, address)) {
1630 raft_add_conn(raft, jsonrpc_session_open(address, true), sid, false);
1631 }
1632}
1633
1634static void
1635raft_conn_close(struct raft_conn *conn)
1636{
1637 jsonrpc_session_close(conn->js);
1638 ovs_list_remove(&conn->list_node);
1639 free(conn->nickname);
1640 free(conn);
1641}
1642
1643/* Returns true if 'conn' should stay open, false if it should be closed. */
1644static bool
1645raft_conn_should_stay_open(struct raft *raft, struct raft_conn *conn)
1646{
1647 /* Close the connection if it's actually dead. If necessary, we'll
1648 * initiate a new session later. */
1649 if (!jsonrpc_session_is_alive(conn->js)) {
1650 return false;
1651 }
1652
 1653 /* Keep incoming sessions. We trust the originator to decide when to
 1654 * drop them. */
1655 if (conn->incoming) {
1656 return true;
1657 }
1658
1659 /* If we are joining the cluster, keep sessions to the remote addresses
1660 * that are supposed to be part of the cluster we're joining. */
1661 if (raft->joining && sset_contains(&raft->remote_addresses,
1662 jsonrpc_session_get_name(conn->js))) {
1663 return true;
1664 }
1665
1666 /* We have joined the cluster. If we did that "recently", then there is a
1667 * chance that we do not have the most recent server configuration log
1668 * entry. If so, it's a waste to disconnect from the servers that were in
1669 * remote_addresses and that will probably appear in the configuration,
1670 * just to reconnect to them a moment later when we do get the
1671 * configuration update. If we are not ourselves in the configuration,
1672 * then we know that there must be a new configuration coming up, so in
1673 * that case keep the connection. */
1674 if (!raft_find_server(raft, &raft->sid)) {
1675 return true;
1676 }
1677
1678 /* Keep the connection only if the server is part of the configuration. */
1679 return raft_find_server(raft, &conn->sid);
1680}
1681
1682/* Allows 'raft' to maintain the distributed log. Call this function as part
1683 * of the process's main loop. */
1684void
1685raft_run(struct raft *raft)
1686{
1687 if (raft->left || raft->failed) {
1688 return;
1689 }
1690
1691 raft_waiters_run(raft);
1692
1693 if (!raft->listener && time_msec() >= raft->listen_backoff) {
1694 char *paddr = raft_make_address_passive(raft->local_address);
1695 int error = pstream_open(paddr, &raft->listener, DSCP_DEFAULT);
1696 if (error) {
1697 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1698 VLOG_WARN_RL(&rl, "%s: listen failed (%s)",
1699 paddr, ovs_strerror(error));
1700 raft->listen_backoff = time_msec() + 1000;
1701 }
1702 free(paddr);
1703 }
1704
1705 if (raft->listener) {
1706 struct stream *stream;
1707 int error = pstream_accept(raft->listener, &stream);
1708 if (!error) {
1709 raft_add_conn(raft, jsonrpc_session_open_unreliably(
1710 jsonrpc_open(stream), DSCP_DEFAULT), NULL,
1711 true);
1712 } else if (error != EAGAIN) {
1713 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
1714 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
1715 pstream_get_name(raft->listener),
1716 ovs_strerror(error));
1717 }
1718 }
1719
1720 /* Run RPCs for all open sessions. */
1721 struct raft_conn *conn;
1722 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1723 raft_conn_run(raft, conn);
1724 }
1725
1726 /* Close unneeded sessions. */
1727 struct raft_conn *next;
1728 LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
1729 if (!raft_conn_should_stay_open(raft, conn)) {
1730 raft_conn_close(conn);
1731 }
1732 }
1733
1734 /* Open needed sessions. */
1735 struct raft_server *server;
1736 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1737 raft_open_conn(raft, server->address, &server->sid);
1738 }
1739 if (raft->joining) {
1740 const char *address;
1741 SSET_FOR_EACH (address, &raft->remote_addresses) {
1742 raft_open_conn(raft, address, NULL);
1743 }
1744 }
1745
1746 if (!raft->joining && time_msec() >= raft->election_timeout) {
1747 raft_start_election(raft, false);
1748 }
1749
1750 if (raft->leaving && time_msec() >= raft->leave_timeout) {
1751 raft_send_remove_server_requests(raft);
1752 }
1753
1754 if (raft->joining && time_msec() >= raft->join_timeout) {
1755 raft->join_timeout = time_msec() + 1000;
1756 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1757 raft_send_add_server_request(raft, conn);
1758 }
1759 }
1760
1761 if (time_msec() >= raft->ping_timeout) {
1762 if (raft->role == RAFT_LEADER) {
1763 raft_send_heartbeats(raft);
1764 } else {
1765 long long int now = time_msec();
1766 struct raft_command *cmd, *next_cmd;
1767 HMAP_FOR_EACH_SAFE (cmd, next_cmd, hmap_node, &raft->commands) {
1768 if (cmd->timestamp
1769 && now - cmd->timestamp > ELECTION_BASE_MSEC) {
1770 raft_command_complete(raft, cmd, RAFT_CMD_TIMEOUT);
1771 }
1772 }
1773 }
1774 raft->ping_timeout = time_msec() + PING_TIME_MSEC;
1775 }
1776
1777 /* Do this only at the end; if we did it as soon as we set raft->left or
1778 * raft->failed in handling the RemoveServerReply, then it could easily
1779 * cause references to freed memory in RPC sessions, etc. */
1780 if (raft->left || raft->failed) {
1781 raft_close__(raft);
1782 }
1783}
1784
1785static void
1786raft_wait_session(struct jsonrpc_session *js)
1787{
1788 if (js) {
1789 jsonrpc_session_wait(js);
1790 jsonrpc_session_recv_wait(js);
1791 }
1792}
1793
1794/* Causes the next call to poll_block() to wake up when 'raft' needs to do
1795 * something. */
1796void
1797raft_wait(struct raft *raft)
1798{
1799 if (raft->left || raft->failed) {
1800 return;
1801 }
1802
1803 raft_waiters_wait(raft);
1804
1805 if (raft->listener) {
1806 pstream_wait(raft->listener);
1807 } else {
1808 poll_timer_wait_until(raft->listen_backoff);
1809 }
1810
1811 struct raft_conn *conn;
1812 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1813 raft_wait_session(conn->js);
1814 }
1815
1816 if (!raft->joining) {
1817 poll_timer_wait_until(raft->election_timeout);
1818 } else {
1819 poll_timer_wait_until(raft->join_timeout);
1820 }
1821 if (raft->leaving) {
1822 poll_timer_wait_until(raft->leave_timeout);
1823 }
1824 if (raft->role == RAFT_LEADER || !hmap_is_empty(&raft->commands)) {
1825 poll_timer_wait_until(raft->ping_timeout);
1826 }
1827}
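
/* For illustration, a minimal sketch of how raft_wait() pairs with raft_run()
 * in the usual OVS poll loop (the surrounding loop and the 'exiting' flag are
 * hypothetical, not part of this file):
 *
 *     while (!exiting) {
 *         raft_run(raft);
 *         raft_wait(raft);
 *         poll_block();
 *     }
 */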
1828
1829static struct raft_waiter *
1830raft_waiter_create(struct raft *raft, enum raft_waiter_type type,
1831 bool start_commit)
1832{
1833 struct raft_waiter *w = xzalloc(sizeof *w);
1834 ovs_list_push_back(&raft->waiters, &w->list_node);
1835 w->commit_ticket = start_commit ? ovsdb_log_commit_start(raft->log) : 0;
1836 w->type = type;
1837 return w;
1838}
1839
1840/* Returns a human-readable representation of 'status' (or NULL if 'status' is
1841 * invalid). */
1842const char *
1843raft_command_status_to_string(enum raft_command_status status)
1844{
1845 switch (status) {
1846 case RAFT_CMD_INCOMPLETE:
1847 return "operation still in progress";
1848 case RAFT_CMD_SUCCESS:
1849 return "success";
1850 case RAFT_CMD_NOT_LEADER:
1851 return "not leader";
1852 case RAFT_CMD_BAD_PREREQ:
1853 return "prerequisite check failed";
1854 case RAFT_CMD_LOST_LEADERSHIP:
1855 return "lost leadership";
1856 case RAFT_CMD_SHUTDOWN:
1857 return "server shutdown";
1858 case RAFT_CMD_IO_ERROR:
1859 return "I/O error";
1860 case RAFT_CMD_TIMEOUT:
1861 return "timeout";
1862 default:
1863 return NULL;
1864 }
1865}
1866
1867/* Converts human-readable status in 's' into status code in '*statusp'.
1868 * Returns true if successful, false if 's' is unknown. */
1869bool
1870raft_command_status_from_string(const char *s,
1871 enum raft_command_status *statusp)
1872{
1873 for (enum raft_command_status status = 0; ; status++) {
1874 const char *s2 = raft_command_status_to_string(status);
1875 if (!s2) {
1876 *statusp = 0;
1877 return false;
1878 } else if (!strcmp(s, s2)) {
1879 *statusp = status;
1880 return true;
1881 }
1882 }
1883}
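
/* For example (illustrative only), the two functions above round-trip a
 * status through its string form:
 *
 *     enum raft_command_status status;
 *     if (raft_command_status_from_string("not leader", &status)) {
 *         ovs_assert(status == RAFT_CMD_NOT_LEADER);
 *     }
 */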
1884
1885static const struct uuid *
1886raft_get_eid(const struct raft *raft, uint64_t index)
1887{
1888 for (; index >= raft->log_start; index--) {
1889 const struct raft_entry *e = raft_get_entry(raft, index);
1890 if (e->data) {
1891 return &e->eid;
1892 }
1893 }
1894 return &raft->snap.eid;
1895}
1896
1897static const struct uuid *
1898raft_current_eid(const struct raft *raft)
1899{
1900 return raft_get_eid(raft, raft->log_end - 1);
1901}
1902
1903static struct raft_command *
1904raft_command_create_completed(enum raft_command_status status)
1905{
1906 ovs_assert(status != RAFT_CMD_INCOMPLETE);
1907
1908 struct raft_command *cmd = xzalloc(sizeof *cmd);
1909 cmd->n_refs = 1;
1910 cmd->status = status;
1911 return cmd;
1912}
1913
1914static struct raft_command *
1915raft_command_create_incomplete(struct raft *raft, uint64_t index)
1916{
1917 struct raft_command *cmd = xzalloc(sizeof *cmd);
1918 cmd->n_refs = 2; /* One for client, one for raft->commands. */
1919 cmd->index = index;
1920 cmd->status = RAFT_CMD_INCOMPLETE;
1921 hmap_insert(&raft->commands, &cmd->hmap_node, cmd->index);
1922 return cmd;
1923}
1924
1925static struct raft_command * OVS_WARN_UNUSED_RESULT
1926raft_command_initiate(struct raft *raft,
1927 const struct json *data, const struct json *servers,
1928 const struct uuid *eid)
1929{
1930 /* Write to local log. */
1931 uint64_t index = raft->log_end;
1932 if (!raft_handle_write_error(
1933 raft, raft_write_entry(
1934 raft, raft->term, json_nullable_clone(data), eid,
1935 json_nullable_clone(servers)))) {
1936 return raft_command_create_completed(RAFT_CMD_IO_ERROR);
1937 }
1938
1939 struct raft_command *cmd = raft_command_create_incomplete(raft, index);
1940 if (eid) {
1941 cmd->eid = *eid;
1942 }
1943
1944 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index = cmd->index;
1945
1946 /* Write to remote logs. */
1947 struct raft_server *s;
1948 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
1949 if (!uuid_equals(&s->sid, &raft->sid) && s->next_index == index) {
1950 raft_send_append_request(raft, s, 1, "execute command");
1951 s->next_index++;
1952 }
1953 }
1954
1955 return cmd;
1956}
1957
1958static struct raft_command * OVS_WARN_UNUSED_RESULT
1959raft_command_execute__(struct raft *raft,
1960 const struct json *data, const struct json *servers,
1961 const struct uuid *prereq, struct uuid *result)
1962{
1963 if (raft->joining || raft->leaving || raft->left || raft->failed) {
1964 return raft_command_create_completed(RAFT_CMD_SHUTDOWN);
1965 }
1966
1967 if (raft->role != RAFT_LEADER) {
1968 /* Consider proxying the command to the leader. We can only do that if
1969 * we know the leader and the command does not change the set of
1970 * servers. We do not proxy commands without prerequisites, even
1971 * though we could, because in an OVSDB context a log entry doesn't
1972 * make sense without context. */
1973 if (servers || !data
1974 || raft->role != RAFT_FOLLOWER || uuid_is_zero(&raft->leader_sid)
1975 || !prereq) {
1976 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
1977 }
1978 }
1979
1980 struct uuid eid = data ? uuid_random() : UUID_ZERO;
1981 if (result) {
1982 *result = eid;
1983 }
1984
1985 if (raft->role != RAFT_LEADER) {
1986 const union raft_rpc rpc = {
1987 .execute_command_request = {
1988 .common = {
1989 .type = RAFT_RPC_EXECUTE_COMMAND_REQUEST,
1990 .sid = raft->leader_sid,
1991 },
1992 .data = CONST_CAST(struct json *, data),
1993 .prereq = *prereq,
1994 .result = eid,
1995 }
1996 };
1997 if (!raft_send(raft, &rpc)) {
1998 /* Couldn't send command, so it definitely failed. */
1999 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
2000 }
2001
2002 struct raft_command *cmd = raft_command_create_incomplete(raft, 0);
2003 cmd->timestamp = time_msec();
2004 cmd->eid = eid;
2005 return cmd;
2006 }
2007
2008 const struct uuid *current_eid = raft_current_eid(raft);
2009 if (prereq && !uuid_equals(prereq, current_eid)) {
2010 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
2011 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
2012 "prerequisite "UUID_FMT,
2013 UUID_ARGS(current_eid), UUID_ARGS(prereq));
2014 return raft_command_create_completed(RAFT_CMD_BAD_PREREQ);
2015 }
2016
2017 return raft_command_initiate(raft, data, servers, &eid);
2018}
2019
2020 /* Initiates appending a log entry to 'raft'. The log entry consists of
2021 * 'data'. If 'prereq' is nonnull, the entry is only added to the log if the
2022 * previous entry in the log has entry ID 'prereq'. If 'result' is nonnull,
2023 * it is populated with the entry ID for the new log entry.
2024 *
2025 * Returns a "struct raft_command" that may be used to track progress adding
2026 * the log entry. The caller must eventually free the returned structure, with
2027 * raft_command_unref(). */
2028struct raft_command * OVS_WARN_UNUSED_RESULT
2029raft_command_execute(struct raft *raft, const struct json *data,
2030 const struct uuid *prereq, struct uuid *result)
2031{
2032 return raft_command_execute__(raft, data, NULL, prereq, result);
2033}
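
/* For illustration, a minimal sketch of a command's lifecycle from the
 * caller's point of view ('data' is assumed to be a JSON log entry prepared
 * by the caller; error handling is omitted):
 *
 *     struct raft_command *cmd = raft_command_execute(raft, data,
 *                                                     NULL, NULL);
 *     while (raft_command_get_status(cmd) == RAFT_CMD_INCOMPLETE) {
 *         raft_run(raft);
 *         raft_command_wait(cmd);
 *         raft_wait(raft);
 *         poll_block();
 *     }
 *     enum raft_command_status status = raft_command_get_status(cmd);
 *     raft_command_unref(cmd);
 */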
2034
2035/* Returns the status of 'cmd'. */
2036enum raft_command_status
2037raft_command_get_status(const struct raft_command *cmd)
2038{
2039 ovs_assert(cmd->n_refs > 0);
2040 return cmd->status;
2041}
2042
2043/* Returns the index of the log entry at which 'cmd' was committed.
2044 *
2045 * This function works only with successful commands. */
2046uint64_t
2047raft_command_get_commit_index(const struct raft_command *cmd)
2048{
2049 ovs_assert(cmd->n_refs > 0);
2050 ovs_assert(cmd->status == RAFT_CMD_SUCCESS);
2051 return cmd->index;
2052}
2053
2054/* Frees 'cmd'. */
2055void
2056raft_command_unref(struct raft_command *cmd)
2057{
2058 if (cmd) {
2059 ovs_assert(cmd->n_refs > 0);
2060 if (!--cmd->n_refs) {
2061 free(cmd);
2062 }
2063 }
2064}
2065
2066/* Causes poll_block() to wake up when 'cmd' has status to report. */
2067void
2068raft_command_wait(const struct raft_command *cmd)
2069{
2070 if (cmd->status != RAFT_CMD_INCOMPLETE) {
2071 poll_immediate_wake();
2072 }
2073}
2074
2075static void
2076raft_command_complete(struct raft *raft,
2077 struct raft_command *cmd,
2078 enum raft_command_status status)
2079{
2080 if (!uuid_is_zero(&cmd->sid)) {
2081 uint64_t commit_index = status == RAFT_CMD_SUCCESS ? cmd->index : 0;
2082 raft_send_execute_command_reply(raft, &cmd->sid, &cmd->eid, status,
2083 commit_index);
2084 }
2085
2086 ovs_assert(cmd->status == RAFT_CMD_INCOMPLETE);
2087 ovs_assert(cmd->n_refs > 0);
2088 hmap_remove(&raft->commands, &cmd->hmap_node);
2089 cmd->status = status;
2090 raft_command_unref(cmd);
2091}
2092
2093static void
2094raft_complete_all_commands(struct raft *raft, enum raft_command_status status)
2095{
2096 struct raft_command *cmd, *next;
2097 HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
2098 raft_command_complete(raft, cmd, status);
2099 }
2100}
2101
2102static struct raft_command *
2103raft_find_command_by_index(struct raft *raft, uint64_t index)
2104{
2105 struct raft_command *cmd;
2106
2107 HMAP_FOR_EACH_IN_BUCKET (cmd, hmap_node, index, &raft->commands) {
2108 if (cmd->index == index) {
2109 return cmd;
2110 }
2111 }
2112 return NULL;
2113}
2114
2115static struct raft_command *
2116raft_find_command_by_eid(struct raft *raft, const struct uuid *eid)
2117{
2118 struct raft_command *cmd;
2119
2120 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2121 if (uuid_equals(&cmd->eid, eid)) {
2122 return cmd;
2123 }
2124 }
2125 return NULL;
2126}
2127\f
2128#define RAFT_RPC(ENUM, NAME) \
2129 static void raft_handle_##NAME(struct raft *, const struct raft_##NAME *);
2130RAFT_RPC_TYPES
2131#undef RAFT_RPC
2132
2133static void
2134raft_handle_hello_request(struct raft *raft OVS_UNUSED,
2135 const struct raft_hello_request *hello OVS_UNUSED)
2136{
2137}
2138
2139/* 'sid' is the server being added. */
2140static void
2141raft_send_add_server_reply__(struct raft *raft, const struct uuid *sid,
2142 const char *address,
2143 bool success, const char *comment)
2144{
2145 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2146 if (!VLOG_DROP_INFO(&rl)) {
2147 struct ds s = DS_EMPTY_INITIALIZER;
2148 char buf[SID_LEN + 1];
2149 ds_put_format(&s, "adding %s ("SID_FMT" at %s) "
2150 "to cluster "CID_FMT" %s",
2151 raft_get_nickname(raft, sid, buf, sizeof buf),
2152 SID_ARGS(sid), address, CID_ARGS(&raft->cid),
2153 success ? "succeeded" : "failed");
2154 if (comment) {
2155 ds_put_format(&s, " (%s)", comment);
2156 }
2157 VLOG_INFO("%s", ds_cstr(&s));
2158 ds_destroy(&s);
2159 }
2160
2161 union raft_rpc rpy = {
2162 .add_server_reply = {
2163 .common = {
2164 .type = RAFT_RPC_ADD_SERVER_REPLY,
2165 .sid = *sid,
2166 .comment = CONST_CAST(char *, comment),
2167 },
2168 .success = success,
2169 }
2170 };
2171
2172 struct sset *remote_addresses = &rpy.add_server_reply.remote_addresses;
2173 sset_init(remote_addresses);
2174 if (!raft->joining) {
2175 struct raft_server *s;
2176 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2177 if (!uuid_equals(&s->sid, &raft->sid)) {
2178 sset_add(remote_addresses, s->address);
2179 }
2180 }
2181 }
2182
2183 raft_send(raft, &rpy);
2184
2185 sset_destroy(remote_addresses);
2186}
2187
2188static void
2189raft_send_remove_server_reply_rpc(struct raft *raft, const struct uuid *sid,
2190 bool success, const char *comment)
2191{
2192 const union raft_rpc rpy = {
2193 .remove_server_reply = {
2194 .common = {
2195 .type = RAFT_RPC_REMOVE_SERVER_REPLY,
2196 .sid = *sid,
2197 .comment = CONST_CAST(char *, comment),
2198 },
2199 .success = success,
2200 }
2201 };
2202 raft_send(raft, &rpy);
2203}
2204
2205static void
2206raft_send_remove_server_reply__(struct raft *raft,
2207 const struct uuid *target_sid,
2208 const struct uuid *requester_sid,
2209 struct unixctl_conn *requester_conn,
2210 bool success, const char *comment)
2211{
2212 struct ds s = DS_EMPTY_INITIALIZER;
2213 ds_put_format(&s, "request ");
2214 if (!uuid_is_zero(requester_sid)) {
2215 char buf[SID_LEN + 1];
2216 ds_put_format(&s, "by %s",
2217 raft_get_nickname(raft, requester_sid, buf, sizeof buf));
2218 } else {
2219 ds_put_cstr(&s, "via unixctl");
2220 }
2221 ds_put_cstr(&s, " to remove ");
2222 if (!requester_conn && uuid_equals(target_sid, requester_sid)) {
2223 ds_put_cstr(&s, "itself");
2224 } else {
2225 char buf[SID_LEN + 1];
2226 ds_put_cstr(&s, raft_get_nickname(raft, target_sid, buf, sizeof buf));
2227 }
2228 ds_put_format(&s, " from cluster "CID_FMT" %s",
2229 CID_ARGS(&raft->cid),
2230 success ? "succeeded" : "failed");
2231 if (comment) {
2232 ds_put_format(&s, " (%s)", comment);
2233 }
2234
2235 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2236 VLOG_INFO_RL(&rl, "%s", ds_cstr(&s));
2237
2238 /* Send RemoveServerReply to the requester (which could be a server or a
2239 * unixctl connection). Also always send it to the removed server; this
2240 * allows it to be sure that it's really removed and update its log and
2241 * disconnect permanently. */
2242 if (!uuid_is_zero(requester_sid)) {
2243 raft_send_remove_server_reply_rpc(raft, requester_sid,
2244 success, comment);
2245 }
2246 if (!uuid_equals(requester_sid, target_sid)) {
2247 raft_send_remove_server_reply_rpc(raft, target_sid, success, comment);
2248 }
2249 if (requester_conn) {
2250 if (success) {
2251 unixctl_command_reply(requester_conn, ds_cstr(&s));
2252 } else {
2253 unixctl_command_reply_error(requester_conn, ds_cstr(&s));
2254 }
2255 }
2256
2257 ds_destroy(&s);
2258}
2259
2260static void
2261raft_send_add_server_reply(struct raft *raft,
2262 const struct raft_add_server_request *rq,
2263 bool success, const char *comment)
2264{
2265 return raft_send_add_server_reply__(raft, &rq->common.sid, rq->address,
2266 success, comment);
2267}
2268
2269static void
2270raft_send_remove_server_reply(struct raft *raft,
2271 const struct raft_remove_server_request *rq,
2272 bool success, const char *comment)
2273{
2274 return raft_send_remove_server_reply__(raft, &rq->sid, &rq->common.sid,
2275 rq->requester_conn, success,
2276 comment);
2277}
2278
2279static void
2280raft_become_follower(struct raft *raft)
2281{
2282 raft->leader_sid = UUID_ZERO;
2283 if (raft->role == RAFT_FOLLOWER) {
2284 return;
2285 }
2286
2287 raft->role = RAFT_FOLLOWER;
2288 raft_reset_timer(raft);
2289
2290 /* Notify clients about lost leadership.
2291 *
2292 * We do not reverse our changes to 'raft->servers' because the new
2293 * configuration is already part of the log. Possibly the configuration
2294 * log entry will not be committed, but until we know that we must use the
2295 * new configuration. Our AppendEntries processing will properly update
2296 * the server configuration later, if necessary. */
2297 struct raft_server *s;
2298 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
2299 raft_send_add_server_reply__(raft, &s->sid, s->address, false,
2300 RAFT_SERVER_LOST_LEADERSHIP);
2301 }
2302 if (raft->remove_server) {
2303 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
2304 &raft->remove_server->requester_sid,
2305 raft->remove_server->requester_conn,
2306 false, RAFT_SERVER_LOST_LEADERSHIP);
2307 raft_server_destroy(raft->remove_server);
2308 raft->remove_server = NULL;
2309 }
2310
2311 raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
2312}
2313
2314static void
2315raft_send_append_request(struct raft *raft,
2316 struct raft_server *peer, unsigned int n,
2317 const char *comment)
2318{
2319 ovs_assert(raft->role == RAFT_LEADER);
2320
2321 const union raft_rpc rq = {
2322 .append_request = {
2323 .common = {
2324 .type = RAFT_RPC_APPEND_REQUEST,
2325 .sid = peer->sid,
2326 .comment = CONST_CAST(char *, comment),
2327 },
2328 .term = raft->term,
2329 .prev_log_index = peer->next_index - 1,
2330 .prev_log_term = (peer->next_index - 1 >= raft->log_start
2331 ? raft->entries[peer->next_index - 1
2332 - raft->log_start].term
2333 : raft->snap.term),
2334 .leader_commit = raft->commit_index,
2335 .entries = &raft->entries[peer->next_index - raft->log_start],
2336 .n_entries = n,
2337 },
2338 };
2339 raft_send(raft, &rq);
2340}
2341
2342static void
2343raft_send_heartbeats(struct raft *raft)
2344{
2345 struct raft_server *s;
2346 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2347 if (!uuid_equals(&raft->sid, &s->sid)) {
2348 raft_send_append_request(raft, s, 0, "heartbeat");
2349 }
2350 }
2351
2352 /* Send a ping to anyone waiting for a command to complete, to let
2353 * them know we're still working on it. */
2354 struct raft_command *cmd;
2355 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2356 if (!uuid_is_zero(&cmd->sid)) {
2357 raft_send_execute_command_reply(raft, &cmd->sid,
2358 &cmd->eid,
2359 RAFT_CMD_INCOMPLETE, 0);
2360 }
2361 }
2362}
2363
2364/* Initializes the fields in 's' that represent the leader's view of the
2365 * server. */
2366static void
2367raft_server_init_leader(struct raft *raft, struct raft_server *s)
2368{
2369 s->next_index = raft->log_end;
2370 s->match_index = 0;
2371 s->phase = RAFT_PHASE_STABLE;
2372}
2373
2374static void
2375raft_become_leader(struct raft *raft)
2376{
2377 raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
2378
2379 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
2380 VLOG_INFO_RL(&rl, "term %"PRIu64": elected leader by %d+ of "
2381 "%"PRIuSIZE" servers", raft->term,
2382 raft->n_votes, hmap_count(&raft->servers));
2383
2384 ovs_assert(raft->role != RAFT_LEADER);
2385 raft->role = RAFT_LEADER;
2386 raft->leader_sid = raft->sid;
2387 raft->election_timeout = LLONG_MAX;
2388 raft->ping_timeout = time_msec() + PING_TIME_MSEC;
2389
2390 struct raft_server *s;
2391 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2392 raft_server_init_leader(raft, s);
2393 }
2394
2395 raft_update_our_match_index(raft, raft->log_end - 1);
2396 raft_send_heartbeats(raft);
2397
2398 /* Write the fact that we are leader to the log. This is not used by the
2399 * algorithm (although it could be, for quick restart), but it is used for
2400 * offline analysis to check for conformance with the properties that Raft
2401 * guarantees. */
2402 struct raft_record r = {
2403 .type = RAFT_REC_LEADER,
2404 .term = raft->term,
2405 .sid = raft->sid,
2406 };
2407 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2408
2409 /* Initiate a no-op commit. Otherwise we might never find out what's in
2410 * the log. See section 6.4 item 1:
2411 *
2412 * The Leader Completeness Property guarantees that a leader has all
2413 * committed entries, but at the start of its term, it may not know
2414 * which those are. To find out, it needs to commit an entry from its
2415 * term. Raft handles this by having each leader commit a blank no-op
2416 * entry into the log at the start of its term. As soon as this no-op
2417 * entry is committed, the leader’s commit index will be at least as
2418 * large as any other servers’ during its term.
2419 */
2420 raft_command_unref(raft_command_execute__(raft, NULL, NULL, NULL, NULL));
2421}
2422
2423/* Processes term 'term' received as part of RPC 'common'. Returns true if the
2424 * caller should continue processing the RPC, false if the caller should reject
2425 * it due to a stale term. */
2426static bool
2427raft_receive_term__(struct raft *raft, const struct raft_rpc_common *common,
2428 uint64_t term)
2429{
2430 /* Section 3.3 says:
2431 *
2432 * Current terms are exchanged whenever servers communicate; if one
2433 * server’s current term is smaller than the other’s, then it updates
2434 * its current term to the larger value. If a candidate or leader
2435 * discovers that its term is out of date, it immediately reverts to
2436 * follower state. If a server receives a request with a stale term
2437 * number, it rejects the request.
2438 */
2439 if (term > raft->term) {
2440 if (!raft_set_term(raft, term, NULL)) {
2441 /* Failed to update the term to 'term'. */
2442 return false;
2443 }
2444 raft_become_follower(raft);
2445 } else if (term < raft->term) {
2446 char buf[SID_LEN + 1];
2447 VLOG_INFO("rejecting term %"PRIu64" < current term %"PRIu64" received "
2448 "in %s message from server %s",
2449 term, raft->term,
2450 raft_rpc_type_to_string(common->type),
2451 raft_get_nickname(raft, &common->sid, buf, sizeof buf));
2452 return false;
2453 }
2454 return true;
2455}
2456
2457static void
2458raft_get_servers_from_log(struct raft *raft, enum vlog_level level)
2459{
2460 const struct json *servers_json = raft->snap.servers;
2461 for (uint64_t index = raft->log_end - 1; index >= raft->log_start;
2462 index--) {
2463 struct raft_entry *e = &raft->entries[index - raft->log_start];
2464 if (e->servers) {
2465 servers_json = e->servers;
2466 break;
2467 }
2468 }
2469
2470 struct hmap servers;
2471 struct ovsdb_error *error = raft_servers_from_json(servers_json, &servers);
2472 ovs_assert(!error);
2473 raft_set_servers(raft, &servers, level);
2474 raft_servers_destroy(&servers);
2475}
2476
2477/* Truncates the log, so that raft->log_end becomes 'new_end'.
2478 *
2479 * Doesn't write anything to disk. In theory, we could truncate the on-disk
2480 * log file, but we don't have the right information to know how long it should
2481 * be. What we actually do is to append entries for older indexes to the
2482 * on-disk log; when we re-read it later, these entries truncate the log.
2483 *
2484 * Returns true if any of the removed log entries were server configuration
2485 * entries, false otherwise. */
2486static bool
2487raft_truncate(struct raft *raft, uint64_t new_end)
2488{
2489 ovs_assert(new_end >= raft->log_start);
2490 if (raft->log_end > new_end) {
2491 char buf[SID_LEN + 1];
2492 VLOG_INFO("%s truncating %"PRIu64 " entries from end of log",
2493 raft_get_nickname(raft, &raft->sid, buf, sizeof buf),
2494 raft->log_end - new_end);
2495 }
2496
2497 bool servers_changed = false;
2498 while (raft->log_end > new_end) {
2499 struct raft_entry *entry = &raft->entries[--raft->log_end
2500 - raft->log_start];
2501 if (entry->servers) {
2502 servers_changed = true;
2503 }
2504 raft_entry_uninit(entry);
2505 }
2506 return servers_changed;
2507}
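
/* For example (illustrative): with log_start = 5 and log_end = 10, calling
 * raft_truncate(raft, 8) discards in-memory entries 8 and 9 and reports
 * whether either carried a server configuration. Nothing is removed from the
 * on-disk log; the replacement entries later appended for indexes 8 and 9
 * supersede the old ones when the log is re-read. */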
2508
2509static const struct json *
2510raft_peek_next_entry(struct raft *raft, struct uuid *eid)
2511{
2512 /* Invariant: log_start - 2 <= last_applied <= commit_index < log_end. */
2513 ovs_assert(raft->log_start <= raft->last_applied + 2);
2514 ovs_assert(raft->last_applied <= raft->commit_index);
2515 ovs_assert(raft->commit_index < raft->log_end);
2516
2517 if (raft->joining || raft->failed) {
2518 return NULL;
2519 }
2520
2521 if (raft->log_start == raft->last_applied + 2) {
2522 *eid = raft->snap.eid;
2523 return raft->snap.data;
2524 }
2525
2526 while (raft->last_applied < raft->commit_index) {
2527 const struct raft_entry *e = raft_get_entry(raft,
2528 raft->last_applied + 1);
2529 if (e->data) {
2530 *eid = e->eid;
2531 return e->data;
2532 }
2533 raft->last_applied++;
2534 }
2535 return NULL;
2536}
2537
2538static const struct json *
2539raft_get_next_entry(struct raft *raft, struct uuid *eid)
2540{
2541 const struct json *data = raft_peek_next_entry(raft, eid);
2542 if (data) {
2543 raft->last_applied++;
2544 }
2545 return data;
2546}
2547
2548static void
2549raft_update_commit_index(struct raft *raft, uint64_t new_commit_index)
2550{
2551 if (new_commit_index <= raft->commit_index) {
2552 return;
2553 }
2554
2555 if (raft->role == RAFT_LEADER) {
2556 while (raft->commit_index < new_commit_index) {
2557 uint64_t index = ++raft->commit_index;
2558 const struct raft_entry *e = raft_get_entry(raft, index);
2559 if (e->servers) {
2560 raft_run_reconfigure(raft);
2561 }
2562 if (e->data) {
2563 struct raft_command *cmd
2564 = raft_find_command_by_index(raft, index);
2565 if (cmd) {
2566 raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
2567 }
2568 }
2569 }
2570 } else {
2571 raft->commit_index = new_commit_index;
2572 }
2573
2574 /* Write the commit index to the log. The next time we restart, this
2575 * allows us to start exporting a reasonably fresh log, instead of a log
2576 * that only contains the snapshot. */
2577 struct raft_record r = {
2578 .type = RAFT_REC_COMMIT_INDEX,
2579 .commit_index = raft->commit_index,
2580 };
2581 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2582}
2583
2584/* This doesn't use rq->entries (but it does use rq->n_entries). */
2585static void
2586raft_send_append_reply(struct raft *raft, const struct raft_append_request *rq,
2587 enum raft_append_result result, const char *comment)
2588{
2589 /* Figure 3.1: "If leaderCommit > commitIndex, set commitIndex =
2590 * min(leaderCommit, index of last new entry)" */
2591 if (result == RAFT_APPEND_OK && rq->leader_commit > raft->commit_index) {
2592 raft_update_commit_index(
2593 raft, MIN(rq->leader_commit, rq->prev_log_index + rq->n_entries));
2594 }
2595
2596 /* Send reply. */
2597 union raft_rpc reply = {
2598 .append_reply = {
2599 .common = {
2600 .type = RAFT_RPC_APPEND_REPLY,
2601 .sid = rq->common.sid,
2602 .comment = CONST_CAST(char *, comment),
2603 },
2604 .term = raft->term,
2605 .log_end = raft->log_end,
2606 .prev_log_index = rq->prev_log_index,
2607 .prev_log_term = rq->prev_log_term,
2608 .n_entries = rq->n_entries,
2609 .result = result,
2610 }
2611 };
2612 raft_send(raft, &reply);
2613}
2614
2615/* If 'prev_log_index' exists in 'raft''s log, in term 'prev_log_term', returns
2616 * NULL. Otherwise, returns an explanation for the mismatch. */
2617static const char *
2618match_index_and_term(const struct raft *raft,
2619 uint64_t prev_log_index, uint64_t prev_log_term)
2620{
2621 if (prev_log_index < raft->log_start - 1) {
2622 return "mismatch before start of log";
2623 } else if (prev_log_index == raft->log_start - 1) {
2624 if (prev_log_term != raft->snap.term) {
2625 return "prev_term mismatch";
2626 }
2627 } else if (prev_log_index < raft->log_end) {
2628 if (raft->entries[prev_log_index - raft->log_start].term
2629 != prev_log_term) {
2630 return "term mismatch";
2631 }
2632 } else {
2633 /* prev_log_index >= raft->log_end */
2634 return "mismatch past end of log";
2635 }
2636 return NULL;
2637}
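
/* Worked example (illustrative): suppose log_start = 10, log_end = 15, and
 * snap.term = 3. Then prev_log_index = 8 is "mismatch before start of log";
 * prev_log_index = 9 matches only if prev_log_term == 3 (index 9 is covered
 * by the snapshot); prev_log_index = 12 is checked against the term of
 * entries[12 - log_start]; and prev_log_index = 15 is "mismatch past end of
 * log". */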
2638
2639static void
2640raft_handle_append_entries(struct raft *raft,
2641 const struct raft_append_request *rq,
2642 uint64_t prev_log_index, uint64_t prev_log_term,
2643 const struct raft_entry *entries,
2644 unsigned int n_entries)
2645{
2646 /* Section 3.5: "When sending an AppendEntries RPC, the leader includes
2647 * the index and term of the entry in its log that immediately precedes
2648 * the new entries. If the follower does not find an entry in its log
2649 * with the same index and term, then it refuses the new entries." */
2650 const char *mismatch = match_index_and_term(raft, prev_log_index,
2651 prev_log_term);
2652 if (mismatch) {
2653 VLOG_INFO("rejecting append_request because previous entry "
2654 "%"PRIu64",%"PRIu64" not in local log (%s)",
2655 prev_log_term, prev_log_index, mismatch);
2656 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY, mismatch);
2657 return;
2658 }
2659
2660 /* Figure 3.1: "If an existing entry conflicts with a new one (same
2661 * index but different terms), delete the existing entry and all that
2662 * follow it." */
2663 unsigned int i;
2664 bool servers_changed = false;
2665 for (i = 0; ; i++) {
2666 if (i >= n_entries) {
2667 /* No change. */
2668 if (rq->common.comment
2669 && !strcmp(rq->common.comment, "heartbeat")) {
2670 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "heartbeat");
2671 } else {
2672 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
2673 }
2674 return;
2675 }
2676
2677 uint64_t log_index = (prev_log_index + 1) + i;
2678 if (log_index >= raft->log_end) {
2679 break;
2680 }
2681 if (raft->entries[log_index - raft->log_start].term
2682 != entries[i].term) {
2683 if (raft_truncate(raft, log_index)) {
2684 servers_changed = true;
2685 }
2686 break;
2687 }
2688 }
2689
2690 /* Figure 3.1: "Append any entries not already in the log." */
2691 struct ovsdb_error *error = NULL;
2692 bool any_written = false;
2693 for (; i < n_entries; i++) {
2694 const struct raft_entry *e = &entries[i];
2695 error = raft_write_entry(raft, e->term,
2696 json_nullable_clone(e->data), &e->eid,
2697 json_nullable_clone(e->servers));
2698 if (error) {
2699 break;
2700 }
2701 any_written = true;
2702 if (e->servers) {
2703 servers_changed = true;
2704 }
2705 }
2706
2707 if (any_written) {
2708 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index
2709 = raft->log_end - 1;
2710 }
2711 if (servers_changed) {
2712 /* The set of servers might have changed; check. */
2713 raft_get_servers_from_log(raft, VLL_INFO);
2714 }
2715
2716 if (error) {
2717 char *s = ovsdb_error_to_string_free(error);
2718 VLOG_ERR("%s", s);
2719 free(s);
2720 raft_send_append_reply(raft, rq, RAFT_APPEND_IO_ERROR, "I/O error");
2721 return;
2722 }
2723
2724 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "log updated");
2725}
2726
2727static bool
2728raft_update_leader(struct raft *raft, const struct uuid *sid)
2729{
2730 if (raft->role == RAFT_LEADER) {
2731 char buf[SID_LEN + 1];
2732 VLOG_ERR("this server is leader but server %s claims to be",
2733 raft_get_nickname(raft, sid, buf, sizeof buf));
2734 return false;
2735 } else if (!uuid_equals(sid, &raft->leader_sid)) {
2736 if (!uuid_is_zero(&raft->leader_sid)) {
2737 char buf1[SID_LEN + 1];
2738 char buf2[SID_LEN + 1];
2739 VLOG_ERR("leader for term %"PRIu64" changed from %s to %s",
2740 raft->term,
2741 raft_get_nickname(raft, &raft->leader_sid,
2742 buf1, sizeof buf1),
2743 raft_get_nickname(raft, sid, buf2, sizeof buf2));
2744 } else {
2745 char buf[SID_LEN + 1];
2746 VLOG_INFO("server %s is leader for term %"PRIu64,
2747 raft_get_nickname(raft, sid, buf, sizeof buf),
2748 raft->term);
2749 }
2750 raft->leader_sid = *sid;
2751
2752 /* Record the leader to the log. This is not used by the algorithm
2753 * (although it could be, for quick restart), but it is used for
2754 * offline analysis to check for conformance with the properties
2755 * that Raft guarantees. */
2756 struct raft_record r = {
2757 .type = RAFT_REC_LEADER,
2758 .term = raft->term,
2759 .sid = *sid,
2760 };
2761 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2762 }
2763 return true;
2764}
2765
2766static void
2767raft_handle_append_request(struct raft *raft,
2768 const struct raft_append_request *rq)
2769{
2770 /* We do not check whether the server that sent the request is part of the
2771 * cluster. As section 4.1 says, "A server accepts AppendEntries requests
2772 * from a leader that is not part of the server’s latest configuration.
2773 * Otherwise, a new server could never be added to the cluster (it would
2774 * never accept any log entries preceding the configuration entry that adds
2775 * the server)." */
2776 if (!raft_update_leader(raft, &rq->common.sid)) {
2777 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
2778 "usurped leadership");
2779 return;
2780 }
2781 raft_reset_timer(raft);
2782
2783 /* First check for the common case, where the AppendEntries request is
2784 * entirely for indexes at or after 'log_start', something
2785 * like this:
2786 *
2787 * rq->prev_log_index
2788 *     | first_entry_index
2789 *     |   |           nth_entry_index
2790 *     |   |           |
2791 *     v   v           v
2792 *       +---+---+---+---+
2793 *     T | T | T | T | T |
2794 *       +---+-------+---+
2795 *   +---+---+---+---+
2796 * T | T | T | T | T |
2797 *   +---+---+---+---+
2798 *     ^               ^
2799 *     |               |
2800 *   log_start       log_end
2801 */
2802 uint64_t first_entry_index = rq->prev_log_index + 1;
2803 uint64_t nth_entry_index = rq->prev_log_index + rq->n_entries;
2804 if (OVS_LIKELY(first_entry_index >= raft->log_start)) {
2805 raft_handle_append_entries(raft, rq,
2806 rq->prev_log_index, rq->prev_log_term,
2807 rq->entries, rq->n_entries);
2808 return;
2809 }
2810
2811 /* Now a series of checks for odd cases, where the AppendEntries request
2812 * extends earlier than the beginning of our log, into the log entries
2813 * discarded by the most recent snapshot. */
2814
2815 /*
2816 * Handle the case where the indexes covered by rq->entries[] are entirely
2817 * disjoint with 'log_start - 1' ... 'log_end - 1', as shown below. So,
2818 * everything in the AppendEntries request must already have been
2819 * committed, and we might as well reply with success.
2820 *
2821 * rq->prev_log_index
2822 *   | first_entry_index
2823 *   |   |           nth_entry_index
2824 *   |   |           |
2825 *   v   v           v
2826 *     +---+---+---+---+
2827 *   T | T | T | T | T |
2828 *     +---+-------+---+
2829 *                           +---+---+---+---+
2830 *                         T | T | T | T | T |
2831 *                           +---+---+---+---+
2832 *                             ^               ^
2833 *                             |               |
2834 *                           log_start       log_end
2835 */
2836 if (nth_entry_index < raft->log_start - 1) {
2837 raft_send_append_reply(raft, rq, RAFT_APPEND_OK,
2838 "append before log start");
2839 return;
2840 }
2841
2842 /*
2843 * Handle the case where the last entry in rq->entries[] has the same index
2844 * as 'log_start - 1', so we can compare their terms:
2845 *
2846 * rq->prev_log_index
2847 *   | first_entry_index
2848 *   |   |           nth_entry_index
2849 *   |   |           |
2850 *   v   v           v
2851 *     +---+---+---+---+
2852 *   T | T | T | T | T |
2853 *     +---+-------+---+
2854 *                     +---+---+---+---+
2855 *                   T | T | T | T | T |
2856 *                     +---+---+---+---+
2857 *                       ^               ^
2858 *                       |               |
2859 *                     log_start       log_end
2860 *
2861 * There's actually a sub-case where rq->n_entries == 0, in which we
2862 * compare rq->prev_log_term:
2863 *
2864 * rq->prev_log_index
2865 * |
2866 * |
2867 * |
2868 * v
2869 * T
2870 *
2871 *   +---+---+---+---+
2872 * T | T | T | T | T |
2873 *   +---+---+---+---+
2874 *     ^               ^
2875 *     |               |
2876 *   log_start       log_end
2877 */
2878 if (nth_entry_index == raft->log_start - 1) {
2879 if (rq->n_entries
2880 ? raft->snap.term == rq->entries[rq->n_entries - 1].term
2881 : raft->snap.term == rq->prev_log_term) {
2882 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
2883 } else {
2884 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
2885 "term mismatch");
2886 }
2887 return;
2888 }
2889
2890 /*
2891 * We now know that the data in rq->entries[] overlaps the data in
2892 * raft->entries[], as shown below, with some positive 'ofs':
2893 *
2894 * rq->prev_log_index
2895 * | first_entry_index
2896 * |   |               nth_entry_index
2897 * |   |               |
2898 * v   v               v
2899 *   +---+---+---+---+---+
2900 * T | T | T | T | T | T |
2901 *   +---+-------+---+---+
2902 *               +---+---+---+---+
2903 *             T | T | T | T | T |
2904 *               +---+---+---+---+
2905 *                 ^               ^
2906 *                 |               |
2907 *               log_start       log_end
2908 *
2909 *   |<-- ofs -->|
2910 *
2911 * We handle this by trimming the first 'ofs' elements off of
2912 * rq->entries[], ending up with the following. Notice how we retain
2913 * the term but not the data for rq->entries[ofs - 1]:
2914 *
2915 *             first_entry_index + ofs - 1
2916 *               | first_entry_index + ofs
2917 *               |   | nth_entry_index
2918 *               |   |   |
2919 *               v   v   v
2920 *                 +---+---+
2921 *               T | T | T |
2922 *                 +---+---+
2923 *               +---+---+---+---+
2924 *             T | T | T | T | T |
2925 *               +---+---+---+---+
2926 *                 ^               ^
2927 *                 |               |
2928 *               log_start       log_end
2929 */
2930 uint64_t ofs = raft->log_start - first_entry_index;
2931 raft_handle_append_entries(raft, rq,
2932 raft->log_start - 1, rq->entries[ofs - 1].term,
2933 &rq->entries[ofs], rq->n_entries - ofs);
2934}
2935
2936/* Returns true if 'raft' has another log entry or snapshot to read. */
2937bool
2938raft_has_next_entry(const struct raft *raft_)
2939{
2940 struct raft *raft = CONST_CAST(struct raft *, raft_);
2941 struct uuid eid;
2942 return raft_peek_next_entry(raft, &eid) != NULL;
2943}
2944
2945/* Returns the next log entry or snapshot from 'raft', or NULL if there are
2946 * none left to read. Stores the entry ID of the log entry in '*eid'. Stores
2947 * true in '*is_snapshot' if the returned data is a snapshot, false if it is a
2948 * log entry. */
2949const struct json *
2950raft_next_entry(struct raft *raft, struct uuid *eid, bool *is_snapshot)
2951{
2952 const struct json *data = raft_get_next_entry(raft, eid);
2953 *is_snapshot = data == raft->snap.data;
2954 return data;
2955}
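
/* For illustration, a consumer might drain everything currently committed
 * like this (a minimal sketch; apply_to_database() is hypothetical):
 *
 *     struct uuid eid;
 *     bool is_snapshot;
 *     const struct json *data;
 *     while ((data = raft_next_entry(raft, &eid, &is_snapshot)) != NULL) {
 *         apply_to_database(data, &eid, is_snapshot);
 *     }
 */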
2956
2957/* Returns the log index of the last-read snapshot or log entry. */
2958uint64_t
2959raft_get_applied_index(const struct raft *raft)
2960{
2961 return raft->last_applied;
2962}
2963
2964/* Returns the log index of the last snapshot or log entry that is available to
2965 * be read. */
2966uint64_t
2967raft_get_commit_index(const struct raft *raft)
2968{
2969 return raft->commit_index;
2970}
2971
2972static struct raft_server *
2973raft_find_peer(struct raft *raft, const struct uuid *uuid)
2974{
2975 struct raft_server *s = raft_find_server(raft, uuid);
2976 return s && !uuid_equals(&raft->sid, &s->sid) ? s : NULL;
2977}
2978
2979static struct raft_server *
2980raft_find_new_server(struct raft *raft, const struct uuid *uuid)
2981{
2982 return raft_server_find(&raft->add_servers, uuid);
2983}
2984
2985/* Figure 3.1: "If there exists an N such that N > commitIndex, a
2986 * majority of matchIndex[i] >= N, and log[N].term == currentTerm, set
2987 * commitIndex = N (sections 3.5 and 3.6)." */
2988static void
2989raft_consider_updating_commit_index(struct raft *raft)
2990{
2991 /* This loop cannot just bail out when it comes across a log entry that
2992 * does not match the criteria. For example, Figure 3.7(d2) shows a
2993 * case where the log entry for term 2 cannot be committed directly
2994 * (because it is not for the current term) but it can be committed as
2995 * a side effect of committing the entry for term 4 (the current term).
2996 * XXX Is there a more efficient way to do this? */
2997 ovs_assert(raft->role == RAFT_LEADER);
2998
2999 uint64_t new_commit_index = raft->commit_index;
3000 for (uint64_t idx = MAX(raft->commit_index + 1, raft->log_start);
3001 idx < raft->log_end; idx++) {
3002 if (raft->entries[idx - raft->log_start].term == raft->term) {
3003 size_t count = 0;
3004 struct raft_server *s2;
3005 HMAP_FOR_EACH (s2, hmap_node, &raft->servers) {
3006 if (s2->match_index >= idx) {
3007 count++;
3008 }
3009 }
3010 if (count > hmap_count(&raft->servers) / 2) {
3011 VLOG_DBG("index %"PRIu64" committed to %"PRIuSIZE" servers, "
3012 "applying", idx, count);
3013 new_commit_index = idx;
3014 }
3015 }
3016 }
3017 raft_update_commit_index(raft, new_commit_index);
3018}
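
/* Worked example (illustrative): in a 5-server cluster with commit_index = 5
 * and match indexes {7, 7, 6, 5, 5}, index 6 is matched by 3 servers, a
 * majority, so commit_index advances to 6 (provided that entry's term is the
 * current term). Index 7 is matched by only 2 servers, so it must wait. */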
3019
3020static void
3021raft_update_match_index(struct raft *raft, struct raft_server *s,
3022 uint64_t min_index)
3023{
3024 ovs_assert(raft->role == RAFT_LEADER);
3025 if (min_index > s->match_index) {
3026 s->match_index = min_index;
3027 raft_consider_updating_commit_index(raft);
3028 }
3029}
3030
3031static void
3032raft_update_our_match_index(struct raft *raft, uint64_t min_index)
3033{
3034 raft_update_match_index(raft, raft_find_server(raft, &raft->sid),
3035 min_index);
3036}
3037
3038static void
3039raft_send_install_snapshot_request(struct raft *raft,
3040 const struct raft_server *s,
3041 const char *comment)
3042{
3043 union raft_rpc rpc = {
3044 .install_snapshot_request = {
3045 .common = {
3046 .type = RAFT_RPC_INSTALL_SNAPSHOT_REQUEST,
3047 .sid = s->sid,
3048 .comment = CONST_CAST(char *, comment),
3049 },
3050 .term = raft->term,
3051 .last_index = raft->log_start - 1,
3052 .last_term = raft->snap.term,
3053 .last_servers = raft->snap.servers,
3054 .last_eid = raft->snap.eid,
3055 .data = raft->snap.data,
3056 }
3057 };
3058 raft_send(raft, &rpc);
3059}
3060
3061static void
3062raft_handle_append_reply(struct raft *raft,
3063 const struct raft_append_reply *rpy)
3064{
3065 if (raft->role != RAFT_LEADER) {
3066 VLOG_INFO("rejected append_reply (not leader)");
3067 return;
3068 }
3069
3070 /* Most commonly we'd be getting an AppendEntries reply from a configured
3071 * server (e.g. a peer), but we can also get them from servers in the
3072 * process of being added. */
3073 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3074 if (!s) {
3075 s = raft_find_new_server(raft, &rpy->common.sid);
3076 if (!s) {
3077 VLOG_INFO("rejected append_reply from unknown server "SID_FMT,
3078 SID_ARGS(&rpy->common.sid));
3079 return;
3080 }
3081 }
3082
3083 if (rpy->result == RAFT_APPEND_OK) {
3084 /* Figure 3.1: "If successful, update nextIndex and matchIndex for
3085 * follower (section 3.5)." */
3086 uint64_t min_index = rpy->prev_log_index + rpy->n_entries + 1;
3087 if (s->next_index < min_index) {
3088 s->next_index = min_index;
3089 }
3090 raft_update_match_index(raft, s, min_index - 1);
3091 } else {
3092 /* Figure 3.1: "If AppendEntries fails because of log inconsistency,
3093 * decrement nextIndex and retry (section 3.5)."
3094 *
3095 * We also implement the optimization suggested in section 4.2.1:
3096 * "Various approaches can make nextIndex converge to its correct value
3097 * more quickly, including those described in Chapter 3. The simplest
3098 * approach to solving this particular problem of adding a new server,
3099 * however, is to have followers return the length of their logs in the
3100 * AppendEntries response; this allows the leader to cap the follower’s
3101 * nextIndex accordingly." */
3102 s->next_index = (s->next_index > 0
3103 ? MIN(s->next_index - 1, rpy->log_end)
3104 : 0);
3105
3106 if (rpy->result == RAFT_APPEND_IO_ERROR) {
3107 /* Append failed but not because of a log inconsistency. Because
3108 * of the I/O error, there's no point in re-sending the append
3109 * immediately. */
3110 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3111 VLOG_INFO_RL(&rl, "%s reported I/O error", s->nickname);
3112 return;
3113 }
3114 }
3115
3116 /*
3117 * Our behavior here must depend on the value of next_index relative to
3118 * log_start and log_end. There are three cases:
3119 *
3120 *        Case 1       |    Case 2     |       Case 3
3121 * <------------------>|<------------->|<------------------>
3122 *                     |               |
3123 *
3124 *                   +---+---+---+---+
3125 *                 T | T | T | T | T |
3126 *                   +---+---+---+---+
3127 *                     ^               ^
3128 *                     |               |
3129 *                   log_start       log_end
3130 */
3131 if (s->next_index < raft->log_start) {
3132 /* Case 1. */
3133 raft_send_install_snapshot_request(raft, s, NULL);
3134 } else if (s->next_index < raft->log_end) {
3135 /* Case 2. */
3136 raft_send_append_request(raft, s, 1, NULL);
3137 } else {
3138 /* Case 3. */
3139 if (s->phase == RAFT_PHASE_CATCHUP) {
3140 s->phase = RAFT_PHASE_CAUGHT_UP;
3141 raft_run_reconfigure(raft);
3142 }
3143 }
3144}
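
/* Worked example (illustrative): if the leader's next_index for a follower
 * is 20 but the follower reports log_end = 12 in a failed reply, plain
 * "decrement and retry" would probe 19, 18, ... one request at a time.
 * MIN(next_index - 1, log_end) instead jumps straight to 12, skipping
 * indexes the follower cannot possibly hold. */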
3145
3146static bool
3147raft_should_suppress_disruptive_server(struct raft *raft,
3148 const union raft_rpc *rpc)
3149{
3150 if (rpc->type != RAFT_RPC_VOTE_REQUEST) {
3151 return false;
3152 }
3153
3154 /* Section 4.2.3 "Disruptive Servers" says:
3155 *
3156 * ...if a server receives a RequestVote request within the minimum
3157 * election timeout of hearing from a current leader, it does not update
3158 * its term or grant its vote...
3159 *
3160 * ...This change conflicts with the leadership transfer mechanism as
3161 * described in Chapter 3, in which a server legitimately starts an
3162 * election without waiting an election timeout. In that case,
3163 * RequestVote messages should be processed by other servers even when
3164 * they believe a current cluster leader exists. Those RequestVote
3165 * requests can include a special flag to indicate this behavior (“I
3166 * have permission to disrupt the leader--it told me to!”).
3167 *
3168 * This clearly describes how the followers should act, but not the leader.
3169 * We just ignore vote requests that arrive at a current leader. This
3170 * seems to be fairly safe, since a majority other than the current leader
3171 * can still elect a new leader and the first AppendEntries from that new
3172 * leader will depose the current leader. */
3173 const struct raft_vote_request *rq = raft_vote_request_cast(rpc);
3174 if (rq->leadership_transfer) {
3175 return false;
3176 }
3177
3178 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3179 long long int now = time_msec();
3180 switch (raft->role) {
3181 case RAFT_LEADER:
3182 VLOG_WARN_RL(&rl, "ignoring vote request received as leader");
3183 return true;
3184
3185 case RAFT_FOLLOWER:
3186 if (now < raft->election_base + ELECTION_BASE_MSEC) {
3187 VLOG_WARN_RL(&rl, "ignoring vote request received after only "
3188 "%lld ms (minimum election time is %d ms)",
3189 now - raft->election_base, ELECTION_BASE_MSEC);
3190 return true;
3191 }
3192 return false;
3193
3194 case RAFT_CANDIDATE:
3195 return false;
3196
3197 default:
3198 OVS_NOT_REACHED();
3199 }
3200}
3201
3202/* Returns true if a reply should be sent. */
3203static bool
3204raft_handle_vote_request__(struct raft *raft,
3205 const struct raft_vote_request *rq)
3206{
3207 /* Figure 3.1: "If votedFor is null or candidateId, and candidate's log is
3208 * at least as up-to-date as receiver's log, grant vote (sections 3.4,
3209 * 3.6)." */
3210 if (uuid_equals(&raft->vote, &rq->common.sid)) {
3211 /* Already voted for this candidate in this term. Resend vote. */
3212 return true;
3213 } else if (!uuid_is_zero(&raft->vote)) {
3214 /* Already voted for different candidate in this term. Send a reply
3215 * saying what candidate we did vote for. This isn't a necessary part
3216 * of the Raft protocol but it can make debugging easier. */
3217 return true;
3218 }
3219
3220 /* Section 3.6.1: "The RequestVote RPC implements this restriction: the RPC
3221 * includes information about the candidate’s log, and the voter denies its
3222 * vote if its own log is more up-to-date than that of the candidate. Raft
3223 * determines which of two logs is more up-to-date by comparing the index
3224 * and term of the last entries in the logs. If the logs have last entries
3225 * with different terms, then the log with the later term is more
3226 * up-to-date. If the logs end with the same term, then whichever log is
3227 * longer is more up-to-date." */
3228 uint64_t last_term = (raft->log_end > raft->log_start
3229 ? raft->entries[raft->log_end - 1
3230 - raft->log_start].term
3231 : raft->snap.term);
3232 if (last_term > rq->last_log_term
3233 || (last_term == rq->last_log_term
3234 && raft->log_end - 1 > rq->last_log_index)) {
3235 /* Our log is more up-to-date than the peer's. Withhold vote. */
3236 return false;
3237 }
3238
3239 /* Record a vote for the peer. */
3240 if (!raft_set_term(raft, raft->term, &rq->common.sid)) {
3241 return false;
3242 }
3243
3244 raft_reset_timer(raft);
3245
3246 return true;
3247}
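
/* Worked example (illustrative): suppose our last entry is at index 30 with
 * term 5. A candidate reporting last_log_term = 5, last_log_index = 28 has a
 * shorter log in the same term, so we withhold our vote. A candidate
 * reporting last_log_term = 6 has a more up-to-date log regardless of index,
 * so (if we have not already voted for someone else) we grant it. */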
3248
3249static void
3250raft_send_vote_reply(struct raft *raft, const struct uuid *dst,
3251 const struct uuid *vote)
3252{
3253 union raft_rpc rpy = {
3254 .vote_reply = {
3255 .common = {
3256 .type = RAFT_RPC_VOTE_REPLY,
3257 .sid = *dst,
3258 },
3259 .term = raft->term,
3260 .vote = *vote,
3261 },
3262 };
3263 raft_send(raft, &rpy);
3264}
3265
3266static void
3267raft_handle_vote_request(struct raft *raft,
3268 const struct raft_vote_request *rq)
3269{
3270 if (raft_handle_vote_request__(raft, rq)) {
3271 raft_send_vote_reply(raft, &rq->common.sid, &raft->vote);
3272 }
3273}
3274
3275static void
3276raft_handle_vote_reply(struct raft *raft,
3277 const struct raft_vote_reply *rpy)
3278{
3279 if (!raft_receive_term__(raft, &rpy->common, rpy->term)) {
3280 return;
3281 }
3282
3283 if (raft->role != RAFT_CANDIDATE) {
3284 return;
3285 }
3286
3287 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3288 if (s) {
3289 raft_accept_vote(raft, s, &rpy->vote);
3290 }
3291}
3292
3293/* Returns true if 'raft''s log contains reconfiguration entries that have not
3294 * yet been committed. */
3295static bool
3296raft_has_uncommitted_configuration(const struct raft *raft)
3297{
3298 for (uint64_t i = raft->commit_index + 1; i < raft->log_end; i++) {
3299 ovs_assert(i >= raft->log_start);
3300 const struct raft_entry *e = &raft->entries[i - raft->log_start];
3301 if (e->servers) {
3302 return true;
3303 }
3304 }
3305 return false;
3306}
3307
3308static void
3309raft_log_reconfiguration(struct raft *raft)
3310{
3311 struct json *servers_json = raft_servers_to_json(&raft->servers);
3312 raft_command_unref(raft_command_execute__(
3313 raft, NULL, servers_json, NULL, NULL));
3314 json_destroy(servers_json);
3315}
3316
3317static void
3318raft_run_reconfigure(struct raft *raft)
3319{
3320 ovs_assert(raft->role == RAFT_LEADER);
3321
3322 /* Reconfiguration only progresses when configuration changes commit. */
3323 if (raft_has_uncommitted_configuration(raft)) {
3324 return;
3325 }
3326
3327 /* If we were waiting for a configuration change to commit, it's done. */
3328 struct raft_server *s;
3329 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3330 if (s->phase == RAFT_PHASE_COMMITTING) {
3331 raft_send_add_server_reply__(raft, &s->sid, s->address,
3332 true, RAFT_SERVER_COMPLETED);
3333 s->phase = RAFT_PHASE_STABLE;
3334 }
3335 }
3336 if (raft->remove_server) {
3337 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
3338 &raft->remove_server->requester_sid,
3339 raft->remove_server->requester_conn,
3340 true, RAFT_SERVER_COMPLETED);
3341 raft_server_destroy(raft->remove_server);
3342 raft->remove_server = NULL;
3343 }
3344
3345 /* If a new server is caught up, add it to the configuration. */
3346 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
3347 if (s->phase == RAFT_PHASE_CAUGHT_UP) {
3348 /* Move 's' from 'raft->add_servers' to 'raft->servers'. */
3349 hmap_remove(&raft->add_servers, &s->hmap_node);
3350 hmap_insert(&raft->servers, &s->hmap_node, uuid_hash(&s->sid));
3351
3352 /* Mark 's' as waiting for commit. */
3353 s->phase = RAFT_PHASE_COMMITTING;
3354
3355 raft_log_reconfiguration(raft);
3356
3357 /* When commit completes we'll transition to RAFT_PHASE_STABLE and
3358 * send a RAFT_SERVER_COMPLETED reply. */
3359
3360 return;
3361 }
3362 }
3363
3364 /* Remove a server, if one is scheduled for removal. */
3365 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3366 if (s->phase == RAFT_PHASE_REMOVE) {
3367 hmap_remove(&raft->servers, &s->hmap_node);
3368 raft->remove_server = s;
3369
3370 raft_log_reconfiguration(raft);
3371
3372 return;
3373 }
3374 }
3375}
3376
3377static void
3378raft_handle_add_server_request(struct raft *raft,
3379 const struct raft_add_server_request *rq)
3380{
3381 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3382 if (raft->role != RAFT_LEADER) {
3383 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3384 return;
3385 }
3386
3387 /* Check for an existing server. */
3388 struct raft_server *s = raft_find_server(raft, &rq->common.sid);
3389 if (s) {
3390 /* If the server is scheduled to be removed, cancel it. */
3391 if (s->phase == RAFT_PHASE_REMOVE) {
3392 s->phase = RAFT_PHASE_STABLE;
3393 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_CANCELED);
3394 return;
3395 }
3396
3397 /* If the server is being added, then it's in progress. */
3398 if (s->phase != RAFT_PHASE_STABLE) {
3399 raft_send_add_server_reply(raft, rq,
3400 false, RAFT_SERVER_IN_PROGRESS);
3401 }
3402
3403 /* Nothing to do--server is already part of the configuration. */
3404 raft_send_add_server_reply(raft, rq,
3405 true, RAFT_SERVER_ALREADY_PRESENT);
3406 return;
3407 }
3408
3409 /* Check for a server being removed. */
3410 if (raft->remove_server
3411 && uuid_equals(&rq->common.sid, &raft->remove_server->sid)) {
3412 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3413 return;
3414 }
3415
3416 /* Check for a server already being added. */
3417 if (raft_find_new_server(raft, &rq->common.sid)) {
3418 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_IN_PROGRESS);
3419 return;
3420 }
3421
3422 /* Add server to 'add_servers'. */
3423 s = raft_server_add(&raft->add_servers, &rq->common.sid, rq->address);
3424 raft_server_init_leader(raft, s);
3425 s->requester_sid = rq->common.sid;
3426 s->requester_conn = NULL;
3427 s->phase = RAFT_PHASE_CATCHUP;
3428
3429 /* Start sending the log. If this is the first time we've tried to add
3430 * this server, then this will quickly degenerate into an InstallSnapshot
3431 * followed by a series of AppendEntries, but if it's a retry of an earlier
3432 * AddServerRequest that was interrupted (e.g. by a timeout or a loss of
3433 * leadership) then it will gracefully resume populating the log.
3434 *
3435 * See the last few paragraphs of section 4.2.1 for further insight. */
3436 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
3437 VLOG_INFO_RL(&rl,
3438 "starting to add server %s ("SID_FMT" at %s) "
3439 "to cluster "CID_FMT, s->nickname, SID_ARGS(&s->sid),
3440 rq->address, CID_ARGS(&raft->cid));
3441 raft_send_append_request(raft, s, 0, "initialize new server");
3442}
3443
3444static void
3445raft_handle_add_server_reply(struct raft *raft,
3446 const struct raft_add_server_reply *rpy)
3447{
3448 if (!raft->joining) {
3449 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3450 VLOG_WARN_RL(&rl, "received add_server_reply even though we're "
3451 "already part of the cluster");
3452 return;
3453 }
3454
3455 if (rpy->success) {
3456 raft->joining = false;
3457
3458 /* It is tempting, at this point, to check that this server is part of
3459 * the current configuration. However, this is not necessarily the
3460 * case, because the log entry that added this server to the cluster
3461 * might have been committed by a majority of the cluster that does not
3462 * include this one. This actually happens in testing. */
3463 } else {
3464 const char *address;
3465 SSET_FOR_EACH (address, &rpy->remote_addresses) {
3466 if (sset_add(&raft->remote_addresses, address)) {
3467 VLOG_INFO("%s: learned new server address for joining cluster",
3468 address);
3469 }
3470 }
3471 }
3472}
3473
3474/* This is called by raft_unixctl_kick() as well as via RPC. */
3475static void
3476raft_handle_remove_server_request(struct raft *raft,
3477 const struct raft_remove_server_request *rq)
3478{
3479 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3480 if (raft->role != RAFT_LEADER) {
3481 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3482 return;
3483 }
3484
3485 /* If the server to remove is currently waiting to be added, cancel it. */
3486 struct raft_server *target = raft_find_new_server(raft, &rq->sid);
3487 if (target) {
3488 raft_send_add_server_reply__(raft, &target->sid, target->address,
3489 false, RAFT_SERVER_CANCELED);
3490 hmap_remove(&raft->add_servers, &target->hmap_node);
3491 raft_server_destroy(target);
3492 return;
3493 }
3494
3495 /* If the server isn't configured, report that. */
3496 target = raft_find_server(raft, &rq->sid);
3497 if (!target) {
3498 raft_send_remove_server_reply(raft, rq,
3499 true, RAFT_SERVER_ALREADY_GONE);
3500 return;
3501 }
3502
3503 /* Check whether we're waiting for the addition of the server to commit. */
3504 if (target->phase == RAFT_PHASE_COMMITTING) {
3505 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3506 return;
3507 }
3508
3509 /* Check whether the server is already scheduled for removal. */
3510 if (target->phase == RAFT_PHASE_REMOVE) {
3511 raft_send_remove_server_reply(raft, rq,
3512 false, RAFT_SERVER_IN_PROGRESS);
3513 return;
3514 }
3515
3516 /* Make sure that, if we remove this server, at least one other
3517 * server will be left. We don't count servers currently being added (in
3518 * 'add_servers') since those could fail. */
3519 struct raft_server *s;
3520 int n = 0;
3521 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3522 if (s != target && s->phase != RAFT_PHASE_REMOVE) {
3523 n++;
3524 }
3525 }
3526 if (!n) {
3527 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_EMPTY);
3528 return;
3529 }
3530
3531 /* Mark the server for removal. */
3532 target->phase = RAFT_PHASE_REMOVE;
3533 if (rq->requester_conn) {
3534 target->requester_sid = UUID_ZERO;
3535 unixctl_command_reply(rq->requester_conn, "started removal");
3536 } else {
3537 target->requester_sid = rq->common.sid;
3538 target->requester_conn = NULL;
3539 }
3540
3541 raft_run_reconfigure(raft);
3542 /* Operation in progress, reply will be sent later. */
3543}
3544
3545static void
3546raft_handle_remove_server_reply(struct raft *raft,
3547 const struct raft_remove_server_reply *rpc)
3548{
3549 if (rpc->success) {
3550 VLOG_INFO(SID_FMT": finished leaving cluster "CID_FMT,
3551 SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
3552
3553 raft_record_note(raft, "left", "this server left the cluster");
3554
3555 raft->leaving = false;
3556 raft->left = true;
3557 }
3558}
3559
3560static bool
3561raft_handle_write_error(struct raft *raft, struct ovsdb_error *error)
3562{
3563 if (error && !raft->failed) {
3564 raft->failed = true;
3565
3566 char *s = ovsdb_error_to_string_free(error);
3567 VLOG_WARN("%s: entering failure mode due to I/O error (%s)",
3568 raft->name, s);
3569 free(s);
3570 }
3571 return !raft->failed;
3572}
3573
3574static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3575raft_write_snapshot(struct raft *raft, struct ovsdb_log *log,
3576 uint64_t new_log_start,
3577 const struct raft_entry *new_snapshot)
3578{
3579 struct raft_header h = {
3580 .sid = raft->sid,
3581 .cid = raft->cid,
3582 .name = raft->name,
3583 .local_address = raft->local_address,
3584 .snap_index = new_log_start - 1,
3585 .snap = *new_snapshot,
3586 };
3587 struct ovsdb_error *error = ovsdb_log_write_and_free(
3588 log, raft_header_to_json(&h));
3589 if (error) {
3590 return error;
3591 }
3592 ovsdb_log_mark_base(raft->log);
3593
3594 /* Write log records. */
3595 for (uint64_t index = new_log_start; index < raft->log_end; index++) {
3596 const struct raft_entry *e = &raft->entries[index - raft->log_start];
3597 struct raft_record r = {
3598 .type = RAFT_REC_ENTRY,
3599 .term = e->term,
3600 .entry = {
3601 .index = index,
3602 .data = e->data,
3603 .servers = e->servers,
3604 .eid = e->eid,
3605 },
3606 };
3607 error = ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3608 if (error) {
3609 return error;
3610 }
3611 }
3612
3613 /* Write term and vote (if any).
3614 *
3615 * The term is redundant if we wrote a log record for that term above. The
3616 * vote, if any, is never redundant.
3617 */
3618 error = raft_write_state(log, raft->term, &raft->vote);
3619 if (error) {
3620 return error;
3621 }
3622
3623 /* Write commit_index if it's beyond the new start of the log. */
3624 if (raft->commit_index >= new_log_start) {
3625 struct raft_record r = {
3626 .type = RAFT_REC_COMMIT_INDEX,
3627 .commit_index = raft->commit_index,
3628 };
3629 return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3630 }
3631 return NULL;
3632}
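
/* For reference, raft_write_snapshot() above emits the replacement log in
 * this order (a summary of the writes it performs, not a separate format
 * specification):
 *
 *   1. A raft_header whose snap_index is new_log_start - 1 and whose 'snap'
 *      is the new base snapshot.
 *   2. One RAFT_REC_ENTRY record per retained entry, for indexes
 *      new_log_start through log_end - 1.
 *   3. The current term and vote, via raft_write_state().
 *   4. A RAFT_REC_COMMIT_INDEX record, but only when commit_index falls at
 *      or beyond new_log_start.
 */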
3633
3634static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3635raft_save_snapshot(struct raft *raft,
3636 uint64_t new_start, const struct raft_entry *new_snapshot)
3637
3638{
3639 struct ovsdb_log *new_log;
3640 struct ovsdb_error *error;
3641 error = ovsdb_log_replace_start(raft->log, &new_log);
3642 if (error) {
3643 return error;
3644 }
3645
3646 error = raft_write_snapshot(raft, new_log, new_start, new_snapshot);
3647 if (error) {
3648 ovsdb_log_replace_abort(new_log);
3649 return error;
3650 }
3651
3652 return ovsdb_log_replace_commit(raft->log, new_log);
3653}
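
/* raft_save_snapshot() leans on the ovsdb_log replacement protocol for
 * crash safety: ovsdb_log_replace_start() directs all of the writes to a
 * temporary file, and only ovsdb_log_replace_commit() substitutes it for
 * the old log, so a crash mid-snapshot leaves the original database file
 * intact. */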
3654
3655static bool
3656raft_handle_install_snapshot_request__(
3657 struct raft *raft, const struct raft_install_snapshot_request *rq)
3658{
3659 raft_reset_timer(raft);
3660
3661 /*
3662     * Our behavior here depends on new_log_start in the snapshot compared to
3663     * log_start and log_end.  There are three cases:
3664     *
3665     *        Case 1       |    Case 2     |      Case 3
3666     *   <---------------->|<------------->|<------------------>
3667     *                     |               |
3668     *
3669     *                  +---+---+---+---+
3670     *                T | T | T | T | T |
3671     *                  +---+---+---+---+
3672     *                    ^               ^
3673     *                    |               |
3674     *                log_start        log_end
3675 */
3676 uint64_t new_log_start = rq->last_index + 1;
3677 if (new_log_start < raft->log_start) {
3678 /* Case 1: The new snapshot covers less than our current one. Nothing
3679 * to do. */
3680 return true;
3681 } else if (new_log_start < raft->log_end) {
3682 /* Case 2: The new snapshot starts in the middle of our log. We could
3683 * discard the first 'new_log_start - raft->log_start' entries in the
3684 * log. But there's not much value in that, since snapshotting is
3685 * supposed to be a local decision. Just skip it. */
3686 return true;
3687 }
3688
3689 /* Case 3: The new snapshot starts past the end of our current log, so
3690 * discard all of our current log. */
3691 const struct raft_entry new_snapshot = {
3692 .term = rq->last_term,
3693 .data = rq->data,
3694 .eid = rq->last_eid,
3695 .servers = rq->last_servers,
3696 };
3697 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
3698 &new_snapshot);
3699 if (error) {
3700 char *error_s = ovsdb_error_to_string(error);
3701 VLOG_WARN("could not save snapshot: %s", error_s);
3702 free(error_s);
3703 return false;
3704 }
3705
3706 for (size_t i = 0; i < raft->log_end - raft->log_start; i++) {
3707 raft_entry_uninit(&raft->entries[i]);
3708 }
3709 raft->log_start = raft->log_end = new_log_start;
3710 raft->log_synced = raft->log_end - 1;
3711 raft->commit_index = raft->log_start - 1;
3712 if (raft->last_applied < raft->commit_index) {
3713 raft->last_applied = raft->log_start - 2;
3714 }
3715
3716 raft_entry_uninit(&raft->snap);
3717 raft_entry_clone(&raft->snap, &new_snapshot);
3718
3719 raft_get_servers_from_log(raft, VLL_INFO);
3720
3721 return true;
3722}
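
/* Worked example for the three cases above (illustrative numbers): with
 * log_start = 10 and log_end = 20, a snapshot with last_index = 5 yields
 * new_log_start = 6, below log_start (case 1: nothing to do); last_index =
 * 14 yields new_log_start = 15, inside the log (case 2: also skipped, since
 * compaction is a local decision); last_index = 25 yields new_log_start =
 * 26, past log_end (case 3: discard the local log and adopt the
 * snapshot). */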
3723
3724static void
3725raft_handle_install_snapshot_request(
3726 struct raft *raft, const struct raft_install_snapshot_request *rq)
3727{
3728 if (raft_handle_install_snapshot_request__(raft, rq)) {
3729 union raft_rpc rpy = {
3730 .install_snapshot_reply = {
3731 .common = {
3732 .type = RAFT_RPC_INSTALL_SNAPSHOT_REPLY,
3733 .sid = rq->common.sid,
3734 },
3735 .term = raft->term,
3736 .last_index = rq->last_index,
3737 .last_term = rq->last_term,
3738 },
3739 };
3740 raft_send(raft, &rpy);
3741 }
3742}
3743
3744static void
3745raft_handle_install_snapshot_reply(
3746 struct raft *raft, const struct raft_install_snapshot_reply *rpy)
3747{
3748 /* We might get an InstallSnapshot reply from a configured server (e.g. a
3749 * peer) or a server in the process of being added. */
3750 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3751 if (!s) {
3752 s = raft_find_new_server(raft, &rpy->common.sid);
3753 if (!s) {
3754 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3755 VLOG_INFO_RL(&rl, "cluster "CID_FMT": received %s from "
3756 "unknown server "SID_FMT, CID_ARGS(&raft->cid),
3757 raft_rpc_type_to_string(rpy->common.type),
3758 SID_ARGS(&rpy->common.sid));
3759 return;
3760 }
3761 }
3762
3763 if (rpy->last_index != raft->log_start - 1 ||
3764 rpy->last_term != raft->snap.term) {
3765 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3766 VLOG_INFO_RL(&rl, "cluster "CID_FMT": server %s installed "
3767 "out-of-date snapshot, starting over",
3768 CID_ARGS(&raft->cid), s->nickname);
3769 raft_send_install_snapshot_request(raft, s,
3770 "installed obsolete snapshot");
3771 return;
3772 }
3773
3774 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
3775    VLOG_INFO_RL(&rl, "cluster "CID_FMT": installed snapshot on server %s "
3776                 "up to %"PRIu64":%"PRIu64, CID_ARGS(&raft->cid),
3777 s->nickname, rpy->last_term, rpy->last_index);
3778 s->next_index = raft->log_end;
3779 raft_send_append_request(raft, s, 0, "snapshot installed");
3780}
3781
3782/* Returns true if 'raft' has grown enough since the last snapshot that
3783 * reducing the log to a snapshot would be valuable, false otherwise. */
3784bool
3785raft_grew_lots(const struct raft *raft)
3786{
3787 return ovsdb_log_grew_lots(raft->log);
3788}
3789
3790/* Returns the number of log entries that could be trimmed off the on-disk log
3791 * by snapshotting. */
3792uint64_t
3793raft_get_log_length(const struct raft *raft)
3794{
3795 return (raft->last_applied < raft->log_start
3796 ? 0
3797 : raft->last_applied - raft->log_start + 1);
3798}
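
/* Example: with log_start = 100 and last_applied = 149, a snapshot would
 * trim 149 - 100 + 1 = 50 entries; if nothing at or past log_start has been
 * applied yet, there is nothing worth trimming and the reported length is 0.
 * (Illustrative numbers.) */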
3799
3800/* Returns true if taking a snapshot of 'raft', with raft_store_snapshot(), is
3801 * possible. */
3802bool
3803raft_may_snapshot(const struct raft *raft)
3804{
3805 return (!raft->joining
3806 && !raft->leaving
3807 && !raft->left
3808 && !raft->failed
3809 && raft->last_applied >= raft->log_start);
3810}
3811
3812/* Replaces the log for 'raft', up to the last log entry read, by
3813 * 'new_snapshot_data'. Returns NULL if successful, otherwise an error that
3814 * the caller must eventually free.
3815 *
3816 * This function can only succeed if raft_may_snapshot() returns true. It is
3817 * only valuable to call it if raft_get_log_length() is significant and
3818 * especially if raft_grew_lots() returns true. */
3819struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3820raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
3821{
3822 if (raft->joining) {
3823 return ovsdb_error(NULL,
3824 "cannot store a snapshot while joining cluster");
3825 } else if (raft->leaving) {
3826 return ovsdb_error(NULL,
3827 "cannot store a snapshot while leaving cluster");
3828 } else if (raft->left) {
3829 return ovsdb_error(NULL,
3830 "cannot store a snapshot after leaving cluster");
3831 } else if (raft->failed) {
3832 return ovsdb_error(NULL,
3833 "cannot store a snapshot following failure");
3834 }
3835
3836 if (raft->last_applied < raft->log_start) {
3837 return ovsdb_error(NULL, "not storing a duplicate snapshot");
3838 }
3839
3840 uint64_t new_log_start = raft->last_applied + 1;
3841    struct raft_entry new_snapshot = {
3842        .term = raft_get_term(raft, new_log_start - 1),
3843        .data = json_clone(new_snapshot_data),
3844        .eid = *raft_get_eid(raft, new_log_start - 1),
3845        .servers = json_clone(raft_servers_for_index(raft, new_log_start - 1)),
3846    };
3847 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
3848 &new_snapshot);
3849 if (error) {
3850        raft_entry_uninit(&new_snapshot);
3851 return error;
3852 }
3853
3854 raft->log_synced = raft->log_end - 1;
3855 raft_entry_uninit(&raft->snap);
3856    raft->snap = new_snapshot;
3857 for (size_t i = 0; i < new_log_start - raft->log_start; i++) {
3858 raft_entry_uninit(&raft->entries[i]);
3859 }
3860 memmove(&raft->entries[0], &raft->entries[new_log_start - raft->log_start],
3861 (raft->log_end - new_log_start) * sizeof *raft->entries);
3862 raft->log_start = new_log_start;
3863 return NULL;
3864}
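
/* A sketch of how a storage layer might drive compaction with the functions
 * above (hypothetical caller and threshold; the real policy lives in the
 * code that embeds raft):
 *
 *     if (raft_may_snapshot(raft)
 *         && (raft_grew_lots(raft) || raft_get_log_length(raft) > 100)) {
 *         struct ovsdb_error *error = raft_store_snapshot(raft, db_json);
 *         if (error) {
 *             char *s = ovsdb_error_to_string_free(error);
 *             VLOG_WARN("snapshot failed (%s)", s);
 *             free(s);
 *         }
 *     }
 */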
3865
3866static void
3867raft_handle_become_leader(struct raft *raft,
3868 const struct raft_become_leader *rq)
3869{
3870 if (raft->role == RAFT_FOLLOWER) {
3871 char buf[SID_LEN + 1];
3872 VLOG_INFO("received leadership transfer from %s in term %"PRIu64,
3873 raft_get_nickname(raft, &rq->common.sid, buf, sizeof buf),
3874 rq->term);
3875 raft_start_election(raft, true);
3876 }
3877}
3878
3879static void
3880raft_send_execute_command_reply(struct raft *raft,
3881 const struct uuid *sid,
3882 const struct uuid *eid,
3883 enum raft_command_status status,
3884 uint64_t commit_index)
3885{
3886 union raft_rpc rpc = {
3887 .execute_command_reply = {
3888 .common = {
3889 .type = RAFT_RPC_EXECUTE_COMMAND_REPLY,
3890 .sid = *sid,
3891 },
3892 .result = *eid,
3893 .status = status,
3894 .commit_index = commit_index,
3895 },
3896 };
3897 raft_send(raft, &rpc);
3898}
3899
3900static enum raft_command_status
3901raft_handle_execute_command_request__(
3902 struct raft *raft, const struct raft_execute_command_request *rq)
3903{
3904 if (raft->role != RAFT_LEADER) {
3905 return RAFT_CMD_NOT_LEADER;
3906 }
3907
3908 const struct uuid *current_eid = raft_current_eid(raft);
3909 if (!uuid_equals(&rq->prereq, current_eid)) {
3910 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3911 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
3912 "prerequisite "UUID_FMT" in execute_command_request",
3913 UUID_ARGS(current_eid), UUID_ARGS(&rq->prereq));
3914 return RAFT_CMD_BAD_PREREQ;
3915 }
3916
3917 struct raft_command *cmd = raft_command_initiate(raft, rq->data,
3918 NULL, &rq->result);
3919 cmd->sid = rq->common.sid;
3920
3921 enum raft_command_status status = cmd->status;
3922 if (status != RAFT_CMD_INCOMPLETE) {
3923 raft_command_unref(cmd);
3924 }
3925 return status;
3926}
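
/* The 'prereq' check above is what makes relayed execution safe: the sender
 * includes the entry ID it believed was current, and if some other commit
 * got there first, the leader answers RAFT_CMD_BAD_PREREQ instead of
 * applying a command computed against stale state, so the client must
 * re-read and retry. */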
3927
3928static void
3929raft_handle_execute_command_request(
3930 struct raft *raft, const struct raft_execute_command_request *rq)
3931{
3932 enum raft_command_status status
3933 = raft_handle_execute_command_request__(raft, rq);
3934 if (status != RAFT_CMD_INCOMPLETE) {
3935 raft_send_execute_command_reply(raft, &rq->common.sid, &rq->result,
3936 status, 0);
3937 }
3938}
3939
3940static void
3941raft_handle_execute_command_reply(
3942 struct raft *raft, const struct raft_execute_command_reply *rpy)
3943{
3944 struct raft_command *cmd = raft_find_command_by_eid(raft, &rpy->result);
3945 if (!cmd) {
3946 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3947 char buf[SID_LEN + 1];
3948 VLOG_INFO_RL(&rl,
3949 "%s received \"%s\" reply from %s for unknown command",
3950 raft->local_nickname,
3951 raft_command_status_to_string(rpy->status),
3952 raft_get_nickname(raft, &rpy->common.sid,
3953 buf, sizeof buf));
3954 return;
3955 }
3956
3957 if (rpy->status == RAFT_CMD_INCOMPLETE) {
3958 cmd->timestamp = time_msec();
3959 } else {
3960 cmd->index = rpy->commit_index;
3961 raft_command_complete(raft, cmd, rpy->status);
3962 }
3963}
3964
3965static void
3966raft_handle_rpc(struct raft *raft, const union raft_rpc *rpc)
3967{
3968 uint64_t term = raft_rpc_get_term(rpc);
3969 if (term
3970 && !raft_should_suppress_disruptive_server(raft, rpc)
3971 && !raft_receive_term__(raft, &rpc->common, term)) {
3972 if (rpc->type == RAFT_RPC_APPEND_REQUEST) {
3973 /* Section 3.3: "If a server receives a request with a stale term
3974 * number, it rejects the request." */
3975 raft_send_append_reply(raft, raft_append_request_cast(rpc),
3976 RAFT_APPEND_INCONSISTENCY, "stale term");
3977 }
3978 return;
3979 }
3980
3981 switch (rpc->type) {
3982#define RAFT_RPC(ENUM, NAME) \
3983 case ENUM: \
3984 raft_handle_##NAME(raft, &rpc->NAME); \
3985 break;
3986 RAFT_RPC_TYPES
3987#undef RAFT_RPC
3988 default:
3989 OVS_NOT_REACHED();
3990 }
3991}
3992\f
3993static bool
3994raft_rpc_is_heartbeat(const union raft_rpc *rpc)
3995{
3996 return ((rpc->type == RAFT_RPC_APPEND_REQUEST
3997 || rpc->type == RAFT_RPC_APPEND_REPLY)
3998 && rpc->common.comment
3999 && !strcmp(rpc->common.comment, "heartbeat"));
4000}
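
/* Heartbeats are ordinary AppendEntries traffic distinguished only by their
 * "heartbeat" comment string; a predicate like this lets callers such as
 * the RPC logging code treat the periodic keep-alive chatter differently
 * from substantive RPCs. */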
4001
4002\f
4003static bool
4004raft_send__(struct raft *raft, const union raft_rpc *rpc,
4005 struct raft_conn *conn)
4006{
4007 log_rpc(rpc, "-->", conn);
4008 return !jsonrpc_session_send(
4009 conn->js, raft_rpc_to_jsonrpc(&raft->cid, &raft->sid, rpc));
4010}
4011
4012static bool
4013raft_is_rpc_synced(const struct raft *raft, const union raft_rpc *rpc)
4014{
4015 uint64_t term = raft_rpc_get_term(rpc);
4016 uint64_t index = raft_rpc_get_min_sync_index(rpc);
4017 const struct uuid *vote = raft_rpc_get_vote(rpc);
4018
4019 return (term <= raft->synced_term
4020 && index <= raft->log_synced
4021 && (!vote || uuid_equals(vote, &raft->synced_vote)));
4022}
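
/* Example of the gating done in raft_send() below: if an outgoing RPC
 * acknowledges data (say, up to log index 42) that the local disk has not
 * finished syncing (log_synced < 42), the RPC is not transmitted
 * immediately; it is parked on a RAFT_W_RPC waiter and sent once the fsync
 * completes.  This keeps the server from acknowledging state it could still
 * lose in a crash.  (Illustrative index; see
 * raft_rpc_get_min_sync_index().) */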
4023
4024static bool
4025raft_send(struct raft *raft, const union raft_rpc *rpc)
4026{
4027 const struct uuid *dst = &rpc->common.sid;
4028 if (uuid_equals(dst, &raft->sid)) {
4029 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4030 VLOG_WARN_RL(&rl, "attempting to send RPC to self");
4031 return false;
4032 }
4033
4034 struct raft_conn *conn = raft_find_conn_by_sid(raft, dst);
4035 if (!conn) {
4036 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4037 char buf[SID_LEN + 1];
4038 VLOG_DBG_RL(&rl, "%s: no connection to %s, cannot send RPC",
4039 raft->local_nickname,
4040 raft_get_nickname(raft, dst, buf, sizeof buf));
4041 return false;
4042 }
4043
4044 if (!raft_is_rpc_synced(raft, rpc)) {
4045 raft_waiter_create(raft, RAFT_W_RPC, false)->rpc = raft_rpc_clone(rpc);
4046 return true;
4047 }
4048
4049 return raft_send__(raft, rpc, conn);
4050}
4051\f
4052static struct raft *
4053raft_lookup_by_name(const char *name)
4054{
4055 struct raft *raft;
4056
4057 HMAP_FOR_EACH_WITH_HASH (raft, hmap_node, hash_string(name, 0),
4058 &all_rafts) {
4059 if (!strcmp(raft->name, name)) {
4060 return raft;
4061 }
4062 }
4063 return NULL;
4064}
4065
4066static void
4067raft_unixctl_cid(struct unixctl_conn *conn,
4068 int argc OVS_UNUSED, const char *argv[],
4069 void *aux OVS_UNUSED)
4070{
4071 struct raft *raft = raft_lookup_by_name(argv[1]);
4072 if (!raft) {
4073 unixctl_command_reply_error(conn, "unknown cluster");
4074 } else if (uuid_is_zero(&raft->cid)) {
4075 unixctl_command_reply_error(conn, "cluster id not yet known");
4076 } else {
4077 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->cid));
4078 unixctl_command_reply(conn, uuid);
4079 free(uuid);
4080 }
4081}
4082
4083static void
4084raft_unixctl_sid(struct unixctl_conn *conn,
4085 int argc OVS_UNUSED, const char *argv[],
4086 void *aux OVS_UNUSED)
4087{
4088 struct raft *raft = raft_lookup_by_name(argv[1]);
4089 if (!raft) {
4090 unixctl_command_reply_error(conn, "unknown cluster");
4091 } else {
4092 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->sid));
4093 unixctl_command_reply(conn, uuid);
4094 free(uuid);
4095 }
4096}
4097
4098static void
4099raft_put_sid(const char *title, const struct uuid *sid,
4100 const struct raft *raft, struct ds *s)
4101{
4102 ds_put_format(s, "%s: ", title);
4103 if (uuid_equals(sid, &raft->sid)) {
4104 ds_put_cstr(s, "self");
4105 } else if (uuid_is_zero(sid)) {
4106 ds_put_cstr(s, "unknown");
4107 } else {
4108 char buf[SID_LEN + 1];
4109 ds_put_cstr(s, raft_get_nickname(raft, sid, buf, sizeof buf));
4110 }
4111 ds_put_char(s, '\n');
4112}
4113
4114static void
4115raft_unixctl_status(struct unixctl_conn *conn,
4116 int argc OVS_UNUSED, const char *argv[],
4117 void *aux OVS_UNUSED)
4118{
4119 struct raft *raft = raft_lookup_by_name(argv[1]);
4120 if (!raft) {
4121 unixctl_command_reply_error(conn, "unknown cluster");
4122 return;
4123 }
4124
4125 struct ds s = DS_EMPTY_INITIALIZER;
4126 ds_put_format(&s, "%s\n", raft->local_nickname);
4127 ds_put_format(&s, "Name: %s\n", raft->name);
4128 ds_put_format(&s, "Cluster ID: ");
4129 if (!uuid_is_zero(&raft->cid)) {
4130 ds_put_format(&s, CID_FMT" ("UUID_FMT")\n",
4131 CID_ARGS(&raft->cid), UUID_ARGS(&raft->cid));
4132 } else {
4133 ds_put_format(&s, "not yet known\n");
4134 }
4135 ds_put_format(&s, "Server ID: "SID_FMT" ("UUID_FMT")\n",
4136 SID_ARGS(&raft->sid), UUID_ARGS(&raft->sid));
4137 ds_put_format(&s, "Address: %s\n", raft->local_address);
4138 ds_put_format(&s, "Status: %s\n",
4139 raft->joining ? "joining cluster"
4140 : raft->leaving ? "leaving cluster"
4141 : raft->left ? "left cluster"
4142 : raft->failed ? "failed"
4143 : "cluster member");
4144 if (raft->joining) {
4145 ds_put_format(&s, "Remotes for joining:");
4146 const char *address;
4147 SSET_FOR_EACH (address, &raft->remote_addresses) {
4148 ds_put_format(&s, " %s", address);
4149 }
4150 ds_put_char(&s, '\n');
4151 }
4152 if (raft->role == RAFT_LEADER) {
4153 struct raft_server *as;
4154 HMAP_FOR_EACH (as, hmap_node, &raft->add_servers) {
4155 ds_put_format(&s, "Adding server %s ("SID_FMT" at %s) (%s)\n",
4156 as->nickname, SID_ARGS(&as->sid), as->address,
4157 raft_server_phase_to_string(as->phase));
4158 }
4159
4160 struct raft_server *rs = raft->remove_server;
4161 if (rs) {
4162 ds_put_format(&s, "Removing server %s ("SID_FMT" at %s) (%s)\n",
4163 rs->nickname, SID_ARGS(&rs->sid), rs->address,
4164 raft_server_phase_to_string(rs->phase));
4165 }
4166 }
4167
4168 ds_put_format(&s, "Role: %s\n",
4169 raft->role == RAFT_LEADER ? "leader"
4170 : raft->role == RAFT_CANDIDATE ? "candidate"
4171 : raft->role == RAFT_FOLLOWER ? "follower"
4172 : "<error>");
4173 ds_put_format(&s, "Term: %"PRIu64"\n", raft->term);
4174 raft_put_sid("Leader", &raft->leader_sid, raft, &s);
4175 raft_put_sid("Vote", &raft->vote, raft, &s);
4176 ds_put_char(&s, '\n');
4177
4178 ds_put_format(&s, "Log: [%"PRIu64", %"PRIu64"]\n",
4179 raft->log_start, raft->log_end);
4180
4181 uint64_t n_uncommitted = raft->log_end - raft->commit_index - 1;
4182 ds_put_format(&s, "Entries not yet committed: %"PRIu64"\n", n_uncommitted);
4183
4184 uint64_t n_unapplied = raft->log_end - raft->last_applied - 1;
4185 ds_put_format(&s, "Entries not yet applied: %"PRIu64"\n", n_unapplied);
4186
4187 const struct raft_conn *c;
4188 ds_put_cstr(&s, "Connections:");
4189 LIST_FOR_EACH (c, list_node, &raft->conns) {
4190 bool connected = jsonrpc_session_is_connected(c->js);
4191 ds_put_format(&s, " %s%s%s%s",
4192 connected ? "" : "(",
4193 c->incoming ? "<-" : "->", c->nickname,
4194 connected ? "" : ")");
4195 }
4196 ds_put_char(&s, '\n');
4197
4198 ds_put_cstr(&s, "Servers:\n");
4199 struct raft_server *server;
4200 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
4201 ds_put_format(&s, " %s ("SID_FMT" at %s)",
4202 server->nickname,
4203 SID_ARGS(&server->sid), server->address);
4204 if (uuid_equals(&server->sid, &raft->sid)) {
4205 ds_put_cstr(&s, " (self)");
4206 }
4207 if (server->phase != RAFT_PHASE_STABLE) {
4208            ds_put_format(&s, " (%s)",
4209                          raft_server_phase_to_string(server->phase));
4210 }
4211 if (raft->role == RAFT_CANDIDATE) {
4212 if (!uuid_is_zero(&server->vote)) {
4213 char buf[SID_LEN + 1];
4214 ds_put_format(&s, " (voted for %s)",
4215 raft_get_nickname(raft, &server->vote,
4216 buf, sizeof buf));
4217 }
4218 } else if (raft->role == RAFT_LEADER) {
4219 ds_put_format(&s, " next_index=%"PRIu64" match_index=%"PRIu64,
4220 server->next_index, server->match_index);
4221 }
4222 ds_put_char(&s, '\n');
4223 }
4224
4225 unixctl_command_reply(conn, ds_cstr(&s));
4226 ds_destroy(&s);
4227}
4228
4229static void
4230raft_unixctl_leave__(struct unixctl_conn *conn, struct raft *raft)
4231{
4232 if (raft_is_leaving(raft)) {
4233 unixctl_command_reply_error(conn,
4234                                    "already leaving cluster");
4235 } else if (raft_is_joining(raft)) {
4236 unixctl_command_reply_error(conn,
4237 "can't leave while join in progress");
4238 } else if (raft_failed(raft)) {
4239 unixctl_command_reply_error(conn,
4240 "can't leave after failure");
4241 } else {
4242 raft_leave(raft);
4243 unixctl_command_reply(conn, NULL);
4244 }
4245}
4246
4247static void
4248raft_unixctl_leave(struct unixctl_conn *conn, int argc OVS_UNUSED,
4249 const char *argv[], void *aux OVS_UNUSED)
4250{
4251 struct raft *raft = raft_lookup_by_name(argv[1]);
4252 if (!raft) {
4253 unixctl_command_reply_error(conn, "unknown cluster");
4254 return;
4255 }
4256
4257 raft_unixctl_leave__(conn, raft);
4258}
4259
4260static struct raft_server *
4261raft_lookup_server_best_match(struct raft *raft, const char *id)
4262{
4263 struct raft_server *best = NULL;
4264 int best_score = -1;
4265 int n_best = 0;
4266
4267 struct raft_server *s;
4268 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
4269 int score = (!strcmp(id, s->address)
4270 ? INT_MAX
4271 : uuid_is_partial_match(&s->sid, id));
4272 if (score > best_score) {
4273 best = s;
4274 best_score = score;
4275 n_best = 1;
4276 } else if (score == best_score) {
4277 n_best++;
4278 }
4279 }
4280 return n_best == 1 ? best : NULL;
4281}
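
/* Example: with servers whose IDs render as 3a1d... and 3b2e..., the id
 * "3a" uniquely selects the first, "3" is ambiguous (two equally good
 * matches, so NULL is returned), and a full address such as
 * "tcp:10.0.0.2:6644" always wins via the INT_MAX score.  (Illustrative
 * values.) */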
4282
4283static void
4284raft_unixctl_kick(struct unixctl_conn *conn, int argc OVS_UNUSED,
4285 const char *argv[], void *aux OVS_UNUSED)
4286{
4287 const char *cluster_name = argv[1];
4288 const char *server_name = argv[2];
4289
4290 struct raft *raft = raft_lookup_by_name(cluster_name);
4291 if (!raft) {
4292 unixctl_command_reply_error(conn, "unknown cluster");
4293 return;
4294 }
4295
4296 struct raft_server *server = raft_lookup_server_best_match(raft,
4297 server_name);
4298 if (!server) {
4299 unixctl_command_reply_error(conn, "unknown server");
4300 return;
4301 }
4302
4303 if (uuid_equals(&server->sid, &raft->sid)) {
4304 raft_unixctl_leave__(conn, raft);
4305 } else if (raft->role == RAFT_LEADER) {
4306 const struct raft_remove_server_request rq = {
4307 .sid = server->sid,
4308 .requester_conn = conn,
4309 };
4310 raft_handle_remove_server_request(raft, &rq);
4311 } else {
4312 const union raft_rpc rpc = {
4313 .remove_server_request = {
4314 .common = {
4315 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
4316 .sid = raft->leader_sid,
4317 .comment = "via unixctl"
4318 },
4319 .sid = server->sid,
4320 }
4321 };
4322 if (raft_send(raft, &rpc)) {
4323 unixctl_command_reply(conn, "sent removal request to leader");
4324 } else {
4325 unixctl_command_reply_error(conn,
4326 "failed to send removal request");
4327 }
4328 }
4329}
4330
4331static void
4332raft_init(void)
4333{
4334 static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
4335 if (!ovsthread_once_start(&once)) {
4336 return;
4337 }
4338 unixctl_command_register("cluster/cid", "DB", 1, 1,
4339 raft_unixctl_cid, NULL);
4340 unixctl_command_register("cluster/sid", "DB", 1, 1,
4341 raft_unixctl_sid, NULL);
4342 unixctl_command_register("cluster/status", "DB", 1, 1,
4343 raft_unixctl_status, NULL);
4344 unixctl_command_register("cluster/leave", "DB", 1, 1,
4345 raft_unixctl_leave, NULL);
4346 unixctl_command_register("cluster/kick", "DB SERVER", 2, 2,
4347 raft_unixctl_kick, NULL);
4348 ovsthread_once_done(&once);
4349}
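
/* The commands registered above are driven through ovs-appctl against the
 * database server's control socket, e.g. (socket path, database name, and
 * server id are illustrative):
 *
 *     ovs-appctl -t /var/run/openvswitch/ovsdb-server.ctl \
 *         cluster/status OVN_Southbound
 *     ovs-appctl -t /var/run/openvswitch/ovsdb-server.ctl \
 *         cluster/kick OVN_Southbound 4b90
 */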