]>
git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/micronet_cli.py
b6cba81d86e315c270c594f4ed8bf1b06dfce6d2
1 # -*- coding: utf-8 eval: (blacken-mode 1) -*-
3 # July 24 2021, Christian Hopps <chopps@labn.net>
5 # Copyright (c) 2021, LabN Consulting, L.L.C.
7 # This program is free software; you can redistribute it and/or
8 # modify it under the terms of the GNU General Public License
9 # as published by the Free Software Foundation; either version 2
10 # of the License, or (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License along
18 # with this program; see the file COPYING; if not, write to the Free Software
19 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
# Byte sequence that terminates a reply on the CLI socket: the client keeps
# receiving until its buffer ends with this marker, then strips the marker
# before printing the reply.
ENDMARKER = b"\x00END\x00"
46 s
+= sb
.decode("utf-8")
53 def spawn(unet
, host
, cmd
):
54 if sys
.stdin
.isatty():
55 old_tty
= termios
.tcgetattr(sys
.stdin
)
56 tty
.setraw(sys
.stdin
.fileno())
58 master_fd
, slave_fd
= pty
.openpty()
60 # use os.setsid() make it run in a new process group, or bash job
61 # control will not be enabled
62 p
= unet
.hosts
[host
].popen(
68 universal_newlines
=True,
71 while p
.poll() is None:
72 r
, w
, e
= select
.select([sys
.stdin
, master_fd
], [], [], 0.25)
74 d
= os
.read(sys
.stdin
.fileno(), 10240)
75 os
.write(master_fd
, d
)
77 o
= os
.read(master_fd
, 10240)
79 os
.write(sys
.stdout
.fileno(), o
)
81 # restore tty settings back
82 if sys
.stdin
.isatty():
83 termios
.tcsetattr(sys
.stdin
, termios
.TCSADRAIN
, old_tty
)
86 def doline(unet
, line
, writef
):
87 def host_cmd_split(unet
, cmd
):
89 for i
, e
in enumerate(csplit
):
90 if e
not in unet
.hosts
:
94 hosts
= sorted(unet
.hosts
.keys())
95 cmd
= " ".join(csplit
[i
:])
99 m
= re
.match(r
"^(\S+)(?:\s+(.*))?$", line
)
104 oargs
= m
.group(2) if m
.group(2) else ""
105 if cmd
== "q" or cmd
== "quit":
108 writef("%% hosts: %s\n" % " ".join(sorted(unet
.hosts
.keys())))
109 elif cmd
in ["term", "vtysh", "xterm"]:
111 if not args
or (len(args
) == 1 and args
[0] == "*"):
112 args
= sorted(unet
.hosts
.keys())
113 hosts
= [unet
.hosts
[x
] for x
in args
]
115 if cmd
== "t" or cmd
== "term":
116 host
.run_in_window("bash")
117 elif cmd
== "v" or cmd
== "vtysh":
118 host
.run_in_window("vtysh")
119 elif cmd
== "x" or cmd
== "xterm":
120 host
.run_in_window("bash", forcex
=True)
122 hosts
, cmd
= host_cmd_split(unet
, oargs
)
124 if sys
.stdin
.isatty():
125 spawn(unet
, host
, cmd
)
128 writef("------ Host: %s ------\n" % host
)
129 output
= unet
.hosts
[host
].cmd_legacy(cmd
)
132 writef("------- End: %s ------\n" % host
)
134 elif cmd
== "h" or cmd
== "help":
139 sh [hosts] <shell-command> :: execute <shell-command> on <host>
140 term [hosts] :: open shell terminals for hosts
141 vtysh [hosts] :: open vtysh terminals for hosts
142 [hosts] <vtysh-command> :: execute vtysh-command on hosts\n\n"""
145 hosts
, cmd
= host_cmd_split(unet
, line
)
148 writef("------ Host: %s ------\n" % host
)
149 output
= unet
.hosts
[host
].cmd_legacy('vtysh -c "{}"'.format(cmd
))
152 writef("------- End: %s ------\n" % host
)
157 def cli_server_setup(unet
):
158 sockdir
= tempfile
.mkdtemp("-sockdir", "pyt")
159 sockpath
= os
.path
.join(sockdir
, "cli-server.sock")
161 sock
= socket
.socket(socket
.AF_UNIX
, socket
.SOCK_STREAM
)
165 return sock
, sockdir
, sockpath
167 unet
.cmd_status("rm -rf " + sockdir
)
171 def cli_server(unet
, server_sock
):
172 sock
, addr
= server_sock
.accept()
174 # Go into fully blocking mode now (settimeout(None) removes the timeout)
175 sock
.settimeout(None)
177 for line
in lineiter(sock
):
180 xb
= x
.encode("utf-8")
182 if not doline(unet
, line
, writef
):
187 def cli_client(sockpath
, prompt
="unet> "):
188 sock
= socket
.socket(socket
.AF_UNIX
, socket
.SOCK_STREAM
)
190 sock
.connect(sockpath
)
192 # Go into fully blocking mode now (settimeout(None) removes the timeout)
193 sock
.settimeout(None)
195 print("\n--- Micronet CLI Starting ---\n\n")
197 if sys
.version_info
[0] == 2:
198 line
= raw_input(prompt
) # pylint: disable=E0602
204 # Need to put \n back
207 # Send the CLI command
208 sock
.send(line
.encode("utf-8"))
210 def bendswith(b
, sentinel
):
212 return len(b
) >= slen
and b
[-slen
:] == sentinel
216 while not bendswith(rb
, ENDMARKER
):
223 rb
= rb
[:-len(ENDMARKER
)]
226 sys
.stdout
.write(rb
.decode("utf-8"))
229 def local_cli(unet
, outf
, prompt
="unet> "):
230 print("\n--- Micronet CLI Starting ---\n\n")
232 if sys
.version_info
[0] == 2:
233 line
= raw_input(prompt
) # pylint: disable=E0602
238 if not doline(unet
, line
, outf
.write
):
242 def cli(unet
, histfile
=None, sockpath
=None, force_window
=False, title
=None, prompt
=None, background
=True):
246 if force_window
or not sys
.stdin
.isatty():
247 # Run CLI in another window b/c we have no tty.
248 sock
, sockdir
, sockpath
= cli_server_setup(unet
)
250 python_path
= unet
.get_exec_path(["python3", "python"])
251 us
= os
.path
.realpath(__file__
)
252 cmd
= "{} {}".format(python_path
, us
)
254 cmd
+= " --histfile=" + histfile
256 cmd
+= " --prompt={}".format(title
)
257 cmd
+= " " + sockpath
260 unet
.run_in_window(cmd
, new_window
=True, title
=title
, background
=background
)
261 return cli_server(unet
, sock
)
263 unet
.cmd_status("rm -rf " + sockdir
)
266 logger
.debug("client-cli using sockpath %s", sockpath
)
270 histfile
= os
.path
.expanduser("~/.micronet-history.txt")
271 if not os
.path
.exists(histfile
):
273 unet
.cmd("touch " + histfile
)
275 subprocess
.run("touch " + histfile
)
277 readline
.read_history_file(histfile
)
283 cli_client(sockpath
, prompt
=prompt
)
285 local_cli(unet
, sys
.stdout
, prompt
=prompt
)
288 except Exception as ex
:
289 logger
.critical("cli: got exception: %s", ex
, exc_info
=True)
292 readline
.write_history_file(histfile
)
if __name__ == "__main__":
    # Stand-alone client entry point: cli() launches this file in a separate
    # window and passes it the path of the server socket to connect to.
    log_path = "/tmp/topotests/cli-client.log"

    # basicConfig() opens the log file right away and raises if its directory
    # is missing, so make sure the directory exists first.  Written without
    # exist_ok so it also works on Python 2, which this file still branches on.
    log_dir = os.path.dirname(log_path)
    try:
        os.makedirs(log_dir)
    except OSError:
        if not os.path.isdir(log_dir):
            raise

    logging.basicConfig(level=logging.DEBUG, filename=log_path)
    # Module-level on purpose: cli() logs through the global name `logger`.
    logger = logging.getLogger("cli-client")
    logger.info("Start logging cli-client")

    parser = argparse.ArgumentParser()
    parser.add_argument("--histfile", help="file to use for history")
    # NOTE: cli() passes this option as "--prompt=..."; that relies on
    # argparse accepting an unambiguous prefix of a long option.
    parser.add_argument("--prompt-text", help="prompt string to use")
    parser.add_argument("socket", help="path to pair of sockets to communicate over")
    args = parser.parse_args()

    # Fall back to the default prompt when no prompt text was supplied.
    prompt = "{}> ".format(args.prompt_text) if args.prompt_text else "unet> "
    cli(None, args.histfile, args.socket, prompt=prompt)