]> git.proxmox.com Git - mirror_qemu.git/blobdiff - nbd/server.c
nbd/client: Pull out oldstyle size determination
[mirror_qemu.git] / nbd / server.c
index 01ea97afe98292fe069f0f02521668447d20cbba..cb0d5634fa1d747360d7d2e3bbb760d719cd6865 100644 (file)
 #include "trace.h"
 #include "nbd-internal.h"
 
+#define NBD_META_ID_BASE_ALLOCATION 0
+#define NBD_META_ID_DIRTY_BITMAP 1
+
+/* NBD_MAX_BITMAP_EXTENTS: 1 mb of extents data. An empirical
+ * constant. If an increase is needed, note that the NBD protocol
+ * recommends no larger than 32 mb, so that the client won't consider
+ * the reply as a denial of service attack. */
+#define NBD_MAX_BITMAP_EXTENTS (0x100000 / 8)
+
 static int system_errno_to_nbd_errno(int err)
 {
     switch (err) {
@@ -68,8 +77,8 @@ struct NBDExport {
     BlockBackend *blk;
     char *name;
     char *description;
-    off_t dev_offset;
-    off_t size;
+    uint64_t dev_offset;
+    uint64_t size;
     uint16_t nbdflags;
     QTAILQ_HEAD(, NBDClient) clients;
     QTAILQ_ENTRY(NBDExport) next;
@@ -78,10 +87,24 @@ struct NBDExport {
 
     BlockBackend *eject_notifier_blk;
     Notifier eject_notifier;
+
+    BdrvDirtyBitmap *export_bitmap;
+    char *export_bitmap_context;
 };
 
 static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
 
+/* NBDExportMetaContexts represents a list of contexts to be exported,
+ * as selected by NBD_OPT_SET_META_CONTEXT. Also used for
+ * NBD_OPT_LIST_META_CONTEXT. */
+typedef struct NBDExportMetaContexts {
+    NBDExport *exp;
+    bool valid; /* means that negotiation of the option finished without
+                   errors */
+    bool base_allocation; /* export base:allocation context (block status) */
+    bool bitmap; /* export qemu:dirty-bitmap:<export bitmap name> */
+} NBDExportMetaContexts;
+
 struct NBDClient {
     int refcount;
     void (*close_fn)(NBDClient *client, bool negotiated);
@@ -102,6 +125,7 @@ struct NBDClient {
     bool closing;
 
     bool structured_reply;
+    NBDExportMetaContexts export_meta;
 
     uint32_t opt; /* Current option being negotiated */
     uint32_t optlen; /* remaining length of data in ioc for the option being
@@ -273,6 +297,62 @@ static int nbd_opt_read(NBDClient *client, void *buffer, size_t size,
     return qio_channel_read_all(client->ioc, buffer, size, errp) < 0 ? -EIO : 1;
 }
 
+/* Drop size bytes from the unparsed payload of the current option.
+ * Return -errno on I/O error, 0 if option was completely handled by
+ * sending a reply about inconsistent lengths, or 1 on success. */
+static int nbd_opt_skip(NBDClient *client, size_t size, Error **errp)
+{
+    if (size > client->optlen) {
+        return nbd_opt_invalid(client, errp,
+                               "Inconsistent lengths in option %s",
+                               nbd_opt_lookup(client->opt));
+    }
+    client->optlen -= size;
+    return nbd_drop(client->ioc, size, errp) < 0 ? -EIO : 1;
+}
+
+/* nbd_opt_read_name
+ *
+ * Read a string with the format:
+ *   uint32_t len     (<= NBD_MAX_NAME_SIZE)
+ *   len bytes string (not 0-terminated)
+ *
+ * @name should be enough to store NBD_MAX_NAME_SIZE+1.
+ * If @length is non-null, it will be set to the actual string length.
+ *
+ * Return -errno on I/O error, 0 if option was completely handled by
+ * sending a reply about inconsistent lengths, or 1 on success.
+ */
+static int nbd_opt_read_name(NBDClient *client, char *name, uint32_t *length,
+                             Error **errp)
+{
+    int ret;
+    uint32_t len;
+
+    ret = nbd_opt_read(client, &len, sizeof(len), errp);
+    if (ret <= 0) {
+        return ret;
+    }
+    len = cpu_to_be32(len);
+
+    if (len > NBD_MAX_NAME_SIZE) {
+        return nbd_opt_invalid(client, errp,
+                               "Invalid name length: %" PRIu32, len);
+    }
+
+    ret = nbd_opt_read(client, name, len, errp);
+    if (ret <= 0) {
+        return ret;
+    }
+    name[len] = '\0';
+
+    if (length) {
+        *length = len;
+    }
+
+    return 1;
+}
+
 /* Send a single NBD_REP_SERVER reply to NBD_OPT_LIST, including payload.
  * Return -errno on error, 0 on success. */
 static int nbd_negotiate_send_rep_list(NBDClient *client, NBDExport *exp,
@@ -330,6 +410,11 @@ static int nbd_negotiate_handle_list(NBDClient *client, Error **errp)
     return nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
 }
 
+static void nbd_check_meta_export(NBDClient *client)
+{
+    client->export_meta.valid &= client->exp == client->export_meta.exp;
+}
+
 /* Send a reply to NBD_OPT_EXPORT_NAME.
  * Return -errno on error, 0 on success. */
 static int nbd_negotiate_handle_export_name(NBDClient *client,
@@ -381,6 +466,7 @@ static int nbd_negotiate_handle_export_name(NBDClient *client,
 
     QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
     nbd_export_get(client->exp);
+    nbd_check_meta_export(client);
 
     return 0;
 }
@@ -400,7 +486,7 @@ static int nbd_negotiate_send_info(NBDClient *client,
     if (rc < 0) {
         return rc;
     }
-    cpu_to_be16s(&info);
+    info = cpu_to_be16(info);
     if (nbd_write(client->ioc, &info, sizeof(info), errp) < 0) {
         return -EIO;
     }
@@ -455,33 +541,24 @@ static int nbd_negotiate_handle_info(NBDClient *client, uint16_t myflags,
         2 bytes: N, number of requests (can be 0)
         N * 2 bytes: N requests
     */
-    rc = nbd_opt_read(client, &namelen, sizeof(namelen), errp);
-    if (rc <= 0) {
-        return rc;
-    }
-    be32_to_cpus(&namelen);
-    if (namelen >= sizeof(name)) {
-        return nbd_opt_invalid(client, errp, "name too long for qemu");
-    }
-    rc = nbd_opt_read(client, name, namelen, errp);
+    rc = nbd_opt_read_name(client, name, &namelen, errp);
     if (rc <= 0) {
         return rc;
     }
-    name[namelen] = '\0';
     trace_nbd_negotiate_handle_export_name_request(name);
 
     rc = nbd_opt_read(client, &requests, sizeof(requests), errp);
     if (rc <= 0) {
         return rc;
     }
-    be16_to_cpus(&requests);
+    requests = be16_to_cpu(requests);
     trace_nbd_negotiate_handle_info_requests(requests);
     while (requests--) {
         rc = nbd_opt_read(client, &request, sizeof(request), errp);
         if (rc <= 0) {
             return rc;
         }
-        be16_to_cpus(&request);
+        request = be16_to_cpu(request);
         trace_nbd_negotiate_handle_info_request(request,
                                                 nbd_info_lookup(request));
         /* We care about NBD_INFO_NAME and NBD_INFO_BLOCK_SIZE;
@@ -541,9 +618,9 @@ static int nbd_negotiate_handle_info(NBDClient *client, uint16_t myflags,
     /* maximum - At most 32M, but smaller as appropriate. */
     sizes[2] = MIN(blk_get_max_transfer(exp->blk), NBD_MAX_BUFFER_SIZE);
     trace_nbd_negotiate_handle_info_block_size(sizes[0], sizes[1], sizes[2]);
-    cpu_to_be32s(&sizes[0]);
-    cpu_to_be32s(&sizes[1]);
-    cpu_to_be32s(&sizes[2]);
+    sizes[0] = cpu_to_be32(sizes[0]);
+    sizes[1] = cpu_to_be32(sizes[1]);
+    sizes[2] = cpu_to_be32(sizes[2]);
     rc = nbd_negotiate_send_info(client, NBD_INFO_BLOCK_SIZE,
                                  sizeof(sizes), sizes, errp);
     if (rc < 0) {
@@ -583,6 +660,7 @@ static int nbd_negotiate_handle_info(NBDClient *client, uint16_t myflags,
         client->exp = exp;
         QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
         nbd_export_get(client->exp);
+        nbd_check_meta_export(client);
         rc = 1;
     }
     return rc;
@@ -637,6 +715,306 @@ static QIOChannel *nbd_negotiate_handle_starttls(NBDClient *client,
     return QIO_CHANNEL(tioc);
 }
 
+/* nbd_negotiate_send_meta_context
+ *
+ * Send one chunk of reply to NBD_OPT_{LIST,SET}_META_CONTEXT
+ *
+ * For NBD_OPT_LIST_META_CONTEXT @context_id is ignored, 0 is used instead.
+ */
+static int nbd_negotiate_send_meta_context(NBDClient *client,
+                                           const char *context,
+                                           uint32_t context_id,
+                                           Error **errp)
+{
+    NBDOptionReplyMetaContext opt;
+    struct iovec iov[] = {
+        {.iov_base = &opt, .iov_len = sizeof(opt)},
+        {.iov_base = (void *)context, .iov_len = strlen(context)}
+    };
+
+    if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
+        context_id = 0;
+    }
+
+    trace_nbd_negotiate_meta_query_reply(context, context_id);
+    set_be_option_rep(&opt.h, client->opt, NBD_REP_META_CONTEXT,
+                      sizeof(opt) - sizeof(opt.h) + iov[1].iov_len);
+    stl_be_p(&opt.context_id, context_id);
+
+    return qio_channel_writev_all(client->ioc, iov, 2, errp) < 0 ? -EIO : 0;
+}
+
+/* Read strlen(@pattern) bytes, and set @match to true if they match @pattern.
+ * @match is never set to false.
+ *
+ * Return -errno on I/O error, 0 if option was completely handled by
+ * sending a reply about inconsistent lengths, or 1 on success.
+ *
+ * Note: return code = 1 doesn't mean that we've read exactly @pattern.
+ * It only means that there are no errors.
+ */
+static int nbd_meta_pattern(NBDClient *client, const char *pattern, bool *match,
+                            Error **errp)
+{
+    int ret;
+    char *query;
+    size_t len = strlen(pattern);
+
+    assert(len);
+
+    query = g_malloc(len);
+    ret = nbd_opt_read(client, query, len, errp);
+    if (ret <= 0) {
+        g_free(query);
+        return ret;
+    }
+
+    if (strncmp(query, pattern, len) == 0) {
+        trace_nbd_negotiate_meta_query_parse(pattern);
+        *match = true;
+    } else {
+        trace_nbd_negotiate_meta_query_skip("pattern not matched");
+    }
+    g_free(query);
+
+    return 1;
+}
+
+/*
+ * Read @len bytes, and set @match to true if they match @pattern, or if @len
+ * is 0 and the client is performing _LIST_. @match is never set to false.
+ *
+ * Return -errno on I/O error, 0 if option was completely handled by
+ * sending a reply about inconsistent lengths, or 1 on success.
+ *
+ * Note: return code = 1 doesn't mean that we've read exactly @pattern.
+ * It only means that there are no errors.
+ */
+static int nbd_meta_empty_or_pattern(NBDClient *client, const char *pattern,
+                                     uint32_t len, bool *match, Error **errp)
+{
+    if (len == 0) {
+        if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
+            *match = true;
+        }
+        trace_nbd_negotiate_meta_query_parse("empty");
+        return 1;
+    }
+
+    if (len != strlen(pattern)) {
+        trace_nbd_negotiate_meta_query_skip("different lengths");
+        return nbd_opt_skip(client, len, errp);
+    }
+
+    return nbd_meta_pattern(client, pattern, match, errp);
+}
+
+/* nbd_meta_base_query
+ *
+ * Handle queries to 'base' namespace. For now, only the base:allocation
+ * context is available.  'len' is the amount of text remaining to be read from
+ * the current name, after the 'base:' portion has been stripped.
+ *
+ * Return -errno on I/O error, 0 if option was completely handled by
+ * sending a reply about inconsistent lengths, or 1 on success.
+ */
+static int nbd_meta_base_query(NBDClient *client, NBDExportMetaContexts *meta,
+                               uint32_t len, Error **errp)
+{
+    return nbd_meta_empty_or_pattern(client, "allocation", len,
+                                     &meta->base_allocation, errp);
+}
+
+/* nbd_meta_bitmap_query
+ *
+ * Handle query to 'qemu:' namespace.
+ * @len is the amount of text remaining to be read from the current name, after
+ * the 'qemu:' portion has been stripped.
+ *
+ * Return -errno on I/O error, 0 if option was completely handled by
+ * sending a reply about inconsistent lengths, or 1 on success. */
+static int nbd_meta_qemu_query(NBDClient *client, NBDExportMetaContexts *meta,
+                               uint32_t len, Error **errp)
+{
+    bool dirty_bitmap = false;
+    size_t dirty_bitmap_len = strlen("dirty-bitmap:");
+    int ret;
+
+    if (!meta->exp->export_bitmap) {
+        trace_nbd_negotiate_meta_query_skip("no dirty-bitmap exported");
+        return nbd_opt_skip(client, len, errp);
+    }
+
+    if (len == 0) {
+        if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
+            meta->bitmap = true;
+        }
+        trace_nbd_negotiate_meta_query_parse("empty");
+        return 1;
+    }
+
+    if (len < dirty_bitmap_len) {
+        trace_nbd_negotiate_meta_query_skip("not dirty-bitmap:");
+        return nbd_opt_skip(client, len, errp);
+    }
+
+    len -= dirty_bitmap_len;
+    ret = nbd_meta_pattern(client, "dirty-bitmap:", &dirty_bitmap, errp);
+    if (ret <= 0) {
+        return ret;
+    }
+    if (!dirty_bitmap) {
+        trace_nbd_negotiate_meta_query_skip("not dirty-bitmap:");
+        return nbd_opt_skip(client, len, errp);
+    }
+
+    trace_nbd_negotiate_meta_query_parse("dirty-bitmap:");
+
+    return nbd_meta_empty_or_pattern(
+            client, meta->exp->export_bitmap_context +
+            strlen("qemu:dirty_bitmap:"), len, &meta->bitmap, errp);
+}
+
+/* nbd_negotiate_meta_query
+ *
+ * Parse namespace name and call corresponding function to parse body of the
+ * query.
+ *
+ * The only supported namespace now is 'base'.
+ *
+ * The function aims not wasting time and memory to read long unknown namespace
+ * names.
+ *
+ * Return -errno on I/O error, 0 if option was completely handled by
+ * sending a reply about inconsistent lengths, or 1 on success. */
+static int nbd_negotiate_meta_query(NBDClient *client,
+                                    NBDExportMetaContexts *meta, Error **errp)
+{
+    /*
+     * Both 'qemu' and 'base' namespaces have length = 5 including a
+     * colon. If another length namespace is later introduced, this
+     * should certainly be refactored.
+     */
+    int ret;
+    size_t ns_len = 5;
+    char ns[5];
+    uint32_t len;
+
+    ret = nbd_opt_read(client, &len, sizeof(len), errp);
+    if (ret <= 0) {
+        return ret;
+    }
+    len = cpu_to_be32(len);
+
+    if (len < ns_len) {
+        trace_nbd_negotiate_meta_query_skip("length too short");
+        return nbd_opt_skip(client, len, errp);
+    }
+
+    len -= ns_len;
+    ret = nbd_opt_read(client, ns, ns_len, errp);
+    if (ret <= 0) {
+        return ret;
+    }
+
+    if (!strncmp(ns, "base:", ns_len)) {
+        trace_nbd_negotiate_meta_query_parse("base:");
+        return nbd_meta_base_query(client, meta, len, errp);
+    } else if (!strncmp(ns, "qemu:", ns_len)) {
+        trace_nbd_negotiate_meta_query_parse("qemu:");
+        return nbd_meta_qemu_query(client, meta, len, errp);
+    }
+
+    trace_nbd_negotiate_meta_query_skip("unknown namespace");
+    return nbd_opt_skip(client, len, errp);
+}
+
+/* nbd_negotiate_meta_queries
+ * Handle NBD_OPT_LIST_META_CONTEXT and NBD_OPT_SET_META_CONTEXT
+ *
+ * Return -errno on I/O error, or 0 if option was completely handled. */
+static int nbd_negotiate_meta_queries(NBDClient *client,
+                                      NBDExportMetaContexts *meta, Error **errp)
+{
+    int ret;
+    char export_name[NBD_MAX_NAME_SIZE + 1];
+    NBDExportMetaContexts local_meta;
+    uint32_t nb_queries;
+    int i;
+
+    if (!client->structured_reply) {
+        return nbd_opt_invalid(client, errp,
+                               "request option '%s' when structured reply "
+                               "is not negotiated",
+                               nbd_opt_lookup(client->opt));
+    }
+
+    if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
+        /* Only change the caller's meta on SET. */
+        meta = &local_meta;
+    }
+
+    memset(meta, 0, sizeof(*meta));
+
+    ret = nbd_opt_read_name(client, export_name, NULL, errp);
+    if (ret <= 0) {
+        return ret;
+    }
+
+    meta->exp = nbd_export_find(export_name);
+    if (meta->exp == NULL) {
+        return nbd_opt_drop(client, NBD_REP_ERR_UNKNOWN, errp,
+                            "export '%s' not present", export_name);
+    }
+
+    ret = nbd_opt_read(client, &nb_queries, sizeof(nb_queries), errp);
+    if (ret <= 0) {
+        return ret;
+    }
+    nb_queries = cpu_to_be32(nb_queries);
+    trace_nbd_negotiate_meta_context(nbd_opt_lookup(client->opt),
+                                     export_name, nb_queries);
+
+    if (client->opt == NBD_OPT_LIST_META_CONTEXT && !nb_queries) {
+        /* enable all known contexts */
+        meta->base_allocation = true;
+        meta->bitmap = !!meta->exp->export_bitmap;
+    } else {
+        for (i = 0; i < nb_queries; ++i) {
+            ret = nbd_negotiate_meta_query(client, meta, errp);
+            if (ret <= 0) {
+                return ret;
+            }
+        }
+    }
+
+    if (meta->base_allocation) {
+        ret = nbd_negotiate_send_meta_context(client, "base:allocation",
+                                              NBD_META_ID_BASE_ALLOCATION,
+                                              errp);
+        if (ret < 0) {
+            return ret;
+        }
+    }
+
+    if (meta->bitmap) {
+        ret = nbd_negotiate_send_meta_context(client,
+                                              meta->exp->export_bitmap_context,
+                                              NBD_META_ID_DIRTY_BITMAP,
+                                              errp);
+        if (ret < 0) {
+            return ret;
+        }
+    }
+
+    ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
+    if (ret == 0) {
+        meta->valid = true;
+    }
+
+    return ret;
+}
+
 /* nbd_negotiate_options
  * Process all NBD_OPT_* client option commands, during fixed newstyle
  * negotiation.
@@ -672,7 +1050,7 @@ static int nbd_negotiate_options(NBDClient *client, uint16_t myflags,
         error_prepend(errp, "read failed: ");
         return -EIO;
     }
-    be32_to_cpus(&flags);
+    flags = be32_to_cpu(flags);
     trace_nbd_negotiate_options_flags(flags);
     if (flags & NBD_FLAG_C_FIXED_NEWSTYLE) {
         fixedNewstyle = true;
@@ -757,12 +1135,16 @@ static int nbd_negotiate_options(NBDClient *client, uint16_t myflags,
                 return -EINVAL;
 
             default:
-                ret = nbd_opt_drop(client, NBD_REP_ERR_TLS_REQD, errp,
-                                   "Option 0x%" PRIx32
-                                   "not permitted before TLS", option);
                 /* Let the client keep trying, unless they asked to
-                 * quit. In this mode, we've already sent an error, so
-                 * we can't ack the abort.  */
+                 * quit. Always try to give an error back to the
+                 * client; but when replying to OPT_ABORT, be aware
+                 * that the client may hang up before receiving the
+                 * error, in which case we are fine ignoring the
+                 * resulting EPIPE. */
+                ret = nbd_opt_drop(client, NBD_REP_ERR_TLS_REQD,
+                                   option == NBD_OPT_ABORT ? NULL : errp,
+                                   "Option 0x%" PRIx32
+                                   " not permitted before TLS", option);
                 if (option == NBD_OPT_ABORT) {
                     return 1;
                 }
@@ -827,6 +1209,12 @@ static int nbd_negotiate_options(NBDClient *client, uint16_t myflags,
                 }
                 break;
 
+            case NBD_OPT_LIST_META_CONTEXT:
+            case NBD_OPT_SET_META_CONTEXT:
+                ret = nbd_negotiate_meta_queries(client, &client->export_meta,
+                                                 errp);
+                break;
+
             default:
                 ret = nbd_opt_drop(client, NBD_REP_ERR_UNSUP, errp,
                                    "Unsupported option %" PRIu32 " (%s)",
@@ -869,8 +1257,7 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
     int ret;
     const uint16_t myflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM |
                               NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA |
-                              NBD_FLAG_SEND_WRITE_ZEROES);
-    bool oldStyle;
+                              NBD_FLAG_SEND_WRITE_ZEROES | NBD_FLAG_SEND_CACHE);
 
     /* Old style negotiation header, no room for options
         [ 0 ..   7]   passwd       ("NBDMAGIC")
@@ -891,33 +1278,19 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
     trace_nbd_negotiate_begin();
     memcpy(buf, "NBDMAGIC", 8);
 
-    oldStyle = client->exp != NULL && !client->tlscreds;
-    if (oldStyle) {
-        trace_nbd_negotiate_old_style(client->exp->size,
-                                      client->exp->nbdflags | myflags);
-        stq_be_p(buf + 8, NBD_CLIENT_MAGIC);
-        stq_be_p(buf + 16, client->exp->size);
-        stl_be_p(buf + 24, client->exp->nbdflags | myflags);
+    stq_be_p(buf + 8, NBD_OPTS_MAGIC);
+    stw_be_p(buf + 16, NBD_FLAG_FIXED_NEWSTYLE | NBD_FLAG_NO_ZEROES);
 
-        if (nbd_write(client->ioc, buf, sizeof(buf), errp) < 0) {
-            error_prepend(errp, "write failed: ");
-            return -EINVAL;
-        }
-    } else {
-        stq_be_p(buf + 8, NBD_OPTS_MAGIC);
-        stw_be_p(buf + 16, NBD_FLAG_FIXED_NEWSTYLE | NBD_FLAG_NO_ZEROES);
-
-        if (nbd_write(client->ioc, buf, 18, errp) < 0) {
-            error_prepend(errp, "write failed: ");
-            return -EINVAL;
-        }
-        ret = nbd_negotiate_options(client, myflags, errp);
-        if (ret != 0) {
-            if (ret < 0) {
-                error_prepend(errp, "option negotiation failed: ");
-            }
-            return ret;
+    if (nbd_write(client->ioc, buf, 18, errp) < 0) {
+        error_prepend(errp, "write failed: ");
+        return -EINVAL;
+    }
+    ret = nbd_negotiate_options(client, myflags, errp);
+    if (ret != 0) {
+        if (ret < 0) {
+            error_prepend(errp, "option negotiation failed: ");
         }
+        return ret;
     }
 
     assert(!client->optlen);
@@ -1082,10 +1455,11 @@ static void nbd_eject_notifier(Notifier *n, void *data)
     nbd_export_close(exp);
 }
 
-NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset, off_t size,
-                          uint16_t nbdflags, void (*close)(NBDExport *),
-                          bool writethrough, BlockBackend *on_eject_blk,
-                          Error **errp)
+NBDExport *nbd_export_new(BlockDriverState *bs, uint64_t dev_offset,
+                          uint64_t size, const char *name, const char *desc,
+                          const char *bitmap, uint16_t nbdflags,
+                          void (*close)(NBDExport *), bool writethrough,
+                          BlockBackend *on_eject_blk, Error **errp)
 {
     AioContext *ctx;
     BlockBackend *blk;
@@ -1098,6 +1472,7 @@ NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset, off_t size,
      * that BDRV_O_INACTIVE is cleared and the image is ready for write
      * access since the export could be available before migration handover.
      */
+    assert(name);
     ctx = bdrv_get_aio_context(bs);
     aio_context_acquire(ctx);
     bdrv_invalidate_cache(bs, NULL);
@@ -1120,15 +1495,50 @@ NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset, off_t size,
     exp->refcount = 1;
     QTAILQ_INIT(&exp->clients);
     exp->blk = blk;
+    assert(dev_offset <= INT64_MAX);
     exp->dev_offset = dev_offset;
+    exp->name = g_strdup(name);
+    exp->description = g_strdup(desc);
     exp->nbdflags = nbdflags;
-    exp->size = size < 0 ? blk_getlength(blk) : size;
-    if (exp->size < 0) {
-        error_setg_errno(errp, -exp->size,
-                         "Failed to determine the NBD export's length");
-        goto fail;
+    assert(size <= INT64_MAX - dev_offset);
+    exp->size = QEMU_ALIGN_DOWN(size, BDRV_SECTOR_SIZE);
+
+    if (bitmap) {
+        BdrvDirtyBitmap *bm = NULL;
+        BlockDriverState *bs = blk_bs(blk);
+
+        while (true) {
+            bm = bdrv_find_dirty_bitmap(bs, bitmap);
+            if (bm != NULL || bs->backing == NULL) {
+                break;
+            }
+
+            bs = bs->backing->bs;
+        }
+
+        if (bm == NULL) {
+            error_setg(errp, "Bitmap '%s' is not found", bitmap);
+            goto fail;
+        }
+
+        if ((nbdflags & NBD_FLAG_READ_ONLY) && bdrv_is_writable(bs) &&
+            bdrv_dirty_bitmap_enabled(bm)) {
+            error_setg(errp,
+                       "Enabled bitmap '%s' incompatible with readonly export",
+                       bitmap);
+            goto fail;
+        }
+
+        if (bdrv_dirty_bitmap_user_locked(bm)) {
+            error_setg(errp, "Bitmap '%s' is in use", bitmap);
+            goto fail;
+        }
+
+        bdrv_dirty_bitmap_set_qmp_locked(bm, true);
+        exp->export_bitmap = bm;
+        exp->export_bitmap_context = g_strdup_printf("qemu:dirty-bitmap:%s",
+                                                     bitmap);
     }
-    exp->size -= exp->size % BDRV_SECTOR_SIZE;
 
     exp->close = close;
     exp->ctx = blk_get_aio_context(blk);
@@ -1140,10 +1550,14 @@ NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset, off_t size,
         exp->eject_notifier.notify = nbd_eject_notifier;
         blk_add_remove_bs_notifier(on_eject_blk, &exp->eject_notifier);
     }
+    QTAILQ_INSERT_TAIL(&exports, exp, next);
+    nbd_export_get(exp);
     return exp;
 
 fail:
     blk_unref(blk);
+    g_free(exp->name);
+    g_free(exp->description);
     g_free(exp);
     return NULL;
 }
@@ -1160,43 +1574,29 @@ NBDExport *nbd_export_find(const char *name)
     return NULL;
 }
 
-void nbd_export_set_name(NBDExport *exp, const char *name)
-{
-    if (exp->name == name) {
-        return;
-    }
-
-    nbd_export_get(exp);
-    if (exp->name != NULL) {
-        g_free(exp->name);
-        exp->name = NULL;
-        QTAILQ_REMOVE(&exports, exp, next);
-        nbd_export_put(exp);
-    }
-    if (name != NULL) {
-        nbd_export_get(exp);
-        exp->name = g_strdup(name);
-        QTAILQ_INSERT_TAIL(&exports, exp, next);
-    }
-    nbd_export_put(exp);
-}
-
-void nbd_export_set_description(NBDExport *exp, const char *description)
-{
-    g_free(exp->description);
-    exp->description = g_strdup(description);
-}
-
 void nbd_export_close(NBDExport *exp)
 {
     NBDClient *client, *next;
 
     nbd_export_get(exp);
+    /*
+     * TODO: Should we expand QMP NbdServerRemoveNode enum to allow a
+     * close mode that stops advertising the export to new clients but
+     * still permits existing clients to run to completion? Because of
+     * that possibility, nbd_export_close() can be called more than
+     * once on an export.
+     */
     QTAILQ_FOREACH_SAFE(client, &exp->clients, next, next) {
         client_close(client, true);
     }
-    nbd_export_set_name(exp, NULL);
-    nbd_export_set_description(exp, NULL);
+    if (exp->name) {
+        nbd_export_put(exp);
+        g_free(exp->name);
+        exp->name = NULL;
+        QTAILQ_REMOVE(&exports, exp, next);
+    }
+    g_free(exp->description);
+    exp->description = NULL;
     nbd_export_put(exp);
 }
 
@@ -1251,6 +1651,11 @@ void nbd_export_put(NBDExport *exp)
             exp->blk = NULL;
         }
 
+        if (exp->export_bitmap) {
+            bdrv_dirty_bitmap_set_qmp_locked(exp->export_bitmap, false);
+            g_free(exp->export_bitmap_context);
+        }
+
         g_free(exp);
     }
 }
@@ -1456,6 +1861,208 @@ static int coroutine_fn nbd_co_send_sparse_read(NBDClient *client,
     return ret;
 }
 
+/*
+ * Populate @extents from block status. Update @bytes to be the actual
+ * length encoded (which may be smaller than the original), and update
+ * @nb_extents to the number of extents used.
+ *
+ * Returns zero on success and -errno on bdrv_block_status_above failure.
+ */
+static int blockstatus_to_extents(BlockDriverState *bs, uint64_t offset,
+                                  uint64_t *bytes, NBDExtent *extents,
+                                  unsigned int *nb_extents)
+{
+    uint64_t remaining_bytes = *bytes;
+    NBDExtent *extent = extents, *extents_end = extents + *nb_extents;
+    bool first_extent = true;
+
+    assert(*nb_extents);
+    while (remaining_bytes) {
+        uint32_t flags;
+        int64_t num;
+        int ret = bdrv_block_status_above(bs, NULL, offset, remaining_bytes,
+                                          &num, NULL, NULL);
+
+        if (ret < 0) {
+            return ret;
+        }
+
+        flags = (ret & BDRV_BLOCK_ALLOCATED ? 0 : NBD_STATE_HOLE) |
+                (ret & BDRV_BLOCK_ZERO      ? NBD_STATE_ZERO : 0);
+        offset += num;
+        remaining_bytes -= num;
+
+        if (first_extent) {
+            extent->flags = flags;
+            extent->length = num;
+            first_extent = false;
+            continue;
+        }
+
+        if (flags == extent->flags) {
+            /* extend current extent */
+            extent->length += num;
+        } else {
+            if (extent + 1 == extents_end) {
+                break;
+            }
+
+            /* start new extent */
+            extent++;
+            extent->flags = flags;
+            extent->length = num;
+        }
+    }
+
+    extents_end = extent + 1;
+
+    for (extent = extents; extent < extents_end; extent++) {
+        extent->flags = cpu_to_be32(extent->flags);
+        extent->length = cpu_to_be32(extent->length);
+    }
+
+    *bytes -= remaining_bytes;
+    *nb_extents = extents_end - extents;
+
+    return 0;
+}
+
+/* nbd_co_send_extents
+ *
+ * @length is only for tracing purposes (and may be smaller or larger
+ * than the client's original request). @last controls whether
+ * NBD_REPLY_FLAG_DONE is sent. @extents should already be in
+ * big-endian format.
+ */
+static int nbd_co_send_extents(NBDClient *client, uint64_t handle,
+                               NBDExtent *extents, unsigned int nb_extents,
+                               uint64_t length, bool last,
+                               uint32_t context_id, Error **errp)
+{
+    NBDStructuredMeta chunk;
+
+    struct iovec iov[] = {
+        {.iov_base = &chunk, .iov_len = sizeof(chunk)},
+        {.iov_base = extents, .iov_len = nb_extents * sizeof(extents[0])}
+    };
+
+    trace_nbd_co_send_extents(handle, nb_extents, context_id, length, last);
+    set_be_chunk(&chunk.h, last ? NBD_REPLY_FLAG_DONE : 0,
+                 NBD_REPLY_TYPE_BLOCK_STATUS,
+                 handle, sizeof(chunk) - sizeof(chunk.h) + iov[1].iov_len);
+    stl_be_p(&chunk.context_id, context_id);
+
+    return nbd_co_send_iov(client, iov, 2, errp);
+}
+
+/* Get block status from the exported device and send it to the client */
+static int nbd_co_send_block_status(NBDClient *client, uint64_t handle,
+                                    BlockDriverState *bs, uint64_t offset,
+                                    uint32_t length, bool dont_fragment,
+                                    bool last, uint32_t context_id,
+                                    Error **errp)
+{
+    int ret;
+    unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BITMAP_EXTENTS;
+    NBDExtent *extents = g_new(NBDExtent, nb_extents);
+    uint64_t final_length = length;
+
+    ret = blockstatus_to_extents(bs, offset, &final_length, extents,
+                                 &nb_extents);
+    if (ret < 0) {
+        g_free(extents);
+        return nbd_co_send_structured_error(
+                client, handle, -ret, "can't get block status", errp);
+    }
+
+    ret = nbd_co_send_extents(client, handle, extents, nb_extents,
+                              final_length, last, context_id, errp);
+
+    g_free(extents);
+
+    return ret;
+}
+
+/*
+ * Populate @extents from a dirty bitmap. Unless @dont_fragment, the
+ * final extent may exceed the original @length. Store in @length the
+ * byte length encoded (which may be smaller or larger than the
+ * original), and return the number of extents used.
+ */
+static unsigned int bitmap_to_extents(BdrvDirtyBitmap *bitmap, uint64_t offset,
+                                      uint64_t *length, NBDExtent *extents,
+                                      unsigned int nb_extents,
+                                      bool dont_fragment)
+{
+    uint64_t begin = offset, end = offset;
+    uint64_t overall_end = offset + *length;
+    unsigned int i = 0;
+    BdrvDirtyBitmapIter *it;
+    bool dirty;
+
+    bdrv_dirty_bitmap_lock(bitmap);
+
+    it = bdrv_dirty_iter_new(bitmap);
+    dirty = bdrv_get_dirty_locked(NULL, bitmap, offset);
+
+    assert(begin < overall_end && nb_extents);
+    while (begin < overall_end && i < nb_extents) {
+        bool next_dirty = !dirty;
+
+        if (dirty) {
+            end = bdrv_dirty_bitmap_next_zero(bitmap, begin, UINT64_MAX);
+        } else {
+            bdrv_set_dirty_iter(it, begin);
+            end = bdrv_dirty_iter_next(it);
+        }
+        if (end == -1 || end - begin > UINT32_MAX) {
+            /* Cap to an aligned value < 4G beyond begin. */
+            end = MIN(bdrv_dirty_bitmap_size(bitmap),
+                      begin + UINT32_MAX + 1 -
+                      bdrv_dirty_bitmap_granularity(bitmap));
+            next_dirty = dirty;
+        }
+        if (dont_fragment && end > overall_end) {
+            end = overall_end;
+        }
+
+        extents[i].length = cpu_to_be32(end - begin);
+        extents[i].flags = cpu_to_be32(dirty ? NBD_STATE_DIRTY : 0);
+        i++;
+        begin = end;
+        dirty = next_dirty;
+    }
+
+    bdrv_dirty_iter_free(it);
+
+    bdrv_dirty_bitmap_unlock(bitmap);
+
+    assert(offset < end);
+    *length = end - offset;
+    return i;
+}
+
+static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle,
+                              BdrvDirtyBitmap *bitmap, uint64_t offset,
+                              uint32_t length, bool dont_fragment, bool last,
+                              uint32_t context_id, Error **errp)
+{
+    int ret;
+    unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BITMAP_EXTENTS;
+    NBDExtent *extents = g_new(NBDExtent, nb_extents);
+    uint64_t final_length = length;
+
+    nb_extents = bitmap_to_extents(bitmap, offset, &final_length, extents,
+                                   nb_extents, dont_fragment);
+
+    ret = nbd_co_send_extents(client, handle, extents, nb_extents,
+                              final_length, last, context_id, errp);
+
+    g_free(extents);
+
+    return ret;
+}
+
 /* nbd_co_receive_request
  * Collect a client request. Return 0 if request looks valid, -EIO to drop
  * connection right away, and any other negative value to report an error to
@@ -1488,7 +2095,9 @@ static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
         return -EIO;
     }
 
-    if (request->type == NBD_CMD_READ || request->type == NBD_CMD_WRITE) {
+    if (request->type == NBD_CMD_READ || request->type == NBD_CMD_WRITE ||
+        request->type == NBD_CMD_CACHE)
+    {
         if (request->len > NBD_MAX_BUFFER_SIZE) {
             error_setg(errp, "len (%" PRIu32" ) is larger than max len (%u)",
                        request->len, NBD_MAX_BUFFER_SIZE);
@@ -1521,10 +2130,10 @@ static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
         return -EROFS;
     }
     if (request->from > client->exp->size ||
-        request->from + request->len > client->exp->size) {
+        request->len > client->exp->size - request->from) {
         error_setg(errp, "operation past EOF; From: %" PRIu64 ", Len: %" PRIu32
                    ", Size: %" PRIu64, request->from, request->len,
-                   (uint64_t)client->exp->size);
+                   client->exp->size);
         return (request->type == NBD_CMD_WRITE ||
                 request->type == NBD_CMD_WRITE_ZEROES) ? -ENOSPC : -EINVAL;
     }
@@ -1533,6 +2142,8 @@ static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
         valid_flags |= NBD_CMD_FLAG_DF;
     } else if (request->type == NBD_CMD_WRITE_ZEROES) {
         valid_flags |= NBD_CMD_FLAG_NO_HOLE;
+    } else if (request->type == NBD_CMD_BLOCK_STATUS) {
+        valid_flags |= NBD_CMD_FLAG_REQ_ONE;
     }
     if (request->flags & ~valid_flags) {
         error_setg(errp, "unsupported flags for command %s (got 0x%x)",
@@ -1571,7 +2182,7 @@ static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request,
     int ret;
     NBDExport *exp = client->exp;
 
-    assert(request->type == NBD_CMD_READ);
+    assert(request->type == NBD_CMD_READ || request->type == NBD_CMD_CACHE);
 
     /* XXX: NBD Protocol only documents use of FUA with WRITE */
     if (request->flags & NBD_CMD_FLAG_FUA) {
@@ -1583,14 +2194,15 @@ static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request,
     }
 
     if (client->structured_reply && !(request->flags & NBD_CMD_FLAG_DF) &&
-        request->len) {
+        request->len && request->type != NBD_CMD_CACHE)
+    {
         return nbd_co_send_sparse_read(client, request->handle, request->from,
                                        data, request->len, errp);
     }
 
     ret = blk_pread(exp->blk, request->from + exp->dev_offset, data,
                     request->len);
-    if (ret < 0) {
+    if (ret < 0 || request->type == NBD_CMD_CACHE) {
         return nbd_send_generic_reply(client, request->handle, ret,
                                       "reading from file failed", errp);
     }
@@ -1623,6 +2235,7 @@ static coroutine_fn int nbd_handle_request(NBDClient *client,
 
     switch (request->type) {
     case NBD_CMD_READ:
+    case NBD_CMD_CACHE:
         return nbd_do_cmd_read(client, request, data, errp);
 
     case NBD_CMD_WRITE:
@@ -1666,6 +2279,47 @@ static coroutine_fn int nbd_handle_request(NBDClient *client,
         return nbd_send_generic_reply(client, request->handle, ret,
                                       "discard failed", errp);
 
+    case NBD_CMD_BLOCK_STATUS:
+        if (!request->len) {
+            return nbd_send_generic_reply(client, request->handle, -EINVAL,
+                                          "need non-zero length", errp);
+        }
+        if (client->export_meta.valid &&
+            (client->export_meta.base_allocation ||
+             client->export_meta.bitmap))
+        {
+            bool dont_fragment = request->flags & NBD_CMD_FLAG_REQ_ONE;
+
+            if (client->export_meta.base_allocation) {
+                ret = nbd_co_send_block_status(client, request->handle,
+                                               blk_bs(exp->blk), request->from,
+                                               request->len, dont_fragment,
+                                               !client->export_meta.bitmap,
+                                               NBD_META_ID_BASE_ALLOCATION,
+                                               errp);
+                if (ret < 0) {
+                    return ret;
+                }
+            }
+
+            if (client->export_meta.bitmap) {
+                ret = nbd_co_send_bitmap(client, request->handle,
+                                         client->exp->export_bitmap,
+                                         request->from, request->len,
+                                         dont_fragment,
+                                         true, NBD_META_ID_DIRTY_BITMAP, errp);
+                if (ret < 0) {
+                    return ret;
+                }
+            }
+
+            return ret;
+        } else {
+            return nbd_send_generic_reply(client, request->handle, -EINVAL,
+                                          "CMD_BLOCK_STATUS not negotiated",
+                                          errp);
+        }
+
     default:
         msg = g_strdup_printf("invalid request type (%" PRIu32 ") received",
                               request->type);
@@ -1759,13 +2413,8 @@ static void nbd_client_receive_next_request(NBDClient *client)
 static coroutine_fn void nbd_co_client_start(void *opaque)
 {
     NBDClient *client = opaque;
-    NBDExport *exp = client->exp;
     Error *local_err = NULL;
 
-    if (exp) {
-        nbd_export_get(exp);
-        QTAILQ_INSERT_TAIL(&exp->clients, client, next);
-    }
     qemu_co_mutex_init(&client->send_lock);
 
     if (nbd_negotiate(client, &local_err)) {
@@ -1780,13 +2429,11 @@ static coroutine_fn void nbd_co_client_start(void *opaque)
 }
 
 /*
- * Create a new client listener on the given export @exp, using the
- * given channel @sioc.  Begin servicing it in a coroutine.  When the
- * connection closes, call @close_fn with an indication of whether the
- * client completed negotiation.
+ * Create a new client listener using the given channel @sioc.
+ * Begin servicing it in a coroutine.  When the connection closes, call
+ * @close_fn with an indication of whether the client completed negotiation.
  */
-void nbd_client_new(NBDExport *exp,
-                    QIOChannelSocket *sioc,
+void nbd_client_new(QIOChannelSocket *sioc,
                     QCryptoTLSCreds *tlscreds,
                     const char *tlsaclname,
                     void (*close_fn)(NBDClient *, bool))
@@ -1796,7 +2443,6 @@ void nbd_client_new(NBDExport *exp,
 
     client = g_new0(NBDClient, 1);
     client->refcount = 1;
-    client->exp = exp;
     client->tlscreds = tlscreds;
     if (tlscreds) {
         object_ref(OBJECT(client->tlscreds));