]> git.proxmox.com Git - pve-qemu-kvm.git/blobdiff - debian/patches/old/0007-use-extra-thread-for-vma-writer.patch
refer to the new repository
[pve-qemu-kvm.git] / debian / patches / old / 0007-use-extra-thread-for-vma-writer.patch
diff --git a/debian/patches/old/0007-use-extra-thread-for-vma-writer.patch b/debian/patches/old/0007-use-extra-thread-for-vma-writer.patch
deleted file mode 100644 (file)
index d6d1f5f..0000000
+++ /dev/null
@@ -1,471 +0,0 @@
-From a6f324d47b810809de2a6106849527c6a9590175 Mon Sep 17 00:00:00 2001
-From: Dietmar Maurer <dietmar@proxmox.com>
-Date: Mon, 14 Jan 2013 08:05:40 +0100
-Subject: [PATCH v3 7/7] use extra thread for vma writer
-
-The previous AIO approach has problem with bdrv_drain_all(), because writer
-coroutines are not considered there. Those coroutines are not restarted, so
-bdrv_drain_all() can fail (tracked_requests list not empty).
-
-We now use a thread, so we could also add compression here.
-
-Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
----
- vma-writer.c |  296 +++++++++++++++++++++++++++++++++++-----------------------
- 1 files changed, 180 insertions(+), 116 deletions(-)
-
-Index: new/vma-writer.c
-===================================================================
---- new.orig/vma-writer.c      2013-01-23 07:35:12.000000000 +0100
-+++ new/vma-writer.c   2013-01-23 09:24:19.000000000 +0100
-@@ -37,13 +37,21 @@
- #define WRITE_BUFFERS 5
--typedef struct VmaAIOCB VmaAIOCB;
--struct VmaAIOCB {
--    VmaWriter *vmaw;
-+typedef struct WriteBuffer {
-     unsigned char buffer[VMA_MAX_EXTENT_SIZE];
-     size_t bytes;
--    Coroutine *co;
--};
-+} WriteBuffer;
-+
-+typedef struct WriterThread {
-+    int fd;
-+    int error;
-+    bool cancel;
-+    GThread *thread;
-+    GMutex *mutex;
-+    GCond *change_cond;
-+    WriteBuffer wbuf[WRITE_BUFFERS];
-+    CoQueue wqueue;
-+} WriterThread;
- struct VmaWriter {
-     int fd;
-@@ -60,8 +68,7 @@
-     int outbuf_count; /* in VMA_BLOCKS */
-     uint64_t outbuf_block_info[VMA_BLOCKS_PER_EXTENT];
--    VmaAIOCB aiocbs[WRITE_BUFFERS];
--    CoQueue wqueue;
-+    WriterThread wt;
-     GChecksum *md5csum;
-     CoMutex writer_lock;
-@@ -86,6 +93,107 @@
-     uint32_t config_count;
- };
-+static gpointer vma_writer_thread(gpointer data)
-+{
-+    WriterThread *wt = (WriterThread *)data;
-+
-+    while (1) {
-+        WriteBuffer *b = NULL;
-+
-+      qemu_mutex_lock_iothread();
-+        int i;
-+        for (i = 0; i < WRITE_BUFFERS; i++) {
-+            if (wt->wbuf[i].bytes) {
-+                b = &wt->wbuf[i];
-+                break;
-+            }
-+        }
-+      qemu_mutex_unlock_iothread();
-+
-+        if (b) {
-+            size_t done = 0;
-+            while (done < b->bytes) {
-+                int ret = write(wt->fd, b->buffer + done, b->bytes - done);
-+                if (ret > 0) {
-+                    done += ret;
-+                } else if (ret < 0) {
-+                    if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
-+                        qemu_mutex_lock_iothread();
-+                        wt->error = errno;
-+                        qemu_mutex_unlock_iothread();
-+                        break;
-+                    }
-+                } else if (ret == 0) {
-+                    /* should not happen - simply try again */
-+                }
-+            }
-+            qemu_mutex_lock_iothread();
-+            b->bytes = 0;
-+            DPRINTF("AWAKE JOB %d\n", wt->error);
-+            if (wt->error) {
-+                for (i = 0; i < WRITE_BUFFERS; i++) {
-+                    wt->wbuf[i].bytes = 0;
-+                }
-+                qemu_co_queue_restart_all(&wt->wqueue);
-+            } else {
-+                qemu_co_queue_next(&wt->wqueue);
-+            }
-+            qemu_mutex_unlock_iothread();
-+            DPRINTF("AWAKE JOB END\n");
-+        }
-+
-+        if (wt->error) {
-+            DPRINTF("WRITER THREAD ERROR %d - exit thread\n", wt->error);
-+            g_thread_exit(NULL);
-+        }
-+
-+        g_mutex_lock(wt->mutex);
-+        bool cancel = wt->cancel;
-+        if (!b && !cancel) {
-+            DPRINTF("WRITER THREAD WAIT FOR DATA\n");         
-+            g_cond_wait(wt->change_cond, wt->mutex);
-+            cancel = wt->cancel;
-+        }
-+        g_mutex_unlock(wt->mutex);
-+
-+        if (cancel) {
-+            qemu_mutex_lock_iothread();
-+            for (i = 0; i < WRITE_BUFFERS; i++) {
-+                wt->wbuf[i].bytes = 0;
-+            }
-+            qemu_co_queue_restart_all(&wt->wqueue);
-+            qemu_mutex_unlock_iothread();
-+            DPRINTF("END WRITER THREAD\n");
-+            g_thread_exit(NULL);
-+        }
-+    }
-+
-+    return NULL;
-+}
-+
-+static void vma_stop_writer_thread(VmaWriter *vmaw)
-+{
-+    assert(vmaw);
-+
-+    DPRINTF("vma_stop_writer_thread start\n");
-+
-+    if (vmaw->wt.thread) {
-+        DPRINTF("vma_stop_writer_thread 1\n");
-+        g_mutex_lock(vmaw->wt.mutex);
-+        DPRINTF("vma_stop_writer_thread 2\n");
-+        vmaw->wt.cancel = true;
-+        g_cond_signal(vmaw->wt.change_cond);
-+        g_mutex_unlock(vmaw->wt.mutex);
-+        DPRINTF("vma_stop_writer_thread 3\n");
-+        qemu_mutex_unlock_iothread();
-+        g_thread_join(vmaw->wt.thread);
-+        qemu_mutex_lock_iothread();
-+        DPRINTF("vma_stop_writer_thread 4\n");
-+        vmaw->wt.thread = NULL;
-+    }
-+    DPRINTF("vma_stop_writer_thread end\n");
-+}
-+
- void vma_writer_set_error(VmaWriter *vmaw, const char *fmt, ...)
- {
-     va_list ap;
-@@ -213,111 +321,45 @@
-     return n;
- }
--static void vma_co_continue_write(void *opaque)
--{
--    VmaWriter *vmaw = opaque;
--
--    qemu_aio_set_fd_handler(vmaw->fd, NULL, NULL, NULL, NULL);
--
--    DPRINTF("vma_co_continue_write\n");
--    qemu_coroutine_enter(vmaw->co_writer, NULL);
--}
--
--static ssize_t coroutine_fn
--vma_co_write(VmaWriter *vmaw, const void *buf, size_t bytes)
--{
--    size_t done = 0;
--    ssize_t ret;
--
--    /* atomic writes (we cannot interleave writes) */
--    qemu_co_mutex_lock(&vmaw->writer_lock);
--
--    DPRINTF("vma_co_write enter %zd\n", bytes);
--
--    while (done < bytes) {
--        ret = write(vmaw->fd, buf + done, bytes - done);
--        if (ret > 0) {
--            done += ret;
--            DPRINTF("vma_co_write written %zd %zd\n", done, ret);
--        } else if (ret < 0) {
--            if (errno == EAGAIN || errno == EWOULDBLOCK) {
--                DPRINTF("vma_co_write yield %zd\n", done);
--
--                vmaw->co_writer = qemu_coroutine_self();
--                qemu_aio_set_fd_handler(vmaw->fd, NULL, vma_co_continue_write,
--                                        NULL, vmaw);
--
--                qemu_coroutine_yield();
--                DPRINTF("vma_co_write restart %zd\n", done);
--            } else {
--                vma_writer_set_error(vmaw, "vma_co_write write error - %s",
--                                     strerror(errno));
--                done = -1; /* always return failure for partial writes */
--                break;
--            }
--        } else if (ret == 0) {
--            /* should not happen - simply try again */
--        }
--    }
--
--    qemu_co_mutex_unlock(&vmaw->writer_lock);
--
--    DPRINTF("vma_co_write leave %zd\n", done);
--    return done;
--}
--
--static void coroutine_fn vma_co_writer_task(void *opaque)
--{
--    VmaAIOCB *cb = opaque;
--
--    DPRINTF("vma_co_writer_task start\n");
--
--    int64_t done = vma_co_write(cb->vmaw, cb->buffer, cb->bytes);
--    DPRINTF("vma_co_writer_task write done %zd\n", done);
--
--    if (done != cb->bytes) {
--        DPRINTF("vma_co_writer_task failed write %zd %zd", cb->bytes, done);
--        vma_writer_set_error(cb->vmaw, "vma_co_writer_task failed write %zd",
--                             done);
--    }
--
--    cb->bytes = 0;
--
--    qemu_co_queue_next(&cb->vmaw->wqueue);
--
--    DPRINTF("vma_co_writer_task end\n");
--}
--
- static void coroutine_fn vma_queue_flush(VmaWriter *vmaw)
- {
-     DPRINTF("vma_queue_flush enter\n");
-     assert(vmaw);
-+    int error;
-+
-     while (1) {
-         int i;
--        VmaAIOCB *cb = NULL;
-+        WriteBuffer *b = NULL;
-+ 
-+        error = vmaw->wt.error;
-+
-         for (i = 0; i < WRITE_BUFFERS; i++) {
--            if (vmaw->aiocbs[i].bytes) {
--                cb = &vmaw->aiocbs[i];
--                DPRINTF("FOUND USED AIO BUFFER %d %zd\n", i,
--                        vmaw->aiocbs[i].bytes);
-+            if (vmaw->wt.wbuf[i].bytes) {
-+                b = &vmaw->wt.wbuf[i];
-+                DPRINTF("FOUND USED WRITE BUFFER %d %zd\n", i,
-+                        vmaw->wt.wbuf[i].bytes);
-                 break;
-             }
-         }
--        if (!cb) {
-+
-+        if (!b || error) {
-             break;
-         }
--        qemu_co_queue_wait(&vmaw->wqueue);
-+        DPRINTF("WAIT FOR BUFFER FLUSH\n");
-+        qemu_co_queue_wait(&vmaw->wt.wqueue);
-+        DPRINTF("WAIT FOR BUFFER FLUSH END\n");
-+    }
-+
-+    if (error) {
-+        vma_writer_set_error(vmaw, "vma_queue_flush write error - %s",
-+                             strerror(error));
-     }
-     DPRINTF("vma_queue_flush leave\n");
- }
--/**
-- * NOTE: pipe buffer size in only 4096 bytes on linux (see 'ulimit -a')
-- * So we need to create a coroutione to allow 'parallel' execution.
-- */
- static ssize_t coroutine_fn
- vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes)
- {
-@@ -327,29 +369,42 @@
-     assert(buf);
-     assert(bytes <= VMA_MAX_EXTENT_SIZE);
--    VmaAIOCB *cb = NULL;
--    while (!cb) {
-+    int error = 0;
-+
-+    /* wait for a free output buffer */
-+    WriteBuffer *b = NULL;
-+    while (!b) {
-+        error = vmaw->wt.error;
-+        if (error) {
-+             vma_writer_set_error(vmaw, "vma_queue_write error - %s",
-+                                 strerror(error));
-+            return -1;
-+        }
-+
-         int i;
-         for (i = 0; i < WRITE_BUFFERS; i++) {
--            if (!vmaw->aiocbs[i].bytes) {
--                cb = &vmaw->aiocbs[i];
-+            if (!vmaw->wt.wbuf[i].bytes) {
-+                b = &vmaw->wt.wbuf[i];
-                 break;
-             }
-         }
--        if (!cb) {
--            qemu_co_queue_wait(&vmaw->wqueue);
-+        if (!b) {
-+            DPRINTF("WAIT FOR BUFFER\n");
-+            qemu_co_queue_wait(&vmaw->wt.wqueue);
-+            DPRINTF("WAIT FOR BUFFER DONE\n");
-         }
-     }
--    memcpy(cb->buffer, buf, bytes);
--    cb->bytes = bytes;
--    cb->vmaw = vmaw;
-+    /* copy data to output buffer */
-+    memcpy(b->buffer, buf, bytes);
-+    b->bytes = bytes;
--    DPRINTF("vma_queue_write start %zd\n", bytes);
--    cb->co = qemu_coroutine_create(vma_co_writer_task);
--    qemu_coroutine_enter(cb->co, cb);
-+    g_mutex_lock(vmaw->wt.mutex);
-+    /* signal writer thread that we have new data */
-+    g_cond_signal(vmaw->wt.change_cond);
-+    g_mutex_unlock(vmaw->wt.mutex);
--    DPRINTF("vma_queue_write leave\n");
-+    DPRINTF("vma_queue_write queued %zd\n", bytes);
-     return bytes;
- }
-@@ -386,10 +441,10 @@
-         const char *tmp_id_str;
-         if ((stat(filename, &st) == 0) && S_ISFIFO(st.st_mode)) {
--            oflags = O_NONBLOCK|O_WRONLY;
-+            oflags = O_WRONLY;
-             vmaw->fd = qemu_open(filename, oflags, 0644);
-         } else if (strstart(filename, "/dev/fdset/", &tmp_id_str)) {
--            oflags = O_NONBLOCK|O_WRONLY;
-+            oflags = O_WRONLY;
-             vmaw->fd = qemu_open(filename, oflags, 0644);
-         } else if (strstart(filename, "/dev/fdname/", &tmp_id_str)) {
-             vmaw->fd = monitor_get_fd(cur_mon, tmp_id_str, errp);
-@@ -397,7 +452,7 @@
-                 goto err;
-             }
-         } else  {
--            oflags = O_NONBLOCK|O_WRONLY|O_CREAT|O_EXCL;
-+            oflags = O_WRONLY|O_CREAT|O_EXCL;
-             vmaw->fd = qemu_open(filename, oflags, 0644);
-         }
-@@ -415,10 +470,19 @@
-     qemu_co_mutex_init(&vmaw->writer_lock);
-     qemu_co_mutex_init(&vmaw->flush_lock);
--    qemu_co_queue_init(&vmaw->wqueue);
-+    qemu_co_queue_init(&vmaw->wt.wqueue);
-     uuid_copy(vmaw->uuid, uuid);
-+    vmaw->wt.mutex = g_mutex_new();
-+    vmaw->wt.change_cond = g_cond_new();
-+    vmaw->wt.fd = vmaw->fd;
-+    vmaw->wt.thread = g_thread_create(vma_writer_thread, &vmaw->wt, true, NULL);
-+    if (vmaw->wt.thread == NULL) {
-+        error_setg(errp, "can't allocate writer thread\n");
-+        goto err;
-+    }
-+
-     return vmaw;
- err:
-@@ -433,6 +497,14 @@
-             g_checksum_free(vmaw->md5csum);
-         }
-+        if (vmaw->wt.mutex) {
-+            g_mutex_free(vmaw->wt.mutex);
-+        }
-+
-+        if (vmaw->wt.change_cond) {
-+            g_cond_free(vmaw->wt.change_cond);
-+        }
-+
-         g_free(vmaw);
-     }
-@@ -672,6 +744,14 @@
-     *zero_bytes = 0;
-+    int error = vmaw->wt.error;
-+
-+    if (error) {
-+        vma_writer_set_error(vmaw, "vma_writer_get_buffer write error - %s",
-+                             strerror(error));
-+        return -1;
-+    }
-+
-     if (vmaw->status < 0) {
-         return vmaw->status;
-     }
-@@ -783,14 +863,17 @@
-     int i;
-+    DPRINTF("vma_writer_close start\n");
-     vma_queue_flush(vmaw);
-     /* this should not happen - just to be sure */
--    while (!qemu_co_queue_empty(&vmaw->wqueue)) {
-+    while (!qemu_co_queue_empty(&vmaw->wt.wqueue)) {
-         DPRINTF("vma_writer_close wait\n");
-         co_sleep_ns(rt_clock, 1000000);
-     }
-+    vma_stop_writer_thread(vmaw);
-+
-     if (vmaw->cmd) {
-         if (pclose(vmaw->cmd) < 0) {
-             vma_writer_set_error(vmaw, "vma_writer_close: "
-@@ -835,8 +918,9 @@
- {
-     assert(vmaw);
--    int i;
-+    vma_stop_writer_thread(vmaw);
-+    int i;
-     for (i = 0; i <= 255; i++) {
-         if (vmaw->stream_info[i].devname) {
-             g_free(vmaw->stream_info[i].devname);
-@@ -847,6 +931,14 @@
-         g_checksum_free(vmaw->md5csum);
-     }
-+    if (vmaw->wt.mutex) {
-+        g_mutex_free(vmaw->wt.mutex);
-+    }
-+
-+    if (vmaw->wt.change_cond) {
-+        g_cond_free(vmaw->wt.change_cond);
-+    }
-+
-     g_free(vmaw);
- }