-From 9cf799746fb3b21362fd62574cd76bfa14a24070 Mon Sep 17 00:00:00 2001
+From a6f324d47b810809de2a6106849527c6a9590175 Mon Sep 17 00:00:00 2001
From: Dietmar Maurer <dietmar@proxmox.com>
Date: Mon, 14 Jan 2013 08:05:40 +0100
Subject: [PATCH v3 7/7] use extra thread for vma writer
vma-writer.c | 296 +++++++++++++++++++++++++++++++++++-----------------------
1 files changed, 180 insertions(+), 116 deletions(-)
-diff --git a/vma-writer.c b/vma-writer.c
-index 688af4b..e18591e 100644
---- a/vma-writer.c
-+++ b/vma-writer.c
-@@ -38,13 +38,20 @@
+Index: new/vma-writer.c
+===================================================================
+--- new.orig/vma-writer.c 2013-01-23 07:35:12.000000000 +0100
++++ new/vma-writer.c 2013-01-23 09:24:19.000000000 +0100
+@@ -37,13 +37,21 @@
#define WRITE_BUFFERS 5
+ GMutex *mutex;
+ GCond *change_cond;
+ WriteBuffer wbuf[WRITE_BUFFERS];
++ CoQueue wqueue;
+} WriterThread;
struct VmaWriter {
int fd;
-@@ -61,8 +68,7 @@ struct VmaWriter {
+@@ -60,8 +68,7 @@
int outbuf_count; /* in VMA_BLOCKS */
uint64_t outbuf_block_info[VMA_BLOCKS_PER_EXTENT];
GChecksum *md5csum;
CoMutex writer_lock;
-@@ -88,6 +94,80 @@ struct VmaWriter {
+@@ -86,6 +93,107 @@
uint32_t config_count;
};
+ while (1) {
+ WriteBuffer *b = NULL;
+
-+ g_mutex_lock(wt->mutex);
++ qemu_mutex_lock_iothread();
+ int i;
+ for (i = 0; i < WRITE_BUFFERS; i++) {
+ if (wt->wbuf[i].bytes) {
+ break;
+ }
+ }
-+ g_mutex_unlock(wt->mutex);
++ qemu_mutex_unlock_iothread();
+
+ if (b) {
+ size_t done = 0;
+ done += ret;
+ } else if (ret < 0) {
+ if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
-+ g_mutex_lock(wt->mutex);
++ qemu_mutex_lock_iothread();
+ wt->error = errno;
-+ g_mutex_unlock(wt->mutex);
++ qemu_mutex_unlock_iothread();
+ break;
+ }
+ } else if (ret == 0) {
+ /* should not happen - simply try again */
+ }
+ }
-+ g_mutex_lock(wt->mutex);
++ qemu_mutex_lock_iothread();
+ b->bytes = 0;
-+ g_mutex_unlock(wt->mutex);
++ DPRINTF("AWAKE JOB %d\n", wt->error);
++ if (wt->error) {
++ for (i = 0; i < WRITE_BUFFERS; i++) {
++ wt->wbuf[i].bytes = 0;
++ }
++ qemu_co_queue_restart_all(&wt->wqueue);
++ } else {
++ qemu_co_queue_next(&wt->wqueue);
++ }
++ qemu_mutex_unlock_iothread();
++ DPRINTF("AWAKE JOB END\n");
+ }
+
+ if (wt->error) {
+ g_mutex_lock(wt->mutex);
+ bool cancel = wt->cancel;
+ if (!b && !cancel) {
++ DPRINTF("WRITER THREAD WAIT FOR DATA\n");
+ g_cond_wait(wt->change_cond, wt->mutex);
++ cancel = wt->cancel;
+ }
+ g_mutex_unlock(wt->mutex);
+
+ if (cancel) {
++ qemu_mutex_lock_iothread();
++ for (i = 0; i < WRITE_BUFFERS; i++) {
++ wt->wbuf[i].bytes = 0;
++ }
++ qemu_co_queue_restart_all(&wt->wqueue);
++ qemu_mutex_unlock_iothread();
+ DPRINTF("END WRITER THREAD\n");
+ g_thread_exit(NULL);
+ }
+{
+ assert(vmaw);
+
++ DPRINTF("vma_stop_writer_thread start\n");
++
+ if (vmaw->wt.thread) {
++ DPRINTF("vma_stop_writer_thread 1\n");
+ g_mutex_lock(vmaw->wt.mutex);
++ DPRINTF("vma_stop_writer_thread 2\n");
+ vmaw->wt.cancel = true;
+ g_cond_signal(vmaw->wt.change_cond);
+ g_mutex_unlock(vmaw->wt.mutex);
++ DPRINTF("vma_stop_writer_thread 3\n");
++ qemu_mutex_unlock_iothread();
+ g_thread_join(vmaw->wt.thread);
++ qemu_mutex_lock_iothread();
++ DPRINTF("vma_stop_writer_thread 4\n");
+ vmaw->wt.thread = NULL;
+ }
++ DPRINTF("vma_stop_writer_thread end\n");
+}
+
void vma_writer_set_error(VmaWriter *vmaw, const char *fmt, ...)
{
va_list ap;
-@@ -215,111 +295,47 @@ int vma_writer_register_stream(VmaWriter *vmaw, const char *devname,
+@@ -213,111 +321,45 @@
return n;
}
int i;
- VmaAIOCB *cb = NULL;
+ WriteBuffer *b = NULL;
-+ g_mutex_lock(vmaw->wt.mutex);
-+
++
+ error = vmaw->wt.error;
+
for (i = 0; i < WRITE_BUFFERS; i++) {
}
}
- if (!cb) {
-+ g_mutex_unlock(vmaw->wt.mutex);
+
+ if (!b || error) {
break;
}
- qemu_co_queue_wait(&vmaw->wqueue);
-+ uint64_t delay_ns = 10000;
-+ DPRINTF("WAIT FOR BUFFER FLUSH %zd\n", delay_ns);
-+ co_sleep_ns(rt_clock, delay_ns);
++ DPRINTF("WAIT FOR BUFFER FLUSH\n");
++ qemu_co_queue_wait(&vmaw->wt.wqueue);
++ DPRINTF("WAIT FOR BUFFER FLUSH END\n");
+ }
+
+ if (error) {
static ssize_t coroutine_fn
vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes)
{
-@@ -329,29 +345,46 @@ vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes)
+@@ -327,29 +369,42 @@
assert(buf);
assert(bytes <= VMA_MAX_EXTENT_SIZE);
+ int error = 0;
+
+ /* wait for a free output buffer */
-+ g_mutex_lock(vmaw->wt.mutex);
+ WriteBuffer *b = NULL;
+ while (!b) {
+ error = vmaw->wt.error;
+ if (error) {
-+ g_mutex_unlock(vmaw->wt.mutex);
-+ vma_writer_set_error(vmaw, "vma_queue_write error - %s",
++ vma_writer_set_error(vmaw, "vma_queue_write error - %s",
+ strerror(error));
+ return -1;
+ }
- if (!cb) {
- qemu_co_queue_wait(&vmaw->wqueue);
+ if (!b) {
-+ uint64_t delay_ns = 10000;
-+ DPRINTF("WAIT FOR BUFFER %zd\n", delay_ns);
-+ g_mutex_unlock(vmaw->wt.mutex);
-+ co_sleep_ns(rt_clock, delay_ns);
-+ g_mutex_lock(vmaw->wt.mutex);
++ DPRINTF("WAIT FOR BUFFER\n");
++ qemu_co_queue_wait(&vmaw->wt.wqueue);
++ DPRINTF("WAIT FOR BUFFER DONE\n");
}
}
+ /* copy data to output buffer */
+ memcpy(b->buffer, buf, bytes);
+ b->bytes = bytes;
-+
-+ /* signal writer thread that we have new data */
-+ g_cond_signal(vmaw->wt.change_cond);
- DPRINTF("vma_queue_write start %zd\n", bytes);
- cb->co = qemu_coroutine_create(vma_co_writer_task);
- qemu_coroutine_enter(cb->co, cb);
++ g_mutex_lock(vmaw->wt.mutex);
++ /* signal writer thread that we have new data */
++ g_cond_signal(vmaw->wt.change_cond);
+ g_mutex_unlock(vmaw->wt.mutex);
- DPRINTF("vma_queue_write leave\n");
return bytes;
}
-@@ -389,10 +422,10 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, int64_t speed,
+@@ -386,10 +441,10 @@
const char *tmp_id_str;
if ((stat(filename, &st) == 0) && S_ISFIFO(st.st_mode)) {
vmaw->fd = qemu_open(filename, oflags, 0644);
} else if (strstart(filename, "/dev/fdname/", &tmp_id_str)) {
vmaw->fd = monitor_get_fd(cur_mon, tmp_id_str, errp);
-@@ -400,7 +433,7 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, int64_t speed,
+@@ -397,7 +452,7 @@
goto err;
}
} else {
vmaw->fd = qemu_open(filename, oflags, 0644);
}
-@@ -418,7 +451,6 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, int64_t speed,
+@@ -415,10 +470,19 @@
qemu_co_mutex_init(&vmaw->writer_lock);
qemu_co_mutex_init(&vmaw->flush_lock);
- qemu_co_queue_init(&vmaw->wqueue);
++ qemu_co_queue_init(&vmaw->wt.wqueue);
uuid_copy(vmaw->uuid, uuid);
-@@ -428,6 +460,15 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, int64_t speed,
-
- ratelimit_set_speed(&vmaw->limit, speed, 100000000ULL /* 0.1 sec */);
-
+ vmaw->wt.mutex = g_mutex_new();
+ vmaw->wt.change_cond = g_cond_new();
+ vmaw->wt.fd = vmaw->fd;
return vmaw;
err:
-@@ -442,6 +483,14 @@ err:
+@@ -433,6 +497,14 @@
g_checksum_free(vmaw->md5csum);
}
g_free(vmaw);
}
-@@ -688,6 +737,16 @@ vma_writer_write(VmaWriter *vmaw, uint8_t dev_id, int64_t cluster_num,
+@@ -672,6 +744,14 @@
*zero_bytes = 0;
-+ g_mutex_lock(vmaw->wt.mutex);
+ int error = vmaw->wt.error;
-+ g_mutex_unlock(vmaw->wt.mutex);
+
+ if (error) {
+ vma_writer_set_error(vmaw, "vma_writer_get_buffer write error - %s",
if (vmaw->status < 0) {
return vmaw->status;
}
-@@ -801,11 +860,7 @@ int vma_writer_close(VmaWriter *vmaw, Error **errp)
+@@ -783,14 +863,17 @@
+
+ int i;
++ DPRINTF("vma_writer_close start\n");
vma_queue_flush(vmaw);
-- /* this should not happen - just to be sure */
+ /* this should not happen - just to be sure */
- while (!qemu_co_queue_empty(&vmaw->wqueue)) {
-- DPRINTF("vma_writer_close wait\n");
-- co_sleep_ns(rt_clock, 1000000);
-- }
-+ vma_stop_writer_thread(vmaw);
++ while (!qemu_co_queue_empty(&vmaw->wt.wqueue)) {
+ DPRINTF("vma_writer_close wait\n");
+ co_sleep_ns(rt_clock, 1000000);
+ }
++ vma_stop_writer_thread(vmaw);
++
if (vmaw->cmd) {
if (pclose(vmaw->cmd) < 0) {
-@@ -851,8 +906,9 @@ void vma_writer_destroy(VmaWriter *vmaw)
+ vma_writer_set_error(vmaw, "vma_writer_close: "
+@@ -835,8 +918,9 @@
{
assert(vmaw);
for (i = 0; i <= 255; i++) {
if (vmaw->stream_info[i].devname) {
g_free(vmaw->stream_info[i].devname);
-@@ -863,6 +919,14 @@ void vma_writer_destroy(VmaWriter *vmaw)
+@@ -847,6 +931,14 @@
g_checksum_free(vmaw->md5csum);
}
g_free(vmaw);
}
---
-1.7.2.5
-