]>
git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/munet/parser.py
1 # -*- coding: utf-8 eval: (blacken-mode 1) -*-
2 # SPDX-License-Identifier: GPL-2.0-or-later
4 # September 30 2021, Christian Hopps <chopps@labn.net>
6 # Copyright 2021, LabN Consulting, L.L.C.
8 """A module that implements the standalone parser."""
10 import importlib
.resources
19 from pathlib
import Path
23 import jsonschema
# pylint: disable=C0415
24 import jsonschema
.validators
# pylint: disable=C0415
26 from jsonschema
.exceptions
import ValidationError
# pylint: disable=C0415
30 from .config
import list_to_dict_with_key
31 from .native
import Munet
35 if get_schema
.schema
is None:
36 with importlib
.resources
.path("munet", "munet-schema.json") as datapath
:
37 search
= [str(datapath
.parent
)]
38 get_schema
.schema
= get_config(basename
="munet-schema", search
=search
)
39 return get_schema
.schema
42 get_schema
.schema
= None
44 project_root_contains
= [
55 def is_project_root(path
: Path
) -> bool:
57 for contains
in project_root_contains
:
58 if path
.joinpath(contains
).exists():
63 def find_project_root(config_path
: Path
, project_root
=None):
64 if project_root
is not None:
65 project_root
= Path(project_root
)
66 if project_root
in config_path
.parents
:
69 "project_root %s is not a common ancestor of config file %s",
73 return config_path
.parent
74 for ppath
in config_path
.parents
:
75 if is_project_root(ppath
):
77 return config_path
.parent
80 def get_config(pathname
=None, basename
="munet", search
=None, logf
=logging
.debug
):
86 elif isinstance(search
, (str, Path
)):
90 pathname
= os
.path
.join(cwd
, pathname
)
91 if not os
.path
.exists(pathname
):
92 raise FileNotFoundError(pathname
)
95 logf("%s", f
'searching in "{d}" for "{basename}".{{yaml, toml, json}}')
96 for ext
in ("yaml", "toml", "json"):
97 pathname
= os
.path
.join(d
, basename
+ "." + ext
)
98 if os
.path
.exists(pathname
):
99 logf("%s", f
'Found "{pathname}"')
105 raise FileNotFoundError(basename
+ ".{json,toml,yaml} in " + f
"{search}")
107 _
, ext
= pathname
.rsplit(".", 1)
110 config
= json
.load(open(pathname
, encoding
="utf-8"))
112 import toml
# pylint: disable=C0415
114 config
= toml
.load(pathname
)
116 import yaml
# pylint: disable=C0415
118 config
= yaml
.safe_load(open(pathname
, encoding
="utf-8"))
120 raise ValueError("Filename does not end with (.json|.toml|.yaml)")
122 config
["config_pathname"] = os
.path
.realpath(pathname
)
126 def setup_logging(args
, config_base
="logconf"):
127 # Create rundir and arrange for future commands to run in it.
129 # Change CWD to the rundir prior to parsing config
131 os
.chdir(args
.rundir
)
134 with importlib
.resources
.path("munet", config_base
+ ".yaml") as datapath
:
135 search
.append(str(datapath
.parent
))
137 def logf(msg
, *p
, **k
):
139 print("PRELOG: " + msg
% p
, **k
, file=sys
.stderr
)
141 config
= get_config(args
.log_config
, config_base
, search
, logf
=logf
)
142 pathname
= config
["config_pathname"]
143 del config
["config_pathname"]
145 if "info_console" in config
["handlers"]:
148 config
["handlers"]["console"]["level"] = "DEBUG"
149 config
["handlers"]["info_console"]["level"] = "DEBUG"
151 config
["handlers"]["console"]["level"] = "INFO"
152 config
["handlers"]["info_console"]["level"] = "DEBUG"
155 config
["handlers"]["console"]["level"] = "DEBUG"
157 # add the rundir path to the filenames
158 for v
in config
["handlers"].values():
159 filename
= v
.get("filename")
162 v
["filename"] = os
.path
.join(args
.rundir
, filename
)
164 logging
.config
.dictConfig(dict(config
))
165 logging
.info("Loaded logging config %s", pathname
)
172 def append_hosts_files(unet
, netname
):
177 for name
in ("munet", *list(unet
.hosts
)):
179 node
= unet
.switches
[netname
]
182 node
= unet
.hosts
[name
]
183 if not hasattr(node
, "_intf_addrs"):
185 ifname
= node
.get_ifname(netname
)
187 for b
in (False, True):
188 ifaddr
= node
.get_intf_addr(ifname
, ipv6
=b
)
189 if ifaddr
and hasattr(ifaddr
, "ip"):
190 entries
.append((name
, ifaddr
.ip
))
192 for name
in ("munet", *list(unet
.hosts
)):
193 node
= unet
if name
== "munet" else unet
.hosts
[name
]
194 if not hasattr(node
, "rundir"):
196 with
open(os
.path
.join(node
.rundir
, "hosts.txt"), "a+", encoding
="ascii") as hf
:
199 hf
.write(f
"{e[1]}\t{e[0]}\n")
202 def validate_config(config
, logger
, args
):
203 if jsonschema
is None:
204 logger
.debug("No validation w/o jsonschema module")
209 os
.chdir(args
.rundir
)
212 validator
= jsonschema
.validators
.Draft202012Validator(get_schema())
213 validator
.validate(instance
=config
)
214 logger
.debug("Validated %s", config
["config_pathname"])
216 except FileNotFoundError
as error
:
217 logger
.info("No schema found: %s", error
)
219 except ValidationError
as error
:
220 logger
.info("Validation failed: %s", error
)
227 def load_kinds(args
, search
=None):
228 # Change CWD to the rundir prior to parsing config
231 os
.chdir(args
.rundir
)
233 args_config
= args
.kinds_config
if args
else None
237 with importlib
.resources
.path("munet", "kinds.yaml") as datapath
:
238 search
.insert(0, str(datapath
.parent
))
242 configs
.append(get_config(args_config
, "kinds", search
=[]))
244 # prefer directories at the front of the list
247 configs
.append(get_config(basename
="kinds", search
=[kdir
]))
248 except FileNotFoundError
:
252 for config
in configs
:
253 # XXX need to fix the issue with `connections: ["net0"]` not validating
254 # if jsonschema is not None:
255 # validator = jsonschema.validators.Draft202012Validator(get_schema())
256 # validator.validate(instance=config)
258 kinds_list
= config
.get("kinds", [])
259 kinds_dict
= list_to_dict_with_key(kinds_list
, "name")
261 logging
.info("Loading kinds config from %s", config
["config_pathname"])
263 kinds
["kinds"].update(**kinds_dict
)
265 kinds
["kinds"] = kinds_dict
267 cli_list
= config
.get("cli", {}).get("commands", [])
269 logging
.info("Loading cli comands from %s", config
["config_pathname"])
270 if "cli" not in kinds
:
272 if "commands" not in kinds
["cli"]:
273 kinds
["cli"]["commands"] = []
274 kinds
["cli"]["commands"].extend(cli_list
)
277 except FileNotFoundError
as error
:
278 # if we have kinds in args but the file doesn't exist, raise the error
279 if args_config
is not None:
287 async def async_build_topology(
292 unshare_inline
=False,
295 top_level_pidns
=True,
299 rundir
= tempfile
.mkdtemp(prefix
="unet")
300 subprocess
.run(f
"mkdir -p {rundir} && chmod 755 {rundir}", check
=True, shell
=True)
302 isolated
= not args
.host
if args
else True
304 config
= get_config(basename
="munet")
306 # create search directories from common root if given
307 cpath
= Path(config
["config_pathname"]).absolute()
308 project_root
= args
.project_root
if args
else None
310 search_root
= find_project_root(cpath
, project_root
)
312 search
= [cpath
.parent
]
314 search_root
= Path(search_root
).absolute()
315 if search_root
in cpath
.parents
:
316 search
= list(cpath
.parents
)
317 if remcount
:= len(search_root
.parents
):
318 search
= search
[0:-remcount
]
320 # load kinds along search path and merge into config
321 kinds
= load_kinds(args
, search
=search
)
322 config_kinds_dict
= list_to_dict_with_key(config
.get("kinds", []), "name")
323 config
["kinds"] = {**kinds
.get("kinds", {}), **config_kinds_dict
}
325 # mere CLI command from kinds into config as well.
326 kinds_cli_list
= kinds
.get("cli", {}).get("commands", [])
327 config_cli_list
= config
.get("cli", {}).get("commands", [])
330 config_cli_list
.extend(list(kinds_cli_list
))
332 if "cli" not in config
:
334 if "commands" not in config
["cli"]:
335 config
["cli"]["commands"] = []
336 config
["cli"]["commands"].extend(list(kinds_cli_list
))
341 pytestconfig
=pytestconfig
,
344 unshare_inline
=args
.unshare_inline
if args
else unshare_inline
,
349 await unet
._async
_build
(logger
) # pylint: disable=W0212
350 except Exception as error
:
351 logging
.critical("Failure building munet topology: %s", error
, exc_info
=True)
352 await unet
.async_delete()
354 except KeyboardInterrupt:
355 await unet
.async_delete()
358 topoconf
= config
.get("topology")
362 dns_network
= topoconf
.get("dns-network")
364 append_hosts_files(unet
, dns_network
)
366 # Write our current config to the run directory
367 with
open(f
"{unet.rundir}/config.json", "w", encoding
="utf-8") as f
:
368 json
.dump(unet
.config
, f
, indent
=2)
373 def build_topology(config
=None, logger
=None, rundir
=None, args
=None, pytestconfig
=None):
374 return asyncio
.run(async_build_topology(config
, logger
, rundir
, args
, pytestconfig
))