]>
Commit | Line | Data |
---|---|---|
e9ee6d7c | 1 | From c46b20023cd8f9bbc29df0210287b4ec5dfe19a8 Mon Sep 17 00:00:00 2001 |
ca0fe5f5 WB |
2 | From: Wolfgang Bumiller <w.bumiller@proxmox.com> |
3 | Date: Wed, 9 Dec 2015 15:40:00 +0100 | |
9c3bec39 | 4 | Subject: [PATCH 19/47] backup: vma: remove async queue |
ca0fe5f5 WB |
5 | |
6 | --- | |
7 | blockdev.c | 6 ++ | |
8 | vma-writer.c | 179 +++++++++++------------------------------------------------ | |
9 | 2 files changed, 38 insertions(+), 147 deletions(-) | |
10 | ||
11 | diff --git a/blockdev.c b/blockdev.c | |
e9ee6d7c | 12 | index db03f8b..0c1815d 100644 |
ca0fe5f5 WB |
13 | --- a/blockdev.c |
14 | +++ b/blockdev.c | |
1a91ab45 | 15 | @@ -3122,6 +3122,11 @@ static void pvebackup_cancel(void *opaque) |
ca0fe5f5 WB |
16 | error_setg(&backup_state.error, "backup cancelled"); |
17 | } | |
18 | ||
19 | + if (backup_state.vmaw) { | |
20 | + /* make sure vma writer does not block anymore */ | |
21 | + vma_writer_set_error(backup_state.vmaw, "backup cancelled"); | |
22 | + } | |
23 | + | |
24 | /* drain all i/o (awake jobs waiting for aio) */ | |
25 | bdrv_drain_all(); | |
26 | ||
1a91ab45 | 27 | @@ -3134,6 +3139,7 @@ static void pvebackup_cancel(void *opaque) |
ca0fe5f5 WB |
28 | if (job) { |
29 | if (!di->completed) { | |
30 | block_job_cancel_sync(job); | |
31 | + bdrv_drain_all(); /* drain all i/o (awake jobs waiting for aio) */ | |
32 | } | |
33 | } | |
34 | } | |
35 | diff --git a/vma-writer.c b/vma-writer.c | |
1a91ab45 | 36 | index 689e988..ec8da53 100644 |
ca0fe5f5 WB |
37 | --- a/vma-writer.c |
38 | +++ b/vma-writer.c | |
68a30562 | 39 | @@ -28,14 +28,8 @@ |
ca0fe5f5 WB |
40 | do { if (DEBUG_VMA) { printf("vma: " fmt, ## __VA_ARGS__); } } while (0) |
41 | ||
42 | #define WRITE_BUFFERS 5 | |
43 | - | |
44 | -typedef struct VmaAIOCB VmaAIOCB; | |
45 | -struct VmaAIOCB { | |
46 | - unsigned char buffer[VMA_MAX_EXTENT_SIZE]; | |
47 | - VmaWriter *vmaw; | |
48 | - size_t bytes; | |
49 | - Coroutine *co; | |
50 | -}; | |
51 | +#define HEADER_CLUSTERS 8 | |
52 | +#define HEADERBUF_SIZE (VMA_CLUSTER_SIZE*HEADER_CLUSTERS) | |
53 | ||
54 | struct VmaWriter { | |
55 | int fd; | |
68a30562 | 56 | @@ -47,16 +41,14 @@ struct VmaWriter { |
ca0fe5f5 WB |
57 | bool closed; |
58 | ||
59 | /* we always write extents */ | |
60 | - unsigned char outbuf[VMA_MAX_EXTENT_SIZE]; | |
61 | + unsigned char *outbuf; | |
62 | int outbuf_pos; /* in bytes */ | |
63 | int outbuf_count; /* in VMA_BLOCKS */ | |
64 | uint64_t outbuf_block_info[VMA_BLOCKS_PER_EXTENT]; | |
65 | ||
66 | - VmaAIOCB *aiocbs[WRITE_BUFFERS]; | |
67 | - CoQueue wqueue; | |
68 | + unsigned char *headerbuf; | |
69 | ||
70 | GChecksum *md5csum; | |
71 | - CoMutex writer_lock; | |
72 | CoMutex flush_lock; | |
73 | Coroutine *co_writer; | |
74 | ||
68a30562 | 75 | @@ -217,38 +209,39 @@ static void vma_co_continue_write(void *opaque) |
ca0fe5f5 WB |
76 | } |
77 | ||
78 | static ssize_t coroutine_fn | |
79 | -vma_co_write(VmaWriter *vmaw, const void *buf, size_t bytes) | |
80 | +vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes) | |
81 | { | |
82 | - size_t done = 0; | |
83 | - ssize_t ret; | |
84 | + DPRINTF("vma_queue_write enter %zd\n", bytes); | |
85 | ||
86 | - /* atomic writes (we cannot interleave writes) */ | |
87 | - qemu_co_mutex_lock(&vmaw->writer_lock); | |
88 | + assert(vmaw); | |
89 | + assert(buf); | |
90 | + assert(bytes <= VMA_MAX_EXTENT_SIZE); | |
91 | ||
92 | - DPRINTF("vma_co_write enter %zd\n", bytes); | |
93 | + size_t done = 0; | |
94 | + ssize_t ret; | |
95 | ||
96 | assert(vmaw->co_writer == NULL); | |
97 | ||
98 | vmaw->co_writer = qemu_coroutine_self(); | |
99 | ||
68a30562 | 100 | - aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, false, NULL, vma_co_continue_write, vmaw); |
ca0fe5f5 WB |
101 | - |
102 | - DPRINTF("vma_co_write wait until writable\n"); | |
103 | - qemu_coroutine_yield(); | |
104 | - DPRINTF("vma_co_write starting %zd\n", bytes); | |
105 | - | |
106 | while (done < bytes) { | |
1a91ab45 | 107 | + aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, false, NULL, vma_co_continue_write, NULL, vmaw); |
ca0fe5f5 | 108 | + qemu_coroutine_yield(); |
1a91ab45 | 109 | + aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, false, NULL, NULL, NULL, NULL); |
ca0fe5f5 WB |
110 | + if (vmaw->status < 0) { |
111 | + DPRINTF("vma_queue_write detected canceled backup\n"); | |
112 | + done = -1; | |
113 | + break; | |
114 | + } | |
115 | ret = write(vmaw->fd, buf + done, bytes - done); | |
116 | if (ret > 0) { | |
117 | done += ret; | |
118 | - DPRINTF("vma_co_write written %zd %zd\n", done, ret); | |
119 | + DPRINTF("vma_queue_write written %zd %zd\n", done, ret); | |
120 | } else if (ret < 0) { | |
121 | if (errno == EAGAIN || errno == EWOULDBLOCK) { | |
122 | - DPRINTF("vma_co_write yield %zd\n", done); | |
123 | - qemu_coroutine_yield(); | |
124 | - DPRINTF("vma_co_write restart %zd\n", done); | |
125 | - } else { | |
126 | - vma_writer_set_error(vmaw, "vma_co_write write error - %s", | |
127 | + /* try again */ | |
128 | + } else { | |
129 | + vma_writer_set_error(vmaw, "vma_queue_write: write error - %s", | |
130 | g_strerror(errno)); | |
131 | done = -1; /* always return failure for partial writes */ | |
132 | break; | |
68a30562 | 133 | @@ -258,102 +251,9 @@ vma_co_write(VmaWriter *vmaw, const void *buf, size_t bytes) |
ca0fe5f5 WB |
134 | } |
135 | } | |
136 | ||
68a30562 | 137 | - aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, false, NULL, NULL, NULL); |
ca0fe5f5 WB |
138 | - |
139 | vmaw->co_writer = NULL; | |
140 | - | |
141 | - qemu_co_mutex_unlock(&vmaw->writer_lock); | |
142 | - | |
143 | - DPRINTF("vma_co_write leave %zd\n", done); | |
144 | - return done; | |
145 | -} | |
146 | - | |
147 | -static void coroutine_fn vma_co_writer_task(void *opaque) | |
148 | -{ | |
149 | - VmaAIOCB *cb = opaque; | |
150 | - | |
151 | - DPRINTF("vma_co_writer_task start\n"); | |
152 | - | |
153 | - int64_t done = vma_co_write(cb->vmaw, cb->buffer, cb->bytes); | |
154 | - DPRINTF("vma_co_writer_task write done %zd\n", done); | |
155 | - | |
156 | - if (done != cb->bytes) { | |
157 | - DPRINTF("vma_co_writer_task failed write %zd %zd", cb->bytes, done); | |
158 | - vma_writer_set_error(cb->vmaw, "vma_co_writer_task failed write %zd", | |
159 | - done); | |
160 | - } | |
161 | - | |
162 | - cb->bytes = 0; | |
163 | - | |
164 | - qemu_co_queue_next(&cb->vmaw->wqueue); | |
165 | - | |
166 | - DPRINTF("vma_co_writer_task end\n"); | |
167 | -} | |
168 | - | |
169 | -static void coroutine_fn vma_queue_flush(VmaWriter *vmaw) | |
170 | -{ | |
171 | - DPRINTF("vma_queue_flush enter\n"); | |
172 | - | |
173 | - assert(vmaw); | |
174 | - | |
175 | - while (1) { | |
176 | - int i; | |
177 | - VmaAIOCB *cb = NULL; | |
178 | - for (i = 0; i < WRITE_BUFFERS; i++) { | |
179 | - if (vmaw->aiocbs[i]->bytes) { | |
180 | - cb = vmaw->aiocbs[i]; | |
181 | - DPRINTF("FOUND USED AIO BUFFER %d %zd\n", i, | |
182 | - vmaw->aiocbs[i]->bytes); | |
183 | - break; | |
184 | - } | |
185 | - } | |
186 | - if (!cb) { | |
187 | - break; | |
188 | - } | |
189 | - qemu_co_queue_wait(&vmaw->wqueue); | |
190 | - } | |
191 | - | |
192 | - DPRINTF("vma_queue_flush leave\n"); | |
193 | -} | |
194 | - | |
195 | -/** | |
196 | - * NOTE: pipe buffer size in only 4096 bytes on linux (see 'ulimit -a') | |
197 | - * So we need to create a coroutione to allow 'parallel' execution. | |
198 | - */ | |
199 | -static ssize_t coroutine_fn | |
200 | -vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes) | |
201 | -{ | |
202 | - DPRINTF("vma_queue_write enter %zd\n", bytes); | |
203 | - | |
204 | - assert(vmaw); | |
205 | - assert(buf); | |
206 | - assert(bytes <= VMA_MAX_EXTENT_SIZE); | |
207 | - | |
208 | - VmaAIOCB *cb = NULL; | |
209 | - while (!cb) { | |
210 | - int i; | |
211 | - for (i = 0; i < WRITE_BUFFERS; i++) { | |
212 | - if (!vmaw->aiocbs[i]->bytes) { | |
213 | - cb = vmaw->aiocbs[i]; | |
214 | - break; | |
215 | - } | |
216 | - } | |
217 | - if (!cb) { | |
218 | - qemu_co_queue_wait(&vmaw->wqueue); | |
219 | - } | |
220 | - } | |
221 | - | |
222 | - memcpy(cb->buffer, buf, bytes); | |
223 | - cb->bytes = bytes; | |
224 | - cb->vmaw = vmaw; | |
225 | - | |
226 | - DPRINTF("vma_queue_write start %zd\n", bytes); | |
227 | - cb->co = qemu_coroutine_create(vma_co_writer_task); | |
228 | - qemu_coroutine_enter(cb->co, cb); | |
229 | - | |
230 | - DPRINTF("vma_queue_write leave\n"); | |
231 | - | |
232 | - return bytes; | |
233 | + | |
234 | + return (done == bytes) ? bytes : -1; | |
235 | } | |
236 | ||
237 | VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, Error **errp) | |
68a30562 | 238 | @@ -420,20 +320,16 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, Error **errp) |
ca0fe5f5 WB |
239 | } |
240 | ||
241 | /* we use O_DIRECT, so we need to align IO buffers */ | |
242 | - int i; | |
243 | - for (i = 0; i < WRITE_BUFFERS; i++) { | |
244 | - vmaw->aiocbs[i] = qemu_memalign(512, sizeof(VmaAIOCB)); | |
245 | - memset(vmaw->aiocbs[i], 0, sizeof(VmaAIOCB)); | |
246 | - } | |
247 | + | |
248 | + vmaw->outbuf = qemu_memalign(512, VMA_MAX_EXTENT_SIZE); | |
249 | + vmaw->headerbuf = qemu_memalign(512, HEADERBUF_SIZE); | |
250 | ||
251 | vmaw->outbuf_count = 0; | |
252 | vmaw->outbuf_pos = VMA_EXTENT_HEADER_SIZE; | |
253 | ||
254 | vmaw->header_blob_table_pos = 1; /* start at pos 1 */ | |
255 | ||
256 | - qemu_co_mutex_init(&vmaw->writer_lock); | |
257 | qemu_co_mutex_init(&vmaw->flush_lock); | |
258 | - qemu_co_queue_init(&vmaw->wqueue); | |
259 | ||
260 | uuid_copy(vmaw->uuid, uuid); | |
261 | ||
68a30562 | 262 | @@ -460,8 +356,7 @@ err: |
ca0fe5f5 WB |
263 | static int coroutine_fn vma_write_header(VmaWriter *vmaw) |
264 | { | |
265 | assert(vmaw); | |
266 | - int header_clusters = 8; | |
267 | - char buf[65536*header_clusters]; | |
268 | + unsigned char *buf = vmaw->headerbuf; | |
269 | VmaHeader *head = (VmaHeader *)buf; | |
270 | ||
271 | int i; | |
68a30562 | 272 | @@ -472,7 +367,7 @@ static int coroutine_fn vma_write_header(VmaWriter *vmaw) |
ca0fe5f5 WB |
273 | return vmaw->status; |
274 | } | |
275 | ||
276 | - memset(buf, 0, sizeof(buf)); | |
277 | + memset(buf, 0, HEADERBUF_SIZE); | |
278 | ||
279 | head->magic = VMA_MAGIC; | |
280 | head->version = GUINT32_TO_BE(1); /* v1 */ | |
68a30562 | 281 | @@ -507,7 +402,7 @@ static int coroutine_fn vma_write_header(VmaWriter *vmaw) |
ca0fe5f5 WB |
282 | uint32_t header_size = sizeof(VmaHeader) + vmaw->header_blob_table_size; |
283 | head->header_size = GUINT32_TO_BE(header_size); | |
284 | ||
285 | - if (header_size > sizeof(buf)) { | |
286 | + if (header_size > HEADERBUF_SIZE) { | |
287 | return -1; /* just to be sure */ | |
288 | } | |
289 | ||
68a30562 | 290 | @@ -805,13 +700,7 @@ int vma_writer_close(VmaWriter *vmaw, Error **errp) |
ca0fe5f5 WB |
291 | |
292 | int i; | |
293 | ||
294 | - vma_queue_flush(vmaw); | |
295 | - | |
296 | - /* this should not happen - just to be sure */ | |
297 | - while (!qemu_co_queue_empty(&vmaw->wqueue)) { | |
298 | - DPRINTF("vma_writer_close wait\n"); | |
299 | - co_aio_sleep_ns(qemu_get_aio_context(), QEMU_CLOCK_REALTIME, 1000000); | |
300 | - } | |
301 | + assert(vmaw->co_writer == NULL); | |
302 | ||
303 | if (vmaw->cmd) { | |
304 | if (pclose(vmaw->cmd) < 0) { | |
68a30562 | 305 | @@ -869,9 +758,5 @@ void vma_writer_destroy(VmaWriter *vmaw) |
ca0fe5f5 WB |
306 | g_checksum_free(vmaw->md5csum); |
307 | } | |
308 | ||
309 | - for (i = 0; i < WRITE_BUFFERS; i++) { | |
310 | - free(vmaw->aiocbs[i]); | |
311 | - } | |
312 | - | |
313 | g_free(vmaw); | |
314 | } | |
315 | -- | |
316 | 2.1.4 | |
317 |