]> git.proxmox.com Git - mirror_frr.git/blob - lib/stream.c
Merge pull request #8008 from chiragshah6/yang_nb5
[mirror_frr.git] / lib / stream.c
1 /*
2 * Packet interface
3 * Copyright (C) 1999 Kunihiro Ishiguro
4 *
5 * This file is part of GNU Zebra.
6 *
7 * GNU Zebra is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2, or (at your option) any
10 * later version.
11 *
12 * GNU Zebra is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <zebra.h>
23 #include <stddef.h>
24 #include <pthread.h>
25
26 #include "stream.h"
27 #include "memory.h"
28 #include "network.h"
29 #include "prefix.h"
30 #include "log.h"
31 #include "frr_pthread.h"
32 #include "lib_errors.h"
33
34 DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream")
35 DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO")
36
37 /* Tests whether a position is valid */
38 #define GETP_VALID(S, G) ((G) <= (S)->endp)
39 #define PUT_AT_VALID(S,G) GETP_VALID(S,G)
40 #define ENDP_VALID(S, E) ((E) <= (S)->size)
41
42 /* asserting sanity checks. Following must be true before
43 * stream functions are called:
44 *
45 * Following must always be true of stream elements
46 * before and after calls to stream functions:
47 *
48 * getp <= endp <= size
49 *
50 * Note that after a stream function is called following may be true:
51 * if (getp == endp) then stream is no longer readable
52 * if (endp == size) then stream is no longer writeable
53 *
54 * It is valid to put to anywhere within the size of the stream, but only
55 * using stream_put..._at() functions.
56 */
57 #define STREAM_WARN_OFFSETS(S) \
58 do { \
59 flog_warn(EC_LIB_STREAM, \
60 "&(struct stream): %p, size: %lu, getp: %lu, endp: %lu", \
61 (void *)(S), (unsigned long)(S)->size, \
62 (unsigned long)(S)->getp, (unsigned long)(S)->endp); \
63 zlog_backtrace(LOG_WARNING); \
64 } while (0)
65
66 #define STREAM_VERIFY_SANE(S) \
67 do { \
68 if (!(GETP_VALID(S, (S)->getp) && ENDP_VALID(S, (S)->endp))) { \
69 STREAM_WARN_OFFSETS(S); \
70 } \
71 assert(GETP_VALID(S, (S)->getp)); \
72 assert(ENDP_VALID(S, (S)->endp)); \
73 } while (0)
74
75 #define STREAM_BOUND_WARN(S, WHAT) \
76 do { \
77 flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds", \
78 __func__, (WHAT)); \
79 STREAM_WARN_OFFSETS(S); \
80 assert(0); \
81 } while (0)
82
83 #define STREAM_BOUND_WARN2(S, WHAT) \
84 do { \
85 flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds", \
86 __func__, (WHAT)); \
87 STREAM_WARN_OFFSETS(S); \
88 } while (0)
89
90 /* XXX: Deprecated macro: do not use */
91 #define CHECK_SIZE(S, Z) \
92 do { \
93 if (((S)->endp + (Z)) > (S)->size) { \
94 flog_warn( \
95 EC_LIB_STREAM, \
96 "CHECK_SIZE: truncating requested size %lu", \
97 (unsigned long)(Z)); \
98 STREAM_WARN_OFFSETS(S); \
99 (Z) = (S)->size - (S)->endp; \
100 } \
101 } while (0);
102
103 /* Make stream buffer. */
104 struct stream *stream_new(size_t size)
105 {
106 struct stream *s;
107
108 assert(size > 0);
109
110 s = XMALLOC(MTYPE_STREAM, sizeof(struct stream) + size);
111
112 s->getp = s->endp = 0;
113 s->next = NULL;
114 s->size = size;
115 return s;
116 }
117
118 /* Free it now. */
119 void stream_free(struct stream *s)
120 {
121 if (!s)
122 return;
123
124 XFREE(MTYPE_STREAM, s);
125 }
126
127 struct stream *stream_copy(struct stream *dest, const struct stream *src)
128 {
129 STREAM_VERIFY_SANE(src);
130
131 assert(dest != NULL);
132 assert(STREAM_SIZE(dest) >= src->endp);
133
134 dest->endp = src->endp;
135 dest->getp = src->getp;
136
137 memcpy(dest->data, src->data, src->endp);
138
139 return dest;
140 }
141
142 struct stream *stream_dup(const struct stream *s)
143 {
144 struct stream *snew;
145
146 STREAM_VERIFY_SANE(s);
147
148 if ((snew = stream_new(s->endp)) == NULL)
149 return NULL;
150
151 return (stream_copy(snew, s));
152 }
153
154 struct stream *stream_dupcat(const struct stream *s1, const struct stream *s2,
155 size_t offset)
156 {
157 struct stream *new;
158
159 STREAM_VERIFY_SANE(s1);
160 STREAM_VERIFY_SANE(s2);
161
162 if ((new = stream_new(s1->endp + s2->endp)) == NULL)
163 return NULL;
164
165 memcpy(new->data, s1->data, offset);
166 memcpy(new->data + offset, s2->data, s2->endp);
167 memcpy(new->data + offset + s2->endp, s1->data + offset,
168 (s1->endp - offset));
169 new->endp = s1->endp + s2->endp;
170 return new;
171 }
172
173 size_t stream_resize_inplace(struct stream **sptr, size_t newsize)
174 {
175 struct stream *orig = *sptr;
176
177 STREAM_VERIFY_SANE(orig);
178
179 orig = XREALLOC(MTYPE_STREAM, orig, sizeof(struct stream) + newsize);
180
181 orig->size = newsize;
182
183 if (orig->endp > orig->size)
184 orig->endp = orig->size;
185 if (orig->getp > orig->endp)
186 orig->getp = orig->endp;
187
188 STREAM_VERIFY_SANE(orig);
189
190 *sptr = orig;
191 return orig->size;
192 }
193
194 size_t stream_get_getp(const struct stream *s)
195 {
196 STREAM_VERIFY_SANE(s);
197 return s->getp;
198 }
199
200 size_t stream_get_endp(const struct stream *s)
201 {
202 STREAM_VERIFY_SANE(s);
203 return s->endp;
204 }
205
206 size_t stream_get_size(const struct stream *s)
207 {
208 STREAM_VERIFY_SANE(s);
209 return s->size;
210 }
211
212 /* Stream structre' stream pointer related functions. */
213 void stream_set_getp(struct stream *s, size_t pos)
214 {
215 STREAM_VERIFY_SANE(s);
216
217 if (!GETP_VALID(s, pos)) {
218 STREAM_BOUND_WARN(s, "set getp");
219 pos = s->endp;
220 }
221
222 s->getp = pos;
223 }
224
225 void stream_set_endp(struct stream *s, size_t pos)
226 {
227 STREAM_VERIFY_SANE(s);
228
229 if (!ENDP_VALID(s, pos)) {
230 STREAM_BOUND_WARN(s, "set endp");
231 return;
232 }
233
234 /*
235 * Make sure the current read pointer is not beyond the new endp.
236 */
237 if (s->getp > pos) {
238 STREAM_BOUND_WARN(s, "set endp");
239 return;
240 }
241
242 s->endp = pos;
243 STREAM_VERIFY_SANE(s);
244 }
245
246 /* Forward pointer. */
247 void stream_forward_getp(struct stream *s, size_t size)
248 {
249 STREAM_VERIFY_SANE(s);
250
251 if (!GETP_VALID(s, s->getp + size)) {
252 STREAM_BOUND_WARN(s, "seek getp");
253 return;
254 }
255
256 s->getp += size;
257 }
258
259 bool stream_forward_getp2(struct stream *s, size_t size)
260 {
261 STREAM_VERIFY_SANE(s);
262
263 if (!GETP_VALID(s, s->getp + size))
264 return false;
265
266 s->getp += size;
267
268 return true;
269 }
270
271 void stream_rewind_getp(struct stream *s, size_t size)
272 {
273 STREAM_VERIFY_SANE(s);
274
275 if (size > s->getp || !GETP_VALID(s, s->getp - size)) {
276 STREAM_BOUND_WARN(s, "rewind getp");
277 return;
278 }
279
280 s->getp -= size;
281 }
282
283 bool stream_rewind_getp2(struct stream *s, size_t size)
284 {
285 STREAM_VERIFY_SANE(s);
286
287 if (size > s->getp || !GETP_VALID(s, s->getp - size))
288 return false;
289
290 s->getp -= size;
291
292 return true;
293 }
294
295 void stream_forward_endp(struct stream *s, size_t size)
296 {
297 STREAM_VERIFY_SANE(s);
298
299 if (!ENDP_VALID(s, s->endp + size)) {
300 STREAM_BOUND_WARN(s, "seek endp");
301 return;
302 }
303
304 s->endp += size;
305 }
306
307 bool stream_forward_endp2(struct stream *s, size_t size)
308 {
309 STREAM_VERIFY_SANE(s);
310
311 if (!ENDP_VALID(s, s->endp + size))
312 return false;
313
314 s->endp += size;
315
316 return true;
317 }
318
319 /* Copy from stream to destination. */
320 bool stream_get2(void *dst, struct stream *s, size_t size)
321 {
322 STREAM_VERIFY_SANE(s);
323
324 if (STREAM_READABLE(s) < size) {
325 STREAM_BOUND_WARN2(s, "get");
326 return false;
327 }
328
329 memcpy(dst, s->data + s->getp, size);
330 s->getp += size;
331
332 return true;
333 }
334
335 void stream_get(void *dst, struct stream *s, size_t size)
336 {
337 STREAM_VERIFY_SANE(s);
338
339 if (STREAM_READABLE(s) < size) {
340 STREAM_BOUND_WARN(s, "get");
341 return;
342 }
343
344 memcpy(dst, s->data + s->getp, size);
345 s->getp += size;
346 }
347
348 /* Get next character from the stream. */
349 bool stream_getc2(struct stream *s, uint8_t *byte)
350 {
351 STREAM_VERIFY_SANE(s);
352
353 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
354 STREAM_BOUND_WARN2(s, "get char");
355 return false;
356 }
357 *byte = s->data[s->getp++];
358
359 return true;
360 }
361
362 uint8_t stream_getc(struct stream *s)
363 {
364 uint8_t c;
365
366 STREAM_VERIFY_SANE(s);
367
368 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
369 STREAM_BOUND_WARN(s, "get char");
370 return 0;
371 }
372 c = s->data[s->getp++];
373
374 return c;
375 }
376
377 /* Get next character from the stream. */
378 uint8_t stream_getc_from(struct stream *s, size_t from)
379 {
380 uint8_t c;
381
382 STREAM_VERIFY_SANE(s);
383
384 if (!GETP_VALID(s, from + sizeof(uint8_t))) {
385 STREAM_BOUND_WARN(s, "get char");
386 return 0;
387 }
388
389 c = s->data[from];
390
391 return c;
392 }
393
394 bool stream_getw2(struct stream *s, uint16_t *word)
395 {
396 STREAM_VERIFY_SANE(s);
397
398 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
399 STREAM_BOUND_WARN2(s, "get ");
400 return false;
401 }
402
403 *word = s->data[s->getp++] << 8;
404 *word |= s->data[s->getp++];
405
406 return true;
407 }
408
409 /* Get next word from the stream. */
410 uint16_t stream_getw(struct stream *s)
411 {
412 uint16_t w;
413
414 STREAM_VERIFY_SANE(s);
415
416 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
417 STREAM_BOUND_WARN(s, "get ");
418 return 0;
419 }
420
421 w = s->data[s->getp++] << 8;
422 w |= s->data[s->getp++];
423
424 return w;
425 }
426
427 /* Get next word from the stream. */
428 uint16_t stream_getw_from(struct stream *s, size_t from)
429 {
430 uint16_t w;
431
432 STREAM_VERIFY_SANE(s);
433
434 if (!GETP_VALID(s, from + sizeof(uint16_t))) {
435 STREAM_BOUND_WARN(s, "get ");
436 return 0;
437 }
438
439 w = s->data[from++] << 8;
440 w |= s->data[from];
441
442 return w;
443 }
444
445 /* Get next 3-byte from the stream. */
446 uint32_t stream_get3_from(struct stream *s, size_t from)
447 {
448 uint32_t l;
449
450 STREAM_VERIFY_SANE(s);
451
452 if (!GETP_VALID(s, from + 3)) {
453 STREAM_BOUND_WARN(s, "get 3byte");
454 return 0;
455 }
456
457 l = s->data[from++] << 16;
458 l |= s->data[from++] << 8;
459 l |= s->data[from];
460
461 return l;
462 }
463
464 uint32_t stream_get3(struct stream *s)
465 {
466 uint32_t l;
467
468 STREAM_VERIFY_SANE(s);
469
470 if (STREAM_READABLE(s) < 3) {
471 STREAM_BOUND_WARN(s, "get 3byte");
472 return 0;
473 }
474
475 l = s->data[s->getp++] << 16;
476 l |= s->data[s->getp++] << 8;
477 l |= s->data[s->getp++];
478
479 return l;
480 }
481
482 /* Get next long word from the stream. */
483 uint32_t stream_getl_from(struct stream *s, size_t from)
484 {
485 uint32_t l;
486
487 STREAM_VERIFY_SANE(s);
488
489 if (!GETP_VALID(s, from + sizeof(uint32_t))) {
490 STREAM_BOUND_WARN(s, "get long");
491 return 0;
492 }
493
494 l = (unsigned)(s->data[from++]) << 24;
495 l |= s->data[from++] << 16;
496 l |= s->data[from++] << 8;
497 l |= s->data[from];
498
499 return l;
500 }
501
502 /* Copy from stream at specific location to destination. */
503 void stream_get_from(void *dst, struct stream *s, size_t from, size_t size)
504 {
505 STREAM_VERIFY_SANE(s);
506
507 if (!GETP_VALID(s, from + size)) {
508 STREAM_BOUND_WARN(s, "get from");
509 return;
510 }
511
512 memcpy(dst, s->data + from, size);
513 }
514
515 bool stream_getl2(struct stream *s, uint32_t *l)
516 {
517 STREAM_VERIFY_SANE(s);
518
519 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
520 STREAM_BOUND_WARN2(s, "get long");
521 return false;
522 }
523
524 *l = (unsigned int)(s->data[s->getp++]) << 24;
525 *l |= s->data[s->getp++] << 16;
526 *l |= s->data[s->getp++] << 8;
527 *l |= s->data[s->getp++];
528
529 return true;
530 }
531
532 uint32_t stream_getl(struct stream *s)
533 {
534 uint32_t l;
535
536 STREAM_VERIFY_SANE(s);
537
538 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
539 STREAM_BOUND_WARN(s, "get long");
540 return 0;
541 }
542
543 l = (unsigned)(s->data[s->getp++]) << 24;
544 l |= s->data[s->getp++] << 16;
545 l |= s->data[s->getp++] << 8;
546 l |= s->data[s->getp++];
547
548 return l;
549 }
550
551 /* Get next quad word from the stream. */
552 uint64_t stream_getq_from(struct stream *s, size_t from)
553 {
554 uint64_t q;
555
556 STREAM_VERIFY_SANE(s);
557
558 if (!GETP_VALID(s, from + sizeof(uint64_t))) {
559 STREAM_BOUND_WARN(s, "get quad");
560 return 0;
561 }
562
563 q = ((uint64_t)s->data[from++]) << 56;
564 q |= ((uint64_t)s->data[from++]) << 48;
565 q |= ((uint64_t)s->data[from++]) << 40;
566 q |= ((uint64_t)s->data[from++]) << 32;
567 q |= ((uint64_t)s->data[from++]) << 24;
568 q |= ((uint64_t)s->data[from++]) << 16;
569 q |= ((uint64_t)s->data[from++]) << 8;
570 q |= ((uint64_t)s->data[from++]);
571
572 return q;
573 }
574
575 uint64_t stream_getq(struct stream *s)
576 {
577 uint64_t q;
578
579 STREAM_VERIFY_SANE(s);
580
581 if (STREAM_READABLE(s) < sizeof(uint64_t)) {
582 STREAM_BOUND_WARN(s, "get quad");
583 return 0;
584 }
585
586 q = ((uint64_t)s->data[s->getp++]) << 56;
587 q |= ((uint64_t)s->data[s->getp++]) << 48;
588 q |= ((uint64_t)s->data[s->getp++]) << 40;
589 q |= ((uint64_t)s->data[s->getp++]) << 32;
590 q |= ((uint64_t)s->data[s->getp++]) << 24;
591 q |= ((uint64_t)s->data[s->getp++]) << 16;
592 q |= ((uint64_t)s->data[s->getp++]) << 8;
593 q |= ((uint64_t)s->data[s->getp++]);
594
595 return q;
596 }
597
598 bool stream_getq2(struct stream *s, uint64_t *q)
599 {
600 STREAM_VERIFY_SANE(s);
601
602 if (STREAM_READABLE(s) < sizeof(uint64_t)) {
603 STREAM_BOUND_WARN2(s, "get uint64");
604 return false;
605 }
606
607 *q = ((uint64_t)s->data[s->getp++]) << 56;
608 *q |= ((uint64_t)s->data[s->getp++]) << 48;
609 *q |= ((uint64_t)s->data[s->getp++]) << 40;
610 *q |= ((uint64_t)s->data[s->getp++]) << 32;
611 *q |= ((uint64_t)s->data[s->getp++]) << 24;
612 *q |= ((uint64_t)s->data[s->getp++]) << 16;
613 *q |= ((uint64_t)s->data[s->getp++]) << 8;
614 *q |= ((uint64_t)s->data[s->getp++]);
615
616 return true;
617 }
618
619 /* Get next long word from the stream. */
620 uint32_t stream_get_ipv4(struct stream *s)
621 {
622 uint32_t l;
623
624 STREAM_VERIFY_SANE(s);
625
626 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
627 STREAM_BOUND_WARN(s, "get ipv4");
628 return 0;
629 }
630
631 memcpy(&l, s->data + s->getp, sizeof(uint32_t));
632 s->getp += sizeof(uint32_t);
633
634 return l;
635 }
636
637 bool stream_get_ipaddr(struct stream *s, struct ipaddr *ip)
638 {
639 uint16_t ipa_len;
640
641 STREAM_VERIFY_SANE(s);
642
643 /* Get address type. */
644 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
645 STREAM_BOUND_WARN2(s, "get ipaddr");
646 return false;
647 }
648 ip->ipa_type = stream_getw(s);
649
650 /* Get address value. */
651 switch (ip->ipa_type) {
652 case IPADDR_V4:
653 ipa_len = IPV4_MAX_BYTELEN;
654 break;
655 case IPADDR_V6:
656 ipa_len = IPV6_MAX_BYTELEN;
657 break;
658 default:
659 flog_err(EC_LIB_DEVELOPMENT,
660 "%s: unknown ip address-family: %u", __func__,
661 ip->ipa_type);
662 return false;
663 }
664 if (STREAM_READABLE(s) < ipa_len) {
665 STREAM_BOUND_WARN2(s, "get ipaddr");
666 return false;
667 }
668 memcpy(&ip->ip, s->data + s->getp, ipa_len);
669 s->getp += ipa_len;
670
671 return true;
672 }
673
674 float stream_getf(struct stream *s)
675 {
676 union {
677 float r;
678 uint32_t d;
679 } u;
680 u.d = stream_getl(s);
681 return u.r;
682 }
683
684 double stream_getd(struct stream *s)
685 {
686 union {
687 double r;
688 uint64_t d;
689 } u;
690 u.d = stream_getq(s);
691 return u.r;
692 }
693
694 /* Copy to source to stream.
695 *
696 * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
697 * around. This should be fixed once the stream updates are working.
698 *
699 * stream_write() is saner
700 */
701 void stream_put(struct stream *s, const void *src, size_t size)
702 {
703
704 /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
705 CHECK_SIZE(s, size);
706
707 STREAM_VERIFY_SANE(s);
708
709 if (STREAM_WRITEABLE(s) < size) {
710 STREAM_BOUND_WARN(s, "put");
711 return;
712 }
713
714 if (src)
715 memcpy(s->data + s->endp, src, size);
716 else
717 memset(s->data + s->endp, 0, size);
718
719 s->endp += size;
720 }
721
722 /* Put character to the stream. */
723 int stream_putc(struct stream *s, uint8_t c)
724 {
725 STREAM_VERIFY_SANE(s);
726
727 if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) {
728 STREAM_BOUND_WARN(s, "put");
729 return 0;
730 }
731
732 s->data[s->endp++] = c;
733 return sizeof(uint8_t);
734 }
735
736 /* Put word to the stream. */
737 int stream_putw(struct stream *s, uint16_t w)
738 {
739 STREAM_VERIFY_SANE(s);
740
741 if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) {
742 STREAM_BOUND_WARN(s, "put");
743 return 0;
744 }
745
746 s->data[s->endp++] = (uint8_t)(w >> 8);
747 s->data[s->endp++] = (uint8_t)w;
748
749 return 2;
750 }
751
752 /* Put long word to the stream. */
753 int stream_put3(struct stream *s, uint32_t l)
754 {
755 STREAM_VERIFY_SANE(s);
756
757 if (STREAM_WRITEABLE(s) < 3) {
758 STREAM_BOUND_WARN(s, "put");
759 return 0;
760 }
761
762 s->data[s->endp++] = (uint8_t)(l >> 16);
763 s->data[s->endp++] = (uint8_t)(l >> 8);
764 s->data[s->endp++] = (uint8_t)l;
765
766 return 3;
767 }
768
769 /* Put long word to the stream. */
770 int stream_putl(struct stream *s, uint32_t l)
771 {
772 STREAM_VERIFY_SANE(s);
773
774 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
775 STREAM_BOUND_WARN(s, "put");
776 return 0;
777 }
778
779 s->data[s->endp++] = (uint8_t)(l >> 24);
780 s->data[s->endp++] = (uint8_t)(l >> 16);
781 s->data[s->endp++] = (uint8_t)(l >> 8);
782 s->data[s->endp++] = (uint8_t)l;
783
784 return 4;
785 }
786
787 /* Put quad word to the stream. */
788 int stream_putq(struct stream *s, uint64_t q)
789 {
790 STREAM_VERIFY_SANE(s);
791
792 if (STREAM_WRITEABLE(s) < sizeof(uint64_t)) {
793 STREAM_BOUND_WARN(s, "put quad");
794 return 0;
795 }
796
797 s->data[s->endp++] = (uint8_t)(q >> 56);
798 s->data[s->endp++] = (uint8_t)(q >> 48);
799 s->data[s->endp++] = (uint8_t)(q >> 40);
800 s->data[s->endp++] = (uint8_t)(q >> 32);
801 s->data[s->endp++] = (uint8_t)(q >> 24);
802 s->data[s->endp++] = (uint8_t)(q >> 16);
803 s->data[s->endp++] = (uint8_t)(q >> 8);
804 s->data[s->endp++] = (uint8_t)q;
805
806 return 8;
807 }
808
809 int stream_putf(struct stream *s, float f)
810 {
811 union {
812 float i;
813 uint32_t o;
814 } u;
815 u.i = f;
816 return stream_putl(s, u.o);
817 }
818
819 int stream_putd(struct stream *s, double d)
820 {
821 union {
822 double i;
823 uint64_t o;
824 } u;
825 u.i = d;
826 return stream_putq(s, u.o);
827 }
828
829 int stream_putc_at(struct stream *s, size_t putp, uint8_t c)
830 {
831 STREAM_VERIFY_SANE(s);
832
833 if (!PUT_AT_VALID(s, putp + sizeof(uint8_t))) {
834 STREAM_BOUND_WARN(s, "put");
835 return 0;
836 }
837
838 s->data[putp] = c;
839
840 return 1;
841 }
842
843 int stream_putw_at(struct stream *s, size_t putp, uint16_t w)
844 {
845 STREAM_VERIFY_SANE(s);
846
847 if (!PUT_AT_VALID(s, putp + sizeof(uint16_t))) {
848 STREAM_BOUND_WARN(s, "put");
849 return 0;
850 }
851
852 s->data[putp] = (uint8_t)(w >> 8);
853 s->data[putp + 1] = (uint8_t)w;
854
855 return 2;
856 }
857
858 int stream_put3_at(struct stream *s, size_t putp, uint32_t l)
859 {
860 STREAM_VERIFY_SANE(s);
861
862 if (!PUT_AT_VALID(s, putp + 3)) {
863 STREAM_BOUND_WARN(s, "put");
864 return 0;
865 }
866 s->data[putp] = (uint8_t)(l >> 16);
867 s->data[putp + 1] = (uint8_t)(l >> 8);
868 s->data[putp + 2] = (uint8_t)l;
869
870 return 3;
871 }
872
873 int stream_putl_at(struct stream *s, size_t putp, uint32_t l)
874 {
875 STREAM_VERIFY_SANE(s);
876
877 if (!PUT_AT_VALID(s, putp + sizeof(uint32_t))) {
878 STREAM_BOUND_WARN(s, "put");
879 return 0;
880 }
881 s->data[putp] = (uint8_t)(l >> 24);
882 s->data[putp + 1] = (uint8_t)(l >> 16);
883 s->data[putp + 2] = (uint8_t)(l >> 8);
884 s->data[putp + 3] = (uint8_t)l;
885
886 return 4;
887 }
888
889 int stream_putq_at(struct stream *s, size_t putp, uint64_t q)
890 {
891 STREAM_VERIFY_SANE(s);
892
893 if (!PUT_AT_VALID(s, putp + sizeof(uint64_t))) {
894 STREAM_BOUND_WARN(s, "put");
895 return 0;
896 }
897 s->data[putp] = (uint8_t)(q >> 56);
898 s->data[putp + 1] = (uint8_t)(q >> 48);
899 s->data[putp + 2] = (uint8_t)(q >> 40);
900 s->data[putp + 3] = (uint8_t)(q >> 32);
901 s->data[putp + 4] = (uint8_t)(q >> 24);
902 s->data[putp + 5] = (uint8_t)(q >> 16);
903 s->data[putp + 6] = (uint8_t)(q >> 8);
904 s->data[putp + 7] = (uint8_t)q;
905
906 return 8;
907 }
908
909 /* Put long word to the stream. */
910 int stream_put_ipv4(struct stream *s, uint32_t l)
911 {
912 STREAM_VERIFY_SANE(s);
913
914 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
915 STREAM_BOUND_WARN(s, "put");
916 return 0;
917 }
918 memcpy(s->data + s->endp, &l, sizeof(uint32_t));
919 s->endp += sizeof(uint32_t);
920
921 return sizeof(uint32_t);
922 }
923
924 /* Put long word to the stream. */
925 int stream_put_in_addr(struct stream *s, const struct in_addr *addr)
926 {
927 STREAM_VERIFY_SANE(s);
928
929 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
930 STREAM_BOUND_WARN(s, "put");
931 return 0;
932 }
933
934 memcpy(s->data + s->endp, addr, sizeof(uint32_t));
935 s->endp += sizeof(uint32_t);
936
937 return sizeof(uint32_t);
938 }
939
940 bool stream_put_ipaddr(struct stream *s, struct ipaddr *ip)
941 {
942 stream_putw(s, ip->ipa_type);
943
944 switch (ip->ipa_type) {
945 case IPADDR_V4:
946 stream_put_in_addr(s, &ip->ipaddr_v4);
947 break;
948 case IPADDR_V6:
949 stream_write(s, (uint8_t *)&ip->ipaddr_v6, 16);
950 break;
951 default:
952 flog_err(EC_LIB_DEVELOPMENT,
953 "%s: unknown ip address-family: %u", __func__,
954 ip->ipa_type);
955 return false;
956 }
957
958 return true;
959 }
960
961 /* Put in_addr at location in the stream. */
962 int stream_put_in_addr_at(struct stream *s, size_t putp,
963 const struct in_addr *addr)
964 {
965 STREAM_VERIFY_SANE(s);
966
967 if (!PUT_AT_VALID(s, putp + 4)) {
968 STREAM_BOUND_WARN(s, "put");
969 return 0;
970 }
971
972 memcpy(&s->data[putp], addr, 4);
973 return 4;
974 }
975
976 /* Put in6_addr at location in the stream. */
977 int stream_put_in6_addr_at(struct stream *s, size_t putp,
978 const struct in6_addr *addr)
979 {
980 STREAM_VERIFY_SANE(s);
981
982 if (!PUT_AT_VALID(s, putp + 16)) {
983 STREAM_BOUND_WARN(s, "put");
984 return 0;
985 }
986
987 memcpy(&s->data[putp], addr, 16);
988 return 16;
989 }
990
991 /* Put prefix by nlri type format. */
992 int stream_put_prefix_addpath(struct stream *s, const struct prefix *p,
993 int addpath_encode, uint32_t addpath_tx_id)
994 {
995 size_t psize;
996 size_t psize_with_addpath;
997
998 STREAM_VERIFY_SANE(s);
999
1000 psize = PSIZE(p->prefixlen);
1001
1002 if (addpath_encode)
1003 psize_with_addpath = psize + 4;
1004 else
1005 psize_with_addpath = psize;
1006
1007 if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) {
1008 STREAM_BOUND_WARN(s, "put");
1009 return 0;
1010 }
1011
1012 if (addpath_encode) {
1013 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
1014 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
1015 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
1016 s->data[s->endp++] = (uint8_t)addpath_tx_id;
1017 }
1018
1019 s->data[s->endp++] = p->prefixlen;
1020 memcpy(s->data + s->endp, &p->u.prefix, psize);
1021 s->endp += psize;
1022
1023 return psize;
1024 }
1025
1026 int stream_put_prefix(struct stream *s, const struct prefix *p)
1027 {
1028 return stream_put_prefix_addpath(s, p, 0, 0);
1029 }
1030
1031 /* Put NLRI with label */
1032 int stream_put_labeled_prefix(struct stream *s, const struct prefix *p,
1033 mpls_label_t *label, int addpath_encode,
1034 uint32_t addpath_tx_id)
1035 {
1036 size_t psize;
1037 size_t psize_with_addpath;
1038 uint8_t *label_pnt = (uint8_t *)label;
1039
1040 STREAM_VERIFY_SANE(s);
1041
1042 psize = PSIZE(p->prefixlen);
1043 psize_with_addpath = psize + (addpath_encode ? 4 : 0);
1044
1045 if (STREAM_WRITEABLE(s) < (psize_with_addpath + 3)) {
1046 STREAM_BOUND_WARN(s, "put");
1047 return 0;
1048 }
1049
1050 if (addpath_encode) {
1051 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
1052 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
1053 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
1054 s->data[s->endp++] = (uint8_t)addpath_tx_id;
1055 }
1056
1057 stream_putc(s, (p->prefixlen + 24));
1058 stream_putc(s, label_pnt[0]);
1059 stream_putc(s, label_pnt[1]);
1060 stream_putc(s, label_pnt[2]);
1061 memcpy(s->data + s->endp, &p->u.prefix, psize);
1062 s->endp += psize;
1063
1064 return (psize + 3);
1065 }
1066
1067 /* Read size from fd. */
1068 int stream_read(struct stream *s, int fd, size_t size)
1069 {
1070 int nbytes;
1071
1072 STREAM_VERIFY_SANE(s);
1073
1074 if (STREAM_WRITEABLE(s) < size) {
1075 STREAM_BOUND_WARN(s, "put");
1076 return 0;
1077 }
1078
1079 nbytes = readn(fd, s->data + s->endp, size);
1080
1081 if (nbytes > 0)
1082 s->endp += nbytes;
1083
1084 return nbytes;
1085 }
1086
1087 ssize_t stream_read_try(struct stream *s, int fd, size_t size)
1088 {
1089 ssize_t nbytes;
1090
1091 STREAM_VERIFY_SANE(s);
1092
1093 if (STREAM_WRITEABLE(s) < size) {
1094 STREAM_BOUND_WARN(s, "put");
1095 /* Fatal (not transient) error, since retrying will not help
1096 (stream is too small to contain the desired data). */
1097 return -1;
1098 }
1099
1100 if ((nbytes = read(fd, s->data + s->endp, size)) >= 0) {
1101 s->endp += nbytes;
1102 return nbytes;
1103 }
1104 /* Error: was it transient (return -2) or fatal (return -1)? */
1105 if (ERRNO_IO_RETRY(errno))
1106 return -2;
1107 flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
1108 safe_strerror(errno));
1109 return -1;
1110 }
1111
1112 /* Read up to size bytes into the stream from the fd, using recvmsgfrom
1113 * whose arguments match the remaining arguments to this function
1114 */
1115 ssize_t stream_recvfrom(struct stream *s, int fd, size_t size, int flags,
1116 struct sockaddr *from, socklen_t *fromlen)
1117 {
1118 ssize_t nbytes;
1119
1120 STREAM_VERIFY_SANE(s);
1121
1122 if (STREAM_WRITEABLE(s) < size) {
1123 STREAM_BOUND_WARN(s, "put");
1124 /* Fatal (not transient) error, since retrying will not help
1125 (stream is too small to contain the desired data). */
1126 return -1;
1127 }
1128
1129 if ((nbytes = recvfrom(fd, s->data + s->endp, size, flags, from,
1130 fromlen))
1131 >= 0) {
1132 s->endp += nbytes;
1133 return nbytes;
1134 }
1135 /* Error: was it transient (return -2) or fatal (return -1)? */
1136 if (ERRNO_IO_RETRY(errno))
1137 return -2;
1138 flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
1139 safe_strerror(errno));
1140 return -1;
1141 }
1142
1143 /* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
1144 * from endp.
1145 * First iovec will be used to receive the data.
1146 * Stream need not be empty.
1147 */
1148 ssize_t stream_recvmsg(struct stream *s, int fd, struct msghdr *msgh, int flags,
1149 size_t size)
1150 {
1151 int nbytes;
1152 struct iovec *iov;
1153
1154 STREAM_VERIFY_SANE(s);
1155 assert(msgh->msg_iovlen > 0);
1156
1157 if (STREAM_WRITEABLE(s) < size) {
1158 STREAM_BOUND_WARN(s, "put");
1159 /* This is a logic error in the calling code: the stream is too
1160 small
1161 to hold the desired data! */
1162 return -1;
1163 }
1164
1165 iov = &(msgh->msg_iov[0]);
1166 iov->iov_base = (s->data + s->endp);
1167 iov->iov_len = size;
1168
1169 nbytes = recvmsg(fd, msgh, flags);
1170
1171 if (nbytes > 0)
1172 s->endp += nbytes;
1173
1174 return nbytes;
1175 }
1176
1177 /* Write data to buffer. */
1178 size_t stream_write(struct stream *s, const void *ptr, size_t size)
1179 {
1180
1181 CHECK_SIZE(s, size);
1182
1183 STREAM_VERIFY_SANE(s);
1184
1185 if (STREAM_WRITEABLE(s) < size) {
1186 STREAM_BOUND_WARN(s, "put");
1187 return 0;
1188 }
1189
1190 memcpy(s->data + s->endp, ptr, size);
1191 s->endp += size;
1192
1193 return size;
1194 }
1195
1196 /* Return current read pointer.
1197 * DEPRECATED!
1198 * Use stream_get_pnt_to if you must, but decoding streams properly
1199 * is preferred
1200 */
1201 uint8_t *stream_pnt(struct stream *s)
1202 {
1203 STREAM_VERIFY_SANE(s);
1204 return s->data + s->getp;
1205 }
1206
1207 /* Check does this stream empty? */
1208 int stream_empty(struct stream *s)
1209 {
1210 STREAM_VERIFY_SANE(s);
1211
1212 return (s->endp == 0);
1213 }
1214
1215 /* Reset stream. */
1216 void stream_reset(struct stream *s)
1217 {
1218 STREAM_VERIFY_SANE(s);
1219
1220 s->getp = s->endp = 0;
1221 }
1222
1223 /* Write stream contens to the file discriptor. */
1224 int stream_flush(struct stream *s, int fd)
1225 {
1226 int nbytes;
1227
1228 STREAM_VERIFY_SANE(s);
1229
1230 nbytes = write(fd, s->data + s->getp, s->endp - s->getp);
1231
1232 return nbytes;
1233 }
1234
1235 void stream_hexdump(const struct stream *s)
1236 {
1237 zlog_hexdump(s->data, s->endp);
1238 }
1239
1240 /* Stream first in first out queue. */
1241
1242 struct stream_fifo *stream_fifo_new(void)
1243 {
1244 struct stream_fifo *new;
1245
1246 new = XMALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo));
1247 stream_fifo_init(new);
1248 return new;
1249 }
1250
1251 void stream_fifo_init(struct stream_fifo *fifo)
1252 {
1253 memset(fifo, 0, sizeof(struct stream_fifo));
1254 pthread_mutex_init(&fifo->mtx, NULL);
1255 }
1256
1257 /* Add new stream to fifo. */
1258 void stream_fifo_push(struct stream_fifo *fifo, struct stream *s)
1259 {
1260 #if defined DEV_BUILD
1261 size_t max, curmax;
1262 #endif
1263
1264 if (fifo->tail)
1265 fifo->tail->next = s;
1266 else
1267 fifo->head = s;
1268
1269 fifo->tail = s;
1270 fifo->tail->next = NULL;
1271 #if !defined DEV_BUILD
1272 atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1273 #else
1274 max = atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1275 curmax = atomic_load_explicit(&fifo->max_count, memory_order_relaxed);
1276 if (max > curmax)
1277 atomic_store_explicit(&fifo->max_count, max,
1278 memory_order_relaxed);
1279 #endif
1280 }
1281
1282 void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s)
1283 {
1284 frr_with_mutex(&fifo->mtx) {
1285 stream_fifo_push(fifo, s);
1286 }
1287 }
1288
1289 /* Delete first stream from fifo. */
1290 struct stream *stream_fifo_pop(struct stream_fifo *fifo)
1291 {
1292 struct stream *s;
1293
1294 s = fifo->head;
1295
1296 if (s) {
1297 fifo->head = s->next;
1298
1299 if (fifo->head == NULL)
1300 fifo->tail = NULL;
1301
1302 atomic_fetch_sub_explicit(&fifo->count, 1,
1303 memory_order_release);
1304
1305 /* ensure stream is scrubbed of references to this fifo */
1306 s->next = NULL;
1307 }
1308
1309 return s;
1310 }
1311
1312 struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo)
1313 {
1314 struct stream *ret;
1315
1316 frr_with_mutex(&fifo->mtx) {
1317 ret = stream_fifo_pop(fifo);
1318 }
1319
1320 return ret;
1321 }
1322
1323 struct stream *stream_fifo_head(struct stream_fifo *fifo)
1324 {
1325 return fifo->head;
1326 }
1327
1328 struct stream *stream_fifo_head_safe(struct stream_fifo *fifo)
1329 {
1330 struct stream *ret;
1331
1332 frr_with_mutex(&fifo->mtx) {
1333 ret = stream_fifo_head(fifo);
1334 }
1335
1336 return ret;
1337 }
1338
1339 void stream_fifo_clean(struct stream_fifo *fifo)
1340 {
1341 struct stream *s;
1342 struct stream *next;
1343
1344 for (s = fifo->head; s; s = next) {
1345 next = s->next;
1346 stream_free(s);
1347 }
1348 fifo->head = fifo->tail = NULL;
1349 atomic_store_explicit(&fifo->count, 0, memory_order_release);
1350 }
1351
1352 void stream_fifo_clean_safe(struct stream_fifo *fifo)
1353 {
1354 frr_with_mutex(&fifo->mtx) {
1355 stream_fifo_clean(fifo);
1356 }
1357 }
1358
1359 size_t stream_fifo_count_safe(struct stream_fifo *fifo)
1360 {
1361 return atomic_load_explicit(&fifo->count, memory_order_acquire);
1362 }
1363
1364 void stream_fifo_deinit(struct stream_fifo *fifo)
1365 {
1366 stream_fifo_clean(fifo);
1367 pthread_mutex_destroy(&fifo->mtx);
1368 }
1369
1370 void stream_fifo_free(struct stream_fifo *fifo)
1371 {
1372 stream_fifo_deinit(fifo);
1373 XFREE(MTYPE_STREAM_FIFO, fifo);
1374 }
1375
1376 void stream_pulldown(struct stream *s)
1377 {
1378 size_t rlen = STREAM_READABLE(s);
1379
1380 /* No more data, so just move the pointers. */
1381 if (rlen == 0) {
1382 stream_reset(s);
1383 return;
1384 }
1385
1386 /* Move the available data to the beginning. */
1387 memmove(s->data, &s->data[s->getp], rlen);
1388 s->getp = 0;
1389 s->endp = rlen;
1390 }