]> git.proxmox.com Git - mirror_ovs.git/blobdiff - ovsdb/raft.c
raft: Avoid sending equal snapshots.
[mirror_ovs.git] / ovsdb / raft.c
index f354d50a51832cee7456df54a3a7c371e3196397..708b0624cf6535b5e6beca36f85839a58263f97f 100644 (file)
@@ -36,6 +36,7 @@
 #include "ovsdb/log.h"
 #include "raft-rpc.h"
 #include "random.h"
+#include "simap.h"
 #include "socket-util.h"
 #include "stream.h"
 #include "timeval.h"
@@ -73,7 +74,8 @@ enum raft_failure_test {
     FT_CRASH_BEFORE_SEND_EXEC_REQ,
     FT_CRASH_AFTER_SEND_EXEC_REQ,
     FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE,
-    FT_DELAY_ELECTION
+    FT_DELAY_ELECTION,
+    FT_DONT_SEND_VOTE_REQUEST
 };
 static enum raft_failure_test failure_test;
 
@@ -298,6 +300,11 @@ struct raft {
     bool had_leader;            /* There has been leader elected since last
                                    election initiated. This is to help setting
                                    candidate_retrying. */
+
+    /* For all. */
+    bool ever_had_leader;       /* There has been leader elected since the raft
+                                   is initialized, meaning it is ever
+                                   connected. */
 };
 
 /* All Raft structures. */
@@ -849,7 +856,7 @@ raft_read_header(struct raft *raft)
     } else {
         raft_entry_clone(&raft->snap, &h.snap);
         raft->log_start = raft->log_end = h.snap_index + 1;
-        raft->commit_index = h.snap_index;
+        raft->log_synced = raft->commit_index = h.snap_index;
         raft->last_applied = h.snap_index - 1;
     }
 
@@ -932,6 +939,7 @@ raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
                                               &conn->sid);
     conn->incoming = incoming;
     conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
+    jsonrpc_session_set_probe_interval(js, 0);
 }
 
 /* Starts the local server in an existing Raft cluster, using the local copy of
@@ -1007,6 +1015,21 @@ raft_get_sid(const struct raft *raft)
     return &raft->sid;
 }
 
+/* Adds memory consumption info to 'usage' for later use by memory_report(). */
+void
+raft_get_memory_usage(const struct raft *raft, struct simap *usage)
+{
+    struct raft_conn *conn;
+    int cnt = 0;
+
+    LIST_FOR_EACH (conn, list_node, &raft->conns) {
+        simap_increase(usage, "raft-backlog",
+                       jsonrpc_session_get_backlog(conn->js));
+        cnt++;
+    }
+    simap_increase(usage, "raft-connections", cnt);
+}
+
 /* Returns true if 'raft' has completed joining its cluster, has not left or
  * initiated leaving the cluster, does not have failed disk storage, and is
  * apparently connected to the leader in a healthy way (or is itself the
@@ -1024,7 +1047,8 @@ raft_is_connected(const struct raft *raft)
             && !raft->joining
             && !raft->leaving
             && !raft->left
-            && !raft->failed);
+            && !raft->failed
+            && raft->ever_had_leader);
     VLOG_DBG("raft_is_connected: %s\n", ret? "true": "false");
     return ret;
 }
@@ -1397,8 +1421,20 @@ raft_conn_run(struct raft *raft, struct raft_conn *conn)
     jsonrpc_session_run(conn->js);
 
     unsigned int new_seqno = jsonrpc_session_get_seqno(conn->js);
-    bool just_connected = (new_seqno != conn->js_seqno
+    bool reconnected = new_seqno != conn->js_seqno;
+    bool just_connected = (reconnected
                            && jsonrpc_session_is_connected(conn->js));
+
+    if (reconnected) {
+        /* Clear 'last_install_snapshot_request' since it might not reach the
+         * destination or server was restarted. */
+        struct raft_server *server = raft_find_server(raft, &conn->sid);
+        if (server) {
+            free(server->last_install_snapshot_request);
+            server->last_install_snapshot_request = NULL;
+        }
+    }
+
     conn->js_seqno = new_seqno;
     if (just_connected) {
         if (raft->joining) {
@@ -1641,6 +1677,8 @@ raft_start_election(struct raft *raft, bool leadership_transfer)
     }
 
     ovs_assert(raft->role != RAFT_LEADER);
+
+    raft->leader_sid = UUID_ZERO;
     raft->role = RAFT_CANDIDATE;
     /* If there was no leader elected since last election, we know we are
      * retrying now. */
@@ -1684,7 +1722,9 @@ raft_start_election(struct raft *raft, bool leadership_transfer)
                 .leadership_transfer = leadership_transfer,
             },
         };
-        raft_send(raft, &rq);
+        if (failure_test != FT_DONT_SEND_VOTE_REQUEST) {
+            raft_send(raft, &rq);
+        }
     }
 
     /* Vote for ourselves. */
@@ -2519,7 +2559,7 @@ static void
 raft_set_leader(struct raft *raft, const struct uuid *sid)
 {
     raft->leader_sid = *sid;
-    raft->had_leader = true;
+    raft->ever_had_leader = raft->had_leader = true;
     raft->candidate_retrying = false;
 }
 
@@ -2547,7 +2587,6 @@ raft_become_leader(struct raft *raft)
     raft->election_timer_new = 0;
 
     raft_update_our_match_index(raft, raft->log_end - 1);
-    raft_send_heartbeats(raft);
 
     /* Write the fact that we are leader to the log.  This is not used by the
      * algorithm (although it could be, for quick restart), but it is used for
@@ -2960,6 +2999,15 @@ raft_update_leader(struct raft *raft, const struct uuid *sid)
         };
         ignore(ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
     }
+    if (raft->role == RAFT_CANDIDATE) {
+        /* Section 3.4: While waiting for votes, a candidate may
+         * receive an AppendEntries RPC from another server claiming to
+         * be leader. If the leader’s term (included in its RPC) is at
+         * least as large as the candidate’s current term, then the
+         * candidate recognizes the leader as legitimate and returns to
+         * follower state. */
+        raft->role = RAFT_FOLLOWER;
+    }
     return true;
 }
 
@@ -3260,6 +3308,31 @@ raft_send_install_snapshot_request(struct raft *raft,
             .election_timer = raft->election_timer, /* use latest value */
         }
     };
+
+    if (s->last_install_snapshot_request) {
+        struct raft_install_snapshot_request *old, *new;
+
+        old = s->last_install_snapshot_request;
+        new = &rpc.install_snapshot_request;
+        if (   old->term           == new->term
+            && old->last_index     == new->last_index
+            && old->last_term      == new->last_term
+            && old->last_servers   == new->last_servers
+            && old->data           == new->data
+            && old->election_timer == new->election_timer
+            && uuid_equals(&old->last_eid, &new->last_eid)) {
+            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
+
+            VLOG_WARN_RL(&rl, "not sending exact same install_snapshot_request"
+                              " to server %s again", s->nickname);
+            return;
+        }
+    }
+    free(s->last_install_snapshot_request);
+    CONST_CAST(struct raft_server *, s)->last_install_snapshot_request
+        = xmemdup(&rpc.install_snapshot_request,
+                  sizeof rpc.install_snapshot_request);
+
     raft_send(raft, &rpc);
 }
 
@@ -3339,7 +3412,7 @@ raft_handle_append_reply(struct raft *raft,
         raft_send_install_snapshot_request(raft, s, NULL);
     } else if (s->next_index < raft->log_end) {
         /* Case 2. */
-        raft_send_append_request(raft, s, 1, NULL);
+        raft_send_append_request(raft, s, raft->log_end - s->next_index, NULL);
     } else {
         /* Case 3. */
         if (s->phase == RAFT_PHASE_CATCHUP) {
@@ -3992,8 +4065,9 @@ raft_handle_install_snapshot_reply(
     VLOG_INFO_RL(&rl, "cluster "CID_FMT": installed snapshot on server %s "
                  " up to %"PRIu64":%"PRIu64, CID_ARGS(&raft->cid),
                  s->nickname, rpy->last_term, rpy->last_index);
-    s->next_index = raft->log_end;
-    raft_send_append_request(raft, s, 0, "snapshot installed");
+    s->next_index = raft->log_start;
+    raft_send_append_request(raft, s, raft->log_end - s->next_index,
+                             "snapshot installed");
 }
 
 /* Returns true if 'raft' has grown enough since the last snapshot that
@@ -4143,9 +4217,7 @@ raft_handle_execute_command_request__(
     cmd->sid = rq->common.sid;
 
     enum raft_command_status status = cmd->status;
-    if (status != RAFT_CMD_INCOMPLETE) {
-        raft_command_unref(cmd);
-    }
+    raft_command_unref(cmd);
     return status;
 }
 
@@ -4667,6 +4739,8 @@ raft_unixctl_failure_test(struct unixctl_conn *conn OVS_UNUSED,
                 raft_reset_election_timer(raft);
             }
         }
+    } else if (!strcmp(test, "dont-send-vote-request")) {
+        failure_test = FT_DONT_SEND_VOTE_REQUEST;
     } else if (!strcmp(test, "clear")) {
         failure_test = FT_NO_TEST;
         unixctl_command_reply(conn, "test dismissed");