]> git.proxmox.com Git - mirror_qemu.git/blob - block/archipelago.c
Merge remote-tracking branch 'remotes/mst/tags/for_upstream' into staging
[mirror_qemu.git] / block / archipelago.c
1 /*
2 * QEMU Block driver for Archipelago
3 *
4 * Copyright (C) 2014 Chrysostomos Nanakos <cnanakos@grnet.gr>
5 *
6 * This work is licensed under the terms of the GNU GPL, version 2 or later.
7 * See the COPYING file in the top-level directory.
8 *
9 */
10
11 /*
12 * VM Image on Archipelago volume is specified like this:
13 *
14 * file.driver=archipelago,file.volume=<volumename>
15 * [,file.mport=<mapperd_port>[,file.vport=<vlmcd_port>]
16 * [,file.segment=<segment_name>]]
17 *
18 * or
19 *
20 * file=archipelago:<volumename>[/mport=<mapperd_port>[:vport=<vlmcd_port>][:
21 * segment=<segment_name>]]
22 *
23 * 'archipelago' is the protocol.
24 *
25 * 'mport' is the port number on which mapperd is listening. This is optional
26 * and, if not specified, QEMU will make Archipelago use the default port.
27 *
28 * 'vport' is the port number on which vlmcd is listening. This is optional
29 * and, if not specified, QEMU will make Archipelago use the default port.
30 *
31 * 'segment' is the name of the shared memory segment the Archipelago stack
32 * is using. This is optional and, if not specified, QEMU will make Archipelago
33 * use the default value, 'archipelago'.
34 *
35 * Examples:
36 *
37 * file.driver=archipelago,file.volume=my_vm_volume
38 * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123
39 * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
40 * file.vport=1234
41 * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
42 * file.vport=1234,file.segment=my_segment
43 *
44 * or
45 *
46 * file=archipelago:my_vm_volume
47 * file=archipelago:my_vm_volume/mport=123
48 * file=archipelago:my_vm_volume/mport=123:vport=1234
49 * file=archipelago:my_vm_volume/mport=123:vport=1234:segment=my_segment
50 *
51 */
52
53 #include "qemu-common.h"
54 #include "block/block_int.h"
55 #include "qemu/error-report.h"
56 #include "qemu/thread.h"
57 #include "qapi/qmp/qint.h"
58 #include "qapi/qmp/qstring.h"
59 #include "qapi/qmp/qjson.h"
60 #include "qemu/atomic.h"
61
62 #include <inttypes.h>
63 #include <xseg/xseg.h>
64 #include <xseg/protocol.h>
65
/* Largest payload, in bytes, carried by one XSEG request (512 KiB). */
#define MAX_REQUEST_SIZE        (512 * 1024)

/* Names of the runtime options accepted by the driver. */
#define ARCHIPELAGO_OPT_VOLUME  "volume"
#define ARCHIPELAGO_OPT_SEGMENT "segment"
#define ARCHIPELAGO_OPT_MPORT   "mport"
#define ARCHIPELAGO_OPT_VPORT   "vport"

/* Port numbers used when 'mport' / 'vport' are not given. */
#define ARCHIPELAGO_DFL_MPORT   1001
#define ARCHIPELAGO_DFL_VPORT   501
74
/*
 * Print a driver diagnostic on stderr.  Each message is prefixed with
 * "archipelago" and the name of the calling function, the latter left-aligned
 * in a 24-character field.  @fmt must be a string literal.
 */
#define archipelagolog(fmt, ...)                                        \
    do {                                                                \
        fprintf(stderr, "archipelago\t%-24s: " fmt, __func__,           \
                ##__VA_ARGS__);                                         \
    } while (0)
79
/* Operation kinds tagged onto requests handed to the XSEG request handler. */
typedef enum {
    ARCHIP_OP_READ     = 0,
    ARCHIP_OP_WRITE    = 1,
    ARCHIP_OP_FLUSH    = 2,
    ARCHIP_OP_VOLINFO  = 3,
    ARCHIP_OP_TRUNCATE = 4,
} ARCHIPCmd;
87
88 typedef struct ArchipelagoAIOCB {
89 BlockDriverAIOCB common;
90 QEMUBH *bh;
91 struct BDRVArchipelagoState *s;
92 QEMUIOVector *qiov;
93 ARCHIPCmd cmd;
94 bool cancelled;
95 int status;
96 int64_t size;
97 int64_t ret;
98 } ArchipelagoAIOCB;
99
100 typedef struct BDRVArchipelagoState {
101 ArchipelagoAIOCB *event_acb;
102 char *volname;
103 char *segment_name;
104 uint64_t size;
105 /* Archipelago specific */
106 struct xseg *xseg;
107 struct xseg_port *port;
108 xport srcport;
109 xport sport;
110 xport mportno;
111 xport vportno;
112 QemuMutex archip_mutex;
113 QemuCond archip_cond;
114 bool is_signaled;
115 /* Request handler specific */
116 QemuThread request_th;
117 QemuCond request_cond;
118 QemuMutex request_mutex;
119 bool th_is_signaled;
120 bool stopping;
121 } BDRVArchipelagoState;
122
/*
 * Bookkeeping shared by all XSEG requests that make up one block-layer
 * request after it has been split into MAX_REQUEST_SIZE pieces.
 */
typedef struct ArchipelagoSegmentedRequest {
    size_t count;   /* bytes serviced so far, summed over all pieces */
    size_t total;   /* bytes requested in total */
    int ref;        /* pieces still outstanding; updated atomically */
    int failed;     /* non-zero once submitting a piece has failed */
} ArchipelagoSegmentedRequest;
129
130 typedef struct AIORequestData {
131 const char *volname;
132 off_t offset;
133 size_t size;
134 uint64_t bufidx;
135 int ret;
136 int op;
137 ArchipelagoAIOCB *aio_cb;
138 ArchipelagoSegmentedRequest *segreq;
139 } AIORequestData;
140
/* Bottom-half callback; declared early because it is scheduled before its
 * definition appears in the file. */
static void qemu_archipelago_complete_aio(void *opaque);
142
143 static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
144 {
145 if (xseg && (sport != srcport)) {
146 xseg_init_local_signal(xseg, srcport);
147 sport = srcport;
148 }
149 }
150
151 static void archipelago_finish_aiocb(AIORequestData *reqdata)
152 {
153 if (reqdata->aio_cb->ret != reqdata->segreq->total) {
154 reqdata->aio_cb->ret = -EIO;
155 } else if (reqdata->aio_cb->ret == reqdata->segreq->total) {
156 reqdata->aio_cb->ret = 0;
157 }
158 reqdata->aio_cb->bh = aio_bh_new(
159 bdrv_get_aio_context(reqdata->aio_cb->common.bs),
160 qemu_archipelago_complete_aio, reqdata
161 );
162 qemu_bh_schedule(reqdata->aio_cb->bh);
163 }
164
165 static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
166 struct xseg_request *expected_req)
167 {
168 struct xseg_request *req;
169 xseg_prepare_wait(xseg, srcport);
170 void *psd = xseg_get_signal_desc(xseg, port);
171 while (1) {
172 req = xseg_receive(xseg, srcport, X_NONBLOCK);
173 if (req) {
174 if (req != expected_req) {
175 archipelagolog("Unknown received request\n");
176 xseg_put_request(xseg, req, srcport);
177 } else if (!(req->state & XS_SERVED)) {
178 return -1;
179 } else {
180 break;
181 }
182 }
183 xseg_wait_signal(xseg, psd, 100000UL);
184 }
185 xseg_cancel_wait(xseg, srcport);
186 return 0;
187 }
188
189 static void xseg_request_handler(void *state)
190 {
191 BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
192 void *psd = xseg_get_signal_desc(s->xseg, s->port);
193 qemu_mutex_lock(&s->request_mutex);
194
195 while (!s->stopping) {
196 struct xseg_request *req;
197 void *data;
198 xseg_prepare_wait(s->xseg, s->srcport);
199 req = xseg_receive(s->xseg, s->srcport, X_NONBLOCK);
200 if (req) {
201 AIORequestData *reqdata;
202 ArchipelagoSegmentedRequest *segreq;
203 xseg_get_req_data(s->xseg, req, (void **)&reqdata);
204
205 switch (reqdata->op) {
206 case ARCHIP_OP_READ:
207 data = xseg_get_data(s->xseg, req);
208 segreq = reqdata->segreq;
209 segreq->count += req->serviced;
210
211 qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
212 data,
213 req->serviced);
214
215 xseg_put_request(s->xseg, req, s->srcport);
216
217 if (atomic_fetch_dec(&segreq->ref) == 1) {
218 if (!segreq->failed) {
219 reqdata->aio_cb->ret = segreq->count;
220 archipelago_finish_aiocb(reqdata);
221 g_free(segreq);
222 } else {
223 g_free(segreq);
224 g_free(reqdata);
225 }
226 } else {
227 g_free(reqdata);
228 }
229 break;
230 case ARCHIP_OP_WRITE:
231 case ARCHIP_OP_FLUSH:
232 segreq = reqdata->segreq;
233 segreq->count += req->serviced;
234 xseg_put_request(s->xseg, req, s->srcport);
235
236 if (atomic_fetch_dec(&segreq->ref) == 1) {
237 if (!segreq->failed) {
238 reqdata->aio_cb->ret = segreq->count;
239 archipelago_finish_aiocb(reqdata);
240 g_free(segreq);
241 } else {
242 g_free(segreq);
243 g_free(reqdata);
244 }
245 } else {
246 g_free(reqdata);
247 }
248 break;
249 case ARCHIP_OP_VOLINFO:
250 case ARCHIP_OP_TRUNCATE:
251 s->is_signaled = true;
252 qemu_cond_signal(&s->archip_cond);
253 break;
254 }
255 } else {
256 xseg_wait_signal(s->xseg, psd, 100000UL);
257 }
258 xseg_cancel_wait(s->xseg, s->srcport);
259 }
260
261 s->th_is_signaled = true;
262 qemu_cond_signal(&s->request_cond);
263 qemu_mutex_unlock(&s->request_mutex);
264 qemu_thread_exit(NULL);
265 }
266
267 static int qemu_archipelago_xseg_init(BDRVArchipelagoState *s)
268 {
269 if (xseg_initialize()) {
270 archipelagolog("Cannot initialize XSEG\n");
271 goto err_exit;
272 }
273
274 s->xseg = xseg_join("posix", s->segment_name,
275 "posixfd", NULL);
276 if (!s->xseg) {
277 archipelagolog("Cannot join XSEG shared memory segment\n");
278 goto err_exit;
279 }
280 s->port = xseg_bind_dynport(s->xseg);
281 s->srcport = s->port->portno;
282 init_local_signal(s->xseg, s->sport, s->srcport);
283 return 0;
284
285 err_exit:
286 return -1;
287 }
288
289 static int qemu_archipelago_init(BDRVArchipelagoState *s)
290 {
291 int ret;
292
293 ret = qemu_archipelago_xseg_init(s);
294 if (ret < 0) {
295 error_report("Cannot initialize XSEG. Aborting...\n");
296 goto err_exit;
297 }
298
299 qemu_cond_init(&s->archip_cond);
300 qemu_mutex_init(&s->archip_mutex);
301 qemu_cond_init(&s->request_cond);
302 qemu_mutex_init(&s->request_mutex);
303 s->th_is_signaled = false;
304 qemu_thread_create(&s->request_th, "xseg_io_th",
305 (void *) xseg_request_handler,
306 (void *) s, QEMU_THREAD_JOINABLE);
307
308 err_exit:
309 return ret;
310 }
311
312 static void qemu_archipelago_complete_aio(void *opaque)
313 {
314 AIORequestData *reqdata = (AIORequestData *) opaque;
315 ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) reqdata->aio_cb;
316
317 qemu_bh_delete(aio_cb->bh);
318 aio_cb->common.cb(aio_cb->common.opaque, aio_cb->ret);
319 aio_cb->status = 0;
320
321 if (!aio_cb->cancelled) {
322 qemu_aio_release(aio_cb);
323 }
324 g_free(reqdata);
325 }
326
327 static void xseg_find_port(char *pstr, const char *needle, xport *aport)
328 {
329 const char *a;
330 char *endptr = NULL;
331 unsigned long port;
332 if (strstart(pstr, needle, &a)) {
333 if (strlen(a) > 0) {
334 port = strtoul(a, &endptr, 10);
335 if (strlen(endptr)) {
336 *aport = -2;
337 return;
338 }
339 *aport = (xport) port;
340 }
341 }
342 }
343
/*
 * If @pstr starts with @needle and something follows it, store a newly
 * allocated copy of the remainder in *@segment_name; the caller frees it.
 * Otherwise *@segment_name is left untouched.
 */
static void xseg_find_segment(char *pstr, const char *needle,
                              char **segment_name)
{
    const char *value;

    if (!strstart(pstr, needle, &value) || *value == '\0') {
        return;
    }
    *segment_name = g_strdup(value);
}
354
355 static void parse_filename_opts(const char *filename, Error **errp,
356 char **volume, char **segment_name,
357 xport *mport, xport *vport)
358 {
359 const char *start;
360 char *tokens[4], *ds;
361 int idx;
362 xport lmport = NoPort, lvport = NoPort;
363
364 strstart(filename, "archipelago:", &start);
365
366 ds = g_strdup(start);
367 tokens[0] = strtok(ds, "/");
368 tokens[1] = strtok(NULL, ":");
369 tokens[2] = strtok(NULL, ":");
370 tokens[3] = strtok(NULL, "\0");
371
372 if (!strlen(tokens[0])) {
373 error_setg(errp, "volume name must be specified first");
374 g_free(ds);
375 return;
376 }
377
378 for (idx = 1; idx < 4; idx++) {
379 if (tokens[idx] != NULL) {
380 if (strstart(tokens[idx], "mport=", NULL)) {
381 xseg_find_port(tokens[idx], "mport=", &lmport);
382 }
383 if (strstart(tokens[idx], "vport=", NULL)) {
384 xseg_find_port(tokens[idx], "vport=", &lvport);
385 }
386 if (strstart(tokens[idx], "segment=", NULL)) {
387 xseg_find_segment(tokens[idx], "segment=", segment_name);
388 }
389 }
390 }
391
392 if ((lmport == -2) || (lvport == -2)) {
393 error_setg(errp, "mport and/or vport must be set");
394 g_free(ds);
395 return;
396 }
397 *volume = g_strdup(tokens[0]);
398 *mport = lmport;
399 *vport = lvport;
400 g_free(ds);
401 }
402
403 static void archipelago_parse_filename(const char *filename, QDict *options,
404 Error **errp)
405 {
406 const char *start;
407 char *volume = NULL, *segment_name = NULL;
408 xport mport = NoPort, vport = NoPort;
409
410 if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME)
411 || qdict_haskey(options, ARCHIPELAGO_OPT_SEGMENT)
412 || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT)
413 || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) {
414 error_setg(errp, "volume/mport/vport/segment and a file name may not"
415 " be specified at the same time");
416 return;
417 }
418
419 if (!strstart(filename, "archipelago:", &start)) {
420 error_setg(errp, "File name must start with 'archipelago:'");
421 return;
422 }
423
424 if (!strlen(start) || strstart(start, "/", NULL)) {
425 error_setg(errp, "volume name must be specified");
426 return;
427 }
428
429 parse_filename_opts(filename, errp, &volume, &segment_name, &mport, &vport);
430
431 if (volume) {
432 qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume));
433 g_free(volume);
434 }
435 if (segment_name) {
436 qdict_put(options, ARCHIPELAGO_OPT_SEGMENT,
437 qstring_from_str(segment_name));
438 g_free(segment_name);
439 }
440 if (mport != NoPort) {
441 qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport));
442 }
443 if (vport != NoPort) {
444 qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport));
445 }
446 }
447
448 static QemuOptsList archipelago_runtime_opts = {
449 .name = "archipelago",
450 .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
451 .desc = {
452 {
453 .name = ARCHIPELAGO_OPT_VOLUME,
454 .type = QEMU_OPT_STRING,
455 .help = "Name of the volume image",
456 },
457 {
458 .name = ARCHIPELAGO_OPT_SEGMENT,
459 .type = QEMU_OPT_STRING,
460 .help = "Name of the Archipelago shared memory segment",
461 },
462 {
463 .name = ARCHIPELAGO_OPT_MPORT,
464 .type = QEMU_OPT_NUMBER,
465 .help = "Archipelago mapperd port number"
466 },
467 {
468 .name = ARCHIPELAGO_OPT_VPORT,
469 .type = QEMU_OPT_NUMBER,
470 .help = "Archipelago vlmcd port number"
471
472 },
473 { /* end of list */ }
474 },
475 };
476
477 static int qemu_archipelago_open(BlockDriverState *bs,
478 QDict *options,
479 int bdrv_flags,
480 Error **errp)
481 {
482 int ret = 0;
483 const char *volume, *segment_name;
484 QemuOpts *opts;
485 Error *local_err = NULL;
486 BDRVArchipelagoState *s = bs->opaque;
487
488 opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
489 qemu_opts_absorb_qdict(opts, options, &local_err);
490 if (local_err) {
491 error_propagate(errp, local_err);
492 ret = -EINVAL;
493 goto err_exit;
494 }
495
496 s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT,
497 ARCHIPELAGO_DFL_MPORT);
498 s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT,
499 ARCHIPELAGO_DFL_VPORT);
500
501 segment_name = qemu_opt_get(opts, ARCHIPELAGO_OPT_SEGMENT);
502 if (segment_name == NULL) {
503 s->segment_name = g_strdup("archipelago");
504 } else {
505 s->segment_name = g_strdup(segment_name);
506 }
507
508 volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
509 if (volume == NULL) {
510 error_setg(errp, "archipelago block driver requires the 'volume'"
511 " option");
512 ret = -EINVAL;
513 goto err_exit;
514 }
515 s->volname = g_strdup(volume);
516
517 /* Initialize XSEG, join shared memory segment */
518 ret = qemu_archipelago_init(s);
519 if (ret < 0) {
520 error_setg(errp, "cannot initialize XSEG and join shared "
521 "memory segment");
522 goto err_exit;
523 }
524
525 qemu_opts_del(opts);
526 return 0;
527
528 err_exit:
529 g_free(s->volname);
530 g_free(s->segment_name);
531 qemu_opts_del(opts);
532 return ret;
533 }
534
535 static void qemu_archipelago_close(BlockDriverState *bs)
536 {
537 int r, targetlen;
538 char *target;
539 struct xseg_request *req;
540 BDRVArchipelagoState *s = bs->opaque;
541
542 s->stopping = true;
543
544 qemu_mutex_lock(&s->request_mutex);
545 while (!s->th_is_signaled) {
546 qemu_cond_wait(&s->request_cond,
547 &s->request_mutex);
548 }
549 qemu_mutex_unlock(&s->request_mutex);
550 qemu_thread_join(&s->request_th);
551 qemu_cond_destroy(&s->request_cond);
552 qemu_mutex_destroy(&s->request_mutex);
553
554 qemu_cond_destroy(&s->archip_cond);
555 qemu_mutex_destroy(&s->archip_mutex);
556
557 targetlen = strlen(s->volname);
558 req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
559 if (!req) {
560 archipelagolog("Cannot get XSEG request\n");
561 goto err_exit;
562 }
563 r = xseg_prep_request(s->xseg, req, targetlen, 0);
564 if (r < 0) {
565 xseg_put_request(s->xseg, req, s->srcport);
566 archipelagolog("Cannot prepare XSEG close request\n");
567 goto err_exit;
568 }
569
570 target = xseg_get_target(s->xseg, req);
571 memcpy(target, s->volname, targetlen);
572 req->size = req->datalen;
573 req->offset = 0;
574 req->op = X_CLOSE;
575
576 xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
577 if (p == NoPort) {
578 xseg_put_request(s->xseg, req, s->srcport);
579 archipelagolog("Cannot submit XSEG close request\n");
580 goto err_exit;
581 }
582
583 xseg_signal(s->xseg, p);
584 wait_reply(s->xseg, s->srcport, s->port, req);
585
586 xseg_put_request(s->xseg, req, s->srcport);
587
588 err_exit:
589 g_free(s->volname);
590 g_free(s->segment_name);
591 xseg_quit_local_signal(s->xseg, s->srcport);
592 xseg_leave_dynport(s->xseg, s->port);
593 xseg_leave(s->xseg);
594 }
595
596 static int qemu_archipelago_create_volume(Error **errp, const char *volname,
597 char *segment_name,
598 uint64_t size, xport mportno,
599 xport vportno)
600 {
601 int ret, targetlen;
602 struct xseg *xseg = NULL;
603 struct xseg_request *req;
604 struct xseg_request_clone *xclone;
605 struct xseg_port *port;
606 xport srcport = NoPort, sport = NoPort;
607 char *target;
608
609 /* Try default values if none has been set */
610 if (mportno == (xport) -1) {
611 mportno = ARCHIPELAGO_DFL_MPORT;
612 }
613
614 if (vportno == (xport) -1) {
615 vportno = ARCHIPELAGO_DFL_VPORT;
616 }
617
618 if (xseg_initialize()) {
619 error_setg(errp, "Cannot initialize XSEG");
620 return -1;
621 }
622
623 xseg = xseg_join("posix", segment_name,
624 "posixfd", NULL);
625
626 if (!xseg) {
627 error_setg(errp, "Cannot join XSEG shared memory segment");
628 return -1;
629 }
630
631 port = xseg_bind_dynport(xseg);
632 srcport = port->portno;
633 init_local_signal(xseg, sport, srcport);
634
635 req = xseg_get_request(xseg, srcport, mportno, X_ALLOC);
636 if (!req) {
637 error_setg(errp, "Cannot get XSEG request");
638 return -1;
639 }
640
641 targetlen = strlen(volname);
642 ret = xseg_prep_request(xseg, req, targetlen,
643 sizeof(struct xseg_request_clone));
644 if (ret < 0) {
645 error_setg(errp, "Cannot prepare XSEG request");
646 goto err_exit;
647 }
648
649 target = xseg_get_target(xseg, req);
650 if (!target) {
651 error_setg(errp, "Cannot get XSEG target.\n");
652 goto err_exit;
653 }
654 memcpy(target, volname, targetlen);
655 xclone = (struct xseg_request_clone *) xseg_get_data(xseg, req);
656 memset(xclone->target, 0 , XSEG_MAX_TARGETLEN);
657 xclone->targetlen = 0;
658 xclone->size = size;
659 req->offset = 0;
660 req->size = req->datalen;
661 req->op = X_CLONE;
662
663 xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
664 if (p == NoPort) {
665 error_setg(errp, "Could not submit XSEG request");
666 goto err_exit;
667 }
668 xseg_signal(xseg, p);
669
670 ret = wait_reply(xseg, srcport, port, req);
671 if (ret < 0) {
672 error_setg(errp, "wait_reply() error.");
673 }
674
675 xseg_put_request(xseg, req, srcport);
676 xseg_quit_local_signal(xseg, srcport);
677 xseg_leave_dynport(xseg, port);
678 xseg_leave(xseg);
679 return ret;
680
681 err_exit:
682 xseg_put_request(xseg, req, srcport);
683 xseg_quit_local_signal(xseg, srcport);
684 xseg_leave_dynport(xseg, port);
685 xseg_leave(xseg);
686 return -1;
687 }
688
689 static int qemu_archipelago_create(const char *filename,
690 QemuOpts *options,
691 Error **errp)
692 {
693 int ret = 0;
694 uint64_t total_size = 0;
695 char *volname = NULL, *segment_name = NULL;
696 const char *start;
697 xport mport = NoPort, vport = NoPort;
698
699 if (!strstart(filename, "archipelago:", &start)) {
700 error_setg(errp, "File name must start with 'archipelago:'");
701 return -1;
702 }
703
704 if (!strlen(start) || strstart(start, "/", NULL)) {
705 error_setg(errp, "volume name must be specified");
706 return -1;
707 }
708
709 parse_filename_opts(filename, errp, &volname, &segment_name, &mport,
710 &vport);
711 total_size = ROUND_UP(qemu_opt_get_size_del(options, BLOCK_OPT_SIZE, 0),
712 BDRV_SECTOR_SIZE);
713
714 if (segment_name == NULL) {
715 segment_name = g_strdup("archipelago");
716 }
717
718 /* Create an Archipelago volume */
719 ret = qemu_archipelago_create_volume(errp, volname, segment_name,
720 total_size, mport,
721 vport);
722
723 g_free(volname);
724 g_free(segment_name);
725 return ret;
726 }
727
728 static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb)
729 {
730 ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) blockacb;
731 aio_cb->cancelled = true;
732 while (aio_cb->status == -EINPROGRESS) {
733 aio_poll(bdrv_get_aio_context(aio_cb->common.bs), true);
734 }
735 qemu_aio_release(aio_cb);
736 }
737
738 static const AIOCBInfo archipelago_aiocb_info = {
739 .aiocb_size = sizeof(ArchipelagoAIOCB),
740 .cancel = qemu_archipelago_aio_cancel,
741 };
742
743 static int archipelago_submit_request(BDRVArchipelagoState *s,
744 uint64_t bufidx,
745 size_t count,
746 off_t offset,
747 ArchipelagoAIOCB *aio_cb,
748 ArchipelagoSegmentedRequest *segreq,
749 int op)
750 {
751 int ret, targetlen;
752 char *target;
753 void *data = NULL;
754 struct xseg_request *req;
755 AIORequestData *reqdata = g_new(AIORequestData, 1);
756
757 targetlen = strlen(s->volname);
758 req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
759 if (!req) {
760 archipelagolog("Cannot get XSEG request\n");
761 goto err_exit2;
762 }
763 ret = xseg_prep_request(s->xseg, req, targetlen, count);
764 if (ret < 0) {
765 archipelagolog("Cannot prepare XSEG request\n");
766 goto err_exit;
767 }
768 target = xseg_get_target(s->xseg, req);
769 if (!target) {
770 archipelagolog("Cannot get XSEG target\n");
771 goto err_exit;
772 }
773 memcpy(target, s->volname, targetlen);
774 req->size = count;
775 req->offset = offset;
776
777 switch (op) {
778 case ARCHIP_OP_READ:
779 req->op = X_READ;
780 break;
781 case ARCHIP_OP_WRITE:
782 req->op = X_WRITE;
783 break;
784 case ARCHIP_OP_FLUSH:
785 req->op = X_FLUSH;
786 break;
787 }
788 reqdata->volname = s->volname;
789 reqdata->offset = offset;
790 reqdata->size = count;
791 reqdata->bufidx = bufidx;
792 reqdata->aio_cb = aio_cb;
793 reqdata->segreq = segreq;
794 reqdata->op = op;
795
796 xseg_set_req_data(s->xseg, req, reqdata);
797 if (op == ARCHIP_OP_WRITE) {
798 data = xseg_get_data(s->xseg, req);
799 if (!data) {
800 archipelagolog("Cannot get XSEG data\n");
801 goto err_exit;
802 }
803 qemu_iovec_to_buf(aio_cb->qiov, bufidx, data, count);
804 }
805
806 xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
807 if (p == NoPort) {
808 archipelagolog("Could not submit XSEG request\n");
809 goto err_exit;
810 }
811 xseg_signal(s->xseg, p);
812 return 0;
813
814 err_exit:
815 g_free(reqdata);
816 xseg_put_request(s->xseg, req, s->srcport);
817 return -EIO;
818 err_exit2:
819 g_free(reqdata);
820 return -EIO;
821 }
822
823 static int archipelago_aio_segmented_rw(BDRVArchipelagoState *s,
824 size_t count,
825 off_t offset,
826 ArchipelagoAIOCB *aio_cb,
827 int op)
828 {
829 int ret, segments_nr;
830 size_t pos = 0;
831 ArchipelagoSegmentedRequest *segreq;
832
833 segreq = g_new0(ArchipelagoSegmentedRequest, 1);
834
835 if (op == ARCHIP_OP_FLUSH) {
836 segments_nr = 1;
837 } else {
838 segments_nr = (int)(count / MAX_REQUEST_SIZE) + \
839 ((count % MAX_REQUEST_SIZE) ? 1 : 0);
840 }
841 segreq->total = count;
842 atomic_mb_set(&segreq->ref, segments_nr);
843
844 while (segments_nr > 1) {
845 ret = archipelago_submit_request(s, pos,
846 MAX_REQUEST_SIZE,
847 offset + pos,
848 aio_cb, segreq, op);
849
850 if (ret < 0) {
851 goto err_exit;
852 }
853 count -= MAX_REQUEST_SIZE;
854 pos += MAX_REQUEST_SIZE;
855 segments_nr--;
856 }
857 ret = archipelago_submit_request(s, pos, count, offset + pos,
858 aio_cb, segreq, op);
859
860 if (ret < 0) {
861 goto err_exit;
862 }
863 return 0;
864
865 err_exit:
866 segreq->failed = 1;
867 if (atomic_fetch_sub(&segreq->ref, segments_nr) == segments_nr) {
868 g_free(segreq);
869 }
870 return ret;
871 }
872
873 static BlockDriverAIOCB *qemu_archipelago_aio_rw(BlockDriverState *bs,
874 int64_t sector_num,
875 QEMUIOVector *qiov,
876 int nb_sectors,
877 BlockDriverCompletionFunc *cb,
878 void *opaque,
879 int op)
880 {
881 ArchipelagoAIOCB *aio_cb;
882 BDRVArchipelagoState *s = bs->opaque;
883 int64_t size, off;
884 int ret;
885
886 aio_cb = qemu_aio_get(&archipelago_aiocb_info, bs, cb, opaque);
887 aio_cb->cmd = op;
888 aio_cb->qiov = qiov;
889
890 aio_cb->ret = 0;
891 aio_cb->s = s;
892 aio_cb->cancelled = false;
893 aio_cb->status = -EINPROGRESS;
894
895 off = sector_num * BDRV_SECTOR_SIZE;
896 size = nb_sectors * BDRV_SECTOR_SIZE;
897 aio_cb->size = size;
898
899 ret = archipelago_aio_segmented_rw(s, size, off,
900 aio_cb, op);
901 if (ret < 0) {
902 goto err_exit;
903 }
904 return &aio_cb->common;
905
906 err_exit:
907 error_report("qemu_archipelago_aio_rw(): I/O Error\n");
908 qemu_aio_release(aio_cb);
909 return NULL;
910 }
911
912 static BlockDriverAIOCB *qemu_archipelago_aio_readv(BlockDriverState *bs,
913 int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
914 BlockDriverCompletionFunc *cb, void *opaque)
915 {
916 return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
917 opaque, ARCHIP_OP_READ);
918 }
919
920 static BlockDriverAIOCB *qemu_archipelago_aio_writev(BlockDriverState *bs,
921 int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
922 BlockDriverCompletionFunc *cb, void *opaque)
923 {
924 return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
925 opaque, ARCHIP_OP_WRITE);
926 }
927
928 static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
929 {
930 uint64_t size;
931 int ret, targetlen;
932 struct xseg_request *req;
933 struct xseg_reply_info *xinfo;
934 AIORequestData *reqdata = g_new(AIORequestData, 1);
935
936 const char *volname = s->volname;
937 targetlen = strlen(volname);
938 req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
939 if (!req) {
940 archipelagolog("Cannot get XSEG request\n");
941 goto err_exit2;
942 }
943 ret = xseg_prep_request(s->xseg, req, targetlen,
944 sizeof(struct xseg_reply_info));
945 if (ret < 0) {
946 archipelagolog("Cannot prepare XSEG request\n");
947 goto err_exit;
948 }
949 char *target = xseg_get_target(s->xseg, req);
950 if (!target) {
951 archipelagolog("Cannot get XSEG target\n");
952 goto err_exit;
953 }
954 memcpy(target, volname, targetlen);
955 req->size = req->datalen;
956 req->offset = 0;
957 req->op = X_INFO;
958
959 reqdata->op = ARCHIP_OP_VOLINFO;
960 reqdata->volname = volname;
961 xseg_set_req_data(s->xseg, req, reqdata);
962
963 xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
964 if (p == NoPort) {
965 archipelagolog("Cannot submit XSEG request\n");
966 goto err_exit;
967 }
968 xseg_signal(s->xseg, p);
969 qemu_mutex_lock(&s->archip_mutex);
970 while (!s->is_signaled) {
971 qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
972 }
973 s->is_signaled = false;
974 qemu_mutex_unlock(&s->archip_mutex);
975
976 xinfo = (struct xseg_reply_info *) xseg_get_data(s->xseg, req);
977 size = xinfo->size;
978 xseg_put_request(s->xseg, req, s->srcport);
979 g_free(reqdata);
980 s->size = size;
981 return size;
982
983 err_exit:
984 xseg_put_request(s->xseg, req, s->srcport);
985 err_exit2:
986 g_free(reqdata);
987 return -EIO;
988 }
989
990 static int64_t qemu_archipelago_getlength(BlockDriverState *bs)
991 {
992 int64_t ret;
993 BDRVArchipelagoState *s = bs->opaque;
994
995 ret = archipelago_volume_info(s);
996 return ret;
997 }
998
999 static int qemu_archipelago_truncate(BlockDriverState *bs, int64_t offset)
1000 {
1001 int ret, targetlen;
1002 struct xseg_request *req;
1003 BDRVArchipelagoState *s = bs->opaque;
1004 AIORequestData *reqdata = g_new(AIORequestData, 1);
1005
1006 const char *volname = s->volname;
1007 targetlen = strlen(volname);
1008 req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
1009 if (!req) {
1010 archipelagolog("Cannot get XSEG request\n");
1011 return err_exit2;
1012 }
1013
1014 ret = xseg_prep_request(s->xseg, req, targetlen, 0);
1015 if (ret < 0) {
1016 archipelagolog("Cannot prepare XSEG request\n");
1017 goto err_exit;
1018 }
1019 char *target = xseg_get_target(s->xseg, req);
1020 if (!target) {
1021 archipelagolog("Cannot get XSEG target\n");
1022 goto err_exit;
1023 }
1024 memcpy(target, volname, targetlen);
1025 req->offset = offset;
1026 req->op = X_TRUNCATE;
1027
1028 reqdata->op = ARCHIP_OP_TRUNCATE;
1029 reqdata->volname = volname;
1030
1031 xseg_set_req_data(s->xseg, req, reqdata);
1032
1033 xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
1034 if (p == NoPort) {
1035 archipelagolog("Cannot submit XSEG request\n");
1036 goto err_exit;
1037 }
1038
1039 xseg_signal(s->xseg, p);
1040 qemu_mutex_lock(&s->archip_mutex);
1041 while (!s->is_signaled) {
1042 qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
1043 }
1044 s->is_signaled = false;
1045 qemu_mutex_unlock(&s->archip_mutex);
1046 xseg_put_request(s->xseg, req, s->srcport);
1047 g_free(reqdata);
1048 return 0;
1049
1050 err_exit:
1051 xseg_put_request(s->xseg, req, s->srcport);
1052 err_exit2:
1053 g_free(reqdata);
1054 return -EIO;
1055 }
1056
1057 static QemuOptsList qemu_archipelago_create_opts = {
1058 .name = "archipelago-create-opts",
1059 .head = QTAILQ_HEAD_INITIALIZER(qemu_archipelago_create_opts.head),
1060 .desc = {
1061 {
1062 .name = BLOCK_OPT_SIZE,
1063 .type = QEMU_OPT_SIZE,
1064 .help = "Virtual disk size"
1065 },
1066 { /* end of list */ }
1067 }
1068 };
1069
1070 static BlockDriverAIOCB *qemu_archipelago_aio_flush(BlockDriverState *bs,
1071 BlockDriverCompletionFunc *cb, void *opaque)
1072 {
1073 return qemu_archipelago_aio_rw(bs, 0, NULL, 0, cb, opaque,
1074 ARCHIP_OP_FLUSH);
1075 }
1076
1077 static BlockDriver bdrv_archipelago = {
1078 .format_name = "archipelago",
1079 .protocol_name = "archipelago",
1080 .instance_size = sizeof(BDRVArchipelagoState),
1081 .bdrv_parse_filename = archipelago_parse_filename,
1082 .bdrv_file_open = qemu_archipelago_open,
1083 .bdrv_close = qemu_archipelago_close,
1084 .bdrv_create = qemu_archipelago_create,
1085 .bdrv_getlength = qemu_archipelago_getlength,
1086 .bdrv_truncate = qemu_archipelago_truncate,
1087 .bdrv_aio_readv = qemu_archipelago_aio_readv,
1088 .bdrv_aio_writev = qemu_archipelago_aio_writev,
1089 .bdrv_aio_flush = qemu_archipelago_aio_flush,
1090 .bdrv_has_zero_init = bdrv_has_zero_init_1,
1091 .create_opts = &qemu_archipelago_create_opts,
1092 };
1093
1094 static void bdrv_archipelago_init(void)
1095 {
1096 bdrv_register(&bdrv_archipelago);
1097 }
1098
1099 block_init(bdrv_archipelago_init);