]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/spdk/lib/jsonrpc/jsonrpc_server_tcp.c
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / spdk / lib / jsonrpc / jsonrpc_server_tcp.c
index 6f8d3c472fc36b8c17738e2e12db2ed4b6da2069..c69d7483c2341f76c6f73c1ced17dcee26c8735c 100644 (file)
@@ -32,6 +32,7 @@
  */
 
 #include "jsonrpc_internal.h"
+#include "spdk/string.h"
 
 struct spdk_jsonrpc_server *
 spdk_jsonrpc_server_listen(int domain, int protocol,
@@ -39,13 +40,20 @@ spdk_jsonrpc_server_listen(int domain, int protocol,
                           spdk_jsonrpc_handle_request_fn handle_request)
 {
        struct spdk_jsonrpc_server *server;
-       int rc, val;
+       int rc, val, flag, i;
 
        server = calloc(1, sizeof(struct spdk_jsonrpc_server));
        if (server == NULL) {
                return NULL;
        }
 
+       TAILQ_INIT(&server->free_conns);
+       TAILQ_INIT(&server->conns);
+
+       for (i = 0; i < SPDK_JSONRPC_MAX_CONNS; i++) {
+               TAILQ_INSERT_TAIL(&server->free_conns, &server->conns_array[i], link);
+       }
+
        server->handle_request = handle_request;
 
        server->sockfd = socket(domain, SOCK_STREAM, protocol);
@@ -61,10 +69,10 @@ spdk_jsonrpc_server_listen(int domain, int protocol,
                setsockopt(server->sockfd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
        }
 
-       val = 1;
-       rc = ioctl(server->sockfd, FIONBIO, &val);
-       if (rc != 0) {
-               SPDK_ERRLOG("ioctl(FIONBIO) failed\n");
+       flag = fcntl(server->sockfd, F_GETFL);
+       if (fcntl(server->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) {
+               SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
+                           server->sockfd, spdk_strerror(errno));
                close(server->sockfd);
                free(server);
                return NULL;
@@ -72,7 +80,7 @@ spdk_jsonrpc_server_listen(int domain, int protocol,
 
        rc = bind(server->sockfd, listen_addr, addrlen);
        if (rc != 0) {
-               SPDK_ERRLOG("could not bind JSON-RPC server: %s\n", strerror(errno));
+               SPDK_ERRLOG("could not bind JSON-RPC server: %s\n", spdk_strerror(errno));
                close(server->sockfd);
                free(server);
                return NULL;
@@ -86,73 +94,78 @@ spdk_jsonrpc_server_listen(int domain, int protocol,
                return NULL;
        }
 
-       /* Put listen socket in pollfds[0] */
-       server->pollfds[0].fd = server->sockfd;
-       server->pollfds[0].events = POLLIN;
-
        return server;
 }
 
 void
 spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server)
 {
-       int i;
+       struct spdk_jsonrpc_server_conn *conn;
 
        close(server->sockfd);
 
-       for (i = 0; i < server->num_conns; i++) {
-               close(server->conns[i].sockfd);
+       TAILQ_FOREACH(conn, &server->conns, link) {
+               close(conn->sockfd);
        }
 
        free(server);
 }
 
+static void
+spdk_jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn *conn)
+{
+       conn->closed = true;
+
+       if (conn->sockfd >= 0) {
+               close(conn->sockfd);
+               conn->sockfd = -1;
+       }
+}
+
 static void
 spdk_jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn)
 {
        struct spdk_jsonrpc_server *server = conn->server;
-       int conn_idx = conn - server->conns;
 
-       close(conn->sockfd);
+       spdk_jsonrpc_server_conn_close(conn);
+
+       pthread_spin_destroy(&conn->queue_lock);
+       assert(STAILQ_EMPTY(&conn->send_queue));
 
-       /* Swap conn with the last entry in conns */
-       server->conns[conn_idx] = server->conns[server->num_conns - 1];
-       server->num_conns--;
+       TAILQ_REMOVE(&server->conns, conn, link);
+       TAILQ_INSERT_HEAD(&server->free_conns, conn, link);
 }
 
 static int
 spdk_jsonrpc_server_accept(struct spdk_jsonrpc_server *server)
 {
        struct spdk_jsonrpc_server_conn *conn;
-       struct pollfd *pfd;
-       int rc, conn_idx, nonblock;
+       int rc, flag;
 
        rc = accept(server->sockfd, NULL, NULL);
        if (rc >= 0) {
-               assert(server->num_conns < SPDK_JSONRPC_MAX_CONNS);
-               conn_idx = server->num_conns;
-               conn = &server->conns[conn_idx];
+               conn = TAILQ_FIRST(&server->free_conns);
+               assert(conn != NULL);
+
                conn->server = server;
                conn->sockfd = rc;
+               conn->closed = false;
                conn->recv_len = 0;
-               conn->send_len = 0;
-               conn->json_writer = 0;
-
-               nonblock = 1;
-               rc = ioctl(conn->sockfd, FIONBIO, &nonblock);
-               if (rc != 0) {
-                       SPDK_ERRLOG("ioctl(FIONBIO) failed\n");
+               conn->outstanding_requests = 0;
+               pthread_spin_init(&conn->queue_lock, PTHREAD_PROCESS_PRIVATE);
+               STAILQ_INIT(&conn->send_queue);
+               conn->send_request = NULL;
+
+               flag = fcntl(conn->sockfd, F_GETFL);
+               if (fcntl(conn->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) {
+                       SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
+                                   conn->sockfd, spdk_strerror(errno));
                        close(conn->sockfd);
                        return -1;
                }
 
-               /* Add connection to pollfds */
-               pfd = &server->pollfds[conn_idx + 1];
-               pfd->fd = conn->sockfd;
-               pfd->events = POLLIN | POLLOUT;
-
-               server->num_conns++;
-
+               TAILQ_REMOVE(&server->free_conns, conn, link);
+               TAILQ_INSERT_TAIL(&server->conns, conn, link);
                return 0;
        }
 
@@ -163,34 +176,15 @@ spdk_jsonrpc_server_accept(struct spdk_jsonrpc_server *server)
        return -1;
 }
 
-int
-spdk_jsonrpc_server_write_cb(void *cb_ctx, const void *data, size_t size)
-{
-       struct spdk_jsonrpc_server_conn *conn = cb_ctx;
-
-       if (SPDK_JSONRPC_SEND_BUF_SIZE - conn->send_len < size) {
-               SPDK_ERRLOG("Not enough space in send buf\n");
-               return -1;
-       }
-
-       memcpy(conn->send_buf + conn->send_len, data, size);
-       conn->send_len += size;
-
-       return 0;
-}
-
 void
-spdk_jsonrpc_server_handle_request(struct spdk_jsonrpc_server_conn *conn,
-                                  const struct spdk_json_val *method, const struct spdk_json_val *params,
-                                  const struct spdk_json_val *id)
+spdk_jsonrpc_server_handle_request(struct spdk_jsonrpc_request *request,
+                                  const struct spdk_json_val *method, const struct spdk_json_val *params)
 {
-       conn->server->handle_request(conn, method, params, id);
+       request->conn->server->handle_request(request, method, params);
 }
 
 void
-spdk_jsonrpc_server_handle_error(struct spdk_jsonrpc_server_conn *conn, int error,
-                                const struct spdk_json_val *method, const struct spdk_json_val *params,
-                                const struct spdk_json_val *id)
+spdk_jsonrpc_server_handle_error(struct spdk_jsonrpc_request *request, int error)
 {
        const char *msg;
 
@@ -220,7 +214,7 @@ spdk_jsonrpc_server_handle_error(struct spdk_jsonrpc_server_conn *conn, int erro
                break;
        }
 
-       spdk_jsonrpc_send_error_response(conn, id, error, msg);
+       spdk_jsonrpc_send_error_response(request, error, msg);
 }
 
 static int
@@ -234,13 +228,12 @@ spdk_jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn *conn)
                if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
                        return 0;
                }
-
-               SPDK_TRACELOG(SPDK_TRACE_RPC, "recv() failed: %s\n", strerror(errno));
+               SPDK_DEBUGLOG(SPDK_LOG_RPC, "recv() failed: %s\n", spdk_strerror(errno));
                return -1;
        }
 
        if (rc == 0) {
-               SPDK_TRACELOG(SPDK_TRACE_RPC, "remote closed connection\n");
+               SPDK_DEBUGLOG(SPDK_LOG_RPC, "remote closed connection\n");
                return -1;
        }
 
@@ -265,27 +258,78 @@ spdk_jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn *conn)
        return 0;
 }
 
+void
+spdk_jsonrpc_server_send_response(struct spdk_jsonrpc_request *request)
+{
+       struct spdk_jsonrpc_server_conn *conn = request->conn;
+
+       /* Queue the response to be sent */
+       pthread_spin_lock(&conn->queue_lock);
+       STAILQ_INSERT_TAIL(&conn->send_queue, request, link);
+       pthread_spin_unlock(&conn->queue_lock);
+}
+
+static struct spdk_jsonrpc_request *
+spdk_jsonrpc_server_dequeue_request(struct spdk_jsonrpc_server_conn *conn)
+{
+       struct spdk_jsonrpc_request *request = NULL;
+
+       pthread_spin_lock(&conn->queue_lock);
+       request = STAILQ_FIRST(&conn->send_queue);
+       if (request) {
+               STAILQ_REMOVE_HEAD(&conn->send_queue, link);
+       }
+       pthread_spin_unlock(&conn->queue_lock);
+       return request;
+}
+
+
 static int
 spdk_jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn *conn)
 {
+       struct spdk_jsonrpc_request *request;
        ssize_t rc;
 
-       rc = send(conn->sockfd, conn->send_buf, conn->send_len, 0);
-       if (rc < 0) {
-               if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
-                       return 0;
-               }
+more:
+       if (conn->outstanding_requests == 0) {
+               return 0;
+       }
 
-               SPDK_TRACELOG(SPDK_TRACE_RPC, "send() failed: %s\n", strerror(errno));
-               return -1;
+       if (conn->send_request == NULL) {
+               conn->send_request = spdk_jsonrpc_server_dequeue_request(conn);
        }
 
-       if (rc == 0) {
-               SPDK_TRACELOG(SPDK_TRACE_RPC, "remote closed connection\n");
-               return -1;
+       request = conn->send_request;
+       if (request == NULL) {
+               /* Nothing to send right now */
+               return 0;
+       }
+
+       if (request->send_len > 0) {
+               rc = send(conn->sockfd, request->send_buf + request->send_offset,
+                         request->send_len, 0);
+               if (rc < 0) {
+                       if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+                               return 0;
+                       }
+
+                       SPDK_DEBUGLOG(SPDK_LOG_RPC, "send() failed: %s\n", spdk_strerror(errno));
+                       return -1;
+               }
+
+               request->send_offset += rc;
+               request->send_len -= rc;
        }
 
-       conn->send_len -= rc;
+       if (request->send_len == 0) {
+               /*
+                * Full response has been sent.
+                * Free it and set send_request to NULL to move on to the next queued response.
+                */
+               conn->send_request = NULL;
+               spdk_jsonrpc_free_request(request);
+               goto more;
+       }
 
        return 0;
 }
@@ -293,64 +337,57 @@ spdk_jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn *conn)
 int
 spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server)
 {
-       int rc, i;
-       struct pollfd *pfd;
-       struct spdk_jsonrpc_server_conn *conn;
+       int rc;
+       struct spdk_jsonrpc_server_conn *conn, *conn_tmp;
 
-       rc = poll(server->pollfds, server->num_conns + 1, 0);
+       TAILQ_FOREACH_SAFE(conn, &server->conns, link, conn_tmp) {
+               if (conn->closed) {
+                       struct spdk_jsonrpc_request *request;
 
-       if (rc < 0) {
-               if (errno == EINTR) {
-                       return 0;
-               }
+                       /*
+                        * The client closed the connection, but there may still be requests
+                        * outstanding; we have no way to cancel outstanding requests, so wait until
+                        * each outstanding request sends a response (which will be discarded, since
+                        * the connection is closed).
+                        */
 
-               SPDK_ERRLOG("jsonrpc poll() failed\n");
-               return -1;
-       }
+                       if (conn->send_request) {
+                               spdk_jsonrpc_free_request(conn->send_request);
+                               conn->send_request = NULL;
+                       }
 
-       if (rc == 0) {
-               /* No sockets are ready */
-               return 0;
+                       while ((request = spdk_jsonrpc_server_dequeue_request(conn)) != NULL) {
+                               spdk_jsonrpc_free_request(request);
+                       }
+
+                       if (conn->outstanding_requests == 0) {
+                               SPDK_DEBUGLOG(SPDK_LOG_RPC, "all outstanding requests completed\n");
+                               spdk_jsonrpc_server_conn_remove(conn);
+                       }
+               }
        }
 
        /* Check listen socket */
-       if (server->num_conns < SPDK_JSONRPC_MAX_CONNS) {
-               pfd = &server->pollfds[0];
-               if (pfd->revents) {
-                       spdk_jsonrpc_server_accept(server);
-               }
-               pfd->revents = 0;
+       if (!TAILQ_EMPTY(&server->free_conns)) {
+               spdk_jsonrpc_server_accept(server);
        }
 
-       for (i = 0; i < server->num_conns; i++) {
-               pfd = &server->pollfds[i + 1];
-               conn = &server->conns[i];
-               if (conn->send_len) {
-                       /*
-                        * If there is any data to send, keep sending it until the send buffer
-                        *  is empty.  Each response should be allowed the full send buffer, so
-                        *  don't accept any new requests until the previous response is sent out.
-                        */
-                       if (pfd->revents & POLLOUT) {
-                               rc = spdk_jsonrpc_server_conn_send(conn);
-                               if (rc != 0) {
-                                       SPDK_TRACELOG(SPDK_TRACE_RPC, "closing conn due to send failure\n");
-                                       spdk_jsonrpc_server_conn_remove(conn);
-                               }
-                       }
-               } else {
-                       /*
-                        * No data to send - we can receive a new request.
-                        */
-                       if (pfd->revents & POLLIN) {
-                               rc = spdk_jsonrpc_server_conn_recv(conn);
-                               if (rc != 0) {
-                                       SPDK_TRACELOG(SPDK_TRACE_RPC, "closing conn due to recv failure\n");
-                                       spdk_jsonrpc_server_conn_remove(conn);
-                               }
-                       }
+       TAILQ_FOREACH(conn, &server->conns, link) {
+               if (conn->closed) {
+                       continue;
+               }
+
+               rc = spdk_jsonrpc_server_conn_send(conn);
+               if (rc != 0) {
+                       spdk_jsonrpc_server_conn_close(conn);
+                       continue;
+               }
+
+               rc = spdk_jsonrpc_server_conn_recv(conn);
+               if (rc != 0) {
+                       spdk_jsonrpc_server_conn_close(conn);
+                       continue;
                }
-               pfd->revents = 0;
        }
 
        return 0;