]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/isis_topo1_vrf/test_isis_topo1_vrf.py
Merge pull request #12798 from donaldsharp/rib_match_multicast
[mirror_frr.git] / tests / topotests / isis_topo1_vrf / test_isis_topo1_vrf.py
1 #!/usr/bin/env python
2 # SPDX-License-Identifier: ISC
3
4 #
5 # Copyright (c) 2020 by Niral Networks, Inc. ("Niral Networks")
6 # Used Copyright (c) 2018 by Network Device Education Foundation,
7 # Inc. ("NetDEF") in this file.
8 #
9
10 """
11 test_isis_topo1_vrf.py: Test ISIS vrf topology.
12 """
13
14 import functools
15 import json
16 import os
17 import re
18 import sys
19 import pytest
20
21 CWD = os.path.dirname(os.path.realpath(__file__))
22 sys.path.append(os.path.join(CWD, "../"))
23
24 # pylint: disable=C0413
25 from lib import topotest
26 from lib.topogen import Topogen, TopoRouter, get_topogen
27 from lib.topolog import logger
28 from lib.topotest import iproute2_is_vrf_capable
29 from lib.common_config import required_linux_kernel_version
30
31
32 pytestmark = [pytest.mark.isisd]
33
# Vertex type labels matched in the second ("Type") field of item lines in
# the 'show isis topology' output. parse_topology() joins these into a regex
# alternation. Several labels contain a space (e.g. "IP internal"), so the
# field cannot be isolated by simply splitting the line on whitespace.
VERTEX_TYPE_LIST = [
    "pseudo_IS",
    "pseudo_TE-IS",
    "IS",
    "TE-IS",
    "ES",
    "IP internal",
    "IP external",
    "IP TE",
    "IP6 internal",
    "IP6 external",
    "UNKNOWN",
]
47
48
def build_topo(tgen):
    "Build function"

    # ISIS routers and the switches joining them:
    #
    #   r1      r2
    #    | sw1   | sw2
    #   r3      r4
    #    |       |
    #   sw3     sw4
    #     \     /
    #       r5
    for router_name in ["r{}".format(index) for index in range(1, 6)]:
        tgen.add_router(router_name)

    # Each switch is listed with the two routers it connects; the links are
    # added in exactly the order given here.
    wiring = (
        ("sw1", ("r1", "r3")),
        ("sw2", ("r2", "r4")),
        ("sw3", ("r3", "r5")),
        ("sw4", ("r4", "r5")),
    )
    for switch_name, attached_routers in wiring:
        switch = tgen.add_switch(switch_name)
        for router_name in attached_routers:
            switch.add_link(tgen.gears[router_name])
82
83
def setup_module(mod):
    """Sets up the pytest environment.

    Starts the topology, then on every router creates a ``<router>-cust1``
    VRF (table 1001) and a ``loop1`` dummy interface and enslaves
    ``<router>-eth0`` -- plus ``<router>-eth1`` when the router has such a
    link -- to that VRF. Finally loads the zebra and isisd configuration of
    each router and starts the routers.

    :param mod: the test module object; its ``__name__`` names the topology
    """
    tgen = Topogen(build_topo, mod.__name__)
    tgen.start_topology()

    logger.info("Testing with VRF Lite support")

    # "{0}" is replaced by the router name in every command below.
    cmds = [
        "ip link add {0}-cust1 type vrf table 1001",
        "ip link add loop1 type dummy",
        "ip link set {0}-eth0 master {0}-cust1",
    ]

    eth1_cmds = ["ip link set {0}-eth1 master {0}-cust1"]

    for rname, router in tgen.routers().items():
        # create VRF rx-cust1 and link rx-eth0 to rx-cust1
        for cmd in cmds:
            tgen.net[rname].cmd(cmd.format(rname))

        # If router has an rX-eth1, link that to vrf also
        if "{}-eth1".format(rname) in router.links:
            for cmd in eth1_cmds:
                tgen.net[rname].cmd(cmd.format(rname))

    # For all registered routers, load the zebra and isisd configuration files
    for rname, router in tgen.routers().items():
        router.load_config(
            TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname))
        )
        router.load_config(
            TopoRouter.RD_ISIS, os.path.join(CWD, "{}/isisd.conf".format(rname))
        )

    # After loading the configurations, this function loads configured daemons.
    tgen.start_router()
119
120
def teardown_module(mod):
    "Teardown the pytest environment"
    # No explicit VRF cleanup (moving rx-eth0 back to the default VRF,
    # deleting the rx-cust1 VRF) is issued here; only the topology is stopped.
    get_topogen().stop_topology()
127
128
def test_isis_convergence():
    """Wait for the protocol to converge before starting to test.

    For every router, the parsed ISIS VRF topology is compared against the
    reference in ``<router>/<router>_topology.json``. The comparison is
    repeated through ``topotest.run_and_expect`` (count=120, wait=0.5) until
    ``topotest.json_cmp`` returns ``None``.
    """
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("waiting for ISIS protocol to converge")

    def compare_isis_topology(router, expected):
        "Helper function to test ISIS vrf topology convergence."
        actual = show_isis_topology(router)
        return topotest.json_cmp(actual, expected)

    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_topology.json".format(CWD, rname)
        # Use a context manager so the reference file is always closed.
        with open(filename) as reference:
            expected = json.load(reference)

        test_func = functools.partial(compare_isis_topology, router, expected)
        (result, diff) = topotest.run_and_expect(test_func, None, wait=0.5, count=120)
        assert result, "ISIS did not converge on {}:\n{}".format(rname, diff)
151
152
def test_isis_route_installation():
    """Check whether all expected routes are present.

    For every router, the JSON output of
    ``show ip route vrf <router>-cust1 json`` is compared against
    ``<router>/<router>_route.json``. The comparison is repeated through
    ``topotest.run_and_expect`` (count=20, wait=1).
    """
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Checking routers for installed ISIS vrf routes")

    def compare_routing_table(router, command, expected):
        "Helper function to ensure zebra rib convergence"
        actual = router.vtysh_cmd(command, isjson=True)
        return topotest.json_cmp(actual, expected)

    # Check for routes in 'show ip route vrf {}-cust1 json'
    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_route.json".format(CWD, rname)
        # Use a context manager so the reference file is always closed.
        with open(filename, "r") as reference:
            expected = json.load(reference)

        # The command is bound explicitly rather than read from the loop
        # variable inside the helper.
        command = "show ip route vrf {0}-cust1 json".format(rname)
        test_func = functools.partial(
            compare_routing_table, router, command, expected
        )
        (result, diff) = topotest.run_and_expect(test_func, None, count=20, wait=1)
        assertmsg = "Router '{}' routes mismatch diff: {}".format(rname, diff)
        assert result, assertmsg
178
179
def test_isis_linux_route_installation():
    """Check whether all expected routes are present and installed in the OS.

    Skipped when a router has already failed, when
    ``required_linux_kernel_version("4.15")`` does not return ``True``, or
    when iproute2 is not VRF capable. Otherwise the result of
    ``topotest.ip4_vrf_route`` for every router is compared once against
    ``<router>/<router>_route_linux.json``.
    """
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    # Required linux kernel version for this suite to run.
    result = required_linux_kernel_version("4.15")
    if result is not True:
        pytest.skip("Kernel requirements are not met")

    # iproute2 needs to support VRFs for this suite to run.
    if not iproute2_is_vrf_capable():
        pytest.skip("Installed iproute2 version does not support VRFs")

    logger.info("Checking routers for installed ISIS vrf routes in OS")
    # Check for routes in `ip route show vrf {}-cust1`
    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_route_linux.json".format(CWD, rname)
        # Use a context manager so the reference file is always closed.
        with open(filename, "r") as reference:
            expected = json.load(reference)
        actual = topotest.ip4_vrf_route(router)
        assertmsg = "Router '{}' OS routes mismatch".format(rname)
        assert topotest.json_cmp(actual, expected) is None, assertmsg
204
205
def test_isis_route6_installation():
    """Check whether all expected IPv6 routes are present.

    For every router, the JSON output of
    ``show ipv6 route vrf <router>-cust1 json`` is compared against
    ``<router>/<router>_route6.json``. The comparison is repeated through
    ``topotest.run_and_expect`` (count=20, wait=1).
    """
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Checking routers for installed ISIS vrf IPv6 routes")

    def compare_routing_table(router, command, expected):
        "Helper function to ensure zebra rib convergence"
        actual = router.vtysh_cmd(command, isjson=True)
        return topotest.json_cmp(actual, expected)

    # Check for routes in 'show ipv6 route vrf {}-cust1 json'
    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_route6.json".format(CWD, rname)
        # Use a context manager so the reference file is always closed.
        with open(filename, "r") as reference:
            expected = json.load(reference)

        # The command is bound explicitly rather than read from the loop
        # variable inside the helper.
        command = "show ipv6 route vrf {}-cust1 json".format(rname)
        test_func = functools.partial(
            compare_routing_table, router, command, expected
        )
        (result, diff) = topotest.run_and_expect(test_func, None, count=20, wait=1)
        # Both placeholders are required: without the second one the diff
        # would be left out of the failure message.
        assertmsg = "Router '{}' routes mismatch diff: {}".format(rname, diff)
        assert result, assertmsg
230
231
def test_isis_linux_route6_installation():
    """Check whether all expected IPv6 routes are present and installed in the OS.

    Skipped when a router has already failed, when
    ``required_linux_kernel_version("4.15")`` does not return ``True``, or
    when iproute2 is not VRF capable. Otherwise the result of
    ``topotest.ip6_vrf_route`` for every router is compared once against
    ``<router>/<router>_route6_linux.json``.
    """
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    # Required linux kernel version for this suite to run.
    result = required_linux_kernel_version("4.15")
    if result is not True:
        pytest.skip("Kernel requirements are not met")

    # iproute2 needs to support VRFs for this suite to run.
    if not iproute2_is_vrf_capable():
        pytest.skip("Installed iproute2 version does not support VRFs")

    logger.info("Checking routers for installed ISIS vrf IPv6 routes in OS")
    # Check for routes in `ip -6 route show vrf {}-cust1`
    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_route6_linux.json".format(CWD, rname)
        # Use a context manager so the reference file is always closed.
        with open(filename, "r") as reference:
            expected = json.load(reference)
        actual = topotest.ip6_vrf_route(router)
        assertmsg = "Router '{}' OS routes mismatch".format(rname)
        assert topotest.json_cmp(actual, expected) is None, assertmsg
256
257
def test_memory_leak():
    "Run the memory leak test and report results."
    topology = get_topogen()

    # Report only when memory-leak checking is switched on; otherwise the
    # test is marked as skipped.
    if topology.is_memleak_enabled():
        topology.report_memory_leaks()
        return

    pytest.skip("Memory leak test/report is disabled")
265
266
if __name__ == "__main__":
    # Direct invocation: hand "-s" plus any command-line arguments to pytest
    # and use its return value as the process exit status.
    sys.exit(pytest.main(["-s"] + sys.argv[1:]))
270
271
272 #
273 # Auxiliary functions
274 #
275
276
def dict_merge(dct, merge_dct):
    """
    Merge ``merge_dct`` into ``dct`` in place, descending into nested dicts.

    Unlike :meth:``dict.update()``, a key present on both sides is merged
    recursively when the existing value is a dict and the incoming value is
    a mapping; in every other case the incoming value replaces (or adds) the
    entry in ``dct``.
    :param dct: dict onto which the merge is executed
    :param merge_dct: dct merged into dct
    :return: None

    Source:
    https://gist.github.com/angstwad/bf22d1822c38a92ec0a9
    """
    for key, incoming in merge_dct.items():
        mergeable = (
            key in dct
            and isinstance(dct[key], dict)
            and topotest.is_mapping(incoming)
        )
        if not mergeable:
            # New key, or at least one side is not dict-like: overwrite.
            dct[key] = incoming
            continue
        dict_merge(dct[key], incoming)
295
296
def parse_topology(lines, level):
    """
    Parse the output of 'show isis topology level-X' into a Python dict.

    Lines are expected to have their fields separated by single spaces.
    Everything before the first ``Area <name>:`` line is ignored, and so is
    any entry line that appears inside an area before one of the
    ``IS-IS paths to level-X routers that speak IP/IPv6`` headers has
    selected an address family.

    :param lines: iterable of output lines
    :param level: key under which each area's entries are stored
        (for example ``"level-1"``)
    :return: dict shaped as
        ``{area: {level: {"ipv4": [entry, ...], "ipv6": [entry, ...]}}}``.
        An entry is a dict holding, depending on how much of the line could
        be parsed, either all of ``vertex``, ``type``, ``metric``,
        ``next-hop``, ``interface`` and ``parent``; or ``vertex``, ``type``,
        ``metric`` and ``parent``; or just ``vertex``. All values are
        strings, including ``metric``.
    """
    areas = {}
    area = None
    ipv = None

    # Build every pattern once up front instead of once per input line.
    vertex_type_regex = "|".join(VERTEX_TYPE_LIST)
    area_re = re.compile(r"Area (.+):")
    # The IPv6 header must be tested first: the IPv4 pattern is a prefix of it.
    ipv6_re = re.compile(r"IS\-IS paths to level-. routers that speak IPv6")
    ipv4_re = re.compile(r"IS\-IS paths to level-. routers that speak IP")
    # Column header; the lookahead makes "Parent" a complete token.
    header_re = re.compile(r"Vertex Type Metric Next-Hop Interface Parent(?!\S)")
    # In both item patterns group 4 is the nested alternative of the metric
    # group, which is why the fields after the metric start at group 5.
    full_item_re = re.compile(
        r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+) ([^\s]+) ([^\s]+)".format(
            vertex_type_regex
        )
    )
    short_item_re = re.compile(
        r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+)".format(vertex_type_regex)
    )
    vertex_only_re = re.compile(r"([^\s]+)")

    for line in lines:
        area_match = area_re.match(line)
        if area_match:
            area = area_match.group(1)
            if area not in areas:
                areas[area] = {level: {"ipv4": [], "ipv6": []}}
            # A new area starts without a selected address family.
            ipv = None
            continue
        if area is None:
            continue

        if ipv6_re.match(line):
            ipv = "ipv6"
            continue
        if ipv4_re.match(line):
            ipv = "ipv4"
            continue

        if header_re.match(line):
            # Skip header
            continue

        if ipv is None:
            # No address-family header seen yet for this area: there is no
            # list to file the entry under, so skip the line instead of
            # failing with a KeyError.
            continue

        entries = areas[area][level][ipv]

        item_match = full_item_re.match(line)
        if item_match is not None:
            entries.append(
                {
                    "vertex": item_match.group(1),
                    "type": item_match.group(2),
                    "metric": item_match.group(3),
                    "next-hop": item_match.group(5),
                    "interface": item_match.group(6),
                    "parent": item_match.group(7),
                }
            )
            continue

        item_match = short_item_re.match(line)
        if item_match is not None:
            entries.append(
                {
                    "vertex": item_match.group(1),
                    "type": item_match.group(2),
                    "metric": item_match.group(3),
                    "parent": item_match.group(5),
                }
            )
            continue

        # Fallback: any other line that starts with a non-blank token is
        # recorded as a bare vertex.
        item_match = vertex_only_re.match(line)
        if item_match is not None:
            entries.append({"vertex": item_match.group(1)})

    return areas
380
381
def show_isis_topology(router):
    """
    Get the ISIS vrf topology in a dictionary format.

    Runs the level-1 and level-2 'show isis vrf <router>-cust1 topology'
    commands on ``router``, parses each output with parse_topology() and
    merges the level-2 result into the level-1 result with dict_merge().

    Shape of the result:
    {
      'area-name': {
        'level-1': {
          'ipv4': [
            {
              'vertex': 'r1'
            }
          ],
          'ipv6': []
        },
        'level-2': {
          'ipv4': [
            {
              "interface": "rX-ethY",
              "metric": "Z",
              "next-hop": "rA",
              "parent": "rC(B)",
              "type": "TE-IS",
              "vertex": "rD"
            }
          ],
          'ipv6': []
        }
      }
    }
    """
    level_commands = (
        ("level-1", "show isis vrf {}-cust1 topology level-1"),
        ("level-2", "show isis vrf {}-cust1 topology level-2"),
    )

    # Collect both command outputs first, then parse them.
    outputs = []
    for level, command in level_commands:
        raw_text = router.vtysh_cmd(command.format(router.name))
        outputs.append((level, topotest.normalize_text(raw_text).splitlines()))

    topology, level2_topology = [
        parse_topology(text_lines, level) for level, text_lines in outputs
    ]

    dict_merge(topology, level2_topology)
    return topology