]> git.proxmox.com Git - mirror_frr.git/blame - lib/stream.c
zebra: Re-add tracking of redistribution events
[mirror_frr.git] / lib / stream.c
CommitLineData
896014f4 1/*
718e3744 2 * Packet interface
3 * Copyright (C) 1999 Kunihiro Ishiguro
4 *
5 * This file is part of GNU Zebra.
6 *
7 * GNU Zebra is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2, or (at your option) any
10 * later version.
11 *
12 * GNU Zebra is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
896014f4
DL
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
718e3744 20 */
21
22#include <zebra.h>
7a2fbbf0 23#include <stddef.h>
363e24c6 24#include <pthread.h>
718e3744 25
26#include "stream.h"
27#include "memory.h"
28#include "network.h"
29#include "prefix.h"
050c013a 30#include "log.h"
718e3744 31
d62a17ae 32DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream")
4a1ab8e4
DL
33DEFINE_MTYPE_STATIC(LIB, STREAM_DATA, "Stream data")
34DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO")
35
d62a17ae 36/* Tests whether a position is valid */
37#define GETP_VALID(S, G) ((G) <= (S)->endp)
050c013a 38#define PUT_AT_VALID(S,G) GETP_VALID(S,G)
d62a17ae 39#define ENDP_VALID(S, E) ((E) <= (S)->size)
718e3744 40
050c013a 41/* asserting sanity checks. Following must be true before
42 * stream functions are called:
43 *
44 * Following must always be true of stream elements
45 * before and after calls to stream functions:
46 *
47 * getp <= endp <= size
48 *
49 * Note that after a stream function is called following may be true:
50 * if (getp == endp) then stream is no longer readable
51 * if (endp == size) then stream is no longer writeable
52 *
53 * It is valid to put to anywhere within the size of the stream, but only
54 * using stream_put..._at() functions.
55 */
d62a17ae 56#define STREAM_WARN_OFFSETS(S) \
57 zlog_warn("&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n", \
58 (void *)(S), (unsigned long)(S)->size, \
59 (unsigned long)(S)->getp, (unsigned long)(S)->endp)
60
61#define STREAM_VERIFY_SANE(S) \
62 do { \
63 if (!(GETP_VALID(S, (S)->getp) && ENDP_VALID(S, (S)->endp))) \
64 STREAM_WARN_OFFSETS(S); \
65 assert(GETP_VALID(S, (S)->getp)); \
66 assert(ENDP_VALID(S, (S)->endp)); \
67 } while (0)
68
996c9314
LB
69#define STREAM_BOUND_WARN(S, WHAT) \
70 do { \
71 zlog_warn("%s: Attempt to %s out of bounds", __func__, \
72 (WHAT)); \
73 STREAM_WARN_OFFSETS(S); \
74 assert(0); \
051cc28c
DS
75 } while (0)
76
996c9314
LB
77#define STREAM_BOUND_WARN2(S, WHAT) \
78 do { \
79 zlog_warn("%s: Attempt to %s out of bounds", __func__, \
80 (WHAT)); \
81 STREAM_WARN_OFFSETS(S); \
d62a17ae 82 } while (0)
050c013a 83
84/* XXX: Deprecated macro: do not use */
d62a17ae 85#define CHECK_SIZE(S, Z) \
86 do { \
87 if (((S)->endp + (Z)) > (S)->size) { \
88 zlog_warn( \
89 "CHECK_SIZE: truncating requested size %lu\n", \
90 (unsigned long)(Z)); \
91 STREAM_WARN_OFFSETS(S); \
92 (Z) = (S)->size - (S)->endp; \
93 } \
94 } while (0);
718e3744 95
96/* Make stream buffer. */
d62a17ae 97struct stream *stream_new(size_t size)
718e3744 98{
d62a17ae 99 struct stream *s;
100
101 assert(size > 0);
102
103 s = XCALLOC(MTYPE_STREAM, sizeof(struct stream));
104
105 if (s == NULL)
106 return s;
107
108 if ((s->data = XMALLOC(MTYPE_STREAM_DATA, size)) == NULL) {
109 XFREE(MTYPE_STREAM, s);
110 return NULL;
111 }
112
113 s->size = size;
114 return s;
718e3744 115}
116
117/* Free it now. */
d62a17ae 118void stream_free(struct stream *s)
718e3744 119{
d62a17ae 120 if (!s)
121 return;
122
123 XFREE(MTYPE_STREAM_DATA, s->data);
124 XFREE(MTYPE_STREAM, s);
718e3744 125}
050c013a 126
d62a17ae 127struct stream *stream_copy(struct stream *new, struct stream *src)
050c013a 128{
d62a17ae 129 STREAM_VERIFY_SANE(src);
130
131 assert(new != NULL);
132 assert(STREAM_SIZE(new) >= src->endp);
133
134 new->endp = src->endp;
135 new->getp = src->getp;
136
137 memcpy(new->data, src->data, src->endp);
138
139 return new;
050c013a 140}
141
d62a17ae 142struct stream *stream_dup(struct stream *s)
050c013a 143{
d62a17ae 144 struct stream *new;
050c013a 145
d62a17ae 146 STREAM_VERIFY_SANE(s);
050c013a 147
d62a17ae 148 if ((new = stream_new(s->endp)) == NULL)
149 return NULL;
050c013a 150
d62a17ae 151 return (stream_copy(new, s));
050c013a 152}
4b201d46 153
d62a17ae 154struct stream *stream_dupcat(struct stream *s1, struct stream *s2,
155 size_t offset)
8c71e481 156{
d62a17ae 157 struct stream *new;
8c71e481 158
d62a17ae 159 STREAM_VERIFY_SANE(s1);
160 STREAM_VERIFY_SANE(s2);
8c71e481 161
d62a17ae 162 if ((new = stream_new(s1->endp + s2->endp)) == NULL)
163 return NULL;
8c71e481 164
d62a17ae 165 memcpy(new->data, s1->data, offset);
166 memcpy(new->data + offset, s2->data, s2->endp);
167 memcpy(new->data + offset + s2->endp, s1->data + offset,
168 (s1->endp - offset));
169 new->endp = s1->endp + s2->endp;
170 return new;
8c71e481
PM
171}
172
d62a17ae 173size_t stream_resize(struct stream *s, size_t newsize)
4b201d46 174{
d7c0a89a 175 uint8_t *newdata;
d62a17ae 176 STREAM_VERIFY_SANE(s);
177
178 newdata = XREALLOC(MTYPE_STREAM_DATA, s->data, newsize);
179
180 if (newdata == NULL)
181 return s->size;
182
183 s->data = newdata;
184 s->size = newsize;
185
186 if (s->endp > s->size)
187 s->endp = s->size;
188 if (s->getp > s->endp)
189 s->getp = s->endp;
190
191 STREAM_VERIFY_SANE(s);
192
193 return s->size;
4b201d46 194}
6b0655a2 195
d62a17ae 196size_t stream_get_getp(struct stream *s)
718e3744 197{
d62a17ae 198 STREAM_VERIFY_SANE(s);
199 return s->getp;
718e3744 200}
201
d62a17ae 202size_t stream_get_endp(struct stream *s)
718e3744 203{
d62a17ae 204 STREAM_VERIFY_SANE(s);
205 return s->endp;
718e3744 206}
207
d62a17ae 208size_t stream_get_size(struct stream *s)
718e3744 209{
d62a17ae 210 STREAM_VERIFY_SANE(s);
211 return s->size;
718e3744 212}
213
214/* Stream structre' stream pointer related functions. */
d62a17ae 215void stream_set_getp(struct stream *s, size_t pos)
718e3744 216{
d62a17ae 217 STREAM_VERIFY_SANE(s);
218
219 if (!GETP_VALID(s, pos)) {
220 STREAM_BOUND_WARN(s, "set getp");
221 pos = s->endp;
222 }
223
224 s->getp = pos;
718e3744 225}
226
d62a17ae 227void stream_set_endp(struct stream *s, size_t pos)
d531050b 228{
d62a17ae 229 STREAM_VERIFY_SANE(s);
230
231 if (!ENDP_VALID(s, pos)) {
232 STREAM_BOUND_WARN(s, "set endp");
233 return;
234 }
235
236 /*
237 * Make sure the current read pointer is not beyond the new endp.
238 */
239 if (s->getp > pos) {
240 STREAM_BOUND_WARN(s, "set endp");
241 return;
242 }
243
244 s->endp = pos;
245 STREAM_VERIFY_SANE(s);
d531050b
SV
246}
247
9985f83c 248/* Forward pointer. */
d62a17ae 249void stream_forward_getp(struct stream *s, size_t size)
718e3744 250{
d62a17ae 251 STREAM_VERIFY_SANE(s);
252
253 if (!GETP_VALID(s, s->getp + size)) {
254 STREAM_BOUND_WARN(s, "seek getp");
255 return;
256 }
257
258 s->getp += size;
718e3744 259}
260
d62a17ae 261void stream_forward_endp(struct stream *s, size_t size)
718e3744 262{
d62a17ae 263 STREAM_VERIFY_SANE(s);
264
265 if (!ENDP_VALID(s, s->endp + size)) {
266 STREAM_BOUND_WARN(s, "seek endp");
267 return;
268 }
269
270 s->endp += size;
718e3744 271}
6b0655a2 272
718e3744 273/* Copy from stream to destination. */
051cc28c
DS
274inline bool stream_get2(void *dst, struct stream *s, size_t size)
275{
276 STREAM_VERIFY_SANE(s);
277
278 if (STREAM_READABLE(s) < size) {
279 STREAM_BOUND_WARN2(s, "get");
280 return false;
281 }
282
283 memcpy(dst, s->data + s->getp, size);
284 s->getp += size;
285
286 return true;
287}
288
d62a17ae 289void stream_get(void *dst, struct stream *s, size_t size)
718e3744 290{
d62a17ae 291 STREAM_VERIFY_SANE(s);
292
293 if (STREAM_READABLE(s) < size) {
294 STREAM_BOUND_WARN(s, "get");
295 return;
296 }
297
298 memcpy(dst, s->data + s->getp, size);
299 s->getp += size;
718e3744 300}
301
302/* Get next character from the stream. */
d7c0a89a 303inline bool stream_getc2(struct stream *s, uint8_t *byte)
051cc28c
DS
304{
305 STREAM_VERIFY_SANE(s);
306
d7c0a89a 307 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
051cc28c
DS
308 STREAM_BOUND_WARN2(s, "get char");
309 return false;
310 }
311 *byte = s->data[s->getp++];
312
313 return true;
314}
315
d7c0a89a 316uint8_t stream_getc(struct stream *s)
718e3744 317{
d7c0a89a 318 uint8_t c;
d62a17ae 319
320 STREAM_VERIFY_SANE(s);
321
d7c0a89a 322 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
d62a17ae 323 STREAM_BOUND_WARN(s, "get char");
324 return 0;
325 }
326 c = s->data[s->getp++];
327
328 return c;
718e3744 329}
330
331/* Get next character from the stream. */
d7c0a89a 332uint8_t stream_getc_from(struct stream *s, size_t from)
718e3744 333{
d7c0a89a 334 uint8_t c;
d62a17ae 335
336 STREAM_VERIFY_SANE(s);
337
d7c0a89a 338 if (!GETP_VALID(s, from + sizeof(uint8_t))) {
d62a17ae 339 STREAM_BOUND_WARN(s, "get char");
340 return 0;
341 }
342
343 c = s->data[from];
344
345 return c;
718e3744 346}
347
051cc28c
DS
348inline bool stream_getw2(struct stream *s, uint16_t *word)
349{
350 STREAM_VERIFY_SANE(s);
351
352 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
353 STREAM_BOUND_WARN2(s, "get ");
354 return false;
355 }
356
996c9314 357 *word = s->data[s->getp++] << 8;
051cc28c
DS
358 *word |= s->data[s->getp++];
359
360 return true;
361}
362
718e3744 363/* Get next word from the stream. */
d7c0a89a 364uint16_t stream_getw(struct stream *s)
718e3744 365{
d7c0a89a 366 uint16_t w;
d62a17ae 367
368 STREAM_VERIFY_SANE(s);
369
d7c0a89a 370 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
d62a17ae 371 STREAM_BOUND_WARN(s, "get ");
372 return 0;
373 }
374
375 w = s->data[s->getp++] << 8;
376 w |= s->data[s->getp++];
377
378 return w;
718e3744 379}
380
381/* Get next word from the stream. */
d7c0a89a 382uint16_t stream_getw_from(struct stream *s, size_t from)
718e3744 383{
d7c0a89a 384 uint16_t w;
d62a17ae 385
386 STREAM_VERIFY_SANE(s);
387
d7c0a89a 388 if (!GETP_VALID(s, from + sizeof(uint16_t))) {
d62a17ae 389 STREAM_BOUND_WARN(s, "get ");
390 return 0;
391 }
392
393 w = s->data[from++] << 8;
394 w |= s->data[from];
395
396 return w;
718e3744 397}
398
d6f4a61d 399/* Get next 3-byte from the stream. */
d7c0a89a 400uint32_t stream_get3_from(struct stream *s, size_t from)
d6f4a61d 401{
d7c0a89a 402 uint32_t l;
d62a17ae 403
404 STREAM_VERIFY_SANE(s);
405
406 if (!GETP_VALID(s, from + 3)) {
407 STREAM_BOUND_WARN(s, "get 3byte");
408 return 0;
409 }
410
411 l = s->data[from++] << 16;
412 l |= s->data[from++] << 8;
413 l |= s->data[from];
414
415 return l;
d6f4a61d
DL
416}
417
d7c0a89a 418uint32_t stream_get3(struct stream *s)
d6f4a61d 419{
d7c0a89a 420 uint32_t l;
d62a17ae 421
422 STREAM_VERIFY_SANE(s);
423
424 if (STREAM_READABLE(s) < 3) {
425 STREAM_BOUND_WARN(s, "get 3byte");
426 return 0;
427 }
428
429 l = s->data[s->getp++] << 16;
430 l |= s->data[s->getp++] << 8;
431 l |= s->data[s->getp++];
432
433 return l;
d6f4a61d
DL
434}
435
718e3744 436/* Get next long word from the stream. */
d7c0a89a 437uint32_t stream_getl_from(struct stream *s, size_t from)
050c013a 438{
d7c0a89a 439 uint32_t l;
d62a17ae 440
441 STREAM_VERIFY_SANE(s);
442
d7c0a89a 443 if (!GETP_VALID(s, from + sizeof(uint32_t))) {
d62a17ae 444 STREAM_BOUND_WARN(s, "get long");
445 return 0;
446 }
447
937652c6 448 l = (unsigned)(s->data[from++]) << 24;
d62a17ae 449 l |= s->data[from++] << 16;
450 l |= s->data[from++] << 8;
451 l |= s->data[from];
452
453 return l;
050c013a 454}
455
3f9c7369 456/* Copy from stream at specific location to destination. */
d62a17ae 457void stream_get_from(void *dst, struct stream *s, size_t from, size_t size)
3f9c7369 458{
d62a17ae 459 STREAM_VERIFY_SANE(s);
3f9c7369 460
d62a17ae 461 if (!GETP_VALID(s, from + size)) {
462 STREAM_BOUND_WARN(s, "get from");
463 return;
464 }
3f9c7369 465
d62a17ae 466 memcpy(dst, s->data + from, size);
3f9c7369
DS
467}
468
051cc28c
DS
469inline bool stream_getl2(struct stream *s, uint32_t *l)
470{
471 STREAM_VERIFY_SANE(s);
472
473 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
474 STREAM_BOUND_WARN2(s, "get long");
475 return false;
476 }
477
996c9314 478 *l = (unsigned int)(s->data[s->getp++]) << 24;
051cc28c
DS
479 *l |= s->data[s->getp++] << 16;
480 *l |= s->data[s->getp++] << 8;
481 *l |= s->data[s->getp++];
482
483 return true;
051cc28c
DS
484}
485
d7c0a89a 486uint32_t stream_getl(struct stream *s)
718e3744 487{
d7c0a89a 488 uint32_t l;
d62a17ae 489
490 STREAM_VERIFY_SANE(s);
491
d7c0a89a 492 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
d62a17ae 493 STREAM_BOUND_WARN(s, "get long");
494 return 0;
495 }
496
937652c6 497 l = (unsigned)(s->data[s->getp++]) << 24;
d62a17ae 498 l |= s->data[s->getp++] << 16;
499 l |= s->data[s->getp++] << 8;
500 l |= s->data[s->getp++];
501
502 return l;
718e3744 503}
4b201d46 504
505/* Get next quad word from the stream. */
d62a17ae 506uint64_t stream_getq_from(struct stream *s, size_t from)
4b201d46 507{
d62a17ae 508 uint64_t q;
509
510 STREAM_VERIFY_SANE(s);
511
512 if (!GETP_VALID(s, from + sizeof(uint64_t))) {
513 STREAM_BOUND_WARN(s, "get quad");
514 return 0;
515 }
516
517 q = ((uint64_t)s->data[from++]) << 56;
518 q |= ((uint64_t)s->data[from++]) << 48;
519 q |= ((uint64_t)s->data[from++]) << 40;
520 q |= ((uint64_t)s->data[from++]) << 32;
521 q |= ((uint64_t)s->data[from++]) << 24;
522 q |= ((uint64_t)s->data[from++]) << 16;
523 q |= ((uint64_t)s->data[from++]) << 8;
524 q |= ((uint64_t)s->data[from++]);
525
526 return q;
4b201d46 527}
528
d62a17ae 529uint64_t stream_getq(struct stream *s)
4b201d46 530{
d62a17ae 531 uint64_t q;
532
533 STREAM_VERIFY_SANE(s);
534
535 if (STREAM_READABLE(s) < sizeof(uint64_t)) {
536 STREAM_BOUND_WARN(s, "get quad");
537 return 0;
538 }
539
540 q = ((uint64_t)s->data[s->getp++]) << 56;
541 q |= ((uint64_t)s->data[s->getp++]) << 48;
542 q |= ((uint64_t)s->data[s->getp++]) << 40;
543 q |= ((uint64_t)s->data[s->getp++]) << 32;
544 q |= ((uint64_t)s->data[s->getp++]) << 24;
545 q |= ((uint64_t)s->data[s->getp++]) << 16;
546 q |= ((uint64_t)s->data[s->getp++]) << 8;
547 q |= ((uint64_t)s->data[s->getp++]);
548
549 return q;
4b201d46 550}
551
718e3744 552/* Get next long word from the stream. */
d7c0a89a 553uint32_t stream_get_ipv4(struct stream *s)
718e3744 554{
d7c0a89a 555 uint32_t l;
d62a17ae 556
557 STREAM_VERIFY_SANE(s);
558
d7c0a89a 559 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
d62a17ae 560 STREAM_BOUND_WARN(s, "get ipv4");
561 return 0;
562 }
563
d7c0a89a
QY
564 memcpy(&l, s->data + s->getp, sizeof(uint32_t));
565 s->getp += sizeof(uint32_t);
d62a17ae 566
567 return l;
718e3744 568}
6b0655a2 569
d62a17ae 570float stream_getf(struct stream *s)
16f1b9ee 571{
d62a17ae 572 union {
573 float r;
574 uint32_t d;
575 } u;
576 u.d = stream_getl(s);
577 return u.r;
16f1b9ee
OD
578}
579
d62a17ae 580double stream_getd(struct stream *s)
16f1b9ee 581{
d62a17ae 582 union {
583 double r;
584 uint64_t d;
585 } u;
586 u.d = stream_getq(s);
587 return u.r;
16f1b9ee
OD
588}
589
050c013a 590/* Copy to source to stream.
591 *
592 * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
593 * around. This should be fixed once the stream updates are working.
0dab9303 594 *
595 * stream_write() is saner
050c013a 596 */
d62a17ae 597void stream_put(struct stream *s, const void *src, size_t size)
718e3744 598{
599
d62a17ae 600 /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
601 CHECK_SIZE(s, size);
602
603 STREAM_VERIFY_SANE(s);
604
605 if (STREAM_WRITEABLE(s) < size) {
606 STREAM_BOUND_WARN(s, "put");
607 return;
608 }
609
610 if (src)
611 memcpy(s->data + s->endp, src, size);
612 else
613 memset(s->data + s->endp, 0, size);
614
615 s->endp += size;
718e3744 616}
617
618/* Put character to the stream. */
d7c0a89a 619int stream_putc(struct stream *s, uint8_t c)
718e3744 620{
d62a17ae 621 STREAM_VERIFY_SANE(s);
622
d7c0a89a 623 if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) {
d62a17ae 624 STREAM_BOUND_WARN(s, "put");
625 return 0;
626 }
627
628 s->data[s->endp++] = c;
d7c0a89a 629 return sizeof(uint8_t);
718e3744 630}
631
632/* Put word to the stream. */
d7c0a89a 633int stream_putw(struct stream *s, uint16_t w)
718e3744 634{
d62a17ae 635 STREAM_VERIFY_SANE(s);
636
d7c0a89a 637 if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) {
d62a17ae 638 STREAM_BOUND_WARN(s, "put");
639 return 0;
640 }
641
d7c0a89a
QY
642 s->data[s->endp++] = (uint8_t)(w >> 8);
643 s->data[s->endp++] = (uint8_t)w;
d62a17ae 644
645 return 2;
718e3744 646}
647
d6f4a61d 648/* Put long word to the stream. */
d7c0a89a 649int stream_put3(struct stream *s, uint32_t l)
d6f4a61d 650{
d62a17ae 651 STREAM_VERIFY_SANE(s);
652
653 if (STREAM_WRITEABLE(s) < 3) {
654 STREAM_BOUND_WARN(s, "put");
655 return 0;
656 }
657
d7c0a89a
QY
658 s->data[s->endp++] = (uint8_t)(l >> 16);
659 s->data[s->endp++] = (uint8_t)(l >> 8);
660 s->data[s->endp++] = (uint8_t)l;
d62a17ae 661
662 return 3;
d6f4a61d
DL
663}
664
718e3744 665/* Put long word to the stream. */
d7c0a89a 666int stream_putl(struct stream *s, uint32_t l)
718e3744 667{
d62a17ae 668 STREAM_VERIFY_SANE(s);
669
d7c0a89a 670 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
d62a17ae 671 STREAM_BOUND_WARN(s, "put");
672 return 0;
673 }
674
d7c0a89a
QY
675 s->data[s->endp++] = (uint8_t)(l >> 24);
676 s->data[s->endp++] = (uint8_t)(l >> 16);
677 s->data[s->endp++] = (uint8_t)(l >> 8);
678 s->data[s->endp++] = (uint8_t)l;
d62a17ae 679
680 return 4;
718e3744 681}
682
4b201d46 683/* Put quad word to the stream. */
d62a17ae 684int stream_putq(struct stream *s, uint64_t q)
4b201d46 685{
d62a17ae 686 STREAM_VERIFY_SANE(s);
687
688 if (STREAM_WRITEABLE(s) < sizeof(uint64_t)) {
689 STREAM_BOUND_WARN(s, "put quad");
690 return 0;
691 }
692
d7c0a89a
QY
693 s->data[s->endp++] = (uint8_t)(q >> 56);
694 s->data[s->endp++] = (uint8_t)(q >> 48);
695 s->data[s->endp++] = (uint8_t)(q >> 40);
696 s->data[s->endp++] = (uint8_t)(q >> 32);
697 s->data[s->endp++] = (uint8_t)(q >> 24);
698 s->data[s->endp++] = (uint8_t)(q >> 16);
699 s->data[s->endp++] = (uint8_t)(q >> 8);
700 s->data[s->endp++] = (uint8_t)q;
d62a17ae 701
702 return 8;
4b201d46 703}
704
d62a17ae 705int stream_putf(struct stream *s, float f)
16f1b9ee 706{
d62a17ae 707 union {
708 float i;
709 uint32_t o;
710 } u;
711 u.i = f;
712 return stream_putl(s, u.o);
16f1b9ee
OD
713}
714
d62a17ae 715int stream_putd(struct stream *s, double d)
16f1b9ee 716{
d62a17ae 717 union {
718 double i;
719 uint64_t o;
720 } u;
721 u.i = d;
722 return stream_putq(s, u.o);
16f1b9ee
OD
723}
724
d7c0a89a 725int stream_putc_at(struct stream *s, size_t putp, uint8_t c)
718e3744 726{
d62a17ae 727 STREAM_VERIFY_SANE(s);
728
d7c0a89a 729 if (!PUT_AT_VALID(s, putp + sizeof(uint8_t))) {
d62a17ae 730 STREAM_BOUND_WARN(s, "put");
731 return 0;
732 }
733
734 s->data[putp] = c;
735
736 return 1;
718e3744 737}
738
d7c0a89a 739int stream_putw_at(struct stream *s, size_t putp, uint16_t w)
718e3744 740{
d62a17ae 741 STREAM_VERIFY_SANE(s);
742
d7c0a89a 743 if (!PUT_AT_VALID(s, putp + sizeof(uint16_t))) {
d62a17ae 744 STREAM_BOUND_WARN(s, "put");
745 return 0;
746 }
747
d7c0a89a
QY
748 s->data[putp] = (uint8_t)(w >> 8);
749 s->data[putp + 1] = (uint8_t)w;
d62a17ae 750
751 return 2;
718e3744 752}
753
d7c0a89a 754int stream_put3_at(struct stream *s, size_t putp, uint32_t l)
d6f4a61d 755{
d62a17ae 756 STREAM_VERIFY_SANE(s);
757
758 if (!PUT_AT_VALID(s, putp + 3)) {
759 STREAM_BOUND_WARN(s, "put");
760 return 0;
761 }
d7c0a89a
QY
762 s->data[putp] = (uint8_t)(l >> 16);
763 s->data[putp + 1] = (uint8_t)(l >> 8);
764 s->data[putp + 2] = (uint8_t)l;
d62a17ae 765
766 return 3;
d6f4a61d
DL
767}
768
d7c0a89a 769int stream_putl_at(struct stream *s, size_t putp, uint32_t l)
718e3744 770{
d62a17ae 771 STREAM_VERIFY_SANE(s);
772
d7c0a89a 773 if (!PUT_AT_VALID(s, putp + sizeof(uint32_t))) {
d62a17ae 774 STREAM_BOUND_WARN(s, "put");
775 return 0;
776 }
d7c0a89a
QY
777 s->data[putp] = (uint8_t)(l >> 24);
778 s->data[putp + 1] = (uint8_t)(l >> 16);
779 s->data[putp + 2] = (uint8_t)(l >> 8);
780 s->data[putp + 3] = (uint8_t)l;
d62a17ae 781
782 return 4;
718e3744 783}
784
d62a17ae 785int stream_putq_at(struct stream *s, size_t putp, uint64_t q)
4b201d46 786{
d62a17ae 787 STREAM_VERIFY_SANE(s);
788
789 if (!PUT_AT_VALID(s, putp + sizeof(uint64_t))) {
790 STREAM_BOUND_WARN(s, "put");
791 return 0;
792 }
d7c0a89a
QY
793 s->data[putp] = (uint8_t)(q >> 56);
794 s->data[putp + 1] = (uint8_t)(q >> 48);
795 s->data[putp + 2] = (uint8_t)(q >> 40);
796 s->data[putp + 3] = (uint8_t)(q >> 32);
797 s->data[putp + 4] = (uint8_t)(q >> 24);
798 s->data[putp + 5] = (uint8_t)(q >> 16);
799 s->data[putp + 6] = (uint8_t)(q >> 8);
800 s->data[putp + 7] = (uint8_t)q;
d62a17ae 801
802 return 8;
4b201d46 803}
804
718e3744 805/* Put long word to the stream. */
d7c0a89a 806int stream_put_ipv4(struct stream *s, uint32_t l)
718e3744 807{
d62a17ae 808 STREAM_VERIFY_SANE(s);
809
d7c0a89a 810 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
d62a17ae 811 STREAM_BOUND_WARN(s, "put");
812 return 0;
813 }
d7c0a89a
QY
814 memcpy(s->data + s->endp, &l, sizeof(uint32_t));
815 s->endp += sizeof(uint32_t);
d62a17ae 816
d7c0a89a 817 return sizeof(uint32_t);
718e3744 818}
819
820/* Put long word to the stream. */
d62a17ae 821int stream_put_in_addr(struct stream *s, struct in_addr *addr)
718e3744 822{
d62a17ae 823 STREAM_VERIFY_SANE(s);
824
d7c0a89a 825 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
d62a17ae 826 STREAM_BOUND_WARN(s, "put");
827 return 0;
828 }
829
d7c0a89a
QY
830 memcpy(s->data + s->endp, addr, sizeof(uint32_t));
831 s->endp += sizeof(uint32_t);
d62a17ae 832
d7c0a89a 833 return sizeof(uint32_t);
718e3744 834}
835
3f9c7369 836/* Put in_addr at location in the stream. */
d62a17ae 837int stream_put_in_addr_at(struct stream *s, size_t putp, struct in_addr *addr)
3f9c7369 838{
d62a17ae 839 STREAM_VERIFY_SANE(s);
3f9c7369 840
d62a17ae 841 if (!PUT_AT_VALID(s, putp + 4)) {
842 STREAM_BOUND_WARN(s, "put");
843 return 0;
844 }
3f9c7369 845
d62a17ae 846 memcpy(&s->data[putp], addr, 4);
847 return 4;
3f9c7369
DS
848}
849
850/* Put in6_addr at location in the stream. */
d62a17ae 851int stream_put_in6_addr_at(struct stream *s, size_t putp, struct in6_addr *addr)
3f9c7369 852{
d62a17ae 853 STREAM_VERIFY_SANE(s);
3f9c7369 854
d62a17ae 855 if (!PUT_AT_VALID(s, putp + 16)) {
856 STREAM_BOUND_WARN(s, "put");
857 return 0;
858 }
3f9c7369 859
d62a17ae 860 memcpy(&s->data[putp], addr, 16);
861 return 16;
3f9c7369
DS
862}
863
718e3744 864/* Put prefix by nlri type format. */
d62a17ae 865int stream_put_prefix_addpath(struct stream *s, struct prefix *p,
d7c0a89a 866 int addpath_encode, uint32_t addpath_tx_id)
718e3744 867{
d62a17ae 868 size_t psize;
869 size_t psize_with_addpath;
870
871 STREAM_VERIFY_SANE(s);
872
873 psize = PSIZE(p->prefixlen);
874
875 if (addpath_encode)
876 psize_with_addpath = psize + 4;
877 else
878 psize_with_addpath = psize;
879
d7c0a89a 880 if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) {
d62a17ae 881 STREAM_BOUND_WARN(s, "put");
882 return 0;
883 }
884
885 if (addpath_encode) {
d7c0a89a
QY
886 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
887 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
888 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
889 s->data[s->endp++] = (uint8_t)addpath_tx_id;
d62a17ae 890 }
891
892 s->data[s->endp++] = p->prefixlen;
893 memcpy(s->data + s->endp, &p->u.prefix, psize);
894 s->endp += psize;
895
896 return psize;
718e3744 897}
6b0655a2 898
d62a17ae 899int stream_put_prefix(struct stream *s, struct prefix *p)
adbac85e 900{
d62a17ae 901 return stream_put_prefix_addpath(s, p, 0, 0);
adbac85e
DW
902}
903
cd1964ff 904/* Put NLRI with label */
d62a17ae 905int stream_put_labeled_prefix(struct stream *s, struct prefix *p,
906 mpls_label_t *label)
cd1964ff 907{
d62a17ae 908 size_t psize;
d7c0a89a 909 uint8_t *label_pnt = (uint8_t *)label;
cd1964ff 910
d62a17ae 911 STREAM_VERIFY_SANE(s);
cd1964ff 912
d62a17ae 913 psize = PSIZE(p->prefixlen);
cd1964ff 914
d62a17ae 915 if (STREAM_WRITEABLE(s) < (psize + 3)) {
916 STREAM_BOUND_WARN(s, "put");
917 return 0;
918 }
cd1964ff 919
d62a17ae 920 stream_putc(s, (p->prefixlen + 24));
921 stream_putc(s, label_pnt[0]);
922 stream_putc(s, label_pnt[1]);
923 stream_putc(s, label_pnt[2]);
924 memcpy(s->data + s->endp, &p->u.prefix, psize);
925 s->endp += psize;
cd1964ff 926
d62a17ae 927 return (psize + 3);
cd1964ff 928}
adbac85e 929
718e3744 930/* Read size from fd. */
d62a17ae 931int stream_read(struct stream *s, int fd, size_t size)
718e3744 932{
d62a17ae 933 int nbytes;
934
935 STREAM_VERIFY_SANE(s);
936
937 if (STREAM_WRITEABLE(s) < size) {
938 STREAM_BOUND_WARN(s, "put");
939 return 0;
940 }
941
942 nbytes = readn(fd, s->data + s->endp, size);
943
944 if (nbytes > 0)
945 s->endp += nbytes;
946
947 return nbytes;
718e3744 948}
949
d62a17ae 950ssize_t stream_read_try(struct stream *s, int fd, size_t size)
262feb1a 951{
d62a17ae 952 ssize_t nbytes;
953
954 STREAM_VERIFY_SANE(s);
955
956 if (STREAM_WRITEABLE(s) < size) {
957 STREAM_BOUND_WARN(s, "put");
958 /* Fatal (not transient) error, since retrying will not help
959 (stream is too small to contain the desired data). */
960 return -1;
961 }
962
963 if ((nbytes = read(fd, s->data + s->endp, size)) >= 0) {
964 s->endp += nbytes;
965 return nbytes;
966 }
967 /* Error: was it transient (return -2) or fatal (return -1)? */
968 if (ERRNO_IO_RETRY(errno))
969 return -2;
970 zlog_warn("%s: read failed on fd %d: %s", __func__, fd,
971 safe_strerror(errno));
972 return -1;
262feb1a 973}
974
0dab9303 975/* Read up to size bytes into the stream from the fd, using recvmsgfrom
976 * whose arguments match the remaining arguments to this function
977 */
d62a17ae 978ssize_t stream_recvfrom(struct stream *s, int fd, size_t size, int flags,
979 struct sockaddr *from, socklen_t *fromlen)
0dab9303 980{
d62a17ae 981 ssize_t nbytes;
982
983 STREAM_VERIFY_SANE(s);
984
985 if (STREAM_WRITEABLE(s) < size) {
986 STREAM_BOUND_WARN(s, "put");
987 /* Fatal (not transient) error, since retrying will not help
988 (stream is too small to contain the desired data). */
989 return -1;
990 }
991
992 if ((nbytes = recvfrom(fd, s->data + s->endp, size, flags, from,
993 fromlen))
994 >= 0) {
995 s->endp += nbytes;
996 return nbytes;
997 }
998 /* Error: was it transient (return -2) or fatal (return -1)? */
999 if (ERRNO_IO_RETRY(errno))
1000 return -2;
1001 zlog_warn("%s: read failed on fd %d: %s", __func__, fd,
1002 safe_strerror(errno));
1003 return -1;
0dab9303 1004}
1005
050c013a 1006/* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
1007 * from endp.
1008 * First iovec will be used to receive the data.
1009 * Stream need not be empty.
1010 */
d62a17ae 1011ssize_t stream_recvmsg(struct stream *s, int fd, struct msghdr *msgh, int flags,
1012 size_t size)
050c013a 1013{
d62a17ae 1014 int nbytes;
1015 struct iovec *iov;
1016
1017 STREAM_VERIFY_SANE(s);
1018 assert(msgh->msg_iovlen > 0);
1019
1020 if (STREAM_WRITEABLE(s) < size) {
1021 STREAM_BOUND_WARN(s, "put");
1022 /* This is a logic error in the calling code: the stream is too
1023 small
1024 to hold the desired data! */
1025 return -1;
1026 }
1027
1028 iov = &(msgh->msg_iov[0]);
1029 iov->iov_base = (s->data + s->endp);
1030 iov->iov_len = size;
1031
1032 nbytes = recvmsg(fd, msgh, flags);
1033
1034 if (nbytes > 0)
1035 s->endp += nbytes;
1036
1037 return nbytes;
050c013a 1038}
d62a17ae 1039
718e3744 1040/* Write data to buffer. */
d62a17ae 1041size_t stream_write(struct stream *s, const void *ptr, size_t size)
718e3744 1042{
1043
d62a17ae 1044 CHECK_SIZE(s, size);
1045
1046 STREAM_VERIFY_SANE(s);
1047
1048 if (STREAM_WRITEABLE(s) < size) {
1049 STREAM_BOUND_WARN(s, "put");
1050 return 0;
1051 }
718e3744 1052
d62a17ae 1053 memcpy(s->data + s->endp, ptr, size);
1054 s->endp += size;
9985f83c 1055
d62a17ae 1056 return size;
718e3744 1057}
1058
d62a17ae 1059/* Return current read pointer.
050c013a 1060 * DEPRECATED!
1061 * Use stream_get_pnt_to if you must, but decoding streams properly
1062 * is preferred
1063 */
d7c0a89a 1064uint8_t *stream_pnt(struct stream *s)
718e3744 1065{
d62a17ae 1066 STREAM_VERIFY_SANE(s);
1067 return s->data + s->getp;
718e3744 1068}
1069
1070/* Check does this stream empty? */
d62a17ae 1071int stream_empty(struct stream *s)
718e3744 1072{
d62a17ae 1073 STREAM_VERIFY_SANE(s);
050c013a 1074
d62a17ae 1075 return (s->endp == 0);
718e3744 1076}
1077
1078/* Reset stream. */
d62a17ae 1079void stream_reset(struct stream *s)
718e3744 1080{
d62a17ae 1081 STREAM_VERIFY_SANE(s);
050c013a 1082
d62a17ae 1083 s->getp = s->endp = 0;
718e3744 1084}
1085
1086/* Write stream contens to the file discriptor. */
d62a17ae 1087int stream_flush(struct stream *s, int fd)
718e3744 1088{
d62a17ae 1089 int nbytes;
1090
1091 STREAM_VERIFY_SANE(s);
1092
1093 nbytes = write(fd, s->data + s->getp, s->endp - s->getp);
1094
1095 return nbytes;
718e3744 1096}
6b0655a2 1097
718e3744 1098/* Stream first in first out queue. */
1099
d62a17ae 1100struct stream_fifo *stream_fifo_new(void)
718e3744 1101{
d62a17ae 1102 struct stream_fifo *new;
1103
1104 new = XCALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo));
363e24c6 1105 pthread_mutex_init(&new->mtx, NULL);
d62a17ae 1106 return new;
718e3744 1107}
1108
1109/* Add new stream to fifo. */
d62a17ae 1110void stream_fifo_push(struct stream_fifo *fifo, struct stream *s)
718e3744 1111{
d62a17ae 1112 if (fifo->tail)
1113 fifo->tail->next = s;
1114 else
1115 fifo->head = s;
1116
1117 fifo->tail = s;
08a0e54e 1118 fifo->tail->next = NULL;
d62a17ae 1119
363e24c6
QY
1120 atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1121}
1122
1123void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s)
1124{
1125 pthread_mutex_lock(&fifo->mtx);
1126 {
1127 stream_fifo_push(fifo, s);
1128 }
1129 pthread_mutex_unlock(&fifo->mtx);
718e3744 1130}
1131
1132/* Delete first stream from fifo. */
d62a17ae 1133struct stream *stream_fifo_pop(struct stream_fifo *fifo)
718e3744 1134{
d62a17ae 1135 struct stream *s;
1136
1137 s = fifo->head;
718e3744 1138
d62a17ae 1139 if (s) {
1140 fifo->head = s->next;
718e3744 1141
d62a17ae 1142 if (fifo->head == NULL)
1143 fifo->tail = NULL;
718e3744 1144
363e24c6
QY
1145 atomic_fetch_sub_explicit(&fifo->count, 1,
1146 memory_order_release);
08a0e54e
QY
1147
1148 /* ensure stream is scrubbed of references to this fifo */
1149 s->next = NULL;
d62a17ae 1150 }
718e3744 1151
d62a17ae 1152 return s;
718e3744 1153}
1154
363e24c6
QY
1155struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo)
1156{
1157 struct stream *ret;
1158
1159 pthread_mutex_lock(&fifo->mtx);
1160 {
1161 ret = stream_fifo_pop(fifo);
1162 }
1163 pthread_mutex_unlock(&fifo->mtx);
1164
1165 return ret;
1166}
1167
d62a17ae 1168struct stream *stream_fifo_head(struct stream_fifo *fifo)
718e3744 1169{
d62a17ae 1170 return fifo->head;
718e3744 1171}
1172
363e24c6
QY
1173struct stream *stream_fifo_head_safe(struct stream_fifo *fifo)
1174{
1175 struct stream *ret;
1176
1177 pthread_mutex_lock(&fifo->mtx);
1178 {
1179 ret = stream_fifo_head(fifo);
1180 }
1181 pthread_mutex_unlock(&fifo->mtx);
1182
1183 return ret;
1184}
1185
d62a17ae 1186void stream_fifo_clean(struct stream_fifo *fifo)
718e3744 1187{
d62a17ae 1188 struct stream *s;
1189 struct stream *next;
1190
1191 for (s = fifo->head; s; s = next) {
1192 next = s->next;
1193 stream_free(s);
1194 }
1195 fifo->head = fifo->tail = NULL;
363e24c6
QY
1196 atomic_store_explicit(&fifo->count, 0, memory_order_release);
1197}
1198
1199void stream_fifo_clean_safe(struct stream_fifo *fifo)
1200{
1201 pthread_mutex_lock(&fifo->mtx);
1202 {
1203 stream_fifo_clean(fifo);
1204 }
1205 pthread_mutex_unlock(&fifo->mtx);
1206}
1207
1208size_t stream_fifo_count_safe(struct stream_fifo *fifo)
1209{
1210 return atomic_load_explicit(&fifo->count, memory_order_acquire);
718e3744 1211}
1212
d62a17ae 1213void stream_fifo_free(struct stream_fifo *fifo)
718e3744 1214{
d62a17ae 1215 stream_fifo_clean(fifo);
363e24c6 1216 pthread_mutex_destroy(&fifo->mtx);
d62a17ae 1217 XFREE(MTYPE_STREAM_FIFO, fifo);
718e3744 1218}