From 9cf799746fb3b21362fd62574cd76bfa14a24070 Mon Sep 17 00:00:00 2001
From: Dietmar Maurer <dietmar@proxmox.com>
Date: Mon, 14 Jan 2013 08:05:40 +0100
Subject: [PATCH v3 7/7] use extra thread for vma writer

The previous AIO approach has a problem with bdrv_drain_all(), because the
writer coroutines are not considered there. Those coroutines are never
restarted, so bdrv_drain_all() can fail (the tracked_requests list is not
empty).

We now use a dedicated thread instead, so we could also add compression
here later. (A standalone sketch of the resulting producer/consumer pattern
follows the diffstat below.)

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
 vma-writer.c | 296 +++++++++++++++++++++++++++++++++++-----------------------
 1 files changed, 180 insertions(+), 116 deletions(-)

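For illustration only (this note sits between the diffstat and the first
"diff --git" header, a region git am ignores): a minimal, self-contained
sketch of the buffer-pool/writer-thread pattern the patch introduces. All
names here (Writer, writer_queue, N_BUFFERS, ...) are invented for the
example, output goes to STDOUT_FILENO instead of a VMA file descriptor, and
it uses the GLib >= 2.32 threading API (g_thread_new, g_mutex_init) rather
than the deprecated g_thread_create()/g_mutex_new() calls the 2013-era
patch itself uses.

#include <glib.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>

#define N_BUFFERS 5
#define BUF_SIZE  4096

typedef struct { unsigned char data[BUF_SIZE]; size_t bytes; } Buffer;

typedef struct {
    int fd;
    int error;                  /* errno reported by the writer thread */
    gboolean cancel;
    GThread *thread;
    GMutex mutex;
    GCond change_cond;
    Buffer buf[N_BUFFERS];      /* bytes == 0 marks a slot as free */
} Writer;

/* consumer: write out filled buffers; exit once cancelled and drained */
static gpointer writer_thread(gpointer opaque)
{
    Writer *w = opaque;
    g_mutex_lock(&w->mutex);
    for (;;) {
        Buffer *b = NULL;
        for (int i = 0; i < N_BUFFERS; i++) {
            if (w->buf[i].bytes) { b = &w->buf[i]; break; }
        }
        if (!b && w->cancel) {
            break;                      /* everything flushed - terminate */
        } else if (!b) {
            g_cond_wait(&w->change_cond, &w->mutex);
            continue;
        }
        g_mutex_unlock(&w->mutex);      /* do blocking I/O without the lock */
        size_t done = 0;
        while (done < b->bytes) {
            ssize_t ret = write(w->fd, b->data + done, b->bytes - done);
            if (ret > 0) {
                done += ret;            /* short write - keep going */
            } else if (ret < 0 && errno != EINTR) {
                g_mutex_lock(&w->mutex);
                w->error = errno;       /* producers fail fast on this */
                g_cond_broadcast(&w->change_cond);
                g_mutex_unlock(&w->mutex);
                return NULL;
            }
        }
        g_mutex_lock(&w->mutex);
        b->bytes = 0;                   /* slot is free again */
        g_cond_broadcast(&w->change_cond);
    }
    g_mutex_unlock(&w->mutex);
    return NULL;
}

/* producer: copy data into a free slot and wake the writer thread */
static int writer_queue(Writer *w, const void *data, size_t bytes)
{
    g_mutex_lock(&w->mutex);
    while (!w->error) {
        for (int i = 0; i < N_BUFFERS; i++) {
            if (!w->buf[i].bytes) {
                memcpy(w->buf[i].data, data, bytes);
                w->buf[i].bytes = bytes;
                g_cond_broadcast(&w->change_cond);
                g_mutex_unlock(&w->mutex);
                return 0;
            }
        }
        g_cond_wait(&w->change_cond, &w->mutex);    /* all slots in use */
    }
    g_mutex_unlock(&w->mutex);
    return -1;                  /* writer thread reported an error */
}

/* like vma_stop_writer_thread(): request shutdown, wake the thread, join */
static void writer_stop(Writer *w)
{
    g_mutex_lock(&w->mutex);
    w->cancel = TRUE;
    g_cond_broadcast(&w->change_cond);
    g_mutex_unlock(&w->mutex);
    g_thread_join(w->thread);   /* drains any remaining buffers first */
}

int main(void)
{
    Writer w = { .fd = STDOUT_FILENO };
    g_mutex_init(&w.mutex);
    g_cond_init(&w.change_cond);
    w.thread = g_thread_new("writer", writer_thread, &w);
    const char *msg = "queued through the writer thread\n";
    writer_queue(&w, msg, strlen(msg));
    writer_stop(&w);
    g_mutex_clear(&w.mutex);
    g_cond_clear(&w.change_cond);
    return w.error ? 1 : 0;
}

Unlike this sketch, the producer side in the patch runs inside a QEMU
coroutine and must not block the main loop, so vma_queue_write() polls for
a free buffer with co_sleep_ns() instead of sleeping on the condition
variable; only the dedicated writer thread blocks in write().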
diff --git a/vma-writer.c b/vma-writer.c
index 688af4b..e18591e 100644
--- a/vma-writer.c
+++ b/vma-writer.c
@@ -38,13 +38,20 @@
 
 #define WRITE_BUFFERS 5
 
-typedef struct VmaAIOCB VmaAIOCB;
-struct VmaAIOCB {
-    VmaWriter *vmaw;
+typedef struct WriteBuffer {
     unsigned char buffer[VMA_MAX_EXTENT_SIZE];
     size_t bytes;
-    Coroutine *co;
-};
+} WriteBuffer;
+
+typedef struct WriterThread {
+    int fd;
+    int error;
+    bool cancel;
+    GThread *thread;
+    GMutex *mutex;
+    GCond *change_cond;
+    WriteBuffer wbuf[WRITE_BUFFERS];
+} WriterThread;
 
 struct VmaWriter {
     int fd;
@@ -61,8 +68,7 @@ struct VmaWriter {
     int outbuf_count; /* in VMA_BLOCKS */
     uint64_t outbuf_block_info[VMA_BLOCKS_PER_EXTENT];
 
-    VmaAIOCB aiocbs[WRITE_BUFFERS];
-    CoQueue wqueue;
+    WriterThread wt;
 
     GChecksum *md5csum;
     CoMutex writer_lock;
@@ -88,6 +94,80 @@ struct VmaWriter {
     uint32_t config_count;
 };
 
+static gpointer vma_writer_thread(gpointer data)
+{
+    WriterThread *wt = (WriterThread *)data;
+
+    while (1) {
+        WriteBuffer *b = NULL;
+
+        g_mutex_lock(wt->mutex);
+        int i;
+        for (i = 0; i < WRITE_BUFFERS; i++) {
+            if (wt->wbuf[i].bytes) {
+                b = &wt->wbuf[i];
+                break;
+            }
+        }
+        g_mutex_unlock(wt->mutex);
+
+        if (b) {
+            size_t done = 0;
+            while (done < b->bytes) {
+                int ret = write(wt->fd, b->buffer + done, b->bytes - done);
+                if (ret > 0) {
+                    done += ret;
+                } else if (ret < 0) {
+                    if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
+                        g_mutex_lock(wt->mutex);
+                        wt->error = errno;
+                        g_mutex_unlock(wt->mutex);
+                        break;
+                    }
+                } else if (ret == 0) {
+                    /* should not happen - simply try again */
+                }
+            }
+            g_mutex_lock(wt->mutex);
+            b->bytes = 0;
+            g_mutex_unlock(wt->mutex);
+        }
+
+        if (wt->error) {
+            DPRINTF("WRITER THREAD ERROR %d - exit thread\n", wt->error);
+            g_thread_exit(NULL);
+        }
+
+        g_mutex_lock(wt->mutex);
+        bool cancel = wt->cancel;
+        if (!b && !cancel) {
+            g_cond_wait(wt->change_cond, wt->mutex);
+        }
+        g_mutex_unlock(wt->mutex);
+
+        if (cancel) {
+            DPRINTF("END WRITER THREAD\n");
+            g_thread_exit(NULL);
+        }
+    }
+
+    return NULL;
+}
+
+static void vma_stop_writer_thread(VmaWriter *vmaw)
+{
+    assert(vmaw);
+
+    if (vmaw->wt.thread) {
+        g_mutex_lock(vmaw->wt.mutex);
+        vmaw->wt.cancel = true;
+        g_cond_signal(vmaw->wt.change_cond);
+        g_mutex_unlock(vmaw->wt.mutex);
+        g_thread_join(vmaw->wt.thread);
+        vmaw->wt.thread = NULL;
+    }
+}
+
 void vma_writer_set_error(VmaWriter *vmaw, const char *fmt, ...)
 {
     va_list ap;
@@ -215,111 +295,47 @@ int vma_writer_register_stream(VmaWriter *vmaw, const char *devname,
     return n;
 }
 
-static void vma_co_continue_write(void *opaque)
-{
-    VmaWriter *vmaw = opaque;
-
-    qemu_aio_set_fd_handler(vmaw->fd, NULL, NULL, NULL, NULL);
-
-    DPRINTF("vma_co_continue_write\n");
-    qemu_coroutine_enter(vmaw->co_writer, NULL);
-}
-
-static ssize_t coroutine_fn
-vma_co_write(VmaWriter *vmaw, const void *buf, size_t bytes)
-{
-    size_t done = 0;
-    ssize_t ret;
-
-    /* atomic writes (we cannot interleave writes) */
-    qemu_co_mutex_lock(&vmaw->writer_lock);
-
-    DPRINTF("vma_co_write enter %zd\n", bytes);
-
-    while (done < bytes) {
-        ret = write(vmaw->fd, buf + done, bytes - done);
-        if (ret > 0) {
-            done += ret;
-            DPRINTF("vma_co_write written %zd %zd\n", done, ret);
-        } else if (ret < 0) {
-            if (errno == EAGAIN || errno == EWOULDBLOCK) {
-                DPRINTF("vma_co_write yield %zd\n", done);
-
-                vmaw->co_writer = qemu_coroutine_self();
-                qemu_aio_set_fd_handler(vmaw->fd, NULL, vma_co_continue_write,
-                                        NULL, vmaw);
-
-                qemu_coroutine_yield();
-                DPRINTF("vma_co_write restart %zd\n", done);
-            } else {
-                vma_writer_set_error(vmaw, "vma_co_write write error - %s",
-                                     strerror(errno));
-                done = -1; /* always return failure for partial writes */
-                break;
-            }
-        } else if (ret == 0) {
-            /* should not happen - simply try again */
-        }
-    }
-
-    qemu_co_mutex_unlock(&vmaw->writer_lock);
-
-    DPRINTF("vma_co_write leave %zd\n", done);
-    return done;
-}
-
-static void coroutine_fn vma_co_writer_task(void *opaque)
-{
-    VmaAIOCB *cb = opaque;
-
-    DPRINTF("vma_co_writer_task start\n");
-
-    int64_t done = vma_co_write(cb->vmaw, cb->buffer, cb->bytes);
-    DPRINTF("vma_co_writer_task write done %zd\n", done);
-
-    if (done != cb->bytes) {
-        DPRINTF("vma_co_writer_task failed write %zd %zd", cb->bytes, done);
-        vma_writer_set_error(cb->vmaw, "vma_co_writer_task failed write %zd",
-                             done);
-    }
-
-    cb->bytes = 0;
-
-    qemu_co_queue_next(&cb->vmaw->wqueue);
-
-    DPRINTF("vma_co_writer_task end\n");
-}
-
 static void coroutine_fn vma_queue_flush(VmaWriter *vmaw)
 {
     DPRINTF("vma_queue_flush enter\n");
 
     assert(vmaw);
 
+    int error;
+
     while (1) {
         int i;
-        VmaAIOCB *cb = NULL;
+        WriteBuffer *b = NULL;
+        g_mutex_lock(vmaw->wt.mutex);
+
+        error = vmaw->wt.error;
+
         for (i = 0; i < WRITE_BUFFERS; i++) {
-            if (vmaw->aiocbs[i].bytes) {
-                cb = &vmaw->aiocbs[i];
-                DPRINTF("FOUND USED AIO BUFFER %d %zd\n", i,
-                        vmaw->aiocbs[i].bytes);
+            if (vmaw->wt.wbuf[i].bytes) {
+                b = &vmaw->wt.wbuf[i];
+                DPRINTF("FOUND USED WRITE BUFFER %d %zd\n", i,
+                        vmaw->wt.wbuf[i].bytes);
                 break;
             }
         }
-        if (!cb) {
+        g_mutex_unlock(vmaw->wt.mutex);
+
+        if (!b || error) {
             break;
         }
-        qemu_co_queue_wait(&vmaw->wqueue);
+        uint64_t delay_ns = 10000;
+        DPRINTF("WAIT FOR BUFFER FLUSH %zd\n", delay_ns);
+        co_sleep_ns(rt_clock, delay_ns);
+    }
+
+    if (error) {
+        vma_writer_set_error(vmaw, "vma_queue_flush write error - %s",
+                             strerror(error));
     }
 
     DPRINTF("vma_queue_flush leave\n");
 }
 
-/**
- * NOTE: pipe buffer size in only 4096 bytes on linux (see 'ulimit -a')
- * So we need to create a coroutione to allow 'parallel' execution.
- */
 static ssize_t coroutine_fn
 vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes)
 {
@@ -329,29 +345,46 @@ vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes)
     assert(buf);
     assert(bytes <= VMA_MAX_EXTENT_SIZE);
 
-    VmaAIOCB *cb = NULL;
-    while (!cb) {
+    int error = 0;
+
+    /* wait for a free output buffer */
+    g_mutex_lock(vmaw->wt.mutex);
+    WriteBuffer *b = NULL;
+    while (!b) {
+        error = vmaw->wt.error;
+        if (error) {
+            g_mutex_unlock(vmaw->wt.mutex);
+            vma_writer_set_error(vmaw, "vma_queue_write error - %s",
+                                 strerror(error));
+            return -1;
+        }
+
         int i;
         for (i = 0; i < WRITE_BUFFERS; i++) {
-            if (!vmaw->aiocbs[i].bytes) {
-                cb = &vmaw->aiocbs[i];
+            if (!vmaw->wt.wbuf[i].bytes) {
+                b = &vmaw->wt.wbuf[i];
                 break;
             }
         }
-        if (!cb) {
-            qemu_co_queue_wait(&vmaw->wqueue);
+        if (!b) {
+            uint64_t delay_ns = 10000;
+            DPRINTF("WAIT FOR BUFFER %zd\n", delay_ns);
+            g_mutex_unlock(vmaw->wt.mutex);
+            co_sleep_ns(rt_clock, delay_ns);
+            g_mutex_lock(vmaw->wt.mutex);
         }
     }
 
-    memcpy(cb->buffer, buf, bytes);
-    cb->bytes = bytes;
-    cb->vmaw = vmaw;
+    /* copy data to output buffer */
+    memcpy(b->buffer, buf, bytes);
+    b->bytes = bytes;
+
+    /* signal writer thread that we have new data */
+    g_cond_signal(vmaw->wt.change_cond);
 
-    DPRINTF("vma_queue_write start %zd\n", bytes);
-    cb->co = qemu_coroutine_create(vma_co_writer_task);
-    qemu_coroutine_enter(cb->co, cb);
+    g_mutex_unlock(vmaw->wt.mutex);
 
-    DPRINTF("vma_queue_write leave\n");
+    DPRINTF("vma_queue_write queued %zd\n", bytes);
 
     return bytes;
 }
@@ -389,10 +422,10 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, int64_t speed,
     const char *tmp_id_str;
 
     if ((stat(filename, &st) == 0) && S_ISFIFO(st.st_mode)) {
-        oflags = O_NONBLOCK|O_WRONLY;
+        oflags = O_WRONLY;
         vmaw->fd = qemu_open(filename, oflags, 0644);
     } else if (strstart(filename, "/dev/fdset/", &tmp_id_str)) {
-        oflags = O_NONBLOCK|O_WRONLY;
+        oflags = O_WRONLY;
        vmaw->fd = qemu_open(filename, oflags, 0644);
     } else if (strstart(filename, "/dev/fdname/", &tmp_id_str)) {
         vmaw->fd = monitor_get_fd(cur_mon, tmp_id_str, errp);
@@ -400,7 +433,7 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, int64_t speed,
             goto err;
         }
     } else {
-        oflags = O_NONBLOCK|O_WRONLY|O_CREAT|O_EXCL;
+        oflags = O_WRONLY|O_CREAT|O_EXCL;
         vmaw->fd = qemu_open(filename, oflags, 0644);
     }
 
@@ -418,7 +451,6 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, int64_t speed,
 
     qemu_co_mutex_init(&vmaw->writer_lock);
     qemu_co_mutex_init(&vmaw->flush_lock);
-    qemu_co_queue_init(&vmaw->wqueue);
 
     uuid_copy(vmaw->uuid, uuid);
 
@@ -428,6 +460,15 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, int64_t speed,
 
     ratelimit_set_speed(&vmaw->limit, speed, 100000000ULL /* 0.1 sec */);
 
+    vmaw->wt.mutex = g_mutex_new();
+    vmaw->wt.change_cond = g_cond_new();
+    vmaw->wt.fd = vmaw->fd;
+    vmaw->wt.thread = g_thread_create(vma_writer_thread, &vmaw->wt, true, NULL);
+    if (vmaw->wt.thread == NULL) {
+        error_setg(errp, "can't allocate writer thread\n");
+        goto err;
+    }
+
     return vmaw;
 
 err:
@@ -442,6 +483,14 @@ err:
         g_checksum_free(vmaw->md5csum);
     }
 
+    if (vmaw->wt.mutex) {
+        g_mutex_free(vmaw->wt.mutex);
+    }
+
+    if (vmaw->wt.change_cond) {
+        g_cond_free(vmaw->wt.change_cond);
+    }
+
     g_free(vmaw);
 }
 
@@ -688,6 +737,16 @@ vma_writer_write(VmaWriter *vmaw, uint8_t dev_id, int64_t cluster_num,
 
     *zero_bytes = 0;
 
+    g_mutex_lock(vmaw->wt.mutex);
+    int error = vmaw->wt.error;
+    g_mutex_unlock(vmaw->wt.mutex);
+
+    if (error) {
+        vma_writer_set_error(vmaw, "vma_writer_get_buffer write error - %s",
+                             strerror(error));
+        return -1;
+    }
+
     if (vmaw->status < 0) {
         return vmaw->status;
     }
@@ -801,11 +860,7 @@ int vma_writer_close(VmaWriter *vmaw, Error **errp)
 
     vma_queue_flush(vmaw);
 
-    /* this should not happen - just to be sure */
-    while (!qemu_co_queue_empty(&vmaw->wqueue)) {
-        DPRINTF("vma_writer_close wait\n");
-        co_sleep_ns(rt_clock, 1000000);
-    }
+    vma_stop_writer_thread(vmaw);
 
     if (vmaw->cmd) {
         if (pclose(vmaw->cmd) < 0) {
@@ -851,8 +906,9 @@ void vma_writer_destroy(VmaWriter *vmaw)
 {
     assert(vmaw);
 
-    int i;
+    vma_stop_writer_thread(vmaw);
 
+    int i;
     for (i = 0; i <= 255; i++) {
         if (vmaw->stream_info[i].devname) {
             g_free(vmaw->stream_info[i].devname);
@@ -863,6 +919,14 @@ void vma_writer_destroy(VmaWriter *vmaw)
         g_checksum_free(vmaw->md5csum);
     }
 
+    if (vmaw->wt.mutex) {
+        g_mutex_free(vmaw->wt.mutex);
+    }
+
+    if (vmaw->wt.change_cond) {
+        g_cond_free(vmaw->wt.change_cond);
+    }
+
     g_free(vmaw);
 }
 
-- 
1.7.2.5