]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | import json |
2 | import socket | |
3 | import time | |
9f95a23c TL |
4 | import os |
5 | import logging | |
6 | import copy | |
11fdf7f2 TL |
7 | |
8 | ||
def print_dict(d):
    """Write *d* to stdout as JSON indented by two spaces."""
    rendered = json.dumps(d, indent=2)
    print(rendered)
11 | ||
12 | ||
f67539c2 TL |
def print_json(s):
    """Write *s* to stdout as two-space-indented JSON.

    Double quotes at the very start and end of the serialized text are
    stripped, so a plain string value is printed without its JSON quoting.
    """
    serialized = json.dumps(s, indent=2)
    unquoted = serialized.strip('"')
    print(unquoted)
15 | ||
16 | ||
11fdf7f2 TL |
class JSONRPCException(Exception):
    """Error raised by the JSON-RPC client.

    Args:
        message: Human-readable description of the failure. It is kept on
            the ``message`` attribute and is also what ``str()`` returns.
    """

    def __init__(self, message):
        # Forward to Exception so that args / str() are populated no matter
        # how the exception was constructed (positionally or by keyword).
        super().__init__(message)
        self.message = message
20 | ||
21 | ||
class JSONRPCClient(object):
    """JSON-RPC 2.0 client over a UNIX-domain or TCP stream socket.

    Requests are queued with add_request() and written to the socket by
    flush(); recv() reads exactly one JSON document from the socket.
    send() and call() are convenience wrappers built on those primitives.
    The client can be used as a context manager; the socket is closed on
    exit.
    """

    def __init__(self, addr, port=None, timeout=60.0, **kwargs):
        """Create the client and connect to the server.

        Args:
            addr: Path of a UNIX socket, or an IPv4/IPv6 address when
                *port* is given.
            port: TCP port; only used when *addr* is not an existing path.
            timeout: Seconds recv() may wait for one complete response.
            **kwargs: ``log_level`` (default ``logging.ERROR``) and
                ``conn_retries`` (default 0), the number of additional
                connection attempts made 200 ms apart before the final one.

        Raises:
            JSONRPCException: If the final connection attempt fails.
        """
        self.sock = None
        self._logger = logging.getLogger("JSONRPCClient(%s)" % addr)
        # getLogger() returns the same object for the same address, so only
        # attach a handler once; otherwise every new client for that address
        # would duplicate each log line.
        if not self._logger.handlers:
            ch = logging.StreamHandler()
            ch.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
            ch.setLevel(logging.DEBUG)
            self._logger.addHandler(ch)
        self.log_set_level(kwargs.get('log_level', logging.ERROR))
        connect_retries = kwargs.get('conn_retries', 0)

        self.timeout = timeout
        self._request_id = 0
        self._recv_buf = ""
        # Bytes of a UTF-8 sequence that was cut off at the end of a read.
        self._recv_pending = b""
        self._reqs = []

        for _ in range(connect_retries):
            try:
                self._connect(addr, port)
                return
            except Exception:
                # ignore and retry in 200ms
                time.sleep(0.2)

        # try one last time without try/except
        self._connect(addr, port)

    def __enter__(self):
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self.close()

    @staticmethod
    def _open_socket(family, socktype, proto, address):
        """Return a connected socket, closing it again if connect() fails."""
        sock = socket.socket(family, socktype, proto)
        try:
            sock.connect(address)
        except Exception:
            sock.close()
            raise
        return sock

    def _connect(self, addr, port):
        """Connect self.sock to *addr* (UNIX path) or to (*addr*, *port*).

        An existing filesystem path is treated as a UNIX socket. Otherwise,
        when *port* is set, an address containing ':' is resolved as IPv6
        and anything else is used as IPv4. On failure self.sock is left
        untouched and no socket stays open.

        Raises:
            JSONRPCException: If no connection could be established.
        """
        try:
            if os.path.exists(addr):
                self._logger.debug("Trying to connect to UNIX socket: %s", addr)
                self.sock = self._open_socket(socket.AF_UNIX, socket.SOCK_STREAM, 0, addr)
            elif port:
                if ':' in addr:
                    self._logger.debug("Trying to connect to IPv6 address addr:%s, port:%i", addr, port)
                    last_error = None
                    candidates = socket.getaddrinfo(addr, port, socket.AF_INET6,
                                                    socket.SOCK_STREAM, socket.SOL_TCP)
                    for af, socktype, proto, _canonname, sa in candidates:
                        try:
                            self.sock = self._open_socket(af, socktype, proto, sa)
                            break  # first address that accepts the connection wins
                        except socket.error as ex:
                            last_error = ex
                    else:
                        if last_error is None:
                            last_error = socket.error("No usable address for '%s'" % addr)
                        raise last_error
                else:
                    self._logger.debug("Trying to connect to IPv4 address addr:%s, port:%i", addr, port)
                    self.sock = self._open_socket(socket.AF_INET, socket.SOCK_STREAM, 0, (addr, port))
            else:
                raise socket.error("Unix socket '%s' does not exist" % addr)
        except socket.error as ex:
            raise JSONRPCException("Error while connecting to %s\n"
                                   "Error details: %s" % (addr, ex))

    def get_logger(self):
        """Return the logger used by this client."""
        return self._logger

    def log_set_level(self, lvl):
        """Set logging level

        Args:
            lvl: Log level to set as accepted by logger.setLevel
        """
        self._logger.info("Setting log level to %s", lvl)
        self._logger.setLevel(lvl)
        self._logger.info("Log level set to %s", lvl)

    def close(self):
        """Shut down and close the socket, if there is one."""
        sock = getattr(self, "sock", None)
        if not sock:
            return
        self.sock = None
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except socket.error:
            # Best effort: shutdown() fails on a socket that is not (or no
            # longer) connected, but the descriptor must still be released.
            pass
        finally:
            sock.close()

    def add_request(self, method, params):
        """Queue a request without sending it.

        Args:
            method: JSON-RPC method name.
            params: Request parameters; omitted from the request when falsy.
                A deep copy is stored, so later changes by the caller do not
                affect the queued request.

        Returns:
            The id assigned to the request.
        """
        self._request_id += 1
        req = {
            'jsonrpc': '2.0',
            'method': method,
            'id': self._request_id
        }

        if params:
            req['params'] = copy.deepcopy(params)

        self._logger.debug("append request:\n%s\n", json.dumps(req))
        self._reqs.append(req)
        return self._request_id

    def flush(self):
        """Send all queued requests in one write and empty the queue."""
        self._logger.debug("Flushing buffer")
        # TODO: We can drop indent parameter
        reqstr = "\n".join(json.dumps(req, indent=2) for req in self._reqs)
        self._reqs = []
        self._logger.info("Requests:\n%s\n", reqstr)
        self.sock.sendall(reqstr.encode("utf-8"))

    def send(self, method, params=None):
        """Queue one request, flush the queue and return the request id."""
        req_id = self.add_request(method, params)
        self.flush()
        return req_id

    def decode_one_response(self):
        """Pop one complete JSON document off the receive buffer.

        Returns:
            The decoded object, or None when the buffer does not yet hold a
            complete document (the buffer is left unchanged in that case).
        """
        try:
            self._logger.debug("Trying to decode response '%s'", self._recv_buf)
            buf = self._recv_buf.lstrip()
            obj, idx = json.JSONDecoder().raw_decode(buf)
            self._recv_buf = buf[idx:]
            return obj
        except ValueError:
            self._logger.debug("Partial response")
            return None

    def _decode_chunk(self, data):
        """Decode received bytes, carrying a split UTF-8 sequence forward.

        A multi-byte character cut off at the end of *data* is held back and
        prepended to the next chunk. Bytes that are invalid rather than
        merely incomplete are decoded with replacement characters so they
        cannot accumulate without bound.
        """
        data = self._recv_pending + data
        self._recv_pending = b""
        try:
            return data.decode("utf-8")
        except UnicodeDecodeError as err:
            tail_len = len(data) - err.start
            # An incomplete UTF-8 sequence is at most 3 bytes long and
            # necessarily ends where the data ends.
            if err.end == len(data) and tail_len < 4:
                self._recv_pending = data[err.start:]
                return data[:err.start].decode("utf-8")
            return data.decode("utf-8", errors="replace")

    def recv(self):
        """Read one complete JSON response.

        Returns:
            The decoded response object.

        Raises:
            JSONRPCException: If the peer closes the connection before a
                complete response arrived, or if no complete response
                arrived within self.timeout seconds.
        """
        # Wall-clock deadline; CPU time does not advance while blocked in recv().
        deadline = time.monotonic() + self.timeout
        response = self.decode_one_response()
        while not response:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                # settimeout() rejects negative values and 0 would switch the
                # socket to non-blocking mode, so stop here instead.
                break
            try:
                self.sock.settimeout(remaining)
                newdata = self.sock.recv(4096)
            except socket.timeout:
                break  # raise after the loop so the timeout is not chained as context
            if not newdata:
                self.sock.close()
                self.sock = None
                raise JSONRPCException("Connection closed with partial response:\n%s\n" % self._recv_buf)
            self._recv_buf += self._decode_chunk(newdata)
            response = self.decode_one_response()

        if not response:
            raise JSONRPCException("Timeout while waiting for response:\n%s\n" % self._recv_buf)

        self._logger.info("response:\n%s\n", json.dumps(response, indent=2))
        return response

    def call(self, method, params=None):
        """Send one request and return the ``result`` member of its response.

        Args:
            method: JSON-RPC method name.
            params: Optional request parameters. The object passed in is
                never modified.

        Returns:
            The ``result`` member of the response, or ``{}`` when the
            connection was closed in reply to ``spdk_kill_instance``.

        Raises:
            JSONRPCException: On a receive failure, or when the response
                contains an ``error`` member.
        """
        self._logger.debug("call('%s')", method)
        req_id = self.send(method, params)
        try:
            response = self.recv()
        except JSONRPCException:
            # Don't expect response to kill
            if not self.sock and method == "spdk_kill_instance":
                self._logger.info("Connection terminated but ignoring since method is '%s'" % method)
                return {}
            raise

        if 'error' in response:
            # Describe the request on a copy so neither the caller's params
            # nor a shared default object is modified.
            if isinstance(params, dict):
                request_info = dict(params)
            elif params:
                request_info = {"params": params}
            else:
                request_info = {}
            request_info["method"] = method
            request_info["req_id"] = req_id
            msg = "\n".join(["request:", "%s" % json.dumps(request_info, indent=2),
                             "Got JSON-RPC error response",
                             "response:",
                             json.dumps(response['error'], indent=2)])
            raise JSONRPCException(msg)

        return response['result']