]> git.proxmox.com Git - mirror_frr.git/blame - lib/stream.c
Merge pull request #12798 from donaldsharp/rib_match_multicast
[mirror_frr.git] / lib / stream.c
CommitLineData
acddc0ed 1// SPDX-License-Identifier: GPL-2.0-or-later
896014f4 2/*
718e3744 3 * Packet interface
4 * Copyright (C) 1999 Kunihiro Ishiguro
718e3744 5 */
6
7#include <zebra.h>
7a2fbbf0 8#include <stddef.h>
363e24c6 9#include <pthread.h>
718e3744 10
11#include "stream.h"
12#include "memory.h"
13#include "network.h"
14#include "prefix.h"
050c013a 15#include "log.h"
00dffa8c 16#include "frr_pthread.h"
de75223e 17#include "lib_errors.h"
718e3744 18
bf8d3d6a
DL
19DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream");
20DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO");
4a1ab8e4 21
/* Position checks: a read position (and a put-at position) may not pass
 * endp, and endp may not pass the buffer size.
 */
#define GETP_VALID(S, G) ((S)->endp >= (G))
#define PUT_AT_VALID(S, G) GETP_VALID(S, G)
#define ENDP_VALID(S, E) ((S)->size >= (E))
718e3744 26
/* Sanity checks asserted by the stream functions.
 *
 * The following must always be true of the stream offsets, both before
 * and after calls to stream functions:
 *
 * getp <= endp <= size
 *
 * Note that after a stream function is called the following may be true:
 * if (getp == endp) then the stream is no longer readable
 * if (endp == size) then the stream is no longer writeable
 *
 * It is valid to put to anywhere within the size of the stream, but only
 * using stream_put..._at() functions.
 */
/* Emit a warning carrying the stream's address together with its size,
 * getp and endp offsets, then log a backtrace at warning level.
 */
#define STREAM_WARN_OFFSETS(S) \
	do { \
		flog_warn( \
			EC_LIB_STREAM, \
			"&(struct stream): %p, size: %lu, getp: %lu, endp: %lu", \
			(void *)(S), \
			(unsigned long)(S)->size, \
			(unsigned long)(S)->getp, \
			(unsigned long)(S)->endp); \
		zlog_backtrace(LOG_WARNING); \
	} while (0)
d62a17ae 50
/* Check the getp <= endp <= size invariant. When either half is violated
 * the offsets are logged first, then the matching assert() fires.
 */
#define STREAM_VERIFY_SANE(S) \
	do { \
		if (!GETP_VALID(S, (S)->getp) || !ENDP_VALID(S, (S)->endp)) \
			STREAM_WARN_OFFSETS(S); \
		assert(GETP_VALID(S, (S)->getp)); \
		assert(ENDP_VALID(S, (S)->endp)); \
	} while (0)
59
996c9314
LB
/* Report an out-of-bounds access attempt (same logging as
 * STREAM_BOUND_WARN2) and then trip assert(0).
 */
#define STREAM_BOUND_WARN(S, WHAT) \
	do { \
		STREAM_BOUND_WARN2(S, WHAT); \
		assert(0); \
	} while (0)
67
996c9314
LB
/* Non-asserting bound warning: logs the calling function and the
 * attempted operation WHAT, followed by the stream offsets.
 */
#define STREAM_BOUND_WARN2(S, WHAT) \
	do { \
		flog_warn(EC_LIB_STREAM, \
			  "%s: Attempt to %s out of bounds", \
			  __func__, (WHAT)); \
		STREAM_WARN_OFFSETS(S); \
	} while (0)
050c013a 74
/* XXX: Deprecated macro: do not use.
 *
 * Clamps Z (which must be a modifiable lvalue) to the space left between
 * endp and size, logging a warning when it truncates.
 *
 * The comparison is made against the remaining space instead of computing
 * endp + Z, so a very large Z cannot wrap around and skip the clamp.
 * The expansion deliberately carries no trailing semicolon so the macro
 * behaves as a single statement (e.g. inside if/else).
 */
#define CHECK_SIZE(S, Z) \
	do { \
		if ((Z) > (S)->size - (S)->endp) { \
			flog_warn( \
				EC_LIB_STREAM, \
				"CHECK_SIZE: truncating requested size %lu", \
				(unsigned long)(Z)); \
			STREAM_WARN_OFFSETS(S); \
			(Z) = (S)->size - (S)->endp; \
		} \
	} while (0)
718e3744 87
88/* Make stream buffer. */
d62a17ae 89struct stream *stream_new(size_t size)
718e3744 90{
d62a17ae 91 struct stream *s;
92
93 assert(size > 0);
94
de75223e 95 s = XMALLOC(MTYPE_STREAM, sizeof(struct stream) + size);
d62a17ae 96
565b5561
DS
97 s->getp = s->endp = 0;
98 s->next = NULL;
d62a17ae 99 s->size = size;
100 return s;
718e3744 101}
102
103/* Free it now. */
d62a17ae 104void stream_free(struct stream *s)
718e3744 105{
d62a17ae 106 if (!s)
107 return;
108
d62a17ae 109 XFREE(MTYPE_STREAM, s);
718e3744 110}
050c013a 111
f8c511cd 112struct stream *stream_copy(struct stream *dest, const struct stream *src)
050c013a 113{
d62a17ae 114 STREAM_VERIFY_SANE(src);
115
f8c511cd
MS
116 assert(dest != NULL);
117 assert(STREAM_SIZE(dest) >= src->endp);
d62a17ae 118
f8c511cd
MS
119 dest->endp = src->endp;
120 dest->getp = src->getp;
d62a17ae 121
f8c511cd 122 memcpy(dest->data, src->data, src->endp);
d62a17ae 123
f8c511cd 124 return dest;
050c013a 125}
126
f8c511cd 127struct stream *stream_dup(const struct stream *s)
050c013a 128{
f8c511cd 129 struct stream *snew;
050c013a 130
d62a17ae 131 STREAM_VERIFY_SANE(s);
050c013a 132
ab08ef82 133 snew = stream_new(s->endp);
050c013a 134
f8c511cd 135 return (stream_copy(snew, s));
050c013a 136}
4b201d46 137
f8c511cd 138struct stream *stream_dupcat(const struct stream *s1, const struct stream *s2,
d62a17ae 139 size_t offset)
8c71e481 140{
d62a17ae 141 struct stream *new;
8c71e481 142
d62a17ae 143 STREAM_VERIFY_SANE(s1);
144 STREAM_VERIFY_SANE(s2);
8c71e481 145
d62a17ae 146 if ((new = stream_new(s1->endp + s2->endp)) == NULL)
147 return NULL;
8c71e481 148
d62a17ae 149 memcpy(new->data, s1->data, offset);
150 memcpy(new->data + offset, s2->data, s2->endp);
151 memcpy(new->data + offset + s2->endp, s1->data + offset,
152 (s1->endp - offset));
153 new->endp = s1->endp + s2->endp;
154 return new;
8c71e481
PM
155}
156
43888669
DS
157size_t stream_resize_inplace(struct stream **sptr, size_t newsize)
158{
de75223e 159 struct stream *orig = *sptr;
43888669 160
de75223e 161 STREAM_VERIFY_SANE(orig);
d62a17ae 162
de75223e 163 orig = XREALLOC(MTYPE_STREAM, orig, sizeof(struct stream) + newsize);
d62a17ae 164
de75223e 165 orig->size = newsize;
d62a17ae 166
de75223e
DS
167 if (orig->endp > orig->size)
168 orig->endp = orig->size;
169 if (orig->getp > orig->endp)
170 orig->getp = orig->endp;
d62a17ae 171
de75223e 172 STREAM_VERIFY_SANE(orig);
d62a17ae 173
de75223e
DS
174 *sptr = orig;
175 return orig->size;
176}
d62a17ae 177
f8c511cd 178size_t stream_get_getp(const struct stream *s)
718e3744 179{
d62a17ae 180 STREAM_VERIFY_SANE(s);
181 return s->getp;
718e3744 182}
183
f8c511cd 184size_t stream_get_endp(const struct stream *s)
718e3744 185{
d62a17ae 186 STREAM_VERIFY_SANE(s);
187 return s->endp;
718e3744 188}
189
f8c511cd 190size_t stream_get_size(const struct stream *s)
718e3744 191{
d62a17ae 192 STREAM_VERIFY_SANE(s);
193 return s->size;
718e3744 194}
195
196/* Stream structre' stream pointer related functions. */
d62a17ae 197void stream_set_getp(struct stream *s, size_t pos)
718e3744 198{
d62a17ae 199 STREAM_VERIFY_SANE(s);
200
201 if (!GETP_VALID(s, pos)) {
202 STREAM_BOUND_WARN(s, "set getp");
203 pos = s->endp;
204 }
205
206 s->getp = pos;
718e3744 207}
208
d62a17ae 209void stream_set_endp(struct stream *s, size_t pos)
d531050b 210{
d62a17ae 211 STREAM_VERIFY_SANE(s);
212
213 if (!ENDP_VALID(s, pos)) {
214 STREAM_BOUND_WARN(s, "set endp");
215 return;
216 }
217
218 /*
219 * Make sure the current read pointer is not beyond the new endp.
220 */
221 if (s->getp > pos) {
222 STREAM_BOUND_WARN(s, "set endp");
223 return;
224 }
225
226 s->endp = pos;
227 STREAM_VERIFY_SANE(s);
d531050b
SV
228}
229
9985f83c 230/* Forward pointer. */
d62a17ae 231void stream_forward_getp(struct stream *s, size_t size)
718e3744 232{
d62a17ae 233 STREAM_VERIFY_SANE(s);
234
235 if (!GETP_VALID(s, s->getp + size)) {
236 STREAM_BOUND_WARN(s, "seek getp");
237 return;
238 }
239
240 s->getp += size;
718e3744 241}
242
4adddf0a
QY
243bool stream_forward_getp2(struct stream *s, size_t size)
244{
245 STREAM_VERIFY_SANE(s);
246
247 if (!GETP_VALID(s, s->getp + size))
248 return false;
249
250 s->getp += size;
251
252 return true;
253}
254
06cf2c0c
QY
255void stream_rewind_getp(struct stream *s, size_t size)
256{
257 STREAM_VERIFY_SANE(s);
258
259 if (size > s->getp || !GETP_VALID(s, s->getp - size)) {
260 STREAM_BOUND_WARN(s, "rewind getp");
261 return;
262 }
263
264 s->getp -= size;
265}
266
267bool stream_rewind_getp2(struct stream *s, size_t size)
268{
269 STREAM_VERIFY_SANE(s);
270
271 if (size > s->getp || !GETP_VALID(s, s->getp - size))
272 return false;
273
274 s->getp -= size;
275
276 return true;
277}
278
d62a17ae 279void stream_forward_endp(struct stream *s, size_t size)
718e3744 280{
d62a17ae 281 STREAM_VERIFY_SANE(s);
282
283 if (!ENDP_VALID(s, s->endp + size)) {
284 STREAM_BOUND_WARN(s, "seek endp");
285 return;
286 }
287
288 s->endp += size;
718e3744 289}
6b0655a2 290
4adddf0a
QY
291bool stream_forward_endp2(struct stream *s, size_t size)
292{
293 STREAM_VERIFY_SANE(s);
294
295 if (!ENDP_VALID(s, s->endp + size))
296 return false;
297
298 s->endp += size;
299
300 return true;
301}
302
718e3744 303/* Copy from stream to destination. */
3009394b 304bool stream_get2(void *dst, struct stream *s, size_t size)
051cc28c
DS
305{
306 STREAM_VERIFY_SANE(s);
307
308 if (STREAM_READABLE(s) < size) {
309 STREAM_BOUND_WARN2(s, "get");
310 return false;
311 }
312
313 memcpy(dst, s->data + s->getp, size);
314 s->getp += size;
315
316 return true;
317}
318
d62a17ae 319void stream_get(void *dst, struct stream *s, size_t size)
718e3744 320{
d62a17ae 321 STREAM_VERIFY_SANE(s);
322
323 if (STREAM_READABLE(s) < size) {
324 STREAM_BOUND_WARN(s, "get");
325 return;
326 }
327
328 memcpy(dst, s->data + s->getp, size);
329 s->getp += size;
718e3744 330}
331
332/* Get next character from the stream. */
3009394b 333bool stream_getc2(struct stream *s, uint8_t *byte)
051cc28c
DS
334{
335 STREAM_VERIFY_SANE(s);
336
d7c0a89a 337 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
051cc28c
DS
338 STREAM_BOUND_WARN2(s, "get char");
339 return false;
340 }
341 *byte = s->data[s->getp++];
342
343 return true;
344}
345
d7c0a89a 346uint8_t stream_getc(struct stream *s)
718e3744 347{
d7c0a89a 348 uint8_t c;
d62a17ae 349
350 STREAM_VERIFY_SANE(s);
351
d7c0a89a 352 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
d62a17ae 353 STREAM_BOUND_WARN(s, "get char");
354 return 0;
355 }
356 c = s->data[s->getp++];
357
358 return c;
718e3744 359}
360
361/* Get next character from the stream. */
d7c0a89a 362uint8_t stream_getc_from(struct stream *s, size_t from)
718e3744 363{
d7c0a89a 364 uint8_t c;
d62a17ae 365
366 STREAM_VERIFY_SANE(s);
367
d7c0a89a 368 if (!GETP_VALID(s, from + sizeof(uint8_t))) {
d62a17ae 369 STREAM_BOUND_WARN(s, "get char");
370 return 0;
371 }
372
373 c = s->data[from];
374
375 return c;
718e3744 376}
377
3009394b 378bool stream_getw2(struct stream *s, uint16_t *word)
051cc28c
DS
379{
380 STREAM_VERIFY_SANE(s);
381
382 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
383 STREAM_BOUND_WARN2(s, "get ");
384 return false;
385 }
386
996c9314 387 *word = s->data[s->getp++] << 8;
051cc28c
DS
388 *word |= s->data[s->getp++];
389
390 return true;
391}
392
718e3744 393/* Get next word from the stream. */
d7c0a89a 394uint16_t stream_getw(struct stream *s)
718e3744 395{
d7c0a89a 396 uint16_t w;
d62a17ae 397
398 STREAM_VERIFY_SANE(s);
399
d7c0a89a 400 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
d62a17ae 401 STREAM_BOUND_WARN(s, "get ");
402 return 0;
403 }
404
405 w = s->data[s->getp++] << 8;
406 w |= s->data[s->getp++];
407
408 return w;
718e3744 409}
410
411/* Get next word from the stream. */
d7c0a89a 412uint16_t stream_getw_from(struct stream *s, size_t from)
718e3744 413{
d7c0a89a 414 uint16_t w;
d62a17ae 415
416 STREAM_VERIFY_SANE(s);
417
d7c0a89a 418 if (!GETP_VALID(s, from + sizeof(uint16_t))) {
d62a17ae 419 STREAM_BOUND_WARN(s, "get ");
420 return 0;
421 }
422
423 w = s->data[from++] << 8;
424 w |= s->data[from];
425
426 return w;
718e3744 427}
428
d6f4a61d 429/* Get next 3-byte from the stream. */
d7c0a89a 430uint32_t stream_get3_from(struct stream *s, size_t from)
d6f4a61d 431{
d7c0a89a 432 uint32_t l;
d62a17ae 433
434 STREAM_VERIFY_SANE(s);
435
436 if (!GETP_VALID(s, from + 3)) {
437 STREAM_BOUND_WARN(s, "get 3byte");
438 return 0;
439 }
440
441 l = s->data[from++] << 16;
442 l |= s->data[from++] << 8;
443 l |= s->data[from];
444
445 return l;
d6f4a61d
DL
446}
447
d7c0a89a 448uint32_t stream_get3(struct stream *s)
d6f4a61d 449{
d7c0a89a 450 uint32_t l;
d62a17ae 451
452 STREAM_VERIFY_SANE(s);
453
454 if (STREAM_READABLE(s) < 3) {
455 STREAM_BOUND_WARN(s, "get 3byte");
456 return 0;
457 }
458
459 l = s->data[s->getp++] << 16;
460 l |= s->data[s->getp++] << 8;
461 l |= s->data[s->getp++];
462
463 return l;
d6f4a61d
DL
464}
465
718e3744 466/* Get next long word from the stream. */
d7c0a89a 467uint32_t stream_getl_from(struct stream *s, size_t from)
050c013a 468{
d7c0a89a 469 uint32_t l;
d62a17ae 470
471 STREAM_VERIFY_SANE(s);
472
d7c0a89a 473 if (!GETP_VALID(s, from + sizeof(uint32_t))) {
d62a17ae 474 STREAM_BOUND_WARN(s, "get long");
475 return 0;
476 }
477
937652c6 478 l = (unsigned)(s->data[from++]) << 24;
d62a17ae 479 l |= s->data[from++] << 16;
480 l |= s->data[from++] << 8;
481 l |= s->data[from];
482
483 return l;
050c013a 484}
485
3f9c7369 486/* Copy from stream at specific location to destination. */
d62a17ae 487void stream_get_from(void *dst, struct stream *s, size_t from, size_t size)
3f9c7369 488{
d62a17ae 489 STREAM_VERIFY_SANE(s);
3f9c7369 490
d62a17ae 491 if (!GETP_VALID(s, from + size)) {
492 STREAM_BOUND_WARN(s, "get from");
493 return;
494 }
3f9c7369 495
d62a17ae 496 memcpy(dst, s->data + from, size);
3f9c7369
DS
497}
498
3009394b 499bool stream_getl2(struct stream *s, uint32_t *l)
051cc28c
DS
500{
501 STREAM_VERIFY_SANE(s);
502
503 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
504 STREAM_BOUND_WARN2(s, "get long");
505 return false;
506 }
507
996c9314 508 *l = (unsigned int)(s->data[s->getp++]) << 24;
051cc28c
DS
509 *l |= s->data[s->getp++] << 16;
510 *l |= s->data[s->getp++] << 8;
511 *l |= s->data[s->getp++];
512
513 return true;
051cc28c
DS
514}
515
d7c0a89a 516uint32_t stream_getl(struct stream *s)
718e3744 517{
d7c0a89a 518 uint32_t l;
d62a17ae 519
520 STREAM_VERIFY_SANE(s);
521
d7c0a89a 522 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
d62a17ae 523 STREAM_BOUND_WARN(s, "get long");
524 return 0;
525 }
526
937652c6 527 l = (unsigned)(s->data[s->getp++]) << 24;
d62a17ae 528 l |= s->data[s->getp++] << 16;
529 l |= s->data[s->getp++] << 8;
530 l |= s->data[s->getp++];
531
532 return l;
718e3744 533}
4b201d46 534
535/* Get next quad word from the stream. */
d62a17ae 536uint64_t stream_getq_from(struct stream *s, size_t from)
4b201d46 537{
d62a17ae 538 uint64_t q;
539
540 STREAM_VERIFY_SANE(s);
541
542 if (!GETP_VALID(s, from + sizeof(uint64_t))) {
543 STREAM_BOUND_WARN(s, "get quad");
544 return 0;
545 }
546
547 q = ((uint64_t)s->data[from++]) << 56;
548 q |= ((uint64_t)s->data[from++]) << 48;
549 q |= ((uint64_t)s->data[from++]) << 40;
550 q |= ((uint64_t)s->data[from++]) << 32;
551 q |= ((uint64_t)s->data[from++]) << 24;
552 q |= ((uint64_t)s->data[from++]) << 16;
553 q |= ((uint64_t)s->data[from++]) << 8;
554 q |= ((uint64_t)s->data[from++]);
555
556 return q;
4b201d46 557}
558
d62a17ae 559uint64_t stream_getq(struct stream *s)
4b201d46 560{
d62a17ae 561 uint64_t q;
562
563 STREAM_VERIFY_SANE(s);
564
565 if (STREAM_READABLE(s) < sizeof(uint64_t)) {
566 STREAM_BOUND_WARN(s, "get quad");
567 return 0;
568 }
569
570 q = ((uint64_t)s->data[s->getp++]) << 56;
571 q |= ((uint64_t)s->data[s->getp++]) << 48;
572 q |= ((uint64_t)s->data[s->getp++]) << 40;
573 q |= ((uint64_t)s->data[s->getp++]) << 32;
574 q |= ((uint64_t)s->data[s->getp++]) << 24;
575 q |= ((uint64_t)s->data[s->getp++]) << 16;
576 q |= ((uint64_t)s->data[s->getp++]) << 8;
577 q |= ((uint64_t)s->data[s->getp++]);
578
579 return q;
4b201d46 580}
581
c2b5a4e5
QY
582bool stream_getq2(struct stream *s, uint64_t *q)
583{
584 STREAM_VERIFY_SANE(s);
585
586 if (STREAM_READABLE(s) < sizeof(uint64_t)) {
587 STREAM_BOUND_WARN2(s, "get uint64");
588 return false;
589 }
590
591 *q = ((uint64_t)s->data[s->getp++]) << 56;
592 *q |= ((uint64_t)s->data[s->getp++]) << 48;
593 *q |= ((uint64_t)s->data[s->getp++]) << 40;
594 *q |= ((uint64_t)s->data[s->getp++]) << 32;
595 *q |= ((uint64_t)s->data[s->getp++]) << 24;
596 *q |= ((uint64_t)s->data[s->getp++]) << 16;
597 *q |= ((uint64_t)s->data[s->getp++]) << 8;
598 *q |= ((uint64_t)s->data[s->getp++]);
599
600 return true;
601}
602
718e3744 603/* Get next long word from the stream. */
d7c0a89a 604uint32_t stream_get_ipv4(struct stream *s)
718e3744 605{
d7c0a89a 606 uint32_t l;
d62a17ae 607
608 STREAM_VERIFY_SANE(s);
609
d7c0a89a 610 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
d62a17ae 611 STREAM_BOUND_WARN(s, "get ipv4");
612 return 0;
613 }
614
d7c0a89a
QY
615 memcpy(&l, s->data + s->getp, sizeof(uint32_t));
616 s->getp += sizeof(uint32_t);
d62a17ae 617
618 return l;
718e3744 619}
6b0655a2 620
31f937fb
SM
621bool stream_get_ipaddr(struct stream *s, struct ipaddr *ip)
622{
bde30e78 623 uint16_t ipa_len = 0;
31f937fb
SM
624
625 STREAM_VERIFY_SANE(s);
626
627 /* Get address type. */
628 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
629 STREAM_BOUND_WARN2(s, "get ipaddr");
630 return false;
631 }
632 ip->ipa_type = stream_getw(s);
633
634 /* Get address value. */
635 switch (ip->ipa_type) {
636 case IPADDR_V4:
637 ipa_len = IPV4_MAX_BYTELEN;
638 break;
639 case IPADDR_V6:
640 ipa_len = IPV6_MAX_BYTELEN;
641 break;
bde30e78 642 case IPADDR_NONE:
31f937fb
SM
643 flog_err(EC_LIB_DEVELOPMENT,
644 "%s: unknown ip address-family: %u", __func__,
645 ip->ipa_type);
646 return false;
647 }
648 if (STREAM_READABLE(s) < ipa_len) {
649 STREAM_BOUND_WARN2(s, "get ipaddr");
650 return false;
651 }
652 memcpy(&ip->ip, s->data + s->getp, ipa_len);
653 s->getp += ipa_len;
654
655 return true;
656}
657
/* Read the next 32-bit value with stream_getl() and reinterpret its bits
 * as a float.
 */
float stream_getf(struct stream *s)
{
	union {
		float as_float;
		uint32_t as_bits;
	} conv;

	conv.as_bits = stream_getl(s);

	return conv.as_float;
}
667
/* Read the next 64-bit value with stream_getq() and reinterpret its bits
 * as a double.
 */
double stream_getd(struct stream *s)
{
	union {
		double as_double;
		uint64_t as_bits;
	} conv;

	conv.as_bits = stream_getq(s);

	return conv.as_double;
}
677
f978382d 678/* Copy from source to stream.
050c013a 679 *
680 * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
681 * around. This should be fixed once the stream updates are working.
0dab9303 682 *
683 * stream_write() is saner
050c013a 684 */
d62a17ae 685void stream_put(struct stream *s, const void *src, size_t size)
718e3744 686{
687
d62a17ae 688 /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
689 CHECK_SIZE(s, size);
690
691 STREAM_VERIFY_SANE(s);
692
693 if (STREAM_WRITEABLE(s) < size) {
694 STREAM_BOUND_WARN(s, "put");
695 return;
696 }
697
698 if (src)
699 memcpy(s->data + s->endp, src, size);
700 else
701 memset(s->data + s->endp, 0, size);
702
703 s->endp += size;
718e3744 704}
705
706/* Put character to the stream. */
d7c0a89a 707int stream_putc(struct stream *s, uint8_t c)
718e3744 708{
d62a17ae 709 STREAM_VERIFY_SANE(s);
710
d7c0a89a 711 if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) {
d62a17ae 712 STREAM_BOUND_WARN(s, "put");
713 return 0;
714 }
715
716 s->data[s->endp++] = c;
d7c0a89a 717 return sizeof(uint8_t);
718e3744 718}
719
720/* Put word to the stream. */
d7c0a89a 721int stream_putw(struct stream *s, uint16_t w)
718e3744 722{
d62a17ae 723 STREAM_VERIFY_SANE(s);
724
d7c0a89a 725 if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) {
d62a17ae 726 STREAM_BOUND_WARN(s, "put");
727 return 0;
728 }
729
d7c0a89a
QY
730 s->data[s->endp++] = (uint8_t)(w >> 8);
731 s->data[s->endp++] = (uint8_t)w;
d62a17ae 732
733 return 2;
718e3744 734}
735
d6f4a61d 736/* Put long word to the stream. */
d7c0a89a 737int stream_put3(struct stream *s, uint32_t l)
d6f4a61d 738{
d62a17ae 739 STREAM_VERIFY_SANE(s);
740
741 if (STREAM_WRITEABLE(s) < 3) {
742 STREAM_BOUND_WARN(s, "put");
743 return 0;
744 }
745
d7c0a89a
QY
746 s->data[s->endp++] = (uint8_t)(l >> 16);
747 s->data[s->endp++] = (uint8_t)(l >> 8);
748 s->data[s->endp++] = (uint8_t)l;
d62a17ae 749
750 return 3;
d6f4a61d
DL
751}
752
718e3744 753/* Put long word to the stream. */
d7c0a89a 754int stream_putl(struct stream *s, uint32_t l)
718e3744 755{
d62a17ae 756 STREAM_VERIFY_SANE(s);
757
d7c0a89a 758 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
d62a17ae 759 STREAM_BOUND_WARN(s, "put");
760 return 0;
761 }
762
d7c0a89a
QY
763 s->data[s->endp++] = (uint8_t)(l >> 24);
764 s->data[s->endp++] = (uint8_t)(l >> 16);
765 s->data[s->endp++] = (uint8_t)(l >> 8);
766 s->data[s->endp++] = (uint8_t)l;
d62a17ae 767
768 return 4;
718e3744 769}
770
4b201d46 771/* Put quad word to the stream. */
d62a17ae 772int stream_putq(struct stream *s, uint64_t q)
4b201d46 773{
d62a17ae 774 STREAM_VERIFY_SANE(s);
775
776 if (STREAM_WRITEABLE(s) < sizeof(uint64_t)) {
777 STREAM_BOUND_WARN(s, "put quad");
778 return 0;
779 }
780
d7c0a89a
QY
781 s->data[s->endp++] = (uint8_t)(q >> 56);
782 s->data[s->endp++] = (uint8_t)(q >> 48);
783 s->data[s->endp++] = (uint8_t)(q >> 40);
784 s->data[s->endp++] = (uint8_t)(q >> 32);
785 s->data[s->endp++] = (uint8_t)(q >> 24);
786 s->data[s->endp++] = (uint8_t)(q >> 16);
787 s->data[s->endp++] = (uint8_t)(q >> 8);
788 s->data[s->endp++] = (uint8_t)q;
d62a17ae 789
790 return 8;
4b201d46 791}
792
/* Append a float to the stream by writing its 32-bit representation with
 * stream_putl(). Returns whatever stream_putl() returns.
 */
int stream_putf(struct stream *s, float f)
{
	union {
		float as_float;
		uint32_t as_bits;
	} conv;

	conv.as_float = f;

	return stream_putl(s, conv.as_bits);
}
802
/* Append a double to the stream by writing its 64-bit representation
 * with stream_putq(). Returns whatever stream_putq() returns.
 */
int stream_putd(struct stream *s, double d)
{
	union {
		double as_double;
		uint64_t as_bits;
	} conv;

	conv.as_double = d;

	return stream_putq(s, conv.as_bits);
}
812
d7c0a89a 813int stream_putc_at(struct stream *s, size_t putp, uint8_t c)
718e3744 814{
d62a17ae 815 STREAM_VERIFY_SANE(s);
816
d7c0a89a 817 if (!PUT_AT_VALID(s, putp + sizeof(uint8_t))) {
d62a17ae 818 STREAM_BOUND_WARN(s, "put");
819 return 0;
820 }
821
822 s->data[putp] = c;
823
824 return 1;
718e3744 825}
826
d7c0a89a 827int stream_putw_at(struct stream *s, size_t putp, uint16_t w)
718e3744 828{
d62a17ae 829 STREAM_VERIFY_SANE(s);
830
d7c0a89a 831 if (!PUT_AT_VALID(s, putp + sizeof(uint16_t))) {
d62a17ae 832 STREAM_BOUND_WARN(s, "put");
833 return 0;
834 }
835
d7c0a89a
QY
836 s->data[putp] = (uint8_t)(w >> 8);
837 s->data[putp + 1] = (uint8_t)w;
d62a17ae 838
839 return 2;
718e3744 840}
841
d7c0a89a 842int stream_put3_at(struct stream *s, size_t putp, uint32_t l)
d6f4a61d 843{
d62a17ae 844 STREAM_VERIFY_SANE(s);
845
846 if (!PUT_AT_VALID(s, putp + 3)) {
847 STREAM_BOUND_WARN(s, "put");
848 return 0;
849 }
d7c0a89a
QY
850 s->data[putp] = (uint8_t)(l >> 16);
851 s->data[putp + 1] = (uint8_t)(l >> 8);
852 s->data[putp + 2] = (uint8_t)l;
d62a17ae 853
854 return 3;
d6f4a61d
DL
855}
856
d7c0a89a 857int stream_putl_at(struct stream *s, size_t putp, uint32_t l)
718e3744 858{
d62a17ae 859 STREAM_VERIFY_SANE(s);
860
d7c0a89a 861 if (!PUT_AT_VALID(s, putp + sizeof(uint32_t))) {
d62a17ae 862 STREAM_BOUND_WARN(s, "put");
863 return 0;
864 }
d7c0a89a
QY
865 s->data[putp] = (uint8_t)(l >> 24);
866 s->data[putp + 1] = (uint8_t)(l >> 16);
867 s->data[putp + 2] = (uint8_t)(l >> 8);
868 s->data[putp + 3] = (uint8_t)l;
d62a17ae 869
870 return 4;
718e3744 871}
872
d62a17ae 873int stream_putq_at(struct stream *s, size_t putp, uint64_t q)
4b201d46 874{
d62a17ae 875 STREAM_VERIFY_SANE(s);
876
877 if (!PUT_AT_VALID(s, putp + sizeof(uint64_t))) {
878 STREAM_BOUND_WARN(s, "put");
879 return 0;
880 }
d7c0a89a
QY
881 s->data[putp] = (uint8_t)(q >> 56);
882 s->data[putp + 1] = (uint8_t)(q >> 48);
883 s->data[putp + 2] = (uint8_t)(q >> 40);
884 s->data[putp + 3] = (uint8_t)(q >> 32);
885 s->data[putp + 4] = (uint8_t)(q >> 24);
886 s->data[putp + 5] = (uint8_t)(q >> 16);
887 s->data[putp + 6] = (uint8_t)(q >> 8);
888 s->data[putp + 7] = (uint8_t)q;
d62a17ae 889
890 return 8;
4b201d46 891}
892
718e3744 893/* Put long word to the stream. */
d7c0a89a 894int stream_put_ipv4(struct stream *s, uint32_t l)
718e3744 895{
d62a17ae 896 STREAM_VERIFY_SANE(s);
897
d7c0a89a 898 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
d62a17ae 899 STREAM_BOUND_WARN(s, "put");
900 return 0;
901 }
d7c0a89a
QY
902 memcpy(s->data + s->endp, &l, sizeof(uint32_t));
903 s->endp += sizeof(uint32_t);
d62a17ae 904
d7c0a89a 905 return sizeof(uint32_t);
718e3744 906}
907
908/* Put long word to the stream. */
d3d77ec4 909int stream_put_in_addr(struct stream *s, const struct in_addr *addr)
718e3744 910{
d62a17ae 911 STREAM_VERIFY_SANE(s);
912
d7c0a89a 913 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
d62a17ae 914 STREAM_BOUND_WARN(s, "put");
915 return 0;
916 }
917
d7c0a89a
QY
918 memcpy(s->data + s->endp, addr, sizeof(uint32_t));
919 s->endp += sizeof(uint32_t);
d62a17ae 920
d7c0a89a 921 return sizeof(uint32_t);
718e3744 922}
923
31f937fb
SM
924bool stream_put_ipaddr(struct stream *s, struct ipaddr *ip)
925{
926 stream_putw(s, ip->ipa_type);
927
928 switch (ip->ipa_type) {
929 case IPADDR_V4:
930 stream_put_in_addr(s, &ip->ipaddr_v4);
931 break;
932 case IPADDR_V6:
933 stream_write(s, (uint8_t *)&ip->ipaddr_v6, 16);
934 break;
bde30e78 935 case IPADDR_NONE:
31f937fb
SM
936 flog_err(EC_LIB_DEVELOPMENT,
937 "%s: unknown ip address-family: %u", __func__,
938 ip->ipa_type);
939 return false;
940 }
941
942 return true;
943}
944
3f9c7369 945/* Put in_addr at location in the stream. */
d3d77ec4
MS
946int stream_put_in_addr_at(struct stream *s, size_t putp,
947 const struct in_addr *addr)
3f9c7369 948{
d62a17ae 949 STREAM_VERIFY_SANE(s);
3f9c7369 950
d62a17ae 951 if (!PUT_AT_VALID(s, putp + 4)) {
952 STREAM_BOUND_WARN(s, "put");
953 return 0;
954 }
3f9c7369 955
d62a17ae 956 memcpy(&s->data[putp], addr, 4);
957 return 4;
3f9c7369
DS
958}
959
960/* Put in6_addr at location in the stream. */
d3d77ec4
MS
961int stream_put_in6_addr_at(struct stream *s, size_t putp,
962 const struct in6_addr *addr)
3f9c7369 963{
d62a17ae 964 STREAM_VERIFY_SANE(s);
3f9c7369 965
d62a17ae 966 if (!PUT_AT_VALID(s, putp + 16)) {
967 STREAM_BOUND_WARN(s, "put");
968 return 0;
969 }
3f9c7369 970
d62a17ae 971 memcpy(&s->data[putp], addr, 16);
972 return 16;
3f9c7369
DS
973}
974
718e3744 975/* Put prefix by nlri type format. */
d3d77ec4 976int stream_put_prefix_addpath(struct stream *s, const struct prefix *p,
be92fc9f 977 bool addpath_capable, uint32_t addpath_tx_id)
718e3744 978{
d62a17ae 979 size_t psize;
980 size_t psize_with_addpath;
981
982 STREAM_VERIFY_SANE(s);
983
984 psize = PSIZE(p->prefixlen);
985
be92fc9f 986 if (addpath_capable)
d62a17ae 987 psize_with_addpath = psize + 4;
988 else
989 psize_with_addpath = psize;
990
d7c0a89a 991 if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) {
d62a17ae 992 STREAM_BOUND_WARN(s, "put");
993 return 0;
994 }
995
be92fc9f 996 if (addpath_capable) {
d7c0a89a
QY
997 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
998 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
999 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
1000 s->data[s->endp++] = (uint8_t)addpath_tx_id;
d62a17ae 1001 }
1002
1003 s->data[s->endp++] = p->prefixlen;
1004 memcpy(s->data + s->endp, &p->u.prefix, psize);
1005 s->endp += psize;
1006
1007 return psize;
718e3744 1008}
6b0655a2 1009
/* Append a prefix in NLRI form without an addpath id; see
 * stream_put_prefix_addpath() for the encoding and return value.
 */
int stream_put_prefix(struct stream *s, const struct prefix *p)
{
	const bool addpath_capable = false;
	const uint32_t addpath_tx_id = 0;

	return stream_put_prefix_addpath(s, p, addpath_capable, addpath_tx_id);
}
1014
cd1964ff 1015/* Put NLRI with label */
5f040085 1016int stream_put_labeled_prefix(struct stream *s, const struct prefix *p,
be92fc9f 1017 mpls_label_t *label, bool addpath_capable,
ec15e1b5 1018 uint32_t addpath_tx_id)
cd1964ff 1019{
d62a17ae 1020 size_t psize;
ec15e1b5 1021 size_t psize_with_addpath;
d7c0a89a 1022 uint8_t *label_pnt = (uint8_t *)label;
cd1964ff 1023
d62a17ae 1024 STREAM_VERIFY_SANE(s);
cd1964ff 1025
d62a17ae 1026 psize = PSIZE(p->prefixlen);
be92fc9f 1027 psize_with_addpath = psize + (addpath_capable ? 4 : 0);
cd1964ff 1028
ec15e1b5 1029 if (STREAM_WRITEABLE(s) < (psize_with_addpath + 3)) {
d62a17ae 1030 STREAM_BOUND_WARN(s, "put");
1031 return 0;
1032 }
cd1964ff 1033
be92fc9f 1034 if (addpath_capable) {
ec15e1b5
QY
1035 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
1036 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
1037 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
1038 s->data[s->endp++] = (uint8_t)addpath_tx_id;
1039 }
1040
d62a17ae 1041 stream_putc(s, (p->prefixlen + 24));
1042 stream_putc(s, label_pnt[0]);
1043 stream_putc(s, label_pnt[1]);
1044 stream_putc(s, label_pnt[2]);
1045 memcpy(s->data + s->endp, &p->u.prefix, psize);
1046 s->endp += psize;
cd1964ff 1047
d62a17ae 1048 return (psize + 3);
cd1964ff 1049}
adbac85e 1050
718e3744 1051/* Read size from fd. */
d62a17ae 1052int stream_read(struct stream *s, int fd, size_t size)
718e3744 1053{
d62a17ae 1054 int nbytes;
1055
1056 STREAM_VERIFY_SANE(s);
1057
1058 if (STREAM_WRITEABLE(s) < size) {
1059 STREAM_BOUND_WARN(s, "put");
1060 return 0;
1061 }
1062
1063 nbytes = readn(fd, s->data + s->endp, size);
1064
1065 if (nbytes > 0)
1066 s->endp += nbytes;
1067
1068 return nbytes;
718e3744 1069}
1070
d62a17ae 1071ssize_t stream_read_try(struct stream *s, int fd, size_t size)
262feb1a 1072{
d62a17ae 1073 ssize_t nbytes;
1074
1075 STREAM_VERIFY_SANE(s);
1076
1077 if (STREAM_WRITEABLE(s) < size) {
1078 STREAM_BOUND_WARN(s, "put");
1079 /* Fatal (not transient) error, since retrying will not help
1080 (stream is too small to contain the desired data). */
1081 return -1;
1082 }
1083
0e2d7076
DA
1084 nbytes = read(fd, s->data + s->endp, size);
1085 if (nbytes >= 0) {
d62a17ae 1086 s->endp += nbytes;
1087 return nbytes;
1088 }
1089 /* Error: was it transient (return -2) or fatal (return -1)? */
1090 if (ERRNO_IO_RETRY(errno))
1091 return -2;
450971aa 1092 flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
a2b0f8b8 1093 safe_strerror(errno));
d62a17ae 1094 return -1;
262feb1a 1095}
1096
0dab9303 1097/* Read up to size bytes into the stream from the fd, using recvmsgfrom
1098 * whose arguments match the remaining arguments to this function
1099 */
d62a17ae 1100ssize_t stream_recvfrom(struct stream *s, int fd, size_t size, int flags,
1101 struct sockaddr *from, socklen_t *fromlen)
0dab9303 1102{
d62a17ae 1103 ssize_t nbytes;
1104
1105 STREAM_VERIFY_SANE(s);
1106
1107 if (STREAM_WRITEABLE(s) < size) {
1108 STREAM_BOUND_WARN(s, "put");
1109 /* Fatal (not transient) error, since retrying will not help
1110 (stream is too small to contain the desired data). */
1111 return -1;
1112 }
1113
0e2d7076
DA
1114 nbytes = recvfrom(fd, s->data + s->endp, size, flags, from, fromlen);
1115 if (nbytes >= 0) {
d62a17ae 1116 s->endp += nbytes;
1117 return nbytes;
1118 }
1119 /* Error: was it transient (return -2) or fatal (return -1)? */
1120 if (ERRNO_IO_RETRY(errno))
1121 return -2;
450971aa 1122 flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
a2b0f8b8 1123 safe_strerror(errno));
d62a17ae 1124 return -1;
0dab9303 1125}
1126
050c013a 1127/* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
1128 * from endp.
1129 * First iovec will be used to receive the data.
1130 * Stream need not be empty.
1131 */
d62a17ae 1132ssize_t stream_recvmsg(struct stream *s, int fd, struct msghdr *msgh, int flags,
1133 size_t size)
050c013a 1134{
d62a17ae 1135 int nbytes;
1136 struct iovec *iov;
1137
1138 STREAM_VERIFY_SANE(s);
1139 assert(msgh->msg_iovlen > 0);
1140
1141 if (STREAM_WRITEABLE(s) < size) {
1142 STREAM_BOUND_WARN(s, "put");
1143 /* This is a logic error in the calling code: the stream is too
1144 small
1145 to hold the desired data! */
1146 return -1;
1147 }
1148
1149 iov = &(msgh->msg_iov[0]);
1150 iov->iov_base = (s->data + s->endp);
1151 iov->iov_len = size;
1152
1153 nbytes = recvmsg(fd, msgh, flags);
1154
1155 if (nbytes > 0)
1156 s->endp += nbytes;
1157
1158 return nbytes;
050c013a 1159}
d62a17ae 1160
718e3744 1161/* Write data to buffer. */
d62a17ae 1162size_t stream_write(struct stream *s, const void *ptr, size_t size)
718e3744 1163{
1164
d62a17ae 1165 CHECK_SIZE(s, size);
1166
1167 STREAM_VERIFY_SANE(s);
1168
1169 if (STREAM_WRITEABLE(s) < size) {
1170 STREAM_BOUND_WARN(s, "put");
1171 return 0;
1172 }
718e3744 1173
d62a17ae 1174 memcpy(s->data + s->endp, ptr, size);
1175 s->endp += size;
9985f83c 1176
d62a17ae 1177 return size;
718e3744 1178}
1179
d62a17ae 1180/* Return current read pointer.
050c013a 1181 * DEPRECATED!
1182 * Use stream_get_pnt_to if you must, but decoding streams properly
1183 * is preferred
1184 */
d7c0a89a 1185uint8_t *stream_pnt(struct stream *s)
718e3744 1186{
d62a17ae 1187 STREAM_VERIFY_SANE(s);
1188 return s->data + s->getp;
718e3744 1189}
1190
1191/* Check does this stream empty? */
d62a17ae 1192int stream_empty(struct stream *s)
718e3744 1193{
d62a17ae 1194 STREAM_VERIFY_SANE(s);
050c013a 1195
d62a17ae 1196 return (s->endp == 0);
718e3744 1197}
1198
1199/* Reset stream. */
d62a17ae 1200void stream_reset(struct stream *s)
718e3744 1201{
d62a17ae 1202 STREAM_VERIFY_SANE(s);
050c013a 1203
d62a17ae 1204 s->getp = s->endp = 0;
718e3744 1205}
1206
1207/* Write stream contens to the file discriptor. */
d62a17ae 1208int stream_flush(struct stream *s, int fd)
718e3744 1209{
d62a17ae 1210 int nbytes;
1211
1212 STREAM_VERIFY_SANE(s);
1213
1214 nbytes = write(fd, s->data + s->getp, s->endp - s->getp);
1215
1216 return nbytes;
718e3744 1217}
6b0655a2 1218
/*
 * Hex-dump the stream through zlog_hexdump().
 *
 * The dump covers the buffer from its start up to endp, so bytes before
 * getp are included as well.
 */
void stream_hexdump(const struct stream *s)
{
	zlog_hexdump(s->data, s->endp);
}
1223
718e3744 1224/* Stream first in first out queue. */
1225
d62a17ae 1226struct stream_fifo *stream_fifo_new(void)
718e3744 1227{
d62a17ae 1228 struct stream_fifo *new;
1229
f8c511cd
MS
1230 new = XMALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo));
1231 stream_fifo_init(new);
d62a17ae 1232 return new;
718e3744 1233}
1234
f8c511cd
MS
1235void stream_fifo_init(struct stream_fifo *fifo)
1236{
1237 memset(fifo, 0, sizeof(struct stream_fifo));
1238 pthread_mutex_init(&fifo->mtx, NULL);
1239}
1240
718e3744 1241/* Add new stream to fifo. */
d62a17ae 1242void stream_fifo_push(struct stream_fifo *fifo, struct stream *s)
718e3744 1243{
03ed85a6
DS
1244#if defined DEV_BUILD
1245 size_t max, curmax;
1246#endif
1247
d62a17ae 1248 if (fifo->tail)
1249 fifo->tail->next = s;
1250 else
1251 fifo->head = s;
1252
1253 fifo->tail = s;
08a0e54e 1254 fifo->tail->next = NULL;
03ed85a6 1255#if !defined DEV_BUILD
363e24c6 1256 atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
03ed85a6
DS
1257#else
1258 max = atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1259 curmax = atomic_load_explicit(&fifo->max_count, memory_order_relaxed);
1260 if (max > curmax)
1261 atomic_store_explicit(&fifo->max_count, max,
1262 memory_order_relaxed);
1263#endif
363e24c6
QY
1264}
1265
/* Same as stream_fifo_push(), but performed inside frr_with_mutex on
 * fifo->mtx.
 */
void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s)
{
	frr_with_mutex (&fifo->mtx) {
		stream_fifo_push(fifo, s);
	}
}
1272
1273/* Delete first stream from fifo. */
d62a17ae 1274struct stream *stream_fifo_pop(struct stream_fifo *fifo)
718e3744 1275{
d62a17ae 1276 struct stream *s;
1277
1278 s = fifo->head;
718e3744 1279
d62a17ae 1280 if (s) {
1281 fifo->head = s->next;
718e3744 1282
d62a17ae 1283 if (fifo->head == NULL)
1284 fifo->tail = NULL;
718e3744 1285
363e24c6
QY
1286 atomic_fetch_sub_explicit(&fifo->count, 1,
1287 memory_order_release);
08a0e54e
QY
1288
1289 /* ensure stream is scrubbed of references to this fifo */
1290 s->next = NULL;
d62a17ae 1291 }
718e3744 1292
d62a17ae 1293 return s;
718e3744 1294}
1295
363e24c6
QY
1296struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo)
1297{
1298 struct stream *ret;
1299
cb1991af 1300 frr_with_mutex (&fifo->mtx) {
363e24c6
QY
1301 ret = stream_fifo_pop(fifo);
1302 }
363e24c6
QY
1303
1304 return ret;
1305}
1306
/* Return the first stream of the fifo without removing it (the stored
 * head pointer is returned as is).  fifo->mtx is not taken here.
 */
struct stream *stream_fifo_head(struct stream_fifo *fifo)
{
	return fifo->head;
}
1311
363e24c6
QY
1312struct stream *stream_fifo_head_safe(struct stream_fifo *fifo)
1313{
1314 struct stream *ret;
1315
cb1991af 1316 frr_with_mutex (&fifo->mtx) {
363e24c6
QY
1317 ret = stream_fifo_head(fifo);
1318 }
363e24c6
QY
1319
1320 return ret;
1321}
1322
d62a17ae 1323void stream_fifo_clean(struct stream_fifo *fifo)
718e3744 1324{
d62a17ae 1325 struct stream *s;
1326 struct stream *next;
1327
1328 for (s = fifo->head; s; s = next) {
1329 next = s->next;
1330 stream_free(s);
1331 }
1332 fifo->head = fifo->tail = NULL;
363e24c6
QY
1333 atomic_store_explicit(&fifo->count, 0, memory_order_release);
1334}
1335
/* Same as stream_fifo_clean(), but performed inside frr_with_mutex on
 * fifo->mtx.
 */
void stream_fifo_clean_safe(struct stream_fifo *fifo)
{
	frr_with_mutex (&fifo->mtx) {
		stream_fifo_clean(fifo);
	}
}
1342
1343size_t stream_fifo_count_safe(struct stream_fifo *fifo)
1344{
1345 return atomic_load_explicit(&fifo->count, memory_order_acquire);
718e3744 1346}
1347
/*
 * Tear down a fifo without freeing the fifo structure itself: every
 * queued stream is released via stream_fifo_clean(), then the mutex is
 * destroyed.
 */
void stream_fifo_deinit(struct stream_fifo *fifo)
{
	stream_fifo_clean(fifo);
	pthread_mutex_destroy(&fifo->mtx);
}
1353
/* De-initialise the fifo with stream_fifo_deinit() and then release the
 * fifo structure itself with XFREE (MTYPE_STREAM_FIFO).
 */
void stream_fifo_free(struct stream_fifo *fifo)
{
	stream_fifo_deinit(fifo);
	XFREE(MTYPE_STREAM_FIFO, fifo);
}
91804f63
RZ
1359
1360void stream_pulldown(struct stream *s)
1361{
1362 size_t rlen = STREAM_READABLE(s);
1363
1364 /* No more data, so just move the pointers. */
1365 if (rlen == 0) {
1366 stream_reset(s);
1367 return;
1368 }
1369
1370 /* Move the available data to the beginning. */
1371 memmove(s->data, &s->data[s->getp], rlen);
1372 s->getp = 0;
1373 s->endp = rlen;
1374}