]>
Commit | Line | Data |
---|---|---|
ddfd618f | 1 | From a6f324d47b810809de2a6106849527c6a9590175 Mon Sep 17 00:00:00 2001 |
309874bd DM |
2 | From: Dietmar Maurer <dietmar@proxmox.com> |
3 | Date: Mon, 14 Jan 2013 08:05:40 +0100 | |
4 | Subject: [PATCH v3 7/7] use extra thread for vma writer | |
5 | ||
6 | The previous AIO approach has problem with bdrv_drain_all(), because writer | |
7 | coroutines are not considered there. Those coroutines are not restarted, so | |
8 | bdrv_drain_all() can fail (tracked_requests list not empty). | |
9 | ||
10 | We now use a thread, so we could also add compression here. | |
11 | ||
12 | Signed-off-by: Dietmar Maurer <dietmar@proxmox.com> | |
13 | --- | |
14 | vma-writer.c | 296 +++++++++++++++++++++++++++++++++++----------------------- | |
15 | 1 files changed, 180 insertions(+), 116 deletions(-) | |
16 | ||
2dfd543c DM |
17 | Index: new/vma-writer.c |
18 | =================================================================== | |
19 | --- new.orig/vma-writer.c 2013-01-23 07:35:12.000000000 +0100 | |
20 | +++ new/vma-writer.c 2013-01-23 09:24:19.000000000 +0100 | |
21 | @@ -37,13 +37,21 @@ | |
309874bd DM |
22 | |
23 | #define WRITE_BUFFERS 5 | |
24 | ||
25 | -typedef struct VmaAIOCB VmaAIOCB; | |
26 | -struct VmaAIOCB { | |
27 | - VmaWriter *vmaw; | |
28 | +typedef struct WriteBuffer { | |
29 | unsigned char buffer[VMA_MAX_EXTENT_SIZE]; | |
30 | size_t bytes; | |
31 | - Coroutine *co; | |
32 | -}; | |
33 | +} WriteBuffer; | |
34 | + | |
35 | +typedef struct WriterThread { | |
36 | + int fd; | |
37 | + int error; | |
38 | + bool cancel; | |
39 | + GThread *thread; | |
40 | + GMutex *mutex; | |
41 | + GCond *change_cond; | |
42 | + WriteBuffer wbuf[WRITE_BUFFERS]; | |
2dfd543c | 43 | + CoQueue wqueue; |
309874bd DM |
44 | +} WriterThread; |
45 | ||
46 | struct VmaWriter { | |
47 | int fd; | |
2dfd543c | 48 | @@ -60,8 +68,7 @@ |
309874bd DM |
49 | int outbuf_count; /* in VMA_BLOCKS */ |
50 | uint64_t outbuf_block_info[VMA_BLOCKS_PER_EXTENT]; | |
51 | ||
52 | - VmaAIOCB aiocbs[WRITE_BUFFERS]; | |
53 | - CoQueue wqueue; | |
54 | + WriterThread wt; | |
55 | ||
56 | GChecksum *md5csum; | |
57 | CoMutex writer_lock; | |
2dfd543c | 58 | @@ -86,6 +93,107 @@ |
309874bd DM |
59 | uint32_t config_count; |
60 | }; | |
61 | ||
62 | +static gpointer vma_writer_thread(gpointer data) | |
63 | +{ | |
64 | + WriterThread *wt = (WriterThread *)data; | |
65 | + | |
66 | + while (1) { | |
67 | + WriteBuffer *b = NULL; | |
68 | + | |
2dfd543c | 69 | + qemu_mutex_lock_iothread(); |
309874bd DM |
70 | + int i; |
71 | + for (i = 0; i < WRITE_BUFFERS; i++) { | |
72 | + if (wt->wbuf[i].bytes) { | |
73 | + b = &wt->wbuf[i]; | |
74 | + break; | |
75 | + } | |
76 | + } | |
2dfd543c | 77 | + qemu_mutex_unlock_iothread(); |
309874bd DM |
78 | + |
79 | + if (b) { | |
80 | + size_t done = 0; | |
81 | + while (done < b->bytes) { | |
82 | + int ret = write(wt->fd, b->buffer + done, b->bytes - done); | |
83 | + if (ret > 0) { | |
84 | + done += ret; | |
85 | + } else if (ret < 0) { | |
86 | + if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { | |
2dfd543c | 87 | + qemu_mutex_lock_iothread(); |
309874bd | 88 | + wt->error = errno; |
2dfd543c | 89 | + qemu_mutex_unlock_iothread(); |
309874bd DM |
90 | + break; |
91 | + } | |
92 | + } else if (ret == 0) { | |
93 | + /* should not happen - simply try again */ | |
94 | + } | |
95 | + } | |
2dfd543c | 96 | + qemu_mutex_lock_iothread(); |
309874bd | 97 | + b->bytes = 0; |
2dfd543c DM |
98 | + DPRINTF("AWAKE JOB %d\n", wt->error); |
99 | + if (wt->error) { | |
100 | + for (i = 0; i < WRITE_BUFFERS; i++) { | |
101 | + wt->wbuf[i].bytes = 0; | |
102 | + } | |
103 | + qemu_co_queue_restart_all(&wt->wqueue); | |
104 | + } else { | |
105 | + qemu_co_queue_next(&wt->wqueue); | |
106 | + } | |
107 | + qemu_mutex_unlock_iothread(); | |
108 | + DPRINTF("AWAKE JOB END\n"); | |
309874bd DM |
109 | + } |
110 | + | |
111 | + if (wt->error) { | |
112 | + DPRINTF("WRITER THREAD ERROR %d - exit thread\n", wt->error); | |
113 | + g_thread_exit(NULL); | |
114 | + } | |
115 | + | |
116 | + g_mutex_lock(wt->mutex); | |
117 | + bool cancel = wt->cancel; | |
118 | + if (!b && !cancel) { | |
2dfd543c | 119 | + DPRINTF("WRITER THREAD WAIT FOR DATA\n"); |
309874bd | 120 | + g_cond_wait(wt->change_cond, wt->mutex); |
2dfd543c | 121 | + cancel = wt->cancel; |
309874bd DM |
122 | + } |
123 | + g_mutex_unlock(wt->mutex); | |
124 | + | |
125 | + if (cancel) { | |
2dfd543c DM |
126 | + qemu_mutex_lock_iothread(); |
127 | + for (i = 0; i < WRITE_BUFFERS; i++) { | |
128 | + wt->wbuf[i].bytes = 0; | |
129 | + } | |
130 | + qemu_co_queue_restart_all(&wt->wqueue); | |
131 | + qemu_mutex_unlock_iothread(); | |
309874bd DM |
132 | + DPRINTF("END WRITER THREAD\n"); |
133 | + g_thread_exit(NULL); | |
134 | + } | |
135 | + } | |
136 | + | |
137 | + return NULL; | |
138 | +} | |
139 | + | |
140 | +static void vma_stop_writer_thread(VmaWriter *vmaw) | |
141 | +{ | |
142 | + assert(vmaw); | |
143 | + | |
2dfd543c DM |
144 | + DPRINTF("vma_stop_writer_thread start\n"); |
145 | + | |
309874bd | 146 | + if (vmaw->wt.thread) { |
2dfd543c | 147 | + DPRINTF("vma_stop_writer_thread 1\n"); |
309874bd | 148 | + g_mutex_lock(vmaw->wt.mutex); |
2dfd543c | 149 | + DPRINTF("vma_stop_writer_thread 2\n"); |
309874bd DM |
150 | + vmaw->wt.cancel = true; |
151 | + g_cond_signal(vmaw->wt.change_cond); | |
152 | + g_mutex_unlock(vmaw->wt.mutex); | |
2dfd543c DM |
153 | + DPRINTF("vma_stop_writer_thread 3\n"); |
154 | + qemu_mutex_unlock_iothread(); | |
309874bd | 155 | + g_thread_join(vmaw->wt.thread); |
2dfd543c DM |
156 | + qemu_mutex_lock_iothread(); |
157 | + DPRINTF("vma_stop_writer_thread 4\n"); | |
309874bd DM |
158 | + vmaw->wt.thread = NULL; |
159 | + } | |
2dfd543c | 160 | + DPRINTF("vma_stop_writer_thread end\n"); |
309874bd DM |
161 | +} |
162 | + | |
163 | void vma_writer_set_error(VmaWriter *vmaw, const char *fmt, ...) | |
164 | { | |
165 | va_list ap; | |
2dfd543c | 166 | @@ -213,111 +321,45 @@ |
309874bd DM |
167 | return n; |
168 | } | |
169 | ||
170 | -static void vma_co_continue_write(void *opaque) | |
171 | -{ | |
172 | - VmaWriter *vmaw = opaque; | |
173 | - | |
174 | - qemu_aio_set_fd_handler(vmaw->fd, NULL, NULL, NULL, NULL); | |
175 | - | |
176 | - DPRINTF("vma_co_continue_write\n"); | |
177 | - qemu_coroutine_enter(vmaw->co_writer, NULL); | |
178 | -} | |
179 | - | |
180 | -static ssize_t coroutine_fn | |
181 | -vma_co_write(VmaWriter *vmaw, const void *buf, size_t bytes) | |
182 | -{ | |
183 | - size_t done = 0; | |
184 | - ssize_t ret; | |
185 | - | |
186 | - /* atomic writes (we cannot interleave writes) */ | |
187 | - qemu_co_mutex_lock(&vmaw->writer_lock); | |
188 | - | |
189 | - DPRINTF("vma_co_write enter %zd\n", bytes); | |
190 | - | |
191 | - while (done < bytes) { | |
192 | - ret = write(vmaw->fd, buf + done, bytes - done); | |
193 | - if (ret > 0) { | |
194 | - done += ret; | |
195 | - DPRINTF("vma_co_write written %zd %zd\n", done, ret); | |
196 | - } else if (ret < 0) { | |
197 | - if (errno == EAGAIN || errno == EWOULDBLOCK) { | |
198 | - DPRINTF("vma_co_write yield %zd\n", done); | |
199 | - | |
200 | - vmaw->co_writer = qemu_coroutine_self(); | |
201 | - qemu_aio_set_fd_handler(vmaw->fd, NULL, vma_co_continue_write, | |
202 | - NULL, vmaw); | |
203 | - | |
204 | - qemu_coroutine_yield(); | |
205 | - DPRINTF("vma_co_write restart %zd\n", done); | |
206 | - } else { | |
207 | - vma_writer_set_error(vmaw, "vma_co_write write error - %s", | |
208 | - strerror(errno)); | |
209 | - done = -1; /* always return failure for partial writes */ | |
210 | - break; | |
211 | - } | |
212 | - } else if (ret == 0) { | |
213 | - /* should not happen - simply try again */ | |
214 | - } | |
215 | - } | |
216 | - | |
217 | - qemu_co_mutex_unlock(&vmaw->writer_lock); | |
218 | - | |
219 | - DPRINTF("vma_co_write leave %zd\n", done); | |
220 | - return done; | |
221 | -} | |
222 | - | |
223 | -static void coroutine_fn vma_co_writer_task(void *opaque) | |
224 | -{ | |
225 | - VmaAIOCB *cb = opaque; | |
226 | - | |
227 | - DPRINTF("vma_co_writer_task start\n"); | |
228 | - | |
229 | - int64_t done = vma_co_write(cb->vmaw, cb->buffer, cb->bytes); | |
230 | - DPRINTF("vma_co_writer_task write done %zd\n", done); | |
231 | - | |
232 | - if (done != cb->bytes) { | |
233 | - DPRINTF("vma_co_writer_task failed write %zd %zd", cb->bytes, done); | |
234 | - vma_writer_set_error(cb->vmaw, "vma_co_writer_task failed write %zd", | |
235 | - done); | |
236 | - } | |
237 | - | |
238 | - cb->bytes = 0; | |
239 | - | |
240 | - qemu_co_queue_next(&cb->vmaw->wqueue); | |
241 | - | |
242 | - DPRINTF("vma_co_writer_task end\n"); | |
243 | -} | |
244 | - | |
245 | static void coroutine_fn vma_queue_flush(VmaWriter *vmaw) | |
246 | { | |
247 | DPRINTF("vma_queue_flush enter\n"); | |
248 | ||
249 | assert(vmaw); | |
250 | ||
251 | + int error; | |
252 | + | |
253 | while (1) { | |
254 | int i; | |
255 | - VmaAIOCB *cb = NULL; | |
256 | + WriteBuffer *b = NULL; | |
2dfd543c | 257 | + |
309874bd DM |
258 | + error = vmaw->wt.error; |
259 | + | |
260 | for (i = 0; i < WRITE_BUFFERS; i++) { | |
261 | - if (vmaw->aiocbs[i].bytes) { | |
262 | - cb = &vmaw->aiocbs[i]; | |
263 | - DPRINTF("FOUND USED AIO BUFFER %d %zd\n", i, | |
264 | - vmaw->aiocbs[i].bytes); | |
265 | + if (vmaw->wt.wbuf[i].bytes) { | |
266 | + b = &vmaw->wt.wbuf[i]; | |
267 | + DPRINTF("FOUND USED WRITE BUFFER %d %zd\n", i, | |
268 | + vmaw->wt.wbuf[i].bytes); | |
269 | break; | |
270 | } | |
271 | } | |
272 | - if (!cb) { | |
309874bd DM |
273 | + |
274 | + if (!b || error) { | |
275 | break; | |
276 | } | |
277 | - qemu_co_queue_wait(&vmaw->wqueue); | |
2dfd543c DM |
278 | + DPRINTF("WAIT FOR BUFFER FLUSH\n"); |
279 | + qemu_co_queue_wait(&vmaw->wt.wqueue); | |
280 | + DPRINTF("WAIT FOR BUFFER FLUSH END\n"); | |
309874bd DM |
281 | + } |
282 | + | |
283 | + if (error) { | |
284 | + vma_writer_set_error(vmaw, "vma_queue_flush write error - %s", | |
285 | + strerror(error)); | |
286 | } | |
287 | ||
288 | DPRINTF("vma_queue_flush leave\n"); | |
289 | } | |
290 | ||
291 | -/** | |
292 | - * NOTE: pipe buffer size in only 4096 bytes on linux (see 'ulimit -a') | |
293 | - * So we need to create a coroutione to allow 'parallel' execution. | |
294 | - */ | |
295 | static ssize_t coroutine_fn | |
296 | vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes) | |
297 | { | |
2dfd543c | 298 | @@ -327,29 +369,42 @@ |
309874bd DM |
299 | assert(buf); |
300 | assert(bytes <= VMA_MAX_EXTENT_SIZE); | |
301 | ||
302 | - VmaAIOCB *cb = NULL; | |
303 | - while (!cb) { | |
304 | + int error = 0; | |
305 | + | |
306 | + /* wait for a free output buffer */ | |
309874bd DM |
307 | + WriteBuffer *b = NULL; |
308 | + while (!b) { | |
309 | + error = vmaw->wt.error; | |
310 | + if (error) { | |
2dfd543c | 311 | + vma_writer_set_error(vmaw, "vma_queue_write error - %s", |
309874bd DM |
312 | + strerror(error)); |
313 | + return -1; | |
314 | + } | |
315 | + | |
316 | int i; | |
317 | for (i = 0; i < WRITE_BUFFERS; i++) { | |
318 | - if (!vmaw->aiocbs[i].bytes) { | |
319 | - cb = &vmaw->aiocbs[i]; | |
320 | + if (!vmaw->wt.wbuf[i].bytes) { | |
321 | + b = &vmaw->wt.wbuf[i]; | |
322 | break; | |
323 | } | |
324 | } | |
325 | - if (!cb) { | |
326 | - qemu_co_queue_wait(&vmaw->wqueue); | |
327 | + if (!b) { | |
2dfd543c DM |
328 | + DPRINTF("WAIT FOR BUFFER\n"); |
329 | + qemu_co_queue_wait(&vmaw->wt.wqueue); | |
330 | + DPRINTF("WAIT FOR BUFFER DONE\n"); | |
309874bd DM |
331 | } |
332 | } | |
333 | ||
334 | - memcpy(cb->buffer, buf, bytes); | |
335 | - cb->bytes = bytes; | |
336 | - cb->vmaw = vmaw; | |
337 | + /* copy data to output buffer */ | |
338 | + memcpy(b->buffer, buf, bytes); | |
339 | + b->bytes = bytes; | |
309874bd DM |
340 | |
341 | - DPRINTF("vma_queue_write start %zd\n", bytes); | |
342 | - cb->co = qemu_coroutine_create(vma_co_writer_task); | |
343 | - qemu_coroutine_enter(cb->co, cb); | |
2dfd543c DM |
344 | + g_mutex_lock(vmaw->wt.mutex); |
345 | + /* signal writer thread that we have new data */ | |
346 | + g_cond_signal(vmaw->wt.change_cond); | |
309874bd DM |
347 | + g_mutex_unlock(vmaw->wt.mutex); |
348 | ||
349 | - DPRINTF("vma_queue_write leave\n"); | |
350 | + DPRINTF("vma_queue_write queued %zd\n", bytes); | |
351 | ||
352 | return bytes; | |
353 | } | |
2dfd543c | 354 | @@ -386,10 +441,10 @@ |
309874bd DM |
355 | const char *tmp_id_str; |
356 | ||
357 | if ((stat(filename, &st) == 0) && S_ISFIFO(st.st_mode)) { | |
358 | - oflags = O_NONBLOCK|O_WRONLY; | |
359 | + oflags = O_WRONLY; | |
360 | vmaw->fd = qemu_open(filename, oflags, 0644); | |
361 | } else if (strstart(filename, "/dev/fdset/", &tmp_id_str)) { | |
362 | - oflags = O_NONBLOCK|O_WRONLY; | |
363 | + oflags = O_WRONLY; | |
364 | vmaw->fd = qemu_open(filename, oflags, 0644); | |
365 | } else if (strstart(filename, "/dev/fdname/", &tmp_id_str)) { | |
366 | vmaw->fd = monitor_get_fd(cur_mon, tmp_id_str, errp); | |
2dfd543c | 367 | @@ -397,7 +452,7 @@ |
309874bd DM |
368 | goto err; |
369 | } | |
370 | } else { | |
371 | - oflags = O_NONBLOCK|O_WRONLY|O_CREAT|O_EXCL; | |
372 | + oflags = O_WRONLY|O_CREAT|O_EXCL; | |
373 | vmaw->fd = qemu_open(filename, oflags, 0644); | |
374 | } | |
375 | ||
2dfd543c | 376 | @@ -415,10 +470,19 @@ |
309874bd DM |
377 | |
378 | qemu_co_mutex_init(&vmaw->writer_lock); | |
379 | qemu_co_mutex_init(&vmaw->flush_lock); | |
380 | - qemu_co_queue_init(&vmaw->wqueue); | |
2dfd543c | 381 | + qemu_co_queue_init(&vmaw->wt.wqueue); |
309874bd DM |
382 | |
383 | uuid_copy(vmaw->uuid, uuid); | |
384 | ||
309874bd DM |
385 | + vmaw->wt.mutex = g_mutex_new(); |
386 | + vmaw->wt.change_cond = g_cond_new(); | |
387 | + vmaw->wt.fd = vmaw->fd; | |
388 | + vmaw->wt.thread = g_thread_create(vma_writer_thread, &vmaw->wt, true, NULL); | |
389 | + if (vmaw->wt.thread == NULL) { | |
390 | + error_setg(errp, "can't allocate writer thread\n"); | |
391 | + goto err; | |
392 | + } | |
393 | + | |
394 | return vmaw; | |
395 | ||
396 | err: | |
2dfd543c | 397 | @@ -433,6 +497,14 @@ |
309874bd DM |
398 | g_checksum_free(vmaw->md5csum); |
399 | } | |
400 | ||
401 | + if (vmaw->wt.mutex) { | |
402 | + g_mutex_free(vmaw->wt.mutex); | |
403 | + } | |
404 | + | |
405 | + if (vmaw->wt.change_cond) { | |
406 | + g_cond_free(vmaw->wt.change_cond); | |
407 | + } | |
408 | + | |
409 | g_free(vmaw); | |
410 | } | |
411 | ||
2dfd543c | 412 | @@ -672,6 +744,14 @@ |
309874bd DM |
413 | |
414 | *zero_bytes = 0; | |
415 | ||
309874bd | 416 | + int error = vmaw->wt.error; |
309874bd DM |
417 | + |
418 | + if (error) { | |
419 | + vma_writer_set_error(vmaw, "vma_writer_get_buffer write error - %s", | |
420 | + strerror(error)); | |
421 | + return -1; | |
422 | + } | |
423 | + | |
424 | if (vmaw->status < 0) { | |
425 | return vmaw->status; | |
426 | } | |
2dfd543c DM |
427 | @@ -783,14 +863,17 @@ |
428 | ||
429 | int i; | |
309874bd | 430 | |
2dfd543c | 431 | + DPRINTF("vma_writer_close start\n"); |
309874bd DM |
432 | vma_queue_flush(vmaw); |
433 | ||
2dfd543c | 434 | /* this should not happen - just to be sure */ |
309874bd | 435 | - while (!qemu_co_queue_empty(&vmaw->wqueue)) { |
2dfd543c DM |
436 | + while (!qemu_co_queue_empty(&vmaw->wt.wqueue)) { |
437 | DPRINTF("vma_writer_close wait\n"); | |
438 | co_sleep_ns(rt_clock, 1000000); | |
439 | } | |
309874bd | 440 | |
2dfd543c DM |
441 | + vma_stop_writer_thread(vmaw); |
442 | + | |
309874bd DM |
443 | if (vmaw->cmd) { |
444 | if (pclose(vmaw->cmd) < 0) { | |
2dfd543c DM |
445 | vma_writer_set_error(vmaw, "vma_writer_close: " |
446 | @@ -835,8 +918,9 @@ | |
309874bd DM |
447 | { |
448 | assert(vmaw); | |
449 | ||
450 | - int i; | |
451 | + vma_stop_writer_thread(vmaw); | |
452 | ||
453 | + int i; | |
454 | for (i = 0; i <= 255; i++) { | |
455 | if (vmaw->stream_info[i].devname) { | |
456 | g_free(vmaw->stream_info[i].devname); | |
2dfd543c | 457 | @@ -847,6 +931,14 @@ |
309874bd DM |
458 | g_checksum_free(vmaw->md5csum); |
459 | } | |
460 | ||
461 | + if (vmaw->wt.mutex) { | |
462 | + g_mutex_free(vmaw->wt.mutex); | |
463 | + } | |
464 | + | |
465 | + if (vmaw->wt.change_cond) { | |
466 | + g_cond_free(vmaw->wt.change_cond); | |
467 | + } | |
468 | + | |
469 | g_free(vmaw); | |
470 | } | |
471 |