diff --git a/block/nbd.c b/block/nbd.c
index 3cbee762de8dd79290bec18c05f927634d11e641..bf2894ad5c9b8b5c695b24f4286c70b99bac84f5 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -35,7 +35,6 @@
 #include "qemu/option.h"
 #include "qemu/cutils.h"
 #include "qemu/main-loop.h"
-#include "qemu/atomic.h"
 
 #include "qapi/qapi-visit-sockets.h"
 #include "qapi/qmp/qstring.h"
@@ -57,7 +56,7 @@
 typedef struct {
     Coroutine *coroutine;
     uint64_t offset;        /* original offset of the request */
-    bool receiving;         /* waiting for connection_co? */
+    bool receiving;         /* sleeping in the yield in nbd_receive_replies */
 } NBDClientRequest;
 
 typedef enum NBDClientState {
@@ -71,29 +70,39 @@ typedef struct BDRVNBDState {
     QIOChannel *ioc; /* The current I/O channel */
     NBDExportInfo info;
 
-    CoMutex send_mutex;
-    CoQueue free_sema;
-    Coroutine *connection_co;
-    Coroutine *teardown_co;
-    QemuCoSleep reconnect_sleep;
-    bool drained;
-    bool wait_drained_end;
-    int in_flight;
+    /*
+     * Protects state, free_sema, in_flight, requests[].coroutine,
+     * reconnect_delay_timer.
+     */
+    QemuMutex requests_lock;
     NBDClientState state;
-    bool wait_in_flight;
-
+    CoQueue free_sema;
+    unsigned in_flight;
+    NBDClientRequest requests[MAX_NBD_REQUESTS];
     QEMUTimer *reconnect_delay_timer;
 
-    NBDClientRequest requests[MAX_NBD_REQUESTS];
+    /* Protects sending data on the socket.  */
+    CoMutex send_mutex;
+
+    /*
+     * Protects receiving reply headers from the socket, as well as the
+     * fields reply and requests[].receiving
+     */
+    CoMutex receive_mutex;
     NBDReply reply;
+
+    QEMUTimer *open_timer;
+
     BlockDriverState *bs;
 
     /* Connection parameters */
     uint32_t reconnect_delay;
+    uint32_t open_timeout;
     SocketAddress *saddr;
-    char *export, *tlscredsid;
+    char *export;
+    char *tlscredsid;
     QCryptoTLSCreds *tlscreds;
-    const char *hostname;
+    char *tlshostname;
     char *x_dirty_bitmap;
     bool alloc_depth;
 
@@ -111,6 +120,10 @@ static void nbd_clear_bdrvstate(BlockDriverState *bs)
 
     yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
 
+    /* Must not leave timers behind that would access freed data */
+    assert(!s->reconnect_delay_timer);
+    assert(!s->open_timer);
+
     object_unref(OBJECT(s->tlscreds));
     qapi_free_SocketAddress(s->saddr);
     s->saddr = NULL;
@@ -118,42 +131,57 @@ static void nbd_clear_bdrvstate(BlockDriverState *bs)
     s->export = NULL;
     g_free(s->tlscredsid);
     s->tlscredsid = NULL;
+    g_free(s->tlshostname);
+    s->tlshostname = NULL;
     g_free(s->x_dirty_bitmap);
     s->x_dirty_bitmap = NULL;
 }
 
-static bool nbd_client_connected(BDRVNBDState *s)
+/* Called with s->receive_mutex taken.  */
+static bool coroutine_fn nbd_recv_coroutine_wake_one(NBDClientRequest *req)
 {
-    return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED;
+    if (req->receiving) {
+        req->receiving = false;
+        aio_co_wake(req->coroutine);
+        return true;
+    }
+
+    return false;
 }
 
-static void nbd_channel_error(BDRVNBDState *s, int ret)
+static void coroutine_fn nbd_recv_coroutines_wake(BDRVNBDState *s)
 {
+    int i;
+
+    QEMU_LOCK_GUARD(&s->receive_mutex);
+    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
+        if (nbd_recv_coroutine_wake_one(&s->requests[i])) {
+            return;
+        }
+    }
+}
+
+/* Called with s->requests_lock held.  */
+static void coroutine_fn nbd_channel_error_locked(BDRVNBDState *s, int ret)
+{
+    if (s->state == NBD_CLIENT_CONNECTED) {
+        qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+    }
+
     if (ret == -EIO) {
-        if (nbd_client_connected(s)) {
+        if (s->state == NBD_CLIENT_CONNECTED) {
             s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
                                             NBD_CLIENT_CONNECTING_NOWAIT;
         }
     } else {
-        if (nbd_client_connected(s)) {
-            qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
-        }
         s->state = NBD_CLIENT_QUIT;
     }
 }
 
-static void nbd_recv_coroutines_wake_all(BDRVNBDState *s)
+static void coroutine_fn nbd_channel_error(BDRVNBDState *s, int ret)
 {
-    int i;
-
-    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
-        NBDClientRequest *req = &s->requests[i];
-
-        if (req->coroutine && req->receiving) {
-            req->receiving = false;
-            aio_co_wake(req->coroutine);
-        }
-    }
+    QEMU_LOCK_GUARD(&s->requests_lock);
+    nbd_channel_error_locked(s, ret);
 }
 
 static void reconnect_delay_timer_del(BDRVNBDState *s)
@@ -168,22 +196,18 @@ static void reconnect_delay_timer_cb(void *opaque)
 {
     BDRVNBDState *s = opaque;
 
-    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
-        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
-        while (qemu_co_enter_next(&s->free_sema, NULL)) {
-            /* Resume all queued requests */
+    reconnect_delay_timer_del(s);
+    WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
+        if (s->state != NBD_CLIENT_CONNECTING_WAIT) {
+            return;
         }
+        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
     }
-
-    reconnect_delay_timer_del(s);
+    nbd_co_establish_connection_cancel(s->conn);
 }
 
 static void reconnect_delay_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
 {
-    if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTING_WAIT) {
-        return;
-    }
-
     assert(!s->reconnect_delay_timer);
     s->reconnect_delay_timer = aio_timer_new(bdrv_get_aio_context(s->bs),
                                              QEMU_CLOCK_REALTIME,
@@ -192,125 +216,58 @@ static void reconnect_delay_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
     timer_mod(s->reconnect_delay_timer, expire_time_ns);
 }
 
-static void nbd_client_detach_aio_context(BlockDriverState *bs)
+static void nbd_teardown_connection(BlockDriverState *bs)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
-    /* Timer is deleted in nbd_client_co_drain_begin() */
-    assert(!s->reconnect_delay_timer);
-    /*
-     * If reconnect is in progress we may have no ->ioc.  It will be
-     * re-instantiated in the proper aio context once the connection is
-     * reestablished.
-     */
+    assert(!s->in_flight);
+
     if (s->ioc) {
-        qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
+        qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
+                                 nbd_yank, s->bs);
+        object_unref(OBJECT(s->ioc));
+        s->ioc = NULL;
     }
-}
 
-static void nbd_client_attach_aio_context_bh(void *opaque)
-{
-    BlockDriverState *bs = opaque;
-    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
-    if (s->connection_co) {
-        /*
-         * The node is still drained, so we know the coroutine has yielded in
-         * nbd_read_eof(), the only place where bs->in_flight can reach 0, or
-         * it is entered for the first time. Both places are safe for entering
-         * the coroutine.
-         */
-        qemu_aio_coroutine_enter(bs->aio_context, s->connection_co);
+    WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
+        s->state = NBD_CLIENT_QUIT;
     }
-    bdrv_dec_in_flight(bs);
 }
 
-static void nbd_client_attach_aio_context(BlockDriverState *bs,
-                                          AioContext *new_context)
+static void open_timer_del(BDRVNBDState *s)
 {
-    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
-    /*
-     * s->connection_co is either yielded from nbd_receive_reply or from
-     * nbd_co_reconnect_loop()
-     */
-    if (nbd_client_connected(s)) {
-        qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
+    if (s->open_timer) {
+        timer_free(s->open_timer);
+        s->open_timer = NULL;
     }
-
-    bdrv_inc_in_flight(bs);
-
-    /*
-     * Need to wait here for the BH to run because the BH must run while the
-     * node is still drained.
-     */
-    aio_wait_bh_oneshot(new_context, nbd_client_attach_aio_context_bh, bs);
 }
 
-static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs)
+static void open_timer_cb(void *opaque)
 {
-    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
-    s->drained = true;
-    qemu_co_sleep_wake(&s->reconnect_sleep);
+    BDRVNBDState *s = opaque;
 
     nbd_co_establish_connection_cancel(s->conn);
-
-    reconnect_delay_timer_del(s);
-
-    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
-        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
-        qemu_co_queue_restart_all(&s->free_sema);
-    }
+    open_timer_del(s);
 }
 
-static void coroutine_fn nbd_client_co_drain_end(BlockDriverState *bs)
+static void open_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
 {
-    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
-    s->drained = false;
-    if (s->wait_drained_end) {
-        s->wait_drained_end = false;
-        aio_co_wake(s->connection_co);
-    }
+    assert(!s->open_timer);
+    s->open_timer = aio_timer_new(bdrv_get_aio_context(s->bs),
+                                  QEMU_CLOCK_REALTIME,
+                                  SCALE_NS,
+                                  open_timer_cb, s);
+    timer_mod(s->open_timer, expire_time_ns);
 }
 
-
-static void nbd_teardown_connection(BlockDriverState *bs)
+static bool nbd_client_will_reconnect(BDRVNBDState *s)
 {
-    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
-    if (s->ioc) {
-        /* finish any pending coroutines */
-        qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
-    }
-
-    s->state = NBD_CLIENT_QUIT;
-    if (s->connection_co) {
-        qemu_co_sleep_wake(&s->reconnect_sleep);
-        nbd_co_establish_connection_cancel(s->conn);
-    }
-    if (qemu_in_coroutine()) {
-        s->teardown_co = qemu_coroutine_self();
-        /* connection_co resumes us when it terminates */
-        qemu_coroutine_yield();
-        s->teardown_co = NULL;
-    } else {
-        BDRV_POLL_WHILE(bs, s->connection_co);
-    }
-    assert(!s->connection_co);
-}
-
-static bool nbd_client_connecting(BDRVNBDState *s)
-{
-    NBDClientState state = qatomic_load_acquire(&s->state);
-    return state == NBD_CLIENT_CONNECTING_WAIT ||
-        state == NBD_CLIENT_CONNECTING_NOWAIT;
-}
-
-static bool nbd_client_connecting_wait(BDRVNBDState *s)
-{
-    return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
+    /*
+     * Called only after a socket error, so this is not performance sensitive.
+     */
+    QEMU_LOCK_GUARD(&s->requests_lock);
+    return s->state == NBD_CLIENT_CONNECTING_WAIT;
 }
 
 /*
@@ -359,18 +316,22 @@ static int nbd_handle_updated_info(BlockDriverState *bs, Error **errp)
 }
 
 int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
-                                                Error **errp)
+                                                bool blocking, Error **errp)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     int ret;
+    IO_CODE();
 
     assert(!s->ioc);
 
-    s->ioc = nbd_co_establish_connection(s->conn, &s->info, true, errp);
+    s->ioc = nbd_co_establish_connection(s->conn, &s->info, blocking, errp);
     if (!s->ioc) {
         return -ECONNREFUSED;
     }
 
+    yank_register_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name), nbd_yank,
+                           bs);
+
     ret = nbd_handle_updated_info(s->bs, NULL);
     if (ret < 0) {
         /*
@@ -381,6 +342,8 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
 
         nbd_send_request(s->ioc, &request);
 
+        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
+                                 nbd_yank, bs);
         object_unref(OBJECT(s->ioc));
         s->ioc = NULL;
 
@@ -390,45 +353,46 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
     qio_channel_set_blocking(s->ioc, false, NULL);
     qio_channel_attach_aio_context(s->ioc, bdrv_get_aio_context(bs));
 
-    yank_register_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name), nbd_yank,
-                           bs);
-
     /* successfully connected */
-    s->state = NBD_CLIENT_CONNECTED;
-    qemu_co_queue_restart_all(&s->free_sema);
+    WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
+        s->state = NBD_CLIENT_CONNECTED;
+    }
 
     return 0;
 }
 
-static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
+/* Called with s->requests_lock held.  */
+static bool nbd_client_connecting(BDRVNBDState *s)
 {
-    if (!nbd_client_connecting(s)) {
-        return;
-    }
-
-    /* Wait for completion of all in-flight requests */
-
-    qemu_co_mutex_lock(&s->send_mutex);
-
-    while (s->in_flight > 0) {
-        qemu_co_mutex_unlock(&s->send_mutex);
-        nbd_recv_coroutines_wake_all(s);
-        s->wait_in_flight = true;
-        qemu_coroutine_yield();
-        s->wait_in_flight = false;
-        qemu_co_mutex_lock(&s->send_mutex);
-    }
-
-    qemu_co_mutex_unlock(&s->send_mutex);
+    return s->state == NBD_CLIENT_CONNECTING_WAIT ||
+        s->state == NBD_CLIENT_CONNECTING_NOWAIT;
+}
 
-    if (!nbd_client_connecting(s)) {
-        return;
-    }
+/* Called with s->requests_lock taken.  */
+static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
+{
+    int ret;
+    bool blocking = s->state == NBD_CLIENT_CONNECTING_WAIT;
 
     /*
      * Now we are sure that nobody is accessing the channel, and no one will
      * try until we set the state to CONNECTED.
      */
+    assert(nbd_client_connecting(s));
+    assert(s->in_flight == 1);
+
+    trace_nbd_reconnect_attempt(s->bs->in_flight);
+
+    if (blocking && !s->reconnect_delay_timer) {
+        /*
+         * It's the first reconnect attempt after switching to
+         * NBD_CLIENT_CONNECTING_WAIT
+         */
+        g_assert(s->reconnect_delay);
+        reconnect_delay_timer_init(s,
+            qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
+            s->reconnect_delay * NANOSECONDS_PER_SECOND);
+    }
 
     /* Finalize previous connection if any */
     if (s->ioc) {
@@ -439,158 +403,107 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
         s->ioc = NULL;
     }
 
-    nbd_co_do_establish_connection(s->bs, NULL);
+    qemu_mutex_unlock(&s->requests_lock);
+    ret = nbd_co_do_establish_connection(s->bs, blocking, NULL);
+    trace_nbd_reconnect_attempt_result(ret, s->bs->in_flight);
+    qemu_mutex_lock(&s->requests_lock);
+
+    /*
+     * The reconnect attempt is done (maybe successfully, maybe not), so
+     * we no longer need this timer.  Delete it so it will not outlive
+     * this I/O request (so draining removes all timers).
+     */
+    reconnect_delay_timer_del(s);
 }
 
-static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState *s)
+static coroutine_fn int nbd_receive_replies(BDRVNBDState *s, uint64_t handle)
 {
-    uint64_t timeout = 1 * NANOSECONDS_PER_SECOND;
-    uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND;
-
-    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
-        reconnect_delay_timer_init(s, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
-                                   s->reconnect_delay * NANOSECONDS_PER_SECOND);
-    }
-
-    nbd_reconnect_attempt(s);
+    int ret;
+    uint64_t ind = HANDLE_TO_INDEX(s, handle), ind2;
+    QEMU_LOCK_GUARD(&s->receive_mutex);
 
-    while (nbd_client_connecting(s)) {
-        if (s->drained) {
-            bdrv_dec_in_flight(s->bs);
-            s->wait_drained_end = true;
-            while (s->drained) {
-                /*
-                 * We may be entered once from nbd_client_attach_aio_context_bh
-                 * and then from nbd_client_co_drain_end. So here is a loop.
-                 */
-                qemu_coroutine_yield();
-            }
-            bdrv_inc_in_flight(s->bs);
-        } else {
-            qemu_co_sleep_ns_wakeable(&s->reconnect_sleep,
-                                      QEMU_CLOCK_REALTIME, timeout);
-            if (s->drained) {
-                continue;
-            }
-            if (timeout < max_timeout) {
-                timeout *= 2;
-            }
+    while (true) {
+        if (s->reply.handle == handle) {
+            /* We are done */
+            return 0;
         }
 
-        nbd_reconnect_attempt(s);
-    }
-
-    reconnect_delay_timer_del(s);
-}
-
-static coroutine_fn void nbd_connection_entry(void *opaque)
-{
-    BDRVNBDState *s = opaque;
-    uint64_t i;
-    int ret = 0;
-    Error *local_err = NULL;
+        if (s->reply.handle != 0) {
+            /*
+             * Some other request is being handled now. It should already be
+             * woken by whoever set s->reply.handle (or never wait in this
+             * yield). So, we should not wake it here.
+             */
+            ind2 = HANDLE_TO_INDEX(s, s->reply.handle);
+            assert(!s->requests[ind2].receiving);
 
-    while (qatomic_load_acquire(&s->state) != NBD_CLIENT_QUIT) {
-        /*
-         * The NBD client can only really be considered idle when it has
-         * yielded from qio_channel_readv_all_eof(), waiting for data. This is
-         * the point where the additional scheduled coroutine entry happens
-         * after nbd_client_attach_aio_context().
-         *
-         * Therefore we keep an additional in_flight reference all the time and
-         * only drop it temporarily here.
-         */
+            s->requests[ind].receiving = true;
+            qemu_co_mutex_unlock(&s->receive_mutex);
 
-        if (nbd_client_connecting(s)) {
-            nbd_co_reconnect_loop(s);
-        }
+            qemu_coroutine_yield();
+            /*
+             * We may be woken for 2 reasons:
+             * 1. From this function, executing in parallel coroutine, when our
+             *    handle is received.
+             * 2. From nbd_co_receive_one_chunk(), when previous request is
+             *    finished and s->reply.handle set to 0.
+             * Anyway, it's OK to lock the mutex and go to the next iteration.
+             */
 
-        if (!nbd_client_connected(s)) {
+            qemu_co_mutex_lock(&s->receive_mutex);
+            assert(!s->requests[ind].receiving);
             continue;
         }
 
+        /* We are under mutex and handle is 0. We have to do the dirty work. */
         assert(s->reply.handle == 0);
-        ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);
-
-        if (local_err) {
-            trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
-            error_free(local_err);
-            local_err = NULL;
-        }
+        ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, NULL);
         if (ret <= 0) {
-            nbd_channel_error(s, ret ? ret : -EIO);
-            continue;
+            ret = ret ? ret : -EIO;
+            nbd_channel_error(s, ret);
+            return ret;
         }
-
-        /*
-         * There's no need for a mutex on the receive side, because the
-         * handler acts as a synchronization point and ensures that only
-         * one coroutine is called until the reply finishes.
-         */
-        i = HANDLE_TO_INDEX(s, s->reply.handle);
-        if (i >= MAX_NBD_REQUESTS ||
-            !s->requests[i].coroutine ||
-            !s->requests[i].receiving ||
-            (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
-        {
+        if (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply) {
             nbd_channel_error(s, -EINVAL);
-            continue;
+            return -EINVAL;
         }
-
-        /*
-         * We're woken up again by the request itself.  Note that there
-         * is no race between yielding and reentering connection_co.  This
-         * is because:
-         *
-         * - if the request runs on the same AioContext, it is only
-         *   entered after we yield
-         *
-         * - if the request runs on a different AioContext, reentering
-         *   connection_co happens through a bottom half, which can only
-         *   run after we yield.
-         */
-        s->requests[i].receiving = false;
-        aio_co_wake(s->requests[i].coroutine);
-        qemu_coroutine_yield();
-    }
-
-    qemu_co_queue_restart_all(&s->free_sema);
-    nbd_recv_coroutines_wake_all(s);
-    bdrv_dec_in_flight(s->bs);
-
-    s->connection_co = NULL;
-    if (s->ioc) {
-        qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
-        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
-                                 nbd_yank, s->bs);
-        object_unref(OBJECT(s->ioc));
-        s->ioc = NULL;
-    }
-
-    if (s->teardown_co) {
-        aio_co_wake(s->teardown_co);
+        ind2 = HANDLE_TO_INDEX(s, s->reply.handle);
+        if (ind2 >= MAX_NBD_REQUESTS || !s->requests[ind2].coroutine) {
+            nbd_channel_error(s, -EINVAL);
+            return -EINVAL;
+        }
+        if (s->reply.handle == handle) {
+            /* We are done */
+            return 0;
+        }
+        nbd_recv_coroutine_wake_one(&s->requests[ind2]);
     }
-    aio_wait_kick();
 }
 
-static int nbd_co_send_request(BlockDriverState *bs,
-                               NBDRequest *request,
-                               QEMUIOVector *qiov)
+static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
+                                            NBDRequest *request,
+                                            QEMUIOVector *qiov)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     int rc, i = -1;
 
-    qemu_co_mutex_lock(&s->send_mutex);
-    while (s->in_flight == MAX_NBD_REQUESTS || nbd_client_connecting_wait(s)) {
-        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
-    }
-
-    if (!nbd_client_connected(s)) {
-        rc = -EIO;
-        goto err;
+    qemu_mutex_lock(&s->requests_lock);
+    while (s->in_flight == MAX_NBD_REQUESTS ||
+           (s->state != NBD_CLIENT_CONNECTED && s->in_flight > 0)) {
+        qemu_co_queue_wait(&s->free_sema, &s->requests_lock);
     }
 
     s->in_flight++;
+    if (s->state != NBD_CLIENT_CONNECTED) {
+        if (nbd_client_connecting(s)) {
+            nbd_reconnect_attempt(s);
+            qemu_co_queue_restart_all(&s->free_sema);
+        }
+        if (s->state != NBD_CLIENT_CONNECTED) {
+            rc = -EIO;
+            goto err;
+        }
+    }
 
     for (i = 0; i < MAX_NBD_REQUESTS; i++) {
         if (s->requests[i].coroutine == NULL) {
@@ -598,13 +511,13 @@ static int nbd_co_send_request(BlockDriverState *bs,
         }
     }
 
-    g_assert(qemu_in_coroutine());
     assert(i < MAX_NBD_REQUESTS);
-
     s->requests[i].coroutine = qemu_coroutine_self();
     s->requests[i].offset = request->from;
     s->requests[i].receiving = false;
+    qemu_mutex_unlock(&s->requests_lock);
 
+    qemu_co_mutex_lock(&s->send_mutex);
     request->handle = INDEX_TO_HANDLE(s, i);
 
     assert(s->ioc);
@@ -612,33 +525,27 @@ static int nbd_co_send_request(BlockDriverState *bs,
     if (qiov) {
         qio_channel_set_cork(s->ioc, true);
         rc = nbd_send_request(s->ioc, request);
-        if (nbd_client_connected(s) && rc >= 0) {
-            if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
-                                       NULL) < 0) {
-                rc = -EIO;
-            }
-        } else if (rc >= 0) {
+        if (rc >= 0 && qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
+                                              NULL) < 0) {
             rc = -EIO;
         }
         qio_channel_set_cork(s->ioc, false);
     } else {
         rc = nbd_send_request(s->ioc, request);
     }
+    qemu_co_mutex_unlock(&s->send_mutex);
 
-err:
     if (rc < 0) {
-        nbd_channel_error(s, rc);
+        qemu_mutex_lock(&s->requests_lock);
+err:
+        nbd_channel_error_locked(s, rc);
         if (i != -1) {
             s->requests[i].coroutine = NULL;
-            s->in_flight--;
-        }
-        if (s->in_flight == 0 && s->wait_in_flight) {
-            aio_co_wake(s->connection_co);
-        } else {
-            qemu_co_queue_next(&s->free_sema);
         }
+        s->in_flight--;
+        qemu_co_queue_next(&s->free_sema);
+        qemu_mutex_unlock(&s->requests_lock);
     }
-    qemu_co_mutex_unlock(&s->send_mutex);
     return rc;
 }
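
The rewritten nbd_receive_replies() in the hunk above uses a dispatch scheme in which whichever request currently holds receive_mutex reads the next reply header, parks a reply that belongs to another request, and wakes that request's slot. Below is a rough, self-contained sketch of the same pattern using plain pthreads instead of QEMU coroutines and CoMutex; every name in it is invented for illustration and none of it is QEMU code.

/*
 * Illustrative sketch only (plain pthreads, invented names, not QEMU API):
 * each in-flight request owns a slot; whoever holds receive_mutex reads the
 * next reply, keeps it if it is its own, otherwise parks it and wakes the
 * owning slot.
 */
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define MAX_REQS 4

static pthread_mutex_t receive_mutex = PTHREAD_MUTEX_INITIALIZER;
static uint64_t parked_handle;              /* 0 = no reply currently parked */
static bool receiving[MAX_REQS];            /* slot is sleeping on its cond */
static pthread_cond_t wake[MAX_REQS];

/* Stand-in for reading a reply header: hand out handles 1..MAX_REQS once. */
static uint64_t read_next_reply_handle(void)
{
    static const uint64_t order[MAX_REQS] = { 3, 1, 4, 2 };
    static unsigned next;

    assert(next < MAX_REQS);
    return order[next++];
}

/* Let one sleeping slot become the next reader. */
static void wake_one_sleeper(void)
{
    for (unsigned i = 0; i < MAX_REQS; i++) {
        if (receiving[i]) {
            receiving[i] = false;
            pthread_cond_signal(&wake[i]);
            return;
        }
    }
}

/* Block until the reply for @handle has been read, by us or by a peer. */
static void receive_reply(uint64_t handle)
{
    unsigned idx = handle - 1;

    pthread_mutex_lock(&receive_mutex);
    for (;;) {
        if (parked_handle == handle) {      /* a peer already read ours */
            parked_handle = 0;
            wake_one_sleeper();             /* someone else reads the next */
            break;
        }
        if (parked_handle != 0) {           /* a peer's reply is parked */
            receiving[idx] = true;
            while (receiving[idx]) {
                pthread_cond_wait(&wake[idx], &receive_mutex);
            }
            continue;                       /* re-check why we were woken */
        }
        /* Nothing parked: we do the dirty work of reading the next reply. */
        uint64_t got = read_next_reply_handle();
        if (got == handle) {
            wake_one_sleeper();
            break;
        }
        parked_handle = got;                /* park it for its owner */
        if (receiving[got - 1]) {           /* wake the owner if it waits */
            receiving[got - 1] = false;
            pthread_cond_signal(&wake[got - 1]);
        }
    }
    pthread_mutex_unlock(&receive_mutex);
}

static void *worker(void *arg)
{
    uint64_t handle = (uint64_t)(uintptr_t)arg;

    receive_reply(handle);
    printf("request %llu completed\n", (unsigned long long)handle);
    return NULL;
}

int main(void)
{
    pthread_t th[MAX_REQS];

    for (unsigned i = 0; i < MAX_REQS; i++) {
        pthread_cond_init(&wake[i], NULL);
        pthread_create(&th[i], NULL, worker, (void *)(uintptr_t)(i + 1));
    }
    for (unsigned i = 0; i < MAX_REQS; i++) {
        pthread_join(th[i], NULL);
    }
    return 0;
}

The invariant matches the comment in the hunk above: a request whose reply is already parked is never left sleeping, because the reader that parks it signals the owner immediately, and a consumed reply always hands the reader role to some other waiter.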
 
@@ -825,9 +732,9 @@ static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
     return 0;
 }
 
-static int nbd_co_receive_offset_data_payload(BDRVNBDState *s,
-                                              uint64_t orig_offset,
-                                              QEMUIOVector *qiov, Error **errp)
+static int coroutine_fn
+nbd_co_receive_offset_data_payload(BDRVNBDState *s, uint64_t orig_offset,
+                                   QEMUIOVector *qiov, Error **errp)
 {
     QEMUIOVector sub_qiov;
     uint64_t offset;
@@ -933,11 +840,8 @@ static coroutine_fn int nbd_co_do_receive_one_chunk(
     }
     *request_ret = 0;
 
-    /* Wait until we're woken up by nbd_connection_entry.  */
-    s->requests[i].receiving = true;
-    qemu_coroutine_yield();
-    assert(!s->requests[i].receiving);
-    if (!nbd_client_connected(s)) {
+    ret = nbd_receive_replies(s, handle);
+    if (ret < 0) {
         error_setg(errp, "Connection closed");
         return -EIO;
     }
@@ -1029,14 +933,7 @@ static coroutine_fn int nbd_co_receive_one_chunk(
     }
     s->reply.handle = 0;
 
-    if (s->connection_co && !s->wait_in_flight) {
-        /*
-         * We must check s->wait_in_flight, because we may entered by
-         * nbd_recv_coroutines_wake_all(), in this case we should not
-         * wake connection_co here, it will woken by last request.
-         */
-        aio_co_wake(s->connection_co);
-    }
+    nbd_recv_coroutines_wake(s);
 
     return ret;
 }
@@ -1086,21 +983,17 @@ static void nbd_iter_request_error(NBDReplyChunkIter *iter, int ret)
  * nbd_reply_chunk_iter_receive
  * The pointer stored in @payload requires g_free() to free it.
  */
-static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
-                                         NBDReplyChunkIter *iter,
-                                         uint64_t handle,
-                                         QEMUIOVector *qiov, NBDReply *reply,
-                                         void **payload)
+static bool coroutine_fn nbd_reply_chunk_iter_receive(BDRVNBDState *s,
+                                                      NBDReplyChunkIter *iter,
+                                                      uint64_t handle,
+                                                      QEMUIOVector *qiov,
+                                                      NBDReply *reply,
+                                                      void **payload)
 {
     int ret, request_ret;
     NBDReply local_reply;
     NBDStructuredReplyChunk *chunk;
     Error *local_err = NULL;
-    if (!nbd_client_connected(s)) {
-        error_setg(&local_err, "Connection closed");
-        nbd_iter_channel_error(iter, -EIO, &local_err);
-        goto break_loop;
-    }
 
     if (iter->done) {
         /* Previous iteration was last. */
@@ -1121,7 +1014,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
     }
 
     /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
-    if (nbd_reply_is_simple(reply) || !nbd_client_connected(s)) {
+    if (nbd_reply_is_simple(reply) || iter->ret < 0) {
         goto break_loop;
     }
 
@@ -1143,22 +1036,17 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
     return true;
 
 break_loop:
+    qemu_mutex_lock(&s->requests_lock);
     s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;
-
-    qemu_co_mutex_lock(&s->send_mutex);
     s->in_flight--;
-    if (s->in_flight == 0 && s->wait_in_flight) {
-        aio_co_wake(s->connection_co);
-    } else {
-        qemu_co_queue_next(&s->free_sema);
-    }
-    qemu_co_mutex_unlock(&s->send_mutex);
+    qemu_co_queue_next(&s->free_sema);
+    qemu_mutex_unlock(&s->requests_lock);
 
     return false;
 }
 
-static int nbd_co_receive_return_code(BDRVNBDState *s, uint64_t handle,
-                                      int *request_ret, Error **errp)
+static int coroutine_fn nbd_co_receive_return_code(BDRVNBDState *s, uint64_t handle,
+                                                   int *request_ret, Error **errp)
 {
     NBDReplyChunkIter iter;
 
@@ -1171,9 +1059,9 @@ static int nbd_co_receive_return_code(BDRVNBDState *s, uint64_t handle,
     return iter.ret;
 }
 
-static int nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t handle,
-                                        uint64_t offset, QEMUIOVector *qiov,
-                                        int *request_ret, Error **errp)
+static int coroutine_fn nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t handle,
+                                                     uint64_t offset, QEMUIOVector *qiov,
+                                                     int *request_ret, Error **errp)
 {
     NBDReplyChunkIter iter;
     NBDReply reply;
@@ -1223,10 +1111,10 @@ static int nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t handle,
     return iter.ret;
 }
 
-static int nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
-                                            uint64_t handle, uint64_t length,
-                                            NBDExtent *extent,
-                                            int *request_ret, Error **errp)
+static int coroutine_fn nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
+                                                         uint64_t handle, uint64_t length,
+                                                         NBDExtent *extent,
+                                                         int *request_ret, Error **errp)
 {
     NBDReplyChunkIter iter;
     NBDReply reply;
@@ -1283,8 +1171,8 @@ static int nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
     return iter.ret;
 }
 
-static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
-                          QEMUIOVector *write_qiov)
+static int coroutine_fn nbd_co_request(BlockDriverState *bs, NBDRequest *request,
+                                       QEMUIOVector *write_qiov)
 {
     int ret, request_ret;
     Error *local_err = NULL;
@@ -1315,13 +1203,14 @@ static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
             error_free(local_err);
             local_err = NULL;
         }
-    } while (ret < 0 && nbd_client_connecting_wait(s));
+    } while (ret < 0 && nbd_client_will_reconnect(s));
 
     return ret ? ret : request_ret;
 }
 
-static int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
-                                uint64_t bytes, QEMUIOVector *qiov, int flags)
+static int coroutine_fn nbd_client_co_preadv(BlockDriverState *bs, int64_t offset,
+                                             int64_t bytes, QEMUIOVector *qiov,
+                                             BdrvRequestFlags flags)
 {
     int ret, request_ret;
     Error *local_err = NULL;
@@ -1333,7 +1222,6 @@ static int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
     };
 
     assert(bytes <= NBD_MAX_BUFFER_SIZE);
-    assert(!flags);
 
     if (!bytes) {
         return 0;
@@ -1373,13 +1261,14 @@ static int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
             error_free(local_err);
             local_err = NULL;
         }
-    } while (ret < 0 && nbd_client_connecting_wait(s));
+    } while (ret < 0 && nbd_client_will_reconnect(s));
 
     return ret ? ret : request_ret;
 }
 
-static int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
-                                 uint64_t bytes, QEMUIOVector *qiov, int flags)
+static int coroutine_fn nbd_client_co_pwritev(BlockDriverState *bs, int64_t offset,
+                                              int64_t bytes, QEMUIOVector *qiov,
+                                              BdrvRequestFlags flags)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     NBDRequest request = {
@@ -1402,16 +1291,18 @@ static int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
     return nbd_co_request(bs, &request, qiov);
 }
 
-static int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
-                                       int bytes, BdrvRequestFlags flags)
+static int coroutine_fn nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
+                                                    int64_t bytes, BdrvRequestFlags flags)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     NBDRequest request = {
         .type = NBD_CMD_WRITE_ZEROES,
         .from = offset,
-        .len = bytes,
+        .len = bytes,  /* .len is uint32_t actually */
     };
 
+    assert(bytes <= UINT32_MAX); /* rely on max_pwrite_zeroes */
+
     assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
     if (!(s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
         return -ENOTSUP;
@@ -1435,7 +1326,7 @@ static int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
     return nbd_co_request(bs, &request, NULL);
 }
 
-static int nbd_client_co_flush(BlockDriverState *bs)
+static int coroutine_fn nbd_client_co_flush(BlockDriverState *bs)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     NBDRequest request = { .type = NBD_CMD_FLUSH };
@@ -1450,16 +1341,18 @@ static int nbd_client_co_flush(BlockDriverState *bs)
     return nbd_co_request(bs, &request, NULL);
 }
 
-static int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset,
-                                  int bytes)
+static int coroutine_fn nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset,
+                                               int64_t bytes)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     NBDRequest request = {
         .type = NBD_CMD_TRIM,
         .from = offset,
-        .len = bytes,
+        .len = bytes, /* len is uint32_t */
     };
 
+    assert(bytes <= UINT32_MAX); /* rely on max_pdiscard */
+
     assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
     if (!(s->info.flags & NBD_FLAG_SEND_TRIM) || !bytes) {
         return 0;
@@ -1526,7 +1419,7 @@ static int coroutine_fn nbd_client_co_block_status(
             error_free(local_err);
             local_err = NULL;
         }
-    } while (ret < 0 && nbd_client_connecting_wait(s));
+    } while (ret < 0 && nbd_client_will_reconnect(s));
 
     if (ret < 0 || request_ret < 0) {
         return ret ? ret : request_ret;
@@ -1558,8 +1451,9 @@ static void nbd_yank(void *opaque)
     BlockDriverState *bs = opaque;
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
-    qatomic_store_release(&s->state, NBD_CLIENT_QUIT);
+    QEMU_LOCK_GUARD(&s->requests_lock);
     qio_channel_shutdown(QIO_CHANNEL(s->ioc), QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+    s->state = NBD_CLIENT_QUIT;
 }
 
 static void nbd_client_close(BlockDriverState *bs)
@@ -1839,9 +1733,9 @@ static QCryptoTLSCreds *nbd_get_tls_creds(const char *id, Error **errp)
         return NULL;
     }
 
-    if (creds->endpoint != QCRYPTO_TLS_CREDS_ENDPOINT_CLIENT) {
-        error_setg(errp,
-                   "Expecting TLS credentials with a client endpoint");
+    if (!qcrypto_tls_creds_check_endpoint(creds,
+                                          QCRYPTO_TLS_CREDS_ENDPOINT_CLIENT,
+                                          errp)) {
         return NULL;
     }
     object_ref(obj);
@@ -1878,6 +1772,11 @@ static QemuOptsList nbd_runtime_opts = {
             .type = QEMU_OPT_STRING,
             .help = "ID of the TLS credentials to use",
         },
+        {
+            .name = "tls-hostname",
+            .type = QEMU_OPT_STRING,
+            .help = "Override hostname for validating TLS x509 certificate",
+        },
         {
             .name = "x-dirty-bitmap",
             .type = QEMU_OPT_STRING,
@@ -1895,6 +1794,15 @@ static QemuOptsList nbd_runtime_opts = {
                     "future requests before a successful reconnect will "
                     "immediately fail. Default 0",
         },
+        {
+            .name = "open-timeout",
+            .type = QEMU_OPT_NUMBER,
+            .help = "In seconds. If zero, the nbd driver tries the connection "
+                    "only once, and fails to open if the connection fails. "
+                    "If non-zero, the nbd driver will repeat connection "
+                    "attempts until successful or until @open-timeout seconds "
+                    "have elapsed. Default 0",
+        },
         { /* end of list */ }
     },
 };
@@ -1935,12 +1843,11 @@ static int nbd_process_options(BlockDriverState *bs, QDict *options,
             goto error;
         }
 
-        /* TODO SOCKET_ADDRESS_KIND_FD where fd has AF_INET or AF_INET6 */
-        if (s->saddr->type != SOCKET_ADDRESS_TYPE_INET) {
-            error_setg(errp, "TLS only supported over IP sockets");
-            goto error;
+        s->tlshostname = g_strdup(qemu_opt_get(opts, "tls-hostname"));
+        if (!s->tlshostname &&
+            s->saddr->type == SOCKET_ADDRESS_TYPE_INET) {
+            s->tlshostname = g_strdup(s->saddr->u.inet.host);
         }
-        s->hostname = s->saddr->u.inet.host;
     }
 
     s->x_dirty_bitmap = g_strdup(qemu_opt_get(opts, "x-dirty-bitmap"));
@@ -1950,6 +1857,7 @@ static int nbd_process_options(BlockDriverState *bs, QDict *options,
     }
 
     s->reconnect_delay = qemu_opt_get_number(opts, "reconnect-delay", 0);
+    s->open_timeout = qemu_opt_get_number(opts, "open-timeout", 0);
 
     ret = 0;
 
@@ -1965,8 +1873,10 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
     s->bs = bs;
-    qemu_co_mutex_init(&s->send_mutex);
+    qemu_mutex_init(&s->requests_lock);
     qemu_co_queue_init(&s->free_sema);
+    qemu_co_mutex_init(&s->send_mutex);
+    qemu_co_mutex_init(&s->receive_mutex);
 
     if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
         return -EEXIST;
@@ -1978,26 +1888,39 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
     }
 
     s->conn = nbd_client_connection_new(s->saddr, true, s->export,
-                                        s->x_dirty_bitmap, s->tlscreds);
+                                        s->x_dirty_bitmap, s->tlscreds,
+                                        s->tlshostname);
+
+    if (s->open_timeout) {
+        nbd_client_connection_enable_retry(s->conn);
+        open_timer_init(s, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
+                        s->open_timeout * NANOSECONDS_PER_SECOND);
+    }
 
-    /* TODO: Configurable retry-until-timeout behaviour. */
-    ret = nbd_do_establish_connection(bs, errp);
+    s->state = NBD_CLIENT_CONNECTING_WAIT;
+    ret = nbd_do_establish_connection(bs, true, errp);
     if (ret < 0) {
         goto fail;
     }
 
-    s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
-    bdrv_inc_in_flight(bs);
-    aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);
+    /*
+     * The connect attempt is done, so we no longer need this timer.
+     * Delete it, because we do not want it to be around when this node
+     * is drained or closed.
+     */
+    open_timer_del(s);
+
+    nbd_client_connection_enable_retry(s->conn);
 
     return 0;
 
 fail:
+    open_timer_del(s);
     nbd_clear_bdrvstate(bs);
     return ret;
 }
 
-static int nbd_co_flush(BlockDriverState *bs)
+static int coroutine_fn nbd_co_flush(BlockDriverState *bs)
 {
     return nbd_client_co_flush(bs);
 }
@@ -2069,7 +1992,7 @@ static int coroutine_fn nbd_co_truncate(BlockDriverState *bs, int64_t offset,
     return 0;
 }
 
-static int64_t nbd_getlength(BlockDriverState *bs)
+static int64_t coroutine_fn nbd_co_getlength(BlockDriverState *bs)
 {
     BDRVNBDState *s = bs->opaque;
 
@@ -2127,6 +2050,7 @@ static const char *const nbd_strong_runtime_opts[] = {
     "port",
     "export",
     "tls-creds",
+    "tls-hostname",
     "server.",
 
     NULL
@@ -2138,9 +2062,48 @@ static void nbd_cancel_in_flight(BlockDriverState *bs)
 
     reconnect_delay_timer_del(s);
 
+    qemu_mutex_lock(&s->requests_lock);
     if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
         s->state = NBD_CLIENT_CONNECTING_NOWAIT;
-        qemu_co_queue_restart_all(&s->free_sema);
+    }
+    qemu_mutex_unlock(&s->requests_lock);
+
+    nbd_co_establish_connection_cancel(s->conn);
+}
+
+static void nbd_attach_aio_context(BlockDriverState *bs,
+                                   AioContext *new_context)
+{
+    BDRVNBDState *s = bs->opaque;
+
+    /* The open_timer is used only during nbd_open() */
+    assert(!s->open_timer);
+
+    /*
+     * The reconnect_delay_timer is scheduled in I/O paths when the
+     * connection is lost, to cancel the reconnection attempt after a
+     * given time.  Once this attempt is done (successfully or not),
+     * nbd_reconnect_attempt() ensures the timer is deleted before the
+     * respective I/O request is resumed.
+     * Since the AioContext can only be changed when a node is drained,
+     * the reconnect_delay_timer cannot be active here.
+     */
+    assert(!s->reconnect_delay_timer);
+
+    if (s->ioc) {
+        qio_channel_attach_aio_context(s->ioc, new_context);
+    }
+}
+
+static void nbd_detach_aio_context(BlockDriverState *bs)
+{
+    BDRVNBDState *s = bs->opaque;
+
+    assert(!s->open_timer);
+    assert(!s->reconnect_delay_timer);
+
+    if (s->ioc) {
+        qio_channel_detach_aio_context(s->ioc);
     }
 }
 
@@ -2161,16 +2124,15 @@ static BlockDriver bdrv_nbd = {
     .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
     .bdrv_refresh_limits        = nbd_refresh_limits,
     .bdrv_co_truncate           = nbd_co_truncate,
-    .bdrv_getlength             = nbd_getlength,
-    .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
-    .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
-    .bdrv_co_drain_begin        = nbd_client_co_drain_begin,
-    .bdrv_co_drain_end          = nbd_client_co_drain_end,
+    .bdrv_co_getlength          = nbd_co_getlength,
     .bdrv_refresh_filename      = nbd_refresh_filename,
     .bdrv_co_block_status       = nbd_client_co_block_status,
     .bdrv_dirname               = nbd_dirname,
     .strong_runtime_opts        = nbd_strong_runtime_opts,
     .bdrv_cancel_in_flight      = nbd_cancel_in_flight,
+
+    .bdrv_attach_aio_context    = nbd_attach_aio_context,
+    .bdrv_detach_aio_context    = nbd_detach_aio_context,
 };
 
 static BlockDriver bdrv_nbd_tcp = {
@@ -2190,16 +2152,15 @@ static BlockDriver bdrv_nbd_tcp = {
     .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
     .bdrv_refresh_limits        = nbd_refresh_limits,
     .bdrv_co_truncate           = nbd_co_truncate,
-    .bdrv_getlength             = nbd_getlength,
-    .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
-    .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
-    .bdrv_co_drain_begin        = nbd_client_co_drain_begin,
-    .bdrv_co_drain_end          = nbd_client_co_drain_end,
+    .bdrv_co_getlength          = nbd_co_getlength,
     .bdrv_refresh_filename      = nbd_refresh_filename,
     .bdrv_co_block_status       = nbd_client_co_block_status,
     .bdrv_dirname               = nbd_dirname,
     .strong_runtime_opts        = nbd_strong_runtime_opts,
     .bdrv_cancel_in_flight      = nbd_cancel_in_flight,
+
+    .bdrv_attach_aio_context    = nbd_attach_aio_context,
+    .bdrv_detach_aio_context    = nbd_detach_aio_context,
 };
 
 static BlockDriver bdrv_nbd_unix = {
@@ -2219,16 +2180,15 @@ static BlockDriver bdrv_nbd_unix = {
     .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
     .bdrv_refresh_limits        = nbd_refresh_limits,
     .bdrv_co_truncate           = nbd_co_truncate,
-    .bdrv_getlength             = nbd_getlength,
-    .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
-    .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
-    .bdrv_co_drain_begin        = nbd_client_co_drain_begin,
-    .bdrv_co_drain_end          = nbd_client_co_drain_end,
+    .bdrv_co_getlength          = nbd_co_getlength,
     .bdrv_refresh_filename      = nbd_refresh_filename,
     .bdrv_co_block_status       = nbd_client_co_block_status,
     .bdrv_dirname               = nbd_dirname,
     .strong_runtime_opts        = nbd_strong_runtime_opts,
     .bdrv_cancel_in_flight      = nbd_cancel_in_flight,
+
+    .bdrv_attach_aio_context    = nbd_attach_aio_context,
+    .bdrv_detach_aio_context    = nbd_detach_aio_context,
 };
 
 static void bdrv_nbd_init(void)