]>
git.proxmox.com Git - mirror_frr.git/blob - lib/stream.c
3 * Copyright (C) 1999 Kunihiro Ishiguro
5 * This file is part of GNU Zebra.
7 * GNU Zebra is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2, or (at your option) any
12 * GNU Zebra is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with GNU Zebra; see the file COPYING. If not, write to the Free
19 * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
32 DEFINE_MTYPE_STATIC(LIB
, STREAM
, "Stream")
33 DEFINE_MTYPE_STATIC(LIB
, STREAM_DATA
, "Stream data")
34 DEFINE_MTYPE_STATIC(LIB
, STREAM_FIFO
, "Stream FIFO")
36 /* Tests whether a position is valid */
37 #define GETP_VALID(S,G) \
39 #define PUT_AT_VALID(S,G) GETP_VALID(S,G)
40 #define ENDP_VALID(S,E) \
43 /* asserting sanity checks. Following must be true before
44 * stream functions are called:
46 * Following must always be true of stream elements
47 * before and after calls to stream functions:
49 * getp <= endp <= size
51 * Note that after a stream function is called following may be true:
52 * if (getp == endp) then stream is no longer readable
53 * if (endp == size) then stream is no longer writeable
55 * It is valid to put to anywhere within the size of the stream, but only
56 * using stream_put..._at() functions.
58 #define STREAM_WARN_OFFSETS(S) \
59 zlog_warn ("&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n", \
61 (unsigned long) (S)->size, \
62 (unsigned long) (S)->getp, \
63 (unsigned long) (S)->endp)\
65 #define STREAM_VERIFY_SANE(S) \
67 if ( !(GETP_VALID(S, (S)->getp) && ENDP_VALID(S, (S)->endp)) ) \
68 STREAM_WARN_OFFSETS(S); \
69 assert ( GETP_VALID(S, (S)->getp) ); \
70 assert ( ENDP_VALID(S, (S)->endp) ); \
73 #define STREAM_BOUND_WARN(S, WHAT) \
75 zlog_warn ("%s: Attempt to %s out of bounds", __func__, (WHAT)); \
76 STREAM_WARN_OFFSETS(S); \
80 /* XXX: Deprecated macro: do not use */
81 #define CHECK_SIZE(S, Z) \
83 if (((S)->endp + (Z)) > (S)->size) \
85 zlog_warn ("CHECK_SIZE: truncating requested size %lu\n", \
86 (unsigned long) (Z)); \
87 STREAM_WARN_OFFSETS(S); \
88 (Z) = (S)->size - (S)->endp; \
92 /* Make stream buffer. */
94 stream_new (size_t size
)
100 s
= XCALLOC (MTYPE_STREAM
, sizeof (struct stream
));
105 if ( (s
->data
= XMALLOC (MTYPE_STREAM_DATA
, size
)) == NULL
)
107 XFREE (MTYPE_STREAM
, s
);
117 stream_free (struct stream
*s
)
122 XFREE (MTYPE_STREAM_DATA
, s
->data
);
123 XFREE (MTYPE_STREAM
, s
);
127 stream_copy (struct stream
*new, struct stream
*src
)
129 STREAM_VERIFY_SANE (src
);
131 assert (new != NULL
);
132 assert (STREAM_SIZE(new) >= src
->endp
);
134 new->endp
= src
->endp
;
135 new->getp
= src
->getp
;
137 memcpy (new->data
, src
->data
, src
->endp
);
143 stream_dup (struct stream
*s
)
147 STREAM_VERIFY_SANE (s
);
149 if ( (new = stream_new (s
->endp
)) == NULL
)
152 return (stream_copy (new, s
));
156 stream_dupcat (struct stream
*s1
, struct stream
*s2
, size_t offset
)
160 STREAM_VERIFY_SANE (s1
);
161 STREAM_VERIFY_SANE (s2
);
163 if ( (new = stream_new (s1
->endp
+ s2
->endp
)) == NULL
)
166 memcpy (new->data
, s1
->data
, offset
);
167 memcpy (new->data
+ offset
, s2
->data
, s2
->endp
);
168 memcpy (new->data
+ offset
+ s2
->endp
, s1
->data
+ offset
,
169 (s1
->endp
- offset
));
170 new->endp
= s1
->endp
+ s2
->endp
;
175 stream_resize (struct stream
*s
, size_t newsize
)
178 STREAM_VERIFY_SANE (s
);
180 newdata
= XREALLOC (MTYPE_STREAM_DATA
, s
->data
, newsize
);
188 if (s
->endp
> s
->size
)
190 if (s
->getp
> s
->endp
)
193 STREAM_VERIFY_SANE (s
);
199 stream_get_getp (struct stream
*s
)
201 STREAM_VERIFY_SANE(s
);
206 stream_get_endp (struct stream
*s
)
208 STREAM_VERIFY_SANE(s
);
213 stream_get_size (struct stream
*s
)
215 STREAM_VERIFY_SANE(s
);
219 /* Stream structure's stream pointer related functions. */
221 stream_set_getp (struct stream
*s
, size_t pos
)
223 STREAM_VERIFY_SANE(s
);
225 if (!GETP_VALID (s
, pos
))
227 STREAM_BOUND_WARN (s
, "set getp");
235 stream_set_endp (struct stream
*s
, size_t pos
)
237 STREAM_VERIFY_SANE(s
);
239 if (!ENDP_VALID(s
, pos
))
241 STREAM_BOUND_WARN (s
, "set endp");
246 * Make sure the current read pointer is not beyond the new endp.
250 STREAM_BOUND_WARN(s
, "set endp");
255 STREAM_VERIFY_SANE(s
);
258 /* Forward pointer. */
260 stream_forward_getp (struct stream
*s
, size_t size
)
262 STREAM_VERIFY_SANE(s
);
264 if (!GETP_VALID (s
, s
->getp
+ size
))
266 STREAM_BOUND_WARN (s
, "seek getp");
274 stream_forward_endp (struct stream
*s
, size_t size
)
276 STREAM_VERIFY_SANE(s
);
278 if (!ENDP_VALID (s
, s
->endp
+ size
))
280 STREAM_BOUND_WARN (s
, "seek endp");
287 /* Copy from stream to destination. */
289 stream_get (void *dst
, struct stream
*s
, size_t size
)
291 STREAM_VERIFY_SANE(s
);
293 if (STREAM_READABLE(s
) < size
)
295 STREAM_BOUND_WARN (s
, "get");
299 memcpy (dst
, s
->data
+ s
->getp
, size
);
303 /* Get next character from the stream. */
305 stream_getc (struct stream
*s
)
309 STREAM_VERIFY_SANE (s
);
311 if (STREAM_READABLE(s
) < sizeof (u_char
))
313 STREAM_BOUND_WARN (s
, "get char");
316 c
= s
->data
[s
->getp
++];
321 /* Get character from the stream at position 'from'. */
323 stream_getc_from (struct stream
*s
, size_t from
)
327 STREAM_VERIFY_SANE(s
);
329 if (!GETP_VALID (s
, from
+ sizeof (u_char
)))
331 STREAM_BOUND_WARN (s
, "get char");
340 /* Get next word from the stream. */
342 stream_getw (struct stream
*s
)
346 STREAM_VERIFY_SANE (s
);
348 if (STREAM_READABLE (s
) < sizeof (u_int16_t
))
350 STREAM_BOUND_WARN (s
, "get ");
354 w
= s
->data
[s
->getp
++] << 8;
355 w
|= s
->data
[s
->getp
++];
360 /* Get word from the stream at position 'from'. */
362 stream_getw_from (struct stream
*s
, size_t from
)
366 STREAM_VERIFY_SANE(s
);
368 if (!GETP_VALID (s
, from
+ sizeof (u_int16_t
)))
370 STREAM_BOUND_WARN (s
, "get ");
374 w
= s
->data
[from
++] << 8;
380 /* Get 3-byte value from the stream at position 'from'. */
382 stream_get3_from (struct stream
*s
, size_t from
)
386 STREAM_VERIFY_SANE(s
);
388 if (!GETP_VALID (s
, from
+ 3))
390 STREAM_BOUND_WARN (s
, "get 3byte");
394 l
= s
->data
[from
++] << 16;
395 l
|= s
->data
[from
++] << 8;
402 stream_get3 (struct stream
*s
)
406 STREAM_VERIFY_SANE(s
);
408 if (STREAM_READABLE (s
) < 3)
410 STREAM_BOUND_WARN (s
, "get 3byte");
414 l
= s
->data
[s
->getp
++] << 16;
415 l
|= s
->data
[s
->getp
++] << 8;
416 l
|= s
->data
[s
->getp
++];
421 /* Get long word from the stream at position 'from'. */
423 stream_getl_from (struct stream
*s
, size_t from
)
427 STREAM_VERIFY_SANE(s
);
429 if (!GETP_VALID (s
, from
+ sizeof (u_int32_t
)))
431 STREAM_BOUND_WARN (s
, "get long");
435 l
= s
->data
[from
++] << 24;
436 l
|= s
->data
[from
++] << 16;
437 l
|= s
->data
[from
++] << 8;
443 /* Copy from stream at specific location to destination. */
445 stream_get_from (void *dst
, struct stream
*s
, size_t from
, size_t size
)
447 STREAM_VERIFY_SANE(s
);
449 if (!GETP_VALID (s
, from
+ size
))
451 STREAM_BOUND_WARN (s
, "get from");
455 memcpy (dst
, s
->data
+ from
, size
);
459 stream_getl (struct stream
*s
)
463 STREAM_VERIFY_SANE(s
);
465 if (STREAM_READABLE (s
) < sizeof (u_int32_t
))
467 STREAM_BOUND_WARN (s
, "get long");
471 l
= s
->data
[s
->getp
++] << 24;
472 l
|= s
->data
[s
->getp
++] << 16;
473 l
|= s
->data
[s
->getp
++] << 8;
474 l
|= s
->data
[s
->getp
++];
479 /* Get quad word from the stream at position 'from'. */
481 stream_getq_from (struct stream
*s
, size_t from
)
485 STREAM_VERIFY_SANE(s
);
487 if (!GETP_VALID (s
, from
+ sizeof (uint64_t)))
489 STREAM_BOUND_WARN (s
, "get quad");
493 q
= ((uint64_t) s
->data
[from
++]) << 56;
494 q
|= ((uint64_t) s
->data
[from
++]) << 48;
495 q
|= ((uint64_t) s
->data
[from
++]) << 40;
496 q
|= ((uint64_t) s
->data
[from
++]) << 32;
497 q
|= ((uint64_t) s
->data
[from
++]) << 24;
498 q
|= ((uint64_t) s
->data
[from
++]) << 16;
499 q
|= ((uint64_t) s
->data
[from
++]) << 8;
500 q
|= ((uint64_t) s
->data
[from
++]);
506 stream_getq (struct stream
*s
)
510 STREAM_VERIFY_SANE(s
);
512 if (STREAM_READABLE (s
) < sizeof (uint64_t))
514 STREAM_BOUND_WARN (s
, "get quad");
518 q
= ((uint64_t) s
->data
[s
->getp
++]) << 56;
519 q
|= ((uint64_t) s
->data
[s
->getp
++]) << 48;
520 q
|= ((uint64_t) s
->data
[s
->getp
++]) << 40;
521 q
|= ((uint64_t) s
->data
[s
->getp
++]) << 32;
522 q
|= ((uint64_t) s
->data
[s
->getp
++]) << 24;
523 q
|= ((uint64_t) s
->data
[s
->getp
++]) << 16;
524 q
|= ((uint64_t) s
->data
[s
->getp
++]) << 8;
525 q
|= ((uint64_t) s
->data
[s
->getp
++]);
530 /* Get next IPv4 address (4 bytes, copied as-is) from the stream. */
532 stream_get_ipv4 (struct stream
*s
)
536 STREAM_VERIFY_SANE(s
);
538 if (STREAM_READABLE (s
) < sizeof(u_int32_t
))
540 STREAM_BOUND_WARN (s
, "get ipv4");
544 memcpy (&l
, s
->data
+ s
->getp
, sizeof(u_int32_t
));
545 s
->getp
+= sizeof(u_int32_t
);
551 stream_getf (struct stream
*s
)
557 u
.d
= stream_getl (s
);
562 stream_getd (struct stream
*s
)
568 u
.d
= stream_getq (s
);
572 /* Copy from source to stream.
574 * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
575 * around. This should be fixed once the stream updates are working.
577 * stream_write() is saner
580 stream_put (struct stream
*s
, const void *src
, size_t size
)
583 /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
586 STREAM_VERIFY_SANE(s
);
588 if (STREAM_WRITEABLE (s
) < size
)
590 STREAM_BOUND_WARN (s
, "put");
595 memcpy (s
->data
+ s
->endp
, src
, size
);
597 memset (s
->data
+ s
->endp
, 0, size
);
602 /* Put character to the stream. */
604 stream_putc (struct stream
*s
, u_char c
)
606 STREAM_VERIFY_SANE(s
);
608 if (STREAM_WRITEABLE (s
) < sizeof(u_char
))
610 STREAM_BOUND_WARN (s
, "put");
614 s
->data
[s
->endp
++] = c
;
615 return sizeof (u_char
);
618 /* Put word to the stream. */
620 stream_putw (struct stream
*s
, u_int16_t w
)
622 STREAM_VERIFY_SANE (s
);
624 if (STREAM_WRITEABLE (s
) < sizeof (u_int16_t
))
626 STREAM_BOUND_WARN (s
, "put");
630 s
->data
[s
->endp
++] = (u_char
)(w
>> 8);
631 s
->data
[s
->endp
++] = (u_char
) w
;
636 /* Put 3-byte value to the stream. */
638 stream_put3 (struct stream
*s
, u_int32_t l
)
640 STREAM_VERIFY_SANE (s
);
642 if (STREAM_WRITEABLE (s
) < 3)
644 STREAM_BOUND_WARN (s
, "put");
648 s
->data
[s
->endp
++] = (u_char
)(l
>> 16);
649 s
->data
[s
->endp
++] = (u_char
)(l
>> 8);
650 s
->data
[s
->endp
++] = (u_char
)l
;
655 /* Put long word to the stream. */
657 stream_putl (struct stream
*s
, u_int32_t l
)
659 STREAM_VERIFY_SANE (s
);
661 if (STREAM_WRITEABLE (s
) < sizeof (u_int32_t
))
663 STREAM_BOUND_WARN (s
, "put");
667 s
->data
[s
->endp
++] = (u_char
)(l
>> 24);
668 s
->data
[s
->endp
++] = (u_char
)(l
>> 16);
669 s
->data
[s
->endp
++] = (u_char
)(l
>> 8);
670 s
->data
[s
->endp
++] = (u_char
)l
;
675 /* Put quad word to the stream. */
677 stream_putq (struct stream
*s
, uint64_t q
)
679 STREAM_VERIFY_SANE (s
);
681 if (STREAM_WRITEABLE (s
) < sizeof (uint64_t))
683 STREAM_BOUND_WARN (s
, "put quad");
687 s
->data
[s
->endp
++] = (u_char
)(q
>> 56);
688 s
->data
[s
->endp
++] = (u_char
)(q
>> 48);
689 s
->data
[s
->endp
++] = (u_char
)(q
>> 40);
690 s
->data
[s
->endp
++] = (u_char
)(q
>> 32);
691 s
->data
[s
->endp
++] = (u_char
)(q
>> 24);
692 s
->data
[s
->endp
++] = (u_char
)(q
>> 16);
693 s
->data
[s
->endp
++] = (u_char
)(q
>> 8);
694 s
->data
[s
->endp
++] = (u_char
)q
;
700 stream_putf (struct stream
*s
, float f
)
707 return stream_putl (s
, u
.o
);
711 stream_putd (struct stream
*s
, double d
)
718 return stream_putq (s
, u
.o
);
722 stream_putc_at (struct stream
*s
, size_t putp
, u_char c
)
724 STREAM_VERIFY_SANE(s
);
726 if (!PUT_AT_VALID (s
, putp
+ sizeof (u_char
)))
728 STREAM_BOUND_WARN (s
, "put");
738 stream_putw_at (struct stream
*s
, size_t putp
, u_int16_t w
)
740 STREAM_VERIFY_SANE(s
);
742 if (!PUT_AT_VALID (s
, putp
+ sizeof (u_int16_t
)))
744 STREAM_BOUND_WARN (s
, "put");
748 s
->data
[putp
] = (u_char
)(w
>> 8);
749 s
->data
[putp
+ 1] = (u_char
) w
;
755 stream_put3_at (struct stream
*s
, size_t putp
, u_int32_t l
)
757 STREAM_VERIFY_SANE(s
);
759 if (!PUT_AT_VALID (s
, putp
+ 3))
761 STREAM_BOUND_WARN (s
, "put");
764 s
->data
[putp
] = (u_char
)(l
>> 16);
765 s
->data
[putp
+ 1] = (u_char
)(l
>> 8);
766 s
->data
[putp
+ 2] = (u_char
)l
;
772 stream_putl_at (struct stream
*s
, size_t putp
, u_int32_t l
)
774 STREAM_VERIFY_SANE(s
);
776 if (!PUT_AT_VALID (s
, putp
+ sizeof (u_int32_t
)))
778 STREAM_BOUND_WARN (s
, "put");
781 s
->data
[putp
] = (u_char
)(l
>> 24);
782 s
->data
[putp
+ 1] = (u_char
)(l
>> 16);
783 s
->data
[putp
+ 2] = (u_char
)(l
>> 8);
784 s
->data
[putp
+ 3] = (u_char
)l
;
790 stream_putq_at (struct stream
*s
, size_t putp
, uint64_t q
)
792 STREAM_VERIFY_SANE(s
);
794 if (!PUT_AT_VALID (s
, putp
+ sizeof (uint64_t)))
796 STREAM_BOUND_WARN (s
, "put");
799 s
->data
[putp
] = (u_char
)(q
>> 56);
800 s
->data
[putp
+ 1] = (u_char
)(q
>> 48);
801 s
->data
[putp
+ 2] = (u_char
)(q
>> 40);
802 s
->data
[putp
+ 3] = (u_char
)(q
>> 32);
803 s
->data
[putp
+ 4] = (u_char
)(q
>> 24);
804 s
->data
[putp
+ 5] = (u_char
)(q
>> 16);
805 s
->data
[putp
+ 6] = (u_char
)(q
>> 8);
806 s
->data
[putp
+ 7] = (u_char
)q
;
811 /* Put IPv4 address (4 bytes, copied as-is) to the stream. */
813 stream_put_ipv4 (struct stream
*s
, u_int32_t l
)
815 STREAM_VERIFY_SANE(s
);
817 if (STREAM_WRITEABLE (s
) < sizeof (u_int32_t
))
819 STREAM_BOUND_WARN (s
, "put");
822 memcpy (s
->data
+ s
->endp
, &l
, sizeof (u_int32_t
));
823 s
->endp
+= sizeof (u_int32_t
);
825 return sizeof (u_int32_t
);
828 /* Put in_addr (4 bytes, copied as-is) to the stream. */
830 stream_put_in_addr (struct stream
*s
, struct in_addr
*addr
)
832 STREAM_VERIFY_SANE(s
);
834 if (STREAM_WRITEABLE (s
) < sizeof (u_int32_t
))
836 STREAM_BOUND_WARN (s
, "put");
840 memcpy (s
->data
+ s
->endp
, addr
, sizeof (u_int32_t
));
841 s
->endp
+= sizeof (u_int32_t
);
843 return sizeof (u_int32_t
);
846 /* Put in_addr at location in the stream. */
848 stream_put_in_addr_at (struct stream
*s
, size_t putp
, struct in_addr
*addr
)
850 STREAM_VERIFY_SANE(s
);
852 if (!PUT_AT_VALID (s
, putp
+ 4))
854 STREAM_BOUND_WARN (s
, "put");
858 memcpy (&s
->data
[putp
], addr
, 4);
862 /* Put in6_addr at location in the stream. */
864 stream_put_in6_addr_at (struct stream
*s
, size_t putp
, struct in6_addr
*addr
)
866 STREAM_VERIFY_SANE(s
);
868 if (!PUT_AT_VALID (s
, putp
+ 16))
870 STREAM_BOUND_WARN (s
, "put");
874 memcpy (&s
->data
[putp
], addr
, 16);
878 /* Put prefix by nlri type format. */
880 stream_put_prefix_addpath (struct stream
*s
, struct prefix
*p
,
881 int addpath_encode
, u_int32_t addpath_tx_id
)
884 size_t psize_with_addpath
;
886 STREAM_VERIFY_SANE(s
);
888 psize
= PSIZE (p
->prefixlen
);
891 psize_with_addpath
= psize
+ 4;
893 psize_with_addpath
= psize
;
895 if (STREAM_WRITEABLE (s
) < (psize_with_addpath
+ sizeof (u_char
)))
897 STREAM_BOUND_WARN (s
, "put");
903 s
->data
[s
->endp
++] = (u_char
)(addpath_tx_id
>> 24);
904 s
->data
[s
->endp
++] = (u_char
)(addpath_tx_id
>> 16);
905 s
->data
[s
->endp
++] = (u_char
)(addpath_tx_id
>> 8);
906 s
->data
[s
->endp
++] = (u_char
)addpath_tx_id
;
909 s
->data
[s
->endp
++] = p
->prefixlen
;
910 memcpy (s
->data
+ s
->endp
, &p
->u
.prefix
, psize
);
917 stream_put_prefix (struct stream
*s
, struct prefix
*p
)
919 return stream_put_prefix_addpath (s
, p
, 0, 0);
922 /* Put NLRI with label */
924 stream_put_labeled_prefix (struct stream
*s
, struct prefix
*p
, u_char
*label
)
928 STREAM_VERIFY_SANE(s
);
930 psize
= PSIZE (p
->prefixlen
);
932 if (STREAM_WRITEABLE (s
) < (psize
+ 3))
934 STREAM_BOUND_WARN (s
, "put");
938 stream_putc (s
, (p
->prefixlen
+ 24));
939 stream_putc(s
, label
[0]);
940 stream_putc(s
, label
[1]);
941 stream_putc(s
, label
[2]);
942 memcpy (s
->data
+ s
->endp
, &p
->u
.prefix
, psize
);
948 /* Read size from fd. */
950 stream_read (struct stream
*s
, int fd
, size_t size
)
954 STREAM_VERIFY_SANE(s
);
956 if (STREAM_WRITEABLE (s
) < size
)
958 STREAM_BOUND_WARN (s
, "put");
962 nbytes
= readn (fd
, s
->data
+ s
->endp
, size
);
971 stream_read_try(struct stream
*s
, int fd
, size_t size
)
975 STREAM_VERIFY_SANE(s
);
977 if (STREAM_WRITEABLE(s
) < size
)
979 STREAM_BOUND_WARN (s
, "put");
980 /* Fatal (not transient) error, since retrying will not help
981 (stream is too small to contain the desired data). */
985 if ((nbytes
= read(fd
, s
->data
+ s
->endp
, size
)) >= 0)
990 /* Error: was it transient (return -2) or fatal (return -1)? */
991 if (ERRNO_IO_RETRY(errno
))
993 zlog_warn("%s: read failed on fd %d: %s", __func__
, fd
, safe_strerror(errno
));
997 /* Read up to size bytes into the stream from the fd, using recvfrom
998 * whose arguments match the remaining arguments to this function
1001 stream_recvfrom (struct stream
*s
, int fd
, size_t size
, int flags
,
1002 struct sockaddr
*from
, socklen_t
*fromlen
)
1006 STREAM_VERIFY_SANE(s
);
1008 if (STREAM_WRITEABLE(s
) < size
)
1010 STREAM_BOUND_WARN (s
, "put");
1011 /* Fatal (not transient) error, since retrying will not help
1012 (stream is too small to contain the desired data). */
1016 if ((nbytes
= recvfrom (fd
, s
->data
+ s
->endp
, size
,
1017 flags
, from
, fromlen
)) >= 0)
1022 /* Error: was it transient (return -2) or fatal (return -1)? */
1023 if (ERRNO_IO_RETRY(errno
))
1025 zlog_warn("%s: read failed on fd %d: %s", __func__
, fd
, safe_strerror(errno
));
1029 /* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
1031 * First iovec will be used to receive the data.
1032 * Stream need not be empty.
1035 stream_recvmsg (struct stream
*s
, int fd
, struct msghdr
*msgh
, int flags
,
1041 STREAM_VERIFY_SANE(s
);
1042 assert (msgh
->msg_iovlen
> 0);
1044 if (STREAM_WRITEABLE (s
) < size
)
1046 STREAM_BOUND_WARN (s
, "put");
1047 /* This is a logic error in the calling code: the stream is too small
1048 to hold the desired data! */
1052 iov
= &(msgh
->msg_iov
[0]);
1053 iov
->iov_base
= (s
->data
+ s
->endp
);
1054 iov
->iov_len
= size
;
1056 nbytes
= recvmsg (fd
, msgh
, flags
);
1064 /* Write data to buffer. */
1066 stream_write (struct stream
*s
, const void *ptr
, size_t size
)
1069 CHECK_SIZE(s
, size
);
1071 STREAM_VERIFY_SANE(s
);
1073 if (STREAM_WRITEABLE (s
) < size
)
1075 STREAM_BOUND_WARN (s
, "put");
1079 memcpy (s
->data
+ s
->endp
, ptr
, size
);
1085 /* Return current read pointer.
1087 * Use stream_get_pnt_to if you must, but decoding streams properly
1091 stream_pnt (struct stream
*s
)
1093 STREAM_VERIFY_SANE(s
);
1094 return s
->data
+ s
->getp
;
1097 /* Check whether this stream is empty. */
1099 stream_empty (struct stream
*s
)
1101 STREAM_VERIFY_SANE(s
);
1103 return (s
->endp
== 0);
1108 stream_reset (struct stream
*s
)
1110 STREAM_VERIFY_SANE (s
);
1112 s
->getp
= s
->endp
= 0;
1115 /* Write stream contents to the file descriptor. */
1117 stream_flush (struct stream
*s
, int fd
)
1121 STREAM_VERIFY_SANE(s
);
1123 nbytes
= write (fd
, s
->data
+ s
->getp
, s
->endp
- s
->getp
);
1128 /* Stream first in first out queue. */
1130 struct stream_fifo
*
1131 stream_fifo_new (void)
1133 struct stream_fifo
*new;
1135 new = XCALLOC (MTYPE_STREAM_FIFO
, sizeof (struct stream_fifo
));
1139 /* Add new stream to fifo. */
1141 stream_fifo_push (struct stream_fifo
*fifo
, struct stream
*s
)
1144 fifo
->tail
->next
= s
;
1153 /* Delete first stream from fifo. */
1155 stream_fifo_pop (struct stream_fifo
*fifo
)
1163 fifo
->head
= s
->next
;
1165 if (fifo
->head
== NULL
)
1174 /* Return first fifo entry. */
1176 stream_fifo_head (struct stream_fifo
*fifo
)
1182 stream_fifo_clean (struct stream_fifo
*fifo
)
1185 struct stream
*next
;
1187 for (s
= fifo
->head
; s
; s
= next
)
1192 fifo
->head
= fifo
->tail
= NULL
;
1197 stream_fifo_free (struct stream_fifo
*fifo
)
1199 stream_fifo_clean (fifo
);
1200 XFREE (MTYPE_STREAM_FIFO
, fifo
);