]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/spdk/lib/jsonrpc/jsonrpc_server_tcp.c
import 15.2.0 Octopus source
[ceph.git] / ceph / src / spdk / lib / jsonrpc / jsonrpc_server_tcp.c
index c69d7483c2341f76c6f73c1ced17dcee26c8735c..70b7a56b94ad57d133622fa3ea5603eea7859f4e 100644 (file)
@@ -33,6 +33,7 @@
 
 #include "jsonrpc_internal.h"
 #include "spdk/string.h"
+#include "spdk/util.h"
 
 struct spdk_jsonrpc_server *
 spdk_jsonrpc_server_listen(int domain, int protocol,
@@ -97,6 +98,21 @@ spdk_jsonrpc_server_listen(int domain, int protocol,
        return server;
 }
 
+static void
+spdk_jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn *conn)
+{
+       conn->closed = true;
+
+       if (conn->sockfd >= 0) {
+               close(conn->sockfd);
+               conn->sockfd = -1;
+
+               if (conn->close_cb) {
+                       conn->close_cb(conn, conn->close_cb_ctx);
+               }
+       }
+}
+
 void
 spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server)
 {
@@ -105,23 +121,12 @@ spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server)
        close(server->sockfd);
 
        TAILQ_FOREACH(conn, &server->conns, link) {
-               close(conn->sockfd);
+               spdk_jsonrpc_server_conn_close(conn);
        }
 
        free(server);
 }
 
-static void
-spdk_jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn *conn)
-{
-       conn->closed = true;
-
-       if (conn->sockfd >= 0) {
-               close(conn->sockfd);
-               conn->sockfd = -1;
-       }
-}
-
 static void
 spdk_jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn)
 {
@@ -136,6 +141,41 @@ spdk_jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn)
        TAILQ_INSERT_HEAD(&server->free_conns, conn, link);
 }
 
+int
+spdk_jsonrpc_conn_add_close_cb(struct spdk_jsonrpc_server_conn *conn,
+                              spdk_jsonrpc_conn_closed_fn cb, void *ctx)
+{
+       int rc = 0;
+
+       pthread_spin_lock(&conn->queue_lock);
+       if (conn->close_cb == NULL) {
+               conn->close_cb = cb;
+               conn->close_cb_ctx = ctx;
+       } else {
+               rc = conn->close_cb == cb && conn->close_cb_ctx == ctx ? -EEXIST : -ENOSPC;
+       }
+       pthread_spin_unlock(&conn->queue_lock);
+
+       return rc;
+}
+
+int
+spdk_jsonrpc_conn_del_close_cb(struct spdk_jsonrpc_server_conn *conn,
+                              spdk_jsonrpc_conn_closed_fn cb, void *ctx)
+{
+       int rc = 0;
+
+       pthread_spin_lock(&conn->queue_lock);
+       if (conn->close_cb == NULL || conn->close_cb != cb || conn->close_cb_ctx != ctx) {
+               rc = -ENOENT;
+       } else {
+               conn->close_cb = NULL;
+       }
+       pthread_spin_unlock(&conn->queue_lock);
+
+       return rc;
+}
+
 static int
 spdk_jsonrpc_server_accept(struct spdk_jsonrpc_server *server)
 {
@@ -220,7 +260,7 @@ spdk_jsonrpc_server_handle_error(struct spdk_jsonrpc_request *request, int error
 static int
 spdk_jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn *conn)
 {
-       ssize_t rc;
+       ssize_t rc, offset;
        size_t recv_avail = SPDK_JSONRPC_RECV_BUF_SIZE - conn->recv_len;
 
        rc = recv(conn->sockfd, conn->recv_buf + conn->recv_len, recv_avail, 0);
@@ -234,25 +274,31 @@ spdk_jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn *conn)
 
        if (rc == 0) {
                SPDK_DEBUGLOG(SPDK_LOG_RPC, "remote closed connection\n");
-               return -1;
+               conn->closed = true;
+               return 0;
        }
 
        conn->recv_len += rc;
 
-       rc = spdk_jsonrpc_parse_request(conn, conn->recv_buf, conn->recv_len);
-       if (rc < 0) {
-               SPDK_ERRLOG("jsonrpc parse request failed\n");
-               return -1;
-       }
+       offset = 0;
+       do {
+               rc = spdk_jsonrpc_parse_request(conn, conn->recv_buf + offset, conn->recv_len - offset);
+               if (rc < 0) {
+                       SPDK_ERRLOG("jsonrpc parse request failed\n");
+                       return -1;
+               }
 
-       if (rc > 0) {
+               offset += rc;
+       } while (rc > 0);
+
+       if (offset > 0) {
                /*
-                * Successfully parsed a request - move any data past the end of the
-                * parsed request down to the beginning.
+                * Successfully parsed a requests - move any data past the end of the
+                * parsed requests down to the beginning.
                 */
-               assert((size_t)rc <= conn->recv_len);
-               memmove(conn->recv_buf, conn->recv_buf + rc, conn->recv_len - rc);
-               conn->recv_len -= rc;
+               assert((size_t)offset <= conn->recv_len);
+               memmove(conn->recv_buf, conn->recv_buf + offset, conn->recv_len - offset);
+               conn->recv_len -= offset;
        }
 
        return 0;
@@ -341,7 +387,12 @@ spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server)
        struct spdk_jsonrpc_server_conn *conn, *conn_tmp;
 
        TAILQ_FOREACH_SAFE(conn, &server->conns, link, conn_tmp) {
-               if (conn->closed) {
+               /* If we can't receive and there are no outstanding requests close the connection. */
+               if (conn->closed == true && conn->outstanding_requests == 0) {
+                       spdk_jsonrpc_server_conn_close(conn);
+               }
+
+               if (conn->sockfd == -1) {
                        struct spdk_jsonrpc_request *request;
 
                        /*
@@ -351,10 +402,8 @@ spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server)
                         * the connection is closed).
                         */
 
-                       if (conn->send_request) {
-                               spdk_jsonrpc_free_request(conn->send_request);
-                               conn->send_request = NULL;
-                       }
+                       spdk_jsonrpc_free_request(conn->send_request);
+                       conn->send_request = NULL;
 
                        while ((request = spdk_jsonrpc_server_dequeue_request(conn)) != NULL) {
                                spdk_jsonrpc_free_request(request);
@@ -373,7 +422,7 @@ spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server)
        }
 
        TAILQ_FOREACH(conn, &server->conns, link) {
-               if (conn->closed) {
+               if (conn->sockfd == -1) {
                        continue;
                }
 
@@ -383,10 +432,11 @@ spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server)
                        continue;
                }
 
-               rc = spdk_jsonrpc_server_conn_recv(conn);
-               if (rc != 0) {
-                       spdk_jsonrpc_server_conn_close(conn);
-                       continue;
+               if (!conn->closed) {
+                       rc = spdk_jsonrpc_server_conn_recv(conn);
+                       if (rc != 0) {
+                               spdk_jsonrpc_server_conn_close(conn);
+                       }
                }
        }