]> git.proxmox.com Git - mirror_qemu.git/blame - block/archipelago.c
block: Use g_new() & friends where that makes obvious sense
[mirror_qemu.git] / block / archipelago.c
CommitLineData
c9a12e75
CN
1/*
2 * QEMU Block driver for Archipelago
3 *
4 * Copyright (C) 2014 Chrysostomos Nanakos <cnanakos@grnet.gr>
5 *
6 * This work is licensed under the terms of the GNU GPL, version 2 or later.
7 * See the COPYING file in the top-level directory.
8 *
9 */
10
/*
 * VM Image on Archipelago volume is specified like this:
 *
 * file.driver=archipelago,file.volume=<volumename>
 * [,file.mport=<mapperd_port>[,file.vport=<vlmcd_port>]
 * [,file.segment=<segment_name>]]
 *
 * or
 *
 * file=archipelago:<volumename>[/mport=<mapperd_port>[:vport=<vlmcd_port>][:
 * segment=<segment_name>]]
 *
 * 'archipelago' is the protocol.
 *
 * 'mport' is the port number on which mapperd is listening. This is optional
 * and if not specified, QEMU will make Archipelago use the default port.
 *
 * 'vport' is the port number on which vlmcd is listening. This is optional
 * and if not specified, QEMU will make Archipelago use the default port.
 *
 * 'segment' is the name of the shared memory segment the Archipelago stack
 * is using. This is optional and if not specified, QEMU will make Archipelago
 * use the default value, 'archipelago'.
 *
 * Examples:
 *
 * file.driver=archipelago,file.volume=my_vm_volume
 * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123
 * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
 * file.vport=1234
 * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
 * file.vport=1234,file.segment=my_segment
 *
 * or
 *
 * file=archipelago:my_vm_volume
 * file=archipelago:my_vm_volume/mport=123
 * file=archipelago:my_vm_volume/mport=123:vport=1234
 * file=archipelago:my_vm_volume/mport=123:vport=1234:segment=my_segment
 *
 */
52
70537a85 53#include "qemu-common.h"
c9a12e75
CN
54#include "block/block_int.h"
55#include "qemu/error-report.h"
56#include "qemu/thread.h"
57#include "qapi/qmp/qint.h"
58#include "qapi/qmp/qstring.h"
59#include "qapi/qmp/qjson.h"
60
61#include <inttypes.h>
62#include <xseg/xseg.h>
63#include <xseg/protocol.h>
64
/* Descriptor slot indices; not referenced by the code visible in this file. */
#define ARCHIP_FD_READ      0
#define ARCHIP_FD_WRITE     1
/* Largest payload, in bytes, carried by one segment of a split request. */
#define MAX_REQUEST_SIZE    524288

/* Names of the runtime options understood by the driver. */
#define ARCHIPELAGO_OPT_VOLUME      "volume"
#define ARCHIPELAGO_OPT_SEGMENT     "segment"
#define ARCHIPELAGO_OPT_MPORT       "mport"
#define ARCHIPELAGO_OPT_VPORT       "vport"
/* Port numbers used when the caller does not specify one. */
#define ARCHIPELAGO_DFL_MPORT       1001
#define ARCHIPELAGO_DFL_VPORT       501

/* Print a message on stderr, prefixed with the driver and function name. */
#define archipelagolog(fmt, ...) \
    do { \
        fprintf(stderr, "archipelago\t%-24s: " fmt, __func__, ##__VA_ARGS__); \
    } while (0)
80
/* Operation carried by a request; stored in AIORequestData.op. */
typedef enum {
    ARCHIP_OP_READ,     /* read volume data into the caller's I/O vector */
    ARCHIP_OP_WRITE,    /* write the caller's I/O vector to the volume */
    ARCHIP_OP_FLUSH,    /* flush request */
    ARCHIP_OP_VOLINFO,  /* volume information (size) query */
} ARCHIPCmd;
87
88typedef struct ArchipelagoAIOCB {
89 BlockDriverAIOCB common;
90 QEMUBH *bh;
91 struct BDRVArchipelagoState *s;
92 QEMUIOVector *qiov;
93 ARCHIPCmd cmd;
94 bool cancelled;
95 int status;
96 int64_t size;
97 int64_t ret;
98} ArchipelagoAIOCB;
99
100typedef struct BDRVArchipelagoState {
101 ArchipelagoAIOCB *event_acb;
102 char *volname;
103 char *segment_name;
104 uint64_t size;
105 /* Archipelago specific */
106 struct xseg *xseg;
107 struct xseg_port *port;
108 xport srcport;
109 xport sport;
110 xport mportno;
111 xport vportno;
112 QemuMutex archip_mutex;
113 QemuCond archip_cond;
114 bool is_signaled;
115 /* Request handler specific */
116 QemuThread request_th;
117 QemuCond request_cond;
118 QemuMutex request_mutex;
119 bool th_is_signaled;
120 bool stopping;
121} BDRVArchipelagoState;
122
/* Bookkeeping shared by all segments of one split request. */
typedef struct ArchipelagoSegmentedRequest {
    size_t count;   /* bytes serviced so far, summed over finished segments */
    size_t total;   /* bytes requested in total */
    int ref;        /* segments that have not finished yet */
    int failed;     /* non-zero once submitting a segment has failed */
} ArchipelagoSegmentedRequest;
129
130typedef struct AIORequestData {
131 const char *volname;
132 off_t offset;
133 size_t size;
134 uint64_t bufidx;
135 int ret;
136 int op;
137 ArchipelagoAIOCB *aio_cb;
138 ArchipelagoSegmentedRequest *segreq;
139} AIORequestData;
140
/* Bottom-half callback; declared early because it is scheduled before it is defined. */
static void qemu_archipelago_complete_aio(void *opaque);
142
143static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
144{
145 if (xseg && (sport != srcport)) {
146 xseg_init_local_signal(xseg, srcport);
147 sport = srcport;
148 }
149}
150
151static void archipelago_finish_aiocb(AIORequestData *reqdata)
152{
153 if (reqdata->aio_cb->ret != reqdata->segreq->total) {
154 reqdata->aio_cb->ret = -EIO;
155 } else if (reqdata->aio_cb->ret == reqdata->segreq->total) {
156 reqdata->aio_cb->ret = 0;
157 }
158 reqdata->aio_cb->bh = aio_bh_new(
159 bdrv_get_aio_context(reqdata->aio_cb->common.bs),
160 qemu_archipelago_complete_aio, reqdata
161 );
162 qemu_bh_schedule(reqdata->aio_cb->bh);
163}
164
165static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
166 struct xseg_request *expected_req)
167{
168 struct xseg_request *req;
169 xseg_prepare_wait(xseg, srcport);
170 void *psd = xseg_get_signal_desc(xseg, port);
171 while (1) {
172 req = xseg_receive(xseg, srcport, X_NONBLOCK);
173 if (req) {
174 if (req != expected_req) {
175 archipelagolog("Unknown received request\n");
176 xseg_put_request(xseg, req, srcport);
177 } else if (!(req->state & XS_SERVED)) {
178 return -1;
179 } else {
180 break;
181 }
182 }
183 xseg_wait_signal(xseg, psd, 100000UL);
184 }
185 xseg_cancel_wait(xseg, srcport);
186 return 0;
187}
188
189static void xseg_request_handler(void *state)
190{
191 BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
192 void *psd = xseg_get_signal_desc(s->xseg, s->port);
193 qemu_mutex_lock(&s->request_mutex);
194
195 while (!s->stopping) {
196 struct xseg_request *req;
197 void *data;
198 xseg_prepare_wait(s->xseg, s->srcport);
199 req = xseg_receive(s->xseg, s->srcport, X_NONBLOCK);
200 if (req) {
201 AIORequestData *reqdata;
202 ArchipelagoSegmentedRequest *segreq;
203 xseg_get_req_data(s->xseg, req, (void **)&reqdata);
204
205 switch (reqdata->op) {
206 case ARCHIP_OP_READ:
207 data = xseg_get_data(s->xseg, req);
208 segreq = reqdata->segreq;
209 segreq->count += req->serviced;
210
211 qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
212 data,
213 req->serviced);
214
215 xseg_put_request(s->xseg, req, s->srcport);
216
217 if ((__sync_add_and_fetch(&segreq->ref, -1)) == 0) {
218 if (!segreq->failed) {
219 reqdata->aio_cb->ret = segreq->count;
220 archipelago_finish_aiocb(reqdata);
221 g_free(segreq);
222 } else {
223 g_free(segreq);
224 g_free(reqdata);
225 }
226 } else {
227 g_free(reqdata);
228 }
229 break;
230 case ARCHIP_OP_WRITE:
231 case ARCHIP_OP_FLUSH:
232 segreq = reqdata->segreq;
233 segreq->count += req->serviced;
234 xseg_put_request(s->xseg, req, s->srcport);
235
236 if ((__sync_add_and_fetch(&segreq->ref, -1)) == 0) {
237 if (!segreq->failed) {
238 reqdata->aio_cb->ret = segreq->count;
239 archipelago_finish_aiocb(reqdata);
240 g_free(segreq);
241 } else {
242 g_free(segreq);
243 g_free(reqdata);
244 }
245 } else {
246 g_free(reqdata);
247 }
248 break;
249 case ARCHIP_OP_VOLINFO:
250 s->is_signaled = true;
251 qemu_cond_signal(&s->archip_cond);
252 break;
253 }
254 } else {
255 xseg_wait_signal(s->xseg, psd, 100000UL);
256 }
257 xseg_cancel_wait(s->xseg, s->srcport);
258 }
259
260 s->th_is_signaled = true;
261 qemu_cond_signal(&s->request_cond);
262 qemu_mutex_unlock(&s->request_mutex);
263 qemu_thread_exit(NULL);
264}
265
266static int qemu_archipelago_xseg_init(BDRVArchipelagoState *s)
267{
268 if (xseg_initialize()) {
269 archipelagolog("Cannot initialize XSEG\n");
270 goto err_exit;
271 }
272
273 s->xseg = xseg_join("posix", s->segment_name,
274 "posixfd", NULL);
275 if (!s->xseg) {
276 archipelagolog("Cannot join XSEG shared memory segment\n");
277 goto err_exit;
278 }
279 s->port = xseg_bind_dynport(s->xseg);
280 s->srcport = s->port->portno;
281 init_local_signal(s->xseg, s->sport, s->srcport);
282 return 0;
283
284err_exit:
285 return -1;
286}
287
288static int qemu_archipelago_init(BDRVArchipelagoState *s)
289{
290 int ret;
291
292 ret = qemu_archipelago_xseg_init(s);
293 if (ret < 0) {
294 error_report("Cannot initialize XSEG. Aborting...\n");
295 goto err_exit;
296 }
297
298 qemu_cond_init(&s->archip_cond);
299 qemu_mutex_init(&s->archip_mutex);
300 qemu_cond_init(&s->request_cond);
301 qemu_mutex_init(&s->request_mutex);
302 s->th_is_signaled = false;
303 qemu_thread_create(&s->request_th, "xseg_io_th",
304 (void *) xseg_request_handler,
305 (void *) s, QEMU_THREAD_JOINABLE);
306
307err_exit:
308 return ret;
309}
310
311static void qemu_archipelago_complete_aio(void *opaque)
312{
313 AIORequestData *reqdata = (AIORequestData *) opaque;
314 ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) reqdata->aio_cb;
315
316 qemu_bh_delete(aio_cb->bh);
317 aio_cb->common.cb(aio_cb->common.opaque, aio_cb->ret);
318 aio_cb->status = 0;
319
320 if (!aio_cb->cancelled) {
321 qemu_aio_release(aio_cb);
322 }
323 g_free(reqdata);
324}
325
70537a85
CN
326static void xseg_find_port(char *pstr, const char *needle, xport *aport)
327{
328 const char *a;
329 char *endptr = NULL;
330 unsigned long port;
331 if (strstart(pstr, needle, &a)) {
332 if (strlen(a) > 0) {
333 port = strtoul(a, &endptr, 10);
334 if (strlen(endptr)) {
335 *aport = -2;
336 return;
337 }
338 *aport = (xport) port;
339 }
340 }
341}
342
/*
 * Extract the segment name from a "<needle><name>" token.
 *
 * When @pstr starts with @needle and a non-empty name follows, a heap copy
 * of that name is stored in *@segment_name.  Otherwise *@segment_name is
 * left untouched.
 */
static void xseg_find_segment(char *pstr, const char *needle,
                              char **segment_name)
{
    const char *name;

    if (!strstart(pstr, needle, &name) || *name == '\0') {
        return;
    }
    *segment_name = g_strdup(name);
}
353
354static void parse_filename_opts(const char *filename, Error **errp,
355 char **volume, char **segment_name,
356 xport *mport, xport *vport)
357{
358 const char *start;
359 char *tokens[4], *ds;
360 int idx;
361 xport lmport = NoPort, lvport = NoPort;
362
363 strstart(filename, "archipelago:", &start);
364
365 ds = g_strdup(start);
366 tokens[0] = strtok(ds, "/");
367 tokens[1] = strtok(NULL, ":");
368 tokens[2] = strtok(NULL, ":");
369 tokens[3] = strtok(NULL, "\0");
370
371 if (!strlen(tokens[0])) {
372 error_setg(errp, "volume name must be specified first");
373 g_free(ds);
374 return;
375 }
376
377 for (idx = 1; idx < 4; idx++) {
378 if (tokens[idx] != NULL) {
379 if (strstart(tokens[idx], "mport=", NULL)) {
380 xseg_find_port(tokens[idx], "mport=", &lmport);
381 }
382 if (strstart(tokens[idx], "vport=", NULL)) {
383 xseg_find_port(tokens[idx], "vport=", &lvport);
384 }
385 if (strstart(tokens[idx], "segment=", NULL)) {
386 xseg_find_segment(tokens[idx], "segment=", segment_name);
387 }
388 }
389 }
390
391 if ((lmport == -2) || (lvport == -2)) {
392 error_setg(errp, "mport and/or vport must be set");
393 g_free(ds);
394 return;
395 }
396 *volume = g_strdup(tokens[0]);
397 *mport = lmport;
398 *vport = lvport;
399 g_free(ds);
400}
401
402static void archipelago_parse_filename(const char *filename, QDict *options,
403 Error **errp)
404{
405 const char *start;
406 char *volume = NULL, *segment_name = NULL;
407 xport mport = NoPort, vport = NoPort;
408
409 if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME)
410 || qdict_haskey(options, ARCHIPELAGO_OPT_SEGMENT)
411 || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT)
412 || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) {
413 error_setg(errp, "volume/mport/vport/segment and a file name may not"
414 " be specified at the same time");
415 return;
416 }
417
418 if (!strstart(filename, "archipelago:", &start)) {
419 error_setg(errp, "File name must start with 'archipelago:'");
420 return;
421 }
422
423 if (!strlen(start) || strstart(start, "/", NULL)) {
424 error_setg(errp, "volume name must be specified");
425 return;
426 }
427
428 parse_filename_opts(filename, errp, &volume, &segment_name, &mport, &vport);
429
430 if (volume) {
431 qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume));
432 g_free(volume);
433 }
434 if (segment_name) {
435 qdict_put(options, ARCHIPELAGO_OPT_SEGMENT,
436 qstring_from_str(segment_name));
437 g_free(segment_name);
438 }
439 if (mport != NoPort) {
440 qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport));
441 }
442 if (vport != NoPort) {
443 qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport));
444 }
445}
446
c9a12e75
CN
447static QemuOptsList archipelago_runtime_opts = {
448 .name = "archipelago",
449 .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
450 .desc = {
451 {
452 .name = ARCHIPELAGO_OPT_VOLUME,
453 .type = QEMU_OPT_STRING,
454 .help = "Name of the volume image",
455 },
456 {
457 .name = ARCHIPELAGO_OPT_SEGMENT,
458 .type = QEMU_OPT_STRING,
459 .help = "Name of the Archipelago shared memory segment",
460 },
461 {
462 .name = ARCHIPELAGO_OPT_MPORT,
463 .type = QEMU_OPT_NUMBER,
464 .help = "Archipelago mapperd port number"
465 },
466 {
467 .name = ARCHIPELAGO_OPT_VPORT,
468 .type = QEMU_OPT_NUMBER,
469 .help = "Archipelago vlmcd port number"
470
471 },
472 { /* end of list */ }
473 },
474};
475
476static int qemu_archipelago_open(BlockDriverState *bs,
477 QDict *options,
478 int bdrv_flags,
479 Error **errp)
480{
481 int ret = 0;
482 const char *volume, *segment_name;
483 QemuOpts *opts;
484 Error *local_err = NULL;
485 BDRVArchipelagoState *s = bs->opaque;
486
487 opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
488 qemu_opts_absorb_qdict(opts, options, &local_err);
489 if (local_err) {
490 error_propagate(errp, local_err);
491 ret = -EINVAL;
492 goto err_exit;
493 }
494
495 s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT,
496 ARCHIPELAGO_DFL_MPORT);
497 s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT,
498 ARCHIPELAGO_DFL_VPORT);
499
500 segment_name = qemu_opt_get(opts, ARCHIPELAGO_OPT_SEGMENT);
501 if (segment_name == NULL) {
502 s->segment_name = g_strdup("archipelago");
503 } else {
504 s->segment_name = g_strdup(segment_name);
505 }
506
507 volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
508 if (volume == NULL) {
509 error_setg(errp, "archipelago block driver requires the 'volume'"
510 " option");
511 ret = -EINVAL;
512 goto err_exit;
513 }
514 s->volname = g_strdup(volume);
515
516 /* Initialize XSEG, join shared memory segment */
517 ret = qemu_archipelago_init(s);
518 if (ret < 0) {
519 error_setg(errp, "cannot initialize XSEG and join shared "
520 "memory segment");
521 goto err_exit;
522 }
523
524 qemu_opts_del(opts);
525 return 0;
526
527err_exit:
528 g_free(s->volname);
529 g_free(s->segment_name);
530 qemu_opts_del(opts);
531 return ret;
532}
533
534static void qemu_archipelago_close(BlockDriverState *bs)
535{
536 int r, targetlen;
537 char *target;
538 struct xseg_request *req;
539 BDRVArchipelagoState *s = bs->opaque;
540
541 s->stopping = true;
542
543 qemu_mutex_lock(&s->request_mutex);
544 while (!s->th_is_signaled) {
545 qemu_cond_wait(&s->request_cond,
546 &s->request_mutex);
547 }
548 qemu_mutex_unlock(&s->request_mutex);
549 qemu_thread_join(&s->request_th);
550 qemu_cond_destroy(&s->request_cond);
551 qemu_mutex_destroy(&s->request_mutex);
552
553 qemu_cond_destroy(&s->archip_cond);
554 qemu_mutex_destroy(&s->archip_mutex);
555
556 targetlen = strlen(s->volname);
557 req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
558 if (!req) {
559 archipelagolog("Cannot get XSEG request\n");
560 goto err_exit;
561 }
562 r = xseg_prep_request(s->xseg, req, targetlen, 0);
563 if (r < 0) {
564 xseg_put_request(s->xseg, req, s->srcport);
565 archipelagolog("Cannot prepare XSEG close request\n");
566 goto err_exit;
567 }
568
569 target = xseg_get_target(s->xseg, req);
570 memcpy(target, s->volname, targetlen);
571 req->size = req->datalen;
572 req->offset = 0;
573 req->op = X_CLOSE;
574
575 xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
576 if (p == NoPort) {
577 xseg_put_request(s->xseg, req, s->srcport);
578 archipelagolog("Cannot submit XSEG close request\n");
579 goto err_exit;
580 }
581
582 xseg_signal(s->xseg, p);
583 wait_reply(s->xseg, s->srcport, s->port, req);
584
585 xseg_put_request(s->xseg, req, s->srcport);
586
587err_exit:
588 g_free(s->volname);
589 g_free(s->segment_name);
590 xseg_quit_local_signal(s->xseg, s->srcport);
591 xseg_leave_dynport(s->xseg, s->port);
592 xseg_leave(s->xseg);
593}
594
76d3d83a
CN
595static int qemu_archipelago_create_volume(Error **errp, const char *volname,
596 char *segment_name,
597 uint64_t size, xport mportno,
598 xport vportno)
599{
600 int ret, targetlen;
601 struct xseg *xseg = NULL;
602 struct xseg_request *req;
603 struct xseg_request_clone *xclone;
604 struct xseg_port *port;
605 xport srcport = NoPort, sport = NoPort;
606 char *target;
607
608 /* Try default values if none has been set */
609 if (mportno == (xport) -1) {
610 mportno = ARCHIPELAGO_DFL_MPORT;
611 }
612
613 if (vportno == (xport) -1) {
614 vportno = ARCHIPELAGO_DFL_VPORT;
615 }
616
617 if (xseg_initialize()) {
618 error_setg(errp, "Cannot initialize XSEG");
619 return -1;
620 }
621
622 xseg = xseg_join("posix", segment_name,
623 "posixfd", NULL);
624
625 if (!xseg) {
626 error_setg(errp, "Cannot join XSEG shared memory segment");
627 return -1;
628 }
629
630 port = xseg_bind_dynport(xseg);
631 srcport = port->portno;
632 init_local_signal(xseg, sport, srcport);
633
634 req = xseg_get_request(xseg, srcport, mportno, X_ALLOC);
635 if (!req) {
636 error_setg(errp, "Cannot get XSEG request");
637 return -1;
638 }
639
640 targetlen = strlen(volname);
641 ret = xseg_prep_request(xseg, req, targetlen,
642 sizeof(struct xseg_request_clone));
643 if (ret < 0) {
644 error_setg(errp, "Cannot prepare XSEG request");
645 goto err_exit;
646 }
647
648 target = xseg_get_target(xseg, req);
649 if (!target) {
650 error_setg(errp, "Cannot get XSEG target.\n");
651 goto err_exit;
652 }
653 memcpy(target, volname, targetlen);
654 xclone = (struct xseg_request_clone *) xseg_get_data(xseg, req);
655 memset(xclone->target, 0 , XSEG_MAX_TARGETLEN);
656 xclone->targetlen = 0;
657 xclone->size = size;
658 req->offset = 0;
659 req->size = req->datalen;
660 req->op = X_CLONE;
661
662 xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
663 if (p == NoPort) {
664 error_setg(errp, "Could not submit XSEG request");
665 goto err_exit;
666 }
667 xseg_signal(xseg, p);
668
669 ret = wait_reply(xseg, srcport, port, req);
670 if (ret < 0) {
671 error_setg(errp, "wait_reply() error.");
672 }
673
674 xseg_put_request(xseg, req, srcport);
675 xseg_quit_local_signal(xseg, srcport);
676 xseg_leave_dynport(xseg, port);
677 xseg_leave(xseg);
678 return ret;
679
680err_exit:
681 xseg_put_request(xseg, req, srcport);
682 xseg_quit_local_signal(xseg, srcport);
683 xseg_leave_dynport(xseg, port);
684 xseg_leave(xseg);
685 return -1;
686}
687
688static int qemu_archipelago_create(const char *filename,
689 QemuOpts *options,
690 Error **errp)
691{
692 int ret = 0;
693 uint64_t total_size = 0;
694 char *volname = NULL, *segment_name = NULL;
695 const char *start;
696 xport mport = NoPort, vport = NoPort;
697
698 if (!strstart(filename, "archipelago:", &start)) {
699 error_setg(errp, "File name must start with 'archipelago:'");
700 return -1;
701 }
702
703 if (!strlen(start) || strstart(start, "/", NULL)) {
704 error_setg(errp, "volume name must be specified");
705 return -1;
706 }
707
708 parse_filename_opts(filename, errp, &volname, &segment_name, &mport,
709 &vport);
710 total_size = qemu_opt_get_size_del(options, BLOCK_OPT_SIZE, 0);
711
712 if (segment_name == NULL) {
713 segment_name = g_strdup("archipelago");
714 }
715
716 /* Create an Archipelago volume */
717 ret = qemu_archipelago_create_volume(errp, volname, segment_name,
718 total_size, mport,
719 vport);
720
721 g_free(volname);
722 g_free(segment_name);
723 return ret;
724}
725
c9a12e75
CN
726static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb)
727{
728 ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) blockacb;
729 aio_cb->cancelled = true;
730 while (aio_cb->status == -EINPROGRESS) {
731 aio_poll(bdrv_get_aio_context(aio_cb->common.bs), true);
732 }
733 qemu_aio_release(aio_cb);
734}
735
736static const AIOCBInfo archipelago_aiocb_info = {
737 .aiocb_size = sizeof(ArchipelagoAIOCB),
738 .cancel = qemu_archipelago_aio_cancel,
739};
740
741static int archipelago_submit_request(BDRVArchipelagoState *s,
742 uint64_t bufidx,
743 size_t count,
744 off_t offset,
745 ArchipelagoAIOCB *aio_cb,
746 ArchipelagoSegmentedRequest *segreq,
747 int op)
748{
749 int ret, targetlen;
750 char *target;
751 void *data = NULL;
752 struct xseg_request *req;
5839e53b 753 AIORequestData *reqdata = g_new(AIORequestData, 1);
c9a12e75
CN
754
755 targetlen = strlen(s->volname);
756 req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
757 if (!req) {
758 archipelagolog("Cannot get XSEG request\n");
759 goto err_exit2;
760 }
761 ret = xseg_prep_request(s->xseg, req, targetlen, count);
762 if (ret < 0) {
763 archipelagolog("Cannot prepare XSEG request\n");
764 goto err_exit;
765 }
766 target = xseg_get_target(s->xseg, req);
767 if (!target) {
768 archipelagolog("Cannot get XSEG target\n");
769 goto err_exit;
770 }
771 memcpy(target, s->volname, targetlen);
772 req->size = count;
773 req->offset = offset;
774
775 switch (op) {
776 case ARCHIP_OP_READ:
777 req->op = X_READ;
778 break;
779 case ARCHIP_OP_WRITE:
780 req->op = X_WRITE;
781 break;
782 case ARCHIP_OP_FLUSH:
783 req->op = X_FLUSH;
784 break;
785 }
786 reqdata->volname = s->volname;
787 reqdata->offset = offset;
788 reqdata->size = count;
789 reqdata->bufidx = bufidx;
790 reqdata->aio_cb = aio_cb;
791 reqdata->segreq = segreq;
792 reqdata->op = op;
793
794 xseg_set_req_data(s->xseg, req, reqdata);
795 if (op == ARCHIP_OP_WRITE) {
796 data = xseg_get_data(s->xseg, req);
797 if (!data) {
798 archipelagolog("Cannot get XSEG data\n");
799 goto err_exit;
800 }
801 qemu_iovec_to_buf(aio_cb->qiov, bufidx, data, count);
802 }
803
804 xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
805 if (p == NoPort) {
806 archipelagolog("Could not submit XSEG request\n");
807 goto err_exit;
808 }
809 xseg_signal(s->xseg, p);
810 return 0;
811
812err_exit:
813 g_free(reqdata);
814 xseg_put_request(s->xseg, req, s->srcport);
815 return -EIO;
816err_exit2:
817 g_free(reqdata);
818 return -EIO;
819}
820
821static int archipelago_aio_segmented_rw(BDRVArchipelagoState *s,
822 size_t count,
823 off_t offset,
824 ArchipelagoAIOCB *aio_cb,
825 int op)
826{
827 int i, ret, segments_nr, last_segment_size;
828 ArchipelagoSegmentedRequest *segreq;
829
5839e53b 830 segreq = g_new(ArchipelagoSegmentedRequest, 1);
c9a12e75
CN
831
832 if (op == ARCHIP_OP_FLUSH) {
833 segments_nr = 1;
834 segreq->ref = segments_nr;
835 segreq->total = count;
836 segreq->count = 0;
837 segreq->failed = 0;
838 ret = archipelago_submit_request(s, 0, count, offset, aio_cb,
839 segreq, ARCHIP_OP_FLUSH);
840 if (ret < 0) {
841 goto err_exit;
842 }
843 return 0;
844 }
845
846 segments_nr = (int)(count / MAX_REQUEST_SIZE) + \
847 ((count % MAX_REQUEST_SIZE) ? 1 : 0);
848 last_segment_size = (int)(count % MAX_REQUEST_SIZE);
849
850 segreq->ref = segments_nr;
851 segreq->total = count;
852 segreq->count = 0;
853 segreq->failed = 0;
854
855 for (i = 0; i < segments_nr - 1; i++) {
856 ret = archipelago_submit_request(s, i * MAX_REQUEST_SIZE,
857 MAX_REQUEST_SIZE,
858 offset + i * MAX_REQUEST_SIZE,
859 aio_cb, segreq, op);
860
861 if (ret < 0) {
862 goto err_exit;
863 }
864 }
865
866 if ((segments_nr > 1) && last_segment_size) {
867 ret = archipelago_submit_request(s, i * MAX_REQUEST_SIZE,
868 last_segment_size,
869 offset + i * MAX_REQUEST_SIZE,
870 aio_cb, segreq, op);
871 } else if ((segments_nr > 1) && !last_segment_size) {
872 ret = archipelago_submit_request(s, i * MAX_REQUEST_SIZE,
873 MAX_REQUEST_SIZE,
874 offset + i * MAX_REQUEST_SIZE,
875 aio_cb, segreq, op);
876 } else if (segments_nr == 1) {
877 ret = archipelago_submit_request(s, 0, count, offset, aio_cb,
878 segreq, op);
879 }
880
881 if (ret < 0) {
882 goto err_exit;
883 }
884
885 return 0;
886
887err_exit:
888 __sync_add_and_fetch(&segreq->failed, 1);
889 if (segments_nr == 1) {
890 if (__sync_add_and_fetch(&segreq->ref, -1) == 0) {
891 g_free(segreq);
892 }
893 } else {
894 if ((__sync_add_and_fetch(&segreq->ref, -segments_nr + i)) == 0) {
895 g_free(segreq);
896 }
897 }
898
899 return ret;
900}
901
902static BlockDriverAIOCB *qemu_archipelago_aio_rw(BlockDriverState *bs,
903 int64_t sector_num,
904 QEMUIOVector *qiov,
905 int nb_sectors,
906 BlockDriverCompletionFunc *cb,
907 void *opaque,
908 int op)
909{
910 ArchipelagoAIOCB *aio_cb;
911 BDRVArchipelagoState *s = bs->opaque;
912 int64_t size, off;
913 int ret;
914
915 aio_cb = qemu_aio_get(&archipelago_aiocb_info, bs, cb, opaque);
916 aio_cb->cmd = op;
917 aio_cb->qiov = qiov;
918
919 aio_cb->ret = 0;
920 aio_cb->s = s;
921 aio_cb->cancelled = false;
922 aio_cb->status = -EINPROGRESS;
923
924 off = sector_num * BDRV_SECTOR_SIZE;
925 size = nb_sectors * BDRV_SECTOR_SIZE;
926 aio_cb->size = size;
927
928 ret = archipelago_aio_segmented_rw(s, size, off,
929 aio_cb, op);
930 if (ret < 0) {
931 goto err_exit;
932 }
933 return &aio_cb->common;
934
935err_exit:
936 error_report("qemu_archipelago_aio_rw(): I/O Error\n");
937 qemu_aio_release(aio_cb);
938 return NULL;
939}
940
941static BlockDriverAIOCB *qemu_archipelago_aio_readv(BlockDriverState *bs,
942 int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
943 BlockDriverCompletionFunc *cb, void *opaque)
944{
945 return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
946 opaque, ARCHIP_OP_READ);
947}
948
949static BlockDriverAIOCB *qemu_archipelago_aio_writev(BlockDriverState *bs,
950 int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
951 BlockDriverCompletionFunc *cb, void *opaque)
952{
953 return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
954 opaque, ARCHIP_OP_WRITE);
955}
956
957static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
958{
959 uint64_t size;
960 int ret, targetlen;
961 struct xseg_request *req;
962 struct xseg_reply_info *xinfo;
5839e53b 963 AIORequestData *reqdata = g_new(AIORequestData, 1);
c9a12e75
CN
964
965 const char *volname = s->volname;
966 targetlen = strlen(volname);
967 req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
968 if (!req) {
969 archipelagolog("Cannot get XSEG request\n");
970 goto err_exit2;
971 }
972 ret = xseg_prep_request(s->xseg, req, targetlen,
973 sizeof(struct xseg_reply_info));
974 if (ret < 0) {
975 archipelagolog("Cannot prepare XSEG request\n");
976 goto err_exit;
977 }
978 char *target = xseg_get_target(s->xseg, req);
979 if (!target) {
980 archipelagolog("Cannot get XSEG target\n");
981 goto err_exit;
982 }
983 memcpy(target, volname, targetlen);
984 req->size = req->datalen;
985 req->offset = 0;
986 req->op = X_INFO;
987
988 reqdata->op = ARCHIP_OP_VOLINFO;
989 reqdata->volname = volname;
990 xseg_set_req_data(s->xseg, req, reqdata);
991
992 xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
993 if (p == NoPort) {
994 archipelagolog("Cannot submit XSEG request\n");
995 goto err_exit;
996 }
997 xseg_signal(s->xseg, p);
998 qemu_mutex_lock(&s->archip_mutex);
999 while (!s->is_signaled) {
1000 qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
1001 }
1002 s->is_signaled = false;
1003 qemu_mutex_unlock(&s->archip_mutex);
1004
1005 xinfo = (struct xseg_reply_info *) xseg_get_data(s->xseg, req);
1006 size = xinfo->size;
1007 xseg_put_request(s->xseg, req, s->srcport);
1008 g_free(reqdata);
1009 s->size = size;
1010 return size;
1011
1012err_exit:
1013 xseg_put_request(s->xseg, req, s->srcport);
1014err_exit2:
1015 g_free(reqdata);
1016 return -EIO;
1017}
1018
1019static int64_t qemu_archipelago_getlength(BlockDriverState *bs)
1020{
1021 int64_t ret;
1022 BDRVArchipelagoState *s = bs->opaque;
1023
1024 ret = archipelago_volume_info(s);
1025 return ret;
1026}
1027
76d3d83a
CN
1028static QemuOptsList qemu_archipelago_create_opts = {
1029 .name = "archipelago-create-opts",
1030 .head = QTAILQ_HEAD_INITIALIZER(qemu_archipelago_create_opts.head),
1031 .desc = {
1032 {
1033 .name = BLOCK_OPT_SIZE,
1034 .type = QEMU_OPT_SIZE,
1035 .help = "Virtual disk size"
1036 },
1037 { /* end of list */ }
1038 }
1039};
1040
c9a12e75
CN
1041static BlockDriverAIOCB *qemu_archipelago_aio_flush(BlockDriverState *bs,
1042 BlockDriverCompletionFunc *cb, void *opaque)
1043{
1044 return qemu_archipelago_aio_rw(bs, 0, NULL, 0, cb, opaque,
1045 ARCHIP_OP_FLUSH);
1046}
1047
1048static BlockDriver bdrv_archipelago = {
1049 .format_name = "archipelago",
1050 .protocol_name = "archipelago",
1051 .instance_size = sizeof(BDRVArchipelagoState),
70537a85 1052 .bdrv_parse_filename = archipelago_parse_filename,
c9a12e75
CN
1053 .bdrv_file_open = qemu_archipelago_open,
1054 .bdrv_close = qemu_archipelago_close,
76d3d83a 1055 .bdrv_create = qemu_archipelago_create,
c9a12e75
CN
1056 .bdrv_getlength = qemu_archipelago_getlength,
1057 .bdrv_aio_readv = qemu_archipelago_aio_readv,
1058 .bdrv_aio_writev = qemu_archipelago_aio_writev,
1059 .bdrv_aio_flush = qemu_archipelago_aio_flush,
1060 .bdrv_has_zero_init = bdrv_has_zero_init_1,
76d3d83a 1061 .create_opts = &qemu_archipelago_create_opts,
c9a12e75
CN
1062};
1063
1064static void bdrv_archipelago_init(void)
1065{
1066 bdrv_register(&bdrv_archipelago);
1067}
1068
1069block_init(bdrv_archipelago_init);