]> git.proxmox.com Git - mirror_qemu.git/blobdiff - migration/qemu-file.c
tcg: Add a new line after incompatibility warning
[mirror_qemu.git] / migration / qemu-file.c
index d2d40073f04f2edd6777c4b506b6b6eabef9b6e2..195fa94fcf3e8792f4e6d8b820423924ade5eb98 100644 (file)
  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  * THE SOFTWARE.
  */
+#include "qemu/osdep.h"
+#include <zlib.h>
 #include "qemu-common.h"
+#include "qemu/error-report.h"
 #include "qemu/iov.h"
 #include "qemu/sockets.h"
-#include "block/coroutine.h"
+#include "qemu/coroutine.h"
 #include "migration/migration.h"
 #include "migration/qemu-file.h"
-#include "migration/qemu-file-internal.h"
 #include "trace.h"
 
+#define IO_BUF_SIZE 32768
+#define MAX_IOV_SIZE MIN(IOV_MAX, 64)
+
+struct QEMUFile {
+    const QEMUFileOps *ops;
+    const QEMUFileHooks *hooks;
+    void *opaque;
+
+    int64_t bytes_xfer;
+    int64_t xfer_limit;
+
+    int64_t pos; /* start of buffer when writing, end of buffer
+                    when reading */
+    int buf_index;
+    int buf_size; /* 0 when writing */
+    uint8_t buf[IO_BUF_SIZE];
+
+    DECLARE_BITMAP(may_free, MAX_IOV_SIZE);
+    struct iovec iov[MAX_IOV_SIZE];
+    unsigned int iovcnt;
+
+    int last_error;
+};
+
+/*
+ * Stop a file from being read/written - not all backing files can do this
+ * typically only sockets can.
+ */
+int qemu_file_shutdown(QEMUFile *f)
+{
+    if (!f->ops->shut_down) {
+        return -ENOSYS;
+    }
+    return f->ops->shut_down(f->opaque, true, true);
+}
+
+/*
+ * Result: QEMUFile* for a 'return path' for comms in the opposite direction
+ *         NULL if not available
+ */
+QEMUFile *qemu_file_get_return_path(QEMUFile *f)
+{
+    if (!f->ops->get_return_path) {
+        return NULL;
+    }
+    return f->ops->get_return_path(f->opaque);
+}
+
 bool qemu_file_mode_is_not_valid(const char *mode)
 {
     if (mode == NULL ||
@@ -46,13 +96,19 @@ QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops)
 {
     QEMUFile *f;
 
-    f = g_malloc0(sizeof(QEMUFile));
+    f = g_new0(QEMUFile, 1);
 
     f->opaque = opaque;
     f->ops = ops;
     return f;
 }
 
+
+void qemu_file_set_hooks(QEMUFile *f, const QEMUFileHooks *hooks)
+{
+    f->hooks = hooks;
+}
+
 /*
  * Get last error for stream f
  *
@@ -74,48 +130,86 @@ void qemu_file_set_error(QEMUFile *f, int ret)
 
 bool qemu_file_is_writable(QEMUFile *f)
 {
-    return f->ops->writev_buffer || f->ops->put_buffer;
+    return f->ops->writev_buffer;
+}
+
+static void qemu_iovec_release_ram(QEMUFile *f)
+{
+    struct iovec iov;
+    unsigned long idx;
+
+    /* Find and release all the contiguous memory ranges marked as may_free. */
+    idx = find_next_bit(f->may_free, f->iovcnt, 0);
+    if (idx >= f->iovcnt) {
+        return;
+    }
+    iov = f->iov[idx];
+
+    /* The madvise() in the loop is called for iov within a continuous range and
+     * then reinitialize the iov. And in the end, madvise() is called for the
+     * last iov.
+     */
+    while ((idx = find_next_bit(f->may_free, f->iovcnt, idx + 1)) < f->iovcnt) {
+        /* check for adjacent buffer and coalesce them */
+        if (iov.iov_base + iov.iov_len == f->iov[idx].iov_base) {
+            iov.iov_len += f->iov[idx].iov_len;
+            continue;
+        }
+        if (qemu_madvise(iov.iov_base, iov.iov_len, QEMU_MADV_DONTNEED) < 0) {
+            error_report("migrate: madvise DONTNEED failed %p %zd: %s",
+                         iov.iov_base, iov.iov_len, strerror(errno));
+        }
+        iov = f->iov[idx];
+    }
+    if (qemu_madvise(iov.iov_base, iov.iov_len, QEMU_MADV_DONTNEED) < 0) {
+            error_report("migrate: madvise DONTNEED failed %p %zd: %s",
+                         iov.iov_base, iov.iov_len, strerror(errno));
+    }
+    memset(f->may_free, 0, sizeof(f->may_free));
 }
 
 /**
  * Flushes QEMUFile buffer
  *
  * If there is writev_buffer QEMUFileOps it uses it otherwise uses
- * put_buffer ops.
+ * put_buffer ops. This will flush all pending data. If data was
+ * only partially flushed, it will set an error state.
  */
 void qemu_fflush(QEMUFile *f)
 {
     ssize_t ret = 0;
+    ssize_t expect = 0;
 
     if (!qemu_file_is_writable(f)) {
         return;
     }
 
-    if (f->ops->writev_buffer) {
-        if (f->iovcnt > 0) {
-            ret = f->ops->writev_buffer(f->opaque, f->iov, f->iovcnt, f->pos);
-        }
-    } else {
-        if (f->buf_index > 0) {
-            ret = f->ops->put_buffer(f->opaque, f->buf, f->pos, f->buf_index);
-        }
+    if (f->iovcnt > 0) {
+        expect = iov_size(f->iov, f->iovcnt);
+        ret = f->ops->writev_buffer(f->opaque, f->iov, f->iovcnt, f->pos);
+
+        qemu_iovec_release_ram(f);
     }
+
     if (ret >= 0) {
         f->pos += ret;
     }
+    /* We expect the QEMUFile write impl to send the full
+     * data set we requested, so sanity check that.
+     */
+    if (ret != expect) {
+        qemu_file_set_error(f, ret < 0 ? ret : -EIO);
+    }
     f->buf_index = 0;
     f->iovcnt = 0;
-    if (ret < 0) {
-        qemu_file_set_error(f, ret);
-    }
 }
 
 void ram_control_before_iterate(QEMUFile *f, uint64_t flags)
 {
     int ret = 0;
 
-    if (f->ops->before_ram_iterate) {
-        ret = f->ops->before_ram_iterate(f, f->opaque, flags);
+    if (f->hooks && f->hooks->before_ram_iterate) {
+        ret = f->hooks->before_ram_iterate(f, f->opaque, flags, NULL);
         if (ret < 0) {
             qemu_file_set_error(f, ret);
         }
@@ -126,34 +220,41 @@ void ram_control_after_iterate(QEMUFile *f, uint64_t flags)
 {
     int ret = 0;
 
-    if (f->ops->after_ram_iterate) {
-        ret = f->ops->after_ram_iterate(f, f->opaque, flags);
+    if (f->hooks && f->hooks->after_ram_iterate) {
+        ret = f->hooks->after_ram_iterate(f, f->opaque, flags, NULL);
         if (ret < 0) {
             qemu_file_set_error(f, ret);
         }
     }
 }
 
-void ram_control_load_hook(QEMUFile *f, uint64_t flags)
+void ram_control_load_hook(QEMUFile *f, uint64_t flags, void *data)
 {
     int ret = -EINVAL;
 
-    if (f->ops->hook_ram_load) {
-        ret = f->ops->hook_ram_load(f, f->opaque, flags);
+    if (f->hooks && f->hooks->hook_ram_load) {
+        ret = f->hooks->hook_ram_load(f, f->opaque, flags, data);
         if (ret < 0) {
             qemu_file_set_error(f, ret);
         }
     } else {
-        qemu_file_set_error(f, ret);
+        /*
+         * Hook is a hook specifically requested by the source sending a flag
+         * that expects there to be a hook on the destination.
+         */
+        if (flags == RAM_CONTROL_HOOK) {
+            qemu_file_set_error(f, ret);
+        }
     }
 }
 
 size_t ram_control_save_page(QEMUFile *f, ram_addr_t block_offset,
-                         ram_addr_t offset, size_t size, int *bytes_sent)
+                             ram_addr_t offset, size_t size,
+                             uint64_t *bytes_sent)
 {
-    if (f->ops->save_page) {
-        int ret = f->ops->save_page(f, f->opaque, block_offset,
-                                    offset, size, bytes_sent);
+    if (f->hooks && f->hooks->save_page) {
+        int ret = f->hooks->save_page(f, f->opaque, block_offset,
+                                      offset, size, bytes_sent);
 
         if (ret != RAM_SAVE_CONTROL_DELAYED) {
             if (bytes_sent && *bytes_sent > 0) {
@@ -205,14 +306,6 @@ static ssize_t qemu_fill_buffer(QEMUFile *f)
     return len;
 }
 
-int qemu_get_fd(QEMUFile *f)
-{
-    if (f->ops->get_fd) {
-        return f->ops->get_fd(f->opaque);
-    }
-    return -1;
-}
-
 void qemu_update_position(QEMUFile *f, size_t size)
 {
     f->pos += size;
@@ -249,13 +342,19 @@ int qemu_fclose(QEMUFile *f)
     return ret;
 }
 
-static void add_to_iovec(QEMUFile *f, const uint8_t *buf, int size)
+static void add_to_iovec(QEMUFile *f, const uint8_t *buf, size_t size,
+                         bool may_free)
 {
     /* check for adjacent buffer and coalesce them */
     if (f->iovcnt > 0 && buf == f->iov[f->iovcnt - 1].iov_base +
-        f->iov[f->iovcnt - 1].iov_len) {
+        f->iov[f->iovcnt - 1].iov_len &&
+        may_free == test_bit(f->iovcnt - 1, f->may_free))
+    {
         f->iov[f->iovcnt - 1].iov_len += size;
     } else {
+        if (may_free) {
+            set_bit(f->iovcnt, f->may_free);
+        }
         f->iov[f->iovcnt].iov_base = (uint8_t *)buf;
         f->iov[f->iovcnt++].iov_len = size;
     }
@@ -265,24 +364,20 @@ static void add_to_iovec(QEMUFile *f, const uint8_t *buf, int size)
     }
 }
 
-void qemu_put_buffer_async(QEMUFile *f, const uint8_t *buf, int size)
+void qemu_put_buffer_async(QEMUFile *f, const uint8_t *buf, size_t size,
+                           bool may_free)
 {
-    if (!f->ops->writev_buffer) {
-        qemu_put_buffer(f, buf, size);
-        return;
-    }
-
     if (f->last_error) {
         return;
     }
 
     f->bytes_xfer += size;
-    add_to_iovec(f, buf, size);
+    add_to_iovec(f, buf, size, may_free);
 }
 
-void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size)
+void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
 {
-    int l;
+    size_t l;
 
     if (f->last_error) {
         return;
@@ -295,9 +390,7 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size)
         }
         memcpy(f->buf + f->buf_index, buf, l);
         f->bytes_xfer += l;
-        if (f->ops->writev_buffer) {
-            add_to_iovec(f, f->buf + f->buf_index, l);
-        }
+        add_to_iovec(f, f->buf + f->buf_index, l, false);
         f->buf_index += l;
         if (f->buf_index == IO_BUF_SIZE) {
             qemu_fflush(f);
@@ -318,9 +411,7 @@ void qemu_put_byte(QEMUFile *f, int v)
 
     f->buf[f->buf_index] = v;
     f->bytes_xfer++;
-    if (f->ops->writev_buffer) {
-        add_to_iovec(f, f->buf + f->buf_index, 1);
-    }
+    add_to_iovec(f, f->buf + f->buf_index, 1, false);
     f->buf_index++;
     if (f->buf_index == IO_BUF_SIZE) {
         qemu_fflush(f);
@@ -335,17 +426,17 @@ void qemu_file_skip(QEMUFile *f, int size)
 }
 
 /*
- * Read 'size' bytes from file (at 'offset') into buf without moving the
- * pointer.
+ * Read 'size' bytes from file (at 'offset') without moving the
+ * pointer and set 'buf' to point to that data.
  *
  * It will return size bytes unless there was an error, in which case it will
  * return as many as it managed to read (assuming blocking fd's which
  * all current QEMUFile are)
  */
-int qemu_peek_buffer(QEMUFile *f, uint8_t *buf, int size, size_t offset)
+size_t qemu_peek_buffer(QEMUFile *f, uint8_t **buf, size_t size, size_t offset)
 {
-    int pending;
-    int index;
+    ssize_t pending;
+    size_t index;
 
     assert(!qemu_file_is_writable(f));
     assert(offset < IO_BUF_SIZE);
@@ -378,7 +469,7 @@ int qemu_peek_buffer(QEMUFile *f, uint8_t *buf, int size, size_t offset)
         size = pending;
     }
 
-    memcpy(buf, f->buf + index, size);
+    *buf = f->buf + index;
     return size;
 }
 
@@ -390,18 +481,20 @@ int qemu_peek_buffer(QEMUFile *f, uint8_t *buf, int size, size_t offset)
  * return as many as it managed to read (assuming blocking fd's which
  * all current QEMUFile are)
  */
-int qemu_get_buffer(QEMUFile *f, uint8_t *buf, int size)
+size_t qemu_get_buffer(QEMUFile *f, uint8_t *buf, size_t size)
 {
-    int pending = size;
-    int done = 0;
+    size_t pending = size;
+    size_t done = 0;
 
     while (pending > 0) {
-        int res;
+        size_t res;
+        uint8_t *src;
 
-        res = qemu_peek_buffer(f, buf, MIN(pending, IO_BUF_SIZE), 0);
+        res = qemu_peek_buffer(f, &src, MIN(pending, IO_BUF_SIZE), 0);
         if (res == 0) {
             return done;
         }
+        memcpy(buf, src, res);
         qemu_file_skip(f, res);
         buf += res;
         pending -= res;
@@ -410,6 +503,43 @@ int qemu_get_buffer(QEMUFile *f, uint8_t *buf, int size)
     return done;
 }
 
+/*
+ * Read 'size' bytes of data from the file.
+ * 'size' can be larger than the internal buffer.
+ *
+ * The data:
+ *   may be held on an internal buffer (in which case *buf is updated
+ *     to point to it) that is valid until the next qemu_file operation.
+ * OR
+ *   will be copied to the *buf that was passed in.
+ *
+ * The code tries to avoid the copy if possible.
+ *
+ * It will return size bytes unless there was an error, in which case it will
+ * return as many as it managed to read (assuming blocking fd's which
+ * all current QEMUFile are)
+ *
+ * Note: Since **buf may get changed, the caller should take care to
+ *       keep a pointer to the original buffer if it needs to deallocate it.
+ */
+size_t qemu_get_buffer_in_place(QEMUFile *f, uint8_t **buf, size_t size)
+{
+    if (size < IO_BUF_SIZE) {
+        size_t res;
+        uint8_t *src;
+
+        res = qemu_peek_buffer(f, &src, size, 0);
+
+        if (res == size) {
+            qemu_file_skip(f, res);
+            *buf = src;
+            return res;
+        }
+    }
+
+    return qemu_get_buffer(f, *buf, size);
+}
+
 /*
  * Peeks a single byte from the buffer; this isn't guaranteed to work if
  * offset leaves a gap after the previous read/peeked data.
@@ -440,6 +570,18 @@ int qemu_get_byte(QEMUFile *f)
     return result;
 }
 
+int64_t qemu_ftell_fast(QEMUFile *f)
+{
+    int64_t ret = f->pos;
+    int i;
+
+    for (i = 0; i < f->iovcnt; i++) {
+        ret += f->iov[i].iov_len;
+    }
+
+    return ret;
+}
+
 int64_t qemu_ftell(QEMUFile *f)
 {
     qemu_fflush(f);
@@ -503,7 +645,7 @@ unsigned int qemu_get_be16(QEMUFile *f)
 unsigned int qemu_get_be32(QEMUFile *f)
 {
     unsigned int v;
-    v = qemu_get_byte(f) << 24;
+    v = (unsigned int)qemu_get_byte(f) << 24;
     v |= qemu_get_byte(f) << 16;
     v |= qemu_get_byte(f) << 8;
     v |= qemu_get_byte(f);
@@ -517,3 +659,91 @@ uint64_t qemu_get_be64(QEMUFile *f)
     v |= qemu_get_be32(f);
     return v;
 }
+
+/* Compress size bytes of data start at p with specific compression
+ * level and store the compressed data to the buffer of f.
+ *
+ * When f is not writable, return -1 if f has no space to save the
+ * compressed data.
+ * When f is wirtable and it has no space to save the compressed data,
+ * do fflush first, if f still has no space to save the compressed
+ * data, return -1.
+ */
+
+ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
+                                  int level)
+{
+    ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t);
+
+    if (blen < compressBound(size)) {
+        if (!qemu_file_is_writable(f)) {
+            return -1;
+        }
+        qemu_fflush(f);
+        blen = IO_BUF_SIZE - sizeof(int32_t);
+        if (blen < compressBound(size)) {
+            return -1;
+        }
+    }
+    if (compress2(f->buf + f->buf_index + sizeof(int32_t), (uLongf *)&blen,
+                  (Bytef *)p, size, level) != Z_OK) {
+        error_report("Compress Failed!");
+        return 0;
+    }
+    qemu_put_be32(f, blen);
+    if (f->ops->writev_buffer) {
+        add_to_iovec(f, f->buf + f->buf_index, blen, false);
+    }
+    f->buf_index += blen;
+    if (f->buf_index == IO_BUF_SIZE) {
+        qemu_fflush(f);
+    }
+    return blen + sizeof(int32_t);
+}
+
+/* Put the data in the buffer of f_src to the buffer of f_des, and
+ * then reset the buf_index of f_src to 0.
+ */
+
+int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src)
+{
+    int len = 0;
+
+    if (f_src->buf_index > 0) {
+        len = f_src->buf_index;
+        qemu_put_buffer(f_des, f_src->buf, f_src->buf_index);
+        f_src->buf_index = 0;
+        f_src->iovcnt = 0;
+    }
+    return len;
+}
+
+/*
+ * Get a string whose length is determined by a single preceding byte
+ * A preallocated 256 byte buffer must be passed in.
+ * Returns: len on success and a 0 terminated string in the buffer
+ *          else 0
+ *          (Note a 0 length string will return 0 either way)
+ */
+size_t qemu_get_counted_string(QEMUFile *f, char buf[256])
+{
+    size_t len = qemu_get_byte(f);
+    size_t res = qemu_get_buffer(f, (uint8_t *)buf, len);
+
+    buf[res] = 0;
+
+    return res == len ? res : 0;
+}
+
+/*
+ * Set the blocking state of the QEMUFile.
+ * Note: On some transports the OS only keeps a single blocking state for
+ *       both directions, and thus changing the blocking on the main
+ *       QEMUFile can also affect the return path.
+ */
+void qemu_file_set_blocking(QEMUFile *f, bool block)
+{
+    if (f->ops->set_blocking) {
+        f->ops->set_blocking(f->opaque, block);
+    }
+}