]>
git.proxmox.com Git - mirror_ovs.git/blob - lib/async-append-aio.c
1 /* Copyright (c) 2013 Nicira, Inc.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License. */
18 /* This implementation of the async-append.h interface uses the POSIX
19 * asynchronous I/O interface. */
21 #include "async-append.h"
29 #include "ovs-thread.h"
/* Maximum number of bytes of buffered data.
 *
 * This is the size of the buffer handed to byteq_init() and the basis for
 * MAX_CBS. */
enum { BUFFER_SIZE = 65536 };
35 /* Maximum number of aiocbs to use.
37 * aiocbs are big (144 bytes with glibc 2.11 on i386) so we try to allow for a
38 * reasonable number by basing the number we allocate on the amount of buffer
40 enum { MAX_CBS
= ROUND_DOWN_POW2(BUFFER_SIZE
/ sizeof(struct aiocb
)) };
41 BUILD_ASSERT_DECL(IS_POW2(MAX_CBS
));
/* Ring positions for the aiocbs array.  The code visible in this file indexes
 * aiocbs[] with each counter masked by (MAX_CBS - 1), and treats
 * "aiocb_head - aiocb_tail >= MAX_CBS" as the "no free aiocb" condition.
 * NOTE(review): the enclosing struct declaration is not visible in this
 * listing. */
47 unsigned int aiocb_head
, aiocb_tail
;
/* Allocates a new async_append and the resources it owns: an array of MAX_CBS
 * aiocbs, an empty aiocb ring (head == tail == 0), and a BUFFER_SIZE-byte
 * buffer that is handed to byteq_init().
 *
 * NOTE(review): this listing has dropped lines (the fused line numbers skip),
 * so the return type, the braces, any use of 'fd', and the return statement
 * are not visible here. */
54 async_append_create(int fd
)
56 struct async_append
*ap
;
58 ap
= xmalloc(sizeof *ap
);
/* One control block per potential in-flight request, MAX_CBS in total. */
60 ap
->aiocbs
= xmalloc(MAX_CBS
* sizeof *ap
->aiocbs
);
/* Start with an empty ring. */
61 ap
->aiocb_head
= ap
->aiocb_tail
= 0;
/* The byteq uses ap->buffer as its BUFFER_SIZE-byte backing store. */
62 ap
->buffer
= xmalloc(BUFFER_SIZE
);
63 byteq_init(&ap
->byteq
, ap
->buffer
, BUFFER_SIZE
);
/* Calls async_append_flush() on 'ap'.
 *
 * NOTE(review): only this one statement of the body is visible; the fused
 * line numbers show that the surrounding lines were dropped from this
 * listing. */
69 async_append_destroy(struct async_append
*ap
)
72 async_append_flush(ap
);
/* Returns nonzero if 'ap' cannot take more data right now: either the
 * distance between the aiocb ring's head and tail has reached MAX_CBS, or
 * byteq_is_full() reports that the byte queue is full.
 *
 * aiocb_head and aiocb_tail are declared unsigned int in this file, so the
 * subtraction stays correct when the counters wrap around. */
80 async_append_is_full(const struct async_append
*ap
)
82 return (ap
->aiocb_head
- ap
->aiocb_tail
>= MAX_CBS
83 || byteq_is_full(&ap
->byteq
));
/* Returns the result of byteq_is_empty() on the byte queue of 'ap'.  The
 * loops in async_append_wait() and async_append_flush() use this as their
 * termination test. */
87 async_append_is_empty(const struct async_append
*ap
)
89 return byteq_is_empty(&ap
->byteq
);
/* Processes the request at the tail of the aiocb ring while 'ap' still holds
 * buffered data.
 *
 * The visible statements: look up the tail aiocb, query it with aio_error(),
 * and if it is still EINPROGRESS block in aio_suspend() on that one request.
 * The code also calls aio_return() on the aiocb and advances the byteq tail
 * by the request's aio_nbytes.
 *
 * NOTE(review): several lines are missing from this listing (the fused line
 * numbers skip 94-96, 100, 103-105, 107, 109 and everything after 110), so
 * the branch structure connecting these statements, and any update of
 * aiocb_tail, cannot be confirmed from here. */
93 async_append_wait(struct async_append
*ap
)
97 while (!async_append_is_empty(ap
)) {
/* Masking with (MAX_CBS - 1) maps the tail counter to an array slot. */
98 struct aiocb
*aiocb
= &ap
->aiocbs
[ap
->aiocb_tail
& (MAX_CBS
- 1)];
99 int error
= aio_error(aiocb
);
101 if (error
== EINPROGRESS
) {
/* aio_suspend() takes an array of const aiocb pointers; 'p' is a
 * one-element list. */
102 const struct aiocb
*p
= aiocb
;
/* A NULL timeout means wait without a time limit. */
106 aio_suspend(&p
, 1, NULL
);
/* The value of aio_return() is deliberately discarded via ignore(). */
108 ignore(aio_return(aiocb
));
/* Advance the byteq tail past the bytes this request covered. */
110 byteq_advance_tail(&ap
->byteq
, aiocb
->aio_nbytes
);
/* Submits bytes from 'data_' for asynchronous writing to ap->fd.
 *
 * Visible steps: wait until 'ap' is not full, copy a chunk of the data to the
 * head of the byteq, fill in the aiocb at the head of the ring to describe
 * that chunk, and submit it with aio_write().  If aio_write() fails, the code
 * flushes what is already queued and falls back to a plain write().
 *
 * NOTE(review): this listing has dropped lines (the fused line numbers skip),
 * including the local declarations, the body of the "chunk_size > size"
 * branch, any enclosing loop, and whatever advances 'data', 'size' and
 * aiocb_head.  The comments below describe only the statements that are
 * visible. */
117 async_append_write(struct async_append
*ap
, const void *data_
, size_t size
)
119 const uint8_t *data
= data_
;
/* Let outstanding requests finish until there is a free aiocb and room in
 * the byteq. */
126 while (async_append_is_full(ap
)) {
127 async_append_wait(ap
);
/* The next chunk is staged at the byteq head, limited by its headroom. */
130 chunk
= byteq_head(&ap
->byteq
);
131 chunk_size
= byteq_headroom(&ap
->byteq
);
/* NOTE(review): the body of this branch is not visible; presumably it
 * clamps chunk_size to 'size' -- confirm against the full file. */
132 if (chunk_size
> size
) {
135 memcpy(chunk
, data
, chunk_size
);
/* Take the aiocb slot at the head of the ring and start from all zeros. */
137 aiocb
= &ap
->aiocbs
[ap
->aiocb_head
& (MAX_CBS
- 1)];
138 memset(aiocb
, 0, sizeof *aiocb
);
139 aiocb
->aio_fildes
= ap
->fd
;
/* NOTE(review): offset 0 presumably relies on the fd being in append mode
 * -- confirm with the async-append.h contract. */
140 aiocb
->aio_offset
= 0;
141 aiocb
->aio_buf
= chunk
;
142 aiocb
->aio_nbytes
= chunk_size
;
/* No completion notification is requested; async_append_wait() checks
 * requests with aio_error() instead. */
143 aiocb
->aio_sigevent
.sigev_notify
= SIGEV_NONE
;
/* On submission failure, flush what is already queued and then write
 * 'data' synchronously; the result of write() is ignored. */
144 if (aio_write(aiocb
) == -1) {
145 async_append_flush(ap
);
146 ignore(write(ap
->fd
, data
, size
));
/* Account for the chunk in the byteq now that it has been submitted. */
152 byteq_advance_head(&ap
->byteq
, chunk_size
);
/* Calls async_append_wait() repeatedly until async_append_is_empty() reports
 * that 'ap' holds no more buffered data.
 *
 * NOTE(review): the return type and the closing braces are not visible in
 * this listing. */
158 async_append_flush(struct async_append
*ap
)
160 while (!async_append_is_empty(ap
)) {
161 async_append_wait(ap
);