]> git.proxmox.com Git - mirror_qemu.git/blame - block/nbd-client.c
block/nbd: move connection code from block/nbd to block/nbd-client
[mirror_qemu.git] / block / nbd-client.c
CommitLineData
2302c1ca
MAL
1/*
2 * QEMU Block driver for NBD
3 *
b626b51a 4 * Copyright (C) 2016 Red Hat, Inc.
2302c1ca
MAL
5 * Copyright (C) 2008 Bull S.A.S.
6 * Author: Laurent Vivier <Laurent.Vivier@bull.net>
7 *
8 * Some parts:
9 * Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
24 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
27 * THE SOFTWARE.
28 */
29
80c71a24 30#include "qemu/osdep.h"
d8b4bad8
VSO
31
32#include "trace.h"
be41c100 33#include "qapi/error.h"
2302c1ca 34#include "nbd-client.h"
2302c1ca 35
/*
 * Map between the 64-bit wire "handle" and the index into s->requests[].
 * XOR-ing with the owning object's address makes handles unique per
 * client session and makes the mapping self-inverse.
 */
#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ (uint64_t)(intptr_t)(bs))
#define INDEX_TO_HANDLE(bs, index)  ((index)  ^ (uint64_t)(intptr_t)(bs))
2302c1ca 38
07b1b99c 39static void nbd_recv_coroutines_wake_all(NBDClientSession *s)
69152c09
MAL
40{
41 int i;
42
43 for (i = 0; i < MAX_NBD_REQUESTS; i++) {
40f4a218
SH
44 NBDClientRequest *req = &s->requests[i];
45
46 if (req->coroutine && req->receiving) {
47 aio_co_wake(req->coroutine);
69152c09
MAL
48 }
49 }
50}
51
/*
 * Tear down the NBD connection: shut the channel down in both
 * directions, wait for the reply-reading coroutine to exit, then drop
 * the channel references.  Safe to call on an already-closed session.
 */
static void nbd_teardown_connection(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);

    if (!client->ioc) { /* Already closed */
        return;
    }

    /* finish any pending coroutines */
    qio_channel_shutdown(client->ioc,
                         QIO_CHANNEL_SHUTDOWN_BOTH,
                         NULL);
    /* Block until nbd_read_reply_entry has terminated and cleared
     * read_reply_co */
    BDRV_POLL_WHILE(bs, client->read_reply_co);

    nbd_client_detach_aio_context(bs);
    object_unref(OBJECT(client->sioc));
    client->sioc = NULL;
    object_unref(OBJECT(client->ioc));
    client->ioc = NULL;
}
72
ff82911c 73static coroutine_fn void nbd_read_reply_entry(void *opaque)
2302c1ca 74{
ff82911c 75 NBDClientSession *s = opaque;
2302c1ca 76 uint64_t i;
d0a18013 77 int ret = 0;
be41c100 78 Error *local_err = NULL;
2302c1ca 79
72b6ffc7 80 while (!s->quit) {
ff82911c 81 assert(s->reply.handle == 0);
be41c100 82 ret = nbd_receive_reply(s->ioc, &s->reply, &local_err);
08ace1d7 83 if (local_err) {
d8b4bad8
VSO
84 trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
85 error_free(local_err);
be41c100 86 }
a12a712a 87 if (ret <= 0) {
ff82911c 88 break;
2302c1ca 89 }
2302c1ca 90
ff82911c
PB
91 /* There's no need for a mutex on the receive side, because the
92 * handler acts as a synchronization point and ensures that only
93 * one coroutine is called until the reply finishes.
94 */
95 i = HANDLE_TO_INDEX(s, s->reply.handle);
40f4a218
SH
96 if (i >= MAX_NBD_REQUESTS ||
97 !s->requests[i].coroutine ||
d2febedb 98 !s->requests[i].receiving ||
f140e300 99 (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
d2febedb 100 {
ff82911c
PB
101 break;
102 }
2302c1ca 103
40f4a218 104 /* We're woken up again by the request itself. Note that there
ff82911c
PB
105 * is no race between yielding and reentering read_reply_co. This
106 * is because:
107 *
40f4a218 108 * - if the request runs on the same AioContext, it is only
ff82911c
PB
109 * entered after we yield
110 *
40f4a218 111 * - if the request runs on a different AioContext, reentering
ff82911c
PB
112 * read_reply_co happens through a bottom half, which can only
113 * run after we yield.
114 */
40f4a218 115 aio_co_wake(s->requests[i].coroutine);
ff82911c 116 qemu_coroutine_yield();
2302c1ca 117 }
a12a712a 118
40f4a218 119 s->quit = true;
07b1b99c 120 nbd_recv_coroutines_wake_all(s);
ff82911c 121 s->read_reply_co = NULL;
4720cbee 122 aio_wait_kick();
2302c1ca
MAL
123}
124
/*
 * Claim a free request slot and send @request (plus the optional write
 * payload @qiov) to the server, all under send_mutex.
 *
 * Blocks on free_sema while all MAX_NBD_REQUESTS slots are in flight.
 * On any send failure the whole session is marked bad (s->quit) and the
 * slot is released.  Returns 0 on success or a negative errno.
 */
static int nbd_co_send_request(BlockDriverState *bs,
                               NBDRequest *request,
                               QEMUIOVector *qiov)
{
    NBDClientSession *s = nbd_get_client_session(bs);
    int rc, i;

    qemu_co_mutex_lock(&s->send_mutex);
    while (s->in_flight == MAX_NBD_REQUESTS) {
        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
    }
    s->in_flight++;

    /* Find a free slot; in_flight < MAX_NBD_REQUESTS guarantees one. */
    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        if (s->requests[i].coroutine == NULL) {
            break;
        }
    }

    g_assert(qemu_in_coroutine());
    assert(i < MAX_NBD_REQUESTS);

    s->requests[i].coroutine = qemu_coroutine_self();
    s->requests[i].offset = request->from;
    s->requests[i].receiving = false;

    request->handle = INDEX_TO_HANDLE(s, i);

    if (s->quit) {
        rc = -EIO;
        goto err;
    }
    if (!s->ioc) {
        rc = -EPIPE;
        goto err;
    }

    if (qiov) {
        /* Cork so header and payload leave as one segment stream. */
        qio_channel_set_cork(s->ioc, true);
        rc = nbd_send_request(s->ioc, request);
        if (rc >= 0 && !s->quit) {
            if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
                                       NULL) < 0) {
                rc = -EIO;
            }
        } else if (rc >= 0) {
            /* s->quit was set while sending: report failure ourselves. */
            rc = -EIO;
        }
        qio_channel_set_cork(s->ioc, false);
    } else {
        rc = nbd_send_request(s->ioc, request);
    }

err:
    if (rc < 0) {
        s->quit = true;
        s->requests[i].coroutine = NULL;
        s->in_flight--;
        qemu_co_queue_next(&s->free_sema);
    }
    qemu_co_mutex_unlock(&s->send_mutex);
    return rc;
}
188
/* Read a big-endian u16 at *payload and advance the cursor past it. */
static inline uint16_t payload_advance16(uint8_t **payload)
{
    uint16_t value = lduw_be_p(*payload);

    *payload += sizeof(value);
    return value;
}
194
/* Read a big-endian u32 at *payload and advance the cursor past it. */
static inline uint32_t payload_advance32(uint8_t **payload)
{
    uint32_t value = ldl_be_p(*payload);

    *payload += sizeof(value);
    return value;
}
200
/* Read a big-endian u64 at *payload and advance the cursor past it. */
static inline uint64_t payload_advance64(uint8_t **payload)
{
    uint64_t value = ldq_be_p(*payload);

    *payload += sizeof(value);
    return value;
}
206
/*
 * Parse an NBD_REPLY_TYPE_OFFSET_HOLE chunk and zero the corresponding
 * part of @qiov.  @orig_offset is the offset of the original read
 * request; the hole must lie entirely inside the requested region.
 * Returns 0 on success, -EINVAL (with @errp set) on protocol error.
 */
static int nbd_parse_offset_hole_payload(NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_offset,
                                         QEMUIOVector *qiov, Error **errp)
{
    uint64_t offset;
    uint32_t hole_size;

    if (chunk->length != sizeof(offset) + sizeof(hole_size)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_HOLE");
        return -EINVAL;
    }

    offset = payload_advance64(&payload);
    hole_size = payload_advance32(&payload);

    /* Bounds check ordered to avoid unsigned wraparound:
     * hole_size <= qiov->size holds before it is subtracted. */
    if (!hole_size || offset < orig_offset || hole_size > qiov->size ||
        offset > orig_offset + qiov->size - hole_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                   " region");
        return -EINVAL;
    }

    qemu_iovec_memset(qiov, offset - orig_offset, 0, hole_size);

    return 0;
}
234
/* nbd_parse_blockstatus_payload
 * support only one extent in reply and only for
 * base:allocation context
 *
 * Validates the context id against the one negotiated at connect time
 * and the extent length against the server's advertised min_block.
 * On success fills *extent (clamped to @orig_length); returns 0, or
 * -EINVAL with @errp set on protocol error.
 */
static int nbd_parse_blockstatus_payload(NBDClientSession *client,
                                         NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_length,
                                         NBDExtent *extent, Error **errp)
{
    uint32_t context_id;

    if (chunk->length != sizeof(context_id) + sizeof(*extent)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS");
        return -EINVAL;
    }

    context_id = payload_advance32(&payload);
    if (client->info.context_id != context_id) {
        /* NOTE(review): %d with a uint32_t id prints large ids as
         * negative — cosmetic only, message text kept as-is. */
        error_setg(errp, "Protocol error: unexpected context id %d for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS, when negotiated context "
                         "id is %d", context_id,
                   client->info.context_id);
        return -EINVAL;
    }

    extent->length = payload_advance32(&payload);
    extent->flags = payload_advance32(&payload);

    if (extent->length == 0 ||
        (client->info.min_block && !QEMU_IS_ALIGNED(extent->length,
                                                    client->info.min_block))) {
        error_setg(errp, "Protocol error: server sent status chunk with "
                         "invalid length");
        return -EINVAL;
    }

    /* The server is allowed to send us extra information on the final
     * extent; just clamp it to the length we requested. */
    if (extent->length > orig_length) {
        extent->length = orig_length;
    }

    return 0;
}
280
/* nbd_parse_error_payload
 * on success @errp contains message describing nbd error reply
 *
 * Decodes a structured error chunk: 32-bit error code + 16-bit message
 * size (+ message text, currently not surfaced).  The translated
 * negative errno is stored in *request_ret.  Returns 0 if the chunk was
 * well-formed, -EINVAL with @errp set otherwise.
 */
static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
                                   uint8_t *payload, int *request_ret,
                                   Error **errp)
{
    uint32_t error;
    uint16_t message_size;

    /* Error chunk types have the high bit set (nbd_reply_type_is_error). */
    assert(chunk->type & (1 << 15));

    if (chunk->length < sizeof(error) + sizeof(message_size)) {
        error_setg(errp,
                   "Protocol error: invalid payload for structured error");
        return -EINVAL;
    }

    error = nbd_errno_to_system_errno(payload_advance32(&payload));
    if (error == 0) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with error = 0");
        return -EINVAL;
    }

    *request_ret = -error;
    message_size = payload_advance16(&payload);

    if (message_size > chunk->length - sizeof(error) - sizeof(message_size)) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with incorrect message size");
        return -EINVAL;
    }

    /* TODO: Add a trace point to mention the server complaint */

    /* TODO handle ERROR_OFFSET */

    return 0;
}
321
/*
 * Read an NBD_REPLY_TYPE_OFFSET_DATA payload directly into the part of
 * @qiov it targets.  @orig_offset is the offset of the original read
 * request; the data must lie entirely inside the requested region.
 * Returns 0 on success, -EIO on read failure, -EINVAL on protocol error.
 */
static int nbd_co_receive_offset_data_payload(NBDClientSession *s,
                                              uint64_t orig_offset,
                                              QEMUIOVector *qiov, Error **errp)
{
    QEMUIOVector sub_qiov;
    uint64_t offset;
    size_t data_size;
    int ret;
    NBDStructuredReplyChunk *chunk = &s->reply.structured;

    assert(nbd_reply_is_structured(&s->reply));

    /* The NBD spec requires at least one byte of payload */
    if (chunk->length <= sizeof(offset)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_DATA");
        return -EINVAL;
    }

    if (nbd_read64(s->ioc, &offset, "OFFSET_DATA offset", errp) < 0) {
        return -EIO;
    }

    data_size = chunk->length - sizeof(offset);
    assert(data_size);
    /* Bounds check ordered so the subtraction cannot wrap. */
    if (offset < orig_offset || data_size > qiov->size ||
        offset > orig_offset + qiov->size - data_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                   " region");
        return -EINVAL;
    }

    /* Build a view onto the target slice of @qiov and read into it. */
    qemu_iovec_init(&sub_qiov, qiov->niov);
    qemu_iovec_concat(&sub_qiov, qiov, offset - orig_offset, data_size);
    ret = qio_channel_readv_all(s->ioc, sub_qiov.iov, sub_qiov.niov, errp);
    qemu_iovec_destroy(&sub_qiov);

    return ret < 0 ? -EIO : 0;
}
361
/* Upper bound on a structured-chunk payload we are willing to malloc. */
#define NBD_MAX_MALLOC_PAYLOAD 1000
/* nbd_co_receive_structured_payload
 * Read the current structured chunk's payload into a freshly allocated
 * buffer stored in *payload (caller frees).  A zero-length chunk leaves
 * *payload untouched and returns 0.  Returns -EINVAL if a payload is
 * present but not expected (payload == NULL) or too large, or the
 * nbd_read() error code on read failure.
 */
static coroutine_fn int nbd_co_receive_structured_payload(
        NBDClientSession *s, void **payload, Error **errp)
{
    int ret;
    uint32_t len;

    assert(nbd_reply_is_structured(&s->reply));

    len = s->reply.structured.length;

    if (len == 0) {
        return 0;
    }

    if (payload == NULL) {
        error_setg(errp, "Unexpected structured payload");
        return -EINVAL;
    }

    if (len > NBD_MAX_MALLOC_PAYLOAD) {
        error_setg(errp, "Payload too large");
        return -EINVAL;
    }

    *payload = g_new(char, len);
    ret = nbd_read(s->ioc, *payload, len, "structured payload", errp);
    if (ret < 0) {
        g_free(*payload);
        *payload = NULL;
        return ret;
    }

    return 0;
}
399
/* nbd_co_do_receive_one_chunk
 * for simple reply:
 *   set request_ret to received reply error
 *   if qiov is not NULL: read payload to @qiov
 * for structured reply chunk:
 *   if error chunk: read payload, set @request_ret, do not set @payload
 *   else if offset_data chunk: read payload data to @qiov, do not set @payload
 *   else: read payload to @payload
 *
 * If function fails, @errp contains corresponding error message, and the
 * connection with the server is suspect.  If it returns 0, then the
 * transaction succeeded (although @request_ret may be a negative errno
 * corresponding to the server's error reply), and errp is unchanged.
 */
static coroutine_fn int nbd_co_do_receive_one_chunk(
        NBDClientSession *s, uint64_t handle, bool only_structured,
        int *request_ret, QEMUIOVector *qiov, void **payload, Error **errp)
{
    int ret;
    int i = HANDLE_TO_INDEX(s, handle);
    void *local_payload = NULL;
    NBDStructuredReplyChunk *chunk;

    if (payload) {
        *payload = NULL;
    }
    *request_ret = 0;

    /* Wait until we're woken up by nbd_read_reply_entry.  */
    s->requests[i].receiving = true;
    qemu_coroutine_yield();
    s->requests[i].receiving = false;
    if (!s->ioc || s->quit) {
        error_setg(errp, "Connection closed");
        return -EIO;
    }

    /* nbd_read_reply_entry only wakes us for our own handle. */
    assert(s->reply.handle == handle);

    if (nbd_reply_is_simple(&s->reply)) {
        if (only_structured) {
            error_setg(errp, "Protocol error: simple reply when structured "
                             "reply chunk was expected");
            return -EINVAL;
        }

        *request_ret = -nbd_errno_to_system_errno(s->reply.simple.error);
        if (*request_ret < 0 || !qiov) {
            return 0;
        }

        return qio_channel_readv_all(s->ioc, qiov->iov, qiov->niov,
                                     errp) < 0 ? -EIO : 0;
    }

    /* handle structured reply chunk */
    assert(s->info.structured_reply);
    chunk = &s->reply.structured;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        if (!(chunk->flags & NBD_REPLY_FLAG_DONE)) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk without"
                       " NBD_REPLY_FLAG_DONE flag set");
            return -EINVAL;
        }
        if (chunk->length) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk with"
                       " nonzero length");
            return -EINVAL;
        }
        return 0;
    }

    if (chunk->type == NBD_REPLY_TYPE_OFFSET_DATA) {
        if (!qiov) {
            error_setg(errp, "Unexpected NBD_REPLY_TYPE_OFFSET_DATA chunk");
            return -EINVAL;
        }

        return nbd_co_receive_offset_data_payload(s, s->requests[i].offset,
                                                  qiov, errp);
    }

    /* Error chunks are parsed here, so capture their payload locally
     * even if the caller did not ask for one. */
    if (nbd_reply_type_is_error(chunk->type)) {
        payload = &local_payload;
    }

    ret = nbd_co_receive_structured_payload(s, payload, errp);
    if (ret < 0) {
        return ret;
    }

    if (nbd_reply_type_is_error(chunk->type)) {
        ret = nbd_parse_error_payload(chunk, local_payload, request_ret, errp);
        g_free(local_payload);
        return ret;
    }

    return 0;
}
500
/* nbd_co_receive_one_chunk
 * Read reply, wake up read_reply_co and set s->quit if needed.
 * Return value is a fatal error code or normal nbd reply error code
 */
static coroutine_fn int nbd_co_receive_one_chunk(
        NBDClientSession *s, uint64_t handle, bool only_structured,
        int *request_ret, QEMUIOVector *qiov, NBDReply *reply, void **payload,
        Error **errp)
{
    int ret = nbd_co_do_receive_one_chunk(s, handle, only_structured,
                                          request_ret, qiov, payload, errp);

    if (ret < 0) {
        /* Channel-level failure poisons the whole session. */
        s->quit = true;
    } else {
        /* For assert at loop start in nbd_read_reply_entry */
        if (reply) {
            *reply = s->reply;
        }
        s->reply.handle = 0;
    }

    /* Let the reader coroutine fetch the next reply. */
    if (s->read_reply_co) {
        aio_co_wake(s->read_reply_co);
    }

    return ret;
}
529
/* Accumulated state for one NBD_FOREACH_REPLY_CHUNK iteration. */
typedef struct NBDReplyChunkIter {
    int ret;         /* first fatal (channel) error, or 0 */
    int request_ret; /* first server-reported request error, or 0 */
    Error *err;      /* description of the fatal error, if any */
    /* done: the NBD_REPLY_FLAG_DONE chunk has been seen;
     * only_structured: a simple reply would be a protocol error */
    bool done, only_structured;
} NBDReplyChunkIter;
536
7f86068d
VSO
537static void nbd_iter_channel_error(NBDReplyChunkIter *iter,
538 int ret, Error **local_err)
f140e300
VSO
539{
540 assert(ret < 0);
541
7f86068d 542 if (!iter->ret) {
f140e300
VSO
543 iter->ret = ret;
544 error_propagate(&iter->err, *local_err);
545 } else {
546 error_free(*local_err);
547 }
548
549 *local_err = NULL;
550}
551
7f86068d
VSO
552static void nbd_iter_request_error(NBDReplyChunkIter *iter, int ret)
553{
554 assert(ret < 0);
555
556 if (!iter->request_ret) {
557 iter->request_ret = ret;
558 }
559}
560
/* NBD_FOREACH_REPLY_CHUNK
 * Iterate over all reply chunks for @handle; the loop body runs once
 * per structured, non-error chunk.  Fatal errors, the server's request
 * status and the DONE flag are accumulated in @iter by
 * nbd_reply_chunk_iter_receive(), which also releases the request slot
 * when iteration ends.
 */
#define NBD_FOREACH_REPLY_CHUNK(s, iter, handle, structured, \
                                qiov, reply, payload) \
    for (iter = (NBDReplyChunkIter) { .only_structured = structured }; \
         nbd_reply_chunk_iter_receive(s, &iter, handle, qiov, reply, payload);)
/* nbd_reply_chunk_iter_receive
 * One step of NBD_FOREACH_REPLY_CHUNK: receive the next chunk for
 * @handle, fold its status into @iter, and decide whether the loop body
 * should run (returns true) or iteration is over (returns false, after
 * releasing the request slot and kicking free_sema).
 */
static bool nbd_reply_chunk_iter_receive(NBDClientSession *s,
                                         NBDReplyChunkIter *iter,
                                         uint64_t handle,
                                         QEMUIOVector *qiov, NBDReply *reply,
                                         void **payload)
{
    int ret, request_ret;
    NBDReply local_reply;
    NBDStructuredReplyChunk *chunk;
    Error *local_err = NULL;
    if (s->quit) {
        error_setg(&local_err, "Connection closed");
        nbd_iter_channel_error(iter, -EIO, &local_err);
        goto break_loop;
    }

    if (iter->done) {
        /* Previous iteration was last. */
        goto break_loop;
    }

    /* Caller may not care about the raw reply; stage it locally. */
    if (reply == NULL) {
        reply = &local_reply;
    }

    ret = nbd_co_receive_one_chunk(s, handle, iter->only_structured,
                                   &request_ret, qiov, reply, payload,
                                   &local_err);
    if (ret < 0) {
        nbd_iter_channel_error(iter, ret, &local_err);
    } else if (request_ret < 0) {
        nbd_iter_request_error(iter, request_ret);
    }

    /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
    if (nbd_reply_is_simple(&s->reply) || s->quit) {
        goto break_loop;
    }

    chunk = &reply->structured;
    /* After the first chunk only structured replies are legal. */
    iter->only_structured = true;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        /* NBD_REPLY_FLAG_DONE is already checked in nbd_co_receive_one_chunk */
        assert(chunk->flags & NBD_REPLY_FLAG_DONE);
        goto break_loop;
    }

    if (chunk->flags & NBD_REPLY_FLAG_DONE) {
        /* This iteration is last. */
        iter->done = true;
    }

    /* Execute the loop body */
    return true;

break_loop:
    s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;

    qemu_co_mutex_lock(&s->send_mutex);
    s->in_flight--;
    qemu_co_queue_next(&s->free_sema);
    qemu_co_mutex_unlock(&s->send_mutex);

    return false;
}
636
/*
 * Consume the complete reply for @handle, discarding any payload.
 * *request_ret receives the server-reported status; the return value is
 * the channel status (negative on fatal error, with @errp set).
 */
static int nbd_co_receive_return_code(NBDClientSession *s, uint64_t handle,
                                      int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;

    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, NULL, NULL) {
        /* nbd_reply_chunk_iter_receive does all the work */
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}
650
/*
 * Receive the full reply to an NBD_CMD_READ for @handle: data chunks
 * land directly in @qiov, hole chunks zero the matching slice, any
 * other non-error chunk type is a protocol error.  *request_ret gets
 * the server status; the return value is the channel status.
 */
static int nbd_co_receive_cmdread_reply(NBDClientSession *s, uint64_t handle,
                                        uint64_t offset, QEMUIOVector *qiov,
                                        int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;

    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
                            qiov, &reply, &payload)
    {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_OFFSET_DATA:
            /* special cased in nbd_co_receive_one_chunk, data is already
             * in qiov */
            break;
        case NBD_REPLY_TYPE_OFFSET_HOLE:
            ret = nbd_parse_offset_hole_payload(&reply.structured, payload,
                                                offset, qiov, &local_err);
            if (ret < 0) {
                s->quit = true;
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                /* not allowed reply type */
                s->quit = true;
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) for CMD_READ",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}
700
/*
 * Receive the full reply to an NBD_CMD_BLOCK_STATUS for @handle.
 * Exactly one BLOCK_STATUS chunk is expected (NBD_CMD_FLAG_REQ_ONE);
 * its single extent is parsed into @extent.  A duplicate chunk, an
 * unexpected chunk type, or a reply with no extent at all is treated as
 * a protocol error.  *request_ret gets the server status; the return
 * value is the channel status.
 */
static int nbd_co_receive_blockstatus_reply(NBDClientSession *s,
                                            uint64_t handle, uint64_t length,
                                            NBDExtent *extent,
                                            int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;
    bool received = false;

    assert(!extent->length);
    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
                            NULL, &reply, &payload)
    {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_BLOCK_STATUS:
            if (received) {
                s->quit = true;
                error_setg(&local_err, "Several BLOCK_STATUS chunks in reply");
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
            received = true;

            ret = nbd_parse_blockstatus_payload(s, &reply.structured,
                                                payload, length, extent,
                                                &local_err);
            if (ret < 0) {
                s->quit = true;
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                s->quit = true;
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) "
                           "for CMD_BLOCK_STATUS",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    if (!extent->length && !iter.err) {
        error_setg(&iter.err,
                   "Server did not reply with any status extents");
        if (!iter.ret) {
            iter.ret = -EIO;
        }
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}
765
/*
 * Send a request (optionally with a write payload) and consume its
 * reply.  Helper for all commands except NBD_CMD_READ, whose reply
 * needs per-chunk handling.  Returns the channel error if any,
 * otherwise the server's request status.
 */
static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
                          QEMUIOVector *write_qiov)
{
    int ret, request_ret;
    Error *local_err = NULL;
    NBDClientSession *client = nbd_get_client_session(bs);

    assert(request->type != NBD_CMD_READ);
    if (write_qiov) {
        assert(request->type == NBD_CMD_WRITE);
        assert(request->len == iov_size(write_qiov->iov, write_qiov->niov));
    } else {
        assert(request->type != NBD_CMD_WRITE);
    }
    ret = nbd_co_send_request(bs, request, write_qiov);
    if (ret < 0) {
        return ret;
    }

    ret = nbd_co_receive_return_code(client, request->handle,
                                     &request_ret, &local_err);
    if (local_err) {
        trace_nbd_co_request_fail(request->from, request->len, request->handle,
                                  request->flags, request->type,
                                  nbd_cmd_lookup(request->type),
                                  ret, error_get_pretty(local_err));
        error_free(local_err);
    }
    return ret ? ret : request_ret;
}
796
/*
 * Read @bytes at @offset into @qiov via NBD_CMD_READ.
 * Returns the channel error if any, otherwise the server's status.
 */
int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
                         uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    int ret, request_ret;
    Error *local_err = NULL;
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_READ,
        .from = offset,
        .len = bytes,
    };

    assert(bytes <= NBD_MAX_BUFFER_SIZE);
    assert(!flags);

    if (!bytes) {
        return 0;
    }
    ret = nbd_co_send_request(bs, &request, NULL);
    if (ret < 0) {
        return ret;
    }

    ret = nbd_co_receive_cmdread_reply(client, request.handle, offset, qiov,
                                       &request_ret, &local_err);
    if (local_err) {
        trace_nbd_co_request_fail(request.from, request.len, request.handle,
                                  request.flags, request.type,
                                  nbd_cmd_lookup(request.type),
                                  ret, error_get_pretty(local_err));
        error_free(local_err);
    }
    return ret ? ret : request_ret;
}
831
/*
 * Write @bytes at @offset from @qiov via NBD_CMD_WRITE.
 * BDRV_REQ_FUA is honored only if the server advertised FUA support.
 */
int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
                          uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_WRITE,
        .from = offset,
        .len = bytes,
    };

    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
    if (flags & BDRV_REQ_FUA) {
        assert(client->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }

    assert(bytes <= NBD_MAX_BUFFER_SIZE);

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, qiov);
}
855
/*
 * Zero @bytes at @offset via NBD_CMD_WRITE_ZEROES.
 * Returns -ENOTSUP if the server did not advertise the command; unless
 * BDRV_REQ_MAY_UNMAP is set, NO_HOLE forces full allocation.
 */
int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
                                int bytes, BdrvRequestFlags flags)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_WRITE_ZEROES,
        .from = offset,
        .len = bytes,
    };

    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
    if (!(client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
        return -ENOTSUP;
    }

    if (flags & BDRV_REQ_FUA) {
        assert(client->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }
    if (!(flags & BDRV_REQ_MAY_UNMAP)) {
        request.flags |= NBD_CMD_FLAG_NO_HOLE;
    }

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, NULL);
}
884
f53a829b 885int nbd_client_co_flush(BlockDriverState *bs)
2302c1ca 886{
10676b81 887 NBDClientSession *client = nbd_get_client_session(bs);
ed2dd912 888 NBDRequest request = { .type = NBD_CMD_FLUSH };
2302c1ca 889
004a89fc 890 if (!(client->info.flags & NBD_FLAG_SEND_FLUSH)) {
2302c1ca
MAL
891 return 0;
892 }
893
2302c1ca
MAL
894 request.from = 0;
895 request.len = 0;
896
f35dff7e 897 return nbd_co_request(bs, &request, NULL);
2302c1ca
MAL
898}
899
/*
 * Discard @bytes at @offset via NBD_CMD_TRIM.  A no-op (success) when
 * the server did not advertise TRIM or the request is empty.
 */
int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int bytes)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_TRIM,
        .from = offset,
        .len = bytes,
    };

    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
    if (!(client->info.flags & NBD_FLAG_SEND_TRIM) || !bytes) {
        return 0;
    }

    return nbd_co_request(bs, &request, NULL);
}
916
/*
 * Implement bdrv block-status via NBD_CMD_BLOCK_STATUS (REQ_ONE, so at
 * most one extent comes back).  Falls back to reporting everything as
 * DATA when base:allocation was not negotiated.
 */
int coroutine_fn nbd_client_co_block_status(BlockDriverState *bs,
                                            bool want_zero,
                                            int64_t offset, int64_t bytes,
                                            int64_t *pnum, int64_t *map,
                                            BlockDriverState **file)
{
    int ret, request_ret;
    NBDExtent extent = { 0 };
    NBDClientSession *client = nbd_get_client_session(bs);
    Error *local_err = NULL;

    NBDRequest request = {
        .type = NBD_CMD_BLOCK_STATUS,
        .from = offset,
        /* Clamp to request_alignment, the server's max_block and @bytes. */
        .len = MIN(MIN_NON_ZERO(QEMU_ALIGN_DOWN(INT_MAX,
                                                bs->bl.request_alignment),
                                client->info.max_block), bytes),
        .flags = NBD_CMD_FLAG_REQ_ONE,
    };

    if (!client->info.base_allocation) {
        *pnum = bytes;
        return BDRV_BLOCK_DATA;
    }

    ret = nbd_co_send_request(bs, &request, NULL);
    if (ret < 0) {
        return ret;
    }

    ret = nbd_co_receive_blockstatus_reply(client, request.handle, bytes,
                                           &extent, &request_ret, &local_err);
    if (local_err) {
        trace_nbd_co_request_fail(request.from, request.len, request.handle,
                                  request.flags, request.type,
                                  nbd_cmd_lookup(request.type),
                                  ret, error_get_pretty(local_err));
        error_free(local_err);
    }
    if (ret < 0 || request_ret < 0) {
        return ret ? ret : request_ret;
    }

    assert(extent.length);
    *pnum = extent.length;
    return (extent.flags & NBD_STATE_HOLE ? 0 : BDRV_BLOCK_DATA) |
           (extent.flags & NBD_STATE_ZERO ? BDRV_BLOCK_ZERO : 0);
}
965
f53a829b 966void nbd_client_detach_aio_context(BlockDriverState *bs)
69447cd8 967{
ff82911c 968 NBDClientSession *client = nbd_get_client_session(bs);
96d06835 969 qio_channel_detach_aio_context(QIO_CHANNEL(client->ioc));
69447cd8
SH
970}
971
/*
 * Attach the connection's channel to @new_context and resume the
 * reply-reading coroutine there.
 */
void nbd_client_attach_aio_context(BlockDriverState *bs,
                                   AioContext *new_context)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    qio_channel_attach_aio_context(QIO_CHANNEL(client->ioc), new_context);
    /* Kick read_reply_co so it continues polling in the new context. */
    aio_co_schedule(new_context, client->read_reply_co);
}
979
/*
 * Politely close the session: send NBD_CMD_DISC, then tear down the
 * connection.  A no-op if the session was never (or is no longer) open.
 */
void nbd_client_close(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = { .type = NBD_CMD_DISC };

    if (client->ioc == NULL) {
        return;
    }

    nbd_send_request(client->ioc, &request);

    nbd_teardown_connection(bs);
}
993
d42f78e9
VSO
994static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
995 Error **errp)
996{
997 QIOChannelSocket *sioc;
998 Error *local_err = NULL;
999
1000 sioc = qio_channel_socket_new();
1001 qio_channel_set_name(QIO_CHANNEL(sioc), "nbd-client");
1002
1003 qio_channel_socket_connect_sync(sioc, saddr, &local_err);
1004 if (local_err) {
1005 object_unref(OBJECT(sioc));
1006 error_propagate(errp, local_err);
1007 return NULL;
1008 }
1009
1010 qio_channel_set_delay(QIO_CHANNEL(sioc), false);
1011
1012 return sioc;
1013}
1014
/*
 * Connect to @saddr, run the NBD handshake for @export (optionally over
 * TLS), validate the negotiated features and start the reply-reading
 * coroutine.  Returns 0 on success or a negative errno; on handshake
 * failure after connecting, sends NBD_CMD_DISC as a courtesy.
 */
int nbd_client_init(BlockDriverState *bs,
                    SocketAddress *saddr,
                    const char *export,
                    QCryptoTLSCreds *tlscreds,
                    const char *hostname,
                    const char *x_dirty_bitmap,
                    Error **errp)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    int ret;

    /*
     * establish TCP connection, return error if it fails
     * TODO: Configurable retry-until-timeout behaviour.
     */
    QIOChannelSocket *sioc = nbd_establish_connection(saddr, errp);

    if (!sioc) {
        return -ECONNREFUSED;
    }

    /* NBD handshake */
    logout("session init %s\n", export);
    /* Handshake runs synchronously, so use blocking I/O for now. */
    qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);

    /* Advertise the optional features we can handle. */
    client->info.request_sizes = true;
    client->info.structured_reply = true;
    client->info.base_allocation = true;
    client->info.x_dirty_bitmap = g_strdup(x_dirty_bitmap);
    client->info.name = g_strdup(export ?: "");
    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), tlscreds, hostname,
                                &client->ioc, &client->info, errp);
    g_free(client->info.x_dirty_bitmap);
    g_free(client->info.name);
    if (ret < 0) {
        logout("Failed to negotiate with the NBD server\n");
        object_unref(OBJECT(sioc));
        return ret;
    }
    if (x_dirty_bitmap && !client->info.base_allocation) {
        error_setg(errp, "requested x-dirty-bitmap %s not found",
                   x_dirty_bitmap);
        ret = -EINVAL;
        goto fail;
    }
    if (client->info.flags & NBD_FLAG_READ_ONLY) {
        ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
        if (ret < 0) {
            goto fail;
        }
    }
    if (client->info.flags & NBD_FLAG_SEND_FUA) {
        bs->supported_write_flags = BDRV_REQ_FUA;
        bs->supported_zero_flags |= BDRV_REQ_FUA;
    }
    if (client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
        bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
    }

    qemu_co_mutex_init(&client->send_mutex);
    qemu_co_queue_init(&client->free_sema);
    client->sioc = sioc;

    /* Negotiation may have wrapped sioc (e.g. in TLS); otherwise use it
     * directly. */
    if (!client->ioc) {
        client->ioc = QIO_CHANNEL(sioc);
        object_ref(OBJECT(client->ioc));
    }

    /* Now that we're connected, set the socket to be non-blocking and
     * kick the reply mechanism.  */
    qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
    client->read_reply_co = qemu_coroutine_create(nbd_read_reply_entry, client);
    nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));

    logout("Established connection with NBD server\n");
    return 0;

 fail:
    /*
     * We have connected, but must fail for other reasons. The
     * connection is still blocking; send NBD_CMD_DISC as a courtesy
     * to the server.
     */
    {
        NBDRequest request = { .type = NBD_CMD_DISC };

        nbd_send_request(client->ioc ?: QIO_CHANNEL(sioc), &request);

        object_unref(OBJECT(sioc));

        return ret;
    }
}