/*
 * QEMU Block driver for NBD
 *
 * Copyright (C) 2016 Red Hat, Inc.
 * Copyright (C) 2008 Bull S.A.S.
 *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
 *
 * Some parts:
 *    Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */
#include "qemu/osdep.h"

#include "trace.h"
#include "qapi/error.h"
#include "nbd-client.h"
#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ (uint64_t)(intptr_t)(bs))
#define INDEX_TO_HANDLE(bs, index)  ((index)  ^ (uint64_t)(intptr_t)(bs))
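
/*
 * Illustration (not from the original source): XOR is its own inverse, so
 * the two macros round-trip for any index as long as the same pointer is
 * used both times:
 *
 *     uint64_t handle = INDEX_TO_HANDLE(s, 3);
 *     assert(HANDLE_TO_INDEX(s, handle) == 3);
 *
 * Folding the session pointer into the handle means a stale handle echoed
 * back from a previous connection usually decodes to an out-of-range index
 * and trips the bounds check in nbd_connection_entry().
 */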
static void nbd_recv_coroutines_wake_all(NBDClientSession *s)
{
    int i;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        NBDClientRequest *req = &s->requests[i];

        if (req->coroutine && req->receiving) {
            aio_co_wake(req->coroutine);
        }
    }
}
static void nbd_teardown_connection(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);

    assert(client->ioc);

    /* finish any pending coroutines */
    qio_channel_shutdown(client->ioc,
                         QIO_CHANNEL_SHUTDOWN_BOTH,
                         NULL);
    BDRV_POLL_WHILE(bs, client->connection_co);

    nbd_client_detach_aio_context(bs);
    object_unref(OBJECT(client->sioc));
    client->sioc = NULL;
    object_unref(OBJECT(client->ioc));
    client->ioc = NULL;
}
static coroutine_fn void nbd_connection_entry(void *opaque)
{
    NBDClientSession *s = opaque;
    uint64_t i;
    int ret = 0;
    Error *local_err = NULL;

    while (!s->quit) {
        /*
         * The NBD client can only really be considered idle when it has
         * yielded from qio_channel_readv_all_eof(), waiting for data. This is
         * the point where the additional scheduled coroutine entry happens
         * after nbd_client_attach_aio_context().
         *
         * Therefore we keep an additional in_flight reference all the time and
         * only drop it temporarily here.
         */
        assert(s->reply.handle == 0);
        ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);

        if (local_err) {
            trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
            error_free(local_err);
            local_err = NULL;
        }
        if (ret <= 0) {
            break;
        }

        /* There's no need for a mutex on the receive side, because the
         * handler acts as a synchronization point and ensures that only
         * one coroutine is called until the reply finishes.
         */
        i = HANDLE_TO_INDEX(s, s->reply.handle);
        if (i >= MAX_NBD_REQUESTS ||
            !s->requests[i].coroutine ||
            !s->requests[i].receiving ||
            (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
        {
            break;
        }

        /* We're woken up again by the request itself. Note that there
         * is no race between yielding and reentering connection_co. This
         * is because:
         *
         * - if the request runs on the same AioContext, it is only
         *   entered after we yield
         *
         * - if the request runs on a different AioContext, reentering
         *   connection_co happens through a bottom half, which can only
         *   run after we yield.
         */
        aio_co_wake(s->requests[i].coroutine);
        qemu_coroutine_yield();
    }

    s->quit = true;
    nbd_recv_coroutines_wake_all(s);
    bdrv_dec_in_flight(s->bs);

    s->connection_co = NULL;
}
static int nbd_co_send_request(BlockDriverState *bs,
                               NBDRequest *request,
                               QEMUIOVector *qiov)
{
    NBDClientSession *s = nbd_get_client_session(bs);
    int rc, i;

    qemu_co_mutex_lock(&s->send_mutex);
    while (s->in_flight == MAX_NBD_REQUESTS) {
        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
    }
    s->in_flight++;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        if (s->requests[i].coroutine == NULL) {
            break;
        }
    }

    g_assert(qemu_in_coroutine());
    assert(i < MAX_NBD_REQUESTS);

    s->requests[i].coroutine = qemu_coroutine_self();
    s->requests[i].offset = request->from;
    s->requests[i].receiving = false;

    request->handle = INDEX_TO_HANDLE(s, i);

    if (s->quit) {
        rc = -EIO;
        goto err;
    }
    if (!s->ioc) {
        rc = -EPIPE;
        goto err;
    }

    if (qiov) {
        qio_channel_set_cork(s->ioc, true);
        rc = nbd_send_request(s->ioc, request);
        if (rc >= 0 && !s->quit) {
            if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
                                       NULL) < 0) {
                rc = -EIO;
            }
        } else if (rc >= 0) {
            rc = -EIO;
        }
        qio_channel_set_cork(s->ioc, false);
    } else {
        rc = nbd_send_request(s->ioc, request);
    }

err:
    if (rc < 0) {
        s->quit = true;
        s->requests[i].coroutine = NULL;
        s->in_flight--;
        qemu_co_queue_next(&s->free_sema);
    }
    qemu_co_mutex_unlock(&s->send_mutex);
    return rc;
}
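
/*
 * Note on the corking above (explanatory, not in the original file): for
 * write requests the NBD header from nbd_send_request() and the payload
 * from qio_channel_writev_all() are two separate writes. Corking the
 * channel (TCP_CORK-style batching on TCP sockets) lets the kernel
 * coalesce them, so uncorking after both writes flushes header and data
 * in as few segments as possible.
 */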
static inline uint16_t payload_advance16(uint8_t **payload)
{
    *payload += 2;
    return lduw_be_p(*payload - 2);
}

static inline uint32_t payload_advance32(uint8_t **payload)
{
    *payload += 4;
    return ldl_be_p(*payload - 4);
}

static inline uint64_t payload_advance64(uint8_t **payload)
{
    *payload += 8;
    return ldq_be_p(*payload - 8);
}
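
/*
 * Worked example (illustrative only): the helpers decode big-endian fields
 * and leave *payload pointing at the next field, so a payload can be
 * consumed left to right:
 *
 *     uint8_t buf[] = { 0x00, 0x01, 0x00, 0x00, 0x00, 0x05 };
 *     uint8_t *p = buf;
 *     uint16_t a = payload_advance16(&p);   // a == 0x0001, p == buf + 2
 *     uint32_t b = payload_advance32(&p);   // b == 0x00000005, p == buf + 6
 */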
static int nbd_parse_offset_hole_payload(NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_offset,
                                         QEMUIOVector *qiov, Error **errp)
{
    uint64_t offset;
    uint32_t hole_size;

    if (chunk->length != sizeof(offset) + sizeof(hole_size)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_HOLE");
        return -EINVAL;
    }

    offset = payload_advance64(&payload);
    hole_size = payload_advance32(&payload);

    if (!hole_size || offset < orig_offset || hole_size > qiov->size ||
        offset > orig_offset + qiov->size - hole_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                         " region");
        return -EINVAL;
    }

    qemu_iovec_memset(qiov, offset - orig_offset, 0, hole_size);

    return 0;
}
/* nbd_parse_blockstatus_payload
 * Based on our request, we expect only one extent in reply, for the
 * base:allocation context.
 */
static int nbd_parse_blockstatus_payload(NBDClientSession *client,
                                         NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_length,
                                         NBDExtent *extent, Error **errp)
{
    uint32_t context_id;

    /* The server succeeded, so it must have sent [at least] one extent */
    if (chunk->length < sizeof(context_id) + sizeof(*extent)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS");
        return -EINVAL;
    }

    context_id = payload_advance32(&payload);
    if (client->info.context_id != context_id) {
        error_setg(errp, "Protocol error: unexpected context id %d for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS, when negotiated context "
                         "id is %d", context_id,
                         client->info.context_id);
        return -EINVAL;
    }

    extent->length = payload_advance32(&payload);
    extent->flags = payload_advance32(&payload);

    if (extent->length == 0) {
        error_setg(errp, "Protocol error: server sent status chunk with "
                   "zero length");
        return -EINVAL;
    }

    /*
     * A server sending unaligned block status is in violation of the
     * protocol, but as qemu-nbd 3.1 is such a server (at least for
     * POSIX files that are not a multiple of 512 bytes, since qemu
     * rounds files up to 512-byte multiples but lseek(SEEK_HOLE)
     * still sees an implicit hole beyond the real EOF), it's nicer to
     * work around the misbehaving server. If the request included
     * more than the final unaligned block, truncate it back to an
     * aligned result; if the request was only the final block, round
     * up to the full block and change the status to fully-allocated
     * (always a safe status, even if it loses information).
     */
    if (client->info.min_block && !QEMU_IS_ALIGNED(extent->length,
                                                   client->info.min_block)) {
        trace_nbd_parse_blockstatus_compliance("extent length is unaligned");
        if (extent->length > client->info.min_block) {
            extent->length = QEMU_ALIGN_DOWN(extent->length,
                                             client->info.min_block);
        } else {
            extent->length = client->info.min_block;
            extent->flags = 0;
        }
    }

    /*
     * We used NBD_CMD_FLAG_REQ_ONE, so the server should not have
     * sent us any more than one extent, nor should it have included
     * status beyond our request in that extent. However, it's easy
     * enough to ignore the server's noncompliance without killing the
     * connection; just ignore trailing extents, and clamp things to
     * the length of our request.
     */
    if (chunk->length > sizeof(context_id) + sizeof(*extent)) {
        trace_nbd_parse_blockstatus_compliance("more than one extent");
    }
    if (extent->length > orig_length) {
        extent->length = orig_length;
        trace_nbd_parse_blockstatus_compliance("extent length too large");
    }

    return 0;
}
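
/*
 * Worked example of the alignment workaround above (illustrative numbers):
 * with min_block == 512, a server extent of length 1300 covering more than
 * the final block is truncated to QEMU_ALIGN_DOWN(1300, 512) == 1024; an
 * extent of length 300 (only the final short block) is rounded up to 512
 * and its flags cleared, i.e. reported as fully-allocated data.
 */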
/* nbd_parse_error_payload
 * on success @errp contains message describing nbd error reply
 */
static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
                                   uint8_t *payload, int *request_ret,
                                   Error **errp)
{
    uint32_t error;
    uint16_t message_size;

    assert(chunk->type & (1 << 15));

    if (chunk->length < sizeof(error) + sizeof(message_size)) {
        error_setg(errp,
                   "Protocol error: invalid payload for structured error");
        return -EINVAL;
    }

    error = nbd_errno_to_system_errno(payload_advance32(&payload));
    if (error == 0) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with error = 0");
        return -EINVAL;
    }

    *request_ret = -error;
    message_size = payload_advance16(&payload);

    if (message_size > chunk->length - sizeof(error) - sizeof(message_size)) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with incorrect message size");
        return -EINVAL;
    }

    /* TODO: Add a trace point to mention the server complaint */

    /* TODO handle ERROR_OFFSET */

    return 0;
}
static int nbd_co_receive_offset_data_payload(NBDClientSession *s,
                                              uint64_t orig_offset,
                                              QEMUIOVector *qiov, Error **errp)
{
    QEMUIOVector sub_qiov;
    uint64_t offset;
    size_t data_size;
    int ret;
    NBDStructuredReplyChunk *chunk = &s->reply.structured;

    assert(nbd_reply_is_structured(&s->reply));

    /* The NBD spec requires at least one byte of payload */
    if (chunk->length <= sizeof(offset)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_DATA");
        return -EINVAL;
    }

    if (nbd_read64(s->ioc, &offset, "OFFSET_DATA offset", errp) < 0) {
        return -EIO;
    }

    data_size = chunk->length - sizeof(offset);
    assert(data_size);
    if (offset < orig_offset || data_size > qiov->size ||
        offset > orig_offset + qiov->size - data_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                         " region");
        return -EINVAL;
    }

    qemu_iovec_init(&sub_qiov, qiov->niov);
    qemu_iovec_concat(&sub_qiov, qiov, offset - orig_offset, data_size);
    ret = qio_channel_readv_all(s->ioc, sub_qiov.iov, sub_qiov.niov, errp);
    qemu_iovec_destroy(&sub_qiov);

    return ret < 0 ? -EIO : 0;
}
#define NBD_MAX_MALLOC_PAYLOAD 1000
/* nbd_co_receive_structured_payload
 */
static coroutine_fn int nbd_co_receive_structured_payload(
        NBDClientSession *s, void **payload, Error **errp)
{
    int ret;
    uint32_t len;

    assert(nbd_reply_is_structured(&s->reply));

    len = s->reply.structured.length;

    if (len == 0) {
        return 0;
    }

    if (payload == NULL) {
        error_setg(errp, "Unexpected structured payload");
        return -EINVAL;
    }

    if (len > NBD_MAX_MALLOC_PAYLOAD) {
        error_setg(errp, "Payload too large");
        return -EINVAL;
    }

    *payload = g_new(char, len);
    ret = nbd_read(s->ioc, *payload, len, "structured payload", errp);
    if (ret < 0) {
        g_free(*payload);
        *payload = NULL;
        return ret;
    }

    return 0;
}
/* nbd_co_do_receive_one_chunk
 * for simple reply:
 *   set request_ret to received reply error
 *   if qiov is not NULL: read payload to @qiov
 * for structured reply chunk:
 *   if error chunk: read payload, set @request_ret, do not set @payload
 *   else if offset_data chunk: read payload data to @qiov, do not set @payload
 *   else: read payload to @payload
 *
 * If function fails, @errp contains corresponding error message, and the
 * connection with the server is suspect. If it returns 0, then the
 * transaction succeeded (although @request_ret may be a negative errno
 * corresponding to the server's error reply), and errp is unchanged.
 */
static coroutine_fn int nbd_co_do_receive_one_chunk(
        NBDClientSession *s, uint64_t handle, bool only_structured,
        int *request_ret, QEMUIOVector *qiov, void **payload, Error **errp)
{
    int ret;
    int i = HANDLE_TO_INDEX(s, handle);
    void *local_payload = NULL;
    NBDStructuredReplyChunk *chunk;

    if (payload) {
        *payload = NULL;
    }
    *request_ret = 0;

    /* Wait until we're woken up by nbd_connection_entry. */
    s->requests[i].receiving = true;
    qemu_coroutine_yield();
    s->requests[i].receiving = false;
    if (s->quit) {
        error_setg(errp, "Connection closed");
        return -EIO;
    }
    assert(s->ioc);

    assert(s->reply.handle == handle);

    if (nbd_reply_is_simple(&s->reply)) {
        if (only_structured) {
            error_setg(errp, "Protocol error: simple reply when structured "
                             "reply chunk was expected");
            return -EINVAL;
        }

        *request_ret = -nbd_errno_to_system_errno(s->reply.simple.error);
        if (*request_ret < 0 || !qiov) {
            return 0;
        }

        return qio_channel_readv_all(s->ioc, qiov->iov, qiov->niov,
                                     errp) < 0 ? -EIO : 0;
    }

    /* handle structured reply chunk */
    assert(s->info.structured_reply);
    chunk = &s->reply.structured;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        if (!(chunk->flags & NBD_REPLY_FLAG_DONE)) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk without"
                       " NBD_REPLY_FLAG_DONE flag set");
            return -EINVAL;
        }
        if (chunk->length) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk with"
                       " nonzero length");
            return -EINVAL;
        }
        return 0;
    }

    if (chunk->type == NBD_REPLY_TYPE_OFFSET_DATA) {
        if (!qiov) {
            error_setg(errp, "Unexpected NBD_REPLY_TYPE_OFFSET_DATA chunk");
            return -EINVAL;
        }

        return nbd_co_receive_offset_data_payload(s, s->requests[i].offset,
                                                  qiov, errp);
    }

    if (nbd_reply_type_is_error(chunk->type)) {
        payload = &local_payload;
    }

    ret = nbd_co_receive_structured_payload(s, payload, errp);
    if (ret < 0) {
        return ret;
    }

    if (nbd_reply_type_is_error(chunk->type)) {
        ret = nbd_parse_error_payload(chunk, local_payload, request_ret, errp);
        g_free(local_payload);
        return ret;
    }

    return 0;
}
/* nbd_co_receive_one_chunk
 * Read reply, wake up connection_co and set s->quit if needed.
 * Return value is a fatal error code or normal nbd reply error code
 */
static coroutine_fn int nbd_co_receive_one_chunk(
        NBDClientSession *s, uint64_t handle, bool only_structured,
        int *request_ret, QEMUIOVector *qiov, NBDReply *reply, void **payload,
        Error **errp)
{
    int ret = nbd_co_do_receive_one_chunk(s, handle, only_structured,
                                          request_ret, qiov, payload, errp);

    if (ret < 0) {
        s->quit = true;
    } else {
        /* For assert at loop start in nbd_connection_entry */
        if (reply) {
            *reply = s->reply;
        }
        s->reply.handle = 0;
    }

    if (s->connection_co) {
        aio_co_wake(s->connection_co);
    }

    return ret;
}
typedef struct NBDReplyChunkIter {
    int ret;
    int request_ret;
    Error *err;
    bool done, only_structured;
} NBDReplyChunkIter;

static void nbd_iter_channel_error(NBDReplyChunkIter *iter,
                                   int ret, Error **local_err)
{
    assert(ret < 0);

    if (!iter->ret) {
        iter->ret = ret;
        error_propagate(&iter->err, *local_err);
    } else {
        error_free(*local_err);
    }

    *local_err = NULL;
}

static void nbd_iter_request_error(NBDReplyChunkIter *iter, int ret)
{
    assert(ret < 0);

    if (!iter->request_ret) {
        iter->request_ret = ret;
    }
}
/* NBD_FOREACH_REPLY_CHUNK
 */
#define NBD_FOREACH_REPLY_CHUNK(s, iter, handle, structured, \
                                qiov, reply, payload) \
    for (iter = (NBDReplyChunkIter) { .only_structured = structured }; \
         nbd_reply_chunk_iter_receive(s, &iter, handle, qiov, reply, payload);)
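
/*
 * Usage sketch (not part of the original file): the macro expands to a
 * plain for loop whose condition pulls in the next chunk, so
 *
 *     NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, &reply, &payload) {
 *         ... handle one structured chunk ...
 *     }
 *
 * runs the body once per structured chunk and zero times for a simple
 * reply or on a channel error; nbd_reply_chunk_iter_receive() below
 * returns false to leave the loop and releases the request slot on the
 * way out.
 */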
/* nbd_reply_chunk_iter_receive
 */
static bool nbd_reply_chunk_iter_receive(NBDClientSession *s,
                                         NBDReplyChunkIter *iter,
                                         uint64_t handle,
                                         QEMUIOVector *qiov, NBDReply *reply,
                                         void **payload)
{
    int ret, request_ret;
    NBDReply local_reply;
    NBDStructuredReplyChunk *chunk;
    Error *local_err = NULL;

    if (s->quit) {
        error_setg(&local_err, "Connection closed");
        nbd_iter_channel_error(iter, -EIO, &local_err);
        goto break_loop;
    }

    if (iter->done) {
        /* Previous iteration was last. */
        goto break_loop;
    }

    if (reply == NULL) {
        reply = &local_reply;
    }

    ret = nbd_co_receive_one_chunk(s, handle, iter->only_structured,
                                   &request_ret, qiov, reply, payload,
                                   &local_err);
    if (ret < 0) {
        nbd_iter_channel_error(iter, ret, &local_err);
    } else if (request_ret < 0) {
        nbd_iter_request_error(iter, request_ret);
    }

    /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
    if (nbd_reply_is_simple(reply) || s->quit) {
        goto break_loop;
    }

    chunk = &reply->structured;
    iter->only_structured = true;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        /* NBD_REPLY_FLAG_DONE is already checked in nbd_co_receive_one_chunk */
        assert(chunk->flags & NBD_REPLY_FLAG_DONE);
        goto break_loop;
    }

    if (chunk->flags & NBD_REPLY_FLAG_DONE) {
        /* This iteration is last. */
        iter->done = true;
    }

    /* Execute the loop body */
    return true;

break_loop:
    s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;

    qemu_co_mutex_lock(&s->send_mutex);
    s->in_flight--;
    qemu_co_queue_next(&s->free_sema);
    qemu_co_mutex_unlock(&s->send_mutex);

    return false;
}
static int nbd_co_receive_return_code(NBDClientSession *s, uint64_t handle,
                                      int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;

    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, NULL, NULL) {
        /* nbd_reply_chunk_iter_receive does all the work */
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}
static int nbd_co_receive_cmdread_reply(NBDClientSession *s, uint64_t handle,
                                        uint64_t offset, QEMUIOVector *qiov,
                                        int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;

    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
                            qiov, &reply, &payload)
    {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_OFFSET_DATA:
            /* special cased in nbd_co_receive_one_chunk, data is already
             * in qiov */
            break;
        case NBD_REPLY_TYPE_OFFSET_HOLE:
            ret = nbd_parse_offset_hole_payload(&reply.structured, payload,
                                                offset, qiov, &local_err);
            if (ret < 0) {
                s->quit = true;
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                /* not allowed reply type */
                s->quit = true;
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) for CMD_READ",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}
static int nbd_co_receive_blockstatus_reply(NBDClientSession *s,
                                            uint64_t handle, uint64_t length,
                                            NBDExtent *extent,
                                            int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;
    bool received = false;

    assert(!extent->length);
    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, &reply, &payload) {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_BLOCK_STATUS:
            if (received) {
                s->quit = true;
                error_setg(&local_err, "Several BLOCK_STATUS chunks in reply");
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
            received = true;

            ret = nbd_parse_blockstatus_payload(s, &reply.structured,
                                                payload, length, extent,
                                                &local_err);
            if (ret < 0) {
                s->quit = true;
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                s->quit = true;
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) "
                           "for CMD_BLOCK_STATUS",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    if (!extent->length && !iter.request_ret) {
        error_setg(&local_err, "Server did not reply with any status extents");
        nbd_iter_channel_error(&iter, -EIO, &local_err);
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}
static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
                          QEMUIOVector *write_qiov)
{
    int ret, request_ret;
    Error *local_err = NULL;
    NBDClientSession *client = nbd_get_client_session(bs);

    assert(request->type != NBD_CMD_READ);
    if (write_qiov) {
        assert(request->type == NBD_CMD_WRITE);
        assert(request->len == iov_size(write_qiov->iov, write_qiov->niov));
    } else {
        assert(request->type != NBD_CMD_WRITE);
    }
    ret = nbd_co_send_request(bs, request, write_qiov);
    if (ret < 0) {
        return ret;
    }

    ret = nbd_co_receive_return_code(client, request->handle,
                                     &request_ret, &local_err);
    if (local_err) {
        trace_nbd_co_request_fail(request->from, request->len, request->handle,
                                  request->flags, request->type,
                                  nbd_cmd_lookup(request->type),
                                  ret, error_get_pretty(local_err));
        error_free(local_err);
    }
    return ret ? ret : request_ret;
}
int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
                         uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    int ret, request_ret;
    Error *local_err = NULL;
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_READ,
        .from = offset,
        .len = bytes,
    };

    assert(bytes <= NBD_MAX_BUFFER_SIZE);
    assert(!flags);

    if (!bytes) {
        return 0;
    }
    /*
     * Work around the fact that the block layer doesn't do
     * byte-accurate sizing yet - if the read exceeds the server's
     * advertised size because the block layer rounded size up, then
     * truncate the request to the server and tail-pad with zero.
     */
    if (offset >= client->info.size) {
        assert(bytes < BDRV_SECTOR_SIZE);
        qemu_iovec_memset(qiov, 0, 0, bytes);
        return 0;
    }
    if (offset + bytes > client->info.size) {
        uint64_t slop = offset + bytes - client->info.size;

        assert(slop < BDRV_SECTOR_SIZE);
        qemu_iovec_memset(qiov, bytes - slop, 0, slop);
        request.len -= slop;
    }

    ret = nbd_co_send_request(bs, &request, NULL);
    if (ret < 0) {
        return ret;
    }

    ret = nbd_co_receive_cmdread_reply(client, request.handle, offset, qiov,
                                       &request_ret, &local_err);
    if (local_err) {
        trace_nbd_co_request_fail(request.from, request.len, request.handle,
                                  request.flags, request.type,
                                  nbd_cmd_lookup(request.type),
                                  ret, error_get_pretty(local_err));
        error_free(local_err);
    }
    return ret ? ret : request_ret;
}
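
/*
 * Worked example of the EOF clamping above (illustrative numbers): with
 * client->info.size == 1000, a read at offset == 512 of bytes == 512 gives
 * slop == 24, so bytes 488..511 of the qiov are zeroed locally and only
 * request.len == 488 bytes are asked of the server.
 */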
int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
                          uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_WRITE,
        .from = offset,
        .len = bytes,
    };

    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
    if (flags & BDRV_REQ_FUA) {
        assert(client->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }

    assert(bytes <= NBD_MAX_BUFFER_SIZE);

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, qiov);
}
int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
                                int bytes, BdrvRequestFlags flags)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_WRITE_ZEROES,
        .from = offset,
        .len = bytes,
    };

    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
    if (!(client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
        return -ENOTSUP;
    }

    if (flags & BDRV_REQ_FUA) {
        assert(client->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }
    if (!(flags & BDRV_REQ_MAY_UNMAP)) {
        request.flags |= NBD_CMD_FLAG_NO_HOLE;
    }

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, NULL);
}
int nbd_client_co_flush(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = { .type = NBD_CMD_FLUSH };

    if (!(client->info.flags & NBD_FLAG_SEND_FLUSH)) {
        return 0;
    }

    request.from = 0;
    request.len = 0;

    return nbd_co_request(bs, &request, NULL);
}
int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int bytes)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_TRIM,
        .from = offset,
        .len = bytes,
    };

    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
    if (!(client->info.flags & NBD_FLAG_SEND_TRIM) || !bytes) {
        return 0;
    }

    return nbd_co_request(bs, &request, NULL);
}
int coroutine_fn nbd_client_co_block_status(BlockDriverState *bs,
                                            bool want_zero,
                                            int64_t offset, int64_t bytes,
                                            int64_t *pnum, int64_t *map,
                                            BlockDriverState **file)
{
    int ret, request_ret;
    NBDExtent extent = { 0 };
    NBDClientSession *client = nbd_get_client_session(bs);
    Error *local_err = NULL;

    NBDRequest request = {
        .type = NBD_CMD_BLOCK_STATUS,
        .from = offset,
        .len = MIN(MIN_NON_ZERO(QEMU_ALIGN_DOWN(INT_MAX,
                                                bs->bl.request_alignment),
                                client->info.max_block),
                   MIN(bytes, client->info.size - offset)),
        .flags = NBD_CMD_FLAG_REQ_ONE,
    };

    if (!client->info.base_allocation) {
        *pnum = bytes;
        *map = offset;
        *file = bs;
        return BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
    }

    /*
     * Work around the fact that the block layer doesn't do
     * byte-accurate sizing yet - if the status request exceeds the
     * server's advertised size because the block layer rounded size
     * up, we truncated the request to the server (above), or are
     * called on just the hole.
     */
    if (offset >= client->info.size) {
        *pnum = bytes;
        assert(bytes < BDRV_SECTOR_SIZE);
        /* Intentionally don't report offset_valid for the hole */
        return BDRV_BLOCK_ZERO;
    }

    if (client->info.min_block) {
        assert(QEMU_IS_ALIGNED(request.len, client->info.min_block));
    }

    ret = nbd_co_send_request(bs, &request, NULL);
    if (ret < 0) {
        return ret;
    }

    ret = nbd_co_receive_blockstatus_reply(client, request.handle, bytes,
                                           &extent, &request_ret, &local_err);
    if (local_err) {
        trace_nbd_co_request_fail(request.from, request.len, request.handle,
                                  request.flags, request.type,
                                  nbd_cmd_lookup(request.type),
                                  ret, error_get_pretty(local_err));
        error_free(local_err);
    }
    if (ret < 0 || request_ret < 0) {
        return ret ? ret : request_ret;
    }

    assert(extent.length);
    *pnum = extent.length;
    *map = offset;
    *file = bs;
    return (extent.flags & NBD_STATE_HOLE ? 0 : BDRV_BLOCK_DATA) |
           (extent.flags & NBD_STATE_ZERO ? BDRV_BLOCK_ZERO : 0) |
           BDRV_BLOCK_OFFSET_VALID;
}
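
/*
 * Mapping example (illustrative): an extent whose flags are
 * NBD_STATE_HOLE | NBD_STATE_ZERO comes back as
 * BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID (no BDRV_BLOCK_DATA), while
 * flags == 0 yields BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID.
 */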
void nbd_client_detach_aio_context(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    qio_channel_detach_aio_context(QIO_CHANNEL(client->ioc));
}
static void nbd_client_attach_aio_context_bh(void *opaque)
{
    BlockDriverState *bs = opaque;
    NBDClientSession *client = nbd_get_client_session(bs);

    /* The node is still drained, so we know the coroutine has yielded in
     * nbd_read_eof(), the only place where bs->in_flight can reach 0, or it is
     * entered for the first time. Both places are safe for entering the
     * coroutine. */
    qemu_aio_coroutine_enter(bs->aio_context, client->connection_co);
    bdrv_dec_in_flight(bs);
}
void nbd_client_attach_aio_context(BlockDriverState *bs,
                                   AioContext *new_context)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    qio_channel_attach_aio_context(QIO_CHANNEL(client->ioc), new_context);

    bdrv_inc_in_flight(bs);

    /* Need to wait here for the BH to run because the BH must run while the
     * node is still drained. */
    aio_wait_bh_oneshot(new_context, nbd_client_attach_aio_context_bh, bs);
}
void nbd_client_close(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = { .type = NBD_CMD_DISC };

    assert(client->ioc);

    nbd_send_request(client->ioc, &request);

    nbd_teardown_connection(bs);
}
static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
                                                  Error **errp)
{
    QIOChannelSocket *sioc;
    Error *local_err = NULL;

    sioc = qio_channel_socket_new();
    qio_channel_set_name(QIO_CHANNEL(sioc), "nbd-client");

    qio_channel_socket_connect_sync(sioc, saddr, &local_err);
    if (local_err) {
        object_unref(OBJECT(sioc));
        error_propagate(errp, local_err);
        return NULL;
    }

    qio_channel_set_delay(QIO_CHANNEL(sioc), false);

    return sioc;
}
static int nbd_client_connect(BlockDriverState *bs,
                              SocketAddress *saddr,
                              const char *export,
                              QCryptoTLSCreds *tlscreds,
                              const char *hostname,
                              const char *x_dirty_bitmap,
                              Error **errp)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    int ret;

    /*
     * establish TCP connection, return error if it fails
     * TODO: Configurable retry-until-timeout behaviour.
     */
    QIOChannelSocket *sioc = nbd_establish_connection(saddr, errp);

    if (!sioc) {
        return -ECONNREFUSED;
    }

    /* NBD handshake */
    logout("session init %s\n", export);
    qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);

    client->info.request_sizes = true;
    client->info.structured_reply = true;
    client->info.base_allocation = true;
    client->info.x_dirty_bitmap = g_strdup(x_dirty_bitmap);
    client->info.name = g_strdup(export ?: "");
    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), tlscreds, hostname,
                                &client->ioc, &client->info, errp);
    g_free(client->info.x_dirty_bitmap);
    g_free(client->info.name);
    if (ret < 0) {
        logout("Failed to negotiate with the NBD server\n");
        object_unref(OBJECT(sioc));
        return ret;
    }
    if (x_dirty_bitmap && !client->info.base_allocation) {
        error_setg(errp, "requested x-dirty-bitmap %s not found",
                   x_dirty_bitmap);
        ret = -EINVAL;
        goto fail;
    }
    if (client->info.flags & NBD_FLAG_READ_ONLY) {
        ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
        if (ret < 0) {
            goto fail;
        }
    }
    if (client->info.flags & NBD_FLAG_SEND_FUA) {
        bs->supported_write_flags = BDRV_REQ_FUA;
        bs->supported_zero_flags |= BDRV_REQ_FUA;
    }
    if (client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
        bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
    }

    client->sioc = sioc;

    if (!client->ioc) {
        client->ioc = QIO_CHANNEL(sioc);
        object_ref(OBJECT(client->ioc));
    }

    /* Now that we're connected, set the socket to be non-blocking and
     * kick the reply mechanism. */
    qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
    client->connection_co = qemu_coroutine_create(nbd_connection_entry, client);
    bdrv_inc_in_flight(bs);
    nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));

    logout("Established connection with NBD server\n");
    return 0;

 fail:
    /*
     * We have connected, but must fail for other reasons. The
     * connection is still blocking; send NBD_CMD_DISC as a courtesy
     * to the server.
     */
    {
        NBDRequest request = { .type = NBD_CMD_DISC };

        nbd_send_request(client->ioc ?: QIO_CHANNEL(sioc), &request);

        object_unref(OBJECT(sioc));

        return ret;
    }
}
int nbd_client_init(BlockDriverState *bs,
                    SocketAddress *saddr,
                    const char *export,
                    QCryptoTLSCreds *tlscreds,
                    const char *hostname,
                    const char *x_dirty_bitmap,
                    Error **errp)
{
    NBDClientSession *client = nbd_get_client_session(bs);

    client->bs = bs;
    qemu_co_mutex_init(&client->send_mutex);
    qemu_co_queue_init(&client->free_sema);

    return nbd_client_connect(bs, saddr, export, tlscreds, hostname,
                              x_dirty_bitmap, errp);
}