]> git.proxmox.com Git - mirror_ovs.git/blob - ovsdb/raft.c
ovsdb: Introduce experimental support for clustered databases.
[mirror_ovs.git] / ovsdb / raft.c
1 /*
2 * Copyright (c) 2017, 2018 Nicira, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <config.h>
18
19 #include "raft.h"
20 #include "raft-private.h"
21
22 #include <errno.h>
23 #include <unistd.h>
24
25 #include "hash.h"
26 #include "jsonrpc.h"
27 #include "lockfile.h"
28 #include "openvswitch/dynamic-string.h"
29 #include "openvswitch/hmap.h"
30 #include "openvswitch/json.h"
31 #include "openvswitch/list.h"
32 #include "openvswitch/poll-loop.h"
33 #include "openvswitch/vlog.h"
34 #include "ovsdb-error.h"
35 #include "ovsdb-parser.h"
36 #include "ovsdb/log.h"
37 #include "raft-rpc.h"
38 #include "random.h"
39 #include "socket-util.h"
40 #include "stream.h"
41 #include "timeval.h"
42 #include "unicode.h"
43 #include "unixctl.h"
44 #include "util.h"
45 #include "uuid.h"
46
47 VLOG_DEFINE_THIS_MODULE(raft);
48
49 /* Roles for a Raft server:
50 *
51 * - Followers: Servers in touch with the current leader.
52 *
53 * - Candidate: Servers unaware of a current leader and seeking election to
54 * leader.
55 *
56 * - Leader: Handles all client requests. At most one at a time.
57 *
58 * In normal operation there is exactly one leader and all of the other servers
59 * are followers. */
60 enum raft_role {
61 RAFT_FOLLOWER,
62 RAFT_CANDIDATE,
63 RAFT_LEADER
64 };
65
66 /* A connection between this Raft server and another one. */
67 struct raft_conn {
68 struct ovs_list list_node; /* In struct raft's 'conns' list. */
69 struct jsonrpc_session *js; /* JSON-RPC connection. */
70 struct uuid sid; /* This server's unique ID. */
71 char *nickname; /* Short name for use in log messages. */
72 bool incoming; /* True if incoming, false if outgoing. */
73 unsigned int js_seqno; /* Seqno for noticing (re)connections. */
74 };
75
76 static void raft_conn_close(struct raft_conn *);
77
78 /* A "command", that is, a request to append an entry to the log.
79 *
80 * The Raft specification only allows clients to issue commands to the leader.
81 * With this implementation, clients may issue a command on any server, which
82 * then relays the command to the leader if necessary.
83 *
84 * This structure is thus used in three cases:
85 *
86 * 1. We are the leader and the command was issued to us directly.
87 *
88 * 2. We are a follower and relayed the command to the leader.
89 *
90 * 3. We are the leader and a follower relayed the command to us.
91 */
92 struct raft_command {
93 /* All cases. */
94 struct hmap_node hmap_node; /* In struct raft's 'commands' hmap. */
95 unsigned int n_refs; /* Reference count. */
96 enum raft_command_status status; /* Execution status. */
97
98 /* Case 1 only. */
99 uint64_t index; /* Index in log (0 if being relayed). */
100
101 /* Cases 2 and 3. */
102 struct uuid eid; /* Entry ID of result. */
103
104 /* Case 2 only. */
105 long long int timestamp; /* Issue or last ping time, for expiration. */
106
107 /* Case 3 only. */
108 struct uuid sid; /* The follower (otherwise UUID_ZERO). */
109 };
110
111 static void raft_command_complete(struct raft *, struct raft_command *,
112 enum raft_command_status);
113
114 static void raft_complete_all_commands(struct raft *,
115 enum raft_command_status);
116
117 /* Type of deferred action, see struct raft_waiter. */
118 enum raft_waiter_type {
119 RAFT_W_ENTRY,
120 RAFT_W_TERM,
121 RAFT_W_RPC,
122 };
123
124 /* An action deferred until a log write commits to disk. */
125 struct raft_waiter {
126 struct ovs_list list_node;
127 uint64_t commit_ticket;
128
129 enum raft_waiter_type type;
130 union {
131 /* RAFT_W_ENTRY.
132 *
133 * Waits for a RAFT_REC_ENTRY write to our local log to commit. Upon
134 * completion, updates 'log_synced' to indicate that the new log entry
135 * or entries are committed and, if we are leader, also updates our
136 * local 'match_index'. */
137 struct {
138 uint64_t index;
139 } entry;
140
141 /* RAFT_W_TERM.
142 *
143 * Waits for a RAFT_REC_TERM or RAFT_REC_VOTE record write to commit.
144 * Upon completion, updates 'synced_term' and 'synced_vote', which
145 * triggers sending RPCs deferred by the uncommitted 'term' and
146 * 'vote'. */
147 struct {
148 uint64_t term;
149 struct uuid vote;
150 } term;
151
152 /* RAFT_W_RPC.
153 *
154 * Sometimes, sending an RPC to a peer must be delayed until an entry,
155 * a term, or a vote mentioned in the RPC is synced to disk. This
156 * waiter keeps a copy of such an RPC until the previous waiters have
157 * committed. */
158 union raft_rpc *rpc;
159 };
160 };
161
162 static struct raft_waiter *raft_waiter_create(struct raft *,
163 enum raft_waiter_type,
164 bool start_commit);
165 static void raft_waiters_destroy(struct raft *);
166
167 /* The Raft state machine. */
168 struct raft {
169 struct hmap_node hmap_node; /* In 'all_rafts'. */
170 struct ovsdb_log *log;
171
172 /* Persistent derived state.
173 *
174 * This must be updated on stable storage before responding to RPCs. It can be
175 * derived from the header, snapshot, and log in 'log'. */
176
177 struct uuid cid; /* Cluster ID (immutable for the cluster). */
178 struct uuid sid; /* Server ID (immutable for the server). */
179 char *local_address; /* Local address (immutable for the server). */
180 char *local_nickname; /* Used for local server in log messages. */
181 char *name; /* Schema name (immutable for the cluster). */
182
183 /* Contains "struct raft_server"s and represents the server configuration
184 * most recently added to 'log'. */
185 struct hmap servers;
186
187 /* Persistent state on all servers.
188 *
189 * Must be updated on stable storage before responding to RPCs. */
190
191 /* Current term and the vote for that term. These might be on the way to
192 * disk now. */
193 uint64_t term; /* Initialized to 0 and only increases. */
194 struct uuid vote; /* All-zeros if no vote yet in 'term'. */
195
196 /* The term and vote that have been synced to disk. */
197 uint64_t synced_term;
198 struct uuid synced_vote;
199
200 /* The log.
201 *
202 * A log entry with index 1 never really exists; the initial snapshot for a
203 * Raft is considered to include this index. The first real log entry has
204 * index 2.
205 *
206 * A new Raft instance contains an empty log: log_start=2, log_end=2.
207 * Over time, the log grows: log_start=2, log_end=N.
208 * At some point, the server takes a snapshot: log_start=N, log_end=N.
209 * The log continues to grow: log_start=N, log_end=N+1...
210 *
211 * Must be updated on stable storage before responding to RPCs. */
212 struct raft_entry *entries; /* Log entry i is in log[i - log_start]. */
213 uint64_t log_start; /* Index of first entry in log. */
214 uint64_t log_end; /* Index of last entry in log, plus 1. */
215 uint64_t log_synced; /* Index of last synced entry. */
216 size_t allocated_log; /* Allocated entries in 'log'. */
217
218 /* Snapshot state (see Figure 5.1)
219 *
220 * This is the state of the cluster as of the last discarded log entry,
221 * that is, at log index 'log_start - 1' (called prevIndex in Figure 5.1).
222 * Only committed log entries can be included in a snapshot. */
223 struct raft_entry snap;
224
225 /* Volatile state.
226 *
227 * The snapshot is always committed, but the rest of the log might not be yet.
228 * 'last_applied' tracks what entries have been passed to the client. If the
229 * client hasn't yet read the latest snapshot, then even the snapshot isn't
230 * applied yet. Thus, the invariants are different for these members:
231 *
232 * log_start - 2 <= last_applied <= commit_index < log_end.
233 * log_start - 1 <= commit_index < log_end.
234 */
235
236 enum raft_role role; /* Current role. */
237 uint64_t commit_index; /* Max log index known to be committed. */
238 uint64_t last_applied; /* Max log index applied to state machine. */
239 struct uuid leader_sid; /* Server ID of leader (zero, if unknown). */
240
241 /* Followers and candidates only. */
242 #define ELECTION_BASE_MSEC 1024
243 #define ELECTION_RANGE_MSEC 1024
244 long long int election_base; /* Time of last heartbeat from leader. */
245 long long int election_timeout; /* Time at which we start an election. */
246
247 /* Used for joining a cluster. */
248 bool joining; /* Attempting to join the cluster? */
249 struct sset remote_addresses; /* Addresses to try to find other servers. */
250 long long int join_timeout; /* Time to re-send add server request. */
251
252 /* Used for leaving a cluster. */
253 bool leaving; /* True if we are leaving the cluster. */
254 bool left; /* True if we have finished leaving. */
255 long long int leave_timeout; /* Time to re-send remove server request. */
256
257 /* Failure. */
258 bool failed; /* True if unrecoverable error has occurred. */
259
260 /* File synchronization. */
261 struct ovs_list waiters; /* Contains "struct raft_waiter"s. */
262
263 /* Network connections. */
264 struct pstream *listener; /* For connections from other Raft servers. */
265 long long int listen_backoff; /* For retrying creating 'listener'. */
266 struct ovs_list conns; /* Contains struct raft_conns. */
267
268 /* Leaders only. Reinitialized after becoming leader. */
269 struct hmap add_servers; /* Contains "struct raft_server"s to add. */
270 struct raft_server *remove_server; /* Server being removed. */
271 struct hmap commands; /* Contains "struct raft_command"s. */
272 #define PING_TIME_MSEC (ELECTION_BASE_MSEC / 3)
273 long long int ping_timeout; /* Time at which to send a heartbeat */
274
275 /* Candidates only. Reinitialized at start of election. */
276 int n_votes; /* Number of votes for me. */
277 };
278
279 /* All Raft structures. */
280 static struct hmap all_rafts = HMAP_INITIALIZER(&all_rafts);
281
282 static void raft_init(void);
283
284 static struct ovsdb_error *raft_read_header(struct raft *)
285 OVS_WARN_UNUSED_RESULT;
286
287 static void raft_send_execute_command_reply(struct raft *,
288 const struct uuid *sid,
289 const struct uuid *eid,
290 enum raft_command_status,
291 uint64_t commit_index);
292
293 static void raft_update_our_match_index(struct raft *, uint64_t min_index);
294
295 static void raft_send_remove_server_reply__(
296 struct raft *, const struct uuid *target_sid,
297 const struct uuid *requester_sid, struct unixctl_conn *requester_conn,
298 bool success, const char *comment);
299
300 static void raft_server_init_leader(struct raft *, struct raft_server *);
301
302 static bool raft_rpc_is_heartbeat(const union raft_rpc *);
303 static bool raft_is_rpc_synced(const struct raft *, const union raft_rpc *);
304
305 static void raft_handle_rpc(struct raft *, const union raft_rpc *);
306 static bool raft_send(struct raft *, const union raft_rpc *);
307 static bool raft_send__(struct raft *, const union raft_rpc *,
308 struct raft_conn *);
309 static void raft_send_append_request(struct raft *,
310 struct raft_server *, unsigned int n,
311 const char *comment);
312
313 static void raft_become_leader(struct raft *);
314 static void raft_become_follower(struct raft *);
315 static void raft_reset_timer(struct raft *);
316 static void raft_send_heartbeats(struct raft *);
317 static void raft_start_election(struct raft *, bool leadership_transfer);
318 static bool raft_truncate(struct raft *, uint64_t new_end);
319 static void raft_get_servers_from_log(struct raft *, enum vlog_level);
320
321 static bool raft_handle_write_error(struct raft *, struct ovsdb_error *);
322
323 static void raft_run_reconfigure(struct raft *);
324
325 static struct raft_server *
326 raft_find_server(const struct raft *raft, const struct uuid *sid)
327 {
328 return raft_server_find(&raft->servers, sid);
329 }
330
331 static char *
332 raft_make_address_passive(const char *address_)
333 {
334 if (!strncmp(address_, "unix:", 5)) {
335 return xasprintf("p%s", address_);
336 } else {
337 char *address = xstrdup(address_);
338 char *p = strchr(address, ':') + 1;
339 char *host = inet_parse_token(&p);
340 char *port = inet_parse_token(&p);
341
342 struct ds paddr = DS_EMPTY_INITIALIZER;
343 ds_put_format(&paddr, "p%.3s:%s:", address, port);
344 if (strchr(host, ':')) {
345 ds_put_format(&paddr, "[%s]", host);
346 } else {
347 ds_put_cstr(&paddr, host);
348 }
349 free(address);
350 return ds_steal_cstr(&paddr);
351 }
352 }
353
354 static struct raft *
355 raft_alloc(void)
356 {
357 raft_init();
358
359 struct raft *raft = xzalloc(sizeof *raft);
360 hmap_node_nullify(&raft->hmap_node);
361 hmap_init(&raft->servers);
362 raft->log_start = raft->log_end = 1;
363 raft->role = RAFT_FOLLOWER;
364 sset_init(&raft->remote_addresses);
365 raft->join_timeout = LLONG_MAX;
366 ovs_list_init(&raft->waiters);
367 raft->listen_backoff = LLONG_MIN;
368 ovs_list_init(&raft->conns);
369 hmap_init(&raft->add_servers);
370 hmap_init(&raft->commands);
371
372 raft->ping_timeout = time_msec() + PING_TIME_MSEC;
373 raft_reset_timer(raft);
374
375 return raft;
376 }
377
378 /* Creates an on-disk file that represents a new Raft cluster and initializes
379 * it to consist of a single server, the one on which this function is called.
380 *
381 * Creates the local copy of the cluster's log in 'file_name', which must not
382 * already exist. Gives it the name 'name', which should be the database
383 * schema name and which is used only to match up this database with the server
384 * added to the cluster later if the cluster ID is unavailable.
385 *
386 * The new server is located at 'local_address', which must take one of the
387 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
388 * square bracket enclosed IPv6 address and PORT is a TCP port number.
389 *
390 * This only creates the on-disk file. Use raft_open() to start operating the
391 * new server.
392 *
393 * Returns null if successful, otherwise an ovsdb_error describing the
394 * problem. */
395 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
396 raft_create_cluster(const char *file_name, const char *name,
397 const char *local_address, const struct json *data)
398 {
399 /* Parse and verify validity of the local address. */
400 struct ovsdb_error *error = raft_address_validate(local_address);
401 if (error) {
402 return error;
403 }
404
405 /* Create log file. */
406 struct ovsdb_log *log;
407 error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
408 -1, &log);
409 if (error) {
410 return error;
411 }
412
413 /* Write log file. */
414 struct raft_header h = {
415 .sid = uuid_random(),
416 .cid = uuid_random(),
417 .name = xstrdup(name),
418 .local_address = xstrdup(local_address),
419 .joining = false,
420 .remote_addresses = SSET_INITIALIZER(&h.remote_addresses),
421 .snap_index = 1,
422 .snap = {
423 .term = 1,
424 .data = json_nullable_clone(data),
425 .eid = uuid_random(),
426 .servers = json_object_create(),
427 },
428 };
429 shash_add_nocopy(json_object(h.snap.servers),
430 xasprintf(UUID_FMT, UUID_ARGS(&h.sid)),
431 json_string_create(local_address));
432 error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
433 raft_header_uninit(&h);
434 if (!error) {
435 error = ovsdb_log_commit_block(log);
436 }
437 ovsdb_log_close(log);
438
439 return error;
440 }
441
442 /* Creates a database file that represents a new server in an existing Raft
443 * cluster.
444 *
445 * Creates the local copy of the cluster's log in 'file_name', which must not
446 * already exist. Gives it the name 'name', which must be the same name
447 * passed in to raft_create_cluster() earlier.
448 *
449 * 'cid' is optional. If specified, the new server will join only the cluster
450 * with the given cluster ID.
451 *
452 * The new server is located at 'local_address', which must take one of the
453 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
454 * square bracket enclosed IPv6 address and PORT is a TCP port number.
455 *
456 * Joining the cluster requires contacting it. Thus, 'remote_addresses'
457 * specifies the addresses of existing servers in the cluster. One server out
458 * of the existing cluster is sufficient, as long as that server is reachable
459 * and not partitioned from the current cluster leader. If multiple servers
460 * from the cluster are specified, then it is sufficient for any of them to
461 * meet this criterion.
462 *
463 * This only creates the on-disk file and does no network access. Use
464 * raft_open() to start operating the new server. (Until this happens, the
465 * new server has not joined the cluster.)
466 *
467 * Returns null if successful, otherwise an ovsdb_error describing the
468 * problem. */
469 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
470 raft_join_cluster(const char *file_name,
471 const char *name, const char *local_address,
472 const struct sset *remote_addresses,
473 const struct uuid *cid)
474 {
475 ovs_assert(!sset_is_empty(remote_addresses));
476
477 /* Parse and verify validity of the addresses. */
478 struct ovsdb_error *error = raft_address_validate(local_address);
479 if (error) {
480 return error;
481 }
482 const char *addr;
483 SSET_FOR_EACH (addr, remote_addresses) {
484 error = raft_address_validate(addr);
485 if (error) {
486 return error;
487 }
488 if (!strcmp(addr, local_address)) {
489 return ovsdb_error(NULL, "remote addresses cannot be the same "
490 "as the local address");
491 }
492 }
493
494 /* Verify validity of the cluster ID (if provided). */
495 if (cid && uuid_is_zero(cid)) {
496 return ovsdb_error(NULL, "all-zero UUID is not valid cluster ID");
497 }
498
499 /* Create log file. */
500 struct ovsdb_log *log;
501 error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
502 -1, &log);
503 if (error) {
504 return error;
505 }
506
507 /* Write log file. */
508 struct raft_header h = {
509 .sid = uuid_random(),
510 .cid = cid ? *cid : UUID_ZERO,
511 .name = xstrdup(name),
512 .local_address = xstrdup(local_address),
513 .joining = true,
514 /* No snapshot yet. */
515 };
516 sset_clone(&h.remote_addresses, remote_addresses);
517 error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
518 raft_header_uninit(&h);
519 if (!error) {
520 error = ovsdb_log_commit_block(log);
521 }
522 ovsdb_log_close(log);
523
524 return error;
525 }
526
527 /* Reads the initial header record from 'log', which must be a Raft clustered
528 * database log, and populates '*md' with the information read from it. The
529 * caller must eventually destroy 'md' with raft_metadata_destroy().
530 *
531 * On success, returns NULL. On failure, returns an error that the caller must
532 * eventually destroy and zeros '*md'. */
533 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
534 raft_read_metadata(struct ovsdb_log *log, struct raft_metadata *md)
535 {
536 struct raft *raft = raft_alloc();
537 raft->log = log;
538
539 struct ovsdb_error *error = raft_read_header(raft);
540 if (!error) {
541 md->sid = raft->sid;
542 md->name = xstrdup(raft->name);
543 md->local = xstrdup(raft->local_address);
544 md->cid = raft->cid;
545 } else {
546 memset(md, 0, sizeof *md);
547 }
548
549 raft->log = NULL;
550 raft_close(raft);
551 return error;
552 }
553
554 /* Frees the metadata in 'md'. */
555 void
556 raft_metadata_destroy(struct raft_metadata *md)
557 {
558 if (md) {
559 free(md->name);
560 free(md->local);
561 }
562 }
563
564 static const struct raft_entry *
565 raft_get_entry(const struct raft *raft, uint64_t index)
566 {
567 ovs_assert(index >= raft->log_start);
568 ovs_assert(index < raft->log_end);
569 return &raft->entries[index - raft->log_start];
570 }
571
572 static uint64_t
573 raft_get_term(const struct raft *raft, uint64_t index)
574 {
575 return (index == raft->log_start - 1
576 ? raft->snap.term
577 : raft_get_entry(raft, index)->term);
578 }
579
580 static const struct json *
581 raft_servers_for_index(const struct raft *raft, uint64_t index)
582 {
583 ovs_assert(index >= raft->log_start - 1);
584 ovs_assert(index < raft->log_end);
585
586 const struct json *servers = raft->snap.servers;
587 for (uint64_t i = raft->log_start; i <= index; i++) {
588 const struct raft_entry *e = raft_get_entry(raft, i);
589 if (e->servers) {
590 servers = e->servers;
591 }
592 }
593 return servers;
594 }
595
596 static void
597 raft_set_servers(struct raft *raft, const struct hmap *new_servers,
598 enum vlog_level level)
599 {
600 struct raft_server *s, *next;
601 HMAP_FOR_EACH_SAFE (s, next, hmap_node, &raft->servers) {
602 if (!raft_server_find(new_servers, &s->sid)) {
603 ovs_assert(s != raft->remove_server);
604
605 hmap_remove(&raft->servers, &s->hmap_node);
606 VLOG(level, "server %s removed from configuration", s->nickname);
607 raft_server_destroy(s);
608 }
609 }
610
611 HMAP_FOR_EACH_SAFE (s, next, hmap_node, new_servers) {
612 if (!raft_find_server(raft, &s->sid)) {
613 VLOG(level, "server %s added to configuration", s->nickname);
614
615 struct raft_server *new
616 = raft_server_add(&raft->servers, &s->sid, s->address);
617 raft_server_init_leader(raft, new);
618 }
619 }
620 }
621
622 static uint64_t
623 raft_add_entry(struct raft *raft,
624 uint64_t term, struct json *data, const struct uuid *eid,
625 struct json *servers)
626 {
627 if (raft->log_end - raft->log_start >= raft->allocated_log) {
628 raft->entries = x2nrealloc(raft->entries, &raft->allocated_log,
629 sizeof *raft->entries);
630 }
631
632 uint64_t index = raft->log_end++;
633 struct raft_entry *entry = &raft->entries[index - raft->log_start];
634 entry->term = term;
635 entry->data = data;
636 entry->eid = eid ? *eid : UUID_ZERO;
637 entry->servers = servers;
638 return index;
639 }
640
641 /* Writes a RAFT_REC_ENTRY record for 'term', 'data', 'eid', 'servers' to
642 * 'raft''s log and returns an error indication. */
643 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
644 raft_write_entry(struct raft *raft, uint64_t term, struct json *data,
645 const struct uuid *eid, struct json *servers)
646 {
647 struct raft_record r = {
648 .type = RAFT_REC_ENTRY,
649 .term = term,
650 .entry = {
651 .index = raft_add_entry(raft, term, data, eid, servers),
652 .data = data,
653 .servers = servers,
654 .eid = eid ? *eid : UUID_ZERO,
655 },
656 };
657 return ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r));
658 }
659
660 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
661 raft_write_state(struct ovsdb_log *log,
662 uint64_t term, const struct uuid *vote)
663 {
664 struct raft_record r = { .term = term };
665 if (vote && !uuid_is_zero(vote)) {
666 r.type = RAFT_REC_VOTE;
667 r.sid = *vote;
668 } else {
669 r.type = RAFT_REC_TERM;
670 }
671 return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
672 }
673
674 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
675 raft_apply_record(struct raft *raft, unsigned long long int rec_idx,
676 const struct raft_record *r)
677 {
678 /* Apply "term", which is present in most kinds of records (and otherwise
679 * 0).
680 *
681 * A Raft leader can replicate entries from previous terms to the other
682 * servers in the cluster, retaining the original terms on those entries
683 * (see section 3.6.2 "Committing entries from previous terms" for more
684 * information), so it's OK for the term in a log record to precede the
685 * current term. */
686 if (r->term > raft->term) {
687 raft->term = raft->synced_term = r->term;
688 raft->vote = raft->synced_vote = UUID_ZERO;
689 }
690
691 switch (r->type) {
692 case RAFT_REC_ENTRY:
693 if (r->entry.index < raft->commit_index) {
694 return ovsdb_error(NULL, "record %llu attempts to truncate log "
695 "from %"PRIu64" to %"PRIu64" entries, but "
696 "commit index is already %"PRIu64,
697 rec_idx, raft->log_end, r->entry.index,
698 raft->commit_index);
699 } else if (r->entry.index > raft->log_end) {
700 return ovsdb_error(NULL, "record %llu with index %"PRIu64" skips "
701 "past expected index %"PRIu64,
702 rec_idx, r->entry.index, raft->log_end);
703 }
704
705 if (r->entry.index < raft->log_end) {
706 /* This can happen, but it is notable. */
707 VLOG_DBG("record %llu truncates log from %"PRIu64" to %"PRIu64
708 " entries", rec_idx, raft->log_end, r->entry.index);
709 raft_truncate(raft, r->entry.index);
710 }
711
712 uint64_t prev_term = (raft->log_end > raft->log_start
713 ? raft->entries[raft->log_end
714 - raft->log_start - 1].term
715 : raft->snap.term);
716 if (r->term < prev_term) {
717 return ovsdb_error(NULL, "record %llu with index %"PRIu64" term "
718 "%"PRIu64" precedes previous entry's term "
719 "%"PRIu64,
720 rec_idx, r->entry.index, r->term, prev_term);
721 }
722
723 raft->log_synced = raft_add_entry(
724 raft, r->term,
725 json_nullable_clone(r->entry.data), &r->entry.eid,
726 json_nullable_clone(r->entry.servers));
727 return NULL;
728
729 case RAFT_REC_TERM:
730 return NULL;
731
732 case RAFT_REC_VOTE:
733 if (r->term < raft->term) {
734 return ovsdb_error(NULL, "record %llu votes for term %"PRIu64" "
735 "but current term is %"PRIu64,
736 rec_idx, r->term, raft->term);
737 } else if (!uuid_is_zero(&raft->vote)
738 && !uuid_equals(&raft->vote, &r->sid)) {
739 return ovsdb_error(NULL, "record %llu votes for "SID_FMT" in term "
740 "%"PRIu64" but a previous record for the "
741 "same term voted for "SID_FMT, rec_idx,
742 SID_ARGS(&raft->vote), r->term,
743 SID_ARGS(&r->sid));
744 } else {
745 raft->vote = raft->synced_vote = r->sid;
746 return NULL;
747 }
748 break;
749
750 case RAFT_REC_NOTE:
751 if (!strcmp(r->note, "left")) {
752 return ovsdb_error(NULL, "record %llu indicates server has left "
753 "the cluster; it cannot be added back (use "
754 "\"ovsdb-tool join-cluster\" to add a new "
755 "server)", rec_idx);
756 }
757 return NULL;
758
759 case RAFT_REC_COMMIT_INDEX:
760 if (r->commit_index < raft->commit_index) {
761 return ovsdb_error(NULL, "record %llu regresses commit index "
762 "from %"PRIu64 " to %"PRIu64,
763 rec_idx, raft->commit_index, r->commit_index);
764 } else if (r->commit_index >= raft->log_end) {
765 return ovsdb_error(NULL, "record %llu advances commit index to "
766 "%"PRIu64 " but last log index is %"PRIu64,
767 rec_idx, r->commit_index, raft->log_end - 1);
768 } else {
769 raft->commit_index = r->commit_index;
770 return NULL;
771 }
772 break;
773
774 case RAFT_REC_LEADER:
775 /* XXX we could use this to take back leadership for quick restart */
776 return NULL;
777
778 default:
779 OVS_NOT_REACHED();
780 }
781 }
782
783 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
784 raft_read_header(struct raft *raft)
785 {
786 /* Read header record. */
787 struct json *json;
788 struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
789 if (error || !json) {
790 /* Report error or end-of-file. */
791 return error;
792 }
793 ovsdb_log_mark_base(raft->log);
794
795 struct raft_header h;
796 error = raft_header_from_json(&h, json);
797 json_destroy(json);
798 if (error) {
799 return error;
800 }
801
802 raft->sid = h.sid;
803 raft->cid = h.cid;
804 raft->name = xstrdup(h.name);
805 raft->local_address = xstrdup(h.local_address);
806 raft->local_nickname = raft_address_to_nickname(h.local_address, &h.sid);
807 raft->joining = h.joining;
808
809 if (h.joining) {
810 sset_clone(&raft->remote_addresses, &h.remote_addresses);
811 } else {
812 raft_entry_clone(&raft->snap, &h.snap);
813 raft->log_start = raft->log_end = h.snap_index + 1;
814 raft->commit_index = h.snap_index;
815 raft->last_applied = h.snap_index - 1;
816 }
817
818 raft_header_uninit(&h);
819
820 return NULL;
821 }
822
823 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
824 raft_read_log(struct raft *raft)
825 {
826 for (unsigned long long int i = 1; ; i++) {
827 struct json *json;
828 struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
829 if (!json) {
830 if (error) {
831 /* We assume that the error is due to a partial write while
832 * appending to the file before a crash, so log it and
833 * continue. */
834 char *error_string = ovsdb_error_to_string_free(error);
835 VLOG_WARN("%s", error_string);
836 free(error_string);
837 error = NULL;
838 }
839 break;
840 }
841
842 struct raft_record r;
843 error = raft_record_from_json(&r, json);
844 if (!error) {
845 error = raft_apply_record(raft, i, &r);
846 raft_record_uninit(&r);
847 }
848 if (error) {
849 return ovsdb_wrap_error(error, "error reading record %llu from "
850 "%s log", i, raft->name);
851 }
852 }
853
854 /* Set the most recent servers. */
855 raft_get_servers_from_log(raft, VLL_DBG);
856
857 return NULL;
858 }
859
860 static void
861 raft_reset_timer(struct raft *raft)
862 {
863 unsigned int duration = (ELECTION_BASE_MSEC
864 + random_range(ELECTION_RANGE_MSEC));
865 raft->election_base = time_msec();
866 raft->election_timeout = raft->election_base + duration;
867 }
868
869 static void
870 raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
871 const struct uuid *sid, bool incoming)
872 {
873 struct raft_conn *conn = xzalloc(sizeof *conn);
874 ovs_list_push_back(&raft->conns, &conn->list_node);
875 conn->js = js;
876 if (sid) {
877 conn->sid = *sid;
878 }
879 conn->nickname = raft_address_to_nickname(jsonrpc_session_get_name(js),
880 &conn->sid);
881 conn->incoming = incoming;
882 conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
883 }
884
885 /* Starts the local server in an existing Raft cluster, using the local copy of
886 * the cluster's log in 'file_name'. Takes ownership of 'log', whether
887 * successful or not. */
888 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
889 raft_open(struct ovsdb_log *log, struct raft **raftp)
890 {
891 struct raft *raft = raft_alloc();
892 raft->log = log;
893
894 struct ovsdb_error *error = raft_read_header(raft);
895 if (error) {
896 goto error;
897 }
898
899 if (!raft->joining) {
900 error = raft_read_log(raft);
901 if (error) {
902 goto error;
903 }
904
905 /* Find our own server. */
906 if (!raft_find_server(raft, &raft->sid)) {
907 error = ovsdb_error(NULL, "server does not belong to cluster");
908 goto error;
909 }
910
911 /* If there's only one server, start an election right away so that the
912 * cluster bootstraps quickly. */
913 if (hmap_count(&raft->servers) == 1) {
914 raft_start_election(raft, false);
915 }
916 } else {
917 raft->join_timeout = time_msec() + 1000;
918 }
919
920 *raftp = raft;
921 hmap_insert(&all_rafts, &raft->hmap_node, hash_string(raft->name, 0));
922 return NULL;
923
924 error:
925 raft_close(raft);
926 *raftp = NULL;
927 return error;
928 }
929
930 /* Returns the name of 'raft', which in OVSDB is the database schema name. */
931 const char *
932 raft_get_name(const struct raft *raft)
933 {
934 return raft->name;
935 }
936
937 /* Returns the cluster ID of 'raft'. If 'raft' has not yet completed joining
938 * its cluster, then 'cid' will be all-zeros (unless the administrator
939 * specified a cluster ID running "ovsdb-tool join-cluster").
940 *
941 * Each cluster has a unique cluster ID. */
942 const struct uuid *
943 raft_get_cid(const struct raft *raft)
944 {
945 return &raft->cid;
946 }
947
948 /* Returns the server ID of 'raft'. Each server has a unique server ID. */
949 const struct uuid *
950 raft_get_sid(const struct raft *raft)
951 {
952 return &raft->sid;
953 }
954
955 /* Returns true if 'raft' has completed joining its cluster, has not left or
956 * initiated leaving the cluster, does not have failed disk storage, and is
957 * apparently connected to the leader in a healthy way (or is itself the
958 * leader).*/
959 bool
960 raft_is_connected(const struct raft *raft)
961 {
962 return (raft->role != RAFT_CANDIDATE
963 && !raft->joining
964 && !raft->leaving
965 && !raft->left
966 && !raft->failed);
967 }
968
969 /* Returns true if 'raft' is the cluster leader. */
970 bool
971 raft_is_leader(const struct raft *raft)
972 {
973 return raft->role == RAFT_LEADER;
974 }
975
976 /* Returns true if 'raft' is the process of joining its cluster. */
977 bool
978 raft_is_joining(const struct raft *raft)
979 {
980 return raft->joining;
981 }
982
983 /* Only returns *connected* connections. */
984 static struct raft_conn *
985 raft_find_conn_by_sid(struct raft *raft, const struct uuid *sid)
986 {
987 if (!uuid_is_zero(sid)) {
988 struct raft_conn *conn;
989 LIST_FOR_EACH (conn, list_node, &raft->conns) {
990 if (uuid_equals(sid, &conn->sid)
991 && jsonrpc_session_is_connected(conn->js)) {
992 return conn;
993 }
994 }
995 }
996 return NULL;
997 }
998
999 static struct raft_conn *
1000 raft_find_conn_by_address(struct raft *raft, const char *address)
1001 {
1002 struct raft_conn *conn;
1003 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1004 if (!strcmp(jsonrpc_session_get_name(conn->js), address)) {
1005 return conn;
1006 }
1007 }
1008 return NULL;
1009 }
1010
1011 static void OVS_PRINTF_FORMAT(3, 4)
1012 raft_record_note(struct raft *raft, const char *note,
1013 const char *comment_format, ...)
1014 {
1015 va_list args;
1016 va_start(args, comment_format);
1017 char *comment = xvasprintf(comment_format, args);
1018 va_end(args);
1019
1020 struct raft_record r = {
1021 .type = RAFT_REC_NOTE,
1022 .comment = comment,
1023 .note = CONST_CAST(char *, note),
1024 };
1025 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
1026
1027 free(comment);
1028 }
1029
1030 /* If we're leader, try to transfer leadership to another server, logging
1031 * 'reason' as the human-readable reason (it should be a phrase suitable for
1032 * following "because") . */
1033 void
1034 raft_transfer_leadership(struct raft *raft, const char *reason)
1035 {
1036 if (raft->role != RAFT_LEADER) {
1037 return;
1038 }
1039
1040 struct raft_server *s;
1041 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
1042 if (!uuid_equals(&raft->sid, &s->sid)
1043 && s->phase == RAFT_PHASE_STABLE) {
1044 struct raft_conn *conn = raft_find_conn_by_sid(raft, &s->sid);
1045 if (!conn) {
1046 continue;
1047 }
1048
1049 union raft_rpc rpc = {
1050 .become_leader = {
1051 .common = {
1052 .comment = CONST_CAST(char *, reason),
1053 .type = RAFT_RPC_BECOME_LEADER,
1054 .sid = s->sid,
1055 },
1056 .term = raft->term,
1057 }
1058 };
1059 raft_send__(raft, &rpc, conn);
1060
1061 raft_record_note(raft, "transfer leadership",
1062 "transferring leadership to %s because %s",
1063 s->nickname, reason);
1064 break;
1065 }
1066 }
1067 }
1068
1069 /* Send a RemoveServerRequest to the rest of the servers in the cluster.
1070 *
1071 * If we know which server is the leader, we can just send the request to it.
1072 * However, we might not know which server is the leader, and we might never
1073 * find out if the remove request was actually previously committed by a
1074 * majority of the servers (because in that case the new leader will not send
1075 * AppendRequests or heartbeats to us). Therefore, we instead send
1076 * RemoveRequests to every server. This theoretically has the same problem, if
1077 * the current cluster leader was not previously a member of the cluster, but
1078 * it seems likely to be more robust in practice. */
1079 static void
1080 raft_send_remove_server_requests(struct raft *raft)
1081 {
1082 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
1083 VLOG_INFO_RL(&rl, "sending remove request (joining=%s, leaving=%s)",
1084 raft->joining ? "true" : "false",
1085 raft->leaving ? "true" : "false");
1086 const struct raft_server *s;
1087 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
1088 if (!uuid_equals(&s->sid, &raft->sid)) {
1089 union raft_rpc rpc = (union raft_rpc) {
1090 .remove_server_request = {
1091 .common = {
1092 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
1093 .sid = s->sid,
1094 },
1095 .sid = raft->sid,
1096 },
1097 };
1098 raft_send(raft, &rpc);
1099 }
1100 }
1101
1102 raft->leave_timeout = time_msec() + ELECTION_BASE_MSEC;
1103 }
1104
1105 /* Attempts to start 'raft' leaving its cluster. The caller can check progress
1106 * using raft_is_leaving() and raft_left(). */
1107 void
1108 raft_leave(struct raft *raft)
1109 {
1110 if (raft->joining || raft->failed || raft->leaving || raft->left) {
1111 return;
1112 }
1113 VLOG_INFO(SID_FMT": starting to leave cluster "CID_FMT,
1114 SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
1115 raft->leaving = true;
1116 raft_transfer_leadership(raft, "this server is leaving the cluster");
1117 raft_become_follower(raft);
1118 raft_send_remove_server_requests(raft);
1119 raft->leave_timeout = time_msec() + ELECTION_BASE_MSEC;
1120 }
1121
1122 /* Returns true if 'raft' is currently attempting to leave its cluster. */
1123 bool
1124 raft_is_leaving(const struct raft *raft)
1125 {
1126 return raft->leaving;
1127 }
1128
1129 /* Returns true if 'raft' successfully left its cluster. */
1130 bool
1131 raft_left(const struct raft *raft)
1132 {
1133 return raft->left;
1134 }
1135
1136 /* Returns true if 'raft' has experienced a disk I/O failure. When this
1137 * returns true, only closing and reopening 'raft' allows for recovery. */
1138 bool
1139 raft_failed(const struct raft *raft)
1140 {
1141 return raft->failed;
1142 }
1143
1144 /* Forces 'raft' to attempt to take leadership of the cluster by deposing the
1145 * current cluster. */
1146 void
1147 raft_take_leadership(struct raft *raft)
1148 {
1149 if (raft->role != RAFT_LEADER) {
1150 raft_start_election(raft, true);
1151 }
1152 }
1153
1154 /* Closes everything owned by 'raft' that might be visible outside the process:
1155 * network connections, commands, etc. This is part of closing 'raft'; it is
1156 * also used if 'raft' has failed in an unrecoverable way. */
1157 static void
1158 raft_close__(struct raft *raft)
1159 {
1160 if (!hmap_node_is_null(&raft->hmap_node)) {
1161 hmap_remove(&all_rafts, &raft->hmap_node);
1162 hmap_node_nullify(&raft->hmap_node);
1163 }
1164
1165 raft_complete_all_commands(raft, RAFT_CMD_SHUTDOWN);
1166
1167 struct raft_server *rs = raft->remove_server;
1168 if (rs) {
1169 raft_send_remove_server_reply__(raft, &rs->sid, &rs->requester_sid,
1170 rs->requester_conn, false,
1171 RAFT_SERVER_SHUTDOWN);
1172 raft_server_destroy(raft->remove_server);
1173 raft->remove_server = NULL;
1174 }
1175
1176 struct raft_conn *conn, *next;
1177 LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
1178 raft_conn_close(conn);
1179 }
1180 }
1181
1182 /* Closes and frees 'raft'.
1183 *
1184 * A server's cluster membership is independent of whether the server is
1185 * actually running. When a server that is a member of a cluster closes, the
1186 * cluster treats this as a server failure. */
1187 void
1188 raft_close(struct raft *raft)
1189 {
1190 if (!raft) {
1191 return;
1192 }
1193
1194 raft_transfer_leadership(raft, "this server is shutting down");
1195
1196 raft_close__(raft);
1197
1198 ovsdb_log_close(raft->log);
1199
1200 raft_servers_destroy(&raft->servers);
1201
1202 for (uint64_t index = raft->log_start; index < raft->log_end; index++) {
1203 struct raft_entry *e = &raft->entries[index - raft->log_start];
1204 raft_entry_uninit(e);
1205 }
1206 free(raft->entries);
1207
1208 raft_entry_uninit(&raft->snap);
1209
1210 raft_waiters_destroy(raft);
1211
1212 raft_servers_destroy(&raft->add_servers);
1213
1214 hmap_destroy(&raft->commands);
1215
1216 pstream_close(raft->listener);
1217
1218 sset_destroy(&raft->remote_addresses);
1219 free(raft->local_address);
1220 free(raft->local_nickname);
1221 free(raft->name);
1222
1223 free(raft);
1224 }
1225
1226 static bool
1227 raft_conn_receive(struct raft *raft, struct raft_conn *conn,
1228 union raft_rpc *rpc)
1229 {
1230 struct jsonrpc_msg *msg = jsonrpc_session_recv(conn->js);
1231 if (!msg) {
1232 return false;
1233 }
1234
1235 struct ovsdb_error *error = raft_rpc_from_jsonrpc(&raft->cid, &raft->sid,
1236 msg, rpc);
1237 jsonrpc_msg_destroy(msg);
1238 if (error) {
1239 char *s = ovsdb_error_to_string_free(error);
1240 VLOG_INFO("%s: %s", jsonrpc_session_get_name(conn->js), s);
1241 free(s);
1242 return false;
1243 }
1244
1245 if (uuid_is_zero(&conn->sid)) {
1246 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
1247 conn->sid = rpc->common.sid;
1248 VLOG_INFO_RL(&rl, "%s: learned server ID "SID_FMT,
1249 jsonrpc_session_get_name(conn->js), SID_ARGS(&conn->sid));
1250 } else if (!uuid_equals(&conn->sid, &rpc->common.sid)) {
1251 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1252 VLOG_WARN_RL(&rl, "%s: ignoring message with unexpected server ID "
1253 SID_FMT" (expected "SID_FMT")",
1254 jsonrpc_session_get_name(conn->js),
1255 SID_ARGS(&rpc->common.sid), SID_ARGS(&conn->sid));
1256 raft_rpc_uninit(rpc);
1257 return false;
1258 }
1259
1260 const char *address = (rpc->type == RAFT_RPC_HELLO_REQUEST
1261 ? rpc->hello_request.address
1262 : rpc->type == RAFT_RPC_ADD_SERVER_REQUEST
1263 ? rpc->add_server_request.address
1264 : NULL);
1265 if (address) {
1266 char *new_nickname = raft_address_to_nickname(address, &conn->sid);
1267 if (strcmp(conn->nickname, new_nickname)) {
1268 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
1269 VLOG_INFO_RL(&rl, "%s: learned remote address %s",
1270 jsonrpc_session_get_name(conn->js), address);
1271
1272 free(conn->nickname);
1273 conn->nickname = new_nickname;
1274 } else {
1275 free(new_nickname);
1276 }
1277 }
1278
1279 return true;
1280 }
1281
1282 static const char *
1283 raft_get_nickname(const struct raft *raft, const struct uuid *sid,
1284 char buf[SID_LEN + 1], size_t bufsize)
1285 {
1286 if (uuid_equals(sid, &raft->sid)) {
1287 return raft->local_nickname;
1288 }
1289
1290 const char *s = raft_servers_get_nickname__(&raft->servers, sid);
1291 if (s) {
1292 return s;
1293 }
1294
1295 return raft_servers_get_nickname(&raft->add_servers, sid, buf, bufsize);
1296 }
1297
1298 static void
1299 log_rpc(const union raft_rpc *rpc,
1300 const char *direction, const struct raft_conn *conn)
1301 {
1302 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(600, 600);
1303 if (!raft_rpc_is_heartbeat(rpc) && !VLOG_DROP_DBG(&rl)) {
1304 struct ds s = DS_EMPTY_INITIALIZER;
1305 raft_rpc_format(rpc, &s);
1306 VLOG_DBG("%s%s %s", direction, conn->nickname, ds_cstr(&s));
1307 ds_destroy(&s);
1308 }
1309 }
1310
1311 static void
1312 raft_send_add_server_request(struct raft *raft, struct raft_conn *conn)
1313 {
1314 union raft_rpc rq = {
1315 .add_server_request = {
1316 .common = {
1317 .type = RAFT_RPC_ADD_SERVER_REQUEST,
1318 .sid = UUID_ZERO,
1319 .comment = NULL,
1320 },
1321 .address = raft->local_address,
1322 },
1323 };
1324 raft_send__(raft, &rq, conn);
1325 }
1326
1327 static void
1328 raft_conn_run(struct raft *raft, struct raft_conn *conn)
1329 {
1330 jsonrpc_session_run(conn->js);
1331
1332 unsigned int new_seqno = jsonrpc_session_get_seqno(conn->js);
1333 bool just_connected = (new_seqno != conn->js_seqno
1334 && jsonrpc_session_is_connected(conn->js));
1335 conn->js_seqno = new_seqno;
1336 if (just_connected) {
1337 if (raft->joining) {
1338 raft_send_add_server_request(raft, conn);
1339 } else if (raft->leaving) {
1340 union raft_rpc rq = {
1341 .remove_server_request = {
1342 .common = {
1343 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
1344 .sid = conn->sid,
1345 },
1346 .sid = raft->sid,
1347 },
1348 };
1349 raft_send__(raft, &rq, conn);
1350 } else {
1351 union raft_rpc rq = (union raft_rpc) {
1352 .hello_request = {
1353 .common = {
1354 .type = RAFT_RPC_HELLO_REQUEST,
1355 .sid = conn->sid,
1356 },
1357 .address = raft->local_address,
1358 },
1359 };
1360 raft_send__(raft, &rq, conn);
1361 }
1362 }
1363
1364 for (size_t i = 0; i < 50; i++) {
1365 union raft_rpc rpc;
1366 if (!raft_conn_receive(raft, conn, &rpc)) {
1367 break;
1368 }
1369
1370 log_rpc(&rpc, "<--", conn);
1371 raft_handle_rpc(raft, &rpc);
1372 raft_rpc_uninit(&rpc);
1373 }
1374 }
1375
1376 static void
1377 raft_waiter_complete_rpc(struct raft *raft, const union raft_rpc *rpc)
1378 {
1379 uint64_t term = raft_rpc_get_term(rpc);
1380 if (term && term < raft->term) {
1381 /* Drop the message because it's for an expired term. */
1382 return;
1383 }
1384
1385 if (!raft_is_rpc_synced(raft, rpc)) {
1386 /* This is a bug. A reply message is deferred because some state in
1387 * the message, such as a term or index, has not been committed to
1388 * disk, and they should only be completed when that commit is done.
1389 * But this message is being completed before the commit is finished.
1390 * Complain, and hope that someone reports the bug. */
1391 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
1392 if (VLOG_DROP_ERR(&rl)) {
1393 return;
1394 }
1395
1396 struct ds s = DS_EMPTY_INITIALIZER;
1397
1398 if (term > raft->synced_term) {
1399 ds_put_format(&s, " because message term %"PRIu64" is "
1400 "past synced term %"PRIu64,
1401 term, raft->synced_term);
1402 }
1403
1404 uint64_t index = raft_rpc_get_min_sync_index(rpc);
1405 if (index > raft->log_synced) {
1406 ds_put_format(&s, " %s message index %"PRIu64" is past last "
1407 "synced index %"PRIu64,
1408 s.length ? "and" : "because",
1409 index, raft->log_synced);
1410 }
1411
1412 const struct uuid *vote = raft_rpc_get_vote(rpc);
1413 if (vote && !uuid_equals(vote, &raft->synced_vote)) {
1414 char buf1[SID_LEN + 1];
1415 char buf2[SID_LEN + 1];
1416 ds_put_format(&s, " %s vote %s differs from synced vote %s",
1417 s.length ? "and" : "because",
1418 raft_get_nickname(raft, vote, buf1, sizeof buf1),
1419 raft_get_nickname(raft, &raft->synced_vote,
1420 buf2, sizeof buf2));
1421 }
1422
1423 char buf[SID_LEN + 1];
1424 ds_put_format(&s, ": %s ",
1425 raft_get_nickname(raft, &rpc->common.sid,
1426 buf, sizeof buf));
1427 raft_rpc_format(rpc, &s);
1428 VLOG_ERR("internal error: deferred %s message completed "
1429 "but not ready to send%s",
1430 raft_rpc_type_to_string(rpc->type), ds_cstr(&s));
1431 ds_destroy(&s);
1432
1433 return;
1434 }
1435
1436 struct raft_conn *dst = raft_find_conn_by_sid(raft, &rpc->common.sid);
1437 if (dst) {
1438 raft_send__(raft, rpc, dst);
1439 }
1440 }
1441
1442 static void
1443 raft_waiter_complete(struct raft *raft, struct raft_waiter *w)
1444 {
1445 switch (w->type) {
1446 case RAFT_W_ENTRY:
1447 if (raft->role == RAFT_LEADER) {
1448 raft_update_our_match_index(raft, w->entry.index);
1449 }
1450 raft->log_synced = w->entry.index;
1451 break;
1452
1453 case RAFT_W_TERM:
1454 raft->synced_term = w->term.term;
1455 raft->synced_vote = w->term.vote;
1456 break;
1457
1458 case RAFT_W_RPC:
1459 raft_waiter_complete_rpc(raft, w->rpc);
1460 break;
1461 }
1462 }
1463
1464 static void
1465 raft_waiter_destroy(struct raft_waiter *w)
1466 {
1467 if (!w) {
1468 return;
1469 }
1470
1471 ovs_list_remove(&w->list_node);
1472
1473 switch (w->type) {
1474 case RAFT_W_ENTRY:
1475 case RAFT_W_TERM:
1476 break;
1477
1478 case RAFT_W_RPC:
1479 raft_rpc_uninit(w->rpc);
1480 free(w->rpc);
1481 break;
1482 }
1483 free(w);
1484 }
1485
1486 static void
1487 raft_waiters_run(struct raft *raft)
1488 {
1489 if (ovs_list_is_empty(&raft->waiters)) {
1490 return;
1491 }
1492
1493 uint64_t cur = ovsdb_log_commit_progress(raft->log);
1494 struct raft_waiter *w, *next;
1495 LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
1496 if (cur < w->commit_ticket) {
1497 break;
1498 }
1499 raft_waiter_complete(raft, w);
1500 raft_waiter_destroy(w);
1501 }
1502 }
1503
1504 static void
1505 raft_waiters_wait(struct raft *raft)
1506 {
1507 struct raft_waiter *w;
1508 LIST_FOR_EACH (w, list_node, &raft->waiters) {
1509 ovsdb_log_commit_wait(raft->log, w->commit_ticket);
1510 break;
1511 }
1512 }
1513
1514 static void
1515 raft_waiters_destroy(struct raft *raft)
1516 {
1517 struct raft_waiter *w, *next;
1518 LIST_FOR_EACH_SAFE (w, next, list_node, &raft->waiters) {
1519 raft_waiter_destroy(w);
1520 }
1521 }
1522
1523 static bool OVS_WARN_UNUSED_RESULT
1524 raft_set_term(struct raft *raft, uint64_t term, const struct uuid *vote)
1525 {
1526 struct ovsdb_error *error = raft_write_state(raft->log, term, vote);
1527 if (!raft_handle_write_error(raft, error)) {
1528 return false;
1529 }
1530
1531 struct raft_waiter *w = raft_waiter_create(raft, RAFT_W_TERM, true);
1532 raft->term = w->term.term = term;
1533 raft->vote = w->term.vote = vote ? *vote : UUID_ZERO;
1534 return true;
1535 }
1536
1537 static void
1538 raft_accept_vote(struct raft *raft, struct raft_server *s,
1539 const struct uuid *vote)
1540 {
1541 if (uuid_equals(&s->vote, vote)) {
1542 return;
1543 }
1544 if (!uuid_is_zero(&s->vote)) {
1545 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
1546 char buf1[SID_LEN + 1];
1547 char buf2[SID_LEN + 1];
1548 VLOG_WARN_RL(&rl, "server %s changed its vote from %s to %s",
1549 s->nickname,
1550 raft_get_nickname(raft, &s->vote, buf1, sizeof buf1),
1551 raft_get_nickname(raft, vote, buf2, sizeof buf2));
1552 }
1553 s->vote = *vote;
1554 if (uuid_equals(vote, &raft->sid)
1555 && ++raft->n_votes > hmap_count(&raft->servers) / 2) {
1556 raft_become_leader(raft);
1557 }
1558 }
1559
1560 static void
1561 raft_start_election(struct raft *raft, bool leadership_transfer)
1562 {
1563 if (raft->leaving) {
1564 return;
1565 }
1566
1567 struct raft_server *me = raft_find_server(raft, &raft->sid);
1568 if (!me) {
1569 return;
1570 }
1571
1572 if (!raft_set_term(raft, raft->term + 1, &raft->sid)) {
1573 return;
1574 }
1575
1576 raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
1577
1578 ovs_assert(raft->role != RAFT_LEADER);
1579 ovs_assert(hmap_is_empty(&raft->commands));
1580 raft->role = RAFT_CANDIDATE;
1581
1582 raft->n_votes = 0;
1583
1584 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1585 if (!VLOG_DROP_INFO(&rl)) {
1586 long long int now = time_msec();
1587 if (now >= raft->election_timeout) {
1588 VLOG_INFO("term %"PRIu64": %lld ms timeout expired, "
1589 "starting election",
1590 raft->term, now - raft->election_base);
1591 } else {
1592 VLOG_INFO("term %"PRIu64": starting election", raft->term);
1593 }
1594 }
1595 raft_reset_timer(raft);
1596
1597 struct raft_server *peer;
1598 HMAP_FOR_EACH (peer, hmap_node, &raft->servers) {
1599 peer->vote = UUID_ZERO;
1600 if (uuid_equals(&raft->sid, &peer->sid)) {
1601 continue;
1602 }
1603
1604 union raft_rpc rq = {
1605 .vote_request = {
1606 .common = {
1607 .type = RAFT_RPC_VOTE_REQUEST,
1608 .sid = peer->sid,
1609 },
1610 .term = raft->term,
1611 .last_log_index = raft->log_end - 1,
1612 .last_log_term = (
1613 raft->log_end > raft->log_start
1614 ? raft->entries[raft->log_end - raft->log_start - 1].term
1615 : raft->snap.term),
1616 .leadership_transfer = leadership_transfer,
1617 },
1618 };
1619 raft_send(raft, &rq);
1620 }
1621
1622 /* Vote for ourselves. */
1623 raft_accept_vote(raft, me, &raft->sid);
1624 }
1625
1626 static void
1627 raft_open_conn(struct raft *raft, const char *address, const struct uuid *sid)
1628 {
1629 if (strcmp(address, raft->local_address)
1630 && !raft_find_conn_by_address(raft, address)) {
1631 raft_add_conn(raft, jsonrpc_session_open(address, true), sid, false);
1632 }
1633 }
1634
1635 static void
1636 raft_conn_close(struct raft_conn *conn)
1637 {
1638 jsonrpc_session_close(conn->js);
1639 ovs_list_remove(&conn->list_node);
1640 free(conn->nickname);
1641 free(conn);
1642 }
1643
1644 /* Returns true if 'conn' should stay open, false if it should be closed. */
1645 static bool
1646 raft_conn_should_stay_open(struct raft *raft, struct raft_conn *conn)
1647 {
1648 /* Close the connection if it's actually dead. If necessary, we'll
1649 * initiate a new session later. */
1650 if (!jsonrpc_session_is_alive(conn->js)) {
1651 return false;
1652 }
1653
1654 /* Keep incoming sessions. We trust the originator to decide to drop
1655 * it. */
1656 if (conn->incoming) {
1657 return true;
1658 }
1659
1660 /* If we are joining the cluster, keep sessions to the remote addresses
1661 * that are supposed to be part of the cluster we're joining. */
1662 if (raft->joining && sset_contains(&raft->remote_addresses,
1663 jsonrpc_session_get_name(conn->js))) {
1664 return true;
1665 }
1666
1667 /* We have joined the cluster. If we did that "recently", then there is a
1668 * chance that we do not have the most recent server configuration log
1669 * entry. If so, it's a waste to disconnect from the servers that were in
1670 * remote_addresses and that will probably appear in the configuration,
1671 * just to reconnect to them a moment later when we do get the
1672 * configuration update. If we are not ourselves in the configuration,
1673 * then we know that there must be a new configuration coming up, so in
1674 * that case keep the connection. */
1675 if (!raft_find_server(raft, &raft->sid)) {
1676 return true;
1677 }
1678
1679 /* Keep the connection only if the server is part of the configuration. */
1680 return raft_find_server(raft, &conn->sid);
1681 }
1682
1683 /* Allows 'raft' to maintain the distributed log. Call this function as part
1684 * of the process's main loop. */
1685 void
1686 raft_run(struct raft *raft)
1687 {
1688 if (raft->left || raft->failed) {
1689 return;
1690 }
1691
1692 raft_waiters_run(raft);
1693
1694 if (!raft->listener && time_msec() >= raft->listen_backoff) {
1695 char *paddr = raft_make_address_passive(raft->local_address);
1696 int error = pstream_open(paddr, &raft->listener, DSCP_DEFAULT);
1697 if (error) {
1698 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1699 VLOG_WARN_RL(&rl, "%s: listen failed (%s)",
1700 paddr, ovs_strerror(error));
1701 raft->listen_backoff = time_msec() + 1000;
1702 }
1703 free(paddr);
1704 }
1705
1706 if (raft->listener) {
1707 struct stream *stream;
1708 int error = pstream_accept(raft->listener, &stream);
1709 if (!error) {
1710 raft_add_conn(raft, jsonrpc_session_open_unreliably(
1711 jsonrpc_open(stream), DSCP_DEFAULT), NULL,
1712 true);
1713 } else if (error != EAGAIN) {
1714 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
1715 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
1716 pstream_get_name(raft->listener),
1717 ovs_strerror(error));
1718 }
1719 }
1720
1721 /* Run RPCs for all open sessions. */
1722 struct raft_conn *conn;
1723 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1724 raft_conn_run(raft, conn);
1725 }
1726
1727 /* Close unneeded sessions. */
1728 struct raft_conn *next;
1729 LIST_FOR_EACH_SAFE (conn, next, list_node, &raft->conns) {
1730 if (!raft_conn_should_stay_open(raft, conn)) {
1731 raft_conn_close(conn);
1732 }
1733 }
1734
1735 /* Open needed sessions. */
1736 struct raft_server *server;
1737 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
1738 raft_open_conn(raft, server->address, &server->sid);
1739 }
1740 if (raft->joining) {
1741 const char *address;
1742 SSET_FOR_EACH (address, &raft->remote_addresses) {
1743 raft_open_conn(raft, address, NULL);
1744 }
1745 }
1746
1747 if (!raft->joining && time_msec() >= raft->election_timeout) {
1748 raft_start_election(raft, false);
1749 }
1750
1751 if (raft->leaving && time_msec() >= raft->leave_timeout) {
1752 raft_send_remove_server_requests(raft);
1753 }
1754
1755 if (raft->joining && time_msec() >= raft->join_timeout) {
1756 raft->join_timeout = time_msec() + 1000;
1757 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1758 raft_send_add_server_request(raft, conn);
1759 }
1760 }
1761
1762 if (time_msec() >= raft->ping_timeout) {
1763 if (raft->role == RAFT_LEADER) {
1764 raft_send_heartbeats(raft);
1765 } else {
1766 long long int now = time_msec();
1767 struct raft_command *cmd, *next_cmd;
1768 HMAP_FOR_EACH_SAFE (cmd, next_cmd, hmap_node, &raft->commands) {
1769 if (cmd->timestamp
1770 && now - cmd->timestamp > ELECTION_BASE_MSEC) {
1771 raft_command_complete(raft, cmd, RAFT_CMD_TIMEOUT);
1772 }
1773 }
1774 }
1775 raft->ping_timeout = time_msec() + PING_TIME_MSEC;
1776 }
1777
1778 /* Do this only at the end; if we did it as soon as we set raft->left or
1779 * raft->failed in handling the RemoveServerReply, then it could easily
1780 * cause references to freed memory in RPC sessions, etc. */
1781 if (raft->left || raft->failed) {
1782 raft_close__(raft);
1783 }
1784 }
1785
1786 static void
1787 raft_wait_session(struct jsonrpc_session *js)
1788 {
1789 if (js) {
1790 jsonrpc_session_wait(js);
1791 jsonrpc_session_recv_wait(js);
1792 }
1793 }
1794
1795 /* Causes the next call to poll_block() to wake up when 'raft' needs to do
1796 * something. */
1797 void
1798 raft_wait(struct raft *raft)
1799 {
1800 if (raft->left || raft->failed) {
1801 return;
1802 }
1803
1804 raft_waiters_wait(raft);
1805
1806 if (raft->listener) {
1807 pstream_wait(raft->listener);
1808 } else {
1809 poll_timer_wait_until(raft->listen_backoff);
1810 }
1811
1812 struct raft_conn *conn;
1813 LIST_FOR_EACH (conn, list_node, &raft->conns) {
1814 raft_wait_session(conn->js);
1815 }
1816
1817 if (!raft->joining) {
1818 poll_timer_wait_until(raft->election_timeout);
1819 } else {
1820 poll_timer_wait_until(raft->join_timeout);
1821 }
1822 if (raft->leaving) {
1823 poll_timer_wait_until(raft->leave_timeout);
1824 }
1825 if (raft->role == RAFT_LEADER || !hmap_is_empty(&raft->commands)) {
1826 poll_timer_wait_until(raft->ping_timeout);
1827 }
1828 }
1829
1830 static struct raft_waiter *
1831 raft_waiter_create(struct raft *raft, enum raft_waiter_type type,
1832 bool start_commit)
1833 {
1834 struct raft_waiter *w = xzalloc(sizeof *w);
1835 ovs_list_push_back(&raft->waiters, &w->list_node);
1836 w->commit_ticket = start_commit ? ovsdb_log_commit_start(raft->log) : 0;
1837 w->type = type;
1838 return w;
1839 }
1840
1841 /* Returns a human-readable representation of 'status' (or NULL if 'status' is
1842 * invalid). */
1843 const char *
1844 raft_command_status_to_string(enum raft_command_status status)
1845 {
1846 switch (status) {
1847 case RAFT_CMD_INCOMPLETE:
1848 return "operation still in progress";
1849 case RAFT_CMD_SUCCESS:
1850 return "success";
1851 case RAFT_CMD_NOT_LEADER:
1852 return "not leader";
1853 case RAFT_CMD_BAD_PREREQ:
1854 return "prerequisite check failed";
1855 case RAFT_CMD_LOST_LEADERSHIP:
1856 return "lost leadership";
1857 case RAFT_CMD_SHUTDOWN:
1858 return "server shutdown";
1859 case RAFT_CMD_IO_ERROR:
1860 return "I/O error";
1861 case RAFT_CMD_TIMEOUT:
1862 return "timeout";
1863 default:
1864 return NULL;
1865 }
1866 }
1867
1868 /* Converts human-readable status in 's' into status code in '*statusp'.
1869 * Returns true if successful, false if 's' is unknown. */
1870 bool
1871 raft_command_status_from_string(const char *s,
1872 enum raft_command_status *statusp)
1873 {
1874 for (enum raft_command_status status = 0; ; status++) {
1875 const char *s2 = raft_command_status_to_string(status);
1876 if (!s2) {
1877 *statusp = 0;
1878 return false;
1879 } else if (!strcmp(s, s2)) {
1880 *statusp = status;
1881 return true;
1882 }
1883 }
1884 }
1885
1886 static const struct uuid *
1887 raft_get_eid(const struct raft *raft, uint64_t index)
1888 {
1889 for (; index >= raft->log_start; index--) {
1890 const struct raft_entry *e = raft_get_entry(raft, index);
1891 if (e->data) {
1892 return &e->eid;
1893 }
1894 }
1895 return &raft->snap.eid;
1896 }
1897
1898 static const struct uuid *
1899 raft_current_eid(const struct raft *raft)
1900 {
1901 return raft_get_eid(raft, raft->log_end - 1);
1902 }
1903
1904 static struct raft_command *
1905 raft_command_create_completed(enum raft_command_status status)
1906 {
1907 ovs_assert(status != RAFT_CMD_INCOMPLETE);
1908
1909 struct raft_command *cmd = xzalloc(sizeof *cmd);
1910 cmd->n_refs = 1;
1911 cmd->status = status;
1912 return cmd;
1913 }
1914
1915 static struct raft_command *
1916 raft_command_create_incomplete(struct raft *raft, uint64_t index)
1917 {
1918 struct raft_command *cmd = xzalloc(sizeof *cmd);
1919 cmd->n_refs = 2; /* One for client, one for raft->commands. */
1920 cmd->index = index;
1921 cmd->status = RAFT_CMD_INCOMPLETE;
1922 hmap_insert(&raft->commands, &cmd->hmap_node, cmd->index);
1923 return cmd;
1924 }
1925
1926 static struct raft_command * OVS_WARN_UNUSED_RESULT
1927 raft_command_initiate(struct raft *raft,
1928 const struct json *data, const struct json *servers,
1929 const struct uuid *eid)
1930 {
1931 /* Write to local log. */
1932 uint64_t index = raft->log_end;
1933 if (!raft_handle_write_error(
1934 raft, raft_write_entry(
1935 raft, raft->term, json_nullable_clone(data), eid,
1936 json_nullable_clone(servers)))) {
1937 return raft_command_create_completed(RAFT_CMD_IO_ERROR);
1938 }
1939
1940 struct raft_command *cmd = raft_command_create_incomplete(raft, index);
1941 if (eid) {
1942 cmd->eid = *eid;
1943 }
1944
1945 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index = cmd->index;
1946
1947 /* Write to remote logs. */
1948 struct raft_server *s;
1949 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
1950 if (!uuid_equals(&s->sid, &raft->sid) && s->next_index == index) {
1951 raft_send_append_request(raft, s, 1, "execute command");
1952 s->next_index++;
1953 }
1954 }
1955
1956 return cmd;
1957 }
1958
1959 static struct raft_command * OVS_WARN_UNUSED_RESULT
1960 raft_command_execute__(struct raft *raft,
1961 const struct json *data, const struct json *servers,
1962 const struct uuid *prereq, struct uuid *result)
1963 {
1964 if (raft->joining || raft->leaving || raft->left || raft->failed) {
1965 return raft_command_create_completed(RAFT_CMD_SHUTDOWN);
1966 }
1967
1968 if (raft->role != RAFT_LEADER) {
1969 /* Consider proxying the command to the leader. We can only do that if
1970 * we know the leader and the command does not change the set of
1971 * servers. We do not proxy commands without prerequisites, even
1972 * though we could, because in an OVSDB context a log entry doesn't
1973 * make sense without context. */
1974 if (servers || !data
1975 || raft->role != RAFT_FOLLOWER || uuid_is_zero(&raft->leader_sid)
1976 || !prereq) {
1977 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
1978 }
1979 }
1980
1981 struct uuid eid = data ? uuid_random() : UUID_ZERO;
1982 if (result) {
1983 *result = eid;
1984 }
1985
1986 if (raft->role != RAFT_LEADER) {
1987 const union raft_rpc rpc = {
1988 .execute_command_request = {
1989 .common = {
1990 .type = RAFT_RPC_EXECUTE_COMMAND_REQUEST,
1991 .sid = raft->leader_sid,
1992 },
1993 .data = CONST_CAST(struct json *, data),
1994 .prereq = *prereq,
1995 .result = eid,
1996 }
1997 };
1998 if (!raft_send(raft, &rpc)) {
1999 /* Couldn't send command, so it definitely failed. */
2000 return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
2001 }
2002
2003 struct raft_command *cmd = raft_command_create_incomplete(raft, 0);
2004 cmd->timestamp = time_msec();
2005 cmd->eid = eid;
2006 return cmd;
2007 }
2008
2009 const struct uuid *current_eid = raft_current_eid(raft);
2010 if (prereq && !uuid_equals(prereq, current_eid)) {
2011 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
2012 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
2013 "prerequisite "UUID_FMT,
2014 UUID_ARGS(current_eid), UUID_ARGS(prereq));
2015 return raft_command_create_completed(RAFT_CMD_BAD_PREREQ);
2016 }
2017
2018 return raft_command_initiate(raft, data, servers, &eid);
2019 }
2020
2021 /* Initiates appending a log entry to 'raft'. The log entry consists of 'data'
2022 * and, if 'prereq' is nonnull, it is only added to the log if the previous
2023 * entry in the log has entry ID 'prereq'. If 'result' is nonnull, it is
2024 * populated with the entry ID for the new log entry.
2025 *
2026 * Returns a "struct raft_command" that may be used to track progress adding
2027 * the log entry. The caller must eventually free the returned structure, with
2028 * raft_command_unref(). */
2029 struct raft_command * OVS_WARN_UNUSED_RESULT
2030 raft_command_execute(struct raft *raft, const struct json *data,
2031 const struct uuid *prereq, struct uuid *result)
2032 {
2033 return raft_command_execute__(raft, data, NULL, prereq, result);
2034 }
2035
2036 /* Returns the status of 'cmd'. */
2037 enum raft_command_status
2038 raft_command_get_status(const struct raft_command *cmd)
2039 {
2040 ovs_assert(cmd->n_refs > 0);
2041 return cmd->status;
2042 }
2043
2044 /* Returns the index of the log entry at which 'cmd' was committed.
2045 *
2046 * This function works only with successful commands. */
2047 uint64_t
2048 raft_command_get_commit_index(const struct raft_command *cmd)
2049 {
2050 ovs_assert(cmd->n_refs > 0);
2051 ovs_assert(cmd->status == RAFT_CMD_SUCCESS);
2052 return cmd->index;
2053 }
2054
2055 /* Frees 'cmd'. */
2056 void
2057 raft_command_unref(struct raft_command *cmd)
2058 {
2059 if (cmd) {
2060 ovs_assert(cmd->n_refs > 0);
2061 if (!--cmd->n_refs) {
2062 free(cmd);
2063 }
2064 }
2065 }
2066
2067 /* Causes poll_block() to wake up when 'cmd' has status to report. */
2068 void
2069 raft_command_wait(const struct raft_command *cmd)
2070 {
2071 if (cmd->status != RAFT_CMD_INCOMPLETE) {
2072 poll_immediate_wake();
2073 }
2074 }
2075
2076 static void
2077 raft_command_complete(struct raft *raft,
2078 struct raft_command *cmd,
2079 enum raft_command_status status)
2080 {
2081 if (!uuid_is_zero(&cmd->sid)) {
2082 uint64_t commit_index = status == RAFT_CMD_SUCCESS ? cmd->index : 0;
2083 raft_send_execute_command_reply(raft, &cmd->sid, &cmd->eid, status,
2084 commit_index);
2085 }
2086
2087 ovs_assert(cmd->status == RAFT_CMD_INCOMPLETE);
2088 ovs_assert(cmd->n_refs > 0);
2089 hmap_remove(&raft->commands, &cmd->hmap_node);
2090 cmd->status = status;
2091 raft_command_unref(cmd);
2092 }
2093
2094 static void
2095 raft_complete_all_commands(struct raft *raft, enum raft_command_status status)
2096 {
2097 struct raft_command *cmd, *next;
2098 HMAP_FOR_EACH_SAFE (cmd, next, hmap_node, &raft->commands) {
2099 raft_command_complete(raft, cmd, status);
2100 }
2101 }
2102
2103 static struct raft_command *
2104 raft_find_command_by_index(struct raft *raft, uint64_t index)
2105 {
2106 struct raft_command *cmd;
2107
2108 HMAP_FOR_EACH_IN_BUCKET (cmd, hmap_node, index, &raft->commands) {
2109 if (cmd->index == index) {
2110 return cmd;
2111 }
2112 }
2113 return NULL;
2114 }
2115
2116 static struct raft_command *
2117 raft_find_command_by_eid(struct raft *raft, const struct uuid *eid)
2118 {
2119 struct raft_command *cmd;
2120
2121 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2122 if (uuid_equals(&cmd->eid, eid)) {
2123 return cmd;
2124 }
2125 }
2126 return NULL;
2127 }
2128 \f
2129 #define RAFT_RPC(ENUM, NAME) \
2130 static void raft_handle_##NAME(struct raft *, const struct raft_##NAME *);
2131 RAFT_RPC_TYPES
2132 #undef RAFT_RPC
2133
2134 static void
2135 raft_handle_hello_request(struct raft *raft OVS_UNUSED,
2136 const struct raft_hello_request *hello OVS_UNUSED)
2137 {
2138 }
2139
2140 /* 'sid' is the server being added. */
2141 static void
2142 raft_send_add_server_reply__(struct raft *raft, const struct uuid *sid,
2143 const char *address,
2144 bool success, const char *comment)
2145 {
2146 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2147 if (!VLOG_DROP_INFO(&rl)) {
2148 struct ds s = DS_EMPTY_INITIALIZER;
2149 char buf[SID_LEN + 1];
2150 ds_put_format(&s, "adding %s ("SID_FMT" at %s) "
2151 "to cluster "CID_FMT" %s",
2152 raft_get_nickname(raft, sid, buf, sizeof buf),
2153 SID_ARGS(sid), address, CID_ARGS(&raft->cid),
2154 success ? "succeeded" : "failed");
2155 if (comment) {
2156 ds_put_format(&s, " (%s)", comment);
2157 }
2158 VLOG_INFO("%s", ds_cstr(&s));
2159 ds_destroy(&s);
2160 }
2161
2162 union raft_rpc rpy = {
2163 .add_server_reply = {
2164 .common = {
2165 .type = RAFT_RPC_ADD_SERVER_REPLY,
2166 .sid = *sid,
2167 .comment = CONST_CAST(char *, comment),
2168 },
2169 .success = success,
2170 }
2171 };
2172
2173 struct sset *remote_addresses = &rpy.add_server_reply.remote_addresses;
2174 sset_init(remote_addresses);
2175 if (!raft->joining) {
2176 struct raft_server *s;
2177 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2178 if (!uuid_equals(&s->sid, &raft->sid)) {
2179 sset_add(remote_addresses, s->address);
2180 }
2181 }
2182 }
2183
2184 raft_send(raft, &rpy);
2185
2186 sset_destroy(remote_addresses);
2187 }
2188
2189 static void
2190 raft_send_remove_server_reply_rpc(struct raft *raft, const struct uuid *sid,
2191 bool success, const char *comment)
2192 {
2193 const union raft_rpc rpy = {
2194 .remove_server_reply = {
2195 .common = {
2196 .type = RAFT_RPC_REMOVE_SERVER_REPLY,
2197 .sid = *sid,
2198 .comment = CONST_CAST(char *, comment),
2199 },
2200 .success = success,
2201 }
2202 };
2203 raft_send(raft, &rpy);
2204 }
2205
2206 static void
2207 raft_send_remove_server_reply__(struct raft *raft,
2208 const struct uuid *target_sid,
2209 const struct uuid *requester_sid,
2210 struct unixctl_conn *requester_conn,
2211 bool success, const char *comment)
2212 {
2213 struct ds s = DS_EMPTY_INITIALIZER;
2214 ds_put_format(&s, "request ");
2215 if (!uuid_is_zero(requester_sid)) {
2216 char buf[SID_LEN + 1];
2217 ds_put_format(&s, "by %s",
2218 raft_get_nickname(raft, requester_sid, buf, sizeof buf));
2219 } else {
2220 ds_put_cstr(&s, "via unixctl");
2221 }
2222 ds_put_cstr(&s, " to remove ");
2223 if (!requester_conn && uuid_equals(target_sid, requester_sid)) {
2224 ds_put_cstr(&s, "itself");
2225 } else {
2226 char buf[SID_LEN + 1];
2227 ds_put_cstr(&s, raft_get_nickname(raft, target_sid, buf, sizeof buf));
2228 }
2229 ds_put_format(&s, " from cluster "CID_FMT" %s",
2230 CID_ARGS(&raft->cid),
2231 success ? "succeeded" : "failed");
2232 if (comment) {
2233 ds_put_format(&s, " (%s)", comment);
2234 }
2235
2236 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
2237 VLOG_INFO_RL(&rl, "%s", ds_cstr(&s));
2238
2239 /* Send RemoveServerReply to the requester (which could be a server or a
2240 * unixctl connection. Also always send it to the removed server; this
2241 * allows it to be sure that it's really removed and update its log and
2242 * disconnect permanently. */
2243 if (!uuid_is_zero(requester_sid)) {
2244 raft_send_remove_server_reply_rpc(raft, requester_sid,
2245 success, comment);
2246 }
2247 if (!uuid_equals(requester_sid, target_sid)) {
2248 raft_send_remove_server_reply_rpc(raft, target_sid, success, comment);
2249 }
2250 if (requester_conn) {
2251 if (success) {
2252 unixctl_command_reply(requester_conn, ds_cstr(&s));
2253 } else {
2254 unixctl_command_reply_error(requester_conn, ds_cstr(&s));
2255 }
2256 }
2257
2258 ds_destroy(&s);
2259 }
2260
2261 static void
2262 raft_send_add_server_reply(struct raft *raft,
2263 const struct raft_add_server_request *rq,
2264 bool success, const char *comment)
2265 {
2266 return raft_send_add_server_reply__(raft, &rq->common.sid, rq->address,
2267 success, comment);
2268 }
2269
2270 static void
2271 raft_send_remove_server_reply(struct raft *raft,
2272 const struct raft_remove_server_request *rq,
2273 bool success, const char *comment)
2274 {
2275 return raft_send_remove_server_reply__(raft, &rq->sid, &rq->common.sid,
2276 rq->requester_conn, success,
2277 comment);
2278 }
2279
2280 static void
2281 raft_become_follower(struct raft *raft)
2282 {
2283 raft->leader_sid = UUID_ZERO;
2284 if (raft->role == RAFT_FOLLOWER) {
2285 return;
2286 }
2287
2288 raft->role = RAFT_FOLLOWER;
2289 raft_reset_timer(raft);
2290
2291 /* Notify clients about lost leadership.
2292 *
2293 * We do not reverse our changes to 'raft->servers' because the new
2294 * configuration is already part of the log. Possibly the configuration
2295 * log entry will not be committed, but until we know that we must use the
2296 * new configuration. Our AppendEntries processing will properly update
2297 * the server configuration later, if necessary. */
2298 struct raft_server *s;
2299 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
2300 raft_send_add_server_reply__(raft, &s->sid, s->address, false,
2301 RAFT_SERVER_LOST_LEADERSHIP);
2302 }
2303 if (raft->remove_server) {
2304 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
2305 &raft->remove_server->requester_sid,
2306 raft->remove_server->requester_conn,
2307 false, RAFT_SERVER_LOST_LEADERSHIP);
2308 raft_server_destroy(raft->remove_server);
2309 raft->remove_server = NULL;
2310 }
2311
2312 raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
2313 }
2314
2315 static void
2316 raft_send_append_request(struct raft *raft,
2317 struct raft_server *peer, unsigned int n,
2318 const char *comment)
2319 {
2320 ovs_assert(raft->role == RAFT_LEADER);
2321
2322 const union raft_rpc rq = {
2323 .append_request = {
2324 .common = {
2325 .type = RAFT_RPC_APPEND_REQUEST,
2326 .sid = peer->sid,
2327 .comment = CONST_CAST(char *, comment),
2328 },
2329 .term = raft->term,
2330 .prev_log_index = peer->next_index - 1,
2331 .prev_log_term = (peer->next_index - 1 >= raft->log_start
2332 ? raft->entries[peer->next_index - 1
2333 - raft->log_start].term
2334 : raft->snap.term),
2335 .leader_commit = raft->commit_index,
2336 .entries = &raft->entries[peer->next_index - raft->log_start],
2337 .n_entries = n,
2338 },
2339 };
2340 raft_send(raft, &rq);
2341 }
2342
2343 static void
2344 raft_send_heartbeats(struct raft *raft)
2345 {
2346 struct raft_server *s;
2347 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2348 if (!uuid_equals(&raft->sid, &s->sid)) {
2349 raft_send_append_request(raft, s, 0, "heartbeat");
2350 }
2351 }
2352
2353 /* Send anyone waiting for a command to complete a ping to let them
2354 * know we're still working on it. */
2355 struct raft_command *cmd;
2356 HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
2357 if (!uuid_is_zero(&cmd->sid)) {
2358 raft_send_execute_command_reply(raft, &cmd->sid,
2359 &cmd->eid,
2360 RAFT_CMD_INCOMPLETE, 0);
2361 }
2362 }
2363 }
2364
2365 /* Initializes the fields in 's' that represent the leader's view of the
2366 * server. */
2367 static void
2368 raft_server_init_leader(struct raft *raft, struct raft_server *s)
2369 {
2370 s->next_index = raft->log_end;
2371 s->match_index = 0;
2372 s->phase = RAFT_PHASE_STABLE;
2373 }
2374
2375 static void
2376 raft_become_leader(struct raft *raft)
2377 {
2378 raft_complete_all_commands(raft, RAFT_CMD_LOST_LEADERSHIP);
2379
2380 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
2381 VLOG_INFO_RL(&rl, "term %"PRIu64": elected leader by %d+ of "
2382 "%"PRIuSIZE" servers", raft->term,
2383 raft->n_votes, hmap_count(&raft->servers));
2384
2385 ovs_assert(raft->role != RAFT_LEADER);
2386 raft->role = RAFT_LEADER;
2387 raft->leader_sid = raft->sid;
2388 raft->election_timeout = LLONG_MAX;
2389 raft->ping_timeout = time_msec() + PING_TIME_MSEC;
2390
2391 struct raft_server *s;
2392 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
2393 raft_server_init_leader(raft, s);
2394 }
2395
2396 raft_update_our_match_index(raft, raft->log_end - 1);
2397 raft_send_heartbeats(raft);
2398
2399 /* Write the fact that we are leader to the log. This is not used by the
2400 * algorithm (although it could be, for quick restart), but it is used for
2401 * offline analysis to check for conformance with the properties that Raft
2402 * guarantees. */
2403 struct raft_record r = {
2404 .type = RAFT_REC_LEADER,
2405 .term = raft->term,
2406 .sid = raft->sid,
2407 };
2408 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2409
2410 /* Initiate a no-op commit. Otherwise we might never find out what's in
2411 * the log. See section 6.4 item 1:
2412 *
2413 * The Leader Completeness Property guarantees that a leader has all
2414 * committed entries, but at the start of its term, it may not know
2415 * which those are. To find out, it needs to commit an entry from its
2416 * term. Raft handles this by having each leader commit a blank no-op
2417 * entry into the log at the start of its term. As soon as this no-op
2418 * entry is committed, the leader’s commit index will be at least as
2419 * large as any other servers’ during its term.
2420 */
2421 raft_command_unref(raft_command_execute__(raft, NULL, NULL, NULL, NULL));
2422 }
2423
2424 /* Processes term 'term' received as part of RPC 'common'. Returns true if the
2425 * caller should continue processing the RPC, false if the caller should reject
2426 * it due to a stale term. */
2427 static bool
2428 raft_receive_term__(struct raft *raft, const struct raft_rpc_common *common,
2429 uint64_t term)
2430 {
2431 /* Section 3.3 says:
2432 *
2433 * Current terms are exchanged whenever servers communicate; if one
2434 * server’s current term is smaller than the other’s, then it updates
2435 * its current term to the larger value. If a candidate or leader
2436 * discovers that its term is out of date, it immediately reverts to
2437 * follower state. If a server receives a request with a stale term
2438 * number, it rejects the request.
2439 */
2440 if (term > raft->term) {
2441 if (!raft_set_term(raft, term, NULL)) {
2442 /* Failed to update the term to 'term'. */
2443 return false;
2444 }
2445 raft_become_follower(raft);
2446 } else if (term < raft->term) {
2447 char buf[SID_LEN + 1];
2448 VLOG_INFO("rejecting term %"PRIu64" < current term %"PRIu64" received "
2449 "in %s message from server %s",
2450 term, raft->term,
2451 raft_rpc_type_to_string(common->type),
2452 raft_get_nickname(raft, &common->sid, buf, sizeof buf));
2453 return false;
2454 }
2455 return true;
2456 }
2457
2458 static void
2459 raft_get_servers_from_log(struct raft *raft, enum vlog_level level)
2460 {
2461 const struct json *servers_json = raft->snap.servers;
2462 for (uint64_t index = raft->log_end - 1; index >= raft->log_start;
2463 index--) {
2464 struct raft_entry *e = &raft->entries[index - raft->log_start];
2465 if (e->servers) {
2466 servers_json = e->servers;
2467 break;
2468 }
2469 }
2470
2471 struct hmap servers;
2472 struct ovsdb_error *error = raft_servers_from_json(servers_json, &servers);
2473 ovs_assert(!error);
2474 raft_set_servers(raft, &servers, level);
2475 raft_servers_destroy(&servers);
2476 }
2477
2478 /* Truncates the log, so that raft->log_end becomes 'new_end'.
2479 *
2480 * Doesn't write anything to disk. In theory, we could truncate the on-disk
2481 * log file, but we don't have the right information to know how long it should
2482 * be. What we actually do is to append entries for older indexes to the
2483 * on-disk log; when we re-read it later, these entries truncate the log.
2484 *
2485 * Returns true if any of the removed log entries were server configuration
2486 * entries, false otherwise. */
2487 static bool
2488 raft_truncate(struct raft *raft, uint64_t new_end)
2489 {
2490 ovs_assert(new_end >= raft->log_start);
2491 if (raft->log_end > new_end) {
2492 char buf[SID_LEN + 1];
2493 VLOG_INFO("%s truncating %"PRIu64 " entries from end of log",
2494 raft_get_nickname(raft, &raft->sid, buf, sizeof buf),
2495 raft->log_end - new_end);
2496 }
2497
2498 bool servers_changed = false;
2499 while (raft->log_end > new_end) {
2500 struct raft_entry *entry = &raft->entries[--raft->log_end
2501 - raft->log_start];
2502 if (entry->servers) {
2503 servers_changed = true;
2504 }
2505 raft_entry_uninit(entry);
2506 }
2507 return servers_changed;
2508 }
2509
2510 static const struct json *
2511 raft_peek_next_entry(struct raft *raft, struct uuid *eid)
2512 {
2513 /* Invariant: log_start - 2 <= last_applied <= commit_index < log_end. */
2514 ovs_assert(raft->log_start <= raft->last_applied + 2);
2515 ovs_assert(raft->last_applied <= raft->commit_index);
2516 ovs_assert(raft->commit_index < raft->log_end);
2517
2518 if (raft->joining || raft->failed) {
2519 return NULL;
2520 }
2521
2522 if (raft->log_start == raft->last_applied + 2) {
2523 *eid = raft->snap.eid;
2524 return raft->snap.data;
2525 }
2526
2527 while (raft->last_applied < raft->commit_index) {
2528 const struct raft_entry *e = raft_get_entry(raft,
2529 raft->last_applied + 1);
2530 if (e->data) {
2531 *eid = e->eid;
2532 return e->data;
2533 }
2534 raft->last_applied++;
2535 }
2536 return NULL;
2537 }
2538
2539 static const struct json *
2540 raft_get_next_entry(struct raft *raft, struct uuid *eid)
2541 {
2542 const struct json *data = raft_peek_next_entry(raft, eid);
2543 if (data) {
2544 raft->last_applied++;
2545 }
2546 return data;
2547 }
2548
2549 static void
2550 raft_update_commit_index(struct raft *raft, uint64_t new_commit_index)
2551 {
2552 if (new_commit_index <= raft->commit_index) {
2553 return;
2554 }
2555
2556 if (raft->role == RAFT_LEADER) {
2557 while (raft->commit_index < new_commit_index) {
2558 uint64_t index = ++raft->commit_index;
2559 const struct raft_entry *e = raft_get_entry(raft, index);
2560 if (e->servers) {
2561 raft_run_reconfigure(raft);
2562 }
2563 if (e->data) {
2564 struct raft_command *cmd
2565 = raft_find_command_by_index(raft, index);
2566 if (cmd) {
2567 raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
2568 }
2569 }
2570 }
2571 } else {
2572 raft->commit_index = new_commit_index;
2573 }
2574
2575 /* Write the commit index to the log. The next time we restart, this
2576 * allows us to start exporting a reasonably fresh log, instead of a log
2577 * that only contains the snapshot. */
2578 struct raft_record r = {
2579 .type = RAFT_REC_COMMIT_INDEX,
2580 .commit_index = raft->commit_index,
2581 };
2582 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2583 }
2584
2585 /* This doesn't use rq->entries (but it does use rq->n_entries). */
2586 static void
2587 raft_send_append_reply(struct raft *raft, const struct raft_append_request *rq,
2588 enum raft_append_result result, const char *comment)
2589 {
2590 /* Figure 3.1: "If leaderCommit > commitIndex, set commitIndex =
2591 * min(leaderCommit, index of last new entry)" */
2592 if (result == RAFT_APPEND_OK && rq->leader_commit > raft->commit_index) {
2593 raft_update_commit_index(
2594 raft, MIN(rq->leader_commit, rq->prev_log_index + rq->n_entries));
2595 }
2596
2597 /* Send reply. */
2598 union raft_rpc reply = {
2599 .append_reply = {
2600 .common = {
2601 .type = RAFT_RPC_APPEND_REPLY,
2602 .sid = rq->common.sid,
2603 .comment = CONST_CAST(char *, comment),
2604 },
2605 .term = raft->term,
2606 .log_end = raft->log_end,
2607 .prev_log_index = rq->prev_log_index,
2608 .prev_log_term = rq->prev_log_term,
2609 .n_entries = rq->n_entries,
2610 .result = result,
2611 }
2612 };
2613 raft_send(raft, &reply);
2614 }
2615
2616 /* If 'prev_log_index' exists in 'raft''s log, in term 'prev_log_term', returns
2617 * NULL. Otherwise, returns an explanation for the mismatch. */
2618 static const char *
2619 match_index_and_term(const struct raft *raft,
2620 uint64_t prev_log_index, uint64_t prev_log_term)
2621 {
2622 if (prev_log_index < raft->log_start - 1) {
2623 return "mismatch before start of log";
2624 } else if (prev_log_index == raft->log_start - 1) {
2625 if (prev_log_term != raft->snap.term) {
2626 return "prev_term mismatch";
2627 }
2628 } else if (prev_log_index < raft->log_end) {
2629 if (raft->entries[prev_log_index - raft->log_start].term
2630 != prev_log_term) {
2631 return "term mismatch";
2632 }
2633 } else {
2634 /* prev_log_index >= raft->log_end */
2635 return "mismatch past end of log";
2636 }
2637 return NULL;
2638 }
2639
2640 static void
2641 raft_handle_append_entries(struct raft *raft,
2642 const struct raft_append_request *rq,
2643 uint64_t prev_log_index, uint64_t prev_log_term,
2644 const struct raft_entry *entries,
2645 unsigned int n_entries)
2646 {
2647 /* Section 3.5: "When sending an AppendEntries RPC, the leader includes
2648 * the index and term of the entry in its log that immediately precedes
2649 * the new entries. If the follower does not find an entry in its log
2650 * with the same index and term, then it refuses the new entries." */
2651 const char *mismatch = match_index_and_term(raft, prev_log_index,
2652 prev_log_term);
2653 if (mismatch) {
2654 VLOG_INFO("rejecting append_request because previous entry "
2655 "%"PRIu64",%"PRIu64" not in local log (%s)",
2656 prev_log_term, prev_log_index, mismatch);
2657 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY, mismatch);
2658 return;
2659 }
2660
2661 /* Figure 3.1: "If an existing entry conflicts with a new one (same
2662 * index but different terms), delete the existing entry and all that
2663 * follow it." */
2664 unsigned int i;
2665 bool servers_changed = false;
2666 for (i = 0; ; i++) {
2667 if (i >= n_entries) {
2668 /* No change. */
2669 if (rq->common.comment
2670 && !strcmp(rq->common.comment, "heartbeat")) {
2671 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "heartbeat");
2672 } else {
2673 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
2674 }
2675 return;
2676 }
2677
2678 uint64_t log_index = (prev_log_index + 1) + i;
2679 if (log_index >= raft->log_end) {
2680 break;
2681 }
2682 if (raft->entries[log_index - raft->log_start].term
2683 != entries[i].term) {
2684 if (raft_truncate(raft, log_index)) {
2685 servers_changed = true;
2686 }
2687 break;
2688 }
2689 }
2690
2691 /* Figure 3.1: "Append any entries not already in the log." */
2692 struct ovsdb_error *error = NULL;
2693 bool any_written = false;
2694 for (; i < n_entries; i++) {
2695 const struct raft_entry *e = &entries[i];
2696 error = raft_write_entry(raft, e->term,
2697 json_nullable_clone(e->data), &e->eid,
2698 json_nullable_clone(e->servers));
2699 if (error) {
2700 break;
2701 }
2702 any_written = true;
2703 if (e->servers) {
2704 servers_changed = true;
2705 }
2706 }
2707
2708 if (any_written) {
2709 raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index
2710 = raft->log_end - 1;
2711 }
2712 if (servers_changed) {
2713 /* The set of servers might have changed; check. */
2714 raft_get_servers_from_log(raft, VLL_INFO);
2715 }
2716
2717 if (error) {
2718 char *s = ovsdb_error_to_string_free(error);
2719 VLOG_ERR("%s", s);
2720 free(s);
2721 raft_send_append_reply(raft, rq, RAFT_APPEND_IO_ERROR, "I/O error");
2722 return;
2723 }
2724
2725 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "log updated");
2726 }
2727
2728 static bool
2729 raft_update_leader(struct raft *raft, const struct uuid *sid)
2730 {
2731 if (raft->role == RAFT_LEADER) {
2732 char buf[SID_LEN + 1];
2733 VLOG_ERR("this server is leader but server %s claims to be",
2734 raft_get_nickname(raft, sid, buf, sizeof buf));
2735 return false;
2736 } else if (!uuid_equals(sid, &raft->leader_sid)) {
2737 if (!uuid_is_zero(&raft->leader_sid)) {
2738 char buf1[SID_LEN + 1];
2739 char buf2[SID_LEN + 1];
2740 VLOG_ERR("leader for term %"PRIu64" changed from %s to %s",
2741 raft->term,
2742 raft_get_nickname(raft, &raft->leader_sid,
2743 buf1, sizeof buf1),
2744 raft_get_nickname(raft, sid, buf2, sizeof buf2));
2745 } else {
2746 char buf[SID_LEN + 1];
2747 VLOG_INFO("server %s is leader for term %"PRIu64,
2748 raft_get_nickname(raft, sid, buf, sizeof buf),
2749 raft->term);
2750 }
2751 raft->leader_sid = *sid;
2752
2753 /* Record the leader to the log. This is not used by the algorithm
2754 * (although it could be, for quick restart), but it is used for
2755 * offline analysis to check for conformance with the properties
2756 * that Raft guarantees. */
2757 struct raft_record r = {
2758 .type = RAFT_REC_LEADER,
2759 .term = raft->term,
2760 .sid = *sid,
2761 };
2762 ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
2763 }
2764 return true;
2765 }
2766
2767 static void
2768 raft_handle_append_request(struct raft *raft,
2769 const struct raft_append_request *rq)
2770 {
2771 /* We do not check whether the server that sent the request is part of the
2772 * cluster. As section 4.1 says, "A server accepts AppendEntries requests
2773 * from a leader that is not part of the server’s latest configuration.
2774 * Otherwise, a new server could never be added to the cluster (it would
2775 * never accept any log entries preceding the configuration entry that adds
2776 * the server)." */
2777 if (!raft_update_leader(raft, &rq->common.sid)) {
2778 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
2779 "usurped leadership");
2780 return;
2781 }
2782 raft_reset_timer(raft);
2783
2784 /* First check for the common case, where the AppendEntries request is
2785 * entirely for indexes covered by 'log_start' ... 'log_end - 1', something
2786 * like this:
2787 *
2788 * rq->prev_log_index
2789 * | first_entry_index
2790 * | | nth_entry_index
2791 * | | |
2792 * v v v
2793 * +---+---+---+---+
2794 * T | T | T | T | T |
2795 * +---+-------+---+
2796 * +---+---+---+---+
2797 * T | T | T | T | T |
2798 * +---+---+---+---+
2799 * ^ ^
2800 * | |
2801 * log_start log_end
2802 * */
2803 uint64_t first_entry_index = rq->prev_log_index + 1;
2804 uint64_t nth_entry_index = rq->prev_log_index + rq->n_entries;
2805 if (OVS_LIKELY(first_entry_index >= raft->log_start)) {
2806 raft_handle_append_entries(raft, rq,
2807 rq->prev_log_index, rq->prev_log_term,
2808 rq->entries, rq->n_entries);
2809 return;
2810 }
2811
2812 /* Now a series of checks for odd cases, where the AppendEntries request
2813 * extends earlier than the beginning of our log, into the log entries
2814 * discarded by the most recent snapshot. */
2815
2816 /*
2817 * Handle the case where the indexes covered by rq->entries[] are entirely
2818 * disjoint with 'log_start - 1' ... 'log_end - 1', as shown below. So,
2819 * everything in the AppendEntries request must already have been
2820 * committed, and we might as well return true.
2821 *
2822 * rq->prev_log_index
2823 * | first_entry_index
2824 * | | nth_entry_index
2825 * | | |
2826 * v v v
2827 * +---+---+---+---+
2828 * T | T | T | T | T |
2829 * +---+-------+---+
2830 * +---+---+---+---+
2831 * T | T | T | T | T |
2832 * +---+---+---+---+
2833 * ^ ^
2834 * | |
2835 * log_start log_end
2836 */
2837 if (nth_entry_index < raft->log_start - 1) {
2838 raft_send_append_reply(raft, rq, RAFT_APPEND_OK,
2839 "append before log start");
2840 return;
2841 }
2842
2843 /*
2844 * Handle the case where the last entry in rq->entries[] has the same index
2845 * as 'log_start - 1', so we can compare their terms:
2846 *
2847 * rq->prev_log_index
2848 * | first_entry_index
2849 * | | nth_entry_index
2850 * | | |
2851 * v v v
2852 * +---+---+---+---+
2853 * T | T | T | T | T |
2854 * +---+-------+---+
2855 * +---+---+---+---+
2856 * T | T | T | T | T |
2857 * +---+---+---+---+
2858 * ^ ^
2859 * | |
2860 * log_start log_end
2861 *
2862 * There's actually a sub-case where rq->n_entries == 0, in which we
2863 * compare rq->prev_term:
2864 *
2865 * rq->prev_log_index
2866 * |
2867 * |
2868 * |
2869 * v
2870 * T
2871 *
2872 * +---+---+---+---+
2873 * T | T | T | T | T |
2874 * +---+---+---+---+
2875 * ^ ^
2876 * | |
2877 * log_start log_end
2878 */
2879 if (nth_entry_index == raft->log_start - 1) {
2880 if (rq->n_entries
2881 ? raft->snap.term == rq->entries[rq->n_entries - 1].term
2882 : raft->snap.term == rq->prev_log_term) {
2883 raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
2884 } else {
2885 raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
2886 "term mismatch");
2887 }
2888 return;
2889 }
2890
2891 /*
2892 * We now know that the data in rq->entries[] overlaps the data in
2893 * raft->entries[], as shown below, with some positive 'ofs':
2894 *
2895 * rq->prev_log_index
2896 * | first_entry_index
2897 * | | nth_entry_index
2898 * | | |
2899 * v v v
2900 * +---+---+---+---+---+
2901 * T | T | T | T | T | T |
2902 * +---+-------+---+---+
2903 * +---+---+---+---+
2904 * T | T | T | T | T |
2905 * +---+---+---+---+
2906 * ^ ^
2907 * | |
2908 * log_start log_end
2909 *
2910 * |<-- ofs -->|
2911 *
2912 * We transform this into the following by trimming the first 'ofs'
2913 * elements off of rq->entries[], ending up with the following. Notice how
2914 * we retain the term but not the data for rq->entries[ofs - 1]:
2915 *
2916 * first_entry_index + ofs - 1
2917 * | first_entry_index + ofs
2918 * | | nth_entry_index + ofs
2919 * | | |
2920 * v v v
2921 * +---+---+
2922 * T | T | T |
2923 * +---+---+
2924 * +---+---+---+---+
2925 * T | T | T | T | T |
2926 * +---+---+---+---+
2927 * ^ ^
2928 * | |
2929 * log_start log_end
2930 */
2931 uint64_t ofs = raft->log_start - first_entry_index;
2932 raft_handle_append_entries(raft, rq,
2933 raft->log_start - 1, rq->entries[ofs - 1].term,
2934 &rq->entries[ofs], rq->n_entries - ofs);
2935 }
2936
2937 /* Returns true if 'raft' has another log entry or snapshot to read. */
2938 bool
2939 raft_has_next_entry(const struct raft *raft_)
2940 {
2941 struct raft *raft = CONST_CAST(struct raft *, raft_);
2942 struct uuid eid;
2943 return raft_peek_next_entry(raft, &eid) != NULL;
2944 }
2945
2946 /* Returns the next log entry or snapshot from 'raft', or NULL if there are
2947 * none left to read. Stores the entry ID of the log entry in '*eid'. Stores
2948 * true in '*is_snapshot' if the returned data is a snapshot, false if it is a
2949 * log entry. */
2950 const struct json *
2951 raft_next_entry(struct raft *raft, struct uuid *eid, bool *is_snapshot)
2952 {
2953 const struct json *data = raft_get_next_entry(raft, eid);
2954 *is_snapshot = data == raft->snap.data;
2955 return data;
2956 }
2957
2958 /* Returns the log index of the last-read snapshot or log entry. */
2959 uint64_t
2960 raft_get_applied_index(const struct raft *raft)
2961 {
2962 return raft->last_applied;
2963 }
2964
2965 /* Returns the log index of the last snapshot or log entry that is available to
2966 * be read. */
2967 uint64_t
2968 raft_get_commit_index(const struct raft *raft)
2969 {
2970 return raft->commit_index;
2971 }
2972
2973 static struct raft_server *
2974 raft_find_peer(struct raft *raft, const struct uuid *uuid)
2975 {
2976 struct raft_server *s = raft_find_server(raft, uuid);
2977 return s && !uuid_equals(&raft->sid, &s->sid) ? s : NULL;
2978 }
2979
2980 static struct raft_server *
2981 raft_find_new_server(struct raft *raft, const struct uuid *uuid)
2982 {
2983 return raft_server_find(&raft->add_servers, uuid);
2984 }
2985
2986 /* Figure 3.1: "If there exists an N such that N > commitIndex, a
2987 * majority of matchIndex[i] >= N, and log[N].term == currentTerm, set
2988 * commitIndex = N (sections 3.5 and 3.6)." */
2989 static void
2990 raft_consider_updating_commit_index(struct raft *raft)
2991 {
2992 /* This loop cannot just bail out when it comes across a log entry that
2993 * does not match the criteria. For example, Figure 3.7(d2) shows a
2994 * case where the log entry for term 2 cannot be committed directly
2995 * (because it is not for the current term) but it can be committed as
2996 * a side effect of commit the entry for term 4 (the current term).
2997 * XXX Is there a more efficient way to do this? */
2998 ovs_assert(raft->role == RAFT_LEADER);
2999
3000 uint64_t new_commit_index = raft->commit_index;
3001 for (uint64_t idx = MAX(raft->commit_index + 1, raft->log_start);
3002 idx < raft->log_end; idx++) {
3003 if (raft->entries[idx - raft->log_start].term == raft->term) {
3004 size_t count = 0;
3005 struct raft_server *s2;
3006 HMAP_FOR_EACH (s2, hmap_node, &raft->servers) {
3007 if (s2->match_index >= idx) {
3008 count++;
3009 }
3010 }
3011 if (count > hmap_count(&raft->servers) / 2) {
3012 VLOG_DBG("index %"PRIu64" committed to %"PRIuSIZE" servers, "
3013 "applying", idx, count);
3014 new_commit_index = idx;
3015 }
3016 }
3017 }
3018 raft_update_commit_index(raft, new_commit_index);
3019 }
3020
3021 static void
3022 raft_update_match_index(struct raft *raft, struct raft_server *s,
3023 uint64_t min_index)
3024 {
3025 ovs_assert(raft->role == RAFT_LEADER);
3026 if (min_index > s->match_index) {
3027 s->match_index = min_index;
3028 raft_consider_updating_commit_index(raft);
3029 }
3030 }
3031
3032 static void
3033 raft_update_our_match_index(struct raft *raft, uint64_t min_index)
3034 {
3035 raft_update_match_index(raft, raft_find_server(raft, &raft->sid),
3036 min_index);
3037 }
3038
3039 static void
3040 raft_send_install_snapshot_request(struct raft *raft,
3041 const struct raft_server *s,
3042 const char *comment)
3043 {
3044 union raft_rpc rpc = {
3045 .install_snapshot_request = {
3046 .common = {
3047 .type = RAFT_RPC_INSTALL_SNAPSHOT_REQUEST,
3048 .sid = s->sid,
3049 .comment = CONST_CAST(char *, comment),
3050 },
3051 .term = raft->term,
3052 .last_index = raft->log_start - 1,
3053 .last_term = raft->snap.term,
3054 .last_servers = raft->snap.servers,
3055 .last_eid = raft->snap.eid,
3056 .data = raft->snap.data,
3057 }
3058 };
3059 raft_send(raft, &rpc);
3060 }
3061
3062 static void
3063 raft_handle_append_reply(struct raft *raft,
3064 const struct raft_append_reply *rpy)
3065 {
3066 if (raft->role != RAFT_LEADER) {
3067 VLOG_INFO("rejected append_reply (not leader)");
3068 return;
3069 }
3070
3071 /* Most commonly we'd be getting an AppendEntries reply from a configured
3072 * server (e.g. a peer), but we can also get them from servers in the
3073 * process of being added. */
3074 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3075 if (!s) {
3076 s = raft_find_new_server(raft, &rpy->common.sid);
3077 if (!s) {
3078 VLOG_INFO("rejected append_reply from unknown server "SID_FMT,
3079 SID_ARGS(&rpy->common.sid));
3080 return;
3081 }
3082 }
3083
3084 if (rpy->result == RAFT_APPEND_OK) {
3085 /* Figure 3.1: "If successful, update nextIndex and matchIndex for
3086 * follower (section 3.5)." */
3087 uint64_t min_index = rpy->prev_log_index + rpy->n_entries + 1;
3088 if (s->next_index < min_index) {
3089 s->next_index = min_index;
3090 }
3091 raft_update_match_index(raft, s, min_index - 1);
3092 } else {
3093 /* Figure 3.1: "If AppendEntries fails because of log inconsistency,
3094 * decrement nextIndex and retry (section 3.5)."
3095 *
3096 * We also implement the optimization suggested in section 4.2.1:
3097 * "Various approaches can make nextIndex converge to its correct value
3098 * more quickly, including those described in Chapter 3. The simplest
3099 * approach to solving this particular problem of adding a new server,
3100 * however, is to have followers return the length of their logs in the
3101 * AppendEntries response; this allows the leader to cap the follower’s
3102 * nextIndex accordingly." */
3103 s->next_index = (s->next_index > 0
3104 ? MIN(s->next_index - 1, rpy->log_end)
3105 : 0);
3106
3107 if (rpy->result == RAFT_APPEND_IO_ERROR) {
3108 /* Append failed but not because of a log inconsistency. Because
3109 * of the I/O error, there's no point in re-sending the append
3110 * immediately. */
3111 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3112 VLOG_INFO_RL(&rl, "%s reported I/O error", s->nickname);
3113 return;
3114 }
3115 }
3116
3117 /*
3118 * Our behavior here must depend on the value of next_index relative to
3119 * log_start and log_end. There are three cases:
3120 *
3121 * Case 1 | Case 2 | Case 3
3122 * <---------------->|<------------->|<------------------>
3123 * | |
3124 *
3125 * +---+---+---+---+
3126 * T | T | T | T | T |
3127 * +---+---+---+---+
3128 * ^ ^
3129 * | |
3130 * log_start log_end
3131 */
3132 if (s->next_index < raft->log_start) {
3133 /* Case 1. */
3134 raft_send_install_snapshot_request(raft, s, NULL);
3135 } else if (s->next_index < raft->log_end) {
3136 /* Case 2. */
3137 raft_send_append_request(raft, s, 1, NULL);
3138 } else {
3139 /* Case 3. */
3140 if (s->phase == RAFT_PHASE_CATCHUP) {
3141 s->phase = RAFT_PHASE_CAUGHT_UP;
3142 raft_run_reconfigure(raft);
3143 }
3144 }
3145 }
3146
3147 static bool
3148 raft_should_suppress_disruptive_server(struct raft *raft,
3149 const union raft_rpc *rpc)
3150 {
3151 if (rpc->type != RAFT_RPC_VOTE_REQUEST) {
3152 return false;
3153 }
3154
3155 /* Section 4.2.3 "Disruptive Servers" says:
3156 *
3157 * ...if a server receives a RequestVote request within the minimum
3158 * election timeout of hearing from a current leader, it does not update
3159 * its term or grant its vote...
3160 *
3161 * ...This change conflicts with the leadership transfer mechanism as
3162 * described in Chapter 3, in which a server legitimately starts an
3163 * election without waiting an election timeout. In that case,
3164 * RequestVote messages should be processed by other servers even when
3165 * they believe a current cluster leader exists. Those RequestVote
3166 * requests can include a special flag to indicate this behavior (“I
3167 * have permission to disrupt the leader--it told me to!”).
3168 *
3169 * This clearly describes how the followers should act, but not the leader.
3170 * We just ignore vote requests that arrive at a current leader. This
3171 * seems to be fairly safe, since a majority other than the current leader
3172 * can still elect a new leader and the first AppendEntries from that new
3173 * leader will depose the current leader. */
3174 const struct raft_vote_request *rq = raft_vote_request_cast(rpc);
3175 if (rq->leadership_transfer) {
3176 return false;
3177 }
3178
3179 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3180 long long int now = time_msec();
3181 switch (raft->role) {
3182 case RAFT_LEADER:
3183 VLOG_WARN_RL(&rl, "ignoring vote request received as leader");
3184 return true;
3185
3186 case RAFT_FOLLOWER:
3187 if (now < raft->election_base + ELECTION_BASE_MSEC) {
3188 VLOG_WARN_RL(&rl, "ignoring vote request received after only "
3189 "%lld ms (minimum election time is %d ms)",
3190 now - raft->election_base, ELECTION_BASE_MSEC);
3191 return true;
3192 }
3193 return false;
3194
3195 case RAFT_CANDIDATE:
3196 return false;
3197
3198 default:
3199 OVS_NOT_REACHED();
3200 }
3201 }
3202
3203 /* Returns true if a reply should be sent. */
3204 static bool
3205 raft_handle_vote_request__(struct raft *raft,
3206 const struct raft_vote_request *rq)
3207 {
3208 /* Figure 3.1: "If votedFor is null or candidateId, and candidate's vote is
3209 * at least as up-to-date as receiver's log, grant vote (sections 3.4,
3210 * 3.6)." */
3211 if (uuid_equals(&raft->vote, &rq->common.sid)) {
3212 /* Already voted for this candidate in this term. Resend vote. */
3213 return true;
3214 } else if (!uuid_is_zero(&raft->vote)) {
3215 /* Already voted for different candidate in this term. Send a reply
3216 * saying what candidate we did vote for. This isn't a necessary part
3217 * of the Raft protocol but it can make debugging easier. */
3218 return true;
3219 }
3220
3221 /* Section 3.6.1: "The RequestVote RPC implements this restriction: the RPC
3222 * includes information about the candidate’s log, and the voter denies its
3223 * vote if its own log is more up-to-date than that of the candidate. Raft
3224 * determines which of two logs is more up-to-date by comparing the index
3225 * and term of the last entries in the logs. If the logs have last entries
3226 * with different terms, then the log with the later term is more
3227 * up-to-date. If the logs end with the same term, then whichever log is
3228 * longer is more up-to-date." */
3229 uint64_t last_term = (raft->log_end > raft->log_start
3230 ? raft->entries[raft->log_end - 1
3231 - raft->log_start].term
3232 : raft->snap.term);
3233 if (last_term > rq->last_log_term
3234 || (last_term == rq->last_log_term
3235 && raft->log_end - 1 > rq->last_log_index)) {
3236 /* Our log is more up-to-date than the peer's. Withhold vote. */
3237 return false;
3238 }
3239
3240 /* Record a vote for the peer. */
3241 if (!raft_set_term(raft, raft->term, &rq->common.sid)) {
3242 return false;
3243 }
3244
3245 raft_reset_timer(raft);
3246
3247 return true;
3248 }
3249
3250 static void
3251 raft_send_vote_reply(struct raft *raft, const struct uuid *dst,
3252 const struct uuid *vote)
3253 {
3254 union raft_rpc rpy = {
3255 .vote_reply = {
3256 .common = {
3257 .type = RAFT_RPC_VOTE_REPLY,
3258 .sid = *dst,
3259 },
3260 .term = raft->term,
3261 .vote = *vote,
3262 },
3263 };
3264 raft_send(raft, &rpy);
3265 }
3266
3267 static void
3268 raft_handle_vote_request(struct raft *raft,
3269 const struct raft_vote_request *rq)
3270 {
3271 if (raft_handle_vote_request__(raft, rq)) {
3272 raft_send_vote_reply(raft, &rq->common.sid, &raft->vote);
3273 }
3274 }
3275
3276 static void
3277 raft_handle_vote_reply(struct raft *raft,
3278 const struct raft_vote_reply *rpy)
3279 {
3280 if (!raft_receive_term__(raft, &rpy->common, rpy->term)) {
3281 return;
3282 }
3283
3284 if (raft->role != RAFT_CANDIDATE) {
3285 return;
3286 }
3287
3288 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3289 if (s) {
3290 raft_accept_vote(raft, s, &rpy->vote);
3291 }
3292 }
3293
3294 /* Returns true if 'raft''s log contains reconfiguration entries that have not
3295 * yet been committed. */
3296 static bool
3297 raft_has_uncommitted_configuration(const struct raft *raft)
3298 {
3299 for (uint64_t i = raft->commit_index + 1; i < raft->log_end; i++) {
3300 ovs_assert(i >= raft->log_start);
3301 const struct raft_entry *e = &raft->entries[i - raft->log_start];
3302 if (e->servers) {
3303 return true;
3304 }
3305 }
3306 return false;
3307 }
3308
3309 static void
3310 raft_log_reconfiguration(struct raft *raft)
3311 {
3312 struct json *servers_json = raft_servers_to_json(&raft->servers);
3313 raft_command_unref(raft_command_execute__(
3314 raft, NULL, servers_json, NULL, NULL));
3315 json_destroy(servers_json);
3316 }
3317
3318 static void
3319 raft_run_reconfigure(struct raft *raft)
3320 {
3321 ovs_assert(raft->role == RAFT_LEADER);
3322
3323 /* Reconfiguration only progresses when configuration changes commit. */
3324 if (raft_has_uncommitted_configuration(raft)) {
3325 return;
3326 }
3327
3328 /* If we were waiting for a configuration change to commit, it's done. */
3329 struct raft_server *s;
3330 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3331 if (s->phase == RAFT_PHASE_COMMITTING) {
3332 raft_send_add_server_reply__(raft, &s->sid, s->address,
3333 true, RAFT_SERVER_COMPLETED);
3334 s->phase = RAFT_PHASE_STABLE;
3335 }
3336 }
3337 if (raft->remove_server) {
3338 raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
3339 &raft->remove_server->requester_sid,
3340 raft->remove_server->requester_conn,
3341 true, RAFT_SERVER_COMPLETED);
3342 raft_server_destroy(raft->remove_server);
3343 raft->remove_server = NULL;
3344 }
3345
3346 /* If a new server is caught up, add it to the configuration. */
3347 HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
3348 if (s->phase == RAFT_PHASE_CAUGHT_UP) {
3349 /* Move 's' from 'raft->add_servers' to 'raft->servers'. */
3350 hmap_remove(&raft->add_servers, &s->hmap_node);
3351 hmap_insert(&raft->servers, &s->hmap_node, uuid_hash(&s->sid));
3352
3353 /* Mark 's' as waiting for commit. */
3354 s->phase = RAFT_PHASE_COMMITTING;
3355
3356 raft_log_reconfiguration(raft);
3357
3358 /* When commit completes we'll transition to RAFT_PHASE_STABLE and
3359 * send a RAFT_SERVER_OK reply. */
3360
3361 return;
3362 }
3363 }
3364
3365 /* Remove a server, if one is scheduled for removal. */
3366 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3367 if (s->phase == RAFT_PHASE_REMOVE) {
3368 hmap_remove(&raft->servers, &s->hmap_node);
3369 raft->remove_server = s;
3370
3371 raft_log_reconfiguration(raft);
3372
3373 return;
3374 }
3375 }
3376 }
3377
3378 static void
3379 raft_handle_add_server_request(struct raft *raft,
3380 const struct raft_add_server_request *rq)
3381 {
3382 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3383 if (raft->role != RAFT_LEADER) {
3384 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3385 return;
3386 }
3387
3388 /* Check for an existing server. */
3389 struct raft_server *s = raft_find_server(raft, &rq->common.sid);
3390 if (s) {
3391 /* If the server is scheduled to be removed, cancel it. */
3392 if (s->phase == RAFT_PHASE_REMOVE) {
3393 s->phase = RAFT_PHASE_STABLE;
3394 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_CANCELED);
3395 return;
3396 }
3397
3398 /* If the server is being added, then it's in progress. */
3399 if (s->phase != RAFT_PHASE_STABLE) {
3400 raft_send_add_server_reply(raft, rq,
3401 false, RAFT_SERVER_IN_PROGRESS);
3402 }
3403
3404 /* Nothing to do--server is already part of the configuration. */
3405 raft_send_add_server_reply(raft, rq,
3406 true, RAFT_SERVER_ALREADY_PRESENT);
3407 return;
3408 }
3409
3410 /* Check for a server being removed. */
3411 if (raft->remove_server
3412 && uuid_equals(&rq->common.sid, &raft->remove_server->sid)) {
3413 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3414 return;
3415 }
3416
3417 /* Check for a server already being added. */
3418 if (raft_find_new_server(raft, &rq->common.sid)) {
3419 raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_IN_PROGRESS);
3420 return;
3421 }
3422
3423 /* Add server to 'add_servers'. */
3424 s = raft_server_add(&raft->add_servers, &rq->common.sid, rq->address);
3425 raft_server_init_leader(raft, s);
3426 s->requester_sid = rq->common.sid;
3427 s->requester_conn = NULL;
3428 s->phase = RAFT_PHASE_CATCHUP;
3429
3430 /* Start sending the log. If this is the first time we've tried to add
3431 * this server, then this will quickly degenerate into an InstallSnapshot
3432 * followed by a series of AddEntries, but if it's a retry of an earlier
3433 * AddRequest that was interrupted (e.g. by a timeout or a loss of
3434 * leadership) then it will gracefully resume populating the log.
3435 *
3436 * See the last few paragraphs of section 4.2.1 for further insight. */
3437 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
3438 VLOG_INFO_RL(&rl,
3439 "starting to add server %s ("SID_FMT" at %s) "
3440 "to cluster "CID_FMT, s->nickname, SID_ARGS(&s->sid),
3441 rq->address, CID_ARGS(&raft->cid));
3442 raft_send_append_request(raft, s, 0, "initialize new server");
3443 }
3444
3445 static void
3446 raft_handle_add_server_reply(struct raft *raft,
3447 const struct raft_add_server_reply *rpy)
3448 {
3449 if (!raft->joining) {
3450 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3451 VLOG_WARN_RL(&rl, "received add_server_reply even though we're "
3452 "already part of the cluster");
3453 return;
3454 }
3455
3456 if (rpy->success) {
3457 raft->joining = false;
3458
3459 /* It is tempting, at this point, to check that this server is part of
3460 * the current configuration. However, this is not necessarily the
3461 * case, because the log entry that added this server to the cluster
3462 * might have been committed by a majority of the cluster that does not
3463 * include this one. This actually happens in testing. */
3464 } else {
3465 const char *address;
3466 SSET_FOR_EACH (address, &rpy->remote_addresses) {
3467 if (sset_add(&raft->remote_addresses, address)) {
3468 VLOG_INFO("%s: learned new server address for joining cluster",
3469 address);
3470 }
3471 }
3472 }
3473 }
3474
3475 /* This is called by raft_unixctl_kick() as well as via RPC. */
3476 static void
3477 raft_handle_remove_server_request(struct raft *raft,
3478 const struct raft_remove_server_request *rq)
3479 {
3480 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3481 if (raft->role != RAFT_LEADER) {
3482 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
3483 return;
3484 }
3485
3486 /* If the server to remove is currently waiting to be added, cancel it. */
3487 struct raft_server *target = raft_find_new_server(raft, &rq->sid);
3488 if (target) {
3489 raft_send_add_server_reply__(raft, &target->sid, target->address,
3490 false, RAFT_SERVER_CANCELED);
3491 hmap_remove(&raft->add_servers, &target->hmap_node);
3492 raft_server_destroy(target);
3493 return;
3494 }
3495
3496 /* If the server isn't configured, report that. */
3497 target = raft_find_server(raft, &rq->sid);
3498 if (!target) {
3499 raft_send_remove_server_reply(raft, rq,
3500 true, RAFT_SERVER_ALREADY_GONE);
3501 return;
3502 }
3503
3504 /* Check whether we're waiting for the addition of the server to commit. */
3505 if (target->phase == RAFT_PHASE_COMMITTING) {
3506 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
3507 return;
3508 }
3509
3510 /* Check whether the server is already scheduled for removal. */
3511 if (target->phase == RAFT_PHASE_REMOVE) {
3512 raft_send_remove_server_reply(raft, rq,
3513 false, RAFT_SERVER_IN_PROGRESS);
3514 return;
3515 }
3516
3517 /* Make sure that if we remove this server then that at least one other
3518 * server will be left. We don't count servers currently being added (in
3519 * 'add_servers') since those could fail. */
3520 struct raft_server *s;
3521 int n = 0;
3522 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
3523 if (s != target && s->phase != RAFT_PHASE_REMOVE) {
3524 n++;
3525 }
3526 }
3527 if (!n) {
3528 raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_EMPTY);
3529 return;
3530 }
3531
3532 /* Mark the server for removal. */
3533 target->phase = RAFT_PHASE_REMOVE;
3534 if (rq->requester_conn) {
3535 target->requester_sid = UUID_ZERO;
3536 unixctl_command_reply(rq->requester_conn, "started removal");
3537 } else {
3538 target->requester_sid = rq->common.sid;
3539 target->requester_conn = NULL;
3540 }
3541
3542 raft_run_reconfigure(raft);
3543 /* Operation in progress, reply will be sent later. */
3544 }
3545
3546 static void
3547 raft_handle_remove_server_reply(struct raft *raft,
3548 const struct raft_remove_server_reply *rpc)
3549 {
3550 if (rpc->success) {
3551 VLOG_INFO(SID_FMT": finished leaving cluster "CID_FMT,
3552 SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
3553
3554 raft_record_note(raft, "left", "this server left the cluster");
3555
3556 raft->leaving = false;
3557 raft->left = true;
3558 }
3559 }
3560
3561 static bool
3562 raft_handle_write_error(struct raft *raft, struct ovsdb_error *error)
3563 {
3564 if (error && !raft->failed) {
3565 raft->failed = true;
3566
3567 char *s = ovsdb_error_to_string_free(error);
3568 VLOG_WARN("%s: entering failure mode due to I/O error (%s)",
3569 raft->name, s);
3570 free(s);
3571 }
3572 return !raft->failed;
3573 }
3574
3575 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3576 raft_write_snapshot(struct raft *raft, struct ovsdb_log *log,
3577 uint64_t new_log_start,
3578 const struct raft_entry *new_snapshot)
3579 {
3580 struct raft_header h = {
3581 .sid = raft->sid,
3582 .cid = raft->cid,
3583 .name = raft->name,
3584 .local_address = raft->local_address,
3585 .snap_index = new_log_start - 1,
3586 .snap = *new_snapshot,
3587 };
3588 struct ovsdb_error *error = ovsdb_log_write_and_free(
3589 log, raft_header_to_json(&h));
3590 if (error) {
3591 return error;
3592 }
3593 ovsdb_log_mark_base(raft->log);
3594
3595 /* Write log records. */
3596 for (uint64_t index = new_log_start; index < raft->log_end; index++) {
3597 const struct raft_entry *e = &raft->entries[index - raft->log_start];
3598 struct raft_record r = {
3599 .type = RAFT_REC_ENTRY,
3600 .term = e->term,
3601 .entry = {
3602 .index = index,
3603 .data = e->data,
3604 .servers = e->servers,
3605 .eid = e->eid,
3606 },
3607 };
3608 error = ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3609 if (error) {
3610 return error;
3611 }
3612 }
3613
3614 /* Write term and vote (if any).
3615 *
3616 * The term is redundant if we wrote a log record for that term above. The
3617 * vote, if any, is never redundant.
3618 */
3619 error = raft_write_state(log, raft->term, &raft->vote);
3620 if (error) {
3621 return error;
3622 }
3623
3624 /* Write commit_index if it's beyond the new start of the log. */
3625 if (raft->commit_index >= new_log_start) {
3626 struct raft_record r = {
3627 .type = RAFT_REC_COMMIT_INDEX,
3628 .commit_index = raft->commit_index,
3629 };
3630 return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
3631 }
3632 return NULL;
3633 }
3634
3635 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3636 raft_save_snapshot(struct raft *raft,
3637 uint64_t new_start, const struct raft_entry *new_snapshot)
3638
3639 {
3640 struct ovsdb_log *new_log;
3641 struct ovsdb_error *error;
3642 error = ovsdb_log_replace_start(raft->log, &new_log);
3643 if (error) {
3644 return error;
3645 }
3646
3647 error = raft_write_snapshot(raft, new_log, new_start, new_snapshot);
3648 if (error) {
3649 ovsdb_log_replace_abort(new_log);
3650 return error;
3651 }
3652
3653 return ovsdb_log_replace_commit(raft->log, new_log);
3654 }
3655
3656 static bool
3657 raft_handle_install_snapshot_request__(
3658 struct raft *raft, const struct raft_install_snapshot_request *rq)
3659 {
3660 raft_reset_timer(raft);
3661
3662 /*
3663 * Our behavior here depend on new_log_start in the snapshot compared to
3664 * log_start and log_end. There are three cases:
3665 *
3666 * Case 1 | Case 2 | Case 3
3667 * <---------------->|<------------->|<------------------>
3668 * | |
3669 *
3670 * +---+---+---+---+
3671 * T | T | T | T | T |
3672 * +---+---+---+---+
3673 * ^ ^
3674 * | |
3675 * log_start log_end
3676 */
3677 uint64_t new_log_start = rq->last_index + 1;
3678 if (new_log_start < raft->log_start) {
3679 /* Case 1: The new snapshot covers less than our current one. Nothing
3680 * to do. */
3681 return true;
3682 } else if (new_log_start < raft->log_end) {
3683 /* Case 2: The new snapshot starts in the middle of our log. We could
3684 * discard the first 'new_log_start - raft->log_start' entries in the
3685 * log. But there's not much value in that, since snapshotting is
3686 * supposed to be a local decision. Just skip it. */
3687 return true;
3688 }
3689
3690 /* Case 3: The new snapshot starts past the end of our current log, so
3691 * discard all of our current log. */
3692 const struct raft_entry new_snapshot = {
3693 .term = rq->last_term,
3694 .data = rq->data,
3695 .eid = rq->last_eid,
3696 .servers = rq->last_servers,
3697 };
3698 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
3699 &new_snapshot);
3700 if (error) {
3701 char *error_s = ovsdb_error_to_string(error);
3702 VLOG_WARN("could not save snapshot: %s", error_s);
3703 free(error_s);
3704 return false;
3705 }
3706
3707 for (size_t i = 0; i < raft->log_end - raft->log_start; i++) {
3708 raft_entry_uninit(&raft->entries[i]);
3709 }
3710 raft->log_start = raft->log_end = new_log_start;
3711 raft->log_synced = raft->log_end - 1;
3712 raft->commit_index = raft->log_start - 1;
3713 if (raft->last_applied < raft->commit_index) {
3714 raft->last_applied = raft->log_start - 2;
3715 }
3716
3717 raft_entry_uninit(&raft->snap);
3718 raft_entry_clone(&raft->snap, &new_snapshot);
3719
3720 raft_get_servers_from_log(raft, VLL_INFO);
3721
3722 return true;
3723 }
3724
3725 static void
3726 raft_handle_install_snapshot_request(
3727 struct raft *raft, const struct raft_install_snapshot_request *rq)
3728 {
3729 if (raft_handle_install_snapshot_request__(raft, rq)) {
3730 union raft_rpc rpy = {
3731 .install_snapshot_reply = {
3732 .common = {
3733 .type = RAFT_RPC_INSTALL_SNAPSHOT_REPLY,
3734 .sid = rq->common.sid,
3735 },
3736 .term = raft->term,
3737 .last_index = rq->last_index,
3738 .last_term = rq->last_term,
3739 },
3740 };
3741 raft_send(raft, &rpy);
3742 }
3743 }
3744
3745 static void
3746 raft_handle_install_snapshot_reply(
3747 struct raft *raft, const struct raft_install_snapshot_reply *rpy)
3748 {
3749 /* We might get an InstallSnapshot reply from a configured server (e.g. a
3750 * peer) or a server in the process of being added. */
3751 struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
3752 if (!s) {
3753 s = raft_find_new_server(raft, &rpy->common.sid);
3754 if (!s) {
3755 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3756 VLOG_INFO_RL(&rl, "cluster "CID_FMT": received %s from "
3757 "unknown server "SID_FMT, CID_ARGS(&raft->cid),
3758 raft_rpc_type_to_string(rpy->common.type),
3759 SID_ARGS(&rpy->common.sid));
3760 return;
3761 }
3762 }
3763
3764 if (rpy->last_index != raft->log_start - 1 ||
3765 rpy->last_term != raft->snap.term) {
3766 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3767 VLOG_INFO_RL(&rl, "cluster "CID_FMT": server %s installed "
3768 "out-of-date snapshot, starting over",
3769 CID_ARGS(&raft->cid), s->nickname);
3770 raft_send_install_snapshot_request(raft, s,
3771 "installed obsolete snapshot");
3772 return;
3773 }
3774
3775 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
3776 VLOG_INFO_RL(&rl, "cluster "CID_FMT": installed snapshot on server %s "
3777 " up to %"PRIu64":%"PRIu64, CID_ARGS(&raft->cid),
3778 s->nickname, rpy->last_term, rpy->last_index);
3779 s->next_index = raft->log_end;
3780 raft_send_append_request(raft, s, 0, "snapshot installed");
3781 }
3782
3783 /* Returns true if 'raft' has grown enough since the last snapshot that
3784 * reducing the log to a snapshot would be valuable, false otherwise. */
3785 bool
3786 raft_grew_lots(const struct raft *raft)
3787 {
3788 return ovsdb_log_grew_lots(raft->log);
3789 }
3790
3791 /* Returns the number of log entries that could be trimmed off the on-disk log
3792 * by snapshotting. */
3793 uint64_t
3794 raft_get_log_length(const struct raft *raft)
3795 {
3796 return (raft->last_applied < raft->log_start
3797 ? 0
3798 : raft->last_applied - raft->log_start + 1);
3799 }
3800
3801 /* Returns true if taking a snapshot of 'raft', with raft_store_snapshot(), is
3802 * possible. */
3803 bool
3804 raft_may_snapshot(const struct raft *raft)
3805 {
3806 return (!raft->joining
3807 && !raft->leaving
3808 && !raft->left
3809 && !raft->failed
3810 && raft->last_applied >= raft->log_start);
3811 }
3812
3813 /* Replaces the log for 'raft', up to the last log entry read, by
3814 * 'new_snapshot_data'. Returns NULL if successful, otherwise an error that
3815 * the caller must eventually free.
3816 *
3817 * This function can only succeed if raft_may_snapshot() returns true. It is
3818 * only valuable to call it if raft_get_log_length() is significant and
3819 * especially if raft_grew_lots() returns true. */
3820 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
3821 raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
3822 {
3823 if (raft->joining) {
3824 return ovsdb_error(NULL,
3825 "cannot store a snapshot while joining cluster");
3826 } else if (raft->leaving) {
3827 return ovsdb_error(NULL,
3828 "cannot store a snapshot while leaving cluster");
3829 } else if (raft->left) {
3830 return ovsdb_error(NULL,
3831 "cannot store a snapshot after leaving cluster");
3832 } else if (raft->failed) {
3833 return ovsdb_error(NULL,
3834 "cannot store a snapshot following failure");
3835 }
3836
3837 if (raft->last_applied < raft->log_start) {
3838 return ovsdb_error(NULL, "not storing a duplicate snapshot");
3839 }
3840
3841 uint64_t new_log_start = raft->last_applied + 1;
3842 const struct raft_entry new_snapshot = {
3843 .term = raft_get_term(raft, new_log_start - 1),
3844 .data = CONST_CAST(struct json *, new_snapshot_data),
3845 .eid = *raft_get_eid(raft, new_log_start - 1),
3846 .servers = CONST_CAST(struct json *,
3847 raft_servers_for_index(raft, new_log_start - 1)),
3848 };
3849 struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
3850 &new_snapshot);
3851 if (error) {
3852 return error;
3853 }
3854
3855 raft->log_synced = raft->log_end - 1;
3856 raft_entry_uninit(&raft->snap);
3857 raft_entry_clone(&raft->snap, &new_snapshot);
3858 for (size_t i = 0; i < new_log_start - raft->log_start; i++) {
3859 raft_entry_uninit(&raft->entries[i]);
3860 }
3861 memmove(&raft->entries[0], &raft->entries[new_log_start - raft->log_start],
3862 (raft->log_end - new_log_start) * sizeof *raft->entries);
3863 raft->log_start = new_log_start;
3864 return NULL;
3865 }
3866
3867 static void
3868 raft_handle_become_leader(struct raft *raft,
3869 const struct raft_become_leader *rq)
3870 {
3871 if (raft->role == RAFT_FOLLOWER) {
3872 char buf[SID_LEN + 1];
3873 VLOG_INFO("received leadership transfer from %s in term %"PRIu64,
3874 raft_get_nickname(raft, &rq->common.sid, buf, sizeof buf),
3875 rq->term);
3876 raft_start_election(raft, true);
3877 }
3878 }
3879
3880 static void
3881 raft_send_execute_command_reply(struct raft *raft,
3882 const struct uuid *sid,
3883 const struct uuid *eid,
3884 enum raft_command_status status,
3885 uint64_t commit_index)
3886 {
3887 union raft_rpc rpc = {
3888 .execute_command_reply = {
3889 .common = {
3890 .type = RAFT_RPC_EXECUTE_COMMAND_REPLY,
3891 .sid = *sid,
3892 },
3893 .result = *eid,
3894 .status = status,
3895 .commit_index = commit_index,
3896 },
3897 };
3898 raft_send(raft, &rpc);
3899 }
3900
3901 static enum raft_command_status
3902 raft_handle_execute_command_request__(
3903 struct raft *raft, const struct raft_execute_command_request *rq)
3904 {
3905 if (raft->role != RAFT_LEADER) {
3906 return RAFT_CMD_NOT_LEADER;
3907 }
3908
3909 const struct uuid *current_eid = raft_current_eid(raft);
3910 if (!uuid_equals(&rq->prereq, current_eid)) {
3911 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3912 VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
3913 "prerequisite "UUID_FMT" in execute_command_request",
3914 UUID_ARGS(current_eid), UUID_ARGS(&rq->prereq));
3915 return RAFT_CMD_BAD_PREREQ;
3916 }
3917
3918 struct raft_command *cmd = raft_command_initiate(raft, rq->data,
3919 NULL, &rq->result);
3920 cmd->sid = rq->common.sid;
3921
3922 enum raft_command_status status = cmd->status;
3923 if (status != RAFT_CMD_INCOMPLETE) {
3924 raft_command_unref(cmd);
3925 }
3926 return status;
3927 }
3928
3929 static void
3930 raft_handle_execute_command_request(
3931 struct raft *raft, const struct raft_execute_command_request *rq)
3932 {
3933 enum raft_command_status status
3934 = raft_handle_execute_command_request__(raft, rq);
3935 if (status != RAFT_CMD_INCOMPLETE) {
3936 raft_send_execute_command_reply(raft, &rq->common.sid, &rq->result,
3937 status, 0);
3938 }
3939 }
3940
3941 static void
3942 raft_handle_execute_command_reply(
3943 struct raft *raft, const struct raft_execute_command_reply *rpy)
3944 {
3945 struct raft_command *cmd = raft_find_command_by_eid(raft, &rpy->result);
3946 if (!cmd) {
3947 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
3948 char buf[SID_LEN + 1];
3949 VLOG_INFO_RL(&rl,
3950 "%s received \"%s\" reply from %s for unknown command",
3951 raft->local_nickname,
3952 raft_command_status_to_string(rpy->status),
3953 raft_get_nickname(raft, &rpy->common.sid,
3954 buf, sizeof buf));
3955 return;
3956 }
3957
3958 if (rpy->status == RAFT_CMD_INCOMPLETE) {
3959 cmd->timestamp = time_msec();
3960 } else {
3961 cmd->index = rpy->commit_index;
3962 raft_command_complete(raft, cmd, rpy->status);
3963 }
3964 }
3965
3966 static void
3967 raft_handle_rpc(struct raft *raft, const union raft_rpc *rpc)
3968 {
3969 uint64_t term = raft_rpc_get_term(rpc);
3970 if (term
3971 && !raft_should_suppress_disruptive_server(raft, rpc)
3972 && !raft_receive_term__(raft, &rpc->common, term)) {
3973 if (rpc->type == RAFT_RPC_APPEND_REQUEST) {
3974 /* Section 3.3: "If a server receives a request with a stale term
3975 * number, it rejects the request." */
3976 raft_send_append_reply(raft, raft_append_request_cast(rpc),
3977 RAFT_APPEND_INCONSISTENCY, "stale term");
3978 }
3979 return;
3980 }
3981
3982 switch (rpc->type) {
3983 #define RAFT_RPC(ENUM, NAME) \
3984 case ENUM: \
3985 raft_handle_##NAME(raft, &rpc->NAME); \
3986 break;
3987 RAFT_RPC_TYPES
3988 #undef RAFT_RPC
3989 default:
3990 OVS_NOT_REACHED();
3991 }
3992 }
3993 \f
3994 static bool
3995 raft_rpc_is_heartbeat(const union raft_rpc *rpc)
3996 {
3997 return ((rpc->type == RAFT_RPC_APPEND_REQUEST
3998 || rpc->type == RAFT_RPC_APPEND_REPLY)
3999 && rpc->common.comment
4000 && !strcmp(rpc->common.comment, "heartbeat"));
4001 }
4002
4003 \f
4004 static bool
4005 raft_send__(struct raft *raft, const union raft_rpc *rpc,
4006 struct raft_conn *conn)
4007 {
4008 log_rpc(rpc, "-->", conn);
4009 return !jsonrpc_session_send(
4010 conn->js, raft_rpc_to_jsonrpc(&raft->cid, &raft->sid, rpc));
4011 }
4012
4013 static bool
4014 raft_is_rpc_synced(const struct raft *raft, const union raft_rpc *rpc)
4015 {
4016 uint64_t term = raft_rpc_get_term(rpc);
4017 uint64_t index = raft_rpc_get_min_sync_index(rpc);
4018 const struct uuid *vote = raft_rpc_get_vote(rpc);
4019
4020 return (term <= raft->synced_term
4021 && index <= raft->log_synced
4022 && (!vote || uuid_equals(vote, &raft->synced_vote)));
4023 }
4024
4025 static bool
4026 raft_send(struct raft *raft, const union raft_rpc *rpc)
4027 {
4028 const struct uuid *dst = &rpc->common.sid;
4029 if (uuid_equals(dst, &raft->sid)) {
4030 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4031 VLOG_WARN_RL(&rl, "attempting to send RPC to self");
4032 return false;
4033 }
4034
4035 struct raft_conn *conn = raft_find_conn_by_sid(raft, dst);
4036 if (!conn) {
4037 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
4038 char buf[SID_LEN + 1];
4039 VLOG_DBG_RL(&rl, "%s: no connection to %s, cannot send RPC",
4040 raft->local_nickname,
4041 raft_get_nickname(raft, dst, buf, sizeof buf));
4042 return false;
4043 }
4044
4045 if (!raft_is_rpc_synced(raft, rpc)) {
4046 raft_waiter_create(raft, RAFT_W_RPC, false)->rpc = raft_rpc_clone(rpc);
4047 return true;
4048 }
4049
4050 return raft_send__(raft, rpc, conn);
4051 }
4052 \f
4053 static struct raft *
4054 raft_lookup_by_name(const char *name)
4055 {
4056 struct raft *raft;
4057
4058 HMAP_FOR_EACH_WITH_HASH (raft, hmap_node, hash_string(name, 0),
4059 &all_rafts) {
4060 if (!strcmp(raft->name, name)) {
4061 return raft;
4062 }
4063 }
4064 return NULL;
4065 }
4066
4067 static void
4068 raft_unixctl_cid(struct unixctl_conn *conn,
4069 int argc OVS_UNUSED, const char *argv[],
4070 void *aux OVS_UNUSED)
4071 {
4072 struct raft *raft = raft_lookup_by_name(argv[1]);
4073 if (!raft) {
4074 unixctl_command_reply_error(conn, "unknown cluster");
4075 } else if (uuid_is_zero(&raft->cid)) {
4076 unixctl_command_reply_error(conn, "cluster id not yet known");
4077 } else {
4078 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->cid));
4079 unixctl_command_reply(conn, uuid);
4080 free(uuid);
4081 }
4082 }
4083
4084 static void
4085 raft_unixctl_sid(struct unixctl_conn *conn,
4086 int argc OVS_UNUSED, const char *argv[],
4087 void *aux OVS_UNUSED)
4088 {
4089 struct raft *raft = raft_lookup_by_name(argv[1]);
4090 if (!raft) {
4091 unixctl_command_reply_error(conn, "unknown cluster");
4092 } else {
4093 char *uuid = xasprintf(UUID_FMT, UUID_ARGS(&raft->sid));
4094 unixctl_command_reply(conn, uuid);
4095 free(uuid);
4096 }
4097 }
4098
4099 static void
4100 raft_put_sid(const char *title, const struct uuid *sid,
4101 const struct raft *raft, struct ds *s)
4102 {
4103 ds_put_format(s, "%s: ", title);
4104 if (uuid_equals(sid, &raft->sid)) {
4105 ds_put_cstr(s, "self");
4106 } else if (uuid_is_zero(sid)) {
4107 ds_put_cstr(s, "unknown");
4108 } else {
4109 char buf[SID_LEN + 1];
4110 ds_put_cstr(s, raft_get_nickname(raft, sid, buf, sizeof buf));
4111 }
4112 ds_put_char(s, '\n');
4113 }
4114
4115 static void
4116 raft_unixctl_status(struct unixctl_conn *conn,
4117 int argc OVS_UNUSED, const char *argv[],
4118 void *aux OVS_UNUSED)
4119 {
4120 struct raft *raft = raft_lookup_by_name(argv[1]);
4121 if (!raft) {
4122 unixctl_command_reply_error(conn, "unknown cluster");
4123 return;
4124 }
4125
4126 struct ds s = DS_EMPTY_INITIALIZER;
4127 ds_put_format(&s, "%s\n", raft->local_nickname);
4128 ds_put_format(&s, "Name: %s\n", raft->name);
4129 ds_put_format(&s, "Cluster ID: ");
4130 if (!uuid_is_zero(&raft->cid)) {
4131 ds_put_format(&s, CID_FMT" ("UUID_FMT")\n",
4132 CID_ARGS(&raft->cid), UUID_ARGS(&raft->cid));
4133 } else {
4134 ds_put_format(&s, "not yet known\n");
4135 }
4136 ds_put_format(&s, "Server ID: "SID_FMT" ("UUID_FMT")\n",
4137 SID_ARGS(&raft->sid), UUID_ARGS(&raft->sid));
4138 ds_put_format(&s, "Address: %s\n", raft->local_address);
4139 ds_put_format(&s, "Status: %s\n",
4140 raft->joining ? "joining cluster"
4141 : raft->leaving ? "leaving cluster"
4142 : raft->left ? "left cluster"
4143 : raft->failed ? "failed"
4144 : "cluster member");
4145 if (raft->joining) {
4146 ds_put_format(&s, "Remotes for joining:");
4147 const char *address;
4148 SSET_FOR_EACH (address, &raft->remote_addresses) {
4149 ds_put_format(&s, " %s", address);
4150 }
4151 ds_put_char(&s, '\n');
4152 }
4153 if (raft->role == RAFT_LEADER) {
4154 struct raft_server *as;
4155 HMAP_FOR_EACH (as, hmap_node, &raft->add_servers) {
4156 ds_put_format(&s, "Adding server %s ("SID_FMT" at %s) (%s)\n",
4157 as->nickname, SID_ARGS(&as->sid), as->address,
4158 raft_server_phase_to_string(as->phase));
4159 }
4160
4161 struct raft_server *rs = raft->remove_server;
4162 if (rs) {
4163 ds_put_format(&s, "Removing server %s ("SID_FMT" at %s) (%s)\n",
4164 rs->nickname, SID_ARGS(&rs->sid), rs->address,
4165 raft_server_phase_to_string(rs->phase));
4166 }
4167 }
4168
4169 ds_put_format(&s, "Role: %s\n",
4170 raft->role == RAFT_LEADER ? "leader"
4171 : raft->role == RAFT_CANDIDATE ? "candidate"
4172 : raft->role == RAFT_FOLLOWER ? "follower"
4173 : "<error>");
4174 ds_put_format(&s, "Term: %"PRIu64"\n", raft->term);
4175 raft_put_sid("Leader", &raft->leader_sid, raft, &s);
4176 raft_put_sid("Vote", &raft->vote, raft, &s);
4177 ds_put_char(&s, '\n');
4178
4179 ds_put_format(&s, "Log: [%"PRIu64", %"PRIu64"]\n",
4180 raft->log_start, raft->log_end);
4181
4182 uint64_t n_uncommitted = raft->log_end - raft->commit_index - 1;
4183 ds_put_format(&s, "Entries not yet committed: %"PRIu64"\n", n_uncommitted);
4184
4185 uint64_t n_unapplied = raft->log_end - raft->last_applied - 1;
4186 ds_put_format(&s, "Entries not yet applied: %"PRIu64"\n", n_unapplied);
4187
4188 const struct raft_conn *c;
4189 ds_put_cstr(&s, "Connections:");
4190 LIST_FOR_EACH (c, list_node, &raft->conns) {
4191 bool connected = jsonrpc_session_is_connected(c->js);
4192 ds_put_format(&s, " %s%s%s%s",
4193 connected ? "" : "(",
4194 c->incoming ? "<-" : "->", c->nickname,
4195 connected ? "" : ")");
4196 }
4197 ds_put_char(&s, '\n');
4198
4199 ds_put_cstr(&s, "Servers:\n");
4200 struct raft_server *server;
4201 HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
4202 ds_put_format(&s, " %s ("SID_FMT" at %s)",
4203 server->nickname,
4204 SID_ARGS(&server->sid), server->address);
4205 if (uuid_equals(&server->sid, &raft->sid)) {
4206 ds_put_cstr(&s, " (self)");
4207 }
4208 if (server->phase != RAFT_PHASE_STABLE) {
4209 ds_put_format (&s, " (%s)",
4210 raft_server_phase_to_string(server->phase));
4211 }
4212 if (raft->role == RAFT_CANDIDATE) {
4213 if (!uuid_is_zero(&server->vote)) {
4214 char buf[SID_LEN + 1];
4215 ds_put_format(&s, " (voted for %s)",
4216 raft_get_nickname(raft, &server->vote,
4217 buf, sizeof buf));
4218 }
4219 } else if (raft->role == RAFT_LEADER) {
4220 ds_put_format(&s, " next_index=%"PRIu64" match_index=%"PRIu64,
4221 server->next_index, server->match_index);
4222 }
4223 ds_put_char(&s, '\n');
4224 }
4225
4226 unixctl_command_reply(conn, ds_cstr(&s));
4227 ds_destroy(&s);
4228 }
4229
4230 static void
4231 raft_unixctl_leave__(struct unixctl_conn *conn, struct raft *raft)
4232 {
4233 if (raft_is_leaving(raft)) {
4234 unixctl_command_reply_error(conn,
4235 "already in progress leaving cluster");
4236 } else if (raft_is_joining(raft)) {
4237 unixctl_command_reply_error(conn,
4238 "can't leave while join in progress");
4239 } else if (raft_failed(raft)) {
4240 unixctl_command_reply_error(conn,
4241 "can't leave after failure");
4242 } else {
4243 raft_leave(raft);
4244 unixctl_command_reply(conn, NULL);
4245 }
4246 }
4247
4248 static void
4249 raft_unixctl_leave(struct unixctl_conn *conn, int argc OVS_UNUSED,
4250 const char *argv[], void *aux OVS_UNUSED)
4251 {
4252 struct raft *raft = raft_lookup_by_name(argv[1]);
4253 if (!raft) {
4254 unixctl_command_reply_error(conn, "unknown cluster");
4255 return;
4256 }
4257
4258 raft_unixctl_leave__(conn, raft);
4259 }
4260
4261 static struct raft_server *
4262 raft_lookup_server_best_match(struct raft *raft, const char *id)
4263 {
4264 struct raft_server *best = NULL;
4265 int best_score = -1;
4266 int n_best = 0;
4267
4268 struct raft_server *s;
4269 HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
4270 int score = (!strcmp(id, s->address)
4271 ? INT_MAX
4272 : uuid_is_partial_match(&s->sid, id));
4273 if (score > best_score) {
4274 best = s;
4275 best_score = score;
4276 n_best = 1;
4277 } else if (score == best_score) {
4278 n_best++;
4279 }
4280 }
4281 return n_best == 1 ? best : NULL;
4282 }
4283
4284 static void
4285 raft_unixctl_kick(struct unixctl_conn *conn, int argc OVS_UNUSED,
4286 const char *argv[], void *aux OVS_UNUSED)
4287 {
4288 const char *cluster_name = argv[1];
4289 const char *server_name = argv[2];
4290
4291 struct raft *raft = raft_lookup_by_name(cluster_name);
4292 if (!raft) {
4293 unixctl_command_reply_error(conn, "unknown cluster");
4294 return;
4295 }
4296
4297 struct raft_server *server = raft_lookup_server_best_match(raft,
4298 server_name);
4299 if (!server) {
4300 unixctl_command_reply_error(conn, "unknown server");
4301 return;
4302 }
4303
4304 if (uuid_equals(&server->sid, &raft->sid)) {
4305 raft_unixctl_leave__(conn, raft);
4306 } else if (raft->role == RAFT_LEADER) {
4307 const struct raft_remove_server_request rq = {
4308 .sid = server->sid,
4309 .requester_conn = conn,
4310 };
4311 raft_handle_remove_server_request(raft, &rq);
4312 } else {
4313 const union raft_rpc rpc = {
4314 .remove_server_request = {
4315 .common = {
4316 .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
4317 .sid = raft->leader_sid,
4318 .comment = "via unixctl"
4319 },
4320 .sid = server->sid,
4321 }
4322 };
4323 if (raft_send(raft, &rpc)) {
4324 unixctl_command_reply(conn, "sent removal request to leader");
4325 } else {
4326 unixctl_command_reply_error(conn,
4327 "failed to send removal request");
4328 }
4329 }
4330 }
4331
4332 static void
4333 raft_init(void)
4334 {
4335 static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
4336 if (!ovsthread_once_start(&once)) {
4337 return;
4338 }
4339 unixctl_command_register("cluster/cid", "DB", 1, 1,
4340 raft_unixctl_cid, NULL);
4341 unixctl_command_register("cluster/sid", "DB", 1, 1,
4342 raft_unixctl_sid, NULL);
4343 unixctl_command_register("cluster/status", "DB", 1, 1,
4344 raft_unixctl_status, NULL);
4345 unixctl_command_register("cluster/leave", "DB", 1, 1,
4346 raft_unixctl_leave, NULL);
4347 unixctl_command_register("cluster/kick", "DB SERVER", 2, 2,
4348 raft_unixctl_kick, NULL);
4349 ovsthread_once_done(&once);
4350 }