]> git.proxmox.com Git - mirror_qemu.git/blobdiff - block/nbd-client.c
rbd: Fix to cleanly reject -drive without pool or image
[mirror_qemu.git] / block / nbd-client.c
index 40b28ab00beb330782e3085f7ff744f1cef9fdae..1e2952fdae6f4dd8474a4419a88cf7426eda7f6c 100644 (file)
@@ -1,6 +1,7 @@
 /*
  * QEMU Block driver for  NBD
  *
+ * Copyright (C) 2016 Red Hat, Inc.
  * Copyright (C) 2008 Bull S.A.S.
  *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
  *
 #define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs))
 #define INDEX_TO_HANDLE(bs, index)  ((index)  ^ ((uint64_t)(intptr_t)bs))
 
-static void nbd_recv_coroutines_enter_all(NbdClientSession *s)
+static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
 {
     int i;
 
     for (i = 0; i < MAX_NBD_REQUESTS; i++) {
         if (s->recv_coroutine[i]) {
-            qemu_coroutine_enter(s->recv_coroutine[i]);
+            aio_co_wake(s->recv_coroutine[i]);
         }
     }
 }
 
 static void nbd_teardown_connection(BlockDriverState *bs)
 {
-    NbdClientSession *client = nbd_get_client_session(bs);
+    NBDClientSession *client = nbd_get_client_session(bs);
 
     if (!client->ioc) { /* Already closed */
         return;
@@ -55,7 +56,7 @@ static void nbd_teardown_connection(BlockDriverState *bs)
     qio_channel_shutdown(client->ioc,
                          QIO_CHANNEL_SHUTDOWN_BOTH,
                          NULL);
-    nbd_recv_coroutines_enter_all(client);
+    BDRV_POLL_WHILE(bs, client->read_reply_co);
 
     nbd_client_detach_aio_context(bs);
     object_unref(OBJECT(client->sioc));
@@ -64,62 +65,52 @@ static void nbd_teardown_connection(BlockDriverState *bs)
     client->ioc = NULL;
 }
 
-static void nbd_reply_ready(void *opaque)
+static coroutine_fn void nbd_read_reply_entry(void *opaque)
 {
-    BlockDriverState *bs = opaque;
-    NbdClientSession *s = nbd_get_client_session(bs);
+    NBDClientSession *s = opaque;
     uint64_t i;
     int ret;
 
-    if (!s->ioc) { /* Already closed */
-        return;
-    }
-
-    if (s->reply.handle == 0) {
-        /* No reply already in flight.  Fetch a header.  It is possible
-         * that another thread has done the same thing in parallel, so
-         * the socket is not readable anymore.
-         */
+    for (;;) {
+        assert(s->reply.handle == 0);
         ret = nbd_receive_reply(s->ioc, &s->reply);
-        if (ret == -EAGAIN) {
-            return;
-        }
-        if (ret < 0) {
-            s->reply.handle = 0;
-            goto fail;
+        if (ret <= 0) {
+            break;
         }
-    }
 
-    /* There's no need for a mutex on the receive side, because the
-     * handler acts as a synchronization point and ensures that only
-     * one coroutine is called until the reply finishes.  */
-    i = HANDLE_TO_INDEX(s, s->reply.handle);
-    if (i >= MAX_NBD_REQUESTS) {
-        goto fail;
-    }
+        /* There's no need for a mutex on the receive side, because the
+         * handler acts as a synchronization point and ensures that only
+         * one coroutine is called until the reply finishes.
+         */
+        i = HANDLE_TO_INDEX(s, s->reply.handle);
+        if (i >= MAX_NBD_REQUESTS || !s->recv_coroutine[i]) {
+            break;
+        }
 
-    if (s->recv_coroutine[i]) {
-        qemu_coroutine_enter(s->recv_coroutine[i]);
-        return;
+        /* We're woken up by the recv_coroutine itself.  Note that there
+         * is no race between yielding and reentering read_reply_co.  This
+         * is because:
+         *
+         * - if recv_coroutine[i] runs on the same AioContext, it is only
+         *   entered after we yield
+         *
+         * - if recv_coroutine[i] runs on a different AioContext, reentering
+         *   read_reply_co happens through a bottom half, which can only
+         *   run after we yield.
+         */
+        aio_co_wake(s->recv_coroutine[i]);
+        qemu_coroutine_yield();
     }
 
-fail:
-    nbd_teardown_connection(bs);
-}
-
-static void nbd_restart_write(void *opaque)
-{
-    BlockDriverState *bs = opaque;
-
-    qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine);
+    nbd_recv_coroutines_enter_all(s);
+    s->read_reply_co = NULL;
 }
 
 static int nbd_co_send_request(BlockDriverState *bs,
-                               struct nbd_request *request,
+                               NBDRequest *request,
                                QEMUIOVector *qiov)
 {
-    NbdClientSession *s = nbd_get_client_session(bs);
-    AioContext *aio_context;
+    NBDClientSession *s = nbd_get_client_session(bs);
     int rc, ret, i;
 
     qemu_co_mutex_lock(&s->send_mutex);
@@ -140,11 +131,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
         return -EPIPE;
     }
 
-    s->send_coroutine = qemu_coroutine_self();
-    aio_context = bdrv_get_aio_context(bs);
-
-    aio_set_fd_handler(aio_context, s->sioc->fd, false,
-                       nbd_reply_ready, nbd_restart_write, bs);
     if (qiov) {
         qio_channel_set_cork(s->ioc, true);
         rc = nbd_send_request(s->ioc, request);
@@ -159,22 +145,18 @@ static int nbd_co_send_request(BlockDriverState *bs,
     } else {
         rc = nbd_send_request(s->ioc, request);
     }
-    aio_set_fd_handler(aio_context, s->sioc->fd, false,
-                       nbd_reply_ready, NULL, bs);
-    s->send_coroutine = NULL;
     qemu_co_mutex_unlock(&s->send_mutex);
     return rc;
 }
 
-static void nbd_co_receive_reply(NbdClientSession *s,
-                                 struct nbd_request *request,
-                                 struct nbd_reply *reply,
+static void nbd_co_receive_reply(NBDClientSession *s,
+                                 NBDRequest *request,
+                                 NBDReply *reply,
                                  QEMUIOVector *qiov)
 {
     int ret;
 
-    /* Wait until we're woken up by the read handler.  TODO: perhaps
-     * peek at the next reply and avoid yielding if it's ours?  */
+    /* Wait until we're woken up by nbd_read_reply_entry.  */
     qemu_coroutine_yield();
     *reply = s->reply;
     if (reply->handle != request->handle ||
@@ -194,13 +176,13 @@ static void nbd_co_receive_reply(NbdClientSession *s,
     }
 }
 
-static void nbd_coroutine_start(NbdClientSession *s,
-   struct nbd_request *request)
+static void nbd_coroutine_start(NBDClientSession *s,
+                                NBDRequest *request)
 {
     /* Poor man semaphore.  The free_sema is locked when no other request
      * can be accepted, and unlocked after receiving one reply.  */
     if (s->in_flight == MAX_NBD_REQUESTS) {
-        qemu_co_queue_wait(&s->free_sema);
+        qemu_co_queue_wait(&s->free_sema, NULL);
         assert(s->in_flight < MAX_NBD_REQUESTS);
     }
     s->in_flight++;
@@ -208,26 +190,32 @@ static void nbd_coroutine_start(NbdClientSession *s,
     /* s->recv_coroutine[i] is set as soon as we get the send_lock.  */
 }
 
-static void nbd_coroutine_end(NbdClientSession *s,
-    struct nbd_request *request)
+static void nbd_coroutine_end(BlockDriverState *bs,
+                              NBDRequest *request)
 {
+    NBDClientSession *s = nbd_get_client_session(bs);
     int i = HANDLE_TO_INDEX(s, request->handle);
+
     s->recv_coroutine[i] = NULL;
-    if (s->in_flight-- == MAX_NBD_REQUESTS) {
-        qemu_co_queue_next(&s->free_sema);
+    s->in_flight--;
+    qemu_co_queue_next(&s->free_sema);
+
+    /* Kick the read_reply_co to get the next reply.  */
+    if (s->read_reply_co) {
+        aio_co_wake(s->read_reply_co);
     }
 }
 
 int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
                          uint64_t bytes, QEMUIOVector *qiov, int flags)
 {
-    NbdClientSession *client = nbd_get_client_session(bs);
-    struct nbd_request request = {
+    NBDClientSession *client = nbd_get_client_session(bs);
+    NBDRequest request = {
         .type = NBD_CMD_READ,
         .from = offset,
         .len = bytes,
     };
-    struct nbd_reply reply;
+    NBDReply reply;
     ssize_t ret;
 
     assert(bytes <= NBD_MAX_BUFFER_SIZE);
@@ -240,25 +228,25 @@ int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
     } else {
         nbd_co_receive_reply(client, &request, &reply, qiov);
     }
-    nbd_coroutine_end(client, &request);
+    nbd_coroutine_end(bs, &request);
     return -reply.error;
 }
 
 int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
                           uint64_t bytes, QEMUIOVector *qiov, int flags)
 {
-    NbdClientSession *client = nbd_get_client_session(bs);
-    struct nbd_request request = {
+    NBDClientSession *client = nbd_get_client_session(bs);
+    NBDRequest request = {
         .type = NBD_CMD_WRITE,
         .from = offset,
         .len = bytes,
     };
-    struct nbd_reply reply;
+    NBDReply reply;
     ssize_t ret;
 
     if (flags & BDRV_REQ_FUA) {
         assert(client->nbdflags & NBD_FLAG_SEND_FUA);
-        request.type |= NBD_CMD_FLAG_FUA;
+        request.flags |= NBD_CMD_FLAG_FUA;
     }
 
     assert(bytes <= NBD_MAX_BUFFER_SIZE);
@@ -270,15 +258,50 @@ int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
     } else {
         nbd_co_receive_reply(client, &request, &reply, NULL);
     }
-    nbd_coroutine_end(client, &request);
+    nbd_coroutine_end(bs, &request);
+    return -reply.error;
+}
+
+int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
+                                int count, BdrvRequestFlags flags)
+{
+    ssize_t ret;
+    NBDClientSession *client = nbd_get_client_session(bs);
+    NBDRequest request = {
+        .type = NBD_CMD_WRITE_ZEROES,
+        .from = offset,
+        .len = count,
+    };
+    NBDReply reply;
+
+    if (!(client->nbdflags & NBD_FLAG_SEND_WRITE_ZEROES)) {
+        return -ENOTSUP;
+    }
+
+    if (flags & BDRV_REQ_FUA) {
+        assert(client->nbdflags & NBD_FLAG_SEND_FUA);
+        request.flags |= NBD_CMD_FLAG_FUA;
+    }
+    if (!(flags & BDRV_REQ_MAY_UNMAP)) {
+        request.flags |= NBD_CMD_FLAG_NO_HOLE;
+    }
+
+    nbd_coroutine_start(client, &request);
+    ret = nbd_co_send_request(bs, &request, NULL);
+    if (ret < 0) {
+        reply.error = -ret;
+    } else {
+        nbd_co_receive_reply(client, &request, &reply, NULL);
+    }
+    nbd_coroutine_end(bs, &request);
     return -reply.error;
 }
 
 int nbd_client_co_flush(BlockDriverState *bs)
 {
-    NbdClientSession *client = nbd_get_client_session(bs);
-    struct nbd_request request = { .type = NBD_CMD_FLUSH };
-    struct nbd_reply reply;
+    NBDClientSession *client = nbd_get_client_session(bs);
+    NBDRequest request = { .type = NBD_CMD_FLUSH };
+    NBDReply reply;
     ssize_t ret;
 
     if (!(client->nbdflags & NBD_FLAG_SEND_FLUSH)) {
@@ -295,19 +318,19 @@ int nbd_client_co_flush(BlockDriverState *bs)
     } else {
         nbd_co_receive_reply(client, &request, &reply, NULL);
     }
-    nbd_coroutine_end(client, &request);
+    nbd_coroutine_end(bs, &request);
     return -reply.error;
 }
 
 int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int count)
 {
-    NbdClientSession *client = nbd_get_client_session(bs);
-    struct nbd_request request = {
+    NBDClientSession *client = nbd_get_client_session(bs);
+    NBDRequest request = {
         .type = NBD_CMD_TRIM,
         .from = offset,
         .len = count,
     };
-    struct nbd_reply reply;
+    NBDReply reply;
     ssize_t ret;
 
     if (!(client->nbdflags & NBD_FLAG_SEND_TRIM)) {
@@ -321,33 +344,29 @@ int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int count)
     } else {
         nbd_co_receive_reply(client, &request, &reply, NULL);
     }
-    nbd_coroutine_end(client, &request);
+    nbd_coroutine_end(bs, &request);
     return -reply.error;
 
 }
 
 void nbd_client_detach_aio_context(BlockDriverState *bs)
 {
-    aio_set_fd_handler(bdrv_get_aio_context(bs),
-                       nbd_get_client_session(bs)->sioc->fd,
-                       false, NULL, NULL, NULL);
+    NBDClientSession *client = nbd_get_client_session(bs);
+    qio_channel_detach_aio_context(QIO_CHANNEL(client->sioc));
 }
 
 void nbd_client_attach_aio_context(BlockDriverState *bs,
                                    AioContext *new_context)
 {
-    aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sioc->fd,
-                       false, nbd_reply_ready, NULL, bs);
+    NBDClientSession *client = nbd_get_client_session(bs);
+    qio_channel_attach_aio_context(QIO_CHANNEL(client->sioc), new_context);
+    aio_co_schedule(new_context, client->read_reply_co);
 }
 
 void nbd_client_close(BlockDriverState *bs)
 {
-    NbdClientSession *client = nbd_get_client_session(bs);
-    struct nbd_request request = {
-        .type = NBD_CMD_DISC,
-        .from = 0,
-        .len = 0
-    };
+    NBDClientSession *client = nbd_get_client_session(bs);
+    NBDRequest request = { .type = NBD_CMD_DISC };
 
     if (client->ioc == NULL) {
         return;
@@ -365,7 +384,7 @@ int nbd_client_init(BlockDriverState *bs,
                     const char *hostname,
                     Error **errp)
 {
-    NbdClientSession *client = nbd_get_client_session(bs);
+    NBDClientSession *client = nbd_get_client_session(bs);
     int ret;
 
     /* NBD handshake */
@@ -383,6 +402,10 @@ int nbd_client_init(BlockDriverState *bs,
     }
     if (client->nbdflags & NBD_FLAG_SEND_FUA) {
         bs->supported_write_flags = BDRV_REQ_FUA;
+        bs->supported_zero_flags |= BDRV_REQ_FUA;
+    }
+    if (client->nbdflags & NBD_FLAG_SEND_WRITE_ZEROES) {
+        bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
     }
 
     qemu_co_mutex_init(&client->send_mutex);
@@ -398,7 +421,7 @@ int nbd_client_init(BlockDriverState *bs,
     /* Now that we're connected, set the socket to be non-blocking and
      * kick the reply mechanism.  */
     qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
-
+    client->read_reply_co = qemu_coroutine_create(nbd_read_reply_entry, client);
     nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
 
     logout("Established connection with NBD server\n");