/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2017 Cavium, Inc.
 */

#include "test_pipeline_common.h"

/* See http://doc.dpdk.org/guides/tools/testeventdev.html for test details */
10 static __rte_always_inline
int
11 pipeline_queue_nb_event_queues(struct evt_options
*opt
)
13 uint16_t eth_count
= rte_eth_dev_count_avail();
15 return (eth_count
* opt
->nb_stages
) + eth_count
;
18 static __rte_noinline
int
19 pipeline_queue_worker_single_stage_tx(void *arg
)
21 PIPELINE_WORKER_SINGLE_STAGE_INIT
;
23 while (t
->done
== false) {
24 uint16_t event
= rte_event_dequeue_burst(dev
, port
, &ev
, 1, 0);
31 if (ev
.sched_type
== RTE_SCHED_TYPE_ATOMIC
) {
32 pipeline_event_tx(dev
, port
, &ev
);
36 pipeline_fwd_event(&ev
, RTE_SCHED_TYPE_ATOMIC
);
37 pipeline_event_enqueue(dev
, port
, &ev
);
44 static __rte_noinline
int
45 pipeline_queue_worker_single_stage_fwd(void *arg
)
47 PIPELINE_WORKER_SINGLE_STAGE_INIT
;
48 const uint8_t *tx_queue
= t
->tx_evqueue_id
;
50 while (t
->done
== false) {
51 uint16_t event
= rte_event_dequeue_burst(dev
, port
, &ev
, 1, 0);
58 ev
.queue_id
= tx_queue
[ev
.mbuf
->port
];
59 rte_event_eth_tx_adapter_txq_set(ev
.mbuf
, 0);
60 pipeline_fwd_event(&ev
, RTE_SCHED_TYPE_ATOMIC
);
61 pipeline_event_enqueue(dev
, port
, &ev
);
68 static __rte_noinline
int
69 pipeline_queue_worker_single_stage_burst_tx(void *arg
)
71 PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT
;
73 while (t
->done
== false) {
74 uint16_t nb_rx
= rte_event_dequeue_burst(dev
, port
, ev
,
82 for (i
= 0; i
< nb_rx
; i
++) {
83 rte_prefetch0(ev
[i
+ 1].mbuf
);
84 if (ev
[i
].sched_type
== RTE_SCHED_TYPE_ATOMIC
) {
85 pipeline_event_tx(dev
, port
, &ev
[i
]);
86 ev
[i
].op
= RTE_EVENT_OP_RELEASE
;
90 pipeline_fwd_event(&ev
[i
],
91 RTE_SCHED_TYPE_ATOMIC
);
95 pipeline_event_enqueue_burst(dev
, port
, ev
, nb_rx
);
101 static __rte_noinline
int
102 pipeline_queue_worker_single_stage_burst_fwd(void *arg
)
104 PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT
;
105 const uint8_t *tx_queue
= t
->tx_evqueue_id
;
107 while (t
->done
== false) {
108 uint16_t nb_rx
= rte_event_dequeue_burst(dev
, port
, ev
,
116 for (i
= 0; i
< nb_rx
; i
++) {
117 rte_prefetch0(ev
[i
+ 1].mbuf
);
118 ev
[i
].queue_id
= tx_queue
[ev
[i
].mbuf
->port
];
119 rte_event_eth_tx_adapter_txq_set(ev
[i
].mbuf
, 0);
120 pipeline_fwd_event(&ev
[i
], RTE_SCHED_TYPE_ATOMIC
);
123 pipeline_event_enqueue_burst(dev
, port
, ev
, nb_rx
);
124 w
->processed_pkts
+= nb_rx
;
131 static __rte_noinline
int
132 pipeline_queue_worker_multi_stage_tx(void *arg
)
134 PIPELINE_WORKER_MULTI_STAGE_INIT
;
135 const uint8_t *tx_queue
= t
->tx_evqueue_id
;
137 while (t
->done
== false) {
138 uint16_t event
= rte_event_dequeue_burst(dev
, port
, &ev
, 1, 0);
145 cq_id
= ev
.queue_id
% nb_stages
;
147 if (ev
.queue_id
== tx_queue
[ev
.mbuf
->port
]) {
148 pipeline_event_tx(dev
, port
, &ev
);
154 pipeline_fwd_event(&ev
, cq_id
!= last_queue
?
155 sched_type_list
[cq_id
] :
156 RTE_SCHED_TYPE_ATOMIC
);
157 pipeline_event_enqueue(dev
, port
, &ev
);
163 static __rte_noinline
int
164 pipeline_queue_worker_multi_stage_fwd(void *arg
)
166 PIPELINE_WORKER_MULTI_STAGE_INIT
;
167 const uint8_t *tx_queue
= t
->tx_evqueue_id
;
169 while (t
->done
== false) {
170 uint16_t event
= rte_event_dequeue_burst(dev
, port
, &ev
, 1, 0);
177 cq_id
= ev
.queue_id
% nb_stages
;
179 if (cq_id
== last_queue
) {
180 ev
.queue_id
= tx_queue
[ev
.mbuf
->port
];
181 rte_event_eth_tx_adapter_txq_set(ev
.mbuf
, 0);
182 pipeline_fwd_event(&ev
, RTE_SCHED_TYPE_ATOMIC
);
186 pipeline_fwd_event(&ev
, sched_type_list
[cq_id
]);
189 pipeline_event_enqueue(dev
, port
, &ev
);
195 static __rte_noinline
int
196 pipeline_queue_worker_multi_stage_burst_tx(void *arg
)
198 PIPELINE_WORKER_MULTI_STAGE_BURST_INIT
;
199 const uint8_t *tx_queue
= t
->tx_evqueue_id
;
201 while (t
->done
== false) {
202 uint16_t nb_rx
= rte_event_dequeue_burst(dev
, port
, ev
,
210 for (i
= 0; i
< nb_rx
; i
++) {
211 rte_prefetch0(ev
[i
+ 1].mbuf
);
212 cq_id
= ev
[i
].queue_id
% nb_stages
;
214 if (ev
[i
].queue_id
== tx_queue
[ev
[i
].mbuf
->port
]) {
215 pipeline_event_tx(dev
, port
, &ev
[i
]);
216 ev
[i
].op
= RTE_EVENT_OP_RELEASE
;
222 pipeline_fwd_event(&ev
[i
], cq_id
!= last_queue
?
223 sched_type_list
[cq_id
] :
224 RTE_SCHED_TYPE_ATOMIC
);
227 pipeline_event_enqueue_burst(dev
, port
, ev
, nb_rx
);
233 static __rte_noinline
int
234 pipeline_queue_worker_multi_stage_burst_fwd(void *arg
)
236 PIPELINE_WORKER_MULTI_STAGE_BURST_INIT
;
237 const uint8_t *tx_queue
= t
->tx_evqueue_id
;
239 while (t
->done
== false) {
240 uint16_t nb_rx
= rte_event_dequeue_burst(dev
, port
, ev
,
248 for (i
= 0; i
< nb_rx
; i
++) {
249 rte_prefetch0(ev
[i
+ 1].mbuf
);
250 cq_id
= ev
[i
].queue_id
% nb_stages
;
252 if (cq_id
== last_queue
) {
253 ev
[i
].queue_id
= tx_queue
[ev
[i
].mbuf
->port
];
254 rte_event_eth_tx_adapter_txq_set(ev
[i
].mbuf
, 0);
255 pipeline_fwd_event(&ev
[i
],
256 RTE_SCHED_TYPE_ATOMIC
);
260 pipeline_fwd_event(&ev
[i
],
261 sched_type_list
[cq_id
]);
265 pipeline_event_enqueue_burst(dev
, port
, ev
, nb_rx
);
272 worker_wrapper(void *arg
)
274 struct worker_data
*w
= arg
;
275 struct evt_options
*opt
= w
->t
->opt
;
276 const bool burst
= evt_has_burst_mode(w
->dev_id
);
277 const bool internal_port
= w
->t
->internal_port
;
278 const uint8_t nb_stages
= opt
->nb_stages
;
281 if (nb_stages
== 1) {
282 if (!burst
&& internal_port
)
283 return pipeline_queue_worker_single_stage_tx(arg
);
284 else if (!burst
&& !internal_port
)
285 return pipeline_queue_worker_single_stage_fwd(arg
);
286 else if (burst
&& internal_port
)
287 return pipeline_queue_worker_single_stage_burst_tx(arg
);
288 else if (burst
&& !internal_port
)
289 return pipeline_queue_worker_single_stage_burst_fwd(
292 if (!burst
&& internal_port
)
293 return pipeline_queue_worker_multi_stage_tx(arg
);
294 else if (!burst
&& !internal_port
)
295 return pipeline_queue_worker_multi_stage_fwd(arg
);
296 else if (burst
&& internal_port
)
297 return pipeline_queue_worker_multi_stage_burst_tx(arg
);
298 else if (burst
&& !internal_port
)
299 return pipeline_queue_worker_multi_stage_burst_fwd(arg
);
302 rte_panic("invalid worker\n");
306 pipeline_queue_launch_lcores(struct evt_test
*test
, struct evt_options
*opt
)
308 return pipeline_launch_lcores(test
, opt
, worker_wrapper
);
312 pipeline_queue_eventdev_setup(struct evt_test
*test
, struct evt_options
*opt
)
317 int nb_stages
= opt
->nb_stages
;
319 uint8_t tx_evport_id
= 0;
320 uint8_t tx_evqueue_id
[RTE_MAX_ETHPORTS
];
321 uint8_t queue_arr
[RTE_EVENT_MAX_QUEUES_PER_DEV
];
322 uint8_t nb_worker_queues
= 0;
324 struct rte_event_dev_info info
;
325 struct test_pipeline
*t
= evt_test_priv(test
);
327 nb_ports
= evt_nr_active_lcores(opt
->wlcores
);
328 nb_queues
= rte_eth_dev_count_avail() * (nb_stages
);
330 /* One queue for Tx adapter per port */
331 nb_queues
+= rte_eth_dev_count_avail();
333 memset(tx_evqueue_id
, 0, sizeof(uint8_t) * RTE_MAX_ETHPORTS
);
334 memset(queue_arr
, 0, sizeof(uint8_t) * RTE_EVENT_MAX_QUEUES_PER_DEV
);
336 rte_event_dev_info_get(opt
->dev_id
, &info
);
337 ret
= evt_configure_eventdev(opt
, nb_queues
, nb_ports
);
339 evt_err("failed to configure eventdev %d", opt
->dev_id
);
343 struct rte_event_queue_conf q_conf
= {
344 .priority
= RTE_EVENT_DEV_PRIORITY_NORMAL
,
345 .nb_atomic_flows
= opt
->nb_flows
,
346 .nb_atomic_order_sequences
= opt
->nb_flows
,
348 /* queue configurations */
349 for (queue
= 0; queue
< nb_queues
; queue
++) {
352 q_conf
.event_queue_cfg
= 0;
353 slot
= queue
% (nb_stages
+ 1);
354 if (slot
== nb_stages
) {
355 q_conf
.schedule_type
= RTE_SCHED_TYPE_ATOMIC
;
356 if (!t
->internal_port
) {
357 q_conf
.event_queue_cfg
=
358 RTE_EVENT_QUEUE_CFG_SINGLE_LINK
;
360 tx_evqueue_id
[prod
++] = queue
;
362 q_conf
.schedule_type
= opt
->sched_type_list
[slot
];
363 queue_arr
[nb_worker_queues
] = queue
;
367 ret
= rte_event_queue_setup(opt
->dev_id
, queue
, &q_conf
);
369 evt_err("failed to setup queue=%d", queue
);
374 if (opt
->wkr_deq_dep
> info
.max_event_port_dequeue_depth
)
375 opt
->wkr_deq_dep
= info
.max_event_port_dequeue_depth
;
377 /* port configuration */
378 const struct rte_event_port_conf p_conf
= {
379 .dequeue_depth
= opt
->wkr_deq_dep
,
380 .enqueue_depth
= info
.max_event_port_dequeue_depth
,
381 .new_event_threshold
= info
.max_num_events
,
384 if (!t
->internal_port
) {
385 ret
= pipeline_event_port_setup(test
, opt
, queue_arr
,
386 nb_worker_queues
, p_conf
);
390 ret
= pipeline_event_port_setup(test
, opt
, NULL
, nb_queues
,
396 * The pipelines are setup in the following manner:
398 * eth_dev_count = 2, nb_stages = 2.
403 * event queue pipelines:
404 * eth0 -> q0 -> q1 -> (q2->tx)
405 * eth1 -> q3 -> q4 -> (q5->tx)
407 * q2, q5 configured as ATOMIC | SINGLE_LINK
410 ret
= pipeline_event_rx_adapter_setup(opt
, nb_stages
+ 1, p_conf
);
414 ret
= pipeline_event_tx_adapter_setup(opt
, p_conf
);
418 if (!evt_has_distributed_sched(opt
->dev_id
)) {
420 rte_event_dev_service_id_get(opt
->dev_id
, &service_id
);
421 ret
= evt_service_setup(service_id
);
423 evt_err("No service lcore found to run event dev.");
428 /* Connect the tx_evqueue_id to the Tx adapter port */
429 if (!t
->internal_port
) {
430 RTE_ETH_FOREACH_DEV(prod
) {
431 ret
= rte_event_eth_tx_adapter_event_port_get(prod
,
434 evt_err("Unable to get Tx adptr[%d] evprt[%d]",
439 if (rte_event_port_link(opt
->dev_id
, tx_evport_id
,
440 &tx_evqueue_id
[prod
],
442 evt_err("Unable to link Tx adptr[%d] evprt[%d]",
449 ret
= rte_event_dev_start(opt
->dev_id
);
451 evt_err("failed to start eventdev %d", opt
->dev_id
);
456 RTE_ETH_FOREACH_DEV(prod
) {
457 ret
= rte_eth_dev_start(prod
);
459 evt_err("Ethernet dev [%d] failed to start."
460 " Using synthetic producer", prod
);
466 RTE_ETH_FOREACH_DEV(prod
) {
467 ret
= rte_event_eth_rx_adapter_start(prod
);
469 evt_err("Rx adapter[%d] start failed", prod
);
473 ret
= rte_event_eth_tx_adapter_start(prod
);
475 evt_err("Tx adapter[%d] start failed", prod
);
480 memcpy(t
->tx_evqueue_id
, tx_evqueue_id
, sizeof(uint8_t) *
/* Dump the test options together with the computed event queue count. */
static void
pipeline_queue_opt_dump(struct evt_options *opt)
{
	const int nb_queues = pipeline_queue_nb_event_queues(opt);

	pipeline_opt_dump(opt, nb_queues);
}
/* Validate the options against the number of queues this test requires. */
static int
pipeline_queue_opt_check(struct evt_options *opt)
{
	const int nb_queues = pipeline_queue_nb_event_queues(opt);

	return pipeline_opt_check(opt, nb_queues);
}
499 pipeline_queue_capability_check(struct evt_options
*opt
)
501 struct rte_event_dev_info dev_info
;
503 rte_event_dev_info_get(opt
->dev_id
, &dev_info
);
504 if (dev_info
.max_event_queues
< pipeline_queue_nb_event_queues(opt
) ||
505 dev_info
.max_event_ports
<
506 evt_nr_active_lcores(opt
->wlcores
)) {
507 evt_err("not enough eventdev queues=%d/%d or ports=%d/%d",
508 pipeline_queue_nb_event_queues(opt
),
509 dev_info
.max_event_queues
,
510 evt_nr_active_lcores(opt
->wlcores
),
511 dev_info
.max_event_ports
);
517 static const struct evt_test_ops pipeline_queue
= {
518 .cap_check
= pipeline_queue_capability_check
,
519 .opt_check
= pipeline_queue_opt_check
,
520 .opt_dump
= pipeline_queue_opt_dump
,
521 .test_setup
= pipeline_test_setup
,
522 .mempool_setup
= pipeline_mempool_setup
,
523 .ethdev_setup
= pipeline_ethdev_setup
,
524 .eventdev_setup
= pipeline_queue_eventdev_setup
,
525 .launch_lcores
= pipeline_queue_launch_lcores
,
526 .eventdev_destroy
= pipeline_eventdev_destroy
,
527 .mempool_destroy
= pipeline_mempool_destroy
,
528 .ethdev_destroy
= pipeline_ethdev_destroy
,
529 .test_result
= pipeline_test_result
,
530 .test_destroy
= pipeline_test_destroy
,
533 EVT_TEST_REGISTER(pipeline_queue
);