X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=block%2Fsheepdog.c;h=809df39d9e7dd22010c82e71d3ba9c68ae616ceb;hb=23797df3d9f08031d19aaaa1d2863d5feebe3d8b;hp=c1f6e07ec163fbfc9784f386c2774ef2c57658bb;hpb=63236c15e93e18d37ce657171a42af1f809d0aa6;p=qemu.git diff --git a/block/sheepdog.c b/block/sheepdog.c index c1f6e07ec..809df39d9 100644 --- a/block/sheepdog.c +++ b/block/sheepdog.c @@ -7,6 +7,9 @@ * * You should have received a copy of the GNU General Public License * along with this program. If not, see . + * + * Contributions after 2012-01-13 are licensed under the terms of the + * GNU GPL, version 2 or (at your option) any later version. */ #include "qemu-common.h" @@ -29,9 +32,11 @@ #define SD_OP_RELEASE_VDI 0x13 #define SD_OP_GET_VDI_INFO 0x14 #define SD_OP_READ_VDIS 0x15 +#define SD_OP_FLUSH_VDI 0x16 #define SD_FLAG_CMD_WRITE 0x01 #define SD_FLAG_CMD_COW 0x02 +#define SD_FLAG_CMD_CACHE 0x04 #define SD_RES_SUCCESS 0x00 /* Success */ #define SD_RES_UNKNOWN 0x01 /* Unknown error */ @@ -66,7 +71,7 @@ * 20 - 31 (12 bits): reserved data object space * 32 - 55 (24 bits): vdi object space * 56 - 59 ( 4 bits): reserved vdi object space - * 60 - 63 ( 4 bits): object type indentifier space + * 60 - 63 ( 4 bits): object type identifier space */ #define VDI_SPACE_SHIFT 32 @@ -254,8 +259,7 @@ typedef struct AIOReq { uint8_t flags; uint32_t id; - QLIST_ENTRY(AIOReq) outstanding_aio_siblings; - QLIST_ENTRY(AIOReq) aioreq_siblings; + QLIST_ENTRY(AIOReq) aio_siblings; } AIOReq; enum AIOCBState { @@ -278,8 +282,7 @@ struct SheepdogAIOCB { void (*aio_done_func)(SheepdogAIOCB *); int canceled; - - QLIST_HEAD(aioreq_head, AIOReq) aioreq_head; + int nr_pending; }; typedef struct BDRVSheepdogState { @@ -290,17 +293,20 @@ typedef struct BDRVSheepdogState { char name[SD_MAX_VDI_LEN]; int is_snapshot; + uint8_t cache_enabled; char *addr; char *port; int fd; + int flush_fd; CoMutex lock; Coroutine *co_send; Coroutine *co_recv; uint32_t aioreq_seq_num; - QLIST_HEAD(outstanding_aio_head, AIOReq) outstanding_aio_head; + QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head; + QLIST_HEAD(pending_aio_head, AIOReq) pending_aio_head; } BDRVSheepdogState; static const char * sd_strerror(int err) @@ -351,7 +357,7 @@ static const char * sd_strerror(int err) * Sheepdog I/O handling: * * 1. In sd_co_rw_vector, we send the I/O requests to the server and - * link the requests to the outstanding_list in the + * link the requests to the inflight_list in the * BDRVSheepdogState. The function exits without waiting for * receiving the response. * @@ -379,24 +385,21 @@ static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb, aio_req->flags = flags; aio_req->id = s->aioreq_seq_num++; - QLIST_INSERT_HEAD(&s->outstanding_aio_head, aio_req, - outstanding_aio_siblings); - QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings); - + acb->nr_pending++; return aio_req; } -static inline int free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req) +static inline void free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req) { SheepdogAIOCB *acb = aio_req->aiocb; - QLIST_REMOVE(aio_req, outstanding_aio_siblings); - QLIST_REMOVE(aio_req, aioreq_siblings); + + QLIST_REMOVE(aio_req, aio_siblings); g_free(aio_req); - return !QLIST_EMPTY(&acb->aioreq_head); + acb->nr_pending--; } -static void sd_finish_aiocb(SheepdogAIOCB *acb) +static void coroutine_fn sd_finish_aiocb(SheepdogAIOCB *acb) { if (!acb->canceled) { qemu_coroutine_enter(acb->coroutine, NULL); @@ -439,133 +442,10 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov, acb->canceled = 0; acb->coroutine = qemu_coroutine_self(); acb->ret = 0; - QLIST_INIT(&acb->aioreq_head); + acb->nr_pending = 0; return acb; } -#ifdef _WIN32 - -struct msghdr { - struct iovec *msg_iov; - size_t msg_iovlen; -}; - -static ssize_t sendmsg(int s, const struct msghdr *msg, int flags) -{ - size_t size = 0; - char *buf, *p; - int i, ret; - - /* count the msg size */ - for (i = 0; i < msg->msg_iovlen; i++) { - size += msg->msg_iov[i].iov_len; - } - buf = g_malloc(size); - - p = buf; - for (i = 0; i < msg->msg_iovlen; i++) { - memcpy(p, msg->msg_iov[i].iov_base, msg->msg_iov[i].iov_len); - p += msg->msg_iov[i].iov_len; - } - - ret = send(s, buf, size, flags); - - g_free(buf); - return ret; -} - -static ssize_t recvmsg(int s, struct msghdr *msg, int flags) -{ - size_t size = 0; - char *buf, *p; - int i, ret; - - /* count the msg size */ - for (i = 0; i < msg->msg_iovlen; i++) { - size += msg->msg_iov[i].iov_len; - } - buf = g_malloc(size); - - ret = qemu_recv(s, buf, size, flags); - if (ret < 0) { - goto out; - } - - p = buf; - for (i = 0; i < msg->msg_iovlen; i++) { - memcpy(msg->msg_iov[i].iov_base, p, msg->msg_iov[i].iov_len); - p += msg->msg_iov[i].iov_len; - } -out: - g_free(buf); - return ret; -} - -#endif - -/* - * Send/recv data with iovec buffers - * - * This function send/recv data from/to the iovec buffer directly. - * The first `offset' bytes in the iovec buffer are skipped and next - * `len' bytes are used. - * - * For example, - * - * do_send_recv(sockfd, iov, len, offset, 1); - * - * is equals to - * - * char *buf = malloc(size); - * iov_to_buf(iov, iovcnt, buf, offset, size); - * send(sockfd, buf, size, 0); - * free(buf); - */ -static int do_send_recv(int sockfd, struct iovec *iov, int len, int offset, - int write) -{ - struct msghdr msg; - int ret, diff; - - memset(&msg, 0, sizeof(msg)); - msg.msg_iov = iov; - msg.msg_iovlen = 1; - - len += offset; - - while (iov->iov_len < len) { - len -= iov->iov_len; - - iov++; - msg.msg_iovlen++; - } - - diff = iov->iov_len - len; - iov->iov_len -= diff; - - while (msg.msg_iov->iov_len <= offset) { - offset -= msg.msg_iov->iov_len; - - msg.msg_iov++; - msg.msg_iovlen--; - } - - msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base + offset; - msg.msg_iov->iov_len -= offset; - - if (write) { - ret = sendmsg(sockfd, &msg, 0); - } else { - ret = recvmsg(sockfd, &msg, 0); - } - - msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base - offset; - msg.msg_iov->iov_len += offset; - - iov->iov_len += diff; - return ret; -} - static int connect_to_sdog(const char *addr, const char *port) { char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; @@ -584,7 +464,7 @@ static int connect_to_sdog(const char *addr, const char *port) if (ret) { error_report("unable to get address info %s, %s", addr, strerror(errno)); - return -1; + return -errno; } for (res = res0; res; res = res->ai_next) { @@ -611,110 +491,123 @@ static int connect_to_sdog(const char *addr, const char *port) dprintf("connected to %s:%s\n", addr, port); goto success; } - fd = -1; + fd = -errno; error_report("failed connect to %s:%s", addr, port); success: freeaddrinfo(res0); return fd; } -static int do_readv_writev(int sockfd, struct iovec *iov, int len, - int iov_offset, int write) +static int send_req(int sockfd, SheepdogReq *hdr, void *data, + unsigned int *wlen) { int ret; -again: - ret = do_send_recv(sockfd, iov, len, iov_offset, write); - if (ret < 0) { - if (errno == EINTR) { - goto again; - } - if (errno == EAGAIN) { - if (qemu_in_coroutine()) { - qemu_coroutine_yield(); - } - goto again; - } - error_report("failed to recv a rsp, %s", strerror(errno)); - return 1; - } - iov_offset += ret; - len -= ret; - if (len) { - goto again; + ret = qemu_send_full(sockfd, hdr, sizeof(*hdr), 0); + if (ret < sizeof(*hdr)) { + error_report("failed to send a req, %s", strerror(errno)); + return -errno; } - return 0; -} - -static int do_readv(int sockfd, struct iovec *iov, int len, int iov_offset) -{ - return do_readv_writev(sockfd, iov, len, iov_offset, 0); -} + ret = qemu_send_full(sockfd, data, *wlen, 0); + if (ret < *wlen) { + error_report("failed to send a req, %s", strerror(errno)); + ret = -errno; + } -static int do_writev(int sockfd, struct iovec *iov, int len, int iov_offset) -{ - return do_readv_writev(sockfd, iov, len, iov_offset, 1); + return ret; } -static int do_read_write(int sockfd, void *buf, int len, int write) +static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data, + unsigned int *wlen) { - struct iovec iov; + int ret; - iov.iov_base = buf; - iov.iov_len = len; + ret = qemu_co_send(sockfd, hdr, sizeof(*hdr)); + if (ret < sizeof(*hdr)) { + error_report("failed to send a req, %s", strerror(errno)); + return ret; + } - return do_readv_writev(sockfd, &iov, len, 0, write); -} + ret = qemu_co_send(sockfd, data, *wlen); + if (ret < *wlen) { + error_report("failed to send a req, %s", strerror(errno)); + } -static int do_read(int sockfd, void *buf, int len) -{ - return do_read_write(sockfd, buf, len, 0); + return ret; } -static int do_write(int sockfd, void *buf, int len) -{ - return do_read_write(sockfd, buf, len, 1); -} +static coroutine_fn int do_co_req(int sockfd, SheepdogReq *hdr, void *data, + unsigned int *wlen, unsigned int *rlen); -static int send_req(int sockfd, SheepdogReq *hdr, void *data, - unsigned int *wlen) +static int do_req(int sockfd, SheepdogReq *hdr, void *data, + unsigned int *wlen, unsigned int *rlen) { int ret; - struct iovec iov[2]; - iov[0].iov_base = hdr; - iov[0].iov_len = sizeof(*hdr); + if (qemu_in_coroutine()) { + return do_co_req(sockfd, hdr, data, wlen, rlen); + } - if (*wlen) { - iov[1].iov_base = data; - iov[1].iov_len = *wlen; + socket_set_block(sockfd); + ret = send_req(sockfd, hdr, data, wlen); + if (ret < 0) { + goto out; } - ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen, 0); - if (ret) { - error_report("failed to send a req, %s", strerror(errno)); - ret = -1; + ret = qemu_recv_full(sockfd, hdr, sizeof(*hdr), 0); + if (ret < sizeof(*hdr)) { + error_report("failed to get a rsp, %s", strerror(errno)); + ret = -errno; + goto out; + } + + if (*rlen > hdr->data_length) { + *rlen = hdr->data_length; } + if (*rlen) { + ret = qemu_recv_full(sockfd, data, *rlen, 0); + if (ret < *rlen) { + error_report("failed to get the data, %s", strerror(errno)); + ret = -errno; + goto out; + } + } + ret = 0; +out: + socket_set_nonblock(sockfd); return ret; } -static int do_req(int sockfd, SheepdogReq *hdr, void *data, - unsigned int *wlen, unsigned int *rlen) +static void restart_co_req(void *opaque) +{ + Coroutine *co = opaque; + + qemu_coroutine_enter(co, NULL); +} + +static coroutine_fn int do_co_req(int sockfd, SheepdogReq *hdr, void *data, + unsigned int *wlen, unsigned int *rlen) { int ret; + Coroutine *co; - ret = send_req(sockfd, hdr, data, wlen); - if (ret) { - ret = -1; + co = qemu_coroutine_self(); + qemu_aio_set_fd_handler(sockfd, NULL, restart_co_req, NULL, co); + + socket_set_block(sockfd); + ret = send_co_req(sockfd, hdr, data, wlen); + if (ret < 0) { goto out; } - ret = do_read(sockfd, hdr, sizeof(*hdr)); - if (ret) { + qemu_aio_set_fd_handler(sockfd, restart_co_req, NULL, NULL, co); + + ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr)); + if (ret < sizeof(*hdr)) { error_report("failed to get a rsp, %s", strerror(errno)); - ret = -1; + ret = -errno; goto out; } @@ -723,48 +616,59 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data, } if (*rlen) { - ret = do_read(sockfd, data, *rlen); - if (ret) { + ret = qemu_co_recv(sockfd, data, *rlen); + if (ret < *rlen) { error_report("failed to get the data, %s", strerror(errno)); - ret = -1; + ret = -errno; goto out; } } ret = 0; out: + qemu_aio_set_fd_handler(sockfd, NULL, NULL, NULL, NULL); + socket_set_nonblock(sockfd); return ret; } -static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, +static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, struct iovec *iov, int niov, int create, enum AIOCBState aiocb_type); + +static AIOReq *find_pending_req(BDRVSheepdogState *s, uint64_t oid) +{ + AIOReq *aio_req; + + QLIST_FOREACH(aio_req, &s->pending_aio_head, aio_siblings) { + if (aio_req->oid == oid) { + return aio_req; + } + } + + return NULL; +} + /* * This function searchs pending requests to the object `oid', and * sends them. */ -static void send_pending_req(BDRVSheepdogState *s, uint64_t oid, uint32_t id) +static void coroutine_fn send_pending_req(BDRVSheepdogState *s, uint64_t oid) { - AIOReq *aio_req, *next; + AIOReq *aio_req; SheepdogAIOCB *acb; int ret; - QLIST_FOREACH_SAFE(aio_req, &s->outstanding_aio_head, - outstanding_aio_siblings, next) { - if (id == aio_req->id) { - continue; - } - if (aio_req->oid != oid) { - continue; - } - + while ((aio_req = find_pending_req(s, oid)) != NULL) { acb = aio_req->aiocb; + /* move aio_req from pending list to inflight one */ + QLIST_REMOVE(aio_req, aio_siblings); + QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings); ret = add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov, 0, acb->aiocb_type); if (ret < 0) { error_report("add_aio_request is failed"); free_aio_req(s, aio_req); - if (QLIST_EMPTY(&acb->aioreq_head)) { + if (!acb->nr_pending) { sd_finish_aiocb(acb); } } @@ -777,7 +681,7 @@ static void send_pending_req(BDRVSheepdogState *s, uint64_t oid, uint32_t id) * This function is registered as a fd handler, and called from the * main loop when s->fd is ready for reading responses. */ -static void aio_read_response(void *opaque) +static void coroutine_fn aio_read_response(void *opaque) { SheepdogObjRsp rsp; BDRVSheepdogState *s = opaque; @@ -785,22 +689,21 @@ static void aio_read_response(void *opaque) int ret; AIOReq *aio_req = NULL; SheepdogAIOCB *acb; - int rest; unsigned long idx; - if (QLIST_EMPTY(&s->outstanding_aio_head)) { + if (QLIST_EMPTY(&s->inflight_aio_head)) { goto out; } /* read a header */ - ret = do_read(fd, &rsp, sizeof(rsp)); - if (ret) { + ret = qemu_co_recv(fd, &rsp, sizeof(rsp)); + if (ret < 0) { error_report("failed to get the header, %s", strerror(errno)); goto out; } - /* find the right aio_req from the outstanding_aio list */ - QLIST_FOREACH(aio_req, &s->outstanding_aio_head, outstanding_aio_siblings) { + /* find the right aio_req from the inflight aio list */ + QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings) { if (aio_req->id == rsp.id) { break; } @@ -814,6 +717,9 @@ static void aio_read_response(void *opaque) switch (acb->aiocb_type) { case AIOCB_WRITE_UDATA: + /* this coroutine context is no longer suitable for co_recv + * because we may send data to update vdi objects */ + s->co_recv = NULL; if (!is_data_obj(aio_req->oid)) { break; } @@ -835,13 +741,13 @@ static void aio_read_response(void *opaque) * create requests are not allowed, so we search the * pending requests here. */ - send_pending_req(s, vid_to_data_oid(s->inode.vdi_id, idx), rsp.id); + send_pending_req(s, vid_to_data_oid(s->inode.vdi_id, idx)); } break; case AIOCB_READ_UDATA: - ret = do_readv(fd, acb->qiov->iov, rsp.data_length, - aio_req->iov_offset); - if (ret) { + ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov, + aio_req->iov_offset, rsp.data_length); + if (ret < 0) { error_report("failed to get the data, %s", strerror(errno)); goto out; } @@ -853,8 +759,8 @@ static void aio_read_response(void *opaque) error_report("%s", sd_strerror(rsp.result)); } - rest = free_aio_req(s, aio_req); - if (!rest) { + free_aio_req(s, aio_req); + if (!acb->nr_pending) { /* * We've finished all requests which belong to the AIOCB, so * we can switch back to sd_co_readv/writev now. @@ -887,25 +793,10 @@ static int aio_flush_request(void *opaque) { BDRVSheepdogState *s = opaque; - return !QLIST_EMPTY(&s->outstanding_aio_head); + return !QLIST_EMPTY(&s->inflight_aio_head) || + !QLIST_EMPTY(&s->pending_aio_head); } -#if !defined(SOL_TCP) || !defined(TCP_CORK) - -static int set_cork(int fd, int v) -{ - return 0; -} - -#else - -static int set_cork(int fd, int v) -{ - return setsockopt(fd, SOL_TCP, TCP_CORK, &v, sizeof(v)); -} - -#endif - static int set_nodelay(int fd) { int ret, opt; @@ -928,7 +819,7 @@ static int get_sheep_fd(BDRVSheepdogState *s) fd = connect_to_sdog(s->addr, s->port); if (fd < 0) { error_report("%s", strerror(errno)); - return -1; + return fd; } socket_set_nonblock(fd); @@ -937,11 +828,10 @@ static int get_sheep_fd(BDRVSheepdogState *s) if (ret) { error_report("%s", strerror(errno)); closesocket(fd); - return -1; + return -errno; } - qemu_aio_set_fd_handler(fd, co_read_response, NULL, aio_flush_request, - NULL, s); + qemu_aio_set_fd_handler(fd, co_read_response, NULL, aio_flush_request, s); return fd; } @@ -1025,7 +915,7 @@ static int find_vdi_name(BDRVSheepdogState *s, char *filename, uint32_t snapid, fd = connect_to_sdog(s->addr, s->port); if (fd < 0) { - return -1; + return fd; } memset(buf, 0, sizeof(buf)); @@ -1046,14 +936,17 @@ static int find_vdi_name(BDRVSheepdogState *s, char *filename, uint32_t snapid, ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen); if (ret) { - ret = -1; goto out; } if (rsp->result != SD_RES_SUCCESS) { error_report("cannot get vdi info, %s, %s %d %s", sd_strerror(rsp->result), filename, snapid, tag); - ret = -1; + if (rsp->result == SD_RES_NO_VDI) { + ret = -ENOENT; + } else { + ret = -EIO; + } goto out; } *vid = rsp->vdi_id; @@ -1064,7 +957,7 @@ out: return ret; } -static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, +static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, struct iovec *iov, int niov, int create, enum AIOCBState aiocb_type) { @@ -1098,6 +991,10 @@ static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, hdr.flags = SD_FLAG_CMD_WRITE | flags; } + if (s->cache_enabled) { + hdr.flags |= SD_FLAG_CMD_CACHE; + } + hdr.oid = oid; hdr.cow_oid = old_oid; hdr.copies = s->inode.nr_copies; @@ -1110,27 +1007,29 @@ static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, qemu_co_mutex_lock(&s->lock); s->co_send = qemu_coroutine_self(); qemu_aio_set_fd_handler(s->fd, co_read_response, co_write_request, - aio_flush_request, NULL, s); - set_cork(s->fd, 1); + aio_flush_request, s); + socket_set_cork(s->fd, 1); /* send a header */ - ret = do_write(s->fd, &hdr, sizeof(hdr)); - if (ret) { + ret = qemu_co_send(s->fd, &hdr, sizeof(hdr)); + if (ret < 0) { + qemu_co_mutex_unlock(&s->lock); error_report("failed to send a req, %s", strerror(errno)); - return -EIO; + return -errno; } if (wlen) { - ret = do_writev(s->fd, iov, wlen, aio_req->iov_offset); - if (ret) { + ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen); + if (ret < 0) { + qemu_co_mutex_unlock(&s->lock); error_report("failed to send a data, %s", strerror(errno)); - return -EIO; + return -errno; } } - set_cork(s->fd, 0); + socket_set_cork(s->fd, 0); qemu_aio_set_fd_handler(s->fd, co_read_response, NULL, - aio_flush_request, NULL, s); + aio_flush_request, s); qemu_co_mutex_unlock(&s->lock); return 0; @@ -1138,7 +1037,7 @@ static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, static int read_write_object(int fd, char *buf, uint64_t oid, int copies, unsigned int datalen, uint64_t offset, - int write, int create) + int write, int create, uint8_t cache) { SheepdogObjReq hdr; SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr; @@ -1161,6 +1060,11 @@ static int read_write_object(int fd, char *buf, uint64_t oid, int copies, rlen = datalen; hdr.opcode = SD_OP_READ_OBJ; } + + if (cache) { + hdr.flags |= SD_FLAG_CMD_CACHE; + } + hdr.oid = oid; hdr.data_length = datalen; hdr.offset = offset; @@ -1169,7 +1073,7 @@ static int read_write_object(int fd, char *buf, uint64_t oid, int copies, ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen); if (ret) { error_report("failed to send a request to the sheep"); - return -1; + return ret; } switch (rsp->result) { @@ -1177,20 +1081,23 @@ static int read_write_object(int fd, char *buf, uint64_t oid, int copies, return 0; default: error_report("%s", sd_strerror(rsp->result)); - return -1; + return -EIO; } } static int read_object(int fd, char *buf, uint64_t oid, int copies, - unsigned int datalen, uint64_t offset) + unsigned int datalen, uint64_t offset, uint8_t cache) { - return read_write_object(fd, buf, oid, copies, datalen, offset, 0, 0); + return read_write_object(fd, buf, oid, copies, datalen, offset, 0, 0, + cache); } static int write_object(int fd, char *buf, uint64_t oid, int copies, - unsigned int datalen, uint64_t offset, int create) + unsigned int datalen, uint64_t offset, int create, + uint8_t cache) { - return read_write_object(fd, buf, oid, copies, datalen, offset, 1, create); + return read_write_object(fd, buf, oid, copies, datalen, offset, 1, create, + cache); } static int sd_open(BlockDriverState *bs, const char *filename, int flags) @@ -1204,16 +1111,19 @@ static int sd_open(BlockDriverState *bs, const char *filename, int flags) strstart(filename, "sheepdog:", (const char **)&filename); - QLIST_INIT(&s->outstanding_aio_head); + QLIST_INIT(&s->inflight_aio_head); + QLIST_INIT(&s->pending_aio_head); s->fd = -1; memset(vdi, 0, sizeof(vdi)); memset(tag, 0, sizeof(tag)); if (parse_vdiname(s, filename, vdi, &snapid, tag) < 0) { + ret = -EINVAL; goto out; } s->fd = get_sheep_fd(s); if (s->fd < 0) { + ret = s->fd; goto out; } @@ -1222,7 +1132,17 @@ static int sd_open(BlockDriverState *bs, const char *filename, int flags) goto out; } - if (snapid) { + if (flags & BDRV_O_CACHE_WB) { + s->cache_enabled = 1; + s->flush_fd = connect_to_sdog(s->addr, s->port); + if (s->flush_fd < 0) { + error_report("failed to connect"); + ret = s->flush_fd; + goto out; + } + } + + if (snapid || tag[0] != '\0') { dprintf("%" PRIx32 " snapshot inode was open.\n", vid); s->is_snapshot = 1; } @@ -1230,11 +1150,13 @@ static int sd_open(BlockDriverState *bs, const char *filename, int flags) fd = connect_to_sdog(s->addr, s->port); if (fd < 0) { error_report("failed to connect"); + ret = fd; goto out; } buf = g_malloc(SD_INODE_SIZE); - ret = read_object(fd, buf, vid_to_vdi_oid(vid), 0, SD_INODE_SIZE, 0); + ret = read_object(fd, buf, vid_to_vdi_oid(vid), 0, SD_INODE_SIZE, 0, + s->cache_enabled); closesocket(fd); @@ -1252,12 +1174,12 @@ static int sd_open(BlockDriverState *bs, const char *filename, int flags) g_free(buf); return 0; out: - qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL, NULL, NULL); + qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL, NULL); if (s->fd >= 0) { closesocket(s->fd); } g_free(buf); - return -1; + return ret; } static int do_sd_create(char *filename, int64_t vdi_size, @@ -1272,7 +1194,7 @@ static int do_sd_create(char *filename, int64_t vdi_size, fd = connect_to_sdog(addr, port); if (fd < 0) { - return -EIO; + return fd; } memset(buf, 0, sizeof(buf)); @@ -1295,7 +1217,7 @@ static int do_sd_create(char *filename, int64_t vdi_size, closesocket(fd); if (ret) { - return -EIO; + return ret; } if (rsp->result != SD_RES_SUCCESS) { @@ -1355,24 +1277,26 @@ out: static int sd_create(const char *filename, QEMUOptionParameter *options) { - int ret; + int ret = 0; uint32_t vid = 0, base_vid = 0; int64_t vdi_size = 0; char *backing_file = NULL; - BDRVSheepdogState s; + BDRVSheepdogState *s; char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN]; uint32_t snapid; int prealloc = 0; const char *vdiname; + s = g_malloc0(sizeof(BDRVSheepdogState)); + strstart(filename, "sheepdog:", &vdiname); - memset(&s, 0, sizeof(s)); memset(vdi, 0, sizeof(vdi)); memset(tag, 0, sizeof(tag)); - if (parse_vdiname(&s, vdiname, vdi, &snapid, tag) < 0) { + if (parse_vdiname(s, vdiname, vdi, &snapid, tag) < 0) { error_report("invalid filename"); - return -EINVAL; + ret = -EINVAL; + goto out; } while (options && options->name) { @@ -1388,7 +1312,8 @@ static int sd_create(const char *filename, QEMUOptionParameter *options) } else { error_report("Invalid preallocation mode: '%s'", options->value.s); - return -EINVAL; + ret = -EINVAL; + goto out; } } options++; @@ -1396,7 +1321,8 @@ static int sd_create(const char *filename, QEMUOptionParameter *options) if (vdi_size > SD_MAX_VDI_SIZE) { error_report("too big image size"); - return -EINVAL; + ret = -EINVAL; + goto out; } if (backing_file) { @@ -1408,31 +1334,37 @@ static int sd_create(const char *filename, QEMUOptionParameter *options) drv = bdrv_find_protocol(backing_file); if (!drv || strcmp(drv->protocol_name, "sheepdog") != 0) { error_report("backing_file must be a sheepdog image"); - return -EINVAL; + ret = -EINVAL; + goto out; } ret = bdrv_file_open(&bs, backing_file, 0); - if (ret < 0) - return -EIO; + if (ret < 0) { + goto out; + } s = bs->opaque; if (!is_snapshot(&s->inode)) { error_report("cannot clone from a non snapshot vdi"); bdrv_delete(bs); - return -EINVAL; + ret = -EINVAL; + goto out; } base_vid = s->inode.vdi_id; bdrv_delete(bs); } - ret = do_sd_create(vdi, vdi_size, base_vid, &vid, 0, s.addr, s.port); + ret = do_sd_create(vdi, vdi_size, base_vid, &vid, 0, s->addr, s->port); if (!prealloc || ret) { - return ret; + goto out; } - return sd_prealloc(filename); + ret = sd_prealloc(filename); +out: + g_free(s); + return ret; } static void sd_close(BlockDriverState *bs) @@ -1466,8 +1398,11 @@ static void sd_close(BlockDriverState *bs) error_report("%s, %s", sd_strerror(rsp->result), s->name); } - qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL, NULL, NULL); + qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL, NULL); closesocket(s->fd); + if (s->cache_enabled) { + closesocket(s->flush_fd); + } g_free(s->addr); } @@ -1494,22 +1429,21 @@ static int sd_truncate(BlockDriverState *bs, int64_t offset) fd = connect_to_sdog(s->addr, s->port); if (fd < 0) { - return -EIO; + return fd; } /* we don't need to update entire object */ datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id); s->inode.vdi_size = offset; ret = write_object(fd, (char *)&s->inode, vid_to_vdi_oid(s->inode.vdi_id), - s->inode.nr_copies, datalen, 0, 0); + s->inode.nr_copies, datalen, 0, 0, s->cache_enabled); close(fd); if (ret < 0) { error_report("failed to update an inode."); - return -EIO; } - return 0; + return ret; } /* @@ -1517,7 +1451,7 @@ static int sd_truncate(BlockDriverState *bs, int64_t offset) * update metadata, this sends a write request to the vdi object. * Otherwise, this switches back to sd_co_readv/writev. */ -static void sd_write_done(SheepdogAIOCB *acb) +static void coroutine_fn sd_write_done(SheepdogAIOCB *acb) { int ret; BDRVSheepdogState *s = acb->common.bs->opaque; @@ -1540,6 +1474,7 @@ static void sd_write_done(SheepdogAIOCB *acb) iov.iov_len = sizeof(s->inode); aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id), data_len, offset, 0, 0, offset); + QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings); ret = add_aio_request(s, aio_req, &iov, 1, 0, AIOCB_WRITE_UDATA); if (ret) { free_aio_req(s, aio_req); @@ -1579,11 +1514,12 @@ static int sd_create_branch(BDRVSheepdogState *s) fd = connect_to_sdog(s->addr, s->port); if (fd < 0) { error_report("failed to connect"); + ret = fd; goto out; } ret = read_object(fd, buf, vid_to_vdi_oid(vid), s->inode.nr_copies, - SD_INODE_SIZE, 0); + SD_INODE_SIZE, 0, s->cache_enabled); closesocket(fd); @@ -1607,7 +1543,7 @@ out: * Send I/O requests to the server. * * This function sends requests to the server, links the requests to - * the outstanding_list in BDRVSheepdogState, and exits without + * the inflight_list in BDRVSheepdogState, and exits without * waiting the response. The responses are received in the * `aio_read_response' function which is called from the main loop as * a fd handler. @@ -1615,7 +1551,7 @@ out: * Returns 1 when we need to wait a response, 0 when there is no sent * request and -errno in error cases. */ -static int sd_co_rw_vector(void *p) +static int coroutine_fn sd_co_rw_vector(void *p) { SheepdogAIOCB *acb = p; int ret = 0; @@ -1639,6 +1575,12 @@ static int sd_co_rw_vector(void *p) } } + /* + * Make sure we don't free the aiocb before we are done with all requests. + * This additional reference is dropped at the end of this function. + */ + acb->nr_pending++; + while (done != total) { uint8_t flags = 0; uint64_t old_oid = 0; @@ -1663,22 +1605,18 @@ static int sd_co_rw_vector(void *p) } if (create) { - dprintf("update ino (%" PRIu32") %" PRIu64 " %" PRIu64 - " %" PRIu64 "\n", inode->vdi_id, oid, + dprintf("update ino (%" PRIu32 ") %" PRIu64 " %" PRIu64 " %ld\n", + inode->vdi_id, oid, vid_to_data_oid(inode->data_vdi_id[idx], idx), idx); oid = vid_to_data_oid(inode->vdi_id, idx); - dprintf("new oid %lx\n", oid); + dprintf("new oid %" PRIx64 "\n", oid); } aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, old_oid, done); if (create) { AIOReq *areq; - QLIST_FOREACH(areq, &s->outstanding_aio_head, - outstanding_aio_siblings) { - if (areq == aio_req) { - continue; - } + QLIST_FOREACH(areq, &s->inflight_aio_head, aio_siblings) { if (areq->oid == oid) { /* * Sheepdog cannot handle simultaneous create @@ -1688,11 +1626,14 @@ static int sd_co_rw_vector(void *p) */ aio_req->flags = 0; aio_req->base_oid = 0; + QLIST_INSERT_HEAD(&s->pending_aio_head, aio_req, + aio_siblings); goto done; } } } + QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings); ret = add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov, create, acb->aiocb_type); if (ret < 0) { @@ -1707,22 +1648,22 @@ static int sd_co_rw_vector(void *p) done += len; } out: - if (QLIST_EMPTY(&acb->aioreq_head)) { + if (!--acb->nr_pending) { return acb->ret; } return 1; } -static int sd_co_writev(BlockDriverState *bs, int64_t sector_num, +static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num, int nb_sectors, QEMUIOVector *qiov) { SheepdogAIOCB *acb; int ret; if (bs->growable && sector_num + nb_sectors > bs->total_sectors) { - /* TODO: shouldn't block here */ - if (sd_truncate(bs, (sector_num + nb_sectors) * SECTOR_SIZE) < 0) { - return -EIO; + ret = sd_truncate(bs, (sector_num + nb_sectors) * SECTOR_SIZE); + if (ret < 0) { + return ret; } bs->total_sectors = sector_num + nb_sectors; } @@ -1742,7 +1683,7 @@ static int sd_co_writev(BlockDriverState *bs, int64_t sector_num, return acb->ret; } -static int sd_co_readv(BlockDriverState *bs, int64_t sector_num, +static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num, int nb_sectors, QEMUIOVector *qiov) { SheepdogAIOCB *acb; @@ -1771,6 +1712,44 @@ static int sd_co_readv(BlockDriverState *bs, int64_t sector_num, return acb->ret; } +static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs) +{ + BDRVSheepdogState *s = bs->opaque; + SheepdogObjReq hdr = { 0 }; + SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr; + SheepdogInode *inode = &s->inode; + int ret; + unsigned int wlen = 0, rlen = 0; + + if (!s->cache_enabled) { + return 0; + } + + hdr.opcode = SD_OP_FLUSH_VDI; + hdr.oid = vid_to_vdi_oid(inode->vdi_id); + + ret = do_req(s->flush_fd, (SheepdogReq *)&hdr, NULL, &wlen, &rlen); + if (ret) { + error_report("failed to send a request to the sheep"); + return ret; + } + + if (rsp->result == SD_RES_INVALID_PARMS) { + dprintf("disable write cache since the server doesn't support it\n"); + + s->cache_enabled = 0; + closesocket(s->flush_fd); + return 0; + } + + if (rsp->result != SD_RES_SUCCESS) { + error_report("%s", sd_strerror(rsp->result)); + return -EIO; + } + + return 0; +} + static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info) { BDRVSheepdogState *s = bs->opaque; @@ -1779,7 +1758,7 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info) SheepdogInode *inode; unsigned int datalen; - dprintf("sn_info: name %s id_str %s s: name %s vm_state_size %d " + dprintf("sn_info: name %s id_str %s s: name %s vm_state_size %" PRId64 " " "is_snapshot %d\n", sn_info->name, sn_info->id_str, s->name, sn_info->vm_state_size, s->is_snapshot); @@ -1801,15 +1780,14 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info) /* refresh inode. */ fd = connect_to_sdog(s->addr, s->port); if (fd < 0) { - ret = -EIO; + ret = fd; goto cleanup; } ret = write_object(fd, (char *)&s->inode, vid_to_vdi_oid(s->inode.vdi_id), - s->inode.nr_copies, datalen, 0, 0); + s->inode.nr_copies, datalen, 0, 0, s->cache_enabled); if (ret < 0) { error_report("failed to write snapshot's inode."); - ret = -EIO; goto cleanup; } @@ -1818,18 +1796,16 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info) if (ret < 0) { error_report("failed to create inode for snapshot. %s", strerror(errno)); - ret = -EIO; goto cleanup; } inode = (SheepdogInode *)g_malloc(datalen); ret = read_object(fd, (char *)inode, vid_to_vdi_oid(new_vid), - s->inode.nr_copies, datalen, 0); + s->inode.nr_copies, datalen, 0, s->cache_enabled); if (ret < 0) { error_report("failed to read new inode info. %s", strerror(errno)); - ret = -EIO; goto cleanup; } @@ -1850,7 +1826,7 @@ static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id) char *buf = NULL; uint32_t vid; uint32_t snapid = 0; - int ret = -ENOENT, fd; + int ret = 0, fd; old_s = g_malloc(sizeof(BDRVSheepdogState)); @@ -1868,24 +1844,23 @@ static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id) ret = find_vdi_name(s, vdi, snapid, tag, &vid, 1); if (ret) { error_report("Failed to find_vdi_name"); - ret = -ENOENT; goto out; } fd = connect_to_sdog(s->addr, s->port); if (fd < 0) { error_report("failed to connect"); + ret = fd; goto out; } buf = g_malloc(SD_INODE_SIZE); ret = read_object(fd, buf, vid_to_vdi_oid(vid), s->inode.nr_copies, - SD_INODE_SIZE, 0); + SD_INODE_SIZE, 0, s->cache_enabled); closesocket(fd); if (ret) { - ret = -ENOENT; goto out; } @@ -1938,6 +1913,7 @@ static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab) fd = connect_to_sdog(s->addr, s->port); if (fd < 0) { + ret = fd; goto out; } @@ -1965,6 +1941,7 @@ static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab) fd = connect_to_sdog(s->addr, s->port); if (fd < 0) { error_report("failed to connect"); + ret = fd; goto out; } @@ -1975,7 +1952,8 @@ static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab) /* we don't need to read entire object */ ret = read_object(fd, (char *)&inode, vid_to_vdi_oid(vid), - 0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0); + 0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0, + s->cache_enabled); if (ret) { continue; @@ -2001,6 +1979,10 @@ out: g_free(vdi_inuse); + if (ret < 0) { + return ret; + } + return found; } @@ -2008,7 +1990,7 @@ static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data, int64_t pos, int size, int load) { int fd, create; - int ret = 0; + int ret = 0, remaining = size; unsigned int data_len; uint64_t vmstate_oid; uint32_t vdi_index; @@ -2016,37 +1998,37 @@ static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data, fd = connect_to_sdog(s->addr, s->port); if (fd < 0) { - ret = -EIO; - goto cleanup; + return fd; } - while (size) { + while (remaining) { vdi_index = pos / SD_DATA_OBJ_SIZE; offset = pos % SD_DATA_OBJ_SIZE; - data_len = MIN(size, SD_DATA_OBJ_SIZE); + data_len = MIN(remaining, SD_DATA_OBJ_SIZE); vmstate_oid = vid_to_vmstate_oid(s->inode.vdi_id, vdi_index); create = (offset == 0); if (load) { ret = read_object(fd, (char *)data, vmstate_oid, - s->inode.nr_copies, data_len, offset); + s->inode.nr_copies, data_len, offset, + s->cache_enabled); } else { ret = write_object(fd, (char *)data, vmstate_oid, - s->inode.nr_copies, data_len, offset, create); + s->inode.nr_copies, data_len, offset, create, + s->cache_enabled); } if (ret < 0) { error_report("failed to save vmstate %s", strerror(errno)); - ret = -EIO; goto cleanup; } pos += data_len; - size -= data_len; - ret += data_len; + remaining -= data_len; } + ret = size; cleanup: closesocket(fd); return ret; @@ -2100,6 +2082,7 @@ BlockDriver bdrv_sheepdog = { .bdrv_co_readv = sd_co_readv, .bdrv_co_writev = sd_co_writev, + .bdrv_co_flush_to_disk = sd_co_flush_to_disk, .bdrv_snapshot_create = sd_snapshot_create, .bdrv_snapshot_goto = sd_snapshot_goto,