]>
git.proxmox.com Git - mirror_frr.git/blob - lib/stream.c
3 * Copyright (C) 1999 Kunihiro Ishiguro
5 * This file is part of GNU Zebra.
7 * GNU Zebra is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2, or (at your option) any
12 * GNU Zebra is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
31 DEFINE_MTYPE_STATIC(LIB
, STREAM
, "Stream")
32 DEFINE_MTYPE_STATIC(LIB
, STREAM_DATA
, "Stream data")
33 DEFINE_MTYPE_STATIC(LIB
, STREAM_FIFO
, "Stream FIFO")
35 /* Tests whether a position is valid */
36 #define GETP_VALID(S, G) ((G) <= (S)->endp)
37 #define PUT_AT_VALID(S,G) GETP_VALID(S,G)
38 #define ENDP_VALID(S, E) ((E) <= (S)->size)
40 /* asserting sanity checks. Following must be true before
41 * stream functions are called:
43 * Following must always be true of stream elements
44 * before and after calls to stream functions:
46 * getp <= endp <= size
48 * Note that after a stream function is called following may be true:
49 * if (getp == endp) then stream is no longer readable
50 * if (endp == size) then stream is no longer writeable
52 * It is valid to put to anywhere within the size of the stream, but only
53 * using stream_put..._at() functions.
55 #define STREAM_WARN_OFFSETS(S) \
56 zlog_warn("&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n", \
57 (void *)(S), (unsigned long)(S)->size, \
58 (unsigned long)(S)->getp, (unsigned long)(S)->endp)
60 #define STREAM_VERIFY_SANE(S) \
62 if (!(GETP_VALID(S, (S)->getp) && ENDP_VALID(S, (S)->endp))) \
63 STREAM_WARN_OFFSETS(S); \
64 assert(GETP_VALID(S, (S)->getp)); \
65 assert(ENDP_VALID(S, (S)->endp)); \
68 #define STREAM_BOUND_WARN(S, WHAT) \
70 zlog_warn("%s: Attempt to %s out of bounds", __func__, \
72 STREAM_WARN_OFFSETS(S); \
76 #define STREAM_BOUND_WARN2(S, WHAT) \
78 zlog_warn("%s: Attempt to %s out of bounds", __func__, \
80 STREAM_WARN_OFFSETS(S); \
83 /* XXX: Deprecated macro: do not use */
84 #define CHECK_SIZE(S, Z) \
86 if (((S)->endp + (Z)) > (S)->size) { \
88 "CHECK_SIZE: truncating requested size %lu\n", \
89 (unsigned long)(Z)); \
90 STREAM_WARN_OFFSETS(S); \
91 (Z) = (S)->size - (S)->endp; \
95 /* Make stream buffer. */
96 struct stream
*stream_new(size_t size
)
102 s
= XCALLOC(MTYPE_STREAM
, sizeof(struct stream
));
107 if ((s
->data
= XMALLOC(MTYPE_STREAM_DATA
, size
)) == NULL
) {
108 XFREE(MTYPE_STREAM
, s
);
117 void stream_free(struct stream
*s
)
122 XFREE(MTYPE_STREAM_DATA
, s
->data
);
123 XFREE(MTYPE_STREAM
, s
);
126 struct stream
*stream_copy(struct stream
*new, struct stream
*src
)
128 STREAM_VERIFY_SANE(src
);
131 assert(STREAM_SIZE(new) >= src
->endp
);
133 new->endp
= src
->endp
;
134 new->getp
= src
->getp
;
136 memcpy(new->data
, src
->data
, src
->endp
);
141 struct stream
*stream_dup(struct stream
*s
)
145 STREAM_VERIFY_SANE(s
);
147 if ((new = stream_new(s
->endp
)) == NULL
)
150 return (stream_copy(new, s
));
153 struct stream
*stream_dupcat(struct stream
*s1
, struct stream
*s2
,
158 STREAM_VERIFY_SANE(s1
);
159 STREAM_VERIFY_SANE(s2
);
161 if ((new = stream_new(s1
->endp
+ s2
->endp
)) == NULL
)
164 memcpy(new->data
, s1
->data
, offset
);
165 memcpy(new->data
+ offset
, s2
->data
, s2
->endp
);
166 memcpy(new->data
+ offset
+ s2
->endp
, s1
->data
+ offset
,
167 (s1
->endp
- offset
));
168 new->endp
= s1
->endp
+ s2
->endp
;
172 size_t stream_resize(struct stream
*s
, size_t newsize
)
175 STREAM_VERIFY_SANE(s
);
177 newdata
= XREALLOC(MTYPE_STREAM_DATA
, s
->data
, newsize
);
185 if (s
->endp
> s
->size
)
187 if (s
->getp
> s
->endp
)
190 STREAM_VERIFY_SANE(s
);
195 size_t stream_get_getp(struct stream
*s
)
197 STREAM_VERIFY_SANE(s
);
201 size_t stream_get_endp(struct stream
*s
)
203 STREAM_VERIFY_SANE(s
);
207 size_t stream_get_size(struct stream
*s
)
209 STREAM_VERIFY_SANE(s
);
213 /* Stream structre' stream pointer related functions. */
214 void stream_set_getp(struct stream
*s
, size_t pos
)
216 STREAM_VERIFY_SANE(s
);
218 if (!GETP_VALID(s
, pos
)) {
219 STREAM_BOUND_WARN(s
, "set getp");
226 void stream_set_endp(struct stream
*s
, size_t pos
)
228 STREAM_VERIFY_SANE(s
);
230 if (!ENDP_VALID(s
, pos
)) {
231 STREAM_BOUND_WARN(s
, "set endp");
236 * Make sure the current read pointer is not beyond the new endp.
239 STREAM_BOUND_WARN(s
, "set endp");
244 STREAM_VERIFY_SANE(s
);
247 /* Forward pointer. */
248 void stream_forward_getp(struct stream
*s
, size_t size
)
250 STREAM_VERIFY_SANE(s
);
252 if (!GETP_VALID(s
, s
->getp
+ size
)) {
253 STREAM_BOUND_WARN(s
, "seek getp");
260 void stream_forward_endp(struct stream
*s
, size_t size
)
262 STREAM_VERIFY_SANE(s
);
264 if (!ENDP_VALID(s
, s
->endp
+ size
)) {
265 STREAM_BOUND_WARN(s
, "seek endp");
272 /* Copy from stream to destination. */
273 inline bool stream_get2(void *dst
, struct stream
*s
, size_t size
)
275 STREAM_VERIFY_SANE(s
);
277 if (STREAM_READABLE(s
) < size
) {
278 STREAM_BOUND_WARN2(s
, "get");
282 memcpy(dst
, s
->data
+ s
->getp
, size
);
288 void stream_get(void *dst
, struct stream
*s
, size_t size
)
290 STREAM_VERIFY_SANE(s
);
292 if (STREAM_READABLE(s
) < size
) {
293 STREAM_BOUND_WARN(s
, "get");
297 memcpy(dst
, s
->data
+ s
->getp
, size
);
301 /* Get next character from the stream. */
302 inline bool stream_getc2(struct stream
*s
, u_char
*byte
)
304 STREAM_VERIFY_SANE(s
);
306 if (STREAM_READABLE(s
) < sizeof(u_char
)) {
307 STREAM_BOUND_WARN2(s
, "get char");
310 *byte
= s
->data
[s
->getp
++];
315 u_char
stream_getc(struct stream
*s
)
319 STREAM_VERIFY_SANE(s
);
321 if (STREAM_READABLE(s
) < sizeof(u_char
)) {
322 STREAM_BOUND_WARN(s
, "get char");
325 c
= s
->data
[s
->getp
++];
330 /* Get next character from the stream. */
331 u_char
stream_getc_from(struct stream
*s
, size_t from
)
335 STREAM_VERIFY_SANE(s
);
337 if (!GETP_VALID(s
, from
+ sizeof(u_char
))) {
338 STREAM_BOUND_WARN(s
, "get char");
347 inline bool stream_getw2(struct stream
*s
, uint16_t *word
)
349 STREAM_VERIFY_SANE(s
);
351 if (STREAM_READABLE(s
) < sizeof(uint16_t)) {
352 STREAM_BOUND_WARN2(s
, "get ");
356 *word
= s
->data
[s
->getp
++] << 8;
357 *word
|= s
->data
[s
->getp
++];
362 /* Get next word from the stream. */
363 u_int16_t
stream_getw(struct stream
*s
)
367 STREAM_VERIFY_SANE(s
);
369 if (STREAM_READABLE(s
) < sizeof(u_int16_t
)) {
370 STREAM_BOUND_WARN(s
, "get ");
374 w
= s
->data
[s
->getp
++] << 8;
375 w
|= s
->data
[s
->getp
++];
380 /* Get next word from the stream. */
381 u_int16_t
stream_getw_from(struct stream
*s
, size_t from
)
385 STREAM_VERIFY_SANE(s
);
387 if (!GETP_VALID(s
, from
+ sizeof(u_int16_t
))) {
388 STREAM_BOUND_WARN(s
, "get ");
392 w
= s
->data
[from
++] << 8;
398 /* Get next 3-byte from the stream. */
399 u_int32_t
stream_get3_from(struct stream
*s
, size_t from
)
403 STREAM_VERIFY_SANE(s
);
405 if (!GETP_VALID(s
, from
+ 3)) {
406 STREAM_BOUND_WARN(s
, "get 3byte");
410 l
= s
->data
[from
++] << 16;
411 l
|= s
->data
[from
++] << 8;
417 u_int32_t
stream_get3(struct stream
*s
)
421 STREAM_VERIFY_SANE(s
);
423 if (STREAM_READABLE(s
) < 3) {
424 STREAM_BOUND_WARN(s
, "get 3byte");
428 l
= s
->data
[s
->getp
++] << 16;
429 l
|= s
->data
[s
->getp
++] << 8;
430 l
|= s
->data
[s
->getp
++];
435 /* Get next long word from the stream. */
436 u_int32_t
stream_getl_from(struct stream
*s
, size_t from
)
440 STREAM_VERIFY_SANE(s
);
442 if (!GETP_VALID(s
, from
+ sizeof(u_int32_t
))) {
443 STREAM_BOUND_WARN(s
, "get long");
447 l
= (unsigned)(s
->data
[from
++]) << 24;
448 l
|= s
->data
[from
++] << 16;
449 l
|= s
->data
[from
++] << 8;
455 /* Copy from stream at specific location to destination. */
456 void stream_get_from(void *dst
, struct stream
*s
, size_t from
, size_t size
)
458 STREAM_VERIFY_SANE(s
);
460 if (!GETP_VALID(s
, from
+ size
)) {
461 STREAM_BOUND_WARN(s
, "get from");
465 memcpy(dst
, s
->data
+ from
, size
);
468 inline bool stream_getl2(struct stream
*s
, uint32_t *l
)
470 STREAM_VERIFY_SANE(s
);
472 if (STREAM_READABLE(s
) < sizeof(uint32_t)) {
473 STREAM_BOUND_WARN2(s
, "get long");
477 *l
= (unsigned int)(s
->data
[s
->getp
++]) << 24;
478 *l
|= s
->data
[s
->getp
++] << 16;
479 *l
|= s
->data
[s
->getp
++] << 8;
480 *l
|= s
->data
[s
->getp
++];
485 u_int32_t
stream_getl(struct stream
*s
)
489 STREAM_VERIFY_SANE(s
);
491 if (STREAM_READABLE(s
) < sizeof(u_int32_t
)) {
492 STREAM_BOUND_WARN(s
, "get long");
496 l
= (unsigned)(s
->data
[s
->getp
++]) << 24;
497 l
|= s
->data
[s
->getp
++] << 16;
498 l
|= s
->data
[s
->getp
++] << 8;
499 l
|= s
->data
[s
->getp
++];
504 /* Get next quad word from the stream. */
505 uint64_t stream_getq_from(struct stream
*s
, size_t from
)
509 STREAM_VERIFY_SANE(s
);
511 if (!GETP_VALID(s
, from
+ sizeof(uint64_t))) {
512 STREAM_BOUND_WARN(s
, "get quad");
516 q
= ((uint64_t)s
->data
[from
++]) << 56;
517 q
|= ((uint64_t)s
->data
[from
++]) << 48;
518 q
|= ((uint64_t)s
->data
[from
++]) << 40;
519 q
|= ((uint64_t)s
->data
[from
++]) << 32;
520 q
|= ((uint64_t)s
->data
[from
++]) << 24;
521 q
|= ((uint64_t)s
->data
[from
++]) << 16;
522 q
|= ((uint64_t)s
->data
[from
++]) << 8;
523 q
|= ((uint64_t)s
->data
[from
++]);
528 uint64_t stream_getq(struct stream
*s
)
532 STREAM_VERIFY_SANE(s
);
534 if (STREAM_READABLE(s
) < sizeof(uint64_t)) {
535 STREAM_BOUND_WARN(s
, "get quad");
539 q
= ((uint64_t)s
->data
[s
->getp
++]) << 56;
540 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 48;
541 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 40;
542 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 32;
543 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 24;
544 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 16;
545 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 8;
546 q
|= ((uint64_t)s
->data
[s
->getp
++]);
551 /* Get next long word from the stream. */
552 u_int32_t
stream_get_ipv4(struct stream
*s
)
556 STREAM_VERIFY_SANE(s
);
558 if (STREAM_READABLE(s
) < sizeof(u_int32_t
)) {
559 STREAM_BOUND_WARN(s
, "get ipv4");
563 memcpy(&l
, s
->data
+ s
->getp
, sizeof(u_int32_t
));
564 s
->getp
+= sizeof(u_int32_t
);
569 float stream_getf(struct stream
*s
)
575 u
.d
= stream_getl(s
);
579 double stream_getd(struct stream
*s
)
585 u
.d
= stream_getq(s
);
589 /* Copy to source to stream.
591 * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
592 * around. This should be fixed once the stream updates are working.
594 * stream_write() is saner
596 void stream_put(struct stream
*s
, const void *src
, size_t size
)
599 /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
602 STREAM_VERIFY_SANE(s
);
604 if (STREAM_WRITEABLE(s
) < size
) {
605 STREAM_BOUND_WARN(s
, "put");
610 memcpy(s
->data
+ s
->endp
, src
, size
);
612 memset(s
->data
+ s
->endp
, 0, size
);
617 /* Put character to the stream. */
618 int stream_putc(struct stream
*s
, u_char c
)
620 STREAM_VERIFY_SANE(s
);
622 if (STREAM_WRITEABLE(s
) < sizeof(u_char
)) {
623 STREAM_BOUND_WARN(s
, "put");
627 s
->data
[s
->endp
++] = c
;
628 return sizeof(u_char
);
631 /* Put word to the stream. */
632 int stream_putw(struct stream
*s
, u_int16_t w
)
634 STREAM_VERIFY_SANE(s
);
636 if (STREAM_WRITEABLE(s
) < sizeof(u_int16_t
)) {
637 STREAM_BOUND_WARN(s
, "put");
641 s
->data
[s
->endp
++] = (u_char
)(w
>> 8);
642 s
->data
[s
->endp
++] = (u_char
)w
;
647 /* Put long word to the stream. */
648 int stream_put3(struct stream
*s
, u_int32_t l
)
650 STREAM_VERIFY_SANE(s
);
652 if (STREAM_WRITEABLE(s
) < 3) {
653 STREAM_BOUND_WARN(s
, "put");
657 s
->data
[s
->endp
++] = (u_char
)(l
>> 16);
658 s
->data
[s
->endp
++] = (u_char
)(l
>> 8);
659 s
->data
[s
->endp
++] = (u_char
)l
;
664 /* Put long word to the stream. */
665 int stream_putl(struct stream
*s
, u_int32_t l
)
667 STREAM_VERIFY_SANE(s
);
669 if (STREAM_WRITEABLE(s
) < sizeof(u_int32_t
)) {
670 STREAM_BOUND_WARN(s
, "put");
674 s
->data
[s
->endp
++] = (u_char
)(l
>> 24);
675 s
->data
[s
->endp
++] = (u_char
)(l
>> 16);
676 s
->data
[s
->endp
++] = (u_char
)(l
>> 8);
677 s
->data
[s
->endp
++] = (u_char
)l
;
682 /* Put quad word to the stream. */
683 int stream_putq(struct stream
*s
, uint64_t q
)
685 STREAM_VERIFY_SANE(s
);
687 if (STREAM_WRITEABLE(s
) < sizeof(uint64_t)) {
688 STREAM_BOUND_WARN(s
, "put quad");
692 s
->data
[s
->endp
++] = (u_char
)(q
>> 56);
693 s
->data
[s
->endp
++] = (u_char
)(q
>> 48);
694 s
->data
[s
->endp
++] = (u_char
)(q
>> 40);
695 s
->data
[s
->endp
++] = (u_char
)(q
>> 32);
696 s
->data
[s
->endp
++] = (u_char
)(q
>> 24);
697 s
->data
[s
->endp
++] = (u_char
)(q
>> 16);
698 s
->data
[s
->endp
++] = (u_char
)(q
>> 8);
699 s
->data
[s
->endp
++] = (u_char
)q
;
704 int stream_putf(struct stream
*s
, float f
)
711 return stream_putl(s
, u
.o
);
714 int stream_putd(struct stream
*s
, double d
)
721 return stream_putq(s
, u
.o
);
724 int stream_putc_at(struct stream
*s
, size_t putp
, u_char c
)
726 STREAM_VERIFY_SANE(s
);
728 if (!PUT_AT_VALID(s
, putp
+ sizeof(u_char
))) {
729 STREAM_BOUND_WARN(s
, "put");
738 int stream_putw_at(struct stream
*s
, size_t putp
, u_int16_t w
)
740 STREAM_VERIFY_SANE(s
);
742 if (!PUT_AT_VALID(s
, putp
+ sizeof(u_int16_t
))) {
743 STREAM_BOUND_WARN(s
, "put");
747 s
->data
[putp
] = (u_char
)(w
>> 8);
748 s
->data
[putp
+ 1] = (u_char
)w
;
753 int stream_put3_at(struct stream
*s
, size_t putp
, u_int32_t l
)
755 STREAM_VERIFY_SANE(s
);
757 if (!PUT_AT_VALID(s
, putp
+ 3)) {
758 STREAM_BOUND_WARN(s
, "put");
761 s
->data
[putp
] = (u_char
)(l
>> 16);
762 s
->data
[putp
+ 1] = (u_char
)(l
>> 8);
763 s
->data
[putp
+ 2] = (u_char
)l
;
768 int stream_putl_at(struct stream
*s
, size_t putp
, u_int32_t l
)
770 STREAM_VERIFY_SANE(s
);
772 if (!PUT_AT_VALID(s
, putp
+ sizeof(u_int32_t
))) {
773 STREAM_BOUND_WARN(s
, "put");
776 s
->data
[putp
] = (u_char
)(l
>> 24);
777 s
->data
[putp
+ 1] = (u_char
)(l
>> 16);
778 s
->data
[putp
+ 2] = (u_char
)(l
>> 8);
779 s
->data
[putp
+ 3] = (u_char
)l
;
784 int stream_putq_at(struct stream
*s
, size_t putp
, uint64_t q
)
786 STREAM_VERIFY_SANE(s
);
788 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint64_t))) {
789 STREAM_BOUND_WARN(s
, "put");
792 s
->data
[putp
] = (u_char
)(q
>> 56);
793 s
->data
[putp
+ 1] = (u_char
)(q
>> 48);
794 s
->data
[putp
+ 2] = (u_char
)(q
>> 40);
795 s
->data
[putp
+ 3] = (u_char
)(q
>> 32);
796 s
->data
[putp
+ 4] = (u_char
)(q
>> 24);
797 s
->data
[putp
+ 5] = (u_char
)(q
>> 16);
798 s
->data
[putp
+ 6] = (u_char
)(q
>> 8);
799 s
->data
[putp
+ 7] = (u_char
)q
;
804 /* Put long word to the stream. */
805 int stream_put_ipv4(struct stream
*s
, u_int32_t l
)
807 STREAM_VERIFY_SANE(s
);
809 if (STREAM_WRITEABLE(s
) < sizeof(u_int32_t
)) {
810 STREAM_BOUND_WARN(s
, "put");
813 memcpy(s
->data
+ s
->endp
, &l
, sizeof(u_int32_t
));
814 s
->endp
+= sizeof(u_int32_t
);
816 return sizeof(u_int32_t
);
819 /* Put long word to the stream. */
820 int stream_put_in_addr(struct stream
*s
, struct in_addr
*addr
)
822 STREAM_VERIFY_SANE(s
);
824 if (STREAM_WRITEABLE(s
) < sizeof(u_int32_t
)) {
825 STREAM_BOUND_WARN(s
, "put");
829 memcpy(s
->data
+ s
->endp
, addr
, sizeof(u_int32_t
));
830 s
->endp
+= sizeof(u_int32_t
);
832 return sizeof(u_int32_t
);
835 /* Put in_addr at location in the stream. */
836 int stream_put_in_addr_at(struct stream
*s
, size_t putp
, struct in_addr
*addr
)
838 STREAM_VERIFY_SANE(s
);
840 if (!PUT_AT_VALID(s
, putp
+ 4)) {
841 STREAM_BOUND_WARN(s
, "put");
845 memcpy(&s
->data
[putp
], addr
, 4);
849 /* Put in6_addr at location in the stream. */
850 int stream_put_in6_addr_at(struct stream
*s
, size_t putp
, struct in6_addr
*addr
)
852 STREAM_VERIFY_SANE(s
);
854 if (!PUT_AT_VALID(s
, putp
+ 16)) {
855 STREAM_BOUND_WARN(s
, "put");
859 memcpy(&s
->data
[putp
], addr
, 16);
863 /* Put prefix by nlri type format. */
864 int stream_put_prefix_addpath(struct stream
*s
, struct prefix
*p
,
865 int addpath_encode
, u_int32_t addpath_tx_id
)
868 size_t psize_with_addpath
;
870 STREAM_VERIFY_SANE(s
);
872 psize
= PSIZE(p
->prefixlen
);
875 psize_with_addpath
= psize
+ 4;
877 psize_with_addpath
= psize
;
879 if (STREAM_WRITEABLE(s
) < (psize_with_addpath
+ sizeof(u_char
))) {
880 STREAM_BOUND_WARN(s
, "put");
884 if (addpath_encode
) {
885 s
->data
[s
->endp
++] = (u_char
)(addpath_tx_id
>> 24);
886 s
->data
[s
->endp
++] = (u_char
)(addpath_tx_id
>> 16);
887 s
->data
[s
->endp
++] = (u_char
)(addpath_tx_id
>> 8);
888 s
->data
[s
->endp
++] = (u_char
)addpath_tx_id
;
891 s
->data
[s
->endp
++] = p
->prefixlen
;
892 memcpy(s
->data
+ s
->endp
, &p
->u
.prefix
, psize
);
898 int stream_put_prefix(struct stream
*s
, struct prefix
*p
)
900 return stream_put_prefix_addpath(s
, p
, 0, 0);
903 /* Put NLRI with label */
904 int stream_put_labeled_prefix(struct stream
*s
, struct prefix
*p
,
908 u_char
*label_pnt
= (u_char
*)label
;
910 STREAM_VERIFY_SANE(s
);
912 psize
= PSIZE(p
->prefixlen
);
914 if (STREAM_WRITEABLE(s
) < (psize
+ 3)) {
915 STREAM_BOUND_WARN(s
, "put");
919 stream_putc(s
, (p
->prefixlen
+ 24));
920 stream_putc(s
, label_pnt
[0]);
921 stream_putc(s
, label_pnt
[1]);
922 stream_putc(s
, label_pnt
[2]);
923 memcpy(s
->data
+ s
->endp
, &p
->u
.prefix
, psize
);
929 /* Read size from fd. */
930 int stream_read(struct stream
*s
, int fd
, size_t size
)
934 STREAM_VERIFY_SANE(s
);
936 if (STREAM_WRITEABLE(s
) < size
) {
937 STREAM_BOUND_WARN(s
, "put");
941 nbytes
= readn(fd
, s
->data
+ s
->endp
, size
);
949 ssize_t
stream_read_try(struct stream
*s
, int fd
, size_t size
)
953 STREAM_VERIFY_SANE(s
);
955 if (STREAM_WRITEABLE(s
) < size
) {
956 STREAM_BOUND_WARN(s
, "put");
957 /* Fatal (not transient) error, since retrying will not help
958 (stream is too small to contain the desired data). */
962 if ((nbytes
= read(fd
, s
->data
+ s
->endp
, size
)) >= 0) {
966 /* Error: was it transient (return -2) or fatal (return -1)? */
967 if (ERRNO_IO_RETRY(errno
))
969 zlog_warn("%s: read failed on fd %d: %s", __func__
, fd
,
970 safe_strerror(errno
));
974 /* Read up to size bytes into the stream from the fd, using recvmsgfrom
975 * whose arguments match the remaining arguments to this function
977 ssize_t
stream_recvfrom(struct stream
*s
, int fd
, size_t size
, int flags
,
978 struct sockaddr
*from
, socklen_t
*fromlen
)
982 STREAM_VERIFY_SANE(s
);
984 if (STREAM_WRITEABLE(s
) < size
) {
985 STREAM_BOUND_WARN(s
, "put");
986 /* Fatal (not transient) error, since retrying will not help
987 (stream is too small to contain the desired data). */
991 if ((nbytes
= recvfrom(fd
, s
->data
+ s
->endp
, size
, flags
, from
,
997 /* Error: was it transient (return -2) or fatal (return -1)? */
998 if (ERRNO_IO_RETRY(errno
))
1000 zlog_warn("%s: read failed on fd %d: %s", __func__
, fd
,
1001 safe_strerror(errno
));
1005 /* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
1007 * First iovec will be used to receive the data.
1008 * Stream need not be empty.
1010 ssize_t
stream_recvmsg(struct stream
*s
, int fd
, struct msghdr
*msgh
, int flags
,
1016 STREAM_VERIFY_SANE(s
);
1017 assert(msgh
->msg_iovlen
> 0);
1019 if (STREAM_WRITEABLE(s
) < size
) {
1020 STREAM_BOUND_WARN(s
, "put");
1021 /* This is a logic error in the calling code: the stream is too
1023 to hold the desired data! */
1027 iov
= &(msgh
->msg_iov
[0]);
1028 iov
->iov_base
= (s
->data
+ s
->endp
);
1029 iov
->iov_len
= size
;
1031 nbytes
= recvmsg(fd
, msgh
, flags
);
1039 /* Write data to buffer. */
1040 size_t stream_write(struct stream
*s
, const void *ptr
, size_t size
)
1043 CHECK_SIZE(s
, size
);
1045 STREAM_VERIFY_SANE(s
);
1047 if (STREAM_WRITEABLE(s
) < size
) {
1048 STREAM_BOUND_WARN(s
, "put");
1052 memcpy(s
->data
+ s
->endp
, ptr
, size
);
1058 /* Return current read pointer.
1060 * Use stream_get_pnt_to if you must, but decoding streams properly
1063 u_char
*stream_pnt(struct stream
*s
)
1065 STREAM_VERIFY_SANE(s
);
1066 return s
->data
+ s
->getp
;
1069 /* Check does this stream empty? */
1070 int stream_empty(struct stream
*s
)
1072 STREAM_VERIFY_SANE(s
);
1074 return (s
->endp
== 0);
1078 void stream_reset(struct stream
*s
)
1080 STREAM_VERIFY_SANE(s
);
1082 s
->getp
= s
->endp
= 0;
1085 /* Write stream contens to the file discriptor. */
1086 int stream_flush(struct stream
*s
, int fd
)
1090 STREAM_VERIFY_SANE(s
);
1092 nbytes
= write(fd
, s
->data
+ s
->getp
, s
->endp
- s
->getp
);
1097 /* Stream first in first out queue. */
1099 struct stream_fifo
*stream_fifo_new(void)
1101 struct stream_fifo
*new;
1103 new = XCALLOC(MTYPE_STREAM_FIFO
, sizeof(struct stream_fifo
));
1107 /* Add new stream to fifo. */
1108 void stream_fifo_push(struct stream_fifo
*fifo
, struct stream
*s
)
1111 fifo
->tail
->next
= s
;
1120 /* Delete first stream from fifo. */
1121 struct stream
*stream_fifo_pop(struct stream_fifo
*fifo
)
1128 fifo
->head
= s
->next
;
1130 if (fifo
->head
== NULL
)
1139 /* Return first fifo entry. */
1140 struct stream
*stream_fifo_head(struct stream_fifo
*fifo
)
1145 void stream_fifo_clean(struct stream_fifo
*fifo
)
1148 struct stream
*next
;
1150 for (s
= fifo
->head
; s
; s
= next
) {
1154 fifo
->head
= fifo
->tail
= NULL
;
1158 void stream_fifo_free(struct stream_fifo
*fifo
)
1160 stream_fifo_clean(fifo
);
1161 XFREE(MTYPE_STREAM_FIFO
, fifo
);