2 * QEMU Block driver for NBD
4 * Copyright (C) 2016 Red Hat, Inc.
5 * Copyright (C) 2008 Bull S.A.S.
6 * Author: Laurent Vivier <Laurent.Vivier@bull.net>
9 * Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
24 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
30 #include "qemu/osdep.h"
33 #include "qapi/error.h"
34 #include "nbd-client.h"
/* Map an on-the-wire request handle to its slot index in s->requests[] and
 * back.  XORing with the session's BlockDriverState pointer makes handles
 * effectively unique per client instance while keeping the mapping trivially
 * invertible. */
#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ (uint64_t)(intptr_t)(bs))
#define INDEX_TO_HANDLE(bs, index) ((index) ^ (uint64_t)(intptr_t)(bs))
39 static void nbd_recv_coroutines_wake_all(NBDClientSession
*s
)
43 for (i
= 0; i
< MAX_NBD_REQUESTS
; i
++) {
44 NBDClientRequest
*req
= &s
->requests
[i
];
46 if (req
->coroutine
&& req
->receiving
) {
47 aio_co_wake(req
->coroutine
);
52 static void nbd_teardown_connection(BlockDriverState
*bs
)
54 NBDClientSession
*client
= nbd_get_client_session(bs
);
58 /* finish any pending coroutines */
59 qio_channel_shutdown(client
->ioc
,
60 QIO_CHANNEL_SHUTDOWN_BOTH
,
62 BDRV_POLL_WHILE(bs
, client
->connection_co
);
64 nbd_client_detach_aio_context(bs
);
65 object_unref(OBJECT(client
->sioc
));
67 object_unref(OBJECT(client
->ioc
));
71 static coroutine_fn
void nbd_connection_entry(void *opaque
)
73 NBDClientSession
*s
= opaque
;
76 Error
*local_err
= NULL
;
80 * The NBD client can only really be considered idle when it has
81 * yielded from qio_channel_readv_all_eof(), waiting for data. This is
82 * the point where the additional scheduled coroutine entry happens
83 * after nbd_client_attach_aio_context().
85 * Therefore we keep an additional in_flight reference all the time and
86 * only drop it temporarily here.
88 assert(s
->reply
.handle
== 0);
89 ret
= nbd_receive_reply(s
->bs
, s
->ioc
, &s
->reply
, &local_err
);
92 trace_nbd_read_reply_entry_fail(ret
, error_get_pretty(local_err
));
93 error_free(local_err
);
99 /* There's no need for a mutex on the receive side, because the
100 * handler acts as a synchronization point and ensures that only
101 * one coroutine is called until the reply finishes.
103 i
= HANDLE_TO_INDEX(s
, s
->reply
.handle
);
104 if (i
>= MAX_NBD_REQUESTS
||
105 !s
->requests
[i
].coroutine
||
106 !s
->requests
[i
].receiving
||
107 (nbd_reply_is_structured(&s
->reply
) && !s
->info
.structured_reply
))
112 /* We're woken up again by the request itself. Note that there
113 * is no race between yielding and reentering connection_co. This
116 * - if the request runs on the same AioContext, it is only
117 * entered after we yield
119 * - if the request runs on a different AioContext, reentering
120 * connection_co happens through a bottom half, which can only
121 * run after we yield.
123 aio_co_wake(s
->requests
[i
].coroutine
);
124 qemu_coroutine_yield();
128 nbd_recv_coroutines_wake_all(s
);
129 bdrv_dec_in_flight(s
->bs
);
131 s
->connection_co
= NULL
;
135 static int nbd_co_send_request(BlockDriverState
*bs
,
139 NBDClientSession
*s
= nbd_get_client_session(bs
);
142 qemu_co_mutex_lock(&s
->send_mutex
);
143 while (s
->in_flight
== MAX_NBD_REQUESTS
) {
144 qemu_co_queue_wait(&s
->free_sema
, &s
->send_mutex
);
148 for (i
= 0; i
< MAX_NBD_REQUESTS
; i
++) {
149 if (s
->requests
[i
].coroutine
== NULL
) {
154 g_assert(qemu_in_coroutine());
155 assert(i
< MAX_NBD_REQUESTS
);
157 s
->requests
[i
].coroutine
= qemu_coroutine_self();
158 s
->requests
[i
].offset
= request
->from
;
159 s
->requests
[i
].receiving
= false;
161 request
->handle
= INDEX_TO_HANDLE(s
, i
);
170 qio_channel_set_cork(s
->ioc
, true);
171 rc
= nbd_send_request(s
->ioc
, request
);
172 if (rc
>= 0 && !s
->quit
) {
173 if (qio_channel_writev_all(s
->ioc
, qiov
->iov
, qiov
->niov
,
177 } else if (rc
>= 0) {
180 qio_channel_set_cork(s
->ioc
, false);
182 rc
= nbd_send_request(s
->ioc
, request
);
188 s
->requests
[i
].coroutine
= NULL
;
190 qemu_co_queue_next(&s
->free_sema
);
192 qemu_co_mutex_unlock(&s
->send_mutex
);
196 static inline uint16_t payload_advance16(uint8_t **payload
)
199 return lduw_be_p(*payload
- 2);
202 static inline uint32_t payload_advance32(uint8_t **payload
)
205 return ldl_be_p(*payload
- 4);
208 static inline uint64_t payload_advance64(uint8_t **payload
)
211 return ldq_be_p(*payload
- 8);
214 static int nbd_parse_offset_hole_payload(NBDStructuredReplyChunk
*chunk
,
215 uint8_t *payload
, uint64_t orig_offset
,
216 QEMUIOVector
*qiov
, Error
**errp
)
221 if (chunk
->length
!= sizeof(offset
) + sizeof(hole_size
)) {
222 error_setg(errp
, "Protocol error: invalid payload for "
223 "NBD_REPLY_TYPE_OFFSET_HOLE");
227 offset
= payload_advance64(&payload
);
228 hole_size
= payload_advance32(&payload
);
230 if (!hole_size
|| offset
< orig_offset
|| hole_size
> qiov
->size
||
231 offset
> orig_offset
+ qiov
->size
- hole_size
) {
232 error_setg(errp
, "Protocol error: server sent chunk exceeding requested"
237 qemu_iovec_memset(qiov
, offset
- orig_offset
, 0, hole_size
);
242 /* nbd_parse_blockstatus_payload
243 * Based on our request, we expect only one extent in reply, for the
244 * base:allocation context.
246 static int nbd_parse_blockstatus_payload(NBDClientSession
*client
,
247 NBDStructuredReplyChunk
*chunk
,
248 uint8_t *payload
, uint64_t orig_length
,
249 NBDExtent
*extent
, Error
**errp
)
253 /* The server succeeded, so it must have sent [at least] one extent */
254 if (chunk
->length
< sizeof(context_id
) + sizeof(*extent
)) {
255 error_setg(errp
, "Protocol error: invalid payload for "
256 "NBD_REPLY_TYPE_BLOCK_STATUS");
260 context_id
= payload_advance32(&payload
);
261 if (client
->info
.context_id
!= context_id
) {
262 error_setg(errp
, "Protocol error: unexpected context id %d for "
263 "NBD_REPLY_TYPE_BLOCK_STATUS, when negotiated context "
264 "id is %d", context_id
,
265 client
->info
.context_id
);
269 extent
->length
= payload_advance32(&payload
);
270 extent
->flags
= payload_advance32(&payload
);
272 if (extent
->length
== 0 ||
273 (client
->info
.min_block
&& !QEMU_IS_ALIGNED(extent
->length
,
274 client
->info
.min_block
))) {
275 error_setg(errp
, "Protocol error: server sent status chunk with "
281 * We used NBD_CMD_FLAG_REQ_ONE, so the server should not have
282 * sent us any more than one extent, nor should it have included
283 * status beyond our request in that extent. However, it's easy
284 * enough to ignore the server's noncompliance without killing the
285 * connection; just ignore trailing extents, and clamp things to
286 * the length of our request.
288 if (chunk
->length
> sizeof(context_id
) + sizeof(*extent
)) {
289 trace_nbd_parse_blockstatus_compliance("more than one extent");
291 if (extent
->length
> orig_length
) {
292 extent
->length
= orig_length
;
293 trace_nbd_parse_blockstatus_compliance("extent length too large");
299 /* nbd_parse_error_payload
300 * on success @errp contains message describing nbd error reply
302 static int nbd_parse_error_payload(NBDStructuredReplyChunk
*chunk
,
303 uint8_t *payload
, int *request_ret
,
307 uint16_t message_size
;
309 assert(chunk
->type
& (1 << 15));
311 if (chunk
->length
< sizeof(error
) + sizeof(message_size
)) {
313 "Protocol error: invalid payload for structured error");
317 error
= nbd_errno_to_system_errno(payload_advance32(&payload
));
319 error_setg(errp
, "Protocol error: server sent structured error chunk "
324 *request_ret
= -error
;
325 message_size
= payload_advance16(&payload
);
327 if (message_size
> chunk
->length
- sizeof(error
) - sizeof(message_size
)) {
328 error_setg(errp
, "Protocol error: server sent structured error chunk "
329 "with incorrect message size");
333 /* TODO: Add a trace point to mention the server complaint */
335 /* TODO handle ERROR_OFFSET */
340 static int nbd_co_receive_offset_data_payload(NBDClientSession
*s
,
341 uint64_t orig_offset
,
342 QEMUIOVector
*qiov
, Error
**errp
)
344 QEMUIOVector sub_qiov
;
348 NBDStructuredReplyChunk
*chunk
= &s
->reply
.structured
;
350 assert(nbd_reply_is_structured(&s
->reply
));
352 /* The NBD spec requires at least one byte of payload */
353 if (chunk
->length
<= sizeof(offset
)) {
354 error_setg(errp
, "Protocol error: invalid payload for "
355 "NBD_REPLY_TYPE_OFFSET_DATA");
359 if (nbd_read64(s
->ioc
, &offset
, "OFFSET_DATA offset", errp
) < 0) {
363 data_size
= chunk
->length
- sizeof(offset
);
365 if (offset
< orig_offset
|| data_size
> qiov
->size
||
366 offset
> orig_offset
+ qiov
->size
- data_size
) {
367 error_setg(errp
, "Protocol error: server sent chunk exceeding requested"
372 qemu_iovec_init(&sub_qiov
, qiov
->niov
);
373 qemu_iovec_concat(&sub_qiov
, qiov
, offset
- orig_offset
, data_size
);
374 ret
= qio_channel_readv_all(s
->ioc
, sub_qiov
.iov
, sub_qiov
.niov
, errp
);
375 qemu_iovec_destroy(&sub_qiov
);
377 return ret
< 0 ? -EIO
: 0;
/* Cap on how large a structured-reply payload we are willing to g_new();
 * protects against a malicious server forcing huge allocations. */
#define NBD_MAX_MALLOC_PAYLOAD 1000
381 /* nbd_co_receive_structured_payload
383 static coroutine_fn
int nbd_co_receive_structured_payload(
384 NBDClientSession
*s
, void **payload
, Error
**errp
)
389 assert(nbd_reply_is_structured(&s
->reply
));
391 len
= s
->reply
.structured
.length
;
397 if (payload
== NULL
) {
398 error_setg(errp
, "Unexpected structured payload");
402 if (len
> NBD_MAX_MALLOC_PAYLOAD
) {
403 error_setg(errp
, "Payload too large");
407 *payload
= g_new(char, len
);
408 ret
= nbd_read(s
->ioc
, *payload
, len
, "structured payload", errp
);
418 /* nbd_co_do_receive_one_chunk
420 * set request_ret to received reply error
421 * if qiov is not NULL: read payload to @qiov
422 * for structured reply chunk:
423 * if error chunk: read payload, set @request_ret, do not set @payload
424 * else if offset_data chunk: read payload data to @qiov, do not set @payload
425 * else: read payload to @payload
427 * If function fails, @errp contains corresponding error message, and the
428 * connection with the server is suspect. If it returns 0, then the
429 * transaction succeeded (although @request_ret may be a negative errno
430 * corresponding to the server's error reply), and errp is unchanged.
432 static coroutine_fn
int nbd_co_do_receive_one_chunk(
433 NBDClientSession
*s
, uint64_t handle
, bool only_structured
,
434 int *request_ret
, QEMUIOVector
*qiov
, void **payload
, Error
**errp
)
437 int i
= HANDLE_TO_INDEX(s
, handle
);
438 void *local_payload
= NULL
;
439 NBDStructuredReplyChunk
*chunk
;
446 /* Wait until we're woken up by nbd_connection_entry. */
447 s
->requests
[i
].receiving
= true;
448 qemu_coroutine_yield();
449 s
->requests
[i
].receiving
= false;
451 error_setg(errp
, "Connection closed");
456 assert(s
->reply
.handle
== handle
);
458 if (nbd_reply_is_simple(&s
->reply
)) {
459 if (only_structured
) {
460 error_setg(errp
, "Protocol error: simple reply when structured "
461 "reply chunk was expected");
465 *request_ret
= -nbd_errno_to_system_errno(s
->reply
.simple
.error
);
466 if (*request_ret
< 0 || !qiov
) {
470 return qio_channel_readv_all(s
->ioc
, qiov
->iov
, qiov
->niov
,
471 errp
) < 0 ? -EIO
: 0;
474 /* handle structured reply chunk */
475 assert(s
->info
.structured_reply
);
476 chunk
= &s
->reply
.structured
;
478 if (chunk
->type
== NBD_REPLY_TYPE_NONE
) {
479 if (!(chunk
->flags
& NBD_REPLY_FLAG_DONE
)) {
480 error_setg(errp
, "Protocol error: NBD_REPLY_TYPE_NONE chunk without"
481 " NBD_REPLY_FLAG_DONE flag set");
485 error_setg(errp
, "Protocol error: NBD_REPLY_TYPE_NONE chunk with"
492 if (chunk
->type
== NBD_REPLY_TYPE_OFFSET_DATA
) {
494 error_setg(errp
, "Unexpected NBD_REPLY_TYPE_OFFSET_DATA chunk");
498 return nbd_co_receive_offset_data_payload(s
, s
->requests
[i
].offset
,
502 if (nbd_reply_type_is_error(chunk
->type
)) {
503 payload
= &local_payload
;
506 ret
= nbd_co_receive_structured_payload(s
, payload
, errp
);
511 if (nbd_reply_type_is_error(chunk
->type
)) {
512 ret
= nbd_parse_error_payload(chunk
, local_payload
, request_ret
, errp
);
513 g_free(local_payload
);
520 /* nbd_co_receive_one_chunk
521 * Read reply, wake up connection_co and set s->quit if needed.
522 * Return value is a fatal error code or normal nbd reply error code
524 static coroutine_fn
int nbd_co_receive_one_chunk(
525 NBDClientSession
*s
, uint64_t handle
, bool only_structured
,
526 int *request_ret
, QEMUIOVector
*qiov
, NBDReply
*reply
, void **payload
,
529 int ret
= nbd_co_do_receive_one_chunk(s
, handle
, only_structured
,
530 request_ret
, qiov
, payload
, errp
);
535 /* For assert at loop start in nbd_connection_entry */
542 if (s
->connection_co
) {
543 aio_co_wake(s
->connection_co
);
549 typedef struct NBDReplyChunkIter
{
553 bool done
, only_structured
;
556 static void nbd_iter_channel_error(NBDReplyChunkIter
*iter
,
557 int ret
, Error
**local_err
)
563 error_propagate(&iter
->err
, *local_err
);
565 error_free(*local_err
);
571 static void nbd_iter_request_error(NBDReplyChunkIter
*iter
, int ret
)
575 if (!iter
->request_ret
) {
576 iter
->request_ret
= ret
;
/* NBD_FOREACH_REPLY_CHUNK
 * Loop over all reply chunks for @handle; the loop body runs once per
 * structured chunk (never for a simple reply).  @iter accumulates errors.
 */
#define NBD_FOREACH_REPLY_CHUNK(s, iter, handle, structured, \
                                qiov, reply, payload) \
    for (iter = (NBDReplyChunkIter) { .only_structured = structured }; \
         nbd_reply_chunk_iter_receive(s, &iter, handle, qiov, reply, payload);)
587 /* nbd_reply_chunk_iter_receive
589 static bool nbd_reply_chunk_iter_receive(NBDClientSession
*s
,
590 NBDReplyChunkIter
*iter
,
592 QEMUIOVector
*qiov
, NBDReply
*reply
,
595 int ret
, request_ret
;
596 NBDReply local_reply
;
597 NBDStructuredReplyChunk
*chunk
;
598 Error
*local_err
= NULL
;
600 error_setg(&local_err
, "Connection closed");
601 nbd_iter_channel_error(iter
, -EIO
, &local_err
);
606 /* Previous iteration was last. */
611 reply
= &local_reply
;
614 ret
= nbd_co_receive_one_chunk(s
, handle
, iter
->only_structured
,
615 &request_ret
, qiov
, reply
, payload
,
618 nbd_iter_channel_error(iter
, ret
, &local_err
);
619 } else if (request_ret
< 0) {
620 nbd_iter_request_error(iter
, request_ret
);
623 /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
624 if (nbd_reply_is_simple(reply
) || s
->quit
) {
628 chunk
= &reply
->structured
;
629 iter
->only_structured
= true;
631 if (chunk
->type
== NBD_REPLY_TYPE_NONE
) {
632 /* NBD_REPLY_FLAG_DONE is already checked in nbd_co_receive_one_chunk */
633 assert(chunk
->flags
& NBD_REPLY_FLAG_DONE
);
637 if (chunk
->flags
& NBD_REPLY_FLAG_DONE
) {
638 /* This iteration is last. */
642 /* Execute the loop body */
646 s
->requests
[HANDLE_TO_INDEX(s
, handle
)].coroutine
= NULL
;
648 qemu_co_mutex_lock(&s
->send_mutex
);
650 qemu_co_queue_next(&s
->free_sema
);
651 qemu_co_mutex_unlock(&s
->send_mutex
);
656 static int nbd_co_receive_return_code(NBDClientSession
*s
, uint64_t handle
,
657 int *request_ret
, Error
**errp
)
659 NBDReplyChunkIter iter
;
661 NBD_FOREACH_REPLY_CHUNK(s
, iter
, handle
, false, NULL
, NULL
, NULL
) {
662 /* nbd_reply_chunk_iter_receive does all the work */
665 error_propagate(errp
, iter
.err
);
666 *request_ret
= iter
.request_ret
;
670 static int nbd_co_receive_cmdread_reply(NBDClientSession
*s
, uint64_t handle
,
671 uint64_t offset
, QEMUIOVector
*qiov
,
672 int *request_ret
, Error
**errp
)
674 NBDReplyChunkIter iter
;
676 void *payload
= NULL
;
677 Error
*local_err
= NULL
;
679 NBD_FOREACH_REPLY_CHUNK(s
, iter
, handle
, s
->info
.structured_reply
,
680 qiov
, &reply
, &payload
)
683 NBDStructuredReplyChunk
*chunk
= &reply
.structured
;
685 assert(nbd_reply_is_structured(&reply
));
687 switch (chunk
->type
) {
688 case NBD_REPLY_TYPE_OFFSET_DATA
:
689 /* special cased in nbd_co_receive_one_chunk, data is already
692 case NBD_REPLY_TYPE_OFFSET_HOLE
:
693 ret
= nbd_parse_offset_hole_payload(&reply
.structured
, payload
,
694 offset
, qiov
, &local_err
);
697 nbd_iter_channel_error(&iter
, ret
, &local_err
);
701 if (!nbd_reply_type_is_error(chunk
->type
)) {
702 /* not allowed reply type */
704 error_setg(&local_err
,
705 "Unexpected reply type: %d (%s) for CMD_READ",
706 chunk
->type
, nbd_reply_type_lookup(chunk
->type
));
707 nbd_iter_channel_error(&iter
, -EINVAL
, &local_err
);
715 error_propagate(errp
, iter
.err
);
716 *request_ret
= iter
.request_ret
;
720 static int nbd_co_receive_blockstatus_reply(NBDClientSession
*s
,
721 uint64_t handle
, uint64_t length
,
723 int *request_ret
, Error
**errp
)
725 NBDReplyChunkIter iter
;
727 void *payload
= NULL
;
728 Error
*local_err
= NULL
;
729 bool received
= false;
731 assert(!extent
->length
);
732 NBD_FOREACH_REPLY_CHUNK(s
, iter
, handle
, s
->info
.structured_reply
,
733 NULL
, &reply
, &payload
)
736 NBDStructuredReplyChunk
*chunk
= &reply
.structured
;
738 assert(nbd_reply_is_structured(&reply
));
740 switch (chunk
->type
) {
741 case NBD_REPLY_TYPE_BLOCK_STATUS
:
744 error_setg(&local_err
, "Several BLOCK_STATUS chunks in reply");
745 nbd_iter_channel_error(&iter
, -EINVAL
, &local_err
);
749 ret
= nbd_parse_blockstatus_payload(s
, &reply
.structured
,
750 payload
, length
, extent
,
754 nbd_iter_channel_error(&iter
, ret
, &local_err
);
758 if (!nbd_reply_type_is_error(chunk
->type
)) {
760 error_setg(&local_err
,
761 "Unexpected reply type: %d (%s) "
762 "for CMD_BLOCK_STATUS",
763 chunk
->type
, nbd_reply_type_lookup(chunk
->type
));
764 nbd_iter_channel_error(&iter
, -EINVAL
, &local_err
);
772 if (!extent
->length
&& !iter
.err
) {
773 error_setg(&iter
.err
,
774 "Server did not reply with any status extents");
780 error_propagate(errp
, iter
.err
);
781 *request_ret
= iter
.request_ret
;
785 static int nbd_co_request(BlockDriverState
*bs
, NBDRequest
*request
,
786 QEMUIOVector
*write_qiov
)
788 int ret
, request_ret
;
789 Error
*local_err
= NULL
;
790 NBDClientSession
*client
= nbd_get_client_session(bs
);
792 assert(request
->type
!= NBD_CMD_READ
);
794 assert(request
->type
== NBD_CMD_WRITE
);
795 assert(request
->len
== iov_size(write_qiov
->iov
, write_qiov
->niov
));
797 assert(request
->type
!= NBD_CMD_WRITE
);
799 ret
= nbd_co_send_request(bs
, request
, write_qiov
);
804 ret
= nbd_co_receive_return_code(client
, request
->handle
,
805 &request_ret
, &local_err
);
807 trace_nbd_co_request_fail(request
->from
, request
->len
, request
->handle
,
808 request
->flags
, request
->type
,
809 nbd_cmd_lookup(request
->type
),
810 ret
, error_get_pretty(local_err
));
811 error_free(local_err
);
813 return ret
? ret
: request_ret
;
816 int nbd_client_co_preadv(BlockDriverState
*bs
, uint64_t offset
,
817 uint64_t bytes
, QEMUIOVector
*qiov
, int flags
)
819 int ret
, request_ret
;
820 Error
*local_err
= NULL
;
821 NBDClientSession
*client
= nbd_get_client_session(bs
);
822 NBDRequest request
= {
823 .type
= NBD_CMD_READ
,
828 assert(bytes
<= NBD_MAX_BUFFER_SIZE
);
834 ret
= nbd_co_send_request(bs
, &request
, NULL
);
839 ret
= nbd_co_receive_cmdread_reply(client
, request
.handle
, offset
, qiov
,
840 &request_ret
, &local_err
);
842 trace_nbd_co_request_fail(request
.from
, request
.len
, request
.handle
,
843 request
.flags
, request
.type
,
844 nbd_cmd_lookup(request
.type
),
845 ret
, error_get_pretty(local_err
));
846 error_free(local_err
);
848 return ret
? ret
: request_ret
;
851 int nbd_client_co_pwritev(BlockDriverState
*bs
, uint64_t offset
,
852 uint64_t bytes
, QEMUIOVector
*qiov
, int flags
)
854 NBDClientSession
*client
= nbd_get_client_session(bs
);
855 NBDRequest request
= {
856 .type
= NBD_CMD_WRITE
,
861 assert(!(client
->info
.flags
& NBD_FLAG_READ_ONLY
));
862 if (flags
& BDRV_REQ_FUA
) {
863 assert(client
->info
.flags
& NBD_FLAG_SEND_FUA
);
864 request
.flags
|= NBD_CMD_FLAG_FUA
;
867 assert(bytes
<= NBD_MAX_BUFFER_SIZE
);
872 return nbd_co_request(bs
, &request
, qiov
);
875 int nbd_client_co_pwrite_zeroes(BlockDriverState
*bs
, int64_t offset
,
876 int bytes
, BdrvRequestFlags flags
)
878 NBDClientSession
*client
= nbd_get_client_session(bs
);
879 NBDRequest request
= {
880 .type
= NBD_CMD_WRITE_ZEROES
,
885 assert(!(client
->info
.flags
& NBD_FLAG_READ_ONLY
));
886 if (!(client
->info
.flags
& NBD_FLAG_SEND_WRITE_ZEROES
)) {
890 if (flags
& BDRV_REQ_FUA
) {
891 assert(client
->info
.flags
& NBD_FLAG_SEND_FUA
);
892 request
.flags
|= NBD_CMD_FLAG_FUA
;
894 if (!(flags
& BDRV_REQ_MAY_UNMAP
)) {
895 request
.flags
|= NBD_CMD_FLAG_NO_HOLE
;
901 return nbd_co_request(bs
, &request
, NULL
);
904 int nbd_client_co_flush(BlockDriverState
*bs
)
906 NBDClientSession
*client
= nbd_get_client_session(bs
);
907 NBDRequest request
= { .type
= NBD_CMD_FLUSH
};
909 if (!(client
->info
.flags
& NBD_FLAG_SEND_FLUSH
)) {
916 return nbd_co_request(bs
, &request
, NULL
);
919 int nbd_client_co_pdiscard(BlockDriverState
*bs
, int64_t offset
, int bytes
)
921 NBDClientSession
*client
= nbd_get_client_session(bs
);
922 NBDRequest request
= {
923 .type
= NBD_CMD_TRIM
,
928 assert(!(client
->info
.flags
& NBD_FLAG_READ_ONLY
));
929 if (!(client
->info
.flags
& NBD_FLAG_SEND_TRIM
) || !bytes
) {
933 return nbd_co_request(bs
, &request
, NULL
);
936 int coroutine_fn
nbd_client_co_block_status(BlockDriverState
*bs
,
938 int64_t offset
, int64_t bytes
,
939 int64_t *pnum
, int64_t *map
,
940 BlockDriverState
**file
)
942 int ret
, request_ret
;
943 NBDExtent extent
= { 0 };
944 NBDClientSession
*client
= nbd_get_client_session(bs
);
945 Error
*local_err
= NULL
;
947 NBDRequest request
= {
948 .type
= NBD_CMD_BLOCK_STATUS
,
950 .len
= MIN(MIN_NON_ZERO(QEMU_ALIGN_DOWN(INT_MAX
,
951 bs
->bl
.request_alignment
),
952 client
->info
.max_block
), bytes
),
953 .flags
= NBD_CMD_FLAG_REQ_ONE
,
956 if (!client
->info
.base_allocation
) {
958 return BDRV_BLOCK_DATA
;
961 ret
= nbd_co_send_request(bs
, &request
, NULL
);
966 ret
= nbd_co_receive_blockstatus_reply(client
, request
.handle
, bytes
,
967 &extent
, &request_ret
, &local_err
);
969 trace_nbd_co_request_fail(request
.from
, request
.len
, request
.handle
,
970 request
.flags
, request
.type
,
971 nbd_cmd_lookup(request
.type
),
972 ret
, error_get_pretty(local_err
));
973 error_free(local_err
);
975 if (ret
< 0 || request_ret
< 0) {
976 return ret
? ret
: request_ret
;
979 assert(extent
.length
);
980 *pnum
= extent
.length
;
981 return (extent
.flags
& NBD_STATE_HOLE
? 0 : BDRV_BLOCK_DATA
) |
982 (extent
.flags
& NBD_STATE_ZERO
? BDRV_BLOCK_ZERO
: 0);
985 void nbd_client_detach_aio_context(BlockDriverState
*bs
)
987 NBDClientSession
*client
= nbd_get_client_session(bs
);
988 qio_channel_detach_aio_context(QIO_CHANNEL(client
->ioc
));
991 static void nbd_client_attach_aio_context_bh(void *opaque
)
993 BlockDriverState
*bs
= opaque
;
994 NBDClientSession
*client
= nbd_get_client_session(bs
);
996 /* The node is still drained, so we know the coroutine has yielded in
997 * nbd_read_eof(), the only place where bs->in_flight can reach 0, or it is
998 * entered for the first time. Both places are safe for entering the
1000 qemu_aio_coroutine_enter(bs
->aio_context
, client
->connection_co
);
1001 bdrv_dec_in_flight(bs
);
1004 void nbd_client_attach_aio_context(BlockDriverState
*bs
,
1005 AioContext
*new_context
)
1007 NBDClientSession
*client
= nbd_get_client_session(bs
);
1008 qio_channel_attach_aio_context(QIO_CHANNEL(client
->ioc
), new_context
);
1010 bdrv_inc_in_flight(bs
);
1012 /* Need to wait here for the BH to run because the BH must run while the
1013 * node is still drained. */
1014 aio_wait_bh_oneshot(new_context
, nbd_client_attach_aio_context_bh
, bs
);
1017 void nbd_client_close(BlockDriverState
*bs
)
1019 NBDClientSession
*client
= nbd_get_client_session(bs
);
1020 NBDRequest request
= { .type
= NBD_CMD_DISC
};
1022 assert(client
->ioc
);
1024 nbd_send_request(client
->ioc
, &request
);
1026 nbd_teardown_connection(bs
);
1029 static QIOChannelSocket
*nbd_establish_connection(SocketAddress
*saddr
,
1032 QIOChannelSocket
*sioc
;
1033 Error
*local_err
= NULL
;
1035 sioc
= qio_channel_socket_new();
1036 qio_channel_set_name(QIO_CHANNEL(sioc
), "nbd-client");
1038 qio_channel_socket_connect_sync(sioc
, saddr
, &local_err
);
1040 object_unref(OBJECT(sioc
));
1041 error_propagate(errp
, local_err
);
1045 qio_channel_set_delay(QIO_CHANNEL(sioc
), false);
1050 static int nbd_client_connect(BlockDriverState
*bs
,
1051 SocketAddress
*saddr
,
1053 QCryptoTLSCreds
*tlscreds
,
1054 const char *hostname
,
1055 const char *x_dirty_bitmap
,
1058 NBDClientSession
*client
= nbd_get_client_session(bs
);
1062 * establish TCP connection, return error if it fails
1063 * TODO: Configurable retry-until-timeout behaviour.
1065 QIOChannelSocket
*sioc
= nbd_establish_connection(saddr
, errp
);
1068 return -ECONNREFUSED
;
1072 logout("session init %s\n", export
);
1073 qio_channel_set_blocking(QIO_CHANNEL(sioc
), true, NULL
);
1075 client
->info
.request_sizes
= true;
1076 client
->info
.structured_reply
= true;
1077 client
->info
.base_allocation
= true;
1078 client
->info
.x_dirty_bitmap
= g_strdup(x_dirty_bitmap
);
1079 client
->info
.name
= g_strdup(export
?: "");
1080 ret
= nbd_receive_negotiate(QIO_CHANNEL(sioc
), tlscreds
, hostname
,
1081 &client
->ioc
, &client
->info
, errp
);
1082 g_free(client
->info
.x_dirty_bitmap
);
1083 g_free(client
->info
.name
);
1085 logout("Failed to negotiate with the NBD server\n");
1086 object_unref(OBJECT(sioc
));
1089 if (x_dirty_bitmap
&& !client
->info
.base_allocation
) {
1090 error_setg(errp
, "requested x-dirty-bitmap %s not found",
1095 if (client
->info
.flags
& NBD_FLAG_READ_ONLY
) {
1096 ret
= bdrv_apply_auto_read_only(bs
, "NBD export is read-only", errp
);
1101 if (client
->info
.flags
& NBD_FLAG_SEND_FUA
) {
1102 bs
->supported_write_flags
= BDRV_REQ_FUA
;
1103 bs
->supported_zero_flags
|= BDRV_REQ_FUA
;
1105 if (client
->info
.flags
& NBD_FLAG_SEND_WRITE_ZEROES
) {
1106 bs
->supported_zero_flags
|= BDRV_REQ_MAY_UNMAP
;
1109 client
->sioc
= sioc
;
1112 client
->ioc
= QIO_CHANNEL(sioc
);
1113 object_ref(OBJECT(client
->ioc
));
1116 /* Now that we're connected, set the socket to be non-blocking and
1117 * kick the reply mechanism. */
1118 qio_channel_set_blocking(QIO_CHANNEL(sioc
), false, NULL
);
1119 client
->connection_co
= qemu_coroutine_create(nbd_connection_entry
, client
);
1120 bdrv_inc_in_flight(bs
);
1121 nbd_client_attach_aio_context(bs
, bdrv_get_aio_context(bs
));
1123 logout("Established connection with NBD server\n");
1128 * We have connected, but must fail for other reasons. The
1129 * connection is still blocking; send NBD_CMD_DISC as a courtesy
1133 NBDRequest request
= { .type
= NBD_CMD_DISC
};
1135 nbd_send_request(client
->ioc
?: QIO_CHANNEL(sioc
), &request
);
1137 object_unref(OBJECT(sioc
));
1143 int nbd_client_init(BlockDriverState
*bs
,
1144 SocketAddress
*saddr
,
1146 QCryptoTLSCreds
*tlscreds
,
1147 const char *hostname
,
1148 const char *x_dirty_bitmap
,
1151 NBDClientSession
*client
= nbd_get_client_session(bs
);
1154 qemu_co_mutex_init(&client
->send_mutex
);
1155 qemu_co_queue_init(&client
->free_sema
);
1157 return nbd_client_connect(bs
, saddr
, export
, tlscreds
, hostname
,
1158 x_dirty_bitmap
, errp
);