X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=lib%2Fjsonrpc.c;h=ecbc939fe7f96a612bbcb413f21561fab64926c2;hb=0f6a89d07f3f2766c1f54b2c6bf9c1d2057cf4de;hp=5c3359cb241d929ee1c1f1b5f03237d34d8bc3ff;hpb=59efa47adf3234ec51541405726d033173851285;p=mirror_ovs.git diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c index 5c3359cb2..ecbc939fe 100644 --- a/lib/jsonrpc.c +++ b/lib/jsonrpc.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2009, 2010, 2011, 2012 Nicira, Inc. + * Copyright (c) 2009-2017 Nicira, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,20 +18,21 @@ #include "jsonrpc.h" -#include #include #include "byteq.h" -#include "dynamic-string.h" +#include "openvswitch/dynamic-string.h" #include "fatal-signal.h" -#include "json.h" -#include "list.h" -#include "ofpbuf.h" -#include "poll-loop.h" +#include "openvswitch/json.h" +#include "openvswitch/list.h" +#include "openvswitch/ofpbuf.h" +#include "ovs-thread.h" +#include "openvswitch/poll-loop.h" #include "reconnect.h" #include "stream.h" +#include "svec.h" #include "timeval.h" -#include "vlog.h" +#include "openvswitch/vlog.h" VLOG_DEFINE_THIS_MODULE(jsonrpc); @@ -42,38 +43,36 @@ struct jsonrpc { /* Input. */ struct byteq input; + uint8_t input_buffer[4096]; struct json_parser *parser; - struct jsonrpc_msg *received; /* Output. */ - struct list output; /* Contains "struct ofpbuf"s. */ + struct ovs_list output; /* Contains "struct ofpbuf"s. */ + size_t output_count; /* Number of elements in "output". */ size_t backlog; }; /* Rate limit for error messages. */ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5); -static void jsonrpc_received(struct jsonrpc *); +static struct jsonrpc_msg *jsonrpc_parse_received_message(struct jsonrpc *); static void jsonrpc_cleanup(struct jsonrpc *); static void jsonrpc_error(struct jsonrpc *, int error); /* This is just the same as stream_open() except that it uses the default - * JSONRPC ports if none is specified. */ + * JSONRPC port if none is specified. */ int jsonrpc_stream_open(const char *name, struct stream **streamp, uint8_t dscp) { - return stream_open_with_default_ports(name, JSONRPC_TCP_PORT, - JSONRPC_SSL_PORT, streamp, - dscp); + return stream_open_with_default_port(name, OVSDB_PORT, streamp, dscp); } /* This is just the same as pstream_open() except that it uses the default - * JSONRPC ports if none is specified. */ + * JSONRPC port if none is specified. */ int jsonrpc_pstream_open(const char *name, struct pstream **pstreamp, uint8_t dscp) { - return pstream_open_with_default_ports(name, JSONRPC_TCP_PORT, - JSONRPC_SSL_PORT, pstreamp, dscp); + return pstream_open_with_default_port(name, OVSDB_PORT, pstreamp, dscp); } /* Returns a new JSON-RPC stream that uses 'stream' for input and output. The @@ -83,13 +82,13 @@ jsonrpc_open(struct stream *stream) { struct jsonrpc *rpc; - assert(stream != NULL); + ovs_assert(stream != NULL); rpc = xzalloc(sizeof *rpc); rpc->name = xstrdup(stream_get_name(stream)); rpc->stream = stream; - byteq_init(&rpc->input); - list_init(&rpc->output); + byteq_init(&rpc->input, rpc->input_buffer, sizeof rpc->input_buffer); + ovs_list_init(&rpc->output); return rpc; } @@ -115,7 +114,7 @@ jsonrpc_run(struct jsonrpc *rpc) } stream_run(rpc->stream); - while (!list_is_empty(&rpc->output)) { + while (!ovs_list_is_empty(&rpc->output)) { struct ofpbuf *buf = ofpbuf_from_list(rpc->output.next); int retval; @@ -124,13 +123,14 @@ jsonrpc_run(struct jsonrpc *rpc) rpc->backlog -= retval; ofpbuf_pull(buf, retval); if (!buf->size) { - list_remove(&buf->list_node); + ovs_list_remove(&buf->list_node); + rpc->output_count--; ofpbuf_delete(buf); } } else { if (retval != -EAGAIN) { VLOG_WARN_RL(&rl, "%s: send error: %s", - rpc->name, strerror(-retval)); + rpc->name, ovs_strerror(-retval)); jsonrpc_error(rpc, -retval); } break; @@ -145,7 +145,7 @@ jsonrpc_wait(struct jsonrpc *rpc) { if (!rpc->status) { stream_run_wait(rpc->stream); - if (!list_is_empty(&rpc->output)) { + if (!ovs_list_is_empty(&rpc->output)) { stream_send_wait(rpc->stream); } } @@ -157,11 +157,11 @@ jsonrpc_wait(struct jsonrpc *rpc) * - >0: errno value * - EOF: end of file (remote end closed connection; not necessarily an error). * - * When this functions nonzero, 'rpc' is effectively out of commission. 'rpc' - * will not receive any more messages and any further messages that one - * attempts to send with 'rpc' will be discarded. The caller can keep 'rpc' - * around as long as it wants, but it's not going to provide any more useful - * services. + * When this function returns nonzero, 'rpc' is effectively out of + * commission. 'rpc' will not receive any more messages and any further + * messages that one attempts to send with 'rpc' will be discarded. The + * caller can keep 'rpc' around as long as it wants, but it's not going + * to provide any more useful services. */ int jsonrpc_get_status(const struct jsonrpc *rpc) @@ -178,6 +178,14 @@ jsonrpc_get_backlog(const struct jsonrpc *rpc) return rpc->status ? 0 : rpc->backlog; } +/* Returns the number of bytes that have been received on 'rpc''s underlying + * stream. (The value wraps around if it exceeds UINT_MAX.) */ +unsigned int +jsonrpc_get_received_bytes(const struct jsonrpc *rpc) +{ + return rpc->input.head; +} + /* Returns 'rpc''s name, that is, the name returned by stream_get_name() for * the stream underlying 'rpc' when 'rpc' was created. */ const char * @@ -231,8 +239,8 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg) { struct ofpbuf *buf; struct json *json; + struct ds ds = DS_EMPTY_INITIALIZER; size_t length; - char *s; if (rpc->status) { jsonrpc_msg_destroy(msg); @@ -242,16 +250,22 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg) jsonrpc_log_msg(rpc, "send", msg); json = jsonrpc_msg_to_json(msg); - s = json_to_string(json, 0); - length = strlen(s); + json_to_ds(json, 0, &ds); + length = ds.length; json_destroy(json); buf = xmalloc(sizeof *buf); - ofpbuf_use(buf, s, length); - buf->size = length; - list_push_back(&rpc->output, &buf->list_node); + ofpbuf_use_ds(buf, &ds); + ovs_list_push_back(&rpc->output, &buf->list_node); + rpc->output_count++; rpc->backlog += length; + if (rpc->output_count >= 50) { + VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of" + " msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name, + rpc->output_count, rpc->backlog); + } + if (rpc->backlog == length) { jsonrpc_run(rpc); } @@ -285,11 +299,10 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) } for (i = 0; i < 50; i++) { - if (rpc->received) { - *msgp = rpc->received; - rpc->received = NULL; - return 0; - } else if (byteq_is_empty(&rpc->input)) { + size_t n, used; + + /* Fill our input buffer if it's empty. */ + if (byteq_is_empty(&rpc->input)) { size_t chunk; int retval; @@ -300,7 +313,7 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) return EAGAIN; } else { VLOG_WARN_RL(&rl, "%s: receive error: %s", - rpc->name, strerror(-retval)); + rpc->name, ovs_strerror(-retval)); jsonrpc_error(rpc, -retval); return rpc->status; } @@ -309,27 +322,31 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) return EOF; } byteq_advance_head(&rpc->input, retval); - } else { - size_t n, used; + } - if (!rpc->parser) { - rpc->parser = json_parser_create(0); + /* We have some input. Feed it into the JSON parser. */ + if (!rpc->parser) { + rpc->parser = json_parser_create(0); + } + n = byteq_tailroom(&rpc->input); + used = json_parser_feed(rpc->parser, + (char *) byteq_tail(&rpc->input), n); + byteq_advance_tail(&rpc->input, used); + + /* If we have complete JSON, attempt to parse it as JSON-RPC. */ + if (json_parser_is_done(rpc->parser)) { + *msgp = jsonrpc_parse_received_message(rpc); + if (*msgp) { + return 0; } - n = byteq_tailroom(&rpc->input); - used = json_parser_feed(rpc->parser, - (char *) byteq_tail(&rpc->input), n); - byteq_advance_tail(&rpc->input, used); - if (json_parser_is_done(rpc->parser)) { - jsonrpc_received(rpc); - if (rpc->status) { - const struct byteq *q = &rpc->input; - if (q->head <= BYTEQ_SIZE) { - stream_report_content(q->buffer, q->head, - STREAM_JSONRPC, - THIS_MODULE, rpc->name); - } - return rpc->status; + + if (rpc->status) { + const struct byteq *q = &rpc->input; + if (q->head <= q->size) { + stream_report_content(q->buffer, q->head, STREAM_JSONRPC, + &this_module, rpc->name); } + return rpc->status; } } } @@ -342,8 +359,8 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) void jsonrpc_recv_wait(struct jsonrpc *rpc) { - if (rpc->status || rpc->received || !byteq_is_empty(&rpc->input)) { - (poll_immediate_wake)(rpc->name); + if (rpc->status || !byteq_is_empty(&rpc->input)) { + poll_immediate_wake_at(rpc->name); } else { stream_recv_wait(rpc->stream); } @@ -368,7 +385,7 @@ jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg) for (;;) { jsonrpc_run(rpc); - if (list_is_empty(&rpc->output) || rpc->status) { + if (ovs_list_is_empty(&rpc->output) || rpc->status) { return rpc->status; } jsonrpc_wait(rpc); @@ -432,8 +449,11 @@ jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request, return error; } -static void -jsonrpc_received(struct jsonrpc *rpc) +/* Attempts to parse the content of 'rpc->parser' (which is complete JSON) as a + * JSON-RPC message. If successful, returns the JSON-RPC message. On failure, + * signals an error on 'rpc' with jsonrpc_error() and returns NULL. */ +static struct jsonrpc_msg * +jsonrpc_parse_received_message(struct jsonrpc *rpc) { struct jsonrpc_msg *msg; struct json *json; @@ -446,7 +466,7 @@ jsonrpc_received(struct jsonrpc *rpc) rpc->name, json_string(json)); jsonrpc_error(rpc, EPROTO); json_destroy(json); - return; + return NULL; } error = jsonrpc_msg_from_json(json, &msg); @@ -455,17 +475,17 @@ jsonrpc_received(struct jsonrpc *rpc) rpc->name, error); free(error); jsonrpc_error(rpc, EPROTO); - return; + return NULL; } jsonrpc_log_msg(rpc, "received", msg); - rpc->received = msg; + return msg; } static void jsonrpc_error(struct jsonrpc *rpc, int error) { - assert(error); + ovs_assert(error); if (!rpc->status) { rpc->status = error; jsonrpc_cleanup(rpc); @@ -481,11 +501,9 @@ jsonrpc_cleanup(struct jsonrpc *rpc) json_parser_abort(rpc->parser); rpc->parser = NULL; - jsonrpc_msg_destroy(rpc->received); - rpc->received = NULL; - ofpbuf_list_delete(&rpc->output); rpc->backlog = 0; + rpc->output_count = 0; } static struct jsonrpc_msg * @@ -495,7 +513,7 @@ jsonrpc_create(enum jsonrpc_msg_type type, const char *method, { struct jsonrpc_msg *msg = xmalloc(sizeof *msg); msg->type = type; - msg->method = method ? xstrdup(method) : NULL; + msg->method = nullable_xstrdup(method); msg->params = params; msg->result = result; msg->error = error; @@ -506,8 +524,11 @@ jsonrpc_create(enum jsonrpc_msg_type type, const char *method, static struct json * jsonrpc_create_id(void) { - static unsigned int id; - return json_integer_create(id++); + static atomic_count next_id = ATOMIC_COUNT_INIT(0); + unsigned int id; + + id = atomic_count_inc(&next_id); + return json_integer_create(id); } struct jsonrpc_msg * @@ -541,6 +562,16 @@ jsonrpc_create_error(struct json *error, const struct json *id) json_clone(id)); } +struct jsonrpc_msg * +jsonrpc_msg_clone(const struct jsonrpc_msg *old) +{ + return jsonrpc_create(old->type, old->method, + json_nullable_clone(old->params), + json_nullable_clone(old->result), + json_nullable_clone(old->error), + json_nullable_clone(old->id)); +} + const char * jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type) { @@ -664,7 +695,7 @@ jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp) } msg = xzalloc(sizeof *msg); - msg->method = method ? xstrdup(method->u.string) : NULL; + msg->method = method ? xstrdup(method->string) : NULL; msg->params = null_from_json_null(shash_find_and_delete(object, "params")); msg->result = null_from_json_null(shash_find_and_delete(object, "result")); msg->error = null_from_json_null(shash_find_and_delete(object, "error")); @@ -694,6 +725,9 @@ exit: return error; } +/* Returns 'm' converted to JSON suitable for sending as a JSON-RPC message. + * + * Consumes and destroys 'm'. */ struct json * jsonrpc_msg_to_json(struct jsonrpc_msg *m) { @@ -729,45 +763,91 @@ jsonrpc_msg_to_json(struct jsonrpc_msg *m) return json; } + +char * +jsonrpc_msg_to_string(const struct jsonrpc_msg *m) +{ + struct jsonrpc_msg *copy = jsonrpc_msg_clone(m); + struct json *json = jsonrpc_msg_to_json(copy); + char *s = json_to_string(json, JSSF_SORT); + json_destroy(json); + return s; +} /* A JSON-RPC session with reconnection. */ struct jsonrpc_session { + struct svec remotes; + size_t next_remote; + struct reconnect *reconnect; struct jsonrpc *rpc; struct stream *stream; struct pstream *pstream; + int last_error; unsigned int seqno; uint8_t dscp; }; +static void +jsonrpc_session_pick_remote(struct jsonrpc_session *s) +{ + reconnect_set_name(s->reconnect, + s->remotes.names[s->next_remote++ % s->remotes.n]); +} + /* Creates and returns a jsonrpc_session to 'name', which should be a string * acceptable to stream_open() or pstream_open(). * * If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new - * jsonrpc_session connects and reconnects, with back-off, to 'name'. + * jsonrpc_session connects to 'name'. If 'retry' is true, then the new + * session connects and reconnects to 'name', with backoff. If 'retry' is + * false, the new session will only try to connect once and after a connection + * failure or a disconnection jsonrpc_session_is_alive() will return false for + * the new session. * * If 'name' is a passive connection method, e.g. "ptcp:", the new * jsonrpc_session listens for connections to 'name'. It maintains at most one * connection at any given time. Any new connection causes the previous one * (if any) to be dropped. */ struct jsonrpc_session * -jsonrpc_session_open(const char *name) +jsonrpc_session_open(const char *name, bool retry) +{ + const struct svec remotes = { .names = (char **) &name, .n = 1 }; + return jsonrpc_session_open_multiple(&remotes, retry); +} + +struct jsonrpc_session * +jsonrpc_session_open_multiple(const struct svec *remotes, bool retry) { struct jsonrpc_session *s; s = xmalloc(sizeof *s); + + /* Set 'n' remotes from 'names'. */ + svec_clone(&s->remotes, remotes); + if (!s->remotes.n) { + svec_add(&s->remotes, "invalid:"); + } + s->next_remote = 0; + s->reconnect = reconnect_create(time_msec()); - reconnect_set_name(s->reconnect, name); + jsonrpc_session_pick_remote(s); reconnect_enable(s->reconnect, time_msec()); + reconnect_set_backoff_free_tries(s->reconnect, remotes->n); s->rpc = NULL; s->stream = NULL; s->pstream = NULL; s->seqno = 0; s->dscp = 0; + s->last_error = 0; + const char *name = reconnect_get_name(s->reconnect); if (!pstream_verify_name(name)) { reconnect_set_passive(s->reconnect, true, time_msec()); + } else if (!retry) { + reconnect_set_max_tries(s->reconnect, remotes->n); + reconnect_set_backoff(s->reconnect, INT_MAX, INT_MAX); } if (!stream_or_pstream_needs_probes(name)) { @@ -783,20 +863,24 @@ jsonrpc_session_open(const char *name) * On the assumption that such connections are likely to be short-lived * (e.g. from ovs-vsctl), informational logging for them is suppressed. */ struct jsonrpc_session * -jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc) +jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc, uint8_t dscp) { struct jsonrpc_session *s; s = xmalloc(sizeof *s); + svec_init(&s->remotes); + svec_add(&s->remotes, jsonrpc_get_name(jsonrpc)); + s->next_remote = 0; s->reconnect = reconnect_create(time_msec()); reconnect_set_quiet(s->reconnect, true); reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc)); reconnect_set_max_tries(s->reconnect, 0); reconnect_connected(s->reconnect, time_msec()); + s->dscp = dscp; s->rpc = jsonrpc; s->stream = NULL; s->pstream = NULL; - s->seqno = 0; + s->seqno = 1; return s; } @@ -809,10 +893,20 @@ jsonrpc_session_close(struct jsonrpc_session *s) reconnect_destroy(s->reconnect); stream_close(s->stream); pstream_close(s->pstream); + svec_destroy(&s->remotes); free(s); } } +struct jsonrpc * +jsonrpc_session_steal(struct jsonrpc_session *s) +{ + struct jsonrpc *rpc = s->rpc; + s->rpc = NULL; + jsonrpc_session_close(s); + return rpc; +} + static void jsonrpc_session_disconnect(struct jsonrpc_session *s) { @@ -820,12 +914,15 @@ jsonrpc_session_disconnect(struct jsonrpc_session *s) jsonrpc_error(s->rpc, EOF); jsonrpc_close(s->rpc); s->rpc = NULL; - s->seqno++; } else if (s->stream) { stream_close(s->stream); s->stream = NULL; - s->seqno++; + } else { + return; } + + s->seqno++; + jsonrpc_session_pick_remote(s); } static void @@ -839,6 +936,8 @@ jsonrpc_session_connect(struct jsonrpc_session *s) error = jsonrpc_stream_open(name, &s->stream, s->dscp); if (!error) { reconnect_connecting(s->reconnect, time_msec()); + } else { + s->last_error = error; } } else { error = s->pstream ? 0 : jsonrpc_pstream_open(name, &s->pstream, @@ -850,8 +949,8 @@ jsonrpc_session_connect(struct jsonrpc_session *s) if (error) { reconnect_connect_failed(s->reconnect, time_msec(), error); + jsonrpc_session_pick_remote(s); } - s->seqno++; } void @@ -871,6 +970,7 @@ jsonrpc_session_run(struct jsonrpc_session *s) } reconnect_connected(s->reconnect, time_msec()); s->rpc = jsonrpc_open(stream); + s->seqno++; } else if (error != EAGAIN) { reconnect_listen_error(s->reconnect, time_msec(), error); pstream_close(s->pstream); @@ -879,13 +979,28 @@ jsonrpc_session_run(struct jsonrpc_session *s) } if (s->rpc) { + size_t backlog; int error; + backlog = jsonrpc_get_backlog(s->rpc); jsonrpc_run(s->rpc); + if (jsonrpc_get_backlog(s->rpc) < backlog) { + /* Data previously caught in a queue was successfully sent (or + * there's an error, which we'll catch below.) + * + * We don't count data that is successfully sent immediately as + * activity, because there's a lot of queuing downstream from us, + * which means that we can push a lot of data into a connection + * that has stalled and won't ever recover. + */ + reconnect_activity(s->reconnect, time_msec()); + } + error = jsonrpc_get_status(s->rpc); if (error) { reconnect_disconnected(s->reconnect, time_msec(), error); jsonrpc_session_disconnect(s); + s->last_error = error; } } else if (s->stream) { int error; @@ -896,10 +1011,13 @@ jsonrpc_session_run(struct jsonrpc_session *s) reconnect_connected(s->reconnect, time_msec()); s->rpc = jsonrpc_open(s->stream); s->stream = NULL; + s->seqno++; } else if (error != EAGAIN) { reconnect_connect_failed(s->reconnect, time_msec(), error); + jsonrpc_session_pick_remote(s); stream_close(s->stream); s->stream = NULL; + s->last_error = error; } } @@ -957,6 +1075,22 @@ jsonrpc_session_get_name(const struct jsonrpc_session *s) return reconnect_get_name(s->reconnect); } +const char * +jsonrpc_session_get_id(const struct jsonrpc_session *s) +{ + if (s->rpc && s->rpc->stream) { + return stream_get_peer_id(s->rpc->stream); + } else { + return NULL; + } +} + +size_t +jsonrpc_session_get_n_remotes(const struct jsonrpc_session *s) +{ + return s->remotes.n; +} + /* Always takes ownership of 'msg', regardless of success. */ int jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg) @@ -973,10 +1107,21 @@ struct jsonrpc_msg * jsonrpc_session_recv(struct jsonrpc_session *s) { if (s->rpc) { + unsigned int received_bytes; struct jsonrpc_msg *msg; + + received_bytes = jsonrpc_get_received_bytes(s->rpc); jsonrpc_recv(s->rpc, &msg); + if (received_bytes != jsonrpc_get_received_bytes(s->rpc)) { + /* Data was successfully received. + * + * Previously we only counted receiving a full message as activity, + * but with large messages or a slow connection that policy could + * time out the session mid-message. */ + reconnect_activity(s->reconnect, time_msec()); + } + if (msg) { - reconnect_received(s->reconnect, time_msec()); if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) { /* Echo request. Send reply. */ struct jsonrpc_msg *reply; @@ -985,7 +1130,7 @@ jsonrpc_session_recv(struct jsonrpc_session *s) jsonrpc_session_send(s, reply); } else if (msg->type == JSONRPC_REPLY && msg->id && msg->id->type == JSON_STRING - && !strcmp(msg->id->u.string, "echo")) { + && !strcmp(msg->id->string, "echo")) { /* It's a reply to our echo request. Suppress it. */ } else { return msg; @@ -1004,30 +1149,49 @@ jsonrpc_session_recv_wait(struct jsonrpc_session *s) } } +/* Returns true if 's' is currently connected or trying to connect. */ bool jsonrpc_session_is_alive(const struct jsonrpc_session *s) { return s->rpc || s->stream || reconnect_get_max_tries(s->reconnect); } +/* Returns true if 's' is currently connected. */ bool jsonrpc_session_is_connected(const struct jsonrpc_session *s) { return s->rpc != NULL; } +/* Returns a sequence number for 's'. The sequence number increments every + * time 's' connects or disconnects. Thus, a caller can use the change (or + * lack of change) in the sequence number to figure out whether the underlying + * connection is the same as before. */ unsigned int jsonrpc_session_get_seqno(const struct jsonrpc_session *s) { return s->seqno; } +/* Returns the current status of 's'. If 's' is NULL or is disconnected, this + * is 0, otherwise it is the status of the connection, as reported by + * jsonrpc_get_status(). */ int jsonrpc_session_get_status(const struct jsonrpc_session *s) { return s && s->rpc ? jsonrpc_get_status(s->rpc) : 0; } +/* Returns the last error reported on a connection by 's'. The return value is + * 0 only if no connection made by 's' has ever encountered an error. See + * jsonrpc_get_status() for return value interpretation. */ +int +jsonrpc_session_get_last_error(const struct jsonrpc_session *s) +{ + return s->last_error; +} + +/* Populates 'stats' with statistics from 's'. */ void jsonrpc_session_get_reconnect_stats(const struct jsonrpc_session *s, struct reconnect_stats *stats) @@ -1035,18 +1199,36 @@ jsonrpc_session_get_reconnect_stats(const struct jsonrpc_session *s, reconnect_get_stats(s->reconnect, time_msec(), stats); } +/* Enables 's' to reconnect to the peer if the connection drops. */ +void +jsonrpc_session_enable_reconnect(struct jsonrpc_session *s) +{ + reconnect_set_max_tries(s->reconnect, UINT_MAX); + reconnect_set_backoff(s->reconnect, RECONNECT_DEFAULT_MIN_BACKOFF, + RECONNECT_DEFAULT_MAX_BACKOFF); +} + +/* Forces 's' to drop its connection (if any) and reconnect. */ void jsonrpc_session_force_reconnect(struct jsonrpc_session *s) { reconnect_force_reconnect(s->reconnect, time_msec()); } +/* Sets 'max_backoff' as the maximum time, in milliseconds, to wait after a + * connection attempt fails before attempting to connect again. */ void jsonrpc_session_set_max_backoff(struct jsonrpc_session *s, int max_backoff) { reconnect_set_backoff(s->reconnect, 0, max_backoff); } +/* Sets the "probe interval" for 's' to 'probe_interval', in milliseconds. If + * this is zero, it disables the connection keepalive feature. Otherwise, if + * 's' is idle for 'probe_interval' milliseconds then 's' will send an echo + * request and, if no reply is received within an additional 'probe_interval' + * milliseconds, close the connection (then reconnect, if that feature is + * enabled). */ void jsonrpc_session_set_probe_interval(struct jsonrpc_session *s, int probe_interval) @@ -1054,9 +1236,17 @@ jsonrpc_session_set_probe_interval(struct jsonrpc_session *s, reconnect_set_probe_interval(s->reconnect, probe_interval); } +/* Sets the DSCP value used for 's''s connection to 'dscp'. If this is + * different from the DSCP value currently in use then the connection is closed + * and reconnected. */ void -jsonrpc_session_set_dscp(struct jsonrpc_session *s, - uint8_t dscp) +jsonrpc_session_set_dscp(struct jsonrpc_session *s, uint8_t dscp) { - s->dscp = dscp; + if (s->dscp != dscp) { + pstream_close(s->pstream); + s->pstream = NULL; + + s->dscp = dscp; + jsonrpc_session_force_reconnect(s); + } }