]> git.proxmox.com Git - mirror_frr.git/blob - lib/stream.c
Merge pull request #2764 from opensourcerouting/isis-srcdest
[mirror_frr.git] / lib / stream.c
1 /*
2 * Packet interface
3 * Copyright (C) 1999 Kunihiro Ishiguro
4 *
5 * This file is part of GNU Zebra.
6 *
7 * GNU Zebra is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2, or (at your option) any
10 * later version.
11 *
12 * GNU Zebra is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <zebra.h>
23 #include <stddef.h>
24 #include <pthread.h>
25
26 #include "stream.h"
27 #include "memory.h"
28 #include "network.h"
29 #include "prefix.h"
30 #include "log.h"
31
/*
 * Memory-type tags used by the allocation calls in this file:
 * one for the stream structure, one for its data buffer, and one
 * for the stream FIFO structure.
 */
DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream")
DEFINE_MTYPE_STATIC(LIB, STREAM_DATA, "Stream data")
DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO")
35
/*
 * Tests whether a position is valid.
 *   GETP_VALID:   position G does not exceed the stream's endp.
 *   PUT_AT_VALID: the same test as GETP_VALID.
 *   ENDP_VALID:   position E does not exceed the stream's size.
 */
#define GETP_VALID(S, G) ((G) <= (S)->endp)
#define PUT_AT_VALID(S,G) GETP_VALID(S,G)
#define ENDP_VALID(S, E) ((E) <= (S)->size)
40
/* Asserting sanity checks.
 *
 * The following must always be true of stream elements
 * before and after calls to stream functions:
 *
 * getp <= endp <= size
 *
 * Note that after a stream function is called the following may be true:
 * if (getp == endp) then the stream is no longer readable
 * if (endp == size) then the stream is no longer writeable
 *
 * It is valid to put to anywhere within the size of the stream, but only
 * using stream_put..._at() functions.
 * (NOTE: PUT_AT_VALID currently bounds such positions by endp, not size.)
 */
/* Log the stream's address and its size, getp and endp via zlog_warn(). */
#define STREAM_WARN_OFFSETS(S)                                                 \
	zlog_warn("&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n",   \
		  (void *)(S), (unsigned long)(S)->size,                       \
		  (unsigned long)(S)->getp, (unsigned long)(S)->endp)
60
/*
 * Check that getp <= endp and endp <= size. When either test fails the
 * offsets are logged first, then the failing condition is asserted.
 */
#define STREAM_VERIFY_SANE(S)                                                  \
	do {                                                                   \
		if (!(GETP_VALID(S, (S)->getp) && ENDP_VALID(S, (S)->endp)))   \
			STREAM_WARN_OFFSETS(S);                                \
		assert(GETP_VALID(S, (S)->getp));                              \
		assert(ENDP_VALID(S, (S)->endp));                              \
	} while (0)
68
/*
 * Report an out-of-bounds access attempt (WHAT names the operation),
 * log the stream offsets, then assert(0). Code following this macro
 * is only reached when assertions are compiled out.
 */
#define STREAM_BOUND_WARN(S, WHAT)                                             \
	do {                                                                   \
		zlog_warn("%s: Attempt to %s out of bounds", __func__,         \
			  (WHAT));                                             \
		STREAM_WARN_OFFSETS(S);                                        \
		assert(0);                                                     \
	} while (0)
76
/*
 * Report an out-of-bounds access attempt and log the stream offsets
 * without asserting; the caller handles the failure itself.
 */
#define STREAM_BOUND_WARN2(S, WHAT)                                            \
	do {                                                                   \
		zlog_warn("%s: Attempt to %s out of bounds", __func__,         \
			  (WHAT));                                             \
		STREAM_WARN_OFFSETS(S);                                        \
	} while (0)
83
/*
 * XXX: Deprecated macro: do not use.
 *
 * If writing Z bytes at endp would run past the stream's size, log a
 * warning and shrink Z (which must be an lvalue) to the space left.
 *
 * The expansion deliberately has no trailing semicolon so that
 * "CHECK_SIZE(s, n);" is exactly one statement and stays safe inside
 * an unbraced if/else.
 */
#define CHECK_SIZE(S, Z)                                                       \
	do {                                                                   \
		if (((S)->endp + (Z)) > (S)->size) {                           \
			zlog_warn(                                             \
				"CHECK_SIZE: truncating requested size %lu\n", \
				(unsigned long)(Z));                           \
			STREAM_WARN_OFFSETS(S);                                \
			(Z) = (S)->size - (S)->endp;                           \
		}                                                              \
	} while (0)
95
96 /* Make stream buffer. */
97 struct stream *stream_new(size_t size)
98 {
99 struct stream *s;
100
101 assert(size > 0);
102
103 s = XMALLOC(MTYPE_STREAM, sizeof(struct stream));
104
105 s->data = XMALLOC(MTYPE_STREAM_DATA, size);
106
107 s->getp = s->endp = 0;
108 s->next = NULL;
109 s->size = size;
110 return s;
111 }
112
113 /* Free it now. */
114 void stream_free(struct stream *s)
115 {
116 if (!s)
117 return;
118
119 XFREE(MTYPE_STREAM_DATA, s->data);
120 XFREE(MTYPE_STREAM, s);
121 }
122
123 struct stream *stream_copy(struct stream *new, struct stream *src)
124 {
125 STREAM_VERIFY_SANE(src);
126
127 assert(new != NULL);
128 assert(STREAM_SIZE(new) >= src->endp);
129
130 new->endp = src->endp;
131 new->getp = src->getp;
132
133 memcpy(new->data, src->data, src->endp);
134
135 return new;
136 }
137
138 struct stream *stream_dup(struct stream *s)
139 {
140 struct stream *new;
141
142 STREAM_VERIFY_SANE(s);
143
144 if ((new = stream_new(s->endp)) == NULL)
145 return NULL;
146
147 return (stream_copy(new, s));
148 }
149
150 struct stream *stream_dupcat(struct stream *s1, struct stream *s2,
151 size_t offset)
152 {
153 struct stream *new;
154
155 STREAM_VERIFY_SANE(s1);
156 STREAM_VERIFY_SANE(s2);
157
158 if ((new = stream_new(s1->endp + s2->endp)) == NULL)
159 return NULL;
160
161 memcpy(new->data, s1->data, offset);
162 memcpy(new->data + offset, s2->data, s2->endp);
163 memcpy(new->data + offset + s2->endp, s1->data + offset,
164 (s1->endp - offset));
165 new->endp = s1->endp + s2->endp;
166 return new;
167 }
168
169 size_t stream_resize(struct stream *s, size_t newsize)
170 {
171 uint8_t *newdata;
172 STREAM_VERIFY_SANE(s);
173
174 newdata = XREALLOC(MTYPE_STREAM_DATA, s->data, newsize);
175
176 if (newdata == NULL)
177 return s->size;
178
179 s->data = newdata;
180 s->size = newsize;
181
182 if (s->endp > s->size)
183 s->endp = s->size;
184 if (s->getp > s->endp)
185 s->getp = s->endp;
186
187 STREAM_VERIFY_SANE(s);
188
189 return s->size;
190 }
191
192 size_t stream_get_getp(struct stream *s)
193 {
194 STREAM_VERIFY_SANE(s);
195 return s->getp;
196 }
197
198 size_t stream_get_endp(struct stream *s)
199 {
200 STREAM_VERIFY_SANE(s);
201 return s->endp;
202 }
203
204 size_t stream_get_size(struct stream *s)
205 {
206 STREAM_VERIFY_SANE(s);
207 return s->size;
208 }
209
210 /* Stream structre' stream pointer related functions. */
211 void stream_set_getp(struct stream *s, size_t pos)
212 {
213 STREAM_VERIFY_SANE(s);
214
215 if (!GETP_VALID(s, pos)) {
216 STREAM_BOUND_WARN(s, "set getp");
217 pos = s->endp;
218 }
219
220 s->getp = pos;
221 }
222
223 void stream_set_endp(struct stream *s, size_t pos)
224 {
225 STREAM_VERIFY_SANE(s);
226
227 if (!ENDP_VALID(s, pos)) {
228 STREAM_BOUND_WARN(s, "set endp");
229 return;
230 }
231
232 /*
233 * Make sure the current read pointer is not beyond the new endp.
234 */
235 if (s->getp > pos) {
236 STREAM_BOUND_WARN(s, "set endp");
237 return;
238 }
239
240 s->endp = pos;
241 STREAM_VERIFY_SANE(s);
242 }
243
244 /* Forward pointer. */
245 void stream_forward_getp(struct stream *s, size_t size)
246 {
247 STREAM_VERIFY_SANE(s);
248
249 if (!GETP_VALID(s, s->getp + size)) {
250 STREAM_BOUND_WARN(s, "seek getp");
251 return;
252 }
253
254 s->getp += size;
255 }
256
257 void stream_forward_endp(struct stream *s, size_t size)
258 {
259 STREAM_VERIFY_SANE(s);
260
261 if (!ENDP_VALID(s, s->endp + size)) {
262 STREAM_BOUND_WARN(s, "seek endp");
263 return;
264 }
265
266 s->endp += size;
267 }
268
269 /* Copy from stream to destination. */
270 inline bool stream_get2(void *dst, struct stream *s, size_t size)
271 {
272 STREAM_VERIFY_SANE(s);
273
274 if (STREAM_READABLE(s) < size) {
275 STREAM_BOUND_WARN2(s, "get");
276 return false;
277 }
278
279 memcpy(dst, s->data + s->getp, size);
280 s->getp += size;
281
282 return true;
283 }
284
285 void stream_get(void *dst, struct stream *s, size_t size)
286 {
287 STREAM_VERIFY_SANE(s);
288
289 if (STREAM_READABLE(s) < size) {
290 STREAM_BOUND_WARN(s, "get");
291 return;
292 }
293
294 memcpy(dst, s->data + s->getp, size);
295 s->getp += size;
296 }
297
298 /* Get next character from the stream. */
299 inline bool stream_getc2(struct stream *s, uint8_t *byte)
300 {
301 STREAM_VERIFY_SANE(s);
302
303 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
304 STREAM_BOUND_WARN2(s, "get char");
305 return false;
306 }
307 *byte = s->data[s->getp++];
308
309 return true;
310 }
311
312 uint8_t stream_getc(struct stream *s)
313 {
314 uint8_t c;
315
316 STREAM_VERIFY_SANE(s);
317
318 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
319 STREAM_BOUND_WARN(s, "get char");
320 return 0;
321 }
322 c = s->data[s->getp++];
323
324 return c;
325 }
326
327 /* Get next character from the stream. */
328 uint8_t stream_getc_from(struct stream *s, size_t from)
329 {
330 uint8_t c;
331
332 STREAM_VERIFY_SANE(s);
333
334 if (!GETP_VALID(s, from + sizeof(uint8_t))) {
335 STREAM_BOUND_WARN(s, "get char");
336 return 0;
337 }
338
339 c = s->data[from];
340
341 return c;
342 }
343
344 inline bool stream_getw2(struct stream *s, uint16_t *word)
345 {
346 STREAM_VERIFY_SANE(s);
347
348 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
349 STREAM_BOUND_WARN2(s, "get ");
350 return false;
351 }
352
353 *word = s->data[s->getp++] << 8;
354 *word |= s->data[s->getp++];
355
356 return true;
357 }
358
359 /* Get next word from the stream. */
360 uint16_t stream_getw(struct stream *s)
361 {
362 uint16_t w;
363
364 STREAM_VERIFY_SANE(s);
365
366 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
367 STREAM_BOUND_WARN(s, "get ");
368 return 0;
369 }
370
371 w = s->data[s->getp++] << 8;
372 w |= s->data[s->getp++];
373
374 return w;
375 }
376
377 /* Get next word from the stream. */
378 uint16_t stream_getw_from(struct stream *s, size_t from)
379 {
380 uint16_t w;
381
382 STREAM_VERIFY_SANE(s);
383
384 if (!GETP_VALID(s, from + sizeof(uint16_t))) {
385 STREAM_BOUND_WARN(s, "get ");
386 return 0;
387 }
388
389 w = s->data[from++] << 8;
390 w |= s->data[from];
391
392 return w;
393 }
394
395 /* Get next 3-byte from the stream. */
396 uint32_t stream_get3_from(struct stream *s, size_t from)
397 {
398 uint32_t l;
399
400 STREAM_VERIFY_SANE(s);
401
402 if (!GETP_VALID(s, from + 3)) {
403 STREAM_BOUND_WARN(s, "get 3byte");
404 return 0;
405 }
406
407 l = s->data[from++] << 16;
408 l |= s->data[from++] << 8;
409 l |= s->data[from];
410
411 return l;
412 }
413
414 uint32_t stream_get3(struct stream *s)
415 {
416 uint32_t l;
417
418 STREAM_VERIFY_SANE(s);
419
420 if (STREAM_READABLE(s) < 3) {
421 STREAM_BOUND_WARN(s, "get 3byte");
422 return 0;
423 }
424
425 l = s->data[s->getp++] << 16;
426 l |= s->data[s->getp++] << 8;
427 l |= s->data[s->getp++];
428
429 return l;
430 }
431
432 /* Get next long word from the stream. */
433 uint32_t stream_getl_from(struct stream *s, size_t from)
434 {
435 uint32_t l;
436
437 STREAM_VERIFY_SANE(s);
438
439 if (!GETP_VALID(s, from + sizeof(uint32_t))) {
440 STREAM_BOUND_WARN(s, "get long");
441 return 0;
442 }
443
444 l = (unsigned)(s->data[from++]) << 24;
445 l |= s->data[from++] << 16;
446 l |= s->data[from++] << 8;
447 l |= s->data[from];
448
449 return l;
450 }
451
452 /* Copy from stream at specific location to destination. */
453 void stream_get_from(void *dst, struct stream *s, size_t from, size_t size)
454 {
455 STREAM_VERIFY_SANE(s);
456
457 if (!GETP_VALID(s, from + size)) {
458 STREAM_BOUND_WARN(s, "get from");
459 return;
460 }
461
462 memcpy(dst, s->data + from, size);
463 }
464
465 inline bool stream_getl2(struct stream *s, uint32_t *l)
466 {
467 STREAM_VERIFY_SANE(s);
468
469 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
470 STREAM_BOUND_WARN2(s, "get long");
471 return false;
472 }
473
474 *l = (unsigned int)(s->data[s->getp++]) << 24;
475 *l |= s->data[s->getp++] << 16;
476 *l |= s->data[s->getp++] << 8;
477 *l |= s->data[s->getp++];
478
479 return true;
480 }
481
482 uint32_t stream_getl(struct stream *s)
483 {
484 uint32_t l;
485
486 STREAM_VERIFY_SANE(s);
487
488 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
489 STREAM_BOUND_WARN(s, "get long");
490 return 0;
491 }
492
493 l = (unsigned)(s->data[s->getp++]) << 24;
494 l |= s->data[s->getp++] << 16;
495 l |= s->data[s->getp++] << 8;
496 l |= s->data[s->getp++];
497
498 return l;
499 }
500
501 /* Get next quad word from the stream. */
502 uint64_t stream_getq_from(struct stream *s, size_t from)
503 {
504 uint64_t q;
505
506 STREAM_VERIFY_SANE(s);
507
508 if (!GETP_VALID(s, from + sizeof(uint64_t))) {
509 STREAM_BOUND_WARN(s, "get quad");
510 return 0;
511 }
512
513 q = ((uint64_t)s->data[from++]) << 56;
514 q |= ((uint64_t)s->data[from++]) << 48;
515 q |= ((uint64_t)s->data[from++]) << 40;
516 q |= ((uint64_t)s->data[from++]) << 32;
517 q |= ((uint64_t)s->data[from++]) << 24;
518 q |= ((uint64_t)s->data[from++]) << 16;
519 q |= ((uint64_t)s->data[from++]) << 8;
520 q |= ((uint64_t)s->data[from++]);
521
522 return q;
523 }
524
525 uint64_t stream_getq(struct stream *s)
526 {
527 uint64_t q;
528
529 STREAM_VERIFY_SANE(s);
530
531 if (STREAM_READABLE(s) < sizeof(uint64_t)) {
532 STREAM_BOUND_WARN(s, "get quad");
533 return 0;
534 }
535
536 q = ((uint64_t)s->data[s->getp++]) << 56;
537 q |= ((uint64_t)s->data[s->getp++]) << 48;
538 q |= ((uint64_t)s->data[s->getp++]) << 40;
539 q |= ((uint64_t)s->data[s->getp++]) << 32;
540 q |= ((uint64_t)s->data[s->getp++]) << 24;
541 q |= ((uint64_t)s->data[s->getp++]) << 16;
542 q |= ((uint64_t)s->data[s->getp++]) << 8;
543 q |= ((uint64_t)s->data[s->getp++]);
544
545 return q;
546 }
547
548 /* Get next long word from the stream. */
549 uint32_t stream_get_ipv4(struct stream *s)
550 {
551 uint32_t l;
552
553 STREAM_VERIFY_SANE(s);
554
555 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
556 STREAM_BOUND_WARN(s, "get ipv4");
557 return 0;
558 }
559
560 memcpy(&l, s->data + s->getp, sizeof(uint32_t));
561 s->getp += sizeof(uint32_t);
562
563 return l;
564 }
565
/*
 * Read a 32-bit value with stream_getl() and return its bit pattern
 * reinterpreted as a float.
 */
float stream_getf(struct stream *s)
{
	union {
		float as_float;
		uint32_t as_bits;
	} pun;

	pun.as_bits = stream_getl(s);

	return pun.as_float;
}
575
/*
 * Read a 64-bit value with stream_getq() and return its bit pattern
 * reinterpreted as a double.
 */
double stream_getd(struct stream *s)
{
	union {
		double as_double;
		uint64_t as_bits;
	} pun;

	pun.as_bits = stream_getq(s);

	return pun.as_double;
}
585
586 /* Copy to source to stream.
587 *
588 * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
589 * around. This should be fixed once the stream updates are working.
590 *
591 * stream_write() is saner
592 */
593 void stream_put(struct stream *s, const void *src, size_t size)
594 {
595
596 /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
597 CHECK_SIZE(s, size);
598
599 STREAM_VERIFY_SANE(s);
600
601 if (STREAM_WRITEABLE(s) < size) {
602 STREAM_BOUND_WARN(s, "put");
603 return;
604 }
605
606 if (src)
607 memcpy(s->data + s->endp, src, size);
608 else
609 memset(s->data + s->endp, 0, size);
610
611 s->endp += size;
612 }
613
614 /* Put character to the stream. */
615 int stream_putc(struct stream *s, uint8_t c)
616 {
617 STREAM_VERIFY_SANE(s);
618
619 if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) {
620 STREAM_BOUND_WARN(s, "put");
621 return 0;
622 }
623
624 s->data[s->endp++] = c;
625 return sizeof(uint8_t);
626 }
627
628 /* Put word to the stream. */
629 int stream_putw(struct stream *s, uint16_t w)
630 {
631 STREAM_VERIFY_SANE(s);
632
633 if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) {
634 STREAM_BOUND_WARN(s, "put");
635 return 0;
636 }
637
638 s->data[s->endp++] = (uint8_t)(w >> 8);
639 s->data[s->endp++] = (uint8_t)w;
640
641 return 2;
642 }
643
644 /* Put long word to the stream. */
645 int stream_put3(struct stream *s, uint32_t l)
646 {
647 STREAM_VERIFY_SANE(s);
648
649 if (STREAM_WRITEABLE(s) < 3) {
650 STREAM_BOUND_WARN(s, "put");
651 return 0;
652 }
653
654 s->data[s->endp++] = (uint8_t)(l >> 16);
655 s->data[s->endp++] = (uint8_t)(l >> 8);
656 s->data[s->endp++] = (uint8_t)l;
657
658 return 3;
659 }
660
661 /* Put long word to the stream. */
662 int stream_putl(struct stream *s, uint32_t l)
663 {
664 STREAM_VERIFY_SANE(s);
665
666 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
667 STREAM_BOUND_WARN(s, "put");
668 return 0;
669 }
670
671 s->data[s->endp++] = (uint8_t)(l >> 24);
672 s->data[s->endp++] = (uint8_t)(l >> 16);
673 s->data[s->endp++] = (uint8_t)(l >> 8);
674 s->data[s->endp++] = (uint8_t)l;
675
676 return 4;
677 }
678
679 /* Put quad word to the stream. */
680 int stream_putq(struct stream *s, uint64_t q)
681 {
682 STREAM_VERIFY_SANE(s);
683
684 if (STREAM_WRITEABLE(s) < sizeof(uint64_t)) {
685 STREAM_BOUND_WARN(s, "put quad");
686 return 0;
687 }
688
689 s->data[s->endp++] = (uint8_t)(q >> 56);
690 s->data[s->endp++] = (uint8_t)(q >> 48);
691 s->data[s->endp++] = (uint8_t)(q >> 40);
692 s->data[s->endp++] = (uint8_t)(q >> 32);
693 s->data[s->endp++] = (uint8_t)(q >> 24);
694 s->data[s->endp++] = (uint8_t)(q >> 16);
695 s->data[s->endp++] = (uint8_t)(q >> 8);
696 s->data[s->endp++] = (uint8_t)q;
697
698 return 8;
699 }
700
/*
 * Append the bit pattern of a float as a 32-bit value via
 * stream_putl(); returns whatever stream_putl() returns.
 */
int stream_putf(struct stream *s, float f)
{
	union {
		float as_float;
		uint32_t as_bits;
	} pun;

	pun.as_float = f;

	return stream_putl(s, pun.as_bits);
}
710
/*
 * Append the bit pattern of a double as a 64-bit value via
 * stream_putq(); returns whatever stream_putq() returns.
 */
int stream_putd(struct stream *s, double d)
{
	union {
		double as_double;
		uint64_t as_bits;
	} pun;

	pun.as_double = d;

	return stream_putq(s, pun.as_bits);
}
720
721 int stream_putc_at(struct stream *s, size_t putp, uint8_t c)
722 {
723 STREAM_VERIFY_SANE(s);
724
725 if (!PUT_AT_VALID(s, putp + sizeof(uint8_t))) {
726 STREAM_BOUND_WARN(s, "put");
727 return 0;
728 }
729
730 s->data[putp] = c;
731
732 return 1;
733 }
734
735 int stream_putw_at(struct stream *s, size_t putp, uint16_t w)
736 {
737 STREAM_VERIFY_SANE(s);
738
739 if (!PUT_AT_VALID(s, putp + sizeof(uint16_t))) {
740 STREAM_BOUND_WARN(s, "put");
741 return 0;
742 }
743
744 s->data[putp] = (uint8_t)(w >> 8);
745 s->data[putp + 1] = (uint8_t)w;
746
747 return 2;
748 }
749
750 int stream_put3_at(struct stream *s, size_t putp, uint32_t l)
751 {
752 STREAM_VERIFY_SANE(s);
753
754 if (!PUT_AT_VALID(s, putp + 3)) {
755 STREAM_BOUND_WARN(s, "put");
756 return 0;
757 }
758 s->data[putp] = (uint8_t)(l >> 16);
759 s->data[putp + 1] = (uint8_t)(l >> 8);
760 s->data[putp + 2] = (uint8_t)l;
761
762 return 3;
763 }
764
765 int stream_putl_at(struct stream *s, size_t putp, uint32_t l)
766 {
767 STREAM_VERIFY_SANE(s);
768
769 if (!PUT_AT_VALID(s, putp + sizeof(uint32_t))) {
770 STREAM_BOUND_WARN(s, "put");
771 return 0;
772 }
773 s->data[putp] = (uint8_t)(l >> 24);
774 s->data[putp + 1] = (uint8_t)(l >> 16);
775 s->data[putp + 2] = (uint8_t)(l >> 8);
776 s->data[putp + 3] = (uint8_t)l;
777
778 return 4;
779 }
780
781 int stream_putq_at(struct stream *s, size_t putp, uint64_t q)
782 {
783 STREAM_VERIFY_SANE(s);
784
785 if (!PUT_AT_VALID(s, putp + sizeof(uint64_t))) {
786 STREAM_BOUND_WARN(s, "put");
787 return 0;
788 }
789 s->data[putp] = (uint8_t)(q >> 56);
790 s->data[putp + 1] = (uint8_t)(q >> 48);
791 s->data[putp + 2] = (uint8_t)(q >> 40);
792 s->data[putp + 3] = (uint8_t)(q >> 32);
793 s->data[putp + 4] = (uint8_t)(q >> 24);
794 s->data[putp + 5] = (uint8_t)(q >> 16);
795 s->data[putp + 6] = (uint8_t)(q >> 8);
796 s->data[putp + 7] = (uint8_t)q;
797
798 return 8;
799 }
800
801 /* Put long word to the stream. */
802 int stream_put_ipv4(struct stream *s, uint32_t l)
803 {
804 STREAM_VERIFY_SANE(s);
805
806 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
807 STREAM_BOUND_WARN(s, "put");
808 return 0;
809 }
810 memcpy(s->data + s->endp, &l, sizeof(uint32_t));
811 s->endp += sizeof(uint32_t);
812
813 return sizeof(uint32_t);
814 }
815
816 /* Put long word to the stream. */
817 int stream_put_in_addr(struct stream *s, struct in_addr *addr)
818 {
819 STREAM_VERIFY_SANE(s);
820
821 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
822 STREAM_BOUND_WARN(s, "put");
823 return 0;
824 }
825
826 memcpy(s->data + s->endp, addr, sizeof(uint32_t));
827 s->endp += sizeof(uint32_t);
828
829 return sizeof(uint32_t);
830 }
831
832 /* Put in_addr at location in the stream. */
833 int stream_put_in_addr_at(struct stream *s, size_t putp, struct in_addr *addr)
834 {
835 STREAM_VERIFY_SANE(s);
836
837 if (!PUT_AT_VALID(s, putp + 4)) {
838 STREAM_BOUND_WARN(s, "put");
839 return 0;
840 }
841
842 memcpy(&s->data[putp], addr, 4);
843 return 4;
844 }
845
846 /* Put in6_addr at location in the stream. */
847 int stream_put_in6_addr_at(struct stream *s, size_t putp, struct in6_addr *addr)
848 {
849 STREAM_VERIFY_SANE(s);
850
851 if (!PUT_AT_VALID(s, putp + 16)) {
852 STREAM_BOUND_WARN(s, "put");
853 return 0;
854 }
855
856 memcpy(&s->data[putp], addr, 16);
857 return 16;
858 }
859
860 /* Put prefix by nlri type format. */
861 int stream_put_prefix_addpath(struct stream *s, struct prefix *p,
862 int addpath_encode, uint32_t addpath_tx_id)
863 {
864 size_t psize;
865 size_t psize_with_addpath;
866
867 STREAM_VERIFY_SANE(s);
868
869 psize = PSIZE(p->prefixlen);
870
871 if (addpath_encode)
872 psize_with_addpath = psize + 4;
873 else
874 psize_with_addpath = psize;
875
876 if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) {
877 STREAM_BOUND_WARN(s, "put");
878 return 0;
879 }
880
881 if (addpath_encode) {
882 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
883 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
884 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
885 s->data[s->endp++] = (uint8_t)addpath_tx_id;
886 }
887
888 s->data[s->endp++] = p->prefixlen;
889 memcpy(s->data + s->endp, &p->u.prefix, psize);
890 s->endp += psize;
891
892 return psize;
893 }
894
/*
 * Append a prefix without an addpath id; a thin wrapper around
 * stream_put_prefix_addpath() with addpath encoding switched off.
 */
int stream_put_prefix(struct stream *s, struct prefix *p)
{
	const int no_addpath = 0;
	const uint32_t unused_tx_id = 0;

	return stream_put_prefix_addpath(s, p, no_addpath, unused_tx_id);
}
899
900 /* Put NLRI with label */
901 int stream_put_labeled_prefix(struct stream *s, struct prefix *p,
902 mpls_label_t *label)
903 {
904 size_t psize;
905 uint8_t *label_pnt = (uint8_t *)label;
906
907 STREAM_VERIFY_SANE(s);
908
909 psize = PSIZE(p->prefixlen);
910
911 if (STREAM_WRITEABLE(s) < (psize + 3)) {
912 STREAM_BOUND_WARN(s, "put");
913 return 0;
914 }
915
916 stream_putc(s, (p->prefixlen + 24));
917 stream_putc(s, label_pnt[0]);
918 stream_putc(s, label_pnt[1]);
919 stream_putc(s, label_pnt[2]);
920 memcpy(s->data + s->endp, &p->u.prefix, psize);
921 s->endp += psize;
922
923 return (psize + 3);
924 }
925
926 /* Read size from fd. */
927 int stream_read(struct stream *s, int fd, size_t size)
928 {
929 int nbytes;
930
931 STREAM_VERIFY_SANE(s);
932
933 if (STREAM_WRITEABLE(s) < size) {
934 STREAM_BOUND_WARN(s, "put");
935 return 0;
936 }
937
938 nbytes = readn(fd, s->data + s->endp, size);
939
940 if (nbytes > 0)
941 s->endp += nbytes;
942
943 return nbytes;
944 }
945
946 ssize_t stream_read_try(struct stream *s, int fd, size_t size)
947 {
948 ssize_t nbytes;
949
950 STREAM_VERIFY_SANE(s);
951
952 if (STREAM_WRITEABLE(s) < size) {
953 STREAM_BOUND_WARN(s, "put");
954 /* Fatal (not transient) error, since retrying will not help
955 (stream is too small to contain the desired data). */
956 return -1;
957 }
958
959 if ((nbytes = read(fd, s->data + s->endp, size)) >= 0) {
960 s->endp += nbytes;
961 return nbytes;
962 }
963 /* Error: was it transient (return -2) or fatal (return -1)? */
964 if (ERRNO_IO_RETRY(errno))
965 return -2;
966 zlog_warn("%s: read failed on fd %d: %s", __func__, fd,
967 safe_strerror(errno));
968 return -1;
969 }
970
971 /* Read up to size bytes into the stream from the fd, using recvmsgfrom
972 * whose arguments match the remaining arguments to this function
973 */
974 ssize_t stream_recvfrom(struct stream *s, int fd, size_t size, int flags,
975 struct sockaddr *from, socklen_t *fromlen)
976 {
977 ssize_t nbytes;
978
979 STREAM_VERIFY_SANE(s);
980
981 if (STREAM_WRITEABLE(s) < size) {
982 STREAM_BOUND_WARN(s, "put");
983 /* Fatal (not transient) error, since retrying will not help
984 (stream is too small to contain the desired data). */
985 return -1;
986 }
987
988 if ((nbytes = recvfrom(fd, s->data + s->endp, size, flags, from,
989 fromlen))
990 >= 0) {
991 s->endp += nbytes;
992 return nbytes;
993 }
994 /* Error: was it transient (return -2) or fatal (return -1)? */
995 if (ERRNO_IO_RETRY(errno))
996 return -2;
997 zlog_warn("%s: read failed on fd %d: %s", __func__, fd,
998 safe_strerror(errno));
999 return -1;
1000 }
1001
1002 /* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
1003 * from endp.
1004 * First iovec will be used to receive the data.
1005 * Stream need not be empty.
1006 */
1007 ssize_t stream_recvmsg(struct stream *s, int fd, struct msghdr *msgh, int flags,
1008 size_t size)
1009 {
1010 int nbytes;
1011 struct iovec *iov;
1012
1013 STREAM_VERIFY_SANE(s);
1014 assert(msgh->msg_iovlen > 0);
1015
1016 if (STREAM_WRITEABLE(s) < size) {
1017 STREAM_BOUND_WARN(s, "put");
1018 /* This is a logic error in the calling code: the stream is too
1019 small
1020 to hold the desired data! */
1021 return -1;
1022 }
1023
1024 iov = &(msgh->msg_iov[0]);
1025 iov->iov_base = (s->data + s->endp);
1026 iov->iov_len = size;
1027
1028 nbytes = recvmsg(fd, msgh, flags);
1029
1030 if (nbytes > 0)
1031 s->endp += nbytes;
1032
1033 return nbytes;
1034 }
1035
1036 /* Write data to buffer. */
1037 size_t stream_write(struct stream *s, const void *ptr, size_t size)
1038 {
1039
1040 CHECK_SIZE(s, size);
1041
1042 STREAM_VERIFY_SANE(s);
1043
1044 if (STREAM_WRITEABLE(s) < size) {
1045 STREAM_BOUND_WARN(s, "put");
1046 return 0;
1047 }
1048
1049 memcpy(s->data + s->endp, ptr, size);
1050 s->endp += size;
1051
1052 return size;
1053 }
1054
1055 /* Return current read pointer.
1056 * DEPRECATED!
1057 * Use stream_get_pnt_to if you must, but decoding streams properly
1058 * is preferred
1059 */
1060 uint8_t *stream_pnt(struct stream *s)
1061 {
1062 STREAM_VERIFY_SANE(s);
1063 return s->data + s->getp;
1064 }
1065
1066 /* Check does this stream empty? */
1067 int stream_empty(struct stream *s)
1068 {
1069 STREAM_VERIFY_SANE(s);
1070
1071 return (s->endp == 0);
1072 }
1073
1074 /* Reset stream. */
1075 void stream_reset(struct stream *s)
1076 {
1077 STREAM_VERIFY_SANE(s);
1078
1079 s->getp = s->endp = 0;
1080 }
1081
1082 /* Write stream contens to the file discriptor. */
1083 int stream_flush(struct stream *s, int fd)
1084 {
1085 int nbytes;
1086
1087 STREAM_VERIFY_SANE(s);
1088
1089 nbytes = write(fd, s->data + s->getp, s->endp - s->getp);
1090
1091 return nbytes;
1092 }
1093
1094 /* Stream first in first out queue. */
1095
1096 struct stream_fifo *stream_fifo_new(void)
1097 {
1098 struct stream_fifo *new;
1099
1100 new = XCALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo));
1101 pthread_mutex_init(&new->mtx, NULL);
1102 return new;
1103 }
1104
1105 /* Add new stream to fifo. */
1106 void stream_fifo_push(struct stream_fifo *fifo, struct stream *s)
1107 {
1108 #if defined DEV_BUILD
1109 size_t max, curmax;
1110 #endif
1111
1112 if (fifo->tail)
1113 fifo->tail->next = s;
1114 else
1115 fifo->head = s;
1116
1117 fifo->tail = s;
1118 fifo->tail->next = NULL;
1119 #if !defined DEV_BUILD
1120 atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1121 #else
1122 max = atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1123 curmax = atomic_load_explicit(&fifo->max_count, memory_order_relaxed);
1124 if (max > curmax)
1125 atomic_store_explicit(&fifo->max_count, max,
1126 memory_order_relaxed);
1127 #endif
1128 }
1129
1130 void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s)
1131 {
1132 pthread_mutex_lock(&fifo->mtx);
1133 {
1134 stream_fifo_push(fifo, s);
1135 }
1136 pthread_mutex_unlock(&fifo->mtx);
1137 }
1138
1139 /* Delete first stream from fifo. */
1140 struct stream *stream_fifo_pop(struct stream_fifo *fifo)
1141 {
1142 struct stream *s;
1143
1144 s = fifo->head;
1145
1146 if (s) {
1147 fifo->head = s->next;
1148
1149 if (fifo->head == NULL)
1150 fifo->tail = NULL;
1151
1152 atomic_fetch_sub_explicit(&fifo->count, 1,
1153 memory_order_release);
1154
1155 /* ensure stream is scrubbed of references to this fifo */
1156 s->next = NULL;
1157 }
1158
1159 return s;
1160 }
1161
1162 struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo)
1163 {
1164 struct stream *ret;
1165
1166 pthread_mutex_lock(&fifo->mtx);
1167 {
1168 ret = stream_fifo_pop(fifo);
1169 }
1170 pthread_mutex_unlock(&fifo->mtx);
1171
1172 return ret;
1173 }
1174
1175 struct stream *stream_fifo_head(struct stream_fifo *fifo)
1176 {
1177 return fifo->head;
1178 }
1179
1180 struct stream *stream_fifo_head_safe(struct stream_fifo *fifo)
1181 {
1182 struct stream *ret;
1183
1184 pthread_mutex_lock(&fifo->mtx);
1185 {
1186 ret = stream_fifo_head(fifo);
1187 }
1188 pthread_mutex_unlock(&fifo->mtx);
1189
1190 return ret;
1191 }
1192
1193 void stream_fifo_clean(struct stream_fifo *fifo)
1194 {
1195 struct stream *s;
1196 struct stream *next;
1197
1198 for (s = fifo->head; s; s = next) {
1199 next = s->next;
1200 stream_free(s);
1201 }
1202 fifo->head = fifo->tail = NULL;
1203 atomic_store_explicit(&fifo->count, 0, memory_order_release);
1204 }
1205
1206 void stream_fifo_clean_safe(struct stream_fifo *fifo)
1207 {
1208 pthread_mutex_lock(&fifo->mtx);
1209 {
1210 stream_fifo_clean(fifo);
1211 }
1212 pthread_mutex_unlock(&fifo->mtx);
1213 }
1214
1215 size_t stream_fifo_count_safe(struct stream_fifo *fifo)
1216 {
1217 return atomic_load_explicit(&fifo->count, memory_order_acquire);
1218 }
1219
1220 void stream_fifo_free(struct stream_fifo *fifo)
1221 {
1222 stream_fifo_clean(fifo);
1223 pthread_mutex_destroy(&fifo->mtx);
1224 XFREE(MTYPE_STREAM_FIFO, fifo);
1225 }