]> git.proxmox.com Git - ceph.git/blob - ceph/src/spdk/dpdk/app/test-eventdev/test_pipeline_queue.c
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / spdk / dpdk / app / test-eventdev / test_pipeline_queue.c
1 /*
2 * SPDX-License-Identifier: BSD-3-Clause
3 * Copyright 2017 Cavium, Inc.
4 */
5
6 #include "test_pipeline_common.h"
7
8 /* See http://dpdk.org/doc/guides/tools/testeventdev.html for test details */
9
10 static __rte_always_inline int
11 pipeline_queue_nb_event_queues(struct evt_options *opt)
12 {
13 uint16_t eth_count = rte_eth_dev_count_avail();
14
15 return (eth_count * opt->nb_stages) + eth_count;
16 }
17
18 static int
19 pipeline_queue_worker_single_stage_tx(void *arg)
20 {
21 PIPELINE_WROKER_SINGLE_STAGE_INIT;
22
23 while (t->done == false) {
24 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
25
26 if (!event) {
27 rte_pause();
28 continue;
29 }
30
31 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
32 pipeline_tx_pkt(ev.mbuf);
33 w->processed_pkts++;
34 } else {
35 ev.queue_id++;
36 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
37 pipeline_event_enqueue(dev, port, &ev);
38 }
39 }
40
41 return 0;
42 }
43
44 static int
45 pipeline_queue_worker_single_stage_fwd(void *arg)
46 {
47 PIPELINE_WROKER_SINGLE_STAGE_INIT;
48 const uint8_t tx_queue = t->tx_service.queue_id;
49
50 while (t->done == false) {
51 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
52
53 if (!event) {
54 rte_pause();
55 continue;
56 }
57
58 ev.queue_id = tx_queue;
59 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
60 pipeline_event_enqueue(dev, port, &ev);
61 w->processed_pkts++;
62 }
63
64 return 0;
65 }
66
67 static int
68 pipeline_queue_worker_single_stage_burst_tx(void *arg)
69 {
70 PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT;
71
72 while (t->done == false) {
73 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
74 BURST_SIZE, 0);
75
76 if (!nb_rx) {
77 rte_pause();
78 continue;
79 }
80
81 for (i = 0; i < nb_rx; i++) {
82 rte_prefetch0(ev[i + 1].mbuf);
83 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
84
85 pipeline_tx_pkt(ev[i].mbuf);
86 ev[i].op = RTE_EVENT_OP_RELEASE;
87 w->processed_pkts++;
88 } else {
89 ev[i].queue_id++;
90 pipeline_fwd_event(&ev[i],
91 RTE_SCHED_TYPE_ATOMIC);
92 }
93 }
94
95 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
96 }
97
98 return 0;
99 }
100
101 static int
102 pipeline_queue_worker_single_stage_burst_fwd(void *arg)
103 {
104 PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT;
105 const uint8_t tx_queue = t->tx_service.queue_id;
106
107 while (t->done == false) {
108 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
109 BURST_SIZE, 0);
110
111 if (!nb_rx) {
112 rte_pause();
113 continue;
114 }
115
116 for (i = 0; i < nb_rx; i++) {
117 rte_prefetch0(ev[i + 1].mbuf);
118 ev[i].queue_id = tx_queue;
119 pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
120 w->processed_pkts++;
121 }
122
123 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
124 }
125
126 return 0;
127 }
128
129
130 static int
131 pipeline_queue_worker_multi_stage_tx(void *arg)
132 {
133 PIPELINE_WROKER_MULTI_STAGE_INIT;
134 const uint8_t nb_stages = t->opt->nb_stages + 1;
135
136 while (t->done == false) {
137 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
138
139 if (!event) {
140 rte_pause();
141 continue;
142 }
143
144 cq_id = ev.queue_id % nb_stages;
145
146 if (cq_id >= last_queue) {
147 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
148
149 pipeline_tx_pkt(ev.mbuf);
150 w->processed_pkts++;
151 continue;
152 }
153 ev.queue_id += (cq_id == last_queue) ? 1 : 0;
154 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
155 } else {
156 ev.queue_id++;
157 pipeline_fwd_event(&ev, sched_type_list[cq_id]);
158 }
159
160 pipeline_event_enqueue(dev, port, &ev);
161 }
162 return 0;
163 }
164
165 static int
166 pipeline_queue_worker_multi_stage_fwd(void *arg)
167 {
168 PIPELINE_WROKER_MULTI_STAGE_INIT;
169 const uint8_t nb_stages = t->opt->nb_stages + 1;
170 const uint8_t tx_queue = t->tx_service.queue_id;
171
172 while (t->done == false) {
173 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
174
175 if (!event) {
176 rte_pause();
177 continue;
178 }
179
180 cq_id = ev.queue_id % nb_stages;
181
182 if (cq_id == last_queue) {
183 ev.queue_id = tx_queue;
184 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
185 w->processed_pkts++;
186 } else {
187 ev.queue_id++;
188 pipeline_fwd_event(&ev, sched_type_list[cq_id]);
189 }
190
191 pipeline_event_enqueue(dev, port, &ev);
192 }
193 return 0;
194 }
195
196 static int
197 pipeline_queue_worker_multi_stage_burst_tx(void *arg)
198 {
199 PIPELINE_WROKER_MULTI_STAGE_BURST_INIT;
200 const uint8_t nb_stages = t->opt->nb_stages + 1;
201
202 while (t->done == false) {
203 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
204 BURST_SIZE, 0);
205
206 if (!nb_rx) {
207 rte_pause();
208 continue;
209 }
210
211 for (i = 0; i < nb_rx; i++) {
212 rte_prefetch0(ev[i + 1].mbuf);
213 cq_id = ev[i].queue_id % nb_stages;
214
215 if (cq_id >= last_queue) {
216 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
217
218 pipeline_tx_pkt(ev[i].mbuf);
219 ev[i].op = RTE_EVENT_OP_RELEASE;
220 w->processed_pkts++;
221 continue;
222 }
223
224 ev[i].queue_id += (cq_id == last_queue) ? 1 : 0;
225 pipeline_fwd_event(&ev[i],
226 RTE_SCHED_TYPE_ATOMIC);
227 } else {
228 ev[i].queue_id++;
229 pipeline_fwd_event(&ev[i],
230 sched_type_list[cq_id]);
231 }
232
233 }
234
235 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
236 }
237 return 0;
238 }
239
240 static int
241 pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
242 {
243 PIPELINE_WROKER_MULTI_STAGE_BURST_INIT;
244 const uint8_t nb_stages = t->opt->nb_stages + 1;
245 const uint8_t tx_queue = t->tx_service.queue_id;
246
247 while (t->done == false) {
248 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
249 BURST_SIZE, 0);
250
251 if (!nb_rx) {
252 rte_pause();
253 continue;
254 }
255
256 for (i = 0; i < nb_rx; i++) {
257 rte_prefetch0(ev[i + 1].mbuf);
258 cq_id = ev[i].queue_id % nb_stages;
259
260 if (cq_id == last_queue) {
261 ev[i].queue_id = tx_queue;
262 pipeline_fwd_event(&ev[i],
263 RTE_SCHED_TYPE_ATOMIC);
264 w->processed_pkts++;
265 } else {
266 ev[i].queue_id++;
267 pipeline_fwd_event(&ev[i],
268 sched_type_list[cq_id]);
269 }
270 }
271
272 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
273 }
274 return 0;
275 }
276
277 static int
278 worker_wrapper(void *arg)
279 {
280 struct worker_data *w = arg;
281 struct evt_options *opt = w->t->opt;
282 const bool burst = evt_has_burst_mode(w->dev_id);
283 const bool mt_safe = !w->t->mt_unsafe;
284 const uint8_t nb_stages = opt->nb_stages;
285 RTE_SET_USED(opt);
286
287 if (nb_stages == 1) {
288 if (!burst && mt_safe)
289 return pipeline_queue_worker_single_stage_tx(arg);
290 else if (!burst && !mt_safe)
291 return pipeline_queue_worker_single_stage_fwd(arg);
292 else if (burst && mt_safe)
293 return pipeline_queue_worker_single_stage_burst_tx(arg);
294 else if (burst && !mt_safe)
295 return pipeline_queue_worker_single_stage_burst_fwd(
296 arg);
297 } else {
298 if (!burst && mt_safe)
299 return pipeline_queue_worker_multi_stage_tx(arg);
300 else if (!burst && !mt_safe)
301 return pipeline_queue_worker_multi_stage_fwd(arg);
302 else if (burst && mt_safe)
303 return pipeline_queue_worker_multi_stage_burst_tx(arg);
304 else if (burst && !mt_safe)
305 return pipeline_queue_worker_multi_stage_burst_fwd(arg);
306
307 }
308 rte_panic("invalid worker\n");
309 }
310
311 static int
312 pipeline_queue_launch_lcores(struct evt_test *test, struct evt_options *opt)
313 {
314 struct test_pipeline *t = evt_test_priv(test);
315
316 if (t->mt_unsafe)
317 rte_service_component_runstate_set(t->tx_service.service_id, 1);
318 return pipeline_launch_lcores(test, opt, worker_wrapper);
319 }
320
321 static int
322 pipeline_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
323 {
324 int ret;
325 int nb_ports;
326 int nb_queues;
327 int nb_stages = opt->nb_stages;
328 uint8_t queue;
329 struct rte_event_dev_info info;
330 struct test_pipeline *t = evt_test_priv(test);
331 uint8_t tx_evqueue_id = 0;
332 uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
333 uint8_t nb_worker_queues = 0;
334
335 nb_ports = evt_nr_active_lcores(opt->wlcores);
336 nb_queues = rte_eth_dev_count_avail() * (nb_stages);
337
338 /* Extra port for Tx service. */
339 if (t->mt_unsafe) {
340 tx_evqueue_id = nb_queues;
341 nb_ports++;
342 nb_queues++;
343 } else
344 nb_queues += rte_eth_dev_count_avail();
345
346 rte_event_dev_info_get(opt->dev_id, &info);
347
348 const struct rte_event_dev_config config = {
349 .nb_event_queues = nb_queues,
350 .nb_event_ports = nb_ports,
351 .nb_events_limit = info.max_num_events,
352 .nb_event_queue_flows = opt->nb_flows,
353 .nb_event_port_dequeue_depth =
354 info.max_event_port_dequeue_depth,
355 .nb_event_port_enqueue_depth =
356 info.max_event_port_enqueue_depth,
357 };
358 ret = rte_event_dev_configure(opt->dev_id, &config);
359 if (ret) {
360 evt_err("failed to configure eventdev %d", opt->dev_id);
361 return ret;
362 }
363
364 struct rte_event_queue_conf q_conf = {
365 .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
366 .nb_atomic_flows = opt->nb_flows,
367 .nb_atomic_order_sequences = opt->nb_flows,
368 };
369 /* queue configurations */
370 for (queue = 0; queue < nb_queues; queue++) {
371 uint8_t slot;
372
373 if (!t->mt_unsafe) {
374 slot = queue % (nb_stages + 1);
375 q_conf.schedule_type = slot == nb_stages ?
376 RTE_SCHED_TYPE_ATOMIC :
377 opt->sched_type_list[slot];
378 } else {
379 slot = queue % nb_stages;
380
381 if (queue == tx_evqueue_id) {
382 q_conf.schedule_type = RTE_SCHED_TYPE_ATOMIC;
383 q_conf.event_queue_cfg =
384 RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
385 } else {
386 q_conf.schedule_type =
387 opt->sched_type_list[slot];
388 queue_arr[nb_worker_queues] = queue;
389 nb_worker_queues++;
390 }
391 }
392
393 ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
394 if (ret) {
395 evt_err("failed to setup queue=%d", queue);
396 return ret;
397 }
398 }
399
400 if (opt->wkr_deq_dep > info.max_event_port_dequeue_depth)
401 opt->wkr_deq_dep = info.max_event_port_dequeue_depth;
402
403 /* port configuration */
404 const struct rte_event_port_conf p_conf = {
405 .dequeue_depth = opt->wkr_deq_dep,
406 .enqueue_depth = info.max_event_port_dequeue_depth,
407 .new_event_threshold = info.max_num_events,
408 };
409
410 /*
411 * If tx is multi thread safe then allow workers to do Tx else use Tx
412 * service to Tx packets.
413 */
414 if (t->mt_unsafe) {
415 ret = pipeline_event_port_setup(test, opt, queue_arr,
416 nb_worker_queues, p_conf);
417 if (ret)
418 return ret;
419
420 ret = pipeline_event_tx_service_setup(test, opt, tx_evqueue_id,
421 nb_ports - 1, p_conf);
422
423 } else
424 ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
425 p_conf);
426
427 if (ret)
428 return ret;
429 /*
430 * The pipelines are setup in the following manner:
431 *
432 * eth_dev_count = 2, nb_stages = 2.
433 *
434 * Multi thread safe :
435 * queues = 6
436 * stride = 3
437 *
438 * event queue pipelines:
439 * eth0 -> q0 -> q1 -> (q2->tx)
440 * eth1 -> q3 -> q4 -> (q5->tx)
441 *
442 * q2, q5 configured as ATOMIC
443 *
444 * Multi thread unsafe :
445 * queues = 5
446 * stride = 2
447 *
448 * event queue pipelines:
449 * eth0 -> q0 -> q1
450 * } (q4->tx) Tx service
451 * eth1 -> q2 -> q3
452 *
453 * q4 configured as SINGLE_LINK|ATOMIC
454 */
455 ret = pipeline_event_rx_adapter_setup(opt,
456 t->mt_unsafe ? nb_stages : nb_stages + 1, p_conf);
457 if (ret)
458 return ret;
459
460 if (!evt_has_distributed_sched(opt->dev_id)) {
461 uint32_t service_id;
462 rte_event_dev_service_id_get(opt->dev_id, &service_id);
463 ret = evt_service_setup(service_id);
464 if (ret) {
465 evt_err("No service lcore found to run event dev.");
466 return ret;
467 }
468 }
469
470 ret = rte_event_dev_start(opt->dev_id);
471 if (ret) {
472 evt_err("failed to start eventdev %d", opt->dev_id);
473 return ret;
474 }
475
476 return 0;
477 }
478
479 static void
480 pipeline_queue_opt_dump(struct evt_options *opt)
481 {
482 pipeline_opt_dump(opt, pipeline_queue_nb_event_queues(opt));
483 }
484
485 static int
486 pipeline_queue_opt_check(struct evt_options *opt)
487 {
488 return pipeline_opt_check(opt, pipeline_queue_nb_event_queues(opt));
489 }
490
491 static bool
492 pipeline_queue_capability_check(struct evt_options *opt)
493 {
494 struct rte_event_dev_info dev_info;
495
496 rte_event_dev_info_get(opt->dev_id, &dev_info);
497 if (dev_info.max_event_queues < pipeline_queue_nb_event_queues(opt) ||
498 dev_info.max_event_ports <
499 evt_nr_active_lcores(opt->wlcores)) {
500 evt_err("not enough eventdev queues=%d/%d or ports=%d/%d",
501 pipeline_queue_nb_event_queues(opt),
502 dev_info.max_event_queues,
503 evt_nr_active_lcores(opt->wlcores),
504 dev_info.max_event_ports);
505 }
506
507 return true;
508 }
509
510 static const struct evt_test_ops pipeline_queue = {
511 .cap_check = pipeline_queue_capability_check,
512 .opt_check = pipeline_queue_opt_check,
513 .opt_dump = pipeline_queue_opt_dump,
514 .test_setup = pipeline_test_setup,
515 .mempool_setup = pipeline_mempool_setup,
516 .ethdev_setup = pipeline_ethdev_setup,
517 .eventdev_setup = pipeline_queue_eventdev_setup,
518 .launch_lcores = pipeline_queue_launch_lcores,
519 .eventdev_destroy = pipeline_eventdev_destroy,
520 .mempool_destroy = pipeline_mempool_destroy,
521 .ethdev_destroy = pipeline_ethdev_destroy,
522 .test_result = pipeline_test_result,
523 .test_destroy = pipeline_test_destroy,
524 };
525
526 EVT_TEST_REGISTER(pipeline_queue);