]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/munet/config.py
Merge pull request #13455 from sri-mohan1/srib-ldpd
[mirror_frr.git] / tests / topotests / munet / config.py
1 # -*- coding: utf-8 eval: (blacken-mode 1) -*-
2 # SPDX-License-Identifier: GPL-2.0-or-later
3 #
4 # June 25 2022, Christian Hopps <chopps@gmail.com>
5 #
6 # Copyright (c) 2021-2022, LabN Consulting, L.L.C.
7 #
8 """A module that defines common configuration utility functions."""
9 import logging
10
11 from collections.abc import Iterable
12 from copy import deepcopy
13 from typing import overload
14
15
16 def find_with_kv(lst, k, v):
17 if lst:
18 for e in lst:
19 if k in e and e[k] == v:
20 return e
21 return {}
22
23
24 def find_all_with_kv(lst, k, v):
25 rv = []
26 if lst:
27 for e in lst:
28 if k in e and e[k] == v:
29 rv.append(e)
30 return rv
31
32
33 def find_matching_net_config(name, cconf, oconf):
34 p = find_all_with_kv(oconf.get("connections", {}), "to", name)
35 if not p:
36 return {}
37
38 rname = cconf.get("remote-name", None)
39 if not rname:
40 return p[0]
41
42 return find_with_kv(p, "name", rname)
43
44
45 def merge_using_key(a, b, k):
46 # First get a dict of indexes in `a` for the key value of `k` in objects of `a`
47 m = list(a)
48 mi = {o[k]: i for i, o in enumerate(m)}
49 for o in b:
50 bkv = o[k]
51 if bkv in mi:
52 m[mi[bkv]] = o
53 else:
54 mi[bkv] = len(m)
55 m.append(o)
56 return m
57
58
59 def list_to_dict_with_key(lst, k):
60 """Convert a YANG styl list of objects to dict of objects.
61
62 This function converts a YANG style list of objects (dictionaries) to a plain python
63 dictionary of objects (dictionaries). The value for the supplied key for each
64 object is used to store the object in the new diciontary.
65
66 This only works for lists of objects which are keyed on a single contained value.
67
68 Args:
69 lst: a *list* of python dictionary objects.
70 k: the key value contained in each dictionary object in the list.
71
72 Returns:
73 A dictionary of objects (dictionaries).
74 """
75 return {x[k]: x for x in (lst if lst else [])}
76
77
78 def config_to_dict_with_key(c, ck, k):
79 """Convert the config item from a list of objects to dict.
80
81 Use :py:func:`list_to_dict_with_key` to convert the list of objects
82 at ``c[ck]`` to a dict of the objects using the key ``k``.
83
84 Args:
85 c: config dictionary
86 ck: The key identifying the list of objects from ``c``.
87 k: The key to pass to :py:func:`list_to_dict_with_key`.
88
89 Returns:
90 A dictionary of objects (dictionaries).
91 """
92 c[ck] = list_to_dict_with_key(c.get(ck, []), k)
93 return c[ck]
94
95
96 @overload
97 def config_subst(config: str, **kwargs) -> str:
98 ...
99
100
101 @overload
102 def config_subst(config: Iterable, **kwargs) -> Iterable:
103 ...
104
105
106 def config_subst(config: Iterable, **kwargs) -> Iterable:
107 if isinstance(config, str):
108 if "%RUNDIR%/%NAME%" in config:
109 config = config.replace("%RUNDIR%/%NAME%", "%RUNDIR%")
110 logging.warning(
111 "config '%RUNDIR%/%NAME%' should be changed to '%RUNDIR%' only, "
112 "converting automatically for now."
113 )
114 for name, value in kwargs.items():
115 config = config.replace(f"%{name.upper()}%", str(value))
116 elif isinstance(config, Iterable):
117 try:
118 return {k: config_subst(config[k], **kwargs) for k in config}
119 except (KeyError, TypeError):
120 return [config_subst(x, **kwargs) for x in config]
121 return config
122
123
124 def value_merge_deepcopy(s1, s2):
125 """Merge values using deepcopy.
126
127 Create a deepcopy of the result of merging the values from dicts ``s1`` and ``s2``.
128 If a key exists in both ``s1`` and ``s2`` the value from ``s2`` is used."
129 """
130 d = {}
131 for k, v in s1.items():
132 if k in s2:
133 d[k] = deepcopy(s2[k])
134 else:
135 d[k] = deepcopy(v)
136 return d
137
138
139 def merge_kind_config(kconf, config):
140 mergekeys = kconf.get("merge", [])
141 config = deepcopy(config)
142 new = deepcopy(kconf)
143 for k in new:
144 if k not in config:
145 continue
146
147 if k not in mergekeys:
148 new[k] = config[k]
149 elif isinstance(new[k], list):
150 new[k].extend(config[k])
151 elif isinstance(new[k], dict):
152 new[k] = {**new[k], **config[k]}
153 else:
154 new[k] = config[k]
155 for k in config:
156 if k not in new:
157 new[k] = config[k]
158 return new
159
160
161 def cli_opt_list(option_list):
162 if not option_list:
163 return []
164 if isinstance(option_list, str):
165 return [x for x in option_list.split(",") if x]
166 return [x for x in option_list if x]
167
168
169 def name_in_cli_opt_str(name, option_list):
170 ol = cli_opt_list(option_list)
171 return name in ol or "all" in ol
172
173
174 class ConfigOptionsProxy:
175 """Proxy options object to fill in for any missing pytest config."""
176
177 class DefNoneObject:
178 """An object that returns None for any attribute access."""
179
180 def __getattr__(self, attr):
181 return None
182
183 def __init__(self, pytestconfig=None):
184 if isinstance(pytestconfig, ConfigOptionsProxy):
185 self.config = pytestconfig.config
186 self.option = self.config.option
187 else:
188 self.config = pytestconfig
189 if self.config:
190 self.option = self.config.option
191 else:
192 self.option = ConfigOptionsProxy.DefNoneObject()
193
194 def getoption(self, opt, default=None):
195 if not self.config:
196 return default
197
198 try:
199 value = self.config.getoption(opt)
200 return value if value is not None else default
201 except ValueError:
202 return default
203
204 def get_option(self, opt, default=None):
205 return self.getoption(opt, default)
206
207 def get_option_list(self, opt):
208 value = self.get_option(opt, "")
209 return cli_opt_list(value)
210
211 def name_in_option_list(self, name, opt):
212 optlist = self.get_option_list(opt)
213 return "all" in optlist or name in optlist