]>
git.proxmox.com Git - mirror_frr.git/blob - lib/stream.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
4 * Copyright (C) 1999 Kunihiro Ishiguro
16 #include "frr_pthread.h"
17 #include "lib_errors.h"
19 DEFINE_MTYPE_STATIC(LIB
, STREAM
, "Stream");
20 DEFINE_MTYPE_STATIC(LIB
, STREAM_FIFO
, "Stream FIFO");
22 /* Tests whether a position is valid */
23 #define GETP_VALID(S, G) ((G) <= (S)->endp)
24 #define PUT_AT_VALID(S,G) GETP_VALID(S,G)
25 #define ENDP_VALID(S, E) ((E) <= (S)->size)
27 /* asserting sanity checks. Following must be true before
28 * stream functions are called:
30 * Following must always be true of stream elements
31 * before and after calls to stream functions:
33 * getp <= endp <= size
35 * Note that after a stream function is called following may be true:
36 * if (getp == endp) then stream is no longer readable
37 * if (endp == size) then stream is no longer writeable
39 * It is valid to put to anywhere within the size of the stream, but only
40 * using stream_put..._at() functions.
42 #define STREAM_WARN_OFFSETS(S) \
44 flog_warn(EC_LIB_STREAM, \
45 "&(struct stream): %p, size: %lu, getp: %lu, endp: %lu", \
46 (void *)(S), (unsigned long)(S)->size, \
47 (unsigned long)(S)->getp, (unsigned long)(S)->endp); \
48 zlog_backtrace(LOG_WARNING); \
51 #define STREAM_VERIFY_SANE(S) \
53 if (!(GETP_VALID(S, (S)->getp) && ENDP_VALID(S, (S)->endp))) { \
54 STREAM_WARN_OFFSETS(S); \
56 assert(GETP_VALID(S, (S)->getp)); \
57 assert(ENDP_VALID(S, (S)->endp)); \
60 #define STREAM_BOUND_WARN(S, WHAT) \
62 flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds", \
64 STREAM_WARN_OFFSETS(S); \
68 #define STREAM_BOUND_WARN2(S, WHAT) \
70 flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds", \
72 STREAM_WARN_OFFSETS(S); \
75 /* XXX: Deprecated macro: do not use */
76 #define CHECK_SIZE(S, Z) \
78 if (((S)->endp + (Z)) > (S)->size) { \
81 "CHECK_SIZE: truncating requested size %lu", \
82 (unsigned long)(Z)); \
83 STREAM_WARN_OFFSETS(S); \
84 (Z) = (S)->size - (S)->endp; \
88 /* Make stream buffer. */
89 struct stream
*stream_new(size_t size
)
95 s
= XMALLOC(MTYPE_STREAM
, sizeof(struct stream
) + size
);
97 s
->getp
= s
->endp
= 0;
104 void stream_free(struct stream
*s
)
109 XFREE(MTYPE_STREAM
, s
);
112 struct stream
*stream_copy(struct stream
*dest
, const struct stream
*src
)
114 STREAM_VERIFY_SANE(src
);
116 assert(dest
!= NULL
);
117 assert(STREAM_SIZE(dest
) >= src
->endp
);
119 dest
->endp
= src
->endp
;
120 dest
->getp
= src
->getp
;
122 memcpy(dest
->data
, src
->data
, src
->endp
);
127 struct stream
*stream_dup(const struct stream
*s
)
131 STREAM_VERIFY_SANE(s
);
133 snew
= stream_new(s
->endp
);
135 return (stream_copy(snew
, s
));
138 struct stream
*stream_dupcat(const struct stream
*s1
, const struct stream
*s2
,
143 STREAM_VERIFY_SANE(s1
);
144 STREAM_VERIFY_SANE(s2
);
146 if ((new = stream_new(s1
->endp
+ s2
->endp
)) == NULL
)
149 memcpy(new->data
, s1
->data
, offset
);
150 memcpy(new->data
+ offset
, s2
->data
, s2
->endp
);
151 memcpy(new->data
+ offset
+ s2
->endp
, s1
->data
+ offset
,
152 (s1
->endp
- offset
));
153 new->endp
= s1
->endp
+ s2
->endp
;
157 size_t stream_resize_inplace(struct stream
**sptr
, size_t newsize
)
159 struct stream
*orig
= *sptr
;
161 STREAM_VERIFY_SANE(orig
);
163 orig
= XREALLOC(MTYPE_STREAM
, orig
, sizeof(struct stream
) + newsize
);
165 orig
->size
= newsize
;
167 if (orig
->endp
> orig
->size
)
168 orig
->endp
= orig
->size
;
169 if (orig
->getp
> orig
->endp
)
170 orig
->getp
= orig
->endp
;
172 STREAM_VERIFY_SANE(orig
);
178 size_t stream_get_getp(const struct stream
*s
)
180 STREAM_VERIFY_SANE(s
);
184 size_t stream_get_endp(const struct stream
*s
)
186 STREAM_VERIFY_SANE(s
);
190 size_t stream_get_size(const struct stream
*s
)
192 STREAM_VERIFY_SANE(s
);
196 /* Stream structure's pointer-related functions. */
197 void stream_set_getp(struct stream
*s
, size_t pos
)
199 STREAM_VERIFY_SANE(s
);
201 if (!GETP_VALID(s
, pos
)) {
202 STREAM_BOUND_WARN(s
, "set getp");
209 void stream_set_endp(struct stream
*s
, size_t pos
)
211 STREAM_VERIFY_SANE(s
);
213 if (!ENDP_VALID(s
, pos
)) {
214 STREAM_BOUND_WARN(s
, "set endp");
219 * Make sure the current read pointer is not beyond the new endp.
222 STREAM_BOUND_WARN(s
, "set endp");
227 STREAM_VERIFY_SANE(s
);
230 /* Forward pointer. */
231 void stream_forward_getp(struct stream
*s
, size_t size
)
233 STREAM_VERIFY_SANE(s
);
235 if (!GETP_VALID(s
, s
->getp
+ size
)) {
236 STREAM_BOUND_WARN(s
, "seek getp");
243 bool stream_forward_getp2(struct stream
*s
, size_t size
)
245 STREAM_VERIFY_SANE(s
);
247 if (!GETP_VALID(s
, s
->getp
+ size
))
255 void stream_rewind_getp(struct stream
*s
, size_t size
)
257 STREAM_VERIFY_SANE(s
);
259 if (size
> s
->getp
|| !GETP_VALID(s
, s
->getp
- size
)) {
260 STREAM_BOUND_WARN(s
, "rewind getp");
267 bool stream_rewind_getp2(struct stream
*s
, size_t size
)
269 STREAM_VERIFY_SANE(s
);
271 if (size
> s
->getp
|| !GETP_VALID(s
, s
->getp
- size
))
279 void stream_forward_endp(struct stream
*s
, size_t size
)
281 STREAM_VERIFY_SANE(s
);
283 if (!ENDP_VALID(s
, s
->endp
+ size
)) {
284 STREAM_BOUND_WARN(s
, "seek endp");
291 bool stream_forward_endp2(struct stream
*s
, size_t size
)
293 STREAM_VERIFY_SANE(s
);
295 if (!ENDP_VALID(s
, s
->endp
+ size
))
303 /* Copy from stream to destination. */
304 bool stream_get2(void *dst
, struct stream
*s
, size_t size
)
306 STREAM_VERIFY_SANE(s
);
308 if (STREAM_READABLE(s
) < size
) {
309 STREAM_BOUND_WARN2(s
, "get");
313 memcpy(dst
, s
->data
+ s
->getp
, size
);
319 void stream_get(void *dst
, struct stream
*s
, size_t size
)
321 STREAM_VERIFY_SANE(s
);
323 if (STREAM_READABLE(s
) < size
) {
324 STREAM_BOUND_WARN(s
, "get");
328 memcpy(dst
, s
->data
+ s
->getp
, size
);
332 /* Get next character from the stream. */
333 bool stream_getc2(struct stream
*s
, uint8_t *byte
)
335 STREAM_VERIFY_SANE(s
);
337 if (STREAM_READABLE(s
) < sizeof(uint8_t)) {
338 STREAM_BOUND_WARN2(s
, "get char");
341 *byte
= s
->data
[s
->getp
++];
346 uint8_t stream_getc(struct stream
*s
)
350 STREAM_VERIFY_SANE(s
);
352 if (STREAM_READABLE(s
) < sizeof(uint8_t)) {
353 STREAM_BOUND_WARN(s
, "get char");
356 c
= s
->data
[s
->getp
++];
361 /* Get a character from the stream at offset 'from'. */
362 uint8_t stream_getc_from(struct stream
*s
, size_t from
)
366 STREAM_VERIFY_SANE(s
);
368 if (!GETP_VALID(s
, from
+ sizeof(uint8_t))) {
369 STREAM_BOUND_WARN(s
, "get char");
378 bool stream_getw2(struct stream
*s
, uint16_t *word
)
380 STREAM_VERIFY_SANE(s
);
382 if (STREAM_READABLE(s
) < sizeof(uint16_t)) {
383 STREAM_BOUND_WARN2(s
, "get ");
387 *word
= s
->data
[s
->getp
++] << 8;
388 *word
|= s
->data
[s
->getp
++];
393 /* Get next word from the stream. */
394 uint16_t stream_getw(struct stream
*s
)
398 STREAM_VERIFY_SANE(s
);
400 if (STREAM_READABLE(s
) < sizeof(uint16_t)) {
401 STREAM_BOUND_WARN(s
, "get ");
405 w
= s
->data
[s
->getp
++] << 8;
406 w
|= s
->data
[s
->getp
++];
411 /* Get a word from the stream at offset 'from'. */
412 uint16_t stream_getw_from(struct stream
*s
, size_t from
)
416 STREAM_VERIFY_SANE(s
);
418 if (!GETP_VALID(s
, from
+ sizeof(uint16_t))) {
419 STREAM_BOUND_WARN(s
, "get ");
423 w
= s
->data
[from
++] << 8;
429 /* Get a 3-byte value from the stream at offset 'from'. */
430 uint32_t stream_get3_from(struct stream
*s
, size_t from
)
434 STREAM_VERIFY_SANE(s
);
436 if (!GETP_VALID(s
, from
+ 3)) {
437 STREAM_BOUND_WARN(s
, "get 3byte");
441 l
= s
->data
[from
++] << 16;
442 l
|= s
->data
[from
++] << 8;
448 uint32_t stream_get3(struct stream
*s
)
452 STREAM_VERIFY_SANE(s
);
454 if (STREAM_READABLE(s
) < 3) {
455 STREAM_BOUND_WARN(s
, "get 3byte");
459 l
= s
->data
[s
->getp
++] << 16;
460 l
|= s
->data
[s
->getp
++] << 8;
461 l
|= s
->data
[s
->getp
++];
466 /* Get a long word from the stream at offset 'from'. */
467 uint32_t stream_getl_from(struct stream
*s
, size_t from
)
471 STREAM_VERIFY_SANE(s
);
473 if (!GETP_VALID(s
, from
+ sizeof(uint32_t))) {
474 STREAM_BOUND_WARN(s
, "get long");
478 l
= (unsigned)(s
->data
[from
++]) << 24;
479 l
|= s
->data
[from
++] << 16;
480 l
|= s
->data
[from
++] << 8;
486 /* Copy from stream at specific location to destination. */
487 void stream_get_from(void *dst
, struct stream
*s
, size_t from
, size_t size
)
489 STREAM_VERIFY_SANE(s
);
491 if (!GETP_VALID(s
, from
+ size
)) {
492 STREAM_BOUND_WARN(s
, "get from");
496 memcpy(dst
, s
->data
+ from
, size
);
499 bool stream_getl2(struct stream
*s
, uint32_t *l
)
501 STREAM_VERIFY_SANE(s
);
503 if (STREAM_READABLE(s
) < sizeof(uint32_t)) {
504 STREAM_BOUND_WARN2(s
, "get long");
508 *l
= (unsigned int)(s
->data
[s
->getp
++]) << 24;
509 *l
|= s
->data
[s
->getp
++] << 16;
510 *l
|= s
->data
[s
->getp
++] << 8;
511 *l
|= s
->data
[s
->getp
++];
516 uint32_t stream_getl(struct stream
*s
)
520 STREAM_VERIFY_SANE(s
);
522 if (STREAM_READABLE(s
) < sizeof(uint32_t)) {
523 STREAM_BOUND_WARN(s
, "get long");
527 l
= (unsigned)(s
->data
[s
->getp
++]) << 24;
528 l
|= s
->data
[s
->getp
++] << 16;
529 l
|= s
->data
[s
->getp
++] << 8;
530 l
|= s
->data
[s
->getp
++];
535 /* Get a quad word from the stream at offset 'from'. */
536 uint64_t stream_getq_from(struct stream
*s
, size_t from
)
540 STREAM_VERIFY_SANE(s
);
542 if (!GETP_VALID(s
, from
+ sizeof(uint64_t))) {
543 STREAM_BOUND_WARN(s
, "get quad");
547 q
= ((uint64_t)s
->data
[from
++]) << 56;
548 q
|= ((uint64_t)s
->data
[from
++]) << 48;
549 q
|= ((uint64_t)s
->data
[from
++]) << 40;
550 q
|= ((uint64_t)s
->data
[from
++]) << 32;
551 q
|= ((uint64_t)s
->data
[from
++]) << 24;
552 q
|= ((uint64_t)s
->data
[from
++]) << 16;
553 q
|= ((uint64_t)s
->data
[from
++]) << 8;
554 q
|= ((uint64_t)s
->data
[from
++]);
559 uint64_t stream_getq(struct stream
*s
)
563 STREAM_VERIFY_SANE(s
);
565 if (STREAM_READABLE(s
) < sizeof(uint64_t)) {
566 STREAM_BOUND_WARN(s
, "get quad");
570 q
= ((uint64_t)s
->data
[s
->getp
++]) << 56;
571 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 48;
572 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 40;
573 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 32;
574 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 24;
575 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 16;
576 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 8;
577 q
|= ((uint64_t)s
->data
[s
->getp
++]);
582 bool stream_getq2(struct stream
*s
, uint64_t *q
)
584 STREAM_VERIFY_SANE(s
);
586 if (STREAM_READABLE(s
) < sizeof(uint64_t)) {
587 STREAM_BOUND_WARN2(s
, "get uint64");
591 *q
= ((uint64_t)s
->data
[s
->getp
++]) << 56;
592 *q
|= ((uint64_t)s
->data
[s
->getp
++]) << 48;
593 *q
|= ((uint64_t)s
->data
[s
->getp
++]) << 40;
594 *q
|= ((uint64_t)s
->data
[s
->getp
++]) << 32;
595 *q
|= ((uint64_t)s
->data
[s
->getp
++]) << 24;
596 *q
|= ((uint64_t)s
->data
[s
->getp
++]) << 16;
597 *q
|= ((uint64_t)s
->data
[s
->getp
++]) << 8;
598 *q
|= ((uint64_t)s
->data
[s
->getp
++]);
603 /* Get next IPv4 address (4 bytes, copied as-is) from the stream. */
604 uint32_t stream_get_ipv4(struct stream
*s
)
608 STREAM_VERIFY_SANE(s
);
610 if (STREAM_READABLE(s
) < sizeof(uint32_t)) {
611 STREAM_BOUND_WARN(s
, "get ipv4");
615 memcpy(&l
, s
->data
+ s
->getp
, sizeof(uint32_t));
616 s
->getp
+= sizeof(uint32_t);
621 bool stream_get_ipaddr(struct stream
*s
, struct ipaddr
*ip
)
623 uint16_t ipa_len
= 0;
625 STREAM_VERIFY_SANE(s
);
627 /* Get address type. */
628 if (STREAM_READABLE(s
) < sizeof(uint16_t)) {
629 STREAM_BOUND_WARN2(s
, "get ipaddr");
632 ip
->ipa_type
= stream_getw(s
);
634 /* Get address value. */
635 switch (ip
->ipa_type
) {
637 ipa_len
= IPV4_MAX_BYTELEN
;
640 ipa_len
= IPV6_MAX_BYTELEN
;
643 flog_err(EC_LIB_DEVELOPMENT
,
644 "%s: unknown ip address-family: %u", __func__
,
648 if (STREAM_READABLE(s
) < ipa_len
) {
649 STREAM_BOUND_WARN2(s
, "get ipaddr");
652 memcpy(&ip
->ip
, s
->data
+ s
->getp
, ipa_len
);
658 float stream_getf(struct stream
*s
)
664 u
.d
= stream_getl(s
);
668 double stream_getd(struct stream
*s
)
674 u
.d
= stream_getq(s
);
678 /* Copy from source to stream.
680 * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
681 * around. This should be fixed once the stream updates are working.
683 * stream_write() is saner
685 void stream_put(struct stream
*s
, const void *src
, size_t size
)
688 /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
691 STREAM_VERIFY_SANE(s
);
693 if (STREAM_WRITEABLE(s
) < size
) {
694 STREAM_BOUND_WARN(s
, "put");
699 memcpy(s
->data
+ s
->endp
, src
, size
);
701 memset(s
->data
+ s
->endp
, 0, size
);
706 /* Put character to the stream. */
707 int stream_putc(struct stream
*s
, uint8_t c
)
709 STREAM_VERIFY_SANE(s
);
711 if (STREAM_WRITEABLE(s
) < sizeof(uint8_t)) {
712 STREAM_BOUND_WARN(s
, "put");
716 s
->data
[s
->endp
++] = c
;
717 return sizeof(uint8_t);
720 /* Put word to the stream. */
721 int stream_putw(struct stream
*s
, uint16_t w
)
723 STREAM_VERIFY_SANE(s
);
725 if (STREAM_WRITEABLE(s
) < sizeof(uint16_t)) {
726 STREAM_BOUND_WARN(s
, "put");
730 s
->data
[s
->endp
++] = (uint8_t)(w
>> 8);
731 s
->data
[s
->endp
++] = (uint8_t)w
;
736 /* Put 3-byte value to the stream. */
737 int stream_put3(struct stream
*s
, uint32_t l
)
739 STREAM_VERIFY_SANE(s
);
741 if (STREAM_WRITEABLE(s
) < 3) {
742 STREAM_BOUND_WARN(s
, "put");
746 s
->data
[s
->endp
++] = (uint8_t)(l
>> 16);
747 s
->data
[s
->endp
++] = (uint8_t)(l
>> 8);
748 s
->data
[s
->endp
++] = (uint8_t)l
;
753 /* Put long word to the stream. */
754 int stream_putl(struct stream
*s
, uint32_t l
)
756 STREAM_VERIFY_SANE(s
);
758 if (STREAM_WRITEABLE(s
) < sizeof(uint32_t)) {
759 STREAM_BOUND_WARN(s
, "put");
763 s
->data
[s
->endp
++] = (uint8_t)(l
>> 24);
764 s
->data
[s
->endp
++] = (uint8_t)(l
>> 16);
765 s
->data
[s
->endp
++] = (uint8_t)(l
>> 8);
766 s
->data
[s
->endp
++] = (uint8_t)l
;
771 /* Put quad word to the stream. */
772 int stream_putq(struct stream
*s
, uint64_t q
)
774 STREAM_VERIFY_SANE(s
);
776 if (STREAM_WRITEABLE(s
) < sizeof(uint64_t)) {
777 STREAM_BOUND_WARN(s
, "put quad");
781 s
->data
[s
->endp
++] = (uint8_t)(q
>> 56);
782 s
->data
[s
->endp
++] = (uint8_t)(q
>> 48);
783 s
->data
[s
->endp
++] = (uint8_t)(q
>> 40);
784 s
->data
[s
->endp
++] = (uint8_t)(q
>> 32);
785 s
->data
[s
->endp
++] = (uint8_t)(q
>> 24);
786 s
->data
[s
->endp
++] = (uint8_t)(q
>> 16);
787 s
->data
[s
->endp
++] = (uint8_t)(q
>> 8);
788 s
->data
[s
->endp
++] = (uint8_t)q
;
793 int stream_putf(struct stream
*s
, float f
)
800 return stream_putl(s
, u
.o
);
803 int stream_putd(struct stream
*s
, double d
)
810 return stream_putq(s
, u
.o
);
813 int stream_putc_at(struct stream
*s
, size_t putp
, uint8_t c
)
815 STREAM_VERIFY_SANE(s
);
817 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint8_t))) {
818 STREAM_BOUND_WARN(s
, "put");
827 int stream_putw_at(struct stream
*s
, size_t putp
, uint16_t w
)
829 STREAM_VERIFY_SANE(s
);
831 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint16_t))) {
832 STREAM_BOUND_WARN(s
, "put");
836 s
->data
[putp
] = (uint8_t)(w
>> 8);
837 s
->data
[putp
+ 1] = (uint8_t)w
;
842 int stream_put3_at(struct stream
*s
, size_t putp
, uint32_t l
)
844 STREAM_VERIFY_SANE(s
);
846 if (!PUT_AT_VALID(s
, putp
+ 3)) {
847 STREAM_BOUND_WARN(s
, "put");
850 s
->data
[putp
] = (uint8_t)(l
>> 16);
851 s
->data
[putp
+ 1] = (uint8_t)(l
>> 8);
852 s
->data
[putp
+ 2] = (uint8_t)l
;
857 int stream_putl_at(struct stream
*s
, size_t putp
, uint32_t l
)
859 STREAM_VERIFY_SANE(s
);
861 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint32_t))) {
862 STREAM_BOUND_WARN(s
, "put");
865 s
->data
[putp
] = (uint8_t)(l
>> 24);
866 s
->data
[putp
+ 1] = (uint8_t)(l
>> 16);
867 s
->data
[putp
+ 2] = (uint8_t)(l
>> 8);
868 s
->data
[putp
+ 3] = (uint8_t)l
;
873 int stream_putq_at(struct stream
*s
, size_t putp
, uint64_t q
)
875 STREAM_VERIFY_SANE(s
);
877 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint64_t))) {
878 STREAM_BOUND_WARN(s
, "put");
881 s
->data
[putp
] = (uint8_t)(q
>> 56);
882 s
->data
[putp
+ 1] = (uint8_t)(q
>> 48);
883 s
->data
[putp
+ 2] = (uint8_t)(q
>> 40);
884 s
->data
[putp
+ 3] = (uint8_t)(q
>> 32);
885 s
->data
[putp
+ 4] = (uint8_t)(q
>> 24);
886 s
->data
[putp
+ 5] = (uint8_t)(q
>> 16);
887 s
->data
[putp
+ 6] = (uint8_t)(q
>> 8);
888 s
->data
[putp
+ 7] = (uint8_t)q
;
893 /* Put IPv4 address (4 bytes, copied as-is) to the stream. */
894 int stream_put_ipv4(struct stream
*s
, uint32_t l
)
896 STREAM_VERIFY_SANE(s
);
898 if (STREAM_WRITEABLE(s
) < sizeof(uint32_t)) {
899 STREAM_BOUND_WARN(s
, "put");
902 memcpy(s
->data
+ s
->endp
, &l
, sizeof(uint32_t));
903 s
->endp
+= sizeof(uint32_t);
905 return sizeof(uint32_t);
908 /* Put in_addr (4 bytes, copied as-is) to the stream. */
909 int stream_put_in_addr(struct stream
*s
, const struct in_addr
*addr
)
911 STREAM_VERIFY_SANE(s
);
913 if (STREAM_WRITEABLE(s
) < sizeof(uint32_t)) {
914 STREAM_BOUND_WARN(s
, "put");
918 memcpy(s
->data
+ s
->endp
, addr
, sizeof(uint32_t));
919 s
->endp
+= sizeof(uint32_t);
921 return sizeof(uint32_t);
924 bool stream_put_ipaddr(struct stream
*s
, struct ipaddr
*ip
)
926 stream_putw(s
, ip
->ipa_type
);
928 switch (ip
->ipa_type
) {
930 stream_put_in_addr(s
, &ip
->ipaddr_v4
);
933 stream_write(s
, (uint8_t *)&ip
->ipaddr_v6
, 16);
936 flog_err(EC_LIB_DEVELOPMENT
,
937 "%s: unknown ip address-family: %u", __func__
,
945 /* Put in_addr at location in the stream. */
946 int stream_put_in_addr_at(struct stream
*s
, size_t putp
,
947 const struct in_addr
*addr
)
949 STREAM_VERIFY_SANE(s
);
951 if (!PUT_AT_VALID(s
, putp
+ 4)) {
952 STREAM_BOUND_WARN(s
, "put");
956 memcpy(&s
->data
[putp
], addr
, 4);
960 /* Put in6_addr at location in the stream. */
961 int stream_put_in6_addr_at(struct stream
*s
, size_t putp
,
962 const struct in6_addr
*addr
)
964 STREAM_VERIFY_SANE(s
);
966 if (!PUT_AT_VALID(s
, putp
+ 16)) {
967 STREAM_BOUND_WARN(s
, "put");
971 memcpy(&s
->data
[putp
], addr
, 16);
975 /* Put prefix by nlri type format. */
976 int stream_put_prefix_addpath(struct stream
*s
, const struct prefix
*p
,
977 bool addpath_capable
, uint32_t addpath_tx_id
)
980 size_t psize_with_addpath
;
982 STREAM_VERIFY_SANE(s
);
984 psize
= PSIZE(p
->prefixlen
);
987 psize_with_addpath
= psize
+ 4;
989 psize_with_addpath
= psize
;
991 if (STREAM_WRITEABLE(s
) < (psize_with_addpath
+ sizeof(uint8_t))) {
992 STREAM_BOUND_WARN(s
, "put");
996 if (addpath_capable
) {
997 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 24);
998 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 16);
999 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 8);
1000 s
->data
[s
->endp
++] = (uint8_t)addpath_tx_id
;
1003 s
->data
[s
->endp
++] = p
->prefixlen
;
1004 memcpy(s
->data
+ s
->endp
, &p
->u
.prefix
, psize
);
1010 int stream_put_prefix(struct stream
*s
, const struct prefix
*p
)
1012 return stream_put_prefix_addpath(s
, p
, 0, 0);
1015 /* Put NLRI with label */
1016 int stream_put_labeled_prefix(struct stream
*s
, const struct prefix
*p
,
1017 mpls_label_t
*label
, bool addpath_capable
,
1018 uint32_t addpath_tx_id
)
1021 size_t psize_with_addpath
;
1022 uint8_t *label_pnt
= (uint8_t *)label
;
1024 STREAM_VERIFY_SANE(s
);
1026 psize
= PSIZE(p
->prefixlen
);
1027 psize_with_addpath
= psize
+ (addpath_capable
? 4 : 0);
1029 if (STREAM_WRITEABLE(s
) < (psize_with_addpath
+ 3)) {
1030 STREAM_BOUND_WARN(s
, "put");
1034 if (addpath_capable
) {
1035 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 24);
1036 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 16);
1037 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 8);
1038 s
->data
[s
->endp
++] = (uint8_t)addpath_tx_id
;
1041 stream_putc(s
, (p
->prefixlen
+ 24));
1042 stream_putc(s
, label_pnt
[0]);
1043 stream_putc(s
, label_pnt
[1]);
1044 stream_putc(s
, label_pnt
[2]);
1045 memcpy(s
->data
+ s
->endp
, &p
->u
.prefix
, psize
);
1051 /* Read size from fd. */
1052 int stream_read(struct stream
*s
, int fd
, size_t size
)
1056 STREAM_VERIFY_SANE(s
);
1058 if (STREAM_WRITEABLE(s
) < size
) {
1059 STREAM_BOUND_WARN(s
, "put");
1063 nbytes
= readn(fd
, s
->data
+ s
->endp
, size
);
1071 ssize_t
stream_read_try(struct stream
*s
, int fd
, size_t size
)
1075 STREAM_VERIFY_SANE(s
);
1077 if (STREAM_WRITEABLE(s
) < size
) {
1078 STREAM_BOUND_WARN(s
, "put");
1079 /* Fatal (not transient) error, since retrying will not help
1080 (stream is too small to contain the desired data). */
1084 nbytes
= read(fd
, s
->data
+ s
->endp
, size
);
1089 /* Error: was it transient (return -2) or fatal (return -1)? */
1090 if (ERRNO_IO_RETRY(errno
))
1092 flog_err(EC_LIB_SOCKET
, "%s: read failed on fd %d: %s", __func__
, fd
,
1093 safe_strerror(errno
));
1097 /* Read up to size bytes into the stream from the fd, using recvfrom
1098 * whose arguments match the remaining arguments to this function
1100 ssize_t
stream_recvfrom(struct stream
*s
, int fd
, size_t size
, int flags
,
1101 struct sockaddr
*from
, socklen_t
*fromlen
)
1105 STREAM_VERIFY_SANE(s
);
1107 if (STREAM_WRITEABLE(s
) < size
) {
1108 STREAM_BOUND_WARN(s
, "put");
1109 /* Fatal (not transient) error, since retrying will not help
1110 (stream is too small to contain the desired data). */
1114 nbytes
= recvfrom(fd
, s
->data
+ s
->endp
, size
, flags
, from
, fromlen
);
1119 /* Error: was it transient (return -2) or fatal (return -1)? */
1120 if (ERRNO_IO_RETRY(errno
))
1122 flog_err(EC_LIB_SOCKET
, "%s: read failed on fd %d: %s", __func__
, fd
,
1123 safe_strerror(errno
));
1127 /* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
1129 * First iovec will be used to receive the data.
1130 * Stream need not be empty.
1132 ssize_t
stream_recvmsg(struct stream
*s
, int fd
, struct msghdr
*msgh
, int flags
,
1138 STREAM_VERIFY_SANE(s
);
1139 assert(msgh
->msg_iovlen
> 0);
1141 if (STREAM_WRITEABLE(s
) < size
) {
1142 STREAM_BOUND_WARN(s
, "put");
1143 /* This is a logic error in the calling code: the stream is too
1145 to hold the desired data! */
1149 iov
= &(msgh
->msg_iov
[0]);
1150 iov
->iov_base
= (s
->data
+ s
->endp
);
1151 iov
->iov_len
= size
;
1153 nbytes
= recvmsg(fd
, msgh
, flags
);
1161 /* Write data to buffer. */
1162 size_t stream_write(struct stream
*s
, const void *ptr
, size_t size
)
1165 CHECK_SIZE(s
, size
);
1167 STREAM_VERIFY_SANE(s
);
1169 if (STREAM_WRITEABLE(s
) < size
) {
1170 STREAM_BOUND_WARN(s
, "put");
1174 memcpy(s
->data
+ s
->endp
, ptr
, size
);
1180 /* Return current read pointer.
1182 * Use stream_get_pnt_to if you must, but decoding streams properly
1185 uint8_t *stream_pnt(struct stream
*s
)
1187 STREAM_VERIFY_SANE(s
);
1188 return s
->data
+ s
->getp
;
1191 /* Check whether this stream is empty. */
1192 int stream_empty(struct stream
*s
)
1194 STREAM_VERIFY_SANE(s
);
1196 return (s
->endp
== 0);
1200 void stream_reset(struct stream
*s
)
1202 STREAM_VERIFY_SANE(s
);
1204 s
->getp
= s
->endp
= 0;
1207 /* Write stream contents to the file descriptor. */
1208 int stream_flush(struct stream
*s
, int fd
)
1212 STREAM_VERIFY_SANE(s
);
1214 nbytes
= write(fd
, s
->data
+ s
->getp
, s
->endp
- s
->getp
);
1219 void stream_hexdump(const struct stream
*s
)
1221 zlog_hexdump(s
->data
, s
->endp
);
1224 /* Stream first in first out queue. */
1226 struct stream_fifo
*stream_fifo_new(void)
1228 struct stream_fifo
*new;
1230 new = XMALLOC(MTYPE_STREAM_FIFO
, sizeof(struct stream_fifo
));
1231 stream_fifo_init(new);
1235 void stream_fifo_init(struct stream_fifo
*fifo
)
1237 memset(fifo
, 0, sizeof(struct stream_fifo
));
1238 pthread_mutex_init(&fifo
->mtx
, NULL
);
1241 /* Add new stream to fifo. */
1242 void stream_fifo_push(struct stream_fifo
*fifo
, struct stream
*s
)
1244 #if defined DEV_BUILD
1249 fifo
->tail
->next
= s
;
1254 fifo
->tail
->next
= NULL
;
1255 #if !defined DEV_BUILD
1256 atomic_fetch_add_explicit(&fifo
->count
, 1, memory_order_release
);
1258 max
= atomic_fetch_add_explicit(&fifo
->count
, 1, memory_order_release
);
1259 curmax
= atomic_load_explicit(&fifo
->max_count
, memory_order_relaxed
);
1261 atomic_store_explicit(&fifo
->max_count
, max
,
1262 memory_order_relaxed
);
1266 void stream_fifo_push_safe(struct stream_fifo
*fifo
, struct stream
*s
)
1268 frr_with_mutex (&fifo
->mtx
) {
1269 stream_fifo_push(fifo
, s
);
1273 /* Delete first stream from fifo. */
1274 struct stream
*stream_fifo_pop(struct stream_fifo
*fifo
)
1281 fifo
->head
= s
->next
;
1283 if (fifo
->head
== NULL
)
1286 atomic_fetch_sub_explicit(&fifo
->count
, 1,
1287 memory_order_release
);
1289 /* ensure stream is scrubbed of references to this fifo */
1296 struct stream
*stream_fifo_pop_safe(struct stream_fifo
*fifo
)
1300 frr_with_mutex (&fifo
->mtx
) {
1301 ret
= stream_fifo_pop(fifo
);
1307 struct stream
*stream_fifo_head(struct stream_fifo
*fifo
)
1312 struct stream
*stream_fifo_head_safe(struct stream_fifo
*fifo
)
1316 frr_with_mutex (&fifo
->mtx
) {
1317 ret
= stream_fifo_head(fifo
);
1323 void stream_fifo_clean(struct stream_fifo
*fifo
)
1326 struct stream
*next
;
1328 for (s
= fifo
->head
; s
; s
= next
) {
1332 fifo
->head
= fifo
->tail
= NULL
;
1333 atomic_store_explicit(&fifo
->count
, 0, memory_order_release
);
1336 void stream_fifo_clean_safe(struct stream_fifo
*fifo
)
1338 frr_with_mutex (&fifo
->mtx
) {
1339 stream_fifo_clean(fifo
);
1343 size_t stream_fifo_count_safe(struct stream_fifo
*fifo
)
1345 return atomic_load_explicit(&fifo
->count
, memory_order_acquire
);
1348 void stream_fifo_deinit(struct stream_fifo
*fifo
)
1350 stream_fifo_clean(fifo
);
1351 pthread_mutex_destroy(&fifo
->mtx
);
1354 void stream_fifo_free(struct stream_fifo
*fifo
)
1356 stream_fifo_deinit(fifo
);
1357 XFREE(MTYPE_STREAM_FIFO
, fifo
);
1360 void stream_pulldown(struct stream
*s
)
1362 size_t rlen
= STREAM_READABLE(s
);
1364 /* No more data, so just move the pointers. */
1370 /* Move the available data to the beginning. */
1371 memmove(s
->data
, &s
->data
[s
->getp
], rlen
);