]>
git.proxmox.com Git - mirror_frr.git/blob - lib/stream.c
3 * Copyright (C) 1999 Kunihiro Ishiguro
5 * This file is part of GNU Zebra.
7 * GNU Zebra is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2, or (at your option) any
12 * GNU Zebra is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
31 #include "lib_errors.h"
33 DEFINE_MTYPE_STATIC(LIB
, STREAM
, "Stream")
34 DEFINE_MTYPE_STATIC(LIB
, STREAM_FIFO
, "Stream FIFO")
36 /* Tests whether a position is valid */
37 #define GETP_VALID(S, G) ((G) <= (S)->endp)
38 #define PUT_AT_VALID(S,G) GETP_VALID(S,G)
39 #define ENDP_VALID(S, E) ((E) <= (S)->size)
41 /* asserting sanity checks. Following must be true before
42 * stream functions are called:
44 * Following must always be true of stream elements
45 * before and after calls to stream functions:
47 * getp <= endp <= size
49 * Note that after a stream function is called following may be true:
50 * if (getp == endp) then stream is no longer readable
51 * if (endp == size) then stream is no longer writeable
53 * It is valid to put to anywhere within the size of the stream, but only
54 * using stream_put..._at() functions.
56 #define STREAM_WARN_OFFSETS(S) \
57 flog_warn(EC_LIB_STREAM, \
58 "&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n", \
59 (void *)(S), (unsigned long)(S)->size, \
60 (unsigned long)(S)->getp, (unsigned long)(S)->endp)
62 #define STREAM_VERIFY_SANE(S) \
64 if (!(GETP_VALID(S, (S)->getp) && ENDP_VALID(S, (S)->endp))) \
65 STREAM_WARN_OFFSETS(S); \
66 assert(GETP_VALID(S, (S)->getp)); \
67 assert(ENDP_VALID(S, (S)->endp)); \
70 #define STREAM_BOUND_WARN(S, WHAT) \
72 flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds", \
74 STREAM_WARN_OFFSETS(S); \
78 #define STREAM_BOUND_WARN2(S, WHAT) \
80 flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds", \
82 STREAM_WARN_OFFSETS(S); \
85 /* XXX: Deprecated macro: do not use */
86 #define CHECK_SIZE(S, Z) \
88 if (((S)->endp + (Z)) > (S)->size) { \
91 "CHECK_SIZE: truncating requested size %lu\n", \
92 (unsigned long)(Z)); \
93 STREAM_WARN_OFFSETS(S); \
94 (Z) = (S)->size - (S)->endp; \
98 /* Make stream buffer. */
99 struct stream
*stream_new(size_t size
)
105 s
= XMALLOC(MTYPE_STREAM
, sizeof(struct stream
) + size
);
107 s
->getp
= s
->endp
= 0;
114 void stream_free(struct stream
*s
)
119 XFREE(MTYPE_STREAM
, s
);
122 struct stream
*stream_copy(struct stream
*new, struct stream
*src
)
124 STREAM_VERIFY_SANE(src
);
127 assert(STREAM_SIZE(new) >= src
->endp
);
129 new->endp
= src
->endp
;
130 new->getp
= src
->getp
;
132 memcpy(new->data
, src
->data
, src
->endp
);
137 struct stream
*stream_dup(struct stream
*s
)
141 STREAM_VERIFY_SANE(s
);
143 if ((new = stream_new(s
->endp
)) == NULL
)
146 return (stream_copy(new, s
));
149 struct stream
*stream_dupcat(struct stream
*s1
, struct stream
*s2
,
154 STREAM_VERIFY_SANE(s1
);
155 STREAM_VERIFY_SANE(s2
);
157 if ((new = stream_new(s1
->endp
+ s2
->endp
)) == NULL
)
160 memcpy(new->data
, s1
->data
, offset
);
161 memcpy(new->data
+ offset
, s2
->data
, s2
->endp
);
162 memcpy(new->data
+ offset
+ s2
->endp
, s1
->data
+ offset
,
163 (s1
->endp
- offset
));
164 new->endp
= s1
->endp
+ s2
->endp
;
168 size_t stream_resize_inplace(struct stream
**sptr
, size_t newsize
)
170 struct stream
*orig
= *sptr
;
172 STREAM_VERIFY_SANE(orig
);
174 orig
= XREALLOC(MTYPE_STREAM
, orig
, sizeof(struct stream
) + newsize
);
176 orig
->size
= newsize
;
178 if (orig
->endp
> orig
->size
)
179 orig
->endp
= orig
->size
;
180 if (orig
->getp
> orig
->endp
)
181 orig
->getp
= orig
->endp
;
183 STREAM_VERIFY_SANE(orig
);
189 size_t __attribute__((deprecated
))stream_resize_orig(struct stream
*s
,
192 assert("stream_resize: Switch code to use stream_resize_inplace" == NULL
);
194 return stream_resize_inplace(&s
, newsize
);
197 size_t stream_get_getp(struct stream
*s
)
199 STREAM_VERIFY_SANE(s
);
203 size_t stream_get_endp(struct stream
*s
)
205 STREAM_VERIFY_SANE(s
);
209 size_t stream_get_size(struct stream
*s
)
211 STREAM_VERIFY_SANE(s
);
215 /* Stream structre' stream pointer related functions. */
216 void stream_set_getp(struct stream
*s
, size_t pos
)
218 STREAM_VERIFY_SANE(s
);
220 if (!GETP_VALID(s
, pos
)) {
221 STREAM_BOUND_WARN(s
, "set getp");
228 void stream_set_endp(struct stream
*s
, size_t pos
)
230 STREAM_VERIFY_SANE(s
);
232 if (!ENDP_VALID(s
, pos
)) {
233 STREAM_BOUND_WARN(s
, "set endp");
238 * Make sure the current read pointer is not beyond the new endp.
241 STREAM_BOUND_WARN(s
, "set endp");
246 STREAM_VERIFY_SANE(s
);
249 /* Forward pointer. */
250 void stream_forward_getp(struct stream
*s
, size_t size
)
252 STREAM_VERIFY_SANE(s
);
254 if (!GETP_VALID(s
, s
->getp
+ size
)) {
255 STREAM_BOUND_WARN(s
, "seek getp");
262 void stream_forward_endp(struct stream
*s
, size_t size
)
264 STREAM_VERIFY_SANE(s
);
266 if (!ENDP_VALID(s
, s
->endp
+ size
)) {
267 STREAM_BOUND_WARN(s
, "seek endp");
274 /* Copy from stream to destination. */
275 bool stream_get2(void *dst
, struct stream
*s
, size_t size
)
277 STREAM_VERIFY_SANE(s
);
279 if (STREAM_READABLE(s
) < size
) {
280 STREAM_BOUND_WARN2(s
, "get");
284 memcpy(dst
, s
->data
+ s
->getp
, size
);
290 void stream_get(void *dst
, struct stream
*s
, size_t size
)
292 STREAM_VERIFY_SANE(s
);
294 if (STREAM_READABLE(s
) < size
) {
295 STREAM_BOUND_WARN(s
, "get");
299 memcpy(dst
, s
->data
+ s
->getp
, size
);
303 /* Get next character from the stream. */
304 bool stream_getc2(struct stream
*s
, uint8_t *byte
)
306 STREAM_VERIFY_SANE(s
);
308 if (STREAM_READABLE(s
) < sizeof(uint8_t)) {
309 STREAM_BOUND_WARN2(s
, "get char");
312 *byte
= s
->data
[s
->getp
++];
317 uint8_t stream_getc(struct stream
*s
)
321 STREAM_VERIFY_SANE(s
);
323 if (STREAM_READABLE(s
) < sizeof(uint8_t)) {
324 STREAM_BOUND_WARN(s
, "get char");
327 c
= s
->data
[s
->getp
++];
332 /* Get next character from the stream. */
333 uint8_t stream_getc_from(struct stream
*s
, size_t from
)
337 STREAM_VERIFY_SANE(s
);
339 if (!GETP_VALID(s
, from
+ sizeof(uint8_t))) {
340 STREAM_BOUND_WARN(s
, "get char");
349 bool stream_getw2(struct stream
*s
, uint16_t *word
)
351 STREAM_VERIFY_SANE(s
);
353 if (STREAM_READABLE(s
) < sizeof(uint16_t)) {
354 STREAM_BOUND_WARN2(s
, "get ");
358 *word
= s
->data
[s
->getp
++] << 8;
359 *word
|= s
->data
[s
->getp
++];
364 /* Get next word from the stream. */
365 uint16_t stream_getw(struct stream
*s
)
369 STREAM_VERIFY_SANE(s
);
371 if (STREAM_READABLE(s
) < sizeof(uint16_t)) {
372 STREAM_BOUND_WARN(s
, "get ");
376 w
= s
->data
[s
->getp
++] << 8;
377 w
|= s
->data
[s
->getp
++];
382 /* Get next word from the stream. */
383 uint16_t stream_getw_from(struct stream
*s
, size_t from
)
387 STREAM_VERIFY_SANE(s
);
389 if (!GETP_VALID(s
, from
+ sizeof(uint16_t))) {
390 STREAM_BOUND_WARN(s
, "get ");
394 w
= s
->data
[from
++] << 8;
400 /* Get next 3-byte from the stream. */
401 uint32_t stream_get3_from(struct stream
*s
, size_t from
)
405 STREAM_VERIFY_SANE(s
);
407 if (!GETP_VALID(s
, from
+ 3)) {
408 STREAM_BOUND_WARN(s
, "get 3byte");
412 l
= s
->data
[from
++] << 16;
413 l
|= s
->data
[from
++] << 8;
419 uint32_t stream_get3(struct stream
*s
)
423 STREAM_VERIFY_SANE(s
);
425 if (STREAM_READABLE(s
) < 3) {
426 STREAM_BOUND_WARN(s
, "get 3byte");
430 l
= s
->data
[s
->getp
++] << 16;
431 l
|= s
->data
[s
->getp
++] << 8;
432 l
|= s
->data
[s
->getp
++];
437 /* Get next long word from the stream. */
438 uint32_t stream_getl_from(struct stream
*s
, size_t from
)
442 STREAM_VERIFY_SANE(s
);
444 if (!GETP_VALID(s
, from
+ sizeof(uint32_t))) {
445 STREAM_BOUND_WARN(s
, "get long");
449 l
= (unsigned)(s
->data
[from
++]) << 24;
450 l
|= s
->data
[from
++] << 16;
451 l
|= s
->data
[from
++] << 8;
457 /* Copy from stream at specific location to destination. */
458 void stream_get_from(void *dst
, struct stream
*s
, size_t from
, size_t size
)
460 STREAM_VERIFY_SANE(s
);
462 if (!GETP_VALID(s
, from
+ size
)) {
463 STREAM_BOUND_WARN(s
, "get from");
467 memcpy(dst
, s
->data
+ from
, size
);
470 bool stream_getl2(struct stream
*s
, uint32_t *l
)
472 STREAM_VERIFY_SANE(s
);
474 if (STREAM_READABLE(s
) < sizeof(uint32_t)) {
475 STREAM_BOUND_WARN2(s
, "get long");
479 *l
= (unsigned int)(s
->data
[s
->getp
++]) << 24;
480 *l
|= s
->data
[s
->getp
++] << 16;
481 *l
|= s
->data
[s
->getp
++] << 8;
482 *l
|= s
->data
[s
->getp
++];
487 uint32_t stream_getl(struct stream
*s
)
491 STREAM_VERIFY_SANE(s
);
493 if (STREAM_READABLE(s
) < sizeof(uint32_t)) {
494 STREAM_BOUND_WARN(s
, "get long");
498 l
= (unsigned)(s
->data
[s
->getp
++]) << 24;
499 l
|= s
->data
[s
->getp
++] << 16;
500 l
|= s
->data
[s
->getp
++] << 8;
501 l
|= s
->data
[s
->getp
++];
506 /* Get next quad word from the stream. */
507 uint64_t stream_getq_from(struct stream
*s
, size_t from
)
511 STREAM_VERIFY_SANE(s
);
513 if (!GETP_VALID(s
, from
+ sizeof(uint64_t))) {
514 STREAM_BOUND_WARN(s
, "get quad");
518 q
= ((uint64_t)s
->data
[from
++]) << 56;
519 q
|= ((uint64_t)s
->data
[from
++]) << 48;
520 q
|= ((uint64_t)s
->data
[from
++]) << 40;
521 q
|= ((uint64_t)s
->data
[from
++]) << 32;
522 q
|= ((uint64_t)s
->data
[from
++]) << 24;
523 q
|= ((uint64_t)s
->data
[from
++]) << 16;
524 q
|= ((uint64_t)s
->data
[from
++]) << 8;
525 q
|= ((uint64_t)s
->data
[from
++]);
530 uint64_t stream_getq(struct stream
*s
)
534 STREAM_VERIFY_SANE(s
);
536 if (STREAM_READABLE(s
) < sizeof(uint64_t)) {
537 STREAM_BOUND_WARN(s
, "get quad");
541 q
= ((uint64_t)s
->data
[s
->getp
++]) << 56;
542 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 48;
543 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 40;
544 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 32;
545 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 24;
546 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 16;
547 q
|= ((uint64_t)s
->data
[s
->getp
++]) << 8;
548 q
|= ((uint64_t)s
->data
[s
->getp
++]);
553 /* Get next long word from the stream. */
554 uint32_t stream_get_ipv4(struct stream
*s
)
558 STREAM_VERIFY_SANE(s
);
560 if (STREAM_READABLE(s
) < sizeof(uint32_t)) {
561 STREAM_BOUND_WARN(s
, "get ipv4");
565 memcpy(&l
, s
->data
+ s
->getp
, sizeof(uint32_t));
566 s
->getp
+= sizeof(uint32_t);
571 float stream_getf(struct stream
*s
)
577 u
.d
= stream_getl(s
);
581 double stream_getd(struct stream
*s
)
587 u
.d
= stream_getq(s
);
591 /* Copy to source to stream.
593 * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
594 * around. This should be fixed once the stream updates are working.
596 * stream_write() is saner
598 void stream_put(struct stream
*s
, const void *src
, size_t size
)
601 /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
604 STREAM_VERIFY_SANE(s
);
606 if (STREAM_WRITEABLE(s
) < size
) {
607 STREAM_BOUND_WARN(s
, "put");
612 memcpy(s
->data
+ s
->endp
, src
, size
);
614 memset(s
->data
+ s
->endp
, 0, size
);
619 /* Put character to the stream. */
620 int stream_putc(struct stream
*s
, uint8_t c
)
622 STREAM_VERIFY_SANE(s
);
624 if (STREAM_WRITEABLE(s
) < sizeof(uint8_t)) {
625 STREAM_BOUND_WARN(s
, "put");
629 s
->data
[s
->endp
++] = c
;
630 return sizeof(uint8_t);
633 /* Put word to the stream. */
634 int stream_putw(struct stream
*s
, uint16_t w
)
636 STREAM_VERIFY_SANE(s
);
638 if (STREAM_WRITEABLE(s
) < sizeof(uint16_t)) {
639 STREAM_BOUND_WARN(s
, "put");
643 s
->data
[s
->endp
++] = (uint8_t)(w
>> 8);
644 s
->data
[s
->endp
++] = (uint8_t)w
;
649 /* Put long word to the stream. */
650 int stream_put3(struct stream
*s
, uint32_t l
)
652 STREAM_VERIFY_SANE(s
);
654 if (STREAM_WRITEABLE(s
) < 3) {
655 STREAM_BOUND_WARN(s
, "put");
659 s
->data
[s
->endp
++] = (uint8_t)(l
>> 16);
660 s
->data
[s
->endp
++] = (uint8_t)(l
>> 8);
661 s
->data
[s
->endp
++] = (uint8_t)l
;
666 /* Put long word to the stream. */
667 int stream_putl(struct stream
*s
, uint32_t l
)
669 STREAM_VERIFY_SANE(s
);
671 if (STREAM_WRITEABLE(s
) < sizeof(uint32_t)) {
672 STREAM_BOUND_WARN(s
, "put");
676 s
->data
[s
->endp
++] = (uint8_t)(l
>> 24);
677 s
->data
[s
->endp
++] = (uint8_t)(l
>> 16);
678 s
->data
[s
->endp
++] = (uint8_t)(l
>> 8);
679 s
->data
[s
->endp
++] = (uint8_t)l
;
684 /* Put quad word to the stream. */
685 int stream_putq(struct stream
*s
, uint64_t q
)
687 STREAM_VERIFY_SANE(s
);
689 if (STREAM_WRITEABLE(s
) < sizeof(uint64_t)) {
690 STREAM_BOUND_WARN(s
, "put quad");
694 s
->data
[s
->endp
++] = (uint8_t)(q
>> 56);
695 s
->data
[s
->endp
++] = (uint8_t)(q
>> 48);
696 s
->data
[s
->endp
++] = (uint8_t)(q
>> 40);
697 s
->data
[s
->endp
++] = (uint8_t)(q
>> 32);
698 s
->data
[s
->endp
++] = (uint8_t)(q
>> 24);
699 s
->data
[s
->endp
++] = (uint8_t)(q
>> 16);
700 s
->data
[s
->endp
++] = (uint8_t)(q
>> 8);
701 s
->data
[s
->endp
++] = (uint8_t)q
;
706 int stream_putf(struct stream
*s
, float f
)
713 return stream_putl(s
, u
.o
);
716 int stream_putd(struct stream
*s
, double d
)
723 return stream_putq(s
, u
.o
);
726 int stream_putc_at(struct stream
*s
, size_t putp
, uint8_t c
)
728 STREAM_VERIFY_SANE(s
);
730 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint8_t))) {
731 STREAM_BOUND_WARN(s
, "put");
740 int stream_putw_at(struct stream
*s
, size_t putp
, uint16_t w
)
742 STREAM_VERIFY_SANE(s
);
744 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint16_t))) {
745 STREAM_BOUND_WARN(s
, "put");
749 s
->data
[putp
] = (uint8_t)(w
>> 8);
750 s
->data
[putp
+ 1] = (uint8_t)w
;
755 int stream_put3_at(struct stream
*s
, size_t putp
, uint32_t l
)
757 STREAM_VERIFY_SANE(s
);
759 if (!PUT_AT_VALID(s
, putp
+ 3)) {
760 STREAM_BOUND_WARN(s
, "put");
763 s
->data
[putp
] = (uint8_t)(l
>> 16);
764 s
->data
[putp
+ 1] = (uint8_t)(l
>> 8);
765 s
->data
[putp
+ 2] = (uint8_t)l
;
770 int stream_putl_at(struct stream
*s
, size_t putp
, uint32_t l
)
772 STREAM_VERIFY_SANE(s
);
774 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint32_t))) {
775 STREAM_BOUND_WARN(s
, "put");
778 s
->data
[putp
] = (uint8_t)(l
>> 24);
779 s
->data
[putp
+ 1] = (uint8_t)(l
>> 16);
780 s
->data
[putp
+ 2] = (uint8_t)(l
>> 8);
781 s
->data
[putp
+ 3] = (uint8_t)l
;
786 int stream_putq_at(struct stream
*s
, size_t putp
, uint64_t q
)
788 STREAM_VERIFY_SANE(s
);
790 if (!PUT_AT_VALID(s
, putp
+ sizeof(uint64_t))) {
791 STREAM_BOUND_WARN(s
, "put");
794 s
->data
[putp
] = (uint8_t)(q
>> 56);
795 s
->data
[putp
+ 1] = (uint8_t)(q
>> 48);
796 s
->data
[putp
+ 2] = (uint8_t)(q
>> 40);
797 s
->data
[putp
+ 3] = (uint8_t)(q
>> 32);
798 s
->data
[putp
+ 4] = (uint8_t)(q
>> 24);
799 s
->data
[putp
+ 5] = (uint8_t)(q
>> 16);
800 s
->data
[putp
+ 6] = (uint8_t)(q
>> 8);
801 s
->data
[putp
+ 7] = (uint8_t)q
;
806 /* Put long word to the stream. */
807 int stream_put_ipv4(struct stream
*s
, uint32_t l
)
809 STREAM_VERIFY_SANE(s
);
811 if (STREAM_WRITEABLE(s
) < sizeof(uint32_t)) {
812 STREAM_BOUND_WARN(s
, "put");
815 memcpy(s
->data
+ s
->endp
, &l
, sizeof(uint32_t));
816 s
->endp
+= sizeof(uint32_t);
818 return sizeof(uint32_t);
821 /* Put long word to the stream. */
822 int stream_put_in_addr(struct stream
*s
, struct in_addr
*addr
)
824 STREAM_VERIFY_SANE(s
);
826 if (STREAM_WRITEABLE(s
) < sizeof(uint32_t)) {
827 STREAM_BOUND_WARN(s
, "put");
831 memcpy(s
->data
+ s
->endp
, addr
, sizeof(uint32_t));
832 s
->endp
+= sizeof(uint32_t);
834 return sizeof(uint32_t);
837 /* Put in_addr at location in the stream. */
838 int stream_put_in_addr_at(struct stream
*s
, size_t putp
, struct in_addr
*addr
)
840 STREAM_VERIFY_SANE(s
);
842 if (!PUT_AT_VALID(s
, putp
+ 4)) {
843 STREAM_BOUND_WARN(s
, "put");
847 memcpy(&s
->data
[putp
], addr
, 4);
851 /* Put in6_addr at location in the stream. */
852 int stream_put_in6_addr_at(struct stream
*s
, size_t putp
, struct in6_addr
*addr
)
854 STREAM_VERIFY_SANE(s
);
856 if (!PUT_AT_VALID(s
, putp
+ 16)) {
857 STREAM_BOUND_WARN(s
, "put");
861 memcpy(&s
->data
[putp
], addr
, 16);
865 /* Put prefix by nlri type format. */
866 int stream_put_prefix_addpath(struct stream
*s
, struct prefix
*p
,
867 int addpath_encode
, uint32_t addpath_tx_id
)
870 size_t psize_with_addpath
;
872 STREAM_VERIFY_SANE(s
);
874 psize
= PSIZE(p
->prefixlen
);
877 psize_with_addpath
= psize
+ 4;
879 psize_with_addpath
= psize
;
881 if (STREAM_WRITEABLE(s
) < (psize_with_addpath
+ sizeof(uint8_t))) {
882 STREAM_BOUND_WARN(s
, "put");
886 if (addpath_encode
) {
887 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 24);
888 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 16);
889 s
->data
[s
->endp
++] = (uint8_t)(addpath_tx_id
>> 8);
890 s
->data
[s
->endp
++] = (uint8_t)addpath_tx_id
;
893 s
->data
[s
->endp
++] = p
->prefixlen
;
894 memcpy(s
->data
+ s
->endp
, &p
->u
.prefix
, psize
);
900 int stream_put_prefix(struct stream
*s
, struct prefix
*p
)
902 return stream_put_prefix_addpath(s
, p
, 0, 0);
905 /* Put NLRI with label */
906 int stream_put_labeled_prefix(struct stream
*s
, struct prefix
*p
,
910 uint8_t *label_pnt
= (uint8_t *)label
;
912 STREAM_VERIFY_SANE(s
);
914 psize
= PSIZE(p
->prefixlen
);
916 if (STREAM_WRITEABLE(s
) < (psize
+ 3)) {
917 STREAM_BOUND_WARN(s
, "put");
921 stream_putc(s
, (p
->prefixlen
+ 24));
922 stream_putc(s
, label_pnt
[0]);
923 stream_putc(s
, label_pnt
[1]);
924 stream_putc(s
, label_pnt
[2]);
925 memcpy(s
->data
+ s
->endp
, &p
->u
.prefix
, psize
);
931 /* Read size from fd. */
932 int stream_read(struct stream
*s
, int fd
, size_t size
)
936 STREAM_VERIFY_SANE(s
);
938 if (STREAM_WRITEABLE(s
) < size
) {
939 STREAM_BOUND_WARN(s
, "put");
943 nbytes
= readn(fd
, s
->data
+ s
->endp
, size
);
951 ssize_t
stream_read_try(struct stream
*s
, int fd
, size_t size
)
955 STREAM_VERIFY_SANE(s
);
957 if (STREAM_WRITEABLE(s
) < size
) {
958 STREAM_BOUND_WARN(s
, "put");
959 /* Fatal (not transient) error, since retrying will not help
960 (stream is too small to contain the desired data). */
964 if ((nbytes
= read(fd
, s
->data
+ s
->endp
, size
)) >= 0) {
968 /* Error: was it transient (return -2) or fatal (return -1)? */
969 if (ERRNO_IO_RETRY(errno
))
971 flog_err(EC_LIB_SOCKET
, "%s: read failed on fd %d: %s", __func__
, fd
,
972 safe_strerror(errno
));
976 /* Read up to size bytes into the stream from the fd, using recvmsgfrom
977 * whose arguments match the remaining arguments to this function
979 ssize_t
stream_recvfrom(struct stream
*s
, int fd
, size_t size
, int flags
,
980 struct sockaddr
*from
, socklen_t
*fromlen
)
984 STREAM_VERIFY_SANE(s
);
986 if (STREAM_WRITEABLE(s
) < size
) {
987 STREAM_BOUND_WARN(s
, "put");
988 /* Fatal (not transient) error, since retrying will not help
989 (stream is too small to contain the desired data). */
993 if ((nbytes
= recvfrom(fd
, s
->data
+ s
->endp
, size
, flags
, from
,
999 /* Error: was it transient (return -2) or fatal (return -1)? */
1000 if (ERRNO_IO_RETRY(errno
))
1002 flog_err(EC_LIB_SOCKET
, "%s: read failed on fd %d: %s", __func__
, fd
,
1003 safe_strerror(errno
));
1007 /* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
1009 * First iovec will be used to receive the data.
1010 * Stream need not be empty.
1012 ssize_t
stream_recvmsg(struct stream
*s
, int fd
, struct msghdr
*msgh
, int flags
,
1018 STREAM_VERIFY_SANE(s
);
1019 assert(msgh
->msg_iovlen
> 0);
1021 if (STREAM_WRITEABLE(s
) < size
) {
1022 STREAM_BOUND_WARN(s
, "put");
1023 /* This is a logic error in the calling code: the stream is too
1025 to hold the desired data! */
1029 iov
= &(msgh
->msg_iov
[0]);
1030 iov
->iov_base
= (s
->data
+ s
->endp
);
1031 iov
->iov_len
= size
;
1033 nbytes
= recvmsg(fd
, msgh
, flags
);
1041 /* Write data to buffer. */
1042 size_t stream_write(struct stream
*s
, const void *ptr
, size_t size
)
1045 CHECK_SIZE(s
, size
);
1047 STREAM_VERIFY_SANE(s
);
1049 if (STREAM_WRITEABLE(s
) < size
) {
1050 STREAM_BOUND_WARN(s
, "put");
1054 memcpy(s
->data
+ s
->endp
, ptr
, size
);
1060 /* Return current read pointer.
1062 * Use stream_get_pnt_to if you must, but decoding streams properly
1065 uint8_t *stream_pnt(struct stream
*s
)
1067 STREAM_VERIFY_SANE(s
);
1068 return s
->data
+ s
->getp
;
1071 /* Check does this stream empty? */
1072 int stream_empty(struct stream
*s
)
1074 STREAM_VERIFY_SANE(s
);
1076 return (s
->endp
== 0);
1080 void stream_reset(struct stream
*s
)
1082 STREAM_VERIFY_SANE(s
);
1084 s
->getp
= s
->endp
= 0;
1087 /* Write stream contens to the file discriptor. */
1088 int stream_flush(struct stream
*s
, int fd
)
1092 STREAM_VERIFY_SANE(s
);
1094 nbytes
= write(fd
, s
->data
+ s
->getp
, s
->endp
- s
->getp
);
1099 /* Stream first in first out queue. */
1101 struct stream_fifo
*stream_fifo_new(void)
1103 struct stream_fifo
*new;
1105 new = XCALLOC(MTYPE_STREAM_FIFO
, sizeof(struct stream_fifo
));
1106 pthread_mutex_init(&new->mtx
, NULL
);
1110 /* Add new stream to fifo. */
1111 void stream_fifo_push(struct stream_fifo
*fifo
, struct stream
*s
)
1113 #if defined DEV_BUILD
1118 fifo
->tail
->next
= s
;
1123 fifo
->tail
->next
= NULL
;
1124 #if !defined DEV_BUILD
1125 atomic_fetch_add_explicit(&fifo
->count
, 1, memory_order_release
);
1127 max
= atomic_fetch_add_explicit(&fifo
->count
, 1, memory_order_release
);
1128 curmax
= atomic_load_explicit(&fifo
->max_count
, memory_order_relaxed
);
1130 atomic_store_explicit(&fifo
->max_count
, max
,
1131 memory_order_relaxed
);
1135 void stream_fifo_push_safe(struct stream_fifo
*fifo
, struct stream
*s
)
1137 pthread_mutex_lock(&fifo
->mtx
);
1139 stream_fifo_push(fifo
, s
);
1141 pthread_mutex_unlock(&fifo
->mtx
);
1144 /* Delete first stream from fifo. */
1145 struct stream
*stream_fifo_pop(struct stream_fifo
*fifo
)
1152 fifo
->head
= s
->next
;
1154 if (fifo
->head
== NULL
)
1157 atomic_fetch_sub_explicit(&fifo
->count
, 1,
1158 memory_order_release
);
1160 /* ensure stream is scrubbed of references to this fifo */
1167 struct stream
*stream_fifo_pop_safe(struct stream_fifo
*fifo
)
1171 pthread_mutex_lock(&fifo
->mtx
);
1173 ret
= stream_fifo_pop(fifo
);
1175 pthread_mutex_unlock(&fifo
->mtx
);
1180 struct stream
*stream_fifo_head(struct stream_fifo
*fifo
)
1185 struct stream
*stream_fifo_head_safe(struct stream_fifo
*fifo
)
1189 pthread_mutex_lock(&fifo
->mtx
);
1191 ret
= stream_fifo_head(fifo
);
1193 pthread_mutex_unlock(&fifo
->mtx
);
1198 void stream_fifo_clean(struct stream_fifo
*fifo
)
1201 struct stream
*next
;
1203 for (s
= fifo
->head
; s
; s
= next
) {
1207 fifo
->head
= fifo
->tail
= NULL
;
1208 atomic_store_explicit(&fifo
->count
, 0, memory_order_release
);
1211 void stream_fifo_clean_safe(struct stream_fifo
*fifo
)
1213 pthread_mutex_lock(&fifo
->mtx
);
1215 stream_fifo_clean(fifo
);
1217 pthread_mutex_unlock(&fifo
->mtx
);
1220 size_t stream_fifo_count_safe(struct stream_fifo
*fifo
)
1222 return atomic_load_explicit(&fifo
->count
, memory_order_acquire
);
1225 void stream_fifo_free(struct stream_fifo
*fifo
)
1227 stream_fifo_clean(fifo
);
1228 pthread_mutex_destroy(&fifo
->mtx
);
1229 XFREE(MTYPE_STREAM_FIFO
, fifo
);