X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=lib%2Fstream.c;h=6c187bd359a842b558b190c50ed031db3a9c7bfa;hb=91d227b7e3cb53ad8fdbcd9d4cff7f0a666918e3;hp=3c08d4454bcb1c54d3fe50d391df1e5fc90a92ef;hpb=4e262455a252c700f81df75fb8107d112062bba8;p=mirror_frr.git diff --git a/lib/stream.c b/lib/stream.c index 3c08d4454..6c187bd35 100644 --- a/lib/stream.c +++ b/lib/stream.c @@ -21,15 +21,16 @@ #include #include +#include #include "stream.h" #include "memory.h" #include "network.h" #include "prefix.h" #include "log.h" +#include "lib_errors.h" DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream") -DEFINE_MTYPE_STATIC(LIB, STREAM_DATA, "Stream data") DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO") /* Tests whether a position is valid */ @@ -53,7 +54,8 @@ DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO") * using stream_put..._at() functions. */ #define STREAM_WARN_OFFSETS(S) \ - zlog_warn("&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n", \ + flog_warn(EC_LIB_STREAM, \ + "&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n", \ (void *)(S), (unsigned long)(S)->size, \ (unsigned long)(S)->getp, (unsigned long)(S)->endp) @@ -67,16 +69,16 @@ DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO") #define STREAM_BOUND_WARN(S, WHAT) \ do { \ - zlog_warn("%s: Attempt to %s out of bounds", __func__, \ - (WHAT)); \ + flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds", \ + __func__, (WHAT)); \ STREAM_WARN_OFFSETS(S); \ assert(0); \ } while (0) #define STREAM_BOUND_WARN2(S, WHAT) \ do { \ - zlog_warn("%s: Attempt to %s out of bounds", __func__, \ - (WHAT)); \ + flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds", \ + __func__, (WHAT)); \ STREAM_WARN_OFFSETS(S); \ } while (0) @@ -84,7 +86,8 @@ DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO") #define CHECK_SIZE(S, Z) \ do { \ if (((S)->endp + (Z)) > (S)->size) { \ - zlog_warn( \ + flog_warn( \ + EC_LIB_STREAM, \ "CHECK_SIZE: truncating requested size %lu\n", \ (unsigned long)(Z)); \ STREAM_WARN_OFFSETS(S); \ @@ -99,16 +102,10 @@ struct stream *stream_new(size_t size) assert(size > 0); - s = XCALLOC(MTYPE_STREAM, sizeof(struct stream)); - - if (s == NULL) - return s; - - if ((s->data = XMALLOC(MTYPE_STREAM_DATA, size)) == NULL) { - XFREE(MTYPE_STREAM, s); - return NULL; - } + s = XMALLOC(MTYPE_STREAM, sizeof(struct stream) + size); + s->getp = s->endp = 0; + s->next = NULL; s->size = size; return s; } @@ -119,7 +116,6 @@ void stream_free(struct stream *s) if (!s) return; - XFREE(MTYPE_STREAM_DATA, s->data); XFREE(MTYPE_STREAM, s); } @@ -169,27 +165,33 @@ struct stream *stream_dupcat(struct stream *s1, struct stream *s2, return new; } -size_t stream_resize(struct stream *s, size_t newsize) +size_t stream_resize_inplace(struct stream **sptr, size_t newsize) { - u_char *newdata; - STREAM_VERIFY_SANE(s); + struct stream *orig = *sptr; - newdata = XREALLOC(MTYPE_STREAM_DATA, s->data, newsize); + STREAM_VERIFY_SANE(orig); - if (newdata == NULL) - return s->size; + orig = XREALLOC(MTYPE_STREAM, orig, sizeof(struct stream) + newsize); - s->data = newdata; - s->size = newsize; + orig->size = newsize; - if (s->endp > s->size) - s->endp = s->size; - if (s->getp > s->endp) - s->getp = s->endp; + if (orig->endp > orig->size) + orig->endp = orig->size; + if (orig->getp > orig->endp) + orig->getp = orig->endp; - STREAM_VERIFY_SANE(s); + STREAM_VERIFY_SANE(orig); - return s->size; + *sptr = orig; + return orig->size; +} + +size_t __attribute__((deprecated))stream_resize_orig(struct stream *s, + size_t newsize) +{ + assert("stream_resize: Switch code to use stream_resize_inplace" == NULL); + + return stream_resize_inplace(&s, newsize); } size_t stream_get_getp(struct stream *s) @@ -270,7 +272,7 @@ void stream_forward_endp(struct stream *s, size_t size) } /* Copy from stream to destination. */ -inline bool stream_get2(void *dst, struct stream *s, size_t size) +bool stream_get2(void *dst, struct stream *s, size_t size) { STREAM_VERIFY_SANE(s); @@ -299,11 +301,11 @@ void stream_get(void *dst, struct stream *s, size_t size) } /* Get next character from the stream. */ -inline bool stream_getc2(struct stream *s, u_char *byte) +bool stream_getc2(struct stream *s, uint8_t *byte) { STREAM_VERIFY_SANE(s); - if (STREAM_READABLE(s) < sizeof(u_char)) { + if (STREAM_READABLE(s) < sizeof(uint8_t)) { STREAM_BOUND_WARN2(s, "get char"); return false; } @@ -312,13 +314,13 @@ inline bool stream_getc2(struct stream *s, u_char *byte) return true; } -u_char stream_getc(struct stream *s) +uint8_t stream_getc(struct stream *s) { - u_char c; + uint8_t c; STREAM_VERIFY_SANE(s); - if (STREAM_READABLE(s) < sizeof(u_char)) { + if (STREAM_READABLE(s) < sizeof(uint8_t)) { STREAM_BOUND_WARN(s, "get char"); return 0; } @@ -328,13 +330,13 @@ u_char stream_getc(struct stream *s) } /* Get next character from the stream. */ -u_char stream_getc_from(struct stream *s, size_t from) +uint8_t stream_getc_from(struct stream *s, size_t from) { - u_char c; + uint8_t c; STREAM_VERIFY_SANE(s); - if (!GETP_VALID(s, from + sizeof(u_char))) { + if (!GETP_VALID(s, from + sizeof(uint8_t))) { STREAM_BOUND_WARN(s, "get char"); return 0; } @@ -344,7 +346,7 @@ u_char stream_getc_from(struct stream *s, size_t from) return c; } -inline bool stream_getw2(struct stream *s, uint16_t *word) +bool stream_getw2(struct stream *s, uint16_t *word) { STREAM_VERIFY_SANE(s); @@ -360,13 +362,13 @@ inline bool stream_getw2(struct stream *s, uint16_t *word) } /* Get next word from the stream. */ -u_int16_t stream_getw(struct stream *s) +uint16_t stream_getw(struct stream *s) { - u_int16_t w; + uint16_t w; STREAM_VERIFY_SANE(s); - if (STREAM_READABLE(s) < sizeof(u_int16_t)) { + if (STREAM_READABLE(s) < sizeof(uint16_t)) { STREAM_BOUND_WARN(s, "get "); return 0; } @@ -378,13 +380,13 @@ u_int16_t stream_getw(struct stream *s) } /* Get next word from the stream. */ -u_int16_t stream_getw_from(struct stream *s, size_t from) +uint16_t stream_getw_from(struct stream *s, size_t from) { - u_int16_t w; + uint16_t w; STREAM_VERIFY_SANE(s); - if (!GETP_VALID(s, from + sizeof(u_int16_t))) { + if (!GETP_VALID(s, from + sizeof(uint16_t))) { STREAM_BOUND_WARN(s, "get "); return 0; } @@ -396,9 +398,9 @@ u_int16_t stream_getw_from(struct stream *s, size_t from) } /* Get next 3-byte from the stream. */ -u_int32_t stream_get3_from(struct stream *s, size_t from) +uint32_t stream_get3_from(struct stream *s, size_t from) { - u_int32_t l; + uint32_t l; STREAM_VERIFY_SANE(s); @@ -414,9 +416,9 @@ u_int32_t stream_get3_from(struct stream *s, size_t from) return l; } -u_int32_t stream_get3(struct stream *s) +uint32_t stream_get3(struct stream *s) { - u_int32_t l; + uint32_t l; STREAM_VERIFY_SANE(s); @@ -433,13 +435,13 @@ u_int32_t stream_get3(struct stream *s) } /* Get next long word from the stream. */ -u_int32_t stream_getl_from(struct stream *s, size_t from) +uint32_t stream_getl_from(struct stream *s, size_t from) { - u_int32_t l; + uint32_t l; STREAM_VERIFY_SANE(s); - if (!GETP_VALID(s, from + sizeof(u_int32_t))) { + if (!GETP_VALID(s, from + sizeof(uint32_t))) { STREAM_BOUND_WARN(s, "get long"); return 0; } @@ -465,7 +467,7 @@ void stream_get_from(void *dst, struct stream *s, size_t from, size_t size) memcpy(dst, s->data + from, size); } -inline bool stream_getl2(struct stream *s, uint32_t *l) +bool stream_getl2(struct stream *s, uint32_t *l) { STREAM_VERIFY_SANE(s); @@ -482,13 +484,13 @@ inline bool stream_getl2(struct stream *s, uint32_t *l) return true; } -u_int32_t stream_getl(struct stream *s) +uint32_t stream_getl(struct stream *s) { - u_int32_t l; + uint32_t l; STREAM_VERIFY_SANE(s); - if (STREAM_READABLE(s) < sizeof(u_int32_t)) { + if (STREAM_READABLE(s) < sizeof(uint32_t)) { STREAM_BOUND_WARN(s, "get long"); return 0; } @@ -549,19 +551,19 @@ uint64_t stream_getq(struct stream *s) } /* Get next long word from the stream. */ -u_int32_t stream_get_ipv4(struct stream *s) +uint32_t stream_get_ipv4(struct stream *s) { - u_int32_t l; + uint32_t l; STREAM_VERIFY_SANE(s); - if (STREAM_READABLE(s) < sizeof(u_int32_t)) { + if (STREAM_READABLE(s) < sizeof(uint32_t)) { STREAM_BOUND_WARN(s, "get ipv4"); return 0; } - memcpy(&l, s->data + s->getp, sizeof(u_int32_t)); - s->getp += sizeof(u_int32_t); + memcpy(&l, s->data + s->getp, sizeof(uint32_t)); + s->getp += sizeof(uint32_t); return l; } @@ -615,37 +617,37 @@ void stream_put(struct stream *s, const void *src, size_t size) } /* Put character to the stream. */ -int stream_putc(struct stream *s, u_char c) +int stream_putc(struct stream *s, uint8_t c) { STREAM_VERIFY_SANE(s); - if (STREAM_WRITEABLE(s) < sizeof(u_char)) { + if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) { STREAM_BOUND_WARN(s, "put"); return 0; } s->data[s->endp++] = c; - return sizeof(u_char); + return sizeof(uint8_t); } /* Put word to the stream. */ -int stream_putw(struct stream *s, u_int16_t w) +int stream_putw(struct stream *s, uint16_t w) { STREAM_VERIFY_SANE(s); - if (STREAM_WRITEABLE(s) < sizeof(u_int16_t)) { + if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) { STREAM_BOUND_WARN(s, "put"); return 0; } - s->data[s->endp++] = (u_char)(w >> 8); - s->data[s->endp++] = (u_char)w; + s->data[s->endp++] = (uint8_t)(w >> 8); + s->data[s->endp++] = (uint8_t)w; return 2; } /* Put long word to the stream. */ -int stream_put3(struct stream *s, u_int32_t l) +int stream_put3(struct stream *s, uint32_t l) { STREAM_VERIFY_SANE(s); @@ -654,27 +656,27 @@ int stream_put3(struct stream *s, u_int32_t l) return 0; } - s->data[s->endp++] = (u_char)(l >> 16); - s->data[s->endp++] = (u_char)(l >> 8); - s->data[s->endp++] = (u_char)l; + s->data[s->endp++] = (uint8_t)(l >> 16); + s->data[s->endp++] = (uint8_t)(l >> 8); + s->data[s->endp++] = (uint8_t)l; return 3; } /* Put long word to the stream. */ -int stream_putl(struct stream *s, u_int32_t l) +int stream_putl(struct stream *s, uint32_t l) { STREAM_VERIFY_SANE(s); - if (STREAM_WRITEABLE(s) < sizeof(u_int32_t)) { + if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) { STREAM_BOUND_WARN(s, "put"); return 0; } - s->data[s->endp++] = (u_char)(l >> 24); - s->data[s->endp++] = (u_char)(l >> 16); - s->data[s->endp++] = (u_char)(l >> 8); - s->data[s->endp++] = (u_char)l; + s->data[s->endp++] = (uint8_t)(l >> 24); + s->data[s->endp++] = (uint8_t)(l >> 16); + s->data[s->endp++] = (uint8_t)(l >> 8); + s->data[s->endp++] = (uint8_t)l; return 4; } @@ -689,14 +691,14 @@ int stream_putq(struct stream *s, uint64_t q) return 0; } - s->data[s->endp++] = (u_char)(q >> 56); - s->data[s->endp++] = (u_char)(q >> 48); - s->data[s->endp++] = (u_char)(q >> 40); - s->data[s->endp++] = (u_char)(q >> 32); - s->data[s->endp++] = (u_char)(q >> 24); - s->data[s->endp++] = (u_char)(q >> 16); - s->data[s->endp++] = (u_char)(q >> 8); - s->data[s->endp++] = (u_char)q; + s->data[s->endp++] = (uint8_t)(q >> 56); + s->data[s->endp++] = (uint8_t)(q >> 48); + s->data[s->endp++] = (uint8_t)(q >> 40); + s->data[s->endp++] = (uint8_t)(q >> 32); + s->data[s->endp++] = (uint8_t)(q >> 24); + s->data[s->endp++] = (uint8_t)(q >> 16); + s->data[s->endp++] = (uint8_t)(q >> 8); + s->data[s->endp++] = (uint8_t)q; return 8; } @@ -721,11 +723,11 @@ int stream_putd(struct stream *s, double d) return stream_putq(s, u.o); } -int stream_putc_at(struct stream *s, size_t putp, u_char c) +int stream_putc_at(struct stream *s, size_t putp, uint8_t c) { STREAM_VERIFY_SANE(s); - if (!PUT_AT_VALID(s, putp + sizeof(u_char))) { + if (!PUT_AT_VALID(s, putp + sizeof(uint8_t))) { STREAM_BOUND_WARN(s, "put"); return 0; } @@ -735,22 +737,22 @@ int stream_putc_at(struct stream *s, size_t putp, u_char c) return 1; } -int stream_putw_at(struct stream *s, size_t putp, u_int16_t w) +int stream_putw_at(struct stream *s, size_t putp, uint16_t w) { STREAM_VERIFY_SANE(s); - if (!PUT_AT_VALID(s, putp + sizeof(u_int16_t))) { + if (!PUT_AT_VALID(s, putp + sizeof(uint16_t))) { STREAM_BOUND_WARN(s, "put"); return 0; } - s->data[putp] = (u_char)(w >> 8); - s->data[putp + 1] = (u_char)w; + s->data[putp] = (uint8_t)(w >> 8); + s->data[putp + 1] = (uint8_t)w; return 2; } -int stream_put3_at(struct stream *s, size_t putp, u_int32_t l) +int stream_put3_at(struct stream *s, size_t putp, uint32_t l) { STREAM_VERIFY_SANE(s); @@ -758,25 +760,25 @@ int stream_put3_at(struct stream *s, size_t putp, u_int32_t l) STREAM_BOUND_WARN(s, "put"); return 0; } - s->data[putp] = (u_char)(l >> 16); - s->data[putp + 1] = (u_char)(l >> 8); - s->data[putp + 2] = (u_char)l; + s->data[putp] = (uint8_t)(l >> 16); + s->data[putp + 1] = (uint8_t)(l >> 8); + s->data[putp + 2] = (uint8_t)l; return 3; } -int stream_putl_at(struct stream *s, size_t putp, u_int32_t l) +int stream_putl_at(struct stream *s, size_t putp, uint32_t l) { STREAM_VERIFY_SANE(s); - if (!PUT_AT_VALID(s, putp + sizeof(u_int32_t))) { + if (!PUT_AT_VALID(s, putp + sizeof(uint32_t))) { STREAM_BOUND_WARN(s, "put"); return 0; } - s->data[putp] = (u_char)(l >> 24); - s->data[putp + 1] = (u_char)(l >> 16); - s->data[putp + 2] = (u_char)(l >> 8); - s->data[putp + 3] = (u_char)l; + s->data[putp] = (uint8_t)(l >> 24); + s->data[putp + 1] = (uint8_t)(l >> 16); + s->data[putp + 2] = (uint8_t)(l >> 8); + s->data[putp + 3] = (uint8_t)l; return 4; } @@ -789,31 +791,31 @@ int stream_putq_at(struct stream *s, size_t putp, uint64_t q) STREAM_BOUND_WARN(s, "put"); return 0; } - s->data[putp] = (u_char)(q >> 56); - s->data[putp + 1] = (u_char)(q >> 48); - s->data[putp + 2] = (u_char)(q >> 40); - s->data[putp + 3] = (u_char)(q >> 32); - s->data[putp + 4] = (u_char)(q >> 24); - s->data[putp + 5] = (u_char)(q >> 16); - s->data[putp + 6] = (u_char)(q >> 8); - s->data[putp + 7] = (u_char)q; + s->data[putp] = (uint8_t)(q >> 56); + s->data[putp + 1] = (uint8_t)(q >> 48); + s->data[putp + 2] = (uint8_t)(q >> 40); + s->data[putp + 3] = (uint8_t)(q >> 32); + s->data[putp + 4] = (uint8_t)(q >> 24); + s->data[putp + 5] = (uint8_t)(q >> 16); + s->data[putp + 6] = (uint8_t)(q >> 8); + s->data[putp + 7] = (uint8_t)q; return 8; } /* Put long word to the stream. */ -int stream_put_ipv4(struct stream *s, u_int32_t l) +int stream_put_ipv4(struct stream *s, uint32_t l) { STREAM_VERIFY_SANE(s); - if (STREAM_WRITEABLE(s) < sizeof(u_int32_t)) { + if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) { STREAM_BOUND_WARN(s, "put"); return 0; } - memcpy(s->data + s->endp, &l, sizeof(u_int32_t)); - s->endp += sizeof(u_int32_t); + memcpy(s->data + s->endp, &l, sizeof(uint32_t)); + s->endp += sizeof(uint32_t); - return sizeof(u_int32_t); + return sizeof(uint32_t); } /* Put long word to the stream. */ @@ -821,15 +823,15 @@ int stream_put_in_addr(struct stream *s, struct in_addr *addr) { STREAM_VERIFY_SANE(s); - if (STREAM_WRITEABLE(s) < sizeof(u_int32_t)) { + if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) { STREAM_BOUND_WARN(s, "put"); return 0; } - memcpy(s->data + s->endp, addr, sizeof(u_int32_t)); - s->endp += sizeof(u_int32_t); + memcpy(s->data + s->endp, addr, sizeof(uint32_t)); + s->endp += sizeof(uint32_t); - return sizeof(u_int32_t); + return sizeof(uint32_t); } /* Put in_addr at location in the stream. */ @@ -862,7 +864,7 @@ int stream_put_in6_addr_at(struct stream *s, size_t putp, struct in6_addr *addr) /* Put prefix by nlri type format. */ int stream_put_prefix_addpath(struct stream *s, struct prefix *p, - int addpath_encode, u_int32_t addpath_tx_id) + int addpath_encode, uint32_t addpath_tx_id) { size_t psize; size_t psize_with_addpath; @@ -876,16 +878,16 @@ int stream_put_prefix_addpath(struct stream *s, struct prefix *p, else psize_with_addpath = psize; - if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(u_char))) { + if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) { STREAM_BOUND_WARN(s, "put"); return 0; } if (addpath_encode) { - s->data[s->endp++] = (u_char)(addpath_tx_id >> 24); - s->data[s->endp++] = (u_char)(addpath_tx_id >> 16); - s->data[s->endp++] = (u_char)(addpath_tx_id >> 8); - s->data[s->endp++] = (u_char)addpath_tx_id; + s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24); + s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16); + s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8); + s->data[s->endp++] = (uint8_t)addpath_tx_id; } s->data[s->endp++] = p->prefixlen; @@ -905,7 +907,7 @@ int stream_put_labeled_prefix(struct stream *s, struct prefix *p, mpls_label_t *label) { size_t psize; - u_char *label_pnt = (u_char *)label; + uint8_t *label_pnt = (uint8_t *)label; STREAM_VERIFY_SANE(s); @@ -966,8 +968,8 @@ ssize_t stream_read_try(struct stream *s, int fd, size_t size) /* Error: was it transient (return -2) or fatal (return -1)? */ if (ERRNO_IO_RETRY(errno)) return -2; - zlog_warn("%s: read failed on fd %d: %s", __func__, fd, - safe_strerror(errno)); + flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd, + safe_strerror(errno)); return -1; } @@ -997,8 +999,8 @@ ssize_t stream_recvfrom(struct stream *s, int fd, size_t size, int flags, /* Error: was it transient (return -2) or fatal (return -1)? */ if (ERRNO_IO_RETRY(errno)) return -2; - zlog_warn("%s: read failed on fd %d: %s", __func__, fd, - safe_strerror(errno)); + flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd, + safe_strerror(errno)); return -1; } @@ -1060,7 +1062,7 @@ size_t stream_write(struct stream *s, const void *ptr, size_t size) * Use stream_get_pnt_to if you must, but decoding streams properly * is preferred */ -u_char *stream_pnt(struct stream *s) +uint8_t *stream_pnt(struct stream *s) { STREAM_VERIFY_SANE(s); return s->data + s->getp; @@ -1101,20 +1103,42 @@ struct stream_fifo *stream_fifo_new(void) struct stream_fifo *new; new = XCALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo)); + pthread_mutex_init(&new->mtx, NULL); return new; } /* Add new stream to fifo. */ void stream_fifo_push(struct stream_fifo *fifo, struct stream *s) { +#if defined DEV_BUILD + size_t max, curmax; +#endif + if (fifo->tail) fifo->tail->next = s; else fifo->head = s; fifo->tail = s; - - fifo->count++; + fifo->tail->next = NULL; +#if !defined DEV_BUILD + atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release); +#else + max = atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release); + curmax = atomic_load_explicit(&fifo->max_count, memory_order_relaxed); + if (max > curmax) + atomic_store_explicit(&fifo->max_count, max, + memory_order_relaxed); +#endif +} + +void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s) +{ + pthread_mutex_lock(&fifo->mtx); + { + stream_fifo_push(fifo, s); + } + pthread_mutex_unlock(&fifo->mtx); } /* Delete first stream from fifo. */ @@ -1130,18 +1154,47 @@ struct stream *stream_fifo_pop(struct stream_fifo *fifo) if (fifo->head == NULL) fifo->tail = NULL; - fifo->count--; + atomic_fetch_sub_explicit(&fifo->count, 1, + memory_order_release); + + /* ensure stream is scrubbed of references to this fifo */ + s->next = NULL; } return s; } -/* Return first fifo entry. */ +struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo) +{ + struct stream *ret; + + pthread_mutex_lock(&fifo->mtx); + { + ret = stream_fifo_pop(fifo); + } + pthread_mutex_unlock(&fifo->mtx); + + return ret; +} + struct stream *stream_fifo_head(struct stream_fifo *fifo) { return fifo->head; } +struct stream *stream_fifo_head_safe(struct stream_fifo *fifo) +{ + struct stream *ret; + + pthread_mutex_lock(&fifo->mtx); + { + ret = stream_fifo_head(fifo); + } + pthread_mutex_unlock(&fifo->mtx); + + return ret; +} + void stream_fifo_clean(struct stream_fifo *fifo) { struct stream *s; @@ -1152,11 +1205,26 @@ void stream_fifo_clean(struct stream_fifo *fifo) stream_free(s); } fifo->head = fifo->tail = NULL; - fifo->count = 0; + atomic_store_explicit(&fifo->count, 0, memory_order_release); +} + +void stream_fifo_clean_safe(struct stream_fifo *fifo) +{ + pthread_mutex_lock(&fifo->mtx); + { + stream_fifo_clean(fifo); + } + pthread_mutex_unlock(&fifo->mtx); +} + +size_t stream_fifo_count_safe(struct stream_fifo *fifo) +{ + return atomic_load_explicit(&fifo->count, memory_order_acquire); } void stream_fifo_free(struct stream_fifo *fifo) { stream_fifo_clean(fifo); + pthread_mutex_destroy(&fifo->mtx); XFREE(MTYPE_STREAM_FIFO, fifo); }