]> git.proxmox.com Git - qemu.git/blame - buffered_file.c
buffered_file: unfold migrate_fd_put_buffer
[qemu.git] / buffered_file.c
CommitLineData
39b65c2e
AL
1/*
2 * QEMU buffered QEMUFile
3 *
4 * Copyright IBM, Corp. 2008
5 *
6 * Authors:
7 * Anthony Liguori <aliguori@us.ibm.com>
8 *
9 * This work is licensed under the terms of the GNU GPL, version 2. See
10 * the COPYING file in the top-level directory.
11 *
6b620ca3
PB
12 * Contributions after 2012-01-13 are licensed under the terms of the
13 * GNU GPL, version 2 or (at your option) any later version.
39b65c2e
AL
14 */
15
16#include "qemu-common.h"
17#include "hw/hw.h"
18#include "qemu-timer.h"
39b65c2e
AL
19#include "qemu-char.h"
20#include "buffered_file.h"
21
22//#define DEBUG_BUFFERED_FILE
23
24typedef struct QEMUFileBuffered
25{
39b65c2e
AL
26 BufferedPutReadyFunc *put_ready;
27 BufferedWaitForUnfreezeFunc *wait_for_unfreeze;
28 BufferedCloseFunc *close;
c7a8f0cd 29 MigrationState *migration_state;
39b65c2e 30 QEMUFile *file;
39b65c2e
AL
31 int freeze_output;
32 size_t bytes_xfer;
33 size_t xfer_limit;
34 uint8_t *buffer;
35 size_t buffer_size;
36 size_t buffer_capacity;
37 QEMUTimer *timer;
38} QEMUFileBuffered;
39
/*
 * Debug tracing.  With DEBUG_BUFFERED_FILE defined, DPRINTF() prints its
 * message with a "buffered-file: " prefix; otherwise it expands to an
 * empty statement and its arguments are not evaluated.
 */
#if defined(DEBUG_BUFFERED_FILE)
#define DPRINTF(fmt, ...)                                  \
    do {                                                   \
        printf("buffered-file: " fmt, ## __VA_ARGS__);     \
    } while (0)
#else
#define DPRINTF(fmt, ...) do { } while (0)
#endif
47
48static void buffered_append(QEMUFileBuffered *s,
49 const uint8_t *buf, size_t size)
50{
51 if (size > (s->buffer_capacity - s->buffer_size)) {
d0f2c4c6 52 DPRINTF("increasing buffer capacity from %zu by %zu\n",
39b65c2e
AL
53 s->buffer_capacity, size + 1024);
54
55 s->buffer_capacity += size + 1024;
56
05e72dc5 57 s->buffer = g_realloc(s->buffer, s->buffer_capacity);
39b65c2e
AL
58 }
59
60 memcpy(s->buffer + s->buffer_size, buf, size);
61 s->buffer_size += size;
62}
63
64static void buffered_flush(QEMUFileBuffered *s)
65{
66 size_t offset = 0;
42802d47 67 int error;
39b65c2e 68
42802d47
JQ
69 error = qemu_file_get_error(s->file);
70 if (error != 0) {
71 DPRINTF("flush when error, bailing: %s\n", strerror(-error));
39b65c2e
AL
72 return;
73 }
74
d0f2c4c6 75 DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);
39b65c2e 76
2dddf6f4 77 while (s->bytes_xfer < s->xfer_limit && offset < s->buffer_size) {
39b65c2e
AL
78 ssize_t ret;
79
c87b015b
JQ
80 ret = migrate_fd_put_buffer(s->migration_state, s->buffer + offset,
81 s->buffer_size - offset);
39b65c2e 82 if (ret == -EAGAIN) {
d0f2c4c6 83 DPRINTF("backend not ready, freezing\n");
39b65c2e
AL
84 s->freeze_output = 1;
85 break;
86 }
87
88 if (ret <= 0) {
d0f2c4c6 89 DPRINTF("error flushing data, %zd\n", ret);
dcd1d224 90 qemu_file_set_error(s->file, ret);
39b65c2e
AL
91 break;
92 } else {
d0f2c4c6 93 DPRINTF("flushed %zd byte(s)\n", ret);
39b65c2e 94 offset += ret;
2dddf6f4 95 s->bytes_xfer += ret;
39b65c2e
AL
96 }
97 }
98
d0f2c4c6 99 DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size);
39b65c2e
AL
100 memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
101 s->buffer_size -= offset;
102}
103
104static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
105{
106 QEMUFileBuffered *s = opaque;
2dddf6f4 107 int error;
39b65c2e 108
d0f2c4c6 109 DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
39b65c2e 110
42802d47
JQ
111 error = qemu_file_get_error(s->file);
112 if (error) {
113 DPRINTF("flush when error, bailing: %s\n", strerror(-error));
114 return error;
39b65c2e
AL
115 }
116
d0f2c4c6 117 DPRINTF("unfreezing output\n");
39b65c2e
AL
118 s->freeze_output = 0;
119
2dddf6f4 120 if (size > 0) {
d0f2c4c6 121 DPRINTF("buffering %d bytes\n", size - offset);
2dddf6f4 122 buffered_append(s, buf, size);
39b65c2e
AL
123 }
124
2dddf6f4
PB
125 buffered_flush(s);
126
5e77aaa0
AK
127 if (pos == 0 && size == 0) {
128 DPRINTF("file is ready\n");
2dddf6f4 129 if (!s->freeze_output && s->bytes_xfer < s->xfer_limit) {
5e77aaa0 130 DPRINTF("notifying client\n");
8e92c9e2 131 s->put_ready(s->migration_state);
5e77aaa0
AK
132 }
133 }
134
2dddf6f4 135 return size;
39b65c2e
AL
136}
137
138static int buffered_close(void *opaque)
139{
140 QEMUFileBuffered *s = opaque;
141 int ret;
142
d0f2c4c6 143 DPRINTF("closing\n");
39b65c2e 144
2dddf6f4 145 s->xfer_limit = INT_MAX;
624b9cc2 146 while (!qemu_file_get_error(s->file) && s->buffer_size) {
39b65c2e
AL
147 buffered_flush(s);
148 if (s->freeze_output)
8e92c9e2 149 s->wait_for_unfreeze(s->migration_state);
39b65c2e
AL
150 }
151
8e92c9e2 152 ret = s->close(s->migration_state);
39b65c2e
AL
153
154 qemu_del_timer(s->timer);
155 qemu_free_timer(s->timer);
7267c094
AL
156 g_free(s->buffer);
157 g_free(s);
39b65c2e
AL
158
159 return ret;
160}
161
4fc7d819
JQ
162/*
163 * The meaning of the return values is:
164 * 0: We can continue sending
165 * 1: Time to stop
42802d47 166 * negative: There has been an error
4fc7d819 167 */
39b65c2e
AL
168static int buffered_rate_limit(void *opaque)
169{
170 QEMUFileBuffered *s = opaque;
42802d47 171 int ret;
39b65c2e 172
42802d47
JQ
173 ret = qemu_file_get_error(s->file);
174 if (ret) {
175 return ret;
4fc7d819 176 }
39b65c2e
AL
177 if (s->freeze_output)
178 return 1;
179
180 if (s->bytes_xfer > s->xfer_limit)
181 return 1;
182
183 return 0;
184}
185
3d002df3 186static int64_t buffered_set_rate_limit(void *opaque, int64_t new_rate)
19629537
GC
187{
188 QEMUFileBuffered *s = opaque;
624b9cc2 189 if (qemu_file_get_error(s->file)) {
19629537 190 goto out;
624b9cc2 191 }
3d002df3
MT
192 if (new_rate > SIZE_MAX) {
193 new_rate = SIZE_MAX;
194 }
195
19629537
GC
196 s->xfer_limit = new_rate / 10;
197
198out:
199 return s->xfer_limit;
200}
201
3d002df3 202static int64_t buffered_get_rate_limit(void *opaque)
c163b5ca 203{
204 QEMUFileBuffered *s = opaque;
205
206 return s->xfer_limit;
207}
208
39b65c2e
AL
209static void buffered_rate_tick(void *opaque)
210{
211 QEMUFileBuffered *s = opaque;
212
624b9cc2 213 if (qemu_file_get_error(s->file)) {
e447b1a6 214 buffered_close(s);
39b65c2e 215 return;
e447b1a6 216 }
39b65c2e 217
7bd427d8 218 qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
39b65c2e
AL
219
220 if (s->freeze_output)
221 return;
222
223 s->bytes_xfer = 0;
224
2dddf6f4 225 buffered_put_buffer(s, NULL, 0, 0);
39b65c2e
AL
226}
227
c7a8f0cd 228QEMUFile *qemu_fopen_ops_buffered(MigrationState *migration_state,
39b65c2e 229 size_t bytes_per_sec,
39b65c2e
AL
230 BufferedPutReadyFunc *put_ready,
231 BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
232 BufferedCloseFunc *close)
233{
234 QEMUFileBuffered *s;
235
7267c094 236 s = g_malloc0(sizeof(*s));
39b65c2e 237
c7a8f0cd 238 s->migration_state = migration_state;
39b65c2e 239 s->xfer_limit = bytes_per_sec / 10;
39b65c2e
AL
240 s->put_ready = put_ready;
241 s->wait_for_unfreeze = wait_for_unfreeze;
242 s->close = close;
243
244 s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
19629537 245 buffered_close, buffered_rate_limit,
c163b5ca 246 buffered_set_rate_limit,
247 buffered_get_rate_limit);
39b65c2e 248
7bd427d8 249 s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
39b65c2e 250
7bd427d8 251 qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
39b65c2e
AL
252
253 return s->file;
254}