}
/*
- * Write a single packet.
+ * Write all pending messages to client socket.
+ *
+ * Any messages queued with zserv_send_message() before this function executes
+ * will be pushed to the output buffer. The buffer will then take care of
+ * writing chunks until it is empty.
+ *
+ * This function does not reschedule itself. As far as it is concerned it
+ * always writes all data. This saves us a mutex hit in thread_add_event at the
+ * theoretical expense of buffer memory usage. In practice this should never be
+ * an issue.
*/
static int zserv_write(struct thread *thread)
{
struct zserv *client = THREAD_ARG(thread);
struct stream *msg;
uint32_t wcmd;
- int writerv;
+ int writerv = BUFFER_EMPTY;
+ struct stream_fifo *cache = stream_fifo_new();
if (client->is_synchronous)
return 0;
pthread_mutex_lock(&client->obuf_mtx);
{
- msg = stream_fifo_pop(client->obuf_fifo);
+ while (client->obuf_fifo->head)
+ stream_fifo_push(cache,
+ stream_fifo_pop(client->obuf_fifo));
}
pthread_mutex_unlock(&client->obuf_mtx);
- stream_set_getp(msg, 0);
+ while (cache->head) {
+ msg = stream_fifo_pop(cache);
+ stream_set_getp(msg, 0);
- wcmd = stream_getw_from(msg, 6);
- atomic_store_explicit(&client->last_write_cmd, wcmd,
- memory_order_relaxed);
+ wcmd = stream_getw_from(msg, 6);
+ writerv = buffer_write(client->wb, client->sock,
+ STREAM_DATA(msg), stream_get_endp(msg));
- writerv = buffer_write(client->wb, client->sock, STREAM_DATA(msg),
- stream_get_endp(msg));
+ stream_free(msg);
+ }
- stream_free(msg);
+ stream_fifo_free(cache);
switch (writerv) {
case BUFFER_ERROR:
zlog_warn("%s: closing connection to %s", __func__,
zebra_route_string(client->proto));
zserv_client_close(client);
- return -1;
+ break;
case BUFFER_PENDING:
zserv_client_event(client, ZSERV_CLIENT_FLUSH_DATA);
break;
break;
}
- pthread_mutex_lock(&client->obuf_mtx);
- {
- if (client->obuf_fifo->count)
- zserv_client_event(client, ZSERV_CLIENT_WRITE);
- }
- pthread_mutex_unlock(&client->obuf_mtx);
+ atomic_store_explicit(&client->last_write_cmd, wcmd,
+ memory_order_relaxed);
atomic_store_explicit(&client->last_write_time,
- (uint32_t) monotime(NULL), memory_order_relaxed);
+ (uint32_t)monotime(NULL), memory_order_relaxed);
return 0;
}
struct zserv *client;
size_t already;
struct stream_fifo *cache = stream_fifo_new();
- int p2p_orig = atomic_load_explicit(&zebrad.packets_to_process,
- memory_order_relaxed);
+ uint32_t p2p_orig = atomic_load_explicit(&zebrad.packets_to_process,
+ memory_order_relaxed);
uint32_t p2p;
struct zmsghdr hdr;
#if defined(HANDLE_ZAPI_FUZZING)
- int p2p = 1;
+ p2p = 1;
#else
- int p2p = p2p_orig;
+ p2p = p2p_orig;
#endif
sock = THREAD_FD(thread);
client = THREAD_ARG(thread);