#include "ovsdb/log.h"
#include "raft-rpc.h"
#include "random.h"
+#include "simap.h"
#include "socket-util.h"
#include "stream.h"
#include "timeval.h"
FT_CRASH_BEFORE_SEND_EXEC_REQ,
FT_CRASH_AFTER_SEND_EXEC_REQ,
FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE,
- FT_DELAY_ELECTION
+ FT_DELAY_ELECTION,
+ FT_DONT_SEND_VOTE_REQUEST
};
static enum raft_failure_test failure_test;
bool had_leader; /* There has been leader elected since last
election initiated. This is to help setting
candidate_retrying. */
+
+ /* For all. */
+ bool ever_had_leader; /* A leader has been elected at some point since
+ this raft was initialized, meaning it has been
+ connected at least once. */
};
/* All Raft structures. */
} else {
raft_entry_clone(&raft->snap, &h.snap);
raft->log_start = raft->log_end = h.snap_index + 1;
- raft->commit_index = h.snap_index;
+ raft->log_synced = raft->commit_index = h.snap_index;
raft->last_applied = h.snap_index - 1;
}
&conn->sid);
conn->incoming = incoming;
conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
+ jsonrpc_session_set_probe_interval(js, 0);
}
/* Starts the local server in an existing Raft cluster, using the local copy of
return &raft->sid;
}
+/* Records memory consumption info for 'raft' in 'usage', for later use by
+ * memory_report().
+ *
+ * For every connection on 'raft->conns', the value returned by
+ * jsonrpc_session_get_backlog() is passed to simap_increase() under the key
+ * "raft-backlog"; afterwards the number of connections visited is passed
+ * under the key "raft-connections". */
+void
+raft_get_memory_usage(const struct raft *raft, struct simap *usage)
+{
+    int n_conns = 0;
+    struct raft_conn *c;
+
+    LIST_FOR_EACH (c, list_node, &raft->conns) {
+        n_conns++;
+        simap_increase(usage, "raft-backlog",
+                       jsonrpc_session_get_backlog(c->js));
+    }
+
+    /* Reported even when there are no connections at all. */
+    simap_increase(usage, "raft-connections", n_conns);
+}
+
/* Returns true if 'raft' has completed joining its cluster, has not left or
* initiated leaving the cluster, does not have failed disk storage, and is
* apparently connected to the leader in a healthy way (or is itself the
&& !raft->joining
&& !raft->leaving
&& !raft->left
- && !raft->failed);
+ && !raft->failed
+ && raft->ever_had_leader);
VLOG_DBG("raft_is_connected: %s\n", ret? "true": "false");
return ret;
}
jsonrpc_session_run(conn->js);
unsigned int new_seqno = jsonrpc_session_get_seqno(conn->js);
- bool just_connected = (new_seqno != conn->js_seqno
+ bool reconnected = new_seqno != conn->js_seqno;
+ bool just_connected = (reconnected
&& jsonrpc_session_is_connected(conn->js));
+
+ if (reconnected) {
+ /* Clear 'last_install_snapshot_request' since it might not reach the
+ * destination or server was restarted. */
+ struct raft_server *server = raft_find_server(raft, &conn->sid);
+ if (server) {
+ free(server->last_install_snapshot_request);
+ server->last_install_snapshot_request = NULL;
+ }
+ }
+
conn->js_seqno = new_seqno;
if (just_connected) {
if (raft->joining) {
}
ovs_assert(raft->role != RAFT_LEADER);
+
+ raft->leader_sid = UUID_ZERO;
raft->role = RAFT_CANDIDATE;
/* If there was no leader elected since last election, we know we are
* retrying now. */
.leadership_transfer = leadership_transfer,
},
};
- raft_send(raft, &rq);
+ if (failure_test != FT_DONT_SEND_VOTE_REQUEST) {
+ raft_send(raft, &rq);
+ }
}
/* Vote for ourselves. */
raft_set_leader(struct raft *raft, const struct uuid *sid)
{
raft->leader_sid = *sid;
- raft->had_leader = true;
+ raft->ever_had_leader = raft->had_leader = true;
raft->candidate_retrying = false;
}
raft->election_timer_new = 0;
raft_update_our_match_index(raft, raft->log_end - 1);
- raft_send_heartbeats(raft);
/* Write the fact that we are leader to the log. This is not used by the
* algorithm (although it could be, for quick restart), but it is used for
};
ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
}
+ if (raft->role == RAFT_CANDIDATE) {
+ /* Section 3.4: While waiting for votes, a candidate may
+ * receive an AppendEntries RPC from another server claiming to
+ * be leader. If the leader's term (included in its RPC) is at
+ * least as large as the candidate's current term, then the
+ * candidate recognizes the leader as legitimate and returns to
+ * follower state. */
+ raft->role = RAFT_FOLLOWER;
+ }
return true;
}
.last_servers = raft->snap.servers,
.last_eid = raft->snap.eid,
.data = raft->snap.data,
- .election_timer = raft->election_timer,
+ .election_timer = raft->election_timer, /* use latest value */
}
};
+
+ if (s->last_install_snapshot_request) {
+ struct raft_install_snapshot_request *old, *new;
+
+ old = s->last_install_snapshot_request;
+ new = &rpc.install_snapshot_request;
+ if ( old->term == new->term
+ && old->last_index == new->last_index
+ && old->last_term == new->last_term
+ && old->last_servers == new->last_servers
+ && old->data == new->data
+ && old->election_timer == new->election_timer
+ && uuid_equals(&old->last_eid, &new->last_eid)) {
+ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
+
+ VLOG_WARN_RL(&rl, "not sending exact same install_snapshot_request"
+ " to server %s again", s->nickname);
+ return;
+ }
+ }
+ free(s->last_install_snapshot_request);
+ CONST_CAST(struct raft_server *, s)->last_install_snapshot_request
+ = xmemdup(&rpc.install_snapshot_request,
+ sizeof rpc.install_snapshot_request);
+
raft_send(raft, &rpc);
}
raft_send_install_snapshot_request(raft, s, NULL);
} else if (s->next_index < raft->log_end) {
/* Case 2. */
- raft_send_append_request(raft, s, 1, NULL);
+ raft_send_append_request(raft, s, raft->log_end - s->next_index, NULL);
} else {
/* Case 3. */
if (s->phase == RAFT_PHASE_CATCHUP) {
VLOG_INFO_RL(&rl, "cluster "CID_FMT": installed snapshot on server %s "
" up to %"PRIu64":%"PRIu64, CID_ARGS(&raft->cid),
s->nickname, rpy->last_term, rpy->last_index);
- s->next_index = raft->log_end;
- raft_send_append_request(raft, s, 0, "snapshot installed");
+ s->next_index = raft->log_start;
+ raft_send_append_request(raft, s, raft->log_end - s->next_index,
+ "snapshot installed");
}
/* Returns true if 'raft' has grown enough since the last snapshot that
cmd->sid = rq->common.sid;
enum raft_command_status status = cmd->status;
- if (status != RAFT_CMD_INCOMPLETE) {
- raft_command_unref(cmd);
- }
+ raft_command_unref(cmd);
return status;
}
raft_reset_election_timer(raft);
}
}
+ } else if (!strcmp(test, "dont-send-vote-request")) {
+ failure_test = FT_DONT_SEND_VOTE_REQUEST;
} else if (!strcmp(test, "clear")) {
failure_test = FT_NO_TEST;
unixctl_command_reply(conn, "test dismissed");