2 * Copyright (c) 2017, 2018 Nicira, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include "raft-private.h"
28 #include "openvswitch/dynamic-string.h"
29 #include "openvswitch/hmap.h"
30 #include "openvswitch/json.h"
31 #include "openvswitch/list.h"
32 #include "openvswitch/poll-loop.h"
33 #include "openvswitch/vlog.h"
34 #include "ovsdb-error.h"
35 #include "ovsdb-parser.h"
36 #include "ovsdb/log.h"
39 #include "socket-util.h"
VLOG_DEFINE_THIS_MODULE(raft);
/* Roles for a Raft server:
 *
 *    - Followers: Servers in touch with the current leader.
 *
 *    - Candidate: Servers unaware of a current leader and seeking election to
 *      leader.
 *
 *    - Leader: Handles all client requests.  At most one at a time.
 *
 * In normal operation there is exactly one leader and all of the other servers
 * are followers. */
66 /* A connection between this Raft server and another one. */
68 struct ovs_list list_node
; /* In struct raft's 'conns' list. */
69 struct jsonrpc_session
*js
; /* JSON-RPC connection. */
70 struct uuid sid
; /* This server's unique ID. */
71 char *nickname
; /* Short name for use in log messages. */
72 bool incoming
; /* True if incoming, false if outgoing. */
73 unsigned int js_seqno
; /* Seqno for noticing (re)connections. */
76 static void raft_conn_close(struct raft_conn
*);
78 /* A "command", that is, a request to append an entry to the log.
80 * The Raft specification only allows clients to issue commands to the leader.
81 * With this implementation, clients may issue a command on any server, which
82 * then relays the command to the leader if necessary.
84 * This structure is thus used in three cases:
86 * 1. We are the leader and the command was issued to us directly.
88 * 2. We are a follower and relayed the command to the leader.
90 * 3. We are the leader and a follower relayed the command to us.
94 struct hmap_node hmap_node
; /* In struct raft's 'commands' hmap. */
95 unsigned int n_refs
; /* Reference count. */
96 enum raft_command_status status
; /* Execution status. */
99 uint64_t index
; /* Index in log (0 if being relayed). */
102 struct uuid eid
; /* Entry ID of result. */
105 long long int timestamp
; /* Issue or last ping time, for expiration. */
108 struct uuid sid
; /* The follower (otherwise UUID_ZERO). */
111 static void raft_command_complete(struct raft
*, struct raft_command
*,
112 enum raft_command_status
);
114 static void raft_complete_all_commands(struct raft
*,
115 enum raft_command_status
);
117 /* Type of deferred action, see struct raft_waiter. */
118 enum raft_waiter_type
{
124 /* An action deferred until a log write commits to disk. */
126 struct ovs_list list_node
;
127 uint64_t commit_ticket
;
129 enum raft_waiter_type type
;
133 * Waits for a RAFT_REC_ENTRY write to our local log to commit. Upon
134 * completion, updates 'log_synced' to indicate that the new log entry
135 * or entries are committed and, if we are leader, also updates our
136 * local 'match_index'. */
143 * Waits for a RAFT_REC_TERM or RAFT_REC_VOTE record write to commit.
144 * Upon completion, updates 'synced_term' and 'synced_vote', which
145 * triggers sending RPCs deferred by the uncommitted 'term' and
154 * Sometimes, sending an RPC to a peer must be delayed until an entry,
155 * a term, or a vote mentioned in the RPC is synced to disk. This
156 * waiter keeps a copy of such an RPC until the previous waiters have
162 static struct raft_waiter
*raft_waiter_create(struct raft
*,
163 enum raft_waiter_type
,
165 static void raft_waiters_destroy(struct raft
*);
167 /* The Raft state machine. */
169 struct hmap_node hmap_node
; /* In 'all_rafts'. */
170 struct ovsdb_log
*log
;
172 /* Persistent derived state.
174 * This must be updated on stable storage before responding to RPCs. It can be
175 * derived from the header, snapshot, and log in 'log'. */
177 struct uuid cid
; /* Cluster ID (immutable for the cluster). */
178 struct uuid sid
; /* Server ID (immutable for the server). */
179 char *local_address
; /* Local address (immutable for the server). */
180 char *local_nickname
; /* Used for local server in log messages. */
181 char *name
; /* Schema name (immutable for the cluster). */
183 /* Contains "struct raft_server"s and represents the server configuration
184 * most recently added to 'log'. */
187 /* Persistent state on all servers.
189 * Must be updated on stable storage before responding to RPCs. */
191 /* Current term and the vote for that term. These might be on the way to
193 uint64_t term
; /* Initialized to 0 and only increases. */
194 struct uuid vote
; /* All-zeros if no vote yet in 'term'. */
196 /* The term and vote that have been synced to disk. */
197 uint64_t synced_term
;
198 struct uuid synced_vote
;
202 * A log entry with index 1 never really exists; the initial snapshot for a
203 * Raft is considered to include this index. The first real log entry has
206 * A new Raft instance contains an empty log: log_start=2, log_end=2.
207 * Over time, the log grows: log_start=2, log_end=N.
208 * At some point, the server takes a snapshot: log_start=N, log_end=N.
209 * The log continues to grow: log_start=N, log_end=N+1...
211 * Must be updated on stable storage before responding to RPCs. */
212 struct raft_entry
*entries
; /* Log entry i is in log[i - log_start]. */
213 uint64_t log_start
; /* Index of first entry in log. */
214 uint64_t log_end
; /* Index of last entry in log, plus 1. */
215 uint64_t log_synced
; /* Index of last synced entry. */
216 size_t allocated_log
; /* Allocated entries in 'log'. */
218 /* Snapshot state (see Figure 5.1)
220 * This is the state of the cluster as of the last discarded log entry,
221 * that is, at log index 'log_start - 1' (called prevIndex in Figure 5.1).
222 * Only committed log entries can be included in a snapshot. */
223 struct raft_entry snap
;
227 * The snapshot is always committed, but the rest of the log might not be yet.
228 * 'last_applied' tracks what entries have been passed to the client. If the
229 * client hasn't yet read the latest snapshot, then even the snapshot isn't
230 * applied yet. Thus, the invariants are different for these members:
232 * log_start - 2 <= last_applied <= commit_index < log_end.
233 * log_start - 1 <= commit_index < log_end.
236 enum raft_role role
; /* Current role. */
237 uint64_t commit_index
; /* Max log index known to be committed. */
238 uint64_t last_applied
; /* Max log index applied to state machine. */
239 struct uuid leader_sid
; /* Server ID of leader (zero, if unknown). */
241 /* Followers and candidates only. */
242 #define ELECTION_BASE_MSEC 1024
243 #define ELECTION_RANGE_MSEC 1024
244 long long int election_base
; /* Time of last heartbeat from leader. */
245 long long int election_timeout
; /* Time at which we start an election. */
247 /* Used for joining a cluster. */
248 bool joining
; /* Attempting to join the cluster? */
249 struct sset remote_addresses
; /* Addresses to try to find other servers. */
250 long long int join_timeout
; /* Time to re-send add server request. */
252 /* Used for leaving a cluster. */
253 bool leaving
; /* True if we are leaving the cluster. */
254 bool left
; /* True if we have finished leaving. */
255 long long int leave_timeout
; /* Time to re-send remove server request. */
258 bool failed
; /* True if unrecoverable error has occurred. */
260 /* File synchronization. */
261 struct ovs_list waiters
; /* Contains "struct raft_waiter"s. */
263 /* Network connections. */
264 struct pstream
*listener
; /* For connections from other Raft servers. */
265 long long int listen_backoff
; /* For retrying creating 'listener'. */
266 struct ovs_list conns
; /* Contains struct raft_conns. */
268 /* Leaders only. Reinitialized after becoming leader. */
269 struct hmap add_servers
; /* Contains "struct raft_server"s to add. */
270 struct raft_server
*remove_server
; /* Server being removed. */
271 struct hmap commands
; /* Contains "struct raft_command"s. */
272 #define PING_TIME_MSEC (ELECTION_BASE_MSEC / 3)
273 long long int ping_timeout
; /* Time at which to send a heartbeat */
275 /* Candidates only. Reinitialized at start of election. */
276 int n_votes
; /* Number of votes for me. */
279 /* All Raft structures. */
280 static struct hmap all_rafts
= HMAP_INITIALIZER(&all_rafts
);
282 static void raft_init(void);
284 static struct ovsdb_error
*raft_read_header(struct raft
*)
285 OVS_WARN_UNUSED_RESULT
;
287 static void raft_send_execute_command_reply(struct raft
*,
288 const struct uuid
*sid
,
289 const struct uuid
*eid
,
290 enum raft_command_status
,
291 uint64_t commit_index
);
293 static void raft_update_our_match_index(struct raft
*, uint64_t min_index
);
295 static void raft_send_remove_server_reply__(
296 struct raft
*, const struct uuid
*target_sid
,
297 const struct uuid
*requester_sid
, struct unixctl_conn
*requester_conn
,
298 bool success
, const char *comment
);
300 static void raft_server_init_leader(struct raft
*, struct raft_server
*);
302 static bool raft_rpc_is_heartbeat(const union raft_rpc
*);
303 static bool raft_is_rpc_synced(const struct raft
*, const union raft_rpc
*);
305 static void raft_handle_rpc(struct raft
*, const union raft_rpc
*);
306 static bool raft_send(struct raft
*, const union raft_rpc
*);
307 static bool raft_send__(struct raft
*, const union raft_rpc
*,
309 static void raft_send_append_request(struct raft
*,
310 struct raft_server
*, unsigned int n
,
311 const char *comment
);
313 static void raft_become_leader(struct raft
*);
314 static void raft_become_follower(struct raft
*);
315 static void raft_reset_timer(struct raft
*);
316 static void raft_send_heartbeats(struct raft
*);
317 static void raft_start_election(struct raft
*, bool leadership_transfer
);
318 static bool raft_truncate(struct raft
*, uint64_t new_end
);
319 static void raft_get_servers_from_log(struct raft
*, enum vlog_level
);
321 static bool raft_handle_write_error(struct raft
*, struct ovsdb_error
*);
323 static void raft_run_reconfigure(struct raft
*);
325 static struct raft_server
*
326 raft_find_server(const struct raft
*raft
, const struct uuid
*sid
)
328 return raft_server_find(&raft
->servers
, sid
);
332 raft_make_address_passive(const char *address_
)
334 if (!strncmp(address_
, "unix:", 5)) {
335 return xasprintf("p%s", address_
);
337 char *address
= xstrdup(address_
);
338 char *p
= strchr(address
, ':') + 1;
339 char *host
= inet_parse_token(&p
);
340 char *port
= inet_parse_token(&p
);
342 struct ds paddr
= DS_EMPTY_INITIALIZER
;
343 ds_put_format(&paddr
, "p%.3s:%s:", address
, port
);
344 if (strchr(host
, ':')) {
345 ds_put_format(&paddr
, "[%s]", host
);
347 ds_put_cstr(&paddr
, host
);
350 return ds_steal_cstr(&paddr
);
359 struct raft
*raft
= xzalloc(sizeof *raft
);
360 hmap_node_nullify(&raft
->hmap_node
);
361 hmap_init(&raft
->servers
);
362 raft
->log_start
= raft
->log_end
= 1;
363 raft
->role
= RAFT_FOLLOWER
;
364 sset_init(&raft
->remote_addresses
);
365 raft
->join_timeout
= LLONG_MAX
;
366 ovs_list_init(&raft
->waiters
);
367 raft
->listen_backoff
= LLONG_MIN
;
368 ovs_list_init(&raft
->conns
);
369 hmap_init(&raft
->add_servers
);
370 hmap_init(&raft
->commands
);
372 raft
->ping_timeout
= time_msec() + PING_TIME_MSEC
;
373 raft_reset_timer(raft
);
378 /* Creates an on-disk file that represents a new Raft cluster and initializes
379 * it to consist of a single server, the one on which this function is called.
381 * Creates the local copy of the cluster's log in 'file_name', which must not
382 * already exist. Gives it the name 'name', which should be the database
383 * schema name and which is used only to match up this database with the server
384 * added to the cluster later if the cluster ID is unavailable.
386 * The new server is located at 'local_address', which must take one of the
387 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
388 * square bracket enclosed IPv6 address and PORT is a TCP port number.
390 * This only creates the on-disk file. Use raft_open() to start operating the
393 * Returns null if successful, otherwise an ovsdb_error describing the
395 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
396 raft_create_cluster(const char *file_name
, const char *name
,
397 const char *local_address
, const struct json
*data
)
399 /* Parse and verify validity of the local address. */
400 struct ovsdb_error
*error
= raft_address_validate(local_address
);
405 /* Create log file. */
406 struct ovsdb_log
*log
;
407 error
= ovsdb_log_open(file_name
, RAFT_MAGIC
, OVSDB_LOG_CREATE_EXCL
,
413 /* Write log file. */
414 struct raft_header h
= {
415 .sid
= uuid_random(),
416 .cid
= uuid_random(),
417 .name
= xstrdup(name
),
418 .local_address
= xstrdup(local_address
),
420 .remote_addresses
= SSET_INITIALIZER(&h
.remote_addresses
),
424 .data
= json_nullable_clone(data
),
425 .eid
= uuid_random(),
426 .servers
= json_object_create(),
429 shash_add_nocopy(json_object(h
.snap
.servers
),
430 xasprintf(UUID_FMT
, UUID_ARGS(&h
.sid
)),
431 json_string_create(local_address
));
432 error
= ovsdb_log_write_and_free(log
, raft_header_to_json(&h
));
433 raft_header_uninit(&h
);
435 error
= ovsdb_log_commit_block(log
);
437 ovsdb_log_close(log
);
442 /* Creates a database file that represents a new server in an existing Raft
445 * Creates the local copy of the cluster's log in 'file_name', which must not
446 * already exist. Gives it the name 'name', which must be the same name
447 * passed in to raft_create_cluster() earlier.
449 * 'cid' is optional. If specified, the new server will join only the cluster
450 * with the given cluster ID.
452 * The new server is located at 'local_address', which must take one of the
453 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
454 * square bracket enclosed IPv6 address and PORT is a TCP port number.
456 * Joining the cluster requires contacting it. Thus, 'remote_addresses'
457 * specifies the addresses of existing servers in the cluster. One server out
458 * of the existing cluster is sufficient, as long as that server is reachable
459 * and not partitioned from the current cluster leader. If multiple servers
460 * from the cluster are specified, then it is sufficient for any of them to
461 * meet this criterion.
463 * This only creates the on-disk file and does no network access. Use
464 * raft_open() to start operating the new server. (Until this happens, the
465 * new server has not joined the cluster.)
467 * Returns null if successful, otherwise an ovsdb_error describing the
469 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
470 raft_join_cluster(const char *file_name
,
471 const char *name
, const char *local_address
,
472 const struct sset
*remote_addresses
,
473 const struct uuid
*cid
)
475 ovs_assert(!sset_is_empty(remote_addresses
));
477 /* Parse and verify validity of the addresses. */
478 struct ovsdb_error
*error
= raft_address_validate(local_address
);
483 SSET_FOR_EACH (addr
, remote_addresses
) {
484 error
= raft_address_validate(addr
);
488 if (!strcmp(addr
, local_address
)) {
489 return ovsdb_error(NULL
, "remote addresses cannot be the same "
490 "as the local address");
494 /* Verify validity of the cluster ID (if provided). */
495 if (cid
&& uuid_is_zero(cid
)) {
496 return ovsdb_error(NULL
, "all-zero UUID is not valid cluster ID");
499 /* Create log file. */
500 struct ovsdb_log
*log
;
501 error
= ovsdb_log_open(file_name
, RAFT_MAGIC
, OVSDB_LOG_CREATE_EXCL
,
507 /* Write log file. */
508 struct raft_header h
= {
509 .sid
= uuid_random(),
510 .cid
= cid
? *cid
: UUID_ZERO
,
511 .name
= xstrdup(name
),
512 .local_address
= xstrdup(local_address
),
514 /* No snapshot yet. */
516 sset_clone(&h
.remote_addresses
, remote_addresses
);
517 error
= ovsdb_log_write_and_free(log
, raft_header_to_json(&h
));
518 raft_header_uninit(&h
);
520 error
= ovsdb_log_commit_block(log
);
522 ovsdb_log_close(log
);
527 /* Reads the initial header record from 'log', which must be a Raft clustered
528 * database log, and populates '*md' with the information read from it. The
529 * caller must eventually destroy 'md' with raft_metadata_destroy().
531 * On success, returns NULL. On failure, returns an error that the caller must
532 * eventually destroy and zeros '*md'. */
533 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
534 raft_read_metadata(struct ovsdb_log
*log
, struct raft_metadata
*md
)
536 struct raft
*raft
= raft_alloc();
539 struct ovsdb_error
*error
= raft_read_header(raft
);
542 md
->name
= xstrdup(raft
->name
);
543 md
->local
= xstrdup(raft
->local_address
);
546 memset(md
, 0, sizeof *md
);
554 /* Frees the metadata in 'md'. */
556 raft_metadata_destroy(struct raft_metadata
*md
)
564 static const struct raft_entry
*
565 raft_get_entry(const struct raft
*raft
, uint64_t index
)
567 ovs_assert(index
>= raft
->log_start
);
568 ovs_assert(index
< raft
->log_end
);
569 return &raft
->entries
[index
- raft
->log_start
];
573 raft_get_term(const struct raft
*raft
, uint64_t index
)
575 return (index
== raft
->log_start
- 1
577 : raft_get_entry(raft
, index
)->term
);
580 static const struct json
*
581 raft_servers_for_index(const struct raft
*raft
, uint64_t index
)
583 ovs_assert(index
>= raft
->log_start
- 1);
584 ovs_assert(index
< raft
->log_end
);
586 const struct json
*servers
= raft
->snap
.servers
;
587 for (uint64_t i
= raft
->log_start
; i
<= index
; i
++) {
588 const struct raft_entry
*e
= raft_get_entry(raft
, i
);
590 servers
= e
->servers
;
597 raft_set_servers(struct raft
*raft
, const struct hmap
*new_servers
,
598 enum vlog_level level
)
600 struct raft_server
*s
, *next
;
601 HMAP_FOR_EACH_SAFE (s
, next
, hmap_node
, &raft
->servers
) {
602 if (!raft_server_find(new_servers
, &s
->sid
)) {
603 ovs_assert(s
!= raft
->remove_server
);
605 hmap_remove(&raft
->servers
, &s
->hmap_node
);
606 VLOG(level
, "server %s removed from configuration", s
->nickname
);
607 raft_server_destroy(s
);
611 HMAP_FOR_EACH_SAFE (s
, next
, hmap_node
, new_servers
) {
612 if (!raft_find_server(raft
, &s
->sid
)) {
613 VLOG(level
, "server %s added to configuration", s
->nickname
);
615 struct raft_server
*new
616 = raft_server_add(&raft
->servers
, &s
->sid
, s
->address
);
617 raft_server_init_leader(raft
, new);
623 raft_add_entry(struct raft
*raft
,
624 uint64_t term
, struct json
*data
, const struct uuid
*eid
,
625 struct json
*servers
)
627 if (raft
->log_end
- raft
->log_start
>= raft
->allocated_log
) {
628 raft
->entries
= x2nrealloc(raft
->entries
, &raft
->allocated_log
,
629 sizeof *raft
->entries
);
632 uint64_t index
= raft
->log_end
++;
633 struct raft_entry
*entry
= &raft
->entries
[index
- raft
->log_start
];
636 entry
->eid
= eid
? *eid
: UUID_ZERO
;
637 entry
->servers
= servers
;
641 /* Writes a RAFT_REC_ENTRY record for 'term', 'data', 'eid', 'servers' to
642 * 'raft''s log and returns an error indication. */
643 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
644 raft_write_entry(struct raft
*raft
, uint64_t term
, struct json
*data
,
645 const struct uuid
*eid
, struct json
*servers
)
647 struct raft_record r
= {
648 .type
= RAFT_REC_ENTRY
,
651 .index
= raft_add_entry(raft
, term
, data
, eid
, servers
),
654 .eid
= eid
? *eid
: UUID_ZERO
,
657 return ovsdb_log_write_and_free(raft
->log
, raft_record_to_json(&r
));
660 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
661 raft_write_state(struct ovsdb_log
*log
,
662 uint64_t term
, const struct uuid
*vote
)
664 struct raft_record r
= { .term
= term
};
665 if (vote
&& !uuid_is_zero(vote
)) {
666 r
.type
= RAFT_REC_VOTE
;
669 r
.type
= RAFT_REC_TERM
;
671 return ovsdb_log_write_and_free(log
, raft_record_to_json(&r
));
674 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
675 raft_apply_record(struct raft
*raft
, unsigned long long int rec_idx
,
676 const struct raft_record
*r
)
678 /* Apply "term", which is present in most kinds of records (and otherwise
681 * A Raft leader can replicate entries from previous terms to the other
682 * servers in the cluster, retaining the original terms on those entries
683 * (see section 3.6.2 "Committing entries from previous terms" for more
684 * information), so it's OK for the term in a log record to precede the
686 if (r
->term
> raft
->term
) {
687 raft
->term
= raft
->synced_term
= r
->term
;
688 raft
->vote
= raft
->synced_vote
= UUID_ZERO
;
693 if (r
->entry
.index
< raft
->commit_index
) {
694 return ovsdb_error(NULL
, "record %llu attempts to truncate log "
695 "from %"PRIu64
" to %"PRIu64
" entries, but "
696 "commit index is already %"PRIu64
,
697 rec_idx
, raft
->log_end
, r
->entry
.index
,
699 } else if (r
->entry
.index
> raft
->log_end
) {
700 return ovsdb_error(NULL
, "record %llu with index %"PRIu64
" skips "
701 "past expected index %"PRIu64
,
702 rec_idx
, r
->entry
.index
, raft
->log_end
);
705 if (r
->entry
.index
< raft
->log_end
) {
706 /* This can happen, but it is notable. */
707 VLOG_DBG("record %llu truncates log from %"PRIu64
" to %"PRIu64
708 " entries", rec_idx
, raft
->log_end
, r
->entry
.index
);
709 raft_truncate(raft
, r
->entry
.index
);
712 uint64_t prev_term
= (raft
->log_end
> raft
->log_start
713 ? raft
->entries
[raft
->log_end
714 - raft
->log_start
- 1].term
716 if (r
->term
< prev_term
) {
717 return ovsdb_error(NULL
, "record %llu with index %"PRIu64
" term "
718 "%"PRIu64
" precedes previous entry's term "
720 rec_idx
, r
->entry
.index
, r
->term
, prev_term
);
723 raft
->log_synced
= raft_add_entry(
725 json_nullable_clone(r
->entry
.data
), &r
->entry
.eid
,
726 json_nullable_clone(r
->entry
.servers
));
733 if (r
->term
< raft
->term
) {
734 return ovsdb_error(NULL
, "record %llu votes for term %"PRIu64
" "
735 "but current term is %"PRIu64
,
736 rec_idx
, r
->term
, raft
->term
);
737 } else if (!uuid_is_zero(&raft
->vote
)
738 && !uuid_equals(&raft
->vote
, &r
->sid
)) {
739 return ovsdb_error(NULL
, "record %llu votes for "SID_FMT
" in term "
740 "%"PRIu64
" but a previous record for the "
741 "same term voted for "SID_FMT
, rec_idx
,
742 SID_ARGS(&raft
->vote
), r
->term
,
745 raft
->vote
= raft
->synced_vote
= r
->sid
;
751 if (!strcmp(r
->note
, "left")) {
752 return ovsdb_error(NULL
, "record %llu indicates server has left "
753 "the cluster; it cannot be added back (use "
754 "\"ovsdb-tool join-cluster\" to add a new "
759 case RAFT_REC_COMMIT_INDEX
:
760 if (r
->commit_index
< raft
->commit_index
) {
761 return ovsdb_error(NULL
, "record %llu regresses commit index "
762 "from %"PRIu64
" to %"PRIu64
,
763 rec_idx
, raft
->commit_index
, r
->commit_index
);
764 } else if (r
->commit_index
>= raft
->log_end
) {
765 return ovsdb_error(NULL
, "record %llu advances commit index to "
766 "%"PRIu64
" but last log index is %"PRIu64
,
767 rec_idx
, r
->commit_index
, raft
->log_end
- 1);
769 raft
->commit_index
= r
->commit_index
;
774 case RAFT_REC_LEADER
:
775 /* XXX we could use this to take back leadership for quick restart */
783 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
784 raft_read_header(struct raft
*raft
)
786 /* Read header record. */
788 struct ovsdb_error
*error
= ovsdb_log_read(raft
->log
, &json
);
789 if (error
|| !json
) {
790 /* Report error or end-of-file. */
793 ovsdb_log_mark_base(raft
->log
);
795 struct raft_header h
;
796 error
= raft_header_from_json(&h
, json
);
804 raft
->name
= xstrdup(h
.name
);
805 raft
->local_address
= xstrdup(h
.local_address
);
806 raft
->local_nickname
= raft_address_to_nickname(h
.local_address
, &h
.sid
);
807 raft
->joining
= h
.joining
;
810 sset_clone(&raft
->remote_addresses
, &h
.remote_addresses
);
812 raft_entry_clone(&raft
->snap
, &h
.snap
);
813 raft
->log_start
= raft
->log_end
= h
.snap_index
+ 1;
814 raft
->commit_index
= h
.snap_index
;
815 raft
->last_applied
= h
.snap_index
- 1;
818 raft_header_uninit(&h
);
823 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
824 raft_read_log(struct raft
*raft
)
826 for (unsigned long long int i
= 1; ; i
++) {
828 struct ovsdb_error
*error
= ovsdb_log_read(raft
->log
, &json
);
831 /* We assume that the error is due to a partial write while
832 * appending to the file before a crash, so log it and
834 char *error_string
= ovsdb_error_to_string_free(error
);
835 VLOG_WARN("%s", error_string
);
842 struct raft_record r
;
843 error
= raft_record_from_json(&r
, json
);
845 error
= raft_apply_record(raft
, i
, &r
);
846 raft_record_uninit(&r
);
849 return ovsdb_wrap_error(error
, "error reading record %llu from "
850 "%s log", i
, raft
->name
);
854 /* Set the most recent servers. */
855 raft_get_servers_from_log(raft
, VLL_DBG
);
861 raft_reset_timer(struct raft
*raft
)
863 unsigned int duration
= (ELECTION_BASE_MSEC
864 + random_range(ELECTION_RANGE_MSEC
));
865 raft
->election_base
= time_msec();
866 raft
->election_timeout
= raft
->election_base
+ duration
;
870 raft_add_conn(struct raft
*raft
, struct jsonrpc_session
*js
,
871 const struct uuid
*sid
, bool incoming
)
873 struct raft_conn
*conn
= xzalloc(sizeof *conn
);
874 ovs_list_push_back(&raft
->conns
, &conn
->list_node
);
879 conn
->nickname
= raft_address_to_nickname(jsonrpc_session_get_name(js
),
881 conn
->incoming
= incoming
;
882 conn
->js_seqno
= jsonrpc_session_get_seqno(conn
->js
);
885 /* Starts the local server in an existing Raft cluster, using the local copy of
886 * the cluster's log in 'file_name'. Takes ownership of 'log', whether
887 * successful or not. */
888 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
889 raft_open(struct ovsdb_log
*log
, struct raft
**raftp
)
891 struct raft
*raft
= raft_alloc();
894 struct ovsdb_error
*error
= raft_read_header(raft
);
899 if (!raft
->joining
) {
900 error
= raft_read_log(raft
);
905 /* Find our own server. */
906 if (!raft_find_server(raft
, &raft
->sid
)) {
907 error
= ovsdb_error(NULL
, "server does not belong to cluster");
911 /* If there's only one server, start an election right away so that the
912 * cluster bootstraps quickly. */
913 if (hmap_count(&raft
->servers
) == 1) {
914 raft_start_election(raft
, false);
917 raft
->join_timeout
= time_msec() + 1000;
921 hmap_insert(&all_rafts
, &raft
->hmap_node
, hash_string(raft
->name
, 0));
930 /* Returns the name of 'raft', which in OVSDB is the database schema name. */
932 raft_get_name(const struct raft
*raft
)
937 /* Returns the cluster ID of 'raft'. If 'raft' has not yet completed joining
938 * its cluster, then 'cid' will be all-zeros (unless the administrator
939 * specified a cluster ID running "ovsdb-tool join-cluster").
941 * Each cluster has a unique cluster ID. */
943 raft_get_cid(const struct raft
*raft
)
948 /* Returns the server ID of 'raft'. Each server has a unique server ID. */
950 raft_get_sid(const struct raft
*raft
)
955 /* Returns true if 'raft' has completed joining its cluster, has not left or
956 * initiated leaving the cluster, does not have failed disk storage, and is
957 * apparently connected to the leader in a healthy way (or is itself the
960 raft_is_connected(const struct raft
*raft
)
962 return (raft
->role
!= RAFT_CANDIDATE
969 /* Returns true if 'raft' is the cluster leader. */
971 raft_is_leader(const struct raft
*raft
)
973 return raft
->role
== RAFT_LEADER
;
976 /* Returns true if 'raft' is the process of joining its cluster. */
978 raft_is_joining(const struct raft
*raft
)
980 return raft
->joining
;
983 /* Only returns *connected* connections. */
984 static struct raft_conn
*
985 raft_find_conn_by_sid(struct raft
*raft
, const struct uuid
*sid
)
987 if (!uuid_is_zero(sid
)) {
988 struct raft_conn
*conn
;
989 LIST_FOR_EACH (conn
, list_node
, &raft
->conns
) {
990 if (uuid_equals(sid
, &conn
->sid
)
991 && jsonrpc_session_is_connected(conn
->js
)) {
999 static struct raft_conn
*
1000 raft_find_conn_by_address(struct raft
*raft
, const char *address
)
1002 struct raft_conn
*conn
;
1003 LIST_FOR_EACH (conn
, list_node
, &raft
->conns
) {
1004 if (!strcmp(jsonrpc_session_get_name(conn
->js
), address
)) {
1011 static void OVS_PRINTF_FORMAT(3, 4)
1012 raft_record_note(struct raft
*raft
, const char *note
,
1013 const char *comment_format
, ...)
1016 va_start(args
, comment_format
);
1017 char *comment
= xvasprintf(comment_format
, args
);
1020 struct raft_record r
= {
1021 .type
= RAFT_REC_NOTE
,
1023 .note
= CONST_CAST(char *, note
),
1025 ignore(ovsdb_log_write_and_free(raft
->log
, raft_record_to_json(&r
)));
1030 /* If we're leader, try to transfer leadership to another server, logging
1031 * 'reason' as the human-readable reason (it should be a phrase suitable for
1032 * following "because") . */
1034 raft_transfer_leadership(struct raft
*raft
, const char *reason
)
1036 if (raft
->role
!= RAFT_LEADER
) {
1040 struct raft_server
*s
;
1041 HMAP_FOR_EACH (s
, hmap_node
, &raft
->servers
) {
1042 if (!uuid_equals(&raft
->sid
, &s
->sid
)
1043 && s
->phase
== RAFT_PHASE_STABLE
) {
1044 struct raft_conn
*conn
= raft_find_conn_by_sid(raft
, &s
->sid
);
1049 union raft_rpc rpc
= {
1052 .comment
= CONST_CAST(char *, reason
),
1053 .type
= RAFT_RPC_BECOME_LEADER
,
1059 raft_send__(raft
, &rpc
, conn
);
1061 raft_record_note(raft
, "transfer leadership",
1062 "transferring leadership to %s because %s",
1063 s
->nickname
, reason
);
1069 /* Send a RemoveServerRequest to the rest of the servers in the cluster.
1071 * If we know which server is the leader, we can just send the request to it.
1072 * However, we might not know which server is the leader, and we might never
1073 * find out if the remove request was actually previously committed by a
1074 * majority of the servers (because in that case the new leader will not send
1075 * AppendRequests or heartbeats to us). Therefore, we instead send
1076 * RemoveRequests to every server. This theoretically has the same problem, if
1077 * the current cluster leader was not previously a member of the cluster, but
1078 * it seems likely to be more robust in practice. */
1080 raft_send_remove_server_requests(struct raft
*raft
)
1082 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
1083 VLOG_INFO_RL(&rl
, "sending remove request (joining=%s, leaving=%s)",
1084 raft
->joining
? "true" : "false",
1085 raft
->leaving
? "true" : "false");
1086 const struct raft_server
*s
;
1087 HMAP_FOR_EACH (s
, hmap_node
, &raft
->servers
) {
1088 if (!uuid_equals(&s
->sid
, &raft
->sid
)) {
1089 union raft_rpc rpc
= (union raft_rpc
) {
1090 .remove_server_request
= {
1092 .type
= RAFT_RPC_REMOVE_SERVER_REQUEST
,
1098 raft_send(raft
, &rpc
);
1102 raft
->leave_timeout
= time_msec() + ELECTION_BASE_MSEC
;
1105 /* Attempts to start 'raft' leaving its cluster. The caller can check progress
1106 * using raft_is_leaving() and raft_left(). */
1108 raft_leave(struct raft
*raft
)
1110 if (raft
->joining
|| raft
->failed
|| raft
->leaving
|| raft
->left
) {
1113 VLOG_INFO(SID_FMT
": starting to leave cluster "CID_FMT
,
1114 SID_ARGS(&raft
->sid
), CID_ARGS(&raft
->cid
));
1115 raft
->leaving
= true;
1116 raft_transfer_leadership(raft
, "this server is leaving the cluster");
1117 raft_become_follower(raft
);
1118 raft_send_remove_server_requests(raft
);
1119 raft
->leave_timeout
= time_msec() + ELECTION_BASE_MSEC
;
1122 /* Returns true if 'raft' is currently attempting to leave its cluster. */
1124 raft_is_leaving(const struct raft
*raft
)
1126 return raft
->leaving
;
1129 /* Returns true if 'raft' successfully left its cluster. */
1131 raft_left(const struct raft
*raft
)
1136 /* Returns true if 'raft' has experienced a disk I/O failure. When this
1137 * returns true, only closing and reopening 'raft' allows for recovery. */
/* Returns true if 'raft' has experienced a disk I/O failure (see the comment
 * above: recovery requires closing and reopening 'raft'). */
1139 raft_failed(const struct raft
*raft
)
1141 return raft
->failed
;
1144 /* Forces 'raft' to attempt to take leadership of the cluster by deposing the
1145 * current cluster. */
/* Forces 'raft' to attempt to take leadership of the cluster by starting a
 * leadership-transfer election, unless it is already the leader. */
1147 raft_take_leadership(struct raft
*raft
)
1149 if (raft
->role
!= RAFT_LEADER
) {
/* 'true' = leadership transfer: bypasses the usual election timing rules. */
1150 raft_start_election(raft
, true);
1154 /* Closes everything owned by 'raft' that might be visible outside the process:
1155 * network connections, commands, etc. This is part of closing 'raft'; it is
1156 * also used if 'raft' has failed in an unrecoverable way. */
1158 raft_close__(struct raft
*raft
)
1160 if (!hmap_node_is_null(&raft
->hmap_node
)) {
1161 hmap_remove(&all_rafts
, &raft
->hmap_node
);
1162 hmap_node_nullify(&raft
->hmap_node
);
1165 raft_complete_all_commands(raft
, RAFT_CMD_SHUTDOWN
);
1167 struct raft_server
*rs
= raft
->remove_server
;
1169 raft_send_remove_server_reply__(raft
, &rs
->sid
, &rs
->requester_sid
,
1170 rs
->requester_conn
, false,
1171 RAFT_SERVER_SHUTDOWN
);
1172 raft_server_destroy(raft
->remove_server
);
1173 raft
->remove_server
= NULL
;
1176 struct raft_conn
*conn
, *next
;
1177 LIST_FOR_EACH_SAFE (conn
, next
, list_node
, &raft
->conns
) {
1178 raft_conn_close(conn
);
1182 /* Closes and frees 'raft'.
1184 * A server's cluster membership is independent of whether the server is
1185 * actually running. When a server that is a member of a cluster closes, the
1186 * cluster treats this as a server failure. */
1188 raft_close(struct raft
*raft
)
1194 raft_transfer_leadership(raft
, "this server is shutting down");
1198 ovsdb_log_close(raft
->log
);
1200 raft_servers_destroy(&raft
->servers
);
1202 for (uint64_t index
= raft
->log_start
; index
< raft
->log_end
; index
++) {
1203 struct raft_entry
*e
= &raft
->entries
[index
- raft
->log_start
];
1204 raft_entry_uninit(e
);
1206 free(raft
->entries
);
1208 raft_entry_uninit(&raft
->snap
);
1210 raft_waiters_destroy(raft
);
1212 raft_servers_destroy(&raft
->add_servers
);
1214 hmap_destroy(&raft
->commands
);
1216 pstream_close(raft
->listener
);
1218 sset_destroy(&raft
->remote_addresses
);
1219 free(raft
->local_address
);
1220 free(raft
->local_nickname
);
1227 raft_conn_receive(struct raft
*raft
, struct raft_conn
*conn
,
1228 union raft_rpc
*rpc
)
1230 struct jsonrpc_msg
*msg
= jsonrpc_session_recv(conn
->js
);
1235 struct ovsdb_error
*error
= raft_rpc_from_jsonrpc(&raft
->cid
, &raft
->sid
,
1237 jsonrpc_msg_destroy(msg
);
1239 char *s
= ovsdb_error_to_string_free(error
);
1240 VLOG_INFO("%s: %s", jsonrpc_session_get_name(conn
->js
), s
);
1245 if (uuid_is_zero(&conn
->sid
)) {
1246 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(50, 50);
1247 conn
->sid
= rpc
->common
.sid
;
1248 VLOG_INFO_RL(&rl
, "%s: learned server ID "SID_FMT
,
1249 jsonrpc_session_get_name(conn
->js
), SID_ARGS(&conn
->sid
));
1250 } else if (!uuid_equals(&conn
->sid
, &rpc
->common
.sid
)) {
1251 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 5);
1252 VLOG_WARN_RL(&rl
, "%s: ignoring message with unexpected server ID "
1253 SID_FMT
" (expected "SID_FMT
")",
1254 jsonrpc_session_get_name(conn
->js
),
1255 SID_ARGS(&rpc
->common
.sid
), SID_ARGS(&conn
->sid
));
1256 raft_rpc_uninit(rpc
);
1260 const char *address
= (rpc
->type
== RAFT_RPC_HELLO_REQUEST
1261 ? rpc
->hello_request
.address
1262 : rpc
->type
== RAFT_RPC_ADD_SERVER_REQUEST
1263 ? rpc
->add_server_request
.address
1266 char *new_nickname
= raft_address_to_nickname(address
, &conn
->sid
);
1267 if (strcmp(conn
->nickname
, new_nickname
)) {
1268 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(50, 50);
1269 VLOG_INFO_RL(&rl
, "%s: learned remote address %s",
1270 jsonrpc_session_get_name(conn
->js
), address
);
1272 free(conn
->nickname
);
1273 conn
->nickname
= new_nickname
;
/* Returns a short human-readable name for the server with ID 'sid', for use
 * in log messages.  Checks, in order: this server itself, the cluster's
 * member table, and the servers currently being added; 'buf'/'bufsize' are a
 * caller-supplied scratch buffer used by the fallback lookup. */
1283 raft_get_nickname(const struct raft
*raft
, const struct uuid
*sid
,
1284 char buf
[SID_LEN
+ 1], size_t bufsize
)
/* Our own server: use the cached local nickname directly. */
1286 if (uuid_equals(sid
, &raft
->sid
)) {
1287 return raft
->local_nickname
;
/* Next, try the configured cluster membership. */
1290 const char *s
= raft_servers_get_nickname__(&raft
->servers
, sid
);
/* Finally fall back to the set of servers in the process of being added;
 * this variant formats into 'buf' if the server is unknown. */
1295 return raft_servers_get_nickname(&raft
->add_servers
, sid
, buf
, bufsize
);
/* Logs RPC 'rpc' at debug level, prefixed by 'direction' (e.g. "<--" for
 * received messages) and the peer connection's nickname.  Heartbeats are
 * skipped entirely and the rest is rate-limited, since RPC traffic can be
 * very high-volume. */
1299 log_rpc(const union raft_rpc
*rpc
,
1300 const char *direction
, const struct raft_conn
*conn
)
1302 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(600, 600);
1303 if (!raft_rpc_is_heartbeat(rpc
) && !VLOG_DROP_DBG(&rl
)) {
1304 struct ds s
= DS_EMPTY_INITIALIZER
;
/* Format the full RPC into 's' for the log message. */
1305 raft_rpc_format(rpc
, &s
);
1306 VLOG_DBG("%s%s %s", direction
, conn
->nickname
, ds_cstr(&s
));
/* Sends an AddServerRequest over 'conn', asking the cluster to add this
 * server (advertised at raft->local_address) as a member.  Used while
 * 'raft->joining'.
 *
 * NOTE(review): some initializer lines of 'rq' are missing from this
 * extraction; only .type and .address are visible here. */
1312 raft_send_add_server_request(struct raft
*raft
, struct raft_conn
*conn
)
1314 union raft_rpc rq
= {
1315 .add_server_request
= {
1317 .type
= RAFT_RPC_ADD_SERVER_REQUEST
,
1321 .address
= raft
->local_address
,
/* Send directly on 'conn' rather than routing by server ID. */
1324 raft_send__(raft
, &rq
, conn
);
1328 raft_conn_run(struct raft
*raft
, struct raft_conn
*conn
)
1330 jsonrpc_session_run(conn
->js
);
1332 unsigned int new_seqno
= jsonrpc_session_get_seqno(conn
->js
);
1333 bool just_connected
= (new_seqno
!= conn
->js_seqno
1334 && jsonrpc_session_is_connected(conn
->js
));
1335 conn
->js_seqno
= new_seqno
;
1336 if (just_connected
) {
1337 if (raft
->joining
) {
1338 raft_send_add_server_request(raft
, conn
);
1339 } else if (raft
->leaving
) {
1340 union raft_rpc rq
= {
1341 .remove_server_request
= {
1343 .type
= RAFT_RPC_REMOVE_SERVER_REQUEST
,
1349 raft_send__(raft
, &rq
, conn
);
1351 union raft_rpc rq
= (union raft_rpc
) {
1354 .type
= RAFT_RPC_HELLO_REQUEST
,
1357 .address
= raft
->local_address
,
1360 raft_send__(raft
, &rq
, conn
);
1364 for (size_t i
= 0; i
< 50; i
++) {
1366 if (!raft_conn_receive(raft
, conn
, &rpc
)) {
1370 log_rpc(&rpc
, "<--", conn
);
1371 raft_handle_rpc(raft
, &rpc
);
1372 raft_rpc_uninit(&rpc
);
1377 raft_waiter_complete_rpc(struct raft
*raft
, const union raft_rpc
*rpc
)
1379 uint64_t term
= raft_rpc_get_term(rpc
);
1380 if (term
&& term
< raft
->term
) {
1381 /* Drop the message because it's for an expired term. */
1385 if (!raft_is_rpc_synced(raft
, rpc
)) {
1386 /* This is a bug. A reply message is deferred because some state in
1387 * the message, such as a term or index, has not been committed to
1388 * disk, and they should only be completed when that commit is done.
1389 * But this message is being completed before the commit is finished.
1390 * Complain, and hope that someone reports the bug. */
1391 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
1392 if (VLOG_DROP_ERR(&rl
)) {
1396 struct ds s
= DS_EMPTY_INITIALIZER
;
1398 if (term
> raft
->synced_term
) {
1399 ds_put_format(&s
, " because message term %"PRIu64
" is "
1400 "past synced term %"PRIu64
,
1401 term
, raft
->synced_term
);
1404 uint64_t index
= raft_rpc_get_min_sync_index(rpc
);
1405 if (index
> raft
->log_synced
) {
1406 ds_put_format(&s
, " %s message index %"PRIu64
" is past last "
1407 "synced index %"PRIu64
,
1408 s
.length
? "and" : "because",
1409 index
, raft
->log_synced
);
1412 const struct uuid
*vote
= raft_rpc_get_vote(rpc
);
1413 if (vote
&& !uuid_equals(vote
, &raft
->synced_vote
)) {
1414 char buf1
[SID_LEN
+ 1];
1415 char buf2
[SID_LEN
+ 1];
1416 ds_put_format(&s
, " %s vote %s differs from synced vote %s",
1417 s
.length
? "and" : "because",
1418 raft_get_nickname(raft
, vote
, buf1
, sizeof buf1
),
1419 raft_get_nickname(raft
, &raft
->synced_vote
,
1420 buf2
, sizeof buf2
));
1423 char buf
[SID_LEN
+ 1];
1424 ds_put_format(&s
, ": %s ",
1425 raft_get_nickname(raft
, &rpc
->common
.sid
,
1427 raft_rpc_format(rpc
, &s
);
1428 VLOG_ERR("internal error: deferred %s message completed "
1429 "but not ready to send%s",
1430 raft_rpc_type_to_string(rpc
->type
), ds_cstr(&s
));
1436 struct raft_conn
*dst
= raft_find_conn_by_sid(raft
, &rpc
->common
.sid
);
1438 raft_send__(raft
, rpc
, dst
);
1443 raft_waiter_complete(struct raft
*raft
, struct raft_waiter
*w
)
1447 if (raft
->role
== RAFT_LEADER
) {
1448 raft_update_our_match_index(raft
, w
->entry
.index
);
1450 raft
->log_synced
= w
->entry
.index
;
1454 raft
->synced_term
= w
->term
.term
;
1455 raft
->synced_vote
= w
->term
.vote
;
1459 raft_waiter_complete_rpc(raft
, w
->rpc
);
/* Frees waiter 'w', first unlinking it from its owner's 'waiters' list and
 * releasing any RPC it holds (for RAFT_W_RPC waiters).
 *
 * NOTE(review): interior lines (null check, switch on waiter type, final
 * free) are missing from this extraction. */
1465 raft_waiter_destroy(struct raft_waiter
*w
)
1471 ovs_list_remove(&w
->list_node
);
1479 raft_rpc_uninit(w
->rpc
);
/* Completes and destroys every waiter whose disk commit has finished.
 * Waiters are kept in commit-ticket order, so iteration stops at the first
 * waiter whose ticket is past the log's current commit progress. */
1487 raft_waiters_run(struct raft
*raft
)
/* Fast path: nothing pending. */
1489 if (ovs_list_is_empty(&raft
->waiters
)) {
/* How far the underlying ovsdb log has committed so far. */
1493 uint64_t cur
= ovsdb_log_commit_progress(raft
->log
);
1494 struct raft_waiter
*w
, *next
;
1495 LIST_FOR_EACH_SAFE (w
, next
, list_node
, &raft
->waiters
) {
/* Not yet durable; later waiters cannot be durable either. */
1496 if (cur
< w
->commit_ticket
) {
1499 raft_waiter_complete(raft
, w
);
1500 raft_waiter_destroy(w
);
/* Arranges for poll_block() to wake up when any pending waiter's disk commit
 * completes. */
1505 raft_waiters_wait(struct raft
*raft
)
1507 struct raft_waiter
*w
;
1508 LIST_FOR_EACH (w
, list_node
, &raft
->waiters
) {
1509 ovsdb_log_commit_wait(raft
->log
, w
->commit_ticket
);
/* Destroys all of 'raft''s pending waiters without completing them (used
 * when tearing 'raft' down). */
1515 raft_waiters_destroy(struct raft
*raft
)
1517 struct raft_waiter
*w
, *next
;
/* _SAFE because raft_waiter_destroy() unlinks 'w' from the list. */
1518 LIST_FOR_EACH_SAFE (w
, next
, list_node
, &raft
->waiters
) {
1519 raft_waiter_destroy(w
);
1523 static bool OVS_WARN_UNUSED_RESULT
1524 raft_set_term(struct raft
*raft
, uint64_t term
, const struct uuid
*vote
)
1526 struct ovsdb_error
*error
= raft_write_state(raft
->log
, term
, vote
);
1527 if (!raft_handle_write_error(raft
, error
)) {
1531 struct raft_waiter
*w
= raft_waiter_create(raft
, RAFT_W_TERM
, true);
1532 raft
->term
= w
->term
.term
= term
;
1533 raft
->vote
= w
->term
.vote
= vote
? *vote
: UUID_ZERO
;
1538 raft_accept_vote(struct raft
*raft
, struct raft_server
*s
,
1539 const struct uuid
*vote
)
1541 if (uuid_equals(&s
->vote
, vote
)) {
1544 if (!uuid_is_zero(&s
->vote
)) {
1545 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 1);
1546 char buf1
[SID_LEN
+ 1];
1547 char buf2
[SID_LEN
+ 1];
1548 VLOG_WARN_RL(&rl
, "server %s changed its vote from %s to %s",
1550 raft_get_nickname(raft
, &s
->vote
, buf1
, sizeof buf1
),
1551 raft_get_nickname(raft
, vote
, buf2
, sizeof buf2
));
1554 if (uuid_equals(vote
, &raft
->sid
)
1555 && ++raft
->n_votes
> hmap_count(&raft
->servers
) / 2) {
1556 raft_become_leader(raft
);
1561 raft_start_election(struct raft
*raft
, bool leadership_transfer
)
1563 if (raft
->leaving
) {
1567 struct raft_server
*me
= raft_find_server(raft
, &raft
->sid
);
1572 if (!raft_set_term(raft
, raft
->term
+ 1, &raft
->sid
)) {
1576 raft_complete_all_commands(raft
, RAFT_CMD_LOST_LEADERSHIP
);
1578 ovs_assert(raft
->role
!= RAFT_LEADER
);
1579 ovs_assert(hmap_is_empty(&raft
->commands
));
1580 raft
->role
= RAFT_CANDIDATE
;
1584 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 5);
1585 if (!VLOG_DROP_INFO(&rl
)) {
1586 long long int now
= time_msec();
1587 if (now
>= raft
->election_timeout
) {
1588 VLOG_INFO("term %"PRIu64
": %lld ms timeout expired, "
1589 "starting election",
1590 raft
->term
, now
- raft
->election_base
);
1592 VLOG_INFO("term %"PRIu64
": starting election", raft
->term
);
1595 raft_reset_timer(raft
);
1597 struct raft_server
*peer
;
1598 HMAP_FOR_EACH (peer
, hmap_node
, &raft
->servers
) {
1599 peer
->vote
= UUID_ZERO
;
1600 if (uuid_equals(&raft
->sid
, &peer
->sid
)) {
1604 union raft_rpc rq
= {
1607 .type
= RAFT_RPC_VOTE_REQUEST
,
1611 .last_log_index
= raft
->log_end
- 1,
1613 raft
->log_end
> raft
->log_start
1614 ? raft
->entries
[raft
->log_end
- raft
->log_start
- 1].term
1616 .leadership_transfer
= leadership_transfer
,
1619 raft_send(raft
, &rq
);
1622 /* Vote for ourselves. */
1623 raft_accept_vote(raft
, me
, &raft
->sid
);
/* Opens an outgoing JSON-RPC connection to 'address' (expected server ID
 * 'sid', or NULL if unknown), unless 'address' is our own or a connection to
 * it already exists. */
1627 raft_open_conn(struct raft
*raft
, const char *address
, const struct uuid
*sid
)
1629 if (strcmp(address
, raft
->local_address
)
1630 && !raft_find_conn_by_address(raft
, address
)) {
/* 'true' = reliable (auto-reconnecting) session; 'false' = outgoing. */
1631 raft_add_conn(raft
, jsonrpc_session_open(address
, true), sid
, false);
/* Closes and frees connection 'conn': shuts down its JSON-RPC session,
 * unlinks it from the owner's 'conns' list, and releases its nickname.
 * NOTE(review): the final free of 'conn' itself is not visible in this
 * extraction. */
1636 raft_conn_close(struct raft_conn
*conn
)
1638 jsonrpc_session_close(conn
->js
);
1639 ovs_list_remove(&conn
->list_node
);
1640 free(conn
->nickname
);
1644 /* Returns true if 'conn' should stay open, false if it should be closed. */
1646 raft_conn_should_stay_open(struct raft
*raft
, struct raft_conn
*conn
)
1648 /* Close the connection if it's actually dead. If necessary, we'll
1649 * initiate a new session later. */
1650 if (!jsonrpc_session_is_alive(conn
->js
)) {
1654 /* Keep incoming sessions. We trust the originator to decide to drop
1656 if (conn
->incoming
) {
1660 /* If we are joining the cluster, keep sessions to the remote addresses
1661 * that are supposed to be part of the cluster we're joining. */
1662 if (raft
->joining
&& sset_contains(&raft
->remote_addresses
,
1663 jsonrpc_session_get_name(conn
->js
))) {
1667 /* We have joined the cluster. If we did that "recently", then there is a
1668 * chance that we do not have the most recent server configuration log
1669 * entry. If so, it's a waste to disconnect from the servers that were in
1670 * remote_addresses and that will probably appear in the configuration,
1671 * just to reconnect to them a moment later when we do get the
1672 * configuration update. If we are not ourselves in the configuration,
1673 * then we know that there must be a new configuration coming up, so in
1674 * that case keep the connection. */
1675 if (!raft_find_server(raft
, &raft
->sid
)) {
1679 /* Keep the connection only if the server is part of the configuration. */
1680 return raft_find_server(raft
, &conn
->sid
);
1683 /* Allows 'raft' to maintain the distributed log. Call this function as part
1684 * of the process's main loop. */
1686 raft_run(struct raft
*raft
)
1688 if (raft
->left
|| raft
->failed
) {
1692 raft_waiters_run(raft
);
1694 if (!raft
->listener
&& time_msec() >= raft
->listen_backoff
) {
1695 char *paddr
= raft_make_address_passive(raft
->local_address
);
1696 int error
= pstream_open(paddr
, &raft
->listener
, DSCP_DEFAULT
);
1698 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 5);
1699 VLOG_WARN_RL(&rl
, "%s: listen failed (%s)",
1700 paddr
, ovs_strerror(error
));
1701 raft
->listen_backoff
= time_msec() + 1000;
1706 if (raft
->listener
) {
1707 struct stream
*stream
;
1708 int error
= pstream_accept(raft
->listener
, &stream
);
1710 raft_add_conn(raft
, jsonrpc_session_open_unreliably(
1711 jsonrpc_open(stream
), DSCP_DEFAULT
), NULL
,
1713 } else if (error
!= EAGAIN
) {
1714 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 1);
1715 VLOG_WARN_RL(&rl
, "%s: accept failed: %s",
1716 pstream_get_name(raft
->listener
),
1717 ovs_strerror(error
));
1721 /* Run RPCs for all open sessions. */
1722 struct raft_conn
*conn
;
1723 LIST_FOR_EACH (conn
, list_node
, &raft
->conns
) {
1724 raft_conn_run(raft
, conn
);
1727 /* Close unneeded sessions. */
1728 struct raft_conn
*next
;
1729 LIST_FOR_EACH_SAFE (conn
, next
, list_node
, &raft
->conns
) {
1730 if (!raft_conn_should_stay_open(raft
, conn
)) {
1731 raft_conn_close(conn
);
1735 /* Open needed sessions. */
1736 struct raft_server
*server
;
1737 HMAP_FOR_EACH (server
, hmap_node
, &raft
->servers
) {
1738 raft_open_conn(raft
, server
->address
, &server
->sid
);
1740 if (raft
->joining
) {
1741 const char *address
;
1742 SSET_FOR_EACH (address
, &raft
->remote_addresses
) {
1743 raft_open_conn(raft
, address
, NULL
);
1747 if (!raft
->joining
&& time_msec() >= raft
->election_timeout
) {
1748 raft_start_election(raft
, false);
1751 if (raft
->leaving
&& time_msec() >= raft
->leave_timeout
) {
1752 raft_send_remove_server_requests(raft
);
1755 if (raft
->joining
&& time_msec() >= raft
->join_timeout
) {
1756 raft
->join_timeout
= time_msec() + 1000;
1757 LIST_FOR_EACH (conn
, list_node
, &raft
->conns
) {
1758 raft_send_add_server_request(raft
, conn
);
1762 if (time_msec() >= raft
->ping_timeout
) {
1763 if (raft
->role
== RAFT_LEADER
) {
1764 raft_send_heartbeats(raft
);
1766 long long int now
= time_msec();
1767 struct raft_command
*cmd
, *next_cmd
;
1768 HMAP_FOR_EACH_SAFE (cmd
, next_cmd
, hmap_node
, &raft
->commands
) {
1770 && now
- cmd
->timestamp
> ELECTION_BASE_MSEC
) {
1771 raft_command_complete(raft
, cmd
, RAFT_CMD_TIMEOUT
);
1775 raft
->ping_timeout
= time_msec() + PING_TIME_MSEC
;
1778 /* Do this only at the end; if we did it as soon as we set raft->left or
1779 * raft->failed in handling the RemoveServerReply, then it could easily
1780 * cause references to freed memory in RPC sessions, etc. */
1781 if (raft
->left
|| raft
->failed
) {
/* Registers poll-loop wakeups for JSON-RPC session 'js': both for session
 * maintenance and for received messages. */
1787 raft_wait_session(struct jsonrpc_session
*js
)
1790 jsonrpc_session_wait(js
);
1791 jsonrpc_session_recv_wait(js
);
1795 /* Causes the next call to poll_block() to wake up when 'raft' needs to do
1798 raft_wait(struct raft
*raft
)
1800 if (raft
->left
|| raft
->failed
) {
1804 raft_waiters_wait(raft
);
1806 if (raft
->listener
) {
1807 pstream_wait(raft
->listener
);
1809 poll_timer_wait_until(raft
->listen_backoff
);
1812 struct raft_conn
*conn
;
1813 LIST_FOR_EACH (conn
, list_node
, &raft
->conns
) {
1814 raft_wait_session(conn
->js
);
1817 if (!raft
->joining
) {
1818 poll_timer_wait_until(raft
->election_timeout
);
1820 poll_timer_wait_until(raft
->join_timeout
);
1822 if (raft
->leaving
) {
1823 poll_timer_wait_until(raft
->leave_timeout
);
1825 if (raft
->role
== RAFT_LEADER
|| !hmap_is_empty(&raft
->commands
)) {
1826 poll_timer_wait_until(raft
->ping_timeout
);
1830 static struct raft_waiter
*
1831 raft_waiter_create(struct raft
*raft
, enum raft_waiter_type type
,
1834 struct raft_waiter
*w
= xzalloc(sizeof *w
);
1835 ovs_list_push_back(&raft
->waiters
, &w
->list_node
);
1836 w
->commit_ticket
= start_commit
? ovsdb_log_commit_start(raft
->log
) : 0;
1841 /* Returns a human-readable representation of 'status' (or NULL if 'status' is
1844 raft_command_status_to_string(enum raft_command_status status
)
1847 case RAFT_CMD_INCOMPLETE
:
1848 return "operation still in progress";
1849 case RAFT_CMD_SUCCESS
:
1851 case RAFT_CMD_NOT_LEADER
:
1852 return "not leader";
1853 case RAFT_CMD_BAD_PREREQ
:
1854 return "prerequisite check failed";
1855 case RAFT_CMD_LOST_LEADERSHIP
:
1856 return "lost leadership";
1857 case RAFT_CMD_SHUTDOWN
:
1858 return "server shutdown";
1859 case RAFT_CMD_IO_ERROR
:
1861 case RAFT_CMD_TIMEOUT
:
1868 /* Converts human-readable status in 's' into status code in '*statusp'.
1869 * Returns true if successful, false if 's' is unknown. */
1871 raft_command_status_from_string(const char *s
,
1872 enum raft_command_status
*statusp
)
1874 for (enum raft_command_status status
= 0; ; status
++) {
1875 const char *s2
= raft_command_status_to_string(status
);
1879 } else if (!strcmp(s
, s2
)) {
/* Returns the entry ID (eid) in effect at log index 'index': scans backward
 * from 'index' through the in-memory log and falls back to the snapshot's
 * eid when the search passes the start of the log.
 * NOTE(review): the loop body's check of 'e' (presumably returning e->eid
 * when the entry carries data) is missing from this extraction — confirm
 * against the full source. */
1886 static const struct uuid
*
1887 raft_get_eid(const struct raft
*raft
, uint64_t index
)
1889 for (; index
>= raft
->log_start
; index
--) {
1890 const struct raft_entry
*e
= raft_get_entry(raft
, index
);
/* Nothing relevant in the log: the snapshot's eid applies. */
1895 return &raft
->snap
.eid
;
/* Returns the eid of the most recent log entry (log_end is one past the
 * last entry, hence the "- 1"). */
1898 static const struct uuid
*
1899 raft_current_eid(const struct raft
*raft
)
1901 return raft_get_eid(raft
, raft
->log_end
- 1);
1904 static struct raft_command
*
1905 raft_command_create_completed(enum raft_command_status status
)
1907 ovs_assert(status
!= RAFT_CMD_INCOMPLETE
);
1909 struct raft_command
*cmd
= xzalloc(sizeof *cmd
);
1911 cmd
->status
= status
;
1915 static struct raft_command
*
1916 raft_command_create_incomplete(struct raft
*raft
, uint64_t index
)
1918 struct raft_command
*cmd
= xzalloc(sizeof *cmd
);
1919 cmd
->n_refs
= 2; /* One for client, one for raft->commands. */
1921 cmd
->status
= RAFT_CMD_INCOMPLETE
;
1922 hmap_insert(&raft
->commands
, &cmd
->hmap_node
, cmd
->index
);
1926 static struct raft_command
* OVS_WARN_UNUSED_RESULT
1927 raft_command_initiate(struct raft
*raft
,
1928 const struct json
*data
, const struct json
*servers
,
1929 const struct uuid
*eid
)
1931 /* Write to local log. */
1932 uint64_t index
= raft
->log_end
;
1933 if (!raft_handle_write_error(
1934 raft
, raft_write_entry(
1935 raft
, raft
->term
, json_nullable_clone(data
), eid
,
1936 json_nullable_clone(servers
)))) {
1937 return raft_command_create_completed(RAFT_CMD_IO_ERROR
);
1940 struct raft_command
*cmd
= raft_command_create_incomplete(raft
, index
);
1945 raft_waiter_create(raft
, RAFT_W_ENTRY
, true)->entry
.index
= cmd
->index
;
1947 /* Write to remote logs. */
1948 struct raft_server
*s
;
1949 HMAP_FOR_EACH (s
, hmap_node
, &raft
->servers
) {
1950 if (!uuid_equals(&s
->sid
, &raft
->sid
) && s
->next_index
== index
) {
1951 raft_send_append_request(raft
, s
, 1, "execute command");
1959 static struct raft_command
* OVS_WARN_UNUSED_RESULT
1960 raft_command_execute__(struct raft
*raft
,
1961 const struct json
*data
, const struct json
*servers
,
1962 const struct uuid
*prereq
, struct uuid
*result
)
1964 if (raft
->joining
|| raft
->leaving
|| raft
->left
|| raft
->failed
) {
1965 return raft_command_create_completed(RAFT_CMD_SHUTDOWN
);
1968 if (raft
->role
!= RAFT_LEADER
) {
1969 /* Consider proxying the command to the leader. We can only do that if
1970 * we know the leader and the command does not change the set of
1971 * servers. We do not proxy commands without prerequisites, even
1972 * though we could, because in an OVSDB context a log entry doesn't
1973 * make sense without context. */
1974 if (servers
|| !data
1975 || raft
->role
!= RAFT_FOLLOWER
|| uuid_is_zero(&raft
->leader_sid
)
1977 return raft_command_create_completed(RAFT_CMD_NOT_LEADER
);
1981 struct uuid eid
= data
? uuid_random() : UUID_ZERO
;
1986 if (raft
->role
!= RAFT_LEADER
) {
1987 const union raft_rpc rpc
= {
1988 .execute_command_request
= {
1990 .type
= RAFT_RPC_EXECUTE_COMMAND_REQUEST
,
1991 .sid
= raft
->leader_sid
,
1993 .data
= CONST_CAST(struct json
*, data
),
1998 if (!raft_send(raft
, &rpc
)) {
1999 /* Couldn't send command, so it definitely failed. */
2000 return raft_command_create_completed(RAFT_CMD_NOT_LEADER
);
2003 struct raft_command
*cmd
= raft_command_create_incomplete(raft
, 0);
2004 cmd
->timestamp
= time_msec();
2009 const struct uuid
*current_eid
= raft_current_eid(raft
);
2010 if (prereq
&& !uuid_equals(prereq
, current_eid
)) {
2011 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
2012 VLOG_INFO_RL(&rl
, "current entry eid "UUID_FMT
" does not match "
2013 "prerequisite "UUID_FMT
,
2014 UUID_ARGS(current_eid
), UUID_ARGS(prereq
));
2015 return raft_command_create_completed(RAFT_CMD_BAD_PREREQ
);
2018 return raft_command_initiate(raft
, data
, servers
, &eid
);
2021 /* Initiates appending a log entry to 'raft'. The log entry consists of 'data'
2022 * and, if 'prereq' is nonnull, it is only added to the log if the previous
2023 * entry in the log has entry ID 'prereq'. If 'result' is nonnull, it is
2024 * populated with the entry ID for the new log entry.
2026 * Returns a "struct raft_command" that may be used to track progress adding
2027 * the log entry. The caller must eventually free the returned structure, with
2028 * raft_command_unref(). */
/* Public wrapper around raft_command_execute__() for plain data commands:
 * appends 'data' (optionally guarded by 'prereq', optionally reporting the
 * new eid via 'result') with no server-set change (servers == NULL).  See
 * the contract in the comment block above. */
2029 struct raft_command
* OVS_WARN_UNUSED_RESULT
2030 raft_command_execute(struct raft
*raft
, const struct json
*data
,
2031 const struct uuid
*prereq
, struct uuid
*result
)
2033 return raft_command_execute__(raft
, data
, NULL
, prereq
, result
);
2036 /* Returns the status of 'cmd'. */
2037 enum raft_command_status
2038 raft_command_get_status(const struct raft_command
*cmd
)
/* The caller must still hold a reference. */
2040 ovs_assert(cmd
->n_refs
> 0);
2044 /* Returns the index of the log entry at which 'cmd' was committed.
2046 * This function works only with successful commands. */
2048 raft_command_get_commit_index(const struct raft_command
*cmd
)
/* Caller must hold a reference and the command must have succeeded. */
2050 ovs_assert(cmd
->n_refs
> 0);
2051 ovs_assert(cmd
->status
== RAFT_CMD_SUCCESS
);
/* Releases one reference to 'cmd'; when the count drops to zero the command
 * is freed (the free itself is not visible in this extraction). */
2057 raft_command_unref(struct raft_command
*cmd
)
2060 ovs_assert(cmd
->n_refs
> 0);
2061 if (!--cmd
->n_refs
) {
2067 /* Causes poll_block() to wake up when 'cmd' has status to report. */
2069 raft_command_wait(const struct raft_command
*cmd
)
/* Already complete: wake immediately so the caller sees the status. */
2071 if (cmd
->status
!= RAFT_CMD_INCOMPLETE
) {
2072 poll_immediate_wake();
2077 raft_command_complete(struct raft
*raft
,
2078 struct raft_command
*cmd
,
2079 enum raft_command_status status
)
2081 if (!uuid_is_zero(&cmd
->sid
)) {
2082 uint64_t commit_index
= status
== RAFT_CMD_SUCCESS
? cmd
->index
: 0;
2083 raft_send_execute_command_reply(raft
, &cmd
->sid
, &cmd
->eid
, status
,
2087 ovs_assert(cmd
->status
== RAFT_CMD_INCOMPLETE
);
2088 ovs_assert(cmd
->n_refs
> 0);
2089 hmap_remove(&raft
->commands
, &cmd
->hmap_node
);
2090 cmd
->status
= status
;
2091 raft_command_unref(cmd
);
/* Completes every outstanding command with the given 'status' (e.g.
 * RAFT_CMD_SHUTDOWN on close, RAFT_CMD_LOST_LEADERSHIP on stepping down). */
2095 raft_complete_all_commands(struct raft
*raft
, enum raft_command_status status
)
2097 struct raft_command
*cmd
, *next
;
/* _SAFE because raft_command_complete() removes 'cmd' from the hmap. */
2098 HMAP_FOR_EACH_SAFE (cmd
, next
, hmap_node
, &raft
->commands
) {
2099 raft_command_complete(raft
, cmd
, status
);
/* Looks up the incomplete command with log index 'index', or returns NULL
 * (the NULL return path is not visible in this extraction).  Commands are
 * hashed by index, so only one bucket is probed. */
2103 static struct raft_command
*
2104 raft_find_command_by_index(struct raft
*raft
, uint64_t index
)
2106 struct raft_command
*cmd
;
2108 HMAP_FOR_EACH_IN_BUCKET (cmd
, hmap_node
, index
, &raft
->commands
) {
2109 if (cmd
->index
== index
) {
/* Looks up the incomplete command whose entry ID is 'eid'.  Unlike the
 * by-index lookup this must scan the whole map, since commands are hashed
 * by index, not eid. */
2116 static struct raft_command
*
2117 raft_find_command_by_eid(struct raft
*raft
, const struct uuid
*eid
)
2119 struct raft_command
*cmd
;
2121 HMAP_FOR_EACH (cmd
, hmap_node
, &raft
->commands
) {
2122 if (uuid_equals(&cmd
->eid
, eid
)) {
2129 #define RAFT_RPC(ENUM, NAME) \
2130 static void raft_handle_##NAME(struct raft *, const struct raft_##NAME *);
2135 raft_handle_hello_request(struct raft
*raft OVS_UNUSED
,
2136 const struct raft_hello_request
*hello OVS_UNUSED
)
2140 /* 'sid' is the server being added. */
2142 raft_send_add_server_reply__(struct raft
*raft
, const struct uuid
*sid
,
2143 const char *address
,
2144 bool success
, const char *comment
)
2146 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(10, 10);
2147 if (!VLOG_DROP_INFO(&rl
)) {
2148 struct ds s
= DS_EMPTY_INITIALIZER
;
2149 char buf
[SID_LEN
+ 1];
2150 ds_put_format(&s
, "adding %s ("SID_FMT
" at %s) "
2151 "to cluster "CID_FMT
" %s",
2152 raft_get_nickname(raft
, sid
, buf
, sizeof buf
),
2153 SID_ARGS(sid
), address
, CID_ARGS(&raft
->cid
),
2154 success
? "succeeded" : "failed");
2156 ds_put_format(&s
, " (%s)", comment
);
2158 VLOG_INFO("%s", ds_cstr(&s
));
2162 union raft_rpc rpy
= {
2163 .add_server_reply
= {
2165 .type
= RAFT_RPC_ADD_SERVER_REPLY
,
2167 .comment
= CONST_CAST(char *, comment
),
2173 struct sset
*remote_addresses
= &rpy
.add_server_reply
.remote_addresses
;
2174 sset_init(remote_addresses
);
2175 if (!raft
->joining
) {
2176 struct raft_server
*s
;
2177 HMAP_FOR_EACH (s
, hmap_node
, &raft
->servers
) {
2178 if (!uuid_equals(&s
->sid
, &raft
->sid
)) {
2179 sset_add(remote_addresses
, s
->address
);
2184 raft_send(raft
, &rpy
);
2186 sset_destroy(remote_addresses
);
2190 raft_send_remove_server_reply_rpc(struct raft
*raft
, const struct uuid
*sid
,
2191 bool success
, const char *comment
)
2193 const union raft_rpc rpy
= {
2194 .remove_server_reply
= {
2196 .type
= RAFT_RPC_REMOVE_SERVER_REPLY
,
2198 .comment
= CONST_CAST(char *, comment
),
2203 raft_send(raft
, &rpy
);
2207 raft_send_remove_server_reply__(struct raft
*raft
,
2208 const struct uuid
*target_sid
,
2209 const struct uuid
*requester_sid
,
2210 struct unixctl_conn
*requester_conn
,
2211 bool success
, const char *comment
)
2213 struct ds s
= DS_EMPTY_INITIALIZER
;
2214 ds_put_format(&s
, "request ");
2215 if (!uuid_is_zero(requester_sid
)) {
2216 char buf
[SID_LEN
+ 1];
2217 ds_put_format(&s
, "by %s",
2218 raft_get_nickname(raft
, requester_sid
, buf
, sizeof buf
));
2220 ds_put_cstr(&s
, "via unixctl");
2222 ds_put_cstr(&s
, " to remove ");
2223 if (!requester_conn
&& uuid_equals(target_sid
, requester_sid
)) {
2224 ds_put_cstr(&s
, "itself");
2226 char buf
[SID_LEN
+ 1];
2227 ds_put_cstr(&s
, raft_get_nickname(raft
, target_sid
, buf
, sizeof buf
));
2229 ds_put_format(&s
, " from cluster "CID_FMT
" %s",
2230 CID_ARGS(&raft
->cid
),
2231 success
? "succeeded" : "failed");
2233 ds_put_format(&s
, " (%s)", comment
);
2236 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(10, 10);
2237 VLOG_INFO_RL(&rl
, "%s", ds_cstr(&s
));
2239 /* Send RemoveServerReply to the requester (which could be a server or a
2240 * unixctl connection. Also always send it to the removed server; this
2241 * allows it to be sure that it's really removed and update its log and
2242 * disconnect permanently. */
2243 if (!uuid_is_zero(requester_sid
)) {
2244 raft_send_remove_server_reply_rpc(raft
, requester_sid
,
2247 if (!uuid_equals(requester_sid
, target_sid
)) {
2248 raft_send_remove_server_reply_rpc(raft
, target_sid
, success
, comment
);
2250 if (requester_conn
) {
2252 unixctl_command_reply(requester_conn
, ds_cstr(&s
));
2254 unixctl_command_reply_error(requester_conn
, ds_cstr(&s
));
2262 raft_send_add_server_reply(struct raft
*raft
,
2263 const struct raft_add_server_request
*rq
,
2264 bool success
, const char *comment
)
2266 return raft_send_add_server_reply__(raft
, &rq
->common
.sid
, rq
->address
,
2271 raft_send_remove_server_reply(struct raft
*raft
,
2272 const struct raft_remove_server_request
*rq
,
2273 bool success
, const char *comment
)
2275 return raft_send_remove_server_reply__(raft
, &rq
->sid
, &rq
->common
.sid
,
2276 rq
->requester_conn
, success
,
2281 raft_become_follower(struct raft
*raft
)
2283 raft
->leader_sid
= UUID_ZERO
;
2284 if (raft
->role
== RAFT_FOLLOWER
) {
2288 raft
->role
= RAFT_FOLLOWER
;
2289 raft_reset_timer(raft
);
2291 /* Notify clients about lost leadership.
2293 * We do not reverse our changes to 'raft->servers' because the new
2294 * configuration is already part of the log. Possibly the configuration
2295 * log entry will not be committed, but until we know that we must use the
2296 * new configuration. Our AppendEntries processing will properly update
2297 * the server configuration later, if necessary. */
2298 struct raft_server
*s
;
2299 HMAP_FOR_EACH (s
, hmap_node
, &raft
->add_servers
) {
2300 raft_send_add_server_reply__(raft
, &s
->sid
, s
->address
, false,
2301 RAFT_SERVER_LOST_LEADERSHIP
);
2303 if (raft
->remove_server
) {
2304 raft_send_remove_server_reply__(raft
, &raft
->remove_server
->sid
,
2305 &raft
->remove_server
->requester_sid
,
2306 raft
->remove_server
->requester_conn
,
2307 false, RAFT_SERVER_LOST_LEADERSHIP
);
2308 raft_server_destroy(raft
->remove_server
);
2309 raft
->remove_server
= NULL
;
2312 raft_complete_all_commands(raft
, RAFT_CMD_LOST_LEADERSHIP
);
2316 raft_send_append_request(struct raft
*raft
,
2317 struct raft_server
*peer
, unsigned int n
,
2318 const char *comment
)
2320 ovs_assert(raft
->role
== RAFT_LEADER
);
2322 const union raft_rpc rq
= {
2325 .type
= RAFT_RPC_APPEND_REQUEST
,
2327 .comment
= CONST_CAST(char *, comment
),
2330 .prev_log_index
= peer
->next_index
- 1,
2331 .prev_log_term
= (peer
->next_index
- 1 >= raft
->log_start
2332 ? raft
->entries
[peer
->next_index
- 1
2333 - raft
->log_start
].term
2335 .leader_commit
= raft
->commit_index
,
2336 .entries
= &raft
->entries
[peer
->next_index
- raft
->log_start
],
2340 raft_send(raft
, &rq
);
/* Leader only: sends an empty AppendRequest ("heartbeat") to every other
 * server to assert leadership, and pings clients waiting on commands so
 * they know the leader is still working on them. */
2344 raft_send_heartbeats(struct raft
*raft
)
2346 struct raft_server
*s
;
2347 HMAP_FOR_EACH (s
, hmap_node
, &raft
->servers
) {
2348 if (!uuid_equals(&raft
->sid
, &s
->sid
)) {
/* n == 0: carry no log entries, this is purely a heartbeat. */
2349 raft_send_append_request(raft
, s
, 0, "heartbeat");
2353 /* Send anyone waiting for a command to complete a ping to let them
2354 * know we're still working on it. */
2355 struct raft_command
*cmd
;
2356 HMAP_FOR_EACH (cmd
, hmap_node
, &raft
->commands
) {
/* Only proxied commands carry a requester sid to reply to. */
2357 if (!uuid_is_zero(&cmd
->sid
)) {
2358 raft_send_execute_command_reply(raft
, &cmd
->sid
,
2360 RAFT_CMD_INCOMPLETE
, 0);
2365 /* Initializes the fields in 's' that represent the leader's view of the
/* server: next log index to send it, and its replication phase.
 * NOTE(review): the match_index initialization that normally sits between
 * these two assignments is missing from this extraction. */
2368 raft_server_init_leader(struct raft
*raft
, struct raft_server
*s
)
/* Optimistically assume the peer is fully caught up. */
2370 s
->next_index
= raft
->log_end
;
2372 s
->phase
= RAFT_PHASE_STABLE
;
2376 raft_become_leader(struct raft
*raft
)
2378 raft_complete_all_commands(raft
, RAFT_CMD_LOST_LEADERSHIP
);
2380 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 5);
2381 VLOG_INFO_RL(&rl
, "term %"PRIu64
": elected leader by %d+ of "
2382 "%"PRIuSIZE
" servers", raft
->term
,
2383 raft
->n_votes
, hmap_count(&raft
->servers
));
2385 ovs_assert(raft
->role
!= RAFT_LEADER
);
2386 raft
->role
= RAFT_LEADER
;
2387 raft
->leader_sid
= raft
->sid
;
2388 raft
->election_timeout
= LLONG_MAX
;
2389 raft
->ping_timeout
= time_msec() + PING_TIME_MSEC
;
2391 struct raft_server
*s
;
2392 HMAP_FOR_EACH (s
, hmap_node
, &raft
->servers
) {
2393 raft_server_init_leader(raft
, s
);
2396 raft_update_our_match_index(raft
, raft
->log_end
- 1);
2397 raft_send_heartbeats(raft
);
2399 /* Write the fact that we are leader to the log. This is not used by the
2400 * algorithm (although it could be, for quick restart), but it is used for
2401 * offline analysis to check for conformance with the properties that Raft
2403 struct raft_record r
= {
2404 .type
= RAFT_REC_LEADER
,
2408 ignore(ovsdb_log_write_and_free(raft
->log
, raft_record_to_json(&r
)));
2410 /* Initiate a no-op commit. Otherwise we might never find out what's in
2411 * the log. See section 6.4 item 1:
2413 * The Leader Completeness Property guarantees that a leader has all
2414 * committed entries, but at the start of its term, it may not know
2415 * which those are. To find out, it needs to commit an entry from its
2416 * term. Raft handles this by having each leader commit a blank no-op
2417 * entry into the log at the start of its term. As soon as this no-op
2418 * entry is committed, the leader’s commit index will be at least as
2419 * large as any other servers’ during its term.
2421 raft_command_unref(raft_command_execute__(raft
, NULL
, NULL
, NULL
, NULL
));
2424 /* Processes term 'term' received as part of RPC 'common'. Returns true if the
2425 * caller should continue processing the RPC, false if the caller should reject
2426 * it due to a stale term. */
2428 raft_receive_term__(struct raft
*raft
, const struct raft_rpc_common
*common
,
2431 /* Section 3.3 says:
2433 * Current terms are exchanged whenever servers communicate; if one
2434 * server’s current term is smaller than the other’s, then it updates
2435 * its current term to the larger value. If a candidate or leader
2436 * discovers that its term is out of date, it immediately reverts to
2437 * follower state. If a server receives a request with a stale term
2438 * number, it rejects the request.
2440 if (term
> raft
->term
) {
2441 if (!raft_set_term(raft
, term
, NULL
)) {
2442 /* Failed to update the term to 'term'. */
2445 raft_become_follower(raft
);
2446 } else if (term
< raft
->term
) {
2447 char buf
[SID_LEN
+ 1];
2448 VLOG_INFO("rejecting term %"PRIu64
" < current term %"PRIu64
" received "
2449 "in %s message from server %s",
2451 raft_rpc_type_to_string(common
->type
),
2452 raft_get_nickname(raft
, &common
->sid
, buf
, sizeof buf
));
2459 raft_get_servers_from_log(struct raft
*raft
, enum vlog_level level
)
2461 const struct json
*servers_json
= raft
->snap
.servers
;
2462 for (uint64_t index
= raft
->log_end
- 1; index
>= raft
->log_start
;
2464 struct raft_entry
*e
= &raft
->entries
[index
- raft
->log_start
];
2466 servers_json
= e
->servers
;
2471 struct hmap servers
;
2472 struct ovsdb_error
*error
= raft_servers_from_json(servers_json
, &servers
);
2474 raft_set_servers(raft
, &servers
, level
);
2475 raft_servers_destroy(&servers
);
2478 /* Truncates the log, so that raft->log_end becomes 'new_end'.
2480 * Doesn't write anything to disk. In theory, we could truncate the on-disk
2481 * log file, but we don't have the right information to know how long it should
2482 * be. What we actually do is to append entries for older indexes to the
2483 * on-disk log; when we re-read it later, these entries truncate the log.
2485 * Returns true if any of the removed log entries were server configuration
2486 * entries, false otherwise. */
2488 raft_truncate(struct raft
*raft
, uint64_t new_end
)
2490 ovs_assert(new_end
>= raft
->log_start
);
2491 if (raft
->log_end
> new_end
) {
2492 char buf
[SID_LEN
+ 1];
2493 VLOG_INFO("%s truncating %"PRIu64
" entries from end of log",
2494 raft_get_nickname(raft
, &raft
->sid
, buf
, sizeof buf
),
2495 raft
->log_end
- new_end
);
2498 bool servers_changed
= false;
2499 while (raft
->log_end
> new_end
) {
2500 struct raft_entry
*entry
= &raft
->entries
[--raft
->log_end
2502 if (entry
->servers
) {
2503 servers_changed
= true;
2505 raft_entry_uninit(entry
);
2507 return servers_changed
;
2510 static const struct json
*
2511 raft_peek_next_entry(struct raft
*raft
, struct uuid
*eid
)
2513 /* Invariant: log_start - 2 <= last_applied <= commit_index < log_end. */
2514 ovs_assert(raft
->log_start
<= raft
->last_applied
+ 2);
2515 ovs_assert(raft
->last_applied
<= raft
->commit_index
);
2516 ovs_assert(raft
->commit_index
< raft
->log_end
);
2518 if (raft
->joining
|| raft
->failed
) {
2522 if (raft
->log_start
== raft
->last_applied
+ 2) {
2523 *eid
= raft
->snap
.eid
;
2524 return raft
->snap
.data
;
2527 while (raft
->last_applied
< raft
->commit_index
) {
2528 const struct raft_entry
*e
= raft_get_entry(raft
,
2529 raft
->last_applied
+ 1);
2534 raft
->last_applied
++;
2539 static const struct json
*
2540 raft_get_next_entry(struct raft
*raft
, struct uuid
*eid
)
2542 const struct json
*data
= raft_peek_next_entry(raft
, eid
);
2544 raft
->last_applied
++;
2550 raft_update_commit_index(struct raft
*raft
, uint64_t new_commit_index
)
2552 if (new_commit_index
<= raft
->commit_index
) {
2556 if (raft
->role
== RAFT_LEADER
) {
2557 while (raft
->commit_index
< new_commit_index
) {
2558 uint64_t index
= ++raft
->commit_index
;
2559 const struct raft_entry
*e
= raft_get_entry(raft
, index
);
2561 raft_run_reconfigure(raft
);
2564 struct raft_command
*cmd
2565 = raft_find_command_by_index(raft
, index
);
2567 raft_command_complete(raft
, cmd
, RAFT_CMD_SUCCESS
);
2572 raft
->commit_index
= new_commit_index
;
2575 /* Write the commit index to the log. The next time we restart, this
2576 * allows us to start exporting a reasonably fresh log, instead of a log
2577 * that only contains the snapshot. */
2578 struct raft_record r
= {
2579 .type
= RAFT_REC_COMMIT_INDEX
,
2580 .commit_index
= raft
->commit_index
,
2582 ignore(ovsdb_log_write_and_free(raft
->log
, raft_record_to_json(&r
)));
2585 /* This doesn't use rq->entries (but it does use rq->n_entries). */
2587 raft_send_append_reply(struct raft
*raft
, const struct raft_append_request
*rq
,
2588 enum raft_append_result result
, const char *comment
)
2590 /* Figure 3.1: "If leaderCommit > commitIndex, set commitIndex =
2591 * min(leaderCommit, index of last new entry)" */
2592 if (result
== RAFT_APPEND_OK
&& rq
->leader_commit
> raft
->commit_index
) {
2593 raft_update_commit_index(
2594 raft
, MIN(rq
->leader_commit
, rq
->prev_log_index
+ rq
->n_entries
));
2598 union raft_rpc reply
= {
2601 .type
= RAFT_RPC_APPEND_REPLY
,
2602 .sid
= rq
->common
.sid
,
2603 .comment
= CONST_CAST(char *, comment
),
2606 .log_end
= raft
->log_end
,
2607 .prev_log_index
= rq
->prev_log_index
,
2608 .prev_log_term
= rq
->prev_log_term
,
2609 .n_entries
= rq
->n_entries
,
2613 raft_send(raft
, &reply
);
2616 /* If 'prev_log_index' exists in 'raft''s log, in term 'prev_log_term', returns
2617 * NULL. Otherwise, returns an explanation for the mismatch. */
2619 match_index_and_term(const struct raft
*raft
,
2620 uint64_t prev_log_index
, uint64_t prev_log_term
)
2622 if (prev_log_index
< raft
->log_start
- 1) {
2623 return "mismatch before start of log";
2624 } else if (prev_log_index
== raft
->log_start
- 1) {
2625 if (prev_log_term
!= raft
->snap
.term
) {
2626 return "prev_term mismatch";
2628 } else if (prev_log_index
< raft
->log_end
) {
2629 if (raft
->entries
[prev_log_index
- raft
->log_start
].term
2631 return "term mismatch";
2634 /* prev_log_index >= raft->log_end */
2635 return "mismatch past end of log";
2641 raft_handle_append_entries(struct raft
*raft
,
2642 const struct raft_append_request
*rq
,
2643 uint64_t prev_log_index
, uint64_t prev_log_term
,
2644 const struct raft_entry
*entries
,
2645 unsigned int n_entries
)
2647 /* Section 3.5: "When sending an AppendEntries RPC, the leader includes
2648 * the index and term of the entry in its log that immediately precedes
2649 * the new entries. If the follower does not find an entry in its log
2650 * with the same index and term, then it refuses the new entries." */
2651 const char *mismatch
= match_index_and_term(raft
, prev_log_index
,
2654 VLOG_INFO("rejecting append_request because previous entry "
2655 "%"PRIu64
",%"PRIu64
" not in local log (%s)",
2656 prev_log_term
, prev_log_index
, mismatch
);
2657 raft_send_append_reply(raft
, rq
, RAFT_APPEND_INCONSISTENCY
, mismatch
);
2661 /* Figure 3.1: "If an existing entry conflicts with a new one (same
2662 * index but different terms), delete the existing entry and all that
2665 bool servers_changed
= false;
2666 for (i
= 0; ; i
++) {
2667 if (i
>= n_entries
) {
2669 if (rq
->common
.comment
2670 && !strcmp(rq
->common
.comment
, "heartbeat")) {
2671 raft_send_append_reply(raft
, rq
, RAFT_APPEND_OK
, "heartbeat");
2673 raft_send_append_reply(raft
, rq
, RAFT_APPEND_OK
, "no change");
2678 uint64_t log_index
= (prev_log_index
+ 1) + i
;
2679 if (log_index
>= raft
->log_end
) {
2682 if (raft
->entries
[log_index
- raft
->log_start
].term
2683 != entries
[i
].term
) {
2684 if (raft_truncate(raft
, log_index
)) {
2685 servers_changed
= true;
2691 /* Figure 3.1: "Append any entries not already in the log." */
2692 struct ovsdb_error
*error
= NULL
;
2693 bool any_written
= false;
2694 for (; i
< n_entries
; i
++) {
2695 const struct raft_entry
*e
= &entries
[i
];
2696 error
= raft_write_entry(raft
, e
->term
,
2697 json_nullable_clone(e
->data
), &e
->eid
,
2698 json_nullable_clone(e
->servers
));
2704 servers_changed
= true;
2709 raft_waiter_create(raft
, RAFT_W_ENTRY
, true)->entry
.index
2710 = raft
->log_end
- 1;
2712 if (servers_changed
) {
2713 /* The set of servers might have changed; check. */
2714 raft_get_servers_from_log(raft
, VLL_INFO
);
2718 char *s
= ovsdb_error_to_string_free(error
);
2721 raft_send_append_reply(raft
, rq
, RAFT_APPEND_IO_ERROR
, "I/O error");
2725 raft_send_append_reply(raft
, rq
, RAFT_APPEND_OK
, "log updated");
2729 raft_update_leader(struct raft
*raft
, const struct uuid
*sid
)
2731 if (raft
->role
== RAFT_LEADER
) {
2732 char buf
[SID_LEN
+ 1];
2733 VLOG_ERR("this server is leader but server %s claims to be",
2734 raft_get_nickname(raft
, sid
, buf
, sizeof buf
));
2736 } else if (!uuid_equals(sid
, &raft
->leader_sid
)) {
2737 if (!uuid_is_zero(&raft
->leader_sid
)) {
2738 char buf1
[SID_LEN
+ 1];
2739 char buf2
[SID_LEN
+ 1];
2740 VLOG_ERR("leader for term %"PRIu64
" changed from %s to %s",
2742 raft_get_nickname(raft
, &raft
->leader_sid
,
2744 raft_get_nickname(raft
, sid
, buf2
, sizeof buf2
));
2746 char buf
[SID_LEN
+ 1];
2747 VLOG_INFO("server %s is leader for term %"PRIu64
,
2748 raft_get_nickname(raft
, sid
, buf
, sizeof buf
),
2751 raft
->leader_sid
= *sid
;
2753 /* Record the leader to the log. This is not used by the algorithm
2754 * (although it could be, for quick restart), but it is used for
2755 * offline analysis to check for conformance with the properties
2756 * that Raft guarantees. */
2757 struct raft_record r
= {
2758 .type
= RAFT_REC_LEADER
,
2762 ignore(ovsdb_log_write_and_free(raft
->log
, raft_record_to_json(&r
)));
2768 raft_handle_append_request(struct raft
*raft
,
2769 const struct raft_append_request
*rq
)
2771 /* We do not check whether the server that sent the request is part of the
2772 * cluster. As section 4.1 says, "A server accepts AppendEntries requests
2773 * from a leader that is not part of the server’s latest configuration.
2774 * Otherwise, a new server could never be added to the cluster (it would
2775 * never accept any log entries preceding the configuration entry that adds
2777 if (!raft_update_leader(raft
, &rq
->common
.sid
)) {
2778 raft_send_append_reply(raft
, rq
, RAFT_APPEND_INCONSISTENCY
,
2779 "usurped leadership");
2782 raft_reset_timer(raft
);
2784 /* First check for the common case, where the AppendEntries request is
2785 * entirely for indexes covered by 'log_start' ... 'log_end - 1', something
2788 * rq->prev_log_index
2789 * | first_entry_index
2790 * | | nth_entry_index
2794 * T | T | T | T | T |
2797 * T | T | T | T | T |
2803 uint64_t first_entry_index
= rq
->prev_log_index
+ 1;
2804 uint64_t nth_entry_index
= rq
->prev_log_index
+ rq
->n_entries
;
2805 if (OVS_LIKELY(first_entry_index
>= raft
->log_start
)) {
2806 raft_handle_append_entries(raft
, rq
,
2807 rq
->prev_log_index
, rq
->prev_log_term
,
2808 rq
->entries
, rq
->n_entries
);
2812 /* Now a series of checks for odd cases, where the AppendEntries request
2813 * extends earlier than the beginning of our log, into the log entries
2814 * discarded by the most recent snapshot. */
2817 * Handle the case where the indexes covered by rq->entries[] are entirely
2818 * disjoint with 'log_start - 1' ... 'log_end - 1', as shown below. So,
2819 * everything in the AppendEntries request must already have been
2820 * committed, and we might as well return true.
2822 * rq->prev_log_index
2823 * | first_entry_index
2824 * | | nth_entry_index
2828 * T | T | T | T | T |
2831 * T | T | T | T | T |
2837 if (nth_entry_index
< raft
->log_start
- 1) {
2838 raft_send_append_reply(raft
, rq
, RAFT_APPEND_OK
,
2839 "append before log start");
2844 * Handle the case where the last entry in rq->entries[] has the same index
2845 * as 'log_start - 1', so we can compare their terms:
2847 * rq->prev_log_index
2848 * | first_entry_index
2849 * | | nth_entry_index
2853 * T | T | T | T | T |
2856 * T | T | T | T | T |
2862 * There's actually a sub-case where rq->n_entries == 0, in which we
2863 * compare rq->prev_term:
2865 * rq->prev_log_index
2873 * T | T | T | T | T |
2879 if (nth_entry_index
== raft
->log_start
- 1) {
2881 ? raft
->snap
.term
== rq
->entries
[rq
->n_entries
- 1].term
2882 : raft
->snap
.term
== rq
->prev_log_term
) {
2883 raft_send_append_reply(raft
, rq
, RAFT_APPEND_OK
, "no change");
2885 raft_send_append_reply(raft
, rq
, RAFT_APPEND_INCONSISTENCY
,
2892 * We now know that the data in rq->entries[] overlaps the data in
2893 * raft->entries[], as shown below, with some positive 'ofs':
2895 * rq->prev_log_index
2896 * | first_entry_index
2897 * | | nth_entry_index
2900 * +---+---+---+---+---+
2901 * T | T | T | T | T | T |
2902 * +---+-------+---+---+
2904 * T | T | T | T | T |
2912 * We transform this into the following by trimming the first 'ofs'
2913 * elements off of rq->entries[], ending up with the following. Notice how
2914 * we retain the term but not the data for rq->entries[ofs - 1]:
2916 * first_entry_index + ofs - 1
2917 * | first_entry_index + ofs
2918 * | | nth_entry_index + ofs
2925 * T | T | T | T | T |
2931 uint64_t ofs
= raft
->log_start
- first_entry_index
;
2932 raft_handle_append_entries(raft
, rq
,
2933 raft
->log_start
- 1, rq
->entries
[ofs
- 1].term
,
2934 &rq
->entries
[ofs
], rq
->n_entries
- ofs
);
2937 /* Returns true if 'raft' has another log entry or snapshot to read. */
2939 raft_has_next_entry(const struct raft
*raft_
)
2941 struct raft
*raft
= CONST_CAST(struct raft
*, raft_
);
2943 return raft_peek_next_entry(raft
, &eid
) != NULL
;
2946 /* Returns the next log entry or snapshot from 'raft', or NULL if there are
2947 * none left to read. Stores the entry ID of the log entry in '*eid'. Stores
2948 * true in '*is_snapshot' if the returned data is a snapshot, false if it is a
2951 raft_next_entry(struct raft
*raft
, struct uuid
*eid
, bool *is_snapshot
)
2953 const struct json
*data
= raft_get_next_entry(raft
, eid
);
2954 *is_snapshot
= data
== raft
->snap
.data
;
2958 /* Returns the log index of the last-read snapshot or log entry. */
2960 raft_get_applied_index(const struct raft
*raft
)
2962 return raft
->last_applied
;
2965 /* Returns the log index of the last snapshot or log entry that is available to
2968 raft_get_commit_index(const struct raft
*raft
)
2970 return raft
->commit_index
;
2973 static struct raft_server
*
2974 raft_find_peer(struct raft
*raft
, const struct uuid
*uuid
)
2976 struct raft_server
*s
= raft_find_server(raft
, uuid
);
2977 return s
&& !uuid_equals(&raft
->sid
, &s
->sid
) ? s
: NULL
;
2980 static struct raft_server
*
2981 raft_find_new_server(struct raft
*raft
, const struct uuid
*uuid
)
2983 return raft_server_find(&raft
->add_servers
, uuid
);
2986 /* Figure 3.1: "If there exists an N such that N > commitIndex, a
2987 * majority of matchIndex[i] >= N, and log[N].term == currentTerm, set
2988 * commitIndex = N (sections 3.5 and 3.6)." */
2990 raft_consider_updating_commit_index(struct raft
*raft
)
2992 /* This loop cannot just bail out when it comes across a log entry that
2993 * does not match the criteria. For example, Figure 3.7(d2) shows a
2994 * case where the log entry for term 2 cannot be committed directly
2995 * (because it is not for the current term) but it can be committed as
2996 * a side effect of commit the entry for term 4 (the current term).
2997 * XXX Is there a more efficient way to do this? */
2998 ovs_assert(raft
->role
== RAFT_LEADER
);
3000 uint64_t new_commit_index
= raft
->commit_index
;
3001 for (uint64_t idx
= MAX(raft
->commit_index
+ 1, raft
->log_start
);
3002 idx
< raft
->log_end
; idx
++) {
3003 if (raft
->entries
[idx
- raft
->log_start
].term
== raft
->term
) {
3005 struct raft_server
*s2
;
3006 HMAP_FOR_EACH (s2
, hmap_node
, &raft
->servers
) {
3007 if (s2
->match_index
>= idx
) {
3011 if (count
> hmap_count(&raft
->servers
) / 2) {
3012 VLOG_DBG("index %"PRIu64
" committed to %"PRIuSIZE
" servers, "
3013 "applying", idx
, count
);
3014 new_commit_index
= idx
;
3018 raft_update_commit_index(raft
, new_commit_index
);
3022 raft_update_match_index(struct raft
*raft
, struct raft_server
*s
,
3025 ovs_assert(raft
->role
== RAFT_LEADER
);
3026 if (min_index
> s
->match_index
) {
3027 s
->match_index
= min_index
;
3028 raft_consider_updating_commit_index(raft
);
3033 raft_update_our_match_index(struct raft
*raft
, uint64_t min_index
)
3035 raft_update_match_index(raft
, raft_find_server(raft
, &raft
->sid
),
3040 raft_send_install_snapshot_request(struct raft
*raft
,
3041 const struct raft_server
*s
,
3042 const char *comment
)
3044 union raft_rpc rpc
= {
3045 .install_snapshot_request
= {
3047 .type
= RAFT_RPC_INSTALL_SNAPSHOT_REQUEST
,
3049 .comment
= CONST_CAST(char *, comment
),
3052 .last_index
= raft
->log_start
- 1,
3053 .last_term
= raft
->snap
.term
,
3054 .last_servers
= raft
->snap
.servers
,
3055 .last_eid
= raft
->snap
.eid
,
3056 .data
= raft
->snap
.data
,
3059 raft_send(raft
, &rpc
);
3063 raft_handle_append_reply(struct raft
*raft
,
3064 const struct raft_append_reply
*rpy
)
3066 if (raft
->role
!= RAFT_LEADER
) {
3067 VLOG_INFO("rejected append_reply (not leader)");
3071 /* Most commonly we'd be getting an AppendEntries reply from a configured
3072 * server (e.g. a peer), but we can also get them from servers in the
3073 * process of being added. */
3074 struct raft_server
*s
= raft_find_peer(raft
, &rpy
->common
.sid
);
3076 s
= raft_find_new_server(raft
, &rpy
->common
.sid
);
3078 VLOG_INFO("rejected append_reply from unknown server "SID_FMT
,
3079 SID_ARGS(&rpy
->common
.sid
));
3084 if (rpy
->result
== RAFT_APPEND_OK
) {
3085 /* Figure 3.1: "If successful, update nextIndex and matchIndex for
3086 * follower (section 3.5)." */
3087 uint64_t min_index
= rpy
->prev_log_index
+ rpy
->n_entries
+ 1;
3088 if (s
->next_index
< min_index
) {
3089 s
->next_index
= min_index
;
3091 raft_update_match_index(raft
, s
, min_index
- 1);
3093 /* Figure 3.1: "If AppendEntries fails because of log inconsistency,
3094 * decrement nextIndex and retry (section 3.5)."
3096 * We also implement the optimization suggested in section 4.2.1:
3097 * "Various approaches can make nextIndex converge to its correct value
3098 * more quickly, including those described in Chapter 3. The simplest
3099 * approach to solving this particular problem of adding a new server,
3100 * however, is to have followers return the length of their logs in the
3101 * AppendEntries response; this allows the leader to cap the follower’s
3102 * nextIndex accordingly." */
3103 s
->next_index
= (s
->next_index
> 0
3104 ? MIN(s
->next_index
- 1, rpy
->log_end
)
3107 if (rpy
->result
== RAFT_APPEND_IO_ERROR
) {
3108 /* Append failed but not because of a log inconsistency. Because
3109 * of the I/O error, there's no point in re-sending the append
3111 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
3112 VLOG_INFO_RL(&rl
, "%s reported I/O error", s
->nickname
);
3118 * Our behavior here must depend on the value of next_index relative to
3119 * log_start and log_end. There are three cases:
3121 * Case 1 | Case 2 | Case 3
3122 * <---------------->|<------------->|<------------------>
3126 * T | T | T | T | T |
3132 if (s
->next_index
< raft
->log_start
) {
3134 raft_send_install_snapshot_request(raft
, s
, NULL
);
3135 } else if (s
->next_index
< raft
->log_end
) {
3137 raft_send_append_request(raft
, s
, 1, NULL
);
3140 if (s
->phase
== RAFT_PHASE_CATCHUP
) {
3141 s
->phase
= RAFT_PHASE_CAUGHT_UP
;
3142 raft_run_reconfigure(raft
);
3148 raft_should_suppress_disruptive_server(struct raft
*raft
,
3149 const union raft_rpc
*rpc
)
3151 if (rpc
->type
!= RAFT_RPC_VOTE_REQUEST
) {
3155 /* Section 4.2.3 "Disruptive Servers" says:
3157 * ...if a server receives a RequestVote request within the minimum
3158 * election timeout of hearing from a current leader, it does not update
3159 * its term or grant its vote...
3161 * ...This change conflicts with the leadership transfer mechanism as
3162 * described in Chapter 3, in which a server legitimately starts an
3163 * election without waiting an election timeout. In that case,
3164 * RequestVote messages should be processed by other servers even when
3165 * they believe a current cluster leader exists. Those RequestVote
3166 * requests can include a special flag to indicate this behavior (“I
3167 * have permission to disrupt the leader--it told me to!”).
3169 * This clearly describes how the followers should act, but not the leader.
3170 * We just ignore vote requests that arrive at a current leader. This
3171 * seems to be fairly safe, since a majority other than the current leader
3172 * can still elect a new leader and the first AppendEntries from that new
3173 * leader will depose the current leader. */
3174 const struct raft_vote_request
*rq
= raft_vote_request_cast(rpc
);
3175 if (rq
->leadership_transfer
) {
3179 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
3180 long long int now
= time_msec();
3181 switch (raft
->role
) {
3183 VLOG_WARN_RL(&rl
, "ignoring vote request received as leader");
3187 if (now
< raft
->election_base
+ ELECTION_BASE_MSEC
) {
3188 VLOG_WARN_RL(&rl
, "ignoring vote request received after only "
3189 "%lld ms (minimum election time is %d ms)",
3190 now
- raft
->election_base
, ELECTION_BASE_MSEC
);
3195 case RAFT_CANDIDATE
:
3203 /* Returns true if a reply should be sent. */
3205 raft_handle_vote_request__(struct raft
*raft
,
3206 const struct raft_vote_request
*rq
)
3208 /* Figure 3.1: "If votedFor is null or candidateId, and candidate's vote is
3209 * at least as up-to-date as receiver's log, grant vote (sections 3.4,
3211 if (uuid_equals(&raft
->vote
, &rq
->common
.sid
)) {
3212 /* Already voted for this candidate in this term. Resend vote. */
3214 } else if (!uuid_is_zero(&raft
->vote
)) {
3215 /* Already voted for different candidate in this term. Send a reply
3216 * saying what candidate we did vote for. This isn't a necessary part
3217 * of the Raft protocol but it can make debugging easier. */
3221 /* Section 3.6.1: "The RequestVote RPC implements this restriction: the RPC
3222 * includes information about the candidate’s log, and the voter denies its
3223 * vote if its own log is more up-to-date than that of the candidate. Raft
3224 * determines which of two logs is more up-to-date by comparing the index
3225 * and term of the last entries in the logs. If the logs have last entries
3226 * with different terms, then the log with the later term is more
3227 * up-to-date. If the logs end with the same term, then whichever log is
3228 * longer is more up-to-date." */
3229 uint64_t last_term
= (raft
->log_end
> raft
->log_start
3230 ? raft
->entries
[raft
->log_end
- 1
3231 - raft
->log_start
].term
3233 if (last_term
> rq
->last_log_term
3234 || (last_term
== rq
->last_log_term
3235 && raft
->log_end
- 1 > rq
->last_log_index
)) {
3236 /* Our log is more up-to-date than the peer's. Withhold vote. */
3240 /* Record a vote for the peer. */
3241 if (!raft_set_term(raft
, raft
->term
, &rq
->common
.sid
)) {
3245 raft_reset_timer(raft
);
3251 raft_send_vote_reply(struct raft
*raft
, const struct uuid
*dst
,
3252 const struct uuid
*vote
)
3254 union raft_rpc rpy
= {
3257 .type
= RAFT_RPC_VOTE_REPLY
,
3264 raft_send(raft
, &rpy
);
3268 raft_handle_vote_request(struct raft
*raft
,
3269 const struct raft_vote_request
*rq
)
3271 if (raft_handle_vote_request__(raft
, rq
)) {
3272 raft_send_vote_reply(raft
, &rq
->common
.sid
, &raft
->vote
);
3277 raft_handle_vote_reply(struct raft
*raft
,
3278 const struct raft_vote_reply
*rpy
)
3280 if (!raft_receive_term__(raft
, &rpy
->common
, rpy
->term
)) {
3284 if (raft
->role
!= RAFT_CANDIDATE
) {
3288 struct raft_server
*s
= raft_find_peer(raft
, &rpy
->common
.sid
);
3290 raft_accept_vote(raft
, s
, &rpy
->vote
);
3294 /* Returns true if 'raft''s log contains reconfiguration entries that have not
3295 * yet been committed. */
3297 raft_has_uncommitted_configuration(const struct raft
*raft
)
3299 for (uint64_t i
= raft
->commit_index
+ 1; i
< raft
->log_end
; i
++) {
3300 ovs_assert(i
>= raft
->log_start
);
3301 const struct raft_entry
*e
= &raft
->entries
[i
- raft
->log_start
];
3310 raft_log_reconfiguration(struct raft
*raft
)
3312 struct json
*servers_json
= raft_servers_to_json(&raft
->servers
);
3313 raft_command_unref(raft_command_execute__(
3314 raft
, NULL
, servers_json
, NULL
, NULL
));
3315 json_destroy(servers_json
);
3319 raft_run_reconfigure(struct raft
*raft
)
3321 ovs_assert(raft
->role
== RAFT_LEADER
);
3323 /* Reconfiguration only progresses when configuration changes commit. */
3324 if (raft_has_uncommitted_configuration(raft
)) {
3328 /* If we were waiting for a configuration change to commit, it's done. */
3329 struct raft_server
*s
;
3330 HMAP_FOR_EACH (s
, hmap_node
, &raft
->servers
) {
3331 if (s
->phase
== RAFT_PHASE_COMMITTING
) {
3332 raft_send_add_server_reply__(raft
, &s
->sid
, s
->address
,
3333 true, RAFT_SERVER_COMPLETED
);
3334 s
->phase
= RAFT_PHASE_STABLE
;
3337 if (raft
->remove_server
) {
3338 raft_send_remove_server_reply__(raft
, &raft
->remove_server
->sid
,
3339 &raft
->remove_server
->requester_sid
,
3340 raft
->remove_server
->requester_conn
,
3341 true, RAFT_SERVER_COMPLETED
);
3342 raft_server_destroy(raft
->remove_server
);
3343 raft
->remove_server
= NULL
;
3346 /* If a new server is caught up, add it to the configuration. */
3347 HMAP_FOR_EACH (s
, hmap_node
, &raft
->add_servers
) {
3348 if (s
->phase
== RAFT_PHASE_CAUGHT_UP
) {
3349 /* Move 's' from 'raft->add_servers' to 'raft->servers'. */
3350 hmap_remove(&raft
->add_servers
, &s
->hmap_node
);
3351 hmap_insert(&raft
->servers
, &s
->hmap_node
, uuid_hash(&s
->sid
));
3353 /* Mark 's' as waiting for commit. */
3354 s
->phase
= RAFT_PHASE_COMMITTING
;
3356 raft_log_reconfiguration(raft
);
3358 /* When commit completes we'll transition to RAFT_PHASE_STABLE and
3359 * send a RAFT_SERVER_OK reply. */
3365 /* Remove a server, if one is scheduled for removal. */
3366 HMAP_FOR_EACH (s
, hmap_node
, &raft
->servers
) {
3367 if (s
->phase
== RAFT_PHASE_REMOVE
) {
3368 hmap_remove(&raft
->servers
, &s
->hmap_node
);
3369 raft
->remove_server
= s
;
3371 raft_log_reconfiguration(raft
);
3379 raft_handle_add_server_request(struct raft
*raft
,
3380 const struct raft_add_server_request
*rq
)
3382 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3383 if (raft
->role
!= RAFT_LEADER
) {
3384 raft_send_add_server_reply(raft
, rq
, false, RAFT_SERVER_NOT_LEADER
);
3388 /* Check for an existing server. */
3389 struct raft_server
*s
= raft_find_server(raft
, &rq
->common
.sid
);
3391 /* If the server is scheduled to be removed, cancel it. */
3392 if (s
->phase
== RAFT_PHASE_REMOVE
) {
3393 s
->phase
= RAFT_PHASE_STABLE
;
3394 raft_send_add_server_reply(raft
, rq
, false, RAFT_SERVER_CANCELED
);
3398 /* If the server is being added, then it's in progress. */
3399 if (s
->phase
!= RAFT_PHASE_STABLE
) {
3400 raft_send_add_server_reply(raft
, rq
,
3401 false, RAFT_SERVER_IN_PROGRESS
);
3404 /* Nothing to do--server is already part of the configuration. */
3405 raft_send_add_server_reply(raft
, rq
,
3406 true, RAFT_SERVER_ALREADY_PRESENT
);
3410 /* Check for a server being removed. */
3411 if (raft
->remove_server
3412 && uuid_equals(&rq
->common
.sid
, &raft
->remove_server
->sid
)) {
3413 raft_send_add_server_reply(raft
, rq
, false, RAFT_SERVER_COMMITTING
);
3417 /* Check for a server already being added. */
3418 if (raft_find_new_server(raft
, &rq
->common
.sid
)) {
3419 raft_send_add_server_reply(raft
, rq
, false, RAFT_SERVER_IN_PROGRESS
);
3423 /* Add server to 'add_servers'. */
3424 s
= raft_server_add(&raft
->add_servers
, &rq
->common
.sid
, rq
->address
);
3425 raft_server_init_leader(raft
, s
);
3426 s
->requester_sid
= rq
->common
.sid
;
3427 s
->requester_conn
= NULL
;
3428 s
->phase
= RAFT_PHASE_CATCHUP
;
3430 /* Start sending the log. If this is the first time we've tried to add
3431 * this server, then this will quickly degenerate into an InstallSnapshot
3432 * followed by a series of AddEntries, but if it's a retry of an earlier
3433 * AddRequest that was interrupted (e.g. by a timeout or a loss of
3434 * leadership) then it will gracefully resume populating the log.
3436 * See the last few paragraphs of section 4.2.1 for further insight. */
3437 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(10, 10);
3439 "starting to add server %s ("SID_FMT
" at %s) "
3440 "to cluster "CID_FMT
, s
->nickname
, SID_ARGS(&s
->sid
),
3441 rq
->address
, CID_ARGS(&raft
->cid
));
3442 raft_send_append_request(raft
, s
, 0, "initialize new server");
3446 raft_handle_add_server_reply(struct raft
*raft
,
3447 const struct raft_add_server_reply
*rpy
)
3449 if (!raft
->joining
) {
3450 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
3451 VLOG_WARN_RL(&rl
, "received add_server_reply even though we're "
3452 "already part of the cluster");
3457 raft
->joining
= false;
3459 /* It is tempting, at this point, to check that this server is part of
3460 * the current configuration. However, this is not necessarily the
3461 * case, because the log entry that added this server to the cluster
3462 * might have been committed by a majority of the cluster that does not
3463 * include this one. This actually happens in testing. */
3465 const char *address
;
3466 SSET_FOR_EACH (address
, &rpy
->remote_addresses
) {
3467 if (sset_add(&raft
->remote_addresses
, address
)) {
3468 VLOG_INFO("%s: learned new server address for joining cluster",
3475 /* This is called by raft_unixctl_kick() as well as via RPC. */
3477 raft_handle_remove_server_request(struct raft
*raft
,
3478 const struct raft_remove_server_request
*rq
)
3480 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3481 if (raft
->role
!= RAFT_LEADER
) {
3482 raft_send_remove_server_reply(raft
, rq
, false, RAFT_SERVER_NOT_LEADER
);
3486 /* If the server to remove is currently waiting to be added, cancel it. */
3487 struct raft_server
*target
= raft_find_new_server(raft
, &rq
->sid
);
3489 raft_send_add_server_reply__(raft
, &target
->sid
, target
->address
,
3490 false, RAFT_SERVER_CANCELED
);
3491 hmap_remove(&raft
->add_servers
, &target
->hmap_node
);
3492 raft_server_destroy(target
);
3496 /* If the server isn't configured, report that. */
3497 target
= raft_find_server(raft
, &rq
->sid
);
3499 raft_send_remove_server_reply(raft
, rq
,
3500 true, RAFT_SERVER_ALREADY_GONE
);
3504 /* Check whether we're waiting for the addition of the server to commit. */
3505 if (target
->phase
== RAFT_PHASE_COMMITTING
) {
3506 raft_send_remove_server_reply(raft
, rq
, false, RAFT_SERVER_COMMITTING
);
3510 /* Check whether the server is already scheduled for removal. */
3511 if (target
->phase
== RAFT_PHASE_REMOVE
) {
3512 raft_send_remove_server_reply(raft
, rq
,
3513 false, RAFT_SERVER_IN_PROGRESS
);
3517 /* Make sure that if we remove this server then that at least one other
3518 * server will be left. We don't count servers currently being added (in
3519 * 'add_servers') since those could fail. */
3520 struct raft_server
*s
;
3522 HMAP_FOR_EACH (s
, hmap_node
, &raft
->servers
) {
3523 if (s
!= target
&& s
->phase
!= RAFT_PHASE_REMOVE
) {
3528 raft_send_remove_server_reply(raft
, rq
, false, RAFT_SERVER_EMPTY
);
3532 /* Mark the server for removal. */
3533 target
->phase
= RAFT_PHASE_REMOVE
;
3534 if (rq
->requester_conn
) {
3535 target
->requester_sid
= UUID_ZERO
;
3536 unixctl_command_reply(rq
->requester_conn
, "started removal");
3538 target
->requester_sid
= rq
->common
.sid
;
3539 target
->requester_conn
= NULL
;
3542 raft_run_reconfigure(raft
);
3543 /* Operation in progress, reply will be sent later. */
3547 raft_handle_remove_server_reply(struct raft
*raft
,
3548 const struct raft_remove_server_reply
*rpc
)
3551 VLOG_INFO(SID_FMT
": finished leaving cluster "CID_FMT
,
3552 SID_ARGS(&raft
->sid
), CID_ARGS(&raft
->cid
));
3554 raft_record_note(raft
, "left", "this server left the cluster");
3556 raft
->leaving
= false;
3562 raft_handle_write_error(struct raft
*raft
, struct ovsdb_error
*error
)
3564 if (error
&& !raft
->failed
) {
3565 raft
->failed
= true;
3567 char *s
= ovsdb_error_to_string_free(error
);
3568 VLOG_WARN("%s: entering failure mode due to I/O error (%s)",
3572 return !raft
->failed
;
3575 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
3576 raft_write_snapshot(struct raft
*raft
, struct ovsdb_log
*log
,
3577 uint64_t new_log_start
,
3578 const struct raft_entry
*new_snapshot
)
3580 struct raft_header h
= {
3584 .local_address
= raft
->local_address
,
3585 .snap_index
= new_log_start
- 1,
3586 .snap
= *new_snapshot
,
3588 struct ovsdb_error
*error
= ovsdb_log_write_and_free(
3589 log
, raft_header_to_json(&h
));
3593 ovsdb_log_mark_base(raft
->log
);
3595 /* Write log records. */
3596 for (uint64_t index
= new_log_start
; index
< raft
->log_end
; index
++) {
3597 const struct raft_entry
*e
= &raft
->entries
[index
- raft
->log_start
];
3598 struct raft_record r
= {
3599 .type
= RAFT_REC_ENTRY
,
3604 .servers
= e
->servers
,
3608 error
= ovsdb_log_write_and_free(log
, raft_record_to_json(&r
));
3614 /* Write term and vote (if any).
3616 * The term is redundant if we wrote a log record for that term above. The
3617 * vote, if any, is never redundant.
3619 error
= raft_write_state(log
, raft
->term
, &raft
->vote
);
3624 /* Write commit_index if it's beyond the new start of the log. */
3625 if (raft
->commit_index
>= new_log_start
) {
3626 struct raft_record r
= {
3627 .type
= RAFT_REC_COMMIT_INDEX
,
3628 .commit_index
= raft
->commit_index
,
3630 return ovsdb_log_write_and_free(log
, raft_record_to_json(&r
));
3635 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
3636 raft_save_snapshot(struct raft
*raft
,
3637 uint64_t new_start
, const struct raft_entry
*new_snapshot
)
3640 struct ovsdb_log
*new_log
;
3641 struct ovsdb_error
*error
;
3642 error
= ovsdb_log_replace_start(raft
->log
, &new_log
);
3647 error
= raft_write_snapshot(raft
, new_log
, new_start
, new_snapshot
);
3649 ovsdb_log_replace_abort(new_log
);
3653 return ovsdb_log_replace_commit(raft
->log
, new_log
);
3657 raft_handle_install_snapshot_request__(
3658 struct raft
*raft
, const struct raft_install_snapshot_request
*rq
)
3660 raft_reset_timer(raft
);
3663 * Our behavior here depend on new_log_start in the snapshot compared to
3664 * log_start and log_end. There are three cases:
3666 * Case 1 | Case 2 | Case 3
3667 * <---------------->|<------------->|<------------------>
3671 * T | T | T | T | T |
3677 uint64_t new_log_start
= rq
->last_index
+ 1;
3678 if (new_log_start
< raft
->log_start
) {
3679 /* Case 1: The new snapshot covers less than our current one. Nothing
3682 } else if (new_log_start
< raft
->log_end
) {
3683 /* Case 2: The new snapshot starts in the middle of our log. We could
3684 * discard the first 'new_log_start - raft->log_start' entries in the
3685 * log. But there's not much value in that, since snapshotting is
3686 * supposed to be a local decision. Just skip it. */
3690 /* Case 3: The new snapshot starts past the end of our current log, so
3691 * discard all of our current log. */
3692 const struct raft_entry new_snapshot
= {
3693 .term
= rq
->last_term
,
3695 .eid
= rq
->last_eid
,
3696 .servers
= rq
->last_servers
,
3698 struct ovsdb_error
*error
= raft_save_snapshot(raft
, new_log_start
,
3701 char *error_s
= ovsdb_error_to_string(error
);
3702 VLOG_WARN("could not save snapshot: %s", error_s
);
3707 for (size_t i
= 0; i
< raft
->log_end
- raft
->log_start
; i
++) {
3708 raft_entry_uninit(&raft
->entries
[i
]);
3710 raft
->log_start
= raft
->log_end
= new_log_start
;
3711 raft
->log_synced
= raft
->log_end
- 1;
3712 raft
->commit_index
= raft
->log_start
- 1;
3713 if (raft
->last_applied
< raft
->commit_index
) {
3714 raft
->last_applied
= raft
->log_start
- 2;
3717 raft_entry_uninit(&raft
->snap
);
3718 raft_entry_clone(&raft
->snap
, &new_snapshot
);
3720 raft_get_servers_from_log(raft
, VLL_INFO
);
3726 raft_handle_install_snapshot_request(
3727 struct raft
*raft
, const struct raft_install_snapshot_request
*rq
)
3729 if (raft_handle_install_snapshot_request__(raft
, rq
)) {
3730 union raft_rpc rpy
= {
3731 .install_snapshot_reply
= {
3733 .type
= RAFT_RPC_INSTALL_SNAPSHOT_REPLY
,
3734 .sid
= rq
->common
.sid
,
3737 .last_index
= rq
->last_index
,
3738 .last_term
= rq
->last_term
,
3741 raft_send(raft
, &rpy
);
3746 raft_handle_install_snapshot_reply(
3747 struct raft
*raft
, const struct raft_install_snapshot_reply
*rpy
)
3749 /* We might get an InstallSnapshot reply from a configured server (e.g. a
3750 * peer) or a server in the process of being added. */
3751 struct raft_server
*s
= raft_find_peer(raft
, &rpy
->common
.sid
);
3753 s
= raft_find_new_server(raft
, &rpy
->common
.sid
);
3755 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
3756 VLOG_INFO_RL(&rl
, "cluster "CID_FMT
": received %s from "
3757 "unknown server "SID_FMT
, CID_ARGS(&raft
->cid
),
3758 raft_rpc_type_to_string(rpy
->common
.type
),
3759 SID_ARGS(&rpy
->common
.sid
));
3764 if (rpy
->last_index
!= raft
->log_start
- 1 ||
3765 rpy
->last_term
!= raft
->snap
.term
) {
3766 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
3767 VLOG_INFO_RL(&rl
, "cluster "CID_FMT
": server %s installed "
3768 "out-of-date snapshot, starting over",
3769 CID_ARGS(&raft
->cid
), s
->nickname
);
3770 raft_send_install_snapshot_request(raft
, s
,
3771 "installed obsolete snapshot");
3775 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(10, 10);
3776 VLOG_INFO_RL(&rl
, "cluster "CID_FMT
": installed snapshot on server %s "
3777 " up to %"PRIu64
":%"PRIu64
, CID_ARGS(&raft
->cid
),
3778 s
->nickname
, rpy
->last_term
, rpy
->last_index
);
3779 s
->next_index
= raft
->log_end
;
3780 raft_send_append_request(raft
, s
, 0, "snapshot installed");
3783 /* Returns true if 'raft' has grown enough since the last snapshot that
3784 * reducing the log to a snapshot would be valuable, false otherwise. */
3786 raft_grew_lots(const struct raft
*raft
)
3788 return ovsdb_log_grew_lots(raft
->log
);
3791 /* Returns the number of log entries that could be trimmed off the on-disk log
3792 * by snapshotting. */
3794 raft_get_log_length(const struct raft
*raft
)
3796 return (raft
->last_applied
< raft
->log_start
3798 : raft
->last_applied
- raft
->log_start
+ 1);
3801 /* Returns true if taking a snapshot of 'raft', with raft_store_snapshot(), is
3804 raft_may_snapshot(const struct raft
*raft
)
3806 return (!raft
->joining
3810 && raft
->last_applied
>= raft
->log_start
);
3813 /* Replaces the log for 'raft', up to the last log entry read, by
3814 * 'new_snapshot_data'. Returns NULL if successful, otherwise an error that
3815 * the caller must eventually free.
3817 * This function can only succeed if raft_may_snapshot() returns true. It is
3818 * only valuable to call it if raft_get_log_length() is significant and
3819 * especially if raft_grew_lots() returns true. */
3820 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
3821 raft_store_snapshot(struct raft
*raft
, const struct json
*new_snapshot_data
)
3823 if (raft
->joining
) {
3824 return ovsdb_error(NULL
,
3825 "cannot store a snapshot while joining cluster");
3826 } else if (raft
->leaving
) {
3827 return ovsdb_error(NULL
,
3828 "cannot store a snapshot while leaving cluster");
3829 } else if (raft
->left
) {
3830 return ovsdb_error(NULL
,
3831 "cannot store a snapshot after leaving cluster");
3832 } else if (raft
->failed
) {
3833 return ovsdb_error(NULL
,
3834 "cannot store a snapshot following failure");
3837 if (raft
->last_applied
< raft
->log_start
) {
3838 return ovsdb_error(NULL
, "not storing a duplicate snapshot");
3841 uint64_t new_log_start
= raft
->last_applied
+ 1;
3842 const struct raft_entry new_snapshot
= {
3843 .term
= raft_get_term(raft
, new_log_start
- 1),
3844 .data
= CONST_CAST(struct json
*, new_snapshot_data
),
3845 .eid
= *raft_get_eid(raft
, new_log_start
- 1),
3846 .servers
= CONST_CAST(struct json
*,
3847 raft_servers_for_index(raft
, new_log_start
- 1)),
3849 struct ovsdb_error
*error
= raft_save_snapshot(raft
, new_log_start
,
3855 raft
->log_synced
= raft
->log_end
- 1;
3856 raft_entry_uninit(&raft
->snap
);
3857 raft_entry_clone(&raft
->snap
, &new_snapshot
);
3858 for (size_t i
= 0; i
< new_log_start
- raft
->log_start
; i
++) {
3859 raft_entry_uninit(&raft
->entries
[i
]);
3861 memmove(&raft
->entries
[0], &raft
->entries
[new_log_start
- raft
->log_start
],
3862 (raft
->log_end
- new_log_start
) * sizeof *raft
->entries
);
3863 raft
->log_start
= new_log_start
;
3868 raft_handle_become_leader(struct raft
*raft
,
3869 const struct raft_become_leader
*rq
)
3871 if (raft
->role
== RAFT_FOLLOWER
) {
3872 char buf
[SID_LEN
+ 1];
3873 VLOG_INFO("received leadership transfer from %s in term %"PRIu64
,
3874 raft_get_nickname(raft
, &rq
->common
.sid
, buf
, sizeof buf
),
3876 raft_start_election(raft
, true);
3881 raft_send_execute_command_reply(struct raft
*raft
,
3882 const struct uuid
*sid
,
3883 const struct uuid
*eid
,
3884 enum raft_command_status status
,
3885 uint64_t commit_index
)
3887 union raft_rpc rpc
= {
3888 .execute_command_reply
= {
3890 .type
= RAFT_RPC_EXECUTE_COMMAND_REPLY
,
3895 .commit_index
= commit_index
,
3898 raft_send(raft
, &rpc
);
/* Core handling of an execute_command request forwarded from another server:
 * validates leadership and the request's prerequisite eid, then initiates the
 * command locally.  Returns the command's (possibly still incomplete)
 * status. */
static enum raft_command_status
raft_handle_execute_command_request__(
    struct raft *raft, const struct raft_execute_command_request *rq)
{
    if (raft->role != RAFT_LEADER) {
        return RAFT_CMD_NOT_LEADER;
    }

    const struct uuid *current_eid = raft_current_eid(raft);
    if (!uuid_equals(&rq->prereq, current_eid)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
                     "prerequisite "UUID_FMT" in execute_command_request",
                     UUID_ARGS(current_eid), UUID_ARGS(&rq->prereq));
        return RAFT_CMD_BAD_PREREQ;
    }

    /* NOTE(review): middle arguments reconstructed — confirm against the
     * raft_command_initiate() definition earlier in this file. */
    struct raft_command *cmd = raft_command_initiate(raft, rq->data,
                                                     NULL, &rq->result);
    cmd->sid = rq->common.sid;

    enum raft_command_status status = cmd->status;
    if (status != RAFT_CMD_INCOMPLETE) {
        raft_command_unref(cmd);
    }
    return status;
}
3930 raft_handle_execute_command_request(
3931 struct raft
*raft
, const struct raft_execute_command_request
*rq
)
3933 enum raft_command_status status
3934 = raft_handle_execute_command_request__(raft
, rq
);
3935 if (status
!= RAFT_CMD_INCOMPLETE
) {
3936 raft_send_execute_command_reply(raft
, &rq
->common
.sid
, &rq
->result
,
3942 raft_handle_execute_command_reply(
3943 struct raft
*raft
, const struct raft_execute_command_reply
*rpy
)
3945 struct raft_command
*cmd
= raft_find_command_by_eid(raft
, &rpy
->result
);
3947 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
3948 char buf
[SID_LEN
+ 1];
3950 "%s received \"%s\" reply from %s for unknown command",
3951 raft
->local_nickname
,
3952 raft_command_status_to_string(rpy
->status
),
3953 raft_get_nickname(raft
, &rpy
->common
.sid
,
3958 if (rpy
->status
== RAFT_CMD_INCOMPLETE
) {
3959 cmd
->timestamp
= time_msec();
3961 cmd
->index
= rpy
->commit_index
;
3962 raft_command_complete(raft
, cmd
, rpy
->status
);
3967 raft_handle_rpc(struct raft
*raft
, const union raft_rpc
*rpc
)
3969 uint64_t term
= raft_rpc_get_term(rpc
);
3971 && !raft_should_suppress_disruptive_server(raft
, rpc
)
3972 && !raft_receive_term__(raft
, &rpc
->common
, term
)) {
3973 if (rpc
->type
== RAFT_RPC_APPEND_REQUEST
) {
3974 /* Section 3.3: "If a server receives a request with a stale term
3975 * number, it rejects the request." */
3976 raft_send_append_reply(raft
, raft_append_request_cast(rpc
),
3977 RAFT_APPEND_INCONSISTENCY
, "stale term");
3982 switch (rpc
->type
) {
3983 #define RAFT_RPC(ENUM, NAME) \
3985 raft_handle_##NAME(raft, &rpc->NAME); \
3995 raft_rpc_is_heartbeat(const union raft_rpc
*rpc
)
3997 return ((rpc
->type
== RAFT_RPC_APPEND_REQUEST
3998 || rpc
->type
== RAFT_RPC_APPEND_REPLY
)
3999 && rpc
->common
.comment
4000 && !strcmp(rpc
->common
.comment
, "heartbeat"));
4005 raft_send__(struct raft
*raft
, const union raft_rpc
*rpc
,
4006 struct raft_conn
*conn
)
4008 log_rpc(rpc
, "-->", conn
);
4009 return !jsonrpc_session_send(
4010 conn
->js
, raft_rpc_to_jsonrpc(&raft
->cid
, &raft
->sid
, rpc
));
4014 raft_is_rpc_synced(const struct raft
*raft
, const union raft_rpc
*rpc
)
4016 uint64_t term
= raft_rpc_get_term(rpc
);
4017 uint64_t index
= raft_rpc_get_min_sync_index(rpc
);
4018 const struct uuid
*vote
= raft_rpc_get_vote(rpc
);
4020 return (term
<= raft
->synced_term
4021 && index
<= raft
->log_synced
4022 && (!vote
|| uuid_equals(vote
, &raft
->synced_vote
)));
4026 raft_send(struct raft
*raft
, const union raft_rpc
*rpc
)
4028 const struct uuid
*dst
= &rpc
->common
.sid
;
4029 if (uuid_equals(dst
, &raft
->sid
)) {
4030 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 1);
4031 VLOG_WARN_RL(&rl
, "attempting to send RPC to self");
4035 struct raft_conn
*conn
= raft_find_conn_by_sid(raft
, dst
);
4037 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 1);
4038 char buf
[SID_LEN
+ 1];
4039 VLOG_DBG_RL(&rl
, "%s: no connection to %s, cannot send RPC",
4040 raft
->local_nickname
,
4041 raft_get_nickname(raft
, dst
, buf
, sizeof buf
));
4045 if (!raft_is_rpc_synced(raft
, rpc
)) {
4046 raft_waiter_create(raft
, RAFT_W_RPC
, false)->rpc
= raft_rpc_clone(rpc
);
4050 return raft_send__(raft
, rpc
, conn
);
4053 static struct raft
*
4054 raft_lookup_by_name(const char *name
)
4058 HMAP_FOR_EACH_WITH_HASH (raft
, hmap_node
, hash_string(name
, 0),
4060 if (!strcmp(raft
->name
, name
)) {
4068 raft_unixctl_cid(struct unixctl_conn
*conn
,
4069 int argc OVS_UNUSED
, const char *argv
[],
4070 void *aux OVS_UNUSED
)
4072 struct raft
*raft
= raft_lookup_by_name(argv
[1]);
4074 unixctl_command_reply_error(conn
, "unknown cluster");
4075 } else if (uuid_is_zero(&raft
->cid
)) {
4076 unixctl_command_reply_error(conn
, "cluster id not yet known");
4078 char *uuid
= xasprintf(UUID_FMT
, UUID_ARGS(&raft
->cid
));
4079 unixctl_command_reply(conn
, uuid
);
4085 raft_unixctl_sid(struct unixctl_conn
*conn
,
4086 int argc OVS_UNUSED
, const char *argv
[],
4087 void *aux OVS_UNUSED
)
4089 struct raft
*raft
= raft_lookup_by_name(argv
[1]);
4091 unixctl_command_reply_error(conn
, "unknown cluster");
4093 char *uuid
= xasprintf(UUID_FMT
, UUID_ARGS(&raft
->sid
));
4094 unixctl_command_reply(conn
, uuid
);
4100 raft_put_sid(const char *title
, const struct uuid
*sid
,
4101 const struct raft
*raft
, struct ds
*s
)
4103 ds_put_format(s
, "%s: ", title
);
4104 if (uuid_equals(sid
, &raft
->sid
)) {
4105 ds_put_cstr(s
, "self");
4106 } else if (uuid_is_zero(sid
)) {
4107 ds_put_cstr(s
, "unknown");
4109 char buf
[SID_LEN
+ 1];
4110 ds_put_cstr(s
, raft_get_nickname(raft
, sid
, buf
, sizeof buf
));
4112 ds_put_char(s
, '\n');
4116 raft_unixctl_status(struct unixctl_conn
*conn
,
4117 int argc OVS_UNUSED
, const char *argv
[],
4118 void *aux OVS_UNUSED
)
4120 struct raft
*raft
= raft_lookup_by_name(argv
[1]);
4122 unixctl_command_reply_error(conn
, "unknown cluster");
4126 struct ds s
= DS_EMPTY_INITIALIZER
;
4127 ds_put_format(&s
, "%s\n", raft
->local_nickname
);
4128 ds_put_format(&s
, "Name: %s\n", raft
->name
);
4129 ds_put_format(&s
, "Cluster ID: ");
4130 if (!uuid_is_zero(&raft
->cid
)) {
4131 ds_put_format(&s
, CID_FMT
" ("UUID_FMT
")\n",
4132 CID_ARGS(&raft
->cid
), UUID_ARGS(&raft
->cid
));
4134 ds_put_format(&s
, "not yet known\n");
4136 ds_put_format(&s
, "Server ID: "SID_FMT
" ("UUID_FMT
")\n",
4137 SID_ARGS(&raft
->sid
), UUID_ARGS(&raft
->sid
));
4138 ds_put_format(&s
, "Address: %s\n", raft
->local_address
);
4139 ds_put_format(&s
, "Status: %s\n",
4140 raft
->joining
? "joining cluster"
4141 : raft
->leaving
? "leaving cluster"
4142 : raft
->left
? "left cluster"
4143 : raft
->failed
? "failed"
4144 : "cluster member");
4145 if (raft
->joining
) {
4146 ds_put_format(&s
, "Remotes for joining:");
4147 const char *address
;
4148 SSET_FOR_EACH (address
, &raft
->remote_addresses
) {
4149 ds_put_format(&s
, " %s", address
);
4151 ds_put_char(&s
, '\n');
4153 if (raft
->role
== RAFT_LEADER
) {
4154 struct raft_server
*as
;
4155 HMAP_FOR_EACH (as
, hmap_node
, &raft
->add_servers
) {
4156 ds_put_format(&s
, "Adding server %s ("SID_FMT
" at %s) (%s)\n",
4157 as
->nickname
, SID_ARGS(&as
->sid
), as
->address
,
4158 raft_server_phase_to_string(as
->phase
));
4161 struct raft_server
*rs
= raft
->remove_server
;
4163 ds_put_format(&s
, "Removing server %s ("SID_FMT
" at %s) (%s)\n",
4164 rs
->nickname
, SID_ARGS(&rs
->sid
), rs
->address
,
4165 raft_server_phase_to_string(rs
->phase
));
4169 ds_put_format(&s
, "Role: %s\n",
4170 raft
->role
== RAFT_LEADER
? "leader"
4171 : raft
->role
== RAFT_CANDIDATE
? "candidate"
4172 : raft
->role
== RAFT_FOLLOWER
? "follower"
4174 ds_put_format(&s
, "Term: %"PRIu64
"\n", raft
->term
);
4175 raft_put_sid("Leader", &raft
->leader_sid
, raft
, &s
);
4176 raft_put_sid("Vote", &raft
->vote
, raft
, &s
);
4177 ds_put_char(&s
, '\n');
4179 ds_put_format(&s
, "Log: [%"PRIu64
", %"PRIu64
"]\n",
4180 raft
->log_start
, raft
->log_end
);
4182 uint64_t n_uncommitted
= raft
->log_end
- raft
->commit_index
- 1;
4183 ds_put_format(&s
, "Entries not yet committed: %"PRIu64
"\n", n_uncommitted
);
4185 uint64_t n_unapplied
= raft
->log_end
- raft
->last_applied
- 1;
4186 ds_put_format(&s
, "Entries not yet applied: %"PRIu64
"\n", n_unapplied
);
4188 const struct raft_conn
*c
;
4189 ds_put_cstr(&s
, "Connections:");
4190 LIST_FOR_EACH (c
, list_node
, &raft
->conns
) {
4191 bool connected
= jsonrpc_session_is_connected(c
->js
);
4192 ds_put_format(&s
, " %s%s%s%s",
4193 connected
? "" : "(",
4194 c
->incoming
? "<-" : "->", c
->nickname
,
4195 connected
? "" : ")");
4197 ds_put_char(&s
, '\n');
4199 ds_put_cstr(&s
, "Servers:\n");
4200 struct raft_server
*server
;
4201 HMAP_FOR_EACH (server
, hmap_node
, &raft
->servers
) {
4202 ds_put_format(&s
, " %s ("SID_FMT
" at %s)",
4204 SID_ARGS(&server
->sid
), server
->address
);
4205 if (uuid_equals(&server
->sid
, &raft
->sid
)) {
4206 ds_put_cstr(&s
, " (self)");
4208 if (server
->phase
!= RAFT_PHASE_STABLE
) {
4209 ds_put_format (&s
, " (%s)",
4210 raft_server_phase_to_string(server
->phase
));
4212 if (raft
->role
== RAFT_CANDIDATE
) {
4213 if (!uuid_is_zero(&server
->vote
)) {
4214 char buf
[SID_LEN
+ 1];
4215 ds_put_format(&s
, " (voted for %s)",
4216 raft_get_nickname(raft
, &server
->vote
,
4219 } else if (raft
->role
== RAFT_LEADER
) {
4220 ds_put_format(&s
, " next_index=%"PRIu64
" match_index=%"PRIu64
,
4221 server
->next_index
, server
->match_index
);
4223 ds_put_char(&s
, '\n');
4226 unixctl_command_reply(conn
, ds_cstr(&s
));
/* Shared helper for "cluster/leave" and self-targeted "cluster/kick": starts
 * the graceful-leave procedure on 'raft' and replies on 'conn', or reports
 * why leaving is not currently possible. */
static void
raft_unixctl_leave__(struct unixctl_conn *conn, struct raft *raft)
{
    if (raft_is_leaving(raft)) {
        unixctl_command_reply_error(conn,
                                    "already in progress leaving cluster");
    } else if (raft_is_joining(raft)) {
        unixctl_command_reply_error(conn,
                                    "can't leave while join in progress");
    } else if (raft_failed(raft)) {
        unixctl_command_reply_error(conn,
                                    "can't leave after failure");
    } else {
        raft_leave(raft);
        unixctl_command_reply(conn, NULL);
    }
}
4249 raft_unixctl_leave(struct unixctl_conn
*conn
, int argc OVS_UNUSED
,
4250 const char *argv
[], void *aux OVS_UNUSED
)
4252 struct raft
*raft
= raft_lookup_by_name(argv
[1]);
4254 unixctl_command_reply_error(conn
, "unknown cluster");
4258 raft_unixctl_leave__(conn
, raft
);
4261 static struct raft_server
*
4262 raft_lookup_server_best_match(struct raft
*raft
, const char *id
)
4264 struct raft_server
*best
= NULL
;
4265 int best_score
= -1;
4268 struct raft_server
*s
;
4269 HMAP_FOR_EACH (s
, hmap_node
, &raft
->servers
) {
4270 int score
= (!strcmp(id
, s
->address
)
4272 : uuid_is_partial_match(&s
->sid
, id
));
4273 if (score
> best_score
) {
4277 } else if (score
== best_score
) {
4281 return n_best
== 1 ? best
: NULL
;
4285 raft_unixctl_kick(struct unixctl_conn
*conn
, int argc OVS_UNUSED
,
4286 const char *argv
[], void *aux OVS_UNUSED
)
4288 const char *cluster_name
= argv
[1];
4289 const char *server_name
= argv
[2];
4291 struct raft
*raft
= raft_lookup_by_name(cluster_name
);
4293 unixctl_command_reply_error(conn
, "unknown cluster");
4297 struct raft_server
*server
= raft_lookup_server_best_match(raft
,
4300 unixctl_command_reply_error(conn
, "unknown server");
4304 if (uuid_equals(&server
->sid
, &raft
->sid
)) {
4305 raft_unixctl_leave__(conn
, raft
);
4306 } else if (raft
->role
== RAFT_LEADER
) {
4307 const struct raft_remove_server_request rq
= {
4309 .requester_conn
= conn
,
4311 raft_handle_remove_server_request(raft
, &rq
);
4313 const union raft_rpc rpc
= {
4314 .remove_server_request
= {
4316 .type
= RAFT_RPC_REMOVE_SERVER_REQUEST
,
4317 .sid
= raft
->leader_sid
,
4318 .comment
= "via unixctl"
4323 if (raft_send(raft
, &rpc
)) {
4324 unixctl_command_reply(conn
, "sent removal request to leader");
4326 unixctl_command_reply_error(conn
,
4327 "failed to send removal request");
4335 static struct ovsthread_once once
= OVSTHREAD_ONCE_INITIALIZER
;
4336 if (!ovsthread_once_start(&once
)) {
4339 unixctl_command_register("cluster/cid", "DB", 1, 1,
4340 raft_unixctl_cid
, NULL
);
4341 unixctl_command_register("cluster/sid", "DB", 1, 1,
4342 raft_unixctl_sid
, NULL
);
4343 unixctl_command_register("cluster/status", "DB", 1, 1,
4344 raft_unixctl_status
, NULL
);
4345 unixctl_command_register("cluster/leave", "DB", 1, 1,
4346 raft_unixctl_leave
, NULL
);
4347 unixctl_command_register("cluster/kick", "DB SERVER", 2, 2,
4348 raft_unixctl_kick
, NULL
);
4349 ovsthread_once_done(&once
);