]> git.proxmox.com Git - qemu.git/blobdiff - block/sheepdog.c
target-i386: yield to another VCPU on PAUSE
[qemu.git] / block / sheepdog.c
index bfa8a00ddaa23f6bb25ad0e86a2466b1083507f7..ef387de71f4d9e1d10e9fbe3a8fbe92861c4d600 100644 (file)
 #define SD_PROTO_VER 0x01
 
 #define SD_DEFAULT_ADDR "localhost"
-#define SD_DEFAULT_PORT "7000"
+#define SD_DEFAULT_PORT 7000
 
 #define SD_OP_CREATE_AND_WRITE_OBJ  0x01
 #define SD_OP_READ_OBJ       0x02
 #define SD_OP_WRITE_OBJ      0x03
+/* 0x04 is used internally by Sheepdog */
+#define SD_OP_DISCARD_OBJ    0x05
 
 #define SD_OP_NEW_VDI        0x11
 #define SD_OP_LOCK_VDI       0x12
@@ -34,6 +36,7 @@
 #define SD_OP_GET_VDI_INFO   0x14
 #define SD_OP_READ_VDIS      0x15
 #define SD_OP_FLUSH_VDI      0x16
+#define SD_OP_DEL_VDI        0x17
 
 #define SD_FLAG_CMD_WRITE    0x01
 #define SD_FLAG_CMD_COW      0x02
@@ -65,6 +68,8 @@
 #define SD_RES_WAIT_FOR_FORMAT  0x16 /* Waiting for a format operation */
 #define SD_RES_WAIT_FOR_JOIN    0x17 /* Waiting for other nodes joining */
 #define SD_RES_JOIN_FAILED   0x18 /* Target node had failed to join sheepdog */
+#define SD_RES_HALT          0x19 /* Sheepdog is stopped serving IO request */
+#define SD_RES_READONLY      0x1A /* Object is read-only */
 
 /*
  * Object ID rules
@@ -86,7 +91,6 @@
 #define SD_NR_VDIS   (1U << 24)
 #define SD_DATA_OBJ_SIZE (UINT64_C(1) << 22)
 #define SD_MAX_VDI_SIZE (SD_DATA_OBJ_SIZE * MAX_DATA_OBJS)
-#define SECTOR_SIZE 512
 
 #define SD_INODE_SIZE (sizeof(SheepdogInode))
 #define CURRENT_VDI_ID 0
@@ -121,8 +125,9 @@ typedef struct SheepdogObjReq {
     uint32_t data_length;
     uint64_t oid;
     uint64_t cow_oid;
-    uint32_t copies;
-    uint32_t rsvd;
+    uint8_t copies;
+    uint8_t copy_policy;
+    uint8_t reserved[6];
     uint64_t offset;
 } SheepdogObjReq;
 
@@ -134,7 +139,9 @@ typedef struct SheepdogObjRsp {
     uint32_t id;
     uint32_t data_length;
     uint32_t result;
-    uint32_t copies;
+    uint8_t copies;
+    uint8_t copy_policy;
+    uint8_t reserved[2];
     uint32_t pad[6];
 } SheepdogObjRsp;
 
@@ -147,7 +154,9 @@ typedef struct SheepdogVdiReq {
     uint32_t data_length;
     uint64_t vdi_size;
     uint32_t vdi_id;
-    uint32_t copies;
+    uint8_t copies;
+    uint8_t copy_policy;
+    uint8_t reserved[2];
     uint32_t snapid;
     uint32_t pad[3];
 } SheepdogVdiReq;
@@ -218,6 +227,11 @@ static inline uint64_t data_oid_to_idx(uint64_t oid)
     return oid & (MAX_DATA_OBJS - 1);
 }
 
+static inline uint32_t oid_to_vid(uint64_t oid)
+{
+    return (oid & ~VDI_BIT) >> VDI_SPACE_SHIFT;
+}
+
 static inline uint64_t vid_to_vdi_oid(uint32_t vid)
 {
     return VDI_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT);
@@ -238,14 +252,14 @@ static inline bool is_snapshot(struct SheepdogInode *inode)
     return !!inode->snap_ctime;
 }
 
-#undef dprintf
+#undef DPRINTF
 #ifdef DEBUG_SDOG
-#define dprintf(fmt, args...)                                       \
+#define DPRINTF(fmt, args...)                                       \
     do {                                                            \
         fprintf(stdout, "%s %d: " fmt, __func__, __LINE__, ##args); \
     } while (0)
 #else
-#define dprintf(fmt, args...)
+#define DPRINTF(fmt, args...)
 #endif
 
 typedef struct SheepdogAIOCB SheepdogAIOCB;
@@ -268,6 +282,7 @@ enum AIOCBState {
     AIOCB_WRITE_UDATA,
     AIOCB_READ_UDATA,
     AIOCB_FLUSH_CACHE,
+    AIOCB_DISCARD_OBJ,
 };
 
 struct SheepdogAIOCB {
@@ -284,11 +299,14 @@ struct SheepdogAIOCB {
     Coroutine *coroutine;
     void (*aio_done_func)(SheepdogAIOCB *);
 
-    bool canceled;
+    bool cancelable;
+    bool *finished;
     int nr_pending;
 };
 
 typedef struct BDRVSheepdogState {
+    BlockDriverState *bs;
+
     SheepdogInode inode;
 
     uint32_t min_dirty_data_idx;
@@ -297,9 +315,10 @@ typedef struct BDRVSheepdogState {
     char name[SD_MAX_VDI_LEN];
     bool is_snapshot;
     uint32_t cache_flags;
+    bool discard_supported;
 
-    char *addr;
-    char *port;
+    char *host_spec;
+    bool is_unix;
     int fd;
 
     CoMutex lock;
@@ -307,8 +326,11 @@ typedef struct BDRVSheepdogState {
     Coroutine *co_recv;
 
     uint32_t aioreq_seq_num;
+
+    /* Every aio request must be linked to either of these queues. */
     QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
     QLIST_HEAD(pending_aio_head, AIOReq) pending_aio_head;
+    QLIST_HEAD(failed_aio_head, AIOReq) failed_aio_head;
 } BDRVSheepdogState;
 
 static const char * sd_strerror(int err)
@@ -344,6 +366,8 @@ static const char * sd_strerror(int err)
         {SD_RES_WAIT_FOR_FORMAT, "Sheepdog is waiting for a format operation"},
         {SD_RES_WAIT_FOR_JOIN, "Sheepdog is waiting for other nodes joining"},
         {SD_RES_JOIN_FAILED, "Target node had failed to join sheepdog"},
+        {SD_RES_HALT, "Sheepdog is stopped serving IO request"},
+        {SD_RES_READONLY, "Object is read-only"},
     };
 
     for (i = 0; i < ARRAY_SIZE(errors); ++i) {
@@ -395,6 +419,7 @@ static inline void free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
 {
     SheepdogAIOCB *acb = aio_req->aiocb;
 
+    acb->cancelable = false;
     QLIST_REMOVE(aio_req, aio_siblings);
     g_free(aio_req);
 
@@ -403,23 +428,68 @@ static inline void free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
 
 static void coroutine_fn sd_finish_aiocb(SheepdogAIOCB *acb)
 {
-    if (!acb->canceled) {
-        qemu_coroutine_enter(acb->coroutine, NULL);
+    qemu_coroutine_enter(acb->coroutine, NULL);
+    if (acb->finished) {
+        *acb->finished = true;
     }
     qemu_aio_release(acb);
 }
 
+/*
+ * Check whether the specified acb can be canceled
+ *
+ * We can cancel aio when any request belonging to the acb is:
+ *  - Not processed by the sheepdog server.
+ *  - Not linked to the inflight queue.
+ */
+static bool sd_acb_cancelable(const SheepdogAIOCB *acb)
+{
+    BDRVSheepdogState *s = acb->common.bs->opaque;
+    AIOReq *aioreq;
+
+    if (!acb->cancelable) {
+        return false;
+    }
+
+    QLIST_FOREACH(aioreq, &s->inflight_aio_head, aio_siblings) {
+        if (aioreq->aiocb == acb) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
 static void sd_aio_cancel(BlockDriverAIOCB *blockacb)
 {
     SheepdogAIOCB *acb = (SheepdogAIOCB *)blockacb;
+    BDRVSheepdogState *s = acb->common.bs->opaque;
+    AIOReq *aioreq, *next;
+    bool finished = false;
+
+    acb->finished = &finished;
+    while (!finished) {
+        if (sd_acb_cancelable(acb)) {
+            /* Remove outstanding requests from pending and failed queues.  */
+            QLIST_FOREACH_SAFE(aioreq, &s->pending_aio_head, aio_siblings,
+                               next) {
+                if (aioreq->aiocb == acb) {
+                    free_aio_req(s, aioreq);
+                }
+            }
+            QLIST_FOREACH_SAFE(aioreq, &s->failed_aio_head, aio_siblings,
+                               next) {
+                if (aioreq->aiocb == acb) {
+                    free_aio_req(s, aioreq);
+                }
+            }
 
-    /*
-     * Sheepdog cannot cancel the requests which are already sent to
-     * the servers, so we just complete the request with -EIO here.
-     */
-    acb->ret = -EIO;
-    qemu_coroutine_enter(acb->coroutine, NULL);
-    acb->canceled = true;
+            assert(acb->nr_pending == 0);
+            sd_finish_aiocb(acb);
+            return;
+        }
+        qemu_aio_wait();
+    }
 }
 
 static const AIOCBInfo sd_aiocb_info = {
@@ -440,63 +510,39 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
     acb->nb_sectors = nb_sectors;
 
     acb->aio_done_func = NULL;
-    acb->canceled = false;
+    acb->cancelable = true;
+    acb->finished = NULL;
     acb->coroutine = qemu_coroutine_self();
     acb->ret = 0;
     acb->nr_pending = 0;
     return acb;
 }
 
-static int connect_to_sdog(const char *addr, const char *port)
+static int connect_to_sdog(BDRVSheepdogState *s)
 {
-    char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
-    int fd, ret;
-    struct addrinfo hints, *res, *res0;
-
-    if (!addr) {
-        addr = SD_DEFAULT_ADDR;
-        port = SD_DEFAULT_PORT;
-    }
-
-    memset(&hints, 0, sizeof(hints));
-    hints.ai_socktype = SOCK_STREAM;
-
-    ret = getaddrinfo(addr, port, &hints, &res0);
-    if (ret) {
-        error_report("unable to get address info %s, %s",
-                     addr, strerror(errno));
-        return -errno;
-    }
-
-    for (res = res0; res; res = res->ai_next) {
-        ret = getnameinfo(res->ai_addr, res->ai_addrlen, hbuf, sizeof(hbuf),
-                          sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV);
-        if (ret) {
-            continue;
-        }
+    int fd;
+    Error *err = NULL;
 
-        fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
-        if (fd < 0) {
-            continue;
-        }
+    if (s->is_unix) {
+        fd = unix_connect(s->host_spec, &err);
+    } else {
+        fd = inet_connect(s->host_spec, &err);
 
-    reconnect:
-        ret = connect(fd, res->ai_addr, res->ai_addrlen);
-        if (ret < 0) {
-            if (errno == EINTR) {
-                goto reconnect;
+        if (err == NULL) {
+            int ret = socket_set_nodelay(fd);
+            if (ret < 0) {
+                error_report("%s", strerror(errno));
             }
-            close(fd);
-            break;
         }
+    }
 
-        dprintf("connected to %s:%s\n", addr, port);
-        goto success;
+    if (err != NULL) {
+        qerror_report_err(err);
+        error_free(err);
+    } else {
+        qemu_set_nonblock(fd);
     }
-    fd = -errno;
-    error_report("failed connect to %s:%s", addr, port);
-success:
-    freeaddrinfo(res0);
+
     return fd;
 }
 
@@ -506,13 +552,13 @@ static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
     int ret;
 
     ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
-    if (ret < sizeof(*hdr)) {
+    if (ret != sizeof(*hdr)) {
         error_report("failed to send a req, %s", strerror(errno));
         return ret;
     }
 
     ret = qemu_co_send(sockfd, data, *wlen);
-    if (ret < *wlen) {
+    if (ret != *wlen) {
         error_report("failed to send a req, %s", strerror(errno));
     }
 
@@ -548,18 +594,17 @@ static coroutine_fn void do_co_req(void *opaque)
     unsigned int *rlen = srco->rlen;
 
     co = qemu_coroutine_self();
-    qemu_aio_set_fd_handler(sockfd, NULL, restart_co_req, NULL, co);
+    qemu_aio_set_fd_handler(sockfd, NULL, restart_co_req, co);
 
-    socket_set_block(sockfd);
     ret = send_co_req(sockfd, hdr, data, wlen);
     if (ret < 0) {
         goto out;
     }
 
-    qemu_aio_set_fd_handler(sockfd, restart_co_req, NULL, NULL, co);
+    qemu_aio_set_fd_handler(sockfd, restart_co_req, NULL, co);
 
     ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
-    if (ret < sizeof(*hdr)) {
+    if (ret != sizeof(*hdr)) {
         error_report("failed to get a rsp, %s", strerror(errno));
         ret = -errno;
         goto out;
@@ -571,7 +616,7 @@ static coroutine_fn void do_co_req(void *opaque)
 
     if (*rlen) {
         ret = qemu_co_recv(sockfd, data, *rlen);
-        if (ret < *rlen) {
+        if (ret != *rlen) {
             error_report("failed to get the data, %s", strerror(errno));
             ret = -errno;
             goto out;
@@ -579,8 +624,9 @@ static coroutine_fn void do_co_req(void *opaque)
     }
     ret = 0;
 out:
-    qemu_aio_set_fd_handler(sockfd, NULL, NULL, NULL, NULL);
-    socket_set_nonblock(sockfd);
+    /* there is at most one request for this sockfd, so it is safe to
+     * set each handler to NULL. */
+    qemu_aio_set_fd_handler(sockfd, NULL, NULL, NULL);
 
     srco->ret = ret;
     srco->finished = true;
@@ -613,10 +659,13 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data,
     return srco.ret;
 }
 
-static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
+static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
                            struct iovec *iov, int niov, bool create,
                            enum AIOCBState aiocb_type);
-
+static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req);
+static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag);
+static int get_sheep_fd(BDRVSheepdogState *s);
+static void co_write_request(void *opaque);
 
 static AIOReq *find_pending_req(BDRVSheepdogState *s, uint64_t oid)
 {
@@ -639,22 +688,59 @@ static void coroutine_fn send_pending_req(BDRVSheepdogState *s, uint64_t oid)
 {
     AIOReq *aio_req;
     SheepdogAIOCB *acb;
-    int ret;
 
     while ((aio_req = find_pending_req(s, oid)) != NULL) {
         acb = aio_req->aiocb;
         /* move aio_req from pending list to inflight one */
         QLIST_REMOVE(aio_req, aio_siblings);
         QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
-        ret = add_aio_request(s, aio_req, acb->qiov->iov,
-                              acb->qiov->niov, false, acb->aiocb_type);
-        if (ret < 0) {
-            error_report("add_aio_request is failed");
-            free_aio_req(s, aio_req);
-            if (!acb->nr_pending) {
-                sd_finish_aiocb(acb);
-            }
+        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov, false,
+                        acb->aiocb_type);
+    }
+}
+
+static coroutine_fn void reconnect_to_sdog(void *opaque)
+{
+    BDRVSheepdogState *s = opaque;
+    AIOReq *aio_req, *next;
+
+    qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL);
+    close(s->fd);
+    s->fd = -1;
+
+    /* Wait for outstanding write requests to be completed. */
+    while (s->co_send != NULL) {
+        co_write_request(opaque);
+    }
+
+    /* Try to reconnect the sheepdog server every one second. */
+    while (s->fd < 0) {
+        s->fd = get_sheep_fd(s);
+        if (s->fd < 0) {
+            DPRINTF("Wait for connection to be established\n");
+            co_aio_sleep_ns(bdrv_get_aio_context(s->bs), QEMU_CLOCK_REALTIME,
+                            1000000000ULL);
         }
+    };
+
+    /*
+     * Now we have to resend all the request in the inflight queue.  However,
+     * resend_aioreq() can yield and newly created requests can be added to the
+     * inflight queue before the coroutine is resumed.  To avoid mixing them, we
+     * have to move all the inflight requests to the failed queue before
+     * resend_aioreq() is called.
+     */
+    QLIST_FOREACH_SAFE(aio_req, &s->inflight_aio_head, aio_siblings, next) {
+        QLIST_REMOVE(aio_req, aio_siblings);
+        QLIST_INSERT_HEAD(&s->failed_aio_head, aio_req, aio_siblings);
+    }
+
+    /* Resend all the failed aio requests. */
+    while (!QLIST_EMPTY(&s->failed_aio_head)) {
+        aio_req = QLIST_FIRST(&s->failed_aio_head);
+        QLIST_REMOVE(aio_req, aio_siblings);
+        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
+        resend_aioreq(s, aio_req);
     }
 }
 
@@ -672,17 +758,13 @@ static void coroutine_fn aio_read_response(void *opaque)
     int ret;
     AIOReq *aio_req = NULL;
     SheepdogAIOCB *acb;
-    unsigned long idx;
-
-    if (QLIST_EMPTY(&s->inflight_aio_head)) {
-        goto out;
-    }
+    uint64_t idx;
 
     /* read a header */
     ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
-    if (ret < 0) {
+    if (ret != sizeof(rsp)) {
         error_report("failed to get the header, %s", strerror(errno));
-        goto out;
+        goto err;
     }
 
     /* find the right aio_req from the inflight aio list */
@@ -693,7 +775,7 @@ static void coroutine_fn aio_read_response(void *opaque)
     }
     if (!aio_req) {
         error_report("cannot find aio_req %x", rsp.id);
-        goto out;
+        goto err;
     }
 
     acb = aio_req->aiocb;
@@ -731,23 +813,57 @@ static void coroutine_fn aio_read_response(void *opaque)
     case AIOCB_READ_UDATA:
         ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov,
                             aio_req->iov_offset, rsp.data_length);
-        if (ret < 0) {
+        if (ret != rsp.data_length) {
             error_report("failed to get the data, %s", strerror(errno));
-            goto out;
+            goto err;
         }
         break;
     case AIOCB_FLUSH_CACHE:
         if (rsp.result == SD_RES_INVALID_PARMS) {
-            dprintf("disable cache since the server doesn't support it\n");
+            DPRINTF("disable cache since the server doesn't support it\n");
             s->cache_flags = SD_FLAG_CMD_DIRECT;
             rsp.result = SD_RES_SUCCESS;
         }
         break;
+    case AIOCB_DISCARD_OBJ:
+        switch (rsp.result) {
+        case SD_RES_INVALID_PARMS:
+            error_report("sheep(%s) doesn't support discard command",
+                         s->host_spec);
+            rsp.result = SD_RES_SUCCESS;
+            s->discard_supported = false;
+            break;
+        case SD_RES_SUCCESS:
+            idx = data_oid_to_idx(aio_req->oid);
+            s->inode.data_vdi_id[idx] = 0;
+            break;
+        default:
+            break;
+        }
     }
 
-    if (rsp.result != SD_RES_SUCCESS) {
+    switch (rsp.result) {
+    case SD_RES_SUCCESS:
+        break;
+    case SD_RES_READONLY:
+        if (s->inode.vdi_id == oid_to_vid(aio_req->oid)) {
+            ret = reload_inode(s, 0, "");
+            if (ret < 0) {
+                goto err;
+            }
+        }
+        if (is_data_obj(aio_req->oid)) {
+            aio_req->oid = vid_to_data_oid(s->inode.vdi_id,
+                                           data_oid_to_idx(aio_req->oid));
+        } else {
+            aio_req->oid = vid_to_vdi_oid(s->inode.vdi_id);
+        }
+        resend_aioreq(s, aio_req);
+        goto out;
+    default:
         acb->ret = -EIO;
         error_report("%s", sd_strerror(rsp.result));
+        break;
     }
 
     free_aio_req(s, aio_req);
@@ -760,6 +876,10 @@ static void coroutine_fn aio_read_response(void *opaque)
     }
 out:
     s->co_recv = NULL;
+    return;
+err:
+    s->co_recv = NULL;
+    reconnect_to_sdog(opaque);
 }
 
 static void co_read_response(void *opaque)
@@ -780,14 +900,6 @@ static void co_write_request(void *opaque)
     qemu_coroutine_enter(s->co_send, NULL);
 }
 
-static int aio_flush_request(void *opaque)
-{
-    BDRVSheepdogState *s = opaque;
-
-    return !QLIST_EMPTY(&s->inflight_aio_head) ||
-        !QLIST_EMPTY(&s->pending_aio_head);
-}
-
 /*
  * Return a socket discriptor to read/write objects.
  *
@@ -796,24 +908,14 @@ static int aio_flush_request(void *opaque)
  */
 static int get_sheep_fd(BDRVSheepdogState *s)
 {
-    int ret, fd;
+    int fd;
 
-    fd = connect_to_sdog(s->addr, s->port);
+    fd = connect_to_sdog(s);
     if (fd < 0) {
-        error_report("%s", strerror(errno));
         return fd;
     }
 
-    socket_set_nonblock(fd);
-
-    ret = socket_set_nodelay(fd);
-    if (ret) {
-        error_report("%s", strerror(errno));
-        closesocket(fd);
-        return -errno;
-    }
-
-    qemu_aio_set_fd_handler(fd, co_read_response, NULL, aio_flush_request, s);
+    qemu_aio_set_fd_handler(fd, co_read_response, NULL, s);
     return fd;
 }
 
@@ -829,18 +931,41 @@ static int sd_parse_uri(BDRVSheepdogState *s, const char *filename,
         return -EINVAL;
     }
 
+    /* transport */
+    if (!strcmp(uri->scheme, "sheepdog")) {
+        s->is_unix = false;
+    } else if (!strcmp(uri->scheme, "sheepdog+tcp")) {
+        s->is_unix = false;
+    } else if (!strcmp(uri->scheme, "sheepdog+unix")) {
+        s->is_unix = true;
+    } else {
+        ret = -EINVAL;
+        goto out;
+    }
+
     if (uri->path == NULL || !strcmp(uri->path, "/")) {
         ret = -EINVAL;
         goto out;
     }
     pstrcpy(vdi, SD_MAX_VDI_LEN, uri->path + 1);
 
-    /* sheepdog[+tcp]://[host:port]/vdiname */
-    s->addr = g_strdup(uri->server ?: SD_DEFAULT_ADDR);
-    if (uri->port) {
-        s->port = g_strdup_printf("%d", uri->port);
+    qp = query_params_parse(uri->query);
+    if (qp->n > 1 || (s->is_unix && !qp->n) || (!s->is_unix && qp->n)) {
+        ret = -EINVAL;
+        goto out;
+    }
+
+    if (s->is_unix) {
+        /* sheepdog+unix:///vdiname?socket=path */
+        if (uri->server || uri->port || strcmp(qp->p[0].name, "socket")) {
+            ret = -EINVAL;
+            goto out;
+        }
+        s->host_spec = g_strdup(qp->p[0].value);
     } else {
-        s->port = g_strdup(SD_DEFAULT_PORT);
+        /* sheepdog[+tcp]://[host:port]/vdiname */
+        s->host_spec = g_strdup_printf("%s:%d", uri->server ?: SD_DEFAULT_ADDR,
+                                       uri->port ?: SD_DEFAULT_PORT);
     }
 
     /* snapshot tag */
@@ -926,8 +1051,9 @@ static int parse_vdiname(BDRVSheepdogState *s, const char *filename,
     return ret;
 }
 
-static int find_vdi_name(BDRVSheepdogState *s, char *filename, uint32_t snapid,
-                         char *tag, uint32_t *vid, int for_snapshot)
+static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
+                         uint32_t snapid, const char *tag, uint32_t *vid,
+                         bool lock)
 {
     int ret, fd;
     SheepdogVdiReq hdr;
@@ -935,7 +1061,7 @@ static int find_vdi_name(BDRVSheepdogState *s, char *filename, uint32_t snapid,
     unsigned int wlen, rlen = 0;
     char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
 
-    fd = connect_to_sdog(s->addr, s->port);
+    fd = connect_to_sdog(s);
     if (fd < 0) {
         return fd;
     }
@@ -948,10 +1074,10 @@ static int find_vdi_name(BDRVSheepdogState *s, char *filename, uint32_t snapid,
     strncpy(buf + SD_MAX_VDI_LEN, tag, SD_MAX_VDI_TAG_LEN);
 
     memset(&hdr, 0, sizeof(hdr));
-    if (for_snapshot) {
-        hdr.opcode = SD_OP_GET_VDI_INFO;
-    } else {
+    if (lock) {
         hdr.opcode = SD_OP_LOCK_VDI;
+    } else {
+        hdr.opcode = SD_OP_GET_VDI_INFO;
     }
     wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN;
     hdr.proto_ver = SD_PROTO_VER;
@@ -982,7 +1108,7 @@ out:
     return ret;
 }
 
-static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
+static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
                            struct iovec *iov, int niov, bool create,
                            enum AIOCBState aiocb_type)
 {
@@ -1019,6 +1145,9 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
         wlen = datalen;
         hdr.flags = SD_FLAG_CMD_WRITE | flags;
         break;
+    case AIOCB_DISCARD_OBJ:
+        hdr.opcode = SD_OP_DISCARD_OBJ;
+        break;
     }
 
     if (s->cache_flags) {
@@ -1036,36 +1165,30 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
 
     qemu_co_mutex_lock(&s->lock);
     s->co_send = qemu_coroutine_self();
-    qemu_aio_set_fd_handler(s->fd, co_read_response, co_write_request,
-                            aio_flush_request, s);
+    qemu_aio_set_fd_handler(s->fd, co_read_response, co_write_request, s);
     socket_set_cork(s->fd, 1);
 
     /* send a header */
     ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
-    if (ret < 0) {
-        qemu_co_mutex_unlock(&s->lock);
+    if (ret != sizeof(hdr)) {
         error_report("failed to send a req, %s", strerror(errno));
-        return -errno;
+        goto out;
     }
 
     if (wlen) {
         ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
-        if (ret < 0) {
-            qemu_co_mutex_unlock(&s->lock);
+        if (ret != wlen) {
             error_report("failed to send a data, %s", strerror(errno));
-            return -errno;
         }
     }
-
+out:
     socket_set_cork(s->fd, 0);
-    qemu_aio_set_fd_handler(s->fd, co_read_response, NULL,
-                            aio_flush_request, s);
+    qemu_aio_set_fd_handler(s->fd, co_read_response, NULL, s);
+    s->co_send = NULL;
     qemu_co_mutex_unlock(&s->lock);
-
-    return 0;
 }
 
-static int read_write_object(int fd, char *buf, uint64_t oid, int copies,
+static int read_write_object(int fd, char *buf, uint64_t oid, uint8_t copies,
                              unsigned int datalen, uint64_t offset,
                              bool write, bool create, uint32_t cache_flags)
 {
@@ -1113,7 +1236,7 @@ static int read_write_object(int fd, char *buf, uint64_t oid, int copies,
     }
 }
 
-static int read_object(int fd, char *buf, uint64_t oid, int copies,
+static int read_object(int fd, char *buf, uint64_t oid, uint8_t copies,
                        unsigned int datalen, uint64_t offset,
                        uint32_t cache_flags)
 {
@@ -1121,7 +1244,7 @@ static int read_object(int fd, char *buf, uint64_t oid, int copies,
                              false, cache_flags);
 }
 
-static int write_object(int fd, char *buf, uint64_t oid, int copies,
+static int write_object(int fd, char *buf, uint64_t oid, uint8_t copies,
                         unsigned int datalen, uint64_t offset, bool create,
                         uint32_t cache_flags)
 {
@@ -1129,7 +1252,116 @@ static int write_object(int fd, char *buf, uint64_t oid, int copies,
                              create, cache_flags);
 }
 
-static int sd_open(BlockDriverState *bs, const char *filename, int flags)
+/* update inode with the latest state */
+static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag)
+{
+    SheepdogInode *inode;
+    int ret = 0, fd;
+    uint32_t vid = 0;
+
+    fd = connect_to_sdog(s);
+    if (fd < 0) {
+        return -EIO;
+    }
+
+    inode = g_malloc(sizeof(s->inode));
+
+    ret = find_vdi_name(s, s->name, snapid, tag, &vid, false);
+    if (ret) {
+        goto out;
+    }
+
+    ret = read_object(fd, (char *)inode, vid_to_vdi_oid(vid),
+                      s->inode.nr_copies, sizeof(*inode), 0, s->cache_flags);
+    if (ret < 0) {
+        goto out;
+    }
+
+    if (inode->vdi_id != s->inode.vdi_id) {
+        memcpy(&s->inode, inode, sizeof(s->inode));
+    }
+
+out:
+    g_free(inode);
+    closesocket(fd);
+
+    return ret;
+}
+
+/* Return true if the specified request is linked to the pending list. */
+static bool check_simultaneous_create(BDRVSheepdogState *s, AIOReq *aio_req)
+{
+    AIOReq *areq;
+    QLIST_FOREACH(areq, &s->inflight_aio_head, aio_siblings) {
+        if (areq != aio_req && areq->oid == aio_req->oid) {
+            /*
+             * Sheepdog cannot handle simultaneous create requests to the same
+             * object, so we cannot send the request until the previous request
+             * finishes.
+             */
+            DPRINTF("simultaneous create to %" PRIx64 "\n", aio_req->oid);
+            aio_req->flags = 0;
+            aio_req->base_oid = 0;
+            QLIST_REMOVE(aio_req, aio_siblings);
+            QLIST_INSERT_HEAD(&s->pending_aio_head, aio_req, aio_siblings);
+            return true;
+        }
+    }
+
+    return false;
+}
+
+static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
+{
+    SheepdogAIOCB *acb = aio_req->aiocb;
+    bool create = false;
+
+    /* check whether this request becomes a CoW one */
+    if (acb->aiocb_type == AIOCB_WRITE_UDATA && is_data_obj(aio_req->oid)) {
+        int idx = data_oid_to_idx(aio_req->oid);
+
+        if (is_data_obj_writable(&s->inode, idx)) {
+            goto out;
+        }
+
+        if (check_simultaneous_create(s, aio_req)) {
+            return;
+        }
+
+        if (s->inode.data_vdi_id[idx]) {
+            aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx], idx);
+            aio_req->flags |= SD_FLAG_CMD_COW;
+        }
+        create = true;
+    }
+out:
+    if (is_data_obj(aio_req->oid)) {
+        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov, create,
+                        acb->aiocb_type);
+    } else {
+        struct iovec iov;
+        iov.iov_base = &s->inode;
+        iov.iov_len = sizeof(s->inode);
+        add_aio_request(s, aio_req, &iov, 1, false, AIOCB_WRITE_UDATA);
+    }
+}
+
+/* TODO Convert to fine grained options */
+static QemuOptsList runtime_opts = {
+    .name = "sheepdog",
+    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
+    .desc = {
+        {
+            .name = "filename",
+            .type = QEMU_OPT_STRING,
+            .help = "URL to the sheepdog image",
+        },
+        { /* end of list */ }
+    },
+};
+
+static int sd_open(BlockDriverState *bs, QDict *options, int flags,
+                   Error **errp)
 {
     int ret, fd;
     uint32_t vid = 0;
@@ -1137,9 +1369,26 @@ static int sd_open(BlockDriverState *bs, const char *filename, int flags)
     char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
     uint32_t snapid;
     char *buf = NULL;
+    QemuOpts *opts;
+    Error *local_err = NULL;
+    const char *filename;
+
+    s->bs = bs;
+
+    opts = qemu_opts_create_nofail(&runtime_opts);
+    qemu_opts_absorb_qdict(opts, options, &local_err);
+    if (error_is_set(&local_err)) {
+        qerror_report_err(local_err);
+        error_free(local_err);
+        ret = -EINVAL;
+        goto out;
+    }
+
+    filename = qemu_opt_get(opts, "filename");
 
     QLIST_INIT(&s->inflight_aio_head);
     QLIST_INIT(&s->pending_aio_head);
+    QLIST_INIT(&s->failed_aio_head);
     s->fd = -1;
 
     memset(vdi, 0, sizeof(vdi));
@@ -1159,7 +1408,7 @@ static int sd_open(BlockDriverState *bs, const char *filename, int flags)
         goto out;
     }
 
-    ret = find_vdi_name(s, vdi, snapid, tag, &vid, 0);
+    ret = find_vdi_name(s, vdi, snapid, tag, &vid, true);
     if (ret) {
         goto out;
     }
@@ -1172,15 +1421,15 @@ static int sd_open(BlockDriverState *bs, const char *filename, int flags)
     if (flags & BDRV_O_NOCACHE) {
         s->cache_flags = SD_FLAG_CMD_DIRECT;
     }
+    s->discard_supported = true;
 
     if (snapid || tag[0] != '\0') {
-        dprintf("%" PRIx32 " snapshot inode was open.\n", vid);
+        DPRINTF("%" PRIx32 " snapshot inode was open.\n", vid);
         s->is_snapshot = true;
     }
 
-    fd = connect_to_sdog(s->addr, s->port);
+    fd = connect_to_sdog(s);
     if (fd < 0) {
-        error_report("failed to connect");
         ret = fd;
         goto out;
     }
@@ -1199,23 +1448,25 @@ static int sd_open(BlockDriverState *bs, const char *filename, int flags)
     s->min_dirty_data_idx = UINT32_MAX;
     s->max_dirty_data_idx = 0;
 
-    bs->total_sectors = s->inode.vdi_size / SECTOR_SIZE;
+    bs->total_sectors = s->inode.vdi_size / BDRV_SECTOR_SIZE;
     pstrcpy(s->name, sizeof(s->name), vdi);
     qemu_co_mutex_init(&s->lock);
+    qemu_opts_del(opts);
     g_free(buf);
     return 0;
 out:
-    qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL, NULL);
+    qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL);
     if (s->fd >= 0) {
         closesocket(s->fd);
     }
+    qemu_opts_del(opts);
     g_free(buf);
     return ret;
 }
 
-static int do_sd_create(char *filename, int64_t vdi_size,
+static int do_sd_create(BDRVSheepdogState *s, char *filename, int64_t vdi_size,
                         uint32_t base_vid, uint32_t *vdi_id, int snapshot,
-                        const char *addr, const char *port)
+                        uint8_t copy_policy)
 {
     SheepdogVdiReq hdr;
     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
@@ -1223,7 +1474,7 @@ static int do_sd_create(char *filename, int64_t vdi_size,
     unsigned int wlen, rlen = 0;
     char buf[SD_MAX_VDI_LEN];
 
-    fd = connect_to_sdog(addr, port);
+    fd = connect_to_sdog(s);
     if (fd < 0) {
         return fd;
     }
@@ -1245,6 +1496,7 @@ static int do_sd_create(char *filename, int64_t vdi_size,
 
     hdr.data_length = wlen;
     hdr.vdi_size = vdi_size;
+    hdr.copy_policy = copy_policy;
 
     ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
 
@@ -1272,10 +1524,13 @@ static int sd_prealloc(const char *filename)
     uint32_t idx, max_idx;
     int64_t vdi_size;
     void *buf = g_malloc0(SD_DATA_OBJ_SIZE);
+    Error *local_err = NULL;
     int ret;
 
-    ret = bdrv_file_open(&bs, filename, BDRV_O_RDWR);
+    ret = bdrv_file_open(&bs, filename, NULL, BDRV_O_RDWR, &local_err);
     if (ret < 0) {
+        qerror_report_err(local_err);
+        error_free(local_err);
         goto out;
     }
 
@@ -1302,14 +1557,15 @@ static int sd_prealloc(const char *filename)
     }
 out:
     if (bs) {
-        bdrv_delete(bs);
+        bdrv_unref(bs);
     }
     g_free(buf);
 
     return ret;
 }
 
-static int sd_create(const char *filename, QEMUOptionParameter *options)
+static int sd_create(const char *filename, QEMUOptionParameter *options,
+                     Error **errp)
 {
     int ret = 0;
     uint32_t vid = 0, base_vid = 0;
@@ -1319,6 +1575,7 @@ static int sd_create(const char *filename, QEMUOptionParameter *options)
     char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
     uint32_t snapid;
     bool prealloc = false;
+    Error *local_err = NULL;
 
     s = g_malloc0(sizeof(BDRVSheepdogState));
 
@@ -1365,15 +1622,17 @@ static int sd_create(const char *filename, QEMUOptionParameter *options)
         BlockDriver *drv;
 
         /* Currently, only Sheepdog backing image is supported. */
-        drv = bdrv_find_protocol(backing_file);
+        drv = bdrv_find_protocol(backing_file, true);
         if (!drv || strcmp(drv->protocol_name, "sheepdog") != 0) {
             error_report("backing_file must be a sheepdog image");
             ret = -EINVAL;
             goto out;
         }
 
-        ret = bdrv_file_open(&bs, backing_file, 0);
+        ret = bdrv_file_open(&bs, backing_file, NULL, 0, &local_err);
         if (ret < 0) {
+            qerror_report_err(local_err);
+            error_free(local_err);
             goto out;
         }
 
@@ -1381,16 +1640,17 @@ static int sd_create(const char *filename, QEMUOptionParameter *options)
 
         if (!is_snapshot(&s->inode)) {
             error_report("cannot clone from a non snapshot vdi");
-            bdrv_delete(bs);
+            bdrv_unref(bs);
             ret = -EINVAL;
             goto out;
         }
 
         base_vid = s->inode.vdi_id;
-        bdrv_delete(bs);
+        bdrv_unref(bs);
     }
 
-    ret = do_sd_create(vdi, vdi_size, base_vid, &vid, 0, s->addr, s->port);
+    /* TODO: allow users to specify copy number */
+    ret = do_sd_create(s, vdi, vdi_size, base_vid, &vid, 0, 0);
     if (!prealloc || ret) {
         goto out;
     }
@@ -1409,9 +1669,9 @@ static void sd_close(BlockDriverState *bs)
     unsigned int wlen, rlen = 0;
     int fd, ret;
 
-    dprintf("%s\n", s->name);
+    DPRINTF("%s\n", s->name);
 
-    fd = connect_to_sdog(s->addr, s->port);
+    fd = connect_to_sdog(s);
     if (fd < 0) {
         return;
     }
@@ -1433,10 +1693,9 @@ static void sd_close(BlockDriverState *bs)
         error_report("%s, %s", sd_strerror(rsp->result), s->name);
     }
 
-    qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL, NULL);
+    qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL);
     closesocket(s->fd);
-    g_free(s->addr);
-    g_free(s->port);
+    g_free(s->host_spec);
 }
 
 static int64_t sd_getlength(BlockDriverState *bs)
@@ -1460,7 +1719,7 @@ static int sd_truncate(BlockDriverState *bs, int64_t offset)
         return -EINVAL;
     }
 
-    fd = connect_to_sdog(s->addr, s->port);
+    fd = connect_to_sdog(s);
     if (fd < 0) {
         return fd;
     }
@@ -1486,7 +1745,6 @@ static int sd_truncate(BlockDriverState *bs, int64_t offset)
  */
 static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
 {
-    int ret;
     BDRVSheepdogState *s = acb->common.bs->opaque;
     struct iovec iov;
     AIOReq *aio_req;
@@ -1508,21 +1766,53 @@ static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
         aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
                                 data_len, offset, 0, 0, offset);
         QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
-        ret = add_aio_request(s, aio_req, &iov, 1, false, AIOCB_WRITE_UDATA);
-        if (ret) {
-            free_aio_req(s, aio_req);
-            acb->ret = -EIO;
-            goto out;
-        }
+        add_aio_request(s, aio_req, &iov, 1, false, AIOCB_WRITE_UDATA);
 
         acb->aio_done_func = sd_finish_aiocb;
         acb->aiocb_type = AIOCB_WRITE_UDATA;
         return;
     }
-out:
+
     sd_finish_aiocb(acb);
 }
 
+/* Delete current working VDI on the snapshot chain */
+static bool sd_delete(BDRVSheepdogState *s)
+{
+    unsigned int wlen = SD_MAX_VDI_LEN, rlen = 0;
+    SheepdogVdiReq hdr = {
+        .opcode = SD_OP_DEL_VDI,
+        .vdi_id = s->inode.vdi_id,
+        .data_length = wlen,
+        .flags = SD_FLAG_CMD_WRITE,
+    };
+    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
+    int fd, ret;
+
+    fd = connect_to_sdog(s);
+    if (fd < 0) {
+        return false;
+    }
+
+    ret = do_req(fd, (SheepdogReq *)&hdr, s->name, &wlen, &rlen);
+    closesocket(fd);
+    if (ret) {
+        return false;
+    }
+    switch (rsp->result) {
+    case SD_RES_NO_VDI:
+        error_report("%s was already deleted", s->name);
+        /* fall through */
+    case SD_RES_SUCCESS:
+        break;
+    default:
+        error_report("%s, %s", sd_strerror(rsp->result), s->name);
+        return false;
+    }
+
+    return true;
+}
+
 /*
  * Create a writable VDI from a snapshot
  */
@@ -1531,22 +1821,28 @@ static int sd_create_branch(BDRVSheepdogState *s)
     int ret, fd;
     uint32_t vid;
     char *buf;
+    bool deleted;
 
-    dprintf("%" PRIx32 " is snapshot.\n", s->inode.vdi_id);
+    DPRINTF("%" PRIx32 " is snapshot.\n", s->inode.vdi_id);
 
     buf = g_malloc(SD_INODE_SIZE);
 
-    ret = do_sd_create(s->name, s->inode.vdi_size, s->inode.vdi_id, &vid, 1,
-                       s->addr, s->port);
+    /*
+     * Even If deletion fails, we will just create extra snapshot based on
+     * the workding VDI which was supposed to be deleted. So no need to
+     * false bail out.
+     */
+    deleted = sd_delete(s);
+    ret = do_sd_create(s, s->name, s->inode.vdi_size, s->inode.vdi_id, &vid,
+                       !deleted, s->inode.copy_policy);
     if (ret) {
         goto out;
     }
 
-    dprintf("%" PRIx32 " is created.\n", vid);
+    DPRINTF("%" PRIx32 " is created.\n", vid);
 
-    fd = connect_to_sdog(s->addr, s->port);
+    fd = connect_to_sdog(s);
     if (fd < 0) {
-        error_report("failed to connect");
         ret = fd;
         goto out;
     }
@@ -1564,7 +1860,7 @@ static int sd_create_branch(BDRVSheepdogState *s)
 
     s->is_snapshot = false;
     ret = 0;
-    dprintf("%" PRIx32 " was newly created.\n", s->inode.vdi_id);
+    DPRINTF("%" PRIx32 " was newly created.\n", s->inode.vdi_id);
 
 out:
     g_free(buf);
@@ -1588,10 +1884,10 @@ static int coroutine_fn sd_co_rw_vector(void *p)
 {
     SheepdogAIOCB *acb = p;
     int ret = 0;
-    unsigned long len, done = 0, total = acb->nb_sectors * SECTOR_SIZE;
-    unsigned long idx = acb->sector_num * SECTOR_SIZE / SD_DATA_OBJ_SIZE;
+    unsigned long len, done = 0, total = acb->nb_sectors * BDRV_SECTOR_SIZE;
+    unsigned long idx = acb->sector_num * BDRV_SECTOR_SIZE / SD_DATA_OBJ_SIZE;
     uint64_t oid;
-    uint64_t offset = (acb->sector_num * SECTOR_SIZE) % SD_DATA_OBJ_SIZE;
+    uint64_t offset = (acb->sector_num * BDRV_SECTOR_SIZE) % SD_DATA_OBJ_SIZE;
     BDRVSheepdogState *s = acb->common.bs->opaque;
     SheepdogInode *inode = &s->inode;
     AIOReq *aio_req;
@@ -1640,48 +1936,38 @@ static int coroutine_fn sd_co_rw_vector(void *p)
                 flags = SD_FLAG_CMD_COW;
             }
             break;
+        case AIOCB_DISCARD_OBJ:
+            /*
+             * We discard the object only when the whole object is
+             * 1) allocated 2) trimmed. Otherwise, simply skip it.
+             */
+            if (len != SD_DATA_OBJ_SIZE || inode->data_vdi_id[idx] == 0) {
+                goto done;
+            }
+            break;
         default:
             break;
         }
 
         if (create) {
-            dprintf("update ino (%" PRIu32 ") %" PRIu64 " %" PRIu64 " %ld\n",
+            DPRINTF("update ino (%" PRIu32 ") %" PRIu64 " %" PRIu64 " %ld\n",
                     inode->vdi_id, oid,
                     vid_to_data_oid(inode->data_vdi_id[idx], idx), idx);
             oid = vid_to_data_oid(inode->vdi_id, idx);
-            dprintf("new oid %" PRIx64 "\n", oid);
+            DPRINTF("new oid %" PRIx64 "\n", oid);
         }
 
         aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, old_oid, done);
+        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
 
         if (create) {
-            AIOReq *areq;
-            QLIST_FOREACH(areq, &s->inflight_aio_head, aio_siblings) {
-                if (areq->oid == oid) {
-                    /*
-                     * Sheepdog cannot handle simultaneous create
-                     * requests to the same object.  So we cannot send
-                     * the request until the previous request
-                     * finishes.
-                     */
-                    aio_req->flags = 0;
-                    aio_req->base_oid = 0;
-                    QLIST_INSERT_HEAD(&s->pending_aio_head, aio_req,
-                                      aio_siblings);
-                    goto done;
-                }
+            if (check_simultaneous_create(s, aio_req)) {
+                goto done;
             }
         }
 
-        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
-        ret = add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
-                              create, acb->aiocb_type);
-        if (ret < 0) {
-            error_report("add_aio_request is failed");
-            free_aio_req(s, aio_req);
-            acb->ret = -EIO;
-            goto out;
-        }
+        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov, create,
+                        acb->aiocb_type);
     done:
         offset = 0;
         idx++;
@@ -1701,7 +1987,7 @@ static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
     int ret;
 
     if (bs->growable && sector_num + nb_sectors > bs->total_sectors) {
-        ret = sd_truncate(bs, (sector_num + nb_sectors) * SECTOR_SIZE);
+        ret = sd_truncate(bs, (sector_num + nb_sectors) * BDRV_SECTOR_SIZE);
         if (ret < 0) {
             return ret;
         }
@@ -1749,7 +2035,6 @@ static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
     BDRVSheepdogState *s = bs->opaque;
     SheepdogAIOCB *acb;
     AIOReq *aio_req;
-    int ret;
 
     if (s->cache_flags != SD_FLAG_CMD_CACHE) {
         return 0;
@@ -1762,13 +2047,7 @@ static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
     aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
                             0, 0, 0, 0, 0);
     QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
-    ret = add_aio_request(s, aio_req, NULL, 0, false, acb->aiocb_type);
-    if (ret < 0) {
-        error_report("add_aio_request is failed");
-        free_aio_req(s, aio_req);
-        qemu_aio_release(acb);
-        return ret;
-    }
+    add_aio_request(s, aio_req, NULL, 0, false, acb->aiocb_type);
 
     qemu_coroutine_yield();
     return acb->ret;
@@ -1782,7 +2061,7 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
     SheepdogInode *inode;
     unsigned int datalen;
 
-    dprintf("sn_info: name %s id_str %s s: name %s vm_state_size %" PRId64 " "
+    DPRINTF("sn_info: name %s id_str %s s: name %s vm_state_size %" PRId64 " "
             "is_snapshot %d\n", sn_info->name, sn_info->id_str,
             s->name, sn_info->vm_state_size, s->is_snapshot);
 
@@ -1793,7 +2072,7 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
         return -EINVAL;
     }
 
-    dprintf("%s %s\n", sn_info->name, sn_info->id_str);
+    DPRINTF("%s %s\n", sn_info->name, sn_info->id_str);
 
     s->inode.vm_state_size = sn_info->vm_state_size;
     s->inode.vm_clock_nsec = sn_info->vm_clock_nsec;
@@ -1805,7 +2084,7 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
     datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
 
     /* refresh inode. */
-    fd = connect_to_sdog(s->addr, s->port);
+    fd = connect_to_sdog(s);
     if (fd < 0) {
         ret = fd;
         goto cleanup;
@@ -1818,8 +2097,8 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
         goto cleanup;
     }
 
-    ret = do_sd_create(s->name, s->inode.vdi_size, s->inode.vdi_id, &new_vid, 1,
-                       s->addr, s->port);
+    ret = do_sd_create(s, s->name, s->inode.vdi_size, s->inode.vdi_id, &new_vid,
+                       1, s->inode.copy_policy);
     if (ret < 0) {
         error_report("failed to create inode for snapshot. %s",
                      strerror(errno));
@@ -1837,7 +2116,7 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
     }
 
     memcpy(&s->inode, inode, datalen);
-    dprintf("s->inode: name %s snap_id %x oid %x\n",
+    DPRINTF("s->inode: name %s snap_id %x oid %x\n",
             s->inode.name, s->inode.snap_id, s->inode.vdi_id);
 
 cleanup:
@@ -1845,70 +2124,47 @@ cleanup:
     return ret;
 }
 
+/*
+ * We implement rollback(loadvm) operation to the specified snapshot by
+ * 1) switch to the snapshot
+ * 2) rely on sd_create_branch to delete working VDI and
+ * 3) create a new working VDI based on the speicified snapshot
+ */
 static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
 {
     BDRVSheepdogState *s = bs->opaque;
     BDRVSheepdogState *old_s;
-    char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
-    char *buf = NULL;
-    uint32_t vid;
+    char tag[SD_MAX_VDI_TAG_LEN];
     uint32_t snapid = 0;
-    int ret = 0, fd;
+    int ret = 0;
 
     old_s = g_malloc(sizeof(BDRVSheepdogState));
 
     memcpy(old_s, s, sizeof(BDRVSheepdogState));
 
-    pstrcpy(vdi, sizeof(vdi), s->name);
-
     snapid = strtoul(snapshot_id, NULL, 10);
     if (snapid) {
         tag[0] = 0;
     } else {
-        pstrcpy(tag, sizeof(tag), s->name);
+        pstrcpy(tag, sizeof(tag), snapshot_id);
     }
 
-    ret = find_vdi_name(s, vdi, snapid, tag, &vid, 1);
+    ret = reload_inode(s, snapid, tag);
     if (ret) {
-        error_report("Failed to find_vdi_name");
         goto out;
     }
 
-    fd = connect_to_sdog(s->addr, s->port);
-    if (fd < 0) {
-        error_report("failed to connect");
-        ret = fd;
-        goto out;
-    }
-
-    buf = g_malloc(SD_INODE_SIZE);
-    ret = read_object(fd, buf, vid_to_vdi_oid(vid), s->inode.nr_copies,
-                      SD_INODE_SIZE, 0, s->cache_flags);
-
-    closesocket(fd);
-
+    ret = sd_create_branch(s);
     if (ret) {
         goto out;
     }
 
-    memcpy(&s->inode, buf, sizeof(s->inode));
-
-    if (!s->inode.vm_state_size) {
-        error_report("Invalid snapshot");
-        ret = -ENOENT;
-        goto out;
-    }
-
-    s->is_snapshot = true;
-
-    g_free(buf);
     g_free(old_s);
 
     return 0;
 out:
     /* recover bdrv_sd_state */
     memcpy(s, old_s, sizeof(BDRVSheepdogState));
-    g_free(buf);
     g_free(old_s);
 
     error_report("failed to open. recover old bdrv_sd_state.");
@@ -1916,7 +2172,10 @@ out:
     return ret;
 }
 
-static int sd_snapshot_delete(BlockDriverState *bs, const char *snapshot_id)
+static int sd_snapshot_delete(BlockDriverState *bs,
+                              const char *snapshot_id,
+                              const char *name,
+                              Error **errp)
 {
     /* FIXME: Delete specified snapshot id.  */
     return 0;
@@ -1938,7 +2197,7 @@ static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
 
     vdi_inuse = g_malloc(max);
 
-    fd = connect_to_sdog(s->addr, s->port);
+    fd = connect_to_sdog(s);
     if (fd < 0) {
         ret = fd;
         goto out;
@@ -1965,9 +2224,8 @@ static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
     hval = fnv_64a_buf(s->name, strlen(s->name), FNV1A_64_INIT);
     start_nr = hval & (SD_NR_VDIS - 1);
 
-    fd = connect_to_sdog(s->addr, s->port);
+    fd = connect_to_sdog(s);
     if (fd < 0) {
-        error_report("failed to connect");
         ret = fd;
         goto out;
     }
@@ -2021,10 +2279,11 @@ static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
     int fd, ret = 0, remaining = size;
     unsigned int data_len;
     uint64_t vmstate_oid;
-    uint32_t vdi_index;
     uint64_t offset;
+    uint32_t vdi_index;
+    uint32_t vdi_id = load ? s->inode.parent_vdi_id : s->inode.vdi_id;
 
-    fd = connect_to_sdog(s->addr, s->port);
+    fd = connect_to_sdog(s);
     if (fd < 0) {
         return fd;
     }
@@ -2035,7 +2294,7 @@ static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
 
         data_len = MIN(remaining, SD_DATA_OBJ_SIZE - offset);
 
-        vmstate_oid = vid_to_vmstate_oid(s->inode.vdi_id, vdi_index);
+        vmstate_oid = vid_to_vmstate_oid(vdi_id, vdi_index);
 
         create = (offset == 0);
         if (load) {
@@ -2063,12 +2322,19 @@ cleanup:
     return ret;
 }
 
-static int sd_save_vmstate(BlockDriverState *bs, const uint8_t *data,
-                           int64_t pos, int size)
+static int sd_save_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
+                           int64_t pos)
 {
     BDRVSheepdogState *s = bs->opaque;
+    void *buf;
+    int ret;
+
+    buf = qemu_blockalign(bs, qiov->size);
+    qemu_iovec_to_buf(qiov, 0, buf, qiov->size);
+    ret = do_load_save_vmstate(s, (uint8_t *) buf, pos, qiov->size, 0);
+    qemu_vfree(buf);
 
-    return do_load_save_vmstate(s, (uint8_t *)data, pos, size, 0);
+    return ret;
 }
 
 static int sd_load_vmstate(BlockDriverState *bs, uint8_t *data,
@@ -2080,6 +2346,67 @@ static int sd_load_vmstate(BlockDriverState *bs, uint8_t *data,
 }
 
 
+static coroutine_fn int sd_co_discard(BlockDriverState *bs, int64_t sector_num,
+                                      int nb_sectors)
+{
+    SheepdogAIOCB *acb;
+    QEMUIOVector dummy;
+    BDRVSheepdogState *s = bs->opaque;
+    int ret;
+
+    if (!s->discard_supported) {
+            return 0;
+    }
+
+    acb = sd_aio_setup(bs, &dummy, sector_num, nb_sectors);
+    acb->aiocb_type = AIOCB_DISCARD_OBJ;
+    acb->aio_done_func = sd_finish_aiocb;
+
+    ret = sd_co_rw_vector(acb);
+    if (ret <= 0) {
+        qemu_aio_release(acb);
+        return ret;
+    }
+
+    qemu_coroutine_yield();
+
+    return acb->ret;
+}
+
+static coroutine_fn int64_t
+sd_co_get_block_status(BlockDriverState *bs, int64_t sector_num, int nb_sectors,
+                       int *pnum)
+{
+    BDRVSheepdogState *s = bs->opaque;
+    SheepdogInode *inode = &s->inode;
+    unsigned long start = sector_num * BDRV_SECTOR_SIZE / SD_DATA_OBJ_SIZE,
+                  end = DIV_ROUND_UP((sector_num + nb_sectors) *
+                                     BDRV_SECTOR_SIZE, SD_DATA_OBJ_SIZE);
+    unsigned long idx;
+    int64_t ret = BDRV_BLOCK_DATA;
+
+    for (idx = start; idx < end; idx++) {
+        if (inode->data_vdi_id[idx] == 0) {
+            break;
+        }
+    }
+    if (idx == start) {
+        /* Get the longest length of unallocated sectors */
+        ret = 0;
+        for (idx = start + 1; idx < end; idx++) {
+            if (inode->data_vdi_id[idx] != 0) {
+                break;
+            }
+        }
+    }
+
+    *pnum = (idx - start) * SD_DATA_OBJ_SIZE / BDRV_SECTOR_SIZE;
+    if (*pnum > nb_sectors) {
+        *pnum = nb_sectors;
+    }
+    return ret;
+}
+
 static QEMUOptionParameter sd_create_options[] = {
     {
         .name = BLOCK_OPT_SIZE,
@@ -2103,15 +2430,19 @@ static BlockDriver bdrv_sheepdog = {
     .format_name    = "sheepdog",
     .protocol_name  = "sheepdog",
     .instance_size  = sizeof(BDRVSheepdogState),
+    .bdrv_needs_filename = true,
     .bdrv_file_open = sd_open,
     .bdrv_close     = sd_close,
     .bdrv_create    = sd_create,
+    .bdrv_has_zero_init = bdrv_has_zero_init_1,
     .bdrv_getlength = sd_getlength,
     .bdrv_truncate  = sd_truncate,
 
     .bdrv_co_readv  = sd_co_readv,
     .bdrv_co_writev = sd_co_writev,
     .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
+    .bdrv_co_discard = sd_co_discard,
+    .bdrv_co_get_block_status = sd_co_get_block_status,
 
     .bdrv_snapshot_create   = sd_snapshot_create,
     .bdrv_snapshot_goto     = sd_snapshot_goto,
@@ -2128,15 +2459,48 @@ static BlockDriver bdrv_sheepdog_tcp = {
     .format_name    = "sheepdog",
     .protocol_name  = "sheepdog+tcp",
     .instance_size  = sizeof(BDRVSheepdogState),
+    .bdrv_needs_filename = true,
+    .bdrv_file_open = sd_open,
+    .bdrv_close     = sd_close,
+    .bdrv_create    = sd_create,
+    .bdrv_has_zero_init = bdrv_has_zero_init_1,
+    .bdrv_getlength = sd_getlength,
+    .bdrv_truncate  = sd_truncate,
+
+    .bdrv_co_readv  = sd_co_readv,
+    .bdrv_co_writev = sd_co_writev,
+    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
+    .bdrv_co_discard = sd_co_discard,
+    .bdrv_co_get_block_status = sd_co_get_block_status,
+
+    .bdrv_snapshot_create   = sd_snapshot_create,
+    .bdrv_snapshot_goto     = sd_snapshot_goto,
+    .bdrv_snapshot_delete   = sd_snapshot_delete,
+    .bdrv_snapshot_list     = sd_snapshot_list,
+
+    .bdrv_save_vmstate  = sd_save_vmstate,
+    .bdrv_load_vmstate  = sd_load_vmstate,
+
+    .create_options = sd_create_options,
+};
+
+static BlockDriver bdrv_sheepdog_unix = {
+    .format_name    = "sheepdog",
+    .protocol_name  = "sheepdog+unix",
+    .instance_size  = sizeof(BDRVSheepdogState),
+    .bdrv_needs_filename = true,
     .bdrv_file_open = sd_open,
     .bdrv_close     = sd_close,
     .bdrv_create    = sd_create,
+    .bdrv_has_zero_init = bdrv_has_zero_init_1,
     .bdrv_getlength = sd_getlength,
     .bdrv_truncate  = sd_truncate,
 
     .bdrv_co_readv  = sd_co_readv,
     .bdrv_co_writev = sd_co_writev,
     .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
+    .bdrv_co_discard = sd_co_discard,
+    .bdrv_co_get_block_status = sd_co_get_block_status,
 
     .bdrv_snapshot_create   = sd_snapshot_create,
     .bdrv_snapshot_goto     = sd_snapshot_goto,
@@ -2153,5 +2517,6 @@ static void bdrv_sheepdog_init(void)
 {
     bdrv_register(&bdrv_sheepdog);
     bdrv_register(&bdrv_sheepdog_tcp);
+    bdrv_register(&bdrv_sheepdog_unix);
 }
 block_init(bdrv_sheepdog_init);