]> git.proxmox.com Git - mirror_frr.git/blob - lib/stream.c
Merge pull request #5590 from qlyoung/fix-nhrp-underflow
[mirror_frr.git] / lib / stream.c
1 /*
2 * Packet interface
3 * Copyright (C) 1999 Kunihiro Ishiguro
4 *
5 * This file is part of GNU Zebra.
6 *
7 * GNU Zebra is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2, or (at your option) any
10 * later version.
11 *
12 * GNU Zebra is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <zebra.h>
23 #include <stddef.h>
24 #include <pthread.h>
25
26 #include "stream.h"
27 #include "memory.h"
28 #include "network.h"
29 #include "prefix.h"
30 #include "log.h"
31 #include "frr_pthread.h"
32 #include "lib_errors.h"
33
34 DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream")
35 DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO")
36
/* Position validity predicates.
 *
 * GETP_VALID:   a read position is valid while it does not exceed endp.
 * PUT_AT_VALID: positional puts are bounded exactly like read positions.
 * ENDP_VALID:   an end position is valid while it does not exceed size.
 */
#define GETP_VALID(STRM, POS) ((POS) <= (STRM)->endp)
#define PUT_AT_VALID(STRM, POS) GETP_VALID(STRM, POS)
#define ENDP_VALID(STRM, END) ((END) <= (STRM)->size)
41
/* Asserting sanity checks.
 *
 * The following must hold for the stream offsets both before and after
 * every call to a stream function:
 *
 *     getp <= endp <= size
 *
 * Note that once a stream function has returned:
 *     getp == endp  means the stream is no longer readable
 *     endp == size  means the stream is no longer writeable
 *
 * It is valid to put to anywhere within the size of the stream, but only
 * using stream_put..._at() functions.
 * NOTE(review): PUT_AT_VALID bounds those positional puts by endp rather
 * than by size -- confirm which of the two is intended.
 */

/* Emit a warning holding the address of STRM and its size/getp/endp. */
#define STREAM_WARN_OFFSETS(STRM)                                              \
	flog_warn(EC_LIB_STREAM,                                               \
		  "&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n",   \
		  (void *)(STRM), (unsigned long)(STRM)->size,                 \
		  (unsigned long)(STRM)->getp, (unsigned long)(STRM)->endp)
62
/* Assert that getp and endp of STRM are within bounds.  When either check
 * fails the offsets are logged first, so the warning precedes the assertion.
 */
#define STREAM_VERIFY_SANE(STRM)                                               \
	do {                                                                   \
		if (!GETP_VALID(STRM, (STRM)->getp)                            \
		    || !ENDP_VALID(STRM, (STRM)->endp))                        \
			STREAM_WARN_OFFSETS(STRM);                             \
		assert(GETP_VALID(STRM, (STRM)->getp));                        \
		assert(ENDP_VALID(STRM, (STRM)->endp));                        \
	} while (0)
70
/* Report an out-of-bounds attempt (described by ACTION) made by the calling
 * function, dump the offsets of STRM, then trip an assertion.
 */
#define STREAM_BOUND_WARN(STRM, ACTION)                                        \
	do {                                                                   \
		flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds",    \
			  __func__, (ACTION));                                 \
		STREAM_WARN_OFFSETS(STRM);                                     \
		assert(0);                                                     \
	} while (0)
78
/* Same report as STREAM_BOUND_WARN, but without the trailing assertion:
 * only the two warnings are logged.
 */
#define STREAM_BOUND_WARN2(STRM, ACTION)                                       \
	do {                                                                   \
		flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds",    \
			  __func__, (ACTION));                                 \
		STREAM_WARN_OFFSETS(STRM);                                     \
	} while (0)
85
/* XXX: Deprecated macro: do not use
 *
 * If appending Z bytes to S would run past the end of the buffer, log a
 * warning and shrink Z (which must be a modifiable lvalue) to the space that
 * is actually left, i.e. size - endp.
 *
 * The definition deliberately does not end in a semicolon: the caller's own
 * ';' terminates the statement, which keeps the macro usable as the sole
 * body of an unbraced if/else.
 */
#define CHECK_SIZE(S, Z)                                                       \
	do {                                                                   \
		if (((S)->endp + (Z)) > (S)->size) {                           \
			flog_warn(                                             \
				EC_LIB_STREAM,                                 \
				"CHECK_SIZE: truncating requested size %lu\n", \
				(unsigned long)(Z));                           \
			STREAM_WARN_OFFSETS(S);                                \
			(Z) = (S)->size - (S)->endp;                           \
		}                                                              \
	} while (0)
98
99 /* Make stream buffer. */
100 struct stream *stream_new(size_t size)
101 {
102 struct stream *s;
103
104 assert(size > 0);
105
106 s = XMALLOC(MTYPE_STREAM, sizeof(struct stream) + size);
107
108 s->getp = s->endp = 0;
109 s->next = NULL;
110 s->size = size;
111 return s;
112 }
113
114 /* Free it now. */
115 void stream_free(struct stream *s)
116 {
117 if (!s)
118 return;
119
120 XFREE(MTYPE_STREAM, s);
121 }
122
123 struct stream *stream_copy(struct stream *new, struct stream *src)
124 {
125 STREAM_VERIFY_SANE(src);
126
127 assert(new != NULL);
128 assert(STREAM_SIZE(new) >= src->endp);
129
130 new->endp = src->endp;
131 new->getp = src->getp;
132
133 memcpy(new->data, src->data, src->endp);
134
135 return new;
136 }
137
138 struct stream *stream_dup(struct stream *s)
139 {
140 struct stream *new;
141
142 STREAM_VERIFY_SANE(s);
143
144 if ((new = stream_new(s->endp)) == NULL)
145 return NULL;
146
147 return (stream_copy(new, s));
148 }
149
150 struct stream *stream_dupcat(struct stream *s1, struct stream *s2,
151 size_t offset)
152 {
153 struct stream *new;
154
155 STREAM_VERIFY_SANE(s1);
156 STREAM_VERIFY_SANE(s2);
157
158 if ((new = stream_new(s1->endp + s2->endp)) == NULL)
159 return NULL;
160
161 memcpy(new->data, s1->data, offset);
162 memcpy(new->data + offset, s2->data, s2->endp);
163 memcpy(new->data + offset + s2->endp, s1->data + offset,
164 (s1->endp - offset));
165 new->endp = s1->endp + s2->endp;
166 return new;
167 }
168
169 size_t stream_resize_inplace(struct stream **sptr, size_t newsize)
170 {
171 struct stream *orig = *sptr;
172
173 STREAM_VERIFY_SANE(orig);
174
175 orig = XREALLOC(MTYPE_STREAM, orig, sizeof(struct stream) + newsize);
176
177 orig->size = newsize;
178
179 if (orig->endp > orig->size)
180 orig->endp = orig->size;
181 if (orig->getp > orig->endp)
182 orig->getp = orig->endp;
183
184 STREAM_VERIFY_SANE(orig);
185
186 *sptr = orig;
187 return orig->size;
188 }
189
190 size_t stream_get_getp(struct stream *s)
191 {
192 STREAM_VERIFY_SANE(s);
193 return s->getp;
194 }
195
196 size_t stream_get_endp(struct stream *s)
197 {
198 STREAM_VERIFY_SANE(s);
199 return s->endp;
200 }
201
202 size_t stream_get_size(struct stream *s)
203 {
204 STREAM_VERIFY_SANE(s);
205 return s->size;
206 }
207
208 /* Stream structre' stream pointer related functions. */
209 void stream_set_getp(struct stream *s, size_t pos)
210 {
211 STREAM_VERIFY_SANE(s);
212
213 if (!GETP_VALID(s, pos)) {
214 STREAM_BOUND_WARN(s, "set getp");
215 pos = s->endp;
216 }
217
218 s->getp = pos;
219 }
220
221 void stream_set_endp(struct stream *s, size_t pos)
222 {
223 STREAM_VERIFY_SANE(s);
224
225 if (!ENDP_VALID(s, pos)) {
226 STREAM_BOUND_WARN(s, "set endp");
227 return;
228 }
229
230 /*
231 * Make sure the current read pointer is not beyond the new endp.
232 */
233 if (s->getp > pos) {
234 STREAM_BOUND_WARN(s, "set endp");
235 return;
236 }
237
238 s->endp = pos;
239 STREAM_VERIFY_SANE(s);
240 }
241
242 /* Forward pointer. */
243 void stream_forward_getp(struct stream *s, size_t size)
244 {
245 STREAM_VERIFY_SANE(s);
246
247 if (!GETP_VALID(s, s->getp + size)) {
248 STREAM_BOUND_WARN(s, "seek getp");
249 return;
250 }
251
252 s->getp += size;
253 }
254
255 void stream_forward_endp(struct stream *s, size_t size)
256 {
257 STREAM_VERIFY_SANE(s);
258
259 if (!ENDP_VALID(s, s->endp + size)) {
260 STREAM_BOUND_WARN(s, "seek endp");
261 return;
262 }
263
264 s->endp += size;
265 }
266
267 /* Copy from stream to destination. */
268 bool stream_get2(void *dst, struct stream *s, size_t size)
269 {
270 STREAM_VERIFY_SANE(s);
271
272 if (STREAM_READABLE(s) < size) {
273 STREAM_BOUND_WARN2(s, "get");
274 return false;
275 }
276
277 memcpy(dst, s->data + s->getp, size);
278 s->getp += size;
279
280 return true;
281 }
282
283 void stream_get(void *dst, struct stream *s, size_t size)
284 {
285 STREAM_VERIFY_SANE(s);
286
287 if (STREAM_READABLE(s) < size) {
288 STREAM_BOUND_WARN(s, "get");
289 return;
290 }
291
292 memcpy(dst, s->data + s->getp, size);
293 s->getp += size;
294 }
295
296 /* Get next character from the stream. */
297 bool stream_getc2(struct stream *s, uint8_t *byte)
298 {
299 STREAM_VERIFY_SANE(s);
300
301 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
302 STREAM_BOUND_WARN2(s, "get char");
303 return false;
304 }
305 *byte = s->data[s->getp++];
306
307 return true;
308 }
309
310 uint8_t stream_getc(struct stream *s)
311 {
312 uint8_t c;
313
314 STREAM_VERIFY_SANE(s);
315
316 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
317 STREAM_BOUND_WARN(s, "get char");
318 return 0;
319 }
320 c = s->data[s->getp++];
321
322 return c;
323 }
324
325 /* Get next character from the stream. */
326 uint8_t stream_getc_from(struct stream *s, size_t from)
327 {
328 uint8_t c;
329
330 STREAM_VERIFY_SANE(s);
331
332 if (!GETP_VALID(s, from + sizeof(uint8_t))) {
333 STREAM_BOUND_WARN(s, "get char");
334 return 0;
335 }
336
337 c = s->data[from];
338
339 return c;
340 }
341
342 bool stream_getw2(struct stream *s, uint16_t *word)
343 {
344 STREAM_VERIFY_SANE(s);
345
346 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
347 STREAM_BOUND_WARN2(s, "get ");
348 return false;
349 }
350
351 *word = s->data[s->getp++] << 8;
352 *word |= s->data[s->getp++];
353
354 return true;
355 }
356
357 /* Get next word from the stream. */
358 uint16_t stream_getw(struct stream *s)
359 {
360 uint16_t w;
361
362 STREAM_VERIFY_SANE(s);
363
364 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
365 STREAM_BOUND_WARN(s, "get ");
366 return 0;
367 }
368
369 w = s->data[s->getp++] << 8;
370 w |= s->data[s->getp++];
371
372 return w;
373 }
374
375 /* Get next word from the stream. */
376 uint16_t stream_getw_from(struct stream *s, size_t from)
377 {
378 uint16_t w;
379
380 STREAM_VERIFY_SANE(s);
381
382 if (!GETP_VALID(s, from + sizeof(uint16_t))) {
383 STREAM_BOUND_WARN(s, "get ");
384 return 0;
385 }
386
387 w = s->data[from++] << 8;
388 w |= s->data[from];
389
390 return w;
391 }
392
393 /* Get next 3-byte from the stream. */
394 uint32_t stream_get3_from(struct stream *s, size_t from)
395 {
396 uint32_t l;
397
398 STREAM_VERIFY_SANE(s);
399
400 if (!GETP_VALID(s, from + 3)) {
401 STREAM_BOUND_WARN(s, "get 3byte");
402 return 0;
403 }
404
405 l = s->data[from++] << 16;
406 l |= s->data[from++] << 8;
407 l |= s->data[from];
408
409 return l;
410 }
411
412 uint32_t stream_get3(struct stream *s)
413 {
414 uint32_t l;
415
416 STREAM_VERIFY_SANE(s);
417
418 if (STREAM_READABLE(s) < 3) {
419 STREAM_BOUND_WARN(s, "get 3byte");
420 return 0;
421 }
422
423 l = s->data[s->getp++] << 16;
424 l |= s->data[s->getp++] << 8;
425 l |= s->data[s->getp++];
426
427 return l;
428 }
429
430 /* Get next long word from the stream. */
431 uint32_t stream_getl_from(struct stream *s, size_t from)
432 {
433 uint32_t l;
434
435 STREAM_VERIFY_SANE(s);
436
437 if (!GETP_VALID(s, from + sizeof(uint32_t))) {
438 STREAM_BOUND_WARN(s, "get long");
439 return 0;
440 }
441
442 l = (unsigned)(s->data[from++]) << 24;
443 l |= s->data[from++] << 16;
444 l |= s->data[from++] << 8;
445 l |= s->data[from];
446
447 return l;
448 }
449
450 /* Copy from stream at specific location to destination. */
451 void stream_get_from(void *dst, struct stream *s, size_t from, size_t size)
452 {
453 STREAM_VERIFY_SANE(s);
454
455 if (!GETP_VALID(s, from + size)) {
456 STREAM_BOUND_WARN(s, "get from");
457 return;
458 }
459
460 memcpy(dst, s->data + from, size);
461 }
462
463 bool stream_getl2(struct stream *s, uint32_t *l)
464 {
465 STREAM_VERIFY_SANE(s);
466
467 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
468 STREAM_BOUND_WARN2(s, "get long");
469 return false;
470 }
471
472 *l = (unsigned int)(s->data[s->getp++]) << 24;
473 *l |= s->data[s->getp++] << 16;
474 *l |= s->data[s->getp++] << 8;
475 *l |= s->data[s->getp++];
476
477 return true;
478 }
479
480 uint32_t stream_getl(struct stream *s)
481 {
482 uint32_t l;
483
484 STREAM_VERIFY_SANE(s);
485
486 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
487 STREAM_BOUND_WARN(s, "get long");
488 return 0;
489 }
490
491 l = (unsigned)(s->data[s->getp++]) << 24;
492 l |= s->data[s->getp++] << 16;
493 l |= s->data[s->getp++] << 8;
494 l |= s->data[s->getp++];
495
496 return l;
497 }
498
499 /* Get next quad word from the stream. */
500 uint64_t stream_getq_from(struct stream *s, size_t from)
501 {
502 uint64_t q;
503
504 STREAM_VERIFY_SANE(s);
505
506 if (!GETP_VALID(s, from + sizeof(uint64_t))) {
507 STREAM_BOUND_WARN(s, "get quad");
508 return 0;
509 }
510
511 q = ((uint64_t)s->data[from++]) << 56;
512 q |= ((uint64_t)s->data[from++]) << 48;
513 q |= ((uint64_t)s->data[from++]) << 40;
514 q |= ((uint64_t)s->data[from++]) << 32;
515 q |= ((uint64_t)s->data[from++]) << 24;
516 q |= ((uint64_t)s->data[from++]) << 16;
517 q |= ((uint64_t)s->data[from++]) << 8;
518 q |= ((uint64_t)s->data[from++]);
519
520 return q;
521 }
522
523 uint64_t stream_getq(struct stream *s)
524 {
525 uint64_t q;
526
527 STREAM_VERIFY_SANE(s);
528
529 if (STREAM_READABLE(s) < sizeof(uint64_t)) {
530 STREAM_BOUND_WARN(s, "get quad");
531 return 0;
532 }
533
534 q = ((uint64_t)s->data[s->getp++]) << 56;
535 q |= ((uint64_t)s->data[s->getp++]) << 48;
536 q |= ((uint64_t)s->data[s->getp++]) << 40;
537 q |= ((uint64_t)s->data[s->getp++]) << 32;
538 q |= ((uint64_t)s->data[s->getp++]) << 24;
539 q |= ((uint64_t)s->data[s->getp++]) << 16;
540 q |= ((uint64_t)s->data[s->getp++]) << 8;
541 q |= ((uint64_t)s->data[s->getp++]);
542
543 return q;
544 }
545
546 /* Get next long word from the stream. */
547 uint32_t stream_get_ipv4(struct stream *s)
548 {
549 uint32_t l;
550
551 STREAM_VERIFY_SANE(s);
552
553 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
554 STREAM_BOUND_WARN(s, "get ipv4");
555 return 0;
556 }
557
558 memcpy(&l, s->data + s->getp, sizeof(uint32_t));
559 s->getp += sizeof(uint32_t);
560
561 return l;
562 }
563
/* Read a 32-bit value with stream_getl() and return its bit pattern
 * reinterpreted as a float.
 */
float stream_getf(struct stream *s)
{
	union {
		uint32_t bits;
		float value;
	} conv;

	conv.bits = stream_getl(s);

	return conv.value;
}
573
/* Read a 64-bit value with stream_getq() and return its bit pattern
 * reinterpreted as a double.
 */
double stream_getd(struct stream *s)
{
	union {
		uint64_t bits;
		double value;
	} conv;

	conv.bits = stream_getq(s);

	return conv.value;
}
583
584 /* Copy to source to stream.
585 *
586 * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
587 * around. This should be fixed once the stream updates are working.
588 *
589 * stream_write() is saner
590 */
591 void stream_put(struct stream *s, const void *src, size_t size)
592 {
593
594 /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
595 CHECK_SIZE(s, size);
596
597 STREAM_VERIFY_SANE(s);
598
599 if (STREAM_WRITEABLE(s) < size) {
600 STREAM_BOUND_WARN(s, "put");
601 return;
602 }
603
604 if (src)
605 memcpy(s->data + s->endp, src, size);
606 else
607 memset(s->data + s->endp, 0, size);
608
609 s->endp += size;
610 }
611
612 /* Put character to the stream. */
613 int stream_putc(struct stream *s, uint8_t c)
614 {
615 STREAM_VERIFY_SANE(s);
616
617 if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) {
618 STREAM_BOUND_WARN(s, "put");
619 return 0;
620 }
621
622 s->data[s->endp++] = c;
623 return sizeof(uint8_t);
624 }
625
626 /* Put word to the stream. */
627 int stream_putw(struct stream *s, uint16_t w)
628 {
629 STREAM_VERIFY_SANE(s);
630
631 if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) {
632 STREAM_BOUND_WARN(s, "put");
633 return 0;
634 }
635
636 s->data[s->endp++] = (uint8_t)(w >> 8);
637 s->data[s->endp++] = (uint8_t)w;
638
639 return 2;
640 }
641
642 /* Put long word to the stream. */
643 int stream_put3(struct stream *s, uint32_t l)
644 {
645 STREAM_VERIFY_SANE(s);
646
647 if (STREAM_WRITEABLE(s) < 3) {
648 STREAM_BOUND_WARN(s, "put");
649 return 0;
650 }
651
652 s->data[s->endp++] = (uint8_t)(l >> 16);
653 s->data[s->endp++] = (uint8_t)(l >> 8);
654 s->data[s->endp++] = (uint8_t)l;
655
656 return 3;
657 }
658
659 /* Put long word to the stream. */
660 int stream_putl(struct stream *s, uint32_t l)
661 {
662 STREAM_VERIFY_SANE(s);
663
664 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
665 STREAM_BOUND_WARN(s, "put");
666 return 0;
667 }
668
669 s->data[s->endp++] = (uint8_t)(l >> 24);
670 s->data[s->endp++] = (uint8_t)(l >> 16);
671 s->data[s->endp++] = (uint8_t)(l >> 8);
672 s->data[s->endp++] = (uint8_t)l;
673
674 return 4;
675 }
676
677 /* Put quad word to the stream. */
678 int stream_putq(struct stream *s, uint64_t q)
679 {
680 STREAM_VERIFY_SANE(s);
681
682 if (STREAM_WRITEABLE(s) < sizeof(uint64_t)) {
683 STREAM_BOUND_WARN(s, "put quad");
684 return 0;
685 }
686
687 s->data[s->endp++] = (uint8_t)(q >> 56);
688 s->data[s->endp++] = (uint8_t)(q >> 48);
689 s->data[s->endp++] = (uint8_t)(q >> 40);
690 s->data[s->endp++] = (uint8_t)(q >> 32);
691 s->data[s->endp++] = (uint8_t)(q >> 24);
692 s->data[s->endp++] = (uint8_t)(q >> 16);
693 s->data[s->endp++] = (uint8_t)(q >> 8);
694 s->data[s->endp++] = (uint8_t)q;
695
696 return 8;
697 }
698
/* Append the bit pattern of a float to the stream through stream_putl() and
 * return that function's result.
 */
int stream_putf(struct stream *s, float f)
{
	union {
		float value;
		uint32_t bits;
	} conv;

	conv.value = f;

	return stream_putl(s, conv.bits);
}
708
/* Append the bit pattern of a double to the stream through stream_putq()
 * and return that function's result.
 */
int stream_putd(struct stream *s, double d)
{
	union {
		double value;
		uint64_t bits;
	} conv;

	conv.value = d;

	return stream_putq(s, conv.bits);
}
718
719 int stream_putc_at(struct stream *s, size_t putp, uint8_t c)
720 {
721 STREAM_VERIFY_SANE(s);
722
723 if (!PUT_AT_VALID(s, putp + sizeof(uint8_t))) {
724 STREAM_BOUND_WARN(s, "put");
725 return 0;
726 }
727
728 s->data[putp] = c;
729
730 return 1;
731 }
732
733 int stream_putw_at(struct stream *s, size_t putp, uint16_t w)
734 {
735 STREAM_VERIFY_SANE(s);
736
737 if (!PUT_AT_VALID(s, putp + sizeof(uint16_t))) {
738 STREAM_BOUND_WARN(s, "put");
739 return 0;
740 }
741
742 s->data[putp] = (uint8_t)(w >> 8);
743 s->data[putp + 1] = (uint8_t)w;
744
745 return 2;
746 }
747
748 int stream_put3_at(struct stream *s, size_t putp, uint32_t l)
749 {
750 STREAM_VERIFY_SANE(s);
751
752 if (!PUT_AT_VALID(s, putp + 3)) {
753 STREAM_BOUND_WARN(s, "put");
754 return 0;
755 }
756 s->data[putp] = (uint8_t)(l >> 16);
757 s->data[putp + 1] = (uint8_t)(l >> 8);
758 s->data[putp + 2] = (uint8_t)l;
759
760 return 3;
761 }
762
763 int stream_putl_at(struct stream *s, size_t putp, uint32_t l)
764 {
765 STREAM_VERIFY_SANE(s);
766
767 if (!PUT_AT_VALID(s, putp + sizeof(uint32_t))) {
768 STREAM_BOUND_WARN(s, "put");
769 return 0;
770 }
771 s->data[putp] = (uint8_t)(l >> 24);
772 s->data[putp + 1] = (uint8_t)(l >> 16);
773 s->data[putp + 2] = (uint8_t)(l >> 8);
774 s->data[putp + 3] = (uint8_t)l;
775
776 return 4;
777 }
778
779 int stream_putq_at(struct stream *s, size_t putp, uint64_t q)
780 {
781 STREAM_VERIFY_SANE(s);
782
783 if (!PUT_AT_VALID(s, putp + sizeof(uint64_t))) {
784 STREAM_BOUND_WARN(s, "put");
785 return 0;
786 }
787 s->data[putp] = (uint8_t)(q >> 56);
788 s->data[putp + 1] = (uint8_t)(q >> 48);
789 s->data[putp + 2] = (uint8_t)(q >> 40);
790 s->data[putp + 3] = (uint8_t)(q >> 32);
791 s->data[putp + 4] = (uint8_t)(q >> 24);
792 s->data[putp + 5] = (uint8_t)(q >> 16);
793 s->data[putp + 6] = (uint8_t)(q >> 8);
794 s->data[putp + 7] = (uint8_t)q;
795
796 return 8;
797 }
798
799 /* Put long word to the stream. */
800 int stream_put_ipv4(struct stream *s, uint32_t l)
801 {
802 STREAM_VERIFY_SANE(s);
803
804 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
805 STREAM_BOUND_WARN(s, "put");
806 return 0;
807 }
808 memcpy(s->data + s->endp, &l, sizeof(uint32_t));
809 s->endp += sizeof(uint32_t);
810
811 return sizeof(uint32_t);
812 }
813
814 /* Put long word to the stream. */
815 int stream_put_in_addr(struct stream *s, const struct in_addr *addr)
816 {
817 STREAM_VERIFY_SANE(s);
818
819 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
820 STREAM_BOUND_WARN(s, "put");
821 return 0;
822 }
823
824 memcpy(s->data + s->endp, addr, sizeof(uint32_t));
825 s->endp += sizeof(uint32_t);
826
827 return sizeof(uint32_t);
828 }
829
830 /* Put in_addr at location in the stream. */
831 int stream_put_in_addr_at(struct stream *s, size_t putp,
832 const struct in_addr *addr)
833 {
834 STREAM_VERIFY_SANE(s);
835
836 if (!PUT_AT_VALID(s, putp + 4)) {
837 STREAM_BOUND_WARN(s, "put");
838 return 0;
839 }
840
841 memcpy(&s->data[putp], addr, 4);
842 return 4;
843 }
844
845 /* Put in6_addr at location in the stream. */
846 int stream_put_in6_addr_at(struct stream *s, size_t putp,
847 const struct in6_addr *addr)
848 {
849 STREAM_VERIFY_SANE(s);
850
851 if (!PUT_AT_VALID(s, putp + 16)) {
852 STREAM_BOUND_WARN(s, "put");
853 return 0;
854 }
855
856 memcpy(&s->data[putp], addr, 16);
857 return 16;
858 }
859
860 /* Put prefix by nlri type format. */
861 int stream_put_prefix_addpath(struct stream *s, const struct prefix *p,
862 int addpath_encode, uint32_t addpath_tx_id)
863 {
864 size_t psize;
865 size_t psize_with_addpath;
866
867 STREAM_VERIFY_SANE(s);
868
869 psize = PSIZE(p->prefixlen);
870
871 if (addpath_encode)
872 psize_with_addpath = psize + 4;
873 else
874 psize_with_addpath = psize;
875
876 if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) {
877 STREAM_BOUND_WARN(s, "put");
878 return 0;
879 }
880
881 if (addpath_encode) {
882 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
883 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
884 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
885 s->data[s->endp++] = (uint8_t)addpath_tx_id;
886 }
887
888 s->data[s->endp++] = p->prefixlen;
889 memcpy(s->data + s->endp, &p->u.prefix, psize);
890 s->endp += psize;
891
892 return psize;
893 }
894
/* Append a prefix without an addpath id: forwards to
 * stream_put_prefix_addpath() with addpath encoding switched off and returns
 * its result.
 */
int stream_put_prefix(struct stream *s, const struct prefix *p)
{
	int written;

	written = stream_put_prefix_addpath(s, p, 0, 0);

	return written;
}
899
900 /* Put NLRI with label */
901 int stream_put_labeled_prefix(struct stream *s, struct prefix *p,
902 mpls_label_t *label, int addpath_encode,
903 uint32_t addpath_tx_id)
904 {
905 size_t psize;
906 size_t psize_with_addpath;
907 uint8_t *label_pnt = (uint8_t *)label;
908
909 STREAM_VERIFY_SANE(s);
910
911 psize = PSIZE(p->prefixlen);
912 psize_with_addpath = psize + (addpath_encode ? 4 : 0);
913
914 if (STREAM_WRITEABLE(s) < (psize_with_addpath + 3)) {
915 STREAM_BOUND_WARN(s, "put");
916 return 0;
917 }
918
919 if (addpath_encode) {
920 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
921 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
922 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
923 s->data[s->endp++] = (uint8_t)addpath_tx_id;
924 }
925
926 stream_putc(s, (p->prefixlen + 24));
927 stream_putc(s, label_pnt[0]);
928 stream_putc(s, label_pnt[1]);
929 stream_putc(s, label_pnt[2]);
930 memcpy(s->data + s->endp, &p->u.prefix, psize);
931 s->endp += psize;
932
933 return (psize + 3);
934 }
935
936 /* Read size from fd. */
937 int stream_read(struct stream *s, int fd, size_t size)
938 {
939 int nbytes;
940
941 STREAM_VERIFY_SANE(s);
942
943 if (STREAM_WRITEABLE(s) < size) {
944 STREAM_BOUND_WARN(s, "put");
945 return 0;
946 }
947
948 nbytes = readn(fd, s->data + s->endp, size);
949
950 if (nbytes > 0)
951 s->endp += nbytes;
952
953 return nbytes;
954 }
955
956 ssize_t stream_read_try(struct stream *s, int fd, size_t size)
957 {
958 ssize_t nbytes;
959
960 STREAM_VERIFY_SANE(s);
961
962 if (STREAM_WRITEABLE(s) < size) {
963 STREAM_BOUND_WARN(s, "put");
964 /* Fatal (not transient) error, since retrying will not help
965 (stream is too small to contain the desired data). */
966 return -1;
967 }
968
969 if ((nbytes = read(fd, s->data + s->endp, size)) >= 0) {
970 s->endp += nbytes;
971 return nbytes;
972 }
973 /* Error: was it transient (return -2) or fatal (return -1)? */
974 if (ERRNO_IO_RETRY(errno))
975 return -2;
976 flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
977 safe_strerror(errno));
978 return -1;
979 }
980
981 /* Read up to size bytes into the stream from the fd, using recvmsgfrom
982 * whose arguments match the remaining arguments to this function
983 */
984 ssize_t stream_recvfrom(struct stream *s, int fd, size_t size, int flags,
985 struct sockaddr *from, socklen_t *fromlen)
986 {
987 ssize_t nbytes;
988
989 STREAM_VERIFY_SANE(s);
990
991 if (STREAM_WRITEABLE(s) < size) {
992 STREAM_BOUND_WARN(s, "put");
993 /* Fatal (not transient) error, since retrying will not help
994 (stream is too small to contain the desired data). */
995 return -1;
996 }
997
998 if ((nbytes = recvfrom(fd, s->data + s->endp, size, flags, from,
999 fromlen))
1000 >= 0) {
1001 s->endp += nbytes;
1002 return nbytes;
1003 }
1004 /* Error: was it transient (return -2) or fatal (return -1)? */
1005 if (ERRNO_IO_RETRY(errno))
1006 return -2;
1007 flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
1008 safe_strerror(errno));
1009 return -1;
1010 }
1011
1012 /* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
1013 * from endp.
1014 * First iovec will be used to receive the data.
1015 * Stream need not be empty.
1016 */
1017 ssize_t stream_recvmsg(struct stream *s, int fd, struct msghdr *msgh, int flags,
1018 size_t size)
1019 {
1020 int nbytes;
1021 struct iovec *iov;
1022
1023 STREAM_VERIFY_SANE(s);
1024 assert(msgh->msg_iovlen > 0);
1025
1026 if (STREAM_WRITEABLE(s) < size) {
1027 STREAM_BOUND_WARN(s, "put");
1028 /* This is a logic error in the calling code: the stream is too
1029 small
1030 to hold the desired data! */
1031 return -1;
1032 }
1033
1034 iov = &(msgh->msg_iov[0]);
1035 iov->iov_base = (s->data + s->endp);
1036 iov->iov_len = size;
1037
1038 nbytes = recvmsg(fd, msgh, flags);
1039
1040 if (nbytes > 0)
1041 s->endp += nbytes;
1042
1043 return nbytes;
1044 }
1045
1046 /* Write data to buffer. */
1047 size_t stream_write(struct stream *s, const void *ptr, size_t size)
1048 {
1049
1050 CHECK_SIZE(s, size);
1051
1052 STREAM_VERIFY_SANE(s);
1053
1054 if (STREAM_WRITEABLE(s) < size) {
1055 STREAM_BOUND_WARN(s, "put");
1056 return 0;
1057 }
1058
1059 memcpy(s->data + s->endp, ptr, size);
1060 s->endp += size;
1061
1062 return size;
1063 }
1064
1065 /* Return current read pointer.
1066 * DEPRECATED!
1067 * Use stream_get_pnt_to if you must, but decoding streams properly
1068 * is preferred
1069 */
1070 uint8_t *stream_pnt(struct stream *s)
1071 {
1072 STREAM_VERIFY_SANE(s);
1073 return s->data + s->getp;
1074 }
1075
1076 /* Check does this stream empty? */
1077 int stream_empty(struct stream *s)
1078 {
1079 STREAM_VERIFY_SANE(s);
1080
1081 return (s->endp == 0);
1082 }
1083
1084 /* Reset stream. */
1085 void stream_reset(struct stream *s)
1086 {
1087 STREAM_VERIFY_SANE(s);
1088
1089 s->getp = s->endp = 0;
1090 }
1091
1092 /* Write stream contens to the file discriptor. */
1093 int stream_flush(struct stream *s, int fd)
1094 {
1095 int nbytes;
1096
1097 STREAM_VERIFY_SANE(s);
1098
1099 nbytes = write(fd, s->data + s->getp, s->endp - s->getp);
1100
1101 return nbytes;
1102 }
1103
1104 /* Stream first in first out queue. */
1105
1106 struct stream_fifo *stream_fifo_new(void)
1107 {
1108 struct stream_fifo *new;
1109
1110 new = XCALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo));
1111 pthread_mutex_init(&new->mtx, NULL);
1112 return new;
1113 }
1114
1115 /* Add new stream to fifo. */
1116 void stream_fifo_push(struct stream_fifo *fifo, struct stream *s)
1117 {
1118 #if defined DEV_BUILD
1119 size_t max, curmax;
1120 #endif
1121
1122 if (fifo->tail)
1123 fifo->tail->next = s;
1124 else
1125 fifo->head = s;
1126
1127 fifo->tail = s;
1128 fifo->tail->next = NULL;
1129 #if !defined DEV_BUILD
1130 atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1131 #else
1132 max = atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1133 curmax = atomic_load_explicit(&fifo->max_count, memory_order_relaxed);
1134 if (max > curmax)
1135 atomic_store_explicit(&fifo->max_count, max,
1136 memory_order_relaxed);
1137 #endif
1138 }
1139
1140 void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s)
1141 {
1142 frr_with_mutex(&fifo->mtx) {
1143 stream_fifo_push(fifo, s);
1144 }
1145 }
1146
1147 /* Delete first stream from fifo. */
1148 struct stream *stream_fifo_pop(struct stream_fifo *fifo)
1149 {
1150 struct stream *s;
1151
1152 s = fifo->head;
1153
1154 if (s) {
1155 fifo->head = s->next;
1156
1157 if (fifo->head == NULL)
1158 fifo->tail = NULL;
1159
1160 atomic_fetch_sub_explicit(&fifo->count, 1,
1161 memory_order_release);
1162
1163 /* ensure stream is scrubbed of references to this fifo */
1164 s->next = NULL;
1165 }
1166
1167 return s;
1168 }
1169
1170 struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo)
1171 {
1172 struct stream *ret;
1173
1174 frr_with_mutex(&fifo->mtx) {
1175 ret = stream_fifo_pop(fifo);
1176 }
1177
1178 return ret;
1179 }
1180
1181 struct stream *stream_fifo_head(struct stream_fifo *fifo)
1182 {
1183 return fifo->head;
1184 }
1185
1186 struct stream *stream_fifo_head_safe(struct stream_fifo *fifo)
1187 {
1188 struct stream *ret;
1189
1190 frr_with_mutex(&fifo->mtx) {
1191 ret = stream_fifo_head(fifo);
1192 }
1193
1194 return ret;
1195 }
1196
1197 void stream_fifo_clean(struct stream_fifo *fifo)
1198 {
1199 struct stream *s;
1200 struct stream *next;
1201
1202 for (s = fifo->head; s; s = next) {
1203 next = s->next;
1204 stream_free(s);
1205 }
1206 fifo->head = fifo->tail = NULL;
1207 atomic_store_explicit(&fifo->count, 0, memory_order_release);
1208 }
1209
1210 void stream_fifo_clean_safe(struct stream_fifo *fifo)
1211 {
1212 frr_with_mutex(&fifo->mtx) {
1213 stream_fifo_clean(fifo);
1214 }
1215 }
1216
1217 size_t stream_fifo_count_safe(struct stream_fifo *fifo)
1218 {
1219 return atomic_load_explicit(&fifo->count, memory_order_acquire);
1220 }
1221
1222 void stream_fifo_free(struct stream_fifo *fifo)
1223 {
1224 stream_fifo_clean(fifo);
1225 pthread_mutex_destroy(&fifo->mtx);
1226 XFREE(MTYPE_STREAM_FIFO, fifo);
1227 }