]>
git.proxmox.com Git - mirror_ovs.git/blob - python/ovs/jsonrpc.py
1 # Copyright (c) 2010, 2011 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
28 class Message(object):
29 T_REQUEST
= 0 # Request.
30 T_NOTIFY
= 1 # Notification.
31 T_REPLY
= 2 # Successful reply.
32 T_ERROR
= 3 # Error reply.
34 __types
= {T_REQUEST
: "request",
35 T_NOTIFY
: "notification",
39 def __init__(self
, type_
, method
, params
, result
, error
, id):
51 this_id
= Message
._next
_id
56 def create_request(method
, params
):
57 return Message(Message
.T_REQUEST
, method
, params
, None, None,
61 def create_notify(method
, params
):
62 return Message(Message
.T_NOTIFY
, method
, params
, None, None,
66 def create_reply(result
, id):
67 return Message(Message
.T_REPLY
, None, None, result
, None, id)
70 def create_error(error
, id):
71 return Message(Message
.T_ERROR
, None, None, None, error
, id)
74 def type_to_string(type_
):
75 return Message
.__types
[type_
]
77 def __validate_arg(self
, value
, name
, must_have
):
78 if (value
is not None) == (must_have
!= 0):
81 type_name
= Message
.type_to_string(self
.type)
86 return "%s %s have \"%s\"" % (type_name
, verb
, name
)
89 if self
.params
is not None and type(self
.params
) != list:
90 return "\"params\" must be JSON array"
92 pattern
= {Message
.T_REQUEST
: 0x11001,
93 Message
.T_NOTIFY
: 0x11000,
94 Message
.T_REPLY
: 0x00101,
95 Message
.T_ERROR
: 0x00011}.get(self
.type)
97 return "invalid JSON-RPC message type %s" % self
.type
100 self
.__validate
_arg
(self
.method
, "method", pattern
& 0x10000) or
101 self
.__validate
_arg
(self
.params
, "params", pattern
& 0x1000) or
102 self
.__validate
_arg
(self
.result
, "result", pattern
& 0x100) or
103 self
.__validate
_arg
(self
.error
, "error", pattern
& 0x10) or
104 self
.__validate
_arg
(self
.id, "id", pattern
& 0x1))
108 if type(json
) != dict:
109 return "message is not a JSON object"
111 # Make a copy to avoid modifying the caller's dict.
115 method
= json
.pop("method")
116 if type(method
) not in [str, unicode]:
117 return "method is not a JSON string"
121 params
= json
.pop("params", None)
122 result
= json
.pop("result", None)
123 error
= json
.pop("error", None)
124 id_
= json
.pop("id", None)
126 return "message has unexpected member \"%s\"" % json
.popitem()[0]
128 if result
is not None:
129 msg_type
= Message
.T_REPLY
130 elif error
is not None:
131 msg_type
= Message
.T_ERROR
132 elif id_
is not None:
133 msg_type
= Message
.T_REQUEST
135 msg_type
= Message
.T_NOTIFY
137 msg
= Message(msg_type
, method
, params
, result
, error
, id_
)
138 validation_error
= msg
.is_valid()
139 if validation_error
is not None:
140 return validation_error
147 if self
.method
is not None:
148 json
["method"] = self
.method
150 if self
.params
is not None:
151 json
["params"] = self
.params
153 if self
.result
is not None or self
.type == Message
.T_ERROR
:
154 json
["result"] = self
.result
156 if self
.error
is not None or self
.type == Message
.T_REPLY
:
157 json
["error"] = self
.error
159 if self
.id is not None or self
.type == Message
.T_NOTIFY
:
165 s
= [Message
.type_to_string(self
.type)]
166 if self
.method
is not None:
167 s
.append("method=\"%s\"" % self
.method
)
168 if self
.params
is not None:
169 s
.append("params=" + ovs
.json
.to_string(self
.params
))
170 if self
.result
is not None:
171 s
.append("result=" + ovs
.json
.to_string(self
.result
))
172 if self
.error
is not None:
173 s
.append("error=" + ovs
.json
.to_string(self
.error
))
174 if self
.id is not None:
175 s
.append("id=" + ovs
.json
.to_string(self
.id))
179 class Connection(object):
180 def __init__(self
, stream
):
181 self
.name
= stream
.name
196 while len(self
.output
):
197 retval
= self
.stream
.send(self
.output
)
199 self
.output
= self
.output
[retval
:]
201 if retval
!= -errno
.EAGAIN
:
202 logging
.warn("%s: send error: %s" % (self
.name
,
203 os
.strerror(-retval
)))
207 def wait(self
, poller
):
209 self
.stream
.run_wait(poller
)
211 self
.stream
.send_wait()
213 def get_status(self
):
216 def get_backlog(self
):
220 return len(self
.output
)
222 def __log_msg(self
, title
, msg
):
223 logging
.debug("%s: %s %s" % (self
.name
, title
, msg
))
229 self
.__log
_msg
("send", msg
)
231 was_empty
= len(self
.output
) == 0
232 self
.output
+= ovs
.json
.to_string(msg
.to_json())
237 def send_block(self
, msg
):
238 error
= self
.send(msg
)
244 if not self
.get_backlog() or self
.get_status():
247 poller
= ovs
.poller
.Poller()
253 return self
.status
, None
257 error
, data
= self
.stream
.recv(4096)
259 if error
== errno
.EAGAIN
:
263 logging
.warning("%s: receive error: %s"
264 % (self
.name
, os
.strerror(error
)))
266 return self
.status
, None
273 if self
.parser
is None:
274 self
.parser
= ovs
.json
.Parser()
275 self
.input = self
.input[self
.parser
.feed(self
.input):]
276 if self
.parser
.is_done():
277 msg
= self
.__process
_msg
()
281 return self
.status
, None
283 def recv_block(self
):
285 error
, msg
= self
.recv()
286 if error
!= errno
.EAGAIN
:
291 poller
= ovs
.poller
.Poller()
293 self
.recv_wait(poller
)
296 def transact_block(self
, request
):
299 error
= self
.send(request
)
302 error
, reply
= self
.recv_block()
303 if reply
and reply
.type == Message
.T_REPLY
and reply
.id == id_
:
307 def __process_msg(self
):
308 json
= self
.parser
.finish()
310 if type(json
) in [str, unicode]:
312 logging
.warning("%s: error parsing stream: %s" % (self
.name
, json
))
313 self
.error(errno
.EPROTO
)
316 msg
= Message
.from_json(json
)
317 if not isinstance(msg
, Message
):
319 logging
.warning("%s: received bad JSON-RPC message: %s"
321 self
.error(errno
.EPROTO
)
324 self
.__log
_msg
("received", msg
)
327 def recv_wait(self
, poller
):
328 if self
.status
or self
.input:
329 poller
.immediate_wake()
331 self
.stream
.recv_wait(poller
)
333 def error(self
, error
):
340 class Session(object):
341 """A JSON-RPC session with reconnection."""
343 def __init__(self
, reconnect
, rpc
):
344 self
.reconnect
= reconnect
352 """Creates and returns a Session that maintains a JSON-RPC session to
353 'name', which should be a string acceptable to ovs.stream.Stream or
354 ovs.stream.PassiveStream's initializer.
356 If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
357 session connects and reconnects, with back-off, to 'name'.
359 If 'name' is a passive connection method, e.g. "ptcp:", the new session
360 listens for connections to 'name'. It maintains at most one connection
361 at any given time. Any new connection causes the previous one (if any)
363 reconnect
= ovs
.reconnect
.Reconnect(ovs
.timeval
.msec())
364 reconnect
.set_name(name
)
365 reconnect
.enable(ovs
.timeval
.msec())
367 if ovs
.stream
.PassiveStream
.is_valid_name(name
):
368 reconnect
.set_passive(True, ovs
.timeval
.msec())
370 return Session(reconnect
, None)
373 def open_unreliably(jsonrpc
):
374 reconnect
= ovs
.reconnect
.Reconnect(ovs
.timeval
.msec())
375 reconnect
.set_quiet(True)
376 reconnect
.set_name(jsonrpc
.name
)
377 reconnect
.set_max_tries(0)
378 reconnect
.connected(ovs
.timeval
.msec())
379 return Session(reconnect
, jsonrpc
)
382 if self
.rpc
is not None:
385 if self
.stream
is not None:
388 if self
.pstream
is not None:
392 def __disconnect(self
):
393 if self
.rpc
is not None:
398 elif self
.stream
is not None:
406 name
= self
.reconnect
.get_name()
407 if not self
.reconnect
.is_passive():
408 error
, self
.stream
= ovs
.stream
.Stream
.open(name
)
410 self
.reconnect
.connecting(ovs
.timeval
.msec())
412 self
.reconnect
.connect_failed(ovs
.timeval
.msec(), error
)
413 elif self
.pstream
is not None:
414 error
, self
.pstream
= ovs
.stream
.PassiveStream
.open(name
)
416 self
.reconnect
.listening(ovs
.timeval
.msec())
418 self
.reconnect
.connect_failed(ovs
.timeval
.msec(), error
)
423 if self
.pstream
is not None:
424 error
, stream
= self
.pstream
.accept()
426 if self
.rpc
or self
.stream
:
428 logging
.info("%s: new connection replacing active "
429 "connection" % self
.reconnect
.get_name())
431 self
.reconnect
.connected(ovs
.timeval
.msec())
432 self
.rpc
= Connection(stream
)
433 elif error
!= errno
.EAGAIN
:
434 self
.reconnect
.listen_error(ovs
.timeval
.msec(), error
)
440 error
= self
.rpc
.get_status()
442 self
.reconnect
.disconnected(ovs
.timeval
.msec(), error
)
444 elif self
.stream
is not None:
446 error
= self
.stream
.connect()
448 self
.reconnect
.connected(ovs
.timeval
.msec())
449 self
.rpc
= Connection(self
.stream
)
451 elif error
!= errno
.EAGAIN
:
452 self
.reconnect
.connect_failed(ovs
.timeval
.msec(), error
)
456 action
= self
.reconnect
.run(ovs
.timeval
.msec())
457 if action
== ovs
.reconnect
.CONNECT
:
459 elif action
== ovs
.reconnect
.DISCONNECT
:
460 self
.reconnect
.disconnected(ovs
.timeval
.msec(), 0)
462 elif action
== ovs
.reconnect
.PROBE
:
464 request
= Message
.create_request("echo", [])
466 self
.rpc
.send(request
)
468 assert action
== None
470 def wait(self
, poller
):
471 if self
.rpc
is not None:
472 self
.rpc
.wait(poller
)
473 elif self
.stream
is not None:
474 self
.stream
.run_wait(poller
)
475 self
.stream
.connect_wait(poller
)
476 if self
.pstream
is not None:
477 self
.pstream
.wait(poller
)
478 self
.reconnect
.wait(poller
, ovs
.timeval
.msec())
480 def get_backlog(self
):
481 if self
.rpc
is not None:
482 return self
.rpc
.get_backlog()
487 return self
.reconnect
.get_name()
490 if self
.rpc
is not None:
491 return self
.rpc
.send(msg
)
493 return errno
.ENOTCONN
496 if self
.rpc
is not None:
497 error
, msg
= self
.rpc
.recv()
499 self
.reconnect
.received(ovs
.timeval
.msec())
500 if msg
.type == Message
.T_REQUEST
and msg
.method
== "echo":
501 # Echo request. Send reply.
502 self
.send(Message
.create_reply(msg
.params
, msg
.id))
503 elif msg
.type == Message
.T_REPLY
and msg
.id == "echo":
504 # It's a reply to our echo request. Suppress it.
510 def recv_wait(self
, poller
):
511 if self
.rpc
is not None:
512 self
.rpc
.recv_wait(poller
)
515 if self
.rpc
is not None or self
.stream
is not None:
518 max_tries
= self
.reconnect
.get_max_tries()
519 return max_tries
is None or max_tries
> 0
521 def is_connected(self
):
522 return self
.rpc
is not None
527 def force_reconnect(self
):
528 self
.reconnect
.force_reconnect(ovs
.timeval
.msec())