]> git.proxmox.com Git - pve-qemu-kvm.git/blame - debian/patches/0007-use-extra-thread-for-vma-writer.patch
Two more fixes
[pve-qemu-kvm.git] / debian / patches / 0007-use-extra-thread-for-vma-writer.patch
CommitLineData
ddfd618f 1From a6f324d47b810809de2a6106849527c6a9590175 Mon Sep 17 00:00:00 2001
309874bd
DM
2From: Dietmar Maurer <dietmar@proxmox.com>
3Date: Mon, 14 Jan 2013 08:05:40 +0100
4Subject: [PATCH v3 7/7] use extra thread for vma writer
5
6The previous AIO approach has a problem with bdrv_drain_all(), because writer
7coroutines are not considered there. Those coroutines are not restarted, so
8bdrv_drain_all() can fail (tracked_requests list not empty).
9
10We now use a thread, so we could also add compression here.
11
12Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
13---
14 vma-writer.c | 296 +++++++++++++++++++++++++++++++++++-----------------------
15 1 files changed, 180 insertions(+), 116 deletions(-)
16
2dfd543c
DM
17Index: new/vma-writer.c
18===================================================================
19--- new.orig/vma-writer.c 2013-01-23 07:35:12.000000000 +0100
20+++ new/vma-writer.c 2013-01-23 09:24:19.000000000 +0100
21@@ -37,13 +37,21 @@
309874bd
DM
22
23 #define WRITE_BUFFERS 5
24
25-typedef struct VmaAIOCB VmaAIOCB;
26-struct VmaAIOCB {
27- VmaWriter *vmaw;
28+typedef struct WriteBuffer {
29 unsigned char buffer[VMA_MAX_EXTENT_SIZE];
30 size_t bytes;
31- Coroutine *co;
32-};
33+} WriteBuffer;
34+
35+typedef struct WriterThread {
36+ int fd;
37+ int error;
38+ bool cancel;
39+ GThread *thread;
40+ GMutex *mutex;
41+ GCond *change_cond;
42+ WriteBuffer wbuf[WRITE_BUFFERS];
2dfd543c 43+ CoQueue wqueue;
309874bd
DM
44+} WriterThread;
45
46 struct VmaWriter {
47 int fd;
2dfd543c 48@@ -60,8 +68,7 @@
309874bd
DM
49 int outbuf_count; /* in VMA_BLOCKS */
50 uint64_t outbuf_block_info[VMA_BLOCKS_PER_EXTENT];
51
52- VmaAIOCB aiocbs[WRITE_BUFFERS];
53- CoQueue wqueue;
54+ WriterThread wt;
55
56 GChecksum *md5csum;
57 CoMutex writer_lock;
2dfd543c 58@@ -86,6 +93,107 @@
309874bd
DM
59 uint32_t config_count;
60 };
61
62+static gpointer vma_writer_thread(gpointer data)
63+{
64+ WriterThread *wt = (WriterThread *)data;
65+
66+ while (1) {
67+ WriteBuffer *b = NULL;
68+
2dfd543c 69+ qemu_mutex_lock_iothread();
309874bd
DM
70+ int i;
71+ for (i = 0; i < WRITE_BUFFERS; i++) {
72+ if (wt->wbuf[i].bytes) {
73+ b = &wt->wbuf[i];
74+ break;
75+ }
76+ }
2dfd543c 77+ qemu_mutex_unlock_iothread();
309874bd
DM
78+
79+ if (b) {
80+ size_t done = 0;
81+ while (done < b->bytes) {
82+ int ret = write(wt->fd, b->buffer + done, b->bytes - done);
83+ if (ret > 0) {
84+ done += ret;
85+ } else if (ret < 0) {
86+ if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
2dfd543c 87+ qemu_mutex_lock_iothread();
309874bd 88+ wt->error = errno;
2dfd543c 89+ qemu_mutex_unlock_iothread();
309874bd
DM
90+ break;
91+ }
92+ } else if (ret == 0) {
93+ /* should not happen - simply try again */
94+ }
95+ }
2dfd543c 96+ qemu_mutex_lock_iothread();
309874bd 97+ b->bytes = 0;
2dfd543c
DM
98+ DPRINTF("AWAKE JOB %d\n", wt->error);
99+ if (wt->error) {
100+ for (i = 0; i < WRITE_BUFFERS; i++) {
101+ wt->wbuf[i].bytes = 0;
102+ }
103+ qemu_co_queue_restart_all(&wt->wqueue);
104+ } else {
105+ qemu_co_queue_next(&wt->wqueue);
106+ }
107+ qemu_mutex_unlock_iothread();
108+ DPRINTF("AWAKE JOB END\n");
309874bd
DM
109+ }
110+
111+ if (wt->error) {
112+ DPRINTF("WRITER THREAD ERROR %d - exit thread\n", wt->error);
113+ g_thread_exit(NULL);
114+ }
115+
116+ g_mutex_lock(wt->mutex);
117+ bool cancel = wt->cancel;
118+ if (!b && !cancel) {
2dfd543c 119+ DPRINTF("WRITER THREAD WAIT FOR DATA\n");
309874bd 120+ g_cond_wait(wt->change_cond, wt->mutex);
2dfd543c 121+ cancel = wt->cancel;
309874bd
DM
122+ }
123+ g_mutex_unlock(wt->mutex);
124+
125+ if (cancel) {
2dfd543c
DM
126+ qemu_mutex_lock_iothread();
127+ for (i = 0; i < WRITE_BUFFERS; i++) {
128+ wt->wbuf[i].bytes = 0;
129+ }
130+ qemu_co_queue_restart_all(&wt->wqueue);
131+ qemu_mutex_unlock_iothread();
309874bd
DM
132+ DPRINTF("END WRITER THREAD\n");
133+ g_thread_exit(NULL);
134+ }
135+ }
136+
137+ return NULL;
138+}
139+
140+static void vma_stop_writer_thread(VmaWriter *vmaw)
141+{
142+ assert(vmaw);
143+
2dfd543c
DM
144+ DPRINTF("vma_stop_writer_thread start\n");
145+
309874bd 146+ if (vmaw->wt.thread) {
2dfd543c 147+ DPRINTF("vma_stop_writer_thread 1\n");
309874bd 148+ g_mutex_lock(vmaw->wt.mutex);
2dfd543c 149+ DPRINTF("vma_stop_writer_thread 2\n");
309874bd
DM
150+ vmaw->wt.cancel = true;
151+ g_cond_signal(vmaw->wt.change_cond);
152+ g_mutex_unlock(vmaw->wt.mutex);
2dfd543c
DM
153+ DPRINTF("vma_stop_writer_thread 3\n");
154+ qemu_mutex_unlock_iothread();
309874bd 155+ g_thread_join(vmaw->wt.thread);
2dfd543c
DM
156+ qemu_mutex_lock_iothread();
157+ DPRINTF("vma_stop_writer_thread 4\n");
309874bd
DM
158+ vmaw->wt.thread = NULL;
159+ }
2dfd543c 160+ DPRINTF("vma_stop_writer_thread end\n");
309874bd
DM
161+}
162+
163 void vma_writer_set_error(VmaWriter *vmaw, const char *fmt, ...)
164 {
165 va_list ap;
2dfd543c 166@@ -213,111 +321,45 @@
309874bd
DM
167 return n;
168 }
169
170-static void vma_co_continue_write(void *opaque)
171-{
172- VmaWriter *vmaw = opaque;
173-
174- qemu_aio_set_fd_handler(vmaw->fd, NULL, NULL, NULL, NULL);
175-
176- DPRINTF("vma_co_continue_write\n");
177- qemu_coroutine_enter(vmaw->co_writer, NULL);
178-}
179-
180-static ssize_t coroutine_fn
181-vma_co_write(VmaWriter *vmaw, const void *buf, size_t bytes)
182-{
183- size_t done = 0;
184- ssize_t ret;
185-
186- /* atomic writes (we cannot interleave writes) */
187- qemu_co_mutex_lock(&vmaw->writer_lock);
188-
189- DPRINTF("vma_co_write enter %zd\n", bytes);
190-
191- while (done < bytes) {
192- ret = write(vmaw->fd, buf + done, bytes - done);
193- if (ret > 0) {
194- done += ret;
195- DPRINTF("vma_co_write written %zd %zd\n", done, ret);
196- } else if (ret < 0) {
197- if (errno == EAGAIN || errno == EWOULDBLOCK) {
198- DPRINTF("vma_co_write yield %zd\n", done);
199-
200- vmaw->co_writer = qemu_coroutine_self();
201- qemu_aio_set_fd_handler(vmaw->fd, NULL, vma_co_continue_write,
202- NULL, vmaw);
203-
204- qemu_coroutine_yield();
205- DPRINTF("vma_co_write restart %zd\n", done);
206- } else {
207- vma_writer_set_error(vmaw, "vma_co_write write error - %s",
208- strerror(errno));
209- done = -1; /* always return failure for partial writes */
210- break;
211- }
212- } else if (ret == 0) {
213- /* should not happen - simply try again */
214- }
215- }
216-
217- qemu_co_mutex_unlock(&vmaw->writer_lock);
218-
219- DPRINTF("vma_co_write leave %zd\n", done);
220- return done;
221-}
222-
223-static void coroutine_fn vma_co_writer_task(void *opaque)
224-{
225- VmaAIOCB *cb = opaque;
226-
227- DPRINTF("vma_co_writer_task start\n");
228-
229- int64_t done = vma_co_write(cb->vmaw, cb->buffer, cb->bytes);
230- DPRINTF("vma_co_writer_task write done %zd\n", done);
231-
232- if (done != cb->bytes) {
233- DPRINTF("vma_co_writer_task failed write %zd %zd", cb->bytes, done);
234- vma_writer_set_error(cb->vmaw, "vma_co_writer_task failed write %zd",
235- done);
236- }
237-
238- cb->bytes = 0;
239-
240- qemu_co_queue_next(&cb->vmaw->wqueue);
241-
242- DPRINTF("vma_co_writer_task end\n");
243-}
244-
245 static void coroutine_fn vma_queue_flush(VmaWriter *vmaw)
246 {
247 DPRINTF("vma_queue_flush enter\n");
248
249 assert(vmaw);
250
251+ int error;
252+
253 while (1) {
254 int i;
255- VmaAIOCB *cb = NULL;
256+ WriteBuffer *b = NULL;
2dfd543c 257+
309874bd
DM
258+ error = vmaw->wt.error;
259+
260 for (i = 0; i < WRITE_BUFFERS; i++) {
261- if (vmaw->aiocbs[i].bytes) {
262- cb = &vmaw->aiocbs[i];
263- DPRINTF("FOUND USED AIO BUFFER %d %zd\n", i,
264- vmaw->aiocbs[i].bytes);
265+ if (vmaw->wt.wbuf[i].bytes) {
266+ b = &vmaw->wt.wbuf[i];
267+ DPRINTF("FOUND USED WRITE BUFFER %d %zd\n", i,
268+ vmaw->wt.wbuf[i].bytes);
269 break;
270 }
271 }
272- if (!cb) {
309874bd
DM
273+
274+ if (!b || error) {
275 break;
276 }
277- qemu_co_queue_wait(&vmaw->wqueue);
2dfd543c
DM
278+ DPRINTF("WAIT FOR BUFFER FLUSH\n");
279+ qemu_co_queue_wait(&vmaw->wt.wqueue);
280+ DPRINTF("WAIT FOR BUFFER FLUSH END\n");
309874bd
DM
281+ }
282+
283+ if (error) {
284+ vma_writer_set_error(vmaw, "vma_queue_flush write error - %s",
285+ strerror(error));
286 }
287
288 DPRINTF("vma_queue_flush leave\n");
289 }
290
291-/**
292- * NOTE: pipe buffer size in only 4096 bytes on linux (see 'ulimit -a')
293- * So we need to create a coroutione to allow 'parallel' execution.
294- */
295 static ssize_t coroutine_fn
296 vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes)
297 {
2dfd543c 298@@ -327,29 +369,42 @@
309874bd
DM
299 assert(buf);
300 assert(bytes <= VMA_MAX_EXTENT_SIZE);
301
302- VmaAIOCB *cb = NULL;
303- while (!cb) {
304+ int error = 0;
305+
306+ /* wait for a free output buffer */
309874bd
DM
307+ WriteBuffer *b = NULL;
308+ while (!b) {
309+ error = vmaw->wt.error;
310+ if (error) {
2dfd543c 311+ vma_writer_set_error(vmaw, "vma_queue_write error - %s",
309874bd
DM
312+ strerror(error));
313+ return -1;
314+ }
315+
316 int i;
317 for (i = 0; i < WRITE_BUFFERS; i++) {
318- if (!vmaw->aiocbs[i].bytes) {
319- cb = &vmaw->aiocbs[i];
320+ if (!vmaw->wt.wbuf[i].bytes) {
321+ b = &vmaw->wt.wbuf[i];
322 break;
323 }
324 }
325- if (!cb) {
326- qemu_co_queue_wait(&vmaw->wqueue);
327+ if (!b) {
2dfd543c
DM
328+ DPRINTF("WAIT FOR BUFFER\n");
329+ qemu_co_queue_wait(&vmaw->wt.wqueue);
330+ DPRINTF("WAIT FOR BUFFER DONE\n");
309874bd
DM
331 }
332 }
333
334- memcpy(cb->buffer, buf, bytes);
335- cb->bytes = bytes;
336- cb->vmaw = vmaw;
337+ /* copy data to output buffer */
338+ memcpy(b->buffer, buf, bytes);
339+ b->bytes = bytes;
309874bd
DM
340
341- DPRINTF("vma_queue_write start %zd\n", bytes);
342- cb->co = qemu_coroutine_create(vma_co_writer_task);
343- qemu_coroutine_enter(cb->co, cb);
2dfd543c
DM
344+ g_mutex_lock(vmaw->wt.mutex);
345+ /* signal writer thread that we have new data */
346+ g_cond_signal(vmaw->wt.change_cond);
309874bd
DM
347+ g_mutex_unlock(vmaw->wt.mutex);
348
349- DPRINTF("vma_queue_write leave\n");
350+ DPRINTF("vma_queue_write queued %zd\n", bytes);
351
352 return bytes;
353 }
2dfd543c 354@@ -386,10 +441,10 @@
309874bd
DM
355 const char *tmp_id_str;
356
357 if ((stat(filename, &st) == 0) && S_ISFIFO(st.st_mode)) {
358- oflags = O_NONBLOCK|O_WRONLY;
359+ oflags = O_WRONLY;
360 vmaw->fd = qemu_open(filename, oflags, 0644);
361 } else if (strstart(filename, "/dev/fdset/", &tmp_id_str)) {
362- oflags = O_NONBLOCK|O_WRONLY;
363+ oflags = O_WRONLY;
364 vmaw->fd = qemu_open(filename, oflags, 0644);
365 } else if (strstart(filename, "/dev/fdname/", &tmp_id_str)) {
366 vmaw->fd = monitor_get_fd(cur_mon, tmp_id_str, errp);
2dfd543c 367@@ -397,7 +452,7 @@
309874bd
DM
368 goto err;
369 }
370 } else {
371- oflags = O_NONBLOCK|O_WRONLY|O_CREAT|O_EXCL;
372+ oflags = O_WRONLY|O_CREAT|O_EXCL;
373 vmaw->fd = qemu_open(filename, oflags, 0644);
374 }
375
2dfd543c 376@@ -415,10 +470,19 @@
309874bd
DM
377
378 qemu_co_mutex_init(&vmaw->writer_lock);
379 qemu_co_mutex_init(&vmaw->flush_lock);
380- qemu_co_queue_init(&vmaw->wqueue);
2dfd543c 381+ qemu_co_queue_init(&vmaw->wt.wqueue);
309874bd
DM
382
383 uuid_copy(vmaw->uuid, uuid);
384
309874bd
DM
385+ vmaw->wt.mutex = g_mutex_new();
386+ vmaw->wt.change_cond = g_cond_new();
387+ vmaw->wt.fd = vmaw->fd;
388+ vmaw->wt.thread = g_thread_create(vma_writer_thread, &vmaw->wt, true, NULL);
389+ if (vmaw->wt.thread == NULL) {
390+ error_setg(errp, "can't allocate writer thread\n");
391+ goto err;
392+ }
393+
394 return vmaw;
395
396 err:
2dfd543c 397@@ -433,6 +497,14 @@
309874bd
DM
398 g_checksum_free(vmaw->md5csum);
399 }
400
401+ if (vmaw->wt.mutex) {
402+ g_mutex_free(vmaw->wt.mutex);
403+ }
404+
405+ if (vmaw->wt.change_cond) {
406+ g_cond_free(vmaw->wt.change_cond);
407+ }
408+
409 g_free(vmaw);
410 }
411
2dfd543c 412@@ -672,6 +744,14 @@
309874bd
DM
413
414 *zero_bytes = 0;
415
309874bd 416+ int error = vmaw->wt.error;
309874bd
DM
417+
418+ if (error) {
419+ vma_writer_set_error(vmaw, "vma_writer_get_buffer write error - %s",
420+ strerror(error));
421+ return -1;
422+ }
423+
424 if (vmaw->status < 0) {
425 return vmaw->status;
426 }
2dfd543c
DM
427@@ -783,14 +863,17 @@
428
429 int i;
309874bd 430
2dfd543c 431+ DPRINTF("vma_writer_close start\n");
309874bd
DM
432 vma_queue_flush(vmaw);
433
2dfd543c 434 /* this should not happen - just to be sure */
309874bd 435- while (!qemu_co_queue_empty(&vmaw->wqueue)) {
2dfd543c
DM
436+ while (!qemu_co_queue_empty(&vmaw->wt.wqueue)) {
437 DPRINTF("vma_writer_close wait\n");
438 co_sleep_ns(rt_clock, 1000000);
439 }
309874bd 440
2dfd543c
DM
441+ vma_stop_writer_thread(vmaw);
442+
309874bd
DM
443 if (vmaw->cmd) {
444 if (pclose(vmaw->cmd) < 0) {
2dfd543c
DM
445 vma_writer_set_error(vmaw, "vma_writer_close: "
446@@ -835,8 +918,9 @@
309874bd
DM
447 {
448 assert(vmaw);
449
450- int i;
451+ vma_stop_writer_thread(vmaw);
452
453+ int i;
454 for (i = 0; i <= 255; i++) {
455 if (vmaw->stream_info[i].devname) {
456 g_free(vmaw->stream_info[i].devname);
2dfd543c 457@@ -847,6 +931,14 @@
309874bd
DM
458 g_checksum_free(vmaw->md5csum);
459 }
460
461+ if (vmaw->wt.mutex) {
462+ g_mutex_free(vmaw->wt.mutex);
463+ }
464+
465+ if (vmaw->wt.change_cond) {
466+ g_cond_free(vmaw->wt.change_cond);
467+ }
468+
469 g_free(vmaw);
470 }
471