bool ever_had_leader; /* There has been leader elected since the raft
is initialized, meaning it is ever
connected. */
+
+ /* Connection backlog limits. */
+#define DEFAULT_MAX_BACKLOG_N_MSGS 500
+#define DEFAULT_MAX_BACKLOG_N_BYTES UINT32_MAX
+ size_t conn_backlog_max_n_msgs; /* Number of messages. */
+ size_t conn_backlog_max_n_bytes; /* Number of bytes. */
};
/* All Raft structures. */
raft->election_timer = ELECTION_BASE_MSEC;
+ raft->conn_backlog_max_n_msgs = DEFAULT_MAX_BACKLOG_N_MSGS;
+ raft->conn_backlog_max_n_bytes = DEFAULT_MAX_BACKLOG_N_BYTES;
+
return raft;
}
raft->ping_timeout = time_msec() + raft->election_timer / 3;
}
-#define RAFT_MAX_BACKLOG_N_MSGS 500
-#define RAFT_MAX_BACKLOG_BYTES UINT32_MAX
-
static void
raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
const struct uuid *sid, bool incoming)
conn->incoming = incoming;
conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
jsonrpc_session_set_probe_interval(js, 0);
- jsonrpc_session_set_backlog_threshold(js, RAFT_MAX_BACKLOG_N_MSGS,
- RAFT_MAX_BACKLOG_BYTES);
+ jsonrpc_session_set_backlog_threshold(js, raft->conn_backlog_max_n_msgs,
+ raft->conn_backlog_max_n_bytes);
}
/* Starts the local server in an existing Raft cluster, using the local copy of
unixctl_command_reply(conn, "change of election timer initiated.");
}
+static void
+raft_unixctl_set_backlog_threshold(struct unixctl_conn *conn,
+ int argc OVS_UNUSED, const char *argv[],
+ void *aux OVS_UNUSED)
+{
+ const char *cluster_name = argv[1];
+ unsigned long long n_msgs, n_bytes;
+ struct raft_conn *r_conn;
+
+ struct raft *raft = raft_lookup_by_name(cluster_name);
+ if (!raft) {
+ unixctl_command_reply_error(conn, "unknown cluster");
+ return;
+ }
+
+ if (!str_to_ullong(argv[2], 10, &n_msgs)
+ || !str_to_ullong(argv[3], 10, &n_bytes)) {
+ unixctl_command_reply_error(conn, "invalid argument");
+ return;
+ }
+
+ if (n_msgs < 50 || n_msgs > SIZE_MAX || n_bytes > SIZE_MAX) {
+ unixctl_command_reply_error(conn, "values out of range");
+ return;
+ }
+
+ raft->conn_backlog_max_n_msgs = n_msgs;
+ raft->conn_backlog_max_n_bytes = n_bytes;
+
+ LIST_FOR_EACH (r_conn, list_node, &raft->conns) {
+ jsonrpc_session_set_backlog_threshold(r_conn->js, n_msgs, n_bytes);
+ }
+
+ unixctl_command_reply(conn, NULL);
+}
+
static void
raft_unixctl_failure_test(struct unixctl_conn *conn OVS_UNUSED,
int argc OVS_UNUSED, const char *argv[],
raft_unixctl_kick, NULL);
unixctl_command_register("cluster/change-election-timer", "DB TIME", 2, 2,
raft_unixctl_change_election_timer, NULL);
+ unixctl_command_register("cluster/set-backlog-threshold",
+ "DB N_MSGS N_BYTES", 3, 3,
+ raft_unixctl_set_backlog_threshold, NULL);
unixctl_command_register("cluster/failure-test", "FAILURE SCENARIO", 1, 1,
raft_unixctl_failure_test, NULL);
ovsthread_once_done(&once);