]> git.proxmox.com Git - mirror_qemu.git/blob - block/nbd-client.c
Revert "audio: fix pc speaker init"
[mirror_qemu.git] / block / nbd-client.c
1 /*
2 * QEMU Block driver for NBD
3 *
4 * Copyright (C) 2016 Red Hat, Inc.
5 * Copyright (C) 2008 Bull S.A.S.
6 * Author: Laurent Vivier <Laurent.Vivier@bull.net>
7 *
8 * Some parts:
9 * Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
24 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
27 * THE SOFTWARE.
28 */
29
30 #include "qemu/osdep.h"
31
32 #include "trace.h"
33 #include "qapi/error.h"
34 #include "nbd-client.h"
35
36 #define HANDLE_TO_INDEX(bs, handle) ((handle) ^ (uint64_t)(intptr_t)(bs))
37 #define INDEX_TO_HANDLE(bs, index) ((index) ^ (uint64_t)(intptr_t)(bs))
38
39 static void nbd_recv_coroutines_wake_all(NBDClientSession *s)
40 {
41 int i;
42
43 for (i = 0; i < MAX_NBD_REQUESTS; i++) {
44 NBDClientRequest *req = &s->requests[i];
45
46 if (req->coroutine && req->receiving) {
47 aio_co_wake(req->coroutine);
48 }
49 }
50 }
51
52 static void nbd_teardown_connection(BlockDriverState *bs)
53 {
54 NBDClientSession *client = nbd_get_client_session(bs);
55
56 assert(client->ioc);
57
58 /* finish any pending coroutines */
59 qio_channel_shutdown(client->ioc,
60 QIO_CHANNEL_SHUTDOWN_BOTH,
61 NULL);
62 BDRV_POLL_WHILE(bs, client->connection_co);
63
64 nbd_client_detach_aio_context(bs);
65 object_unref(OBJECT(client->sioc));
66 client->sioc = NULL;
67 object_unref(OBJECT(client->ioc));
68 client->ioc = NULL;
69 }
70
71 static coroutine_fn void nbd_connection_entry(void *opaque)
72 {
73 NBDClientSession *s = opaque;
74 uint64_t i;
75 int ret = 0;
76 Error *local_err = NULL;
77
78 while (!s->quit) {
79 /*
80 * The NBD client can only really be considered idle when it has
81 * yielded from qio_channel_readv_all_eof(), waiting for data. This is
82 * the point where the additional scheduled coroutine entry happens
83 * after nbd_client_attach_aio_context().
84 *
85 * Therefore we keep an additional in_flight reference all the time and
86 * only drop it temporarily here.
87 */
88 assert(s->reply.handle == 0);
89 ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);
90
91 if (local_err) {
92 trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
93 error_free(local_err);
94 }
95 if (ret <= 0) {
96 break;
97 }
98
99 /* There's no need for a mutex on the receive side, because the
100 * handler acts as a synchronization point and ensures that only
101 * one coroutine is called until the reply finishes.
102 */
103 i = HANDLE_TO_INDEX(s, s->reply.handle);
104 if (i >= MAX_NBD_REQUESTS ||
105 !s->requests[i].coroutine ||
106 !s->requests[i].receiving ||
107 (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
108 {
109 break;
110 }
111
112 /* We're woken up again by the request itself. Note that there
113 * is no race between yielding and reentering connection_co. This
114 * is because:
115 *
116 * - if the request runs on the same AioContext, it is only
117 * entered after we yield
118 *
119 * - if the request runs on a different AioContext, reentering
120 * connection_co happens through a bottom half, which can only
121 * run after we yield.
122 */
123 aio_co_wake(s->requests[i].coroutine);
124 qemu_coroutine_yield();
125 }
126
127 s->quit = true;
128 nbd_recv_coroutines_wake_all(s);
129 bdrv_dec_in_flight(s->bs);
130
131 s->connection_co = NULL;
132 aio_wait_kick();
133 }
134
135 static int nbd_co_send_request(BlockDriverState *bs,
136 NBDRequest *request,
137 QEMUIOVector *qiov)
138 {
139 NBDClientSession *s = nbd_get_client_session(bs);
140 int rc, i;
141
142 qemu_co_mutex_lock(&s->send_mutex);
143 while (s->in_flight == MAX_NBD_REQUESTS) {
144 qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
145 }
146 s->in_flight++;
147
148 for (i = 0; i < MAX_NBD_REQUESTS; i++) {
149 if (s->requests[i].coroutine == NULL) {
150 break;
151 }
152 }
153
154 g_assert(qemu_in_coroutine());
155 assert(i < MAX_NBD_REQUESTS);
156
157 s->requests[i].coroutine = qemu_coroutine_self();
158 s->requests[i].offset = request->from;
159 s->requests[i].receiving = false;
160
161 request->handle = INDEX_TO_HANDLE(s, i);
162
163 if (s->quit) {
164 rc = -EIO;
165 goto err;
166 }
167 assert(s->ioc);
168
169 if (qiov) {
170 qio_channel_set_cork(s->ioc, true);
171 rc = nbd_send_request(s->ioc, request);
172 if (rc >= 0 && !s->quit) {
173 if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
174 NULL) < 0) {
175 rc = -EIO;
176 }
177 } else if (rc >= 0) {
178 rc = -EIO;
179 }
180 qio_channel_set_cork(s->ioc, false);
181 } else {
182 rc = nbd_send_request(s->ioc, request);
183 }
184
185 err:
186 if (rc < 0) {
187 s->quit = true;
188 s->requests[i].coroutine = NULL;
189 s->in_flight--;
190 qemu_co_queue_next(&s->free_sema);
191 }
192 qemu_co_mutex_unlock(&s->send_mutex);
193 return rc;
194 }
195
196 static inline uint16_t payload_advance16(uint8_t **payload)
197 {
198 *payload += 2;
199 return lduw_be_p(*payload - 2);
200 }
201
202 static inline uint32_t payload_advance32(uint8_t **payload)
203 {
204 *payload += 4;
205 return ldl_be_p(*payload - 4);
206 }
207
208 static inline uint64_t payload_advance64(uint8_t **payload)
209 {
210 *payload += 8;
211 return ldq_be_p(*payload - 8);
212 }
213
214 static int nbd_parse_offset_hole_payload(NBDStructuredReplyChunk *chunk,
215 uint8_t *payload, uint64_t orig_offset,
216 QEMUIOVector *qiov, Error **errp)
217 {
218 uint64_t offset;
219 uint32_t hole_size;
220
221 if (chunk->length != sizeof(offset) + sizeof(hole_size)) {
222 error_setg(errp, "Protocol error: invalid payload for "
223 "NBD_REPLY_TYPE_OFFSET_HOLE");
224 return -EINVAL;
225 }
226
227 offset = payload_advance64(&payload);
228 hole_size = payload_advance32(&payload);
229
230 if (!hole_size || offset < orig_offset || hole_size > qiov->size ||
231 offset > orig_offset + qiov->size - hole_size) {
232 error_setg(errp, "Protocol error: server sent chunk exceeding requested"
233 " region");
234 return -EINVAL;
235 }
236
237 qemu_iovec_memset(qiov, offset - orig_offset, 0, hole_size);
238
239 return 0;
240 }
241
242 /* nbd_parse_blockstatus_payload
243 * support only one extent in reply and only for
244 * base:allocation context
245 */
246 static int nbd_parse_blockstatus_payload(NBDClientSession *client,
247 NBDStructuredReplyChunk *chunk,
248 uint8_t *payload, uint64_t orig_length,
249 NBDExtent *extent, Error **errp)
250 {
251 uint32_t context_id;
252
253 if (chunk->length != sizeof(context_id) + sizeof(*extent)) {
254 error_setg(errp, "Protocol error: invalid payload for "
255 "NBD_REPLY_TYPE_BLOCK_STATUS");
256 return -EINVAL;
257 }
258
259 context_id = payload_advance32(&payload);
260 if (client->info.context_id != context_id) {
261 error_setg(errp, "Protocol error: unexpected context id %d for "
262 "NBD_REPLY_TYPE_BLOCK_STATUS, when negotiated context "
263 "id is %d", context_id,
264 client->info.context_id);
265 return -EINVAL;
266 }
267
268 extent->length = payload_advance32(&payload);
269 extent->flags = payload_advance32(&payload);
270
271 if (extent->length == 0 ||
272 (client->info.min_block && !QEMU_IS_ALIGNED(extent->length,
273 client->info.min_block))) {
274 error_setg(errp, "Protocol error: server sent status chunk with "
275 "invalid length");
276 return -EINVAL;
277 }
278
279 /* The server is allowed to send us extra information on the final
280 * extent; just clamp it to the length we requested. */
281 if (extent->length > orig_length) {
282 extent->length = orig_length;
283 }
284
285 return 0;
286 }
287
288 /* nbd_parse_error_payload
289 * on success @errp contains message describing nbd error reply
290 */
291 static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
292 uint8_t *payload, int *request_ret,
293 Error **errp)
294 {
295 uint32_t error;
296 uint16_t message_size;
297
298 assert(chunk->type & (1 << 15));
299
300 if (chunk->length < sizeof(error) + sizeof(message_size)) {
301 error_setg(errp,
302 "Protocol error: invalid payload for structured error");
303 return -EINVAL;
304 }
305
306 error = nbd_errno_to_system_errno(payload_advance32(&payload));
307 if (error == 0) {
308 error_setg(errp, "Protocol error: server sent structured error chunk "
309 "with error = 0");
310 return -EINVAL;
311 }
312
313 *request_ret = -error;
314 message_size = payload_advance16(&payload);
315
316 if (message_size > chunk->length - sizeof(error) - sizeof(message_size)) {
317 error_setg(errp, "Protocol error: server sent structured error chunk "
318 "with incorrect message size");
319 return -EINVAL;
320 }
321
322 /* TODO: Add a trace point to mention the server complaint */
323
324 /* TODO handle ERROR_OFFSET */
325
326 return 0;
327 }
328
329 static int nbd_co_receive_offset_data_payload(NBDClientSession *s,
330 uint64_t orig_offset,
331 QEMUIOVector *qiov, Error **errp)
332 {
333 QEMUIOVector sub_qiov;
334 uint64_t offset;
335 size_t data_size;
336 int ret;
337 NBDStructuredReplyChunk *chunk = &s->reply.structured;
338
339 assert(nbd_reply_is_structured(&s->reply));
340
341 /* The NBD spec requires at least one byte of payload */
342 if (chunk->length <= sizeof(offset)) {
343 error_setg(errp, "Protocol error: invalid payload for "
344 "NBD_REPLY_TYPE_OFFSET_DATA");
345 return -EINVAL;
346 }
347
348 if (nbd_read64(s->ioc, &offset, "OFFSET_DATA offset", errp) < 0) {
349 return -EIO;
350 }
351
352 data_size = chunk->length - sizeof(offset);
353 assert(data_size);
354 if (offset < orig_offset || data_size > qiov->size ||
355 offset > orig_offset + qiov->size - data_size) {
356 error_setg(errp, "Protocol error: server sent chunk exceeding requested"
357 " region");
358 return -EINVAL;
359 }
360
361 qemu_iovec_init(&sub_qiov, qiov->niov);
362 qemu_iovec_concat(&sub_qiov, qiov, offset - orig_offset, data_size);
363 ret = qio_channel_readv_all(s->ioc, sub_qiov.iov, sub_qiov.niov, errp);
364 qemu_iovec_destroy(&sub_qiov);
365
366 return ret < 0 ? -EIO : 0;
367 }
368
369 #define NBD_MAX_MALLOC_PAYLOAD 1000
370 /* nbd_co_receive_structured_payload
371 */
372 static coroutine_fn int nbd_co_receive_structured_payload(
373 NBDClientSession *s, void **payload, Error **errp)
374 {
375 int ret;
376 uint32_t len;
377
378 assert(nbd_reply_is_structured(&s->reply));
379
380 len = s->reply.structured.length;
381
382 if (len == 0) {
383 return 0;
384 }
385
386 if (payload == NULL) {
387 error_setg(errp, "Unexpected structured payload");
388 return -EINVAL;
389 }
390
391 if (len > NBD_MAX_MALLOC_PAYLOAD) {
392 error_setg(errp, "Payload too large");
393 return -EINVAL;
394 }
395
396 *payload = g_new(char, len);
397 ret = nbd_read(s->ioc, *payload, len, "structured payload", errp);
398 if (ret < 0) {
399 g_free(*payload);
400 *payload = NULL;
401 return ret;
402 }
403
404 return 0;
405 }
406
407 /* nbd_co_do_receive_one_chunk
408 * for simple reply:
409 * set request_ret to received reply error
410 * if qiov is not NULL: read payload to @qiov
411 * for structured reply chunk:
412 * if error chunk: read payload, set @request_ret, do not set @payload
413 * else if offset_data chunk: read payload data to @qiov, do not set @payload
414 * else: read payload to @payload
415 *
416 * If function fails, @errp contains corresponding error message, and the
417 * connection with the server is suspect. If it returns 0, then the
418 * transaction succeeded (although @request_ret may be a negative errno
419 * corresponding to the server's error reply), and errp is unchanged.
420 */
421 static coroutine_fn int nbd_co_do_receive_one_chunk(
422 NBDClientSession *s, uint64_t handle, bool only_structured,
423 int *request_ret, QEMUIOVector *qiov, void **payload, Error **errp)
424 {
425 int ret;
426 int i = HANDLE_TO_INDEX(s, handle);
427 void *local_payload = NULL;
428 NBDStructuredReplyChunk *chunk;
429
430 if (payload) {
431 *payload = NULL;
432 }
433 *request_ret = 0;
434
435 /* Wait until we're woken up by nbd_connection_entry. */
436 s->requests[i].receiving = true;
437 qemu_coroutine_yield();
438 s->requests[i].receiving = false;
439 if (s->quit) {
440 error_setg(errp, "Connection closed");
441 return -EIO;
442 }
443 assert(s->ioc);
444
445 assert(s->reply.handle == handle);
446
447 if (nbd_reply_is_simple(&s->reply)) {
448 if (only_structured) {
449 error_setg(errp, "Protocol error: simple reply when structured "
450 "reply chunk was expected");
451 return -EINVAL;
452 }
453
454 *request_ret = -nbd_errno_to_system_errno(s->reply.simple.error);
455 if (*request_ret < 0 || !qiov) {
456 return 0;
457 }
458
459 return qio_channel_readv_all(s->ioc, qiov->iov, qiov->niov,
460 errp) < 0 ? -EIO : 0;
461 }
462
463 /* handle structured reply chunk */
464 assert(s->info.structured_reply);
465 chunk = &s->reply.structured;
466
467 if (chunk->type == NBD_REPLY_TYPE_NONE) {
468 if (!(chunk->flags & NBD_REPLY_FLAG_DONE)) {
469 error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk without"
470 " NBD_REPLY_FLAG_DONE flag set");
471 return -EINVAL;
472 }
473 if (chunk->length) {
474 error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk with"
475 " nonzero length");
476 return -EINVAL;
477 }
478 return 0;
479 }
480
481 if (chunk->type == NBD_REPLY_TYPE_OFFSET_DATA) {
482 if (!qiov) {
483 error_setg(errp, "Unexpected NBD_REPLY_TYPE_OFFSET_DATA chunk");
484 return -EINVAL;
485 }
486
487 return nbd_co_receive_offset_data_payload(s, s->requests[i].offset,
488 qiov, errp);
489 }
490
491 if (nbd_reply_type_is_error(chunk->type)) {
492 payload = &local_payload;
493 }
494
495 ret = nbd_co_receive_structured_payload(s, payload, errp);
496 if (ret < 0) {
497 return ret;
498 }
499
500 if (nbd_reply_type_is_error(chunk->type)) {
501 ret = nbd_parse_error_payload(chunk, local_payload, request_ret, errp);
502 g_free(local_payload);
503 return ret;
504 }
505
506 return 0;
507 }
508
509 /* nbd_co_receive_one_chunk
510 * Read reply, wake up connection_co and set s->quit if needed.
511 * Return value is a fatal error code or normal nbd reply error code
512 */
513 static coroutine_fn int nbd_co_receive_one_chunk(
514 NBDClientSession *s, uint64_t handle, bool only_structured,
515 int *request_ret, QEMUIOVector *qiov, NBDReply *reply, void **payload,
516 Error **errp)
517 {
518 int ret = nbd_co_do_receive_one_chunk(s, handle, only_structured,
519 request_ret, qiov, payload, errp);
520
521 if (ret < 0) {
522 s->quit = true;
523 } else {
524 /* For assert at loop start in nbd_connection_entry */
525 if (reply) {
526 *reply = s->reply;
527 }
528 s->reply.handle = 0;
529 }
530
531 if (s->connection_co) {
532 aio_co_wake(s->connection_co);
533 }
534
535 return ret;
536 }
537
538 typedef struct NBDReplyChunkIter {
539 int ret;
540 int request_ret;
541 Error *err;
542 bool done, only_structured;
543 } NBDReplyChunkIter;
544
545 static void nbd_iter_channel_error(NBDReplyChunkIter *iter,
546 int ret, Error **local_err)
547 {
548 assert(ret < 0);
549
550 if (!iter->ret) {
551 iter->ret = ret;
552 error_propagate(&iter->err, *local_err);
553 } else {
554 error_free(*local_err);
555 }
556
557 *local_err = NULL;
558 }
559
560 static void nbd_iter_request_error(NBDReplyChunkIter *iter, int ret)
561 {
562 assert(ret < 0);
563
564 if (!iter->request_ret) {
565 iter->request_ret = ret;
566 }
567 }
568
569 /* NBD_FOREACH_REPLY_CHUNK
570 */
571 #define NBD_FOREACH_REPLY_CHUNK(s, iter, handle, structured, \
572 qiov, reply, payload) \
573 for (iter = (NBDReplyChunkIter) { .only_structured = structured }; \
574 nbd_reply_chunk_iter_receive(s, &iter, handle, qiov, reply, payload);)
575
576 /* nbd_reply_chunk_iter_receive
577 */
578 static bool nbd_reply_chunk_iter_receive(NBDClientSession *s,
579 NBDReplyChunkIter *iter,
580 uint64_t handle,
581 QEMUIOVector *qiov, NBDReply *reply,
582 void **payload)
583 {
584 int ret, request_ret;
585 NBDReply local_reply;
586 NBDStructuredReplyChunk *chunk;
587 Error *local_err = NULL;
588 if (s->quit) {
589 error_setg(&local_err, "Connection closed");
590 nbd_iter_channel_error(iter, -EIO, &local_err);
591 goto break_loop;
592 }
593
594 if (iter->done) {
595 /* Previous iteration was last. */
596 goto break_loop;
597 }
598
599 if (reply == NULL) {
600 reply = &local_reply;
601 }
602
603 ret = nbd_co_receive_one_chunk(s, handle, iter->only_structured,
604 &request_ret, qiov, reply, payload,
605 &local_err);
606 if (ret < 0) {
607 nbd_iter_channel_error(iter, ret, &local_err);
608 } else if (request_ret < 0) {
609 nbd_iter_request_error(iter, request_ret);
610 }
611
612 /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
613 if (nbd_reply_is_simple(reply) || s->quit) {
614 goto break_loop;
615 }
616
617 chunk = &reply->structured;
618 iter->only_structured = true;
619
620 if (chunk->type == NBD_REPLY_TYPE_NONE) {
621 /* NBD_REPLY_FLAG_DONE is already checked in nbd_co_receive_one_chunk */
622 assert(chunk->flags & NBD_REPLY_FLAG_DONE);
623 goto break_loop;
624 }
625
626 if (chunk->flags & NBD_REPLY_FLAG_DONE) {
627 /* This iteration is last. */
628 iter->done = true;
629 }
630
631 /* Execute the loop body */
632 return true;
633
634 break_loop:
635 s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;
636
637 qemu_co_mutex_lock(&s->send_mutex);
638 s->in_flight--;
639 qemu_co_queue_next(&s->free_sema);
640 qemu_co_mutex_unlock(&s->send_mutex);
641
642 return false;
643 }
644
645 static int nbd_co_receive_return_code(NBDClientSession *s, uint64_t handle,
646 int *request_ret, Error **errp)
647 {
648 NBDReplyChunkIter iter;
649
650 NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, NULL, NULL) {
651 /* nbd_reply_chunk_iter_receive does all the work */
652 }
653
654 error_propagate(errp, iter.err);
655 *request_ret = iter.request_ret;
656 return iter.ret;
657 }
658
659 static int nbd_co_receive_cmdread_reply(NBDClientSession *s, uint64_t handle,
660 uint64_t offset, QEMUIOVector *qiov,
661 int *request_ret, Error **errp)
662 {
663 NBDReplyChunkIter iter;
664 NBDReply reply;
665 void *payload = NULL;
666 Error *local_err = NULL;
667
668 NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
669 qiov, &reply, &payload)
670 {
671 int ret;
672 NBDStructuredReplyChunk *chunk = &reply.structured;
673
674 assert(nbd_reply_is_structured(&reply));
675
676 switch (chunk->type) {
677 case NBD_REPLY_TYPE_OFFSET_DATA:
678 /* special cased in nbd_co_receive_one_chunk, data is already
679 * in qiov */
680 break;
681 case NBD_REPLY_TYPE_OFFSET_HOLE:
682 ret = nbd_parse_offset_hole_payload(&reply.structured, payload,
683 offset, qiov, &local_err);
684 if (ret < 0) {
685 s->quit = true;
686 nbd_iter_channel_error(&iter, ret, &local_err);
687 }
688 break;
689 default:
690 if (!nbd_reply_type_is_error(chunk->type)) {
691 /* not allowed reply type */
692 s->quit = true;
693 error_setg(&local_err,
694 "Unexpected reply type: %d (%s) for CMD_READ",
695 chunk->type, nbd_reply_type_lookup(chunk->type));
696 nbd_iter_channel_error(&iter, -EINVAL, &local_err);
697 }
698 }
699
700 g_free(payload);
701 payload = NULL;
702 }
703
704 error_propagate(errp, iter.err);
705 *request_ret = iter.request_ret;
706 return iter.ret;
707 }
708
709 static int nbd_co_receive_blockstatus_reply(NBDClientSession *s,
710 uint64_t handle, uint64_t length,
711 NBDExtent *extent,
712 int *request_ret, Error **errp)
713 {
714 NBDReplyChunkIter iter;
715 NBDReply reply;
716 void *payload = NULL;
717 Error *local_err = NULL;
718 bool received = false;
719
720 assert(!extent->length);
721 NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
722 NULL, &reply, &payload)
723 {
724 int ret;
725 NBDStructuredReplyChunk *chunk = &reply.structured;
726
727 assert(nbd_reply_is_structured(&reply));
728
729 switch (chunk->type) {
730 case NBD_REPLY_TYPE_BLOCK_STATUS:
731 if (received) {
732 s->quit = true;
733 error_setg(&local_err, "Several BLOCK_STATUS chunks in reply");
734 nbd_iter_channel_error(&iter, -EINVAL, &local_err);
735 }
736 received = true;
737
738 ret = nbd_parse_blockstatus_payload(s, &reply.structured,
739 payload, length, extent,
740 &local_err);
741 if (ret < 0) {
742 s->quit = true;
743 nbd_iter_channel_error(&iter, ret, &local_err);
744 }
745 break;
746 default:
747 if (!nbd_reply_type_is_error(chunk->type)) {
748 s->quit = true;
749 error_setg(&local_err,
750 "Unexpected reply type: %d (%s) "
751 "for CMD_BLOCK_STATUS",
752 chunk->type, nbd_reply_type_lookup(chunk->type));
753 nbd_iter_channel_error(&iter, -EINVAL, &local_err);
754 }
755 }
756
757 g_free(payload);
758 payload = NULL;
759 }
760
761 if (!extent->length && !iter.err) {
762 error_setg(&iter.err,
763 "Server did not reply with any status extents");
764 if (!iter.ret) {
765 iter.ret = -EIO;
766 }
767 }
768
769 error_propagate(errp, iter.err);
770 *request_ret = iter.request_ret;
771 return iter.ret;
772 }
773
774 static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
775 QEMUIOVector *write_qiov)
776 {
777 int ret, request_ret;
778 Error *local_err = NULL;
779 NBDClientSession *client = nbd_get_client_session(bs);
780
781 assert(request->type != NBD_CMD_READ);
782 if (write_qiov) {
783 assert(request->type == NBD_CMD_WRITE);
784 assert(request->len == iov_size(write_qiov->iov, write_qiov->niov));
785 } else {
786 assert(request->type != NBD_CMD_WRITE);
787 }
788 ret = nbd_co_send_request(bs, request, write_qiov);
789 if (ret < 0) {
790 return ret;
791 }
792
793 ret = nbd_co_receive_return_code(client, request->handle,
794 &request_ret, &local_err);
795 if (local_err) {
796 trace_nbd_co_request_fail(request->from, request->len, request->handle,
797 request->flags, request->type,
798 nbd_cmd_lookup(request->type),
799 ret, error_get_pretty(local_err));
800 error_free(local_err);
801 }
802 return ret ? ret : request_ret;
803 }
804
805 int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
806 uint64_t bytes, QEMUIOVector *qiov, int flags)
807 {
808 int ret, request_ret;
809 Error *local_err = NULL;
810 NBDClientSession *client = nbd_get_client_session(bs);
811 NBDRequest request = {
812 .type = NBD_CMD_READ,
813 .from = offset,
814 .len = bytes,
815 };
816
817 assert(bytes <= NBD_MAX_BUFFER_SIZE);
818 assert(!flags);
819
820 if (!bytes) {
821 return 0;
822 }
823 ret = nbd_co_send_request(bs, &request, NULL);
824 if (ret < 0) {
825 return ret;
826 }
827
828 ret = nbd_co_receive_cmdread_reply(client, request.handle, offset, qiov,
829 &request_ret, &local_err);
830 if (local_err) {
831 trace_nbd_co_request_fail(request.from, request.len, request.handle,
832 request.flags, request.type,
833 nbd_cmd_lookup(request.type),
834 ret, error_get_pretty(local_err));
835 error_free(local_err);
836 }
837 return ret ? ret : request_ret;
838 }
839
840 int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
841 uint64_t bytes, QEMUIOVector *qiov, int flags)
842 {
843 NBDClientSession *client = nbd_get_client_session(bs);
844 NBDRequest request = {
845 .type = NBD_CMD_WRITE,
846 .from = offset,
847 .len = bytes,
848 };
849
850 assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
851 if (flags & BDRV_REQ_FUA) {
852 assert(client->info.flags & NBD_FLAG_SEND_FUA);
853 request.flags |= NBD_CMD_FLAG_FUA;
854 }
855
856 assert(bytes <= NBD_MAX_BUFFER_SIZE);
857
858 if (!bytes) {
859 return 0;
860 }
861 return nbd_co_request(bs, &request, qiov);
862 }
863
864 int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
865 int bytes, BdrvRequestFlags flags)
866 {
867 NBDClientSession *client = nbd_get_client_session(bs);
868 NBDRequest request = {
869 .type = NBD_CMD_WRITE_ZEROES,
870 .from = offset,
871 .len = bytes,
872 };
873
874 assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
875 if (!(client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
876 return -ENOTSUP;
877 }
878
879 if (flags & BDRV_REQ_FUA) {
880 assert(client->info.flags & NBD_FLAG_SEND_FUA);
881 request.flags |= NBD_CMD_FLAG_FUA;
882 }
883 if (!(flags & BDRV_REQ_MAY_UNMAP)) {
884 request.flags |= NBD_CMD_FLAG_NO_HOLE;
885 }
886
887 if (!bytes) {
888 return 0;
889 }
890 return nbd_co_request(bs, &request, NULL);
891 }
892
893 int nbd_client_co_flush(BlockDriverState *bs)
894 {
895 NBDClientSession *client = nbd_get_client_session(bs);
896 NBDRequest request = { .type = NBD_CMD_FLUSH };
897
898 if (!(client->info.flags & NBD_FLAG_SEND_FLUSH)) {
899 return 0;
900 }
901
902 request.from = 0;
903 request.len = 0;
904
905 return nbd_co_request(bs, &request, NULL);
906 }
907
908 int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int bytes)
909 {
910 NBDClientSession *client = nbd_get_client_session(bs);
911 NBDRequest request = {
912 .type = NBD_CMD_TRIM,
913 .from = offset,
914 .len = bytes,
915 };
916
917 assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
918 if (!(client->info.flags & NBD_FLAG_SEND_TRIM) || !bytes) {
919 return 0;
920 }
921
922 return nbd_co_request(bs, &request, NULL);
923 }
924
925 int coroutine_fn nbd_client_co_block_status(BlockDriverState *bs,
926 bool want_zero,
927 int64_t offset, int64_t bytes,
928 int64_t *pnum, int64_t *map,
929 BlockDriverState **file)
930 {
931 int ret, request_ret;
932 NBDExtent extent = { 0 };
933 NBDClientSession *client = nbd_get_client_session(bs);
934 Error *local_err = NULL;
935
936 NBDRequest request = {
937 .type = NBD_CMD_BLOCK_STATUS,
938 .from = offset,
939 .len = MIN(MIN_NON_ZERO(QEMU_ALIGN_DOWN(INT_MAX,
940 bs->bl.request_alignment),
941 client->info.max_block), bytes),
942 .flags = NBD_CMD_FLAG_REQ_ONE,
943 };
944
945 if (!client->info.base_allocation) {
946 *pnum = bytes;
947 return BDRV_BLOCK_DATA;
948 }
949
950 ret = nbd_co_send_request(bs, &request, NULL);
951 if (ret < 0) {
952 return ret;
953 }
954
955 ret = nbd_co_receive_blockstatus_reply(client, request.handle, bytes,
956 &extent, &request_ret, &local_err);
957 if (local_err) {
958 trace_nbd_co_request_fail(request.from, request.len, request.handle,
959 request.flags, request.type,
960 nbd_cmd_lookup(request.type),
961 ret, error_get_pretty(local_err));
962 error_free(local_err);
963 }
964 if (ret < 0 || request_ret < 0) {
965 return ret ? ret : request_ret;
966 }
967
968 assert(extent.length);
969 *pnum = extent.length;
970 return (extent.flags & NBD_STATE_HOLE ? 0 : BDRV_BLOCK_DATA) |
971 (extent.flags & NBD_STATE_ZERO ? BDRV_BLOCK_ZERO : 0);
972 }
973
974 void nbd_client_detach_aio_context(BlockDriverState *bs)
975 {
976 NBDClientSession *client = nbd_get_client_session(bs);
977 qio_channel_detach_aio_context(QIO_CHANNEL(client->ioc));
978 }
979
980 static void nbd_client_attach_aio_context_bh(void *opaque)
981 {
982 BlockDriverState *bs = opaque;
983 NBDClientSession *client = nbd_get_client_session(bs);
984
985 /* The node is still drained, so we know the coroutine has yielded in
986 * nbd_read_eof(), the only place where bs->in_flight can reach 0, or it is
987 * entered for the first time. Both places are safe for entering the
988 * coroutine.*/
989 qemu_aio_coroutine_enter(bs->aio_context, client->connection_co);
990 bdrv_dec_in_flight(bs);
991 }
992
993 void nbd_client_attach_aio_context(BlockDriverState *bs,
994 AioContext *new_context)
995 {
996 NBDClientSession *client = nbd_get_client_session(bs);
997 qio_channel_attach_aio_context(QIO_CHANNEL(client->ioc), new_context);
998
999 bdrv_inc_in_flight(bs);
1000
1001 /* Need to wait here for the BH to run because the BH must run while the
1002 * node is still drained. */
1003 aio_wait_bh_oneshot(new_context, nbd_client_attach_aio_context_bh, bs);
1004 }
1005
1006 void nbd_client_close(BlockDriverState *bs)
1007 {
1008 NBDClientSession *client = nbd_get_client_session(bs);
1009 NBDRequest request = { .type = NBD_CMD_DISC };
1010
1011 assert(client->ioc);
1012
1013 nbd_send_request(client->ioc, &request);
1014
1015 nbd_teardown_connection(bs);
1016 }
1017
1018 static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
1019 Error **errp)
1020 {
1021 QIOChannelSocket *sioc;
1022 Error *local_err = NULL;
1023
1024 sioc = qio_channel_socket_new();
1025 qio_channel_set_name(QIO_CHANNEL(sioc), "nbd-client");
1026
1027 qio_channel_socket_connect_sync(sioc, saddr, &local_err);
1028 if (local_err) {
1029 object_unref(OBJECT(sioc));
1030 error_propagate(errp, local_err);
1031 return NULL;
1032 }
1033
1034 qio_channel_set_delay(QIO_CHANNEL(sioc), false);
1035
1036 return sioc;
1037 }
1038
1039 static int nbd_client_connect(BlockDriverState *bs,
1040 SocketAddress *saddr,
1041 const char *export,
1042 QCryptoTLSCreds *tlscreds,
1043 const char *hostname,
1044 const char *x_dirty_bitmap,
1045 Error **errp)
1046 {
1047 NBDClientSession *client = nbd_get_client_session(bs);
1048 int ret;
1049
1050 /*
1051 * establish TCP connection, return error if it fails
1052 * TODO: Configurable retry-until-timeout behaviour.
1053 */
1054 QIOChannelSocket *sioc = nbd_establish_connection(saddr, errp);
1055
1056 if (!sioc) {
1057 return -ECONNREFUSED;
1058 }
1059
1060 /* NBD handshake */
1061 logout("session init %s\n", export);
1062 qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);
1063
1064 client->info.request_sizes = true;
1065 client->info.structured_reply = true;
1066 client->info.base_allocation = true;
1067 client->info.x_dirty_bitmap = g_strdup(x_dirty_bitmap);
1068 client->info.name = g_strdup(export ?: "");
1069 ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), tlscreds, hostname,
1070 &client->ioc, &client->info, errp);
1071 g_free(client->info.x_dirty_bitmap);
1072 g_free(client->info.name);
1073 if (ret < 0) {
1074 logout("Failed to negotiate with the NBD server\n");
1075 object_unref(OBJECT(sioc));
1076 return ret;
1077 }
1078 if (x_dirty_bitmap && !client->info.base_allocation) {
1079 error_setg(errp, "requested x-dirty-bitmap %s not found",
1080 x_dirty_bitmap);
1081 ret = -EINVAL;
1082 goto fail;
1083 }
1084 if (client->info.flags & NBD_FLAG_READ_ONLY) {
1085 ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
1086 if (ret < 0) {
1087 goto fail;
1088 }
1089 }
1090 if (client->info.flags & NBD_FLAG_SEND_FUA) {
1091 bs->supported_write_flags = BDRV_REQ_FUA;
1092 bs->supported_zero_flags |= BDRV_REQ_FUA;
1093 }
1094 if (client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
1095 bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
1096 }
1097
1098 client->sioc = sioc;
1099
1100 if (!client->ioc) {
1101 client->ioc = QIO_CHANNEL(sioc);
1102 object_ref(OBJECT(client->ioc));
1103 }
1104
1105 /* Now that we're connected, set the socket to be non-blocking and
1106 * kick the reply mechanism. */
1107 qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
1108 client->connection_co = qemu_coroutine_create(nbd_connection_entry, client);
1109 bdrv_inc_in_flight(bs);
1110 nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
1111
1112 logout("Established connection with NBD server\n");
1113 return 0;
1114
1115 fail:
1116 /*
1117 * We have connected, but must fail for other reasons. The
1118 * connection is still blocking; send NBD_CMD_DISC as a courtesy
1119 * to the server.
1120 */
1121 {
1122 NBDRequest request = { .type = NBD_CMD_DISC };
1123
1124 nbd_send_request(client->ioc ?: QIO_CHANNEL(sioc), &request);
1125
1126 object_unref(OBJECT(sioc));
1127
1128 return ret;
1129 }
1130 }
1131
1132 int nbd_client_init(BlockDriverState *bs,
1133 SocketAddress *saddr,
1134 const char *export,
1135 QCryptoTLSCreds *tlscreds,
1136 const char *hostname,
1137 const char *x_dirty_bitmap,
1138 Error **errp)
1139 {
1140 NBDClientSession *client = nbd_get_client_session(bs);
1141
1142 client->bs = bs;
1143 qemu_co_mutex_init(&client->send_mutex);
1144 qemu_co_queue_init(&client->free_sema);
1145
1146 return nbd_client_connect(bs, saddr, export, tlscreds, hostname,
1147 x_dirty_bitmap, errp);
1148 }