/*
 * Copyright (c) 2017, 2018 Nicira, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <config.h>

#include "raft.h"
#include "raft-private.h"

#include <errno.h>
#include <unistd.h>

#include "hash.h"
#include "jsonrpc.h"
#include "lockfile.h"
#include "openvswitch/dynamic-string.h"
#include "openvswitch/hmap.h"
#include "openvswitch/json.h"
#include "openvswitch/list.h"
#include "openvswitch/poll-loop.h"
#include "openvswitch/vlog.h"
#include "ovsdb-error.h"
#include "ovsdb-parser.h"
#include "ovsdb/log.h"
#include "raft-rpc.h"
#include "random.h"
#include "socket-util.h"
#include "stream.h"
#include "timeval.h"
#include "unicode.h"
#include "unixctl.h"
#include "util.h"
#include "uuid.h"

VLOG_DEFINE_THIS_MODULE(raft);

/* Roles for a Raft server:
 *
 *    - Follower: Servers in touch with the current leader.
 *
 *    - Candidate: Servers unaware of a current leader and seeking election to
 *      leader.
 *
 *    - Leader: Handles all client requests.  At most one at a time.
 *
 * In normal operation there is exactly one leader and all of the other servers
 * are followers. */
enum raft_role {
    RAFT_FOLLOWER,
    RAFT_CANDIDATE,
    RAFT_LEADER
};

/* A connection between this Raft server and another one. */
struct raft_conn {
    struct ovs_list list_node;  /* In struct raft's 'conns' list. */
    struct jsonrpc_session *js; /* JSON-RPC connection. */
    struct uuid sid;            /* Remote server's unique ID. */
    char *nickname;             /* Short name for use in log messages. */
    bool incoming;              /* True if incoming, false if outgoing. */
    unsigned int js_seqno;      /* Seqno for noticing (re)connections. */
};

static void raft_conn_close(struct raft_conn *);

/* A "command", that is, a request to append an entry to the log.
 *
 * The Raft specification only allows clients to issue commands to the leader.
 * With this implementation, clients may issue a command on any server, which
 * then relays the command to the leader if necessary.
 *
 * This structure is thus used in three cases:
 *
 *     1. We are the leader and the command was issued to us directly.
 *
 *     2. We are a follower and relayed the command to the leader.
 *
 *     3. We are the leader and a follower relayed the command to us.
 */
struct raft_command {
    /* All cases. */
    struct hmap_node hmap_node; /* In struct raft's 'commands' hmap. */
    unsigned int n_refs;        /* Reference count. */
    enum raft_command_status status; /* Execution status. */

    /* Case 1 only. */
    uint64_t index;             /* Index in log (0 if being relayed). */

    /* Cases 2 and 3. */
    struct uuid eid;            /* Entry ID of result. */

    /* Case 2 only. */
    long long int timestamp;    /* Issue or last ping time, for expiration. */

    /* Case 3 only. */
    struct uuid sid;            /* The follower (otherwise UUID_ZERO). */
};
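
/* Note on reference counting: an in-progress command is referenced twice, once
 * by the client that issued it and once by 'raft->commands' (see
 * raft_command_create_incomplete() below), so it stays alive until both the
 * client and the Raft layer have released it. */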

static void raft_command_complete(struct raft *, struct raft_command *,
                                  enum raft_command_status);

static void raft_complete_all_commands(struct raft *,
                                       enum raft_command_status);

/* Type of deferred action, see struct raft_waiter. */
enum raft_waiter_type {
    RAFT_W_ENTRY,
    RAFT_W_TERM,
    RAFT_W_RPC,
};

/* An action deferred until a log write commits to disk. */
struct raft_waiter {
    struct ovs_list list_node;
    uint64_t commit_ticket;

    enum raft_waiter_type type;
    union {
        /* RAFT_W_ENTRY.
         *
         * Waits for a RAFT_REC_ENTRY write to our local log to commit.  Upon
         * completion, updates 'log_synced' to indicate that the new log entry
         * or entries are committed and, if we are leader, also updates our
         * local 'match_index'. */
        struct {
            uint64_t index;
        } entry;

        /* RAFT_W_TERM.
         *
         * Waits for a RAFT_REC_TERM or RAFT_REC_VOTE record write to commit.
         * Upon completion, updates 'synced_term' and 'synced_vote', which
         * triggers sending RPCs deferred by the uncommitted 'term' and
         * 'vote'. */
        struct {
            uint64_t term;
            struct uuid vote;
        } term;

        /* RAFT_W_RPC.
         *
         * Sometimes, sending an RPC to a peer must be delayed until an entry,
         * a term, or a vote mentioned in the RPC is synced to disk.  This
         * waiter keeps a copy of such an RPC until the previous waiters have
         * committed. */
        union raft_rpc *rpc;
    };
};
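
/* A waiter is created at the same time as the log write it depends on; when
 * 'start_commit' is passed to raft_waiter_create(), its 'commit_ticket' comes
 * from ovsdb_log_commit_start().  raft_waiters_run() then completes waiters in
 * list order as ovsdb_log_commit_progress() reaches their tickets. */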

static struct raft_waiter *raft_waiter_create(struct raft *,
                                              enum raft_waiter_type,
                                              bool start_commit);
static void raft_waiters_destroy(struct raft *);

/* The Raft state machine. */
struct raft {
    struct hmap_node hmap_node; /* In 'all_rafts'. */
    struct ovsdb_log *log;

/* Persistent derived state.
 *
 * This must be updated on stable storage before responding to RPCs.  It can be
 * derived from the header, snapshot, and log in 'log'. */

    struct uuid cid;            /* Cluster ID (immutable for the cluster). */
    struct uuid sid;            /* Server ID (immutable for the server). */
    char *local_address;        /* Local address (immutable for the server). */
    char *local_nickname;       /* Used for local server in log messages. */
    char *name;                 /* Schema name (immutable for the cluster). */

    /* Contains "struct raft_server"s and represents the server configuration
     * most recently added to 'log'. */
    struct hmap servers;

/* Persistent state on all servers.
 *
 * Must be updated on stable storage before responding to RPCs. */

    /* Current term and the vote for that term.  These might be on the way to
     * disk now. */
    uint64_t term;              /* Initialized to 0 and only increases. */
    struct uuid vote;           /* All-zeros if no vote yet in 'term'. */

    /* The term and vote that have been synced to disk. */
    uint64_t synced_term;
    struct uuid synced_vote;

    /* The log.
     *
     * A log entry with index 1 never really exists; the initial snapshot for a
     * Raft is considered to include this index.  The first real log entry has
     * index 2.
     *
     * A new Raft instance contains an empty log:  log_start=2, log_end=2.
     * Over time, the log grows:                   log_start=2, log_end=N.
     * At some point, the server takes a snapshot: log_start=N, log_end=N.
     * The log continues to grow:                  log_start=N, log_end=N+1...
     *
     * Must be updated on stable storage before responding to RPCs. */
    struct raft_entry *entries; /* Log entry i is in log[i - log_start]. */
    uint64_t log_start;         /* Index of first entry in log. */
    uint64_t log_end;           /* Index of last entry in log, plus 1. */
    uint64_t log_synced;        /* Index of last synced entry. */
    size_t allocated_log;       /* Allocated entries in 'log'. */

    /* Snapshot state (see Figure 5.1)
     *
     * This is the state of the cluster as of the last discarded log entry,
     * that is, at log index 'log_start - 1' (called prevIndex in Figure 5.1).
     * Only committed log entries can be included in a snapshot. */
    struct raft_entry snap;

/* Volatile state.
 *
 * The snapshot is always committed, but the rest of the log might not be yet.
 * 'last_applied' tracks what entries have been passed to the client.  If the
 * client hasn't yet read the latest snapshot, then even the snapshot isn't
 * applied yet.  Thus, the invariants are different for these members:
 *
 *      log_start - 2 <= last_applied <= commit_index < log_end.
 *      log_start - 1 <=                 commit_index < log_end.
 */
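
/* For example, just after reading a snapshot taken at index 10, the log is
 * empty (log_start = log_end = 11), commit_index = 10 because the snapshot is
 * committed by definition, and last_applied = 9 because the client has not
 * yet read the snapshot; see raft_read_header(). */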

    enum raft_role role;        /* Current role. */
    uint64_t commit_index;      /* Max log index known to be committed. */
    uint64_t last_applied;      /* Max log index applied to state machine. */
    struct uuid leader_sid;     /* Server ID of leader (zero, if unknown). */

    /* Followers and candidates only. */
#define ELECTION_BASE_MSEC 1024
#define ELECTION_RANGE_MSEC 1024
    long long int election_base;    /* Time of last heartbeat from leader. */
    long long int election_timeout; /* Time at which we start an election. */

    /* Used for joining a cluster. */
    bool joining;                 /* Attempting to join the cluster? */
    struct sset remote_addresses; /* Addresses to try to find other servers. */
    long long int join_timeout;   /* Time to re-send add server request. */

    /* Used for leaving a cluster. */
    bool leaving;               /* True if we are leaving the cluster. */
    bool left;                  /* True if we have finished leaving. */
    long long int leave_timeout; /* Time to re-send remove server request. */

    /* Failure. */
    bool failed;                /* True if unrecoverable error has occurred. */

    /* File synchronization. */
    struct ovs_list waiters;    /* Contains "struct raft_waiter"s. */

    /* Network connections. */
    struct pstream *listener;   /* For connections from other Raft servers. */
    long long int listen_backoff; /* For retrying creating 'listener'. */
    struct ovs_list conns;      /* Contains struct raft_conns. */

    /* Leaders only.  Reinitialized after becoming leader. */
    struct hmap add_servers;    /* Contains "struct raft_server"s to add. */
    struct raft_server *remove_server; /* Server being removed. */
    struct hmap commands;       /* Contains "struct raft_command"s. */
#define PING_TIME_MSEC (ELECTION_BASE_MSEC / 3)
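    /* PING_TIME_MSEC works out to roughly 341 ms, so a leader sends about
     * three heartbeats within even the shortest possible election timeout
     * (ELECTION_BASE_MSEC). */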
    long long int ping_timeout; /* Time at which to send a heartbeat */

    /* Candidates only.  Reinitialized at start of election. */
    int n_votes;                /* Number of votes for me. */
};

/* All Raft structures. */
static struct hmap all_rafts = HMAP_INITIALIZER(&all_rafts);

static void raft_init(void);

static struct ovsdb_error *raft_read_header(struct raft *)
    OVS_WARN_UNUSED_RESULT;

static void raft_send_execute_command_reply(struct raft *,
                                            const struct uuid *sid,
                                            const struct uuid *eid,
                                            enum raft_command_status,
                                            uint64_t commit_index);

static void raft_update_our_match_index(struct raft *, uint64_t min_index);

static void raft_send_remove_server_reply__(
    struct raft *, const struct uuid *target_sid,
    const struct uuid *requester_sid, struct unixctl_conn *requester_conn,
    bool success, const char *comment);

static void raft_server_init_leader(struct raft *, struct raft_server *);

static bool raft_rpc_is_heartbeat(const union raft_rpc *);
static bool raft_is_rpc_synced(const struct raft *, const union raft_rpc *);

static void raft_handle_rpc(struct raft *, const union raft_rpc *);

static bool raft_send_at(struct raft *, const union raft_rpc *,
                         int line_number);
#define raft_send(raft, rpc) raft_send_at(raft, rpc, __LINE__)

static bool raft_send_to_conn_at(struct raft *, const union raft_rpc *,
                                 struct raft_conn *, int line_number);
#define raft_send_to_conn(raft, rpc, conn) \
    raft_send_to_conn_at(raft, rpc, conn, __LINE__)

static void raft_send_append_request(struct raft *,
                                     struct raft_server *, unsigned int n,
                                     const char *comment);

static void raft_become_leader(struct raft *);
static void raft_become_follower(struct raft *);
static void raft_reset_timer(struct raft *);
static void raft_send_heartbeats(struct raft *);
static void raft_start_election(struct raft *, bool leadership_transfer);
static bool raft_truncate(struct raft *, uint64_t new_end);
static void raft_get_servers_from_log(struct raft *, enum vlog_level);

static bool raft_handle_write_error(struct raft *, struct ovsdb_error *);

static void raft_run_reconfigure(struct raft *);

static struct raft_server *
raft_find_server(const struct raft *raft, const struct uuid *sid)
{
    return raft_server_find(&raft->servers, sid);
}

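/* Converts 'address_' into the corresponding passive (listening) address and
 * returns it as an owned string.  For example, "tcp:192.168.0.1:6641" becomes
 * "ptcp:6641:192.168.0.1", "tcp:[::1]:6641" becomes "ptcp:6641:[::1]", and
 * "unix:/tmp/db.sock" becomes "punix:/tmp/db.sock". */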
static char *
raft_make_address_passive(const char *address_)
{
    if (!strncmp(address_, "unix:", 5)) {
        return xasprintf("p%s", address_);
    } else {
        char *address = xstrdup(address_);
        char *host, *port;
        inet_parse_host_port_tokens(strchr(address, ':') + 1, &host, &port);

        struct ds paddr = DS_EMPTY_INITIALIZER;
        ds_put_format(&paddr, "p%.3s:%s:", address, port);
        if (strchr(host, ':')) {
            ds_put_format(&paddr, "[%s]", host);
        } else {
            ds_put_cstr(&paddr, host);
        }
        free(address);
        return ds_steal_cstr(&paddr);
    }
}

static struct raft *
raft_alloc(void)
{
    raft_init();

    struct raft *raft = xzalloc(sizeof *raft);
    hmap_node_nullify(&raft->hmap_node);
    hmap_init(&raft->servers);
    raft->log_start = raft->log_end = 1;
    raft->role = RAFT_FOLLOWER;
    sset_init(&raft->remote_addresses);
    raft->join_timeout = LLONG_MAX;
    ovs_list_init(&raft->waiters);
    raft->listen_backoff = LLONG_MIN;
    ovs_list_init(&raft->conns);
    hmap_init(&raft->add_servers);
    hmap_init(&raft->commands);

    raft->ping_timeout = time_msec() + PING_TIME_MSEC;
    raft_reset_timer(raft);

    return raft;
}

/* Creates an on-disk file that represents a new Raft cluster and initializes
 * it to consist of a single server, the one on which this function is called.
 *
 * Creates the local copy of the cluster's log in 'file_name', which must not
 * already exist.  Gives it the name 'name', which should be the database
 * schema name and which is used only to match up this database with the server
 * added to the cluster later if the cluster ID is unavailable.
 *
 * The new server is located at 'local_address', which must take one of the
 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
 * square bracket enclosed IPv6 address and PORT is a TCP port number.
 *
 * This only creates the on-disk file.  Use raft_open() to start operating the
 * new server.
 *
 * Returns null if successful, otherwise an ovsdb_error describing the
 * problem. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_create_cluster(const char *file_name, const char *name,
                    const char *local_address, const struct json *data)
{
    /* Parse and verify validity of the local address. */
    struct ovsdb_error *error = raft_address_validate(local_address);
    if (error) {
        return error;
    }

    /* Create log file. */
    struct ovsdb_log *log;
    error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
                           -1, &log);
    if (error) {
        return error;
    }

    /* Write log file. */
    struct raft_header h = {
        .sid = uuid_random(),
        .cid = uuid_random(),
        .name = xstrdup(name),
        .local_address = xstrdup(local_address),
        .joining = false,
        .remote_addresses = SSET_INITIALIZER(&h.remote_addresses),
        .snap_index = 1,
        .snap = {
            .term = 1,
            .data = json_nullable_clone(data),
            .eid = uuid_random(),
            .servers = json_object_create(),
        },
    };
    shash_add_nocopy(json_object(h.snap.servers),
                     xasprintf(UUID_FMT, UUID_ARGS(&h.sid)),
                     json_string_create(local_address));
    error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
    raft_header_uninit(&h);
    if (!error) {
        error = ovsdb_log_commit_block(log);
    }
    ovsdb_log_close(log);

    return error;
}
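
/* A minimal usage sketch (hypothetical caller; the file name, schema name,
 * address, and 'initial_data' below are illustrative only):
 *
 *     struct json *initial_data = json_object_create();
 *     struct ovsdb_error *error = raft_create_cluster(
 *         "/var/lib/mydb.db", "MyDB", "tcp:192.168.0.1:6641", initial_data);
 *     json_destroy(initial_data);
 *     if (error) {
 *         ...report the error, then ovsdb_error_destroy(error)...
 *     }
 *
 * Note that raft_create_cluster() clones 'data', so the caller still owns
 * 'initial_data' and must free it. */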

/* Creates a database file that represents a new server in an existing Raft
 * cluster.
 *
 * Creates the local copy of the cluster's log in 'file_name', which must not
 * already exist.  Gives it the name 'name', which must be the same name
 * passed in to raft_create_cluster() earlier.
 *
 * 'cid' is optional.  If specified, the new server will join only the cluster
 * with the given cluster ID.
 *
 * The new server is located at 'local_address', which must take one of the
 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
 * square bracket enclosed IPv6 address and PORT is a TCP port number.
 *
 * Joining the cluster requires contacting it.  Thus, 'remote_addresses'
 * specifies the addresses of existing servers in the cluster.  One server out
 * of the existing cluster is sufficient, as long as that server is reachable
 * and not partitioned from the current cluster leader.  If multiple servers
 * from the cluster are specified, then it is sufficient for any of them to
 * meet this criterion.
 *
 * This only creates the on-disk file and does no network access.  Use
 * raft_open() to start operating the new server.  (Until this happens, the
 * new server has not joined the cluster.)
 *
 * Returns null if successful, otherwise an ovsdb_error describing the
 * problem. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_join_cluster(const char *file_name,
                  const char *name, const char *local_address,
                  const struct sset *remote_addresses,
                  const struct uuid *cid)
{
    ovs_assert(!sset_is_empty(remote_addresses));

    /* Parse and verify validity of the addresses. */
    struct ovsdb_error *error = raft_address_validate(local_address);
    if (error) {
        return error;
    }
    const char *addr;
    SSET_FOR_EACH (addr, remote_addresses) {
        error = raft_address_validate(addr);
        if (error) {
            return error;
        }
        if (!strcmp(addr, local_address)) {
            return ovsdb_error(NULL, "remote addresses cannot be the same "
                               "as the local address");
        }
    }

    /* Verify validity of the cluster ID (if provided). */
    if (cid && uuid_is_zero(cid)) {
        return ovsdb_error(NULL, "all-zero UUID is not valid cluster ID");
    }

    /* Create log file. */
    struct ovsdb_log *log;
    error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
                           -1, &log);
    if (error) {
        return error;
    }

    /* Write log file. */
    struct raft_header h = {
        .sid = uuid_random(),
        .cid = cid ? *cid : UUID_ZERO,
        .name = xstrdup(name),
        .local_address = xstrdup(local_address),
        .joining = true,
        /* No snapshot yet. */
    };
    sset_clone(&h.remote_addresses, remote_addresses);
    error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
    raft_header_uninit(&h);
    if (!error) {
        error = ovsdb_log_commit_block(log);
    }
    ovsdb_log_close(log);

    return error;
}

/* Reads the initial header record from 'log', which must be a Raft clustered
 * database log, and populates '*md' with the information read from it.  The
 * caller must eventually destroy 'md' with raft_metadata_destroy().
 *
 * On success, returns NULL.  On failure, returns an error that the caller must
 * eventually destroy and zeros '*md'. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_read_metadata(struct ovsdb_log *log, struct raft_metadata *md)
{
    struct raft *raft = raft_alloc();
    raft->log = log;

    struct ovsdb_error *error = raft_read_header(raft);
    if (!error) {
        md->sid = raft->sid;
        md->name = xstrdup(raft->name);
        md->local = xstrdup(raft->local_address);
        md->cid = raft->cid;
    } else {
        memset(md, 0, sizeof *md);
    }

    raft->log = NULL;
    raft_close(raft);
    return error;
}

/* Frees the metadata in 'md'. */
void
raft_metadata_destroy(struct raft_metadata *md)
{
    if (md) {
        free(md->name);
        free(md->local);
    }
}

static const struct raft_entry *
raft_get_entry(const struct raft *raft, uint64_t index)
{
    ovs_assert(index >= raft->log_start);
    ovs_assert(index < raft->log_end);
    return &raft->entries[index - raft->log_start];
}

static uint64_t
raft_get_term(const struct raft *raft, uint64_t index)
{
    return (index == raft->log_start - 1
            ? raft->snap.term
            : raft_get_entry(raft, index)->term);
}

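/* Returns the server configuration in effect at log index 'index': the
 * 'servers' JSON from the most recent entry at or before 'index' that changed
 * the configuration, falling back to the snapshot's configuration.  (Entries
 * that do not change the configuration have a NULL 'servers'.) */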
static const struct json *
raft_servers_for_index(const struct raft *raft, uint64_t index)
{
    ovs_assert(index >= raft->log_start - 1);
    ovs_assert(index < raft->log_end);

    const struct json *servers = raft->snap.servers;
    for (uint64_t i = raft->log_start; i <= index; i++) {
        const struct raft_entry *e = raft_get_entry(raft, i);
        if (e->servers) {
            servers = e->servers;
        }
    }
    return servers;
}

static void
raft_set_servers(struct raft *raft, const struct hmap *new_servers,
                 enum vlog_level level)
{
    struct raft_server *s, *next;
    HMAP_FOR_EACH_SAFE (s, next, hmap_node, &raft->servers) {
        if (!raft_server_find(new_servers, &s->sid)) {
            ovs_assert(s != raft->remove_server);

            hmap_remove(&raft->servers, &s->hmap_node);
            VLOG(level, "server %s removed from configuration", s->nickname);
            raft_server_destroy(s);
        }
    }

    HMAP_FOR_EACH_SAFE (s, next, hmap_node, new_servers) {
        if (!raft_find_server(raft, &s->sid)) {
            VLOG(level, "server %s added to configuration", s->nickname);

            struct raft_server *new
                = raft_server_add(&raft->servers, &s->sid, s->address);
            raft_server_init_leader(raft, new);
        }
    }
}

static uint64_t
raft_add_entry(struct raft *raft,
               uint64_t term, struct json *data, const struct uuid *eid,
               struct json *servers)
{
    if (raft->log_end - raft->log_start >= raft->allocated_log) {
        raft->entries = x2nrealloc(raft->entries, &raft->allocated_log,
                                   sizeof *raft->entries);
    }

    uint64_t index = raft->log_end++;
    struct raft_entry *entry = &raft->entries[index - raft->log_start];
    entry->term = term;
    entry->data = data;
    entry->eid = eid ? *eid : UUID_ZERO;
    entry->servers = servers;
    return index;
}

/* Writes a RAFT_REC_ENTRY record for 'term', 'data', 'eid', 'servers' to
 * 'raft''s log and returns an error indication. */
static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_write_entry(struct raft *raft, uint64_t term, struct json *data,
                 const struct uuid *eid, struct json *servers)
{
    struct raft_record r = {
        .type = RAFT_REC_ENTRY,
        .term = term,
        .entry = {
            .index = raft_add_entry(raft, term, data, eid, servers),
            .data = data,
            .servers = servers,
            .eid = eid ? *eid : UUID_ZERO,
        },
    };
    return ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r));
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_write_state(struct ovsdb_log *log,
                 uint64_t term, const struct uuid *vote)
{
    struct raft_record r = { .term = term };
    if (vote && !uuid_is_zero(vote)) {
        r.type = RAFT_REC_VOTE;
        r.sid = *vote;
    } else {
        r.type = RAFT_REC_TERM;
    }
    return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_apply_record(struct raft *raft, unsigned long long int rec_idx,
                  const struct raft_record *r)
{
    /* Apply "term", which is present in most kinds of records (and otherwise
     * 0).
     *
     * A Raft leader can replicate entries from previous terms to the other
     * servers in the cluster, retaining the original terms on those entries
     * (see section 3.6.2 "Committing entries from previous terms" for more
     * information), so it's OK for the term in a log record to precede the
     * current term. */
    if (r->term > raft->term) {
        raft->term = raft->synced_term = r->term;
        raft->vote = raft->synced_vote = UUID_ZERO;
    }

    switch (r->type) {
    case RAFT_REC_ENTRY:
        if (r->entry.index < raft->commit_index) {
            return ovsdb_error(NULL, "record %llu attempts to truncate log "
                               "from %"PRIu64" to %"PRIu64" entries, but "
                               "commit index is already %"PRIu64,
                               rec_idx, raft->log_end, r->entry.index,
                               raft->commit_index);
        } else if (r->entry.index > raft->log_end) {
            return ovsdb_error(NULL, "record %llu with index %"PRIu64" skips "
                               "past expected index %"PRIu64,
                               rec_idx, r->entry.index, raft->log_end);
        }

        if (r->entry.index < raft->log_end) {
            /* This can happen, but it is notable. */
            VLOG_DBG("record %llu truncates log from %"PRIu64" to %"PRIu64
                     " entries", rec_idx, raft->log_end, r->entry.index);
            raft_truncate(raft, r->entry.index);
        }

        uint64_t prev_term = (raft->log_end > raft->log_start
                              ? raft->entries[raft->log_end
                                              - raft->log_start - 1].term
                              : raft->snap.term);
        if (r->term < prev_term) {
            return ovsdb_error(NULL, "record %llu with index %"PRIu64" term "
                               "%"PRIu64" precedes previous entry's term "
                               "%"PRIu64,
                               rec_idx, r->entry.index, r->term, prev_term);
        }

        raft->log_synced = raft_add_entry(
            raft, r->term,
            json_nullable_clone(r->entry.data), &r->entry.eid,
            json_nullable_clone(r->entry.servers));
        return NULL;

    case RAFT_REC_TERM:
        return NULL;

    case RAFT_REC_VOTE:
        if (r->term < raft->term) {
            return ovsdb_error(NULL, "record %llu votes for term %"PRIu64" "
                               "but current term is %"PRIu64,
                               rec_idx, r->term, raft->term);
        } else if (!uuid_is_zero(&raft->vote)
                   && !uuid_equals(&raft->vote, &r->sid)) {
            return ovsdb_error(NULL, "record %llu votes for "SID_FMT" in term "
                               "%"PRIu64" but a previous record for the "
                               "same term voted for "SID_FMT, rec_idx,
                               SID_ARGS(&raft->vote), r->term,
                               SID_ARGS(&r->sid));
        } else {
            raft->vote = raft->synced_vote = r->sid;
            return NULL;
        }
        break;

    case RAFT_REC_NOTE:
        if (!strcmp(r->note, "left")) {
            return ovsdb_error(NULL, "record %llu indicates server has left "
                               "the cluster; it cannot be added back (use "
                               "\"ovsdb-tool join-cluster\" to add a new "
                               "server)", rec_idx);
        }
        return NULL;

    case RAFT_REC_COMMIT_INDEX:
        if (r->commit_index < raft->commit_index) {
            return ovsdb_error(NULL, "record %llu regresses commit index "
                               "from %"PRIu64" to %"PRIu64,
                               rec_idx, raft->commit_index, r->commit_index);
        } else if (r->commit_index >= raft->log_end) {
            return ovsdb_error(NULL, "record %llu advances commit index to "
                               "%"PRIu64" but last log index is %"PRIu64,
                               rec_idx, r->commit_index, raft->log_end - 1);
        } else {
            raft->commit_index = r->commit_index;
            return NULL;
        }
        break;

    case RAFT_REC_LEADER:
        /* XXX we could use this to take back leadership for quick restart */
        return NULL;

    default:
        OVS_NOT_REACHED();
    }
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_read_header(struct raft *raft)
{
    /* Read header record. */
    struct json *json;
    struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
    if (error || !json) {
        /* Report error or end-of-file. */
        return error;
    }
    ovsdb_log_mark_base(raft->log);

    struct raft_header h;
    error = raft_header_from_json(&h, json);
    json_destroy(json);
    if (error) {
        return error;
    }

    raft->sid = h.sid;
    raft->cid = h.cid;
    raft->name = xstrdup(h.name);
    raft->local_address = xstrdup(h.local_address);
    raft->local_nickname = raft_address_to_nickname(h.local_address, &h.sid);
    raft->joining = h.joining;

    if (h.joining) {
        sset_clone(&raft->remote_addresses, &h.remote_addresses);
    } else {
        raft_entry_clone(&raft->snap, &h.snap);
        raft->log_start = raft->log_end = h.snap_index + 1;
        raft->commit_index = h.snap_index;
        raft->last_applied = h.snap_index - 1;
    }

    raft_header_uninit(&h);

    return NULL;
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_read_log(struct raft *raft)
{
    for (unsigned long long int i = 1; ; i++) {
        struct json *json;
        struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
        if (!json) {
            if (error) {
                /* We assume that the error is due to a partial write while
                 * appending to the file before a crash, so log it and
                 * continue. */
                char *error_string = ovsdb_error_to_string_free(error);
                VLOG_WARN("%s", error_string);
                free(error_string);
                error = NULL;
            }
            break;
        }

        struct raft_record r;
        error = raft_record_from_json(&r, json);
        if (!error) {
            error = raft_apply_record(raft, i, &r);
            raft_record_uninit(&r);
        }
        if (error) {
            return ovsdb_wrap_error(error, "error reading record %llu from "
                                    "%s log", i, raft->name);
        }
    }

    /* Set the most recent servers. */
    raft_get_servers_from_log(raft, VLL_DBG);

    return NULL;
}

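/* Restarts the randomized election timer: the next election will start
 * between ELECTION_BASE_MSEC and ELECTION_BASE_MSEC + ELECTION_RANGE_MSEC
 * (that is, 1024 to 2047 ms) from now.  The random spread makes it unlikely
 * that several servers time out and split the vote by starting elections
 * simultaneously. */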
static void
raft_reset_timer(struct raft *raft)
{
    unsigned int duration = (ELECTION_BASE_MSEC
                             + random_range(ELECTION_RANGE_MSEC));
    raft->election_base = time_msec();
    raft->election_timeout = raft->election_base + duration;
}

static void
raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
              const struct uuid *sid, bool incoming)
{
    struct raft_conn *conn = xzalloc(sizeof *conn);
    ovs_list_push_back(&raft->conns, &conn->list_node);
    conn->js = js;
    if (sid) {
        conn->sid = *sid;
    }
    conn->nickname = raft_address_to_nickname(jsonrpc_session_get_name(js),
                                              &conn->sid);
    conn->incoming = incoming;
    conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
}

/* Starts the local server in an existing Raft cluster, using the local copy of
 * the cluster's log in 'file_name'.  Takes ownership of 'log', whether
 * successful or not. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_open(struct ovsdb_log *log, struct raft **raftp)
{
    struct raft *raft = raft_alloc();
    raft->log = log;

    struct ovsdb_error *error = raft_read_header(raft);
    if (error) {
        goto error;
    }

    if (!raft->joining) {
        error = raft_read_log(raft);
        if (error) {
            goto error;
        }

        /* Find our own server. */
        if (!raft_find_server(raft, &raft->sid)) {
            error = ovsdb_error(NULL, "server does not belong to cluster");
            goto error;
        }

        /* If there's only one server, start an election right away so that the
         * cluster bootstraps quickly. */
        if (hmap_count(&raft->servers) == 1) {
            raft_start_election(raft, false);
        }
    } else {
        raft->join_timeout = time_msec() + 1000;
    }

    *raftp = raft;
    hmap_insert(&all_rafts, &raft->hmap_node, hash_string(raft->name, 0));
    return NULL;

error:
    raft_close(raft);
    *raftp = NULL;
    return error;
}
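
/* A minimal sketch of bringing up a server from an existing clustered log
 * (hypothetical caller; 'check_ovsdb_error' stands in for whatever error
 * handling the caller uses):
 *
 *     struct ovsdb_log *log;
 *     check_ovsdb_error(ovsdb_log_open("/var/lib/mydb.db", RAFT_MAGIC,
 *                                      OVSDB_LOG_READ_WRITE, -1, &log));
 *     struct raft *raft;
 *     check_ovsdb_error(raft_open(log, &raft));
 *     for (;;) {
 *         raft_run(raft);
 *         raft_wait(raft);
 *         poll_block();
 *     }
 */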

/* Returns the name of 'raft', which in OVSDB is the database schema name. */
const char *
raft_get_name(const struct raft *raft)
{
    return raft->name;
}

/* Returns the cluster ID of 'raft'.  If 'raft' has not yet completed joining
 * its cluster, then 'cid' will be all-zeros (unless the administrator
 * specified a cluster ID when running "ovsdb-tool join-cluster").
 *
 * Each cluster has a unique cluster ID. */
const struct uuid *
raft_get_cid(const struct raft *raft)
{
    return &raft->cid;
}

/* Returns the server ID of 'raft'.  Each server has a unique server ID. */
const struct uuid *
raft_get_sid(const struct raft *raft)
{
    return &raft->sid;
}

/* Returns true if 'raft' has completed joining its cluster, has not left or
 * initiated leaving the cluster, does not have failed disk storage, and is
 * apparently connected to the leader in a healthy way (or is itself the
 * leader). */
bool
raft_is_connected(const struct raft *raft)
{
    return (raft->role != RAFT_CANDIDATE
            && !raft->joining
            && !raft->leaving
            && !raft->left
            && !raft->failed);
}

/* Returns true if 'raft' is the cluster leader. */
bool
raft_is_leader(const struct raft *raft)
{
    return raft->role == RAFT_LEADER;
}

/* Returns true if 'raft' is in the process of joining its cluster. */
bool
raft_is_joining(const struct raft *raft)
{
    return raft->joining;
}

/* Only returns *connected* connections. */
static struct raft_conn *
raft_find_conn_by_sid(struct raft *raft, const struct uuid *sid)
{
    if (!uuid_is_zero(sid)) {
        struct raft_conn *conn;
        LIST_FOR_EACH (conn, list_node, &raft->conns) {
            if (uuid_equals(sid, &conn->sid)
                && jsonrpc_session_is_connected(conn->js)) {
                return conn;
            }
        }
    }
    return NULL;
}

static struct raft_conn *
raft_find_conn_by_address(struct raft *raft, const char *address)
{
    struct raft_conn *conn;
    LIST_FOR_EACH (conn, list_node, &raft->conns) {
        if (!strcmp(jsonrpc_session_get_name(conn->js), address)) {
            return conn;
        }
    }
    return NULL;
}

static void OVS_PRINTF_FORMAT(3, 4)
raft_record_note(struct raft *raft, const char *note,
                 const char *comment_format, ...)
{
    va_list args;
    va_start(args, comment_format);
    char *comment = xvasprintf(comment_format, args);
    va_end(args);

    struct raft_record r = {
        .type = RAFT_REC_NOTE,
        .comment = comment,
        .note = CONST_CAST(char *, note),
    };
    ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));

    free(comment);
}

/* If we're leader, try to transfer leadership to another server, logging
 * 'reason' as the human-readable reason (it should be a phrase suitable for
 * following "because"). */
void
raft_transfer_leadership(struct raft *raft, const char *reason)
{
    if (raft->role != RAFT_LEADER) {
        return;
    }

    struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (!uuid_equals(&raft->sid, &s->sid)
            && s->phase == RAFT_PHASE_STABLE) {
            struct raft_conn *conn = raft_find_conn_by_sid(raft, &s->sid);
            if (!conn) {
                continue;
            }

            union raft_rpc rpc = {
                .become_leader = {
                    .common = {
                        .comment = CONST_CAST(char *, reason),
                        .type = RAFT_RPC_BECOME_LEADER,
                        .sid = s->sid,
                    },
                    .term = raft->term,
                }
            };
            raft_send_to_conn(raft, &rpc, conn);

            raft_record_note(raft, "transfer leadership",
                             "transferring leadership to %s because %s",
                             s->nickname, reason);
            break;
        }
    }
}

/* Send a RemoveServerRequest to the rest of the servers in the cluster.
 *
 * If we know which server is the leader, we can just send the request to it.
 * However, we might not know which server is the leader, and we might never
 * find out if the remove request was actually previously committed by a
 * majority of the servers (because in that case the new leader will not send
 * AppendRequests or heartbeats to us).  Therefore, we instead send
 * RemoveRequests to every server.  This theoretically has the same problem, if
 * the current cluster leader was not previously a member of the cluster, but
 * it seems likely to be more robust in practice. */
static void
raft_send_remove_server_requests(struct raft *raft)
{
    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
    VLOG_INFO_RL(&rl, "sending remove request (joining=%s, leaving=%s)",
                 raft->joining ? "true" : "false",
                 raft->leaving ? "true" : "false");
    const struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (!uuid_equals(&s->sid, &raft->sid)) {
            union raft_rpc rpc = (union raft_rpc) {
                .remove_server_request = {
                    .common = {
                        .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
                        .sid = s->sid,
                    },
                    .sid = raft->sid,
                },
            };
            raft_send(raft, &rpc);
        }
    }

    raft->leave_timeout = time_msec() + ELECTION_BASE_MSEC;
}

/* Attempts to start 'raft' leaving its cluster.  The caller can check progress
 * using raft_is_leaving() and raft_left(). */
void
raft_leave(struct raft *raft)
{
    if (raft->joining || raft->failed || raft->leaving || raft->left) {
        return;
    }
    VLOG_INFO(SID_FMT": starting to leave cluster "CID_FMT,
              SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
    raft->leaving = true;
    raft_transfer_leadership(raft, "this server is leaving the cluster");
    raft_become_follower(raft);
    raft_send_remove_server_requests(raft);
    raft->leave_timeout = time_msec() + ELECTION_BASE_MSEC;
}

/* Returns true if 'raft' is currently attempting to leave its cluster. */
bool
raft_is_leaving(const struct raft *raft)
{
    return raft->leaving;
}

/* Returns true if 'raft' successfully left its cluster. */
bool
raft_left(const struct raft *raft)
{
    return raft->left;
}

/* Returns true if 'raft' has experienced a disk I/O failure.  When this
 * returns true, only closing and reopening 'raft' allows for recovery. */
bool
raft_failed(const struct raft *raft)
{
    return raft->failed;
}

/* Forces 'raft' to attempt to take leadership of the cluster by deposing the
 * current leader. */
void
raft_take_leadership(struct raft *raft)
{
    if (raft->role != RAFT_LEADER) {
        raft_start_election(raft, true);
    }
}

/* Closes everything owned by 'raft' that might be visible outside the process:
 * network connections, commands, etc.  This is part of closing 'raft'; it is
 * also used if 'raft' has failed in an unrecoverable way. */
static void
raft_close__(struct raft *raft)
{
    if (!hmap_node_is_null(&raft->hmap_node)) {
        hmap_remove(&all_rafts, &raft->hmap_node);
        hmap_node_nullify(&raft->hmap_node);
    }

    raft_complete_all_commands(raft, RAFT_CMD_SHUTDOWN);

    struct raft_server *rs = raft->remove_server;
    if (rs) {
        raft_send_remove_server_reply__(raft, &rs->sid, &rs->requester_sid,
                                        rs->requester_conn, false,
                                        RAFT_SERVER_SHUTDOWN);
        raft_server_destroy(raft->remove_server);
        raft->remove_server = NULL;
    }

    struct raft_conn *conn, *next;
    LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
        raft_conn_close(conn);
    }
}

/* Closes and frees 'raft'.
 *
 * A server's cluster membership is independent of whether the server is
 * actually running.  When a server that is a member of a cluster closes, the
 * cluster treats this as a server failure. */
void
raft_close(struct raft *raft)
{
    if (!raft) {
        return;
    }

    raft_transfer_leadership(raft, "this server is shutting down");

    raft_close__(raft);

    ovsdb_log_close(raft->log);

    raft_servers_destroy(&raft->servers);

    for (uint64_t index = raft->log_start; index < raft->log_end; index++) {
        struct raft_entry *e = &raft->entries[index - raft->log_start];
        raft_entry_uninit(e);
    }
    free(raft->entries);

    raft_entry_uninit(&raft->snap);

    raft_waiters_destroy(raft);

    raft_servers_destroy(&raft->add_servers);

    hmap_destroy(&raft->commands);

    pstream_close(raft->listener);

    sset_destroy(&raft->remote_addresses);
    free(raft->local_address);
    free(raft->local_nickname);
    free(raft->name);

    free(raft);
}

static bool
raft_conn_receive(struct raft *raft, struct raft_conn *conn,
                  union raft_rpc *rpc)
{
    struct jsonrpc_msg *msg = jsonrpc_session_recv(conn->js);
    if (!msg) {
        return false;
    }

    struct ovsdb_error *error = raft_rpc_from_jsonrpc(&raft->cid, &raft->sid,
                                                      msg, rpc);
    jsonrpc_msg_destroy(msg);
    if (error) {
        char *s = ovsdb_error_to_string_free(error);
        VLOG_INFO("%s: %s", jsonrpc_session_get_name(conn->js), s);
        free(s);
        return false;
    }

    if (uuid_is_zero(&conn->sid)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
        conn->sid = rpc->common.sid;
        VLOG_INFO_RL(&rl, "%s: learned server ID "SID_FMT,
                     jsonrpc_session_get_name(conn->js), SID_ARGS(&conn->sid));
    } else if (!uuid_equals(&conn->sid, &rpc->common.sid)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
        VLOG_WARN_RL(&rl, "%s: ignoring message with unexpected server ID "
                     SID_FMT" (expected "SID_FMT")",
                     jsonrpc_session_get_name(conn->js),
                     SID_ARGS(&rpc->common.sid), SID_ARGS(&conn->sid));
        raft_rpc_uninit(rpc);
        return false;
    }

    const char *address = (rpc->type == RAFT_RPC_HELLO_REQUEST
                           ? rpc->hello_request.address
                           : rpc->type == RAFT_RPC_ADD_SERVER_REQUEST
                           ? rpc->add_server_request.address
                           : NULL);
    if (address) {
        char *new_nickname = raft_address_to_nickname(address, &conn->sid);
        if (strcmp(conn->nickname, new_nickname)) {
            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
            VLOG_INFO_RL(&rl, "%s: learned remote address %s",
                         jsonrpc_session_get_name(conn->js), address);

            free(conn->nickname);
            conn->nickname = new_nickname;
        } else {
            free(new_nickname);
        }
    }

    return true;
}

static const char *
raft_get_nickname(const struct raft *raft, const struct uuid *sid,
                  char buf[SID_LEN + 1], size_t bufsize)
{
    if (uuid_equals(sid, &raft->sid)) {
        return raft->local_nickname;
    }

    const char *s = raft_servers_get_nickname__(&raft->servers, sid);
    if (s) {
        return s;
    }

    return raft_servers_get_nickname(&raft->add_servers, sid, buf, bufsize);
}

static void
log_rpc(const union raft_rpc *rpc, const char *direction,
        const struct raft_conn *conn, int line_number)
{
    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(600, 600);
    if (!raft_rpc_is_heartbeat(rpc) && !VLOG_DROP_DBG(&rl)) {
        struct ds s = DS_EMPTY_INITIALIZER;
        if (line_number) {
            ds_put_format(&s, "raft.c:%d ", line_number);
        }
        ds_put_format(&s, "%s%s ", direction, conn->nickname);
        raft_rpc_format(rpc, &s);
        VLOG_DBG("%s", ds_cstr(&s));
        ds_destroy(&s);
    }
}

static void
raft_send_add_server_request(struct raft *raft, struct raft_conn *conn)
{
    union raft_rpc rq = {
        .add_server_request = {
            .common = {
                .type = RAFT_RPC_ADD_SERVER_REQUEST,
                .sid = UUID_ZERO,
                .comment = NULL,
            },
            .address = raft->local_address,
        },
    };
    raft_send_to_conn(raft, &rq, conn);
}

static void
raft_conn_run(struct raft *raft, struct raft_conn *conn)
{
    jsonrpc_session_run(conn->js);

    unsigned int new_seqno = jsonrpc_session_get_seqno(conn->js);
    bool just_connected = (new_seqno != conn->js_seqno
                           && jsonrpc_session_is_connected(conn->js));
    conn->js_seqno = new_seqno;
    if (just_connected) {
        if (raft->joining) {
            raft_send_add_server_request(raft, conn);
        } else if (raft->leaving) {
            union raft_rpc rq = {
                .remove_server_request = {
                    .common = {
                        .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
                        .sid = conn->sid,
                    },
                    .sid = raft->sid,
                },
            };
            raft_send_to_conn(raft, &rq, conn);
        } else {
            union raft_rpc rq = (union raft_rpc) {
                .hello_request = {
                    .common = {
                        .type = RAFT_RPC_HELLO_REQUEST,
                        .sid = conn->sid,
                    },
                    .address = raft->local_address,
                },
            };
            raft_send_to_conn(raft, &rq, conn);
        }
    }

    for (size_t i = 0; i < 50; i++) {
        union raft_rpc rpc;
        if (!raft_conn_receive(raft, conn, &rpc)) {
            break;
        }

        log_rpc(&rpc, "<--", conn, 0);
        raft_handle_rpc(raft, &rpc);
        raft_rpc_uninit(&rpc);
    }
}

static void
raft_waiter_complete_rpc(struct raft *raft, const union raft_rpc *rpc)
{
    uint64_t term = raft_rpc_get_term(rpc);
    if (term && term < raft->term) {
        /* Drop the message because it's for an expired term. */
        return;
    }

    if (!raft_is_rpc_synced(raft, rpc)) {
        /* This is a bug.  A reply message is deferred because some state in
         * the message, such as a term or index, has not been committed to
         * disk, and they should only be completed when that commit is done.
         * But this message is being completed before the commit is finished.
         * Complain, and hope that someone reports the bug. */
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        if (VLOG_DROP_ERR(&rl)) {
            return;
        }

        struct ds s = DS_EMPTY_INITIALIZER;

        if (term > raft->synced_term) {
            ds_put_format(&s, " because message term %"PRIu64" is "
                          "past synced term %"PRIu64,
                          term, raft->synced_term);
        }

        uint64_t index = raft_rpc_get_min_sync_index(rpc);
        if (index > raft->log_synced) {
            ds_put_format(&s, " %s message index %"PRIu64" is past last "
                          "synced index %"PRIu64,
                          s.length ? "and" : "because",
                          index, raft->log_synced);
        }

        const struct uuid *vote = raft_rpc_get_vote(rpc);
        if (vote && !uuid_equals(vote, &raft->synced_vote)) {
            char buf1[SID_LEN + 1];
            char buf2[SID_LEN + 1];
            ds_put_format(&s, " %s vote %s differs from synced vote %s",
                          s.length ? "and" : "because",
                          raft_get_nickname(raft, vote, buf1, sizeof buf1),
                          raft_get_nickname(raft, &raft->synced_vote,
                                            buf2, sizeof buf2));
        }

        char buf[SID_LEN + 1];
        ds_put_format(&s, ": %s ",
                      raft_get_nickname(raft, &rpc->common.sid,
                                        buf, sizeof buf));
        raft_rpc_format(rpc, &s);
        VLOG_ERR("internal error: deferred %s message completed "
                 "but not ready to send%s",
                 raft_rpc_type_to_string(rpc->type), ds_cstr(&s));
        ds_destroy(&s);

        return;
    }

    struct raft_conn *dst = raft_find_conn_by_sid(raft, &rpc->common.sid);
    if (dst) {
        raft_send_to_conn(raft, rpc, dst);
    }
}

static void
raft_waiter_complete(struct raft *raft, struct raft_waiter *w)
{
    VLOG_INFO("%s:%d", __FILE__, __LINE__);
    switch (w->type) {
    case RAFT_W_ENTRY:
        if (raft->role == RAFT_LEADER) {
            raft_update_our_match_index(raft, w->entry.index);
        }
        raft->log_synced = w->entry.index;
        break;

    case RAFT_W_TERM:
        raft->synced_term = w->term.term;
        raft->synced_vote = w->term.vote;
        break;

    case RAFT_W_RPC:
        raft_waiter_complete_rpc(raft, w->rpc);
        break;
    }
}

static void
raft_waiter_destroy(struct raft_waiter *w)
{
    if (!w) {
        return;
    }

    ovs_list_remove(&w->list_node);

    switch (w->type) {
    case RAFT_W_ENTRY:
    case RAFT_W_TERM:
        break;

    case RAFT_W_RPC:
        raft_rpc_uninit(w->rpc);
        free(w->rpc);
        break;
    }
    free(w);
}

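/* Completes and destroys, in order, each waiter whose commit ticket the log
 * has reached; the first waiter whose write is still in flight stops the
 * scan, which preserves completion order. */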
static void
raft_waiters_run(struct raft *raft)
{
    if (ovs_list_is_empty(&raft->waiters)) {
        return;
    }

    uint64_t cur = ovsdb_log_commit_progress(raft->log);
    struct raft_waiter *w, *next;
    LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
        if (cur < w->commit_ticket) {
            break;
        }
        raft_waiter_complete(raft, w);
        raft_waiter_destroy(w);
    }
}

static void
raft_waiters_wait(struct raft *raft)
{
    struct raft_waiter *w;
    LIST_FOR_EACH (w, list_node, &raft->waiters) {
        ovsdb_log_commit_wait(raft->log, w->commit_ticket);
        break;
    }
}

static void
raft_waiters_destroy(struct raft *raft)
{
    struct raft_waiter *w, *next;
    LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
        raft_waiter_destroy(w);
    }
}

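/* Starts writing a new 'term' and 'vote' to disk, updating the in-memory
 * values immediately but deferring 'synced_term' and 'synced_vote' to the
 * RAFT_W_TERM waiter created here.  Returns false if the write could not be
 * started. */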
static bool OVS_WARN_UNUSED_RESULT
raft_set_term(struct raft *raft, uint64_t term, const struct uuid *vote)
{
    struct ovsdb_error *error = raft_write_state(raft->log, term, vote);
    if (!raft_handle_write_error(raft, error)) {
        return false;
    }

    struct raft_waiter *w = raft_waiter_create(raft, RAFT_W_TERM, true);
    raft->term = w->term.term = term;
    raft->vote = w->term.vote = vote ? *vote : UUID_ZERO;
    return true;
}

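/* Records that server 's' voted for 'vote'.  If the vote is for us, a
 * majority of votes makes us leader: with our own vote counted (see
 * raft_start_election()), that means 2 votes in a 3-server cluster or 3 votes
 * in a 5-server cluster. */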
1547static void
1548raft_accept_vote(struct raft *raft, struct raft_server *s,
1549 const struct uuid *vote)
1550{
1551 if (uuid_equals(&s->vote, vote)) {
1552 return;
1553 }
1554 if (!uuid_is_zero(&s->vote)) {
1555 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
1556 char buf1[SID_LEN + 1];
1557 char buf2[SID_LEN + 1];
1558 VLOG_WARN_RL(&rl, "server %s changed its vote from %s to %s",
1559 s->nickname,
1560 raft_get_nickname(raft, &s->vote, buf1, sizeof buf1),
1561 raft_get_nickname(raft, vote, buf2, sizeof buf2));
1562 }
1563 s->vote = *vote;
1564 if (uuid_equals(vote, &raft->sid)
1565 && ++raft->n_votes > hmap_count(&raft->servers) / 2) {
1566 raft_become_leader(raft);
1567 }
1568}
1569
1570static void
1571raft_start_election(struct raft *raft, bool leadership_transfer)
1572{
1573 if (raft->leaving) {
1574 return;
1575 }
1576
1577 struct raft_server *me = raft_find_server(raft, &raft->sid);
1578 if (!me) {
1579 return;
1580 }
1581
1582 if (!raft_set_term(raft, raft->term + 1, &raft->sid)) {
1583 return;
1584 }
1585
1586 raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
1587
1588 ovs_assert(raft->role != RAFT_LEADER);
1589 ovs_assert(hmap_is_empty(&raft->commands));
1590 raft->role = RAFT_CANDIDATE;
1591
1592 raft->n_votes = 0;
1593
1594 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1595 if (!VLOG_DROP_INFO(&rl)) {
1596 long long int now = time_msec();
1597 if (now >= raft->election_timeout) {
1598 VLOG_INFO("term %"PRIu64": %lld ms timeout expired, "
1599 "starting election",
1600 raft->term, now - raft->election_base);
1601 } else {
1602 VLOG_INFO("term %"PRIu64": starting election", raft->term);
1603 }
1604 }
1605 raft_reset_timer(raft);
1606
1607 struct raft_server *peer;
1608 HMAP_FOR_EACH (peer, hmap_node, &raft->servers) {
1609 peer->vote = UUID_ZERO;
1610 if (uuid_equals(&raft->sid, &peer->sid)) {
1611 continue;
1612 }
1613
1614 union raft_rpc rq = {
1615 .vote_request = {
1616 .common = {
1617 .type = RAFT_RPC_VOTE_REQUEST,
1618 .sid = peer->sid,
1619 },
1620 .term = raft->term,
1621 .last_log_index = raft->log_end - 1,
1622 .last_log_term = (
1623 raft->log_end > raft->log_start
1624 ? raft->entries[raft->log_end - raft->log_start - 1].term
1625 : raft->snap.term),
1626 .leadership_transfer = leadership_transfer,
1627 },
1628 };
1629 raft_send(raft, &rq);
1630 }
1631
1632 /* Vote for ourselves. */
1633 raft_accept_vote(raft, me, &raft->sid);
1634}
1635
1636static void
1637raft_open_conn(struct raft *raft, const char *address, const struct uuid *sid)
1638{
1639 if (strcmp(address, raft->local_address)
1640 && !raft_find_conn_by_address(raft, address)) {
1641 raft_add_conn(raft, jsonrpc_session_open(address, true), sid, false);
1642 }
1643}
1644
1645static void
1646raft_conn_close(struct raft_conn *conn)
1647{
1648 jsonrpc_session_close(conn->js);
1649 ovs_list_remove(&conn->list_node);
1650 free(conn->nickname);
1651 free(conn);
1652}
1653
1654/* Returns true if 'conn' should stay open, false if it should be closed. */
1655static bool
1656raft_conn_should_stay_open(struct raft *raft, struct raft_conn *conn)
1657{
1658 /* Close the connection if it's actually dead. If necessary, we'll
1659 * initiate a new session later. */
1660 if (!jsonrpc_session_is_alive(conn->js)) {
1661 return false;
1662 }
1663
1664 /* Keep incoming sessions. We trust the originator to decide to drop
1665 * it. */
1666 if (conn->incoming) {
1667 return true;
1668 }
1669
1670 /* If we are joining the cluster, keep sessions to the remote addresses
1671 * that are supposed to be part of the cluster we're joining. */
1672 if (raft->joining && sset_contains(&raft->remote_addresses,
1673 jsonrpc_session_get_name(conn->js))) {
1674 return true;
1675 }
1676
1677 /* We have joined the cluster. If we did that "recently", then there is a
1678 * chance that we do not have the most recent server configuration log
1679 * entry. If so, it's a waste to disconnect from the servers that were in
1680 * remote_addresses and that will probably appear in the configuration,
1681 * just to reconnect to them a moment later when we do get the
1682 * configuration update. If we are not ourselves in the configuration,
1683 * then we know that there must be a new configuration coming up, so in
1684 * that case keep the connection. */
1685 if (!raft_find_server(raft, &raft->sid)) {
1686 return true;
1687 }
1688
1689 /* Keep the connection only if the server is part of the configuration. */
1690 return raft_find_server(raft, &conn->sid);
1691}
1692
1693/* Allows 'raft' to maintain the distributed log. Call this function as part
1694 * of the process's main loop. */
1695void
1696raft_run(struct raft *raft)
1697{
1698 if (raft->left || raft->failed) {
1699 return;
1700 }
1701
1702 raft_waiters_run(raft);
1703
1704 if (!raft->listener && time_msec() >= raft->listen_backoff) {
1705 char *paddr = raft_make_address_passive(raft->local_address);
1706 int error = pstream_open(paddr, &raft->listener, DSCP_DEFAULT);
1707 if (error) {
1708 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1709 VLOG_WARN_RL(&rl, "%s: listen failed (%s)",
1710 paddr, ovs_strerror(error));
1711 raft->listen_backoff = time_msec() + 1000;
1712 }
1713 free(paddr);
1714 }
1715
1716 if (raft->listener) {
1717 struct stream *stream;
1718 int error = pstream_accept(raft->listener, &stream);
1719 if (!error) {
1720 raft_add_conn(raft, jsonrpc_session_open_unreliably(
1721 jsonrpc_open(stream), DSCP_DEFAULT), NULL,
1722 true);
1723 } else if (error != EAGAIN) {
1724 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
1725 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
1726 pstream_get_name(raft->listener),
1727 ovs_strerror(error));
1728 }
1729 }
1730
1731 /* Run RPCs for all open sessions. */
1732 struct raft_conn *conn;
1733 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1734 raft_conn_run(raft, conn);
1735 }
1736
1737 /* Close unneeded sessions. */
1738 struct raft_conn *next;
1739 LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
1740 if (!raft_conn_should_stay_open(raft, conn)) {
1741 raft_conn_close(conn);
1742 }
1743 }
1744
1745 /* Open needed sessions. */
1746 struct raft_server *server;
1747 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1748 raft_open_conn(raft, server->address, &server->sid);
1749 }
1750 if (raft->joining) {
1751 const char *address;
1752 SSET_FOR_EACH (address, &raft->remote_addresses) {
1753 raft_open_conn(raft, address, NULL);
1754 }
1755 }
1756
1757 if (!raft->joining && time_msec() >= raft->election_timeout) {
1758 raft_start_election(raft, false);
1759 }
1760
1761 if (raft->leaving && time_msec() >= raft->leave_timeout) {
1762 raft_send_remove_server_requests(raft);
1763 }
1764
1765 if (raft->joining && time_msec() >= raft->join_timeout) {
1766 raft->join_timeout = time_msec() + 1000;
1767 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1768 raft_send_add_server_request(raft, conn);
1769 }
1770 }
1771
1772 if (time_msec() >= raft->ping_timeout) {
1773 if (raft->role == RAFT_LEADER) {
1774 raft_send_heartbeats(raft);
1775 } else {
1776 long long int now = time_msec();
1777 struct raft_command *cmd, *next_cmd;
1778 HMAP_FOR_EACH_SAFE (cmd, next_cmd, hmap_node, &raft->commands) {
1779 if (cmd->timestamp
1780 && now - cmd->timestamp > ELECTION_BASE_MSEC) {
1781 raft_command_complete(raft, cmd, RAFT_CMD_TIMEOUT);
1782 }
1783 }
1784 }
1785 raft->ping_timeout = time_msec() + PING_TIME_MSEC;
1786 }
1787
1788 /* Do this only at the end; if we did it as soon as we set raft->left or
1789 * raft->failed in handling the RemoveServerReply, then it could easily
1790 * cause references to freed memory in RPC sessions, etc. */
1791 if (raft->left || raft->failed) {
1792 raft_close__(raft);
1793 }
1794}
1795
1796static void
1797raft_wait_session(struct jsonrpc_session *js)
1798{
1799 if (js) {
1800 jsonrpc_session_wait(js);
1801 jsonrpc_session_recv_wait(js);
1802 }
1803}
1804
1805/* Causes the next call to poll_block() to wake up when 'raft' needs to do
1806 * something. */
1807void
1808raft_wait(struct raft *raft)
1809{
1810 if (raft->left || raft->failed) {
1811 return;
1812 }
1813
1814 raft_waiters_wait(raft);
1815
1816 if (raft->listener) {
1817 pstream_wait(raft->listener);
1818 } else {
1819 poll_timer_wait_until(raft->listen_backoff);
1820 }
1821
1822 struct raft_conn *conn;
1823 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1824 raft_wait_session(conn->js);
1825 }
1826
1827 if (!raft->joining) {
1828 poll_timer_wait_until(raft->election_timeout);
1829 } else {
1830 poll_timer_wait_until(raft->join_timeout);
1831 }
1832 if (raft->leaving) {
1833 poll_timer_wait_until(raft->leave_timeout);
1834 }
1835 if (raft->role == RAFT_LEADER || !hmap_is_empty(&raft->commands)) {
1836 poll_timer_wait_until(raft->ping_timeout);
1837 }
1838}
1839
1840static struct raft_waiter *
1841raft_waiter_create(struct raft *raft, enum raft_waiter_type type,
1842 bool start_commit)
1843{
1844 struct raft_waiter *w = xzalloc(sizeof *w);
1845 ovs_list_push_back(&raft->waiters, &w->list_node);
1846 w->commit_ticket = start_commit ? ovsdb_log_commit_start(raft->log) : 0;
1847 w->type = type;
1848 return w;
1849}
1850
1851/* Returns a human-readable representation of 'status' (or NULL if 'status' is
1852 * invalid). */
1853const char *
1854raft_command_status_to_string(enum raft_command_status status)
1855{
1856 switch (status) {
1857 case RAFT_CMD_INCOMPLETE:
1858 return "operation still in progress";
1859 case RAFT_CMD_SUCCESS:
1860 return "success";
1861 case RAFT_CMD_NOT_LEADER:
1862 return "not leader";
1863 case RAFT_CMD_BAD_PREREQ:
1864 return "prerequisite check failed";
1865 case RAFT_CMD_LOST_LEADERSHIP:
1866 return "lost leadership";
1867 case RAFT_CMD_SHUTDOWN:
1868 return "server shutdown";
1869 case RAFT_CMD_IO_ERROR:
1870 return "I/O error";
1871 case RAFT_CMD_TIMEOUT:
1872 return "timeout";
1873 default:
1874 return NULL;
1875 }
1876}
1877
1878/* Converts human-readable status in 's' into status code in '*statusp'.
1879 * Returns true if successful, false if 's' is unknown. */
1880bool
1881raft_command_status_from_string(const char *s,
1882 enum raft_command_status *statusp)
1883{
1884 for (enum raft_command_status status = 0; ; status++) {
1885 const char *s2 = raft_command_status_to_string(status);
1886 if (!s2) {
1887 *statusp = 0;
1888 return false;
1889 } else if (!strcmp(s, s2)) {
1890 *statusp = status;
1891 return true;
1892 }
1893 }
1894}
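
/* Example (illustrative): raft_command_status_from_string("not leader", &s)
 * scans status codes until raft_command_status_to_string() returns a match
 * or NULL, so it stores RAFT_CMD_NOT_LEADER in 's' and returns true, while
 * an unrecognized string stores 0 and returns false. */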
1895
1896static const struct uuid *
1897raft_get_eid(const struct raft *raft, uint64_t index)
1898{
1899 for (; index >= raft->log_start; index--) {
1900 const struct raft_entry *e = raft_get_entry(raft, index);
1901 if (e->data) {
1902 return &e->eid;
1903 }
1904 }
1905 return &raft->snap.eid;
1906}
1907
1908static const struct uuid *
1909raft_current_eid(const struct raft *raft)
1910{
1911 return raft_get_eid(raft, raft->log_end - 1);
1912}
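
/* Worked example (illustrative, with made-up numbers): suppose log_start = 5,
 * log_end = 9, and the entries at indexes 5...8 carry data, NULL (a server
 * configuration entry), data, NULL, respectively.  Then raft_get_eid(raft, 8)
 * skips index 8, which has no data, and returns the eid of index 7, and
 * raft_get_eid(raft, 4) falls below log_start and returns raft->snap.eid.
 * raft_current_eid() thus yields the eid that the next command's
 * prerequisite, if any, must match. */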
1913
1914static struct raft_command *
1915raft_command_create_completed(enum raft_command_status status)
1916{
1917 ovs_assert(status != RAFT_CMD_INCOMPLETE);
1918
1919 struct raft_command *cmd = xzalloc(sizeof *cmd);
1920 cmd->n_refs = 1;
1921 cmd->status = status;
1922 return cmd;
1923}
1924
1925static struct raft_command *
1926raft_command_create_incomplete(struct raft *raft, uint64_t index)
1927{
1928 struct raft_command *cmd = xzalloc(sizeof *cmd);
1929 cmd->n_refs = 2; /* One for client, one for raft->commands. */
1930 cmd->index = index;
1931 cmd->status = RAFT_CMD_INCOMPLETE;
1932 hmap_insert(&raft->commands, &cmd->hmap_node, cmd->index);
1933 return cmd;
1934}
1935
1936static struct raft_command * OVS_WARN_UNUSED_RESULT
1937raft_command_initiate(struct raft *raft,
1938 const struct json *data, const struct json *servers,
1939 const struct uuid *eid)
1940{
1941 /* Write to local log. */
1942 uint64_t index = raft->log_end;
1943 if (!raft_handle_write_error(
1944 raft, raft_write_entry(
1945 raft, raft->term, json_nullable_clone(data), eid,
1946 json_nullable_clone(servers)))) {
1947 return raft_command_create_completed(RAFT_CMD_IO_ERROR);
1948 }
1949
1950 struct raft_command *cmd = raft_command_create_incomplete(raft, index);
1951 if (eid) {
1952 cmd->eid = *eid;
1953 }
1954
1955 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index = cmd->index;
1956
1957 /* Write to remote logs. */
1958 struct raft_server *s;
1959 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
1960 if (!uuid_equals(&s->sid, &raft->sid) && s->next_index == index) {
1961 raft_send_append_request(raft, s, 1, "execute command");
1962 s->next_index++;
1963 }
1964 }
1965
1966 return cmd;
1967}
1968
1969static struct raft_command * OVS_WARN_UNUSED_RESULT
1970raft_command_execute__(struct raft *raft,
1971 const struct json *data, const struct json *servers,
1972 const struct uuid *prereq, struct uuid *result)
1973{
1974 if (raft->joining || raft->leaving || raft->left || raft->failed) {
1975 return raft_command_create_completed(RAFT_CMD_SHUTDOWN);
1976 }
1977
1978 if (raft->role != RAFT_LEADER) {
1979 /* Consider proxying the command to the leader. We can only do that if
1980 * we know the leader and the command does not change the set of
1981 * servers. We do not proxy commands without prerequisites, even
1982 * though we could, because in an OVSDB context a log entry doesn't
1983 * make sense without context. */
1984 if (servers || !data
1985 || raft->role != RAFT_FOLLOWER || uuid_is_zero(&raft->leader_sid)
1986 || !prereq) {
1987 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
1988 }
1989 }
1990
1991 struct uuid eid = data ? uuid_random() : UUID_ZERO;
1992 if (result) {
1993 *result = eid;
1994 }
1995
1996 if (raft->role != RAFT_LEADER) {
1997 const union raft_rpc rpc = {
1998 .execute_command_request = {
1999 .common = {
2000 .type = RAFT_RPC_EXECUTE_COMMAND_REQUEST,
2001 .sid = raft->leader_sid,
2002 },
2003 .data = CONST_CAST(struct json *, data),
2004 .prereq = *prereq,
2005 .result = eid,
2006 }
2007 };
2008 if (!raft_send(raft, &rpc)) {
2009 /* Couldn't send command, so it definitely failed. */
2010 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
2011 }
2012
2013 struct raft_command *cmd = raft_command_create_incomplete(raft, 0);
2014 cmd->timestamp = time_msec();
2015 cmd->eid = eid;
2016 return cmd;
2017 }
2018
2019 const struct uuid *current_eid = raft_current_eid(raft);
2020 if (prereq && !uuid_equals(prereq, current_eid)) {
2021 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
2022 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
2023 "prerequisite "UUID_FMT,
2024 UUID_ARGS(current_eid), UUID_ARGS(prereq));
2025 return raft_command_create_completed(RAFT_CMD_BAD_PREREQ);
2026 }
2027
2028 return raft_command_initiate(raft, data, servers, &eid);
2029}
2030
/* Initiates appending a log entry to 'raft'.  The log entry consists of
 * 'data'.  If 'prereq' is nonnull, the entry is added to the log only if the
 * entry that would precede it has entry ID 'prereq'.  If 'result' is nonnull,
 * it is populated with the entry ID for the new log entry.
 *
 * Returns a "struct raft_command" that may be used to track the progress of
 * adding the log entry.  The caller must eventually free the returned
 * structure, with raft_command_unref(). */
2039struct raft_command * OVS_WARN_UNUSED_RESULT
2040raft_command_execute(struct raft *raft, const struct json *data,
2041 const struct uuid *prereq, struct uuid *result)
2042{
2043 return raft_command_execute__(raft, data, NULL, prereq, result);
2044}
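
/* Illustrative usage sketch (not part of the OVS tree): a client of this
 * module might append an entry and wait for the outcome like this, where
 * 'data' is a caller-supplied json object:
 *
 *     struct uuid eid;
 *     struct raft_command *cmd = raft_command_execute(raft, data,
 *                                                     NULL, &eid);
 *     while (raft_command_get_status(cmd) == RAFT_CMD_INCOMPLETE) {
 *         raft_run(raft);
 *         raft_command_wait(cmd);
 *         raft_wait(raft);
 *         poll_block();
 *     }
 *     enum raft_command_status status = raft_command_get_status(cmd);
 *     raft_command_unref(cmd);
 *
 * On RAFT_CMD_SUCCESS the entry is committed; on RAFT_CMD_NOT_LEADER a
 * caller would typically retry via the current leader. */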
2045
2046/* Returns the status of 'cmd'. */
2047enum raft_command_status
2048raft_command_get_status(const struct raft_command *cmd)
2049{
2050 ovs_assert(cmd->n_refs > 0);
2051 return cmd->status;
2052}
2053
2054/* Returns the index of the log entry at which 'cmd' was committed.
2055 *
2056 * This function works only with successful commands. */
2057uint64_t
2058raft_command_get_commit_index(const struct raft_command *cmd)
2059{
2060 ovs_assert(cmd->n_refs > 0);
2061 ovs_assert(cmd->status == RAFT_CMD_SUCCESS);
2062 return cmd->index;
2063}
2064
/* Releases a reference to 'cmd'; frees 'cmd' when the last reference is
 * released. */
2066void
2067raft_command_unref(struct raft_command *cmd)
2068{
2069 if (cmd) {
2070 ovs_assert(cmd->n_refs > 0);
2071 if (!--cmd->n_refs) {
2072 free(cmd);
2073 }
2074 }
2075}
2076
2077/* Causes poll_block() to wake up when 'cmd' has status to report. */
2078void
2079raft_command_wait(const struct raft_command *cmd)
2080{
2081 if (cmd->status != RAFT_CMD_INCOMPLETE) {
2082 poll_immediate_wake();
2083 }
2084}
2085
2086static void
2087raft_command_complete(struct raft *raft,
2088 struct raft_command *cmd,
2089 enum raft_command_status status)
2090{
2091 if (!uuid_is_zero(&cmd->sid)) {
2092 uint64_t commit_index = status == RAFT_CMD_SUCCESS ? cmd->index : 0;
2093 raft_send_execute_command_reply(raft, &cmd->sid, &cmd->eid, status,
2094 commit_index);
2095 }
2096
2097 ovs_assert(cmd->status == RAFT_CMD_INCOMPLETE);
2098 ovs_assert(cmd->n_refs > 0);
2099 hmap_remove(&raft->commands, &cmd->hmap_node);
2100 cmd->status = status;
2101 raft_command_unref(cmd);
2102}
2103
2104static void
2105raft_complete_all_commands(struct raft *raft, enum raft_command_status status)
2106{
2107 struct raft_command *cmd, *next;
2108 HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
2109 raft_command_complete(raft, cmd, status);
2110 }
2111}
2112
2113static struct raft_command *
2114raft_find_command_by_index(struct raft *raft, uint64_t index)
2115{
2116 struct raft_command *cmd;
2117
2118 HMAP_FOR_EACH_IN_BUCKET (cmd, hmap_node, index, &raft->commands) {
2119 if (cmd->index == index) {
2120 return cmd;
2121 }
2122 }
2123 return NULL;
2124}
2125
2126static struct raft_command *
2127raft_find_command_by_eid(struct raft *raft, const struct uuid *eid)
2128{
2129 struct raft_command *cmd;
2130
2131 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2132 if (uuid_equals(&cmd->eid, eid)) {
2133 return cmd;
2134 }
2135 }
2136 return NULL;
2137}
2138\f
2139#define RAFT_RPC(ENUM, NAME) \
2140 static void raft_handle_##NAME(struct raft *, const struct raft_##NAME *);
2141RAFT_RPC_TYPES
2142#undef RAFT_RPC
2143
2144static void
2145raft_handle_hello_request(struct raft *raft OVS_UNUSED,
2146 const struct raft_hello_request *hello OVS_UNUSED)
2147{
2148}
2149
2150/* 'sid' is the server being added. */
2151static void
2152raft_send_add_server_reply__(struct raft *raft, const struct uuid *sid,
2153 const char *address,
2154 bool success, const char *comment)
2155{
2156 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2157 if (!VLOG_DROP_INFO(&rl)) {
2158 struct ds s = DS_EMPTY_INITIALIZER;
2159 char buf[SID_LEN + 1];
2160 ds_put_format(&s, "adding %s ("SID_FMT" at %s) "
2161 "to cluster "CID_FMT" %s",
2162 raft_get_nickname(raft, sid, buf, sizeof buf),
2163 SID_ARGS(sid), address, CID_ARGS(&raft->cid),
2164 success ? "succeeded" : "failed");
2165 if (comment) {
2166 ds_put_format(&s, " (%s)", comment);
2167 }
2168 VLOG_INFO("%s", ds_cstr(&s));
2169 ds_destroy(&s);
2170 }
2171
2172 union raft_rpc rpy = {
2173 .add_server_reply = {
2174 .common = {
2175 .type = RAFT_RPC_ADD_SERVER_REPLY,
2176 .sid = *sid,
2177 .comment = CONST_CAST(char *, comment),
2178 },
2179 .success = success,
2180 }
2181 };
2182
2183 struct sset *remote_addresses = &rpy.add_server_reply.remote_addresses;
2184 sset_init(remote_addresses);
2185 if (!raft->joining) {
2186 struct raft_server *s;
2187 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2188 if (!uuid_equals(&s->sid, &raft->sid)) {
2189 sset_add(remote_addresses, s->address);
2190 }
2191 }
2192 }
2193
2194 raft_send(raft, &rpy);
2195
2196 sset_destroy(remote_addresses);
2197}
2198
2199static void
2200raft_send_remove_server_reply_rpc(struct raft *raft, const struct uuid *sid,
2201 bool success, const char *comment)
2202{
2203 const union raft_rpc rpy = {
2204 .remove_server_reply = {
2205 .common = {
2206 .type = RAFT_RPC_REMOVE_SERVER_REPLY,
2207 .sid = *sid,
2208 .comment = CONST_CAST(char *, comment),
2209 },
2210 .success = success,
2211 }
2212 };
2213 raft_send(raft, &rpy);
2214}
2215
2216static void
2217raft_send_remove_server_reply__(struct raft *raft,
2218 const struct uuid *target_sid,
2219 const struct uuid *requester_sid,
2220 struct unixctl_conn *requester_conn,
2221 bool success, const char *comment)
2222{
2223 struct ds s = DS_EMPTY_INITIALIZER;
2224 ds_put_format(&s, "request ");
2225 if (!uuid_is_zero(requester_sid)) {
2226 char buf[SID_LEN + 1];
2227 ds_put_format(&s, "by %s",
2228 raft_get_nickname(raft, requester_sid, buf, sizeof buf));
2229 } else {
2230 ds_put_cstr(&s, "via unixctl");
2231 }
2232 ds_put_cstr(&s, " to remove ");
2233 if (!requester_conn && uuid_equals(target_sid, requester_sid)) {
2234 ds_put_cstr(&s, "itself");
2235 } else {
2236 char buf[SID_LEN + 1];
2237 ds_put_cstr(&s, raft_get_nickname(raft, target_sid, buf, sizeof buf));
2238 }
2239 ds_put_format(&s, " from cluster "CID_FMT" %s",
2240 CID_ARGS(&raft->cid),
2241 success ? "succeeded" : "failed");
2242 if (comment) {
2243 ds_put_format(&s, " (%s)", comment);
2244 }
2245
2246 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2247 VLOG_INFO_RL(&rl, "%s", ds_cstr(&s));
2248
    /* Send RemoveServerReply to the requester (which could be a server or a
     * unixctl connection).  Also always send it to the removed server; this
     * allows it to be sure that it's really removed, update its log, and
     * disconnect permanently. */
2253 if (!uuid_is_zero(requester_sid)) {
2254 raft_send_remove_server_reply_rpc(raft, requester_sid,
2255 success, comment);
2256 }
2257 if (!uuid_equals(requester_sid, target_sid)) {
2258 raft_send_remove_server_reply_rpc(raft, target_sid, success, comment);
2259 }
2260 if (requester_conn) {
2261 if (success) {
2262 unixctl_command_reply(requester_conn, ds_cstr(&s));
2263 } else {
2264 unixctl_command_reply_error(requester_conn, ds_cstr(&s));
2265 }
2266 }
2267
2268 ds_destroy(&s);
2269}
2270
2271static void
2272raft_send_add_server_reply(struct raft *raft,
2273 const struct raft_add_server_request *rq,
2274 bool success, const char *comment)
2275{
2276 return raft_send_add_server_reply__(raft, &rq->common.sid, rq->address,
2277 success, comment);
2278}
2279
2280static void
2281raft_send_remove_server_reply(struct raft *raft,
2282 const struct raft_remove_server_request *rq,
2283 bool success, const char *comment)
2284{
2285 return raft_send_remove_server_reply__(raft, &rq->sid, &rq->common.sid,
2286 rq->requester_conn, success,
2287 comment);
2288}
2289
2290static void
2291raft_become_follower(struct raft *raft)
2292{
2293 raft->leader_sid = UUID_ZERO;
2294 if (raft->role == RAFT_FOLLOWER) {
2295 return;
2296 }
2297
2298 raft->role = RAFT_FOLLOWER;
2299 raft_reset_timer(raft);
2300
2301 /* Notify clients about lost leadership.
2302 *
2303 * We do not reverse our changes to 'raft->servers' because the new
2304 * configuration is already part of the log. Possibly the configuration
2305 * log entry will not be committed, but until we know that we must use the
2306 * new configuration. Our AppendEntries processing will properly update
2307 * the server configuration later, if necessary. */
2308 struct raft_server *s;
2309 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
2310 raft_send_add_server_reply__(raft, &s->sid, s->address, false,
2311 RAFT_SERVER_LOST_LEADERSHIP);
2312 }
2313 if (raft->remove_server) {
2314 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
2315 &raft->remove_server->requester_sid,
2316 raft->remove_server->requester_conn,
2317 false, RAFT_SERVER_LOST_LEADERSHIP);
2318 raft_server_destroy(raft->remove_server);
2319 raft->remove_server = NULL;
2320 }
2321
2322 raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
2323}
2324
2325static void
2326raft_send_append_request(struct raft *raft,
2327 struct raft_server *peer, unsigned int n,
2328 const char *comment)
2329{
2330 ovs_assert(raft->role == RAFT_LEADER);
2331
2332 const union raft_rpc rq = {
2333 .append_request = {
2334 .common = {
2335 .type = RAFT_RPC_APPEND_REQUEST,
2336 .sid = peer->sid,
2337 .comment = CONST_CAST(char *, comment),
2338 },
2339 .term = raft->term,
2340 .prev_log_index = peer->next_index - 1,
2341 .prev_log_term = (peer->next_index - 1 >= raft->log_start
2342 ? raft->entries[peer->next_index - 1
2343 - raft->log_start].term
2344 : raft->snap.term),
2345 .leader_commit = raft->commit_index,
2346 .entries = &raft->entries[peer->next_index - raft->log_start],
2347 .n_entries = n,
2348 },
2349 };
2350 raft_send(raft, &rq);
2351}
2352
2353static void
2354raft_send_heartbeats(struct raft *raft)
2355{
2356 struct raft_server *s;
2357 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2358 if (!uuid_equals(&raft->sid, &s->sid)) {
2359 raft_send_append_request(raft, s, 0, "heartbeat");
2360 }
2361 }
2362
    /* Send a ping to anyone waiting for a command to complete, to let them
     * know we're still working on it. */
2365 struct raft_command *cmd;
2366 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2367 if (!uuid_is_zero(&cmd->sid)) {
2368 raft_send_execute_command_reply(raft, &cmd->sid,
2369 &cmd->eid,
2370 RAFT_CMD_INCOMPLETE, 0);
2371 }
2372 }
2373}
2374
2375/* Initializes the fields in 's' that represent the leader's view of the
2376 * server. */
2377static void
2378raft_server_init_leader(struct raft *raft, struct raft_server *s)
2379{
2380 s->next_index = raft->log_end;
2381 s->match_index = 0;
2382 s->phase = RAFT_PHASE_STABLE;
2383}
2384
2385static void
2386raft_become_leader(struct raft *raft)
2387{
2388 raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
2389
2390 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
2391 VLOG_INFO_RL(&rl, "term %"PRIu64": elected leader by %d+ of "
2392 "%"PRIuSIZE" servers", raft->term,
2393 raft->n_votes, hmap_count(&raft->servers));
2394
2395 ovs_assert(raft->role != RAFT_LEADER);
2396 raft->role = RAFT_LEADER;
2397 raft->leader_sid = raft->sid;
2398 raft->election_timeout = LLONG_MAX;
2399 raft->ping_timeout = time_msec() + PING_TIME_MSEC;
2400
2401 struct raft_server *s;
2402 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2403 raft_server_init_leader(raft, s);
2404 }
2405
2406 raft_update_our_match_index(raft, raft->log_end - 1);
2407 raft_send_heartbeats(raft);
2408
2409 /* Write the fact that we are leader to the log. This is not used by the
2410 * algorithm (although it could be, for quick restart), but it is used for
2411 * offline analysis to check for conformance with the properties that Raft
2412 * guarantees. */
2413 struct raft_record r = {
2414 .type = RAFT_REC_LEADER,
2415 .term = raft->term,
2416 .sid = raft->sid,
2417 };
2418 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2419
2420 /* Initiate a no-op commit. Otherwise we might never find out what's in
2421 * the log. See section 6.4 item 1:
2422 *
2423 * The Leader Completeness Property guarantees that a leader has all
2424 * committed entries, but at the start of its term, it may not know
2425 * which those are. To find out, it needs to commit an entry from its
2426 * term. Raft handles this by having each leader commit a blank no-op
2427 * entry into the log at the start of its term. As soon as this no-op
2428 * entry is committed, the leader’s commit index will be at least as
2429 * large as any other servers’ during its term.
2430 */
2431 raft_command_unref(raft_command_execute__(raft, NULL, NULL, NULL, NULL));
2432}
2433
2434/* Processes term 'term' received as part of RPC 'common'. Returns true if the
2435 * caller should continue processing the RPC, false if the caller should reject
2436 * it due to a stale term. */
2437static bool
2438raft_receive_term__(struct raft *raft, const struct raft_rpc_common *common,
2439 uint64_t term)
2440{
2441 /* Section 3.3 says:
2442 *
2443 * Current terms are exchanged whenever servers communicate; if one
2444 * server’s current term is smaller than the other’s, then it updates
2445 * its current term to the larger value. If a candidate or leader
2446 * discovers that its term is out of date, it immediately reverts to
2447 * follower state. If a server receives a request with a stale term
2448 * number, it rejects the request.
2449 */
2450 if (term > raft->term) {
2451 if (!raft_set_term(raft, term, NULL)) {
2452 /* Failed to update the term to 'term'. */
2453 return false;
2454 }
2455 raft_become_follower(raft);
2456 } else if (term < raft->term) {
2457 char buf[SID_LEN + 1];
2458 VLOG_INFO("rejecting term %"PRIu64" < current term %"PRIu64" received "
2459 "in %s message from server %s",
2460 term, raft->term,
2461 raft_rpc_type_to_string(common->type),
2462 raft_get_nickname(raft, &common->sid, buf, sizeof buf));
2463 return false;
2464 }
2465 return true;
2466}
2467
2468static void
2469raft_get_servers_from_log(struct raft *raft, enum vlog_level level)
2470{
2471 const struct json *servers_json = raft->snap.servers;
2472 for (uint64_t index = raft->log_end - 1; index >= raft->log_start;
2473 index--) {
2474 struct raft_entry *e = &raft->entries[index - raft->log_start];
2475 if (e->servers) {
2476 servers_json = e->servers;
2477 break;
2478 }
2479 }
2480
2481 struct hmap servers;
2482 struct ovsdb_error *error = raft_servers_from_json(servers_json, &servers);
2483 ovs_assert(!error);
2484 raft_set_servers(raft, &servers, level);
2485 raft_servers_destroy(&servers);
2486}
2487
2488/* Truncates the log, so that raft->log_end becomes 'new_end'.
2489 *
2490 * Doesn't write anything to disk. In theory, we could truncate the on-disk
2491 * log file, but we don't have the right information to know how long it should
2492 * be. What we actually do is to append entries for older indexes to the
2493 * on-disk log; when we re-read it later, these entries truncate the log.
2494 *
2495 * Returns true if any of the removed log entries were server configuration
2496 * entries, false otherwise. */
2497static bool
2498raft_truncate(struct raft *raft, uint64_t new_end)
2499{
2500 ovs_assert(new_end >= raft->log_start);
2501 if (raft->log_end > new_end) {
2502 char buf[SID_LEN + 1];
2503 VLOG_INFO("%s truncating %"PRIu64 " entries from end of log",
2504 raft_get_nickname(raft, &raft->sid, buf, sizeof buf),
2505 raft->log_end - new_end);
2506 }
2507
2508 bool servers_changed = false;
2509 while (raft->log_end > new_end) {
2510 struct raft_entry *entry = &raft->entries[--raft->log_end
2511 - raft->log_start];
2512 if (entry->servers) {
2513 servers_changed = true;
2514 }
2515 raft_entry_uninit(entry);
2516 }
2517 return servers_changed;
2518}
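
/* Worked example (illustrative, with made-up numbers): if log_end is 10 and
 * raft_truncate() is called with new_end = 7, the in-memory entries for
 * indexes 7, 8, and 9 are discarded.  Nothing on disk changes yet; per the
 * comment above, the next raft_write_entry() appends a record for index 7,
 * and a later re-read of the log treats that record as truncating indexes 7
 * through 9 before applying it. */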
2519
2520static const struct json *
2521raft_peek_next_entry(struct raft *raft, struct uuid *eid)
2522{
2523 /* Invariant: log_start - 2 <= last_applied <= commit_index < log_end. */
2524 ovs_assert(raft->log_start <= raft->last_applied + 2);
2525 ovs_assert(raft->last_applied <= raft->commit_index);
2526 ovs_assert(raft->commit_index < raft->log_end);
2527
2528 if (raft->joining || raft->failed) {
2529 return NULL;
2530 }
2531
2532 if (raft->log_start == raft->last_applied + 2) {
2533 *eid = raft->snap.eid;
2534 return raft->snap.data;
2535 }
2536
2537 while (raft->last_applied < raft->commit_index) {
2538 const struct raft_entry *e = raft_get_entry(raft,
2539 raft->last_applied + 1);
2540 if (e->data) {
2541 *eid = e->eid;
2542 return e->data;
2543 }
2544 raft->last_applied++;
2545 }
2546 return NULL;
2547}
2548
2549static const struct json *
2550raft_get_next_entry(struct raft *raft, struct uuid *eid)
2551{
2552 const struct json *data = raft_peek_next_entry(raft, eid);
2553 if (data) {
2554 raft->last_applied++;
2555 }
2556 return data;
2557}
2558
2559static void
2560raft_update_commit_index(struct raft *raft, uint64_t new_commit_index)
2561{
2562 if (new_commit_index <= raft->commit_index) {
2563 return;
2564 }
2565
2566 if (raft->role == RAFT_LEADER) {
2567 while (raft->commit_index < new_commit_index) {
2568 uint64_t index = ++raft->commit_index;
2569 const struct raft_entry *e = raft_get_entry(raft, index);
2570 if (e->data) {
2571 struct raft_command *cmd
2572 = raft_find_command_by_index(raft, index);
2573 if (cmd) {
2574 raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
2575 }
2576 }
2577 if (e->servers) {
2578 /* raft_run_reconfigure() can write a new Raft entry, which can
2579 * reallocate raft->entries, which would invalidate 'e', so
2580 * this case must be last, after the one for 'e->data'. */
2581 raft_run_reconfigure(raft);
2582 }
2583 }
2584 } else {
2585 raft->commit_index = new_commit_index;
2586 }
2587
2588 /* Write the commit index to the log. The next time we restart, this
2589 * allows us to start exporting a reasonably fresh log, instead of a log
2590 * that only contains the snapshot. */
2591 struct raft_record r = {
2592 .type = RAFT_REC_COMMIT_INDEX,
2593 .commit_index = raft->commit_index,
2594 };
2595 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2596}
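
/* Illustrative note on the ordering constraint above (a sketch, not
 * prescriptive): any code that holds a pointer into raft->entries[] across a
 * call that can append to the log must finish using the pointer first, as
 * raft_update_commit_index() does, or else re-fetch it afterward:
 *
 *     const struct raft_entry *e = raft_get_entry(raft, index);
 *     ...use e->data...
 *     raft_run_reconfigure(raft);       (may reallocate raft->entries)
 *     e = raft_get_entry(raft, index);  (re-fetch before any further use)
 */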
2597
2598/* This doesn't use rq->entries (but it does use rq->n_entries). */
2599static void
2600raft_send_append_reply(struct raft *raft, const struct raft_append_request *rq,
2601 enum raft_append_result result, const char *comment)
2602{
2603 /* Figure 3.1: "If leaderCommit > commitIndex, set commitIndex =
2604 * min(leaderCommit, index of last new entry)" */
2605 if (result == RAFT_APPEND_OK && rq->leader_commit > raft->commit_index) {
2606 raft_update_commit_index(
2607 raft, MIN(rq->leader_commit, rq->prev_log_index + rq->n_entries));
2608 }
2609
2610 /* Send reply. */
2611 union raft_rpc reply = {
2612 .append_reply = {
2613 .common = {
2614 .type = RAFT_RPC_APPEND_REPLY,
2615 .sid = rq->common.sid,
2616 .comment = CONST_CAST(char *, comment),
2617 },
2618 .term = raft->term,
2619 .log_end = raft->log_end,
2620 .prev_log_index = rq->prev_log_index,
2621 .prev_log_term = rq->prev_log_term,
2622 .n_entries = rq->n_entries,
2623 .result = result,
2624 }
2625 };
2626 raft_send(raft, &reply);
2627}
2628
2629/* If 'prev_log_index' exists in 'raft''s log, in term 'prev_log_term', returns
2630 * NULL. Otherwise, returns an explanation for the mismatch. */
2631static const char *
2632match_index_and_term(const struct raft *raft,
2633 uint64_t prev_log_index, uint64_t prev_log_term)
2634{
2635 if (prev_log_index < raft->log_start - 1) {
2636 return "mismatch before start of log";
2637 } else if (prev_log_index == raft->log_start - 1) {
2638 if (prev_log_term != raft->snap.term) {
2639 return "prev_term mismatch";
2640 }
2641 } else if (prev_log_index < raft->log_end) {
2642 if (raft->entries[prev_log_index - raft->log_start].term
2643 != prev_log_term) {
2644 return "term mismatch";
2645 }
2646 } else {
2647 /* prev_log_index >= raft->log_end */
2648 return "mismatch past end of log";
2649 }
2650 return NULL;
2651}
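
/* Worked example (illustrative, with made-up numbers): with log_start = 5,
 * log_end = 9, snap.term = 2, and entries at indexes 5...8 having terms
 * 2, 3, 3, 4, match_index_and_term() returns:
 *
 *     prev_log_index 3, any term  -> "mismatch before start of log"
 *     prev_log_index 4, term 2    -> NULL (matches the snapshot)
 *     prev_log_index 6, term 3    -> NULL
 *     prev_log_index 6, term 2    -> "term mismatch"
 *     prev_log_index 9, any term  -> "mismatch past end of log"
 */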
2652
2653static void
2654raft_handle_append_entries(struct raft *raft,
2655 const struct raft_append_request *rq,
2656 uint64_t prev_log_index, uint64_t prev_log_term,
2657 const struct raft_entry *entries,
2658 unsigned int n_entries)
2659{
2660 /* Section 3.5: "When sending an AppendEntries RPC, the leader includes
2661 * the index and term of the entry in its log that immediately precedes
2662 * the new entries. If the follower does not find an entry in its log
2663 * with the same index and term, then it refuses the new entries." */
2664 const char *mismatch = match_index_and_term(raft, prev_log_index,
2665 prev_log_term);
2666 if (mismatch) {
2667 VLOG_INFO("rejecting append_request because previous entry "
2668 "%"PRIu64",%"PRIu64" not in local log (%s)",
2669 prev_log_term, prev_log_index, mismatch);
2670 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY, mismatch);
2671 return;
2672 }
2673
2674 /* Figure 3.1: "If an existing entry conflicts with a new one (same
2675 * index but different terms), delete the existing entry and all that
2676 * follow it." */
2677 unsigned int i;
2678 bool servers_changed = false;
2679 for (i = 0; ; i++) {
2680 if (i >= n_entries) {
2681 /* No change. */
2682 if (rq->common.comment
2683 && !strcmp(rq->common.comment, "heartbeat")) {
2684 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "heartbeat");
2685 } else {
2686 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
2687 }
2688 return;
2689 }
2690
2691 uint64_t log_index = (prev_log_index + 1) + i;
2692 if (log_index >= raft->log_end) {
2693 break;
2694 }
2695 if (raft->entries[log_index - raft->log_start].term
2696 != entries[i].term) {
2697 if (raft_truncate(raft, log_index)) {
2698 servers_changed = true;
2699 }
2700 break;
2701 }
2702 }
2703
2704 /* Figure 3.1: "Append any entries not already in the log." */
2705 struct ovsdb_error *error = NULL;
2706 bool any_written = false;
2707 for (; i < n_entries; i++) {
2708 const struct raft_entry *e = &entries[i];
2709 error = raft_write_entry(raft, e->term,
2710 json_nullable_clone(e->data), &e->eid,
2711 json_nullable_clone(e->servers));
2712 if (error) {
2713 break;
2714 }
2715 any_written = true;
2716 if (e->servers) {
2717 servers_changed = true;
2718 }
2719 }
2720
2721 if (any_written) {
2722 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index
2723 = raft->log_end - 1;
2724 }
2725 if (servers_changed) {
2726 /* The set of servers might have changed; check. */
2727 raft_get_servers_from_log(raft, VLL_INFO);
2728 }
2729
2730 if (error) {
2731 char *s = ovsdb_error_to_string_free(error);
2732 VLOG_ERR("%s", s);
2733 free(s);
2734 raft_send_append_reply(raft, rq, RAFT_APPEND_IO_ERROR, "I/O error");
2735 return;
2736 }
2737
2738 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "log updated");
2739}
2740
2741static bool
2742raft_update_leader(struct raft *raft, const struct uuid *sid)
2743{
2744 if (raft->role == RAFT_LEADER) {
2745 char buf[SID_LEN + 1];
2746 VLOG_ERR("this server is leader but server %s claims to be",
2747 raft_get_nickname(raft, sid, buf, sizeof buf));
2748 return false;
2749 } else if (!uuid_equals(sid, &raft->leader_sid)) {
2750 if (!uuid_is_zero(&raft->leader_sid)) {
2751 char buf1[SID_LEN + 1];
2752 char buf2[SID_LEN + 1];
2753 VLOG_ERR("leader for term %"PRIu64" changed from %s to %s",
2754 raft->term,
2755 raft_get_nickname(raft, &raft->leader_sid,
2756 buf1, sizeof buf1),
2757 raft_get_nickname(raft, sid, buf2, sizeof buf2));
2758 } else {
2759 char buf[SID_LEN + 1];
2760 VLOG_INFO("server %s is leader for term %"PRIu64,
2761 raft_get_nickname(raft, sid, buf, sizeof buf),
2762 raft->term);
2763 }
2764 raft->leader_sid = *sid;
2765
2766 /* Record the leader to the log. This is not used by the algorithm
2767 * (although it could be, for quick restart), but it is used for
2768 * offline analysis to check for conformance with the properties
2769 * that Raft guarantees. */
2770 struct raft_record r = {
2771 .type = RAFT_REC_LEADER,
2772 .term = raft->term,
2773 .sid = *sid,
2774 };
2775 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2776 }
2777 return true;
2778}
2779
2780static void
2781raft_handle_append_request(struct raft *raft,
2782 const struct raft_append_request *rq)
2783{
2784 /* We do not check whether the server that sent the request is part of the
2785 * cluster. As section 4.1 says, "A server accepts AppendEntries requests
2786 * from a leader that is not part of the server’s latest configuration.
2787 * Otherwise, a new server could never be added to the cluster (it would
2788 * never accept any log entries preceding the configuration entry that adds
2789 * the server)." */
2790 if (!raft_update_leader(raft, &rq->common.sid)) {
2791 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
2792 "usurped leadership");
2793 return;
2794 }
2795 raft_reset_timer(raft);
2796
2797 /* First check for the common case, where the AppendEntries request is
2798 * entirely for indexes covered by 'log_start' ... 'log_end - 1', something
2799 * like this:
2800 *
     *                      rq->prev_log_index
     *                        | first_entry_index
     *                        |   |         nth_entry_index
     *                        |   |           |
     *                        v   v           v
     *                          +---+---+---+---+
     *                        T | T | T | T | T |
     *                          +---+---+---+---+
     *                  +---+---+---+---+
     *                T | T | T | T | T |
     *                  +---+---+---+---+
     *                    ^             ^
     *                    |             |
     *                log_start      log_end
     * */
2816 uint64_t first_entry_index = rq->prev_log_index + 1;
2817 uint64_t nth_entry_index = rq->prev_log_index + rq->n_entries;
2818 if (OVS_LIKELY(first_entry_index >= raft->log_start)) {
2819 raft_handle_append_entries(raft, rq,
2820 rq->prev_log_index, rq->prev_log_term,
2821 rq->entries, rq->n_entries);
2822 return;
2823 }
2824
2825 /* Now a series of checks for odd cases, where the AppendEntries request
2826 * extends earlier than the beginning of our log, into the log entries
2827 * discarded by the most recent snapshot. */
2828
2829 /*
2830 * Handle the case where the indexes covered by rq->entries[] are entirely
2831 * disjoint with 'log_start - 1' ... 'log_end - 1', as shown below. So,
2832 * everything in the AppendEntries request must already have been
     * committed, and we might as well reply with success.
2834 *
     *                   rq->prev_log_index
     *                     | first_entry_index
     *                     |   |         nth_entry_index
     *                     |   |           |
     *                     v   v           v
     *                       +---+---+---+---+
     *                     T | T | T | T | T |
     *                       +---+---+---+---+
     *                                           +---+---+---+---+
     *                                         T | T | T | T | T |
     *                                           +---+---+---+---+
     *                                             ^             ^
     *                                             |             |
     *                                         log_start      log_end
2849 */
2850 if (nth_entry_index < raft->log_start - 1) {
2851 raft_send_append_reply(raft, rq, RAFT_APPEND_OK,
2852 "append before log start");
2853 return;
2854 }
2855
2856 /*
2857 * Handle the case where the last entry in rq->entries[] has the same index
2858 * as 'log_start - 1', so we can compare their terms:
2859 *
     *                        rq->prev_log_index
     *                          | first_entry_index
     *                          |   |         nth_entry_index
     *                          |   |           |
     *                          v   v           v
     *                            +---+---+---+---+
     *                          T | T | T | T | T |
     *                            +---+---+---+---+
     *                                           +---+---+---+---+
     *                                         T | T | T | T | T |
     *                                           +---+---+---+---+
     *                                             ^             ^
     *                                             |             |
     *                                         log_start      log_end
2874 *
     * There's actually a sub-case where rq->n_entries == 0, in which we
     * compare rq->prev_log_term:
2877 *
     *                                    rq->prev_log_index
     *                                             |
     *                                             |
     *                                             |
     *                                             v
     *                                             T
     *
     *                                           +---+---+---+---+
     *                                         T | T | T | T | T |
     *                                           +---+---+---+---+
     *                                             ^             ^
     *                                             |             |
     *                                         log_start      log_end
2891 */
2892 if (nth_entry_index == raft->log_start - 1) {
2893 if (rq->n_entries
2894 ? raft->snap.term == rq->entries[rq->n_entries - 1].term
2895 : raft->snap.term == rq->prev_log_term) {
2896 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
2897 } else {
2898 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
2899 "term mismatch");
2900 }
2901 return;
2902 }
2903
2904 /*
2905 * We now know that the data in rq->entries[] overlaps the data in
2906 * raft->entries[], as shown below, with some positive 'ofs':
2907 *
     *                   rq->prev_log_index
     *                     | first_entry_index
     *                     |   |             nth_entry_index
     *                     |   |               |
     *                     v   v               v
     *                       +---+---+---+---+---+
     *                     T | T | T | T | T | T |
     *                       +---+---+---+---+---+
     *                                   +---+---+---+---+
     *                                 T | T | T | T | T |
     *                                   +---+---+---+---+
     *                                     ^             ^
     *                                     |             |
     *                                 log_start      log_end
     *
     *                         |<-- ofs -->|
2924 *
     * We transform this into the following by trimming the first 'ofs'
     * elements off of rq->entries[].  Notice how we retain the term but not
     * the data for rq->entries[ofs - 1]:
2928 *
     *                               first_entry_index + ofs - 1
     *                                 | first_entry_index + ofs
     *                                 |   |   nth_entry_index
     *                                 |   |   |
     *                                 v   v   v
     *                                   +---+---+
     *                                 T | T | T |
     *                                   +---+---+
     *                                   +---+---+---+---+
     *                                 T | T | T | T | T |
     *                                   +---+---+---+---+
     *                                     ^             ^
     *                                     |             |
     *                                 log_start      log_end
2943 */
2944 uint64_t ofs = raft->log_start - first_entry_index;
2945 raft_handle_append_entries(raft, rq,
2946 raft->log_start - 1, rq->entries[ofs - 1].term,
2947 &rq->entries[ofs], rq->n_entries - ofs);
2948}
2949
2950/* Returns true if 'raft' has another log entry or snapshot to read. */
2951bool
2952raft_has_next_entry(const struct raft *raft_)
2953{
2954 struct raft *raft = CONST_CAST(struct raft *, raft_);
2955 struct uuid eid;
2956 return raft_peek_next_entry(raft, &eid) != NULL;
2957}
2958
2959/* Returns the next log entry or snapshot from 'raft', or NULL if there are
2960 * none left to read. Stores the entry ID of the log entry in '*eid'. Stores
2961 * true in '*is_snapshot' if the returned data is a snapshot, false if it is a
2962 * log entry. */
2963const struct json *
2964raft_next_entry(struct raft *raft, struct uuid *eid, bool *is_snapshot)
2965{
2966 const struct json *data = raft_get_next_entry(raft, eid);
2967 *is_snapshot = data == raft->snap.data;
2968 return data;
2969}
2970
2971/* Returns the log index of the last-read snapshot or log entry. */
2972uint64_t
2973raft_get_applied_index(const struct raft *raft)
2974{
2975 return raft->last_applied;
2976}
2977
2978/* Returns the log index of the last snapshot or log entry that is available to
2979 * be read. */
2980uint64_t
2981raft_get_commit_index(const struct raft *raft)
2982{
2983 return raft->commit_index;
2984}
2985
2986static struct raft_server *
2987raft_find_peer(struct raft *raft, const struct uuid *uuid)
2988{
2989 struct raft_server *s = raft_find_server(raft, uuid);
2990 return s && !uuid_equals(&raft->sid, &s->sid) ? s : NULL;
2991}
2992
2993static struct raft_server *
2994raft_find_new_server(struct raft *raft, const struct uuid *uuid)
2995{
2996 return raft_server_find(&raft->add_servers, uuid);
2997}
2998
2999/* Figure 3.1: "If there exists an N such that N > commitIndex, a
3000 * majority of matchIndex[i] >= N, and log[N].term == currentTerm, set
3001 * commitIndex = N (sections 3.5 and 3.6)." */
3002static void
3003raft_consider_updating_commit_index(struct raft *raft)
3004{
3005 /* This loop cannot just bail out when it comes across a log entry that
3006 * does not match the criteria. For example, Figure 3.7(d2) shows a
3007 * case where the log entry for term 2 cannot be committed directly
3008 * (because it is not for the current term) but it can be committed as
     * a side effect of committing the entry for term 4 (the current term).
3010 * XXX Is there a more efficient way to do this? */
3011 ovs_assert(raft->role == RAFT_LEADER);
3012
3013 uint64_t new_commit_index = raft->commit_index;
3014 for (uint64_t idx = MAX(raft->commit_index + 1, raft->log_start);
3015 idx < raft->log_end; idx++) {
3016 if (raft->entries[idx - raft->log_start].term == raft->term) {
3017 size_t count = 0;
3018 struct raft_server *s2;
3019 HMAP_FOR_EACH (s2, hmap_node, &raft->servers) {
3020 if (s2->match_index >= idx) {
3021 count++;
3022 }
3023 }
3024 if (count > hmap_count(&raft->servers) / 2) {
3025 VLOG_DBG("index %"PRIu64" committed to %"PRIuSIZE" servers, "
3026 "applying", idx, count);
3027 new_commit_index = idx;
3028 }
3029 }
3030 }
3031 raft_update_commit_index(raft, new_commit_index);
3032}
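
/* Worked example (illustrative, with made-up numbers): in a 5-server cluster
 * whose leader is in term 3 with log_end = 8, suppose match_index across the
 * servers (including the leader itself) is 7, 7, 6, 4, 3 and the entries at
 * indexes 5, 6, 7 have terms 2, 3, 3.  Index 6 is the highest index with the
 * current term replicated to a majority (three of five servers have
 * match_index >= 6), so commit_index advances to 6, and the term-2 entry at
 * index 5 is committed along with it. */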
3033
3034static void
3035raft_update_match_index(struct raft *raft, struct raft_server *s,
3036 uint64_t min_index)
3037{
3038 ovs_assert(raft->role == RAFT_LEADER);
3039 if (min_index > s->match_index) {
3040 s->match_index = min_index;
3041 raft_consider_updating_commit_index(raft);
3042 }
3043}
3044
3045static void
3046raft_update_our_match_index(struct raft *raft, uint64_t min_index)
3047{
3048 raft_update_match_index(raft, raft_find_server(raft, &raft->sid),
3049 min_index);
3050}
3051
3052static void
3053raft_send_install_snapshot_request(struct raft *raft,
3054 const struct raft_server *s,
3055 const char *comment)
3056{
3057 union raft_rpc rpc = {
3058 .install_snapshot_request = {
3059 .common = {
3060 .type = RAFT_RPC_INSTALL_SNAPSHOT_REQUEST,
3061 .sid = s->sid,
3062 .comment = CONST_CAST(char *, comment),
3063 },
3064 .term = raft->term,
3065 .last_index = raft->log_start - 1,
3066 .last_term = raft->snap.term,
3067 .last_servers = raft->snap.servers,
3068 .last_eid = raft->snap.eid,
3069 .data = raft->snap.data,
3070 }
3071 };
3072 raft_send(raft, &rpc);
3073}
3074
3075static void
3076raft_handle_append_reply(struct raft *raft,
3077 const struct raft_append_reply *rpy)
3078{
3079 if (raft->role != RAFT_LEADER) {
3080 VLOG_INFO("rejected append_reply (not leader)");
3081 return;
3082 }
3083
3084 /* Most commonly we'd be getting an AppendEntries reply from a configured
3085 * server (e.g. a peer), but we can also get them from servers in the
3086 * process of being added. */
3087 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3088 if (!s) {
3089 s = raft_find_new_server(raft, &rpy->common.sid);
3090 if (!s) {
3091 VLOG_INFO("rejected append_reply from unknown server "SID_FMT,
3092 SID_ARGS(&rpy->common.sid));
3093 return;
3094 }
3095 }
3096
3097 if (rpy->result == RAFT_APPEND_OK) {
3098 /* Figure 3.1: "If successful, update nextIndex and matchIndex for
3099 * follower (section 3.5)." */
3100 uint64_t min_index = rpy->prev_log_index + rpy->n_entries + 1;
3101 if (s->next_index < min_index) {
3102 s->next_index = min_index;
3103 }
3104 raft_update_match_index(raft, s, min_index - 1);
3105 } else {
3106 /* Figure 3.1: "If AppendEntries fails because of log inconsistency,
3107 * decrement nextIndex and retry (section 3.5)."
3108 *
3109 * We also implement the optimization suggested in section 4.2.1:
3110 * "Various approaches can make nextIndex converge to its correct value
3111 * more quickly, including those described in Chapter 3. The simplest
3112 * approach to solving this particular problem of adding a new server,
3113 * however, is to have followers return the length of their logs in the
3114 * AppendEntries response; this allows the leader to cap the follower’s
3115 * nextIndex accordingly." */
3116 s->next_index = (s->next_index > 0
3117 ? MIN(s->next_index - 1, rpy->log_end)
3118 : 0);
3119
3120 if (rpy->result == RAFT_APPEND_IO_ERROR) {
3121 /* Append failed but not because of a log inconsistency. Because
3122 * of the I/O error, there's no point in re-sending the append
3123 * immediately. */
3124 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3125 VLOG_INFO_RL(&rl, "%s reported I/O error", s->nickname);
3126 return;
3127 }
3128 }
3129
3130 /*
3131 * Our behavior here must depend on the value of next_index relative to
3132 * log_start and log_end. There are three cases:
3133 *
     *       Case 1      |    Case 2     |      Case 3
     * <---------------->|<------------->|<------------------>
     *                   |               |
     *
     *                   +---+---+---+---+
     *                 T | T | T | T | T |
     *                   +---+---+---+---+
     *                     ^             ^
     *                     |             |
     *                 log_start      log_end
3144 */
3145 if (s->next_index < raft->log_start) {
3146 /* Case 1. */
3147 raft_send_install_snapshot_request(raft, s, NULL);
3148 } else if (s->next_index < raft->log_end) {
3149 /* Case 2. */
3150 raft_send_append_request(raft, s, 1, NULL);
3151 } else {
3152 /* Case 3. */
3153 if (s->phase == RAFT_PHASE_CATCHUP) {
3154 s->phase = RAFT_PHASE_CAUGHT_UP;
3155 raft_run_reconfigure(raft);
3156 }
3157 }
3158}
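
/* Worked example (illustrative, with made-up numbers): if the leader's
 * next_index for a follower is 10 but the follower rejects an append and
 * reports log_end = 4, the MIN() above drops next_index directly to 4 rather
 * than decrementing one index per round trip, so the next AppendEntries
 * starts at the follower's actual log end. */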
3159
3160static bool
3161raft_should_suppress_disruptive_server(struct raft *raft,
3162 const union raft_rpc *rpc)
3163{
3164 if (rpc->type != RAFT_RPC_VOTE_REQUEST) {
3165 return false;
3166 }
3167
3168 /* Section 4.2.3 "Disruptive Servers" says:
3169 *
3170 * ...if a server receives a RequestVote request within the minimum
3171 * election timeout of hearing from a current leader, it does not update
3172 * its term or grant its vote...
3173 *
3174 * ...This change conflicts with the leadership transfer mechanism as
3175 * described in Chapter 3, in which a server legitimately starts an
3176 * election without waiting an election timeout. In that case,
3177 * RequestVote messages should be processed by other servers even when
3178 * they believe a current cluster leader exists. Those RequestVote
3179 * requests can include a special flag to indicate this behavior (“I
3180 * have permission to disrupt the leader--it told me to!”).
3181 *
3182 * This clearly describes how the followers should act, but not the leader.
3183 * We just ignore vote requests that arrive at a current leader. This
3184 * seems to be fairly safe, since a majority other than the current leader
3185 * can still elect a new leader and the first AppendEntries from that new
3186 * leader will depose the current leader. */
3187 const struct raft_vote_request *rq = raft_vote_request_cast(rpc);
3188 if (rq->leadership_transfer) {
3189 return false;
3190 }
3191
3192 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3193 long long int now = time_msec();
3194 switch (raft->role) {
3195 case RAFT_LEADER:
3196 VLOG_WARN_RL(&rl, "ignoring vote request received as leader");
3197 return true;
3198
3199 case RAFT_FOLLOWER:
3200 if (now < raft->election_base + ELECTION_BASE_MSEC) {
3201 VLOG_WARN_RL(&rl, "ignoring vote request received after only "
3202 "%lld ms (minimum election time is %d ms)",
3203 now - raft->election_base, ELECTION_BASE_MSEC);
3204 return true;
3205 }
3206 return false;
3207
3208 case RAFT_CANDIDATE:
3209 return false;
3210
3211 default:
3212 OVS_NOT_REACHED();
3213 }
3214}
3215
3216/* Returns true if a reply should be sent. */
3217static bool
3218raft_handle_vote_request__(struct raft *raft,
3219 const struct raft_vote_request *rq)
3220{
    /* Figure 3.1: "If votedFor is null or candidateId, and candidate's log is
3222 * at least as up-to-date as receiver's log, grant vote (sections 3.4,
3223 * 3.6)." */
3224 if (uuid_equals(&raft->vote, &rq->common.sid)) {
3225 /* Already voted for this candidate in this term. Resend vote. */
3226 return true;
3227 } else if (!uuid_is_zero(&raft->vote)) {
        /* Already voted for a different candidate in this term.  Send a reply
3229 * saying what candidate we did vote for. This isn't a necessary part
3230 * of the Raft protocol but it can make debugging easier. */
3231 return true;
3232 }
3233
3234 /* Section 3.6.1: "The RequestVote RPC implements this restriction: the RPC
3235 * includes information about the candidate’s log, and the voter denies its
3236 * vote if its own log is more up-to-date than that of the candidate. Raft
3237 * determines which of two logs is more up-to-date by comparing the index
3238 * and term of the last entries in the logs. If the logs have last entries
3239 * with different terms, then the log with the later term is more
3240 * up-to-date. If the logs end with the same term, then whichever log is
3241 * longer is more up-to-date." */
3242 uint64_t last_term = (raft->log_end > raft->log_start
3243 ? raft->entries[raft->log_end - 1
3244 - raft->log_start].term
3245 : raft->snap.term);
3246 if (last_term > rq->last_log_term
3247 || (last_term == rq->last_log_term
3248 && raft->log_end - 1 > rq->last_log_index)) {
3249 /* Our log is more up-to-date than the peer's. Withhold vote. */
3250 return false;
3251 }
3252
3253 /* Record a vote for the peer. */
3254 if (!raft_set_term(raft, raft->term, &rq->common.sid)) {
3255 return false;
3256 }
3257
3258 raft_reset_timer(raft);
3259
3260 return true;
3261}
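
/* Worked example (illustrative, with made-up numbers): if our log ends with
 * an entry of term 5 at index 20, we withhold our vote from a candidate whose
 * log ends with term 4 (older last term) or with term 5 at index 19 (same
 * term, shorter log), but we may grant it to a candidate whose log ends with
 * term 5 at index 20 or later, or with any term greater than 5. */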
3262
3263static void
3264raft_send_vote_reply(struct raft *raft, const struct uuid *dst,
3265 const struct uuid *vote)
3266{
3267 union raft_rpc rpy = {
3268 .vote_reply = {
3269 .common = {
3270 .type = RAFT_RPC_VOTE_REPLY,
3271 .sid = *dst,
3272 },
3273 .term = raft->term,
3274 .vote = *vote,
3275 },
3276 };
3277 raft_send(raft, &rpy);
3278}
3279
3280static void
3281raft_handle_vote_request(struct raft *raft,
3282 const struct raft_vote_request *rq)
3283{
3284 if (raft_handle_vote_request__(raft, rq)) {
3285 raft_send_vote_reply(raft, &rq->common.sid, &raft->vote);
3286 }
3287}
3288
3289static void
3290raft_handle_vote_reply(struct raft *raft,
3291 const struct raft_vote_reply *rpy)
3292{
3293 if (!raft_receive_term__(raft, &rpy->common, rpy->term)) {
3294 return;
3295 }
3296
3297 if (raft->role != RAFT_CANDIDATE) {
3298 return;
3299 }
3300
3301 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3302 if (s) {
3303 raft_accept_vote(raft, s, &rpy->vote);
3304 }
3305}
3306
3307/* Returns true if 'raft''s log contains reconfiguration entries that have not
3308 * yet been committed. */
3309static bool
3310raft_has_uncommitted_configuration(const struct raft *raft)
3311{
3312 for (uint64_t i = raft->commit_index + 1; i < raft->log_end; i++) {
3313 ovs_assert(i >= raft->log_start);
3314 const struct raft_entry *e = &raft->entries[i - raft->log_start];
3315 if (e->servers) {
3316 return true;
3317 }
3318 }
3319 return false;
3320}
3321
3322static void
3323raft_log_reconfiguration(struct raft *raft)
3324{
3325 struct json *servers_json = raft_servers_to_json(&raft->servers);
3326 raft_command_unref(raft_command_execute__(
3327 raft, NULL, servers_json, NULL, NULL));
3328 json_destroy(servers_json);
3329}
3330
3331static void
3332raft_run_reconfigure(struct raft *raft)
3333{
3334 ovs_assert(raft->role == RAFT_LEADER);
3335
3336 /* Reconfiguration only progresses when configuration changes commit. */
3337 if (raft_has_uncommitted_configuration(raft)) {
3338 return;
3339 }
3340
3341 /* If we were waiting for a configuration change to commit, it's done. */
3342 struct raft_server *s;
3343 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3344 if (s->phase == RAFT_PHASE_COMMITTING) {
3345 raft_send_add_server_reply__(raft, &s->sid, s->address,
3346 true, RAFT_SERVER_COMPLETED);
3347 s->phase = RAFT_PHASE_STABLE;
3348 }
3349 }
3350 if (raft->remove_server) {
3351 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
3352 &raft->remove_server->requester_sid,
3353 raft->remove_server->requester_conn,
3354 true, RAFT_SERVER_COMPLETED);
3355 raft_server_destroy(raft->remove_server);
3356 raft->remove_server = NULL;
3357 }
3358
3359 /* If a new server is caught up, add it to the configuration. */
3360 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
3361 if (s->phase == RAFT_PHASE_CAUGHT_UP) {
3362 /* Move 's' from 'raft->add_servers' to 'raft->servers'. */
3363 hmap_remove(&raft->add_servers, &s->hmap_node);
3364 hmap_insert(&raft->servers, &s->hmap_node, uuid_hash(&s->sid));
3365
3366 /* Mark 's' as waiting for commit. */
3367 s->phase = RAFT_PHASE_COMMITTING;
3368
3369 raft_log_reconfiguration(raft);
3370
            /* When commit completes we'll transition to RAFT_PHASE_STABLE and
             * send a RAFT_SERVER_COMPLETED reply. */
3373
3374 return;
3375 }
3376 }
3377
3378 /* Remove a server, if one is scheduled for removal. */
3379 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3380 if (s->phase == RAFT_PHASE_REMOVE) {
3381 hmap_remove(&raft->servers, &s->hmap_node);
3382 raft->remove_server = s;
3383
3384 raft_log_reconfiguration(raft);
3385
3386 return;
3387 }
3388 }
3389}
3390
3391static void
3392raft_handle_add_server_request(struct raft *raft,
3393 const struct raft_add_server_request *rq)
3394{
3395 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3396 if (raft->role != RAFT_LEADER) {
3397 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3398 return;
3399 }
3400
3401 /* Check for an existing server. */
3402 struct raft_server *s = raft_find_server(raft, &rq->common.sid);
3403 if (s) {
3404 /* If the server is scheduled to be removed, cancel it. */
3405 if (s->phase == RAFT_PHASE_REMOVE) {
3406 s->phase = RAFT_PHASE_STABLE;
3407 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_CANCELED);
3408 return;
3409 }
3410
        /* If the server is being added, then it's in progress. */
        if (s->phase != RAFT_PHASE_STABLE) {
            raft_send_add_server_reply(raft, rq,
                                       false, RAFT_SERVER_IN_PROGRESS);
            return;
        }
3416
3417 /* Nothing to do--server is already part of the configuration. */
3418 raft_send_add_server_reply(raft, rq,
3419 true, RAFT_SERVER_ALREADY_PRESENT);
3420 return;
3421 }
3422
3423 /* Check for a server being removed. */
3424 if (raft->remove_server
3425 && uuid_equals(&rq->common.sid, &raft->remove_server->sid)) {
3426 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3427 return;
3428 }
3429
3430 /* Check for a server already being added. */
3431 if (raft_find_new_server(raft, &rq->common.sid)) {
3432 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_IN_PROGRESS);
3433 return;
3434 }
3435
3436 /* Add server to 'add_servers'. */
3437 s = raft_server_add(&raft->add_servers, &rq->common.sid, rq->address);
3438 raft_server_init_leader(raft, s);
3439 s->requester_sid = rq->common.sid;
3440 s->requester_conn = NULL;
3441 s->phase = RAFT_PHASE_CATCHUP;
3442
3443 /* Start sending the log. If this is the first time we've tried to add
3444 * this server, then this will quickly degenerate into an InstallSnapshot
     * followed by a series of AppendEntries, but if it's a retry of an
     * earlier AddServerRequest that was interrupted (e.g. by a timeout or a
     * loss of leadership) then it will gracefully resume populating the log.
3448 *
3449 * See the last few paragraphs of section 4.2.1 for further insight. */
3450 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
3451 VLOG_INFO_RL(&rl,
3452 "starting to add server %s ("SID_FMT" at %s) "
3453 "to cluster "CID_FMT, s->nickname, SID_ARGS(&s->sid),
3454 rq->address, CID_ARGS(&raft->cid));
3455 raft_send_append_request(raft, s, 0, "initialize new server");
3456}
3457
3458static void
3459raft_handle_add_server_reply(struct raft *raft,
3460 const struct raft_add_server_reply *rpy)
3461{
3462 if (!raft->joining) {
3463 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3464 VLOG_WARN_RL(&rl, "received add_server_reply even though we're "
3465 "already part of the cluster");
3466 return;
3467 }
3468
3469 if (rpy->success) {
3470 raft->joining = false;
3471
3472 /* It is tempting, at this point, to check that this server is part of
3473 * the current configuration. However, this is not necessarily the
3474 * case, because the log entry that added this server to the cluster
3475 * might have been committed by a majority of the cluster that does not
3476 * include this one. This actually happens in testing. */
3477 } else {
3478 const char *address;
3479 SSET_FOR_EACH (address, &rpy->remote_addresses) {
3480 if (sset_add(&raft->remote_addresses, address)) {
3481 VLOG_INFO("%s: learned new server address for joining cluster",
3482 address);
3483 }
3484 }
3485 }
3486}
3487
3488/* This is called by raft_unixctl_kick() as well as via RPC. */
3489static void
3490raft_handle_remove_server_request(struct raft *raft,
3491 const struct raft_remove_server_request *rq)
3492{
3493 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3494 if (raft->role != RAFT_LEADER) {
3495 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3496 return;
3497 }
3498
3499 /* If the server to remove is currently waiting to be added, cancel it. */
3500 struct raft_server *target = raft_find_new_server(raft, &rq->sid);
3501 if (target) {
3502 raft_send_add_server_reply__(raft, &target->sid, target->address,
3503 false, RAFT_SERVER_CANCELED);
3504 hmap_remove(&raft->add_servers, &target->hmap_node);
3505 raft_server_destroy(target);
3506 return;
3507 }
3508
3509 /* If the server isn't configured, report that. */
3510 target = raft_find_server(raft, &rq->sid);
3511 if (!target) {
3512 raft_send_remove_server_reply(raft, rq,
3513 true, RAFT_SERVER_ALREADY_GONE);
3514 return;
3515 }
3516
3517 /* Check whether we're waiting for the addition of the server to commit. */
3518 if (target->phase == RAFT_PHASE_COMMITTING) {
3519 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3520 return;
3521 }
3522
3523 /* Check whether the server is already scheduled for removal. */
3524 if (target->phase == RAFT_PHASE_REMOVE) {
3525 raft_send_remove_server_reply(raft, rq,
3526 false, RAFT_SERVER_IN_PROGRESS);
3527 return;
3528 }
3529
    /* Make sure that, if we remove this server, at least one other server
     * will be left.  We don't count servers currently being added (in
     * 'add_servers') since those could fail. */
3533 struct raft_server *s;
3534 int n = 0;
3535 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3536 if (s != target && s->phase != RAFT_PHASE_REMOVE) {
3537 n++;
3538 }
3539 }
3540 if (!n) {
3541 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_EMPTY);
3542 return;
3543 }
3544
3545 /* Mark the server for removal. */
3546 target->phase = RAFT_PHASE_REMOVE;
3547 if (rq->requester_conn) {
3548 target->requester_sid = UUID_ZERO;
3549 unixctl_command_reply(rq->requester_conn, "started removal");
3550 } else {
3551 target->requester_sid = rq->common.sid;
3552 target->requester_conn = NULL;
3553 }
3554
3555 raft_run_reconfigure(raft);
3556 /* Operation in progress, reply will be sent later. */
3557}
3558
3559static void
3560raft_handle_remove_server_reply(struct raft *raft,
3561 const struct raft_remove_server_reply *rpc)
3562{
3563 if (rpc->success) {
3564 VLOG_INFO(SID_FMT": finished leaving cluster "CID_FMT,
3565 SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
3566
3567 raft_record_note(raft, "left", "this server left the cluster");
3568
3569 raft->leaving = false;
3570 raft->left = true;
3571 }
3572}
3573
3574static bool
3575raft_handle_write_error(struct raft *raft, struct ovsdb_error *error)
3576{
3577 if (error && !raft->failed) {
3578 raft->failed = true;
3579
3580 char *s = ovsdb_error_to_string_free(error);
3581 VLOG_WARN("%s: entering failure mode due to I/O error (%s)",
3582 raft->name, s);
3583 free(s);
3584 }
3585 return !raft->failed;
3586}
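/* A minimal calling sketch for raft_handle_write_error(): callers pass in the
 * error from a log I/O routine and proceed only on a true return, e.g.
 *
 *     if (!raft_handle_write_error(raft, ovsdb_log_commit(raft->log))) {
 *         return;
 *     }
 *
 * ovsdb_log_commit() stands in here for any call that returns a
 * struct ovsdb_error *; after the first failure this function always
 * returns false. */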
3587
3588static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3589raft_write_snapshot(struct raft *raft, struct ovsdb_log *log,
3590 uint64_t new_log_start,
3591 const struct raft_entry *new_snapshot)
3592{
3593 struct raft_header h = {
3594 .sid = raft->sid,
3595 .cid = raft->cid,
3596 .name = raft->name,
3597 .local_address = raft->local_address,
3598 .snap_index = new_log_start - 1,
3599 .snap = *new_snapshot,
3600 };
3601 struct ovsdb_error *error = ovsdb_log_write_and_free(
3602 log, raft_header_to_json(&h));
3603 if (error) {
3604 return error;
3605 }
3606 ovsdb_log_mark_base(raft->log);
3607
3608 /* Write log records. */
3609 for (uint64_t index = new_log_start; index < raft->log_end; index++) {
3610 const struct raft_entry *e = &raft->entries[index - raft->log_start];
3611 struct raft_record r = {
3612 .type = RAFT_REC_ENTRY,
3613 .term = e->term,
3614 .entry = {
3615 .index = index,
3616 .data = e->data,
3617 .servers = e->servers,
3618 .eid = e->eid,
3619 },
3620 };
3621 error = ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3622 if (error) {
3623 return error;
3624 }
3625 }
3626
3627 /* Write term and vote (if any).
3628 *
3629 * The term is redundant if we wrote a log record for that term above. The
3630 * vote, if any, is never redundant.
3631 */
3632 error = raft_write_state(log, raft->term, &raft->vote);
3633 if (error) {
3634 return error;
3635 }
3636
3637 /* Write commit_index if it's beyond the new start of the log. */
3638 if (raft->commit_index >= new_log_start) {
3639 struct raft_record r = {
3640 .type = RAFT_REC_COMMIT_INDEX,
3641 .commit_index = raft->commit_index,
3642 };
3643 return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3644 }
3645 return NULL;
3646}
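/* For exposition, the record sequence that raft_write_snapshot() produces in
 * the new file:
 *
 *     1. A raft_header carrying the snapshot itself
 *        (snap_index = new_log_start - 1).
 *     2. One RAFT_REC_ENTRY record per retained entry, for indexes
 *        new_log_start through log_end - 1.
 *     3. The current term and vote, via raft_write_state().
 *     4. A RAFT_REC_COMMIT_INDEX record, only when commit_index has reached
 *        new_log_start.
 */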
3647
3648static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3649raft_save_snapshot(struct raft *raft,
3650 uint64_t new_start, const struct raft_entry *new_snapshot)
3652{
3653 struct ovsdb_log *new_log;
3654 struct ovsdb_error *error;
3655 error = ovsdb_log_replace_start(raft->log, &new_log);
3656 if (error) {
3657 return error;
3658 }
3659
3660 error = raft_write_snapshot(raft, new_log, new_start, new_snapshot);
3661 if (error) {
3662 ovsdb_log_replace_abort(new_log);
3663 return error;
3664 }
3665
3666 return ovsdb_log_replace_commit(raft->log, new_log);
3667}
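/* Expository sketch of the ovsdb_log replacement idiom used above; the
 * write_contents() helper is hypothetical:
 *
 *     struct ovsdb_log *new_log;
 *     struct ovsdb_error *error = ovsdb_log_replace_start(log, &new_log);
 *     if (!error) {
 *         error = write_contents(new_log);
 *         if (error) {
 *             ovsdb_log_replace_abort(new_log);
 *         } else {
 *             error = ovsdb_log_replace_commit(log, new_log);
 *         }
 *     }
 *
 * The old file is replaced only once the new one is complete, so a crash
 * mid-snapshot should leave the original log intact. */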
3668
3669static bool
3670raft_handle_install_snapshot_request__(
3671 struct raft *raft, const struct raft_install_snapshot_request *rq)
3672{
3673 raft_reset_timer(raft);
3674
3675 /*
3676 * Our behavior here depends on how new_log_start in the snapshot compares
3677 * to our current log_start and log_end. There are three cases:
3678 *
3679 *        Case 1       |       Case 2      |       Case 3
3680 * <------------------->|<------------------>|<--------------------->
3681 *                      |                    |
3682 *
3683 *                  +---+---+---+---+
3684 *                T | T | T | T | T |
3685 *                  +---+---+---+---+
3686 *                    ^             ^
3687 *                    |             |
3688 *                log_start      log_end
3689 */
3690 uint64_t new_log_start = rq->last_index + 1;
3691 if (new_log_start < raft->log_start) {
3692 /* Case 1: The new snapshot covers less than our current one. Nothing
3693 * to do. */
3694 return true;
3695 } else if (new_log_start < raft->log_end) {
3696 /* Case 2: The new snapshot starts in the middle of our log. We could
3697 * discard the first 'new_log_start - raft->log_start' entries in the
3698 * log. But there's not much value in that, since snapshotting is
3699 * supposed to be a local decision. Just skip it. */
3700 return true;
3701 }
3702
3703 /* Case 3: The new snapshot starts past the end of our current log, so
3704 * discard all of our current log. */
3705 const struct raft_entry new_snapshot = {
3706 .term = rq->last_term,
3707 .data = rq->data,
3708 .eid = rq->last_eid,
3709 .servers = rq->last_servers,
3710 };
3711 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
3712 &new_snapshot);
3713 if (error) {
3714 char *error_s = ovsdb_error_to_string(error);
3715 VLOG_WARN("could not save snapshot: %s", error_s);
3716 free(error_s);
3717 return false;
3718 }
3719
3720 for (size_t i = 0; i < raft->log_end - raft->log_start; i++) {
3721 raft_entry_uninit(&raft->entries[i]);
3722 }
3723 raft->log_start = raft->log_end = new_log_start;
3724 raft->log_synced = raft->log_end - 1;
3725 raft->commit_index = raft->log_start - 1;
3726 if (raft->last_applied < raft->commit_index) {
3727 raft->last_applied = raft->log_start - 2;
3728 }
3729
3730 raft_entry_uninit(&raft->snap);
3731 raft_entry_clone(&raft->snap, &new_snapshot);
3732
3733 raft_get_servers_from_log(raft, VLL_INFO);
3734
3735 return true;
3736}
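/* A worked example of the three cases above, with invented numbers: suppose
 * log_start = 10 and log_end = 20. A snapshot with last_index = 5 yields
 * new_log_start = 6 (case 1) and is ignored, since we already hold more.
 * last_index = 14 yields new_log_start = 15 (case 2) and is also ignored,
 * because trimming entries 10..14 is a purely local decision. last_index =
 * 25 yields new_log_start = 26 >= log_end (case 3), so the local log is
 * discarded and replaced by the snapshot. */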
3737
3738static void
3739raft_handle_install_snapshot_request(
3740 struct raft *raft, const struct raft_install_snapshot_request *rq)
3741{
3742 if (raft_handle_install_snapshot_request__(raft, rq)) {
3743 union raft_rpc rpy = {
3744 .install_snapshot_reply = {
3745 .common = {
3746 .type = RAFT_RPC_INSTALL_SNAPSHOT_REPLY,
3747 .sid = rq->common.sid,
3748 },
3749 .term = raft->term,
3750 .last_index = rq->last_index,
3751 .last_term = rq->last_term,
3752 },
3753 };
3754 raft_send(raft, &rpy);
3755 }
3756}
3757
3758static void
3759raft_handle_install_snapshot_reply(
3760 struct raft *raft, const struct raft_install_snapshot_reply *rpy)
3761{
3762 /* We might get an InstallSnapshot reply from a configured server (e.g. a
3763 * peer) or a server in the process of being added. */
3764 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3765 if (!s) {
3766 s = raft_find_new_server(raft, &rpy->common.sid);
3767 if (!s) {
3768 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3769 VLOG_INFO_RL(&rl, "cluster "CID_FMT": received %s from "
3770 "unknown server "SID_FMT, CID_ARGS(&raft->cid),
3771 raft_rpc_type_to_string(rpy->common.type),
3772 SID_ARGS(&rpy->common.sid));
3773 return;
3774 }
3775 }
3776
3777 if (rpy->last_index != raft->log_start - 1 ||
3778 rpy->last_term != raft->snap.term) {
3779 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3780 VLOG_INFO_RL(&rl, "cluster "CID_FMT": server %s installed "
3781 "out-of-date snapshot, starting over",
3782 CID_ARGS(&raft->cid), s->nickname);
3783 raft_send_install_snapshot_request(raft, s,
3784 "installed obsolete snapshot");
3785 return;
3786 }
3787
3788 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
3789 VLOG_INFO_RL(&rl, "cluster "CID_FMT": installed snapshot on server %s "
3790 "up to %"PRIu64":%"PRIu64, CID_ARGS(&raft->cid),
3791 s->nickname, rpy->last_term, rpy->last_index);
3792 s->next_index = raft->log_end;
3793 raft_send_append_request(raft, s, 0, "snapshot installed");
3794}
3795
3796/* Returns true if 'raft' has grown enough since the last snapshot that
3797 * reducing the log to a snapshot would be valuable, false otherwise. */
3798bool
3799raft_grew_lots(const struct raft *raft)
3800{
3801 return ovsdb_log_grew_lots(raft->log);
3802}
3803
3804/* Returns the number of log entries that could be trimmed off the on-disk log
3805 * by snapshotting. */
3806uint64_t
3807raft_get_log_length(const struct raft *raft)
3808{
3809 return (raft->last_applied < raft->log_start
3810 ? 0
3811 : raft->last_applied - raft->log_start + 1);
3812}
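/* For example, with invented numbers: if log_start = 5 and last_applied = 12,
 * then entries 5 through 12 could be folded into a snapshot, and this
 * returns 12 - 5 + 1 = 8. */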
3813
3814/* Returns true if taking a snapshot of 'raft', with raft_store_snapshot(), is
3815 * possible. */
3816bool
3817raft_may_snapshot(const struct raft *raft)
3818{
3819 return (!raft->joining
3820 && !raft->leaving
3821 && !raft->left
3822 && !raft->failed
3823 && raft->last_applied >= raft->log_start);
3824}
3825
3826/* Replaces the log for 'raft', up to the last log entry read, by
3827 * 'new_snapshot_data'. Returns NULL if successful, otherwise an error that
3828 * the caller must eventually free.
3829 *
3830 * This function can only succeed if raft_may_snapshot() returns true. It is
3831 * only valuable to call it if raft_get_log_length() is significant and
3832 * especially if raft_grew_lots() returns true. */
3833struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3834raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
3835{
3836 if (raft->joining) {
3837 return ovsdb_error(NULL,
3838 "cannot store a snapshot while joining cluster");
3839 } else if (raft->leaving) {
3840 return ovsdb_error(NULL,
3841 "cannot store a snapshot while leaving cluster");
3842 } else if (raft->left) {
3843 return ovsdb_error(NULL,
3844 "cannot store a snapshot after leaving cluster");
3845 } else if (raft->failed) {
3846 return ovsdb_error(NULL,
3847 "cannot store a snapshot following failure");
3848 }
3849
3850 if (raft->last_applied < raft->log_start) {
3851 return ovsdb_error(NULL, "not storing a duplicate snapshot");
3852 }
3853
3854 uint64_t new_log_start = raft->last_applied + 1;
3855 struct raft_entry new_snapshot = {
3856 .term = raft_get_term(raft, new_log_start - 1),
3857 .data = json_clone(new_snapshot_data),
3858 .eid = *raft_get_eid(raft, new_log_start - 1),
3859 .servers = json_clone(raft_servers_for_index(raft, new_log_start - 1)),
3860 };
3861 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
3862 &new_snapshot);
3863 if (error) {
3864 raft_entry_uninit(&new_snapshot);
3865 return error;
3866 }
3867
3868 raft->log_synced = raft->log_end - 1;
3869 raft_entry_uninit(&raft->snap);
3870 raft->snap = new_snapshot;
3871 for (size_t i = 0; i < new_log_start - raft->log_start; i++) {
3872 raft_entry_uninit(&raft->entries[i]);
3873 }
3874 memmove(&raft->entries[0], &raft->entries[new_log_start - raft->log_start],
3875 (raft->log_end - new_log_start) * sizeof *raft->entries);
3876 raft->log_start = new_log_start;
3877 return NULL;
3878}
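/* A hedged usage sketch; make_snapshot_json() is a hypothetical helper that
 * serializes the applied database state:
 *
 *     if (raft_may_snapshot(raft) && raft_grew_lots(raft)) {
 *         struct json *state = make_snapshot_json(db);
 *         struct ovsdb_error *error = raft_store_snapshot(raft, state);
 *         json_destroy(state);
 *         if (error) {
 *             char *s = ovsdb_error_to_string_free(error);
 *             VLOG_WARN("snapshot failed (%s)", s);
 *             free(s);
 *         }
 *     }
 *
 * raft_store_snapshot() clones 'new_snapshot_data', so the caller keeps
 * ownership of 'state'. */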
3879
3880static void
3881raft_handle_become_leader(struct raft *raft,
3882 const struct raft_become_leader *rq)
3883{
3884 if (raft->role == RAFT_FOLLOWER) {
3885 char buf[SID_LEN + 1];
3886 VLOG_INFO("received leadership transfer from %s in term %"PRIu64,
3887 raft_get_nickname(raft, &rq->common.sid, buf, sizeof buf),
3888 rq->term);
3889 raft_start_election(raft, true);
3890 }
3891}
3892
3893static void
3894raft_send_execute_command_reply(struct raft *raft,
3895 const struct uuid *sid,
3896 const struct uuid *eid,
3897 enum raft_command_status status,
3898 uint64_t commit_index)
3899{
3900 union raft_rpc rpc = {
3901 .execute_command_reply = {
3902 .common = {
3903 .type = RAFT_RPC_EXECUTE_COMMAND_REPLY,
3904 .sid = *sid,
3905 },
3906 .result = *eid,
3907 .status = status,
3908 .commit_index = commit_index,
3909 },
3910 };
3911 raft_send(raft, &rpc);
3912}
3913
3914static enum raft_command_status
3915raft_handle_execute_command_request__(
3916 struct raft *raft, const struct raft_execute_command_request *rq)
3917{
3918 if (raft->role != RAFT_LEADER) {
3919 return RAFT_CMD_NOT_LEADER;
3920 }
3921
3922 const struct uuid *current_eid = raft_current_eid(raft);
3923 if (!uuid_equals(&rq->prereq, current_eid)) {
3924 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3925 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
3926 "prerequisite "UUID_FMT" in execute_command_request",
3927 UUID_ARGS(current_eid), UUID_ARGS(&rq->prereq));
3928 return RAFT_CMD_BAD_PREREQ;
3929 }
3930
3931 struct raft_command *cmd = raft_command_initiate(raft, rq->data,
3932 NULL, &rq->result);
3933 cmd->sid = rq->common.sid;
3934
3935 enum raft_command_status status = cmd->status;
3936 if (status != RAFT_CMD_INCOMPLETE) {
3937 raft_command_unref(cmd);
3938 }
3939 return status;
3940}
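/* An invented example of the prerequisite chain enforced above: if the
 * newest entry's eid is E1, a client command must carry prereq == E1; the
 * entry it appends gets a fresh eid E2, and the next command must then name
 * E2. A command still built against an older E0 fails with
 * RAFT_CMD_BAD_PREREQ, and the client is expected to retry against the
 * current eid. */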
3941
3942static void
3943raft_handle_execute_command_request(
3944 struct raft *raft, const struct raft_execute_command_request *rq)
3945{
3946 enum raft_command_status status
3947 = raft_handle_execute_command_request__(raft, rq);
3948 if (status != RAFT_CMD_INCOMPLETE) {
3949 raft_send_execute_command_reply(raft, &rq->common.sid, &rq->result,
3950 status, 0);
3951 }
3952}
3953
3954static void
3955raft_handle_execute_command_reply(
3956 struct raft *raft, const struct raft_execute_command_reply *rpy)
3957{
3958 struct raft_command *cmd = raft_find_command_by_eid(raft, &rpy->result);
3959 if (!cmd) {
3960 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3961 char buf[SID_LEN + 1];
3962 VLOG_INFO_RL(&rl,
3963 "%s received \"%s\" reply from %s for unknown command",
3964 raft->local_nickname,
3965 raft_command_status_to_string(rpy->status),
3966 raft_get_nickname(raft, &rpy->common.sid,
3967 buf, sizeof buf));
3968 return;
3969 }
3970
3971 if (rpy->status == RAFT_CMD_INCOMPLETE) {
3972 cmd->timestamp = time_msec();
3973 } else {
3974 cmd->index = rpy->commit_index;
3975 raft_command_complete(raft, cmd, rpy->status);
3976 }
3977}
3978
3979static void
3980raft_handle_rpc(struct raft *raft, const union raft_rpc *rpc)
3981{
3982 uint64_t term = raft_rpc_get_term(rpc);
3983 if (term
3984 && !raft_should_suppress_disruptive_server(raft, rpc)
3985 && !raft_receive_term__(raft, &rpc->common, term)) {
3986 if (rpc->type == RAFT_RPC_APPEND_REQUEST) {
3987 /* Section 3.3: "If a server receives a request with a stale term
3988 * number, it rejects the request." */
3989 raft_send_append_reply(raft, raft_append_request_cast(rpc),
3990 RAFT_APPEND_INCONSISTENCY, "stale term");
3991 }
3992 return;
3993 }
3994
3995 switch (rpc->type) {
3996#define RAFT_RPC(ENUM, NAME) \
3997 case ENUM: \
3998 raft_handle_##NAME(raft, &rpc->NAME); \
3999 break;
4000 RAFT_RPC_TYPES
4001#undef RAFT_RPC
4002 default:
4003 OVS_NOT_REACHED();
4004 }
4005}
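/* RAFT_RPC_TYPES is an X-macro list (see raft-rpc.h), so the switch above
 * expands to one case per RPC type, roughly:
 *
 *     case RAFT_RPC_HELLO_REQUEST:
 *         raft_handle_hello_request(raft, &rpc->hello_request);
 *         break;
 *
 * Adding a new type to the list therefore extends this dispatcher
 * automatically. */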
4006\f
4007static bool
4008raft_rpc_is_heartbeat(const union raft_rpc *rpc)
4009{
4010 return ((rpc->type == RAFT_RPC_APPEND_REQUEST
4011 || rpc->type == RAFT_RPC_APPEND_REPLY)
4012 && rpc->common.comment
4013 && !strcmp(rpc->common.comment, "heartbeat"));
4014}
4015
4016\f
4017static bool
4018raft_send_to_conn_at(struct raft *raft, const union raft_rpc *rpc,
4019 struct raft_conn *conn, int line_number)
4020{
4021 log_rpc(rpc, "-->", conn, line_number);
4022 return !jsonrpc_session_send(
4023 conn->js, raft_rpc_to_jsonrpc(&raft->cid, &raft->sid, rpc));
4024}
4025
4026static bool
4027raft_is_rpc_synced(const struct raft *raft, const union raft_rpc *rpc)
4028{
4029 uint64_t term = raft_rpc_get_term(rpc);
4030 uint64_t index = raft_rpc_get_min_sync_index(rpc);
4031 const struct uuid *vote = raft_rpc_get_vote(rpc);
4032
4033 return (term <= raft->synced_term
4034 && index <= raft->log_synced
4035 && (!vote || uuid_equals(vote, &raft->synced_vote)));
4036}
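/* Illustrative consequence, with an invented term number: if this server has
 * voted in term 7 but that vote is not yet on stable storage, an RPC carrying
 * the vote fails this check; raft_send_at() below then queues it as a
 * RAFT_W_RPC waiter instead of sending it, so no peer ever hears state that a
 * crash could roll back. */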
4037
4038static bool
4039raft_send_at(struct raft *raft, const union raft_rpc *rpc, int line_number)
4040{
4041 const struct uuid *dst = &rpc->common.sid;
4042 if (uuid_equals(dst, &raft->sid)) {
4043 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4044 VLOG_WARN_RL(&rl, "attempted to send RPC to self from raft.c:%d",
4045 line_number);
4046 return false;
4047 }
4048
4049 struct raft_conn *conn = raft_find_conn_by_sid(raft, dst);
4050 if (!conn) {
4051 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4052 char buf[SID_LEN + 1];
4053 VLOG_DBG_RL(&rl, "%s: no connection to %s, cannot send RPC "
4054 "from raft.c:%d", raft->local_nickname,
4055 raft_get_nickname(raft, dst, buf, sizeof buf),
4056 line_number);
4057 return false;
4058 }
4059
4060 if (!raft_is_rpc_synced(raft, rpc)) {
4061 raft_waiter_create(raft, RAFT_W_RPC, false)->rpc = raft_rpc_clone(rpc);
4062 return true;
4063 }
4064
4065 return raft_send_to_conn_at(raft, rpc, conn, line_number);
4066}
4067\f
4068static struct raft *
4069raft_lookup_by_name(const char *name)
4070{
4071 struct raft *raft;
4072
4073 HMAP_FOR_EACH_WITH_HASH (raft, hmap_node, hash_string(name, 0),
4074 &all_rafts) {
4075 if (!strcmp(raft->name, name)) {
4076 return raft;
4077 }
4078 }
4079 return NULL;
4080}
4081
4082static void
4083raft_unixctl_cid(struct unixctl_conn *conn,
4084 int argc OVS_UNUSED, const char *argv[],
4085 void *aux OVS_UNUSED)
4086{
4087 struct raft *raft = raft_lookup_by_name(argv[1]);
4088 if (!raft) {
4089 unixctl_command_reply_error(conn, "unknown cluster");
4090 } else if (uuid_is_zero(&raft->cid)) {
4091 unixctl_command_reply_error(conn, "cluster id not yet known");
4092 } else {
4093 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->cid));
4094 unixctl_command_reply(conn, uuid);
4095 free(uuid);
4096 }
4097}
4098
4099static void
4100raft_unixctl_sid(struct unixctl_conn *conn,
4101 int argc OVS_UNUSED, const char *argv[],
4102 void *aux OVS_UNUSED)
4103{
4104 struct raft *raft = raft_lookup_by_name(argv[1]);
4105 if (!raft) {
4106 unixctl_command_reply_error(conn, "unknown cluster");
4107 } else {
4108 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->sid));
4109 unixctl_command_reply(conn, uuid);
4110 free(uuid);
4111 }
4112}
4113
4114static void
4115raft_put_sid(const char *title, const struct uuid *sid,
4116 const struct raft *raft, struct ds *s)
4117{
4118 ds_put_format(s, "%s: ", title);
4119 if (uuid_equals(sid, &raft->sid)) {
4120 ds_put_cstr(s, "self");
4121 } else if (uuid_is_zero(sid)) {
4122 ds_put_cstr(s, "unknown");
4123 } else {
4124 char buf[SID_LEN + 1];
4125 ds_put_cstr(s, raft_get_nickname(raft, sid, buf, sizeof buf));
4126 }
4127 ds_put_char(s, '\n');
4128}
4129
4130static void
4131raft_unixctl_status(struct unixctl_conn *conn,
4132 int argc OVS_UNUSED, const char *argv[],
4133 void *aux OVS_UNUSED)
4134{
4135 struct raft *raft = raft_lookup_by_name(argv[1]);
4136 if (!raft) {
4137 unixctl_command_reply_error(conn, "unknown cluster");
4138 return;
4139 }
4140
4141 struct ds s = DS_EMPTY_INITIALIZER;
4142 ds_put_format(&s, "%s\n", raft->local_nickname);
4143 ds_put_format(&s, "Name: %s\n", raft->name);
4144 ds_put_format(&s, "Cluster ID: ");
4145 if (!uuid_is_zero(&raft->cid)) {
4146 ds_put_format(&s, CID_FMT" ("UUID_FMT")\n",
4147 CID_ARGS(&raft->cid), UUID_ARGS(&raft->cid));
4148 } else {
4149 ds_put_format(&s, "not yet known\n");
4150 }
4151 ds_put_format(&s, "Server ID: "SID_FMT" ("UUID_FMT")\n",
4152 SID_ARGS(&raft->sid), UUID_ARGS(&raft->sid));
4153 ds_put_format(&s, "Address: %s\n", raft->local_address);
4154 ds_put_format(&s, "Status: %s\n",
4155 raft->joining ? "joining cluster"
4156 : raft->leaving ? "leaving cluster"
4157 : raft->left ? "left cluster"
4158 : raft->failed ? "failed"
4159 : "cluster member");
4160 if (raft->joining) {
4161 ds_put_format(&s, "Remotes for joining:");
4162 const char *address;
4163 SSET_FOR_EACH (address, &raft->remote_addresses) {
4164 ds_put_format(&s, " %s", address);
4165 }
4166 ds_put_char(&s, '\n');
4167 }
4168 if (raft->role == RAFT_LEADER) {
4169 struct raft_server *as;
4170 HMAP_FOR_EACH (as, hmap_node, &raft->add_servers) {
4171 ds_put_format(&s, "Adding server %s ("SID_FMT" at %s) (%s)\n",
4172 as->nickname, SID_ARGS(&as->sid), as->address,
4173 raft_server_phase_to_string(as->phase));
4174 }
4175
4176 struct raft_server *rs = raft->remove_server;
4177 if (rs) {
4178 ds_put_format(&s, "Removing server %s ("SID_FMT" at %s) (%s)\n",
4179 rs->nickname, SID_ARGS(&rs->sid), rs->address,
4180 raft_server_phase_to_string(rs->phase));
4181 }
4182 }
4183
4184 ds_put_format(&s, "Role: %s\n",
4185 raft->role == RAFT_LEADER ? "leader"
4186 : raft->role == RAFT_CANDIDATE ? "candidate"
4187 : raft->role == RAFT_FOLLOWER ? "follower"
4188 : "<error>");
4189 ds_put_format(&s, "Term: %"PRIu64"\n", raft->term);
4190 raft_put_sid("Leader", &raft->leader_sid, raft, &s);
4191 raft_put_sid("Vote", &raft->vote, raft, &s);
4192 ds_put_char(&s, '\n');
4193
4194 ds_put_format(&s, "Log: [%"PRIu64", %"PRIu64"]\n",
4195 raft->log_start, raft->log_end);
4196
4197 uint64_t n_uncommitted = raft->log_end - raft->commit_index - 1;
4198 ds_put_format(&s, "Entries not yet committed: %"PRIu64"\n", n_uncommitted);
4199
4200 uint64_t n_unapplied = raft->log_end - raft->last_applied - 1;
4201 ds_put_format(&s, "Entries not yet applied: %"PRIu64"\n", n_unapplied);
4202
4203 const struct raft_conn *c;
4204 ds_put_cstr(&s, "Connections:");
4205 LIST_FOR_EACH (c, list_node, &raft->conns) {
4206 bool connected = jsonrpc_session_is_connected(c->js);
4207 ds_put_format(&s, " %s%s%s%s",
4208 connected ? "" : "(",
4209 c->incoming ? "<-" : "->", c->nickname,
4210 connected ? "" : ")");
4211 }
4212 ds_put_char(&s, '\n');
4213
4214 ds_put_cstr(&s, "Servers:\n");
4215 struct raft_server *server;
4216 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
4217 ds_put_format(&s, " %s ("SID_FMT" at %s)",
4218 server->nickname,
4219 SID_ARGS(&server->sid), server->address);
4220 if (uuid_equals(&server->sid, &raft->sid)) {
4221 ds_put_cstr(&s, " (self)");
4222 }
4223 if (server->phase != RAFT_PHASE_STABLE) {
4224 ds_put_format(&s, " (%s)",
4225 raft_server_phase_to_string(server->phase));
4226 }
4227 if (raft->role == RAFT_CANDIDATE) {
4228 if (!uuid_is_zero(&server->vote)) {
4229 char buf[SID_LEN + 1];
4230 ds_put_format(&s, " (voted for %s)",
4231 raft_get_nickname(raft, &server->vote,
4232 buf, sizeof buf));
4233 }
4234 } else if (raft->role == RAFT_LEADER) {
4235 ds_put_format(&s, " next_index=%"PRIu64" match_index=%"PRIu64,
4236 server->next_index, server->match_index);
4237 }
4238 ds_put_char(&s, '\n');
4239 }
4240
4241 unixctl_command_reply(conn, ds_cstr(&s));
4242 ds_destroy(&s);
4243}
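/* An abbreviated example of the reply this command builds, with invented
 * values and full UUIDs elided:
 *
 *     1b2c
 *     Name: OVN_Southbound
 *     Cluster ID: f0ad (f0ad...)
 *     Server ID: 1b2c (1b2c...)
 *     Address: tcp:10.0.0.1:6644
 *     Status: cluster member
 *     Role: leader
 *     Term: 3
 *     Leader: self
 *     Vote: self
 *
 *     Log: [2, 10]
 *     Entries not yet committed: 0
 *     Entries not yet applied: 0
 *     Connections: ->a9c0 <-b4d3
 *     Servers:
 *         1b2c (1b2c at tcp:10.0.0.1:6644) (self) next_index=10 match_index=9
 *         a9c0 (a9c0 at tcp:10.0.0.2:6644) next_index=10 match_index=9
 */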
4244
4245static void
4246raft_unixctl_leave__(struct unixctl_conn *conn, struct raft *raft)
4247{
4248 if (raft_is_leaving(raft)) {
4249 unixctl_command_reply_error(conn,
4250 "already in progress leaving cluster");
4251 } else if (raft_is_joining(raft)) {
4252 unixctl_command_reply_error(conn,
4253 "can't leave while join in progress");
4254 } else if (raft_failed(raft)) {
4255 unixctl_command_reply_error(conn,
4256 "can't leave after failure");
4257 } else {
4258 raft_leave(raft);
4259 unixctl_command_reply(conn, NULL);
4260 }
4261}
4262
4263static void
4264raft_unixctl_leave(struct unixctl_conn *conn, int argc OVS_UNUSED,
4265 const char *argv[], void *aux OVS_UNUSED)
4266{
4267 struct raft *raft = raft_lookup_by_name(argv[1]);
4268 if (!raft) {
4269 unixctl_command_reply_error(conn, "unknown cluster");
4270 return;
4271 }
4272
4273 raft_unixctl_leave__(conn, raft);
4274}
4275
4276static struct raft_server *
4277raft_lookup_server_best_match(struct raft *raft, const char *id)
4278{
4279 struct raft_server *best = NULL;
4280 int best_score = -1;
4281 int n_best = 0;
4282
4283 struct raft_server *s;
4284 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
4285 int score = (!strcmp(id, s->address)
4286 ? INT_MAX
4287 : uuid_is_partial_match(&s->sid, id));
4288 if (score > best_score) {
4289 best = s;
4290 best_score = score;
4291 n_best = 1;
4292 } else if (score == best_score) {
4293 n_best++;
4294 }
4295 }
4296 return n_best == 1 ? best : NULL;
4297}
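/* A worked example with invented SIDs: given servers "4b40..." and "4c1f...",
 * the id "4b" matches only the first (score 2); "4" matches both, so n_best
 * is 2 and NULL is returned because the name is ambiguous; and a literal
 * address such as "tcp:10.0.0.2:6644" matches that server exactly with score
 * INT_MAX. */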
4298
4299static void
4300raft_unixctl_kick(struct unixctl_conn *conn, int argc OVS_UNUSED,
4301 const char *argv[], void *aux OVS_UNUSED)
4302{
4303 const char *cluster_name = argv[1];
4304 const char *server_name = argv[2];
4305
4306 struct raft *raft = raft_lookup_by_name(cluster_name);
4307 if (!raft) {
4308 unixctl_command_reply_error(conn, "unknown cluster");
4309 return;
4310 }
4311
4312 struct raft_server *server = raft_lookup_server_best_match(raft,
4313 server_name);
4314 if (!server) {
4315 unixctl_command_reply_error(conn, "unknown server");
4316 return;
4317 }
4318
4319 if (uuid_equals(&server->sid, &raft->sid)) {
4320 raft_unixctl_leave__(conn, raft);
4321 } else if (raft->role == RAFT_LEADER) {
4322 const struct raft_remove_server_request rq = {
4323 .sid = server->sid,
4324 .requester_conn = conn,
4325 };
4326 raft_handle_remove_server_request(raft, &rq);
4327 } else {
4328 const union raft_rpc rpc = {
4329 .remove_server_request = {
4330 .common = {
4331 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
4332 .sid = raft->leader_sid,
4333 .comment = "via unixctl"
4334 },
4335 .sid = server->sid,
4336 }
4337 };
4338 if (raft_send(raft, &rpc)) {
4339 unixctl_command_reply(conn, "sent removal request to leader");
4340 } else {
4341 unixctl_command_reply_error(conn,
4342 "failed to send removal request");
4343 }
4344 }
4345}
4346
4347static void
4348raft_init(void)
4349{
4350 static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
4351 if (!ovsthread_once_start(&once)) {
4352 return;
4353 }
4354 unixctl_command_register("cluster/cid", "DB", 1, 1,
4355 raft_unixctl_cid, NULL);
4356 unixctl_command_register("cluster/sid", "DB", 1, 1,
4357 raft_unixctl_sid, NULL);
4358 unixctl_command_register("cluster/status", "DB", 1, 1,
4359 raft_unixctl_status, NULL);
4360 unixctl_command_register("cluster/leave", "DB", 1, 1,
4361 raft_unixctl_leave, NULL);
4362 unixctl_command_register("cluster/kick", "DB SERVER", 2, 2,
4363 raft_unixctl_kick, NULL);
4364 ovsthread_once_done(&once);
4365}
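/* The commands registered above are driven through ovs-appctl; the control
 * socket path and database name in this sketch are placeholders:
 *
 *     ovs-appctl -t /var/run/openvswitch/ovsdb-server.ctl \
 *         cluster/status OVN_Southbound
 *
 * cluster/cid and cluster/sid print the cluster and server UUIDs,
 * cluster/leave starts this server's graceful departure, and cluster/kick
 * asks the cluster to remove the named server. */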