]> git.proxmox.com Git - mirror_frr.git/blob - lib/stream.c
Merge pull request #2609 from pacovn/clang_scan_nhrpd_vici_initialization
[mirror_frr.git] / lib / stream.c
1 /*
2 * Packet interface
3 * Copyright (C) 1999 Kunihiro Ishiguro
4 *
5 * This file is part of GNU Zebra.
6 *
7 * GNU Zebra is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2, or (at your option) any
10 * later version.
11 *
12 * GNU Zebra is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <zebra.h>
23 #include <stddef.h>
24 #include <pthread.h>
25
26 #include "stream.h"
27 #include "memory.h"
28 #include "network.h"
29 #include "prefix.h"
30 #include "log.h"
31
32 DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream")
33 DEFINE_MTYPE_STATIC(LIB, STREAM_DATA, "Stream data")
34 DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO")
35
/*
 * Position validity tests.
 *
 * GETP_VALID:   position G does not lie beyond the stream's endp.
 * PUT_AT_VALID: same test as GETP_VALID, used by the ..._at() writers.
 * ENDP_VALID:   position E does not lie beyond the stream's size.
 *
 * The arguments are evaluated as written; callers passing a sum such as
 * (pos + len) are responsible for that sum not wrapping around.
 */
#define GETP_VALID(S, G) ((G) <= (S)->endp)
#define PUT_AT_VALID(S,G) GETP_VALID(S,G)
#define ENDP_VALID(S, E) ((E) <= (S)->size)
40
/* Asserting sanity checks.
 *
 * The following must always be true of the stream elements, both
 * before and after any call to a stream function:
 *
 * getp <= endp <= size
 *
 * Note that after a stream function is called the following may be true:
 * if (getp == endp) then the stream is no longer readable
 * if (endp == size) then the stream is no longer writeable
 *
 * Writing at an arbitrary position (rather than appending at endp) is
 * only possible through the stream_put..._at() functions.
 */
/* Log (via zlog_warn) the address of stream S together with its size,
 * getp and endp values, each cast to unsigned long for printing.
 */
#define STREAM_WARN_OFFSETS(S) \
	zlog_warn("&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n", \
		  (void *)(S), (unsigned long)(S)->size, \
		  (unsigned long)(S)->getp, (unsigned long)(S)->endp)
60
/* Verify that stream S satisfies getp <= endp and endp <= size.
 * If either test fails the offsets are logged first, then the matching
 * assert() fires (when assertions are compiled in).
 */
#define STREAM_VERIFY_SANE(S) \
	do { \
		if (!(GETP_VALID(S, (S)->getp) && ENDP_VALID(S, (S)->endp))) \
			STREAM_WARN_OFFSETS(S); \
		assert(GETP_VALID(S, (S)->getp)); \
		assert(ENDP_VALID(S, (S)->endp)); \
	} while (0)
68
/* Report an out-of-bounds access attempt on stream S: log the calling
 * function's name and the operation description WHAT, log the stream
 * offsets, then hit assert(0). Code following this macro is therefore
 * only reached when assertions are compiled out.
 */
#define STREAM_BOUND_WARN(S, WHAT) \
	do { \
		zlog_warn("%s: Attempt to %s out of bounds", __func__, \
			  (WHAT)); \
		STREAM_WARN_OFFSETS(S); \
		assert(0); \
	} while (0)
76
/* Non-fatal variant of the bounds warning: logs the calling function's
 * name, the operation description WHAT and the stream offsets, but does
 * not assert. Used by the bool-returning getters, which report the
 * failure to their caller instead.
 */
#define STREAM_BOUND_WARN2(S, WHAT) \
	do { \
		zlog_warn("%s: Attempt to %s out of bounds", __func__, \
			  (WHAT)); \
		STREAM_WARN_OFFSETS(S); \
	} while (0)
83
/* XXX: Deprecated macro: do not use.
 *
 * If appending Z bytes at endp would exceed the stream's size, log a
 * warning and truncate Z (which must be a modifiable lvalue) to the
 * space that remains.
 *
 * The expansion deliberately does not end in a semicolon: the caller
 * supplies it, so that "CHECK_SIZE(s, n);" is exactly one statement and
 * remains safe inside an un-braced if/else.
 */
#define CHECK_SIZE(S, Z) \
	do { \
		if (((S)->endp + (Z)) > (S)->size) { \
			zlog_warn( \
				"CHECK_SIZE: truncating requested size %lu\n", \
				(unsigned long)(Z)); \
			STREAM_WARN_OFFSETS(S); \
			(Z) = (S)->size - (S)->endp; \
		} \
	} while (0)
95
96 /* Make stream buffer. */
97 struct stream *stream_new(size_t size)
98 {
99 struct stream *s;
100
101 assert(size > 0);
102
103 s = XCALLOC(MTYPE_STREAM, sizeof(struct stream));
104
105 if (s == NULL)
106 return s;
107
108 if ((s->data = XMALLOC(MTYPE_STREAM_DATA, size)) == NULL) {
109 XFREE(MTYPE_STREAM, s);
110 return NULL;
111 }
112
113 s->size = size;
114 return s;
115 }
116
117 /* Free it now. */
118 void stream_free(struct stream *s)
119 {
120 if (!s)
121 return;
122
123 XFREE(MTYPE_STREAM_DATA, s->data);
124 XFREE(MTYPE_STREAM, s);
125 }
126
127 struct stream *stream_copy(struct stream *new, struct stream *src)
128 {
129 STREAM_VERIFY_SANE(src);
130
131 assert(new != NULL);
132 assert(STREAM_SIZE(new) >= src->endp);
133
134 new->endp = src->endp;
135 new->getp = src->getp;
136
137 memcpy(new->data, src->data, src->endp);
138
139 return new;
140 }
141
142 struct stream *stream_dup(struct stream *s)
143 {
144 struct stream *new;
145
146 STREAM_VERIFY_SANE(s);
147
148 if ((new = stream_new(s->endp)) == NULL)
149 return NULL;
150
151 return (stream_copy(new, s));
152 }
153
154 struct stream *stream_dupcat(struct stream *s1, struct stream *s2,
155 size_t offset)
156 {
157 struct stream *new;
158
159 STREAM_VERIFY_SANE(s1);
160 STREAM_VERIFY_SANE(s2);
161
162 if ((new = stream_new(s1->endp + s2->endp)) == NULL)
163 return NULL;
164
165 memcpy(new->data, s1->data, offset);
166 memcpy(new->data + offset, s2->data, s2->endp);
167 memcpy(new->data + offset + s2->endp, s1->data + offset,
168 (s1->endp - offset));
169 new->endp = s1->endp + s2->endp;
170 return new;
171 }
172
173 size_t stream_resize(struct stream *s, size_t newsize)
174 {
175 uint8_t *newdata;
176 STREAM_VERIFY_SANE(s);
177
178 newdata = XREALLOC(MTYPE_STREAM_DATA, s->data, newsize);
179
180 if (newdata == NULL)
181 return s->size;
182
183 s->data = newdata;
184 s->size = newsize;
185
186 if (s->endp > s->size)
187 s->endp = s->size;
188 if (s->getp > s->endp)
189 s->getp = s->endp;
190
191 STREAM_VERIFY_SANE(s);
192
193 return s->size;
194 }
195
196 size_t stream_get_getp(struct stream *s)
197 {
198 STREAM_VERIFY_SANE(s);
199 return s->getp;
200 }
201
202 size_t stream_get_endp(struct stream *s)
203 {
204 STREAM_VERIFY_SANE(s);
205 return s->endp;
206 }
207
208 size_t stream_get_size(struct stream *s)
209 {
210 STREAM_VERIFY_SANE(s);
211 return s->size;
212 }
213
214 /* Stream structre' stream pointer related functions. */
215 void stream_set_getp(struct stream *s, size_t pos)
216 {
217 STREAM_VERIFY_SANE(s);
218
219 if (!GETP_VALID(s, pos)) {
220 STREAM_BOUND_WARN(s, "set getp");
221 pos = s->endp;
222 }
223
224 s->getp = pos;
225 }
226
227 void stream_set_endp(struct stream *s, size_t pos)
228 {
229 STREAM_VERIFY_SANE(s);
230
231 if (!ENDP_VALID(s, pos)) {
232 STREAM_BOUND_WARN(s, "set endp");
233 return;
234 }
235
236 /*
237 * Make sure the current read pointer is not beyond the new endp.
238 */
239 if (s->getp > pos) {
240 STREAM_BOUND_WARN(s, "set endp");
241 return;
242 }
243
244 s->endp = pos;
245 STREAM_VERIFY_SANE(s);
246 }
247
248 /* Forward pointer. */
249 void stream_forward_getp(struct stream *s, size_t size)
250 {
251 STREAM_VERIFY_SANE(s);
252
253 if (!GETP_VALID(s, s->getp + size)) {
254 STREAM_BOUND_WARN(s, "seek getp");
255 return;
256 }
257
258 s->getp += size;
259 }
260
261 void stream_forward_endp(struct stream *s, size_t size)
262 {
263 STREAM_VERIFY_SANE(s);
264
265 if (!ENDP_VALID(s, s->endp + size)) {
266 STREAM_BOUND_WARN(s, "seek endp");
267 return;
268 }
269
270 s->endp += size;
271 }
272
273 /* Copy from stream to destination. */
274 inline bool stream_get2(void *dst, struct stream *s, size_t size)
275 {
276 STREAM_VERIFY_SANE(s);
277
278 if (STREAM_READABLE(s) < size) {
279 STREAM_BOUND_WARN2(s, "get");
280 return false;
281 }
282
283 memcpy(dst, s->data + s->getp, size);
284 s->getp += size;
285
286 return true;
287 }
288
289 void stream_get(void *dst, struct stream *s, size_t size)
290 {
291 STREAM_VERIFY_SANE(s);
292
293 if (STREAM_READABLE(s) < size) {
294 STREAM_BOUND_WARN(s, "get");
295 return;
296 }
297
298 memcpy(dst, s->data + s->getp, size);
299 s->getp += size;
300 }
301
302 /* Get next character from the stream. */
303 inline bool stream_getc2(struct stream *s, uint8_t *byte)
304 {
305 STREAM_VERIFY_SANE(s);
306
307 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
308 STREAM_BOUND_WARN2(s, "get char");
309 return false;
310 }
311 *byte = s->data[s->getp++];
312
313 return true;
314 }
315
316 uint8_t stream_getc(struct stream *s)
317 {
318 uint8_t c;
319
320 STREAM_VERIFY_SANE(s);
321
322 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
323 STREAM_BOUND_WARN(s, "get char");
324 return 0;
325 }
326 c = s->data[s->getp++];
327
328 return c;
329 }
330
331 /* Get next character from the stream. */
332 uint8_t stream_getc_from(struct stream *s, size_t from)
333 {
334 uint8_t c;
335
336 STREAM_VERIFY_SANE(s);
337
338 if (!GETP_VALID(s, from + sizeof(uint8_t))) {
339 STREAM_BOUND_WARN(s, "get char");
340 return 0;
341 }
342
343 c = s->data[from];
344
345 return c;
346 }
347
348 inline bool stream_getw2(struct stream *s, uint16_t *word)
349 {
350 STREAM_VERIFY_SANE(s);
351
352 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
353 STREAM_BOUND_WARN2(s, "get ");
354 return false;
355 }
356
357 *word = s->data[s->getp++] << 8;
358 *word |= s->data[s->getp++];
359
360 return true;
361 }
362
363 /* Get next word from the stream. */
364 uint16_t stream_getw(struct stream *s)
365 {
366 uint16_t w;
367
368 STREAM_VERIFY_SANE(s);
369
370 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
371 STREAM_BOUND_WARN(s, "get ");
372 return 0;
373 }
374
375 w = s->data[s->getp++] << 8;
376 w |= s->data[s->getp++];
377
378 return w;
379 }
380
381 /* Get next word from the stream. */
382 uint16_t stream_getw_from(struct stream *s, size_t from)
383 {
384 uint16_t w;
385
386 STREAM_VERIFY_SANE(s);
387
388 if (!GETP_VALID(s, from + sizeof(uint16_t))) {
389 STREAM_BOUND_WARN(s, "get ");
390 return 0;
391 }
392
393 w = s->data[from++] << 8;
394 w |= s->data[from];
395
396 return w;
397 }
398
399 /* Get next 3-byte from the stream. */
400 uint32_t stream_get3_from(struct stream *s, size_t from)
401 {
402 uint32_t l;
403
404 STREAM_VERIFY_SANE(s);
405
406 if (!GETP_VALID(s, from + 3)) {
407 STREAM_BOUND_WARN(s, "get 3byte");
408 return 0;
409 }
410
411 l = s->data[from++] << 16;
412 l |= s->data[from++] << 8;
413 l |= s->data[from];
414
415 return l;
416 }
417
418 uint32_t stream_get3(struct stream *s)
419 {
420 uint32_t l;
421
422 STREAM_VERIFY_SANE(s);
423
424 if (STREAM_READABLE(s) < 3) {
425 STREAM_BOUND_WARN(s, "get 3byte");
426 return 0;
427 }
428
429 l = s->data[s->getp++] << 16;
430 l |= s->data[s->getp++] << 8;
431 l |= s->data[s->getp++];
432
433 return l;
434 }
435
436 /* Get next long word from the stream. */
437 uint32_t stream_getl_from(struct stream *s, size_t from)
438 {
439 uint32_t l;
440
441 STREAM_VERIFY_SANE(s);
442
443 if (!GETP_VALID(s, from + sizeof(uint32_t))) {
444 STREAM_BOUND_WARN(s, "get long");
445 return 0;
446 }
447
448 l = (unsigned)(s->data[from++]) << 24;
449 l |= s->data[from++] << 16;
450 l |= s->data[from++] << 8;
451 l |= s->data[from];
452
453 return l;
454 }
455
456 /* Copy from stream at specific location to destination. */
457 void stream_get_from(void *dst, struct stream *s, size_t from, size_t size)
458 {
459 STREAM_VERIFY_SANE(s);
460
461 if (!GETP_VALID(s, from + size)) {
462 STREAM_BOUND_WARN(s, "get from");
463 return;
464 }
465
466 memcpy(dst, s->data + from, size);
467 }
468
469 inline bool stream_getl2(struct stream *s, uint32_t *l)
470 {
471 STREAM_VERIFY_SANE(s);
472
473 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
474 STREAM_BOUND_WARN2(s, "get long");
475 return false;
476 }
477
478 *l = (unsigned int)(s->data[s->getp++]) << 24;
479 *l |= s->data[s->getp++] << 16;
480 *l |= s->data[s->getp++] << 8;
481 *l |= s->data[s->getp++];
482
483 return true;
484 }
485
486 uint32_t stream_getl(struct stream *s)
487 {
488 uint32_t l;
489
490 STREAM_VERIFY_SANE(s);
491
492 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
493 STREAM_BOUND_WARN(s, "get long");
494 return 0;
495 }
496
497 l = (unsigned)(s->data[s->getp++]) << 24;
498 l |= s->data[s->getp++] << 16;
499 l |= s->data[s->getp++] << 8;
500 l |= s->data[s->getp++];
501
502 return l;
503 }
504
505 /* Get next quad word from the stream. */
506 uint64_t stream_getq_from(struct stream *s, size_t from)
507 {
508 uint64_t q;
509
510 STREAM_VERIFY_SANE(s);
511
512 if (!GETP_VALID(s, from + sizeof(uint64_t))) {
513 STREAM_BOUND_WARN(s, "get quad");
514 return 0;
515 }
516
517 q = ((uint64_t)s->data[from++]) << 56;
518 q |= ((uint64_t)s->data[from++]) << 48;
519 q |= ((uint64_t)s->data[from++]) << 40;
520 q |= ((uint64_t)s->data[from++]) << 32;
521 q |= ((uint64_t)s->data[from++]) << 24;
522 q |= ((uint64_t)s->data[from++]) << 16;
523 q |= ((uint64_t)s->data[from++]) << 8;
524 q |= ((uint64_t)s->data[from++]);
525
526 return q;
527 }
528
529 uint64_t stream_getq(struct stream *s)
530 {
531 uint64_t q;
532
533 STREAM_VERIFY_SANE(s);
534
535 if (STREAM_READABLE(s) < sizeof(uint64_t)) {
536 STREAM_BOUND_WARN(s, "get quad");
537 return 0;
538 }
539
540 q = ((uint64_t)s->data[s->getp++]) << 56;
541 q |= ((uint64_t)s->data[s->getp++]) << 48;
542 q |= ((uint64_t)s->data[s->getp++]) << 40;
543 q |= ((uint64_t)s->data[s->getp++]) << 32;
544 q |= ((uint64_t)s->data[s->getp++]) << 24;
545 q |= ((uint64_t)s->data[s->getp++]) << 16;
546 q |= ((uint64_t)s->data[s->getp++]) << 8;
547 q |= ((uint64_t)s->data[s->getp++]);
548
549 return q;
550 }
551
552 /* Get next long word from the stream. */
553 uint32_t stream_get_ipv4(struct stream *s)
554 {
555 uint32_t l;
556
557 STREAM_VERIFY_SANE(s);
558
559 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
560 STREAM_BOUND_WARN(s, "get ipv4");
561 return 0;
562 }
563
564 memcpy(&l, s->data + s->getp, sizeof(uint32_t));
565 s->getp += sizeof(uint32_t);
566
567 return l;
568 }
569
/* Read a 32-bit value with stream_getl() and return its bit pattern
 * reinterpreted as a float.
 */
float stream_getf(struct stream *s)
{
	union {
		float as_float;
		uint32_t as_bits;
	} conv;

	conv.as_bits = stream_getl(s);

	return conv.as_float;
}
579
/* Read a 64-bit value with stream_getq() and return its bit pattern
 * reinterpreted as a double.
 */
double stream_getd(struct stream *s)
{
	union {
		double as_double;
		uint64_t as_bits;
	} conv;

	conv.as_bits = stream_getq(s);

	return conv.as_double;
}
589
590 /* Copy to source to stream.
591 *
592 * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
593 * around. This should be fixed once the stream updates are working.
594 *
595 * stream_write() is saner
596 */
597 void stream_put(struct stream *s, const void *src, size_t size)
598 {
599
600 /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
601 CHECK_SIZE(s, size);
602
603 STREAM_VERIFY_SANE(s);
604
605 if (STREAM_WRITEABLE(s) < size) {
606 STREAM_BOUND_WARN(s, "put");
607 return;
608 }
609
610 if (src)
611 memcpy(s->data + s->endp, src, size);
612 else
613 memset(s->data + s->endp, 0, size);
614
615 s->endp += size;
616 }
617
618 /* Put character to the stream. */
619 int stream_putc(struct stream *s, uint8_t c)
620 {
621 STREAM_VERIFY_SANE(s);
622
623 if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) {
624 STREAM_BOUND_WARN(s, "put");
625 return 0;
626 }
627
628 s->data[s->endp++] = c;
629 return sizeof(uint8_t);
630 }
631
632 /* Put word to the stream. */
633 int stream_putw(struct stream *s, uint16_t w)
634 {
635 STREAM_VERIFY_SANE(s);
636
637 if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) {
638 STREAM_BOUND_WARN(s, "put");
639 return 0;
640 }
641
642 s->data[s->endp++] = (uint8_t)(w >> 8);
643 s->data[s->endp++] = (uint8_t)w;
644
645 return 2;
646 }
647
648 /* Put long word to the stream. */
649 int stream_put3(struct stream *s, uint32_t l)
650 {
651 STREAM_VERIFY_SANE(s);
652
653 if (STREAM_WRITEABLE(s) < 3) {
654 STREAM_BOUND_WARN(s, "put");
655 return 0;
656 }
657
658 s->data[s->endp++] = (uint8_t)(l >> 16);
659 s->data[s->endp++] = (uint8_t)(l >> 8);
660 s->data[s->endp++] = (uint8_t)l;
661
662 return 3;
663 }
664
665 /* Put long word to the stream. */
666 int stream_putl(struct stream *s, uint32_t l)
667 {
668 STREAM_VERIFY_SANE(s);
669
670 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
671 STREAM_BOUND_WARN(s, "put");
672 return 0;
673 }
674
675 s->data[s->endp++] = (uint8_t)(l >> 24);
676 s->data[s->endp++] = (uint8_t)(l >> 16);
677 s->data[s->endp++] = (uint8_t)(l >> 8);
678 s->data[s->endp++] = (uint8_t)l;
679
680 return 4;
681 }
682
683 /* Put quad word to the stream. */
684 int stream_putq(struct stream *s, uint64_t q)
685 {
686 STREAM_VERIFY_SANE(s);
687
688 if (STREAM_WRITEABLE(s) < sizeof(uint64_t)) {
689 STREAM_BOUND_WARN(s, "put quad");
690 return 0;
691 }
692
693 s->data[s->endp++] = (uint8_t)(q >> 56);
694 s->data[s->endp++] = (uint8_t)(q >> 48);
695 s->data[s->endp++] = (uint8_t)(q >> 40);
696 s->data[s->endp++] = (uint8_t)(q >> 32);
697 s->data[s->endp++] = (uint8_t)(q >> 24);
698 s->data[s->endp++] = (uint8_t)(q >> 16);
699 s->data[s->endp++] = (uint8_t)(q >> 8);
700 s->data[s->endp++] = (uint8_t)q;
701
702 return 8;
703 }
704
/* Append the bit pattern of float `f` as a 32-bit value via
 * stream_putl(); returns whatever stream_putl() returns.
 */
int stream_putf(struct stream *s, float f)
{
	union {
		float as_float;
		uint32_t as_bits;
	} conv;

	conv.as_float = f;

	return stream_putl(s, conv.as_bits);
}
714
/* Append the bit pattern of double `d` as a 64-bit value via
 * stream_putq(); returns whatever stream_putq() returns.
 */
int stream_putd(struct stream *s, double d)
{
	union {
		double as_double;
		uint64_t as_bits;
	} conv;

	conv.as_double = d;

	return stream_putq(s, conv.as_bits);
}
724
725 int stream_putc_at(struct stream *s, size_t putp, uint8_t c)
726 {
727 STREAM_VERIFY_SANE(s);
728
729 if (!PUT_AT_VALID(s, putp + sizeof(uint8_t))) {
730 STREAM_BOUND_WARN(s, "put");
731 return 0;
732 }
733
734 s->data[putp] = c;
735
736 return 1;
737 }
738
739 int stream_putw_at(struct stream *s, size_t putp, uint16_t w)
740 {
741 STREAM_VERIFY_SANE(s);
742
743 if (!PUT_AT_VALID(s, putp + sizeof(uint16_t))) {
744 STREAM_BOUND_WARN(s, "put");
745 return 0;
746 }
747
748 s->data[putp] = (uint8_t)(w >> 8);
749 s->data[putp + 1] = (uint8_t)w;
750
751 return 2;
752 }
753
754 int stream_put3_at(struct stream *s, size_t putp, uint32_t l)
755 {
756 STREAM_VERIFY_SANE(s);
757
758 if (!PUT_AT_VALID(s, putp + 3)) {
759 STREAM_BOUND_WARN(s, "put");
760 return 0;
761 }
762 s->data[putp] = (uint8_t)(l >> 16);
763 s->data[putp + 1] = (uint8_t)(l >> 8);
764 s->data[putp + 2] = (uint8_t)l;
765
766 return 3;
767 }
768
769 int stream_putl_at(struct stream *s, size_t putp, uint32_t l)
770 {
771 STREAM_VERIFY_SANE(s);
772
773 if (!PUT_AT_VALID(s, putp + sizeof(uint32_t))) {
774 STREAM_BOUND_WARN(s, "put");
775 return 0;
776 }
777 s->data[putp] = (uint8_t)(l >> 24);
778 s->data[putp + 1] = (uint8_t)(l >> 16);
779 s->data[putp + 2] = (uint8_t)(l >> 8);
780 s->data[putp + 3] = (uint8_t)l;
781
782 return 4;
783 }
784
785 int stream_putq_at(struct stream *s, size_t putp, uint64_t q)
786 {
787 STREAM_VERIFY_SANE(s);
788
789 if (!PUT_AT_VALID(s, putp + sizeof(uint64_t))) {
790 STREAM_BOUND_WARN(s, "put");
791 return 0;
792 }
793 s->data[putp] = (uint8_t)(q >> 56);
794 s->data[putp + 1] = (uint8_t)(q >> 48);
795 s->data[putp + 2] = (uint8_t)(q >> 40);
796 s->data[putp + 3] = (uint8_t)(q >> 32);
797 s->data[putp + 4] = (uint8_t)(q >> 24);
798 s->data[putp + 5] = (uint8_t)(q >> 16);
799 s->data[putp + 6] = (uint8_t)(q >> 8);
800 s->data[putp + 7] = (uint8_t)q;
801
802 return 8;
803 }
804
805 /* Put long word to the stream. */
806 int stream_put_ipv4(struct stream *s, uint32_t l)
807 {
808 STREAM_VERIFY_SANE(s);
809
810 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
811 STREAM_BOUND_WARN(s, "put");
812 return 0;
813 }
814 memcpy(s->data + s->endp, &l, sizeof(uint32_t));
815 s->endp += sizeof(uint32_t);
816
817 return sizeof(uint32_t);
818 }
819
820 /* Put long word to the stream. */
821 int stream_put_in_addr(struct stream *s, struct in_addr *addr)
822 {
823 STREAM_VERIFY_SANE(s);
824
825 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
826 STREAM_BOUND_WARN(s, "put");
827 return 0;
828 }
829
830 memcpy(s->data + s->endp, addr, sizeof(uint32_t));
831 s->endp += sizeof(uint32_t);
832
833 return sizeof(uint32_t);
834 }
835
836 /* Put in_addr at location in the stream. */
837 int stream_put_in_addr_at(struct stream *s, size_t putp, struct in_addr *addr)
838 {
839 STREAM_VERIFY_SANE(s);
840
841 if (!PUT_AT_VALID(s, putp + 4)) {
842 STREAM_BOUND_WARN(s, "put");
843 return 0;
844 }
845
846 memcpy(&s->data[putp], addr, 4);
847 return 4;
848 }
849
850 /* Put in6_addr at location in the stream. */
851 int stream_put_in6_addr_at(struct stream *s, size_t putp, struct in6_addr *addr)
852 {
853 STREAM_VERIFY_SANE(s);
854
855 if (!PUT_AT_VALID(s, putp + 16)) {
856 STREAM_BOUND_WARN(s, "put");
857 return 0;
858 }
859
860 memcpy(&s->data[putp], addr, 16);
861 return 16;
862 }
863
864 /* Put prefix by nlri type format. */
865 int stream_put_prefix_addpath(struct stream *s, struct prefix *p,
866 int addpath_encode, uint32_t addpath_tx_id)
867 {
868 size_t psize;
869 size_t psize_with_addpath;
870
871 STREAM_VERIFY_SANE(s);
872
873 psize = PSIZE(p->prefixlen);
874
875 if (addpath_encode)
876 psize_with_addpath = psize + 4;
877 else
878 psize_with_addpath = psize;
879
880 if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) {
881 STREAM_BOUND_WARN(s, "put");
882 return 0;
883 }
884
885 if (addpath_encode) {
886 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
887 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
888 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
889 s->data[s->endp++] = (uint8_t)addpath_tx_id;
890 }
891
892 s->data[s->endp++] = p->prefixlen;
893 memcpy(s->data + s->endp, &p->u.prefix, psize);
894 s->endp += psize;
895
896 return psize;
897 }
898
/* Append a prefix in NLRI form without an addpath id; thin wrapper
 * around stream_put_prefix_addpath() with addpath encoding disabled.
 */
int stream_put_prefix(struct stream *s, struct prefix *p)
{
	const int no_addpath = 0;

	return stream_put_prefix_addpath(s, p, no_addpath, 0);
}
903
904 /* Put NLRI with label */
905 int stream_put_labeled_prefix(struct stream *s, struct prefix *p,
906 mpls_label_t *label)
907 {
908 size_t psize;
909 uint8_t *label_pnt = (uint8_t *)label;
910
911 STREAM_VERIFY_SANE(s);
912
913 psize = PSIZE(p->prefixlen);
914
915 if (STREAM_WRITEABLE(s) < (psize + 3)) {
916 STREAM_BOUND_WARN(s, "put");
917 return 0;
918 }
919
920 stream_putc(s, (p->prefixlen + 24));
921 stream_putc(s, label_pnt[0]);
922 stream_putc(s, label_pnt[1]);
923 stream_putc(s, label_pnt[2]);
924 memcpy(s->data + s->endp, &p->u.prefix, psize);
925 s->endp += psize;
926
927 return (psize + 3);
928 }
929
930 /* Read size from fd. */
931 int stream_read(struct stream *s, int fd, size_t size)
932 {
933 int nbytes;
934
935 STREAM_VERIFY_SANE(s);
936
937 if (STREAM_WRITEABLE(s) < size) {
938 STREAM_BOUND_WARN(s, "put");
939 return 0;
940 }
941
942 nbytes = readn(fd, s->data + s->endp, size);
943
944 if (nbytes > 0)
945 s->endp += nbytes;
946
947 return nbytes;
948 }
949
950 ssize_t stream_read_try(struct stream *s, int fd, size_t size)
951 {
952 ssize_t nbytes;
953
954 STREAM_VERIFY_SANE(s);
955
956 if (STREAM_WRITEABLE(s) < size) {
957 STREAM_BOUND_WARN(s, "put");
958 /* Fatal (not transient) error, since retrying will not help
959 (stream is too small to contain the desired data). */
960 return -1;
961 }
962
963 if ((nbytes = read(fd, s->data + s->endp, size)) >= 0) {
964 s->endp += nbytes;
965 return nbytes;
966 }
967 /* Error: was it transient (return -2) or fatal (return -1)? */
968 if (ERRNO_IO_RETRY(errno))
969 return -2;
970 zlog_warn("%s: read failed on fd %d: %s", __func__, fd,
971 safe_strerror(errno));
972 return -1;
973 }
974
975 /* Read up to size bytes into the stream from the fd, using recvmsgfrom
976 * whose arguments match the remaining arguments to this function
977 */
978 ssize_t stream_recvfrom(struct stream *s, int fd, size_t size, int flags,
979 struct sockaddr *from, socklen_t *fromlen)
980 {
981 ssize_t nbytes;
982
983 STREAM_VERIFY_SANE(s);
984
985 if (STREAM_WRITEABLE(s) < size) {
986 STREAM_BOUND_WARN(s, "put");
987 /* Fatal (not transient) error, since retrying will not help
988 (stream is too small to contain the desired data). */
989 return -1;
990 }
991
992 if ((nbytes = recvfrom(fd, s->data + s->endp, size, flags, from,
993 fromlen))
994 >= 0) {
995 s->endp += nbytes;
996 return nbytes;
997 }
998 /* Error: was it transient (return -2) or fatal (return -1)? */
999 if (ERRNO_IO_RETRY(errno))
1000 return -2;
1001 zlog_warn("%s: read failed on fd %d: %s", __func__, fd,
1002 safe_strerror(errno));
1003 return -1;
1004 }
1005
1006 /* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
1007 * from endp.
1008 * First iovec will be used to receive the data.
1009 * Stream need not be empty.
1010 */
1011 ssize_t stream_recvmsg(struct stream *s, int fd, struct msghdr *msgh, int flags,
1012 size_t size)
1013 {
1014 int nbytes;
1015 struct iovec *iov;
1016
1017 STREAM_VERIFY_SANE(s);
1018 assert(msgh->msg_iovlen > 0);
1019
1020 if (STREAM_WRITEABLE(s) < size) {
1021 STREAM_BOUND_WARN(s, "put");
1022 /* This is a logic error in the calling code: the stream is too
1023 small
1024 to hold the desired data! */
1025 return -1;
1026 }
1027
1028 iov = &(msgh->msg_iov[0]);
1029 iov->iov_base = (s->data + s->endp);
1030 iov->iov_len = size;
1031
1032 nbytes = recvmsg(fd, msgh, flags);
1033
1034 if (nbytes > 0)
1035 s->endp += nbytes;
1036
1037 return nbytes;
1038 }
1039
1040 /* Write data to buffer. */
1041 size_t stream_write(struct stream *s, const void *ptr, size_t size)
1042 {
1043
1044 CHECK_SIZE(s, size);
1045
1046 STREAM_VERIFY_SANE(s);
1047
1048 if (STREAM_WRITEABLE(s) < size) {
1049 STREAM_BOUND_WARN(s, "put");
1050 return 0;
1051 }
1052
1053 memcpy(s->data + s->endp, ptr, size);
1054 s->endp += size;
1055
1056 return size;
1057 }
1058
1059 /* Return current read pointer.
1060 * DEPRECATED!
1061 * Use stream_get_pnt_to if you must, but decoding streams properly
1062 * is preferred
1063 */
1064 uint8_t *stream_pnt(struct stream *s)
1065 {
1066 STREAM_VERIFY_SANE(s);
1067 return s->data + s->getp;
1068 }
1069
1070 /* Check does this stream empty? */
1071 int stream_empty(struct stream *s)
1072 {
1073 STREAM_VERIFY_SANE(s);
1074
1075 return (s->endp == 0);
1076 }
1077
1078 /* Reset stream. */
1079 void stream_reset(struct stream *s)
1080 {
1081 STREAM_VERIFY_SANE(s);
1082
1083 s->getp = s->endp = 0;
1084 }
1085
1086 /* Write stream contens to the file discriptor. */
1087 int stream_flush(struct stream *s, int fd)
1088 {
1089 int nbytes;
1090
1091 STREAM_VERIFY_SANE(s);
1092
1093 nbytes = write(fd, s->data + s->getp, s->endp - s->getp);
1094
1095 return nbytes;
1096 }
1097
1098 /* Stream first in first out queue. */
1099
1100 struct stream_fifo *stream_fifo_new(void)
1101 {
1102 struct stream_fifo *new;
1103
1104 new = XCALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo));
1105 pthread_mutex_init(&new->mtx, NULL);
1106 return new;
1107 }
1108
1109 /* Add new stream to fifo. */
1110 void stream_fifo_push(struct stream_fifo *fifo, struct stream *s)
1111 {
1112 #if defined DEV_BUILD
1113 size_t max, curmax;
1114 #endif
1115
1116 if (fifo->tail)
1117 fifo->tail->next = s;
1118 else
1119 fifo->head = s;
1120
1121 fifo->tail = s;
1122 fifo->tail->next = NULL;
1123 #if !defined DEV_BUILD
1124 atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1125 #else
1126 max = atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1127 curmax = atomic_load_explicit(&fifo->max_count, memory_order_relaxed);
1128 if (max > curmax)
1129 atomic_store_explicit(&fifo->max_count, max,
1130 memory_order_relaxed);
1131 #endif
1132 }
1133
1134 void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s)
1135 {
1136 pthread_mutex_lock(&fifo->mtx);
1137 {
1138 stream_fifo_push(fifo, s);
1139 }
1140 pthread_mutex_unlock(&fifo->mtx);
1141 }
1142
1143 /* Delete first stream from fifo. */
1144 struct stream *stream_fifo_pop(struct stream_fifo *fifo)
1145 {
1146 struct stream *s;
1147
1148 s = fifo->head;
1149
1150 if (s) {
1151 fifo->head = s->next;
1152
1153 if (fifo->head == NULL)
1154 fifo->tail = NULL;
1155
1156 atomic_fetch_sub_explicit(&fifo->count, 1,
1157 memory_order_release);
1158
1159 /* ensure stream is scrubbed of references to this fifo */
1160 s->next = NULL;
1161 }
1162
1163 return s;
1164 }
1165
1166 struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo)
1167 {
1168 struct stream *ret;
1169
1170 pthread_mutex_lock(&fifo->mtx);
1171 {
1172 ret = stream_fifo_pop(fifo);
1173 }
1174 pthread_mutex_unlock(&fifo->mtx);
1175
1176 return ret;
1177 }
1178
1179 struct stream *stream_fifo_head(struct stream_fifo *fifo)
1180 {
1181 return fifo->head;
1182 }
1183
1184 struct stream *stream_fifo_head_safe(struct stream_fifo *fifo)
1185 {
1186 struct stream *ret;
1187
1188 pthread_mutex_lock(&fifo->mtx);
1189 {
1190 ret = stream_fifo_head(fifo);
1191 }
1192 pthread_mutex_unlock(&fifo->mtx);
1193
1194 return ret;
1195 }
1196
1197 void stream_fifo_clean(struct stream_fifo *fifo)
1198 {
1199 struct stream *s;
1200 struct stream *next;
1201
1202 for (s = fifo->head; s; s = next) {
1203 next = s->next;
1204 stream_free(s);
1205 }
1206 fifo->head = fifo->tail = NULL;
1207 atomic_store_explicit(&fifo->count, 0, memory_order_release);
1208 }
1209
1210 void stream_fifo_clean_safe(struct stream_fifo *fifo)
1211 {
1212 pthread_mutex_lock(&fifo->mtx);
1213 {
1214 stream_fifo_clean(fifo);
1215 }
1216 pthread_mutex_unlock(&fifo->mtx);
1217 }
1218
1219 size_t stream_fifo_count_safe(struct stream_fifo *fifo)
1220 {
1221 return atomic_load_explicit(&fifo->count, memory_order_acquire);
1222 }
1223
1224 void stream_fifo_free(struct stream_fifo *fifo)
1225 {
1226 stream_fifo_clean(fifo);
1227 pthread_mutex_destroy(&fifo->mtx);
1228 XFREE(MTYPE_STREAM_FIFO, fifo);
1229 }