2 # SPDX-License-Identifier: ISC
6 # Part of NetDEF Topology Tests
8 # Copyright (c) 2017 by
9 # Network Device Education Foundation, Inc. ("NetDEF")
13 test_isis_topo1.py: Test ISIS topology.
23 CWD
= os
.path
.dirname(os
.path
.realpath(__file__
))
24 sys
.path
.append(os
.path
.join(CWD
, "../"))
26 # pylint: disable=C0413
27 from lib
import topotest
28 from lib
.common_config
import (
33 from lib
.topogen
import Topogen
, TopoRouter
, get_topogen
34 from lib
.topolog
import logger
37 pytestmark
= [pytest
.mark
.isisd
]
65 for routern
in range(1, 6):
66 tgen
.add_router("r{}".format(routern
))
69 sw
= tgen
.add_switch("sw1")
70 sw
.add_link(tgen
.gears
["r1"])
71 sw
.add_link(tgen
.gears
["r3"])
74 sw
= tgen
.add_switch("sw2")
75 sw
.add_link(tgen
.gears
["r2"])
76 sw
.add_link(tgen
.gears
["r4"])
79 sw
= tgen
.add_switch("sw3")
80 sw
.add_link(tgen
.gears
["r3"])
81 sw
.add_link(tgen
.gears
["r5"])
84 sw
= tgen
.add_switch("sw4")
85 sw
.add_link(tgen
.gears
["r4"])
86 sw
.add_link(tgen
.gears
["r5"])
89 def setup_module(mod
):
90 "Sets up the pytest environment"
91 tgen
= Topogen(build_topo
, mod
.__name
__)
94 # For all registered routers, load the zebra configuration file
95 for rname
, router
in tgen
.routers().items():
97 TopoRouter
.RD_ZEBRA
, os
.path
.join(CWD
, "{}/zebra.conf".format(rname
))
100 TopoRouter
.RD_ISIS
, os
.path
.join(CWD
, "{}/isisd.conf".format(rname
))
103 # After loading the configurations, this function loads configured daemons.
107 def teardown_module(mod
):
108 "Teardown the pytest environment"
111 # This function tears down the whole topology.
def test_isis_convergence():
    """Wait for the protocol to converge before starting to test.

    For every router, the expected topology is read from
    ``<CWD>/<rname>/<rname>_topology.json`` and compared against the output
    of ``show_isis_topology`` through ``topotest.run_and_expect``
    (presumably a polling helper: ``wait`` seconds between tries, at most
    ``count`` tries -- confirm against lib.topotest).
    """
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("waiting for ISIS protocol to converge")
    # Code to generate the json files.
    # for rname, router in tgen.routers().items():
    #     open('/tmp/{}_topology.json'.format(rname), 'w').write(
    #         json.dumps(show_isis_topology(router), indent=2, sort_keys=True)
    #     )

    # The helper does not depend on the loop, so define it once.
    def compare_isis_topology(router, expected):
        "Helper function to test ISIS topology convergence."
        actual = show_isis_topology(router)
        return topotest.json_cmp(actual, expected)

    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_topology.json".format(CWD, rname)
        # Context manager so the fixture file is closed deterministically.
        with open(filename) as expected_file:
            expected = json.load(expected_file)

        test_func = functools.partial(compare_isis_topology, router, expected)
        (result, diff) = topotest.run_and_expect(test_func, None, wait=0.5, count=120)
        assert result, "ISIS did not converge on {}:\n{}".format(rname, diff)
def test_isis_route_installation():
    """Check whether all expected routes are present.

    Compares ``show ip route json`` on every router against the fixture
    ``<CWD>/<rname>/<rname>_route.json``.
    """
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Checking routers for installed ISIS routes")

    # The helper does not depend on the loop, so define it once.
    def compare_isis_installed_routes(router, expected):
        "Helper function to test ISIS routes installed in rib."
        actual = router.vtysh_cmd("show ip route json", isjson=True)
        return topotest.json_cmp(actual, expected)

    # Check for routes in 'show ip route json'
    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_route.json".format(CWD, rname)
        # Context manager so the fixture file is closed deterministically.
        with open(filename, "r") as expected_file:
            expected = json.load(expected_file)

        test_func = functools.partial(compare_isis_installed_routes, router, expected)
        # The diff is not part of the assertion message, so it is discarded.
        (result, _) = topotest.run_and_expect(test_func, None, wait=1, count=10)
        assertmsg = "Router '{}' routes mismatch".format(rname)
        assert result, assertmsg
def test_isis_linux_route_installation():
    """Check whether all expected routes are present and installed in the OS.

    Compares ``topotest.ip4_route(router)`` for every router against the
    fixture ``<CWD>/<rname>/<rname>_route_linux.json``.
    """
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Checking routers for installed ISIS routes in OS")

    # Check for routes in `ip route`
    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_route_linux.json".format(CWD, rname)
        # Context manager so the fixture file is closed deterministically.
        with open(filename, "r") as expected_file:
            expected = json.load(expected_file)
        actual = topotest.ip4_route(router)
        assertmsg = "Router '{}' OS routes mismatch".format(rname)
        assert topotest.json_cmp(actual, expected) is None, assertmsg
def test_isis_route6_installation():
    """Check whether all expected IPv6 routes are present.

    Compares ``show ipv6 route json`` on every router against the fixture
    ``<CWD>/<rname>/<rname>_route6.json``.
    """
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Checking routers for installed ISIS IPv6 routes")

    # The helper does not depend on the loop, so define it once.
    def compare_isis_v6_installed_routes(router, expected):
        "Helper function to test ISIS v6 routes installed in rib."
        actual = router.vtysh_cmd("show ipv6 route json", isjson=True)
        return topotest.json_cmp(actual, expected)

    # Check for routes in 'show ipv6 route json'
    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_route6.json".format(CWD, rname)
        # Context manager so the fixture file is closed deterministically.
        with open(filename, "r") as expected_file:
            expected = json.load(expected_file)

        test_func = functools.partial(
            compare_isis_v6_installed_routes, router, expected
        )
        # The diff is not part of the assertion message, so it is discarded.
        (result, _) = topotest.run_and_expect(test_func, None, wait=1, count=10)
        assertmsg = "Router '{}' routes mismatch".format(rname)
        assert result, assertmsg
def test_isis_linux_route6_installation():
    """Check whether all expected IPv6 routes are present and installed in the OS.

    Compares ``topotest.ip6_route(router)`` for every router against the
    fixture ``<CWD>/<rname>/<rname>_route6_linux.json``.
    """
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Checking routers for installed ISIS IPv6 routes in OS")

    # Check for IPv6 routes installed in the kernel (via topotest.ip6_route)
    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_route6_linux.json".format(CWD, rname)
        # Context manager so the fixture file is closed deterministically.
        with open(filename, "r") as expected_file:
            expected = json.load(expected_file)
        actual = topotest.ip6_route(router)
        assertmsg = "Router '{}' OS routes mismatch".format(rname)
        assert topotest.json_cmp(actual, expected) is None, assertmsg
def test_isis_summary_json():
    """Check json struct in show isis summary json.

    For every router the JSON must report vrf "default", first area "1",
    and a first level whose id is not "3".
    """
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Checking 'show isis summary json'")
    for rname in tgen.routers():
        logger.info("Checking router %s", rname)
        gear = tgen.gears[rname]
        json_output = gear.vtysh_cmd("show isis summary json", isjson=True)
        assertmsg = "Test isis summary json failed in '{}' data '{}'".format(
            rname, json_output
        )
        first_area = json_output["areas"][0]
        assert json_output["vrf"] == "default", assertmsg
        assert first_area["area"] == "1", assertmsg
        assert first_area["levels"][0]["id"] != "3", assertmsg
251 def test_isis_interface_json():
252 "Check json struct in show isis interface json"
255 # Don't run this test if we have any failure.
256 if tgen
.routers_have_failure():
257 pytest
.skip(tgen
.errors
)
259 logger
.info("Checking 'show isis interface json'")
260 for rname
, router
in tgen
.routers().items():
261 logger
.info("Checking router %s", rname
)
262 json_output
= tgen
.gears
[rname
].vtysh_cmd(
263 "show isis interface json", isjson
=True
265 assertmsg
= "Test isis interface json failed in '{}' data '{}'".format(
269 json_output
["areas"][0]["circuits"][0]["interface"]["name"]
273 for rname
, router
in tgen
.routers().items():
274 logger
.info("Checking router %s", rname
)
275 json_output
= tgen
.gears
[rname
].vtysh_cmd(
276 "show isis interface detail json", isjson
=True
278 assertmsg
= "Test isis interface json failed in '{}' data '{}'".format(
282 json_output
["areas"][0]["circuits"][0]["interface"]["name"]
287 def test_isis_neighbor_json():
288 "Check json struct in show isis neighbor json"
291 # Don't run this test if we have any failure.
292 if tgen
.routers_have_failure():
293 pytest
.skip(tgen
.errors
)
296 logger
.info("Checking 'show isis neighbor json'")
297 for rname
, router
in tgen
.routers().items():
298 logger
.info("Checking router %s", rname
)
299 json_output
= tgen
.gears
[rname
].vtysh_cmd(
300 "show isis neighbor json", isjson
=True
302 assertmsg
= "Test isis neighbor json failed in '{}' data '{}'".format(
306 json_output
["areas"][0]["circuits"][0]["interface"] == rname
+ "-eth0"
309 for rname
, router
in tgen
.routers().items():
310 logger
.info("Checking router %s", rname
)
311 json_output
= tgen
.gears
[rname
].vtysh_cmd(
312 "show isis neighbor detail json", isjson
=True
314 assertmsg
= "Test isis neighbor json failed in '{}' data '{}'".format(
318 json_output
["areas"][0]["circuits"][0]["interface"]["name"]
def test_isis_database_json():
    """Check json struct in show isis database json.

    Runs the plain and the ``detail`` variant of the command, in that order,
    across all routers; each output must name its first area "1" and have a
    first level whose id is not "3".
    """
    tgen = get_topogen()
    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Checking 'show isis database json'")
    commands = ("show isis database json", "show isis database detail json")
    for command in commands:
        for rname in tgen.routers():
            logger.info("Checking router %s", rname)
            json_output = tgen.gears[rname].vtysh_cmd(command, isjson=True)
            assertmsg = "Test isis database json failed in '{}' data '{}'".format(
                rname, json_output
            )
            first_area = json_output["areas"][0]
            assert first_area["area"]["name"] == "1", assertmsg
            assert first_area["levels"][0]["id"] != "3", assertmsg
356 def test_isis_overload_on_startup():
357 "Check that overload on startup behaves as expected"
360 net
= get_topogen().net
363 # Don't run this test if we have any failure.
364 if tgen
.routers_have_failure():
365 pytest
.skip(tgen
.errors
)
367 logger
.info("Testing overload on startup behavior")
369 # Configure set-overload-bit on-startup on r3
370 r3
= tgen
.gears
["r3"]
375 set-overload-bit on-startup {overload_time}
379 logger
.info("Stop router")
380 stop_router(tgen
, "r3")
381 logger
.info("Start router")
383 tstamp_before_start_router
= datetime
.datetime
.now()
384 start_router(tgen
, "r3")
385 tstamp_after_start_router
= datetime
.datetime
.now()
386 startup_router_time
= (
387 tstamp_after_start_router
- tstamp_before_start_router
390 # Check that the overload bit is set in r3's LSP
391 check_lsp_overload_bit("r3", "r3.00-00", "0/0/1")
392 check_lsp_overload_bit("r1", "r3.00-00", "0/0/1")
394 # Attempt to unset overload bit while timer is still running
399 no set-overload-bit on-startup
404 # Check overload bit is still set
405 check_lsp_overload_bit("r1", "r3.00-00", "0/0/1")
407 # Check that overload bit is unset after timer completes
408 check_lsp_overload_bit("r3", "r3.00-00", "0/0/0")
409 tstamp_after_bit_unset
= datetime
.datetime
.now()
410 check_lsp_overload_bit("r1", "r3.00-00", "0/0/0")
412 # Collect time overloaded
414 tstamp_after_bit_unset
- tstamp_after_start_router
416 logger
.info(f
"Time Overloaded: {time_overloaded}")
418 # Use time it took to startup router as lower bound
420 f
"Assert that overload time falls in range: {overload_time - startup_router_time} < {time_overloaded} <= {overload_time}"
422 result
= overload_time
- startup_router_time
< time_overloaded
<= overload_time
426 def test_isis_overload_on_startup_cancel_timer():
427 "Check that overload on startup timer is cancelled when overload bit is set/unset"
430 net
= get_topogen().net
433 # Don't run this test if we have any failure.
434 if tgen
.routers_have_failure():
435 pytest
.skip(tgen
.errors
)
438 "Testing overload on startup behavior with set overload bit: cancel timer"
441 # Configure set-overload-bit on-startup on r3
442 r3
= tgen
.gears
["r3"]
447 set-overload-bit on-startup {overload_time}
452 logger
.info("Stop router")
453 stop_router(tgen
, "r3")
454 logger
.info("Start router")
455 start_router(tgen
, "r3")
457 # Check that the overload bit is set in r3's LSP
458 check_lsp_overload_bit("r3", "r3.00-00", "0/0/1")
460 # Check that overload timer is running
461 check_overload_timer("r3", True)
463 # Unset overload bit while timer is running
472 # Check that overload timer is cancelled
473 check_overload_timer("r3", False)
475 # Check overload bit is unset
476 check_lsp_overload_bit("r3", "r3.00-00", "0/0/0")
479 def test_isis_overload_on_startup_override_timer():
480 "Check that overload bit remains set after overload timer expires if overload bit is configured"
483 net
= get_topogen().net
486 # Don't run this test if we have any failure.
487 if tgen
.routers_have_failure():
488 pytest
.skip(tgen
.errors
)
491 "Testing overload on startup behavior with set overload bit: override timer"
494 # Configure set-overload-bit on-startup on r3
495 r3
= tgen
.gears
["r3"]
500 set-overload-bit on-startup {overload_time}
505 logger
.info("Stop router")
506 stop_router(tgen
, "r3")
507 logger
.info("Start router")
508 start_router(tgen
, "r3")
510 # Check that the overload bit is set in r3's LSP
511 check_lsp_overload_bit("r3", "r3.00-00", "0/0/1")
513 # Check that overload timer is running
514 check_overload_timer("r3", True)
516 # Check that overload timer expired
517 check_overload_timer("r3", False)
519 # Check overload bit is still set
520 check_lsp_overload_bit("r3", "r3.00-00", "0/0/1")
@retry(retry_timeout=200)
def _check_lsp_overload_bit(router, overloaded_router_lsp, att_p_ol_expected):
    """Verify the overload bit in a router's view of an LSP.

    :param router: name of the router (key into ``tgen.gears``) to query
    :param overloaded_router_lsp: LSP id passed to ``show isis database``
    :param att_p_ol_expected: expected value of the ``att-p-ol`` field
    :return: ``True`` on a match, otherwise an error string describing the
        mismatch (the ``retry`` decorator presumably re-invokes on a
        non-True result until the timeout -- confirm against
        lib.common_config.retry)
    """
    tgen = get_topogen()
    router = tgen.gears[router]
    logger.info(f"check_overload_bit {router}")
    isis_database_output = router.vtysh_cmd(
        "show isis database {} json".format(overloaded_router_lsp)
    )

    database_json = json.loads(isis_database_output)
    # The second entry of "levels" of the first area is the one inspected.
    att_p_ol = database_json["areas"][0]["levels"][1]["att-p-ol"]
    if att_p_ol == att_p_ol_expected:
        return True
    return "{} peer with expected att_p_ol {} got {} ".format(
        router.name, att_p_ol_expected, att_p_ol
    )
def check_lsp_overload_bit(router, overloaded_router_lsp, att_p_ol_expected):
    """Assert that ``_check_lsp_overload_bit`` reports success.

    Any result other than ``True`` fails the assertion and is used as the
    failure message.
    """
    outcome = _check_lsp_overload_bit(router, overloaded_router_lsp, att_p_ol_expected)
    assert outcome is True, outcome
@retry(retry_timeout=200)
def _check_overload_timer(router, timer_expected):
    """Verify whether the overload-on-startup timer is running on a router.

    :param router: name of the router (key into ``tgen.gears``) to query
    :param timer_expected: ``True`` if the timer should be listed in
        ``show thread timers``, ``False`` if it should not
    :return: ``True`` on a match, otherwise an error string (the ``retry``
        decorator presumably re-invokes on a non-True result until the
        timeout -- confirm against lib.common_config.retry)
    """
    tgen = get_topogen()
    router = tgen.gears[router]
    thread_output = router.vtysh_cmd("show thread timers")

    # The timer counts as running when its name appears in the timer list.
    timer_running = "set_overload_on_start_timer" in thread_output
    if timer_running == timer_expected:
        return True
    return "Expected timer running status: {}".format(timer_expected)
def check_overload_timer(router, timer_expected):
    """Assert that the overload timer's running state matches expectation.

    Delegates to ``_check_overload_timer``; any result other than ``True``
    fails the assertion and is used as the failure message.
    """
    assertmsg = _check_overload_timer(router, timer_expected)
    assert assertmsg is True, assertmsg
def test_memory_leak():
    """Run the memory leak test and report results.

    Skipped when memory-leak reporting is not enabled on the topology.
    """
    tgen = get_topogen()
    if tgen.is_memleak_enabled():
        tgen.report_memory_leaks()
        return

    pytest.skip("Memory leak test/report is disabled")
if __name__ == "__main__":
    # Run this file under pytest with "-s" ahead of the user's own arguments,
    # and propagate pytest's result as the process exit status.
    sys.exit(pytest.main(["-s", *sys.argv[1:]]))
588 # Auxiliary functions
def dict_merge(dct, merge_dct):
    """
    Recursive dict merge. Inspired by :meth:``dict.update()``, instead of
    updating only top-level keys, dict_merge recurses down into dicts nested
    to an arbitrary depth, updating keys. The ``merge_dct`` is merged into
    ``dct`` in place.

    :param dct: dict onto which the merge is executed
    :param merge_dct: dct merged into dct
    :return: None

    Source:
    https://gist.github.com/angstwad/bf22d1822c38a92ec0a9
    """
    for k, v in merge_dct.items():
        if k in dct and isinstance(dct[k], dict) and topotest.is_mapping(v):
            # Both sides hold a mapping: merge key by key instead of replacing.
            dict_merge(dct[k], v)
        else:
            # New key, or at least one side is not a mapping: overwrite.
            dct[k] = v
612 def parse_topology(lines
, level
):
614 Parse the output of 'show isis topology level-X' into a Python dict.
619 vertex_type_regex
= "|".join(VERTEX_TYPE_LIST
)
622 area_match
= re
.match(r
"Area (.+):", line
)
624 area
= area_match
.group(1)
625 if area
not in areas
:
626 areas
[area
] = {level
: {"ipv4": [], "ipv6": []}}
632 if re
.match(r
"IS\-IS paths to level-. routers that speak IPv6", line
):
635 if re
.match(r
"IS\-IS paths to level-. routers that speak IP", line
):
639 item_match
= re
.match(
640 r
"([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+)", line
643 item_match
is not None
644 and item_match
.group(1) == "Vertex"
645 and item_match
.group(2) == "Type"
646 and item_match
.group(3) == "Metric"
647 and item_match
.group(4) == "Next-Hop"
648 and item_match
.group(5) == "Interface"
649 and item_match
.group(6) == "Parent"
654 item_match
= re
.match(
655 r
"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+) ([^\s]+) ([^\s]+)".format(
660 if item_match
is not None:
661 areas
[area
][level
][ipv
].append(
663 "vertex": item_match
.group(1),
664 "type": item_match
.group(2),
665 "metric": item_match
.group(3),
666 "next-hop": item_match
.group(5),
667 "interface": item_match
.group(6),
668 "parent": item_match
.group(7),
673 item_match
= re
.match(
674 r
"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+)".format(vertex_type_regex
),
678 if item_match
is not None:
679 areas
[area
][level
][ipv
].append(
681 "vertex": item_match
.group(1),
682 "type": item_match
.group(2),
683 "metric": item_match
.group(3),
684 "parent": item_match
.group(5),
689 item_match
= re
.match(r
"([^\s]+)", line
)
690 if item_match
is not None:
691 areas
[area
][level
][ipv
].append({"vertex": item_match
.group(1)})
697 def show_isis_topology(router
):
699 Get the ISIS topology in a dictionary format.
711 'vertex': '10.0.0.1/24',
721 "interface": "rX-ethY",
732 l1out
= topotest
.normalize_text(
733 router
.vtysh_cmd("show isis topology level-1")
735 l2out
= topotest
.normalize_text(
736 router
.vtysh_cmd("show isis topology level-2")
739 l1
= parse_topology(l1out
, "level-1")
740 l2
= parse_topology(l2out
, "level-2")