From 370d8dad7992460ecbf312d9dd03ec2692aeb0bf Mon Sep 17 00:00:00 2001 From: Quentin Young Date: Tue, 24 Apr 2018 14:51:26 -0400 Subject: [PATCH] zebra: optimize zserv_write Dequeue all pending messages when writing and push them all into the write buffer. This removes the necessity to self-schedule, avoiding a mutex lock, and should also maximize throughput by not writing 1 packet per job. Signed-off-by: Quentin Young --- zebra/zserv.c | 54 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 22 deletions(-) diff --git a/zebra/zserv.c b/zebra/zserv.c index b666b2e0a..fa8549d4e 100644 --- a/zebra/zserv.c +++ b/zebra/zserv.c @@ -199,34 +199,48 @@ static int zserv_flush_data(struct thread *thread) } /* - * Write a single packet. + * Write all pending messages to client socket. + * + * Any messages queued with zserv_send_message() before this function executes + * will be pushed to the output buffer. The buffer will then take care of + * writing chunks until it is empty. + * + * This function does not reschedule itself. As far as it is concerned it + * always writes all data. This saves us a mutex hit in thread_add_event at the + * theoretical expense of buffer memory usage. In practice this should never be + * an issue. */ static int zserv_write(struct thread *thread) { struct zserv *client = THREAD_ARG(thread); struct stream *msg; uint32_t wcmd; - int writerv; + int writerv = BUFFER_EMPTY; + struct stream_fifo *cache = stream_fifo_new(); if (client->is_synchronous) return 0; pthread_mutex_lock(&client->obuf_mtx); { - msg = stream_fifo_pop(client->obuf_fifo); + while (client->obuf_fifo->head) + stream_fifo_push(cache, + stream_fifo_pop(client->obuf_fifo)); } pthread_mutex_unlock(&client->obuf_mtx); - stream_set_getp(msg, 0); + while (cache->head) { + msg = stream_fifo_pop(cache); + stream_set_getp(msg, 0); - wcmd = stream_getw_from(msg, 6); - atomic_store_explicit(&client->last_write_cmd, wcmd, - memory_order_relaxed); + wcmd = stream_getw_from(msg, 6); + writerv = buffer_write(client->wb, client->sock, + STREAM_DATA(msg), stream_get_endp(msg)); - writerv = buffer_write(client->wb, client->sock, STREAM_DATA(msg), - stream_get_endp(msg)); + stream_free(msg); + } - stream_free(msg); + stream_fifo_free(cache); switch (writerv) { case BUFFER_ERROR: @@ -236,7 +250,7 @@ static int zserv_write(struct thread *thread) zlog_warn("%s: closing connection to %s", __func__, zebra_route_string(client->proto)); zserv_client_close(client); - return -1; + break; case BUFFER_PENDING: zserv_client_event(client, ZSERV_CLIENT_FLUSH_DATA); break; @@ -244,15 +258,11 @@ static int zserv_write(struct thread *thread) break; } - pthread_mutex_lock(&client->obuf_mtx); - { - if (client->obuf_fifo->count) - zserv_client_event(client, ZSERV_CLIENT_WRITE); - } - pthread_mutex_unlock(&client->obuf_mtx); + atomic_store_explicit(&client->last_write_cmd, wcmd, + memory_order_relaxed); atomic_store_explicit(&client->last_write_time, - (uint32_t) monotime(NULL), memory_order_relaxed); + (uint32_t)monotime(NULL), memory_order_relaxed); return 0; } @@ -305,15 +315,15 @@ static int zserv_read(struct thread *thread) struct zserv *client; size_t already; struct stream_fifo *cache = stream_fifo_new(); - int p2p_orig = atomic_load_explicit(&zebrad.packets_to_process, - memory_order_relaxed); + uint32_t p2p_orig = atomic_load_explicit(&zebrad.packets_to_process, + memory_order_relaxed); uint32_t p2p; struct zmsghdr hdr; #if defined(HANDLE_ZAPI_FUZZING) - int p2p = 1; + p2p = 1; #else - int p2p = p2p_orig; + p2p = p2p_orig; #endif sock = THREAD_FD(thread); client = THREAD_ARG(thread); -- 2.39.2