struct ovs_list output; /* Contains "struct ofpbuf"s. */
size_t output_count; /* Number of elements in "output". */
size_t backlog;
+
+ /* Limits. */
+ size_t max_output; /* 'output_count' disconnection threshold. */
+ size_t max_backlog; /* 'backlog' disconnection threshold. */
};
/* Rate limit for error messages. */
return rpc->status ? 0 : rpc->backlog;
}
+/* Sets thresholds for send backlog. If send backlog contains more than
+ * 'max_n_msgs' messages or is larger than 'max_backlog_bytes' bytes,
+ * connection will be dropped. */
+void
+jsonrpc_set_backlog_threshold(struct jsonrpc *rpc,
+ size_t max_n_msgs, size_t max_backlog_bytes)
+{
+ rpc->max_output = max_n_msgs;
+ rpc->max_backlog = max_backlog_bytes;
+}
+
/* Returns the number of bytes that have been received on 'rpc''s underlying
* stream. (The value wraps around if it exceeds UINT_MAX.) */
unsigned int
rpc->backlog += length;
if (rpc->output_count >= 50) {
- VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of"
+ static struct vlog_rate_limit bl_rl = VLOG_RATE_LIMIT_INIT(5, 5);
+ bool disconnect = false;
+
+ VLOG_INFO_RL(&bl_rl, "excessive sending backlog, jsonrpc: %s, num of"
" msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name,
rpc->output_count, rpc->backlog);
+ if (rpc->max_output && rpc->output_count > rpc->max_output) {
+ disconnect = true;
+ VLOG_WARN("sending backlog exceeded maximum number of messages (%"
+ PRIuSIZE" > %"PRIuSIZE"), disconnecting, jsonrpc: %s.",
+ rpc->output_count, rpc->max_output, rpc->name);
+ } else if (rpc->max_backlog && rpc->backlog > rpc->max_backlog) {
+ disconnect = true;
+ VLOG_WARN("sending backlog exceeded maximum size (%"PRIuSIZE" > %"
+ PRIuSIZE" bytes), disconnecting, jsonrpc: %s.",
+ rpc->backlog, rpc->max_backlog, rpc->name);
+ }
+ if (disconnect) {
+ jsonrpc_error(rpc, E2BIG);
+ }
}
if (rpc->backlog == length) {
int last_error;
unsigned int seqno;
uint8_t dscp;
+
+ /* Limits for jsonrpc. */
+ size_t max_n_msgs;
+ size_t max_backlog_bytes;
};
static void
s->dscp = 0;
s->last_error = 0;
+ jsonrpc_session_set_backlog_threshold(s, 0, 0);
+
const char *name = reconnect_get_name(s->reconnect);
if (!pstream_verify_name(name)) {
reconnect_set_passive(s->reconnect, true, time_msec());
s->pstream = NULL;
s->seqno = 1;
+ jsonrpc_session_set_backlog_threshold(s, 0, 0);
return s;
}
}
reconnect_connected(s->reconnect, time_msec());
s->rpc = jsonrpc_open(stream);
+ jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs,
+ s->max_backlog_bytes);
s->seqno++;
} else if (error != EAGAIN) {
reconnect_listen_error(s->reconnect, time_msec(), error);
if (!error) {
reconnect_connected(s->reconnect, time_msec());
s->rpc = jsonrpc_open(s->stream);
+ jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs,
+ s->max_backlog_bytes);
s->stream = NULL;
s->seqno++;
} else if (error != EAGAIN) {
jsonrpc_session_force_reconnect(s);
}
}
+
+/* Sets thresholds for send backlog. If send backlog contains more than
+ * 'max_n_msgs' messages or is larger than 'max_backlog_bytes' bytes,
+ * connection will be closed (then reconnected, if that feature is enabled). */
+void
+jsonrpc_session_set_backlog_threshold(struct jsonrpc_session *s,
+ size_t max_n_msgs,
+ size_t max_backlog_bytes)
+{
+ s->max_n_msgs = max_n_msgs;
+ s->max_backlog_bytes = max_backlog_bytes;
+ if (s->rpc) {
+ jsonrpc_set_backlog_threshold(s->rpc, max_n_msgs, max_backlog_bytes);
+ }
+}