]> git.proxmox.com Git - mirror_qemu.git/blobdiff - nbd/server.c
nbd: convert to use qio_channel_yield
[mirror_qemu.git] / nbd / server.c
index 5b762616664d1cc291ce303c1fa7718ad28eed92..ac92fa072751262ca0715504f1232b5c2e745470 100644 (file)
@@ -95,8 +95,6 @@ struct NBDClient {
     CoMutex send_lock;
     Coroutine *send_coroutine;
 
-    bool can_read;
-
     QTAILQ_ENTRY(NBDClient) next;
     int nb_requests;
     bool closing;
@@ -104,9 +102,7 @@ struct NBDClient {
 
 /* That's all folks */
 
-static void nbd_set_handlers(NBDClient *client);
-static void nbd_unset_handlers(NBDClient *client);
-static void nbd_update_can_read(NBDClient *client);
+static void nbd_client_receive_next_request(NBDClient *client);
 
 static gboolean nbd_negotiate_continue(QIOChannel *ioc,
                                        GIOCondition condition,
@@ -785,7 +781,7 @@ void nbd_client_put(NBDClient *client)
          */
         assert(client->closing);
 
-        nbd_unset_handlers(client);
+        qio_channel_detach_aio_context(client->ioc);
         object_unref(OBJECT(client->sioc));
         object_unref(OBJECT(client->ioc));
         if (client->tlscreds) {
@@ -826,7 +822,6 @@ static NBDRequestData *nbd_request_get(NBDClient *client)
 
     assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
     client->nb_requests++;
-    nbd_update_can_read(client);
 
     req = g_new0(NBDRequestData, 1);
     nbd_client_get(client);
@@ -844,7 +839,8 @@ static void nbd_request_put(NBDRequestData *req)
     g_free(req);
 
     client->nb_requests--;
-    nbd_update_can_read(client);
+    nbd_client_receive_next_request(client);
+
     nbd_client_put(client);
 }
 
@@ -858,7 +854,13 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
     exp->ctx = ctx;
 
     QTAILQ_FOREACH(client, &exp->clients, next) {
-        nbd_set_handlers(client);
+        qio_channel_attach_aio_context(client->ioc, ctx);
+        if (client->recv_coroutine) {
+            aio_co_schedule(ctx, client->recv_coroutine);
+        }
+        if (client->send_coroutine) {
+            aio_co_schedule(ctx, client->send_coroutine);
+        }
     }
 }
 
@@ -870,7 +872,7 @@ static void blk_aio_detach(void *opaque)
     TRACE("Export %s: Detaching clients from AIO context %p\n", exp->name, exp->ctx);
 
     QTAILQ_FOREACH(client, &exp->clients, next) {
-        nbd_unset_handlers(client);
+        qio_channel_detach_aio_context(client->ioc);
     }
 
     exp->ctx = NULL;
@@ -1045,7 +1047,6 @@ static ssize_t nbd_co_send_reply(NBDRequestData *req, NBDReply *reply,
     g_assert(qemu_in_coroutine());
     qemu_co_mutex_lock(&client->send_lock);
     client->send_coroutine = qemu_coroutine_self();
-    nbd_set_handlers(client);
 
     if (!len) {
         rc = nbd_send_reply(client->ioc, reply);
@@ -1062,7 +1063,6 @@ static ssize_t nbd_co_send_reply(NBDRequestData *req, NBDReply *reply,
     }
 
     client->send_coroutine = NULL;
-    nbd_set_handlers(client);
     qemu_co_mutex_unlock(&client->send_lock);
     return rc;
 }
@@ -1079,9 +1079,7 @@ static ssize_t nbd_co_receive_request(NBDRequestData *req,
     ssize_t rc;
 
     g_assert(qemu_in_coroutine());
-    client->recv_coroutine = qemu_coroutine_self();
-    nbd_update_can_read(client);
-
+    assert(client->recv_coroutine == qemu_coroutine_self());
     rc = nbd_receive_request(client->ioc, request);
     if (rc < 0) {
         if (rc != -EAGAIN) {
@@ -1163,23 +1161,25 @@ static ssize_t nbd_co_receive_request(NBDRequestData *req,
 
 out:
     client->recv_coroutine = NULL;
-    nbd_update_can_read(client);
+    nbd_client_receive_next_request(client);
 
     return rc;
 }
 
-static void nbd_trip(void *opaque)
+/* Owns a reference to the NBDClient passed as opaque.  */
+static coroutine_fn void nbd_trip(void *opaque)
 {
     NBDClient *client = opaque;
     NBDExport *exp = client->exp;
     NBDRequestData *req;
-    NBDRequest request;
+    NBDRequest request = { 0 };    /* GCC thinks it can be used uninitialized */
     NBDReply reply;
     ssize_t ret;
     int flags;
 
     TRACE("Reading request.");
     if (client->closing) {
+        nbd_client_put(client);
         return;
     }
 
@@ -1338,61 +1338,21 @@ static void nbd_trip(void *opaque)
 
 done:
     nbd_request_put(req);
+    nbd_client_put(client);
     return;
 
 out:
     nbd_request_put(req);
     client_close(client);
+    nbd_client_put(client);
 }
 
-static void nbd_read(void *opaque)
-{
-    NBDClient *client = opaque;
-
-    if (client->recv_coroutine) {
-        qemu_coroutine_enter(client->recv_coroutine);
-    } else {
-        qemu_coroutine_enter(qemu_coroutine_create(nbd_trip, client));
-    }
-}
-
-static void nbd_restart_write(void *opaque)
-{
-    NBDClient *client = opaque;
-
-    qemu_coroutine_enter(client->send_coroutine);
-}
-
-static void nbd_set_handlers(NBDClient *client)
-{
-    if (client->exp && client->exp->ctx) {
-        aio_set_fd_handler(client->exp->ctx, client->sioc->fd,
-                           true,
-                           client->can_read ? nbd_read : NULL,
-                           client->send_coroutine ? nbd_restart_write : NULL,
-                           client);
-    }
-}
-
-static void nbd_unset_handlers(NBDClient *client)
-{
-    if (client->exp && client->exp->ctx) {
-        aio_set_fd_handler(client->exp->ctx, client->sioc->fd,
-                           true, NULL, NULL, NULL);
-    }
-}
-
-static void nbd_update_can_read(NBDClient *client)
+static void nbd_client_receive_next_request(NBDClient *client)
 {
-    bool can_read = client->recv_coroutine ||
-                    client->nb_requests < MAX_NBD_REQUESTS;
-
-    if (can_read != client->can_read) {
-        client->can_read = can_read;
-        nbd_set_handlers(client);
-
-        /* There is no need to invoke aio_notify(), since aio_set_fd_handler()
-         * in nbd_set_handlers() will have taken care of that */
+    if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
+        nbd_client_get(client);
+        client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
+        aio_co_schedule(client->exp->ctx, client->recv_coroutine);
     }
 }
 
@@ -1410,11 +1370,13 @@ static coroutine_fn void nbd_co_client_start(void *opaque)
         goto out;
     }
     qemu_co_mutex_init(&client->send_lock);
-    nbd_set_handlers(client);
 
     if (exp) {
         QTAILQ_INSERT_TAIL(&exp->clients, client, next);
     }
+
+    nbd_client_receive_next_request(client);
+
 out:
     g_free(data);
 }
@@ -1440,7 +1402,6 @@ void nbd_client_new(NBDExport *exp,
     object_ref(OBJECT(client->sioc));
     client->ioc = QIO_CHANNEL(sioc);
     object_ref(OBJECT(client->ioc));
-    client->can_read = true;
     client->close = close_fn;
 
     data->client = client;