X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=migration%2Fqemu-file.c;h=04315855022d70d26a35bde708e086fdd5413312;hb=c4107e8208d0222f9b328691b519aaee4101db87;hp=6bb3dc15cd40b779806d706ed8d58dc645ebe6c8;hpb=f3a1b5068cea303a55e2a21a97e66d057eaae638;p=mirror_qemu.git diff --git a/migration/qemu-file.c b/migration/qemu-file.c index 6bb3dc15cd..0431585502 100644 --- a/migration/qemu-file.c +++ b/migration/qemu-file.c @@ -21,17 +21,38 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ +#include "qemu/osdep.h" #include -#include "qemu-common.h" #include "qemu/error-report.h" #include "qemu/iov.h" -#include "qemu/sockets.h" -#include "block/coroutine.h" -#include "migration/migration.h" -#include "migration/qemu-file.h" -#include "migration/qemu-file-internal.h" +#include "migration.h" +#include "qemu-file.h" #include "trace.h" +#define IO_BUF_SIZE 32768 +#define MAX_IOV_SIZE MIN(IOV_MAX, 64) + +struct QEMUFile { + const QEMUFileOps *ops; + const QEMUFileHooks *hooks; + void *opaque; + + int64_t bytes_xfer; + int64_t xfer_limit; + + int64_t pos; /* start of buffer when writing, end of buffer + when reading */ + int buf_index; + int buf_size; /* 0 when writing */ + uint8_t buf[IO_BUF_SIZE]; + + DECLARE_BITMAP(may_free, MAX_IOV_SIZE); + struct iovec iov[MAX_IOV_SIZE]; + unsigned int iovcnt; + + int last_error; +}; + /* * Stop a file from being read/written - not all backing files can do this * typically only sockets can. @@ -44,6 +65,18 @@ int qemu_file_shutdown(QEMUFile *f) return f->ops->shut_down(f->opaque, true, true); } +/* + * Result: QEMUFile* for a 'return path' for comms in the opposite direction + * NULL if not available + */ +QEMUFile *qemu_file_get_return_path(QEMUFile *f) +{ + if (!f->ops->get_return_path) { + return NULL; + } + return f->ops->get_return_path(f->opaque); +} + bool qemu_file_mode_is_not_valid(const char *mode) { if (mode == NULL || @@ -60,13 +93,19 @@ QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops) { QEMUFile *f; - f = g_malloc0(sizeof(QEMUFile)); + f = g_new0(QEMUFile, 1); f->opaque = opaque; f->ops = ops; return f; } + +void qemu_file_set_hooks(QEMUFile *f, const QEMUFileHooks *hooks) +{ + f->hooks = hooks; +} + /* * Get last error for stream f * @@ -88,48 +127,86 @@ void qemu_file_set_error(QEMUFile *f, int ret) bool qemu_file_is_writable(QEMUFile *f) { - return f->ops->writev_buffer || f->ops->put_buffer; + return f->ops->writev_buffer; +} + +static void qemu_iovec_release_ram(QEMUFile *f) +{ + struct iovec iov; + unsigned long idx; + + /* Find and release all the contiguous memory ranges marked as may_free. */ + idx = find_next_bit(f->may_free, f->iovcnt, 0); + if (idx >= f->iovcnt) { + return; + } + iov = f->iov[idx]; + + /* The madvise() in the loop is called for iov within a continuous range and + * then reinitialize the iov. And in the end, madvise() is called for the + * last iov. + */ + while ((idx = find_next_bit(f->may_free, f->iovcnt, idx + 1)) < f->iovcnt) { + /* check for adjacent buffer and coalesce them */ + if (iov.iov_base + iov.iov_len == f->iov[idx].iov_base) { + iov.iov_len += f->iov[idx].iov_len; + continue; + } + if (qemu_madvise(iov.iov_base, iov.iov_len, QEMU_MADV_DONTNEED) < 0) { + error_report("migrate: madvise DONTNEED failed %p %zd: %s", + iov.iov_base, iov.iov_len, strerror(errno)); + } + iov = f->iov[idx]; + } + if (qemu_madvise(iov.iov_base, iov.iov_len, QEMU_MADV_DONTNEED) < 0) { + error_report("migrate: madvise DONTNEED failed %p %zd: %s", + iov.iov_base, iov.iov_len, strerror(errno)); + } + memset(f->may_free, 0, sizeof(f->may_free)); } /** * Flushes QEMUFile buffer * * If there is writev_buffer QEMUFileOps it uses it otherwise uses - * put_buffer ops. + * put_buffer ops. This will flush all pending data. If data was + * only partially flushed, it will set an error state. */ void qemu_fflush(QEMUFile *f) { ssize_t ret = 0; + ssize_t expect = 0; if (!qemu_file_is_writable(f)) { return; } - if (f->ops->writev_buffer) { - if (f->iovcnt > 0) { - ret = f->ops->writev_buffer(f->opaque, f->iov, f->iovcnt, f->pos); - } - } else { - if (f->buf_index > 0) { - ret = f->ops->put_buffer(f->opaque, f->buf, f->pos, f->buf_index); - } + if (f->iovcnt > 0) { + expect = iov_size(f->iov, f->iovcnt); + ret = f->ops->writev_buffer(f->opaque, f->iov, f->iovcnt, f->pos); + + qemu_iovec_release_ram(f); } + if (ret >= 0) { f->pos += ret; } + /* We expect the QEMUFile write impl to send the full + * data set we requested, so sanity check that. + */ + if (ret != expect) { + qemu_file_set_error(f, ret < 0 ? ret : -EIO); + } f->buf_index = 0; f->iovcnt = 0; - if (ret < 0) { - qemu_file_set_error(f, ret); - } } void ram_control_before_iterate(QEMUFile *f, uint64_t flags) { int ret = 0; - if (f->ops->before_ram_iterate) { - ret = f->ops->before_ram_iterate(f, f->opaque, flags, NULL); + if (f->hooks && f->hooks->before_ram_iterate) { + ret = f->hooks->before_ram_iterate(f, f->opaque, flags, NULL); if (ret < 0) { qemu_file_set_error(f, ret); } @@ -140,8 +217,8 @@ void ram_control_after_iterate(QEMUFile *f, uint64_t flags) { int ret = 0; - if (f->ops->after_ram_iterate) { - ret = f->ops->after_ram_iterate(f, f->opaque, flags, NULL); + if (f->hooks && f->hooks->after_ram_iterate) { + ret = f->hooks->after_ram_iterate(f, f->opaque, flags, NULL); if (ret < 0) { qemu_file_set_error(f, ret); } @@ -152,8 +229,8 @@ void ram_control_load_hook(QEMUFile *f, uint64_t flags, void *data) { int ret = -EINVAL; - if (f->ops->hook_ram_load) { - ret = f->ops->hook_ram_load(f, f->opaque, flags, data); + if (f->hooks && f->hooks->hook_ram_load) { + ret = f->hooks->hook_ram_load(f, f->opaque, flags, data); if (ret < 0) { qemu_file_set_error(f, ret); } @@ -172,11 +249,15 @@ size_t ram_control_save_page(QEMUFile *f, ram_addr_t block_offset, ram_addr_t offset, size_t size, uint64_t *bytes_sent) { - if (f->ops->save_page) { - int ret = f->ops->save_page(f, f->opaque, block_offset, - offset, size, bytes_sent); + if (f->hooks && f->hooks->save_page) { + int ret = f->hooks->save_page(f, f->opaque, block_offset, + offset, size, bytes_sent); + if (ret != RAM_SAVE_CONTROL_NOT_SUPP) { + f->bytes_xfer += size; + } - if (ret != RAM_SAVE_CONTROL_DELAYED) { + if (ret != RAM_SAVE_CONTROL_DELAYED && + ret != RAM_SAVE_CONTROL_NOT_SUPP) { if (bytes_sent && *bytes_sent > 0) { qemu_update_position(f, *bytes_sent); } else if (ret < 0) { @@ -226,14 +307,6 @@ static ssize_t qemu_fill_buffer(QEMUFile *f) return len; } -int qemu_get_fd(QEMUFile *f) -{ - if (f->ops->get_fd) { - return f->ops->get_fd(f->opaque); - } - return -1; -} - void qemu_update_position(QEMUFile *f, size_t size) { f->pos += size; @@ -270,13 +343,19 @@ int qemu_fclose(QEMUFile *f) return ret; } -static void add_to_iovec(QEMUFile *f, const uint8_t *buf, int size) +static void add_to_iovec(QEMUFile *f, const uint8_t *buf, size_t size, + bool may_free) { /* check for adjacent buffer and coalesce them */ if (f->iovcnt > 0 && buf == f->iov[f->iovcnt - 1].iov_base + - f->iov[f->iovcnt - 1].iov_len) { + f->iov[f->iovcnt - 1].iov_len && + may_free == test_bit(f->iovcnt - 1, f->may_free)) + { f->iov[f->iovcnt - 1].iov_len += size; } else { + if (may_free) { + set_bit(f->iovcnt, f->may_free); + } f->iov[f->iovcnt].iov_base = (uint8_t *)buf; f->iov[f->iovcnt++].iov_len = size; } @@ -286,24 +365,20 @@ static void add_to_iovec(QEMUFile *f, const uint8_t *buf, int size) } } -void qemu_put_buffer_async(QEMUFile *f, const uint8_t *buf, int size) +void qemu_put_buffer_async(QEMUFile *f, const uint8_t *buf, size_t size, + bool may_free) { - if (!f->ops->writev_buffer) { - qemu_put_buffer(f, buf, size); - return; - } - if (f->last_error) { return; } f->bytes_xfer += size; - add_to_iovec(f, buf, size); + add_to_iovec(f, buf, size, may_free); } -void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size) +void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size) { - int l; + size_t l; if (f->last_error) { return; @@ -316,9 +391,7 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size) } memcpy(f->buf + f->buf_index, buf, l); f->bytes_xfer += l; - if (f->ops->writev_buffer) { - add_to_iovec(f, f->buf + f->buf_index, l); - } + add_to_iovec(f, f->buf + f->buf_index, l, false); f->buf_index += l; if (f->buf_index == IO_BUF_SIZE) { qemu_fflush(f); @@ -339,9 +412,7 @@ void qemu_put_byte(QEMUFile *f, int v) f->buf[f->buf_index] = v; f->bytes_xfer++; - if (f->ops->writev_buffer) { - add_to_iovec(f, f->buf + f->buf_index, 1); - } + add_to_iovec(f, f->buf + f->buf_index, 1, false); f->buf_index++; if (f->buf_index == IO_BUF_SIZE) { qemu_fflush(f); @@ -363,10 +434,10 @@ void qemu_file_skip(QEMUFile *f, int size) * return as many as it managed to read (assuming blocking fd's which * all current QEMUFile are) */ -int qemu_peek_buffer(QEMUFile *f, uint8_t **buf, int size, size_t offset) +size_t qemu_peek_buffer(QEMUFile *f, uint8_t **buf, size_t size, size_t offset) { - int pending; - int index; + ssize_t pending; + size_t index; assert(!qemu_file_is_writable(f)); assert(offset < IO_BUF_SIZE); @@ -411,13 +482,13 @@ int qemu_peek_buffer(QEMUFile *f, uint8_t **buf, int size, size_t offset) * return as many as it managed to read (assuming blocking fd's which * all current QEMUFile are) */ -int qemu_get_buffer(QEMUFile *f, uint8_t *buf, int size) +size_t qemu_get_buffer(QEMUFile *f, uint8_t *buf, size_t size) { - int pending = size; - int done = 0; + size_t pending = size; + size_t done = 0; while (pending > 0) { - int res; + size_t res; uint8_t *src; res = qemu_peek_buffer(f, &src, MIN(pending, IO_BUF_SIZE), 0); @@ -433,6 +504,43 @@ int qemu_get_buffer(QEMUFile *f, uint8_t *buf, int size) return done; } +/* + * Read 'size' bytes of data from the file. + * 'size' can be larger than the internal buffer. + * + * The data: + * may be held on an internal buffer (in which case *buf is updated + * to point to it) that is valid until the next qemu_file operation. + * OR + * will be copied to the *buf that was passed in. + * + * The code tries to avoid the copy if possible. + * + * It will return size bytes unless there was an error, in which case it will + * return as many as it managed to read (assuming blocking fd's which + * all current QEMUFile are) + * + * Note: Since **buf may get changed, the caller should take care to + * keep a pointer to the original buffer if it needs to deallocate it. + */ +size_t qemu_get_buffer_in_place(QEMUFile *f, uint8_t **buf, size_t size) +{ + if (size < IO_BUF_SIZE) { + size_t res; + uint8_t *src; + + res = qemu_peek_buffer(f, &src, size, 0); + + if (res == size) { + qemu_file_skip(f, res); + *buf = src; + return res; + } + } + + return qemu_get_buffer(f, *buf, size); +} + /* * Peeks a single byte from the buffer; this isn't guaranteed to work if * offset leaves a gap after the previous read/peeked data. @@ -468,12 +576,8 @@ int64_t qemu_ftell_fast(QEMUFile *f) int64_t ret = f->pos; int i; - if (f->ops->writev_buffer) { - for (i = 0; i < f->iovcnt; i++) { - ret += f->iov[i].iov_len; - } - } else { - ret += f->buf_index; + for (i = 0; i < f->iovcnt; i++) { + ret += f->iov[i].iov_len; } return ret; @@ -557,25 +661,69 @@ uint64_t qemu_get_be64(QEMUFile *f) return v; } -/* compress size bytes of data start at p with specific compression - * level and store the compressed data to the buffer of f. - */ +/* return the size after compression, or negative value on error */ +static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len, + const uint8_t *source, size_t source_len) +{ + int err; + + err = deflateReset(stream); + if (err != Z_OK) { + return -1; + } + + stream->avail_in = source_len; + stream->next_in = (uint8_t *)source; + stream->avail_out = dest_len; + stream->next_out = dest; + + err = deflate(stream, Z_FINISH); + if (err != Z_STREAM_END) { + return -1; + } + + return stream->next_out - dest; +} -ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size, - int level) +/* Compress size bytes of data start at p and store the compressed + * data to the buffer of f. + * + * When f is not writable, return -1 if f has no space to save the + * compressed data. + * When f is wirtable and it has no space to save the compressed data, + * do fflush first, if f still has no space to save the compressed + * data, return -1. + */ +ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream, + const uint8_t *p, size_t size) { ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t); if (blen < compressBound(size)) { - return 0; + if (!qemu_file_is_writable(f)) { + return -1; + } + qemu_fflush(f); + blen = IO_BUF_SIZE - sizeof(int32_t); + if (blen < compressBound(size)) { + return -1; + } } - if (compress2(f->buf + f->buf_index + sizeof(int32_t), (uLongf *)&blen, - (Bytef *)p, size, level) != Z_OK) { - error_report("Compress Failed!"); - return 0; + + blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t), + blen, p, size); + if (blen < 0) { + return -1; } + qemu_put_be32(f, blen); + if (f->ops->writev_buffer) { + add_to_iovec(f, f->buf + f->buf_index, blen, false); + } f->buf_index += blen; + if (f->buf_index == IO_BUF_SIZE) { + qemu_fflush(f); + } return blen + sizeof(int32_t); } @@ -591,6 +739,7 @@ int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src) len = f_src->buf_index; qemu_put_buffer(f_des, f_src->buf, f_src->buf_index); f_src->buf_index = 0; + f_src->iovcnt = 0; } return len; } @@ -611,3 +760,29 @@ size_t qemu_get_counted_string(QEMUFile *f, char buf[256]) return res == len ? res : 0; } + +/* + * Put a string with one preceding byte containing its length. The length of + * the string should be less than 256. + */ +void qemu_put_counted_string(QEMUFile *f, const char *str) +{ + size_t len = strlen(str); + + assert(len < 256); + qemu_put_byte(f, len); + qemu_put_buffer(f, (const uint8_t *)str, len); +} + +/* + * Set the blocking state of the QEMUFile. + * Note: On some transports the OS only keeps a single blocking state for + * both directions, and thus changing the blocking on the main + * QEMUFile can also affect the return path. + */ +void qemu_file_set_blocking(QEMUFile *f, bool block) +{ + if (f->ops->set_blocking) { + f->ops->set_blocking(f->opaque, block); + } +}