import ovs.poller
import ovs.stream
+
def handle_rpc(rpc, msg):
done = False
reply = None
else:
rpc.error(errno.EPROTO)
sys.stderr.write("unsolicited JSON-RPC reply or error\n")
-
+
if reply:
rpc.send(reply)
return done
+
def do_listen(name):
error, pstream = ovs.stream.PassiveStream.open(name)
if error:
poller.block()
pstream.close()
+
def do_request(name, method, params_string):
params = ovs.json.from_string(params_string)
msg = ovs.jsonrpc.Message.create_request(method, params)
if error:
sys.stderr.write("error waiting for reply: %s\n" % os.strerror(error))
sys.exit(1)
-
+
print ovs.json.to_string(msg.to_json())
rpc.close()
-
+
+
def do_notify(name, method, params_string):
params = ovs.json.from_string(params_string)
msg = ovs.jsonrpc.Message.create_notify(method, params)
rpc.close()
+
def main(argv):
try:
options, args = getopt.gnu_getopt(
func(*args)
+
def usage():
sys.stdout.write("""\
%s: JSON-RPC test utility for Python
""")
sys.exit(0)
+
if __name__ == '__main__':
main(sys.argv)
-