/*
 * QEMU Block driver for Archipelago
 *
 * Copyright (C) 2014 Chrysostomos Nanakos <cnanakos@grnet.gr>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 *
 * VM Image on Archipelago volume is specified like this:
 *
 * file.driver=archipelago,file.volume=<volumename>
 * [,file.mport=<mapperd_port>[,file.vport=<vlmcd_port>]
 * [,file.segment=<segment_name>]]
 *
 * 'archipelago' is the protocol.
 *
 * 'mport' is the port number on which mapperd is listening. This is optional
 * and if not specified, QEMU will make Archipelago to use the default port.
 *
 * 'vport' is the port number on which vlmcd is listening. This is optional
 * and if not specified, QEMU will make Archipelago to use the default port.
 *
 * 'segment' is the name of the shared memory segment Archipelago stack
 * is using. This is optional and if not specified, QEMU will make Archipelago
 * to use the default value, 'archipelago'.
 *
 * Examples:
 *
 * file.driver=archipelago,file.volume=my_vm_volume
 * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123
 * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
 * file.vport=1234
 * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
 * file.vport=1234,file.segment=my_segment
 */
40 #include "block/block_int.h"
41 #include "qemu/error-report.h"
42 #include "qemu/thread.h"
43 #include "qapi/qmp/qint.h"
44 #include "qapi/qmp/qstring.h"
45 #include "qapi/qmp/qjson.h"
48 #include <xseg/xseg.h>
49 #include <xseg/protocol.h>
/* Indices for the two ends of a pipe/fd pair (read/write) — presumably a
 * notification pipe; confirm against the full driver source. */
#define ARCHIP_FD_READ      0
#define ARCHIP_FD_WRITE     1
/* Logical I/O larger than this many bytes is split into segments. */
#define MAX_REQUEST_SIZE    524288

/* Runtime option names accepted by the driver (see archipelago_runtime_opts). */
#define ARCHIPELAGO_OPT_VOLUME      "volume"
#define ARCHIPELAGO_OPT_SEGMENT     "segment"
#define ARCHIPELAGO_OPT_MPORT       "mport"
#define ARCHIPELAGO_OPT_VPORT       "vport"
/* Default xseg ports for mapperd and vlmcd when not given as options. */
#define ARCHIPELAGO_DFL_MPORT       1001
#define ARCHIPELAGO_DFL_VPORT       501

/*
 * Log an error to stderr, prefixed with the driver name and the calling
 * function.  Wrapped in do { } while (0) so the macro expands to a single
 * statement and is safe in unbraced if/else bodies.
 */
#define archipelagolog(fmt, ...) \
    do { \
        fprintf(stderr, "archipelago\t%-24s: " fmt, __func__, ##__VA_ARGS__); \
    } while (0)
/*
 * Per-request asynchronous I/O control block.
 * NOTE(review): this definition appears truncated in this copy — fields
 * referenced elsewhere in the file (bh, qiov, ret, status, cancelled) and
 * the closing brace are not visible.
 */
typedef struct ArchipelagoAIOCB {
    BlockDriverAIOCB common;        /* generic QEMU AIOCB header; first member */
    struct BDRVArchipelagoState *s; /* back-pointer to the owning driver state */
/*
 * Per-BlockDriverState driver state.
 * NOTE(review): this copy appears truncated — fields used elsewhere in the
 * file (xseg, srcport, sport, mportno, vportno, volname, segment_name,
 * archip_cond, is_signaled, stopping, th_is_signaled) are not visible here.
 */
typedef struct BDRVArchipelagoState {
    ArchipelagoAIOCB *event_acb;
    /* Archipelago specific */
    struct xseg_port *port;     /* dynamically bound xseg source port */
    QemuMutex archip_mutex;     /* with archip_cond: guards the volinfo wait */
    /* Request handler specific */
    QemuThread request_th;      /* xseg reply-handler thread */
    QemuCond request_cond;      /* signalled when the handler thread exits */
    QemuMutex request_mutex;    /* guards th_is_signaled */
} BDRVArchipelagoState;
/*
 * Tracks one logical I/O request that was split into MAX_REQUEST_SIZE
 * segments.  NOTE(review): the fields used elsewhere in this file
 * (count, total, ref, failed) are missing from this copy — truncated.
 */
typedef struct ArchipelagoSegmentedRequest {
} ArchipelagoSegmentedRequest;
/*
 * Bookkeeping attached to each individual xseg request via
 * xseg_set_req_data()/xseg_get_req_data().
 * NOTE(review): fields used elsewhere in this file (op, volname, offset,
 * size, bufidx) and the closing brace are not visible here — truncated.
 */
typedef struct AIORequestData {
    ArchipelagoAIOCB *aio_cb;            /* owning AIO control block */
    ArchipelagoSegmentedRequest *segreq; /* parent segmented request */
/* Forward declaration: bottom-half completion callback, defined below. */
static void qemu_archipelago_complete_aio(void *opaque);
129 static void init_local_signal(struct xseg
*xseg
, xport sport
, xport srcport
)
131 if (xseg
&& (sport
!= srcport
)) {
132 xseg_init_local_signal(xseg
, srcport
);
137 static void archipelago_finish_aiocb(AIORequestData
*reqdata
)
139 if (reqdata
->aio_cb
->ret
!= reqdata
->segreq
->total
) {
140 reqdata
->aio_cb
->ret
= -EIO
;
141 } else if (reqdata
->aio_cb
->ret
== reqdata
->segreq
->total
) {
142 reqdata
->aio_cb
->ret
= 0;
144 reqdata
->aio_cb
->bh
= aio_bh_new(
145 bdrv_get_aio_context(reqdata
->aio_cb
->common
.bs
),
146 qemu_archipelago_complete_aio
, reqdata
148 qemu_bh_schedule(reqdata
->aio_cb
->bh
);
/*
 * Wait (with xseg signalling) until @expected_req is served on @srcport.
 * NOTE(review): this copy looks truncated — the surrounding receive loop,
 * the NULL-req check, the XS_SERVED failure/exit paths and the return
 * statements are not visible.
 */
static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
                      struct xseg_request *expected_req)
    struct xseg_request *req;
    xseg_prepare_wait(xseg, srcport);
    void *psd = xseg_get_signal_desc(xseg, port);
        /* Non-blocking receive; returns NULL while nothing is pending. */
        req = xseg_receive(xseg, srcport, X_NONBLOCK);
        if (req != expected_req) {
            /* A reply we did not ask for: log and recycle the slot. */
            archipelagolog("Unknown received request\n");
            xseg_put_request(xseg, req, srcport);
        } else if (!(req->state & XS_SERVED)) {
        /* Sleep up to 100ms on the xseg signal, then re-check. */
        xseg_wait_signal(xseg, psd, 100000UL);
    xseg_cancel_wait(xseg, srcport);
/*
 * Body of the dedicated xseg reply-handler thread.  Receives completed
 * xseg requests on s->srcport and finishes the corresponding AIOCBs;
 * also wakes the synchronous volinfo waiter.
 *
 * NOTE(review): this copy looks truncated — the NULL-req check, the
 * ARCHIP_OP_READ case label, the 'data' declaration, the trailing
 * qemu_iovec_from_buf() arguments, break statements, g_free() calls and
 * several closing braces are not visible.
 */
static void xseg_request_handler(void *state)
    BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
    void *psd = xseg_get_signal_desc(s->xseg, s->port);
    qemu_mutex_lock(&s->request_mutex);

    /* Run until qemu_archipelago_close() sets s->stopping. */
    while (!s->stopping) {
        struct xseg_request *req;
        xseg_prepare_wait(s->xseg, s->srcport);
        req = xseg_receive(s->xseg, s->srcport, X_NONBLOCK);
            AIORequestData *reqdata;
            ArchipelagoSegmentedRequest *segreq;
            /* Recover the bookkeeping attached at submission time. */
            xseg_get_req_data(s->xseg, req, (void **)&reqdata);

            switch (reqdata->op) {
                /* Read completion: copy payload back into the guest iovec. */
                data = xseg_get_data(s->xseg, req);
                segreq = reqdata->segreq;
                segreq->count += req->serviced;
                qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
                xseg_put_request(s->xseg, req, s->srcport);
                /* Last outstanding segment drops the refcount to zero. */
                if ((__sync_add_and_fetch(&segreq->ref, -1)) == 0) {
                    if (!segreq->failed) {
                        reqdata->aio_cb->ret = segreq->count;
                        archipelago_finish_aiocb(reqdata);
            case ARCHIP_OP_WRITE:
            case ARCHIP_OP_FLUSH:
                segreq = reqdata->segreq;
                segreq->count += req->serviced;
                xseg_put_request(s->xseg, req, s->srcport);
                if ((__sync_add_and_fetch(&segreq->ref, -1)) == 0) {
                    if (!segreq->failed) {
                        reqdata->aio_cb->ret = segreq->count;
                        archipelago_finish_aiocb(reqdata);
            case ARCHIP_OP_VOLINFO:
                /* Wake the thread blocked in archipelago_volume_info(). */
                s->is_signaled = true;
                qemu_cond_signal(&s->archip_cond);
        /* Nothing pending: sleep up to 100ms on the xseg signal. */
        xseg_wait_signal(s->xseg, psd, 100000UL);
        xseg_cancel_wait(s->xseg, s->srcport);

    /* Tell qemu_archipelago_close() that we are done, then exit. */
    s->th_is_signaled = true;
    qemu_cond_signal(&s->request_cond);
    qemu_mutex_unlock(&s->request_mutex);
    qemu_thread_exit(NULL);
/*
 * Bring up the xseg machinery: initialize the library, join the shared
 * memory segment named s->segment_name and bind a dynamic source port.
 *
 * NOTE(review): truncated copy — the xseg_join() trailing arguments, the
 * error-path braces and the return statements are not visible.
 */
static int qemu_archipelago_xseg_init(BDRVArchipelagoState *s)
    if (xseg_initialize()) {
        archipelagolog("Cannot initialize XSEG\n");
    s->xseg = xseg_join("posix", s->segment_name,
        archipelagolog("Cannot join XSEG shared memory segment\n");
    s->port = xseg_bind_dynport(s->xseg);
    s->srcport = s->port->portno;
    init_local_signal(s->xseg, s->sport, s->srcport);
/*
 * Driver-level initialization: set up xseg, then the condition variables
 * and mutexes, and spawn the xseg reply-handler thread.
 *
 * NOTE(review): truncated copy — the 'ret' declaration, the error-path
 * braces and the final return are not visible.
 */
static int qemu_archipelago_init(BDRVArchipelagoState *s)
    ret = qemu_archipelago_xseg_init(s);
        error_report("Cannot initialize XSEG. Aborting...\n");

    qemu_cond_init(&s->archip_cond);
    qemu_mutex_init(&s->archip_mutex);
    qemu_cond_init(&s->request_cond);
    qemu_mutex_init(&s->request_mutex);
    s->th_is_signaled = false;
    /* Joinable so qemu_archipelago_close() can wait for it to exit. */
    qemu_thread_create(&s->request_th, "xseg_io_th",
                       (void *) xseg_request_handler,
                       (void *) s, QEMU_THREAD_JOINABLE);
/*
 * Bottom-half completion callback: runs in the BDS AioContext, deletes
 * the BH, invokes the caller's completion callback and releases the
 * AIOCB unless a canceller owns it.
 *
 * NOTE(review): truncated copy — aio_cb->status is never cleared here
 * and reqdata is not freed, which would make qemu_archipelago_aio_cancel()
 * spin forever and leak the AIORequestData; presumably the missing lines
 * did both.  Confirm against the original source.
 */
static void qemu_archipelago_complete_aio(void *opaque)
    AIORequestData *reqdata = (AIORequestData *) opaque;
    ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) reqdata->aio_cb;

    qemu_bh_delete(aio_cb->bh);
    aio_cb->common.cb(aio_cb->common.opaque, aio_cb->ret);

    /* If cancelled, qemu_archipelago_aio_cancel() releases the AIOCB. */
    if (!aio_cb->cancelled) {
        qemu_aio_release(aio_cb);
/*
 * Runtime options accepted via -drive file.driver=archipelago,...
 * NOTE(review): truncated copy — the '.desc = {' opener and the
 * per-option '{' '}' braces are not visible here.
 */
static QemuOptsList archipelago_runtime_opts = {
    .name = "archipelago",
    .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
        .name = ARCHIPELAGO_OPT_VOLUME,
        .type = QEMU_OPT_STRING,
        .help = "Name of the volume image",
        .name = ARCHIPELAGO_OPT_SEGMENT,
        .type = QEMU_OPT_STRING,
        .help = "Name of the Archipelago shared memory segment",
        .name = ARCHIPELAGO_OPT_MPORT,
        .type = QEMU_OPT_NUMBER,
        .help = "Archipelago mapperd port number"
        .name = ARCHIPELAGO_OPT_VPORT,
        .type = QEMU_OPT_NUMBER,
        .help = "Archipelago vlmcd port number"
    { /* end of list */ }
/*
 * Open a volume: parse the runtime options (volume, segment, mport,
 * vport), then initialize xseg and join the shared memory segment.
 *
 * NOTE(review): truncated copy — the remaining parameters (options,
 * flags, errp), the 'ret'/'opts' declarations, several error branches,
 * the cleanup labels and the returns are not visible.
 */
static int qemu_archipelago_open(BlockDriverState *bs,
    const char *volume, *segment_name;
    Error *local_err = NULL;
    BDRVArchipelagoState *s = bs->opaque;

    opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
    qemu_opts_absorb_qdict(opts, options, &local_err);
        error_propagate(errp, local_err);

    /* Port numbers fall back to the Archipelago defaults. */
    s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT,
                                     ARCHIPELAGO_DFL_MPORT);
    s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT,
                                     ARCHIPELAGO_DFL_VPORT);

    segment_name = qemu_opt_get(opts, ARCHIPELAGO_OPT_SEGMENT);
    if (segment_name == NULL) {
        s->segment_name = g_strdup("archipelago");  /* default segment name */
        s->segment_name = g_strdup(segment_name);

    /* 'volume' is the only mandatory option. */
    volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
    if (volume == NULL) {
        error_setg(errp, "archipelago block driver requires the 'volume'"
    s->volname = g_strdup(volume);

    /* Initialize XSEG, join shared memory segment */
    ret = qemu_archipelago_init(s);
        error_setg(errp, "cannot initialize XSEG and join shared "

    /* Error path: release the duplicated segment name. */
    g_free(s->segment_name);
/*
 * Close the volume: stop and join the reply-handler thread, destroy the
 * synchronization primitives, send a close request for the volume and
 * release the xseg port/segment resources.
 *
 * NOTE(review): truncated copy — setting s->stopping, the declarations
 * of targetlen/r/target, the req->op assignment, the error-path braces
 * and gotos, the g_free() of volname and the end-of-function labels are
 * not visible.
 */
static void qemu_archipelago_close(BlockDriverState *bs)
    struct xseg_request *req;
    BDRVArchipelagoState *s = bs->opaque;

    /* Wait for xseg_request_handler() to signal that it has exited. */
    qemu_mutex_lock(&s->request_mutex);
    while (!s->th_is_signaled) {
        qemu_cond_wait(&s->request_cond,
    qemu_mutex_unlock(&s->request_mutex);
    qemu_thread_join(&s->request_th);
    qemu_cond_destroy(&s->request_cond);
    qemu_mutex_destroy(&s->request_mutex);

    qemu_cond_destroy(&s->archip_cond);
    qemu_mutex_destroy(&s->archip_mutex);

    /* Build and submit a close request for the volume to vlmcd. */
    targetlen = strlen(s->volname);
    req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
        archipelagolog("Cannot get XSEG request\n");
    r = xseg_prep_request(s->xseg, req, targetlen, 0);
        xseg_put_request(s->xseg, req, s->srcport);
        archipelagolog("Cannot prepare XSEG close request\n");

    target = xseg_get_target(s->xseg, req);
    memcpy(target, s->volname, targetlen);
    req->size = req->datalen;

    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
        xseg_put_request(s->xseg, req, s->srcport);
        archipelagolog("Cannot submit XSEG close request\n");

    xseg_signal(s->xseg, p);
    /* Synchronously wait for the close reply before tearing down. */
    wait_reply(s->xseg, s->srcport, s->port, req);
    xseg_put_request(s->xseg, req, s->srcport);

    g_free(s->segment_name);
    xseg_quit_local_signal(s->xseg, s->srcport);
    xseg_leave_dynport(s->xseg, s->port);
460 static void qemu_archipelago_aio_cancel(BlockDriverAIOCB
*blockacb
)
462 ArchipelagoAIOCB
*aio_cb
= (ArchipelagoAIOCB
*) blockacb
;
463 aio_cb
->cancelled
= true;
464 while (aio_cb
->status
== -EINPROGRESS
) {
465 aio_poll(bdrv_get_aio_context(aio_cb
->common
.bs
), true);
467 qemu_aio_release(aio_cb
);
470 static const AIOCBInfo archipelago_aiocb_info
= {
471 .aiocb_size
= sizeof(ArchipelagoAIOCB
),
472 .cancel
= qemu_archipelago_aio_cancel
,
/*
 * Build and submit one xseg request for a single segment of a logical
 * read/write/flush against s->volname.
 *
 * NOTE(review): truncated copy — the remaining parameters (bufidx,
 * count, offset, op), the declarations of targetlen/ret/target/data,
 * the op switch with req->op assignments, the error gotos and the
 * returns are not visible.
 */
static int archipelago_submit_request(BDRVArchipelagoState *s,
                                      ArchipelagoAIOCB *aio_cb,
                                      ArchipelagoSegmentedRequest *segreq,
    struct xseg_request *req;
    AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));

    targetlen = strlen(s->volname);
    req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
        archipelagolog("Cannot get XSEG request\n");
    ret = xseg_prep_request(s->xseg, req, targetlen, count);
        archipelagolog("Cannot prepare XSEG request\n");
    target = xseg_get_target(s->xseg, req);
        archipelagolog("Cannot get XSEG target\n");
    memcpy(target, s->volname, targetlen);
    req->offset = offset;
    case ARCHIP_OP_WRITE:
    case ARCHIP_OP_FLUSH:
    /* Attach bookkeeping so the handler thread can finish the AIOCB. */
    reqdata->volname = s->volname;
    reqdata->offset = offset;
    reqdata->size = count;
    reqdata->bufidx = bufidx;
    reqdata->aio_cb = aio_cb;
    reqdata->segreq = segreq;

    xseg_set_req_data(s->xseg, req, reqdata);
    if (op == ARCHIP_OP_WRITE) {
        /* Copy the payload out of the guest iovec into the xseg buffer. */
        data = xseg_get_data(s->xseg, req);
            archipelagolog("Cannot get XSEG data\n");
        qemu_iovec_to_buf(aio_cb->qiov, bufidx, data, count);

    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
        archipelagolog("Could not submit XSEG request\n");
    xseg_signal(s->xseg, p);

    /* Error path: recycle the request slot. */
    xseg_put_request(s->xseg, req, s->srcport);
/*
 * Split a logical request into MAX_REQUEST_SIZE segments and submit
 * each one; flushes are submitted as a single request.
 *
 * NOTE(review): truncated copy — the remaining parameters (count,
 * offset, op), the zero-initialization of segreq, the per-call size
 * arguments, error-path braces/gotos and the returns are not visible.
 * segments_nr is also read in the flush branch before any visible
 * assignment — presumably a 'segments_nr = 1;' line was lost.
 */
static int archipelago_aio_segmented_rw(BDRVArchipelagoState *s,
                                        ArchipelagoAIOCB *aio_cb,
    int i, ret, segments_nr, last_segment_size;
    ArchipelagoSegmentedRequest *segreq;

    segreq = g_malloc(sizeof(ArchipelagoSegmentedRequest));

    if (op == ARCHIP_OP_FLUSH) {
        /* A flush covers the whole request with one xseg request. */
        segreq->ref = segments_nr;
        segreq->total = count;
        ret = archipelago_submit_request(s, 0, count, offset, aio_cb,
                                         segreq, ARCHIP_OP_FLUSH);

    /* Number of MAX_REQUEST_SIZE segments, rounding up. */
    segments_nr = (int)(count / MAX_REQUEST_SIZE) + \
                  ((count % MAX_REQUEST_SIZE) ? 1 : 0);
    last_segment_size = (int)(count % MAX_REQUEST_SIZE);

    segreq->ref = segments_nr;
    segreq->total = count;

    /* Submit every full-sized segment except the last. */
    for (i = 0; i < segments_nr - 1; i++) {
        ret = archipelago_submit_request(s, i * MAX_REQUEST_SIZE,
                                         offset + i * MAX_REQUEST_SIZE,

    /* Final segment: short, exactly full-sized, or the only one. */
    if ((segments_nr > 1) && last_segment_size) {
        ret = archipelago_submit_request(s, i * MAX_REQUEST_SIZE,
                                         offset + i * MAX_REQUEST_SIZE,
    } else if ((segments_nr > 1) && !last_segment_size) {
        ret = archipelago_submit_request(s, i * MAX_REQUEST_SIZE,
                                         offset + i * MAX_REQUEST_SIZE,
    } else if (segments_nr == 1) {
        ret = archipelago_submit_request(s, 0, count, offset, aio_cb,

    /* Error path: flag the whole segmented request as failed and drop
     * the references belonging to never-submitted segments. */
    __sync_add_and_fetch(&segreq->failed, 1);
    if (segments_nr == 1) {
        if (__sync_add_and_fetch(&segreq->ref, -1) == 0) {
    if ((__sync_add_and_fetch(&segreq->ref, -segments_nr + i)) == 0) {
/*
 * Common entry point for the aio_readv/aio_writev/aio_flush callbacks:
 * allocate an AIOCB, convert sectors to bytes and kick off the
 * segmented submission.  Returns the AIOCB on success.
 *
 * NOTE(review): truncated copy — the remaining parameters (sector_num,
 * qiov, nb_sectors, opaque, op), the AIOCB field setup (qiov/op/ret),
 * the declarations of ret/off/size, the failure check after
 * archipelago_aio_segmented_rw() and the NULL return are not visible.
 */
static BlockDriverAIOCB *qemu_archipelago_aio_rw(BlockDriverState *bs,
        BlockDriverCompletionFunc *cb,
    ArchipelagoAIOCB *aio_cb;
    BDRVArchipelagoState *s = bs->opaque;

    aio_cb = qemu_aio_get(&archipelago_aiocb_info, bs, cb, opaque);
    aio_cb->cancelled = false;
    aio_cb->status = -EINPROGRESS;  /* cleared by the completion BH */

    off = sector_num * BDRV_SECTOR_SIZE;
    size = nb_sectors * BDRV_SECTOR_SIZE;

    ret = archipelago_aio_segmented_rw(s, size, off,
    return &aio_cb->common;

    /* Error path. */
    error_report("qemu_archipelago_aio_rw(): I/O Error\n");
    qemu_aio_release(aio_cb);
675 static BlockDriverAIOCB
*qemu_archipelago_aio_readv(BlockDriverState
*bs
,
676 int64_t sector_num
, QEMUIOVector
*qiov
, int nb_sectors
,
677 BlockDriverCompletionFunc
*cb
, void *opaque
)
679 return qemu_archipelago_aio_rw(bs
, sector_num
, qiov
, nb_sectors
, cb
,
680 opaque
, ARCHIP_OP_READ
);
683 static BlockDriverAIOCB
*qemu_archipelago_aio_writev(BlockDriverState
*bs
,
684 int64_t sector_num
, QEMUIOVector
*qiov
, int nb_sectors
,
685 BlockDriverCompletionFunc
*cb
, void *opaque
)
687 return qemu_archipelago_aio_rw(bs
, sector_num
, qiov
, nb_sectors
, cb
,
688 opaque
, ARCHIP_OP_WRITE
);
/*
 * Synchronously query mapperd for volume information (used by
 * getlength).  Blocks on archip_cond until the reply-handler thread
 * flags the ARCHIP_OP_VOLINFO reply via s->is_signaled.
 *
 * NOTE(review): truncated copy — the declarations of ret/targetlen/size,
 * the req->op setup, error gotos, the read of the reply payload from
 * xinfo and the return statements are not visible.
 */
static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
    struct xseg_request *req;
    struct xseg_reply_info *xinfo;
    AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));

    const char *volname = s->volname;
    targetlen = strlen(volname);
    req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
        archipelagolog("Cannot get XSEG request\n");
    ret = xseg_prep_request(s->xseg, req, targetlen,
                            sizeof(struct xseg_reply_info));
        archipelagolog("Cannot prepare XSEG request\n");
    char *target = xseg_get_target(s->xseg, req);
        archipelagolog("Cannot get XSEG target\n");
    memcpy(target, volname, targetlen);
    req->size = req->datalen;

    /* Tag the request so the handler thread signals archip_cond. */
    reqdata->op = ARCHIP_OP_VOLINFO;
    reqdata->volname = volname;
    xseg_set_req_data(s->xseg, req, reqdata);

    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
        archipelagolog("Cannot submit XSEG request\n");
    xseg_signal(s->xseg, p);

    /* Wait for the reply-handler thread to flag the VOLINFO answer. */
    qemu_mutex_lock(&s->archip_mutex);
    while (!s->is_signaled) {
        qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
    s->is_signaled = false;
    qemu_mutex_unlock(&s->archip_mutex);

    xinfo = (struct xseg_reply_info *) xseg_get_data(s->xseg, req);
    xseg_put_request(s->xseg, req, s->srcport);

    /* Error path: recycle the request slot. */
    xseg_put_request(s->xseg, req, s->srcport);
753 static int64_t qemu_archipelago_getlength(BlockDriverState
*bs
)
756 BDRVArchipelagoState
*s
= bs
->opaque
;
758 ret
= archipelago_volume_info(s
);
762 static BlockDriverAIOCB
*qemu_archipelago_aio_flush(BlockDriverState
*bs
,
763 BlockDriverCompletionFunc
*cb
, void *opaque
)
765 return qemu_archipelago_aio_rw(bs
, 0, NULL
, 0, cb
, opaque
,
769 static BlockDriver bdrv_archipelago
= {
770 .format_name
= "archipelago",
771 .protocol_name
= "archipelago",
772 .instance_size
= sizeof(BDRVArchipelagoState
),
773 .bdrv_file_open
= qemu_archipelago_open
,
774 .bdrv_close
= qemu_archipelago_close
,
775 .bdrv_getlength
= qemu_archipelago_getlength
,
776 .bdrv_aio_readv
= qemu_archipelago_aio_readv
,
777 .bdrv_aio_writev
= qemu_archipelago_aio_writev
,
778 .bdrv_aio_flush
= qemu_archipelago_aio_flush
,
779 .bdrv_has_zero_init
= bdrv_has_zero_init_1
,
782 static void bdrv_archipelago_init(void)
784 bdrv_register(&bdrv_archipelago
);
787 block_init(bdrv_archipelago_init
);