1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2010-2014 Intel Corporation
9 #include <rte_malloc.h>
11 #include "rte_port_ring.h"
/*
 * Port RING Reader statistics macros.
 * They compile to no-ops unless RTE_PORT_STATS_COLLECT is defined, so the
 * dequeue fast path pays no accounting cost by default.
 */
#ifdef RTE_PORT_STATS_COLLECT

#define RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(port, val) \
	port->stats.n_pkts_in += val
#define RTE_PORT_RING_READER_STATS_PKTS_DROP_ADD(port, val) \
	port->stats.n_pkts_drop += val

#else

#define RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_RING_READER_STATS_PKTS_DROP_ADD(port, val)

#endif
30 struct rte_port_ring_reader
{
31 struct rte_port_in_stats stats
;
33 struct rte_ring
*ring
;
37 rte_port_ring_reader_create_internal(void *params
, int socket_id
,
40 struct rte_port_ring_reader_params
*conf
=
42 struct rte_port_ring_reader
*port
;
44 /* Check input parameters */
46 (conf
->ring
== NULL
) ||
47 (conf
->ring
->cons
.single
&& is_multi
) ||
48 (!(conf
->ring
->cons
.single
) && !is_multi
)) {
49 RTE_LOG(ERR
, PORT
, "%s: Invalid Parameters\n", __func__
);
53 /* Memory allocation */
54 port
= rte_zmalloc_socket("PORT", sizeof(*port
),
55 RTE_CACHE_LINE_SIZE
, socket_id
);
57 RTE_LOG(ERR
, PORT
, "%s: Failed to allocate port\n", __func__
);
62 port
->ring
= conf
->ring
;
/* Create a single-consumer ring reader port. */
static void *
rte_port_ring_reader_create(void *params, int socket_id)
{
	return rte_port_ring_reader_create_internal(params, socket_id, 0);
}
/* Create a multi-consumer ring reader port. */
static void *
rte_port_ring_multi_reader_create(void *params, int socket_id)
{
	return rte_port_ring_reader_create_internal(params, socket_id, 1);
}
80 rte_port_ring_reader_rx(void *port
, struct rte_mbuf
**pkts
, uint32_t n_pkts
)
82 struct rte_port_ring_reader
*p
= port
;
85 nb_rx
= rte_ring_sc_dequeue_burst(p
->ring
, (void **) pkts
,
87 RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p
, nb_rx
);
93 rte_port_ring_multi_reader_rx(void *port
, struct rte_mbuf
**pkts
,
96 struct rte_port_ring_reader
*p
= port
;
99 nb_rx
= rte_ring_mc_dequeue_burst(p
->ring
, (void **) pkts
,
101 RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p
, nb_rx
);
107 rte_port_ring_reader_free(void *port
)
110 RTE_LOG(ERR
, PORT
, "%s: port is NULL\n", __func__
);
120 rte_port_ring_reader_stats_read(void *port
,
121 struct rte_port_in_stats
*stats
, int clear
)
123 struct rte_port_ring_reader
*p
=
127 memcpy(stats
, &p
->stats
, sizeof(p
->stats
));
130 memset(&p
->stats
, 0, sizeof(p
->stats
));
/*
 * Port RING Writer statistics macros.
 * No-ops unless RTE_PORT_STATS_COLLECT is defined.
 */
#ifdef RTE_PORT_STATS_COLLECT

#define RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(port, val) \
	port->stats.n_pkts_in += val
#define RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(port, val) \
	port->stats.n_pkts_drop += val

#else

#define RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(port, val)

#endif
152 struct rte_port_ring_writer
{
153 struct rte_port_out_stats stats
;
155 struct rte_mbuf
*tx_buf
[2 * RTE_PORT_IN_BURST_SIZE_MAX
];
156 struct rte_ring
*ring
;
157 uint32_t tx_burst_sz
;
158 uint32_t tx_buf_count
;
164 rte_port_ring_writer_create_internal(void *params
, int socket_id
,
167 struct rte_port_ring_writer_params
*conf
=
169 struct rte_port_ring_writer
*port
;
171 /* Check input parameters */
172 if ((conf
== NULL
) ||
173 (conf
->ring
== NULL
) ||
174 (conf
->ring
->prod
.single
&& is_multi
) ||
175 (!(conf
->ring
->prod
.single
) && !is_multi
) ||
176 (conf
->tx_burst_sz
> RTE_PORT_IN_BURST_SIZE_MAX
)) {
177 RTE_LOG(ERR
, PORT
, "%s: Invalid Parameters\n", __func__
);
181 /* Memory allocation */
182 port
= rte_zmalloc_socket("PORT", sizeof(*port
),
183 RTE_CACHE_LINE_SIZE
, socket_id
);
185 RTE_LOG(ERR
, PORT
, "%s: Failed to allocate port\n", __func__
);
190 port
->ring
= conf
->ring
;
191 port
->tx_burst_sz
= conf
->tx_burst_sz
;
192 port
->tx_buf_count
= 0;
193 port
->bsz_mask
= 1LLU << (conf
->tx_burst_sz
- 1);
194 port
->is_multi
= is_multi
;
/* Create a single-producer ring writer port. */
static void *
rte_port_ring_writer_create(void *params, int socket_id)
{
	return rte_port_ring_writer_create_internal(params, socket_id, 0);
}
/* Create a multi-producer ring writer port. */
static void *
rte_port_ring_multi_writer_create(void *params, int socket_id)
{
	return rte_port_ring_writer_create_internal(params, socket_id, 1);
}
212 send_burst(struct rte_port_ring_writer
*p
)
216 nb_tx
= rte_ring_sp_enqueue_burst(p
->ring
, (void **)p
->tx_buf
,
217 p
->tx_buf_count
, NULL
);
219 RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p
, p
->tx_buf_count
- nb_tx
);
220 for ( ; nb_tx
< p
->tx_buf_count
; nb_tx
++)
221 rte_pktmbuf_free(p
->tx_buf
[nb_tx
]);
227 send_burst_mp(struct rte_port_ring_writer
*p
)
231 nb_tx
= rte_ring_mp_enqueue_burst(p
->ring
, (void **)p
->tx_buf
,
232 p
->tx_buf_count
, NULL
);
234 RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p
, p
->tx_buf_count
- nb_tx
);
235 for ( ; nb_tx
< p
->tx_buf_count
; nb_tx
++)
236 rte_pktmbuf_free(p
->tx_buf
[nb_tx
]);
242 rte_port_ring_writer_tx(void *port
, struct rte_mbuf
*pkt
)
244 struct rte_port_ring_writer
*p
= port
;
246 p
->tx_buf
[p
->tx_buf_count
++] = pkt
;
247 RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p
, 1);
248 if (p
->tx_buf_count
>= p
->tx_burst_sz
)
255 rte_port_ring_multi_writer_tx(void *port
, struct rte_mbuf
*pkt
)
257 struct rte_port_ring_writer
*p
= port
;
259 p
->tx_buf
[p
->tx_buf_count
++] = pkt
;
260 RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p
, 1);
261 if (p
->tx_buf_count
>= p
->tx_burst_sz
)
267 static __rte_always_inline
int
268 rte_port_ring_writer_tx_bulk_internal(void *port
,
269 struct rte_mbuf
**pkts
,
273 struct rte_port_ring_writer
*p
=
276 uint64_t bsz_mask
= p
->bsz_mask
;
277 uint32_t tx_buf_count
= p
->tx_buf_count
;
278 uint64_t expr
= (pkts_mask
& (pkts_mask
+ 1)) |
279 ((pkts_mask
& bsz_mask
) ^ bsz_mask
);
282 uint64_t n_pkts
= __builtin_popcountll(pkts_mask
);
292 RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p
, n_pkts
);
294 n_pkts_ok
= rte_ring_mp_enqueue_burst(p
->ring
,
295 (void **)pkts
, n_pkts
, NULL
);
297 n_pkts_ok
= rte_ring_sp_enqueue_burst(p
->ring
,
298 (void **)pkts
, n_pkts
, NULL
);
300 RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p
, n_pkts
- n_pkts_ok
);
301 for ( ; n_pkts_ok
< n_pkts
; n_pkts_ok
++) {
302 struct rte_mbuf
*pkt
= pkts
[n_pkts_ok
];
304 rte_pktmbuf_free(pkt
);
307 for ( ; pkts_mask
; ) {
308 uint32_t pkt_index
= __builtin_ctzll(pkts_mask
);
309 uint64_t pkt_mask
= 1LLU << pkt_index
;
310 struct rte_mbuf
*pkt
= pkts
[pkt_index
];
312 p
->tx_buf
[tx_buf_count
++] = pkt
;
313 RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p
, 1);
314 pkts_mask
&= ~pkt_mask
;
317 p
->tx_buf_count
= tx_buf_count
;
318 if (tx_buf_count
>= p
->tx_burst_sz
) {
/* Bulk TX, single-producer variant. */
static int
rte_port_ring_writer_tx_bulk(void *port,
		struct rte_mbuf **pkts,
		uint64_t pkts_mask)
{
	return rte_port_ring_writer_tx_bulk_internal(port, pkts, pkts_mask, 0);
}
/* Bulk TX, multi-producer variant. */
static int
rte_port_ring_multi_writer_tx_bulk(void *port,
		struct rte_mbuf **pkts,
		uint64_t pkts_mask)
{
	return rte_port_ring_writer_tx_bulk_internal(port, pkts, pkts_mask, 1);
}
346 rte_port_ring_writer_flush(void *port
)
348 struct rte_port_ring_writer
*p
= port
;
350 if (p
->tx_buf_count
> 0)
357 rte_port_ring_multi_writer_flush(void *port
)
359 struct rte_port_ring_writer
*p
= port
;
361 if (p
->tx_buf_count
> 0)
368 rte_port_ring_writer_free(void *port
)
370 struct rte_port_ring_writer
*p
= port
;
373 RTE_LOG(ERR
, PORT
, "%s: Port is NULL\n", __func__
);
378 rte_port_ring_multi_writer_flush(port
);
380 rte_port_ring_writer_flush(port
);
388 rte_port_ring_writer_stats_read(void *port
,
389 struct rte_port_out_stats
*stats
, int clear
)
391 struct rte_port_ring_writer
*p
=
395 memcpy(stats
, &p
->stats
, sizeof(p
->stats
));
398 memset(&p
->stats
, 0, sizeof(p
->stats
));
/*
 * Port RING Writer Nodrop
 *
 * Statistics macros; no-ops unless RTE_PORT_STATS_COLLECT is defined.
 */
#ifdef RTE_PORT_STATS_COLLECT

#define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val) \
	port->stats.n_pkts_in += val
#define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val) \
	port->stats.n_pkts_drop += val

#else

#define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val)

#endif
420 struct rte_port_ring_writer_nodrop
{
421 struct rte_port_out_stats stats
;
423 struct rte_mbuf
*tx_buf
[2 * RTE_PORT_IN_BURST_SIZE_MAX
];
424 struct rte_ring
*ring
;
425 uint32_t tx_burst_sz
;
426 uint32_t tx_buf_count
;
433 rte_port_ring_writer_nodrop_create_internal(void *params
, int socket_id
,
436 struct rte_port_ring_writer_nodrop_params
*conf
=
438 struct rte_port_ring_writer_nodrop
*port
;
440 /* Check input parameters */
441 if ((conf
== NULL
) ||
442 (conf
->ring
== NULL
) ||
443 (conf
->ring
->prod
.single
&& is_multi
) ||
444 (!(conf
->ring
->prod
.single
) && !is_multi
) ||
445 (conf
->tx_burst_sz
> RTE_PORT_IN_BURST_SIZE_MAX
)) {
446 RTE_LOG(ERR
, PORT
, "%s: Invalid Parameters\n", __func__
);
450 /* Memory allocation */
451 port
= rte_zmalloc_socket("PORT", sizeof(*port
),
452 RTE_CACHE_LINE_SIZE
, socket_id
);
454 RTE_LOG(ERR
, PORT
, "%s: Failed to allocate port\n", __func__
);
459 port
->ring
= conf
->ring
;
460 port
->tx_burst_sz
= conf
->tx_burst_sz
;
461 port
->tx_buf_count
= 0;
462 port
->bsz_mask
= 1LLU << (conf
->tx_burst_sz
- 1);
463 port
->is_multi
= is_multi
;
466 * When n_retries is 0 it means that we should wait for every packet to
467 * send no matter how many retries should it take. To limit number of
468 * branches in fast path, we use UINT64_MAX instead of branching.
470 port
->n_retries
= (conf
->n_retries
== 0) ? UINT64_MAX
: conf
->n_retries
;
/* Create a single-producer nodrop writer port. */
static void *
rte_port_ring_writer_nodrop_create(void *params, int socket_id)
{
	return rte_port_ring_writer_nodrop_create_internal(params, socket_id, 0);
}
/* Create a multi-producer nodrop writer port. */
static void *
rte_port_ring_multi_writer_nodrop_create(void *params, int socket_id)
{
	return rte_port_ring_writer_nodrop_create_internal(params, socket_id, 1);
}
488 send_burst_nodrop(struct rte_port_ring_writer_nodrop
*p
)
490 uint32_t nb_tx
= 0, i
;
492 nb_tx
= rte_ring_sp_enqueue_burst(p
->ring
, (void **)p
->tx_buf
,
493 p
->tx_buf_count
, NULL
);
495 /* We sent all the packets in a first try */
496 if (nb_tx
>= p
->tx_buf_count
) {
501 for (i
= 0; i
< p
->n_retries
; i
++) {
502 nb_tx
+= rte_ring_sp_enqueue_burst(p
->ring
,
503 (void **) (p
->tx_buf
+ nb_tx
),
504 p
->tx_buf_count
- nb_tx
, NULL
);
506 /* We sent all the packets in more than one try */
507 if (nb_tx
>= p
->tx_buf_count
) {
513 /* We didn't send the packets in maximum allowed attempts */
514 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(p
, p
->tx_buf_count
- nb_tx
);
515 for ( ; nb_tx
< p
->tx_buf_count
; nb_tx
++)
516 rte_pktmbuf_free(p
->tx_buf
[nb_tx
]);
522 send_burst_mp_nodrop(struct rte_port_ring_writer_nodrop
*p
)
524 uint32_t nb_tx
= 0, i
;
526 nb_tx
= rte_ring_mp_enqueue_burst(p
->ring
, (void **)p
->tx_buf
,
527 p
->tx_buf_count
, NULL
);
529 /* We sent all the packets in a first try */
530 if (nb_tx
>= p
->tx_buf_count
) {
535 for (i
= 0; i
< p
->n_retries
; i
++) {
536 nb_tx
+= rte_ring_mp_enqueue_burst(p
->ring
,
537 (void **) (p
->tx_buf
+ nb_tx
),
538 p
->tx_buf_count
- nb_tx
, NULL
);
540 /* We sent all the packets in more than one try */
541 if (nb_tx
>= p
->tx_buf_count
) {
547 /* We didn't send the packets in maximum allowed attempts */
548 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(p
, p
->tx_buf_count
- nb_tx
);
549 for ( ; nb_tx
< p
->tx_buf_count
; nb_tx
++)
550 rte_pktmbuf_free(p
->tx_buf
[nb_tx
]);
556 rte_port_ring_writer_nodrop_tx(void *port
, struct rte_mbuf
*pkt
)
558 struct rte_port_ring_writer_nodrop
*p
=
561 p
->tx_buf
[p
->tx_buf_count
++] = pkt
;
562 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p
, 1);
563 if (p
->tx_buf_count
>= p
->tx_burst_sz
)
564 send_burst_nodrop(p
);
570 rte_port_ring_multi_writer_nodrop_tx(void *port
, struct rte_mbuf
*pkt
)
572 struct rte_port_ring_writer_nodrop
*p
=
575 p
->tx_buf
[p
->tx_buf_count
++] = pkt
;
576 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p
, 1);
577 if (p
->tx_buf_count
>= p
->tx_burst_sz
)
578 send_burst_mp_nodrop(p
);
583 static __rte_always_inline
int
584 rte_port_ring_writer_nodrop_tx_bulk_internal(void *port
,
585 struct rte_mbuf
**pkts
,
589 struct rte_port_ring_writer_nodrop
*p
=
592 uint64_t bsz_mask
= p
->bsz_mask
;
593 uint32_t tx_buf_count
= p
->tx_buf_count
;
594 uint64_t expr
= (pkts_mask
& (pkts_mask
+ 1)) |
595 ((pkts_mask
& bsz_mask
) ^ bsz_mask
);
598 uint64_t n_pkts
= __builtin_popcountll(pkts_mask
);
603 send_burst_mp_nodrop(p
);
605 send_burst_nodrop(p
);
608 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p
, n_pkts
);
611 rte_ring_mp_enqueue_burst(p
->ring
,
612 (void **)pkts
, n_pkts
, NULL
);
615 rte_ring_sp_enqueue_burst(p
->ring
,
616 (void **)pkts
, n_pkts
, NULL
);
618 if (n_pkts_ok
>= n_pkts
)
622 * If we didn't manage to send all packets in single burst, move
623 * remaining packets to the buffer and call send burst.
625 for (; n_pkts_ok
< n_pkts
; n_pkts_ok
++) {
626 struct rte_mbuf
*pkt
= pkts
[n_pkts_ok
];
628 p
->tx_buf
[p
->tx_buf_count
++] = pkt
;
631 send_burst_mp_nodrop(p
);
633 send_burst_nodrop(p
);
635 for ( ; pkts_mask
; ) {
636 uint32_t pkt_index
= __builtin_ctzll(pkts_mask
);
637 uint64_t pkt_mask
= 1LLU << pkt_index
;
638 struct rte_mbuf
*pkt
= pkts
[pkt_index
];
640 p
->tx_buf
[tx_buf_count
++] = pkt
;
641 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p
, 1);
642 pkts_mask
&= ~pkt_mask
;
645 p
->tx_buf_count
= tx_buf_count
;
646 if (tx_buf_count
>= p
->tx_burst_sz
) {
648 send_burst_mp_nodrop(p
);
650 send_burst_nodrop(p
);
/* Bulk TX with retries, single-producer variant. */
static int
rte_port_ring_writer_nodrop_tx_bulk(void *port,
		struct rte_mbuf **pkts,
		uint64_t pkts_mask)
{
	return rte_port_ring_writer_nodrop_tx_bulk_internal(port, pkts,
			pkts_mask, 0);
}
/* Bulk TX with retries, multi-producer variant. */
static int
rte_port_ring_multi_writer_nodrop_tx_bulk(void *port,
		struct rte_mbuf **pkts,
		uint64_t pkts_mask)
{
	return rte_port_ring_writer_nodrop_tx_bulk_internal(port, pkts,
			pkts_mask, 1);
}
676 rte_port_ring_writer_nodrop_flush(void *port
)
678 struct rte_port_ring_writer_nodrop
*p
=
681 if (p
->tx_buf_count
> 0)
682 send_burst_nodrop(p
);
688 rte_port_ring_multi_writer_nodrop_flush(void *port
)
690 struct rte_port_ring_writer_nodrop
*p
=
693 if (p
->tx_buf_count
> 0)
694 send_burst_mp_nodrop(p
);
700 rte_port_ring_writer_nodrop_free(void *port
)
702 struct rte_port_ring_writer_nodrop
*p
=
706 RTE_LOG(ERR
, PORT
, "%s: Port is NULL\n", __func__
);
711 rte_port_ring_multi_writer_nodrop_flush(port
);
713 rte_port_ring_writer_nodrop_flush(port
);
721 rte_port_ring_writer_nodrop_stats_read(void *port
,
722 struct rte_port_out_stats
*stats
, int clear
)
724 struct rte_port_ring_writer_nodrop
*p
=
728 memcpy(stats
, &p
->stats
, sizeof(p
->stats
));
731 memset(&p
->stats
, 0, sizeof(p
->stats
));
737 * Summary of port operations
739 struct rte_port_in_ops rte_port_ring_reader_ops
= {
740 .f_create
= rte_port_ring_reader_create
,
741 .f_free
= rte_port_ring_reader_free
,
742 .f_rx
= rte_port_ring_reader_rx
,
743 .f_stats
= rte_port_ring_reader_stats_read
,
746 struct rte_port_out_ops rte_port_ring_writer_ops
= {
747 .f_create
= rte_port_ring_writer_create
,
748 .f_free
= rte_port_ring_writer_free
,
749 .f_tx
= rte_port_ring_writer_tx
,
750 .f_tx_bulk
= rte_port_ring_writer_tx_bulk
,
751 .f_flush
= rte_port_ring_writer_flush
,
752 .f_stats
= rte_port_ring_writer_stats_read
,
755 struct rte_port_out_ops rte_port_ring_writer_nodrop_ops
= {
756 .f_create
= rte_port_ring_writer_nodrop_create
,
757 .f_free
= rte_port_ring_writer_nodrop_free
,
758 .f_tx
= rte_port_ring_writer_nodrop_tx
,
759 .f_tx_bulk
= rte_port_ring_writer_nodrop_tx_bulk
,
760 .f_flush
= rte_port_ring_writer_nodrop_flush
,
761 .f_stats
= rte_port_ring_writer_nodrop_stats_read
,
764 struct rte_port_in_ops rte_port_ring_multi_reader_ops
= {
765 .f_create
= rte_port_ring_multi_reader_create
,
766 .f_free
= rte_port_ring_reader_free
,
767 .f_rx
= rte_port_ring_multi_reader_rx
,
768 .f_stats
= rte_port_ring_reader_stats_read
,
771 struct rte_port_out_ops rte_port_ring_multi_writer_ops
= {
772 .f_create
= rte_port_ring_multi_writer_create
,
773 .f_free
= rte_port_ring_writer_free
,
774 .f_tx
= rte_port_ring_multi_writer_tx
,
775 .f_tx_bulk
= rte_port_ring_multi_writer_tx_bulk
,
776 .f_flush
= rte_port_ring_multi_writer_flush
,
777 .f_stats
= rte_port_ring_writer_stats_read
,
780 struct rte_port_out_ops rte_port_ring_multi_writer_nodrop_ops
= {
781 .f_create
= rte_port_ring_multi_writer_nodrop_create
,
782 .f_free
= rte_port_ring_writer_nodrop_free
,
783 .f_tx
= rte_port_ring_multi_writer_nodrop_tx
,
784 .f_tx_bulk
= rte_port_ring_multi_writer_nodrop_tx_bulk
,
785 .f_flush
= rte_port_ring_multi_writer_nodrop_flush
,
786 .f_stats
= rte_port_ring_writer_nodrop_stats_read
,