]> git.proxmox.com Git - ceph.git/blob - ceph/src/spdk/lib/jsonrpc/jsonrpc_server_tcp.c
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / spdk / lib / jsonrpc / jsonrpc_server_tcp.c
1 /*-
2 * BSD LICENSE
3 *
4 * Copyright (c) Intel Corporation.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 */
33
34 #include "jsonrpc_internal.h"
35 #include "spdk/string.h"
36 #include "spdk/util.h"
37
38 struct spdk_jsonrpc_server *
39 spdk_jsonrpc_server_listen(int domain, int protocol,
40 struct sockaddr *listen_addr, socklen_t addrlen,
41 spdk_jsonrpc_handle_request_fn handle_request)
42 {
43 struct spdk_jsonrpc_server *server;
44 int rc, val, flag, i;
45
46 server = calloc(1, sizeof(struct spdk_jsonrpc_server));
47 if (server == NULL) {
48 return NULL;
49 }
50
51 TAILQ_INIT(&server->free_conns);
52 TAILQ_INIT(&server->conns);
53
54 for (i = 0; i < SPDK_JSONRPC_MAX_CONNS; i++) {
55 TAILQ_INSERT_TAIL(&server->free_conns, &server->conns_array[i], link);
56 }
57
58 server->handle_request = handle_request;
59
60 server->sockfd = socket(domain, SOCK_STREAM, protocol);
61 if (server->sockfd < 0) {
62 SPDK_ERRLOG("socket() failed\n");
63 free(server);
64 return NULL;
65 }
66
67 val = 1;
68 setsockopt(server->sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
69
70 flag = fcntl(server->sockfd, F_GETFL);
71 if (fcntl(server->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) {
72 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
73 server->sockfd, spdk_strerror(errno));
74 close(server->sockfd);
75 free(server);
76 return NULL;
77 }
78
79 rc = bind(server->sockfd, listen_addr, addrlen);
80 if (rc != 0) {
81 SPDK_ERRLOG("could not bind JSON-RPC server: %s\n", spdk_strerror(errno));
82 close(server->sockfd);
83 free(server);
84 return NULL;
85 }
86
87 rc = listen(server->sockfd, 512);
88 if (rc != 0) {
89 SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
90 close(server->sockfd);
91 free(server);
92 return NULL;
93 }
94
95 return server;
96 }
97
98 static struct spdk_jsonrpc_request *
99 jsonrpc_server_dequeue_request(struct spdk_jsonrpc_server_conn *conn)
100 {
101 struct spdk_jsonrpc_request *request = NULL;
102
103 pthread_spin_lock(&conn->queue_lock);
104 request = STAILQ_FIRST(&conn->send_queue);
105 if (request) {
106 STAILQ_REMOVE_HEAD(&conn->send_queue, link);
107 }
108 pthread_spin_unlock(&conn->queue_lock);
109 return request;
110 }
111
112 static void
113 jsonrpc_server_free_conn_request(struct spdk_jsonrpc_server_conn *conn)
114 {
115 struct spdk_jsonrpc_request *request;
116
117 jsonrpc_free_request(conn->send_request);
118 conn->send_request = NULL ;
119 while ((request = jsonrpc_server_dequeue_request(conn)) != NULL) {
120 jsonrpc_free_request(request);
121 }
122 }
123
124 static void
125 jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn *conn)
126 {
127 conn->closed = true;
128
129 if (conn->sockfd >= 0) {
130 jsonrpc_server_free_conn_request(conn);
131 close(conn->sockfd);
132 conn->sockfd = -1;
133
134 if (conn->close_cb) {
135 conn->close_cb(conn, conn->close_cb_ctx);
136 }
137 }
138 }
139
140 void
141 spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server)
142 {
143 struct spdk_jsonrpc_server_conn *conn;
144
145 close(server->sockfd);
146
147 TAILQ_FOREACH(conn, &server->conns, link) {
148 jsonrpc_server_conn_close(conn);
149 }
150
151 free(server);
152 }
153
154 static void
155 jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn)
156 {
157 struct spdk_jsonrpc_server *server = conn->server;
158
159 jsonrpc_server_conn_close(conn);
160
161 pthread_spin_destroy(&conn->queue_lock);
162 assert(STAILQ_EMPTY(&conn->send_queue));
163
164 TAILQ_REMOVE(&server->conns, conn, link);
165 TAILQ_INSERT_HEAD(&server->free_conns, conn, link);
166 }
167
168 int
169 spdk_jsonrpc_conn_add_close_cb(struct spdk_jsonrpc_server_conn *conn,
170 spdk_jsonrpc_conn_closed_fn cb, void *ctx)
171 {
172 int rc = 0;
173
174 pthread_spin_lock(&conn->queue_lock);
175 if (conn->close_cb == NULL) {
176 conn->close_cb = cb;
177 conn->close_cb_ctx = ctx;
178 } else {
179 rc = conn->close_cb == cb && conn->close_cb_ctx == ctx ? -EEXIST : -ENOSPC;
180 }
181 pthread_spin_unlock(&conn->queue_lock);
182
183 return rc;
184 }
185
186 int
187 spdk_jsonrpc_conn_del_close_cb(struct spdk_jsonrpc_server_conn *conn,
188 spdk_jsonrpc_conn_closed_fn cb, void *ctx)
189 {
190 int rc = 0;
191
192 pthread_spin_lock(&conn->queue_lock);
193 if (conn->close_cb == NULL || conn->close_cb != cb || conn->close_cb_ctx != ctx) {
194 rc = -ENOENT;
195 } else {
196 conn->close_cb = NULL;
197 }
198 pthread_spin_unlock(&conn->queue_lock);
199
200 return rc;
201 }
202
203 static int
204 jsonrpc_server_accept(struct spdk_jsonrpc_server *server)
205 {
206 struct spdk_jsonrpc_server_conn *conn;
207 int rc, flag;
208
209 rc = accept(server->sockfd, NULL, NULL);
210 if (rc >= 0) {
211 conn = TAILQ_FIRST(&server->free_conns);
212 assert(conn != NULL);
213
214 conn->server = server;
215 conn->sockfd = rc;
216 conn->closed = false;
217 conn->recv_len = 0;
218 conn->outstanding_requests = 0;
219 STAILQ_INIT(&conn->send_queue);
220 conn->send_request = NULL;
221
222 if (pthread_spin_init(&conn->queue_lock, PTHREAD_PROCESS_PRIVATE)) {
223 SPDK_ERRLOG("Unable to create queue lock for socket: %d", conn->sockfd);
224 close(conn->sockfd);
225 return -1;
226 }
227
228 flag = fcntl(conn->sockfd, F_GETFL);
229 if (fcntl(conn->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) {
230 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
231 conn->sockfd, spdk_strerror(errno));
232 close(conn->sockfd);
233 pthread_spin_destroy(&conn->queue_lock);
234 return -1;
235 }
236
237 TAILQ_REMOVE(&server->free_conns, conn, link);
238 TAILQ_INSERT_TAIL(&server->conns, conn, link);
239 return 0;
240 }
241
242 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
243 return 0;
244 }
245
246 return -1;
247 }
248
249 void
250 jsonrpc_server_handle_request(struct spdk_jsonrpc_request *request,
251 const struct spdk_json_val *method, const struct spdk_json_val *params)
252 {
253 request->conn->server->handle_request(request, method, params);
254 }
255
256 void
257 jsonrpc_server_handle_error(struct spdk_jsonrpc_request *request, int error)
258 {
259 const char *msg;
260
261 switch (error) {
262 case SPDK_JSONRPC_ERROR_PARSE_ERROR:
263 msg = "Parse error";
264 break;
265
266 case SPDK_JSONRPC_ERROR_INVALID_REQUEST:
267 msg = "Invalid request";
268 break;
269
270 case SPDK_JSONRPC_ERROR_METHOD_NOT_FOUND:
271 msg = "Method not found";
272 break;
273
274 case SPDK_JSONRPC_ERROR_INVALID_PARAMS:
275 msg = "Invalid parameters";
276 break;
277
278 case SPDK_JSONRPC_ERROR_INTERNAL_ERROR:
279 msg = "Internal error";
280 break;
281
282 default:
283 msg = "Error";
284 break;
285 }
286
287 spdk_jsonrpc_send_error_response(request, error, msg);
288 }
289
290 static int
291 jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn *conn)
292 {
293 ssize_t rc, offset;
294 size_t recv_avail = SPDK_JSONRPC_RECV_BUF_SIZE - conn->recv_len;
295
296 rc = recv(conn->sockfd, conn->recv_buf + conn->recv_len, recv_avail, 0);
297 if (rc == -1) {
298 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
299 return 0;
300 }
301 SPDK_DEBUGLOG(SPDK_LOG_RPC, "recv() failed: %s\n", spdk_strerror(errno));
302 return -1;
303 }
304
305 if (rc == 0) {
306 SPDK_DEBUGLOG(SPDK_LOG_RPC, "remote closed connection\n");
307 conn->closed = true;
308 return 0;
309 }
310
311 conn->recv_len += rc;
312
313 offset = 0;
314 do {
315 rc = jsonrpc_parse_request(conn, conn->recv_buf + offset, conn->recv_len - offset);
316 if (rc < 0) {
317 SPDK_ERRLOG("jsonrpc parse request failed\n");
318 return -1;
319 }
320
321 offset += rc;
322 } while (rc > 0);
323
324 if (offset > 0) {
325 /*
326 * Successfully parsed a requests - move any data past the end of the
327 * parsed requests down to the beginning.
328 */
329 assert((size_t)offset <= conn->recv_len);
330 memmove(conn->recv_buf, conn->recv_buf + offset, conn->recv_len - offset);
331 conn->recv_len -= offset;
332 }
333
334 return 0;
335 }
336
337 void
338 jsonrpc_server_send_response(struct spdk_jsonrpc_request *request)
339 {
340 struct spdk_jsonrpc_server_conn *conn = request->conn;
341
342 /* Queue the response to be sent */
343 pthread_spin_lock(&conn->queue_lock);
344 STAILQ_INSERT_TAIL(&conn->send_queue, request, link);
345 pthread_spin_unlock(&conn->queue_lock);
346 }
347
348
349 static int
350 jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn *conn)
351 {
352 struct spdk_jsonrpc_request *request;
353 ssize_t rc;
354
355 more:
356 if (conn->outstanding_requests == 0) {
357 return 0;
358 }
359
360 if (conn->send_request == NULL) {
361 conn->send_request = jsonrpc_server_dequeue_request(conn);
362 }
363
364 request = conn->send_request;
365 if (request == NULL) {
366 /* Nothing to send right now */
367 return 0;
368 }
369
370 if (request->send_len > 0) {
371 rc = send(conn->sockfd, request->send_buf + request->send_offset,
372 request->send_len, 0);
373 if (rc < 0) {
374 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
375 return 0;
376 }
377
378 SPDK_DEBUGLOG(SPDK_LOG_RPC, "send() failed: %s\n", spdk_strerror(errno));
379 return -1;
380 }
381
382 request->send_offset += rc;
383 request->send_len -= rc;
384 }
385
386 if (request->send_len == 0) {
387 /*
388 * Full response has been sent.
389 * Free it and set send_request to NULL to move on to the next queued response.
390 */
391 conn->send_request = NULL;
392 jsonrpc_free_request(request);
393 goto more;
394 }
395
396 return 0;
397 }
398
399 int
400 spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server)
401 {
402 int rc;
403 struct spdk_jsonrpc_server_conn *conn, *conn_tmp;
404
405 TAILQ_FOREACH_SAFE(conn, &server->conns, link, conn_tmp) {
406 /* If we can't receive and there are no outstanding requests close the connection. */
407 if (conn->closed == true && conn->outstanding_requests == 0) {
408 jsonrpc_server_conn_close(conn);
409 }
410
411 if (conn->sockfd == -1 && conn->outstanding_requests == 0) {
412 jsonrpc_server_conn_remove(conn);
413 }
414 }
415
416 /* Check listen socket */
417 if (!TAILQ_EMPTY(&server->free_conns)) {
418 jsonrpc_server_accept(server);
419 }
420
421 TAILQ_FOREACH(conn, &server->conns, link) {
422 if (conn->sockfd == -1) {
423 continue;
424 }
425
426 rc = jsonrpc_server_conn_send(conn);
427 if (rc != 0) {
428 jsonrpc_server_conn_close(conn);
429 continue;
430 }
431
432 if (!conn->closed) {
433 rc = jsonrpc_server_conn_recv(conn);
434 if (rc != 0) {
435 jsonrpc_server_conn_close(conn);
436 }
437 }
438 }
439
440 return 0;
441 }