/*
 * QEMU Block driver for RADOS (Ceph)
 *
 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
 *                         Josh Durgin <josh.durgin@dreamhost.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2. See
 * the COPYING file in the top-level directory.
 *
 */

#include <inttypes.h>

#include "qemu-common.h"
#include "qemu-error.h"
#include "block_int.h"

#include <rbd/librbd.h>

/*
 * When specifying the image filename use:
 *
 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
 *
 * poolname must be the name of an existing rados pool.
 *
 * devicename is the name of the rbd image.
 *
 * Each option given is used to configure rados, and may be any valid
 * Ceph option, "id", or "conf".
 *
 * The "id" option indicates what user we should authenticate as to
 * the Ceph cluster. If it is excluded we will use the Ceph default
 * (normally 'admin').
 *
 * The "conf" option specifies a Ceph configuration file to read. If
 * it is not specified, we will read from the default Ceph locations
 * (e.g., /etc/ceph/ceph.conf). To avoid reading _any_ configuration
 * file, specify conf=/dev/null.
 *
 * Configuration values containing :, @, or = can be escaped with a
 * leading "\".
 */
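
/*
 * Illustrative examples of the syntax above. The pool, image, snapshot
 * and user names are hypothetical placeholders, not values defined by
 * this driver:
 *
 *   rbd:mypool/myimage
 *   rbd:mypool/myimage@mysnap
 *   rbd:mypool/myimage:id=myuser:conf=/etc/ceph/ceph.conf
 */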

#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

#define RBD_MAX_CONF_NAME_SIZE 128
#define RBD_MAX_CONF_VAL_SIZE 512
#define RBD_MAX_CONF_SIZE 1024
#define RBD_MAX_POOL_NAME_SIZE 128
#define RBD_MAX_SNAP_NAME_SIZE 128
#define RBD_MAX_SNAPS 100

typedef struct RBDAIOCB {
    BlockDriverAIOCB common;
    QEMUBH *bh;
    int ret;
    QEMUIOVector *qiov;
    char *bounce;
    int write;
    int64_t sector_num;
    int error;
    struct BDRVRBDState *s;
    int cancelled;
} RBDAIOCB;

typedef struct RADOSCB {
    int rcbid;
    RBDAIOCB *acb;
    struct BDRVRBDState *s;
    int done;
    int64_t size;
    char *buf;
    int ret;
} RADOSCB;

#define RBD_FD_READ 0
#define RBD_FD_WRITE 1

typedef struct BDRVRBDState {
    int fds[2];
    rados_t cluster;
    rados_ioctx_t io_ctx;
    rbd_image_t image;
    char name[RBD_MAX_IMAGE_NAME_SIZE];
    int qemu_aio_count;
    char *snap;
    int event_reader_pos;
    RADOSCB *event_rcb;
} BDRVRBDState;

static void rbd_aio_bh_cb(void *opaque);

static int qemu_rbd_next_tok(char *dst, int dst_len,
                             char *src, char delim,
                             const char *name,
                             char **p)
{
    int l;
    char *end;

    *p = NULL;

    if (delim != '\0') {
        end = strchr(src, delim);
        if (end) {
            *p = end + 1;
            *end = '\0';
        }
    }
    l = strlen(src);
    if (l >= dst_len) {
        error_report("%s too long", name);
        return -EINVAL;
    } else if (l == 0) {
        error_report("%s too short", name);
        return -EINVAL;
    }

    pstrcpy(dst, dst_len, src);

    return 0;
}

static int qemu_rbd_parsename(const char *filename,
                              char *pool, int pool_len,
                              char *snap, int snap_len,
                              char *name, int name_len,
                              char *conf, int conf_len)
{
    const char *start;
    char *p, *buf;
    int ret;

    if (!strstart(filename, "rbd:", &start)) {
        return -EINVAL;
    }

    buf = g_strdup(start);
    p = buf;
    *snap = '\0';
    *conf = '\0';

    ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p);
    if (ret < 0 || !p) {
        ret = -EINVAL;
        goto done;
    }

    if (strchr(p, '@')) {
        ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p);
        if (ret < 0) {
            goto done;
        }
        ret = qemu_rbd_next_tok(snap, snap_len, p, ':', "snap name", &p);
    } else {
        ret = qemu_rbd_next_tok(name, name_len, p, ':', "object name", &p);
    }
    if (ret < 0 || !p) {
        goto done;
    }

    ret = qemu_rbd_next_tok(conf, conf_len, p, '\0', "configuration", &p);

done:
    g_free(buf);
    return ret;
}
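
/*
 * A rough worked example of the parsing above, using a hypothetical
 * filename (the names are illustrative only):
 *
 *   rbd:mypool/myimage@mysnap:id=myuser:conf=/etc/ceph/ceph.conf
 *
 * is split by qemu_rbd_parsename() into
 *
 *   pool = "mypool"
 *   name = "myimage"
 *   snap = "mysnap"
 *   conf = "id=myuser:conf=/etc/ceph/ceph.conf"
 *
 * The conf string is later tokenized into name=value pairs by
 * qemu_rbd_set_conf() below.
 */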

static char *qemu_rbd_parse_clientname(const char *conf, char *clientname)
{
    const char *p = conf;

    while (*p) {
        int len;
        const char *end = strchr(p, ':');

        if (end) {
            len = end - p;
        } else {
            len = strlen(p);
        }

        if (strncmp(p, "id=", 3) == 0) {
            len -= 3;
            strncpy(clientname, p + 3, len);
            clientname[len] = '\0';
            return clientname;
        }
        if (end == NULL) {
            break;
        }
        p = end + 1;
    }
    return NULL;
}

static int qemu_rbd_set_conf(rados_t cluster, const char *conf)
{
    char *p, *buf;
    char name[RBD_MAX_CONF_NAME_SIZE];
    char value[RBD_MAX_CONF_VAL_SIZE];
    int ret = 0;

    buf = g_strdup(conf);
    p = buf;

    while (p) {
        ret = qemu_rbd_next_tok(name, sizeof(name), p,
                                '=', "conf option name", &p);
        if (ret < 0) {
            break;
        }

        if (!p) {
            error_report("conf option %s has no value", name);
            ret = -EINVAL;
            break;
        }

        ret = qemu_rbd_next_tok(value, sizeof(value), p,
                                ':', "conf option value", &p);
        if (ret < 0) {
            break;
        }

        if (strcmp(name, "conf") == 0) {
            ret = rados_conf_read_file(cluster, value);
            if (ret < 0) {
                error_report("error reading conf file %s", value);
                break;
            }
        } else if (strcmp(name, "id") == 0) {
            /* ignore, this is parsed by qemu_rbd_parse_clientname() */
        } else {
            ret = rados_conf_set(cluster, name, value);
            if (ret < 0) {
                error_report("invalid conf option %s", name);
                ret = -EINVAL;
                break;
            }
        }
    }

    g_free(buf);
    return ret;
}

static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options)
{
    int64_t bytes = 0;
    int64_t objsize;
    int obj_order = 0;
    char pool[RBD_MAX_POOL_NAME_SIZE];
    char name[RBD_MAX_IMAGE_NAME_SIZE];
    char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
    char conf[RBD_MAX_CONF_SIZE];
    char clientname_buf[RBD_MAX_CONF_SIZE];
    char *clientname;
    rados_t cluster;
    rados_ioctx_t io_ctx;
    int ret;

    if (qemu_rbd_parsename(filename, pool, sizeof(pool),
                           snap_buf, sizeof(snap_buf),
                           name, sizeof(name),
                           conf, sizeof(conf)) < 0) {
        return -EINVAL;
    }

    /* Read out options */
    while (options && options->name) {
        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
            bytes = options->value.n;
        } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
            if (options->value.n) {
                objsize = options->value.n;
                if ((objsize - 1) & objsize) { /* not a power of 2? */
                    error_report("obj size needs to be power of 2");
                    return -EINVAL;
                }
                if (objsize < 4096) {
                    error_report("obj size too small");
                    return -EINVAL;
                }
                obj_order = ffs(objsize) - 1;
            }
        }
        options++;
    }

    clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
    if (rados_create(&cluster, clientname) < 0) {
        error_report("error initializing");
        return -EIO;
    }

    if (strstr(conf, "conf=") == NULL) {
        /* try default location, but ignore failure */
        rados_conf_read_file(cluster, NULL);
    }

    if (conf[0] != '\0' &&
        qemu_rbd_set_conf(cluster, conf) < 0) {
        error_report("error setting config options");
        rados_shutdown(cluster);
        return -EIO;
    }

    if (rados_connect(cluster) < 0) {
        error_report("error connecting");
        rados_shutdown(cluster);
        return -EIO;
    }

    if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) {
        error_report("error opening pool %s", pool);
        rados_shutdown(cluster);
        return -EIO;
    }

    ret = rbd_create(io_ctx, name, bytes, &obj_order);
    rados_ioctx_destroy(io_ctx);
    rados_shutdown(cluster);

    return ret;
}
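
/*
 * For illustration only: image creation through this driver is typically
 * reached via qemu-img, e.g. something like
 *
 *   qemu-img create -f rbd rbd:mypool/myimage 10G
 *
 * where "mypool" and "myimage" are hypothetical names. The size maps to
 * BLOCK_OPT_SIZE and an optional cluster_size creation option maps to
 * BLOCK_OPT_CLUSTER_SIZE handled above.
 */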

/*
 * This aio completion is being called from qemu_rbd_aio_event_reader()
 * and runs in qemu context. It schedules a bh, but only if the aio was
 * not cancelled before.
 */
static void qemu_rbd_complete_aio(RADOSCB *rcb)
{
    RBDAIOCB *acb = rcb->acb;
    int64_t r;

    if (acb->cancelled) {
        qemu_vfree(acb->bounce);
        qemu_aio_release(acb);
        goto done;
    }

    r = rcb->ret;

    if (acb->write) {
        if (r < 0) {
            acb->ret = r;
            acb->error = 1;
        } else if (!acb->error) {
            acb->ret = rcb->size;
        }
    } else {
        if (r < 0) {
            memset(rcb->buf, 0, rcb->size);
            acb->ret = r;
            acb->error = 1;
        } else if (r < rcb->size) {
            memset(rcb->buf + r, 0, rcb->size - r);
            if (!acb->error) {
                acb->ret = rcb->size;
            }
        } else if (!acb->error) {
            acb->ret = r;
        }
    }
    /* Note that acb->bh can be NULL in case where the aio was cancelled */
    acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
    qemu_bh_schedule(acb->bh);
done:
    g_free(rcb);
}

/*
 * aio fd read handler. It runs in the qemu context and invokes completion
 * handling for completed rados aio operations.
 */
static void qemu_rbd_aio_event_reader(void *opaque)
{
    BDRVRBDState *s = opaque;

    ssize_t ret;

    do {
        char *p = (char *)&s->event_rcb;

        /* now read the rcb pointer that was sent from a non qemu thread */
        ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos,
                   sizeof(s->event_rcb) - s->event_reader_pos);
        if (ret > 0) {
            s->event_reader_pos += ret;
            if (s->event_reader_pos == sizeof(s->event_rcb)) {
                s->event_reader_pos = 0;
                qemu_rbd_complete_aio(s->event_rcb);
                s->qemu_aio_count--;
            }
        }
    } while (ret < 0 && errno == EINTR);
}

static int qemu_rbd_aio_flush_cb(void *opaque)
{
    BDRVRBDState *s = opaque;

    return (s->qemu_aio_count > 0);
}

static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags)
{
    BDRVRBDState *s = bs->opaque;
    char pool[RBD_MAX_POOL_NAME_SIZE];
    char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
    char conf[RBD_MAX_CONF_SIZE];
    char clientname_buf[RBD_MAX_CONF_SIZE];
    char *clientname;
    int r;

    if (qemu_rbd_parsename(filename, pool, sizeof(pool),
                           snap_buf, sizeof(snap_buf),
                           s->name, sizeof(s->name),
                           conf, sizeof(conf)) < 0) {
        return -EINVAL;
    }

    clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
    r = rados_create(&s->cluster, clientname);
    if (r < 0) {
        error_report("error initializing");
        return r;
    }

    s->snap = NULL;
    if (snap_buf[0] != '\0') {
        s->snap = g_strdup(snap_buf);
    }

    if (strstr(conf, "conf=") == NULL) {
        /* try default location, but ignore failure */
        rados_conf_read_file(s->cluster, NULL);
    }

    if (conf[0] != '\0') {
        r = qemu_rbd_set_conf(s->cluster, conf);
        if (r < 0) {
            error_report("error setting config options");
            goto failed_shutdown;
        }
    }

    r = rados_connect(s->cluster);
    if (r < 0) {
        error_report("error connecting");
        goto failed_shutdown;
    }

    r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);
    if (r < 0) {
        error_report("error opening pool %s", pool);
        goto failed_shutdown;
    }

    r = rbd_open(s->io_ctx, s->name, &s->image, s->snap);
    if (r < 0) {
        error_report("error reading header from %s", s->name);
        goto failed_open;
    }

    bs->read_only = (s->snap != NULL);

    s->event_reader_pos = 0;
    r = qemu_pipe(s->fds);
    if (r < 0) {
        error_report("error opening eventfd");
        goto failed;
    }
    fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
    fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader,
                            NULL, qemu_rbd_aio_flush_cb, NULL, s);

    return 0;

failed:
    rbd_close(s->image);
failed_open:
    rados_ioctx_destroy(s->io_ctx);
failed_shutdown:
    rados_shutdown(s->cluster);
    g_free(s->snap);
    return r;
}

static void qemu_rbd_close(BlockDriverState *bs)
{
    BDRVRBDState *s = bs->opaque;

    close(s->fds[0]);
    close(s->fds[1]);
    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL, NULL, NULL, NULL,
                            NULL);

    rbd_close(s->image);
    rados_ioctx_destroy(s->io_ctx);
    g_free(s->snap);
    rados_shutdown(s->cluster);
}

/*
 * Cancel aio. Since we don't reference acb from non-qemu threads,
 * it is safe to access it here.
 */
static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb)
{
    RBDAIOCB *acb = (RBDAIOCB *) blockacb;
    acb->cancelled = 1;
}

static AIOPool rbd_aio_pool = {
    .aiocb_size = sizeof(RBDAIOCB),
    .cancel = qemu_rbd_aio_cancel,
};

static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb)
{
    int ret = 0;
    while (1) {
        fd_set wfd;
        int fd = s->fds[RBD_FD_WRITE];

        /* send the rcb pointer to the qemu thread that is responsible
           for the aio/op completion, since the completion itself must
           run in a qemu thread context */
        ret = write(fd, (void *)&rcb, sizeof(rcb));
        if (ret >= 0) {
            break;
        }
        if (errno == EINTR) {
            continue;
        }
        if (errno != EAGAIN) {
            break;
        }

        FD_ZERO(&wfd);
        FD_SET(fd, &wfd);
        do {
            ret = select(fd + 1, NULL, &wfd, NULL, NULL);
        } while (ret < 0 && errno == EINTR);
    }

    return ret;
}

/*
 * This is the callback function for rbd_aio_read and _write
 *
 * Note: this function is being called from a non qemu thread so
 * we need to be careful about what we do here. Generally we only
 * write to the block notification pipe, and do the rest of the
 * io completion handling from qemu_rbd_aio_event_reader() which
 * runs in a qemu context.
 */
static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
{
    int ret;
    rcb->ret = rbd_aio_get_return_value(c);
    rbd_aio_release(c);
    ret = qemu_rbd_send_pipe(rcb->s, rcb);
    if (ret < 0) {
        error_report("failed writing to acb->s->fds");
        g_free(rcb);
    }
}
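
/*
 * Rough sketch of the completion flow implemented above (a reading aid,
 * not an authoritative design note):
 *
 *   librbd callback thread: rbd_finish_aiocb()
 *       -> qemu_rbd_send_pipe() writes the RADOSCB pointer to
 *          s->fds[RBD_FD_WRITE]
 *   qemu thread: qemu_rbd_aio_event_reader() reassembles the pointer from
 *          s->fds[RBD_FD_READ]
 *       -> qemu_rbd_complete_aio() records the result and schedules a bh
 *       -> rbd_aio_bh_cb() copies the bounce buffer (for reads) and calls
 *          the guest completion callback
 */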

/* Callback when all queued rbd_aio requests are complete */

static void rbd_aio_bh_cb(void *opaque)
{
    RBDAIOCB *acb = opaque;

    if (!acb->write) {
        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
    }
    qemu_vfree(acb->bounce);
    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
    qemu_bh_delete(acb->bh);
    acb->bh = NULL;

    qemu_aio_release(acb);
}

static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
                                           int64_t sector_num,
                                           QEMUIOVector *qiov,
                                           int nb_sectors,
                                           BlockDriverCompletionFunc *cb,
                                           void *opaque, int write)
{
    RBDAIOCB *acb;
    RADOSCB *rcb;
    rbd_completion_t c;
    int64_t off, size;
    char *buf;
    int r;

    BDRVRBDState *s = bs->opaque;

    acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
    if (!acb) {
        return NULL;
    }
    acb->write = write;
    acb->qiov = qiov;
    acb->bounce = qemu_blockalign(bs, qiov->size);
    acb->ret = 0;
    acb->error = 0;
    acb->s = s;
    acb->cancelled = 0;
    acb->bh = NULL;

    if (write) {
        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
    }

    buf = acb->bounce;

    off = sector_num * BDRV_SECTOR_SIZE;
    size = nb_sectors * BDRV_SECTOR_SIZE;

    s->qemu_aio_count++; /* All the RADOSCB */

    rcb = g_malloc(sizeof(RADOSCB));
    rcb->done = 0;
    rcb->acb = acb;
    rcb->buf = buf;
    rcb->s = acb->s;
    rcb->size = size;
    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
    if (r < 0) {
        goto failed;
    }

    if (write) {
        r = rbd_aio_write(s->image, off, size, buf, c);
    } else {
        r = rbd_aio_read(s->image, off, size, buf, c);
    }

    if (r < 0) {
        goto failed;
    }

    return &acb->common;

failed:
    g_free(rcb);
    s->qemu_aio_count--;
    qemu_aio_release(acb);
    return NULL;
}

static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
                                            int64_t sector_num,
                                            QEMUIOVector *qiov,
                                            int nb_sectors,
                                            BlockDriverCompletionFunc *cb,
                                            void *opaque)
{
    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
}

static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
                                             int64_t sector_num,
                                             QEMUIOVector *qiov,
                                             int nb_sectors,
                                             BlockDriverCompletionFunc *cb,
                                             void *opaque)
{
    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
}

static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
{
    BDRVRBDState *s = bs->opaque;
    rbd_image_info_t info;
    int r;

    r = rbd_stat(s->image, &info, sizeof(info));
    if (r < 0) {
        return r;
    }

    bdi->cluster_size = info.obj_size;
    return 0;
}

static int64_t qemu_rbd_getlength(BlockDriverState *bs)
{
    BDRVRBDState *s = bs->opaque;
    rbd_image_info_t info;
    int r;

    r = rbd_stat(s->image, &info, sizeof(info));
    if (r < 0) {
        return r;
    }

    return info.size;
}

static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset)
{
    BDRVRBDState *s = bs->opaque;
    int r;

    r = rbd_resize(s->image, offset);
    if (r < 0) {
        return r;
    }

    return 0;
}

static int qemu_rbd_snap_create(BlockDriverState *bs,
                                QEMUSnapshotInfo *sn_info)
{
    BDRVRBDState *s = bs->opaque;
    int r;

    if (sn_info->name[0] == '\0') {
        return -EINVAL; /* we need a name for rbd snapshots */
    }

    /*
     * rbd snapshots use the name as the user-controlled unique identifier;
     * we can't use the rbd snapid for that purpose, as it can't be set
     */
    if (sn_info->id_str[0] != '\0' &&
        strcmp(sn_info->id_str, sn_info->name) != 0) {
        return -EINVAL;
    }

    if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
        return -ERANGE;
    }

    r = rbd_snap_create(s->image, sn_info->name);
    if (r < 0) {
        error_report("failed to create snap: %s", strerror(-r));
        return r;
    }

    return 0;
}

static int qemu_rbd_snap_list(BlockDriverState *bs,
                              QEMUSnapshotInfo **psn_tab)
{
    BDRVRBDState *s = bs->opaque;
    QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
    int i, snap_count;
    rbd_snap_info_t *snaps;
    int max_snaps = RBD_MAX_SNAPS;

    do {
        snaps = g_malloc(sizeof(*snaps) * max_snaps);
        snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
        if (snap_count < 0) {
            g_free(snaps);
        }
    } while (snap_count == -ERANGE);

    if (snap_count <= 0) {
        return snap_count;
    }

    sn_tab = g_malloc0(snap_count * sizeof(QEMUSnapshotInfo));

    for (i = 0; i < snap_count; i++) {
        const char *snap_name = snaps[i].name;

        sn_info = sn_tab + i;
        pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
        pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);

        sn_info->vm_state_size = snaps[i].size;
        sn_info->date_sec = 0;
        sn_info->date_nsec = 0;
        sn_info->vm_clock_nsec = 0;
    }
    rbd_snap_list_end(snaps);

    *psn_tab = sn_tab;
    return snap_count;
}

static QEMUOptionParameter qemu_rbd_create_options[] = {
    {
        .name = BLOCK_OPT_SIZE,
        .type = OPT_SIZE,
        .help = "Virtual disk size"
    },
    {
        .name = BLOCK_OPT_CLUSTER_SIZE,
        .type = OPT_SIZE,
        .help = "RBD object size"
    },
    {NULL}
};

static BlockDriver bdrv_rbd = {
    .format_name        = "rbd",
    .instance_size      = sizeof(BDRVRBDState),
    .bdrv_file_open     = qemu_rbd_open,
    .bdrv_close         = qemu_rbd_close,
    .bdrv_create        = qemu_rbd_create,
    .bdrv_get_info      = qemu_rbd_getinfo,
    .create_options     = qemu_rbd_create_options,
    .bdrv_getlength     = qemu_rbd_getlength,
    .bdrv_truncate      = qemu_rbd_truncate,
    .protocol_name      = "rbd",

    .bdrv_aio_readv     = qemu_rbd_aio_readv,
    .bdrv_aio_writev    = qemu_rbd_aio_writev,

    .bdrv_snapshot_create = qemu_rbd_snap_create,
    .bdrv_snapshot_list = qemu_rbd_snap_list,
};

static void bdrv_rbd_init(void)
{
    bdrv_register(&bdrv_rbd);
}

block_init(bdrv_rbd_init);