]> git.proxmox.com Git - mirror_qemu.git/blobdiff - block/nbd.c
target/arm: Explicitly select short-format FSR for M-profile
[mirror_qemu.git] / block / nbd.c
index 8ff6daf43d4665e732ef83f858e59dbe3abd5f5b..a3f8f8a9d5efc72ac95a4ed25406dc33d8d67fa2 100644 (file)
@@ -35,7 +35,6 @@
 #include "qemu/option.h"
 #include "qemu/cutils.h"
 #include "qemu/main-loop.h"
-#include "qemu/atomic.h"
 
 #include "qapi/qapi-visit-sockets.h"
 #include "qapi/qmp/qstring.h"
@@ -71,25 +70,39 @@ typedef struct BDRVNBDState {
     QIOChannel *ioc; /* The current I/O channel */
     NBDExportInfo info;
 
-    CoMutex send_mutex;
+    /*
+     * Protects state, free_sema, in_flight, requests[].coroutine,
+     * reconnect_delay_timer.
+     */
+    QemuMutex requests_lock;
+    NBDClientState state;
     CoQueue free_sema;
+    unsigned in_flight;
+    NBDClientRequest requests[MAX_NBD_REQUESTS];
+    QEMUTimer *reconnect_delay_timer;
+
+    /* Protects sending data on the socket.  */
+    CoMutex send_mutex;
 
+    /*
+     * Protects receiving reply headers from the socket, as well as the
+     * fields reply and requests[].receiving
+     */
     CoMutex receive_mutex;
-    int in_flight;
-    NBDClientState state;
+    NBDReply reply;
 
-    QEMUTimer *reconnect_delay_timer;
+    QEMUTimer *open_timer;
 
-    NBDClientRequest requests[MAX_NBD_REQUESTS];
-    NBDReply reply;
     BlockDriverState *bs;
 
     /* Connection parameters */
     uint32_t reconnect_delay;
+    uint32_t open_timeout;
     SocketAddress *saddr;
-    char *export, *tlscredsid;
+    char *export;
+    char *tlscredsid;
     QCryptoTLSCreds *tlscreds;
-    const char *hostname;
+    char *tlshostname;
     char *x_dirty_bitmap;
     bool alloc_depth;
 
@@ -107,6 +120,10 @@ static void nbd_clear_bdrvstate(BlockDriverState *bs)
 
     yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
 
+    /* Must not leave timers behind that would access freed data */
+    assert(!s->reconnect_delay_timer);
+    assert(!s->open_timer);
+
     object_unref(OBJECT(s->tlscreds));
     qapi_free_SocketAddress(s->saddr);
     s->saddr = NULL;
@@ -114,16 +131,14 @@ static void nbd_clear_bdrvstate(BlockDriverState *bs)
     s->export = NULL;
     g_free(s->tlscredsid);
     s->tlscredsid = NULL;
+    g_free(s->tlshostname);
+    s->tlshostname = NULL;
     g_free(s->x_dirty_bitmap);
     s->x_dirty_bitmap = NULL;
 }
 
-static bool nbd_client_connected(BDRVNBDState *s)
-{
-    return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED;
-}
-
-static bool nbd_recv_coroutine_wake_one(NBDClientRequest *req)
+/* Called with s->receive_mutex taken.  */
+static bool coroutine_fn nbd_recv_coroutine_wake_one(NBDClientRequest *req)
 {
     if (req->receiving) {
         req->receiving = false;
@@ -134,33 +149,39 @@ static bool nbd_recv_coroutine_wake_one(NBDClientRequest *req)
     return false;
 }
 
-static void nbd_recv_coroutines_wake(BDRVNBDState *s, bool all)
+static void coroutine_fn nbd_recv_coroutines_wake(BDRVNBDState *s)
 {
     int i;
 
+    QEMU_LOCK_GUARD(&s->receive_mutex);
     for (i = 0; i < MAX_NBD_REQUESTS; i++) {
-        if (nbd_recv_coroutine_wake_one(&s->requests[i]) && !all) {
+        if (nbd_recv_coroutine_wake_one(&s->requests[i])) {
             return;
         }
     }
 }
 
-static void nbd_channel_error(BDRVNBDState *s, int ret)
+/* Called with s->requests_lock held.  */
+static void coroutine_fn nbd_channel_error_locked(BDRVNBDState *s, int ret)
 {
-    if (nbd_client_connected(s)) {
+    if (s->state == NBD_CLIENT_CONNECTED) {
         qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
     }
 
     if (ret == -EIO) {
-        if (nbd_client_connected(s)) {
+        if (s->state == NBD_CLIENT_CONNECTED) {
             s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
                                             NBD_CLIENT_CONNECTING_NOWAIT;
         }
     } else {
         s->state = NBD_CLIENT_QUIT;
     }
+}
 
-    nbd_recv_coroutines_wake(s, true);
+static void coroutine_fn nbd_channel_error(BDRVNBDState *s, int ret)
+{
+    QEMU_LOCK_GUARD(&s->requests_lock);
+    nbd_channel_error_locked(s, ret);
 }
 
 static void reconnect_delay_timer_del(BDRVNBDState *s)
@@ -175,23 +196,18 @@ static void reconnect_delay_timer_cb(void *opaque)
 {
     BDRVNBDState *s = opaque;
 
-    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
-        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
-        nbd_co_establish_connection_cancel(s->conn);
-        while (qemu_co_enter_next(&s->free_sema, NULL)) {
-            /* Resume all queued requests */
+    reconnect_delay_timer_del(s);
+    WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
+        if (s->state != NBD_CLIENT_CONNECTING_WAIT) {
+            return;
         }
+        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
     }
-
-    reconnect_delay_timer_del(s);
+    nbd_co_establish_connection_cancel(s->conn);
 }
 
 static void reconnect_delay_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
 {
-    if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTING_WAIT) {
-        return;
-    }
-
     assert(!s->reconnect_delay_timer);
     s->reconnect_delay_timer = aio_timer_new(bdrv_get_aio_context(s->bs),
                                              QEMU_CLOCK_REALTIME,
@@ -214,19 +230,44 @@ static void nbd_teardown_connection(BlockDriverState *bs)
         s->ioc = NULL;
     }
 
-    s->state = NBD_CLIENT_QUIT;
+    WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
+        s->state = NBD_CLIENT_QUIT;
+    }
 }
 
-static bool nbd_client_connecting(BDRVNBDState *s)
+static void open_timer_del(BDRVNBDState *s)
 {
-    NBDClientState state = qatomic_load_acquire(&s->state);
-    return state == NBD_CLIENT_CONNECTING_WAIT ||
-        state == NBD_CLIENT_CONNECTING_NOWAIT;
+    if (s->open_timer) {
+        timer_free(s->open_timer);
+        s->open_timer = NULL;
+    }
 }
 
-static bool nbd_client_connecting_wait(BDRVNBDState *s)
+static void open_timer_cb(void *opaque)
 {
-    return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
+    BDRVNBDState *s = opaque;
+
+    nbd_co_establish_connection_cancel(s->conn);
+    open_timer_del(s);
+}
+
+static void open_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
+{
+    assert(!s->open_timer);
+    s->open_timer = aio_timer_new(bdrv_get_aio_context(s->bs),
+                                  QEMU_CLOCK_REALTIME,
+                                  SCALE_NS,
+                                  open_timer_cb, s);
+    timer_mod(s->open_timer, expire_time_ns);
+}
+
+static bool nbd_client_will_reconnect(BDRVNBDState *s)
+{
+    /*
+     * Called only after a socket error, so this is not performance sensitive.
+     */
+    QEMU_LOCK_GUARD(&s->requests_lock);
+    return s->state == NBD_CLIENT_CONNECTING_WAIT;
 }
 
 /*
@@ -275,12 +316,13 @@ static int nbd_handle_updated_info(BlockDriverState *bs, Error **errp)
 }
 
 int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
-                                                Error **errp)
+                                                bool blocking, Error **errp)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     int ret;
-    bool blocking = nbd_client_connecting_wait(s);
+    IO_CODE();
 
+    assert_bdrv_graph_readable();
     assert(!s->ioc);
 
     s->ioc = nbd_co_establish_connection(s->conn, &s->info, blocking, errp);
@@ -313,35 +355,46 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
     qio_channel_attach_aio_context(s->ioc, bdrv_get_aio_context(bs));
 
     /* successfully connected */
-    s->state = NBD_CLIENT_CONNECTED;
-    qemu_co_queue_restart_all(&s->free_sema);
+    WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
+        s->state = NBD_CLIENT_CONNECTED;
+    }
 
     return 0;
 }
 
-/* called under s->send_mutex */
-static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
+/* Called with s->requests_lock held.  */
+static bool nbd_client_connecting(BDRVNBDState *s)
 {
+    return s->state == NBD_CLIENT_CONNECTING_WAIT ||
+        s->state == NBD_CLIENT_CONNECTING_NOWAIT;
+}
+
+/* Called with s->requests_lock taken.  */
+static void coroutine_fn GRAPH_RDLOCK nbd_reconnect_attempt(BDRVNBDState *s)
+{
+    int ret;
+    bool blocking = s->state == NBD_CLIENT_CONNECTING_WAIT;
+
+    /*
+     * Now we are sure that nobody is accessing the channel, and no one will
+     * try until we set the state to CONNECTED.
+     */
     assert(nbd_client_connecting(s));
-    assert(s->in_flight == 0);
+    assert(s->in_flight == 1);
 
-    if (nbd_client_connecting_wait(s) && s->reconnect_delay &&
-        !s->reconnect_delay_timer)
-    {
+    trace_nbd_reconnect_attempt(s->bs->in_flight);
+
+    if (blocking && !s->reconnect_delay_timer) {
         /*
-         * It's first reconnect attempt after switching to
+         * It's the first reconnect attempt after switching to
          * NBD_CLIENT_CONNECTING_WAIT
          */
+        g_assert(s->reconnect_delay);
         reconnect_delay_timer_init(s,
             qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
             s->reconnect_delay * NANOSECONDS_PER_SECOND);
     }
 
-    /*
-     * Now we are sure that nobody is accessing the channel, and no one will
-     * try until we set the state to CONNECTED.
-     */
-
     /* Finalize previous connection if any */
     if (s->ioc) {
         qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
@@ -351,7 +404,17 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
         s->ioc = NULL;
     }
 
-    nbd_co_do_establish_connection(s->bs, NULL);
+    qemu_mutex_unlock(&s->requests_lock);
+    ret = nbd_co_do_establish_connection(s->bs, blocking, NULL);
+    trace_nbd_reconnect_attempt_result(ret, s->bs->in_flight);
+    qemu_mutex_lock(&s->requests_lock);
+
+    /*
+     * The reconnect attempt is done (maybe successfully, maybe not), so
+     * we no longer need this timer.  Delete it so it will not outlive
+     * this I/O request (so draining removes all timers).
+     */
+    reconnect_delay_timer_del(s);
 }
 
 static coroutine_fn int nbd_receive_replies(BDRVNBDState *s, uint64_t handle)
@@ -366,10 +429,6 @@ static coroutine_fn int nbd_receive_replies(BDRVNBDState *s, uint64_t handle)
             return 0;
         }
 
-        if (!nbd_client_connected(s)) {
-            return -EIO;
-        }
-
         if (s->reply.handle != 0) {
             /*
              * Some other request is being handled now. It should already be
@@ -384,11 +443,10 @@ static coroutine_fn int nbd_receive_replies(BDRVNBDState *s, uint64_t handle)
 
             qemu_coroutine_yield();
             /*
-             * We may be woken for 3 reasons:
+             * We may be woken for 2 reasons:
              * 1. From this function, executing in parallel coroutine, when our
              *    handle is received.
-             * 2. From nbd_channel_error(), when connection is lost.
-             * 3. From nbd_co_receive_one_chunk(), when previous request is
+             * 2. From nbd_co_receive_one_chunk(), when previous request is
              *    finished and s->reply.handle set to 0.
              * Anyway, it's OK to lock the mutex and go to the next iteration.
              */
@@ -410,51 +468,43 @@ static coroutine_fn int nbd_receive_replies(BDRVNBDState *s, uint64_t handle)
             nbd_channel_error(s, -EINVAL);
             return -EINVAL;
         }
-        if (s->reply.handle == handle) {
-            /* We are done */
-            return 0;
-        }
         ind2 = HANDLE_TO_INDEX(s, s->reply.handle);
         if (ind2 >= MAX_NBD_REQUESTS || !s->requests[ind2].coroutine) {
-            /*
-             * We only check that ind2 request exists. But don't check
-             * whether it is now waiting for the reply header or
-             * not. We can't just check s->requests[ind2].receiving:
-             * ind2 request may wait in trying to lock
-             * receive_mutex. So that's a TODO.
-             */
             nbd_channel_error(s, -EINVAL);
             return -EINVAL;
         }
+        if (s->reply.handle == handle) {
+            /* We are done */
+            return 0;
+        }
         nbd_recv_coroutine_wake_one(&s->requests[ind2]);
     }
 }
 
-static int nbd_co_send_request(BlockDriverState *bs,
-                               NBDRequest *request,
-                               QEMUIOVector *qiov)
+static int coroutine_fn GRAPH_RDLOCK
+nbd_co_send_request(BlockDriverState *bs, NBDRequest *request,
+                    QEMUIOVector *qiov)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     int rc, i = -1;
 
-    qemu_co_mutex_lock(&s->send_mutex);
-
+    qemu_mutex_lock(&s->requests_lock);
     while (s->in_flight == MAX_NBD_REQUESTS ||
-           (!nbd_client_connected(s) && s->in_flight > 0))
-    {
-        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
-    }
-
-    if (nbd_client_connecting(s)) {
-        nbd_reconnect_attempt(s);
-    }
-
-    if (!nbd_client_connected(s)) {
-        rc = -EIO;
-        goto err;
+           (s->state != NBD_CLIENT_CONNECTED && s->in_flight > 0)) {
+        qemu_co_queue_wait(&s->free_sema, &s->requests_lock);
     }
 
     s->in_flight++;
+    if (s->state != NBD_CLIENT_CONNECTED) {
+        if (nbd_client_connecting(s)) {
+            nbd_reconnect_attempt(s);
+            qemu_co_queue_restart_all(&s->free_sema);
+        }
+        if (s->state != NBD_CLIENT_CONNECTED) {
+            rc = -EIO;
+            goto err;
+        }
+    }
 
     for (i = 0; i < MAX_NBD_REQUESTS; i++) {
         if (s->requests[i].coroutine == NULL) {
@@ -462,13 +512,13 @@ static int nbd_co_send_request(BlockDriverState *bs,
         }
     }
 
-    g_assert(qemu_in_coroutine());
     assert(i < MAX_NBD_REQUESTS);
-
     s->requests[i].coroutine = qemu_coroutine_self();
     s->requests[i].offset = request->from;
     s->requests[i].receiving = false;
+    qemu_mutex_unlock(&s->requests_lock);
 
+    qemu_co_mutex_lock(&s->send_mutex);
     request->handle = INDEX_TO_HANDLE(s, i);
 
     assert(s->ioc);
@@ -476,29 +526,27 @@ static int nbd_co_send_request(BlockDriverState *bs,
     if (qiov) {
         qio_channel_set_cork(s->ioc, true);
         rc = nbd_send_request(s->ioc, request);
-        if (nbd_client_connected(s) && rc >= 0) {
-            if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
-                                       NULL) < 0) {
-                rc = -EIO;
-            }
-        } else if (rc >= 0) {
+        if (rc >= 0 && qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
+                                              NULL) < 0) {
             rc = -EIO;
         }
         qio_channel_set_cork(s->ioc, false);
     } else {
         rc = nbd_send_request(s->ioc, request);
     }
+    qemu_co_mutex_unlock(&s->send_mutex);
 
-err:
     if (rc < 0) {
-        nbd_channel_error(s, rc);
+        qemu_mutex_lock(&s->requests_lock);
+err:
+        nbd_channel_error_locked(s, rc);
         if (i != -1) {
             s->requests[i].coroutine = NULL;
-            s->in_flight--;
-            qemu_co_queue_next(&s->free_sema);
         }
+        s->in_flight--;
+        qemu_co_queue_next(&s->free_sema);
+        qemu_mutex_unlock(&s->requests_lock);
     }
-    qemu_co_mutex_unlock(&s->send_mutex);
     return rc;
 }
 
@@ -685,9 +733,9 @@ static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
     return 0;
 }
 
-static int nbd_co_receive_offset_data_payload(BDRVNBDState *s,
-                                              uint64_t orig_offset,
-                                              QEMUIOVector *qiov, Error **errp)
+static int coroutine_fn
+nbd_co_receive_offset_data_payload(BDRVNBDState *s, uint64_t orig_offset,
+                                   QEMUIOVector *qiov, Error **errp)
 {
     QEMUIOVector sub_qiov;
     uint64_t offset;
@@ -793,8 +841,8 @@ static coroutine_fn int nbd_co_do_receive_one_chunk(
     }
     *request_ret = 0;
 
-    nbd_receive_replies(s, handle);
-    if (!nbd_client_connected(s)) {
+    ret = nbd_receive_replies(s, handle);
+    if (ret < 0) {
         error_setg(errp, "Connection closed");
         return -EIO;
     }
@@ -886,7 +934,7 @@ static coroutine_fn int nbd_co_receive_one_chunk(
     }
     s->reply.handle = 0;
 
-    nbd_recv_coroutines_wake(s, false);
+    nbd_recv_coroutines_wake(s);
 
     return ret;
 }
@@ -936,21 +984,17 @@ static void nbd_iter_request_error(NBDReplyChunkIter *iter, int ret)
  * nbd_reply_chunk_iter_receive
  * The pointer stored in @payload requires g_free() to free it.
  */
-static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
-                                         NBDReplyChunkIter *iter,
-                                         uint64_t handle,
-                                         QEMUIOVector *qiov, NBDReply *reply,
-                                         void **payload)
+static bool coroutine_fn nbd_reply_chunk_iter_receive(BDRVNBDState *s,
+                                                      NBDReplyChunkIter *iter,
+                                                      uint64_t handle,
+                                                      QEMUIOVector *qiov,
+                                                      NBDReply *reply,
+                                                      void **payload)
 {
     int ret, request_ret;
     NBDReply local_reply;
     NBDStructuredReplyChunk *chunk;
     Error *local_err = NULL;
-    if (!nbd_client_connected(s)) {
-        error_setg(&local_err, "Connection closed");
-        nbd_iter_channel_error(iter, -EIO, &local_err);
-        goto break_loop;
-    }
 
     if (iter->done) {
         /* Previous iteration was last. */
@@ -971,7 +1015,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
     }
 
     /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
-    if (nbd_reply_is_simple(reply) || !nbd_client_connected(s)) {
+    if (nbd_reply_is_simple(reply) || iter->ret < 0) {
         goto break_loop;
     }
 
@@ -993,18 +1037,17 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
     return true;
 
 break_loop:
+    qemu_mutex_lock(&s->requests_lock);
     s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;
-
-    qemu_co_mutex_lock(&s->send_mutex);
     s->in_flight--;
     qemu_co_queue_next(&s->free_sema);
-    qemu_co_mutex_unlock(&s->send_mutex);
+    qemu_mutex_unlock(&s->requests_lock);
 
     return false;
 }
 
-static int nbd_co_receive_return_code(BDRVNBDState *s, uint64_t handle,
-                                      int *request_ret, Error **errp)
+static int coroutine_fn nbd_co_receive_return_code(BDRVNBDState *s, uint64_t handle,
+                                                   int *request_ret, Error **errp)
 {
     NBDReplyChunkIter iter;
 
@@ -1017,9 +1060,9 @@ static int nbd_co_receive_return_code(BDRVNBDState *s, uint64_t handle,
     return iter.ret;
 }
 
-static int nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t handle,
-                                        uint64_t offset, QEMUIOVector *qiov,
-                                        int *request_ret, Error **errp)
+static int coroutine_fn nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t handle,
+                                                     uint64_t offset, QEMUIOVector *qiov,
+                                                     int *request_ret, Error **errp)
 {
     NBDReplyChunkIter iter;
     NBDReply reply;
@@ -1069,10 +1112,10 @@ static int nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t handle,
     return iter.ret;
 }
 
-static int nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
-                                            uint64_t handle, uint64_t length,
-                                            NBDExtent *extent,
-                                            int *request_ret, Error **errp)
+static int coroutine_fn nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
+                                                         uint64_t handle, uint64_t length,
+                                                         NBDExtent *extent,
+                                                         int *request_ret, Error **errp)
 {
     NBDReplyChunkIter iter;
     NBDReply reply;
@@ -1129,8 +1172,9 @@ static int nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
     return iter.ret;
 }
 
-static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
-                          QEMUIOVector *write_qiov)
+static int coroutine_fn GRAPH_RDLOCK
+nbd_co_request(BlockDriverState *bs, NBDRequest *request,
+               QEMUIOVector *write_qiov)
 {
     int ret, request_ret;
     Error *local_err = NULL;
@@ -1161,14 +1205,14 @@ static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
             error_free(local_err);
             local_err = NULL;
         }
-    } while (ret < 0 && nbd_client_connecting_wait(s));
+    } while (ret < 0 && nbd_client_will_reconnect(s));
 
     return ret ? ret : request_ret;
 }
 
-static int nbd_client_co_preadv(BlockDriverState *bs, int64_t offset,
-                                int64_t bytes, QEMUIOVector *qiov,
-                                BdrvRequestFlags flags)
+static int coroutine_fn GRAPH_RDLOCK
+nbd_client_co_preadv(BlockDriverState *bs, int64_t offset, int64_t bytes,
+                     QEMUIOVector *qiov, BdrvRequestFlags flags)
 {
     int ret, request_ret;
     Error *local_err = NULL;
@@ -1180,7 +1224,6 @@ static int nbd_client_co_preadv(BlockDriverState *bs, int64_t offset,
     };
 
     assert(bytes <= NBD_MAX_BUFFER_SIZE);
-    assert(!flags);
 
     if (!bytes) {
         return 0;
@@ -1220,14 +1263,14 @@ static int nbd_client_co_preadv(BlockDriverState *bs, int64_t offset,
             error_free(local_err);
             local_err = NULL;
         }
-    } while (ret < 0 && nbd_client_connecting_wait(s));
+    } while (ret < 0 && nbd_client_will_reconnect(s));
 
     return ret ? ret : request_ret;
 }
 
-static int nbd_client_co_pwritev(BlockDriverState *bs, int64_t offset,
-                                 int64_t bytes, QEMUIOVector *qiov,
-                                 BdrvRequestFlags flags)
+static int coroutine_fn GRAPH_RDLOCK
+nbd_client_co_pwritev(BlockDriverState *bs, int64_t offset, int64_t bytes,
+                      QEMUIOVector *qiov, BdrvRequestFlags flags)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     NBDRequest request = {
@@ -1250,8 +1293,9 @@ static int nbd_client_co_pwritev(BlockDriverState *bs, int64_t offset,
     return nbd_co_request(bs, &request, qiov);
 }
 
-static int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
-                                       int64_t bytes, BdrvRequestFlags flags)
+static int coroutine_fn GRAPH_RDLOCK
+nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset, int64_t bytes,
+                            BdrvRequestFlags flags)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     NBDRequest request = {
@@ -1285,7 +1329,7 @@ static int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
     return nbd_co_request(bs, &request, NULL);
 }
 
-static int nbd_client_co_flush(BlockDriverState *bs)
+static int coroutine_fn GRAPH_RDLOCK nbd_client_co_flush(BlockDriverState *bs)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     NBDRequest request = { .type = NBD_CMD_FLUSH };
@@ -1300,8 +1344,8 @@ static int nbd_client_co_flush(BlockDriverState *bs)
     return nbd_co_request(bs, &request, NULL);
 }
 
-static int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset,
-                                  int64_t bytes)
+static int coroutine_fn GRAPH_RDLOCK
+nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int64_t bytes)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     NBDRequest request = {
@@ -1320,7 +1364,7 @@ static int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset,
     return nbd_co_request(bs, &request, NULL);
 }
 
-static int coroutine_fn nbd_client_co_block_status(
+static int coroutine_fn GRAPH_RDLOCK nbd_client_co_block_status(
         BlockDriverState *bs, bool want_zero, int64_t offset, int64_t bytes,
         int64_t *pnum, int64_t *map, BlockDriverState **file)
 {
@@ -1378,7 +1422,7 @@ static int coroutine_fn nbd_client_co_block_status(
             error_free(local_err);
             local_err = NULL;
         }
-    } while (ret < 0 && nbd_client_connecting_wait(s));
+    } while (ret < 0 && nbd_client_will_reconnect(s));
 
     if (ret < 0 || request_ret < 0) {
         return ret ? ret : request_ret;
@@ -1410,8 +1454,9 @@ static void nbd_yank(void *opaque)
     BlockDriverState *bs = opaque;
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
-    qatomic_store_release(&s->state, NBD_CLIENT_QUIT);
+    QEMU_LOCK_GUARD(&s->requests_lock);
     qio_channel_shutdown(QIO_CHANNEL(s->ioc), QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+    s->state = NBD_CLIENT_QUIT;
 }
 
 static void nbd_client_close(BlockDriverState *bs)
@@ -1730,6 +1775,11 @@ static QemuOptsList nbd_runtime_opts = {
             .type = QEMU_OPT_STRING,
             .help = "ID of the TLS credentials to use",
         },
+        {
+            .name = "tls-hostname",
+            .type = QEMU_OPT_STRING,
+            .help = "Override hostname for validating TLS x509 certificate",
+        },
         {
             .name = "x-dirty-bitmap",
             .type = QEMU_OPT_STRING,
@@ -1747,6 +1797,15 @@ static QemuOptsList nbd_runtime_opts = {
                     "future requests before a successful reconnect will "
                     "immediately fail. Default 0",
         },
+        {
+            .name = "open-timeout",
+            .type = QEMU_OPT_NUMBER,
+            .help = "In seconds. If zero, the nbd driver tries the connection "
+                    "only once, and fails to open if the connection fails. "
+                    "If non-zero, the nbd driver will repeat connection "
+                    "attempts until successful or until @open-timeout seconds "
+                    "have elapsed. Default 0",
+        },
         { /* end of list */ }
     },
 };
@@ -1787,12 +1846,11 @@ static int nbd_process_options(BlockDriverState *bs, QDict *options,
             goto error;
         }
 
-        /* TODO SOCKET_ADDRESS_KIND_FD where fd has AF_INET or AF_INET6 */
-        if (s->saddr->type != SOCKET_ADDRESS_TYPE_INET) {
-            error_setg(errp, "TLS only supported over IP sockets");
-            goto error;
+        s->tlshostname = g_strdup(qemu_opt_get(opts, "tls-hostname"));
+        if (!s->tlshostname &&
+            s->saddr->type == SOCKET_ADDRESS_TYPE_INET) {
+            s->tlshostname = g_strdup(s->saddr->u.inet.host);
         }
-        s->hostname = s->saddr->u.inet.host;
     }
 
     s->x_dirty_bitmap = g_strdup(qemu_opt_get(opts, "x-dirty-bitmap"));
@@ -1802,6 +1860,7 @@ static int nbd_process_options(BlockDriverState *bs, QDict *options,
     }
 
     s->reconnect_delay = qemu_opt_get_number(opts, "reconnect-delay", 0);
+    s->open_timeout = qemu_opt_get_number(opts, "open-timeout", 0);
 
     ret = 0;
 
@@ -1817,8 +1876,9 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
     s->bs = bs;
-    qemu_co_mutex_init(&s->send_mutex);
+    qemu_mutex_init(&s->requests_lock);
     qemu_co_queue_init(&s->free_sema);
+    qemu_co_mutex_init(&s->send_mutex);
     qemu_co_mutex_init(&s->receive_mutex);
 
     if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
@@ -1831,29 +1891,38 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
     }
 
     s->conn = nbd_client_connection_new(s->saddr, true, s->export,
-                                        s->x_dirty_bitmap, s->tlscreds);
+                                        s->x_dirty_bitmap, s->tlscreds,
+                                        s->tlshostname);
+
+    if (s->open_timeout) {
+        nbd_client_connection_enable_retry(s->conn);
+        open_timer_init(s, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
+                        s->open_timeout * NANOSECONDS_PER_SECOND);
+    }
 
-    /* TODO: Configurable retry-until-timeout behaviour. */
     s->state = NBD_CLIENT_CONNECTING_WAIT;
-    ret = nbd_do_establish_connection(bs, errp);
+    ret = nbd_do_establish_connection(bs, true, errp);
     if (ret < 0) {
         goto fail;
     }
 
+    /*
+     * The connect attempt is done, so we no longer need this timer.
+     * Delete it, because we do not want it to be around when this node
+     * is drained or closed.
+     */
+    open_timer_del(s);
+
     nbd_client_connection_enable_retry(s->conn);
 
     return 0;
 
 fail:
+    open_timer_del(s);
     nbd_clear_bdrvstate(bs);
     return ret;
 }
 
-static int nbd_co_flush(BlockDriverState *bs)
-{
-    return nbd_client_co_flush(bs);
-}
-
 static void nbd_refresh_limits(BlockDriverState *bs, Error **errp)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
@@ -1921,7 +1990,7 @@ static int coroutine_fn nbd_co_truncate(BlockDriverState *bs, int64_t offset,
     return 0;
 }
 
-static int64_t nbd_getlength(BlockDriverState *bs)
+static int64_t coroutine_fn nbd_co_getlength(BlockDriverState *bs)
 {
     BDRVNBDState *s = bs->opaque;
 
@@ -1979,6 +2048,7 @@ static const char *const nbd_strong_runtime_opts[] = {
     "port",
     "export",
     "tls-creds",
+    "tls-hostname",
     "server.",
 
     NULL
@@ -1990,14 +2060,51 @@ static void nbd_cancel_in_flight(BlockDriverState *bs)
 
     reconnect_delay_timer_del(s);
 
+    qemu_mutex_lock(&s->requests_lock);
     if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
         s->state = NBD_CLIENT_CONNECTING_NOWAIT;
-        qemu_co_queue_restart_all(&s->free_sema);
     }
+    qemu_mutex_unlock(&s->requests_lock);
 
     nbd_co_establish_connection_cancel(s->conn);
 }
 
+static void nbd_attach_aio_context(BlockDriverState *bs,
+                                   AioContext *new_context)
+{
+    BDRVNBDState *s = bs->opaque;
+
+    /* The open_timer is used only during nbd_open() */
+    assert(!s->open_timer);
+
+    /*
+     * The reconnect_delay_timer is scheduled in I/O paths when the
+     * connection is lost, to cancel the reconnection attempt after a
+     * given time.  Once this attempt is done (successfully or not),
+     * nbd_reconnect_attempt() ensures the timer is deleted before the
+     * respective I/O request is resumed.
+     * Since the AioContext can only be changed when a node is drained,
+     * the reconnect_delay_timer cannot be active here.
+     */
+    assert(!s->reconnect_delay_timer);
+
+    if (s->ioc) {
+        qio_channel_attach_aio_context(s->ioc, new_context);
+    }
+}
+
+static void nbd_detach_aio_context(BlockDriverState *bs)
+{
+    BDRVNBDState *s = bs->opaque;
+
+    assert(!s->open_timer);
+    assert(!s->reconnect_delay_timer);
+
+    if (s->ioc) {
+        qio_channel_detach_aio_context(s->ioc);
+    }
+}
+
 static BlockDriver bdrv_nbd = {
     .format_name                = "nbd",
     .protocol_name              = "nbd",
@@ -2011,16 +2118,19 @@ static BlockDriver bdrv_nbd = {
     .bdrv_co_pwritev            = nbd_client_co_pwritev,
     .bdrv_co_pwrite_zeroes      = nbd_client_co_pwrite_zeroes,
     .bdrv_close                 = nbd_close,
-    .bdrv_co_flush_to_os        = nbd_co_flush,
+    .bdrv_co_flush_to_os        = nbd_client_co_flush,
     .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
     .bdrv_refresh_limits        = nbd_refresh_limits,
     .bdrv_co_truncate           = nbd_co_truncate,
-    .bdrv_getlength             = nbd_getlength,
+    .bdrv_co_getlength          = nbd_co_getlength,
     .bdrv_refresh_filename      = nbd_refresh_filename,
     .bdrv_co_block_status       = nbd_client_co_block_status,
     .bdrv_dirname               = nbd_dirname,
     .strong_runtime_opts        = nbd_strong_runtime_opts,
     .bdrv_cancel_in_flight      = nbd_cancel_in_flight,
+
+    .bdrv_attach_aio_context    = nbd_attach_aio_context,
+    .bdrv_detach_aio_context    = nbd_detach_aio_context,
 };
 
 static BlockDriver bdrv_nbd_tcp = {
@@ -2036,16 +2146,19 @@ static BlockDriver bdrv_nbd_tcp = {
     .bdrv_co_pwritev            = nbd_client_co_pwritev,
     .bdrv_co_pwrite_zeroes      = nbd_client_co_pwrite_zeroes,
     .bdrv_close                 = nbd_close,
-    .bdrv_co_flush_to_os        = nbd_co_flush,
+    .bdrv_co_flush_to_os        = nbd_client_co_flush,
     .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
     .bdrv_refresh_limits        = nbd_refresh_limits,
     .bdrv_co_truncate           = nbd_co_truncate,
-    .bdrv_getlength             = nbd_getlength,
+    .bdrv_co_getlength          = nbd_co_getlength,
     .bdrv_refresh_filename      = nbd_refresh_filename,
     .bdrv_co_block_status       = nbd_client_co_block_status,
     .bdrv_dirname               = nbd_dirname,
     .strong_runtime_opts        = nbd_strong_runtime_opts,
     .bdrv_cancel_in_flight      = nbd_cancel_in_flight,
+
+    .bdrv_attach_aio_context    = nbd_attach_aio_context,
+    .bdrv_detach_aio_context    = nbd_detach_aio_context,
 };
 
 static BlockDriver bdrv_nbd_unix = {
@@ -2061,16 +2174,19 @@ static BlockDriver bdrv_nbd_unix = {
     .bdrv_co_pwritev            = nbd_client_co_pwritev,
     .bdrv_co_pwrite_zeroes      = nbd_client_co_pwrite_zeroes,
     .bdrv_close                 = nbd_close,
-    .bdrv_co_flush_to_os        = nbd_co_flush,
+    .bdrv_co_flush_to_os        = nbd_client_co_flush,
     .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
     .bdrv_refresh_limits        = nbd_refresh_limits,
     .bdrv_co_truncate           = nbd_co_truncate,
-    .bdrv_getlength             = nbd_getlength,
+    .bdrv_co_getlength          = nbd_co_getlength,
     .bdrv_refresh_filename      = nbd_refresh_filename,
     .bdrv_co_block_status       = nbd_client_co_block_status,
     .bdrv_dirname               = nbd_dirname,
     .strong_runtime_opts        = nbd_strong_runtime_opts,
     .bdrv_cancel_in_flight      = nbd_cancel_in_flight,
+
+    .bdrv_attach_aio_context    = nbd_attach_aio_context,
+    .bdrv_detach_aio_context    = nbd_detach_aio_context,
 };
 
 static void bdrv_nbd_init(void)