/* SPDX-License-Identifier: GPL-2.0 */
/* XDP user-space ring structure
 * Copyright(c) 2018 Intel Corporation.
 */

#ifndef _LINUX_XSK_QUEUE_H
#define _LINUX_XSK_QUEUE_H

#include <linux/types.h>
#include <linux/if_xdp.h>
#include <net/xdp_sock.h>

struct xdp_ring {
	u32 producer ____cacheline_aligned_in_smp;
	u32 consumer ____cacheline_aligned_in_smp;
	u32 flags;
};

/* Used for the RX and TX queues for packets */
struct xdp_rxtx_ring {
	struct xdp_ring ptrs;
	struct xdp_desc desc[0] ____cacheline_aligned_in_smp;
};

/* Used for the fill and completion queues for buffers */
struct xdp_umem_ring {
	struct xdp_ring ptrs;
	u64 desc[0] ____cacheline_aligned_in_smp;
};

struct xsk_queue {
	u64 chunk_mask;
	u64 size;
	u32 ring_mask;
	u32 nentries;
	u32 cached_prod;
	u32 cons_head;
	u32 cons_tail;
	struct xdp_ring *ring;
	u64 invalid_descs;
};

/* The structure of the shared state of the rings is the same as the
 * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
 * ring, the kernel is the producer and user space is the consumer. For
 * the Tx and fill rings, the kernel is the consumer and user space is
 * the producer.
 *
 * producer                         consumer
 *
 * if (LOAD ->consumer) {           LOAD ->producer
 *                    (A)           smp_rmb()       (C)
 *    STORE $data                   LOAD $data
 *    smp_wmb()       (B)           smp_mb()        (D)
 *    STORE ->producer              STORE ->consumer
 * }
 *
 * (A) pairs with (D), and (B) pairs with (C).
 *
 * Starting with (B), it protects the data from being written after
 * the store to the producer pointer. If this barrier was missing, the
 * consumer could observe the producer pointer being set and thus load
 * the data before the producer has written the new data. The consumer
 * would in this case load the old data.
 *
 * (C) protects the consumer from speculatively loading the data before
 * the producer pointer actually has been read. If we do not have this
 * barrier, some architectures could load old data as speculative loads
 * are not discarded as the CPU does not know there is a dependency
 * between ->producer and data.
 *
 * (A) is a control dependency that separates the load of ->consumer
 * from the stores of $data. In case ->consumer indicates there is no
 * room in the buffer to store $data, we do not store it. So no barrier
 * is needed.
 *
 * (D) protects the load of the data from being observed to happen after
 * the store of the consumer pointer. If we did not have this memory
 * barrier, the producer could observe the consumer pointer being set
 * and overwrite the data with a new value before the consumer got the
 * chance to read the old value. The consumer would thus miss reading
 * the old entry and very likely read the new entry twice, once right
 * now and again after circling through the ring.
 */
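
/* Illustrative usage sketch (not part of this header, only a rough outline
 * of how a copy-mode Rx path might pair the helpers below): fq is a fill
 * queue, rx an Rx queue, and umem/addr/len are placeholders assumed by this
 * example. Barriers C and D above are issued inside xskq_peek_addr(), A is
 * the control dependency in the reserve helpers, and B is the smp_wmb() in
 * __xskq_prod_submit().
 *
 *	if (!xskq_peek_addr(fq, &addr, umem))
 *		return -ENOSPC;			// no buffer from user space
 *
 *	... copy the received frame into the umem chunk at addr ...
 *
 *	if (!xskq_prod_reserve_desc(rx, addr, len)) {
 *		xskq_discard_addr(fq);		// consume the fill entry
 *		xskq_prod_submit(rx);		// publish to user space (B)
 *	}
 */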

/* Common functions operating for both RXTX and umem queues */

static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
{
	return q ? q->invalid_descs : 0;
}

static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt)
{
	u32 entries = q->cached_prod - q->cons_tail;

	if (entries == 0) {
		/* Refresh the local pointer */
		q->cached_prod = READ_ONCE(q->ring->producer);
		entries = q->cached_prod - q->cons_tail;
	}

	return (entries > dcnt) ? dcnt : entries;
}

static inline u32 xskq_nb_free(struct xsk_queue *q, u32 dcnt)
{
	u32 free_entries = q->nentries - (q->cached_prod - q->cons_tail);

	if (free_entries >= dcnt)
		return free_entries;

	/* Refresh the local tail pointer */
	q->cons_tail = READ_ONCE(q->ring->consumer);
	return q->nentries - (q->cached_prod - q->cons_tail);
}

static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt)
{
	u32 entries = q->cached_prod - q->cons_tail;

	if (entries >= cnt)
		return true;

	/* Refresh the local pointer. */
	q->cached_prod = READ_ONCE(q->ring->producer);
	entries = q->cached_prod - q->cons_tail;

	return entries >= cnt;
}

/* UMEM queue */

static inline bool xskq_crosses_non_contig_pg(struct xdp_umem *umem, u64 addr,
					      u64 length)
{
	bool cross_pg = (addr & (PAGE_SIZE - 1)) + length > PAGE_SIZE;
	bool next_pg_contig =
		(unsigned long)umem->pages[(addr >> PAGE_SHIFT)].addr &
			XSK_NEXT_PG_CONTIG_MASK;

	return cross_pg && !next_pg_contig;
}

static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr)
{
	if (addr >= q->size) {
		q->invalid_descs++;
		return false;
	}

	return true;
}

static inline bool xskq_is_valid_addr_unaligned(struct xsk_queue *q, u64 addr,
						u64 length,
						struct xdp_umem *umem)
{
	u64 base_addr = xsk_umem_extract_addr(addr);

	addr = xsk_umem_add_offset_to_addr(addr);
	if (base_addr >= q->size || addr >= q->size ||
	    xskq_crosses_non_contig_pg(umem, addr, length)) {
		q->invalid_descs++;
		return false;
	}

	return true;
}

static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr,
				      struct xdp_umem *umem)
{
	while (q->cons_tail != q->cons_head) {
		struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
		unsigned int idx = q->cons_tail & q->ring_mask;

		*addr = READ_ONCE(ring->desc[idx]) & q->chunk_mask;

		if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
			if (xskq_is_valid_addr_unaligned(q, *addr,
							 umem->chunk_size_nohr,
							 umem))
				return addr;
			goto out;
		}

		if (xskq_is_valid_addr(q, *addr))
			return addr;

out:
		q->cons_tail++;
	}

	return NULL;
}

static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr,
				  struct xdp_umem *umem)
{
	if (q->cons_tail == q->cons_head) {
		smp_mb(); /* D, matches A */
		WRITE_ONCE(q->ring->consumer, q->cons_tail);
		q->cons_head = q->cons_tail + xskq_nb_avail(q, 1);

		/* Order consumer and data */
		smp_rmb(); /* C, matches B */
	}

	return xskq_validate_addr(q, addr, umem);
}

static inline void xskq_discard_addr(struct xsk_queue *q)
{
	q->cons_tail++;
}

static inline int xskq_prod_reserve(struct xsk_queue *q)
{
	if (xskq_nb_free(q, 1) == 0)
		return -ENOSPC;

	/* A, matches D */
	q->cached_prod++;
	return 0;
}

static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

	if (xskq_nb_free(q, 1) == 0)
		return -ENOSPC;

	/* A, matches D */
	ring->desc[q->cached_prod++ & q->ring_mask] = addr;
	return 0;
}

static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
{
	/* Order producer and data */
	smp_wmb(); /* B, matches C */

	WRITE_ONCE(q->ring->producer, idx);
}

static inline void xskq_prod_submit(struct xsk_queue *q)
{
	__xskq_prod_submit(q, q->cached_prod);
}

static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
	u32 idx = q->ring->producer;

	ring->desc[idx++ & q->ring_mask] = addr;

	__xskq_prod_submit(q, idx);
}

static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries)
{
	__xskq_prod_submit(q, q->ring->producer + nb_entries);
}

/* Rx/Tx queue */

static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d,
				      struct xdp_umem *umem)
{
	if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
		if (!xskq_is_valid_addr_unaligned(q, d->addr, d->len, umem))
			return false;

		if (d->len > umem->chunk_size_nohr || d->options) {
			q->invalid_descs++;
			return false;
		}

		return true;
	}

	if (!xskq_is_valid_addr(q, d->addr))
		return false;

	if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) ||
	    d->options) {
		q->invalid_descs++;
		return false;
	}

	return true;
}

static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q,
						  struct xdp_desc *desc,
						  struct xdp_umem *umem)
{
	while (q->cons_tail != q->cons_head) {
		struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
		unsigned int idx = q->cons_tail & q->ring_mask;

		*desc = READ_ONCE(ring->desc[idx]);
		if (xskq_is_valid_desc(q, desc, umem))
			return desc;

		q->cons_tail++;
	}

	return NULL;
}

static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
					      struct xdp_desc *desc,
					      struct xdp_umem *umem)
{
	if (q->cons_tail == q->cons_head) {
		smp_mb(); /* D, matches A */
		WRITE_ONCE(q->ring->consumer, q->cons_tail);
		q->cons_head = q->cons_tail + xskq_nb_avail(q, 1);

		/* Order consumer and data */
		smp_rmb(); /* C, matches B */
	}

	return xskq_validate_desc(q, desc, umem);
}

static inline void xskq_discard_desc(struct xsk_queue *q)
{
	q->cons_tail++;
}

static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
					 u64 addr, u32 len)
{
	struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
	u32 idx;

	if (xskq_nb_free(q, 1) == 0)
		return -ENOSPC;

	/* A, matches D */
	idx = q->cached_prod++ & q->ring_mask;
	ring->desc[idx].addr = addr;
	ring->desc[idx].len = len;

	return 0;
}

static inline bool xskq_full_desc(struct xsk_queue *q)
{
	/* No barriers needed since data is not accessed */
	return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer) ==
		q->nentries;
}

static inline bool xskq_prod_is_empty(struct xsk_queue *q)
{
	/* No barriers needed since data is not accessed */
	return READ_ONCE(q->ring->consumer) == READ_ONCE(q->ring->producer);
}

void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
struct xsk_queue *xskq_create(u32 nentries, bool umem_queue);
void xskq_destroy(struct xsk_queue *q_ops);

/* Executed by the core when the entire UMEM gets freed */
void xsk_reuseq_destroy(struct xdp_umem *umem);

#endif /* _LINUX_XSK_QUEUE_H */