int sock;
struct zserv *client;
size_t already;
+ struct stream_fifo *cache = stream_fifo_new();
+ int p2p_orig = atomic_load_explicit(&zebrad.packets_to_process,
+ memory_order_relaxed);
+ uint32_t p2p;
+ struct zmsghdr hdr;
+
#if defined(HANDLE_ZAPI_FUZZING)
int p2p = 1;
#else
- int p2p = zebrad.packets_to_process;
+ int p2p = p2p_orig;
#endif
sock = THREAD_FD(thread);
client = THREAD_ARG(thread);
while (p2p--) {
- struct zmsghdr hdr;
ssize_t nb;
bool hdrvalid;
char errmsg[256];
"Message has corrupt header\n%s: socket %d message length %u exceeds buffer size %lu",
__func__, sock, hdr.length,
(unsigned long)STREAM_SIZE(client->ibuf_work));
+ zserv_log_message(errmsg, client->ibuf_work, &hdr);
goto zread_fail;
}
if (IS_ZEBRA_DEBUG_PACKET && IS_ZEBRA_DEBUG_RECV)
zserv_log_message(NULL, client->ibuf_work, &hdr);
+ stream_set_getp(client->ibuf_work, 0);
+ struct stream *msg = stream_dup(client->ibuf_work);
+
+ stream_fifo_push(cache, msg);
+ stream_reset(client->ibuf_work);
+ }
+
+ if (p2p < p2p_orig) {
+ /* update session statistics */
atomic_store_explicit(&client->last_read_time, monotime(NULL),
memory_order_relaxed);
atomic_store_explicit(&client->last_read_cmd, hdr.command,
memory_order_relaxed);
- stream_set_getp(client->ibuf_work, 0);
- struct stream *msg = stream_dup(client->ibuf_work);
-
+ /* publish read packets on client's input queue */
pthread_mutex_lock(&client->ibuf_mtx);
{
- stream_fifo_push(client->ibuf_fifo, msg);
+ while (cache->head)
+ stream_fifo_push(client->ibuf_fifo,
+ stream_fifo_pop(cache));
}
pthread_mutex_unlock(&client->ibuf_mtx);
-
- stream_reset(client->ibuf_work);
}
if (IS_ZEBRA_DEBUG_PACKET)
- zlog_debug("Read %d packets",
- zebrad.packets_to_process - p2p);
+ zlog_debug("Read %d packets", p2p_orig - p2p);
/* Schedule job to process those packets */
zserv_event(client, ZSERV_PROCESS_MESSAGES);
/* Reschedule ourselves */
zserv_client_event(client, ZSERV_CLIENT_READ);
+ stream_fifo_free(cache);
+
return 0;
zread_fail:
+ stream_fifo_free(cache);
zserv_client_close(client);
return -1;
}