]> git.proxmox.com Git - qemu.git/blob - buffered_file.c
migration: just lock migrate_fd_put_ready
[qemu.git] / buffered_file.c
1 /*
2 * QEMU buffered QEMUFile
3 *
4 * Copyright IBM, Corp. 2008
5 *
6 * Authors:
7 * Anthony Liguori <aliguori@us.ibm.com>
8 *
9 * This work is licensed under the terms of the GNU GPL, version 2. See
10 * the COPYING file in the top-level directory.
11 *
12 * Contributions after 2012-01-13 are licensed under the terms of the
13 * GNU GPL, version 2 or (at your option) any later version.
14 */
15
16 #include "qemu-common.h"
17 #include "hw/hw.h"
18 #include "qemu/timer.h"
19 #include "buffered_file.h"
20 #include "qemu/thread.h"
21
22 //#define DEBUG_BUFFERED_FILE
23
24 typedef struct QEMUFileBuffered
25 {
26 MigrationState *migration_state;
27 QEMUFile *file;
28 size_t bytes_xfer;
29 size_t xfer_limit;
30 uint8_t *buffer;
31 size_t buffer_size;
32 size_t buffer_capacity;
33 QemuThread thread;
34 } QEMUFileBuffered;
35
/*
 * Debug tracing: DPRINTF() prints with a "buffered-file: " prefix when
 * DEBUG_BUFFERED_FILE is defined and expands to an empty statement otherwise
 * (in which case its arguments are not compiled at all).
 */
#ifndef DEBUG_BUFFERED_FILE
#define DPRINTF(fmt, ...) \
    do { } while (0)
#else
#define DPRINTF(fmt, ...) \
    do { printf("buffered-file: " fmt, ## __VA_ARGS__); } while (0)
#endif
43
44 static void buffered_append(QEMUFileBuffered *s,
45 const uint8_t *buf, size_t size)
46 {
47 if (size > (s->buffer_capacity - s->buffer_size)) {
48 DPRINTF("increasing buffer capacity from %zu by %zu\n",
49 s->buffer_capacity, size + 1024);
50
51 s->buffer_capacity += size + 1024;
52
53 s->buffer = g_realloc(s->buffer, s->buffer_capacity);
54 }
55
56 memcpy(s->buffer + s->buffer_size, buf, size);
57 s->buffer_size += size;
58 }
59
60 static ssize_t buffered_flush(QEMUFileBuffered *s)
61 {
62 size_t offset = 0;
63 ssize_t ret = 0;
64
65 DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);
66
67 while (s->bytes_xfer < s->xfer_limit && offset < s->buffer_size) {
68 size_t to_send = MIN(s->buffer_size - offset, s->xfer_limit - s->bytes_xfer);
69 ret = migrate_fd_put_buffer(s->migration_state, s->buffer + offset,
70 to_send);
71 if (ret <= 0) {
72 DPRINTF("error flushing data, %zd\n", ret);
73 break;
74 } else {
75 DPRINTF("flushed %zd byte(s)\n", ret);
76 offset += ret;
77 s->bytes_xfer += ret;
78 }
79 }
80
81 DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size);
82 memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
83 s->buffer_size -= offset;
84
85 if (ret < 0) {
86 return ret;
87 }
88 return offset;
89 }
90
91 static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
92 {
93 QEMUFileBuffered *s = opaque;
94 ssize_t error;
95
96 DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
97
98 error = qemu_file_get_error(s->file);
99 if (error) {
100 DPRINTF("flush when error, bailing: %s\n", strerror(-error));
101 return error;
102 }
103
104 if (size > 0) {
105 DPRINTF("buffering %d bytes\n", size - offset);
106 buffered_append(s, buf, size);
107 }
108
109 error = buffered_flush(s);
110 if (error < 0) {
111 DPRINTF("buffered flush error. bailing: %s\n", strerror(-error));
112 return error;
113 }
114
115 if (pos == 0 && size == 0) {
116 DPRINTF("file is ready\n");
117 if (s->bytes_xfer < s->xfer_limit) {
118 DPRINTF("notifying client\n");
119 migrate_fd_put_ready(s->migration_state);
120 }
121 }
122
123 return size;
124 }
125
126 static int buffered_close(void *opaque)
127 {
128 QEMUFileBuffered *s = opaque;
129 ssize_t ret = 0;
130 int ret2;
131
132 DPRINTF("closing\n");
133
134 s->xfer_limit = INT_MAX;
135 while (!qemu_file_get_error(s->file) && s->buffer_size) {
136 ret = buffered_flush(s);
137 if (ret < 0) {
138 break;
139 }
140 }
141
142 ret2 = migrate_fd_close(s->migration_state);
143 if (ret >= 0) {
144 ret = ret2;
145 }
146 ret = migrate_fd_close(s->migration_state);
147 s->migration_state->complete = true;
148 return ret;
149 }
150
/*
 * Return values of buffered_rate_limit() (defined below, after
 * buffered_get_fd()):
 *   0: We can continue sending
 *   1: Time to stop
 *   negative: There has been an error
 */
157 static int buffered_get_fd(void *opaque)
158 {
159 QEMUFileBuffered *s = opaque;
160
161 return qemu_get_fd(s->file);
162 }
163
164 static int buffered_rate_limit(void *opaque)
165 {
166 QEMUFileBuffered *s = opaque;
167 int ret;
168
169 ret = qemu_file_get_error(s->file);
170 if (ret) {
171 return ret;
172 }
173
174 if (s->bytes_xfer > s->xfer_limit)
175 return 1;
176
177 return 0;
178 }
179
180 static int64_t buffered_set_rate_limit(void *opaque, int64_t new_rate)
181 {
182 QEMUFileBuffered *s = opaque;
183 if (qemu_file_get_error(s->file)) {
184 goto out;
185 }
186 if (new_rate > SIZE_MAX) {
187 new_rate = SIZE_MAX;
188 }
189
190 s->xfer_limit = new_rate / 10;
191
192 out:
193 return s->xfer_limit;
194 }
195
196 static int64_t buffered_get_rate_limit(void *opaque)
197 {
198 QEMUFileBuffered *s = opaque;
199
200 return s->xfer_limit;
201 }
202
/*
 * Length of one rate-limiting window in milliseconds (100 ms): xfer_limit
 * is the amount of data that we should write during each such window.
 */
#define BUFFER_DELAY 100
205
206 static void *buffered_file_thread(void *opaque)
207 {
208 QEMUFileBuffered *s = opaque;
209 int64_t expire_time = qemu_get_clock_ms(rt_clock) + BUFFER_DELAY;
210
211 while (true) {
212 int64_t current_time = qemu_get_clock_ms(rt_clock);
213
214 if (s->migration_state->complete) {
215 break;
216 }
217 if (current_time >= expire_time) {
218 s->bytes_xfer = 0;
219 expire_time = current_time + BUFFER_DELAY;
220 }
221 if (s->bytes_xfer >= s->xfer_limit) {
222 /* usleep expects microseconds */
223 g_usleep((expire_time - current_time)*1000);
224 }
225 buffered_put_buffer(s, NULL, 0, 0);
226 }
227 g_free(s->buffer);
228 g_free(s);
229 return NULL;
230 }
231
232 static const QEMUFileOps buffered_file_ops = {
233 .get_fd = buffered_get_fd,
234 .put_buffer = buffered_put_buffer,
235 .close = buffered_close,
236 .rate_limit = buffered_rate_limit,
237 .get_rate_limit = buffered_get_rate_limit,
238 .set_rate_limit = buffered_set_rate_limit,
239 };
240
241 void qemu_fopen_ops_buffered(MigrationState *migration_state)
242 {
243 QEMUFileBuffered *s;
244
245 s = g_malloc0(sizeof(*s));
246
247 s->migration_state = migration_state;
248 s->xfer_limit = migration_state->bandwidth_limit / 10;
249 s->migration_state->complete = false;
250
251 s->file = qemu_fopen_ops(s, &buffered_file_ops);
252
253 migration_state->file = s->file;
254
255 qemu_thread_create(&s->thread, buffered_file_thread, s,
256 QEMU_THREAD_DETACHED);
257 }