]> git.proxmox.com Git - mirror_frr.git/blob - lib/stream.c
lib: fix doc comment of the "cli_show_end" northbound callback
[mirror_frr.git] / lib / stream.c
1 /*
2 * Packet interface
3 * Copyright (C) 1999 Kunihiro Ishiguro
4 *
5 * This file is part of GNU Zebra.
6 *
7 * GNU Zebra is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2, or (at your option) any
10 * later version.
11 *
12 * GNU Zebra is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <zebra.h>
23 #include <stddef.h>
24 #include <pthread.h>
25
26 #include "stream.h"
27 #include "memory.h"
28 #include "network.h"
29 #include "prefix.h"
30 #include "log.h"
31 #include "lib_errors.h"
32
33 DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream")
34 DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO")
35
/* Position validity predicates for a stream pointer STRM.
 *
 * GETP_VALID:   POS is at or before endp.
 * PUT_AT_VALID: same bound as GETP_VALID; used by the stream_put..._at()
 *               functions.
 * ENDP_VALID:   POS is at or before size.
 */
#define GETP_VALID(STRM, POS) ((POS) <= (STRM)->endp)
#define PUT_AT_VALID(STRM, POS) GETP_VALID(STRM, POS)
#define ENDP_VALID(STRM, POS) ((POS) <= (STRM)->size)
40
/* Asserting sanity checks.
 *
 * The following must always be true of the stream elements, both before
 * and after calls to stream functions:
 *
 *    getp <= endp <= size
 *
 * Note that after a stream function is called the following may be true:
 *    if (getp == endp) then the stream is no longer readable
 *    if (endp == size) then the stream is no longer writeable
 *
 * It is valid to put to anywhere within the size of the stream, but only
 * using the stream_put..._at() functions.
 */
/* Log, as an EC_LIB_STREAM warning, the address of stream S together with
 * its size, getp and endp.
 *
 * The format string carries no trailing newline, in line with the other
 * flog_warn() calls in this file.
 */
#define STREAM_WARN_OFFSETS(S)                                                 \
	flog_warn(EC_LIB_STREAM,                                               \
		  "&(struct stream): %p, size: %lu, getp: %lu, endp: %lu",     \
		  (void *)(S), (unsigned long)(S)->size,                       \
		  (unsigned long)(S)->getp, (unsigned long)(S)->endp)
61
/* Check the getp <= endp <= size invariant on stream STRM.
 *
 * If either position is out of range the offsets are logged first, and then
 * the corresponding assert below fires.
 */
#define STREAM_VERIFY_SANE(STRM)                                               \
	do {                                                                   \
		if (!GETP_VALID(STRM, (STRM)->getp)                            \
		    || !ENDP_VALID(STRM, (STRM)->endp))                        \
			STREAM_WARN_OFFSETS(STRM);                             \
		assert(GETP_VALID(STRM, (STRM)->getp));                        \
		assert(ENDP_VALID(STRM, (STRM)->endp));                        \
	} while (0)
69
/* Report an out-of-bounds access attempt on stream STRM and assert.
 *
 * ACTION is a short string naming the attempted operation; it is logged
 * together with the calling function's name and the stream offsets, after
 * which assert(0) fires.
 */
#define STREAM_BOUND_WARN(STRM, ACTION)                                        \
	do {                                                                   \
		flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds",    \
			  __func__, (ACTION));                                 \
		STREAM_WARN_OFFSETS(STRM);                                     \
		assert(0);                                                     \
	} while (0)
77
/* Non-asserting variant of STREAM_BOUND_WARN: logs the out-of-bounds attempt
 * (calling function, ACTION and the offsets of stream STRM) and lets the
 * caller carry on with its own error handling.
 */
#define STREAM_BOUND_WARN2(STRM, ACTION)                                       \
	do {                                                                   \
		flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds",    \
			  __func__, (ACTION));                                 \
		STREAM_WARN_OFFSETS(STRM);                                     \
	} while (0)
84
/* XXX: Deprecated macro: do not use
 *
 * If writing Z bytes at endp would run past the size of stream S, log a
 * warning and shrink Z (which must be a modifiable lvalue) to the space
 * that is left.
 *
 * The expansion deliberately does not end in a semicolon, so that
 * "CHECK_SIZE(s, n);" is exactly one statement and can be used safely in an
 * unbraced if/else. The log format carries no trailing newline, like the
 * other flog_warn() calls in this file.
 */
#define CHECK_SIZE(S, Z)                                                       \
	do {                                                                   \
		if (((S)->endp + (Z)) > (S)->size) {                           \
			flog_warn(EC_LIB_STREAM,                               \
				  "CHECK_SIZE: truncating requested size %lu", \
				  (unsigned long)(Z));                         \
			STREAM_WARN_OFFSETS(S);                                \
			(Z) = (S)->size - (S)->endp;                           \
		}                                                              \
	} while (0)
97
98 /* Make stream buffer. */
99 struct stream *stream_new(size_t size)
100 {
101 struct stream *s;
102
103 assert(size > 0);
104
105 s = XMALLOC(MTYPE_STREAM, sizeof(struct stream) + size);
106
107 s->getp = s->endp = 0;
108 s->next = NULL;
109 s->size = size;
110 return s;
111 }
112
113 /* Free it now. */
114 void stream_free(struct stream *s)
115 {
116 if (!s)
117 return;
118
119 XFREE(MTYPE_STREAM, s);
120 }
121
122 struct stream *stream_copy(struct stream *new, struct stream *src)
123 {
124 STREAM_VERIFY_SANE(src);
125
126 assert(new != NULL);
127 assert(STREAM_SIZE(new) >= src->endp);
128
129 new->endp = src->endp;
130 new->getp = src->getp;
131
132 memcpy(new->data, src->data, src->endp);
133
134 return new;
135 }
136
137 struct stream *stream_dup(struct stream *s)
138 {
139 struct stream *new;
140
141 STREAM_VERIFY_SANE(s);
142
143 if ((new = stream_new(s->endp)) == NULL)
144 return NULL;
145
146 return (stream_copy(new, s));
147 }
148
149 struct stream *stream_dupcat(struct stream *s1, struct stream *s2,
150 size_t offset)
151 {
152 struct stream *new;
153
154 STREAM_VERIFY_SANE(s1);
155 STREAM_VERIFY_SANE(s2);
156
157 if ((new = stream_new(s1->endp + s2->endp)) == NULL)
158 return NULL;
159
160 memcpy(new->data, s1->data, offset);
161 memcpy(new->data + offset, s2->data, s2->endp);
162 memcpy(new->data + offset + s2->endp, s1->data + offset,
163 (s1->endp - offset));
164 new->endp = s1->endp + s2->endp;
165 return new;
166 }
167
168 size_t stream_resize_inplace(struct stream **sptr, size_t newsize)
169 {
170 struct stream *orig = *sptr;
171
172 STREAM_VERIFY_SANE(orig);
173
174 orig = XREALLOC(MTYPE_STREAM, orig, sizeof(struct stream) + newsize);
175
176 orig->size = newsize;
177
178 if (orig->endp > orig->size)
179 orig->endp = orig->size;
180 if (orig->getp > orig->endp)
181 orig->getp = orig->endp;
182
183 STREAM_VERIFY_SANE(orig);
184
185 *sptr = orig;
186 return orig->size;
187 }
188
/* Deprecated resize entry point; use stream_resize_inplace() instead.
 *
 * The assert always fails when asserts are enabled. Note that `s` is passed
 * by value, so the pointer handed to stream_resize_inplace() is this
 * function's own copy: the caller's pointer is not updated.
 */
size_t __attribute__((deprecated))stream_resize_orig(struct stream *s,
						     size_t newsize)
{
	size_t resized;

	assert("stream_resize: Switch code to use stream_resize_inplace" == NULL);

	resized = stream_resize_inplace(&s, newsize);
	return resized;
}
196
197 size_t stream_get_getp(struct stream *s)
198 {
199 STREAM_VERIFY_SANE(s);
200 return s->getp;
201 }
202
203 size_t stream_get_endp(struct stream *s)
204 {
205 STREAM_VERIFY_SANE(s);
206 return s->endp;
207 }
208
209 size_t stream_get_size(struct stream *s)
210 {
211 STREAM_VERIFY_SANE(s);
212 return s->size;
213 }
214
215 /* Stream structre' stream pointer related functions. */
216 void stream_set_getp(struct stream *s, size_t pos)
217 {
218 STREAM_VERIFY_SANE(s);
219
220 if (!GETP_VALID(s, pos)) {
221 STREAM_BOUND_WARN(s, "set getp");
222 pos = s->endp;
223 }
224
225 s->getp = pos;
226 }
227
228 void stream_set_endp(struct stream *s, size_t pos)
229 {
230 STREAM_VERIFY_SANE(s);
231
232 if (!ENDP_VALID(s, pos)) {
233 STREAM_BOUND_WARN(s, "set endp");
234 return;
235 }
236
237 /*
238 * Make sure the current read pointer is not beyond the new endp.
239 */
240 if (s->getp > pos) {
241 STREAM_BOUND_WARN(s, "set endp");
242 return;
243 }
244
245 s->endp = pos;
246 STREAM_VERIFY_SANE(s);
247 }
248
249 /* Forward pointer. */
250 void stream_forward_getp(struct stream *s, size_t size)
251 {
252 STREAM_VERIFY_SANE(s);
253
254 if (!GETP_VALID(s, s->getp + size)) {
255 STREAM_BOUND_WARN(s, "seek getp");
256 return;
257 }
258
259 s->getp += size;
260 }
261
262 void stream_forward_endp(struct stream *s, size_t size)
263 {
264 STREAM_VERIFY_SANE(s);
265
266 if (!ENDP_VALID(s, s->endp + size)) {
267 STREAM_BOUND_WARN(s, "seek endp");
268 return;
269 }
270
271 s->endp += size;
272 }
273
274 /* Copy from stream to destination. */
275 bool stream_get2(void *dst, struct stream *s, size_t size)
276 {
277 STREAM_VERIFY_SANE(s);
278
279 if (STREAM_READABLE(s) < size) {
280 STREAM_BOUND_WARN2(s, "get");
281 return false;
282 }
283
284 memcpy(dst, s->data + s->getp, size);
285 s->getp += size;
286
287 return true;
288 }
289
290 void stream_get(void *dst, struct stream *s, size_t size)
291 {
292 STREAM_VERIFY_SANE(s);
293
294 if (STREAM_READABLE(s) < size) {
295 STREAM_BOUND_WARN(s, "get");
296 return;
297 }
298
299 memcpy(dst, s->data + s->getp, size);
300 s->getp += size;
301 }
302
303 /* Get next character from the stream. */
304 bool stream_getc2(struct stream *s, uint8_t *byte)
305 {
306 STREAM_VERIFY_SANE(s);
307
308 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
309 STREAM_BOUND_WARN2(s, "get char");
310 return false;
311 }
312 *byte = s->data[s->getp++];
313
314 return true;
315 }
316
317 uint8_t stream_getc(struct stream *s)
318 {
319 uint8_t c;
320
321 STREAM_VERIFY_SANE(s);
322
323 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
324 STREAM_BOUND_WARN(s, "get char");
325 return 0;
326 }
327 c = s->data[s->getp++];
328
329 return c;
330 }
331
332 /* Get next character from the stream. */
333 uint8_t stream_getc_from(struct stream *s, size_t from)
334 {
335 uint8_t c;
336
337 STREAM_VERIFY_SANE(s);
338
339 if (!GETP_VALID(s, from + sizeof(uint8_t))) {
340 STREAM_BOUND_WARN(s, "get char");
341 return 0;
342 }
343
344 c = s->data[from];
345
346 return c;
347 }
348
349 bool stream_getw2(struct stream *s, uint16_t *word)
350 {
351 STREAM_VERIFY_SANE(s);
352
353 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
354 STREAM_BOUND_WARN2(s, "get ");
355 return false;
356 }
357
358 *word = s->data[s->getp++] << 8;
359 *word |= s->data[s->getp++];
360
361 return true;
362 }
363
364 /* Get next word from the stream. */
365 uint16_t stream_getw(struct stream *s)
366 {
367 uint16_t w;
368
369 STREAM_VERIFY_SANE(s);
370
371 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
372 STREAM_BOUND_WARN(s, "get ");
373 return 0;
374 }
375
376 w = s->data[s->getp++] << 8;
377 w |= s->data[s->getp++];
378
379 return w;
380 }
381
382 /* Get next word from the stream. */
383 uint16_t stream_getw_from(struct stream *s, size_t from)
384 {
385 uint16_t w;
386
387 STREAM_VERIFY_SANE(s);
388
389 if (!GETP_VALID(s, from + sizeof(uint16_t))) {
390 STREAM_BOUND_WARN(s, "get ");
391 return 0;
392 }
393
394 w = s->data[from++] << 8;
395 w |= s->data[from];
396
397 return w;
398 }
399
400 /* Get next 3-byte from the stream. */
401 uint32_t stream_get3_from(struct stream *s, size_t from)
402 {
403 uint32_t l;
404
405 STREAM_VERIFY_SANE(s);
406
407 if (!GETP_VALID(s, from + 3)) {
408 STREAM_BOUND_WARN(s, "get 3byte");
409 return 0;
410 }
411
412 l = s->data[from++] << 16;
413 l |= s->data[from++] << 8;
414 l |= s->data[from];
415
416 return l;
417 }
418
419 uint32_t stream_get3(struct stream *s)
420 {
421 uint32_t l;
422
423 STREAM_VERIFY_SANE(s);
424
425 if (STREAM_READABLE(s) < 3) {
426 STREAM_BOUND_WARN(s, "get 3byte");
427 return 0;
428 }
429
430 l = s->data[s->getp++] << 16;
431 l |= s->data[s->getp++] << 8;
432 l |= s->data[s->getp++];
433
434 return l;
435 }
436
437 /* Get next long word from the stream. */
438 uint32_t stream_getl_from(struct stream *s, size_t from)
439 {
440 uint32_t l;
441
442 STREAM_VERIFY_SANE(s);
443
444 if (!GETP_VALID(s, from + sizeof(uint32_t))) {
445 STREAM_BOUND_WARN(s, "get long");
446 return 0;
447 }
448
449 l = (unsigned)(s->data[from++]) << 24;
450 l |= s->data[from++] << 16;
451 l |= s->data[from++] << 8;
452 l |= s->data[from];
453
454 return l;
455 }
456
457 /* Copy from stream at specific location to destination. */
458 void stream_get_from(void *dst, struct stream *s, size_t from, size_t size)
459 {
460 STREAM_VERIFY_SANE(s);
461
462 if (!GETP_VALID(s, from + size)) {
463 STREAM_BOUND_WARN(s, "get from");
464 return;
465 }
466
467 memcpy(dst, s->data + from, size);
468 }
469
470 bool stream_getl2(struct stream *s, uint32_t *l)
471 {
472 STREAM_VERIFY_SANE(s);
473
474 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
475 STREAM_BOUND_WARN2(s, "get long");
476 return false;
477 }
478
479 *l = (unsigned int)(s->data[s->getp++]) << 24;
480 *l |= s->data[s->getp++] << 16;
481 *l |= s->data[s->getp++] << 8;
482 *l |= s->data[s->getp++];
483
484 return true;
485 }
486
487 uint32_t stream_getl(struct stream *s)
488 {
489 uint32_t l;
490
491 STREAM_VERIFY_SANE(s);
492
493 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
494 STREAM_BOUND_WARN(s, "get long");
495 return 0;
496 }
497
498 l = (unsigned)(s->data[s->getp++]) << 24;
499 l |= s->data[s->getp++] << 16;
500 l |= s->data[s->getp++] << 8;
501 l |= s->data[s->getp++];
502
503 return l;
504 }
505
506 /* Get next quad word from the stream. */
507 uint64_t stream_getq_from(struct stream *s, size_t from)
508 {
509 uint64_t q;
510
511 STREAM_VERIFY_SANE(s);
512
513 if (!GETP_VALID(s, from + sizeof(uint64_t))) {
514 STREAM_BOUND_WARN(s, "get quad");
515 return 0;
516 }
517
518 q = ((uint64_t)s->data[from++]) << 56;
519 q |= ((uint64_t)s->data[from++]) << 48;
520 q |= ((uint64_t)s->data[from++]) << 40;
521 q |= ((uint64_t)s->data[from++]) << 32;
522 q |= ((uint64_t)s->data[from++]) << 24;
523 q |= ((uint64_t)s->data[from++]) << 16;
524 q |= ((uint64_t)s->data[from++]) << 8;
525 q |= ((uint64_t)s->data[from++]);
526
527 return q;
528 }
529
530 uint64_t stream_getq(struct stream *s)
531 {
532 uint64_t q;
533
534 STREAM_VERIFY_SANE(s);
535
536 if (STREAM_READABLE(s) < sizeof(uint64_t)) {
537 STREAM_BOUND_WARN(s, "get quad");
538 return 0;
539 }
540
541 q = ((uint64_t)s->data[s->getp++]) << 56;
542 q |= ((uint64_t)s->data[s->getp++]) << 48;
543 q |= ((uint64_t)s->data[s->getp++]) << 40;
544 q |= ((uint64_t)s->data[s->getp++]) << 32;
545 q |= ((uint64_t)s->data[s->getp++]) << 24;
546 q |= ((uint64_t)s->data[s->getp++]) << 16;
547 q |= ((uint64_t)s->data[s->getp++]) << 8;
548 q |= ((uint64_t)s->data[s->getp++]);
549
550 return q;
551 }
552
553 /* Get next long word from the stream. */
554 uint32_t stream_get_ipv4(struct stream *s)
555 {
556 uint32_t l;
557
558 STREAM_VERIFY_SANE(s);
559
560 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
561 STREAM_BOUND_WARN(s, "get ipv4");
562 return 0;
563 }
564
565 memcpy(&l, s->data + s->getp, sizeof(uint32_t));
566 s->getp += sizeof(uint32_t);
567
568 return l;
569 }
570
/* Read a 32-bit value with stream_getl() and return its bit pattern
 * reinterpreted as a float (via a union).
 */
float stream_getf(struct stream *s)
{
	union {
		uint32_t bits;
		float value;
	} conv;

	conv.bits = stream_getl(s);

	return conv.value;
}
580
/* Read a 64-bit value with stream_getq() and return its bit pattern
 * reinterpreted as a double (via a union).
 */
double stream_getd(struct stream *s)
{
	union {
		uint64_t bits;
		double value;
	} conv;

	conv.bits = stream_getq(s);

	return conv.value;
}
590
591 /* Copy to source to stream.
592 *
593 * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
594 * around. This should be fixed once the stream updates are working.
595 *
596 * stream_write() is saner
597 */
598 void stream_put(struct stream *s, const void *src, size_t size)
599 {
600
601 /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
602 CHECK_SIZE(s, size);
603
604 STREAM_VERIFY_SANE(s);
605
606 if (STREAM_WRITEABLE(s) < size) {
607 STREAM_BOUND_WARN(s, "put");
608 return;
609 }
610
611 if (src)
612 memcpy(s->data + s->endp, src, size);
613 else
614 memset(s->data + s->endp, 0, size);
615
616 s->endp += size;
617 }
618
619 /* Put character to the stream. */
620 int stream_putc(struct stream *s, uint8_t c)
621 {
622 STREAM_VERIFY_SANE(s);
623
624 if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) {
625 STREAM_BOUND_WARN(s, "put");
626 return 0;
627 }
628
629 s->data[s->endp++] = c;
630 return sizeof(uint8_t);
631 }
632
633 /* Put word to the stream. */
634 int stream_putw(struct stream *s, uint16_t w)
635 {
636 STREAM_VERIFY_SANE(s);
637
638 if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) {
639 STREAM_BOUND_WARN(s, "put");
640 return 0;
641 }
642
643 s->data[s->endp++] = (uint8_t)(w >> 8);
644 s->data[s->endp++] = (uint8_t)w;
645
646 return 2;
647 }
648
649 /* Put long word to the stream. */
650 int stream_put3(struct stream *s, uint32_t l)
651 {
652 STREAM_VERIFY_SANE(s);
653
654 if (STREAM_WRITEABLE(s) < 3) {
655 STREAM_BOUND_WARN(s, "put");
656 return 0;
657 }
658
659 s->data[s->endp++] = (uint8_t)(l >> 16);
660 s->data[s->endp++] = (uint8_t)(l >> 8);
661 s->data[s->endp++] = (uint8_t)l;
662
663 return 3;
664 }
665
666 /* Put long word to the stream. */
667 int stream_putl(struct stream *s, uint32_t l)
668 {
669 STREAM_VERIFY_SANE(s);
670
671 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
672 STREAM_BOUND_WARN(s, "put");
673 return 0;
674 }
675
676 s->data[s->endp++] = (uint8_t)(l >> 24);
677 s->data[s->endp++] = (uint8_t)(l >> 16);
678 s->data[s->endp++] = (uint8_t)(l >> 8);
679 s->data[s->endp++] = (uint8_t)l;
680
681 return 4;
682 }
683
684 /* Put quad word to the stream. */
685 int stream_putq(struct stream *s, uint64_t q)
686 {
687 STREAM_VERIFY_SANE(s);
688
689 if (STREAM_WRITEABLE(s) < sizeof(uint64_t)) {
690 STREAM_BOUND_WARN(s, "put quad");
691 return 0;
692 }
693
694 s->data[s->endp++] = (uint8_t)(q >> 56);
695 s->data[s->endp++] = (uint8_t)(q >> 48);
696 s->data[s->endp++] = (uint8_t)(q >> 40);
697 s->data[s->endp++] = (uint8_t)(q >> 32);
698 s->data[s->endp++] = (uint8_t)(q >> 24);
699 s->data[s->endp++] = (uint8_t)(q >> 16);
700 s->data[s->endp++] = (uint8_t)(q >> 8);
701 s->data[s->endp++] = (uint8_t)q;
702
703 return 8;
704 }
705
/* Append a float to the stream: its bit pattern is reinterpreted as a
 * uint32_t (via a union) and written with stream_putl(), whose return value
 * is passed through.
 */
int stream_putf(struct stream *s, float f)
{
	union {
		float value;
		uint32_t bits;
	} conv;

	conv.value = f;

	return stream_putl(s, conv.bits);
}
715
/* Append a double to the stream: its bit pattern is reinterpreted as a
 * uint64_t (via a union) and written with stream_putq(), whose return value
 * is passed through.
 */
int stream_putd(struct stream *s, double d)
{
	union {
		double value;
		uint64_t bits;
	} conv;

	conv.value = d;

	return stream_putq(s, conv.bits);
}
725
726 int stream_putc_at(struct stream *s, size_t putp, uint8_t c)
727 {
728 STREAM_VERIFY_SANE(s);
729
730 if (!PUT_AT_VALID(s, putp + sizeof(uint8_t))) {
731 STREAM_BOUND_WARN(s, "put");
732 return 0;
733 }
734
735 s->data[putp] = c;
736
737 return 1;
738 }
739
740 int stream_putw_at(struct stream *s, size_t putp, uint16_t w)
741 {
742 STREAM_VERIFY_SANE(s);
743
744 if (!PUT_AT_VALID(s, putp + sizeof(uint16_t))) {
745 STREAM_BOUND_WARN(s, "put");
746 return 0;
747 }
748
749 s->data[putp] = (uint8_t)(w >> 8);
750 s->data[putp + 1] = (uint8_t)w;
751
752 return 2;
753 }
754
755 int stream_put3_at(struct stream *s, size_t putp, uint32_t l)
756 {
757 STREAM_VERIFY_SANE(s);
758
759 if (!PUT_AT_VALID(s, putp + 3)) {
760 STREAM_BOUND_WARN(s, "put");
761 return 0;
762 }
763 s->data[putp] = (uint8_t)(l >> 16);
764 s->data[putp + 1] = (uint8_t)(l >> 8);
765 s->data[putp + 2] = (uint8_t)l;
766
767 return 3;
768 }
769
770 int stream_putl_at(struct stream *s, size_t putp, uint32_t l)
771 {
772 STREAM_VERIFY_SANE(s);
773
774 if (!PUT_AT_VALID(s, putp + sizeof(uint32_t))) {
775 STREAM_BOUND_WARN(s, "put");
776 return 0;
777 }
778 s->data[putp] = (uint8_t)(l >> 24);
779 s->data[putp + 1] = (uint8_t)(l >> 16);
780 s->data[putp + 2] = (uint8_t)(l >> 8);
781 s->data[putp + 3] = (uint8_t)l;
782
783 return 4;
784 }
785
786 int stream_putq_at(struct stream *s, size_t putp, uint64_t q)
787 {
788 STREAM_VERIFY_SANE(s);
789
790 if (!PUT_AT_VALID(s, putp + sizeof(uint64_t))) {
791 STREAM_BOUND_WARN(s, "put");
792 return 0;
793 }
794 s->data[putp] = (uint8_t)(q >> 56);
795 s->data[putp + 1] = (uint8_t)(q >> 48);
796 s->data[putp + 2] = (uint8_t)(q >> 40);
797 s->data[putp + 3] = (uint8_t)(q >> 32);
798 s->data[putp + 4] = (uint8_t)(q >> 24);
799 s->data[putp + 5] = (uint8_t)(q >> 16);
800 s->data[putp + 6] = (uint8_t)(q >> 8);
801 s->data[putp + 7] = (uint8_t)q;
802
803 return 8;
804 }
805
806 /* Put long word to the stream. */
807 int stream_put_ipv4(struct stream *s, uint32_t l)
808 {
809 STREAM_VERIFY_SANE(s);
810
811 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
812 STREAM_BOUND_WARN(s, "put");
813 return 0;
814 }
815 memcpy(s->data + s->endp, &l, sizeof(uint32_t));
816 s->endp += sizeof(uint32_t);
817
818 return sizeof(uint32_t);
819 }
820
821 /* Put long word to the stream. */
822 int stream_put_in_addr(struct stream *s, struct in_addr *addr)
823 {
824 STREAM_VERIFY_SANE(s);
825
826 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
827 STREAM_BOUND_WARN(s, "put");
828 return 0;
829 }
830
831 memcpy(s->data + s->endp, addr, sizeof(uint32_t));
832 s->endp += sizeof(uint32_t);
833
834 return sizeof(uint32_t);
835 }
836
837 /* Put in_addr at location in the stream. */
838 int stream_put_in_addr_at(struct stream *s, size_t putp, struct in_addr *addr)
839 {
840 STREAM_VERIFY_SANE(s);
841
842 if (!PUT_AT_VALID(s, putp + 4)) {
843 STREAM_BOUND_WARN(s, "put");
844 return 0;
845 }
846
847 memcpy(&s->data[putp], addr, 4);
848 return 4;
849 }
850
851 /* Put in6_addr at location in the stream. */
852 int stream_put_in6_addr_at(struct stream *s, size_t putp, struct in6_addr *addr)
853 {
854 STREAM_VERIFY_SANE(s);
855
856 if (!PUT_AT_VALID(s, putp + 16)) {
857 STREAM_BOUND_WARN(s, "put");
858 return 0;
859 }
860
861 memcpy(&s->data[putp], addr, 16);
862 return 16;
863 }
864
865 /* Put prefix by nlri type format. */
866 int stream_put_prefix_addpath(struct stream *s, struct prefix *p,
867 int addpath_encode, uint32_t addpath_tx_id)
868 {
869 size_t psize;
870 size_t psize_with_addpath;
871
872 STREAM_VERIFY_SANE(s);
873
874 psize = PSIZE(p->prefixlen);
875
876 if (addpath_encode)
877 psize_with_addpath = psize + 4;
878 else
879 psize_with_addpath = psize;
880
881 if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) {
882 STREAM_BOUND_WARN(s, "put");
883 return 0;
884 }
885
886 if (addpath_encode) {
887 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
888 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
889 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
890 s->data[s->endp++] = (uint8_t)addpath_tx_id;
891 }
892
893 s->data[s->endp++] = p->prefixlen;
894 memcpy(s->data + s->endp, &p->u.prefix, psize);
895 s->endp += psize;
896
897 return psize;
898 }
899
/* Append a prefix without an addpath id; shorthand for
 * stream_put_prefix_addpath() with addpath encoding switched off.
 */
int stream_put_prefix(struct stream *s, struct prefix *p)
{
	const int no_addpath = 0;

	return stream_put_prefix_addpath(s, p, no_addpath, 0);
}
904
905 /* Put NLRI with label */
906 int stream_put_labeled_prefix(struct stream *s, struct prefix *p,
907 mpls_label_t *label, int addpath_encode,
908 uint32_t addpath_tx_id)
909 {
910 size_t psize;
911 size_t psize_with_addpath;
912 uint8_t *label_pnt = (uint8_t *)label;
913
914 STREAM_VERIFY_SANE(s);
915
916 psize = PSIZE(p->prefixlen);
917 psize_with_addpath = psize + (addpath_encode ? 4 : 0);
918
919 if (STREAM_WRITEABLE(s) < (psize_with_addpath + 3)) {
920 STREAM_BOUND_WARN(s, "put");
921 return 0;
922 }
923
924 if (addpath_encode) {
925 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
926 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
927 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
928 s->data[s->endp++] = (uint8_t)addpath_tx_id;
929 }
930
931 stream_putc(s, (p->prefixlen + 24));
932 stream_putc(s, label_pnt[0]);
933 stream_putc(s, label_pnt[1]);
934 stream_putc(s, label_pnt[2]);
935 memcpy(s->data + s->endp, &p->u.prefix, psize);
936 s->endp += psize;
937
938 return (psize + 3);
939 }
940
941 /* Read size from fd. */
942 int stream_read(struct stream *s, int fd, size_t size)
943 {
944 int nbytes;
945
946 STREAM_VERIFY_SANE(s);
947
948 if (STREAM_WRITEABLE(s) < size) {
949 STREAM_BOUND_WARN(s, "put");
950 return 0;
951 }
952
953 nbytes = readn(fd, s->data + s->endp, size);
954
955 if (nbytes > 0)
956 s->endp += nbytes;
957
958 return nbytes;
959 }
960
961 ssize_t stream_read_try(struct stream *s, int fd, size_t size)
962 {
963 ssize_t nbytes;
964
965 STREAM_VERIFY_SANE(s);
966
967 if (STREAM_WRITEABLE(s) < size) {
968 STREAM_BOUND_WARN(s, "put");
969 /* Fatal (not transient) error, since retrying will not help
970 (stream is too small to contain the desired data). */
971 return -1;
972 }
973
974 if ((nbytes = read(fd, s->data + s->endp, size)) >= 0) {
975 s->endp += nbytes;
976 return nbytes;
977 }
978 /* Error: was it transient (return -2) or fatal (return -1)? */
979 if (ERRNO_IO_RETRY(errno))
980 return -2;
981 flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
982 safe_strerror(errno));
983 return -1;
984 }
985
986 /* Read up to size bytes into the stream from the fd, using recvmsgfrom
987 * whose arguments match the remaining arguments to this function
988 */
989 ssize_t stream_recvfrom(struct stream *s, int fd, size_t size, int flags,
990 struct sockaddr *from, socklen_t *fromlen)
991 {
992 ssize_t nbytes;
993
994 STREAM_VERIFY_SANE(s);
995
996 if (STREAM_WRITEABLE(s) < size) {
997 STREAM_BOUND_WARN(s, "put");
998 /* Fatal (not transient) error, since retrying will not help
999 (stream is too small to contain the desired data). */
1000 return -1;
1001 }
1002
1003 if ((nbytes = recvfrom(fd, s->data + s->endp, size, flags, from,
1004 fromlen))
1005 >= 0) {
1006 s->endp += nbytes;
1007 return nbytes;
1008 }
1009 /* Error: was it transient (return -2) or fatal (return -1)? */
1010 if (ERRNO_IO_RETRY(errno))
1011 return -2;
1012 flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
1013 safe_strerror(errno));
1014 return -1;
1015 }
1016
1017 /* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
1018 * from endp.
1019 * First iovec will be used to receive the data.
1020 * Stream need not be empty.
1021 */
1022 ssize_t stream_recvmsg(struct stream *s, int fd, struct msghdr *msgh, int flags,
1023 size_t size)
1024 {
1025 int nbytes;
1026 struct iovec *iov;
1027
1028 STREAM_VERIFY_SANE(s);
1029 assert(msgh->msg_iovlen > 0);
1030
1031 if (STREAM_WRITEABLE(s) < size) {
1032 STREAM_BOUND_WARN(s, "put");
1033 /* This is a logic error in the calling code: the stream is too
1034 small
1035 to hold the desired data! */
1036 return -1;
1037 }
1038
1039 iov = &(msgh->msg_iov[0]);
1040 iov->iov_base = (s->data + s->endp);
1041 iov->iov_len = size;
1042
1043 nbytes = recvmsg(fd, msgh, flags);
1044
1045 if (nbytes > 0)
1046 s->endp += nbytes;
1047
1048 return nbytes;
1049 }
1050
1051 /* Write data to buffer. */
1052 size_t stream_write(struct stream *s, const void *ptr, size_t size)
1053 {
1054
1055 CHECK_SIZE(s, size);
1056
1057 STREAM_VERIFY_SANE(s);
1058
1059 if (STREAM_WRITEABLE(s) < size) {
1060 STREAM_BOUND_WARN(s, "put");
1061 return 0;
1062 }
1063
1064 memcpy(s->data + s->endp, ptr, size);
1065 s->endp += size;
1066
1067 return size;
1068 }
1069
1070 /* Return current read pointer.
1071 * DEPRECATED!
1072 * Use stream_get_pnt_to if you must, but decoding streams properly
1073 * is preferred
1074 */
1075 uint8_t *stream_pnt(struct stream *s)
1076 {
1077 STREAM_VERIFY_SANE(s);
1078 return s->data + s->getp;
1079 }
1080
1081 /* Check does this stream empty? */
1082 int stream_empty(struct stream *s)
1083 {
1084 STREAM_VERIFY_SANE(s);
1085
1086 return (s->endp == 0);
1087 }
1088
1089 /* Reset stream. */
1090 void stream_reset(struct stream *s)
1091 {
1092 STREAM_VERIFY_SANE(s);
1093
1094 s->getp = s->endp = 0;
1095 }
1096
1097 /* Write stream contens to the file discriptor. */
1098 int stream_flush(struct stream *s, int fd)
1099 {
1100 int nbytes;
1101
1102 STREAM_VERIFY_SANE(s);
1103
1104 nbytes = write(fd, s->data + s->getp, s->endp - s->getp);
1105
1106 return nbytes;
1107 }
1108
1109 /* Stream first in first out queue. */
1110
1111 struct stream_fifo *stream_fifo_new(void)
1112 {
1113 struct stream_fifo *new;
1114
1115 new = XCALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo));
1116 pthread_mutex_init(&new->mtx, NULL);
1117 return new;
1118 }
1119
1120 /* Add new stream to fifo. */
1121 void stream_fifo_push(struct stream_fifo *fifo, struct stream *s)
1122 {
1123 #if defined DEV_BUILD
1124 size_t max, curmax;
1125 #endif
1126
1127 if (fifo->tail)
1128 fifo->tail->next = s;
1129 else
1130 fifo->head = s;
1131
1132 fifo->tail = s;
1133 fifo->tail->next = NULL;
1134 #if !defined DEV_BUILD
1135 atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1136 #else
1137 max = atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1138 curmax = atomic_load_explicit(&fifo->max_count, memory_order_relaxed);
1139 if (max > curmax)
1140 atomic_store_explicit(&fifo->max_count, max,
1141 memory_order_relaxed);
1142 #endif
1143 }
1144
1145 void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s)
1146 {
1147 pthread_mutex_lock(&fifo->mtx);
1148 {
1149 stream_fifo_push(fifo, s);
1150 }
1151 pthread_mutex_unlock(&fifo->mtx);
1152 }
1153
1154 /* Delete first stream from fifo. */
1155 struct stream *stream_fifo_pop(struct stream_fifo *fifo)
1156 {
1157 struct stream *s;
1158
1159 s = fifo->head;
1160
1161 if (s) {
1162 fifo->head = s->next;
1163
1164 if (fifo->head == NULL)
1165 fifo->tail = NULL;
1166
1167 atomic_fetch_sub_explicit(&fifo->count, 1,
1168 memory_order_release);
1169
1170 /* ensure stream is scrubbed of references to this fifo */
1171 s->next = NULL;
1172 }
1173
1174 return s;
1175 }
1176
1177 struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo)
1178 {
1179 struct stream *ret;
1180
1181 pthread_mutex_lock(&fifo->mtx);
1182 {
1183 ret = stream_fifo_pop(fifo);
1184 }
1185 pthread_mutex_unlock(&fifo->mtx);
1186
1187 return ret;
1188 }
1189
1190 struct stream *stream_fifo_head(struct stream_fifo *fifo)
1191 {
1192 return fifo->head;
1193 }
1194
1195 struct stream *stream_fifo_head_safe(struct stream_fifo *fifo)
1196 {
1197 struct stream *ret;
1198
1199 pthread_mutex_lock(&fifo->mtx);
1200 {
1201 ret = stream_fifo_head(fifo);
1202 }
1203 pthread_mutex_unlock(&fifo->mtx);
1204
1205 return ret;
1206 }
1207
1208 void stream_fifo_clean(struct stream_fifo *fifo)
1209 {
1210 struct stream *s;
1211 struct stream *next;
1212
1213 for (s = fifo->head; s; s = next) {
1214 next = s->next;
1215 stream_free(s);
1216 }
1217 fifo->head = fifo->tail = NULL;
1218 atomic_store_explicit(&fifo->count, 0, memory_order_release);
1219 }
1220
1221 void stream_fifo_clean_safe(struct stream_fifo *fifo)
1222 {
1223 pthread_mutex_lock(&fifo->mtx);
1224 {
1225 stream_fifo_clean(fifo);
1226 }
1227 pthread_mutex_unlock(&fifo->mtx);
1228 }
1229
1230 size_t stream_fifo_count_safe(struct stream_fifo *fifo)
1231 {
1232 return atomic_load_explicit(&fifo->count, memory_order_acquire);
1233 }
1234
1235 void stream_fifo_free(struct stream_fifo *fifo)
1236 {
1237 stream_fifo_clean(fifo);
1238 pthread_mutex_destroy(&fifo->mtx);
1239 XFREE(MTYPE_STREAM_FIFO, fifo);
1240 }