+From 9cf799746fb3b21362fd62574cd76bfa14a24070 Mon Sep 17 00:00:00 2001
+From: Dietmar Maurer <dietmar@proxmox.com>
+Date: Mon, 14 Jan 2013 08:05:40 +0100
+Subject: [PATCH v3 7/7] use extra thread for vma writer
+
+The previous AIO approach has problem with bdrv_drain_all(), because writer
+coroutines are not considered there. Those coroutines are not restarted, so
+bdrv_drain_all() can fail (tracked_requests list not empty).
+
+We now use a thread, so we could also add compression here.
+
+Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
+---
+ vma-writer.c | 296 +++++++++++++++++++++++++++++++++++-----------------------
+ 1 files changed, 180 insertions(+), 116 deletions(-)
+
+diff --git a/vma-writer.c b/vma-writer.c
+index 688af4b..e18591e 100644
+--- a/vma-writer.c
++++ b/vma-writer.c
+@@ -38,13 +38,20 @@
+
+ #define WRITE_BUFFERS 5
+
+-typedef struct VmaAIOCB VmaAIOCB;
+-struct VmaAIOCB {
+- VmaWriter *vmaw;
++typedef struct WriteBuffer {
+ unsigned char buffer[VMA_MAX_EXTENT_SIZE];
+ size_t bytes;
+- Coroutine *co;
+-};
++} WriteBuffer;
++
++typedef struct WriterThread {
++ int fd;
++ int error;
++ bool cancel;
++ GThread *thread;
++ GMutex *mutex;
++ GCond *change_cond;
++ WriteBuffer wbuf[WRITE_BUFFERS];
++} WriterThread;
+
+ struct VmaWriter {
+ int fd;
+@@ -61,8 +68,7 @@ struct VmaWriter {
+ int outbuf_count; /* in VMA_BLOCKS */
+ uint64_t outbuf_block_info[VMA_BLOCKS_PER_EXTENT];
+
+- VmaAIOCB aiocbs[WRITE_BUFFERS];
+- CoQueue wqueue;
++ WriterThread wt;
+
+ GChecksum *md5csum;
+ CoMutex writer_lock;
+@@ -88,6 +94,80 @@ struct VmaWriter {
+ uint32_t config_count;
+ };
+
++static gpointer vma_writer_thread(gpointer data)
++{
++ WriterThread *wt = (WriterThread *)data;
++
++ while (1) {
++ WriteBuffer *b = NULL;
++
++ g_mutex_lock(wt->mutex);
++ int i;
++ for (i = 0; i < WRITE_BUFFERS; i++) {
++ if (wt->wbuf[i].bytes) {
++ b = &wt->wbuf[i];
++ break;
++ }
++ }
++ g_mutex_unlock(wt->mutex);
++
++ if (b) {
++ size_t done = 0;
++ while (done < b->bytes) {
++ int ret = write(wt->fd, b->buffer + done, b->bytes - done);
++ if (ret > 0) {
++ done += ret;
++ } else if (ret < 0) {
++ if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
++ g_mutex_lock(wt->mutex);
++ wt->error = errno;
++ g_mutex_unlock(wt->mutex);
++ break;
++ }
++ } else if (ret == 0) {
++ /* should not happen - simply try again */
++ }
++ }
++ g_mutex_lock(wt->mutex);
++ b->bytes = 0;
++ g_mutex_unlock(wt->mutex);
++ }
++
++ if (wt->error) {
++ DPRINTF("WRITER THREAD ERROR %d - exit thread\n", wt->error);
++ g_thread_exit(NULL);
++ }
++
++ g_mutex_lock(wt->mutex);
++ bool cancel = wt->cancel;
++ if (!b && !cancel) {
++ g_cond_wait(wt->change_cond, wt->mutex);
++ }
++ g_mutex_unlock(wt->mutex);
++
++ if (cancel) {
++ DPRINTF("END WRITER THREAD\n");
++ g_thread_exit(NULL);
++ }
++ }
++
++ return NULL;
++}
++
++static void vma_stop_writer_thread(VmaWriter *vmaw)
++{
++ assert(vmaw);
++
++ if (vmaw->wt.thread) {
++ g_mutex_lock(vmaw->wt.mutex);
++ vmaw->wt.cancel = true;
++ g_cond_signal(vmaw->wt.change_cond);
++ g_mutex_unlock(vmaw->wt.mutex);
++ g_thread_join(vmaw->wt.thread);
++ vmaw->wt.thread = NULL;
++ }
++}
++
+ void vma_writer_set_error(VmaWriter *vmaw, const char *fmt, ...)
+ {
+ va_list ap;
+@@ -215,111 +295,47 @@ int vma_writer_register_stream(VmaWriter *vmaw, const char *devname,
+ return n;
+ }
+
+-static void vma_co_continue_write(void *opaque)
+-{
+- VmaWriter *vmaw = opaque;
+-
+- qemu_aio_set_fd_handler(vmaw->fd, NULL, NULL, NULL, NULL);
+-
+- DPRINTF("vma_co_continue_write\n");
+- qemu_coroutine_enter(vmaw->co_writer, NULL);
+-}
+-
+-static ssize_t coroutine_fn
+-vma_co_write(VmaWriter *vmaw, const void *buf, size_t bytes)
+-{
+- size_t done = 0;
+- ssize_t ret;
+-
+- /* atomic writes (we cannot interleave writes) */
+- qemu_co_mutex_lock(&vmaw->writer_lock);
+-
+- DPRINTF("vma_co_write enter %zd\n", bytes);
+-
+- while (done < bytes) {
+- ret = write(vmaw->fd, buf + done, bytes - done);
+- if (ret > 0) {
+- done += ret;
+- DPRINTF("vma_co_write written %zd %zd\n", done, ret);
+- } else if (ret < 0) {
+- if (errno == EAGAIN || errno == EWOULDBLOCK) {
+- DPRINTF("vma_co_write yield %zd\n", done);
+-
+- vmaw->co_writer = qemu_coroutine_self();
+- qemu_aio_set_fd_handler(vmaw->fd, NULL, vma_co_continue_write,
+- NULL, vmaw);
+-
+- qemu_coroutine_yield();
+- DPRINTF("vma_co_write restart %zd\n", done);
+- } else {
+- vma_writer_set_error(vmaw, "vma_co_write write error - %s",
+- strerror(errno));
+- done = -1; /* always return failure for partial writes */
+- break;
+- }
+- } else if (ret == 0) {
+- /* should not happen - simply try again */
+- }
+- }
+-
+- qemu_co_mutex_unlock(&vmaw->writer_lock);
+-
+- DPRINTF("vma_co_write leave %zd\n", done);
+- return done;
+-}
+-
+-static void coroutine_fn vma_co_writer_task(void *opaque)
+-{
+- VmaAIOCB *cb = opaque;
+-
+- DPRINTF("vma_co_writer_task start\n");
+-
+- int64_t done = vma_co_write(cb->vmaw, cb->buffer, cb->bytes);
+- DPRINTF("vma_co_writer_task write done %zd\n", done);
+-
+- if (done != cb->bytes) {
+- DPRINTF("vma_co_writer_task failed write %zd %zd", cb->bytes, done);
+- vma_writer_set_error(cb->vmaw, "vma_co_writer_task failed write %zd",
+- done);
+- }
+-
+- cb->bytes = 0;
+-
+- qemu_co_queue_next(&cb->vmaw->wqueue);
+-
+- DPRINTF("vma_co_writer_task end\n");
+-}
+-
+ static void coroutine_fn vma_queue_flush(VmaWriter *vmaw)
+ {
+ DPRINTF("vma_queue_flush enter\n");
+
+ assert(vmaw);
+
++ int error;
++
+ while (1) {
+ int i;
+- VmaAIOCB *cb = NULL;
++ WriteBuffer *b = NULL;
++ g_mutex_lock(vmaw->wt.mutex);
++
++ error = vmaw->wt.error;
++
+ for (i = 0; i < WRITE_BUFFERS; i++) {
+- if (vmaw->aiocbs[i].bytes) {
+- cb = &vmaw->aiocbs[i];
+- DPRINTF("FOUND USED AIO BUFFER %d %zd\n", i,
+- vmaw->aiocbs[i].bytes);
++ if (vmaw->wt.wbuf[i].bytes) {
++ b = &vmaw->wt.wbuf[i];
++ DPRINTF("FOUND USED WRITE BUFFER %d %zd\n", i,
++ vmaw->wt.wbuf[i].bytes);
+ break;
+ }
+ }
+- if (!cb) {
++ g_mutex_unlock(vmaw->wt.mutex);
++
++ if (!b || error) {
+ break;
+ }
+- qemu_co_queue_wait(&vmaw->wqueue);
++ uint64_t delay_ns = 10000;
++ DPRINTF("WAIT FOR BUFFER FLUSH %zd\n", delay_ns);
++ co_sleep_ns(rt_clock, delay_ns);
++ }
++
++ if (error) {
++ vma_writer_set_error(vmaw, "vma_queue_flush write error - %s",
++ strerror(error));
+ }
+
+ DPRINTF("vma_queue_flush leave\n");
+ }
+
+-/**
+- * NOTE: pipe buffer size in only 4096 bytes on linux (see 'ulimit -a')
+- * So we need to create a coroutione to allow 'parallel' execution.
+- */
+ static ssize_t coroutine_fn
+ vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes)
+ {
+@@ -329,29 +345,46 @@ vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes)
+ assert(buf);
+ assert(bytes <= VMA_MAX_EXTENT_SIZE);
+
+- VmaAIOCB *cb = NULL;
+- while (!cb) {
++ int error = 0;
++
++ /* wait for a free output buffer */
++ g_mutex_lock(vmaw->wt.mutex);
++ WriteBuffer *b = NULL;
++ while (!b) {
++ error = vmaw->wt.error;
++ if (error) {
++ g_mutex_unlock(vmaw->wt.mutex);
++ vma_writer_set_error(vmaw, "vma_queue_write error - %s",
++ strerror(error));
++ return -1;
++ }
++
+ int i;
+ for (i = 0; i < WRITE_BUFFERS; i++) {
+- if (!vmaw->aiocbs[i].bytes) {
+- cb = &vmaw->aiocbs[i];
++ if (!vmaw->wt.wbuf[i].bytes) {
++ b = &vmaw->wt.wbuf[i];
+ break;
+ }
+ }
+- if (!cb) {
+- qemu_co_queue_wait(&vmaw->wqueue);
++ if (!b) {
++ uint64_t delay_ns = 10000;
++ DPRINTF("WAIT FOR BUFFER %zd\n", delay_ns);
++ g_mutex_unlock(vmaw->wt.mutex);
++ co_sleep_ns(rt_clock, delay_ns);
++ g_mutex_lock(vmaw->wt.mutex);
+ }
+ }
+
+- memcpy(cb->buffer, buf, bytes);
+- cb->bytes = bytes;
+- cb->vmaw = vmaw;
++ /* copy data to output buffer */
++ memcpy(b->buffer, buf, bytes);
++ b->bytes = bytes;
++
++ /* signal writer thread that we have new data */
++ g_cond_signal(vmaw->wt.change_cond);
+
+- DPRINTF("vma_queue_write start %zd\n", bytes);
+- cb->co = qemu_coroutine_create(vma_co_writer_task);
+- qemu_coroutine_enter(cb->co, cb);
++ g_mutex_unlock(vmaw->wt.mutex);
+
+- DPRINTF("vma_queue_write leave\n");
++ DPRINTF("vma_queue_write queued %zd\n", bytes);
+
+ return bytes;
+ }
+@@ -389,10 +422,10 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, int64_t speed,
+ const char *tmp_id_str;
+
+ if ((stat(filename, &st) == 0) && S_ISFIFO(st.st_mode)) {
+- oflags = O_NONBLOCK|O_WRONLY;
++ oflags = O_WRONLY;
+ vmaw->fd = qemu_open(filename, oflags, 0644);
+ } else if (strstart(filename, "/dev/fdset/", &tmp_id_str)) {
+- oflags = O_NONBLOCK|O_WRONLY;
++ oflags = O_WRONLY;
+ vmaw->fd = qemu_open(filename, oflags, 0644);
+ } else if (strstart(filename, "/dev/fdname/", &tmp_id_str)) {
+ vmaw->fd = monitor_get_fd(cur_mon, tmp_id_str, errp);
+@@ -400,7 +433,7 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, int64_t speed,
+ goto err;
+ }
+ } else {
+- oflags = O_NONBLOCK|O_WRONLY|O_CREAT|O_EXCL;
++ oflags = O_WRONLY|O_CREAT|O_EXCL;
+ vmaw->fd = qemu_open(filename, oflags, 0644);
+ }
+
+@@ -418,7 +451,6 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, int64_t speed,
+
+ qemu_co_mutex_init(&vmaw->writer_lock);
+ qemu_co_mutex_init(&vmaw->flush_lock);
+- qemu_co_queue_init(&vmaw->wqueue);
+
+ uuid_copy(vmaw->uuid, uuid);
+
+@@ -428,6 +460,15 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, int64_t speed,
+
+ ratelimit_set_speed(&vmaw->limit, speed, 100000000ULL /* 0.1 sec */);
+
++ vmaw->wt.mutex = g_mutex_new();
++ vmaw->wt.change_cond = g_cond_new();
++ vmaw->wt.fd = vmaw->fd;
++ vmaw->wt.thread = g_thread_create(vma_writer_thread, &vmaw->wt, true, NULL);
++ if (vmaw->wt.thread == NULL) {
++ error_setg(errp, "can't allocate writer thread\n");
++ goto err;
++ }
++
+ return vmaw;
+
+ err:
+@@ -442,6 +483,14 @@ err:
+ g_checksum_free(vmaw->md5csum);
+ }
+
++ if (vmaw->wt.mutex) {
++ g_mutex_free(vmaw->wt.mutex);
++ }
++
++ if (vmaw->wt.change_cond) {
++ g_cond_free(vmaw->wt.change_cond);
++ }
++
+ g_free(vmaw);
+ }
+
+@@ -688,6 +737,16 @@ vma_writer_write(VmaWriter *vmaw, uint8_t dev_id, int64_t cluster_num,
+
+ *zero_bytes = 0;
+
++ g_mutex_lock(vmaw->wt.mutex);
++ int error = vmaw->wt.error;
++ g_mutex_unlock(vmaw->wt.mutex);
++
++ if (error) {
++ vma_writer_set_error(vmaw, "vma_writer_get_buffer write error - %s",
++ strerror(error));
++ return -1;
++ }
++
+ if (vmaw->status < 0) {
+ return vmaw->status;
+ }
+@@ -801,11 +860,7 @@ int vma_writer_close(VmaWriter *vmaw, Error **errp)
+
+ vma_queue_flush(vmaw);
+
+- /* this should not happen - just to be sure */
+- while (!qemu_co_queue_empty(&vmaw->wqueue)) {
+- DPRINTF("vma_writer_close wait\n");
+- co_sleep_ns(rt_clock, 1000000);
+- }
++ vma_stop_writer_thread(vmaw);
+
+ if (vmaw->cmd) {
+ if (pclose(vmaw->cmd) < 0) {
+@@ -851,8 +906,9 @@ void vma_writer_destroy(VmaWriter *vmaw)
+ {
+ assert(vmaw);
+
+- int i;
++ vma_stop_writer_thread(vmaw);
+
++ int i;
+ for (i = 0; i <= 255; i++) {
+ if (vmaw->stream_info[i].devname) {
+ g_free(vmaw->stream_info[i].devname);
+@@ -863,6 +919,14 @@ void vma_writer_destroy(VmaWriter *vmaw)
+ g_checksum_free(vmaw->md5csum);
+ }
+
++ if (vmaw->wt.mutex) {
++ g_mutex_free(vmaw->wt.mutex);
++ }
++
++ if (vmaw->wt.change_cond) {
++ g_cond_free(vmaw->wt.change_cond);
++ }
++
+ g_free(vmaw);
+ }
+
+--
+1.7.2.5
+