]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/isis_topo1_vrf/test_isis_topo1_vrf.py
Merge pull request #10018 from ckishimo/ospf6d_bitN
[mirror_frr.git] / tests / topotests / isis_topo1_vrf / test_isis_topo1_vrf.py
1 #!/usr/bin/env python
2
3 #
4 # Copyright (c) 2020 by Niral Networks, Inc. ("Niral Networks")
5 # Used Copyright (c) 2018 by Network Device Education Foundation,
6 # Inc. ("NetDEF") in this file.
7 #
8 # Permission to use, copy, modify, and/or distribute this software
9 # for any purpose with or without fee is hereby granted, provided
10 # that the above copyright notice and this permission notice appear
11 # in all copies.
12 #
13 # THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES
14 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
15 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR
16 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
17 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
18 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
19 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
20 # OF THIS SOFTWARE.
21 #
22
23 """
24 test_isis_topo1_vrf.py: Test ISIS vrf topology.
25 """
26
27 import functools
28 import json
29 import os
30 import re
31 import sys
32 import pytest
33
34 CWD = os.path.dirname(os.path.realpath(__file__))
35 sys.path.append(os.path.join(CWD, "../"))
36
37 # pylint: disable=C0413
38 from lib import topotest
39 from lib.topogen import Topogen, TopoRouter, get_topogen
40 from lib.topolog import logger
41 from lib.topotest import iproute2_is_vrf_capable
42 from lib.common_config import required_linux_kernel_version
43
44
# Module-level pytest marks: tags the tests in this file with `isisd`.
pytestmark = [pytest.mark.isisd]
46
# Labels recognised in the "Type" column of the topology output.
# parse_topology() joins them, in this order, into a regex alternation,
# so the order of the entries is kept exactly as it was.
VERTEX_TYPE_LIST = [
    # IS / ES vertices
    "pseudo_IS", "pseudo_TE-IS", "IS", "TE-IS", "ES",
    # IPv4 reachability
    "IP internal", "IP external", "IP TE",
    # IPv6 reachability
    "IP6 internal", "IP6 external",
    # Fallback label
    "UNKNOWN",
]
60
61
def build_topo(tgen):
    """
    Populate *tgen* with the routers and switches of this test topology.

    Five routers (r1..r5) are added, then four switches, each linking
    exactly two routers:

        r1        r2
         | sw1     | sw2
        r3        r4
         |         |
        sw3       sw4
          \\       /
            r5

    :param tgen: topology generator; must provide add_router(), add_switch()
        and a ``gears`` mapping of router name to router object
    :return: None
    """
    for index in range(1, 6):
        tgen.add_router("r{}".format(index))

    # (switch name, routers attached to it). Both the switches and their
    # members are listed in the same order the links were originally created.
    wiring = (
        ("sw1", ("r1", "r3")),
        ("sw2", ("r2", "r4")),
        ("sw3", ("r3", "r5")),
        ("sw4", ("r4", "r5")),
    )
    for switch_name, members in wiring:
        switch = tgen.add_switch(switch_name)
        for member in members:
            switch.add_link(tgen.gears[member])
95
96
def setup_module(mod):
    """
    Sets up the pytest environment.

    Builds and starts the topology, creates the per-router VRF
    ``<router>-cust1`` and enslaves the router interfaces to it, then loads
    the zebra and isisd configuration of every router and starts the daemons.

    :param mod: the test module object (its ``__name__`` names the topology)
    """
    tgen = Topogen(build_topo, mod.__name__)
    tgen.start_topology()

    logger.info("Testing with VRF Lite support")

    # Commands run on every router; "{0}" is replaced by the router name.
    vrf_cmds = [
        "ip link add {0}-cust1 type vrf table 1001",
        "ip link add loop1 type dummy",
        "ip link set {0}-eth0 master {0}-cust1",
    ]
    # Extra commands for routers that also have an rX-eth1 interface.
    eth1_cmds = ["ip link set {0}-eth1 master {0}-cust1"]

    # Create VRF rx-cust1 on each router and link its interfaces to it.
    # The command output is not inspected.
    for rname, router in tgen.routers().items():
        node = tgen.net[rname]
        for template in vrf_cmds:
            node.cmd(template.format(rname))

        has_eth1 = "{}-eth1".format(rname) in router.links.keys()
        if has_eth1:
            for template in eth1_cmds:
                node.cmd(template.format(rname))

    # For all registered routers, load the zebra and isisd configuration files.
    daemon_configs = (
        (TopoRouter.RD_ZEBRA, "{}/zebra.conf"),
        (TopoRouter.RD_ISIS, "{}/isisd.conf"),
    )
    for rname, router in tgen.routers().items():
        for daemon, conf_template in daemon_configs:
            router.load_config(daemon, os.path.join(CWD, conf_template.format(rname)))

    # After loading the configurations, this function loads configured daemons.
    tgen.start_router()
132
133
def teardown_module(mod):
    """
    Teardown the pytest environment.

    Only stops the topology; no explicit VRF cleanup (moving rx-eth0 back to
    the default VRF, deleting the VRF device) is performed here.

    :param mod: the test module object (unused)
    """
    get_topogen().stop_topology()
140
141
def test_isis_convergence():
    """
    Wait for the protocol to converge before starting to test.

    For every router, the parsed 'show isis vrf ... topology' output is
    compared against ``<router>/<router>_topology.json`` and retried until it
    matches (120 attempts, 0.5 s apart).
    """
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("waiting for ISIS protocol to converge")

    def compare_isis_topology(router, expected):
        "Helper function to test ISIS vrf topology convergence."
        actual = show_isis_topology(router)
        return topotest.json_cmp(actual, expected)

    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_topology.json".format(CWD, rname)
        # Use a context manager so the reference file is closed right away
        # instead of being left for the garbage collector.
        with open(filename) as expected_file:
            expected = json.load(expected_file)

        test_func = functools.partial(compare_isis_topology, router, expected)
        (result, diff) = topotest.run_and_expect(test_func, None, wait=0.5, count=120)
        assert result, "ISIS did not converge on {}:\n{}".format(rname, diff)
164
165
def test_isis_route_installation():
    """
    Check whether all expected routes are present.

    For every router, 'show ip route vrf <router>-cust1 json' is compared
    against ``<router>/<router>_route.json`` and retried until it matches
    (20 attempts, 1 s apart).
    """
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Checking routers for installed ISIS vrf routes")

    def compare_routing_table(router, command, expected):
        "Helper function to ensure zebra rib convergence"
        actual = router.vtysh_cmd(command, isjson=True)
        return topotest.json_cmp(actual, expected)

    # Check for routes in 'show ip route vrf {}-cust1 json'
    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_route.json".format(CWD, rname)
        # Context manager guarantees the reference file gets closed.
        with open(filename, "r") as expected_file:
            expected = json.load(expected_file)

        # The command is bound explicitly rather than read from the loop
        # variable inside the helper.
        command = "show ip route vrf {0}-cust1 json".format(rname)
        test_func = functools.partial(compare_routing_table, router, command, expected)
        (result, diff) = topotest.run_and_expect(test_func, None, count=20, wait=1)
        assertmsg = "Router '{}' routes mismatch diff: {}".format(rname, diff)
        assert result, assertmsg
191
192
def test_isis_linux_route_installation():
    """
    Check whether all expected routes are present and installed in the OS.

    Skipped when the kernel is older than 4.15 or iproute2 lacks VRF support.
    For every router, the result of topotest.ip4_vrf_route() is compared once
    (no retries) against ``<router>/<router>_route_linux.json``.
    """
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    # Required linux kernel version for this suite to run.
    result = required_linux_kernel_version("4.15")
    if result is not True:
        pytest.skip("Kernel requirements are not met")

    # iproute2 needs to support VRFs for this suite to run.
    if not iproute2_is_vrf_capable():
        pytest.skip("Installed iproute2 version does not support VRFs")

    logger.info("Checking routers for installed ISIS vrf routes in OS")
    # Check for routes in `ip route show vrf {}-cust1`
    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_route_linux.json".format(CWD, rname)
        # Context manager guarantees the reference file gets closed.
        with open(filename, "r") as expected_file:
            expected = json.load(expected_file)
        actual = topotest.ip4_vrf_route(router)
        assertmsg = "Router '{}' OS routes mismatch".format(rname)
        assert topotest.json_cmp(actual, expected) is None, assertmsg
217
218
def test_isis_route6_installation():
    """
    Check whether all expected IPv6 routes are present.

    For every router, 'show ipv6 route vrf <router>-cust1 json' is compared
    against ``<router>/<router>_route6.json`` and retried until it matches
    (20 attempts, 1 s apart).
    """
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Checking routers for installed ISIS vrf IPv6 routes")

    def compare_routing_table(router, command, expected):
        "Helper function to ensure zebra rib convergence"
        actual = router.vtysh_cmd(command, isjson=True)
        return topotest.json_cmp(actual, expected)

    # Check for routes in 'show ipv6 route vrf {}-cust1 json'
    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_route6.json".format(CWD, rname)
        # Context manager guarantees the reference file gets closed.
        with open(filename, "r") as expected_file:
            expected = json.load(expected_file)

        command = "show ipv6 route vrf {}-cust1 json".format(rname)
        test_func = functools.partial(compare_routing_table, router, command, expected)
        (result, diff) = topotest.run_and_expect(test_func, None, count=20, wait=1)
        # The message needs a placeholder for each argument: str.format()
        # silently ignores surplus positional arguments, which previously
        # dropped the diff from the failure output.
        assertmsg = "Router '{}' routes mismatch diff: {}".format(rname, diff)
        assert result, assertmsg
243
244
def test_isis_linux_route6_installation():
    """
    Check whether all expected IPv6 routes are present and installed in the OS.

    Skipped when the kernel is older than 4.15 or iproute2 lacks VRF support.
    For every router, the result of topotest.ip6_vrf_route() is compared once
    (no retries) against ``<router>/<router>_route6_linux.json``.
    """
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    # Required linux kernel version for this suite to run.
    result = required_linux_kernel_version("4.15")
    if result is not True:
        pytest.skip("Kernel requirements are not met")

    # iproute2 needs to support VRFs for this suite to run.
    if not iproute2_is_vrf_capable():
        pytest.skip("Installed iproute2 version does not support VRFs")

    logger.info("Checking routers for installed ISIS vrf IPv6 routes in OS")
    # Check for routes in `ip -6 route show vrf {}-cust1`
    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_route6_linux.json".format(CWD, rname)
        # Context manager guarantees the reference file gets closed.
        with open(filename, "r") as expected_file:
            expected = json.load(expected_file)
        actual = topotest.ip6_vrf_route(router)
        assertmsg = "Router '{}' OS routes mismatch".format(rname)
        assert topotest.json_cmp(actual, expected) is None, assertmsg
269
270
def test_memory_leak():
    """
    Run the memory leak test and report results.

    The test is skipped unless the topology generator reports that memory
    leak checking is enabled.
    """
    tgen = get_topogen()
    if tgen.is_memleak_enabled():
        tgen.report_memory_leaks()
    else:
        pytest.skip("Memory leak test/report is disabled")
278
279
if __name__ == "__main__":
    # Direct execution: hand over to pytest with "-s" placed in front of any
    # command-line arguments, and exit with pytest's return code.
    sys.exit(pytest.main(["-s"] + sys.argv[1:]))
283
284
285 #
286 # Auxiliary functions
287 #
288
289
def dict_merge(dct, merge_dct):
    """
    Recursively merge ``merge_dct`` into ``dct`` (in place).

    Unlike :meth:``dict.update()``, keys are not only replaced at the top
    level: when a key exists on both sides, the current value is a dict and
    the incoming value is a mapping, the two are merged recursively. In every
    other case the incoming value overwrites (or adds) the key.

    :param dct: dict onto which the merge is executed
    :param merge_dct: mapping whose content is merged into ``dct``
    :return: None

    Source:
    https://gist.github.com/angstwad/bf22d1822c38a92ec0a9
    """
    for key, incoming in merge_dct.items():
        both_nested = (
            key in dct
            and isinstance(dct[key], dict)
            and topotest.is_mapping(incoming)
        )
        if not both_nested:
            dct[key] = incoming
            continue
        dict_merge(dct[key], incoming)
308
309
def parse_topology(lines, level, vertex_types=None):
    """
    Parse the output of 'show isis topology level-X' into a Python dict.

    Lines are processed in order:

    * ``Area <name>:`` starts (or resumes) an area and resets the current
      address family; everything before the first such line is ignored.
    * ``IS-IS paths to level-X routers that speak IP`` / ``... IPv6`` selects
      the "ipv4" / "ipv6" list that subsequent entries are appended to.
    * The ``Vertex Type Metric Next-Hop Interface Parent`` header is skipped.
    * An entry line yields a dict with, depending on how many columns match,
      all of vertex/type/metric/next-hop/interface/parent, or
      vertex/type/metric/parent, or only vertex.

    Columns must be separated by single spaces.

    :param lines: iterable of output lines (one string per line)
    :param level: key under which the parsed entries are stored for each area
        (callers in this file pass "level-1" and "level-2")
    :param vertex_types: optional list of labels accepted in the "Type"
        column; defaults to ``VERTEX_TYPE_LIST``. The labels are joined
        verbatim (not escaped) into a regex alternation and tried in list
        order, so they should be plain text without capturing groups.
    :return: ``{area: {level: {"ipv4": [entry, ...], "ipv6": [entry, ...]}}}``
        with every entry value being a string
    :raises KeyError: if an entry line appears inside an area before any
        "IS-IS paths ..." line selected an address family
    """
    if vertex_types is None:
        vertex_types = VERTEX_TYPE_LIST
    type_alternatives = "|".join(vertex_types)

    # All patterns are loop-invariant: build and compile them once.
    area_re = re.compile(r"Area (.+):")
    # The IPv6 banner is tested first because the IPv4 pattern is a prefix
    # of it and would match IPv6 banners as well.
    ipv6_re = re.compile(r"IS\-IS paths to level-. routers that speak IPv6")
    ipv4_re = re.compile(r"IS\-IS paths to level-. routers that speak IP")
    # Column header; the lookahead requires "Parent" to be a complete token.
    header_re = re.compile(r"Vertex Type Metric Next-Hop Interface Parent(?!\S)")
    # Groups: 1 = vertex, 2 = type, 3 = metric (4 is the nested metric group).
    row_prefix = r"([^\s]+) ({}) ([0]|([1-9][0-9]*))".format(type_alternatives)
    # Groups 5, 6, 7 = next-hop, interface, parent.
    full_row_re = re.compile(row_prefix + r" ([^\s]+) ([^\s]+) ([^\s]+)")
    # Group 5 = parent.
    short_row_re = re.compile(row_prefix + r" ([^\s]+)")
    vertex_re = re.compile(r"([^\s]+)")

    areas = {}
    area = None
    ipv = None

    for line in lines:
        area_match = area_re.match(line)
        if area_match:
            area = area_match.group(1)
            if area not in areas:
                areas[area] = {level: {"ipv4": [], "ipv6": []}}
            ipv = None
            continue
        if area is None:
            # Nothing is recorded until the first area line has been seen.
            continue

        if ipv6_re.match(line):
            ipv = "ipv6"
            continue
        if ipv4_re.match(line):
            ipv = "ipv4"
            continue

        if header_re.match(line):
            # Skip header
            continue

        row = full_row_re.match(line)
        if row is not None:
            areas[area][level][ipv].append(
                {
                    "vertex": row.group(1),
                    "type": row.group(2),
                    "metric": row.group(3),
                    "next-hop": row.group(5),
                    "interface": row.group(6),
                    "parent": row.group(7),
                }
            )
            continue

        row = short_row_re.match(line)
        if row is not None:
            areas[area][level][ipv].append(
                {
                    "vertex": row.group(1),
                    "type": row.group(2),
                    "metric": row.group(3),
                    "parent": row.group(5),
                }
            )
            continue

        # Fallback: any line starting with a non-blank token is recorded as
        # a bare vertex; empty lines match nothing and are dropped.
        row = vertex_re.match(line)
        if row is not None:
            areas[area][level][ipv].append({"vertex": row.group(1)})

    return areas
393
394
def show_isis_topology(router):
    """
    Get the ISIS vrf topology in a dictionary format.

    Runs 'show isis vrf <router>-cust1 topology level-1' and '... level-2'
    on the router, parses both outputs with parse_topology() and merges the
    level-2 result into the level-1 result.

    :param router: router object providing ``name`` and ``vtysh_cmd()``
    :return: dict keyed by area name, then level, then address family

    Sample:
    {
        'area-name': {
            'level-1': {
                'ipv4': [
                    {
                        'vertex': 'r1'
                    }
                ],
                'ipv6': []
            },
            'level-2': {
                'ipv4': [
                    {
                        "interface": "rX-ethY",
                        "metric": "Z",
                        "next-hop": "rA",
                        "parent": "rC(B)",
                        "type": "TE-IS",
                        "vertex": "rD"
                    }
                ],
                'ipv6': []
            }
        }
    }
    """

    def query(level):
        "Run the topology command for one level and parse its output."
        command = "show isis vrf {}-cust1 topology {}".format(router.name, level)
        text = topotest.normalize_text(router.vtysh_cmd(command))
        return parse_topology(text.splitlines(), level)

    topology = query("level-1")
    level2 = query("level-2")

    dict_merge(topology, level2)
    return topology
439
440 dict_merge(l1, l2)
441 return l1