]> git.proxmox.com Git - mirror_ovs.git/commitdiff
raft: Set threshold on backlog for raft connections.
authorIlya Maximets <i.maximets@ovn.org>
Wed, 21 Oct 2020 01:32:49 +0000 (03:32 +0200)
committerIlya Maximets <i.maximets@ovn.org>
Tue, 10 Nov 2020 00:23:33 +0000 (01:23 +0100)
RAFT messages could be fairly big.  If something abnormal happens to
one of the servers in a cluster it may not be able to process all the
incoming messages in a timely manner.  This results in jsonrpc backlog
growth on the sender's side.  For example if follower gets many new
clients at once that it needs to serve, or it decides to take a
snapshot in a period of high number of database changes.
If backlog grows large enough it becomes harder and harder for follower
to process incoming raft messages, it sends outdated replies and
starts receiving snapshots and the whole raft log from the leader.
Sometimes backlog grows too high (60GB in this example):

      jsonrpc|INFO|excessive sending backlog, jsonrpc: ssl:<ip>,
                   num of msgs: 15370, backlog: 61731060773.

In this case OS might actually decide to kill the sender to free some
memory.  Anyway, It could take a lot of time for such a server to catch
up with the rest of the cluster if it has so much data to receive and
process.

Introducing backlog thresholds for jsonrpc connections.
If sending backlog will exceed particular values (500 messages or
4GB in size), connection will be dropped and re-created.  This will
allow to drop all the current backlog and start over increasing
chances of cluster recovery.

Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=1888829
Acked-by: Dumitru Ceara <dceara@redhat.com>
Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
NEWS
lib/jsonrpc.c
lib/jsonrpc.h
ovsdb/raft.c

diff --git a/NEWS b/NEWS
index 2860a8e9ce6309589eba49c9fda087aa5bdbe725..ebdf8758bfe2c2a5c57fd4e45a1f930f41f84630 100644 (file)
--- a/NEWS
+++ b/NEWS
@@ -6,6 +6,8 @@ Post-v2.14.0
      * New unixctl command 'ovsdb-server/memory-trim-on-compaction on|off'.
        If turned on, ovsdb-server will try to reclaim all the unused memory
        after every DB compaction back to OS.  Disabled by default.
+     * Maximum backlog on RAFT connections limited to 500 messages or 4GB.
+       Once threshold reached, connection is dropped (and re-established).
    - DPDK:
      * Removed support for vhost-user dequeue zero-copy.
    - The environment variable OVS_UNBOUND_CONF, if set, is now used
index ecbc939fe7f96a612bbcb413f21561fab64926c2..08aaff061ae510806c86101a0411a7e2261f1e03 100644 (file)
@@ -50,6 +50,10 @@ struct jsonrpc {
     struct ovs_list output;     /* Contains "struct ofpbuf"s. */
     size_t output_count;        /* Number of elements in "output". */
     size_t backlog;
+
+    /* Limits. */
+    size_t max_output;          /* 'output_count' disconnection threshold. */
+    size_t max_backlog;         /* 'backlog' disconnection threshold. */
 };
 
 /* Rate limit for error messages. */
@@ -178,6 +182,17 @@ jsonrpc_get_backlog(const struct jsonrpc *rpc)
     return rpc->status ? 0 : rpc->backlog;
 }
 
+/* Sets thresholds for send backlog.  If send backlog contains more than
+ * 'max_n_msgs' messages or is larger than 'max_backlog_bytes' bytes,
+ * connection will be dropped. */
+void
+jsonrpc_set_backlog_threshold(struct jsonrpc *rpc,
+                              size_t max_n_msgs, size_t max_backlog_bytes)
+{
+    rpc->max_output = max_n_msgs;
+    rpc->max_backlog = max_backlog_bytes;
+}
+
 /* Returns the number of bytes that have been received on 'rpc''s underlying
  * stream.  (The value wraps around if it exceeds UINT_MAX.) */
 unsigned int
@@ -261,9 +276,26 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
     rpc->backlog += length;
 
     if (rpc->output_count >= 50) {
-        VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of"
+        static struct vlog_rate_limit bl_rl = VLOG_RATE_LIMIT_INIT(5, 5);
+        bool disconnect = false;
+
+        VLOG_INFO_RL(&bl_rl, "excessive sending backlog, jsonrpc: %s, num of"
                      " msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name,
                      rpc->output_count, rpc->backlog);
+        if (rpc->max_output && rpc->output_count > rpc->max_output) {
+            disconnect = true;
+            VLOG_WARN("sending backlog exceeded maximum number of messages (%"
+                      PRIuSIZE" > %"PRIuSIZE"), disconnecting, jsonrpc: %s.",
+                      rpc->output_count, rpc->max_output, rpc->name);
+        } else if (rpc->max_backlog && rpc->backlog > rpc->max_backlog) {
+            disconnect = true;
+            VLOG_WARN("sending backlog exceeded maximum size (%"PRIuSIZE" > %"
+                      PRIuSIZE" bytes), disconnecting, jsonrpc: %s.",
+                      rpc->backlog, rpc->max_backlog, rpc->name);
+        }
+        if (disconnect) {
+            jsonrpc_error(rpc, E2BIG);
+        }
     }
 
     if (rpc->backlog == length) {
@@ -787,6 +819,10 @@ struct jsonrpc_session {
     int last_error;
     unsigned int seqno;
     uint8_t dscp;
+
+    /* Limits for jsonrpc. */
+    size_t max_n_msgs;
+    size_t max_backlog_bytes;
 };
 
 static void
@@ -842,6 +878,8 @@ jsonrpc_session_open_multiple(const struct svec *remotes, bool retry)
     s->dscp = 0;
     s->last_error = 0;
 
+    jsonrpc_session_set_backlog_threshold(s, 0, 0);
+
     const char *name = reconnect_get_name(s->reconnect);
     if (!pstream_verify_name(name)) {
         reconnect_set_passive(s->reconnect, true, time_msec());
@@ -882,6 +920,7 @@ jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc, uint8_t dscp)
     s->pstream = NULL;
     s->seqno = 1;
 
+    jsonrpc_session_set_backlog_threshold(s, 0, 0);
     return s;
 }
 
@@ -970,6 +1009,8 @@ jsonrpc_session_run(struct jsonrpc_session *s)
             }
             reconnect_connected(s->reconnect, time_msec());
             s->rpc = jsonrpc_open(stream);
+            jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs,
+                                                  s->max_backlog_bytes);
             s->seqno++;
         } else if (error != EAGAIN) {
             reconnect_listen_error(s->reconnect, time_msec(), error);
@@ -1010,6 +1051,8 @@ jsonrpc_session_run(struct jsonrpc_session *s)
         if (!error) {
             reconnect_connected(s->reconnect, time_msec());
             s->rpc = jsonrpc_open(s->stream);
+            jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs,
+                                                  s->max_backlog_bytes);
             s->stream = NULL;
             s->seqno++;
         } else if (error != EAGAIN) {
@@ -1250,3 +1293,18 @@ jsonrpc_session_set_dscp(struct jsonrpc_session *s, uint8_t dscp)
         jsonrpc_session_force_reconnect(s);
     }
 }
+
+/* Sets thresholds for send backlog.  If send backlog contains more than
+ * 'max_n_msgs' messages or is larger than 'max_backlog_bytes' bytes,
+ * connection will be closed (then reconnected, if that feature is enabled). */
+void
+jsonrpc_session_set_backlog_threshold(struct jsonrpc_session *s,
+                                      size_t max_n_msgs,
+                                      size_t max_backlog_bytes)
+{
+    s->max_n_msgs = max_n_msgs;
+    s->max_backlog_bytes = max_backlog_bytes;
+    if (s->rpc) {
+        jsonrpc_set_backlog_threshold(s->rpc, max_n_msgs, max_backlog_bytes);
+    }
+}
index a44114e8dcd9133c3f4ac2c7c841cd368717a301..d75d66b863cbe3bc31fa3e562e52407875ef27bc 100644 (file)
@@ -51,6 +51,9 @@ void jsonrpc_wait(struct jsonrpc *);
 
 int jsonrpc_get_status(const struct jsonrpc *);
 size_t jsonrpc_get_backlog(const struct jsonrpc *);
+void jsonrpc_set_backlog_threshold(struct jsonrpc *, size_t max_n_msgs,
+                                                     size_t max_backlog_bytes);
+
 unsigned int jsonrpc_get_received_bytes(const struct jsonrpc *);
 const char *jsonrpc_get_name(const struct jsonrpc *);
 
@@ -140,6 +143,9 @@ void jsonrpc_session_set_probe_interval(struct jsonrpc_session *,
                                         int probe_interval);
 void jsonrpc_session_set_dscp(struct jsonrpc_session *,
                               uint8_t dscp);
+void jsonrpc_session_set_backlog_threshold(struct jsonrpc_session *,
+                                           size_t max_n_msgs,
+                                           size_t max_backlog_bytes);
 const char *jsonrpc_session_get_id(const struct jsonrpc_session *);
 
 #endif /* jsonrpc.h */
index f94a3eed8efc79b2dc0a638388a2df2106b4dd6d..67c714ff4d1bf925171cd045ab99cccf75ba4191 100644 (file)
@@ -925,6 +925,9 @@ raft_reset_ping_timer(struct raft *raft)
     raft->ping_timeout = time_msec() + raft->election_timer / 3;
 }
 
+#define RAFT_MAX_BACKLOG_N_MSGS    500
+#define RAFT_MAX_BACKLOG_BYTES     UINT32_MAX
+
 static void
 raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
               const struct uuid *sid, bool incoming)
@@ -940,6 +943,8 @@ raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
     conn->incoming = incoming;
     conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
     jsonrpc_session_set_probe_interval(js, 0);
+    jsonrpc_session_set_backlog_threshold(js, RAFT_MAX_BACKLOG_N_MSGS,
+                                              RAFT_MAX_BACKLOG_BYTES);
 }
 
 /* Starts the local server in an existing Raft cluster, using the local copy of