X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=lib%2Fstream.c;h=6c187bd359a842b558b190c50ed031db3a9c7bfa;hb=91d227b7e3cb53ad8fdbcd9d4cff7f0a666918e3;hp=c4edd3d5bff3e622f8fc35a01c67ad7e0389b0cb;hpb=d90b2b73cb174e781b796d2f9c7a9075e4327a59;p=mirror_frr.git diff --git a/lib/stream.c b/lib/stream.c index c4edd3d5b..6c187bd35 100644 --- a/lib/stream.c +++ b/lib/stream.c @@ -21,15 +21,16 @@ #include #include +#include #include "stream.h" #include "memory.h" #include "network.h" #include "prefix.h" #include "log.h" +#include "lib_errors.h" DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream") -DEFINE_MTYPE_STATIC(LIB, STREAM_DATA, "Stream data") DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO") /* Tests whether a position is valid */ @@ -53,7 +54,8 @@ DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO") * using stream_put..._at() functions. */ #define STREAM_WARN_OFFSETS(S) \ - zlog_warn("&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n", \ + flog_warn(EC_LIB_STREAM, \ + "&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n", \ (void *)(S), (unsigned long)(S)->size, \ (unsigned long)(S)->getp, (unsigned long)(S)->endp) @@ -67,16 +69,16 @@ DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO") #define STREAM_BOUND_WARN(S, WHAT) \ do { \ - zlog_warn("%s: Attempt to %s out of bounds", __func__, \ - (WHAT)); \ + flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds", \ + __func__, (WHAT)); \ STREAM_WARN_OFFSETS(S); \ assert(0); \ } while (0) #define STREAM_BOUND_WARN2(S, WHAT) \ do { \ - zlog_warn("%s: Attempt to %s out of bounds", __func__, \ - (WHAT)); \ + flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds", \ + __func__, (WHAT)); \ STREAM_WARN_OFFSETS(S); \ } while (0) @@ -84,7 +86,8 @@ DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO") #define CHECK_SIZE(S, Z) \ do { \ if (((S)->endp + (Z)) > (S)->size) { \ - zlog_warn( \ + flog_warn( \ + EC_LIB_STREAM, \ "CHECK_SIZE: truncating requested size %lu\n", \ (unsigned long)(Z)); \ STREAM_WARN_OFFSETS(S); \ @@ -99,16 +102,10 @@ struct stream *stream_new(size_t size) assert(size > 0); - s = XCALLOC(MTYPE_STREAM, sizeof(struct stream)); - - if (s == NULL) - return s; - - if ((s->data = XMALLOC(MTYPE_STREAM_DATA, size)) == NULL) { - XFREE(MTYPE_STREAM, s); - return NULL; - } + s = XMALLOC(MTYPE_STREAM, sizeof(struct stream) + size); + s->getp = s->endp = 0; + s->next = NULL; s->size = size; return s; } @@ -119,7 +116,6 @@ void stream_free(struct stream *s) if (!s) return; - XFREE(MTYPE_STREAM_DATA, s->data); XFREE(MTYPE_STREAM, s); } @@ -169,27 +165,33 @@ struct stream *stream_dupcat(struct stream *s1, struct stream *s2, return new; } -size_t stream_resize(struct stream *s, size_t newsize) +size_t stream_resize_inplace(struct stream **sptr, size_t newsize) { - uint8_t *newdata; - STREAM_VERIFY_SANE(s); + struct stream *orig = *sptr; - newdata = XREALLOC(MTYPE_STREAM_DATA, s->data, newsize); + STREAM_VERIFY_SANE(orig); - if (newdata == NULL) - return s->size; + orig = XREALLOC(MTYPE_STREAM, orig, sizeof(struct stream) + newsize); - s->data = newdata; - s->size = newsize; + orig->size = newsize; - if (s->endp > s->size) - s->endp = s->size; - if (s->getp > s->endp) - s->getp = s->endp; + if (orig->endp > orig->size) + orig->endp = orig->size; + if (orig->getp > orig->endp) + orig->getp = orig->endp; - STREAM_VERIFY_SANE(s); + STREAM_VERIFY_SANE(orig); - return s->size; + *sptr = orig; + return orig->size; +} + +size_t __attribute__((deprecated))stream_resize_orig(struct stream *s, + size_t newsize) +{ + assert("stream_resize: Switch code to use stream_resize_inplace" == NULL); + + return stream_resize_inplace(&s, newsize); } size_t stream_get_getp(struct stream *s) @@ -270,7 +272,7 @@ void stream_forward_endp(struct stream *s, size_t size) } /* Copy from stream to destination. */ -inline bool stream_get2(void *dst, struct stream *s, size_t size) +bool stream_get2(void *dst, struct stream *s, size_t size) { STREAM_VERIFY_SANE(s); @@ -299,7 +301,7 @@ void stream_get(void *dst, struct stream *s, size_t size) } /* Get next character from the stream. */ -inline bool stream_getc2(struct stream *s, uint8_t *byte) +bool stream_getc2(struct stream *s, uint8_t *byte) { STREAM_VERIFY_SANE(s); @@ -344,7 +346,7 @@ uint8_t stream_getc_from(struct stream *s, size_t from) return c; } -inline bool stream_getw2(struct stream *s, uint16_t *word) +bool stream_getw2(struct stream *s, uint16_t *word) { STREAM_VERIFY_SANE(s); @@ -465,7 +467,7 @@ void stream_get_from(void *dst, struct stream *s, size_t from, size_t size) memcpy(dst, s->data + from, size); } -inline bool stream_getl2(struct stream *s, uint32_t *l) +bool stream_getl2(struct stream *s, uint32_t *l) { STREAM_VERIFY_SANE(s); @@ -966,8 +968,8 @@ ssize_t stream_read_try(struct stream *s, int fd, size_t size) /* Error: was it transient (return -2) or fatal (return -1)? */ if (ERRNO_IO_RETRY(errno)) return -2; - zlog_warn("%s: read failed on fd %d: %s", __func__, fd, - safe_strerror(errno)); + flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd, + safe_strerror(errno)); return -1; } @@ -997,8 +999,8 @@ ssize_t stream_recvfrom(struct stream *s, int fd, size_t size, int flags, /* Error: was it transient (return -2) or fatal (return -1)? */ if (ERRNO_IO_RETRY(errno)) return -2; - zlog_warn("%s: read failed on fd %d: %s", __func__, fd, - safe_strerror(errno)); + flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd, + safe_strerror(errno)); return -1; } @@ -1101,12 +1103,17 @@ struct stream_fifo *stream_fifo_new(void) struct stream_fifo *new; new = XCALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo)); + pthread_mutex_init(&new->mtx, NULL); return new; } /* Add new stream to fifo. */ void stream_fifo_push(struct stream_fifo *fifo, struct stream *s) { +#if defined DEV_BUILD + size_t max, curmax; +#endif + if (fifo->tail) fifo->tail->next = s; else @@ -1114,8 +1121,24 @@ void stream_fifo_push(struct stream_fifo *fifo, struct stream *s) fifo->tail = s; fifo->tail->next = NULL; - - fifo->count++; +#if !defined DEV_BUILD + atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release); +#else + max = atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release); + curmax = atomic_load_explicit(&fifo->max_count, memory_order_relaxed); + if (max > curmax) + atomic_store_explicit(&fifo->max_count, max, + memory_order_relaxed); +#endif +} + +void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s) +{ + pthread_mutex_lock(&fifo->mtx); + { + stream_fifo_push(fifo, s); + } + pthread_mutex_unlock(&fifo->mtx); } /* Delete first stream from fifo. */ @@ -1131,7 +1154,8 @@ struct stream *stream_fifo_pop(struct stream_fifo *fifo) if (fifo->head == NULL) fifo->tail = NULL; - fifo->count--; + atomic_fetch_sub_explicit(&fifo->count, 1, + memory_order_release); /* ensure stream is scrubbed of references to this fifo */ s->next = NULL; @@ -1140,12 +1164,37 @@ struct stream *stream_fifo_pop(struct stream_fifo *fifo) return s; } -/* Return first fifo entry. */ +struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo) +{ + struct stream *ret; + + pthread_mutex_lock(&fifo->mtx); + { + ret = stream_fifo_pop(fifo); + } + pthread_mutex_unlock(&fifo->mtx); + + return ret; +} + struct stream *stream_fifo_head(struct stream_fifo *fifo) { return fifo->head; } +struct stream *stream_fifo_head_safe(struct stream_fifo *fifo) +{ + struct stream *ret; + + pthread_mutex_lock(&fifo->mtx); + { + ret = stream_fifo_head(fifo); + } + pthread_mutex_unlock(&fifo->mtx); + + return ret; +} + void stream_fifo_clean(struct stream_fifo *fifo) { struct stream *s; @@ -1156,11 +1205,26 @@ void stream_fifo_clean(struct stream_fifo *fifo) stream_free(s); } fifo->head = fifo->tail = NULL; - fifo->count = 0; + atomic_store_explicit(&fifo->count, 0, memory_order_release); +} + +void stream_fifo_clean_safe(struct stream_fifo *fifo) +{ + pthread_mutex_lock(&fifo->mtx); + { + stream_fifo_clean(fifo); + } + pthread_mutex_unlock(&fifo->mtx); +} + +size_t stream_fifo_count_safe(struct stream_fifo *fifo) +{ + return atomic_load_explicit(&fifo->count, memory_order_acquire); } void stream_fifo_free(struct stream_fifo *fifo) { stream_fifo_clean(fifo); + pthread_mutex_destroy(&fifo->mtx); XFREE(MTYPE_STREAM_FIFO, fifo); }