]> git.proxmox.com Git - ceph.git/blame - ceph/src/spdk/scripts/rpc/client.py
import 15.2.0 Octopus source
[ceph.git] / ceph / src / spdk / scripts / rpc / client.py
CommitLineData
11fdf7f2
TL
1import json
2import socket
3import time
9f95a23c
TL
4import os
5import logging
6import copy
11fdf7f2
TL
7
8
def print_dict(d):
    """Write *d* to standard output as JSON indented by two spaces."""
    rendered = json.dumps(d, indent=2)
    print(rendered)
11
12
class JSONRPCException(Exception):
    """Error carrying a human-readable description in ``message``.

    Args:
        message: Text describing the failure; also available via ``str()``.
    """

    def __init__(self, message):
        super().__init__(message)
        self.message = message
16
17
class JSONRPCClient(object):
    """JSON-RPC 2.0 client that talks over a UNIX, IPv6 or IPv4 stream socket.

    Requests are queued with add_request() and written to the socket by
    flush(); responses are read and decoded one JSON document at a time by
    recv(). call() combines the three for a single request/response exchange.
    The client can be used as a context manager; the socket is closed on exit.
    """

    def __init__(self, addr, port=None, timeout=60.0, **kwargs):
        """Connect to the JSON-RPC server.

        Args:
            addr: Path of a UNIX domain socket (if the path exists), otherwise
                an IPv6 address (if it contains ':'), otherwise an IPv4
                address or host name.
            port: TCP port; unused for UNIX sockets.
            timeout: Seconds recv() waits for a complete response.
            **kwargs: 'log_level' selects the logger level
                (default logging.ERROR).

        Raises:
            JSONRPCException: If the connection cannot be established.
        """
        # Local import so this class does not depend on the module's import block.
        import codecs

        self.sock = None
        self._logger = logging.getLogger("JSONRPCClient(%s)" % addr)
        # Loggers are shared per name; attach the handler only once so that
        # several clients for the same address do not duplicate every message.
        if not self._logger.handlers:
            ch = logging.StreamHandler()
            ch.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
            ch.setLevel(logging.DEBUG)
            self._logger.addHandler(ch)
        self.set_log_level(kwargs.get('log_level', logging.ERROR))

        self.timeout = timeout
        self._request_id = 0
        self._recv_buf = ""
        self._reqs = []
        # Incremental decoder keeps the bytes of a multi-byte character that
        # is split across two recv() chunks instead of losing the chunk.
        self._decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        try:
            if os.path.exists(addr):
                self._logger.debug("Trying to connect to UNIX socket: %s", addr)
                self._connect(socket.AF_UNIX, socket.SOCK_STREAM, 0, addr)
            elif ':' in addr:
                # %s rather than %i: port may be None, which %i cannot format.
                self._logger.debug("Trying to connect to IPv6 address addr:%s, port:%s", addr, port)
                last_error = None
                candidates = socket.getaddrinfo(addr, port, socket.AF_INET6,
                                                socket.SOCK_STREAM, socket.SOL_TCP)
                for af, socktype, proto, _canonname, sa in candidates:
                    try:
                        self._connect(af, socktype, proto, sa)
                        break  # first working address wins
                    except socket.error as ex:
                        last_error = ex
                else:
                    if last_error is not None:
                        raise last_error
            else:
                self._logger.debug("Trying to connect to IPv4 address addr:%s, port:%s", addr, port)
                self._connect(socket.AF_INET, socket.SOCK_STREAM, 0, (addr, port))
        except socket.error as ex:
            raise JSONRPCException("Error while connecting to %s\n"
                                   "Error details: %s" % (addr, ex)) from ex

    def _connect(self, family, socktype, proto, sockaddr):
        """Create a socket, connect it to sockaddr and store it in self.sock.

        The socket is closed again if connect() fails, so nothing leaks.
        """
        sock = socket.socket(family, socktype, proto)
        try:
            sock.connect(sockaddr)
        except socket.error:
            sock.close()
            raise
        self.sock = sock

    def __enter__(self):
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self.close()

    def get_logger(self):
        """Return the logging.Logger used by this client."""
        return self._logger

    def set_log_level(self, lvl):
        """Set logging level

        Args:
            lvl: Log level to set as accepted by logger.setLevel
        """
        self._logger.info("Setting log level to %s", lvl)
        self._logger.setLevel(lvl)
        self._logger.info("Log level set to %s", lvl)

    def close(self):
        """Shut down and close the socket, if any. Safe to call repeatedly."""
        sock = getattr(self, "sock", None)
        if not sock:
            return
        self.sock = None
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except OSError as ex:
            # shutdown() fails when the peer is already gone; the descriptor
            # must still be released below.
            self._logger.debug("Ignoring error during socket shutdown: %s", ex)
        finally:
            sock.close()

    def add_request(self, method, params):
        """Queue a request without sending it.

        Args:
            method: JSON-RPC method name.
            params: Request parameters; omitted from the request when falsy.
                A deep copy is stored, so later changes by the caller do not
                affect the queued request.

        Returns:
            The id assigned to the request.
        """
        self._request_id += 1
        req = {
            'jsonrpc': '2.0',
            'method': method,
            'id': self._request_id
        }

        if params:
            req['params'] = copy.deepcopy(params)

        self._logger.debug("append request:\n%s\n", json.dumps(req))
        self._reqs.append(req)
        return self._request_id

    def flush(self):
        """Send all queued requests, newline-separated, and empty the queue."""
        self._logger.debug("Flushing buffer")
        # TODO: We can drop indent parameter
        reqstr = "\n".join(json.dumps(req, indent=2) for req in self._reqs)
        self._reqs = []
        self._logger.info("Requests:\n%s\n", reqstr)
        self.sock.sendall(reqstr.encode("utf-8"))

    def send(self, method, params=None):
        """Queue one request, flush the queue and return the request id."""
        req_id = self.add_request(method, params)
        self.flush()
        return req_id

    def decode_one_response(self):
        """Decode the first complete JSON document in the receive buffer.

        Returns:
            The decoded object, with its text removed from the buffer, or
            None when the buffer does not yet hold a complete document.
        """
        try:
            self._logger.debug("Trying to decode response '%s'", self._recv_buf)
            buf = self._recv_buf.lstrip()
            obj, idx = json.JSONDecoder().raw_decode(buf)
            self._recv_buf = buf[idx:]
            return obj
        except ValueError:
            self._logger.debug("Partial response")
            return None

    def recv(self):
        """Wait for and return the next response.

        Returns:
            The decoded response object.

        Raises:
            JSONRPCException: If the peer closes the connection (self.sock is
                then set to None) or no complete response arrives within
                self.timeout seconds.
        """
        # Wall-clock deadline; time.clock() no longer exists since Python 3.8
        # and counted CPU time, which does not advance while blocked in recv().
        deadline = time.monotonic() + self.timeout
        response = self.decode_one_response()
        while not response:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                # settimeout() rejects negative values and 0 means non-blocking.
                break
            self.sock.settimeout(remaining)
            try:
                newdata = self.sock.recv(4096)
            except socket.timeout:
                break  # reported as a timeout after the loop
            if not newdata:
                self.sock.close()
                self.sock = None
                raise JSONRPCException("Connection closed with partial response:\n%s\n" % self._recv_buf)
            self._recv_buf += self._decoder.decode(newdata)
            response = self.decode_one_response()

        if not response:
            raise JSONRPCException("Timeout while waiting for response:\n%s\n" % self._recv_buf)

        self._logger.info("response:\n%s\n", json.dumps(response, indent=2))
        return response

    def call(self, method, params=None):
        """Send one request and return the 'result' member of its response.

        Args:
            method: JSON-RPC method name.
            params: Optional request parameters.

        Returns:
            The response's 'result' value, or {} when the connection was
            closed in reply to the "kill_instance" method.

        Raises:
            JSONRPCException: On connection loss, timeout, or when the
                response contains an 'error' member.
        """
        self._logger.debug("call('%s')" % method)
        req_id = self.send(method, params)
        try:
            response = self.recv()
        except JSONRPCException:
            # Don't expect response to kill
            if not self.sock and method == "kill_instance":
                self._logger.info("Connection terminated but ignoring since method is '%s'" % method)
                return {}
            raise

        if 'error' in response:
            # Build the request summary without assuming params is a dict:
            # it defaults to None and may also be a positional list.
            request = {"method": method, "req_id": req_id}
            if isinstance(params, dict):
                request.update(params)
            elif params:
                request["params"] = params
            msg = "\n".join(["request:", "%s" % json.dumps(request, indent=2),
                             "Got JSON-RPC error response",
                             "response:",
                             json.dumps(response['error'], indent=2)])
            raise JSONRPCException(msg)

        return response['result']