]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/spdk/lib/jsonrpc/jsonrpc_client_tcp.c
import 15.2.0 Octopus source
[ceph.git] / ceph / src / spdk / lib / jsonrpc / jsonrpc_client_tcp.c
index a7696c8446c77ccb9afc9f984b14ea2519fbb8be..d3638c6335db024a14c898db4c74d67907bd8460 100644 (file)
  */
 #include "spdk/string.h"
 #include "jsonrpc_internal.h"
+#include "spdk/util.h"
 
 #define RPC_DEFAULT_PORT       "5260"
 
-static struct spdk_jsonrpc_client *
-_spdk_jsonrpc_client_connect(int domain, int protocol,
-                            struct sockaddr *server_addr, socklen_t addrlen)
+static int
+_spdk_jsonrpc_client_send_request(struct spdk_jsonrpc_client *client)
+{
+       ssize_t rc;
+       struct spdk_jsonrpc_client_request *request = client->request;
+
+       if (!request) {
+               return 0;
+       }
+
+       if (request->send_len > 0) {
+               rc = send(client->sockfd, request->send_buf + request->send_offset,
+                         request->send_len, 0);
+               if (rc < 0) {
+                       /* For EINTR we pretend that nothing was send. */
+                       if (errno == EINTR) {
+                               rc = 0;
+                       } else {
+                               rc = -errno;
+                               SPDK_ERRLOG("poll() failed (%d): %s\n", errno, spdk_strerror(errno));
+                       }
+
+                       return rc;
+               }
+
+               request->send_offset += rc;
+               request->send_len -= rc;
+       }
+
+       if (request->send_len == 0) {
+               client->request = NULL;
+               spdk_jsonrpc_client_free_request(request);
+       }
+
+       return 0;
+}
+
+static int
+recv_buf_expand(struct spdk_jsonrpc_client *client)
+{
+       uint8_t *new_buf;
+
+       if (client->recv_buf_size * 2 > SPDK_JSONRPC_SEND_BUF_SIZE_MAX) {
+               return -ENOSPC;
+       }
+
+       new_buf = realloc(client->recv_buf, client->recv_buf_size * 2);
+       if (new_buf == NULL) {
+               SPDK_ERRLOG("Resizing recv_buf failed (current size %zu, new size %zu)\n",
+                           client->recv_buf_size, client->recv_buf_size * 2);
+               return -ENOMEM;
+       }
+
+       client->recv_buf = new_buf;
+       client->recv_buf_size *= 2;
+
+       return 0;
+}
+
+static int
+_spdk_jsonrpc_client_resp_ready_count(struct spdk_jsonrpc_client *client)
+{
+       return client->resp != NULL && client->resp->ready ? 1 : 0;
+}
+
+static int
+_spdk_jsonrpc_client_recv(struct spdk_jsonrpc_client *client)
+{
+       ssize_t rc;
+
+       if (client->recv_buf == NULL) {
+               client->recv_buf = malloc(SPDK_JSONRPC_SEND_BUF_SIZE_INIT);
+               if (!client->recv_buf) {
+                       rc = errno;
+                       SPDK_ERRLOG("malloc() failed (%d): %s\n", (int)rc, spdk_strerror(rc));
+                       return -rc;
+               }
+               client->recv_buf_size = SPDK_JSONRPC_SEND_BUF_SIZE_INIT;
+               client->recv_offset = 0;
+       } else if (client->recv_offset == client->recv_buf_size - 1) {
+               rc = recv_buf_expand(client);
+               if (rc) {
+                       return rc;
+               }
+       }
+
+       rc = recv(client->sockfd, client->recv_buf + client->recv_offset,
+                 client->recv_buf_size - client->recv_offset - 1, 0);
+       if (rc < 0) {
+               /* For EINTR we pretend that nothing was reveived. */
+               if (errno == EINTR) {
+                       return 0;
+               } else {
+                       rc = -errno;
+                       SPDK_ERRLOG("recv() failed (%d): %s\n", errno, spdk_strerror(errno));
+                       return rc;
+               }
+       } else if (rc == 0) {
+               return -EIO;
+       }
+
+       client->recv_offset += rc;
+       client->recv_buf[client->recv_offset] = '\0';
+
+       /* Check to see if we have received a full JSON value. */
+       return spdk_jsonrpc_parse_response(client);
+}
+
+static int
+_spdk_jsonrpc_client_poll(struct spdk_jsonrpc_client *client, int timeout)
 {
-       struct spdk_jsonrpc_client *client;
        int rc;
+       struct pollfd pfd = { .fd = client->sockfd, .events = POLLIN | POLLOUT };
+
+       rc = poll(&pfd, 1, timeout);
+       if (rc == -1) {
+               if (errno == EINTR) {
+                       /* For EINTR we pretend that nothing was received nor send. */
+                       rc = 0;
+               } else {
+                       rc = -errno;
+                       SPDK_ERRLOG("poll() failed (%d): %s\n", errno, spdk_strerror(errno));
+               }
+       } else if (rc > 0) {
+               rc = 0;
 
-       client = calloc(1, sizeof(struct spdk_jsonrpc_client));
-       if (client == NULL) {
-               return NULL;
+               if (pfd.revents & POLLOUT) {
+                       rc = _spdk_jsonrpc_client_send_request(client);
+               }
+
+               if (rc == 0 && (pfd.revents & POLLIN)) {
+                       rc = _spdk_jsonrpc_client_recv(client);
+                       /* Incomplete message in buffer isn't an error. */
+                       if (rc == -EAGAIN) {
+                               rc = 0;
+                       }
+               }
+       }
+
+       return rc ? rc : _spdk_jsonrpc_client_resp_ready_count(client);
+}
+
+static int
+_spdk_jsonrpc_client_poll_connecting(struct spdk_jsonrpc_client *client, int timeout)
+{
+       socklen_t rc_len;
+       int rc;
+
+       struct pollfd pfd = {
+               .fd = client->sockfd,
+               .events = POLLOUT
+       };
+
+       rc = poll(&pfd, 1, timeout);
+       if (rc == 0) {
+               return -ENOTCONN;
+       } else if (rc == -1) {
+               if (errno != EINTR) {
+                       SPDK_ERRLOG("poll() failed (%d): %s\n", errno, spdk_strerror(errno));
+                       goto err;
+               }
+
+               /* We are still not connected. Caller will have to call us again. */
+               return -ENOTCONN;
+       } else if (pfd.revents & ~POLLOUT) {
+               /* We only poll for POLLOUT */
+               goto err;
+       } else if ((pfd.revents & POLLOUT) == 0) {
+               /* Is this even possible to get here? */
+               return -ENOTCONN;
+       }
+
+       rc_len = sizeof(int);
+       /* connection might fail so need to check SO_ERROR. */
+       if (getsockopt(client->sockfd, SOL_SOCKET, SO_ERROR, &rc, &rc_len) == -1) {
+               goto err;
        }
 
+       if (rc == 0) {
+               client->connected = true;
+               return 0;
+       }
+
+err:
+       return -EIO;
+}
+
+static int
+_spdk_jsonrpc_client_connect(struct spdk_jsonrpc_client *client, int domain, int protocol,
+                            struct sockaddr *server_addr, socklen_t addrlen)
+{
+       int rc, flags;
+
        client->sockfd = socket(domain, SOCK_STREAM, protocol);
        if (client->sockfd < 0) {
+               rc = errno;
                SPDK_ERRLOG("socket() failed\n");
-               free(client);
-               return NULL;
+               return -rc;
        }
 
-       rc = connect(client->sockfd, server_addr, addrlen);
-       if (rc != 0) {
-               SPDK_ERRLOG("could not connet JSON-RPC server: %s\n", spdk_strerror(errno));
-               close(client->sockfd);
-               free(client);
-               return NULL;
+       flags = fcntl(client->sockfd, F_GETFL);
+       if (flags < 0 || fcntl(client->sockfd, F_SETFL, flags | O_NONBLOCK) < 0) {
+               rc = errno;
+               SPDK_ERRLOG("fcntl(): can't set nonblocking mode for socket (%d): %s\n",
+                           errno, spdk_strerror(errno));
+               goto err;
        }
 
-       /* memory malloc for recv-buf */
-       client->recv_buf = malloc(SPDK_JSONRPC_SEND_BUF_SIZE_INIT);
-       if (!client->recv_buf) {
-               SPDK_ERRLOG("memory malloc for recv-buf failed\n");
-               close(client->sockfd);
-               free(client);
-               return NULL;
+       rc = connect(client->sockfd, server_addr, addrlen);
+       if (rc != 0) {
+               rc = errno;
+               if (rc != EINPROGRESS) {
+                       SPDK_ERRLOG("could not connect to JSON-RPC server: %s\n", spdk_strerror(errno));
+                       goto err;
+               }
+       } else {
+               client->connected = true;
        }
-       client->recv_buf_size = SPDK_JSONRPC_SEND_BUF_SIZE_INIT;
 
-       return client;
+       return -rc;
+err:
+       close(client->sockfd);
+       client->sockfd = -1;
+       return -rc;
 }
 
 struct spdk_jsonrpc_client *
-spdk_jsonrpc_client_connect(const char *rpc_sock_addr, int addr_family)
+spdk_jsonrpc_client_connect(const char *addr, int addr_family)
 {
-       struct spdk_jsonrpc_client *client;
+       struct spdk_jsonrpc_client *client = calloc(1, sizeof(struct spdk_jsonrpc_client));
+       /* Unix Domain Socket */
+       struct sockaddr_un addr_un = {};
+       char *add_in = NULL;
+       int rc;
+
+       if (client == NULL) {
+               SPDK_ERRLOG("%s\n", spdk_strerror(errno));
+               return NULL;
+       }
 
        if (addr_family == AF_UNIX) {
-               /* Unix Domain Socket */
-               struct sockaddr_un rpc_sock_addr_unix = {};
-               int rc;
-
-               rpc_sock_addr_unix.sun_family = AF_UNIX;
-               rc = snprintf(rpc_sock_addr_unix.sun_path,
-                             sizeof(rpc_sock_addr_unix.sun_path),
-                             "%s", rpc_sock_addr);
-               if (rc < 0 || (size_t)rc >= sizeof(rpc_sock_addr_unix.sun_path)) {
+               addr_un.sun_family = AF_UNIX;
+               rc = snprintf(addr_un.sun_path, sizeof(addr_un.sun_path), "%s", addr);
+               if (rc < 0 || (size_t)rc >= sizeof(addr_un.sun_path)) {
+                       rc = -EINVAL;
                        SPDK_ERRLOG("RPC Listen address Unix socket path too long\n");
-                       return NULL;
+                       goto err;
                }
 
-               client = _spdk_jsonrpc_client_connect(AF_UNIX, 0,
-                                                     (struct sockaddr *)&rpc_sock_addr_unix,
-                                                     sizeof(rpc_sock_addr_unix));
+               rc = _spdk_jsonrpc_client_connect(client, AF_UNIX, 0, (struct sockaddr *)&addr_un, sizeof(addr_un));
        } else {
                /* TCP/IP socket */
                struct addrinfo         hints;
                struct addrinfo         *res;
-               char *tmp;
                char *host, *port;
 
-               tmp = strdup(rpc_sock_addr);
-               if (!tmp) {
-                       SPDK_ERRLOG("Out of memory\n");
-                       return NULL;
+               add_in = strdup(addr);
+               if (!add_in) {
+                       rc = -errno;
+                       SPDK_ERRLOG("%s\n", spdk_strerror(errno));
+                       goto err;
                }
 
-               if (spdk_parse_ip_addr(tmp, &host, &port) < 0) {
-                       free(tmp);
-                       SPDK_ERRLOG("Invalid listen address '%s'\n", rpc_sock_addr);
-                       return NULL;
+               rc = spdk_parse_ip_addr(add_in, &host, &port);
+               if (rc) {
+                       SPDK_ERRLOG("Invalid listen address '%s'\n", addr);
+                       goto err;
                }
 
                if (port == NULL) {
@@ -125,19 +314,26 @@ spdk_jsonrpc_client_connect(const char *rpc_sock_addr, int addr_family)
                hints.ai_socktype = SOCK_STREAM;
                hints.ai_protocol = IPPROTO_TCP;
 
-               if (getaddrinfo(host, port, &hints, &res) != 0) {
-                       free(tmp);
-                       SPDK_ERRLOG("Unable to look up RPC connnect address '%s'\n", rpc_sock_addr);
-                       return NULL;
+               rc = getaddrinfo(host, port, &hints, &res);
+               if (rc != 0) {
+                       SPDK_ERRLOG("Unable to look up RPC connnect address '%s' (%d): %s\n", addr, rc, gai_strerror(rc));
+                       rc = -EINVAL;
+                       goto err;
                }
 
-               client = _spdk_jsonrpc_client_connect(res->ai_family, res->ai_protocol,
-                                                     res->ai_addr, res->ai_addrlen);
-
+               rc = _spdk_jsonrpc_client_connect(client, res->ai_family, res->ai_protocol, res->ai_addr,
+                                                 res->ai_addrlen);
                freeaddrinfo(res);
-               free(tmp);
        }
 
+err:
+       if (rc != 0 && rc != -EINPROGRESS) {
+               free(client);
+               client = NULL;
+               errno = -rc;
+       }
+
+       free(add_in);
        return client;
 }
 
@@ -146,8 +342,11 @@ spdk_jsonrpc_client_close(struct spdk_jsonrpc_client *client)
 {
        if (client->sockfd >= 0) {
                close(client->sockfd);
-               free(client->recv_buf);
-               client->sockfd = -1;
+       }
+
+       free(client->recv_buf);
+       if (client->resp) {
+               spdk_jsonrpc_client_free_response(&client->resp->jsonrpc);
        }
 
        free(client);
@@ -183,102 +382,50 @@ spdk_jsonrpc_client_free_request(struct spdk_jsonrpc_client_request *req)
 }
 
 int
-spdk_jsonrpc_client_send_request(struct spdk_jsonrpc_client *client,
-                                struct spdk_jsonrpc_client_request *request)
+spdk_jsonrpc_client_poll(struct spdk_jsonrpc_client *client, int timeout)
 {
-       ssize_t rc;
-
-       /* Reset offset in request */
-       request->send_offset = 0;
-
-       while (request->send_len > 0) {
-               rc = send(client->sockfd, request->send_buf + request->send_offset,
-                         request->send_len, 0);
-               if (rc <= 0) {
-                       if (rc < 0 && errno == EINTR) {
-                               rc = 0;
-                       } else {
-                               return rc;
-                       }
-               }
-
-               request->send_offset += rc;
-               request->send_len -= rc;
+       if (client->connected) {
+               return _spdk_jsonrpc_client_poll(client, timeout);
+       } else {
+               return _spdk_jsonrpc_client_poll_connecting(client, timeout);
        }
-
-       return 0;
 }
 
-static int
-recv_buf_expand(struct spdk_jsonrpc_client *client)
+int spdk_jsonrpc_client_send_request(struct spdk_jsonrpc_client *client,
+                                    struct spdk_jsonrpc_client_request *req)
 {
-       uint8_t *new_buf;
-
-       if (client->recv_buf_size * 2 > SPDK_JSONRPC_SEND_BUF_SIZE_MAX) {
+       if (client->request != NULL) {
                return -ENOSPC;
        }
 
-       new_buf = realloc(client->recv_buf, client->recv_buf_size * 2);
-       if (new_buf == NULL) {
-               SPDK_ERRLOG("Resizing recv_buf failed (current size %zu, new size %zu)\n",
-                           client->recv_buf_size, client->recv_buf_size * 2);
-               return -ENOMEM;
-       }
-
-       client->recv_buf = new_buf;
-       client->recv_buf_size *= 2;
-
+       client->request = req;
        return 0;
 }
 
-int
-spdk_jsonrpc_client_recv_response(struct spdk_jsonrpc_client *client,
-                                 spdk_jsonrpc_client_response_parser parser_fn,
-                                 void *parser_ctx)
+struct spdk_jsonrpc_client_response *
+spdk_jsonrpc_client_get_response(struct spdk_jsonrpc_client *client)
 {
-       ssize_t rc = 0;
-       size_t recv_avail;
-       size_t recv_offset = 0;
-
-       client->parser_fn = parser_fn;
-       client->parser_ctx = parser_ctx;
-
-       recv_avail = client->recv_buf_size;
+       struct spdk_jsonrpc_client_response_internal *r;
 
-       while (recv_avail > 0) {
-               rc = recv(client->sockfd, client->recv_buf + recv_offset, recv_avail, 0);
-               if (rc < 0) {
-                       if (errno == EINTR) {
-                               continue;
-                       } else {
-                               return errno;
-                       }
-               } else if (rc == 0) {
-                       return -EIO;
-               }
+       r = client->resp;
+       if (r == NULL || r->ready == false) {
+               return NULL;
+       }
 
-               recv_offset += rc;
-               recv_avail -= rc;
+       client->resp = NULL;
+       return &r->jsonrpc;
+}
 
-               /* Check to see if we have received a full JSON value. */
-               rc = spdk_jsonrpc_parse_response(client, client->recv_buf, recv_offset);
-               if (rc == 0) {
-                       /* Successfully parsed response */
-                       return 0;
-               } else if (rc != SPDK_JSON_PARSE_INCOMPLETE) {
-                       SPDK_ERRLOG("jsonrpc parse request failed\n");
-                       return -EINVAL;
-               }
+void
+spdk_jsonrpc_client_free_response(struct spdk_jsonrpc_client_response *resp)
+{
+       struct spdk_jsonrpc_client_response_internal *r;
 
-               /* Expand receive buffer if larger one is needed */
-               if (recv_avail == 0) {
-                       rc = recv_buf_expand(client);
-                       if (rc != 0) {
-                               return rc;
-                       }
-                       recv_avail = client->recv_buf_size - recv_offset;
-               }
+       if (!resp) {
+               return;
        }
 
-       return 0;
+       r = SPDK_CONTAINEROF(resp, struct spdk_jsonrpc_client_response_internal, jsonrpc);
+       free(r->buf);
+       free(r);
 }