git.proxmox.com Git - ovs.git/blob - tests/test-unixctl.py
Fix ovs-dpctl-top by removing 3 wrong hunks in py3-compat.patch.
[ovs.git] / tests / test-unixctl.py
1 # Copyright (c) 2012 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import argparse
16
17 import ovs.daemon
18 import ovs.unixctl
19 import ovs.unixctl.server
20
# Flag flipped to True by the "exit" unixctl command handler; main()'s run
# loop keeps going for as long as it stays False.
exiting = False

# Logger shared by the command handlers and main().
vlog = ovs.vlog.Vlog("test-unixctl")
23
24
def unixctl_exit(conn, unused_argv, aux):
    """Handle the "exit" command: request shutdown and acknowledge.

    Sets the module-level ``exiting`` flag to True and sends an empty
    (None) reply on ``conn``.  ``aux`` must be the string "aux_exit",
    which is the value this handler is registered with in main().
    """
    global exiting

    # Sanity-check that the registration data was passed through intact.
    assert aux == "aux_exit"

    exiting = True
    conn.reply(None)
31
32
def unixctl_echo(conn, argv, aux):
    """Handle the "echo" command: reply with ``str(argv)``.

    ``aux`` must be the string "aux_echo", which is the value this handler
    is registered with in main().
    """
    # Sanity-check that the registration data was passed through intact.
    assert aux == "aux_echo"

    echoed = str(argv)
    conn.reply(echoed)
36
37
def unixctl_echo_error(conn, argv, aux):
    """Handle the "echo_error" command: send ``str(argv)`` as an error reply.

    ``aux`` must be the string "aux_echo_error", which is the value this
    handler is registered with in main().
    """
    # Sanity-check that the registration data was passed through intact.
    assert aux == "aux_echo_error"

    echoed = str(argv)
    conn.reply_error(echoed)
41
42
def unixctl_log(conn, argv, unused_aux):
    """Handle the "log" command: log the first argument, then acknowledge.

    The first element of ``argv`` is converted to a string and written at
    INFO level through the module-level ``vlog``; an empty (None) reply is
    then sent on ``conn``.
    """
    message = str(argv[0])
    vlog.info(message)
    conn.reply(None)
46
47
def unixctl_block(conn, unused_argv, unused_aux):
    """Handle the "block" command by deliberately doing nothing.

    No reply is ever sent on ``conn``.
    NOTE(review): presumably this leaves the requesting client waiting so
    tests can exercise a blocked connection -- confirm against the tests.
    """
    return None
50
51
def main():
    """Run the Python unixctl test server until "exit" is requested.

    Steps, in order:
      1. Parse the command line (``--unixctl`` plus whatever options
         ovs.daemon and ovs.vlog add) and hand the result back to both
         modules.
      2. Start daemonizing, create the unixctl server, and abort through
         ovs.util.ovs_fatal() if creation reports an error.
      3. Register the test commands ("exit", "echo", "log", "echo_error",
         "block") and complete daemonizing.
      4. Run the server/poller loop until the module-level ``exiting`` flag
         becomes true, then close the server.
    """
    arg_parser = argparse.ArgumentParser(
        description="Open vSwitch unixctl test program for Python")
    arg_parser.add_argument("--unixctl",
                            help="UNIXCTL socket location or 'none'.")

    ovs.daemon.add_args(arg_parser)
    ovs.vlog.add_args(arg_parser)
    options = arg_parser.parse_args()
    ovs.daemon.handle_args(options)
    ovs.vlog.handle_args(options)

    ovs.daemon.daemonize_start()
    create_server = ovs.unixctl.server.UnixctlServer.create
    err, unixctl_server = create_server(options.unixctl)
    if err:
        failure = "could not create unixctl server at %s" % options.unixctl
        ovs.util.ovs_fatal(err, failure, vlog)

    # (name, usage, min_args, max_args, handler, aux) -- registered in the
    # order listed.
    command_table = (
        ("exit", "", 0, 0, unixctl_exit, "aux_exit"),
        ("echo", "[arg ...]", 1, 2, unixctl_echo, "aux_echo"),
        ("log", "[arg ...]", 1, 2, unixctl_log, None),
        ("echo_error", "[arg ...]", 1, 2, unixctl_echo_error,
         "aux_echo_error"),
        ("block", "", 0, 0, unixctl_block, None),
    )
    for command in command_table:
        ovs.unixctl.command_register(*command)
    ovs.daemon.daemonize_complete()

    vlog.info("Entering run loop.")
    loop_poller = ovs.poller.Poller()
    while not exiting:
        unixctl_server.run()
        unixctl_server.wait(loop_poller)
        if exiting:
            # NOTE(review): presumably makes the block() below return right
            # away once "exit" was handled during run() -- confirm against
            # ovs.poller.
            loop_poller.immediate_wake()
        loop_poller.block()
    unixctl_server.close()
87
88
# Run the test server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()