/*
 * block/nbd-client.c (QEMU), as of commit
 * "nbd: Tolerate some server non-compliance in NBD_CMD_BLOCK_STATUS"
 */
/*
 * QEMU Block driver for NBD
 *
 * Copyright (C) 2016 Red Hat, Inc.
 * Copyright (C) 2008 Bull S.A.S.
 *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
 *
 * Some parts:
 *    Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"

#include "trace.h"
#include "qapi/error.h"
#include "nbd-client.h"

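/*
 * In-flight requests live in s->requests[0..MAX_NBD_REQUESTS), but the
 * wire protocol wants an opaque 64-bit handle.  XOR-ing the slot index
 * with the NBDClientSession pointer yields a handle that is unique per
 * client and trivially reversible.  Illustrative example: slot 3 on a
 * session at 0x7f001234 becomes handle 3 ^ 0x7f001234, and XOR-ing the
 * same pointer back recovers index 3; a reply whose handle does not map
 * back into [0, MAX_NBD_REQUESTS) makes nbd_connection_entry() give up
 * on the connection.
 */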
#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ (uint64_t)(intptr_t)(bs))
#define INDEX_TO_HANDLE(bs, index)  ((index)  ^ (uint64_t)(intptr_t)(bs))

static void nbd_recv_coroutines_wake_all(NBDClientSession *s)
{
    int i;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        NBDClientRequest *req = &s->requests[i];

        if (req->coroutine && req->receiving) {
            aio_co_wake(req->coroutine);
        }
    }
}

static void nbd_teardown_connection(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);

    assert(client->ioc);

    /* finish any pending coroutines */
    qio_channel_shutdown(client->ioc,
                         QIO_CHANNEL_SHUTDOWN_BOTH,
                         NULL);
    BDRV_POLL_WHILE(bs, client->connection_co);

    nbd_client_detach_aio_context(bs);
    object_unref(OBJECT(client->sioc));
    client->sioc = NULL;
    object_unref(OBJECT(client->ioc));
    client->ioc = NULL;
}

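/* nbd_connection_entry
 * Reply dispatcher coroutine, running for the whole life of the
 * connection: read one reply header at a time, map its handle back to a
 * request slot, wake the coroutine waiting on that slot, and yield until
 * the reply has been consumed.  A read error or protocol violation sets
 * s->quit, wakes all waiters and ends the loop.
 */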
static coroutine_fn void nbd_connection_entry(void *opaque)
{
    NBDClientSession *s = opaque;
    uint64_t i;
    int ret = 0;
    Error *local_err = NULL;

    while (!s->quit) {
        /*
         * The NBD client can only really be considered idle when it has
         * yielded from qio_channel_readv_all_eof(), waiting for data. This is
         * the point where the additional scheduled coroutine entry happens
         * after nbd_client_attach_aio_context().
         *
         * Therefore we keep an additional in_flight reference all the time and
         * only drop it temporarily here.
         */
        assert(s->reply.handle == 0);
        ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);

        if (local_err) {
            trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
            error_free(local_err);
        }
        if (ret <= 0) {
            break;
        }

        /* There's no need for a mutex on the receive side, because the
         * handler acts as a synchronization point and ensures that only
         * one coroutine is called until the reply finishes.
         */
        i = HANDLE_TO_INDEX(s, s->reply.handle);
        if (i >= MAX_NBD_REQUESTS ||
            !s->requests[i].coroutine ||
            !s->requests[i].receiving ||
            (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
        {
            break;
        }

        /* We're woken up again by the request itself.  Note that there
         * is no race between yielding and reentering connection_co.  This
         * is because:
         *
         * - if the request runs on the same AioContext, it is only
         *   entered after we yield
         *
         * - if the request runs on a different AioContext, reentering
         *   connection_co happens through a bottom half, which can only
         *   run after we yield.
         */
        aio_co_wake(s->requests[i].coroutine);
        qemu_coroutine_yield();
    }

    s->quit = true;
    nbd_recv_coroutines_wake_all(s);
    bdrv_dec_in_flight(s->bs);

    s->connection_co = NULL;
    aio_wait_kick();
}

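/* nbd_co_send_request
 * Grab a free request slot (waiting on free_sema while all
 * MAX_NBD_REQUESTS slots are in flight), stamp the request with the
 * slot's handle, and send the header (plus the payload, for writes)
 * under send_mutex.  The reply is collected later through
 * nbd_co_do_receive_one_chunk().
 */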
static int nbd_co_send_request(BlockDriverState *bs,
                               NBDRequest *request,
                               QEMUIOVector *qiov)
{
    NBDClientSession *s = nbd_get_client_session(bs);
    int rc, i;

    qemu_co_mutex_lock(&s->send_mutex);
    while (s->in_flight == MAX_NBD_REQUESTS) {
        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
    }
    s->in_flight++;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        if (s->requests[i].coroutine == NULL) {
            break;
        }
    }

    g_assert(qemu_in_coroutine());
    assert(i < MAX_NBD_REQUESTS);

    s->requests[i].coroutine = qemu_coroutine_self();
    s->requests[i].offset = request->from;
    s->requests[i].receiving = false;

    request->handle = INDEX_TO_HANDLE(s, i);

    if (s->quit) {
        rc = -EIO;
        goto err;
    }
    assert(s->ioc);

    if (qiov) {
        qio_channel_set_cork(s->ioc, true);
        rc = nbd_send_request(s->ioc, request);
        if (rc >= 0 && !s->quit) {
            if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
                                       NULL) < 0) {
                rc = -EIO;
            }
        } else if (rc >= 0) {
            rc = -EIO;
        }
        qio_channel_set_cork(s->ioc, false);
    } else {
        rc = nbd_send_request(s->ioc, request);
    }

err:
    if (rc < 0) {
        s->quit = true;
        s->requests[i].coroutine = NULL;
        s->in_flight--;
        qemu_co_queue_next(&s->free_sema);
    }
    qemu_co_mutex_unlock(&s->send_mutex);
    return rc;
}

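/*
 * Cursor helpers for parsing structured reply payloads: each decodes one
 * big-endian field at *payload and advances the cursor past it.  As an
 * illustrative sketch, an NBD_REPLY_TYPE_OFFSET_HOLE payload (an 8-byte
 * offset followed by a 4-byte hole size) is consumed as:
 *
 *     offset = payload_advance64(&payload);
 *     hole_size = payload_advance32(&payload);
 */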
static inline uint16_t payload_advance16(uint8_t **payload)
{
    *payload += 2;
    return lduw_be_p(*payload - 2);
}

static inline uint32_t payload_advance32(uint8_t **payload)
{
    *payload += 4;
    return ldl_be_p(*payload - 4);
}

static inline uint64_t payload_advance64(uint8_t **payload)
{
    *payload += 8;
    return ldq_be_p(*payload - 8);
}

static int nbd_parse_offset_hole_payload(NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_offset,
                                         QEMUIOVector *qiov, Error **errp)
{
    uint64_t offset;
    uint32_t hole_size;

    if (chunk->length != sizeof(offset) + sizeof(hole_size)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_HOLE");
        return -EINVAL;
    }

    offset = payload_advance64(&payload);
    hole_size = payload_advance32(&payload);

    if (!hole_size || offset < orig_offset || hole_size > qiov->size ||
        offset > orig_offset + qiov->size - hole_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                         " region");
        return -EINVAL;
    }

    qemu_iovec_memset(qiov, offset - orig_offset, 0, hole_size);

    return 0;
}

/* nbd_parse_blockstatus_payload
 * Based on our request, we expect only one extent in reply, for the
 * base:allocation context.
 */
static int nbd_parse_blockstatus_payload(NBDClientSession *client,
                                         NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_length,
                                         NBDExtent *extent, Error **errp)
{
    uint32_t context_id;

    /* The server succeeded, so it must have sent [at least] one extent */
    if (chunk->length < sizeof(context_id) + sizeof(*extent)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS");
        return -EINVAL;
    }

    context_id = payload_advance32(&payload);
    if (client->info.context_id != context_id) {
        error_setg(errp, "Protocol error: unexpected context id %d for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS, when negotiated context "
                         "id is %d", context_id,
                   client->info.context_id);
        return -EINVAL;
    }

    extent->length = payload_advance32(&payload);
    extent->flags = payload_advance32(&payload);

    if (extent->length == 0 ||
        (client->info.min_block && !QEMU_IS_ALIGNED(extent->length,
                                                    client->info.min_block))) {
        error_setg(errp, "Protocol error: server sent status chunk with "
                         "invalid length");
        return -EINVAL;
    }

    /*
     * We used NBD_CMD_FLAG_REQ_ONE, so the server should not have
     * sent us any more than one extent, nor should it have included
     * status beyond our request in that extent.  However, it's easy
     * enough to ignore the server's noncompliance without killing the
     * connection; just ignore trailing extents, and clamp things to
     * the length of our request.
     */
    if (chunk->length > sizeof(context_id) + sizeof(*extent)) {
        trace_nbd_parse_blockstatus_compliance("more than one extent");
    }
    if (extent->length > orig_length) {
        extent->length = orig_length;
        trace_nbd_parse_blockstatus_compliance("extent length too large");
    }

    return 0;
}

/* nbd_parse_error_payload
 * On success, @request_ret is set to the negative errno carried by the
 * server's error chunk; @errp is set only on protocol errors.
 */
static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
                                   uint8_t *payload, int *request_ret,
                                   Error **errp)
{
    uint32_t error;
    uint16_t message_size;

    assert(chunk->type & (1 << 15));

    if (chunk->length < sizeof(error) + sizeof(message_size)) {
        error_setg(errp,
                   "Protocol error: invalid payload for structured error");
        return -EINVAL;
    }

    error = nbd_errno_to_system_errno(payload_advance32(&payload));
    if (error == 0) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with error = 0");
        return -EINVAL;
    }

    *request_ret = -error;
    message_size = payload_advance16(&payload);

    if (message_size > chunk->length - sizeof(error) - sizeof(message_size)) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with incorrect message size");
        return -EINVAL;
    }

    /* TODO: Add a trace point to mention the server complaint */

    /* TODO handle ERROR_OFFSET */

    return 0;
}

static int nbd_co_receive_offset_data_payload(NBDClientSession *s,
                                              uint64_t orig_offset,
                                              QEMUIOVector *qiov, Error **errp)
{
    QEMUIOVector sub_qiov;
    uint64_t offset;
    size_t data_size;
    int ret;
    NBDStructuredReplyChunk *chunk = &s->reply.structured;

    assert(nbd_reply_is_structured(&s->reply));

    /* The NBD spec requires at least one byte of payload */
    if (chunk->length <= sizeof(offset)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_DATA");
        return -EINVAL;
    }

    if (nbd_read64(s->ioc, &offset, "OFFSET_DATA offset", errp) < 0) {
        return -EIO;
    }

    data_size = chunk->length - sizeof(offset);
    assert(data_size);
    if (offset < orig_offset || data_size > qiov->size ||
        offset > orig_offset + qiov->size - data_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                         " region");
        return -EINVAL;
    }

    qemu_iovec_init(&sub_qiov, qiov->niov);
    qemu_iovec_concat(&sub_qiov, qiov, offset - orig_offset, data_size);
    ret = qio_channel_readv_all(s->ioc, sub_qiov.iov, sub_qiov.niov, errp);
    qemu_iovec_destroy(&sub_qiov);

    return ret < 0 ? -EIO : 0;
}

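/*
 * chunk->length is controlled by the server, so bound how much we are
 * willing to allocate for a payload.  The payloads parsed client-side
 * (hole descriptors, block status extents, error messages) are expected
 * to be small; anything above this cap is rejected as a protocol error
 * in nbd_co_receive_structured_payload().
 */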
#define NBD_MAX_MALLOC_PAYLOAD 1000
/* nbd_co_receive_structured_payload
 * Read the current structured reply chunk's payload into a freshly
 * allocated buffer stored in *@payload; the caller must g_free() it.
 */
static coroutine_fn int nbd_co_receive_structured_payload(
        NBDClientSession *s, void **payload, Error **errp)
{
    int ret;
    uint32_t len;

    assert(nbd_reply_is_structured(&s->reply));

    len = s->reply.structured.length;

    if (len == 0) {
        return 0;
    }

    if (payload == NULL) {
        error_setg(errp, "Unexpected structured payload");
        return -EINVAL;
    }

    if (len > NBD_MAX_MALLOC_PAYLOAD) {
        error_setg(errp, "Payload too large");
        return -EINVAL;
    }

    *payload = g_new(char, len);
    ret = nbd_read(s->ioc, *payload, len, "structured payload", errp);
    if (ret < 0) {
        g_free(*payload);
        *payload = NULL;
        return ret;
    }

    return 0;
}

/* nbd_co_do_receive_one_chunk
 * for simple reply:
 *   set request_ret to received reply error
 *   if qiov is not NULL: read payload to @qiov
 * for structured reply chunk:
 *   if error chunk: read payload, set @request_ret, do not set @payload
 *   else if offset_data chunk: read payload data to @qiov, do not set @payload
 *   else: read payload to @payload
 *
 * If function fails, @errp contains corresponding error message, and the
 * connection with the server is suspect.  If it returns 0, then the
 * transaction succeeded (although @request_ret may be a negative errno
 * corresponding to the server's error reply), and errp is unchanged.
 */
static coroutine_fn int nbd_co_do_receive_one_chunk(
        NBDClientSession *s, uint64_t handle, bool only_structured,
        int *request_ret, QEMUIOVector *qiov, void **payload, Error **errp)
{
    int ret;
    int i = HANDLE_TO_INDEX(s, handle);
    void *local_payload = NULL;
    NBDStructuredReplyChunk *chunk;

    if (payload) {
        *payload = NULL;
    }
    *request_ret = 0;

    /* Wait until we're woken up by nbd_connection_entry. */
    s->requests[i].receiving = true;
    qemu_coroutine_yield();
    s->requests[i].receiving = false;
    if (s->quit) {
        error_setg(errp, "Connection closed");
        return -EIO;
    }
    assert(s->ioc);

    assert(s->reply.handle == handle);

    if (nbd_reply_is_simple(&s->reply)) {
        if (only_structured) {
            error_setg(errp, "Protocol error: simple reply when structured "
                             "reply chunk was expected");
            return -EINVAL;
        }

        *request_ret = -nbd_errno_to_system_errno(s->reply.simple.error);
        if (*request_ret < 0 || !qiov) {
            return 0;
        }

        return qio_channel_readv_all(s->ioc, qiov->iov, qiov->niov,
                                     errp) < 0 ? -EIO : 0;
    }

    /* handle structured reply chunk */
    assert(s->info.structured_reply);
    chunk = &s->reply.structured;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        if (!(chunk->flags & NBD_REPLY_FLAG_DONE)) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk without"
                             " NBD_REPLY_FLAG_DONE flag set");
            return -EINVAL;
        }
        if (chunk->length) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk with"
                             " nonzero length");
            return -EINVAL;
        }
        return 0;
    }

    if (chunk->type == NBD_REPLY_TYPE_OFFSET_DATA) {
        if (!qiov) {
            error_setg(errp, "Unexpected NBD_REPLY_TYPE_OFFSET_DATA chunk");
            return -EINVAL;
        }

        return nbd_co_receive_offset_data_payload(s, s->requests[i].offset,
                                                  qiov, errp);
    }

    if (nbd_reply_type_is_error(chunk->type)) {
        payload = &local_payload;
    }

    ret = nbd_co_receive_structured_payload(s, payload, errp);
    if (ret < 0) {
        return ret;
    }

    if (nbd_reply_type_is_error(chunk->type)) {
        ret = nbd_parse_error_payload(chunk, local_payload, request_ret, errp);
        g_free(local_payload);
        return ret;
    }

    return 0;
}

/* nbd_co_receive_one_chunk
 * Read reply, wake up connection_co and set s->quit if needed.
 * Return value is a fatal error code or normal nbd reply error code
 */
static coroutine_fn int nbd_co_receive_one_chunk(
        NBDClientSession *s, uint64_t handle, bool only_structured,
        int *request_ret, QEMUIOVector *qiov, NBDReply *reply, void **payload,
        Error **errp)
{
    int ret = nbd_co_do_receive_one_chunk(s, handle, only_structured,
                                          request_ret, qiov, payload, errp);

    if (ret < 0) {
        s->quit = true;
    } else {
        /* For assert at loop start in nbd_connection_entry */
        if (reply) {
            *reply = s->reply;
        }
        s->reply.handle = 0;
    }

    if (s->connection_co) {
        aio_co_wake(s->connection_co);
    }

    return ret;
}

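/*
 * State carried across one NBD_FOREACH_REPLY_CHUNK loop: @ret and @err
 * record the first fatal channel error, @request_ret the first error the
 * server reported for the request, @done whether the final chunk has
 * been seen, and @only_structured whether a simple reply would now be a
 * protocol violation.
 */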
typedef struct NBDReplyChunkIter {
    int ret;
    int request_ret;
    Error *err;
    bool done, only_structured;
} NBDReplyChunkIter;

static void nbd_iter_channel_error(NBDReplyChunkIter *iter,
                                   int ret, Error **local_err)
{
    assert(ret < 0);

    if (!iter->ret) {
        iter->ret = ret;
        error_propagate(&iter->err, *local_err);
    } else {
        error_free(*local_err);
    }

    *local_err = NULL;
}

static void nbd_iter_request_error(NBDReplyChunkIter *iter, int ret)
{
    assert(ret < 0);

    if (!iter->request_ret) {
        iter->request_ret = ret;
    }
}

/* NBD_FOREACH_REPLY_CHUNK
 * Run the loop body once per reply chunk received for @handle.  Server
 * errors are recorded in @iter and iteration continues so that the
 * remaining chunks of the reply are drained; channel errors stop the
 * loop.
 */
#define NBD_FOREACH_REPLY_CHUNK(s, iter, handle, structured, \
                                qiov, reply, payload) \
    for (iter = (NBDReplyChunkIter) { .only_structured = structured }; \
         nbd_reply_chunk_iter_receive(s, &iter, handle, qiov, reply, payload);)
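
/*
 * Typical usage, as in the receive helpers below (illustrative sketch):
 *
 *     NBDReplyChunkIter iter;
 *
 *     NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, &reply, &payload)
 *     {
 *         ... inspect reply.structured and payload ...
 *     }
 *     error_propagate(errp, iter.err);
 *     return iter.ret;
 */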

/* nbd_reply_chunk_iter_receive
 * Fetch the next chunk for @handle.  Returns true while the caller's loop
 * body should run; returns false once iteration is over, after releasing
 * the request slot and waking one waiter on free_sema.
 */
static bool nbd_reply_chunk_iter_receive(NBDClientSession *s,
                                         NBDReplyChunkIter *iter,
                                         uint64_t handle,
                                         QEMUIOVector *qiov, NBDReply *reply,
                                         void **payload)
{
    int ret, request_ret;
    NBDReply local_reply;
    NBDStructuredReplyChunk *chunk;
    Error *local_err = NULL;

    if (s->quit) {
        error_setg(&local_err, "Connection closed");
        nbd_iter_channel_error(iter, -EIO, &local_err);
        goto break_loop;
    }

    if (iter->done) {
        /* Previous iteration was last. */
        goto break_loop;
    }

    if (reply == NULL) {
        reply = &local_reply;
    }

    ret = nbd_co_receive_one_chunk(s, handle, iter->only_structured,
                                   &request_ret, qiov, reply, payload,
                                   &local_err);
    if (ret < 0) {
        nbd_iter_channel_error(iter, ret, &local_err);
    } else if (request_ret < 0) {
        nbd_iter_request_error(iter, request_ret);
    }

    /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
    if (nbd_reply_is_simple(reply) || s->quit) {
        goto break_loop;
    }

    chunk = &reply->structured;
    iter->only_structured = true;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        /* NBD_REPLY_FLAG_DONE is already checked in nbd_co_receive_one_chunk */
        assert(chunk->flags & NBD_REPLY_FLAG_DONE);
        goto break_loop;
    }

    if (chunk->flags & NBD_REPLY_FLAG_DONE) {
        /* This iteration is last. */
        iter->done = true;
    }

    /* Execute the loop body */
    return true;

break_loop:
    s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;

    qemu_co_mutex_lock(&s->send_mutex);
    s->in_flight--;
    qemu_co_queue_next(&s->free_sema);
    qemu_co_mutex_unlock(&s->send_mutex);

    return false;
}

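/* nbd_co_receive_return_code
 * Drain every reply chunk for @handle, keeping only the completion
 * status; used for commands whose reply carries no payload back.
 */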
static int nbd_co_receive_return_code(NBDClientSession *s, uint64_t handle,
                                      int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;

    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, NULL, NULL) {
        /* nbd_reply_chunk_iter_receive does all the work */
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}

static int nbd_co_receive_cmdread_reply(NBDClientSession *s, uint64_t handle,
                                        uint64_t offset, QEMUIOVector *qiov,
                                        int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;

    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
                            qiov, &reply, &payload)
    {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_OFFSET_DATA:
            /* special cased in nbd_co_receive_one_chunk, data is already
             * in qiov */
            break;
        case NBD_REPLY_TYPE_OFFSET_HOLE:
            ret = nbd_parse_offset_hole_payload(&reply.structured, payload,
                                                offset, qiov, &local_err);
            if (ret < 0) {
                s->quit = true;
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                /* not allowed reply type */
                s->quit = true;
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) for CMD_READ",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}

static int nbd_co_receive_blockstatus_reply(NBDClientSession *s,
                                            uint64_t handle, uint64_t length,
                                            NBDExtent *extent,
                                            int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;
    bool received = false;

    assert(!extent->length);
    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
                            NULL, &reply, &payload)
    {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_BLOCK_STATUS:
            if (received) {
                s->quit = true;
                error_setg(&local_err, "Several BLOCK_STATUS chunks in reply");
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
            received = true;

            ret = nbd_parse_blockstatus_payload(s, &reply.structured,
                                                payload, length, extent,
                                                &local_err);
            if (ret < 0) {
                s->quit = true;
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                s->quit = true;
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) "
                           "for CMD_BLOCK_STATUS",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    if (!extent->length && !iter.err) {
        error_setg(&iter.err,
                   "Server did not reply with any status extents");
        if (!iter.ret) {
            iter.ret = -EIO;
        }
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}

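/* nbd_co_request
 * Send @request (with @write_qiov as the payload of NBD_CMD_WRITE) and
 * wait for completion.  Used by the write, zero, flush and discard
 * paths; NBD_CMD_READ and NBD_CMD_BLOCK_STATUS have dedicated receive
 * helpers instead.
 */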
static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
                          QEMUIOVector *write_qiov)
{
    int ret, request_ret;
    Error *local_err = NULL;
    NBDClientSession *client = nbd_get_client_session(bs);

    assert(request->type != NBD_CMD_READ);
    if (write_qiov) {
        assert(request->type == NBD_CMD_WRITE);
        assert(request->len == iov_size(write_qiov->iov, write_qiov->niov));
    } else {
        assert(request->type != NBD_CMD_WRITE);
    }
    ret = nbd_co_send_request(bs, request, write_qiov);
    if (ret < 0) {
        return ret;
    }

    ret = nbd_co_receive_return_code(client, request->handle,
                                     &request_ret, &local_err);
    if (local_err) {
        trace_nbd_co_request_fail(request->from, request->len, request->handle,
                                  request->flags, request->type,
                                  nbd_cmd_lookup(request->type),
                                  ret, error_get_pretty(local_err));
        error_free(local_err);
    }
    return ret ? ret : request_ret;
}

int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
                         uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    int ret, request_ret;
    Error *local_err = NULL;
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_READ,
        .from = offset,
        .len = bytes,
    };

    assert(bytes <= NBD_MAX_BUFFER_SIZE);
    assert(!flags);

    if (!bytes) {
        return 0;
    }
    ret = nbd_co_send_request(bs, &request, NULL);
    if (ret < 0) {
        return ret;
    }

    ret = nbd_co_receive_cmdread_reply(client, request.handle, offset, qiov,
                                       &request_ret, &local_err);
    if (local_err) {
        trace_nbd_co_request_fail(request.from, request.len, request.handle,
                                  request.flags, request.type,
                                  nbd_cmd_lookup(request.type),
                                  ret, error_get_pretty(local_err));
        error_free(local_err);
    }
    return ret ? ret : request_ret;
}

int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
                          uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_WRITE,
        .from = offset,
        .len = bytes,
    };

    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
    if (flags & BDRV_REQ_FUA) {
        assert(client->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }

    assert(bytes <= NBD_MAX_BUFFER_SIZE);

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, qiov);
}

int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
                                int bytes, BdrvRequestFlags flags)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_WRITE_ZEROES,
        .from = offset,
        .len = bytes,
    };

    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
    if (!(client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
        return -ENOTSUP;
    }

    if (flags & BDRV_REQ_FUA) {
        assert(client->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }
    if (!(flags & BDRV_REQ_MAY_UNMAP)) {
        request.flags |= NBD_CMD_FLAG_NO_HOLE;
    }

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, NULL);
}

int nbd_client_co_flush(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = { .type = NBD_CMD_FLUSH };

    if (!(client->info.flags & NBD_FLAG_SEND_FLUSH)) {
        return 0;
    }

    request.from = 0;
    request.len = 0;

    return nbd_co_request(bs, &request, NULL);
}

int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int bytes)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_TRIM,
        .from = offset,
        .len = bytes,
    };

    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
    if (!(client->info.flags & NBD_FLAG_SEND_TRIM) || !bytes) {
        return 0;
    }

    return nbd_co_request(bs, &request, NULL);
}

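/* nbd_client_co_block_status
 * Query block status for the base:allocation context.
 * NBD_CMD_FLAG_REQ_ONE limits the server to a single extent per request;
 * when the server did not negotiate base:allocation, everything is
 * simply reported as data.
 */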
int coroutine_fn nbd_client_co_block_status(BlockDriverState *bs,
                                            bool want_zero,
                                            int64_t offset, int64_t bytes,
                                            int64_t *pnum, int64_t *map,
                                            BlockDriverState **file)
{
    int ret, request_ret;
    NBDExtent extent = { 0 };
    NBDClientSession *client = nbd_get_client_session(bs);
    Error *local_err = NULL;

    NBDRequest request = {
        .type = NBD_CMD_BLOCK_STATUS,
        .from = offset,
        .len = MIN(MIN_NON_ZERO(QEMU_ALIGN_DOWN(INT_MAX,
                                                bs->bl.request_alignment),
                                client->info.max_block), bytes),
        .flags = NBD_CMD_FLAG_REQ_ONE,
    };

    if (!client->info.base_allocation) {
        *pnum = bytes;
        return BDRV_BLOCK_DATA;
    }

    ret = nbd_co_send_request(bs, &request, NULL);
    if (ret < 0) {
        return ret;
    }

    ret = nbd_co_receive_blockstatus_reply(client, request.handle, bytes,
                                           &extent, &request_ret, &local_err);
    if (local_err) {
        trace_nbd_co_request_fail(request.from, request.len, request.handle,
                                  request.flags, request.type,
                                  nbd_cmd_lookup(request.type),
                                  ret, error_get_pretty(local_err));
        error_free(local_err);
    }
    if (ret < 0 || request_ret < 0) {
        return ret ? ret : request_ret;
    }

    assert(extent.length);
    *pnum = extent.length;
    return (extent.flags & NBD_STATE_HOLE ? 0 : BDRV_BLOCK_DATA) |
           (extent.flags & NBD_STATE_ZERO ? BDRV_BLOCK_ZERO : 0);
}

void nbd_client_detach_aio_context(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    qio_channel_detach_aio_context(QIO_CHANNEL(client->ioc));
}

static void nbd_client_attach_aio_context_bh(void *opaque)
{
    BlockDriverState *bs = opaque;
    NBDClientSession *client = nbd_get_client_session(bs);

    /* The node is still drained, so we know the coroutine has yielded in
     * nbd_read_eof(), the only place where bs->in_flight can reach 0, or it is
     * entered for the first time. Both places are safe for entering the
     * coroutine. */
    qemu_aio_coroutine_enter(bs->aio_context, client->connection_co);
    bdrv_dec_in_flight(bs);
}

void nbd_client_attach_aio_context(BlockDriverState *bs,
                                   AioContext *new_context)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    qio_channel_attach_aio_context(QIO_CHANNEL(client->ioc), new_context);

    bdrv_inc_in_flight(bs);

    /* Need to wait here for the BH to run because the BH must run while the
     * node is still drained. */
    aio_wait_bh_oneshot(new_context, nbd_client_attach_aio_context_bh, bs);
}

void nbd_client_close(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = { .type = NBD_CMD_DISC };

    assert(client->ioc);

    nbd_send_request(client->ioc, &request);

    nbd_teardown_connection(bs);
}

static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
                                                  Error **errp)
{
    QIOChannelSocket *sioc;
    Error *local_err = NULL;

    sioc = qio_channel_socket_new();
    qio_channel_set_name(QIO_CHANNEL(sioc), "nbd-client");

    qio_channel_socket_connect_sync(sioc, saddr, &local_err);
    if (local_err) {
        object_unref(OBJECT(sioc));
        error_propagate(errp, local_err);
        return NULL;
    }

    qio_channel_set_delay(QIO_CHANNEL(sioc), false);

    return sioc;
}

static int nbd_client_connect(BlockDriverState *bs,
                              SocketAddress *saddr,
                              const char *export,
                              QCryptoTLSCreds *tlscreds,
                              const char *hostname,
                              const char *x_dirty_bitmap,
                              Error **errp)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    int ret;

    /*
     * establish TCP connection, return error if it fails
     * TODO: Configurable retry-until-timeout behaviour.
     */
    QIOChannelSocket *sioc = nbd_establish_connection(saddr, errp);

    if (!sioc) {
        return -ECONNREFUSED;
    }

    /* NBD handshake */
    logout("session init %s\n", export);
    qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);

    client->info.request_sizes = true;
    client->info.structured_reply = true;
    client->info.base_allocation = true;
    client->info.x_dirty_bitmap = g_strdup(x_dirty_bitmap);
    client->info.name = g_strdup(export ?: "");
    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), tlscreds, hostname,
                                &client->ioc, &client->info, errp);
    g_free(client->info.x_dirty_bitmap);
    g_free(client->info.name);
    if (ret < 0) {
        logout("Failed to negotiate with the NBD server\n");
        object_unref(OBJECT(sioc));
        return ret;
    }
    if (x_dirty_bitmap && !client->info.base_allocation) {
        error_setg(errp, "requested x-dirty-bitmap %s not found",
                   x_dirty_bitmap);
        ret = -EINVAL;
        goto fail;
    }
    if (client->info.flags & NBD_FLAG_READ_ONLY) {
        ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
        if (ret < 0) {
            goto fail;
        }
    }
    if (client->info.flags & NBD_FLAG_SEND_FUA) {
        bs->supported_write_flags = BDRV_REQ_FUA;
        bs->supported_zero_flags |= BDRV_REQ_FUA;
    }
    if (client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
        bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
    }

    client->sioc = sioc;

    if (!client->ioc) {
        client->ioc = QIO_CHANNEL(sioc);
        object_ref(OBJECT(client->ioc));
    }

    /* Now that we're connected, set the socket to be non-blocking and
     * kick the reply mechanism. */
    qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
    client->connection_co = qemu_coroutine_create(nbd_connection_entry, client);
    bdrv_inc_in_flight(bs);
    nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));

    logout("Established connection with NBD server\n");
    return 0;

 fail:
    /*
     * We have connected, but must fail for other reasons. The
     * connection is still blocking; send NBD_CMD_DISC as a courtesy
     * to the server.
     */
    {
        NBDRequest request = { .type = NBD_CMD_DISC };

        nbd_send_request(client->ioc ?: QIO_CHANNEL(sioc), &request);

        object_unref(OBJECT(sioc));

        return ret;
    }
}

int nbd_client_init(BlockDriverState *bs,
                    SocketAddress *saddr,
                    const char *export,
                    QCryptoTLSCreds *tlscreds,
                    const char *hostname,
                    const char *x_dirty_bitmap,
                    Error **errp)
{
    NBDClientSession *client = nbd_get_client_session(bs);

    client->bs = bs;
    qemu_co_mutex_init(&client->send_mutex);
    qemu_co_queue_init(&client->free_sema);

    return nbd_client_connect(bs, saddr, export, tlscreds, hostname,
                              x_dirty_bitmap, errp);
}