# -*- coding: utf-8 eval: (blacken-mode 1) -*-
# SPDX-License-Identifier: GPL-2.0-or-later
#
# September 30 2021, Christian Hopps <chopps@labn.net>
#
# Copyright 2021, LabN Consulting, L.L.C.
#
"""A module that implements the standalone parser."""
import asyncio
import importlib.resources
import json
import logging
import logging.config
import os
import subprocess
import sys
import tempfile

from pathlib import Path


try:
    import jsonschema  # pylint: disable=C0415
    import jsonschema.validators  # pylint: disable=C0415

    from jsonschema.exceptions import ValidationError  # pylint: disable=C0415
except ImportError:
    jsonschema = None

from .config import list_to_dict_with_key
from .native import Munet


def get_schema():
    if get_schema.schema is None:
        with importlib.resources.path("munet", "munet-schema.json") as datapath:
            search = [str(datapath.parent)]
        get_schema.schema = get_config(basename="munet-schema", search=search)
    return get_schema.schema


get_schema.schema = None
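
# Illustrative note (not part of the upstream module): get_schema() memoizes the
# parsed schema on a function attribute, so repeated calls reuse one dict:
#
#     schema = get_schema()
#     assert get_schema() is schema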

project_root_contains = [
    ".git",
    "pyproject.toml",
    "tox.ini",
    "setup.cfg",
    "setup.py",
    "pytest.ini",
    ".projectile",
]


def is_project_root(path: Path) -> bool:

    for contains in project_root_contains:
        if path.joinpath(contains).exists():
            return True
    return False


def find_project_root(config_path: Path, project_root=None):
    if project_root is not None:
        project_root = Path(project_root)
        if project_root in config_path.parents:
            return project_root
        logging.warning(
            "project_root %s is not a common ancestor of config file %s",
            project_root,
            config_path,
        )
        return config_path.parent
    for ppath in config_path.parents:
        if is_project_root(ppath):
            return ppath
    return config_path.parent
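
# Illustrative sketch (not part of the upstream module); the paths are hypothetical.
# find_project_root() walks the config file's parent directories and stops at the
# first one containing an entry from project_root_contains:
#
#     root = find_project_root(Path("/work/proj/tests/munet.yaml"))
#     # root == Path("/work/proj"), assuming /work/proj/.git exists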


def get_config(pathname=None, basename="munet", search=None, logf=logging.debug):

    cwd = os.getcwd()

    if not search:
        search = [cwd]
    elif isinstance(search, (str, Path)):
        search = [search]

    if pathname:
        pathname = os.path.join(cwd, pathname)
        if not os.path.exists(pathname):
            raise FileNotFoundError(pathname)
    else:
        for d in search:
            logf("%s", f'searching in "{d}" for "{basename}".{{yaml, toml, json}}')
            for ext in ("yaml", "toml", "json"):
                pathname = os.path.join(d, basename + "." + ext)
                if os.path.exists(pathname):
                    logf("%s", f'Found "{pathname}"')
                    break
            else:
                continue
            break
        else:
            raise FileNotFoundError(basename + ".{json,toml,yaml} in " + f"{search}")

    _, ext = pathname.rsplit(".", 1)

    if ext == "json":
        config = json.load(open(pathname, encoding="utf-8"))
    elif ext == "toml":
        import toml  # pylint: disable=C0415

        config = toml.load(pathname)
    elif ext == "yaml":
        import yaml  # pylint: disable=C0415

        config = yaml.safe_load(open(pathname, encoding="utf-8"))
    else:
        raise ValueError("Filename does not end with (.json|.toml|.yaml)")

    config["config_pathname"] = os.path.realpath(pathname)
    return config
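
# Illustrative usage (not part of the upstream module); the directory name is
# hypothetical. get_config() either loads an explicit pathname or searches each
# directory in `search` for <basename>.{yaml,toml,json}:
#
#     config = get_config(basename="munet", search=["/work/proj/tests"])
#     print(config["config_pathname"])  # real path of the file that was loaded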


def setup_logging(args, config_base="logconf"):
    # Create rundir and arrange for future commands to run in it.

    # Change CWD to the rundir prior to parsing config
    old = os.getcwd()
    os.chdir(args.rundir)
    try:
        search = [old]
        with importlib.resources.path("munet", config_base + ".yaml") as datapath:
            search.append(str(datapath.parent))

        def logf(msg, *p, **k):
            if args.verbose:
                print("PRELOG: " + msg % p, **k, file=sys.stderr)

        config = get_config(args.log_config, config_base, search, logf=logf)
        pathname = config["config_pathname"]
        del config["config_pathname"]

        if "info_console" in config["handlers"]:
            # mutest case
            if args.verbose > 1:
                config["handlers"]["console"]["level"] = "DEBUG"
                config["handlers"]["info_console"]["level"] = "DEBUG"
            elif args.verbose:
                config["handlers"]["console"]["level"] = "INFO"
                config["handlers"]["info_console"]["level"] = "DEBUG"
        elif args.verbose:
            # munet case
            config["handlers"]["console"]["level"] = "DEBUG"

        # add the rundir path to the filenames
        for v in config["handlers"].values():
            filename = v.get("filename")
            if not filename:
                continue
            v["filename"] = os.path.join(args.rundir, filename)

        logging.config.dictConfig(dict(config))
        logging.info("Loaded logging config %s", pathname)

        return config
    finally:
        os.chdir(old)
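
# Illustrative usage (not part of the upstream module): setup_logging() expects an
# argparse-style namespace carrying rundir, verbose, and log_config attributes; the
# values below are hypothetical:
#
#     from argparse import Namespace
#
#     args = Namespace(rundir="/tmp/unet-example", verbose=1, log_config=None)
#     setup_logging(args)  # loads logconf.yaml and applies logging.config.dictConfig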


def append_hosts_files(unet, netname):
    if not netname:
        return

    entries = []
    for name in ("munet", *list(unet.hosts)):
        if name == "munet":
            node = unet.switches[netname]
            ifname = None
        else:
            node = unet.hosts[name]
            if not hasattr(node, "_intf_addrs"):
                continue
            ifname = node.get_ifname(netname)

        for b in (False, True):
            ifaddr = node.get_intf_addr(ifname, ipv6=b)
            if ifaddr and hasattr(ifaddr, "ip"):
                entries.append((name, ifaddr.ip))

    for name in ("munet", *list(unet.hosts)):
        node = unet if name == "munet" else unet.hosts[name]
        if not hasattr(node, "rundir"):
            continue
        with open(os.path.join(node.rundir, "hosts.txt"), "a+", encoding="ascii") as hf:
            hf.write("\n")
            for e in entries:
                hf.write(f"{e[1]}\t{e[0]}\n")
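
# Illustrative note (not part of the upstream module): each node's hosts.txt gets one
# tab-separated "<address>\t<name>" line per collected entry, e.g. (addresses are
# hypothetical):
#
#     10.0.1.1    munet
#     10.0.1.2    r1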


def validate_config(config, logger, args):
    if jsonschema is None:
        logger.debug("No validation w/o jsonschema module")
        return True

    old = os.getcwd()
    if args:
        os.chdir(args.rundir)

    try:
        validator = jsonschema.validators.Draft202012Validator(get_schema())
        validator.validate(instance=config)
        logger.debug("Validated %s", config["config_pathname"])
        return True
    except FileNotFoundError as error:
        logger.info("No schema found: %s", error)
        return False
    except ValidationError as error:
        logger.info("Validation failed: %s", error)
        return False
    finally:
        if args:
            os.chdir(old)
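
# Illustrative usage (not part of the upstream module): validate_config() returns
# True when the config matches the bundled munet schema (or when jsonschema is not
# installed), and False on a validation or schema-lookup failure:
#
#     if not validate_config(config, logging.getLogger(__name__), args=None):
#         sys.exit("invalid munet config")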


def load_kinds(args, search=None):
    # Change CWD to the rundir prior to parsing config
    cwd = os.getcwd()
    if args:
        os.chdir(args.rundir)

    args_config = args.kinds_config if args else None
    try:
        if search is None:
            search = [cwd]
        with importlib.resources.path("munet", "kinds.yaml") as datapath:
            search.insert(0, str(datapath.parent))

        configs = []
        if args_config:
            configs.append(get_config(args_config, "kinds", search=[]))
        else:
            # prefer directories at the front of the list
            for kdir in search:
                try:
                    configs.append(get_config(basename="kinds", search=[kdir]))
                except FileNotFoundError:
                    continue

        kinds = {}
        for config in configs:
            # XXX need to fix the issue with `connections: ["net0"]` not validating
            # if jsonschema is not None:
            #     validator = jsonschema.validators.Draft202012Validator(get_schema())
            #     validator.validate(instance=config)

            kinds_list = config.get("kinds", [])
            kinds_dict = list_to_dict_with_key(kinds_list, "name")
            if kinds_dict:
                logging.info("Loading kinds config from %s", config["config_pathname"])
                if "kinds" in kinds:
                    kinds["kinds"].update(**kinds_dict)
                else:
                    kinds["kinds"] = kinds_dict

            cli_list = config.get("cli", {}).get("commands", [])
            if cli_list:
                logging.info("Loading cli commands from %s", config["config_pathname"])
                if "cli" not in kinds:
                    kinds["cli"] = {}
                if "commands" not in kinds["cli"]:
                    kinds["cli"]["commands"] = []
                kinds["cli"]["commands"].extend(cli_list)

        return kinds
    except FileNotFoundError as error:
        # if we have kinds in args but the file doesn't exist, raise the error
        if args_config is not None:
            raise error
        return {}
    finally:
        if args:
            os.chdir(cwd)
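
# Illustrative note (not part of the upstream module): load_kinds() returns a dict
# merged from every kinds file found along the search path, shaped roughly like the
# following (kind names are hypothetical):
#
#     {
#         "kinds": {"frr": {...}, "host": {...}},
#         "cli": {"commands": [...]},
#     }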


async def async_build_topology(
    config=None,
    logger=None,
    rundir=None,
    args=None,
    unshare_inline=False,
    pytestconfig=None,
    search_root=None,
    top_level_pidns=True,
):

    if not rundir:
        rundir = tempfile.mkdtemp(prefix="unet")
    subprocess.run(f"mkdir -p {rundir} && chmod 755 {rundir}", check=True, shell=True)

    isolated = not args.host if args else True
    if not config:
        config = get_config(basename="munet")

    # create search directories from common root if given
    cpath = Path(config["config_pathname"]).absolute()
    project_root = args.project_root if args else None
    if not search_root:
        search_root = find_project_root(cpath, project_root)
    if not search_root:
        search = [cpath.parent]
    else:
        search_root = Path(search_root).absolute()
        if search_root in cpath.parents:
            search = list(cpath.parents)
            if remcount := len(search_root.parents):
                search = search[0:-remcount]

    # load kinds along search path and merge into config
    kinds = load_kinds(args, search=search)
    config_kinds_dict = list_to_dict_with_key(config.get("kinds", []), "name")
    config["kinds"] = {**kinds.get("kinds", {}), **config_kinds_dict}

    # merge CLI commands from kinds into config as well.
    kinds_cli_list = kinds.get("cli", {}).get("commands", [])
    config_cli_list = config.get("cli", {}).get("commands", [])
    if config_cli_list:
        if kinds_cli_list:
            config_cli_list.extend(list(kinds_cli_list))
    elif kinds_cli_list:
        if "cli" not in config:
            config["cli"] = {}
        if "commands" not in config["cli"]:
            config["cli"]["commands"] = []
        config["cli"]["commands"].extend(list(kinds_cli_list))

    unet = Munet(
        rundir=rundir,
        config=config,
        pytestconfig=pytestconfig,
        isolated=isolated,
        pid=top_level_pidns,
        unshare_inline=args.unshare_inline if args else unshare_inline,
        logger=logger,
    )

    try:
        await unet._async_build(logger)  # pylint: disable=W0212
    except Exception as error:
        logging.critical("Failure building munet topology: %s", error, exc_info=True)
        await unet.async_delete()
        raise
    except KeyboardInterrupt:
        await unet.async_delete()
        raise

    topoconf = config.get("topology")
    if not topoconf:
        return unet

    dns_network = topoconf.get("dns-network")
    if dns_network:
        append_hosts_files(unet, dns_network)

    # Write our current config to the run directory
    with open(f"{unet.rundir}/config.json", "w", encoding="utf-8") as f:
        json.dump(unet.config, f, indent=2)

    return unet
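
# Illustrative usage (not part of the upstream module): building and tearing down a
# topology from async code, assuming a munet.yaml in the current directory and a
# hypothetical rundir:
#
#     unet = await async_build_topology(rundir="/tmp/unet-example")
#     ...
#     await unet.async_delete()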


def build_topology(config=None, logger=None, rundir=None, args=None, pytestconfig=None):
    return asyncio.run(
        async_build_topology(config, logger, rundir, args, pytestconfig=pytestconfig)
    )
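
# Illustrative usage (not part of the upstream module): the synchronous wrapper runs
# the async builder to completion and returns the Munet instance; the rundir below is
# hypothetical. When the config defines a topology, the merged config is also written
# to <rundir>/config.json.
#
#     unet = build_topology(rundir="/tmp/unet-example")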