/* SPDX-License-Identifier: GPL-2.0 */
/* XDP user-space ring structure
 * Copyright(c) 2018 Intel Corporation.
 */

#ifndef _LINUX_XSK_QUEUE_H
#define _LINUX_XSK_QUEUE_H

#include <linux/types.h>
#include <linux/if_xdp.h>
#include <net/xdp_sock.h>

struct xdp_ring {
	u32 producer ____cacheline_aligned_in_smp;
	u32 consumer ____cacheline_aligned_in_smp;
	u32 flags;
};

/* Used for the RX and TX queues for packets */
struct xdp_rxtx_ring {
	struct xdp_ring ptrs;
	struct xdp_desc desc[0] ____cacheline_aligned_in_smp;
};

/* Used for the fill and completion queues for buffers */
struct xdp_umem_ring {
	struct xdp_ring ptrs;
	u64 desc[0] ____cacheline_aligned_in_smp;
};

struct xsk_queue {
	u64 chunk_mask;
	u64 size;
	u32 ring_mask;
	u32 nentries;
	u32 cached_prod;
	u32 cons_head;
	u32 cons_tail;
	struct xdp_ring *ring;
	u64 invalid_descs;
};

/* The structure of the shared state of the rings is the same as the
 * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
 * ring, the kernel is the producer and user space is the consumer. For
 * the Tx and fill rings, the kernel is the consumer and user space is
 * the producer.
 *
 * producer                         consumer
 *
 * if (LOAD ->consumer) {           LOAD ->producer
 *                    (A)           smp_rmb()       (C)
 *    STORE $data                   LOAD $data
 *    smp_wmb()       (B)           smp_mb()        (D)
 *    STORE ->producer              STORE ->consumer
 * }
 *
 * (A) pairs with (D), and (B) pairs with (C).
 *
 * Starting with (B), it protects the data from being written after
 * the producer pointer. If this barrier was missing, the consumer
 * could observe the producer pointer being set and thus load the data
 * before the producer has written the new data. The consumer would in
 * this case load the old data.
 *
 * (C) protects the consumer from speculatively loading the data before
 * the producer pointer has actually been read. Without this barrier,
 * some architectures could load old data, since speculative loads are
 * not discarded when the CPU does not know there is a dependency
 * between ->producer and the data.
 *
 * (A) is a control dependency that separates the load of ->consumer
 * from the stores of $data. If ->consumer indicates there is no room
 * in the buffer to store $data, we do not store it, so no explicit
 * barrier is needed.
 *
 * (D) ensures the load of the data is observed to happen before the
 * store of the consumer pointer. If we did not have this memory
 * barrier, the producer could observe the consumer pointer being set
 * and overwrite the data with a new value before the consumer got the
 * chance to read the old value. The consumer would thus miss reading
 * the old entry and very likely read the new entry twice, once right
 * away and again after circling through the ring.
 */
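
/* Illustrative sketch, not part of this header: a minimal
 * single-producer/single-consumer ring showing how the (A)/(B)/(C)/(D)
 * barriers above pair up. The names ex_ring, ex_produce and ex_consume
 * are hypothetical and exist only for this example; the real queues
 * below use the cached pointers in struct xsk_queue instead of touching
 * the shared ring on every operation.
 */
struct ex_ring {
	u32 producer;
	u32 consumer;
	u32 mask;	/* nentries - 1, nentries is a power of two */
	u64 data[];
};

static inline bool ex_produce(struct ex_ring *r, u64 val)
{
	u32 prod = r->producer;

	/* (A) control dependency: only store $data if there is room */
	if (prod - READ_ONCE(r->consumer) > r->mask)
		return false;

	r->data[prod & r->mask] = val;		/* STORE $data */
	smp_wmb();				/* (B), matches (C) */
	WRITE_ONCE(r->producer, prod + 1);	/* STORE ->producer */
	return true;
}

static inline bool ex_consume(struct ex_ring *r, u64 *val)
{
	u32 cons = r->consumer;

	if (cons == READ_ONCE(r->producer))	/* LOAD ->producer */
		return false;

	smp_rmb();				/* (C), matches (B) */
	*val = r->data[cons & r->mask];		/* LOAD $data */
	smp_mb();				/* (D), matches (A) */
	WRITE_ONCE(r->consumer, cons + 1);	/* STORE ->consumer */
	return true;
}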

/* Common functions operating for both RXTX and umem queues */

static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
{
	return q ? q->invalid_descs : 0;
}

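/* Consumer side: return the number of entries available to read, capped
 * at dcnt. The cached producer pointer is only refreshed from the shared
 * ring when the cached view shows no entries, to limit cache line traffic.
 */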
static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt)
{
	u32 entries = q->cached_prod - q->cons_tail;

	if (entries == 0) {
		/* Refresh the local pointer */
		q->cached_prod = READ_ONCE(q->ring->producer);
		entries = q->cached_prod - q->cons_tail;
	}

	return (entries > dcnt) ? dcnt : entries;
}

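/* Producer side: return the number of free entries that can be written.
 * The cached consumer pointer is only refreshed from the shared ring when
 * the cached view shows fewer than dcnt free entries.
 */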
static inline u32 xskq_nb_free(struct xsk_queue *q, u32 dcnt)
{
	u32 free_entries = q->nentries - (q->cached_prod - q->cons_tail);

	if (free_entries >= dcnt)
		return free_entries;

	/* Refresh the local tail pointer */
	q->cons_tail = READ_ONCE(q->ring->consumer);
	return q->nentries - (q->cached_prod - q->cons_tail);
}

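/* Consumer side: true if at least cnt entries are available to read,
 * refreshing the cached producer pointer if the cached view falls short.
 */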
static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt)
{
	u32 entries = q->cached_prod - q->cons_tail;

	if (entries >= cnt)
		return true;

	/* Refresh the local pointer. */
	q->cached_prod = READ_ONCE(q->ring->producer);
	entries = q->cached_prod - q->cons_tail;

	return entries >= cnt;
}

/* UMEM queue */

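/* True if a buffer of the given length starting at addr crosses into a
 * page that is not contiguous with the current one (unaligned chunk mode).
 */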
static inline bool xskq_crosses_non_contig_pg(struct xdp_umem *umem, u64 addr,
					      u64 length)
{
	bool cross_pg = (addr & (PAGE_SIZE - 1)) + length > PAGE_SIZE;
	bool next_pg_contig =
		(unsigned long)umem->pages[(addr >> PAGE_SHIFT)].addr &
			XSK_NEXT_PG_CONTIG_MASK;

	return cross_pg && !next_pg_contig;
}

static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr)
{
	if (addr >= q->size) {
		q->invalid_descs++;
		return false;
	}

	return true;
}

static inline bool xskq_is_valid_addr_unaligned(struct xsk_queue *q, u64 addr,
						u64 length,
						struct xdp_umem *umem)
{
	u64 base_addr = xsk_umem_extract_addr(addr);

	addr = xsk_umem_add_offset_to_addr(addr);
	if (base_addr >= q->size || addr >= q->size ||
	    xskq_crosses_non_contig_pg(umem, addr, length)) {
		q->invalid_descs++;
		return false;
	}

	return true;
}

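/* Scan from cons_tail towards cons_head for the next valid address in the
 * fill ring, skipping (and counting) invalid entries. Returns NULL if no
 * valid entry is found before cons_head.
 */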
static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr,
				      struct xdp_umem *umem)
{
	while (q->cons_tail != q->cons_head) {
		struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
		unsigned int idx = q->cons_tail & q->ring_mask;

		*addr = READ_ONCE(ring->desc[idx]) & q->chunk_mask;

		if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
			if (xskq_is_valid_addr_unaligned(q, *addr,
							 umem->chunk_size_nohr,
							 umem))
				return addr;
			goto out;
		}

		if (xskq_is_valid_addr(q, *addr))
			return addr;

out:
		q->cons_tail++;
	}

	return NULL;
}

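/* Return the next valid address in the fill ring without consuming it, or
 * NULL if the ring is empty. When the cached entries are exhausted, the
 * consumer pointer is published and cons_head is refreshed from the ring.
 */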
static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr,
				  struct xdp_umem *umem)
{
	if (q->cons_tail == q->cons_head) {
		smp_mb(); /* D, matches A */
		WRITE_ONCE(q->ring->consumer, q->cons_tail);
		q->cons_head = q->cons_tail + xskq_nb_avail(q, 1);

		/* Order consumer and data */
		smp_rmb();
	}

	return xskq_validate_addr(q, addr, umem);
}

static inline void xskq_discard_addr(struct xsk_queue *q)
{
	q->cons_tail++;
}

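/* Illustrative sketch, not part of this header: the peek/discard pattern a
 * caller would use to drain addresses from a fill ring. ex_drain_fq() and
 * ex_use_addr() are hypothetical names used only for this example.
 */
static inline void ex_use_addr(u64 addr)
{
	(void)addr;	/* stand-in for handing the buffer to the driver */
}

static inline u32 ex_drain_fq(struct xsk_queue *fq, struct xdp_umem *umem,
			      u32 max)
{
	u32 done = 0;
	u64 addr;

	while (done < max && xskq_peek_addr(fq, &addr, umem)) {
		ex_use_addr(addr);	/* consume the buffer at addr */
		xskq_discard_addr(fq);	/* then advance past the entry */
		done++;
	}

	return done;
}
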
static inline int xskq_prod_reserve(struct xsk_queue *q)
{
	if (xskq_nb_free(q, 1) == 0)
		return -ENOSPC;

	/* A, matches D */
	q->cached_prod++;
	return 0;
}

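/* Reserve one slot in a fill/completion ring and store addr in it. The
 * entry does not become visible to the consumer until a later submit call
 * bumps the shared producer pointer. Returns -ENOSPC if the ring is full.
 */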
static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

	if (xskq_nb_free(q, 1) == 0)
		return -ENOSPC;

	/* A, matches D */
	ring->desc[q->cached_prod++ & q->ring_mask] = addr;
	return 0;
}

static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
{
	/* Order producer and data */
	smp_wmb(); /* B, matches C */

	WRITE_ONCE(q->ring->producer, idx);
}

static inline void xskq_prod_submit(struct xsk_queue *q)
{
	__xskq_prod_submit(q, q->cached_prod);
}

static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
	u32 idx = q->ring->producer;

	ring->desc[idx++ & q->ring_mask] = addr;

	__xskq_prod_submit(q, idx);
}

static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries)
{
	__xskq_prod_submit(q, q->ring->producer + nb_entries);
}

/* Rx/Tx queue */

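/* Validate an Rx/Tx descriptor: the buffer must fit within one chunk (or a
 * valid unaligned region), and the options field must be zero. Invalid
 * descriptors are counted in q->invalid_descs.
 */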
static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d,
				      struct xdp_umem *umem)
{
	if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
		if (!xskq_is_valid_addr_unaligned(q, d->addr, d->len, umem))
			return false;

		if (d->len > umem->chunk_size_nohr || d->options) {
			q->invalid_descs++;
			return false;
		}

		return true;
	}

	if (!xskq_is_valid_addr(q, d->addr))
		return false;

	if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) ||
	    d->options) {
		q->invalid_descs++;
		return false;
	}

	return true;
}

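/* Scan from cons_tail towards cons_head for the next valid Rx/Tx
 * descriptor, skipping invalid entries. Returns NULL if none is found.
 */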
static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q,
						  struct xdp_desc *desc,
						  struct xdp_umem *umem)
{
	while (q->cons_tail != q->cons_head) {
		struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
		unsigned int idx = q->cons_tail & q->ring_mask;

		*desc = READ_ONCE(ring->desc[idx]);
		if (xskq_is_valid_desc(q, desc, umem))
			return desc;

		q->cons_tail++;
	}

	return NULL;
}

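/* Return the next valid descriptor without consuming it, or NULL if the
 * ring is empty. Refreshes cons_head and publishes the consumer pointer
 * when the cached entries are exhausted.
 */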
static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
					      struct xdp_desc *desc,
					      struct xdp_umem *umem)
{
	if (q->cons_tail == q->cons_head) {
		smp_mb(); /* D, matches A */
		WRITE_ONCE(q->ring->consumer, q->cons_tail);
		q->cons_head = q->cons_tail + xskq_nb_avail(q, 1);

		/* Order consumer and data */
		smp_rmb(); /* C, matches B */
	}

	return xskq_validate_desc(q, desc, umem);
}

static inline void xskq_discard_desc(struct xsk_queue *q)
{
	q->cons_tail++;
}

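/* Reserve one slot in an Rx/Tx ring and fill in addr and len. The
 * descriptor only becomes visible to the consumer after a submit call.
 * Returns -ENOSPC if the ring is full.
 */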
static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
					 u64 addr, u32 len)
{
	struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
	u32 idx;

	if (xskq_nb_free(q, 1) == 0)
		return -ENOSPC;

	/* A, matches D */
	idx = q->cached_prod++ & q->ring_mask;
	ring->desc[idx].addr = addr;
	ring->desc[idx].len = len;

	return 0;
}
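
/* Illustrative sketch, not part of this header: the reserve/submit pattern
 * for publishing one Rx descriptor. ex_rx_one() is a hypothetical name used
 * only for this example.
 */
static inline int ex_rx_one(struct xsk_queue *rx, u64 addr, u32 len)
{
	int err;

	err = xskq_prod_reserve_desc(rx, addr, len);
	if (err)
		return err;	/* -ENOSPC: ring full */

	/* Make the descriptor visible to the consumer: smp_wmb() (B) in
	 * __xskq_prod_submit() orders the writes above before the producer
	 * pointer update.
	 */
	xskq_prod_submit(rx);
	return 0;
}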

static inline bool xskq_full_desc(struct xsk_queue *q)
{
	/* No barriers needed since data is not accessed */
	return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer) ==
		q->nentries;
}

static inline bool xskq_prod_is_empty(struct xsk_queue *q)
{
	/* No barriers needed since data is not accessed */
	return READ_ONCE(q->ring->consumer) == READ_ONCE(q->ring->producer);
}

void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
struct xsk_queue *xskq_create(u32 nentries, bool umem_queue);
void xskq_destroy(struct xsk_queue *q_ops);

/* Executed by the core when the entire UMEM gets freed */
void xsk_reuseq_destroy(struct xdp_umem *umem);

#endif /* _LINUX_XSK_QUEUE_H */