#include <rbd/librbd.h>
#include "qapi/error.h"
#include "qemu/error-report.h"
+#include "qemu/module.h"
#include "qemu/option.h"
#include "block/block_int.h"
#include "block/qdict.h"
#include "crypto/secret.h"
#include "qemu/cutils.h"
+#include "sysemu/replay.h"
#include "qapi/qmp/qstring.h"
#include "qapi/qmp/qdict.h"
#include "qapi/qmp/qjson.h"
rbd_image_t image;
char *image_name;
char *snap;
+ char *namespace;
+ uint64_t image_size;
} BDRVRBDState;
static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
const char *start;
char *p, *buf;
QList *keypairs = NULL;
- char *found_str;
+ char *found_str, *image_name;
if (!strstart(filename, "rbd:", &start)) {
error_setg(errp, "File name must start with 'rbd:'");
qdict_put_str(options, "pool", found_str);
if (strchr(p, '@')) {
- found_str = qemu_rbd_next_tok(p, '@', &p);
- qemu_rbd_unescape(found_str);
- qdict_put_str(options, "image", found_str);
+ image_name = qemu_rbd_next_tok(p, '@', &p);
found_str = qemu_rbd_next_tok(p, ':', &p);
qemu_rbd_unescape(found_str);
qdict_put_str(options, "snapshot", found_str);
} else {
- found_str = qemu_rbd_next_tok(p, ':', &p);
+ image_name = qemu_rbd_next_tok(p, ':', &p);
+ }
+ /* Check for namespace in the image_name */
+ if (strchr(image_name, '/')) {
+ found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
qemu_rbd_unescape(found_str);
- qdict_put_str(options, "image", found_str);
+ qdict_put_str(options, "namespace", found_str);
+ } else {
+ qdict_put_str(options, "namespace", "");
}
+ qemu_rbd_unescape(image_name);
+ qdict_put_str(options, "image", image_name);
if (!p) {
goto done;
}
.type = QEMU_OPT_STRING,
.help = "Rados pool name",
},
+ {
+ .name = "namespace",
+ .type = QEMU_OPT_STRING,
+ .help = "Rados namespace name in the pool",
+ },
{
.name = "image",
.type = QEMU_OPT_STRING,
return qemu_rbd_do_create(options, NULL, NULL, errp);
}
-static int coroutine_fn qemu_rbd_co_create_opts(const char *filename,
+static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
+ const char *filename,
QemuOpts *opts,
Error **errp)
{
* schema, but when they come from -drive, they're all QString.
*/
loc = rbd_opts->location;
- loc->pool = g_strdup(qdict_get_try_str(options, "pool"));
- loc->conf = g_strdup(qdict_get_try_str(options, "conf"));
- loc->has_conf = !!loc->conf;
- loc->user = g_strdup(qdict_get_try_str(options, "user"));
- loc->has_user = !!loc->user;
- loc->image = g_strdup(qdict_get_try_str(options, "image"));
- keypairs = qdict_get_try_str(options, "=keyvalue-pairs");
+ loc->pool = g_strdup(qdict_get_try_str(options, "pool"));
+ loc->conf = g_strdup(qdict_get_try_str(options, "conf"));
+ loc->has_conf = !!loc->conf;
+ loc->user = g_strdup(qdict_get_try_str(options, "user"));
+ loc->has_user = !!loc->user;
+ loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
+ loc->image = g_strdup(qdict_get_try_str(options, "image"));
+ keypairs = qdict_get_try_str(options, "=keyvalue-pairs");
ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
if (ret < 0) {
error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
goto failed_shutdown;
}
+ /*
+ * Set the namespace after opening the io context on the pool,
+ * if nspace == NULL or if nspace == "", it is just as we did nothing
+ */
+ rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
return 0;
goto failed_open;
}
+ r = rbd_get_size(s->image, &s->image_size);
+ if (r < 0) {
+ error_setg_errno(errp, -r, "error getting image size from %s",
+ s->image_name);
+ rbd_close(s->image);
+ goto failed_open;
+ }
+
/* If we are using an rbd snapshot, we must be r/o, otherwise
* leave as-is */
if (s->snap != NULL) {
rados_shutdown(s->cluster);
}
+/* Resize the RBD image and update the 'image_size' with the current size */
+static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
+{
+ BDRVRBDState *s = bs->opaque;
+ int r;
+
+ r = rbd_resize(s->image, size);
+ if (r < 0) {
+ return r;
+ }
+
+ s->image_size = size;
+
+ return 0;
+}
+
static const AIOCBInfo rbd_aiocb_info = {
.aiocb_size = sizeof(RBDAIOCB),
};
rcb->ret = rbd_aio_get_return_value(c);
rbd_aio_release(c);
- aio_bh_schedule_oneshot(bdrv_get_aio_context(acb->common.bs),
- rbd_finish_bh, rcb);
+ replay_bh_schedule_oneshot_event(bdrv_get_aio_context(acb->common.bs),
+ rbd_finish_bh, rcb);
}
static int rbd_aio_discard_wrapper(rbd_image_t image,
}
switch (cmd) {
- case RBD_AIO_WRITE:
+ case RBD_AIO_WRITE: {
+ /*
+ * RBD APIs don't allow us to write more than actual size, so in order
+ * to support growing images, we resize the image before write
+ * operations that exceed the current size.
+ */
+ if (off + size > s->image_size) {
+ r = qemu_rbd_resize(bs, off + size);
+ if (r < 0) {
+ goto failed_completion;
+ }
+ }
#ifdef LIBRBD_SUPPORTS_IOVEC
r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
#else
r = rbd_aio_write(s->image, off, size, rcb->buf, c);
#endif
break;
+ }
case RBD_AIO_READ:
#ifdef LIBRBD_SUPPORTS_IOVEC
r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
int64_t offset,
+ bool exact,
PreallocMode prealloc,
+ BdrvRequestFlags flags,
Error **errp)
{
- BDRVRBDState *s = bs->opaque;
int r;
if (prealloc != PREALLOC_MODE_OFF) {
return -ENOTSUP;
}
- r = rbd_resize(s->image, offset);
+ r = qemu_rbd_resize(bs, offset);
if (r < 0) {
error_setg_errno(errp, -r, "Failed to resize file");
return r;
.bdrv_co_create = qemu_rbd_co_create,
.bdrv_co_create_opts = qemu_rbd_co_create_opts,
.bdrv_has_zero_init = bdrv_has_zero_init_1,
+ .bdrv_has_zero_init_truncate = bdrv_has_zero_init_1,
.bdrv_get_info = qemu_rbd_getinfo,
.create_opts = &qemu_rbd_create_opts,
.bdrv_getlength = qemu_rbd_getlength,