1 From 9cf799746fb3b21362fd62574cd76bfa14a24070 Mon Sep 17 00:00:00 2001
2 From: Dietmar Maurer <dietmar@proxmox.com>
3 Date: Mon, 14 Jan 2013 08:05:40 +0100
4 Subject: [PATCH v3 7/7] use extra thread for vma writer
6 The previous AIO approach has a problem with bdrv_drain_all(), because writer
7 coroutines are not considered there. Those coroutines are not restarted, so
8 bdrv_drain_all() can fail (the tracked_requests list is not empty).
10 We now use a thread, so we can also add compression here later.
12 Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
14 vma-writer.c | 296 +++++++++++++++++++++++++++++++++++-----------------------
15 1 files changed, 180 insertions(+), 116 deletions(-)
17 diff --git a/vma-writer.c b/vma-writer.c
18 index 688af4b..e18591e 100644
23 #define WRITE_BUFFERS 5
25 -typedef struct VmaAIOCB VmaAIOCB;
28 +typedef struct WriteBuffer {
29 unsigned char buffer[VMA_MAX_EXTENT_SIZE];
35 +typedef struct WriterThread {
42 + WriteBuffer wbuf[WRITE_BUFFERS];
47 @@ -61,8 +68,7 @@ struct VmaWriter {
48 int outbuf_count; /* in VMA_BLOCKS */
49 uint64_t outbuf_block_info[VMA_BLOCKS_PER_EXTENT];
51 - VmaAIOCB aiocbs[WRITE_BUFFERS];
57 @@ -88,6 +94,80 @@ struct VmaWriter {
58 uint32_t config_count;
61 +static gpointer vma_writer_thread(gpointer data)
63 + WriterThread *wt = (WriterThread *)data;
66 + WriteBuffer *b = NULL;
68 + g_mutex_lock(wt->mutex);
70 + for (i = 0; i < WRITE_BUFFERS; i++) {
71 + if (wt->wbuf[i].bytes) {
76 + g_mutex_unlock(wt->mutex);
80 + while (done < b->bytes) {
81 + int ret = write(wt->fd, b->buffer + done, b->bytes - done);
84 + } else if (ret < 0) {
85 + if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
86 + g_mutex_lock(wt->mutex);
88 + g_mutex_unlock(wt->mutex);
91 + } else if (ret == 0) {
92 + /* should not happen - simply try again */
95 + g_mutex_lock(wt->mutex);
97 + g_mutex_unlock(wt->mutex);
101 + DPRINTF("WRITER THREAD ERROR %d - exit thread\n", wt->error);
102 + g_thread_exit(NULL);
105 + g_mutex_lock(wt->mutex);
106 + bool cancel = wt->cancel;
107 + if (!b && !cancel) {
108 + g_cond_wait(wt->change_cond, wt->mutex);
110 + g_mutex_unlock(wt->mutex);
113 + DPRINTF("END WRITER THREAD\n");
114 + g_thread_exit(NULL);
121 +static void vma_stop_writer_thread(VmaWriter *vmaw)
125 + if (vmaw->wt.thread) {
126 + g_mutex_lock(vmaw->wt.mutex);
127 + vmaw->wt.cancel = true;
128 + g_cond_signal(vmaw->wt.change_cond);
129 + g_mutex_unlock(vmaw->wt.mutex);
130 + g_thread_join(vmaw->wt.thread);
131 + vmaw->wt.thread = NULL;
135 void vma_writer_set_error(VmaWriter *vmaw, const char *fmt, ...)
138 @@ -215,111 +295,47 @@ int vma_writer_register_stream(VmaWriter *vmaw, const char *devname,
142 -static void vma_co_continue_write(void *opaque)
144 - VmaWriter *vmaw = opaque;
146 - qemu_aio_set_fd_handler(vmaw->fd, NULL, NULL, NULL, NULL);
148 - DPRINTF("vma_co_continue_write\n");
149 - qemu_coroutine_enter(vmaw->co_writer, NULL);
152 -static ssize_t coroutine_fn
153 -vma_co_write(VmaWriter *vmaw, const void *buf, size_t bytes)
158 - /* atomic writes (we cannot interleave writes) */
159 - qemu_co_mutex_lock(&vmaw->writer_lock);
161 - DPRINTF("vma_co_write enter %zd\n", bytes);
163 - while (done < bytes) {
164 - ret = write(vmaw->fd, buf + done, bytes - done);
167 - DPRINTF("vma_co_write written %zd %zd\n", done, ret);
168 - } else if (ret < 0) {
169 - if (errno == EAGAIN || errno == EWOULDBLOCK) {
170 - DPRINTF("vma_co_write yield %zd\n", done);
172 - vmaw->co_writer = qemu_coroutine_self();
173 - qemu_aio_set_fd_handler(vmaw->fd, NULL, vma_co_continue_write,
176 - qemu_coroutine_yield();
177 - DPRINTF("vma_co_write restart %zd\n", done);
179 - vma_writer_set_error(vmaw, "vma_co_write write error - %s",
181 - done = -1; /* always return failure for partial writes */
184 - } else if (ret == 0) {
185 - /* should not happen - simply try again */
189 - qemu_co_mutex_unlock(&vmaw->writer_lock);
191 - DPRINTF("vma_co_write leave %zd\n", done);
195 -static void coroutine_fn vma_co_writer_task(void *opaque)
197 - VmaAIOCB *cb = opaque;
199 - DPRINTF("vma_co_writer_task start\n");
201 - int64_t done = vma_co_write(cb->vmaw, cb->buffer, cb->bytes);
202 - DPRINTF("vma_co_writer_task write done %zd\n", done);
204 - if (done != cb->bytes) {
205 - DPRINTF("vma_co_writer_task failed write %zd %zd", cb->bytes, done);
206 - vma_writer_set_error(cb->vmaw, "vma_co_writer_task failed write %zd",
212 - qemu_co_queue_next(&cb->vmaw->wqueue);
214 - DPRINTF("vma_co_writer_task end\n");
217 static void coroutine_fn vma_queue_flush(VmaWriter *vmaw)
219 DPRINTF("vma_queue_flush enter\n");
227 - VmaAIOCB *cb = NULL;
228 + WriteBuffer *b = NULL;
229 + g_mutex_lock(vmaw->wt.mutex);
231 + error = vmaw->wt.error;
233 for (i = 0; i < WRITE_BUFFERS; i++) {
234 - if (vmaw->aiocbs[i].bytes) {
235 - cb = &vmaw->aiocbs[i];
236 - DPRINTF("FOUND USED AIO BUFFER %d %zd\n", i,
237 - vmaw->aiocbs[i].bytes);
238 + if (vmaw->wt.wbuf[i].bytes) {
239 + b = &vmaw->wt.wbuf[i];
240 + DPRINTF("FOUND USED WRITE BUFFER %d %zd\n", i,
241 + vmaw->wt.wbuf[i].bytes);
246 + g_mutex_unlock(vmaw->wt.mutex);
251 - qemu_co_queue_wait(&vmaw->wqueue);
252 + uint64_t delay_ns = 10000;
253 + DPRINTF("WAIT FOR BUFFER FLUSH %zd\n", delay_ns);
254 + co_sleep_ns(rt_clock, delay_ns);
258 + vma_writer_set_error(vmaw, "vma_queue_flush write error - %s",
262 DPRINTF("vma_queue_flush leave\n");
266 - * NOTE: pipe buffer size in only 4096 bytes on linux (see 'ulimit -a')
267 - * So we need to create a coroutione to allow 'parallel' execution.
269 static ssize_t coroutine_fn
270 vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes)
272 @@ -329,29 +345,46 @@ vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes)
274 assert(bytes <= VMA_MAX_EXTENT_SIZE);
276 - VmaAIOCB *cb = NULL;
280 + /* wait for a free output buffer */
281 + g_mutex_lock(vmaw->wt.mutex);
282 + WriteBuffer *b = NULL;
284 + error = vmaw->wt.error;
286 + g_mutex_unlock(vmaw->wt.mutex);
287 + vma_writer_set_error(vmaw, "vma_queue_write error - %s",
293 for (i = 0; i < WRITE_BUFFERS; i++) {
294 - if (!vmaw->aiocbs[i].bytes) {
295 - cb = &vmaw->aiocbs[i];
296 + if (!vmaw->wt.wbuf[i].bytes) {
297 + b = &vmaw->wt.wbuf[i];
302 - qemu_co_queue_wait(&vmaw->wqueue);
304 + uint64_t delay_ns = 10000;
305 + DPRINTF("WAIT FOR BUFFER %zd\n", delay_ns);
306 + g_mutex_unlock(vmaw->wt.mutex);
307 + co_sleep_ns(rt_clock, delay_ns);
308 + g_mutex_lock(vmaw->wt.mutex);
312 - memcpy(cb->buffer, buf, bytes);
315 + /* copy data to output buffer */
316 + memcpy(b->buffer, buf, bytes);
319 + /* signal writer thread that we have new data */
320 + g_cond_signal(vmaw->wt.change_cond);
322 - DPRINTF("vma_queue_write start %zd\n", bytes);
323 - cb->co = qemu_coroutine_create(vma_co_writer_task);
324 - qemu_coroutine_enter(cb->co, cb);
325 + g_mutex_unlock(vmaw->wt.mutex);
327 - DPRINTF("vma_queue_write leave\n");
328 + DPRINTF("vma_queue_write queued %zd\n", bytes);
332 @@ -389,10 +422,10 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, int64_t speed,
333 const char *tmp_id_str;
335 if ((stat(filename, &st) == 0) && S_ISFIFO(st.st_mode)) {
336 - oflags = O_NONBLOCK|O_WRONLY;
338 vmaw->fd = qemu_open(filename, oflags, 0644);
339 } else if (strstart(filename, "/dev/fdset/", &tmp_id_str)) {
340 - oflags = O_NONBLOCK|O_WRONLY;
342 vmaw->fd = qemu_open(filename, oflags, 0644);
343 } else if (strstart(filename, "/dev/fdname/", &tmp_id_str)) {
344 vmaw->fd = monitor_get_fd(cur_mon, tmp_id_str, errp);
345 @@ -400,7 +433,7 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, int64_t speed,
349 - oflags = O_NONBLOCK|O_WRONLY|O_CREAT|O_EXCL;
350 + oflags = O_WRONLY|O_CREAT|O_EXCL;
351 vmaw->fd = qemu_open(filename, oflags, 0644);
354 @@ -418,7 +451,6 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, int64_t speed,
356 qemu_co_mutex_init(&vmaw->writer_lock);
357 qemu_co_mutex_init(&vmaw->flush_lock);
358 - qemu_co_queue_init(&vmaw->wqueue);
360 uuid_copy(vmaw->uuid, uuid);
362 @@ -428,6 +460,15 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, int64_t speed,
364 ratelimit_set_speed(&vmaw->limit, speed, 100000000ULL /* 0.1 sec */);
366 + vmaw->wt.mutex = g_mutex_new();
367 + vmaw->wt.change_cond = g_cond_new();
368 + vmaw->wt.fd = vmaw->fd;
369 + vmaw->wt.thread = g_thread_create(vma_writer_thread, &vmaw->wt, true, NULL);
370 + if (vmaw->wt.thread == NULL) {
371 + error_setg(errp, "can't allocate writer thread\n");
378 @@ -442,6 +483,14 @@ err:
379 g_checksum_free(vmaw->md5csum);
382 + if (vmaw->wt.mutex) {
383 + g_mutex_free(vmaw->wt.mutex);
386 + if (vmaw->wt.change_cond) {
387 + g_cond_free(vmaw->wt.change_cond);
393 @@ -688,6 +737,16 @@ vma_writer_write(VmaWriter *vmaw, uint8_t dev_id, int64_t cluster_num,
397 + g_mutex_lock(vmaw->wt.mutex);
398 + int error = vmaw->wt.error;
399 + g_mutex_unlock(vmaw->wt.mutex);
402 + vma_writer_set_error(vmaw, "vma_writer_get_buffer write error - %s",
407 if (vmaw->status < 0) {
410 @@ -801,11 +860,7 @@ int vma_writer_close(VmaWriter *vmaw, Error **errp)
412 vma_queue_flush(vmaw);
414 - /* this should not happen - just to be sure */
415 - while (!qemu_co_queue_empty(&vmaw->wqueue)) {
416 - DPRINTF("vma_writer_close wait\n");
417 - co_sleep_ns(rt_clock, 1000000);
419 + vma_stop_writer_thread(vmaw);
422 if (pclose(vmaw->cmd) < 0) {
423 @@ -851,8 +906,9 @@ void vma_writer_destroy(VmaWriter *vmaw)
428 + vma_stop_writer_thread(vmaw);
431 for (i = 0; i <= 255; i++) {
432 if (vmaw->stream_info[i].devname) {
433 g_free(vmaw->stream_info[i].devname);
434 @@ -863,6 +919,14 @@ void vma_writer_destroy(VmaWriter *vmaw)
435 g_checksum_free(vmaw->md5csum);
438 + if (vmaw->wt.mutex) {
439 + g_mutex_free(vmaw->wt.mutex);
442 + if (vmaw->wt.change_cond) {
443 + g_cond_free(vmaw->wt.change_cond);