]> git.proxmox.com Git - mirror_qemu.git/blobdiff - block/sheepdog.c
block/ssh: Propagate errors through authenticate()
[mirror_qemu.git] / block / sheepdog.c
index 5f81c93ee32af613662846c99ef9757590cf663a..2c3fb016a8f87b23e949bda139cb2c193a089cb3 100644 (file)
 #define SD_NR_VDIS   (1U << 24)
 #define SD_DATA_OBJ_SIZE (UINT64_C(1) << 22)
 #define SD_MAX_VDI_SIZE (SD_DATA_OBJ_SIZE * MAX_DATA_OBJS)
+/*
+ * For erasure coding, we use at most SD_EC_MAX_STRIP for data strips and
+ * (SD_EC_MAX_STRIP - 1) for parity strips
+ *
+ * SD_MAX_COPIES is sum of number of data strips and parity strips.
+ */
+#define SD_EC_MAX_STRIP 16
+#define SD_MAX_COPIES (SD_EC_MAX_STRIP * 2 - 1)
 
 #define SD_INODE_SIZE (sizeof(SheepdogInode))
 #define CURRENT_VDI_ID 0
@@ -125,8 +133,9 @@ typedef struct SheepdogObjReq {
     uint32_t data_length;
     uint64_t oid;
     uint64_t cow_oid;
-    uint32_t copies;
-    uint32_t rsvd;
+    uint8_t copies;
+    uint8_t copy_policy;
+    uint8_t reserved[6];
     uint64_t offset;
 } SheepdogObjReq;
 
@@ -138,7 +147,9 @@ typedef struct SheepdogObjRsp {
     uint32_t id;
     uint32_t data_length;
     uint32_t result;
-    uint32_t copies;
+    uint8_t copies;
+    uint8_t copy_policy;
+    uint8_t reserved[2];
     uint32_t pad[6];
 } SheepdogObjRsp;
 
@@ -150,8 +161,10 @@ typedef struct SheepdogVdiReq {
     uint32_t id;
     uint32_t data_length;
     uint64_t vdi_size;
-    uint32_t vdi_id;
-    uint32_t copies;
+    uint32_t base_vdi_id;
+    uint8_t copies;
+    uint8_t copy_policy;
+    uint8_t reserved[2];
     uint32_t snapid;
     uint32_t pad[3];
 } SheepdogVdiReq;
@@ -222,6 +235,11 @@ static inline uint64_t data_oid_to_idx(uint64_t oid)
     return oid & (MAX_DATA_OBJS - 1);
 }
 
+static inline uint32_t oid_to_vid(uint64_t oid)
+{
+    return (oid & ~VDI_BIT) >> VDI_SPACE_SHIFT;
+}
+
 static inline uint64_t vid_to_vdi_oid(uint32_t vid)
 {
     return VDI_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT);
@@ -289,11 +307,14 @@ struct SheepdogAIOCB {
     Coroutine *coroutine;
     void (*aio_done_func)(SheepdogAIOCB *);
 
-    bool canceled;
+    bool cancelable;
+    bool *finished;
     int nr_pending;
 };
 
 typedef struct BDRVSheepdogState {
+    BlockDriverState *bs;
+
     SheepdogInode inode;
 
     uint32_t min_dirty_data_idx;
@@ -313,8 +334,11 @@ typedef struct BDRVSheepdogState {
     Coroutine *co_recv;
 
     uint32_t aioreq_seq_num;
+
+    /* Every aio request must be linked to either of these queues. */
     QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
     QLIST_HEAD(pending_aio_head, AIOReq) pending_aio_head;
+    QLIST_HEAD(failed_aio_head, AIOReq) failed_aio_head;
 } BDRVSheepdogState;
 
 static const char * sd_strerror(int err)
@@ -403,6 +427,7 @@ static inline void free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
 {
     SheepdogAIOCB *acb = aio_req->aiocb;
 
+    acb->cancelable = false;
     QLIST_REMOVE(aio_req, aio_siblings);
     g_free(aio_req);
 
@@ -411,23 +436,68 @@ static inline void free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
 
 static void coroutine_fn sd_finish_aiocb(SheepdogAIOCB *acb)
 {
-    if (!acb->canceled) {
-        qemu_coroutine_enter(acb->coroutine, NULL);
+    qemu_coroutine_enter(acb->coroutine, NULL);
+    if (acb->finished) {
+        *acb->finished = true;
     }
     qemu_aio_release(acb);
 }
 
+/*
+ * Check whether the specified acb can be canceled
+ *
+ * We can cancel aio when any request belonging to the acb is:
+ *  - Not processed by the sheepdog server.
+ *  - Not linked to the inflight queue.
+ */
+static bool sd_acb_cancelable(const SheepdogAIOCB *acb)
+{
+    BDRVSheepdogState *s = acb->common.bs->opaque;
+    AIOReq *aioreq;
+
+    if (!acb->cancelable) {
+        return false;
+    }
+
+    QLIST_FOREACH(aioreq, &s->inflight_aio_head, aio_siblings) {
+        if (aioreq->aiocb == acb) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
 static void sd_aio_cancel(BlockDriverAIOCB *blockacb)
 {
     SheepdogAIOCB *acb = (SheepdogAIOCB *)blockacb;
+    BDRVSheepdogState *s = acb->common.bs->opaque;
+    AIOReq *aioreq, *next;
+    bool finished = false;
+
+    acb->finished = &finished;
+    while (!finished) {
+        if (sd_acb_cancelable(acb)) {
+            /* Remove outstanding requests from pending and failed queues.  */
+            QLIST_FOREACH_SAFE(aioreq, &s->pending_aio_head, aio_siblings,
+                               next) {
+                if (aioreq->aiocb == acb) {
+                    free_aio_req(s, aioreq);
+                }
+            }
+            QLIST_FOREACH_SAFE(aioreq, &s->failed_aio_head, aio_siblings,
+                               next) {
+                if (aioreq->aiocb == acb) {
+                    free_aio_req(s, aioreq);
+                }
+            }
 
-    /*
-     * Sheepdog cannot cancel the requests which are already sent to
-     * the servers, so we just complete the request with -EIO here.
-     */
-    acb->ret = -EIO;
-    qemu_coroutine_enter(acb->coroutine, NULL);
-    acb->canceled = true;
+            assert(acb->nr_pending == 0);
+            sd_finish_aiocb(acb);
+            return;
+        }
+        qemu_aio_wait();
+    }
 }
 
 static const AIOCBInfo sd_aiocb_info = {
@@ -448,7 +518,8 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
     acb->nb_sectors = nb_sectors;
 
     acb->aio_done_func = NULL;
-    acb->canceled = false;
+    acb->cancelable = true;
+    acb->finished = NULL;
     acb->coroutine = qemu_coroutine_self();
     acb->ret = 0;
     acb->nr_pending = 0;
@@ -489,13 +560,13 @@ static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
     int ret;
 
     ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
-    if (ret < sizeof(*hdr)) {
+    if (ret != sizeof(*hdr)) {
         error_report("failed to send a req, %s", strerror(errno));
         return ret;
     }
 
     ret = qemu_co_send(sockfd, data, *wlen);
-    if (ret < *wlen) {
+    if (ret != *wlen) {
         error_report("failed to send a req, %s", strerror(errno));
     }
 
@@ -541,7 +612,7 @@ static coroutine_fn void do_co_req(void *opaque)
     qemu_aio_set_fd_handler(sockfd, restart_co_req, NULL, co);
 
     ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
-    if (ret < sizeof(*hdr)) {
+    if (ret != sizeof(*hdr)) {
         error_report("failed to get a rsp, %s", strerror(errno));
         ret = -errno;
         goto out;
@@ -553,7 +624,7 @@ static coroutine_fn void do_co_req(void *opaque)
 
     if (*rlen) {
         ret = qemu_co_recv(sockfd, data, *rlen);
-        if (ret < *rlen) {
+        if (ret != *rlen) {
             error_report("failed to get the data, %s", strerror(errno));
             ret = -errno;
             goto out;
@@ -596,11 +667,13 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data,
     return srco.ret;
 }
 
-static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
+static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
                            struct iovec *iov, int niov, bool create,
                            enum AIOCBState aiocb_type);
-static int coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req);
-
+static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req);
+static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag);
+static int get_sheep_fd(BDRVSheepdogState *s);
+static void co_write_request(void *opaque);
 
 static AIOReq *find_pending_req(BDRVSheepdogState *s, uint64_t oid)
 {
@@ -623,22 +696,59 @@ static void coroutine_fn send_pending_req(BDRVSheepdogState *s, uint64_t oid)
 {
     AIOReq *aio_req;
     SheepdogAIOCB *acb;
-    int ret;
 
     while ((aio_req = find_pending_req(s, oid)) != NULL) {
         acb = aio_req->aiocb;
         /* move aio_req from pending list to inflight one */
         QLIST_REMOVE(aio_req, aio_siblings);
         QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
-        ret = add_aio_request(s, aio_req, acb->qiov->iov,
-                              acb->qiov->niov, false, acb->aiocb_type);
-        if (ret < 0) {
-            error_report("add_aio_request is failed");
-            free_aio_req(s, aio_req);
-            if (!acb->nr_pending) {
-                sd_finish_aiocb(acb);
-            }
+        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov, false,
+                        acb->aiocb_type);
+    }
+}
+
+static coroutine_fn void reconnect_to_sdog(void *opaque)
+{
+    BDRVSheepdogState *s = opaque;
+    AIOReq *aio_req, *next;
+
+    qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL);
+    close(s->fd);
+    s->fd = -1;
+
+    /* Wait for outstanding write requests to be completed. */
+    while (s->co_send != NULL) {
+        co_write_request(opaque);
+    }
+
+    /* Try to reconnect the sheepdog server every one second. */
+    while (s->fd < 0) {
+        s->fd = get_sheep_fd(s);
+        if (s->fd < 0) {
+            DPRINTF("Wait for connection to be established\n");
+            co_aio_sleep_ns(bdrv_get_aio_context(s->bs), QEMU_CLOCK_REALTIME,
+                            1000000000ULL);
         }
+    };
+
+    /*
+     * Now we have to resend all the request in the inflight queue.  However,
+     * resend_aioreq() can yield and newly created requests can be added to the
+     * inflight queue before the coroutine is resumed.  To avoid mixing them, we
+     * have to move all the inflight requests to the failed queue before
+     * resend_aioreq() is called.
+     */
+    QLIST_FOREACH_SAFE(aio_req, &s->inflight_aio_head, aio_siblings, next) {
+        QLIST_REMOVE(aio_req, aio_siblings);
+        QLIST_INSERT_HEAD(&s->failed_aio_head, aio_req, aio_siblings);
+    }
+
+    /* Resend all the failed aio requests. */
+    while (!QLIST_EMPTY(&s->failed_aio_head)) {
+        aio_req = QLIST_FIRST(&s->failed_aio_head);
+        QLIST_REMOVE(aio_req, aio_siblings);
+        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
+        resend_aioreq(s, aio_req);
     }
 }
 
@@ -658,15 +768,11 @@ static void coroutine_fn aio_read_response(void *opaque)
     SheepdogAIOCB *acb;
     uint64_t idx;
 
-    if (QLIST_EMPTY(&s->inflight_aio_head)) {
-        goto out;
-    }
-
     /* read a header */
     ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
-    if (ret < 0) {
+    if (ret != sizeof(rsp)) {
         error_report("failed to get the header, %s", strerror(errno));
-        goto out;
+        goto err;
     }
 
     /* find the right aio_req from the inflight aio list */
@@ -677,7 +783,7 @@ static void coroutine_fn aio_read_response(void *opaque)
     }
     if (!aio_req) {
         error_report("cannot find aio_req %x", rsp.id);
-        goto out;
+        goto err;
     }
 
     acb = aio_req->aiocb;
@@ -715,9 +821,9 @@ static void coroutine_fn aio_read_response(void *opaque)
     case AIOCB_READ_UDATA:
         ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov,
                             aio_req->iov_offset, rsp.data_length);
-        if (ret < 0) {
+        if (ret != rsp.data_length) {
             error_report("failed to get the data, %s", strerror(errno));
-            goto out;
+            goto err;
         }
         break;
     case AIOCB_FLUSH_CACHE:
@@ -748,11 +854,20 @@ static void coroutine_fn aio_read_response(void *opaque)
     case SD_RES_SUCCESS:
         break;
     case SD_RES_READONLY:
-        ret = resend_aioreq(s, aio_req);
-        if (ret == SD_RES_SUCCESS) {
-            goto out;
+        if (s->inode.vdi_id == oid_to_vid(aio_req->oid)) {
+            ret = reload_inode(s, 0, "");
+            if (ret < 0) {
+                goto err;
+            }
         }
-        /* fall through */
+        if (is_data_obj(aio_req->oid)) {
+            aio_req->oid = vid_to_data_oid(s->inode.vdi_id,
+                                           data_oid_to_idx(aio_req->oid));
+        } else {
+            aio_req->oid = vid_to_vdi_oid(s->inode.vdi_id);
+        }
+        resend_aioreq(s, aio_req);
+        goto out;
     default:
         acb->ret = -EIO;
         error_report("%s", sd_strerror(rsp.result));
@@ -769,6 +884,10 @@ static void coroutine_fn aio_read_response(void *opaque)
     }
 out:
     s->co_recv = NULL;
+    return;
+err:
+    s->co_recv = NULL;
+    reconnect_to_sdog(opaque);
 }
 
 static void co_read_response(void *opaque)
@@ -790,9 +909,9 @@ static void co_write_request(void *opaque)
 }
 
 /*
- * Return a socket discriptor to read/write objects.
+ * Return a socket descriptor to read/write objects.
  *
- * We cannot use this discriptor for other operations because
+ * We cannot use this descriptor for other operations because
  * the block driver may be on waiting response from the server.
  */
 static int get_sheep_fd(BDRVSheepdogState *s)
@@ -980,7 +1099,7 @@ static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
     }
 
     if (rsp->result != SD_RES_SUCCESS) {
-        error_report("cannot get vdi info, %s, %s %d %s",
+        error_report("cannot get vdi info, %s, %s %" PRIu32 " %s",
                      sd_strerror(rsp->result), filename, snapid, tag);
         if (rsp->result == SD_RES_NO_VDI) {
             ret = -ENOENT;
@@ -997,7 +1116,7 @@ out:
     return ret;
 }
 
-static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
+static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
                            struct iovec *iov, int niov, bool create,
                            enum AIOCBState aiocb_type)
 {
@@ -1059,29 +1178,25 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
 
     /* send a header */
     ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
-    if (ret < 0) {
-        qemu_co_mutex_unlock(&s->lock);
+    if (ret != sizeof(hdr)) {
         error_report("failed to send a req, %s", strerror(errno));
-        return -errno;
+        goto out;
     }
 
     if (wlen) {
         ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
-        if (ret < 0) {
-            qemu_co_mutex_unlock(&s->lock);
+        if (ret != wlen) {
             error_report("failed to send a data, %s", strerror(errno));
-            return -errno;
         }
     }
-
+out:
     socket_set_cork(s->fd, 0);
     qemu_aio_set_fd_handler(s->fd, co_read_response, NULL, s);
+    s->co_send = NULL;
     qemu_co_mutex_unlock(&s->lock);
-
-    return 0;
 }
 
-static int read_write_object(int fd, char *buf, uint64_t oid, int copies,
+static int read_write_object(int fd, char *buf, uint64_t oid, uint8_t copies,
                              unsigned int datalen, uint64_t offset,
                              bool write, bool create, uint32_t cache_flags)
 {
@@ -1129,7 +1244,7 @@ static int read_write_object(int fd, char *buf, uint64_t oid, int copies,
     }
 }
 
-static int read_object(int fd, char *buf, uint64_t oid, int copies,
+static int read_object(int fd, char *buf, uint64_t oid, uint8_t copies,
                        unsigned int datalen, uint64_t offset,
                        uint32_t cache_flags)
 {
@@ -1137,7 +1252,7 @@ static int read_object(int fd, char *buf, uint64_t oid, int copies,
                              false, cache_flags);
 }
 
-static int write_object(int fd, char *buf, uint64_t oid, int copies,
+static int write_object(int fd, char *buf, uint64_t oid, uint8_t copies,
                         unsigned int datalen, uint64_t offset, bool create,
                         uint32_t cache_flags)
 {
@@ -1181,51 +1296,62 @@ out:
     return ret;
 }
 
-static int coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
+/* Return true if the specified request is linked to the pending list. */
+static bool check_simultaneous_create(BDRVSheepdogState *s, AIOReq *aio_req)
 {
-    SheepdogAIOCB *acb = aio_req->aiocb;
-    bool create = false;
-    int ret;
-
-    ret = reload_inode(s, 0, "");
-    if (ret < 0) {
-        return ret;
+    AIOReq *areq;
+    QLIST_FOREACH(areq, &s->inflight_aio_head, aio_siblings) {
+        if (areq != aio_req && areq->oid == aio_req->oid) {
+            /*
+             * Sheepdog cannot handle simultaneous create requests to the same
+             * object, so we cannot send the request until the previous request
+             * finishes.
+             */
+            DPRINTF("simultaneous create to %" PRIx64 "\n", aio_req->oid);
+            aio_req->flags = 0;
+            aio_req->base_oid = 0;
+            QLIST_REMOVE(aio_req, aio_siblings);
+            QLIST_INSERT_HEAD(&s->pending_aio_head, aio_req, aio_siblings);
+            return true;
+        }
     }
 
-    aio_req->oid = vid_to_data_oid(s->inode.vdi_id,
-                                   data_oid_to_idx(aio_req->oid));
+    return false;
+}
+
+static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
+{
+    SheepdogAIOCB *acb = aio_req->aiocb;
+    bool create = false;
 
     /* check whether this request becomes a CoW one */
-    if (acb->aiocb_type == AIOCB_WRITE_UDATA) {
+    if (acb->aiocb_type == AIOCB_WRITE_UDATA && is_data_obj(aio_req->oid)) {
         int idx = data_oid_to_idx(aio_req->oid);
-        AIOReq *areq;
 
-        if (s->inode.data_vdi_id[idx] == 0) {
-            create = true;
-            goto out;
-        }
         if (is_data_obj_writable(&s->inode, idx)) {
             goto out;
         }
 
-        /* link to the pending list if there is another CoW request to
-         * the same object */
-        QLIST_FOREACH(areq, &s->inflight_aio_head, aio_siblings) {
-            if (areq != aio_req && areq->oid == aio_req->oid) {
-                DPRINTF("simultaneous CoW to %" PRIx64 "\n", aio_req->oid);
-                QLIST_REMOVE(aio_req, aio_siblings);
-                QLIST_INSERT_HEAD(&s->pending_aio_head, aio_req, aio_siblings);
-                return SD_RES_SUCCESS;
-            }
+        if (check_simultaneous_create(s, aio_req)) {
+            return;
         }
 
-        aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx], idx);
-        aio_req->flags |= SD_FLAG_CMD_COW;
+        if (s->inode.data_vdi_id[idx]) {
+            aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx], idx);
+            aio_req->flags |= SD_FLAG_CMD_COW;
+        }
         create = true;
     }
 out:
-    return add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
-                           create, acb->aiocb_type);
+    if (is_data_obj(aio_req->oid)) {
+        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov, create,
+                        acb->aiocb_type);
+    } else {
+        struct iovec iov;
+        iov.iov_base = &s->inode;
+        iov.iov_len = sizeof(s->inode);
+        add_aio_request(s, aio_req, &iov, 1, false, AIOCB_WRITE_UDATA);
+    }
 }
 
 /* TODO Convert to fine grained options */
@@ -1255,9 +1381,11 @@ static int sd_open(BlockDriverState *bs, QDict *options, int flags,
     Error *local_err = NULL;
     const char *filename;
 
-    opts = qemu_opts_create_nofail(&runtime_opts);
+    s->bs = bs;
+
+    opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
     qemu_opts_absorb_qdict(opts, options, &local_err);
-    if (error_is_set(&local_err)) {
+    if (local_err) {
         qerror_report_err(local_err);
         error_free(local_err);
         ret = -EINVAL;
@@ -1268,6 +1396,7 @@ static int sd_open(BlockDriverState *bs, QDict *options, int flags,
 
     QLIST_INIT(&s->inflight_aio_head);
     QLIST_INIT(&s->pending_aio_head);
+    QLIST_INIT(&s->failed_aio_head);
     s->fd = -1;
 
     memset(vdi, 0, sizeof(vdi));
@@ -1343,8 +1472,7 @@ out:
     return ret;
 }
 
-static int do_sd_create(BDRVSheepdogState *s, char *filename, int64_t vdi_size,
-                        uint32_t base_vid, uint32_t *vdi_id, int snapshot)
+static int do_sd_create(BDRVSheepdogState *s, uint32_t *vdi_id, int snapshot)
 {
     SheepdogVdiReq hdr;
     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
@@ -1361,11 +1489,11 @@ static int do_sd_create(BDRVSheepdogState *s, char *filename, int64_t vdi_size,
      * does not fit in buf?  For now, just truncate and avoid buffer overrun.
      */
     memset(buf, 0, sizeof(buf));
-    pstrcpy(buf, sizeof(buf), filename);
+    pstrcpy(buf, sizeof(buf), s->name);
 
     memset(&hdr, 0, sizeof(hdr));
     hdr.opcode = SD_OP_NEW_VDI;
-    hdr.vdi_id = base_vid;
+    hdr.base_vdi_id = s->inode.vdi_id;
 
     wlen = SD_MAX_VDI_LEN;
 
@@ -1373,7 +1501,9 @@ static int do_sd_create(BDRVSheepdogState *s, char *filename, int64_t vdi_size,
     hdr.snapid = snapshot;
 
     hdr.data_length = wlen;
-    hdr.vdi_size = vdi_size;
+    hdr.vdi_size = s->inode.vdi_size;
+    hdr.copy_policy = s->inode.copy_policy;
+    hdr.copies = s->inode.nr_copies;
 
     ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
 
@@ -1384,7 +1514,7 @@ static int do_sd_create(BDRVSheepdogState *s, char *filename, int64_t vdi_size,
     }
 
     if (rsp->result != SD_RES_SUCCESS) {
-        error_report("%s, %s", sd_strerror(rsp->result), filename);
+        error_report("%s, %s", sd_strerror(rsp->result), s->inode.name);
         return -EIO;
     }
 
@@ -1404,7 +1534,8 @@ static int sd_prealloc(const char *filename)
     Error *local_err = NULL;
     int ret;
 
-    ret = bdrv_file_open(&bs, filename, NULL, BDRV_O_RDWR, &local_err);
+    ret = bdrv_open(&bs, filename, NULL, NULL, BDRV_O_RDWR | BDRV_O_PROTOCOL,
+                    NULL, &local_err);
     if (ret < 0) {
         qerror_report_err(local_err);
         error_free(local_err);
@@ -1441,27 +1572,79 @@ out:
     return ret;
 }
 
+/*
+ * Sheepdog support two kinds of redundancy, full replication and erasure
+ * coding.
+ *
+ * # create a fully replicated vdi with x copies
+ * -o redundancy=x (1 <= x <= SD_MAX_COPIES)
+ *
+ * # create a erasure coded vdi with x data strips and y parity strips
+ * -o redundancy=x:y (x must be one of {2,4,8,16} and 1 <= y < SD_EC_MAX_STRIP)
+ */
+static int parse_redundancy(BDRVSheepdogState *s, const char *opt)
+{
+    struct SheepdogInode *inode = &s->inode;
+    const char *n1, *n2;
+    long copy, parity;
+    char p[10];
+
+    pstrcpy(p, sizeof(p), opt);
+    n1 = strtok(p, ":");
+    n2 = strtok(NULL, ":");
+
+    if (!n1) {
+        return -EINVAL;
+    }
+
+    copy = strtol(n1, NULL, 10);
+    if (copy > SD_MAX_COPIES || copy < 1) {
+        return -EINVAL;
+    }
+    if (!n2) {
+        inode->copy_policy = 0;
+        inode->nr_copies = copy;
+        return 0;
+    }
+
+    if (copy != 2 && copy != 4 && copy != 8 && copy != 16) {
+        return -EINVAL;
+    }
+
+    parity = strtol(n2, NULL, 10);
+    if (parity >= SD_EC_MAX_STRIP || parity < 1) {
+        return -EINVAL;
+    }
+
+    /*
+     * 4 bits for parity and 4 bits for data.
+     * We have to compress upper data bits because it can't represent 16
+     */
+    inode->copy_policy = ((copy / 2) << 4) + parity;
+    inode->nr_copies = copy + parity;
+
+    return 0;
+}
+
 static int sd_create(const char *filename, QEMUOptionParameter *options,
                      Error **errp)
 {
     int ret = 0;
-    uint32_t vid = 0, base_vid = 0;
-    int64_t vdi_size = 0;
+    uint32_t vid = 0;
     char *backing_file = NULL;
     BDRVSheepdogState *s;
-    char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
+    char tag[SD_MAX_VDI_TAG_LEN];
     uint32_t snapid;
     bool prealloc = false;
     Error *local_err = NULL;
 
     s = g_malloc0(sizeof(BDRVSheepdogState));
 
-    memset(vdi, 0, sizeof(vdi));
     memset(tag, 0, sizeof(tag));
     if (strstr(filename, "://")) {
-        ret = sd_parse_uri(s, filename, vdi, &snapid, tag);
+        ret = sd_parse_uri(s, filename, s->name, &snapid, tag);
     } else {
-        ret = parse_vdiname(s, filename, vdi, &snapid, tag);
+        ret = parse_vdiname(s, filename, s->name, &snapid, tag);
     }
     if (ret < 0) {
         goto out;
@@ -1469,7 +1652,7 @@ static int sd_create(const char *filename, QEMUOptionParameter *options,
 
     while (options && options->name) {
         if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
-            vdi_size = options->value.n;
+            s->inode.vdi_size = options->value.n;
         } else if (!strcmp(options->name, BLOCK_OPT_BACKING_FILE)) {
             backing_file = options->value.s;
         } else if (!strcmp(options->name, BLOCK_OPT_PREALLOC)) {
@@ -1483,11 +1666,18 @@ static int sd_create(const char *filename, QEMUOptionParameter *options,
                 ret = -EINVAL;
                 goto out;
             }
+        } else if (!strcmp(options->name, BLOCK_OPT_REDUNDANCY)) {
+            if (options->value.s) {
+                ret = parse_redundancy(s, options->value.s);
+                if (ret < 0) {
+                    goto out;
+                }
+            }
         }
         options++;
     }
 
-    if (vdi_size > SD_MAX_VDI_SIZE) {
+    if (s->inode.vdi_size > SD_MAX_VDI_SIZE) {
         error_report("too big image size");
         ret = -EINVAL;
         goto out;
@@ -1495,7 +1685,7 @@ static int sd_create(const char *filename, QEMUOptionParameter *options,
 
     if (backing_file) {
         BlockDriverState *bs;
-        BDRVSheepdogState *s;
+        BDRVSheepdogState *base;
         BlockDriver *drv;
 
         /* Currently, only Sheepdog backing image is supported. */
@@ -1506,27 +1696,28 @@ static int sd_create(const char *filename, QEMUOptionParameter *options,
             goto out;
         }
 
-        ret = bdrv_file_open(&bs, backing_file, NULL, 0, &local_err);
+        bs = NULL;
+        ret = bdrv_open(&bs, backing_file, NULL, NULL, BDRV_O_PROTOCOL, NULL,
+                        &local_err);
         if (ret < 0) {
             qerror_report_err(local_err);
             error_free(local_err);
             goto out;
         }
 
-        s = bs->opaque;
+        base = bs->opaque;
 
-        if (!is_snapshot(&s->inode)) {
+        if (!is_snapshot(&base->inode)) {
             error_report("cannot clone from a non snapshot vdi");
             bdrv_unref(bs);
             ret = -EINVAL;
             goto out;
         }
-
-        base_vid = s->inode.vdi_id;
+        s->inode.vdi_id = base->inode.vdi_id;
         bdrv_unref(bs);
     }
 
-    ret = do_sd_create(s, vdi, vdi_size, base_vid, &vid, 0);
+    ret = do_sd_create(s, &vid, 0);
     if (!prealloc || ret) {
         goto out;
     }
@@ -1555,7 +1746,7 @@ static void sd_close(BlockDriverState *bs)
     memset(&hdr, 0, sizeof(hdr));
 
     hdr.opcode = SD_OP_RELEASE_VDI;
-    hdr.vdi_id = s->inode.vdi_id;
+    hdr.base_vdi_id = s->inode.vdi_id;
     wlen = strlen(s->name) + 1;
     hdr.data_length = wlen;
     hdr.flags = SD_FLAG_CMD_WRITE;
@@ -1621,7 +1812,6 @@ static int sd_truncate(BlockDriverState *bs, int64_t offset)
  */
 static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
 {
-    int ret;
     BDRVSheepdogState *s = acb->common.bs->opaque;
     struct iovec iov;
     AIOReq *aio_req;
@@ -1643,18 +1833,13 @@ static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
         aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
                                 data_len, offset, 0, 0, offset);
         QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
-        ret = add_aio_request(s, aio_req, &iov, 1, false, AIOCB_WRITE_UDATA);
-        if (ret) {
-            free_aio_req(s, aio_req);
-            acb->ret = -EIO;
-            goto out;
-        }
+        add_aio_request(s, aio_req, &iov, 1, false, AIOCB_WRITE_UDATA);
 
         acb->aio_done_func = sd_finish_aiocb;
         acb->aiocb_type = AIOCB_WRITE_UDATA;
         return;
     }
-out:
+
     sd_finish_aiocb(acb);
 }
 
@@ -1664,7 +1849,7 @@ static bool sd_delete(BDRVSheepdogState *s)
     unsigned int wlen = SD_MAX_VDI_LEN, rlen = 0;
     SheepdogVdiReq hdr = {
         .opcode = SD_OP_DEL_VDI,
-        .vdi_id = s->inode.vdi_id,
+        .base_vdi_id = s->inode.vdi_id,
         .data_length = wlen,
         .flags = SD_FLAG_CMD_WRITE,
     };
@@ -1711,12 +1896,11 @@ static int sd_create_branch(BDRVSheepdogState *s)
 
     /*
      * Even If deletion fails, we will just create extra snapshot based on
-     * the workding VDI which was supposed to be deleted. So no need to
+     * the working VDI which was supposed to be deleted. So no need to
      * false bail out.
      */
     deleted = sd_delete(s);
-    ret = do_sd_create(s, s->name, s->inode.vdi_size, s->inode.vdi_id, &vid,
-                       !deleted);
+    ret = do_sd_create(s, &vid, !deleted);
     if (ret) {
         goto out;
     }
@@ -1840,35 +2024,16 @@ static int coroutine_fn sd_co_rw_vector(void *p)
         }
 
         aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, old_oid, done);
+        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
 
         if (create) {
-            AIOReq *areq;
-            QLIST_FOREACH(areq, &s->inflight_aio_head, aio_siblings) {
-                if (areq->oid == oid) {
-                    /*
-                     * Sheepdog cannot handle simultaneous create
-                     * requests to the same object.  So we cannot send
-                     * the request until the previous request
-                     * finishes.
-                     */
-                    aio_req->flags = 0;
-                    aio_req->base_oid = 0;
-                    QLIST_INSERT_HEAD(&s->pending_aio_head, aio_req,
-                                      aio_siblings);
-                    goto done;
-                }
+            if (check_simultaneous_create(s, aio_req)) {
+                goto done;
             }
         }
 
-        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
-        ret = add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
-                              create, acb->aiocb_type);
-        if (ret < 0) {
-            error_report("add_aio_request is failed");
-            free_aio_req(s, aio_req);
-            acb->ret = -EIO;
-            goto out;
-        }
+        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov, create,
+                        acb->aiocb_type);
     done:
         offset = 0;
         idx++;
@@ -1886,13 +2051,14 @@ static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
 {
     SheepdogAIOCB *acb;
     int ret;
+    int64_t offset = (sector_num + nb_sectors) * BDRV_SECTOR_SIZE;
+    BDRVSheepdogState *s = bs->opaque;
 
-    if (bs->growable && sector_num + nb_sectors > bs->total_sectors) {
-        ret = sd_truncate(bs, (sector_num + nb_sectors) * BDRV_SECTOR_SIZE);
+    if (bs->growable && offset > s->inode.vdi_size) {
+        ret = sd_truncate(bs, offset);
         if (ret < 0) {
             return ret;
         }
-        bs->total_sectors = sector_num + nb_sectors;
     }
 
     acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
@@ -1936,7 +2102,6 @@ static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
     BDRVSheepdogState *s = bs->opaque;
     SheepdogAIOCB *acb;
     AIOReq *aio_req;
-    int ret;
 
     if (s->cache_flags != SD_FLAG_CMD_CACHE) {
         return 0;
@@ -1949,13 +2114,7 @@ static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
     aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
                             0, 0, 0, 0, 0);
     QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
-    ret = add_aio_request(s, aio_req, NULL, 0, false, acb->aiocb_type);
-    if (ret < 0) {
-        error_report("add_aio_request is failed");
-        free_aio_req(s, aio_req);
-        qemu_aio_release(acb);
-        return ret;
-    }
+    add_aio_request(s, aio_req, NULL, 0, false, acb->aiocb_type);
 
     qemu_coroutine_yield();
     return acb->ret;
@@ -2005,8 +2164,7 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
         goto cleanup;
     }
 
-    ret = do_sd_create(s, s->name, s->inode.vdi_size, s->inode.vdi_id, &new_vid,
-                       1);
+    ret = do_sd_create(s, &new_vid, 1);
     if (ret < 0) {
         error_report("failed to create inode for snapshot. %s",
                      strerror(errno));
@@ -2036,7 +2194,7 @@ cleanup:
  * We implement rollback(loadvm) operation to the specified snapshot by
  * 1) switch to the snapshot
  * 2) rely on sd_create_branch to delete working VDI and
- * 3) create a new working VDI based on the speicified snapshot
+ * 3) create a new working VDI based on the specified snapshot
  */
 static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
 {
@@ -2158,8 +2316,8 @@ static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
             sn_tab[found].vm_state_size = inode.vm_state_size;
             sn_tab[found].vm_clock_nsec = inode.vm_clock_nsec;
 
-            snprintf(sn_tab[found].id_str, sizeof(sn_tab[found].id_str), "%u",
-                     inode.snap_id);
+            snprintf(sn_tab[found].id_str, sizeof(sn_tab[found].id_str),
+                     "%" PRIu32, inode.snap_id);
             pstrcpy(sn_tab[found].name,
                     MIN(sizeof(sn_tab[found].name), sizeof(inode.tag)),
                     inode.tag);
@@ -2287,11 +2445,12 @@ sd_co_get_block_status(BlockDriverState *bs, int64_t sector_num, int nb_sectors,
 {
     BDRVSheepdogState *s = bs->opaque;
     SheepdogInode *inode = &s->inode;
-    unsigned long start = sector_num * BDRV_SECTOR_SIZE / SD_DATA_OBJ_SIZE,
+    uint64_t offset = sector_num * BDRV_SECTOR_SIZE;
+    unsigned long start = offset / SD_DATA_OBJ_SIZE,
                   end = DIV_ROUND_UP((sector_num + nb_sectors) *
                                      BDRV_SECTOR_SIZE, SD_DATA_OBJ_SIZE);
     unsigned long idx;
-    int64_t ret = BDRV_BLOCK_DATA;
+    int64_t ret = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID | offset;
 
     for (idx = start; idx < end; idx++) {
         if (inode->data_vdi_id[idx] == 0) {
@@ -2315,6 +2474,22 @@ sd_co_get_block_status(BlockDriverState *bs, int64_t sector_num, int nb_sectors,
     return ret;
 }
 
+static int64_t sd_get_allocated_file_size(BlockDriverState *bs)
+{
+    BDRVSheepdogState *s = bs->opaque;
+    SheepdogInode *inode = &s->inode;
+    unsigned long i, last = DIV_ROUND_UP(inode->vdi_size, SD_DATA_OBJ_SIZE);
+    uint64_t size = 0;
+
+    for (i = 0; i < last; i++) {
+        if (inode->data_vdi_id[i] == 0) {
+            continue;
+        }
+        size += SD_DATA_OBJ_SIZE;
+    }
+    return size;
+}
+
 static QEMUOptionParameter sd_create_options[] = {
     {
         .name = BLOCK_OPT_SIZE,
@@ -2331,6 +2506,11 @@ static QEMUOptionParameter sd_create_options[] = {
         .type = OPT_STRING,
         .help = "Preallocation mode (allowed values: off, full)"
     },
+    {
+        .name = BLOCK_OPT_REDUNDANCY,
+        .type = OPT_STRING,
+        .help = "Redundancy of the image"
+    },
     { NULL }
 };
 
@@ -2344,6 +2524,7 @@ static BlockDriver bdrv_sheepdog = {
     .bdrv_create    = sd_create,
     .bdrv_has_zero_init = bdrv_has_zero_init_1,
     .bdrv_getlength = sd_getlength,
+    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
     .bdrv_truncate  = sd_truncate,
 
     .bdrv_co_readv  = sd_co_readv,
@@ -2373,6 +2554,7 @@ static BlockDriver bdrv_sheepdog_tcp = {
     .bdrv_create    = sd_create,
     .bdrv_has_zero_init = bdrv_has_zero_init_1,
     .bdrv_getlength = sd_getlength,
+    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
     .bdrv_truncate  = sd_truncate,
 
     .bdrv_co_readv  = sd_co_readv,
@@ -2402,6 +2584,7 @@ static BlockDriver bdrv_sheepdog_unix = {
     .bdrv_create    = sd_create,
     .bdrv_has_zero_init = bdrv_has_zero_init_1,
     .bdrv_getlength = sd_getlength,
+    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
     .bdrv_truncate  = sd_truncate,
 
     .bdrv_co_readv  = sd_co_readv,