]> git.proxmox.com Git - mirror_frr.git/blame - lib/stream.c
Merge pull request #13278 from FRRouting/mergify/bp/stable/8.5/pr-13269
[mirror_frr.git] / lib / stream.c
CommitLineData
896014f4 1/*
718e3744 2 * Packet interface
3 * Copyright (C) 1999 Kunihiro Ishiguro
4 *
5 * This file is part of GNU Zebra.
6 *
7 * GNU Zebra is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2, or (at your option) any
10 * later version.
11 *
12 * GNU Zebra is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
896014f4
DL
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
718e3744 20 */
21
22#include <zebra.h>
7a2fbbf0 23#include <stddef.h>
363e24c6 24#include <pthread.h>
718e3744 25
26#include "stream.h"
27#include "memory.h"
28#include "network.h"
29#include "prefix.h"
050c013a 30#include "log.h"
00dffa8c 31#include "frr_pthread.h"
de75223e 32#include "lib_errors.h"
718e3744 33
bf8d3d6a
DL
34DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream");
35DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO");
4a1ab8e4 36
/*
 * Position validity predicates.
 *
 * GETP_VALID:   a read position G may not lie beyond the end of valid data.
 * PUT_AT_VALID: positional writes obey the same limit as reads.
 * ENDP_VALID:   an end position E may not lie beyond the allocated size.
 */
#define GETP_VALID(S, G) ((G) <= (S)->endp)
#define PUT_AT_VALID(S, G) GETP_VALID(S, G)
#define ENDP_VALID(S, E) ((E) <= (S)->size)
718e3744 41
/*
 * Asserting sanity checks.
 *
 * The following invariant must hold for every stream both before and
 * after any call to a stream function:
 *
 *     getp <= endp <= size
 *
 * Note that after a stream function has been called the following may
 * be true:
 *     if (getp == endp) then the stream is no longer readable
 *     if (endp == size) then the stream is no longer writeable
 *
 * It is valid to put to anywhere within the size of the stream, but only
 * using the stream_put..._at() functions.
 */
/*
 * Log the address, size, getp and endp of stream S at warning level,
 * followed by a backtrace of the current call stack.
 */
#define STREAM_WARN_OFFSETS(S) \
	do { \
		flog_warn( \
			EC_LIB_STREAM, \
			"&(struct stream): %p, size: %lu, getp: %lu, endp: %lu", \
			(void *)(S), \
			(unsigned long)(S)->size, \
			(unsigned long)(S)->getp, \
			(unsigned long)(S)->endp); \
		zlog_backtrace(LOG_WARNING); \
	} while (0)
d62a17ae 65
/*
 * Verify the getp <= endp <= size invariant of stream S.
 *
 * When either half of the invariant is broken the offsets are logged
 * first, so the diagnostic is emitted before the assert fires.
 */
#define STREAM_VERIFY_SANE(S) \
	do { \
		if (!GETP_VALID(S, (S)->getp) || !ENDP_VALID(S, (S)->endp)) \
			STREAM_WARN_OFFSETS(S); \
		assert(GETP_VALID(S, (S)->getp)); \
		assert(ENDP_VALID(S, (S)->endp)); \
	} while (0)
74
996c9314
LB
/*
 * Report an out-of-bounds access attempt on stream S and then assert.
 * WHAT is a short description of the attempted operation; it is logged
 * together with the name of the calling function and the stream offsets.
 */
#define STREAM_BOUND_WARN(S, WHAT) \
	do { \
		flog_warn(EC_LIB_STREAM, \
			  "%s: Attempt to %s out of bounds", \
			  __func__, (WHAT)); \
		STREAM_WARN_OFFSETS(S); \
		assert(0); \
	} while (0)
82
996c9314
LB
/*
 * Non-asserting variant of the bounds warning: logs the attempted
 * operation WHAT and the offsets of stream S, then lets the caller
 * carry on (typically to return an error).
 */
#define STREAM_BOUND_WARN2(S, WHAT) \
	do { \
		flog_warn(EC_LIB_STREAM, \
			  "%s: Attempt to %s out of bounds", \
			  __func__, (WHAT)); \
		STREAM_WARN_OFFSETS(S); \
	} while (0)
050c013a 89
/*
 * XXX: Deprecated macro: do not use.
 *
 * If Z bytes do not fit between endp and size of stream S, log a warning
 * and truncate Z (which must be a modifiable lvalue) to the space left.
 *
 * The comparison is written as a subtraction so that a very large Z
 * cannot wrap "endp + Z" around and slip past the check.  The macro
 * deliberately has no trailing semicolon, so that "CHECK_SIZE(s, n);"
 * behaves as a single statement (e.g. inside an unbraced if/else).
 */
#define CHECK_SIZE(S, Z) \
	do { \
		if ((S)->endp > (S)->size \
		    || (Z) > (S)->size - (S)->endp) { \
			flog_warn( \
				EC_LIB_STREAM, \
				"CHECK_SIZE: truncating requested size %lu", \
				(unsigned long)(Z)); \
			STREAM_WARN_OFFSETS(S); \
			(Z) = (S)->size - (S)->endp; \
		} \
	} while (0)
718e3744 102
103/* Make stream buffer. */
d62a17ae 104struct stream *stream_new(size_t size)
718e3744 105{
d62a17ae 106 struct stream *s;
107
108 assert(size > 0);
109
de75223e 110 s = XMALLOC(MTYPE_STREAM, sizeof(struct stream) + size);
d62a17ae 111
565b5561
DS
112 s->getp = s->endp = 0;
113 s->next = NULL;
d62a17ae 114 s->size = size;
115 return s;
718e3744 116}
117
118/* Free it now. */
d62a17ae 119void stream_free(struct stream *s)
718e3744 120{
d62a17ae 121 if (!s)
122 return;
123
d62a17ae 124 XFREE(MTYPE_STREAM, s);
718e3744 125}
050c013a 126
f8c511cd 127struct stream *stream_copy(struct stream *dest, const struct stream *src)
050c013a 128{
d62a17ae 129 STREAM_VERIFY_SANE(src);
130
f8c511cd
MS
131 assert(dest != NULL);
132 assert(STREAM_SIZE(dest) >= src->endp);
d62a17ae 133
f8c511cd
MS
134 dest->endp = src->endp;
135 dest->getp = src->getp;
d62a17ae 136
f8c511cd 137 memcpy(dest->data, src->data, src->endp);
d62a17ae 138
f8c511cd 139 return dest;
050c013a 140}
141
f8c511cd 142struct stream *stream_dup(const struct stream *s)
050c013a 143{
f8c511cd 144 struct stream *snew;
050c013a 145
d62a17ae 146 STREAM_VERIFY_SANE(s);
050c013a 147
ab08ef82 148 snew = stream_new(s->endp);
050c013a 149
f8c511cd 150 return (stream_copy(snew, s));
050c013a 151}
4b201d46 152
f8c511cd 153struct stream *stream_dupcat(const struct stream *s1, const struct stream *s2,
d62a17ae 154 size_t offset)
8c71e481 155{
d62a17ae 156 struct stream *new;
8c71e481 157
d62a17ae 158 STREAM_VERIFY_SANE(s1);
159 STREAM_VERIFY_SANE(s2);
8c71e481 160
d62a17ae 161 if ((new = stream_new(s1->endp + s2->endp)) == NULL)
162 return NULL;
8c71e481 163
d62a17ae 164 memcpy(new->data, s1->data, offset);
165 memcpy(new->data + offset, s2->data, s2->endp);
166 memcpy(new->data + offset + s2->endp, s1->data + offset,
167 (s1->endp - offset));
168 new->endp = s1->endp + s2->endp;
169 return new;
8c71e481
PM
170}
171
43888669
DS
172size_t stream_resize_inplace(struct stream **sptr, size_t newsize)
173{
de75223e 174 struct stream *orig = *sptr;
43888669 175
de75223e 176 STREAM_VERIFY_SANE(orig);
d62a17ae 177
de75223e 178 orig = XREALLOC(MTYPE_STREAM, orig, sizeof(struct stream) + newsize);
d62a17ae 179
de75223e 180 orig->size = newsize;
d62a17ae 181
de75223e
DS
182 if (orig->endp > orig->size)
183 orig->endp = orig->size;
184 if (orig->getp > orig->endp)
185 orig->getp = orig->endp;
d62a17ae 186
de75223e 187 STREAM_VERIFY_SANE(orig);
d62a17ae 188
de75223e
DS
189 *sptr = orig;
190 return orig->size;
191}
d62a17ae 192
f8c511cd 193size_t stream_get_getp(const struct stream *s)
718e3744 194{
d62a17ae 195 STREAM_VERIFY_SANE(s);
196 return s->getp;
718e3744 197}
198
f8c511cd 199size_t stream_get_endp(const struct stream *s)
718e3744 200{
d62a17ae 201 STREAM_VERIFY_SANE(s);
202 return s->endp;
718e3744 203}
204
f8c511cd 205size_t stream_get_size(const struct stream *s)
718e3744 206{
d62a17ae 207 STREAM_VERIFY_SANE(s);
208 return s->size;
718e3744 209}
210
211/* Stream structre' stream pointer related functions. */
d62a17ae 212void stream_set_getp(struct stream *s, size_t pos)
718e3744 213{
d62a17ae 214 STREAM_VERIFY_SANE(s);
215
216 if (!GETP_VALID(s, pos)) {
217 STREAM_BOUND_WARN(s, "set getp");
218 pos = s->endp;
219 }
220
221 s->getp = pos;
718e3744 222}
223
d62a17ae 224void stream_set_endp(struct stream *s, size_t pos)
d531050b 225{
d62a17ae 226 STREAM_VERIFY_SANE(s);
227
228 if (!ENDP_VALID(s, pos)) {
229 STREAM_BOUND_WARN(s, "set endp");
230 return;
231 }
232
233 /*
234 * Make sure the current read pointer is not beyond the new endp.
235 */
236 if (s->getp > pos) {
237 STREAM_BOUND_WARN(s, "set endp");
238 return;
239 }
240
241 s->endp = pos;
242 STREAM_VERIFY_SANE(s);
d531050b
SV
243}
244
9985f83c 245/* Forward pointer. */
d62a17ae 246void stream_forward_getp(struct stream *s, size_t size)
718e3744 247{
d62a17ae 248 STREAM_VERIFY_SANE(s);
249
250 if (!GETP_VALID(s, s->getp + size)) {
251 STREAM_BOUND_WARN(s, "seek getp");
252 return;
253 }
254
255 s->getp += size;
718e3744 256}
257
4adddf0a
QY
258bool stream_forward_getp2(struct stream *s, size_t size)
259{
260 STREAM_VERIFY_SANE(s);
261
262 if (!GETP_VALID(s, s->getp + size))
263 return false;
264
265 s->getp += size;
266
267 return true;
268}
269
06cf2c0c
QY
270void stream_rewind_getp(struct stream *s, size_t size)
271{
272 STREAM_VERIFY_SANE(s);
273
274 if (size > s->getp || !GETP_VALID(s, s->getp - size)) {
275 STREAM_BOUND_WARN(s, "rewind getp");
276 return;
277 }
278
279 s->getp -= size;
280}
281
282bool stream_rewind_getp2(struct stream *s, size_t size)
283{
284 STREAM_VERIFY_SANE(s);
285
286 if (size > s->getp || !GETP_VALID(s, s->getp - size))
287 return false;
288
289 s->getp -= size;
290
291 return true;
292}
293
d62a17ae 294void stream_forward_endp(struct stream *s, size_t size)
718e3744 295{
d62a17ae 296 STREAM_VERIFY_SANE(s);
297
298 if (!ENDP_VALID(s, s->endp + size)) {
299 STREAM_BOUND_WARN(s, "seek endp");
300 return;
301 }
302
303 s->endp += size;
718e3744 304}
6b0655a2 305
4adddf0a
QY
306bool stream_forward_endp2(struct stream *s, size_t size)
307{
308 STREAM_VERIFY_SANE(s);
309
310 if (!ENDP_VALID(s, s->endp + size))
311 return false;
312
313 s->endp += size;
314
315 return true;
316}
317
718e3744 318/* Copy from stream to destination. */
3009394b 319bool stream_get2(void *dst, struct stream *s, size_t size)
051cc28c
DS
320{
321 STREAM_VERIFY_SANE(s);
322
323 if (STREAM_READABLE(s) < size) {
324 STREAM_BOUND_WARN2(s, "get");
325 return false;
326 }
327
328 memcpy(dst, s->data + s->getp, size);
329 s->getp += size;
330
331 return true;
332}
333
d62a17ae 334void stream_get(void *dst, struct stream *s, size_t size)
718e3744 335{
d62a17ae 336 STREAM_VERIFY_SANE(s);
337
338 if (STREAM_READABLE(s) < size) {
339 STREAM_BOUND_WARN(s, "get");
340 return;
341 }
342
343 memcpy(dst, s->data + s->getp, size);
344 s->getp += size;
718e3744 345}
346
347/* Get next character from the stream. */
3009394b 348bool stream_getc2(struct stream *s, uint8_t *byte)
051cc28c
DS
349{
350 STREAM_VERIFY_SANE(s);
351
d7c0a89a 352 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
051cc28c
DS
353 STREAM_BOUND_WARN2(s, "get char");
354 return false;
355 }
356 *byte = s->data[s->getp++];
357
358 return true;
359}
360
d7c0a89a 361uint8_t stream_getc(struct stream *s)
718e3744 362{
d7c0a89a 363 uint8_t c;
d62a17ae 364
365 STREAM_VERIFY_SANE(s);
366
d7c0a89a 367 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
d62a17ae 368 STREAM_BOUND_WARN(s, "get char");
369 return 0;
370 }
371 c = s->data[s->getp++];
372
373 return c;
718e3744 374}
375
376/* Get next character from the stream. */
d7c0a89a 377uint8_t stream_getc_from(struct stream *s, size_t from)
718e3744 378{
d7c0a89a 379 uint8_t c;
d62a17ae 380
381 STREAM_VERIFY_SANE(s);
382
d7c0a89a 383 if (!GETP_VALID(s, from + sizeof(uint8_t))) {
d62a17ae 384 STREAM_BOUND_WARN(s, "get char");
385 return 0;
386 }
387
388 c = s->data[from];
389
390 return c;
718e3744 391}
392
3009394b 393bool stream_getw2(struct stream *s, uint16_t *word)
051cc28c
DS
394{
395 STREAM_VERIFY_SANE(s);
396
397 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
398 STREAM_BOUND_WARN2(s, "get ");
399 return false;
400 }
401
996c9314 402 *word = s->data[s->getp++] << 8;
051cc28c
DS
403 *word |= s->data[s->getp++];
404
405 return true;
406}
407
718e3744 408/* Get next word from the stream. */
d7c0a89a 409uint16_t stream_getw(struct stream *s)
718e3744 410{
d7c0a89a 411 uint16_t w;
d62a17ae 412
413 STREAM_VERIFY_SANE(s);
414
d7c0a89a 415 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
d62a17ae 416 STREAM_BOUND_WARN(s, "get ");
417 return 0;
418 }
419
420 w = s->data[s->getp++] << 8;
421 w |= s->data[s->getp++];
422
423 return w;
718e3744 424}
425
426/* Get next word from the stream. */
d7c0a89a 427uint16_t stream_getw_from(struct stream *s, size_t from)
718e3744 428{
d7c0a89a 429 uint16_t w;
d62a17ae 430
431 STREAM_VERIFY_SANE(s);
432
d7c0a89a 433 if (!GETP_VALID(s, from + sizeof(uint16_t))) {
d62a17ae 434 STREAM_BOUND_WARN(s, "get ");
435 return 0;
436 }
437
438 w = s->data[from++] << 8;
439 w |= s->data[from];
440
441 return w;
718e3744 442}
443
d6f4a61d 444/* Get next 3-byte from the stream. */
d7c0a89a 445uint32_t stream_get3_from(struct stream *s, size_t from)
d6f4a61d 446{
d7c0a89a 447 uint32_t l;
d62a17ae 448
449 STREAM_VERIFY_SANE(s);
450
451 if (!GETP_VALID(s, from + 3)) {
452 STREAM_BOUND_WARN(s, "get 3byte");
453 return 0;
454 }
455
456 l = s->data[from++] << 16;
457 l |= s->data[from++] << 8;
458 l |= s->data[from];
459
460 return l;
d6f4a61d
DL
461}
462
d7c0a89a 463uint32_t stream_get3(struct stream *s)
d6f4a61d 464{
d7c0a89a 465 uint32_t l;
d62a17ae 466
467 STREAM_VERIFY_SANE(s);
468
469 if (STREAM_READABLE(s) < 3) {
470 STREAM_BOUND_WARN(s, "get 3byte");
471 return 0;
472 }
473
474 l = s->data[s->getp++] << 16;
475 l |= s->data[s->getp++] << 8;
476 l |= s->data[s->getp++];
477
478 return l;
d6f4a61d
DL
479}
480
718e3744 481/* Get next long word from the stream. */
d7c0a89a 482uint32_t stream_getl_from(struct stream *s, size_t from)
050c013a 483{
d7c0a89a 484 uint32_t l;
d62a17ae 485
486 STREAM_VERIFY_SANE(s);
487
d7c0a89a 488 if (!GETP_VALID(s, from + sizeof(uint32_t))) {
d62a17ae 489 STREAM_BOUND_WARN(s, "get long");
490 return 0;
491 }
492
937652c6 493 l = (unsigned)(s->data[from++]) << 24;
d62a17ae 494 l |= s->data[from++] << 16;
495 l |= s->data[from++] << 8;
496 l |= s->data[from];
497
498 return l;
050c013a 499}
500
3f9c7369 501/* Copy from stream at specific location to destination. */
d62a17ae 502void stream_get_from(void *dst, struct stream *s, size_t from, size_t size)
3f9c7369 503{
d62a17ae 504 STREAM_VERIFY_SANE(s);
3f9c7369 505
d62a17ae 506 if (!GETP_VALID(s, from + size)) {
507 STREAM_BOUND_WARN(s, "get from");
508 return;
509 }
3f9c7369 510
d62a17ae 511 memcpy(dst, s->data + from, size);
3f9c7369
DS
512}
513
3009394b 514bool stream_getl2(struct stream *s, uint32_t *l)
051cc28c
DS
515{
516 STREAM_VERIFY_SANE(s);
517
518 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
519 STREAM_BOUND_WARN2(s, "get long");
520 return false;
521 }
522
996c9314 523 *l = (unsigned int)(s->data[s->getp++]) << 24;
051cc28c
DS
524 *l |= s->data[s->getp++] << 16;
525 *l |= s->data[s->getp++] << 8;
526 *l |= s->data[s->getp++];
527
528 return true;
051cc28c
DS
529}
530
d7c0a89a 531uint32_t stream_getl(struct stream *s)
718e3744 532{
d7c0a89a 533 uint32_t l;
d62a17ae 534
535 STREAM_VERIFY_SANE(s);
536
d7c0a89a 537 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
d62a17ae 538 STREAM_BOUND_WARN(s, "get long");
539 return 0;
540 }
541
937652c6 542 l = (unsigned)(s->data[s->getp++]) << 24;
d62a17ae 543 l |= s->data[s->getp++] << 16;
544 l |= s->data[s->getp++] << 8;
545 l |= s->data[s->getp++];
546
547 return l;
718e3744 548}
4b201d46 549
550/* Get next quad word from the stream. */
d62a17ae 551uint64_t stream_getq_from(struct stream *s, size_t from)
4b201d46 552{
d62a17ae 553 uint64_t q;
554
555 STREAM_VERIFY_SANE(s);
556
557 if (!GETP_VALID(s, from + sizeof(uint64_t))) {
558 STREAM_BOUND_WARN(s, "get quad");
559 return 0;
560 }
561
562 q = ((uint64_t)s->data[from++]) << 56;
563 q |= ((uint64_t)s->data[from++]) << 48;
564 q |= ((uint64_t)s->data[from++]) << 40;
565 q |= ((uint64_t)s->data[from++]) << 32;
566 q |= ((uint64_t)s->data[from++]) << 24;
567 q |= ((uint64_t)s->data[from++]) << 16;
568 q |= ((uint64_t)s->data[from++]) << 8;
569 q |= ((uint64_t)s->data[from++]);
570
571 return q;
4b201d46 572}
573
d62a17ae 574uint64_t stream_getq(struct stream *s)
4b201d46 575{
d62a17ae 576 uint64_t q;
577
578 STREAM_VERIFY_SANE(s);
579
580 if (STREAM_READABLE(s) < sizeof(uint64_t)) {
581 STREAM_BOUND_WARN(s, "get quad");
582 return 0;
583 }
584
585 q = ((uint64_t)s->data[s->getp++]) << 56;
586 q |= ((uint64_t)s->data[s->getp++]) << 48;
587 q |= ((uint64_t)s->data[s->getp++]) << 40;
588 q |= ((uint64_t)s->data[s->getp++]) << 32;
589 q |= ((uint64_t)s->data[s->getp++]) << 24;
590 q |= ((uint64_t)s->data[s->getp++]) << 16;
591 q |= ((uint64_t)s->data[s->getp++]) << 8;
592 q |= ((uint64_t)s->data[s->getp++]);
593
594 return q;
4b201d46 595}
596
c2b5a4e5
QY
597bool stream_getq2(struct stream *s, uint64_t *q)
598{
599 STREAM_VERIFY_SANE(s);
600
601 if (STREAM_READABLE(s) < sizeof(uint64_t)) {
602 STREAM_BOUND_WARN2(s, "get uint64");
603 return false;
604 }
605
606 *q = ((uint64_t)s->data[s->getp++]) << 56;
607 *q |= ((uint64_t)s->data[s->getp++]) << 48;
608 *q |= ((uint64_t)s->data[s->getp++]) << 40;
609 *q |= ((uint64_t)s->data[s->getp++]) << 32;
610 *q |= ((uint64_t)s->data[s->getp++]) << 24;
611 *q |= ((uint64_t)s->data[s->getp++]) << 16;
612 *q |= ((uint64_t)s->data[s->getp++]) << 8;
613 *q |= ((uint64_t)s->data[s->getp++]);
614
615 return true;
616}
617
718e3744 618/* Get next long word from the stream. */
d7c0a89a 619uint32_t stream_get_ipv4(struct stream *s)
718e3744 620{
d7c0a89a 621 uint32_t l;
d62a17ae 622
623 STREAM_VERIFY_SANE(s);
624
d7c0a89a 625 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
d62a17ae 626 STREAM_BOUND_WARN(s, "get ipv4");
627 return 0;
628 }
629
d7c0a89a
QY
630 memcpy(&l, s->data + s->getp, sizeof(uint32_t));
631 s->getp += sizeof(uint32_t);
d62a17ae 632
633 return l;
718e3744 634}
6b0655a2 635
31f937fb
SM
636bool stream_get_ipaddr(struct stream *s, struct ipaddr *ip)
637{
638 uint16_t ipa_len;
639
640 STREAM_VERIFY_SANE(s);
641
642 /* Get address type. */
643 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
644 STREAM_BOUND_WARN2(s, "get ipaddr");
645 return false;
646 }
647 ip->ipa_type = stream_getw(s);
648
649 /* Get address value. */
650 switch (ip->ipa_type) {
651 case IPADDR_V4:
652 ipa_len = IPV4_MAX_BYTELEN;
653 break;
654 case IPADDR_V6:
655 ipa_len = IPV6_MAX_BYTELEN;
656 break;
657 default:
658 flog_err(EC_LIB_DEVELOPMENT,
659 "%s: unknown ip address-family: %u", __func__,
660 ip->ipa_type);
661 return false;
662 }
663 if (STREAM_READABLE(s) < ipa_len) {
664 STREAM_BOUND_WARN2(s, "get ipaddr");
665 return false;
666 }
667 memcpy(&ip->ip, s->data + s->getp, ipa_len);
668 s->getp += ipa_len;
669
670 return true;
671}
672
/*
 * Read a 32-bit value with stream_getl() and reinterpret its bit pattern
 * as a float (via a union).
 */
float stream_getf(struct stream *s)
{
	union {
		uint32_t bits;
		float value;
	} pun;

	pun.bits = stream_getl(s);

	return pun.value;
}
682
/*
 * Read a 64-bit value with stream_getq() and reinterpret its bit pattern
 * as a double (via a union).
 */
double stream_getd(struct stream *s)
{
	union {
		uint64_t bits;
		double value;
	} pun;

	pun.bits = stream_getq(s);

	return pun.value;
}
692
f978382d 693/* Copy from source to stream.
050c013a 694 *
695 * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
696 * around. This should be fixed once the stream updates are working.
0dab9303 697 *
698 * stream_write() is saner
050c013a 699 */
d62a17ae 700void stream_put(struct stream *s, const void *src, size_t size)
718e3744 701{
702
d62a17ae 703 /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
704 CHECK_SIZE(s, size);
705
706 STREAM_VERIFY_SANE(s);
707
708 if (STREAM_WRITEABLE(s) < size) {
709 STREAM_BOUND_WARN(s, "put");
710 return;
711 }
712
713 if (src)
714 memcpy(s->data + s->endp, src, size);
715 else
716 memset(s->data + s->endp, 0, size);
717
718 s->endp += size;
718e3744 719}
720
721/* Put character to the stream. */
d7c0a89a 722int stream_putc(struct stream *s, uint8_t c)
718e3744 723{
d62a17ae 724 STREAM_VERIFY_SANE(s);
725
d7c0a89a 726 if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) {
d62a17ae 727 STREAM_BOUND_WARN(s, "put");
728 return 0;
729 }
730
731 s->data[s->endp++] = c;
d7c0a89a 732 return sizeof(uint8_t);
718e3744 733}
734
735/* Put word to the stream. */
d7c0a89a 736int stream_putw(struct stream *s, uint16_t w)
718e3744 737{
d62a17ae 738 STREAM_VERIFY_SANE(s);
739
d7c0a89a 740 if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) {
d62a17ae 741 STREAM_BOUND_WARN(s, "put");
742 return 0;
743 }
744
d7c0a89a
QY
745 s->data[s->endp++] = (uint8_t)(w >> 8);
746 s->data[s->endp++] = (uint8_t)w;
d62a17ae 747
748 return 2;
718e3744 749}
750
d6f4a61d 751/* Put long word to the stream. */
d7c0a89a 752int stream_put3(struct stream *s, uint32_t l)
d6f4a61d 753{
d62a17ae 754 STREAM_VERIFY_SANE(s);
755
756 if (STREAM_WRITEABLE(s) < 3) {
757 STREAM_BOUND_WARN(s, "put");
758 return 0;
759 }
760
d7c0a89a
QY
761 s->data[s->endp++] = (uint8_t)(l >> 16);
762 s->data[s->endp++] = (uint8_t)(l >> 8);
763 s->data[s->endp++] = (uint8_t)l;
d62a17ae 764
765 return 3;
d6f4a61d
DL
766}
767
718e3744 768/* Put long word to the stream. */
d7c0a89a 769int stream_putl(struct stream *s, uint32_t l)
718e3744 770{
d62a17ae 771 STREAM_VERIFY_SANE(s);
772
d7c0a89a 773 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
d62a17ae 774 STREAM_BOUND_WARN(s, "put");
775 return 0;
776 }
777
d7c0a89a
QY
778 s->data[s->endp++] = (uint8_t)(l >> 24);
779 s->data[s->endp++] = (uint8_t)(l >> 16);
780 s->data[s->endp++] = (uint8_t)(l >> 8);
781 s->data[s->endp++] = (uint8_t)l;
d62a17ae 782
783 return 4;
718e3744 784}
785
4b201d46 786/* Put quad word to the stream. */
d62a17ae 787int stream_putq(struct stream *s, uint64_t q)
4b201d46 788{
d62a17ae 789 STREAM_VERIFY_SANE(s);
790
791 if (STREAM_WRITEABLE(s) < sizeof(uint64_t)) {
792 STREAM_BOUND_WARN(s, "put quad");
793 return 0;
794 }
795
d7c0a89a
QY
796 s->data[s->endp++] = (uint8_t)(q >> 56);
797 s->data[s->endp++] = (uint8_t)(q >> 48);
798 s->data[s->endp++] = (uint8_t)(q >> 40);
799 s->data[s->endp++] = (uint8_t)(q >> 32);
800 s->data[s->endp++] = (uint8_t)(q >> 24);
801 s->data[s->endp++] = (uint8_t)(q >> 16);
802 s->data[s->endp++] = (uint8_t)(q >> 8);
803 s->data[s->endp++] = (uint8_t)q;
d62a17ae 804
805 return 8;
4b201d46 806}
807
/*
 * Append a float by reinterpreting its bit pattern as a 32-bit integer
 * (via a union) and writing that with stream_putl().  Returns whatever
 * stream_putl() returns.
 */
int stream_putf(struct stream *s, float f)
{
	union {
		float value;
		uint32_t bits;
	} pun;

	pun.value = f;

	return stream_putl(s, pun.bits);
}
817
/*
 * Append a double by reinterpreting its bit pattern as a 64-bit integer
 * (via a union) and writing that with stream_putq().  Returns whatever
 * stream_putq() returns.
 */
int stream_putd(struct stream *s, double d)
{
	union {
		double value;
		uint64_t bits;
	} pun;

	pun.value = d;

	return stream_putq(s, pun.bits);
}
827
d7c0a89a 828int stream_putc_at(struct stream *s, size_t putp, uint8_t c)
718e3744 829{
d62a17ae 830 STREAM_VERIFY_SANE(s);
831
d7c0a89a 832 if (!PUT_AT_VALID(s, putp + sizeof(uint8_t))) {
d62a17ae 833 STREAM_BOUND_WARN(s, "put");
834 return 0;
835 }
836
837 s->data[putp] = c;
838
839 return 1;
718e3744 840}
841
d7c0a89a 842int stream_putw_at(struct stream *s, size_t putp, uint16_t w)
718e3744 843{
d62a17ae 844 STREAM_VERIFY_SANE(s);
845
d7c0a89a 846 if (!PUT_AT_VALID(s, putp + sizeof(uint16_t))) {
d62a17ae 847 STREAM_BOUND_WARN(s, "put");
848 return 0;
849 }
850
d7c0a89a
QY
851 s->data[putp] = (uint8_t)(w >> 8);
852 s->data[putp + 1] = (uint8_t)w;
d62a17ae 853
854 return 2;
718e3744 855}
856
d7c0a89a 857int stream_put3_at(struct stream *s, size_t putp, uint32_t l)
d6f4a61d 858{
d62a17ae 859 STREAM_VERIFY_SANE(s);
860
861 if (!PUT_AT_VALID(s, putp + 3)) {
862 STREAM_BOUND_WARN(s, "put");
863 return 0;
864 }
d7c0a89a
QY
865 s->data[putp] = (uint8_t)(l >> 16);
866 s->data[putp + 1] = (uint8_t)(l >> 8);
867 s->data[putp + 2] = (uint8_t)l;
d62a17ae 868
869 return 3;
d6f4a61d
DL
870}
871
d7c0a89a 872int stream_putl_at(struct stream *s, size_t putp, uint32_t l)
718e3744 873{
d62a17ae 874 STREAM_VERIFY_SANE(s);
875
d7c0a89a 876 if (!PUT_AT_VALID(s, putp + sizeof(uint32_t))) {
d62a17ae 877 STREAM_BOUND_WARN(s, "put");
878 return 0;
879 }
d7c0a89a
QY
880 s->data[putp] = (uint8_t)(l >> 24);
881 s->data[putp + 1] = (uint8_t)(l >> 16);
882 s->data[putp + 2] = (uint8_t)(l >> 8);
883 s->data[putp + 3] = (uint8_t)l;
d62a17ae 884
885 return 4;
718e3744 886}
887
d62a17ae 888int stream_putq_at(struct stream *s, size_t putp, uint64_t q)
4b201d46 889{
d62a17ae 890 STREAM_VERIFY_SANE(s);
891
892 if (!PUT_AT_VALID(s, putp + sizeof(uint64_t))) {
893 STREAM_BOUND_WARN(s, "put");
894 return 0;
895 }
d7c0a89a
QY
896 s->data[putp] = (uint8_t)(q >> 56);
897 s->data[putp + 1] = (uint8_t)(q >> 48);
898 s->data[putp + 2] = (uint8_t)(q >> 40);
899 s->data[putp + 3] = (uint8_t)(q >> 32);
900 s->data[putp + 4] = (uint8_t)(q >> 24);
901 s->data[putp + 5] = (uint8_t)(q >> 16);
902 s->data[putp + 6] = (uint8_t)(q >> 8);
903 s->data[putp + 7] = (uint8_t)q;
d62a17ae 904
905 return 8;
4b201d46 906}
907
718e3744 908/* Put long word to the stream. */
d7c0a89a 909int stream_put_ipv4(struct stream *s, uint32_t l)
718e3744 910{
d62a17ae 911 STREAM_VERIFY_SANE(s);
912
d7c0a89a 913 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
d62a17ae 914 STREAM_BOUND_WARN(s, "put");
915 return 0;
916 }
d7c0a89a
QY
917 memcpy(s->data + s->endp, &l, sizeof(uint32_t));
918 s->endp += sizeof(uint32_t);
d62a17ae 919
d7c0a89a 920 return sizeof(uint32_t);
718e3744 921}
922
923/* Put long word to the stream. */
d3d77ec4 924int stream_put_in_addr(struct stream *s, const struct in_addr *addr)
718e3744 925{
d62a17ae 926 STREAM_VERIFY_SANE(s);
927
d7c0a89a 928 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
d62a17ae 929 STREAM_BOUND_WARN(s, "put");
930 return 0;
931 }
932
d7c0a89a
QY
933 memcpy(s->data + s->endp, addr, sizeof(uint32_t));
934 s->endp += sizeof(uint32_t);
d62a17ae 935
d7c0a89a 936 return sizeof(uint32_t);
718e3744 937}
938
31f937fb
SM
939bool stream_put_ipaddr(struct stream *s, struct ipaddr *ip)
940{
941 stream_putw(s, ip->ipa_type);
942
943 switch (ip->ipa_type) {
944 case IPADDR_V4:
945 stream_put_in_addr(s, &ip->ipaddr_v4);
946 break;
947 case IPADDR_V6:
948 stream_write(s, (uint8_t *)&ip->ipaddr_v6, 16);
949 break;
950 default:
951 flog_err(EC_LIB_DEVELOPMENT,
952 "%s: unknown ip address-family: %u", __func__,
953 ip->ipa_type);
954 return false;
955 }
956
957 return true;
958}
959
3f9c7369 960/* Put in_addr at location in the stream. */
d3d77ec4
MS
961int stream_put_in_addr_at(struct stream *s, size_t putp,
962 const struct in_addr *addr)
3f9c7369 963{
d62a17ae 964 STREAM_VERIFY_SANE(s);
3f9c7369 965
d62a17ae 966 if (!PUT_AT_VALID(s, putp + 4)) {
967 STREAM_BOUND_WARN(s, "put");
968 return 0;
969 }
3f9c7369 970
d62a17ae 971 memcpy(&s->data[putp], addr, 4);
972 return 4;
3f9c7369
DS
973}
974
975/* Put in6_addr at location in the stream. */
d3d77ec4
MS
976int stream_put_in6_addr_at(struct stream *s, size_t putp,
977 const struct in6_addr *addr)
3f9c7369 978{
d62a17ae 979 STREAM_VERIFY_SANE(s);
3f9c7369 980
d62a17ae 981 if (!PUT_AT_VALID(s, putp + 16)) {
982 STREAM_BOUND_WARN(s, "put");
983 return 0;
984 }
3f9c7369 985
d62a17ae 986 memcpy(&s->data[putp], addr, 16);
987 return 16;
3f9c7369
DS
988}
989
718e3744 990/* Put prefix by nlri type format. */
d3d77ec4 991int stream_put_prefix_addpath(struct stream *s, const struct prefix *p,
be92fc9f 992 bool addpath_capable, uint32_t addpath_tx_id)
718e3744 993{
d62a17ae 994 size_t psize;
995 size_t psize_with_addpath;
996
997 STREAM_VERIFY_SANE(s);
998
999 psize = PSIZE(p->prefixlen);
1000
be92fc9f 1001 if (addpath_capable)
d62a17ae 1002 psize_with_addpath = psize + 4;
1003 else
1004 psize_with_addpath = psize;
1005
d7c0a89a 1006 if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) {
d62a17ae 1007 STREAM_BOUND_WARN(s, "put");
1008 return 0;
1009 }
1010
be92fc9f 1011 if (addpath_capable) {
d7c0a89a
QY
1012 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
1013 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
1014 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
1015 s->data[s->endp++] = (uint8_t)addpath_tx_id;
d62a17ae 1016 }
1017
1018 s->data[s->endp++] = p->prefixlen;
1019 memcpy(s->data + s->endp, &p->u.prefix, psize);
1020 s->endp += psize;
1021
1022 return psize;
718e3744 1023}
6b0655a2 1024
/* Append a prefix in NLRI form without an addpath id. */
int stream_put_prefix(struct stream *s, const struct prefix *p)
{
	return stream_put_prefix_addpath(s, p, false, 0);
}
1029
cd1964ff 1030/* Put NLRI with label */
5f040085 1031int stream_put_labeled_prefix(struct stream *s, const struct prefix *p,
be92fc9f 1032 mpls_label_t *label, bool addpath_capable,
ec15e1b5 1033 uint32_t addpath_tx_id)
cd1964ff 1034{
d62a17ae 1035 size_t psize;
ec15e1b5 1036 size_t psize_with_addpath;
d7c0a89a 1037 uint8_t *label_pnt = (uint8_t *)label;
cd1964ff 1038
d62a17ae 1039 STREAM_VERIFY_SANE(s);
cd1964ff 1040
d62a17ae 1041 psize = PSIZE(p->prefixlen);
be92fc9f 1042 psize_with_addpath = psize + (addpath_capable ? 4 : 0);
cd1964ff 1043
ec15e1b5 1044 if (STREAM_WRITEABLE(s) < (psize_with_addpath + 3)) {
d62a17ae 1045 STREAM_BOUND_WARN(s, "put");
1046 return 0;
1047 }
cd1964ff 1048
be92fc9f 1049 if (addpath_capable) {
ec15e1b5
QY
1050 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
1051 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
1052 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
1053 s->data[s->endp++] = (uint8_t)addpath_tx_id;
1054 }
1055
d62a17ae 1056 stream_putc(s, (p->prefixlen + 24));
1057 stream_putc(s, label_pnt[0]);
1058 stream_putc(s, label_pnt[1]);
1059 stream_putc(s, label_pnt[2]);
1060 memcpy(s->data + s->endp, &p->u.prefix, psize);
1061 s->endp += psize;
cd1964ff 1062
d62a17ae 1063 return (psize + 3);
cd1964ff 1064}
adbac85e 1065
718e3744 1066/* Read size from fd. */
d62a17ae 1067int stream_read(struct stream *s, int fd, size_t size)
718e3744 1068{
d62a17ae 1069 int nbytes;
1070
1071 STREAM_VERIFY_SANE(s);
1072
1073 if (STREAM_WRITEABLE(s) < size) {
1074 STREAM_BOUND_WARN(s, "put");
1075 return 0;
1076 }
1077
1078 nbytes = readn(fd, s->data + s->endp, size);
1079
1080 if (nbytes > 0)
1081 s->endp += nbytes;
1082
1083 return nbytes;
718e3744 1084}
1085
d62a17ae 1086ssize_t stream_read_try(struct stream *s, int fd, size_t size)
262feb1a 1087{
d62a17ae 1088 ssize_t nbytes;
1089
1090 STREAM_VERIFY_SANE(s);
1091
1092 if (STREAM_WRITEABLE(s) < size) {
1093 STREAM_BOUND_WARN(s, "put");
1094 /* Fatal (not transient) error, since retrying will not help
1095 (stream is too small to contain the desired data). */
1096 return -1;
1097 }
1098
0e2d7076
DA
1099 nbytes = read(fd, s->data + s->endp, size);
1100 if (nbytes >= 0) {
d62a17ae 1101 s->endp += nbytes;
1102 return nbytes;
1103 }
1104 /* Error: was it transient (return -2) or fatal (return -1)? */
1105 if (ERRNO_IO_RETRY(errno))
1106 return -2;
450971aa 1107 flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
a2b0f8b8 1108 safe_strerror(errno));
d62a17ae 1109 return -1;
262feb1a 1110}
1111
0dab9303 1112/* Read up to size bytes into the stream from the fd, using recvmsgfrom
1113 * whose arguments match the remaining arguments to this function
1114 */
d62a17ae 1115ssize_t stream_recvfrom(struct stream *s, int fd, size_t size, int flags,
1116 struct sockaddr *from, socklen_t *fromlen)
0dab9303 1117{
d62a17ae 1118 ssize_t nbytes;
1119
1120 STREAM_VERIFY_SANE(s);
1121
1122 if (STREAM_WRITEABLE(s) < size) {
1123 STREAM_BOUND_WARN(s, "put");
1124 /* Fatal (not transient) error, since retrying will not help
1125 (stream is too small to contain the desired data). */
1126 return -1;
1127 }
1128
0e2d7076
DA
1129 nbytes = recvfrom(fd, s->data + s->endp, size, flags, from, fromlen);
1130 if (nbytes >= 0) {
d62a17ae 1131 s->endp += nbytes;
1132 return nbytes;
1133 }
1134 /* Error: was it transient (return -2) or fatal (return -1)? */
1135 if (ERRNO_IO_RETRY(errno))
1136 return -2;
450971aa 1137 flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
a2b0f8b8 1138 safe_strerror(errno));
d62a17ae 1139 return -1;
0dab9303 1140}
1141
050c013a 1142/* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
1143 * from endp.
1144 * First iovec will be used to receive the data.
1145 * Stream need not be empty.
1146 */
d62a17ae 1147ssize_t stream_recvmsg(struct stream *s, int fd, struct msghdr *msgh, int flags,
1148 size_t size)
050c013a 1149{
d62a17ae 1150 int nbytes;
1151 struct iovec *iov;
1152
1153 STREAM_VERIFY_SANE(s);
1154 assert(msgh->msg_iovlen > 0);
1155
1156 if (STREAM_WRITEABLE(s) < size) {
1157 STREAM_BOUND_WARN(s, "put");
1158 /* This is a logic error in the calling code: the stream is too
1159 small
1160 to hold the desired data! */
1161 return -1;
1162 }
1163
1164 iov = &(msgh->msg_iov[0]);
1165 iov->iov_base = (s->data + s->endp);
1166 iov->iov_len = size;
1167
1168 nbytes = recvmsg(fd, msgh, flags);
1169
1170 if (nbytes > 0)
1171 s->endp += nbytes;
1172
1173 return nbytes;
050c013a 1174}
d62a17ae 1175
718e3744 1176/* Write data to buffer. */
d62a17ae 1177size_t stream_write(struct stream *s, const void *ptr, size_t size)
718e3744 1178{
1179
d62a17ae 1180 CHECK_SIZE(s, size);
1181
1182 STREAM_VERIFY_SANE(s);
1183
1184 if (STREAM_WRITEABLE(s) < size) {
1185 STREAM_BOUND_WARN(s, "put");
1186 return 0;
1187 }
718e3744 1188
d62a17ae 1189 memcpy(s->data + s->endp, ptr, size);
1190 s->endp += size;
9985f83c 1191
d62a17ae 1192 return size;
718e3744 1193}
1194
d62a17ae 1195/* Return current read pointer.
050c013a 1196 * DEPRECATED!
1197 * Use stream_get_pnt_to if you must, but decoding streams properly
1198 * is preferred
1199 */
d7c0a89a 1200uint8_t *stream_pnt(struct stream *s)
718e3744 1201{
d62a17ae 1202 STREAM_VERIFY_SANE(s);
1203 return s->data + s->getp;
718e3744 1204}
1205
1206/* Check does this stream empty? */
d62a17ae 1207int stream_empty(struct stream *s)
718e3744 1208{
d62a17ae 1209 STREAM_VERIFY_SANE(s);
050c013a 1210
d62a17ae 1211 return (s->endp == 0);
718e3744 1212}
1213
1214/* Reset stream. */
d62a17ae 1215void stream_reset(struct stream *s)
718e3744 1216{
d62a17ae 1217 STREAM_VERIFY_SANE(s);
050c013a 1218
d62a17ae 1219 s->getp = s->endp = 0;
718e3744 1220}
1221
1222/* Write stream contens to the file discriptor. */
d62a17ae 1223int stream_flush(struct stream *s, int fd)
718e3744 1224{
d62a17ae 1225 int nbytes;
1226
1227 STREAM_VERIFY_SANE(s);
1228
1229 nbytes = write(fd, s->data + s->getp, s->endp - s->getp);
1230
1231 return nbytes;
718e3744 1232}
6b0655a2 1233
f1bc75da 1234void stream_hexdump(const struct stream *s)
9d72660d
WC
1235{
1236 zlog_hexdump(s->data, s->endp);
1237}
1238
718e3744 1239/* Stream first in first out queue. */
1240
d62a17ae 1241struct stream_fifo *stream_fifo_new(void)
718e3744 1242{
d62a17ae 1243 struct stream_fifo *new;
1244
f8c511cd
MS
1245 new = XMALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo));
1246 stream_fifo_init(new);
d62a17ae 1247 return new;
718e3744 1248}
1249
f8c511cd
MS
1250void stream_fifo_init(struct stream_fifo *fifo)
1251{
1252 memset(fifo, 0, sizeof(struct stream_fifo));
1253 pthread_mutex_init(&fifo->mtx, NULL);
1254}
1255
718e3744 1256/* Add new stream to fifo. */
d62a17ae 1257void stream_fifo_push(struct stream_fifo *fifo, struct stream *s)
718e3744 1258{
03ed85a6
DS
1259#if defined DEV_BUILD
1260 size_t max, curmax;
1261#endif
1262
d62a17ae 1263 if (fifo->tail)
1264 fifo->tail->next = s;
1265 else
1266 fifo->head = s;
1267
1268 fifo->tail = s;
08a0e54e 1269 fifo->tail->next = NULL;
03ed85a6 1270#if !defined DEV_BUILD
363e24c6 1271 atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
03ed85a6
DS
1272#else
1273 max = atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1274 curmax = atomic_load_explicit(&fifo->max_count, memory_order_relaxed);
1275 if (max > curmax)
1276 atomic_store_explicit(&fifo->max_count, max,
1277 memory_order_relaxed);
1278#endif
363e24c6
QY
1279}
1280
1281void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s)
1282{
cb1991af 1283 frr_with_mutex (&fifo->mtx) {
363e24c6
QY
1284 stream_fifo_push(fifo, s);
1285 }
718e3744 1286}
1287
1288/* Delete first stream from fifo. */
d62a17ae 1289struct stream *stream_fifo_pop(struct stream_fifo *fifo)
718e3744 1290{
d62a17ae 1291 struct stream *s;
1292
1293 s = fifo->head;
718e3744 1294
d62a17ae 1295 if (s) {
1296 fifo->head = s->next;
718e3744 1297
d62a17ae 1298 if (fifo->head == NULL)
1299 fifo->tail = NULL;
718e3744 1300
363e24c6
QY
1301 atomic_fetch_sub_explicit(&fifo->count, 1,
1302 memory_order_release);
08a0e54e
QY
1303
1304 /* ensure stream is scrubbed of references to this fifo */
1305 s->next = NULL;
d62a17ae 1306 }
718e3744 1307
d62a17ae 1308 return s;
718e3744 1309}
1310
363e24c6
QY
1311struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo)
1312{
1313 struct stream *ret;
1314
cb1991af 1315 frr_with_mutex (&fifo->mtx) {
363e24c6
QY
1316 ret = stream_fifo_pop(fifo);
1317 }
363e24c6
QY
1318
1319 return ret;
1320}
1321
d62a17ae 1322struct stream *stream_fifo_head(struct stream_fifo *fifo)
718e3744 1323{
d62a17ae 1324 return fifo->head;
718e3744 1325}
1326
363e24c6
QY
1327struct stream *stream_fifo_head_safe(struct stream_fifo *fifo)
1328{
1329 struct stream *ret;
1330
cb1991af 1331 frr_with_mutex (&fifo->mtx) {
363e24c6
QY
1332 ret = stream_fifo_head(fifo);
1333 }
363e24c6
QY
1334
1335 return ret;
1336}
1337
d62a17ae 1338void stream_fifo_clean(struct stream_fifo *fifo)
718e3744 1339{
d62a17ae 1340 struct stream *s;
1341 struct stream *next;
1342
1343 for (s = fifo->head; s; s = next) {
1344 next = s->next;
1345 stream_free(s);
1346 }
1347 fifo->head = fifo->tail = NULL;
363e24c6
QY
1348 atomic_store_explicit(&fifo->count, 0, memory_order_release);
1349}
1350
1351void stream_fifo_clean_safe(struct stream_fifo *fifo)
1352{
cb1991af 1353 frr_with_mutex (&fifo->mtx) {
363e24c6
QY
1354 stream_fifo_clean(fifo);
1355 }
363e24c6
QY
1356}
1357
1358size_t stream_fifo_count_safe(struct stream_fifo *fifo)
1359{
1360 return atomic_load_explicit(&fifo->count, memory_order_acquire);
718e3744 1361}
1362
f8c511cd 1363void stream_fifo_deinit(struct stream_fifo *fifo)
718e3744 1364{
d62a17ae 1365 stream_fifo_clean(fifo);
363e24c6 1366 pthread_mutex_destroy(&fifo->mtx);
f8c511cd
MS
1367}
1368
1369void stream_fifo_free(struct stream_fifo *fifo)
1370{
1371 stream_fifo_deinit(fifo);
d62a17ae 1372 XFREE(MTYPE_STREAM_FIFO, fifo);
718e3744 1373}
91804f63
RZ
1374
1375void stream_pulldown(struct stream *s)
1376{
1377 size_t rlen = STREAM_READABLE(s);
1378
1379 /* No more data, so just move the pointers. */
1380 if (rlen == 0) {
1381 stream_reset(s);
1382 return;
1383 }
1384
1385 /* Move the available data to the beginning. */
1386 memmove(s->data, &s->data[s->getp], rlen);
1387 s->getp = 0;
1388 s->endp = rlen;
1389}