]> git.proxmox.com Git - pve-qemu-kvm.git/blob - debian/patches/pve/0019-backup-vma-remove-async-queue.patch
066f1b1fe56f0dc9741d3b1a822e720ea41ab4ea
[pve-qemu-kvm.git] / debian / patches / pve / 0019-backup-vma-remove-async-queue.patch
1 From bf0b444a62df49c016eb47f0299e5656d830234e Mon Sep 17 00:00:00 2001
2 From: Wolfgang Bumiller <w.bumiller@proxmox.com>
3 Date: Wed, 9 Dec 2015 15:40:00 +0100
4 Subject: [PATCH 19/47] backup: vma: remove async queue
5
6 ---
7 blockdev.c | 6 ++
8 vma-writer.c | 179 +++++++++++------------------------------------------------
9 2 files changed, 38 insertions(+), 147 deletions(-)
10
11 diff --git a/blockdev.c b/blockdev.c
12 index 6253ef1..ef159b0 100644
13 --- a/blockdev.c
14 +++ b/blockdev.c
15 @@ -3122,6 +3122,11 @@ static void pvebackup_cancel(void *opaque)
16 error_setg(&backup_state.error, "backup cancelled");
17 }
18
19 + if (backup_state.vmaw) {
20 + /* make sure vma writer does not block anymore */
21 + vma_writer_set_error(backup_state.vmaw, "backup cancelled");
22 + }
23 +
24 /* drain all i/o (awake jobs waiting for aio) */
25 bdrv_drain_all();
26
27 @@ -3134,6 +3139,7 @@ static void pvebackup_cancel(void *opaque)
28 if (job) {
29 if (!di->completed) {
30 block_job_cancel_sync(job);
31 + bdrv_drain_all(); /* drain all i/o (awake jobs waiting for aio) */
32 }
33 }
34 }
35 diff --git a/vma-writer.c b/vma-writer.c
36 index 689e988..ec8da53 100644
37 --- a/vma-writer.c
38 +++ b/vma-writer.c
39 @@ -28,14 +28,8 @@
40 do { if (DEBUG_VMA) { printf("vma: " fmt, ## __VA_ARGS__); } } while (0)
41
42 #define WRITE_BUFFERS 5
43 -
44 -typedef struct VmaAIOCB VmaAIOCB;
45 -struct VmaAIOCB {
46 - unsigned char buffer[VMA_MAX_EXTENT_SIZE];
47 - VmaWriter *vmaw;
48 - size_t bytes;
49 - Coroutine *co;
50 -};
51 +#define HEADER_CLUSTERS 8
52 +#define HEADERBUF_SIZE (VMA_CLUSTER_SIZE*HEADER_CLUSTERS)
53
54 struct VmaWriter {
55 int fd;
56 @@ -47,16 +41,14 @@ struct VmaWriter {
57 bool closed;
58
59 /* we always write extents */
60 - unsigned char outbuf[VMA_MAX_EXTENT_SIZE];
61 + unsigned char *outbuf;
62 int outbuf_pos; /* in bytes */
63 int outbuf_count; /* in VMA_BLOCKS */
64 uint64_t outbuf_block_info[VMA_BLOCKS_PER_EXTENT];
65
66 - VmaAIOCB *aiocbs[WRITE_BUFFERS];
67 - CoQueue wqueue;
68 + unsigned char *headerbuf;
69
70 GChecksum *md5csum;
71 - CoMutex writer_lock;
72 CoMutex flush_lock;
73 Coroutine *co_writer;
74
75 @@ -217,38 +209,39 @@ static void vma_co_continue_write(void *opaque)
76 }
77
78 static ssize_t coroutine_fn
79 -vma_co_write(VmaWriter *vmaw, const void *buf, size_t bytes)
80 +vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes)
81 {
82 - size_t done = 0;
83 - ssize_t ret;
84 + DPRINTF("vma_queue_write enter %zd\n", bytes);
85
86 - /* atomic writes (we cannot interleave writes) */
87 - qemu_co_mutex_lock(&vmaw->writer_lock);
88 + assert(vmaw);
89 + assert(buf);
90 + assert(bytes <= VMA_MAX_EXTENT_SIZE);
91
92 - DPRINTF("vma_co_write enter %zd\n", bytes);
93 + size_t done = 0;
94 + ssize_t ret;
95
96 assert(vmaw->co_writer == NULL);
97
98 vmaw->co_writer = qemu_coroutine_self();
99
100 - aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, false, NULL, vma_co_continue_write, vmaw);
101 -
102 - DPRINTF("vma_co_write wait until writable\n");
103 - qemu_coroutine_yield();
104 - DPRINTF("vma_co_write starting %zd\n", bytes);
105 -
106 while (done < bytes) {
107 + aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, false, NULL, vma_co_continue_write, NULL, vmaw);
108 + qemu_coroutine_yield();
109 + aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, false, NULL, NULL, NULL, NULL);
110 + if (vmaw->status < 0) {
111 + DPRINTF("vma_queue_write detected canceled backup\n");
112 + done = -1;
113 + break;
114 + }
115 ret = write(vmaw->fd, buf + done, bytes - done);
116 if (ret > 0) {
117 done += ret;
118 - DPRINTF("vma_co_write written %zd %zd\n", done, ret);
119 + DPRINTF("vma_queue_write written %zd %zd\n", done, ret);
120 } else if (ret < 0) {
121 if (errno == EAGAIN || errno == EWOULDBLOCK) {
122 - DPRINTF("vma_co_write yield %zd\n", done);
123 - qemu_coroutine_yield();
124 - DPRINTF("vma_co_write restart %zd\n", done);
125 - } else {
126 - vma_writer_set_error(vmaw, "vma_co_write write error - %s",
127 + /* try again */
128 + } else {
129 + vma_writer_set_error(vmaw, "vma_queue_write: write error - %s",
130 g_strerror(errno));
131 done = -1; /* always return failure for partial writes */
132 break;
133 @@ -258,102 +251,9 @@ vma_co_write(VmaWriter *vmaw, const void *buf, size_t bytes)
134 }
135 }
136
137 - aio_set_fd_handler(qemu_get_aio_context(), vmaw->fd, false, NULL, NULL, NULL);
138 -
139 vmaw->co_writer = NULL;
140 -
141 - qemu_co_mutex_unlock(&vmaw->writer_lock);
142 -
143 - DPRINTF("vma_co_write leave %zd\n", done);
144 - return done;
145 -}
146 -
147 -static void coroutine_fn vma_co_writer_task(void *opaque)
148 -{
149 - VmaAIOCB *cb = opaque;
150 -
151 - DPRINTF("vma_co_writer_task start\n");
152 -
153 - int64_t done = vma_co_write(cb->vmaw, cb->buffer, cb->bytes);
154 - DPRINTF("vma_co_writer_task write done %zd\n", done);
155 -
156 - if (done != cb->bytes) {
157 - DPRINTF("vma_co_writer_task failed write %zd %zd", cb->bytes, done);
158 - vma_writer_set_error(cb->vmaw, "vma_co_writer_task failed write %zd",
159 - done);
160 - }
161 -
162 - cb->bytes = 0;
163 -
164 - qemu_co_queue_next(&cb->vmaw->wqueue);
165 -
166 - DPRINTF("vma_co_writer_task end\n");
167 -}
168 -
169 -static void coroutine_fn vma_queue_flush(VmaWriter *vmaw)
170 -{
171 - DPRINTF("vma_queue_flush enter\n");
172 -
173 - assert(vmaw);
174 -
175 - while (1) {
176 - int i;
177 - VmaAIOCB *cb = NULL;
178 - for (i = 0; i < WRITE_BUFFERS; i++) {
179 - if (vmaw->aiocbs[i]->bytes) {
180 - cb = vmaw->aiocbs[i];
181 - DPRINTF("FOUND USED AIO BUFFER %d %zd\n", i,
182 - vmaw->aiocbs[i]->bytes);
183 - break;
184 - }
185 - }
186 - if (!cb) {
187 - break;
188 - }
189 - qemu_co_queue_wait(&vmaw->wqueue);
190 - }
191 -
192 - DPRINTF("vma_queue_flush leave\n");
193 -}
194 -
195 -/**
196 - * NOTE: pipe buffer size in only 4096 bytes on linux (see 'ulimit -a')
197 - * So we need to create a coroutione to allow 'parallel' execution.
198 - */
199 -static ssize_t coroutine_fn
200 -vma_queue_write(VmaWriter *vmaw, const void *buf, size_t bytes)
201 -{
202 - DPRINTF("vma_queue_write enter %zd\n", bytes);
203 -
204 - assert(vmaw);
205 - assert(buf);
206 - assert(bytes <= VMA_MAX_EXTENT_SIZE);
207 -
208 - VmaAIOCB *cb = NULL;
209 - while (!cb) {
210 - int i;
211 - for (i = 0; i < WRITE_BUFFERS; i++) {
212 - if (!vmaw->aiocbs[i]->bytes) {
213 - cb = vmaw->aiocbs[i];
214 - break;
215 - }
216 - }
217 - if (!cb) {
218 - qemu_co_queue_wait(&vmaw->wqueue);
219 - }
220 - }
221 -
222 - memcpy(cb->buffer, buf, bytes);
223 - cb->bytes = bytes;
224 - cb->vmaw = vmaw;
225 -
226 - DPRINTF("vma_queue_write start %zd\n", bytes);
227 - cb->co = qemu_coroutine_create(vma_co_writer_task);
228 - qemu_coroutine_enter(cb->co, cb);
229 -
230 - DPRINTF("vma_queue_write leave\n");
231 -
232 - return bytes;
233 +
234 + return (done == bytes) ? bytes : -1;
235 }
236
237 VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, Error **errp)
238 @@ -420,20 +320,16 @@ VmaWriter *vma_writer_create(const char *filename, uuid_t uuid, Error **errp)
239 }
240
241 /* we use O_DIRECT, so we need to align IO buffers */
242 - int i;
243 - for (i = 0; i < WRITE_BUFFERS; i++) {
244 - vmaw->aiocbs[i] = qemu_memalign(512, sizeof(VmaAIOCB));
245 - memset(vmaw->aiocbs[i], 0, sizeof(VmaAIOCB));
246 - }
247 +
248 + vmaw->outbuf = qemu_memalign(512, VMA_MAX_EXTENT_SIZE);
249 + vmaw->headerbuf = qemu_memalign(512, HEADERBUF_SIZE);
250
251 vmaw->outbuf_count = 0;
252 vmaw->outbuf_pos = VMA_EXTENT_HEADER_SIZE;
253
254 vmaw->header_blob_table_pos = 1; /* start at pos 1 */
255
256 - qemu_co_mutex_init(&vmaw->writer_lock);
257 qemu_co_mutex_init(&vmaw->flush_lock);
258 - qemu_co_queue_init(&vmaw->wqueue);
259
260 uuid_copy(vmaw->uuid, uuid);
261
262 @@ -460,8 +356,7 @@ err:
263 static int coroutine_fn vma_write_header(VmaWriter *vmaw)
264 {
265 assert(vmaw);
266 - int header_clusters = 8;
267 - char buf[65536*header_clusters];
268 + unsigned char *buf = vmaw->headerbuf;
269 VmaHeader *head = (VmaHeader *)buf;
270
271 int i;
272 @@ -472,7 +367,7 @@ static int coroutine_fn vma_write_header(VmaWriter *vmaw)
273 return vmaw->status;
274 }
275
276 - memset(buf, 0, sizeof(buf));
277 + memset(buf, 0, HEADERBUF_SIZE);
278
279 head->magic = VMA_MAGIC;
280 head->version = GUINT32_TO_BE(1); /* v1 */
281 @@ -507,7 +402,7 @@ static int coroutine_fn vma_write_header(VmaWriter *vmaw)
282 uint32_t header_size = sizeof(VmaHeader) + vmaw->header_blob_table_size;
283 head->header_size = GUINT32_TO_BE(header_size);
284
285 - if (header_size > sizeof(buf)) {
286 + if (header_size > HEADERBUF_SIZE) {
287 return -1; /* just to be sure */
288 }
289
290 @@ -805,13 +700,7 @@ int vma_writer_close(VmaWriter *vmaw, Error **errp)
291
292 int i;
293
294 - vma_queue_flush(vmaw);
295 -
296 - /* this should not happen - just to be sure */
297 - while (!qemu_co_queue_empty(&vmaw->wqueue)) {
298 - DPRINTF("vma_writer_close wait\n");
299 - co_aio_sleep_ns(qemu_get_aio_context(), QEMU_CLOCK_REALTIME, 1000000);
300 - }
301 + assert(vmaw->co_writer == NULL);
302
303 if (vmaw->cmd) {
304 if (pclose(vmaw->cmd) < 0) {
305 @@ -869,9 +758,5 @@ void vma_writer_destroy(VmaWriter *vmaw)
306 g_checksum_free(vmaw->md5csum);
307 }
308
309 - for (i = 0; i < WRITE_BUFFERS; i++) {
310 - free(vmaw->aiocbs[i]);
311 - }
312 -
313 g_free(vmaw);
314 }
315 --
316 2.1.4
317