]> git.proxmox.com Git - mirror_qemu.git/blob - block/quorum.c
Merge remote-tracking branch 'remotes/qmp-unstable/queue/qmp' into staging
[mirror_qemu.git] / block / quorum.c
1 /*
2 * Quorum Block filter
3 *
4 * Copyright (C) 2012-2014 Nodalink, EURL.
5 *
6 * Author:
7 * Benoît Canet <benoit.canet@irqsave.net>
8 *
9 * Based on the design and code of blkverify.c (Copyright (C) 2010 IBM, Corp)
10 * and blkmirror.c (Copyright (C) 2011 Red Hat, Inc).
11 *
12 * This work is licensed under the terms of the GNU GPL, version 2 or later.
13 * See the COPYING file in the top-level directory.
14 */
15
16 #include <gnutls/gnutls.h>
17 #include <gnutls/crypto.h>
18 #include "block/block_int.h"
19 #include "qapi/qmp/qjson.h"
20 #include "qapi-event.h"
21
/* Size in bytes of the hash stored in QuorumVoteValue.h (SHA-256 digest) */
#define HASH_LENGTH 32

/* Names of the runtime options understood by the quorum driver */
#define QUORUM_OPT_VOTE_THRESHOLD "vote-threshold"
#define QUORUM_OPT_BLKVERIFY "blkverify"
26
27 /* This union holds a vote hash value */
28 typedef union QuorumVoteValue {
29 char h[HASH_LENGTH]; /* SHA-256 hash */
30 int64_t l; /* simpler 64 bits hash */
31 } QuorumVoteValue;
32
33 /* A vote item */
34 typedef struct QuorumVoteItem {
35 int index;
36 QLIST_ENTRY(QuorumVoteItem) next;
37 } QuorumVoteItem;
38
39 /* this structure is a vote version. A version is the set of votes sharing the
40 * same vote value.
41 * The set of votes will be tracked with the items field and its cardinality is
42 * vote_count.
43 */
44 typedef struct QuorumVoteVersion {
45 QuorumVoteValue value;
46 int index;
47 int vote_count;
48 QLIST_HEAD(, QuorumVoteItem) items;
49 QLIST_ENTRY(QuorumVoteVersion) next;
50 } QuorumVoteVersion;
51
52 /* this structure holds a group of vote versions together */
53 typedef struct QuorumVotes {
54 QLIST_HEAD(, QuorumVoteVersion) vote_list;
55 bool (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
56 } QuorumVotes;
57
58 /* the following structure holds the state of one quorum instance */
59 typedef struct BDRVQuorumState {
60 BlockDriverState **bs; /* children BlockDriverStates */
61 int num_children; /* children count */
62 int threshold; /* if less than threshold children reads gave the
63 * same result a quorum error occurs.
64 */
65 bool is_blkverify; /* true if the driver is in blkverify mode
66 * Writes are mirrored on two children devices.
67 * On reads the two children devices' contents are
68 * compared and if a difference is spotted its
69 * location is printed and the code aborts.
70 * It is useful to debug other block drivers by
71 * comparing them with a reference one.
72 */
73 } BDRVQuorumState;
74
75 typedef struct QuorumAIOCB QuorumAIOCB;
76
77 /* Quorum will create one instance of the following structure per operation it
78 * performs on its children.
79 * So for each read/write operation coming from the upper layer there will be
80 * $children_count QuorumChildRequest.
81 */
82 typedef struct QuorumChildRequest {
83 BlockDriverAIOCB *aiocb;
84 QEMUIOVector qiov;
85 uint8_t *buf;
86 int ret;
87 QuorumAIOCB *parent;
88 } QuorumChildRequest;
89
90 /* Quorum will use the following structure to track progress of each read/write
91 * operation received by the upper layer.
92 * This structure hold pointers to the QuorumChildRequest structures instances
93 * used to do operations on each children and track overall progress.
94 */
95 struct QuorumAIOCB {
96 BlockDriverAIOCB common;
97
98 /* Request metadata */
99 uint64_t sector_num;
100 int nb_sectors;
101
102 QEMUIOVector *qiov; /* calling IOV */
103
104 QuorumChildRequest *qcrs; /* individual child requests */
105 int count; /* number of completed AIOCB */
106 int success_count; /* number of successfully completed AIOCB */
107
108 QuorumVotes votes;
109
110 bool is_read;
111 int vote_ret;
112 };
113
114 static void quorum_vote(QuorumAIOCB *acb);
115
116 static void quorum_aio_cancel(BlockDriverAIOCB *blockacb)
117 {
118 QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
119 BDRVQuorumState *s = acb->common.bs->opaque;
120 int i;
121
122 /* cancel all callbacks */
123 for (i = 0; i < s->num_children; i++) {
124 bdrv_aio_cancel(acb->qcrs[i].aiocb);
125 }
126
127 g_free(acb->qcrs);
128 qemu_aio_release(acb);
129 }
130
131 static AIOCBInfo quorum_aiocb_info = {
132 .aiocb_size = sizeof(QuorumAIOCB),
133 .cancel = quorum_aio_cancel,
134 };
135
136 static void quorum_aio_finalize(QuorumAIOCB *acb)
137 {
138 BDRVQuorumState *s = acb->common.bs->opaque;
139 int i, ret = 0;
140
141 if (acb->vote_ret) {
142 ret = acb->vote_ret;
143 }
144
145 acb->common.cb(acb->common.opaque, ret);
146
147 if (acb->is_read) {
148 for (i = 0; i < s->num_children; i++) {
149 qemu_vfree(acb->qcrs[i].buf);
150 qemu_iovec_destroy(&acb->qcrs[i].qiov);
151 }
152 }
153
154 g_free(acb->qcrs);
155 qemu_aio_release(acb);
156 }
157
158 static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
159 {
160 return !memcmp(a->h, b->h, HASH_LENGTH);
161 }
162
163 static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b)
164 {
165 return a->l == b->l;
166 }
167
168 static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
169 BlockDriverState *bs,
170 QEMUIOVector *qiov,
171 uint64_t sector_num,
172 int nb_sectors,
173 BlockDriverCompletionFunc *cb,
174 void *opaque)
175 {
176 QuorumAIOCB *acb = qemu_aio_get(&quorum_aiocb_info, bs, cb, opaque);
177 int i;
178
179 acb->common.bs->opaque = s;
180 acb->sector_num = sector_num;
181 acb->nb_sectors = nb_sectors;
182 acb->qiov = qiov;
183 acb->qcrs = g_new0(QuorumChildRequest, s->num_children);
184 acb->count = 0;
185 acb->success_count = 0;
186 acb->votes.compare = quorum_sha256_compare;
187 QLIST_INIT(&acb->votes.vote_list);
188 acb->is_read = false;
189 acb->vote_ret = 0;
190
191 for (i = 0; i < s->num_children; i++) {
192 acb->qcrs[i].buf = NULL;
193 acb->qcrs[i].ret = 0;
194 acb->qcrs[i].parent = acb;
195 }
196
197 return acb;
198 }
199
200 static void quorum_report_bad(QuorumAIOCB *acb, char *node_name, int ret)
201 {
202 const char *msg = NULL;
203 if (ret < 0) {
204 msg = strerror(-ret);
205 }
206 qapi_event_send_quorum_report_bad(!!msg, msg, node_name,
207 acb->sector_num, acb->nb_sectors, &error_abort);
208 }
209
210 static void quorum_report_failure(QuorumAIOCB *acb)
211 {
212 const char *reference = acb->common.bs->device_name[0] ?
213 acb->common.bs->device_name :
214 acb->common.bs->node_name;
215
216 qapi_event_send_quorum_failure(reference, acb->sector_num,
217 acb->nb_sectors, &error_abort);
218 }
219
220 static int quorum_vote_error(QuorumAIOCB *acb);
221
222 static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb)
223 {
224 BDRVQuorumState *s = acb->common.bs->opaque;
225
226 if (acb->success_count < s->threshold) {
227 acb->vote_ret = quorum_vote_error(acb);
228 quorum_report_failure(acb);
229 return true;
230 }
231
232 return false;
233 }
234
235 static void quorum_aio_cb(void *opaque, int ret)
236 {
237 QuorumChildRequest *sacb = opaque;
238 QuorumAIOCB *acb = sacb->parent;
239 BDRVQuorumState *s = acb->common.bs->opaque;
240
241 sacb->ret = ret;
242 acb->count++;
243 if (ret == 0) {
244 acb->success_count++;
245 } else {
246 quorum_report_bad(acb, sacb->aiocb->bs->node_name, ret);
247 }
248 assert(acb->count <= s->num_children);
249 assert(acb->success_count <= s->num_children);
250 if (acb->count < s->num_children) {
251 return;
252 }
253
254 /* Do the vote on read */
255 if (acb->is_read) {
256 quorum_vote(acb);
257 } else {
258 quorum_has_too_much_io_failed(acb);
259 }
260
261 quorum_aio_finalize(acb);
262 }
263
264 static void quorum_report_bad_versions(BDRVQuorumState *s,
265 QuorumAIOCB *acb,
266 QuorumVoteValue *value)
267 {
268 QuorumVoteVersion *version;
269 QuorumVoteItem *item;
270
271 QLIST_FOREACH(version, &acb->votes.vote_list, next) {
272 if (acb->votes.compare(&version->value, value)) {
273 continue;
274 }
275 QLIST_FOREACH(item, &version->items, next) {
276 quorum_report_bad(acb, s->bs[item->index]->node_name, 0);
277 }
278 }
279 }
280
281 static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
282 {
283 int i;
284 assert(dest->niov == source->niov);
285 assert(dest->size == source->size);
286 for (i = 0; i < source->niov; i++) {
287 assert(dest->iov[i].iov_len == source->iov[i].iov_len);
288 memcpy(dest->iov[i].iov_base,
289 source->iov[i].iov_base,
290 source->iov[i].iov_len);
291 }
292 }
293
294 static void quorum_count_vote(QuorumVotes *votes,
295 QuorumVoteValue *value,
296 int index)
297 {
298 QuorumVoteVersion *v = NULL, *version = NULL;
299 QuorumVoteItem *item;
300
301 /* look if we have something with this hash */
302 QLIST_FOREACH(v, &votes->vote_list, next) {
303 if (votes->compare(&v->value, value)) {
304 version = v;
305 break;
306 }
307 }
308
309 /* It's a version not yet in the list add it */
310 if (!version) {
311 version = g_new0(QuorumVoteVersion, 1);
312 QLIST_INIT(&version->items);
313 memcpy(&version->value, value, sizeof(version->value));
314 version->index = index;
315 version->vote_count = 0;
316 QLIST_INSERT_HEAD(&votes->vote_list, version, next);
317 }
318
319 version->vote_count++;
320
321 item = g_new0(QuorumVoteItem, 1);
322 item->index = index;
323 QLIST_INSERT_HEAD(&version->items, item, next);
324 }
325
326 static void quorum_free_vote_list(QuorumVotes *votes)
327 {
328 QuorumVoteVersion *version, *next_version;
329 QuorumVoteItem *item, *next_item;
330
331 QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
332 QLIST_REMOVE(version, next);
333 QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
334 QLIST_REMOVE(item, next);
335 g_free(item);
336 }
337 g_free(version);
338 }
339 }
340
341 static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
342 {
343 int j, ret;
344 gnutls_hash_hd_t dig;
345 QEMUIOVector *qiov = &acb->qcrs[i].qiov;
346
347 ret = gnutls_hash_init(&dig, GNUTLS_DIG_SHA256);
348
349 if (ret < 0) {
350 return ret;
351 }
352
353 for (j = 0; j < qiov->niov; j++) {
354 ret = gnutls_hash(dig, qiov->iov[j].iov_base, qiov->iov[j].iov_len);
355 if (ret < 0) {
356 break;
357 }
358 }
359
360 gnutls_hash_deinit(dig, (void *) hash);
361 return ret;
362 }
363
364 static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
365 {
366 int max = 0;
367 QuorumVoteVersion *candidate, *winner = NULL;
368
369 QLIST_FOREACH(candidate, &votes->vote_list, next) {
370 if (candidate->vote_count > max) {
371 max = candidate->vote_count;
372 winner = candidate;
373 }
374 }
375
376 return winner;
377 }
378
379 /* qemu_iovec_compare is handy for blkverify mode because it returns the first
380 * differing byte location. Yet it is handcoded to compare vectors one byte
381 * after another so it does not benefit from the libc SIMD optimizations.
382 * quorum_iovec_compare is written for speed and should be used in the non
383 * blkverify mode of quorum.
384 */
385 static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
386 {
387 int i;
388 int result;
389
390 assert(a->niov == b->niov);
391 for (i = 0; i < a->niov; i++) {
392 assert(a->iov[i].iov_len == b->iov[i].iov_len);
393 result = memcmp(a->iov[i].iov_base,
394 b->iov[i].iov_base,
395 a->iov[i].iov_len);
396 if (result) {
397 return false;
398 }
399 }
400
401 return true;
402 }
403
404 static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
405 const char *fmt, ...)
406 {
407 va_list ap;
408
409 va_start(ap, fmt);
410 fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ",
411 acb->sector_num, acb->nb_sectors);
412 vfprintf(stderr, fmt, ap);
413 fprintf(stderr, "\n");
414 va_end(ap);
415 exit(1);
416 }
417
418 static bool quorum_compare(QuorumAIOCB *acb,
419 QEMUIOVector *a,
420 QEMUIOVector *b)
421 {
422 BDRVQuorumState *s = acb->common.bs->opaque;
423 ssize_t offset;
424
425 /* This driver will replace blkverify in this particular case */
426 if (s->is_blkverify) {
427 offset = qemu_iovec_compare(a, b);
428 if (offset != -1) {
429 quorum_err(acb, "contents mismatch in sector %" PRId64,
430 acb->sector_num +
431 (uint64_t)(offset / BDRV_SECTOR_SIZE));
432 }
433 return true;
434 }
435
436 return quorum_iovec_compare(a, b);
437 }
438
439 /* Do a vote to get the error code */
440 static int quorum_vote_error(QuorumAIOCB *acb)
441 {
442 BDRVQuorumState *s = acb->common.bs->opaque;
443 QuorumVoteVersion *winner = NULL;
444 QuorumVotes error_votes;
445 QuorumVoteValue result_value;
446 int i, ret = 0;
447 bool error = false;
448
449 QLIST_INIT(&error_votes.vote_list);
450 error_votes.compare = quorum_64bits_compare;
451
452 for (i = 0; i < s->num_children; i++) {
453 ret = acb->qcrs[i].ret;
454 if (ret) {
455 error = true;
456 result_value.l = ret;
457 quorum_count_vote(&error_votes, &result_value, i);
458 }
459 }
460
461 if (error) {
462 winner = quorum_get_vote_winner(&error_votes);
463 ret = winner->value.l;
464 }
465
466 quorum_free_vote_list(&error_votes);
467
468 return ret;
469 }
470
471 static void quorum_vote(QuorumAIOCB *acb)
472 {
473 bool quorum = true;
474 int i, j, ret;
475 QuorumVoteValue hash;
476 BDRVQuorumState *s = acb->common.bs->opaque;
477 QuorumVoteVersion *winner;
478
479 if (quorum_has_too_much_io_failed(acb)) {
480 return;
481 }
482
483 /* get the index of the first successful read */
484 for (i = 0; i < s->num_children; i++) {
485 if (!acb->qcrs[i].ret) {
486 break;
487 }
488 }
489
490 assert(i < s->num_children);
491
492 /* compare this read with all other successful reads stopping at quorum
493 * failure
494 */
495 for (j = i + 1; j < s->num_children; j++) {
496 if (acb->qcrs[j].ret) {
497 continue;
498 }
499 quorum = quorum_compare(acb, &acb->qcrs[i].qiov, &acb->qcrs[j].qiov);
500 if (!quorum) {
501 break;
502 }
503 }
504
505 /* Every successful read agrees */
506 if (quorum) {
507 quorum_copy_qiov(acb->qiov, &acb->qcrs[i].qiov);
508 return;
509 }
510
511 /* compute hashes for each successful read, also store indexes */
512 for (i = 0; i < s->num_children; i++) {
513 if (acb->qcrs[i].ret) {
514 continue;
515 }
516 ret = quorum_compute_hash(acb, i, &hash);
517 /* if ever the hash computation failed */
518 if (ret < 0) {
519 acb->vote_ret = ret;
520 goto free_exit;
521 }
522 quorum_count_vote(&acb->votes, &hash, i);
523 }
524
525 /* vote to select the most represented version */
526 winner = quorum_get_vote_winner(&acb->votes);
527
528 /* if the winner count is smaller than threshold the read fails */
529 if (winner->vote_count < s->threshold) {
530 quorum_report_failure(acb);
531 acb->vote_ret = -EIO;
532 goto free_exit;
533 }
534
535 /* we have a winner: copy it */
536 quorum_copy_qiov(acb->qiov, &acb->qcrs[winner->index].qiov);
537
538 /* some versions are bad print them */
539 quorum_report_bad_versions(s, acb, &winner->value);
540
541 free_exit:
542 /* free lists */
543 quorum_free_vote_list(&acb->votes);
544 }
545
546 static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
547 int64_t sector_num,
548 QEMUIOVector *qiov,
549 int nb_sectors,
550 BlockDriverCompletionFunc *cb,
551 void *opaque)
552 {
553 BDRVQuorumState *s = bs->opaque;
554 QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num,
555 nb_sectors, cb, opaque);
556 int i;
557
558 acb->is_read = true;
559
560 for (i = 0; i < s->num_children; i++) {
561 acb->qcrs[i].buf = qemu_blockalign(s->bs[i], qiov->size);
562 qemu_iovec_init(&acb->qcrs[i].qiov, qiov->niov);
563 qemu_iovec_clone(&acb->qcrs[i].qiov, qiov, acb->qcrs[i].buf);
564 }
565
566 for (i = 0; i < s->num_children; i++) {
567 bdrv_aio_readv(s->bs[i], sector_num, &acb->qcrs[i].qiov, nb_sectors,
568 quorum_aio_cb, &acb->qcrs[i]);
569 }
570
571 return &acb->common;
572 }
573
574 static BlockDriverAIOCB *quorum_aio_writev(BlockDriverState *bs,
575 int64_t sector_num,
576 QEMUIOVector *qiov,
577 int nb_sectors,
578 BlockDriverCompletionFunc *cb,
579 void *opaque)
580 {
581 BDRVQuorumState *s = bs->opaque;
582 QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num, nb_sectors,
583 cb, opaque);
584 int i;
585
586 for (i = 0; i < s->num_children; i++) {
587 acb->qcrs[i].aiocb = bdrv_aio_writev(s->bs[i], sector_num, qiov,
588 nb_sectors, &quorum_aio_cb,
589 &acb->qcrs[i]);
590 }
591
592 return &acb->common;
593 }
594
595 static int64_t quorum_getlength(BlockDriverState *bs)
596 {
597 BDRVQuorumState *s = bs->opaque;
598 int64_t result;
599 int i;
600
601 /* check that all file have the same length */
602 result = bdrv_getlength(s->bs[0]);
603 if (result < 0) {
604 return result;
605 }
606 for (i = 1; i < s->num_children; i++) {
607 int64_t value = bdrv_getlength(s->bs[i]);
608 if (value < 0) {
609 return value;
610 }
611 if (value != result) {
612 return -EIO;
613 }
614 }
615
616 return result;
617 }
618
619 static void quorum_invalidate_cache(BlockDriverState *bs, Error **errp)
620 {
621 BDRVQuorumState *s = bs->opaque;
622 Error *local_err = NULL;
623 int i;
624
625 for (i = 0; i < s->num_children; i++) {
626 bdrv_invalidate_cache(s->bs[i], &local_err);
627 if (local_err) {
628 error_propagate(errp, local_err);
629 return;
630 }
631 }
632 }
633
634 static coroutine_fn int quorum_co_flush(BlockDriverState *bs)
635 {
636 BDRVQuorumState *s = bs->opaque;
637 QuorumVoteVersion *winner = NULL;
638 QuorumVotes error_votes;
639 QuorumVoteValue result_value;
640 int i;
641 int result = 0;
642
643 QLIST_INIT(&error_votes.vote_list);
644 error_votes.compare = quorum_64bits_compare;
645
646 for (i = 0; i < s->num_children; i++) {
647 result = bdrv_co_flush(s->bs[i]);
648 result_value.l = result;
649 quorum_count_vote(&error_votes, &result_value, i);
650 }
651
652 winner = quorum_get_vote_winner(&error_votes);
653 result = winner->value.l;
654
655 quorum_free_vote_list(&error_votes);
656
657 return result;
658 }
659
660 static bool quorum_recurse_is_first_non_filter(BlockDriverState *bs,
661 BlockDriverState *candidate)
662 {
663 BDRVQuorumState *s = bs->opaque;
664 int i;
665
666 for (i = 0; i < s->num_children; i++) {
667 bool perm = bdrv_recurse_is_first_non_filter(s->bs[i],
668 candidate);
669 if (perm) {
670 return true;
671 }
672 }
673
674 return false;
675 }
676
677 static int quorum_valid_threshold(int threshold, int num_children, Error **errp)
678 {
679
680 if (threshold < 1) {
681 error_set(errp, QERR_INVALID_PARAMETER_VALUE,
682 "vote-threshold", "value >= 1");
683 return -ERANGE;
684 }
685
686 if (threshold > num_children) {
687 error_setg(errp, "threshold may not exceed children count");
688 return -ERANGE;
689 }
690
691 return 0;
692 }
693
694 static QemuOptsList quorum_runtime_opts = {
695 .name = "quorum",
696 .head = QTAILQ_HEAD_INITIALIZER(quorum_runtime_opts.head),
697 .desc = {
698 {
699 .name = QUORUM_OPT_VOTE_THRESHOLD,
700 .type = QEMU_OPT_NUMBER,
701 .help = "The number of vote needed for reaching quorum",
702 },
703 {
704 .name = QUORUM_OPT_BLKVERIFY,
705 .type = QEMU_OPT_BOOL,
706 .help = "Trigger block verify mode if set",
707 },
708 { /* end of list */ }
709 },
710 };
711
712 static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
713 Error **errp)
714 {
715 BDRVQuorumState *s = bs->opaque;
716 Error *local_err = NULL;
717 QemuOpts *opts;
718 bool *opened;
719 QDict *sub = NULL;
720 QList *list = NULL;
721 const QListEntry *lentry;
722 int i;
723 int ret = 0;
724
725 qdict_flatten(options);
726 qdict_extract_subqdict(options, &sub, "children.");
727 qdict_array_split(sub, &list);
728
729 if (qdict_size(sub)) {
730 error_setg(&local_err, "Invalid option children.%s",
731 qdict_first(sub)->key);
732 ret = -EINVAL;
733 goto exit;
734 }
735
736 /* count how many different children are present */
737 s->num_children = qlist_size(list);
738 if (s->num_children < 2) {
739 error_setg(&local_err,
740 "Number of provided children must be greater than 1");
741 ret = -EINVAL;
742 goto exit;
743 }
744
745 opts = qemu_opts_create(&quorum_runtime_opts, NULL, 0, &error_abort);
746 qemu_opts_absorb_qdict(opts, options, &local_err);
747 if (local_err) {
748 ret = -EINVAL;
749 goto exit;
750 }
751
752 s->threshold = qemu_opt_get_number(opts, QUORUM_OPT_VOTE_THRESHOLD, 0);
753
754 /* and validate it against s->num_children */
755 ret = quorum_valid_threshold(s->threshold, s->num_children, &local_err);
756 if (ret < 0) {
757 goto exit;
758 }
759
760 /* is the driver in blkverify mode */
761 if (qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false) &&
762 s->num_children == 2 && s->threshold == 2) {
763 s->is_blkverify = true;
764 } else if (qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false)) {
765 fprintf(stderr, "blkverify mode is set by setting blkverify=on "
766 "and using two files with vote_threshold=2\n");
767 }
768
769 /* allocate the children BlockDriverState array */
770 s->bs = g_new0(BlockDriverState *, s->num_children);
771 opened = g_new0(bool, s->num_children);
772
773 for (i = 0, lentry = qlist_first(list); lentry;
774 lentry = qlist_next(lentry), i++) {
775 QDict *d;
776 QString *string;
777
778 switch (qobject_type(lentry->value))
779 {
780 /* List of options */
781 case QTYPE_QDICT:
782 d = qobject_to_qdict(lentry->value);
783 QINCREF(d);
784 ret = bdrv_open(&s->bs[i], NULL, NULL, d, flags, NULL,
785 &local_err);
786 break;
787
788 /* QMP reference */
789 case QTYPE_QSTRING:
790 string = qobject_to_qstring(lentry->value);
791 ret = bdrv_open(&s->bs[i], NULL, qstring_get_str(string), NULL,
792 flags, NULL, &local_err);
793 break;
794
795 default:
796 error_setg(&local_err, "Specification of child block device %i "
797 "is invalid", i);
798 ret = -EINVAL;
799 }
800
801 if (ret < 0) {
802 goto close_exit;
803 }
804 opened[i] = true;
805 }
806
807 g_free(opened);
808 goto exit;
809
810 close_exit:
811 /* cleanup on error */
812 for (i = 0; i < s->num_children; i++) {
813 if (!opened[i]) {
814 continue;
815 }
816 bdrv_unref(s->bs[i]);
817 }
818 g_free(s->bs);
819 g_free(opened);
820 exit:
821 /* propagate error */
822 if (local_err) {
823 error_propagate(errp, local_err);
824 }
825 QDECREF(list);
826 QDECREF(sub);
827 return ret;
828 }
829
830 static void quorum_close(BlockDriverState *bs)
831 {
832 BDRVQuorumState *s = bs->opaque;
833 int i;
834
835 for (i = 0; i < s->num_children; i++) {
836 bdrv_unref(s->bs[i]);
837 }
838
839 g_free(s->bs);
840 }
841
842 static void quorum_detach_aio_context(BlockDriverState *bs)
843 {
844 BDRVQuorumState *s = bs->opaque;
845 int i;
846
847 for (i = 0; i < s->num_children; i++) {
848 bdrv_detach_aio_context(s->bs[i]);
849 }
850 }
851
852 static void quorum_attach_aio_context(BlockDriverState *bs,
853 AioContext *new_context)
854 {
855 BDRVQuorumState *s = bs->opaque;
856 int i;
857
858 for (i = 0; i < s->num_children; i++) {
859 bdrv_attach_aio_context(s->bs[i], new_context);
860 }
861 }
862
863 static BlockDriver bdrv_quorum = {
864 .format_name = "quorum",
865 .protocol_name = "quorum",
866
867 .instance_size = sizeof(BDRVQuorumState),
868
869 .bdrv_file_open = quorum_open,
870 .bdrv_close = quorum_close,
871
872 .bdrv_co_flush_to_disk = quorum_co_flush,
873
874 .bdrv_getlength = quorum_getlength,
875
876 .bdrv_aio_readv = quorum_aio_readv,
877 .bdrv_aio_writev = quorum_aio_writev,
878 .bdrv_invalidate_cache = quorum_invalidate_cache,
879
880 .bdrv_detach_aio_context = quorum_detach_aio_context,
881 .bdrv_attach_aio_context = quorum_attach_aio_context,
882
883 .is_filter = true,
884 .bdrv_recurse_is_first_non_filter = quorum_recurse_is_first_non_filter,
885 };
886
887 static void bdrv_quorum_init(void)
888 {
889 bdrv_register(&bdrv_quorum);
890 }
891
892 block_init(bdrv_quorum_init);