2 * Copyright (c) 2017, 2018 Nicira, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include "raft-private.h"
28 #include "openvswitch/dynamic-string.h"
29 #include "openvswitch/hmap.h"
30 #include "openvswitch/json.h"
31 #include "openvswitch/list.h"
32 #include "openvswitch/poll-loop.h"
33 #include "openvswitch/vlog.h"
34 #include "ovsdb-error.h"
35 #include "ovsdb-parser.h"
36 #include "ovsdb/log.h"
40 #include "socket-util.h"
/* Register this file's log module name with the OVS vlog subsystem. */
VLOG_DEFINE_THIS_MODULE(raft);
50 /* Roles for a Raft server:
52 * - Followers: Servers in touch with the current leader.
54 * - Candidate: Servers unaware of a current leader and seeking election to
57 * - Leader: Handles all client requests. At most one at a time.
59 * In normal operation there is exactly one leader and all of the other servers
67 /* Flags for unit tests. */
/* Failure-injection points used only by unit tests.  'failure_test' defaults
 * to FT_NO_TEST (0), i.e. no injected failure.  FT_DELAY_ELECTION is consumed
 * by raft_reset_election_timer() below. */
enum raft_failure_test {
    FT_NO_TEST,
    FT_CRASH_BEFORE_SEND_APPEND_REQ,
    FT_CRASH_AFTER_SEND_APPEND_REQ,
    FT_CRASH_BEFORE_SEND_EXEC_REP,
    FT_CRASH_AFTER_SEND_EXEC_REP,
    FT_CRASH_BEFORE_SEND_EXEC_REQ,
    FT_CRASH_AFTER_SEND_EXEC_REQ,
    FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE,
    FT_DELAY_ELECTION,
    FT_DONT_SEND_VOTE_REQUEST,
};
static enum raft_failure_test failure_test;
82 /* A connection between this Raft server and another one. */
84 struct ovs_list list_node
; /* In struct raft's 'conns' list. */
85 struct jsonrpc_session
*js
; /* JSON-RPC connection. */
86 struct uuid sid
; /* This server's unique ID. */
87 char *nickname
; /* Short name for use in log messages. */
88 bool incoming
; /* True if incoming, false if outgoing. */
89 unsigned int js_seqno
; /* Seqno for noticing (re)connections. */
92 static void raft_conn_close(struct raft_conn
*);
94 /* A "command", that is, a request to append an entry to the log.
96 * The Raft specification only allows clients to issue commands to the leader.
97 * With this implementation, clients may issue a command on any server, which
98 * then relays the command to the leader if necessary.
100 * This structure is thus used in three cases:
102 * 1. We are the leader and the command was issued to us directly.
104 * 2. We are a follower and relayed the command to the leader.
106 * 3. We are the leader and a follower relayed the command to us.
108 struct raft_command
{
110 struct hmap_node hmap_node
; /* In struct raft's 'commands' hmap. */
111 unsigned int n_refs
; /* Reference count. */
112 enum raft_command_status status
; /* Execution status. */
113 struct uuid eid
; /* Entry ID of result. */
116 uint64_t index
; /* Index in log (0 if being relayed). */
119 long long int timestamp
; /* Issue or last ping time, for expiration. */
122 struct uuid sid
; /* The follower (otherwise UUID_ZERO). */
125 static void raft_command_complete(struct raft
*, struct raft_command
*,
126 enum raft_command_status
);
128 static void raft_complete_all_commands(struct raft
*,
129 enum raft_command_status
);
131 /* Type of deferred action, see struct raft_waiter. */
132 enum raft_waiter_type
{
138 /* An action deferred until a log write commits to disk. */
140 struct ovs_list list_node
;
141 uint64_t commit_ticket
;
143 enum raft_waiter_type type
;
147 * Waits for a RAFT_REC_ENTRY write to our local log to commit. Upon
148 * completion, updates 'log_synced' to indicate that the new log entry
149 * or entries are committed and, if we are leader, also updates our
150 * local 'match_index'. */
157 * Waits for a RAFT_REC_TERM or RAFT_REC_VOTE record write to commit.
158 * Upon completion, updates 'synced_term' and 'synced_vote', which
159 * triggers sending RPCs deferred by the uncommitted 'term' and
168 * Sometimes, sending an RPC to a peer must be delayed until an entry,
169 * a term, or a vote mentioned in the RPC is synced to disk. This
170 * waiter keeps a copy of such an RPC until the previous waiters have
176 static struct raft_waiter
*raft_waiter_create(struct raft
*,
177 enum raft_waiter_type
,
179 static void raft_waiters_destroy(struct raft
*);
181 /* The Raft state machine. */
183 struct hmap_node hmap_node
; /* In 'all_rafts'. */
184 struct ovsdb_log
*log
;
186 /* Persistent derived state.
188 * This must be updated on stable storage before responding to RPCs. It can be
189 * derived from the header, snapshot, and log in 'log'. */
191 struct uuid cid
; /* Cluster ID (immutable for the cluster). */
192 struct uuid sid
; /* Server ID (immutable for the server). */
193 char *local_address
; /* Local address (immutable for the server). */
194 char *local_nickname
; /* Used for local server in log messages. */
195 char *name
; /* Schema name (immutable for the cluster). */
197 /* Contains "struct raft_server"s and represents the server configuration
198 * most recently added to 'log'. */
201 #define ELECTION_BASE_MSEC 1000
202 #define ELECTION_RANGE_MSEC 1000
203 /* The election timeout base value for leader election, in milliseconds.
204 * It can be set by unixctl cluster/change-election-timer. Default value is
205 * ELECTION_BASE_MSEC. */
206 uint64_t election_timer
;
207 /* If not 0, it is the new value of election_timer being proposed. */
208 uint64_t election_timer_new
;
210 /* Persistent state on all servers.
212 * Must be updated on stable storage before responding to RPCs. */
214 /* Current term and the vote for that term. These might be on the way to
216 uint64_t term
; /* Initialized to 0 and only increases. */
217 struct uuid vote
; /* All-zeros if no vote yet in 'term'. */
219 /* The term and vote that have been synced to disk. */
220 uint64_t synced_term
;
221 struct uuid synced_vote
;
225 * A log entry with index 1 never really exists; the initial snapshot for a
226 * Raft is considered to include this index. The first real log entry has
229 * A new Raft instance contains an empty log: log_start=2, log_end=2.
230 * Over time, the log grows: log_start=2, log_end=N.
231 * At some point, the server takes a snapshot: log_start=N, log_end=N.
232 * The log continues to grow: log_start=N, log_end=N+1...
234 * Must be updated on stable storage before responding to RPCs. */
235 struct raft_entry
*entries
; /* Log entry i is in log[i - log_start]. */
236 uint64_t log_start
; /* Index of first entry in log. */
237 uint64_t log_end
; /* Index of last entry in log, plus 1. */
238 uint64_t log_synced
; /* Index of last synced entry. */
239 size_t allocated_log
; /* Allocated entries in 'log'. */
241 /* Snapshot state (see Figure 5.1)
243 * This is the state of the cluster as of the last discarded log entry,
244 * that is, at log index 'log_start - 1' (called prevIndex in Figure 5.1).
245 * Only committed log entries can be included in a snapshot. */
246 struct raft_entry snap
;
250 * The snapshot is always committed, but the rest of the log might not be yet.
251 * 'last_applied' tracks what entries have been passed to the client. If the
252 * client hasn't yet read the latest snapshot, then even the snapshot isn't
253 * applied yet. Thus, the invariants are different for these members:
255 * log_start - 2 <= last_applied <= commit_index < log_end.
256 * log_start - 1 <= commit_index < log_end.
259 enum raft_role role
; /* Current role. */
260 uint64_t commit_index
; /* Max log index known to be committed. */
261 uint64_t last_applied
; /* Max log index applied to state machine. */
262 struct uuid leader_sid
; /* Server ID of leader (zero, if unknown). */
264 long long int election_base
; /* Time of last heartbeat from leader. */
265 long long int election_timeout
; /* Time at which we start an election. */
267 long long int election_start
; /* Start election time. */
268 long long int election_won
; /* Time of election completion. */
269 bool leadership_transfer
; /* Was the leadership transferred? */
271 unsigned int n_disconnections
;
273 /* Used for joining a cluster. */
274 bool joining
; /* Attempting to join the cluster? */
275 struct sset remote_addresses
; /* Addresses to try to find other servers. */
276 long long int join_timeout
; /* Time to re-send add server request. */
278 /* Used for leaving a cluster. */
279 bool leaving
; /* True if we are leaving the cluster. */
280 bool left
; /* True if we have finished leaving. */
281 long long int leave_timeout
; /* Time to re-send remove server request. */
284 bool failed
; /* True if unrecoverable error has occurred. */
286 /* File synchronization. */
287 struct ovs_list waiters
; /* Contains "struct raft_waiter"s. */
289 /* Network connections. */
290 struct pstream
*listener
; /* For connections from other Raft servers. */
291 long long int listen_backoff
; /* For retrying creating 'listener'. */
292 struct ovs_list conns
; /* Contains struct raft_conns. */
294 /* Leaders only. Reinitialized after becoming leader. */
295 struct hmap add_servers
; /* Contains "struct raft_server"s to add. */
296 struct raft_server
*remove_server
; /* Server being removed. */
297 struct hmap commands
; /* Contains "struct raft_command"s. */
298 long long int ping_timeout
; /* Time at which to send a heartbeat */
300 /* Candidates only. Reinitialized at start of election. */
301 int n_votes
; /* Number of votes for me. */
303 /* Followers and candidates only. */
304 bool candidate_retrying
; /* The earlier election timed-out and we are
306 bool had_leader
; /* There has been leader elected since last
307 election initiated. This is to help setting
308 candidate_retrying. */
311 bool ever_had_leader
; /* There has been leader elected since the raft
312 is initialized, meaning it is ever
315 /* Connection backlog limits. */
316 #define DEFAULT_MAX_BACKLOG_N_MSGS 500
317 #define DEFAULT_MAX_BACKLOG_N_BYTES UINT32_MAX
318 size_t conn_backlog_max_n_msgs
; /* Number of messages. */
319 size_t conn_backlog_max_n_bytes
; /* Number of bytes. */
322 /* All Raft structures. */
323 static struct hmap all_rafts
= HMAP_INITIALIZER(&all_rafts
);
325 static void raft_init(void);
327 static struct ovsdb_error
*raft_read_header(struct raft
*)
328 OVS_WARN_UNUSED_RESULT
;
330 static void raft_send_execute_command_reply(struct raft
*,
331 const struct uuid
*sid
,
332 const struct uuid
*eid
,
333 enum raft_command_status
,
334 uint64_t commit_index
);
336 static void raft_update_our_match_index(struct raft
*, uint64_t min_index
);
338 static void raft_send_remove_server_reply__(
339 struct raft
*, const struct uuid
*target_sid
,
340 const struct uuid
*requester_sid
, struct unixctl_conn
*requester_conn
,
341 bool success
, const char *comment
);
342 static void raft_finished_leaving_cluster(struct raft
*);
344 static void raft_server_init_leader(struct raft
*, struct raft_server
*);
346 static bool raft_rpc_is_heartbeat(const union raft_rpc
*);
347 static bool raft_is_rpc_synced(const struct raft
*, const union raft_rpc
*);
349 static void raft_handle_rpc(struct raft
*, const union raft_rpc
*);
351 static bool raft_send_at(struct raft
*, const union raft_rpc
*,
353 #define raft_send(raft, rpc) raft_send_at(raft, rpc, __LINE__)
355 static bool raft_send_to_conn_at(struct raft
*, const union raft_rpc
*,
356 struct raft_conn
*, int line_number
);
357 #define raft_send_to_conn(raft, rpc, conn) \
358 raft_send_to_conn_at(raft, rpc, conn, __LINE__)
360 static void raft_send_append_request(struct raft
*,
361 struct raft_server
*, unsigned int n
,
362 const char *comment
);
364 static void raft_become_leader(struct raft
*);
365 static void raft_become_follower(struct raft
*);
366 static void raft_reset_election_timer(struct raft
*);
367 static void raft_reset_ping_timer(struct raft
*);
368 static void raft_send_heartbeats(struct raft
*);
369 static void raft_start_election(struct raft
*, bool leadership_transfer
);
370 static bool raft_truncate(struct raft
*, uint64_t new_end
);
371 static void raft_get_servers_from_log(struct raft
*, enum vlog_level
);
372 static void raft_get_election_timer_from_log(struct raft
*);
374 static bool raft_handle_write_error(struct raft
*, struct ovsdb_error
*);
376 static void raft_run_reconfigure(struct raft
*);
378 static void raft_set_leader(struct raft
*, const struct uuid
*sid
);
379 static struct raft_server
*
380 raft_find_server(const struct raft
*raft
, const struct uuid
*sid
)
382 return raft_server_find(&raft
->servers
, sid
);
386 raft_make_address_passive(const char *address_
)
388 if (!strncmp(address_
, "unix:", 5)) {
389 return xasprintf("p%s", address_
);
391 char *address
= xstrdup(address_
);
393 inet_parse_host_port_tokens(strchr(address
, ':') + 1, &host
, &port
);
395 struct ds paddr
= DS_EMPTY_INITIALIZER
;
396 ds_put_format(&paddr
, "p%.3s:%s:", address
, port
);
397 if (strchr(host
, ':')) {
398 ds_put_format(&paddr
, "[%s]", host
);
400 ds_put_cstr(&paddr
, host
);
403 return ds_steal_cstr(&paddr
);
412 struct raft
*raft
= xzalloc(sizeof *raft
);
413 hmap_node_nullify(&raft
->hmap_node
);
414 hmap_init(&raft
->servers
);
415 raft
->log_start
= raft
->log_end
= 1;
416 raft
->role
= RAFT_FOLLOWER
;
417 sset_init(&raft
->remote_addresses
);
418 raft
->join_timeout
= LLONG_MAX
;
419 ovs_list_init(&raft
->waiters
);
420 raft
->listen_backoff
= LLONG_MIN
;
421 ovs_list_init(&raft
->conns
);
422 hmap_init(&raft
->add_servers
);
423 hmap_init(&raft
->commands
);
425 raft
->election_timer
= ELECTION_BASE_MSEC
;
427 raft
->conn_backlog_max_n_msgs
= DEFAULT_MAX_BACKLOG_N_MSGS
;
428 raft
->conn_backlog_max_n_bytes
= DEFAULT_MAX_BACKLOG_N_BYTES
;
433 /* Creates an on-disk file that represents a new Raft cluster and initializes
434 * it to consist of a single server, the one on which this function is called.
436 * Creates the local copy of the cluster's log in 'file_name', which must not
437 * already exist. Gives it the name 'name', which should be the database
438 * schema name and which is used only to match up this database with the server
439 * added to the cluster later if the cluster ID is unavailable.
441 * The new server is located at 'local_address', which must take one of the
442 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
443 * square bracket enclosed IPv6 address and PORT is a TCP port number.
445 * This only creates the on-disk file. Use raft_open() to start operating the
448 * Returns null if successful, otherwise an ovsdb_error describing the
450 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
451 raft_create_cluster(const char *file_name
, const char *name
,
452 const char *local_address
, const struct json
*data
)
454 /* Parse and verify validity of the local address. */
455 struct ovsdb_error
*error
= raft_address_validate(local_address
);
460 /* Create log file. */
461 struct ovsdb_log
*log
;
462 error
= ovsdb_log_open(file_name
, RAFT_MAGIC
, OVSDB_LOG_CREATE_EXCL
,
468 /* Write log file. */
469 struct raft_header h
= {
470 .sid
= uuid_random(),
471 .cid
= uuid_random(),
472 .name
= xstrdup(name
),
473 .local_address
= xstrdup(local_address
),
475 .remote_addresses
= SSET_INITIALIZER(&h
.remote_addresses
),
479 .data
= json_nullable_clone(data
),
480 .eid
= uuid_random(),
481 .servers
= json_object_create(),
484 shash_add_nocopy(json_object(h
.snap
.servers
),
485 xasprintf(UUID_FMT
, UUID_ARGS(&h
.sid
)),
486 json_string_create(local_address
));
487 error
= ovsdb_log_write_and_free(log
, raft_header_to_json(&h
));
488 raft_header_uninit(&h
);
490 error
= ovsdb_log_commit_block(log
);
492 ovsdb_log_close(log
);
497 /* Creates a database file that represents a new server in an existing Raft
500 * Creates the local copy of the cluster's log in 'file_name', which must not
501 * already exist. Gives it the name 'name', which must be the same name
502 * passed in to raft_create_cluster() earlier.
504 * 'cid' is optional. If specified, the new server will join only the cluster
505 * with the given cluster ID.
507 * The new server is located at 'local_address', which must take one of the
508 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
509 * square bracket enclosed IPv6 address and PORT is a TCP port number.
511 * Joining the cluster requires contacting it. Thus, 'remote_addresses'
512 * specifies the addresses of existing servers in the cluster. One server out
513 * of the existing cluster is sufficient, as long as that server is reachable
514 * and not partitioned from the current cluster leader. If multiple servers
515 * from the cluster are specified, then it is sufficient for any of them to
516 * meet this criterion.
518 * This only creates the on-disk file and does no network access. Use
519 * raft_open() to start operating the new server. (Until this happens, the
520 * new server has not joined the cluster.)
522 * Returns null if successful, otherwise an ovsdb_error describing the
524 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
525 raft_join_cluster(const char *file_name
,
526 const char *name
, const char *local_address
,
527 const struct sset
*remote_addresses
,
528 const struct uuid
*cid
)
530 ovs_assert(!sset_is_empty(remote_addresses
));
532 /* Parse and verify validity of the addresses. */
533 struct ovsdb_error
*error
= raft_address_validate(local_address
);
538 SSET_FOR_EACH (addr
, remote_addresses
) {
539 error
= raft_address_validate(addr
);
543 if (!strcmp(addr
, local_address
)) {
544 return ovsdb_error(NULL
, "remote addresses cannot be the same "
545 "as the local address");
549 /* Verify validity of the cluster ID (if provided). */
550 if (cid
&& uuid_is_zero(cid
)) {
551 return ovsdb_error(NULL
, "all-zero UUID is not valid cluster ID");
554 /* Create log file. */
555 struct ovsdb_log
*log
;
556 error
= ovsdb_log_open(file_name
, RAFT_MAGIC
, OVSDB_LOG_CREATE_EXCL
,
562 /* Write log file. */
563 struct raft_header h
= {
564 .sid
= uuid_random(),
565 .cid
= cid
? *cid
: UUID_ZERO
,
566 .name
= xstrdup(name
),
567 .local_address
= xstrdup(local_address
),
569 /* No snapshot yet. */
571 sset_clone(&h
.remote_addresses
, remote_addresses
);
572 error
= ovsdb_log_write_and_free(log
, raft_header_to_json(&h
));
573 raft_header_uninit(&h
);
575 error
= ovsdb_log_commit_block(log
);
577 ovsdb_log_close(log
);
582 /* Reads the initial header record from 'log', which must be a Raft clustered
583 * database log, and populates '*md' with the information read from it. The
584 * caller must eventually destroy 'md' with raft_metadata_destroy().
586 * On success, returns NULL. On failure, returns an error that the caller must
587 * eventually destroy and zeros '*md'. */
588 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
589 raft_read_metadata(struct ovsdb_log
*log
, struct raft_metadata
*md
)
591 struct raft
*raft
= raft_alloc();
594 struct ovsdb_error
*error
= raft_read_header(raft
);
597 md
->name
= xstrdup(raft
->name
);
598 md
->local
= xstrdup(raft
->local_address
);
601 memset(md
, 0, sizeof *md
);
609 /* Frees the metadata in 'md'. */
611 raft_metadata_destroy(struct raft_metadata
*md
)
619 static const struct raft_entry
*
620 raft_get_entry(const struct raft
*raft
, uint64_t index
)
622 ovs_assert(index
>= raft
->log_start
);
623 ovs_assert(index
< raft
->log_end
);
624 return &raft
->entries
[index
- raft
->log_start
];
628 raft_get_term(const struct raft
*raft
, uint64_t index
)
630 return (index
== raft
->log_start
- 1
632 : raft_get_entry(raft
, index
)->term
);
635 static const struct json
*
636 raft_servers_for_index(const struct raft
*raft
, uint64_t index
)
638 ovs_assert(index
>= raft
->log_start
- 1);
639 ovs_assert(index
< raft
->log_end
);
641 const struct json
*servers
= raft
->snap
.servers
;
642 for (uint64_t i
= raft
->log_start
; i
<= index
; i
++) {
643 const struct raft_entry
*e
= raft_get_entry(raft
, i
);
645 servers
= e
->servers
;
652 raft_set_servers(struct raft
*raft
, const struct hmap
*new_servers
,
653 enum vlog_level level
)
655 struct raft_server
*s
, *next
;
656 HMAP_FOR_EACH_SAFE (s
, next
, hmap_node
, &raft
->servers
) {
657 if (!raft_server_find(new_servers
, &s
->sid
)) {
658 ovs_assert(s
!= raft
->remove_server
);
660 hmap_remove(&raft
->servers
, &s
->hmap_node
);
661 VLOG(level
, "server %s removed from configuration", s
->nickname
);
662 raft_server_destroy(s
);
666 HMAP_FOR_EACH_SAFE (s
, next
, hmap_node
, new_servers
) {
667 if (!raft_find_server(raft
, &s
->sid
)) {
668 VLOG(level
, "server %s added to configuration", s
->nickname
);
670 struct raft_server
*new
671 = raft_server_add(&raft
->servers
, &s
->sid
, s
->address
);
672 raft_server_init_leader(raft
, new);
678 raft_add_entry(struct raft
*raft
,
679 uint64_t term
, struct json
*data
, const struct uuid
*eid
,
680 struct json
*servers
, uint64_t election_timer
)
682 if (raft
->log_end
- raft
->log_start
>= raft
->allocated_log
) {
683 raft
->entries
= x2nrealloc(raft
->entries
, &raft
->allocated_log
,
684 sizeof *raft
->entries
);
687 uint64_t index
= raft
->log_end
++;
688 struct raft_entry
*entry
= &raft
->entries
[index
- raft
->log_start
];
691 entry
->eid
= eid
? *eid
: UUID_ZERO
;
692 entry
->servers
= servers
;
693 entry
->election_timer
= election_timer
;
697 /* Writes a RAFT_REC_ENTRY record for 'term', 'data', 'eid', 'servers',
698 * 'election_timer' to * 'raft''s log and returns an error indication. */
699 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
700 raft_write_entry(struct raft
*raft
, uint64_t term
, struct json
*data
,
701 const struct uuid
*eid
, struct json
*servers
,
702 uint64_t election_timer
)
704 struct raft_record r
= {
705 .type
= RAFT_REC_ENTRY
,
708 .index
= raft_add_entry(raft
, term
, data
, eid
, servers
,
712 .election_timer
= election_timer
,
713 .eid
= eid
? *eid
: UUID_ZERO
,
716 return ovsdb_log_write_and_free(raft
->log
, raft_record_to_json(&r
));
719 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
720 raft_write_state(struct ovsdb_log
*log
,
721 uint64_t term
, const struct uuid
*vote
)
723 struct raft_record r
= { .term
= term
};
724 if (vote
&& !uuid_is_zero(vote
)) {
725 r
.type
= RAFT_REC_VOTE
;
728 r
.type
= RAFT_REC_TERM
;
730 return ovsdb_log_write_and_free(log
, raft_record_to_json(&r
));
733 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
734 raft_apply_record(struct raft
*raft
, unsigned long long int rec_idx
,
735 const struct raft_record
*r
)
737 /* Apply "term", which is present in most kinds of records (and otherwise
740 * A Raft leader can replicate entries from previous terms to the other
741 * servers in the cluster, retaining the original terms on those entries
742 * (see section 3.6.2 "Committing entries from previous terms" for more
743 * information), so it's OK for the term in a log record to precede the
745 if (r
->term
> raft
->term
) {
746 raft
->term
= raft
->synced_term
= r
->term
;
747 raft
->vote
= raft
->synced_vote
= UUID_ZERO
;
752 if (r
->entry
.index
< raft
->commit_index
) {
753 return ovsdb_error(NULL
, "record %llu attempts to truncate log "
754 "from %"PRIu64
" to %"PRIu64
" entries, but "
755 "commit index is already %"PRIu64
,
756 rec_idx
, raft
->log_end
, r
->entry
.index
,
758 } else if (r
->entry
.index
> raft
->log_end
) {
759 return ovsdb_error(NULL
, "record %llu with index %"PRIu64
" skips "
760 "past expected index %"PRIu64
,
761 rec_idx
, r
->entry
.index
, raft
->log_end
);
764 if (r
->entry
.index
< raft
->log_end
) {
765 /* This can happen, but it is notable. */
766 VLOG_DBG("record %llu truncates log from %"PRIu64
" to %"PRIu64
767 " entries", rec_idx
, raft
->log_end
, r
->entry
.index
);
768 raft_truncate(raft
, r
->entry
.index
);
771 uint64_t prev_term
= (raft
->log_end
> raft
->log_start
772 ? raft
->entries
[raft
->log_end
773 - raft
->log_start
- 1].term
775 if (r
->term
< prev_term
) {
776 return ovsdb_error(NULL
, "record %llu with index %"PRIu64
" term "
777 "%"PRIu64
" precedes previous entry's term "
779 rec_idx
, r
->entry
.index
, r
->term
, prev_term
);
782 raft
->log_synced
= raft_add_entry(
784 json_nullable_clone(r
->entry
.data
), &r
->entry
.eid
,
785 json_nullable_clone(r
->entry
.servers
),
786 r
->entry
.election_timer
);
793 if (r
->term
< raft
->term
) {
794 return ovsdb_error(NULL
, "record %llu votes for term %"PRIu64
" "
795 "but current term is %"PRIu64
,
796 rec_idx
, r
->term
, raft
->term
);
797 } else if (!uuid_is_zero(&raft
->vote
)
798 && !uuid_equals(&raft
->vote
, &r
->sid
)) {
799 return ovsdb_error(NULL
, "record %llu votes for "SID_FMT
" in term "
800 "%"PRIu64
" but a previous record for the "
801 "same term voted for "SID_FMT
, rec_idx
,
802 SID_ARGS(&raft
->vote
), r
->term
,
805 raft
->vote
= raft
->synced_vote
= r
->sid
;
811 if (!strcmp(r
->note
, "left")) {
812 return ovsdb_error(NULL
, "record %llu indicates server has left "
813 "the cluster; it cannot be added back (use "
814 "\"ovsdb-tool join-cluster\" to add a new "
819 case RAFT_REC_COMMIT_INDEX
:
820 if (r
->commit_index
< raft
->commit_index
) {
821 return ovsdb_error(NULL
, "record %llu regresses commit index "
822 "from %"PRIu64
" to %"PRIu64
,
823 rec_idx
, raft
->commit_index
, r
->commit_index
);
824 } else if (r
->commit_index
>= raft
->log_end
) {
825 return ovsdb_error(NULL
, "record %llu advances commit index to "
826 "%"PRIu64
" but last log index is %"PRIu64
,
827 rec_idx
, r
->commit_index
, raft
->log_end
- 1);
829 raft
->commit_index
= r
->commit_index
;
834 case RAFT_REC_LEADER
:
835 /* XXX we could use this to take back leadership for quick restart */
843 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
844 raft_read_header(struct raft
*raft
)
846 /* Read header record. */
848 struct ovsdb_error
*error
= ovsdb_log_read(raft
->log
, &json
);
849 if (error
|| !json
) {
850 /* Report error or end-of-file. */
853 ovsdb_log_mark_base(raft
->log
);
855 struct raft_header h
;
856 error
= raft_header_from_json(&h
, json
);
864 raft
->name
= xstrdup(h
.name
);
865 raft
->local_address
= xstrdup(h
.local_address
);
866 raft
->local_nickname
= raft_address_to_nickname(h
.local_address
, &h
.sid
);
867 raft
->joining
= h
.joining
;
870 sset_clone(&raft
->remote_addresses
, &h
.remote_addresses
);
872 raft_entry_clone(&raft
->snap
, &h
.snap
);
873 raft
->log_start
= raft
->log_end
= h
.snap_index
+ 1;
874 raft
->log_synced
= raft
->commit_index
= h
.snap_index
;
875 raft
->last_applied
= h
.snap_index
- 1;
878 raft_header_uninit(&h
);
883 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
884 raft_read_log(struct raft
*raft
)
886 for (unsigned long long int i
= 1; ; i
++) {
888 struct ovsdb_error
*error
= ovsdb_log_read(raft
->log
, &json
);
891 /* We assume that the error is due to a partial write while
892 * appending to the file before a crash, so log it and
894 char *error_string
= ovsdb_error_to_string_free(error
);
895 VLOG_WARN("%s", error_string
);
902 struct raft_record r
;
903 error
= raft_record_from_json(&r
, json
);
905 error
= raft_apply_record(raft
, i
, &r
);
906 raft_record_uninit(&r
);
910 return ovsdb_wrap_error(error
, "error reading record %llu from "
911 "%s log", i
, raft
->name
);
915 /* Set the most recent servers. */
916 raft_get_servers_from_log(raft
, VLL_DBG
);
918 /* Set the most recent election_timer. */
919 raft_get_election_timer_from_log(raft
);
925 raft_reset_election_timer(struct raft
*raft
)
927 unsigned int duration
= (raft
->election_timer
928 + random_range(ELECTION_RANGE_MSEC
));
929 raft
->election_base
= time_msec();
930 if (failure_test
== FT_DELAY_ELECTION
) {
931 /* Slow down this node so that it won't win the next election. */
932 duration
+= raft
->election_timer
;
934 raft
->election_timeout
= raft
->election_base
+ duration
;
938 raft_reset_ping_timer(struct raft
*raft
)
940 raft
->ping_timeout
= time_msec() + raft
->election_timer
/ 3;
944 raft_add_conn(struct raft
*raft
, struct jsonrpc_session
*js
,
945 const struct uuid
*sid
, bool incoming
)
947 struct raft_conn
*conn
= xzalloc(sizeof *conn
);
948 ovs_list_push_back(&raft
->conns
, &conn
->list_node
);
953 conn
->nickname
= raft_address_to_nickname(jsonrpc_session_get_name(js
),
955 conn
->incoming
= incoming
;
956 conn
->js_seqno
= jsonrpc_session_get_seqno(conn
->js
);
957 jsonrpc_session_set_probe_interval(js
, 0);
958 jsonrpc_session_set_backlog_threshold(js
, raft
->conn_backlog_max_n_msgs
,
959 raft
->conn_backlog_max_n_bytes
);
962 /* Starts the local server in an existing Raft cluster, using the local copy of
963 * the cluster's log in 'file_name'. Takes ownership of 'log', whether
964 * successful or not. */
965 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
966 raft_open(struct ovsdb_log
*log
, struct raft
**raftp
)
968 struct raft
*raft
= raft_alloc();
971 struct ovsdb_error
*error
= raft_read_header(raft
);
976 if (!raft
->joining
) {
977 error
= raft_read_log(raft
);
982 /* Find our own server. */
983 if (!raft_find_server(raft
, &raft
->sid
)) {
984 error
= ovsdb_error(NULL
, "server does not belong to cluster");
988 /* If there's only one server, start an election right away so that the
989 * cluster bootstraps quickly. */
990 if (hmap_count(&raft
->servers
) == 1) {
991 raft_start_election(raft
, false);
994 raft
->join_timeout
= time_msec() + 1000;
997 raft_reset_ping_timer(raft
);
998 raft_reset_election_timer(raft
);
1001 hmap_insert(&all_rafts
, &raft
->hmap_node
, hash_string(raft
->name
, 0));
1010 /* Returns the name of 'raft', which in OVSDB is the database schema name. */
1012 raft_get_name(const struct raft
*raft
)
1017 /* Returns the cluster ID of 'raft'. If 'raft' has not yet completed joining
1018 * its cluster, then 'cid' will be all-zeros (unless the administrator
1019 * specified a cluster ID running "ovsdb-tool join-cluster").
1021 * Each cluster has a unique cluster ID. */
1023 raft_get_cid(const struct raft
*raft
)
1028 /* Returns the server ID of 'raft'. Each server has a unique server ID. */
1030 raft_get_sid(const struct raft
*raft
)
1035 /* Adds memory consumption info to 'usage' for later use by memory_report(). */
1037 raft_get_memory_usage(const struct raft
*raft
, struct simap
*usage
)
1039 struct raft_conn
*conn
;
1040 uint64_t backlog
= 0;
1043 LIST_FOR_EACH (conn
, list_node
, &raft
->conns
) {
1044 backlog
+= jsonrpc_session_get_backlog(conn
->js
);
1047 simap_increase(usage
, "raft-backlog-kB", backlog
/ 1000);
1048 simap_increase(usage
, "raft-connections", cnt
);
1049 simap_increase(usage
, "raft-log", raft
->log_end
- raft
->log_start
);
1052 /* Returns true if 'raft' has completed joining its cluster, has not left or
1053 * initiated leaving the cluster, does not have failed disk storage, and is
1054 * apparently connected to the leader in a healthy way (or is itself the
1057 * If 'raft' is candidate:
1058 * a) if it is the first round of election, consider it as connected, hoping
1059 * it will successfully elect a new leader soon.
1060 * b) if it is already retrying, consider it as disconnected (so that clients
1061 * may decide to reconnect to other members). */
1063 raft_is_connected(const struct raft
*raft
)
1065 static bool last_state
= false;
1066 bool ret
= (!raft
->candidate_retrying
1071 && raft
->ever_had_leader
);
1074 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
1075 VLOG_DBG_RL(&rl
, "raft_is_connected: false");
1076 } else if (!last_state
) {
1077 VLOG_DBG("raft_is_connected: true");
1084 /* Returns true if 'raft' is the cluster leader. */
1086 raft_is_leader(const struct raft
*raft
)
1088 return raft
->role
== RAFT_LEADER
;
1091 /* Returns true if 'raft' is the process of joining its cluster. */
1093 raft_is_joining(const struct raft
*raft
)
1095 return raft
->joining
;
1098 /* Only returns *connected* connections. */
1099 static struct raft_conn
*
1100 raft_find_conn_by_sid(struct raft
*raft
, const struct uuid
*sid
)
1102 if (!uuid_is_zero(sid
)) {
1103 struct raft_conn
*conn
;
1104 LIST_FOR_EACH (conn
, list_node
, &raft
->conns
) {
1105 if (uuid_equals(sid
, &conn
->sid
)
1106 && jsonrpc_session_is_connected(conn
->js
)) {
1114 static struct raft_conn
*
1115 raft_find_conn_by_address(struct raft
*raft
, const char *address
)
1117 struct raft_conn
*conn
;
1118 LIST_FOR_EACH (conn
, list_node
, &raft
->conns
) {
1119 if (!strcmp(jsonrpc_session_get_name(conn
->js
), address
)) {
1126 static void OVS_PRINTF_FORMAT(3, 4)
1127 raft_record_note(struct raft
*raft
, const char *note
,
1128 const char *comment_format
, ...)
1131 va_start(args
, comment_format
);
1132 char *comment
= xvasprintf(comment_format
, args
);
1135 struct raft_record r
= {
1136 .type
= RAFT_REC_NOTE
,
1138 .note
= CONST_CAST(char *, note
),
1140 ignore(ovsdb_log_write_and_free(raft
->log
, raft_record_to_json(&r
)));
1145 /* If we're leader, try to transfer leadership to another server, logging
1146 * 'reason' as the human-readable reason (it should be a phrase suitable for
1147 * following "because") . */
1149 raft_transfer_leadership(struct raft
*raft
, const char *reason
)
1151 if (raft
->role
!= RAFT_LEADER
) {
1155 struct raft_server
*s
;
1156 HMAP_FOR_EACH (s
, hmap_node
, &raft
->servers
) {
1157 if (!uuid_equals(&raft
->sid
, &s
->sid
)
1158 && s
->phase
== RAFT_PHASE_STABLE
) {
1159 struct raft_conn
*conn
= raft_find_conn_by_sid(raft
, &s
->sid
);
1164 union raft_rpc rpc
= {
1167 .comment
= CONST_CAST(char *, reason
),
1168 .type
= RAFT_RPC_BECOME_LEADER
,
1174 raft_send_to_conn(raft
, &rpc
, conn
);
1176 raft_record_note(raft
, "transfer leadership",
1177 "transferring leadership to %s because %s",
1178 s
->nickname
, reason
);
1184 /* Send a RemoveServerRequest to the rest of the servers in the cluster.
1186 * If we know which server is the leader, we can just send the request to it.
1187 * However, we might not know which server is the leader, and we might never
1188 * find out if the remove request was actually previously committed by a
1189 * majority of the servers (because in that case the new leader will not send
1190 * AppendRequests or heartbeats to us). Therefore, we instead send
1191 * RemoveRequests to every server. This theoretically has the same problem, if
1192 * the current cluster leader was not previously a member of the cluster, but
1193 * it seems likely to be more robust in practice. */
1195 raft_send_remove_server_requests(struct raft
*raft
)
1197 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
1198 VLOG_INFO_RL(&rl
, "sending remove request (joining=%s, leaving=%s)",
1199 raft
->joining
? "true" : "false",
1200 raft
->leaving
? "true" : "false");
1201 const struct raft_server
*s
;
1202 HMAP_FOR_EACH (s
, hmap_node
, &raft
->servers
) {
1203 if (!uuid_equals(&s
->sid
, &raft
->sid
)) {
1204 union raft_rpc rpc
= (union raft_rpc
) {
1205 .remove_server_request
= {
1207 .type
= RAFT_RPC_REMOVE_SERVER_REQUEST
,
1213 raft_send(raft
, &rpc
);
1217 raft
->leave_timeout
= time_msec() + raft
->election_timer
;
1220 /* Attempts to start 'raft' leaving its cluster. The caller can check progress
1221 * using raft_is_leaving() and raft_left(). */
1223 raft_leave(struct raft
*raft
)
1225 if (raft
->joining
|| raft
->failed
|| raft
->leaving
|| raft
->left
) {
1228 VLOG_INFO(SID_FMT
": starting to leave cluster "CID_FMT
,
1229 SID_ARGS(&raft
->sid
), CID_ARGS(&raft
->cid
));
1230 raft
->leaving
= true;
1231 raft_transfer_leadership(raft
, "this server is leaving the cluster");
1232 raft_become_follower(raft
);
1233 raft_send_remove_server_requests(raft
);
1234 raft
->leave_timeout
= time_msec() + raft
->election_timer
;
1237 /* Returns true if 'raft' is currently attempting to leave its cluster. */
1239 raft_is_leaving(const struct raft
*raft
)
1241 return raft
->leaving
;
1244 /* Returns true if 'raft' successfully left its cluster. */
1246 raft_left(const struct raft
*raft
)
1251 /* Returns true if 'raft' has experienced a disk I/O failure. When this
1252 * returns true, only closing and reopening 'raft' allows for recovery. */
1254 raft_failed(const struct raft
*raft
)
1256 return raft
->failed
;
1259 /* Forces 'raft' to attempt to take leadership of the cluster by deposing the
1260 * current cluster. */
1262 raft_take_leadership(struct raft
*raft
)
1264 if (raft
->role
!= RAFT_LEADER
) {
1265 raft_start_election(raft
, true);
1269 /* Closes everything owned by 'raft' that might be visible outside the process:
1270 * network connections, commands, etc. This is part of closing 'raft'; it is
1271 * also used if 'raft' has failed in an unrecoverable way. */
1273 raft_close__(struct raft
*raft
)
1275 if (!hmap_node_is_null(&raft
->hmap_node
)) {
1276 hmap_remove(&all_rafts
, &raft
->hmap_node
);
1277 hmap_node_nullify(&raft
->hmap_node
);
1280 raft_complete_all_commands(raft
, RAFT_CMD_SHUTDOWN
);
1282 struct raft_server
*rs
= raft
->remove_server
;
1284 raft_send_remove_server_reply__(raft
, &rs
->sid
, &rs
->requester_sid
,
1285 rs
->requester_conn
, false,
1286 RAFT_SERVER_SHUTDOWN
);
1287 raft_server_destroy(raft
->remove_server
);
1288 raft
->remove_server
= NULL
;
1291 struct raft_conn
*conn
, *next
;
1292 LIST_FOR_EACH_SAFE (conn
, next
, list_node
, &raft
->conns
) {
1293 raft_conn_close(conn
);
1297 /* Closes and frees 'raft'.
1299 * A server's cluster membership is independent of whether the server is
1300 * actually running. When a server that is a member of a cluster closes, the
1301 * cluster treats this as a server failure. */
1303 raft_close(struct raft
*raft
)
1309 raft_transfer_leadership(raft
, "this server is shutting down");
1313 ovsdb_log_close(raft
->log
);
1315 raft_servers_destroy(&raft
->servers
);
1317 for (uint64_t index
= raft
->log_start
; index
< raft
->log_end
; index
++) {
1318 struct raft_entry
*e
= &raft
->entries
[index
- raft
->log_start
];
1319 raft_entry_uninit(e
);
1321 free(raft
->entries
);
1323 raft_entry_uninit(&raft
->snap
);
1325 raft_waiters_destroy(raft
);
1327 raft_servers_destroy(&raft
->add_servers
);
1329 hmap_destroy(&raft
->commands
);
1331 pstream_close(raft
->listener
);
1333 sset_destroy(&raft
->remote_addresses
);
1334 free(raft
->local_address
);
1335 free(raft
->local_nickname
);
1342 raft_conn_receive(struct raft
*raft
, struct raft_conn
*conn
,
1343 union raft_rpc
*rpc
)
1345 struct jsonrpc_msg
*msg
= jsonrpc_session_recv(conn
->js
);
1350 struct ovsdb_error
*error
= raft_rpc_from_jsonrpc(&raft
->cid
, &raft
->sid
,
1352 jsonrpc_msg_destroy(msg
);
1354 char *s
= ovsdb_error_to_string_free(error
);
1355 VLOG_INFO("%s: %s", jsonrpc_session_get_name(conn
->js
), s
);
1360 if (uuid_is_zero(&conn
->sid
)) {
1361 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(50, 50);
1362 conn
->sid
= rpc
->common
.sid
;
1363 VLOG_INFO_RL(&rl
, "%s: learned server ID "SID_FMT
,
1364 jsonrpc_session_get_name(conn
->js
), SID_ARGS(&conn
->sid
));
1365 } else if (!uuid_equals(&conn
->sid
, &rpc
->common
.sid
)) {
1366 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 5);
1367 VLOG_WARN_RL(&rl
, "%s: ignoring message with unexpected server ID "
1368 SID_FMT
" (expected "SID_FMT
")",
1369 jsonrpc_session_get_name(conn
->js
),
1370 SID_ARGS(&rpc
->common
.sid
), SID_ARGS(&conn
->sid
));
1371 raft_rpc_uninit(rpc
);
1375 const char *address
= (rpc
->type
== RAFT_RPC_HELLO_REQUEST
1376 ? rpc
->hello_request
.address
1377 : rpc
->type
== RAFT_RPC_ADD_SERVER_REQUEST
1378 ? rpc
->add_server_request
.address
1381 char *new_nickname
= raft_address_to_nickname(address
, &conn
->sid
);
1382 if (strcmp(conn
->nickname
, new_nickname
)) {
1383 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(50, 50);
1384 VLOG_INFO_RL(&rl
, "%s: learned remote address %s",
1385 jsonrpc_session_get_name(conn
->js
), address
);
1387 free(conn
->nickname
);
1388 conn
->nickname
= new_nickname
;
1398 raft_get_nickname(const struct raft
*raft
, const struct uuid
*sid
,
1399 char buf
[SID_LEN
+ 1], size_t bufsize
)
1401 if (uuid_equals(sid
, &raft
->sid
)) {
1402 return raft
->local_nickname
;
1405 const char *s
= raft_servers_get_nickname__(&raft
->servers
, sid
);
1410 return raft_servers_get_nickname(&raft
->add_servers
, sid
, buf
, bufsize
);
1414 log_rpc(const union raft_rpc
*rpc
, const char *direction
,
1415 const struct raft_conn
*conn
, int line_number
)
1417 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(600, 600);
1418 if (!raft_rpc_is_heartbeat(rpc
) && !VLOG_DROP_DBG(&rl
)) {
1419 struct ds s
= DS_EMPTY_INITIALIZER
;
1421 ds_put_format(&s
, "raft.c:%d ", line_number
);
1423 ds_put_format(&s
, "%s%s ", direction
, conn
->nickname
);
1424 raft_rpc_format(rpc
, &s
);
1425 VLOG_DBG("%s", ds_cstr(&s
));
1431 raft_send_add_server_request(struct raft
*raft
, struct raft_conn
*conn
)
1433 union raft_rpc rq
= {
1434 .add_server_request
= {
1436 .type
= RAFT_RPC_ADD_SERVER_REQUEST
,
1440 .address
= raft
->local_address
,
1443 raft_send_to_conn(raft
, &rq
, conn
);
1447 raft_conn_run(struct raft
*raft
, struct raft_conn
*conn
)
1449 jsonrpc_session_run(conn
->js
);
1451 unsigned int new_seqno
= jsonrpc_session_get_seqno(conn
->js
);
1452 bool reconnected
= new_seqno
!= conn
->js_seqno
;
1453 bool just_connected
= (reconnected
1454 && jsonrpc_session_is_connected(conn
->js
));
1457 /* Clear 'install_snapshot_request_in_progress' since it might not
1458 * reach the destination or server was restarted. */
1459 struct raft_server
*server
= raft_find_server(raft
, &conn
->sid
);
1461 server
->install_snapshot_request_in_progress
= false;
1465 conn
->js_seqno
= new_seqno
;
1466 if (just_connected
) {
1467 if (raft
->joining
) {
1468 raft_send_add_server_request(raft
, conn
);
1469 } else if (raft
->leaving
) {
1470 union raft_rpc rq
= {
1471 .remove_server_request
= {
1473 .type
= RAFT_RPC_REMOVE_SERVER_REQUEST
,
1479 raft_send_to_conn(raft
, &rq
, conn
);
1481 union raft_rpc rq
= (union raft_rpc
) {
1484 .type
= RAFT_RPC_HELLO_REQUEST
,
1487 .address
= raft
->local_address
,
1490 raft_send_to_conn(raft
, &rq
, conn
);
1494 for (size_t i
= 0; i
< 50; i
++) {
1496 if (!raft_conn_receive(raft
, conn
, &rpc
)) {
1500 log_rpc(&rpc
, "<--", conn
, 0);
1501 raft_handle_rpc(raft
, &rpc
);
1502 raft_rpc_uninit(&rpc
);
1507 raft_waiter_complete_rpc(struct raft
*raft
, const union raft_rpc
*rpc
)
1509 uint64_t term
= raft_rpc_get_term(rpc
);
1510 if (term
&& term
< raft
->term
) {
1511 /* Drop the message because it's for an expired term. */
1515 if (!raft_is_rpc_synced(raft
, rpc
)) {
1516 /* This is a bug. A reply message is deferred because some state in
1517 * the message, such as a term or index, has not been committed to
1518 * disk, and they should only be completed when that commit is done.
1519 * But this message is being completed before the commit is finished.
1520 * Complain, and hope that someone reports the bug. */
1521 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
1522 if (VLOG_DROP_ERR(&rl
)) {
1526 struct ds s
= DS_EMPTY_INITIALIZER
;
1528 if (term
> raft
->synced_term
) {
1529 ds_put_format(&s
, " because message term %"PRIu64
" is "
1530 "past synced term %"PRIu64
,
1531 term
, raft
->synced_term
);
1534 uint64_t index
= raft_rpc_get_min_sync_index(rpc
);
1535 if (index
> raft
->log_synced
) {
1536 ds_put_format(&s
, " %s message index %"PRIu64
" is past last "
1537 "synced index %"PRIu64
,
1538 s
.length
? "and" : "because",
1539 index
, raft
->log_synced
);
1542 const struct uuid
*vote
= raft_rpc_get_vote(rpc
);
1543 if (vote
&& !uuid_equals(vote
, &raft
->synced_vote
)) {
1544 char buf1
[SID_LEN
+ 1];
1545 char buf2
[SID_LEN
+ 1];
1546 ds_put_format(&s
, " %s vote %s differs from synced vote %s",
1547 s
.length
? "and" : "because",
1548 raft_get_nickname(raft
, vote
, buf1
, sizeof buf1
),
1549 raft_get_nickname(raft
, &raft
->synced_vote
,
1550 buf2
, sizeof buf2
));
1553 char buf
[SID_LEN
+ 1];
1554 ds_put_format(&s
, ": %s ",
1555 raft_get_nickname(raft
, &rpc
->common
.sid
,
1557 raft_rpc_format(rpc
, &s
);
1558 VLOG_ERR("internal error: deferred %s message completed "
1559 "but not ready to send%s",
1560 raft_rpc_type_to_string(rpc
->type
), ds_cstr(&s
));
1566 struct raft_conn
*dst
= raft_find_conn_by_sid(raft
, &rpc
->common
.sid
);
1568 raft_send_to_conn(raft
, rpc
, dst
);
1573 raft_waiter_complete(struct raft
*raft
, struct raft_waiter
*w
)
1577 if (raft
->role
== RAFT_LEADER
) {
1578 raft_update_our_match_index(raft
, w
->entry
.index
);
1580 raft
->log_synced
= w
->entry
.index
;
1584 raft
->synced_term
= w
->term
.term
;
1585 raft
->synced_vote
= w
->term
.vote
;
1589 raft_waiter_complete_rpc(raft
, w
->rpc
);
1595 raft_waiter_destroy(struct raft_waiter
*w
)
1601 ovs_list_remove(&w
->list_node
);
1609 raft_rpc_uninit(w
->rpc
);
1617 raft_waiters_run(struct raft
*raft
)
1619 if (ovs_list_is_empty(&raft
->waiters
)) {
1623 uint64_t cur
= ovsdb_log_commit_progress(raft
->log
);
1624 struct raft_waiter
*w
, *next
;
1625 LIST_FOR_EACH_SAFE (w
, next
, list_node
, &raft
->waiters
) {
1626 if (cur
< w
->commit_ticket
) {
1629 raft_waiter_complete(raft
, w
);
1630 raft_waiter_destroy(w
);
1635 raft_waiters_wait(struct raft
*raft
)
1637 struct raft_waiter
*w
;
1638 LIST_FOR_EACH (w
, list_node
, &raft
->waiters
) {
1639 ovsdb_log_commit_wait(raft
->log
, w
->commit_ticket
);
1645 raft_waiters_destroy(struct raft
*raft
)
1647 struct raft_waiter
*w
, *next
;
1648 LIST_FOR_EACH_SAFE (w
, next
, list_node
, &raft
->waiters
) {
1649 raft_waiter_destroy(w
);
1653 static bool OVS_WARN_UNUSED_RESULT
1654 raft_set_term(struct raft
*raft
, uint64_t term
, const struct uuid
*vote
)
1656 struct ovsdb_error
*error
= raft_write_state(raft
->log
, term
, vote
);
1657 if (!raft_handle_write_error(raft
, error
)) {
1661 struct raft_waiter
*w
= raft_waiter_create(raft
, RAFT_W_TERM
, true);
1662 raft
->term
= w
->term
.term
= term
;
1663 raft
->vote
= w
->term
.vote
= vote
? *vote
: UUID_ZERO
;
1668 raft_accept_vote(struct raft
*raft
, struct raft_server
*s
,
1669 const struct uuid
*vote
)
1671 if (uuid_equals(&s
->vote
, vote
)) {
1674 if (!uuid_is_zero(&s
->vote
)) {
1675 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 1);
1676 char buf1
[SID_LEN
+ 1];
1677 char buf2
[SID_LEN
+ 1];
1678 VLOG_WARN_RL(&rl
, "server %s changed its vote from %s to %s",
1680 raft_get_nickname(raft
, &s
->vote
, buf1
, sizeof buf1
),
1681 raft_get_nickname(raft
, vote
, buf2
, sizeof buf2
));
1684 if (uuid_equals(vote
, &raft
->sid
)
1685 && ++raft
->n_votes
> hmap_count(&raft
->servers
) / 2) {
1686 raft_become_leader(raft
);
1691 raft_start_election(struct raft
*raft
, bool leadership_transfer
)
1693 if (raft
->leaving
) {
1697 struct raft_server
*me
= raft_find_server(raft
, &raft
->sid
);
1702 if (!raft_set_term(raft
, raft
->term
+ 1, &raft
->sid
)) {
1706 ovs_assert(raft
->role
!= RAFT_LEADER
);
1708 raft
->leader_sid
= UUID_ZERO
;
1709 raft
->role
= RAFT_CANDIDATE
;
1710 /* If there was no leader elected since last election, we know we are
1712 raft
->candidate_retrying
= !raft
->had_leader
;
1713 raft
->had_leader
= false;
1717 raft
->election_start
= time_msec();
1718 raft
->election_won
= 0;
1719 raft
->leadership_transfer
= leadership_transfer
;
1721 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 5);
1722 if (!VLOG_DROP_INFO(&rl
)) {
1723 long long int now
= time_msec();
1724 if (now
>= raft
->election_timeout
) {
1725 VLOG_INFO("term %"PRIu64
": %lld ms timeout expired, "
1726 "starting election",
1727 raft
->term
, now
- raft
->election_base
);
1729 VLOG_INFO("term %"PRIu64
": starting election", raft
->term
);
1732 raft_reset_election_timer(raft
);
1734 struct raft_server
*peer
;
1735 HMAP_FOR_EACH (peer
, hmap_node
, &raft
->servers
) {
1736 peer
->vote
= UUID_ZERO
;
1737 if (uuid_equals(&raft
->sid
, &peer
->sid
)) {
1741 union raft_rpc rq
= {
1744 .type
= RAFT_RPC_VOTE_REQUEST
,
1748 .last_log_index
= raft
->log_end
- 1,
1750 raft
->log_end
> raft
->log_start
1751 ? raft
->entries
[raft
->log_end
- raft
->log_start
- 1].term
1753 .leadership_transfer
= leadership_transfer
,
1756 if (failure_test
!= FT_DONT_SEND_VOTE_REQUEST
) {
1757 raft_send(raft
, &rq
);
1761 /* Vote for ourselves. */
1762 raft_accept_vote(raft
, me
, &raft
->sid
);
1766 raft_open_conn(struct raft
*raft
, const char *address
, const struct uuid
*sid
)
1768 if (strcmp(address
, raft
->local_address
)
1769 && !raft_find_conn_by_address(raft
, address
)) {
1770 raft_add_conn(raft
, jsonrpc_session_open(address
, true), sid
, false);
1775 raft_conn_close(struct raft_conn
*conn
)
1777 jsonrpc_session_close(conn
->js
);
1778 ovs_list_remove(&conn
->list_node
);
1779 free(conn
->nickname
);
1783 /* Returns true if 'conn' should stay open, false if it should be closed. */
1785 raft_conn_should_stay_open(struct raft
*raft
, struct raft_conn
*conn
)
1787 /* Close the connection if it's actually dead. If necessary, we'll
1788 * initiate a new session later. */
1789 if (!jsonrpc_session_is_alive(conn
->js
)) {
1793 /* Keep incoming sessions. We trust the originator to decide to drop
1795 if (conn
->incoming
) {
1799 /* If we are joining the cluster, keep sessions to the remote addresses
1800 * that are supposed to be part of the cluster we're joining. */
1801 if (raft
->joining
&& sset_contains(&raft
->remote_addresses
,
1802 jsonrpc_session_get_name(conn
->js
))) {
1806 /* We have joined the cluster. If we did that "recently", then there is a
1807 * chance that we do not have the most recent server configuration log
1808 * entry. If so, it's a waste to disconnect from the servers that were in
1809 * remote_addresses and that will probably appear in the configuration,
1810 * just to reconnect to them a moment later when we do get the
1811 * configuration update. If we are not ourselves in the configuration,
1812 * then we know that there must be a new configuration coming up, so in
1813 * that case keep the connection. */
1814 if (!raft_find_server(raft
, &raft
->sid
)) {
1818 /* Keep the connection only if the server is part of the configuration. */
1819 return raft_find_server(raft
, &conn
->sid
);
1822 /* Allows 'raft' to maintain the distributed log. Call this function as part
1823 * of the process's main loop. */
1825 raft_run(struct raft
*raft
)
1827 if (raft
->left
|| raft
->failed
) {
1831 raft_waiters_run(raft
);
1833 if (!raft
->listener
&& time_msec() >= raft
->listen_backoff
) {
1834 char *paddr
= raft_make_address_passive(raft
->local_address
);
1835 int error
= pstream_open(paddr
, &raft
->listener
, DSCP_DEFAULT
);
1837 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 5);
1838 VLOG_WARN_RL(&rl
, "%s: listen failed (%s)",
1839 paddr
, ovs_strerror(error
));
1840 raft
->listen_backoff
= time_msec() + 1000;
1845 if (raft
->listener
) {
1846 struct stream
*stream
;
1847 int error
= pstream_accept(raft
->listener
, &stream
);
1849 raft_add_conn(raft
, jsonrpc_session_open_unreliably(
1850 jsonrpc_open(stream
), DSCP_DEFAULT
), NULL
,
1852 } else if (error
!= EAGAIN
) {
1853 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 1);
1854 VLOG_WARN_RL(&rl
, "%s: accept failed: %s",
1855 pstream_get_name(raft
->listener
),
1856 ovs_strerror(error
));
1860 /* Run RPCs for all open sessions. */
1861 struct raft_conn
*conn
;
1862 LIST_FOR_EACH (conn
, list_node
, &raft
->conns
) {
1863 raft_conn_run(raft
, conn
);
1866 /* Close unneeded sessions. */
1867 struct raft_conn
*next
;
1868 LIST_FOR_EACH_SAFE (conn
, next
, list_node
, &raft
->conns
) {
1869 if (!raft_conn_should_stay_open(raft
, conn
)) {
1870 raft
->n_disconnections
++;
1871 raft_conn_close(conn
);
1875 /* Open needed sessions. */
1876 struct raft_server
*server
;
1877 HMAP_FOR_EACH (server
, hmap_node
, &raft
->servers
) {
1878 raft_open_conn(raft
, server
->address
, &server
->sid
);
1880 if (raft
->joining
) {
1881 const char *address
;
1882 SSET_FOR_EACH (address
, &raft
->remote_addresses
) {
1883 raft_open_conn(raft
, address
, NULL
);
1887 if (!raft
->joining
&& time_msec() >= raft
->election_timeout
) {
1888 if (raft
->role
== RAFT_LEADER
) {
1889 /* Check if majority of followers replied, then reset
1890 * election_timeout and reset s->replied. Otherwise, become
1893 * Raft paper section 6.2: Leaders: A server might be in the leader
1894 * state, but if it isn’t the current leader, it could be
1895 * needlessly delaying client requests. For example, suppose a
1896 * leader is partitioned from the rest of the cluster, but it can
1897 * still communicate with a particular client. Without additional
1898 * mechanism, it could delay a request from that client forever,
1899 * being unable to replicate a log entry to any other servers.
1900 * Meanwhile, there might be another leader of a newer term that is
1901 * able to communicate with a majority of the cluster and would be
1902 * able to commit the client’s request. Thus, a leader in Raft
1903 * steps down if an election timeout elapses without a successful
1904 * round of heartbeats to a majority of its cluster; this allows
1905 * clients to retry their requests with another server. */
1907 HMAP_FOR_EACH (server
, hmap_node
, &raft
->servers
) {
1908 if (server
->replied
) {
1912 if (count
>= hmap_count(&raft
->servers
) / 2) {
1913 HMAP_FOR_EACH (server
, hmap_node
, &raft
->servers
) {
1914 server
->replied
= false;
1916 raft_reset_election_timer(raft
);
1918 raft_become_follower(raft
);
1919 raft_start_election(raft
, false);
1922 raft_start_election(raft
, false);
1927 if (raft
->leaving
&& time_msec() >= raft
->leave_timeout
) {
1928 raft_send_remove_server_requests(raft
);
1931 if (raft
->joining
&& time_msec() >= raft
->join_timeout
) {
1932 raft
->join_timeout
= time_msec() + 1000;
1933 LIST_FOR_EACH (conn
, list_node
, &raft
->conns
) {
1934 raft_send_add_server_request(raft
, conn
);
1938 long long int now
= time_msec();
1939 if (now
>= raft
->ping_timeout
) {
1940 if (raft
->role
== RAFT_LEADER
) {
1941 raft_send_heartbeats(raft
);
1943 /* Check if any commands timeout. Timeout is set to twice the time of
1944 * election base time so that commands can complete properly during
1945 * leader election. E.g. a leader crashed and current node with pending
1946 * commands becomes new leader: the pending commands can still complete
1947 * if the crashed leader has replicated the transactions to majority of
1948 * followers before it crashed. */
1949 struct raft_command
*cmd
, *next_cmd
;
1950 HMAP_FOR_EACH_SAFE (cmd
, next_cmd
, hmap_node
, &raft
->commands
) {
1952 && now
- cmd
->timestamp
> raft
->election_timer
* 2) {
1953 raft_command_complete(raft
, cmd
, RAFT_CMD_TIMEOUT
);
1956 raft_reset_ping_timer(raft
);
1959 /* Do this only at the end; if we did it as soon as we set raft->left or
1960 * raft->failed in handling the RemoveServerReply, then it could easily
1961 * cause references to freed memory in RPC sessions, etc. */
1962 if (raft
->left
|| raft
->failed
) {
/* Arranges for poll_block() to wake up when session 'js' needs attention
 * (reconnect timers, received messages, etc.).  A null 'js' is tolerated as
 * a no-op. */
static void
raft_wait_session(struct jsonrpc_session *js)
{
    if (js) {
        jsonrpc_session_wait(js);
        jsonrpc_session_recv_wait(js);
    }
}
1976 /* Causes the next call to poll_block() to wake up when 'raft' needs to do
1979 raft_wait(struct raft
*raft
)
1981 if (raft
->left
|| raft
->failed
) {
1985 raft_waiters_wait(raft
);
1987 if (raft
->listener
) {
1988 pstream_wait(raft
->listener
);
1990 poll_timer_wait_until(raft
->listen_backoff
);
1993 struct raft_conn
*conn
;
1994 LIST_FOR_EACH (conn
, list_node
, &raft
->conns
) {
1995 raft_wait_session(conn
->js
);
1998 if (!raft
->joining
) {
1999 poll_timer_wait_until(raft
->election_timeout
);
2001 poll_timer_wait_until(raft
->join_timeout
);
2003 if (raft
->leaving
) {
2004 poll_timer_wait_until(raft
->leave_timeout
);
2006 if (raft
->role
== RAFT_LEADER
|| !hmap_is_empty(&raft
->commands
)) {
2007 poll_timer_wait_until(raft
->ping_timeout
);
2011 static struct raft_waiter
*
2012 raft_waiter_create(struct raft
*raft
, enum raft_waiter_type type
,
2015 struct raft_waiter
*w
= xzalloc(sizeof *w
);
2016 ovs_list_push_back(&raft
->waiters
, &w
->list_node
);
2017 w
->commit_ticket
= start_commit
? ovsdb_log_commit_start(raft
->log
) : 0;
2022 /* Returns a human-readable representation of 'status' (or NULL if 'status' is
2025 raft_command_status_to_string(enum raft_command_status status
)
2028 case RAFT_CMD_INCOMPLETE
:
2029 return "operation still in progress";
2030 case RAFT_CMD_SUCCESS
:
2032 case RAFT_CMD_NOT_LEADER
:
2033 return "not leader";
2034 case RAFT_CMD_BAD_PREREQ
:
2035 return "prerequisite check failed";
2036 case RAFT_CMD_LOST_LEADERSHIP
:
2037 return "lost leadership";
2038 case RAFT_CMD_SHUTDOWN
:
2039 return "server shutdown";
2040 case RAFT_CMD_IO_ERROR
:
2042 case RAFT_CMD_TIMEOUT
:
2049 /* Converts human-readable status in 's' into status code in '*statusp'.
2050 * Returns true if successful, false if 's' is unknown. */
2052 raft_command_status_from_string(const char *s
,
2053 enum raft_command_status
*statusp
)
2055 for (enum raft_command_status status
= 0; ; status
++) {
2056 const char *s2
= raft_command_status_to_string(status
);
2060 } else if (!strcmp(s
, s2
)) {
2067 static const struct uuid
*
2068 raft_get_eid(const struct raft
*raft
, uint64_t index
)
2070 for (; index
>= raft
->log_start
; index
--) {
2071 const struct raft_entry
*e
= raft_get_entry(raft
, index
);
2076 return &raft
->snap
.eid
;
2080 raft_current_eid(const struct raft
*raft
)
2082 return raft_get_eid(raft
, raft
->log_end
- 1);
2085 static struct raft_command
*
2086 raft_command_create_completed(enum raft_command_status status
)
2088 ovs_assert(status
!= RAFT_CMD_INCOMPLETE
);
2090 struct raft_command
*cmd
= xzalloc(sizeof *cmd
);
2092 cmd
->status
= status
;
2096 static struct raft_command
*
2097 raft_command_create_incomplete(struct raft
*raft
, uint64_t index
)
2099 struct raft_command
*cmd
= xzalloc(sizeof *cmd
);
2100 cmd
->n_refs
= 2; /* One for client, one for raft->commands. */
2102 cmd
->status
= RAFT_CMD_INCOMPLETE
;
2103 hmap_insert(&raft
->commands
, &cmd
->hmap_node
, cmd
->index
);
2107 static struct raft_command
* OVS_WARN_UNUSED_RESULT
2108 raft_command_initiate(struct raft
*raft
,
2109 const struct json
*data
, const struct json
*servers
,
2110 uint64_t election_timer
, const struct uuid
*eid
)
2112 /* Write to local log. */
2113 uint64_t index
= raft
->log_end
;
2114 if (!raft_handle_write_error(
2115 raft
, raft_write_entry(
2116 raft
, raft
->term
, json_nullable_clone(data
), eid
,
2117 json_nullable_clone(servers
),
2119 return raft_command_create_completed(RAFT_CMD_IO_ERROR
);
2122 struct raft_command
*cmd
= raft_command_create_incomplete(raft
, index
);
2125 cmd
->timestamp
= time_msec();
2127 raft_waiter_create(raft
, RAFT_W_ENTRY
, true)->entry
.index
= cmd
->index
;
2129 if (failure_test
== FT_CRASH_BEFORE_SEND_APPEND_REQ
) {
2130 ovs_fatal(0, "Raft test: crash before sending append_request.");
2132 /* Write to remote logs. */
2133 struct raft_server
*s
;
2134 HMAP_FOR_EACH (s
, hmap_node
, &raft
->servers
) {
2135 if (!uuid_equals(&s
->sid
, &raft
->sid
) && s
->next_index
== index
) {
2136 raft_send_append_request(raft
, s
, 1, "execute command");
2140 if (failure_test
== FT_CRASH_AFTER_SEND_APPEND_REQ
) {
2141 ovs_fatal(0, "Raft test: crash after sending append_request.");
2143 raft_reset_ping_timer(raft
);
2149 log_all_commands(struct raft
*raft
)
2151 struct raft_command
*cmd
, *next
;
2152 HMAP_FOR_EACH_SAFE (cmd
, next
, hmap_node
, &raft
->commands
) {
2153 VLOG_DBG("raft command eid: "UUID_FMT
, UUID_ARGS(&cmd
->eid
));
2157 static struct raft_command
* OVS_WARN_UNUSED_RESULT
2158 raft_command_execute__(struct raft
*raft
, const struct json
*data
,
2159 const struct json
*servers
, uint64_t election_timer
,
2160 const struct uuid
*prereq
, struct uuid
*result
)
2162 if (raft
->joining
|| raft
->leaving
|| raft
->left
|| raft
->failed
) {
2163 return raft_command_create_completed(RAFT_CMD_SHUTDOWN
);
2166 if (raft
->role
!= RAFT_LEADER
) {
2167 /* Consider proxying the command to the leader. We can only do that if
2168 * we know the leader and the command does not change the set of
2169 * servers. We do not proxy commands without prerequisites, even
2170 * though we could, because in an OVSDB context a log entry doesn't
2171 * make sense without context. */
2172 if (servers
|| election_timer
|| !data
2173 || raft
->role
!= RAFT_FOLLOWER
|| uuid_is_zero(&raft
->leader_sid
)
2175 return raft_command_create_completed(RAFT_CMD_NOT_LEADER
);
2179 struct uuid eid
= data
? uuid_random() : UUID_ZERO
;
2184 if (raft
->role
!= RAFT_LEADER
) {
2185 const union raft_rpc rpc
= {
2186 .execute_command_request
= {
2188 .type
= RAFT_RPC_EXECUTE_COMMAND_REQUEST
,
2189 .sid
= raft
->leader_sid
,
2191 .data
= CONST_CAST(struct json
*, data
),
2196 if (failure_test
== FT_CRASH_BEFORE_SEND_EXEC_REQ
) {
2197 ovs_fatal(0, "Raft test: crash before sending "
2198 "execute_command_request");
2200 if (!raft_send(raft
, &rpc
)) {
2201 /* Couldn't send command, so it definitely failed. */
2202 return raft_command_create_completed(RAFT_CMD_NOT_LEADER
);
2204 if (failure_test
== FT_CRASH_AFTER_SEND_EXEC_REQ
) {
2205 ovs_fatal(0, "Raft test: crash after sending "
2206 "execute_command_request");
2209 struct raft_command
*cmd
= raft_command_create_incomplete(raft
, 0);
2210 cmd
->timestamp
= time_msec();
2212 log_all_commands(raft
);
2216 const struct uuid
*current_eid
= raft_current_eid(raft
);
2217 if (prereq
&& !uuid_equals(prereq
, current_eid
)) {
2218 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
2219 VLOG_INFO_RL(&rl
, "current entry eid "UUID_FMT
" does not match "
2220 "prerequisite "UUID_FMT
,
2221 UUID_ARGS(current_eid
), UUID_ARGS(prereq
));
2222 return raft_command_create_completed(RAFT_CMD_BAD_PREREQ
);
2225 return raft_command_initiate(raft
, data
, servers
, election_timer
, &eid
);
2228 /* Initiates appending a log entry to 'raft'. The log entry consists of 'data'
2229 * and, if 'prereq' is nonnull, it is only added to the log if the previous
2230 * entry in the log has entry ID 'prereq'. If 'result' is nonnull, it is
2231 * populated with the entry ID for the new log entry.
2233 * Returns a "struct raft_command" that may be used to track progress adding
2234 * the log entry. The caller must eventually free the returned structure, with
2235 * raft_command_unref(). */
2236 struct raft_command
* OVS_WARN_UNUSED_RESULT
2237 raft_command_execute(struct raft
*raft
, const struct json
*data
,
2238 const struct uuid
*prereq
, struct uuid
*result
)
2240 return raft_command_execute__(raft
, data
, NULL
, 0, prereq
, result
);
2243 /* Returns the status of 'cmd'. */
2244 enum raft_command_status
2245 raft_command_get_status(const struct raft_command
*cmd
)
2247 ovs_assert(cmd
->n_refs
> 0);
2251 /* Returns the index of the log entry at which 'cmd' was committed.
2253 * This function works only with successful commands. */
2255 raft_command_get_commit_index(const struct raft_command
*cmd
)
2257 ovs_assert(cmd
->n_refs
> 0);
2258 ovs_assert(cmd
->status
== RAFT_CMD_SUCCESS
);
2264 raft_command_unref(struct raft_command
*cmd
)
2267 ovs_assert(cmd
->n_refs
> 0);
2268 if (!--cmd
->n_refs
) {
2274 /* Causes poll_block() to wake up when 'cmd' has status to report. */
2276 raft_command_wait(const struct raft_command
*cmd
)
2278 if (cmd
->status
!= RAFT_CMD_INCOMPLETE
) {
2279 poll_immediate_wake();
2284 raft_command_complete(struct raft
*raft
,
2285 struct raft_command
*cmd
,
2286 enum raft_command_status status
)
2288 VLOG_DBG("raft_command_complete eid "UUID_FMT
" status: %s",
2289 UUID_ARGS(&cmd
->eid
), raft_command_status_to_string(status
));
2290 if (!uuid_is_zero(&cmd
->sid
)) {
2291 uint64_t commit_index
= status
== RAFT_CMD_SUCCESS
? cmd
->index
: 0;
2292 raft_send_execute_command_reply(raft
, &cmd
->sid
, &cmd
->eid
, status
,
2296 ovs_assert(cmd
->status
== RAFT_CMD_INCOMPLETE
);
2297 ovs_assert(cmd
->n_refs
> 0);
2298 hmap_remove(&raft
->commands
, &cmd
->hmap_node
);
2299 cmd
->status
= status
;
2300 raft_command_unref(cmd
);
2304 raft_complete_all_commands(struct raft
*raft
, enum raft_command_status status
)
2306 struct raft_command
*cmd
, *next
;
2307 HMAP_FOR_EACH_SAFE (cmd
, next
, hmap_node
, &raft
->commands
) {
2308 raft_command_complete(raft
, cmd
, status
);
2312 static struct raft_command
*
2313 raft_find_command_by_eid(struct raft
*raft
, const struct uuid
*eid
)
2315 struct raft_command
*cmd
;
2317 HMAP_FOR_EACH (cmd
, hmap_node
, &raft
->commands
) {
2318 if (uuid_equals(&cmd
->eid
, eid
)) {
2325 #define RAFT_RPC(ENUM, NAME) \
2326 static void raft_handle_##NAME(struct raft *, const struct raft_##NAME *);
2331 raft_handle_hello_request(struct raft
*raft OVS_UNUSED
,
2332 const struct raft_hello_request
*hello OVS_UNUSED
)
2336 /* 'sid' is the server being added. */
2338 raft_send_add_server_reply__(struct raft
*raft
, const struct uuid
*sid
,
2339 const char *address
,
2340 bool success
, const char *comment
)
2342 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(10, 10);
2343 if (!VLOG_DROP_INFO(&rl
)) {
2344 struct ds s
= DS_EMPTY_INITIALIZER
;
2345 char buf
[SID_LEN
+ 1];
2346 ds_put_format(&s
, "adding %s ("SID_FMT
" at %s) "
2347 "to cluster "CID_FMT
" %s",
2348 raft_get_nickname(raft
, sid
, buf
, sizeof buf
),
2349 SID_ARGS(sid
), address
, CID_ARGS(&raft
->cid
),
2350 success
? "succeeded" : "failed");
2352 ds_put_format(&s
, " (%s)", comment
);
2354 VLOG_INFO("%s", ds_cstr(&s
));
2358 union raft_rpc rpy
= {
2359 .add_server_reply
= {
2361 .type
= RAFT_RPC_ADD_SERVER_REPLY
,
2363 .comment
= CONST_CAST(char *, comment
),
2369 struct sset
*remote_addresses
= &rpy
.add_server_reply
.remote_addresses
;
2370 sset_init(remote_addresses
);
2371 if (!raft
->joining
) {
2372 struct raft_server
*s
;
2373 HMAP_FOR_EACH (s
, hmap_node
, &raft
->servers
) {
2374 if (!uuid_equals(&s
->sid
, &raft
->sid
)) {
2375 sset_add(remote_addresses
, s
->address
);
2380 raft_send(raft
, &rpy
);
2382 sset_destroy(remote_addresses
);
2386 raft_send_remove_server_reply_rpc(struct raft
*raft
,
2387 const struct uuid
*dst_sid
,
2388 const struct uuid
*target_sid
,
2389 bool success
, const char *comment
)
2391 if (uuid_equals(&raft
->sid
, dst_sid
)) {
2392 if (success
&& uuid_equals(&raft
->sid
, target_sid
)) {
2393 raft_finished_leaving_cluster(raft
);
2398 const union raft_rpc rpy
= {
2399 .remove_server_reply
= {
2401 .type
= RAFT_RPC_REMOVE_SERVER_REPLY
,
2403 .comment
= CONST_CAST(char *, comment
),
2405 .target_sid
= (uuid_equals(dst_sid
, target_sid
)
2411 raft_send(raft
, &rpy
);
2415 raft_send_remove_server_reply__(struct raft
*raft
,
2416 const struct uuid
*target_sid
,
2417 const struct uuid
*requester_sid
,
2418 struct unixctl_conn
*requester_conn
,
2419 bool success
, const char *comment
)
2421 struct ds s
= DS_EMPTY_INITIALIZER
;
2422 ds_put_format(&s
, "request ");
2423 if (!uuid_is_zero(requester_sid
)) {
2424 char buf
[SID_LEN
+ 1];
2425 ds_put_format(&s
, "by %s",
2426 raft_get_nickname(raft
, requester_sid
, buf
, sizeof buf
));
2428 ds_put_cstr(&s
, "via unixctl");
2430 ds_put_cstr(&s
, " to remove ");
2431 if (!requester_conn
&& uuid_equals(target_sid
, requester_sid
)) {
2432 ds_put_cstr(&s
, "itself");
2434 char buf
[SID_LEN
+ 1];
2435 ds_put_cstr(&s
, raft_get_nickname(raft
, target_sid
, buf
, sizeof buf
));
2436 if (uuid_equals(target_sid
, &raft
->sid
)) {
2437 ds_put_cstr(&s
, " (ourselves)");
2440 ds_put_format(&s
, " from cluster "CID_FMT
" %s",
2441 CID_ARGS(&raft
->cid
),
2442 success
? "succeeded" : "failed");
2444 ds_put_format(&s
, " (%s)", comment
);
2447 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(10, 10);
2448 VLOG_INFO_RL(&rl
, "%s", ds_cstr(&s
));
2450 /* Send RemoveServerReply to the requester (which could be a server or a
2451 * unixctl connection. Also always send it to the removed server; this
2452 * allows it to be sure that it's really removed and update its log and
2453 * disconnect permanently. */
2454 if (!uuid_is_zero(requester_sid
)) {
2455 raft_send_remove_server_reply_rpc(raft
, requester_sid
, target_sid
,
2458 if (!uuid_equals(requester_sid
, target_sid
)) {
2459 raft_send_remove_server_reply_rpc(raft
, target_sid
, target_sid
,
2462 if (requester_conn
) {
2464 unixctl_command_reply(requester_conn
, ds_cstr(&s
));
2466 unixctl_command_reply_error(requester_conn
, ds_cstr(&s
));
2474 raft_send_add_server_reply(struct raft
*raft
,
2475 const struct raft_add_server_request
*rq
,
2476 bool success
, const char *comment
)
2478 return raft_send_add_server_reply__(raft
, &rq
->common
.sid
, rq
->address
,
2483 raft_send_remove_server_reply(struct raft
*raft
,
2484 const struct raft_remove_server_request
*rq
,
2485 bool success
, const char *comment
)
2487 return raft_send_remove_server_reply__(raft
, &rq
->sid
, &rq
->common
.sid
,
2488 rq
->requester_conn
, success
,
2493 raft_become_follower(struct raft
*raft
)
2495 raft
->leader_sid
= UUID_ZERO
;
2496 if (raft
->role
== RAFT_FOLLOWER
) {
2500 raft
->role
= RAFT_FOLLOWER
;
2501 raft_reset_election_timer(raft
);
2503 /* Notify clients about lost leadership.
2505 * We do not reverse our changes to 'raft->servers' because the new
2506 * configuration is already part of the log. Possibly the configuration
2507 * log entry will not be committed, but until we know that we must use the
2508 * new configuration. Our AppendEntries processing will properly update
2509 * the server configuration later, if necessary. */
2510 struct raft_server
*s
;
2511 HMAP_FOR_EACH (s
, hmap_node
, &raft
->add_servers
) {
2512 raft_send_add_server_reply__(raft
, &s
->sid
, s
->address
, false,
2513 RAFT_SERVER_LOST_LEADERSHIP
);
2515 if (raft
->remove_server
) {
2516 raft_send_remove_server_reply__(raft
, &raft
->remove_server
->sid
,
2517 &raft
->remove_server
->requester_sid
,
2518 raft
->remove_server
->requester_conn
,
2519 false, RAFT_SERVER_LOST_LEADERSHIP
);
2520 raft_server_destroy(raft
->remove_server
);
2521 raft
->remove_server
= NULL
;
2524 raft_complete_all_commands(raft
, RAFT_CMD_LOST_LEADERSHIP
);
2528 raft_send_append_request(struct raft
*raft
,
2529 struct raft_server
*peer
, unsigned int n
,
2530 const char *comment
)
2532 ovs_assert(raft
->role
== RAFT_LEADER
);
2534 const union raft_rpc rq
= {
2537 .type
= RAFT_RPC_APPEND_REQUEST
,
2539 .comment
= CONST_CAST(char *, comment
),
2542 .prev_log_index
= peer
->next_index
- 1,
2543 .prev_log_term
= (peer
->next_index
- 1 >= raft
->log_start
2544 ? raft
->entries
[peer
->next_index
- 1
2545 - raft
->log_start
].term
2547 .leader_commit
= raft
->commit_index
,
2548 .entries
= &raft
->entries
[peer
->next_index
- raft
->log_start
],
2552 raft_send(raft
, &rq
);
2556 raft_send_heartbeats(struct raft
*raft
)
2558 struct raft_server
*s
;
2559 HMAP_FOR_EACH (s
, hmap_node
, &raft
->servers
) {
2560 if (!uuid_equals(&raft
->sid
, &s
->sid
)) {
2561 raft_send_append_request(raft
, s
, 0, "heartbeat");
2565 /* Send anyone waiting for a command to complete a ping to let them
2566 * know we're still working on it. */
2567 struct raft_command
*cmd
;
2568 HMAP_FOR_EACH (cmd
, hmap_node
, &raft
->commands
) {
2569 if (!uuid_is_zero(&cmd
->sid
)) {
2570 raft_send_execute_command_reply(raft
, &cmd
->sid
,
2572 RAFT_CMD_INCOMPLETE
, 0);
2576 raft_reset_ping_timer(raft
);
2579 /* Initializes the fields in 's' that represent the leader's view of the
2582 raft_server_init_leader(struct raft
*raft
, struct raft_server
*s
)
2584 s
->next_index
= raft
->log_end
;
2586 s
->phase
= RAFT_PHASE_STABLE
;
2588 s
->install_snapshot_request_in_progress
= false;
2592 raft_set_leader(struct raft
*raft
, const struct uuid
*sid
)
2594 raft
->leader_sid
= *sid
;
2595 raft
->ever_had_leader
= raft
->had_leader
= true;
2596 raft
->candidate_retrying
= false;
2600 raft_become_leader(struct raft
*raft
)
2602 log_all_commands(raft
);
2604 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 5);
2605 VLOG_INFO_RL(&rl
, "term %"PRIu64
": elected leader by %d+ of "
2606 "%"PRIuSIZE
" servers", raft
->term
,
2607 raft
->n_votes
, hmap_count(&raft
->servers
));
2609 ovs_assert(raft
->role
!= RAFT_LEADER
);
2610 raft
->role
= RAFT_LEADER
;
2611 raft
->election_won
= time_msec();
2612 raft_set_leader(raft
, &raft
->sid
);
2613 raft_reset_election_timer(raft
);
2614 raft_reset_ping_timer(raft
);
2616 struct raft_server
*s
;
2617 HMAP_FOR_EACH (s
, hmap_node
, &raft
->servers
) {
2618 raft_server_init_leader(raft
, s
);
2621 raft
->election_timer_new
= 0;
2623 raft_update_our_match_index(raft
, raft
->log_end
- 1);
2625 /* Write the fact that we are leader to the log. This is not used by the
2626 * algorithm (although it could be, for quick restart), but it is used for
2627 * offline analysis to check for conformance with the properties that Raft
2629 struct raft_record r
= {
2630 .type
= RAFT_REC_LEADER
,
2634 ignore(ovsdb_log_write_and_free(raft
->log
, raft_record_to_json(&r
)));
2636 /* Initiate a no-op commit. Otherwise we might never find out what's in
2637 * the log. See section 6.4 item 1:
2639 * The Leader Completeness Property guarantees that a leader has all
2640 * committed entries, but at the start of its term, it may not know
2641 * which those are. To find out, it needs to commit an entry from its
2642 * term. Raft handles this by having each leader commit a blank no-op
2643 * entry into the log at the start of its term. As soon as this no-op
2644 * entry is committed, the leader’s commit index will be at least as
2645 * large as any other servers’ during its term.
2647 raft_command_unref(raft_command_execute__(raft
, NULL
, NULL
, 0, NULL
,
2651 /* Processes term 'term' received as part of RPC 'common'. Returns true if the
2652 * caller should continue processing the RPC, false if the caller should reject
2653 * it due to a stale term. */
2655 raft_receive_term__(struct raft
*raft
, const struct raft_rpc_common
*common
,
2658 /* Section 3.3 says:
2660 * Current terms are exchanged whenever servers communicate; if one
2661 * server’s current term is smaller than the other’s, then it updates
2662 * its current term to the larger value. If a candidate or leader
2663 * discovers that its term is out of date, it immediately reverts to
2664 * follower state. If a server receives a request with a stale term
2665 * number, it rejects the request.
2667 if (term
> raft
->term
) {
2668 if (!raft_set_term(raft
, term
, NULL
)) {
2669 /* Failed to update the term to 'term'. */
2672 raft_become_follower(raft
);
2673 } else if (term
< raft
->term
) {
2674 char buf
[SID_LEN
+ 1];
2675 VLOG_INFO("rejecting term %"PRIu64
" < current term %"PRIu64
" received "
2676 "in %s message from server %s",
2678 raft_rpc_type_to_string(common
->type
),
2679 raft_get_nickname(raft
, &common
->sid
, buf
, sizeof buf
));
2686 raft_get_servers_from_log(struct raft
*raft
, enum vlog_level level
)
2688 const struct json
*servers_json
= raft
->snap
.servers
;
2689 for (uint64_t index
= raft
->log_end
- 1; index
>= raft
->log_start
;
2691 struct raft_entry
*e
= &raft
->entries
[index
- raft
->log_start
];
2693 servers_json
= e
->servers
;
2698 struct hmap servers
;
2699 struct ovsdb_error
*error
= raft_servers_from_json(servers_json
, &servers
);
2701 raft_set_servers(raft
, &servers
, level
);
2702 raft_servers_destroy(&servers
);
2705 /* Truncates the log, so that raft->log_end becomes 'new_end'.
2707 * Doesn't write anything to disk. In theory, we could truncate the on-disk
2708 * log file, but we don't have the right information to know how long it should
2709 * be. What we actually do is to append entries for older indexes to the
2710 * on-disk log; when we re-read it later, these entries truncate the log.
2712 * Returns true if any of the removed log entries were server configuration
2713 * entries, false otherwise. */
2715 raft_truncate(struct raft
*raft
, uint64_t new_end
)
2717 ovs_assert(new_end
>= raft
->log_start
);
2718 if (raft
->log_end
> new_end
) {
2719 char buf
[SID_LEN
+ 1];
2720 VLOG_INFO("%s truncating %"PRIu64
" entries from end of log",
2721 raft_get_nickname(raft
, &raft
->sid
, buf
, sizeof buf
),
2722 raft
->log_end
- new_end
);
2725 bool servers_changed
= false;
2726 while (raft
->log_end
> new_end
) {
2727 struct raft_entry
*entry
= &raft
->entries
[--raft
->log_end
2729 if (entry
->servers
) {
2730 servers_changed
= true;
2732 raft_entry_uninit(entry
);
2734 return servers_changed
;
2737 static const struct json
*
2738 raft_peek_next_entry(struct raft
*raft
, struct uuid
*eid
)
2740 /* Invariant: log_start - 2 <= last_applied <= commit_index < log_end. */
2741 ovs_assert(raft
->log_start
<= raft
->last_applied
+ 2);
2742 ovs_assert(raft
->last_applied
<= raft
->commit_index
);
2743 ovs_assert(raft
->commit_index
< raft
->log_end
);
2745 if (raft
->joining
|| raft
->failed
) {
2749 if (raft
->log_start
== raft
->last_applied
+ 2) {
2750 *eid
= raft
->snap
.eid
;
2751 return raft
->snap
.data
;
2754 while (raft
->last_applied
< raft
->commit_index
) {
2755 const struct raft_entry
*e
= raft_get_entry(raft
,
2756 raft
->last_applied
+ 1);
2761 raft
->last_applied
++;
2766 static const struct json
*
2767 raft_get_next_entry(struct raft
*raft
, struct uuid
*eid
)
2769 const struct json
*data
= raft_peek_next_entry(raft
, eid
);
2771 raft
->last_applied
++;
2776 /* Updates commit index in raft log. If commit index is already up-to-date
2777 * it does nothing and return false, otherwise, returns true. */
2779 raft_update_commit_index(struct raft
*raft
, uint64_t new_commit_index
)
2781 if (new_commit_index
<= raft
->commit_index
) {
2785 if (raft
->role
== RAFT_LEADER
) {
2786 while (raft
->commit_index
< new_commit_index
) {
2787 uint64_t index
= ++raft
->commit_index
;
2788 const struct raft_entry
*e
= raft_get_entry(raft
, index
);
2790 struct raft_command
*cmd
2791 = raft_find_command_by_eid(raft
, &e
->eid
);
2794 VLOG_DBG("Command completed after role change from"
2795 " follower to leader "UUID_FMT
,
2796 UUID_ARGS(&e
->eid
));
2799 raft_command_complete(raft
, cmd
, RAFT_CMD_SUCCESS
);
2802 if (e
->election_timer
) {
2803 VLOG_INFO("Election timer changed from %"PRIu64
" to %"PRIu64
,
2804 raft
->election_timer
, e
->election_timer
);
2805 raft
->election_timer
= e
->election_timer
;
2806 raft
->election_timer_new
= 0;
2809 /* raft_run_reconfigure() can write a new Raft entry, which can
2810 * reallocate raft->entries, which would invalidate 'e', so
2811 * this case must be last, after the one for 'e->data'. */
2812 raft_run_reconfigure(raft
);
2816 while (raft
->commit_index
< new_commit_index
) {
2817 uint64_t index
= ++raft
->commit_index
;
2818 const struct raft_entry
*e
= raft_get_entry(raft
, index
);
2819 if (e
->election_timer
) {
2820 VLOG_INFO("Election timer changed from %"PRIu64
" to %"PRIu64
,
2821 raft
->election_timer
, e
->election_timer
);
2822 raft
->election_timer
= e
->election_timer
;
2825 /* Check if any pending command can be completed, and complete it.
2826 * This can happen when leader fail-over before sending
2827 * execute_command_reply. */
2828 const struct uuid
*eid
= raft_get_eid(raft
, new_commit_index
);
2829 struct raft_command
*cmd
= raft_find_command_by_eid(raft
, eid
);
2831 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
2833 "Command completed without reply (eid: "UUID_FMT
", "
2834 "commit index: %"PRIu64
")",
2835 UUID_ARGS(eid
), new_commit_index
);
2836 cmd
->index
= new_commit_index
;
2837 raft_command_complete(raft
, cmd
, RAFT_CMD_SUCCESS
);
2841 /* Write the commit index to the log. The next time we restart, this
2842 * allows us to start exporting a reasonably fresh log, instead of a log
2843 * that only contains the snapshot. */
2844 struct raft_record r
= {
2845 .type
= RAFT_REC_COMMIT_INDEX
,
2846 .commit_index
= raft
->commit_index
,
2848 ignore(ovsdb_log_write_and_free(raft
->log
, raft_record_to_json(&r
)));
2852 /* This doesn't use rq->entries (but it does use rq->n_entries). */
2854 raft_send_append_reply(struct raft
*raft
, const struct raft_append_request
*rq
,
2855 enum raft_append_result result
, const char *comment
)
2857 /* Figure 3.1: "If leaderCommit > commitIndex, set commitIndex =
2858 * min(leaderCommit, index of last new entry)" */
2859 if (result
== RAFT_APPEND_OK
&& rq
->leader_commit
> raft
->commit_index
) {
2860 raft_update_commit_index(
2861 raft
, MIN(rq
->leader_commit
, rq
->prev_log_index
+ rq
->n_entries
));
2865 union raft_rpc reply
= {
2868 .type
= RAFT_RPC_APPEND_REPLY
,
2869 .sid
= rq
->common
.sid
,
2870 .comment
= CONST_CAST(char *, comment
),
2873 .log_end
= raft
->log_end
,
2874 .prev_log_index
= rq
->prev_log_index
,
2875 .prev_log_term
= rq
->prev_log_term
,
2876 .n_entries
= rq
->n_entries
,
2880 raft_send(raft
, &reply
);
2883 /* If 'prev_log_index' exists in 'raft''s log, in term 'prev_log_term', returns
2884 * NULL. Otherwise, returns an explanation for the mismatch. */
2886 match_index_and_term(const struct raft
*raft
,
2887 uint64_t prev_log_index
, uint64_t prev_log_term
)
2889 if (prev_log_index
< raft
->log_start
- 1) {
2890 return "mismatch before start of log";
2891 } else if (prev_log_index
== raft
->log_start
- 1) {
2892 if (prev_log_term
!= raft
->snap
.term
) {
2893 return "prev_term mismatch";
2895 } else if (prev_log_index
< raft
->log_end
) {
2896 if (raft
->entries
[prev_log_index
- raft
->log_start
].term
2898 return "term mismatch";
2901 /* prev_log_index >= raft->log_end */
2902 return "mismatch past end of log";
2908 raft_handle_append_entries(struct raft
*raft
,
2909 const struct raft_append_request
*rq
,
2910 uint64_t prev_log_index
, uint64_t prev_log_term
,
2911 const struct raft_entry
*entries
,
2912 unsigned int n_entries
)
2914 /* Section 3.5: "When sending an AppendEntries RPC, the leader includes
2915 * the index and term of the entry in its log that immediately precedes
2916 * the new entries. If the follower does not find an entry in its log
2917 * with the same index and term, then it refuses the new entries." */
2918 const char *mismatch
= match_index_and_term(raft
, prev_log_index
,
2921 VLOG_INFO("rejecting append_request because previous entry "
2922 "%"PRIu64
",%"PRIu64
" not in local log (%s)",
2923 prev_log_term
, prev_log_index
, mismatch
);
2924 raft_send_append_reply(raft
, rq
, RAFT_APPEND_INCONSISTENCY
, mismatch
);
2928 /* Figure 3.1: "If an existing entry conflicts with a new one (same
2929 * index but different terms), delete the existing entry and all that
2932 bool servers_changed
= false;
2933 for (i
= 0; ; i
++) {
2934 if (i
>= n_entries
) {
2936 if (rq
->common
.comment
2937 && !strcmp(rq
->common
.comment
, "heartbeat")) {
2938 raft_send_append_reply(raft
, rq
, RAFT_APPEND_OK
, "heartbeat");
2940 raft_send_append_reply(raft
, rq
, RAFT_APPEND_OK
, "no change");
2945 uint64_t log_index
= (prev_log_index
+ 1) + i
;
2946 if (log_index
>= raft
->log_end
) {
2949 if (raft
->entries
[log_index
- raft
->log_start
].term
2950 != entries
[i
].term
) {
2951 if (raft_truncate(raft
, log_index
)) {
2952 servers_changed
= true;
2958 if (failure_test
== FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE
) {
2959 ovs_fatal(0, "Raft test: crash after receiving append_request with "
2962 /* Figure 3.1: "Append any entries not already in the log." */
2963 struct ovsdb_error
*error
= NULL
;
2964 bool any_written
= false;
2965 for (; i
< n_entries
; i
++) {
2966 const struct raft_entry
*e
= &entries
[i
];
2967 error
= raft_write_entry(raft
, e
->term
,
2968 json_nullable_clone(e
->data
), &e
->eid
,
2969 json_nullable_clone(e
->servers
),
2976 servers_changed
= true;
2981 raft_waiter_create(raft
, RAFT_W_ENTRY
, true)->entry
.index
2982 = raft
->log_end
- 1;
2984 if (servers_changed
) {
2985 /* The set of servers might have changed; check. */
2986 raft_get_servers_from_log(raft
, VLL_INFO
);
2990 char *s
= ovsdb_error_to_string_free(error
);
2993 raft_send_append_reply(raft
, rq
, RAFT_APPEND_IO_ERROR
, "I/O error");
2997 raft_send_append_reply(raft
, rq
, RAFT_APPEND_OK
, "log updated");
3001 raft_update_leader(struct raft
*raft
, const struct uuid
*sid
)
3003 if (raft
->role
== RAFT_LEADER
) {
3004 char buf
[SID_LEN
+ 1];
3005 VLOG_ERR("this server is leader but server %s claims to be",
3006 raft_get_nickname(raft
, sid
, buf
, sizeof buf
));
3008 } else if (!uuid_equals(sid
, &raft
->leader_sid
)) {
3009 if (!uuid_is_zero(&raft
->leader_sid
)) {
3010 char buf1
[SID_LEN
+ 1];
3011 char buf2
[SID_LEN
+ 1];
3012 VLOG_ERR("leader for term %"PRIu64
" changed from %s to %s",
3014 raft_get_nickname(raft
, &raft
->leader_sid
,
3016 raft_get_nickname(raft
, sid
, buf2
, sizeof buf2
));
3018 char buf
[SID_LEN
+ 1];
3019 VLOG_INFO("server %s is leader for term %"PRIu64
,
3020 raft_get_nickname(raft
, sid
, buf
, sizeof buf
),
3023 raft_set_leader(raft
, sid
);
3025 /* Record the leader to the log. This is not used by the algorithm
3026 * (although it could be, for quick restart), but it is used for
3027 * offline analysis to check for conformance with the properties
3028 * that Raft guarantees. */
3029 struct raft_record r
= {
3030 .type
= RAFT_REC_LEADER
,
3034 ignore(ovsdb_log_write_and_free(raft
->log
, raft_record_to_json(&r
)));
3036 if (raft
->role
== RAFT_CANDIDATE
) {
3037 /* Section 3.4: While waiting for votes, a candidate may
3038 * receive an AppendEntries RPC from another server claiming to
3039 * be leader. If the leader’s term (included in its RPC) is at
3040 * least as large as the candidate’s current term, then the
3041 * candidate recognizes the leader as legitimate and returns to
3042 * follower state. */
3043 raft
->role
= RAFT_FOLLOWER
;
3049 raft_handle_append_request(struct raft
*raft
,
3050 const struct raft_append_request
*rq
)
3052 /* We do not check whether the server that sent the request is part of the
3053 * cluster. As section 4.1 says, "A server accepts AppendEntries requests
3054 * from a leader that is not part of the server’s latest configuration.
3055 * Otherwise, a new server could never be added to the cluster (it would
3056 * never accept any log entries preceding the configuration entry that adds
3058 if (!raft_update_leader(raft
, &rq
->common
.sid
)) {
3059 raft_send_append_reply(raft
, rq
, RAFT_APPEND_INCONSISTENCY
,
3060 "usurped leadership");
3063 raft_reset_election_timer(raft
);
3065 /* First check for the common case, where the AppendEntries request is
3066 * entirely for indexes covered by 'log_start' ... 'log_end - 1', something
3069 * rq->prev_log_index
3070 * | first_entry_index
3071 * | | nth_entry_index
3075 * T | T | T | T | T |
3078 * T | T | T | T | T |
3084 uint64_t first_entry_index
= rq
->prev_log_index
+ 1;
3085 uint64_t nth_entry_index
= rq
->prev_log_index
+ rq
->n_entries
;
3086 if (OVS_LIKELY(first_entry_index
>= raft
->log_start
)) {
3087 raft_handle_append_entries(raft
, rq
,
3088 rq
->prev_log_index
, rq
->prev_log_term
,
3089 rq
->entries
, rq
->n_entries
);
3093 /* Now a series of checks for odd cases, where the AppendEntries request
3094 * extends earlier than the beginning of our log, into the log entries
3095 * discarded by the most recent snapshot. */
3098 * Handle the case where the indexes covered by rq->entries[] are entirely
3099 * disjoint with 'log_start - 1' ... 'log_end - 1', as shown below. So,
3100 * everything in the AppendEntries request must already have been
3101 * committed, and we might as well return true.
3103 * rq->prev_log_index
3104 * | first_entry_index
3105 * | | nth_entry_index
3109 * T | T | T | T | T |
3112 * T | T | T | T | T |
3118 if (nth_entry_index
< raft
->log_start
- 1) {
3119 raft_send_append_reply(raft
, rq
, RAFT_APPEND_OK
,
3120 "append before log start");
3125 * Handle the case where the last entry in rq->entries[] has the same index
3126 * as 'log_start - 1', so we can compare their terms:
3128 * rq->prev_log_index
3129 * | first_entry_index
3130 * | | nth_entry_index
3134 * T | T | T | T | T |
3137 * T | T | T | T | T |
3143 * There's actually a sub-case where rq->n_entries == 0, in which we
3144 * compare rq->prev_term:
3146 * rq->prev_log_index
3154 * T | T | T | T | T |
3160 if (nth_entry_index
== raft
->log_start
- 1) {
3162 ? raft
->snap
.term
== rq
->entries
[rq
->n_entries
- 1].term
3163 : raft
->snap
.term
== rq
->prev_log_term
) {
3164 raft_send_append_reply(raft
, rq
, RAFT_APPEND_OK
, "no change");
3166 raft_send_append_reply(raft
, rq
, RAFT_APPEND_INCONSISTENCY
,
3173 * We now know that the data in rq->entries[] overlaps the data in
3174 * raft->entries[], as shown below, with some positive 'ofs':
3176 * rq->prev_log_index
3177 * | first_entry_index
3178 * | | nth_entry_index
3181 * +---+---+---+---+---+
3182 * T | T | T | T | T | T |
3183 * +---+-------+---+---+
3185 * T | T | T | T | T |
3193 * We transform this into the following by trimming the first 'ofs'
3194 * elements off of rq->entries[], ending up with the following. Notice how
3195 * we retain the term but not the data for rq->entries[ofs - 1]:
3197 * first_entry_index + ofs - 1
3198 * | first_entry_index + ofs
3199 * | | nth_entry_index + ofs
3206 * T | T | T | T | T |
3212 uint64_t ofs
= raft
->log_start
- first_entry_index
;
3213 raft_handle_append_entries(raft
, rq
,
3214 raft
->log_start
- 1, rq
->entries
[ofs
- 1].term
,
3215 &rq
->entries
[ofs
], rq
->n_entries
- ofs
);
3218 /* Returns true if 'raft' has another log entry or snapshot to read. */
3220 raft_has_next_entry(const struct raft
*raft_
)
3222 struct raft
*raft
= CONST_CAST(struct raft
*, raft_
);
3224 return raft_peek_next_entry(raft
, &eid
) != NULL
;
3227 /* Returns the next log entry or snapshot from 'raft', or NULL if there are
3228 * none left to read. Stores the entry ID of the log entry in '*eid'. Stores
3229 * true in '*is_snapshot' if the returned data is a snapshot, false if it is a
3232 raft_next_entry(struct raft
*raft
, struct uuid
*eid
, bool *is_snapshot
)
3234 const struct json
*data
= raft_get_next_entry(raft
, eid
);
3235 *is_snapshot
= data
== raft
->snap
.data
;
3239 /* Returns the log index of the last-read snapshot or log entry. */
3241 raft_get_applied_index(const struct raft
*raft
)
3243 return raft
->last_applied
;
3246 /* Returns the log index of the last snapshot or log entry that is available to
3249 raft_get_commit_index(const struct raft
*raft
)
3251 return raft
->commit_index
;
3254 static struct raft_server
*
3255 raft_find_peer(struct raft
*raft
, const struct uuid
*uuid
)
3257 struct raft_server
*s
= raft_find_server(raft
, uuid
);
3258 return s
&& !uuid_equals(&raft
->sid
, &s
->sid
) ? s
: NULL
;
3261 static struct raft_server
*
3262 raft_find_new_server(struct raft
*raft
, const struct uuid
*uuid
)
3264 return raft_server_find(&raft
->add_servers
, uuid
);
3267 /* Figure 3.1: "If there exists an N such that N > commitIndex, a
3268 * majority of matchIndex[i] >= N, and log[N].term == currentTerm, set
3269 * commitIndex = N (sections 3.5 and 3.6)." */
3271 raft_consider_updating_commit_index(struct raft
*raft
)
3273 /* This loop cannot just bail out when it comes across a log entry that
3274 * does not match the criteria. For example, Figure 3.7(d2) shows a
3275 * case where the log entry for term 2 cannot be committed directly
3276 * (because it is not for the current term) but it can be committed as
3277 * a side effect of commit the entry for term 4 (the current term).
3278 * XXX Is there a more efficient way to do this? */
3279 ovs_assert(raft
->role
== RAFT_LEADER
);
3281 uint64_t new_commit_index
= raft
->commit_index
;
3282 for (uint64_t idx
= MAX(raft
->commit_index
+ 1, raft
->log_start
);
3283 idx
< raft
->log_end
; idx
++) {
3284 if (raft
->entries
[idx
- raft
->log_start
].term
== raft
->term
) {
3286 struct raft_server
*s2
;
3287 HMAP_FOR_EACH (s2
, hmap_node
, &raft
->servers
) {
3288 if (s2
->match_index
>= idx
) {
3292 if (count
> hmap_count(&raft
->servers
) / 2) {
3293 VLOG_DBG("index %"PRIu64
" committed to %"PRIuSIZE
" servers, "
3294 "applying", idx
, count
);
3295 new_commit_index
= idx
;
3299 if (raft_update_commit_index(raft
, new_commit_index
)) {
3300 raft_send_heartbeats(raft
);
3305 raft_update_match_index(struct raft
*raft
, struct raft_server
*s
,
3308 ovs_assert(raft
->role
== RAFT_LEADER
);
3309 if (min_index
> s
->match_index
) {
3310 s
->match_index
= min_index
;
3311 raft_consider_updating_commit_index(raft
);
3316 raft_update_our_match_index(struct raft
*raft
, uint64_t min_index
)
3318 struct raft_server
*server
= raft_find_server(raft
, &raft
->sid
);
3320 raft_update_match_index(raft
, server
, min_index
);
3325 raft_send_install_snapshot_request(struct raft
*raft
,
3326 const struct raft_server
*s
,
3327 const char *comment
)
3329 union raft_rpc rpc
= {
3330 .install_snapshot_request
= {
3332 .type
= RAFT_RPC_INSTALL_SNAPSHOT_REQUEST
,
3334 .comment
= CONST_CAST(char *, comment
),
3337 .last_index
= raft
->log_start
- 1,
3338 .last_term
= raft
->snap
.term
,
3339 .last_servers
= raft
->snap
.servers
,
3340 .last_eid
= raft
->snap
.eid
,
3341 .data
= raft
->snap
.data
,
3342 .election_timer
= raft
->election_timer
, /* use latest value */
3346 if (s
->install_snapshot_request_in_progress
) {
3347 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
3349 VLOG_INFO_RL(&rl
, "not sending snapshot to server %s, "
3350 "already in progress", s
->nickname
);
3354 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
3355 VLOG_INFO_RL(&rl
, "sending snapshot to server %s, %"PRIu64
":%"PRIu64
".",
3356 s
->nickname
, raft
->term
, raft
->log_start
- 1);
3357 CONST_CAST(struct raft_server
*, s
)->install_snapshot_request_in_progress
3358 = raft_send(raft
, &rpc
);
3362 raft_handle_append_reply(struct raft
*raft
,
3363 const struct raft_append_reply
*rpy
)
3365 if (raft
->role
!= RAFT_LEADER
) {
3366 VLOG_INFO("rejected append_reply (not leader)");
3370 /* Most commonly we'd be getting an AppendEntries reply from a configured
3371 * server (e.g. a peer), but we can also get them from servers in the
3372 * process of being added. */
3373 struct raft_server
*s
= raft_find_peer(raft
, &rpy
->common
.sid
);
3375 s
= raft_find_new_server(raft
, &rpy
->common
.sid
);
3377 VLOG_INFO("rejected append_reply from unknown server "SID_FMT
,
3378 SID_ARGS(&rpy
->common
.sid
));
3384 if (rpy
->result
== RAFT_APPEND_OK
) {
3385 /* Figure 3.1: "If successful, update nextIndex and matchIndex for
3386 * follower (section 3.5)." */
3387 uint64_t min_index
= rpy
->prev_log_index
+ rpy
->n_entries
+ 1;
3388 if (s
->next_index
< min_index
) {
3389 s
->next_index
= min_index
;
3391 raft_update_match_index(raft
, s
, min_index
- 1);
3393 /* Figure 3.1: "If AppendEntries fails because of log inconsistency,
3394 * decrement nextIndex and retry (section 3.5)."
3396 * We also implement the optimization suggested in section 4.2.1:
3397 * "Various approaches can make nextIndex converge to its correct value
3398 * more quickly, including those described in Chapter 3. The simplest
3399 * approach to solving this particular problem of adding a new server,
3400 * however, is to have followers return the length of their logs in the
3401 * AppendEntries response; this allows the leader to cap the follower’s
3402 * nextIndex accordingly." */
3403 s
->next_index
= (s
->next_index
> 0
3404 ? MIN(s
->next_index
- 1, rpy
->log_end
)
3407 if (rpy
->result
== RAFT_APPEND_IO_ERROR
) {
3408 /* Append failed but not because of a log inconsistency. Because
3409 * of the I/O error, there's no point in re-sending the append
3411 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
3412 VLOG_INFO_RL(&rl
, "%s reported I/O error", s
->nickname
);
3418 * Our behavior here must depend on the value of next_index relative to
3419 * log_start and log_end. There are three cases:
3421 * Case 1 | Case 2 | Case 3
3422 * <---------------->|<------------->|<------------------>
3426 * T | T | T | T | T |
3432 if (s
->next_index
< raft
->log_start
) {
3434 raft_send_install_snapshot_request(raft
, s
, NULL
);
3435 } else if (s
->next_index
< raft
->log_end
) {
3437 raft_send_append_request(raft
, s
, raft
->log_end
- s
->next_index
, NULL
);
3440 if (s
->phase
== RAFT_PHASE_CATCHUP
) {
3441 s
->phase
= RAFT_PHASE_CAUGHT_UP
;
3442 raft_run_reconfigure(raft
);
3448 raft_should_suppress_disruptive_server(struct raft
*raft
,
3449 const union raft_rpc
*rpc
)
3451 if (rpc
->type
!= RAFT_RPC_VOTE_REQUEST
) {
3455 /* Section 4.2.3 "Disruptive Servers" says:
3457 * ...if a server receives a RequestVote request within the minimum
3458 * election timeout of hearing from a current leader, it does not update
3459 * its term or grant its vote...
3461 * ...This change conflicts with the leadership transfer mechanism as
3462 * described in Chapter 3, in which a server legitimately starts an
3463 * election without waiting an election timeout. In that case,
3464 * RequestVote messages should be processed by other servers even when
3465 * they believe a current cluster leader exists. Those RequestVote
3466 * requests can include a special flag to indicate this behavior (“I
3467 * have permission to disrupt the leader--it told me to!”).
3469 * This clearly describes how the followers should act, but not the leader.
3470 * We just ignore vote requests that arrive at a current leader. This
3471 * seems to be fairly safe, since a majority other than the current leader
3472 * can still elect a new leader and the first AppendEntries from that new
3473 * leader will depose the current leader. */
3474 const struct raft_vote_request
*rq
= raft_vote_request_cast(rpc
);
3475 if (rq
->leadership_transfer
) {
3479 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
3480 long long int now
= time_msec();
3481 switch (raft
->role
) {
3483 VLOG_WARN_RL(&rl
, "ignoring vote request received as leader");
3487 if (now
< raft
->election_base
+ raft
->election_timer
) {
3488 VLOG_WARN_RL(&rl
, "ignoring vote request received after only "
3489 "%lld ms (minimum election time is %"PRIu64
" ms)",
3490 now
- raft
->election_base
, raft
->election_timer
);
3495 case RAFT_CANDIDATE
:
3503 /* Returns true if a reply should be sent. */
3505 raft_handle_vote_request__(struct raft
*raft
,
3506 const struct raft_vote_request
*rq
)
3508 /* Figure 3.1: "If votedFor is null or candidateId, and candidate's vote is
3509 * at least as up-to-date as receiver's log, grant vote (sections 3.4,
3511 if (uuid_equals(&raft
->vote
, &rq
->common
.sid
)) {
3512 /* Already voted for this candidate in this term. Resend vote. */
3514 } else if (!uuid_is_zero(&raft
->vote
)) {
3515 /* Already voted for different candidate in this term. Send a reply
3516 * saying what candidate we did vote for. This isn't a necessary part
3517 * of the Raft protocol but it can make debugging easier. */
3521 /* Section 3.6.1: "The RequestVote RPC implements this restriction: the RPC
3522 * includes information about the candidate’s log, and the voter denies its
3523 * vote if its own log is more up-to-date than that of the candidate. Raft
3524 * determines which of two logs is more up-to-date by comparing the index
3525 * and term of the last entries in the logs. If the logs have last entries
3526 * with different terms, then the log with the later term is more
3527 * up-to-date. If the logs end with the same term, then whichever log is
3528 * longer is more up-to-date." */
3529 uint64_t last_term
= (raft
->log_end
> raft
->log_start
3530 ? raft
->entries
[raft
->log_end
- 1
3531 - raft
->log_start
].term
3533 if (last_term
> rq
->last_log_term
3534 || (last_term
== rq
->last_log_term
3535 && raft
->log_end
- 1 > rq
->last_log_index
)) {
3536 /* Our log is more up-to-date than the peer's. Withhold vote. */
3540 /* Record a vote for the peer. */
3541 if (!raft_set_term(raft
, raft
->term
, &rq
->common
.sid
)) {
3545 raft_reset_election_timer(raft
);
3551 raft_send_vote_reply(struct raft
*raft
, const struct uuid
*dst
,
3552 const struct uuid
*vote
)
3554 union raft_rpc rpy
= {
3557 .type
= RAFT_RPC_VOTE_REPLY
,
3564 raft_send(raft
, &rpy
);
3568 raft_handle_vote_request(struct raft
*raft
,
3569 const struct raft_vote_request
*rq
)
3571 if (raft_handle_vote_request__(raft
, rq
)) {
3572 raft_send_vote_reply(raft
, &rq
->common
.sid
, &raft
->vote
);
3577 raft_handle_vote_reply(struct raft
*raft
,
3578 const struct raft_vote_reply
*rpy
)
3580 if (!raft_receive_term__(raft
, &rpy
->common
, rpy
->term
)) {
3584 if (raft
->role
!= RAFT_CANDIDATE
) {
3588 struct raft_server
*s
= raft_find_peer(raft
, &rpy
->common
.sid
);
3590 raft_accept_vote(raft
, s
, &rpy
->vote
);
3594 /* Returns true if 'raft''s log contains reconfiguration entries that have not
3595 * yet been committed. */
3597 raft_has_uncommitted_configuration(const struct raft
*raft
)
3599 for (uint64_t i
= raft
->commit_index
+ 1; i
< raft
->log_end
; i
++) {
3600 ovs_assert(i
>= raft
->log_start
);
3601 const struct raft_entry
*e
= &raft
->entries
[i
- raft
->log_start
];
3610 raft_log_reconfiguration(struct raft
*raft
)
3612 struct json
*servers_json
= raft_servers_to_json(&raft
->servers
);
3613 raft_command_unref(raft_command_execute__(
3614 raft
, NULL
, servers_json
, 0, NULL
, NULL
));
3615 json_destroy(servers_json
);
3619 raft_run_reconfigure(struct raft
*raft
)
3621 ovs_assert(raft
->role
== RAFT_LEADER
);
3623 /* Reconfiguration only progresses when configuration changes commit. */
3624 if (raft_has_uncommitted_configuration(raft
)) {
3628 /* If we were waiting for a configuration change to commit, it's done. */
3629 struct raft_server
*s
;
3630 HMAP_FOR_EACH (s
, hmap_node
, &raft
->servers
) {
3631 if (s
->phase
== RAFT_PHASE_COMMITTING
) {
3632 raft_send_add_server_reply__(raft
, &s
->sid
, s
->address
,
3633 true, RAFT_SERVER_COMPLETED
);
3634 s
->phase
= RAFT_PHASE_STABLE
;
3637 if (raft
->remove_server
) {
3638 raft_send_remove_server_reply__(raft
, &raft
->remove_server
->sid
,
3639 &raft
->remove_server
->requester_sid
,
3640 raft
->remove_server
->requester_conn
,
3641 true, RAFT_SERVER_COMPLETED
);
3642 raft_server_destroy(raft
->remove_server
);
3643 raft
->remove_server
= NULL
;
3646 /* If a new server is caught up, add it to the configuration. */
3647 HMAP_FOR_EACH (s
, hmap_node
, &raft
->add_servers
) {
3648 if (s
->phase
== RAFT_PHASE_CAUGHT_UP
) {
3649 /* Move 's' from 'raft->add_servers' to 'raft->servers'. */
3650 hmap_remove(&raft
->add_servers
, &s
->hmap_node
);
3651 hmap_insert(&raft
->servers
, &s
->hmap_node
, uuid_hash(&s
->sid
));
3653 /* Mark 's' as waiting for commit. */
3654 s
->phase
= RAFT_PHASE_COMMITTING
;
3656 raft_log_reconfiguration(raft
);
3658 /* When commit completes we'll transition to RAFT_PHASE_STABLE and
3659 * send a RAFT_SERVER_OK reply. */
3665 /* Remove a server, if one is scheduled for removal. */
3666 HMAP_FOR_EACH (s
, hmap_node
, &raft
->servers
) {
3667 if (s
->phase
== RAFT_PHASE_REMOVE
) {
3668 hmap_remove(&raft
->servers
, &s
->hmap_node
);
3669 raft
->remove_server
= s
;
3671 raft_log_reconfiguration(raft
);
3679 raft_handle_add_server_request(struct raft
*raft
,
3680 const struct raft_add_server_request
*rq
)
3682 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3683 if (raft
->role
!= RAFT_LEADER
) {
3684 raft_send_add_server_reply(raft
, rq
, false, RAFT_SERVER_NOT_LEADER
);
3688 /* Check for an existing server. */
3689 struct raft_server
*s
= raft_find_server(raft
, &rq
->common
.sid
);
3691 /* If the server is scheduled to be removed, cancel it. */
3692 if (s
->phase
== RAFT_PHASE_REMOVE
) {
3693 s
->phase
= RAFT_PHASE_STABLE
;
3694 raft_send_add_server_reply(raft
, rq
, false, RAFT_SERVER_CANCELED
);
3698 /* If the server is being added, then it's in progress. */
3699 if (s
->phase
!= RAFT_PHASE_STABLE
) {
3700 raft_send_add_server_reply(raft
, rq
,
3701 false, RAFT_SERVER_IN_PROGRESS
);
3704 /* Nothing to do--server is already part of the configuration. */
3705 raft_send_add_server_reply(raft
, rq
,
3706 true, RAFT_SERVER_ALREADY_PRESENT
);
3710 /* Check for a server being removed. */
3711 if (raft
->remove_server
3712 && uuid_equals(&rq
->common
.sid
, &raft
->remove_server
->sid
)) {
3713 raft_send_add_server_reply(raft
, rq
, false, RAFT_SERVER_COMMITTING
);
3717 /* Check for a server already being added. */
3718 if (raft_find_new_server(raft
, &rq
->common
.sid
)) {
3719 raft_send_add_server_reply(raft
, rq
, false, RAFT_SERVER_IN_PROGRESS
);
3723 /* Add server to 'add_servers'. */
3724 s
= raft_server_add(&raft
->add_servers
, &rq
->common
.sid
, rq
->address
);
3725 raft_server_init_leader(raft
, s
);
3726 s
->requester_sid
= rq
->common
.sid
;
3727 s
->requester_conn
= NULL
;
3728 s
->phase
= RAFT_PHASE_CATCHUP
;
3729 s
->last_msg_ts
= time_msec();
3731 /* Start sending the log. If this is the first time we've tried to add
3732 * this server, then this will quickly degenerate into an InstallSnapshot
3733 * followed by a series of AddEntries, but if it's a retry of an earlier
3734 * AddRequest that was interrupted (e.g. by a timeout or a loss of
3735 * leadership) then it will gracefully resume populating the log.
3737 * See the last few paragraphs of section 4.2.1 for further insight. */
3738 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(10, 10);
3740 "starting to add server %s ("SID_FMT
" at %s) "
3741 "to cluster "CID_FMT
, s
->nickname
, SID_ARGS(&s
->sid
),
3742 rq
->address
, CID_ARGS(&raft
->cid
));
3743 raft_send_append_request(raft
, s
, 0, "initialize new server");
3747 raft_handle_add_server_reply(struct raft
*raft
,
3748 const struct raft_add_server_reply
*rpy
)
3750 if (!raft
->joining
) {
3751 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
3752 VLOG_WARN_RL(&rl
, "received add_server_reply even though we're "
3753 "already part of the cluster");
3758 raft
->joining
= false;
3760 /* It is tempting, at this point, to check that this server is part of
3761 * the current configuration. However, this is not necessarily the
3762 * case, because the log entry that added this server to the cluster
3763 * might have been committed by a majority of the cluster that does not
3764 * include this one. This actually happens in testing. */
3766 const char *address
;
3767 SSET_FOR_EACH (address
, &rpy
->remote_addresses
) {
3768 if (sset_add(&raft
->remote_addresses
, address
)) {
3769 VLOG_INFO("%s: learned new server address for joining cluster",
3776 /* This is called by raft_unixctl_kick() as well as via RPC. */
3778 raft_handle_remove_server_request(struct raft
*raft
,
3779 const struct raft_remove_server_request
*rq
)
3781 /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
3782 if (raft
->role
!= RAFT_LEADER
) {
3783 raft_send_remove_server_reply(raft
, rq
, false, RAFT_SERVER_NOT_LEADER
);
3787 /* If the server to remove is currently waiting to be added, cancel it. */
3788 struct raft_server
*target
= raft_find_new_server(raft
, &rq
->sid
);
3790 raft_send_add_server_reply__(raft
, &target
->sid
, target
->address
,
3791 false, RAFT_SERVER_CANCELED
);
3792 hmap_remove(&raft
->add_servers
, &target
->hmap_node
);
3793 raft_server_destroy(target
);
3797 /* If the server isn't configured, report that. */
3798 target
= raft_find_server(raft
, &rq
->sid
);
3800 raft_send_remove_server_reply(raft
, rq
,
3801 true, RAFT_SERVER_ALREADY_GONE
);
3805 /* Check whether we're waiting for the addition of the server to commit. */
3806 if (target
->phase
== RAFT_PHASE_COMMITTING
) {
3807 raft_send_remove_server_reply(raft
, rq
, false, RAFT_SERVER_COMMITTING
);
3811 /* Check whether the server is already scheduled for removal. */
3812 if (target
->phase
== RAFT_PHASE_REMOVE
) {
3813 raft_send_remove_server_reply(raft
, rq
,
3814 false, RAFT_SERVER_IN_PROGRESS
);
3818 /* Make sure that if we remove this server then that at least one other
3819 * server will be left. We don't count servers currently being added (in
3820 * 'add_servers') since those could fail. */
3821 struct raft_server
*s
;
3823 HMAP_FOR_EACH (s
, hmap_node
, &raft
->servers
) {
3824 if (s
!= target
&& s
->phase
!= RAFT_PHASE_REMOVE
) {
3829 raft_send_remove_server_reply(raft
, rq
, false, RAFT_SERVER_EMPTY
);
3833 /* Mark the server for removal. */
3834 target
->phase
= RAFT_PHASE_REMOVE
;
3835 if (rq
->requester_conn
) {
3836 target
->requester_sid
= UUID_ZERO
;
3837 unixctl_command_reply(rq
->requester_conn
, "started removal");
3839 target
->requester_sid
= rq
->common
.sid
;
3840 target
->requester_conn
= NULL
;
3843 raft_run_reconfigure(raft
);
3844 /* Operation in progress, reply will be sent later. */
3848 raft_finished_leaving_cluster(struct raft
*raft
)
3850 VLOG_INFO(SID_FMT
": finished leaving cluster "CID_FMT
,
3851 SID_ARGS(&raft
->sid
), CID_ARGS(&raft
->cid
));
3853 raft_record_note(raft
, "left", "this server left the cluster");
3855 raft
->leaving
= false;
3860 raft_handle_remove_server_reply(struct raft
*raft
,
3861 const struct raft_remove_server_reply
*rpc
)
3864 && (uuid_is_zero(&rpc
->target_sid
)
3865 || uuid_equals(&rpc
->target_sid
, &raft
->sid
))) {
3866 raft_finished_leaving_cluster(raft
);
3871 raft_handle_write_error(struct raft
*raft
, struct ovsdb_error
*error
)
3873 if (error
&& !raft
->failed
) {
3874 raft
->failed
= true;
3876 char *s
= ovsdb_error_to_string_free(error
);
3877 VLOG_WARN("%s: entering failure mode due to I/O error (%s)",
3881 return !raft
->failed
;
3884 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
3885 raft_write_snapshot(struct raft
*raft
, struct ovsdb_log
*log
,
3886 uint64_t new_log_start
,
3887 const struct raft_entry
*new_snapshot
)
3889 struct raft_header h
= {
3893 .local_address
= raft
->local_address
,
3894 .snap_index
= new_log_start
- 1,
3895 .snap
= *new_snapshot
,
3897 struct ovsdb_error
*error
= ovsdb_log_write_and_free(
3898 log
, raft_header_to_json(&h
));
3902 ovsdb_log_mark_base(raft
->log
);
3904 /* Write log records. */
3905 for (uint64_t index
= new_log_start
; index
< raft
->log_end
; index
++) {
3906 const struct raft_entry
*e
= &raft
->entries
[index
- raft
->log_start
];
3907 struct raft_record r
= {
3908 .type
= RAFT_REC_ENTRY
,
3913 .servers
= e
->servers
,
3914 .election_timer
= e
->election_timer
,
3918 error
= ovsdb_log_write_and_free(log
, raft_record_to_json(&r
));
3924 /* Write term and vote (if any).
3926 * The term is redundant if we wrote a log record for that term above. The
3927 * vote, if any, is never redundant.
3929 error
= raft_write_state(log
, raft
->term
, &raft
->vote
);
3934 /* Write commit_index if it's beyond the new start of the log. */
3935 if (raft
->commit_index
>= new_log_start
) {
3936 struct raft_record r
= {
3937 .type
= RAFT_REC_COMMIT_INDEX
,
3938 .commit_index
= raft
->commit_index
,
3940 return ovsdb_log_write_and_free(log
, raft_record_to_json(&r
));
3945 static struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
3946 raft_save_snapshot(struct raft
*raft
,
3947 uint64_t new_start
, const struct raft_entry
*new_snapshot
)
3950 struct ovsdb_log
*new_log
;
3951 struct ovsdb_error
*error
;
3952 error
= ovsdb_log_replace_start(raft
->log
, &new_log
);
3957 error
= raft_write_snapshot(raft
, new_log
, new_start
, new_snapshot
);
3959 ovsdb_log_replace_abort(new_log
);
3963 return ovsdb_log_replace_commit(raft
->log
, new_log
);
3967 raft_handle_install_snapshot_request__(
3968 struct raft
*raft
, const struct raft_install_snapshot_request
*rq
)
3970 raft_reset_election_timer(raft
);
3973 * Our behavior here depend on new_log_start in the snapshot compared to
3974 * log_start and log_end. There are three cases:
3976 * Case 1 | Case 2 | Case 3
3977 * <---------------->|<------------->|<------------------>
3981 * T | T | T | T | T |
3987 uint64_t new_log_start
= rq
->last_index
+ 1;
3988 if (new_log_start
< raft
->log_start
) {
3989 /* Case 1: The new snapshot covers less than our current one. Nothing
3992 } else if (new_log_start
< raft
->log_end
) {
3993 /* Case 2: The new snapshot starts in the middle of our log. We could
3994 * discard the first 'new_log_start - raft->log_start' entries in the
3995 * log. But there's not much value in that, since snapshotting is
3996 * supposed to be a local decision. Just skip it. */
4000 /* Case 3: The new snapshot starts past the end of our current log, so
4001 * discard all of our current log. */
4002 const struct raft_entry new_snapshot
= {
4003 .term
= rq
->last_term
,
4005 .eid
= rq
->last_eid
,
4006 .servers
= rq
->last_servers
,
4007 .election_timer
= rq
->election_timer
,
4009 struct ovsdb_error
*error
= raft_save_snapshot(raft
, new_log_start
,
4012 char *error_s
= ovsdb_error_to_string_free(error
);
4013 VLOG_WARN("could not save snapshot: %s", error_s
);
4018 for (size_t i
= 0; i
< raft
->log_end
- raft
->log_start
; i
++) {
4019 raft_entry_uninit(&raft
->entries
[i
]);
4021 raft
->log_start
= raft
->log_end
= new_log_start
;
4022 raft
->log_synced
= raft
->log_end
- 1;
4023 raft
->commit_index
= raft
->log_start
- 1;
4024 if (raft
->last_applied
< raft
->commit_index
) {
4025 raft
->last_applied
= raft
->log_start
- 2;
4028 raft_entry_uninit(&raft
->snap
);
4029 raft_entry_clone(&raft
->snap
, &new_snapshot
);
4031 raft_get_servers_from_log(raft
, VLL_INFO
);
4032 raft_get_election_timer_from_log(raft
);
4038 raft_handle_install_snapshot_request(
4039 struct raft
*raft
, const struct raft_install_snapshot_request
*rq
)
4041 if (raft_handle_install_snapshot_request__(raft
, rq
)) {
4042 union raft_rpc rpy
= {
4043 .install_snapshot_reply
= {
4045 .type
= RAFT_RPC_INSTALL_SNAPSHOT_REPLY
,
4046 .sid
= rq
->common
.sid
,
4049 .last_index
= rq
->last_index
,
4050 .last_term
= rq
->last_term
,
4053 raft_send(raft
, &rpy
);
4058 raft_handle_install_snapshot_reply(
4059 struct raft
*raft
, const struct raft_install_snapshot_reply
*rpy
)
4061 /* We might get an InstallSnapshot reply from a configured server (e.g. a
4062 * peer) or a server in the process of being added. */
4063 struct raft_server
*s
= raft_find_peer(raft
, &rpy
->common
.sid
);
4065 s
= raft_find_new_server(raft
, &rpy
->common
.sid
);
4067 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
4068 VLOG_INFO_RL(&rl
, "cluster "CID_FMT
": received %s from "
4069 "unknown server "SID_FMT
, CID_ARGS(&raft
->cid
),
4070 raft_rpc_type_to_string(rpy
->common
.type
),
4071 SID_ARGS(&rpy
->common
.sid
));
4076 s
->install_snapshot_request_in_progress
= false;
4078 if (rpy
->last_index
!= raft
->log_start
- 1 ||
4079 rpy
->last_term
!= raft
->snap
.term
) {
4080 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
4081 VLOG_INFO_RL(&rl
, "cluster "CID_FMT
": server %s installed "
4082 "out-of-date snapshot, starting over",
4083 CID_ARGS(&raft
->cid
), s
->nickname
);
4084 raft_send_install_snapshot_request(raft
, s
,
4085 "installed obsolete snapshot");
4089 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(10, 10);
4090 VLOG_INFO_RL(&rl
, "cluster "CID_FMT
": installed snapshot on server %s "
4091 " up to %"PRIu64
":%"PRIu64
, CID_ARGS(&raft
->cid
),
4092 s
->nickname
, rpy
->last_term
, rpy
->last_index
);
4093 s
->next_index
= raft
->log_start
;
4094 raft_send_append_request(raft
, s
, raft
->log_end
- s
->next_index
,
4095 "snapshot installed");
4098 /* Returns true if 'raft' has grown enough since the last snapshot that
4099 * reducing the log to a snapshot would be valuable, false otherwise. */
4101 raft_grew_lots(const struct raft
*raft
)
4103 return ovsdb_log_grew_lots(raft
->log
);
4106 /* Returns the number of log entries that could be trimmed off the on-disk log
4107 * by snapshotting. */
4109 raft_get_log_length(const struct raft
*raft
)
4111 return (raft
->last_applied
< raft
->log_start
4113 : raft
->last_applied
- raft
->log_start
+ 1);
4116 /* Returns true if taking a snapshot of 'raft', with raft_store_snapshot(), is
4119 raft_may_snapshot(const struct raft
*raft
)
4121 return (!raft
->joining
4125 && raft
->last_applied
>= raft
->log_start
);
4128 /* Replaces the log for 'raft', up to the last log entry read, by
4129 * 'new_snapshot_data'. Returns NULL if successful, otherwise an error that
4130 * the caller must eventually free.
4132 * This function can only succeed if raft_may_snapshot() returns true. It is
4133 * only valuable to call it if raft_get_log_length() is significant and
4134 * especially if raft_grew_lots() returns true. */
4135 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
4136 raft_store_snapshot(struct raft
*raft
, const struct json
*new_snapshot_data
)
4138 if (raft
->joining
) {
4139 return ovsdb_error(NULL
,
4140 "cannot store a snapshot while joining cluster");
4141 } else if (raft
->leaving
) {
4142 return ovsdb_error(NULL
,
4143 "cannot store a snapshot while leaving cluster");
4144 } else if (raft
->left
) {
4145 return ovsdb_error(NULL
,
4146 "cannot store a snapshot after leaving cluster");
4147 } else if (raft
->failed
) {
4148 return ovsdb_error(NULL
,
4149 "cannot store a snapshot following failure");
4152 if (raft
->last_applied
< raft
->log_start
) {
4153 return ovsdb_error(NULL
, "not storing a duplicate snapshot");
4156 uint64_t new_log_start
= raft
->last_applied
+ 1;
4157 struct raft_entry new_snapshot
= {
4158 .term
= raft_get_term(raft
, new_log_start
- 1),
4159 .data
= json_clone(new_snapshot_data
),
4160 .eid
= *raft_get_eid(raft
, new_log_start
- 1),
4161 .servers
= json_clone(raft_servers_for_index(raft
, new_log_start
- 1)),
4162 .election_timer
= raft
->election_timer
,
4164 struct ovsdb_error
*error
= raft_save_snapshot(raft
, new_log_start
,
4167 raft_entry_uninit(&new_snapshot
);
4171 raft
->log_synced
= raft
->log_end
- 1;
4172 raft_entry_uninit(&raft
->snap
);
4173 raft
->snap
= new_snapshot
;
4174 for (size_t i
= 0; i
< new_log_start
- raft
->log_start
; i
++) {
4175 raft_entry_uninit(&raft
->entries
[i
]);
4177 memmove(&raft
->entries
[0], &raft
->entries
[new_log_start
- raft
->log_start
],
4178 (raft
->log_end
- new_log_start
) * sizeof *raft
->entries
);
4179 raft
->log_start
= new_log_start
;
4184 raft_handle_become_leader(struct raft
*raft
,
4185 const struct raft_become_leader
*rq
)
4187 if (raft
->role
== RAFT_FOLLOWER
) {
4188 char buf
[SID_LEN
+ 1];
4189 VLOG_INFO("received leadership transfer from %s in term %"PRIu64
,
4190 raft_get_nickname(raft
, &rq
->common
.sid
, buf
, sizeof buf
),
4192 raft_start_election(raft
, true);
4197 raft_send_execute_command_reply(struct raft
*raft
,
4198 const struct uuid
*sid
,
4199 const struct uuid
*eid
,
4200 enum raft_command_status status
,
4201 uint64_t commit_index
)
4203 if (failure_test
== FT_CRASH_BEFORE_SEND_EXEC_REP
) {
4204 ovs_fatal(0, "Raft test: crash before sending execute_command_reply");
4206 union raft_rpc rpc
= {
4207 .execute_command_reply
= {
4209 .type
= RAFT_RPC_EXECUTE_COMMAND_REPLY
,
4214 .commit_index
= commit_index
,
4217 raft_send(raft
, &rpc
);
4218 if (failure_test
== FT_CRASH_AFTER_SEND_EXEC_REP
) {
4219 ovs_fatal(0, "Raft test: crash after sending execute_command_reply.");
4223 static enum raft_command_status
4224 raft_handle_execute_command_request__(
4225 struct raft
*raft
, const struct raft_execute_command_request
*rq
)
4227 if (raft
->role
!= RAFT_LEADER
) {
4228 return RAFT_CMD_NOT_LEADER
;
4231 const struct uuid
*current_eid
= raft_current_eid(raft
);
4232 if (!uuid_equals(&rq
->prereq
, current_eid
)) {
4233 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
4234 VLOG_INFO_RL(&rl
, "current entry eid "UUID_FMT
" does not match "
4235 "prerequisite "UUID_FMT
" in execute_command_request",
4236 UUID_ARGS(current_eid
), UUID_ARGS(&rq
->prereq
));
4237 return RAFT_CMD_BAD_PREREQ
;
4240 struct raft_command
*cmd
= raft_command_initiate(raft
, rq
->data
,
4241 NULL
, 0, &rq
->result
);
4242 cmd
->sid
= rq
->common
.sid
;
4244 enum raft_command_status status
= cmd
->status
;
4245 raft_command_unref(cmd
);
4250 raft_handle_execute_command_request(
4251 struct raft
*raft
, const struct raft_execute_command_request
*rq
)
4253 enum raft_command_status status
4254 = raft_handle_execute_command_request__(raft
, rq
);
4255 if (status
!= RAFT_CMD_INCOMPLETE
) {
4256 raft_send_execute_command_reply(raft
, &rq
->common
.sid
, &rq
->result
,
4262 raft_handle_execute_command_reply(
4263 struct raft
*raft
, const struct raft_execute_command_reply
*rpy
)
4265 struct raft_command
*cmd
= raft_find_command_by_eid(raft
, &rpy
->result
);
4267 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(5, 5);
4268 char buf
[SID_LEN
+ 1];
4270 "%s received \"%s\" reply from %s for unknown command",
4271 raft
->local_nickname
,
4272 raft_command_status_to_string(rpy
->status
),
4273 raft_get_nickname(raft
, &rpy
->common
.sid
,
4278 if (rpy
->status
== RAFT_CMD_INCOMPLETE
) {
4279 cmd
->timestamp
= time_msec();
4281 cmd
->index
= rpy
->commit_index
;
4282 raft_command_complete(raft
, cmd
, rpy
->status
);
4287 raft_handle_rpc(struct raft
*raft
, const union raft_rpc
*rpc
)
4289 struct raft_server
*s
= raft_find_server(raft
, &rpc
->common
.sid
);
4291 s
->last_msg_ts
= time_msec();
4294 uint64_t term
= raft_rpc_get_term(rpc
);
4296 && !raft_should_suppress_disruptive_server(raft
, rpc
)
4297 && !raft_receive_term__(raft
, &rpc
->common
, term
)) {
4298 if (rpc
->type
== RAFT_RPC_APPEND_REQUEST
) {
4299 /* Section 3.3: "If a server receives a request with a stale term
4300 * number, it rejects the request." */
4301 raft_send_append_reply(raft
, raft_append_request_cast(rpc
),
4302 RAFT_APPEND_INCONSISTENCY
, "stale term");
4307 switch (rpc
->type
) {
4308 #define RAFT_RPC(ENUM, NAME) \
4310 raft_handle_##NAME(raft, &rpc->NAME); \
4320 raft_rpc_is_heartbeat(const union raft_rpc
*rpc
)
4322 return ((rpc
->type
== RAFT_RPC_APPEND_REQUEST
4323 || rpc
->type
== RAFT_RPC_APPEND_REPLY
)
4324 && rpc
->common
.comment
4325 && !strcmp(rpc
->common
.comment
, "heartbeat"));
4330 raft_send_to_conn_at(struct raft
*raft
, const union raft_rpc
*rpc
,
4331 struct raft_conn
*conn
, int line_number
)
4333 log_rpc(rpc
, "-->", conn
, line_number
);
4334 return !jsonrpc_session_send(
4335 conn
->js
, raft_rpc_to_jsonrpc(&raft
->cid
, &raft
->sid
, rpc
));
4339 raft_is_rpc_synced(const struct raft
*raft
, const union raft_rpc
*rpc
)
4341 uint64_t term
= raft_rpc_get_term(rpc
);
4342 uint64_t index
= raft_rpc_get_min_sync_index(rpc
);
4343 const struct uuid
*vote
= raft_rpc_get_vote(rpc
);
4345 return (term
<= raft
->synced_term
4346 && index
<= raft
->log_synced
4347 && (!vote
|| uuid_equals(vote
, &raft
->synced_vote
)));
4351 raft_send_at(struct raft
*raft
, const union raft_rpc
*rpc
, int line_number
)
4353 const struct uuid
*dst
= &rpc
->common
.sid
;
4354 if (uuid_equals(dst
, &raft
->sid
)) {
4355 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 1);
4356 VLOG_WARN_RL(&rl
, "attempted to send RPC to self from raft.c:%d",
4361 struct raft_conn
*conn
= raft_find_conn_by_sid(raft
, dst
);
4363 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(1, 1);
4364 char buf
[SID_LEN
+ 1];
4365 VLOG_DBG_RL(&rl
, "%s: no connection to %s, cannot send RPC "
4366 "from raft.c:%d", raft
->local_nickname
,
4367 raft_get_nickname(raft
, dst
, buf
, sizeof buf
),
4372 if (!raft_is_rpc_synced(raft
, rpc
)) {
4373 raft_waiter_create(raft
, RAFT_W_RPC
, false)->rpc
= raft_rpc_clone(rpc
);
4377 return raft_send_to_conn_at(raft
, rpc
, conn
, line_number
);
4380 static struct raft
*
4381 raft_lookup_by_name(const char *name
)
4385 HMAP_FOR_EACH_WITH_HASH (raft
, hmap_node
, hash_string(name
, 0),
4387 if (!strcmp(raft
->name
, name
)) {
4395 raft_unixctl_cid(struct unixctl_conn
*conn
,
4396 int argc OVS_UNUSED
, const char *argv
[],
4397 void *aux OVS_UNUSED
)
4399 struct raft
*raft
= raft_lookup_by_name(argv
[1]);
4401 unixctl_command_reply_error(conn
, "unknown cluster");
4402 } else if (uuid_is_zero(&raft
->cid
)) {
4403 unixctl_command_reply_error(conn
, "cluster id not yet known");
4405 char *uuid
= xasprintf(UUID_FMT
, UUID_ARGS(&raft
->cid
));
4406 unixctl_command_reply(conn
, uuid
);
4412 raft_unixctl_sid(struct unixctl_conn
*conn
,
4413 int argc OVS_UNUSED
, const char *argv
[],
4414 void *aux OVS_UNUSED
)
4416 struct raft
*raft
= raft_lookup_by_name(argv
[1]);
4418 unixctl_command_reply_error(conn
, "unknown cluster");
4420 char *uuid
= xasprintf(UUID_FMT
, UUID_ARGS(&raft
->sid
));
4421 unixctl_command_reply(conn
, uuid
);
4427 raft_put_sid(const char *title
, const struct uuid
*sid
,
4428 const struct raft
*raft
, struct ds
*s
)
4430 ds_put_format(s
, "%s: ", title
);
4431 if (uuid_equals(sid
, &raft
->sid
)) {
4432 ds_put_cstr(s
, "self");
4433 } else if (uuid_is_zero(sid
)) {
4434 ds_put_cstr(s
, "unknown");
4436 char buf
[SID_LEN
+ 1];
4437 ds_put_cstr(s
, raft_get_nickname(raft
, sid
, buf
, sizeof buf
));
4439 ds_put_char(s
, '\n');
4443 raft_unixctl_status(struct unixctl_conn
*conn
,
4444 int argc OVS_UNUSED
, const char *argv
[],
4445 void *aux OVS_UNUSED
)
4447 struct raft
*raft
= raft_lookup_by_name(argv
[1]);
4449 unixctl_command_reply_error(conn
, "unknown cluster");
4453 struct ds s
= DS_EMPTY_INITIALIZER
;
4454 ds_put_format(&s
, "%s\n", raft
->local_nickname
);
4455 ds_put_format(&s
, "Name: %s\n", raft
->name
);
4456 ds_put_format(&s
, "Cluster ID: ");
4457 if (!uuid_is_zero(&raft
->cid
)) {
4458 ds_put_format(&s
, CID_FMT
" ("UUID_FMT
")\n",
4459 CID_ARGS(&raft
->cid
), UUID_ARGS(&raft
->cid
));
4461 ds_put_format(&s
, "not yet known\n");
4463 ds_put_format(&s
, "Server ID: "SID_FMT
" ("UUID_FMT
")\n",
4464 SID_ARGS(&raft
->sid
), UUID_ARGS(&raft
->sid
));
4465 ds_put_format(&s
, "Address: %s\n", raft
->local_address
);
4466 ds_put_format(&s
, "Status: %s\n",
4467 raft
->joining
? "joining cluster"
4468 : raft
->leaving
? "leaving cluster"
4469 : raft
->left
? "left cluster"
4470 : raft
->failed
? "failed"
4471 : "cluster member");
4472 if (raft
->joining
) {
4473 ds_put_format(&s
, "Remotes for joining:");
4474 const char *address
;
4475 SSET_FOR_EACH (address
, &raft
->remote_addresses
) {
4476 ds_put_format(&s
, " %s", address
);
4478 ds_put_char(&s
, '\n');
4480 if (raft
->role
== RAFT_LEADER
) {
4481 struct raft_server
*as
;
4482 HMAP_FOR_EACH (as
, hmap_node
, &raft
->add_servers
) {
4483 ds_put_format(&s
, "Adding server %s ("SID_FMT
" at %s) (%s)\n",
4484 as
->nickname
, SID_ARGS(&as
->sid
), as
->address
,
4485 raft_server_phase_to_string(as
->phase
));
4488 struct raft_server
*rs
= raft
->remove_server
;
4490 ds_put_format(&s
, "Removing server %s ("SID_FMT
" at %s) (%s)\n",
4491 rs
->nickname
, SID_ARGS(&rs
->sid
), rs
->address
,
4492 raft_server_phase_to_string(rs
->phase
));
4496 ds_put_format(&s
, "Role: %s\n",
4497 raft
->role
== RAFT_LEADER
? "leader"
4498 : raft
->role
== RAFT_CANDIDATE
? "candidate"
4499 : raft
->role
== RAFT_FOLLOWER
? "follower"
4501 ds_put_format(&s
, "Term: %"PRIu64
"\n", raft
->term
);
4502 raft_put_sid("Leader", &raft
->leader_sid
, raft
, &s
);
4503 raft_put_sid("Vote", &raft
->vote
, raft
, &s
);
4504 ds_put_char(&s
, '\n');
4506 if (raft
->election_start
) {
4508 "Last Election started %"PRIu64
" ms ago, reason: %s\n",
4509 (uint64_t) (time_msec() - raft
->election_start
),
4510 raft
->leadership_transfer
4511 ? "leadership_transfer" : "timeout");
4513 if (raft
->election_won
) {
4514 ds_put_format(&s
, "Last Election won: %"PRIu64
" ms ago\n",
4515 (uint64_t) (time_msec() - raft
->election_won
));
4517 ds_put_format(&s
, "Election timer: %"PRIu64
, raft
->election_timer
);
4518 if (raft
->role
== RAFT_LEADER
&& raft
->election_timer_new
) {
4519 ds_put_format(&s
, " (changing to %"PRIu64
")",
4520 raft
->election_timer_new
);
4522 ds_put_char(&s
, '\n');
4524 ds_put_format(&s
, "Log: [%"PRIu64
", %"PRIu64
"]\n",
4525 raft
->log_start
, raft
->log_end
);
4527 uint64_t n_uncommitted
= raft
->log_end
- raft
->commit_index
- 1;
4528 ds_put_format(&s
, "Entries not yet committed: %"PRIu64
"\n", n_uncommitted
);
4530 uint64_t n_unapplied
= raft
->log_end
- raft
->last_applied
- 1;
4531 ds_put_format(&s
, "Entries not yet applied: %"PRIu64
"\n", n_unapplied
);
4533 const struct raft_conn
*c
;
4534 ds_put_cstr(&s
, "Connections:");
4535 LIST_FOR_EACH (c
, list_node
, &raft
->conns
) {
4536 bool connected
= jsonrpc_session_is_connected(c
->js
);
4537 ds_put_format(&s
, " %s%s%s%s",
4538 connected
? "" : "(",
4539 c
->incoming
? "<-" : "->", c
->nickname
,
4540 connected
? "" : ")");
4542 ds_put_char(&s
, '\n');
4544 ds_put_format(&s
, "Disconnections: %u\n", raft
->n_disconnections
);
4546 ds_put_cstr(&s
, "Servers:\n");
4547 struct raft_server
*server
;
4548 HMAP_FOR_EACH (server
, hmap_node
, &raft
->servers
) {
4549 ds_put_format(&s
, " %s ("SID_FMT
" at %s)",
4551 SID_ARGS(&server
->sid
), server
->address
);
4552 if (uuid_equals(&server
->sid
, &raft
->sid
)) {
4553 ds_put_cstr(&s
, " (self)");
4555 if (server
->phase
!= RAFT_PHASE_STABLE
) {
4556 ds_put_format (&s
, " (%s)",
4557 raft_server_phase_to_string(server
->phase
));
4559 if (raft
->role
== RAFT_CANDIDATE
) {
4560 if (!uuid_is_zero(&server
->vote
)) {
4561 char buf
[SID_LEN
+ 1];
4562 ds_put_format(&s
, " (voted for %s)",
4563 raft_get_nickname(raft
, &server
->vote
,
4566 } else if (raft
->role
== RAFT_LEADER
) {
4567 ds_put_format(&s
, " next_index=%"PRIu64
" match_index=%"PRIu64
,
4568 server
->next_index
, server
->match_index
);
4570 if (server
->last_msg_ts
) {
4571 ds_put_format(&s
, " last msg %"PRIu64
" ms ago",
4572 (uint64_t) (time_msec() - server
->last_msg_ts
));
4574 ds_put_char(&s
, '\n');
4577 unixctl_command_reply(conn
, ds_cstr(&s
));
4582 raft_unixctl_leave__(struct unixctl_conn
*conn
, struct raft
*raft
)
4584 if (raft_is_leaving(raft
)) {
4585 unixctl_command_reply_error(conn
,
4586 "already in progress leaving cluster");
4587 } else if (raft_is_joining(raft
)) {
4588 unixctl_command_reply_error(conn
,
4589 "can't leave while join in progress");
4590 } else if (raft_failed(raft
)) {
4591 unixctl_command_reply_error(conn
,
4592 "can't leave after failure");
4595 unixctl_command_reply(conn
, NULL
);
4600 raft_unixctl_leave(struct unixctl_conn
*conn
, int argc OVS_UNUSED
,
4601 const char *argv
[], void *aux OVS_UNUSED
)
4603 struct raft
*raft
= raft_lookup_by_name(argv
[1]);
4605 unixctl_command_reply_error(conn
, "unknown cluster");
4609 raft_unixctl_leave__(conn
, raft
);
4612 static struct raft_server
*
4613 raft_lookup_server_best_match(struct raft
*raft
, const char *id
)
4615 struct raft_server
*best
= NULL
;
4616 int best_score
= -1;
4619 struct raft_server
*s
;
4620 HMAP_FOR_EACH (s
, hmap_node
, &raft
->servers
) {
4621 int score
= (!strcmp(id
, s
->address
)
4623 : uuid_is_partial_match(&s
->sid
, id
));
4624 if (score
> best_score
) {
4628 } else if (score
== best_score
) {
4632 return n_best
== 1 ? best
: NULL
;
/* Implements the "cluster/kick DB SERVER" unixctl command: removes SERVER
 * (given by address or by full or partial server ID) from the cluster that
 * this server's database DB belongs to. */
static void
raft_unixctl_kick(struct unixctl_conn *conn, int argc OVS_UNUSED,
                  const char *argv[], void *aux OVS_UNUSED)
{
    const char *cluster_name = argv[1];
    const char *server_name = argv[2];

    struct raft *raft = raft_lookup_by_name(cluster_name);
    if (!raft) {
        unixctl_command_reply_error(conn, "unknown cluster");
        return;
    }

    struct raft_server *server = raft_lookup_server_best_match(raft,
                                                               server_name);
    if (!server) {
        /* No server matched, or the partial ID was ambiguous. */
        unixctl_command_reply_error(conn, "unknown server");
        return;
    }

    if (uuid_equals(&server->sid, &raft->sid)) {
        /* Kicking ourselves is the same as leaving the cluster. */
        raft_unixctl_leave__(conn, raft);
    } else if (raft->role == RAFT_LEADER) {
        /* We are the leader, so handle the removal locally;
         * 'requester_conn' makes the reply go back to this unixctl client
         * once the removal commits or fails. */
        const struct raft_remove_server_request rq = {
            .sid = server->sid,
            .requester_conn = conn,
        };
        raft_handle_remove_server_request(raft, &rq);
    } else {
        /* Not the leader: forward a removal request to it over RPC. */
        const union raft_rpc rpc = {
            .remove_server_request = {
                .common = {
                    .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
                    .sid = raft->leader_sid,
                    .comment = "via unixctl",
                },
                .sid = server->sid,
            }
        };
        if (raft_send(raft, &rpc)) {
            unixctl_command_reply(conn, "sent removal request to leader");
        } else {
            unixctl_command_reply_error(conn,
                                        "failed to send removal request");
        }
    }
}
/* Recomputes raft->election_timer from persisted state: starts from the
 * snapshot's value (if any), then overrides it with the most recent
 * committed log entry that carries an election-timer change. */
static void
raft_get_election_timer_from_log(struct raft *raft)
{
    if (raft->snap.election_timer) {
        raft->election_timer = raft->snap.election_timer;
    }
    /* Scan committed entries newest-first; the first nonzero
     * election_timer found is the latest change, so stop there.
     * NOTE(review): the uint64_t loop index assumes log_start >= 1, so
     * that 'index--' cannot wrap past the 'index >= log_start' test --
     * TODO confirm against the log invariants elsewhere in this file. */
    for (uint64_t index = raft->commit_index; index >= raft->log_start;
         index--) {
        struct raft_entry *e = &raft->entries[index - raft->log_start];
        if (e->election_timer) {
            raft->election_timer = e->election_timer;
            break;
        }
    }
}
/* Appends a log entry recording the new election timer value
 * (raft->election_timer_new), which replicates the change through the
 * cluster.  The command handle is released immediately; the caller does
 * not track completion through it. */
static void
raft_log_election_timer(struct raft *raft)
{
    raft_command_unref(raft_command_execute__(raft, NULL, NULL,
                                              raft->election_timer_new, NULL,
                                              NULL));
}
4708 raft_unixctl_change_election_timer(struct unixctl_conn
*conn
,
4709 int argc OVS_UNUSED
, const char *argv
[],
4710 void *aux OVS_UNUSED
)
4712 const char *cluster_name
= argv
[1];
4713 const char *election_timer_str
= argv
[2];
4715 struct raft
*raft
= raft_lookup_by_name(cluster_name
);
4717 unixctl_command_reply_error(conn
, "unknown cluster");
4721 if (raft
->role
!= RAFT_LEADER
) {
4722 unixctl_command_reply_error(conn
, "election timer must be changed"
4723 " through leader.");
4727 /* If there are pending changes for election timer, reject it. */
4728 if (raft
->election_timer_new
) {
4729 unixctl_command_reply_error(conn
, "election timer change pending.");
4733 uint64_t election_timer
= atoll(election_timer_str
);
4734 if (election_timer
== raft
->election_timer
) {
4735 unixctl_command_reply(conn
, "change election timer to current value.");
4739 /* Election timer smaller than 100ms or bigger than 10min doesn't make
4741 if (election_timer
< 100 || election_timer
> 600000) {
4742 unixctl_command_reply_error(conn
, "election timer must be between "
4743 "100 and 600000, in msec.");
4747 /* If election timer is to be enlarged, it should be done gradually so that
4748 * it won't cause timeout when new value is applied on leader but not yet
4749 * applied on some of the followers. */
4750 if (election_timer
> raft
->election_timer
* 2) {
4751 unixctl_command_reply_error(conn
, "election timer increase should not "
4752 "exceed the current value x 2.");
4756 raft
->election_timer_new
= election_timer
;
4757 raft_log_election_timer(raft
);
4758 unixctl_command_reply(conn
, "change of election timer initiated.");
4762 raft_unixctl_set_backlog_threshold(struct unixctl_conn
*conn
,
4763 int argc OVS_UNUSED
, const char *argv
[],
4764 void *aux OVS_UNUSED
)
4766 const char *cluster_name
= argv
[1];
4767 unsigned long long n_msgs
, n_bytes
;
4768 struct raft_conn
*r_conn
;
4770 struct raft
*raft
= raft_lookup_by_name(cluster_name
);
4772 unixctl_command_reply_error(conn
, "unknown cluster");
4776 if (!str_to_ullong(argv
[2], 10, &n_msgs
)
4777 || !str_to_ullong(argv
[3], 10, &n_bytes
)) {
4778 unixctl_command_reply_error(conn
, "invalid argument");
4782 if (n_msgs
< 50 || n_msgs
> SIZE_MAX
|| n_bytes
> SIZE_MAX
) {
4783 unixctl_command_reply_error(conn
, "values out of range");
4787 raft
->conn_backlog_max_n_msgs
= n_msgs
;
4788 raft
->conn_backlog_max_n_bytes
= n_bytes
;
4790 LIST_FOR_EACH (r_conn
, list_node
, &raft
->conns
) {
4791 jsonrpc_session_set_backlog_threshold(r_conn
->js
, n_msgs
, n_bytes
);
4794 unixctl_command_reply(conn
, NULL
);
/* Implements the "cluster/failure-test SCENARIO" unixctl command, which
 * arms one of the crash/delay scenarios used by the unit tests (see
 * enum raft_failure_test near the top of this file).  "clear" disarms
 * any active scenario. */
static void
raft_unixctl_failure_test(struct unixctl_conn *conn OVS_UNUSED,
                          int argc OVS_UNUSED, const char *argv[],
                          void *aux OVS_UNUSED)
{
    const char *test = argv[1];
    if (!strcmp(test, "crash-before-sending-append-request")) {
        failure_test = FT_CRASH_BEFORE_SEND_APPEND_REQ;
    } else if (!strcmp(test, "crash-after-sending-append-request")) {
        failure_test = FT_CRASH_AFTER_SEND_APPEND_REQ;
    } else if (!strcmp(test, "crash-before-sending-execute-command-reply")) {
        failure_test = FT_CRASH_BEFORE_SEND_EXEC_REP;
    } else if (!strcmp(test, "crash-after-sending-execute-command-reply")) {
        failure_test = FT_CRASH_AFTER_SEND_EXEC_REP;
    } else if (!strcmp(test, "crash-before-sending-execute-command-request")) {
        failure_test = FT_CRASH_BEFORE_SEND_EXEC_REQ;
    } else if (!strcmp(test, "crash-after-sending-execute-command-request")) {
        failure_test = FT_CRASH_AFTER_SEND_EXEC_REQ;
    } else if (!strcmp(test, "crash-after-receiving-append-request-update")) {
        failure_test = FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE;
    } else if (!strcmp(test, "delay-election")) {
        failure_test = FT_DELAY_ELECTION;
        /* Restart the election timer on every follower in this process so
         * that the delay takes effect immediately. */
        struct raft *raft;
        HMAP_FOR_EACH (raft, hmap_node, &all_rafts) {
            if (raft->role == RAFT_FOLLOWER) {
                raft_reset_election_timer(raft);
            }
        }
    } else if (!strcmp(test, "dont-send-vote-request")) {
        failure_test = FT_DONT_SEND_VOTE_REQUEST;
    } else if (!strcmp(test, "clear")) {
        failure_test = FT_NO_TEST;
        unixctl_command_reply(conn, "test dismissed");
        return;
    } else {
        unixctl_command_reply_error(conn, "unknown test scenario");
        return;
    }
    unixctl_command_reply(conn, "test engaged");
}
4841 static struct ovsthread_once once
= OVSTHREAD_ONCE_INITIALIZER
;
4842 if (!ovsthread_once_start(&once
)) {
4845 unixctl_command_register("cluster/cid", "DB", 1, 1,
4846 raft_unixctl_cid
, NULL
);
4847 unixctl_command_register("cluster/sid", "DB", 1, 1,
4848 raft_unixctl_sid
, NULL
);
4849 unixctl_command_register("cluster/status", "DB", 1, 1,
4850 raft_unixctl_status
, NULL
);
4851 unixctl_command_register("cluster/leave", "DB", 1, 1,
4852 raft_unixctl_leave
, NULL
);
4853 unixctl_command_register("cluster/kick", "DB SERVER", 2, 2,
4854 raft_unixctl_kick
, NULL
);
4855 unixctl_command_register("cluster/change-election-timer", "DB TIME", 2, 2,
4856 raft_unixctl_change_election_timer
, NULL
);
4857 unixctl_command_register("cluster/set-backlog-threshold",
4858 "DB N_MSGS N_BYTES", 3, 3,
4859 raft_unixctl_set_backlog_threshold
, NULL
);
4860 unixctl_command_register("cluster/failure-test", "FAILURE SCENARIO", 1, 1,
4861 raft_unixctl_failure_test
, NULL
);
4862 ovsthread_once_done(&once
);