]> git.proxmox.com Git - ceph.git/blame - ceph/src/dpdk/lib/librte_port/rte_port_ring.c
bump version to 12.2.12-pve1
[ceph.git] / ceph / src / dpdk / lib / librte_port / rte_port_ring.c
CommitLineData
7c673cae
FG
1/*-
2 * BSD LICENSE
3 *
4 * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 */
33#include <string.h>
34#include <stdint.h>
35
36#include <rte_mbuf.h>
37#include <rte_ring.h>
38#include <rte_malloc.h>
39
40#include "rte_port_ring.h"
41
42/*
43 * Port RING Reader
44 */
/*
 * Reader statistics helpers.
 *
 * When RTE_PORT_STATS_COLLECT is defined the macros add `val` to the matching
 * counter in `port->stats`; otherwise they expand to nothing, so `val` is not
 * evaluated at all. Both arguments are parenthesized so that any expression
 * can be passed safely.
 */
#ifdef RTE_PORT_STATS_COLLECT

#define RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(port, val) \
	((port)->stats.n_pkts_in += (val))
#define RTE_PORT_RING_READER_STATS_PKTS_DROP_ADD(port, val) \
	((port)->stats.n_pkts_drop += (val))

#else

#define RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_RING_READER_STATS_PKTS_DROP_ADD(port, val)

#endif
58
59struct rte_port_ring_reader {
60 struct rte_port_in_stats stats;
61
62 struct rte_ring *ring;
63};
64
65static void *
66rte_port_ring_reader_create_internal(void *params, int socket_id,
67 uint32_t is_multi)
68{
69 struct rte_port_ring_reader_params *conf =
70 (struct rte_port_ring_reader_params *) params;
71 struct rte_port_ring_reader *port;
72
73 /* Check input parameters */
74 if ((conf == NULL) ||
75 (conf->ring == NULL) ||
76 (conf->ring->cons.sc_dequeue && is_multi) ||
77 (!(conf->ring->cons.sc_dequeue) && !is_multi)) {
78 RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
79 return NULL;
80 }
81
82 /* Memory allocation */
83 port = rte_zmalloc_socket("PORT", sizeof(*port),
84 RTE_CACHE_LINE_SIZE, socket_id);
85 if (port == NULL) {
86 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
87 return NULL;
88 }
89
90 /* Initialization */
91 port->ring = conf->ring;
92
93 return port;
94}
95
/* f_create callback of the single-consumer ring reader port. */
static void *
rte_port_ring_reader_create(void *params, int socket_id)
{
	const uint32_t is_multi = 0;

	return rte_port_ring_reader_create_internal(params, socket_id,
			is_multi);
}
101
/* f_create callback of the multi-consumer ring reader port. */
static void *
rte_port_ring_multi_reader_create(void *params, int socket_id)
{
	const uint32_t is_multi = 1;

	return rte_port_ring_reader_create_internal(params, socket_id,
			is_multi);
}
107
108static int
109rte_port_ring_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts)
110{
111 struct rte_port_ring_reader *p = (struct rte_port_ring_reader *) port;
112 uint32_t nb_rx;
113
114 nb_rx = rte_ring_sc_dequeue_burst(p->ring, (void **) pkts, n_pkts);
115 RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p, nb_rx);
116
117 return nb_rx;
118}
119
120static int
121rte_port_ring_multi_reader_rx(void *port, struct rte_mbuf **pkts,
122 uint32_t n_pkts)
123{
124 struct rte_port_ring_reader *p = (struct rte_port_ring_reader *) port;
125 uint32_t nb_rx;
126
127 nb_rx = rte_ring_mc_dequeue_burst(p->ring, (void **) pkts, n_pkts);
128 RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p, nb_rx);
129
130 return nb_rx;
131}
132
133static int
134rte_port_ring_reader_free(void *port)
135{
136 if (port == NULL) {
137 RTE_LOG(ERR, PORT, "%s: port is NULL\n", __func__);
138 return -EINVAL;
139 }
140
141 rte_free(port);
142
143 return 0;
144}
145
146static int
147rte_port_ring_reader_stats_read(void *port,
148 struct rte_port_in_stats *stats, int clear)
149{
150 struct rte_port_ring_reader *p =
151 (struct rte_port_ring_reader *) port;
152
153 if (stats != NULL)
154 memcpy(stats, &p->stats, sizeof(p->stats));
155
156 if (clear)
157 memset(&p->stats, 0, sizeof(p->stats));
158
159 return 0;
160}
161
162/*
163 * Port RING Writer
164 */
/*
 * Writer statistics helpers.
 *
 * When RTE_PORT_STATS_COLLECT is defined the macros add `val` to the matching
 * counter in `port->stats`; otherwise they expand to nothing, so `val` is not
 * evaluated at all. Both arguments are parenthesized so that expressions such
 * as `count - sent` can be passed safely.
 */
#ifdef RTE_PORT_STATS_COLLECT

#define RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(port, val) \
	((port)->stats.n_pkts_in += (val))
#define RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(port, val) \
	((port)->stats.n_pkts_drop += (val))

#else

#define RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(port, val)

#endif
178
179struct rte_port_ring_writer {
180 struct rte_port_out_stats stats;
181
182 struct rte_mbuf *tx_buf[2 * RTE_PORT_IN_BURST_SIZE_MAX];
183 struct rte_ring *ring;
184 uint32_t tx_burst_sz;
185 uint32_t tx_buf_count;
186 uint64_t bsz_mask;
187 uint32_t is_multi;
188};
189
190static void *
191rte_port_ring_writer_create_internal(void *params, int socket_id,
192 uint32_t is_multi)
193{
194 struct rte_port_ring_writer_params *conf =
195 (struct rte_port_ring_writer_params *) params;
196 struct rte_port_ring_writer *port;
197
198 /* Check input parameters */
199 if ((conf == NULL) ||
200 (conf->ring == NULL) ||
201 (conf->ring->prod.sp_enqueue && is_multi) ||
202 (!(conf->ring->prod.sp_enqueue) && !is_multi) ||
203 (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
204 RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
205 return NULL;
206 }
207
208 /* Memory allocation */
209 port = rte_zmalloc_socket("PORT", sizeof(*port),
210 RTE_CACHE_LINE_SIZE, socket_id);
211 if (port == NULL) {
212 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
213 return NULL;
214 }
215
216 /* Initialization */
217 port->ring = conf->ring;
218 port->tx_burst_sz = conf->tx_burst_sz;
219 port->tx_buf_count = 0;
220 port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
221 port->is_multi = is_multi;
222
223 return port;
224}
225
/* f_create callback of the single-producer ring writer port. */
static void *
rte_port_ring_writer_create(void *params, int socket_id)
{
	const uint32_t is_multi = 0;

	return rte_port_ring_writer_create_internal(params, socket_id,
			is_multi);
}
231
/* f_create callback of the multi-producer ring writer port. */
static void *
rte_port_ring_multi_writer_create(void *params, int socket_id)
{
	const uint32_t is_multi = 1;

	return rte_port_ring_writer_create_internal(params, socket_id,
			is_multi);
}
237
238static inline void
239send_burst(struct rte_port_ring_writer *p)
240{
241 uint32_t nb_tx;
242
243 nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
244 p->tx_buf_count);
245
246 RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
247 for ( ; nb_tx < p->tx_buf_count; nb_tx++)
248 rte_pktmbuf_free(p->tx_buf[nb_tx]);
249
250 p->tx_buf_count = 0;
251}
252
253static inline void
254send_burst_mp(struct rte_port_ring_writer *p)
255{
256 uint32_t nb_tx;
257
258 nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
259 p->tx_buf_count);
260
261 RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
262 for ( ; nb_tx < p->tx_buf_count; nb_tx++)
263 rte_pktmbuf_free(p->tx_buf[nb_tx]);
264
265 p->tx_buf_count = 0;
266}
267
268static int
269rte_port_ring_writer_tx(void *port, struct rte_mbuf *pkt)
270{
271 struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
272
273 p->tx_buf[p->tx_buf_count++] = pkt;
274 RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
275 if (p->tx_buf_count >= p->tx_burst_sz)
276 send_burst(p);
277
278 return 0;
279}
280
281static int
282rte_port_ring_multi_writer_tx(void *port, struct rte_mbuf *pkt)
283{
284 struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
285
286 p->tx_buf[p->tx_buf_count++] = pkt;
287 RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
288 if (p->tx_buf_count >= p->tx_burst_sz)
289 send_burst_mp(p);
290
291 return 0;
292}
293
294static inline int __attribute__((always_inline))
295rte_port_ring_writer_tx_bulk_internal(void *port,
296 struct rte_mbuf **pkts,
297 uint64_t pkts_mask,
298 uint32_t is_multi)
299{
300 struct rte_port_ring_writer *p =
301 (struct rte_port_ring_writer *) port;
302
303 uint64_t bsz_mask = p->bsz_mask;
304 uint32_t tx_buf_count = p->tx_buf_count;
305 uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
306 ((pkts_mask & bsz_mask) ^ bsz_mask);
307
308 if (expr == 0) {
309 uint64_t n_pkts = __builtin_popcountll(pkts_mask);
310 uint32_t n_pkts_ok;
311
312 if (tx_buf_count) {
313 if (is_multi)
314 send_burst_mp(p);
315 else
316 send_burst(p);
317 }
318
319 RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, n_pkts);
320 if (is_multi)
321 n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring, (void **)pkts,
322 n_pkts);
323 else
324 n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring, (void **)pkts,
325 n_pkts);
326
327 RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, n_pkts - n_pkts_ok);
328 for ( ; n_pkts_ok < n_pkts; n_pkts_ok++) {
329 struct rte_mbuf *pkt = pkts[n_pkts_ok];
330
331 rte_pktmbuf_free(pkt);
332 }
333 } else {
334 for ( ; pkts_mask; ) {
335 uint32_t pkt_index = __builtin_ctzll(pkts_mask);
336 uint64_t pkt_mask = 1LLU << pkt_index;
337 struct rte_mbuf *pkt = pkts[pkt_index];
338
339 p->tx_buf[tx_buf_count++] = pkt;
340 RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
341 pkts_mask &= ~pkt_mask;
342 }
343
344 p->tx_buf_count = tx_buf_count;
345 if (tx_buf_count >= p->tx_burst_sz) {
346 if (is_multi)
347 send_burst_mp(p);
348 else
349 send_burst(p);
350 }
351 }
352
353 return 0;
354}
355
/* f_tx_bulk callback of the single-producer ring writer port. */
static int
rte_port_ring_writer_tx_bulk(void *port,
		struct rte_mbuf **pkts,
		uint64_t pkts_mask)
{
	const uint32_t is_multi = 0;

	return rte_port_ring_writer_tx_bulk_internal(port, pkts, pkts_mask,
			is_multi);
}
363
/* f_tx_bulk callback of the multi-producer ring writer port. */
static int
rte_port_ring_multi_writer_tx_bulk(void *port,
		struct rte_mbuf **pkts,
		uint64_t pkts_mask)
{
	const uint32_t is_multi = 1;

	return rte_port_ring_writer_tx_bulk_internal(port, pkts, pkts_mask,
			is_multi);
}
371
372static int
373rte_port_ring_writer_flush(void *port)
374{
375 struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
376
377 if (p->tx_buf_count > 0)
378 send_burst(p);
379
380 return 0;
381}
382
383static int
384rte_port_ring_multi_writer_flush(void *port)
385{
386 struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
387
388 if (p->tx_buf_count > 0)
389 send_burst_mp(p);
390
391 return 0;
392}
393
394static int
395rte_port_ring_writer_free(void *port)
396{
397 struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
398
399 if (port == NULL) {
400 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
401 return -EINVAL;
402 }
403
404 if (p->is_multi)
405 rte_port_ring_multi_writer_flush(port);
406 else
407 rte_port_ring_writer_flush(port);
408
409 rte_free(port);
410
411 return 0;
412}
413
414static int
415rte_port_ring_writer_stats_read(void *port,
416 struct rte_port_out_stats *stats, int clear)
417{
418 struct rte_port_ring_writer *p =
419 (struct rte_port_ring_writer *) port;
420
421 if (stats != NULL)
422 memcpy(stats, &p->stats, sizeof(p->stats));
423
424 if (clear)
425 memset(&p->stats, 0, sizeof(p->stats));
426
427 return 0;
428}
429
430/*
431 * Port RING Writer Nodrop
432 */
/*
 * No-drop writer statistics helpers.
 *
 * When RTE_PORT_STATS_COLLECT is defined the macros add `val` to the matching
 * counter in `port->stats`; otherwise they expand to nothing, so `val` is not
 * evaluated at all. Both arguments are parenthesized so that expressions such
 * as `count - sent` can be passed safely.
 */
#ifdef RTE_PORT_STATS_COLLECT

#define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val) \
	((port)->stats.n_pkts_in += (val))
#define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val) \
	((port)->stats.n_pkts_drop += (val))

#else

#define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val)
#define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val)

#endif
446
447struct rte_port_ring_writer_nodrop {
448 struct rte_port_out_stats stats;
449
450 struct rte_mbuf *tx_buf[2 * RTE_PORT_IN_BURST_SIZE_MAX];
451 struct rte_ring *ring;
452 uint32_t tx_burst_sz;
453 uint32_t tx_buf_count;
454 uint64_t bsz_mask;
455 uint64_t n_retries;
456 uint32_t is_multi;
457};
458
459static void *
460rte_port_ring_writer_nodrop_create_internal(void *params, int socket_id,
461 uint32_t is_multi)
462{
463 struct rte_port_ring_writer_nodrop_params *conf =
464 (struct rte_port_ring_writer_nodrop_params *) params;
465 struct rte_port_ring_writer_nodrop *port;
466
467 /* Check input parameters */
468 if ((conf == NULL) ||
469 (conf->ring == NULL) ||
470 (conf->ring->prod.sp_enqueue && is_multi) ||
471 (!(conf->ring->prod.sp_enqueue) && !is_multi) ||
472 (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
473 RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
474 return NULL;
475 }
476
477 /* Memory allocation */
478 port = rte_zmalloc_socket("PORT", sizeof(*port),
479 RTE_CACHE_LINE_SIZE, socket_id);
480 if (port == NULL) {
481 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
482 return NULL;
483 }
484
485 /* Initialization */
486 port->ring = conf->ring;
487 port->tx_burst_sz = conf->tx_burst_sz;
488 port->tx_buf_count = 0;
489 port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
490 port->is_multi = is_multi;
491
492 /*
493 * When n_retries is 0 it means that we should wait for every packet to
494 * send no matter how many retries should it take. To limit number of
495 * branches in fast path, we use UINT64_MAX instead of branching.
496 */
497 port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries;
498
499 return port;
500}
501
/* f_create callback of the single-producer no-drop ring writer port. */
static void *
rte_port_ring_writer_nodrop_create(void *params, int socket_id)
{
	const uint32_t is_multi = 0;

	return rte_port_ring_writer_nodrop_create_internal(params, socket_id,
			is_multi);
}
507
/* f_create callback of the multi-producer no-drop ring writer port. */
static void *
rte_port_ring_multi_writer_nodrop_create(void *params, int socket_id)
{
	const uint32_t is_multi = 1;

	return rte_port_ring_writer_nodrop_create_internal(params, socket_id,
			is_multi);
}
513
514static inline void
515send_burst_nodrop(struct rte_port_ring_writer_nodrop *p)
516{
517 uint32_t nb_tx = 0, i;
518
519 nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
520 p->tx_buf_count);
521
522 /* We sent all the packets in a first try */
523 if (nb_tx >= p->tx_buf_count) {
524 p->tx_buf_count = 0;
525 return;
526 }
527
528 for (i = 0; i < p->n_retries; i++) {
529 nb_tx += rte_ring_sp_enqueue_burst(p->ring,
530 (void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);
531
532 /* We sent all the packets in more than one try */
533 if (nb_tx >= p->tx_buf_count) {
534 p->tx_buf_count = 0;
535 return;
536 }
537 }
538
539 /* We didn't send the packets in maximum allowed attempts */
540 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
541 for ( ; nb_tx < p->tx_buf_count; nb_tx++)
542 rte_pktmbuf_free(p->tx_buf[nb_tx]);
543
544 p->tx_buf_count = 0;
545}
546
547static inline void
548send_burst_mp_nodrop(struct rte_port_ring_writer_nodrop *p)
549{
550 uint32_t nb_tx = 0, i;
551
552 nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
553 p->tx_buf_count);
554
555 /* We sent all the packets in a first try */
556 if (nb_tx >= p->tx_buf_count) {
557 p->tx_buf_count = 0;
558 return;
559 }
560
561 for (i = 0; i < p->n_retries; i++) {
562 nb_tx += rte_ring_mp_enqueue_burst(p->ring,
563 (void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);
564
565 /* We sent all the packets in more than one try */
566 if (nb_tx >= p->tx_buf_count) {
567 p->tx_buf_count = 0;
568 return;
569 }
570 }
571
572 /* We didn't send the packets in maximum allowed attempts */
573 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
574 for ( ; nb_tx < p->tx_buf_count; nb_tx++)
575 rte_pktmbuf_free(p->tx_buf[nb_tx]);
576
577 p->tx_buf_count = 0;
578}
579
580static int
581rte_port_ring_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
582{
583 struct rte_port_ring_writer_nodrop *p =
584 (struct rte_port_ring_writer_nodrop *) port;
585
586 p->tx_buf[p->tx_buf_count++] = pkt;
587 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
588 if (p->tx_buf_count >= p->tx_burst_sz)
589 send_burst_nodrop(p);
590
591 return 0;
592}
593
594static int
595rte_port_ring_multi_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
596{
597 struct rte_port_ring_writer_nodrop *p =
598 (struct rte_port_ring_writer_nodrop *) port;
599
600 p->tx_buf[p->tx_buf_count++] = pkt;
601 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
602 if (p->tx_buf_count >= p->tx_burst_sz)
603 send_burst_mp_nodrop(p);
604
605 return 0;
606}
607
608static inline int __attribute__((always_inline))
609rte_port_ring_writer_nodrop_tx_bulk_internal(void *port,
610 struct rte_mbuf **pkts,
611 uint64_t pkts_mask,
612 uint32_t is_multi)
613{
614 struct rte_port_ring_writer_nodrop *p =
615 (struct rte_port_ring_writer_nodrop *) port;
616
617 uint64_t bsz_mask = p->bsz_mask;
618 uint32_t tx_buf_count = p->tx_buf_count;
619 uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
620 ((pkts_mask & bsz_mask) ^ bsz_mask);
621
622 if (expr == 0) {
623 uint64_t n_pkts = __builtin_popcountll(pkts_mask);
624 uint32_t n_pkts_ok;
625
626 if (tx_buf_count) {
627 if (is_multi)
628 send_burst_mp_nodrop(p);
629 else
630 send_burst_nodrop(p);
631 }
632
633 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts);
634 if (is_multi)
635 n_pkts_ok =
636 rte_ring_mp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
637 else
638 n_pkts_ok =
639 rte_ring_sp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
640
641 if (n_pkts_ok >= n_pkts)
642 return 0;
643
644 /*
645 * If we didn't manage to send all packets in single burst, move
646 * remaining packets to the buffer and call send burst.
647 */
648 for (; n_pkts_ok < n_pkts; n_pkts_ok++) {
649 struct rte_mbuf *pkt = pkts[n_pkts_ok];
650
651 p->tx_buf[p->tx_buf_count++] = pkt;
652 }
653 if (is_multi)
654 send_burst_mp_nodrop(p);
655 else
656 send_burst_nodrop(p);
657 } else {
658 for ( ; pkts_mask; ) {
659 uint32_t pkt_index = __builtin_ctzll(pkts_mask);
660 uint64_t pkt_mask = 1LLU << pkt_index;
661 struct rte_mbuf *pkt = pkts[pkt_index];
662
663 p->tx_buf[tx_buf_count++] = pkt;
664 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
665 pkts_mask &= ~pkt_mask;
666 }
667
668 p->tx_buf_count = tx_buf_count;
669 if (tx_buf_count >= p->tx_burst_sz) {
670 if (is_multi)
671 send_burst_mp_nodrop(p);
672 else
673 send_burst_nodrop(p);
674 }
675 }
676
677 return 0;
678}
679
/* f_tx_bulk callback of the single-producer no-drop ring writer port. */
static int
rte_port_ring_writer_nodrop_tx_bulk(void *port,
		struct rte_mbuf **pkts,
		uint64_t pkts_mask)
{
	const uint32_t is_multi = 0;

	return rte_port_ring_writer_nodrop_tx_bulk_internal(port, pkts,
			pkts_mask, is_multi);
}
688
/* f_tx_bulk callback of the multi-producer no-drop ring writer port. */
static int
rte_port_ring_multi_writer_nodrop_tx_bulk(void *port,
		struct rte_mbuf **pkts,
		uint64_t pkts_mask)
{
	const uint32_t is_multi = 1;

	return rte_port_ring_writer_nodrop_tx_bulk_internal(port, pkts,
			pkts_mask, is_multi);
}
697
698static int
699rte_port_ring_writer_nodrop_flush(void *port)
700{
701 struct rte_port_ring_writer_nodrop *p =
702 (struct rte_port_ring_writer_nodrop *) port;
703
704 if (p->tx_buf_count > 0)
705 send_burst_nodrop(p);
706
707 return 0;
708}
709
710static int
711rte_port_ring_multi_writer_nodrop_flush(void *port)
712{
713 struct rte_port_ring_writer_nodrop *p =
714 (struct rte_port_ring_writer_nodrop *) port;
715
716 if (p->tx_buf_count > 0)
717 send_burst_mp_nodrop(p);
718
719 return 0;
720}
721
722static int
723rte_port_ring_writer_nodrop_free(void *port)
724{
725 struct rte_port_ring_writer_nodrop *p =
726 (struct rte_port_ring_writer_nodrop *) port;
727
728 if (port == NULL) {
729 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
730 return -EINVAL;
731 }
732
733 if (p->is_multi)
734 rte_port_ring_multi_writer_nodrop_flush(port);
735 else
736 rte_port_ring_writer_nodrop_flush(port);
737
738 rte_free(port);
739
740 return 0;
741}
742
743static int
744rte_port_ring_writer_nodrop_stats_read(void *port,
745 struct rte_port_out_stats *stats, int clear)
746{
747 struct rte_port_ring_writer_nodrop *p =
748 (struct rte_port_ring_writer_nodrop *) port;
749
750 if (stats != NULL)
751 memcpy(stats, &p->stats, sizeof(p->stats));
752
753 if (clear)
754 memset(&p->stats, 0, sizeof(p->stats));
755
756 return 0;
757}
758
759/*
760 * Summary of port operations
761 */
762struct rte_port_in_ops rte_port_ring_reader_ops = {
763 .f_create = rte_port_ring_reader_create,
764 .f_free = rte_port_ring_reader_free,
765 .f_rx = rte_port_ring_reader_rx,
766 .f_stats = rte_port_ring_reader_stats_read,
767};
768
769struct rte_port_out_ops rte_port_ring_writer_ops = {
770 .f_create = rte_port_ring_writer_create,
771 .f_free = rte_port_ring_writer_free,
772 .f_tx = rte_port_ring_writer_tx,
773 .f_tx_bulk = rte_port_ring_writer_tx_bulk,
774 .f_flush = rte_port_ring_writer_flush,
775 .f_stats = rte_port_ring_writer_stats_read,
776};
777
778struct rte_port_out_ops rte_port_ring_writer_nodrop_ops = {
779 .f_create = rte_port_ring_writer_nodrop_create,
780 .f_free = rte_port_ring_writer_nodrop_free,
781 .f_tx = rte_port_ring_writer_nodrop_tx,
782 .f_tx_bulk = rte_port_ring_writer_nodrop_tx_bulk,
783 .f_flush = rte_port_ring_writer_nodrop_flush,
784 .f_stats = rte_port_ring_writer_nodrop_stats_read,
785};
786
787struct rte_port_in_ops rte_port_ring_multi_reader_ops = {
788 .f_create = rte_port_ring_multi_reader_create,
789 .f_free = rte_port_ring_reader_free,
790 .f_rx = rte_port_ring_multi_reader_rx,
791 .f_stats = rte_port_ring_reader_stats_read,
792};
793
794struct rte_port_out_ops rte_port_ring_multi_writer_ops = {
795 .f_create = rte_port_ring_multi_writer_create,
796 .f_free = rte_port_ring_writer_free,
797 .f_tx = rte_port_ring_multi_writer_tx,
798 .f_tx_bulk = rte_port_ring_multi_writer_tx_bulk,
799 .f_flush = rte_port_ring_multi_writer_flush,
800 .f_stats = rte_port_ring_writer_stats_read,
801};
802
803struct rte_port_out_ops rte_port_ring_multi_writer_nodrop_ops = {
804 .f_create = rte_port_ring_multi_writer_nodrop_create,
805 .f_free = rte_port_ring_writer_nodrop_free,
806 .f_tx = rte_port_ring_multi_writer_nodrop_tx,
807 .f_tx_bulk = rte_port_ring_multi_writer_nodrop_tx_bulk,
808 .f_flush = rte_port_ring_multi_writer_nodrop_flush,
809 .f_stats = rte_port_ring_writer_nodrop_stats_read,
810};