]>
git.proxmox.com Git - mirror_frr.git/blob - lib/stream.c
3 *Copyright (C) 1999 Kunihiro Ishiguro
5 *This file is part of GNU Zebra.
7 *GNU Zebra is free software; you can redistribute it and/or modify it
8 *under the terms of the GNU General Public License as published by the
9 *Free Software Foundation; either version 2, or (at your option) any
12 *GNU Zebra is distributed in the hope that it will be useful, but
13 *WITHOUT ANY WARRANTY; without even the implied warranty of
14 *MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 *General Public License for more details.
17 *You should have received a copy of the GNU General Public License
18 *along with GNU Zebra; see the file COPYING. If not, write to the Free
19 *Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
32 DEFINE_MTYPE_STATIC(LIB
, STREAM
, "Stream")
33 DEFINE_MTYPE_STATIC(LIB
, STREAM_DATA
, "Stream data")
34 DEFINE_MTYPE_STATIC(LIB
, STREAM_FIFO
, "Stream FIFO")
36 /* Tests whether a position is valid */
37 #define GETP_VALID(S, G) ((G) <= (S)->endp)
38 #define PUT_AT_VALID(S,G) GETP_VALID(S,G)
39 #define ENDP_VALID(S, E) ((E) <= (S)->size)
41 /* asserting sanity checks. Following must be true before
42 * stream functions are called:
44 * Following must always be true of stream elements
45 * before and after calls to stream functions:
47 * getp <= endp <= size
49 * Note that after a stream function is called following may be true:
50 * if (getp == endp) then stream is no longer readable
51 * if (endp == size) then stream is no longer writeable
53 * It is valid to put to anywhere within the size of the stream, but only
54 * using stream_put..._at() functions.
56 #define STREAM_WARN_OFFSETS(S) \
57 zlog_warn("&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n", \
58 (void *)(S), (unsigned long)(S)->size, \
59 (unsigned long)(S)->getp, (unsigned long)(S)->endp)
61 #define STREAM_VERIFY_SANE(S) \
63 if (!(GETP_VALID(S, (S)->getp) && ENDP_VALID(S, (S)->endp))) \
64 STREAM_WARN_OFFSETS(S); \
65 assert(GETP_VALID(S, (S)->getp)); \
66 assert(ENDP_VALID(S, (S)->endp)); \
69 #define STREAM_BOUND_WARN(S, WHAT) \
71 zlog_warn("%s: Attempt to %s out of bounds", __func__, \
73 STREAM_WARN_OFFSETS(S); \
77 /* XXX: Deprecated macro: do not use */
78 #define CHECK_SIZE(S, Z) \
80 if (((S)->endp + (Z)) > (S)->size) { \
82 "CHECK_SIZE: truncating requested size %lu\n", \
83 (unsigned long)(Z)); \
84 STREAM_WARN_OFFSETS(S); \
85 (Z) = (S)->size - (S)->endp; \
89 /* Make stream buffer. */
90 struct stream
*stream_new(size_t size
)
96 s
= XCALLOC(MTYPE_STREAM
, sizeof(struct stream
));
101 if ((s
->data
= XMALLOC(MTYPE_STREAM_DATA
, size
)) == NULL
) {
102 XFREE(MTYPE_STREAM
, s
);
111 void stream_free(struct stream
*s
)
116 XFREE(MTYPE_STREAM_DATA
, s
->data
);
117 XFREE(MTYPE_STREAM
, s
);
120 struct stream
*stream_copy(struct stream
*new, struct stream
*src
)
122 STREAM_VERIFY_SANE(src
);
125 assert(STREAM_SIZE(new) >= src
->endp
);
127 new->endp
= src
->endp
;
128 new->getp
= src
->getp
;
130 memcpy(new->data
, src
->data
, src
->endp
);
135 struct stream
*stream_dup(struct stream
*s
)
139 STREAM_VERIFY_SANE(s
);
141 if ((new = stream_new(s
->endp
)) == NULL
)
144 return (stream_copy(new, s
));
147 struct stream
*stream_dupcat(struct stream
*s1
, struct stream
*s2
,
152 STREAM_VERIFY_SANE(s1
);
153 STREAM_VERIFY_SANE(s2
);
155 if ((new = stream_new(s1
->endp
+ s2
->endp
)) == NULL
)
158 memcpy(new->data
, s1
->data
, offset
);
159 memcpy(new->data
+ offset
, s2
->data
, s2
->endp
);
160 memcpy(new->data
+ offset
+ s2
->endp
, s1
->data
+ offset
,
161 (s1
->endp
- offset
));
162 new->endp
= s1
->endp
+ s2
->endp
;
166 size_t stream_resize(struct stream
*s
, size_t newsize
)
169 STREAM_VERIFY_SANE(s
);
171 newdata
= XREALLOC(MTYPE_STREAM_DATA
, s
->data
, newsize
);
179 if (s
->endp
> s
->size
)
181 if (s
->getp
> s
->endp
)
184 STREAM_VERIFY_SANE(s
);
189 size_t stream_get_getp(struct stream
*s
)
191 STREAM_VERIFY_SANE(s
);
195 size_t stream_get_endp(struct stream
*s
)
197 STREAM_VERIFY_SANE(s
);
201 size_t stream_get_size(struct stream
*s
)
203 STREAM_VERIFY_SANE(s
);
207 /* Stream structure's pointer-related functions. */
208 void stream_set_getp(struct stream
*s
, size_t pos
)
210 STREAM_VERIFY_SANE(s
);
212 if (!GETP_VALID(s
, pos
)) {
213 STREAM_BOUND_WARN(s
, "set getp");
220 void stream_set_endp(struct stream
*s
, size_t pos
)
222 STREAM_VERIFY_SANE(s
);
224 if (!ENDP_VALID(s
, pos
)) {
225 STREAM_BOUND_WARN(s
, "set endp");
230 * Make sure the current read pointer is not beyond the new endp.
233 STREAM_BOUND_WARN(s
, "set endp");
238 STREAM_VERIFY_SANE(s
);
241 /* Forward pointer. */
242 void stream_forward_getp(struct stream
*s
, size_t size
)
244 STREAM_VERIFY_SANE(s
);
246 if (!GETP_VALID(s
, s
->getp
+ size
)) {
247 STREAM_BOUND_WARN(s
, "seek getp");
254 void stream_forward_endp(struct stream
*s
, size_t size
)
256 STREAM_VERIFY_SANE(s
);
258 if (!ENDP_VALID(s
, s
->endp
+ size
)) {
259 STREAM_BOUND_WARN(s
, "seek endp");
266 /* Copy from stream to destination. */
267 void stream_get(void *dst
, struct stream
*s
, size_t size
)
269 STREAM_VERIFY_SANE(s
);
271 if (STREAM_READABLE(s
) < size
) {
272 STREAM_BOUND_WARN(s
, "get");
276 memcpy(dst
, s
->data
+ s
->getp
, size
);
280 /* Get next character from the stream. */
281 u_char
stream_getc(struct stream
*s
)
285 STREAM_VERIFY_SANE(s
);
287 if (STREAM_READABLE(s
) < sizeof(u_char
)) {
288 STREAM_BOUND_WARN(s
, "get char");
291 c
= s
->data
[s
->getp
++];
296 /* Get the character at position 'from' in the stream. */
297 u_char
stream_getc_from(struct stream
*s
, size_t from
)
301 STREAM_VERIFY_SANE(s
);
303 if (!GETP_VALID(s
, from
+ sizeof(u_char
))) {
304 STREAM_BOUND_WARN(s
, "get char");
313 /* Get next word from the stream. */
314 u_int16_t
stream_getw(struct stream
*s
)
318 STREAM_VERIFY_SANE(s
);
320 if (STREAM_READABLE(s
) < sizeof(u_int16_t
)) {
321 STREAM_BOUND_WARN(s
, "get ");
325 w
= s
->data
[s
->getp
++] << 8;
326 w
|= s
->data
[s
->getp
++];
331 /* Get the 16-bit word at position 'from' in the stream. */
332 u_int16_t
stream_getw_from(struct stream
*s
, size_t from
)
336 STREAM_VERIFY_SANE(s
);
338 if (!GETP_VALID(s
, from
+ sizeof(u_int16_t
))) {
339 STREAM_BOUND_WARN(s
, "get ");
343 w
= s
->data
[from
++] << 8;
349 /* Get the 3-byte value at position 'from' in the stream. */
350 u_int32_t
stream_get3_from(struct stream
*s
, size_t from
)
354 STREAM_VERIFY_SANE(s
);
356 if (!GETP_VALID(s
, from
+ 3)) {
357 STREAM_BOUND_WARN(s
, "get 3byte");
361 l
= s
->data
[from
++] << 16;
362 l
|= s
->data
[from
++] << 8;
368 u_int32_t
stream_get3(struct stream
*s
)
372 STREAM_VERIFY_SANE(s
);
374 if (STREAM_READABLE(s
) < 3) {
375 STREAM_BOUND_WARN(s
, "get 3byte");
379 l
= s
->data
[s
->getp
++] << 16;
380 l
|= s
->data
[s
->getp
++] << 8;
381 l
|= s
->data
[s
->getp
++];
386 /* Get the long word (4 bytes) at position 'from' in the stream. */
387 u_int32_t
stream_getl_from(struct stream
*s
, size_t from
)
391 STREAM_VERIFY_SANE(s
);
393 if (!GETP_VALID(s
, from
+ sizeof(u_int32_t
))) {
394 STREAM_BOUND_WARN(s
, "get long");
398 l
= s
->data
[from
++] << 24;
399 l
|= s
->data
[from
++] << 16;
400 l
|= s
->data
[from
++] << 8;
406 /* Copy from stream at specific location to destination. */
407 void stream_get_from(void *dst
, struct stream
*s
, size_t from
, size_t size
)
409 STREAM_VERIFY_SANE(s
);
411 if (!GETP_VALID(s
, from
+ size
)) {
412 STREAM_BOUND_WARN(s
, "get from");
416 memcpy(dst
, s
->data
+ from
, size
);
419 u_int32_t
stream_getl(struct stream
*s
)
423 STREAM_VERIFY_SANE(s
);
425 if (STREAM_READABLE(s
) < sizeof(u_int32_t
)) {
426 STREAM_BOUND_WARN(s
, "get long");
430 l
= s
->data
[s
->getp
++] << 24;
431 l
|= s
->data
[s
->getp
++] << 16;
432 l
|= s
->data
[s
->getp
++] << 8;
433 l
|= s
->data
[s
->getp
++];
438 /* Get the quad word (8 bytes) at position 'from' in the stream. */
439 uint64_t stream_getq_from(struct stream
*s
, size_t from
)
443 STREAM_VERIFY_SANE(s
);
445 if (!GETP_VALID(s
, from
+ sizeof(uint64_t))) {
446 STREAM_BOUND_WARN(s
, "get quad");
450 q
= ((uint64_t)s
->data
[from
++]) << 56;
451 q
|= ((uint64_t)s
->data
[from
++]) << 48;
452 q
|= ((uint64_t)s
->data
[from
++]) << 40;
453 q
|= ((uint64_t)s
->data
[from
++]) << 32;
454 q
|= ((uint64_t)s
->data
[from
++]) << 24;
455 q
|= ((uint64_t)s
->data
[from
++]) << 16;
456 q
|= ((uint64_t)s
->data
[from
++]) << 8;
457 q
|= ((uint64_t)s
->data
[from
++]);
462 uint64_t stream_getq(struct stream
*s
)
466 STREAM_VERIFY_SANE(s
);
468 if (STREAM_READABLE(s
) < sizeof(uint64_t)) {
469 STREAM_BOUND_WARN(s
, "get quad");
473 q
= ((uint64_t)s
->data
[s
->getp
++]) << 56;
474 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 48;
475 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 40;
476 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 32;
477 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 24;
478 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 16;
479 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 8;
480 q
|= ((uint64_t)s
->data
[s
->getp
++]);
485 /* Get next 4-byte IPv4 address from the stream; the bytes are copied as-is. */
486 u_int32_t
stream_get_ipv4(struct stream
*s
)
490 STREAM_VERIFY_SANE(s
);
492 if (STREAM_READABLE(s
) < sizeof(u_int32_t
)) {
493 STREAM_BOUND_WARN(s
, "get ipv4");
497 memcpy(&l
, s
->data
+ s
->getp
, sizeof(u_int32_t
));
498 s
->getp
+= sizeof(u_int32_t
);
503 float stream_getf(struct stream
*s
)
509 u
.d
= stream_getl(s
);
513 double stream_getd(struct stream
*s
)
519 u
.d
= stream_getq(s
);
523 /* Copy from source to stream.
525 * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
526 * around. This should be fixed once the stream updates are working.
528 * stream_write() is saner
530 void stream_put(struct stream
*s
, const void *src
, size_t size
)
533 /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
536 STREAM_VERIFY_SANE(s
);
538 if (STREAM_WRITEABLE(s
) < size
) {
539 STREAM_BOUND_WARN(s
, "put");
544 memcpy(s
->data
+ s
->endp
, src
, size
);
546 memset(s
->data
+ s
->endp
, 0, size
);
551 /* Put character to the stream. */
552 int stream_putc(struct stream
*s
, u_char c
)
554 STREAM_VERIFY_SANE(s
);
556 if (STREAM_WRITEABLE(s
) < sizeof(u_char
)) {
557 STREAM_BOUND_WARN(s
, "put");
561 s
->data
[s
->endp
++] = c
;
562 return sizeof(u_char
);
565 /* Put word to the stream. */
566 int stream_putw(struct stream
*s
, u_int16_t w
)
568 STREAM_VERIFY_SANE(s
);
570 if (STREAM_WRITEABLE(s
) < sizeof(u_int16_t
)) {
571 STREAM_BOUND_WARN(s
, "put");
575 s
->data
[s
->endp
++] = (u_char
)(w
>> 8);
576 s
->data
[s
->endp
++] = (u_char
)w
;
581 /* Put 3-byte value to the stream. */
582 int stream_put3(struct stream
*s
, u_int32_t l
)
584 STREAM_VERIFY_SANE(s
);
586 if (STREAM_WRITEABLE(s
) < 3) {
587 STREAM_BOUND_WARN(s
, "put");
591 s
->data
[s
->endp
++] = (u_char
)(l
>> 16);
592 s
->data
[s
->endp
++] = (u_char
)(l
>> 8);
593 s
->data
[s
->endp
++] = (u_char
)l
;
598 /* Put long word to the stream. */
599 int stream_putl(struct stream
*s
, u_int32_t l
)
601 STREAM_VERIFY_SANE(s
);
603 if (STREAM_WRITEABLE(s
) < sizeof(u_int32_t
)) {
604 STREAM_BOUND_WARN(s
, "put");
608 s
->data
[s
->endp
++] = (u_char
)(l
>> 24);
609 s
->data
[s
->endp
++] = (u_char
)(l
>> 16);
610 s
->data
[s
->endp
++] = (u_char
)(l
>> 8);
611 s
->data
[s
->endp
++] = (u_char
)l
;
616 /* Put quad word to the stream. */
617 int stream_putq(struct stream
*s
, uint64_t q
)
619 STREAM_VERIFY_SANE(s
);
621 if (STREAM_WRITEABLE(s
) < sizeof(uint64_t)) {
622 STREAM_BOUND_WARN(s
, "put quad");
626 s
->data
[s
->endp
++] = (u_char
)(q
>> 56);
627 s
->data
[s
->endp
++] = (u_char
)(q
>> 48);
628 s
->data
[s
->endp
++] = (u_char
)(q
>> 40);
629 s
->data
[s
->endp
++] = (u_char
)(q
>> 32);
630 s
->data
[s
->endp
++] = (u_char
)(q
>> 24);
631 s
->data
[s
->endp
++] = (u_char
)(q
>> 16);
632 s
->data
[s
->endp
++] = (u_char
)(q
>> 8);
633 s
->data
[s
->endp
++] = (u_char
)q
;
638 int stream_putf(struct stream
*s
, float f
)
645 return stream_putl(s
, u
.o
);
648 int stream_putd(struct stream
*s
, double d
)
655 return stream_putq(s
, u
.o
);
658 int stream_putc_at(struct stream
*s
, size_t putp
, u_char c
)
660 STREAM_VERIFY_SANE(s
);
662 if (!PUT_AT_VALID(s
, putp
+ sizeof(u_char
))) {
663 STREAM_BOUND_WARN(s
, "put");
672 int stream_putw_at(struct stream
*s
, size_t putp
, u_int16_t w
)
674 STREAM_VERIFY_SANE(s
);
676 if (!PUT_AT_VALID(s
, putp
+ sizeof(u_int16_t
))) {
677 STREAM_BOUND_WARN(s
, "put");
681 s
->data
[putp
] = (u_char
)(w
>> 8);
682 s
->data
[putp
+ 1] = (u_char
)w
;
687 int stream_put3_at(struct stream
*s
, size_t putp
, u_int32_t l
)
689 STREAM_VERIFY_SANE(s
);
691 if (!PUT_AT_VALID(s
, putp
+ 3)) {
692 STREAM_BOUND_WARN(s
, "put");
695 s
->data
[putp
] = (u_char
)(l
>> 16);
696 s
->data
[putp
+ 1] = (u_char
)(l
>> 8);
697 s
->data
[putp
+ 2] = (u_char
)l
;
702 int stream_putl_at(struct stream
*s
, size_t putp
, u_int32_t l
)
704 STREAM_VERIFY_SANE(s
);
706 if (!PUT_AT_VALID(s
, putp
+ sizeof(u_int32_t
))) {
707 STREAM_BOUND_WARN(s
, "put");
710 s
->data
[putp
] = (u_char
)(l
>> 24);
711 s
->data
[putp
+ 1] = (u_char
)(l
>> 16);
712 s
->data
[putp
+ 2] = (u_char
)(l
>> 8);
713 s
->data
[putp
+ 3] = (u_char
)l
;
718 int stream_putq_at(struct stream
*s
, size_t putp
, uint64_t q
)
720 STREAM_VERIFY_SANE(s
);
722 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint64_t))) {
723 STREAM_BOUND_WARN(s
, "put");
726 s
->data
[putp
] = (u_char
)(q
>> 56);
727 s
->data
[putp
+ 1] = (u_char
)(q
>> 48);
728 s
->data
[putp
+ 2] = (u_char
)(q
>> 40);
729 s
->data
[putp
+ 3] = (u_char
)(q
>> 32);
730 s
->data
[putp
+ 4] = (u_char
)(q
>> 24);
731 s
->data
[putp
+ 5] = (u_char
)(q
>> 16);
732 s
->data
[putp
+ 6] = (u_char
)(q
>> 8);
733 s
->data
[putp
+ 7] = (u_char
)q
;
738 /* Put IPv4 address (4 bytes, copied as-is) to the stream. */
739 int stream_put_ipv4(struct stream
*s
, u_int32_t l
)
741 STREAM_VERIFY_SANE(s
);
743 if (STREAM_WRITEABLE(s
) < sizeof(u_int32_t
)) {
744 STREAM_BOUND_WARN(s
, "put");
747 memcpy(s
->data
+ s
->endp
, &l
, sizeof(u_int32_t
));
748 s
->endp
+= sizeof(u_int32_t
);
750 return sizeof(u_int32_t
);
753 /* Put in_addr (4 bytes) to the stream. */
754 int stream_put_in_addr(struct stream
*s
, struct in_addr
*addr
)
756 STREAM_VERIFY_SANE(s
);
758 if (STREAM_WRITEABLE(s
) < sizeof(u_int32_t
)) {
759 STREAM_BOUND_WARN(s
, "put");
763 memcpy(s
->data
+ s
->endp
, addr
, sizeof(u_int32_t
));
764 s
->endp
+= sizeof(u_int32_t
);
766 return sizeof(u_int32_t
);
769 /* Put in_addr at location in the stream. */
770 int stream_put_in_addr_at(struct stream
*s
, size_t putp
, struct in_addr
*addr
)
772 STREAM_VERIFY_SANE(s
);
774 if (!PUT_AT_VALID(s
, putp
+ 4)) {
775 STREAM_BOUND_WARN(s
, "put");
779 memcpy(&s
->data
[putp
], addr
, 4);
783 /* Put in6_addr at location in the stream. */
784 int stream_put_in6_addr_at(struct stream
*s
, size_t putp
, struct in6_addr
*addr
)
786 STREAM_VERIFY_SANE(s
);
788 if (!PUT_AT_VALID(s
, putp
+ 16)) {
789 STREAM_BOUND_WARN(s
, "put");
793 memcpy(&s
->data
[putp
], addr
, 16);
797 /* Put prefix in NLRI format, preceded by a 4-byte addpath TX id when addpath_encode is set. */
798 int stream_put_prefix_addpath(struct stream
*s
, struct prefix
*p
,
799 int addpath_encode
, u_int32_t addpath_tx_id
)
802 size_t psize_with_addpath
;
804 STREAM_VERIFY_SANE(s
);
806 psize
= PSIZE(p
->prefixlen
);
809 psize_with_addpath
= psize
+ 4;
811 psize_with_addpath
= psize
;
813 if (STREAM_WRITEABLE(s
) < (psize_with_addpath
+ sizeof(u_char
))) {
814 STREAM_BOUND_WARN(s
, "put");
818 if (addpath_encode
) {
819 s
->data
[s
->endp
++] = (u_char
)(addpath_tx_id
>> 24);
820 s
->data
[s
->endp
++] = (u_char
)(addpath_tx_id
>> 16);
821 s
->data
[s
->endp
++] = (u_char
)(addpath_tx_id
>> 8);
822 s
->data
[s
->endp
++] = (u_char
)addpath_tx_id
;
825 s
->data
[s
->endp
++] = p
->prefixlen
;
826 memcpy(s
->data
+ s
->endp
, &p
->u
.prefix
, psize
);
832 int stream_put_prefix(struct stream
*s
, struct prefix
*p
)
834 return stream_put_prefix_addpath(s
, p
, 0, 0);
838 /* Read size bytes from fd into the stream. */
839 int stream_read(struct stream
*s
, int fd
, size_t size
)
843 STREAM_VERIFY_SANE(s
);
845 if (STREAM_WRITEABLE(s
) < size
) {
846 STREAM_BOUND_WARN(s
, "put");
850 nbytes
= readn(fd
, s
->data
+ s
->endp
, size
);
858 ssize_t
stream_read_try(struct stream
*s
, int fd
, size_t size
)
862 STREAM_VERIFY_SANE(s
);
864 if (STREAM_WRITEABLE(s
) < size
) {
865 STREAM_BOUND_WARN(s
, "put");
866 /* Fatal (not transient) error, since retrying will not help
867 (stream is too small to contain the desired data). */
871 if ((nbytes
= read(fd
, s
->data
+ s
->endp
, size
)) >= 0) {
875 /* Error: was it transient (return -2) or fatal (return -1)? */
876 if (ERRNO_IO_RETRY(errno
))
878 zlog_warn("%s: read failed on fd %d: %s", __func__
, fd
,
879 safe_strerror(errno
));
883 /* Read up to size bytes into the stream from the fd, using recvfrom
884 * whose arguments match the remaining arguments to this function
886 ssize_t
stream_recvfrom(struct stream
*s
, int fd
, size_t size
, int flags
,
887 struct sockaddr
*from
, socklen_t
*fromlen
)
891 STREAM_VERIFY_SANE(s
);
893 if (STREAM_WRITEABLE(s
) < size
) {
894 STREAM_BOUND_WARN(s
, "put");
895 /* Fatal (not transient) error, since retrying will not help
896 (stream is too small to contain the desired data). */
900 if ((nbytes
= recvfrom(fd
, s
->data
+ s
->endp
, size
, flags
, from
,
906 /* Error: was it transient (return -2) or fatal (return -1)? */
907 if (ERRNO_IO_RETRY(errno
))
909 zlog_warn("%s: read failed on fd %d: %s", __func__
, fd
,
910 safe_strerror(errno
));
914 /* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
916 * First iovec will be used to receive the data.
917 * Stream need not be empty.
919 ssize_t
stream_recvmsg(struct stream
*s
, int fd
, struct msghdr
*msgh
, int flags
,
925 STREAM_VERIFY_SANE(s
);
926 assert(msgh
->msg_iovlen
> 0);
928 if (STREAM_WRITEABLE(s
) < size
) {
929 STREAM_BOUND_WARN(s
, "put");
930 /* This is a logic error in the calling code: the stream is too
932 to hold the desired data! */
936 iov
= &(msgh
->msg_iov
[0]);
937 iov
->iov_base
= (s
->data
+ s
->endp
);
940 nbytes
= recvmsg(fd
, msgh
, flags
);
948 /* Write data to buffer. */
949 size_t stream_write(struct stream
*s
, const void *ptr
, size_t size
)
954 STREAM_VERIFY_SANE(s
);
956 if (STREAM_WRITEABLE(s
) < size
) {
957 STREAM_BOUND_WARN(s
, "put");
961 memcpy(s
->data
+ s
->endp
, ptr
, size
);
967 /* Return current read pointer.
969 * Use stream_get_pnt_to if you must, but decoding streams properly
972 u_char
*stream_pnt(struct stream
*s
)
974 STREAM_VERIFY_SANE(s
);
975 return s
->data
+ s
->getp
;
978 /* Check whether this stream is empty. */
979 int stream_empty(struct stream
*s
)
981 STREAM_VERIFY_SANE(s
);
983 return (s
->endp
== 0);
987 void stream_reset(struct stream
*s
)
989 STREAM_VERIFY_SANE(s
);
991 s
->getp
= s
->endp
= 0;
994 /* Write stream contents to the file descriptor. */
995 int stream_flush(struct stream
*s
, int fd
)
999 STREAM_VERIFY_SANE(s
);
1001 nbytes
= write(fd
, s
->data
+ s
->getp
, s
->endp
- s
->getp
);
1006 /* Stream first in first out queue. */
1008 struct stream_fifo
*stream_fifo_new(void)
1010 struct stream_fifo
*new;
1012 new = XCALLOC(MTYPE_STREAM_FIFO
, sizeof(struct stream_fifo
));
1016 /* Add new stream to fifo. */
1017 void stream_fifo_push(struct stream_fifo
*fifo
, struct stream
*s
)
1020 fifo
->tail
->next
= s
;
1029 /* Delete first stream from fifo. */
1030 struct stream
*stream_fifo_pop(struct stream_fifo
*fifo
)
1037 fifo
->head
= s
->next
;
1039 if (fifo
->head
== NULL
)
1048 /* Return first fifo entry. */
1049 struct stream
*stream_fifo_head(struct stream_fifo
*fifo
)
1054 void stream_fifo_clean(struct stream_fifo
*fifo
)
1057 struct stream
*next
;
1059 for (s
= fifo
->head
; s
; s
= next
) {
1063 fifo
->head
= fifo
->tail
= NULL
;
1067 void stream_fifo_free(struct stream_fifo
*fifo
)
1069 stream_fifo_clean(fifo
);
1070 XFREE(MTYPE_STREAM_FIFO
, fifo
);