]> git.proxmox.com Git - mirror_frr.git/blob - lib/stream.c
tools: fix frr-reload.py daemon option
[mirror_frr.git] / lib / stream.c
1 /*
2 * Packet interface
3 * Copyright (C) 1999 Kunihiro Ishiguro
4 *
5 * This file is part of GNU Zebra.
6 *
7 * GNU Zebra is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2, or (at your option) any
10 * later version.
11 *
12 * GNU Zebra is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <zebra.h>
23 #include <stddef.h>
24 #include <pthread.h>
25
26 #include "stream.h"
27 #include "memory.h"
28 #include "network.h"
29 #include "prefix.h"
30 #include "log.h"
31 #include "frr_pthread.h"
32 #include "lib_errors.h"
33
34 DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream")
35 DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO")
36
37 /* Tests whether a position is valid */
38 #define GETP_VALID(S, G) ((G) <= (S)->endp)
39 #define PUT_AT_VALID(S,G) GETP_VALID(S,G)
40 #define ENDP_VALID(S, E) ((E) <= (S)->size)
41
42 /* asserting sanity checks. Following must be true before
43 * stream functions are called:
44 *
45 * Following must always be true of stream elements
46 * before and after calls to stream functions:
47 *
48 * getp <= endp <= size
49 *
50 * Note that after a stream function is called following may be true:
51 * if (getp == endp) then stream is no longer readable
52 * if (endp == size) then stream is no longer writeable
53 *
54 * It is valid to put to anywhere within the size of the stream, but only
55 * using stream_put..._at() functions.
56 */
57 #define STREAM_WARN_OFFSETS(S) \
58 flog_warn(EC_LIB_STREAM, \
59 "&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n", \
60 (void *)(S), (unsigned long)(S)->size, \
61 (unsigned long)(S)->getp, (unsigned long)(S)->endp)
62
63 #define STREAM_VERIFY_SANE(S) \
64 do { \
65 if (!(GETP_VALID(S, (S)->getp) && ENDP_VALID(S, (S)->endp))) \
66 STREAM_WARN_OFFSETS(S); \
67 assert(GETP_VALID(S, (S)->getp)); \
68 assert(ENDP_VALID(S, (S)->endp)); \
69 } while (0)
70
71 #define STREAM_BOUND_WARN(S, WHAT) \
72 do { \
73 flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds", \
74 __func__, (WHAT)); \
75 STREAM_WARN_OFFSETS(S); \
76 assert(0); \
77 } while (0)
78
79 #define STREAM_BOUND_WARN2(S, WHAT) \
80 do { \
81 flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds", \
82 __func__, (WHAT)); \
83 STREAM_WARN_OFFSETS(S); \
84 } while (0)
85
86 /* XXX: Deprecated macro: do not use */
87 #define CHECK_SIZE(S, Z) \
88 do { \
89 if (((S)->endp + (Z)) > (S)->size) { \
90 flog_warn( \
91 EC_LIB_STREAM, \
92 "CHECK_SIZE: truncating requested size %lu\n", \
93 (unsigned long)(Z)); \
94 STREAM_WARN_OFFSETS(S); \
95 (Z) = (S)->size - (S)->endp; \
96 } \
97 } while (0);
98
99 /* Make stream buffer. */
100 struct stream *stream_new(size_t size)
101 {
102 struct stream *s;
103
104 assert(size > 0);
105
106 s = XMALLOC(MTYPE_STREAM, sizeof(struct stream) + size);
107
108 s->getp = s->endp = 0;
109 s->next = NULL;
110 s->size = size;
111 return s;
112 }
113
114 /* Free it now. */
115 void stream_free(struct stream *s)
116 {
117 if (!s)
118 return;
119
120 XFREE(MTYPE_STREAM, s);
121 }
122
123 struct stream *stream_copy(struct stream *dest, const struct stream *src)
124 {
125 STREAM_VERIFY_SANE(src);
126
127 assert(dest != NULL);
128 assert(STREAM_SIZE(dest) >= src->endp);
129
130 dest->endp = src->endp;
131 dest->getp = src->getp;
132
133 memcpy(dest->data, src->data, src->endp);
134
135 return dest;
136 }
137
138 struct stream *stream_dup(const struct stream *s)
139 {
140 struct stream *snew;
141
142 STREAM_VERIFY_SANE(s);
143
144 if ((snew = stream_new(s->endp)) == NULL)
145 return NULL;
146
147 return (stream_copy(snew, s));
148 }
149
150 struct stream *stream_dupcat(const struct stream *s1, const struct stream *s2,
151 size_t offset)
152 {
153 struct stream *new;
154
155 STREAM_VERIFY_SANE(s1);
156 STREAM_VERIFY_SANE(s2);
157
158 if ((new = stream_new(s1->endp + s2->endp)) == NULL)
159 return NULL;
160
161 memcpy(new->data, s1->data, offset);
162 memcpy(new->data + offset, s2->data, s2->endp);
163 memcpy(new->data + offset + s2->endp, s1->data + offset,
164 (s1->endp - offset));
165 new->endp = s1->endp + s2->endp;
166 return new;
167 }
168
169 size_t stream_resize_inplace(struct stream **sptr, size_t newsize)
170 {
171 struct stream *orig = *sptr;
172
173 STREAM_VERIFY_SANE(orig);
174
175 orig = XREALLOC(MTYPE_STREAM, orig, sizeof(struct stream) + newsize);
176
177 orig->size = newsize;
178
179 if (orig->endp > orig->size)
180 orig->endp = orig->size;
181 if (orig->getp > orig->endp)
182 orig->getp = orig->endp;
183
184 STREAM_VERIFY_SANE(orig);
185
186 *sptr = orig;
187 return orig->size;
188 }
189
190 size_t stream_get_getp(const struct stream *s)
191 {
192 STREAM_VERIFY_SANE(s);
193 return s->getp;
194 }
195
196 size_t stream_get_endp(const struct stream *s)
197 {
198 STREAM_VERIFY_SANE(s);
199 return s->endp;
200 }
201
202 size_t stream_get_size(const struct stream *s)
203 {
204 STREAM_VERIFY_SANE(s);
205 return s->size;
206 }
207
208 /* Stream structre' stream pointer related functions. */
209 void stream_set_getp(struct stream *s, size_t pos)
210 {
211 STREAM_VERIFY_SANE(s);
212
213 if (!GETP_VALID(s, pos)) {
214 STREAM_BOUND_WARN(s, "set getp");
215 pos = s->endp;
216 }
217
218 s->getp = pos;
219 }
220
221 void stream_set_endp(struct stream *s, size_t pos)
222 {
223 STREAM_VERIFY_SANE(s);
224
225 if (!ENDP_VALID(s, pos)) {
226 STREAM_BOUND_WARN(s, "set endp");
227 return;
228 }
229
230 /*
231 * Make sure the current read pointer is not beyond the new endp.
232 */
233 if (s->getp > pos) {
234 STREAM_BOUND_WARN(s, "set endp");
235 return;
236 }
237
238 s->endp = pos;
239 STREAM_VERIFY_SANE(s);
240 }
241
242 /* Forward pointer. */
243 void stream_forward_getp(struct stream *s, size_t size)
244 {
245 STREAM_VERIFY_SANE(s);
246
247 if (!GETP_VALID(s, s->getp + size)) {
248 STREAM_BOUND_WARN(s, "seek getp");
249 return;
250 }
251
252 s->getp += size;
253 }
254
255 void stream_forward_endp(struct stream *s, size_t size)
256 {
257 STREAM_VERIFY_SANE(s);
258
259 if (!ENDP_VALID(s, s->endp + size)) {
260 STREAM_BOUND_WARN(s, "seek endp");
261 return;
262 }
263
264 s->endp += size;
265 }
266
267 /* Copy from stream to destination. */
268 bool stream_get2(void *dst, struct stream *s, size_t size)
269 {
270 STREAM_VERIFY_SANE(s);
271
272 if (STREAM_READABLE(s) < size) {
273 STREAM_BOUND_WARN2(s, "get");
274 return false;
275 }
276
277 memcpy(dst, s->data + s->getp, size);
278 s->getp += size;
279
280 return true;
281 }
282
283 void stream_get(void *dst, struct stream *s, size_t size)
284 {
285 STREAM_VERIFY_SANE(s);
286
287 if (STREAM_READABLE(s) < size) {
288 STREAM_BOUND_WARN(s, "get");
289 return;
290 }
291
292 memcpy(dst, s->data + s->getp, size);
293 s->getp += size;
294 }
295
296 /* Get next character from the stream. */
297 bool stream_getc2(struct stream *s, uint8_t *byte)
298 {
299 STREAM_VERIFY_SANE(s);
300
301 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
302 STREAM_BOUND_WARN2(s, "get char");
303 return false;
304 }
305 *byte = s->data[s->getp++];
306
307 return true;
308 }
309
310 uint8_t stream_getc(struct stream *s)
311 {
312 uint8_t c;
313
314 STREAM_VERIFY_SANE(s);
315
316 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
317 STREAM_BOUND_WARN(s, "get char");
318 return 0;
319 }
320 c = s->data[s->getp++];
321
322 return c;
323 }
324
325 /* Get next character from the stream. */
326 uint8_t stream_getc_from(struct stream *s, size_t from)
327 {
328 uint8_t c;
329
330 STREAM_VERIFY_SANE(s);
331
332 if (!GETP_VALID(s, from + sizeof(uint8_t))) {
333 STREAM_BOUND_WARN(s, "get char");
334 return 0;
335 }
336
337 c = s->data[from];
338
339 return c;
340 }
341
342 bool stream_getw2(struct stream *s, uint16_t *word)
343 {
344 STREAM_VERIFY_SANE(s);
345
346 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
347 STREAM_BOUND_WARN2(s, "get ");
348 return false;
349 }
350
351 *word = s->data[s->getp++] << 8;
352 *word |= s->data[s->getp++];
353
354 return true;
355 }
356
357 /* Get next word from the stream. */
358 uint16_t stream_getw(struct stream *s)
359 {
360 uint16_t w;
361
362 STREAM_VERIFY_SANE(s);
363
364 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
365 STREAM_BOUND_WARN(s, "get ");
366 return 0;
367 }
368
369 w = s->data[s->getp++] << 8;
370 w |= s->data[s->getp++];
371
372 return w;
373 }
374
375 /* Get next word from the stream. */
376 uint16_t stream_getw_from(struct stream *s, size_t from)
377 {
378 uint16_t w;
379
380 STREAM_VERIFY_SANE(s);
381
382 if (!GETP_VALID(s, from + sizeof(uint16_t))) {
383 STREAM_BOUND_WARN(s, "get ");
384 return 0;
385 }
386
387 w = s->data[from++] << 8;
388 w |= s->data[from];
389
390 return w;
391 }
392
393 /* Get next 3-byte from the stream. */
394 uint32_t stream_get3_from(struct stream *s, size_t from)
395 {
396 uint32_t l;
397
398 STREAM_VERIFY_SANE(s);
399
400 if (!GETP_VALID(s, from + 3)) {
401 STREAM_BOUND_WARN(s, "get 3byte");
402 return 0;
403 }
404
405 l = s->data[from++] << 16;
406 l |= s->data[from++] << 8;
407 l |= s->data[from];
408
409 return l;
410 }
411
412 uint32_t stream_get3(struct stream *s)
413 {
414 uint32_t l;
415
416 STREAM_VERIFY_SANE(s);
417
418 if (STREAM_READABLE(s) < 3) {
419 STREAM_BOUND_WARN(s, "get 3byte");
420 return 0;
421 }
422
423 l = s->data[s->getp++] << 16;
424 l |= s->data[s->getp++] << 8;
425 l |= s->data[s->getp++];
426
427 return l;
428 }
429
430 /* Get next long word from the stream. */
431 uint32_t stream_getl_from(struct stream *s, size_t from)
432 {
433 uint32_t l;
434
435 STREAM_VERIFY_SANE(s);
436
437 if (!GETP_VALID(s, from + sizeof(uint32_t))) {
438 STREAM_BOUND_WARN(s, "get long");
439 return 0;
440 }
441
442 l = (unsigned)(s->data[from++]) << 24;
443 l |= s->data[from++] << 16;
444 l |= s->data[from++] << 8;
445 l |= s->data[from];
446
447 return l;
448 }
449
450 /* Copy from stream at specific location to destination. */
451 void stream_get_from(void *dst, struct stream *s, size_t from, size_t size)
452 {
453 STREAM_VERIFY_SANE(s);
454
455 if (!GETP_VALID(s, from + size)) {
456 STREAM_BOUND_WARN(s, "get from");
457 return;
458 }
459
460 memcpy(dst, s->data + from, size);
461 }
462
463 bool stream_getl2(struct stream *s, uint32_t *l)
464 {
465 STREAM_VERIFY_SANE(s);
466
467 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
468 STREAM_BOUND_WARN2(s, "get long");
469 return false;
470 }
471
472 *l = (unsigned int)(s->data[s->getp++]) << 24;
473 *l |= s->data[s->getp++] << 16;
474 *l |= s->data[s->getp++] << 8;
475 *l |= s->data[s->getp++];
476
477 return true;
478 }
479
480 uint32_t stream_getl(struct stream *s)
481 {
482 uint32_t l;
483
484 STREAM_VERIFY_SANE(s);
485
486 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
487 STREAM_BOUND_WARN(s, "get long");
488 return 0;
489 }
490
491 l = (unsigned)(s->data[s->getp++]) << 24;
492 l |= s->data[s->getp++] << 16;
493 l |= s->data[s->getp++] << 8;
494 l |= s->data[s->getp++];
495
496 return l;
497 }
498
499 /* Get next quad word from the stream. */
500 uint64_t stream_getq_from(struct stream *s, size_t from)
501 {
502 uint64_t q;
503
504 STREAM_VERIFY_SANE(s);
505
506 if (!GETP_VALID(s, from + sizeof(uint64_t))) {
507 STREAM_BOUND_WARN(s, "get quad");
508 return 0;
509 }
510
511 q = ((uint64_t)s->data[from++]) << 56;
512 q |= ((uint64_t)s->data[from++]) << 48;
513 q |= ((uint64_t)s->data[from++]) << 40;
514 q |= ((uint64_t)s->data[from++]) << 32;
515 q |= ((uint64_t)s->data[from++]) << 24;
516 q |= ((uint64_t)s->data[from++]) << 16;
517 q |= ((uint64_t)s->data[from++]) << 8;
518 q |= ((uint64_t)s->data[from++]);
519
520 return q;
521 }
522
523 uint64_t stream_getq(struct stream *s)
524 {
525 uint64_t q;
526
527 STREAM_VERIFY_SANE(s);
528
529 if (STREAM_READABLE(s) < sizeof(uint64_t)) {
530 STREAM_BOUND_WARN(s, "get quad");
531 return 0;
532 }
533
534 q = ((uint64_t)s->data[s->getp++]) << 56;
535 q |= ((uint64_t)s->data[s->getp++]) << 48;
536 q |= ((uint64_t)s->data[s->getp++]) << 40;
537 q |= ((uint64_t)s->data[s->getp++]) << 32;
538 q |= ((uint64_t)s->data[s->getp++]) << 24;
539 q |= ((uint64_t)s->data[s->getp++]) << 16;
540 q |= ((uint64_t)s->data[s->getp++]) << 8;
541 q |= ((uint64_t)s->data[s->getp++]);
542
543 return q;
544 }
545
546 bool stream_getq2(struct stream *s, uint64_t *q)
547 {
548 STREAM_VERIFY_SANE(s);
549
550 if (STREAM_READABLE(s) < sizeof(uint64_t)) {
551 STREAM_BOUND_WARN2(s, "get uint64");
552 return false;
553 }
554
555 *q = ((uint64_t)s->data[s->getp++]) << 56;
556 *q |= ((uint64_t)s->data[s->getp++]) << 48;
557 *q |= ((uint64_t)s->data[s->getp++]) << 40;
558 *q |= ((uint64_t)s->data[s->getp++]) << 32;
559 *q |= ((uint64_t)s->data[s->getp++]) << 24;
560 *q |= ((uint64_t)s->data[s->getp++]) << 16;
561 *q |= ((uint64_t)s->data[s->getp++]) << 8;
562 *q |= ((uint64_t)s->data[s->getp++]);
563
564 return true;
565 }
566
567 /* Get next long word from the stream. */
568 uint32_t stream_get_ipv4(struct stream *s)
569 {
570 uint32_t l;
571
572 STREAM_VERIFY_SANE(s);
573
574 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
575 STREAM_BOUND_WARN(s, "get ipv4");
576 return 0;
577 }
578
579 memcpy(&l, s->data + s->getp, sizeof(uint32_t));
580 s->getp += sizeof(uint32_t);
581
582 return l;
583 }
584
585 float stream_getf(struct stream *s)
586 {
587 union {
588 float r;
589 uint32_t d;
590 } u;
591 u.d = stream_getl(s);
592 return u.r;
593 }
594
595 double stream_getd(struct stream *s)
596 {
597 union {
598 double r;
599 uint64_t d;
600 } u;
601 u.d = stream_getq(s);
602 return u.r;
603 }
604
605 /* Copy to source to stream.
606 *
607 * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
608 * around. This should be fixed once the stream updates are working.
609 *
610 * stream_write() is saner
611 */
612 void stream_put(struct stream *s, const void *src, size_t size)
613 {
614
615 /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
616 CHECK_SIZE(s, size);
617
618 STREAM_VERIFY_SANE(s);
619
620 if (STREAM_WRITEABLE(s) < size) {
621 STREAM_BOUND_WARN(s, "put");
622 return;
623 }
624
625 if (src)
626 memcpy(s->data + s->endp, src, size);
627 else
628 memset(s->data + s->endp, 0, size);
629
630 s->endp += size;
631 }
632
633 /* Put character to the stream. */
634 int stream_putc(struct stream *s, uint8_t c)
635 {
636 STREAM_VERIFY_SANE(s);
637
638 if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) {
639 STREAM_BOUND_WARN(s, "put");
640 return 0;
641 }
642
643 s->data[s->endp++] = c;
644 return sizeof(uint8_t);
645 }
646
647 /* Put word to the stream. */
648 int stream_putw(struct stream *s, uint16_t w)
649 {
650 STREAM_VERIFY_SANE(s);
651
652 if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) {
653 STREAM_BOUND_WARN(s, "put");
654 return 0;
655 }
656
657 s->data[s->endp++] = (uint8_t)(w >> 8);
658 s->data[s->endp++] = (uint8_t)w;
659
660 return 2;
661 }
662
663 /* Put long word to the stream. */
664 int stream_put3(struct stream *s, uint32_t l)
665 {
666 STREAM_VERIFY_SANE(s);
667
668 if (STREAM_WRITEABLE(s) < 3) {
669 STREAM_BOUND_WARN(s, "put");
670 return 0;
671 }
672
673 s->data[s->endp++] = (uint8_t)(l >> 16);
674 s->data[s->endp++] = (uint8_t)(l >> 8);
675 s->data[s->endp++] = (uint8_t)l;
676
677 return 3;
678 }
679
680 /* Put long word to the stream. */
681 int stream_putl(struct stream *s, uint32_t l)
682 {
683 STREAM_VERIFY_SANE(s);
684
685 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
686 STREAM_BOUND_WARN(s, "put");
687 return 0;
688 }
689
690 s->data[s->endp++] = (uint8_t)(l >> 24);
691 s->data[s->endp++] = (uint8_t)(l >> 16);
692 s->data[s->endp++] = (uint8_t)(l >> 8);
693 s->data[s->endp++] = (uint8_t)l;
694
695 return 4;
696 }
697
698 /* Put quad word to the stream. */
699 int stream_putq(struct stream *s, uint64_t q)
700 {
701 STREAM_VERIFY_SANE(s);
702
703 if (STREAM_WRITEABLE(s) < sizeof(uint64_t)) {
704 STREAM_BOUND_WARN(s, "put quad");
705 return 0;
706 }
707
708 s->data[s->endp++] = (uint8_t)(q >> 56);
709 s->data[s->endp++] = (uint8_t)(q >> 48);
710 s->data[s->endp++] = (uint8_t)(q >> 40);
711 s->data[s->endp++] = (uint8_t)(q >> 32);
712 s->data[s->endp++] = (uint8_t)(q >> 24);
713 s->data[s->endp++] = (uint8_t)(q >> 16);
714 s->data[s->endp++] = (uint8_t)(q >> 8);
715 s->data[s->endp++] = (uint8_t)q;
716
717 return 8;
718 }
719
720 int stream_putf(struct stream *s, float f)
721 {
722 union {
723 float i;
724 uint32_t o;
725 } u;
726 u.i = f;
727 return stream_putl(s, u.o);
728 }
729
730 int stream_putd(struct stream *s, double d)
731 {
732 union {
733 double i;
734 uint64_t o;
735 } u;
736 u.i = d;
737 return stream_putq(s, u.o);
738 }
739
740 int stream_putc_at(struct stream *s, size_t putp, uint8_t c)
741 {
742 STREAM_VERIFY_SANE(s);
743
744 if (!PUT_AT_VALID(s, putp + sizeof(uint8_t))) {
745 STREAM_BOUND_WARN(s, "put");
746 return 0;
747 }
748
749 s->data[putp] = c;
750
751 return 1;
752 }
753
754 int stream_putw_at(struct stream *s, size_t putp, uint16_t w)
755 {
756 STREAM_VERIFY_SANE(s);
757
758 if (!PUT_AT_VALID(s, putp + sizeof(uint16_t))) {
759 STREAM_BOUND_WARN(s, "put");
760 return 0;
761 }
762
763 s->data[putp] = (uint8_t)(w >> 8);
764 s->data[putp + 1] = (uint8_t)w;
765
766 return 2;
767 }
768
769 int stream_put3_at(struct stream *s, size_t putp, uint32_t l)
770 {
771 STREAM_VERIFY_SANE(s);
772
773 if (!PUT_AT_VALID(s, putp + 3)) {
774 STREAM_BOUND_WARN(s, "put");
775 return 0;
776 }
777 s->data[putp] = (uint8_t)(l >> 16);
778 s->data[putp + 1] = (uint8_t)(l >> 8);
779 s->data[putp + 2] = (uint8_t)l;
780
781 return 3;
782 }
783
784 int stream_putl_at(struct stream *s, size_t putp, uint32_t l)
785 {
786 STREAM_VERIFY_SANE(s);
787
788 if (!PUT_AT_VALID(s, putp + sizeof(uint32_t))) {
789 STREAM_BOUND_WARN(s, "put");
790 return 0;
791 }
792 s->data[putp] = (uint8_t)(l >> 24);
793 s->data[putp + 1] = (uint8_t)(l >> 16);
794 s->data[putp + 2] = (uint8_t)(l >> 8);
795 s->data[putp + 3] = (uint8_t)l;
796
797 return 4;
798 }
799
800 int stream_putq_at(struct stream *s, size_t putp, uint64_t q)
801 {
802 STREAM_VERIFY_SANE(s);
803
804 if (!PUT_AT_VALID(s, putp + sizeof(uint64_t))) {
805 STREAM_BOUND_WARN(s, "put");
806 return 0;
807 }
808 s->data[putp] = (uint8_t)(q >> 56);
809 s->data[putp + 1] = (uint8_t)(q >> 48);
810 s->data[putp + 2] = (uint8_t)(q >> 40);
811 s->data[putp + 3] = (uint8_t)(q >> 32);
812 s->data[putp + 4] = (uint8_t)(q >> 24);
813 s->data[putp + 5] = (uint8_t)(q >> 16);
814 s->data[putp + 6] = (uint8_t)(q >> 8);
815 s->data[putp + 7] = (uint8_t)q;
816
817 return 8;
818 }
819
820 /* Put long word to the stream. */
821 int stream_put_ipv4(struct stream *s, uint32_t l)
822 {
823 STREAM_VERIFY_SANE(s);
824
825 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
826 STREAM_BOUND_WARN(s, "put");
827 return 0;
828 }
829 memcpy(s->data + s->endp, &l, sizeof(uint32_t));
830 s->endp += sizeof(uint32_t);
831
832 return sizeof(uint32_t);
833 }
834
835 /* Put long word to the stream. */
836 int stream_put_in_addr(struct stream *s, const struct in_addr *addr)
837 {
838 STREAM_VERIFY_SANE(s);
839
840 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
841 STREAM_BOUND_WARN(s, "put");
842 return 0;
843 }
844
845 memcpy(s->data + s->endp, addr, sizeof(uint32_t));
846 s->endp += sizeof(uint32_t);
847
848 return sizeof(uint32_t);
849 }
850
851 /* Put in_addr at location in the stream. */
852 int stream_put_in_addr_at(struct stream *s, size_t putp,
853 const struct in_addr *addr)
854 {
855 STREAM_VERIFY_SANE(s);
856
857 if (!PUT_AT_VALID(s, putp + 4)) {
858 STREAM_BOUND_WARN(s, "put");
859 return 0;
860 }
861
862 memcpy(&s->data[putp], addr, 4);
863 return 4;
864 }
865
866 /* Put in6_addr at location in the stream. */
867 int stream_put_in6_addr_at(struct stream *s, size_t putp,
868 const struct in6_addr *addr)
869 {
870 STREAM_VERIFY_SANE(s);
871
872 if (!PUT_AT_VALID(s, putp + 16)) {
873 STREAM_BOUND_WARN(s, "put");
874 return 0;
875 }
876
877 memcpy(&s->data[putp], addr, 16);
878 return 16;
879 }
880
881 /* Put prefix by nlri type format. */
882 int stream_put_prefix_addpath(struct stream *s, const struct prefix *p,
883 int addpath_encode, uint32_t addpath_tx_id)
884 {
885 size_t psize;
886 size_t psize_with_addpath;
887
888 STREAM_VERIFY_SANE(s);
889
890 psize = PSIZE(p->prefixlen);
891
892 if (addpath_encode)
893 psize_with_addpath = psize + 4;
894 else
895 psize_with_addpath = psize;
896
897 if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) {
898 STREAM_BOUND_WARN(s, "put");
899 return 0;
900 }
901
902 if (addpath_encode) {
903 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
904 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
905 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
906 s->data[s->endp++] = (uint8_t)addpath_tx_id;
907 }
908
909 s->data[s->endp++] = p->prefixlen;
910 memcpy(s->data + s->endp, &p->u.prefix, psize);
911 s->endp += psize;
912
913 return psize;
914 }
915
916 int stream_put_prefix(struct stream *s, const struct prefix *p)
917 {
918 return stream_put_prefix_addpath(s, p, 0, 0);
919 }
920
921 /* Put NLRI with label */
922 int stream_put_labeled_prefix(struct stream *s, const struct prefix *p,
923 mpls_label_t *label, int addpath_encode,
924 uint32_t addpath_tx_id)
925 {
926 size_t psize;
927 size_t psize_with_addpath;
928 uint8_t *label_pnt = (uint8_t *)label;
929
930 STREAM_VERIFY_SANE(s);
931
932 psize = PSIZE(p->prefixlen);
933 psize_with_addpath = psize + (addpath_encode ? 4 : 0);
934
935 if (STREAM_WRITEABLE(s) < (psize_with_addpath + 3)) {
936 STREAM_BOUND_WARN(s, "put");
937 return 0;
938 }
939
940 if (addpath_encode) {
941 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
942 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
943 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
944 s->data[s->endp++] = (uint8_t)addpath_tx_id;
945 }
946
947 stream_putc(s, (p->prefixlen + 24));
948 stream_putc(s, label_pnt[0]);
949 stream_putc(s, label_pnt[1]);
950 stream_putc(s, label_pnt[2]);
951 memcpy(s->data + s->endp, &p->u.prefix, psize);
952 s->endp += psize;
953
954 return (psize + 3);
955 }
956
957 /* Read size from fd. */
958 int stream_read(struct stream *s, int fd, size_t size)
959 {
960 int nbytes;
961
962 STREAM_VERIFY_SANE(s);
963
964 if (STREAM_WRITEABLE(s) < size) {
965 STREAM_BOUND_WARN(s, "put");
966 return 0;
967 }
968
969 nbytes = readn(fd, s->data + s->endp, size);
970
971 if (nbytes > 0)
972 s->endp += nbytes;
973
974 return nbytes;
975 }
976
977 ssize_t stream_read_try(struct stream *s, int fd, size_t size)
978 {
979 ssize_t nbytes;
980
981 STREAM_VERIFY_SANE(s);
982
983 if (STREAM_WRITEABLE(s) < size) {
984 STREAM_BOUND_WARN(s, "put");
985 /* Fatal (not transient) error, since retrying will not help
986 (stream is too small to contain the desired data). */
987 return -1;
988 }
989
990 if ((nbytes = read(fd, s->data + s->endp, size)) >= 0) {
991 s->endp += nbytes;
992 return nbytes;
993 }
994 /* Error: was it transient (return -2) or fatal (return -1)? */
995 if (ERRNO_IO_RETRY(errno))
996 return -2;
997 flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
998 safe_strerror(errno));
999 return -1;
1000 }
1001
1002 /* Read up to size bytes into the stream from the fd, using recvmsgfrom
1003 * whose arguments match the remaining arguments to this function
1004 */
1005 ssize_t stream_recvfrom(struct stream *s, int fd, size_t size, int flags,
1006 struct sockaddr *from, socklen_t *fromlen)
1007 {
1008 ssize_t nbytes;
1009
1010 STREAM_VERIFY_SANE(s);
1011
1012 if (STREAM_WRITEABLE(s) < size) {
1013 STREAM_BOUND_WARN(s, "put");
1014 /* Fatal (not transient) error, since retrying will not help
1015 (stream is too small to contain the desired data). */
1016 return -1;
1017 }
1018
1019 if ((nbytes = recvfrom(fd, s->data + s->endp, size, flags, from,
1020 fromlen))
1021 >= 0) {
1022 s->endp += nbytes;
1023 return nbytes;
1024 }
1025 /* Error: was it transient (return -2) or fatal (return -1)? */
1026 if (ERRNO_IO_RETRY(errno))
1027 return -2;
1028 flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
1029 safe_strerror(errno));
1030 return -1;
1031 }
1032
1033 /* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
1034 * from endp.
1035 * First iovec will be used to receive the data.
1036 * Stream need not be empty.
1037 */
1038 ssize_t stream_recvmsg(struct stream *s, int fd, struct msghdr *msgh, int flags,
1039 size_t size)
1040 {
1041 int nbytes;
1042 struct iovec *iov;
1043
1044 STREAM_VERIFY_SANE(s);
1045 assert(msgh->msg_iovlen > 0);
1046
1047 if (STREAM_WRITEABLE(s) < size) {
1048 STREAM_BOUND_WARN(s, "put");
1049 /* This is a logic error in the calling code: the stream is too
1050 small
1051 to hold the desired data! */
1052 return -1;
1053 }
1054
1055 iov = &(msgh->msg_iov[0]);
1056 iov->iov_base = (s->data + s->endp);
1057 iov->iov_len = size;
1058
1059 nbytes = recvmsg(fd, msgh, flags);
1060
1061 if (nbytes > 0)
1062 s->endp += nbytes;
1063
1064 return nbytes;
1065 }
1066
1067 /* Write data to buffer. */
1068 size_t stream_write(struct stream *s, const void *ptr, size_t size)
1069 {
1070
1071 CHECK_SIZE(s, size);
1072
1073 STREAM_VERIFY_SANE(s);
1074
1075 if (STREAM_WRITEABLE(s) < size) {
1076 STREAM_BOUND_WARN(s, "put");
1077 return 0;
1078 }
1079
1080 memcpy(s->data + s->endp, ptr, size);
1081 s->endp += size;
1082
1083 return size;
1084 }
1085
1086 /* Return current read pointer.
1087 * DEPRECATED!
1088 * Use stream_get_pnt_to if you must, but decoding streams properly
1089 * is preferred
1090 */
1091 uint8_t *stream_pnt(struct stream *s)
1092 {
1093 STREAM_VERIFY_SANE(s);
1094 return s->data + s->getp;
1095 }
1096
1097 /* Check does this stream empty? */
1098 int stream_empty(struct stream *s)
1099 {
1100 STREAM_VERIFY_SANE(s);
1101
1102 return (s->endp == 0);
1103 }
1104
1105 /* Reset stream. */
1106 void stream_reset(struct stream *s)
1107 {
1108 STREAM_VERIFY_SANE(s);
1109
1110 s->getp = s->endp = 0;
1111 }
1112
1113 /* Write stream contens to the file discriptor. */
1114 int stream_flush(struct stream *s, int fd)
1115 {
1116 int nbytes;
1117
1118 STREAM_VERIFY_SANE(s);
1119
1120 nbytes = write(fd, s->data + s->getp, s->endp - s->getp);
1121
1122 return nbytes;
1123 }
1124
1125 void stream_hexdump(const struct stream *s)
1126 {
1127 zlog_hexdump(s->data, s->endp);
1128 }
1129
1130 /* Stream first in first out queue. */
1131
1132 struct stream_fifo *stream_fifo_new(void)
1133 {
1134 struct stream_fifo *new;
1135
1136 new = XMALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo));
1137 stream_fifo_init(new);
1138 return new;
1139 }
1140
1141 void stream_fifo_init(struct stream_fifo *fifo)
1142 {
1143 memset(fifo, 0, sizeof(struct stream_fifo));
1144 pthread_mutex_init(&fifo->mtx, NULL);
1145 }
1146
1147 /* Add new stream to fifo. */
1148 void stream_fifo_push(struct stream_fifo *fifo, struct stream *s)
1149 {
1150 #if defined DEV_BUILD
1151 size_t max, curmax;
1152 #endif
1153
1154 if (fifo->tail)
1155 fifo->tail->next = s;
1156 else
1157 fifo->head = s;
1158
1159 fifo->tail = s;
1160 fifo->tail->next = NULL;
1161 #if !defined DEV_BUILD
1162 atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1163 #else
1164 max = atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1165 curmax = atomic_load_explicit(&fifo->max_count, memory_order_relaxed);
1166 if (max > curmax)
1167 atomic_store_explicit(&fifo->max_count, max,
1168 memory_order_relaxed);
1169 #endif
1170 }
1171
1172 void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s)
1173 {
1174 frr_with_mutex(&fifo->mtx) {
1175 stream_fifo_push(fifo, s);
1176 }
1177 }
1178
1179 /* Delete first stream from fifo. */
1180 struct stream *stream_fifo_pop(struct stream_fifo *fifo)
1181 {
1182 struct stream *s;
1183
1184 s = fifo->head;
1185
1186 if (s) {
1187 fifo->head = s->next;
1188
1189 if (fifo->head == NULL)
1190 fifo->tail = NULL;
1191
1192 atomic_fetch_sub_explicit(&fifo->count, 1,
1193 memory_order_release);
1194
1195 /* ensure stream is scrubbed of references to this fifo */
1196 s->next = NULL;
1197 }
1198
1199 return s;
1200 }
1201
1202 struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo)
1203 {
1204 struct stream *ret;
1205
1206 frr_with_mutex(&fifo->mtx) {
1207 ret = stream_fifo_pop(fifo);
1208 }
1209
1210 return ret;
1211 }
1212
1213 struct stream *stream_fifo_head(struct stream_fifo *fifo)
1214 {
1215 return fifo->head;
1216 }
1217
1218 struct stream *stream_fifo_head_safe(struct stream_fifo *fifo)
1219 {
1220 struct stream *ret;
1221
1222 frr_with_mutex(&fifo->mtx) {
1223 ret = stream_fifo_head(fifo);
1224 }
1225
1226 return ret;
1227 }
1228
1229 void stream_fifo_clean(struct stream_fifo *fifo)
1230 {
1231 struct stream *s;
1232 struct stream *next;
1233
1234 for (s = fifo->head; s; s = next) {
1235 next = s->next;
1236 stream_free(s);
1237 }
1238 fifo->head = fifo->tail = NULL;
1239 atomic_store_explicit(&fifo->count, 0, memory_order_release);
1240 }
1241
1242 void stream_fifo_clean_safe(struct stream_fifo *fifo)
1243 {
1244 frr_with_mutex(&fifo->mtx) {
1245 stream_fifo_clean(fifo);
1246 }
1247 }
1248
1249 size_t stream_fifo_count_safe(struct stream_fifo *fifo)
1250 {
1251 return atomic_load_explicit(&fifo->count, memory_order_acquire);
1252 }
1253
1254 void stream_fifo_deinit(struct stream_fifo *fifo)
1255 {
1256 stream_fifo_clean(fifo);
1257 pthread_mutex_destroy(&fifo->mtx);
1258 }
1259
1260 void stream_fifo_free(struct stream_fifo *fifo)
1261 {
1262 stream_fifo_deinit(fifo);
1263 XFREE(MTYPE_STREAM_FIFO, fifo);
1264 }