/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2017 Cavium, Inc.
 */

#include "test_pipeline_common.h"

/* See http://doc.dpdk.org/guides/tools/testeventdev.html for test details */

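/*
 * A typical invocation of this test, loosely following the example in the
 * guide linked above (core masks, service core mask and the event vdev are
 * illustrative and depend on the platform):
 *
 *   dpdk-test-eventdev -c 0xf -s 0x8 --vdev=event_sw0 -- \
 *           --prod_type_ethdev --nb_pkts=0 --verbose 2 \
 *           --test=pipeline_atq --stlist=a --wlcores=1
 */

/*
 * In ATQ (all types queue) mode every stage of a flow is enqueued to the
 * same event queue, so the base requirement is one queue per ethernet
 * device; the Tx adapter (non internal port) case adds one extra
 * SINGLE_LINK queue per device at setup time.
 */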
static __rte_always_inline int
pipeline_atq_nb_event_queues(struct evt_options *opt)
{
        RTE_SET_USED(opt);

        return rte_eth_dev_count_avail();
}

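/*
 * Single stage worker, one event per dequeue, ethdev with internal event
 * port capability: the worker transmits the packet directly.
 */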
static __rte_noinline int
pipeline_atq_worker_single_stage_tx(void *arg)
{
        PIPELINE_WORKER_SINGLE_STAGE_INIT;

        while (t->done == false) {
                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

                if (!event) {
                        rte_pause();
                        continue;
                }

                pipeline_event_tx(dev, port, &ev);
                w->processed_pkts++;
        }

        return 0;
}

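/*
 * Single stage worker, no internal port: the event is redirected to the
 * per-port Tx adapter queue and the Tx adapter performs the transmit.
 */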
static __rte_noinline int
pipeline_atq_worker_single_stage_fwd(void *arg)
{
        PIPELINE_WORKER_SINGLE_STAGE_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;

        while (t->done == false) {
                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

                if (!event) {
                        rte_pause();
                        continue;
                }

                ev.queue_id = tx_queue[ev.mbuf->port];
                pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
                pipeline_event_enqueue(dev, port, &ev);
                w->processed_pkts++;
        }

        return 0;
}

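/*
 * Burst variant of the single stage Tx worker: dequeues up to BURST_SIZE
 * events, pins every mbuf to Tx queue 0 and transmits the whole burst.
 */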
static __rte_noinline int
pipeline_atq_worker_single_stage_burst_tx(void *arg)
{
        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;

        while (t->done == false) {
                uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
                                BURST_SIZE, 0);

                if (!nb_rx) {
                        rte_pause();
                        continue;
                }

                for (i = 0; i < nb_rx; i++) {
                        rte_prefetch0(ev[i + 1].mbuf);
                        rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
                }

                pipeline_event_tx_burst(dev, port, ev, nb_rx);
                w->processed_pkts += nb_rx;
        }

        return 0;
}

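/*
 * Burst variant of the single stage fwd worker: the burst is re-enqueued
 * to the per-port Tx adapter queues with atomic scheduling.
 */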
static __rte_noinline int
pipeline_atq_worker_single_stage_burst_fwd(void *arg)
{
        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;

        while (t->done == false) {
                uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
                                BURST_SIZE, 0);

                if (!nb_rx) {
                        rte_pause();
                        continue;
                }

                for (i = 0; i < nb_rx; i++) {
                        rte_prefetch0(ev[i + 1].mbuf);
                        rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
                        ev[i].queue_id = tx_queue[ev[i].mbuf->port];
                        pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
                }

                pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
                w->processed_pkts += nb_rx;
        }

        return 0;
}

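/*
 * Multi stage Tx worker. With a single ATQ queue per port, the current
 * stage travels in ev.sub_event_type; the last stage transmits directly,
 * earlier stages re-enqueue with the per-stage scheduling type.
 */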
static __rte_noinline int
pipeline_atq_worker_multi_stage_tx(void *arg)
{
        PIPELINE_WORKER_MULTI_STAGE_INIT;

        while (t->done == false) {
                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

                if (!event) {
                        rte_pause();
                        continue;
                }

                cq_id = ev.sub_event_type % nb_stages;

                if (cq_id == last_queue) {
                        pipeline_event_tx(dev, port, &ev);
                        w->processed_pkts++;
                        continue;
                }

                ev.sub_event_type++;
                pipeline_fwd_event(&ev, sched_type_list[cq_id]);
                pipeline_event_enqueue(dev, port, &ev);
        }

        return 0;
}

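/*
 * Multi stage fwd worker: the last stage forwards to the per-port Tx
 * adapter queue instead of transmitting directly.
 */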
static __rte_noinline int
pipeline_atq_worker_multi_stage_fwd(void *arg)
{
        PIPELINE_WORKER_MULTI_STAGE_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;

        while (t->done == false) {
                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

                if (!event) {
                        rte_pause();
                        continue;
                }

                cq_id = ev.sub_event_type % nb_stages;

                if (cq_id == last_queue) {
                        ev.queue_id = tx_queue[ev.mbuf->port];
                        pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
                        w->processed_pkts++;
                } else {
                        ev.sub_event_type++;
                        pipeline_fwd_event(&ev, sched_type_list[cq_id]);
                }

                pipeline_event_enqueue(dev, port, &ev);
        }

        return 0;
}

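/* Burst variant of the multi stage Tx worker. */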
static __rte_noinline int
pipeline_atq_worker_multi_stage_burst_tx(void *arg)
{
        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;

        while (t->done == false) {
                uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
                                BURST_SIZE, 0);

                if (!nb_rx) {
                        rte_pause();
                        continue;
                }

                for (i = 0; i < nb_rx; i++) {
                        rte_prefetch0(ev[i + 1].mbuf);
                        cq_id = ev[i].sub_event_type % nb_stages;

                        if (cq_id == last_queue) {
                                pipeline_event_tx(dev, port, &ev[i]);
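                                /*
                                 * Already transmitted: mark the slot as
                                 * RELEASE so the burst enqueue below only
                                 * releases the scheduling context.
                                 */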
                                ev[i].op = RTE_EVENT_OP_RELEASE;
                                w->processed_pkts++;
                                continue;
                        }

                        ev[i].sub_event_type++;
                        pipeline_fwd_event(&ev[i], sched_type_list[cq_id]);
                }

                pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
        }

        return 0;
}

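/* Burst variant of the multi stage fwd worker. */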
static __rte_noinline int
pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
{
        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;

        while (t->done == false) {
                uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
                                BURST_SIZE, 0);

                if (!nb_rx) {
                        rte_pause();
                        continue;
                }

                for (i = 0; i < nb_rx; i++) {
                        rte_prefetch0(ev[i + 1].mbuf);
                        cq_id = ev[i].sub_event_type % nb_stages;

                        if (cq_id == last_queue) {
                                w->processed_pkts++;
                                ev[i].queue_id = tx_queue[ev[i].mbuf->port];
                                pipeline_fwd_event(&ev[i],
                                                RTE_SCHED_TYPE_ATOMIC);
                        } else {
                                ev[i].sub_event_type++;
                                pipeline_fwd_event(&ev[i],
                                                sched_type_list[cq_id]);
                        }
                }

                pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
        }

        return 0;
}

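/*
 * Pick the worker flavour from the device's burst capability and from the
 * ethdev's internal event port (direct Tx) capability.
 */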
static int
worker_wrapper(void *arg)
{
        struct worker_data *w = arg;
        struct evt_options *opt = w->t->opt;
        const bool burst = evt_has_burst_mode(w->dev_id);
        const bool internal_port = w->t->internal_port;
        const uint8_t nb_stages = opt->nb_stages;

        if (nb_stages == 1) {
                if (!burst && internal_port)
                        return pipeline_atq_worker_single_stage_tx(arg);
                else if (!burst && !internal_port)
                        return pipeline_atq_worker_single_stage_fwd(arg);
                else if (burst && internal_port)
                        return pipeline_atq_worker_single_stage_burst_tx(arg);
                else if (burst && !internal_port)
                        return pipeline_atq_worker_single_stage_burst_fwd(arg);
        } else {
                if (!burst && internal_port)
                        return pipeline_atq_worker_multi_stage_tx(arg);
                else if (!burst && !internal_port)
                        return pipeline_atq_worker_multi_stage_fwd(arg);
                else if (burst && internal_port)
                        return pipeline_atq_worker_multi_stage_burst_tx(arg);
                else if (burst && !internal_port)
                        return pipeline_atq_worker_multi_stage_burst_fwd(arg);
        }

        rte_panic("invalid worker\n");
}

static int
pipeline_atq_launch_lcores(struct evt_test *test, struct evt_options *opt)
{
        return pipeline_launch_lcores(test, opt, worker_wrapper);
}

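/*
 * Configure event queues and ports: one ATQ queue per ethdev, plus one
 * SINGLE_LINK Tx queue per ethdev when the Tx adapter runs on a service
 * core (no internal port capability).
 */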
static int
pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
{
        int ret;
        int nb_ports;
        int nb_queues;
        uint8_t queue;
        uint8_t tx_evqueue_id[RTE_MAX_ETHPORTS];
        uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
        uint8_t nb_worker_queues = 0;
        uint8_t tx_evport_id = 0;
        uint16_t prod = 0;
        struct rte_event_dev_info info;
        struct test_pipeline *t = evt_test_priv(test);

        nb_ports = evt_nr_active_lcores(opt->wlcores);
        nb_queues = rte_eth_dev_count_avail();

        memset(tx_evqueue_id, 0, sizeof(uint8_t) * RTE_MAX_ETHPORTS);
        memset(queue_arr, 0, sizeof(uint8_t) * RTE_EVENT_MAX_QUEUES_PER_DEV);
        /* One queue for Tx adapter per port */
        if (!t->internal_port) {
                RTE_ETH_FOREACH_DEV(prod) {
                        tx_evqueue_id[prod] = nb_queues;
                        nb_queues++;
                }
        }

        rte_event_dev_info_get(opt->dev_id, &info);

        ret = evt_configure_eventdev(opt, nb_queues, nb_ports);
        if (ret) {
                evt_err("failed to configure eventdev %d", opt->dev_id);
                return ret;
        }

        struct rte_event_queue_conf q_conf = {
                .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
                .nb_atomic_flows = opt->nb_flows,
                .nb_atomic_order_sequences = opt->nb_flows,
        };
        /* queue configurations */
        for (queue = 0; queue < nb_queues; queue++) {
                q_conf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ALL_TYPES;

                if (!t->internal_port) {
                        bool tx_q = false;

                        RTE_ETH_FOREACH_DEV(prod) {
                                if (queue == tx_evqueue_id[prod]) {
                                        q_conf.event_queue_cfg =
                                                RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
                                        tx_q = true;
                                        break;
                                }
                        }
                        /* Record each non-Tx queue once as a worker queue */
                        if (!tx_q) {
                                queue_arr[nb_worker_queues] = queue;
                                nb_worker_queues++;
                        }
                }

                ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
                if (ret) {
                        evt_err("failed to setup queue=%d", queue);
                        return ret;
                }
        }

        if (opt->wkr_deq_dep > info.max_event_port_dequeue_depth)
                opt->wkr_deq_dep = info.max_event_port_dequeue_depth;

        /* port configuration */
        const struct rte_event_port_conf p_conf = {
                .dequeue_depth = opt->wkr_deq_dep,
                .enqueue_depth = info.max_event_port_dequeue_depth,
                .new_event_threshold = info.max_num_events,
        };

        if (!t->internal_port)
                ret = pipeline_event_port_setup(test, opt, queue_arr,
                                nb_worker_queues, p_conf);
        else
                ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
                                p_conf);

        if (ret)
                return ret;

        /*
         * The pipelines are set up in the following manner:
         *
         * eth_dev_count = 2, nb_stages = 2, atq mode
         *
         * eth0, eth1 have internal port capability:
         *      queues = 2
         *      stride = 1
         *
         *      event queue pipelines:
         *      eth0 -> q0 -> Tx
         *      eth1 -> q1 -> Tx
         *
         *      q0, q1 are configured as ATQ, so all the different stages can
         *      be enqueued on the same queue.
         *
         * eth0, eth1 use the Tx adapter's service core:
         *      queues = 4
         *      stride = 1
         *
         *      event queue pipelines:
         *      eth0 -> q0 -> q2 -> Tx
         *      eth1 -> q1 -> q3 -> Tx
         *
         *      q0, q1 are configured as stated above.
         *      q2, q3 are configured as SINGLE_LINK.
         */
        ret = pipeline_event_rx_adapter_setup(opt, 1, p_conf);
        if (ret)
                return ret;
        ret = pipeline_event_tx_adapter_setup(opt, p_conf);
        if (ret)
                return ret;

        if (!evt_has_distributed_sched(opt->dev_id)) {
                uint32_t service_id;

                rte_event_dev_service_id_get(opt->dev_id, &service_id);
                ret = evt_service_setup(service_id);
                if (ret) {
                        evt_err("No service lcore found to run event dev.");
                        return ret;
                }
        }

        /* Connect the tx_evqueue_id to the Tx adapter port */
        if (!t->internal_port) {
                RTE_ETH_FOREACH_DEV(prod) {
                        ret = rte_event_eth_tx_adapter_event_port_get(prod,
                                        &tx_evport_id);
                        if (ret) {
                                evt_err("Unable to get Tx adapter[%d]", prod);
                                return ret;
                        }

                        if (rte_event_port_link(opt->dev_id, tx_evport_id,
                                                &tx_evqueue_id[prod],
                                                NULL, 1) != 1) {
                                evt_err("Unable to link Tx adptr[%d] evprt[%d]",
                                                prod, tx_evport_id);
                                return -EINVAL;
                        }
                }
        }

        ret = rte_event_dev_start(opt->dev_id);
        if (ret) {
                evt_err("failed to start eventdev %d", opt->dev_id);
                return ret;
        }

        RTE_ETH_FOREACH_DEV(prod) {
                ret = rte_eth_dev_start(prod);
                if (ret) {
                        evt_err("Ethernet dev [%d] failed to start", prod);
                        return ret;
                }
        }

        RTE_ETH_FOREACH_DEV(prod) {
                ret = rte_event_eth_rx_adapter_start(prod);
                if (ret) {
                        evt_err("Rx adapter[%d] start failed", prod);
                        return ret;
                }

                ret = rte_event_eth_tx_adapter_start(prod);
                if (ret) {
                        evt_err("Tx adapter[%d] start failed", prod);
                        return ret;
                }
        }

        memcpy(t->tx_evqueue_id, tx_evqueue_id, sizeof(uint8_t) *
                        RTE_MAX_ETHPORTS);

        return 0;
}

static void
pipeline_atq_opt_dump(struct evt_options *opt)
{
        pipeline_opt_dump(opt, pipeline_atq_nb_event_queues(opt));
}

static int
pipeline_atq_opt_check(struct evt_options *opt)
{
        return pipeline_opt_check(opt, pipeline_atq_nb_event_queues(opt));
}

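/*
 * Fail early when the device exposes fewer event queues or ports than the
 * test needs.
 */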
static bool
pipeline_atq_capability_check(struct evt_options *opt)
{
        struct rte_event_dev_info dev_info;

        rte_event_dev_info_get(opt->dev_id, &dev_info);
        if (dev_info.max_event_queues < pipeline_atq_nb_event_queues(opt) ||
                        dev_info.max_event_ports <
                                evt_nr_active_lcores(opt->wlcores)) {
                evt_err("not enough eventdev queues=%d/%d or ports=%d/%d",
                        pipeline_atq_nb_event_queues(opt),
                        dev_info.max_event_queues,
                        evt_nr_active_lcores(opt->wlcores),
                        dev_info.max_event_ports);
                return false;
        }

        return true;
}

static const struct evt_test_ops pipeline_atq = {
        .cap_check = pipeline_atq_capability_check,
        .opt_check = pipeline_atq_opt_check,
        .opt_dump = pipeline_atq_opt_dump,
        .test_setup = pipeline_test_setup,
        .mempool_setup = pipeline_mempool_setup,
        .ethdev_setup = pipeline_ethdev_setup,
        .eventdev_setup = pipeline_atq_eventdev_setup,
        .launch_lcores = pipeline_atq_launch_lcores,
        .eventdev_destroy = pipeline_eventdev_destroy,
        .mempool_destroy = pipeline_mempool_destroy,
        .ethdev_destroy = pipeline_ethdev_destroy,
        .test_result = pipeline_test_result,
        .test_destroy = pipeline_test_destroy,
};

EVT_TEST_REGISTER(pipeline_atq);