]> git.proxmox.com Git - qemu.git/blobdiff - block/rbd.c
Merge remote-tracking branch 'afaerber/qom-cpu' into staging
[qemu.git] / block / rbd.c
index 1b78d51398b6d3ad9d7c79ec62fed75c16f7207c..8cd10a7b59421cb778c9ac57a009f466f16688b9 100644 (file)
@@ -7,43 +7,50 @@
  * This work is licensed under the terms of the GNU GPL, version 2.  See
  * the COPYING file in the top-level directory.
  *
+ * Contributions after 2012-01-13 are licensed under the terms of the
+ * GNU GPL, version 2 or (at your option) any later version.
  */
 
 #include <inttypes.h>
 
 #include "qemu-common.h"
-#include "qemu-error.h"
-
-#include "block_int.h"
+#include "qemu/error-report.h"
+#include "block/block_int.h"
 
 #include <rbd/librbd.h>
 
-
-
 /*
  * When specifying the image filename use:
  *
  * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
  *
- * poolname must be the name of an existing rados pool
+ * poolname must be the name of an existing rados pool.
  *
- * devicename is the basename for all objects used to
- * emulate the raw device.
+ * devicename is the name of the rbd image.
  *
- * Each option given is used to configure rados, and may be
- * any Ceph option, or "conf". The "conf" option specifies
- * a Ceph configuration file to read.
+ * Each option given is used to configure rados, and may be any valid
+ * Ceph option, "id", or "conf".
  *
- * Metadata information (image size, ...) is stored in an
- * object with the name "devicename.rbd".
+ * The "id" option indicates what user we should authenticate as to
+ * the Ceph cluster.  If it is excluded we will use the Ceph default
+ * (normally 'admin').
  *
- * The raw device is split into 4MB sized objects by default.
- * The sequencenumber is encoded in a 12 byte long hex-string,
- * and is attached to the devicename, separated by a dot.
- * e.g. "devicename.1234567890ab"
+ * The "conf" option specifies a Ceph configuration file to read.  If
+ * it is not specified, we will read from the default Ceph locations
+ * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
+ * file, specify conf=/dev/null.
  *
+ * Configuration values containing :, @, or = can be escaped with a
+ * leading "\".
  */
 
+/* rbd_aio_discard added in 0.1.2 */
+#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2)
+#define LIBRBD_SUPPORTS_DISCARD
+#else
+#undef LIBRBD_SUPPORTS_DISCARD
+#endif
+
 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
 
 #define RBD_MAX_CONF_NAME_SIZE 128
 #define RBD_MAX_SNAP_NAME_SIZE 128
 #define RBD_MAX_SNAPS 100
 
+typedef enum {
+    RBD_AIO_READ,
+    RBD_AIO_WRITE,
+    RBD_AIO_DISCARD
+} RBDAIOCmd;
+
 typedef struct RBDAIOCB {
     BlockDriverAIOCB common;
     QEMUBH *bh;
-    int ret;
+    int64_t ret;
     QEMUIOVector *qiov;
     char *bounce;
-    int write;
+    RBDAIOCmd cmd;
     int64_t sector_num;
     int error;
     struct BDRVRBDState *s;
     int cancelled;
+    int status;
 } RBDAIOCB;
 
 typedef struct RADOSCB {
@@ -73,7 +87,7 @@ typedef struct RADOSCB {
     int done;
     int64_t size;
     char *buf;
-    int ret;
+    int64_t ret;
 } RADOSCB;
 
 #define RBD_FD_READ 0
@@ -104,8 +118,15 @@ static int qemu_rbd_next_tok(char *dst, int dst_len,
     *p = NULL;
 
     if (delim != '\0') {
-        end = strchr(src, delim);
-        if (end) {
+        for (end = src; *end; ++end) {
+            if (*end == delim) {
+                break;
+            }
+            if (*end == '\\' && end[1] != '\0') {
+                end++;
+            }
+        }
+        if (*end == delim) {
             *p = end + 1;
             *end = '\0';
         }
@@ -124,6 +145,19 @@ static int qemu_rbd_next_tok(char *dst, int dst_len,
     return 0;
 }
 
+static void qemu_rbd_unescape(char *src)
+{
+    char *p;
+
+    for (p = src; *src; ++src, ++p) {
+        if (*src == '\\' && src[1] != '\0') {
+            src++;
+        }
+        *p = *src;
+    }
+    *p = '\0';
+}
+
 static int qemu_rbd_parsename(const char *filename,
                               char *pool, int pool_len,
                               char *snap, int snap_len,
@@ -148,6 +182,7 @@ static int qemu_rbd_parsename(const char *filename,
         ret = -EINVAL;
         goto done;
     }
+    qemu_rbd_unescape(pool);
 
     if (strchr(p, '@')) {
         ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p);
@@ -155,9 +190,11 @@ static int qemu_rbd_parsename(const char *filename,
             goto done;
         }
         ret = qemu_rbd_next_tok(snap, snap_len, p, ':', "snap name", &p);
+        qemu_rbd_unescape(snap);
     } else {
         ret = qemu_rbd_next_tok(name, name_len, p, ':', "object name", &p);
     }
+    qemu_rbd_unescape(name);
     if (ret < 0 || !p) {
         goto done;
     }
@@ -213,6 +250,7 @@ static int qemu_rbd_set_conf(rados_t cluster, const char *conf)
         if (ret < 0) {
             break;
         }
+        qemu_rbd_unescape(name);
 
         if (!p) {
             error_report("conf option %s has no value", name);
@@ -225,6 +263,7 @@ static int qemu_rbd_set_conf(rados_t cluster, const char *conf)
         if (ret < 0) {
             break;
         }
+        qemu_rbd_unescape(value);
 
         if (strcmp(name, "conf") == 0) {
             ret = rados_conf_read_file(cluster, value);
@@ -298,11 +337,8 @@ static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options)
     }
 
     if (strstr(conf, "conf=") == NULL) {
-        if (rados_conf_read_file(cluster, NULL) < 0) {
-            error_report("error reading config file");
-            rados_shutdown(cluster);
-            return -EIO;
-        }
+        /* try default location, but ignore failure */
+        rados_conf_read_file(cluster, NULL);
     }
 
     if (conf[0] != '\0' &&
@@ -341,15 +377,10 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
     RBDAIOCB *acb = rcb->acb;
     int64_t r;
 
-    if (acb->cancelled) {
-        qemu_vfree(acb->bounce);
-        qemu_aio_release(acb);
-        goto done;
-    }
-
     r = rcb->ret;
 
-    if (acb->write) {
+    if (acb->cmd == RBD_AIO_WRITE ||
+        acb->cmd == RBD_AIO_DISCARD) {
         if (r < 0) {
             acb->ret = r;
             acb->error = 1;
@@ -373,7 +404,6 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
     /* Note that acb->bh can be NULL in case where the aio was cancelled */
     acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
     qemu_bh_schedule(acb->bh);
-done:
     g_free(rcb);
 }
 
@@ -440,12 +470,22 @@ static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags)
         s->snap = g_strdup(snap_buf);
     }
 
+    /*
+     * Fallback to more conservative semantics if setting cache
+     * options fails. Ignore errors from setting rbd_cache because the
+     * only possible error is that the option does not exist, and
+     * librbd defaults to no caching. If write through caching cannot
+     * be set up, fall back to no caching.
+     */
+    if (flags & BDRV_O_NOCACHE) {
+        rados_conf_set(s->cluster, "rbd_cache", "false");
+    } else {
+        rados_conf_set(s->cluster, "rbd_cache", "true");
+    }
+
     if (strstr(conf, "conf=") == NULL) {
-        r = rados_conf_read_file(s->cluster, NULL);
-        if (r < 0) {
-            error_report("error reading config file");
-            goto failed_shutdown;
-        }
+        /* try default location, but ignore failure */
+        rados_conf_read_file(s->cluster, NULL);
     }
 
     if (conf[0] != '\0') {
@@ -485,7 +525,7 @@ static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags)
     fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
     fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
     qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader,
-                            NULL, qemu_rbd_aio_flush_cb, NULL, s);
+                            NULL, qemu_rbd_aio_flush_cb, s);
 
 
     return 0;
@@ -506,8 +546,7 @@ static void qemu_rbd_close(BlockDriverState *bs)
 
     close(s->fds[0]);
     close(s->fds[1]);
-    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL , NULL, NULL, NULL,
-        NULL);
+    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL, NULL, NULL, NULL);
 
     rbd_close(s->image);
     rados_ioctx_destroy(s->io_ctx);
@@ -523,9 +562,15 @@ static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb)
 {
     RBDAIOCB *acb = (RBDAIOCB *) blockacb;
     acb->cancelled = 1;
+
+    while (acb->status == -EINPROGRESS) {
+        qemu_aio_wait();
+    }
+
+    qemu_aio_release(acb);
 }
 
-static AIOPool rbd_aio_pool = {
+static const AIOCBInfo rbd_aiocb_info = {
     .aiocb_size = sizeof(RBDAIOCB),
     .cancel = qemu_rbd_aio_cancel,
 };
@@ -587,23 +632,39 @@ static void rbd_aio_bh_cb(void *opaque)
 {
     RBDAIOCB *acb = opaque;
 
-    if (!acb->write) {
-        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
+    if (acb->cmd == RBD_AIO_READ) {
+        qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
     }
     qemu_vfree(acb->bounce);
     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
     qemu_bh_delete(acb->bh);
     acb->bh = NULL;
+    acb->status = 0;
 
-    qemu_aio_release(acb);
+    if (!acb->cancelled) {
+        qemu_aio_release(acb);
+    }
 }
 
-static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
-                                           int64_t sector_num,
-                                           QEMUIOVector *qiov,
-                                           int nb_sectors,
-                                           BlockDriverCompletionFunc *cb,
-                                           void *opaque, int write)
+static int rbd_aio_discard_wrapper(rbd_image_t image,
+                                   uint64_t off,
+                                   uint64_t len,
+                                   rbd_completion_t comp)
+{
+#ifdef LIBRBD_SUPPORTS_DISCARD
+    return rbd_aio_discard(image, off, len, comp);
+#else
+    return -ENOTSUP;
+#endif
+}
+
+static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
+                                       int64_t sector_num,
+                                       QEMUIOVector *qiov,
+                                       int nb_sectors,
+                                       BlockDriverCompletionFunc *cb,
+                                       void *opaque,
+                                       RBDAIOCmd cmd)
 {
     RBDAIOCB *acb;
     RADOSCB *rcb;
@@ -614,21 +675,23 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
 
     BDRVRBDState *s = bs->opaque;
 
-    acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
-    if (!acb) {
-        return NULL;
-    }
-    acb->write = write;
+    acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
+    acb->cmd = cmd;
     acb->qiov = qiov;
-    acb->bounce = qemu_blockalign(bs, qiov->size);
+    if (cmd == RBD_AIO_DISCARD) {
+        acb->bounce = NULL;
+    } else {
+        acb->bounce = qemu_blockalign(bs, qiov->size);
+    }
     acb->ret = 0;
     acb->error = 0;
     acb->s = s;
     acb->cancelled = 0;
     acb->bh = NULL;
+    acb->status = -EINPROGRESS;
 
-    if (write) {
-        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
+    if (cmd == RBD_AIO_WRITE) {
+        qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
     }
 
     buf = acb->bounce;
@@ -649,10 +712,18 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
         goto failed;
     }
 
-    if (write) {
+    switch (cmd) {
+    case RBD_AIO_WRITE:
         r = rbd_aio_write(s->image, off, size, buf, c);
-    } else {
+        break;
+    case RBD_AIO_READ:
         r = rbd_aio_read(s->image, off, size, buf, c);
+        break;
+    case RBD_AIO_DISCARD:
+        r = rbd_aio_discard_wrapper(s->image, off, size, c);
+        break;
+    default:
+        r = -EINVAL;
     }
 
     if (r < 0) {
@@ -675,7 +746,8 @@ static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
                                             BlockDriverCompletionFunc *cb,
                                             void *opaque)
 {
-    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
+    return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
+                         RBD_AIO_READ);
 }
 
 static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
@@ -685,7 +757,19 @@ static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
                                              BlockDriverCompletionFunc *cb,
                                              void *opaque)
 {
-    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
+    return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
+                         RBD_AIO_WRITE);
+}
+
+static int qemu_rbd_co_flush(BlockDriverState *bs)
+{
+#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
+    /* rbd_flush added in 0.1.1 */
+    BDRVRBDState *s = bs->opaque;
+    return rbd_flush(s->image);
+#else
+    return 0;
+#endif
 }
 
 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
@@ -762,6 +846,26 @@ static int qemu_rbd_snap_create(BlockDriverState *bs,
     return 0;
 }
 
+static int qemu_rbd_snap_remove(BlockDriverState *bs,
+                                const char *snapshot_name)
+{
+    BDRVRBDState *s = bs->opaque;
+    int r;
+
+    r = rbd_snap_remove(s->image, snapshot_name);
+    return r;
+}
+
+static int qemu_rbd_snap_rollback(BlockDriverState *bs,
+                                  const char *snapshot_name)
+{
+    BDRVRBDState *s = bs->opaque;
+    int r;
+
+    r = rbd_snap_rollback(s->image, snapshot_name);
+    return r;
+}
+
 static int qemu_rbd_snap_list(BlockDriverState *bs,
                               QEMUSnapshotInfo **psn_tab)
 {
@@ -780,7 +884,7 @@ static int qemu_rbd_snap_list(BlockDriverState *bs,
     } while (snap_count == -ERANGE);
 
     if (snap_count <= 0) {
-        return snap_count;
+        goto done;
     }
 
     sn_tab = g_malloc0(snap_count * sizeof(QEMUSnapshotInfo));
@@ -799,10 +903,23 @@ static int qemu_rbd_snap_list(BlockDriverState *bs,
     }
     rbd_snap_list_end(snaps);
 
+ done:
     *psn_tab = sn_tab;
     return snap_count;
 }
 
+#ifdef LIBRBD_SUPPORTS_DISCARD
+static BlockDriverAIOCB* qemu_rbd_aio_discard(BlockDriverState *bs,
+                                              int64_t sector_num,
+                                              int nb_sectors,
+                                              BlockDriverCompletionFunc *cb,
+                                              void *opaque)
+{
+    return rbd_start_aio(bs, sector_num, NULL, nb_sectors, cb, opaque,
+                         RBD_AIO_DISCARD);
+}
+#endif
+
 static QEMUOptionParameter qemu_rbd_create_options[] = {
     {
      .name = BLOCK_OPT_SIZE,
@@ -829,11 +946,18 @@ static BlockDriver bdrv_rbd = {
     .bdrv_truncate      = qemu_rbd_truncate,
     .protocol_name      = "rbd",
 
-    .bdrv_aio_readv     = qemu_rbd_aio_readv,
-    .bdrv_aio_writev    = qemu_rbd_aio_writev,
+    .bdrv_aio_readv         = qemu_rbd_aio_readv,
+    .bdrv_aio_writev        = qemu_rbd_aio_writev,
+    .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
+
+#ifdef LIBRBD_SUPPORTS_DISCARD
+    .bdrv_aio_discard       = qemu_rbd_aio_discard,
+#endif
 
-    .bdrv_snapshot_create = qemu_rbd_snap_create,
-    .bdrv_snapshot_list = qemu_rbd_snap_list,
+    .bdrv_snapshot_create   = qemu_rbd_snap_create,
+    .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
+    .bdrv_snapshot_list     = qemu_rbd_snap_list,
+    .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
 };
 
 static void bdrv_rbd_init(void)