]> git.proxmox.com Git - qemu.git/blobdiff - block/sheepdog.c
Merge remote-tracking branch 'mjt/mjt-iov2' into staging
[qemu.git] / block / sheepdog.c
index f46ca8fb694a8de4204b6df2b515cd87abff3b3a..809df39d9e7dd22010c82e71d3ba9c68ae616ceb 100644 (file)
@@ -259,8 +259,7 @@ typedef struct AIOReq {
     uint8_t flags;
     uint32_t id;
 
-    QLIST_ENTRY(AIOReq) outstanding_aio_siblings;
-    QLIST_ENTRY(AIOReq) aioreq_siblings;
+    QLIST_ENTRY(AIOReq) aio_siblings;
 } AIOReq;
 
 enum AIOCBState {
@@ -283,8 +282,7 @@ struct SheepdogAIOCB {
     void (*aio_done_func)(SheepdogAIOCB *);
 
     int canceled;
-
-    QLIST_HEAD(aioreq_head, AIOReq) aioreq_head;
+    int nr_pending;
 };
 
 typedef struct BDRVSheepdogState {
@@ -307,7 +305,8 @@ typedef struct BDRVSheepdogState {
     Coroutine *co_recv;
 
     uint32_t aioreq_seq_num;
-    QLIST_HEAD(outstanding_aio_head, AIOReq) outstanding_aio_head;
+    QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
+    QLIST_HEAD(pending_aio_head, AIOReq) pending_aio_head;
 } BDRVSheepdogState;
 
 static const char * sd_strerror(int err)
@@ -358,7 +357,7 @@ static const char * sd_strerror(int err)
  * Sheepdog I/O handling:
  *
  * 1. In sd_co_rw_vector, we send the I/O requests to the server and
- *    link the requests to the outstanding_list in the
+ *    link the requests to the inflight_list in the
  *    BDRVSheepdogState.  The function exits without waiting for
  *    receiving the response.
  *
@@ -386,21 +385,18 @@ static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
     aio_req->flags = flags;
     aio_req->id = s->aioreq_seq_num++;
 
-    QLIST_INSERT_HEAD(&s->outstanding_aio_head, aio_req,
-                      outstanding_aio_siblings);
-    QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings);
-
+    acb->nr_pending++;
     return aio_req;
 }
 
-static inline int free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
+static inline void free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
 {
     SheepdogAIOCB *acb = aio_req->aiocb;
-    QLIST_REMOVE(aio_req, outstanding_aio_siblings);
-    QLIST_REMOVE(aio_req, aioreq_siblings);
+
+    QLIST_REMOVE(aio_req, aio_siblings);
     g_free(aio_req);
 
-    return !QLIST_EMPTY(&acb->aioreq_head);
+    acb->nr_pending--;
 }
 
 static void coroutine_fn sd_finish_aiocb(SheepdogAIOCB *acb)
@@ -446,7 +442,7 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
     acb->canceled = 0;
     acb->coroutine = qemu_coroutine_self();
     acb->ret = 0;
-    QLIST_INIT(&acb->aioreq_head);
+    acb->nr_pending = 0;
     return acb;
 }
 
@@ -522,8 +518,8 @@ static int send_req(int sockfd, SheepdogReq *hdr, void *data,
     return ret;
 }
 
-static int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
-                       unsigned int *wlen)
+static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
+                                    unsigned int *wlen)
 {
     int ret;
 
@@ -540,11 +536,19 @@ static int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
 
     return ret;
 }
+
+static coroutine_fn int do_co_req(int sockfd, SheepdogReq *hdr, void *data,
+                                  unsigned int *wlen, unsigned int *rlen);
+
 static int do_req(int sockfd, SheepdogReq *hdr, void *data,
                   unsigned int *wlen, unsigned int *rlen)
 {
     int ret;
 
+    if (qemu_in_coroutine()) {
+        return do_co_req(sockfd, hdr, data, wlen, rlen);
+    }
+
     socket_set_block(sockfd);
     ret = send_req(sockfd, hdr, data, wlen);
     if (ret < 0) {
@@ -576,10 +580,21 @@ out:
     return ret;
 }
 
-static int do_co_req(int sockfd, SheepdogReq *hdr, void *data,
-                     unsigned int *wlen, unsigned int *rlen)
+static void restart_co_req(void *opaque)
+{
+    Coroutine *co = opaque;
+
+    qemu_coroutine_enter(co, NULL);
+}
+
+static coroutine_fn int do_co_req(int sockfd, SheepdogReq *hdr, void *data,
+                                  unsigned int *wlen, unsigned int *rlen)
 {
     int ret;
+    Coroutine *co;
+
+    co = qemu_coroutine_self();
+    qemu_aio_set_fd_handler(sockfd, NULL, restart_co_req, NULL, co);
 
     socket_set_block(sockfd);
     ret = send_co_req(sockfd, hdr, data, wlen);
@@ -587,6 +602,8 @@ static int do_co_req(int sockfd, SheepdogReq *hdr, void *data,
         goto out;
     }
 
+    qemu_aio_set_fd_handler(sockfd, restart_co_req, NULL, NULL, co);
+
     ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
     if (ret < sizeof(*hdr)) {
         error_report("failed to get a rsp, %s", strerror(errno));
@@ -608,6 +625,7 @@ static int do_co_req(int sockfd, SheepdogReq *hdr, void *data,
     }
     ret = 0;
 out:
+    qemu_aio_set_fd_handler(sockfd, NULL, NULL, NULL, NULL);
     socket_set_nonblock(sockfd);
     return ret;
 }
@@ -616,32 +634,41 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
                            struct iovec *iov, int niov, int create,
                            enum AIOCBState aiocb_type);
 
+
+static AIOReq *find_pending_req(BDRVSheepdogState *s, uint64_t oid)
+{
+    AIOReq *aio_req;
+
+    QLIST_FOREACH(aio_req, &s->pending_aio_head, aio_siblings) {
+        if (aio_req->oid == oid) {
+            return aio_req;
+        }
+    }
+
+    return NULL;
+}
+
 /*
  * This function searchs pending requests to the object `oid', and
  * sends them.
  */
-static void coroutine_fn send_pending_req(BDRVSheepdogState *s, uint64_t oid, uint32_t id)
+static void coroutine_fn send_pending_req(BDRVSheepdogState *s, uint64_t oid)
 {
-    AIOReq *aio_req, *next;
+    AIOReq *aio_req;
     SheepdogAIOCB *acb;
     int ret;
 
-    QLIST_FOREACH_SAFE(aio_req, &s->outstanding_aio_head,
-                       outstanding_aio_siblings, next) {
-        if (id == aio_req->id) {
-            continue;
-        }
-        if (aio_req->oid != oid) {
-            continue;
-        }
-
+    while ((aio_req = find_pending_req(s, oid)) != NULL) {
         acb = aio_req->aiocb;
+        /* move aio_req from pending list to inflight one */
+        QLIST_REMOVE(aio_req, aio_siblings);
+        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
         ret = add_aio_request(s, aio_req, acb->qiov->iov,
                               acb->qiov->niov, 0, acb->aiocb_type);
         if (ret < 0) {
             error_report("add_aio_request is failed");
             free_aio_req(s, aio_req);
-            if (QLIST_EMPTY(&acb->aioreq_head)) {
+            if (!acb->nr_pending) {
                 sd_finish_aiocb(acb);
             }
         }
@@ -662,10 +689,9 @@ static void coroutine_fn aio_read_response(void *opaque)
     int ret;
     AIOReq *aio_req = NULL;
     SheepdogAIOCB *acb;
-    int rest;
     unsigned long idx;
 
-    if (QLIST_EMPTY(&s->outstanding_aio_head)) {
+    if (QLIST_EMPTY(&s->inflight_aio_head)) {
         goto out;
     }
 
@@ -676,8 +702,8 @@ static void coroutine_fn aio_read_response(void *opaque)
         goto out;
     }
 
-    /* find the right aio_req from the outstanding_aio list */
-    QLIST_FOREACH(aio_req, &s->outstanding_aio_head, outstanding_aio_siblings) {
+    /* find the right aio_req from the inflight aio list */
+    QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings) {
         if (aio_req->id == rsp.id) {
             break;
         }
@@ -715,12 +741,12 @@ static void coroutine_fn aio_read_response(void *opaque)
              * create requests are not allowed, so we search the
              * pending requests here.
              */
-            send_pending_req(s, vid_to_data_oid(s->inode.vdi_id, idx), rsp.id);
+            send_pending_req(s, vid_to_data_oid(s->inode.vdi_id, idx));
         }
         break;
     case AIOCB_READ_UDATA:
-        ret = qemu_co_recvv(fd, acb->qiov->iov, rsp.data_length,
-                            aio_req->iov_offset);
+        ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov,
+                            aio_req->iov_offset, rsp.data_length);
         if (ret < 0) {
             error_report("failed to get the data, %s", strerror(errno));
             goto out;
@@ -733,8 +759,8 @@ static void coroutine_fn aio_read_response(void *opaque)
         error_report("%s", sd_strerror(rsp.result));
     }
 
-    rest = free_aio_req(s, aio_req);
-    if (!rest) {
+    free_aio_req(s, aio_req);
+    if (!acb->nr_pending) {
         /*
          * We've finished all requests which belong to the AIOCB, so
          * we can switch back to sd_co_readv/writev now.
@@ -767,7 +793,8 @@ static int aio_flush_request(void *opaque)
 {
     BDRVSheepdogState *s = opaque;
 
-    return !QLIST_EMPTY(&s->outstanding_aio_head);
+    return !QLIST_EMPTY(&s->inflight_aio_head) ||
+        !QLIST_EMPTY(&s->pending_aio_head);
 }
 
 static int set_nodelay(int fd)
@@ -992,7 +1019,7 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
     }
 
     if (wlen) {
-        ret = qemu_co_sendv(s->fd, iov, wlen, aio_req->iov_offset);
+        ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
         if (ret < 0) {
             qemu_co_mutex_unlock(&s->lock);
             error_report("failed to send a data, %s", strerror(errno));
@@ -1084,7 +1111,8 @@ static int sd_open(BlockDriverState *bs, const char *filename, int flags)
 
     strstart(filename, "sheepdog:", (const char **)&filename);
 
-    QLIST_INIT(&s->outstanding_aio_head);
+    QLIST_INIT(&s->inflight_aio_head);
+    QLIST_INIT(&s->pending_aio_head);
     s->fd = -1;
 
     memset(vdi, 0, sizeof(vdi));
@@ -1446,6 +1474,7 @@ static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
         iov.iov_len = sizeof(s->inode);
         aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
                                 data_len, offset, 0, 0, offset);
+        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
         ret = add_aio_request(s, aio_req, &iov, 1, 0, AIOCB_WRITE_UDATA);
         if (ret) {
             free_aio_req(s, aio_req);
@@ -1514,7 +1543,7 @@ out:
  * Send I/O requests to the server.
  *
  * This function sends requests to the server, links the requests to
- * the outstanding_list in BDRVSheepdogState, and exits without
+ * the inflight_list in BDRVSheepdogState, and exits without
  * waiting the response.  The responses are received in the
  * `aio_read_response' function which is called from the main loop as
  * a fd handler.
@@ -1546,6 +1575,12 @@ static int coroutine_fn sd_co_rw_vector(void *p)
         }
     }
 
+    /*
+     * Make sure we don't free the aiocb before we are done with all requests.
+     * This additional reference is dropped at the end of this function.
+     */
+    acb->nr_pending++;
+
     while (done != total) {
         uint8_t flags = 0;
         uint64_t old_oid = 0;
@@ -1570,22 +1605,18 @@ static int coroutine_fn sd_co_rw_vector(void *p)
         }
 
         if (create) {
-            dprintf("update ino (%" PRIu32") %" PRIu64 " %" PRIu64
-                    " %" PRIu64 "\n", inode->vdi_id, oid,
+            dprintf("update ino (%" PRIu32 ") %" PRIu64 " %" PRIu64 " %ld\n",
+                    inode->vdi_id, oid,
                     vid_to_data_oid(inode->data_vdi_id[idx], idx), idx);
             oid = vid_to_data_oid(inode->vdi_id, idx);
-            dprintf("new oid %lx\n", oid);
+            dprintf("new oid %" PRIx64 "\n", oid);
         }
 
         aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, old_oid, done);
 
         if (create) {
             AIOReq *areq;
-            QLIST_FOREACH(areq, &s->outstanding_aio_head,
-                          outstanding_aio_siblings) {
-                if (areq == aio_req) {
-                    continue;
-                }
+            QLIST_FOREACH(areq, &s->inflight_aio_head, aio_siblings) {
                 if (areq->oid == oid) {
                     /*
                      * Sheepdog cannot handle simultaneous create
@@ -1595,11 +1626,14 @@ static int coroutine_fn sd_co_rw_vector(void *p)
                      */
                     aio_req->flags = 0;
                     aio_req->base_oid = 0;
+                    QLIST_INSERT_HEAD(&s->pending_aio_head, aio_req,
+                                      aio_siblings);
                     goto done;
                 }
             }
         }
 
+        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
         ret = add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
                               create, acb->aiocb_type);
         if (ret < 0) {
@@ -1614,7 +1648,7 @@ static int coroutine_fn sd_co_rw_vector(void *p)
         done += len;
     }
 out:
-    if (QLIST_EMPTY(&acb->aioreq_head)) {
+    if (!--acb->nr_pending) {
         return acb->ret;
     }
     return 1;
@@ -1627,7 +1661,6 @@ static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
     int ret;
 
     if (bs->growable && sector_num + nb_sectors > bs->total_sectors) {
-        /* TODO: shouldn't block here */
         ret = sd_truncate(bs, (sector_num + nb_sectors) * SECTOR_SIZE);
         if (ret < 0) {
             return ret;
@@ -1695,7 +1728,7 @@ static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
     hdr.opcode = SD_OP_FLUSH_VDI;
     hdr.oid = vid_to_vdi_oid(inode->vdi_id);
 
-    ret = do_co_req(s->flush_fd, (SheepdogReq *)&hdr, NULL, &wlen, &rlen);
+    ret = do_req(s->flush_fd, (SheepdogReq *)&hdr, NULL, &wlen, &rlen);
     if (ret) {
         error_report("failed to send a request to the sheep");
         return ret;
@@ -1725,7 +1758,7 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
     SheepdogInode *inode;
     unsigned int datalen;
 
-    dprintf("sn_info: name %s id_str %s s: name %s vm_state_size %d "
+    dprintf("sn_info: name %s id_str %s s: name %s vm_state_size %" PRId64 " "
             "is_snapshot %d\n", sn_info->name, sn_info->id_str,
             s->name, sn_info->vm_state_size, s->is_snapshot);