X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=nbd.c;h=2606403a41facd02d5b50d14dcd407b3aa62bd1c;hb=15d23fb96656b1ae31bf4305b2108230c29298c6;hp=406e555bc69fcdd35c0a49f094ba17d2d43c6a9b;hpb=e6f5d0be730a41bacb10edba19d1369ec2949486;p=qemu.git diff --git a/nbd.c b/nbd.c index 406e555bc..2606403a4 100644 --- a/nbd.c +++ b/nbd.c @@ -16,11 +16,10 @@ * along with this program; if not, see . */ -#include "nbd.h" -#include "block.h" -#include "block_int.h" +#include "block/nbd.h" +#include "block/block.h" -#include "qemu-coroutine.h" +#include "block/coroutine.h" #include #include @@ -37,8 +36,8 @@ #include #endif -#include "qemu_socket.h" -#include "qemu-queue.h" +#include "qemu/sockets.h" +#include "qemu/queue.h" //#define DEBUG_NBD @@ -58,9 +57,12 @@ /* This is all part of the "official" NBD API */ +#define NBD_REQUEST_SIZE (4 + 4 + 8 + 8 + 4) #define NBD_REPLY_SIZE (4 + 4 + 8) #define NBD_REQUEST_MAGIC 0x25609513 #define NBD_REPLY_MAGIC 0x67446698 +#define NBD_OPTS_MAGIC 0x49484156454F5054LL +#define NBD_CLIENT_MAGIC 0x0000420281861253LL #define NBD_SET_SOCK _IO(0xab, 0) #define NBD_SET_BLKSIZE _IO(0xab, 1) @@ -76,14 +78,54 @@ #define NBD_OPT_EXPORT_NAME (1 << 0) -/* That's all folks */ +/* Definitions for opaque data types */ + +typedef struct NBDRequest NBDRequest; + +struct NBDRequest { + QSIMPLEQ_ENTRY(NBDRequest) entry; + NBDClient *client; + uint8_t *data; +}; + +struct NBDExport { + int refcount; + void (*close)(NBDExport *exp); + + BlockDriverState *bs; + char *name; + off_t dev_offset; + off_t size; + uint32_t nbdflags; + QTAILQ_HEAD(, NBDClient) clients; + QTAILQ_ENTRY(NBDExport) next; +}; + +static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports); + +struct NBDClient { + int refcount; + void (*close)(NBDClient *client); + + NBDExport *exp; + int sock; + + Coroutine *recv_coroutine; + + CoMutex send_lock; + Coroutine *send_coroutine; -#define read_sync(fd, buffer, size) nbd_wr_sync(fd, buffer, size, true) -#define write_sync(fd, buffer, size) nbd_wr_sync(fd, buffer, size, false) + QTAILQ_ENTRY(NBDClient) next; + int nb_requests; + bool closing; +}; + +/* That's all folks */ -size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read) +ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read) { size_t offset = 0; + int err; if (qemu_in_coroutine()) { if (do_read) { @@ -102,12 +144,16 @@ size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read) len = send(fd, buffer + offset, size - offset, 0); } - if (len == -1) - errno = socket_error(); + if (len < 0) { + err = socket_error(); - /* recoverable error */ - if (len == -1 && (errno == EAGAIN || errno == EINTR)) { - continue; + /* recoverable error */ + if (err == EINTR || (offset > 0 && err == EAGAIN)) { + continue; + } + + /* unrecoverable error */ + return -err; } /* eof */ @@ -115,17 +161,32 @@ size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read) break; } - /* unrecoverable error */ - if (len == -1) { - return 0; - } - offset += len; } return offset; } +static ssize_t read_sync(int fd, void *buffer, size_t size) +{ + /* Sockets are kept in blocking mode in the negotiation phase. After + * that, a non-readable socket simply means that another thread stole + * our request/reply. Synchronization is done with recv_coroutine, so + * that this is coroutine-safe. + */ + return nbd_wr_sync(fd, buffer, size, true); +} + +static ssize_t write_sync(int fd, void *buffer, size_t size) +{ + int ret; + do { + /* For writes, we do expect the socket to be writable. */ + ret = nbd_wr_sync(fd, buffer, size, false); + } while (ret == -EAGAIN); + return ret; +} + static void combine_addr(char *buf, size_t len, const char* address, uint16_t port) { @@ -137,16 +198,16 @@ static void combine_addr(char *buf, size_t len, const char* address, } } -int tcp_socket_outgoing(const char *address, uint16_t port) +int tcp_socket_outgoing_opts(QemuOpts *opts) { - char address_and_port[128]; - combine_addr(address_and_port, 128, address, port); - return tcp_socket_outgoing_spec(address_and_port); -} + Error *local_err = NULL; + int fd = inet_connect_opts(opts, &local_err, NULL, NULL); + if (local_err != NULL) { + qerror_report_err(local_err); + error_free(local_err); + } -int tcp_socket_outgoing_spec(const char *address_and_port) -{ - return inet_connect(address_and_port, SOCK_STREAM); + return fd; } int tcp_socket_incoming(const char *address, uint16_t port) @@ -158,29 +219,57 @@ int tcp_socket_incoming(const char *address, uint16_t port) int tcp_socket_incoming_spec(const char *address_and_port) { - char *ostr = NULL; - int olen = 0; - return inet_listen(address_and_port, ostr, olen, SOCK_STREAM, 0); + Error *local_err = NULL; + int fd = inet_listen(address_and_port, NULL, 0, SOCK_STREAM, 0, &local_err); + + if (local_err != NULL) { + qerror_report_err(local_err); + error_free(local_err); + } + return fd; } int unix_socket_incoming(const char *path) { - char *ostr = NULL; - int olen = 0; + Error *local_err = NULL; + int fd = unix_listen(path, NULL, 0, &local_err); - return unix_listen(path, ostr, olen); + if (local_err != NULL) { + qerror_report_err(local_err); + error_free(local_err); + } + return fd; } int unix_socket_outgoing(const char *path) { - return unix_connect(path); + Error *local_err = NULL; + int fd = unix_connect(path, &local_err); + + if (local_err != NULL) { + qerror_report_err(local_err); + error_free(local_err); + } + return fd; } -/* Basic flow +/* Basic flow for negotiation Server Client - Negotiate + + or + + Server Client + Negotiate #1 + Option + Negotiate #2 + + ---- + + followed by + + Server Client Request Response Request @@ -188,38 +277,159 @@ int unix_socket_outgoing(const char *path) ... ... Request (type == 2) + */ -static int nbd_send_negotiate(int csock, off_t size, uint32_t flags) +static int nbd_receive_options(NBDClient *client) { + int csock = client->sock; + char name[256]; + uint32_t tmp, length; + uint64_t magic; + int rc; + + /* Client sends: + [ 0 .. 3] reserved (0) + [ 4 .. 11] NBD_OPTS_MAGIC + [12 .. 15] NBD_OPT_EXPORT_NAME + [16 .. 19] length + [20 .. xx] export name (length bytes) + */ + + rc = -EINVAL; + if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) { + LOG("read failed"); + goto fail; + } + TRACE("Checking reserved"); + if (tmp != 0) { + LOG("Bad reserved received"); + goto fail; + } + + if (read_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) { + LOG("read failed"); + goto fail; + } + TRACE("Checking reserved"); + if (magic != be64_to_cpu(NBD_OPTS_MAGIC)) { + LOG("Bad magic received"); + goto fail; + } + + if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) { + LOG("read failed"); + goto fail; + } + TRACE("Checking option"); + if (tmp != be32_to_cpu(NBD_OPT_EXPORT_NAME)) { + LOG("Bad option received"); + goto fail; + } + + if (read_sync(csock, &length, sizeof(length)) != sizeof(length)) { + LOG("read failed"); + goto fail; + } + TRACE("Checking length"); + length = be32_to_cpu(length); + if (length > 255) { + LOG("Bad length received"); + goto fail; + } + if (read_sync(csock, name, length) != length) { + LOG("read failed"); + goto fail; + } + name[length] = '\0'; + + client->exp = nbd_export_find(name); + if (!client->exp) { + LOG("export not found"); + goto fail; + } + + QTAILQ_INSERT_TAIL(&client->exp->clients, client, next); + nbd_export_get(client->exp); + + TRACE("Option negotiation succeeded."); + rc = 0; +fail: + return rc; +} + +static int nbd_send_negotiate(NBDClient *client) +{ + int csock = client->sock; char buf[8 + 8 + 8 + 128]; + int rc; + const int myflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM | + NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA); - /* Negotiate - [ 0 .. 7] passwd ("NBDMAGIC") - [ 8 .. 15] magic (0x00420281861253) + /* Negotiation header without options: + [ 0 .. 7] passwd ("NBDMAGIC") + [ 8 .. 15] magic (NBD_CLIENT_MAGIC) [16 .. 23] size - [24 .. 27] flags - [28 .. 151] reserved (0) + [24 .. 25] server flags (0) + [24 .. 27] export flags + [28 .. 151] reserved (0) + + Negotiation header with options, part 1: + [ 0 .. 7] passwd ("NBDMAGIC") + [ 8 .. 15] magic (NBD_OPTS_MAGIC) + [16 .. 17] server flags (0) + + part 2 (after options are sent): + [18 .. 25] size + [26 .. 27] export flags + [28 .. 151] reserved (0) */ + qemu_set_block(csock); + rc = -EINVAL; + TRACE("Beginning negotiation."); + memset(buf, 0, sizeof(buf)); memcpy(buf, "NBDMAGIC", 8); - cpu_to_be64w((uint64_t*)(buf + 8), 0x00420281861253LL); - cpu_to_be64w((uint64_t*)(buf + 16), size); - cpu_to_be32w((uint32_t*)(buf + 24), - flags | NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM | - NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA); - memset(buf + 28, 0, 124); + if (client->exp) { + assert ((client->exp->nbdflags & ~65535) == 0); + cpu_to_be64w((uint64_t*)(buf + 8), NBD_CLIENT_MAGIC); + cpu_to_be64w((uint64_t*)(buf + 16), client->exp->size); + cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags); + } else { + cpu_to_be64w((uint64_t*)(buf + 8), NBD_OPTS_MAGIC); + } + + if (client->exp) { + if (write_sync(csock, buf, sizeof(buf)) != sizeof(buf)) { + LOG("write failed"); + goto fail; + } + } else { + if (write_sync(csock, buf, 18) != 18) { + LOG("write failed"); + goto fail; + } + rc = nbd_receive_options(client); + if (rc < 0) { + LOG("option negotiation failed"); + goto fail; + } - if (write_sync(csock, buf, sizeof(buf)) != sizeof(buf)) { - LOG("write failed"); - errno = EINVAL; - return -1; + assert ((client->exp->nbdflags & ~65535) == 0); + cpu_to_be64w((uint64_t*)(buf + 18), client->exp->size); + cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags); + if (write_sync(csock, buf + 18, sizeof(buf) - 18) != sizeof(buf) - 18) { + LOG("write failed"); + goto fail; + } } TRACE("Negotiation succeeded."); - - return 0; + rc = 0; +fail: + qemu_set_nonblock(csock); + return rc; } int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags, @@ -228,20 +438,22 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags, char buf[256]; uint64_t magic, s; uint16_t tmp; + int rc; TRACE("Receiving negotiation."); + qemu_set_block(csock); + rc = -EINVAL; + if (read_sync(csock, buf, 8) != 8) { LOG("read failed"); - errno = EINVAL; - return -1; + goto fail; } buf[8] = '\0'; if (strlen(buf) == 0) { LOG("server connection closed"); - errno = EINVAL; - return -1; + goto fail; } TRACE("Magic is %c%c%c%c%c%c%c%c", @@ -256,14 +468,12 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags, if (memcmp(buf, "NBDMAGIC", 8) != 0) { LOG("Invalid magic received"); - errno = EINVAL; - return -1; + goto fail; } if (read_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) { LOG("read failed"); - errno = EINVAL; - return -1; + goto fail; } magic = be64_to_cpu(magic); TRACE("Magic is 0x%" PRIx64, magic); @@ -274,63 +484,54 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags, uint32_t namesize; TRACE("Checking magic (opts_magic)"); - if (magic != 0x49484156454F5054LL) { + if (magic != NBD_OPTS_MAGIC) { LOG("Bad magic received"); - errno = EINVAL; - return -1; + goto fail; } if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) { LOG("flags read failed"); - errno = EINVAL; - return -1; + goto fail; } *flags = be16_to_cpu(tmp) << 16; /* reserved for future use */ if (write_sync(csock, &reserved, sizeof(reserved)) != sizeof(reserved)) { LOG("write failed (reserved)"); - errno = EINVAL; - return -1; + goto fail; } /* write the export name */ magic = cpu_to_be64(magic); if (write_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) { LOG("write failed (magic)"); - errno = EINVAL; - return -1; + goto fail; } opt = cpu_to_be32(NBD_OPT_EXPORT_NAME); if (write_sync(csock, &opt, sizeof(opt)) != sizeof(opt)) { LOG("write failed (opt)"); - errno = EINVAL; - return -1; + goto fail; } namesize = cpu_to_be32(strlen(name)); if (write_sync(csock, &namesize, sizeof(namesize)) != sizeof(namesize)) { LOG("write failed (namesize)"); - errno = EINVAL; - return -1; + goto fail; } if (write_sync(csock, (char*)name, strlen(name)) != strlen(name)) { LOG("write failed (name)"); - errno = EINVAL; - return -1; + goto fail; } } else { TRACE("Checking magic (cli_magic)"); - if (magic != 0x00420281861253LL) { + if (magic != NBD_CLIENT_MAGIC) { LOG("Bad magic received"); - errno = EINVAL; - return -1; + goto fail; } } if (read_sync(csock, &s, sizeof(s)) != sizeof(s)) { LOG("read failed"); - errno = EINVAL; - return -1; + goto fail; } *size = be64_to_cpu(s); *blocksize = 1024; @@ -339,24 +540,25 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags, if (!name) { if (read_sync(csock, flags, sizeof(*flags)) != sizeof(*flags)) { LOG("read failed (flags)"); - errno = EINVAL; - return -1; + goto fail; } *flags = be32_to_cpup(flags); } else { if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) { LOG("read failed (tmp)"); - errno = EINVAL; - return -1; + goto fail; } *flags |= be32_to_cpu(tmp); } if (read_sync(csock, &buf, 124) != 124) { LOG("read failed (buf)"); - errno = EINVAL; - return -1; + goto fail; } - return 0; + rc = 0; + +fail: + qemu_set_nonblock(csock); + return rc; } #ifdef __linux__ @@ -364,51 +566,45 @@ int nbd_init(int fd, int csock, uint32_t flags, off_t size, size_t blocksize) { TRACE("Setting NBD socket"); - if (ioctl(fd, NBD_SET_SOCK, csock) == -1) { + if (ioctl(fd, NBD_SET_SOCK, csock) < 0) { int serrno = errno; LOG("Failed to set NBD socket"); - errno = serrno; - return -1; + return -serrno; } TRACE("Setting block size to %lu", (unsigned long)blocksize); - if (ioctl(fd, NBD_SET_BLKSIZE, blocksize) == -1) { + if (ioctl(fd, NBD_SET_BLKSIZE, blocksize) < 0) { int serrno = errno; LOG("Failed setting NBD block size"); - errno = serrno; - return -1; + return -serrno; } TRACE("Setting size to %zd block(s)", (size_t)(size / blocksize)); - if (ioctl(fd, NBD_SET_SIZE_BLOCKS, size / blocksize) == -1) { + if (ioctl(fd, NBD_SET_SIZE_BLOCKS, size / blocksize) < 0) { int serrno = errno; LOG("Failed setting size (in blocks)"); - errno = serrno; - return -1; + return -serrno; } - if (flags & NBD_FLAG_READ_ONLY) { - int read_only = 1; - TRACE("Setting readonly attribute"); + if (ioctl(fd, NBD_SET_FLAGS, flags) < 0) { + if (errno == ENOTTY) { + int read_only = (flags & NBD_FLAG_READ_ONLY) != 0; + TRACE("Setting readonly attribute"); - if (ioctl(fd, BLKROSET, (unsigned long) &read_only) < 0) { + if (ioctl(fd, BLKROSET, (unsigned long) &read_only) < 0) { + int serrno = errno; + LOG("Failed setting read-only attribute"); + return -serrno; + } + } else { int serrno = errno; - LOG("Failed setting read-only attribute"); - errno = serrno; - return -1; + LOG("Failed setting flags"); + return -serrno; } } - if (ioctl(fd, NBD_SET_FLAGS, flags) < 0 - && errno != ENOTTY) { - int serrno = errno; - LOG("Failed setting flags"); - errno = serrno; - return -1; - } - TRACE("Negotiation ended"); return 0; @@ -430,7 +626,7 @@ int nbd_client(int fd) TRACE("Doing NBD loop"); ret = ioctl(fd, NBD_DO_IT); - if (ret == -1 && errno == EPIPE) { + if (ret < 0 && errno == EPIPE) { /* NBD_DO_IT normally returns EPIPE when someone has disconnected * the socket via NBD_DISCONNECT. We do not want to return 1 in * that case. @@ -453,26 +649,24 @@ int nbd_client(int fd) #else int nbd_init(int fd, int csock, uint32_t flags, off_t size, size_t blocksize) { - errno = ENOTSUP; - return -1; + return -ENOTSUP; } int nbd_disconnect(int fd) { - errno = ENOTSUP; - return -1; + return -ENOTSUP; } int nbd_client(int fd) { - errno = ENOTSUP; - return -1; + return -ENOTSUP; } #endif -int nbd_send_request(int csock, struct nbd_request *request) +ssize_t nbd_send_request(int csock, struct nbd_request *request) { - uint8_t buf[4 + 4 + 8 + 8 + 4]; + uint8_t buf[NBD_REQUEST_SIZE]; + ssize_t ret; cpu_to_be32w((uint32_t*)buf, NBD_REQUEST_MAGIC); cpu_to_be32w((uint32_t*)(buf + 4), request->type); @@ -484,23 +678,32 @@ int nbd_send_request(int csock, struct nbd_request *request) "{ .from = %" PRIu64", .len = %u, .handle = %" PRIu64", .type=%i}", request->from, request->len, request->handle, request->type); - if (write_sync(csock, buf, sizeof(buf)) != sizeof(buf)) { + ret = write_sync(csock, buf, sizeof(buf)); + if (ret < 0) { + return ret; + } + + if (ret != sizeof(buf)) { LOG("writing to socket failed"); - errno = EINVAL; - return -1; + return -EINVAL; } return 0; } -static int nbd_receive_request(int csock, struct nbd_request *request) +static ssize_t nbd_receive_request(int csock, struct nbd_request *request) { - uint8_t buf[4 + 4 + 8 + 8 + 4]; + uint8_t buf[NBD_REQUEST_SIZE]; uint32_t magic; + ssize_t ret; - if (read_sync(csock, buf, sizeof(buf)) != sizeof(buf)) { + ret = read_sync(csock, buf, sizeof(buf)); + if (ret < 0) { + return ret; + } + + if (ret != sizeof(buf)) { LOG("read failed"); - errno = EINVAL; - return -1; + return -EINVAL; } /* Request @@ -523,23 +726,25 @@ static int nbd_receive_request(int csock, struct nbd_request *request) if (magic != NBD_REQUEST_MAGIC) { LOG("invalid magic (got 0x%x)", magic); - errno = EINVAL; - return -1; + return -EINVAL; } return 0; } -int nbd_receive_reply(int csock, struct nbd_reply *reply) +ssize_t nbd_receive_reply(int csock, struct nbd_reply *reply) { uint8_t buf[NBD_REPLY_SIZE]; uint32_t magic; + ssize_t ret; - memset(buf, 0xAA, sizeof(buf)); + ret = read_sync(csock, buf, sizeof(buf)); + if (ret < 0) { + return ret; + } - if (read_sync(csock, buf, sizeof(buf)) != sizeof(buf)) { + if (ret != sizeof(buf)) { LOG("read failed"); - errno = EINVAL; - return -1; + return -EINVAL; } /* Reply @@ -558,15 +763,15 @@ int nbd_receive_reply(int csock, struct nbd_reply *reply) if (magic != NBD_REPLY_MAGIC) { LOG("invalid magic (got 0x%x)", magic); - errno = EINVAL; - return -1; + return -EINVAL; } return 0; } -static int nbd_send_reply(int csock, struct nbd_reply *reply) +static ssize_t nbd_send_reply(int csock, struct nbd_reply *reply) { - uint8_t buf[4 + 4 + 8]; + uint8_t buf[NBD_REPLY_SIZE]; + ssize_t ret; /* Reply [ 0 .. 3] magic (NBD_REPLY_MAGIC) @@ -579,85 +784,71 @@ static int nbd_send_reply(int csock, struct nbd_reply *reply) TRACE("Sending response to client"); - if (write_sync(csock, buf, sizeof(buf)) != sizeof(buf)) { + ret = write_sync(csock, buf, sizeof(buf)); + if (ret < 0) { + return ret; + } + + if (ret != sizeof(buf)) { LOG("writing to socket failed"); - errno = EINVAL; - return -1; + return -EINVAL; } return 0; } #define MAX_NBD_REQUESTS 16 -typedef struct NBDRequest NBDRequest; - -struct NBDRequest { - QSIMPLEQ_ENTRY(NBDRequest) entry; - NBDClient *client; - uint8_t *data; -}; - -struct NBDExport { - BlockDriverState *bs; - off_t dev_offset; - off_t size; - uint32_t nbdflags; - QSIMPLEQ_HEAD(, NBDRequest) requests; -}; - -struct NBDClient { - int refcount; - void (*close)(NBDClient *client); - - NBDExport *exp; - int sock; - - Coroutine *recv_coroutine; - - CoMutex send_lock; - Coroutine *send_coroutine; - - int nb_requests; -}; - -static void nbd_client_get(NBDClient *client) +void nbd_client_get(NBDClient *client) { client->refcount++; } -static void nbd_client_put(NBDClient *client) +void nbd_client_put(NBDClient *client) { if (--client->refcount == 0) { + /* The last reference should be dropped by client->close, + * which is called by nbd_client_close. + */ + assert(client->closing); + + qemu_set_fd_handler2(client->sock, NULL, NULL, NULL, NULL); + close(client->sock); + client->sock = -1; + if (client->exp) { + QTAILQ_REMOVE(&client->exp->clients, client, next); + nbd_export_put(client->exp); + } g_free(client); } } -static void nbd_client_close(NBDClient *client) +void nbd_client_close(NBDClient *client) { - qemu_set_fd_handler2(client->sock, NULL, NULL, NULL, NULL); - close(client->sock); - client->sock = -1; + if (client->closing) { + return; + } + + client->closing = true; + + /* Force requests to finish. They will drop their own references, + * then we'll close the socket and free the NBDClient. + */ + shutdown(client->sock, 2); + + /* Also tell the client, so that they release their reference. */ if (client->close) { client->close(client); } - nbd_client_put(client); } static NBDRequest *nbd_request_get(NBDClient *client) { NBDRequest *req; - NBDExport *exp = client->exp; assert(client->nb_requests <= MAX_NBD_REQUESTS - 1); client->nb_requests++; - if (QSIMPLEQ_EMPTY(&exp->requests)) { - req = g_malloc0(sizeof(NBDRequest)); - req->data = qemu_blockalign(exp->bs, NBD_BUFFER_SIZE); - } else { - req = QSIMPLEQ_FIRST(&exp->requests); - QSIMPLEQ_REMOVE_HEAD(&exp->requests, entry); - } + req = g_slice_new0(NBDRequest); nbd_client_get(client); req->client = client; return req; @@ -666,7 +857,12 @@ static NBDRequest *nbd_request_get(NBDClient *client) static void nbd_request_put(NBDRequest *req) { NBDClient *client = req->client; - QSIMPLEQ_INSERT_HEAD(&client->exp->requests, req, entry); + + if (req->data) { + qemu_vfree(req->data); + } + g_slice_free(NBDRequest, req); + if (client->nb_requests-- == MAX_NBD_REQUESTS) { qemu_notify_event(); } @@ -674,40 +870,113 @@ static void nbd_request_put(NBDRequest *req) } NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset, - off_t size, uint32_t nbdflags) + off_t size, uint32_t nbdflags, + void (*close)(NBDExport *)) { NBDExport *exp = g_malloc0(sizeof(NBDExport)); - QSIMPLEQ_INIT(&exp->requests); + exp->refcount = 1; + QTAILQ_INIT(&exp->clients); exp->bs = bs; exp->dev_offset = dev_offset; exp->nbdflags = nbdflags; - exp->size = size == -1 ? exp->bs->total_sectors * 512 : size; + exp->size = size == -1 ? bdrv_getlength(bs) : size; + exp->close = close; return exp; } +NBDExport *nbd_export_find(const char *name) +{ + NBDExport *exp; + QTAILQ_FOREACH(exp, &exports, next) { + if (strcmp(name, exp->name) == 0) { + return exp; + } + } + + return NULL; +} + +void nbd_export_set_name(NBDExport *exp, const char *name) +{ + if (exp->name == name) { + return; + } + + nbd_export_get(exp); + if (exp->name != NULL) { + g_free(exp->name); + exp->name = NULL; + QTAILQ_REMOVE(&exports, exp, next); + nbd_export_put(exp); + } + if (name != NULL) { + nbd_export_get(exp); + exp->name = g_strdup(name); + QTAILQ_INSERT_TAIL(&exports, exp, next); + } + nbd_export_put(exp); +} + void nbd_export_close(NBDExport *exp) { - while (!QSIMPLEQ_EMPTY(&exp->requests)) { - NBDRequest *first = QSIMPLEQ_FIRST(&exp->requests); - QSIMPLEQ_REMOVE_HEAD(&exp->requests, entry); - qemu_vfree(first->data); - g_free(first); + NBDClient *client, *next; + + nbd_export_get(exp); + QTAILQ_FOREACH_SAFE(client, &exp->clients, next, next) { + nbd_client_close(client); } + nbd_export_set_name(exp, NULL); + nbd_export_put(exp); +} - bdrv_close(exp->bs); - g_free(exp); +void nbd_export_get(NBDExport *exp) +{ + assert(exp->refcount > 0); + exp->refcount++; +} + +void nbd_export_put(NBDExport *exp) +{ + assert(exp->refcount > 0); + if (exp->refcount == 1) { + nbd_export_close(exp); + } + + if (--exp->refcount == 0) { + assert(exp->name == NULL); + + if (exp->close) { + exp->close(exp); + } + + g_free(exp); + } +} + +BlockDriverState *nbd_export_get_blockdev(NBDExport *exp) +{ + return exp->bs; +} + +void nbd_export_close_all(void) +{ + NBDExport *exp, *next; + + QTAILQ_FOREACH_SAFE(exp, &exports, next, next) { + nbd_export_close(exp); + } } static int nbd_can_read(void *opaque); static void nbd_read(void *opaque); static void nbd_restart_write(void *opaque); -static int nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply, - int len) +static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply, + int len) { NBDClient *client = req->client; int csock = client->sock; - int rc, ret; + ssize_t rc, ret; qemu_co_mutex_lock(&client->send_lock); qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, @@ -716,22 +985,15 @@ static int nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply, if (!len) { rc = nbd_send_reply(csock, reply); - if (rc == -1) { - rc = -errno; - } } else { socket_set_cork(csock, 1); rc = nbd_send_reply(csock, reply); - if (rc != -1) { + if (rc >= 0) { ret = qemu_co_send(csock, req->data, len); if (ret != len) { - errno = EIO; - rc = -1; + rc = -EIO; } } - if (rc == -1) { - rc = -errno; - } socket_set_cork(csock, 0); } @@ -741,21 +1003,25 @@ static int nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply, return rc; } -static int nbd_co_receive_request(NBDRequest *req, struct nbd_request *request) +static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *request) { NBDClient *client = req->client; int csock = client->sock; - int rc; + uint32_t command; + ssize_t rc; client->recv_coroutine = qemu_coroutine_self(); - if (nbd_receive_request(csock, request) == -1) { - rc = -EIO; + rc = nbd_receive_request(csock, request); + if (rc < 0) { + if (rc != -EAGAIN) { + rc = -EIO; + } goto out; } - if (request->len > NBD_BUFFER_SIZE) { + if (request->len > NBD_MAX_BUFFER_SIZE) { LOG("len (%u) is larger than max len (%u)", - request->len, NBD_BUFFER_SIZE); + request->len, NBD_MAX_BUFFER_SIZE); rc = -EINVAL; goto out; } @@ -769,7 +1035,11 @@ static int nbd_co_receive_request(NBDRequest *req, struct nbd_request *request) TRACE("Decoding type"); - if ((request->type & NBD_CMD_MASK_COMMAND) == NBD_CMD_WRITE) { + command = request->type & NBD_CMD_MASK_COMMAND; + if (command == NBD_CMD_READ || command == NBD_CMD_WRITE) { + req->data = qemu_blockalign(client->exp->bs, request->len); + } + if (command == NBD_CMD_WRITE) { TRACE("Reading %u byte(s)", request->len); if (qemu_co_recv(csock, req->data, request->len) != request->len) { @@ -788,15 +1058,22 @@ out: static void nbd_trip(void *opaque) { NBDClient *client = opaque; - NBDRequest *req = nbd_request_get(client); NBDExport *exp = client->exp; + NBDRequest *req; struct nbd_request request; struct nbd_reply reply; - int ret; + ssize_t ret; TRACE("Reading request."); + if (client->closing) { + return; + } + req = nbd_request_get(client); ret = nbd_co_receive_request(req, &request); + if (ret == -EAGAIN) { + goto done; + } if (ret == -EIO) { goto out; } @@ -822,6 +1099,15 @@ static void nbd_trip(void *opaque) case NBD_CMD_READ: TRACE("Request type is READ"); + if (request.type & NBD_CMD_FLAG_FUA) { + ret = bdrv_co_flush(exp->bs); + if (ret < 0) { + LOG("flush failed"); + reply.error = -ret; + goto error_reply; + } + } + ret = bdrv_read(exp->bs, (request.from + exp->dev_offset) / 512, req->data, request.len / 512); if (ret < 0) { @@ -862,8 +1148,9 @@ static void nbd_trip(void *opaque) } } - if (nbd_co_send_reply(req, &reply, 0) < 0) + if (nbd_co_send_reply(req, &reply, 0) < 0) { goto out; + } break; case NBD_CMD_DISC: TRACE("Request type is DISCONNECT"); @@ -877,9 +1164,9 @@ static void nbd_trip(void *opaque) LOG("flush failed"); reply.error = -ret; } - - if (nbd_co_send_reply(req, &reply, 0) < 0) + if (nbd_co_send_reply(req, &reply, 0) < 0) { goto out; + } break; case NBD_CMD_TRIM: TRACE("Request type is TRIM"); @@ -889,21 +1176,24 @@ static void nbd_trip(void *opaque) LOG("discard failed"); reply.error = -ret; } - if (nbd_co_send_reply(req, &reply, 0) < 0) + if (nbd_co_send_reply(req, &reply, 0) < 0) { goto out; + } break; default: LOG("invalid request type (%u) received", request.type); invalid_request: reply.error = -EINVAL; error_reply: - if (nbd_co_send_reply(req, &reply, 0) == -1) + if (nbd_co_send_reply(req, &reply, 0) < 0) { goto out; + } break; } TRACE("Request/Reply complete"); +done: nbd_request_put(req); return; @@ -941,15 +1231,21 @@ NBDClient *nbd_client_new(NBDExport *exp, int csock, void (*close)(NBDClient *)) { NBDClient *client; - if (nbd_send_negotiate(csock, exp->size, exp->nbdflags) == -1) { - return NULL; - } client = g_malloc0(sizeof(NBDClient)); client->refcount = 1; client->exp = exp; client->sock = csock; + if (nbd_send_negotiate(client) < 0) { + g_free(client); + return NULL; + } client->close = close; qemu_co_mutex_init(&client->send_lock); qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client); + + if (exp) { + QTAILQ_INSERT_TAIL(&exp->clients, client, next); + nbd_export_get(exp); + } return client; }