]> git.proxmox.com Git - ceph.git/blame - ceph/src/spdk/scripts/rpc/client.py
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / spdk / scripts / rpc / client.py
CommitLineData
11fdf7f2
TL
1import json
2import socket
3import time
9f95a23c
TL
4import os
5import logging
6import copy
11fdf7f2
TL
7
8
9def print_dict(d):
10 print(json.dumps(d, indent=2))
11
12
f67539c2
TL
13def print_json(s):
14 print(json.dumps(s, indent=2).strip('"'))
15
16
11fdf7f2
TL
17class JSONRPCException(Exception):
18 def __init__(self, message):
19 self.message = message
20
21
22class JSONRPCClient(object):
9f95a23c
TL
23 def __init__(self, addr, port=None, timeout=60.0, **kwargs):
24 self.sock = None
25 ch = logging.StreamHandler()
26 ch.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
27 ch.setLevel(logging.DEBUG)
28 self._logger = logging.getLogger("JSONRPCClient(%s)" % addr)
29 self._logger.addHandler(ch)
f67539c2
TL
30 self.log_set_level(kwargs.get('log_level', logging.ERROR))
31 connect_retries = kwargs.get('conn_retries', 0)
9f95a23c 32
11fdf7f2 33 self.timeout = timeout
9f95a23c
TL
34 self._request_id = 0
35 self._recv_buf = ""
36 self._reqs = []
f67539c2
TL
37
38 for i in range(connect_retries):
39 try:
40 self._connect(addr, port)
41 return
42 except Exception as e:
43 # ignore and retry in 200ms
44 time.sleep(0.2)
45
46 # try one last time without try/except
47 self._connect(addr, port)
48
49 def __enter__(self):
50 return self
51
52 def __exit__(self, exception_type, exception_value, traceback):
53 self.close()
54
55 def _connect(self, addr, port):
11fdf7f2 56 try:
9f95a23c
TL
57 if os.path.exists(addr):
58 self._logger.debug("Trying to connect to UNIX socket: %s", addr)
11fdf7f2
TL
59 self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
60 self.sock.connect(addr)
f67539c2
TL
61 elif port:
62 if ':' in addr:
63 self._logger.debug("Trying to connect to IPv6 address addr:%s, port:%i", addr, port)
64 for res in socket.getaddrinfo(addr, port, socket.AF_INET6, socket.SOCK_STREAM, socket.SOL_TCP):
65 af, socktype, proto, canonname, sa = res
66 self.sock = socket.socket(af, socktype, proto)
67 self.sock.connect(sa)
68 else:
69 self._logger.debug("Trying to connect to IPv4 address addr:%s, port:%i'", addr, port)
70 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
71 self.sock.connect((addr, port))
11fdf7f2 72 else:
f67539c2 73 raise socket.error("Unix socket '%s' does not exist" % addr)
11fdf7f2
TL
74 except socket.error as ex:
75 raise JSONRPCException("Error while connecting to %s\n"
76 "Error details: %s" % (addr, ex))
77
9f95a23c
TL
78 def get_logger(self):
79 return self._logger
80
81 """Set logging level
82
83 Args:
84 lvl: Log level to set as accepted by logger.setLevel
85 """
f67539c2 86 def log_set_level(self, lvl):
9f95a23c
TL
87 self._logger.info("Setting log level to %s", lvl)
88 self._logger.setLevel(lvl)
89 self._logger.info("Log level set to %s", lvl)
90
91 def close(self):
92 if getattr(self, "sock", None):
93 self.sock.shutdown(socket.SHUT_RDWR)
94 self.sock.close()
95 self.sock = None
96
97 def add_request(self, method, params):
98 self._request_id += 1
99 req = {
100 'jsonrpc': '2.0',
101 'method': method,
102 'id': self._request_id
103 }
104
105 if params:
106 req['params'] = copy.deepcopy(params)
107
108 self._logger.debug("append request:\n%s\n", json.dumps(req))
109 self._reqs.append(req)
110 return self._request_id
111
112 def flush(self):
113 self._logger.debug("Flushing buffer")
114 # TODO: We can drop indent parameter
115 reqstr = "\n".join(json.dumps(req, indent=2) for req in self._reqs)
116 self._reqs = []
117 self._logger.info("Requests:\n%s\n", reqstr)
118 self.sock.sendall(reqstr.encode("utf-8"))
11fdf7f2 119
9f95a23c
TL
120 def send(self, method, params=None):
121 id = self.add_request(method, params)
122 self.flush()
123 return id
11fdf7f2 124
9f95a23c
TL
125 def decode_one_response(self):
126 try:
127 self._logger.debug("Trying to decode response '%s'", self._recv_buf)
128 buf = self._recv_buf.lstrip()
129 obj, idx = json.JSONDecoder().raw_decode(buf)
130 self._recv_buf = buf[idx:]
131 return obj
132 except ValueError:
133 self._logger.debug("Partial response")
134 return None
135
136 def recv(self):
f67539c2 137 start_time = time.process_time()
9f95a23c
TL
138 response = self.decode_one_response()
139 while not response:
11fdf7f2 140 try:
f67539c2 141 timeout = self.timeout - (time.process_time() - start_time)
11fdf7f2
TL
142 self.sock.settimeout(timeout)
143 newdata = self.sock.recv(4096)
9f95a23c
TL
144 if not newdata:
145 self.sock.close()
146 self.sock = None
147 raise JSONRPCException("Connection closed with partial response:\n%s\n" % self._recv_buf)
148 self._recv_buf += newdata.decode("utf-8")
149 response = self.decode_one_response()
11fdf7f2 150 except socket.timeout:
9f95a23c 151 break # throw exception after loop to avoid Python freaking out about nested exceptions
11fdf7f2
TL
152 except ValueError:
153 continue # incomplete response; keep buffering
11fdf7f2
TL
154
155 if not response:
9f95a23c
TL
156 raise JSONRPCException("Timeout while waiting for response:\n%s\n" % self._recv_buf)
157
158 self._logger.info("response:\n%s\n", json.dumps(response, indent=2))
159 return response
160
f67539c2 161 def call(self, method, params={}):
9f95a23c
TL
162 self._logger.debug("call('%s')" % method)
163 req_id = self.send(method, params)
164 try:
165 response = self.recv()
166 except JSONRPCException as e:
167 """ Don't expect response to kill """
f67539c2 168 if not self.sock and method == "spdk_kill_instance":
9f95a23c 169 self._logger.info("Connection terminated but ignoring since method is '%s'" % method)
11fdf7f2 170 return {}
11fdf7f2 171 else:
9f95a23c 172 raise e
11fdf7f2
TL
173
174 if 'error' in response:
f67539c2
TL
175 params["method"] = method
176 params["req_id"] = req_id
177 msg = "\n".join(["request:", "%s" % json.dumps(params, indent=2),
9f95a23c 178 "Got JSON-RPC error response",
11fdf7f2
TL
179 "response:",
180 json.dumps(response['error'], indent=2)])
181 raise JSONRPCException(msg)
182
183 return response['result']