/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2017 Cavium, Inc.
 */
#include "test_pipeline_common.h"
/* See http://dpdk.org/doc/guides/tools/testeventdev.html for test details */
10 static __rte_always_inline
int
11 pipeline_queue_nb_event_queues(struct evt_options
*opt
)
13 uint16_t eth_count
= rte_eth_dev_count_avail();
15 return (eth_count
* opt
->nb_stages
) + eth_count
;
19 pipeline_queue_worker_single_stage_tx(void *arg
)
21 PIPELINE_WROKER_SINGLE_STAGE_INIT
;
23 while (t
->done
== false) {
24 uint16_t event
= rte_event_dequeue_burst(dev
, port
, &ev
, 1, 0);
31 if (ev
.sched_type
== RTE_SCHED_TYPE_ATOMIC
) {
32 pipeline_tx_pkt(ev
.mbuf
);
36 pipeline_fwd_event(&ev
, RTE_SCHED_TYPE_ATOMIC
);
37 pipeline_event_enqueue(dev
, port
, &ev
);
45 pipeline_queue_worker_single_stage_fwd(void *arg
)
47 PIPELINE_WROKER_SINGLE_STAGE_INIT
;
48 const uint8_t tx_queue
= t
->tx_service
.queue_id
;
50 while (t
->done
== false) {
51 uint16_t event
= rte_event_dequeue_burst(dev
, port
, &ev
, 1, 0);
58 ev
.queue_id
= tx_queue
;
59 pipeline_fwd_event(&ev
, RTE_SCHED_TYPE_ATOMIC
);
60 pipeline_event_enqueue(dev
, port
, &ev
);
68 pipeline_queue_worker_single_stage_burst_tx(void *arg
)
70 PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT
;
72 while (t
->done
== false) {
73 uint16_t nb_rx
= rte_event_dequeue_burst(dev
, port
, ev
,
81 for (i
= 0; i
< nb_rx
; i
++) {
82 rte_prefetch0(ev
[i
+ 1].mbuf
);
83 if (ev
[i
].sched_type
== RTE_SCHED_TYPE_ATOMIC
) {
85 pipeline_tx_pkt(ev
[i
].mbuf
);
86 ev
[i
].op
= RTE_EVENT_OP_RELEASE
;
90 pipeline_fwd_event(&ev
[i
],
91 RTE_SCHED_TYPE_ATOMIC
);
95 pipeline_event_enqueue_burst(dev
, port
, ev
, nb_rx
);
102 pipeline_queue_worker_single_stage_burst_fwd(void *arg
)
104 PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT
;
105 const uint8_t tx_queue
= t
->tx_service
.queue_id
;
107 while (t
->done
== false) {
108 uint16_t nb_rx
= rte_event_dequeue_burst(dev
, port
, ev
,
116 for (i
= 0; i
< nb_rx
; i
++) {
117 rte_prefetch0(ev
[i
+ 1].mbuf
);
118 ev
[i
].queue_id
= tx_queue
;
119 pipeline_fwd_event(&ev
[i
], RTE_SCHED_TYPE_ATOMIC
);
123 pipeline_event_enqueue_burst(dev
, port
, ev
, nb_rx
);
131 pipeline_queue_worker_multi_stage_tx(void *arg
)
133 PIPELINE_WROKER_MULTI_STAGE_INIT
;
134 const uint8_t nb_stages
= t
->opt
->nb_stages
+ 1;
136 while (t
->done
== false) {
137 uint16_t event
= rte_event_dequeue_burst(dev
, port
, &ev
, 1, 0);
144 cq_id
= ev
.queue_id
% nb_stages
;
146 if (cq_id
>= last_queue
) {
147 if (ev
.sched_type
== RTE_SCHED_TYPE_ATOMIC
) {
149 pipeline_tx_pkt(ev
.mbuf
);
153 ev
.queue_id
+= (cq_id
== last_queue
) ? 1 : 0;
154 pipeline_fwd_event(&ev
, RTE_SCHED_TYPE_ATOMIC
);
157 pipeline_fwd_event(&ev
, sched_type_list
[cq_id
]);
160 pipeline_event_enqueue(dev
, port
, &ev
);
166 pipeline_queue_worker_multi_stage_fwd(void *arg
)
168 PIPELINE_WROKER_MULTI_STAGE_INIT
;
169 const uint8_t nb_stages
= t
->opt
->nb_stages
+ 1;
170 const uint8_t tx_queue
= t
->tx_service
.queue_id
;
172 while (t
->done
== false) {
173 uint16_t event
= rte_event_dequeue_burst(dev
, port
, &ev
, 1, 0);
180 cq_id
= ev
.queue_id
% nb_stages
;
182 if (cq_id
== last_queue
) {
183 ev
.queue_id
= tx_queue
;
184 pipeline_fwd_event(&ev
, RTE_SCHED_TYPE_ATOMIC
);
188 pipeline_fwd_event(&ev
, sched_type_list
[cq_id
]);
191 pipeline_event_enqueue(dev
, port
, &ev
);
197 pipeline_queue_worker_multi_stage_burst_tx(void *arg
)
199 PIPELINE_WROKER_MULTI_STAGE_BURST_INIT
;
200 const uint8_t nb_stages
= t
->opt
->nb_stages
+ 1;
202 while (t
->done
== false) {
203 uint16_t nb_rx
= rte_event_dequeue_burst(dev
, port
, ev
,
211 for (i
= 0; i
< nb_rx
; i
++) {
212 rte_prefetch0(ev
[i
+ 1].mbuf
);
213 cq_id
= ev
[i
].queue_id
% nb_stages
;
215 if (cq_id
>= last_queue
) {
216 if (ev
[i
].sched_type
== RTE_SCHED_TYPE_ATOMIC
) {
218 pipeline_tx_pkt(ev
[i
].mbuf
);
219 ev
[i
].op
= RTE_EVENT_OP_RELEASE
;
224 ev
[i
].queue_id
+= (cq_id
== last_queue
) ? 1 : 0;
225 pipeline_fwd_event(&ev
[i
],
226 RTE_SCHED_TYPE_ATOMIC
);
229 pipeline_fwd_event(&ev
[i
],
230 sched_type_list
[cq_id
]);
235 pipeline_event_enqueue_burst(dev
, port
, ev
, nb_rx
);
241 pipeline_queue_worker_multi_stage_burst_fwd(void *arg
)
243 PIPELINE_WROKER_MULTI_STAGE_BURST_INIT
;
244 const uint8_t nb_stages
= t
->opt
->nb_stages
+ 1;
245 const uint8_t tx_queue
= t
->tx_service
.queue_id
;
247 while (t
->done
== false) {
248 uint16_t nb_rx
= rte_event_dequeue_burst(dev
, port
, ev
,
256 for (i
= 0; i
< nb_rx
; i
++) {
257 rte_prefetch0(ev
[i
+ 1].mbuf
);
258 cq_id
= ev
[i
].queue_id
% nb_stages
;
260 if (cq_id
== last_queue
) {
261 ev
[i
].queue_id
= tx_queue
;
262 pipeline_fwd_event(&ev
[i
],
263 RTE_SCHED_TYPE_ATOMIC
);
267 pipeline_fwd_event(&ev
[i
],
268 sched_type_list
[cq_id
]);
272 pipeline_event_enqueue_burst(dev
, port
, ev
, nb_rx
);
278 worker_wrapper(void *arg
)
280 struct worker_data
*w
= arg
;
281 struct evt_options
*opt
= w
->t
->opt
;
282 const bool burst
= evt_has_burst_mode(w
->dev_id
);
283 const bool mt_safe
= !w
->t
->mt_unsafe
;
284 const uint8_t nb_stages
= opt
->nb_stages
;
287 if (nb_stages
== 1) {
288 if (!burst
&& mt_safe
)
289 return pipeline_queue_worker_single_stage_tx(arg
);
290 else if (!burst
&& !mt_safe
)
291 return pipeline_queue_worker_single_stage_fwd(arg
);
292 else if (burst
&& mt_safe
)
293 return pipeline_queue_worker_single_stage_burst_tx(arg
);
294 else if (burst
&& !mt_safe
)
295 return pipeline_queue_worker_single_stage_burst_fwd(
298 if (!burst
&& mt_safe
)
299 return pipeline_queue_worker_multi_stage_tx(arg
);
300 else if (!burst
&& !mt_safe
)
301 return pipeline_queue_worker_multi_stage_fwd(arg
);
302 else if (burst
&& mt_safe
)
303 return pipeline_queue_worker_multi_stage_burst_tx(arg
);
304 else if (burst
&& !mt_safe
)
305 return pipeline_queue_worker_multi_stage_burst_fwd(arg
);
308 rte_panic("invalid worker\n");
312 pipeline_queue_launch_lcores(struct evt_test
*test
, struct evt_options
*opt
)
314 struct test_pipeline
*t
= evt_test_priv(test
);
317 rte_service_component_runstate_set(t
->tx_service
.service_id
, 1);
318 return pipeline_launch_lcores(test
, opt
, worker_wrapper
);
322 pipeline_queue_eventdev_setup(struct evt_test
*test
, struct evt_options
*opt
)
327 int nb_stages
= opt
->nb_stages
;
329 struct rte_event_dev_info info
;
330 struct test_pipeline
*t
= evt_test_priv(test
);
331 uint8_t tx_evqueue_id
= 0;
332 uint8_t queue_arr
[RTE_EVENT_MAX_QUEUES_PER_DEV
];
333 uint8_t nb_worker_queues
= 0;
335 nb_ports
= evt_nr_active_lcores(opt
->wlcores
);
336 nb_queues
= rte_eth_dev_count_avail() * (nb_stages
);
338 /* Extra port for Tx service. */
340 tx_evqueue_id
= nb_queues
;
344 nb_queues
+= rte_eth_dev_count_avail();
346 rte_event_dev_info_get(opt
->dev_id
, &info
);
348 const struct rte_event_dev_config config
= {
349 .nb_event_queues
= nb_queues
,
350 .nb_event_ports
= nb_ports
,
351 .nb_events_limit
= info
.max_num_events
,
352 .nb_event_queue_flows
= opt
->nb_flows
,
353 .nb_event_port_dequeue_depth
=
354 info
.max_event_port_dequeue_depth
,
355 .nb_event_port_enqueue_depth
=
356 info
.max_event_port_enqueue_depth
,
358 ret
= rte_event_dev_configure(opt
->dev_id
, &config
);
360 evt_err("failed to configure eventdev %d", opt
->dev_id
);
364 struct rte_event_queue_conf q_conf
= {
365 .priority
= RTE_EVENT_DEV_PRIORITY_NORMAL
,
366 .nb_atomic_flows
= opt
->nb_flows
,
367 .nb_atomic_order_sequences
= opt
->nb_flows
,
369 /* queue configurations */
370 for (queue
= 0; queue
< nb_queues
; queue
++) {
374 slot
= queue
% (nb_stages
+ 1);
375 q_conf
.schedule_type
= slot
== nb_stages
?
376 RTE_SCHED_TYPE_ATOMIC
:
377 opt
->sched_type_list
[slot
];
379 slot
= queue
% nb_stages
;
381 if (queue
== tx_evqueue_id
) {
382 q_conf
.schedule_type
= RTE_SCHED_TYPE_ATOMIC
;
383 q_conf
.event_queue_cfg
=
384 RTE_EVENT_QUEUE_CFG_SINGLE_LINK
;
386 q_conf
.schedule_type
=
387 opt
->sched_type_list
[slot
];
388 queue_arr
[nb_worker_queues
] = queue
;
393 ret
= rte_event_queue_setup(opt
->dev_id
, queue
, &q_conf
);
395 evt_err("failed to setup queue=%d", queue
);
400 if (opt
->wkr_deq_dep
> info
.max_event_port_dequeue_depth
)
401 opt
->wkr_deq_dep
= info
.max_event_port_dequeue_depth
;
403 /* port configuration */
404 const struct rte_event_port_conf p_conf
= {
405 .dequeue_depth
= opt
->wkr_deq_dep
,
406 .enqueue_depth
= info
.max_event_port_dequeue_depth
,
407 .new_event_threshold
= info
.max_num_events
,
411 * If tx is multi thread safe then allow workers to do Tx else use Tx
412 * service to Tx packets.
415 ret
= pipeline_event_port_setup(test
, opt
, queue_arr
,
416 nb_worker_queues
, p_conf
);
420 ret
= pipeline_event_tx_service_setup(test
, opt
, tx_evqueue_id
,
421 nb_ports
- 1, p_conf
);
424 ret
= pipeline_event_port_setup(test
, opt
, NULL
, nb_queues
,
430 * The pipelines are setup in the following manner:
432 * eth_dev_count = 2, nb_stages = 2.
434 * Multi thread safe :
438 * event queue pipelines:
439 * eth0 -> q0 -> q1 -> (q2->tx)
440 * eth1 -> q3 -> q4 -> (q5->tx)
442 * q2, q5 configured as ATOMIC
444 * Multi thread unsafe :
448 * event queue pipelines:
450 * } (q4->tx) Tx service
453 * q4 configured as SINGLE_LINK|ATOMIC
455 ret
= pipeline_event_rx_adapter_setup(opt
,
456 t
->mt_unsafe
? nb_stages
: nb_stages
+ 1, p_conf
);
460 if (!evt_has_distributed_sched(opt
->dev_id
)) {
462 rte_event_dev_service_id_get(opt
->dev_id
, &service_id
);
463 ret
= evt_service_setup(service_id
);
465 evt_err("No service lcore found to run event dev.");
470 ret
= rte_event_dev_start(opt
->dev_id
);
472 evt_err("failed to start eventdev %d", opt
->dev_id
);
/* Dump test options along with the computed event-queue count. */
static void
pipeline_queue_opt_dump(struct evt_options *opt)
{
	pipeline_opt_dump(opt, pipeline_queue_nb_event_queues(opt));
}
/* Validate test options against the computed event-queue count. */
static int
pipeline_queue_opt_check(struct evt_options *opt)
{
	return pipeline_opt_check(opt, pipeline_queue_nb_event_queues(opt));
}
492 pipeline_queue_capability_check(struct evt_options
*opt
)
494 struct rte_event_dev_info dev_info
;
496 rte_event_dev_info_get(opt
->dev_id
, &dev_info
);
497 if (dev_info
.max_event_queues
< pipeline_queue_nb_event_queues(opt
) ||
498 dev_info
.max_event_ports
<
499 evt_nr_active_lcores(opt
->wlcores
)) {
500 evt_err("not enough eventdev queues=%d/%d or ports=%d/%d",
501 pipeline_queue_nb_event_queues(opt
),
502 dev_info
.max_event_queues
,
503 evt_nr_active_lcores(opt
->wlcores
),
504 dev_info
.max_event_ports
);
510 static const struct evt_test_ops pipeline_queue
= {
511 .cap_check
= pipeline_queue_capability_check
,
512 .opt_check
= pipeline_queue_opt_check
,
513 .opt_dump
= pipeline_queue_opt_dump
,
514 .test_setup
= pipeline_test_setup
,
515 .mempool_setup
= pipeline_mempool_setup
,
516 .ethdev_setup
= pipeline_ethdev_setup
,
517 .eventdev_setup
= pipeline_queue_eventdev_setup
,
518 .launch_lcores
= pipeline_queue_launch_lcores
,
519 .eventdev_destroy
= pipeline_eventdev_destroy
,
520 .mempool_destroy
= pipeline_mempool_destroy
,
521 .ethdev_destroy
= pipeline_ethdev_destroy
,
522 .test_result
= pipeline_test_result
,
523 .test_destroy
= pipeline_test_destroy
,
526 EVT_TEST_REGISTER(pipeline_queue
);