From a6f324d47b810809de2a6106849527c6a9590175 Mon Sep 17 00:00:00 2001
From: Dietmar Maurer <dietmar@proxmox.com>
Date: Mon, 14 Jan 2013 08:05:40 +0100
Subject: [PATCH v3 7/7] use extra thread for vma writer

The previous AIO approach has a problem with bdrv_drain_all(): the writer
coroutines are not considered there. Those coroutines are never restarted, so
bdrv_drain_all() can fail (the tracked_requests list is not empty).

We now use a dedicated thread instead, which also makes it possible to add
compression here later.

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
 vma-writer.c | 296 +++++++++++++++++++++++++++++++++++-----------------------
 1 files changed, 180 insertions(+), 116 deletions(-)

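[Editor's note, not part of the patch: the sketch below illustrates, under
simplified assumptions, the fixed buffer pool plus dedicated writer thread
that this commit introduces. It uses plain POSIX threads instead of the
GLib/QEMU primitives of the real code (GThread, GCond,
qemu_mutex_lock_iothread(), CoQueue), and all names, sizes and helpers
(WBuf, queue_write, stop_writer, ...) are illustrative, not taken from
vma-writer.c. Producers copy data into a free slot and signal a condition
variable; the thread drains used slots with blocking write() calls and
signals back when a slot becomes free - the same hand-off that
vma_queue_write() and vma_writer_thread() perform in the patch.

#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <string.h>
#include <unistd.h>

#define NBUF  5
#define BUFSZ 4096

typedef struct {
    unsigned char data[BUFSZ];
    size_t bytes;                      /* 0 means "slot is free" */
} WBuf;

static WBuf bufs[NBUF];
static bool cancel;
static int out_fd;
static pthread_t writer;
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

/* Consumer: pick the next used buffer, write it out, mark it free. */
static void *writer_thread(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&lock);
    while (!cancel) {
        WBuf *b = NULL;
        for (int i = 0; i < NBUF; i++) {
            if (bufs[i].bytes) { b = &bufs[i]; break; }
        }
        if (!b) {                      /* nothing queued - sleep */
            pthread_cond_wait(&cond, &lock);
            continue;
        }
        pthread_mutex_unlock(&lock);   /* blocking write without the lock */
        size_t done = 0;
        while (done < b->bytes) {
            ssize_t r = write(out_fd, b->data + done, b->bytes - done);
            if (r > 0) {
                done += r;
            } else if (r < 0 && errno != EINTR) {
                break;                 /* real error; a full version records it */
            }
        }
        pthread_mutex_lock(&lock);
        b->bytes = 0;                  /* slot is free again */
        pthread_cond_broadcast(&cond); /* wake producers waiting for a slot */
    }
    pthread_mutex_unlock(&lock);
    return NULL;
}

/* Producer: copy data into a free slot and kick the writer thread. */
static ssize_t queue_write(const void *buf, size_t bytes)
{
    assert(bytes <= BUFSZ);
    pthread_mutex_lock(&lock);
    WBuf *b = NULL;
    while (!b) {
        for (int i = 0; i < NBUF; i++) {
            if (!bufs[i].bytes) { b = &bufs[i]; break; }
        }
        if (!b) {
            pthread_cond_wait(&cond, &lock);   /* all slots busy */
        }
    }
    memcpy(b->data, buf, bytes);
    b->bytes = bytes;
    pthread_cond_broadcast(&cond);     /* new data for the writer thread */
    pthread_mutex_unlock(&lock);
    return (ssize_t)bytes;
}

static void start_writer(int fd)
{
    out_fd = fd;
    pthread_create(&writer, NULL, writer_thread, NULL);
}

static void stop_writer(void)
{
    pthread_mutex_lock(&lock);
    cancel = true;                     /* mirrors vma_stop_writer_thread() */
    pthread_cond_broadcast(&cond);
    pthread_mutex_unlock(&lock);
    pthread_join(writer, NULL);
}

The real code cannot block its producers on a condition variable, because
they are coroutines running under the iothread lock; that is why the patch
takes qemu_mutex_lock_iothread() around every access to wbuf[] and wakes
waiting coroutines through the CoQueue (qemu_co_queue_next() /
qemu_co_queue_restart_all()) instead of a second GCond.]
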
Index: new/vma-writer.c
===================================================================
--- new.orig/vma-writer.c 2013-01-23 07:35:12.000000000 +0100
+++ new/vma-writer.c 2013-01-23 09:24:19.000000000 +0100
@@ -37,13 +37,21 @@
 
 #define WRITE_BUFFERS 5
 
-typedef struct VmaAIOCB VmaAIOCB;
-struct VmaAIOCB {
-    VmaWriter *vmaw;
+typedef struct WriteBuffer {
     unsigned char buffer[VMA_MAX_EXTENT_SIZE];
     size_t bytes;
-    Coroutine *co;
-};
+} WriteBuffer;
+
+typedef struct WriterThread {
+    int fd;
+    int error;
+    bool cancel;
+    GThread *thread;
+    GMutex *mutex;
+    GCond *change_cond;
+    WriteBuffer wbuf[WRITE_BUFFERS];
+    CoQueue wqueue;
+} WriterThread;
 
 struct VmaWriter {
     int fd;
@@ -60,8 +68,7 @@
     int outbuf_count; /* in VMA_BLOCKS */
     uint64_t outbuf_block_info[VMA_BLOCKS_PER_EXTENT];
 
-    VmaAIOCB aiocbs[WRITE_BUFFERS];
-    CoQueue wqueue;
+    WriterThread wt;
 
     GChecksum *md5csum;
     CoMutex writer_lock;
@@ -86,6 +93,107 @@
     uint32_t config_count;
 };
 
+static gpointer vma_writer_thread(gpointer data)
+{
+    WriterThread *wt = (WriterThread *)data;
+
+    while (1) {
+        WriteBuffer *b = NULL;
+
+        qemu_mutex_lock_iothread();
+        int i;
+        for (i = 0; i < WRITE_BUFFERS; i++) {
+            if (wt->wbuf[i].bytes) {
+                b = &wt->wbuf[i];
+                break;
+            }
+        }
+        qemu_mutex_unlock_iothread();
+
+        if (b) {
+            size_t done = 0;
+            while (done < b->bytes) {
+                int ret = write(wt->fd, b->buffer + done, b->bytes - done);
+                if (ret > 0) {
+                    done += ret;
+                } else if (ret < 0) {
+                    if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
+                        qemu_mutex_lock_iothread();
+                        wt->error = errno;
+                        qemu_mutex_unlock_iothread();
+                        break;
+                    }
+                } else if (ret == 0) {
+                    /* should not happen - simply try again */
+                }
+            }
+            qemu_mutex_lock_iothread();
+            b->bytes = 0;
+            DPRINTF("AWAKE JOB %d\n", wt->error);
+            if (wt->error) {
+                for (i = 0; i < WRITE_BUFFERS; i++) {
+                    wt->wbuf[i].bytes = 0;
+                }
+                qemu_co_queue_restart_all(&wt->wqueue);
+            } else {
+                qemu_co_queue_next(&wt->wqueue);
+            }
+            qemu_mutex_unlock_iothread();
+            DPRINTF("AWAKE JOB END\n");
+        }
+
+        if (wt->error) {
+            DPRINTF("WRITER THREAD ERROR %d - exit thread\n", wt->error);
+            g_thread_exit(NULL);
+        }
+
+        g_mutex_lock(wt->mutex);
+        bool cancel = wt->cancel;
+        if (!b && !cancel) {
+            DPRINTF("WRITER THREAD WAIT FOR DATA\n");
+            g_cond_wait(wt->change_cond, wt->mutex);
+            cancel = wt->cancel;
+        }
+        g_mutex_unlock(wt->mutex);
+
+        if (cancel) {
+            qemu_mutex_lock_iothread();
+            for (i = 0; i < WRITE_BUFFERS; i++) {
+                wt->wbuf[i].bytes = 0;
+            }
+            qemu_co_queue_restart_all(&wt->wqueue);
+            qemu_mutex_unlock_iothread();
+            DPRINTF("END WRITER THREAD\n");
+            g_thread_exit(NULL);
+        }
+    }
+
+    return NULL;
+}
+
+static void vma_stop_writer_thread(VmaWriter *vmaw)
+{
+    assert(vmaw);
+
+    DPRINTF("vma_stop_writer_thread start\n");
+
+    if (vmaw->wt.thread) {
+        DPRINTF("vma_stop_writer_thread 1\n");
+        g_mutex_lock(vmaw->wt.mutex);
+        DPRINTF("vma_stop_writer_thread 2\n");
+        vmaw->wt.cancel = true;
+        g_cond_signal(vmaw->wt.change_cond);
+        g_mutex_unlock(vmaw->wt.mutex);
+        DPRINTF("vma_stop_writer_thread 3\n");
+        qemu_mutex_unlock_iothread();
+        g_thread_join(vmaw->wt.thread);
+        qemu_mutex_lock_iothread();
+        DPRINTF("vma_stop_writer_thread 4\n");
+        vmaw->wt.thread = NULL;
+    }
+    DPRINTF("vma_stop_writer_thread end\n");
+}
+
 void vma_writer_set_error(VmaWriter *vmaw, const char *fmt, ...)
 {
     va_list ap;
@@ -213,111 +321,45 @@
     return n;
 }
 
-static void vma_co_continue_write(void *opaque)
-{
-    VmaWriter *vmaw = opaque;
-
-    qemu_aio_set_fd_handler(vmaw->fd, NULL, NULL, NULL, NULL);
-
-    DPRINTF("vma_co_continue_write\n");
-    qemu_coroutine_enter(vmaw->co_writer, NULL);
-}
-
-static ssize_t coroutine_fn
-vma_co_write(VmaWriter *vmaw, const void *buf, size_t bytes)
-{
-    size_t done = 0;
-    ssize_t ret;
-
-    /* atomic writes (we cannot interleave writes) */
-    qemu_co_mutex_lock(&vmaw->writer_lock);
-
-    DPRINTF("vma_co_write enter %zd\n", bytes);
-
-    while (done < bytes) {
-        ret = write(vmaw->fd, buf + done, bytes - done);
-        if (ret > 0) {
-            done += ret;
-            DPRINTF("vma_co_write written %zd %zd\n", done, ret);
-        } else if (ret < 0) {
-            if (errno == EAGAIN || errno == EWOULDBLOCK) {
-                DPRINTF("vma_co_write yield %zd\n", done);
-
-                vmaw->co_writer = qemu_coroutine_self();
-                qemu_aio_set_fd_handler(vmaw->fd, NULL, vma_co_continue_write,
-                                        NULL, vmaw);
-
-                qemu_coroutine_yield();
-                DPRINTF("vma_co_write restart %zd\n", done);
-            } else {
-                vma_writer_set_error(vmaw, "vma_co_write write error - %s",
-                                     strerror(errno));
-                done = -1; /* always return failure for partial writes */
-                break;
-            }
-        } else if (ret == 0) {
-            /* should not happen - simply try again */
-        }
-    }
-
-    qemu_co_mutex_unlock(&vmaw->writer_lock);
-
-    DPRINTF("vma_co_write leave %zd\n", done);
-    return done;
-}
-
-static void coroutine_fn vma_co_writer_task(void *opaque)
-{
-    VmaAIOCB *cb = opaque;
-
-    DPRINTF("vma_co_writer_task start\n");
-
-    int64_t done = vma_co_write(cb->vmaw, cb->buffer, cb->bytes);
-    DPRINTF("vma_co_writer_task write done %zd\n", done);
-
-    if (done != cb->bytes) {
-        DPRINTF("vma_co_writer_task failed write %zd %zd", cb->bytes, done);
-        vma_writer_set_error(cb->vmaw, "vma_co_writer_task failed write %zd",
-                             done);
-    }
-
-    cb->bytes = 0;
-
-    qemu_co_queue_next(&cb->vmaw->wqueue);
-
-    DPRINTF("vma_co_writer_task end\n");
-}
-
 static void coroutine_fn vma_queue_flush(VmaWriter *vmaw)
 {
     DPRINTF("vma_queue_flush enter\n");
 
     assert(vmaw);
 
+    int error;
+
     while (1) {
         int i;
-        VmaAIOCB *cb = NULL;
+        WriteBuffer *b = NULL;
+
+        error = vmaw->wt.error;
+
         for (i = 0; i < WRITE_BUFFERS; i++) {
-            if (vmaw->aiocbs[i].bytes) {
-                cb = &vmaw->aiocbs[i];
-                DPRINTF("FOUND USED AIO BUFFER %d %zd\n", i,
-                        vmaw->aiocbs[i].bytes);
+            if (vmaw->wt.wbuf[i].bytes) {
+                b = &vmaw->wt.wbuf[i];
+                DPRINTF("FOUND USED WRITE BUFFER %d %zd\n", i,
+                        vmaw->wt.wbuf[i].bytes);
                 break;
             }
         }
-        if (!cb) {
+
+        if (!b || error) {
             break;
         }
-        qemu_co_queue_wait(&vmaw->wqueue);
+        DPRINTF("WAIT FOR BUFFER FLUSH\n");
+        qemu_co_queue_wait(&vmaw->wt.wqueue);
+        DPRINTF("WAIT FOR BUFFER FLUSH END\n");
+    }
+
+    if (error) {
+        vma_writer_set_error(vmaw, "vma_queue_flush write error - %s",
+                             strerror(error));
     }
 
     DPRINTF("vma_queue_flush leave\n");
 }
 
-/**
- * NOTE: pipe buffer size in only 4096 bytes on linux (see 'ulimit -a')
- * So we need to create a coroutione to allow 'parallel' execution.
- */
 static ssize_t coroutine_fn
 vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes)
 {
@@ -327,29 +369,42 @@
     assert(buf);
     assert(bytes <= VMA_MAX_EXTENT_SIZE);
 
-    VmaAIOCB *cb = NULL;
-    while (!cb) {
+    int error = 0;
+
+    /* wait for a free output buffer */
+    WriteBuffer *b = NULL;
+    while (!b) {
+        error = vmaw->wt.error;
+        if (error) {
+            vma_writer_set_error(vmaw, "vma_queue_write error - %s",
+                                 strerror(error));
+            return -1;
+        }
+
         int i;
         for (i = 0; i < WRITE_BUFFERS; i++) {
-            if (!vmaw->aiocbs[i].bytes) {
-                cb = &vmaw->aiocbs[i];
+            if (!vmaw->wt.wbuf[i].bytes) {
+                b = &vmaw->wt.wbuf[i];
                 break;
            }
        }
-        if (!cb) {
-            qemu_co_queue_wait(&vmaw->wqueue);
+        if (!b) {
+            DPRINTF("WAIT FOR BUFFER\n");
+            qemu_co_queue_wait(&vmaw->wt.wqueue);
+            DPRINTF("WAIT FOR BUFFER DONE\n");
        }
    }
 
-    memcpy(cb->buffer, buf, bytes);
-    cb->bytes = bytes;
-    cb->vmaw = vmaw;
+    /* copy data to output buffer */
+    memcpy(b->buffer, buf, bytes);
+    b->bytes = bytes;
 
-    DPRINTF("vma_queue_write start %zd\n", bytes);
-    cb->co = qemu_coroutine_create(vma_co_writer_task);
-    qemu_coroutine_enter(cb->co, cb);
+    g_mutex_lock(vmaw->wt.mutex);
+    /* signal writer thread that we have new data */
+    g_cond_signal(vmaw->wt.change_cond);
+    g_mutex_unlock(vmaw->wt.mutex);
 
-    DPRINTF("vma_queue_write leave\n");
+    DPRINTF("vma_queue_write queued %zd\n", bytes);
 
     return bytes;
 }
@@ -386,10 +441,10 @@
     const char *tmp_id_str;
 
     if ((stat(filename, &st) == 0) && S_ISFIFO(st.st_mode)) {
-        oflags = O_NONBLOCK|O_WRONLY;
+        oflags = O_WRONLY;
         vmaw->fd = qemu_open(filename, oflags, 0644);
     } else if (strstart(filename, "/dev/fdset/", &tmp_id_str)) {
-        oflags = O_NONBLOCK|O_WRONLY;
+        oflags = O_WRONLY;
         vmaw->fd = qemu_open(filename, oflags, 0644);
     } else if (strstart(filename, "/dev/fdname/", &tmp_id_str)) {
         vmaw->fd = monitor_get_fd(cur_mon, tmp_id_str, errp);
@@ -397,7 +452,7 @@
             goto err;
         }
     } else {
-        oflags = O_NONBLOCK|O_WRONLY|O_CREAT|O_EXCL;
+        oflags = O_WRONLY|O_CREAT|O_EXCL;
         vmaw->fd = qemu_open(filename, oflags, 0644);
     }
 
@@ -415,10 +470,19 @@
 
     qemu_co_mutex_init(&vmaw->writer_lock);
     qemu_co_mutex_init(&vmaw->flush_lock);
-    qemu_co_queue_init(&vmaw->wqueue);
+    qemu_co_queue_init(&vmaw->wt.wqueue);
 
     uuid_copy(vmaw->uuid, uuid);
 
+    vmaw->wt.mutex = g_mutex_new();
+    vmaw->wt.change_cond = g_cond_new();
+    vmaw->wt.fd = vmaw->fd;
+    vmaw->wt.thread = g_thread_create(vma_writer_thread, &vmaw->wt, true, NULL);
+    if (vmaw->wt.thread == NULL) {
+        error_setg(errp, "can't allocate writer thread\n");
+        goto err;
+    }
+
     return vmaw;
 
 err:
@@ -433,6 +497,14 @@
         g_checksum_free(vmaw->md5csum);
     }
 
+    if (vmaw->wt.mutex) {
+        g_mutex_free(vmaw->wt.mutex);
+    }
+
+    if (vmaw->wt.change_cond) {
+        g_cond_free(vmaw->wt.change_cond);
+    }
+
     g_free(vmaw);
 }
 
@@ -672,6 +744,14 @@
 
     *zero_bytes = 0;
 
+    int error = vmaw->wt.error;
+
+    if (error) {
+        vma_writer_set_error(vmaw, "vma_writer_get_buffer write error - %s",
+                             strerror(error));
+        return -1;
+    }
+
     if (vmaw->status < 0) {
         return vmaw->status;
     }
@@ -783,14 +863,17 @@
 
     int i;
 
+    DPRINTF("vma_writer_close start\n");
     vma_queue_flush(vmaw);
 
     /* this should not happen - just to be sure */
-    while (!qemu_co_queue_empty(&vmaw->wqueue)) {
+    while (!qemu_co_queue_empty(&vmaw->wt.wqueue)) {
         DPRINTF("vma_writer_close wait\n");
         co_sleep_ns(rt_clock, 1000000);
     }
 
+    vma_stop_writer_thread(vmaw);
+
     if (vmaw->cmd) {
         if (pclose(vmaw->cmd) < 0) {
             vma_writer_set_error(vmaw, "vma_writer_close: "
@@ -835,8 +918,9 @@
 {
     assert(vmaw);
 
-    int i;
+    vma_stop_writer_thread(vmaw);
 
+    int i;
     for (i = 0; i <= 255; i++) {
         if (vmaw->stream_info[i].devname) {
             g_free(vmaw->stream_info[i].devname);
@@ -847,6 +931,14 @@
         g_checksum_free(vmaw->md5csum);
     }
 
+    if (vmaw->wt.mutex) {
+        g_mutex_free(vmaw->wt.mutex);
+    }
+
+    if (vmaw->wt.change_cond) {
+        g_cond_free(vmaw->wt.change_cond);
+    }
+
     g_free(vmaw);
 }
471