]> git.proxmox.com Git - mirror_frr.git/blobdiff - lib/stream.c
zebra, lib: fix the ZEBRA_INTERFACE_VRF_UPDATE zapi message
[mirror_frr.git] / lib / stream.c
index 3c08d4454bcb1c54d3fe50d391df1e5fc90a92ef..6c187bd359a842b558b190c50ed031db3a9c7bfa 100644 (file)
 
 #include <zebra.h>
 #include <stddef.h>
+#include <pthread.h>
 
 #include "stream.h"
 #include "memory.h"
 #include "network.h"
 #include "prefix.h"
 #include "log.h"
+#include "lib_errors.h"
 
 DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream")
-DEFINE_MTYPE_STATIC(LIB, STREAM_DATA, "Stream data")
 DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO")
 
 /* Tests whether a position is valid */
@@ -53,7 +54,8 @@ DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO")
  * using stream_put..._at() functions.
  */
 #define STREAM_WARN_OFFSETS(S)                                                 \
-       zlog_warn("&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n",   \
+       flog_warn(EC_LIB_STREAM,                                               \
+                 "&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n",   \
                  (void *)(S), (unsigned long)(S)->size,                       \
                  (unsigned long)(S)->getp, (unsigned long)(S)->endp)
 
@@ -67,16 +69,16 @@ DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO")
 
 #define STREAM_BOUND_WARN(S, WHAT)                                             \
        do {                                                                   \
-               zlog_warn("%s: Attempt to %s out of bounds", __func__,         \
-                         (WHAT));                                             \
+               flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds",    \
+                         __func__, (WHAT));                                   \
                STREAM_WARN_OFFSETS(S);                                        \
                assert(0);                                                     \
        } while (0)
 
 #define STREAM_BOUND_WARN2(S, WHAT)                                            \
        do {                                                                   \
-               zlog_warn("%s: Attempt to %s out of bounds", __func__,         \
-                         (WHAT));                                             \
+               flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds",    \
+                         __func__, (WHAT));                                   \
                STREAM_WARN_OFFSETS(S);                                        \
        } while (0)
 
@@ -84,7 +86,8 @@ DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO")
 #define CHECK_SIZE(S, Z)                                                       \
        do {                                                                   \
                if (((S)->endp + (Z)) > (S)->size) {                           \
-                       zlog_warn(                                             \
+                       flog_warn(                                             \
+                               EC_LIB_STREAM,                                 \
                                "CHECK_SIZE: truncating requested size %lu\n", \
                                (unsigned long)(Z));                           \
                        STREAM_WARN_OFFSETS(S);                                \
@@ -99,16 +102,10 @@ struct stream *stream_new(size_t size)
 
        assert(size > 0);
 
-       s = XCALLOC(MTYPE_STREAM, sizeof(struct stream));
-
-       if (s == NULL)
-               return s;
-
-       if ((s->data = XMALLOC(MTYPE_STREAM_DATA, size)) == NULL) {
-               XFREE(MTYPE_STREAM, s);
-               return NULL;
-       }
+       s = XMALLOC(MTYPE_STREAM, sizeof(struct stream) + size);
 
+       s->getp = s->endp = 0;
+       s->next = NULL;
        s->size = size;
        return s;
 }
@@ -119,7 +116,6 @@ void stream_free(struct stream *s)
        if (!s)
                return;
 
-       XFREE(MTYPE_STREAM_DATA, s->data);
        XFREE(MTYPE_STREAM, s);
 }
 
@@ -169,27 +165,33 @@ struct stream *stream_dupcat(struct stream *s1, struct stream *s2,
        return new;
 }
 
-size_t stream_resize(struct stream *s, size_t newsize)
+size_t stream_resize_inplace(struct stream **sptr, size_t newsize)
 {
-       u_char *newdata;
-       STREAM_VERIFY_SANE(s);
+       struct stream *orig = *sptr;
 
-       newdata = XREALLOC(MTYPE_STREAM_DATA, s->data, newsize);
+       STREAM_VERIFY_SANE(orig);
 
-       if (newdata == NULL)
-               return s->size;
+       orig = XREALLOC(MTYPE_STREAM, orig, sizeof(struct stream) + newsize);
 
-       s->data = newdata;
-       s->size = newsize;
+       orig->size = newsize;
 
-       if (s->endp > s->size)
-               s->endp = s->size;
-       if (s->getp > s->endp)
-               s->getp = s->endp;
+       if (orig->endp > orig->size)
+               orig->endp = orig->size;
+       if (orig->getp > orig->endp)
+               orig->getp = orig->endp;
 
-       STREAM_VERIFY_SANE(s);
+       STREAM_VERIFY_SANE(orig);
 
-       return s->size;
+       *sptr = orig;
+       return orig->size;
+}
+
+size_t __attribute__((deprecated))stream_resize_orig(struct stream *s,
+                                                    size_t newsize)
+{
+       assert("stream_resize: Switch code to use stream_resize_inplace" == NULL);
+
+       return stream_resize_inplace(&s, newsize);
 }
 
 size_t stream_get_getp(struct stream *s)
@@ -270,7 +272,7 @@ void stream_forward_endp(struct stream *s, size_t size)
 }
 
 /* Copy from stream to destination. */
-inline bool stream_get2(void *dst, struct stream *s, size_t size)
+bool stream_get2(void *dst, struct stream *s, size_t size)
 {
        STREAM_VERIFY_SANE(s);
 
@@ -299,11 +301,11 @@ void stream_get(void *dst, struct stream *s, size_t size)
 }
 
 /* Get next character from the stream. */
-inline bool stream_getc2(struct stream *s, u_char *byte)
+bool stream_getc2(struct stream *s, uint8_t *byte)
 {
        STREAM_VERIFY_SANE(s);
 
-       if (STREAM_READABLE(s) < sizeof(u_char)) {
+       if (STREAM_READABLE(s) < sizeof(uint8_t)) {
                STREAM_BOUND_WARN2(s, "get char");
                return false;
        }
@@ -312,13 +314,13 @@ inline bool stream_getc2(struct stream *s, u_char *byte)
        return true;
 }
 
-u_char stream_getc(struct stream *s)
+uint8_t stream_getc(struct stream *s)
 {
-       u_char c;
+       uint8_t c;
 
        STREAM_VERIFY_SANE(s);
 
-       if (STREAM_READABLE(s) < sizeof(u_char)) {
+       if (STREAM_READABLE(s) < sizeof(uint8_t)) {
                STREAM_BOUND_WARN(s, "get char");
                return 0;
        }
@@ -328,13 +330,13 @@ u_char stream_getc(struct stream *s)
 }
 
 /* Get next character from the stream. */
-u_char stream_getc_from(struct stream *s, size_t from)
+uint8_t stream_getc_from(struct stream *s, size_t from)
 {
-       u_char c;
+       uint8_t c;
 
        STREAM_VERIFY_SANE(s);
 
-       if (!GETP_VALID(s, from + sizeof(u_char))) {
+       if (!GETP_VALID(s, from + sizeof(uint8_t))) {
                STREAM_BOUND_WARN(s, "get char");
                return 0;
        }
@@ -344,7 +346,7 @@ u_char stream_getc_from(struct stream *s, size_t from)
        return c;
 }
 
-inline bool stream_getw2(struct stream *s, uint16_t *word)
+bool stream_getw2(struct stream *s, uint16_t *word)
 {
        STREAM_VERIFY_SANE(s);
 
@@ -360,13 +362,13 @@ inline bool stream_getw2(struct stream *s, uint16_t *word)
 }
 
 /* Get next word from the stream. */
-u_int16_t stream_getw(struct stream *s)
+uint16_t stream_getw(struct stream *s)
 {
-       u_int16_t w;
+       uint16_t w;
 
        STREAM_VERIFY_SANE(s);
 
-       if (STREAM_READABLE(s) < sizeof(u_int16_t)) {
+       if (STREAM_READABLE(s) < sizeof(uint16_t)) {
                STREAM_BOUND_WARN(s, "get ");
                return 0;
        }
@@ -378,13 +380,13 @@ u_int16_t stream_getw(struct stream *s)
 }
 
 /* Get next word from the stream. */
-u_int16_t stream_getw_from(struct stream *s, size_t from)
+uint16_t stream_getw_from(struct stream *s, size_t from)
 {
-       u_int16_t w;
+       uint16_t w;
 
        STREAM_VERIFY_SANE(s);
 
-       if (!GETP_VALID(s, from + sizeof(u_int16_t))) {
+       if (!GETP_VALID(s, from + sizeof(uint16_t))) {
                STREAM_BOUND_WARN(s, "get ");
                return 0;
        }
@@ -396,9 +398,9 @@ u_int16_t stream_getw_from(struct stream *s, size_t from)
 }
 
 /* Get next 3-byte from the stream. */
-u_int32_t stream_get3_from(struct stream *s, size_t from)
+uint32_t stream_get3_from(struct stream *s, size_t from)
 {
-       u_int32_t l;
+       uint32_t l;
 
        STREAM_VERIFY_SANE(s);
 
@@ -414,9 +416,9 @@ u_int32_t stream_get3_from(struct stream *s, size_t from)
        return l;
 }
 
-u_int32_t stream_get3(struct stream *s)
+uint32_t stream_get3(struct stream *s)
 {
-       u_int32_t l;
+       uint32_t l;
 
        STREAM_VERIFY_SANE(s);
 
@@ -433,13 +435,13 @@ u_int32_t stream_get3(struct stream *s)
 }
 
 /* Get next long word from the stream. */
-u_int32_t stream_getl_from(struct stream *s, size_t from)
+uint32_t stream_getl_from(struct stream *s, size_t from)
 {
-       u_int32_t l;
+       uint32_t l;
 
        STREAM_VERIFY_SANE(s);
 
-       if (!GETP_VALID(s, from + sizeof(u_int32_t))) {
+       if (!GETP_VALID(s, from + sizeof(uint32_t))) {
                STREAM_BOUND_WARN(s, "get long");
                return 0;
        }
@@ -465,7 +467,7 @@ void stream_get_from(void *dst, struct stream *s, size_t from, size_t size)
        memcpy(dst, s->data + from, size);
 }
 
-inline bool stream_getl2(struct stream *s, uint32_t *l)
+bool stream_getl2(struct stream *s, uint32_t *l)
 {
        STREAM_VERIFY_SANE(s);
 
@@ -482,13 +484,13 @@ inline bool stream_getl2(struct stream *s, uint32_t *l)
        return true;
 }
 
-u_int32_t stream_getl(struct stream *s)
+uint32_t stream_getl(struct stream *s)
 {
-       u_int32_t l;
+       uint32_t l;
 
        STREAM_VERIFY_SANE(s);
 
-       if (STREAM_READABLE(s) < sizeof(u_int32_t)) {
+       if (STREAM_READABLE(s) < sizeof(uint32_t)) {
                STREAM_BOUND_WARN(s, "get long");
                return 0;
        }
@@ -549,19 +551,19 @@ uint64_t stream_getq(struct stream *s)
 }
 
 /* Get next long word from the stream. */
-u_int32_t stream_get_ipv4(struct stream *s)
+uint32_t stream_get_ipv4(struct stream *s)
 {
-       u_int32_t l;
+       uint32_t l;
 
        STREAM_VERIFY_SANE(s);
 
-       if (STREAM_READABLE(s) < sizeof(u_int32_t)) {
+       if (STREAM_READABLE(s) < sizeof(uint32_t)) {
                STREAM_BOUND_WARN(s, "get ipv4");
                return 0;
        }
 
-       memcpy(&l, s->data + s->getp, sizeof(u_int32_t));
-       s->getp += sizeof(u_int32_t);
+       memcpy(&l, s->data + s->getp, sizeof(uint32_t));
+       s->getp += sizeof(uint32_t);
 
        return l;
 }
@@ -615,37 +617,37 @@ void stream_put(struct stream *s, const void *src, size_t size)
 }
 
 /* Put character to the stream. */
-int stream_putc(struct stream *s, u_char c)
+int stream_putc(struct stream *s, uint8_t c)
 {
        STREAM_VERIFY_SANE(s);
 
-       if (STREAM_WRITEABLE(s) < sizeof(u_char)) {
+       if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) {
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
 
        s->data[s->endp++] = c;
-       return sizeof(u_char);
+       return sizeof(uint8_t);
 }
 
 /* Put word to the stream. */
-int stream_putw(struct stream *s, u_int16_t w)
+int stream_putw(struct stream *s, uint16_t w)
 {
        STREAM_VERIFY_SANE(s);
 
-       if (STREAM_WRITEABLE(s) < sizeof(u_int16_t)) {
+       if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) {
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
 
-       s->data[s->endp++] = (u_char)(w >> 8);
-       s->data[s->endp++] = (u_char)w;
+       s->data[s->endp++] = (uint8_t)(w >> 8);
+       s->data[s->endp++] = (uint8_t)w;
 
        return 2;
 }
 
 /* Put long word to the stream. */
-int stream_put3(struct stream *s, u_int32_t l)
+int stream_put3(struct stream *s, uint32_t l)
 {
        STREAM_VERIFY_SANE(s);
 
@@ -654,27 +656,27 @@ int stream_put3(struct stream *s, u_int32_t l)
                return 0;
        }
 
-       s->data[s->endp++] = (u_char)(l >> 16);
-       s->data[s->endp++] = (u_char)(l >> 8);
-       s->data[s->endp++] = (u_char)l;
+       s->data[s->endp++] = (uint8_t)(l >> 16);
+       s->data[s->endp++] = (uint8_t)(l >> 8);
+       s->data[s->endp++] = (uint8_t)l;
 
        return 3;
 }
 
 /* Put long word to the stream. */
-int stream_putl(struct stream *s, u_int32_t l)
+int stream_putl(struct stream *s, uint32_t l)
 {
        STREAM_VERIFY_SANE(s);
 
-       if (STREAM_WRITEABLE(s) < sizeof(u_int32_t)) {
+       if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
 
-       s->data[s->endp++] = (u_char)(l >> 24);
-       s->data[s->endp++] = (u_char)(l >> 16);
-       s->data[s->endp++] = (u_char)(l >> 8);
-       s->data[s->endp++] = (u_char)l;
+       s->data[s->endp++] = (uint8_t)(l >> 24);
+       s->data[s->endp++] = (uint8_t)(l >> 16);
+       s->data[s->endp++] = (uint8_t)(l >> 8);
+       s->data[s->endp++] = (uint8_t)l;
 
        return 4;
 }
@@ -689,14 +691,14 @@ int stream_putq(struct stream *s, uint64_t q)
                return 0;
        }
 
-       s->data[s->endp++] = (u_char)(q >> 56);
-       s->data[s->endp++] = (u_char)(q >> 48);
-       s->data[s->endp++] = (u_char)(q >> 40);
-       s->data[s->endp++] = (u_char)(q >> 32);
-       s->data[s->endp++] = (u_char)(q >> 24);
-       s->data[s->endp++] = (u_char)(q >> 16);
-       s->data[s->endp++] = (u_char)(q >> 8);
-       s->data[s->endp++] = (u_char)q;
+       s->data[s->endp++] = (uint8_t)(q >> 56);
+       s->data[s->endp++] = (uint8_t)(q >> 48);
+       s->data[s->endp++] = (uint8_t)(q >> 40);
+       s->data[s->endp++] = (uint8_t)(q >> 32);
+       s->data[s->endp++] = (uint8_t)(q >> 24);
+       s->data[s->endp++] = (uint8_t)(q >> 16);
+       s->data[s->endp++] = (uint8_t)(q >> 8);
+       s->data[s->endp++] = (uint8_t)q;
 
        return 8;
 }
@@ -721,11 +723,11 @@ int stream_putd(struct stream *s, double d)
        return stream_putq(s, u.o);
 }
 
-int stream_putc_at(struct stream *s, size_t putp, u_char c)
+int stream_putc_at(struct stream *s, size_t putp, uint8_t c)
 {
        STREAM_VERIFY_SANE(s);
 
-       if (!PUT_AT_VALID(s, putp + sizeof(u_char))) {
+       if (!PUT_AT_VALID(s, putp + sizeof(uint8_t))) {
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
@@ -735,22 +737,22 @@ int stream_putc_at(struct stream *s, size_t putp, u_char c)
        return 1;
 }
 
-int stream_putw_at(struct stream *s, size_t putp, u_int16_t w)
+int stream_putw_at(struct stream *s, size_t putp, uint16_t w)
 {
        STREAM_VERIFY_SANE(s);
 
-       if (!PUT_AT_VALID(s, putp + sizeof(u_int16_t))) {
+       if (!PUT_AT_VALID(s, putp + sizeof(uint16_t))) {
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
 
-       s->data[putp] = (u_char)(w >> 8);
-       s->data[putp + 1] = (u_char)w;
+       s->data[putp] = (uint8_t)(w >> 8);
+       s->data[putp + 1] = (uint8_t)w;
 
        return 2;
 }
 
-int stream_put3_at(struct stream *s, size_t putp, u_int32_t l)
+int stream_put3_at(struct stream *s, size_t putp, uint32_t l)
 {
        STREAM_VERIFY_SANE(s);
 
@@ -758,25 +760,25 @@ int stream_put3_at(struct stream *s, size_t putp, u_int32_t l)
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
-       s->data[putp] = (u_char)(l >> 16);
-       s->data[putp + 1] = (u_char)(l >> 8);
-       s->data[putp + 2] = (u_char)l;
+       s->data[putp] = (uint8_t)(l >> 16);
+       s->data[putp + 1] = (uint8_t)(l >> 8);
+       s->data[putp + 2] = (uint8_t)l;
 
        return 3;
 }
 
-int stream_putl_at(struct stream *s, size_t putp, u_int32_t l)
+int stream_putl_at(struct stream *s, size_t putp, uint32_t l)
 {
        STREAM_VERIFY_SANE(s);
 
-       if (!PUT_AT_VALID(s, putp + sizeof(u_int32_t))) {
+       if (!PUT_AT_VALID(s, putp + sizeof(uint32_t))) {
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
-       s->data[putp] = (u_char)(l >> 24);
-       s->data[putp + 1] = (u_char)(l >> 16);
-       s->data[putp + 2] = (u_char)(l >> 8);
-       s->data[putp + 3] = (u_char)l;
+       s->data[putp] = (uint8_t)(l >> 24);
+       s->data[putp + 1] = (uint8_t)(l >> 16);
+       s->data[putp + 2] = (uint8_t)(l >> 8);
+       s->data[putp + 3] = (uint8_t)l;
 
        return 4;
 }
@@ -789,31 +791,31 @@ int stream_putq_at(struct stream *s, size_t putp, uint64_t q)
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
-       s->data[putp] = (u_char)(q >> 56);
-       s->data[putp + 1] = (u_char)(q >> 48);
-       s->data[putp + 2] = (u_char)(q >> 40);
-       s->data[putp + 3] = (u_char)(q >> 32);
-       s->data[putp + 4] = (u_char)(q >> 24);
-       s->data[putp + 5] = (u_char)(q >> 16);
-       s->data[putp + 6] = (u_char)(q >> 8);
-       s->data[putp + 7] = (u_char)q;
+       s->data[putp] = (uint8_t)(q >> 56);
+       s->data[putp + 1] = (uint8_t)(q >> 48);
+       s->data[putp + 2] = (uint8_t)(q >> 40);
+       s->data[putp + 3] = (uint8_t)(q >> 32);
+       s->data[putp + 4] = (uint8_t)(q >> 24);
+       s->data[putp + 5] = (uint8_t)(q >> 16);
+       s->data[putp + 6] = (uint8_t)(q >> 8);
+       s->data[putp + 7] = (uint8_t)q;
 
        return 8;
 }
 
 /* Put long word to the stream. */
-int stream_put_ipv4(struct stream *s, u_int32_t l)
+int stream_put_ipv4(struct stream *s, uint32_t l)
 {
        STREAM_VERIFY_SANE(s);
 
-       if (STREAM_WRITEABLE(s) < sizeof(u_int32_t)) {
+       if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
-       memcpy(s->data + s->endp, &l, sizeof(u_int32_t));
-       s->endp += sizeof(u_int32_t);
+       memcpy(s->data + s->endp, &l, sizeof(uint32_t));
+       s->endp += sizeof(uint32_t);
 
-       return sizeof(u_int32_t);
+       return sizeof(uint32_t);
 }
 
 /* Put long word to the stream. */
@@ -821,15 +823,15 @@ int stream_put_in_addr(struct stream *s, struct in_addr *addr)
 {
        STREAM_VERIFY_SANE(s);
 
-       if (STREAM_WRITEABLE(s) < sizeof(u_int32_t)) {
+       if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
 
-       memcpy(s->data + s->endp, addr, sizeof(u_int32_t));
-       s->endp += sizeof(u_int32_t);
+       memcpy(s->data + s->endp, addr, sizeof(uint32_t));
+       s->endp += sizeof(uint32_t);
 
-       return sizeof(u_int32_t);
+       return sizeof(uint32_t);
 }
 
 /* Put in_addr at location in the stream. */
@@ -862,7 +864,7 @@ int stream_put_in6_addr_at(struct stream *s, size_t putp, struct in6_addr *addr)
 
 /* Put prefix by nlri type format. */
 int stream_put_prefix_addpath(struct stream *s, struct prefix *p,
-                             int addpath_encode, u_int32_t addpath_tx_id)
+                             int addpath_encode, uint32_t addpath_tx_id)
 {
        size_t psize;
        size_t psize_with_addpath;
@@ -876,16 +878,16 @@ int stream_put_prefix_addpath(struct stream *s, struct prefix *p,
        else
                psize_with_addpath = psize;
 
-       if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(u_char))) {
+       if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) {
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
 
        if (addpath_encode) {
-               s->data[s->endp++] = (u_char)(addpath_tx_id >> 24);
-               s->data[s->endp++] = (u_char)(addpath_tx_id >> 16);
-               s->data[s->endp++] = (u_char)(addpath_tx_id >> 8);
-               s->data[s->endp++] = (u_char)addpath_tx_id;
+               s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
+               s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
+               s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
+               s->data[s->endp++] = (uint8_t)addpath_tx_id;
        }
 
        s->data[s->endp++] = p->prefixlen;
@@ -905,7 +907,7 @@ int stream_put_labeled_prefix(struct stream *s, struct prefix *p,
                              mpls_label_t *label)
 {
        size_t psize;
-       u_char *label_pnt = (u_char *)label;
+       uint8_t *label_pnt = (uint8_t *)label;
 
        STREAM_VERIFY_SANE(s);
 
@@ -966,8 +968,8 @@ ssize_t stream_read_try(struct stream *s, int fd, size_t size)
        /* Error: was it transient (return -2) or fatal (return -1)? */
        if (ERRNO_IO_RETRY(errno))
                return -2;
-       zlog_warn("%s: read failed on fd %d: %s", __func__, fd,
-                 safe_strerror(errno));
+       flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
+                safe_strerror(errno));
        return -1;
 }
 
@@ -997,8 +999,8 @@ ssize_t stream_recvfrom(struct stream *s, int fd, size_t size, int flags,
        /* Error: was it transient (return -2) or fatal (return -1)? */
        if (ERRNO_IO_RETRY(errno))
                return -2;
-       zlog_warn("%s: read failed on fd %d: %s", __func__, fd,
-                 safe_strerror(errno));
+       flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
+                safe_strerror(errno));
        return -1;
 }
 
@@ -1060,7 +1062,7 @@ size_t stream_write(struct stream *s, const void *ptr, size_t size)
  * Use stream_get_pnt_to if you must, but decoding streams properly
  * is preferred
  */
-u_char *stream_pnt(struct stream *s)
+uint8_t *stream_pnt(struct stream *s)
 {
        STREAM_VERIFY_SANE(s);
        return s->data + s->getp;
@@ -1101,20 +1103,42 @@ struct stream_fifo *stream_fifo_new(void)
        struct stream_fifo *new;
 
        new = XCALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo));
+       pthread_mutex_init(&new->mtx, NULL);
        return new;
 }
 
 /* Add new stream to fifo. */
 void stream_fifo_push(struct stream_fifo *fifo, struct stream *s)
 {
+#if defined DEV_BUILD
+       size_t max, curmax;
+#endif
+
        if (fifo->tail)
                fifo->tail->next = s;
        else
                fifo->head = s;
 
        fifo->tail = s;
-
-       fifo->count++;
+       fifo->tail->next = NULL;
+#if !defined DEV_BUILD
+       atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
+#else
+       max = atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
+       curmax = atomic_load_explicit(&fifo->max_count, memory_order_relaxed);
+       if (max > curmax)
+               atomic_store_explicit(&fifo->max_count, max,
+                                     memory_order_relaxed);
+#endif
+}
+
+void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s)
+{
+       pthread_mutex_lock(&fifo->mtx);
+       {
+               stream_fifo_push(fifo, s);
+       }
+       pthread_mutex_unlock(&fifo->mtx);
 }
 
 /* Delete first stream from fifo. */
@@ -1130,18 +1154,47 @@ struct stream *stream_fifo_pop(struct stream_fifo *fifo)
                if (fifo->head == NULL)
                        fifo->tail = NULL;
 
-               fifo->count--;
+               atomic_fetch_sub_explicit(&fifo->count, 1,
+                                         memory_order_release);
+
+               /* ensure stream is scrubbed of references to this fifo */
+               s->next = NULL;
        }
 
        return s;
 }
 
-/* Return first fifo entry. */
+struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo)
+{
+       struct stream *ret;
+
+       pthread_mutex_lock(&fifo->mtx);
+       {
+               ret = stream_fifo_pop(fifo);
+       }
+       pthread_mutex_unlock(&fifo->mtx);
+
+       return ret;
+}
+
 struct stream *stream_fifo_head(struct stream_fifo *fifo)
 {
        return fifo->head;
 }
 
+struct stream *stream_fifo_head_safe(struct stream_fifo *fifo)
+{
+       struct stream *ret;
+
+       pthread_mutex_lock(&fifo->mtx);
+       {
+               ret = stream_fifo_head(fifo);
+       }
+       pthread_mutex_unlock(&fifo->mtx);
+
+       return ret;
+}
+
 void stream_fifo_clean(struct stream_fifo *fifo)
 {
        struct stream *s;
@@ -1152,11 +1205,26 @@ void stream_fifo_clean(struct stream_fifo *fifo)
                stream_free(s);
        }
        fifo->head = fifo->tail = NULL;
-       fifo->count = 0;
+       atomic_store_explicit(&fifo->count, 0, memory_order_release);
+}
+
+void stream_fifo_clean_safe(struct stream_fifo *fifo)
+{
+       pthread_mutex_lock(&fifo->mtx);
+       {
+               stream_fifo_clean(fifo);
+       }
+       pthread_mutex_unlock(&fifo->mtx);
+}
+
+size_t stream_fifo_count_safe(struct stream_fifo *fifo)
+{
+       return atomic_load_explicit(&fifo->count, memory_order_acquire);
 }
 
 void stream_fifo_free(struct stream_fifo *fifo)
 {
        stream_fifo_clean(fifo);
+       pthread_mutex_destroy(&fifo->mtx);
        XFREE(MTYPE_STREAM_FIFO, fifo);
 }