]> git.proxmox.com Git - ceph.git/blob - ceph/src/spdk/lib/jsonrpc/jsonrpc_server_tcp.c
import 15.2.0 Octopus source
[ceph.git] / ceph / src / spdk / lib / jsonrpc / jsonrpc_server_tcp.c
1 /*-
2 * BSD LICENSE
3 *
4 * Copyright (c) Intel Corporation.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 */
33
34 #include "jsonrpc_internal.h"
35 #include "spdk/string.h"
36 #include "spdk/util.h"
37
38 struct spdk_jsonrpc_server *
39 spdk_jsonrpc_server_listen(int domain, int protocol,
40 struct sockaddr *listen_addr, socklen_t addrlen,
41 spdk_jsonrpc_handle_request_fn handle_request)
42 {
43 struct spdk_jsonrpc_server *server;
44 int rc, val, flag, i;
45
46 server = calloc(1, sizeof(struct spdk_jsonrpc_server));
47 if (server == NULL) {
48 return NULL;
49 }
50
51 TAILQ_INIT(&server->free_conns);
52 TAILQ_INIT(&server->conns);
53
54 for (i = 0; i < SPDK_JSONRPC_MAX_CONNS; i++) {
55 TAILQ_INSERT_TAIL(&server->free_conns, &server->conns_array[i], link);
56 }
57
58 server->handle_request = handle_request;
59
60 server->sockfd = socket(domain, SOCK_STREAM, protocol);
61 if (server->sockfd < 0) {
62 SPDK_ERRLOG("socket() failed\n");
63 free(server);
64 return NULL;
65 }
66
67 val = 1;
68 setsockopt(server->sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
69 if (protocol == IPPROTO_TCP) {
70 setsockopt(server->sockfd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
71 }
72
73 flag = fcntl(server->sockfd, F_GETFL);
74 if (fcntl(server->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) {
75 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
76 server->sockfd, spdk_strerror(errno));
77 close(server->sockfd);
78 free(server);
79 return NULL;
80 }
81
82 rc = bind(server->sockfd, listen_addr, addrlen);
83 if (rc != 0) {
84 SPDK_ERRLOG("could not bind JSON-RPC server: %s\n", spdk_strerror(errno));
85 close(server->sockfd);
86 free(server);
87 return NULL;
88 }
89
90 rc = listen(server->sockfd, 512);
91 if (rc != 0) {
92 SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
93 close(server->sockfd);
94 free(server);
95 return NULL;
96 }
97
98 return server;
99 }
100
101 static void
102 spdk_jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn *conn)
103 {
104 conn->closed = true;
105
106 if (conn->sockfd >= 0) {
107 close(conn->sockfd);
108 conn->sockfd = -1;
109
110 if (conn->close_cb) {
111 conn->close_cb(conn, conn->close_cb_ctx);
112 }
113 }
114 }
115
116 void
117 spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server)
118 {
119 struct spdk_jsonrpc_server_conn *conn;
120
121 close(server->sockfd);
122
123 TAILQ_FOREACH(conn, &server->conns, link) {
124 spdk_jsonrpc_server_conn_close(conn);
125 }
126
127 free(server);
128 }
129
130 static void
131 spdk_jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn)
132 {
133 struct spdk_jsonrpc_server *server = conn->server;
134
135 spdk_jsonrpc_server_conn_close(conn);
136
137 pthread_spin_destroy(&conn->queue_lock);
138 assert(STAILQ_EMPTY(&conn->send_queue));
139
140 TAILQ_REMOVE(&server->conns, conn, link);
141 TAILQ_INSERT_HEAD(&server->free_conns, conn, link);
142 }
143
144 int
145 spdk_jsonrpc_conn_add_close_cb(struct spdk_jsonrpc_server_conn *conn,
146 spdk_jsonrpc_conn_closed_fn cb, void *ctx)
147 {
148 int rc = 0;
149
150 pthread_spin_lock(&conn->queue_lock);
151 if (conn->close_cb == NULL) {
152 conn->close_cb = cb;
153 conn->close_cb_ctx = ctx;
154 } else {
155 rc = conn->close_cb == cb && conn->close_cb_ctx == ctx ? -EEXIST : -ENOSPC;
156 }
157 pthread_spin_unlock(&conn->queue_lock);
158
159 return rc;
160 }
161
162 int
163 spdk_jsonrpc_conn_del_close_cb(struct spdk_jsonrpc_server_conn *conn,
164 spdk_jsonrpc_conn_closed_fn cb, void *ctx)
165 {
166 int rc = 0;
167
168 pthread_spin_lock(&conn->queue_lock);
169 if (conn->close_cb == NULL || conn->close_cb != cb || conn->close_cb_ctx != ctx) {
170 rc = -ENOENT;
171 } else {
172 conn->close_cb = NULL;
173 }
174 pthread_spin_unlock(&conn->queue_lock);
175
176 return rc;
177 }
178
179 static int
180 spdk_jsonrpc_server_accept(struct spdk_jsonrpc_server *server)
181 {
182 struct spdk_jsonrpc_server_conn *conn;
183 int rc, flag;
184
185 rc = accept(server->sockfd, NULL, NULL);
186 if (rc >= 0) {
187 conn = TAILQ_FIRST(&server->free_conns);
188 assert(conn != NULL);
189
190 conn->server = server;
191 conn->sockfd = rc;
192 conn->closed = false;
193 conn->recv_len = 0;
194 conn->outstanding_requests = 0;
195 pthread_spin_init(&conn->queue_lock, PTHREAD_PROCESS_PRIVATE);
196 STAILQ_INIT(&conn->send_queue);
197 conn->send_request = NULL;
198
199 flag = fcntl(conn->sockfd, F_GETFL);
200 if (fcntl(conn->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) {
201 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
202 conn->sockfd, spdk_strerror(errno));
203 close(conn->sockfd);
204 return -1;
205 }
206
207 TAILQ_REMOVE(&server->free_conns, conn, link);
208 TAILQ_INSERT_TAIL(&server->conns, conn, link);
209 return 0;
210 }
211
212 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
213 return 0;
214 }
215
216 return -1;
217 }
218
219 void
220 spdk_jsonrpc_server_handle_request(struct spdk_jsonrpc_request *request,
221 const struct spdk_json_val *method, const struct spdk_json_val *params)
222 {
223 request->conn->server->handle_request(request, method, params);
224 }
225
226 void
227 spdk_jsonrpc_server_handle_error(struct spdk_jsonrpc_request *request, int error)
228 {
229 const char *msg;
230
231 switch (error) {
232 case SPDK_JSONRPC_ERROR_PARSE_ERROR:
233 msg = "Parse error";
234 break;
235
236 case SPDK_JSONRPC_ERROR_INVALID_REQUEST:
237 msg = "Invalid request";
238 break;
239
240 case SPDK_JSONRPC_ERROR_METHOD_NOT_FOUND:
241 msg = "Method not found";
242 break;
243
244 case SPDK_JSONRPC_ERROR_INVALID_PARAMS:
245 msg = "Invalid parameters";
246 break;
247
248 case SPDK_JSONRPC_ERROR_INTERNAL_ERROR:
249 msg = "Internal error";
250 break;
251
252 default:
253 msg = "Error";
254 break;
255 }
256
257 spdk_jsonrpc_send_error_response(request, error, msg);
258 }
259
260 static int
261 spdk_jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn *conn)
262 {
263 ssize_t rc, offset;
264 size_t recv_avail = SPDK_JSONRPC_RECV_BUF_SIZE - conn->recv_len;
265
266 rc = recv(conn->sockfd, conn->recv_buf + conn->recv_len, recv_avail, 0);
267 if (rc == -1) {
268 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
269 return 0;
270 }
271 SPDK_DEBUGLOG(SPDK_LOG_RPC, "recv() failed: %s\n", spdk_strerror(errno));
272 return -1;
273 }
274
275 if (rc == 0) {
276 SPDK_DEBUGLOG(SPDK_LOG_RPC, "remote closed connection\n");
277 conn->closed = true;
278 return 0;
279 }
280
281 conn->recv_len += rc;
282
283 offset = 0;
284 do {
285 rc = spdk_jsonrpc_parse_request(conn, conn->recv_buf + offset, conn->recv_len - offset);
286 if (rc < 0) {
287 SPDK_ERRLOG("jsonrpc parse request failed\n");
288 return -1;
289 }
290
291 offset += rc;
292 } while (rc > 0);
293
294 if (offset > 0) {
295 /*
296 * Successfully parsed a requests - move any data past the end of the
297 * parsed requests down to the beginning.
298 */
299 assert((size_t)offset <= conn->recv_len);
300 memmove(conn->recv_buf, conn->recv_buf + offset, conn->recv_len - offset);
301 conn->recv_len -= offset;
302 }
303
304 return 0;
305 }
306
307 void
308 spdk_jsonrpc_server_send_response(struct spdk_jsonrpc_request *request)
309 {
310 struct spdk_jsonrpc_server_conn *conn = request->conn;
311
312 /* Queue the response to be sent */
313 pthread_spin_lock(&conn->queue_lock);
314 STAILQ_INSERT_TAIL(&conn->send_queue, request, link);
315 pthread_spin_unlock(&conn->queue_lock);
316 }
317
318 static struct spdk_jsonrpc_request *
319 spdk_jsonrpc_server_dequeue_request(struct spdk_jsonrpc_server_conn *conn)
320 {
321 struct spdk_jsonrpc_request *request = NULL;
322
323 pthread_spin_lock(&conn->queue_lock);
324 request = STAILQ_FIRST(&conn->send_queue);
325 if (request) {
326 STAILQ_REMOVE_HEAD(&conn->send_queue, link);
327 }
328 pthread_spin_unlock(&conn->queue_lock);
329 return request;
330 }
331
332
333 static int
334 spdk_jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn *conn)
335 {
336 struct spdk_jsonrpc_request *request;
337 ssize_t rc;
338
339 more:
340 if (conn->outstanding_requests == 0) {
341 return 0;
342 }
343
344 if (conn->send_request == NULL) {
345 conn->send_request = spdk_jsonrpc_server_dequeue_request(conn);
346 }
347
348 request = conn->send_request;
349 if (request == NULL) {
350 /* Nothing to send right now */
351 return 0;
352 }
353
354 if (request->send_len > 0) {
355 rc = send(conn->sockfd, request->send_buf + request->send_offset,
356 request->send_len, 0);
357 if (rc < 0) {
358 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
359 return 0;
360 }
361
362 SPDK_DEBUGLOG(SPDK_LOG_RPC, "send() failed: %s\n", spdk_strerror(errno));
363 return -1;
364 }
365
366 request->send_offset += rc;
367 request->send_len -= rc;
368 }
369
370 if (request->send_len == 0) {
371 /*
372 * Full response has been sent.
373 * Free it and set send_request to NULL to move on to the next queued response.
374 */
375 conn->send_request = NULL;
376 spdk_jsonrpc_free_request(request);
377 goto more;
378 }
379
380 return 0;
381 }
382
383 int
384 spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server)
385 {
386 int rc;
387 struct spdk_jsonrpc_server_conn *conn, *conn_tmp;
388
389 TAILQ_FOREACH_SAFE(conn, &server->conns, link, conn_tmp) {
390 /* If we can't receive and there are no outstanding requests close the connection. */
391 if (conn->closed == true && conn->outstanding_requests == 0) {
392 spdk_jsonrpc_server_conn_close(conn);
393 }
394
395 if (conn->sockfd == -1) {
396 struct spdk_jsonrpc_request *request;
397
398 /*
399 * The client closed the connection, but there may still be requests
400 * outstanding; we have no way to cancel outstanding requests, so wait until
401 * each outstanding request sends a response (which will be discarded, since
402 * the connection is closed).
403 */
404
405 spdk_jsonrpc_free_request(conn->send_request);
406 conn->send_request = NULL;
407
408 while ((request = spdk_jsonrpc_server_dequeue_request(conn)) != NULL) {
409 spdk_jsonrpc_free_request(request);
410 }
411
412 if (conn->outstanding_requests == 0) {
413 SPDK_DEBUGLOG(SPDK_LOG_RPC, "all outstanding requests completed\n");
414 spdk_jsonrpc_server_conn_remove(conn);
415 }
416 }
417 }
418
419 /* Check listen socket */
420 if (!TAILQ_EMPTY(&server->free_conns)) {
421 spdk_jsonrpc_server_accept(server);
422 }
423
424 TAILQ_FOREACH(conn, &server->conns, link) {
425 if (conn->sockfd == -1) {
426 continue;
427 }
428
429 rc = spdk_jsonrpc_server_conn_send(conn);
430 if (rc != 0) {
431 spdk_jsonrpc_server_conn_close(conn);
432 continue;
433 }
434
435 if (!conn->closed) {
436 rc = spdk_jsonrpc_server_conn_recv(conn);
437 if (rc != 0) {
438 spdk_jsonrpc_server_conn_close(conn);
439 }
440 }
441 }
442
443 return 0;
444 }