]>
git.proxmox.com Git - mirror_frr.git/blob - lib/stream.c
3 * Copyright (C) 1999 Kunihiro Ishiguro
5 * This file is part of GNU Zebra.
7 * GNU Zebra is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2, or (at your option) any
12 * GNU Zebra is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
32 DEFINE_MTYPE_STATIC(LIB
, STREAM
, "Stream")
33 DEFINE_MTYPE_STATIC(LIB
, STREAM_DATA
, "Stream data")
34 DEFINE_MTYPE_STATIC(LIB
, STREAM_FIFO
, "Stream FIFO")
36 /* Tests whether a position is valid */
37 #define GETP_VALID(S, G) ((G) <= (S)->endp)
38 #define PUT_AT_VALID(S,G) GETP_VALID(S,G)
39 #define ENDP_VALID(S, E) ((E) <= (S)->size)
41 /* asserting sanity checks. Following must be true before
42 * stream functions are called:
44 * Following must always be true of stream elements
45 * before and after calls to stream functions:
47 * getp <= endp <= size
49 * Note that after a stream function is called following may be true:
50 * if (getp == endp) then stream is no longer readable
51 * if (endp == size) then stream is no longer writeable
53 * It is valid to put to anywhere within the size of the stream, but only
54 * using stream_put..._at() functions.
56 #define STREAM_WARN_OFFSETS(S) \
57 zlog_warn("&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n", \
58 (void *)(S), (unsigned long)(S)->size, \
59 (unsigned long)(S)->getp, (unsigned long)(S)->endp)
61 #define STREAM_VERIFY_SANE(S) \
63 if (!(GETP_VALID(S, (S)->getp) && ENDP_VALID(S, (S)->endp))) \
64 STREAM_WARN_OFFSETS(S); \
65 assert(GETP_VALID(S, (S)->getp)); \
66 assert(ENDP_VALID(S, (S)->endp)); \
69 #define STREAM_BOUND_WARN(S, WHAT) \
71 zlog_warn("%s: Attempt to %s out of bounds", __func__, \
73 STREAM_WARN_OFFSETS(S); \
77 #define STREAM_BOUND_WARN2(S, WHAT) \
79 zlog_warn("%s: Attempt to %s out of bounds", __func__, \
81 STREAM_WARN_OFFSETS(S); \
84 /* XXX: Deprecated macro: do not use */
85 #define CHECK_SIZE(S, Z) \
87 if (((S)->endp + (Z)) > (S)->size) { \
89 "CHECK_SIZE: truncating requested size %lu\n", \
90 (unsigned long)(Z)); \
91 STREAM_WARN_OFFSETS(S); \
92 (Z) = (S)->size - (S)->endp; \
96 /* Make stream buffer. */
97 struct stream
*stream_new(size_t size
)
103 s
= XMALLOC(MTYPE_STREAM
, sizeof(struct stream
));
105 s
->data
= XMALLOC(MTYPE_STREAM_DATA
, size
);
107 s
->getp
= s
->endp
= 0;
114 void stream_free(struct stream
*s
)
119 XFREE(MTYPE_STREAM_DATA
, s
->data
);
120 XFREE(MTYPE_STREAM
, s
);
123 struct stream
*stream_copy(struct stream
*new, struct stream
*src
)
125 STREAM_VERIFY_SANE(src
);
128 assert(STREAM_SIZE(new) >= src
->endp
);
130 new->endp
= src
->endp
;
131 new->getp
= src
->getp
;
133 memcpy(new->data
, src
->data
, src
->endp
);
138 struct stream
*stream_dup(struct stream
*s
)
142 STREAM_VERIFY_SANE(s
);
144 if ((new = stream_new(s
->endp
)) == NULL
)
147 return (stream_copy(new, s
));
150 struct stream
*stream_dupcat(struct stream
*s1
, struct stream
*s2
,
155 STREAM_VERIFY_SANE(s1
);
156 STREAM_VERIFY_SANE(s2
);
158 if ((new = stream_new(s1
->endp
+ s2
->endp
)) == NULL
)
161 memcpy(new->data
, s1
->data
, offset
);
162 memcpy(new->data
+ offset
, s2
->data
, s2
->endp
);
163 memcpy(new->data
+ offset
+ s2
->endp
, s1
->data
+ offset
,
164 (s1
->endp
- offset
));
165 new->endp
= s1
->endp
+ s2
->endp
;
169 size_t stream_resize(struct stream
*s
, size_t newsize
)
172 STREAM_VERIFY_SANE(s
);
174 newdata
= XREALLOC(MTYPE_STREAM_DATA
, s
->data
, newsize
);
182 if (s
->endp
> s
->size
)
184 if (s
->getp
> s
->endp
)
187 STREAM_VERIFY_SANE(s
);
192 size_t stream_get_getp(struct stream
*s
)
194 STREAM_VERIFY_SANE(s
);
198 size_t stream_get_endp(struct stream
*s
)
200 STREAM_VERIFY_SANE(s
);
204 size_t stream_get_size(struct stream
*s
)
206 STREAM_VERIFY_SANE(s
);
210 /* Stream structure's pointer-related functions. */
211 void stream_set_getp(struct stream
*s
, size_t pos
)
213 STREAM_VERIFY_SANE(s
);
215 if (!GETP_VALID(s
, pos
)) {
216 STREAM_BOUND_WARN(s
, "set getp");
223 void stream_set_endp(struct stream
*s
, size_t pos
)
225 STREAM_VERIFY_SANE(s
);
227 if (!ENDP_VALID(s
, pos
)) {
228 STREAM_BOUND_WARN(s
, "set endp");
233 * Make sure the current read pointer is not beyond the new endp.
236 STREAM_BOUND_WARN(s
, "set endp");
241 STREAM_VERIFY_SANE(s
);
244 /* Forward pointer. */
245 void stream_forward_getp(struct stream
*s
, size_t size
)
247 STREAM_VERIFY_SANE(s
);
249 if (!GETP_VALID(s
, s
->getp
+ size
)) {
250 STREAM_BOUND_WARN(s
, "seek getp");
257 void stream_forward_endp(struct stream
*s
, size_t size
)
259 STREAM_VERIFY_SANE(s
);
261 if (!ENDP_VALID(s
, s
->endp
+ size
)) {
262 STREAM_BOUND_WARN(s
, "seek endp");
269 /* Copy from stream to destination. */
270 inline bool stream_get2(void *dst
, struct stream
*s
, size_t size
)
272 STREAM_VERIFY_SANE(s
);
274 if (STREAM_READABLE(s
) < size
) {
275 STREAM_BOUND_WARN2(s
, "get");
279 memcpy(dst
, s
->data
+ s
->getp
, size
);
285 void stream_get(void *dst
, struct stream
*s
, size_t size
)
287 STREAM_VERIFY_SANE(s
);
289 if (STREAM_READABLE(s
) < size
) {
290 STREAM_BOUND_WARN(s
, "get");
294 memcpy(dst
, s
->data
+ s
->getp
, size
);
298 /* Get next character from the stream. */
299 inline bool stream_getc2(struct stream
*s
, uint8_t *byte
)
301 STREAM_VERIFY_SANE(s
);
303 if (STREAM_READABLE(s
) < sizeof(uint8_t)) {
304 STREAM_BOUND_WARN2(s
, "get char");
307 *byte
= s
->data
[s
->getp
++];
312 uint8_t stream_getc(struct stream
*s
)
316 STREAM_VERIFY_SANE(s
);
318 if (STREAM_READABLE(s
) < sizeof(uint8_t)) {
319 STREAM_BOUND_WARN(s
, "get char");
322 c
= s
->data
[s
->getp
++];
327 /* Get character from the stream at offset 'from'. */
328 uint8_t stream_getc_from(struct stream
*s
, size_t from
)
332 STREAM_VERIFY_SANE(s
);
334 if (!GETP_VALID(s
, from
+ sizeof(uint8_t))) {
335 STREAM_BOUND_WARN(s
, "get char");
344 inline bool stream_getw2(struct stream
*s
, uint16_t *word
)
346 STREAM_VERIFY_SANE(s
);
348 if (STREAM_READABLE(s
) < sizeof(uint16_t)) {
349 STREAM_BOUND_WARN2(s
, "get ");
353 *word
= s
->data
[s
->getp
++] << 8;
354 *word
|= s
->data
[s
->getp
++];
359 /* Get next word from the stream. */
360 uint16_t stream_getw(struct stream
*s
)
364 STREAM_VERIFY_SANE(s
);
366 if (STREAM_READABLE(s
) < sizeof(uint16_t)) {
367 STREAM_BOUND_WARN(s
, "get ");
371 w
= s
->data
[s
->getp
++] << 8;
372 w
|= s
->data
[s
->getp
++];
377 /* Get word from the stream at offset 'from'. */
378 uint16_t stream_getw_from(struct stream
*s
, size_t from
)
382 STREAM_VERIFY_SANE(s
);
384 if (!GETP_VALID(s
, from
+ sizeof(uint16_t))) {
385 STREAM_BOUND_WARN(s
, "get ");
389 w
= s
->data
[from
++] << 8;
395 /* Get 3-byte value from the stream at offset 'from'. */
396 uint32_t stream_get3_from(struct stream
*s
, size_t from
)
400 STREAM_VERIFY_SANE(s
);
402 if (!GETP_VALID(s
, from
+ 3)) {
403 STREAM_BOUND_WARN(s
, "get 3byte");
407 l
= s
->data
[from
++] << 16;
408 l
|= s
->data
[from
++] << 8;
414 uint32_t stream_get3(struct stream
*s
)
418 STREAM_VERIFY_SANE(s
);
420 if (STREAM_READABLE(s
) < 3) {
421 STREAM_BOUND_WARN(s
, "get 3byte");
425 l
= s
->data
[s
->getp
++] << 16;
426 l
|= s
->data
[s
->getp
++] << 8;
427 l
|= s
->data
[s
->getp
++];
432 /* Get long word from the stream at offset 'from'. */
433 uint32_t stream_getl_from(struct stream
*s
, size_t from
)
437 STREAM_VERIFY_SANE(s
);
439 if (!GETP_VALID(s
, from
+ sizeof(uint32_t))) {
440 STREAM_BOUND_WARN(s
, "get long");
444 l
= (unsigned)(s
->data
[from
++]) << 24;
445 l
|= s
->data
[from
++] << 16;
446 l
|= s
->data
[from
++] << 8;
452 /* Copy from stream at specific location to destination. */
453 void stream_get_from(void *dst
, struct stream
*s
, size_t from
, size_t size
)
455 STREAM_VERIFY_SANE(s
);
457 if (!GETP_VALID(s
, from
+ size
)) {
458 STREAM_BOUND_WARN(s
, "get from");
462 memcpy(dst
, s
->data
+ from
, size
);
465 inline bool stream_getl2(struct stream
*s
, uint32_t *l
)
467 STREAM_VERIFY_SANE(s
);
469 if (STREAM_READABLE(s
) < sizeof(uint32_t)) {
470 STREAM_BOUND_WARN2(s
, "get long");
474 *l
= (unsigned int)(s
->data
[s
->getp
++]) << 24;
475 *l
|= s
->data
[s
->getp
++] << 16;
476 *l
|= s
->data
[s
->getp
++] << 8;
477 *l
|= s
->data
[s
->getp
++];
482 uint32_t stream_getl(struct stream
*s
)
486 STREAM_VERIFY_SANE(s
);
488 if (STREAM_READABLE(s
) < sizeof(uint32_t)) {
489 STREAM_BOUND_WARN(s
, "get long");
493 l
= (unsigned)(s
->data
[s
->getp
++]) << 24;
494 l
|= s
->data
[s
->getp
++] << 16;
495 l
|= s
->data
[s
->getp
++] << 8;
496 l
|= s
->data
[s
->getp
++];
501 /* Get quad word from the stream at offset 'from'. */
502 uint64_t stream_getq_from(struct stream
*s
, size_t from
)
506 STREAM_VERIFY_SANE(s
);
508 if (!GETP_VALID(s
, from
+ sizeof(uint64_t))) {
509 STREAM_BOUND_WARN(s
, "get quad");
513 q
= ((uint64_t)s
->data
[from
++]) << 56;
514 q
|= ((uint64_t)s
->data
[from
++]) << 48;
515 q
|= ((uint64_t)s
->data
[from
++]) << 40;
516 q
|= ((uint64_t)s
->data
[from
++]) << 32;
517 q
|= ((uint64_t)s
->data
[from
++]) << 24;
518 q
|= ((uint64_t)s
->data
[from
++]) << 16;
519 q
|= ((uint64_t)s
->data
[from
++]) << 8;
520 q
|= ((uint64_t)s
->data
[from
++]);
525 uint64_t stream_getq(struct stream
*s
)
529 STREAM_VERIFY_SANE(s
);
531 if (STREAM_READABLE(s
) < sizeof(uint64_t)) {
532 STREAM_BOUND_WARN(s
, "get quad");
536 q
= ((uint64_t)s
->data
[s
->getp
++]) << 56;
537 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 48;
538 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 40;
539 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 32;
540 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 24;
541 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 16;
542 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 8;
543 q
|= ((uint64_t)s
->data
[s
->getp
++]);
548 /* Get next IPv4 address (4 bytes, copied without byte-order conversion) from the stream. */
549 uint32_t stream_get_ipv4(struct stream
*s
)
553 STREAM_VERIFY_SANE(s
);
555 if (STREAM_READABLE(s
) < sizeof(uint32_t)) {
556 STREAM_BOUND_WARN(s
, "get ipv4");
560 memcpy(&l
, s
->data
+ s
->getp
, sizeof(uint32_t));
561 s
->getp
+= sizeof(uint32_t);
566 float stream_getf(struct stream
*s
)
572 u
.d
= stream_getl(s
);
576 double stream_getd(struct stream
*s
)
582 u
.d
= stream_getq(s
);
586 /* Copy from source to stream.
588 * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
589 * around. This should be fixed once the stream updates are working.
591 * stream_write() is saner
593 void stream_put(struct stream
*s
, const void *src
, size_t size
)
596 /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
599 STREAM_VERIFY_SANE(s
);
601 if (STREAM_WRITEABLE(s
) < size
) {
602 STREAM_BOUND_WARN(s
, "put");
607 memcpy(s
->data
+ s
->endp
, src
, size
);
609 memset(s
->data
+ s
->endp
, 0, size
);
614 /* Put character to the stream. */
615 int stream_putc(struct stream
*s
, uint8_t c
)
617 STREAM_VERIFY_SANE(s
);
619 if (STREAM_WRITEABLE(s
) < sizeof(uint8_t)) {
620 STREAM_BOUND_WARN(s
, "put");
624 s
->data
[s
->endp
++] = c
;
625 return sizeof(uint8_t);
628 /* Put word to the stream. */
629 int stream_putw(struct stream
*s
, uint16_t w
)
631 STREAM_VERIFY_SANE(s
);
633 if (STREAM_WRITEABLE(s
) < sizeof(uint16_t)) {
634 STREAM_BOUND_WARN(s
, "put");
638 s
->data
[s
->endp
++] = (uint8_t)(w
>> 8);
639 s
->data
[s
->endp
++] = (uint8_t)w
;
644 /* Put 3-byte value to the stream. */
645 int stream_put3(struct stream
*s
, uint32_t l
)
647 STREAM_VERIFY_SANE(s
);
649 if (STREAM_WRITEABLE(s
) < 3) {
650 STREAM_BOUND_WARN(s
, "put");
654 s
->data
[s
->endp
++] = (uint8_t)(l
>> 16);
655 s
->data
[s
->endp
++] = (uint8_t)(l
>> 8);
656 s
->data
[s
->endp
++] = (uint8_t)l
;
661 /* Put long word to the stream. */
662 int stream_putl(struct stream
*s
, uint32_t l
)
664 STREAM_VERIFY_SANE(s
);
666 if (STREAM_WRITEABLE(s
) < sizeof(uint32_t)) {
667 STREAM_BOUND_WARN(s
, "put");
671 s
->data
[s
->endp
++] = (uint8_t)(l
>> 24);
672 s
->data
[s
->endp
++] = (uint8_t)(l
>> 16);
673 s
->data
[s
->endp
++] = (uint8_t)(l
>> 8);
674 s
->data
[s
->endp
++] = (uint8_t)l
;
679 /* Put quad word to the stream. */
680 int stream_putq(struct stream
*s
, uint64_t q
)
682 STREAM_VERIFY_SANE(s
);
684 if (STREAM_WRITEABLE(s
) < sizeof(uint64_t)) {
685 STREAM_BOUND_WARN(s
, "put quad");
689 s
->data
[s
->endp
++] = (uint8_t)(q
>> 56);
690 s
->data
[s
->endp
++] = (uint8_t)(q
>> 48);
691 s
->data
[s
->endp
++] = (uint8_t)(q
>> 40);
692 s
->data
[s
->endp
++] = (uint8_t)(q
>> 32);
693 s
->data
[s
->endp
++] = (uint8_t)(q
>> 24);
694 s
->data
[s
->endp
++] = (uint8_t)(q
>> 16);
695 s
->data
[s
->endp
++] = (uint8_t)(q
>> 8);
696 s
->data
[s
->endp
++] = (uint8_t)q
;
701 int stream_putf(struct stream
*s
, float f
)
708 return stream_putl(s
, u
.o
);
711 int stream_putd(struct stream
*s
, double d
)
718 return stream_putq(s
, u
.o
);
721 int stream_putc_at(struct stream
*s
, size_t putp
, uint8_t c
)
723 STREAM_VERIFY_SANE(s
);
725 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint8_t))) {
726 STREAM_BOUND_WARN(s
, "put");
735 int stream_putw_at(struct stream
*s
, size_t putp
, uint16_t w
)
737 STREAM_VERIFY_SANE(s
);
739 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint16_t))) {
740 STREAM_BOUND_WARN(s
, "put");
744 s
->data
[putp
] = (uint8_t)(w
>> 8);
745 s
->data
[putp
+ 1] = (uint8_t)w
;
750 int stream_put3_at(struct stream
*s
, size_t putp
, uint32_t l
)
752 STREAM_VERIFY_SANE(s
);
754 if (!PUT_AT_VALID(s
, putp
+ 3)) {
755 STREAM_BOUND_WARN(s
, "put");
758 s
->data
[putp
] = (uint8_t)(l
>> 16);
759 s
->data
[putp
+ 1] = (uint8_t)(l
>> 8);
760 s
->data
[putp
+ 2] = (uint8_t)l
;
765 int stream_putl_at(struct stream
*s
, size_t putp
, uint32_t l
)
767 STREAM_VERIFY_SANE(s
);
769 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint32_t))) {
770 STREAM_BOUND_WARN(s
, "put");
773 s
->data
[putp
] = (uint8_t)(l
>> 24);
774 s
->data
[putp
+ 1] = (uint8_t)(l
>> 16);
775 s
->data
[putp
+ 2] = (uint8_t)(l
>> 8);
776 s
->data
[putp
+ 3] = (uint8_t)l
;
781 int stream_putq_at(struct stream
*s
, size_t putp
, uint64_t q
)
783 STREAM_VERIFY_SANE(s
);
785 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint64_t))) {
786 STREAM_BOUND_WARN(s
, "put");
789 s
->data
[putp
] = (uint8_t)(q
>> 56);
790 s
->data
[putp
+ 1] = (uint8_t)(q
>> 48);
791 s
->data
[putp
+ 2] = (uint8_t)(q
>> 40);
792 s
->data
[putp
+ 3] = (uint8_t)(q
>> 32);
793 s
->data
[putp
+ 4] = (uint8_t)(q
>> 24);
794 s
->data
[putp
+ 5] = (uint8_t)(q
>> 16);
795 s
->data
[putp
+ 6] = (uint8_t)(q
>> 8);
796 s
->data
[putp
+ 7] = (uint8_t)q
;
801 /* Put IPv4 address (4 bytes, copied without byte-order conversion) to the stream. */
802 int stream_put_ipv4(struct stream
*s
, uint32_t l
)
804 STREAM_VERIFY_SANE(s
);
806 if (STREAM_WRITEABLE(s
) < sizeof(uint32_t)) {
807 STREAM_BOUND_WARN(s
, "put");
810 memcpy(s
->data
+ s
->endp
, &l
, sizeof(uint32_t));
811 s
->endp
+= sizeof(uint32_t);
813 return sizeof(uint32_t);
816 /* Put in_addr (4 bytes) to the stream. */
817 int stream_put_in_addr(struct stream
*s
, struct in_addr
*addr
)
819 STREAM_VERIFY_SANE(s
);
821 if (STREAM_WRITEABLE(s
) < sizeof(uint32_t)) {
822 STREAM_BOUND_WARN(s
, "put");
826 memcpy(s
->data
+ s
->endp
, addr
, sizeof(uint32_t));
827 s
->endp
+= sizeof(uint32_t);
829 return sizeof(uint32_t);
832 /* Put in_addr at location in the stream. */
833 int stream_put_in_addr_at(struct stream
*s
, size_t putp
, struct in_addr
*addr
)
835 STREAM_VERIFY_SANE(s
);
837 if (!PUT_AT_VALID(s
, putp
+ 4)) {
838 STREAM_BOUND_WARN(s
, "put");
842 memcpy(&s
->data
[putp
], addr
, 4);
846 /* Put in6_addr at location in the stream. */
847 int stream_put_in6_addr_at(struct stream
*s
, size_t putp
, struct in6_addr
*addr
)
849 STREAM_VERIFY_SANE(s
);
851 if (!PUT_AT_VALID(s
, putp
+ 16)) {
852 STREAM_BOUND_WARN(s
, "put");
856 memcpy(&s
->data
[putp
], addr
, 16);
860 /* Put prefix by nlri type format. */
861 int stream_put_prefix_addpath(struct stream
*s
, struct prefix
*p
,
862 int addpath_encode
, uint32_t addpath_tx_id
)
865 size_t psize_with_addpath
;
867 STREAM_VERIFY_SANE(s
);
869 psize
= PSIZE(p
->prefixlen
);
872 psize_with_addpath
= psize
+ 4;
874 psize_with_addpath
= psize
;
876 if (STREAM_WRITEABLE(s
) < (psize_with_addpath
+ sizeof(uint8_t))) {
877 STREAM_BOUND_WARN(s
, "put");
881 if (addpath_encode
) {
882 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 24);
883 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 16);
884 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 8);
885 s
->data
[s
->endp
++] = (uint8_t)addpath_tx_id
;
888 s
->data
[s
->endp
++] = p
->prefixlen
;
889 memcpy(s
->data
+ s
->endp
, &p
->u
.prefix
, psize
);
895 int stream_put_prefix(struct stream
*s
, struct prefix
*p
)
897 return stream_put_prefix_addpath(s
, p
, 0, 0);
900 /* Put NLRI with label */
901 int stream_put_labeled_prefix(struct stream
*s
, struct prefix
*p
,
905 uint8_t *label_pnt
= (uint8_t *)label
;
907 STREAM_VERIFY_SANE(s
);
909 psize
= PSIZE(p
->prefixlen
);
911 if (STREAM_WRITEABLE(s
) < (psize
+ 3)) {
912 STREAM_BOUND_WARN(s
, "put");
916 stream_putc(s
, (p
->prefixlen
+ 24));
917 stream_putc(s
, label_pnt
[0]);
918 stream_putc(s
, label_pnt
[1]);
919 stream_putc(s
, label_pnt
[2]);
920 memcpy(s
->data
+ s
->endp
, &p
->u
.prefix
, psize
);
926 /* Read size bytes from fd into the stream. */
927 int stream_read(struct stream
*s
, int fd
, size_t size
)
931 STREAM_VERIFY_SANE(s
);
933 if (STREAM_WRITEABLE(s
) < size
) {
934 STREAM_BOUND_WARN(s
, "put");
938 nbytes
= readn(fd
, s
->data
+ s
->endp
, size
);
946 ssize_t
stream_read_try(struct stream
*s
, int fd
, size_t size
)
950 STREAM_VERIFY_SANE(s
);
952 if (STREAM_WRITEABLE(s
) < size
) {
953 STREAM_BOUND_WARN(s
, "put");
954 /* Fatal (not transient) error, since retrying will not help
955 (stream is too small to contain the desired data). */
959 if ((nbytes
= read(fd
, s
->data
+ s
->endp
, size
)) >= 0) {
963 /* Error: was it transient (return -2) or fatal (return -1)? */
964 if (ERRNO_IO_RETRY(errno
))
966 zlog_warn("%s: read failed on fd %d: %s", __func__
, fd
,
967 safe_strerror(errno
));
971 /* Read up to size bytes into the stream from the fd, using recvfrom
972 * whose arguments match the remaining arguments to this function
974 ssize_t
stream_recvfrom(struct stream
*s
, int fd
, size_t size
, int flags
,
975 struct sockaddr
*from
, socklen_t
*fromlen
)
979 STREAM_VERIFY_SANE(s
);
981 if (STREAM_WRITEABLE(s
) < size
) {
982 STREAM_BOUND_WARN(s
, "put");
983 /* Fatal (not transient) error, since retrying will not help
984 (stream is too small to contain the desired data). */
988 if ((nbytes
= recvfrom(fd
, s
->data
+ s
->endp
, size
, flags
, from
,
994 /* Error: was it transient (return -2) or fatal (return -1)? */
995 if (ERRNO_IO_RETRY(errno
))
997 zlog_warn("%s: read failed on fd %d: %s", __func__
, fd
,
998 safe_strerror(errno
));
1002 /* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
1004 * First iovec will be used to receive the data.
1005 * Stream need not be empty.
1007 ssize_t
stream_recvmsg(struct stream
*s
, int fd
, struct msghdr
*msgh
, int flags
,
1013 STREAM_VERIFY_SANE(s
);
1014 assert(msgh
->msg_iovlen
> 0);
1016 if (STREAM_WRITEABLE(s
) < size
) {
1017 STREAM_BOUND_WARN(s
, "put");
1018 /* This is a logic error in the calling code: the stream is too small
1020 to hold the desired data! */
1024 iov
= &(msgh
->msg_iov
[0]);
1025 iov
->iov_base
= (s
->data
+ s
->endp
);
1026 iov
->iov_len
= size
;
1028 nbytes
= recvmsg(fd
, msgh
, flags
);
1036 /* Write data to buffer. */
1037 size_t stream_write(struct stream
*s
, const void *ptr
, size_t size
)
1040 CHECK_SIZE(s
, size
);
1042 STREAM_VERIFY_SANE(s
);
1044 if (STREAM_WRITEABLE(s
) < size
) {
1045 STREAM_BOUND_WARN(s
, "put");
1049 memcpy(s
->data
+ s
->endp
, ptr
, size
);
1055 /* Return current read pointer.
1057 * Use stream_get_pnt_to if you must, but decoding streams properly
1060 uint8_t *stream_pnt(struct stream
*s
)
1062 STREAM_VERIFY_SANE(s
);
1063 return s
->data
+ s
->getp
;
1066 /* Check whether this stream is empty. */
1067 int stream_empty(struct stream
*s
)
1069 STREAM_VERIFY_SANE(s
);
1071 return (s
->endp
== 0);
1075 void stream_reset(struct stream
*s
)
1077 STREAM_VERIFY_SANE(s
);
1079 s
->getp
= s
->endp
= 0;
1082 /* Write stream contents to the file descriptor. */
1083 int stream_flush(struct stream
*s
, int fd
)
1087 STREAM_VERIFY_SANE(s
);
1089 nbytes
= write(fd
, s
->data
+ s
->getp
, s
->endp
- s
->getp
);
1094 /* Stream first in first out queue. */
1096 struct stream_fifo
*stream_fifo_new(void)
1098 struct stream_fifo
*new;
1100 new = XCALLOC(MTYPE_STREAM_FIFO
, sizeof(struct stream_fifo
));
1101 pthread_mutex_init(&new->mtx
, NULL
);
1105 /* Add new stream to fifo. */
1106 void stream_fifo_push(struct stream_fifo
*fifo
, struct stream
*s
)
1108 #if defined DEV_BUILD
1113 fifo
->tail
->next
= s
;
1118 fifo
->tail
->next
= NULL
;
1119 #if !defined DEV_BUILD
1120 atomic_fetch_add_explicit(&fifo
->count
, 1, memory_order_release
);
1122 max
= atomic_fetch_add_explicit(&fifo
->count
, 1, memory_order_release
);
1123 curmax
= atomic_load_explicit(&fifo
->max_count
, memory_order_relaxed
);
1125 atomic_store_explicit(&fifo
->max_count
, max
,
1126 memory_order_relaxed
);
1130 void stream_fifo_push_safe(struct stream_fifo
*fifo
, struct stream
*s
)
1132 pthread_mutex_lock(&fifo
->mtx
);
1134 stream_fifo_push(fifo
, s
);
1136 pthread_mutex_unlock(&fifo
->mtx
);
1139 /* Delete first stream from fifo. */
1140 struct stream
*stream_fifo_pop(struct stream_fifo
*fifo
)
1147 fifo
->head
= s
->next
;
1149 if (fifo
->head
== NULL
)
1152 atomic_fetch_sub_explicit(&fifo
->count
, 1,
1153 memory_order_release
);
1155 /* ensure stream is scrubbed of references to this fifo */
1162 struct stream
*stream_fifo_pop_safe(struct stream_fifo
*fifo
)
1166 pthread_mutex_lock(&fifo
->mtx
);
1168 ret
= stream_fifo_pop(fifo
);
1170 pthread_mutex_unlock(&fifo
->mtx
);
1175 struct stream
*stream_fifo_head(struct stream_fifo
*fifo
)
1180 struct stream
*stream_fifo_head_safe(struct stream_fifo
*fifo
)
1184 pthread_mutex_lock(&fifo
->mtx
);
1186 ret
= stream_fifo_head(fifo
);
1188 pthread_mutex_unlock(&fifo
->mtx
);
1193 void stream_fifo_clean(struct stream_fifo
*fifo
)
1196 struct stream
*next
;
1198 for (s
= fifo
->head
; s
; s
= next
) {
1202 fifo
->head
= fifo
->tail
= NULL
;
1203 atomic_store_explicit(&fifo
->count
, 0, memory_order_release
);
1206 void stream_fifo_clean_safe(struct stream_fifo
*fifo
)
1208 pthread_mutex_lock(&fifo
->mtx
);
1210 stream_fifo_clean(fifo
);
1212 pthread_mutex_unlock(&fifo
->mtx
);
1215 size_t stream_fifo_count_safe(struct stream_fifo
*fifo
)
1217 return atomic_load_explicit(&fifo
->count
, memory_order_acquire
);
1220 void stream_fifo_free(struct stream_fifo
*fifo
)
1222 stream_fifo_clean(fifo
);
1223 pthread_mutex_destroy(&fifo
->mtx
);
1224 XFREE(MTYPE_STREAM_FIFO
, fifo
);