]>
git.proxmox.com Git - mirror_frr.git/blob - lib/stream.c
3 * Copyright (C) 1999 Kunihiro Ishiguro
5 * This file is part of GNU Zebra.
7 * GNU Zebra is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2, or (at your option) any
12 * GNU Zebra is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
31 #include "frr_pthread.h"
32 #include "lib_errors.h"
34 DEFINE_MTYPE_STATIC(LIB
, STREAM
, "Stream")
35 DEFINE_MTYPE_STATIC(LIB
, STREAM_FIFO
, "Stream FIFO")
37 /* Tests whether a position is valid */
38 #define GETP_VALID(S, G) ((G) <= (S)->endp)
39 #define PUT_AT_VALID(S,G) GETP_VALID(S,G)
40 #define ENDP_VALID(S, E) ((E) <= (S)->size)
42 /* asserting sanity checks. Following must be true before
43 * stream functions are called:
45 * Following must always be true of stream elements
46 * before and after calls to stream functions:
48 * getp <= endp <= size
50 * Note that after a stream function is called following may be true:
51 * if (getp == endp) then stream is no longer readable
52 * if (endp == size) then stream is no longer writeable
54 * It is valid to put to anywhere within the size of the stream, but only
55 * using stream_put..._at() functions.
57 #define STREAM_WARN_OFFSETS(S) \
59 flog_warn(EC_LIB_STREAM, \
60 "&(struct stream): %p, size: %lu, getp: %lu, endp: %lu", \
61 (void *)(S), (unsigned long)(S)->size, \
62 (unsigned long)(S)->getp, (unsigned long)(S)->endp); \
63 zlog_backtrace(LOG_WARNING); \
66 #define STREAM_VERIFY_SANE(S) \
68 if (!(GETP_VALID(S, (S)->getp) && ENDP_VALID(S, (S)->endp))) { \
69 STREAM_WARN_OFFSETS(S); \
71 assert(GETP_VALID(S, (S)->getp)); \
72 assert(ENDP_VALID(S, (S)->endp)); \
75 #define STREAM_BOUND_WARN(S, WHAT) \
77 flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds", \
79 STREAM_WARN_OFFSETS(S); \
83 #define STREAM_BOUND_WARN2(S, WHAT) \
85 flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds", \
87 STREAM_WARN_OFFSETS(S); \
90 /* XXX: Deprecated macro: do not use */
91 #define CHECK_SIZE(S, Z) \
93 if (((S)->endp + (Z)) > (S)->size) { \
96 "CHECK_SIZE: truncating requested size %lu", \
97 (unsigned long)(Z)); \
98 STREAM_WARN_OFFSETS(S); \
99 (Z) = (S)->size - (S)->endp; \
103 /* Make stream buffer. */
104 struct stream
*stream_new(size_t size
)
110 s
= XMALLOC(MTYPE_STREAM
, sizeof(struct stream
) + size
);
112 s
->getp
= s
->endp
= 0;
119 void stream_free(struct stream
*s
)
124 XFREE(MTYPE_STREAM
, s
);
127 struct stream
*stream_copy(struct stream
*dest
, const struct stream
*src
)
129 STREAM_VERIFY_SANE(src
);
131 assert(dest
!= NULL
);
132 assert(STREAM_SIZE(dest
) >= src
->endp
);
134 dest
->endp
= src
->endp
;
135 dest
->getp
= src
->getp
;
137 memcpy(dest
->data
, src
->data
, src
->endp
);
142 struct stream
*stream_dup(const struct stream
*s
)
146 STREAM_VERIFY_SANE(s
);
148 if ((snew
= stream_new(s
->endp
)) == NULL
)
151 return (stream_copy(snew
, s
));
154 struct stream
*stream_dupcat(const struct stream
*s1
, const struct stream
*s2
,
159 STREAM_VERIFY_SANE(s1
);
160 STREAM_VERIFY_SANE(s2
);
162 if ((new = stream_new(s1
->endp
+ s2
->endp
)) == NULL
)
165 memcpy(new->data
, s1
->data
, offset
);
166 memcpy(new->data
+ offset
, s2
->data
, s2
->endp
);
167 memcpy(new->data
+ offset
+ s2
->endp
, s1
->data
+ offset
,
168 (s1
->endp
- offset
));
169 new->endp
= s1
->endp
+ s2
->endp
;
173 size_t stream_resize_inplace(struct stream
**sptr
, size_t newsize
)
175 struct stream
*orig
= *sptr
;
177 STREAM_VERIFY_SANE(orig
);
179 orig
= XREALLOC(MTYPE_STREAM
, orig
, sizeof(struct stream
) + newsize
);
181 orig
->size
= newsize
;
183 if (orig
->endp
> orig
->size
)
184 orig
->endp
= orig
->size
;
185 if (orig
->getp
> orig
->endp
)
186 orig
->getp
= orig
->endp
;
188 STREAM_VERIFY_SANE(orig
);
194 size_t stream_get_getp(const struct stream
*s
)
196 STREAM_VERIFY_SANE(s
);
200 size_t stream_get_endp(const struct stream
*s
)
202 STREAM_VERIFY_SANE(s
);
206 size_t stream_get_size(const struct stream
*s
)
208 STREAM_VERIFY_SANE(s
);
212 /* Stream structure's stream pointer related functions. */
213 void stream_set_getp(struct stream
*s
, size_t pos
)
215 STREAM_VERIFY_SANE(s
);
217 if (!GETP_VALID(s
, pos
)) {
218 STREAM_BOUND_WARN(s
, "set getp");
225 void stream_set_endp(struct stream
*s
, size_t pos
)
227 STREAM_VERIFY_SANE(s
);
229 if (!ENDP_VALID(s
, pos
)) {
230 STREAM_BOUND_WARN(s
, "set endp");
235 * Make sure the current read pointer is not beyond the new endp.
238 STREAM_BOUND_WARN(s
, "set endp");
243 STREAM_VERIFY_SANE(s
);
246 /* Forward pointer. */
247 void stream_forward_getp(struct stream
*s
, size_t size
)
249 STREAM_VERIFY_SANE(s
);
251 if (!GETP_VALID(s
, s
->getp
+ size
)) {
252 STREAM_BOUND_WARN(s
, "seek getp");
259 bool stream_forward_getp2(struct stream
*s
, size_t size
)
261 STREAM_VERIFY_SANE(s
);
263 if (!GETP_VALID(s
, s
->getp
+ size
))
271 void stream_rewind_getp(struct stream
*s
, size_t size
)
273 STREAM_VERIFY_SANE(s
);
275 if (size
> s
->getp
|| !GETP_VALID(s
, s
->getp
- size
)) {
276 STREAM_BOUND_WARN(s
, "rewind getp");
283 bool stream_rewind_getp2(struct stream
*s
, size_t size
)
285 STREAM_VERIFY_SANE(s
);
287 if (size
> s
->getp
|| !GETP_VALID(s
, s
->getp
- size
))
295 void stream_forward_endp(struct stream
*s
, size_t size
)
297 STREAM_VERIFY_SANE(s
);
299 if (!ENDP_VALID(s
, s
->endp
+ size
)) {
300 STREAM_BOUND_WARN(s
, "seek endp");
307 bool stream_forward_endp2(struct stream
*s
, size_t size
)
309 STREAM_VERIFY_SANE(s
);
311 if (!ENDP_VALID(s
, s
->endp
+ size
))
319 /* Copy from stream to destination. */
320 bool stream_get2(void *dst
, struct stream
*s
, size_t size
)
322 STREAM_VERIFY_SANE(s
);
324 if (STREAM_READABLE(s
) < size
) {
325 STREAM_BOUND_WARN2(s
, "get");
329 memcpy(dst
, s
->data
+ s
->getp
, size
);
335 void stream_get(void *dst
, struct stream
*s
, size_t size
)
337 STREAM_VERIFY_SANE(s
);
339 if (STREAM_READABLE(s
) < size
) {
340 STREAM_BOUND_WARN(s
, "get");
344 memcpy(dst
, s
->data
+ s
->getp
, size
);
348 /* Get next character from the stream. */
349 bool stream_getc2(struct stream
*s
, uint8_t *byte
)
351 STREAM_VERIFY_SANE(s
);
353 if (STREAM_READABLE(s
) < sizeof(uint8_t)) {
354 STREAM_BOUND_WARN2(s
, "get char");
357 *byte
= s
->data
[s
->getp
++];
362 uint8_t stream_getc(struct stream
*s
)
366 STREAM_VERIFY_SANE(s
);
368 if (STREAM_READABLE(s
) < sizeof(uint8_t)) {
369 STREAM_BOUND_WARN(s
, "get char");
372 c
= s
->data
[s
->getp
++];
377 /* Get character from the stream at offset 'from'. */
378 uint8_t stream_getc_from(struct stream
*s
, size_t from
)
382 STREAM_VERIFY_SANE(s
);
384 if (!GETP_VALID(s
, from
+ sizeof(uint8_t))) {
385 STREAM_BOUND_WARN(s
, "get char");
394 bool stream_getw2(struct stream
*s
, uint16_t *word
)
396 STREAM_VERIFY_SANE(s
);
398 if (STREAM_READABLE(s
) < sizeof(uint16_t)) {
399 STREAM_BOUND_WARN2(s
, "get ");
403 *word
= s
->data
[s
->getp
++] << 8;
404 *word
|= s
->data
[s
->getp
++];
409 /* Get next word from the stream. */
410 uint16_t stream_getw(struct stream
*s
)
414 STREAM_VERIFY_SANE(s
);
416 if (STREAM_READABLE(s
) < sizeof(uint16_t)) {
417 STREAM_BOUND_WARN(s
, "get ");
421 w
= s
->data
[s
->getp
++] << 8;
422 w
|= s
->data
[s
->getp
++];
427 /* Get word from the stream at offset 'from'. */
428 uint16_t stream_getw_from(struct stream
*s
, size_t from
)
432 STREAM_VERIFY_SANE(s
);
434 if (!GETP_VALID(s
, from
+ sizeof(uint16_t))) {
435 STREAM_BOUND_WARN(s
, "get ");
439 w
= s
->data
[from
++] << 8;
445 /* Get 3-byte value from the stream at offset 'from'. */
446 uint32_t stream_get3_from(struct stream
*s
, size_t from
)
450 STREAM_VERIFY_SANE(s
);
452 if (!GETP_VALID(s
, from
+ 3)) {
453 STREAM_BOUND_WARN(s
, "get 3byte");
457 l
= s
->data
[from
++] << 16;
458 l
|= s
->data
[from
++] << 8;
464 uint32_t stream_get3(struct stream
*s
)
468 STREAM_VERIFY_SANE(s
);
470 if (STREAM_READABLE(s
) < 3) {
471 STREAM_BOUND_WARN(s
, "get 3byte");
475 l
= s
->data
[s
->getp
++] << 16;
476 l
|= s
->data
[s
->getp
++] << 8;
477 l
|= s
->data
[s
->getp
++];
482 /* Get long word from the stream at offset 'from'. */
483 uint32_t stream_getl_from(struct stream
*s
, size_t from
)
487 STREAM_VERIFY_SANE(s
);
489 if (!GETP_VALID(s
, from
+ sizeof(uint32_t))) {
490 STREAM_BOUND_WARN(s
, "get long");
494 l
= (unsigned)(s
->data
[from
++]) << 24;
495 l
|= s
->data
[from
++] << 16;
496 l
|= s
->data
[from
++] << 8;
502 /* Copy from stream at specific location to destination. */
503 void stream_get_from(void *dst
, struct stream
*s
, size_t from
, size_t size
)
505 STREAM_VERIFY_SANE(s
);
507 if (!GETP_VALID(s
, from
+ size
)) {
508 STREAM_BOUND_WARN(s
, "get from");
512 memcpy(dst
, s
->data
+ from
, size
);
515 bool stream_getl2(struct stream
*s
, uint32_t *l
)
517 STREAM_VERIFY_SANE(s
);
519 if (STREAM_READABLE(s
) < sizeof(uint32_t)) {
520 STREAM_BOUND_WARN2(s
, "get long");
524 *l
= (unsigned int)(s
->data
[s
->getp
++]) << 24;
525 *l
|= s
->data
[s
->getp
++] << 16;
526 *l
|= s
->data
[s
->getp
++] << 8;
527 *l
|= s
->data
[s
->getp
++];
532 uint32_t stream_getl(struct stream
*s
)
536 STREAM_VERIFY_SANE(s
);
538 if (STREAM_READABLE(s
) < sizeof(uint32_t)) {
539 STREAM_BOUND_WARN(s
, "get long");
543 l
= (unsigned)(s
->data
[s
->getp
++]) << 24;
544 l
|= s
->data
[s
->getp
++] << 16;
545 l
|= s
->data
[s
->getp
++] << 8;
546 l
|= s
->data
[s
->getp
++];
551 /* Get quad word from the stream at offset 'from'. */
552 uint64_t stream_getq_from(struct stream
*s
, size_t from
)
556 STREAM_VERIFY_SANE(s
);
558 if (!GETP_VALID(s
, from
+ sizeof(uint64_t))) {
559 STREAM_BOUND_WARN(s
, "get quad");
563 q
= ((uint64_t)s
->data
[from
++]) << 56;
564 q
|= ((uint64_t)s
->data
[from
++]) << 48;
565 q
|= ((uint64_t)s
->data
[from
++]) << 40;
566 q
|= ((uint64_t)s
->data
[from
++]) << 32;
567 q
|= ((uint64_t)s
->data
[from
++]) << 24;
568 q
|= ((uint64_t)s
->data
[from
++]) << 16;
569 q
|= ((uint64_t)s
->data
[from
++]) << 8;
570 q
|= ((uint64_t)s
->data
[from
++]);
575 uint64_t stream_getq(struct stream
*s
)
579 STREAM_VERIFY_SANE(s
);
581 if (STREAM_READABLE(s
) < sizeof(uint64_t)) {
582 STREAM_BOUND_WARN(s
, "get quad");
586 q
= ((uint64_t)s
->data
[s
->getp
++]) << 56;
587 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 48;
588 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 40;
589 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 32;
590 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 24;
591 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 16;
592 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 8;
593 q
|= ((uint64_t)s
->data
[s
->getp
++]);
598 bool stream_getq2(struct stream
*s
, uint64_t *q
)
600 STREAM_VERIFY_SANE(s
);
602 if (STREAM_READABLE(s
) < sizeof(uint64_t)) {
603 STREAM_BOUND_WARN2(s
, "get uint64");
607 *q
= ((uint64_t)s
->data
[s
->getp
++]) << 56;
608 *q
|= ((uint64_t)s
->data
[s
->getp
++]) << 48;
609 *q
|= ((uint64_t)s
->data
[s
->getp
++]) << 40;
610 *q
|= ((uint64_t)s
->data
[s
->getp
++]) << 32;
611 *q
|= ((uint64_t)s
->data
[s
->getp
++]) << 24;
612 *q
|= ((uint64_t)s
->data
[s
->getp
++]) << 16;
613 *q
|= ((uint64_t)s
->data
[s
->getp
++]) << 8;
614 *q
|= ((uint64_t)s
->data
[s
->getp
++]);
619 /* Get next IPv4 address (4 bytes, copied as-is without byte swapping) from the stream. */
620 uint32_t stream_get_ipv4(struct stream
*s
)
624 STREAM_VERIFY_SANE(s
);
626 if (STREAM_READABLE(s
) < sizeof(uint32_t)) {
627 STREAM_BOUND_WARN(s
, "get ipv4");
631 memcpy(&l
, s
->data
+ s
->getp
, sizeof(uint32_t));
632 s
->getp
+= sizeof(uint32_t);
637 bool stream_get_ipaddr(struct stream
*s
, struct ipaddr
*ip
)
641 STREAM_VERIFY_SANE(s
);
643 /* Get address type. */
644 if (STREAM_READABLE(s
) < sizeof(uint16_t)) {
645 STREAM_BOUND_WARN2(s
, "get ipaddr");
648 ip
->ipa_type
= stream_getw(s
);
650 /* Get address value. */
651 switch (ip
->ipa_type
) {
653 ipa_len
= IPV4_MAX_BYTELEN
;
656 ipa_len
= IPV6_MAX_BYTELEN
;
659 flog_err(EC_LIB_DEVELOPMENT
,
660 "%s: unknown ip address-family: %u", __func__
,
664 if (STREAM_READABLE(s
) < ipa_len
) {
665 STREAM_BOUND_WARN2(s
, "get ipaddr");
668 memcpy(&ip
->ip
, s
->data
+ s
->getp
, ipa_len
);
674 float stream_getf(struct stream
*s
)
680 u
.d
= stream_getl(s
);
684 double stream_getd(struct stream
*s
)
690 u
.d
= stream_getq(s
);
694 /* Copy from source to stream.
696 * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
697 * around. This should be fixed once the stream updates are working.
699 * stream_write() is saner
701 void stream_put(struct stream
*s
, const void *src
, size_t size
)
704 /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
707 STREAM_VERIFY_SANE(s
);
709 if (STREAM_WRITEABLE(s
) < size
) {
710 STREAM_BOUND_WARN(s
, "put");
715 memcpy(s
->data
+ s
->endp
, src
, size
);
717 memset(s
->data
+ s
->endp
, 0, size
);
722 /* Put character to the stream. */
723 int stream_putc(struct stream
*s
, uint8_t c
)
725 STREAM_VERIFY_SANE(s
);
727 if (STREAM_WRITEABLE(s
) < sizeof(uint8_t)) {
728 STREAM_BOUND_WARN(s
, "put");
732 s
->data
[s
->endp
++] = c
;
733 return sizeof(uint8_t);
736 /* Put word to the stream. */
737 int stream_putw(struct stream
*s
, uint16_t w
)
739 STREAM_VERIFY_SANE(s
);
741 if (STREAM_WRITEABLE(s
) < sizeof(uint16_t)) {
742 STREAM_BOUND_WARN(s
, "put");
746 s
->data
[s
->endp
++] = (uint8_t)(w
>> 8);
747 s
->data
[s
->endp
++] = (uint8_t)w
;
752 /* Put 3-byte value (low 24 bits of l) to the stream. */
753 int stream_put3(struct stream
*s
, uint32_t l
)
755 STREAM_VERIFY_SANE(s
);
757 if (STREAM_WRITEABLE(s
) < 3) {
758 STREAM_BOUND_WARN(s
, "put");
762 s
->data
[s
->endp
++] = (uint8_t)(l
>> 16);
763 s
->data
[s
->endp
++] = (uint8_t)(l
>> 8);
764 s
->data
[s
->endp
++] = (uint8_t)l
;
769 /* Put long word to the stream. */
770 int stream_putl(struct stream
*s
, uint32_t l
)
772 STREAM_VERIFY_SANE(s
);
774 if (STREAM_WRITEABLE(s
) < sizeof(uint32_t)) {
775 STREAM_BOUND_WARN(s
, "put");
779 s
->data
[s
->endp
++] = (uint8_t)(l
>> 24);
780 s
->data
[s
->endp
++] = (uint8_t)(l
>> 16);
781 s
->data
[s
->endp
++] = (uint8_t)(l
>> 8);
782 s
->data
[s
->endp
++] = (uint8_t)l
;
787 /* Put quad word to the stream. */
788 int stream_putq(struct stream
*s
, uint64_t q
)
790 STREAM_VERIFY_SANE(s
);
792 if (STREAM_WRITEABLE(s
) < sizeof(uint64_t)) {
793 STREAM_BOUND_WARN(s
, "put quad");
797 s
->data
[s
->endp
++] = (uint8_t)(q
>> 56);
798 s
->data
[s
->endp
++] = (uint8_t)(q
>> 48);
799 s
->data
[s
->endp
++] = (uint8_t)(q
>> 40);
800 s
->data
[s
->endp
++] = (uint8_t)(q
>> 32);
801 s
->data
[s
->endp
++] = (uint8_t)(q
>> 24);
802 s
->data
[s
->endp
++] = (uint8_t)(q
>> 16);
803 s
->data
[s
->endp
++] = (uint8_t)(q
>> 8);
804 s
->data
[s
->endp
++] = (uint8_t)q
;
809 int stream_putf(struct stream
*s
, float f
)
816 return stream_putl(s
, u
.o
);
819 int stream_putd(struct stream
*s
, double d
)
826 return stream_putq(s
, u
.o
);
829 int stream_putc_at(struct stream
*s
, size_t putp
, uint8_t c
)
831 STREAM_VERIFY_SANE(s
);
833 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint8_t))) {
834 STREAM_BOUND_WARN(s
, "put");
843 int stream_putw_at(struct stream
*s
, size_t putp
, uint16_t w
)
845 STREAM_VERIFY_SANE(s
);
847 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint16_t))) {
848 STREAM_BOUND_WARN(s
, "put");
852 s
->data
[putp
] = (uint8_t)(w
>> 8);
853 s
->data
[putp
+ 1] = (uint8_t)w
;
858 int stream_put3_at(struct stream
*s
, size_t putp
, uint32_t l
)
860 STREAM_VERIFY_SANE(s
);
862 if (!PUT_AT_VALID(s
, putp
+ 3)) {
863 STREAM_BOUND_WARN(s
, "put");
866 s
->data
[putp
] = (uint8_t)(l
>> 16);
867 s
->data
[putp
+ 1] = (uint8_t)(l
>> 8);
868 s
->data
[putp
+ 2] = (uint8_t)l
;
873 int stream_putl_at(struct stream
*s
, size_t putp
, uint32_t l
)
875 STREAM_VERIFY_SANE(s
);
877 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint32_t))) {
878 STREAM_BOUND_WARN(s
, "put");
881 s
->data
[putp
] = (uint8_t)(l
>> 24);
882 s
->data
[putp
+ 1] = (uint8_t)(l
>> 16);
883 s
->data
[putp
+ 2] = (uint8_t)(l
>> 8);
884 s
->data
[putp
+ 3] = (uint8_t)l
;
889 int stream_putq_at(struct stream
*s
, size_t putp
, uint64_t q
)
891 STREAM_VERIFY_SANE(s
);
893 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint64_t))) {
894 STREAM_BOUND_WARN(s
, "put");
897 s
->data
[putp
] = (uint8_t)(q
>> 56);
898 s
->data
[putp
+ 1] = (uint8_t)(q
>> 48);
899 s
->data
[putp
+ 2] = (uint8_t)(q
>> 40);
900 s
->data
[putp
+ 3] = (uint8_t)(q
>> 32);
901 s
->data
[putp
+ 4] = (uint8_t)(q
>> 24);
902 s
->data
[putp
+ 5] = (uint8_t)(q
>> 16);
903 s
->data
[putp
+ 6] = (uint8_t)(q
>> 8);
904 s
->data
[putp
+ 7] = (uint8_t)q
;
909 /* Put IPv4 address (4 bytes, copied as-is without byte swapping) to the stream. */
910 int stream_put_ipv4(struct stream
*s
, uint32_t l
)
912 STREAM_VERIFY_SANE(s
);
914 if (STREAM_WRITEABLE(s
) < sizeof(uint32_t)) {
915 STREAM_BOUND_WARN(s
, "put");
918 memcpy(s
->data
+ s
->endp
, &l
, sizeof(uint32_t));
919 s
->endp
+= sizeof(uint32_t);
921 return sizeof(uint32_t);
924 /* Put in_addr (4 bytes, copied as-is) to the stream. */
925 int stream_put_in_addr(struct stream
*s
, const struct in_addr
*addr
)
927 STREAM_VERIFY_SANE(s
);
929 if (STREAM_WRITEABLE(s
) < sizeof(uint32_t)) {
930 STREAM_BOUND_WARN(s
, "put");
934 memcpy(s
->data
+ s
->endp
, addr
, sizeof(uint32_t));
935 s
->endp
+= sizeof(uint32_t);
937 return sizeof(uint32_t);
940 bool stream_put_ipaddr(struct stream
*s
, struct ipaddr
*ip
)
942 stream_putw(s
, ip
->ipa_type
);
944 switch (ip
->ipa_type
) {
946 stream_put_in_addr(s
, &ip
->ipaddr_v4
);
949 stream_write(s
, (uint8_t *)&ip
->ipaddr_v6
, 16);
952 flog_err(EC_LIB_DEVELOPMENT
,
953 "%s: unknown ip address-family: %u", __func__
,
961 /* Put in_addr at location in the stream. */
962 int stream_put_in_addr_at(struct stream
*s
, size_t putp
,
963 const struct in_addr
*addr
)
965 STREAM_VERIFY_SANE(s
);
967 if (!PUT_AT_VALID(s
, putp
+ 4)) {
968 STREAM_BOUND_WARN(s
, "put");
972 memcpy(&s
->data
[putp
], addr
, 4);
976 /* Put in6_addr at location in the stream. */
977 int stream_put_in6_addr_at(struct stream
*s
, size_t putp
,
978 const struct in6_addr
*addr
)
980 STREAM_VERIFY_SANE(s
);
982 if (!PUT_AT_VALID(s
, putp
+ 16)) {
983 STREAM_BOUND_WARN(s
, "put");
987 memcpy(&s
->data
[putp
], addr
, 16);
991 /* Put prefix by nlri type format. */
992 int stream_put_prefix_addpath(struct stream
*s
, const struct prefix
*p
,
993 int addpath_encode
, uint32_t addpath_tx_id
)
996 size_t psize_with_addpath
;
998 STREAM_VERIFY_SANE(s
);
1000 psize
= PSIZE(p
->prefixlen
);
1003 psize_with_addpath
= psize
+ 4;
1005 psize_with_addpath
= psize
;
1007 if (STREAM_WRITEABLE(s
) < (psize_with_addpath
+ sizeof(uint8_t))) {
1008 STREAM_BOUND_WARN(s
, "put");
1012 if (addpath_encode
) {
1013 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 24);
1014 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 16);
1015 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 8);
1016 s
->data
[s
->endp
++] = (uint8_t)addpath_tx_id
;
1019 s
->data
[s
->endp
++] = p
->prefixlen
;
1020 memcpy(s
->data
+ s
->endp
, &p
->u
.prefix
, psize
);
/*
 * Put prefix to the stream in NLRI format.
 *
 * Thin wrapper: forwards to stream_put_prefix_addpath() with both add-path
 * arguments set to zero and hands back whatever that call returns.
 */
int stream_put_prefix(struct stream *s, const struct prefix *p)
{
	const int addpath_encode = 0;
	const uint32_t addpath_tx_id = 0;

	return stream_put_prefix_addpath(s, p, addpath_encode, addpath_tx_id);
}
1031 /* Put NLRI with label */
1032 int stream_put_labeled_prefix(struct stream
*s
, const struct prefix
*p
,
1033 mpls_label_t
*label
, int addpath_encode
,
1034 uint32_t addpath_tx_id
)
1037 size_t psize_with_addpath
;
1038 uint8_t *label_pnt
= (uint8_t *)label
;
1040 STREAM_VERIFY_SANE(s
);
1042 psize
= PSIZE(p
->prefixlen
);
1043 psize_with_addpath
= psize
+ (addpath_encode
? 4 : 0);
1045 if (STREAM_WRITEABLE(s
) < (psize_with_addpath
+ 3)) {
1046 STREAM_BOUND_WARN(s
, "put");
1050 if (addpath_encode
) {
1051 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 24);
1052 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 16);
1053 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 8);
1054 s
->data
[s
->endp
++] = (uint8_t)addpath_tx_id
;
1057 stream_putc(s
, (p
->prefixlen
+ 24));
1058 stream_putc(s
, label_pnt
[0]);
1059 stream_putc(s
, label_pnt
[1]);
1060 stream_putc(s
, label_pnt
[2]);
1061 memcpy(s
->data
+ s
->endp
, &p
->u
.prefix
, psize
);
1067 /* Read size from fd. */
1068 int stream_read(struct stream
*s
, int fd
, size_t size
)
1072 STREAM_VERIFY_SANE(s
);
1074 if (STREAM_WRITEABLE(s
) < size
) {
1075 STREAM_BOUND_WARN(s
, "put");
1079 nbytes
= readn(fd
, s
->data
+ s
->endp
, size
);
1087 ssize_t
stream_read_try(struct stream
*s
, int fd
, size_t size
)
1091 STREAM_VERIFY_SANE(s
);
1093 if (STREAM_WRITEABLE(s
) < size
) {
1094 STREAM_BOUND_WARN(s
, "put");
1095 /* Fatal (not transient) error, since retrying will not help
1096 (stream is too small to contain the desired data). */
1100 if ((nbytes
= read(fd
, s
->data
+ s
->endp
, size
)) >= 0) {
1104 /* Error: was it transient (return -2) or fatal (return -1)? */
1105 if (ERRNO_IO_RETRY(errno
))
1107 flog_err(EC_LIB_SOCKET
, "%s: read failed on fd %d: %s", __func__
, fd
,
1108 safe_strerror(errno
));
1112 /* Read up to size bytes into the stream from the fd, using recvfrom
1113 * whose arguments match the remaining arguments to this function
1115 ssize_t
stream_recvfrom(struct stream
*s
, int fd
, size_t size
, int flags
,
1116 struct sockaddr
*from
, socklen_t
*fromlen
)
1120 STREAM_VERIFY_SANE(s
);
1122 if (STREAM_WRITEABLE(s
) < size
) {
1123 STREAM_BOUND_WARN(s
, "put");
1124 /* Fatal (not transient) error, since retrying will not help
1125 (stream is too small to contain the desired data). */
1129 if ((nbytes
= recvfrom(fd
, s
->data
+ s
->endp
, size
, flags
, from
,
1135 /* Error: was it transient (return -2) or fatal (return -1)? */
1136 if (ERRNO_IO_RETRY(errno
))
1138 flog_err(EC_LIB_SOCKET
, "%s: read failed on fd %d: %s", __func__
, fd
,
1139 safe_strerror(errno
));
1143 /* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
1145 * First iovec will be used to receive the data.
1146 * Stream need not be empty.
1148 ssize_t
stream_recvmsg(struct stream
*s
, int fd
, struct msghdr
*msgh
, int flags
,
1154 STREAM_VERIFY_SANE(s
);
1155 assert(msgh
->msg_iovlen
> 0);
1157 if (STREAM_WRITEABLE(s
) < size
) {
1158 STREAM_BOUND_WARN(s
, "put");
1159 /* This is a logic error in the calling code: the stream is too
1161 to hold the desired data! */
1165 iov
= &(msgh
->msg_iov
[0]);
1166 iov
->iov_base
= (s
->data
+ s
->endp
);
1167 iov
->iov_len
= size
;
1169 nbytes
= recvmsg(fd
, msgh
, flags
);
1177 /* Write data to buffer. */
1178 size_t stream_write(struct stream
*s
, const void *ptr
, size_t size
)
1181 CHECK_SIZE(s
, size
);
1183 STREAM_VERIFY_SANE(s
);
1185 if (STREAM_WRITEABLE(s
) < size
) {
1186 STREAM_BOUND_WARN(s
, "put");
1190 memcpy(s
->data
+ s
->endp
, ptr
, size
);
1196 /* Return current read pointer.
1198 * Use stream_get_pnt_to if you must, but decoding streams properly
1201 uint8_t *stream_pnt(struct stream
*s
)
1203 STREAM_VERIFY_SANE(s
);
1204 return s
->data
+ s
->getp
;
1207 /* Check does this stream empty? */
1208 int stream_empty(struct stream
*s
)
1210 STREAM_VERIFY_SANE(s
);
1212 return (s
->endp
== 0);
1216 void stream_reset(struct stream
*s
)
1218 STREAM_VERIFY_SANE(s
);
1220 s
->getp
= s
->endp
= 0;
1223 /* Write stream contents to the file descriptor. */
1224 int stream_flush(struct stream
*s
, int fd
)
1228 STREAM_VERIFY_SANE(s
);
1230 nbytes
= write(fd
, s
->data
+ s
->getp
, s
->endp
- s
->getp
);
1235 void stream_hexdump(const struct stream
*s
)
1237 zlog_hexdump(s
->data
, s
->endp
);
1240 /* Stream first in first out queue. */
1242 struct stream_fifo
*stream_fifo_new(void)
1244 struct stream_fifo
*new;
1246 new = XMALLOC(MTYPE_STREAM_FIFO
, sizeof(struct stream_fifo
));
1247 stream_fifo_init(new);
1251 void stream_fifo_init(struct stream_fifo
*fifo
)
1253 memset(fifo
, 0, sizeof(struct stream_fifo
));
1254 pthread_mutex_init(&fifo
->mtx
, NULL
);
1257 /* Add new stream to fifo. */
1258 void stream_fifo_push(struct stream_fifo
*fifo
, struct stream
*s
)
1260 #if defined DEV_BUILD
1265 fifo
->tail
->next
= s
;
1270 fifo
->tail
->next
= NULL
;
1271 #if !defined DEV_BUILD
1272 atomic_fetch_add_explicit(&fifo
->count
, 1, memory_order_release
);
1274 max
= atomic_fetch_add_explicit(&fifo
->count
, 1, memory_order_release
);
1275 curmax
= atomic_load_explicit(&fifo
->max_count
, memory_order_relaxed
);
1277 atomic_store_explicit(&fifo
->max_count
, max
,
1278 memory_order_relaxed
);
1282 void stream_fifo_push_safe(struct stream_fifo
*fifo
, struct stream
*s
)
1284 frr_with_mutex(&fifo
->mtx
) {
1285 stream_fifo_push(fifo
, s
);
1289 /* Delete first stream from fifo. */
1290 struct stream
*stream_fifo_pop(struct stream_fifo
*fifo
)
1297 fifo
->head
= s
->next
;
1299 if (fifo
->head
== NULL
)
1302 atomic_fetch_sub_explicit(&fifo
->count
, 1,
1303 memory_order_release
);
1305 /* ensure stream is scrubbed of references to this fifo */
1312 struct stream
*stream_fifo_pop_safe(struct stream_fifo
*fifo
)
1316 frr_with_mutex(&fifo
->mtx
) {
1317 ret
= stream_fifo_pop(fifo
);
1323 struct stream
*stream_fifo_head(struct stream_fifo
*fifo
)
1328 struct stream
*stream_fifo_head_safe(struct stream_fifo
*fifo
)
1332 frr_with_mutex(&fifo
->mtx
) {
1333 ret
= stream_fifo_head(fifo
);
1339 void stream_fifo_clean(struct stream_fifo
*fifo
)
1342 struct stream
*next
;
1344 for (s
= fifo
->head
; s
; s
= next
) {
1348 fifo
->head
= fifo
->tail
= NULL
;
1349 atomic_store_explicit(&fifo
->count
, 0, memory_order_release
);
1352 void stream_fifo_clean_safe(struct stream_fifo
*fifo
)
1354 frr_with_mutex(&fifo
->mtx
) {
1355 stream_fifo_clean(fifo
);
1359 size_t stream_fifo_count_safe(struct stream_fifo
*fifo
)
1361 return atomic_load_explicit(&fifo
->count
, memory_order_acquire
);
1364 void stream_fifo_deinit(struct stream_fifo
*fifo
)
1366 stream_fifo_clean(fifo
);
1367 pthread_mutex_destroy(&fifo
->mtx
);
1370 void stream_fifo_free(struct stream_fifo
*fifo
)
1372 stream_fifo_deinit(fifo
);
1373 XFREE(MTYPE_STREAM_FIFO
, fifo
);
1376 void stream_pulldown(struct stream
*s
)
1378 size_t rlen
= STREAM_READABLE(s
);
1380 /* No more data, so just move the pointers. */
1386 /* Move the available data to the beginning. */
1387 memmove(s
->data
, &s
->data
[s
->getp
], rlen
);