]> git.proxmox.com Git - mirror_qemu.git/blobdiff - block/rbd.c
include/block: Untangle inclusion loops
[mirror_qemu.git] / block / rbd.c
index 617553b022c73728b98bfb694b2ffab9e05e4554..6167c5e424ce72eab559431a9c0aa83df118b3a4 100644 (file)
@@ -18,6 +18,7 @@
 #include "qemu/error-report.h"
 #include "qemu/module.h"
 #include "qemu/option.h"
+#include "block/block-io.h"
 #include "block/block_int.h"
 #include "block/qdict.h"
 #include "crypto/secret.h"
  * leading "\".
  */
 
-/* rbd_aio_discard added in 0.1.2 */
-#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2)
-#define LIBRBD_SUPPORTS_DISCARD
-#else
-#undef LIBRBD_SUPPORTS_DISCARD
-#endif
-
 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
 
 #define RBD_MAX_SNAPS 100
 
-/* The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h */
-#ifdef LIBRBD_SUPPORTS_IOVEC
-#define LIBRBD_USE_IOVEC 1
-#else
-#define LIBRBD_USE_IOVEC 0
-#endif
+#define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8
+
+static const char rbd_luks_header_verification[
+        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
+    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1
+};
+
+static const char rbd_luks2_header_verification[
+        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
+    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2
+};
 
 typedef enum {
     RBD_AIO_READ,
     RBD_AIO_WRITE,
     RBD_AIO_DISCARD,
-    RBD_AIO_FLUSH
+    RBD_AIO_FLUSH,
+    RBD_AIO_WRITE_ZEROES
 } RBDAIOCmd;
 
-typedef struct RBDAIOCB {
-    BlockAIOCB common;
-    int64_t ret;
-    QEMUIOVector *qiov;
-    char *bounce;
-    RBDAIOCmd cmd;
-    int error;
-    struct BDRVRBDState *s;
-} RBDAIOCB;
-
-typedef struct RADOSCB {
-    RBDAIOCB *acb;
-    struct BDRVRBDState *s;
-    int64_t size;
-    char *buf;
-    int64_t ret;
-} RADOSCB;
-
 typedef struct BDRVRBDState {
     rados_t cluster;
     rados_ioctx_t io_ctx;
@@ -106,28 +88,52 @@ typedef struct BDRVRBDState {
     char *snap;
     char *namespace;
     uint64_t image_size;
+    uint64_t object_size;
 } BDRVRBDState;
 
+typedef struct RBDTask {
+    BlockDriverState *bs;
+    Coroutine *co;
+    bool complete;
+    int64_t ret;
+} RBDTask;
+
+typedef struct RBDDiffIterateReq {
+    uint64_t offs;
+    uint64_t bytes;
+    bool exists;
+} RBDDiffIterateReq;
+
 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
                             BlockdevOptionsRbd *opts, bool cache,
                             const char *keypairs, const char *secretid,
                             Error **errp);
 
+static char *qemu_rbd_strchr(char *src, char delim)
+{
+    char *p;
+
+    for (p = src; *p; ++p) {
+        if (*p == delim) {
+            return p;
+        }
+        if (*p == '\\' && p[1] != '\0') {
+            ++p;
+        }
+    }
+
+    return NULL;
+}
+
+
 static char *qemu_rbd_next_tok(char *src, char delim, char **p)
 {
     char *end;
 
     *p = NULL;
 
-    for (end = src; *end; ++end) {
-        if (*end == delim) {
-            break;
-        }
-        if (*end == '\\' && end[1] != '\0') {
-            end++;
-        }
-    }
-    if (*end == delim) {
+    end = qemu_rbd_strchr(src, delim);
+    if (end) {
         *p = end + 1;
         *end = '\0';
     }
@@ -171,7 +177,7 @@ static void qemu_rbd_parse_filename(const char *filename, QDict *options,
     qemu_rbd_unescape(found_str);
     qdict_put_str(options, "pool", found_str);
 
-    if (strchr(p, '@')) {
+    if (qemu_rbd_strchr(p, '@')) {
         image_name = qemu_rbd_next_tok(p, '@', &p);
 
         found_str = qemu_rbd_next_tok(p, ':', &p);
@@ -181,7 +187,7 @@ static void qemu_rbd_parse_filename(const char *filename, QDict *options,
         image_name = qemu_rbd_next_tok(p, ':', &p);
     }
     /* Check for namespace in the image_name */
-    if (strchr(image_name, '/')) {
+    if (qemu_rbd_strchr(image_name, '/')) {
         found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
         qemu_rbd_unescape(found_str);
         qdict_put_str(options, "namespace", found_str);
@@ -232,7 +238,7 @@ static void qemu_rbd_parse_filename(const char *filename, QDict *options,
 
     if (keypairs) {
         qdict_put(options, "=keyvalue-pairs",
-                  qobject_to_json(QOBJECT(keypairs)));
+                  qstring_from_gstring(qobject_to_json(QOBJECT(keypairs))));
     }
 
 done:
@@ -241,14 +247,6 @@ done:
     return;
 }
 
-
-static void qemu_rbd_refresh_limits(BlockDriverState *bs, Error **errp)
-{
-    /* XXX Does RBD support AIO on less than 512-byte alignment? */
-    bs->bl.request_alignment = 512;
-}
-
-
 static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
                              Error **errp)
 {
@@ -330,58 +328,202 @@ static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
     return ret;
 }
 
-static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
+#ifdef LIBRBD_SUPPORTS_ENCRYPTION
+static int qemu_rbd_convert_luks_options(
+        RbdEncryptionOptionsLUKSBase *luks_opts,
+        char **passphrase,
+        size_t *passphrase_len,
+        Error **errp)
 {
-    if (LIBRBD_USE_IOVEC) {
-        RBDAIOCB *acb = rcb->acb;
-        iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
-                   acb->qiov->size - offs);
+    return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase,
+                                 passphrase_len, errp);
+}
+
+static int qemu_rbd_convert_luks_create_options(
+        RbdEncryptionCreateOptionsLUKSBase *luks_opts,
+        rbd_encryption_algorithm_t *alg,
+        char **passphrase,
+        size_t *passphrase_len,
+        Error **errp)
+{
+    int r = 0;
+
+    r = qemu_rbd_convert_luks_options(
+            qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts),
+            passphrase, passphrase_len, errp);
+    if (r < 0) {
+        return r;
+    }
+
+    if (luks_opts->has_cipher_alg) {
+        switch (luks_opts->cipher_alg) {
+            case QCRYPTO_CIPHER_ALG_AES_128: {
+                *alg = RBD_ENCRYPTION_ALGORITHM_AES128;
+                break;
+            }
+            case QCRYPTO_CIPHER_ALG_AES_256: {
+                *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
+                break;
+            }
+            default: {
+                r = -ENOTSUP;
+                error_setg_errno(errp, -r, "unknown encryption algorithm: %u",
+                                 luks_opts->cipher_alg);
+                return r;
+            }
+        }
     } else {
-        memset(rcb->buf + offs, 0, rcb->size - offs);
+        /* default alg */
+        *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
     }
+
+    return 0;
 }
 
-static QemuOptsList runtime_opts = {
-    .name = "rbd",
-    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
-    .desc = {
-        {
-            .name = "pool",
-            .type = QEMU_OPT_STRING,
-            .help = "Rados pool name",
-        },
-        {
-            .name = "namespace",
-            .type = QEMU_OPT_STRING,
-            .help = "Rados namespace name in the pool",
-        },
-        {
-            .name = "image",
-            .type = QEMU_OPT_STRING,
-            .help = "Image name in the pool",
-        },
-        {
-            .name = "conf",
-            .type = QEMU_OPT_STRING,
-            .help = "Rados config file location",
-        },
-        {
-            .name = "snapshot",
-            .type = QEMU_OPT_STRING,
-            .help = "Ceph snapshot name",
-        },
-        {
-            /* maps to 'id' in rados_create() */
-            .name = "user",
-            .type = QEMU_OPT_STRING,
-            .help = "Rados id name",
-        },
-        /*
-         * server.* extracted manually, see qemu_rbd_mon_host()
-         */
-        { /* end of list */ }
-    },
-};
+static int qemu_rbd_encryption_format(rbd_image_t image,
+                                      RbdEncryptionCreateOptions *encrypt,
+                                      Error **errp)
+{
+    int r = 0;
+    g_autofree char *passphrase = NULL;
+    size_t passphrase_len;
+    rbd_encryption_format_t format;
+    rbd_encryption_options_t opts;
+    rbd_encryption_luks1_format_options_t luks_opts;
+    rbd_encryption_luks2_format_options_t luks2_opts;
+    size_t opts_size;
+    uint64_t raw_size, effective_size;
+
+    r = rbd_get_size(image, &raw_size);
+    if (r < 0) {
+        error_setg_errno(errp, -r, "cannot get raw image size");
+        return r;
+    }
+
+    switch (encrypt->format) {
+        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
+            memset(&luks_opts, 0, sizeof(luks_opts));
+            format = RBD_ENCRYPTION_FORMAT_LUKS1;
+            opts = &luks_opts;
+            opts_size = sizeof(luks_opts);
+            r = qemu_rbd_convert_luks_create_options(
+                    qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks),
+                    &luks_opts.alg, &passphrase, &passphrase_len, errp);
+            if (r < 0) {
+                return r;
+            }
+            luks_opts.passphrase = passphrase;
+            luks_opts.passphrase_size = passphrase_len;
+            break;
+        }
+        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
+            memset(&luks2_opts, 0, sizeof(luks2_opts));
+            format = RBD_ENCRYPTION_FORMAT_LUKS2;
+            opts = &luks2_opts;
+            opts_size = sizeof(luks2_opts);
+            r = qemu_rbd_convert_luks_create_options(
+                    qapi_RbdEncryptionCreateOptionsLUKS2_base(
+                            &encrypt->u.luks2),
+                    &luks2_opts.alg, &passphrase, &passphrase_len, errp);
+            if (r < 0) {
+                return r;
+            }
+            luks2_opts.passphrase = passphrase;
+            luks2_opts.passphrase_size = passphrase_len;
+            break;
+        }
+        default: {
+            r = -ENOTSUP;
+            error_setg_errno(
+                    errp, -r, "unknown image encryption format: %u",
+                    encrypt->format);
+            return r;
+        }
+    }
+
+    r = rbd_encryption_format(image, format, opts, opts_size);
+    if (r < 0) {
+        error_setg_errno(errp, -r, "encryption format fail");
+        return r;
+    }
+
+    r = rbd_get_size(image, &effective_size);
+    if (r < 0) {
+        error_setg_errno(errp, -r, "cannot get effective image size");
+        return r;
+    }
+
+    r = rbd_resize(image, raw_size + (raw_size - effective_size));
+    if (r < 0) {
+        error_setg_errno(errp, -r, "cannot resize image after format");
+        return r;
+    }
+
+    return 0;
+}
+
+static int qemu_rbd_encryption_load(rbd_image_t image,
+                                    RbdEncryptionOptions *encrypt,
+                                    Error **errp)
+{
+    int r = 0;
+    g_autofree char *passphrase = NULL;
+    size_t passphrase_len;
+    rbd_encryption_luks1_format_options_t luks_opts;
+    rbd_encryption_luks2_format_options_t luks2_opts;
+    rbd_encryption_format_t format;
+    rbd_encryption_options_t opts;
+    size_t opts_size;
+
+    switch (encrypt->format) {
+        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
+            memset(&luks_opts, 0, sizeof(luks_opts));
+            format = RBD_ENCRYPTION_FORMAT_LUKS1;
+            opts = &luks_opts;
+            opts_size = sizeof(luks_opts);
+            r = qemu_rbd_convert_luks_options(
+                    qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks),
+                    &passphrase, &passphrase_len, errp);
+            if (r < 0) {
+                return r;
+            }
+            luks_opts.passphrase = passphrase;
+            luks_opts.passphrase_size = passphrase_len;
+            break;
+        }
+        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
+            memset(&luks2_opts, 0, sizeof(luks2_opts));
+            format = RBD_ENCRYPTION_FORMAT_LUKS2;
+            opts = &luks2_opts;
+            opts_size = sizeof(luks2_opts);
+            r = qemu_rbd_convert_luks_options(
+                    qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2),
+                    &passphrase, &passphrase_len, errp);
+            if (r < 0) {
+                return r;
+            }
+            luks2_opts.passphrase = passphrase;
+            luks2_opts.passphrase_size = passphrase_len;
+            break;
+        }
+        default: {
+            r = -ENOTSUP;
+            error_setg_errno(
+                    errp, -r, "unknown image encryption format: %u",
+                    encrypt->format);
+            return r;
+        }
+    }
+
+    r = rbd_encryption_load(image, format, opts, opts_size);
+    if (r < 0) {
+        error_setg_errno(errp, -r, "encryption load fail");
+        return r;
+    }
+
+    return 0;
+}
+#endif
 
 /* FIXME Deprecate and remove keypairs or make it available in QMP. */
 static int qemu_rbd_do_create(BlockdevCreateOptions *options,
@@ -395,11 +537,18 @@ static int qemu_rbd_do_create(BlockdevCreateOptions *options,
     int ret;
 
     assert(options->driver == BLOCKDEV_DRIVER_RBD);
-    if (opts->location->has_snapshot) {
+    if (opts->location->snapshot) {
         error_setg(errp, "Can't use snapshot name for image creation");
         return -EINVAL;
     }
 
+#ifndef LIBRBD_SUPPORTS_ENCRYPTION
+    if (opts->encrypt) {
+        error_setg(errp, "RBD library does not support image encryption");
+        return -ENOTSUP;
+    }
+#endif
+
     if (opts->has_cluster_size) {
         int64_t objsize = opts->cluster_size;
         if ((objsize - 1) & objsize) {    /* not a power of 2? */
@@ -425,6 +574,28 @@ static int qemu_rbd_do_create(BlockdevCreateOptions *options,
         goto out;
     }
 
+#ifdef LIBRBD_SUPPORTS_ENCRYPTION
+    if (opts->encrypt) {
+        rbd_image_t image;
+
+        ret = rbd_open(io_ctx, opts->location->image, &image, NULL);
+        if (ret < 0) {
+            error_setg_errno(errp, -ret,
+                             "error opening image '%s' for encryption format",
+                             opts->location->image);
+            goto out;
+        }
+
+        ret = qemu_rbd_encryption_format(image, opts->encrypt, errp);
+        rbd_close(image);
+        if (ret < 0) {
+            /* encryption format fail, try removing the image */
+            rbd_remove(io_ctx, opts->location->image);
+            goto out;
+        }
+    }
+#endif
+
     ret = 0;
 out:
     rados_ioctx_destroy(io_ctx);
@@ -437,6 +608,43 @@ static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
     return qemu_rbd_do_create(options, NULL, NULL, errp);
 }
 
+static int qemu_rbd_extract_encryption_create_options(
+        QemuOpts *opts,
+        RbdEncryptionCreateOptions **spec,
+        Error **errp)
+{
+    QDict *opts_qdict;
+    QDict *encrypt_qdict;
+    Visitor *v;
+    int ret = 0;
+
+    opts_qdict = qemu_opts_to_qdict(opts, NULL);
+    qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt.");
+    qobject_unref(opts_qdict);
+    if (!qdict_size(encrypt_qdict)) {
+        *spec = NULL;
+        goto exit;
+    }
+
+    /* Convert options into a QAPI object */
+    v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp);
+    if (!v) {
+        ret = -EINVAL;
+        goto exit;
+    }
+
+    visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp);
+    visit_free(v);
+    if (!*spec) {
+        ret = -EINVAL;
+        goto exit;
+    }
+
+exit:
+    qobject_unref(encrypt_qdict);
+    return ret;
+}
+
 static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
                                                 const char *filename,
                                                 QemuOpts *opts,
@@ -445,6 +653,7 @@ static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
     BlockdevCreateOptions *create_options;
     BlockdevCreateOptionsRbd *rbd_opts;
     BlockdevOptionsRbd *loc;
+    RbdEncryptionCreateOptions *encrypt = NULL;
     Error *local_err = NULL;
     const char *keypairs, *password_secret;
     QDict *options = NULL;
@@ -473,6 +682,12 @@ static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
         goto exit;
     }
 
+    ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp);
+    if (ret < 0) {
+        goto exit;
+    }
+    rbd_opts->encrypt     = encrypt;
+
     /*
      * Caution: while qdict_get_try_str() is fine, getting non-string
      * types would require more care.  When @options come from -blockdev
@@ -482,9 +697,7 @@ static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
     loc = rbd_opts->location;
     loc->pool        = g_strdup(qdict_get_try_str(options, "pool"));
     loc->conf        = g_strdup(qdict_get_try_str(options, "conf"));
-    loc->has_conf    = !!loc->conf;
     loc->user        = g_strdup(qdict_get_try_str(options, "user"));
-    loc->has_user    = !!loc->user;
     loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
     loc->image       = g_strdup(qdict_get_try_str(options, "image"));
     keypairs         = qdict_get_try_str(options, "=keyvalue-pairs");
@@ -500,53 +713,6 @@ exit:
     return ret;
 }
 
-/*
- * This aio completion is being called from rbd_finish_bh() and runs in qemu
- * BH context.
- */
-static void qemu_rbd_complete_aio(RADOSCB *rcb)
-{
-    RBDAIOCB *acb = rcb->acb;
-    int64_t r;
-
-    r = rcb->ret;
-
-    if (acb->cmd != RBD_AIO_READ) {
-        if (r < 0) {
-            acb->ret = r;
-            acb->error = 1;
-        } else if (!acb->error) {
-            acb->ret = rcb->size;
-        }
-    } else {
-        if (r < 0) {
-            qemu_rbd_memset(rcb, 0);
-            acb->ret = r;
-            acb->error = 1;
-        } else if (r < rcb->size) {
-            qemu_rbd_memset(rcb, r);
-            if (!acb->error) {
-                acb->ret = rcb->size;
-            }
-        } else if (!acb->error) {
-            acb->ret = r;
-        }
-    }
-
-    g_free(rcb);
-
-    if (!LIBRBD_USE_IOVEC) {
-        if (acb->cmd == RBD_AIO_READ) {
-            qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
-        }
-        qemu_vfree(acb->bounce);
-    }
-
-    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
-
-    qemu_aio_unref(acb);
-}
-
 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
 {
     const char **vals;
@@ -598,25 +764,24 @@ static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
             return -EINVAL;
         }
         opts->key_secret = g_strdup(secretid);
-        opts->has_key_secret = true;
     }
 
     mon_host = qemu_rbd_mon_host(opts, &local_err);
     if (local_err) {
         error_propagate(errp, local_err);
         r = -EINVAL;
-        goto failed_opts;
+        goto out;
     }
 
     r = rados_create(cluster, opts->user);
     if (r < 0) {
         error_setg_errno(errp, -r, "error initializing");
-        goto failed_opts;
+        goto out;
     }
 
     /* try default location when conf=NULL, but ignore failure */
     r = rados_conf_read_file(*cluster, opts->conf);
-    if (opts->has_conf && r < 0) {
+    if (opts->conf && r < 0) {
         error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
         goto failed_shutdown;
     }
@@ -662,17 +827,42 @@ static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
         error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
         goto failed_shutdown;
     }
+
+#ifdef HAVE_RBD_NAMESPACE_EXISTS
+    if (opts->q_namespace && strlen(opts->q_namespace) > 0) {
+        bool exists;
+
+        r = rbd_namespace_exists(*io_ctx, opts->q_namespace, &exists);
+        if (r < 0) {
+            error_setg_errno(errp, -r, "error checking namespace");
+            goto failed_ioctx_destroy;
+        }
+
+        if (!exists) {
+            error_setg(errp, "namespace '%s' does not exist",
+                       opts->q_namespace);
+            r = -ENOENT;
+            goto failed_ioctx_destroy;
+        }
+    }
+#endif
+
     /*
      * Set the namespace after opening the io context on the pool,
      * if nspace == NULL or if nspace == "", it is just as we did nothing
      */
     rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
 
-    return 0;
+    r = 0;
+    goto out;
 
+#ifdef HAVE_RBD_NAMESPACE_EXISTS
+failed_ioctx_destroy:
+    rados_ioctx_destroy(*io_ctx);
+#endif
 failed_shutdown:
     rados_shutdown(*cluster);
-failed_opts:
+out:
     g_free(mon_host);
     return r;
 }
@@ -681,7 +871,6 @@ static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
                                     Error **errp)
 {
     Visitor *v;
-    Error *local_err = NULL;
 
     /* Convert the remaining options into a QAPI object */
     v = qobject_input_visitor_new_flat_confused(options, errp);
@@ -689,11 +878,9 @@ static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
         return -EINVAL;
     }
 
-    visit_type_BlockdevOptionsRbd(v, NULL, opts, &local_err);
+    visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
     visit_free(v);
-
-    if (local_err) {
-        error_propagate(errp, local_err);
+    if (!opts) {
         return -EINVAL;
     }
 
@@ -735,6 +922,7 @@ static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
     const QDictEntry *e;
     Error *local_err = NULL;
     char *keypairs, *secretid;
+    rbd_image_info_t info;
     int r;
 
     keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
@@ -799,30 +987,49 @@ static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
         goto failed_open;
     }
 
-    r = rbd_get_size(s->image, &s->image_size);
+    if (opts->encrypt) {
+#ifdef LIBRBD_SUPPORTS_ENCRYPTION
+        r = qemu_rbd_encryption_load(s->image, opts->encrypt, errp);
+        if (r < 0) {
+            goto failed_post_open;
+        }
+#else
+        r = -ENOTSUP;
+        error_setg(errp, "RBD library does not support image encryption");
+        goto failed_post_open;
+#endif
+    }
+
+    r = rbd_stat(s->image, &info, sizeof(info));
     if (r < 0) {
-        error_setg_errno(errp, -r, "error getting image size from %s",
+        error_setg_errno(errp, -r, "error getting image info from %s",
                          s->image_name);
-        rbd_close(s->image);
-        goto failed_open;
+        goto failed_post_open;
     }
+    s->image_size = info.size;
+    s->object_size = info.obj_size;
 
     /* If we are using an rbd snapshot, we must be r/o, otherwise
      * leave as-is */
     if (s->snap != NULL) {
         r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
         if (r < 0) {
-            rbd_close(s->image);
-            goto failed_open;
+            goto failed_post_open;
         }
     }
 
+#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
+    bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
+#endif
+
     /* When extending regular files, we get zeros from the OS */
     bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
 
     r = 0;
     goto out;
 
+failed_post_open:
+    rbd_close(s->image);
 failed_open:
     rados_ioctx_destroy(s->io_ctx);
     g_free(s->snap);
@@ -882,229 +1089,358 @@ static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
     return 0;
 }
 
-static const AIOCBInfo rbd_aiocb_info = {
-    .aiocb_size = sizeof(RBDAIOCB),
-};
-
-static void rbd_finish_bh(void *opaque)
+static void qemu_rbd_finish_bh(void *opaque)
 {
-    RADOSCB *rcb = opaque;
-    qemu_rbd_complete_aio(rcb);
+    RBDTask *task = opaque;
+    task->complete = true;
+    aio_co_wake(task->co);
 }
 
 /*
- * This is the callback function for rbd_aio_read and _write
+ * This is the completion callback function for all rbd aio calls
+ * started from qemu_rbd_start_co().
  *
  * Note: this function is being called from a non qemu thread so
  * we need to be careful about what we do here. Generally we only
  * schedule a BH, and do the rest of the io completion handling
- * from rbd_finish_bh() which runs in a qemu context.
+ * from qemu_rbd_finish_bh() which runs in a qemu context.
  */
-static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
+static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
 {
-    RBDAIOCB *acb = rcb->acb;
-
-    rcb->ret = rbd_aio_get_return_value(c);
+    task->ret = rbd_aio_get_return_value(c);
     rbd_aio_release(c);
-
-    replay_bh_schedule_oneshot_event(bdrv_get_aio_context(acb->common.bs),
-                                     rbd_finish_bh, rcb);
-}
-
-static int rbd_aio_discard_wrapper(rbd_image_t image,
-                                   uint64_t off,
-                                   uint64_t len,
-                                   rbd_completion_t comp)
-{
-#ifdef LIBRBD_SUPPORTS_DISCARD
-    return rbd_aio_discard(image, off, len, comp);
-#else
-    return -ENOTSUP;
-#endif
-}
-
-static int rbd_aio_flush_wrapper(rbd_image_t image,
-                                 rbd_completion_t comp)
-{
-#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
-    return rbd_aio_flush(image, comp);
-#else
-    return -ENOTSUP;
-#endif
+    aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
+                            qemu_rbd_finish_bh, task);
 }
 
-static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
-                                 int64_t off,
-                                 QEMUIOVector *qiov,
-                                 int64_t size,
-                                 BlockCompletionFunc *cb,
-                                 void *opaque,
-                                 RBDAIOCmd cmd)
+static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
+                                          uint64_t offset,
+                                          uint64_t bytes,
+                                          QEMUIOVector *qiov,
+                                          int flags,
+                                          RBDAIOCmd cmd)
 {
-    RBDAIOCB *acb;
-    RADOSCB *rcb = NULL;
+    BDRVRBDState *s = bs->opaque;
+    RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
     rbd_completion_t c;
     int r;
 
-    BDRVRBDState *s = bs->opaque;
-
-    acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
-    acb->cmd = cmd;
-    acb->qiov = qiov;
-    assert(!qiov || qiov->size == size);
+    assert(!qiov || qiov->size == bytes);
 
-    rcb = g_new(RADOSCB, 1);
-
-    if (!LIBRBD_USE_IOVEC) {
-        if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
-            acb->bounce = NULL;
-        } else {
-            acb->bounce = qemu_try_blockalign(bs, qiov->size);
-            if (acb->bounce == NULL) {
-                goto failed;
-            }
-        }
-        if (cmd == RBD_AIO_WRITE) {
-            qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
-        }
-        rcb->buf = acb->bounce;
-    }
-
-    acb->ret = 0;
-    acb->error = 0;
-    acb->s = s;
-
-    rcb->acb = acb;
-    rcb->s = acb->s;
-    rcb->size = size;
-    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
-    if (r < 0) {
-        goto failed;
-    }
-
-    switch (cmd) {
-    case RBD_AIO_WRITE: {
+    if (cmd == RBD_AIO_WRITE || cmd == RBD_AIO_WRITE_ZEROES) {
         /*
          * RBD APIs don't allow us to write more than actual size, so in order
          * to support growing images, we resize the image before write
          * operations that exceed the current size.
          */
-        if (off + size > s->image_size) {
-            r = qemu_rbd_resize(bs, off + size);
+        if (offset + bytes > s->image_size) {
+            int r = qemu_rbd_resize(bs, offset + bytes);
             if (r < 0) {
-                goto failed_completion;
+                return r;
             }
         }
-#ifdef LIBRBD_SUPPORTS_IOVEC
-            r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
-#else
-            r = rbd_aio_write(s->image, off, size, rcb->buf, c);
-#endif
-        break;
     }
+
+    r = rbd_aio_create_completion(&task,
+                                  (rbd_callback_t) qemu_rbd_completion_cb, &c);
+    if (r < 0) {
+        return r;
+    }
+
+    switch (cmd) {
     case RBD_AIO_READ:
-#ifdef LIBRBD_SUPPORTS_IOVEC
-            r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
-#else
-            r = rbd_aio_read(s->image, off, size, rcb->buf, c);
-#endif
+        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
+        break;
+    case RBD_AIO_WRITE:
+        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
         break;
     case RBD_AIO_DISCARD:
-        r = rbd_aio_discard_wrapper(s->image, off, size, c);
+        r = rbd_aio_discard(s->image, offset, bytes, c);
         break;
     case RBD_AIO_FLUSH:
-        r = rbd_aio_flush_wrapper(s->image, c);
+        r = rbd_aio_flush(s->image, c);
+        break;
+#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
+    case RBD_AIO_WRITE_ZEROES: {
+        int zero_flags = 0;
+#ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION
+        if (!(flags & BDRV_REQ_MAY_UNMAP)) {
+            zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION;
+        }
+#endif
+        r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0);
         break;
+    }
+#endif
     default:
         r = -EINVAL;
     }
 
     if (r < 0) {
-        goto failed_completion;
+        error_report("rbd request failed early: cmd %d offset %" PRIu64
+                     " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
+                     bytes, flags, r, strerror(-r));
+        rbd_aio_release(c);
+        return r;
     }
-    return &acb->common;
 
-failed_completion:
-    rbd_aio_release(c);
-failed:
-    g_free(rcb);
-    if (!LIBRBD_USE_IOVEC) {
-        qemu_vfree(acb->bounce);
+    while (!task.complete) {
+        qemu_coroutine_yield();
     }
 
-    qemu_aio_unref(acb);
-    return NULL;
+    if (task.ret < 0) {
+        error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
+                     PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
+                     bytes, flags, task.ret, strerror(-task.ret));
+        return task.ret;
+    }
+
+    /* zero pad short reads */
+    if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
+        qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
+    }
+
+    return 0;
 }
 
-static BlockAIOCB *qemu_rbd_aio_preadv(BlockDriverState *bs,
-                                       uint64_t offset, uint64_t bytes,
-                                       QEMUIOVector *qiov, int flags,
-                                       BlockCompletionFunc *cb,
-                                       void *opaque)
+static int
+coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset,
+                                int64_t bytes, QEMUIOVector *qiov,
+                                BdrvRequestFlags flags)
 {
-    return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
-                         RBD_AIO_READ);
+    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
 }
 
-static BlockAIOCB *qemu_rbd_aio_pwritev(BlockDriverState *bs,
-                                        uint64_t offset, uint64_t bytes,
-                                        QEMUIOVector *qiov, int flags,
-                                        BlockCompletionFunc *cb,
-                                        void *opaque)
+static int
+coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset,
+                                 int64_t bytes, QEMUIOVector *qiov,
+                                 BdrvRequestFlags flags)
 {
-    return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
-                         RBD_AIO_WRITE);
+    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
 }
 
-#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
-static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
-                                      BlockCompletionFunc *cb,
-                                      void *opaque)
+static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
 {
-    return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
+    return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
 }
 
-#else
+static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
+                                             int64_t offset, int64_t bytes)
+{
+    return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD);
+}
 
-static int qemu_rbd_co_flush(BlockDriverState *bs)
+#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
+static int
+coroutine_fn qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
+                                       int64_t bytes, BdrvRequestFlags flags)
 {
-#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
-    /* rbd_flush added in 0.1.1 */
-    BDRVRBDState *s = bs->opaque;
-    return rbd_flush(s->image);
-#else
-    return 0;
-#endif
+    return qemu_rbd_start_co(bs, offset, bytes, NULL, flags,
+                             RBD_AIO_WRITE_ZEROES);
 }
 #endif
 
 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
 {
     BDRVRBDState *s = bs->opaque;
-    rbd_image_info_t info;
+    bdi->cluster_size = s->object_size;
+    return 0;
+}
+
+static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs,
+                                                     Error **errp)
+{
+    BDRVRBDState *s = bs->opaque;
+    ImageInfoSpecific *spec_info;
+    char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0};
     int r;
 
-    r = rbd_stat(s->image, &info, sizeof(info));
-    if (r < 0) {
-        return r;
+    if (s->image_size >= RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) {
+        r = rbd_read(s->image, 0,
+                     RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf);
+        if (r < 0) {
+            error_setg_errno(errp, -r, "cannot read image start for probe");
+            return NULL;
+        }
+    }
+
+    spec_info = g_new(ImageInfoSpecific, 1);
+    *spec_info = (ImageInfoSpecific){
+        .type  = IMAGE_INFO_SPECIFIC_KIND_RBD,
+        .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1),
+    };
+
+    if (memcmp(buf, rbd_luks_header_verification,
+               RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
+        spec_info->u.rbd.data->encryption_format =
+                RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
+        spec_info->u.rbd.data->has_encryption_format = true;
+    } else if (memcmp(buf, rbd_luks2_header_verification,
+               RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
+        spec_info->u.rbd.data->encryption_format =
+                RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
+        spec_info->u.rbd.data->has_encryption_format = true;
+    } else {
+        spec_info->u.rbd.data->has_encryption_format = false;
+    }
+
+    return spec_info;
+}
+
+/*
+ * rbd_diff_iterate2 allows to interrupt the exection by returning a negative
+ * value in the callback routine. Choose a value that does not conflict with
+ * an existing exitcode and return it if we want to prematurely stop the
+ * execution because we detected a change in the allocation status.
+ */
+#define QEMU_RBD_EXIT_DIFF_ITERATE2 -9000
+
+static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len,
+                                    int exists, void *opaque)
+{
+    RBDDiffIterateReq *req = opaque;
+
+    assert(req->offs + req->bytes <= offs);
+
+    /* treat a hole like an unallocated area and bail out */
+    if (!exists) {
+        return 0;
+    }
+
+    if (!req->exists && offs > req->offs) {
+        /*
+         * we started in an unallocated area and hit the first allocated
+         * block. req->bytes must be set to the length of the unallocated area
+         * before the allocated area. stop further processing.
+         */
+        req->bytes = offs - req->offs;
+        return QEMU_RBD_EXIT_DIFF_ITERATE2;
     }
 
-    bdi->cluster_size = info.obj_size;
+    if (req->exists && offs > req->offs + req->bytes) {
+        /*
+         * we started in an allocated area and jumped over an unallocated area,
+         * req->bytes contains the length of the allocated area before the
+         * unallocated area. stop further processing.
+         */
+        return QEMU_RBD_EXIT_DIFF_ITERATE2;
+    }
+
+    req->bytes += len;
+    req->exists = true;
+
     return 0;
 }
 
+static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs,
+                                                 bool want_zero, int64_t offset,
+                                                 int64_t bytes, int64_t *pnum,
+                                                 int64_t *map,
+                                                 BlockDriverState **file)
+{
+    BDRVRBDState *s = bs->opaque;
+    int status, r;
+    RBDDiffIterateReq req = { .offs = offset };
+    uint64_t features, flags;
+    uint64_t head = 0;
+
+    assert(offset + bytes <= s->image_size);
+
+    /* default to all sectors allocated */
+    status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
+    *map = offset;
+    *file = bs;
+    *pnum = bytes;
+
+    /* check if RBD image supports fast-diff */
+    r = rbd_get_features(s->image, &features);
+    if (r < 0) {
+        return status;
+    }
+    if (!(features & RBD_FEATURE_FAST_DIFF)) {
+        return status;
+    }
+
+    /* check if RBD fast-diff result is valid */
+    r = rbd_get_flags(s->image, &flags);
+    if (r < 0) {
+        return status;
+    }
+    if (flags & RBD_FLAG_FAST_DIFF_INVALID) {
+        return status;
+    }
+
+#if LIBRBD_VERSION_CODE < LIBRBD_VERSION(1, 17, 0)
+    /*
+     * librbd had a bug until early 2022 that affected all versions of ceph that
+     * supported fast-diff. This bug results in reporting of incorrect offsets
+     * if the offset parameter to rbd_diff_iterate2 is not object aligned.
+     * Work around this bug by rounding down the offset to object boundaries.
+     * This is OK because we call rbd_diff_iterate2 with whole_object = true.
+     * However, this workaround only works for non cloned images with default
+     * striping.
+     *
+     * See: https://tracker.ceph.com/issues/53784
+     */
+
+    /* check if RBD image has non-default striping enabled */
+    if (features & RBD_FEATURE_STRIPINGV2) {
+        return status;
+    }
+
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+    /*
+     * check if RBD image is a clone (= has a parent).
+     *
+     * rbd_get_parent_info is deprecated from Nautilus onwards, but the
+     * replacement rbd_get_parent is not present in Luminous and Mimic.
+     */
+    if (rbd_get_parent_info(s->image, NULL, 0, NULL, 0, NULL, 0) != -ENOENT) {
+        return status;
+    }
+#pragma GCC diagnostic pop
+
+    head = req.offs & (s->object_size - 1);
+    req.offs -= head;
+    bytes += head;
+#endif
+
+    r = rbd_diff_iterate2(s->image, NULL, req.offs, bytes, true, true,
+                          qemu_rbd_diff_iterate_cb, &req);
+    if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) {
+        return status;
+    }
+    assert(req.bytes <= bytes);
+    if (!req.exists) {
+        if (r == 0) {
+            /*
+             * rbd_diff_iterate2 does not invoke callbacks for unallocated
+             * areas. This here catches the case where no callback was
+             * invoked at all (req.bytes == 0).
+             */
+            assert(req.bytes == 0);
+            req.bytes = bytes;
+        }
+        status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID;
+    }
+
+    assert(req.bytes > head);
+    *pnum = req.bytes - head;
+    return status;
+}
+
 static int64_t qemu_rbd_getlength(BlockDriverState *bs)
 {
     BDRVRBDState *s = bs->opaque;
-    rbd_image_info_t info;
     int r;
 
-    r = rbd_stat(s->image, &info, sizeof(info));
+    r = rbd_get_size(s->image, &s->image_size);
     if (r < 0) {
         return r;
     }
 
-    return info.size;
+    return s->image_size;
 }
 
 static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
@@ -1243,19 +1579,6 @@ static int qemu_rbd_snap_list(BlockDriverState *bs,
     return snap_count;
 }
 
-#ifdef LIBRBD_SUPPORTS_DISCARD
-static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs,
-                                         int64_t offset,
-                                         int bytes,
-                                         BlockCompletionFunc *cb,
-                                         void *opaque)
-{
-    return rbd_start_aio(bs, offset, NULL, bytes, cb, opaque,
-                         RBD_AIO_DISCARD);
-}
-#endif
-
-#ifdef LIBRBD_SUPPORTS_INVALIDATE
 static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
                                                       Error **errp)
 {
@@ -1265,7 +1588,6 @@ static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
         error_setg_errno(errp, -r, "Failed to invalidate the cache");
     }
 }
-#endif
 
 static QemuOptsList qemu_rbd_create_opts = {
     .name = "rbd-create-opts",
@@ -1286,12 +1608,29 @@ static QemuOptsList qemu_rbd_create_opts = {
             .type = QEMU_OPT_STRING,
             .help = "ID of secret providing the password",
         },
+        {
+            .name = "encrypt.format",
+            .type = QEMU_OPT_STRING,
+            .help = "Encrypt the image, format choices: 'luks', 'luks2'",
+        },
+        {
+            .name = "encrypt.cipher-alg",
+            .type = QEMU_OPT_STRING,
+            .help = "Name of encryption cipher algorithm"
+                    " (allowed values: aes-128, aes-256)",
+        },
+        {
+            .name = "encrypt.key-secret",
+            .type = QEMU_OPT_STRING,
+            .help = "ID of secret providing LUKS passphrase",
+        },
         { /* end of list */ }
     }
 };
 
 static const char *const qemu_rbd_strong_runtime_opts[] = {
     "pool",
+    "namespace",
     "image",
     "conf",
     "snapshot",
@@ -1306,7 +1645,6 @@ static BlockDriver bdrv_rbd = {
     .format_name            = "rbd",
     .instance_size          = sizeof(BDRVRBDState),
     .bdrv_parse_filename    = qemu_rbd_parse_filename,
-    .bdrv_refresh_limits    = qemu_rbd_refresh_limits,
     .bdrv_file_open         = qemu_rbd_open,
     .bdrv_close             = qemu_rbd_close,
     .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
@@ -1314,31 +1652,26 @@ static BlockDriver bdrv_rbd = {
     .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
     .bdrv_has_zero_init     = bdrv_has_zero_init_1,
     .bdrv_get_info          = qemu_rbd_getinfo,
+    .bdrv_get_specific_info = qemu_rbd_get_specific_info,
     .create_opts            = &qemu_rbd_create_opts,
     .bdrv_getlength         = qemu_rbd_getlength,
     .bdrv_co_truncate       = qemu_rbd_co_truncate,
     .protocol_name          = "rbd",
 
-    .bdrv_aio_preadv        = qemu_rbd_aio_preadv,
-    .bdrv_aio_pwritev       = qemu_rbd_aio_pwritev,
-
-#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
-    .bdrv_aio_flush         = qemu_rbd_aio_flush,
-#else
+    .bdrv_co_preadv         = qemu_rbd_co_preadv,
+    .bdrv_co_pwritev        = qemu_rbd_co_pwritev,
     .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
+    .bdrv_co_pdiscard       = qemu_rbd_co_pdiscard,
+#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
+    .bdrv_co_pwrite_zeroes  = qemu_rbd_co_pwrite_zeroes,
 #endif
-
-#ifdef LIBRBD_SUPPORTS_DISCARD
-    .bdrv_aio_pdiscard      = qemu_rbd_aio_pdiscard,
-#endif
+    .bdrv_co_block_status   = qemu_rbd_co_block_status,
 
     .bdrv_snapshot_create   = qemu_rbd_snap_create,
     .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
     .bdrv_snapshot_list     = qemu_rbd_snap_list,
     .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
-#ifdef LIBRBD_SUPPORTS_INVALIDATE
     .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
-#endif
 
     .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
 };