mirror_qemu.git (git.proxmox.com): block/nbd-client.c
nbd/client: Support qemu-img convert from unaligned size
1 /*
2 * QEMU Block driver for NBD
3 *
4 * Copyright (C) 2016 Red Hat, Inc.
5 * Copyright (C) 2008 Bull S.A.S.
6 * Author: Laurent Vivier <Laurent.Vivier@bull.net>
7 *
8 * Some parts:
9 * Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
24 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
27 * THE SOFTWARE.
28 */
29
30 #include "qemu/osdep.h"
31
32 #include "trace.h"
33 #include "qapi/error.h"
34 #include "nbd-client.h"
35
36 #define HANDLE_TO_INDEX(bs, handle) ((handle) ^ (uint64_t)(intptr_t)(bs))
37 #define INDEX_TO_HANDLE(bs, index) ((index) ^ (uint64_t)(intptr_t)(bs))
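/*
 * For illustration: because XOR is its own inverse, these two macros
 * round-trip a request-slot index through the on-wire 64-bit handle,
 * keyed by the pointer passed in (every call site below passes the
 * NBDClientSession):
 *
 *     uint64_t handle = INDEX_TO_HANDLE(s, i);
 *     assert(HANDLE_TO_INDEX(s, handle) == i);
 *
 * A reply whose handle was not generated this way decodes to an
 * out-of-range or idle slot, which makes the receive loop in
 * nbd_connection_entry() bail out.
 */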
38
39 static void nbd_recv_coroutines_wake_all(NBDClientSession *s)
40 {
41 int i;
42
43 for (i = 0; i < MAX_NBD_REQUESTS; i++) {
44 NBDClientRequest *req = &s->requests[i];
45
46 if (req->coroutine && req->receiving) {
47 aio_co_wake(req->coroutine);
48 }
49 }
50 }
51
52 static void nbd_teardown_connection(BlockDriverState *bs)
53 {
54 NBDClientSession *client = nbd_get_client_session(bs);
55
56 assert(client->ioc);
57
58 /* finish any pending coroutines */
59 qio_channel_shutdown(client->ioc,
60 QIO_CHANNEL_SHUTDOWN_BOTH,
61 NULL);
62 BDRV_POLL_WHILE(bs, client->connection_co);
63
64 nbd_client_detach_aio_context(bs);
65 object_unref(OBJECT(client->sioc));
66 client->sioc = NULL;
67 object_unref(OBJECT(client->ioc));
68 client->ioc = NULL;
69 }
70
71 static coroutine_fn void nbd_connection_entry(void *opaque)
72 {
73 NBDClientSession *s = opaque;
74 uint64_t i;
75 int ret = 0;
76 Error *local_err = NULL;
77
78 while (!s->quit) {
79 /*
80 * The NBD client can only really be considered idle when it has
81 * yielded from qio_channel_readv_all_eof(), waiting for data. This is
82 * the point where the additional scheduled coroutine entry happens
83 * after nbd_client_attach_aio_context().
84 *
85 * Therefore we keep an additional in_flight reference all the time and
86 * only drop it temporarily here.
87 */
88 assert(s->reply.handle == 0);
89 ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);
90
91 if (local_err) {
92 trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
93 error_free(local_err);
94 }
95 if (ret <= 0) {
96 break;
97 }
98
99 /* There's no need for a mutex on the receive side, because the
100 * handler acts as a synchronization point and ensures that only
101 * one coroutine is called until the reply finishes.
102 */
103 i = HANDLE_TO_INDEX(s, s->reply.handle);
104 if (i >= MAX_NBD_REQUESTS ||
105 !s->requests[i].coroutine ||
106 !s->requests[i].receiving ||
107 (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
108 {
109 break;
110 }
111
112 /* We're woken up again by the request itself. Note that there
113 * is no race between yielding and reentering connection_co. This
114 * is because:
115 *
116 * - if the request runs on the same AioContext, it is only
117 * entered after we yield
118 *
119 * - if the request runs on a different AioContext, reentering
120 * connection_co happens through a bottom half, which can only
121 * run after we yield.
122 */
123 aio_co_wake(s->requests[i].coroutine);
124 qemu_coroutine_yield();
125 }
126
127 s->quit = true;
128 nbd_recv_coroutines_wake_all(s);
129 bdrv_dec_in_flight(s->bs);
130
131 s->connection_co = NULL;
132 aio_wait_kick();
133 }
134
135 static int nbd_co_send_request(BlockDriverState *bs,
136 NBDRequest *request,
137 QEMUIOVector *qiov)
138 {
139 NBDClientSession *s = nbd_get_client_session(bs);
140 int rc, i;
141
142 qemu_co_mutex_lock(&s->send_mutex);
143 while (s->in_flight == MAX_NBD_REQUESTS) {
144 qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
145 }
146 s->in_flight++;
147
148 for (i = 0; i < MAX_NBD_REQUESTS; i++) {
149 if (s->requests[i].coroutine == NULL) {
150 break;
151 }
152 }
153
154 g_assert(qemu_in_coroutine());
155 assert(i < MAX_NBD_REQUESTS);
156
157 s->requests[i].coroutine = qemu_coroutine_self();
158 s->requests[i].offset = request->from;
159 s->requests[i].receiving = false;
160
161 request->handle = INDEX_TO_HANDLE(s, i);
162
163 if (s->quit) {
164 rc = -EIO;
165 goto err;
166 }
167 assert(s->ioc);
168
169 if (qiov) {
170 qio_channel_set_cork(s->ioc, true);
171 rc = nbd_send_request(s->ioc, request);
172 if (rc >= 0 && !s->quit) {
173 if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
174 NULL) < 0) {
175 rc = -EIO;
176 }
177 } else if (rc >= 0) {
178 rc = -EIO;
179 }
180 qio_channel_set_cork(s->ioc, false);
181 } else {
182 rc = nbd_send_request(s->ioc, request);
183 }
184
185 err:
186 if (rc < 0) {
187 s->quit = true;
188 s->requests[i].coroutine = NULL;
189 s->in_flight--;
190 qemu_co_queue_next(&s->free_sema);
191 }
192 qemu_co_mutex_unlock(&s->send_mutex);
193 return rc;
194 }
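/*
 * For illustration, the usual pairing built on top of this helper, as a
 * sketch of what nbd_co_request() further down actually does:
 *
 *     ret = nbd_co_send_request(bs, &request, write_qiov);
 *     if (ret >= 0) {
 *         ret = nbd_co_receive_return_code(client, request.handle,
 *                                          &request_ret, &local_err);
 *     }
 */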
195
196 static inline uint16_t payload_advance16(uint8_t **payload)
197 {
198 *payload += 2;
199 return lduw_be_p(*payload - 2);
200 }
201
202 static inline uint32_t payload_advance32(uint8_t **payload)
203 {
204 *payload += 4;
205 return ldl_be_p(*payload - 4);
206 }
207
208 static inline uint64_t payload_advance64(uint8_t **payload)
209 {
210 *payload += 8;
211 return ldq_be_p(*payload - 8);
212 }
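/*
 * For illustration: these helpers consume a big-endian structured-reply
 * payload in place. Decoding an NBD_REPLY_TYPE_OFFSET_HOLE payload
 * (a 64-bit offset followed by a 32-bit hole size) looks like:
 *
 *     uint64_t offset = payload_advance64(&payload);
 *     uint32_t hole_size = payload_advance32(&payload);
 *
 * which is exactly what nbd_parse_offset_hole_payload() below does.
 */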
213
214 static int nbd_parse_offset_hole_payload(NBDStructuredReplyChunk *chunk,
215 uint8_t *payload, uint64_t orig_offset,
216 QEMUIOVector *qiov, Error **errp)
217 {
218 uint64_t offset;
219 uint32_t hole_size;
220
221 if (chunk->length != sizeof(offset) + sizeof(hole_size)) {
222 error_setg(errp, "Protocol error: invalid payload for "
223 "NBD_REPLY_TYPE_OFFSET_HOLE");
224 return -EINVAL;
225 }
226
227 offset = payload_advance64(&payload);
228 hole_size = payload_advance32(&payload);
229
230 if (!hole_size || offset < orig_offset || hole_size > qiov->size ||
231 offset > orig_offset + qiov->size - hole_size) {
232 error_setg(errp, "Protocol error: server sent chunk exceeding requested"
233 " region");
234 return -EINVAL;
235 }
236
237 qemu_iovec_memset(qiov, offset - orig_offset, 0, hole_size);
238
239 return 0;
240 }
241
242 /* nbd_parse_blockstatus_payload
243 * Based on our request, we expect only one extent in reply, for the
244 * base:allocation context.
245 */
246 static int nbd_parse_blockstatus_payload(NBDClientSession *client,
247 NBDStructuredReplyChunk *chunk,
248 uint8_t *payload, uint64_t orig_length,
249 NBDExtent *extent, Error **errp)
250 {
251 uint32_t context_id;
252
253 /* The server succeeded, so it must have sent [at least] one extent */
254 if (chunk->length < sizeof(context_id) + sizeof(*extent)) {
255 error_setg(errp, "Protocol error: invalid payload for "
256 "NBD_REPLY_TYPE_BLOCK_STATUS");
257 return -EINVAL;
258 }
259
260 context_id = payload_advance32(&payload);
261 if (client->info.context_id != context_id) {
262 error_setg(errp, "Protocol error: unexpected context id %d for "
263 "NBD_REPLY_TYPE_BLOCK_STATUS, when negotiated context "
264 "id is %d", context_id,
265 client->info.context_id);
266 return -EINVAL;
267 }
268
269 extent->length = payload_advance32(&payload);
270 extent->flags = payload_advance32(&payload);
271
272 if (extent->length == 0) {
273 error_setg(errp, "Protocol error: server sent status chunk with "
274 "zero length");
275 return -EINVAL;
276 }
277
278 /*
279 * A server sending unaligned block status is in violation of the
280 * protocol, but as qemu-nbd 3.1 is such a server (at least for
281 * POSIX files that are not a multiple of 512 bytes, since qemu
282 * rounds files up to 512-byte multiples but lseek(SEEK_HOLE)
283 * still sees an implicit hole beyond the real EOF), it's nicer to
284 * work around the misbehaving server. If the request included
285 * more than the final unaligned block, truncate it back to an
286 * aligned result; if the request was only the final block, round
287 * up to the full block and change the status to fully-allocated
288 * (always a safe status, even if it loses information).
289 */
290 if (client->info.min_block && !QEMU_IS_ALIGNED(extent->length,
291 client->info.min_block)) {
292 trace_nbd_parse_blockstatus_compliance("extent length is unaligned");
293 if (extent->length > client->info.min_block) {
294 extent->length = QEMU_ALIGN_DOWN(extent->length,
295 client->info.min_block);
296 } else {
297 extent->length = client->info.min_block;
298 extent->flags = 0;
299 }
300 }
301
302 /*
303 * We used NBD_CMD_FLAG_REQ_ONE, so the server should not have
304 * sent us any more than one extent, nor should it have included
305 * status beyond our request in that extent. However, it's easy
306 * enough to ignore the server's noncompliance without killing the
307 * connection; just ignore trailing extents, and clamp things to
308 * the length of our request.
309 */
310 if (chunk->length > sizeof(context_id) + sizeof(*extent)) {
311 trace_nbd_parse_blockstatus_compliance("more than one extent");
312 }
313 if (extent->length > orig_length) {
314 extent->length = orig_length;
315 trace_nbd_parse_blockstatus_compliance("extent length too large");
316 }
317
318 return 0;
319 }
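/*
 * Worked example of the rounding above (a sketch, values assumed): with a
 * negotiated min_block of 512, a 1000-byte extent from a noncompliant
 * server is truncated to 512 bytes; if the server instead reports a
 * 300-byte extent (shorter than one block), it is rounded up to 512 bytes
 * and its flags are cleared, i.e. reported as fully allocated.
 */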
320
321 /* nbd_parse_error_payload
322 * on success @errp contains message describing nbd error reply
323 */
324 static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
325 uint8_t *payload, int *request_ret,
326 Error **errp)
327 {
328 uint32_t error;
329 uint16_t message_size;
330
331 assert(chunk->type & (1 << 15));
332
333 if (chunk->length < sizeof(error) + sizeof(message_size)) {
334 error_setg(errp,
335 "Protocol error: invalid payload for structured error");
336 return -EINVAL;
337 }
338
339 error = nbd_errno_to_system_errno(payload_advance32(&payload));
340 if (error == 0) {
341 error_setg(errp, "Protocol error: server sent structured error chunk "
342 "with error = 0");
343 return -EINVAL;
344 }
345
346 *request_ret = -error;
347 message_size = payload_advance16(&payload);
348
349 if (message_size > chunk->length - sizeof(error) - sizeof(message_size)) {
350 error_setg(errp, "Protocol error: server sent structured error chunk "
351 "with incorrect message size");
352 return -EINVAL;
353 }
354
355 /* TODO: Add a trace point to mention the server complaint */
356
357 /* TODO handle ERROR_OFFSET */
358
359 return 0;
360 }
361
362 static int nbd_co_receive_offset_data_payload(NBDClientSession *s,
363 uint64_t orig_offset,
364 QEMUIOVector *qiov, Error **errp)
365 {
366 QEMUIOVector sub_qiov;
367 uint64_t offset;
368 size_t data_size;
369 int ret;
370 NBDStructuredReplyChunk *chunk = &s->reply.structured;
371
372 assert(nbd_reply_is_structured(&s->reply));
373
374 /* The NBD spec requires at least one byte of payload */
375 if (chunk->length <= sizeof(offset)) {
376 error_setg(errp, "Protocol error: invalid payload for "
377 "NBD_REPLY_TYPE_OFFSET_DATA");
378 return -EINVAL;
379 }
380
381 if (nbd_read64(s->ioc, &offset, "OFFSET_DATA offset", errp) < 0) {
382 return -EIO;
383 }
384
385 data_size = chunk->length - sizeof(offset);
386 assert(data_size);
387 if (offset < orig_offset || data_size > qiov->size ||
388 offset > orig_offset + qiov->size - data_size) {
389 error_setg(errp, "Protocol error: server sent chunk exceeding requested"
390 " region");
391 return -EINVAL;
392 }
393
394 qemu_iovec_init(&sub_qiov, qiov->niov);
395 qemu_iovec_concat(&sub_qiov, qiov, offset - orig_offset, data_size);
396 ret = qio_channel_readv_all(s->ioc, sub_qiov.iov, sub_qiov.niov, errp);
397 qemu_iovec_destroy(&sub_qiov);
398
399 return ret < 0 ? -EIO : 0;
400 }
401
402 #define NBD_MAX_MALLOC_PAYLOAD 1000
403 /* nbd_co_receive_structured_payload
404 */
405 static coroutine_fn int nbd_co_receive_structured_payload(
406 NBDClientSession *s, void **payload, Error **errp)
407 {
408 int ret;
409 uint32_t len;
410
411 assert(nbd_reply_is_structured(&s->reply));
412
413 len = s->reply.structured.length;
414
415 if (len == 0) {
416 return 0;
417 }
418
419 if (payload == NULL) {
420 error_setg(errp, "Unexpected structured payload");
421 return -EINVAL;
422 }
423
424 if (len > NBD_MAX_MALLOC_PAYLOAD) {
425 error_setg(errp, "Payload too large");
426 return -EINVAL;
427 }
428
429 *payload = g_new(char, len);
430 ret = nbd_read(s->ioc, *payload, len, "structured payload", errp);
431 if (ret < 0) {
432 g_free(*payload);
433 *payload = NULL;
434 return ret;
435 }
436
437 return 0;
438 }
439
440 /* nbd_co_do_receive_one_chunk
441 * for simple reply:
442 * set request_ret to received reply error
443 * if qiov is not NULL: read payload to @qiov
444 * for structured reply chunk:
445 * if error chunk: read payload, set @request_ret, do not set @payload
446 * else if offset_data chunk: read payload data to @qiov, do not set @payload
447 * else: read payload to @payload
448 *
449 * If function fails, @errp contains corresponding error message, and the
450 * connection with the server is suspect. If it returns 0, then the
451 * transaction succeeded (although @request_ret may be a negative errno
452 * corresponding to the server's error reply), and errp is unchanged.
453 */
454 static coroutine_fn int nbd_co_do_receive_one_chunk(
455 NBDClientSession *s, uint64_t handle, bool only_structured,
456 int *request_ret, QEMUIOVector *qiov, void **payload, Error **errp)
457 {
458 int ret;
459 int i = HANDLE_TO_INDEX(s, handle);
460 void *local_payload = NULL;
461 NBDStructuredReplyChunk *chunk;
462
463 if (payload) {
464 *payload = NULL;
465 }
466 *request_ret = 0;
467
468 /* Wait until we're woken up by nbd_connection_entry. */
469 s->requests[i].receiving = true;
470 qemu_coroutine_yield();
471 s->requests[i].receiving = false;
472 if (s->quit) {
473 error_setg(errp, "Connection closed");
474 return -EIO;
475 }
476 assert(s->ioc);
477
478 assert(s->reply.handle == handle);
479
480 if (nbd_reply_is_simple(&s->reply)) {
481 if (only_structured) {
482 error_setg(errp, "Protocol error: simple reply when structured "
483 "reply chunk was expected");
484 return -EINVAL;
485 }
486
487 *request_ret = -nbd_errno_to_system_errno(s->reply.simple.error);
488 if (*request_ret < 0 || !qiov) {
489 return 0;
490 }
491
492 return qio_channel_readv_all(s->ioc, qiov->iov, qiov->niov,
493 errp) < 0 ? -EIO : 0;
494 }
495
496 /* handle structured reply chunk */
497 assert(s->info.structured_reply);
498 chunk = &s->reply.structured;
499
500 if (chunk->type == NBD_REPLY_TYPE_NONE) {
501 if (!(chunk->flags & NBD_REPLY_FLAG_DONE)) {
502 error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk without"
503 " NBD_REPLY_FLAG_DONE flag set");
504 return -EINVAL;
505 }
506 if (chunk->length) {
507 error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk with"
508 " nonzero length");
509 return -EINVAL;
510 }
511 return 0;
512 }
513
514 if (chunk->type == NBD_REPLY_TYPE_OFFSET_DATA) {
515 if (!qiov) {
516 error_setg(errp, "Unexpected NBD_REPLY_TYPE_OFFSET_DATA chunk");
517 return -EINVAL;
518 }
519
520 return nbd_co_receive_offset_data_payload(s, s->requests[i].offset,
521 qiov, errp);
522 }
523
524 if (nbd_reply_type_is_error(chunk->type)) {
525 payload = &local_payload;
526 }
527
528 ret = nbd_co_receive_structured_payload(s, payload, errp);
529 if (ret < 0) {
530 return ret;
531 }
532
533 if (nbd_reply_type_is_error(chunk->type)) {
534 ret = nbd_parse_error_payload(chunk, local_payload, request_ret, errp);
535 g_free(local_payload);
536 return ret;
537 }
538
539 return 0;
540 }
541
542 /* nbd_co_receive_one_chunk
543 * Read reply, wake up connection_co and set s->quit if needed.
544 * Return value is a fatal error code or normal nbd reply error code
545 */
546 static coroutine_fn int nbd_co_receive_one_chunk(
547 NBDClientSession *s, uint64_t handle, bool only_structured,
548 int *request_ret, QEMUIOVector *qiov, NBDReply *reply, void **payload,
549 Error **errp)
550 {
551 int ret = nbd_co_do_receive_one_chunk(s, handle, only_structured,
552 request_ret, qiov, payload, errp);
553
554 if (ret < 0) {
555 s->quit = true;
556 } else {
557 /* For assert at loop start in nbd_connection_entry */
558 if (reply) {
559 *reply = s->reply;
560 }
561 s->reply.handle = 0;
562 }
563
564 if (s->connection_co) {
565 aio_co_wake(s->connection_co);
566 }
567
568 return ret;
569 }
570
571 typedef struct NBDReplyChunkIter {
572 int ret;
573 int request_ret;
574 Error *err;
575 bool done, only_structured;
576 } NBDReplyChunkIter;
577
578 static void nbd_iter_channel_error(NBDReplyChunkIter *iter,
579 int ret, Error **local_err)
580 {
581 assert(ret < 0);
582
583 if (!iter->ret) {
584 iter->ret = ret;
585 error_propagate(&iter->err, *local_err);
586 } else {
587 error_free(*local_err);
588 }
589
590 *local_err = NULL;
591 }
592
593 static void nbd_iter_request_error(NBDReplyChunkIter *iter, int ret)
594 {
595 assert(ret < 0);
596
597 if (!iter->request_ret) {
598 iter->request_ret = ret;
599 }
600 }
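/*
 * Note (added for clarity): iter.ret records the first fatal channel
 * error, after which the connection is considered broken, while
 * iter.request_ret records the first per-request error reported by the
 * server; callers return ret if nonzero, otherwise request_ret (see
 * nbd_co_request()).
 */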
601
602 /* NBD_FOREACH_REPLY_CHUNK
603 */
604 #define NBD_FOREACH_REPLY_CHUNK(s, iter, handle, structured, \
605 qiov, reply, payload) \
606 for (iter = (NBDReplyChunkIter) { .only_structured = structured }; \
607 nbd_reply_chunk_iter_receive(s, &iter, handle, qiov, reply, payload);)
608
609 /* nbd_reply_chunk_iter_receive
610 */
611 static bool nbd_reply_chunk_iter_receive(NBDClientSession *s,
612 NBDReplyChunkIter *iter,
613 uint64_t handle,
614 QEMUIOVector *qiov, NBDReply *reply,
615 void **payload)
616 {
617 int ret, request_ret;
618 NBDReply local_reply;
619 NBDStructuredReplyChunk *chunk;
620 Error *local_err = NULL;
621 if (s->quit) {
622 error_setg(&local_err, "Connection closed");
623 nbd_iter_channel_error(iter, -EIO, &local_err);
624 goto break_loop;
625 }
626
627 if (iter->done) {
628 /* Previous iteration was last. */
629 goto break_loop;
630 }
631
632 if (reply == NULL) {
633 reply = &local_reply;
634 }
635
636 ret = nbd_co_receive_one_chunk(s, handle, iter->only_structured,
637 &request_ret, qiov, reply, payload,
638 &local_err);
639 if (ret < 0) {
640 nbd_iter_channel_error(iter, ret, &local_err);
641 } else if (request_ret < 0) {
642 nbd_iter_request_error(iter, request_ret);
643 }
644
645 /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
646 if (nbd_reply_is_simple(reply) || s->quit) {
647 goto break_loop;
648 }
649
650 chunk = &reply->structured;
651 iter->only_structured = true;
652
653 if (chunk->type == NBD_REPLY_TYPE_NONE) {
654 /* NBD_REPLY_FLAG_DONE is already checked in nbd_co_receive_one_chunk */
655 assert(chunk->flags & NBD_REPLY_FLAG_DONE);
656 goto break_loop;
657 }
658
659 if (chunk->flags & NBD_REPLY_FLAG_DONE) {
660 /* This iteration is last. */
661 iter->done = true;
662 }
663
664 /* Execute the loop body */
665 return true;
666
667 break_loop:
668 s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;
669
670 qemu_co_mutex_lock(&s->send_mutex);
671 s->in_flight--;
672 qemu_co_queue_next(&s->free_sema);
673 qemu_co_mutex_unlock(&s->send_mutex);
674
675 return false;
676 }
677
678 static int nbd_co_receive_return_code(NBDClientSession *s, uint64_t handle,
679 int *request_ret, Error **errp)
680 {
681 NBDReplyChunkIter iter;
682
683 NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, NULL, NULL) {
684 /* nbd_reply_chunk_iter_receive does all the work */
685 }
686
687 error_propagate(errp, iter.err);
688 *request_ret = iter.request_ret;
689 return iter.ret;
690 }
691
692 static int nbd_co_receive_cmdread_reply(NBDClientSession *s, uint64_t handle,
693 uint64_t offset, QEMUIOVector *qiov,
694 int *request_ret, Error **errp)
695 {
696 NBDReplyChunkIter iter;
697 NBDReply reply;
698 void *payload = NULL;
699 Error *local_err = NULL;
700
701 NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
702 qiov, &reply, &payload)
703 {
704 int ret;
705 NBDStructuredReplyChunk *chunk = &reply.structured;
706
707 assert(nbd_reply_is_structured(&reply));
708
709 switch (chunk->type) {
710 case NBD_REPLY_TYPE_OFFSET_DATA:
711 /* special cased in nbd_co_receive_one_chunk, data is already
712 * in qiov */
713 break;
714 case NBD_REPLY_TYPE_OFFSET_HOLE:
715 ret = nbd_parse_offset_hole_payload(&reply.structured, payload,
716 offset, qiov, &local_err);
717 if (ret < 0) {
718 s->quit = true;
719 nbd_iter_channel_error(&iter, ret, &local_err);
720 }
721 break;
722 default:
723 if (!nbd_reply_type_is_error(chunk->type)) {
724 /* not allowed reply type */
725 s->quit = true;
726 error_setg(&local_err,
727 "Unexpected reply type: %d (%s) for CMD_READ",
728 chunk->type, nbd_reply_type_lookup(chunk->type));
729 nbd_iter_channel_error(&iter, -EINVAL, &local_err);
730 }
731 }
732
733 g_free(payload);
734 payload = NULL;
735 }
736
737 error_propagate(errp, iter.err);
738 *request_ret = iter.request_ret;
739 return iter.ret;
740 }
741
742 static int nbd_co_receive_blockstatus_reply(NBDClientSession *s,
743 uint64_t handle, uint64_t length,
744 NBDExtent *extent,
745 int *request_ret, Error **errp)
746 {
747 NBDReplyChunkIter iter;
748 NBDReply reply;
749 void *payload = NULL;
750 Error *local_err = NULL;
751 bool received = false;
752
753 assert(!extent->length);
754 NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, &reply, &payload) {
755 int ret;
756 NBDStructuredReplyChunk *chunk = &reply.structured;
757
758 assert(nbd_reply_is_structured(&reply));
759
760 switch (chunk->type) {
761 case NBD_REPLY_TYPE_BLOCK_STATUS:
762 if (received) {
763 s->quit = true;
764 error_setg(&local_err, "Several BLOCK_STATUS chunks in reply");
765 nbd_iter_channel_error(&iter, -EINVAL, &local_err);
766 }
767 received = true;
768
769 ret = nbd_parse_blockstatus_payload(s, &reply.structured,
770 payload, length, extent,
771 &local_err);
772 if (ret < 0) {
773 s->quit = true;
774 nbd_iter_channel_error(&iter, ret, &local_err);
775 }
776 break;
777 default:
778 if (!nbd_reply_type_is_error(chunk->type)) {
779 s->quit = true;
780 error_setg(&local_err,
781 "Unexpected reply type: %d (%s) "
782 "for CMD_BLOCK_STATUS",
783 chunk->type, nbd_reply_type_lookup(chunk->type));
784 nbd_iter_channel_error(&iter, -EINVAL, &local_err);
785 }
786 }
787
788 g_free(payload);
789 payload = NULL;
790 }
791
792 if (!extent->length && !iter.request_ret) {
793 error_setg(&local_err, "Server did not reply with any status extents");
794 nbd_iter_channel_error(&iter, -EIO, &local_err);
795 }
796
797 error_propagate(errp, iter.err);
798 *request_ret = iter.request_ret;
799 return iter.ret;
800 }
801
802 static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
803 QEMUIOVector *write_qiov)
804 {
805 int ret, request_ret;
806 Error *local_err = NULL;
807 NBDClientSession *client = nbd_get_client_session(bs);
808
809 assert(request->type != NBD_CMD_READ);
810 if (write_qiov) {
811 assert(request->type == NBD_CMD_WRITE);
812 assert(request->len == iov_size(write_qiov->iov, write_qiov->niov));
813 } else {
814 assert(request->type != NBD_CMD_WRITE);
815 }
816 ret = nbd_co_send_request(bs, request, write_qiov);
817 if (ret < 0) {
818 return ret;
819 }
820
821 ret = nbd_co_receive_return_code(client, request->handle,
822 &request_ret, &local_err);
823 if (local_err) {
824 trace_nbd_co_request_fail(request->from, request->len, request->handle,
825 request->flags, request->type,
826 nbd_cmd_lookup(request->type),
827 ret, error_get_pretty(local_err));
828 error_free(local_err);
829 }
830 return ret ? ret : request_ret;
831 }
832
833 int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
834 uint64_t bytes, QEMUIOVector *qiov, int flags)
835 {
836 int ret, request_ret;
837 Error *local_err = NULL;
838 NBDClientSession *client = nbd_get_client_session(bs);
839 NBDRequest request = {
840 .type = NBD_CMD_READ,
841 .from = offset,
842 .len = bytes,
843 };
844
845 assert(bytes <= NBD_MAX_BUFFER_SIZE);
846 assert(!flags);
847
848 if (!bytes) {
849 return 0;
850 }
851 /*
852 * Work around the fact that the block layer doesn't do
853 * byte-accurate sizing yet - if the read exceeds the server's
854 * advertised size because the block layer rounded size up, then
855 * truncate the request to the server and tail-pad with zero.
856 */
857 if (offset >= client->info.size) {
858 assert(bytes < BDRV_SECTOR_SIZE);
859 qemu_iovec_memset(qiov, 0, 0, bytes);
860 return 0;
861 }
862 if (offset + bytes > client->info.size) {
863 uint64_t slop = offset + bytes - client->info.size;
864
865 assert(slop < BDRV_SECTOR_SIZE);
866 qemu_iovec_memset(qiov, bytes - slop, 0, slop);
867 request.len -= slop;
868 }
869
870 ret = nbd_co_send_request(bs, &request, NULL);
871 if (ret < 0) {
872 return ret;
873 }
874
875 ret = nbd_co_receive_cmdread_reply(client, request.handle, offset, qiov,
876 &request_ret, &local_err);
877 if (local_err) {
878 trace_nbd_co_request_fail(request.from, request.len, request.handle,
879 request.flags, request.type,
880 nbd_cmd_lookup(request.type),
881 ret, error_get_pretty(local_err));
882 error_free(local_err);
883 }
884 return ret ? ret : request_ret;
885 }
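/*
 * Worked example of the size workaround above (a sketch, values assumed):
 * for a 1000-byte export the block layer rounds the size up to 1024, so a
 * 512-byte read at offset 512 is sent to the server as a 488-byte read and
 * the remaining 24 bytes of the qiov are zero-filled locally; a read that
 * starts at or beyond byte 1000 is not sent at all and just returns zeroes.
 */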
886
887 int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
888 uint64_t bytes, QEMUIOVector *qiov, int flags)
889 {
890 NBDClientSession *client = nbd_get_client_session(bs);
891 NBDRequest request = {
892 .type = NBD_CMD_WRITE,
893 .from = offset,
894 .len = bytes,
895 };
896
897 assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
898 if (flags & BDRV_REQ_FUA) {
899 assert(client->info.flags & NBD_FLAG_SEND_FUA);
900 request.flags |= NBD_CMD_FLAG_FUA;
901 }
902
903 assert(bytes <= NBD_MAX_BUFFER_SIZE);
904
905 if (!bytes) {
906 return 0;
907 }
908 return nbd_co_request(bs, &request, qiov);
909 }
910
911 int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
912 int bytes, BdrvRequestFlags flags)
913 {
914 NBDClientSession *client = nbd_get_client_session(bs);
915 NBDRequest request = {
916 .type = NBD_CMD_WRITE_ZEROES,
917 .from = offset,
918 .len = bytes,
919 };
920
921 assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
922 if (!(client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
923 return -ENOTSUP;
924 }
925
926 if (flags & BDRV_REQ_FUA) {
927 assert(client->info.flags & NBD_FLAG_SEND_FUA);
928 request.flags |= NBD_CMD_FLAG_FUA;
929 }
930 if (!(flags & BDRV_REQ_MAY_UNMAP)) {
931 request.flags |= NBD_CMD_FLAG_NO_HOLE;
932 }
933
934 if (!bytes) {
935 return 0;
936 }
937 return nbd_co_request(bs, &request, NULL);
938 }
939
940 int nbd_client_co_flush(BlockDriverState *bs)
941 {
942 NBDClientSession *client = nbd_get_client_session(bs);
943 NBDRequest request = { .type = NBD_CMD_FLUSH };
944
945 if (!(client->info.flags & NBD_FLAG_SEND_FLUSH)) {
946 return 0;
947 }
948
949 request.from = 0;
950 request.len = 0;
951
952 return nbd_co_request(bs, &request, NULL);
953 }
954
955 int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int bytes)
956 {
957 NBDClientSession *client = nbd_get_client_session(bs);
958 NBDRequest request = {
959 .type = NBD_CMD_TRIM,
960 .from = offset,
961 .len = bytes,
962 };
963
964 assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
965 if (!(client->info.flags & NBD_FLAG_SEND_TRIM) || !bytes) {
966 return 0;
967 }
968
969 return nbd_co_request(bs, &request, NULL);
970 }
971
972 int coroutine_fn nbd_client_co_block_status(BlockDriverState *bs,
973 bool want_zero,
974 int64_t offset, int64_t bytes,
975 int64_t *pnum, int64_t *map,
976 BlockDriverState **file)
977 {
978 int ret, request_ret;
979 NBDExtent extent = { 0 };
980 NBDClientSession *client = nbd_get_client_session(bs);
981 Error *local_err = NULL;
982
983 NBDRequest request = {
984 .type = NBD_CMD_BLOCK_STATUS,
985 .from = offset,
986 .len = MIN(MIN_NON_ZERO(QEMU_ALIGN_DOWN(INT_MAX,
987 bs->bl.request_alignment),
988 client->info.max_block),
989 MIN(bytes, client->info.size - offset)),
990 .flags = NBD_CMD_FLAG_REQ_ONE,
991 };
992
993 if (!client->info.base_allocation) {
994 *pnum = bytes;
995 *map = offset;
996 *file = bs;
997 return BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
998 }
999
1000 /*
1001 * Work around the fact that the block layer doesn't do
1002 * byte-accurate sizing yet - if the status request exceeds the
1003 * server's advertised size because the block layer rounded size
1004 * up, we truncated the request to the server (above), or are
1005 * called on just the hole.
1006 */
1007 if (offset >= client->info.size) {
1008 *pnum = bytes;
1009 assert(bytes < BDRV_SECTOR_SIZE);
1010 /* Intentionally don't report offset_valid for the hole */
1011 return BDRV_BLOCK_ZERO;
1012 }
1013
1014 if (client->info.min_block) {
1015 assert(QEMU_IS_ALIGNED(request.len, client->info.min_block));
1016 }
1017 ret = nbd_co_send_request(bs, &request, NULL);
1018 if (ret < 0) {
1019 return ret;
1020 }
1021
1022 ret = nbd_co_receive_blockstatus_reply(client, request.handle, bytes,
1023 &extent, &request_ret, &local_err);
1024 if (local_err) {
1025 trace_nbd_co_request_fail(request.from, request.len, request.handle,
1026 request.flags, request.type,
1027 nbd_cmd_lookup(request.type),
1028 ret, error_get_pretty(local_err));
1029 error_free(local_err);
1030 }
1031 if (ret < 0 || request_ret < 0) {
1032 return ret ? ret : request_ret;
1033 }
1034
1035 assert(extent.length);
1036 *pnum = extent.length;
1037 *map = offset;
1038 *file = bs;
1039 return (extent.flags & NBD_STATE_HOLE ? 0 : BDRV_BLOCK_DATA) |
1040 (extent.flags & NBD_STATE_ZERO ? BDRV_BLOCK_ZERO : 0) |
1041 BDRV_BLOCK_OFFSET_VALID;
1042 }
1043
1044 void nbd_client_detach_aio_context(BlockDriverState *bs)
1045 {
1046 NBDClientSession *client = nbd_get_client_session(bs);
1047 qio_channel_detach_aio_context(QIO_CHANNEL(client->ioc));
1048 }
1049
1050 static void nbd_client_attach_aio_context_bh(void *opaque)
1051 {
1052 BlockDriverState *bs = opaque;
1053 NBDClientSession *client = nbd_get_client_session(bs);
1054
1055 /* The node is still drained, so we know the coroutine has yielded in
1056 * nbd_read_eof(), the only place where bs->in_flight can reach 0, or it is
1057 * entered for the first time. Both places are safe for entering the
1058 * coroutine.*/
1059 qemu_aio_coroutine_enter(bs->aio_context, client->connection_co);
1060 bdrv_dec_in_flight(bs);
1061 }
1062
1063 void nbd_client_attach_aio_context(BlockDriverState *bs,
1064 AioContext *new_context)
1065 {
1066 NBDClientSession *client = nbd_get_client_session(bs);
1067 qio_channel_attach_aio_context(QIO_CHANNEL(client->ioc), new_context);
1068
1069 bdrv_inc_in_flight(bs);
1070
1071 /* Need to wait here for the BH to run because the BH must run while the
1072 * node is still drained. */
1073 aio_wait_bh_oneshot(new_context, nbd_client_attach_aio_context_bh, bs);
1074 }
1075
1076 void nbd_client_close(BlockDriverState *bs)
1077 {
1078 NBDClientSession *client = nbd_get_client_session(bs);
1079 NBDRequest request = { .type = NBD_CMD_DISC };
1080
1081 assert(client->ioc);
1082
1083 nbd_send_request(client->ioc, &request);
1084
1085 nbd_teardown_connection(bs);
1086 }
1087
1088 static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
1089 Error **errp)
1090 {
1091 QIOChannelSocket *sioc;
1092 Error *local_err = NULL;
1093
1094 sioc = qio_channel_socket_new();
1095 qio_channel_set_name(QIO_CHANNEL(sioc), "nbd-client");
1096
1097 qio_channel_socket_connect_sync(sioc, saddr, &local_err);
1098 if (local_err) {
1099 object_unref(OBJECT(sioc));
1100 error_propagate(errp, local_err);
1101 return NULL;
1102 }
1103
1104 qio_channel_set_delay(QIO_CHANNEL(sioc), false);
1105
1106 return sioc;
1107 }
1108
1109 static int nbd_client_connect(BlockDriverState *bs,
1110 SocketAddress *saddr,
1111 const char *export,
1112 QCryptoTLSCreds *tlscreds,
1113 const char *hostname,
1114 const char *x_dirty_bitmap,
1115 Error **errp)
1116 {
1117 NBDClientSession *client = nbd_get_client_session(bs);
1118 int ret;
1119
1120 /*
1121 * establish TCP connection, return error if it fails
1122 * TODO: Configurable retry-until-timeout behaviour.
1123 */
1124 QIOChannelSocket *sioc = nbd_establish_connection(saddr, errp);
1125
1126 if (!sioc) {
1127 return -ECONNREFUSED;
1128 }
1129
1130 /* NBD handshake */
1131 logout("session init %s\n", export);
1132 qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);
1133
1134 client->info.request_sizes = true;
1135 client->info.structured_reply = true;
1136 client->info.base_allocation = true;
1137 client->info.x_dirty_bitmap = g_strdup(x_dirty_bitmap);
1138 client->info.name = g_strdup(export ?: "");
1139 ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), tlscreds, hostname,
1140 &client->ioc, &client->info, errp);
1141 g_free(client->info.x_dirty_bitmap);
1142 g_free(client->info.name);
1143 if (ret < 0) {
1144 logout("Failed to negotiate with the NBD server\n");
1145 object_unref(OBJECT(sioc));
1146 return ret;
1147 }
1148 if (x_dirty_bitmap && !client->info.base_allocation) {
1149 error_setg(errp, "requested x-dirty-bitmap %s not found",
1150 x_dirty_bitmap);
1151 ret = -EINVAL;
1152 goto fail;
1153 }
1154 if (client->info.flags & NBD_FLAG_READ_ONLY) {
1155 ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
1156 if (ret < 0) {
1157 goto fail;
1158 }
1159 }
1160 if (client->info.flags & NBD_FLAG_SEND_FUA) {
1161 bs->supported_write_flags = BDRV_REQ_FUA;
1162 bs->supported_zero_flags |= BDRV_REQ_FUA;
1163 }
1164 if (client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
1165 bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
1166 }
1167
1168 client->sioc = sioc;
1169
1170 if (!client->ioc) {
1171 client->ioc = QIO_CHANNEL(sioc);
1172 object_ref(OBJECT(client->ioc));
1173 }
1174
1175 /* Now that we're connected, set the socket to be non-blocking and
1176 * kick the reply mechanism. */
1177 qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
1178 client->connection_co = qemu_coroutine_create(nbd_connection_entry, client);
1179 bdrv_inc_in_flight(bs);
1180 nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
1181
1182 logout("Established connection with NBD server\n");
1183 return 0;
1184
1185 fail:
1186 /*
1187 * We have connected, but must fail for other reasons. The
1188 * connection is still blocking; send NBD_CMD_DISC as a courtesy
1189 * to the server.
1190 */
1191 {
1192 NBDRequest request = { .type = NBD_CMD_DISC };
1193
1194 nbd_send_request(client->ioc ?: QIO_CHANNEL(sioc), &request);
1195
1196 object_unref(OBJECT(sioc));
1197
1198 return ret;
1199 }
1200 }
1201
1202 int nbd_client_init(BlockDriverState *bs,
1203 SocketAddress *saddr,
1204 const char *export,
1205 QCryptoTLSCreds *tlscreds,
1206 const char *hostname,
1207 const char *x_dirty_bitmap,
1208 Error **errp)
1209 {
1210 NBDClientSession *client = nbd_get_client_session(bs);
1211
1212 client->bs = bs;
1213 qemu_co_mutex_init(&client->send_mutex);
1214 qemu_co_queue_init(&client->free_sema);
1215
1216 return nbd_client_connect(bs, saddr, export, tlscreds, hostname,
1217 x_dirty_bitmap, errp);
1218 }