#include "nbd.h"
#include "block.h"
+#include "block_int.h"
+
+#include "qemu-coroutine.h"
#include <errno.h>
#include <string.h>
#endif
#include "qemu_socket.h"
+#include "qemu-queue.h"
//#define DEBUG_NBD
Request (type == 2)
*/
-int nbd_negotiate(int csock, off_t size, uint32_t flags)
+static int nbd_send_negotiate(int csock, off_t size, uint32_t flags)
{
char buf[8 + 8 + 8 + 128];
return 0;
}
-int nbd_trip(BlockDriverState *bs, int csock, off_t size,
- uint64_t dev_offset, uint32_t nbdflags,
- uint8_t *data)
+#define MAX_NBD_REQUESTS 16
+
+typedef struct NBDRequest NBDRequest;
+
+struct NBDRequest {
+ QSIMPLEQ_ENTRY(NBDRequest) entry;
+ NBDClient *client;
+ uint8_t *data;
+};
+
+struct NBDExport {
+ BlockDriverState *bs;
+ off_t dev_offset;
+ off_t size;
+ uint32_t nbdflags;
+ QSIMPLEQ_HEAD(, NBDRequest) requests;
+};
+
+struct NBDClient {
+ int refcount;
+ void (*close)(NBDClient *client);
+
+ NBDExport *exp;
+ int sock;
+
+ Coroutine *recv_coroutine;
+
+ CoMutex send_lock;
+ Coroutine *send_coroutine;
+
+ int nb_requests;
+};
+
+static void nbd_client_get(NBDClient *client)
{
- struct nbd_request request;
- struct nbd_reply reply;
- int ret;
+ client->refcount++;
+}
- TRACE("Reading request.");
+static void nbd_client_put(NBDClient *client)
+{
+ if (--client->refcount == 0) {
+ g_free(client);
+ }
+}
- if (nbd_receive_request(csock, &request) == -1)
- return -1;
+static void nbd_client_close(NBDClient *client)
+{
+ qemu_set_fd_handler2(client->sock, NULL, NULL, NULL, NULL);
+ close(client->sock);
+ client->sock = -1;
+ if (client->close) {
+ client->close(client);
+ }
+ nbd_client_put(client);
+}
+
+static NBDRequest *nbd_request_get(NBDClient *client)
+{
+ NBDRequest *req;
+ NBDExport *exp = client->exp;
+
+ assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
+ client->nb_requests++;
+
+ if (QSIMPLEQ_EMPTY(&exp->requests)) {
+ req = g_malloc0(sizeof(NBDRequest));
+ req->data = qemu_blockalign(exp->bs, NBD_BUFFER_SIZE);
+ } else {
+ req = QSIMPLEQ_FIRST(&exp->requests);
+ QSIMPLEQ_REMOVE_HEAD(&exp->requests, entry);
+ }
+ nbd_client_get(client);
+ req->client = client;
+ return req;
+}
+
+static void nbd_request_put(NBDRequest *req)
+{
+ NBDClient *client = req->client;
+ QSIMPLEQ_INSERT_HEAD(&client->exp->requests, req, entry);
+ if (client->nb_requests-- == MAX_NBD_REQUESTS) {
+ qemu_notify_event();
+ }
+ nbd_client_put(client);
+}
+
+NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset,
+ off_t size, uint32_t nbdflags)
+{
+ NBDExport *exp = g_malloc0(sizeof(NBDExport));
+ QSIMPLEQ_INIT(&exp->requests);
+ exp->bs = bs;
+ exp->dev_offset = dev_offset;
+ exp->nbdflags = nbdflags;
+ exp->size = size == -1 ? exp->bs->total_sectors * 512 : size;
+ return exp;
+}
+
+void nbd_export_close(NBDExport *exp)
+{
+ while (!QSIMPLEQ_EMPTY(&exp->requests)) {
+ NBDRequest *first = QSIMPLEQ_FIRST(&exp->requests);
+ QSIMPLEQ_REMOVE_HEAD(&exp->requests, entry);
+ qemu_vfree(first->data);
+ g_free(first);
+ }
+
+ bdrv_close(exp->bs);
+ g_free(exp);
+}
+
+static int nbd_can_read(void *opaque);
+static void nbd_read(void *opaque);
+static void nbd_restart_write(void *opaque);
+
+static int nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
+ int len)
+{
+ NBDClient *client = req->client;
+ int csock = client->sock;
+ int rc, ret;
+
+ qemu_co_mutex_lock(&client->send_lock);
+ qemu_set_fd_handler2(csock, nbd_can_read, nbd_read,
+ nbd_restart_write, client);
+ client->send_coroutine = qemu_coroutine_self();
+
+ if (!len) {
+ rc = nbd_send_reply(csock, reply);
+ if (rc == -1) {
+ rc = -errno;
+ }
+ } else {
+ socket_set_cork(csock, 1);
+ rc = nbd_send_reply(csock, reply);
+ if (rc != -1) {
+ ret = qemu_co_send(csock, req->data, len);
+ if (ret != len) {
+ errno = EIO;
+ rc = -1;
+ }
+ }
+ if (rc == -1) {
+ rc = -errno;
+ }
+ socket_set_cork(csock, 0);
+ }
+
+ client->send_coroutine = NULL;
+ qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client);
+ qemu_co_mutex_unlock(&client->send_lock);
+ return rc;
+}
- if (request.len > NBD_BUFFER_SIZE) {
+static int nbd_co_receive_request(NBDRequest *req, struct nbd_request *request)
+{
+ NBDClient *client = req->client;
+ int csock = client->sock;
+ int rc;
+
+ client->recv_coroutine = qemu_coroutine_self();
+ if (nbd_receive_request(csock, request) == -1) {
+ rc = -EIO;
+ goto out;
+ }
+
+ if (request->len > NBD_BUFFER_SIZE) {
LOG("len (%u) is larger than max len (%u)",
- request.len, NBD_BUFFER_SIZE);
- errno = EINVAL;
- return -1;
+ request->len, NBD_BUFFER_SIZE);
+ rc = -EINVAL;
+ goto out;
}
- if ((request.from + request.len) < request.from) {
+ if ((request->from + request->len) < request->from) {
LOG("integer overflow detected! "
"you're probably being attacked");
- errno = EINVAL;
- return -1;
+ rc = -EINVAL;
+ goto out;
}
- if ((request.from + request.len) > size) {
- LOG("From: %" PRIu64 ", Len: %u, Size: %" PRIu64
- ", Offset: %" PRIu64 "\n",
- request.from, request.len, (uint64_t)size, dev_offset);
- LOG("requested operation past EOF--bad client?");
- errno = EINVAL;
- return -1;
+ TRACE("Decoding type");
+
+ if ((request->type & NBD_CMD_MASK_COMMAND) == NBD_CMD_WRITE) {
+ TRACE("Reading %u byte(s)", request->len);
+
+ if (qemu_co_recv(csock, req->data, request->len) != request->len) {
+ LOG("reading from socket failed");
+ rc = -EIO;
+ goto out;
+ }
}
+ rc = 0;
- TRACE("Decoding type");
+out:
+ client->recv_coroutine = NULL;
+ return rc;
+}
+
+static void nbd_trip(void *opaque)
+{
+ NBDClient *client = opaque;
+ NBDRequest *req = nbd_request_get(client);
+ NBDExport *exp = client->exp;
+ struct nbd_request request;
+ struct nbd_reply reply;
+ int ret;
+
+ TRACE("Reading request.");
+
+ ret = nbd_co_receive_request(req, &request);
+ if (ret == -EIO) {
+ goto out;
+ }
reply.handle = request.handle;
reply.error = 0;
+ if (ret < 0) {
+ reply.error = -ret;
+ goto error_reply;
+ }
+
+ if ((request.from + request.len) > exp->size) {
+ LOG("From: %" PRIu64 ", Len: %u, Size: %" PRIu64
+ ", Offset: %" PRIu64 "\n",
+ request.from, request.len,
+ (uint64_t)exp->size, exp->dev_offset);
+ LOG("requested operation past EOF--bad client?");
+ goto invalid_request;
+ }
+
switch (request.type & NBD_CMD_MASK_COMMAND) {
case NBD_CMD_READ:
TRACE("Request type is READ");
- ret = bdrv_read(bs, (request.from + dev_offset) / 512,
- data, request.len / 512);
+ ret = bdrv_read(exp->bs, (request.from + exp->dev_offset) / 512,
+ req->data, request.len / 512);
if (ret < 0) {
LOG("reading from file failed");
reply.error = -ret;
- request.len = 0;
+ goto error_reply;
}
TRACE("Read %u byte(s)", request.len);
- socket_set_cork(csock, 1);
- if (nbd_send_reply(csock, &reply) == -1)
- return -1;
-
- TRACE("Sending data to client");
-
- if (write_sync(csock, data, request.len) != request.len) {
- LOG("writing to socket failed");
- errno = EINVAL;
- return -1;
- }
- socket_set_cork(csock, 0);
+ if (nbd_co_send_reply(req, &reply, request.len) < 0)
+ goto out;
break;
case NBD_CMD_WRITE:
TRACE("Request type is WRITE");
- TRACE("Reading %u byte(s)", request.len);
-
- if (read_sync(csock, data, request.len) != request.len) {
- LOG("reading from socket failed");
- errno = EINVAL;
- return -1;
+ if (exp->nbdflags & NBD_FLAG_READ_ONLY) {
+ TRACE("Server is read-only, return error");
+ reply.error = EROFS;
+ goto error_reply;
}
- if (nbdflags & NBD_FLAG_READ_ONLY) {
- TRACE("Server is read-only, return error");
- reply.error = 1;
- } else {
- TRACE("Writing to device");
+ TRACE("Writing to device");
- ret = bdrv_write(bs, (request.from + dev_offset) / 512,
- data, request.len / 512);
+ ret = bdrv_write(exp->bs, (request.from + exp->dev_offset) / 512,
+ req->data, request.len / 512);
+ if (ret < 0) {
+ LOG("writing to file failed");
+ reply.error = -ret;
+ goto error_reply;
+ }
+
+ if (request.type & NBD_CMD_FLAG_FUA) {
+ ret = bdrv_co_flush(exp->bs);
if (ret < 0) {
- LOG("writing to file failed");
+ LOG("flush failed");
reply.error = -ret;
- request.len = 0;
- }
-
- if (request.type & NBD_CMD_FLAG_FUA) {
- ret = bdrv_flush(bs);
- if (ret < 0) {
- LOG("flush failed");
- reply.error = -ret;
- }
+ goto error_reply;
}
}
- if (nbd_send_reply(csock, &reply) == -1)
- return -1;
+ if (nbd_co_send_reply(req, &reply, 0) < 0)
+ goto out;
break;
case NBD_CMD_DISC:
TRACE("Request type is DISCONNECT");
errno = 0;
- return 1;
+ goto out;
case NBD_CMD_FLUSH:
TRACE("Request type is FLUSH");
- ret = bdrv_flush(bs);
+ ret = bdrv_co_flush(exp->bs);
if (ret < 0) {
LOG("flush failed");
reply.error = -ret;
}
- if (nbd_send_reply(csock, &reply) == -1)
- return -1;
+ if (nbd_co_send_reply(req, &reply, 0) < 0)
+ goto out;
break;
case NBD_CMD_TRIM:
TRACE("Request type is TRIM");
- ret = bdrv_discard(bs, (request.from + dev_offset) / 512,
- request.len / 512);
+ ret = bdrv_co_discard(exp->bs, (request.from + exp->dev_offset) / 512,
+ request.len / 512);
if (ret < 0) {
LOG("discard failed");
reply.error = -ret;
}
- if (nbd_send_reply(csock, &reply) == -1)
- return -1;
+ if (nbd_co_send_reply(req, &reply, 0) < 0)
+ goto out;
break;
default:
LOG("invalid request type (%u) received", request.type);
- errno = EINVAL;
- return -1;
+ invalid_request:
+ reply.error = -EINVAL;
+ error_reply:
+ if (nbd_co_send_reply(req, &reply, 0) == -1)
+ goto out;
+ break;
}
TRACE("Request/Reply complete");
- return 0;
+ nbd_request_put(req);
+ return;
+
+out:
+ nbd_request_put(req);
+ nbd_client_close(client);
+}
+
+static int nbd_can_read(void *opaque)
+{
+ NBDClient *client = opaque;
+
+ return client->recv_coroutine || client->nb_requests < MAX_NBD_REQUESTS;
+}
+
+static void nbd_read(void *opaque)
+{
+ NBDClient *client = opaque;
+
+ if (client->recv_coroutine) {
+ qemu_coroutine_enter(client->recv_coroutine, NULL);
+ } else {
+ qemu_coroutine_enter(qemu_coroutine_create(nbd_trip), client);
+ }
+}
+
+static void nbd_restart_write(void *opaque)
+{
+ NBDClient *client = opaque;
+
+ qemu_coroutine_enter(client->send_coroutine, NULL);
+}
+
+NBDClient *nbd_client_new(NBDExport *exp, int csock,
+ void (*close)(NBDClient *))
+{
+ NBDClient *client;
+ if (nbd_send_negotiate(csock, exp->size, exp->nbdflags) == -1) {
+ return NULL;
+ }
+ client = g_malloc0(sizeof(NBDClient));
+ client->refcount = 1;
+ client->exp = exp;
+ client->sock = csock;
+ client->close = close;
+ qemu_co_mutex_init(&client->send_lock);
+ qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client);
+ return client;
}