return 0;
}
+#define MAX_NBD_REQUESTS 16
+
typedef struct NBDRequest NBDRequest;
struct NBDRequest {
CoMutex send_lock;
Coroutine *send_coroutine;
+
+ int nb_requests;
};
static void nbd_client_get(NBDClient *client)
NBDRequest *req;
NBDExport *exp = client->exp;
+ assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
+ client->nb_requests++;
+
if (QSIMPLEQ_EMPTY(&exp->requests)) {
req = g_malloc0(sizeof(NBDRequest));
req->data = qemu_blockalign(exp->bs, NBD_BUFFER_SIZE);
{
NBDClient *client = req->client;
QSIMPLEQ_INSERT_HEAD(&client->exp->requests, req, entry);
+ if (client->nb_requests-- == MAX_NBD_REQUESTS) {
+ qemu_notify_event();
+ }
nbd_client_put(client);
}
g_free(exp);
}
+static int nbd_can_read(void *opaque);
static void nbd_read(void *opaque);
static void nbd_restart_write(void *opaque);
int rc, ret;
qemu_co_mutex_lock(&client->send_lock);
- qemu_set_fd_handler2(csock, NULL, nbd_read, nbd_restart_write, client);
+ qemu_set_fd_handler2(csock, nbd_can_read, nbd_read,
+ nbd_restart_write, client);
client->send_coroutine = qemu_coroutine_self();
if (!len) {
}
client->send_coroutine = NULL;
- qemu_set_fd_handler2(csock, NULL, nbd_read, NULL, client);
+ qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client);
qemu_co_mutex_unlock(&client->send_lock);
return rc;
}
nbd_client_close(client);
}
+static int nbd_can_read(void *opaque)
+{
+ NBDClient *client = opaque;
+
+ return client->recv_coroutine || client->nb_requests < MAX_NBD_REQUESTS;
+}
+
static void nbd_read(void *opaque)
{
NBDClient *client = opaque;
client->sock = csock;
client->close = close;
qemu_co_mutex_init(&client->send_lock);
- qemu_set_fd_handler2(csock, NULL, nbd_read, NULL, client);
+ qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client);
return client;
}