]> git.proxmox.com Git - mirror_frr.git/blob - lib/stream.c
Merge pull request #2886 from donaldsharp/stream_resize
[mirror_frr.git] / lib / stream.c
1 /*
2 * Packet interface
3 * Copyright (C) 1999 Kunihiro Ishiguro
4 *
5 * This file is part of GNU Zebra.
6 *
7 * GNU Zebra is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2, or (at your option) any
10 * later version.
11 *
12 * GNU Zebra is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <zebra.h>
23 #include <stddef.h>
24 #include <pthread.h>
25
26 #include "stream.h"
27 #include "memory.h"
28 #include "network.h"
29 #include "prefix.h"
30 #include "log.h"
31 #include "lib_errors.h"
32
33 DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream")
34 DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO")
35
36 /* Tests whether a position is valid */
37 #define GETP_VALID(S, G) ((G) <= (S)->endp)
38 #define PUT_AT_VALID(S,G) GETP_VALID(S,G)
39 #define ENDP_VALID(S, E) ((E) <= (S)->size)
40
41 /* asserting sanity checks. Following must be true before
42 * stream functions are called:
43 *
44 * Following must always be true of stream elements
45 * before and after calls to stream functions:
46 *
47 * getp <= endp <= size
48 *
49 * Note that after a stream function is called following may be true:
50 * if (getp == endp) then stream is no longer readable
51 * if (endp == size) then stream is no longer writeable
52 *
53 * It is valid to put to anywhere within the size of the stream, but only
54 * using stream_put..._at() functions.
55 */
56 #define STREAM_WARN_OFFSETS(S) \
57 zlog_warn("&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n", \
58 (void *)(S), (unsigned long)(S)->size, \
59 (unsigned long)(S)->getp, (unsigned long)(S)->endp)
60
61 #define STREAM_VERIFY_SANE(S) \
62 do { \
63 if (!(GETP_VALID(S, (S)->getp) && ENDP_VALID(S, (S)->endp))) \
64 STREAM_WARN_OFFSETS(S); \
65 assert(GETP_VALID(S, (S)->getp)); \
66 assert(ENDP_VALID(S, (S)->endp)); \
67 } while (0)
68
69 #define STREAM_BOUND_WARN(S, WHAT) \
70 do { \
71 zlog_warn("%s: Attempt to %s out of bounds", __func__, \
72 (WHAT)); \
73 STREAM_WARN_OFFSETS(S); \
74 assert(0); \
75 } while (0)
76
77 #define STREAM_BOUND_WARN2(S, WHAT) \
78 do { \
79 zlog_warn("%s: Attempt to %s out of bounds", __func__, \
80 (WHAT)); \
81 STREAM_WARN_OFFSETS(S); \
82 } while (0)
83
84 /* XXX: Deprecated macro: do not use */
85 #define CHECK_SIZE(S, Z) \
86 do { \
87 if (((S)->endp + (Z)) > (S)->size) { \
88 zlog_warn( \
89 "CHECK_SIZE: truncating requested size %lu\n", \
90 (unsigned long)(Z)); \
91 STREAM_WARN_OFFSETS(S); \
92 (Z) = (S)->size - (S)->endp; \
93 } \
94 } while (0);
95
96 /* Make stream buffer. */
97 struct stream *stream_new(size_t size)
98 {
99 struct stream *s;
100
101 assert(size > 0);
102
103 s = XMALLOC(MTYPE_STREAM, sizeof(struct stream) + size);
104
105 s->getp = s->endp = 0;
106 s->next = NULL;
107 s->size = size;
108 return s;
109 }
110
111 /* Free it now. */
112 void stream_free(struct stream *s)
113 {
114 if (!s)
115 return;
116
117 XFREE(MTYPE_STREAM, s);
118 }
119
120 struct stream *stream_copy(struct stream *new, struct stream *src)
121 {
122 STREAM_VERIFY_SANE(src);
123
124 assert(new != NULL);
125 assert(STREAM_SIZE(new) >= src->endp);
126
127 new->endp = src->endp;
128 new->getp = src->getp;
129
130 memcpy(new->data, src->data, src->endp);
131
132 return new;
133 }
134
135 struct stream *stream_dup(struct stream *s)
136 {
137 struct stream *new;
138
139 STREAM_VERIFY_SANE(s);
140
141 if ((new = stream_new(s->endp)) == NULL)
142 return NULL;
143
144 return (stream_copy(new, s));
145 }
146
147 struct stream *stream_dupcat(struct stream *s1, struct stream *s2,
148 size_t offset)
149 {
150 struct stream *new;
151
152 STREAM_VERIFY_SANE(s1);
153 STREAM_VERIFY_SANE(s2);
154
155 if ((new = stream_new(s1->endp + s2->endp)) == NULL)
156 return NULL;
157
158 memcpy(new->data, s1->data, offset);
159 memcpy(new->data + offset, s2->data, s2->endp);
160 memcpy(new->data + offset + s2->endp, s1->data + offset,
161 (s1->endp - offset));
162 new->endp = s1->endp + s2->endp;
163 return new;
164 }
165
166 size_t stream_resize_inplace(struct stream **sptr, size_t newsize)
167 {
168 struct stream *orig = *sptr;
169
170 STREAM_VERIFY_SANE(orig);
171
172 orig = XREALLOC(MTYPE_STREAM, orig, sizeof(struct stream) + newsize);
173
174 orig->size = newsize;
175
176 if (orig->endp > orig->size)
177 orig->endp = orig->size;
178 if (orig->getp > orig->endp)
179 orig->getp = orig->endp;
180
181 STREAM_VERIFY_SANE(orig);
182
183 *sptr = orig;
184 return orig->size;
185 }
186
187 size_t __attribute__((deprecated))stream_resize_orig(struct stream *s,
188 size_t newsize)
189 {
190 assert("stream_resize: Switch code to use stream_resize_inplace" == NULL);
191
192 return stream_resize_inplace(&s, newsize);
193 }
194
195 size_t stream_get_getp(struct stream *s)
196 {
197 STREAM_VERIFY_SANE(s);
198 return s->getp;
199 }
200
201 size_t stream_get_endp(struct stream *s)
202 {
203 STREAM_VERIFY_SANE(s);
204 return s->endp;
205 }
206
207 size_t stream_get_size(struct stream *s)
208 {
209 STREAM_VERIFY_SANE(s);
210 return s->size;
211 }
212
213 /* Stream structre' stream pointer related functions. */
214 void stream_set_getp(struct stream *s, size_t pos)
215 {
216 STREAM_VERIFY_SANE(s);
217
218 if (!GETP_VALID(s, pos)) {
219 STREAM_BOUND_WARN(s, "set getp");
220 pos = s->endp;
221 }
222
223 s->getp = pos;
224 }
225
226 void stream_set_endp(struct stream *s, size_t pos)
227 {
228 STREAM_VERIFY_SANE(s);
229
230 if (!ENDP_VALID(s, pos)) {
231 STREAM_BOUND_WARN(s, "set endp");
232 return;
233 }
234
235 /*
236 * Make sure the current read pointer is not beyond the new endp.
237 */
238 if (s->getp > pos) {
239 STREAM_BOUND_WARN(s, "set endp");
240 return;
241 }
242
243 s->endp = pos;
244 STREAM_VERIFY_SANE(s);
245 }
246
247 /* Forward pointer. */
248 void stream_forward_getp(struct stream *s, size_t size)
249 {
250 STREAM_VERIFY_SANE(s);
251
252 if (!GETP_VALID(s, s->getp + size)) {
253 STREAM_BOUND_WARN(s, "seek getp");
254 return;
255 }
256
257 s->getp += size;
258 }
259
260 void stream_forward_endp(struct stream *s, size_t size)
261 {
262 STREAM_VERIFY_SANE(s);
263
264 if (!ENDP_VALID(s, s->endp + size)) {
265 STREAM_BOUND_WARN(s, "seek endp");
266 return;
267 }
268
269 s->endp += size;
270 }
271
272 /* Copy from stream to destination. */
273 inline bool stream_get2(void *dst, struct stream *s, size_t size)
274 {
275 STREAM_VERIFY_SANE(s);
276
277 if (STREAM_READABLE(s) < size) {
278 STREAM_BOUND_WARN2(s, "get");
279 return false;
280 }
281
282 memcpy(dst, s->data + s->getp, size);
283 s->getp += size;
284
285 return true;
286 }
287
288 void stream_get(void *dst, struct stream *s, size_t size)
289 {
290 STREAM_VERIFY_SANE(s);
291
292 if (STREAM_READABLE(s) < size) {
293 STREAM_BOUND_WARN(s, "get");
294 return;
295 }
296
297 memcpy(dst, s->data + s->getp, size);
298 s->getp += size;
299 }
300
301 /* Get next character from the stream. */
302 inline bool stream_getc2(struct stream *s, uint8_t *byte)
303 {
304 STREAM_VERIFY_SANE(s);
305
306 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
307 STREAM_BOUND_WARN2(s, "get char");
308 return false;
309 }
310 *byte = s->data[s->getp++];
311
312 return true;
313 }
314
315 uint8_t stream_getc(struct stream *s)
316 {
317 uint8_t c;
318
319 STREAM_VERIFY_SANE(s);
320
321 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
322 STREAM_BOUND_WARN(s, "get char");
323 return 0;
324 }
325 c = s->data[s->getp++];
326
327 return c;
328 }
329
330 /* Get next character from the stream. */
331 uint8_t stream_getc_from(struct stream *s, size_t from)
332 {
333 uint8_t c;
334
335 STREAM_VERIFY_SANE(s);
336
337 if (!GETP_VALID(s, from + sizeof(uint8_t))) {
338 STREAM_BOUND_WARN(s, "get char");
339 return 0;
340 }
341
342 c = s->data[from];
343
344 return c;
345 }
346
347 inline bool stream_getw2(struct stream *s, uint16_t *word)
348 {
349 STREAM_VERIFY_SANE(s);
350
351 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
352 STREAM_BOUND_WARN2(s, "get ");
353 return false;
354 }
355
356 *word = s->data[s->getp++] << 8;
357 *word |= s->data[s->getp++];
358
359 return true;
360 }
361
362 /* Get next word from the stream. */
363 uint16_t stream_getw(struct stream *s)
364 {
365 uint16_t w;
366
367 STREAM_VERIFY_SANE(s);
368
369 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
370 STREAM_BOUND_WARN(s, "get ");
371 return 0;
372 }
373
374 w = s->data[s->getp++] << 8;
375 w |= s->data[s->getp++];
376
377 return w;
378 }
379
380 /* Get next word from the stream. */
381 uint16_t stream_getw_from(struct stream *s, size_t from)
382 {
383 uint16_t w;
384
385 STREAM_VERIFY_SANE(s);
386
387 if (!GETP_VALID(s, from + sizeof(uint16_t))) {
388 STREAM_BOUND_WARN(s, "get ");
389 return 0;
390 }
391
392 w = s->data[from++] << 8;
393 w |= s->data[from];
394
395 return w;
396 }
397
398 /* Get next 3-byte from the stream. */
399 uint32_t stream_get3_from(struct stream *s, size_t from)
400 {
401 uint32_t l;
402
403 STREAM_VERIFY_SANE(s);
404
405 if (!GETP_VALID(s, from + 3)) {
406 STREAM_BOUND_WARN(s, "get 3byte");
407 return 0;
408 }
409
410 l = s->data[from++] << 16;
411 l |= s->data[from++] << 8;
412 l |= s->data[from];
413
414 return l;
415 }
416
417 uint32_t stream_get3(struct stream *s)
418 {
419 uint32_t l;
420
421 STREAM_VERIFY_SANE(s);
422
423 if (STREAM_READABLE(s) < 3) {
424 STREAM_BOUND_WARN(s, "get 3byte");
425 return 0;
426 }
427
428 l = s->data[s->getp++] << 16;
429 l |= s->data[s->getp++] << 8;
430 l |= s->data[s->getp++];
431
432 return l;
433 }
434
435 /* Get next long word from the stream. */
436 uint32_t stream_getl_from(struct stream *s, size_t from)
437 {
438 uint32_t l;
439
440 STREAM_VERIFY_SANE(s);
441
442 if (!GETP_VALID(s, from + sizeof(uint32_t))) {
443 STREAM_BOUND_WARN(s, "get long");
444 return 0;
445 }
446
447 l = (unsigned)(s->data[from++]) << 24;
448 l |= s->data[from++] << 16;
449 l |= s->data[from++] << 8;
450 l |= s->data[from];
451
452 return l;
453 }
454
455 /* Copy from stream at specific location to destination. */
456 void stream_get_from(void *dst, struct stream *s, size_t from, size_t size)
457 {
458 STREAM_VERIFY_SANE(s);
459
460 if (!GETP_VALID(s, from + size)) {
461 STREAM_BOUND_WARN(s, "get from");
462 return;
463 }
464
465 memcpy(dst, s->data + from, size);
466 }
467
468 inline bool stream_getl2(struct stream *s, uint32_t *l)
469 {
470 STREAM_VERIFY_SANE(s);
471
472 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
473 STREAM_BOUND_WARN2(s, "get long");
474 return false;
475 }
476
477 *l = (unsigned int)(s->data[s->getp++]) << 24;
478 *l |= s->data[s->getp++] << 16;
479 *l |= s->data[s->getp++] << 8;
480 *l |= s->data[s->getp++];
481
482 return true;
483 }
484
485 uint32_t stream_getl(struct stream *s)
486 {
487 uint32_t l;
488
489 STREAM_VERIFY_SANE(s);
490
491 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
492 STREAM_BOUND_WARN(s, "get long");
493 return 0;
494 }
495
496 l = (unsigned)(s->data[s->getp++]) << 24;
497 l |= s->data[s->getp++] << 16;
498 l |= s->data[s->getp++] << 8;
499 l |= s->data[s->getp++];
500
501 return l;
502 }
503
504 /* Get next quad word from the stream. */
505 uint64_t stream_getq_from(struct stream *s, size_t from)
506 {
507 uint64_t q;
508
509 STREAM_VERIFY_SANE(s);
510
511 if (!GETP_VALID(s, from + sizeof(uint64_t))) {
512 STREAM_BOUND_WARN(s, "get quad");
513 return 0;
514 }
515
516 q = ((uint64_t)s->data[from++]) << 56;
517 q |= ((uint64_t)s->data[from++]) << 48;
518 q |= ((uint64_t)s->data[from++]) << 40;
519 q |= ((uint64_t)s->data[from++]) << 32;
520 q |= ((uint64_t)s->data[from++]) << 24;
521 q |= ((uint64_t)s->data[from++]) << 16;
522 q |= ((uint64_t)s->data[from++]) << 8;
523 q |= ((uint64_t)s->data[from++]);
524
525 return q;
526 }
527
528 uint64_t stream_getq(struct stream *s)
529 {
530 uint64_t q;
531
532 STREAM_VERIFY_SANE(s);
533
534 if (STREAM_READABLE(s) < sizeof(uint64_t)) {
535 STREAM_BOUND_WARN(s, "get quad");
536 return 0;
537 }
538
539 q = ((uint64_t)s->data[s->getp++]) << 56;
540 q |= ((uint64_t)s->data[s->getp++]) << 48;
541 q |= ((uint64_t)s->data[s->getp++]) << 40;
542 q |= ((uint64_t)s->data[s->getp++]) << 32;
543 q |= ((uint64_t)s->data[s->getp++]) << 24;
544 q |= ((uint64_t)s->data[s->getp++]) << 16;
545 q |= ((uint64_t)s->data[s->getp++]) << 8;
546 q |= ((uint64_t)s->data[s->getp++]);
547
548 return q;
549 }
550
551 /* Get next long word from the stream. */
552 uint32_t stream_get_ipv4(struct stream *s)
553 {
554 uint32_t l;
555
556 STREAM_VERIFY_SANE(s);
557
558 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
559 STREAM_BOUND_WARN(s, "get ipv4");
560 return 0;
561 }
562
563 memcpy(&l, s->data + s->getp, sizeof(uint32_t));
564 s->getp += sizeof(uint32_t);
565
566 return l;
567 }
568
569 float stream_getf(struct stream *s)
570 {
571 union {
572 float r;
573 uint32_t d;
574 } u;
575 u.d = stream_getl(s);
576 return u.r;
577 }
578
579 double stream_getd(struct stream *s)
580 {
581 union {
582 double r;
583 uint64_t d;
584 } u;
585 u.d = stream_getq(s);
586 return u.r;
587 }
588
589 /* Copy to source to stream.
590 *
591 * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
592 * around. This should be fixed once the stream updates are working.
593 *
594 * stream_write() is saner
595 */
596 void stream_put(struct stream *s, const void *src, size_t size)
597 {
598
599 /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
600 CHECK_SIZE(s, size);
601
602 STREAM_VERIFY_SANE(s);
603
604 if (STREAM_WRITEABLE(s) < size) {
605 STREAM_BOUND_WARN(s, "put");
606 return;
607 }
608
609 if (src)
610 memcpy(s->data + s->endp, src, size);
611 else
612 memset(s->data + s->endp, 0, size);
613
614 s->endp += size;
615 }
616
617 /* Put character to the stream. */
618 int stream_putc(struct stream *s, uint8_t c)
619 {
620 STREAM_VERIFY_SANE(s);
621
622 if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) {
623 STREAM_BOUND_WARN(s, "put");
624 return 0;
625 }
626
627 s->data[s->endp++] = c;
628 return sizeof(uint8_t);
629 }
630
631 /* Put word to the stream. */
632 int stream_putw(struct stream *s, uint16_t w)
633 {
634 STREAM_VERIFY_SANE(s);
635
636 if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) {
637 STREAM_BOUND_WARN(s, "put");
638 return 0;
639 }
640
641 s->data[s->endp++] = (uint8_t)(w >> 8);
642 s->data[s->endp++] = (uint8_t)w;
643
644 return 2;
645 }
646
647 /* Put long word to the stream. */
648 int stream_put3(struct stream *s, uint32_t l)
649 {
650 STREAM_VERIFY_SANE(s);
651
652 if (STREAM_WRITEABLE(s) < 3) {
653 STREAM_BOUND_WARN(s, "put");
654 return 0;
655 }
656
657 s->data[s->endp++] = (uint8_t)(l >> 16);
658 s->data[s->endp++] = (uint8_t)(l >> 8);
659 s->data[s->endp++] = (uint8_t)l;
660
661 return 3;
662 }
663
664 /* Put long word to the stream. */
665 int stream_putl(struct stream *s, uint32_t l)
666 {
667 STREAM_VERIFY_SANE(s);
668
669 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
670 STREAM_BOUND_WARN(s, "put");
671 return 0;
672 }
673
674 s->data[s->endp++] = (uint8_t)(l >> 24);
675 s->data[s->endp++] = (uint8_t)(l >> 16);
676 s->data[s->endp++] = (uint8_t)(l >> 8);
677 s->data[s->endp++] = (uint8_t)l;
678
679 return 4;
680 }
681
682 /* Put quad word to the stream. */
683 int stream_putq(struct stream *s, uint64_t q)
684 {
685 STREAM_VERIFY_SANE(s);
686
687 if (STREAM_WRITEABLE(s) < sizeof(uint64_t)) {
688 STREAM_BOUND_WARN(s, "put quad");
689 return 0;
690 }
691
692 s->data[s->endp++] = (uint8_t)(q >> 56);
693 s->data[s->endp++] = (uint8_t)(q >> 48);
694 s->data[s->endp++] = (uint8_t)(q >> 40);
695 s->data[s->endp++] = (uint8_t)(q >> 32);
696 s->data[s->endp++] = (uint8_t)(q >> 24);
697 s->data[s->endp++] = (uint8_t)(q >> 16);
698 s->data[s->endp++] = (uint8_t)(q >> 8);
699 s->data[s->endp++] = (uint8_t)q;
700
701 return 8;
702 }
703
704 int stream_putf(struct stream *s, float f)
705 {
706 union {
707 float i;
708 uint32_t o;
709 } u;
710 u.i = f;
711 return stream_putl(s, u.o);
712 }
713
714 int stream_putd(struct stream *s, double d)
715 {
716 union {
717 double i;
718 uint64_t o;
719 } u;
720 u.i = d;
721 return stream_putq(s, u.o);
722 }
723
724 int stream_putc_at(struct stream *s, size_t putp, uint8_t c)
725 {
726 STREAM_VERIFY_SANE(s);
727
728 if (!PUT_AT_VALID(s, putp + sizeof(uint8_t))) {
729 STREAM_BOUND_WARN(s, "put");
730 return 0;
731 }
732
733 s->data[putp] = c;
734
735 return 1;
736 }
737
738 int stream_putw_at(struct stream *s, size_t putp, uint16_t w)
739 {
740 STREAM_VERIFY_SANE(s);
741
742 if (!PUT_AT_VALID(s, putp + sizeof(uint16_t))) {
743 STREAM_BOUND_WARN(s, "put");
744 return 0;
745 }
746
747 s->data[putp] = (uint8_t)(w >> 8);
748 s->data[putp + 1] = (uint8_t)w;
749
750 return 2;
751 }
752
753 int stream_put3_at(struct stream *s, size_t putp, uint32_t l)
754 {
755 STREAM_VERIFY_SANE(s);
756
757 if (!PUT_AT_VALID(s, putp + 3)) {
758 STREAM_BOUND_WARN(s, "put");
759 return 0;
760 }
761 s->data[putp] = (uint8_t)(l >> 16);
762 s->data[putp + 1] = (uint8_t)(l >> 8);
763 s->data[putp + 2] = (uint8_t)l;
764
765 return 3;
766 }
767
768 int stream_putl_at(struct stream *s, size_t putp, uint32_t l)
769 {
770 STREAM_VERIFY_SANE(s);
771
772 if (!PUT_AT_VALID(s, putp + sizeof(uint32_t))) {
773 STREAM_BOUND_WARN(s, "put");
774 return 0;
775 }
776 s->data[putp] = (uint8_t)(l >> 24);
777 s->data[putp + 1] = (uint8_t)(l >> 16);
778 s->data[putp + 2] = (uint8_t)(l >> 8);
779 s->data[putp + 3] = (uint8_t)l;
780
781 return 4;
782 }
783
784 int stream_putq_at(struct stream *s, size_t putp, uint64_t q)
785 {
786 STREAM_VERIFY_SANE(s);
787
788 if (!PUT_AT_VALID(s, putp + sizeof(uint64_t))) {
789 STREAM_BOUND_WARN(s, "put");
790 return 0;
791 }
792 s->data[putp] = (uint8_t)(q >> 56);
793 s->data[putp + 1] = (uint8_t)(q >> 48);
794 s->data[putp + 2] = (uint8_t)(q >> 40);
795 s->data[putp + 3] = (uint8_t)(q >> 32);
796 s->data[putp + 4] = (uint8_t)(q >> 24);
797 s->data[putp + 5] = (uint8_t)(q >> 16);
798 s->data[putp + 6] = (uint8_t)(q >> 8);
799 s->data[putp + 7] = (uint8_t)q;
800
801 return 8;
802 }
803
804 /* Put long word to the stream. */
805 int stream_put_ipv4(struct stream *s, uint32_t l)
806 {
807 STREAM_VERIFY_SANE(s);
808
809 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
810 STREAM_BOUND_WARN(s, "put");
811 return 0;
812 }
813 memcpy(s->data + s->endp, &l, sizeof(uint32_t));
814 s->endp += sizeof(uint32_t);
815
816 return sizeof(uint32_t);
817 }
818
819 /* Put long word to the stream. */
820 int stream_put_in_addr(struct stream *s, struct in_addr *addr)
821 {
822 STREAM_VERIFY_SANE(s);
823
824 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
825 STREAM_BOUND_WARN(s, "put");
826 return 0;
827 }
828
829 memcpy(s->data + s->endp, addr, sizeof(uint32_t));
830 s->endp += sizeof(uint32_t);
831
832 return sizeof(uint32_t);
833 }
834
835 /* Put in_addr at location in the stream. */
836 int stream_put_in_addr_at(struct stream *s, size_t putp, struct in_addr *addr)
837 {
838 STREAM_VERIFY_SANE(s);
839
840 if (!PUT_AT_VALID(s, putp + 4)) {
841 STREAM_BOUND_WARN(s, "put");
842 return 0;
843 }
844
845 memcpy(&s->data[putp], addr, 4);
846 return 4;
847 }
848
849 /* Put in6_addr at location in the stream. */
850 int stream_put_in6_addr_at(struct stream *s, size_t putp, struct in6_addr *addr)
851 {
852 STREAM_VERIFY_SANE(s);
853
854 if (!PUT_AT_VALID(s, putp + 16)) {
855 STREAM_BOUND_WARN(s, "put");
856 return 0;
857 }
858
859 memcpy(&s->data[putp], addr, 16);
860 return 16;
861 }
862
863 /* Put prefix by nlri type format. */
864 int stream_put_prefix_addpath(struct stream *s, struct prefix *p,
865 int addpath_encode, uint32_t addpath_tx_id)
866 {
867 size_t psize;
868 size_t psize_with_addpath;
869
870 STREAM_VERIFY_SANE(s);
871
872 psize = PSIZE(p->prefixlen);
873
874 if (addpath_encode)
875 psize_with_addpath = psize + 4;
876 else
877 psize_with_addpath = psize;
878
879 if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) {
880 STREAM_BOUND_WARN(s, "put");
881 return 0;
882 }
883
884 if (addpath_encode) {
885 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
886 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
887 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
888 s->data[s->endp++] = (uint8_t)addpath_tx_id;
889 }
890
891 s->data[s->endp++] = p->prefixlen;
892 memcpy(s->data + s->endp, &p->u.prefix, psize);
893 s->endp += psize;
894
895 return psize;
896 }
897
898 int stream_put_prefix(struct stream *s, struct prefix *p)
899 {
900 return stream_put_prefix_addpath(s, p, 0, 0);
901 }
902
903 /* Put NLRI with label */
904 int stream_put_labeled_prefix(struct stream *s, struct prefix *p,
905 mpls_label_t *label)
906 {
907 size_t psize;
908 uint8_t *label_pnt = (uint8_t *)label;
909
910 STREAM_VERIFY_SANE(s);
911
912 psize = PSIZE(p->prefixlen);
913
914 if (STREAM_WRITEABLE(s) < (psize + 3)) {
915 STREAM_BOUND_WARN(s, "put");
916 return 0;
917 }
918
919 stream_putc(s, (p->prefixlen + 24));
920 stream_putc(s, label_pnt[0]);
921 stream_putc(s, label_pnt[1]);
922 stream_putc(s, label_pnt[2]);
923 memcpy(s->data + s->endp, &p->u.prefix, psize);
924 s->endp += psize;
925
926 return (psize + 3);
927 }
928
929 /* Read size from fd. */
930 int stream_read(struct stream *s, int fd, size_t size)
931 {
932 int nbytes;
933
934 STREAM_VERIFY_SANE(s);
935
936 if (STREAM_WRITEABLE(s) < size) {
937 STREAM_BOUND_WARN(s, "put");
938 return 0;
939 }
940
941 nbytes = readn(fd, s->data + s->endp, size);
942
943 if (nbytes > 0)
944 s->endp += nbytes;
945
946 return nbytes;
947 }
948
949 ssize_t stream_read_try(struct stream *s, int fd, size_t size)
950 {
951 ssize_t nbytes;
952
953 STREAM_VERIFY_SANE(s);
954
955 if (STREAM_WRITEABLE(s) < size) {
956 STREAM_BOUND_WARN(s, "put");
957 /* Fatal (not transient) error, since retrying will not help
958 (stream is too small to contain the desired data). */
959 return -1;
960 }
961
962 if ((nbytes = read(fd, s->data + s->endp, size)) >= 0) {
963 s->endp += nbytes;
964 return nbytes;
965 }
966 /* Error: was it transient (return -2) or fatal (return -1)? */
967 if (ERRNO_IO_RETRY(errno))
968 return -2;
969 zlog_warn("%s: read failed on fd %d: %s", __func__, fd,
970 safe_strerror(errno));
971 return -1;
972 }
973
974 /* Read up to size bytes into the stream from the fd, using recvmsgfrom
975 * whose arguments match the remaining arguments to this function
976 */
977 ssize_t stream_recvfrom(struct stream *s, int fd, size_t size, int flags,
978 struct sockaddr *from, socklen_t *fromlen)
979 {
980 ssize_t nbytes;
981
982 STREAM_VERIFY_SANE(s);
983
984 if (STREAM_WRITEABLE(s) < size) {
985 STREAM_BOUND_WARN(s, "put");
986 /* Fatal (not transient) error, since retrying will not help
987 (stream is too small to contain the desired data). */
988 return -1;
989 }
990
991 if ((nbytes = recvfrom(fd, s->data + s->endp, size, flags, from,
992 fromlen))
993 >= 0) {
994 s->endp += nbytes;
995 return nbytes;
996 }
997 /* Error: was it transient (return -2) or fatal (return -1)? */
998 if (ERRNO_IO_RETRY(errno))
999 return -2;
1000 zlog_warn("%s: read failed on fd %d: %s", __func__, fd,
1001 safe_strerror(errno));
1002 return -1;
1003 }
1004
1005 /* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
1006 * from endp.
1007 * First iovec will be used to receive the data.
1008 * Stream need not be empty.
1009 */
1010 ssize_t stream_recvmsg(struct stream *s, int fd, struct msghdr *msgh, int flags,
1011 size_t size)
1012 {
1013 int nbytes;
1014 struct iovec *iov;
1015
1016 STREAM_VERIFY_SANE(s);
1017 assert(msgh->msg_iovlen > 0);
1018
1019 if (STREAM_WRITEABLE(s) < size) {
1020 STREAM_BOUND_WARN(s, "put");
1021 /* This is a logic error in the calling code: the stream is too
1022 small
1023 to hold the desired data! */
1024 return -1;
1025 }
1026
1027 iov = &(msgh->msg_iov[0]);
1028 iov->iov_base = (s->data + s->endp);
1029 iov->iov_len = size;
1030
1031 nbytes = recvmsg(fd, msgh, flags);
1032
1033 if (nbytes > 0)
1034 s->endp += nbytes;
1035
1036 return nbytes;
1037 }
1038
1039 /* Write data to buffer. */
1040 size_t stream_write(struct stream *s, const void *ptr, size_t size)
1041 {
1042
1043 CHECK_SIZE(s, size);
1044
1045 STREAM_VERIFY_SANE(s);
1046
1047 if (STREAM_WRITEABLE(s) < size) {
1048 STREAM_BOUND_WARN(s, "put");
1049 return 0;
1050 }
1051
1052 memcpy(s->data + s->endp, ptr, size);
1053 s->endp += size;
1054
1055 return size;
1056 }
1057
1058 /* Return current read pointer.
1059 * DEPRECATED!
1060 * Use stream_get_pnt_to if you must, but decoding streams properly
1061 * is preferred
1062 */
1063 uint8_t *stream_pnt(struct stream *s)
1064 {
1065 STREAM_VERIFY_SANE(s);
1066 return s->data + s->getp;
1067 }
1068
1069 /* Check does this stream empty? */
1070 int stream_empty(struct stream *s)
1071 {
1072 STREAM_VERIFY_SANE(s);
1073
1074 return (s->endp == 0);
1075 }
1076
1077 /* Reset stream. */
1078 void stream_reset(struct stream *s)
1079 {
1080 STREAM_VERIFY_SANE(s);
1081
1082 s->getp = s->endp = 0;
1083 }
1084
1085 /* Write stream contens to the file discriptor. */
1086 int stream_flush(struct stream *s, int fd)
1087 {
1088 int nbytes;
1089
1090 STREAM_VERIFY_SANE(s);
1091
1092 nbytes = write(fd, s->data + s->getp, s->endp - s->getp);
1093
1094 return nbytes;
1095 }
1096
1097 /* Stream first in first out queue. */
1098
1099 struct stream_fifo *stream_fifo_new(void)
1100 {
1101 struct stream_fifo *new;
1102
1103 new = XCALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo));
1104 pthread_mutex_init(&new->mtx, NULL);
1105 return new;
1106 }
1107
1108 /* Add new stream to fifo. */
1109 void stream_fifo_push(struct stream_fifo *fifo, struct stream *s)
1110 {
1111 #if defined DEV_BUILD
1112 size_t max, curmax;
1113 #endif
1114
1115 if (fifo->tail)
1116 fifo->tail->next = s;
1117 else
1118 fifo->head = s;
1119
1120 fifo->tail = s;
1121 fifo->tail->next = NULL;
1122 #if !defined DEV_BUILD
1123 atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1124 #else
1125 max = atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1126 curmax = atomic_load_explicit(&fifo->max_count, memory_order_relaxed);
1127 if (max > curmax)
1128 atomic_store_explicit(&fifo->max_count, max,
1129 memory_order_relaxed);
1130 #endif
1131 }
1132
1133 void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s)
1134 {
1135 pthread_mutex_lock(&fifo->mtx);
1136 {
1137 stream_fifo_push(fifo, s);
1138 }
1139 pthread_mutex_unlock(&fifo->mtx);
1140 }
1141
1142 /* Delete first stream from fifo. */
1143 struct stream *stream_fifo_pop(struct stream_fifo *fifo)
1144 {
1145 struct stream *s;
1146
1147 s = fifo->head;
1148
1149 if (s) {
1150 fifo->head = s->next;
1151
1152 if (fifo->head == NULL)
1153 fifo->tail = NULL;
1154
1155 atomic_fetch_sub_explicit(&fifo->count, 1,
1156 memory_order_release);
1157
1158 /* ensure stream is scrubbed of references to this fifo */
1159 s->next = NULL;
1160 }
1161
1162 return s;
1163 }
1164
1165 struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo)
1166 {
1167 struct stream *ret;
1168
1169 pthread_mutex_lock(&fifo->mtx);
1170 {
1171 ret = stream_fifo_pop(fifo);
1172 }
1173 pthread_mutex_unlock(&fifo->mtx);
1174
1175 return ret;
1176 }
1177
1178 struct stream *stream_fifo_head(struct stream_fifo *fifo)
1179 {
1180 return fifo->head;
1181 }
1182
1183 struct stream *stream_fifo_head_safe(struct stream_fifo *fifo)
1184 {
1185 struct stream *ret;
1186
1187 pthread_mutex_lock(&fifo->mtx);
1188 {
1189 ret = stream_fifo_head(fifo);
1190 }
1191 pthread_mutex_unlock(&fifo->mtx);
1192
1193 return ret;
1194 }
1195
1196 void stream_fifo_clean(struct stream_fifo *fifo)
1197 {
1198 struct stream *s;
1199 struct stream *next;
1200
1201 for (s = fifo->head; s; s = next) {
1202 next = s->next;
1203 stream_free(s);
1204 }
1205 fifo->head = fifo->tail = NULL;
1206 atomic_store_explicit(&fifo->count, 0, memory_order_release);
1207 }
1208
1209 void stream_fifo_clean_safe(struct stream_fifo *fifo)
1210 {
1211 pthread_mutex_lock(&fifo->mtx);
1212 {
1213 stream_fifo_clean(fifo);
1214 }
1215 pthread_mutex_unlock(&fifo->mtx);
1216 }
1217
1218 size_t stream_fifo_count_safe(struct stream_fifo *fifo)
1219 {
1220 return atomic_load_explicit(&fifo->count, memory_order_acquire);
1221 }
1222
1223 void stream_fifo_free(struct stream_fifo *fifo)
1224 {
1225 stream_fifo_clean(fifo);
1226 pthread_mutex_destroy(&fifo->mtx);
1227 XFREE(MTYPE_STREAM_FIFO, fifo);
1228 }