]> git.proxmox.com Git - qemu.git/blobdiff - block/sheepdog.c
Merge remote-tracking branch 'mjt/mjt-iov2' into staging
[qemu.git] / block / sheepdog.c
index 57b6e1aad7d77a1ad0d97c4eb28c2630d8ed3dee..809df39d9e7dd22010c82e71d3ba9c68ae616ceb 100644 (file)
@@ -7,6 +7,9 @@
  *
  * You should have received a copy of the GNU General Public License
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * Contributions after 2012-01-13 are licensed under the terms of the
+ * GNU GPL, version 2 or (at your option) any later version.
  */
 
 #include "qemu-common.h"
 #define SD_OP_RELEASE_VDI    0x13
 #define SD_OP_GET_VDI_INFO   0x14
 #define SD_OP_READ_VDIS      0x15
+#define SD_OP_FLUSH_VDI      0x16
 
 #define SD_FLAG_CMD_WRITE    0x01
 #define SD_FLAG_CMD_COW      0x02
+#define SD_FLAG_CMD_CACHE    0x04
 
 #define SD_RES_SUCCESS       0x00 /* Success */
 #define SD_RES_UNKNOWN       0x01 /* Unknown error */
@@ -66,7 +71,7 @@
  * 20 - 31 (12 bits): reserved data object space
  * 32 - 55 (24 bits): vdi object space
  * 56 - 59 ( 4 bits): reserved vdi object space
- * 60 - 63 ( 4 bits): object type indentifier space
+ * 60 - 63 ( 4 bits): object type identifier space
  */
 
 #define VDI_SPACE_SHIFT   32
@@ -254,8 +259,7 @@ typedef struct AIOReq {
     uint8_t flags;
     uint32_t id;
 
-    QLIST_ENTRY(AIOReq) outstanding_aio_siblings;
-    QLIST_ENTRY(AIOReq) aioreq_siblings;
+    QLIST_ENTRY(AIOReq) aio_siblings;
 } AIOReq;
 
 enum AIOCBState {
@@ -274,12 +278,11 @@ struct SheepdogAIOCB {
     int ret;
     enum AIOCBState aiocb_type;
 
-    QEMUBH *bh;
+    Coroutine *coroutine;
     void (*aio_done_func)(SheepdogAIOCB *);
 
     int canceled;
-
-    QLIST_HEAD(aioreq_head, AIOReq) aioreq_head;
+    int nr_pending;
 };
 
 typedef struct BDRVSheepdogState {
@@ -290,13 +293,20 @@ typedef struct BDRVSheepdogState {
 
     char name[SD_MAX_VDI_LEN];
     int is_snapshot;
+    uint8_t cache_enabled;
 
     char *addr;
     char *port;
     int fd;
+    int flush_fd;
+
+    CoMutex lock;
+    Coroutine *co_send;
+    Coroutine *co_recv;
 
     uint32_t aioreq_seq_num;
-    QLIST_HEAD(outstanding_aio_head, AIOReq) outstanding_aio_head;
+    QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
+    QLIST_HEAD(pending_aio_head, AIOReq) pending_aio_head;
 } BDRVSheepdogState;
 
 static const char * sd_strerror(int err)
@@ -346,19 +356,16 @@ static const char * sd_strerror(int err)
 /*
  * Sheepdog I/O handling:
  *
- * 1. In the sd_aio_readv/writev, read/write requests are added to the
- *    QEMU Bottom Halves.
+ * 1. In sd_co_rw_vector, we send the I/O requests to the server and
+ *    link the requests to the inflight_list in the
+ *    BDRVSheepdogState.  The function exits without waiting for
+ *    receiving the response.
  *
- * 2. In sd_readv_writev_bh_cb, the callbacks of BHs, we send the I/O
- *    requests to the server and link the requests to the
- *    outstanding_list in the BDRVSheepdogState.  we exits the
- *    function without waiting for receiving the response.
- *
- * 3. We receive the response in aio_read_response, the fd handler to
+ * 2. We receive the response in aio_read_response, the fd handler to
  *    the sheepdog connection.  If metadata update is needed, we send
  *    the write request to the vdi object in sd_write_done, the write
- *    completion function.  The AIOCB callback is not called until all
- *    the requests belonging to the AIOCB are finished.
+ *    completion function.  We switch back to sd_co_readv/writev after
+ *    all the requests belonging to the AIOCB are finished.
  */
 
 static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
@@ -378,27 +385,24 @@ static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
     aio_req->flags = flags;
     aio_req->id = s->aioreq_seq_num++;
 
-    QLIST_INSERT_HEAD(&s->outstanding_aio_head, aio_req,
-                      outstanding_aio_siblings);
-    QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings);
-
+    acb->nr_pending++;
     return aio_req;
 }
 
-static inline int free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
+static inline void free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
 {
     SheepdogAIOCB *acb = aio_req->aiocb;
-    QLIST_REMOVE(aio_req, outstanding_aio_siblings);
-    QLIST_REMOVE(aio_req, aioreq_siblings);
+
+    QLIST_REMOVE(aio_req, aio_siblings);
     g_free(aio_req);
 
-    return !QLIST_EMPTY(&acb->aioreq_head);
+    acb->nr_pending--;
 }
 
-static void sd_finish_aiocb(SheepdogAIOCB *acb)
+static void coroutine_fn sd_finish_aiocb(SheepdogAIOCB *acb)
 {
     if (!acb->canceled) {
-        acb->common.cb(acb->common.opaque, acb->ret);
+        qemu_coroutine_enter(acb->coroutine, NULL);
     }
     qemu_aio_release(acb);
 }
@@ -411,7 +415,8 @@ static void sd_aio_cancel(BlockDriverAIOCB *blockacb)
      * Sheepdog cannot cancel the requests which are already sent to
      * the servers, so we just complete the request with -EIO here.
      */
-    acb->common.cb(acb->common.opaque, -EIO);
+    acb->ret = -EIO;
+    qemu_coroutine_enter(acb->coroutine, NULL);
     acb->canceled = 1;
 }
 
@@ -435,147 +440,12 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
 
     acb->aio_done_func = NULL;
     acb->canceled = 0;
-    acb->bh = NULL;
+    acb->coroutine = qemu_coroutine_self();
     acb->ret = 0;
-    QLIST_INIT(&acb->aioreq_head);
+    acb->nr_pending = 0;
     return acb;
 }
 
-static int sd_schedule_bh(QEMUBHFunc *cb, SheepdogAIOCB *acb)
-{
-    if (acb->bh) {
-        error_report("bug: %d %d", acb->aiocb_type, acb->aiocb_type);
-        return -EIO;
-    }
-
-    acb->bh = qemu_bh_new(cb, acb);
-    qemu_bh_schedule(acb->bh);
-    return 0;
-}
-
-#ifdef _WIN32
-
-struct msghdr {
-    struct iovec *msg_iov;
-    size_t        msg_iovlen;
-};
-
-static ssize_t sendmsg(int s, const struct msghdr *msg, int flags)
-{
-    size_t size = 0;
-    char *buf, *p;
-    int i, ret;
-
-    /* count the msg size */
-    for (i = 0; i < msg->msg_iovlen; i++) {
-        size += msg->msg_iov[i].iov_len;
-    }
-    buf = g_malloc(size);
-
-    p = buf;
-    for (i = 0; i < msg->msg_iovlen; i++) {
-        memcpy(p, msg->msg_iov[i].iov_base, msg->msg_iov[i].iov_len);
-        p += msg->msg_iov[i].iov_len;
-    }
-
-    ret = send(s, buf, size, flags);
-
-    g_free(buf);
-    return ret;
-}
-
-static ssize_t recvmsg(int s, struct msghdr *msg, int flags)
-{
-    size_t size = 0;
-    char *buf, *p;
-    int i, ret;
-
-    /* count the msg size */
-    for (i = 0; i < msg->msg_iovlen; i++) {
-        size += msg->msg_iov[i].iov_len;
-    }
-    buf = g_malloc(size);
-
-    ret = qemu_recv(s, buf, size, flags);
-    if (ret < 0) {
-        goto out;
-    }
-
-    p = buf;
-    for (i = 0; i < msg->msg_iovlen; i++) {
-        memcpy(msg->msg_iov[i].iov_base, p, msg->msg_iov[i].iov_len);
-        p += msg->msg_iov[i].iov_len;
-    }
-out:
-    g_free(buf);
-    return ret;
-}
-
-#endif
-
-/*
- * Send/recv data with iovec buffers
- *
- * This function send/recv data from/to the iovec buffer directly.
- * The first `offset' bytes in the iovec buffer are skipped and next
- * `len' bytes are used.
- *
- * For example,
- *
- *   do_send_recv(sockfd, iov, len, offset, 1);
- *
- * is equals to
- *
- *   char *buf = malloc(size);
- *   iov_to_buf(iov, iovcnt, buf, offset, size);
- *   send(sockfd, buf, size, 0);
- *   free(buf);
- */
-static int do_send_recv(int sockfd, struct iovec *iov, int len, int offset,
-                        int write)
-{
-    struct msghdr msg;
-    int ret, diff;
-
-    memset(&msg, 0, sizeof(msg));
-    msg.msg_iov = iov;
-    msg.msg_iovlen = 1;
-
-    len += offset;
-
-    while (iov->iov_len < len) {
-        len -= iov->iov_len;
-
-        iov++;
-        msg.msg_iovlen++;
-    }
-
-    diff = iov->iov_len - len;
-    iov->iov_len -= diff;
-
-    while (msg.msg_iov->iov_len <= offset) {
-        offset -= msg.msg_iov->iov_len;
-
-        msg.msg_iov++;
-        msg.msg_iovlen--;
-    }
-
-    msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base + offset;
-    msg.msg_iov->iov_len -= offset;
-
-    if (write) {
-        ret = sendmsg(sockfd, &msg, 0);
-    } else {
-        ret = recvmsg(sockfd, &msg, 0);
-    }
-
-    msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base - offset;
-    msg.msg_iov->iov_len += offset;
-
-    iov->iov_len += diff;
-    return ret;
-}
-
 static int connect_to_sdog(const char *addr, const char *port)
 {
     char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
@@ -594,7 +464,7 @@ static int connect_to_sdog(const char *addr, const char *port)
     if (ret) {
         error_report("unable to get address info %s, %s",
                      addr, strerror(errno));
-        return -1;
+        return -errno;
     }
 
     for (res = res0; res; res = res->ai_next) {
@@ -621,104 +491,123 @@ static int connect_to_sdog(const char *addr, const char *port)
         dprintf("connected to %s:%s\n", addr, port);
         goto success;
     }
-    fd = -1;
+    fd = -errno;
     error_report("failed connect to %s:%s", addr, port);
 success:
     freeaddrinfo(res0);
     return fd;
 }
 
-static int do_readv_writev(int sockfd, struct iovec *iov, int len,
-                           int iov_offset, int write)
+static int send_req(int sockfd, SheepdogReq *hdr, void *data,
+                    unsigned int *wlen)
 {
     int ret;
-again:
-    ret = do_send_recv(sockfd, iov, len, iov_offset, write);
-    if (ret < 0) {
-        if (errno == EINTR || errno == EAGAIN) {
-            goto again;
-        }
-        error_report("failed to recv a rsp, %s", strerror(errno));
-        return 1;
-    }
 
-    iov_offset += ret;
-    len -= ret;
-    if (len) {
-        goto again;
+    ret = qemu_send_full(sockfd, hdr, sizeof(*hdr), 0);
+    if (ret < sizeof(*hdr)) {
+        error_report("failed to send a req, %s", strerror(errno));
+        return -errno;
     }
 
-    return 0;
-}
-
-static int do_readv(int sockfd, struct iovec *iov, int len, int iov_offset)
-{
-    return do_readv_writev(sockfd, iov, len, iov_offset, 0);
-}
+    ret = qemu_send_full(sockfd, data, *wlen, 0);
+    if (ret < *wlen) {
+        error_report("failed to send a req, %s", strerror(errno));
+        ret = -errno;
+    }
 
-static int do_writev(int sockfd, struct iovec *iov, int len, int iov_offset)
-{
-    return do_readv_writev(sockfd, iov, len, iov_offset, 1);
+    return ret;
 }
 
-static int do_read_write(int sockfd, void *buf, int len, int write)
+static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
+                                    unsigned int *wlen)
 {
-    struct iovec iov;
+    int ret;
 
-    iov.iov_base = buf;
-    iov.iov_len = len;
+    ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
+    if (ret < sizeof(*hdr)) {
+        error_report("failed to send a req, %s", strerror(errno));
+        return ret;
+    }
 
-    return do_readv_writev(sockfd, &iov, len, 0, write);
-}
+    ret = qemu_co_send(sockfd, data, *wlen);
+    if (ret < *wlen) {
+        error_report("failed to send a req, %s", strerror(errno));
+    }
 
-static int do_read(int sockfd, void *buf, int len)
-{
-    return do_read_write(sockfd, buf, len, 0);
+    return ret;
 }
 
-static int do_write(int sockfd, void *buf, int len)
-{
-    return do_read_write(sockfd, buf, len, 1);
-}
+static coroutine_fn int do_co_req(int sockfd, SheepdogReq *hdr, void *data,
+                                  unsigned int *wlen, unsigned int *rlen);
 
-static int send_req(int sockfd, SheepdogReq *hdr, void *data,
-                    unsigned int *wlen)
+static int do_req(int sockfd, SheepdogReq *hdr, void *data,
+                  unsigned int *wlen, unsigned int *rlen)
 {
     int ret;
-    struct iovec iov[2];
 
-    iov[0].iov_base = hdr;
-    iov[0].iov_len = sizeof(*hdr);
+    if (qemu_in_coroutine()) {
+        return do_co_req(sockfd, hdr, data, wlen, rlen);
+    }
 
-    if (*wlen) {
-        iov[1].iov_base = data;
-        iov[1].iov_len = *wlen;
+    socket_set_block(sockfd);
+    ret = send_req(sockfd, hdr, data, wlen);
+    if (ret < 0) {
+        goto out;
     }
 
-    ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen, 0);
-    if (ret) {
-        error_report("failed to send a req, %s", strerror(errno));
-        ret = -1;
+    ret = qemu_recv_full(sockfd, hdr, sizeof(*hdr), 0);
+    if (ret < sizeof(*hdr)) {
+        error_report("failed to get a rsp, %s", strerror(errno));
+        ret = -errno;
+        goto out;
     }
 
+    if (*rlen > hdr->data_length) {
+        *rlen = hdr->data_length;
+    }
+
+    if (*rlen) {
+        ret = qemu_recv_full(sockfd, data, *rlen, 0);
+        if (ret < *rlen) {
+            error_report("failed to get the data, %s", strerror(errno));
+            ret = -errno;
+            goto out;
+        }
+    }
+    ret = 0;
+out:
+    socket_set_nonblock(sockfd);
     return ret;
 }
 
-static int do_req(int sockfd, SheepdogReq *hdr, void *data,
-                  unsigned int *wlen, unsigned int *rlen)
+static void restart_co_req(void *opaque)
+{
+    Coroutine *co = opaque;
+
+    qemu_coroutine_enter(co, NULL);
+}
+
+static coroutine_fn int do_co_req(int sockfd, SheepdogReq *hdr, void *data,
+                                  unsigned int *wlen, unsigned int *rlen)
 {
     int ret;
+    Coroutine *co;
 
-    ret = send_req(sockfd, hdr, data, wlen);
-    if (ret) {
-        ret = -1;
+    co = qemu_coroutine_self();
+    qemu_aio_set_fd_handler(sockfd, NULL, restart_co_req, NULL, co);
+
+    socket_set_block(sockfd);
+    ret = send_co_req(sockfd, hdr, data, wlen);
+    if (ret < 0) {
         goto out;
     }
 
-    ret = do_read(sockfd, hdr, sizeof(*hdr));
-    if (ret) {
+    qemu_aio_set_fd_handler(sockfd, restart_co_req, NULL, NULL, co);
+
+    ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
+    if (ret < sizeof(*hdr)) {
         error_report("failed to get a rsp, %s", strerror(errno));
-        ret = -1;
+        ret = -errno;
         goto out;
     }
 
@@ -727,48 +616,59 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data,
     }
 
     if (*rlen) {
-        ret = do_read(sockfd, data, *rlen);
-        if (ret) {
+        ret = qemu_co_recv(sockfd, data, *rlen);
+        if (ret < *rlen) {
             error_report("failed to get the data, %s", strerror(errno));
-            ret = -1;
+            ret = -errno;
             goto out;
         }
     }
     ret = 0;
 out:
+    qemu_aio_set_fd_handler(sockfd, NULL, NULL, NULL, NULL);
+    socket_set_nonblock(sockfd);
     return ret;
 }
 
-static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
+static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
                            struct iovec *iov, int niov, int create,
                            enum AIOCBState aiocb_type);
 
+
+static AIOReq *find_pending_req(BDRVSheepdogState *s, uint64_t oid)
+{
+    AIOReq *aio_req;
+
+    QLIST_FOREACH(aio_req, &s->pending_aio_head, aio_siblings) {
+        if (aio_req->oid == oid) {
+            return aio_req;
+        }
+    }
+
+    return NULL;
+}
+
 /*
  * This function searchs pending requests to the object `oid', and
  * sends them.
  */
-static void send_pending_req(BDRVSheepdogState *s, uint64_t oid, uint32_t id)
+static void coroutine_fn send_pending_req(BDRVSheepdogState *s, uint64_t oid)
 {
-    AIOReq *aio_req, *next;
+    AIOReq *aio_req;
     SheepdogAIOCB *acb;
     int ret;
 
-    QLIST_FOREACH_SAFE(aio_req, &s->outstanding_aio_head,
-                       outstanding_aio_siblings, next) {
-        if (id == aio_req->id) {
-            continue;
-        }
-        if (aio_req->oid != oid) {
-            continue;
-        }
-
+    while ((aio_req = find_pending_req(s, oid)) != NULL) {
         acb = aio_req->aiocb;
+        /* move aio_req from pending list to inflight one */
+        QLIST_REMOVE(aio_req, aio_siblings);
+        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
         ret = add_aio_request(s, aio_req, acb->qiov->iov,
                               acb->qiov->niov, 0, acb->aiocb_type);
         if (ret < 0) {
             error_report("add_aio_request is failed");
             free_aio_req(s, aio_req);
-            if (QLIST_EMPTY(&acb->aioreq_head)) {
+            if (!acb->nr_pending) {
                 sd_finish_aiocb(acb);
             }
         }
@@ -781,7 +681,7 @@ static void send_pending_req(BDRVSheepdogState *s, uint64_t oid, uint32_t id)
  * This function is registered as a fd handler, and called from the
  * main loop when s->fd is ready for reading responses.
  */
-static void aio_read_response(void *opaque)
+static void coroutine_fn aio_read_response(void *opaque)
 {
     SheepdogObjRsp rsp;
     BDRVSheepdogState *s = opaque;
@@ -789,35 +689,37 @@ static void aio_read_response(void *opaque)
     int ret;
     AIOReq *aio_req = NULL;
     SheepdogAIOCB *acb;
-    int rest;
     unsigned long idx;
 
-    if (QLIST_EMPTY(&s->outstanding_aio_head)) {
-        return;
+    if (QLIST_EMPTY(&s->inflight_aio_head)) {
+        goto out;
     }
 
     /* read a header */
-    ret = do_read(fd, &rsp, sizeof(rsp));
-    if (ret) {
+    ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
+    if (ret < 0) {
         error_report("failed to get the header, %s", strerror(errno));
-        return;
+        goto out;
     }
 
-    /* find the right aio_req from the outstanding_aio list */
-    QLIST_FOREACH(aio_req, &s->outstanding_aio_head, outstanding_aio_siblings) {
+    /* find the right aio_req from the inflight aio list */
+    QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings) {
         if (aio_req->id == rsp.id) {
             break;
         }
     }
     if (!aio_req) {
         error_report("cannot find aio_req %x", rsp.id);
-        return;
+        goto out;
     }
 
     acb = aio_req->aiocb;
 
     switch (acb->aiocb_type) {
     case AIOCB_WRITE_UDATA:
+        /* this coroutine context is no longer suitable for co_recv
+         * because we may send data to update vdi objects */
+        s->co_recv = NULL;
         if (!is_data_obj(aio_req->oid)) {
             break;
         }
@@ -839,15 +741,15 @@ static void aio_read_response(void *opaque)
              * create requests are not allowed, so we search the
              * pending requests here.
              */
-            send_pending_req(s, vid_to_data_oid(s->inode.vdi_id, idx), rsp.id);
+            send_pending_req(s, vid_to_data_oid(s->inode.vdi_id, idx));
         }
         break;
     case AIOCB_READ_UDATA:
-        ret = do_readv(fd, acb->qiov->iov, rsp.data_length,
-                       aio_req->iov_offset);
-        if (ret) {
+        ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov,
+                            aio_req->iov_offset, rsp.data_length);
+        if (ret < 0) {
             error_report("failed to get the data, %s", strerror(errno));
-            return;
+            goto out;
         }
         break;
     }
@@ -857,38 +759,43 @@ static void aio_read_response(void *opaque)
         error_report("%s", sd_strerror(rsp.result));
     }
 
-    rest = free_aio_req(s, aio_req);
-    if (!rest) {
+    free_aio_req(s, aio_req);
+    if (!acb->nr_pending) {
         /*
          * We've finished all requests which belong to the AIOCB, so
-         * we can call the callback now.
+         * we can switch back to sd_co_readv/writev now.
          */
         acb->aio_done_func(acb);
     }
+out:
+    s->co_recv = NULL;
 }
 
-static int aio_flush_request(void *opaque)
+static void co_read_response(void *opaque)
 {
     BDRVSheepdogState *s = opaque;
 
-    return !QLIST_EMPTY(&s->outstanding_aio_head);
-}
+    if (!s->co_recv) {
+        s->co_recv = qemu_coroutine_create(aio_read_response);
+    }
 
-#if !defined(SOL_TCP) || !defined(TCP_CORK)
+    qemu_coroutine_enter(s->co_recv, opaque);
+}
 
-static int set_cork(int fd, int v)
+static void co_write_request(void *opaque)
 {
-    return 0;
-}
+    BDRVSheepdogState *s = opaque;
 
-#else
+    qemu_coroutine_enter(s->co_send, NULL);
+}
 
-static int set_cork(int fd, int v)
+static int aio_flush_request(void *opaque)
 {
-    return setsockopt(fd, SOL_TCP, TCP_CORK, &v, sizeof(v));
-}
+    BDRVSheepdogState *s = opaque;
 
-#endif
+    return !QLIST_EMPTY(&s->inflight_aio_head) ||
+        !QLIST_EMPTY(&s->pending_aio_head);
+}
 
 static int set_nodelay(int fd)
 {
@@ -912,7 +819,7 @@ static int get_sheep_fd(BDRVSheepdogState *s)
     fd = connect_to_sdog(s->addr, s->port);
     if (fd < 0) {
         error_report("%s", strerror(errno));
-        return -1;
+        return fd;
     }
 
     socket_set_nonblock(fd);
@@ -921,11 +828,10 @@ static int get_sheep_fd(BDRVSheepdogState *s)
     if (ret) {
         error_report("%s", strerror(errno));
         closesocket(fd);
-        return -1;
+        return -errno;
     }
 
-    qemu_aio_set_fd_handler(fd, aio_read_response, NULL, aio_flush_request,
-                            NULL, s);
+    qemu_aio_set_fd_handler(fd, co_read_response, NULL, aio_flush_request, s);
     return fd;
 }
 
@@ -1009,7 +915,7 @@ static int find_vdi_name(BDRVSheepdogState *s, char *filename, uint32_t snapid,
 
     fd = connect_to_sdog(s->addr, s->port);
     if (fd < 0) {
-        return -1;
+        return fd;
     }
 
     memset(buf, 0, sizeof(buf));
@@ -1030,14 +936,17 @@ static int find_vdi_name(BDRVSheepdogState *s, char *filename, uint32_t snapid,
 
     ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
     if (ret) {
-        ret = -1;
         goto out;
     }
 
     if (rsp->result != SD_RES_SUCCESS) {
         error_report("cannot get vdi info, %s, %s %d %s",
                      sd_strerror(rsp->result), filename, snapid, tag);
-        ret = -1;
+        if (rsp->result == SD_RES_NO_VDI) {
+            ret = -ENOENT;
+        } else {
+            ret = -EIO;
+        }
         goto out;
     }
     *vid = rsp->vdi_id;
@@ -1048,7 +957,7 @@ out:
     return ret;
 }
 
-static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
+static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
                            struct iovec *iov, int niov, int create,
                            enum AIOCBState aiocb_type)
 {
@@ -1082,6 +991,10 @@ static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
         hdr.flags = SD_FLAG_CMD_WRITE | flags;
     }
 
+    if (s->cache_enabled) {
+        hdr.flags |= SD_FLAG_CMD_CACHE;
+    }
+
     hdr.oid = oid;
     hdr.cow_oid = old_oid;
     hdr.copies = s->inode.nr_copies;
@@ -1091,31 +1004,40 @@ static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
 
     hdr.id = aio_req->id;
 
-    set_cork(s->fd, 1);
+    qemu_co_mutex_lock(&s->lock);
+    s->co_send = qemu_coroutine_self();
+    qemu_aio_set_fd_handler(s->fd, co_read_response, co_write_request,
+                            aio_flush_request, s);
+    socket_set_cork(s->fd, 1);
 
     /* send a header */
-    ret = do_write(s->fd, &hdr, sizeof(hdr));
-    if (ret) {
+    ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
+    if (ret < 0) {
+        qemu_co_mutex_unlock(&s->lock);
         error_report("failed to send a req, %s", strerror(errno));
-        return -EIO;
+        return -errno;
     }
 
     if (wlen) {
-        ret = do_writev(s->fd, iov, wlen, aio_req->iov_offset);
-        if (ret) {
+        ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
+        if (ret < 0) {
+            qemu_co_mutex_unlock(&s->lock);
             error_report("failed to send a data, %s", strerror(errno));
-            return -EIO;
+            return -errno;
         }
     }
 
-    set_cork(s->fd, 0);
+    socket_set_cork(s->fd, 0);
+    qemu_aio_set_fd_handler(s->fd, co_read_response, NULL,
+                            aio_flush_request, s);
+    qemu_co_mutex_unlock(&s->lock);
 
     return 0;
 }
 
 static int read_write_object(int fd, char *buf, uint64_t oid, int copies,
                              unsigned int datalen, uint64_t offset,
-                             int write, int create)
+                             int write, int create, uint8_t cache)
 {
     SheepdogObjReq hdr;
     SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
@@ -1138,6 +1060,11 @@ static int read_write_object(int fd, char *buf, uint64_t oid, int copies,
         rlen = datalen;
         hdr.opcode = SD_OP_READ_OBJ;
     }
+
+    if (cache) {
+        hdr.flags |= SD_FLAG_CMD_CACHE;
+    }
+
     hdr.oid = oid;
     hdr.data_length = datalen;
     hdr.offset = offset;
@@ -1146,7 +1073,7 @@ static int read_write_object(int fd, char *buf, uint64_t oid, int copies,
     ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
     if (ret) {
         error_report("failed to send a request to the sheep");
-        return -1;
+        return ret;
     }
 
     switch (rsp->result) {
@@ -1154,20 +1081,23 @@ static int read_write_object(int fd, char *buf, uint64_t oid, int copies,
         return 0;
     default:
         error_report("%s", sd_strerror(rsp->result));
-        return -1;
+        return -EIO;
     }
 }
 
 static int read_object(int fd, char *buf, uint64_t oid, int copies,
-                       unsigned int datalen, uint64_t offset)
+                       unsigned int datalen, uint64_t offset, uint8_t cache)
 {
-    return read_write_object(fd, buf, oid, copies, datalen, offset, 0, 0);
+    return read_write_object(fd, buf, oid, copies, datalen, offset, 0, 0,
+                             cache);
 }
 
 static int write_object(int fd, char *buf, uint64_t oid, int copies,
-                        unsigned int datalen, uint64_t offset, int create)
+                        unsigned int datalen, uint64_t offset, int create,
+                        uint8_t cache)
 {
-    return read_write_object(fd, buf, oid, copies, datalen, offset, 1, create);
+    return read_write_object(fd, buf, oid, copies, datalen, offset, 1, create,
+                             cache);
 }
 
 static int sd_open(BlockDriverState *bs, const char *filename, int flags)
@@ -1181,16 +1111,19 @@ static int sd_open(BlockDriverState *bs, const char *filename, int flags)
 
     strstart(filename, "sheepdog:", (const char **)&filename);
 
-    QLIST_INIT(&s->outstanding_aio_head);
+    QLIST_INIT(&s->inflight_aio_head);
+    QLIST_INIT(&s->pending_aio_head);
     s->fd = -1;
 
     memset(vdi, 0, sizeof(vdi));
     memset(tag, 0, sizeof(tag));
     if (parse_vdiname(s, filename, vdi, &snapid, tag) < 0) {
+        ret = -EINVAL;
         goto out;
     }
     s->fd = get_sheep_fd(s);
     if (s->fd < 0) {
+        ret = s->fd;
         goto out;
     }
 
@@ -1199,7 +1132,17 @@ static int sd_open(BlockDriverState *bs, const char *filename, int flags)
         goto out;
     }
 
-    if (snapid) {
+    if (flags & BDRV_O_CACHE_WB) {
+        s->cache_enabled = 1;
+        s->flush_fd = connect_to_sdog(s->addr, s->port);
+        if (s->flush_fd < 0) {
+            error_report("failed to connect");
+            ret = s->flush_fd;
+            goto out;
+        }
+    }
+
+    if (snapid || tag[0] != '\0') {
         dprintf("%" PRIx32 " snapshot inode was open.\n", vid);
         s->is_snapshot = 1;
     }
@@ -1207,11 +1150,13 @@ static int sd_open(BlockDriverState *bs, const char *filename, int flags)
     fd = connect_to_sdog(s->addr, s->port);
     if (fd < 0) {
         error_report("failed to connect");
+        ret = fd;
         goto out;
     }
 
     buf = g_malloc(SD_INODE_SIZE);
-    ret = read_object(fd, buf, vid_to_vdi_oid(vid), 0, SD_INODE_SIZE, 0);
+    ret = read_object(fd, buf, vid_to_vdi_oid(vid), 0, SD_INODE_SIZE, 0,
+                      s->cache_enabled);
 
     closesocket(fd);
 
@@ -1225,15 +1170,16 @@ static int sd_open(BlockDriverState *bs, const char *filename, int flags)
 
     bs->total_sectors = s->inode.vdi_size / SECTOR_SIZE;
     strncpy(s->name, vdi, sizeof(s->name));
+    qemu_co_mutex_init(&s->lock);
     g_free(buf);
     return 0;
 out:
-    qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL, NULL, NULL);
+    qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL, NULL);
     if (s->fd >= 0) {
         closesocket(s->fd);
     }
     g_free(buf);
-    return -1;
+    return ret;
 }
 
 static int do_sd_create(char *filename, int64_t vdi_size,
@@ -1248,7 +1194,7 @@ static int do_sd_create(char *filename, int64_t vdi_size,
 
     fd = connect_to_sdog(addr, port);
     if (fd < 0) {
-        return -EIO;
+        return fd;
     }
 
     memset(buf, 0, sizeof(buf));
@@ -1271,7 +1217,7 @@ static int do_sd_create(char *filename, int64_t vdi_size,
     closesocket(fd);
 
     if (ret) {
-        return -EIO;
+        return ret;
     }
 
     if (rsp->result != SD_RES_SUCCESS) {
@@ -1331,24 +1277,26 @@ out:
 
 static int sd_create(const char *filename, QEMUOptionParameter *options)
 {
-    int ret;
+    int ret = 0;
     uint32_t vid = 0, base_vid = 0;
     int64_t vdi_size = 0;
     char *backing_file = NULL;
-    BDRVSheepdogState s;
+    BDRVSheepdogState *s;
     char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
     uint32_t snapid;
     int prealloc = 0;
     const char *vdiname;
 
+    s = g_malloc0(sizeof(BDRVSheepdogState));
+
     strstart(filename, "sheepdog:", &vdiname);
 
-    memset(&s, 0, sizeof(s));
     memset(vdi, 0, sizeof(vdi));
     memset(tag, 0, sizeof(tag));
-    if (parse_vdiname(&s, vdiname, vdi, &snapid, tag) < 0) {
+    if (parse_vdiname(s, vdiname, vdi, &snapid, tag) < 0) {
         error_report("invalid filename");
-        return -EINVAL;
+        ret = -EINVAL;
+        goto out;
     }
 
     while (options && options->name) {
@@ -1364,7 +1312,8 @@ static int sd_create(const char *filename, QEMUOptionParameter *options)
             } else {
                 error_report("Invalid preallocation mode: '%s'",
                              options->value.s);
-                return -EINVAL;
+                ret = -EINVAL;
+                goto out;
             }
         }
         options++;
@@ -1372,7 +1321,8 @@ static int sd_create(const char *filename, QEMUOptionParameter *options)
 
     if (vdi_size > SD_MAX_VDI_SIZE) {
         error_report("too big image size");
-        return -EINVAL;
+        ret = -EINVAL;
+        goto out;
     }
 
     if (backing_file) {
@@ -1384,31 +1334,37 @@ static int sd_create(const char *filename, QEMUOptionParameter *options)
         drv = bdrv_find_protocol(backing_file);
         if (!drv || strcmp(drv->protocol_name, "sheepdog") != 0) {
             error_report("backing_file must be a sheepdog image");
-            return -EINVAL;
+            ret = -EINVAL;
+            goto out;
         }
 
         ret = bdrv_file_open(&bs, backing_file, 0);
-        if (ret < 0)
-            return -EIO;
+        if (ret < 0) {
+            goto out;
+        }
 
         s = bs->opaque;
 
         if (!is_snapshot(&s->inode)) {
             error_report("cannot clone from a non snapshot vdi");
             bdrv_delete(bs);
-            return -EINVAL;
+            ret = -EINVAL;
+            goto out;
         }
 
         base_vid = s->inode.vdi_id;
         bdrv_delete(bs);
     }
 
-    ret = do_sd_create(vdi, vdi_size, base_vid, &vid, 0, s.addr, s.port);
+    ret = do_sd_create(vdi, vdi_size, base_vid, &vid, 0, s->addr, s->port);
     if (!prealloc || ret) {
-        return ret;
+        goto out;
     }
 
-    return sd_prealloc(filename);
+    ret = sd_prealloc(filename);
+out:
+    g_free(s);
+    return ret;
 }
 
 static void sd_close(BlockDriverState *bs)
@@ -1442,8 +1398,11 @@ static void sd_close(BlockDriverState *bs)
         error_report("%s, %s", sd_strerror(rsp->result), s->name);
     }
 
-    qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL, NULL, NULL);
+    qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL, NULL);
     closesocket(s->fd);
+    if (s->cache_enabled) {
+        closesocket(s->flush_fd);
+    }
     g_free(s->addr);
 }
 
@@ -1470,30 +1429,29 @@ static int sd_truncate(BlockDriverState *bs, int64_t offset)
 
     fd = connect_to_sdog(s->addr, s->port);
     if (fd < 0) {
-        return -EIO;
+        return fd;
     }
 
     /* we don't need to update entire object */
     datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
     s->inode.vdi_size = offset;
     ret = write_object(fd, (char *)&s->inode, vid_to_vdi_oid(s->inode.vdi_id),
-                       s->inode.nr_copies, datalen, 0, 0);
+                       s->inode.nr_copies, datalen, 0, 0, s->cache_enabled);
     close(fd);
 
     if (ret < 0) {
         error_report("failed to update an inode.");
-        return -EIO;
     }
 
-    return 0;
+    return ret;
 }
 
 /*
  * This function is called after writing data objects.  If we need to
  * update metadata, this sends a write request to the vdi object.
- * Otherwise, this calls the AIOCB callback.
+ * Otherwise, this switches back to sd_co_readv/writev.
  */
-static void sd_write_done(SheepdogAIOCB *acb)
+static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
 {
     int ret;
     BDRVSheepdogState *s = acb->common.bs->opaque;
@@ -1516,6 +1474,7 @@ static void sd_write_done(SheepdogAIOCB *acb)
         iov.iov_len = sizeof(s->inode);
         aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
                                 data_len, offset, 0, 0, offset);
+        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
         ret = add_aio_request(s, aio_req, &iov, 1, 0, AIOCB_WRITE_UDATA);
         if (ret) {
             free_aio_req(s, aio_req);
@@ -1555,11 +1514,12 @@ static int sd_create_branch(BDRVSheepdogState *s)
     fd = connect_to_sdog(s->addr, s->port);
     if (fd < 0) {
         error_report("failed to connect");
+        ret = fd;
         goto out;
     }
 
     ret = read_object(fd, buf, vid_to_vdi_oid(vid), s->inode.nr_copies,
-                      SD_INODE_SIZE, 0);
+                      SD_INODE_SIZE, 0, s->cache_enabled);
 
     closesocket(fd);
 
@@ -1583,12 +1543,15 @@ out:
  * Send I/O requests to the server.
  *
  * This function sends requests to the server, links the requests to
- * the outstanding_list in BDRVSheepdogState, and exits without
+ * the inflight_list in BDRVSheepdogState, and exits without
  * waiting the response.  The responses are received in the
  * `aio_read_response' function which is called from the main loop as
  * a fd handler.
+ *
+ * Returns 1 when we need to wait a response, 0 when there is no sent
+ * request and -errno in error cases.
  */
-static void sd_readv_writev_bh_cb(void *p)
+static int coroutine_fn sd_co_rw_vector(void *p)
 {
     SheepdogAIOCB *acb = p;
     int ret = 0;
@@ -1600,9 +1563,6 @@ static void sd_readv_writev_bh_cb(void *p)
     SheepdogInode *inode = &s->inode;
     AIOReq *aio_req;
 
-    qemu_bh_delete(acb->bh);
-    acb->bh = NULL;
-
     if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
         /*
          * In the case we open the snapshot VDI, Sheepdog creates the
@@ -1615,6 +1575,12 @@ static void sd_readv_writev_bh_cb(void *p)
         }
     }
 
+    /*
+     * Make sure we don't free the aiocb before we are done with all requests.
+     * This additional reference is dropped at the end of this function.
+     */
+    acb->nr_pending++;
+
     while (done != total) {
         uint8_t flags = 0;
         uint64_t old_oid = 0;
@@ -1639,22 +1605,18 @@ static void sd_readv_writev_bh_cb(void *p)
         }
 
         if (create) {
-            dprintf("update ino (%" PRIu32") %" PRIu64 " %" PRIu64
-                    " %" PRIu64 "\n", inode->vdi_id, oid,
+            dprintf("update ino (%" PRIu32 ") %" PRIu64 " %" PRIu64 " %ld\n",
+                    inode->vdi_id, oid,
                     vid_to_data_oid(inode->data_vdi_id[idx], idx), idx);
             oid = vid_to_data_oid(inode->vdi_id, idx);
-            dprintf("new oid %lx\n", oid);
+            dprintf("new oid %" PRIx64 "\n", oid);
         }
 
         aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, old_oid, done);
 
         if (create) {
             AIOReq *areq;
-            QLIST_FOREACH(areq, &s->outstanding_aio_head,
-                          outstanding_aio_siblings) {
-                if (areq == aio_req) {
-                    continue;
-                }
+            QLIST_FOREACH(areq, &s->inflight_aio_head, aio_siblings) {
                 if (areq->oid == oid) {
                     /*
                      * Sheepdog cannot handle simultaneous create
@@ -1664,11 +1626,14 @@ static void sd_readv_writev_bh_cb(void *p)
                      */
                     aio_req->flags = 0;
                     aio_req->base_oid = 0;
+                    QLIST_INSERT_HEAD(&s->pending_aio_head, aio_req,
+                                      aio_siblings);
                     goto done;
                 }
             }
         }
 
+        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
         ret = add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
                               create, acb->aiocb_type);
         if (ret < 0) {
@@ -1683,43 +1648,48 @@ static void sd_readv_writev_bh_cb(void *p)
         done += len;
     }
 out:
-    if (QLIST_EMPTY(&acb->aioreq_head)) {
-        sd_finish_aiocb(acb);
+    if (!--acb->nr_pending) {
+        return acb->ret;
     }
+    return 1;
 }
 
-static BlockDriverAIOCB *sd_aio_writev(BlockDriverState *bs, int64_t sector_num,
-                                       QEMUIOVector *qiov, int nb_sectors,
-                                       BlockDriverCompletionFunc *cb,
-                                       void *opaque)
+static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
+                        int nb_sectors, QEMUIOVector *qiov)
 {
     SheepdogAIOCB *acb;
+    int ret;
 
     if (bs->growable && sector_num + nb_sectors > bs->total_sectors) {
-        /* TODO: shouldn't block here */
-        if (sd_truncate(bs, (sector_num + nb_sectors) * SECTOR_SIZE) < 0) {
-            return NULL;
+        ret = sd_truncate(bs, (sector_num + nb_sectors) * SECTOR_SIZE);
+        if (ret < 0) {
+            return ret;
         }
         bs->total_sectors = sector_num + nb_sectors;
     }
 
-    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
+    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, NULL, NULL);
     acb->aio_done_func = sd_write_done;
     acb->aiocb_type = AIOCB_WRITE_UDATA;
 
-    sd_schedule_bh(sd_readv_writev_bh_cb, acb);
-    return &acb->common;
+    ret = sd_co_rw_vector(acb);
+    if (ret <= 0) {
+        qemu_aio_release(acb);
+        return ret;
+    }
+
+    qemu_coroutine_yield();
+
+    return acb->ret;
 }
 
-static BlockDriverAIOCB *sd_aio_readv(BlockDriverState *bs, int64_t sector_num,
-                                      QEMUIOVector *qiov, int nb_sectors,
-                                      BlockDriverCompletionFunc *cb,
-                                      void *opaque)
+static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
+                       int nb_sectors, QEMUIOVector *qiov)
 {
     SheepdogAIOCB *acb;
-    int i;
+    int i, ret;
 
-    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
+    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, NULL, NULL);
     acb->aiocb_type = AIOCB_READ_UDATA;
     acb->aio_done_func = sd_finish_aiocb;
 
@@ -1731,8 +1701,53 @@ static BlockDriverAIOCB *sd_aio_readv(BlockDriverState *bs, int64_t sector_num,
         memset(qiov->iov[i].iov_base, 0, qiov->iov[i].iov_len);
     }
 
-    sd_schedule_bh(sd_readv_writev_bh_cb, acb);
-    return &acb->common;
+    ret = sd_co_rw_vector(acb);
+    if (ret <= 0) {
+        qemu_aio_release(acb);
+        return ret;
+    }
+
+    qemu_coroutine_yield();
+
+    return acb->ret;
+}
+
+static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
+{
+    BDRVSheepdogState *s = bs->opaque;
+    SheepdogObjReq hdr = { 0 };
+    SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
+    SheepdogInode *inode = &s->inode;
+    int ret;
+    unsigned int wlen = 0, rlen = 0;
+
+    if (!s->cache_enabled) {
+        return 0;
+    }
+
+    hdr.opcode = SD_OP_FLUSH_VDI;
+    hdr.oid = vid_to_vdi_oid(inode->vdi_id);
+
+    ret = do_req(s->flush_fd, (SheepdogReq *)&hdr, NULL, &wlen, &rlen);
+    if (ret) {
+        error_report("failed to send a request to the sheep");
+        return ret;
+    }
+
+    if (rsp->result == SD_RES_INVALID_PARMS) {
+        dprintf("disable write cache since the server doesn't support it\n");
+
+        s->cache_enabled = 0;
+        closesocket(s->flush_fd);
+        return 0;
+    }
+
+    if (rsp->result != SD_RES_SUCCESS) {
+        error_report("%s", sd_strerror(rsp->result));
+        return -EIO;
+    }
+
+    return 0;
 }
 
 static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
@@ -1743,7 +1758,7 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
     SheepdogInode *inode;
     unsigned int datalen;
 
-    dprintf("sn_info: name %s id_str %s s: name %s vm_state_size %d "
+    dprintf("sn_info: name %s id_str %s s: name %s vm_state_size %" PRId64 " "
             "is_snapshot %d\n", sn_info->name, sn_info->id_str,
             s->name, sn_info->vm_state_size, s->is_snapshot);
 
@@ -1765,15 +1780,14 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
     /* refresh inode. */
     fd = connect_to_sdog(s->addr, s->port);
     if (fd < 0) {
-        ret = -EIO;
+        ret = fd;
         goto cleanup;
     }
 
     ret = write_object(fd, (char *)&s->inode, vid_to_vdi_oid(s->inode.vdi_id),
-                       s->inode.nr_copies, datalen, 0, 0);
+                       s->inode.nr_copies, datalen, 0, 0, s->cache_enabled);
     if (ret < 0) {
         error_report("failed to write snapshot's inode.");
-        ret = -EIO;
         goto cleanup;
     }
 
@@ -1782,18 +1796,16 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
     if (ret < 0) {
         error_report("failed to create inode for snapshot. %s",
                      strerror(errno));
-        ret = -EIO;
         goto cleanup;
     }
 
     inode = (SheepdogInode *)g_malloc(datalen);
 
     ret = read_object(fd, (char *)inode, vid_to_vdi_oid(new_vid),
-                      s->inode.nr_copies, datalen, 0);
+                      s->inode.nr_copies, datalen, 0, s->cache_enabled);
 
     if (ret < 0) {
         error_report("failed to read new inode info. %s", strerror(errno));
-        ret = -EIO;
         goto cleanup;
     }
 
@@ -1814,7 +1826,7 @@ static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
     char *buf = NULL;
     uint32_t vid;
     uint32_t snapid = 0;
-    int ret = -ENOENT, fd;
+    int ret = 0, fd;
 
     old_s = g_malloc(sizeof(BDRVSheepdogState));
 
@@ -1832,24 +1844,23 @@ static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
     ret = find_vdi_name(s, vdi, snapid, tag, &vid, 1);
     if (ret) {
         error_report("Failed to find_vdi_name");
-        ret = -ENOENT;
         goto out;
     }
 
     fd = connect_to_sdog(s->addr, s->port);
     if (fd < 0) {
         error_report("failed to connect");
+        ret = fd;
         goto out;
     }
 
     buf = g_malloc(SD_INODE_SIZE);
     ret = read_object(fd, buf, vid_to_vdi_oid(vid), s->inode.nr_copies,
-                      SD_INODE_SIZE, 0);
+                      SD_INODE_SIZE, 0, s->cache_enabled);
 
     closesocket(fd);
 
     if (ret) {
-        ret = -ENOENT;
         goto out;
     }
 
@@ -1902,6 +1913,7 @@ static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
 
     fd = connect_to_sdog(s->addr, s->port);
     if (fd < 0) {
+        ret = fd;
         goto out;
     }
 
@@ -1929,6 +1941,7 @@ static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
     fd = connect_to_sdog(s->addr, s->port);
     if (fd < 0) {
         error_report("failed to connect");
+        ret = fd;
         goto out;
     }
 
@@ -1939,7 +1952,8 @@ static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
 
         /* we don't need to read entire object */
         ret = read_object(fd, (char *)&inode, vid_to_vdi_oid(vid),
-                          0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0);
+                          0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0,
+                          s->cache_enabled);
 
         if (ret) {
             continue;
@@ -1965,6 +1979,10 @@ out:
 
     g_free(vdi_inuse);
 
+    if (ret < 0) {
+        return ret;
+    }
+
     return found;
 }
 
@@ -1972,7 +1990,7 @@ static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
                                 int64_t pos, int size, int load)
 {
     int fd, create;
-    int ret = 0;
+    int ret = 0, remaining = size;
     unsigned int data_len;
     uint64_t vmstate_oid;
     uint32_t vdi_index;
@@ -1980,37 +1998,37 @@ static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
 
     fd = connect_to_sdog(s->addr, s->port);
     if (fd < 0) {
-        ret = -EIO;
-        goto cleanup;
+        return fd;
     }
 
-    while (size) {
+    while (remaining) {
         vdi_index = pos / SD_DATA_OBJ_SIZE;
         offset = pos % SD_DATA_OBJ_SIZE;
 
-        data_len = MIN(size, SD_DATA_OBJ_SIZE);
+        data_len = MIN(remaining, SD_DATA_OBJ_SIZE);
 
         vmstate_oid = vid_to_vmstate_oid(s->inode.vdi_id, vdi_index);
 
         create = (offset == 0);
         if (load) {
             ret = read_object(fd, (char *)data, vmstate_oid,
-                              s->inode.nr_copies, data_len, offset);
+                              s->inode.nr_copies, data_len, offset,
+                              s->cache_enabled);
         } else {
             ret = write_object(fd, (char *)data, vmstate_oid,
-                               s->inode.nr_copies, data_len, offset, create);
+                               s->inode.nr_copies, data_len, offset, create,
+                               s->cache_enabled);
         }
 
         if (ret < 0) {
             error_report("failed to save vmstate %s", strerror(errno));
-            ret = -EIO;
             goto cleanup;
         }
 
         pos += data_len;
-        size -= data_len;
-        ret += data_len;
+        remaining -= data_len;
     }
+    ret = size;
 cleanup:
     closesocket(fd);
     return ret;
@@ -2062,8 +2080,9 @@ BlockDriver bdrv_sheepdog = {
     .bdrv_getlength = sd_getlength,
     .bdrv_truncate  = sd_truncate,
 
-    .bdrv_aio_readv     = sd_aio_readv,
-    .bdrv_aio_writev    = sd_aio_writev,
+    .bdrv_co_readv  = sd_co_readv,
+    .bdrv_co_writev = sd_co_writev,
+    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
 
     .bdrv_snapshot_create   = sd_snapshot_create,
     .bdrv_snapshot_goto     = sd_snapshot_goto,