]> git.proxmox.com Git - mirror_frr.git/blobdiff - lib/stream.c
doc, lib, zebra: Remove deprecated encode and decode functionality
[mirror_frr.git] / lib / stream.c
index f88689f677c4842f6226c3657c8eb170bb63a21e..55e7f6435826118df2c8735f12a3832ea7912191 100644 (file)
 
 #include <zebra.h>
 #include <stddef.h>
+#include <pthread.h>
 
 #include "stream.h"
 #include "memory.h"
 #include "network.h"
 #include "prefix.h"
 #include "log.h"
+#include "lib_errors.h"
 
 DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream")
-DEFINE_MTYPE_STATIC(LIB, STREAM_DATA, "Stream data")
 DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO")
 
 /* Tests whether a position is valid */
@@ -73,6 +74,13 @@ DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO")
                assert(0);                                                     \
        } while (0)
 
+#define STREAM_BOUND_WARN2(S, WHAT)                                            \
+       do {                                                                   \
+               zlog_warn("%s: Attempt to %s out of bounds", __func__,         \
+                         (WHAT));                                             \
+               STREAM_WARN_OFFSETS(S);                                        \
+       } while (0)
+
 /* XXX: Deprecated macro: do not use */
 #define CHECK_SIZE(S, Z)                                                       \
        do {                                                                   \
@@ -92,16 +100,10 @@ struct stream *stream_new(size_t size)
 
        assert(size > 0);
 
-       s = XCALLOC(MTYPE_STREAM, sizeof(struct stream));
-
-       if (s == NULL)
-               return s;
-
-       if ((s->data = XMALLOC(MTYPE_STREAM_DATA, size)) == NULL) {
-               XFREE(MTYPE_STREAM, s);
-               return NULL;
-       }
+       s = XMALLOC(MTYPE_STREAM, sizeof(struct stream) + size);
 
+       s->getp = s->endp = 0;
+       s->next = NULL;
        s->size = size;
        return s;
 }
@@ -112,7 +114,6 @@ void stream_free(struct stream *s)
        if (!s)
                return;
 
-       XFREE(MTYPE_STREAM_DATA, s->data);
        XFREE(MTYPE_STREAM, s);
 }
 
@@ -162,27 +163,33 @@ struct stream *stream_dupcat(struct stream *s1, struct stream *s2,
        return new;
 }
 
-size_t stream_resize(struct stream *s, size_t newsize)
+size_t stream_resize_inplace(struct stream **sptr, size_t newsize)
 {
-       u_char *newdata;
-       STREAM_VERIFY_SANE(s);
+       struct stream *orig = *sptr;
 
-       newdata = XREALLOC(MTYPE_STREAM_DATA, s->data, newsize);
+       STREAM_VERIFY_SANE(orig);
 
-       if (newdata == NULL)
-               return s->size;
+       orig = XREALLOC(MTYPE_STREAM, orig, sizeof(struct stream) + newsize);
 
-       s->data = newdata;
-       s->size = newsize;
+       orig->size = newsize;
 
-       if (s->endp > s->size)
-               s->endp = s->size;
-       if (s->getp > s->endp)
-               s->getp = s->endp;
+       if (orig->endp > orig->size)
+               orig->endp = orig->size;
+       if (orig->getp > orig->endp)
+               orig->getp = orig->endp;
 
-       STREAM_VERIFY_SANE(s);
+       STREAM_VERIFY_SANE(orig);
 
-       return s->size;
+       *sptr = orig;
+       return orig->size;
+}
+
+size_t __attribute__((deprecated))stream_resize_orig(struct stream *s,
+                                                    size_t newsize)
+{
+       assert("stream_resize: Switch code to use stream_resize_inplace" == NULL);
+
+       return stream_resize_inplace(&s, newsize);
 }
 
 size_t stream_get_getp(struct stream *s)
@@ -263,6 +270,21 @@ void stream_forward_endp(struct stream *s, size_t size)
 }
 
 /* Copy from stream to destination. */
+inline bool stream_get2(void *dst, struct stream *s, size_t size)
+{
+       STREAM_VERIFY_SANE(s);
+
+       if (STREAM_READABLE(s) < size) {
+               STREAM_BOUND_WARN2(s, "get");
+               return false;
+       }
+
+       memcpy(dst, s->data + s->getp, size);
+       s->getp += size;
+
+       return true;
+}
+
 void stream_get(void *dst, struct stream *s, size_t size)
 {
        STREAM_VERIFY_SANE(s);
@@ -277,13 +299,26 @@ void stream_get(void *dst, struct stream *s, size_t size)
 }
 
 /* Get next character from the stream. */
-u_char stream_getc(struct stream *s)
+inline bool stream_getc2(struct stream *s, uint8_t *byte)
 {
-       u_char c;
+       STREAM_VERIFY_SANE(s);
+
+       if (STREAM_READABLE(s) < sizeof(uint8_t)) {
+               STREAM_BOUND_WARN2(s, "get char");
+               return false;
+       }
+       *byte = s->data[s->getp++];
+
+       return true;
+}
+
+uint8_t stream_getc(struct stream *s)
+{
+       uint8_t c;
 
        STREAM_VERIFY_SANE(s);
 
-       if (STREAM_READABLE(s) < sizeof(u_char)) {
+       if (STREAM_READABLE(s) < sizeof(uint8_t)) {
                STREAM_BOUND_WARN(s, "get char");
                return 0;
        }
@@ -293,13 +328,13 @@ u_char stream_getc(struct stream *s)
 }
 
 /* Get next character from the stream. */
-u_char stream_getc_from(struct stream *s, size_t from)
+uint8_t stream_getc_from(struct stream *s, size_t from)
 {
-       u_char c;
+       uint8_t c;
 
        STREAM_VERIFY_SANE(s);
 
-       if (!GETP_VALID(s, from + sizeof(u_char))) {
+       if (!GETP_VALID(s, from + sizeof(uint8_t))) {
                STREAM_BOUND_WARN(s, "get char");
                return 0;
        }
@@ -309,14 +344,29 @@ u_char stream_getc_from(struct stream *s, size_t from)
        return c;
 }
 
+inline bool stream_getw2(struct stream *s, uint16_t *word)
+{
+       STREAM_VERIFY_SANE(s);
+
+       if (STREAM_READABLE(s) < sizeof(uint16_t)) {
+               STREAM_BOUND_WARN2(s, "get ");
+               return false;
+       }
+
+       *word = s->data[s->getp++] << 8;
+       *word |= s->data[s->getp++];
+
+       return true;
+}
+
 /* Get next word from the stream. */
-u_int16_t stream_getw(struct stream *s)
+uint16_t stream_getw(struct stream *s)
 {
-       u_int16_t w;
+       uint16_t w;
 
        STREAM_VERIFY_SANE(s);
 
-       if (STREAM_READABLE(s) < sizeof(u_int16_t)) {
+       if (STREAM_READABLE(s) < sizeof(uint16_t)) {
                STREAM_BOUND_WARN(s, "get ");
                return 0;
        }
@@ -328,13 +378,13 @@ u_int16_t stream_getw(struct stream *s)
 }
 
 /* Get next word from the stream. */
-u_int16_t stream_getw_from(struct stream *s, size_t from)
+uint16_t stream_getw_from(struct stream *s, size_t from)
 {
-       u_int16_t w;
+       uint16_t w;
 
        STREAM_VERIFY_SANE(s);
 
-       if (!GETP_VALID(s, from + sizeof(u_int16_t))) {
+       if (!GETP_VALID(s, from + sizeof(uint16_t))) {
                STREAM_BOUND_WARN(s, "get ");
                return 0;
        }
@@ -346,9 +396,9 @@ u_int16_t stream_getw_from(struct stream *s, size_t from)
 }
 
 /* Get next 3-byte from the stream. */
-u_int32_t stream_get3_from(struct stream *s, size_t from)
+uint32_t stream_get3_from(struct stream *s, size_t from)
 {
-       u_int32_t l;
+       uint32_t l;
 
        STREAM_VERIFY_SANE(s);
 
@@ -364,9 +414,9 @@ u_int32_t stream_get3_from(struct stream *s, size_t from)
        return l;
 }
 
-u_int32_t stream_get3(struct stream *s)
+uint32_t stream_get3(struct stream *s)
 {
-       u_int32_t l;
+       uint32_t l;
 
        STREAM_VERIFY_SANE(s);
 
@@ -383,13 +433,13 @@ u_int32_t stream_get3(struct stream *s)
 }
 
 /* Get next long word from the stream. */
-u_int32_t stream_getl_from(struct stream *s, size_t from)
+uint32_t stream_getl_from(struct stream *s, size_t from)
 {
-       u_int32_t l;
+       uint32_t l;
 
        STREAM_VERIFY_SANE(s);
 
-       if (!GETP_VALID(s, from + sizeof(u_int32_t))) {
+       if (!GETP_VALID(s, from + sizeof(uint32_t))) {
                STREAM_BOUND_WARN(s, "get long");
                return 0;
        }
@@ -415,13 +465,30 @@ void stream_get_from(void *dst, struct stream *s, size_t from, size_t size)
        memcpy(dst, s->data + from, size);
 }
 
-u_int32_t stream_getl(struct stream *s)
+inline bool stream_getl2(struct stream *s, uint32_t *l)
 {
-       u_int32_t l;
+       STREAM_VERIFY_SANE(s);
+
+       if (STREAM_READABLE(s) < sizeof(uint32_t)) {
+               STREAM_BOUND_WARN2(s, "get long");
+               return false;
+       }
+
+       *l = (unsigned int)(s->data[s->getp++]) << 24;
+       *l |= s->data[s->getp++] << 16;
+       *l |= s->data[s->getp++] << 8;
+       *l |= s->data[s->getp++];
+
+       return true;
+}
+
+uint32_t stream_getl(struct stream *s)
+{
+       uint32_t l;
 
        STREAM_VERIFY_SANE(s);
 
-       if (STREAM_READABLE(s) < sizeof(u_int32_t)) {
+       if (STREAM_READABLE(s) < sizeof(uint32_t)) {
                STREAM_BOUND_WARN(s, "get long");
                return 0;
        }
@@ -482,19 +549,19 @@ uint64_t stream_getq(struct stream *s)
 }
 
 /* Get next long word from the stream. */
-u_int32_t stream_get_ipv4(struct stream *s)
+uint32_t stream_get_ipv4(struct stream *s)
 {
-       u_int32_t l;
+       uint32_t l;
 
        STREAM_VERIFY_SANE(s);
 
-       if (STREAM_READABLE(s) < sizeof(u_int32_t)) {
+       if (STREAM_READABLE(s) < sizeof(uint32_t)) {
                STREAM_BOUND_WARN(s, "get ipv4");
                return 0;
        }
 
-       memcpy(&l, s->data + s->getp, sizeof(u_int32_t));
-       s->getp += sizeof(u_int32_t);
+       memcpy(&l, s->data + s->getp, sizeof(uint32_t));
+       s->getp += sizeof(uint32_t);
 
        return l;
 }
@@ -548,37 +615,37 @@ void stream_put(struct stream *s, const void *src, size_t size)
 }
 
 /* Put character to the stream. */
-int stream_putc(struct stream *s, u_char c)
+int stream_putc(struct stream *s, uint8_t c)
 {
        STREAM_VERIFY_SANE(s);
 
-       if (STREAM_WRITEABLE(s) < sizeof(u_char)) {
+       if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) {
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
 
        s->data[s->endp++] = c;
-       return sizeof(u_char);
+       return sizeof(uint8_t);
 }
 
 /* Put word to the stream. */
-int stream_putw(struct stream *s, u_int16_t w)
+int stream_putw(struct stream *s, uint16_t w)
 {
        STREAM_VERIFY_SANE(s);
 
-       if (STREAM_WRITEABLE(s) < sizeof(u_int16_t)) {
+       if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) {
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
 
-       s->data[s->endp++] = (u_char)(w >> 8);
-       s->data[s->endp++] = (u_char)w;
+       s->data[s->endp++] = (uint8_t)(w >> 8);
+       s->data[s->endp++] = (uint8_t)w;
 
        return 2;
 }
 
 /* Put long word to the stream. */
-int stream_put3(struct stream *s, u_int32_t l)
+int stream_put3(struct stream *s, uint32_t l)
 {
        STREAM_VERIFY_SANE(s);
 
@@ -587,27 +654,27 @@ int stream_put3(struct stream *s, u_int32_t l)
                return 0;
        }
 
-       s->data[s->endp++] = (u_char)(l >> 16);
-       s->data[s->endp++] = (u_char)(l >> 8);
-       s->data[s->endp++] = (u_char)l;
+       s->data[s->endp++] = (uint8_t)(l >> 16);
+       s->data[s->endp++] = (uint8_t)(l >> 8);
+       s->data[s->endp++] = (uint8_t)l;
 
        return 3;
 }
 
 /* Put long word to the stream. */
-int stream_putl(struct stream *s, u_int32_t l)
+int stream_putl(struct stream *s, uint32_t l)
 {
        STREAM_VERIFY_SANE(s);
 
-       if (STREAM_WRITEABLE(s) < sizeof(u_int32_t)) {
+       if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
 
-       s->data[s->endp++] = (u_char)(l >> 24);
-       s->data[s->endp++] = (u_char)(l >> 16);
-       s->data[s->endp++] = (u_char)(l >> 8);
-       s->data[s->endp++] = (u_char)l;
+       s->data[s->endp++] = (uint8_t)(l >> 24);
+       s->data[s->endp++] = (uint8_t)(l >> 16);
+       s->data[s->endp++] = (uint8_t)(l >> 8);
+       s->data[s->endp++] = (uint8_t)l;
 
        return 4;
 }
@@ -622,14 +689,14 @@ int stream_putq(struct stream *s, uint64_t q)
                return 0;
        }
 
-       s->data[s->endp++] = (u_char)(q >> 56);
-       s->data[s->endp++] = (u_char)(q >> 48);
-       s->data[s->endp++] = (u_char)(q >> 40);
-       s->data[s->endp++] = (u_char)(q >> 32);
-       s->data[s->endp++] = (u_char)(q >> 24);
-       s->data[s->endp++] = (u_char)(q >> 16);
-       s->data[s->endp++] = (u_char)(q >> 8);
-       s->data[s->endp++] = (u_char)q;
+       s->data[s->endp++] = (uint8_t)(q >> 56);
+       s->data[s->endp++] = (uint8_t)(q >> 48);
+       s->data[s->endp++] = (uint8_t)(q >> 40);
+       s->data[s->endp++] = (uint8_t)(q >> 32);
+       s->data[s->endp++] = (uint8_t)(q >> 24);
+       s->data[s->endp++] = (uint8_t)(q >> 16);
+       s->data[s->endp++] = (uint8_t)(q >> 8);
+       s->data[s->endp++] = (uint8_t)q;
 
        return 8;
 }
@@ -654,11 +721,11 @@ int stream_putd(struct stream *s, double d)
        return stream_putq(s, u.o);
 }
 
-int stream_putc_at(struct stream *s, size_t putp, u_char c)
+int stream_putc_at(struct stream *s, size_t putp, uint8_t c)
 {
        STREAM_VERIFY_SANE(s);
 
-       if (!PUT_AT_VALID(s, putp + sizeof(u_char))) {
+       if (!PUT_AT_VALID(s, putp + sizeof(uint8_t))) {
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
@@ -668,22 +735,22 @@ int stream_putc_at(struct stream *s, size_t putp, u_char c)
        return 1;
 }
 
-int stream_putw_at(struct stream *s, size_t putp, u_int16_t w)
+int stream_putw_at(struct stream *s, size_t putp, uint16_t w)
 {
        STREAM_VERIFY_SANE(s);
 
-       if (!PUT_AT_VALID(s, putp + sizeof(u_int16_t))) {
+       if (!PUT_AT_VALID(s, putp + sizeof(uint16_t))) {
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
 
-       s->data[putp] = (u_char)(w >> 8);
-       s->data[putp + 1] = (u_char)w;
+       s->data[putp] = (uint8_t)(w >> 8);
+       s->data[putp + 1] = (uint8_t)w;
 
        return 2;
 }
 
-int stream_put3_at(struct stream *s, size_t putp, u_int32_t l)
+int stream_put3_at(struct stream *s, size_t putp, uint32_t l)
 {
        STREAM_VERIFY_SANE(s);
 
@@ -691,25 +758,25 @@ int stream_put3_at(struct stream *s, size_t putp, u_int32_t l)
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
-       s->data[putp] = (u_char)(l >> 16);
-       s->data[putp + 1] = (u_char)(l >> 8);
-       s->data[putp + 2] = (u_char)l;
+       s->data[putp] = (uint8_t)(l >> 16);
+       s->data[putp + 1] = (uint8_t)(l >> 8);
+       s->data[putp + 2] = (uint8_t)l;
 
        return 3;
 }
 
-int stream_putl_at(struct stream *s, size_t putp, u_int32_t l)
+int stream_putl_at(struct stream *s, size_t putp, uint32_t l)
 {
        STREAM_VERIFY_SANE(s);
 
-       if (!PUT_AT_VALID(s, putp + sizeof(u_int32_t))) {
+       if (!PUT_AT_VALID(s, putp + sizeof(uint32_t))) {
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
-       s->data[putp] = (u_char)(l >> 24);
-       s->data[putp + 1] = (u_char)(l >> 16);
-       s->data[putp + 2] = (u_char)(l >> 8);
-       s->data[putp + 3] = (u_char)l;
+       s->data[putp] = (uint8_t)(l >> 24);
+       s->data[putp + 1] = (uint8_t)(l >> 16);
+       s->data[putp + 2] = (uint8_t)(l >> 8);
+       s->data[putp + 3] = (uint8_t)l;
 
        return 4;
 }
@@ -722,31 +789,31 @@ int stream_putq_at(struct stream *s, size_t putp, uint64_t q)
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
-       s->data[putp] = (u_char)(q >> 56);
-       s->data[putp + 1] = (u_char)(q >> 48);
-       s->data[putp + 2] = (u_char)(q >> 40);
-       s->data[putp + 3] = (u_char)(q >> 32);
-       s->data[putp + 4] = (u_char)(q >> 24);
-       s->data[putp + 5] = (u_char)(q >> 16);
-       s->data[putp + 6] = (u_char)(q >> 8);
-       s->data[putp + 7] = (u_char)q;
+       s->data[putp] = (uint8_t)(q >> 56);
+       s->data[putp + 1] = (uint8_t)(q >> 48);
+       s->data[putp + 2] = (uint8_t)(q >> 40);
+       s->data[putp + 3] = (uint8_t)(q >> 32);
+       s->data[putp + 4] = (uint8_t)(q >> 24);
+       s->data[putp + 5] = (uint8_t)(q >> 16);
+       s->data[putp + 6] = (uint8_t)(q >> 8);
+       s->data[putp + 7] = (uint8_t)q;
 
        return 8;
 }
 
 /* Put long word to the stream. */
-int stream_put_ipv4(struct stream *s, u_int32_t l)
+int stream_put_ipv4(struct stream *s, uint32_t l)
 {
        STREAM_VERIFY_SANE(s);
 
-       if (STREAM_WRITEABLE(s) < sizeof(u_int32_t)) {
+       if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
-       memcpy(s->data + s->endp, &l, sizeof(u_int32_t));
-       s->endp += sizeof(u_int32_t);
+       memcpy(s->data + s->endp, &l, sizeof(uint32_t));
+       s->endp += sizeof(uint32_t);
 
-       return sizeof(u_int32_t);
+       return sizeof(uint32_t);
 }
 
 /* Put long word to the stream. */
@@ -754,15 +821,15 @@ int stream_put_in_addr(struct stream *s, struct in_addr *addr)
 {
        STREAM_VERIFY_SANE(s);
 
-       if (STREAM_WRITEABLE(s) < sizeof(u_int32_t)) {
+       if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
 
-       memcpy(s->data + s->endp, addr, sizeof(u_int32_t));
-       s->endp += sizeof(u_int32_t);
+       memcpy(s->data + s->endp, addr, sizeof(uint32_t));
+       s->endp += sizeof(uint32_t);
 
-       return sizeof(u_int32_t);
+       return sizeof(uint32_t);
 }
 
 /* Put in_addr at location in the stream. */
@@ -795,7 +862,7 @@ int stream_put_in6_addr_at(struct stream *s, size_t putp, struct in6_addr *addr)
 
 /* Put prefix by nlri type format. */
 int stream_put_prefix_addpath(struct stream *s, struct prefix *p,
-                             int addpath_encode, u_int32_t addpath_tx_id)
+                             int addpath_encode, uint32_t addpath_tx_id)
 {
        size_t psize;
        size_t psize_with_addpath;
@@ -809,16 +876,16 @@ int stream_put_prefix_addpath(struct stream *s, struct prefix *p,
        else
                psize_with_addpath = psize;
 
-       if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(u_char))) {
+       if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) {
                STREAM_BOUND_WARN(s, "put");
                return 0;
        }
 
        if (addpath_encode) {
-               s->data[s->endp++] = (u_char)(addpath_tx_id >> 24);
-               s->data[s->endp++] = (u_char)(addpath_tx_id >> 16);
-               s->data[s->endp++] = (u_char)(addpath_tx_id >> 8);
-               s->data[s->endp++] = (u_char)addpath_tx_id;
+               s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
+               s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
+               s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
+               s->data[s->endp++] = (uint8_t)addpath_tx_id;
        }
 
        s->data[s->endp++] = p->prefixlen;
@@ -838,7 +905,7 @@ int stream_put_labeled_prefix(struct stream *s, struct prefix *p,
                              mpls_label_t *label)
 {
        size_t psize;
-       u_char *label_pnt = (u_char *)label;
+       uint8_t *label_pnt = (uint8_t *)label;
 
        STREAM_VERIFY_SANE(s);
 
@@ -993,7 +1060,7 @@ size_t stream_write(struct stream *s, const void *ptr, size_t size)
  * Use stream_get_pnt_to if you must, but decoding streams properly
  * is preferred
  */
-u_char *stream_pnt(struct stream *s)
+uint8_t *stream_pnt(struct stream *s)
 {
        STREAM_VERIFY_SANE(s);
        return s->data + s->getp;
@@ -1034,20 +1101,42 @@ struct stream_fifo *stream_fifo_new(void)
        struct stream_fifo *new;
 
        new = XCALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo));
+       pthread_mutex_init(&new->mtx, NULL);
        return new;
 }
 
 /* Add new stream to fifo. */
 void stream_fifo_push(struct stream_fifo *fifo, struct stream *s)
 {
+#if defined DEV_BUILD
+       size_t max, curmax;
+#endif
+
        if (fifo->tail)
                fifo->tail->next = s;
        else
                fifo->head = s;
 
        fifo->tail = s;
+       fifo->tail->next = NULL;
+#if !defined DEV_BUILD
+       atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
+#else
+       max = atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
+       curmax = atomic_load_explicit(&fifo->max_count, memory_order_relaxed);
+       if (max > curmax)
+               atomic_store_explicit(&fifo->max_count, max,
+                                     memory_order_relaxed);
+#endif
+}
 
-       fifo->count++;
+void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s)
+{
+       pthread_mutex_lock(&fifo->mtx);
+       {
+               stream_fifo_push(fifo, s);
+       }
+       pthread_mutex_unlock(&fifo->mtx);
 }
 
 /* Delete first stream from fifo. */
@@ -1063,18 +1152,47 @@ struct stream *stream_fifo_pop(struct stream_fifo *fifo)
                if (fifo->head == NULL)
                        fifo->tail = NULL;
 
-               fifo->count--;
+               atomic_fetch_sub_explicit(&fifo->count, 1,
+                                         memory_order_release);
+
+               /* ensure stream is scrubbed of references to this fifo */
+               s->next = NULL;
        }
 
        return s;
 }
 
-/* Return first fifo entry. */
+struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo)
+{
+       struct stream *ret;
+
+       pthread_mutex_lock(&fifo->mtx);
+       {
+               ret = stream_fifo_pop(fifo);
+       }
+       pthread_mutex_unlock(&fifo->mtx);
+
+       return ret;
+}
+
 struct stream *stream_fifo_head(struct stream_fifo *fifo)
 {
        return fifo->head;
 }
 
+struct stream *stream_fifo_head_safe(struct stream_fifo *fifo)
+{
+       struct stream *ret;
+
+       pthread_mutex_lock(&fifo->mtx);
+       {
+               ret = stream_fifo_head(fifo);
+       }
+       pthread_mutex_unlock(&fifo->mtx);
+
+       return ret;
+}
+
 void stream_fifo_clean(struct stream_fifo *fifo)
 {
        struct stream *s;
@@ -1085,11 +1203,26 @@ void stream_fifo_clean(struct stream_fifo *fifo)
                stream_free(s);
        }
        fifo->head = fifo->tail = NULL;
-       fifo->count = 0;
+       atomic_store_explicit(&fifo->count, 0, memory_order_release);
+}
+
+void stream_fifo_clean_safe(struct stream_fifo *fifo)
+{
+       pthread_mutex_lock(&fifo->mtx);
+       {
+               stream_fifo_clean(fifo);
+       }
+       pthread_mutex_unlock(&fifo->mtx);
+}
+
+size_t stream_fifo_count_safe(struct stream_fifo *fifo)
+{
+       return atomic_load_explicit(&fifo->count, memory_order_acquire);
 }
 
 void stream_fifo_free(struct stream_fifo *fifo)
 {
        stream_fifo_clean(fifo);
+       pthread_mutex_destroy(&fifo->mtx);
        XFREE(MTYPE_STREAM_FIFO, fifo);
 }