]> git.proxmox.com Git - mirror_ovs.git/blob - lib/jsonrpc.c
stream: Remove spurious #includes from header file.
[mirror_ovs.git] / lib / jsonrpc.c
1 /*
2 * Copyright (c) 2009, 2010 Nicira Networks.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <config.h>
18
19 #include "jsonrpc.h"
20
21 #include <assert.h>
22 #include <errno.h>
23
24 #include "byteq.h"
25 #include "dynamic-string.h"
26 #include "json.h"
27 #include "list.h"
28 #include "ofpbuf.h"
29 #include "poll-loop.h"
30 #include "queue.h"
31 #include "reconnect.h"
32 #include "stream.h"
33 #include "timeval.h"
34
35 #define THIS_MODULE VLM_jsonrpc
36 #include "vlog.h"
37 \f
38 struct jsonrpc {
39 struct stream *stream;
40 char *name;
41 int status;
42
43 /* Input. */
44 struct byteq input;
45 struct json_parser *parser;
46 struct jsonrpc_msg *received;
47
48 /* Output. */
49 struct ovs_queue output;
50 size_t backlog;
51 };
52
53 /* Rate limit for error messages. */
54 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
55
56 static void jsonrpc_received(struct jsonrpc *);
57 static void jsonrpc_cleanup(struct jsonrpc *);
58
59 struct jsonrpc *
60 jsonrpc_open(struct stream *stream)
61 {
62 struct jsonrpc *rpc;
63
64 assert(stream != NULL);
65
66 rpc = xzalloc(sizeof *rpc);
67 rpc->name = xstrdup(stream_get_name(stream));
68 rpc->stream = stream;
69 byteq_init(&rpc->input);
70 queue_init(&rpc->output);
71
72 return rpc;
73 }
74
75 void
76 jsonrpc_close(struct jsonrpc *rpc)
77 {
78 if (rpc) {
79 jsonrpc_cleanup(rpc);
80 free(rpc->name);
81 free(rpc);
82 }
83 }
84
85 void
86 jsonrpc_run(struct jsonrpc *rpc)
87 {
88 if (rpc->status) {
89 return;
90 }
91
92 stream_run(rpc->stream);
93 while (!queue_is_empty(&rpc->output)) {
94 struct ofpbuf *buf = rpc->output.head;
95 int retval;
96
97 retval = stream_send(rpc->stream, buf->data, buf->size);
98 if (retval >= 0) {
99 rpc->backlog -= retval;
100 ofpbuf_pull(buf, retval);
101 if (!buf->size) {
102 ofpbuf_delete(queue_pop_head(&rpc->output));
103 }
104 } else {
105 if (retval != -EAGAIN) {
106 VLOG_WARN_RL(&rl, "%s: send error: %s",
107 rpc->name, strerror(-retval));
108 jsonrpc_error(rpc, -retval);
109 }
110 break;
111 }
112 }
113 }
114
115 void
116 jsonrpc_wait(struct jsonrpc *rpc)
117 {
118 if (!rpc->status) {
119 stream_run_wait(rpc->stream);
120 if (!queue_is_empty(&rpc->output)) {
121 stream_send_wait(rpc->stream);
122 }
123 }
124 }
125
126 int
127 jsonrpc_get_status(const struct jsonrpc *rpc)
128 {
129 return rpc->status;
130 }
131
132 size_t
133 jsonrpc_get_backlog(const struct jsonrpc *rpc)
134 {
135 return rpc->status ? 0 : rpc->backlog;
136 }
137
138 const char *
139 jsonrpc_get_name(const struct jsonrpc *rpc)
140 {
141 return rpc->name;
142 }
143
144 static void
145 jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title,
146 const struct jsonrpc_msg *msg)
147 {
148 if (VLOG_IS_DBG_ENABLED()) {
149 struct ds s = DS_EMPTY_INITIALIZER;
150 if (msg->method) {
151 ds_put_format(&s, ", method=\"%s\"", msg->method);
152 }
153 if (msg->params) {
154 ds_put_cstr(&s, ", params=");
155 ds_put_and_free_cstr(&s, json_to_string(msg->params, 0));
156 }
157 if (msg->result) {
158 ds_put_cstr(&s, ", result=");
159 ds_put_and_free_cstr(&s, json_to_string(msg->result, 0));
160 }
161 if (msg->error) {
162 ds_put_cstr(&s, ", error=");
163 ds_put_and_free_cstr(&s, json_to_string(msg->error, 0));
164 }
165 if (msg->id) {
166 ds_put_cstr(&s, ", id=");
167 ds_put_and_free_cstr(&s, json_to_string(msg->id, 0));
168 }
169 VLOG_DBG("%s: %s %s%s", rpc->name, title,
170 jsonrpc_msg_type_to_string(msg->type), ds_cstr(&s));
171 ds_destroy(&s);
172 }
173 }
174
175 int
176 jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
177 {
178 struct ofpbuf *buf;
179 struct json *json;
180 size_t length;
181 char *s;
182
183 if (rpc->status) {
184 jsonrpc_msg_destroy(msg);
185 return rpc->status;
186 }
187
188 jsonrpc_log_msg(rpc, "send", msg);
189
190 json = jsonrpc_msg_to_json(msg);
191 s = json_to_string(json, 0);
192 length = strlen(s);
193 json_destroy(json);
194
195 buf = xmalloc(sizeof *buf);
196 ofpbuf_use(buf, s, length);
197 buf->size = length;
198 queue_push_tail(&rpc->output, buf);
199 rpc->backlog += length;
200
201 if (rpc->output.n == 1) {
202 jsonrpc_run(rpc);
203 }
204 return rpc->status;
205 }
206
207 int
208 jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
209 {
210 *msgp = NULL;
211 if (rpc->status) {
212 return rpc->status;
213 }
214
215 while (!rpc->received) {
216 if (byteq_is_empty(&rpc->input)) {
217 size_t chunk;
218 int retval;
219
220 chunk = byteq_headroom(&rpc->input);
221 retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
222 if (retval < 0) {
223 if (retval == -EAGAIN) {
224 return EAGAIN;
225 } else {
226 VLOG_WARN_RL(&rl, "%s: receive error: %s",
227 rpc->name, strerror(-retval));
228 jsonrpc_error(rpc, -retval);
229 return rpc->status;
230 }
231 } else if (retval == 0) {
232 VLOG_INFO_RL(&rl, "%s: connection closed", rpc->name);
233 jsonrpc_error(rpc, EOF);
234 return EOF;
235 }
236 byteq_advance_head(&rpc->input, retval);
237 } else {
238 size_t n, used;
239
240 if (!rpc->parser) {
241 rpc->parser = json_parser_create(0);
242 }
243 n = byteq_tailroom(&rpc->input);
244 used = json_parser_feed(rpc->parser,
245 (char *) byteq_tail(&rpc->input), n);
246 byteq_advance_tail(&rpc->input, used);
247 if (json_parser_is_done(rpc->parser)) {
248 jsonrpc_received(rpc);
249 if (rpc->status) {
250 return rpc->status;
251 }
252 }
253 }
254 }
255
256 *msgp = rpc->received;
257 rpc->received = NULL;
258 return 0;
259 }
260
261 void
262 jsonrpc_recv_wait(struct jsonrpc *rpc)
263 {
264 if (rpc->status || rpc->received || !byteq_is_empty(&rpc->input)) {
265 poll_immediate_wake();
266 } else {
267 stream_recv_wait(rpc->stream);
268 }
269 }
270
271 int
272 jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
273 {
274 int error;
275
276 error = jsonrpc_send(rpc, msg);
277 if (error) {
278 return error;
279 }
280
281 while (!queue_is_empty(&rpc->output) && !rpc->status) {
282 jsonrpc_run(rpc);
283 jsonrpc_wait(rpc);
284 poll_block();
285 }
286 return rpc->status;
287 }
288
289 int
290 jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
291 {
292 for (;;) {
293 int error = jsonrpc_recv(rpc, msgp);
294 if (error != EAGAIN) {
295 return error;
296 }
297
298 jsonrpc_run(rpc);
299 jsonrpc_wait(rpc);
300 jsonrpc_recv_wait(rpc);
301 poll_block();
302 }
303 }
304
305 int
306 jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request,
307 struct jsonrpc_msg **replyp)
308 {
309 struct jsonrpc_msg *reply = NULL;
310 struct json *id;
311 int error;
312
313 id = json_clone(request->id);
314 error = jsonrpc_send_block(rpc, request);
315 if (!error) {
316 for (;;) {
317 error = jsonrpc_recv_block(rpc, &reply);
318 if (error
319 || (reply->type == JSONRPC_REPLY
320 && json_equal(id, reply->id))) {
321 break;
322 }
323 jsonrpc_msg_destroy(reply);
324 }
325 }
326 *replyp = error ? NULL : reply;
327 json_destroy(id);
328 return error;
329 }
330
331 static void
332 jsonrpc_received(struct jsonrpc *rpc)
333 {
334 struct jsonrpc_msg *msg;
335 struct json *json;
336 char *error;
337
338 json = json_parser_finish(rpc->parser);
339 rpc->parser = NULL;
340 if (json->type == JSON_STRING) {
341 VLOG_WARN_RL(&rl, "%s: error parsing stream: %s",
342 rpc->name, json_string(json));
343 jsonrpc_error(rpc, EPROTO);
344 json_destroy(json);
345 return;
346 }
347
348 error = jsonrpc_msg_from_json(json, &msg);
349 if (error) {
350 VLOG_WARN_RL(&rl, "%s: received bad JSON-RPC message: %s",
351 rpc->name, error);
352 free(error);
353 jsonrpc_error(rpc, EPROTO);
354 return;
355 }
356
357 jsonrpc_log_msg(rpc, "received", msg);
358 rpc->received = msg;
359 }
360
361 void
362 jsonrpc_error(struct jsonrpc *rpc, int error)
363 {
364 assert(error);
365 if (!rpc->status) {
366 rpc->status = error;
367 jsonrpc_cleanup(rpc);
368 }
369 }
370
371 static void
372 jsonrpc_cleanup(struct jsonrpc *rpc)
373 {
374 stream_close(rpc->stream);
375 rpc->stream = NULL;
376
377 json_parser_abort(rpc->parser);
378 rpc->parser = NULL;
379
380 jsonrpc_msg_destroy(rpc->received);
381 rpc->received = NULL;
382
383 queue_clear(&rpc->output);
384 rpc->backlog = 0;
385 }
386 \f
387 static struct jsonrpc_msg *
388 jsonrpc_create(enum jsonrpc_msg_type type, const char *method,
389 struct json *params, struct json *result, struct json *error,
390 struct json *id)
391 {
392 struct jsonrpc_msg *msg = xmalloc(sizeof *msg);
393 msg->type = type;
394 msg->method = method ? xstrdup(method) : NULL;
395 msg->params = params;
396 msg->result = result;
397 msg->error = error;
398 msg->id = id;
399 return msg;
400 }
401
402 static struct json *
403 jsonrpc_create_id(void)
404 {
405 static unsigned int id;
406 return json_integer_create(id++);
407 }
408
409 struct jsonrpc_msg *
410 jsonrpc_create_request(const char *method, struct json *params,
411 struct json **idp)
412 {
413 struct json *id = jsonrpc_create_id();
414 if (idp) {
415 *idp = json_clone(id);
416 }
417 return jsonrpc_create(JSONRPC_REQUEST, method, params, NULL, NULL, id);
418 }
419
420 struct jsonrpc_msg *
421 jsonrpc_create_notify(const char *method, struct json *params)
422 {
423 return jsonrpc_create(JSONRPC_NOTIFY, method, params, NULL, NULL, NULL);
424 }
425
426 struct jsonrpc_msg *
427 jsonrpc_create_reply(struct json *result, const struct json *id)
428 {
429 return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, result, NULL,
430 json_clone(id));
431 }
432
433 struct jsonrpc_msg *
434 jsonrpc_create_error(struct json *error, const struct json *id)
435 {
436 return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, NULL, error,
437 json_clone(id));
438 }
439
440 const char *
441 jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type)
442 {
443 switch (type) {
444 case JSONRPC_REQUEST:
445 return "request";
446
447 case JSONRPC_NOTIFY:
448 return "notification";
449
450 case JSONRPC_REPLY:
451 return "reply";
452
453 case JSONRPC_ERROR:
454 return "error";
455 }
456 return "(null)";
457 }
458
459 char *
460 jsonrpc_msg_is_valid(const struct jsonrpc_msg *m)
461 {
462 const char *type_name;
463 unsigned int pattern;
464
465 if (m->params && m->params->type != JSON_ARRAY) {
466 return xstrdup("\"params\" must be JSON array");
467 }
468
469 switch (m->type) {
470 case JSONRPC_REQUEST:
471 pattern = 0x11001;
472 break;
473
474 case JSONRPC_NOTIFY:
475 pattern = 0x11000;
476 break;
477
478 case JSONRPC_REPLY:
479 pattern = 0x00101;
480 break;
481
482 case JSONRPC_ERROR:
483 pattern = 0x00011;
484 break;
485
486 default:
487 return xasprintf("invalid JSON-RPC message type %d", m->type);
488 }
489
490 type_name = jsonrpc_msg_type_to_string(m->type);
491 if ((m->method != NULL) != ((pattern & 0x10000) != 0)) {
492 return xasprintf("%s must%s have \"method\"",
493 type_name, (pattern & 0x10000) ? "" : " not");
494
495 }
496 if ((m->params != NULL) != ((pattern & 0x1000) != 0)) {
497 return xasprintf("%s must%s have \"params\"",
498 type_name, (pattern & 0x1000) ? "" : " not");
499
500 }
501 if ((m->result != NULL) != ((pattern & 0x100) != 0)) {
502 return xasprintf("%s must%s have \"result\"",
503 type_name, (pattern & 0x100) ? "" : " not");
504
505 }
506 if ((m->error != NULL) != ((pattern & 0x10) != 0)) {
507 return xasprintf("%s must%s have \"error\"",
508 type_name, (pattern & 0x10) ? "" : " not");
509
510 }
511 if ((m->id != NULL) != ((pattern & 0x1) != 0)) {
512 return xasprintf("%s must%s have \"id\"",
513 type_name, (pattern & 0x1) ? "" : " not");
514
515 }
516 return NULL;
517 }
518
519 void
520 jsonrpc_msg_destroy(struct jsonrpc_msg *m)
521 {
522 if (m) {
523 free(m->method);
524 json_destroy(m->params);
525 json_destroy(m->result);
526 json_destroy(m->error);
527 json_destroy(m->id);
528 free(m);
529 }
530 }
531
532 static struct json *
533 null_from_json_null(struct json *json)
534 {
535 if (json && json->type == JSON_NULL) {
536 json_destroy(json);
537 return NULL;
538 }
539 return json;
540 }
541
542 char *
543 jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp)
544 {
545 struct json *method = NULL;
546 struct jsonrpc_msg *msg = NULL;
547 struct shash *object;
548 char *error;
549
550 if (json->type != JSON_OBJECT) {
551 error = xstrdup("message is not a JSON object");
552 goto exit;
553 }
554 object = json_object(json);
555
556 method = shash_find_and_delete(object, "method");
557 if (method && method->type != JSON_STRING) {
558 error = xstrdup("method is not a JSON string");
559 goto exit;
560 }
561
562 msg = xzalloc(sizeof *msg);
563 msg->method = method ? xstrdup(method->u.string) : NULL;
564 msg->params = null_from_json_null(shash_find_and_delete(object, "params"));
565 msg->result = null_from_json_null(shash_find_and_delete(object, "result"));
566 msg->error = null_from_json_null(shash_find_and_delete(object, "error"));
567 msg->id = null_from_json_null(shash_find_and_delete(object, "id"));
568 msg->type = (msg->result ? JSONRPC_REPLY
569 : msg->error ? JSONRPC_ERROR
570 : msg->id ? JSONRPC_REQUEST
571 : JSONRPC_NOTIFY);
572 if (!shash_is_empty(object)) {
573 error = xasprintf("message has unexpected member \"%s\"",
574 shash_first(object)->name);
575 goto exit;
576 }
577 error = jsonrpc_msg_is_valid(msg);
578 if (error) {
579 goto exit;
580 }
581
582 exit:
583 json_destroy(method);
584 json_destroy(json);
585 if (error) {
586 jsonrpc_msg_destroy(msg);
587 msg = NULL;
588 }
589 *msgp = msg;
590 return error;
591 }
592
593 struct json *
594 jsonrpc_msg_to_json(struct jsonrpc_msg *m)
595 {
596 struct json *json = json_object_create();
597
598 if (m->method) {
599 json_object_put(json, "method", json_string_create_nocopy(m->method));
600 }
601
602 if (m->params) {
603 json_object_put(json, "params", m->params);
604 }
605
606 if (m->result) {
607 json_object_put(json, "result", m->result);
608 } else if (m->type == JSONRPC_ERROR) {
609 json_object_put(json, "result", json_null_create());
610 }
611
612 if (m->error) {
613 json_object_put(json, "error", m->error);
614 } else if (m->type == JSONRPC_REPLY) {
615 json_object_put(json, "error", json_null_create());
616 }
617
618 if (m->id) {
619 json_object_put(json, "id", m->id);
620 } else if (m->type == JSONRPC_NOTIFY) {
621 json_object_put(json, "id", json_null_create());
622 }
623
624 free(m);
625
626 return json;
627 }
628 \f
629 /* A JSON-RPC session with reconnection. */
630
631 struct jsonrpc_session {
632 struct reconnect *reconnect;
633 struct jsonrpc *rpc;
634 struct stream *stream;
635 unsigned int seqno;
636 };
637
638 /* Creates and returns a jsonrpc_session that connects and reconnects, with
639 * back-off, to 'name', which should be a string acceptable to
640 * stream_open(). */
641 struct jsonrpc_session *
642 jsonrpc_session_open(const char *name)
643 {
644 struct jsonrpc_session *s;
645
646 s = xmalloc(sizeof *s);
647 s->reconnect = reconnect_create(time_msec());
648 reconnect_set_name(s->reconnect, name);
649 reconnect_enable(s->reconnect, time_msec());
650 s->rpc = NULL;
651 s->stream = NULL;
652 s->seqno = 0;
653
654 return s;
655 }
656
657 /* Creates and returns a jsonrpc_session that is initially connected to
658 * 'jsonrpc'. If the connection is dropped, it will not be reconnected. */
659 struct jsonrpc_session *
660 jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc)
661 {
662 struct jsonrpc_session *s;
663
664 s = xmalloc(sizeof *s);
665 s->reconnect = reconnect_create(time_msec());
666 reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc));
667 reconnect_set_max_tries(s->reconnect, 0);
668 reconnect_connected(s->reconnect, time_msec());
669 s->rpc = jsonrpc;
670 s->stream = NULL;
671 s->seqno = 0;
672
673 return s;
674 }
675
676 void
677 jsonrpc_session_close(struct jsonrpc_session *s)
678 {
679 if (s) {
680 jsonrpc_close(s->rpc);
681 reconnect_destroy(s->reconnect);
682 free(s);
683 }
684 }
685
686 static void
687 jsonrpc_session_disconnect(struct jsonrpc_session *s)
688 {
689 reconnect_disconnected(s->reconnect, time_msec(), 0);
690 if (s->rpc) {
691 jsonrpc_error(s->rpc, EOF);
692 jsonrpc_close(s->rpc);
693 s->rpc = NULL;
694 s->seqno++;
695 } else if (s->stream) {
696 stream_close(s->stream);
697 s->stream = NULL;
698 s->seqno++;
699 }
700 }
701
702 static void
703 jsonrpc_session_connect(struct jsonrpc_session *s)
704 {
705 int error;
706
707 jsonrpc_session_disconnect(s);
708 error = stream_open(reconnect_get_name(s->reconnect), &s->stream);
709 if (error) {
710 reconnect_connect_failed(s->reconnect, time_msec(), error);
711 } else {
712 reconnect_connecting(s->reconnect, time_msec());
713 }
714 s->seqno++;
715 }
716
717 void
718 jsonrpc_session_run(struct jsonrpc_session *s)
719 {
720 if (s->rpc) {
721 int error;
722
723 jsonrpc_run(s->rpc);
724 error = jsonrpc_get_status(s->rpc);
725 if (error) {
726 jsonrpc_session_disconnect(s);
727 }
728 } else if (s->stream) {
729 int error;
730
731 stream_run(s->stream);
732 error = stream_connect(s->stream);
733 if (!error) {
734 reconnect_connected(s->reconnect, time_msec());
735 s->rpc = jsonrpc_open(s->stream);
736 s->stream = NULL;
737 } else if (error != EAGAIN) {
738 reconnect_connect_failed(s->reconnect, time_msec(), error);
739 stream_close(s->stream);
740 s->stream = NULL;
741 }
742 }
743
744 switch (reconnect_run(s->reconnect, time_msec())) {
745 case RECONNECT_CONNECT:
746 jsonrpc_session_connect(s);
747 break;
748
749 case RECONNECT_DISCONNECT:
750 jsonrpc_session_disconnect(s);
751 break;
752
753 case RECONNECT_PROBE:
754 if (s->rpc) {
755 struct json *params;
756 struct jsonrpc_msg *request;
757
758 params = json_array_create_empty();
759 request = jsonrpc_create_request("echo", params, NULL);
760 json_destroy(request->id);
761 request->id = json_string_create("echo");
762 jsonrpc_send(s->rpc, request);
763 }
764 break;
765 }
766 }
767
768 void
769 jsonrpc_session_wait(struct jsonrpc_session *s)
770 {
771 if (s->rpc) {
772 jsonrpc_wait(s->rpc);
773 } else if (s->stream) {
774 stream_run_wait(s->stream);
775 stream_connect_wait(s->stream);
776 }
777 reconnect_wait(s->reconnect, time_msec());
778 }
779
780 size_t
781 jsonrpc_session_get_backlog(const struct jsonrpc_session *s)
782 {
783 return s->rpc ? jsonrpc_get_backlog(s->rpc) : 0;
784 }
785
786 const char *
787 jsonrpc_session_get_name(const struct jsonrpc_session *s)
788 {
789 return reconnect_get_name(s->reconnect);
790 }
791
792 int
793 jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
794 {
795 return s->rpc ? jsonrpc_send(s->rpc, msg) : ENOTCONN;
796 }
797
798 struct jsonrpc_msg *
799 jsonrpc_session_recv(struct jsonrpc_session *s)
800 {
801 if (s->rpc) {
802 struct jsonrpc_msg *msg;
803 jsonrpc_recv(s->rpc, &msg);
804 if (msg) {
805 reconnect_received(s->reconnect, time_msec());
806 if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
807 /* Echo request. Send reply. */
808 struct jsonrpc_msg *reply;
809
810 reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
811 jsonrpc_session_send(s, reply);
812 } else if (msg->type == JSONRPC_REPLY
813 && msg->id && msg->id->type == JSON_STRING
814 && !strcmp(msg->id->u.string, "echo")) {
815 /* It's a reply to our echo request. Suppress it. */
816 } else {
817 return msg;
818 }
819 jsonrpc_msg_destroy(msg);
820 }
821 }
822 return NULL;
823 }
824
825 void
826 jsonrpc_session_recv_wait(struct jsonrpc_session *s)
827 {
828 if (s->rpc) {
829 jsonrpc_recv_wait(s->rpc);
830 }
831 }
832
833 bool
834 jsonrpc_session_is_alive(const struct jsonrpc_session *s)
835 {
836 return s->rpc || s->stream || reconnect_get_max_tries(s->reconnect);
837 }
838
839 bool
840 jsonrpc_session_is_connected(const struct jsonrpc_session *s)
841 {
842 return s->rpc != NULL;
843 }
844
845 unsigned int
846 jsonrpc_session_get_seqno(const struct jsonrpc_session *s)
847 {
848 return s->seqno;
849 }
850
851 void
852 jsonrpc_session_force_reconnect(struct jsonrpc_session *s)
853 {
854 reconnect_force_reconnect(s->reconnect, time_msec());
855 }