#include <aio.h>
#include <pthread.h>
#include <semaphore.h>
#include <limits.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/auxv.h>
#include "syscall.h"
#include "atomic.h"
#include "pthread_impl.h"
12 | #include "aio_impl.h" |
13 | ||
14 | #define malloc __libc_malloc | |
15 | #define calloc __libc_calloc | |
16 | #define realloc __libc_realloc | |
17 | #define free __libc_free | |

/* The following is a threads-based implementation of AIO with minimal
 * dependence on implementation details. Most synchronization is
 * performed with pthread primitives, but atomics and futex operations
 * are used for notification in a couple of places where the pthread
 * primitives would be inefficient or impractical.
 *
 * For each fd with outstanding aio operations, an aio_queue structure
 * is maintained. These are reference-counted and destroyed by the last
 * aio worker thread to exit. Accessing any member of the aio_queue
 * structure requires a lock on the aio_queue. Adding and removing aio
 * queues themselves requires a write lock on the global map object,
 * a 4-level table mapping file descriptor numbers to aio queues. A
 * read lock on the map is used to obtain locks on existing queues by
 * excluding destruction of the queue by a different thread while it is
 * being locked.
 *
 * Each aio queue has a list of active threads/operations. Presently there
 * is a one-to-one relationship between threads and operations. The only
 * members of the aio_thread structure which are accessed by other threads
 * are the linked list pointers, op (which is immutable), running (which
 * is updated atomically), and err (which is synchronized via running),
 * so no locking is necessary. Most of the other members are used for
 * sharing data between the main flow of execution and the cancellation
 * cleanup handler.
 *
 * Taking any aio locks requires having all signals blocked. This is
 * necessary because aio_cancel is needed by close, and close is required
 * to be async-signal safe. All aio worker threads run with all signals
 * blocked permanently.
 */

struct aio_thread {
	pthread_t td;
	struct aiocb *cb;
	struct aio_thread *next, *prev;
	struct aio_queue *q;
	volatile int running;
	int err, op;
	ssize_t ret;
};

struct aio_queue {
	int fd, seekable, append, ref, init;
	pthread_mutex_t lock;
	pthread_cond_t cond;
	struct aio_thread *head;
};

struct aio_args {
	struct aiocb *cb;
	struct aio_queue *q;
	int op;
	sem_t sem;
};

static pthread_rwlock_t maplock = PTHREAD_RWLOCK_INITIALIZER;
static struct aio_queue *****map;
static volatile int aio_fd_cnt;
volatile int __aio_fut;

static size_t io_thread_stack_size;

#define MAX(a,b) ((a)>(b) ? (a) : (b))

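/* Find the queue for fd, and if need is nonzero, create it if missing.
 * On success the queue is returned with its lock held. The fd is split
 * into four indices for the 128x256x256x256 table: for example, fd
 * 0x01020304 resolves to map[1][2][3][4]. */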
static struct aio_queue *__aio_get_queue(int fd, int need)
{
	if (fd < 0) {
		errno = EBADF;
		return 0;
	}
	int a=fd>>24;
	unsigned char b=fd>>16, c=fd>>8, d=fd;
	struct aio_queue *q = 0;
	pthread_rwlock_rdlock(&maplock);
	if ((!map || !map[a] || !map[a][b] || !map[a][b][c] || !(q=map[a][b][c][d])) && need) {
		pthread_rwlock_unlock(&maplock);
		if (fcntl(fd, F_GETFD) < 0) return 0;
		pthread_rwlock_wrlock(&maplock);
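		/* Size worker thread stacks only once: at least MINSIGSTKSZ
		 * plus some headroom, or the kernel-reported AT_MINSIGSTKSZ
		 * plus headroom if that is larger. */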
		if (!io_thread_stack_size) {
			unsigned long val = __getauxval(AT_MINSIGSTKSZ);
			io_thread_stack_size = MAX(MINSIGSTKSZ+2048, val+512);
		}
		if (!map) map = calloc(sizeof *map, (-1U/2+1)>>24);
		if (!map) goto out;
		if (!map[a]) map[a] = calloc(sizeof **map, 256);
		if (!map[a]) goto out;
		if (!map[a][b]) map[a][b] = calloc(sizeof ***map, 256);
		if (!map[a][b]) goto out;
		if (!map[a][b][c]) map[a][b][c] = calloc(sizeof ****map, 256);
		if (!map[a][b][c]) goto out;
		if (!(q = map[a][b][c][d])) {
			map[a][b][c][d] = q = calloc(sizeof *****map, 1);
			if (q) {
				q->fd = fd;
				pthread_mutex_init(&q->lock, 0);
				pthread_cond_init(&q->cond, 0);
				a_inc(&aio_fd_cnt);
			}
		}
	}
	if (q) pthread_mutex_lock(&q->lock);
out:
	pthread_rwlock_unlock(&maplock);
	return q;
}

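/* Drop one reference to q, destroying it when the last reference is
 * released. Called with q->lock held; the lock is always released. */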
static void __aio_unref_queue(struct aio_queue *q)
{
	if (q->ref > 1) {
		q->ref--;
		pthread_mutex_unlock(&q->lock);
		return;
	}

	/* This is potentially the last reference, but a new reference
	 * may arrive since we cannot free the queue object without first
	 * taking the maplock, which requires releasing the queue lock. */
	pthread_mutex_unlock(&q->lock);
	pthread_rwlock_wrlock(&maplock);
	pthread_mutex_lock(&q->lock);
	if (q->ref == 1) {
		int fd=q->fd;
		int a=fd>>24;
		unsigned char b=fd>>16, c=fd>>8, d=fd;
		map[a][b][c][d] = 0;
		a_dec(&aio_fd_cnt);
		pthread_rwlock_unlock(&maplock);
		pthread_mutex_unlock(&q->lock);
		free(q);
	} else {
		q->ref--;
		pthread_rwlock_unlock(&maplock);
		pthread_mutex_unlock(&q->lock);
	}
}

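/* Cancellation cleanup handler for aio worker threads; also runs on
 * normal completion via pthread_cleanup_pop(1). Publishes the result,
 * wakes all classes of waiters, unlinks the operation from its queue,
 * and delivers any requested notification. */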
static void cleanup(void *ctx)
{
	struct aio_thread *at = ctx;
	struct aio_queue *q = at->q;
	struct aiocb *cb = at->cb;
	struct sigevent sev = cb->aio_sigevent;

	/* There are four potential types of waiters we could need to wake:
	 * 1. Callers of aio_cancel/close.
	 * 2. Callers of aio_suspend with a single aiocb.
	 * 3. Callers of aio_suspend with a list.
	 * 4. AIO worker threads waiting for sequenced operations.
	 * Types 1-3 are notified via atomics/futexes, mainly for AS-safety
	 * considerations. Type 4 is notified later via a cond var. */

	cb->__ret = at->ret;
	if (a_swap(&at->running, 0) < 0)
		__wake(&at->running, -1, 1);
	if (a_swap(&cb->__err, at->err) != EINPROGRESS)
		__wake(&cb->__err, -1, 1);
	if (a_swap(&__aio_fut, 0))
		__wake(&__aio_fut, -1, 1);

	pthread_mutex_lock(&q->lock);

	if (at->next) at->next->prev = at->prev;
	if (at->prev) at->prev->next = at->next;
	else q->head = at->next;

	/* Signal aio worker threads waiting for sequenced operations. */
	pthread_cond_broadcast(&q->cond);

	__aio_unref_queue(q);

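	/* Queue the requested completion signal with SI_ASYNCIO origin. */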
	if (sev.sigev_notify == SIGEV_SIGNAL) {
		siginfo_t si = {
			.si_signo = sev.sigev_signo,
			.si_value = sev.sigev_value,
			.si_code = SI_ASYNCIO,
			.si_pid = getpid(),
			.si_uid = getuid()
		};
		__syscall(SYS_rt_sigqueueinfo, si.si_pid, si.si_signo, &si);
	}
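	/* Clear this thread's cancellation flag before invoking the
	 * notification callback, since the flag is still set when the
	 * operation was terminated by pthread_cancel. */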
	if (sev.sigev_notify == SIGEV_THREAD) {
		a_store(&__pthread_self()->cancel, 0);
		sev.sigev_notify_function(sev.sigev_value);
	}
}

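/* Body of each aio worker thread: copy the request out of the submit
 * arguments, register the operation on its queue, wait for any ordering
 * constraints, perform the I/O, and report the result via cleanup(). */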
static void *io_thread_func(void *ctx)
{
	struct aio_thread at, *p;

	struct aio_args *args = ctx;
	struct aiocb *cb = args->cb;
	int fd = cb->aio_fildes;
	int op = args->op;
	void *buf = (void *)cb->aio_buf;
	size_t len = cb->aio_nbytes;
	off_t off = cb->aio_offset;

	struct aio_queue *q = args->q;
	ssize_t ret;

	pthread_mutex_lock(&q->lock);
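	/* Everything needed from args, which lives on the submitting
	 * thread's stack, has been copied out, and the queue lock is held,
	 * so submit can safely return: a subsequent aio_cancel will block
	 * on the lock until this operation is linked into the list. */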
	sem_post(&args->sem);

	at.op = op;
	at.running = 1;
	at.ret = -1;
	at.err = ECANCELED;
	at.q = q;
	at.td = __pthread_self();
	at.cb = cb;
	at.prev = 0;
	if ((at.next = q->head)) at.next->prev = &at;
	q->head = &at;

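	/* Classify the fd once per queue: reads on non-seekable fds use
	 * read() instead of pread(), and writes on append-mode or
	 * non-seekable fds use write() and must be sequenced. */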
	if (!q->init) {
		int seekable = lseek(fd, 0, SEEK_CUR) >= 0;
		q->seekable = seekable;
		q->append = !seekable || (fcntl(fd, F_GETFL) & O_APPEND);
		q->init = 1;
	}

	pthread_cleanup_push(cleanup, &at);

	/* Wait for sequenced operations. */
	if (op!=LIO_READ && (op!=LIO_WRITE || q->append)) {
		for (;;) {
			for (p=at.next; p && p->op!=LIO_WRITE; p=p->next);
			if (!p) break;
			pthread_cond_wait(&q->cond, &q->lock);
		}
	}

	pthread_mutex_unlock(&q->lock);

	switch (op) {
	case LIO_WRITE:
		ret = q->append ? write(fd, buf, len) : pwrite(fd, buf, len, off);
		break;
	case LIO_READ:
		ret = !q->seekable ? read(fd, buf, len) : pread(fd, buf, len, off);
		break;
	case O_SYNC:
		ret = fsync(fd);
		break;
	case O_DSYNC:
		ret = fdatasync(fd);
		break;
	}
	at.ret = ret;
	at.err = ret<0 ? errno : 0;

	pthread_cleanup_pop(1);

	return 0;
}

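/* Common submission path for aio_read, aio_write, and aio_fsync:
 * take a reference on the fd's queue, then spawn a detached worker
 * thread with all signals blocked and wait for it to register the
 * operation before returning. */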
static int submit(struct aiocb *cb, int op)
{
	int ret = 0;
	pthread_attr_t a;
	sigset_t allmask, origmask;
	pthread_t td;
	struct aio_queue *q = __aio_get_queue(cb->aio_fildes, 1);
	struct aio_args args = { .cb = cb, .op = op, .q = q };
	sem_init(&args.sem, 0, 0);

	if (!q) {
		if (errno != EBADF) errno = EAGAIN;
		cb->__ret = -1;
		cb->__err = errno;
		return -1;
	}
	q->ref++;
	pthread_mutex_unlock(&q->lock);

	if (cb->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		if (cb->aio_sigevent.sigev_notify_attributes)
			a = *cb->aio_sigevent.sigev_notify_attributes;
		else
			pthread_attr_init(&a);
	} else {
		pthread_attr_init(&a);
		pthread_attr_setstacksize(&a, io_thread_stack_size);
		pthread_attr_setguardsize(&a, 0);
	}
	pthread_attr_setdetachstate(&a, PTHREAD_CREATE_DETACHED);
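	/* Block all signals so the worker inherits a fully blocked mask,
	 * which it keeps permanently, as required for taking aio locks. */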
	sigfillset(&allmask);
	pthread_sigmask(SIG_BLOCK, &allmask, &origmask);
	cb->__err = EINPROGRESS;
	if (pthread_create(&td, &a, io_thread_func, &args)) {
		pthread_mutex_lock(&q->lock);
		__aio_unref_queue(q);
		cb->__err = errno = EAGAIN;
		cb->__ret = ret = -1;
	}
	pthread_sigmask(SIG_SETMASK, &origmask, 0);

	if (!ret) {
		while (sem_wait(&args.sem));
	}

	return ret;
}

int aio_read(struct aiocb *cb)
{
	return submit(cb, LIO_READ);
}

int aio_write(struct aiocb *cb)
{
	return submit(cb, LIO_WRITE);
}

int aio_fsync(int op, struct aiocb *cb)
{
	if (op != O_SYNC && op != O_DSYNC) {
		errno = EINVAL;
		return -1;
	}
	return submit(cb, op);
}

ssize_t aio_return(struct aiocb *cb)
{
	return cb->__ret;
}

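/* The barrier ensures the caller observes the worker's final stores to
 * __err and __ret once completion has been reported. */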
int aio_error(const struct aiocb *cb)
{
	a_barrier();
	return cb->__err & 0x7fffffff;
}

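/* Cancel all outstanding operations on fd, or only the one matching cb
 * if cb is non-null. Runs with all signals blocked so that it remains
 * async-signal-safe for use by close(). */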
int aio_cancel(int fd, struct aiocb *cb)
{
	sigset_t allmask, origmask;
	int ret = AIO_ALLDONE;
	struct aio_thread *p;
	struct aio_queue *q;

	/* Unspecified behavior case. Report an error. */
	if (cb && fd != cb->aio_fildes) {
		errno = EINVAL;
		return -1;
	}

	sigfillset(&allmask);
	pthread_sigmask(SIG_BLOCK, &allmask, &origmask);

	errno = ENOENT;
	if (!(q = __aio_get_queue(fd, 0))) {
		if (errno == EBADF) ret = -1;
		goto done;
	}

	for (p = q->head; p; p = p->next) {
		if (cb && cb != p->cb) continue;
		/* Transition target from running to running-with-waiters */
		if (a_cas(&p->running, 1, -1)) {
			pthread_cancel(p->td);
			__wait(&p->running, 0, -1, 1);
			if (p->err == ECANCELED) ret = AIO_CANCELED;
		}
	}

	pthread_mutex_unlock(&q->lock);
done:
	pthread_sigmask(SIG_SETMASK, &origmask, 0);
	return ret;
}

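/* Hook called by close() to cancel any outstanding operations on fd
 * before the descriptor is closed. The count check makes the common
 * case, where aio has never been used, cheap. */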
int __aio_close(int fd)
{
	a_barrier();
	if (aio_fd_cnt) aio_cancel(fd, 0);
	return fd;
}

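/* Fork handler: who<0 is the pre-fork hook (lock the map), who==0 runs
 * in the parent (unlock), and who>0 runs in the child, where all map
 * entries are discarded since no aio worker threads exist after fork. */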
void __aio_atfork(int who)
{
	if (who<0) {
		pthread_rwlock_rdlock(&maplock);
		return;
	}
	if (who>0 && map) for (int a=0; a<(-1U/2+1)>>24; a++)
		if (map[a]) for (int b=0; b<256; b++)
			if (map[a][b]) for (int c=0; c<256; c++)
				if (map[a][b][c]) for (int d=0; d<256; d++)
					map[a][b][c][d] = 0;
	pthread_rwlock_unlock(&maplock);
}

weak_alias(aio_cancel, aio_cancel64);
weak_alias(aio_error, aio_error64);
weak_alias(aio_fsync, aio_fsync64);
weak_alias(aio_read, aio_read64);
weak_alias(aio_write, aio_write64);
weak_alias(aio_return, aio_return64);