/* SPDX-License-Identifier: MIT */
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdbool.h>

#include "liburing/compat.h"
#include "liburing/io_uring.h"
#include "liburing.h"
#include "liburing/barrier.h"

#include "syscall.h"

/*
 * Returns true if we're not using an SQ thread (thus nobody submits but us),
 * or if IORING_SQ_NEED_WAKEUP is set and the submit thread must therefore be
 * explicitly awakened. For the latter case, we set the thread wakeup flag.
 */
static inline bool sq_ring_needs_enter(struct io_uring *ring,
				       unsigned submitted, unsigned *flags)
{
	if (!(ring->flags & IORING_SETUP_SQPOLL) && submitted)
		return true;
	if (IO_URING_READ_ONCE(*ring->sq.kflags) & IORING_SQ_NEED_WAKEUP) {
		*flags |= IORING_ENTER_SQ_WAKEUP;
		return true;
	}

	return false;
}

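/*
 * Returns true if the kernel has flagged a CQ ring overflow
 * (IORING_SQ_CQ_OVERFLOW), meaning completions must be flushed with an
 * io_uring_enter(2) GETEVENTS call before more can be reaped.
 */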
static inline bool cq_ring_needs_flush(struct io_uring *ring)
{
	return IO_URING_READ_ONCE(*ring->sq.kflags) & IORING_SQ_CQ_OVERFLOW;
}

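/*
 * Peek at the next available completion without entering the kernel.
 * Internal timeout completions (user_data == LIBURING_UDATA_TIMEOUT) are
 * consumed here and never handed back to the caller; any error they carry
 * is propagated through the return value instead.
 */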
static int __io_uring_peek_cqe(struct io_uring *ring,
			       struct io_uring_cqe **cqe_ptr)
{
	struct io_uring_cqe *cqe;
	unsigned head;
	int err = 0;

	do {
		io_uring_for_each_cqe(ring, head, cqe)
			break;
		if (cqe) {
			if (cqe->user_data == LIBURING_UDATA_TIMEOUT) {
				if (cqe->res < 0)
					err = cqe->res;
				io_uring_cq_advance(ring, 1);
				if (!err)
					continue;
				cqe = NULL;
			}
		}
		break;
	} while (1);

	*cqe_ptr = cqe;
	return err;
}

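/*
 * Internal helper behind the public cqe peek/wait entry points: submit up
 * to 'submit' entries and/or wait for up to 'wait_nr' completions, looping
 * until a completion is available or an error occurs.
 */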
int __io_uring_get_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr,
		       unsigned submit, unsigned wait_nr, sigset_t *sigmask)
{
	struct io_uring_cqe *cqe = NULL;
	const int to_wait = wait_nr;
	int ret = 0, err;

	do {
		bool cq_overflow_flush = false;
		unsigned flags = 0;

		err = __io_uring_peek_cqe(ring, &cqe);
		if (err)
			break;
		if (!cqe && !to_wait && !submit) {
			if (!cq_ring_needs_flush(ring)) {
				err = -EAGAIN;
				break;
			}
			cq_overflow_flush = true;
		}
		if (wait_nr && cqe)
			wait_nr--;
		if (wait_nr || cq_overflow_flush)
			flags = IORING_ENTER_GETEVENTS;
		if (submit)
			sq_ring_needs_enter(ring, submit, &flags);
		if (wait_nr || submit || cq_overflow_flush)
			ret = __sys_io_uring_enter(ring->ring_fd, submit,
						   wait_nr, flags, sigmask);
		if (ret < 0) {
			err = -errno;
		} else if (ret == (int)submit) {
			submit = 0;
			/*
			 * When SETUP_IOPOLL is set, __sys_io_uring_enter()
			 * must be called to reap new completions, but the
			 * call won't be made if both wait_nr and submit are
			 * zero, so preserve wait_nr.
			 */
			if (!(ring->flags & IORING_SETUP_IOPOLL))
				wait_nr = 0;
		} else {
			submit -= ret;
		}
		if (cqe)
			break;
	} while (!err);

	*cqe_ptr = cqe;
	return err;
}

/*
 * Fill in an array of IO completions up to count, if any are available.
 * Returns the number of IO completions filled.
 */
unsigned io_uring_peek_batch_cqe(struct io_uring *ring,
				 struct io_uring_cqe **cqes, unsigned count)
{
	unsigned ready;
	bool overflow_checked = false;

again:
	ready = io_uring_cq_ready(ring);
	if (ready) {
		unsigned head = *ring->cq.khead;
		unsigned mask = *ring->cq.kring_mask;
		unsigned last;
		int i = 0;

		count = count > ready ? ready : count;
		last = head + count;
		for (; head != last; head++, i++)
			cqes[i] = &ring->cq.cqes[head & mask];

		return count;
	}

	if (overflow_checked)
		goto done;

	if (cq_ring_needs_flush(ring)) {
		__sys_io_uring_enter(ring->ring_fd, 0, 0,
				     IORING_ENTER_GETEVENTS, NULL);
		overflow_checked = true;
		goto again;
	}

done:
	return 0;
}
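
/*
 * Usage sketch (illustrative, not part of the original source): drain up to
 * eight already-completed CQEs without blocking. 'ring' is assumed to be an
 * initialized io_uring instance and handle_cqe() a caller-provided handler.
 *
 *	struct io_uring_cqe *cqes[8];
 *	unsigned i, n;
 *
 *	n = io_uring_peek_batch_cqe(ring, cqes, 8);
 *	for (i = 0; i < n; i++)
 *		handle_cqe(cqes[i]);
 *	io_uring_cq_advance(ring, n);
 */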

/*
 * Sync internal state with kernel ring state on the SQ side. Returns the
 * number of pending items in the SQ ring, for the shared ring.
 */
static int __io_uring_flush_sq(struct io_uring *ring)
{
	struct io_uring_sq *sq = &ring->sq;
	const unsigned mask = *sq->kring_mask;
	unsigned ktail, to_submit;

	if (sq->sqe_head == sq->sqe_tail) {
		ktail = *sq->ktail;
		goto out;
	}

	/*
	 * Fill in sqes that we have queued up, adding them to the kernel ring
	 */
	ktail = *sq->ktail;
	to_submit = sq->sqe_tail - sq->sqe_head;
	while (to_submit--) {
		sq->array[ktail & mask] = sq->sqe_head & mask;
		ktail++;
		sq->sqe_head++;
	}

	/*
	 * Ensure that the kernel sees the SQE updates before it sees the tail
	 * update.
	 */
	io_uring_smp_store_release(sq->ktail, ktail);
out:
	return ktail - *sq->khead;
}

/*
 * Like io_uring_wait_cqe(), except it accepts a timeout value as well. Note
 * that an sqe is used internally to handle the timeout. Applications using
 * this function must never set sqe->user_data to LIBURING_UDATA_TIMEOUT!
 *
 * If 'ts' is specified, the application need not call io_uring_submit() before
 * calling this function, as we will do that on its behalf. From this it also
 * follows that this function isn't safe to use for applications that split SQ
 * and CQ handling between two threads and expect that to work without
 * synchronization, as this function manipulates both the SQ and CQ side.
 */
int io_uring_wait_cqes(struct io_uring *ring, struct io_uring_cqe **cqe_ptr,
		       unsigned wait_nr, struct __kernel_timespec *ts,
		       sigset_t *sigmask)
{
	unsigned to_submit = 0;

	if (ts) {
		struct io_uring_sqe *sqe;
		int ret;

		/*
		 * If the SQ ring is full, we may need to submit IO first
		 */
		sqe = io_uring_get_sqe(ring);
		if (!sqe) {
			ret = io_uring_submit(ring);
			if (ret < 0)
				return ret;
			sqe = io_uring_get_sqe(ring);
			if (!sqe)
				return -EAGAIN;
		}
		io_uring_prep_timeout(sqe, ts, wait_nr, 0);
		sqe->user_data = LIBURING_UDATA_TIMEOUT;
		to_submit = __io_uring_flush_sq(ring);
	}

	return __io_uring_get_cqe(ring, cqe_ptr, to_submit, wait_nr, sigmask);
}
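
/*
 * Usage sketch (illustrative, not part of the original source): wait up to
 * one second for a completion. A timeout surfaces as -ETIME; 'ring' is
 * assumed to be an initialized io_uring instance and handle_cqe() and
 * handle_error() caller-provided placeholders.
 *
 *	struct __kernel_timespec ts = { .tv_sec = 1, .tv_nsec = 0 };
 *	struct io_uring_cqe *cqe;
 *	int ret;
 *
 *	ret = io_uring_wait_cqes(ring, &cqe, 1, &ts, NULL);
 *	if (ret == 0) {
 *		handle_cqe(cqe);
 *		io_uring_cqe_seen(ring, cqe);
 *	} else if (ret != -ETIME) {
 *		handle_error(ret);
 *	}
 */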

/*
 * See io_uring_wait_cqes() - this function is the same, it just always uses
 * '1' as the wait_nr.
 */
int io_uring_wait_cqe_timeout(struct io_uring *ring,
			      struct io_uring_cqe **cqe_ptr,
			      struct __kernel_timespec *ts)
{
	return io_uring_wait_cqes(ring, cqe_ptr, 1, ts, NULL);
}

/*
 * Submit sqes acquired from io_uring_get_sqe() to the kernel.
 *
 * Returns the number of sqes submitted.
 */
static int __io_uring_submit(struct io_uring *ring, unsigned submitted,
			     unsigned wait_nr)
{
	unsigned flags;
	int ret;

	flags = 0;
	if (sq_ring_needs_enter(ring, submitted, &flags) || wait_nr) {
		if (wait_nr || (ring->flags & IORING_SETUP_IOPOLL))
			flags |= IORING_ENTER_GETEVENTS;

		ret = __sys_io_uring_enter(ring->ring_fd, submitted, wait_nr,
					   flags, NULL);
		if (ret < 0)
			return -errno;
	} else
		ret = submitted;

	return ret;
}

static int __io_uring_submit_and_wait(struct io_uring *ring, unsigned wait_nr)
{
	return __io_uring_submit(ring, __io_uring_flush_sq(ring), wait_nr);
}

/*
 * Submit sqes acquired from io_uring_get_sqe() to the kernel.
 *
 * Returns the number of sqes submitted.
 */
int io_uring_submit(struct io_uring *ring)
{
	return __io_uring_submit_and_wait(ring, 0);
}

/*
 * Like io_uring_submit(), but allows waiting for events as well.
 *
 * Returns the number of sqes submitted.
 */
int io_uring_submit_and_wait(struct io_uring *ring, unsigned wait_nr)
{
	return __io_uring_submit_and_wait(ring, wait_nr);
}
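
/*
 * Usage sketch (illustrative, not part of the original source): after
 * queueing work with io_uring_get_sqe() and the io_uring_prep_*() helpers,
 * submit it and wait for at least one completion in a single
 * io_uring_enter(2) call. 'ring' is assumed to be an initialized io_uring
 * instance.
 *
 *	int ret = io_uring_submit_and_wait(ring, 1);
 *	if (ret < 0)
 *		return ret;
 *
 * On success the return value is the number of sqes submitted, as with
 * io_uring_submit().
 */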

/*
 * Grab the next vacant sqe slot, given the current kernel SQ head. Returns
 * NULL if the SQ ring is full.
 */
static inline struct io_uring_sqe *
__io_uring_get_sqe(struct io_uring_sq *sq, unsigned int __head)
{
	unsigned int __next = (sq)->sqe_tail + 1;
	struct io_uring_sqe *__sqe = NULL;

	if (__next - __head <= *(sq)->kring_entries) {
		__sqe = &(sq)->sqes[(sq)->sqe_tail & *(sq)->kring_mask];
		(sq)->sqe_tail = __next;
	}
	return __sqe;
}

/*
 * Return an sqe to fill. Application must later call io_uring_submit()
 * when it's ready to tell the kernel about it. The caller may call this
 * function multiple times before calling io_uring_submit().
 *
 * Returns a vacant sqe, or NULL if we're full.
 */
struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring)
{
	struct io_uring_sq *sq = &ring->sq;

	return __io_uring_get_sqe(sq, io_uring_smp_load_acquire(sq->khead));
}
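
/*
 * End-to-end usage sketch (illustrative, not part of the original source):
 * queue a single read, submit it, and reap its completion. 'ring', 'fd',
 * 'buf' and 'len' are assumed to be set up by the caller; the prep helper
 * comes from liburing.h.
 *
 *	struct io_uring_sqe *sqe;
 *	struct io_uring_cqe *cqe;
 *	int ret;
 *
 *	sqe = io_uring_get_sqe(ring);
 *	if (!sqe)
 *		return -EBUSY;
 *	io_uring_prep_read(sqe, fd, buf, len, 0);
 *	sqe->user_data = 42;
 *
 *	ret = io_uring_submit(ring);
 *	if (ret < 0)
 *		return ret;
 *
 *	ret = io_uring_wait_cqe(ring, &cqe);
 *	if (ret < 0)
 *		return ret;
 *	ret = cqe->res;
 *	io_uring_cqe_seen(ring, cqe);
 *	return ret;
 */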