// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "ceph_io_uring.h"

#if defined(HAVE_LIBURING) && defined(__x86_64__)

#include "liburing.h"
#include <sys/epoll.h>

/* Options */

static bool hipri = false;     /* use IO polling */
static bool sq_thread = false; /* use kernel submission/poller thread */

struct ioring_data {
  struct io_uring io_uring;
  pthread_mutex_t cq_mutex;
  pthread_mutex_t sq_mutex;
  int epoll_fd = -1;
  std::map<int, int> fixed_fds_map;
};

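/* Reap up to @max completed CQEs from the completion ring without
   blocking; each CQE's user_data points back at the originating aio_t,
   whose rval is set from cqe->res.  Returns the number reaped. */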
static int ioring_get_cqe(struct ioring_data *d, unsigned int max,
			  struct aio_t **paio)
{
  struct io_uring *ring = &d->io_uring;
  struct io_uring_cqe *cqe;

  unsigned nr = 0;
  unsigned head;
  io_uring_for_each_cqe(ring, head, cqe) {
    struct aio_t *io = (struct aio_t *)(uintptr_t) io_uring_cqe_get_data(cqe);
    io->rval = cqe->res;

    paio[nr++] = io;

    if (nr == max)
      break;
  }
  io_uring_cq_advance(ring, nr);

  return nr;
}

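/* Translate a real fd into its registered (fixed) slot, or -1 if the
   fd was not registered with IORING_REGISTER_FILES at init time. */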
static int find_fixed_fd(struct ioring_data *d, int real_fd)
{
  auto it = d->fixed_fds_map.find(real_fd);
  if (it == d->fixed_fds_map.end())
    return -1;

  return it->second;
}

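/* Fill one SQE from an aio_t.  Only preadv/pwritev are expected here,
   the target fd must already have a fixed slot, and IOSQE_FIXED_FILE
   tells the kernel to resolve the fd through the registered-files
   table instead of the process fd table. */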
static void init_sqe(struct ioring_data *d, struct io_uring_sqe *sqe,
		     struct aio_t *io)
{
  int fixed_fd = find_fixed_fd(d, io->fd);

  ceph_assert(fixed_fd != -1);

  if (io->iocb.aio_lio_opcode == IO_CMD_PWRITEV)
    io_uring_prep_writev(sqe, fixed_fd, &io->iov[0],
			 io->iov.size(), io->offset);
  else if (io->iocb.aio_lio_opcode == IO_CMD_PREADV)
    io_uring_prep_readv(sqe, fixed_fd, &io->iov[0],
			io->iov.size(), io->offset);
  else
    ceph_assert(0);

  io_uring_sqe_set_data(sqe, io);
  io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
}

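/* Queue as many aios from [beg, end) as the submission ring has room
   for, then hand them to the kernel in one io_uring_submit() call.
   Returns 0 when the SQ ring was already full (caller should reap
   completions first), otherwise whatever io_uring_submit() returns
   (the number of SQEs submitted, or -errno). */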
static int ioring_queue(struct ioring_data *d, void *priv,
			list<aio_t>::iterator beg, list<aio_t>::iterator end)
{
  struct io_uring *ring = &d->io_uring;
  struct aio_t *io = nullptr;

  ceph_assert(beg != end);

  do {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe)
      break;

    io = &*beg;
    io->priv = priv;

    init_sqe(d, sqe, io);

  } while (++beg != end);

  if (!io)
    /* Queue is full, go and reap something first */
    return 0;

  return io_uring_submit(ring);
}

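/* Record the real fd -> fixed slot mapping; slots are assigned in the
   same order the fds were passed to IORING_REGISTER_FILES. */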
static void build_fixed_fds_map(struct ioring_data *d,
				std::vector<int> &fds)
{
  int fixed_fd = 0;
  for (int real_fd : fds) {
    d->fixed_fds_map[real_fd] = fixed_fd++;
  }
}

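/* Construction only records the depth and allocates state; the ring
   itself is set up in init(), which can fail and report an error. */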
ioring_queue_t::ioring_queue_t(unsigned iodepth_) :
  d(make_unique<ioring_data>()),
  iodepth(iodepth_)
{
}

ioring_queue_t::~ioring_queue_t()
{
}

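/* Bring the queue up: create the ring, register @fds as fixed files,
   and add the ring fd to a private epoll instance so that
   get_next_completed() can block with a timeout (the ring fd becomes
   readable when completions arrive). */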
int ioring_queue_t::init(std::vector<int> &fds)
{
  unsigned flags = 0;

  pthread_mutex_init(&d->cq_mutex, NULL);
  pthread_mutex_init(&d->sq_mutex, NULL);

  if (hipri)
    flags |= IORING_SETUP_IOPOLL;
  if (sq_thread)
    flags |= IORING_SETUP_SQPOLL;

  int ret = io_uring_queue_init(iodepth, &d->io_uring, flags);
  if (ret < 0)
    return ret;

  ret = io_uring_register(d->io_uring.ring_fd, IORING_REGISTER_FILES,
			  &fds[0], fds.size());
  if (ret < 0) {
    ret = -errno;
    goto close_ring_fd;
  }

  build_fixed_fds_map(d.get(), fds);

  d->epoll_fd = epoll_create1(0);
  if (d->epoll_fd < 0) {
    ret = -errno;
    goto close_ring_fd;
  }

  struct epoll_event ev;
  ev.events = EPOLLIN;
  ret = epoll_ctl(d->epoll_fd, EPOLL_CTL_ADD, d->io_uring.ring_fd, &ev);
  if (ret < 0) {
    ret = -errno;
    goto close_epoll_fd;
  }

  return 0;

close_epoll_fd:
  close(d->epoll_fd);
close_ring_fd:
  io_uring_queue_exit(&d->io_uring);

  return ret;
}

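/* Tear down in the reverse order of init(); only valid after a
   successful init(). */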
void ioring_queue_t::shutdown()
{
  d->fixed_fds_map.clear();
  close(d->epoll_fd);
  d->epoll_fd = -1;
  io_uring_queue_exit(&d->io_uring);
}

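/* Submit a batch of aios under the SQ mutex.  @aios_size and @retries
   belong to the generic aio interface and are unused here. */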
int ioring_queue_t::submit_batch(aio_iter beg, aio_iter end,
				 uint16_t aios_size, void *priv,
				 int *retries)
{
  (void)aios_size;
  (void)retries;

  pthread_mutex_lock(&d->sq_mutex);
  int rc = ioring_queue(d.get(), priv, beg, end);
  pthread_mutex_unlock(&d->sq_mutex);

  return rc;
}

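/* Reap up to @max completions into @paio.  If none are pending, block
   on the epoll fd for up to @timeout_ms and retry once the ring fd
   becomes readable.  Returns the number of completions reaped, 0 on
   timeout, or -errno if epoll_wait() fails. */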
int ioring_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max)
{
get_cqe:
  pthread_mutex_lock(&d->cq_mutex);
  int events = ioring_get_cqe(d.get(), max, paio);
  pthread_mutex_unlock(&d->cq_mutex);

  if (events == 0) {
    struct epoll_event ev;
    int ret = epoll_wait(d->epoll_fd, &ev, 1, timeout_ms);
    if (ret < 0)
      events = -errno;
    else if (ret > 0)
      /* Time to reap */
      goto get_cqe;
  }

  return events;
}

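/* Probe for io_uring support by attempting a minimal ring setup; on
   kernels without the io_uring_setup(2) syscall this fails (ENOSYS). */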
bool ioring_queue_t::supported()
{
  struct io_uring_params p;

  memset(&p, 0, sizeof(p));
  int fd = io_uring_setup(16, &p);
  if (fd < 0)
    return false;

  close(fd);

  return true;
}

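// A minimal usage sketch of this queue, for orientation only; the
// names below (data_fd, batch, priv) are hypothetical placeholders,
// not actual callers:
//
//   std::vector<int> fds = { data_fd };   // fds to register up front
//   ioring_queue_t ioq(128);              // queue depth
//   if (ioring_queue_t::supported() && ioq.init(fds) == 0) {
//     // fill a list<aio_t> 'batch' whose entries target fds in 'fds',
//     // then:
//     ioq.submit_batch(batch.begin(), batch.end(), batch.size(), priv,
//                      nullptr);
//     aio_t *done[16];
//     int n = ioq.get_next_completed(1000 /* ms */, done, 16);
//     ioq.shutdown();
//   }
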
#else // #if defined(HAVE_LIBURING) && defined(__x86_64__)

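/* Fallback stubs: without liburing the queue must never be
   instantiated -- supported() returns false, everything else aborts. */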
struct ioring_data {};

ioring_queue_t::ioring_queue_t(unsigned iodepth_)
{
  ceph_assert(0);
}

ioring_queue_t::~ioring_queue_t()
{
  ceph_assert(0);
}

int ioring_queue_t::init(std::vector<int> &fds)
{
  ceph_assert(0);
}

void ioring_queue_t::shutdown()
{
  ceph_assert(0);
}

int ioring_queue_t::submit_batch(aio_iter beg, aio_iter end,
				 uint16_t aios_size, void *priv,
				 int *retries)
{
  ceph_assert(0);
}

int ioring_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max)
{
  ceph_assert(0);
}

bool ioring_queue_t::supported()
{
  return false;
}

#endif // #if defined(HAVE_LIBURING) && defined(__x86_64__)