/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2017 Intel Corporation
 */

#include <getopt.h>
#include <stdint.h>
#include <stdio.h>
#include <signal.h>
#include <sched.h>

#include "pipeline_common.h"

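/*
 * Application-wide configuration. The values below are the defaults; they
 * are overridden from the command line in parse_app_args().
 */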
struct config_data cdata = {
    .num_packets = (1L << 25), /* do ~32M packets */
    .num_fids = 512,
    .queue_type = RTE_SCHED_TYPE_ATOMIC,
    .next_qid = {-1},
    .qid = {-1},
    .num_stages = 1,
    .worker_cq_depth = 16
};

static bool
core_in_use(unsigned int lcore_id) {
    return (fdata->rx_core[lcore_id] || fdata->sched_core[lcore_id] ||
            fdata->tx_core[lcore_id] || fdata->worker_core[lcore_id]);
}

/*
 * Parse the coremask given as argument (hexadecimal string) and fill
 * the global configuration (core role and core count) with the parsed
 * value.
 */
static int xdigit2val(unsigned char c)
{
    int val;

    if (isdigit(c))
        val = c - '0';
    else if (isupper(c))
        val = c - 'A' + 10;
    else
        val = c - 'a' + 10;
    return val;
}

static uint64_t
parse_coremask(const char *coremask)
{
    int i, j, idx = 0;
    unsigned int count = 0;
    char c;
    int val;
    uint64_t mask = 0;
    const int32_t BITS_HEX = 4;

    if (coremask == NULL)
        return -1;
    /* Remove all blank characters ahead and after.
     * Remove 0x/0X if exists.
     */
    while (isblank(*coremask))
        coremask++;
    if (coremask[0] == '0' && ((coremask[1] == 'x')
            || (coremask[1] == 'X')))
        coremask += 2;
    i = strlen(coremask);
    while ((i > 0) && isblank(coremask[i - 1]))
        i--;
    if (i == 0)
        return -1;

    for (i = i - 1; i >= 0 && idx < MAX_NUM_CORE; i--) {
        c = coremask[i];
        if (isxdigit(c) == 0) {
            /* invalid characters */
            return -1;
        }
        val = xdigit2val(c);
        for (j = 0; j < BITS_HEX && idx < MAX_NUM_CORE; j++, idx++) {
            if ((1 << j) & val) {
                mask |= (1UL << idx);
                count++;
            }
        }
    }
    for (; i >= 0; i--)
        if (coremask[i] != '0')
            return -1;
    if (count == 0)
        return -1;
    return mask;
}
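/*
 * Example: parse_coremask("0xA0") sets bits 5 and 7 and returns 0xa0,
 * i.e. lcores 5 and 7 are selected. An empty or malformed mask yields
 * (uint64_t)-1.
 */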

static struct option long_options[] = {
    {"workers", required_argument, 0, 'w'},
    {"packets", required_argument, 0, 'n'},
    {"atomic-flows", required_argument, 0, 'f'},
    {"num_stages", required_argument, 0, 's'},
    {"rx-mask", required_argument, 0, 'r'},
    {"tx-mask", required_argument, 0, 't'},
    {"sched-mask", required_argument, 0, 'e'},
    {"cq-depth", required_argument, 0, 'c'},
    {"work-cycles", required_argument, 0, 'W'},
    {"mempool-size", required_argument, 0, 'm'},
    {"queue-priority", no_argument, 0, 'P'},
    {"parallel", no_argument, 0, 'p'},
    {"ordered", no_argument, 0, 'o'},
    {"quiet", no_argument, 0, 'q'},
    {"use-atq", no_argument, 0, 'a'},
    {"dump", no_argument, 0, 'D'},
    {0, 0, 0, 0}
};
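/*
 * Keep this table in sync with the short-option string passed to
 * getopt_long() in parse_app_args().
 */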

static void
usage(void)
{
    const char *usage_str =
        "  Usage: eventdev_pipeline [options]\n"
        "  Options:\n"
        "  -n, --packets=N              Send N packets (default ~32M), 0 implies no limit\n"
        "  -f, --atomic-flows=N         Use N random flows from 1 to N (default 512)\n"
        "  -s, --num_stages=N           Use N atomic stages (default 1)\n"
        "  -r, --rx-mask=core mask      Run NIC rx on CPUs in core mask\n"
        "  -w, --workers=core mask      Run worker on CPUs in core mask\n"
        "  -t, --tx-mask=core mask      Run NIC tx on CPUs in core mask\n"
        "  -e, --sched-mask=core mask   Run scheduler on CPUs in core mask\n"
        "  -c, --cq-depth=N             Worker CQ depth (default 16)\n"
        "  -W, --work-cycles=N          Worker cycles (default 0)\n"
        "  -P, --queue-priority         Enable scheduler queue prioritization\n"
        "  -o, --ordered                Use ordered scheduling\n"
        "  -p, --parallel               Use parallel scheduling\n"
        "  -q, --quiet                  Minimize printed output\n"
        "  -a, --use-atq                Use all type queues\n"
        "  -m, --mempool-size=N         Dictate the mempool size\n"
        "  -D, --dump                   Print detailed statistics before exit"
        "\n";
    fprintf(stderr, "%s", usage_str);
    exit(1);
}
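/*
 * Illustrative invocation (the --vdev name assumes the software eventdev
 * PMD is available; core IDs are arbitrary): run Rx on core 0, Tx on core 1,
 * the scheduler on core 2 and four workers on cores 4-7, with 3 stages:
 *
 *   ./eventdev_pipeline --vdev event_sw0 -- -r 0x1 -t 0x2 -e 0x4 -w 0xf0 -s 3
 */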

static void
parse_app_args(int argc, char **argv)
{
    /* Parse cli options */
    int option_index;
    int c;
    opterr = 0;
    uint64_t rx_lcore_mask = 0;
    uint64_t tx_lcore_mask = 0;
    uint64_t sched_lcore_mask = 0;
    uint64_t worker_lcore_mask = 0;
    int i;

    for (;;) {
        c = getopt_long(argc, argv, "r:t:e:c:w:n:f:s:m:paoPqDW:",
                long_options, &option_index);
        if (c == -1)
            break;

        int popcnt = 0;
        switch (c) {
        case 'n':
            cdata.num_packets = (int64_t)atol(optarg);
            if (cdata.num_packets == 0)
                cdata.num_packets = INT64_MAX;
            break;
        case 'f':
            cdata.num_fids = (unsigned int)atoi(optarg);
            break;
        case 's':
            cdata.num_stages = (unsigned int)atoi(optarg);
            break;
        case 'c':
            cdata.worker_cq_depth = (unsigned int)atoi(optarg);
            break;
        case 'W':
            cdata.worker_cycles = (unsigned int)atoi(optarg);
            break;
        case 'P':
            cdata.enable_queue_priorities = 1;
            break;
        case 'o':
            cdata.queue_type = RTE_SCHED_TYPE_ORDERED;
            break;
        case 'p':
            cdata.queue_type = RTE_SCHED_TYPE_PARALLEL;
            break;
        case 'a':
            cdata.all_type_queues = 1;
            break;
        case 'q':
            cdata.quiet = 1;
            break;
        case 'D':
            cdata.dump_dev = 1;
            break;
        case 'w':
            worker_lcore_mask = parse_coremask(optarg);
            break;
        case 'r':
            rx_lcore_mask = parse_coremask(optarg);
            popcnt = __builtin_popcountll(rx_lcore_mask);
            fdata->rx_single = (popcnt == 1);
            break;
        case 't':
            tx_lcore_mask = parse_coremask(optarg);
            popcnt = __builtin_popcountll(tx_lcore_mask);
            fdata->tx_single = (popcnt == 1);
            break;
        case 'e':
            sched_lcore_mask = parse_coremask(optarg);
            popcnt = __builtin_popcountll(sched_lcore_mask);
            fdata->sched_single = (popcnt == 1);
            break;
        case 'm':
            cdata.num_mbuf = (uint64_t)atol(optarg);
            break;
        default:
            usage();
        }
    }

    cdata.worker_lcore_mask = worker_lcore_mask;
    cdata.sched_lcore_mask = sched_lcore_mask;
    cdata.rx_lcore_mask = rx_lcore_mask;
    cdata.tx_lcore_mask = tx_lcore_mask;

    if (cdata.num_stages == 0 || cdata.num_stages > MAX_NUM_STAGES)
        usage();

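    /* Expand each role coremask into per-lcore flags and count workers and active cores. */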
    for (i = 0; i < MAX_NUM_CORE; i++) {
        fdata->rx_core[i] = !!(rx_lcore_mask & (1UL << i));
        fdata->tx_core[i] = !!(tx_lcore_mask & (1UL << i));
        fdata->sched_core[i] = !!(sched_lcore_mask & (1UL << i));
        fdata->worker_core[i] = !!(worker_lcore_mask & (1UL << i));

        if (fdata->worker_core[i])
            cdata.num_workers++;
        if (core_in_use(i))
            cdata.active_cores++;
    }
}

/*
 * Initializes a given port using global settings and with the RX buffers
 * coming from the mbuf_pool passed as a parameter.
 */
static inline int
port_init(uint8_t port, struct rte_mempool *mbuf_pool)
{
    struct rte_eth_rxconf rx_conf;
    static const struct rte_eth_conf port_conf_default = {
        .rxmode = {
            .mq_mode = ETH_MQ_RX_RSS,
            .max_rx_pkt_len = ETHER_MAX_LEN,
        },
        .rx_adv_conf = {
            .rss_conf = {
                .rss_hf = ETH_RSS_IP |
                          ETH_RSS_TCP |
                          ETH_RSS_UDP,
            }
        }
    };
    const uint16_t rx_rings = 1, tx_rings = 1;
    const uint16_t rx_ring_size = 512, tx_ring_size = 512;
    struct rte_eth_conf port_conf = port_conf_default;
    int retval;
    uint16_t q;
    struct rte_eth_dev_info dev_info;
    struct rte_eth_txconf txconf;

    if (!rte_eth_dev_is_valid_port(port))
        return -1;

    rte_eth_dev_info_get(port, &dev_info);
    if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
        port_conf.txmode.offloads |=
            DEV_TX_OFFLOAD_MBUF_FAST_FREE;
    rx_conf = dev_info.default_rxconf;
    rx_conf.offloads = port_conf.rxmode.offloads;

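    /* Limit the requested RSS hash types to what the device supports and report any change. */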
    port_conf.rx_adv_conf.rss_conf.rss_hf &=
        dev_info.flow_type_rss_offloads;
    if (port_conf.rx_adv_conf.rss_conf.rss_hf !=
            port_conf_default.rx_adv_conf.rss_conf.rss_hf) {
        printf("Port %u modified RSS hash function based on hardware support,"
               "requested:%#"PRIx64" configured:%#"PRIx64"\n",
               port,
               port_conf_default.rx_adv_conf.rss_conf.rss_hf,
               port_conf.rx_adv_conf.rss_conf.rss_hf);
    }

    /* Configure the Ethernet device. */
    retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
    if (retval != 0)
        return retval;

    /* Allocate and set up 1 RX queue per Ethernet port. */
    for (q = 0; q < rx_rings; q++) {
        retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
                rte_eth_dev_socket_id(port), &rx_conf,
                mbuf_pool);
        if (retval < 0)
            return retval;
    }

    txconf = dev_info.default_txconf;
    txconf.offloads = port_conf_default.txmode.offloads;
    /* Allocate and set up 1 TX queue per Ethernet port. */
    for (q = 0; q < tx_rings; q++) {
        retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
                rte_eth_dev_socket_id(port), &txconf);
        if (retval < 0)
            return retval;
    }

    /* Display the port MAC address. */
    struct ether_addr addr;
    rte_eth_macaddr_get(port, &addr);
    printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
           " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
           (unsigned int)port,
           addr.addr_bytes[0], addr.addr_bytes[1],
           addr.addr_bytes[2], addr.addr_bytes[3],
           addr.addr_bytes[4], addr.addr_bytes[5]);

    /* Enable RX in promiscuous mode for the Ethernet device. */
    rte_eth_promiscuous_enable(port);

    return 0;
}

static int
init_ports(uint16_t num_ports)
{
    uint16_t portid;

    if (!cdata.num_mbuf)
        cdata.num_mbuf = 16384 * num_ports;

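    /* A single packet mempool shared by all ports: 16384 mbufs per port unless overridden with --mempool-size. */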
    struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool",
            /* mbufs */ cdata.num_mbuf,
            /* cache_size */ 512,
            /* priv_size */ 0,
            /* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE,
            rte_socket_id());

    RTE_ETH_FOREACH_DEV(portid)
        if (port_init(portid, mp) != 0)
            rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n",
                    portid);

    return 0;
}

static void
do_capability_setup(uint8_t eventdev_id)
{
    int ret;
    uint16_t i;
    uint8_t generic_pipeline = 0;
    uint8_t burst = 0;

    RTE_ETH_FOREACH_DEV(i) {
        uint32_t caps = 0;

        ret = rte_event_eth_tx_adapter_caps_get(eventdev_id, i, &caps);
        if (ret)
            rte_exit(EXIT_FAILURE,
                "Invalid capability for Tx adptr port %d\n", i);
        generic_pipeline |= !(caps &
                RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT);
    }

    struct rte_event_dev_info eventdev_info;
    memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));

    rte_event_dev_info_get(eventdev_id, &eventdev_info);
    burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 :
        0;

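    /*
     * If any Ethernet port lacks the Tx adapter INTERNAL_PORT capability,
     * fall back to the generic worker pipeline; otherwise use the setup in
     * which workers enqueue packets to the Tx adapter directly.
     */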
    if (generic_pipeline)
        set_worker_generic_setup_data(&fdata->cap, burst);
    else
        set_worker_tx_enq_setup_data(&fdata->cap, burst);
}

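/*
 * First SIGINT/SIGTERM: optionally dump eventdev state, then stop the Rx/Tx
 * adapters, the Ethernet ports and the eventdev, and mark the fastpath as
 * done. Any further signal once shutdown has started exits immediately.
 * SIGTSTP only dumps eventdev statistics.
 */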
static void
signal_handler(int signum)
{
    static uint8_t once;
    uint16_t portid;

    if (fdata->done)
        rte_exit(1, "Exiting on signal %d\n", signum);
    if ((signum == SIGINT || signum == SIGTERM) && !once) {
        printf("\n\nSignal %d received, preparing to exit...\n",
                signum);
        if (cdata.dump_dev)
            rte_event_dev_dump(0, stdout);
        once = 1;
        fdata->done = 1;
        rte_smp_wmb();

        RTE_ETH_FOREACH_DEV(portid) {
            rte_event_eth_rx_adapter_stop(portid);
            rte_event_eth_tx_adapter_stop(portid);
            rte_eth_dev_stop(portid);
        }

        rte_eal_mp_wait_lcore();

        RTE_ETH_FOREACH_DEV(portid) {
            rte_eth_dev_close(portid);
        }

        rte_event_dev_stop(0);
        rte_event_dev_close(0);
    }
    if (signum == SIGTSTP)
        rte_event_dev_dump(0, stdout);
}

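/*
 * Read the "port_<p>_rx" xstat for an event port. main() only prints the
 * per-worker workload distribution when the result is not (uint64_t)-ENOTSUP.
 */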
static inline uint64_t
port_stat(int dev_id, int32_t p)
{
    char statname[64];
    snprintf(statname, sizeof(statname), "port_%u_rx", p);
    return rte_event_dev_xstats_by_name_get(dev_id, statname, NULL);
}

int
main(int argc, char **argv)
{
    struct worker_data *worker_data;
    uint16_t num_ports;
    uint16_t portid;
    int lcore_id;
    int err;

    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);
    signal(SIGTSTP, signal_handler);

    err = rte_eal_init(argc, argv);
    if (err < 0)
        rte_panic("Invalid EAL arguments\n");

    argc -= err;
    argv += err;

    fdata = rte_malloc(NULL, sizeof(struct fastpath_data), 0);
    if (fdata == NULL)
        rte_panic("Out of memory\n");

    /* Parse cli options */
    parse_app_args(argc, argv);

    num_ports = rte_eth_dev_count_avail();
    if (num_ports == 0)
        rte_panic("No ethernet ports found\n");

    const unsigned int cores_needed = cdata.active_cores;

    if (!cdata.quiet) {
        printf(" Config:\n");
        printf("\tports: %u\n", num_ports);
        printf("\tworkers: %u\n", cdata.num_workers);
        printf("\tpackets: %"PRIi64"\n", cdata.num_packets);
        printf("\tQueue-prio: %u\n", cdata.enable_queue_priorities);
        if (cdata.queue_type == RTE_SCHED_TYPE_ORDERED)
            printf("\tqid0 type: ordered\n");
        if (cdata.queue_type == RTE_SCHED_TYPE_ATOMIC)
            printf("\tqid0 type: atomic\n");
        printf("\tCores available: %u\n", rte_lcore_count());
        printf("\tCores used: %u\n", cores_needed);
    }

    if (rte_lcore_count() < cores_needed)
        rte_panic("Too few cores (%d < %d)\n", rte_lcore_count(),
                cores_needed);

    const unsigned int ndevs = rte_event_dev_count();
    if (ndevs == 0)
        rte_panic("No dev_id devs found. Pass in a --vdev eventdev.\n");
    if (ndevs > 1)
        fprintf(stderr, "Warning: More than one eventdev, using idx 0\n");

    do_capability_setup(0);
    fdata->cap.check_opt();

    worker_data = rte_calloc(0, cdata.num_workers,
            sizeof(worker_data[0]), 0);
    if (worker_data == NULL)
        rte_panic("rte_calloc failed\n");

    int dev_id = fdata->cap.evdev_setup(worker_data);
    if (dev_id < 0)
        rte_exit(EXIT_FAILURE, "Error setting up eventdev\n");

    init_ports(num_ports);
    fdata->cap.adptr_setup(num_ports);

    /* Start the Ethernet port. */
    RTE_ETH_FOREACH_DEV(portid) {
        err = rte_eth_dev_start(portid);
        if (err < 0)
            rte_exit(EXIT_FAILURE, "Error starting ethdev %d\n",
                    portid);
    }

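    /*
     * Launch the pipeline function on every in-use slave lcore. worker_idx
     * only advances for worker cores, so each worker thread gets its own
     * entry (and eventdev port) in worker_data[].
     */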
    int worker_idx = 0;
    RTE_LCORE_FOREACH_SLAVE(lcore_id) {
        if (lcore_id >= MAX_NUM_CORE)
            break;

        if (!fdata->rx_core[lcore_id] &&
                !fdata->worker_core[lcore_id] &&
                !fdata->tx_core[lcore_id] &&
                !fdata->sched_core[lcore_id])
            continue;

        if (fdata->rx_core[lcore_id])
            printf(
                "[%s()] lcore %d executing NIC Rx\n",
                __func__, lcore_id);

        if (fdata->tx_core[lcore_id])
            printf(
                "[%s()] lcore %d executing NIC Tx\n",
                __func__, lcore_id);

        if (fdata->sched_core[lcore_id])
            printf("[%s()] lcore %d executing scheduler\n",
                    __func__, lcore_id);

        if (fdata->worker_core[lcore_id])
            printf(
                "[%s()] lcore %d executing worker, using eventdev port %u\n",
                __func__, lcore_id,
                worker_data[worker_idx].port_id);

        err = rte_eal_remote_launch(fdata->cap.worker,
                &worker_data[worker_idx], lcore_id);
        if (err) {
            rte_panic("Failed to launch worker on core %d\n",
                    lcore_id);
            continue;
        }
        if (fdata->worker_core[lcore_id])
            worker_idx++;
    }

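    /* The master lcore also runs the pipeline function if it was assigned any role. */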
    lcore_id = rte_lcore_id();

    if (core_in_use(lcore_id))
        fdata->cap.worker(&worker_data[worker_idx++]);

    rte_eal_mp_wait_lcore();

    if (!cdata.quiet && (port_stat(dev_id, worker_data[0].port_id) !=
            (uint64_t)-ENOTSUP)) {
        printf("\nPort Workload distribution:\n");
        uint32_t i;
        uint64_t tot_pkts = 0;
        uint64_t pkts_per_wkr[RTE_MAX_LCORE] = {0};
        for (i = 0; i < cdata.num_workers; i++) {
            pkts_per_wkr[i] =
                port_stat(dev_id, worker_data[i].port_id);
            tot_pkts += pkts_per_wkr[i];
        }
        for (i = 0; i < cdata.num_workers; i++) {
            float pc = pkts_per_wkr[i] * 100 /
                ((float)tot_pkts);
            printf("worker %i :\t%.1f %% (%"PRIu64" pkts)\n",
                    i, pc, pkts_per_wkr[i]);
        }
    }

    return 0;
}