#include "qemu/cutils.h"
#include "qemu/module.h"
#include "qemu/option.h"
+#include "qemu/memalign.h"
#include "block/block_int.h"
+#include "block/coroutines.h"
#include "block/qdict.h"
#include "qapi/error.h"
#include "qapi/qapi-events-block.h"
#define HASH_LENGTH 32
+#define INDEXSTR_LEN 32
+
#define QUORUM_OPT_VOTE_THRESHOLD "vote-threshold"
#define QUORUM_OPT_BLKVERIFY "blkverify"
#define QUORUM_OPT_REWRITE "rewrite-corrupted"
return a->l == b->l;
}
-static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
- QEMUIOVector *qiov,
- uint64_t offset,
- uint64_t bytes,
- int flags)
+static QuorumAIOCB *coroutine_fn quorum_aio_get(BlockDriverState *bs,
+ QEMUIOVector *qiov,
+ uint64_t offset, uint64_t bytes,
+ int flags)
{
BDRVQuorumState *s = bs->opaque;
QuorumAIOCB *acb = g_new(QuorumAIOCB, 1);
msg = strerror(-ret);
}
- qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name, start_sector,
+ qapi_event_send_quorum_report_bad(type, msg, node_name, start_sector,
end_sector - start_sector);
}
return false;
}
-static int read_fifo_child(QuorumAIOCB *acb);
-
static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
{
int i;
}
}
-static void quorum_rewrite_entry(void *opaque)
+/*
+ * This function can count as GRAPH_RDLOCK because read_quorum_children() holds
+ * the graph lock and keeps it until this coroutine has terminated.
+ */
+static void coroutine_fn GRAPH_RDLOCK quorum_rewrite_entry(void *opaque)
{
QuorumCo *co = opaque;
QuorumAIOCB *acb = co->acb;
}
}
-static bool quorum_rewrite_bad_versions(QuorumAIOCB *acb,
- QuorumVoteValue *value)
+static bool coroutine_fn GRAPH_RDLOCK
+quorum_rewrite_bad_versions(QuorumAIOCB *acb, QuorumVoteValue *value)
{
QuorumVoteVersion *version;
QuorumVoteItem *item;
return ret;
}
-static void quorum_vote(QuorumAIOCB *acb)
+static void coroutine_fn GRAPH_RDLOCK quorum_vote(QuorumAIOCB *acb)
{
bool quorum = true;
int i, j, ret;
quorum_free_vote_list(&acb->votes);
}
-static void read_quorum_children_entry(void *opaque)
+/*
+ * This function can count as GRAPH_RDLOCK because read_quorum_children() holds
+ * the graph lock and keeps it until this coroutine has terminated.
+ */
+static void coroutine_fn GRAPH_RDLOCK read_quorum_children_entry(void *opaque)
{
QuorumCo *co = opaque;
QuorumAIOCB *acb = co->acb;
}
}
-static int read_quorum_children(QuorumAIOCB *acb)
+static int coroutine_fn GRAPH_RDLOCK read_quorum_children(QuorumAIOCB *acb)
{
BDRVQuorumState *s = acb->bs->opaque;
int i;
return acb->vote_ret;
}
-static int read_fifo_child(QuorumAIOCB *acb)
+static int coroutine_fn GRAPH_RDLOCK read_fifo_child(QuorumAIOCB *acb)
{
BDRVQuorumState *s = acb->bs->opaque;
int n, ret;
return ret;
}
-static int quorum_co_preadv(BlockDriverState *bs, uint64_t offset,
- uint64_t bytes, QEMUIOVector *qiov, int flags)
+static int coroutine_fn GRAPH_RDLOCK
+quorum_co_preadv(BlockDriverState *bs, int64_t offset, int64_t bytes,
+ QEMUIOVector *qiov, BdrvRequestFlags flags)
{
BDRVQuorumState *s = bs->opaque;
QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags);
return ret;
}
-static void write_quorum_entry(void *opaque)
+/*
+ * This function can count as GRAPH_RDLOCK because quorum_co_pwritev() holds the
+ * graph lock and keeps it until this coroutine has terminated.
+ */
+static void coroutine_fn GRAPH_RDLOCK write_quorum_entry(void *opaque)
{
QuorumCo *co = opaque;
QuorumAIOCB *acb = co->acb;
QuorumChildRequest *sacb = &acb->qcrs[i];
sacb->bs = s->children[i]->bs;
- sacb->ret = bdrv_co_pwritev(s->children[i], acb->offset, acb->bytes,
- acb->qiov, acb->flags);
+ if (acb->flags & BDRV_REQ_ZERO_WRITE) {
+ sacb->ret = bdrv_co_pwrite_zeroes(s->children[i], acb->offset,
+ acb->bytes, acb->flags);
+ } else {
+ sacb->ret = bdrv_co_pwritev(s->children[i], acb->offset, acb->bytes,
+ acb->qiov, acb->flags);
+ }
if (sacb->ret == 0) {
acb->success_count++;
} else {
}
}
-static int quorum_co_pwritev(BlockDriverState *bs, uint64_t offset,
- uint64_t bytes, QEMUIOVector *qiov, int flags)
+static int coroutine_fn GRAPH_RDLOCK
+quorum_co_pwritev(BlockDriverState *bs, int64_t offset, int64_t bytes,
+ QEMUIOVector *qiov, BdrvRequestFlags flags)
{
BDRVQuorumState *s = bs->opaque;
QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags);
return ret;
}
-static int64_t quorum_getlength(BlockDriverState *bs)
+static int coroutine_fn GRAPH_RDLOCK
+quorum_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset, int64_t bytes,
+ BdrvRequestFlags flags)
+{
+ return quorum_co_pwritev(bs, offset, bytes, NULL,
+ flags | BDRV_REQ_ZERO_WRITE);
+}
+
+static int64_t coroutine_fn GRAPH_RDLOCK
+quorum_co_getlength(BlockDriverState *bs)
{
BDRVQuorumState *s = bs->opaque;
int64_t result;
int i;
/* check that all file have the same length */
- result = bdrv_getlength(s->children[0]->bs);
+ result = bdrv_co_getlength(s->children[0]->bs);
if (result < 0) {
return result;
}
for (i = 1; i < s->num_children; i++) {
- int64_t value = bdrv_getlength(s->children[i]->bs);
+ int64_t value = bdrv_co_getlength(s->children[i]->bs);
if (value < 0) {
return value;
}
return result;
}
-static coroutine_fn int quorum_co_flush(BlockDriverState *bs)
+static coroutine_fn GRAPH_RDLOCK int quorum_co_flush(BlockDriverState *bs)
{
BDRVQuorumState *s = bs->opaque;
QuorumVoteVersion *winner = NULL;
return result;
}
-static bool quorum_recurse_can_replace(BlockDriverState *bs,
- BlockDriverState *to_replace)
+static bool GRAPH_RDLOCK
+quorum_recurse_can_replace(BlockDriverState *bs, BlockDriverState *to_replace)
{
BDRVQuorumState *s = bs->opaque;
int i;
if (threshold < 1) {
error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
- "vote-threshold", "value >= 1");
+ "vote-threshold", "a value >= 1");
return -ERANGE;
}
},
};
+static void quorum_refresh_flags(BlockDriverState *bs)
+{
+ BDRVQuorumState *s = bs->opaque;
+ int i;
+
+ bs->supported_zero_flags =
+ BDRV_REQ_FUA | BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
+
+ for (i = 0; i < s->num_children; i++) {
+ bs->supported_zero_flags &= s->children[i]->bs->supported_zero_flags;
+ }
+
+ bs->supported_zero_flags |= BDRV_REQ_WRITE_UNCHANGED;
+}
+
static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
Error **errp)
{
BDRVQuorumState *s = bs->opaque;
- Error *local_err = NULL;
QemuOpts *opts = NULL;
const char *pattern_str;
bool *opened;
/* count how many different children are present */
s->num_children = qdict_array_entries(options, "children.");
if (s->num_children < 0) {
- error_setg(&local_err, "Option children is not a valid array");
+ error_setg(errp, "Option children is not a valid array");
ret = -EINVAL;
goto exit;
}
if (s->num_children < 1) {
- error_setg(&local_err,
- "Number of provided children must be 1 or more");
+ error_setg(errp, "Number of provided children must be 1 or more");
ret = -EINVAL;
goto exit;
}
opts = qemu_opts_create(&quorum_runtime_opts, NULL, 0, &error_abort);
- qemu_opts_absorb_qdict(opts, options, &local_err);
- if (local_err) {
+ if (!qemu_opts_absorb_qdict(opts, options, errp)) {
ret = -EINVAL;
goto exit;
}
s->threshold = qemu_opt_get_number(opts, QUORUM_OPT_VOTE_THRESHOLD, 0);
/* and validate it against s->num_children */
- ret = quorum_valid_threshold(s->threshold, s->num_children, &local_err);
+ ret = quorum_valid_threshold(s->threshold, s->num_children, errp);
if (ret < 0) {
goto exit;
}
-EINVAL, NULL);
}
if (ret < 0) {
- error_setg(&local_err, "Please set read-pattern as fifo or quorum");
+ error_setg(errp, "Please set read-pattern as fifo or quorum");
goto exit;
}
s->read_pattern = ret;
if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
s->is_blkverify = qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false);
if (s->is_blkverify && (s->num_children != 2 || s->threshold != 2)) {
- error_setg(&local_err, "blkverify=on can only be set if there are "
+ error_setg(errp, "blkverify=on can only be set if there are "
"exactly two files and vote-threshold is 2");
ret = -EINVAL;
goto exit;
s->rewrite_corrupted = qemu_opt_get_bool(opts, QUORUM_OPT_REWRITE,
false);
if (s->rewrite_corrupted && s->is_blkverify) {
- error_setg(&local_err,
+ error_setg(errp,
"rewrite-corrupted=on cannot be used with blkverify=on");
ret = -EINVAL;
goto exit;
opened = g_new0(bool, s->num_children);
for (i = 0; i < s->num_children; i++) {
- char indexstr[32];
- ret = snprintf(indexstr, 32, "children.%d", i);
- assert(ret < 32);
+ char indexstr[INDEXSTR_LEN];
+ ret = snprintf(indexstr, INDEXSTR_LEN, "children.%d", i);
+ assert(ret < INDEXSTR_LEN);
s->children[i] = bdrv_open_child(NULL, options, indexstr, bs,
&child_of_bds, BDRV_CHILD_DATA, false,
- &local_err);
- if (local_err) {
+ errp);
+ if (!s->children[i]) {
ret = -EINVAL;
goto close_exit;
}
s->next_child_index = s->num_children;
bs->supported_write_flags = BDRV_REQ_WRITE_UNCHANGED;
+ quorum_refresh_flags(bs);
g_free(opened);
goto exit;
close_exit:
/* cleanup on error */
+ bdrv_graph_wrlock(NULL);
for (i = 0; i < s->num_children; i++) {
if (!opened[i]) {
continue;
}
bdrv_unref_child(bs, s->children[i]);
}
+ bdrv_graph_wrunlock();
g_free(s->children);
g_free(opened);
exit:
qemu_opts_del(opts);
- /* propagate error */
- error_propagate(errp, local_err);
return ret;
}
BDRVQuorumState *s = bs->opaque;
int i;
+ bdrv_graph_wrlock(NULL);
for (i = 0; i < s->num_children; i++) {
bdrv_unref_child(bs, s->children[i]);
}
+ bdrv_graph_wrunlock();
g_free(s->children);
}
-static void quorum_add_child(BlockDriverState *bs, BlockDriverState *child_bs,
- Error **errp)
+static void GRAPH_WRLOCK
+quorum_add_child(BlockDriverState *bs, BlockDriverState *child_bs, Error **errp)
{
BDRVQuorumState *s = bs->opaque;
BdrvChild *child;
- char indexstr[32];
+ char indexstr[INDEXSTR_LEN];
int ret;
if (s->is_blkverify) {
return;
}
- ret = snprintf(indexstr, 32, "children.%u", s->next_child_index);
- if (ret < 0 || ret >= 32) {
+ ret = snprintf(indexstr, INDEXSTR_LEN, "children.%u", s->next_child_index);
+ if (ret < 0 || ret >= INDEXSTR_LEN) {
error_setg(errp, "cannot generate child name");
return;
}
s->next_child_index++;
- bdrv_drained_begin(bs);
-
/* We can safely add the child now */
bdrv_ref(child_bs);
BDRV_CHILD_DATA, errp);
if (child == NULL) {
s->next_child_index--;
- goto out;
+ return;
}
s->children = g_renew(BdrvChild *, s->children, s->num_children + 1);
s->children[s->num_children++] = child;
-
-out:
- bdrv_drained_end(bs);
+ quorum_refresh_flags(bs);
}
-static void quorum_del_child(BlockDriverState *bs, BdrvChild *child,
- Error **errp)
+static void GRAPH_WRLOCK
+quorum_del_child(BlockDriverState *bs, BdrvChild *child, Error **errp)
{
BDRVQuorumState *s = bs->opaque;
+ char indexstr[INDEXSTR_LEN];
int i;
for (i = 0; i < s->num_children; i++) {
/* We know now that num_children > threshold, so blkverify must be false */
assert(!s->is_blkverify);
- bdrv_drained_begin(bs);
+ snprintf(indexstr, INDEXSTR_LEN, "children.%u", s->next_child_index - 1);
+ if (!strncmp(child->name, indexstr, INDEXSTR_LEN)) {
+ s->next_child_index--;
+ }
/* We can safely remove this child now */
memmove(&s->children[i], &s->children[i + 1],
(s->num_children - i - 1) * sizeof(BdrvChild *));
s->children = g_renew(BdrvChild *, s->children, --s->num_children);
+
bdrv_unref_child(bs, child);
- bdrv_drained_end(bs);
+ quorum_refresh_flags(bs);
}
static void quorum_gather_child_options(BlockDriverState *bs, QDict *target,
uint64_t perm, uint64_t shared,
uint64_t *nperm, uint64_t *nshared)
{
+ BDRVQuorumState *s = bs->opaque;
+
*nperm = perm & DEFAULT_PERM_PASSTHROUGH;
+ if (s->rewrite_corrupted) {
+ *nperm |= BLK_PERM_WRITE;
+ }
/*
* We cannot share RESIZE or WRITE, as this would make the
| DEFAULT_PERM_UNCHANGED;
}
+/*
+ * Each one of the children can report different status flags even
+ * when they contain the same data, so what this function does is
+ * return BDRV_BLOCK_ZERO if *all* children agree that a certain
+ * region contains zeroes, and BDRV_BLOCK_DATA otherwise.
+ */
+static int coroutine_fn GRAPH_RDLOCK
+quorum_co_block_status(BlockDriverState *bs, bool want_zero,
+ int64_t offset, int64_t count,
+ int64_t *pnum, int64_t *map, BlockDriverState **file)
+{
+ BDRVQuorumState *s = bs->opaque;
+ int i, ret;
+ int64_t pnum_zero = count;
+ int64_t pnum_data = 0;
+
+ for (i = 0; i < s->num_children; i++) {
+ int64_t bytes;
+ ret = bdrv_co_common_block_status_above(s->children[i]->bs, NULL, false,
+ want_zero, offset, count,
+ &bytes, NULL, NULL, NULL);
+ if (ret < 0) {
+ quorum_report_bad(QUORUM_OP_TYPE_READ, offset, count,
+ s->children[i]->bs->node_name, ret);
+ pnum_data = count;
+ break;
+ }
+ /*
+ * Even if all children agree about whether there are zeroes
+ * or not at @offset they might disagree on the size, so use
+ * the smallest when reporting BDRV_BLOCK_ZERO and the largest
+ * when reporting BDRV_BLOCK_DATA.
+ */
+ if (ret & BDRV_BLOCK_ZERO) {
+ pnum_zero = MIN(pnum_zero, bytes);
+ } else {
+ pnum_data = MAX(pnum_data, bytes);
+ }
+ }
+
+ if (pnum_data) {
+ *pnum = pnum_data;
+ return BDRV_BLOCK_DATA;
+ } else {
+ *pnum = pnum_zero;
+ return BDRV_BLOCK_ZERO;
+ }
+}
+
static const char *const quorum_strong_runtime_opts[] = {
QUORUM_OPT_VOTE_THRESHOLD,
QUORUM_OPT_BLKVERIFY,
.bdrv_close = quorum_close,
.bdrv_gather_child_options = quorum_gather_child_options,
.bdrv_dirname = quorum_dirname,
+ .bdrv_co_block_status = quorum_co_block_status,
- .bdrv_co_flush_to_disk = quorum_co_flush,
+ .bdrv_co_flush = quorum_co_flush,
- .bdrv_getlength = quorum_getlength,
+ .bdrv_co_getlength = quorum_co_getlength,
.bdrv_co_preadv = quorum_co_preadv,
.bdrv_co_pwritev = quorum_co_pwritev,
+ .bdrv_co_pwrite_zeroes = quorum_co_pwrite_zeroes,
.bdrv_add_child = quorum_add_child,
.bdrv_del_child = quorum_del_child,