]>
Commit | Line | Data |
---|---|---|
309874bd DM |
1 | From 9cf799746fb3b21362fd62574cd76bfa14a24070 Mon Sep 17 00:00:00 2001 |
2 | From: Dietmar Maurer <dietmar@proxmox.com> | |
3 | Date: Mon, 14 Jan 2013 08:05:40 +0100 | |
4 | Subject: [PATCH v3 7/7] use extra thread for vma writer | |
5 | ||
6 | The previous AIO approach has problem with bdrv_drain_all(), because writer | |
7 | coroutines are not considered there. Those coroutines are not restarted, so | |
8 | bdrv_drain_all() can fail (tracked_requests list not empty). | |
9 | ||
10 | We now use a thread, so we could also add compression here. | |
11 | ||
12 | Signed-off-by: Dietmar Maurer <dietmar@proxmox.com> | |
13 | --- | |
14 | vma-writer.c | 296 +++++++++++++++++++++++++++++++++++----------------------- | |
15 | 1 files changed, 180 insertions(+), 116 deletions(-) | |
16 | ||
17 | diff --git a/vma-writer.c b/vma-writer.c | |
18 | index 688af4b..e18591e 100644 | |
19 | --- a/vma-writer.c | |
20 | +++ b/vma-writer.c | |
21 | @@ -38,13 +38,20 @@ | |
22 | ||
23 | #define WRITE_BUFFERS 5 | |
24 | ||
25 | -typedef struct VmaAIOCB VmaAIOCB; | |
26 | -struct VmaAIOCB { | |
27 | - VmaWriter *vmaw; | |
28 | +typedef struct WriteBuffer { | |
29 | unsigned char buffer[VMA_MAX_EXTENT_SIZE]; | |
30 | size_t bytes; | |
31 | - Coroutine *co; | |
32 | -}; | |
33 | +} WriteBuffer; | |
34 | + | |
35 | +typedef struct WriterThread { | |
36 | + int fd; | |
37 | + int error; | |
38 | + bool cancel; | |
39 | + GThread *thread; | |
40 | + GMutex *mutex; | |
41 | + GCond *change_cond; | |
42 | + WriteBuffer wbuf[WRITE_BUFFERS]; | |
43 | +} WriterThread; | |
44 | ||
45 | struct VmaWriter { | |
46 | int fd; | |
47 | @@ -61,8 +68,7 @@ struct VmaWriter { | |
48 | int outbuf_count; /* in VMA_BLOCKS */ | |
49 | uint64_t outbuf_block_info[VMA_BLOCKS_PER_EXTENT]; | |
50 | ||
51 | - VmaAIOCB aiocbs[WRITE_BUFFERS]; | |
52 | - CoQueue wqueue; | |
53 | + WriterThread wt; | |
54 | ||
55 | GChecksum *md5csum; | |
56 | CoMutex writer_lock; | |
57 | @@ -88,6 +94,80 @@ struct VmaWriter { | |
58 | uint32_t config_count; | |
59 | }; | |
60 | ||
61 | +static gpointer vma_writer_thread(gpointer data) | |
62 | +{ | |
63 | + WriterThread *wt = (WriterThread *)data; | |
64 | + | |
65 | + while (1) { | |
66 | + WriteBuffer *b = NULL; | |
67 | + | |
68 | + g_mutex_lock(wt->mutex); | |
69 | + int i; | |
70 | + for (i = 0; i < WRITE_BUFFERS; i++) { | |
71 | + if (wt->wbuf[i].bytes) { | |
72 | + b = &wt->wbuf[i]; | |
73 | + break; | |
74 | + } | |
75 | + } | |
76 | + g_mutex_unlock(wt->mutex); | |
77 | + | |
78 | + if (b) { | |
79 | + size_t done = 0; | |
80 | + while (done < b->bytes) { | |
81 | + int ret = write(wt->fd, b->buffer + done, b->bytes - done); | |
82 | + if (ret > 0) { | |
83 | + done += ret; | |
84 | + } else if (ret < 0) { | |
85 | + if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { | |
86 | + g_mutex_lock(wt->mutex); | |
87 | + wt->error = errno; | |
88 | + g_mutex_unlock(wt->mutex); | |
89 | + break; | |
90 | + } | |
91 | + } else if (ret == 0) { | |
92 | + /* should not happen - simply try again */ | |
93 | + } | |
94 | + } | |
95 | + g_mutex_lock(wt->mutex); | |
96 | + b->bytes = 0; | |
97 | + g_mutex_unlock(wt->mutex); | |
98 | + } | |
99 | + | |
100 | + if (wt->error) { | |
101 | + DPRINTF("WRITER THREAD ERROR %d - exit thread\n", wt->error); | |
102 | + g_thread_exit(NULL); | |
103 | + } | |
104 | + | |
105 | + g_mutex_lock(wt->mutex); | |
106 | + bool cancel = wt->cancel; | |
107 | + if (!b && !cancel) { | |
108 | + g_cond_wait(wt->change_cond, wt->mutex); | |
109 | + } | |
110 | + g_mutex_unlock(wt->mutex); | |
111 | + | |
112 | + if (cancel) { | |
113 | + DPRINTF("END WRITER THREAD\n"); | |
114 | + g_thread_exit(NULL); | |
115 | + } | |
116 | + } | |
117 | + | |
118 | + return NULL; | |
119 | +} | |
120 | + | |
121 | +static void vma_stop_writer_thread(VmaWriter *vmaw) | |
122 | +{ | |
123 | + assert(vmaw); | |
124 | + | |
125 | + if (vmaw->wt.thread) { | |
126 | + g_mutex_lock(vmaw->wt.mutex); | |
127 | + vmaw->wt.cancel = true; | |
128 | + g_cond_signal(vmaw->wt.change_cond); | |
129 | + g_mutex_unlock(vmaw->wt.mutex); | |
130 | + g_thread_join(vmaw->wt.thread); | |
131 | + vmaw->wt.thread = NULL; | |
132 | + } | |
133 | +} | |
134 | + | |
135 | void vma_writer_set_error(VmaWriter *vmaw, const char *fmt, ...) | |
136 | { | |
137 | va_list ap; | |
138 | @@ -215,111 +295,47 @@ int vma_writer_register_stream(VmaWriter *vmaw, const char *devname, | |
139 | return n; | |
140 | } | |
141 | ||
142 | -static void vma_co_continue_write(void *opaque) | |
143 | -{ | |
144 | - VmaWriter *vmaw = opaque; | |
145 | - | |
146 | - qemu_aio_set_fd_handler(vmaw->fd, NULL, NULL, NULL, NULL); | |
147 | - | |
148 | - DPRINTF("vma_co_continue_write\n"); | |
149 | - qemu_coroutine_enter(vmaw->co_writer, NULL); | |
150 | -} | |
151 | - | |
152 | -static ssize_t coroutine_fn | |
153 | -vma_co_write(VmaWriter *vmaw, const void *buf, size_t bytes) | |
154 | -{ | |
155 | - size_t done = 0; | |
156 | - ssize_t ret; | |
157 | - | |
158 | - /* atomic writes (we cannot interleave writes) */ | |
159 | - qemu_co_mutex_lock(&vmaw->writer_lock); | |
160 | - | |
161 | - DPRINTF("vma_co_write enter %zd\n", bytes); | |
162 | - | |
163 | - while (done < bytes) { | |
164 | - ret = write(vmaw->fd, buf + done, bytes - done); | |
165 | - if (ret > 0) { | |
166 | - done += ret; | |
167 | - DPRINTF("vma_co_write written %zd %zd\n", done, ret); | |
168 | - } else if (ret < 0) { | |
169 | - if (errno == EAGAIN || errno == EWOULDBLOCK) { | |
170 | - DPRINTF("vma_co_write yield %zd\n", done); | |
171 | - | |
172 | - vmaw->co_writer = qemu_coroutine_self(); | |
173 | - qemu_aio_set_fd_handler(vmaw->fd, NULL, vma_co_continue_write, | |
174 | - NULL, vmaw); | |
175 | - | |
176 | - qemu_coroutine_yield(); | |
177 | - DPRINTF("vma_co_write restart %zd\n", done); | |
178 | - } else { | |
179 | - vma_writer_set_error(vmaw, "vma_co_write write error - %s", | |
180 | - strerror(errno)); | |
181 | - done = -1; /* always return failure for partial writes */ | |
182 | - break; | |
183 | - } | |
184 | - } else if (ret == 0) { | |
185 | - /* should not happen - simply try again */ | |
186 | - } | |
187 | - } | |
188 | - | |
189 | - qemu_co_mutex_unlock(&vmaw->writer_lock); | |
190 | - | |
191 | - DPRINTF("vma_co_write leave %zd\n", done); | |
192 | - return done; | |
193 | -} | |
194 | - | |
195 | -static void coroutine_fn vma_co_writer_task(void *opaque) | |
196 | -{ | |
197 | - VmaAIOCB *cb = opaque; | |
198 | - | |
199 | - DPRINTF("vma_co_writer_task start\n"); | |
200 | - | |
201 | - int64_t done = vma_co_write(cb->vmaw, cb->buffer, cb->bytes); | |
202 | - DPRINTF("vma_co_writer_task write done %zd\n", done); | |
203 | - | |
204 | - if (done != cb->bytes) { | |
205 | - DPRINTF("vma_co_writer_task failed write %zd %zd", cb->bytes, done); | |
206 | - vma_writer_set_error(cb->vmaw, "vma_co_writer_task failed write %zd", | |
207 | - done); | |
208 | - } | |
209 | - | |
210 | - cb->bytes = 0; | |
211 | - | |
212 | - qemu_co_queue_next(&cb->vmaw->wqueue); | |
213 | - | |
214 | - DPRINTF("vma_co_writer_task end\n"); | |
215 | -} | |
216 | - | |
217 | static void coroutine_fn vma_queue_flush(VmaWriter *vmaw) | |
218 | { | |
219 | DPRINTF("vma_queue_flush enter\n"); | |
220 | ||
221 | assert(vmaw); | |
222 | ||
223 | + int error; | |
224 | + | |
225 | while (1) { | |
226 | int i; | |
227 | - VmaAIOCB *cb = NULL; | |
228 | + WriteBuffer *b = NULL; | |
229 | + g_mutex_lock(vmaw->wt.mutex); | |
230 | + | |
231 | + error = vmaw->wt.error; | |
232 | + | |
233 | for (i = 0; i < WRITE_BUFFERS; i++) { | |
234 | - if (vmaw->aiocbs[i].bytes) { | |
235 | - cb = &vmaw->aiocbs[i]; | |
236 | - DPRINTF("FOUND USED AIO BUFFER %d %zd\n", i, | |
237 | - vmaw->aiocbs[i].bytes); | |
238 | + if (vmaw->wt.wbuf[i].bytes) { | |
239 | + b = &vmaw->wt.wbuf[i]; | |
240 | + DPRINTF("FOUND USED WRITE BUFFER %d %zd\n", i, | |
241 | + vmaw->wt.wbuf[i].bytes); | |
242 | break; | |
243 | } | |
244 | } | |
245 | - if (!cb) { | |
246 | + g_mutex_unlock(vmaw->wt.mutex); | |
247 | + | |
248 | + if (!b || error) { | |
249 | break; | |
250 | } | |
251 | - qemu_co_queue_wait(&vmaw->wqueue); | |
252 | + uint64_t delay_ns = 10000; | |
253 | + DPRINTF("WAIT FOR BUFFER FLUSH %zd\n", delay_ns); | |
254 | + co_sleep_ns(rt_clock, delay_ns); | |
255 | + } | |
256 | + | |
257 | + if (error) { | |
258 | + vma_writer_set_error(vmaw, "vma_queue_flush write error - %s", | |
259 | + strerror(error)); | |
260 | } | |
261 | ||
262 | DPRINTF("vma_queue_flush leave\n"); | |
263 | } | |
264 | ||
265 | -/** | |
266 | - * NOTE: pipe buffer size in only 4096 bytes on linux (see 'ulimit -a') | |
267 | - * So we need to create a coroutione to allow 'parallel' execution. | |
268 | - */ | |
269 | static ssize_t coroutine_fn | |
270 | vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes) | |
271 | { | |
272 | @@ -329,29 +345,46 @@ vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes) | |
273 | assert(buf); | |
274 | assert(bytes <= VMA_MAX_EXTENT_SIZE); | |
275 | ||
276 | - VmaAIOCB *cb = NULL; | |
277 | - while (!cb) { | |
278 | + int error = 0; | |
279 | + | |
280 | + /* wait for a free output buffer */ | |
281 | + g_mutex_lock(vmaw->wt.mutex); | |
282 | + WriteBuffer *b = NULL; | |
283 | + while (!b) { | |
284 | + error = vmaw->wt.error; | |
285 | + if (error) { | |
286 | + g_mutex_unlock(vmaw->wt.mutex); | |
287 | + vma_writer_set_error(vmaw, "vma_queue_write error - %s", | |
288 | + strerror(error)); | |
289 | + return -1; | |
290 | + } | |
291 | + | |
292 | int i; | |
293 | for (i = 0; i < WRITE_BUFFERS; i++) { | |
294 | - if (!vmaw->aiocbs[i].bytes) { | |
295 | - cb = &vmaw->aiocbs[i]; | |
296 | + if (!vmaw->wt.wbuf[i].bytes) { | |
297 | + b = &vmaw->wt.wbuf[i]; | |
298 | break; | |
299 | } | |
300 | } | |
301 | - if (!cb) { | |
302 | - qemu_co_queue_wait(&vmaw->wqueue); | |
303 | + if (!b) { | |
304 | + uint64_t delay_ns = 10000; | |
305 | + DPRINTF("WAIT FOR BUFFER %zd\n", delay_ns); | |
306 | + g_mutex_unlock(vmaw->wt.mutex); | |
307 | + co_sleep_ns(rt_clock, delay_ns); | |
308 | + g_mutex_lock(vmaw->wt.mutex); | |
309 | } | |
310 | } | |
311 | ||
312 | - memcpy(cb->buffer, buf, bytes); | |
313 | - cb->bytes = bytes; | |
314 | - cb->vmaw = vmaw; | |
315 | + /* copy data to output buffer */ | |
316 | + memcpy(b->buffer, buf, bytes); | |
317 | + b->bytes = bytes; | |
318 | + | |
319 | + /* signal writer thread that we have new data */ | |
320 | + g_cond_signal(vmaw->wt.change_cond); | |
321 | ||
322 | - DPRINTF("vma_queue_write start %zd\n", bytes); | |
323 | - cb->co = qemu_coroutine_create(vma_co_writer_task); | |
324 | - qemu_coroutine_enter(cb->co, cb); | |
325 | + g_mutex_unlock(vmaw->wt.mutex); | |
326 | ||
327 | - DPRINTF("vma_queue_write leave\n"); | |
328 | + DPRINTF("vma_queue_write queued %zd\n", bytes); | |
329 | ||
330 | return bytes; | |
331 | } | |
332 | @@ -389,10 +422,10 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, int64_t speed, | |
333 | const char *tmp_id_str; | |
334 | ||
335 | if ((stat(filename, &st) == 0) && S_ISFIFO(st.st_mode)) { | |
336 | - oflags = O_NONBLOCK|O_WRONLY; | |
337 | + oflags = O_WRONLY; | |
338 | vmaw->fd = qemu_open(filename, oflags, 0644); | |
339 | } else if (strstart(filename, "/dev/fdset/", &tmp_id_str)) { | |
340 | - oflags = O_NONBLOCK|O_WRONLY; | |
341 | + oflags = O_WRONLY; | |
342 | vmaw->fd = qemu_open(filename, oflags, 0644); | |
343 | } else if (strstart(filename, "/dev/fdname/", &tmp_id_str)) { | |
344 | vmaw->fd = monitor_get_fd(cur_mon, tmp_id_str, errp); | |
345 | @@ -400,7 +433,7 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, int64_t speed, | |
346 | goto err; | |
347 | } | |
348 | } else { | |
349 | - oflags = O_NONBLOCK|O_WRONLY|O_CREAT|O_EXCL; | |
350 | + oflags = O_WRONLY|O_CREAT|O_EXCL; | |
351 | vmaw->fd = qemu_open(filename, oflags, 0644); | |
352 | } | |
353 | ||
354 | @@ -418,7 +451,6 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, int64_t speed, | |
355 | ||
356 | qemu_co_mutex_init(&vmaw->writer_lock); | |
357 | qemu_co_mutex_init(&vmaw->flush_lock); | |
358 | - qemu_co_queue_init(&vmaw->wqueue); | |
359 | ||
360 | uuid_copy(vmaw->uuid, uuid); | |
361 | ||
362 | @@ -428,6 +460,15 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, int64_t speed, | |
363 | ||
364 | ratelimit_set_speed(&vmaw->limit, speed, 100000000ULL /* 0.1 sec */); | |
365 | ||
366 | + vmaw->wt.mutex = g_mutex_new(); | |
367 | + vmaw->wt.change_cond = g_cond_new(); | |
368 | + vmaw->wt.fd = vmaw->fd; | |
369 | + vmaw->wt.thread = g_thread_create(vma_writer_thread, &vmaw->wt, true, NULL); | |
370 | + if (vmaw->wt.thread == NULL) { | |
371 | + error_setg(errp, "can't allocate writer thread\n"); | |
372 | + goto err; | |
373 | + } | |
374 | + | |
375 | return vmaw; | |
376 | ||
377 | err: | |
378 | @@ -442,6 +483,14 @@ err: | |
379 | g_checksum_free(vmaw->md5csum); | |
380 | } | |
381 | ||
382 | + if (vmaw->wt.mutex) { | |
383 | + g_mutex_free(vmaw->wt.mutex); | |
384 | + } | |
385 | + | |
386 | + if (vmaw->wt.change_cond) { | |
387 | + g_cond_free(vmaw->wt.change_cond); | |
388 | + } | |
389 | + | |
390 | g_free(vmaw); | |
391 | } | |
392 | ||
393 | @@ -688,6 +737,16 @@ vma_writer_write(VmaWriter *vmaw, uint8_t dev_id, int64_t cluster_num, | |
394 | ||
395 | *zero_bytes = 0; | |
396 | ||
397 | + g_mutex_lock(vmaw->wt.mutex); | |
398 | + int error = vmaw->wt.error; | |
399 | + g_mutex_unlock(vmaw->wt.mutex); | |
400 | + | |
401 | + if (error) { | |
402 | + vma_writer_set_error(vmaw, "vma_writer_get_buffer write error - %s", | |
403 | + strerror(error)); | |
404 | + return -1; | |
405 | + } | |
406 | + | |
407 | if (vmaw->status < 0) { | |
408 | return vmaw->status; | |
409 | } | |
410 | @@ -801,11 +860,7 @@ int vma_writer_close(VmaWriter *vmaw, Error **errp) | |
411 | ||
412 | vma_queue_flush(vmaw); | |
413 | ||
414 | - /* this should not happen - just to be sure */ | |
415 | - while (!qemu_co_queue_empty(&vmaw->wqueue)) { | |
416 | - DPRINTF("vma_writer_close wait\n"); | |
417 | - co_sleep_ns(rt_clock, 1000000); | |
418 | - } | |
419 | + vma_stop_writer_thread(vmaw); | |
420 | ||
421 | if (vmaw->cmd) { | |
422 | if (pclose(vmaw->cmd) < 0) { | |
423 | @@ -851,8 +906,9 @@ void vma_writer_destroy(VmaWriter *vmaw) | |
424 | { | |
425 | assert(vmaw); | |
426 | ||
427 | - int i; | |
428 | + vma_stop_writer_thread(vmaw); | |
429 | ||
430 | + int i; | |
431 | for (i = 0; i <= 255; i++) { | |
432 | if (vmaw->stream_info[i].devname) { | |
433 | g_free(vmaw->stream_info[i].devname); | |
434 | @@ -863,6 +919,14 @@ void vma_writer_destroy(VmaWriter *vmaw) | |
435 | g_checksum_free(vmaw->md5csum); | |
436 | } | |
437 | ||
438 | + if (vmaw->wt.mutex) { | |
439 | + g_mutex_free(vmaw->wt.mutex); | |
440 | + } | |
441 | + | |
442 | + if (vmaw->wt.change_cond) { | |
443 | + g_cond_free(vmaw->wt.change_cond); | |
444 | + } | |
445 | + | |
446 | g_free(vmaw); | |
447 | } | |
448 | ||
449 | -- | |
450 | 1.7.2.5 | |
451 |