git.proxmox.com Git - mirror_qemu.git/blobdiff - block/rbd.c
Merge remote-tracking branch 'remotes/mst/tags/for_upstream' into staging
[mirror_qemu.git] / block / rbd.c
index dbc79f45251c863f956ec05c28bd84d79a42a431..a60a19d58d01b7f37600a36bb2dcbe4ca93f5da3 100644 (file)
@@ -68,32 +68,24 @@ typedef enum {
 } RBDAIOCmd;
 
 typedef struct RBDAIOCB {
-    BlockDriverAIOCB common;
+    BlockAIOCB common;
     QEMUBH *bh;
     int64_t ret;
     QEMUIOVector *qiov;
     char *bounce;
     RBDAIOCmd cmd;
-    int64_t sector_num;
     int error;
     struct BDRVRBDState *s;
-    int cancelled;
-    int status;
 } RBDAIOCB;
 
 typedef struct RADOSCB {
-    int rcbid;
     RBDAIOCB *acb;
     struct BDRVRBDState *s;
-    int done;
     int64_t size;
     char *buf;
     int64_t ret;
 } RADOSCB;
 
-#define RBD_FD_READ 0
-#define RBD_FD_WRITE 1
-
 typedef struct BDRVRBDState {
     rados_t cluster;
     rados_ioctx_t io_ctx;
@@ -105,7 +97,7 @@ typedef struct BDRVRBDState {
 static int qemu_rbd_next_tok(char *dst, int dst_len,
                              char *src, char delim,
                              const char *name,
-                             char **p)
+                             char **p, Error **errp)
 {
     int l;
     char *end;
@@ -128,10 +120,10 @@ static int qemu_rbd_next_tok(char *dst, int dst_len,
     }
     l = strlen(src);
     if (l >= dst_len) {
-        error_report("%s too long", name);
+        error_setg(errp, "%s too long", name);
         return -EINVAL;
     } else if (l == 0) {
-        error_report("%s too short", name);
+        error_setg(errp, "%s too short", name);
         return -EINVAL;
     }
 
@@ -157,13 +149,15 @@ static int qemu_rbd_parsename(const char *filename,
                               char *pool, int pool_len,
                               char *snap, int snap_len,
                               char *name, int name_len,
-                              char *conf, int conf_len)
+                              char *conf, int conf_len,
+                              Error **errp)
 {
     const char *start;
     char *p, *buf;
     int ret;
 
     if (!strstart(filename, "rbd:", &start)) {
+        error_setg(errp, "File name must start with 'rbd:'");
         return -EINVAL;
     }
 
@@ -172,7 +166,8 @@ static int qemu_rbd_parsename(const char *filename,
     *snap = '\0';
     *conf = '\0';
 
-    ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p);
+    ret = qemu_rbd_next_tok(pool, pool_len, p,
+                            '/', "pool name", &p, errp);
     if (ret < 0 || !p) {
         ret = -EINVAL;
         goto done;
@@ -180,21 +175,25 @@ static int qemu_rbd_parsename(const char *filename,
     qemu_rbd_unescape(pool);
 
     if (strchr(p, '@')) {
-        ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p);
+        ret = qemu_rbd_next_tok(name, name_len, p,
+                                '@', "object name", &p, errp);
         if (ret < 0) {
             goto done;
         }
-        ret = qemu_rbd_next_tok(snap, snap_len, p, ':', "snap name", &p);
+        ret = qemu_rbd_next_tok(snap, snap_len, p,
+                                ':', "snap name", &p, errp);
         qemu_rbd_unescape(snap);
     } else {
-        ret = qemu_rbd_next_tok(name, name_len, p, ':', "object name", &p);
+        ret = qemu_rbd_next_tok(name, name_len, p,
+                                ':', "object name", &p, errp);
     }
     qemu_rbd_unescape(name);
     if (ret < 0 || !p) {
         goto done;
     }
 
-    ret = qemu_rbd_next_tok(conf, conf_len, p, '\0', "configuration", &p);
+    ret = qemu_rbd_next_tok(conf, conf_len, p,
+                            '\0', "configuration", &p, errp);
 
 done:
     g_free(buf);
@@ -229,7 +228,9 @@ static char *qemu_rbd_parse_clientname(const char *conf, char *clientname)
     return NULL;
 }
 
-static int qemu_rbd_set_conf(rados_t cluster, const char *conf)
+static int qemu_rbd_set_conf(rados_t cluster, const char *conf,
+                             bool only_read_conf_file,
+                             Error **errp)
 {
     char *p, *buf;
     char name[RBD_MAX_CONF_NAME_SIZE];
@@ -241,37 +242,41 @@ static int qemu_rbd_set_conf(rados_t cluster, const char *conf)
 
     while (p) {
         ret = qemu_rbd_next_tok(name, sizeof(name), p,
-                                '=', "conf option name", &p);
+                                '=', "conf option name", &p, errp);
         if (ret < 0) {
             break;
         }
         qemu_rbd_unescape(name);
 
         if (!p) {
-            error_report("conf option %s has no value", name);
+            error_setg(errp, "conf option %s has no value", name);
             ret = -EINVAL;
             break;
         }
 
         ret = qemu_rbd_next_tok(value, sizeof(value), p,
-                                ':', "conf option value", &p);
+                                ':', "conf option value", &p, errp);
         if (ret < 0) {
             break;
         }
         qemu_rbd_unescape(value);
 
         if (strcmp(name, "conf") == 0) {
-            ret = rados_conf_read_file(cluster, value);
-            if (ret < 0) {
-                error_report("error reading conf file %s", value);
-                break;
+            /* read the conf file alone, so it doesn't override more
+               specific settings for a particular device */
+            if (only_read_conf_file) {
+                ret = rados_conf_read_file(cluster, value);
+                if (ret < 0) {
+                    error_setg(errp, "error reading conf file %s", value);
+                    break;
+                }
             }
         } else if (strcmp(name, "id") == 0) {
             /* ignore, this is parsed by qemu_rbd_parse_clientname() */
-        } else {
+        } else if (!only_read_conf_file) {
             ret = rados_conf_set(cluster, name, value);
             if (ret < 0) {
-                error_report("invalid conf option %s", name);
+                error_setg(errp, "invalid conf option %s", name);
                 ret = -EINVAL;
                 break;
             }
@@ -282,9 +287,9 @@ static int qemu_rbd_set_conf(rados_t cluster, const char *conf)
     return ret;
 }
 
-static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options,
-                           Error **errp)
+static int qemu_rbd_create(const char *filename, QemuOpts *opts, Error **errp)
 {
+    Error *local_err = NULL;
     int64_t bytes = 0;
     int64_t objsize;
     int obj_order = 0;
@@ -301,57 +306,58 @@ static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options,
     if (qemu_rbd_parsename(filename, pool, sizeof(pool),
                            snap_buf, sizeof(snap_buf),
                            name, sizeof(name),
-                           conf, sizeof(conf)) < 0) {
+                           conf, sizeof(conf), &local_err) < 0) {
+        error_propagate(errp, local_err);
         return -EINVAL;
     }
 
     /* Read out options */
-    while (options && options->name) {
-        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
-            bytes = options->value.n;
-        } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
-            if (options->value.n) {
-                objsize = options->value.n;
-                if ((objsize - 1) & objsize) {    /* not a power of 2? */
-                    error_report("obj size needs to be power of 2");
-                    return -EINVAL;
-                }
-                if (objsize < 4096) {
-                    error_report("obj size too small");
-                    return -EINVAL;
-                }
-                obj_order = ffs(objsize) - 1;
-            }
+    bytes = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
+                     BDRV_SECTOR_SIZE);
+    objsize = qemu_opt_get_size_del(opts, BLOCK_OPT_CLUSTER_SIZE, 0);
+    if (objsize) {
+        if ((objsize - 1) & objsize) {    /* not a power of 2? */
+            error_setg(errp, "obj size needs to be power of 2");
+            return -EINVAL;
+        }
+        if (objsize < 4096) {
+            error_setg(errp, "obj size too small");
+            return -EINVAL;
         }
-        options++;
+        obj_order = ctz32(objsize);
     }
 
     clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
     if (rados_create(&cluster, clientname) < 0) {
-        error_report("error initializing");
+        error_setg(errp, "error initializing");
         return -EIO;
     }
 
     if (strstr(conf, "conf=") == NULL) {
         /* try default location, but ignore failure */
         rados_conf_read_file(cluster, NULL);
+    } else if (conf[0] != '\0' &&
+               qemu_rbd_set_conf(cluster, conf, true, &local_err) < 0) {
+        rados_shutdown(cluster);
+        error_propagate(errp, local_err);
+        return -EIO;
     }
 
     if (conf[0] != '\0' &&
-        qemu_rbd_set_conf(cluster, conf) < 0) {
-        error_report("error setting config options");
+        qemu_rbd_set_conf(cluster, conf, false, &local_err) < 0) {
         rados_shutdown(cluster);
+        error_propagate(errp, local_err);
         return -EIO;
     }
 
     if (rados_connect(cluster) < 0) {
-        error_report("error connecting");
+        error_setg(errp, "error connecting");
         rados_shutdown(cluster);
         return -EIO;
     }
 
     if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) {
-        error_report("error opening pool %s", pool);
+        error_setg(errp, "error opening pool %s", pool);
         rados_shutdown(cluster);
         return -EIO;
     }
@@ -403,11 +409,8 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
     }
     qemu_vfree(acb->bounce);
     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
-    acb->status = 0;
 
-    if (!acb->cancelled) {
-        qemu_aio_release(acb);
-    }
+    qemu_aio_unref(acb);
 }
 
 /* TODO Convert to fine grained options */
@@ -441,8 +444,7 @@ static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
     opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
     qemu_opts_absorb_qdict(opts, options, &local_err);
     if (local_err) {
-        qerror_report_err(local_err);
-        error_free(local_err);
+        error_propagate(errp, local_err);
         qemu_opts_del(opts);
         return -EINVAL;
     }
@@ -452,7 +454,7 @@ static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
     if (qemu_rbd_parsename(filename, pool, sizeof(pool),
                            snap_buf, sizeof(snap_buf),
                            s->name, sizeof(s->name),
-                           conf, sizeof(conf)) < 0) {
+                           conf, sizeof(conf), errp) < 0) {
         r = -EINVAL;
         goto failed_opts;
     }
@@ -460,7 +462,7 @@ static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
     clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
     r = rados_create(&s->cluster, clientname);
     if (r < 0) {
-        error_report("error initializing");
+        error_setg(errp, "error initializing");
         goto failed_opts;
     }
 
@@ -469,6 +471,23 @@ static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
         s->snap = g_strdup(snap_buf);
     }
 
+    if (strstr(conf, "conf=") == NULL) {
+        /* try default location, but ignore failure */
+        rados_conf_read_file(s->cluster, NULL);
+    } else if (conf[0] != '\0') {
+        r = qemu_rbd_set_conf(s->cluster, conf, true, errp);
+        if (r < 0) {
+            goto failed_shutdown;
+        }
+    }
+
+    if (conf[0] != '\0') {
+        r = qemu_rbd_set_conf(s->cluster, conf, false, errp);
+        if (r < 0) {
+            goto failed_shutdown;
+        }
+    }
+
     /*
      * Fallback to more conservative semantics if setting cache
      * options fails. Ignore errors from setting rbd_cache because the
@@ -482,34 +501,21 @@ static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
         rados_conf_set(s->cluster, "rbd_cache", "true");
     }
 
-    if (strstr(conf, "conf=") == NULL) {
-        /* try default location, but ignore failure */
-        rados_conf_read_file(s->cluster, NULL);
-    }
-
-    if (conf[0] != '\0') {
-        r = qemu_rbd_set_conf(s->cluster, conf);
-        if (r < 0) {
-            error_report("error setting config options");
-            goto failed_shutdown;
-        }
-    }
-
     r = rados_connect(s->cluster);
     if (r < 0) {
-        error_report("error connecting");
+        error_setg(errp, "error connecting");
         goto failed_shutdown;
     }
 
     r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);
     if (r < 0) {
-        error_report("error opening pool %s", pool);
+        error_setg(errp, "error opening pool %s", pool);
         goto failed_shutdown;
     }
 
     r = rbd_open(s->io_ctx, s->name, &s->image, s->snap);
     if (r < 0) {
-        error_report("error reading header from %s", s->name);
+        error_setg(errp, "error reading header from %s", s->name);
         goto failed_open;
     }
 
@@ -538,25 +544,8 @@ static void qemu_rbd_close(BlockDriverState *bs)
     rados_shutdown(s->cluster);
 }
 
-/*
- * Cancel aio. Since we don't reference acb in a non qemu threads,
- * it is safe to access it here.
- */
-static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb)
-{
-    RBDAIOCB *acb = (RBDAIOCB *) blockacb;
-    acb->cancelled = 1;
-
-    while (acb->status == -EINPROGRESS) {
-        qemu_aio_wait();
-    }
-
-    qemu_aio_release(acb);
-}
-
 static const AIOCBInfo rbd_aiocb_info = {
     .aiocb_size = sizeof(RBDAIOCB),
-    .cancel = qemu_rbd_aio_cancel,
 };
 
 static void rbd_finish_bh(void *opaque)
@@ -581,7 +570,8 @@ static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
     rcb->ret = rbd_aio_get_return_value(c);
     rbd_aio_release(c);
 
-    acb->bh = qemu_bh_new(rbd_finish_bh, rcb);
+    acb->bh = aio_bh_new(bdrv_get_aio_context(acb->common.bs),
+                         rbd_finish_bh, rcb);
     qemu_bh_schedule(acb->bh);
 }
 
@@ -607,16 +597,16 @@ static int rbd_aio_flush_wrapper(rbd_image_t image,
 #endif
 }
 
-static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
-                                       int64_t sector_num,
-                                       QEMUIOVector *qiov,
-                                       int nb_sectors,
-                                       BlockDriverCompletionFunc *cb,
-                                       void *opaque,
-                                       RBDAIOCmd cmd)
+static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
+                                 int64_t sector_num,
+                                 QEMUIOVector *qiov,
+                                 int nb_sectors,
+                                 BlockCompletionFunc *cb,
+                                 void *opaque,
+                                 RBDAIOCmd cmd)
 {
     RBDAIOCB *acb;
-    RADOSCB *rcb;
+    RADOSCB *rcb = NULL;
     rbd_completion_t c;
     int64_t off, size;
     char *buf;
@@ -630,14 +620,15 @@ static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
     if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
         acb->bounce = NULL;
     } else {
-        acb->bounce = qemu_blockalign(bs, qiov->size);
+        acb->bounce = qemu_try_blockalign(bs, qiov->size);
+        if (acb->bounce == NULL) {
+            goto failed;
+        }
     }
     acb->ret = 0;
     acb->error = 0;
     acb->s = s;
-    acb->cancelled = 0;
     acb->bh = NULL;
-    acb->status = -EINPROGRESS;
 
     if (cmd == RBD_AIO_WRITE) {
         qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
@@ -648,8 +639,7 @@ static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
     off = sector_num * BDRV_SECTOR_SIZE;
     size = nb_sectors * BDRV_SECTOR_SIZE;
 
-    rcb = g_malloc(sizeof(RADOSCB));
-    rcb->done = 0;
+    rcb = g_new(RADOSCB, 1);
     rcb->acb = acb;
     rcb->buf = buf;
     rcb->s = acb->s;
@@ -677,43 +667,46 @@ static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
     }
 
     if (r < 0) {
-        goto failed;
+        goto failed_completion;
     }
 
     return &acb->common;
 
+failed_completion:
+    rbd_aio_release(c);
 failed:
     g_free(rcb);
-    qemu_aio_release(acb);
+    qemu_vfree(acb->bounce);
+    qemu_aio_unref(acb);
     return NULL;
 }
 
-static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
-                                            int64_t sector_num,
-                                            QEMUIOVector *qiov,
-                                            int nb_sectors,
-                                            BlockDriverCompletionFunc *cb,
-                                            void *opaque)
+static BlockAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
+                                      int64_t sector_num,
+                                      QEMUIOVector *qiov,
+                                      int nb_sectors,
+                                      BlockCompletionFunc *cb,
+                                      void *opaque)
 {
     return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
                          RBD_AIO_READ);
 }
 
-static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
-                                             int64_t sector_num,
-                                             QEMUIOVector *qiov,
-                                             int nb_sectors,
-                                             BlockDriverCompletionFunc *cb,
-                                             void *opaque)
+static BlockAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
+                                       int64_t sector_num,
+                                       QEMUIOVector *qiov,
+                                       int nb_sectors,
+                                       BlockCompletionFunc *cb,
+                                       void *opaque)
 {
     return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
                          RBD_AIO_WRITE);
 }
 
 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
-static BlockDriverAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
-                                            BlockDriverCompletionFunc *cb,
-                                            void *opaque)
+static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
+                                      BlockCompletionFunc *cb,
+                                      void *opaque)
 {
     return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
 }
@@ -855,7 +848,7 @@ static int qemu_rbd_snap_list(BlockDriverState *bs,
     int max_snaps = RBD_MAX_SNAPS;
 
     do {
-        snaps = g_malloc(sizeof(*snaps) * max_snaps);
+        snaps = g_new(rbd_snap_info_t, max_snaps);
         snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
         if (snap_count <= 0) {
             g_free(snaps);
@@ -866,7 +859,7 @@ static int qemu_rbd_snap_list(BlockDriverState *bs,
         goto done;
     }
 
-    sn_tab = g_malloc0(snap_count * sizeof(QEMUSnapshotInfo));
+    sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
 
     for (i = 0; i < snap_count; i++) {
         const char *snap_name = snaps[i].name;
@@ -889,29 +882,45 @@ static int qemu_rbd_snap_list(BlockDriverState *bs,
 }
 
 #ifdef LIBRBD_SUPPORTS_DISCARD
-static BlockDriverAIOCB* qemu_rbd_aio_discard(BlockDriverState *bs,
-                                              int64_t sector_num,
-                                              int nb_sectors,
-                                              BlockDriverCompletionFunc *cb,
-                                              void *opaque)
+static BlockAIOCB* qemu_rbd_aio_discard(BlockDriverState *bs,
+                                        int64_t sector_num,
+                                        int nb_sectors,
+                                        BlockCompletionFunc *cb,
+                                        void *opaque)
 {
     return rbd_start_aio(bs, sector_num, NULL, nb_sectors, cb, opaque,
                          RBD_AIO_DISCARD);
 }
 #endif
 
-static QEMUOptionParameter qemu_rbd_create_options[] = {
-    {
-     .name = BLOCK_OPT_SIZE,
-     .type = OPT_SIZE,
-     .help = "Virtual disk size"
-    },
-    {
-     .name = BLOCK_OPT_CLUSTER_SIZE,
-     .type = OPT_SIZE,
-     .help = "RBD object size"
-    },
-    {NULL}
+#ifdef LIBRBD_SUPPORTS_INVALIDATE
+static void qemu_rbd_invalidate_cache(BlockDriverState *bs,
+                                      Error **errp)
+{
+    BDRVRBDState *s = bs->opaque;
+    int r = rbd_invalidate_cache(s->image);
+    if (r < 0) {
+        error_setg_errno(errp, -r, "Failed to invalidate the cache");
+    }
+}
+#endif
+
+static QemuOptsList qemu_rbd_create_opts = {
+    .name = "rbd-create-opts",
+    .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
+    .desc = {
+        {
+            .name = BLOCK_OPT_SIZE,
+            .type = QEMU_OPT_SIZE,
+            .help = "Virtual disk size"
+        },
+        {
+            .name = BLOCK_OPT_CLUSTER_SIZE,
+            .type = QEMU_OPT_SIZE,
+            .help = "RBD object size"
+        },
+        { /* end of list */ }
+    }
 };
 
 static BlockDriver bdrv_rbd = {
@@ -923,7 +932,7 @@ static BlockDriver bdrv_rbd = {
     .bdrv_create        = qemu_rbd_create,
     .bdrv_has_zero_init = bdrv_has_zero_init_1,
     .bdrv_get_info      = qemu_rbd_getinfo,
-    .create_options     = qemu_rbd_create_options,
+    .create_opts        = &qemu_rbd_create_opts,
     .bdrv_getlength     = qemu_rbd_getlength,
     .bdrv_truncate      = qemu_rbd_truncate,
     .protocol_name      = "rbd",
@@ -945,6 +954,9 @@ static BlockDriver bdrv_rbd = {
     .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
     .bdrv_snapshot_list     = qemu_rbd_snap_list,
     .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
+#ifdef LIBRBD_SUPPORTS_INVALIDATE
+    .bdrv_invalidate_cache  = qemu_rbd_invalidate_cache,
+#endif
 };
 
 static void bdrv_rbd_init(void)