]> git.proxmox.com Git - mirror_frr.git/blobdiff - lib/stream.c
zebra, lib: fix the ZEBRA_INTERFACE_VRF_UPDATE zapi message
[mirror_frr.git] / lib / stream.c
index c4edd3d5bff3e622f8fc35a01c67ad7e0389b0cb..6c187bd359a842b558b190c50ed031db3a9c7bfa 100644 (file)
 
 #include <zebra.h>
 #include <stddef.h>
+#include <pthread.h>
 
 #include "stream.h"
 #include "memory.h"
 #include "network.h"
 #include "prefix.h"
 #include "log.h"
+#include "lib_errors.h"
 
 DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream")
-DEFINE_MTYPE_STATIC(LIB, STREAM_DATA, "Stream data")
 DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO")
 
 /* Tests whether a position is valid */
@@ -53,7 +54,8 @@ DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO")
  * using stream_put..._at() functions.
  */
 #define STREAM_WARN_OFFSETS(S)                                                 \
-       zlog_warn("&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n",   \
+       flog_warn(EC_LIB_STREAM,                                               \
+                 "&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n",   \
                  (void *)(S), (unsigned long)(S)->size,                       \
                  (unsigned long)(S)->getp, (unsigned long)(S)->endp)
 
@@ -67,16 +69,16 @@ DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO")
 
 #define STREAM_BOUND_WARN(S, WHAT)                                             \
        do {                                                                   \
-               zlog_warn("%s: Attempt to %s out of bounds", __func__,         \
-                         (WHAT));                                             \
+               flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds",    \
+                         __func__, (WHAT));                                   \
                STREAM_WARN_OFFSETS(S);                                        \
                assert(0);                                                     \
        } while (0)
 
 #define STREAM_BOUND_WARN2(S, WHAT)                                            \
        do {                                                                   \
-               zlog_warn("%s: Attempt to %s out of bounds", __func__,         \
-                         (WHAT));                                             \
+               flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds",    \
+                         __func__, (WHAT));                                   \
                STREAM_WARN_OFFSETS(S);                                        \
        } while (0)
 
@@ -84,7 +86,8 @@ DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO")
 #define CHECK_SIZE(S, Z)                                                       \
        do {                                                                   \
                if (((S)->endp + (Z)) > (S)->size) {                           \
-                       zlog_warn(                                             \
+                       flog_warn(                                             \
+                               EC_LIB_STREAM,                                 \
                                "CHECK_SIZE: truncating requested size %lu\n", \
                                (unsigned long)(Z));                           \
                        STREAM_WARN_OFFSETS(S);                                \
@@ -99,16 +102,10 @@ struct stream *stream_new(size_t size)
 
        assert(size > 0);
 
-       s = XCALLOC(MTYPE_STREAM, sizeof(struct stream));
-
-       if (s == NULL)
-               return s;
-
-       if ((s->data = XMALLOC(MTYPE_STREAM_DATA, size)) == NULL) {
-               XFREE(MTYPE_STREAM, s);
-               return NULL;
-       }
+       s = XMALLOC(MTYPE_STREAM, sizeof(struct stream) + size);
 
+       s->getp = s->endp = 0;
+       s->next = NULL;
        s->size = size;
        return s;
 }
@@ -119,7 +116,6 @@ void stream_free(struct stream *s)
        if (!s)
                return;
 
-       XFREE(MTYPE_STREAM_DATA, s->data);
        XFREE(MTYPE_STREAM, s);
 }
 
@@ -169,27 +165,33 @@ struct stream *stream_dupcat(struct stream *s1, struct stream *s2,
        return new;
 }
 
-size_t stream_resize(struct stream *s, size_t newsize)
+size_t stream_resize_inplace(struct stream **sptr, size_t newsize)
 {
-       uint8_t *newdata;
-       STREAM_VERIFY_SANE(s);
+       struct stream *orig = *sptr;
 
-       newdata = XREALLOC(MTYPE_STREAM_DATA, s->data, newsize);
+       STREAM_VERIFY_SANE(orig);
 
-       if (newdata == NULL)
-               return s->size;
+       orig = XREALLOC(MTYPE_STREAM, orig, sizeof(struct stream) + newsize);
 
-       s->data = newdata;
-       s->size = newsize;
+       orig->size = newsize;
 
-       if (s->endp > s->size)
-               s->endp = s->size;
-       if (s->getp > s->endp)
-               s->getp = s->endp;
+       if (orig->endp > orig->size)
+               orig->endp = orig->size;
+       if (orig->getp > orig->endp)
+               orig->getp = orig->endp;
 
-       STREAM_VERIFY_SANE(s);
+       STREAM_VERIFY_SANE(orig);
 
-       return s->size;
+       *sptr = orig;
+       return orig->size;
+}
+
+size_t __attribute__((deprecated))stream_resize_orig(struct stream *s,
+                                                    size_t newsize)
+{
+       assert("stream_resize: Switch code to use stream_resize_inplace" == NULL);
+
+       return stream_resize_inplace(&s, newsize);
 }
 
 size_t stream_get_getp(struct stream *s)
@@ -270,7 +272,7 @@ void stream_forward_endp(struct stream *s, size_t size)
 }
 
 /* Copy from stream to destination. */
-inline bool stream_get2(void *dst, struct stream *s, size_t size)
+bool stream_get2(void *dst, struct stream *s, size_t size)
 {
        STREAM_VERIFY_SANE(s);
 
@@ -299,7 +301,7 @@ void stream_get(void *dst, struct stream *s, size_t size)
 }
 
 /* Get next character from the stream. */
-inline bool stream_getc2(struct stream *s, uint8_t *byte)
+bool stream_getc2(struct stream *s, uint8_t *byte)
 {
        STREAM_VERIFY_SANE(s);
 
@@ -344,7 +346,7 @@ uint8_t stream_getc_from(struct stream *s, size_t from)
        return c;
 }
 
-inline bool stream_getw2(struct stream *s, uint16_t *word)
+bool stream_getw2(struct stream *s, uint16_t *word)
 {
        STREAM_VERIFY_SANE(s);
 
@@ -465,7 +467,7 @@ void stream_get_from(void *dst, struct stream *s, size_t from, size_t size)
        memcpy(dst, s->data + from, size);
 }
 
-inline bool stream_getl2(struct stream *s, uint32_t *l)
+bool stream_getl2(struct stream *s, uint32_t *l)
 {
        STREAM_VERIFY_SANE(s);
 
@@ -966,8 +968,8 @@ ssize_t stream_read_try(struct stream *s, int fd, size_t size)
        /* Error: was it transient (return -2) or fatal (return -1)? */
        if (ERRNO_IO_RETRY(errno))
                return -2;
-       zlog_warn("%s: read failed on fd %d: %s", __func__, fd,
-                 safe_strerror(errno));
+       flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
+                safe_strerror(errno));
        return -1;
 }
 
@@ -997,8 +999,8 @@ ssize_t stream_recvfrom(struct stream *s, int fd, size_t size, int flags,
        /* Error: was it transient (return -2) or fatal (return -1)? */
        if (ERRNO_IO_RETRY(errno))
                return -2;
-       zlog_warn("%s: read failed on fd %d: %s", __func__, fd,
-                 safe_strerror(errno));
+       flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
+                safe_strerror(errno));
        return -1;
 }
 
@@ -1101,12 +1103,17 @@ struct stream_fifo *stream_fifo_new(void)
        struct stream_fifo *new;
 
        new = XCALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo));
+       pthread_mutex_init(&new->mtx, NULL);
        return new;
 }
 
 /* Add new stream to fifo. */
 void stream_fifo_push(struct stream_fifo *fifo, struct stream *s)
 {
+#if defined DEV_BUILD
+       size_t max, curmax;
+#endif
+
        if (fifo->tail)
                fifo->tail->next = s;
        else
@@ -1114,8 +1121,24 @@ void stream_fifo_push(struct stream_fifo *fifo, struct stream *s)
 
        fifo->tail = s;
        fifo->tail->next = NULL;
-
-       fifo->count++;
+#if !defined DEV_BUILD
+       atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
+#else
+       max = atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
+       curmax = atomic_load_explicit(&fifo->max_count, memory_order_relaxed);
+       if (max > curmax)
+               atomic_store_explicit(&fifo->max_count, max,
+                                     memory_order_relaxed);
+#endif
+}
+
+void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s)
+{
+       pthread_mutex_lock(&fifo->mtx);
+       {
+               stream_fifo_push(fifo, s);
+       }
+       pthread_mutex_unlock(&fifo->mtx);
 }
 
 /* Delete first stream from fifo. */
@@ -1131,7 +1154,8 @@ struct stream *stream_fifo_pop(struct stream_fifo *fifo)
                if (fifo->head == NULL)
                        fifo->tail = NULL;
 
-               fifo->count--;
+               atomic_fetch_sub_explicit(&fifo->count, 1,
+                                         memory_order_release);
 
                /* ensure stream is scrubbed of references to this fifo */
                s->next = NULL;
@@ -1140,12 +1164,37 @@ struct stream *stream_fifo_pop(struct stream_fifo *fifo)
        return s;
 }
 
-/* Return first fifo entry. */
+struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo)
+{
+       struct stream *ret;
+
+       pthread_mutex_lock(&fifo->mtx);
+       {
+               ret = stream_fifo_pop(fifo);
+       }
+       pthread_mutex_unlock(&fifo->mtx);
+
+       return ret;
+}
+
 struct stream *stream_fifo_head(struct stream_fifo *fifo)
 {
        return fifo->head;
 }
 
+struct stream *stream_fifo_head_safe(struct stream_fifo *fifo)
+{
+       struct stream *ret;
+
+       pthread_mutex_lock(&fifo->mtx);
+       {
+               ret = stream_fifo_head(fifo);
+       }
+       pthread_mutex_unlock(&fifo->mtx);
+
+       return ret;
+}
+
 void stream_fifo_clean(struct stream_fifo *fifo)
 {
        struct stream *s;
@@ -1156,11 +1205,26 @@ void stream_fifo_clean(struct stream_fifo *fifo)
                stream_free(s);
        }
        fifo->head = fifo->tail = NULL;
-       fifo->count = 0;
+       atomic_store_explicit(&fifo->count, 0, memory_order_release);
+}
+
+void stream_fifo_clean_safe(struct stream_fifo *fifo)
+{
+       pthread_mutex_lock(&fifo->mtx);
+       {
+               stream_fifo_clean(fifo);
+       }
+       pthread_mutex_unlock(&fifo->mtx);
+}
+
+size_t stream_fifo_count_safe(struct stream_fifo *fifo)
+{
+       return atomic_load_explicit(&fifo->count, memory_order_acquire);
 }
 
 void stream_fifo_free(struct stream_fifo *fifo)
 {
        stream_fifo_clean(fifo);
+       pthread_mutex_destroy(&fifo->mtx);
        XFREE(MTYPE_STREAM_FIFO, fifo);
 }