]> git.proxmox.com Git - mirror_qemu.git/blobdiff - block/quorum.c
block: Mark bdrv_add/del_child() and caller GRAPH_WRLOCK
[mirror_qemu.git] / block / quorum.c
index 7cf7ab15467441708e7b04e7bcba353afa26c671..05220cab7f01d0cee4fddafaa14bffee5e9ba829 100644 (file)
@@ -17,7 +17,9 @@
 #include "qemu/cutils.h"
 #include "qemu/module.h"
 #include "qemu/option.h"
+#include "qemu/memalign.h"
 #include "block/block_int.h"
+#include "block/coroutines.h"
 #include "block/qdict.h"
 #include "qapi/error.h"
 #include "qapi/qapi-events-block.h"
@@ -29,6 +31,8 @@
 
 #define HASH_LENGTH 32
 
+#define INDEXSTR_LEN 32
+
 #define QUORUM_OPT_VOTE_THRESHOLD "vote-threshold"
 #define QUORUM_OPT_BLKVERIFY      "blkverify"
 #define QUORUM_OPT_REWRITE        "rewrite-corrupted"
@@ -157,11 +161,10 @@ static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b)
     return a->l == b->l;
 }
 
-static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
-                                   QEMUIOVector *qiov,
-                                   uint64_t offset,
-                                   uint64_t bytes,
-                                   int flags)
+static QuorumAIOCB *coroutine_fn quorum_aio_get(BlockDriverState *bs,
+                                                QEMUIOVector *qiov,
+                                                uint64_t offset, uint64_t bytes,
+                                                int flags)
 {
     BDRVQuorumState *s = bs->opaque;
     QuorumAIOCB *acb = g_new(QuorumAIOCB, 1);
@@ -199,7 +202,7 @@ static void quorum_report_bad(QuorumOpType type, uint64_t offset,
         msg = strerror(-ret);
     }
 
-    qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name, start_sector,
+    qapi_event_send_quorum_report_bad(type, msg, node_name, start_sector,
                                       end_sector - start_sector);
 }
 
@@ -229,8 +232,6 @@ static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb)
     return false;
 }
 
-static int read_fifo_child(QuorumAIOCB *acb);
-
 static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
 {
     int i;
@@ -269,7 +270,11 @@ static void quorum_report_bad_versions(BDRVQuorumState *s,
     }
 }
 
-static void quorum_rewrite_entry(void *opaque)
+/*
+ * This function can count as GRAPH_RDLOCK because read_quorum_children() holds
+ * the graph lock and keeps it until this coroutine has terminated.
+ */
+static void coroutine_fn GRAPH_RDLOCK quorum_rewrite_entry(void *opaque)
 {
     QuorumCo *co = opaque;
     QuorumAIOCB *acb = co->acb;
@@ -289,8 +294,8 @@ static void quorum_rewrite_entry(void *opaque)
     }
 }
 
-static bool quorum_rewrite_bad_versions(QuorumAIOCB *acb,
-                                        QuorumVoteValue *value)
+static bool coroutine_fn GRAPH_RDLOCK
+quorum_rewrite_bad_versions(QuorumAIOCB *acb, QuorumVoteValue *value)
 {
     QuorumVoteVersion *version;
     QuorumVoteItem *item;
@@ -490,7 +495,7 @@ static int quorum_vote_error(QuorumAIOCB *acb)
     return ret;
 }
 
-static void quorum_vote(QuorumAIOCB *acb)
+static void coroutine_fn GRAPH_RDLOCK quorum_vote(QuorumAIOCB *acb)
 {
     bool quorum = true;
     int i, j, ret;
@@ -570,7 +575,11 @@ free_exit:
     quorum_free_vote_list(&acb->votes);
 }
 
-static void read_quorum_children_entry(void *opaque)
+/*
+ * This function can count as GRAPH_RDLOCK because read_quorum_children() holds
+ * the graph lock and keeps it until this coroutine has terminated.
+ */
+static void coroutine_fn GRAPH_RDLOCK read_quorum_children_entry(void *opaque)
 {
     QuorumCo *co = opaque;
     QuorumAIOCB *acb = co->acb;
@@ -598,7 +607,7 @@ static void read_quorum_children_entry(void *opaque)
     }
 }
 
-static int read_quorum_children(QuorumAIOCB *acb)
+static int coroutine_fn GRAPH_RDLOCK read_quorum_children(QuorumAIOCB *acb)
 {
     BDRVQuorumState *s = acb->bs->opaque;
     int i;
@@ -639,7 +648,7 @@ static int read_quorum_children(QuorumAIOCB *acb)
     return acb->vote_ret;
 }
 
-static int read_fifo_child(QuorumAIOCB *acb)
+static int coroutine_fn GRAPH_RDLOCK read_fifo_child(QuorumAIOCB *acb)
 {
     BDRVQuorumState *s = acb->bs->opaque;
     int n, ret;
@@ -660,8 +669,9 @@ static int read_fifo_child(QuorumAIOCB *acb)
     return ret;
 }
 
-static int quorum_co_preadv(BlockDriverState *bs, uint64_t offset,
-                            uint64_t bytes, QEMUIOVector *qiov, int flags)
+static int coroutine_fn GRAPH_RDLOCK
+quorum_co_preadv(BlockDriverState *bs, int64_t offset, int64_t bytes,
+                 QEMUIOVector *qiov, BdrvRequestFlags flags)
 {
     BDRVQuorumState *s = bs->opaque;
     QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags);
@@ -680,7 +690,11 @@ static int quorum_co_preadv(BlockDriverState *bs, uint64_t offset,
     return ret;
 }
 
-static void write_quorum_entry(void *opaque)
+/*
+ * This function can count as GRAPH_RDLOCK because quorum_co_pwritev() holds the
+ * graph lock and keeps it until this coroutine has terminated.
+ */
+static void coroutine_fn GRAPH_RDLOCK write_quorum_entry(void *opaque)
 {
     QuorumCo *co = opaque;
     QuorumAIOCB *acb = co->acb;
@@ -689,8 +703,13 @@ static void write_quorum_entry(void *opaque)
     QuorumChildRequest *sacb = &acb->qcrs[i];
 
     sacb->bs = s->children[i]->bs;
-    sacb->ret = bdrv_co_pwritev(s->children[i], acb->offset, acb->bytes,
-                                acb->qiov, acb->flags);
+    if (acb->flags & BDRV_REQ_ZERO_WRITE) {
+        sacb->ret = bdrv_co_pwrite_zeroes(s->children[i], acb->offset,
+                                          acb->bytes, acb->flags);
+    } else {
+        sacb->ret = bdrv_co_pwritev(s->children[i], acb->offset, acb->bytes,
+                                    acb->qiov, acb->flags);
+    }
     if (sacb->ret == 0) {
         acb->success_count++;
     } else {
@@ -706,8 +725,9 @@ static void write_quorum_entry(void *opaque)
     }
 }
 
-static int quorum_co_pwritev(BlockDriverState *bs, uint64_t offset,
-                             uint64_t bytes, QEMUIOVector *qiov, int flags)
+static int coroutine_fn GRAPH_RDLOCK
+quorum_co_pwritev(BlockDriverState *bs, int64_t offset, int64_t bytes,
+                  QEMUIOVector *qiov, BdrvRequestFlags flags)
 {
     BDRVQuorumState *s = bs->opaque;
     QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags);
@@ -736,19 +756,28 @@ static int quorum_co_pwritev(BlockDriverState *bs, uint64_t offset,
     return ret;
 }
 
-static int64_t quorum_getlength(BlockDriverState *bs)
+static int coroutine_fn GRAPH_RDLOCK
+quorum_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset, int64_t bytes,
+                        BdrvRequestFlags flags)
+{
+    return quorum_co_pwritev(bs, offset, bytes, NULL,
+                             flags | BDRV_REQ_ZERO_WRITE);
+}
+
+static int64_t coroutine_fn GRAPH_RDLOCK
+quorum_co_getlength(BlockDriverState *bs)
 {
     BDRVQuorumState *s = bs->opaque;
     int64_t result;
     int i;
 
     /* check that all file have the same length */
-    result = bdrv_getlength(s->children[0]->bs);
+    result = bdrv_co_getlength(s->children[0]->bs);
     if (result < 0) {
         return result;
     }
     for (i = 1; i < s->num_children; i++) {
-        int64_t value = bdrv_getlength(s->children[i]->bs);
+        int64_t value = bdrv_co_getlength(s->children[i]->bs);
         if (value < 0) {
             return value;
         }
@@ -760,7 +789,7 @@ static int64_t quorum_getlength(BlockDriverState *bs)
     return result;
 }
 
-static coroutine_fn int quorum_co_flush(BlockDriverState *bs)
+static coroutine_fn GRAPH_RDLOCK int quorum_co_flush(BlockDriverState *bs)
 {
     BDRVQuorumState *s = bs->opaque;
     QuorumVoteVersion *winner = NULL;
@@ -796,8 +825,8 @@ static coroutine_fn int quorum_co_flush(BlockDriverState *bs)
     return result;
 }
 
-static bool quorum_recurse_can_replace(BlockDriverState *bs,
-                                       BlockDriverState *to_replace)
+static bool GRAPH_RDLOCK
+quorum_recurse_can_replace(BlockDriverState *bs, BlockDriverState *to_replace)
 {
     BDRVQuorumState *s = bs->opaque;
     int i;
@@ -854,7 +883,7 @@ static int quorum_valid_threshold(int threshold, int num_children, Error **errp)
 
     if (threshold < 1) {
         error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
-                   "vote-threshold", "value >= 1");
+                   "vote-threshold", "value >= 1");
         return -ERANGE;
     }
 
@@ -894,11 +923,25 @@ static QemuOptsList quorum_runtime_opts = {
     },
 };
 
+static void quorum_refresh_flags(BlockDriverState *bs)
+{
+    BDRVQuorumState *s = bs->opaque;
+    int i;
+
+    bs->supported_zero_flags =
+        BDRV_REQ_FUA | BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
+
+    for (i = 0; i < s->num_children; i++) {
+        bs->supported_zero_flags &= s->children[i]->bs->supported_zero_flags;
+    }
+
+    bs->supported_zero_flags |= BDRV_REQ_WRITE_UNCHANGED;
+}
+
 static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
                        Error **errp)
 {
     BDRVQuorumState *s = bs->opaque;
-    Error *local_err = NULL;
     QemuOpts *opts = NULL;
     const char *pattern_str;
     bool *opened;
@@ -910,27 +953,25 @@ static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
     /* count how many different children are present */
     s->num_children = qdict_array_entries(options, "children.");
     if (s->num_children < 0) {
-        error_setg(&local_err, "Option children is not a valid array");
+        error_setg(errp, "Option children is not a valid array");
         ret = -EINVAL;
         goto exit;
     }
     if (s->num_children < 1) {
-        error_setg(&local_err,
-                   "Number of provided children must be 1 or more");
+        error_setg(errp, "Number of provided children must be 1 or more");
         ret = -EINVAL;
         goto exit;
     }
 
     opts = qemu_opts_create(&quorum_runtime_opts, NULL, 0, &error_abort);
-    qemu_opts_absorb_qdict(opts, options, &local_err);
-    if (local_err) {
+    if (!qemu_opts_absorb_qdict(opts, options, errp)) {
         ret = -EINVAL;
         goto exit;
     }
 
     s->threshold = qemu_opt_get_number(opts, QUORUM_OPT_VOTE_THRESHOLD, 0);
     /* and validate it against s->num_children */
-    ret = quorum_valid_threshold(s->threshold, s->num_children, &local_err);
+    ret = quorum_valid_threshold(s->threshold, s->num_children, errp);
     if (ret < 0) {
         goto exit;
     }
@@ -943,7 +984,7 @@ static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
                               -EINVAL, NULL);
     }
     if (ret < 0) {
-        error_setg(&local_err, "Please set read-pattern as fifo or quorum");
+        error_setg(errp, "Please set read-pattern as fifo or quorum");
         goto exit;
     }
     s->read_pattern = ret;
@@ -951,7 +992,7 @@ static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
     if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
         s->is_blkverify = qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false);
         if (s->is_blkverify && (s->num_children != 2 || s->threshold != 2)) {
-            error_setg(&local_err, "blkverify=on can only be set if there are "
+            error_setg(errp, "blkverify=on can only be set if there are "
                        "exactly two files and vote-threshold is 2");
             ret = -EINVAL;
             goto exit;
@@ -960,7 +1001,7 @@ static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
         s->rewrite_corrupted = qemu_opt_get_bool(opts, QUORUM_OPT_REWRITE,
                                                  false);
         if (s->rewrite_corrupted && s->is_blkverify) {
-            error_setg(&local_err,
+            error_setg(errp,
                        "rewrite-corrupted=on cannot be used with blkverify=on");
             ret = -EINVAL;
             goto exit;
@@ -972,14 +1013,14 @@ static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
     opened = g_new0(bool, s->num_children);
 
     for (i = 0; i < s->num_children; i++) {
-        char indexstr[32];
-        ret = snprintf(indexstr, 32, "children.%d", i);
-        assert(ret < 32);
+        char indexstr[INDEXSTR_LEN];
+        ret = snprintf(indexstr, INDEXSTR_LEN, "children.%d", i);
+        assert(ret < INDEXSTR_LEN);
 
         s->children[i] = bdrv_open_child(NULL, options, indexstr, bs,
                                          &child_of_bds, BDRV_CHILD_DATA, false,
-                                         &local_err);
-        if (local_err) {
+                                         errp);
+        if (!s->children[i]) {
             ret = -EINVAL;
             goto close_exit;
         }
@@ -989,24 +1030,25 @@ static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
     s->next_child_index = s->num_children;
 
     bs->supported_write_flags = BDRV_REQ_WRITE_UNCHANGED;
+    quorum_refresh_flags(bs);
 
     g_free(opened);
     goto exit;
 
 close_exit:
     /* cleanup on error */
+    bdrv_graph_wrlock(NULL);
     for (i = 0; i < s->num_children; i++) {
         if (!opened[i]) {
             continue;
         }
         bdrv_unref_child(bs, s->children[i]);
     }
+    bdrv_graph_wrunlock();
     g_free(s->children);
     g_free(opened);
 exit:
     qemu_opts_del(opts);
-    /* propagate error */
-    error_propagate(errp, local_err);
     return ret;
 }
 
@@ -1015,19 +1057,21 @@ static void quorum_close(BlockDriverState *bs)
     BDRVQuorumState *s = bs->opaque;
     int i;
 
+    bdrv_graph_wrlock(NULL);
     for (i = 0; i < s->num_children; i++) {
         bdrv_unref_child(bs, s->children[i]);
     }
+    bdrv_graph_wrunlock();
 
     g_free(s->children);
 }
 
-static void quorum_add_child(BlockDriverState *bs, BlockDriverState *child_bs,
-                             Error **errp)
+static void GRAPH_WRLOCK
+quorum_add_child(BlockDriverState *bs, BlockDriverState *child_bs, Error **errp)
 {
     BDRVQuorumState *s = bs->opaque;
     BdrvChild *child;
-    char indexstr[32];
+    char indexstr[INDEXSTR_LEN];
     int ret;
 
     if (s->is_blkverify) {
@@ -1042,15 +1086,13 @@ static void quorum_add_child(BlockDriverState *bs, BlockDriverState *child_bs,
         return;
     }
 
-    ret = snprintf(indexstr, 32, "children.%u", s->next_child_index);
-    if (ret < 0 || ret >= 32) {
+    ret = snprintf(indexstr, INDEXSTR_LEN, "children.%u", s->next_child_index);
+    if (ret < 0 || ret >= INDEXSTR_LEN) {
         error_setg(errp, "cannot generate child name");
         return;
     }
     s->next_child_index++;
 
-    bdrv_drained_begin(bs);
-
     /* We can safely add the child now */
     bdrv_ref(child_bs);
 
@@ -1058,19 +1100,18 @@ static void quorum_add_child(BlockDriverState *bs, BlockDriverState *child_bs,
                               BDRV_CHILD_DATA, errp);
     if (child == NULL) {
         s->next_child_index--;
-        goto out;
+        return;
     }
     s->children = g_renew(BdrvChild *, s->children, s->num_children + 1);
     s->children[s->num_children++] = child;
-
-out:
-    bdrv_drained_end(bs);
+    quorum_refresh_flags(bs);
 }
 
-static void quorum_del_child(BlockDriverState *bs, BdrvChild *child,
-                             Error **errp)
+static void GRAPH_WRLOCK
+quorum_del_child(BlockDriverState *bs, BdrvChild *child, Error **errp)
 {
     BDRVQuorumState *s = bs->opaque;
+    char indexstr[INDEXSTR_LEN];
     int i;
 
     for (i = 0; i < s->num_children; i++) {
@@ -1092,15 +1133,19 @@ static void quorum_del_child(BlockDriverState *bs, BdrvChild *child,
     /* We know now that num_children > threshold, so blkverify must be false */
     assert(!s->is_blkverify);
 
-    bdrv_drained_begin(bs);
+    snprintf(indexstr, INDEXSTR_LEN, "children.%u", s->next_child_index - 1);
+    if (!strncmp(child->name, indexstr, INDEXSTR_LEN)) {
+        s->next_child_index--;
+    }
 
     /* We can safely remove this child now */
     memmove(&s->children[i], &s->children[i + 1],
             (s->num_children - i - 1) * sizeof(BdrvChild *));
     s->children = g_renew(BdrvChild *, s->children, --s->num_children);
+
     bdrv_unref_child(bs, child);
 
-    bdrv_drained_end(bs);
+    quorum_refresh_flags(bs);
 }
 
 static void quorum_gather_child_options(BlockDriverState *bs, QDict *target,
@@ -1158,7 +1203,12 @@ static void quorum_child_perm(BlockDriverState *bs, BdrvChild *c,
                               uint64_t perm, uint64_t shared,
                               uint64_t *nperm, uint64_t *nshared)
 {
+    BDRVQuorumState *s = bs->opaque;
+
     *nperm = perm & DEFAULT_PERM_PASSTHROUGH;
+    if (s->rewrite_corrupted) {
+        *nperm |= BLK_PERM_WRITE;
+    }
 
     /*
      * We cannot share RESIZE or WRITE, as this would make the
@@ -1169,6 +1219,55 @@ static void quorum_child_perm(BlockDriverState *bs, BdrvChild *c,
              | DEFAULT_PERM_UNCHANGED;
 }
 
+/*
+ * Each one of the children can report different status flags even
+ * when they contain the same data, so what this function does is
+ * return BDRV_BLOCK_ZERO if *all* children agree that a certain
+ * region contains zeroes, and BDRV_BLOCK_DATA otherwise.
+ */
+static int coroutine_fn GRAPH_RDLOCK
+quorum_co_block_status(BlockDriverState *bs, bool want_zero,
+                       int64_t offset, int64_t count,
+                       int64_t *pnum, int64_t *map, BlockDriverState **file)
+{
+    BDRVQuorumState *s = bs->opaque;
+    int i, ret;
+    int64_t pnum_zero = count;
+    int64_t pnum_data = 0;
+
+    for (i = 0; i < s->num_children; i++) {
+        int64_t bytes;
+        ret = bdrv_co_common_block_status_above(s->children[i]->bs, NULL, false,
+                                                want_zero, offset, count,
+                                                &bytes, NULL, NULL, NULL);
+        if (ret < 0) {
+            quorum_report_bad(QUORUM_OP_TYPE_READ, offset, count,
+                              s->children[i]->bs->node_name, ret);
+            pnum_data = count;
+            break;
+        }
+        /*
+         * Even if all children agree about whether there are zeroes
+         * or not at @offset they might disagree on the size, so use
+         * the smallest when reporting BDRV_BLOCK_ZERO and the largest
+         * when reporting BDRV_BLOCK_DATA.
+         */
+        if (ret & BDRV_BLOCK_ZERO) {
+            pnum_zero = MIN(pnum_zero, bytes);
+        } else {
+            pnum_data = MAX(pnum_data, bytes);
+        }
+    }
+
+    if (pnum_data) {
+        *pnum = pnum_data;
+        return BDRV_BLOCK_DATA;
+    } else {
+        *pnum = pnum_zero;
+        return BDRV_BLOCK_ZERO;
+    }
+}
+
 static const char *const quorum_strong_runtime_opts[] = {
     QUORUM_OPT_VOTE_THRESHOLD,
     QUORUM_OPT_BLKVERIFY,
@@ -1187,13 +1286,15 @@ static BlockDriver bdrv_quorum = {
     .bdrv_close                         = quorum_close,
     .bdrv_gather_child_options          = quorum_gather_child_options,
     .bdrv_dirname                       = quorum_dirname,
+    .bdrv_co_block_status               = quorum_co_block_status,
 
-    .bdrv_co_flush_to_disk              = quorum_co_flush,
+    .bdrv_co_flush                      = quorum_co_flush,
 
-    .bdrv_getlength                     = quorum_getlength,
+    .bdrv_co_getlength                  = quorum_co_getlength,
 
     .bdrv_co_preadv                     = quorum_co_preadv,
     .bdrv_co_pwritev                    = quorum_co_pwritev,
+    .bdrv_co_pwrite_zeroes              = quorum_co_pwrite_zeroes,
 
     .bdrv_add_child                     = quorum_add_child,
     .bdrv_del_child                     = quorum_del_child,