ceph/src/os/bluestore/io_uring.cc (Ceph 15.2.0 "Octopus", git.proxmox.com mirror of ceph.git)
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "ceph_io_uring.h"

#if defined(HAVE_LIBURING) && defined(__x86_64__)

#include "liburing.h"
#include <sys/epoll.h>
#include <cstring>   /* memset() in supported() */
#include <unistd.h>  /* close() */

/* Options */

static bool hipri = false;     /* use IO polling */
static bool sq_thread = false; /* use kernel submission/poller thread */

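/*
 * Per-queue state: the io_uring instance itself, one mutex per ring
 * side (completions vs. submissions), an epoll fd used to block until
 * the ring fd becomes readable, and a map from real fds to the indices
 * obtained via IORING_REGISTER_FILES.
 */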
struct ioring_data {
  struct io_uring io_uring;
  pthread_mutex_t cq_mutex;
  pthread_mutex_t sq_mutex;
  int epoll_fd = -1;
  std::map<int, int> fixed_fds_map;
};

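/*
 * Drain up to @max completions without blocking.  Each CQE's user data
 * is the aio_t that was attached at submission time; the CQE result is
 * copied into aio_t::rval before the CQ ring is advanced.  Returns the
 * number of completions reaped (possibly 0).  Callers serialize on
 * cq_mutex.
 */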
static int ioring_get_cqe(struct ioring_data *d, unsigned int max,
                          struct aio_t **paio)
{
  struct io_uring *ring = &d->io_uring;
  struct io_uring_cqe *cqe;

  unsigned nr = 0;
  unsigned head;
  io_uring_for_each_cqe(ring, head, cqe) {
    struct aio_t *io = (struct aio_t *)(uintptr_t) io_uring_cqe_get_data(cqe);
    io->rval = cqe->res;

    paio[nr++] = io;

    if (nr == max)
      break;
  }
  io_uring_cq_advance(ring, nr);

  return nr;
}

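/* Translate a real fd into its registered-file index, or -1 if the fd
 * was not part of the set passed to init(). */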
static int find_fixed_fd(struct ioring_data *d, int real_fd)
{
  auto it = d->fixed_fds_map.find(real_fd);
  if (it == d->fixed_fds_map.end())
    return -1;

  return it->second;
}

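/*
 * Prepare one SQE from an aio_t: pick readv/writev based on the opcode
 * recorded in the (libaio-style) iocb, address the target by its fixed
 * index (IOSQE_FIXED_FILE), and stash the aio_t pointer as user data so
 * ioring_get_cqe() can recover it on completion.
 */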
static void init_sqe(struct ioring_data *d, struct io_uring_sqe *sqe,
                     struct aio_t *io)
{
  int fixed_fd = find_fixed_fd(d, io->fd);

  ceph_assert(fixed_fd != -1);

  if (io->iocb.aio_lio_opcode == IO_CMD_PWRITEV)
    io_uring_prep_writev(sqe, fixed_fd, &io->iov[0],
                         io->iov.size(), io->offset);
  else if (io->iocb.aio_lio_opcode == IO_CMD_PREADV)
    io_uring_prep_readv(sqe, fixed_fd, &io->iov[0],
                        io->iov.size(), io->offset);
  else
    ceph_assert(0);

  io_uring_sqe_set_data(sqe, io);
  io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
}

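/*
 * Fill the submission queue from [beg, end) and submit in one go.  If
 * the SQ ring is already full before the first aio is queued, return 0
 * so the caller reaps completions and retries.  Otherwise return the
 * result of io_uring_submit(): the number of SQEs handed to the kernel,
 * or a negative error.  A batch that fills the ring partway through is
 * submitted as-is; the return count tells the caller how far it got.
 */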
static int ioring_queue(struct ioring_data *d, void *priv,
                        list<aio_t>::iterator beg, list<aio_t>::iterator end)
{
  struct io_uring *ring = &d->io_uring;
  struct aio_t *io = nullptr;

  ceph_assert(beg != end);

  do {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe)
      break;

    io = &*beg;
    io->priv = priv;

    init_sqe(d, sqe, io);

  } while (++beg != end);

  if (!io)
    /* Queue is full, go and reap something first */
    return 0;

  return io_uring_submit(ring);
}

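/* Registered files are addressed by their position in the array passed
 * to IORING_REGISTER_FILES, so record real_fd -> index in order. */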
static void build_fixed_fds_map(struct ioring_data *d,
                                std::vector<int> &fds)
{
  int fixed_fd = 0;
  for (int real_fd : fds) {
    d->fixed_fds_map[real_fd] = fixed_fd++;
  }
}

ioring_queue_t::ioring_queue_t(unsigned iodepth_) :
  d(make_unique<ioring_data>()),
  iodepth(iodepth_)
{
}

ioring_queue_t::~ioring_queue_t()
{
}

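/*
 * Bring the queue up: create the ring (optionally with IOPOLL/SQPOLL,
 * per the static options above), register the block-device fds as
 * fixed files, and add the ring fd to a private epoll set so that
 * get_next_completed() can block with a timeout.  On failure, unwind
 * whatever was set up and return a negative errno.
 */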
int ioring_queue_t::init(std::vector<int> &fds)
{
  unsigned flags = 0;

  pthread_mutex_init(&d->cq_mutex, NULL);
  pthread_mutex_init(&d->sq_mutex, NULL);

  if (hipri)
    flags |= IORING_SETUP_IOPOLL;
  if (sq_thread)
    flags |= IORING_SETUP_SQPOLL;

  int ret = io_uring_queue_init(iodepth, &d->io_uring, flags);
  if (ret < 0)
    return ret;

  /* Raw syscall wrapper: returns -1 and sets errno on failure,
   * hence the -errno conversion below. */
  ret = io_uring_register(d->io_uring.ring_fd, IORING_REGISTER_FILES,
                          &fds[0], fds.size());
  if (ret < 0) {
    ret = -errno;
    goto close_ring_fd;
  }

  build_fixed_fds_map(d.get(), fds);

  d->epoll_fd = epoll_create1(0);
  if (d->epoll_fd < 0) {
    ret = -errno;
    goto close_ring_fd;
  }

  struct epoll_event ev;
  ev.events = EPOLLIN;
  ret = epoll_ctl(d->epoll_fd, EPOLL_CTL_ADD, d->io_uring.ring_fd, &ev);
  if (ret < 0) {
    ret = -errno;
    goto close_epoll_fd;
  }

  return 0;

close_epoll_fd:
  close(d->epoll_fd);
close_ring_fd:
  io_uring_queue_exit(&d->io_uring);

  return ret;
}

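/* Tear down in reverse order of init(): drop the fd map, close the
 * epoll fd, and release the ring. */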
void ioring_queue_t::shutdown()
{
  d->fixed_fds_map.clear();
  close(d->epoll_fd);
  d->epoll_fd = -1;
  io_uring_queue_exit(&d->io_uring);
}

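/*
 * Submit a batch of aios under sq_mutex.  aios_size and retries are
 * unused here; they exist for interface parity with the libaio-backed
 * aio_queue_t.
 *
 * Sketch of the expected calling sequence (the in-tree user is the
 * BlueStore KernelDevice backend; the names below are illustrative,
 * not taken from this file):
 *
 *   ioring_queue_t q(iodepth);
 *   q.init(fds);                   // fds: device fds to register
 *   q.submit_batch(beg, end, n, priv, nullptr);
 *   aio_t *done[16];
 *   int r = q.get_next_completed(timeout_ms, done, 16);
 */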
int ioring_queue_t::submit_batch(aio_iter beg, aio_iter end,
                                 uint16_t aios_size, void *priv,
                                 int *retries)
{
  (void)aios_size;
  (void)retries;

  pthread_mutex_lock(&d->sq_mutex);
  int rc = ioring_queue(d.get(), priv, beg, end);
  pthread_mutex_unlock(&d->sq_mutex);

  return rc;
}

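/*
 * Reap completions, blocking for at most timeout_ms.  First try a
 * non-blocking drain of the CQ ring; if it comes up empty, wait on the
 * epoll fd (the ring fd becomes readable when CQEs arrive) and, when
 * woken, loop back and drain again.  Returns the number of aios placed
 * in paio, 0 on timeout, or a negative errno from epoll_wait().
 */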
int ioring_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max)
{
get_cqe:
  pthread_mutex_lock(&d->cq_mutex);
  int events = ioring_get_cqe(d.get(), max, paio);
  pthread_mutex_unlock(&d->cq_mutex);

  if (events == 0) {
    struct epoll_event ev;
    int ret = epoll_wait(d->epoll_fd, &ev, 1, timeout_ms);
    if (ret < 0)
      events = -errno;
    else if (ret > 0)
      /* Time to reap */
      goto get_cqe;
  }

  return events;
}

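/* Runtime probe: try to create a tiny ring with the raw io_uring_setup()
 * wrapper.  On kernels without io_uring support this fails (ENOSYS),
 * and the backend is reported as unavailable. */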
bool ioring_queue_t::supported()
{
  struct io_uring_params p;

  memset(&p, 0, sizeof(p));
  int fd = io_uring_setup(16, &p);
  if (fd < 0)
    return false;

  close(fd);

  return true;
}

#else // #if defined(HAVE_LIBURING) && defined(__x86_64__)

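/* Stub implementation for builds without liburing (or on non-x86_64):
 * supported() reports false, so none of the other entry points should
 * ever be reached, hence the asserts. */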
struct ioring_data {};

ioring_queue_t::ioring_queue_t(unsigned iodepth_)
{
  ceph_assert(0);
}

ioring_queue_t::~ioring_queue_t()
{
  ceph_assert(0);
}

int ioring_queue_t::init(std::vector<int> &fds)
{
  ceph_assert(0);
}

void ioring_queue_t::shutdown()
{
  ceph_assert(0);
}

int ioring_queue_t::submit_batch(aio_iter beg, aio_iter end,
                                 uint16_t aios_size, void *priv,
                                 int *retries)
{
  ceph_assert(0);
}

int ioring_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max)
{
  ceph_assert(0);
}

bool ioring_queue_t::supported()
{
  return false;
}

#endif // #if defined(HAVE_LIBURING) && defined(__x86_64__)