]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/micronet_cli.py
b6cba81d86e315c270c594f4ed8bf1b06dfce6d2
[mirror_frr.git] / tests / topotests / lib / micronet_cli.py
1 # -*- coding: utf-8 eval: (blacken-mode 1) -*-
2 #
3 # July 24 2021, Christian Hopps <chopps@labn.net>
4 #
5 # Copyright (c) 2021, LabN Consulting, L.L.C.
6 #
7 # This program is free software; you can redistribute it and/or
8 # modify it under the terms of the GNU General Public License
9 # as published by the Free Software Foundation; either version 2
10 # of the License, or (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License along
18 # with this program; see the file COPYING; if not, write to the Free Software
19 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 #
21 import argparse
22 import logging
23 import os
24 import pty
25 import re
26 import readline
27 import select
28 import socket
29 import subprocess
30 import sys
31 import tempfile
32 import termios
33 import tty
34
35
# Byte sequence cli_server() sends after the output of each command;
# cli_client() keeps reading until its receive buffer ends with it.
ENDMARKER = b"\x00END\x00"
37
38
def lineiter(sock):
    """Yield the text lines received on ``sock``, without their newline.

    Data is read with ``sock.recv(256)`` and decoded as UTF-8. Every complete
    line contained in the buffered data is yielded before the next ``recv``
    call, so several commands arriving in a single chunk are all delivered.

    Args:
        sock: any object with a ``recv(nbytes)`` method returning ``bytes``;
            an empty result is treated as end of stream.

    Yields:
        Each newline-terminated line as ``str`` (newline removed). Data left
        over at end of stream that was never newline-terminated is discarded.
    """
    import codecs  # function-scope import keeps this block self-contained

    # An incremental decoder copes with a multi-byte character that is split
    # across two recv() chunks; decoding each chunk on its own would raise.
    decoder = codecs.getincrementaldecoder("utf-8")()
    pending = ""
    while True:
        chunk = sock.recv(256)
        if not chunk:
            return

        pending += decoder.decode(chunk)
        # Drain *all* complete lines, not only the first one.
        while True:
            line, sep, rest = pending.partition("\n")
            if not sep:
                break
            yield line
            pending = rest
51
52
def spawn(unet, host, cmd):
    """Run ``cmd`` on ``host`` attached to a pseudo-terminal and relay its I/O.

    When stdin is a tty it is switched to raw mode for the duration of the
    command and restored afterwards. Bytes read from stdin are written to the
    pty master, and bytes read from the pty master are written to stdout.

    Args:
        unet: object whose ``hosts[host]`` provides ``popen()``.
        host: key into ``unet.hosts``.
        cmd: command passed unchanged to ``popen()``.
    """
    # Evaluate once so that save and restore can never disagree.
    stdin_is_tty = sys.stdin.isatty()
    if stdin_is_tty:
        old_tty = termios.tcgetattr(sys.stdin)
        tty.setraw(sys.stdin.fileno())

    master_fd = slave_fd = -1
    try:
        master_fd, slave_fd = pty.openpty()

        # use os.setsid() make it run in a new process group, or bash job
        # control will not be enabled
        p = unet.hosts[host].popen(
            cmd,
            preexec_fn=os.setsid,
            stdin=slave_fd,
            stdout=slave_fd,
            stderr=slave_fd,
            universal_newlines=True,
        )

        while p.poll() is None:
            ready, _, _ = select.select([sys.stdin, master_fd], [], [], 0.25)
            if sys.stdin in ready:
                data = os.read(sys.stdin.fileno(), 10240)
                os.write(master_fd, data)
            # Independent "if": pending output is not delayed by typed input.
            if master_fd in ready:
                out = os.read(master_fd, 10240)
                if out:
                    os.write(sys.stdout.fileno(), out)

        # The child may have written its last output between our final read
        # and its exit; flush whatever is still buffered in the pty.
        while select.select([master_fd], [], [], 0)[0]:
            try:
                out = os.read(master_fd, 10240)
            except OSError:
                break
            if not out:
                break
            os.write(sys.stdout.fileno(), out)
    finally:
        try:
            # Release both ends of the pty; they were previously leaked.
            for fd in (master_fd, slave_fd):
                if fd >= 0:
                    os.close(fd)
        finally:
            # restore tty settings back
            if stdin_is_tty:
                termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
84
85
def doline(unet, line, writef):
    """Execute a single CLI command line.

    Recognised commands are ``q``/``quit``, ``hosts``, ``t``/``term``,
    ``v``/``vtysh``, ``x``/``xterm``, ``sh`` and ``h``/``help``. Any other
    line is treated as ``[hosts] <vtysh-command>`` and run through
    ``vtysh -c`` on the selected hosts.

    Args:
        unet: object exposing a ``hosts`` mapping of host name to host object;
            host objects are used via ``cmd_legacy()`` and ``run_in_window()``.
        line: the raw command line; surrounding whitespace is ignored.
        writef: callable taking one ``str``; receives all textual output.

    Returns:
        ``False`` when the user asked to quit, ``True`` otherwise (including
        for blank lines and for reported usage errors).
    """

    def host_cmd_split(unet, cmd):
        """Split ``cmd`` into (leading host names, remaining command text).

        With no leading host names every host is selected, sorted by name.
        """
        words = cmd.split()
        # Count leading host names explicitly: this is well defined for an
        # empty command and for a command made up only of host names.
        nhosts = 0
        for word in words:
            if word not in unet.hosts:
                break
            nhosts += 1
        hosts = words[:nhosts]
        if not hosts:
            hosts = sorted(unet.hosts.keys())
        return hosts, " ".join(words[nhosts:])

    line = line.strip()
    m = re.match(r"^(\S+)(?:\s+(.*))?$", line)
    if not m:
        # Blank line: nothing to do.
        return True

    cmd = m.group(1)
    oargs = m.group(2) if m.group(2) else ""
    if cmd == "q" or cmd == "quit":
        return False
    if cmd == "hosts":
        writef("%% hosts: %s\n" % " ".join(sorted(unet.hosts.keys())))
    elif cmd in ["t", "term", "v", "vtysh", "x", "xterm"]:
        args = oargs.split()
        if not args or (len(args) == 1 and args[0] == "*"):
            args = sorted(unet.hosts.keys())
        # Report typos instead of letting a KeyError abort the CLI.
        unknown = [x for x in args if x not in unet.hosts]
        if unknown:
            writef("%% unknown host(s): %s\n" % " ".join(unknown))
        for name in args:
            if name not in unet.hosts:
                continue
            host = unet.hosts[name]
            if cmd == "t" or cmd == "term":
                host.run_in_window("bash")
            elif cmd == "v" or cmd == "vtysh":
                host.run_in_window("vtysh")
            else:
                host.run_in_window("bash", forcex=True)
    elif cmd == "sh":
        hosts, cmd = host_cmd_split(unet, oargs)
        if not cmd:
            writef("% sh: missing shell command\n")
            return True
        for host in hosts:
            if sys.stdin.isatty():
                spawn(unet, host, cmd)
            else:
                if len(hosts) > 1:
                    writef("------ Host: %s ------\n" % host)
                output = unet.hosts[host].cmd_legacy(cmd)
                writef(output)
                if len(hosts) > 1:
                    writef("------- End: %s ------\n" % host)
        writef("\n")
    elif cmd == "h" or cmd == "help":
        writef(
            """
Commands:
  help                       :: this help
  hosts                      :: list the host names
  quit                       :: exit the CLI
  sh [hosts] <shell-command> :: execute <shell-command> on <host>
  term [hosts]               :: open shell terminals for hosts
  vtysh [hosts]              :: open vtysh terminals for hosts
  xterm [hosts]              :: open xterm shell terminals for hosts
  [hosts] <vtysh-command>    :: execute vtysh-command on hosts\n\n"""
        )
    else:
        hosts, cmd = host_cmd_split(unet, line)
        if not cmd:
            writef("% missing vtysh command\n")
            return True
        for host in hosts:
            if len(hosts) > 1:
                writef("------ Host: %s ------\n" % host)
            output = unet.hosts[host].cmd_legacy('vtysh -c "{}"'.format(cmd))
            writef(output)
            if len(hosts) > 1:
                writef("------- End: %s ------\n" % host)
        writef("\n")
    return True
155
156
def cli_server_setup(unet):
    """Create the listening unix socket used by the CLI server.

    A fresh temporary directory is created and a stream socket is bound to
    ``cli-server.sock`` inside it, with a 10 second timeout and a backlog
    of 1.

    Args:
        unet: object providing ``cmd_status()``; only used on failure, to
            remove the temporary directory.

    Returns:
        Tuple ``(sock, sockdir, sockpath)``: the listening socket, the
        temporary directory and the socket path inside it.

    Raises:
        Exception: whatever socket creation/bind/listen raised, after the
            socket has been closed and directory removal has been requested.
    """
    sockdir = tempfile.mkdtemp("-sockdir", "pyt")
    sockpath = os.path.join(sockdir, "cli-server.sock")
    sock = None
    try:
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.settimeout(10)
        sock.bind(sockpath)
        sock.listen(1)
        return sock, sockdir, sockpath
    except Exception:
        # Do not leak the descriptor when setup fails part-way through.
        if sock is not None:
            sock.close()
        unet.cmd_status("rm -rf " + sockdir)
        raise
169
170
def cli_server(unet, server_sock):
    """Serve CLI commands for a single client connection.

    Accepts one connection on ``server_sock``, then for every line received
    runs it through ``doline()`` and sends ``ENDMARKER`` once that command's
    output is complete. Returns when the client disconnects or when
    ``doline()`` reports a quit; in the quit case no end marker is sent.

    Args:
        unet: passed through to ``doline()``.
        server_sock: listening socket, e.g. from ``cli_server_setup()``.
    """
    sock, _addr = server_sock.accept()

    def writef(x):
        # sendall(): a plain send() may transmit only part of large output.
        sock.sendall(x.encode("utf-8"))

    try:
        # Drop the timeout: from here on block indefinitely waiting for input.
        sock.settimeout(None)

        for line in lineiter(sock):
            if not doline(unet, line.strip(), writef):
                return
            sock.sendall(ENDMARKER)
    finally:
        # Closing promptly lets the client observe end-of-stream after a quit.
        sock.close()
185
186
def cli_client(sockpath, prompt="unet> "):
    """Interactive client: forward typed commands to a CLI server socket.

    Connects to the unix stream socket at ``sockpath``, then repeatedly reads
    a line from the user, sends it newline-terminated, and prints everything
    received up to (but excluding) ``ENDMARKER``.

    Args:
        sockpath: filesystem path of the server's unix socket.
        prompt: prompt string shown for each input line.

    Returns:
        ``None`` once the server closes the connection.

    Raises:
        EOFError: propagated from ``input()`` when the user closes stdin.
    """
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.settimeout(10)
        sock.connect(sockpath)

        # Connected: drop the timeout and block indefinitely from now on.
        sock.settimeout(None)

        print("\n--- Micronet CLI Starting ---\n\n")
        while True:
            if sys.version_info[0] == 2:
                line = raw_input(prompt)  # pylint: disable=E0602
            else:
                line = input(prompt)
            if line is None:
                return

            # Send the CLI command; input() strips the newline so put it back.
            # sendall() guards against a partial send.
            sock.sendall((line + "\n").encode("utf-8"))

            # Collect the output until the end marker arrives.
            rb = b""
            while not rb.endswith(ENDMARKER):
                lb = sock.recv(4096)
                if not lb:
                    # Server went away (e.g. after a quit command).
                    return
                rb += lb

            # Write the output with the marker removed.
            sys.stdout.write(rb[: -len(ENDMARKER)].decode("utf-8"))
    finally:
        sock.close()
227
228
def local_cli(unet, outf, prompt="unet> "):
    """Run the CLI read/execute loop in the current process.

    Each line typed at ``prompt`` is handed to ``doline()`` together with
    ``outf.write`` as the output function. The loop ends when ``doline()``
    returns a false value.

    Args:
        unet: passed through to ``doline()``.
        outf: object whose ``write`` method receives command output.
        prompt: prompt string shown for each input line.
    """
    print("\n--- Micronet CLI Starting ---\n\n")
    keep_going = True
    while keep_going:
        if sys.version_info[0] == 2:
            entered = raw_input(prompt)  # pylint: disable=E0602
        else:
            entered = input(prompt)
        # Stop on a missing line or when the command asked to quit.
        keep_going = entered is not None and doline(unet, entered, outf.write)
240
241
def cli(unet, histfile=None, sockpath=None, force_window=False, title=None, prompt=None, background=True):
    """Run the micronet CLI.

    Three modes are selected from the arguments:

    * ``force_window`` set, or stdin is not a tty: start a CLI server socket,
      launch this file as a client via ``unet.run_in_window()`` and serve
      that client until it quits.
    * ``sockpath`` given: act as the client of an existing server socket.
    * otherwise: run the CLI locally on stdin/stdout.

    Args:
        unet: the micronet object, or a false value when running as the
            stand-alone client.
        histfile: readline history file; defaults to
            ``~/.micronet-history.txt``.
        sockpath: server socket path for client mode.
        force_window: always run the CLI in a separate window.
        title: window title; also used as the client's prompt text.
        prompt: prompt string; defaults to ``"unet> "``.
        background: passed through to ``unet.run_in_window()``.

    Returns:
        The result of ``cli_server()`` in window mode, otherwise ``None``.
    """
    # ``logger`` exists as a module global only when this file is executed as
    # a script; fall back to a module logger when imported.
    log = globals().get("logger") or logging.getLogger(__name__)

    if prompt is None:
        prompt = "unet> "

    if force_window or not sys.stdin.isatty():
        # Run CLI in another window b/c we have no tty.
        sock, sockdir, sockpath = cli_server_setup(unet)

        python_path = unet.get_exec_path(["python3", "python"])
        us = os.path.realpath(__file__)
        # NOTE(review): values are interpolated unquoted; confirm that
        # run_in_window() copes with spaces in title/histfile.
        cmd = "{} {}".format(python_path, us)
        if histfile:
            cmd += " --histfile=" + histfile
        if title:
            # Use the option's full name rather than an argparse abbreviation.
            cmd += " --prompt-text={}".format(title)
        cmd += " " + sockpath

        try:
            unet.run_in_window(cmd, new_window=True, title=title, background=background)
            return cli_server(unet, sock)
        finally:
            sock.close()
            unet.cmd_status("rm -rf " + sockdir)

    if not unet:
        log.debug("client-cli using sockpath %s", sockpath)

    # Loading history is best-effort: never fail the CLI because of it.
    try:
        if histfile is None:
            histfile = os.path.expanduser("~/.micronet-history.txt")
            if not os.path.exists(histfile):
                if unet:
                    unet.cmd("touch " + histfile)
                else:
                    # Create the file directly; running the string
                    # "touch <path>" without a shell can never succeed.
                    open(histfile, "a").close()
        if histfile:
            readline.read_history_file(histfile)
    except Exception as ex:
        log.debug("cli: could not load history from %s: %s", histfile, ex)

    try:
        if sockpath:
            cli_client(sockpath, prompt=prompt)
        else:
            local_cli(unet, sys.stdout, prompt=prompt)
    except EOFError:
        # User closed stdin (e.g. Ctrl-D): normal way to leave the CLI.
        pass
    except Exception as ex:
        log.critical("cli: got exception: %s", ex, exc_info=True)
        raise
    finally:
        # Saving history must not mask an exception raised above.
        if histfile:
            try:
                readline.write_history_file(histfile)
            except (IOError, OSError) as ex:
                log.warning("cli: could not save history to %s: %s", histfile, ex)
293
294
if __name__ == "__main__":
    # Stand-alone client entry point: connect to the CLI server socket given
    # on the command line and run the interactive client against it.
    logging.basicConfig(level=logging.DEBUG, filename="/tmp/topotests/cli-client.log")
    # cli() looks this module-level name up when running as a script.
    logger = logging.getLogger("cli-client")
    logger.info("Start logging cli-client")

    parser = argparse.ArgumentParser()
    parser.add_argument("--histfile", help="file to use for history")
    parser.add_argument("--prompt-text", help="prompt string to use")
    parser.add_argument("socket", help="path of the unix socket to communicate over")
    args = parser.parse_args()

    prompt = "{}> ".format(args.prompt_text) if args.prompt_text else "unet> "
    # unet=None selects client mode in cli().
    cli(None, args.histfile, args.socket, prompt=prompt)