lib/netdev-afxdp.c
netdev-afxdp: Convert AFXDP_DEBUG to custom stats.
/*
 * Copyright (c) 2018, 2019 Nicira, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <config.h>

#include "netdev-linux-private.h"
#include "netdev-linux.h"
#include "netdev-afxdp.h"
#include "netdev-afxdp-pool.h"

#include <errno.h>
#include <inttypes.h>
#include <linux/rtnetlink.h>
#include <linux/if_xdp.h>
#include <net/if.h>
#include <stdlib.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include "coverage.h"
#include "dp-packet.h"
#include "dpif-netdev.h"
#include "fatal-signal.h"
#include "openvswitch/compiler.h"
#include "openvswitch/dynamic-string.h"
#include "openvswitch/list.h"
#include "openvswitch/vlog.h"
#include "packets.h"
#include "socket-util.h"
#include "util.h"

#ifndef SOL_XDP
#define SOL_XDP 283
#endif

COVERAGE_DEFINE(afxdp_cq_empty);
COVERAGE_DEFINE(afxdp_fq_full);
COVERAGE_DEFINE(afxdp_tx_full);
COVERAGE_DEFINE(afxdp_cq_skip);

VLOG_DEFINE_THIS_MODULE(netdev_afxdp);

static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 20);

#define MAX_XSKQ 16
#define FRAME_HEADROOM XDP_PACKET_HEADROOM
#define OVS_XDP_HEADROOM 128
#define FRAME_SIZE XSK_UMEM__DEFAULT_FRAME_SIZE
#define FRAME_SHIFT XSK_UMEM__DEFAULT_FRAME_SHIFT
#define FRAME_SHIFT_MASK ((1 << FRAME_SHIFT) - 1)

#define PROD_NUM_DESCS XSK_RING_PROD__DEFAULT_NUM_DESCS
#define CONS_NUM_DESCS XSK_RING_CONS__DEFAULT_NUM_DESCS

/* The worst case is that all 4 rings (TX/CQ/RX/FILL) are full and some
 * packets are still being processed in threads.  The number of packets
 * currently being processed by OVS is hard to estimate because it depends
 * on the number of ports.  Setting NUM_FRAMES to twice the total of the
 * ring sizes should be enough for most corner cases.
 */
#define NUM_FRAMES (4 * (PROD_NUM_DESCS + CONS_NUM_DESCS))
#define BATCH_SIZE NETDEV_MAX_BURST

BUILD_ASSERT_DECL(IS_POW2(NUM_FRAMES));
BUILD_ASSERT_DECL(PROD_NUM_DESCS == CONS_NUM_DESCS);

#define UMEM2DESC(elem, base) ((uint64_t)((char *)elem - (char *)base))

static struct xsk_socket_info *xsk_configure(int ifindex, int xdp_queue_id,
                                             int mode);
static void xsk_remove_xdp_program(uint32_t ifindex, int xdpmode);
static void xsk_destroy(struct xsk_socket_info *xsk);
static int xsk_configure_all(struct netdev *netdev);
static void xsk_destroy_all(struct netdev *netdev);

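/* A umem pool whose AF_XDP socket has been destroyed, but whose frames may
 * still be referenced by the kernel rings or by in-flight packets.  Such
 * pools are parked on the 'unused_pools' list and reclaimed later by
 * netdev_afxdp_sweep_unused_pools(), once every frame that is not stuck in
 * the rings has been returned to the pool. */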
struct unused_pool {
    struct xsk_umem_info *umem_info;
    int lost_in_rings; /* Number of packets left in tx, rx, cq and fq. */
    struct ovs_list list_node;
};

static struct ovs_mutex unused_pools_mutex = OVS_MUTEX_INITIALIZER;
static struct ovs_list unused_pools OVS_GUARDED_BY(unused_pools_mutex) =
    OVS_LIST_INITIALIZER(&unused_pools);

struct xsk_umem_info {
    struct umem_pool mpool;
    struct xpacket_pool xpool;
    struct xsk_ring_prod fq;
    struct xsk_ring_cons cq;
    struct xsk_umem *umem;
    void *buffer;
};

struct xsk_socket_info {
    struct xsk_ring_cons rx;
    struct xsk_ring_prod tx;
    struct xsk_umem_info *umem;
    struct xsk_socket *xsk;
    uint32_t outstanding_tx; /* Number of descriptors filled in tx and cq. */
    uint32_t available_rx; /* Number of descriptors filled in rx and fq. */
    atomic_uint64_t tx_dropped;
};

static void
netdev_afxdp_cleanup_unused_pool(struct unused_pool *pool)
{
    /* Free the packet buffer. */
    free_pagealign(pool->umem_info->buffer);

    /* Cleanup umem pool. */
    umem_pool_cleanup(&pool->umem_info->mpool);

    /* Cleanup metadata pool. */
    xpacket_pool_cleanup(&pool->umem_info->xpool);

    free(pool->umem_info);
}

static void
netdev_afxdp_sweep_unused_pools(void *aux OVS_UNUSED)
{
    struct unused_pool *pool, *next;
    unsigned int count;

    ovs_mutex_lock(&unused_pools_mutex);
    LIST_FOR_EACH_SAFE (pool, next, list_node, &unused_pools) {

        count = umem_pool_count(&pool->umem_info->mpool);
        ovs_assert(count + pool->lost_in_rings <= NUM_FRAMES);

        if (count + pool->lost_in_rings == NUM_FRAMES) {
            /* OVS no longer uses this memory pool, and the kernel stopped
             * using it when the xdp socket was closed, so it's safe to
             * free the pool now. */
            VLOG_DBG("Freeing umem pool at 0x%"PRIxPTR,
                     (uintptr_t) pool->umem_info);
            ovs_list_remove(&pool->list_node);
            netdev_afxdp_cleanup_unused_pool(pool);
            free(pool);
        }
    }
    ovs_mutex_unlock(&unused_pools_mutex);
}

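/* Registers 'buffer' (of 'size' bytes) with the kernel as an AF_XDP umem and
 * initializes the OVS-side pools that manage it: 'mpool' holds the free umem
 * frames and 'xpool' holds the pre-allocated dp_packet metadata, one entry
 * per frame.  Returns NULL on failure. */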
static struct xsk_umem_info *
xsk_configure_umem(void *buffer, uint64_t size, int xdpmode)
{
    struct xsk_umem_config uconfig;
    struct xsk_umem_info *umem;
    int ret;
    int i;

    umem = xzalloc(sizeof *umem);

    uconfig.fill_size = PROD_NUM_DESCS;
    uconfig.comp_size = CONS_NUM_DESCS;
    uconfig.frame_size = FRAME_SIZE;
    uconfig.frame_headroom = OVS_XDP_HEADROOM;

    ret = xsk_umem__create(&umem->umem, buffer, size, &umem->fq, &umem->cq,
                           &uconfig);
    if (ret) {
        VLOG_ERR("xsk_umem__create failed (%s) mode: %s",
                 ovs_strerror(errno),
                 xdpmode == XDP_COPY ? "SKB": "DRV");
        free(umem);
        return NULL;
    }

    umem->buffer = buffer;

    /* Set-up umem pool. */
    if (umem_pool_init(&umem->mpool, NUM_FRAMES) < 0) {
        VLOG_ERR("umem_pool_init failed");
        if (xsk_umem__delete(umem->umem)) {
            VLOG_ERR("xsk_umem__delete failed");
        }
        free(umem);
        return NULL;
    }

    for (i = NUM_FRAMES - 1; i >= 0; i--) {
        void *elem;

        elem = ALIGNED_CAST(void *, (char *)umem->buffer + i * FRAME_SIZE);
        umem_elem_push(&umem->mpool, elem);
    }

    /* Set-up metadata. */
    if (xpacket_pool_init(&umem->xpool, NUM_FRAMES) < 0) {
        VLOG_ERR("xpacket_pool_init failed");
        umem_pool_cleanup(&umem->mpool);
        if (xsk_umem__delete(umem->umem)) {
            VLOG_ERR("xsk_umem__delete failed");
        }
        free(umem);
        return NULL;
    }

    VLOG_DBG("%s: xpacket pool from %p to %p", __func__,
             umem->xpool.array,
             (char *)umem->xpool.array +
             NUM_FRAMES * sizeof(struct dp_packet_afxdp));

    for (i = NUM_FRAMES - 1; i >= 0; i--) {
        struct dp_packet_afxdp *xpacket;
        struct dp_packet *packet;

        xpacket = &umem->xpool.array[i];
        xpacket->mpool = &umem->mpool;

        packet = &xpacket->packet;
        packet->source = DPBUF_AFXDP;
    }

    return umem;
}

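/* Creates an AF_XDP socket on top of 'umem', bound to queue 'queue_id' of the
 * device with 'ifindex' in the requested 'xdpmode' (copy or zero-copy),
 * checks that an XDP program is attached, and pre-populates the FILL ring
 * with PROD_NUM_DESCS frames so the kernel can start receiving.  Returns NULL
 * on failure. */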
static struct xsk_socket_info *
xsk_configure_socket(struct xsk_umem_info *umem, uint32_t ifindex,
                     uint32_t queue_id, int xdpmode)
{
    struct xsk_socket_config cfg;
    struct xsk_socket_info *xsk;
    char devname[IF_NAMESIZE];
    uint32_t idx = 0, prog_id;
    int ret;
    int i;

    xsk = xzalloc(sizeof *xsk);
    xsk->umem = umem;
    cfg.rx_size = CONS_NUM_DESCS;
    cfg.tx_size = PROD_NUM_DESCS;
    cfg.libbpf_flags = 0;

    if (xdpmode == XDP_ZEROCOPY) {
        cfg.bind_flags = XDP_ZEROCOPY;
        cfg.xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST | XDP_FLAGS_DRV_MODE;
    } else {
        cfg.bind_flags = XDP_COPY;
        cfg.xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST | XDP_FLAGS_SKB_MODE;
    }

    if (if_indextoname(ifindex, devname) == NULL) {
        VLOG_ERR("ifindex %d to devname failed (%s)",
                 ifindex, ovs_strerror(errno));
        free(xsk);
        return NULL;
    }

    ret = xsk_socket__create(&xsk->xsk, devname, queue_id, umem->umem,
                             &xsk->rx, &xsk->tx, &cfg);
    if (ret) {
        VLOG_ERR("xsk_socket__create failed (%s) mode: %s qid: %d",
                 ovs_strerror(errno),
                 xdpmode == XDP_COPY ? "SKB": "DRV",
                 queue_id);
        free(xsk);
        return NULL;
    }

    /* Make sure the built-in AF_XDP program is loaded. */
    ret = bpf_get_link_xdp_id(ifindex, &prog_id, cfg.xdp_flags);
    if (ret) {
        VLOG_ERR("Get XDP prog ID failed (%s)", ovs_strerror(errno));
        xsk_socket__delete(xsk->xsk);
        free(xsk);
        return NULL;
    }

    while (!xsk_ring_prod__reserve(&xsk->umem->fq,
                                   PROD_NUM_DESCS, &idx)) {
        VLOG_WARN_RL(&rl, "Retry xsk_ring_prod__reserve to FILL queue");
    }

    for (i = 0;
         i < PROD_NUM_DESCS * FRAME_SIZE;
         i += FRAME_SIZE) {
        void *elem;
        uint64_t addr;

        elem = umem_elem_pop(&xsk->umem->mpool);
        addr = UMEM2DESC(elem, xsk->umem->buffer);

        *xsk_ring_prod__fill_addr(&xsk->umem->fq, idx++) = addr;
    }

    xsk_ring_prod__submit(&xsk->umem->fq,
                          PROD_NUM_DESCS);
    return xsk;
}

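/* Allocates and zeroes a page-aligned umem region large enough for NUM_FRAMES
 * frames, then builds the umem and the AF_XDP socket for one receive queue.
 * On any failure everything allocated here is released and NULL is
 * returned. */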
static struct xsk_socket_info *
xsk_configure(int ifindex, int xdp_queue_id, int xdpmode)
{
    struct xsk_socket_info *xsk;
    struct xsk_umem_info *umem;
    void *bufs;

    netdev_afxdp_sweep_unused_pools(NULL);

    /* Umem memory region. */
    bufs = xmalloc_pagealign(NUM_FRAMES * FRAME_SIZE);
    memset(bufs, 0, NUM_FRAMES * FRAME_SIZE);

    /* Create AF_XDP socket. */
    umem = xsk_configure_umem(bufs,
                              NUM_FRAMES * FRAME_SIZE,
                              xdpmode);
    if (!umem) {
        free_pagealign(bufs);
        return NULL;
    }

    VLOG_DBG("Allocated umem pool at 0x%"PRIxPTR, (uintptr_t) umem);

    xsk = xsk_configure_socket(umem, ifindex, xdp_queue_id, xdpmode);
    if (!xsk) {
        /* Clean up umem and xpacket pool. */
        if (xsk_umem__delete(umem->umem)) {
            VLOG_ERR("xsk_umem__delete failed.");
        }
        free_pagealign(bufs);
        umem_pool_cleanup(&umem->mpool);
        xpacket_pool_cleanup(&umem->xpool);
        free(umem);
    }
    return xsk;
}

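/* Creates one AF_XDP socket per rx queue of 'netdev' and one spinlock per tx
 * queue.  On failure, every socket created so far is destroyed and EINVAL is
 * returned. */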
static int
xsk_configure_all(struct netdev *netdev)
{
    struct netdev_linux *dev = netdev_linux_cast(netdev);
    struct xsk_socket_info *xsk_info;
    int i, ifindex, n_rxq, n_txq;

    ifindex = linux_get_ifindex(netdev_get_name(netdev));

    ovs_assert(dev->xsks == NULL);
    ovs_assert(dev->tx_locks == NULL);

    n_rxq = netdev_n_rxq(netdev);
    dev->xsks = xcalloc(n_rxq, sizeof *dev->xsks);

    /* Configure each queue. */
    for (i = 0; i < n_rxq; i++) {
        VLOG_INFO("%s: configure queue %d mode %s", __func__, i,
                  dev->xdpmode == XDP_COPY ? "SKB" : "DRV");
        xsk_info = xsk_configure(ifindex, i, dev->xdpmode);
        if (!xsk_info) {
            VLOG_ERR("Failed to create AF_XDP socket on queue %d.", i);
            dev->xsks[i] = NULL;
            goto err;
        }
        dev->xsks[i] = xsk_info;
        atomic_init(&xsk_info->tx_dropped, 0);
        xsk_info->outstanding_tx = 0;
        xsk_info->available_rx = PROD_NUM_DESCS;
    }

    n_txq = netdev_n_txq(netdev);
    dev->tx_locks = xcalloc(n_txq, sizeof *dev->tx_locks);

    for (i = 0; i < n_txq; i++) {
        ovs_spin_init(&dev->tx_locks[i]);
    }

    return 0;

err:
    xsk_destroy_all(netdev);
    return EINVAL;
}

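/* Tears down one AF_XDP socket and deletes its umem.  Frames may still be
 * sitting in the kernel rings, so the umem pool is not freed here; instead it
 * is queued on 'unused_pools' and reclaimed later by
 * netdev_afxdp_sweep_unused_pools(). */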
static void
xsk_destroy(struct xsk_socket_info *xsk_info)
{
    struct xsk_umem *umem;
    struct unused_pool *pool;

    xsk_socket__delete(xsk_info->xsk);
    xsk_info->xsk = NULL;

    umem = xsk_info->umem->umem;
    if (xsk_umem__delete(umem)) {
        VLOG_ERR("xsk_umem__delete failed.");
    }

    pool = xzalloc(sizeof *pool);
    pool->umem_info = xsk_info->umem;
    pool->lost_in_rings = xsk_info->outstanding_tx + xsk_info->available_rx;

    ovs_mutex_lock(&unused_pools_mutex);
    ovs_list_push_back(&unused_pools, &pool->list_node);
    ovs_mutex_unlock(&unused_pools_mutex);

    free(xsk_info);

    netdev_afxdp_sweep_unused_pools(NULL);
}

static void
xsk_destroy_all(struct netdev *netdev)
{
    struct netdev_linux *dev = netdev_linux_cast(netdev);
    int i, ifindex;

    if (dev->xsks) {
        for (i = 0; i < netdev_n_rxq(netdev); i++) {
            if (dev->xsks[i]) {
                xsk_destroy(dev->xsks[i]);
                dev->xsks[i] = NULL;
                VLOG_INFO("Destroyed xsk[%d].", i);
            }
        }

        free(dev->xsks);
        dev->xsks = NULL;
    }

    VLOG_INFO("%s: Removing xdp program.", netdev_get_name(netdev));
    ifindex = linux_get_ifindex(netdev_get_name(netdev));
    xsk_remove_xdp_program(ifindex, dev->xdpmode);

    if (dev->tx_locks) {
        for (i = 0; i < netdev_n_txq(netdev); i++) {
            ovs_spin_destroy(&dev->tx_locks[i]);
        }
        free(dev->tx_locks);
        dev->tx_locks = NULL;
    }
}

int
netdev_afxdp_set_config(struct netdev *netdev, const struct smap *args,
                        char **errp OVS_UNUSED)
{
    struct netdev_linux *dev = netdev_linux_cast(netdev);
    const char *str_xdpmode;
    int xdpmode, new_n_rxq;

    ovs_mutex_lock(&dev->mutex);
    new_n_rxq = MAX(smap_get_int(args, "n_rxq", NR_QUEUE), 1);
    if (new_n_rxq > MAX_XSKQ) {
        ovs_mutex_unlock(&dev->mutex);
        VLOG_ERR("%s: Too big 'n_rxq' (%d > %d).",
                 netdev_get_name(netdev), new_n_rxq, MAX_XSKQ);
        return EINVAL;
    }

    str_xdpmode = smap_get_def(args, "xdpmode", "skb");
    if (!strcasecmp(str_xdpmode, "drv")) {
        xdpmode = XDP_ZEROCOPY;
    } else if (!strcasecmp(str_xdpmode, "skb")) {
        xdpmode = XDP_COPY;
    } else {
        VLOG_ERR("%s: Incorrect xdpmode (%s).",
                 netdev_get_name(netdev), str_xdpmode);
        ovs_mutex_unlock(&dev->mutex);
        return EINVAL;
    }

    if (dev->requested_n_rxq != new_n_rxq
        || dev->requested_xdpmode != xdpmode) {
        dev->requested_n_rxq = new_n_rxq;
        dev->requested_xdpmode = xdpmode;
        netdev_request_reconfigure(netdev);
    }
    ovs_mutex_unlock(&dev->mutex);
    return 0;
}

int
netdev_afxdp_get_config(const struct netdev *netdev, struct smap *args)
{
    struct netdev_linux *dev = netdev_linux_cast(netdev);

    ovs_mutex_lock(&dev->mutex);
    smap_add_format(args, "n_rxq", "%d", netdev->n_rxq);
    smap_add_format(args, "xdpmode", "%s",
                    dev->xdpmode == XDP_ZEROCOPY ? "drv" : "skb");
    ovs_mutex_unlock(&dev->mutex);
    return 0;
}

int
netdev_afxdp_reconfigure(struct netdev *netdev)
{
    struct netdev_linux *dev = netdev_linux_cast(netdev);
    struct rlimit r = {RLIM_INFINITY, RLIM_INFINITY};
    int err = 0;

    ovs_mutex_lock(&dev->mutex);

    if (netdev->n_rxq == dev->requested_n_rxq
        && dev->xdpmode == dev->requested_xdpmode
        && dev->xsks) {
        goto out;
    }

    xsk_destroy_all(netdev);

    netdev->n_rxq = dev->requested_n_rxq;
    netdev->n_txq = netdev->n_rxq;

    if (dev->requested_xdpmode == XDP_ZEROCOPY) {
        dev->xdpmode = XDP_ZEROCOPY;
        VLOG_INFO("AF_XDP device %s in DRV mode.", netdev_get_name(netdev));
        if (setrlimit(RLIMIT_MEMLOCK, &r)) {
            VLOG_ERR("ERROR: setrlimit(RLIMIT_MEMLOCK): %s",
                     ovs_strerror(errno));
        }
    } else {
        dev->xdpmode = XDP_COPY;
        VLOG_INFO("AF_XDP device %s in SKB mode.", netdev_get_name(netdev));
        /* TODO: set rlimit back to previous value
         * when no device is in DRV mode.
         */
    }

    err = xsk_configure_all(netdev);
    if (err) {
        VLOG_ERR("AF_XDP device %s reconfig failed.", netdev_get_name(netdev));
    }
    netdev_change_seq_changed(netdev);
out:
    ovs_mutex_unlock(&dev->mutex);
    return err;
}

int
netdev_afxdp_get_numa_id(const struct netdev *netdev)
{
    /* FIXME: Get the netdev's PCIe device ID, then find
     * its NUMA node id.
     */
    VLOG_INFO("FIXME: Device %s always uses numa id 0.",
              netdev_get_name(netdev));
    return 0;
}

static void
xsk_remove_xdp_program(uint32_t ifindex, int xdpmode)
{
    uint32_t flags;

    flags = XDP_FLAGS_UPDATE_IF_NOEXIST;

    if (xdpmode == XDP_COPY) {
        flags |= XDP_FLAGS_SKB_MODE;
    } else if (xdpmode == XDP_ZEROCOPY) {
        flags |= XDP_FLAGS_DRV_MODE;
    }

    bpf_set_link_xdp_fd(ifindex, -1, flags);
}

void
signal_remove_xdp(struct netdev *netdev)
{
    struct netdev_linux *dev = netdev_linux_cast(netdev);
    int ifindex;

    ifindex = linux_get_ifindex(netdev_get_name(netdev));

    VLOG_WARN("Force removing xdp program.");
    xsk_remove_xdp_program(ifindex, dev->xdpmode);
}

static struct dp_packet_afxdp *
dp_packet_cast_afxdp(const struct dp_packet *d)
{
    ovs_assert(d->source == DPBUF_AFXDP);
    return CONTAINER_OF(d, struct dp_packet_afxdp, packet);
}

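/* Keeps the kernel FILL ring topped up: if at least BATCH_SIZE slots are
 * free, pops BATCH_SIZE frames from the umem pool and posts their addresses
 * so the kernel always has buffers available for incoming packets. */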
static inline void
prepare_fill_queue(struct xsk_socket_info *xsk_info)
{
    struct xsk_umem_info *umem;
    void *elems[BATCH_SIZE];
    unsigned int idx_fq;
    int i, ret;

    umem = xsk_info->umem;

    if (xsk_prod_nb_free(&umem->fq, BATCH_SIZE) < BATCH_SIZE) {
        return;
    }

    ret = umem_elem_pop_n(&umem->mpool, BATCH_SIZE, elems);
    if (OVS_UNLIKELY(ret)) {
        return;
    }

    if (!xsk_ring_prod__reserve(&umem->fq, BATCH_SIZE, &idx_fq)) {
        umem_elem_push_n(&umem->mpool, BATCH_SIZE, elems);
        COVERAGE_INC(afxdp_fq_full);
        return;
    }

    for (i = 0; i < BATCH_SIZE; i++) {
        uint64_t index;
        void *elem;

        elem = elems[i];
        index = (uint64_t)((char *)elem - (char *)umem->buffer);
        ovs_assert((index & FRAME_SHIFT_MASK) == 0);
        *xsk_ring_prod__fill_addr(&umem->fq, idx_fq) = index;

        idx_fq++;
    }
    xsk_ring_prod__submit(&umem->fq, BATCH_SIZE);
    xsk_info->available_rx += BATCH_SIZE;
}

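/* Receive path: peeks up to BATCH_SIZE descriptors from the RX ring, wraps
 * each umem frame in its pre-allocated dp_packet (no copy), and adds it to
 * 'batch'.  Returns EAGAIN when nothing is available. */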
int
netdev_afxdp_rxq_recv(struct netdev_rxq *rxq_, struct dp_packet_batch *batch,
                      int *qfill)
{
    struct netdev_rxq_linux *rx = netdev_rxq_linux_cast(rxq_);
    struct netdev *netdev = rx->up.netdev;
    struct netdev_linux *dev = netdev_linux_cast(netdev);
    struct xsk_socket_info *xsk_info;
    struct xsk_umem_info *umem;
    uint32_t idx_rx = 0;
    int qid = rxq_->queue_id;
    unsigned int rcvd, i;

    xsk_info = dev->xsks[qid];
    if (!xsk_info || !xsk_info->xsk) {
        return EAGAIN;
    }

    prepare_fill_queue(xsk_info);

    umem = xsk_info->umem;
    rx->fd = xsk_socket__fd(xsk_info->xsk);

    rcvd = xsk_ring_cons__peek(&xsk_info->rx, BATCH_SIZE, &idx_rx);
    if (!rcvd) {
        return EAGAIN;
    }

    /* Setup a dp_packet batch from descriptors in RX queue. */
    for (i = 0; i < rcvd; i++) {
        struct dp_packet_afxdp *xpacket;
        const struct xdp_desc *desc;
        struct dp_packet *packet;
        uint64_t addr, index;
        uint32_t len;
        char *pkt;

        desc = xsk_ring_cons__rx_desc(&xsk_info->rx, idx_rx);
        addr = desc->addr;
        len = desc->len;

        pkt = xsk_umem__get_data(umem->buffer, addr);
        index = addr >> FRAME_SHIFT;
        xpacket = &umem->xpool.array[index];
        packet = &xpacket->packet;

        /* Initialize the struct dp_packet. */
        dp_packet_use_afxdp(packet, pkt,
                            FRAME_SIZE - FRAME_HEADROOM,
                            OVS_XDP_HEADROOM);
        dp_packet_set_size(packet, len);

        /* Add packet into batch, increase batch->count. */
        dp_packet_batch_add(batch, packet);

        idx_rx++;
    }
    /* Release the RX queue. */
    xsk_ring_cons__release(&xsk_info->rx, rcvd);
    xsk_info->available_rx -= rcvd;

    if (qfill) {
        /* TODO: return the number of remaining packets in the queue. */
        *qfill = 0;
    }
    return 0;
}

static inline int
kick_tx(struct xsk_socket_info *xsk_info, int xdpmode)
{
    int ret, retries;
    static const int KERNEL_TX_BATCH_SIZE = 16;

    /* In SKB_MODE packet transmission is synchronous, and the kernel xmits
     * only TX_BATCH_SIZE(16) packets for a single sendmsg syscall.
     * So, we have to kick the kernel (n_packets / 16) times to be sure that
     * all packets are transmitted. */
    retries = (xdpmode == XDP_COPY)
              ? xsk_info->outstanding_tx / KERNEL_TX_BATCH_SIZE
              : 0;
kick_retry:
    /* This makes a system call into the kernel's xsk_sendmsg(), which in
     * turn calls xsk_generic_xmit() (skb mode) or xsk_async_xmit()
     * (driver mode).
     */
    ret = sendto(xsk_socket__fd(xsk_info->xsk), NULL, 0, MSG_DONTWAIT,
                 NULL, 0);
    if (ret < 0) {
        if (retries-- && errno == EAGAIN) {
            goto kick_retry;
        }
        if (errno == ENXIO || errno == ENOBUFS || errno == EOPNOTSUPP) {
            return errno;
        }
    }
    /* No error, or EBUSY, or too many retries on EAGAIN. */
    return 0;
}

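/* dp_packet destructor for DPBUF_AFXDP packets: masks off the in-frame offset
 * to recover the frame base address and pushes the frame back to its umem
 * pool. */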
void
free_afxdp_buf(struct dp_packet *p)
{
    struct dp_packet_afxdp *xpacket;
    uintptr_t addr;

    xpacket = dp_packet_cast_afxdp(p);
    if (xpacket->mpool) {
        void *base = dp_packet_base(p);

        addr = (uintptr_t)base & (~FRAME_SHIFT_MASK);
        umem_elem_push(xpacket->mpool, (void *)addr);
    }
}

static void
free_afxdp_buf_batch(struct dp_packet_batch *batch)
{
    struct dp_packet_afxdp *xpacket = NULL;
    struct dp_packet *packet;
    void *elems[BATCH_SIZE];
    uintptr_t addr;

    DP_PACKET_BATCH_FOR_EACH (i, packet, batch) {
        void *base;

        xpacket = dp_packet_cast_afxdp(packet);
        base = dp_packet_base(packet);
        addr = (uintptr_t)base & (~FRAME_SHIFT_MASK);
        elems[i] = (void *)addr;
    }
    umem_elem_push_n(xpacket->mpool, batch->count, elems);
    dp_packet_batch_init(batch);
}

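/* A batch can be recycled with a single free_afxdp_buf_batch() call only if
 * every packet is a DPBUF_AFXDP packet and all of them come from the same
 * umem pool; this checks for that. */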
static inline bool
check_free_batch(struct dp_packet_batch *batch)
{
    struct umem_pool *first_mpool = NULL;
    struct dp_packet_afxdp *xpacket;
    struct dp_packet *packet;

    DP_PACKET_BATCH_FOR_EACH (i, packet, batch) {
        if (packet->source != DPBUF_AFXDP) {
            return false;
        }
        xpacket = dp_packet_cast_afxdp(packet);
        if (i == 0) {
            first_mpool = xpacket->mpool;
            continue;
        }
        if (xpacket->mpool != first_mpool) {
            return false;
        }
    }
    /* All packets are DPBUF_AFXDP and from the same mpool. */
    return true;
}

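/* Harvests the completion (CQ) ring: every completed TX descriptor's frame is
 * pushed back to the umem pool, in groups of up to BATCH_SIZE.  Entries whose
 * address is UINT64_MAX were already recycled on a previous pass and are
 * skipped (counted by the afxdp_cq_skip coverage counter). */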
static inline void
afxdp_complete_tx(struct xsk_socket_info *xsk_info)
{
    void *elems_push[BATCH_SIZE];
    struct xsk_umem_info *umem;
    uint32_t idx_cq = 0;
    int tx_to_free = 0;
    int tx_done, j;

    umem = xsk_info->umem;
    tx_done = xsk_ring_cons__peek(&umem->cq, CONS_NUM_DESCS, &idx_cq);

    /* Recycle back to umem pool. */
    for (j = 0; j < tx_done; j++) {
        uint64_t *addr;
        void *elem;

        addr = (uint64_t *)xsk_ring_cons__comp_addr(&umem->cq, idx_cq++);
        if (*addr == UINT64_MAX) {
            /* The elem has been pushed already. */
            COVERAGE_INC(afxdp_cq_skip);
            continue;
        }
        elem = ALIGNED_CAST(void *, (char *)umem->buffer + *addr);
        elems_push[tx_to_free] = elem;
        *addr = UINT64_MAX; /* Mark as pushed. */
        tx_to_free++;

        if (tx_to_free == BATCH_SIZE || j == tx_done - 1) {
            umem_elem_push_n(&umem->mpool, tx_to_free, elems_push);
            xsk_info->outstanding_tx -= tx_to_free;
            tx_to_free = 0;
        }
    }

    if (tx_done > 0) {
        xsk_ring_cons__release(&umem->cq, tx_done);
    } else {
        COVERAGE_INC(afxdp_cq_empty);
    }
}

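/* Transmit path for one queue: completes previous transmissions, copies each
 * packet into a freshly popped umem frame, reserves and fills TX descriptors,
 * submits them, and kicks the kernel.  On pool or ring exhaustion the batch
 * is dropped and ENOMEM is returned. */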
static inline int
__netdev_afxdp_batch_send(struct netdev *netdev, int qid,
                          struct dp_packet_batch *batch)
{
    struct netdev_linux *dev = netdev_linux_cast(netdev);
    struct xsk_socket_info *xsk_info;
    void *elems_pop[BATCH_SIZE];
    struct xsk_umem_info *umem;
    struct dp_packet *packet;
    bool free_batch = false;
    unsigned long orig;
    uint32_t idx = 0;
    int error = 0;
    int ret;

    xsk_info = dev->xsks[qid];
    if (!xsk_info || !xsk_info->xsk) {
        goto out;
    }

    afxdp_complete_tx(xsk_info);

    free_batch = check_free_batch(batch);

    umem = xsk_info->umem;
    ret = umem_elem_pop_n(&umem->mpool, batch->count, elems_pop);
    if (OVS_UNLIKELY(ret)) {
        atomic_add_relaxed(&xsk_info->tx_dropped, batch->count, &orig);
        VLOG_WARN_RL(&rl, "%s: send failed due to exhausted memory pool.",
                     netdev_get_name(netdev));
        error = ENOMEM;
        goto out;
    }

    /* Make sure we have enough TX descs. */
    ret = xsk_ring_prod__reserve(&xsk_info->tx, batch->count, &idx);
    if (OVS_UNLIKELY(ret == 0)) {
        umem_elem_push_n(&umem->mpool, batch->count, elems_pop);
        atomic_add_relaxed(&xsk_info->tx_dropped, batch->count, &orig);
        COVERAGE_INC(afxdp_tx_full);
        afxdp_complete_tx(xsk_info);
        kick_tx(xsk_info, dev->xdpmode);
        error = ENOMEM;
        goto out;
    }

    DP_PACKET_BATCH_FOR_EACH (i, packet, batch) {
        uint64_t index;
        void *elem;

        elem = elems_pop[i];
        /* Copy the packet into the umem frame we just popped from the umem
         * pool.  TODO: avoid this copy if the packet and the popped frame
         * are located in the same umem.
         */
        memcpy(elem, dp_packet_data(packet), dp_packet_size(packet));

        index = (uint64_t)((char *)elem - (char *)umem->buffer);
        xsk_ring_prod__tx_desc(&xsk_info->tx, idx + i)->addr = index;
        xsk_ring_prod__tx_desc(&xsk_info->tx, idx + i)->len
            = dp_packet_size(packet);
    }
    xsk_ring_prod__submit(&xsk_info->tx, batch->count);
    xsk_info->outstanding_tx += batch->count;

    ret = kick_tx(xsk_info, dev->xdpmode);
    if (OVS_UNLIKELY(ret)) {
        VLOG_WARN_RL(&rl, "%s: error sending AF_XDP packet: %s.",
                     netdev_get_name(netdev), ovs_strerror(ret));
    }

out:
    if (free_batch) {
        free_afxdp_buf_batch(batch);
    } else {
        dp_packet_delete_batch(batch, true);
    }

    return error;
}

int
netdev_afxdp_batch_send(struct netdev *netdev, int qid,
                        struct dp_packet_batch *batch,
                        bool concurrent_txq)
{
    struct netdev_linux *dev;
    int ret;

    if (concurrent_txq) {
        dev = netdev_linux_cast(netdev);
        qid = qid % netdev_n_txq(netdev);

        ovs_spin_lock(&dev->tx_locks[qid]);
        ret = __netdev_afxdp_batch_send(netdev, qid, batch);
        ovs_spin_unlock(&dev->tx_locks[qid]);
    } else {
        ret = __netdev_afxdp_batch_send(netdev, qid, batch);
    }

    return ret;
}

int
netdev_afxdp_rxq_construct(struct netdev_rxq *rxq_ OVS_UNUSED)
{
    /* Done at reconfigure. */
    return 0;
}

void
netdev_afxdp_rxq_destruct(struct netdev_rxq *rxq_ OVS_UNUSED)
{
    /* Nothing. */
}

int
netdev_afxdp_construct(struct netdev *netdev)
{
    struct netdev_linux *dev = netdev_linux_cast(netdev);
    int ret;

    /* Configure common netdev-linux first. */
    ret = netdev_linux_construct(netdev);
    if (ret) {
        return ret;
    }

    /* Queues should not be used before the first reconfiguration. Clearing. */
    netdev->n_rxq = 0;
    netdev->n_txq = 0;
    dev->xdpmode = 0;

    dev->requested_n_rxq = NR_QUEUE;
    dev->requested_xdpmode = XDP_COPY;

    dev->xsks = NULL;
    dev->tx_locks = NULL;

    netdev_request_reconfigure(netdev);
    return 0;
}

void
netdev_afxdp_destruct(struct netdev *netdev)
{
    static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
    struct netdev_linux *dev = netdev_linux_cast(netdev);

    if (ovsthread_once_start(&once)) {
        fatal_signal_add_hook(netdev_afxdp_sweep_unused_pools,
                              NULL, NULL, true);
        ovsthread_once_done(&once);
    }

    /* Note: tc is bypassed when using drv-mode, but when using
     * skb-mode, we might need to clean up tc. */

    xsk_destroy_all(netdev);
    ovs_mutex_destroy(&dev->mutex);
}

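/* Reports per-queue XDP_STATISTICS (rx_dropped, rx_invalid_descs,
 * tx_invalid_descs), read from each AF_XDP socket with getsockopt(), as
 * netdev custom stats named "xsk_queue_<n>_<stat>".  The XDP_CSTATS X-macro
 * below keeps the counter list and the per-queue loop in sync. */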
int
netdev_afxdp_get_custom_stats(const struct netdev *netdev,
                              struct netdev_custom_stats *custom_stats)
{
    struct netdev_linux *dev = netdev_linux_cast(netdev);
    struct xsk_socket_info *xsk_info;
    struct xdp_statistics stat;
    uint32_t i, c = 0;
    socklen_t optlen;

    ovs_mutex_lock(&dev->mutex);

#define XDP_CSTATS \
    XDP_CSTAT(rx_dropped) \
    XDP_CSTAT(rx_invalid_descs) \
    XDP_CSTAT(tx_invalid_descs)

#define XDP_CSTAT(NAME) + 1
    enum { N_XDP_CSTATS = XDP_CSTATS };
#undef XDP_CSTAT

    custom_stats->counters = xcalloc(netdev_n_rxq(netdev) * N_XDP_CSTATS,
                                     sizeof *custom_stats->counters);

    /* Account the stats for each xsk. */
    for (i = 0; i < netdev_n_rxq(netdev); i++) {
        xsk_info = dev->xsks[i];
        optlen = sizeof stat;

        if (xsk_info && !getsockopt(xsk_socket__fd(xsk_info->xsk), SOL_XDP,
                                    XDP_STATISTICS, &stat, &optlen)) {
#define XDP_CSTAT(NAME) \
            snprintf(custom_stats->counters[c].name, \
                     NETDEV_CUSTOM_STATS_NAME_SIZE, \
                     "xsk_queue_%d_" #NAME, i); \
            custom_stats->counters[c++].value = stat.NAME;
            XDP_CSTATS;
#undef XDP_CSTAT
        }
    }
    custom_stats->size = c;
    ovs_mutex_unlock(&dev->mutex);

    return 0;
}

int
netdev_afxdp_get_stats(const struct netdev *netdev,
                       struct netdev_stats *stats)
{
    struct netdev_linux *dev = netdev_linux_cast(netdev);
    struct xsk_socket_info *xsk_info;
    struct netdev_stats dev_stats;
    int error, i;

    ovs_mutex_lock(&dev->mutex);

    error = get_stats_via_netlink(netdev, &dev_stats);
    if (error) {
        VLOG_WARN_RL(&rl, "%s: Error getting AF_XDP statistics.",
                     netdev_get_name(netdev));
    } else {
        /* Use kernel netdev's packet and byte counts. */
        stats->rx_packets = dev_stats.rx_packets;
        stats->rx_bytes = dev_stats.rx_bytes;
        stats->tx_packets = dev_stats.tx_packets;
        stats->tx_bytes = dev_stats.tx_bytes;

        stats->rx_errors += dev_stats.rx_errors;
        stats->tx_errors += dev_stats.tx_errors;
        stats->rx_dropped += dev_stats.rx_dropped;
        stats->tx_dropped += dev_stats.tx_dropped;
        stats->multicast += dev_stats.multicast;
        stats->collisions += dev_stats.collisions;
        stats->rx_length_errors += dev_stats.rx_length_errors;
        stats->rx_over_errors += dev_stats.rx_over_errors;
        stats->rx_crc_errors += dev_stats.rx_crc_errors;
        stats->rx_frame_errors += dev_stats.rx_frame_errors;
        stats->rx_fifo_errors += dev_stats.rx_fifo_errors;
        stats->rx_missed_errors += dev_stats.rx_missed_errors;
        stats->tx_aborted_errors += dev_stats.tx_aborted_errors;
        stats->tx_carrier_errors += dev_stats.tx_carrier_errors;
        stats->tx_fifo_errors += dev_stats.tx_fifo_errors;
        stats->tx_heartbeat_errors += dev_stats.tx_heartbeat_errors;
        stats->tx_window_errors += dev_stats.tx_window_errors;

        /* Account for the drops recorded in each xsk. */
        for (i = 0; i < netdev_n_rxq(netdev); i++) {
            xsk_info = dev->xsks[i];
            if (xsk_info) {
                uint64_t tx_dropped;

                atomic_read_relaxed(&xsk_info->tx_dropped, &tx_dropped);
                stats->tx_dropped += tx_dropped;
            }
        }
    }
    ovs_mutex_unlock(&dev->mutex);

    return error;
}