]>
git.proxmox.com Git - mirror_frr.git/blob - lib/stream.c
3 * Copyright (C) 1999 Kunihiro Ishiguro
5 * This file is part of GNU Zebra.
7 * GNU Zebra is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2, or (at your option) any
12 * GNU Zebra is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
31 DEFINE_MTYPE_STATIC(LIB
, STREAM
, "Stream")
32 DEFINE_MTYPE_STATIC(LIB
, STREAM_DATA
, "Stream data")
33 DEFINE_MTYPE_STATIC(LIB
, STREAM_FIFO
, "Stream FIFO")
35 /* Tests whether a position is valid */
36 #define GETP_VALID(S, G) ((G) <= (S)->endp)
37 #define PUT_AT_VALID(S,G) GETP_VALID(S,G)
38 #define ENDP_VALID(S, E) ((E) <= (S)->size)
40 /* asserting sanity checks. Following must be true before
41 * stream functions are called:
43 * Following must always be true of stream elements
44 * before and after calls to stream functions:
46 * getp <= endp <= size
48 * Note that after a stream function is called following may be true:
49 * if (getp == endp) then stream is no longer readable
50 * if (endp == size) then stream is no longer writeable
52 * It is valid to put to anywhere up to the current endp of the stream
53 * (see PUT_AT_VALID), but only using stream_put..._at() functions.
55 #define STREAM_WARN_OFFSETS(S) \
56 zlog_warn("&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n", \
57 (void *)(S), (unsigned long)(S)->size, \
58 (unsigned long)(S)->getp, (unsigned long)(S)->endp)
60 #define STREAM_VERIFY_SANE(S) \
62 if (!(GETP_VALID(S, (S)->getp) && ENDP_VALID(S, (S)->endp))) \
63 STREAM_WARN_OFFSETS(S); \
64 assert(GETP_VALID(S, (S)->getp)); \
65 assert(ENDP_VALID(S, (S)->endp)); \
68 #define STREAM_BOUND_WARN(S, WHAT) \
70 zlog_warn("%s: Attempt to %s out of bounds", __func__, \
72 STREAM_WARN_OFFSETS(S); \
76 /* XXX: Deprecated macro: do not use */
77 #define CHECK_SIZE(S, Z) \
79 if (((S)->endp + (Z)) > (S)->size) { \
81 "CHECK_SIZE: truncating requested size %lu\n", \
82 (unsigned long)(Z)); \
83 STREAM_WARN_OFFSETS(S); \
84 (Z) = (S)->size - (S)->endp; \
88 /* Make stream buffer. */
89 struct stream
*stream_new(size_t size
)
95 s
= XCALLOC(MTYPE_STREAM
, sizeof(struct stream
));
100 if ((s
->data
= XMALLOC(MTYPE_STREAM_DATA
, size
)) == NULL
) {
101 XFREE(MTYPE_STREAM
, s
);
110 void stream_free(struct stream
*s
)
115 XFREE(MTYPE_STREAM_DATA
, s
->data
);
116 XFREE(MTYPE_STREAM
, s
);
119 struct stream
*stream_copy(struct stream
*new, struct stream
*src
)
121 STREAM_VERIFY_SANE(src
);
124 assert(STREAM_SIZE(new) >= src
->endp
);
126 new->endp
= src
->endp
;
127 new->getp
= src
->getp
;
129 memcpy(new->data
, src
->data
, src
->endp
);
134 struct stream
*stream_dup(struct stream
*s
)
138 STREAM_VERIFY_SANE(s
);
140 if ((new = stream_new(s
->endp
)) == NULL
)
143 return (stream_copy(new, s
));
146 struct stream
*stream_dupcat(struct stream
*s1
, struct stream
*s2
,
151 STREAM_VERIFY_SANE(s1
);
152 STREAM_VERIFY_SANE(s2
);
154 if ((new = stream_new(s1
->endp
+ s2
->endp
)) == NULL
)
157 memcpy(new->data
, s1
->data
, offset
);
158 memcpy(new->data
+ offset
, s2
->data
, s2
->endp
);
159 memcpy(new->data
+ offset
+ s2
->endp
, s1
->data
+ offset
,
160 (s1
->endp
- offset
));
161 new->endp
= s1
->endp
+ s2
->endp
;
165 size_t stream_resize(struct stream
*s
, size_t newsize
)
168 STREAM_VERIFY_SANE(s
);
170 newdata
= XREALLOC(MTYPE_STREAM_DATA
, s
->data
, newsize
);
178 if (s
->endp
> s
->size
)
180 if (s
->getp
> s
->endp
)
183 STREAM_VERIFY_SANE(s
);
188 size_t stream_get_getp(struct stream
*s
)
190 STREAM_VERIFY_SANE(s
);
194 size_t stream_get_endp(struct stream
*s
)
196 STREAM_VERIFY_SANE(s
);
200 size_t stream_get_size(struct stream
*s
)
202 STREAM_VERIFY_SANE(s
);
206 /* Stream structure's pointer (getp/endp) related functions. */
207 void stream_set_getp(struct stream
*s
, size_t pos
)
209 STREAM_VERIFY_SANE(s
);
211 if (!GETP_VALID(s
, pos
)) {
212 STREAM_BOUND_WARN(s
, "set getp");
219 void stream_set_endp(struct stream
*s
, size_t pos
)
221 STREAM_VERIFY_SANE(s
);
223 if (!ENDP_VALID(s
, pos
)) {
224 STREAM_BOUND_WARN(s
, "set endp");
229 * Make sure the current read pointer is not beyond the new endp.
232 STREAM_BOUND_WARN(s
, "set endp");
237 STREAM_VERIFY_SANE(s
);
240 /* Forward pointer. */
241 void stream_forward_getp(struct stream
*s
, size_t size
)
243 STREAM_VERIFY_SANE(s
);
245 if (!GETP_VALID(s
, s
->getp
+ size
)) {
246 STREAM_BOUND_WARN(s
, "seek getp");
253 void stream_forward_endp(struct stream
*s
, size_t size
)
255 STREAM_VERIFY_SANE(s
);
257 if (!ENDP_VALID(s
, s
->endp
+ size
)) {
258 STREAM_BOUND_WARN(s
, "seek endp");
265 /* Copy from stream to destination. */
266 void stream_get(void *dst
, struct stream
*s
, size_t size
)
268 STREAM_VERIFY_SANE(s
);
270 if (STREAM_READABLE(s
) < size
) {
271 STREAM_BOUND_WARN(s
, "get");
275 memcpy(dst
, s
->data
+ s
->getp
, size
);
279 /* Get next character from the stream. */
280 u_char
stream_getc(struct stream
*s
)
284 STREAM_VERIFY_SANE(s
);
286 if (STREAM_READABLE(s
) < sizeof(u_char
)) {
287 STREAM_BOUND_WARN(s
, "get char");
290 c
= s
->data
[s
->getp
++];
295 /* Get character at position 'from' in the stream. */
296 u_char
stream_getc_from(struct stream
*s
, size_t from
)
300 STREAM_VERIFY_SANE(s
);
302 if (!GETP_VALID(s
, from
+ sizeof(u_char
))) {
303 STREAM_BOUND_WARN(s
, "get char");
312 /* Get next word from the stream. */
313 u_int16_t
stream_getw(struct stream
*s
)
317 STREAM_VERIFY_SANE(s
);
319 if (STREAM_READABLE(s
) < sizeof(u_int16_t
)) {
320 STREAM_BOUND_WARN(s
, "get ");
324 w
= s
->data
[s
->getp
++] << 8;
325 w
|= s
->data
[s
->getp
++];
330 /* Get word at position 'from' in the stream. */
331 u_int16_t
stream_getw_from(struct stream
*s
, size_t from
)
335 STREAM_VERIFY_SANE(s
);
337 if (!GETP_VALID(s
, from
+ sizeof(u_int16_t
))) {
338 STREAM_BOUND_WARN(s
, "get ");
342 w
= s
->data
[from
++] << 8;
348 /* Get 3-byte value at position 'from' in the stream. */
349 u_int32_t
stream_get3_from(struct stream
*s
, size_t from
)
353 STREAM_VERIFY_SANE(s
);
355 if (!GETP_VALID(s
, from
+ 3)) {
356 STREAM_BOUND_WARN(s
, "get 3byte");
360 l
= s
->data
[from
++] << 16;
361 l
|= s
->data
[from
++] << 8;
367 u_int32_t
stream_get3(struct stream
*s
)
371 STREAM_VERIFY_SANE(s
);
373 if (STREAM_READABLE(s
) < 3) {
374 STREAM_BOUND_WARN(s
, "get 3byte");
378 l
= s
->data
[s
->getp
++] << 16;
379 l
|= s
->data
[s
->getp
++] << 8;
380 l
|= s
->data
[s
->getp
++];
385 /* Get long word at position 'from' in the stream. */
386 u_int32_t
stream_getl_from(struct stream
*s
, size_t from
)
390 STREAM_VERIFY_SANE(s
);
392 if (!GETP_VALID(s
, from
+ sizeof(u_int32_t
))) {
393 STREAM_BOUND_WARN(s
, "get long");
397 l
= s
->data
[from
++] << 24;
398 l
|= s
->data
[from
++] << 16;
399 l
|= s
->data
[from
++] << 8;
405 /* Copy from stream at specific location to destination. */
406 void stream_get_from(void *dst
, struct stream
*s
, size_t from
, size_t size
)
408 STREAM_VERIFY_SANE(s
);
410 if (!GETP_VALID(s
, from
+ size
)) {
411 STREAM_BOUND_WARN(s
, "get from");
415 memcpy(dst
, s
->data
+ from
, size
);
418 u_int32_t
stream_getl(struct stream
*s
)
422 STREAM_VERIFY_SANE(s
);
424 if (STREAM_READABLE(s
) < sizeof(u_int32_t
)) {
425 STREAM_BOUND_WARN(s
, "get long");
429 l
= s
->data
[s
->getp
++] << 24;
430 l
|= s
->data
[s
->getp
++] << 16;
431 l
|= s
->data
[s
->getp
++] << 8;
432 l
|= s
->data
[s
->getp
++];
437 /* Get quad word at position 'from' in the stream. */
438 uint64_t stream_getq_from(struct stream
*s
, size_t from
)
442 STREAM_VERIFY_SANE(s
);
444 if (!GETP_VALID(s
, from
+ sizeof(uint64_t))) {
445 STREAM_BOUND_WARN(s
, "get quad");
449 q
= ((uint64_t)s
->data
[from
++]) << 56;
450 q
|= ((uint64_t)s
->data
[from
++]) << 48;
451 q
|= ((uint64_t)s
->data
[from
++]) << 40;
452 q
|= ((uint64_t)s
->data
[from
++]) << 32;
453 q
|= ((uint64_t)s
->data
[from
++]) << 24;
454 q
|= ((uint64_t)s
->data
[from
++]) << 16;
455 q
|= ((uint64_t)s
->data
[from
++]) << 8;
456 q
|= ((uint64_t)s
->data
[from
++]);
461 uint64_t stream_getq(struct stream
*s
)
465 STREAM_VERIFY_SANE(s
);
467 if (STREAM_READABLE(s
) < sizeof(uint64_t)) {
468 STREAM_BOUND_WARN(s
, "get quad");
472 q
= ((uint64_t)s
->data
[s
->getp
++]) << 56;
473 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 48;
474 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 40;
475 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 32;
476 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 24;
477 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 16;
478 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 8;
479 q
|= ((uint64_t)s
->data
[s
->getp
++]);
484 /* Get next IPv4 address (4 bytes, copied as-is) from the stream. */
485 u_int32_t
stream_get_ipv4(struct stream
*s
)
489 STREAM_VERIFY_SANE(s
);
491 if (STREAM_READABLE(s
) < sizeof(u_int32_t
)) {
492 STREAM_BOUND_WARN(s
, "get ipv4");
496 memcpy(&l
, s
->data
+ s
->getp
, sizeof(u_int32_t
));
497 s
->getp
+= sizeof(u_int32_t
);
502 float stream_getf(struct stream
*s
)
508 u
.d
= stream_getl(s
);
512 double stream_getd(struct stream
*s
)
518 u
.d
= stream_getq(s
);
522 /* Copy from source to stream.
524 * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
525 * around. This should be fixed once the stream updates are working.
527 * stream_write() is saner
529 void stream_put(struct stream
*s
, const void *src
, size_t size
)
532 /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
535 STREAM_VERIFY_SANE(s
);
537 if (STREAM_WRITEABLE(s
) < size
) {
538 STREAM_BOUND_WARN(s
, "put");
543 memcpy(s
->data
+ s
->endp
, src
, size
);
545 memset(s
->data
+ s
->endp
, 0, size
);
550 /* Put character to the stream. */
551 int stream_putc(struct stream
*s
, u_char c
)
553 STREAM_VERIFY_SANE(s
);
555 if (STREAM_WRITEABLE(s
) < sizeof(u_char
)) {
556 STREAM_BOUND_WARN(s
, "put");
560 s
->data
[s
->endp
++] = c
;
561 return sizeof(u_char
);
564 /* Put word to the stream. */
565 int stream_putw(struct stream
*s
, u_int16_t w
)
567 STREAM_VERIFY_SANE(s
);
569 if (STREAM_WRITEABLE(s
) < sizeof(u_int16_t
)) {
570 STREAM_BOUND_WARN(s
, "put");
574 s
->data
[s
->endp
++] = (u_char
)(w
>> 8);
575 s
->data
[s
->endp
++] = (u_char
)w
;
580 /* Put 3-byte value to the stream. */
581 int stream_put3(struct stream
*s
, u_int32_t l
)
583 STREAM_VERIFY_SANE(s
);
585 if (STREAM_WRITEABLE(s
) < 3) {
586 STREAM_BOUND_WARN(s
, "put");
590 s
->data
[s
->endp
++] = (u_char
)(l
>> 16);
591 s
->data
[s
->endp
++] = (u_char
)(l
>> 8);
592 s
->data
[s
->endp
++] = (u_char
)l
;
597 /* Put long word to the stream. */
598 int stream_putl(struct stream
*s
, u_int32_t l
)
600 STREAM_VERIFY_SANE(s
);
602 if (STREAM_WRITEABLE(s
) < sizeof(u_int32_t
)) {
603 STREAM_BOUND_WARN(s
, "put");
607 s
->data
[s
->endp
++] = (u_char
)(l
>> 24);
608 s
->data
[s
->endp
++] = (u_char
)(l
>> 16);
609 s
->data
[s
->endp
++] = (u_char
)(l
>> 8);
610 s
->data
[s
->endp
++] = (u_char
)l
;
615 /* Put quad word to the stream. */
616 int stream_putq(struct stream
*s
, uint64_t q
)
618 STREAM_VERIFY_SANE(s
);
620 if (STREAM_WRITEABLE(s
) < sizeof(uint64_t)) {
621 STREAM_BOUND_WARN(s
, "put quad");
625 s
->data
[s
->endp
++] = (u_char
)(q
>> 56);
626 s
->data
[s
->endp
++] = (u_char
)(q
>> 48);
627 s
->data
[s
->endp
++] = (u_char
)(q
>> 40);
628 s
->data
[s
->endp
++] = (u_char
)(q
>> 32);
629 s
->data
[s
->endp
++] = (u_char
)(q
>> 24);
630 s
->data
[s
->endp
++] = (u_char
)(q
>> 16);
631 s
->data
[s
->endp
++] = (u_char
)(q
>> 8);
632 s
->data
[s
->endp
++] = (u_char
)q
;
637 int stream_putf(struct stream
*s
, float f
)
644 return stream_putl(s
, u
.o
);
647 int stream_putd(struct stream
*s
, double d
)
654 return stream_putq(s
, u
.o
);
657 int stream_putc_at(struct stream
*s
, size_t putp
, u_char c
)
659 STREAM_VERIFY_SANE(s
);
661 if (!PUT_AT_VALID(s
, putp
+ sizeof(u_char
))) {
662 STREAM_BOUND_WARN(s
, "put");
671 int stream_putw_at(struct stream
*s
, size_t putp
, u_int16_t w
)
673 STREAM_VERIFY_SANE(s
);
675 if (!PUT_AT_VALID(s
, putp
+ sizeof(u_int16_t
))) {
676 STREAM_BOUND_WARN(s
, "put");
680 s
->data
[putp
] = (u_char
)(w
>> 8);
681 s
->data
[putp
+ 1] = (u_char
)w
;
686 int stream_put3_at(struct stream
*s
, size_t putp
, u_int32_t l
)
688 STREAM_VERIFY_SANE(s
);
690 if (!PUT_AT_VALID(s
, putp
+ 3)) {
691 STREAM_BOUND_WARN(s
, "put");
694 s
->data
[putp
] = (u_char
)(l
>> 16);
695 s
->data
[putp
+ 1] = (u_char
)(l
>> 8);
696 s
->data
[putp
+ 2] = (u_char
)l
;
701 int stream_putl_at(struct stream
*s
, size_t putp
, u_int32_t l
)
703 STREAM_VERIFY_SANE(s
);
705 if (!PUT_AT_VALID(s
, putp
+ sizeof(u_int32_t
))) {
706 STREAM_BOUND_WARN(s
, "put");
709 s
->data
[putp
] = (u_char
)(l
>> 24);
710 s
->data
[putp
+ 1] = (u_char
)(l
>> 16);
711 s
->data
[putp
+ 2] = (u_char
)(l
>> 8);
712 s
->data
[putp
+ 3] = (u_char
)l
;
717 int stream_putq_at(struct stream
*s
, size_t putp
, uint64_t q
)
719 STREAM_VERIFY_SANE(s
);
721 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint64_t))) {
722 STREAM_BOUND_WARN(s
, "put");
725 s
->data
[putp
] = (u_char
)(q
>> 56);
726 s
->data
[putp
+ 1] = (u_char
)(q
>> 48);
727 s
->data
[putp
+ 2] = (u_char
)(q
>> 40);
728 s
->data
[putp
+ 3] = (u_char
)(q
>> 32);
729 s
->data
[putp
+ 4] = (u_char
)(q
>> 24);
730 s
->data
[putp
+ 5] = (u_char
)(q
>> 16);
731 s
->data
[putp
+ 6] = (u_char
)(q
>> 8);
732 s
->data
[putp
+ 7] = (u_char
)q
;
737 /* Put IPv4 address (4 bytes, copied as-is) to the stream. */
738 int stream_put_ipv4(struct stream
*s
, u_int32_t l
)
740 STREAM_VERIFY_SANE(s
);
742 if (STREAM_WRITEABLE(s
) < sizeof(u_int32_t
)) {
743 STREAM_BOUND_WARN(s
, "put");
746 memcpy(s
->data
+ s
->endp
, &l
, sizeof(u_int32_t
));
747 s
->endp
+= sizeof(u_int32_t
);
749 return sizeof(u_int32_t
);
752 /* Put in_addr to the stream. */
753 int stream_put_in_addr(struct stream
*s
, struct in_addr
*addr
)
755 STREAM_VERIFY_SANE(s
);
757 if (STREAM_WRITEABLE(s
) < sizeof(u_int32_t
)) {
758 STREAM_BOUND_WARN(s
, "put");
762 memcpy(s
->data
+ s
->endp
, addr
, sizeof(u_int32_t
));
763 s
->endp
+= sizeof(u_int32_t
);
765 return sizeof(u_int32_t
);
768 /* Put in_addr at location in the stream. */
769 int stream_put_in_addr_at(struct stream
*s
, size_t putp
, struct in_addr
*addr
)
771 STREAM_VERIFY_SANE(s
);
773 if (!PUT_AT_VALID(s
, putp
+ 4)) {
774 STREAM_BOUND_WARN(s
, "put");
778 memcpy(&s
->data
[putp
], addr
, 4);
782 /* Put in6_addr at location in the stream. */
783 int stream_put_in6_addr_at(struct stream
*s
, size_t putp
, struct in6_addr
*addr
)
785 STREAM_VERIFY_SANE(s
);
787 if (!PUT_AT_VALID(s
, putp
+ 16)) {
788 STREAM_BOUND_WARN(s
, "put");
792 memcpy(&s
->data
[putp
], addr
, 16);
796 /* Put prefix by nlri type format. */
797 int stream_put_prefix_addpath(struct stream
*s
, struct prefix
*p
,
798 int addpath_encode
, u_int32_t addpath_tx_id
)
801 size_t psize_with_addpath
;
803 STREAM_VERIFY_SANE(s
);
805 psize
= PSIZE(p
->prefixlen
);
808 psize_with_addpath
= psize
+ 4;
810 psize_with_addpath
= psize
;
812 if (STREAM_WRITEABLE(s
) < (psize_with_addpath
+ sizeof(u_char
))) {
813 STREAM_BOUND_WARN(s
, "put");
817 if (addpath_encode
) {
818 s
->data
[s
->endp
++] = (u_char
)(addpath_tx_id
>> 24);
819 s
->data
[s
->endp
++] = (u_char
)(addpath_tx_id
>> 16);
820 s
->data
[s
->endp
++] = (u_char
)(addpath_tx_id
>> 8);
821 s
->data
[s
->endp
++] = (u_char
)addpath_tx_id
;
824 s
->data
[s
->endp
++] = p
->prefixlen
;
825 memcpy(s
->data
+ s
->endp
, &p
->u
.prefix
, psize
);
831 int stream_put_prefix(struct stream
*s
, struct prefix
*p
)
833 return stream_put_prefix_addpath(s
, p
, 0, 0);
836 /* Put NLRI with label */
837 int stream_put_labeled_prefix(struct stream
*s
, struct prefix
*p
,
841 u_char
*label_pnt
= (u_char
*)label
;
843 STREAM_VERIFY_SANE(s
);
845 psize
= PSIZE(p
->prefixlen
);
847 if (STREAM_WRITEABLE(s
) < (psize
+ 3)) {
848 STREAM_BOUND_WARN(s
, "put");
852 stream_putc(s
, (p
->prefixlen
+ 24));
853 stream_putc(s
, label_pnt
[0]);
854 stream_putc(s
, label_pnt
[1]);
855 stream_putc(s
, label_pnt
[2]);
856 memcpy(s
->data
+ s
->endp
, &p
->u
.prefix
, psize
);
862 /* Read size from fd. */
863 int stream_read(struct stream
*s
, int fd
, size_t size
)
867 STREAM_VERIFY_SANE(s
);
869 if (STREAM_WRITEABLE(s
) < size
) {
870 STREAM_BOUND_WARN(s
, "put");
874 nbytes
= readn(fd
, s
->data
+ s
->endp
, size
);
882 ssize_t
stream_read_try(struct stream
*s
, int fd
, size_t size
)
886 STREAM_VERIFY_SANE(s
);
888 if (STREAM_WRITEABLE(s
) < size
) {
889 STREAM_BOUND_WARN(s
, "put");
890 /* Fatal (not transient) error, since retrying will not help
891 (stream is too small to contain the desired data). */
895 if ((nbytes
= read(fd
, s
->data
+ s
->endp
, size
)) >= 0) {
899 /* Error: was it transient (return -2) or fatal (return -1)? */
900 if (ERRNO_IO_RETRY(errno
))
902 zlog_warn("%s: read failed on fd %d: %s", __func__
, fd
,
903 safe_strerror(errno
));
907 /* Read up to size bytes into the stream from the fd, using recvfrom
908 * whose arguments match the remaining arguments to this function
910 ssize_t
stream_recvfrom(struct stream
*s
, int fd
, size_t size
, int flags
,
911 struct sockaddr
*from
, socklen_t
*fromlen
)
915 STREAM_VERIFY_SANE(s
);
917 if (STREAM_WRITEABLE(s
) < size
) {
918 STREAM_BOUND_WARN(s
, "put");
919 /* Fatal (not transient) error, since retrying will not help
920 (stream is too small to contain the desired data). */
924 if ((nbytes
= recvfrom(fd
, s
->data
+ s
->endp
, size
, flags
, from
,
930 /* Error: was it transient (return -2) or fatal (return -1)? */
931 if (ERRNO_IO_RETRY(errno
))
933 zlog_warn("%s: read failed on fd %d: %s", __func__
, fd
,
934 safe_strerror(errno
));
938 /* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
940 * First iovec will be used to receive the data.
941 * Stream need not be empty.
943 ssize_t
stream_recvmsg(struct stream
*s
, int fd
, struct msghdr
*msgh
, int flags
,
949 STREAM_VERIFY_SANE(s
);
950 assert(msgh
->msg_iovlen
> 0);
952 if (STREAM_WRITEABLE(s
) < size
) {
953 STREAM_BOUND_WARN(s
, "put");
954 /* This is a logic error in the calling code: the stream is too
956 to hold the desired data! */
960 iov
= &(msgh
->msg_iov
[0]);
961 iov
->iov_base
= (s
->data
+ s
->endp
);
964 nbytes
= recvmsg(fd
, msgh
, flags
);
972 /* Write data to buffer. */
973 size_t stream_write(struct stream
*s
, const void *ptr
, size_t size
)
978 STREAM_VERIFY_SANE(s
);
980 if (STREAM_WRITEABLE(s
) < size
) {
981 STREAM_BOUND_WARN(s
, "put");
985 memcpy(s
->data
+ s
->endp
, ptr
, size
);
991 /* Return current read pointer.
993 * Use stream_get_pnt_to if you must, but decoding streams properly
996 u_char
*stream_pnt(struct stream
*s
)
998 STREAM_VERIFY_SANE(s
);
999 return s
->data
+ s
->getp
;
1002 /* Check whether this stream is empty. */
1003 int stream_empty(struct stream
*s
)
1005 STREAM_VERIFY_SANE(s
);
1007 return (s
->endp
== 0);
1011 void stream_reset(struct stream
*s
)
1013 STREAM_VERIFY_SANE(s
);
1015 s
->getp
= s
->endp
= 0;
1018 /* Write stream contents to the file descriptor. */
1019 int stream_flush(struct stream
*s
, int fd
)
1023 STREAM_VERIFY_SANE(s
);
1025 nbytes
= write(fd
, s
->data
+ s
->getp
, s
->endp
- s
->getp
);
1030 /* Stream first in first out queue. */
1032 struct stream_fifo
*stream_fifo_new(void)
1034 struct stream_fifo
*new;
1036 new = XCALLOC(MTYPE_STREAM_FIFO
, sizeof(struct stream_fifo
));
1040 /* Add new stream to fifo. */
1041 void stream_fifo_push(struct stream_fifo
*fifo
, struct stream
*s
)
1044 fifo
->tail
->next
= s
;
1053 /* Delete first stream from fifo. */
1054 struct stream
*stream_fifo_pop(struct stream_fifo
*fifo
)
1061 fifo
->head
= s
->next
;
1063 if (fifo
->head
== NULL
)
1072 /* Return first fifo entry. */
1073 struct stream
*stream_fifo_head(struct stream_fifo
*fifo
)
1078 void stream_fifo_clean(struct stream_fifo
*fifo
)
1081 struct stream
*next
;
1083 for (s
= fifo
->head
; s
; s
= next
) {
1087 fifo
->head
= fifo
->tail
= NULL
;
1091 void stream_fifo_free(struct stream_fifo
*fifo
)
1093 stream_fifo_clean(fifo
);
1094 XFREE(MTYPE_STREAM_FIFO
, fifo
);