/*
* When specifying the image filename use:
*
- * rbd:poolname/devicename
+ * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
*
* poolname must be the name of an existing rados pool
*
* devicename is the basename for all objects used to
* emulate the raw device.
*
+ * Each option given is used to configure rados, and may be
+ * any Ceph option, or "conf". The "conf" option specifies
+ * a Ceph configuration file to read.
+ *
* Metadata information (image size, ...) is stored in an
* object with the name "devicename.rbd".
*
static int qemu_rbd_parsename(const char *filename,
char *pool, int pool_len,
char *snap, int snap_len,
- char *name, int name_len)
+ char *name, int name_len,
+ char *conf, int conf_len)
{
const char *start;
char *p, *buf;
buf = qemu_strdup(start);
p = buf;
+ *snap = '\0';
+ *conf = '\0';
ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p);
if (ret < 0 || !p) {
ret = -EINVAL;
goto done;
}
- ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p);
- if (ret < 0) {
- goto done;
+
+ if (strchr(p, '@')) {
+ ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p);
+ if (ret < 0) {
+ goto done;
+ }
+ ret = qemu_rbd_next_tok(snap, snap_len, p, ':', "snap name", &p);
+ } else {
+ ret = qemu_rbd_next_tok(name, name_len, p, ':', "object name", &p);
}
- if (!p) {
- *snap = '\0';
+ if (ret < 0 || !p) {
goto done;
}
- ret = qemu_rbd_next_tok(snap, snap_len, p, '\0', "snap name", &p);
+ ret = qemu_rbd_next_tok(conf, conf_len, p, '\0', "configuration", &p);
done:
qemu_free(buf);
return ret;
}
+static int qemu_rbd_set_conf(rados_t cluster, const char *conf)
+{
+ char *p, *buf;
+ char name[RBD_MAX_CONF_NAME_SIZE];
+ char value[RBD_MAX_CONF_VAL_SIZE];
+ int ret = 0;
+
+ buf = qemu_strdup(conf);
+ p = buf;
+
+ while (p) {
+ ret = qemu_rbd_next_tok(name, sizeof(name), p,
+ '=', "conf option name", &p);
+ if (ret < 0) {
+ break;
+ }
+
+ if (!p) {
+ error_report("conf option %s has no value", name);
+ ret = -EINVAL;
+ break;
+ }
+
+ ret = qemu_rbd_next_tok(value, sizeof(value), p,
+ ':', "conf option value", &p);
+ if (ret < 0) {
+ break;
+ }
+
+ if (strcmp(name, "conf")) {
+ ret = rados_conf_set(cluster, name, value);
+ if (ret < 0) {
+ error_report("invalid conf option %s", name);
+ ret = -EINVAL;
+ break;
+ }
+ } else {
+ ret = rados_conf_read_file(cluster, value);
+ if (ret < 0) {
+ error_report("error reading conf file %s", value);
+ break;
+ }
+ }
+ }
+
+ qemu_free(buf);
+ return ret;
+}
+
static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options)
{
int64_t bytes = 0;
char pool[RBD_MAX_POOL_NAME_SIZE];
char name[RBD_MAX_IMAGE_NAME_SIZE];
char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
- char *snap = NULL;
+ char conf[RBD_MAX_CONF_SIZE];
rados_t cluster;
rados_ioctx_t io_ctx;
int ret;
if (qemu_rbd_parsename(filename, pool, sizeof(pool),
snap_buf, sizeof(snap_buf),
- name, sizeof(name)) < 0) {
+ name, sizeof(name),
+ conf, sizeof(conf)) < 0) {
return -EINVAL;
}
- if (snap_buf[0] != '\0') {
- snap = snap_buf;
- }
/* Read out options */
while (options && options->name) {
return -EIO;
}
- if (rados_conf_read_file(cluster, NULL) < 0) {
- error_report("error reading config file");
+ if (strstr(conf, "conf=") == NULL) {
+ if (rados_conf_read_file(cluster, NULL) < 0) {
+ error_report("error reading config file");
+ rados_shutdown(cluster);
+ return -EIO;
+ }
+ }
+
+ if (conf[0] != '\0' &&
+ qemu_rbd_set_conf(cluster, conf) < 0) {
+ error_report("error setting config options");
rados_shutdown(cluster);
return -EIO;
}
BDRVRBDState *s = bs->opaque;
char pool[RBD_MAX_POOL_NAME_SIZE];
char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
+ char conf[RBD_MAX_CONF_SIZE];
int r;
if (qemu_rbd_parsename(filename, pool, sizeof(pool),
snap_buf, sizeof(snap_buf),
- s->name, sizeof(s->name)) < 0) {
+ s->name, sizeof(s->name),
+ conf, sizeof(conf)) < 0) {
return -EINVAL;
}
s->snap = NULL;
return r;
}
- r = rados_conf_read_file(s->cluster, NULL);
- if (r < 0) {
- error_report("error reading config file");
- rados_shutdown(s->cluster);
- return r;
+ if (strstr(conf, "conf=") == NULL) {
+ r = rados_conf_read_file(s->cluster, NULL);
+ if (r < 0) {
+ error_report("error reading config file");
+ rados_shutdown(s->cluster);
+ return r;
+ }
+ }
+
+ if (conf[0] != '\0') {
+ r = qemu_rbd_set_conf(s->cluster, conf);
+ if (r < 0) {
+ error_report("error setting config options");
+ rados_shutdown(s->cluster);
+ return r;
+ }
}
r = rados_connect(s->cluster);
rbd_completion_t c;
int64_t off, size;
char *buf;
+ int r;
BDRVRBDState *s = bs->opaque;
acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
+ if (!acb) {
+ return NULL;
+ }
acb->write = write;
acb->qiov = qiov;
acb->bounce = qemu_blockalign(bs, qiov->size);
rcb->buf = buf;
rcb->s = acb->s;
rcb->size = size;
+ r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
+ if (r < 0) {
+ goto failed;
+ }
if (write) {
- rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
- rbd_aio_write(s->image, off, size, buf, c);
+ r = rbd_aio_write(s->image, off, size, buf, c);
} else {
- rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
- rbd_aio_read(s->image, off, size, buf, c);
+ r = rbd_aio_read(s->image, off, size, buf, c);
+ }
+
+ if (r < 0) {
+ goto failed;
}
return &acb->common;
+
+failed:
+ qemu_free(rcb);
+ s->qemu_aio_count--;
+ qemu_aio_release(acb);
+ return NULL;
}
static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
return info.size;
}
+static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset)
+{
+ BDRVRBDState *s = bs->opaque;
+ int r;
+
+ r = rbd_resize(s->image, offset);
+ if (r < 0) {
+ return r;
+ }
+
+ return 0;
+}
+
static int qemu_rbd_snap_create(BlockDriverState *bs,
QEMUSnapshotInfo *sn_info)
{
.bdrv_get_info = qemu_rbd_getinfo,
.create_options = qemu_rbd_create_options,
.bdrv_getlength = qemu_rbd_getlength,
+ .bdrv_truncate = qemu_rbd_truncate,
.protocol_name = "rbd",
.bdrv_aio_readv = qemu_rbd_aio_readv,