]> git.proxmox.com Git - mirror_qemu.git/blobdiff - block/nbd.c
Merge remote-tracking branch 'remotes/vivier2/tags/linux-user-for-6.0-pull-request...
[mirror_qemu.git] / block / nbd.c
index eed160c5cda10bd07f2f702c7151b101460607ed..c26dc5a54f5221ffa0d6d0adcddbb80fbb992eef 100644 (file)
 #include "qemu/option.h"
 #include "qemu/cutils.h"
 #include "qemu/main-loop.h"
+#include "qemu/atomic.h"
 
 #include "qapi/qapi-visit-sockets.h"
 #include "qapi/qmp/qstring.h"
+#include "qapi/clone-visitor.h"
 
 #include "block/qdict.h"
 #include "block/nbd.h"
 #include "block/block_int.h"
 
+#include "qemu/yank.h"
+
 #define EN_OPTSTR ":exportname="
 #define MAX_NBD_REQUESTS    16
 
@@ -62,6 +66,47 @@ typedef enum NBDClientState {
     NBD_CLIENT_QUIT
 } NBDClientState;
 
+typedef enum NBDConnectThreadState {
+    /* No thread, no pending results */
+    CONNECT_THREAD_NONE,
+
+    /* Thread is running, no results for now */
+    CONNECT_THREAD_RUNNING,
+
+    /*
+     * Thread is running, but requestor exited. Thread should close
+     * the new socket and free the connect state on exit.
+     */
+    CONNECT_THREAD_RUNNING_DETACHED,
+
+    /* Thread finished, results are stored in a state */
+    CONNECT_THREAD_FAIL,
+    CONNECT_THREAD_SUCCESS
+} NBDConnectThreadState;
+
+typedef struct NBDConnectThread {
+    /* Initialization constants */
+    SocketAddress *saddr; /* address to connect to */
+    /*
+     * Bottom half to schedule on completion. Scheduled only if bh_ctx is not
+     * NULL
+     */
+    QEMUBHFunc *bh_func;
+    void *bh_opaque;
+
+    /*
+     * Result of last attempt. Valid in FAIL and SUCCESS states.
+     * If you want to steal error, don't forget to set pointer to NULL.
+     */
+    QIOChannelSocket *sioc;
+    Error *err;
+
+    /* state and bh_ctx are protected by mutex */
+    QemuMutex mutex;
+    NBDConnectThreadState state; /* current state of the thread */
+    AioContext *bh_ctx; /* where to schedule bh (NULL means don't schedule) */
+} NBDConnectThread;
+
 typedef struct BDRVNBDState {
     QIOChannelSocket *sioc; /* The master data channel */
     QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
@@ -80,6 +125,8 @@ typedef struct BDRVNBDState {
     Error *connect_err;
     bool wait_in_flight;
 
+    QEMUTimer *reconnect_delay_timer;
+
     NBDClientRequest requests[MAX_NBD_REQUESTS];
     NBDReply reply;
     BlockDriverState *bs;
@@ -91,9 +138,19 @@ typedef struct BDRVNBDState {
     QCryptoTLSCreds *tlscreds;
     const char *hostname;
     char *x_dirty_bitmap;
+    bool alloc_depth;
+
+    bool wait_connect;
+    NBDConnectThread *connect_thread;
 } BDRVNBDState;
 
-static int nbd_client_connect(BlockDriverState *bs, Error **errp);
+static int nbd_establish_connection(BlockDriverState *bs, SocketAddress *saddr,
+                                    Error **errp);
+static int nbd_co_establish_connection(BlockDriverState *bs, Error **errp);
+static void nbd_co_establish_connection_cancel(BlockDriverState *bs,
+                                               bool detach);
+static int nbd_client_handshake(BlockDriverState *bs, Error **errp);
+static void nbd_yank(void *opaque);
 
 static void nbd_clear_bdrvstate(BDRVNBDState *s)
 {
@@ -111,12 +168,12 @@ static void nbd_clear_bdrvstate(BDRVNBDState *s)
 static void nbd_channel_error(BDRVNBDState *s, int ret)
 {
     if (ret == -EIO) {
-        if (s->state == NBD_CLIENT_CONNECTED) {
+        if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) {
             s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
                                             NBD_CLIENT_CONNECTING_NOWAIT;
         }
     } else {
-        if (s->state == NBD_CLIENT_CONNECTED) {
+        if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) {
             qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
         }
         s->state = NBD_CLIENT_QUIT;
@@ -136,11 +193,56 @@ static void nbd_recv_coroutines_wake_all(BDRVNBDState *s)
     }
 }
 
+static void reconnect_delay_timer_del(BDRVNBDState *s)
+{
+    if (s->reconnect_delay_timer) {
+        timer_free(s->reconnect_delay_timer);
+        s->reconnect_delay_timer = NULL;
+    }
+}
+
+static void reconnect_delay_timer_cb(void *opaque)
+{
+    BDRVNBDState *s = opaque;
+
+    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
+        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
+        while (qemu_co_enter_next(&s->free_sema, NULL)) {
+            /* Resume all queued requests */
+        }
+    }
+
+    reconnect_delay_timer_del(s);
+}
+
+static void reconnect_delay_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
+{
+    if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTING_WAIT) {
+        return;
+    }
+
+    assert(!s->reconnect_delay_timer);
+    s->reconnect_delay_timer = aio_timer_new(bdrv_get_aio_context(s->bs),
+                                             QEMU_CLOCK_REALTIME,
+                                             SCALE_NS,
+                                             reconnect_delay_timer_cb, s);
+    timer_mod(s->reconnect_delay_timer, expire_time_ns);
+}
+
 static void nbd_client_detach_aio_context(BlockDriverState *bs)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
-    qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
+    /* Timer is deleted in nbd_client_co_drain_begin() */
+    assert(!s->reconnect_delay_timer);
+    /*
+     * If reconnect is in progress we may have no ->ioc.  It will be
+     * re-instantiated in the proper aio context once the connection is
+     * reestablished.
+     */
+    if (s->ioc) {
+        qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
+    }
 }
 
 static void nbd_client_attach_aio_context_bh(void *opaque)
@@ -148,13 +250,15 @@ static void nbd_client_attach_aio_context_bh(void *opaque)
     BlockDriverState *bs = opaque;
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
-    /*
-     * The node is still drained, so we know the coroutine has yielded in
-     * nbd_read_eof(), the only place where bs->in_flight can reach 0, or it is
-     * entered for the first time. Both places are safe for entering the
-     * coroutine.
-     */
-    qemu_aio_coroutine_enter(bs->aio_context, s->connection_co);
+    if (s->connection_co) {
+        /*
+         * The node is still drained, so we know the coroutine has yielded in
+         * nbd_read_eof(), the only place where bs->in_flight can reach 0, or
+         * it is entered for the first time. Both places are safe for entering
+         * the coroutine.
+         */
+        qemu_aio_coroutine_enter(bs->aio_context, s->connection_co);
+    }
     bdrv_dec_in_flight(bs);
 }
 
@@ -167,7 +271,7 @@ static void nbd_client_attach_aio_context(BlockDriverState *bs,
      * s->connection_co is either yielded from nbd_receive_reply or from
      * nbd_co_reconnect_loop()
      */
-    if (s->state == NBD_CLIENT_CONNECTED) {
+    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) {
         qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
     }
 
@@ -188,6 +292,15 @@ static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs)
     if (s->connection_co_sleep_ns_state) {
         qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
     }
+
+    nbd_co_establish_connection_cancel(bs, false);
+
+    reconnect_delay_timer_del(s);
+
+    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
+        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
+        qemu_co_queue_restart_all(&s->free_sema);
+    }
 }
 
 static void coroutine_fn nbd_client_co_drain_end(BlockDriverState *bs)
@@ -206,16 +319,21 @@ static void nbd_teardown_connection(BlockDriverState *bs)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
-    if (s->state == NBD_CLIENT_CONNECTED) {
+    if (s->ioc) {
         /* finish any pending coroutines */
-        assert(s->ioc);
         qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+    } else if (s->sioc) {
+        /* abort negotiation */
+        qio_channel_shutdown(QIO_CHANNEL(s->sioc), QIO_CHANNEL_SHUTDOWN_BOTH,
+                             NULL);
     }
+
     s->state = NBD_CLIENT_QUIT;
     if (s->connection_co) {
         if (s->connection_co_sleep_ns_state) {
             qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
         }
+        nbd_co_establish_connection_cancel(bs, true);
     }
     if (qemu_in_coroutine()) {
         s->teardown_co = qemu_coroutine_self();
@@ -230,17 +348,236 @@ static void nbd_teardown_connection(BlockDriverState *bs)
 
 static bool nbd_client_connecting(BDRVNBDState *s)
 {
-    return s->state == NBD_CLIENT_CONNECTING_WAIT ||
-        s->state == NBD_CLIENT_CONNECTING_NOWAIT;
+    NBDClientState state = qatomic_load_acquire(&s->state);
+    return state == NBD_CLIENT_CONNECTING_WAIT ||
+        state == NBD_CLIENT_CONNECTING_NOWAIT;
 }
 
 static bool nbd_client_connecting_wait(BDRVNBDState *s)
 {
-    return s->state == NBD_CLIENT_CONNECTING_WAIT;
+    return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
+}
+
+static void connect_bh(void *opaque)
+{
+    BDRVNBDState *state = opaque;
+
+    assert(state->wait_connect);
+    state->wait_connect = false;
+    aio_co_wake(state->connection_co);
+}
+
+static void nbd_init_connect_thread(BDRVNBDState *s)
+{
+    s->connect_thread = g_new(NBDConnectThread, 1);
+
+    *s->connect_thread = (NBDConnectThread) {
+        .saddr = QAPI_CLONE(SocketAddress, s->saddr),
+        .state = CONNECT_THREAD_NONE,
+        .bh_func = connect_bh,
+        .bh_opaque = s,
+    };
+
+    qemu_mutex_init(&s->connect_thread->mutex);
+}
+
+static void nbd_free_connect_thread(NBDConnectThread *thr)
+{
+    if (thr->sioc) {
+        qio_channel_close(QIO_CHANNEL(thr->sioc), NULL);
+    }
+    error_free(thr->err);
+    qapi_free_SocketAddress(thr->saddr);
+    g_free(thr);
+}
+
+static void *connect_thread_func(void *opaque)
+{
+    NBDConnectThread *thr = opaque;
+    int ret;
+    bool do_free = false;
+
+    thr->sioc = qio_channel_socket_new();
+
+    error_free(thr->err);
+    thr->err = NULL;
+    ret = qio_channel_socket_connect_sync(thr->sioc, thr->saddr, &thr->err);
+    if (ret < 0) {
+        object_unref(OBJECT(thr->sioc));
+        thr->sioc = NULL;
+    }
+
+    qemu_mutex_lock(&thr->mutex);
+
+    switch (thr->state) {
+    case CONNECT_THREAD_RUNNING:
+        thr->state = ret < 0 ? CONNECT_THREAD_FAIL : CONNECT_THREAD_SUCCESS;
+        if (thr->bh_ctx) {
+            aio_bh_schedule_oneshot(thr->bh_ctx, thr->bh_func, thr->bh_opaque);
+
+            /* play safe, don't reuse bh_ctx on further connection attempts */
+            thr->bh_ctx = NULL;
+        }
+        break;
+    case CONNECT_THREAD_RUNNING_DETACHED:
+        do_free = true;
+        break;
+    default:
+        abort();
+    }
+
+    qemu_mutex_unlock(&thr->mutex);
+
+    if (do_free) {
+        nbd_free_connect_thread(thr);
+    }
+
+    return NULL;
+}
+
+static int coroutine_fn
+nbd_co_establish_connection(BlockDriverState *bs, Error **errp)
+{
+    int ret;
+    QemuThread thread;
+    BDRVNBDState *s = bs->opaque;
+    NBDConnectThread *thr = s->connect_thread;
+
+    qemu_mutex_lock(&thr->mutex);
+
+    switch (thr->state) {
+    case CONNECT_THREAD_FAIL:
+    case CONNECT_THREAD_NONE:
+        error_free(thr->err);
+        thr->err = NULL;
+        thr->state = CONNECT_THREAD_RUNNING;
+        qemu_thread_create(&thread, "nbd-connect",
+                           connect_thread_func, thr, QEMU_THREAD_DETACHED);
+        break;
+    case CONNECT_THREAD_SUCCESS:
+        /* Previous attempt finally succeeded in background */
+        thr->state = CONNECT_THREAD_NONE;
+        s->sioc = thr->sioc;
+        thr->sioc = NULL;
+        yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
+                               nbd_yank, bs);
+        qemu_mutex_unlock(&thr->mutex);
+        return 0;
+    case CONNECT_THREAD_RUNNING:
+        /* Already running, will wait */
+        break;
+    default:
+        abort();
+    }
+
+    thr->bh_ctx = qemu_get_current_aio_context();
+
+    qemu_mutex_unlock(&thr->mutex);
+
+
+    /*
+     * We are going to wait for connect-thread finish, but
+     * nbd_client_co_drain_begin() can interrupt.
+     *
+     * Note that wait_connect variable is not visible for connect-thread. It
+     * doesn't need mutex protection, it used only inside home aio context of
+     * bs.
+     */
+    s->wait_connect = true;
+    qemu_coroutine_yield();
+
+    qemu_mutex_lock(&thr->mutex);
+
+    switch (thr->state) {
+    case CONNECT_THREAD_SUCCESS:
+    case CONNECT_THREAD_FAIL:
+        thr->state = CONNECT_THREAD_NONE;
+        error_propagate(errp, thr->err);
+        thr->err = NULL;
+        s->sioc = thr->sioc;
+        thr->sioc = NULL;
+        if (s->sioc) {
+            yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
+                                   nbd_yank, bs);
+        }
+        ret = (s->sioc ? 0 : -1);
+        break;
+    case CONNECT_THREAD_RUNNING:
+    case CONNECT_THREAD_RUNNING_DETACHED:
+        /*
+         * Obviously, drained section wants to start. Report the attempt as
+         * failed. Still connect thread is executing in background, and its
+         * result may be used for next connection attempt.
+         */
+        ret = -1;
+        error_setg(errp, "Connection attempt cancelled by other operation");
+        break;
+
+    case CONNECT_THREAD_NONE:
+        /*
+         * Impossible. We've seen this thread running. So it should be
+         * running or at least give some results.
+         */
+        abort();
+
+    default:
+        abort();
+    }
+
+    qemu_mutex_unlock(&thr->mutex);
+
+    return ret;
+}
+
+/*
+ * nbd_co_establish_connection_cancel
+ * Cancel nbd_co_establish_connection asynchronously: it will finish soon, to
+ * allow drained section to begin.
+ *
+ * If detach is true, also cleanup the state (or if thread is running, move it
+ * to CONNECT_THREAD_RUNNING_DETACHED state). s->connect_thread becomes NULL if
+ * detach is true.
+ */
+static void nbd_co_establish_connection_cancel(BlockDriverState *bs,
+                                               bool detach)
+{
+    BDRVNBDState *s = bs->opaque;
+    NBDConnectThread *thr = s->connect_thread;
+    bool wake = false;
+    bool do_free = false;
+
+    qemu_mutex_lock(&thr->mutex);
+
+    if (thr->state == CONNECT_THREAD_RUNNING) {
+        /* We can cancel only in running state, when bh is not yet scheduled */
+        thr->bh_ctx = NULL;
+        if (s->wait_connect) {
+            s->wait_connect = false;
+            wake = true;
+        }
+        if (detach) {
+            thr->state = CONNECT_THREAD_RUNNING_DETACHED;
+            s->connect_thread = NULL;
+        }
+    } else if (detach) {
+        do_free = true;
+    }
+
+    qemu_mutex_unlock(&thr->mutex);
+
+    if (do_free) {
+        nbd_free_connect_thread(thr);
+        s->connect_thread = NULL;
+    }
+
+    if (wake) {
+        aio_co_wake(s->connection_co);
+    }
 }
 
 static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
 {
+    int ret;
     Error *local_err = NULL;
 
     if (!nbd_client_connecting(s)) {
@@ -273,47 +610,62 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
 
     /* Finalize previous connection if any */
     if (s->ioc) {
-        nbd_client_detach_aio_context(s->bs);
+        qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
+        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
+                                 nbd_yank, s->bs);
         object_unref(OBJECT(s->sioc));
         s->sioc = NULL;
         object_unref(OBJECT(s->ioc));
         s->ioc = NULL;
     }
 
-    s->connect_status = nbd_client_connect(s->bs, &local_err);
+    if (nbd_co_establish_connection(s->bs, &local_err) < 0) {
+        ret = -ECONNREFUSED;
+        goto out;
+    }
+
+    bdrv_dec_in_flight(s->bs);
+
+    ret = nbd_client_handshake(s->bs, &local_err);
+
+    if (s->drained) {
+        s->wait_drained_end = true;
+        while (s->drained) {
+            /*
+             * We may be entered once from nbd_client_attach_aio_context_bh
+             * and then from nbd_client_co_drain_end. So here is a loop.
+             */
+            qemu_coroutine_yield();
+        }
+    }
+    bdrv_inc_in_flight(s->bs);
+
+out:
+    s->connect_status = ret;
     error_free(s->connect_err);
     s->connect_err = NULL;
     error_propagate(&s->connect_err, local_err);
 
-    if (s->connect_status < 0) {
-        /* failed attempt */
-        return;
+    if (ret >= 0) {
+        /* successfully connected */
+        s->state = NBD_CLIENT_CONNECTED;
+        qemu_co_queue_restart_all(&s->free_sema);
     }
-
-    /* successfully connected */
-    s->state = NBD_CLIENT_CONNECTED;
-    qemu_co_queue_restart_all(&s->free_sema);
 }
 
 static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState *s)
 {
-    uint64_t start_time_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
-    uint64_t delay_ns = s->reconnect_delay * NANOSECONDS_PER_SECOND;
     uint64_t timeout = 1 * NANOSECONDS_PER_SECOND;
     uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND;
 
+    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
+        reconnect_delay_timer_init(s, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
+                                   s->reconnect_delay * NANOSECONDS_PER_SECOND);
+    }
+
     nbd_reconnect_attempt(s);
 
     while (nbd_client_connecting(s)) {
-        if (s->state == NBD_CLIENT_CONNECTING_WAIT &&
-            qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time_ns > delay_ns)
-        {
-            s->state = NBD_CLIENT_CONNECTING_NOWAIT;
-            qemu_co_queue_restart_all(&s->free_sema);
-        }
-
-        qemu_co_sleep_ns_wakeable(QEMU_CLOCK_REALTIME, timeout,
-                                  &s->connection_co_sleep_ns_state);
         if (s->drained) {
             bdrv_dec_in_flight(s->bs);
             s->wait_drained_end = true;
@@ -325,13 +677,21 @@ static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState *s)
                 qemu_coroutine_yield();
             }
             bdrv_inc_in_flight(s->bs);
-        }
-        if (timeout < max_timeout) {
-            timeout *= 2;
+        } else {
+            qemu_co_sleep_ns_wakeable(QEMU_CLOCK_REALTIME, timeout,
+                                      &s->connection_co_sleep_ns_state);
+            if (s->drained) {
+                continue;
+            }
+            if (timeout < max_timeout) {
+                timeout *= 2;
+            }
         }
 
         nbd_reconnect_attempt(s);
     }
+
+    reconnect_delay_timer_del(s);
 }
 
 static coroutine_fn void nbd_connection_entry(void *opaque)
@@ -341,7 +701,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
     int ret = 0;
     Error *local_err = NULL;
 
-    while (s->state != NBD_CLIENT_QUIT) {
+    while (qatomic_load_acquire(&s->state) != NBD_CLIENT_QUIT) {
         /*
          * The NBD client can only really be considered idle when it has
          * yielded from qio_channel_readv_all_eof(), waiting for data. This is
@@ -356,7 +716,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
             nbd_co_reconnect_loop(s);
         }
 
-        if (s->state != NBD_CLIENT_CONNECTED) {
+        if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
             continue;
         }
 
@@ -410,7 +770,9 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
 
     s->connection_co = NULL;
     if (s->ioc) {
-        nbd_client_detach_aio_context(s->bs);
+        qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
+        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
+                                 nbd_yank, s->bs);
         object_unref(OBJECT(s->sioc));
         s->sioc = NULL;
         object_unref(OBJECT(s->ioc));
@@ -435,7 +797,7 @@ static int nbd_co_send_request(BlockDriverState *bs,
         qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
     }
 
-    if (s->state != NBD_CLIENT_CONNECTED) {
+    if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
         rc = -EIO;
         goto err;
     }
@@ -462,7 +824,8 @@ static int nbd_co_send_request(BlockDriverState *bs,
     if (qiov) {
         qio_channel_set_cork(s->ioc, true);
         rc = nbd_send_request(s->ioc, request);
-        if (rc >= 0 && s->state == NBD_CLIENT_CONNECTED) {
+        if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED &&
+            rc >= 0) {
             if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
                                        NULL) < 0) {
                 rc = -EIO;
@@ -620,6 +983,16 @@ static int nbd_parse_blockstatus_payload(BDRVNBDState *s,
         trace_nbd_parse_blockstatus_compliance("extent length too large");
     }
 
+    /*
+     * HACK: if we are using x-dirty-bitmaps to access
+     * qemu:allocation-depth, treat all depths > 2 the same as 2,
+     * since nbd_client_co_block_status is only expecting the low two
+     * bits to be set.
+     */
+    if (s->alloc_depth && extent->flags > 2) {
+        extent->flags = 2;
+    }
+
     return 0;
 }
 
@@ -777,7 +1150,7 @@ static coroutine_fn int nbd_co_do_receive_one_chunk(
     s->requests[i].receiving = true;
     qemu_coroutine_yield();
     s->requests[i].receiving = false;
-    if (s->state != NBD_CLIENT_CONNECTED) {
+    if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
         error_setg(errp, "Connection closed");
         return -EIO;
     }
@@ -936,7 +1309,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
     NBDReply local_reply;
     NBDStructuredReplyChunk *chunk;
     Error *local_err = NULL;
-    if (s->state != NBD_CLIENT_CONNECTED) {
+    if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
         error_setg(&local_err, "Connection closed");
         nbd_iter_channel_error(iter, -EIO, &local_err);
         goto break_loop;
@@ -961,7 +1334,8 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
     }
 
     /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
-    if (nbd_reply_is_simple(reply) || s->state != NBD_CLIENT_CONNECTED) {
+    if (nbd_reply_is_simple(reply) ||
+        qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
         goto break_loop;
     }
 
@@ -1393,6 +1767,15 @@ static int nbd_client_reopen_prepare(BDRVReopenState *state,
     return 0;
 }
 
+static void nbd_yank(void *opaque)
+{
+    BlockDriverState *bs = opaque;
+    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
+
+    qatomic_store_release(&s->state, NBD_CLIENT_QUIT);
+    qio_channel_shutdown(QIO_CHANNEL(s->sioc), QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+}
+
 static void nbd_client_close(BlockDriverState *bs)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
@@ -1405,66 +1788,66 @@ static void nbd_client_close(BlockDriverState *bs)
     nbd_teardown_connection(bs);
 }
 
-static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
-                                                  Error **errp)
+static int nbd_establish_connection(BlockDriverState *bs,
+                                    SocketAddress *saddr,
+                                    Error **errp)
 {
-    QIOChannelSocket *sioc;
-    Error *local_err = NULL;
+    ERRP_GUARD();
+    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
-    sioc = qio_channel_socket_new();
-    qio_channel_set_name(QIO_CHANNEL(sioc), "nbd-client");
+    s->sioc = qio_channel_socket_new();
+    qio_channel_set_name(QIO_CHANNEL(s->sioc), "nbd-client");
 
-    qio_channel_socket_connect_sync(sioc, saddr, &local_err);
-    if (local_err) {
-        object_unref(OBJECT(sioc));
-        error_propagate(errp, local_err);
-        return NULL;
+    qio_channel_socket_connect_sync(s->sioc, saddr, errp);
+    if (*errp) {
+        object_unref(OBJECT(s->sioc));
+        s->sioc = NULL;
+        return -1;
     }
 
-    qio_channel_set_delay(QIO_CHANNEL(sioc), false);
+    yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name), nbd_yank, bs);
+    qio_channel_set_delay(QIO_CHANNEL(s->sioc), false);
 
-    return sioc;
+    return 0;
 }
 
-static int nbd_client_connect(BlockDriverState *bs, Error **errp)
+/* nbd_client_handshake takes ownership on s->sioc. On failure it's unref'ed. */
+static int nbd_client_handshake(BlockDriverState *bs, Error **errp)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     AioContext *aio_context = bdrv_get_aio_context(bs);
     int ret;
 
-    /*
-     * establish TCP connection, return error if it fails
-     * TODO: Configurable retry-until-timeout behaviour.
-     */
-    QIOChannelSocket *sioc = nbd_establish_connection(s->saddr, errp);
-
-    if (!sioc) {
-        return -ECONNREFUSED;
-    }
-
-    /* NBD handshake */
-    trace_nbd_client_connect(s->export);
-    qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
-    qio_channel_attach_aio_context(QIO_CHANNEL(sioc), aio_context);
+    trace_nbd_client_handshake(s->export);
+    qio_channel_set_blocking(QIO_CHANNEL(s->sioc), false, NULL);
+    qio_channel_attach_aio_context(QIO_CHANNEL(s->sioc), aio_context);
 
     s->info.request_sizes = true;
     s->info.structured_reply = true;
     s->info.base_allocation = true;
     s->info.x_dirty_bitmap = g_strdup(s->x_dirty_bitmap);
     s->info.name = g_strdup(s->export ?: "");
-    ret = nbd_receive_negotiate(aio_context, QIO_CHANNEL(sioc), s->tlscreds,
+    ret = nbd_receive_negotiate(aio_context, QIO_CHANNEL(s->sioc), s->tlscreds,
                                 s->hostname, &s->ioc, &s->info, errp);
     g_free(s->info.x_dirty_bitmap);
     g_free(s->info.name);
     if (ret < 0) {
-        object_unref(OBJECT(sioc));
+        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
+                                 nbd_yank, bs);
+        object_unref(OBJECT(s->sioc));
+        s->sioc = NULL;
         return ret;
     }
-    if (s->x_dirty_bitmap && !s->info.base_allocation) {
-        error_setg(errp, "requested x-dirty-bitmap %s not found",
-                   s->x_dirty_bitmap);
-        ret = -EINVAL;
-        goto fail;
+    if (s->x_dirty_bitmap) {
+        if (!s->info.base_allocation) {
+            error_setg(errp, "requested x-dirty-bitmap %s not found",
+                       s->x_dirty_bitmap);
+            ret = -EINVAL;
+            goto fail;
+        }
+        if (strcmp(s->x_dirty_bitmap, "qemu:allocation-depth") == 0) {
+            s->alloc_depth = true;
+        }
     }
     if (s->info.flags & NBD_FLAG_READ_ONLY) {
         ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
@@ -1483,14 +1866,12 @@ static int nbd_client_connect(BlockDriverState *bs, Error **errp)
         }
     }
 
-    s->sioc = sioc;
-
     if (!s->ioc) {
-        s->ioc = QIO_CHANNEL(sioc);
+        s->ioc = QIO_CHANNEL(s->sioc);
         object_ref(OBJECT(s->ioc));
     }
 
-    trace_nbd_client_connect_success(s->export);
+    trace_nbd_client_handshake_success(s->export);
 
     return 0;
 
@@ -1502,9 +1883,12 @@ static int nbd_client_connect(BlockDriverState *bs, Error **errp)
     {
         NBDRequest request = { .type = NBD_CMD_DISC };
 
-        nbd_send_request(s->ioc ?: QIO_CHANNEL(sioc), &request);
+        nbd_send_request(s->ioc ?: QIO_CHANNEL(s->sioc), &request);
 
-        object_unref(OBJECT(sioc));
+        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
+                                 nbd_yank, bs);
+        object_unref(OBJECT(s->sioc));
+        s->sioc = NULL;
 
         return ret;
     }
@@ -1726,7 +2110,6 @@ static SocketAddress *nbd_config(BDRVNBDState *s, QDict *options,
     SocketAddress *saddr = NULL;
     QDict *addr = NULL;
     Visitor *iv = NULL;
-    Error *local_err = NULL;
 
     qdict_extract_subqdict(options, &addr, "server.");
     if (!qdict_size(addr)) {
@@ -1739,9 +2122,7 @@ static SocketAddress *nbd_config(BDRVNBDState *s, QDict *options,
         goto done;
     }
 
-    visit_type_SocketAddress(iv, NULL, &saddr, &local_err);
-    if (local_err) {
-        error_propagate(errp, local_err);
+    if (!visit_type_SocketAddress(iv, NULL, &saddr, errp)) {
         goto done;
     }
 
@@ -1836,13 +2217,10 @@ static int nbd_process_options(BlockDriverState *bs, QDict *options,
 {
     BDRVNBDState *s = bs->opaque;
     QemuOpts *opts;
-    Error *local_err = NULL;
     int ret = -EINVAL;
 
     opts = qemu_opts_create(&nbd_runtime_opts, NULL, 0, &error_abort);
-    qemu_opts_absorb_qdict(opts, options, &local_err);
-    if (local_err) {
-        error_propagate(errp, local_err);
+    if (!qemu_opts_absorb_qdict(opts, options, errp)) {
         goto error;
     }
 
@@ -1911,14 +2289,30 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
     qemu_co_mutex_init(&s->send_mutex);
     qemu_co_queue_init(&s->free_sema);
 
-    ret = nbd_client_connect(bs, errp);
+    if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
+        return -EEXIST;
+    }
+
+    /*
+     * establish TCP connection, return error if it fails
+     * TODO: Configurable retry-until-timeout behaviour.
+     */
+    if (nbd_establish_connection(bs, s->saddr, errp) < 0) {
+        yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
+        return -ECONNREFUSED;
+    }
+
+    ret = nbd_client_handshake(bs, errp);
     if (ret < 0) {
+        yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
         nbd_clear_bdrvstate(s);
         return ret;
     }
     /* successfully connected */
     s->state = NBD_CLIENT_CONNECTED;
 
+    nbd_init_connect_thread(s);
+
     s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
     bdrv_inc_in_flight(bs);
     aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);
@@ -1970,9 +2364,37 @@ static void nbd_close(BlockDriverState *bs)
     BDRVNBDState *s = bs->opaque;
 
     nbd_client_close(bs);
+    yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
     nbd_clear_bdrvstate(s);
 }
 
+/*
+ * NBD cannot truncate, but if the caller asks to truncate to the same size, or
+ * to a smaller size with exact=false, there is no reason to fail the
+ * operation.
+ *
+ * Preallocation mode is ignored since it does not seems useful to fail when
+ * we never change anything.
+ */
+static int coroutine_fn nbd_co_truncate(BlockDriverState *bs, int64_t offset,
+                                        bool exact, PreallocMode prealloc,
+                                        BdrvRequestFlags flags, Error **errp)
+{
+    BDRVNBDState *s = bs->opaque;
+
+    if (offset != s->info.size && exact) {
+        error_setg(errp, "Cannot resize NBD nodes");
+        return -ENOTSUP;
+    }
+
+    if (offset > s->info.size) {
+        error_setg(errp, "Cannot grow NBD nodes");
+        return -EINVAL;
+    }
+
+    return 0;
+}
+
 static int64_t nbd_getlength(BlockDriverState *bs)
 {
     BDRVNBDState *s = bs->opaque;
@@ -2009,7 +2431,7 @@ static void nbd_refresh_filename(BlockDriverState *bs)
         len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
                        "nbd://%s:%s", host, port);
     }
-    if (len > sizeof(bs->exact_filename)) {
+    if (len >= sizeof(bs->exact_filename)) {
         /* Name is too long to represent exactly, so leave it empty. */
         bs->exact_filename[0] = '\0';
     }
@@ -2036,6 +2458,18 @@ static const char *const nbd_strong_runtime_opts[] = {
     NULL
 };
 
+static void nbd_cancel_in_flight(BlockDriverState *bs)
+{
+    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
+
+    reconnect_delay_timer_del(s);
+
+    if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
+        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
+        qemu_co_queue_restart_all(&s->free_sema);
+    }
+}
+
 static BlockDriver bdrv_nbd = {
     .format_name                = "nbd",
     .protocol_name              = "nbd",
@@ -2052,6 +2486,7 @@ static BlockDriver bdrv_nbd = {
     .bdrv_co_flush_to_os        = nbd_co_flush,
     .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
     .bdrv_refresh_limits        = nbd_refresh_limits,
+    .bdrv_co_truncate           = nbd_co_truncate,
     .bdrv_getlength             = nbd_getlength,
     .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
     .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
@@ -2061,6 +2496,7 @@ static BlockDriver bdrv_nbd = {
     .bdrv_co_block_status       = nbd_client_co_block_status,
     .bdrv_dirname               = nbd_dirname,
     .strong_runtime_opts        = nbd_strong_runtime_opts,
+    .bdrv_cancel_in_flight      = nbd_cancel_in_flight,
 };
 
 static BlockDriver bdrv_nbd_tcp = {
@@ -2079,6 +2515,7 @@ static BlockDriver bdrv_nbd_tcp = {
     .bdrv_co_flush_to_os        = nbd_co_flush,
     .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
     .bdrv_refresh_limits        = nbd_refresh_limits,
+    .bdrv_co_truncate           = nbd_co_truncate,
     .bdrv_getlength             = nbd_getlength,
     .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
     .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
@@ -2088,6 +2525,7 @@ static BlockDriver bdrv_nbd_tcp = {
     .bdrv_co_block_status       = nbd_client_co_block_status,
     .bdrv_dirname               = nbd_dirname,
     .strong_runtime_opts        = nbd_strong_runtime_opts,
+    .bdrv_cancel_in_flight      = nbd_cancel_in_flight,
 };
 
 static BlockDriver bdrv_nbd_unix = {
@@ -2106,6 +2544,7 @@ static BlockDriver bdrv_nbd_unix = {
     .bdrv_co_flush_to_os        = nbd_co_flush,
     .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
     .bdrv_refresh_limits        = nbd_refresh_limits,
+    .bdrv_co_truncate           = nbd_co_truncate,
     .bdrv_getlength             = nbd_getlength,
     .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
     .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
@@ -2115,6 +2554,7 @@ static BlockDriver bdrv_nbd_unix = {
     .bdrv_co_block_status       = nbd_client_co_block_status,
     .bdrv_dirname               = nbd_dirname,
     .strong_runtime_opts        = nbd_strong_runtime_opts,
+    .bdrv_cancel_in_flight      = nbd_cancel_in_flight,
 };
 
 static void bdrv_nbd_init(void)