]> git.proxmox.com Git - mirror_ovs.git/blame - python/ovs/jsonrpc.py
python: Style cleanup.
[mirror_ovs.git] / python / ovs / jsonrpc.py
CommitLineData
8758e8a3 1# Copyright (c) 2010, 2011 Nicira Networks
99155935
BP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at:
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import errno
16import logging
17import os
18
da51646f 19import ovs.json
99155935
BP
20import ovs.poller
21import ovs.reconnect
22import ovs.stream
23import ovs.timeval
24
25EOF = -1
26
26bb0f31 27
99155935
BP
class Message(object):
    """A single JSON-RPC message: request, notification, reply, or error.

    Attributes:
        type:   One of the T_* constants below.
        method: Method name (requests and notifications), otherwise None.
        params: List of parameters (requests and notifications), otherwise
                None.
        result: Result value (successful replies), otherwise None.
        error:  Error value (error replies), otherwise None.
        id:     Request identifier; None for notifications.
    """

    T_REQUEST = 0               # Request.
    T_NOTIFY = 1                # Notification.
    T_REPLY = 2                 # Successful reply.
    T_ERROR = 3                 # Error reply.

    __types = {T_REQUEST: "request",
               T_NOTIFY: "notification",
               T_REPLY: "reply",
               T_ERROR: "error"}

    # Types accepted as a JSON string.  type(u"") is 'unicode' on Python 2
    # and 'str' on Python 3, so this works on both without naming 'unicode'
    # (which does not exist on Python 3).
    _string_types = (str, type(u""))

    def __init__(self, type_, method, params, result, error, id):
        """Stores the arguments as attributes without validating them; call
        is_valid() to check that the combination is legal for 'type_'."""
        self.type = type_
        self.method = method
        self.params = params
        self.result = result
        self.error = error
        self.id = id

    # Next identifier handed out by _create_id(); shared by all messages.
    _next_id = 0

    @staticmethod
    def _create_id():
        """Returns a fresh integer request id and advances the counter."""
        this_id = Message._next_id
        Message._next_id += 1
        return this_id

    @staticmethod
    def create_request(method, params):
        """Returns a new request for 'method' with a newly allocated id."""
        return Message(Message.T_REQUEST, method, params, None, None,
                       Message._create_id())

    @staticmethod
    def create_notify(method, params):
        """Returns a new notification for 'method' (its id is None)."""
        return Message(Message.T_NOTIFY, method, params, None, None,
                       None)

    @staticmethod
    def create_reply(result, id):
        """Returns a new successful reply carrying 'result' for request
        'id'."""
        return Message(Message.T_REPLY, None, None, result, None, id)

    @staticmethod
    def create_error(error, id):
        """Returns a new error reply carrying 'error' for request 'id'."""
        return Message(Message.T_ERROR, None, None, None, error, id)

    @staticmethod
    def type_to_string(type_):
        """Returns the human-readable name of message type 'type_'.

        Raises KeyError if 'type_' is not one of the T_* constants."""
        return Message.__types[type_]

    def __validate_arg(self, value, name, must_have):
        """Checks the presence of one member against this message's type.

        'must_have' is nonzero if the member named 'name' is required and
        zero if it is forbidden.  Returns None if 'value' satisfies that,
        otherwise an error string."""
        if (value is not None) == (must_have != 0):
            return None

        type_name = Message.type_to_string(self.type)
        verb = "must" if must_have else "must not"
        return "%s %s have \"%s\"" % (type_name, verb, name)

    def is_valid(self):
        """Returns None if this message is well-formed for its type,
        otherwise a string that describes the first problem found."""
        if self.params is not None and not isinstance(self.params, list):
            return "\"params\" must be JSON array"

        # One hex digit per member, most significant first:
        # method, params, result, error, id.  1 = required, 0 = forbidden.
        pattern = {Message.T_REQUEST: 0x11001,
                   Message.T_NOTIFY: 0x11000,
                   Message.T_REPLY: 0x00101,
                   Message.T_ERROR: 0x00011}.get(self.type)
        if pattern is None:
            return "invalid JSON-RPC message type %s" % self.type

        return (
            self.__validate_arg(self.method, "method", pattern & 0x10000) or
            self.__validate_arg(self.params, "params", pattern & 0x1000) or
            self.__validate_arg(self.result, "result", pattern & 0x100) or
            self.__validate_arg(self.error, "error", pattern & 0x10) or
            self.__validate_arg(self.id, "id", pattern & 0x1))

    @staticmethod
    def from_json(json):
        """Converts 'json', a parsed JSON value, into a Message.

        Returns the new Message on success.  On failure returns a string
        that describes the problem, so callers must check the type of the
        return value.  'json' itself is never modified."""
        if not isinstance(json, dict):
            return "message is not a JSON object"

        # Make a copy to avoid modifying the caller's dict.
        json = dict(json)

        if "method" in json:
            method = json.pop("method")
            if not isinstance(method, Message._string_types):
                return "method is not a JSON string"
        else:
            method = None

        params = json.pop("params", None)
        result = json.pop("result", None)
        error = json.pop("error", None)
        id_ = json.pop("id", None)
        if json:
            # Anything left over is a member that JSON-RPC does not define.
            return "message has unexpected member \"%s\"" % json.popitem()[0]

        # Infer the message type from which members are non-null.
        if result is not None:
            msg_type = Message.T_REPLY
        elif error is not None:
            msg_type = Message.T_ERROR
        elif id_ is not None:
            msg_type = Message.T_REQUEST
        else:
            msg_type = Message.T_NOTIFY

        msg = Message(msg_type, method, params, result, error, id_)
        validation_error = msg.is_valid()
        if validation_error is not None:
            return validation_error
        return msg

    def to_json(self):
        """Returns this message as a dict suitable for JSON serialization.

        Members that are None are normally omitted, except that an error
        reply always carries "result", a successful reply always carries
        "error", and a notification always carries "id" (each as None)."""
        json = {}

        if self.method is not None:
            json["method"] = self.method

        if self.params is not None:
            json["params"] = self.params

        if self.result is not None or self.type == Message.T_ERROR:
            json["result"] = self.result

        if self.error is not None or self.type == Message.T_REPLY:
            json["error"] = self.error

        if self.id is not None or self.type == Message.T_NOTIFY:
            json["id"] = self.id

        return json

    def __str__(self):
        """Returns a one-line description listing the type followed by each
        member that is not None, for use in log messages."""
        s = [Message.type_to_string(self.type)]
        if self.method is not None:
            s.append("method=\"%s\"" % self.method)
        if self.params is not None:
            s.append("params=" + ovs.json.to_string(self.params))
        if self.result is not None:
            s.append("result=" + ovs.json.to_string(self.result))
        if self.error is not None:
            s.append("error=" + ovs.json.to_string(self.error))
        if self.id is not None:
            s.append("id=" + ovs.json.to_string(self.id))
        return ", ".join(s)
177
26bb0f31 178
99155935
BP
class Connection(object):
    """A JSON-RPC connection layered on top of a stream object.

    Attributes:
        name:   Copied from 'stream.name'; used as a prefix in log messages.
        stream: The underlying stream; set to None by close().
        status: 0 while the connection is healthy, otherwise the error that
                broke it (an errno value or EOF).  Once nonzero it never
                changes, and most methods become no-ops.
        input:  Received data not yet consumed by the JSON parser.
        output: Serialized messages not yet accepted by the stream.
        parser: JSON parser for the message currently being received, or
                None between messages.
    """

    def __init__(self, stream):
        self.name = stream.name
        self.stream = stream
        self.status = 0
        self.input = ""
        self.output = ""
        self.parser = None

    def close(self):
        """Closes the underlying stream and drops the reference to it."""
        self.stream.close()
        self.stream = None

    def run(self):
        """Sends as much queued output as the stream will accept.

        Does nothing once the connection has failed."""
        if self.status:
            return

        while self.output:
            # The stream's send() result is treated as a byte count when
            # non-negative and as a negated errno value otherwise.
            retval = self.stream.send(self.output)
            if retval >= 0:
                self.output = self.output[retval:]
            else:
                if retval != -errno.EAGAIN:
                    logging.warning("%s: send error: %s",
                                    self.name, os.strerror(-retval))
                    self.error(-retval)
                # EAGAIN or a hard error: either way, stop trying for now.
                break

    def wait(self, poller):
        """Arranges for 'poller' to wake up when run() has work to do."""
        if not self.status:
            self.stream.run_wait(poller)
            if self.output:
                # Pass the poller, as every other *_wait() call here does.
                self.stream.send_wait(poller)

    def get_status(self):
        """Returns 0 if the connection is healthy, otherwise the error that
        broke it."""
        return self.status

    def get_backlog(self):
        """Returns the length of the output still queued for sending, or 0
        if the connection has failed."""
        if self.status != 0:
            return 0
        return len(self.output)

    def __log_msg(self, title, msg):
        # Let the logging module do the formatting so that 'msg' is only
        # converted to a string when debug logging is actually enabled.
        logging.debug("%s: %s %s", self.name, title, msg)

    def send(self, msg):
        """Queues 'msg' for sending and tries to send it immediately if
        nothing else was queued ahead of it.

        Returns the connection status: 0 on success (the message may still
        be queued), otherwise the error that broke the connection."""
        if self.status:
            return self.status

        self.__log_msg("send", msg)

        was_empty = not self.output
        self.output += ovs.json.to_string(msg.to_json())
        if was_empty:
            self.run()
        return self.status

    def send_block(self, msg):
        """Sends 'msg', blocking until the output queue has drained or the
        connection fails.  Returns the connection status."""
        error = self.send(msg)
        if error:
            return error

        while True:
            self.run()
            if not self.get_backlog() or self.get_status():
                return self.status

            poller = ovs.poller.Poller()
            self.wait(poller)
            poller.block()

    def recv(self):
        """Tries to receive one message without blocking.

        Returns a tuple (error, msg).  On success 'error' is 0 and 'msg' is
        a Message.  Otherwise 'msg' is None and 'error' is errno.EAGAIN if
        no complete message is available yet, EOF if the peer closed the
        connection, or another error value if the connection failed."""
        if self.status:
            return self.status, None

        while True:
            if not self.input:
                error, data = self.stream.recv(4096)
                if error:
                    if error == errno.EAGAIN:
                        return error, None
                    else:
                        # XXX rate-limit
                        logging.warning("%s: receive error: %s",
                                        self.name, os.strerror(error))
                        self.error(error)
                        return self.status, None
                elif not data:
                    # No error and no data means end of stream.
                    self.error(EOF)
                    return EOF, None
                else:
                    self.input += data
            else:
                if self.parser is None:
                    self.parser = ovs.json.Parser()
                # feed() reports how much input it consumed; keep the rest
                # for the next message.
                self.input = self.input[self.parser.feed(self.input):]
                if self.parser.is_done():
                    msg = self.__process_msg()
                    if msg:
                        return 0, msg
                    else:
                        return self.status, None

    def recv_block(self):
        """Like recv(), but blocks instead of returning errno.EAGAIN."""
        while True:
            error, msg = self.recv()
            if error != errno.EAGAIN:
                return error, msg

            self.run()

            poller = ovs.poller.Poller()
            self.wait(poller)
            self.recv_wait(poller)
            poller.block()

    def transact_block(self, request):
        """Sends 'request' and blocks until its reply arrives.

        Returns a tuple (error, reply).  If 'error' is 0 then 'reply' is the
        reply or error reply (T_REPLY or T_ERROR) whose id matches
        'request.id'; callers should check 'reply.type'.  Messages received
        in the meantime that do not match are discarded."""
        id_ = request.id

        error = self.send(request)
        reply = None
        while not error:
            error, reply = self.recv_block()
            # An error reply also completes the transaction; waiting only
            # for T_REPLY would block until the connection dropped.
            if (reply
                and reply.type in (Message.T_REPLY, Message.T_ERROR)
                and reply.id == id_):
                break
        return error, reply

    def __process_msg(self):
        """Finishes the current parse and converts the result to a Message.

        Returns the Message, or None (after marking the connection failed
        with errno.EPROTO) if the input was not a valid JSON-RPC message."""
        json = self.parser.finish()
        self.parser = None
        # A string result is handled as a parse error message.  type(u"")
        # is 'unicode' on Python 2 and 'str' on Python 3.
        if isinstance(json, (str, type(u""))):
            # XXX rate-limit
            logging.warning("%s: error parsing stream: %s", self.name, json)
            self.error(errno.EPROTO)
            return

        msg = Message.from_json(json)
        if not isinstance(msg, Message):
            # XXX rate-limit
            logging.warning("%s: received bad JSON-RPC message: %s",
                            self.name, msg)
            self.error(errno.EPROTO)
            return

        self.__log_msg("received", msg)
        return msg

    def recv_wait(self, poller):
        """Arranges for 'poller' to wake up when recv() may make progress."""
        if self.status or self.input:
            # recv() can return right away: either with the saved error or
            # by parsing input that is already buffered.
            poller.immediate_wake()
        else:
            self.stream.recv_wait(poller)

    def error(self, error):
        """Marks the connection as failed with 'error', closes the stream
        and discards queued output.  Only the first error is recorded."""
        if self.status == 0:
            self.status = error
            self.stream.close()
            self.output = ""
26bb0f31
EJ
338
339
99155935
BP
class Session(object):
    """A JSON-RPC session with reconnection.

    Attributes:
        reconnect: The reconnect state machine that decides when to connect,
                   disconnect and probe.
        rpc:       The current Connection, or None if not connected.
        stream:    An active stream whose connection attempt is still in
                   progress, or None.
        pstream:   The listening passive stream, or None.
        seqno:     Counter incremented on each disconnect and each connect
                   attempt, so that callers can detect that the session
                   changed underneath them.
    """

    def __init__(self, reconnect, rpc):
        self.reconnect = reconnect
        self.rpc = rpc
        self.stream = None
        self.pstream = None
        self.seqno = 0

    @staticmethod
    def open(name):
        """Creates and returns a Session that maintains a JSON-RPC session to
        'name', which should be a string acceptable to ovs.stream.Stream or
        ovs.stream.PassiveStream's initializer.

        If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
        session connects and reconnects, with back-off, to 'name'.

        If 'name' is a passive connection method, e.g. "ptcp:", the new session
        listens for connections to 'name'.  It maintains at most one connection
        at any given time.  Any new connection causes the previous one (if any)
        to be dropped."""
        reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
        reconnect.set_name(name)
        reconnect.enable(ovs.timeval.msec())

        if ovs.stream.PassiveStream.is_valid_name(name):
            reconnect.set_passive(True, ovs.timeval.msec())

        return Session(reconnect, None)

    @staticmethod
    def open_unreliably(jsonrpc):
        """Creates and returns a Session wrapped around 'jsonrpc', an
        existing Connection.

        The reconnect state machine is set quiet, given a maximum of 0
        tries, and told that it is already connected."""
        reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
        reconnect.set_quiet(True)
        reconnect.set_name(jsonrpc.name)
        reconnect.set_max_tries(0)
        reconnect.connected(ovs.timeval.msec())
        return Session(reconnect, jsonrpc)

    def close(self):
        """Closes the connection, any pending stream and any listener."""
        if self.rpc is not None:
            self.rpc.close()
            self.rpc = None
        if self.stream is not None:
            self.stream.close()
            self.stream = None
        if self.pstream is not None:
            self.pstream.close()
            self.pstream = None

    def __disconnect(self):
        """Drops the current connection or in-progress connection attempt,
        if any, and bumps the sequence number.  The listener is kept."""
        if self.rpc is not None:
            self.rpc.error(EOF)
            self.rpc.close()
            self.rpc = None
            self.seqno += 1
        elif self.stream is not None:
            self.stream.close()
            self.stream = None
            self.seqno += 1

    def __connect(self):
        """Starts a new connection attempt (active mode) or starts listening
        if not already doing so (passive mode)."""
        self.__disconnect()

        name = self.reconnect.get_name()
        if not self.reconnect.is_passive():
            error, self.stream = ovs.stream.Stream.open(name)
            if not error:
                self.reconnect.connecting(ovs.timeval.msec())
            else:
                self.reconnect.connect_failed(ovs.timeval.msec(), error)
        elif self.pstream is None:
            # Open the listener only when there is none yet.  (Testing for
            # "is not None" here would mean the listener is never opened,
            # because 'pstream' starts out as None.)
            error, self.pstream = ovs.stream.PassiveStream.open(name)
            if not error:
                self.reconnect.listening(ovs.timeval.msec())
            else:
                self.reconnect.connect_failed(ovs.timeval.msec(), error)

        self.seqno += 1

    def run(self):
        """Performs periodic maintenance: accepts incoming connections,
        advances the current connection or connection attempt, and carries
        out whatever action the reconnect state machine requests."""
        if self.pstream is not None:
            error, stream = self.pstream.accept()
            if error == 0:
                if self.rpc or self.stream:
                    # XXX rate-limit
                    logging.info("%s: new connection replacing active "
                                 "connection" % self.reconnect.get_name())
                    self.__disconnect()
                self.reconnect.connected(ovs.timeval.msec())
                self.rpc = Connection(stream)
            elif error != errno.EAGAIN:
                self.reconnect.listen_error(ovs.timeval.msec(), error)
                self.pstream.close()
                self.pstream = None

        if self.rpc:
            self.rpc.run()
            error = self.rpc.get_status()
            if error != 0:
                self.reconnect.disconnected(ovs.timeval.msec(), error)
                self.__disconnect()
        elif self.stream is not None:
            self.stream.run()
            error = self.stream.connect()
            if error == 0:
                # Connection attempt finished: promote the stream to a
                # JSON-RPC connection.
                self.reconnect.connected(ovs.timeval.msec())
                self.rpc = Connection(self.stream)
                self.stream = None
            elif error != errno.EAGAIN:
                self.reconnect.connect_failed(ovs.timeval.msec(), error)
                self.stream.close()
                self.stream = None

        action = self.reconnect.run(ovs.timeval.msec())
        if action == ovs.reconnect.CONNECT:
            self.__connect()
        elif action == ovs.reconnect.DISCONNECT:
            self.reconnect.disconnected(ovs.timeval.msec(), 0)
            self.__disconnect()
        elif action == ovs.reconnect.PROBE:
            if self.rpc:
                # The fixed id "echo" lets recv() recognize and suppress
                # the reply to this probe.
                request = Message.create_request("echo", [])
                request.id = "echo"
                self.rpc.send(request)
        else:
            assert action is None

    def wait(self, poller):
        """Arranges for 'poller' to wake up when run() has work to do."""
        if self.rpc is not None:
            self.rpc.wait(poller)
        elif self.stream is not None:
            self.stream.run_wait(poller)
            self.stream.connect_wait(poller)
        if self.pstream is not None:
            self.pstream.wait(poller)
        self.reconnect.wait(poller, ovs.timeval.msec())

    def get_backlog(self):
        """Returns the current connection's send backlog, or 0 if not
        connected."""
        if self.rpc is not None:
            return self.rpc.get_backlog()
        return 0

    def get_name(self):
        """Returns the name held by the reconnect state machine."""
        return self.reconnect.get_name()

    def send(self, msg):
        """Sends 'msg' on the current connection.

        Returns the connection's status (0 on success), or errno.ENOTCONN if
        the session is not currently connected."""
        if self.rpc is not None:
            return self.rpc.send(msg)
        return errno.ENOTCONN

    def recv(self):
        """Tries to receive a message without blocking.

        Returns the received Message, or None if the session is not
        connected, nothing was received, or the message was an echo request
        or echo reply (which are handled here and not passed on)."""
        if self.rpc is not None:
            error, msg = self.rpc.recv()
            if not error:
                self.reconnect.received(ovs.timeval.msec())
                if msg.type == Message.T_REQUEST and msg.method == "echo":
                    # Echo request.  Send reply.
                    self.send(Message.create_reply(msg.params, msg.id))
                elif msg.type == Message.T_REPLY and msg.id == "echo":
                    # It's a reply to our echo request.  Suppress it.
                    pass
                else:
                    return msg
        return None

    def recv_wait(self, poller):
        """Arranges for 'poller' to wake up when recv() may make progress.
        Does nothing if the session is not connected."""
        if self.rpc is not None:
            self.rpc.recv_wait(poller)

    def is_alive(self):
        """Returns True if the session is connected, is connecting, or may
        still try to connect (the reconnect object's max_tries is None or
        positive); otherwise False."""
        if self.rpc is not None or self.stream is not None:
            return True
        max_tries = self.reconnect.get_max_tries()
        return max_tries is None or max_tries > 0

    def is_connected(self):
        """Returns True if the session currently has a connection."""
        return self.rpc is not None

    def get_seqno(self):
        """Returns the session's sequence number (see 'seqno' above)."""
        return self.seqno

    def force_reconnect(self):
        """Asks the reconnect state machine to force a reconnection."""
        self.reconnect.force_reconnect(ovs.timeval.msec())