]> git.proxmox.com Git - mirror_qemu.git/blobdiff - block/sheepdog.c
sheepdog: reorganize coroutine flow
[mirror_qemu.git] / block / sheepdog.c
index 66e1cb2b2dd140b944d512f0dcebe93c034113b1..e0985df5fe80a52a15af046e617ea23977acee04 100644 (file)
@@ -345,9 +345,6 @@ struct SheepdogAIOCB {
     enum AIOCBState aiocb_type;
 
     Coroutine *coroutine;
-    void (*aio_done_func)(SheepdogAIOCB *);
-
-    bool cancelable;
     int nr_pending;
 
     uint32_t min_affect_data_idx;
@@ -450,14 +447,13 @@ static const char * sd_strerror(int err)
  *
  * 1. In sd_co_rw_vector, we send the I/O requests to the server and
  *    link the requests to the inflight_list in the
- *    BDRVSheepdogState.  The function exits without waiting for
+ *    BDRVSheepdogState.  The function yields while waiting for
  *    receiving the response.
  *
  * 2. We receive the response in aio_read_response, the fd handler to
- *    the sheepdog connection.  If metadata update is needed, we send
- *    the write request to the vdi object in sd_write_done, the write
- *    completion function.  We switch back to sd_co_readv/writev after
- *    all the requests belonging to the AIOCB are finished.
+ *    the sheepdog connection.  We switch back to sd_co_readv/sd_writev
+ *    after all the requests belonging to the AIOCB are finished.  If
+ *    needed, sd_co_writev will send another requests for the vdi object.
  */
 
 static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
@@ -486,70 +482,14 @@ static inline void free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
 {
     SheepdogAIOCB *acb = aio_req->aiocb;
 
-    acb->cancelable = false;
     QLIST_REMOVE(aio_req, aio_siblings);
     g_free(aio_req);
 
     acb->nr_pending--;
 }
 
-static void coroutine_fn sd_finish_aiocb(SheepdogAIOCB *acb)
-{
-    qemu_coroutine_enter(acb->coroutine);
-    qemu_aio_unref(acb);
-}
-
-/*
- * Check whether the specified acb can be canceled
- *
- * We can cancel aio when any request belonging to the acb is:
- *  - Not processed by the sheepdog server.
- *  - Not linked to the inflight queue.
- */
-static bool sd_acb_cancelable(const SheepdogAIOCB *acb)
-{
-    BDRVSheepdogState *s = acb->common.bs->opaque;
-    AIOReq *aioreq;
-
-    if (!acb->cancelable) {
-        return false;
-    }
-
-    QLIST_FOREACH(aioreq, &s->inflight_aio_head, aio_siblings) {
-        if (aioreq->aiocb == acb) {
-            return false;
-        }
-    }
-
-    return true;
-}
-
-static void sd_aio_cancel(BlockAIOCB *blockacb)
-{
-    SheepdogAIOCB *acb = (SheepdogAIOCB *)blockacb;
-    BDRVSheepdogState *s = acb->common.bs->opaque;
-    AIOReq *aioreq, *next;
-
-    if (sd_acb_cancelable(acb)) {
-        /* Remove outstanding requests from failed queue.  */
-        QLIST_FOREACH_SAFE(aioreq, &s->failed_aio_head, aio_siblings,
-                           next) {
-            if (aioreq->aiocb == acb) {
-                free_aio_req(s, aioreq);
-            }
-        }
-
-        assert(acb->nr_pending == 0);
-        if (acb->common.cb) {
-            acb->common.cb(acb->common.opaque, -ECANCELED);
-        }
-        sd_finish_aiocb(acb);
-    }
-}
-
 static const AIOCBInfo sd_aiocb_info = {
     .aiocb_size     = sizeof(SheepdogAIOCB),
-    .cancel_async   = sd_aio_cancel,
 };
 
 static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
@@ -568,8 +508,6 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
     acb->sector_num = sector_num;
     acb->nb_sectors = nb_sectors;
 
-    acb->aio_done_func = NULL;
-    acb->cancelable = true;
     acb->coroutine = qemu_coroutine_self();
     acb->ret = 0;
     acb->nr_pending = 0;
@@ -641,6 +579,7 @@ static void restart_co_req(void *opaque)
 
 typedef struct SheepdogReqCo {
     int sockfd;
+    BlockDriverState *bs;
     AioContext *aio_context;
     SheepdogReq *hdr;
     void *data;
@@ -663,7 +602,7 @@ static coroutine_fn void do_co_req(void *opaque)
 
     co = qemu_coroutine_self();
     aio_set_fd_handler(srco->aio_context, sockfd, false,
-                       NULL, restart_co_req, co);
+                       NULL, restart_co_req, NULL, co);
 
     ret = send_co_req(sockfd, hdr, data, wlen);
     if (ret < 0) {
@@ -671,7 +610,7 @@ static coroutine_fn void do_co_req(void *opaque)
     }
 
     aio_set_fd_handler(srco->aio_context, sockfd, false,
-                       restart_co_req, NULL, co);
+                       restart_co_req, NULL, NULL, co);
 
     ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
     if (ret != sizeof(*hdr)) {
@@ -697,10 +636,13 @@ out:
     /* there is at most one request for this sockfd, so it is safe to
      * set each handler to NULL. */
     aio_set_fd_handler(srco->aio_context, sockfd, false,
-                       NULL, NULL, NULL);
+                       NULL, NULL, NULL, NULL);
 
     srco->ret = ret;
     srco->finished = true;
+    if (srco->bs) {
+        bdrv_wakeup(srco->bs);
+    }
 }
 
 /*
@@ -708,13 +650,14 @@ out:
  *
  * Return 0 on success, -errno in case of error.
  */
-static int do_req(int sockfd, AioContext *aio_context, SheepdogReq *hdr,
+static int do_req(int sockfd, BlockDriverState *bs, SheepdogReq *hdr,
                   void *data, unsigned int *wlen, unsigned int *rlen)
 {
     Coroutine *co;
     SheepdogReqCo srco = {
         .sockfd = sockfd,
-        .aio_context = aio_context,
+        .aio_context = bs ? bdrv_get_aio_context(bs) : qemu_get_aio_context(),
+        .bs = bs,
         .hdr = hdr,
         .data = data,
         .wlen = wlen,
@@ -727,9 +670,14 @@ static int do_req(int sockfd, AioContext *aio_context, SheepdogReq *hdr,
         do_co_req(&srco);
     } else {
         co = qemu_coroutine_create(do_co_req, &srco);
-        qemu_coroutine_enter(co);
-        while (!srco.finished) {
-            aio_poll(aio_context, true);
+        if (bs) {
+            qemu_coroutine_enter(co);
+            BDRV_POLL_WHILE(bs, !srco.finished);
+        } else {
+            qemu_coroutine_enter(co);
+            while (!srco.finished) {
+                aio_poll(qemu_get_aio_context(), true);
+            }
         }
     }
 
@@ -750,7 +698,7 @@ static coroutine_fn void reconnect_to_sdog(void *opaque)
     AIOReq *aio_req, *next;
 
     aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
-                       NULL, NULL);
+                       NULL, NULL, NULL);
     close(s->fd);
     s->fd = -1;
 
@@ -830,9 +778,6 @@ static void coroutine_fn aio_read_response(void *opaque)
 
     switch (acb->aiocb_type) {
     case AIOCB_WRITE_UDATA:
-        /* this coroutine context is no longer suitable for co_recv
-         * because we may send data to update vdi objects */
-        s->co_recv = NULL;
         if (!is_data_obj(aio_req->oid)) {
             break;
         }
@@ -880,6 +825,11 @@ static void coroutine_fn aio_read_response(void *opaque)
         }
     }
 
+    /* No more data for this aio_req (reload_inode below uses its own file
+     * descriptor handler which doesn't use co_recv).
+    */
+    s->co_recv = NULL;
+
     switch (rsp.result) {
     case SD_RES_SUCCESS:
         break;
@@ -897,7 +847,7 @@ static void coroutine_fn aio_read_response(void *opaque)
             aio_req->oid = vid_to_vdi_oid(s->inode.vdi_id);
         }
         resend_aioreq(s, aio_req);
-        goto out;
+        return;
     default:
         acb->ret = -EIO;
         error_report("%s", sd_strerror(rsp.result));
@@ -910,13 +860,12 @@ static void coroutine_fn aio_read_response(void *opaque)
          * We've finished all requests which belong to the AIOCB, so
          * we can switch back to sd_co_readv/writev now.
          */
-        acb->aio_done_func(acb);
+        qemu_coroutine_enter(acb->coroutine);
     }
-out:
-    s->co_recv = NULL;
+
     return;
+
 err:
-    s->co_recv = NULL;
     reconnect_to_sdog(opaque);
 }
 
@@ -954,7 +903,7 @@ static int get_sheep_fd(BDRVSheepdogState *s, Error **errp)
     }
 
     aio_set_fd_handler(s->aio_context, fd, false,
-                       co_read_response, NULL, s);
+                       co_read_response, NULL, NULL, s);
     return fd;
 }
 
@@ -1049,7 +998,7 @@ static int parse_vdiname(BDRVSheepdogState *s, const char *filename,
     const char *host_spec, *vdi_spec;
     int nr_sep, ret;
 
-    strstart(filename, "sheepdog:", (const char **)&filename);
+    strstart(filename, "sheepdog:", &filename);
     p = q = g_strdup(filename);
 
     /* count the number of separators */
@@ -1125,7 +1074,7 @@ static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
     hdr.snapid = snapid;
     hdr.flags = SD_FLAG_CMD_WRITE;
 
-    ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
+    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
     if (ret) {
         error_setg_errno(errp, -ret, "cannot get vdi info");
         goto out;
@@ -1216,7 +1165,7 @@ static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
     qemu_co_mutex_lock(&s->lock);
     s->co_send = qemu_coroutine_self();
     aio_set_fd_handler(s->aio_context, s->fd, false,
-                       co_read_response, co_write_request, s);
+                       co_read_response, co_write_request, NULL, s);
     socket_set_cork(s->fd, 1);
 
     /* send a header */
@@ -1235,12 +1184,12 @@ static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
 out:
     socket_set_cork(s->fd, 0);
     aio_set_fd_handler(s->aio_context, s->fd, false,
-                       co_read_response, NULL, s);
+                       co_read_response, NULL, NULL, s);
     s->co_send = NULL;
     qemu_co_mutex_unlock(&s->lock);
 }
 
-static int read_write_object(int fd, AioContext *aio_context, char *buf,
+static int read_write_object(int fd, BlockDriverState *bs, char *buf,
                              uint64_t oid, uint8_t copies,
                              unsigned int datalen, uint64_t offset,
                              bool write, bool create, uint32_t cache_flags)
@@ -1274,7 +1223,7 @@ static int read_write_object(int fd, AioContext *aio_context, char *buf,
     hdr.offset = offset;
     hdr.copies = copies;
 
-    ret = do_req(fd, aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
+    ret = do_req(fd, bs, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
     if (ret) {
         error_report("failed to send a request to the sheep");
         return ret;
@@ -1289,22 +1238,22 @@ static int read_write_object(int fd, AioContext *aio_context, char *buf,
     }
 }
 
-static int read_object(int fd, AioContext *aio_context, char *buf,
+static int read_object(int fd, BlockDriverState *bs, char *buf,
                        uint64_t oid, uint8_t copies,
                        unsigned int datalen, uint64_t offset,
                        uint32_t cache_flags)
 {
-    return read_write_object(fd, aio_context, buf, oid, copies,
+    return read_write_object(fd, bs, buf, oid, copies,
                              datalen, offset, false,
                              false, cache_flags);
 }
 
-static int write_object(int fd, AioContext *aio_context, char *buf,
+static int write_object(int fd, BlockDriverState *bs, char *buf,
                         uint64_t oid, uint8_t copies,
                         unsigned int datalen, uint64_t offset, bool create,
                         uint32_t cache_flags)
 {
-    return read_write_object(fd, aio_context, buf, oid, copies,
+    return read_write_object(fd, bs, buf, oid, copies,
                              datalen, offset, true,
                              create, cache_flags);
 }
@@ -1331,7 +1280,7 @@ static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag)
         goto out;
     }
 
-    ret = read_object(fd, s->aio_context, (char *)inode, vid_to_vdi_oid(vid),
+    ret = read_object(fd, s->bs, (char *)inode, vid_to_vdi_oid(vid),
                       s->inode.nr_copies, SD_INODE_HEADER_SIZE, 0,
                       s->cache_flags);
     if (ret < 0) {
@@ -1386,7 +1335,7 @@ static void sd_detach_aio_context(BlockDriverState *bs)
     BDRVSheepdogState *s = bs->opaque;
 
     aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
-                       NULL, NULL);
+                       NULL, NULL, NULL);
 }
 
 static void sd_attach_aio_context(BlockDriverState *bs,
@@ -1396,7 +1345,7 @@ static void sd_attach_aio_context(BlockDriverState *bs,
 
     s->aio_context = new_context;
     aio_set_fd_handler(new_context, s->fd, false,
-                       co_read_response, NULL, s);
+                       co_read_response, NULL, NULL, s);
 }
 
 /* TODO Convert to fine grained options */
@@ -1489,7 +1438,7 @@ static int sd_open(BlockDriverState *bs, QDict *options, int flags,
     }
 
     buf = g_malloc(SD_INODE_SIZE);
-    ret = read_object(fd, s->aio_context, buf, vid_to_vdi_oid(vid),
+    ret = read_object(fd, s->bs, buf, vid_to_vdi_oid(vid),
                       0, SD_INODE_SIZE, 0, s->cache_flags);
 
     closesocket(fd);
@@ -1510,7 +1459,7 @@ static int sd_open(BlockDriverState *bs, QDict *options, int flags,
     return 0;
 out:
     aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
-                       false, NULL, NULL, NULL);
+                       false, NULL, NULL, NULL, NULL);
     if (s->fd >= 0) {
         closesocket(s->fd);
     }
@@ -1549,7 +1498,7 @@ static void sd_reopen_commit(BDRVReopenState *state)
 
     if (s->fd) {
         aio_set_fd_handler(s->aio_context, s->fd, false,
-                           NULL, NULL, NULL);
+                           NULL, NULL, NULL, NULL);
         closesocket(s->fd);
     }
 
@@ -1573,7 +1522,7 @@ static void sd_reopen_abort(BDRVReopenState *state)
 
     if (re_s->fd) {
         aio_set_fd_handler(s->aio_context, re_s->fd, false,
-                           NULL, NULL, NULL);
+                           NULL, NULL, NULL, NULL);
         closesocket(re_s->fd);
     }
 
@@ -1618,7 +1567,7 @@ static int do_sd_create(BDRVSheepdogState *s, uint32_t *vdi_id, int snapshot,
     hdr.copies = s->inode.nr_copies;
     hdr.block_size_shift = s->inode.block_size_shift;
 
-    ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
+    ret = do_req(fd, NULL, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
 
     closesocket(fd);
 
@@ -1886,7 +1835,7 @@ static int sd_create(const char *filename, QemuOpts *opts,
         hdr.opcode = SD_OP_GET_CLUSTER_DEFAULT;
         hdr.proto_ver = SD_PROTO_VER;
 
-        ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
+        ret = do_req(fd, NULL, (SheepdogReq *)&hdr,
                      NULL, &wlen, &rlen);
         closesocket(fd);
         if (ret) {
@@ -1951,7 +1900,7 @@ static void sd_close(BlockDriverState *bs)
     hdr.data_length = wlen;
     hdr.flags = SD_FLAG_CMD_WRITE;
 
-    ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
+    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
                  s->name, &wlen, &rlen);
 
     closesocket(fd);
@@ -1962,7 +1911,7 @@ static void sd_close(BlockDriverState *bs)
     }
 
     aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
-                       false, NULL, NULL, NULL);
+                       false, NULL, NULL, NULL, NULL);
     closesocket(s->fd);
     g_free(s->host_spec);
 }
@@ -2000,7 +1949,7 @@ static int sd_truncate(BlockDriverState *bs, int64_t offset)
     /* we don't need to update entire object */
     datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
     s->inode.vdi_size = offset;
-    ret = write_object(fd, s->aio_context, (char *)&s->inode,
+    ret = write_object(fd, s->bs, (char *)&s->inode,
                        vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
                        datalen, 0, false, s->cache_flags);
     close(fd);
@@ -2015,7 +1964,6 @@ static int sd_truncate(BlockDriverState *bs, int64_t offset)
 /*
  * This function is called after writing data objects.  If we need to
  * update metadata, this sends a write request to the vdi object.
- * Otherwise, this switches back to sd_co_readv/writev.
  */
 static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
 {
@@ -2028,6 +1976,7 @@ static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
     mx = acb->max_dirty_data_idx;
     if (mn <= mx) {
         /* we need to update the vdi object. */
+        ++acb->nr_pending;
         offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
             mn * sizeof(s->inode.data_vdi_id[0]);
         data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
@@ -2041,13 +1990,10 @@ static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
                                 data_len, offset, 0, false, 0, offset);
         QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
         add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
-
-        acb->aio_done_func = sd_finish_aiocb;
-        acb->aiocb_type = AIOCB_WRITE_UDATA;
-        return;
+        if (--acb->nr_pending) {
+            qemu_coroutine_yield();
+        }
     }
-
-    sd_finish_aiocb(acb);
 }
 
 /* Delete current working VDI on the snapshot chain */
@@ -2070,7 +2016,7 @@ static bool sd_delete(BDRVSheepdogState *s)
         return false;
     }
 
-    ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
+    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
                  s->name, &wlen, &rlen);
     closesocket(fd);
     if (ret) {
@@ -2126,7 +2072,7 @@ static int sd_create_branch(BDRVSheepdogState *s)
         goto out;
     }
 
-    ret = read_object(fd, s->aio_context, buf, vid_to_vdi_oid(vid),
+    ret = read_object(fd, s->bs, buf, vid_to_vdi_oid(vid),
                       s->inode.nr_copies, SD_INODE_SIZE, 0, s->cache_flags);
 
     closesocket(fd);
@@ -2159,7 +2105,7 @@ out:
  * Returns 1 when we need to wait a response, 0 when there is no sent
  * request and -errno in error cases.
  */
-static int coroutine_fn sd_co_rw_vector(void *p)
+static void coroutine_fn sd_co_rw_vector(void *p)
 {
     SheepdogAIOCB *acb = p;
     int ret = 0;
@@ -2180,7 +2126,7 @@ static int coroutine_fn sd_co_rw_vector(void *p)
         ret = sd_create_branch(s);
         if (ret) {
             acb->ret = -EIO;
-            goto out;
+            return;
         }
     }
 
@@ -2254,11 +2200,9 @@ static int coroutine_fn sd_co_rw_vector(void *p)
         idx++;
         done += len;
     }
-out:
-    if (!--acb->nr_pending) {
-        return acb->ret;
+    if (--acb->nr_pending) {
+        qemu_coroutine_yield();
     }
-    return 1;
 }
 
 static bool check_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *aiocb)
@@ -2291,7 +2235,6 @@ static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
     }
 
     acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
-    acb->aio_done_func = sd_write_done;
     acb->aiocb_type = AIOCB_WRITE_UDATA;
 
 retry:
@@ -2300,20 +2243,14 @@ retry:
         goto retry;
     }
 
-    ret = sd_co_rw_vector(acb);
-    if (ret <= 0) {
-        QLIST_REMOVE(acb, aiocb_siblings);
-        qemu_co_queue_restart_all(&s->overlapping_queue);
-        qemu_aio_unref(acb);
-        return ret;
-    }
-
-    qemu_coroutine_yield();
+    sd_co_rw_vector(acb);
+    sd_write_done(acb);
 
     QLIST_REMOVE(acb, aiocb_siblings);
     qemu_co_queue_restart_all(&s->overlapping_queue);
-
-    return acb->ret;
+    ret = acb->ret;
+    qemu_aio_unref(acb);
+    return ret;
 }
 
 static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
@@ -2325,7 +2262,6 @@ static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
 
     acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
     acb->aiocb_type = AIOCB_READ_UDATA;
-    acb->aio_done_func = sd_finish_aiocb;
 
 retry:
     if (check_overlapping_aiocb(s, acb)) {
@@ -2333,25 +2269,20 @@ retry:
         goto retry;
     }
 
-    ret = sd_co_rw_vector(acb);
-    if (ret <= 0) {
-        QLIST_REMOVE(acb, aiocb_siblings);
-        qemu_co_queue_restart_all(&s->overlapping_queue);
-        qemu_aio_unref(acb);
-        return ret;
-    }
-
-    qemu_coroutine_yield();
+    sd_co_rw_vector(acb);
 
     QLIST_REMOVE(acb, aiocb_siblings);
     qemu_co_queue_restart_all(&s->overlapping_queue);
-    return acb->ret;
+    ret = acb->ret;
+    qemu_aio_unref(acb);
+    return ret;
 }
 
 static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
 {
     BDRVSheepdogState *s = bs->opaque;
     SheepdogAIOCB *acb;
+    int ret;
     AIOReq *aio_req;
 
     if (s->cache_flags != SD_FLAG_CMD_CACHE) {
@@ -2360,15 +2291,19 @@ static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
 
     acb = sd_aio_setup(bs, NULL, 0, 0);
     acb->aiocb_type = AIOCB_FLUSH_CACHE;
-    acb->aio_done_func = sd_finish_aiocb;
 
+    acb->nr_pending++;
     aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
                             0, 0, 0, false, 0, 0);
     QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
     add_aio_request(s, aio_req, NULL, 0, acb->aiocb_type);
 
-    qemu_coroutine_yield();
-    return acb->ret;
+    if (--acb->nr_pending) {
+        qemu_coroutine_yield();
+    }
+    ret = acb->ret;
+    qemu_aio_unref(acb);
+    return ret;
 }
 
 static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
@@ -2411,7 +2346,7 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
         goto cleanup;
     }
 
-    ret = write_object(fd, s->aio_context, (char *)&s->inode,
+    ret = write_object(fd, s->bs, (char *)&s->inode,
                        vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
                        datalen, 0, false, s->cache_flags);
     if (ret < 0) {
@@ -2426,7 +2361,7 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
         goto cleanup;
     }
 
-    ret = read_object(fd, s->aio_context, (char *)inode,
+    ret = read_object(fd, s->bs, (char *)inode,
                       vid_to_vdi_oid(new_vid), s->inode.nr_copies, datalen, 0,
                       s->cache_flags);
 
@@ -2528,7 +2463,7 @@ static bool remove_objects(BDRVSheepdogState *s)
             i++;
         }
 
-        ret = write_object(fd, s->aio_context,
+        ret = write_object(fd, s->bs,
                            (char *)&inode->data_vdi_id[start_idx],
                            vid_to_vdi_oid(s->inode.vdi_id), inode->nr_copies,
                            (i - start_idx) * sizeof(uint32_t),
@@ -2600,7 +2535,7 @@ static int sd_snapshot_delete(BlockDriverState *bs,
         return -1;
     }
 
-    ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
+    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
                  buf, &wlen, &rlen);
     closesocket(fd);
     if (ret) {
@@ -2652,8 +2587,7 @@ static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
     req.opcode = SD_OP_READ_VDIS;
     req.data_length = max;
 
-    ret = do_req(fd, s->aio_context, (SheepdogReq *)&req,
-                 vdi_inuse, &wlen, &rlen);
+    ret = do_req(fd, s->bs, &req, vdi_inuse, &wlen, &rlen);
 
     closesocket(fd);
     if (ret) {
@@ -2679,7 +2613,7 @@ static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
         }
 
         /* we don't need to read entire object */
-        ret = read_object(fd, s->aio_context, (char *)&inode,
+        ret = read_object(fd, s->bs, (char *)&inode,
                           vid_to_vdi_oid(vid),
                           0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0,
                           s->cache_flags);
@@ -2745,11 +2679,11 @@ static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
 
         create = (offset == 0);
         if (load) {
-            ret = read_object(fd, s->aio_context, (char *)data, vmstate_oid,
+            ret = read_object(fd, s->bs, (char *)data, vmstate_oid,
                               s->inode.nr_copies, data_len, offset,
                               s->cache_flags);
         } else {
-            ret = write_object(fd, s->aio_context, (char *)data, vmstate_oid,
+            ret = write_object(fd, s->bs, (char *)data, vmstate_oid,
                                s->inode.nr_copies, data_len, offset, create,
                                s->cache_flags);
         }
@@ -2820,12 +2754,12 @@ static coroutine_fn int sd_co_pdiscard(BlockDriverState *bs, int64_t offset,
     iov.iov_len = sizeof(zero);
     discard_iov.iov = &iov;
     discard_iov.niov = 1;
-    assert((offset & (BDRV_SECTOR_SIZE - 1)) == 0);
-    assert((count & (BDRV_SECTOR_SIZE - 1)) == 0);
+    if (!QEMU_IS_ALIGNED(offset | count, BDRV_SECTOR_SIZE)) {
+        return -ENOTSUP;
+    }
     acb = sd_aio_setup(bs, &discard_iov, offset >> BDRV_SECTOR_BITS,
                        count >> BDRV_SECTOR_BITS);
     acb->aiocb_type = AIOCB_DISCARD_OBJ;
-    acb->aio_done_func = sd_finish_aiocb;
 
 retry:
     if (check_overlapping_aiocb(s, acb)) {
@@ -2833,20 +2767,13 @@ retry:
         goto retry;
     }
 
-    ret = sd_co_rw_vector(acb);
-    if (ret <= 0) {
-        QLIST_REMOVE(acb, aiocb_siblings);
-        qemu_co_queue_restart_all(&s->overlapping_queue);
-        qemu_aio_unref(acb);
-        return ret;
-    }
-
-    qemu_coroutine_yield();
+    sd_co_rw_vector(acb);
 
     QLIST_REMOVE(acb, aiocb_siblings);
     qemu_co_queue_restart_all(&s->overlapping_queue);
-
-    return acb->ret;
+    ret = acb->ret;
+    qemu_aio_unref(acb);
+    return ret;
 }
 
 static coroutine_fn int64_t