]> git.proxmox.com Git - mirror_qemu.git/blobdiff - block/rbd.c
rbd: Fix bugs around -drive parameter "server"
[mirror_qemu.git] / block / rbd.c
index 47cab8be9422b7269bea751af6c8bb50bae12209..498322b30b914266c97016d53075419c4abfc4a3 100644 (file)
  * GNU GPL, version 2 or (at your option) any later version.
  */
 
-#include <inttypes.h>
+#include "qemu/osdep.h"
 
-#include "qemu-common.h"
+#include <rbd/librbd.h>
+#include "qapi/error.h"
 #include "qemu/error-report.h"
 #include "block/block_int.h"
-
-#include <rbd/librbd.h>
+#include "crypto/secret.h"
+#include "qemu/cutils.h"
+#include "qapi/qmp/qstring.h"
 
 /*
  * When specifying the image filename use:
 
 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
 
-#define RBD_MAX_CONF_NAME_SIZE 128
-#define RBD_MAX_CONF_VAL_SIZE 512
-#define RBD_MAX_CONF_SIZE 1024
-#define RBD_MAX_POOL_NAME_SIZE 128
-#define RBD_MAX_SNAP_NAME_SIZE 128
 #define RBD_MAX_SNAPS 100
 
+/* The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h */
+#ifdef LIBRBD_SUPPORTS_IOVEC
+#define LIBRBD_USE_IOVEC 1
+#else
+#define LIBRBD_USE_IOVEC 0
+#endif
+
 typedef enum {
     RBD_AIO_READ,
     RBD_AIO_WRITE,
@@ -69,74 +73,49 @@ typedef enum {
 
 typedef struct RBDAIOCB {
     BlockAIOCB common;
-    QEMUBH *bh;
     int64_t ret;
     QEMUIOVector *qiov;
     char *bounce;
     RBDAIOCmd cmd;
-    int64_t sector_num;
     int error;
     struct BDRVRBDState *s;
-    int status;
 } RBDAIOCB;
 
 typedef struct RADOSCB {
-    int rcbid;
     RBDAIOCB *acb;
     struct BDRVRBDState *s;
-    int done;
     int64_t size;
     char *buf;
     int64_t ret;
 } RADOSCB;
 
-#define RBD_FD_READ 0
-#define RBD_FD_WRITE 1
-
 typedef struct BDRVRBDState {
     rados_t cluster;
     rados_ioctx_t io_ctx;
     rbd_image_t image;
-    char name[RBD_MAX_IMAGE_NAME_SIZE];
+    char *name;
     char *snap;
 } BDRVRBDState;
 
-static int qemu_rbd_next_tok(char *dst, int dst_len,
-                             char *src, char delim,
-                             const char *name,
-                             char **p, Error **errp)
+static char *qemu_rbd_next_tok(char *src, char delim, char **p)
 {
-    int l;
     char *end;
 
     *p = NULL;
 
-    if (delim != '\0') {
-        for (end = src; *end; ++end) {
-            if (*end == delim) {
-                break;
-            }
-            if (*end == '\\' && end[1] != '\0') {
-                end++;
-            }
-        }
+    for (end = src; *end; ++end) {
         if (*end == delim) {
-            *p = end + 1;
-            *end = '\0';
+            break;
+        }
+        if (*end == '\\' && end[1] != '\0') {
+            end++;
         }
     }
-    l = strlen(src);
-    if (l >= dst_len) {
-        error_setg(errp, "%s too long", name);
-        return -EINVAL;
-    } else if (l == 0) {
-        error_setg(errp, "%s too short", name);
-        return -EINVAL;
+    if (*end == delim) {
+        *p = end + 1;
+        *end = '\0';
     }
-
-    pstrcpy(dst, dst_len, src);
-
-    return 0;
+    return src;
 }
 
 static void qemu_rbd_unescape(char *src)
@@ -152,165 +131,232 @@ static void qemu_rbd_unescape(char *src)
     *p = '\0';
 }
 
-static int qemu_rbd_parsename(const char *filename,
-                              char *pool, int pool_len,
-                              char *snap, int snap_len,
-                              char *name, int name_len,
-                              char *conf, int conf_len,
-                              Error **errp)
+static void qemu_rbd_parse_filename(const char *filename, QDict *options,
+                                    Error **errp)
 {
     const char *start;
-    char *p, *buf;
-    int ret;
+    char *p, *buf, *keypairs;
+    char *found_str;
+    size_t max_keypair_size;
 
     if (!strstart(filename, "rbd:", &start)) {
         error_setg(errp, "File name must start with 'rbd:'");
-        return -EINVAL;
+        return;
     }
 
+    max_keypair_size = strlen(start) + 1;
     buf = g_strdup(start);
+    keypairs = g_malloc0(max_keypair_size);
     p = buf;
-    *snap = '\0';
-    *conf = '\0';
 
-    ret = qemu_rbd_next_tok(pool, pool_len, p,
-                            '/', "pool name", &p, errp);
-    if (ret < 0 || !p) {
-        ret = -EINVAL;
+    found_str = qemu_rbd_next_tok(p, '/', &p);
+    if (!p) {
+        error_setg(errp, "Pool name is required");
         goto done;
     }
-    qemu_rbd_unescape(pool);
+    qemu_rbd_unescape(found_str);
+    qdict_put(options, "pool", qstring_from_str(found_str));
 
     if (strchr(p, '@')) {
-        ret = qemu_rbd_next_tok(name, name_len, p,
-                                '@', "object name", &p, errp);
-        if (ret < 0) {
-            goto done;
-        }
-        ret = qemu_rbd_next_tok(snap, snap_len, p,
-                                ':', "snap name", &p, errp);
-        qemu_rbd_unescape(snap);
+        found_str = qemu_rbd_next_tok(p, '@', &p);
+        qemu_rbd_unescape(found_str);
+        qdict_put(options, "image", qstring_from_str(found_str));
+
+        found_str = qemu_rbd_next_tok(p, ':', &p);
+        qemu_rbd_unescape(found_str);
+        qdict_put(options, "snapshot", qstring_from_str(found_str));
     } else {
-        ret = qemu_rbd_next_tok(name, name_len, p,
-                                ':', "object name", &p, errp);
+        found_str = qemu_rbd_next_tok(p, ':', &p);
+        qemu_rbd_unescape(found_str);
+        qdict_put(options, "image", qstring_from_str(found_str));
     }
-    qemu_rbd_unescape(name);
-    if (ret < 0 || !p) {
+    if (!p) {
         goto done;
     }
 
-    ret = qemu_rbd_next_tok(conf, conf_len, p,
-                            '\0', "configuration", &p, errp);
+    /* The following are essentially all key/value pairs, and we treat
+     * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
+    while (p) {
+        char *name, *value;
+        name = qemu_rbd_next_tok(p, '=', &p);
+        if (!p) {
+            error_setg(errp, "conf option %s has no value", name);
+            break;
+        }
+
+        qemu_rbd_unescape(name);
+
+        value = qemu_rbd_next_tok(p, ':', &p);
+        qemu_rbd_unescape(value);
+
+        if (!strcmp(name, "conf")) {
+            qdict_put(options, "conf", qstring_from_str(value));
+        } else if (!strcmp(name, "id")) {
+            qdict_put(options, "user" , qstring_from_str(value));
+        } else {
+            /* FIXME: This is pretty ugly, and not the right way to do this.
+             *        These should be contained in a structure, and then
+             *        passed explicitly as individual key/value pairs to
+             *        rados.  Consider this legacy code that needs to be
+             *        updated. */
+            char *tmp = g_malloc0(max_keypair_size);
+            /* only use a delimiter if it is not the first keypair found */
+            /* These are sets of unknown key/value pairs we'll pass along
+             * to ceph */
+            if (keypairs[0]) {
+                snprintf(tmp, max_keypair_size, ":%s=%s", name, value);
+                pstrcat(keypairs, max_keypair_size, tmp);
+            } else {
+                snprintf(keypairs, max_keypair_size, "%s=%s", name, value);
+            }
+            g_free(tmp);
+        }
+    }
+
+    if (keypairs[0]) {
+        qdict_put(options, "=keyvalue-pairs", qstring_from_str(keypairs));
+    }
+
 
 done:
     g_free(buf);
-    return ret;
+    g_free(keypairs);
+    return;
 }
 
-static char *qemu_rbd_parse_clientname(const char *conf, char *clientname)
+
+static int qemu_rbd_set_auth(rados_t cluster, const char *secretid,
+                             Error **errp)
 {
-    const char *p = conf;
+    if (secretid == 0) {
+        return 0;
+    }
 
-    while (*p) {
-        int len;
-        const char *end = strchr(p, ':');
+    gchar *secret = qcrypto_secret_lookup_as_base64(secretid,
+                                                    errp);
+    if (!secret) {
+        return -1;
+    }
 
-        if (end) {
-            len = end - p;
-        } else {
-            len = strlen(p);
-        }
+    rados_conf_set(cluster, "key", secret);
+    g_free(secret);
 
-        if (strncmp(p, "id=", 3) == 0) {
-            len -= 3;
-            strncpy(clientname, p + 3, len);
-            clientname[len] = '\0';
-            return clientname;
-        }
-        if (end == NULL) {
-            break;
-        }
-        p = end + 1;
-    }
-    return NULL;
+    return 0;
 }
 
-static int qemu_rbd_set_conf(rados_t cluster, const char *conf, Error **errp)
+static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs,
+                                 Error **errp)
 {
     char *p, *buf;
-    char name[RBD_MAX_CONF_NAME_SIZE];
-    char value[RBD_MAX_CONF_VAL_SIZE];
+    char *name;
+    char *value;
     int ret = 0;
 
-    buf = g_strdup(conf);
+    buf = g_strdup(keypairs);
     p = buf;
 
     while (p) {
-        ret = qemu_rbd_next_tok(name, sizeof(name), p,
-                                '=', "conf option name", &p, errp);
-        if (ret < 0) {
-            break;
-        }
-        qemu_rbd_unescape(name);
-
+        name = qemu_rbd_next_tok(p, '=', &p);
         if (!p) {
             error_setg(errp, "conf option %s has no value", name);
             ret = -EINVAL;
             break;
         }
 
-        ret = qemu_rbd_next_tok(value, sizeof(value), p,
-                                ':', "conf option value", &p, errp);
+        value = qemu_rbd_next_tok(p, ':', &p);
+
+        ret = rados_conf_set(cluster, name, value);
         if (ret < 0) {
+            error_setg_errno(errp, -ret, "invalid conf option %s", name);
+            ret = -EINVAL;
             break;
         }
-        qemu_rbd_unescape(value);
-
-        if (strcmp(name, "conf") == 0) {
-            ret = rados_conf_read_file(cluster, value);
-            if (ret < 0) {
-                error_setg(errp, "error reading conf file %s", value);
-                break;
-            }
-        } else if (strcmp(name, "id") == 0) {
-            /* ignore, this is parsed by qemu_rbd_parse_clientname() */
-        } else {
-            ret = rados_conf_set(cluster, name, value);
-            if (ret < 0) {
-                error_setg(errp, "invalid conf option %s", name);
-                ret = -EINVAL;
-                break;
-            }
-        }
     }
 
     g_free(buf);
     return ret;
 }
 
+static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
+{
+    if (LIBRBD_USE_IOVEC) {
+        RBDAIOCB *acb = rcb->acb;
+        iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
+                   acb->qiov->size - offs);
+    } else {
+        memset(rcb->buf + offs, 0, rcb->size - offs);
+    }
+}
+
+static QemuOptsList runtime_opts = {
+    .name = "rbd",
+    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
+    .desc = {
+        {
+            .name = "pool",
+            .type = QEMU_OPT_STRING,
+            .help = "Rados pool name",
+        },
+        {
+            .name = "image",
+            .type = QEMU_OPT_STRING,
+            .help = "Image name in the pool",
+        },
+        {
+            .name = "conf",
+            .type = QEMU_OPT_STRING,
+            .help = "Rados config file location",
+        },
+        {
+            .name = "snapshot",
+            .type = QEMU_OPT_STRING,
+            .help = "Ceph snapshot name",
+        },
+        {
+            /* maps to 'id' in rados_create() */
+            .name = "user",
+            .type = QEMU_OPT_STRING,
+            .help = "Rados id name",
+        },
+        /*
+         * server.* extracted manually, see qemu_rbd_mon_host()
+         */
+        {
+            .name = "password-secret",
+            .type = QEMU_OPT_STRING,
+            .help = "ID of secret providing the password",
+        },
+
+        /*
+         * Keys for qemu_rbd_parse_filename(), not in the QAPI schema
+         */
+        {
+            /*
+             * HACK: name starts with '=' so that qemu_opts_parse()
+             * can't set it
+             */
+            .name = "=keyvalue-pairs",
+            .type = QEMU_OPT_STRING,
+            .help = "Legacy rados key/value option parameters",
+        },
+        { /* end of list */ }
+    },
+};
+
 static int qemu_rbd_create(const char *filename, QemuOpts *opts, Error **errp)
 {
     Error *local_err = NULL;
     int64_t bytes = 0;
     int64_t objsize;
     int obj_order = 0;
-    char pool[RBD_MAX_POOL_NAME_SIZE];
-    char name[RBD_MAX_IMAGE_NAME_SIZE];
-    char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
-    char conf[RBD_MAX_CONF_SIZE];
-    char clientname_buf[RBD_MAX_CONF_SIZE];
-    char *clientname;
+    const char *pool, *name, *conf, *clientname, *keypairs;
+    const char *secretid;
     rados_t cluster;
     rados_ioctx_t io_ctx;
-    int ret;
+    QDict *options = NULL;
+    int ret = 0;
 
-    if (qemu_rbd_parsename(filename, pool, sizeof(pool),
-                           snap_buf, sizeof(snap_buf),
-                           name, sizeof(name),
-                           conf, sizeof(conf), &local_err) < 0) {
-        error_propagate(errp, local_err);
-        return -EINVAL;
-    }
+    secretid = qemu_opt_get(opts, "password-secret");
 
     /* Read out options */
     bytes = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
@@ -319,49 +365,80 @@ static int qemu_rbd_create(const char *filename, QemuOpts *opts, Error **errp)
     if (objsize) {
         if ((objsize - 1) & objsize) {    /* not a power of 2? */
             error_setg(errp, "obj size needs to be power of 2");
-            return -EINVAL;
+            ret = -EINVAL;
+            goto exit;
         }
         if (objsize < 4096) {
             error_setg(errp, "obj size too small");
-            return -EINVAL;
+            ret = -EINVAL;
+            goto exit;
         }
-        obj_order = ffs(objsize) - 1;
+        obj_order = ctz32(objsize);
     }
 
-    clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
-    if (rados_create(&cluster, clientname) < 0) {
-        error_setg(errp, "error initializing");
-        return -EIO;
+    options = qdict_new();
+    qemu_rbd_parse_filename(filename, options, &local_err);
+    if (local_err) {
+        ret = -EINVAL;
+        error_propagate(errp, local_err);
+        goto exit;
     }
 
-    if (strstr(conf, "conf=") == NULL) {
-        /* try default location, but ignore failure */
-        rados_conf_read_file(cluster, NULL);
+    pool       = qdict_get_try_str(options, "pool");
+    conf       = qdict_get_try_str(options, "conf");
+    clientname = qdict_get_try_str(options, "user");
+    name       = qdict_get_try_str(options, "image");
+    keypairs   = qdict_get_try_str(options, "=keyvalue-pairs");
+
+    ret = rados_create(&cluster, clientname);
+    if (ret < 0) {
+        error_setg_errno(errp, -ret, "error initializing");
+        goto exit;
     }
 
-    if (conf[0] != '\0' &&
-        qemu_rbd_set_conf(cluster, conf, &local_err) < 0) {
-        rados_shutdown(cluster);
-        error_propagate(errp, local_err);
-        return -EIO;
+    /* try default location when conf=NULL, but ignore failure */
+    ret = rados_conf_read_file(cluster, conf);
+    if (conf && ret < 0) {
+        error_setg_errno(errp, -ret, "error reading conf file %s", conf);
+        ret = -EIO;
+        goto shutdown;
+    }
+
+    ret = qemu_rbd_set_keypairs(cluster, keypairs, errp);
+    if (ret < 0) {
+        ret = -EIO;
+        goto shutdown;
     }
 
-    if (rados_connect(cluster) < 0) {
-        error_setg(errp, "error connecting");
-        rados_shutdown(cluster);
-        return -EIO;
+    if (qemu_rbd_set_auth(cluster, secretid, errp) < 0) {
+        ret = -EIO;
+        goto shutdown;
     }
 
-    if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) {
-        error_setg(errp, "error opening pool %s", pool);
-        rados_shutdown(cluster);
-        return -EIO;
+    ret = rados_connect(cluster);
+    if (ret < 0) {
+        error_setg_errno(errp, -ret, "error connecting");
+        goto shutdown;
+    }
+
+    ret = rados_ioctx_create(cluster, pool, &io_ctx);
+    if (ret < 0) {
+        error_setg_errno(errp, -ret, "error opening pool %s", pool);
+        goto shutdown;
     }
 
     ret = rbd_create(io_ctx, name, bytes, &obj_order);
+    if (ret < 0) {
+        error_setg_errno(errp, -ret, "error rbd create");
+    }
+
     rados_ioctx_destroy(io_ctx);
+
+shutdown:
     rados_shutdown(cluster);
 
+exit:
+    QDECREF(options);
     return ret;
 }
 
@@ -385,11 +462,11 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
         }
     } else {
         if (r < 0) {
-            memset(rcb->buf, 0, rcb->size);
+            qemu_rbd_memset(rcb, 0);
             acb->ret = r;
             acb->error = 1;
         } else if (r < rcb->size) {
-            memset(rcb->buf + r, 0, rcb->size - r);
+            qemu_rbd_memset(rcb, r);
             if (!acb->error) {
                 acb->ret = rcb->size;
             }
@@ -400,72 +477,130 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
 
     g_free(rcb);
 
-    if (acb->cmd == RBD_AIO_READ) {
-        qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
+    if (!LIBRBD_USE_IOVEC) {
+        if (acb->cmd == RBD_AIO_READ) {
+            qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
+        }
+        qemu_vfree(acb->bounce);
     }
-    qemu_vfree(acb->bounce);
+
     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
-    acb->status = 0;
 
     qemu_aio_unref(acb);
 }
 
-/* TODO Convert to fine grained options */
-static QemuOptsList runtime_opts = {
-    .name = "rbd",
-    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
-    .desc = {
-        {
-            .name = "filename",
-            .type = QEMU_OPT_STRING,
-            .help = "Specification of the rbd image",
-        },
-        { /* end of list */ }
-    },
-};
+static char *qemu_rbd_mon_host(QDict *options, Error **errp)
+{
+    const char **vals = g_new(const char *, qdict_size(options) + 1);
+    char keybuf[32];
+    const char *host, *port;
+    char *rados_str;
+    int i;
+
+    for (i = 0;; i++) {
+        sprintf(keybuf, "server.%d.host", i);
+        host = qdict_get_try_str(options, keybuf);
+        qdict_del(options, keybuf);
+        sprintf(keybuf, "server.%d.port", i);
+        port = qdict_get_try_str(options, keybuf);
+        qdict_del(options, keybuf);
+        if (!host && !port) {
+            break;
+        }
+        if (!host) {
+            error_setg(errp, "Parameter server.%d.host is missing", i);
+            rados_str = NULL;
+            goto out;
+        }
+
+        if (strchr(host, ':')) {
+            vals[i] = port ? g_strdup_printf("[%s]:%s", host, port)
+                : g_strdup_printf("[%s]", host);
+        } else {
+            vals[i] = port ? g_strdup_printf("%s:%s", host, port)
+                : g_strdup(host);
+        }
+    }
+    vals[i] = NULL;
+
+    rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
+out:
+    g_strfreev((char **)vals);
+    return rados_str;
+}
 
 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
                          Error **errp)
 {
     BDRVRBDState *s = bs->opaque;
-    char pool[RBD_MAX_POOL_NAME_SIZE];
-    char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
-    char conf[RBD_MAX_CONF_SIZE];
-    char clientname_buf[RBD_MAX_CONF_SIZE];
-    char *clientname;
+    const char *pool, *snap, *conf, *clientname, *name, *keypairs;
+    const char *secretid;
     QemuOpts *opts;
     Error *local_err = NULL;
-    const char *filename;
+    char *mon_host = NULL;
     int r;
 
     opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
     qemu_opts_absorb_qdict(opts, options, &local_err);
     if (local_err) {
         error_propagate(errp, local_err);
-        qemu_opts_del(opts);
-        return -EINVAL;
+        r = -EINVAL;
+        goto failed_opts;
+    }
+
+    mon_host = qemu_rbd_mon_host(options, &local_err);
+    if (local_err) {
+        error_propagate(errp, local_err);
+        r = -EINVAL;
+        goto failed_opts;
     }
 
-    filename = qemu_opt_get(opts, "filename");
+    secretid = qemu_opt_get(opts, "password-secret");
+
+    pool           = qemu_opt_get(opts, "pool");
+    conf           = qemu_opt_get(opts, "conf");
+    snap           = qemu_opt_get(opts, "snapshot");
+    clientname     = qemu_opt_get(opts, "user");
+    name           = qemu_opt_get(opts, "image");
+    keypairs       = qemu_opt_get(opts, "=keyvalue-pairs");
 
-    if (qemu_rbd_parsename(filename, pool, sizeof(pool),
-                           snap_buf, sizeof(snap_buf),
-                           s->name, sizeof(s->name),
-                           conf, sizeof(conf), errp) < 0) {
+    if (!pool || !name) {
+        error_setg(errp, "Parameters 'pool' and 'image' are required");
         r = -EINVAL;
         goto failed_opts;
     }
 
-    clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
     r = rados_create(&s->cluster, clientname);
     if (r < 0) {
-        error_setg(&local_err, "error initializing");
+        error_setg_errno(errp, -r, "error initializing");
         goto failed_opts;
     }
 
-    s->snap = NULL;
-    if (snap_buf[0] != '\0') {
-        s->snap = g_strdup(snap_buf);
+    s->snap = g_strdup(snap);
+    s->name = g_strdup(name);
+
+    /* try default location when conf=NULL, but ignore failure */
+    r = rados_conf_read_file(s->cluster, conf);
+    if (conf && r < 0) {
+        error_setg_errno(errp, -r, "error reading conf file %s", conf);
+        goto failed_shutdown;
+    }
+
+    r = qemu_rbd_set_keypairs(s->cluster, keypairs, errp);
+    if (r < 0) {
+        goto failed_shutdown;
+    }
+
+    if (mon_host) {
+        r = rados_conf_set(s->cluster, "mon_host", mon_host);
+        if (r < 0) {
+            goto failed_shutdown;
+        }
+    }
+
+    if (qemu_rbd_set_auth(s->cluster, secretid, errp) < 0) {
+        r = -EIO;
+        goto failed_shutdown;
     }
 
     /*
@@ -481,33 +616,21 @@ static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
         rados_conf_set(s->cluster, "rbd_cache", "true");
     }
 
-    if (strstr(conf, "conf=") == NULL) {
-        /* try default location, but ignore failure */
-        rados_conf_read_file(s->cluster, NULL);
-    }
-
-    if (conf[0] != '\0') {
-        r = qemu_rbd_set_conf(s->cluster, conf, errp);
-        if (r < 0) {
-            goto failed_shutdown;
-        }
-    }
-
     r = rados_connect(s->cluster);
     if (r < 0) {
-        error_setg(&local_err, "error connecting");
+        error_setg_errno(errp, -r, "error connecting");
         goto failed_shutdown;
     }
 
     r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);
     if (r < 0) {
-        error_setg(&local_err, "error opening pool %s", pool);
+        error_setg_errno(errp, -r, "error opening pool %s", pool);
         goto failed_shutdown;
     }
 
     r = rbd_open(s->io_ctx, s->name, &s->image, s->snap);
     if (r < 0) {
-        error_setg(&local_err, "error reading header from %s", s->name);
+        error_setg_errno(errp, -r, "error reading header from %s", s->name);
         goto failed_open;
     }
 
@@ -521,8 +644,10 @@ failed_open:
 failed_shutdown:
     rados_shutdown(s->cluster);
     g_free(s->snap);
+    g_free(s->name);
 failed_opts:
     qemu_opts_del(opts);
+    g_free(mon_host);
     return r;
 }
 
@@ -533,6 +658,7 @@ static void qemu_rbd_close(BlockDriverState *bs)
     rbd_close(s->image);
     rados_ioctx_destroy(s->io_ctx);
     g_free(s->snap);
+    g_free(s->name);
     rados_shutdown(s->cluster);
 }
 
@@ -543,7 +669,6 @@ static const AIOCBInfo rbd_aiocb_info = {
 static void rbd_finish_bh(void *opaque)
 {
     RADOSCB *rcb = opaque;
-    qemu_bh_delete(rcb->acb->bh);
     qemu_rbd_complete_aio(rcb);
 }
 
@@ -562,9 +687,8 @@ static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
     rcb->ret = rbd_aio_get_return_value(c);
     rbd_aio_release(c);
 
-    acb->bh = aio_bh_new(bdrv_get_aio_context(acb->common.bs),
-                         rbd_finish_bh, rcb);
-    qemu_bh_schedule(acb->bh);
+    aio_bh_schedule_oneshot(bdrv_get_aio_context(acb->common.bs),
+                            rbd_finish_bh, rcb);
 }
 
 static int rbd_aio_discard_wrapper(rbd_image_t image,
@@ -590,9 +714,9 @@ static int rbd_aio_flush_wrapper(rbd_image_t image,
 }
 
 static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
-                                 int64_t sector_num,
+                                 int64_t off,
                                  QEMUIOVector *qiov,
-                                 int nb_sectors,
+                                 int64_t size,
                                  BlockCompletionFunc *cb,
                                  void *opaque,
                                  RBDAIOCmd cmd)
@@ -600,8 +724,6 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
     RBDAIOCB *acb;
     RADOSCB *rcb = NULL;
     rbd_completion_t c;
-    int64_t off, size;
-    char *buf;
     int r;
 
     BDRVRBDState *s = bs->opaque;
@@ -609,33 +731,30 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
     acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
     acb->cmd = cmd;
     acb->qiov = qiov;
-    if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
-        acb->bounce = NULL;
-    } else {
-        acb->bounce = qemu_try_blockalign(bs, qiov->size);
-        if (acb->bounce == NULL) {
-            goto failed;
+    assert(!qiov || qiov->size == size);
+
+    rcb = g_new(RADOSCB, 1);
+
+    if (!LIBRBD_USE_IOVEC) {
+        if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
+            acb->bounce = NULL;
+        } else {
+            acb->bounce = qemu_try_blockalign(bs, qiov->size);
+            if (acb->bounce == NULL) {
+                goto failed;
+            }
+        }
+        if (cmd == RBD_AIO_WRITE) {
+            qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
         }
+        rcb->buf = acb->bounce;
     }
+
     acb->ret = 0;
     acb->error = 0;
     acb->s = s;
-    acb->bh = NULL;
-    acb->status = -EINPROGRESS;
 
-    if (cmd == RBD_AIO_WRITE) {
-        qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
-    }
-
-    buf = acb->bounce;
-
-    off = sector_num * BDRV_SECTOR_SIZE;
-    size = nb_sectors * BDRV_SECTOR_SIZE;
-
-    rcb = g_new(RADOSCB, 1);
-    rcb->done = 0;
     rcb->acb = acb;
-    rcb->buf = buf;
     rcb->s = acb->s;
     rcb->size = size;
     r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
@@ -645,10 +764,18 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 
     switch (cmd) {
     case RBD_AIO_WRITE:
-        r = rbd_aio_write(s->image, off, size, buf, c);
+#ifdef LIBRBD_SUPPORTS_IOVEC
+            r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
+#else
+            r = rbd_aio_write(s->image, off, size, rcb->buf, c);
+#endif
         break;
     case RBD_AIO_READ:
-        r = rbd_aio_read(s->image, off, size, buf, c);
+#ifdef LIBRBD_SUPPORTS_IOVEC
+            r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
+#else
+            r = rbd_aio_read(s->image, off, size, rcb->buf, c);
+#endif
         break;
     case RBD_AIO_DISCARD:
         r = rbd_aio_discard_wrapper(s->image, off, size, c);
@@ -663,14 +790,16 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
     if (r < 0) {
         goto failed_completion;
     }
-
     return &acb->common;
 
 failed_completion:
     rbd_aio_release(c);
 failed:
     g_free(rcb);
-    qemu_vfree(acb->bounce);
+    if (!LIBRBD_USE_IOVEC) {
+        qemu_vfree(acb->bounce);
+    }
+
     qemu_aio_unref(acb);
     return NULL;
 }
@@ -682,7 +811,8 @@ static BlockAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
                                       BlockCompletionFunc *cb,
                                       void *opaque)
 {
-    return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
+    return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov,
+                         (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque,
                          RBD_AIO_READ);
 }
 
@@ -693,7 +823,8 @@ static BlockAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
                                        BlockCompletionFunc *cb,
                                        void *opaque)
 {
-    return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
+    return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov,
+                         (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque,
                          RBD_AIO_WRITE);
 }
 
@@ -826,10 +957,8 @@ static int qemu_rbd_snap_rollback(BlockDriverState *bs,
                                   const char *snapshot_name)
 {
     BDRVRBDState *s = bs->opaque;
-    int r;
 
-    r = rbd_snap_rollback(s->image, snapshot_name);
-    return r;
+    return rbd_snap_rollback(s->image, snapshot_name);
 }
 
 static int qemu_rbd_snap_list(BlockDriverState *bs,
@@ -876,17 +1005,29 @@ static int qemu_rbd_snap_list(BlockDriverState *bs,
 }
 
 #ifdef LIBRBD_SUPPORTS_DISCARD
-static BlockAIOCB* qemu_rbd_aio_discard(BlockDriverState *bs,
-                                        int64_t sector_num,
-                                        int nb_sectors,
-                                        BlockCompletionFunc *cb,
-                                        void *opaque)
+static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs,
+                                         int64_t offset,
+                                         int count,
+                                         BlockCompletionFunc *cb,
+                                         void *opaque)
 {
-    return rbd_start_aio(bs, sector_num, NULL, nb_sectors, cb, opaque,
+    return rbd_start_aio(bs, offset, NULL, count, cb, opaque,
                          RBD_AIO_DISCARD);
 }
 #endif
 
+#ifdef LIBRBD_SUPPORTS_INVALIDATE
+static void qemu_rbd_invalidate_cache(BlockDriverState *bs,
+                                      Error **errp)
+{
+    BDRVRBDState *s = bs->opaque;
+    int r = rbd_invalidate_cache(s->image);
+    if (r < 0) {
+        error_setg_errno(errp, -r, "Failed to invalidate the cache");
+    }
+}
+#endif
+
 static QemuOptsList qemu_rbd_create_opts = {
     .name = "rbd-create-opts",
     .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
@@ -901,23 +1042,28 @@ static QemuOptsList qemu_rbd_create_opts = {
             .type = QEMU_OPT_SIZE,
             .help = "RBD object size"
         },
+        {
+            .name = "password-secret",
+            .type = QEMU_OPT_STRING,
+            .help = "ID of secret providing the password",
+        },
         { /* end of list */ }
     }
 };
 
 static BlockDriver bdrv_rbd = {
-    .format_name        = "rbd",
-    .instance_size      = sizeof(BDRVRBDState),
-    .bdrv_needs_filename = true,
-    .bdrv_file_open     = qemu_rbd_open,
-    .bdrv_close         = qemu_rbd_close,
-    .bdrv_create        = qemu_rbd_create,
-    .bdrv_has_zero_init = bdrv_has_zero_init_1,
-    .bdrv_get_info      = qemu_rbd_getinfo,
-    .create_opts        = &qemu_rbd_create_opts,
-    .bdrv_getlength     = qemu_rbd_getlength,
-    .bdrv_truncate      = qemu_rbd_truncate,
-    .protocol_name      = "rbd",
+    .format_name            = "rbd",
+    .instance_size          = sizeof(BDRVRBDState),
+    .bdrv_parse_filename    = qemu_rbd_parse_filename,
+    .bdrv_file_open         = qemu_rbd_open,
+    .bdrv_close             = qemu_rbd_close,
+    .bdrv_create            = qemu_rbd_create,
+    .bdrv_has_zero_init     = bdrv_has_zero_init_1,
+    .bdrv_get_info          = qemu_rbd_getinfo,
+    .create_opts            = &qemu_rbd_create_opts,
+    .bdrv_getlength         = qemu_rbd_getlength,
+    .bdrv_truncate          = qemu_rbd_truncate,
+    .protocol_name          = "rbd",
 
     .bdrv_aio_readv         = qemu_rbd_aio_readv,
     .bdrv_aio_writev        = qemu_rbd_aio_writev,
@@ -929,13 +1075,16 @@ static BlockDriver bdrv_rbd = {
 #endif
 
 #ifdef LIBRBD_SUPPORTS_DISCARD
-    .bdrv_aio_discard       = qemu_rbd_aio_discard,
+    .bdrv_aio_pdiscard      = qemu_rbd_aio_pdiscard,
 #endif
 
     .bdrv_snapshot_create   = qemu_rbd_snap_create,
     .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
     .bdrv_snapshot_list     = qemu_rbd_snap_list,
     .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
+#ifdef LIBRBD_SUPPORTS_INVALIDATE
+    .bdrv_invalidate_cache  = qemu_rbd_invalidate_cache,
+#endif
 };
 
 static void bdrv_rbd_init(void)