]>
git.proxmox.com Git - mirror_frr.git/blob - lib/stream.c
3 * Copyright (C) 1999 Kunihiro Ishiguro
5 * This file is part of GNU Zebra.
7 * GNU Zebra is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2, or (at your option) any
12 * GNU Zebra is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
31 #include "frr_pthread.h"
32 #include "lib_errors.h"
34 DEFINE_MTYPE_STATIC(LIB
, STREAM
, "Stream")
35 DEFINE_MTYPE_STATIC(LIB
, STREAM_FIFO
, "Stream FIFO")
37 /* Tests whether a position is valid */
38 #define GETP_VALID(S, G) ((G) <= (S)->endp)
39 #define PUT_AT_VALID(S,G) GETP_VALID(S,G)
40 #define ENDP_VALID(S, E) ((E) <= (S)->size)
42 /* asserting sanity checks. Following must be true before
43 * stream functions are called:
45 * Following must always be true of stream elements
46 * before and after calls to stream functions:
48 * getp <= endp <= size
50 * Note that after a stream function is called following may be true:
51 * if (getp == endp) then stream is no longer readable
52 * if (endp == size) then stream is no longer writeable
54 * It is valid to put to anywhere within the size of the stream, but only
55 * using stream_put..._at() functions.
57 #define STREAM_WARN_OFFSETS(S) \
58 flog_warn(EC_LIB_STREAM, \
59 "&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n", \
60 (void *)(S), (unsigned long)(S)->size, \
61 (unsigned long)(S)->getp, (unsigned long)(S)->endp)
63 #define STREAM_VERIFY_SANE(S) \
65 if (!(GETP_VALID(S, (S)->getp) && ENDP_VALID(S, (S)->endp))) \
66 STREAM_WARN_OFFSETS(S); \
67 assert(GETP_VALID(S, (S)->getp)); \
68 assert(ENDP_VALID(S, (S)->endp)); \
71 #define STREAM_BOUND_WARN(S, WHAT) \
73 flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds", \
75 STREAM_WARN_OFFSETS(S); \
79 #define STREAM_BOUND_WARN2(S, WHAT) \
81 flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds", \
83 STREAM_WARN_OFFSETS(S); \
86 /* XXX: Deprecated macro: do not use */
87 #define CHECK_SIZE(S, Z) \
89 if (((S)->endp + (Z)) > (S)->size) { \
92 "CHECK_SIZE: truncating requested size %lu\n", \
93 (unsigned long)(Z)); \
94 STREAM_WARN_OFFSETS(S); \
95 (Z) = (S)->size - (S)->endp; \
99 /* Make stream buffer. */
100 struct stream
*stream_new(size_t size
)
106 s
= XMALLOC(MTYPE_STREAM
, sizeof(struct stream
) + size
);
108 s
->getp
= s
->endp
= 0;
115 void stream_free(struct stream
*s
)
120 XFREE(MTYPE_STREAM
, s
);
123 struct stream
*stream_copy(struct stream
*dest
, const struct stream
*src
)
125 STREAM_VERIFY_SANE(src
);
127 assert(dest
!= NULL
);
128 assert(STREAM_SIZE(dest
) >= src
->endp
);
130 dest
->endp
= src
->endp
;
131 dest
->getp
= src
->getp
;
133 memcpy(dest
->data
, src
->data
, src
->endp
);
138 struct stream
*stream_dup(const struct stream
*s
)
142 STREAM_VERIFY_SANE(s
);
144 if ((snew
= stream_new(s
->endp
)) == NULL
)
147 return (stream_copy(snew
, s
));
150 struct stream
*stream_dupcat(const struct stream
*s1
, const struct stream
*s2
,
155 STREAM_VERIFY_SANE(s1
);
156 STREAM_VERIFY_SANE(s2
);
158 if ((new = stream_new(s1
->endp
+ s2
->endp
)) == NULL
)
161 memcpy(new->data
, s1
->data
, offset
);
162 memcpy(new->data
+ offset
, s2
->data
, s2
->endp
);
163 memcpy(new->data
+ offset
+ s2
->endp
, s1
->data
+ offset
,
164 (s1
->endp
- offset
));
165 new->endp
= s1
->endp
+ s2
->endp
;
169 size_t stream_resize_inplace(struct stream
**sptr
, size_t newsize
)
171 struct stream
*orig
= *sptr
;
173 STREAM_VERIFY_SANE(orig
);
175 orig
= XREALLOC(MTYPE_STREAM
, orig
, sizeof(struct stream
) + newsize
);
177 orig
->size
= newsize
;
179 if (orig
->endp
> orig
->size
)
180 orig
->endp
= orig
->size
;
181 if (orig
->getp
> orig
->endp
)
182 orig
->getp
= orig
->endp
;
184 STREAM_VERIFY_SANE(orig
);
190 size_t stream_get_getp(const struct stream
*s
)
192 STREAM_VERIFY_SANE(s
);
196 size_t stream_get_endp(const struct stream
*s
)
198 STREAM_VERIFY_SANE(s
);
202 size_t stream_get_size(const struct stream
*s
)
204 STREAM_VERIFY_SANE(s
);
208 /* Stream structre' stream pointer related functions. */
209 void stream_set_getp(struct stream
*s
, size_t pos
)
211 STREAM_VERIFY_SANE(s
);
213 if (!GETP_VALID(s
, pos
)) {
214 STREAM_BOUND_WARN(s
, "set getp");
221 void stream_set_endp(struct stream
*s
, size_t pos
)
223 STREAM_VERIFY_SANE(s
);
225 if (!ENDP_VALID(s
, pos
)) {
226 STREAM_BOUND_WARN(s
, "set endp");
231 * Make sure the current read pointer is not beyond the new endp.
234 STREAM_BOUND_WARN(s
, "set endp");
239 STREAM_VERIFY_SANE(s
);
242 /* Forward pointer. */
243 void stream_forward_getp(struct stream
*s
, size_t size
)
245 STREAM_VERIFY_SANE(s
);
247 if (!GETP_VALID(s
, s
->getp
+ size
)) {
248 STREAM_BOUND_WARN(s
, "seek getp");
255 void stream_forward_endp(struct stream
*s
, size_t size
)
257 STREAM_VERIFY_SANE(s
);
259 if (!ENDP_VALID(s
, s
->endp
+ size
)) {
260 STREAM_BOUND_WARN(s
, "seek endp");
267 /* Copy from stream to destination. */
268 bool stream_get2(void *dst
, struct stream
*s
, size_t size
)
270 STREAM_VERIFY_SANE(s
);
272 if (STREAM_READABLE(s
) < size
) {
273 STREAM_BOUND_WARN2(s
, "get");
277 memcpy(dst
, s
->data
+ s
->getp
, size
);
283 void stream_get(void *dst
, struct stream
*s
, size_t size
)
285 STREAM_VERIFY_SANE(s
);
287 if (STREAM_READABLE(s
) < size
) {
288 STREAM_BOUND_WARN(s
, "get");
292 memcpy(dst
, s
->data
+ s
->getp
, size
);
296 /* Get next character from the stream. */
297 bool stream_getc2(struct stream
*s
, uint8_t *byte
)
299 STREAM_VERIFY_SANE(s
);
301 if (STREAM_READABLE(s
) < sizeof(uint8_t)) {
302 STREAM_BOUND_WARN2(s
, "get char");
305 *byte
= s
->data
[s
->getp
++];
310 uint8_t stream_getc(struct stream
*s
)
314 STREAM_VERIFY_SANE(s
);
316 if (STREAM_READABLE(s
) < sizeof(uint8_t)) {
317 STREAM_BOUND_WARN(s
, "get char");
320 c
= s
->data
[s
->getp
++];
325 /* Get next character from the stream. */
326 uint8_t stream_getc_from(struct stream
*s
, size_t from
)
330 STREAM_VERIFY_SANE(s
);
332 if (!GETP_VALID(s
, from
+ sizeof(uint8_t))) {
333 STREAM_BOUND_WARN(s
, "get char");
342 bool stream_getw2(struct stream
*s
, uint16_t *word
)
344 STREAM_VERIFY_SANE(s
);
346 if (STREAM_READABLE(s
) < sizeof(uint16_t)) {
347 STREAM_BOUND_WARN2(s
, "get ");
351 *word
= s
->data
[s
->getp
++] << 8;
352 *word
|= s
->data
[s
->getp
++];
357 /* Get next word from the stream. */
358 uint16_t stream_getw(struct stream
*s
)
362 STREAM_VERIFY_SANE(s
);
364 if (STREAM_READABLE(s
) < sizeof(uint16_t)) {
365 STREAM_BOUND_WARN(s
, "get ");
369 w
= s
->data
[s
->getp
++] << 8;
370 w
|= s
->data
[s
->getp
++];
375 /* Get next word from the stream. */
376 uint16_t stream_getw_from(struct stream
*s
, size_t from
)
380 STREAM_VERIFY_SANE(s
);
382 if (!GETP_VALID(s
, from
+ sizeof(uint16_t))) {
383 STREAM_BOUND_WARN(s
, "get ");
387 w
= s
->data
[from
++] << 8;
393 /* Get next 3-byte from the stream. */
394 uint32_t stream_get3_from(struct stream
*s
, size_t from
)
398 STREAM_VERIFY_SANE(s
);
400 if (!GETP_VALID(s
, from
+ 3)) {
401 STREAM_BOUND_WARN(s
, "get 3byte");
405 l
= s
->data
[from
++] << 16;
406 l
|= s
->data
[from
++] << 8;
412 uint32_t stream_get3(struct stream
*s
)
416 STREAM_VERIFY_SANE(s
);
418 if (STREAM_READABLE(s
) < 3) {
419 STREAM_BOUND_WARN(s
, "get 3byte");
423 l
= s
->data
[s
->getp
++] << 16;
424 l
|= s
->data
[s
->getp
++] << 8;
425 l
|= s
->data
[s
->getp
++];
430 /* Get next long word from the stream. */
431 uint32_t stream_getl_from(struct stream
*s
, size_t from
)
435 STREAM_VERIFY_SANE(s
);
437 if (!GETP_VALID(s
, from
+ sizeof(uint32_t))) {
438 STREAM_BOUND_WARN(s
, "get long");
442 l
= (unsigned)(s
->data
[from
++]) << 24;
443 l
|= s
->data
[from
++] << 16;
444 l
|= s
->data
[from
++] << 8;
450 /* Copy from stream at specific location to destination. */
451 void stream_get_from(void *dst
, struct stream
*s
, size_t from
, size_t size
)
453 STREAM_VERIFY_SANE(s
);
455 if (!GETP_VALID(s
, from
+ size
)) {
456 STREAM_BOUND_WARN(s
, "get from");
460 memcpy(dst
, s
->data
+ from
, size
);
463 bool stream_getl2(struct stream
*s
, uint32_t *l
)
465 STREAM_VERIFY_SANE(s
);
467 if (STREAM_READABLE(s
) < sizeof(uint32_t)) {
468 STREAM_BOUND_WARN2(s
, "get long");
472 *l
= (unsigned int)(s
->data
[s
->getp
++]) << 24;
473 *l
|= s
->data
[s
->getp
++] << 16;
474 *l
|= s
->data
[s
->getp
++] << 8;
475 *l
|= s
->data
[s
->getp
++];
480 uint32_t stream_getl(struct stream
*s
)
484 STREAM_VERIFY_SANE(s
);
486 if (STREAM_READABLE(s
) < sizeof(uint32_t)) {
487 STREAM_BOUND_WARN(s
, "get long");
491 l
= (unsigned)(s
->data
[s
->getp
++]) << 24;
492 l
|= s
->data
[s
->getp
++] << 16;
493 l
|= s
->data
[s
->getp
++] << 8;
494 l
|= s
->data
[s
->getp
++];
499 /* Get next quad word from the stream. */
500 uint64_t stream_getq_from(struct stream
*s
, size_t from
)
504 STREAM_VERIFY_SANE(s
);
506 if (!GETP_VALID(s
, from
+ sizeof(uint64_t))) {
507 STREAM_BOUND_WARN(s
, "get quad");
511 q
= ((uint64_t)s
->data
[from
++]) << 56;
512 q
|= ((uint64_t)s
->data
[from
++]) << 48;
513 q
|= ((uint64_t)s
->data
[from
++]) << 40;
514 q
|= ((uint64_t)s
->data
[from
++]) << 32;
515 q
|= ((uint64_t)s
->data
[from
++]) << 24;
516 q
|= ((uint64_t)s
->data
[from
++]) << 16;
517 q
|= ((uint64_t)s
->data
[from
++]) << 8;
518 q
|= ((uint64_t)s
->data
[from
++]);
523 uint64_t stream_getq(struct stream
*s
)
527 STREAM_VERIFY_SANE(s
);
529 if (STREAM_READABLE(s
) < sizeof(uint64_t)) {
530 STREAM_BOUND_WARN(s
, "get quad");
534 q
= ((uint64_t)s
->data
[s
->getp
++]) << 56;
535 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 48;
536 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 40;
537 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 32;
538 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 24;
539 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 16;
540 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 8;
541 q
|= ((uint64_t)s
->data
[s
->getp
++]);
546 bool stream_getq2(struct stream
*s
, uint64_t *q
)
548 STREAM_VERIFY_SANE(s
);
550 if (STREAM_READABLE(s
) < sizeof(uint64_t)) {
551 STREAM_BOUND_WARN2(s
, "get uint64");
555 *q
= ((uint64_t)s
->data
[s
->getp
++]) << 56;
556 *q
|= ((uint64_t)s
->data
[s
->getp
++]) << 48;
557 *q
|= ((uint64_t)s
->data
[s
->getp
++]) << 40;
558 *q
|= ((uint64_t)s
->data
[s
->getp
++]) << 32;
559 *q
|= ((uint64_t)s
->data
[s
->getp
++]) << 24;
560 *q
|= ((uint64_t)s
->data
[s
->getp
++]) << 16;
561 *q
|= ((uint64_t)s
->data
[s
->getp
++]) << 8;
562 *q
|= ((uint64_t)s
->data
[s
->getp
++]);
567 /* Get next long word from the stream. */
568 uint32_t stream_get_ipv4(struct stream
*s
)
572 STREAM_VERIFY_SANE(s
);
574 if (STREAM_READABLE(s
) < sizeof(uint32_t)) {
575 STREAM_BOUND_WARN(s
, "get ipv4");
579 memcpy(&l
, s
->data
+ s
->getp
, sizeof(uint32_t));
580 s
->getp
+= sizeof(uint32_t);
585 float stream_getf(struct stream
*s
)
591 u
.d
= stream_getl(s
);
595 double stream_getd(struct stream
*s
)
601 u
.d
= stream_getq(s
);
605 /* Copy to source to stream.
607 * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
608 * around. This should be fixed once the stream updates are working.
610 * stream_write() is saner
612 void stream_put(struct stream
*s
, const void *src
, size_t size
)
615 /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
618 STREAM_VERIFY_SANE(s
);
620 if (STREAM_WRITEABLE(s
) < size
) {
621 STREAM_BOUND_WARN(s
, "put");
626 memcpy(s
->data
+ s
->endp
, src
, size
);
628 memset(s
->data
+ s
->endp
, 0, size
);
633 /* Put character to the stream. */
634 int stream_putc(struct stream
*s
, uint8_t c
)
636 STREAM_VERIFY_SANE(s
);
638 if (STREAM_WRITEABLE(s
) < sizeof(uint8_t)) {
639 STREAM_BOUND_WARN(s
, "put");
643 s
->data
[s
->endp
++] = c
;
644 return sizeof(uint8_t);
647 /* Put word to the stream. */
648 int stream_putw(struct stream
*s
, uint16_t w
)
650 STREAM_VERIFY_SANE(s
);
652 if (STREAM_WRITEABLE(s
) < sizeof(uint16_t)) {
653 STREAM_BOUND_WARN(s
, "put");
657 s
->data
[s
->endp
++] = (uint8_t)(w
>> 8);
658 s
->data
[s
->endp
++] = (uint8_t)w
;
663 /* Put long word to the stream. */
664 int stream_put3(struct stream
*s
, uint32_t l
)
666 STREAM_VERIFY_SANE(s
);
668 if (STREAM_WRITEABLE(s
) < 3) {
669 STREAM_BOUND_WARN(s
, "put");
673 s
->data
[s
->endp
++] = (uint8_t)(l
>> 16);
674 s
->data
[s
->endp
++] = (uint8_t)(l
>> 8);
675 s
->data
[s
->endp
++] = (uint8_t)l
;
680 /* Put long word to the stream. */
681 int stream_putl(struct stream
*s
, uint32_t l
)
683 STREAM_VERIFY_SANE(s
);
685 if (STREAM_WRITEABLE(s
) < sizeof(uint32_t)) {
686 STREAM_BOUND_WARN(s
, "put");
690 s
->data
[s
->endp
++] = (uint8_t)(l
>> 24);
691 s
->data
[s
->endp
++] = (uint8_t)(l
>> 16);
692 s
->data
[s
->endp
++] = (uint8_t)(l
>> 8);
693 s
->data
[s
->endp
++] = (uint8_t)l
;
698 /* Put quad word to the stream. */
699 int stream_putq(struct stream
*s
, uint64_t q
)
701 STREAM_VERIFY_SANE(s
);
703 if (STREAM_WRITEABLE(s
) < sizeof(uint64_t)) {
704 STREAM_BOUND_WARN(s
, "put quad");
708 s
->data
[s
->endp
++] = (uint8_t)(q
>> 56);
709 s
->data
[s
->endp
++] = (uint8_t)(q
>> 48);
710 s
->data
[s
->endp
++] = (uint8_t)(q
>> 40);
711 s
->data
[s
->endp
++] = (uint8_t)(q
>> 32);
712 s
->data
[s
->endp
++] = (uint8_t)(q
>> 24);
713 s
->data
[s
->endp
++] = (uint8_t)(q
>> 16);
714 s
->data
[s
->endp
++] = (uint8_t)(q
>> 8);
715 s
->data
[s
->endp
++] = (uint8_t)q
;
720 int stream_putf(struct stream
*s
, float f
)
727 return stream_putl(s
, u
.o
);
730 int stream_putd(struct stream
*s
, double d
)
737 return stream_putq(s
, u
.o
);
740 int stream_putc_at(struct stream
*s
, size_t putp
, uint8_t c
)
742 STREAM_VERIFY_SANE(s
);
744 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint8_t))) {
745 STREAM_BOUND_WARN(s
, "put");
754 int stream_putw_at(struct stream
*s
, size_t putp
, uint16_t w
)
756 STREAM_VERIFY_SANE(s
);
758 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint16_t))) {
759 STREAM_BOUND_WARN(s
, "put");
763 s
->data
[putp
] = (uint8_t)(w
>> 8);
764 s
->data
[putp
+ 1] = (uint8_t)w
;
769 int stream_put3_at(struct stream
*s
, size_t putp
, uint32_t l
)
771 STREAM_VERIFY_SANE(s
);
773 if (!PUT_AT_VALID(s
, putp
+ 3)) {
774 STREAM_BOUND_WARN(s
, "put");
777 s
->data
[putp
] = (uint8_t)(l
>> 16);
778 s
->data
[putp
+ 1] = (uint8_t)(l
>> 8);
779 s
->data
[putp
+ 2] = (uint8_t)l
;
784 int stream_putl_at(struct stream
*s
, size_t putp
, uint32_t l
)
786 STREAM_VERIFY_SANE(s
);
788 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint32_t))) {
789 STREAM_BOUND_WARN(s
, "put");
792 s
->data
[putp
] = (uint8_t)(l
>> 24);
793 s
->data
[putp
+ 1] = (uint8_t)(l
>> 16);
794 s
->data
[putp
+ 2] = (uint8_t)(l
>> 8);
795 s
->data
[putp
+ 3] = (uint8_t)l
;
800 int stream_putq_at(struct stream
*s
, size_t putp
, uint64_t q
)
802 STREAM_VERIFY_SANE(s
);
804 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint64_t))) {
805 STREAM_BOUND_WARN(s
, "put");
808 s
->data
[putp
] = (uint8_t)(q
>> 56);
809 s
->data
[putp
+ 1] = (uint8_t)(q
>> 48);
810 s
->data
[putp
+ 2] = (uint8_t)(q
>> 40);
811 s
->data
[putp
+ 3] = (uint8_t)(q
>> 32);
812 s
->data
[putp
+ 4] = (uint8_t)(q
>> 24);
813 s
->data
[putp
+ 5] = (uint8_t)(q
>> 16);
814 s
->data
[putp
+ 6] = (uint8_t)(q
>> 8);
815 s
->data
[putp
+ 7] = (uint8_t)q
;
820 /* Put long word to the stream. */
821 int stream_put_ipv4(struct stream
*s
, uint32_t l
)
823 STREAM_VERIFY_SANE(s
);
825 if (STREAM_WRITEABLE(s
) < sizeof(uint32_t)) {
826 STREAM_BOUND_WARN(s
, "put");
829 memcpy(s
->data
+ s
->endp
, &l
, sizeof(uint32_t));
830 s
->endp
+= sizeof(uint32_t);
832 return sizeof(uint32_t);
835 /* Put long word to the stream. */
836 int stream_put_in_addr(struct stream
*s
, const struct in_addr
*addr
)
838 STREAM_VERIFY_SANE(s
);
840 if (STREAM_WRITEABLE(s
) < sizeof(uint32_t)) {
841 STREAM_BOUND_WARN(s
, "put");
845 memcpy(s
->data
+ s
->endp
, addr
, sizeof(uint32_t));
846 s
->endp
+= sizeof(uint32_t);
848 return sizeof(uint32_t);
851 /* Put in_addr at location in the stream. */
852 int stream_put_in_addr_at(struct stream
*s
, size_t putp
,
853 const struct in_addr
*addr
)
855 STREAM_VERIFY_SANE(s
);
857 if (!PUT_AT_VALID(s
, putp
+ 4)) {
858 STREAM_BOUND_WARN(s
, "put");
862 memcpy(&s
->data
[putp
], addr
, 4);
866 /* Put in6_addr at location in the stream. */
867 int stream_put_in6_addr_at(struct stream
*s
, size_t putp
,
868 const struct in6_addr
*addr
)
870 STREAM_VERIFY_SANE(s
);
872 if (!PUT_AT_VALID(s
, putp
+ 16)) {
873 STREAM_BOUND_WARN(s
, "put");
877 memcpy(&s
->data
[putp
], addr
, 16);
881 /* Put prefix by nlri type format. */
882 int stream_put_prefix_addpath(struct stream
*s
, const struct prefix
*p
,
883 int addpath_encode
, uint32_t addpath_tx_id
)
886 size_t psize_with_addpath
;
888 STREAM_VERIFY_SANE(s
);
890 psize
= PSIZE(p
->prefixlen
);
893 psize_with_addpath
= psize
+ 4;
895 psize_with_addpath
= psize
;
897 if (STREAM_WRITEABLE(s
) < (psize_with_addpath
+ sizeof(uint8_t))) {
898 STREAM_BOUND_WARN(s
, "put");
902 if (addpath_encode
) {
903 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 24);
904 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 16);
905 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 8);
906 s
->data
[s
->endp
++] = (uint8_t)addpath_tx_id
;
909 s
->data
[s
->endp
++] = p
->prefixlen
;
910 memcpy(s
->data
+ s
->endp
, &p
->u
.prefix
, psize
);
916 int stream_put_prefix(struct stream
*s
, const struct prefix
*p
)
918 return stream_put_prefix_addpath(s
, p
, 0, 0);
921 /* Put NLRI with label */
922 int stream_put_labeled_prefix(struct stream
*s
, const struct prefix
*p
,
923 mpls_label_t
*label
, int addpath_encode
,
924 uint32_t addpath_tx_id
)
927 size_t psize_with_addpath
;
928 uint8_t *label_pnt
= (uint8_t *)label
;
930 STREAM_VERIFY_SANE(s
);
932 psize
= PSIZE(p
->prefixlen
);
933 psize_with_addpath
= psize
+ (addpath_encode
? 4 : 0);
935 if (STREAM_WRITEABLE(s
) < (psize_with_addpath
+ 3)) {
936 STREAM_BOUND_WARN(s
, "put");
940 if (addpath_encode
) {
941 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 24);
942 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 16);
943 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 8);
944 s
->data
[s
->endp
++] = (uint8_t)addpath_tx_id
;
947 stream_putc(s
, (p
->prefixlen
+ 24));
948 stream_putc(s
, label_pnt
[0]);
949 stream_putc(s
, label_pnt
[1]);
950 stream_putc(s
, label_pnt
[2]);
951 memcpy(s
->data
+ s
->endp
, &p
->u
.prefix
, psize
);
957 /* Read size from fd. */
958 int stream_read(struct stream
*s
, int fd
, size_t size
)
962 STREAM_VERIFY_SANE(s
);
964 if (STREAM_WRITEABLE(s
) < size
) {
965 STREAM_BOUND_WARN(s
, "put");
969 nbytes
= readn(fd
, s
->data
+ s
->endp
, size
);
977 ssize_t
stream_read_try(struct stream
*s
, int fd
, size_t size
)
981 STREAM_VERIFY_SANE(s
);
983 if (STREAM_WRITEABLE(s
) < size
) {
984 STREAM_BOUND_WARN(s
, "put");
985 /* Fatal (not transient) error, since retrying will not help
986 (stream is too small to contain the desired data). */
990 if ((nbytes
= read(fd
, s
->data
+ s
->endp
, size
)) >= 0) {
994 /* Error: was it transient (return -2) or fatal (return -1)? */
995 if (ERRNO_IO_RETRY(errno
))
997 flog_err(EC_LIB_SOCKET
, "%s: read failed on fd %d: %s", __func__
, fd
,
998 safe_strerror(errno
));
1002 /* Read up to size bytes into the stream from the fd, using recvmsgfrom
1003 * whose arguments match the remaining arguments to this function
1005 ssize_t
stream_recvfrom(struct stream
*s
, int fd
, size_t size
, int flags
,
1006 struct sockaddr
*from
, socklen_t
*fromlen
)
1010 STREAM_VERIFY_SANE(s
);
1012 if (STREAM_WRITEABLE(s
) < size
) {
1013 STREAM_BOUND_WARN(s
, "put");
1014 /* Fatal (not transient) error, since retrying will not help
1015 (stream is too small to contain the desired data). */
1019 if ((nbytes
= recvfrom(fd
, s
->data
+ s
->endp
, size
, flags
, from
,
1025 /* Error: was it transient (return -2) or fatal (return -1)? */
1026 if (ERRNO_IO_RETRY(errno
))
1028 flog_err(EC_LIB_SOCKET
, "%s: read failed on fd %d: %s", __func__
, fd
,
1029 safe_strerror(errno
));
1033 /* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
1035 * First iovec will be used to receive the data.
1036 * Stream need not be empty.
1038 ssize_t
stream_recvmsg(struct stream
*s
, int fd
, struct msghdr
*msgh
, int flags
,
1044 STREAM_VERIFY_SANE(s
);
1045 assert(msgh
->msg_iovlen
> 0);
1047 if (STREAM_WRITEABLE(s
) < size
) {
1048 STREAM_BOUND_WARN(s
, "put");
1049 /* This is a logic error in the calling code: the stream is too
1051 to hold the desired data! */
1055 iov
= &(msgh
->msg_iov
[0]);
1056 iov
->iov_base
= (s
->data
+ s
->endp
);
1057 iov
->iov_len
= size
;
1059 nbytes
= recvmsg(fd
, msgh
, flags
);
1067 /* Write data to buffer. */
1068 size_t stream_write(struct stream
*s
, const void *ptr
, size_t size
)
1071 CHECK_SIZE(s
, size
);
1073 STREAM_VERIFY_SANE(s
);
1075 if (STREAM_WRITEABLE(s
) < size
) {
1076 STREAM_BOUND_WARN(s
, "put");
1080 memcpy(s
->data
+ s
->endp
, ptr
, size
);
1086 /* Return current read pointer.
1088 * Use stream_get_pnt_to if you must, but decoding streams properly
1091 uint8_t *stream_pnt(struct stream
*s
)
1093 STREAM_VERIFY_SANE(s
);
1094 return s
->data
+ s
->getp
;
1097 /* Check does this stream empty? */
1098 int stream_empty(struct stream
*s
)
1100 STREAM_VERIFY_SANE(s
);
1102 return (s
->endp
== 0);
1106 void stream_reset(struct stream
*s
)
1108 STREAM_VERIFY_SANE(s
);
1110 s
->getp
= s
->endp
= 0;
1113 /* Write stream contens to the file discriptor. */
1114 int stream_flush(struct stream
*s
, int fd
)
1118 STREAM_VERIFY_SANE(s
);
1120 nbytes
= write(fd
, s
->data
+ s
->getp
, s
->endp
- s
->getp
);
1125 void stream_hexdump(const struct stream
*s
)
1127 zlog_hexdump(s
->data
, s
->endp
);
1130 /* Stream first in first out queue. */
1132 struct stream_fifo
*stream_fifo_new(void)
1134 struct stream_fifo
*new;
1136 new = XMALLOC(MTYPE_STREAM_FIFO
, sizeof(struct stream_fifo
));
1137 stream_fifo_init(new);
1141 void stream_fifo_init(struct stream_fifo
*fifo
)
1143 memset(fifo
, 0, sizeof(struct stream_fifo
));
1144 pthread_mutex_init(&fifo
->mtx
, NULL
);
1147 /* Add new stream to fifo. */
1148 void stream_fifo_push(struct stream_fifo
*fifo
, struct stream
*s
)
1150 #if defined DEV_BUILD
1155 fifo
->tail
->next
= s
;
1160 fifo
->tail
->next
= NULL
;
1161 #if !defined DEV_BUILD
1162 atomic_fetch_add_explicit(&fifo
->count
, 1, memory_order_release
);
1164 max
= atomic_fetch_add_explicit(&fifo
->count
, 1, memory_order_release
);
1165 curmax
= atomic_load_explicit(&fifo
->max_count
, memory_order_relaxed
);
1167 atomic_store_explicit(&fifo
->max_count
, max
,
1168 memory_order_relaxed
);
1172 void stream_fifo_push_safe(struct stream_fifo
*fifo
, struct stream
*s
)
1174 frr_with_mutex(&fifo
->mtx
) {
1175 stream_fifo_push(fifo
, s
);
1179 /* Delete first stream from fifo. */
1180 struct stream
*stream_fifo_pop(struct stream_fifo
*fifo
)
1187 fifo
->head
= s
->next
;
1189 if (fifo
->head
== NULL
)
1192 atomic_fetch_sub_explicit(&fifo
->count
, 1,
1193 memory_order_release
);
1195 /* ensure stream is scrubbed of references to this fifo */
1202 struct stream
*stream_fifo_pop_safe(struct stream_fifo
*fifo
)
1206 frr_with_mutex(&fifo
->mtx
) {
1207 ret
= stream_fifo_pop(fifo
);
1213 struct stream
*stream_fifo_head(struct stream_fifo
*fifo
)
1218 struct stream
*stream_fifo_head_safe(struct stream_fifo
*fifo
)
1222 frr_with_mutex(&fifo
->mtx
) {
1223 ret
= stream_fifo_head(fifo
);
1229 void stream_fifo_clean(struct stream_fifo
*fifo
)
1232 struct stream
*next
;
1234 for (s
= fifo
->head
; s
; s
= next
) {
1238 fifo
->head
= fifo
->tail
= NULL
;
1239 atomic_store_explicit(&fifo
->count
, 0, memory_order_release
);
1242 void stream_fifo_clean_safe(struct stream_fifo
*fifo
)
1244 frr_with_mutex(&fifo
->mtx
) {
1245 stream_fifo_clean(fifo
);
1249 size_t stream_fifo_count_safe(struct stream_fifo
*fifo
)
1251 return atomic_load_explicit(&fifo
->count
, memory_order_acquire
);
1254 void stream_fifo_deinit(struct stream_fifo
*fifo
)
1256 stream_fifo_clean(fifo
);
1257 pthread_mutex_destroy(&fifo
->mtx
);
1260 void stream_fifo_free(struct stream_fifo
*fifo
)
1262 stream_fifo_deinit(fifo
);
1263 XFREE(MTYPE_STREAM_FIFO
, fifo
);