Date: Mon, 6 Apr 2020 12:16:59 +0200
Subject: [PATCH] PVE-Backup: proxmox backup patches for qemu
+Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
+[PVE-Backup: avoid coroutines to fix AIO freeze, cleanups]
+Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
block/meson.build | 5 +
block/monitor/block-hmp-cmds.c | 33 ++
monitor/hmp-cmds.c | 44 ++
proxmox-backup-client.c | 176 ++++++
proxmox-backup-client.h | 59 ++
- pve-backup.c | 955 +++++++++++++++++++++++++++++++++
+ pve-backup.c | 957 +++++++++++++++++++++++++++++++++
qapi/block-core.json | 109 ++++
qapi/common.json | 13 +
qapi/machine.json | 15 +-
- 15 files changed, 1444 insertions(+), 14 deletions(-)
+ 15 files changed, 1446 insertions(+), 14 deletions(-)
create mode 100644 proxmox-backup-client.c
create mode 100644 proxmox-backup-client.h
create mode 100644 pve-backup.c
+#endif /* PROXMOX_BACKUP_CLIENT_H */
diff --git a/pve-backup.c b/pve-backup.c
new file mode 100644
-index 0000000000..55441eb9d1
+index 0000000000..d40f3f2fd6
--- /dev/null
+++ b/pve-backup.c
-@@ -0,0 +1,955 @@
+@@ -0,0 +1,957 @@
+#include "proxmox-backup-client.h"
+#include "vma.h"
+
+
+/* PVE backup state and related function */
+
++/*
++ * Note: A resume from a qemu_coroutine_yield can happen in a different thread,
++ * so you may not use normal mutexes within coroutines:
++ *
++ * ---bad-example---
++ * qemu_rec_mutex_lock(lock)
++ * ...
++ * qemu_coroutine_yield() // wait for something
++ * // we are now inside a different thread
++ * qemu_rec_mutex_unlock(lock) // Crash - wrong thread!!
++ * ---end-bad-example--
++ *
++ * ==> Always use CoMutext inside coroutines.
++ * ==> Never acquire/release AioContext withing coroutines (because that use QemuRecMutex)
++ *
++ */
+
+static struct PVEBackupState {
+ struct {
-+ // Everithing accessed from qmp command, protected using rwlock
-+ CoRwlock rwlock;
++ // Everithing accessed from qmp_backup_query command is protected using lock
++ QemuMutex lock;
+ Error *error;
+ time_t start_time;
+ time_t end_time;
+ size_t total;
+ size_t transferred;
+ size_t zero_bytes;
-+ bool cancel;
+ } stat;
+ int64_t speed;
+ VmaWriter *vmaw;
+ ProxmoxBackupHandle *pbs;
+ GList *di_list;
-+ CoMutex backup_mutex;
++ QemuMutex backup_mutex;
++ CoMutex dump_callback_mutex;
+} backup_state;
+
+static void pvebackup_init(void)
+{
-+ qemu_co_rwlock_init(&backup_state.stat.rwlock);
-+ qemu_co_mutex_init(&backup_state.backup_mutex);
++ qemu_mutex_init(&backup_state.stat.lock);
++ qemu_mutex_init(&backup_state.backup_mutex);
++ qemu_co_mutex_init(&backup_state.dump_callback_mutex);
+}
+
+// initialize PVEBackupState at startup
+ BlockDriverState *target;
+} PVEBackupDevInfo;
+
-+static void pvebackup_co_run_next_job(void);
++static void pvebackup_run_next_job(void);
+
++static BlockJob *
++lookup_active_block_job(PVEBackupDevInfo *di)
++{
++ if (!di->completed && di->bs) {
++ for (BlockJob *job = block_job_next(NULL); job; job = block_job_next(job)) {
++ if (job->job.driver->job_type != JOB_TYPE_BACKUP) {
++ continue;
++ }
++
++ BackupBlockJob *bjob = container_of(job, BackupBlockJob, common);
++ if (bjob && bjob->source_bs == di->bs) {
++ return job;
++ }
++ }
++ }
++ return NULL;
++}
++
++static void pvebackup_propagate_error(Error *err)
++{
++ qemu_mutex_lock(&backup_state.stat.lock);
++ error_propagate(&backup_state.stat.error, err);
++ qemu_mutex_unlock(&backup_state.stat.lock);
++}
++
++static bool pvebackup_error_or_canceled(void)
++{
++ qemu_mutex_lock(&backup_state.stat.lock);
++ bool error_or_canceled = !!backup_state.stat.error;
++ qemu_mutex_unlock(&backup_state.stat.lock);
++
++ return error_or_canceled;
++}
++
++static void pvebackup_add_transfered_bytes(size_t transferred, size_t zero_bytes)
++{
++ qemu_mutex_lock(&backup_state.stat.lock);
++ backup_state.stat.zero_bytes += zero_bytes;
++ backup_state.stat.transferred += transferred;
++ qemu_mutex_unlock(&backup_state.stat.lock);
++}
++
++// This may get called from multiple coroutines in multiple io-threads
++// Note1: this may get called after job_cancel()
+static int coroutine_fn
-+pvebackup_co_dump_cb(
++pvebackup_co_dump_pbs_cb(
+ void *opaque,
+ uint64_t start,
+ uint64_t bytes,
+ const unsigned char *buf = pbuf;
+ PVEBackupDevInfo *di = opaque;
+
-+ qemu_co_rwlock_rdlock(&backup_state.stat.rwlock);
-+ bool cancel = backup_state.stat.cancel;
-+ qemu_co_rwlock_unlock(&backup_state.stat.rwlock);
++ assert(backup_state.pbs);
++
++ Error *local_err = NULL;
++ int pbs_res = -1;
++
++ qemu_co_mutex_lock(&backup_state.dump_callback_mutex);
+
-+ if (cancel) {
-+ return size; // return success
++ // avoid deadlock if job is cancelled
++ if (pvebackup_error_or_canceled()) {
++ qemu_co_mutex_unlock(&backup_state.dump_callback_mutex);
++ return -1;
+ }
+
-+ qemu_co_mutex_lock(&backup_state.backup_mutex);
++ pbs_res = proxmox_backup_co_write_data(backup_state.pbs, di->dev_id, buf, start, size, &local_err);
++ qemu_co_mutex_unlock(&backup_state.dump_callback_mutex);
++
++ if (pbs_res < 0) {
++ pvebackup_propagate_error(local_err);
++ return pbs_res;
++ } else {
++ pvebackup_add_transfered_bytes(size, !buf ? size : 0);
++ }
++
++ return size;
++}
++
++// This may get called from multiple coroutines in multiple io-threads
++static int coroutine_fn
++pvebackup_co_dump_vma_cb(
++ void *opaque,
++ uint64_t start,
++ uint64_t bytes,
++ const void *pbuf)
++{
++ assert(qemu_in_coroutine());
++
++ const uint64_t size = bytes;
++ const unsigned char *buf = pbuf;
++ PVEBackupDevInfo *di = opaque;
+
+ int ret = -1;
+
-+ if (backup_state.vmaw) {
-+ size_t zero_bytes = 0;
-+ uint64_t remaining = size;
-+
-+ uint64_t cluster_num = start / VMA_CLUSTER_SIZE;
-+ if ((cluster_num * VMA_CLUSTER_SIZE) != start) {
-+ qemu_co_rwlock_rdlock(&backup_state.stat.rwlock);
-+ if (!backup_state.stat.error) {
-+ qemu_co_rwlock_upgrade(&backup_state.stat.rwlock);
-+ error_setg(&backup_state.stat.error,
-+ "got unaligned write inside backup dump "
-+ "callback (sector %ld)", start);
-+ }
-+ qemu_co_rwlock_unlock(&backup_state.stat.rwlock);
-+ qemu_co_mutex_unlock(&backup_state.backup_mutex);
-+ return -1; // not aligned to cluster size
-+ }
++ assert(backup_state.vmaw);
+
-+ while (remaining > 0) {
-+ ret = vma_writer_write(backup_state.vmaw, di->dev_id, cluster_num,
-+ buf, &zero_bytes);
-+ ++cluster_num;
-+ if (buf) {
-+ buf += VMA_CLUSTER_SIZE;
-+ }
-+ if (ret < 0) {
-+ qemu_co_rwlock_rdlock(&backup_state.stat.rwlock);
-+ if (!backup_state.stat.error) {
-+ qemu_co_rwlock_upgrade(&backup_state.stat.rwlock);
-+ vma_writer_error_propagate(backup_state.vmaw, &backup_state.stat.error);
-+ }
-+ qemu_co_rwlock_unlock(&backup_state.stat.rwlock);
++ uint64_t remaining = size;
+
-+ qemu_co_mutex_unlock(&backup_state.backup_mutex);
-+ return ret;
-+ } else {
-+ qemu_co_rwlock_wrlock(&backup_state.stat.rwlock);
-+ backup_state.stat.zero_bytes += zero_bytes;
-+ if (remaining >= VMA_CLUSTER_SIZE) {
-+ backup_state.stat.transferred += VMA_CLUSTER_SIZE;
-+ remaining -= VMA_CLUSTER_SIZE;
-+ } else {
-+ backup_state.stat.transferred += remaining;
-+ remaining = 0;
-+ }
-+ qemu_co_rwlock_unlock(&backup_state.stat.rwlock);
-+ }
-+ }
-+ } else if (backup_state.pbs) {
++ uint64_t cluster_num = start / VMA_CLUSTER_SIZE;
++ if ((cluster_num * VMA_CLUSTER_SIZE) != start) {
+ Error *local_err = NULL;
-+ int pbs_res = -1;
++ error_setg(&local_err,
++ "got unaligned write inside backup dump "
++ "callback (sector %ld)", start);
++ pvebackup_propagate_error(local_err);
++ return -1; // not aligned to cluster size
++ }
+
-+ pbs_res = proxmox_backup_co_write_data(backup_state.pbs, di->dev_id, buf, start, size, &local_err);
++ while (remaining > 0) {
++ qemu_co_mutex_lock(&backup_state.dump_callback_mutex);
++ // avoid deadlock if job is cancelled
++ if (pvebackup_error_or_canceled()) {
++ qemu_co_mutex_unlock(&backup_state.dump_callback_mutex);
++ return -1;
++ }
+
-+ qemu_co_rwlock_wrlock(&backup_state.stat.rwlock);
++ size_t zero_bytes = 0;
++ ret = vma_writer_write(backup_state.vmaw, di->dev_id, cluster_num, buf, &zero_bytes);
++ qemu_co_mutex_unlock(&backup_state.dump_callback_mutex);
+
-+ if (pbs_res < 0) {
-+ error_propagate(&backup_state.stat.error, local_err);
-+ qemu_co_rwlock_unlock(&backup_state.stat.rwlock);
-+ qemu_co_mutex_unlock(&backup_state.backup_mutex);
-+ return pbs_res;
++ ++cluster_num;
++ if (buf) {
++ buf += VMA_CLUSTER_SIZE;
++ }
++ if (ret < 0) {
++ Error *local_err = NULL;
++ vma_writer_error_propagate(backup_state.vmaw, &local_err);
++ pvebackup_propagate_error(local_err);
++ return ret;
+ } else {
-+ if (!buf) {
-+ backup_state.stat.zero_bytes += size;
++ if (remaining >= VMA_CLUSTER_SIZE) {
++ assert(ret == VMA_CLUSTER_SIZE);
++ pvebackup_add_transfered_bytes(VMA_CLUSTER_SIZE, zero_bytes);
++ remaining -= VMA_CLUSTER_SIZE;
++ } else {
++ assert(ret == remaining);
++ pvebackup_add_transfered_bytes(remaining, zero_bytes);
++ remaining = 0;
+ }
-+ backup_state.stat.transferred += size;
-+ }
-+
-+ qemu_co_rwlock_unlock(&backup_state.stat.rwlock);
-+
-+ } else {
-+ qemu_co_rwlock_wrlock(&backup_state.stat.rwlock);
-+ if (!buf) {
-+ backup_state.stat.zero_bytes += size;
+ }
-+ backup_state.stat.transferred += size;
-+ qemu_co_rwlock_unlock(&backup_state.stat.rwlock);
+ }
+
-+ qemu_co_mutex_unlock(&backup_state.backup_mutex);
-+
+ return size;
+}
+
-+static void coroutine_fn pvebackup_co_cleanup(void)
++// assumes the caller holds backup_mutex
++static void coroutine_fn pvebackup_co_cleanup(void *unused)
+{
+ assert(qemu_in_coroutine());
+
-+ qemu_co_mutex_lock(&backup_state.backup_mutex);
-+
-+ qemu_co_rwlock_wrlock(&backup_state.stat.rwlock);
++ qemu_mutex_lock(&backup_state.stat.lock);
+ backup_state.stat.end_time = time(NULL);
-+ qemu_co_rwlock_unlock(&backup_state.stat.rwlock);
++ qemu_mutex_unlock(&backup_state.stat.lock);
+
+ if (backup_state.vmaw) {
+ Error *local_err = NULL;
+ vma_writer_close(backup_state.vmaw, &local_err);
+
+ if (local_err != NULL) {
-+ qemu_co_rwlock_wrlock(&backup_state.stat.rwlock);
-+ error_propagate(&backup_state.stat.error, local_err);
-+ qemu_co_rwlock_unlock(&backup_state.stat.rwlock);
-+ }
++ pvebackup_propagate_error(local_err);
++ }
+
+ backup_state.vmaw = NULL;
+ }
+
+ if (backup_state.pbs) {
-+ qemu_co_rwlock_rdlock(&backup_state.stat.rwlock);
-+ bool error_or_canceled = backup_state.stat.error || backup_state.stat.cancel;
-+ if (!error_or_canceled) {
++ if (!pvebackup_error_or_canceled()) {
+ Error *local_err = NULL;
+ proxmox_backup_co_finish(backup_state.pbs, &local_err);
+ if (local_err != NULL) {
-+ qemu_co_rwlock_upgrade(&backup_state.stat.rwlock);
-+ error_propagate(&backup_state.stat.error, local_err);
-+ }
++ pvebackup_propagate_error(local_err);
++ }
+ }
-+ qemu_co_rwlock_unlock(&backup_state.stat.rwlock);
+
+ proxmox_backup_disconnect(backup_state.pbs);
+ backup_state.pbs = NULL;
+
+ g_list_free(backup_state.di_list);
+ backup_state.di_list = NULL;
-+ qemu_co_mutex_unlock(&backup_state.backup_mutex);
+}
+
-+typedef struct PVEBackupCompeteCallbackData {
-+ PVEBackupDevInfo *di;
-+ int result;
-+} PVEBackupCompeteCallbackData;
-+
-+static void coroutine_fn pvebackup_co_complete_cb(void *opaque)
++// assumes the caller holds backup_mutex
++static void coroutine_fn pvebackup_complete_stream(void *opaque)
+{
-+ assert(qemu_in_coroutine());
-+
-+ PVEBackupCompeteCallbackData *cb_data = opaque;
-+
-+ qemu_co_mutex_lock(&backup_state.backup_mutex);
-+
-+ PVEBackupDevInfo *di = cb_data->di;
-+ int ret = cb_data->result;
-+
-+ di->completed = true;
-+
-+ qemu_co_rwlock_rdlock(&backup_state.stat.rwlock);
-+ bool error_or_canceled = backup_state.stat.error || backup_state.stat.cancel;
-+
-+ if (ret < 0 && !backup_state.stat.error) {
-+ qemu_co_rwlock_upgrade(&backup_state.stat.rwlock);
-+ error_setg(&backup_state.stat.error, "job failed with err %d - %s",
-+ ret, strerror(-ret));
-+ }
-+ qemu_co_rwlock_unlock(&backup_state.stat.rwlock);
-+
-+ di->bs = NULL;
++ PVEBackupDevInfo *di = opaque;
+
-+ if (di->target) {
-+ bdrv_unref(di->target);
-+ di->target = NULL;
-+ }
++ bool error_or_canceled = pvebackup_error_or_canceled();
+
+ if (backup_state.vmaw) {
+ vma_writer_close_stream(backup_state.vmaw, di->dev_id);
+ Error *local_err = NULL;
+ proxmox_backup_co_close_image(backup_state.pbs, di->dev_id, &local_err);
+ if (local_err != NULL) {
-+ qemu_co_rwlock_wrlock(&backup_state.stat.rwlock);
-+ error_propagate(&backup_state.stat.error, local_err);
-+ qemu_co_rwlock_unlock(&backup_state.stat.rwlock);
++ pvebackup_propagate_error(local_err);
+ }
+ }
++}
+
-+ // remove self from job queue
-+ backup_state.di_list = g_list_remove(backup_state.di_list, di);
-+ g_free(di);
++static void pvebackup_complete_cb(void *opaque, int ret)
++{
++ assert(!qemu_in_coroutine());
+
-+ int pending_jobs = g_list_length(backup_state.di_list);
++ PVEBackupDevInfo *di = opaque;
+
-+ qemu_co_mutex_unlock(&backup_state.backup_mutex);
++ qemu_mutex_lock(&backup_state.backup_mutex);
+
-+ if (pending_jobs > 0) {
-+ pvebackup_co_run_next_job();
-+ } else {
-+ pvebackup_co_cleanup();
++ di->completed = true;
++
++ if (ret < 0) {
++ Error *local_err = NULL;
++ error_setg(&local_err, "job failed with err %d - %s", ret, strerror(-ret));
++ pvebackup_propagate_error(local_err);
+ }
-+}
+
-+static void pvebackup_complete_cb(void *opaque, int ret)
-+{
-+ // This can be called from the main loop, or from a coroutine
-+ PVEBackupCompeteCallbackData cb_data = {
-+ .di = opaque,
-+ .result = ret,
-+ };
++ di->bs = NULL;
+
-+ if (qemu_in_coroutine()) {
-+ pvebackup_co_complete_cb(&cb_data);
-+ } else {
-+ block_on_coroutine_fn(pvebackup_co_complete_cb, &cb_data);
-+ }
-+}
++ assert(di->target == NULL);
+
-+static void coroutine_fn pvebackup_co_cancel(void *opaque)
-+{
-+ assert(qemu_in_coroutine());
++ block_on_coroutine_fn(pvebackup_complete_stream, di);
+
-+ qemu_co_rwlock_wrlock(&backup_state.stat.rwlock);
-+ backup_state.stat.cancel = true;
-+ qemu_co_rwlock_unlock(&backup_state.stat.rwlock);
++ // remove self from job queue
++ backup_state.di_list = g_list_remove(backup_state.di_list, di);
+
-+ qemu_co_mutex_lock(&backup_state.backup_mutex);
++ g_free(di);
+
-+ // Avoid race between block jobs and backup-cancel command:
-+ if (!(backup_state.vmaw || backup_state.pbs)) {
-+ qemu_co_mutex_unlock(&backup_state.backup_mutex);
-+ return;
-+ }
++ qemu_mutex_unlock(&backup_state.backup_mutex);
+
-+ qemu_co_rwlock_rdlock(&backup_state.stat.rwlock);
-+ if (!backup_state.stat.error) {
-+ qemu_co_rwlock_upgrade(&backup_state.stat.rwlock);
-+ error_setg(&backup_state.stat.error, "backup cancelled");
-+ }
-+ qemu_co_rwlock_unlock(&backup_state.stat.rwlock);
++ pvebackup_run_next_job();
++}
++
++static void pvebackup_cancel(void)
++{
++ assert(!qemu_in_coroutine());
++
++ Error *cancel_err = NULL;
++ error_setg(&cancel_err, "backup canceled");
++ pvebackup_propagate_error(cancel_err);
++
++ qemu_mutex_lock(&backup_state.backup_mutex);
+
+ if (backup_state.vmaw) {
+ /* make sure vma writer does not block anymore */
-+ vma_writer_set_error(backup_state.vmaw, "backup cancelled");
++ vma_writer_set_error(backup_state.vmaw, "backup canceled");
+ }
+
+ if (backup_state.pbs) {
-+ proxmox_backup_abort(backup_state.pbs, "backup cancelled");
++ proxmox_backup_abort(backup_state.pbs, "backup canceled");
+ }
+
-+ bool running_jobs = 0;
-+ GList *l = backup_state.di_list;
-+ while (l) {
-+ PVEBackupDevInfo *di = (PVEBackupDevInfo *)l->data;
-+ l = g_list_next(l);
-+ if (!di->completed && di->bs) {
-+ for (BlockJob *job = block_job_next(NULL); job; job = block_job_next(job)) {
-+ if (job->job.driver->job_type != JOB_TYPE_BACKUP) {
-+ continue;
-+ }
++ qemu_mutex_unlock(&backup_state.backup_mutex);
+
-+ BackupBlockJob *bjob = container_of(job, BackupBlockJob, common);
-+ if (bjob && bjob->source_bs == di->bs) {
-+ AioContext *aio_context = job->job.aio_context;
-+ aio_context_acquire(aio_context);
++ for(;;) {
+
-+ if (!di->completed) {
-+ running_jobs += 1;
-+ job_cancel(&job->job, false);
-+ }
-+ aio_context_release(aio_context);
-+ }
++ BlockJob *next_job = NULL;
++
++ qemu_mutex_lock(&backup_state.backup_mutex);
++
++ GList *l = backup_state.di_list;
++ while (l) {
++ PVEBackupDevInfo *di = (PVEBackupDevInfo *)l->data;
++ l = g_list_next(l);
++
++ BlockJob *job = lookup_active_block_job(di);
++ if (job != NULL) {
++ next_job = job;
++ break;
+ }
+ }
-+ }
+
-+ qemu_co_mutex_unlock(&backup_state.backup_mutex);
++ qemu_mutex_unlock(&backup_state.backup_mutex);
+
-+ if (running_jobs == 0) pvebackup_co_cleanup(); // else job will call completion handler
++ if (next_job) {
++ AioContext *aio_context = next_job->job.aio_context;
++ aio_context_acquire(aio_context);
++ job_cancel_sync(&next_job->job);
++ aio_context_release(aio_context);
++ } else {
++ break;
++ }
++ }
+}
+
+void qmp_backup_cancel(Error **errp)
+{
-+ block_on_coroutine_fn(pvebackup_co_cancel, NULL);
++ pvebackup_cancel();
+}
+
++// assumes the caller holds backup_mutex
+static int coroutine_fn pvebackup_co_add_config(
+ const char *file,
+ const char *name,
+
+bool job_should_pause(Job *job);
+
-+static void coroutine_fn pvebackup_co_run_next_job(void)
++static void pvebackup_run_next_job(void)
+{
-+ assert(qemu_in_coroutine());
++ assert(!qemu_in_coroutine());
+
-+ qemu_co_mutex_lock(&backup_state.backup_mutex);
++ qemu_mutex_lock(&backup_state.backup_mutex);
+
+ GList *l = backup_state.di_list;
+ while (l) {
+ PVEBackupDevInfo *di = (PVEBackupDevInfo *)l->data;
+ l = g_list_next(l);
-+ if (!di->completed && di->bs) {
-+ for (BlockJob *job = block_job_next(NULL); job; job = block_job_next(job)) {
-+ if (job->job.driver->job_type != JOB_TYPE_BACKUP) {
-+ continue;
-+ }
+
-+ BackupBlockJob *bjob = container_of(job, BackupBlockJob, common);
-+ if (bjob && bjob->source_bs == di->bs) {
-+ AioContext *aio_context = job->job.aio_context;
-+ qemu_co_mutex_unlock(&backup_state.backup_mutex);
-+ aio_context_acquire(aio_context);
-+
-+ if (job_should_pause(&job->job)) {
-+ qemu_co_rwlock_rdlock(&backup_state.stat.rwlock);
-+ bool error_or_canceled = backup_state.stat.error || backup_state.stat.cancel;
-+ qemu_co_rwlock_unlock(&backup_state.stat.rwlock);
-+
-+ if (error_or_canceled) {
-+ job_cancel(&job->job, false);
-+ } else {
-+ job_resume(&job->job);
-+ }
-+ }
-+ aio_context_release(aio_context);
-+ return;
++ BlockJob *job = lookup_active_block_job(di);
++
++ if (job) {
++ qemu_mutex_unlock(&backup_state.backup_mutex);
++
++ AioContext *aio_context = job->job.aio_context;
++ aio_context_acquire(aio_context);
++
++ if (job_should_pause(&job->job)) {
++ bool error_or_canceled = pvebackup_error_or_canceled();
++ if (error_or_canceled) {
++ job_cancel_sync(&job->job);
++ } else {
++ job_resume(&job->job);
+ }
+ }
++ aio_context_release(aio_context);
++ return;
+ }
+ }
-+ qemu_co_mutex_unlock(&backup_state.backup_mutex);
++
++ block_on_coroutine_fn(pvebackup_co_cleanup, NULL); // no more jobs, run cleanup
++
++ qemu_mutex_unlock(&backup_state.backup_mutex);
++}
++
++static bool create_backup_jobs(void) {
++
++ assert(!qemu_in_coroutine());
++
++ Error *local_err = NULL;
++
++ /* create and start all jobs (paused state) */
++ GList *l = backup_state.di_list;
++ while (l) {
++ PVEBackupDevInfo *di = (PVEBackupDevInfo *)l->data;
++ l = g_list_next(l);
++
++ assert(di->target != NULL);
++
++ AioContext *aio_context = bdrv_get_aio_context(di->bs);
++ aio_context_acquire(aio_context);
++
++ BlockJob *job = backup_job_create(
++ NULL, di->bs, di->target, backup_state.speed, MIRROR_SYNC_MODE_FULL, NULL,
++ BITMAP_SYNC_MODE_NEVER, false, NULL, BLOCKDEV_ON_ERROR_REPORT, BLOCKDEV_ON_ERROR_REPORT,
++ JOB_DEFAULT, pvebackup_complete_cb, di, 1, NULL, &local_err);
++
++ aio_context_release(aio_context);
++
++ if (!job || local_err != NULL) {
++ Error *create_job_err = NULL;
++ error_setg(&create_job_err, "backup_job_create failed: %s",
++ local_err ? error_get_pretty(local_err) : "null");
++
++ pvebackup_propagate_error(create_job_err);
++ break;
++ }
++ job_start(&job->job);
++
++ bdrv_unref(di->target);
++ di->target = NULL;
++ }
++
++ bool errors = pvebackup_error_or_canceled();
++
++ if (errors) {
++ l = backup_state.di_list;
++ while (l) {
++ PVEBackupDevInfo *di = (PVEBackupDevInfo *)l->data;
++ l = g_list_next(l);
++
++ if (di->target) {
++ bdrv_unref(di->target);
++ di->target = NULL;
++ }
++ }
++ }
++
++ return errors;
+}
+
+typedef struct QmpBackupTask {
+ UuidInfo *result;
+} QmpBackupTask;
+
-+static void coroutine_fn pvebackup_co_start(void *opaque)
++// assumes the caller holds backup_mutex
++static void coroutine_fn pvebackup_co_prepare(void *opaque)
+{
+ assert(qemu_in_coroutine());
+
+ GList *di_list = NULL;
+ GList *l;
+ UuidInfo *uuid_info;
-+ BlockJob *job;
+
+ const char *config_name = "qemu-server.conf";
+ const char *firewall_name = "qemu-server.fw";
+
-+ qemu_co_mutex_lock(&backup_state.backup_mutex);
-+
+ if (backup_state.di_list) {
-+ qemu_co_mutex_unlock(&backup_state.backup_mutex);
-+ error_set(task->errp, ERROR_CLASS_GENERIC_ERROR,
++ error_set(task->errp, ERROR_CLASS_GENERIC_ERROR,
+ "previous backup not finished");
+ return;
+ }
+ if (dev_id < 0)
+ goto err;
+
-+ if (!(di->target = bdrv_backup_dump_create(dump_cb_block_size, di->size, pvebackup_co_dump_cb, di, task->errp))) {
++ if (!(di->target = bdrv_backup_dump_create(dump_cb_block_size, di->size, pvebackup_co_dump_pbs_cb, di, task->errp))) {
+ goto err;
+ }
+
+ PVEBackupDevInfo *di = (PVEBackupDevInfo *)l->data;
+ l = g_list_next(l);
+
-+ if (!(di->target = bdrv_backup_dump_create(VMA_CLUSTER_SIZE, di->size, pvebackup_co_dump_cb, di, task->errp))) {
++ if (!(di->target = bdrv_backup_dump_create(VMA_CLUSTER_SIZE, di->size, pvebackup_co_dump_vma_cb, di, task->errp))) {
+ goto err;
+ }
+
+ }
+ /* initialize global backup_state now */
+
-+ qemu_co_rwlock_wrlock(&backup_state.stat.rwlock);
-+
-+ backup_state.stat.cancel = false;
++ qemu_mutex_lock(&backup_state.stat.lock);
+
+ if (backup_state.stat.error) {
+ error_free(backup_state.stat.error);
+ backup_state.stat.transferred = 0;
+ backup_state.stat.zero_bytes = 0;
+
-+ qemu_co_rwlock_unlock(&backup_state.stat.rwlock);
++ qemu_mutex_unlock(&backup_state.stat.lock);
+
+ backup_state.speed = (task->has_speed && task->speed > 0) ? task->speed : 0;
+
+
+ backup_state.di_list = di_list;
+
-+ /* start all jobs (paused state) */
-+ l = di_list;
-+ while (l) {
-+ PVEBackupDevInfo *di = (PVEBackupDevInfo *)l->data;
-+ l = g_list_next(l);
-+
-+ // make sure target runs in same aoi_context as source
-+ AioContext *aio_context = bdrv_get_aio_context(di->bs);
-+ aio_context_acquire(aio_context);
-+ GSList *ignore = NULL;
-+ bdrv_set_aio_context_ignore(di->target, aio_context, &ignore);
-+ g_slist_free(ignore);
-+ aio_context_release(aio_context);
-+
-+ job = backup_job_create(NULL, di->bs, di->target, backup_state.speed, MIRROR_SYNC_MODE_FULL, NULL,
-+ BITMAP_SYNC_MODE_NEVER, false, NULL, BLOCKDEV_ON_ERROR_REPORT, BLOCKDEV_ON_ERROR_REPORT,
-+ JOB_DEFAULT, pvebackup_complete_cb, di, 1, NULL, &local_err);
-+ if (!job || local_err != NULL) {
-+ qemu_co_rwlock_wrlock(&backup_state.stat.rwlock);
-+ error_setg(&backup_state.stat.error, "backup_job_create failed");
-+ qemu_co_rwlock_unlock(&backup_state.stat.rwlock);
-+ break;
-+ }
-+ job_start(&job->job);
-+ if (di->target) {
-+ bdrv_unref(di->target);
-+ di->target = NULL;
-+ }
-+ }
-+
-+ qemu_co_mutex_unlock(&backup_state.backup_mutex);
-+
-+ qemu_co_rwlock_rdlock(&backup_state.stat.rwlock);
-+ bool no_errors = !backup_state.stat.error;
-+ qemu_co_rwlock_unlock(&backup_state.stat.rwlock);
-+
-+ if (no_errors) {
-+ pvebackup_co_run_next_job(); // run one job
-+ } else {
-+ pvebackup_co_cancel(NULL);
-+ }
-+
+ uuid_info = g_malloc0(sizeof(*uuid_info));
+ uuid_info->UUID = uuid_str;
+
+ rmdir(backup_dir);
+ }
+
-+ qemu_co_mutex_unlock(&backup_state.backup_mutex);
-+
+ task->result = NULL;
+ return;
+}
+ .errp = errp,
+ };
+
-+ block_on_coroutine_fn(pvebackup_co_start, &task);
++ qemu_mutex_lock(&backup_state.backup_mutex);
+
-+ return task.result;
-+}
++ block_on_coroutine_fn(pvebackup_co_prepare, &task);
+
++ if (*errp == NULL) {
++ create_backup_jobs();
++ qemu_mutex_unlock(&backup_state.backup_mutex);
++ pvebackup_run_next_job();
++ } else {
++ qemu_mutex_unlock(&backup_state.backup_mutex);
++ }
+
-+typedef struct QmpQueryBackupTask {
-+ Error **errp;
-+ BackupStatus *result;
-+} QmpQueryBackupTask;
++ return task.result;
++}
+
-+static void coroutine_fn pvebackup_co_query(void *opaque)
++BackupStatus *qmp_query_backup(Error **errp)
+{
-+ assert(qemu_in_coroutine());
-+
-+ QmpQueryBackupTask *task = opaque;
-+
+ BackupStatus *info = g_malloc0(sizeof(*info));
+
-+ qemu_co_rwlock_rdlock(&backup_state.stat.rwlock);
++ qemu_mutex_lock(&backup_state.stat.lock);
+
+ if (!backup_state.stat.start_time) {
+ /* not started, return {} */
-+ task->result = info;
-+ qemu_co_rwlock_unlock(&backup_state.stat.rwlock);
-+ return;
++ qemu_mutex_unlock(&backup_state.stat.lock);
++ return info;
+ }
+
+ info->has_status = true;
+ info->has_transferred = true;
+ info->transferred = backup_state.stat.transferred;
+
-+ task->result = info;
-+
-+ qemu_co_rwlock_unlock(&backup_state.stat.rwlock);
-+}
-+
-+BackupStatus *qmp_query_backup(Error **errp)
-+{
-+ QmpQueryBackupTask task = {
-+ .errp = errp,
-+ .result = NULL,
-+ };
-+
-+ block_on_coroutine_fn(pvebackup_co_query, &task);
++ qemu_mutex_unlock(&backup_state.stat.lock);
+
-+ return task.result;
++ return info;
+}
diff --git a/qapi/block-core.json b/qapi/block-core.json
index 7957b9867d..be67dc3376 100644