]> git.proxmox.com Git - mirror_qemu.git/blobdiff - nbd/server.c
target/i386: reimplement 0x0f 0x3a, add AVX
[mirror_qemu.git] / nbd / server.c
index b6841e455414acbcab670d4788009d53a3917674..ada16089f32e1b28b9538c8a07315c776694ed2b 100644 (file)
@@ -1,5 +1,5 @@
 /*
- *  Copyright (C) 2016-2020 Red Hat, Inc.
+ *  Copyright (C) 2016-2022 Red Hat, Inc.
  *  Copyright (C) 2005  Anthony Liguori <anthony@codemonkey.ws>
  *
  *  Network Block Device Server Side
 #include "trace.h"
 #include "nbd-internal.h"
 #include "qemu/units.h"
+#include "qemu/memalign.h"
 
 #define NBD_META_ID_BASE_ALLOCATION 0
+#define NBD_META_ID_ALLOCATION_DEPTH 1
 /* Dirty bitmaps use 'NBD_META_ID_DIRTY_BITMAP + i', so keep this id last. */
-#define NBD_META_ID_DIRTY_BITMAP 1
+#define NBD_META_ID_DIRTY_BITMAP 2
 
 /*
  * NBD_MAX_BLOCK_STATUS_EXTENTS: 1 MiB of extents data. An empirical
@@ -76,7 +78,6 @@ static int system_errno_to_nbd_errno(int err)
 typedef struct NBDRequestData NBDRequestData;
 
 struct NBDRequestData {
-    QSIMPLEQ_ENTRY(NBDRequestData) entry;
     NBDClient *client;
     uint8_t *data;
     bool complete;
@@ -95,6 +96,7 @@ struct NBDExport {
     BlockBackend *eject_notifier_blk;
     Notifier eject_notifier;
 
+    bool allocation_depth;
     BdrvDirtyBitmap **export_bitmaps;
     size_t nr_export_bitmaps;
 };
@@ -108,6 +110,7 @@ typedef struct NBDExportMetaContexts {
     NBDExport *exp;
     size_t count; /* number of negotiated contexts */
     bool base_allocation; /* export base:allocation context (block status) */
+    bool allocation_depth; /* export qemu:allocation-depth */
     bool *bitmaps; /*
                     * export qemu:dirty-bitmap:<export bitmap name>,
                     * sized by exp->nr_export_bitmaps
@@ -129,6 +132,9 @@ struct NBDClient {
     CoMutex send_lock;
     Coroutine *send_coroutine;
 
+    bool read_yielding;
+    bool quiescing;
+
     QTAILQ_ENTRY(NBDClient) next;
     int nb_requests;
     bool closing;
@@ -207,7 +213,7 @@ static int nbd_negotiate_send_rep(NBDClient *client, uint32_t type,
 
 /* Send an error reply.
  * Return -errno on error, 0 on success. */
-static int GCC_FMT_ATTR(4, 0)
+static int G_GNUC_PRINTF(4, 0)
 nbd_negotiate_send_rep_verr(NBDClient *client, uint32_t type,
                             Error **errp, const char *fmt, va_list va)
 {
@@ -247,7 +253,7 @@ nbd_sanitize_name(const char *name)
 
 /* Send an error reply.
  * Return -errno on error, 0 on success. */
-static int GCC_FMT_ATTR(4, 5)
+static int G_GNUC_PRINTF(4, 5)
 nbd_negotiate_send_rep_err(NBDClient *client, uint32_t type,
                            Error **errp, const char *fmt, ...)
 {
@@ -263,7 +269,7 @@ nbd_negotiate_send_rep_err(NBDClient *client, uint32_t type,
 /* Drop remainder of the current option, and send a reply with the
  * given error type and message. Return -errno on read or write
  * failure; or 0 if connection is still live. */
-static int GCC_FMT_ATTR(4, 0)
+static int G_GNUC_PRINTF(4, 0)
 nbd_opt_vdrop(NBDClient *client, uint32_t type, Error **errp,
               const char *fmt, va_list va)
 {
@@ -276,7 +282,7 @@ nbd_opt_vdrop(NBDClient *client, uint32_t type, Error **errp,
     return ret;
 }
 
-static int GCC_FMT_ATTR(4, 5)
+static int G_GNUC_PRINTF(4, 5)
 nbd_opt_drop(NBDClient *client, uint32_t type, Error **errp,
              const char *fmt, ...)
 {
@@ -290,7 +296,7 @@ nbd_opt_drop(NBDClient *client, uint32_t type, Error **errp,
     return ret;
 }
 
-static int GCC_FMT_ATTR(3, 4)
+static int G_GNUC_PRINTF(3, 4)
 nbd_opt_invalid(NBDClient *client, Error **errp, const char *fmt, ...)
 {
     int ret;
@@ -857,7 +863,8 @@ static bool nbd_meta_base_query(NBDClient *client, NBDExportMetaContexts *meta,
 /* nbd_meta_qemu_query
  *
  * Handle queries to 'qemu' namespace. For now, only the qemu:dirty-bitmap:
- * context is available.  Return true if @query has been handled.
+ * and qemu:allocation-depth contexts are available.  Return true if @query
+ * has been handled.
  */
 static bool nbd_meta_qemu_query(NBDClient *client, NBDExportMetaContexts *meta,
                                 const char *query)
@@ -871,16 +878,26 @@ static bool nbd_meta_qemu_query(NBDClient *client, NBDExportMetaContexts *meta,
 
     if (!*query) {
         if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
-            memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
+            meta->allocation_depth = meta->exp->allocation_depth;
+            if (meta->exp->nr_export_bitmaps) {
+                memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
+            }
         }
         trace_nbd_negotiate_meta_query_parse("empty");
         return true;
     }
 
+    if (strcmp(query, "allocation-depth") == 0) {
+        trace_nbd_negotiate_meta_query_parse("allocation-depth");
+        meta->allocation_depth = meta->exp->allocation_depth;
+        return true;
+    }
+
     if (nbd_strshift(&query, "dirty-bitmap:")) {
         trace_nbd_negotiate_meta_query_parse("dirty-bitmap:");
         if (!*query) {
-            if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
+            if (client->opt == NBD_OPT_LIST_META_CONTEXT &&
+                meta->exp->nr_export_bitmaps) {
                 memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
             }
             trace_nbd_negotiate_meta_query_parse("empty");
@@ -901,7 +918,7 @@ static bool nbd_meta_qemu_query(NBDClient *client, NBDExportMetaContexts *meta,
         return true;
     }
 
-    trace_nbd_negotiate_meta_query_skip("not dirty-bitmap");
+    trace_nbd_negotiate_meta_query_skip("unknown qemu context");
     return true;
 }
 
@@ -959,13 +976,14 @@ static int nbd_negotiate_meta_queries(NBDClient *client,
 {
     int ret;
     g_autofree char *export_name = NULL;
-    g_autofree bool *bitmaps = NULL;
+    /* Mark unused to work around https://bugs.llvm.org/show_bug.cgi?id=3888 */
+    g_autofree G_GNUC_UNUSED bool *bitmaps = NULL;
     NBDExportMetaContexts local_meta = {0};
     uint32_t nb_queries;
     size_t i;
     size_t count = 0;
 
-    if (!client->structured_reply) {
+    if (client->opt == NBD_OPT_SET_META_CONTEXT && !client->structured_reply) {
         return nbd_opt_invalid(client, errp,
                                "request option '%s' when structured reply "
                                "is not negotiated",
@@ -1008,7 +1026,10 @@ static int nbd_negotiate_meta_queries(NBDClient *client,
     if (client->opt == NBD_OPT_LIST_META_CONTEXT && !nb_queries) {
         /* enable all known contexts */
         meta->base_allocation = true;
-        memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
+        meta->allocation_depth = meta->exp->allocation_depth;
+        if (meta->exp->nr_export_bitmaps) {
+            memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
+        }
     } else {
         for (i = 0; i < nb_queries; ++i) {
             ret = nbd_negotiate_meta_query(client, meta, errp);
@@ -1028,6 +1049,16 @@ static int nbd_negotiate_meta_queries(NBDClient *client,
         count++;
     }
 
+    if (meta->allocation_depth) {
+        ret = nbd_negotiate_send_meta_context(client, "qemu:allocation-depth",
+                                              NBD_META_ID_ALLOCATION_DEPTH,
+                                              errp);
+        if (ret < 0) {
+            return ret;
+        }
+        count++;
+    }
+
     for (i = 0; i < meta->exp->nr_export_bitmaps; i++) {
         const char *bm_name;
         g_autofree char *context = NULL;
@@ -1330,17 +1361,66 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
     return 0;
 }
 
-static int nbd_receive_request(QIOChannel *ioc, NBDRequest *request,
+/* nbd_read_eof
+ * Tries to read @size bytes from @ioc. This is a local implementation of
+ * qio_channel_readv_all_eof. We have it here because we need it to be
+ * interruptible and to know when the coroutine is yielding.
+ * Returns 1 on success
+ *         0 on eof, when no data was read (errp is not set)
+ *         negative errno on failure (errp is set)
+ */
+static inline int coroutine_fn
+nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
+{
+    bool partial = false;
+
+    assert(size);
+    while (size > 0) {
+        struct iovec iov = { .iov_base = buffer, .iov_len = size };
+        ssize_t len;
+
+        len = qio_channel_readv(client->ioc, &iov, 1, errp);
+        if (len == QIO_CHANNEL_ERR_BLOCK) {
+            client->read_yielding = true;
+            qio_channel_yield(client->ioc, G_IO_IN);
+            client->read_yielding = false;
+            if (client->quiescing) {
+                return -EAGAIN;
+            }
+            continue;
+        } else if (len < 0) {
+            return -EIO;
+        } else if (len == 0) {
+            if (partial) {
+                error_setg(errp,
+                           "Unexpected end-of-file before all bytes were read");
+                return -EIO;
+            } else {
+                return 0;
+            }
+        }
+
+        partial = true;
+        size -= len;
+        buffer = (uint8_t *) buffer + len;
+    }
+    return 1;
+}
+
+static int nbd_receive_request(NBDClient *client, NBDRequest *request,
                                Error **errp)
 {
     uint8_t buf[NBD_REQUEST_SIZE];
     uint32_t magic;
     int ret;
 
-    ret = nbd_read(ioc, buf, sizeof(buf), "request", errp);
+    ret = nbd_read_eof(client, buf, sizeof(buf), errp);
     if (ret < 0) {
         return ret;
     }
+    if (ret == 0) {
+        return -EIO;
+    }
 
     /* Request
        [ 0 ..  3]   magic   (NBD_REQUEST_MAGIC)
@@ -1442,6 +1522,11 @@ static void nbd_request_put(NBDRequestData *req)
     g_free(req);
 
     client->nb_requests--;
+
+    if (client->quiescing && client->nb_requests == 0) {
+        aio_wait_kick();
+    }
+
     nbd_client_receive_next_request(client);
 
     nbd_client_put(client);
@@ -1458,12 +1543,10 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
 
     QTAILQ_FOREACH(client, &exp->clients, next) {
         qio_channel_attach_aio_context(client->ioc, ctx);
-        if (client->recv_coroutine) {
-            aio_co_schedule(ctx, client->recv_coroutine);
-        }
-        if (client->send_coroutine) {
-            aio_co_schedule(ctx, client->send_coroutine);
-        }
+
+        assert(client->nb_requests == 0);
+        assert(client->recv_coroutine == NULL);
+        assert(client->send_coroutine == NULL);
     }
 }
 
@@ -1481,6 +1564,50 @@ static void blk_aio_detach(void *opaque)
     exp->common.ctx = NULL;
 }
 
+static void nbd_drained_begin(void *opaque)
+{
+    NBDExport *exp = opaque;
+    NBDClient *client;
+
+    QTAILQ_FOREACH(client, &exp->clients, next) {
+        client->quiescing = true;
+    }
+}
+
+static void nbd_drained_end(void *opaque)
+{
+    NBDExport *exp = opaque;
+    NBDClient *client;
+
+    QTAILQ_FOREACH(client, &exp->clients, next) {
+        client->quiescing = false;
+        nbd_client_receive_next_request(client);
+    }
+}
+
+static bool nbd_drained_poll(void *opaque)
+{
+    NBDExport *exp = opaque;
+    NBDClient *client;
+
+    QTAILQ_FOREACH(client, &exp->clients, next) {
+        if (client->nb_requests != 0) {
+            /*
+             * If there's a coroutine waiting for a request on nbd_read_eof()
+             * enter it here so we don't depend on the client to wake it up.
+             */
+            if (client->recv_coroutine != NULL && client->read_yielding) {
+                qemu_aio_coroutine_enter(exp->common.ctx,
+                                         client->recv_coroutine);
+            }
+
+            return true;
+        }
+    }
+
+    return false;
+}
+
 static void nbd_eject_notifier(Notifier *n, void *data)
 {
     NBDExport *exp = container_of(n, NBDExport, eject_notifier);
@@ -1500,6 +1627,12 @@ void nbd_export_set_on_eject_blk(BlockExport *exp, BlockBackend *blk)
     blk_add_remove_bs_notifier(blk, &nbd_exp->eject_notifier);
 }
 
+static const BlockDevOps nbd_block_ops = {
+    .drained_begin = nbd_drained_begin,
+    .drained_end = nbd_drained_end,
+    .drained_poll = nbd_drained_poll,
+};
+
 static int nbd_export_create(BlockExport *blk_exp, BlockExportOptions *exp_args,
                              Error **errp)
 {
@@ -1509,8 +1642,7 @@ static int nbd_export_create(BlockExport *blk_exp, BlockExportOptions *exp_args,
     int64_t size;
     uint64_t perm, shared_perm;
     bool readonly = !exp_args->writable;
-    bool shared = !exp_args->writable;
-    strList *bitmaps;
+    BlockDirtyBitmapOrStrList *bitmaps;
     size_t i;
     int ret;
 
@@ -1560,11 +1692,12 @@ static int nbd_export_create(BlockExport *blk_exp, BlockExportOptions *exp_args,
     exp->description = g_strdup(arg->description);
     exp->nbdflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_FLUSH |
                      NBD_FLAG_SEND_FUA | NBD_FLAG_SEND_CACHE);
+
+    if (nbd_server_max_connections() != 1) {
+        exp->nbdflags |= NBD_FLAG_CAN_MULTI_CONN;
+    }
     if (readonly) {
         exp->nbdflags |= NBD_FLAG_READ_ONLY;
-        if (shared) {
-            exp->nbdflags |= NBD_FLAG_CAN_MULTI_CONN;
-        }
     } else {
         exp->nbdflags |= (NBD_FLAG_SEND_TRIM | NBD_FLAG_SEND_WRITE_ZEROES |
                           NBD_FLAG_SEND_FAST_ZERO);
@@ -1576,37 +1709,56 @@ static int nbd_export_create(BlockExport *blk_exp, BlockExportOptions *exp_args,
     }
     exp->export_bitmaps = g_new0(BdrvDirtyBitmap *, exp->nr_export_bitmaps);
     for (i = 0, bitmaps = arg->bitmaps; bitmaps;
-         i++, bitmaps = bitmaps->next) {
-        const char *bitmap = bitmaps->value;
+         i++, bitmaps = bitmaps->next)
+    {
+        const char *bitmap;
         BlockDriverState *bs = blk_bs(blk);
         BdrvDirtyBitmap *bm = NULL;
 
-        while (bs) {
-            bm = bdrv_find_dirty_bitmap(bs, bitmap);
-            if (bm != NULL) {
-                break;
+        switch (bitmaps->value->type) {
+        case QTYPE_QSTRING:
+            bitmap = bitmaps->value->u.local;
+            while (bs) {
+                bm = bdrv_find_dirty_bitmap(bs, bitmap);
+                if (bm != NULL) {
+                    break;
+                }
+
+                bs = bdrv_filter_or_cow_bs(bs);
             }
 
-            bs = bdrv_filter_or_cow_bs(bs);
-        }
+            if (bm == NULL) {
+                ret = -ENOENT;
+                error_setg(errp, "Bitmap '%s' is not found",
+                           bitmaps->value->u.local);
+                goto fail;
+            }
 
-        if (bm == NULL) {
-            ret = -ENOENT;
-            error_setg(errp, "Bitmap '%s' is not found", bitmap);
-            goto fail;
+            if (readonly && bdrv_is_writable(bs) &&
+                bdrv_dirty_bitmap_enabled(bm)) {
+                ret = -EINVAL;
+                error_setg(errp, "Enabled bitmap '%s' incompatible with "
+                           "readonly export", bitmap);
+                goto fail;
+            }
+            break;
+        case QTYPE_QDICT:
+            bitmap = bitmaps->value->u.external.name;
+            bm = block_dirty_bitmap_lookup(bitmaps->value->u.external.node,
+                                           bitmap, NULL, errp);
+            if (!bm) {
+                ret = -ENOENT;
+                goto fail;
+            }
+            break;
+        default:
+            abort();
         }
 
-        if (bdrv_dirty_bitmap_check(bm, BDRV_BITMAP_ALLOW_RO, errp)) {
-            ret = -EINVAL;
-            goto fail;
-        }
+        assert(bm);
 
-        if (readonly && bdrv_is_writable(bs) &&
-            bdrv_dirty_bitmap_enabled(bm)) {
+        if (bdrv_dirty_bitmap_check(bm, BDRV_BITMAP_ALLOW_RO, errp)) {
             ret = -EINVAL;
-            error_setg(errp,
-                       "Enabled bitmap '%s' incompatible with readonly export",
-                       bitmap);
             goto fail;
         }
 
@@ -1619,8 +1771,19 @@ static int nbd_export_create(BlockExport *blk_exp, BlockExportOptions *exp_args,
         bdrv_dirty_bitmap_set_busy(exp->export_bitmaps[i], true);
     }
 
+    exp->allocation_depth = arg->allocation_depth;
+
+    /*
+     * We need to inhibit request queuing in the block layer to ensure we can
+     * be properly quiesced when entering a drained section, as our coroutines
+     * servicing pending requests might enter blk_pread().
+     */
+    blk_set_disable_request_queuing(blk, true);
+
     blk_add_aio_context_notifier(blk, blk_aio_attached, blk_aio_detach, exp);
 
+    blk_set_dev_ops(blk, &nbd_block_ops, exp);
+
     QTAILQ_INSERT_TAIL(&exports, exp, next);
 
     return 0;
@@ -1692,6 +1855,7 @@ static void nbd_export_delete(BlockExport *blk_exp)
         }
         blk_remove_aio_context_notifier(exp->common.blk, blk_aio_attached,
                                         blk_aio_detach, exp);
+        blk_set_disable_request_queuing(exp->common.blk, false);
     }
 
     for (i = 0; i < exp->nr_export_bitmaps; i++) {
@@ -1875,8 +2039,8 @@ static int coroutine_fn nbd_co_send_sparse_read(NBDClient *client,
             stl_be_p(&chunk.length, pnum);
             ret = nbd_co_send_iov(client, iov, 1, errp);
         } else {
-            ret = blk_pread(exp->common.blk, offset + progress,
-                            data + progress, pnum);
+            ret = blk_pread(exp->common.blk, offset + progress, pnum,
+                            data + progress, 0);
             if (ret < 0) {
                 error_setg_errno(errp, -ret, "reading from file failed");
                 break;
@@ -1919,7 +2083,7 @@ static void nbd_extent_array_free(NBDExtentArray *ea)
     g_free(ea->extents);
     g_free(ea);
 }
-G_DEFINE_AUTOPTR_CLEANUP_FUNC(NBDExtentArray, nbd_extent_array_free);
+G_DEFINE_AUTOPTR_CLEANUP_FUNC(NBDExtentArray, nbd_extent_array_free)
 
 /* Further modifications of the array after conversion are abandoned */
 static void nbd_extent_array_convert_to_be(NBDExtentArray *ea)
@@ -1940,11 +2104,10 @@ static void nbd_extent_array_convert_to_be(NBDExtentArray *ea)
  * Add extent to NBDExtentArray. If extent can't be added (no available space),
  * return -1.
  * For safety, when returning -1 for the first time, .can_add is set to false,
- * further call to nbd_extent_array_add() will crash.
- * (to avoid the situation, when after failing to add an extent (returned -1),
- * user miss this failure and add another extent, which is successfully added
- * (array is full, but new extent may be squashed into the last one), then we
- * have invalid array with skipped extent)
+ * and further calls to nbd_extent_array_add() will crash.
+ * (this avoids the situation where a caller ignores failure to add one extent,
+ * where adding another extent that would squash into the last array entry
+ * would result in an incorrect range reported to the client)
  */
 static int nbd_extent_array_add(NBDExtentArray *ea,
                                 uint32_t length, uint32_t flags)
@@ -1991,8 +2154,8 @@ static int blockstatus_to_extents(BlockDriverState *bs, uint64_t offset,
             return ret;
         }
 
-        flags = (ret & BDRV_BLOCK_ALLOCATED ? 0 : NBD_STATE_HOLE) |
-                (ret & BDRV_BLOCK_ZERO      ? NBD_STATE_ZERO : 0);
+        flags = (ret & BDRV_BLOCK_DATA ? 0 : NBD_STATE_HOLE) |
+                (ret & BDRV_BLOCK_ZERO ? NBD_STATE_ZERO : 0);
 
         if (nbd_extent_array_add(ea, num, flags) < 0) {
             return 0;
@@ -2005,6 +2168,29 @@ static int blockstatus_to_extents(BlockDriverState *bs, uint64_t offset,
     return 0;
 }
 
+static int blockalloc_to_extents(BlockDriverState *bs, uint64_t offset,
+                                 uint64_t bytes, NBDExtentArray *ea)
+{
+    while (bytes) {
+        int64_t num;
+        int ret = bdrv_is_allocated_above(bs, NULL, false, offset, bytes,
+                                          &num);
+
+        if (ret < 0) {
+            return ret;
+        }
+
+        if (nbd_extent_array_add(ea, num, ret) < 0) {
+            return 0;
+        }
+
+        offset += num;
+        bytes -= num;
+    }
+
+    return 0;
+}
+
 /*
  * nbd_co_send_extents
  *
@@ -2044,7 +2230,11 @@ static int nbd_co_send_block_status(NBDClient *client, uint64_t handle,
     unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS;
     g_autoptr(NBDExtentArray) ea = nbd_extent_array_new(nb_extents);
 
-    ret = blockstatus_to_extents(bs, offset, length, ea);
+    if (context_id == NBD_META_ID_BASE_ALLOCATION) {
+        ret = blockstatus_to_extents(bs, offset, length, ea);
+    } else {
+        ret = blockalloc_to_extents(bs, offset, length, ea);
+    }
     if (ret < 0) {
         return nbd_co_send_structured_error(
                 client, handle, -ret, "can't get block status", errp);
@@ -2078,8 +2268,8 @@ static void bitmap_to_extents(BdrvDirtyBitmap *bitmap,
     }
 
     if (!full) {
-        /* last non dirty extent */
-        nbd_extent_array_add(es, end - start, 0);
+        /* last non dirty extent, nothing to do if array is now full */
+        (void) nbd_extent_array_add(es, end - start, 0);
     }
 
     bdrv_dirty_bitmap_unlock(bitmap);
@@ -2100,20 +2290,23 @@ static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle,
 
 /* nbd_co_receive_request
  * Collect a client request. Return 0 if request looks valid, -EIO to drop
- * connection right away, and any other negative value to report an error to
- * the client (although the caller may still need to disconnect after reporting
- * the error).
+ * connection right away, -EAGAIN to indicate we were interrupted and the
+ * channel should be quiesced, and any other negative value to report an error
+ * to the client (although the caller may still need to disconnect after
+ * reporting the error).
  */
 static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
                                   Error **errp)
 {
     NBDClient *client = req->client;
     int valid_flags;
+    int ret;
 
     g_assert(qemu_in_coroutine());
     assert(client->recv_coroutine == qemu_coroutine_self());
-    if (nbd_receive_request(client->ioc, request, errp) < 0) {
-        return -EIO;
+    ret = nbd_receive_request(client, request, errp);
+    if (ret < 0) {
+        return ret;
     }
 
     trace_nbd_co_receive_request_decode_type(request->handle, request->type,
@@ -2251,7 +2444,7 @@ static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request,
                                        data, request->len, errp);
     }
 
-    ret = blk_pread(exp->common.blk, request->from, data, request->len);
+    ret = blk_pread(exp->common.blk, request->from, request->len, data, 0);
     if (ret < 0) {
         return nbd_send_generic_reply(client, request->handle, ret,
                                       "reading from file failed", errp);
@@ -2318,7 +2511,7 @@ static coroutine_fn int nbd_handle_request(NBDClient *client,
         if (request->flags & NBD_CMD_FLAG_FUA) {
             flags |= BDRV_REQ_FUA;
         }
-        ret = blk_pwrite(exp->common.blk, request->from, data, request->len,
+        ret = blk_pwrite(exp->common.blk, request->from, request->len, data,
                          flags);
         return nbd_send_generic_reply(client, request->handle, ret,
                                       "writing to file failed", errp);
@@ -2334,16 +2527,8 @@ static coroutine_fn int nbd_handle_request(NBDClient *client,
         if (request->flags & NBD_CMD_FLAG_FAST_ZERO) {
             flags |= BDRV_REQ_NO_FALLBACK;
         }
-        ret = 0;
-        /* FIXME simplify this when blk_pwrite_zeroes switches to 64-bit */
-        while (ret >= 0 && request->len) {
-            int align = client->check_align ?: 1;
-            int len = MIN(request->len, QEMU_ALIGN_DOWN(BDRV_REQUEST_MAX_BYTES,
-                                                        align));
-            ret = blk_pwrite_zeroes(exp->common.blk, request->from, len, flags);
-            request->len -= len;
-            request->from += len;
-        }
+        ret = blk_pwrite_zeroes(exp->common.blk, request->from, request->len,
+                                flags);
         return nbd_send_generic_reply(client, request->handle, ret,
                                       "writing to file failed", errp);
 
@@ -2357,16 +2542,7 @@ static coroutine_fn int nbd_handle_request(NBDClient *client,
                                       "flush failed", errp);
 
     case NBD_CMD_TRIM:
-        ret = 0;
-        /* FIXME simplify this when blk_co_pdiscard switches to 64-bit */
-        while (ret >= 0 && request->len) {
-            int align = client->check_align ?: 1;
-            int len = MIN(request->len, QEMU_ALIGN_DOWN(BDRV_REQUEST_MAX_BYTES,
-                                                        align));
-            ret = blk_co_pdiscard(exp->common.blk, request->from, len);
-            request->len -= len;
-            request->from += len;
-        }
+        ret = blk_co_pdiscard(exp->common.blk, request->from, request->len);
         if (ret >= 0 && request->flags & NBD_CMD_FLAG_FUA) {
             ret = blk_co_flush(exp->common.blk);
         }
@@ -2395,6 +2571,19 @@ static coroutine_fn int nbd_handle_request(NBDClient *client,
                 }
             }
 
+            if (client->export_meta.allocation_depth) {
+                ret = nbd_co_send_block_status(client, request->handle,
+                                               blk_bs(exp->common.blk),
+                                               request->from, request->len,
+                                               dont_fragment,
+                                               !--contexts_remaining,
+                                               NBD_META_ID_ALLOCATION_DEPTH,
+                                               errp);
+                if (ret < 0) {
+                    return ret;
+                }
+            }
+
             for (i = 0; i < client->exp->nr_export_bitmaps; i++) {
                 if (!client->export_meta.bitmaps[i]) {
                     continue;
@@ -2443,6 +2632,17 @@ static coroutine_fn void nbd_trip(void *opaque)
         return;
     }
 
+    if (client->quiescing) {
+        /*
+         * We're switching between AIO contexts. Don't attempt to receive a new
+         * request and kick the main context which may be waiting for us.
+         */
+        nbd_client_put(client);
+        client->recv_coroutine = NULL;
+        aio_wait_kick();
+        return;
+    }
+
     req = nbd_request_get(client);
     ret = nbd_co_receive_request(req, &request, &local_err);
     client->recv_coroutine = NULL;
@@ -2455,13 +2655,18 @@ static coroutine_fn void nbd_trip(void *opaque)
         goto done;
     }
 
+    if (ret == -EAGAIN) {
+        assert(client->quiescing);
+        goto done;
+    }
+
     nbd_client_receive_next_request(client);
     if (ret == -EIO) {
         goto disconnect;
     }
 
     if (ret < 0) {
-        /* It wans't -EIO, so, according to nbd_co_receive_request()
+        /* It wasn't -EIO, so, according to nbd_co_receive_request()
          * semantics, we should return the error to the client. */
         Error *export_err = local_err;
 
@@ -2501,7 +2706,8 @@ disconnect:
 
 static void nbd_client_receive_next_request(NBDClient *client)
 {
-    if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
+    if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS &&
+        !client->quiescing) {
         nbd_client_get(client);
         client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
         aio_co_schedule(client->exp->common.ctx, client->recv_coroutine);