]> git.proxmox.com Git - mirror_frr.git/blame - lib/stream.c
Merge pull request #8036 from qlyoung/disable-mallinfo
[mirror_frr.git] / lib / stream.c
CommitLineData
896014f4 1/*
718e3744 2 * Packet interface
3 * Copyright (C) 1999 Kunihiro Ishiguro
4 *
5 * This file is part of GNU Zebra.
6 *
7 * GNU Zebra is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2, or (at your option) any
10 * later version.
11 *
12 * GNU Zebra is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
896014f4
DL
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
718e3744 20 */
21
22#include <zebra.h>
7a2fbbf0 23#include <stddef.h>
363e24c6 24#include <pthread.h>
718e3744 25
26#include "stream.h"
27#include "memory.h"
28#include "network.h"
29#include "prefix.h"
050c013a 30#include "log.h"
00dffa8c 31#include "frr_pthread.h"
de75223e 32#include "lib_errors.h"
718e3744 33
d62a17ae 34DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream")
4a1ab8e4
DL
35DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO")
36
/*
 * Offset validity predicates.
 *
 * GETP_VALID:   G does not lie beyond the end of the written data (endp).
 * PUT_AT_VALID: same bound as GETP_VALID.
 * ENDP_VALID:   E does not lie beyond the allocated size of the stream.
 */
#define GETP_VALID(S, G) ((G) <= (S)->endp)
#define PUT_AT_VALID(S, G) GETP_VALID(S, G)
#define ENDP_VALID(S, E) ((E) <= (S)->size)
718e3744 41
050c013a 42/* Asserting sanity checks.
43 *
44 * The following invariant must always hold for the offsets of a
45 * stream, both before a stream function is called and after it
46 * returns:
47 *
48 * getp <= endp <= size
49 *
50 * Note that after a stream function is called following may be true:
51 * if (getp == endp) then stream is no longer readable
52 * if (endp == size) then stream is no longer writeable
53 *
54 * The stream_put..._at() functions may overwrite any position that lies
55 * within the data already written (i.e. below endp).
56 */
/*
 * Log the address, size, getp and endp of stream S as a warning, followed
 * by a backtrace at LOG_WARNING level.
 */
#define STREAM_WARN_OFFSETS(S) \
	do { \
		flog_warn( \
			EC_LIB_STREAM, \
			"&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n", \
			(void *)(S), \
			(unsigned long)(S)->size, \
			(unsigned long)(S)->getp, \
			(unsigned long)(S)->endp); \
		zlog_backtrace(LOG_WARNING); \
	} while (0)
d62a17ae 65
/*
 * Check the getp <= endp <= size invariant of stream S.  If it is violated
 * the offsets are logged first, then the corresponding assert() fires.
 */
#define STREAM_VERIFY_SANE(S) \
	do { \
		if (!GETP_VALID(S, (S)->getp) || !ENDP_VALID(S, (S)->endp)) \
			STREAM_WARN_OFFSETS(S); \
		assert(GETP_VALID(S, (S)->getp)); \
		assert(ENDP_VALID(S, (S)->endp)); \
	} while (0)
74
996c9314
LB
/*
 * Report an out-of-bounds attempt (described by the string WHAT) on stream
 * S, dump the stream offsets and then fail an assertion.
 */
#define STREAM_BOUND_WARN(S, WHAT) \
	do { \
		flog_warn(EC_LIB_STREAM, \
			  "%s: Attempt to %s out of bounds", __func__, \
			  (WHAT)); \
		STREAM_WARN_OFFSETS(S); \
		assert(0); \
	} while (0)
82
996c9314
LB
/*
 * Report an out-of-bounds attempt (described by the string WHAT) on stream
 * S and dump the stream offsets.  Unlike STREAM_BOUND_WARN this does not
 * assert, so the caller can report the failure through its return value.
 */
#define STREAM_BOUND_WARN2(S, WHAT) \
	do { \
		flog_warn(EC_LIB_STREAM, \
			  "%s: Attempt to %s out of bounds", __func__, \
			  (WHAT)); \
		STREAM_WARN_OFFSETS(S); \
	} while (0)
050c013a 89
/*
 * XXX: Deprecated macro: do not use.
 *
 * If writing Z bytes at endp would exceed the size of stream S, log a
 * warning and truncate Z (which must be a modifiable lvalue) to the space
 * that remains.
 *
 * The expansion deliberately carries no trailing semicolon so that the
 * macro behaves like a single statement, e.g. inside if/else.
 */
#define CHECK_SIZE(S, Z) \
	do { \
		if (((S)->endp + (Z)) > (S)->size) { \
			flog_warn( \
				EC_LIB_STREAM, \
				"CHECK_SIZE: truncating requested size %lu\n", \
				(unsigned long)(Z)); \
			STREAM_WARN_OFFSETS(S); \
			(Z) = (S)->size - (S)->endp; \
		} \
	} while (0)
718e3744 102
103/* Make stream buffer. */
d62a17ae 104struct stream *stream_new(size_t size)
718e3744 105{
d62a17ae 106 struct stream *s;
107
108 assert(size > 0);
109
de75223e 110 s = XMALLOC(MTYPE_STREAM, sizeof(struct stream) + size);
d62a17ae 111
565b5561
DS
112 s->getp = s->endp = 0;
113 s->next = NULL;
d62a17ae 114 s->size = size;
115 return s;
718e3744 116}
117
118/* Free it now. */
d62a17ae 119void stream_free(struct stream *s)
718e3744 120{
d62a17ae 121 if (!s)
122 return;
123
d62a17ae 124 XFREE(MTYPE_STREAM, s);
718e3744 125}
050c013a 126
f8c511cd 127struct stream *stream_copy(struct stream *dest, const struct stream *src)
050c013a 128{
d62a17ae 129 STREAM_VERIFY_SANE(src);
130
f8c511cd
MS
131 assert(dest != NULL);
132 assert(STREAM_SIZE(dest) >= src->endp);
d62a17ae 133
f8c511cd
MS
134 dest->endp = src->endp;
135 dest->getp = src->getp;
d62a17ae 136
f8c511cd 137 memcpy(dest->data, src->data, src->endp);
d62a17ae 138
f8c511cd 139 return dest;
050c013a 140}
141
f8c511cd 142struct stream *stream_dup(const struct stream *s)
050c013a 143{
f8c511cd 144 struct stream *snew;
050c013a 145
d62a17ae 146 STREAM_VERIFY_SANE(s);
050c013a 147
f8c511cd 148 if ((snew = stream_new(s->endp)) == NULL)
d62a17ae 149 return NULL;
050c013a 150
f8c511cd 151 return (stream_copy(snew, s));
050c013a 152}
4b201d46 153
f8c511cd 154struct stream *stream_dupcat(const struct stream *s1, const struct stream *s2,
d62a17ae 155 size_t offset)
8c71e481 156{
d62a17ae 157 struct stream *new;
8c71e481 158
d62a17ae 159 STREAM_VERIFY_SANE(s1);
160 STREAM_VERIFY_SANE(s2);
8c71e481 161
d62a17ae 162 if ((new = stream_new(s1->endp + s2->endp)) == NULL)
163 return NULL;
8c71e481 164
d62a17ae 165 memcpy(new->data, s1->data, offset);
166 memcpy(new->data + offset, s2->data, s2->endp);
167 memcpy(new->data + offset + s2->endp, s1->data + offset,
168 (s1->endp - offset));
169 new->endp = s1->endp + s2->endp;
170 return new;
8c71e481
PM
171}
172
43888669
DS
173size_t stream_resize_inplace(struct stream **sptr, size_t newsize)
174{
de75223e 175 struct stream *orig = *sptr;
43888669 176
de75223e 177 STREAM_VERIFY_SANE(orig);
d62a17ae 178
de75223e 179 orig = XREALLOC(MTYPE_STREAM, orig, sizeof(struct stream) + newsize);
d62a17ae 180
de75223e 181 orig->size = newsize;
d62a17ae 182
de75223e
DS
183 if (orig->endp > orig->size)
184 orig->endp = orig->size;
185 if (orig->getp > orig->endp)
186 orig->getp = orig->endp;
d62a17ae 187
de75223e 188 STREAM_VERIFY_SANE(orig);
d62a17ae 189
de75223e
DS
190 *sptr = orig;
191 return orig->size;
192}
d62a17ae 193
f8c511cd 194size_t stream_get_getp(const struct stream *s)
718e3744 195{
d62a17ae 196 STREAM_VERIFY_SANE(s);
197 return s->getp;
718e3744 198}
199
f8c511cd 200size_t stream_get_endp(const struct stream *s)
718e3744 201{
d62a17ae 202 STREAM_VERIFY_SANE(s);
203 return s->endp;
718e3744 204}
205
f8c511cd 206size_t stream_get_size(const struct stream *s)
718e3744 207{
d62a17ae 208 STREAM_VERIFY_SANE(s);
209 return s->size;
718e3744 210}
211
212/* Stream structre' stream pointer related functions. */
d62a17ae 213void stream_set_getp(struct stream *s, size_t pos)
718e3744 214{
d62a17ae 215 STREAM_VERIFY_SANE(s);
216
217 if (!GETP_VALID(s, pos)) {
218 STREAM_BOUND_WARN(s, "set getp");
219 pos = s->endp;
220 }
221
222 s->getp = pos;
718e3744 223}
224
d62a17ae 225void stream_set_endp(struct stream *s, size_t pos)
d531050b 226{
d62a17ae 227 STREAM_VERIFY_SANE(s);
228
229 if (!ENDP_VALID(s, pos)) {
230 STREAM_BOUND_WARN(s, "set endp");
231 return;
232 }
233
234 /*
235 * Make sure the current read pointer is not beyond the new endp.
236 */
237 if (s->getp > pos) {
238 STREAM_BOUND_WARN(s, "set endp");
239 return;
240 }
241
242 s->endp = pos;
243 STREAM_VERIFY_SANE(s);
d531050b
SV
244}
245
9985f83c 246/* Forward pointer. */
d62a17ae 247void stream_forward_getp(struct stream *s, size_t size)
718e3744 248{
d62a17ae 249 STREAM_VERIFY_SANE(s);
250
251 if (!GETP_VALID(s, s->getp + size)) {
252 STREAM_BOUND_WARN(s, "seek getp");
253 return;
254 }
255
256 s->getp += size;
718e3744 257}
258
4adddf0a
QY
259bool stream_forward_getp2(struct stream *s, size_t size)
260{
261 STREAM_VERIFY_SANE(s);
262
263 if (!GETP_VALID(s, s->getp + size))
264 return false;
265
266 s->getp += size;
267
268 return true;
269}
270
06cf2c0c
QY
271void stream_rewind_getp(struct stream *s, size_t size)
272{
273 STREAM_VERIFY_SANE(s);
274
275 if (size > s->getp || !GETP_VALID(s, s->getp - size)) {
276 STREAM_BOUND_WARN(s, "rewind getp");
277 return;
278 }
279
280 s->getp -= size;
281}
282
283bool stream_rewind_getp2(struct stream *s, size_t size)
284{
285 STREAM_VERIFY_SANE(s);
286
287 if (size > s->getp || !GETP_VALID(s, s->getp - size))
288 return false;
289
290 s->getp -= size;
291
292 return true;
293}
294
d62a17ae 295void stream_forward_endp(struct stream *s, size_t size)
718e3744 296{
d62a17ae 297 STREAM_VERIFY_SANE(s);
298
299 if (!ENDP_VALID(s, s->endp + size)) {
300 STREAM_BOUND_WARN(s, "seek endp");
301 return;
302 }
303
304 s->endp += size;
718e3744 305}
6b0655a2 306
4adddf0a
QY
307bool stream_forward_endp2(struct stream *s, size_t size)
308{
309 STREAM_VERIFY_SANE(s);
310
311 if (!ENDP_VALID(s, s->endp + size))
312 return false;
313
314 s->endp += size;
315
316 return true;
317}
318
718e3744 319/* Copy from stream to destination. */
3009394b 320bool stream_get2(void *dst, struct stream *s, size_t size)
051cc28c
DS
321{
322 STREAM_VERIFY_SANE(s);
323
324 if (STREAM_READABLE(s) < size) {
325 STREAM_BOUND_WARN2(s, "get");
326 return false;
327 }
328
329 memcpy(dst, s->data + s->getp, size);
330 s->getp += size;
331
332 return true;
333}
334
d62a17ae 335void stream_get(void *dst, struct stream *s, size_t size)
718e3744 336{
d62a17ae 337 STREAM_VERIFY_SANE(s);
338
339 if (STREAM_READABLE(s) < size) {
340 STREAM_BOUND_WARN(s, "get");
341 return;
342 }
343
344 memcpy(dst, s->data + s->getp, size);
345 s->getp += size;
718e3744 346}
347
348/* Get next character from the stream. */
3009394b 349bool stream_getc2(struct stream *s, uint8_t *byte)
051cc28c
DS
350{
351 STREAM_VERIFY_SANE(s);
352
d7c0a89a 353 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
051cc28c
DS
354 STREAM_BOUND_WARN2(s, "get char");
355 return false;
356 }
357 *byte = s->data[s->getp++];
358
359 return true;
360}
361
d7c0a89a 362uint8_t stream_getc(struct stream *s)
718e3744 363{
d7c0a89a 364 uint8_t c;
d62a17ae 365
366 STREAM_VERIFY_SANE(s);
367
d7c0a89a 368 if (STREAM_READABLE(s) < sizeof(uint8_t)) {
d62a17ae 369 STREAM_BOUND_WARN(s, "get char");
370 return 0;
371 }
372 c = s->data[s->getp++];
373
374 return c;
718e3744 375}
376
377/* Get next character from the stream. */
d7c0a89a 378uint8_t stream_getc_from(struct stream *s, size_t from)
718e3744 379{
d7c0a89a 380 uint8_t c;
d62a17ae 381
382 STREAM_VERIFY_SANE(s);
383
d7c0a89a 384 if (!GETP_VALID(s, from + sizeof(uint8_t))) {
d62a17ae 385 STREAM_BOUND_WARN(s, "get char");
386 return 0;
387 }
388
389 c = s->data[from];
390
391 return c;
718e3744 392}
393
3009394b 394bool stream_getw2(struct stream *s, uint16_t *word)
051cc28c
DS
395{
396 STREAM_VERIFY_SANE(s);
397
398 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
399 STREAM_BOUND_WARN2(s, "get ");
400 return false;
401 }
402
996c9314 403 *word = s->data[s->getp++] << 8;
051cc28c
DS
404 *word |= s->data[s->getp++];
405
406 return true;
407}
408
718e3744 409/* Get next word from the stream. */
d7c0a89a 410uint16_t stream_getw(struct stream *s)
718e3744 411{
d7c0a89a 412 uint16_t w;
d62a17ae 413
414 STREAM_VERIFY_SANE(s);
415
d7c0a89a 416 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
d62a17ae 417 STREAM_BOUND_WARN(s, "get ");
418 return 0;
419 }
420
421 w = s->data[s->getp++] << 8;
422 w |= s->data[s->getp++];
423
424 return w;
718e3744 425}
426
427/* Get next word from the stream. */
d7c0a89a 428uint16_t stream_getw_from(struct stream *s, size_t from)
718e3744 429{
d7c0a89a 430 uint16_t w;
d62a17ae 431
432 STREAM_VERIFY_SANE(s);
433
d7c0a89a 434 if (!GETP_VALID(s, from + sizeof(uint16_t))) {
d62a17ae 435 STREAM_BOUND_WARN(s, "get ");
436 return 0;
437 }
438
439 w = s->data[from++] << 8;
440 w |= s->data[from];
441
442 return w;
718e3744 443}
444
d6f4a61d 445/* Get next 3-byte from the stream. */
d7c0a89a 446uint32_t stream_get3_from(struct stream *s, size_t from)
d6f4a61d 447{
d7c0a89a 448 uint32_t l;
d62a17ae 449
450 STREAM_VERIFY_SANE(s);
451
452 if (!GETP_VALID(s, from + 3)) {
453 STREAM_BOUND_WARN(s, "get 3byte");
454 return 0;
455 }
456
457 l = s->data[from++] << 16;
458 l |= s->data[from++] << 8;
459 l |= s->data[from];
460
461 return l;
d6f4a61d
DL
462}
463
d7c0a89a 464uint32_t stream_get3(struct stream *s)
d6f4a61d 465{
d7c0a89a 466 uint32_t l;
d62a17ae 467
468 STREAM_VERIFY_SANE(s);
469
470 if (STREAM_READABLE(s) < 3) {
471 STREAM_BOUND_WARN(s, "get 3byte");
472 return 0;
473 }
474
475 l = s->data[s->getp++] << 16;
476 l |= s->data[s->getp++] << 8;
477 l |= s->data[s->getp++];
478
479 return l;
d6f4a61d
DL
480}
481
718e3744 482/* Get next long word from the stream. */
d7c0a89a 483uint32_t stream_getl_from(struct stream *s, size_t from)
050c013a 484{
d7c0a89a 485 uint32_t l;
d62a17ae 486
487 STREAM_VERIFY_SANE(s);
488
d7c0a89a 489 if (!GETP_VALID(s, from + sizeof(uint32_t))) {
d62a17ae 490 STREAM_BOUND_WARN(s, "get long");
491 return 0;
492 }
493
937652c6 494 l = (unsigned)(s->data[from++]) << 24;
d62a17ae 495 l |= s->data[from++] << 16;
496 l |= s->data[from++] << 8;
497 l |= s->data[from];
498
499 return l;
050c013a 500}
501
3f9c7369 502/* Copy from stream at specific location to destination. */
d62a17ae 503void stream_get_from(void *dst, struct stream *s, size_t from, size_t size)
3f9c7369 504{
d62a17ae 505 STREAM_VERIFY_SANE(s);
3f9c7369 506
d62a17ae 507 if (!GETP_VALID(s, from + size)) {
508 STREAM_BOUND_WARN(s, "get from");
509 return;
510 }
3f9c7369 511
d62a17ae 512 memcpy(dst, s->data + from, size);
3f9c7369
DS
513}
514
3009394b 515bool stream_getl2(struct stream *s, uint32_t *l)
051cc28c
DS
516{
517 STREAM_VERIFY_SANE(s);
518
519 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
520 STREAM_BOUND_WARN2(s, "get long");
521 return false;
522 }
523
996c9314 524 *l = (unsigned int)(s->data[s->getp++]) << 24;
051cc28c
DS
525 *l |= s->data[s->getp++] << 16;
526 *l |= s->data[s->getp++] << 8;
527 *l |= s->data[s->getp++];
528
529 return true;
051cc28c
DS
530}
531
d7c0a89a 532uint32_t stream_getl(struct stream *s)
718e3744 533{
d7c0a89a 534 uint32_t l;
d62a17ae 535
536 STREAM_VERIFY_SANE(s);
537
d7c0a89a 538 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
d62a17ae 539 STREAM_BOUND_WARN(s, "get long");
540 return 0;
541 }
542
937652c6 543 l = (unsigned)(s->data[s->getp++]) << 24;
d62a17ae 544 l |= s->data[s->getp++] << 16;
545 l |= s->data[s->getp++] << 8;
546 l |= s->data[s->getp++];
547
548 return l;
718e3744 549}
4b201d46 550
551/* Get next quad word from the stream. */
d62a17ae 552uint64_t stream_getq_from(struct stream *s, size_t from)
4b201d46 553{
d62a17ae 554 uint64_t q;
555
556 STREAM_VERIFY_SANE(s);
557
558 if (!GETP_VALID(s, from + sizeof(uint64_t))) {
559 STREAM_BOUND_WARN(s, "get quad");
560 return 0;
561 }
562
563 q = ((uint64_t)s->data[from++]) << 56;
564 q |= ((uint64_t)s->data[from++]) << 48;
565 q |= ((uint64_t)s->data[from++]) << 40;
566 q |= ((uint64_t)s->data[from++]) << 32;
567 q |= ((uint64_t)s->data[from++]) << 24;
568 q |= ((uint64_t)s->data[from++]) << 16;
569 q |= ((uint64_t)s->data[from++]) << 8;
570 q |= ((uint64_t)s->data[from++]);
571
572 return q;
4b201d46 573}
574
d62a17ae 575uint64_t stream_getq(struct stream *s)
4b201d46 576{
d62a17ae 577 uint64_t q;
578
579 STREAM_VERIFY_SANE(s);
580
581 if (STREAM_READABLE(s) < sizeof(uint64_t)) {
582 STREAM_BOUND_WARN(s, "get quad");
583 return 0;
584 }
585
586 q = ((uint64_t)s->data[s->getp++]) << 56;
587 q |= ((uint64_t)s->data[s->getp++]) << 48;
588 q |= ((uint64_t)s->data[s->getp++]) << 40;
589 q |= ((uint64_t)s->data[s->getp++]) << 32;
590 q |= ((uint64_t)s->data[s->getp++]) << 24;
591 q |= ((uint64_t)s->data[s->getp++]) << 16;
592 q |= ((uint64_t)s->data[s->getp++]) << 8;
593 q |= ((uint64_t)s->data[s->getp++]);
594
595 return q;
4b201d46 596}
597
c2b5a4e5
QY
598bool stream_getq2(struct stream *s, uint64_t *q)
599{
600 STREAM_VERIFY_SANE(s);
601
602 if (STREAM_READABLE(s) < sizeof(uint64_t)) {
603 STREAM_BOUND_WARN2(s, "get uint64");
604 return false;
605 }
606
607 *q = ((uint64_t)s->data[s->getp++]) << 56;
608 *q |= ((uint64_t)s->data[s->getp++]) << 48;
609 *q |= ((uint64_t)s->data[s->getp++]) << 40;
610 *q |= ((uint64_t)s->data[s->getp++]) << 32;
611 *q |= ((uint64_t)s->data[s->getp++]) << 24;
612 *q |= ((uint64_t)s->data[s->getp++]) << 16;
613 *q |= ((uint64_t)s->data[s->getp++]) << 8;
614 *q |= ((uint64_t)s->data[s->getp++]);
615
616 return true;
617}
618
718e3744 619/* Get next long word from the stream. */
d7c0a89a 620uint32_t stream_get_ipv4(struct stream *s)
718e3744 621{
d7c0a89a 622 uint32_t l;
d62a17ae 623
624 STREAM_VERIFY_SANE(s);
625
d7c0a89a 626 if (STREAM_READABLE(s) < sizeof(uint32_t)) {
d62a17ae 627 STREAM_BOUND_WARN(s, "get ipv4");
628 return 0;
629 }
630
d7c0a89a
QY
631 memcpy(&l, s->data + s->getp, sizeof(uint32_t));
632 s->getp += sizeof(uint32_t);
d62a17ae 633
634 return l;
718e3744 635}
6b0655a2 636
31f937fb
SM
637bool stream_get_ipaddr(struct stream *s, struct ipaddr *ip)
638{
639 uint16_t ipa_len;
640
641 STREAM_VERIFY_SANE(s);
642
643 /* Get address type. */
644 if (STREAM_READABLE(s) < sizeof(uint16_t)) {
645 STREAM_BOUND_WARN2(s, "get ipaddr");
646 return false;
647 }
648 ip->ipa_type = stream_getw(s);
649
650 /* Get address value. */
651 switch (ip->ipa_type) {
652 case IPADDR_V4:
653 ipa_len = IPV4_MAX_BYTELEN;
654 break;
655 case IPADDR_V6:
656 ipa_len = IPV6_MAX_BYTELEN;
657 break;
658 default:
659 flog_err(EC_LIB_DEVELOPMENT,
660 "%s: unknown ip address-family: %u", __func__,
661 ip->ipa_type);
662 return false;
663 }
664 if (STREAM_READABLE(s) < ipa_len) {
665 STREAM_BOUND_WARN2(s, "get ipaddr");
666 return false;
667 }
668 memcpy(&ip->ip, s->data + s->getp, ipa_len);
669 s->getp += ipa_len;
670
671 return true;
672}
673
/*
 * Read a float from the stream: the next 32-bit value is fetched with
 * stream_getl() and its bit pattern is reinterpreted as a float.
 */
float stream_getf(struct stream *s)
{
	union {
		uint32_t bits;
		float value;
	} conv = {.bits = stream_getl(s)};

	return conv.value;
}
683
/*
 * Read a double from the stream: the next 64-bit value is fetched with
 * stream_getq() and its bit pattern is reinterpreted as a double.
 */
double stream_getd(struct stream *s)
{
	union {
		uint64_t bits;
		double value;
	} conv = {.bits = stream_getq(s)};

	return conv.value;
}
693
050c013a 694/* Copy to source to stream.
695 *
696 * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
697 * around. This should be fixed once the stream updates are working.
0dab9303 698 *
699 * stream_write() is saner
050c013a 700 */
d62a17ae 701void stream_put(struct stream *s, const void *src, size_t size)
718e3744 702{
703
d62a17ae 704 /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
705 CHECK_SIZE(s, size);
706
707 STREAM_VERIFY_SANE(s);
708
709 if (STREAM_WRITEABLE(s) < size) {
710 STREAM_BOUND_WARN(s, "put");
711 return;
712 }
713
714 if (src)
715 memcpy(s->data + s->endp, src, size);
716 else
717 memset(s->data + s->endp, 0, size);
718
719 s->endp += size;
718e3744 720}
721
722/* Put character to the stream. */
d7c0a89a 723int stream_putc(struct stream *s, uint8_t c)
718e3744 724{
d62a17ae 725 STREAM_VERIFY_SANE(s);
726
d7c0a89a 727 if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) {
d62a17ae 728 STREAM_BOUND_WARN(s, "put");
729 return 0;
730 }
731
732 s->data[s->endp++] = c;
d7c0a89a 733 return sizeof(uint8_t);
718e3744 734}
735
736/* Put word to the stream. */
d7c0a89a 737int stream_putw(struct stream *s, uint16_t w)
718e3744 738{
d62a17ae 739 STREAM_VERIFY_SANE(s);
740
d7c0a89a 741 if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) {
d62a17ae 742 STREAM_BOUND_WARN(s, "put");
743 return 0;
744 }
745
d7c0a89a
QY
746 s->data[s->endp++] = (uint8_t)(w >> 8);
747 s->data[s->endp++] = (uint8_t)w;
d62a17ae 748
749 return 2;
718e3744 750}
751
d6f4a61d 752/* Put long word to the stream. */
d7c0a89a 753int stream_put3(struct stream *s, uint32_t l)
d6f4a61d 754{
d62a17ae 755 STREAM_VERIFY_SANE(s);
756
757 if (STREAM_WRITEABLE(s) < 3) {
758 STREAM_BOUND_WARN(s, "put");
759 return 0;
760 }
761
d7c0a89a
QY
762 s->data[s->endp++] = (uint8_t)(l >> 16);
763 s->data[s->endp++] = (uint8_t)(l >> 8);
764 s->data[s->endp++] = (uint8_t)l;
d62a17ae 765
766 return 3;
d6f4a61d
DL
767}
768
718e3744 769/* Put long word to the stream. */
d7c0a89a 770int stream_putl(struct stream *s, uint32_t l)
718e3744 771{
d62a17ae 772 STREAM_VERIFY_SANE(s);
773
d7c0a89a 774 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
d62a17ae 775 STREAM_BOUND_WARN(s, "put");
776 return 0;
777 }
778
d7c0a89a
QY
779 s->data[s->endp++] = (uint8_t)(l >> 24);
780 s->data[s->endp++] = (uint8_t)(l >> 16);
781 s->data[s->endp++] = (uint8_t)(l >> 8);
782 s->data[s->endp++] = (uint8_t)l;
d62a17ae 783
784 return 4;
718e3744 785}
786
4b201d46 787/* Put quad word to the stream. */
d62a17ae 788int stream_putq(struct stream *s, uint64_t q)
4b201d46 789{
d62a17ae 790 STREAM_VERIFY_SANE(s);
791
792 if (STREAM_WRITEABLE(s) < sizeof(uint64_t)) {
793 STREAM_BOUND_WARN(s, "put quad");
794 return 0;
795 }
796
d7c0a89a
QY
797 s->data[s->endp++] = (uint8_t)(q >> 56);
798 s->data[s->endp++] = (uint8_t)(q >> 48);
799 s->data[s->endp++] = (uint8_t)(q >> 40);
800 s->data[s->endp++] = (uint8_t)(q >> 32);
801 s->data[s->endp++] = (uint8_t)(q >> 24);
802 s->data[s->endp++] = (uint8_t)(q >> 16);
803 s->data[s->endp++] = (uint8_t)(q >> 8);
804 s->data[s->endp++] = (uint8_t)q;
d62a17ae 805
806 return 8;
4b201d46 807}
808
/*
 * Append a float to the stream: its bit pattern is reinterpreted as a
 * 32-bit integer and written with stream_putl(), whose result is returned.
 */
int stream_putf(struct stream *s, float f)
{
	union {
		float value;
		uint32_t bits;
	} conv = {.value = f};

	return stream_putl(s, conv.bits);
}
818
/*
 * Append a double to the stream: its bit pattern is reinterpreted as a
 * 64-bit integer and written with stream_putq(), whose result is returned.
 */
int stream_putd(struct stream *s, double d)
{
	union {
		double value;
		uint64_t bits;
	} conv = {.value = d};

	return stream_putq(s, conv.bits);
}
828
d7c0a89a 829int stream_putc_at(struct stream *s, size_t putp, uint8_t c)
718e3744 830{
d62a17ae 831 STREAM_VERIFY_SANE(s);
832
d7c0a89a 833 if (!PUT_AT_VALID(s, putp + sizeof(uint8_t))) {
d62a17ae 834 STREAM_BOUND_WARN(s, "put");
835 return 0;
836 }
837
838 s->data[putp] = c;
839
840 return 1;
718e3744 841}
842
d7c0a89a 843int stream_putw_at(struct stream *s, size_t putp, uint16_t w)
718e3744 844{
d62a17ae 845 STREAM_VERIFY_SANE(s);
846
d7c0a89a 847 if (!PUT_AT_VALID(s, putp + sizeof(uint16_t))) {
d62a17ae 848 STREAM_BOUND_WARN(s, "put");
849 return 0;
850 }
851
d7c0a89a
QY
852 s->data[putp] = (uint8_t)(w >> 8);
853 s->data[putp + 1] = (uint8_t)w;
d62a17ae 854
855 return 2;
718e3744 856}
857
d7c0a89a 858int stream_put3_at(struct stream *s, size_t putp, uint32_t l)
d6f4a61d 859{
d62a17ae 860 STREAM_VERIFY_SANE(s);
861
862 if (!PUT_AT_VALID(s, putp + 3)) {
863 STREAM_BOUND_WARN(s, "put");
864 return 0;
865 }
d7c0a89a
QY
866 s->data[putp] = (uint8_t)(l >> 16);
867 s->data[putp + 1] = (uint8_t)(l >> 8);
868 s->data[putp + 2] = (uint8_t)l;
d62a17ae 869
870 return 3;
d6f4a61d
DL
871}
872
d7c0a89a 873int stream_putl_at(struct stream *s, size_t putp, uint32_t l)
718e3744 874{
d62a17ae 875 STREAM_VERIFY_SANE(s);
876
d7c0a89a 877 if (!PUT_AT_VALID(s, putp + sizeof(uint32_t))) {
d62a17ae 878 STREAM_BOUND_WARN(s, "put");
879 return 0;
880 }
d7c0a89a
QY
881 s->data[putp] = (uint8_t)(l >> 24);
882 s->data[putp + 1] = (uint8_t)(l >> 16);
883 s->data[putp + 2] = (uint8_t)(l >> 8);
884 s->data[putp + 3] = (uint8_t)l;
d62a17ae 885
886 return 4;
718e3744 887}
888
d62a17ae 889int stream_putq_at(struct stream *s, size_t putp, uint64_t q)
4b201d46 890{
d62a17ae 891 STREAM_VERIFY_SANE(s);
892
893 if (!PUT_AT_VALID(s, putp + sizeof(uint64_t))) {
894 STREAM_BOUND_WARN(s, "put");
895 return 0;
896 }
d7c0a89a
QY
897 s->data[putp] = (uint8_t)(q >> 56);
898 s->data[putp + 1] = (uint8_t)(q >> 48);
899 s->data[putp + 2] = (uint8_t)(q >> 40);
900 s->data[putp + 3] = (uint8_t)(q >> 32);
901 s->data[putp + 4] = (uint8_t)(q >> 24);
902 s->data[putp + 5] = (uint8_t)(q >> 16);
903 s->data[putp + 6] = (uint8_t)(q >> 8);
904 s->data[putp + 7] = (uint8_t)q;
d62a17ae 905
906 return 8;
4b201d46 907}
908
718e3744 909/* Put long word to the stream. */
d7c0a89a 910int stream_put_ipv4(struct stream *s, uint32_t l)
718e3744 911{
d62a17ae 912 STREAM_VERIFY_SANE(s);
913
d7c0a89a 914 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
d62a17ae 915 STREAM_BOUND_WARN(s, "put");
916 return 0;
917 }
d7c0a89a
QY
918 memcpy(s->data + s->endp, &l, sizeof(uint32_t));
919 s->endp += sizeof(uint32_t);
d62a17ae 920
d7c0a89a 921 return sizeof(uint32_t);
718e3744 922}
923
924/* Put long word to the stream. */
d3d77ec4 925int stream_put_in_addr(struct stream *s, const struct in_addr *addr)
718e3744 926{
d62a17ae 927 STREAM_VERIFY_SANE(s);
928
d7c0a89a 929 if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
d62a17ae 930 STREAM_BOUND_WARN(s, "put");
931 return 0;
932 }
933
d7c0a89a
QY
934 memcpy(s->data + s->endp, addr, sizeof(uint32_t));
935 s->endp += sizeof(uint32_t);
d62a17ae 936
d7c0a89a 937 return sizeof(uint32_t);
718e3744 938}
939
31f937fb
SM
940bool stream_put_ipaddr(struct stream *s, struct ipaddr *ip)
941{
942 stream_putw(s, ip->ipa_type);
943
944 switch (ip->ipa_type) {
945 case IPADDR_V4:
946 stream_put_in_addr(s, &ip->ipaddr_v4);
947 break;
948 case IPADDR_V6:
949 stream_write(s, (uint8_t *)&ip->ipaddr_v6, 16);
950 break;
951 default:
952 flog_err(EC_LIB_DEVELOPMENT,
953 "%s: unknown ip address-family: %u", __func__,
954 ip->ipa_type);
955 return false;
956 }
957
958 return true;
959}
960
3f9c7369 961/* Put in_addr at location in the stream. */
d3d77ec4
MS
962int stream_put_in_addr_at(struct stream *s, size_t putp,
963 const struct in_addr *addr)
3f9c7369 964{
d62a17ae 965 STREAM_VERIFY_SANE(s);
3f9c7369 966
d62a17ae 967 if (!PUT_AT_VALID(s, putp + 4)) {
968 STREAM_BOUND_WARN(s, "put");
969 return 0;
970 }
3f9c7369 971
d62a17ae 972 memcpy(&s->data[putp], addr, 4);
973 return 4;
3f9c7369
DS
974}
975
976/* Put in6_addr at location in the stream. */
d3d77ec4
MS
977int stream_put_in6_addr_at(struct stream *s, size_t putp,
978 const struct in6_addr *addr)
3f9c7369 979{
d62a17ae 980 STREAM_VERIFY_SANE(s);
3f9c7369 981
d62a17ae 982 if (!PUT_AT_VALID(s, putp + 16)) {
983 STREAM_BOUND_WARN(s, "put");
984 return 0;
985 }
3f9c7369 986
d62a17ae 987 memcpy(&s->data[putp], addr, 16);
988 return 16;
3f9c7369
DS
989}
990
718e3744 991/* Put prefix by nlri type format. */
d3d77ec4 992int stream_put_prefix_addpath(struct stream *s, const struct prefix *p,
d7c0a89a 993 int addpath_encode, uint32_t addpath_tx_id)
718e3744 994{
d62a17ae 995 size_t psize;
996 size_t psize_with_addpath;
997
998 STREAM_VERIFY_SANE(s);
999
1000 psize = PSIZE(p->prefixlen);
1001
1002 if (addpath_encode)
1003 psize_with_addpath = psize + 4;
1004 else
1005 psize_with_addpath = psize;
1006
d7c0a89a 1007 if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) {
d62a17ae 1008 STREAM_BOUND_WARN(s, "put");
1009 return 0;
1010 }
1011
1012 if (addpath_encode) {
d7c0a89a
QY
1013 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
1014 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
1015 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
1016 s->data[s->endp++] = (uint8_t)addpath_tx_id;
d62a17ae 1017 }
1018
1019 s->data[s->endp++] = p->prefixlen;
1020 memcpy(s->data + s->endp, &p->u.prefix, psize);
1021 s->endp += psize;
1022
1023 return psize;
718e3744 1024}
6b0655a2 1025
/*
 * Append a prefix in NLRI format without an addpath id; see
 * stream_put_prefix_addpath() for the encoding and the return value.
 */
int stream_put_prefix(struct stream *s, const struct prefix *p)
{
	const int addpath_encode = 0;
	const uint32_t addpath_tx_id = 0;

	return stream_put_prefix_addpath(s, p, addpath_encode, addpath_tx_id);
}
1030
cd1964ff 1031/* Put NLRI with label */
5f040085 1032int stream_put_labeled_prefix(struct stream *s, const struct prefix *p,
ec15e1b5
QY
1033 mpls_label_t *label, int addpath_encode,
1034 uint32_t addpath_tx_id)
cd1964ff 1035{
d62a17ae 1036 size_t psize;
ec15e1b5 1037 size_t psize_with_addpath;
d7c0a89a 1038 uint8_t *label_pnt = (uint8_t *)label;
cd1964ff 1039
d62a17ae 1040 STREAM_VERIFY_SANE(s);
cd1964ff 1041
d62a17ae 1042 psize = PSIZE(p->prefixlen);
ec15e1b5 1043 psize_with_addpath = psize + (addpath_encode ? 4 : 0);
cd1964ff 1044
ec15e1b5 1045 if (STREAM_WRITEABLE(s) < (psize_with_addpath + 3)) {
d62a17ae 1046 STREAM_BOUND_WARN(s, "put");
1047 return 0;
1048 }
cd1964ff 1049
ec15e1b5
QY
1050 if (addpath_encode) {
1051 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
1052 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
1053 s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
1054 s->data[s->endp++] = (uint8_t)addpath_tx_id;
1055 }
1056
d62a17ae 1057 stream_putc(s, (p->prefixlen + 24));
1058 stream_putc(s, label_pnt[0]);
1059 stream_putc(s, label_pnt[1]);
1060 stream_putc(s, label_pnt[2]);
1061 memcpy(s->data + s->endp, &p->u.prefix, psize);
1062 s->endp += psize;
cd1964ff 1063
d62a17ae 1064 return (psize + 3);
cd1964ff 1065}
adbac85e 1066
718e3744 1067/* Read size from fd. */
d62a17ae 1068int stream_read(struct stream *s, int fd, size_t size)
718e3744 1069{
d62a17ae 1070 int nbytes;
1071
1072 STREAM_VERIFY_SANE(s);
1073
1074 if (STREAM_WRITEABLE(s) < size) {
1075 STREAM_BOUND_WARN(s, "put");
1076 return 0;
1077 }
1078
1079 nbytes = readn(fd, s->data + s->endp, size);
1080
1081 if (nbytes > 0)
1082 s->endp += nbytes;
1083
1084 return nbytes;
718e3744 1085}
1086
d62a17ae 1087ssize_t stream_read_try(struct stream *s, int fd, size_t size)
262feb1a 1088{
d62a17ae 1089 ssize_t nbytes;
1090
1091 STREAM_VERIFY_SANE(s);
1092
1093 if (STREAM_WRITEABLE(s) < size) {
1094 STREAM_BOUND_WARN(s, "put");
1095 /* Fatal (not transient) error, since retrying will not help
1096 (stream is too small to contain the desired data). */
1097 return -1;
1098 }
1099
1100 if ((nbytes = read(fd, s->data + s->endp, size)) >= 0) {
1101 s->endp += nbytes;
1102 return nbytes;
1103 }
1104 /* Error: was it transient (return -2) or fatal (return -1)? */
1105 if (ERRNO_IO_RETRY(errno))
1106 return -2;
450971aa 1107 flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
a2b0f8b8 1108 safe_strerror(errno));
d62a17ae 1109 return -1;
262feb1a 1110}
1111
0dab9303 1112/* Read up to size bytes into the stream from the fd, using recvmsgfrom
1113 * whose arguments match the remaining arguments to this function
1114 */
d62a17ae 1115ssize_t stream_recvfrom(struct stream *s, int fd, size_t size, int flags,
1116 struct sockaddr *from, socklen_t *fromlen)
0dab9303 1117{
d62a17ae 1118 ssize_t nbytes;
1119
1120 STREAM_VERIFY_SANE(s);
1121
1122 if (STREAM_WRITEABLE(s) < size) {
1123 STREAM_BOUND_WARN(s, "put");
1124 /* Fatal (not transient) error, since retrying will not help
1125 (stream is too small to contain the desired data). */
1126 return -1;
1127 }
1128
1129 if ((nbytes = recvfrom(fd, s->data + s->endp, size, flags, from,
1130 fromlen))
1131 >= 0) {
1132 s->endp += nbytes;
1133 return nbytes;
1134 }
1135 /* Error: was it transient (return -2) or fatal (return -1)? */
1136 if (ERRNO_IO_RETRY(errno))
1137 return -2;
450971aa 1138 flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
a2b0f8b8 1139 safe_strerror(errno));
d62a17ae 1140 return -1;
0dab9303 1141}
1142
050c013a 1143/* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
1144 * from endp.
1145 * First iovec will be used to receive the data.
1146 * Stream need not be empty.
1147 */
d62a17ae 1148ssize_t stream_recvmsg(struct stream *s, int fd, struct msghdr *msgh, int flags,
1149 size_t size)
050c013a 1150{
d62a17ae 1151 int nbytes;
1152 struct iovec *iov;
1153
1154 STREAM_VERIFY_SANE(s);
1155 assert(msgh->msg_iovlen > 0);
1156
1157 if (STREAM_WRITEABLE(s) < size) {
1158 STREAM_BOUND_WARN(s, "put");
1159 /* This is a logic error in the calling code: the stream is too
1160 small
1161 to hold the desired data! */
1162 return -1;
1163 }
1164
1165 iov = &(msgh->msg_iov[0]);
1166 iov->iov_base = (s->data + s->endp);
1167 iov->iov_len = size;
1168
1169 nbytes = recvmsg(fd, msgh, flags);
1170
1171 if (nbytes > 0)
1172 s->endp += nbytes;
1173
1174 return nbytes;
050c013a 1175}
d62a17ae 1176
718e3744 1177/* Write data to buffer. */
d62a17ae 1178size_t stream_write(struct stream *s, const void *ptr, size_t size)
718e3744 1179{
1180
d62a17ae 1181 CHECK_SIZE(s, size);
1182
1183 STREAM_VERIFY_SANE(s);
1184
1185 if (STREAM_WRITEABLE(s) < size) {
1186 STREAM_BOUND_WARN(s, "put");
1187 return 0;
1188 }
718e3744 1189
d62a17ae 1190 memcpy(s->data + s->endp, ptr, size);
1191 s->endp += size;
9985f83c 1192
d62a17ae 1193 return size;
718e3744 1194}
1195
d62a17ae 1196/* Return current read pointer.
050c013a 1197 * DEPRECATED!
1198 * Use stream_get_pnt_to if you must, but decoding streams properly
1199 * is preferred
1200 */
d7c0a89a 1201uint8_t *stream_pnt(struct stream *s)
718e3744 1202{
d62a17ae 1203 STREAM_VERIFY_SANE(s);
1204 return s->data + s->getp;
718e3744 1205}
1206
1207/* Check does this stream empty? */
d62a17ae 1208int stream_empty(struct stream *s)
718e3744 1209{
d62a17ae 1210 STREAM_VERIFY_SANE(s);
050c013a 1211
d62a17ae 1212 return (s->endp == 0);
718e3744 1213}
1214
1215/* Reset stream. */
d62a17ae 1216void stream_reset(struct stream *s)
718e3744 1217{
d62a17ae 1218 STREAM_VERIFY_SANE(s);
050c013a 1219
d62a17ae 1220 s->getp = s->endp = 0;
718e3744 1221}
1222
1223/* Write stream contens to the file discriptor. */
d62a17ae 1224int stream_flush(struct stream *s, int fd)
718e3744 1225{
d62a17ae 1226 int nbytes;
1227
1228 STREAM_VERIFY_SANE(s);
1229
1230 nbytes = write(fd, s->data + s->getp, s->endp - s->getp);
1231
1232 return nbytes;
718e3744 1233}
6b0655a2 1234
f1bc75da 1235void stream_hexdump(const struct stream *s)
9d72660d
WC
1236{
1237 zlog_hexdump(s->data, s->endp);
1238}
1239
718e3744 1240/* Stream first in first out queue. */
1241
d62a17ae 1242struct stream_fifo *stream_fifo_new(void)
718e3744 1243{
d62a17ae 1244 struct stream_fifo *new;
1245
f8c511cd
MS
1246 new = XMALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo));
1247 stream_fifo_init(new);
d62a17ae 1248 return new;
718e3744 1249}
1250
f8c511cd
MS
1251void stream_fifo_init(struct stream_fifo *fifo)
1252{
1253 memset(fifo, 0, sizeof(struct stream_fifo));
1254 pthread_mutex_init(&fifo->mtx, NULL);
1255}
1256
718e3744 1257/* Add new stream to fifo. */
d62a17ae 1258void stream_fifo_push(struct stream_fifo *fifo, struct stream *s)
718e3744 1259{
03ed85a6
DS
1260#if defined DEV_BUILD
1261 size_t max, curmax;
1262#endif
1263
d62a17ae 1264 if (fifo->tail)
1265 fifo->tail->next = s;
1266 else
1267 fifo->head = s;
1268
1269 fifo->tail = s;
08a0e54e 1270 fifo->tail->next = NULL;
03ed85a6 1271#if !defined DEV_BUILD
363e24c6 1272 atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
03ed85a6
DS
1273#else
1274 max = atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1275 curmax = atomic_load_explicit(&fifo->max_count, memory_order_relaxed);
1276 if (max > curmax)
1277 atomic_store_explicit(&fifo->max_count, max,
1278 memory_order_relaxed);
1279#endif
363e24c6
QY
1280}
1281
1282void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s)
1283{
00dffa8c 1284 frr_with_mutex(&fifo->mtx) {
363e24c6
QY
1285 stream_fifo_push(fifo, s);
1286 }
718e3744 1287}
1288
1289/* Delete first stream from fifo. */
d62a17ae 1290struct stream *stream_fifo_pop(struct stream_fifo *fifo)
718e3744 1291{
d62a17ae 1292 struct stream *s;
1293
1294 s = fifo->head;
718e3744 1295
d62a17ae 1296 if (s) {
1297 fifo->head = s->next;
718e3744 1298
d62a17ae 1299 if (fifo->head == NULL)
1300 fifo->tail = NULL;
718e3744 1301
363e24c6
QY
1302 atomic_fetch_sub_explicit(&fifo->count, 1,
1303 memory_order_release);
08a0e54e
QY
1304
1305 /* ensure stream is scrubbed of references to this fifo */
1306 s->next = NULL;
d62a17ae 1307 }
718e3744 1308
d62a17ae 1309 return s;
718e3744 1310}
1311
363e24c6
QY
1312struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo)
1313{
1314 struct stream *ret;
1315
00dffa8c 1316 frr_with_mutex(&fifo->mtx) {
363e24c6
QY
1317 ret = stream_fifo_pop(fifo);
1318 }
363e24c6
QY
1319
1320 return ret;
1321}
1322
d62a17ae 1323struct stream *stream_fifo_head(struct stream_fifo *fifo)
718e3744 1324{
d62a17ae 1325 return fifo->head;
718e3744 1326}
1327
363e24c6
QY
1328struct stream *stream_fifo_head_safe(struct stream_fifo *fifo)
1329{
1330 struct stream *ret;
1331
00dffa8c 1332 frr_with_mutex(&fifo->mtx) {
363e24c6
QY
1333 ret = stream_fifo_head(fifo);
1334 }
363e24c6
QY
1335
1336 return ret;
1337}
1338
d62a17ae 1339void stream_fifo_clean(struct stream_fifo *fifo)
718e3744 1340{
d62a17ae 1341 struct stream *s;
1342 struct stream *next;
1343
1344 for (s = fifo->head; s; s = next) {
1345 next = s->next;
1346 stream_free(s);
1347 }
1348 fifo->head = fifo->tail = NULL;
363e24c6
QY
1349 atomic_store_explicit(&fifo->count, 0, memory_order_release);
1350}
1351
1352void stream_fifo_clean_safe(struct stream_fifo *fifo)
1353{
00dffa8c 1354 frr_with_mutex(&fifo->mtx) {
363e24c6
QY
1355 stream_fifo_clean(fifo);
1356 }
363e24c6
QY
1357}
1358
1359size_t stream_fifo_count_safe(struct stream_fifo *fifo)
1360{
1361 return atomic_load_explicit(&fifo->count, memory_order_acquire);
718e3744 1362}
1363
f8c511cd 1364void stream_fifo_deinit(struct stream_fifo *fifo)
718e3744 1365{
d62a17ae 1366 stream_fifo_clean(fifo);
363e24c6 1367 pthread_mutex_destroy(&fifo->mtx);
f8c511cd
MS
1368}
1369
1370void stream_fifo_free(struct stream_fifo *fifo)
1371{
1372 stream_fifo_deinit(fifo);
d62a17ae 1373 XFREE(MTYPE_STREAM_FIFO, fifo);
718e3744 1374}
91804f63
RZ
1375
1376void stream_pulldown(struct stream *s)
1377{
1378 size_t rlen = STREAM_READABLE(s);
1379
1380 /* No more data, so just move the pointers. */
1381 if (rlen == 0) {
1382 stream_reset(s);
1383 return;
1384 }
1385
1386 /* Move the available data to the beginning. */
1387 memmove(s->data, &s->data[s->getp], rlen);
1388 s->getp = 0;
1389 s->endp = rlen;
1390}