]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/isis_topo1/test_isis_topo1.py
Merge pull request #13649 from donaldsharp/unlock_the_node_or_else
[mirror_frr.git] / tests / topotests / isis_topo1 / test_isis_topo1.py
1 #!/usr/bin/env python
2 # SPDX-License-Identifier: ISC
3
4 #
5 # test_isis_topo1.py
6 # Part of NetDEF Topology Tests
7 #
8 # Copyright (c) 2017 by
9 # Network Device Education Foundation, Inc. ("NetDEF")
10 #
11
12 """
13 test_isis_topo1.py: Test ISIS topology.
14 """
15 import datetime
16 import functools
17 import json
18 import os
19 import re
20 import sys
21 import pytest
22
23 CWD = os.path.dirname(os.path.realpath(__file__))
24 sys.path.append(os.path.join(CWD, "../"))
25
26 # pylint: disable=C0413
27 from lib import topotest
28 from lib.common_config import (
29 retry,
30 stop_router,
31 start_router,
32 )
33 from lib.topogen import Topogen, TopoRouter, get_topogen
34 from lib.topolog import logger
35
36
37 pytestmark = [pytest.mark.isisd]
38
39 VERTEX_TYPE_LIST = [
40 "pseudo_IS",
41 "pseudo_TE-IS",
42 "IS",
43 "TE-IS",
44 "ES",
45 "IP internal",
46 "IP external",
47 "IP TE",
48 "IP6 internal",
49 "IP6 external",
50 "UNKNOWN",
51 ]
52
53
54 def build_topo(tgen):
55 "Build function"
56
57 # Add ISIS routers:
58 # r1 r2
59 # | sw1 | sw2
60 # r3 r4
61 # | |
62 # sw3 sw4
63 # \ /
64 # r5
65 for routern in range(1, 6):
66 tgen.add_router("r{}".format(routern))
67
68 # r1 <- sw1 -> r3
69 sw = tgen.add_switch("sw1")
70 sw.add_link(tgen.gears["r1"])
71 sw.add_link(tgen.gears["r3"])
72
73 # r2 <- sw2 -> r4
74 sw = tgen.add_switch("sw2")
75 sw.add_link(tgen.gears["r2"])
76 sw.add_link(tgen.gears["r4"])
77
78 # r3 <- sw3 -> r5
79 sw = tgen.add_switch("sw3")
80 sw.add_link(tgen.gears["r3"])
81 sw.add_link(tgen.gears["r5"])
82
83 # r4 <- sw4 -> r5
84 sw = tgen.add_switch("sw4")
85 sw.add_link(tgen.gears["r4"])
86 sw.add_link(tgen.gears["r5"])
87
88
89 def setup_module(mod):
90 "Sets up the pytest environment"
91 tgen = Topogen(build_topo, mod.__name__)
92 tgen.start_topology()
93
94 # For all registered routers, load the zebra configuration file
95 for rname, router in tgen.routers().items():
96 router.load_config(
97 TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname))
98 )
99 router.load_config(
100 TopoRouter.RD_ISIS, os.path.join(CWD, "{}/isisd.conf".format(rname))
101 )
102
103 # After loading the configurations, this function loads configured daemons.
104 tgen.start_router()
105
106
107 def teardown_module(mod):
108 "Teardown the pytest environment"
109 tgen = get_topogen()
110
111 # This function tears down the whole topology.
112 tgen.stop_topology()
113
114
115 def test_isis_convergence():
116 "Wait for the protocol to converge before starting to test"
117 tgen = get_topogen()
118 # Don't run this test if we have any failure.
119 if tgen.routers_have_failure():
120 pytest.skip(tgen.errors)
121
122 logger.info("waiting for ISIS protocol to converge")
123 # Code to generate the json files.
124 # for rname, router in tgen.routers().items():
125 # open('/tmp/{}_topology.json'.format(rname), 'w').write(
126 # json.dumps(show_isis_topology(router), indent=2, sort_keys=True)
127 # )
128
129 for rname, router in tgen.routers().items():
130 filename = "{0}/{1}/{1}_topology.json".format(CWD, rname)
131 expected = json.loads(open(filename).read())
132
133 def compare_isis_topology(router, expected):
134 "Helper function to test ISIS topology convergence."
135 actual = show_isis_topology(router)
136 return topotest.json_cmp(actual, expected)
137
138 test_func = functools.partial(compare_isis_topology, router, expected)
139 (result, diff) = topotest.run_and_expect(test_func, None, wait=0.5, count=120)
140 assert result, "ISIS did not converge on {}:\n{}".format(rname, diff)
141
142
143 def test_isis_route_installation():
144 "Check whether all expected routes are present"
145 tgen = get_topogen()
146 # Don't run this test if we have any failure.
147 if tgen.routers_have_failure():
148 pytest.skip(tgen.errors)
149
150 logger.info("Checking routers for installed ISIS routes")
151
152 # Check for routes in 'show ip route json'
153 for rname, router in tgen.routers().items():
154 filename = "{0}/{1}/{1}_route.json".format(CWD, rname)
155 expected = json.loads(open(filename, "r").read())
156
157 def compare_isis_installed_routes(router, expected):
158 "Helper function to test ISIS routes installed in rib."
159 actual = router.vtysh_cmd("show ip route json", isjson=True)
160 return topotest.json_cmp(actual, expected)
161
162 test_func = functools.partial(compare_isis_installed_routes, router, expected)
163 (result, diff) = topotest.run_and_expect(test_func, None, wait=1, count=10)
164 assertmsg = "Router '{}' routes mismatch".format(rname)
165 assert result, assertmsg
166
167
168 def test_isis_linux_route_installation():
169 "Check whether all expected routes are present and installed in the OS"
170 tgen = get_topogen()
171 # Don't run this test if we have any failure.
172 if tgen.routers_have_failure():
173 pytest.skip(tgen.errors)
174
175 logger.info("Checking routers for installed ISIS routes in OS")
176
177 # Check for routes in `ip route`
178 for rname, router in tgen.routers().items():
179 filename = "{0}/{1}/{1}_route_linux.json".format(CWD, rname)
180 expected = json.loads(open(filename, "r").read())
181 actual = topotest.ip4_route(router)
182 assertmsg = "Router '{}' OS routes mismatch".format(rname)
183 assert topotest.json_cmp(actual, expected) is None, assertmsg
184
185
186 def test_isis_route6_installation():
187 "Check whether all expected routes are present"
188 tgen = get_topogen()
189 # Don't run this test if we have any failure.
190 if tgen.routers_have_failure():
191 pytest.skip(tgen.errors)
192
193 logger.info("Checking routers for installed ISIS IPv6 routes")
194
195 # Check for routes in 'show ip route json'
196 for rname, router in tgen.routers().items():
197 filename = "{0}/{1}/{1}_route6.json".format(CWD, rname)
198 expected = json.loads(open(filename, "r").read())
199
200 def compare_isis_v6_installed_routes(router, expected):
201 "Helper function to test ISIS v6 routes installed in rib."
202 actual = router.vtysh_cmd("show ipv6 route json", isjson=True)
203 return topotest.json_cmp(actual, expected)
204
205 test_func = functools.partial(
206 compare_isis_v6_installed_routes, router, expected
207 )
208 (result, diff) = topotest.run_and_expect(test_func, None, wait=1, count=10)
209 assertmsg = "Router '{}' routes mismatch".format(rname)
210 assert result, assertmsg
211
212
213 def test_isis_linux_route6_installation():
214 "Check whether all expected routes are present and installed in the OS"
215 tgen = get_topogen()
216 # Don't run this test if we have any failure.
217 if tgen.routers_have_failure():
218 pytest.skip(tgen.errors)
219
220 logger.info("Checking routers for installed ISIS IPv6 routes in OS")
221
222 # Check for routes in `ip route`
223 for rname, router in tgen.routers().items():
224 filename = "{0}/{1}/{1}_route6_linux.json".format(CWD, rname)
225 expected = json.loads(open(filename, "r").read())
226 actual = topotest.ip6_route(router)
227 assertmsg = "Router '{}' OS routes mismatch".format(rname)
228 assert topotest.json_cmp(actual, expected) is None, assertmsg
229
230
231 def test_isis_summary_json():
232 "Check json struct in show isis summary json"
233
234 tgen = get_topogen()
235 # Don't run this test if we have any failure.
236 if tgen.routers_have_failure():
237 pytest.skip(tgen.errors)
238
239 logger.info("Checking 'show isis summary json'")
240 for rname, router in tgen.routers().items():
241 logger.info("Checking router %s", rname)
242 json_output = tgen.gears[rname].vtysh_cmd("show isis summary json", isjson=True)
243 assertmsg = "Test isis summary json failed in '{}' data '{}'".format(
244 rname, json_output
245 )
246 assert json_output["vrf"] == "default", assertmsg
247 assert json_output["areas"][0]["area"] == "1", assertmsg
248 assert json_output["areas"][0]["levels"][0]["id"] != "3", assertmsg
249
250
251 def test_isis_interface_json():
252 "Check json struct in show isis interface json"
253
254 tgen = get_topogen()
255 # Don't run this test if we have any failure.
256 if tgen.routers_have_failure():
257 pytest.skip(tgen.errors)
258
259 logger.info("Checking 'show isis interface json'")
260 for rname, router in tgen.routers().items():
261 logger.info("Checking router %s", rname)
262 json_output = tgen.gears[rname].vtysh_cmd(
263 "show isis interface json", isjson=True
264 )
265 assertmsg = "Test isis interface json failed in '{}' data '{}'".format(
266 rname, json_output
267 )
268 assert (
269 json_output["areas"][0]["circuits"][0]["interface"]["name"]
270 == rname + "-eth0"
271 ), assertmsg
272
273 for rname, router in tgen.routers().items():
274 logger.info("Checking router %s", rname)
275 json_output = tgen.gears[rname].vtysh_cmd(
276 "show isis interface detail json", isjson=True
277 )
278 assertmsg = "Test isis interface json failed in '{}' data '{}'".format(
279 rname, json_output
280 )
281 assert (
282 json_output["areas"][0]["circuits"][0]["interface"]["name"]
283 == rname + "-eth0"
284 ), assertmsg
285
286
287 def test_isis_neighbor_json():
288 "Check json struct in show isis neighbor json"
289
290 tgen = get_topogen()
291 # Don't run this test if we have any failure.
292 if tgen.routers_have_failure():
293 pytest.skip(tgen.errors)
294
295 # tgen.mininet_cli()
296 logger.info("Checking 'show isis neighbor json'")
297 for rname, router in tgen.routers().items():
298 logger.info("Checking router %s", rname)
299 json_output = tgen.gears[rname].vtysh_cmd(
300 "show isis neighbor json", isjson=True
301 )
302 assertmsg = "Test isis neighbor json failed in '{}' data '{}'".format(
303 rname, json_output
304 )
305 assert (
306 json_output["areas"][0]["circuits"][0]["interface"] == rname + "-eth0"
307 ), assertmsg
308
309 for rname, router in tgen.routers().items():
310 logger.info("Checking router %s", rname)
311 json_output = tgen.gears[rname].vtysh_cmd(
312 "show isis neighbor detail json", isjson=True
313 )
314 assertmsg = "Test isis neighbor json failed in '{}' data '{}'".format(
315 rname, json_output
316 )
317 assert (
318 json_output["areas"][0]["circuits"][0]["interface"]["name"]
319 == rname + "-eth0"
320 ), assertmsg
321
322
323 def test_isis_database_json():
324 "Check json struct in show isis database json"
325
326 tgen = get_topogen()
327 # Don't run this test if we have any failure.
328 if tgen.routers_have_failure():
329 pytest.skip(tgen.errors)
330
331 # tgen.mininet_cli()
332 logger.info("Checking 'show isis database json'")
333 for rname, router in tgen.routers().items():
334 logger.info("Checking router %s", rname)
335 json_output = tgen.gears[rname].vtysh_cmd(
336 "show isis database json", isjson=True
337 )
338 assertmsg = "Test isis database json failed in '{}' data '{}'".format(
339 rname, json_output
340 )
341 assert json_output["areas"][0]["area"]["name"] == "1", assertmsg
342 assert json_output["areas"][0]["levels"][0]["id"] != "3", assertmsg
343
344 for rname, router in tgen.routers().items():
345 logger.info("Checking router %s", rname)
346 json_output = tgen.gears[rname].vtysh_cmd(
347 "show isis database detail json", isjson=True
348 )
349 assertmsg = "Test isis database json failed in '{}' data '{}'".format(
350 rname, json_output
351 )
352 assert json_output["areas"][0]["area"]["name"] == "1", assertmsg
353 assert json_output["areas"][0]["levels"][0]["id"] != "3", assertmsg
354
355
356 def test_isis_overload_on_startup():
357 "Check that overload on startup behaves as expected"
358
359 tgen = get_topogen()
360 net = get_topogen().net
361 overload_time = 120
362
363 # Don't run this test if we have any failure.
364 if tgen.routers_have_failure():
365 pytest.skip(tgen.errors)
366
367 logger.info("Testing overload on startup behavior")
368
369 # Configure set-overload-bit on-startup on r3
370 r3 = tgen.gears["r3"]
371 r3.vtysh_cmd(
372 f"""
373 configure
374 router isis 1
375 set-overload-bit on-startup {overload_time}
376 """
377 )
378 # Restart r3
379 logger.info("Stop router")
380 stop_router(tgen, "r3")
381 logger.info("Start router")
382
383 tstamp_before_start_router = datetime.datetime.now()
384 start_router(tgen, "r3")
385 tstamp_after_start_router = datetime.datetime.now()
386 startup_router_time = (
387 tstamp_after_start_router - tstamp_before_start_router
388 ).total_seconds()
389
390 # Check that the overload bit is set in r3's LSP
391 check_lsp_overload_bit("r3", "r3.00-00", "0/0/1")
392 check_lsp_overload_bit("r1", "r3.00-00", "0/0/1")
393
394 # Attempt to unset overload bit while timer is still running
395 r3.vtysh_cmd(
396 """
397 configure
398 router isis 1
399 no set-overload-bit on-startup
400 no set-overload-bit
401 """
402 )
403
404 # Check overload bit is still set
405 check_lsp_overload_bit("r1", "r3.00-00", "0/0/1")
406
407 # Check that overload bit is unset after timer completes
408 check_lsp_overload_bit("r3", "r3.00-00", "0/0/0")
409 tstamp_after_bit_unset = datetime.datetime.now()
410 check_lsp_overload_bit("r1", "r3.00-00", "0/0/0")
411
412 # Collect time overloaded
413 time_overloaded = (
414 tstamp_after_bit_unset - tstamp_after_start_router
415 ).total_seconds()
416 logger.info(f"Time Overloaded: {time_overloaded}")
417
418 # Use time it took to startup router as lower bound
419 logger.info(
420 f"Assert that overload time falls in range: {overload_time - startup_router_time} < {time_overloaded} <= {overload_time}"
421 )
422 result = overload_time - startup_router_time < time_overloaded <= overload_time
423 assert result
424
425
426 def test_isis_overload_on_startup_cancel_timer():
427 "Check that overload on startup timer is cancelled when overload bit is set/unset"
428
429 tgen = get_topogen()
430 net = get_topogen().net
431 overload_time = 90
432
433 # Don't run this test if we have any failure.
434 if tgen.routers_have_failure():
435 pytest.skip(tgen.errors)
436
437 logger.info(
438 "Testing overload on startup behavior with set overload bit: cancel timer"
439 )
440
441 # Configure set-overload-bit on-startup on r3
442 r3 = tgen.gears["r3"]
443 r3.vtysh_cmd(
444 f"""
445 configure
446 router isis 1
447 set-overload-bit on-startup {overload_time}
448 set-overload-bit
449 """
450 )
451 # Restart r3
452 logger.info("Stop router")
453 stop_router(tgen, "r3")
454 logger.info("Start router")
455 start_router(tgen, "r3")
456
457 # Check that the overload bit is set in r3's LSP
458 check_lsp_overload_bit("r3", "r3.00-00", "0/0/1")
459
460 # Check that overload timer is running
461 check_overload_timer("r3", True)
462
463 # Unset overload bit while timer is running
464 r3.vtysh_cmd(
465 """
466 configure
467 router isis 1
468 no set-overload-bit
469 """
470 )
471
472 # Check that overload timer is cancelled
473 check_overload_timer("r3", False)
474
475 # Check overload bit is unset
476 check_lsp_overload_bit("r3", "r3.00-00", "0/0/0")
477
478
479 def test_isis_overload_on_startup_override_timer():
480 "Check that overload bit remains set after overload timer expires if overload bit is configured"
481
482 tgen = get_topogen()
483 net = get_topogen().net
484 overload_time = 60
485
486 # Don't run this test if we have any failure.
487 if tgen.routers_have_failure():
488 pytest.skip(tgen.errors)
489
490 logger.info(
491 "Testing overload on startup behavior with set overload bit: override timer"
492 )
493
494 # Configure set-overload-bit on-startup on r3
495 r3 = tgen.gears["r3"]
496 r3.vtysh_cmd(
497 f"""
498 configure
499 router isis 1
500 set-overload-bit on-startup {overload_time}
501 set-overload-bit
502 """
503 )
504 # Restart r3
505 logger.info("Stop router")
506 stop_router(tgen, "r3")
507 logger.info("Start router")
508 start_router(tgen, "r3")
509
510 # Check that the overload bit is set in r3's LSP
511 check_lsp_overload_bit("r3", "r3.00-00", "0/0/1")
512
513 # Check that overload timer is running
514 check_overload_timer("r3", True)
515
516 # Check that overload timer expired
517 check_overload_timer("r3", False)
518
519 # Check overload bit is still set
520 check_lsp_overload_bit("r3", "r3.00-00", "0/0/1")
521
522
523 def test_isis_advertise_passive_only():
524 """Check that we only advertise prefixes of passive interfaces when advertise-passive-only is enabled."""
525 tgen = get_topogen()
526 net = get_topogen().net
527 # Don't run this test if we have any failure.
528 if tgen.routers_have_failure():
529 pytest.skip(tgen.errors)
530
531 logger.info("Testing isis advertise-passive-only behavior")
532 expected_prefixes_no_advertise_passive_only = set(
533 ["10.0.20.0/24", "10.254.0.1/32", "2001:db8:f::1/128", "2001:db8:1:1::/64"]
534 )
535 expected_prefixes_advertise_passive_only = set(
536 ["10.254.0.1/32", "2001:db8:f::1/128"]
537 )
538 lsp_id = "r1.00-00"
539
540 r1 = tgen.gears["r1"]
541 r1.vtysh_cmd(
542 """
543 configure
544 router isis 1
545 no redistribute ipv4 connected level-2
546 no redistribute ipv6 connected level-2
547 interface lo
548 ip router isis 1
549 ipv6 router isis 1
550 isis passive
551 end
552 """
553 )
554
555 result = check_advertised_prefixes(
556 r1, lsp_id, expected_prefixes_no_advertise_passive_only
557 )
558 assert result is True, result
559
560 r1.vtysh_cmd(
561 """
562 configure
563 router isis 1
564 advertise-passive-only
565 end
566 """
567 )
568
569 result = check_advertised_prefixes(
570 r1, lsp_id, expected_prefixes_advertise_passive_only
571 )
572 assert result is True, result
573
574
575 def test_isis_hello_padding_during_adjacency_formation():
576 """Check that IIH packets is only padded when adjacency is still being formed
577 when isis hello padding during-adjacency-formation is configured
578 """
579 tgen = get_topogen()
580 net = get_topogen().net
581 # Don't run this test if we have any failure.
582 if tgen.routers_have_failure():
583 pytest.skip(tgen.errors)
584
585 logger.info("Testing isis hello padding during-adjacency-formation behavior")
586 r3 = tgen.gears["r3"]
587
588 # Reduce hello-multiplier to make the adjacency go down faster.
589 r3.vtysh_cmd(
590 """
591 configure
592 interface r3-eth0
593 isis hello-multiplier 2
594 """
595 )
596
597 r1 = tgen.gears["r1"]
598 cmd_output = r1.vtysh_cmd(
599 """
600 configure
601 interface r1-eth0
602 isis hello padding during-adjacency-formation
603 end
604 debug isis adj-packets
605 """
606 )
607 result = check_last_iih_packet_for_padding(r1, expect_padding=False)
608 assert result is True, result
609
610 r3.vtysh_cmd(
611 """
612 configure
613 interface r3-eth0
614 shutdown
615 """
616 )
617 result = check_last_iih_packet_for_padding(r1, expect_padding=True)
618 assert result is True, result
619
620 r3 = tgen.gears["r3"]
621 r3.vtysh_cmd(
622 """
623 configure
624 interface r3-eth0
625 no shutdown
626 """
627 )
628 result = check_last_iih_packet_for_padding(r1, expect_padding=False)
629 assert result is True, result
630
631
632 @retry(retry_timeout=5)
633 def check_last_iih_packet_for_padding(router, expect_padding):
634 logfilename = "{}/{}".format(router.gearlogdir, "isisd.log")
635 last_hello_packet_line = None
636 with open(logfilename, "r") as f:
637 lines = f.readlines()
638 for line in lines:
639 if re.search("Sending .+? IIH", line):
640 last_hello_packet_line = line
641
642 if last_hello_packet_line is None:
643 return "Expected IIH packet in {}, but no packet found".format(logfilename)
644
645 interface_name, packet_length = re.search(
646 r"Sending .+ IIH on (.+), length (\d+)", last_hello_packet_line
647 ).group(1, 2)
648 packet_length = int(packet_length)
649 interface_output = router.vtysh_cmd("show interface {} json".format(interface_name))
650 interface_json = json.loads(interface_output)
651 padded_packet_length = interface_json[interface_name]["mtu"] - 3
652 if expect_padding:
653 if packet_length == padded_packet_length:
654 return True
655 return (
656 "Expected padded packet with length {}, got packet with length {}".format(
657 padded_packet_length, packet_length
658 )
659 )
660 if packet_length < padded_packet_length:
661 return True
662 return "Expected unpadded packet with length less than {}, got packet with length {}".format(
663 padded_packet_length, packet_length
664 )
665
666
667 @retry(retry_timeout=5)
668 def check_advertised_prefixes(router, lsp_id, expected_prefixes):
669 output = router.vtysh_cmd("show isis database detail {}".format(lsp_id))
670 prefixes = set(re.findall(r"IP(?:v6)? Reachability: (.*) \(Metric: 10\)", output))
671 if prefixes == expected_prefixes:
672 return True
673 return str({"expected_prefixes:": expected_prefixes, "prefixes": prefixes})
674
675
676 @retry(retry_timeout=200)
677 def _check_lsp_overload_bit(router, overloaded_router_lsp, att_p_ol_expected):
678 "Verfiy overload bit in router's LSP"
679
680 tgen = get_topogen()
681 router = tgen.gears[router]
682 logger.info(f"check_overload_bit {router}")
683 isis_database_output = router.vtysh_cmd(
684 "show isis database {} json".format(overloaded_router_lsp)
685 )
686
687 database_json = json.loads(isis_database_output)
688 att_p_ol = database_json["areas"][0]["levels"][1]["att-p-ol"]
689 if att_p_ol == att_p_ol_expected:
690 return True
691 return "{} peer with expected att_p_ol {} got {} ".format(
692 router.name, att_p_ol_expected, att_p_ol
693 )
694
695
696 def check_lsp_overload_bit(router, overloaded_router_lsp, att_p_ol_expected):
697 "Verfiy overload bit in router's LSP"
698
699 assertmsg = _check_lsp_overload_bit(
700 router, overloaded_router_lsp, att_p_ol_expected
701 )
702 assert assertmsg is True, assertmsg
703
704
705 @retry(retry_timeout=200)
706 def _check_overload_timer(router, timer_expected):
707 "Verfiy overload bit in router's LSP"
708
709 tgen = get_topogen()
710 router = tgen.gears[router]
711 thread_output = router.vtysh_cmd("show thread timers")
712
713 timer_running = "set_overload_on_start_timer" in thread_output
714 if timer_running == timer_expected:
715 return True
716 return "Expected timer running status: {}".format(timer_expected)
717
718
719 def check_overload_timer(router, timer_expected):
720 "Verfiy overload bit in router's LSP"
721
722 assertmsg = _check_overload_timer(router, timer_expected)
723 assert assertmsg is True, assertmsg
724
725
726 def test_memory_leak():
727 "Run the memory leak test and report results."
728 tgen = get_topogen()
729 if not tgen.is_memleak_enabled():
730 pytest.skip("Memory leak test/report is disabled")
731
732 tgen.report_memory_leaks()
733
734
735 if __name__ == "__main__":
736 args = ["-s"] + sys.argv[1:]
737 sys.exit(pytest.main(args))
738
739
740 #
741 # Auxiliary functions
742 #
743
744
745 def dict_merge(dct, merge_dct):
746 """
747 Recursive dict merge. Inspired by :meth:``dict.update()``, instead of
748 updating only top-level keys, dict_merge recurses down into dicts nested
749 to an arbitrary depth, updating keys. The ``merge_dct`` is merged into
750 ``dct``.
751 :param dct: dict onto which the merge is executed
752 :param merge_dct: dct merged into dct
753 :return: None
754
755 Source:
756 https://gist.github.com/angstwad/bf22d1822c38a92ec0a9
757 """
758 for k, v in merge_dct.items():
759 if k in dct and isinstance(dct[k], dict) and topotest.is_mapping(merge_dct[k]):
760 dict_merge(dct[k], merge_dct[k])
761 else:
762 dct[k] = merge_dct[k]
763
764
765 def parse_topology(lines, level):
766 """
767 Parse the output of 'show isis topology level-X' into a Python dict.
768 """
769 areas = {}
770 area = None
771 ipv = None
772 vertex_type_regex = "|".join(VERTEX_TYPE_LIST)
773
774 for line in lines:
775 area_match = re.match(r"Area (.+):", line)
776 if area_match:
777 area = area_match.group(1)
778 if area not in areas:
779 areas[area] = {level: {"ipv4": [], "ipv6": []}}
780 ipv = None
781 continue
782 elif area is None:
783 continue
784
785 if re.match(r"IS\-IS paths to level-. routers that speak IPv6", line):
786 ipv = "ipv6"
787 continue
788 if re.match(r"IS\-IS paths to level-. routers that speak IP", line):
789 ipv = "ipv4"
790 continue
791
792 item_match = re.match(
793 r"([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+)", line
794 )
795 if (
796 item_match is not None
797 and item_match.group(1) == "Vertex"
798 and item_match.group(2) == "Type"
799 and item_match.group(3) == "Metric"
800 and item_match.group(4) == "Next-Hop"
801 and item_match.group(5) == "Interface"
802 and item_match.group(6) == "Parent"
803 ):
804 # Skip header
805 continue
806
807 item_match = re.match(
808 r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+) ([^\s]+) ([^\s]+)".format(
809 vertex_type_regex
810 ),
811 line,
812 )
813 if item_match is not None:
814 areas[area][level][ipv].append(
815 {
816 "vertex": item_match.group(1),
817 "type": item_match.group(2),
818 "metric": item_match.group(3),
819 "next-hop": item_match.group(5),
820 "interface": item_match.group(6),
821 "parent": item_match.group(7),
822 }
823 )
824 continue
825
826 item_match = re.match(
827 r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+)".format(vertex_type_regex),
828 line,
829 )
830
831 if item_match is not None:
832 areas[area][level][ipv].append(
833 {
834 "vertex": item_match.group(1),
835 "type": item_match.group(2),
836 "metric": item_match.group(3),
837 "parent": item_match.group(5),
838 }
839 )
840 continue
841
842 item_match = re.match(r"([^\s]+)", line)
843 if item_match is not None:
844 areas[area][level][ipv].append({"vertex": item_match.group(1)})
845 continue
846
847 return areas
848
849
850 def show_isis_topology(router):
851 """
852 Get the ISIS topology in a dictionary format.
853
854 Sample:
855 {
856 'area-name': {
857 'level-1': [
858 {
859 'vertex': 'r1'
860 }
861 ],
862 'level-2': [
863 {
864 'vertex': '10.0.0.1/24',
865 'type': 'IP',
866 'parent': '0',
867 'metric': 'internal'
868 }
869 ]
870 },
871 'area-name-2': {
872 'level-2': [
873 {
874 "interface": "rX-ethY",
875 "metric": "Z",
876 "next-hop": "rA",
877 "parent": "rC(B)",
878 "type": "TE-IS",
879 "vertex": "rD"
880 }
881 ]
882 }
883 }
884 """
885 l1out = topotest.normalize_text(
886 router.vtysh_cmd("show isis topology level-1")
887 ).splitlines()
888 l2out = topotest.normalize_text(
889 router.vtysh_cmd("show isis topology level-2")
890 ).splitlines()
891
892 l1 = parse_topology(l1out, "level-1")
893 l2 = parse_topology(l2out, "level-2")
894
895 dict_merge(l1, l2)
896 return l1