]> git.proxmox.com Git - mirror_frr.git/blob - lib/stream.c
Merge pull request #13649 from donaldsharp/unlock_the_node_or_else
[mirror_frr.git] / lib / stream.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3 * Packet interface
4 * Copyright (C) 1999 Kunihiro Ishiguro
5 */
6
7 #include <zebra.h>
8 #include <stddef.h>
9 #include <pthread.h>
10
11 #include "stream.h"
12 #include "memory.h"
13 #include "network.h"
14 #include "prefix.h"
15 #include "log.h"
16 #include "frr_pthread.h"
17 #include "lib_errors.h"
18
19 DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream");
20 DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO");
21
22 /* Tests whether a position is valid */
23 #define GETP_VALID(S, G) ((G) <= (S)->endp)
24 #define PUT_AT_VALID(S,G) GETP_VALID(S,G)
25 #define ENDP_VALID(S, E) ((E) <= (S)->size)
26
27 /* asserting sanity checks. Following must be true before
28 * stream functions are called:
29 *
30 * Following must always be true of stream elements
31 * before and after calls to stream functions:
32 *
33 * getp <= endp <= size
34 *
35 * Note that after a stream function is called following may be true:
36 * if (getp == endp) then stream is no longer readable
37 * if (endp == size) then stream is no longer writeable
38 *
39 * It is valid to put to anywhere within the size of the stream, but only
40 * using stream_put..._at() functions.
41 */
42 #define STREAM_WARN_OFFSETS(S) \
43 do { \
44 flog_warn(EC_LIB_STREAM, \
45 "&(struct stream): %p, size: %lu, getp: %lu, endp: %lu", \
46 (void *)(S), (unsigned long)(S)->size, \
47 (unsigned long)(S)->getp, (unsigned long)(S)->endp); \
48 zlog_backtrace(LOG_WARNING); \
49 } while (0)
50
51 #define STREAM_VERIFY_SANE(S) \
52 do { \
53 if (!(GETP_VALID(S, (S)->getp) && ENDP_VALID(S, (S)->endp))) { \
54 STREAM_WARN_OFFSETS(S); \
55 } \
56 assert(GETP_VALID(S, (S)->getp)); \
57 assert(ENDP_VALID(S, (S)->endp)); \
58 } while (0)
59
60 #define STREAM_BOUND_WARN(S, WHAT) \
61 do { \
62 flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds", \
63 __func__, (WHAT)); \
64 STREAM_WARN_OFFSETS(S); \
65 assert(0); \
66 } while (0)
67
68 #define STREAM_BOUND_WARN2(S, WHAT) \
69 do { \
70 flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds", \
71 __func__, (WHAT)); \
72 STREAM_WARN_OFFSETS(S); \
73 } while (0)
74
75 /* XXX: Deprecated macro: do not use */
76 #define CHECK_SIZE(S, Z) \
77 do { \
78 if (((S)->endp + (Z)) > (S)->size) { \
79 flog_warn( \
80 EC_LIB_STREAM, \
81 "CHECK_SIZE: truncating requested size %lu", \
82 (unsigned long)(Z)); \
83 STREAM_WARN_OFFSETS(S); \
84 (Z) = (S)->size - (S)->endp; \
85 } \
86 } while (0);
87
88 /* Make stream buffer. */
89 struct stream *stream_new(size_t size)
90 {
91 struct stream *s;
92
93 assert(size > 0);
94
95 s = XMALLOC(MTYPE_STREAM, sizeof(struct stream) + size);
96
97 s->getp = s->endp = 0;
98 s->next = NULL;
99 s->size = size;
100 return s;
101 }
102
103 /* Free it now. */
104 void stream_free(struct stream *s)
105 {
106 if (!s)
107 return;
108
109 XFREE(MTYPE_STREAM, s);
110 }
111
112 struct stream *stream_copy(struct stream *dest, const struct stream *src)
113 {
114 STREAM_VERIFY_SANE(src);
115
116 assert(dest != NULL);
117 assert(STREAM_SIZE(dest) >= src->endp);
118
119 dest->endp = src->endp;
120 dest->getp = src->getp;
121
122 memcpy(dest->data, src->data, src->endp);
123
124 return dest;
125 }
126
127 struct stream *stream_dup(const struct stream *s)
128 {
129 struct stream *snew;
130
131 STREAM_VERIFY_SANE(s);
132
133 snew = stream_new(s->endp);
134
135 return (stream_copy(snew, s));
136 }
137
138 struct stream *stream_dupcat(const struct stream *s1, const struct stream *s2,
139 size_t offset)
140 {
141 struct stream *new;
142
143 STREAM_VERIFY_SANE(s1);
144 STREAM_VERIFY_SANE(s2);
145
146 if ((new = stream_new(s1->endp + s2->endp)) == NULL)
147 return NULL;
148
149 memcpy(new->data, s1->data, offset);
150 memcpy(new->data + offset, s2->data, s2->endp);
151 memcpy(new->data + offset + s2->endp, s1->data + offset,
152 (s1->endp - offset));
153 new->endp = s1->endp + s2->endp;
154 return new;
155 }
156
157 size_t stream_resize_inplace(struct stream **sptr, size_t newsize)
158 {
159 struct stream *orig = *sptr;
160
161 STREAM_VERIFY_SANE(orig);
162
163 orig = XREALLOC(MTYPE_STREAM, orig, sizeof(struct stream) + newsize);
164
165 orig->size = newsize;
166
167 if (orig->endp > orig->size)
168 orig->endp = orig->size;
169 if (orig->getp > orig->endp)
170 orig->getp = orig->endp;
171
172 STREAM_VERIFY_SANE(orig);
173
174 *sptr = orig;
175 return orig->size;
176 }
177
178 size_t stream_get_getp(const struct stream *s)
179 {
180 STREAM_VERIFY_SANE(s);
181 return s->getp;
182 }
183
184 size_t stream_get_endp(const struct stream *s)
185 {
186 STREAM_VERIFY_SANE(s);
187 return s->endp;
188 }
189
190 size_t stream_get_size(const struct stream *s)
191 {
192 STREAM_VERIFY_SANE(s);
193 return s->size;
194 }
195
196 /* Stream structre' stream pointer related functions. */
197 void stream_set_getp(struct stream *s, size_t pos)
198 {
199 STREAM_VERIFY_SANE(s);
200
201 if (!GETP_VALID(s, pos)) {
202 STREAM_BOUND_WARN(s, "set getp");
203 pos = s->endp;
204 }
205
206 s->getp = pos;
207 }
208
209 void stream_set_endp(struct stream *s, size_t pos)
210 {
211 STREAM_VERIFY_SANE(s);
212
213 if (!ENDP_VALID(s, pos)) {
214 STREAM_BOUND_WARN(s, "set endp");
215 return;
216 }
217
218 /*
219 * Make sure the current read pointer is not beyond the new endp.
220 */
221 if (s->getp > pos) {
222 STREAM_BOUND_WARN(s, "set endp");
223 return;
224 }
225
226 s->endp = pos;
227 STREAM_VERIFY_SANE(s);
228 }
229
230 /* Forward pointer. */
231 void stream_forward_getp(struct stream *s, size_t size)
232 {
233 STREAM_VERIFY_SANE(s);
234
235 if (!GETP_VALID(s, s->getp + size)) {
236 STREAM_BOUND_WARN(s, "seek getp");
237 return;
238 }
239
240 s->getp += size;
241 }
242
243 bool stream_forward_getp2(struct stream *s, size_t size)
244 {
245 STREAM_VERIFY_SANE(s);
246
247 if (!GETP_VALID(s, s->getp + size))
248 return false;
249
250 s->getp += size;
251
252 return true;
253 }
254
255 void stream_rewind_getp(struct stream *s, size_t size)
256 {
257 STREAM_VERIFY_SANE(s);
258
259 if (size > s->getp || !GETP_VALID(s, s->getp - size)) {
260 STREAM_BOUND_WARN(s, "rewind getp");
261 return;
262 }
263
264 s->getp -= size;
265 }
266
267 bool stream_rewind_getp2(struct stream *s, size_t size)
268 {
269 STREAM_VERIFY_SANE(s);
270
271 if (size > s->getp || !GETP_VALID(s, s->getp - size))
272 return false;
273
274 s->getp -= size;
275
276 return true;
277 }
278
279 void stream_forward_endp(struct stream *s, size_t size)
280 {
281 STREAM_VERIFY_SANE(s);
282
283 if (!ENDP_VALID(s, s->endp + size)) {
284 STREAM_BOUND_WARN(s, "seek endp");
285 return;
286 }
287
288 s->endp += size;
289 }
290
291 bool stream_forward_endp2(struct stream *s, size_t size)
292 {
293 STREAM_VERIFY_SANE(s);
294
295 if (!ENDP_VALID(s, s->endp + size))
296 return false;
297
298 s->endp += size;
299
300 return true;
301 }
302
303 /* Copy from stream to destination. */
304 bool stream_get2(void *dst, struct stream *s, size_t size)
305 {
306 STREAM_VERIFY_SANE(s);
307
308 if (STREAM_READABLE(s) < size) {
309 STREAM_BOUND_WARN2(s, "get");
310 return false;
311 }
312
313 memcpy(dst, s->data + s->getp, size);
314 s->getp += size;
315
316 return true;
317 }
318
319 void stream_get(void *dst, struct stream *s, size_t size)
320 {
321 STREAM_VERIFY_SANE(s);
322
323 if (STREAM_READABLE(s) < size) {
324 STREAM_BOUND_WARN(s, "get");
325 return;
326 }
327
328 memcpy(dst, s->data + s->getp, size);
329 s->getp += size;
330 }
331
332 /* Get next character from the stream. */
333 bool stream_getc2(struct stream *s, uint8_t *byte)
334 {
335 STREAM_VERIFY_SANE(s);
336
337 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
338 STREAM_BOUND_WARN2(s, "get char");
339 return false;
340 }
341 *byte = s->data[s->getp++];
342
343 return true;
344 }
345
346 uint8_t stream_getc(struct stream *s)
347 {
348 uint8_t c;
349
350 STREAM_VERIFY_SANE(s);
351
352 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
353 STREAM_BOUND_WARN(s, "get char");
354 return 0;
355 }
356 c = s->data[s->getp++];
357
358 return c;
359 }
360
361 /* Get next character from the stream. */
362 uint8_t stream_getc_from(struct stream *s, size_t from)
363 {
364 uint8_t c;
365
366 STREAM_VERIFY_SANE(s);
367
368 if (!GETP_VALID(s, from + sizeof(uint8_t))) {
369 STREAM_BOUND_WARN(s, "get char");
370 return 0;
371 }
372
373 c = s->data[from];
374
375 return c;
376 }
377
378 bool stream_getw2(struct stream *s, uint16_t *word)
379 {
380 STREAM_VERIFY_SANE(s);
381
382 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
383 STREAM_BOUND_WARN2(s, "get ");
384 return false;
385 }
386
387 *word = s->data[s->getp++] << 8;
388 *word |= s->data[s->getp++];
389
390 return true;
391 }
392
393 /* Get next word from the stream. */
394 uint16_t stream_getw(struct stream *s)
395 {
396 uint16_t w;
397
398 STREAM_VERIFY_SANE(s);
399
400 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
401 STREAM_BOUND_WARN(s, "get ");
402 return 0;
403 }
404
405 w = s->data[s->getp++] << 8;
406 w |= s->data[s->getp++];
407
408 return w;
409 }
410
411 /* Get next word from the stream. */
412 uint16_t stream_getw_from(struct stream *s, size_t from)
413 {
414 uint16_t w;
415
416 STREAM_VERIFY_SANE(s);
417
418 if (!GETP_VALID(s, from + sizeof(uint16_t))) {
419 STREAM_BOUND_WARN(s, "get ");
420 return 0;
421 }
422
423 w = s->data[from++] << 8;
424 w |= s->data[from];
425
426 return w;
427 }
428
429 /* Get next 3-byte from the stream. */
430 uint32_t stream_get3_from(struct stream *s, size_t from)
431 {
432 uint32_t l;
433
434 STREAM_VERIFY_SANE(s);
435
436 if (!GETP_VALID(s, from + 3)) {
437 STREAM_BOUND_WARN(s, "get 3byte");
438 return 0;
439 }
440
441 l = s->data[from++] << 16;
442 l |= s->data[from++] << 8;
443 l |= s->data[from];
444
445 return l;
446 }
447
448 uint32_t stream_get3(struct stream *s)
449 {
450 uint32_t l;
451
452 STREAM_VERIFY_SANE(s);
453
454 if (STREAM_READABLE(s) < 3) {
455 STREAM_BOUND_WARN(s, "get 3byte");
456 return 0;
457 }
458
459 l = s->data[s->getp++] << 16;
460 l |= s->data[s->getp++] << 8;
461 l |= s->data[s->getp++];
462
463 return l;
464 }
465
466 /* Get next long word from the stream. */
467 uint32_t stream_getl_from(struct stream *s, size_t from)
468 {
469 uint32_t l;
470
471 STREAM_VERIFY_SANE(s);
472
473 if (!GETP_VALID(s, from + sizeof(uint32_t))) {
474 STREAM_BOUND_WARN(s, "get long");
475 return 0;
476 }
477
478 l = (unsigned)(s->data[from++]) << 24;
479 l |= s->data[from++] << 16;
480 l |= s->data[from++] << 8;
481 l |= s->data[from];
482
483 return l;
484 }
485
486 /* Copy from stream at specific location to destination. */
487 void stream_get_from(void *dst, struct stream *s, size_t from, size_t size)
488 {
489 STREAM_VERIFY_SANE(s);
490
491 if (!GETP_VALID(s, from + size)) {
492 STREAM_BOUND_WARN(s, "get from");
493 return;
494 }
495
496 memcpy(dst, s->data + from, size);
497 }
498
499 bool stream_getl2(struct stream *s, uint32_t *l)
500 {
501 STREAM_VERIFY_SANE(s);
502
503 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
504 STREAM_BOUND_WARN2(s, "get long");
505 return false;
506 }
507
508 *l = (unsigned int)(s->data[s->getp++]) << 24;
509 *l |= s->data[s->getp++] << 16;
510 *l |= s->data[s->getp++] << 8;
511 *l |= s->data[s->getp++];
512
513 return true;
514 }
515
516 uint32_t stream_getl(struct stream *s)
517 {
518 uint32_t l;
519
520 STREAM_VERIFY_SANE(s);
521
522 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
523 STREAM_BOUND_WARN(s, "get long");
524 return 0;
525 }
526
527 l = (unsigned)(s->data[s->getp++]) << 24;
528 l |= s->data[s->getp++] << 16;
529 l |= s->data[s->getp++] << 8;
530 l |= s->data[s->getp++];
531
532 return l;
533 }
534
535 /* Get next quad word from the stream. */
536 uint64_t stream_getq_from(struct stream *s, size_t from)
537 {
538 uint64_t q;
539
540 STREAM_VERIFY_SANE(s);
541
542 if (!GETP_VALID(s, from + sizeof(uint64_t))) {
543 STREAM_BOUND_WARN(s, "get quad");
544 return 0;
545 }
546
547 q = ((uint64_t)s->data[from++]) << 56;
548 q |= ((uint64_t)s->data[from++]) << 48;
549 q |= ((uint64_t)s->data[from++]) << 40;
550 q |= ((uint64_t)s->data[from++]) << 32;
551 q |= ((uint64_t)s->data[from++]) << 24;
552 q |= ((uint64_t)s->data[from++]) << 16;
553 q |= ((uint64_t)s->data[from++]) << 8;
554 q |= ((uint64_t)s->data[from++]);
555
556 return q;
557 }
558
559 uint64_t stream_getq(struct stream *s)
560 {
561 uint64_t q;
562
563 STREAM_VERIFY_SANE(s);
564
565 if (STREAM_READABLE(s) < sizeof(uint64_t)) {
566 STREAM_BOUND_WARN(s, "get quad");
567 return 0;
568 }
569
570 q = ((uint64_t)s->data[s->getp++]) << 56;
571 q |= ((uint64_t)s->data[s->getp++]) << 48;
572 q |= ((uint64_t)s->data[s->getp++]) << 40;
573 q |= ((uint64_t)s->data[s->getp++]) << 32;
574 q |= ((uint64_t)s->data[s->getp++]) << 24;
575 q |= ((uint64_t)s->data[s->getp++]) << 16;
576 q |= ((uint64_t)s->data[s->getp++]) << 8;
577 q |= ((uint64_t)s->data[s->getp++]);
578
579 return q;
580 }
581
582 bool stream_getq2(struct stream *s, uint64_t *q)
583 {
584 STREAM_VERIFY_SANE(s);
585
586 if (STREAM_READABLE(s) < sizeof(uint64_t)) {
587 STREAM_BOUND_WARN2(s, "get uint64");
588 return false;
589 }
590
591 *q = ((uint64_t)s->data[s->getp++]) << 56;
592 *q |= ((uint64_t)s->data[s->getp++]) << 48;
593 *q |= ((uint64_t)s->data[s->getp++]) << 40;
594 *q |= ((uint64_t)s->data[s->getp++]) << 32;
595 *q |= ((uint64_t)s->data[s->getp++]) << 24;
596 *q |= ((uint64_t)s->data[s->getp++]) << 16;
597 *q |= ((uint64_t)s->data[s->getp++]) << 8;
598 *q |= ((uint64_t)s->data[s->getp++]);
599
600 return true;
601 }
602
603 /* Get next long word from the stream. */
604 uint32_t stream_get_ipv4(struct stream *s)
605 {
606 uint32_t l;
607
608 STREAM_VERIFY_SANE(s);
609
610 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
611 STREAM_BOUND_WARN(s, "get ipv4");
612 return 0;
613 }
614
615 memcpy(&l, s->data + s->getp, sizeof(uint32_t));
616 s->getp += sizeof(uint32_t);
617
618 return l;
619 }
620
621 bool stream_get_ipaddr(struct stream *s, struct ipaddr *ip)
622 {
623 uint16_t ipa_len = 0;
624
625 STREAM_VERIFY_SANE(s);
626
627 /* Get address type. */
628 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
629 STREAM_BOUND_WARN2(s, "get ipaddr");
630 return false;
631 }
632 ip->ipa_type = stream_getw(s);
633
634 /* Get address value. */
635 switch (ip->ipa_type) {
636 case IPADDR_V4:
637 ipa_len = IPV4_MAX_BYTELEN;
638 break;
639 case IPADDR_V6:
640 ipa_len = IPV6_MAX_BYTELEN;
641 break;
642 case IPADDR_NONE:
643 flog_err(EC_LIB_DEVELOPMENT,
644 "%s: unknown ip address-family: %u", __func__,
645 ip->ipa_type);
646 return false;
647 }
648 if (STREAM_READABLE(s) < ipa_len) {
649 STREAM_BOUND_WARN2(s, "get ipaddr");
650 return false;
651 }
652 memcpy(&ip->ip, s->data + s->getp, ipa_len);
653 s->getp += ipa_len;
654
655 return true;
656 }
657
658 float stream_getf(struct stream *s)
659 {
660 union {
661 float r;
662 uint32_t d;
663 } u;
664 u.d = stream_getl(s);
665 return u.r;
666 }
667
668 double stream_getd(struct stream *s)
669 {
670 union {
671 double r;
672 uint64_t d;
673 } u;
674 u.d = stream_getq(s);
675 return u.r;
676 }
677
678 /* Copy from source to stream.
679 *
680 * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
681 * around. This should be fixed once the stream updates are working.
682 *
683 * stream_write() is saner
684 */
685 void stream_put(struct stream *s, const void *src, size_t size)
686 {
687
688 /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
689 CHECK_SIZE(s, size);
690
691 STREAM_VERIFY_SANE(s);
692
693 if (STREAM_WRITEABLE(s) < size) {
694 STREAM_BOUND_WARN(s, "put");
695 return;
696 }
697
698 if (src)
699 memcpy(s->data + s->endp, src, size);
700 else
701 memset(s->data + s->endp, 0, size);
702
703 s->endp += size;
704 }
705
706 /* Put character to the stream. */
707 int stream_putc(struct stream *s, uint8_t c)
708 {
709 STREAM_VERIFY_SANE(s);
710
711 if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) {
712 STREAM_BOUND_WARN(s, "put");
713 return 0;
714 }
715
716 s->data[s->endp++] = c;
717 return sizeof(uint8_t);
718 }
719
720 /* Put word to the stream. */
721 int stream_putw(struct stream *s, uint16_t w)
722 {
723 STREAM_VERIFY_SANE(s);
724
725 if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) {
726 STREAM_BOUND_WARN(s, "put");
727 return 0;
728 }
729
730 s->data[s->endp++] = (uint8_t)(w >> 8);
731 s->data[s->endp++] = (uint8_t)w;
732
733 return 2;
734 }
735
736 /* Put long word to the stream. */
737 int stream_put3(struct stream *s, uint32_t l)
738 {
739 STREAM_VERIFY_SANE(s);
740
741 if (STREAM_WRITEABLE(s) < 3) {
742 STREAM_BOUND_WARN(s, "put");
743 return 0;
744 }
745
746 s->data[s->endp++] = (uint8_t)(l >> 16);
747 s->data[s->endp++] = (uint8_t)(l >> 8);
748 s->data[s->endp++] = (uint8_t)l;
749
750 return 3;
751 }
752
753 /* Put long word to the stream. */
754 int stream_putl(struct stream *s, uint32_t l)
755 {
756 STREAM_VERIFY_SANE(s);
757
758 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
759 STREAM_BOUND_WARN(s, "put");
760 return 0;
761 }
762
763 s->data[s->endp++] = (uint8_t)(l >> 24);
764 s->data[s->endp++] = (uint8_t)(l >> 16);
765 s->data[s->endp++] = (uint8_t)(l >> 8);
766 s->data[s->endp++] = (uint8_t)l;
767
768 return 4;
769 }
770
771 /* Put quad word to the stream. */
772 int stream_putq(struct stream *s, uint64_t q)
773 {
774 STREAM_VERIFY_SANE(s);
775
776 if (STREAM_WRITEABLE(s) < sizeof(uint64_t)) {
777 STREAM_BOUND_WARN(s, "put quad");
778 return 0;
779 }
780
781 s->data[s->endp++] = (uint8_t)(q >> 56);
782 s->data[s->endp++] = (uint8_t)(q >> 48);
783 s->data[s->endp++] = (uint8_t)(q >> 40);
784 s->data[s->endp++] = (uint8_t)(q >> 32);
785 s->data[s->endp++] = (uint8_t)(q >> 24);
786 s->data[s->endp++] = (uint8_t)(q >> 16);
787 s->data[s->endp++] = (uint8_t)(q >> 8);
788 s->data[s->endp++] = (uint8_t)q;
789
790 return 8;
791 }
792
793 int stream_putf(struct stream *s, float f)
794 {
795 union {
796 float i;
797 uint32_t o;
798 } u;
799 u.i = f;
800 return stream_putl(s, u.o);
801 }
802
803 int stream_putd(struct stream *s, double d)
804 {
805 union {
806 double i;
807 uint64_t o;
808 } u;
809 u.i = d;
810 return stream_putq(s, u.o);
811 }
812
813 int stream_putc_at(struct stream *s, size_t putp, uint8_t c)
814 {
815 STREAM_VERIFY_SANE(s);
816
817 if (!PUT_AT_VALID(s, putp + sizeof(uint8_t))) {
818 STREAM_BOUND_WARN(s, "put");
819 return 0;
820 }
821
822 s->data[putp] = c;
823
824 return 1;
825 }
826
827 int stream_putw_at(struct stream *s, size_t putp, uint16_t w)
828 {
829 STREAM_VERIFY_SANE(s);
830
831 if (!PUT_AT_VALID(s, putp + sizeof(uint16_t))) {
832 STREAM_BOUND_WARN(s, "put");
833 return 0;
834 }
835
836 s->data[putp] = (uint8_t)(w >> 8);
837 s->data[putp + 1] = (uint8_t)w;
838
839 return 2;
840 }
841
842 int stream_put3_at(struct stream *s, size_t putp, uint32_t l)
843 {
844 STREAM_VERIFY_SANE(s);
845
846 if (!PUT_AT_VALID(s, putp + 3)) {
847 STREAM_BOUND_WARN(s, "put");
848 return 0;
849 }
850 s->data[putp] = (uint8_t)(l >> 16);
851 s->data[putp + 1] = (uint8_t)(l >> 8);
852 s->data[putp + 2] = (uint8_t)l;
853
854 return 3;
855 }
856
857 int stream_putl_at(struct stream *s, size_t putp, uint32_t l)
858 {
859 STREAM_VERIFY_SANE(s);
860
861 if (!PUT_AT_VALID(s, putp + sizeof(uint32_t))) {
862 STREAM_BOUND_WARN(s, "put");
863 return 0;
864 }
865 s->data[putp] = (uint8_t)(l >> 24);
866 s->data[putp + 1] = (uint8_t)(l >> 16);
867 s->data[putp + 2] = (uint8_t)(l >> 8);
868 s->data[putp + 3] = (uint8_t)l;
869
870 return 4;
871 }
872
873 int stream_putq_at(struct stream *s, size_t putp, uint64_t q)
874 {
875 STREAM_VERIFY_SANE(s);
876
877 if (!PUT_AT_VALID(s, putp + sizeof(uint64_t))) {
878 STREAM_BOUND_WARN(s, "put");
879 return 0;
880 }
881 s->data[putp] = (uint8_t)(q >> 56);
882 s->data[putp + 1] = (uint8_t)(q >> 48);
883 s->data[putp + 2] = (uint8_t)(q >> 40);
884 s->data[putp + 3] = (uint8_t)(q >> 32);
885 s->data[putp + 4] = (uint8_t)(q >> 24);
886 s->data[putp + 5] = (uint8_t)(q >> 16);
887 s->data[putp + 6] = (uint8_t)(q >> 8);
888 s->data[putp + 7] = (uint8_t)q;
889
890 return 8;
891 }
892
893 /* Put long word to the stream. */
894 int stream_put_ipv4(struct stream *s, uint32_t l)
895 {
896 STREAM_VERIFY_SANE(s);
897
898 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
899 STREAM_BOUND_WARN(s, "put");
900 return 0;
901 }
902 memcpy(s->data + s->endp, &l, sizeof(uint32_t));
903 s->endp += sizeof(uint32_t);
904
905 return sizeof(uint32_t);
906 }
907
908 /* Put long word to the stream. */
909 int stream_put_in_addr(struct stream *s, const struct in_addr *addr)
910 {
911 STREAM_VERIFY_SANE(s);
912
913 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
914 STREAM_BOUND_WARN(s, "put");
915 return 0;
916 }
917
918 memcpy(s->data + s->endp, addr, sizeof(uint32_t));
919 s->endp += sizeof(uint32_t);
920
921 return sizeof(uint32_t);
922 }
923
924 bool stream_put_ipaddr(struct stream *s, struct ipaddr *ip)
925 {
926 stream_putw(s, ip->ipa_type);
927
928 switch (ip->ipa_type) {
929 case IPADDR_V4:
930 stream_put_in_addr(s, &ip->ipaddr_v4);
931 break;
932 case IPADDR_V6:
933 stream_write(s, (uint8_t *)&ip->ipaddr_v6, 16);
934 break;
935 case IPADDR_NONE:
936 flog_err(EC_LIB_DEVELOPMENT,
937 "%s: unknown ip address-family: %u", __func__,
938 ip->ipa_type);
939 return false;
940 }
941
942 return true;
943 }
944
945 /* Put in_addr at location in the stream. */
946 int stream_put_in_addr_at(struct stream *s, size_t putp,
947 const struct in_addr *addr)
948 {
949 STREAM_VERIFY_SANE(s);
950
951 if (!PUT_AT_VALID(s, putp + 4)) {
952 STREAM_BOUND_WARN(s, "put");
953 return 0;
954 }
955
956 memcpy(&s->data[putp], addr, 4);
957 return 4;
958 }
959
960 /* Put in6_addr at location in the stream. */
961 int stream_put_in6_addr_at(struct stream *s, size_t putp,
962 const struct in6_addr *addr)
963 {
964 STREAM_VERIFY_SANE(s);
965
966 if (!PUT_AT_VALID(s, putp + 16)) {
967 STREAM_BOUND_WARN(s, "put");
968 return 0;
969 }
970
971 memcpy(&s->data[putp], addr, 16);
972 return 16;
973 }
974
975 /* Put prefix by nlri type format. */
976 int stream_put_prefix_addpath(struct stream *s, const struct prefix *p,
977 bool addpath_capable, uint32_t addpath_tx_id)
978 {
979 size_t psize;
980 size_t psize_with_addpath;
981
982 STREAM_VERIFY_SANE(s);
983
984 psize = PSIZE(p->prefixlen);
985
986 if (addpath_capable)
987 psize_with_addpath = psize + 4;
988 else
989 psize_with_addpath = psize;
990
991 if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) {
992 STREAM_BOUND_WARN(s, "put");
993 return 0;
994 }
995
996 if (addpath_capable) {
997 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
998 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
999 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
1000 s->data[s->endp++] = (uint8_t)addpath_tx_id;
1001 }
1002
1003 s->data[s->endp++] = p->prefixlen;
1004 memcpy(s->data + s->endp, &p->u.prefix, psize);
1005 s->endp += psize;
1006
1007 return psize;
1008 }
1009
1010 int stream_put_prefix(struct stream *s, const struct prefix *p)
1011 {
1012 return stream_put_prefix_addpath(s, p, 0, 0);
1013 }
1014
1015 /* Put NLRI with label */
1016 int stream_put_labeled_prefix(struct stream *s, const struct prefix *p,
1017 mpls_label_t *label, bool addpath_capable,
1018 uint32_t addpath_tx_id)
1019 {
1020 size_t psize;
1021 size_t psize_with_addpath;
1022 uint8_t *label_pnt = (uint8_t *)label;
1023
1024 STREAM_VERIFY_SANE(s);
1025
1026 psize = PSIZE(p->prefixlen);
1027 psize_with_addpath = psize + (addpath_capable ? 4 : 0);
1028
1029 if (STREAM_WRITEABLE(s) < (psize_with_addpath + 3)) {
1030 STREAM_BOUND_WARN(s, "put");
1031 return 0;
1032 }
1033
1034 if (addpath_capable) {
1035 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
1036 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
1037 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
1038 s->data[s->endp++] = (uint8_t)addpath_tx_id;
1039 }
1040
1041 stream_putc(s, (p->prefixlen + 24));
1042 stream_putc(s, label_pnt[0]);
1043 stream_putc(s, label_pnt[1]);
1044 stream_putc(s, label_pnt[2]);
1045 memcpy(s->data + s->endp, &p->u.prefix, psize);
1046 s->endp += psize;
1047
1048 return (psize + 3);
1049 }
1050
1051 /* Read size from fd. */
1052 int stream_read(struct stream *s, int fd, size_t size)
1053 {
1054 int nbytes;
1055
1056 STREAM_VERIFY_SANE(s);
1057
1058 if (STREAM_WRITEABLE(s) < size) {
1059 STREAM_BOUND_WARN(s, "put");
1060 return 0;
1061 }
1062
1063 nbytes = readn(fd, s->data + s->endp, size);
1064
1065 if (nbytes > 0)
1066 s->endp += nbytes;
1067
1068 return nbytes;
1069 }
1070
1071 ssize_t stream_read_try(struct stream *s, int fd, size_t size)
1072 {
1073 ssize_t nbytes;
1074
1075 STREAM_VERIFY_SANE(s);
1076
1077 if (STREAM_WRITEABLE(s) < size) {
1078 STREAM_BOUND_WARN(s, "put");
1079 /* Fatal (not transient) error, since retrying will not help
1080 (stream is too small to contain the desired data). */
1081 return -1;
1082 }
1083
1084 nbytes = read(fd, s->data + s->endp, size);
1085 if (nbytes >= 0) {
1086 s->endp += nbytes;
1087 return nbytes;
1088 }
1089 /* Error: was it transient (return -2) or fatal (return -1)? */
1090 if (ERRNO_IO_RETRY(errno))
1091 return -2;
1092 flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
1093 safe_strerror(errno));
1094 return -1;
1095 }
1096
1097 /* Read up to size bytes into the stream from the fd, using recvmsgfrom
1098 * whose arguments match the remaining arguments to this function
1099 */
1100 ssize_t stream_recvfrom(struct stream *s, int fd, size_t size, int flags,
1101 struct sockaddr *from, socklen_t *fromlen)
1102 {
1103 ssize_t nbytes;
1104
1105 STREAM_VERIFY_SANE(s);
1106
1107 if (STREAM_WRITEABLE(s) < size) {
1108 STREAM_BOUND_WARN(s, "put");
1109 /* Fatal (not transient) error, since retrying will not help
1110 (stream is too small to contain the desired data). */
1111 return -1;
1112 }
1113
1114 nbytes = recvfrom(fd, s->data + s->endp, size, flags, from, fromlen);
1115 if (nbytes >= 0) {
1116 s->endp += nbytes;
1117 return nbytes;
1118 }
1119 /* Error: was it transient (return -2) or fatal (return -1)? */
1120 if (ERRNO_IO_RETRY(errno))
1121 return -2;
1122 flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
1123 safe_strerror(errno));
1124 return -1;
1125 }
1126
1127 /* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
1128 * from endp.
1129 * First iovec will be used to receive the data.
1130 * Stream need not be empty.
1131 */
1132 ssize_t stream_recvmsg(struct stream *s, int fd, struct msghdr *msgh, int flags,
1133 size_t size)
1134 {
1135 int nbytes;
1136 struct iovec *iov;
1137
1138 STREAM_VERIFY_SANE(s);
1139 assert(msgh->msg_iovlen > 0);
1140
1141 if (STREAM_WRITEABLE(s) < size) {
1142 STREAM_BOUND_WARN(s, "put");
1143 /* This is a logic error in the calling code: the stream is too
1144 small
1145 to hold the desired data! */
1146 return -1;
1147 }
1148
1149 iov = &(msgh->msg_iov[0]);
1150 iov->iov_base = (s->data + s->endp);
1151 iov->iov_len = size;
1152
1153 nbytes = recvmsg(fd, msgh, flags);
1154
1155 if (nbytes > 0)
1156 s->endp += nbytes;
1157
1158 return nbytes;
1159 }
1160
1161 /* Write data to buffer. */
1162 size_t stream_write(struct stream *s, const void *ptr, size_t size)
1163 {
1164
1165 CHECK_SIZE(s, size);
1166
1167 STREAM_VERIFY_SANE(s);
1168
1169 if (STREAM_WRITEABLE(s) < size) {
1170 STREAM_BOUND_WARN(s, "put");
1171 return 0;
1172 }
1173
1174 memcpy(s->data + s->endp, ptr, size);
1175 s->endp += size;
1176
1177 return size;
1178 }
1179
1180 /* Return current read pointer.
1181 * DEPRECATED!
1182 * Use stream_get_pnt_to if you must, but decoding streams properly
1183 * is preferred
1184 */
1185 uint8_t *stream_pnt(struct stream *s)
1186 {
1187 STREAM_VERIFY_SANE(s);
1188 return s->data + s->getp;
1189 }
1190
1191 /* Check does this stream empty? */
1192 int stream_empty(struct stream *s)
1193 {
1194 STREAM_VERIFY_SANE(s);
1195
1196 return (s->endp == 0);
1197 }
1198
1199 /* Reset stream. */
1200 void stream_reset(struct stream *s)
1201 {
1202 STREAM_VERIFY_SANE(s);
1203
1204 s->getp = s->endp = 0;
1205 }
1206
1207 /* Write stream contens to the file discriptor. */
1208 int stream_flush(struct stream *s, int fd)
1209 {
1210 int nbytes;
1211
1212 STREAM_VERIFY_SANE(s);
1213
1214 nbytes = write(fd, s->data + s->getp, s->endp - s->getp);
1215
1216 return nbytes;
1217 }
1218
1219 void stream_hexdump(const struct stream *s)
1220 {
1221 zlog_hexdump(s->data, s->endp);
1222 }
1223
1224 /* Stream first in first out queue. */
1225
1226 struct stream_fifo *stream_fifo_new(void)
1227 {
1228 struct stream_fifo *new;
1229
1230 new = XMALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo));
1231 stream_fifo_init(new);
1232 return new;
1233 }
1234
1235 void stream_fifo_init(struct stream_fifo *fifo)
1236 {
1237 memset(fifo, 0, sizeof(struct stream_fifo));
1238 pthread_mutex_init(&fifo->mtx, NULL);
1239 }
1240
1241 /* Add new stream to fifo. */
1242 void stream_fifo_push(struct stream_fifo *fifo, struct stream *s)
1243 {
1244 #if defined DEV_BUILD
1245 size_t max, curmax;
1246 #endif
1247
1248 if (fifo->tail)
1249 fifo->tail->next = s;
1250 else
1251 fifo->head = s;
1252
1253 fifo->tail = s;
1254 fifo->tail->next = NULL;
1255 #if !defined DEV_BUILD
1256 atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1257 #else
1258 max = atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1259 curmax = atomic_load_explicit(&fifo->max_count, memory_order_relaxed);
1260 if (max > curmax)
1261 atomic_store_explicit(&fifo->max_count, max,
1262 memory_order_relaxed);
1263 #endif
1264 }
1265
1266 void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s)
1267 {
1268 frr_with_mutex (&fifo->mtx) {
1269 stream_fifo_push(fifo, s);
1270 }
1271 }
1272
1273 /* Delete first stream from fifo. */
1274 struct stream *stream_fifo_pop(struct stream_fifo *fifo)
1275 {
1276 struct stream *s;
1277
1278 s = fifo->head;
1279
1280 if (s) {
1281 fifo->head = s->next;
1282
1283 if (fifo->head == NULL)
1284 fifo->tail = NULL;
1285
1286 atomic_fetch_sub_explicit(&fifo->count, 1,
1287 memory_order_release);
1288
1289 /* ensure stream is scrubbed of references to this fifo */
1290 s->next = NULL;
1291 }
1292
1293 return s;
1294 }
1295
1296 struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo)
1297 {
1298 struct stream *ret;
1299
1300 frr_with_mutex (&fifo->mtx) {
1301 ret = stream_fifo_pop(fifo);
1302 }
1303
1304 return ret;
1305 }
1306
1307 struct stream *stream_fifo_head(struct stream_fifo *fifo)
1308 {
1309 return fifo->head;
1310 }
1311
1312 struct stream *stream_fifo_head_safe(struct stream_fifo *fifo)
1313 {
1314 struct stream *ret;
1315
1316 frr_with_mutex (&fifo->mtx) {
1317 ret = stream_fifo_head(fifo);
1318 }
1319
1320 return ret;
1321 }
1322
1323 void stream_fifo_clean(struct stream_fifo *fifo)
1324 {
1325 struct stream *s;
1326 struct stream *next;
1327
1328 for (s = fifo->head; s; s = next) {
1329 next = s->next;
1330 stream_free(s);
1331 }
1332 fifo->head = fifo->tail = NULL;
1333 atomic_store_explicit(&fifo->count, 0, memory_order_release);
1334 }
1335
1336 void stream_fifo_clean_safe(struct stream_fifo *fifo)
1337 {
1338 frr_with_mutex (&fifo->mtx) {
1339 stream_fifo_clean(fifo);
1340 }
1341 }
1342
1343 size_t stream_fifo_count_safe(struct stream_fifo *fifo)
1344 {
1345 return atomic_load_explicit(&fifo->count, memory_order_acquire);
1346 }
1347
1348 void stream_fifo_deinit(struct stream_fifo *fifo)
1349 {
1350 stream_fifo_clean(fifo);
1351 pthread_mutex_destroy(&fifo->mtx);
1352 }
1353
1354 void stream_fifo_free(struct stream_fifo *fifo)
1355 {
1356 stream_fifo_deinit(fifo);
1357 XFREE(MTYPE_STREAM_FIFO, fifo);
1358 }
1359
1360 void stream_pulldown(struct stream *s)
1361 {
1362 size_t rlen = STREAM_READABLE(s);
1363
1364 /* No more data, so just move the pointers. */
1365 if (rlen == 0) {
1366 stream_reset(s);
1367 return;
1368 }
1369
1370 /* Move the available data to the beginning. */
1371 memmove(s->data, &s->data[s->getp], rlen);
1372 s->getp = 0;
1373 s->endp = rlen;
1374 }