5 # Part of NetDEF Topology Tests
7 # Copyright (c) 2017 by
8 # Network Device Education Foundation, Inc. ("NetDEF")
10 # Permission to use, copy, modify, and/or distribute this software
11 # for any purpose with or without fee is hereby granted, provided
12 # that the above copyright notice and this permission notice appear
15 # THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES
16 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
17 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR
18 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
19 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
20 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
21 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
26 test_isis_topo1.py: Test ISIS topology.
36 CWD
= os
.path
.dirname(os
.path
.realpath(__file__
))
37 sys
.path
.append(os
.path
.join(CWD
, "../"))
39 # pylint: disable=C0413
40 from lib
import topotest
41 from lib
.common_config
import (
46 from lib
.topogen
import Topogen
, TopoRouter
, get_topogen
47 from lib
.topolog
import logger
50 pytestmark
= [pytest
.mark
.isisd
]
78 for routern
in range(1, 6):
79 tgen
.add_router("r{}".format(routern
))
82 sw
= tgen
.add_switch("sw1")
83 sw
.add_link(tgen
.gears
["r1"])
84 sw
.add_link(tgen
.gears
["r3"])
87 sw
= tgen
.add_switch("sw2")
88 sw
.add_link(tgen
.gears
["r2"])
89 sw
.add_link(tgen
.gears
["r4"])
92 sw
= tgen
.add_switch("sw3")
93 sw
.add_link(tgen
.gears
["r3"])
94 sw
.add_link(tgen
.gears
["r5"])
97 sw
= tgen
.add_switch("sw4")
98 sw
.add_link(tgen
.gears
["r4"])
99 sw
.add_link(tgen
.gears
["r5"])
102 def setup_module(mod
):
103 "Sets up the pytest environment"
104 tgen
= Topogen(build_topo
, mod
.__name
__)
105 tgen
.start_topology()
107 # For all registered routers, load the zebra configuration file
108 for rname
, router
in tgen
.routers().items():
110 TopoRouter
.RD_ZEBRA
, os
.path
.join(CWD
, "{}/zebra.conf".format(rname
))
113 TopoRouter
.RD_ISIS
, os
.path
.join(CWD
, "{}/isisd.conf".format(rname
))
116 # After loading the configurations, this function loads configured daemons.
120 def teardown_module(mod
):
121 "Teardown the pytest environment"
124 # This function tears down the whole topology.
def test_isis_convergence():
    """Wait for the protocol to converge before starting to test.

    For every router the expected topology is read from
    ``<CWD>/<router>/<router>_topology.json`` and compared against the
    result of ``show_isis_topology(router)``. The comparison is driven by
    ``topotest.run_and_expect`` with ``wait=0.5`` and ``count=120`` and an
    expected value of ``None``.

    The test is skipped when the topology already reports a failure.
    """
    # NOTE(review): the tgen binding was restored as get_topogen(); confirm.
    tgen = get_topogen()

    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("waiting for ISIS protocol to converge")
    # Code to generate the json files.
    # for rname, router in tgen.routers().items():
    #     open('/tmp/{}_topology.json'.format(rname), 'w').write(
    #         json.dumps(show_isis_topology(router), indent=2, sort_keys=True)
    #     )

    # Defined once: the helper does not depend on the loop variables, they
    # are bound per router through functools.partial below.
    def compare_isis_topology(router, expected):
        "Helper function to test ISIS topology convergence."
        actual = show_isis_topology(router)
        return topotest.json_cmp(actual, expected)

    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_topology.json".format(CWD, rname)
        # Context manager so the fixture file is closed right after parsing
        # instead of being left to the garbage collector.
        with open(filename) as expected_file:
            expected = json.load(expected_file)

        test_func = functools.partial(compare_isis_topology, router, expected)
        (result, diff) = topotest.run_and_expect(test_func, None, wait=0.5, count=120)
        assert result, "ISIS did not converge on {}:\n{}".format(rname, diff)
def test_isis_route_installation():
    """Check whether all expected routes are present.

    For every router the expected RIB is read from
    ``<CWD>/<router>/<router>_route.json`` and compared against the JSON
    output of ``show ip route json``. The comparison is driven by
    ``topotest.run_and_expect`` with ``wait=1`` and ``count=10`` and an
    expected value of ``None``.

    The test is skipped when the topology already reports a failure.
    """
    # NOTE(review): the tgen binding was restored as get_topogen(); confirm.
    tgen = get_topogen()

    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Checking routers for installed ISIS routes")

    # Defined once: the router and expected data are bound per iteration
    # through functools.partial below.
    def compare_isis_installed_routes(router, expected):
        "Helper function to test ISIS routes installed in rib."
        actual = router.vtysh_cmd("show ip route json", isjson=True)
        return topotest.json_cmp(actual, expected)

    # Check for routes in 'show ip route json'
    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_route.json".format(CWD, rname)
        # Context manager so the fixture file is closed right after parsing.
        with open(filename, "r") as expected_file:
            expected = json.load(expected_file)

        test_func = functools.partial(compare_isis_installed_routes, router, expected)
        # The diff is not part of the assertion message, so it is discarded.
        (result, _diff) = topotest.run_and_expect(test_func, None, wait=1, count=10)
        assertmsg = "Router '{}' routes mismatch".format(rname)
        assert result, assertmsg
def test_isis_linux_route_installation():
    """Check whether all expected routes are present and installed in the OS.

    For every router the expected data is read from
    ``<CWD>/<router>/<router>_route_linux.json`` and compared once (no
    retry) with the result of ``topotest.ip4_route(router)``; the test
    passes when ``topotest.json_cmp`` returns ``None``.

    The test is skipped when the topology already reports a failure.
    """
    # NOTE(review): the tgen binding was restored as get_topogen(); confirm.
    tgen = get_topogen()

    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Checking routers for installed ISIS routes in OS")

    # Check for routes in `ip route`
    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_route_linux.json".format(CWD, rname)
        # Context manager so the fixture file is closed right after parsing.
        with open(filename, "r") as expected_file:
            expected = json.load(expected_file)

        actual = topotest.ip4_route(router)
        assertmsg = "Router '{}' OS routes mismatch".format(rname)
        assert topotest.json_cmp(actual, expected) is None, assertmsg
def test_isis_route6_installation():
    """Check whether all expected IPv6 routes are present.

    For every router the expected RIB is read from
    ``<CWD>/<router>/<router>_route6.json`` and compared against the JSON
    output of ``show ipv6 route json``. The comparison is driven by
    ``topotest.run_and_expect`` with ``wait=1`` and ``count=10`` and an
    expected value of ``None``.

    The test is skipped when the topology already reports a failure.
    """
    # NOTE(review): the tgen binding was restored as get_topogen(); confirm.
    tgen = get_topogen()

    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Checking routers for installed ISIS IPv6 routes")

    # Defined once: the router and expected data are bound per iteration
    # through functools.partial below.
    def compare_isis_v6_installed_routes(router, expected):
        "Helper function to test ISIS v6 routes installed in rib."
        actual = router.vtysh_cmd("show ipv6 route json", isjson=True)
        return topotest.json_cmp(actual, expected)

    # Check for routes in 'show ipv6 route json'
    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_route6.json".format(CWD, rname)
        # Context manager so the fixture file is closed right after parsing.
        with open(filename, "r") as expected_file:
            expected = json.load(expected_file)

        test_func = functools.partial(
            compare_isis_v6_installed_routes, router, expected
        )
        # The diff is not part of the assertion message, so it is discarded.
        (result, _diff) = topotest.run_and_expect(test_func, None, wait=1, count=10)
        assertmsg = "Router '{}' routes mismatch".format(rname)
        assert result, assertmsg
def test_isis_linux_route6_installation():
    """Check whether all expected IPv6 routes are present and installed in the OS.

    For every router the expected data is read from
    ``<CWD>/<router>/<router>_route6_linux.json`` and compared once (no
    retry) with the result of ``topotest.ip6_route(router)``; the test
    passes when ``topotest.json_cmp`` returns ``None``.

    The test is skipped when the topology already reports a failure.
    """
    # NOTE(review): the tgen binding was restored as get_topogen(); confirm.
    tgen = get_topogen()

    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Checking routers for installed ISIS IPv6 routes in OS")

    # Check the routes reported by topotest.ip6_route for each router
    for rname, router in tgen.routers().items():
        filename = "{0}/{1}/{1}_route6_linux.json".format(CWD, rname)
        # Context manager so the fixture file is closed right after parsing.
        with open(filename, "r") as expected_file:
            expected = json.load(expected_file)

        actual = topotest.ip6_route(router)
        assertmsg = "Router '{}' OS routes mismatch".format(rname)
        assert topotest.json_cmp(actual, expected) is None, assertmsg
def test_isis_summary_json():
    """Check json struct in show isis summary json.

    For each router, runs ``show isis summary json`` and asserts that the
    ``vrf`` is ``"default"``, that the first area is named ``"1"`` and that
    the first level of that area does not have id ``"3"``.

    The test is skipped when the topology already reports a failure.
    """
    # NOTE(review): the tgen binding was restored as get_topogen(); confirm.
    tgen = get_topogen()

    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Checking 'show isis summary json'")
    for rname, _router in tgen.routers().items():
        logger.info("Checking router %s", rname)
        summary = tgen.gears[rname].vtysh_cmd("show isis summary json", isjson=True)
        # NOTE(review): format arguments restored to fit the two
        # placeholders (router name, command output); confirm.
        failure = "Test isis summary json failed in '{}' data '{}'".format(
            rname, summary
        )

        assert summary["vrf"] == "default", failure

        # Looked up only after the vrf check so a failing vrf is reported first.
        first_area = summary["areas"][0]
        assert first_area["area"] == "1", failure
        assert first_area["levels"][0]["id"] != "3", failure
264 def test_isis_interface_json():
265 "Check json struct in show isis interface json"
268 # Don't run this test if we have any failure.
269 if tgen
.routers_have_failure():
270 pytest
.skip(tgen
.errors
)
272 logger
.info("Checking 'show isis interface json'")
273 for rname
, router
in tgen
.routers().items():
274 logger
.info("Checking router %s", rname
)
275 json_output
= tgen
.gears
[rname
].vtysh_cmd(
276 "show isis interface json", isjson
=True
278 assertmsg
= "Test isis interface json failed in '{}' data '{}'".format(
282 json_output
["areas"][0]["circuits"][0]["interface"]["name"]
286 for rname
, router
in tgen
.routers().items():
287 logger
.info("Checking router %s", rname
)
288 json_output
= tgen
.gears
[rname
].vtysh_cmd(
289 "show isis interface detail json", isjson
=True
291 assertmsg
= "Test isis interface json failed in '{}' data '{}'".format(
295 json_output
["areas"][0]["circuits"][0]["interface"]["name"]
300 def test_isis_neighbor_json():
301 "Check json struct in show isis neighbor json"
304 # Don't run this test if we have any failure.
305 if tgen
.routers_have_failure():
306 pytest
.skip(tgen
.errors
)
309 logger
.info("Checking 'show isis neighbor json'")
310 for rname
, router
in tgen
.routers().items():
311 logger
.info("Checking router %s", rname
)
312 json_output
= tgen
.gears
[rname
].vtysh_cmd(
313 "show isis neighbor json", isjson
=True
315 assertmsg
= "Test isis neighbor json failed in '{}' data '{}'".format(
319 json_output
["areas"][0]["circuits"][0]["interface"] == rname
+ "-eth0"
322 for rname
, router
in tgen
.routers().items():
323 logger
.info("Checking router %s", rname
)
324 json_output
= tgen
.gears
[rname
].vtysh_cmd(
325 "show isis neighbor detail json", isjson
=True
327 assertmsg
= "Test isis neighbor json failed in '{}' data '{}'".format(
331 json_output
["areas"][0]["circuits"][0]["interface"]["name"]
def test_isis_database_json():
    """Check json struct in show isis database json.

    Runs ``show isis database json`` on every router, then
    ``show isis database detail json`` on every router. For each output it
    asserts that the first area is named ``"1"`` and that the first level
    of that area does not have id ``"3"``.

    The test is skipped when the topology already reports a failure.
    """
    # NOTE(review): the tgen binding was restored as get_topogen(); confirm.
    tgen = get_topogen()

    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("Checking 'show isis database json'")

    # Commands are the outer loop so that all routers are checked with the
    # plain command before any router is checked with the detail command.
    commands = ("show isis database json", "show isis database detail json")
    for command in commands:
        for rname, _router in tgen.routers().items():
            logger.info("Checking router %s", rname)
            database = tgen.gears[rname].vtysh_cmd(command, isjson=True)
            # NOTE(review): format arguments restored to fit the two
            # placeholders (router name, command output); confirm.
            failure = "Test isis database json failed in '{}' data '{}'".format(
                rname, database
            )
            first_area = database["areas"][0]
            assert first_area["area"]["name"] == "1", failure
            assert first_area["levels"][0]["id"] != "3", failure
369 def test_isis_overload_on_startup():
370 "Check that overload on startup behaves as expected"
373 net
= get_topogen().net
376 # Don't run this test if we have any failure.
377 if tgen
.routers_have_failure():
378 pytest
.skip(tgen
.errors
)
380 logger
.info("Testing overload on startup behavior")
382 # Configure set-overload-bit on-startup on r3
383 r3
= tgen
.gears
["r3"]
388 set-overload-bit on-startup {overload_time}
392 logger
.info("Stop router")
393 stop_router(tgen
, "r3")
394 logger
.info("Start router")
396 tstamp_before_start_router
= datetime
.datetime
.now()
397 start_router(tgen
, "r3")
398 tstamp_after_start_router
= datetime
.datetime
.now()
399 startup_router_time
= (
400 tstamp_after_start_router
- tstamp_before_start_router
403 # Check that the overload bit is set in r3's LSP
404 check_lsp_overload_bit("r3", "r3.00-00", "0/0/1")
405 check_lsp_overload_bit("r1", "r3.00-00", "0/0/1")
407 # Attempt to unset overload bit while timer is still running
412 no set-overload-bit on-startup
417 # Check overload bit is still set
418 check_lsp_overload_bit("r1", "r3.00-00", "0/0/1")
420 # Check that overload bit is unset after timer completes
421 check_lsp_overload_bit("r3", "r3.00-00", "0/0/0")
422 tstamp_after_bit_unset
= datetime
.datetime
.now()
423 check_lsp_overload_bit("r1", "r3.00-00", "0/0/0")
425 # Collect time overloaded
427 tstamp_after_bit_unset
- tstamp_after_start_router
429 logger
.info(f
"Time Overloaded: {time_overloaded}")
431 # Use time it took to startup router as lower bound
433 f
"Assert that overload time falls in range: {overload_time - startup_router_time} < {time_overloaded} <= {overload_time}"
435 result
= overload_time
- startup_router_time
< time_overloaded
<= overload_time
439 def test_isis_overload_on_startup_cancel_timer():
440 "Check that overload on startup timer is cancelled when overload bit is set/unset"
443 net
= get_topogen().net
446 # Don't run this test if we have any failure.
447 if tgen
.routers_have_failure():
448 pytest
.skip(tgen
.errors
)
451 "Testing overload on startup behavior with set overload bit: cancel timer"
454 # Configure set-overload-bit on-startup on r3
455 r3
= tgen
.gears
["r3"]
460 set-overload-bit on-startup {overload_time}
465 logger
.info("Stop router")
466 stop_router(tgen
, "r3")
467 logger
.info("Start router")
468 start_router(tgen
, "r3")
470 # Check that the overload bit is set in r3's LSP
471 check_lsp_overload_bit("r3", "r3.00-00", "0/0/1")
473 # Check that overload timer is running
474 check_overload_timer("r3", True)
476 # Unset overload bit while timer is running
485 # Check that overload timer is cancelled
486 check_overload_timer("r3", False)
488 # Check overload bit is unset
489 check_lsp_overload_bit("r3", "r3.00-00", "0/0/0")
492 def test_isis_overload_on_startup_override_timer():
493 "Check that overload bit remains set after overload timer expires if overload bit is configured"
496 net
= get_topogen().net
499 # Don't run this test if we have any failure.
500 if tgen
.routers_have_failure():
501 pytest
.skip(tgen
.errors
)
504 "Testing overload on startup behavior with set overload bit: override timer"
507 # Configure set-overload-bit on-startup on r3
508 r3
= tgen
.gears
["r3"]
513 set-overload-bit on-startup {overload_time}
518 logger
.info("Stop router")
519 stop_router(tgen
, "r3")
520 logger
.info("Start router")
521 start_router(tgen
, "r3")
523 # Check that the overload bit is set in r3's LSP
524 check_lsp_overload_bit("r3", "r3.00-00", "0/0/1")
526 # Check that overload timer is running
527 check_overload_timer("r3", True)
529 # Check that overload timer expired
530 check_overload_timer("r3", False)
532 # Check overload bit is still set
533 check_lsp_overload_bit("r3", "r3.00-00", "0/0/1")
@retry(retry_timeout=200)
def _check_lsp_overload_bit(router, overloaded_router_lsp, att_p_ol_expected):
    """Verify the att-p-ol value of an LSP as seen from a router.

    :param router: key of the router in ``tgen.gears`` on which to query
    :param overloaded_router_lsp: LSP id inserted into
        ``show isis database <id> json``
    :param att_p_ol_expected: value the ``att-p-ol`` field must equal
    :return: ``True`` on match, otherwise a string describing the mismatch

    The field is read from ``["areas"][0]["levels"][1]`` of the command
    output. Wrapped in ``retry`` with ``retry_timeout=200``.
    """
    # NOTE(review): the tgen binding was restored as get_topogen(); confirm.
    tgen = get_topogen()
    node = tgen.gears[router]
    logger.info(f"check_overload_bit {node}")

    command = "show isis database {} json".format(overloaded_router_lsp)
    # The command output is taken as text and decoded here.
    lsp_database = json.loads(node.vtysh_cmd(command))
    observed = lsp_database["areas"][0]["levels"][1]["att-p-ol"]

    if observed != att_p_ol_expected:
        return "{} peer with expected att_p_ol {} got {} ".format(
            node.name, att_p_ol_expected, observed
        )
    return True
def check_lsp_overload_bit(router, overloaded_router_lsp, att_p_ol_expected):
    """Assert that an LSP carries the expected att-p-ol value.

    Calls ``_check_lsp_overload_bit`` with the same arguments and fails
    unless it returns exactly ``True``; any other return value is used as
    the assertion message.
    """
    outcome = _check_lsp_overload_bit(router, overloaded_router_lsp, att_p_ol_expected)
    # Identity check: a non-empty error string is truthy but is not True.
    assert outcome is True, outcome
@retry(retry_timeout=200)
def _check_overload_timer(router, timer_expected):
    """Verify whether the overload-on-start timer shows up on a router.

    :param router: key of the router in ``tgen.gears`` on which to query
    :param timer_expected: ``True`` if ``set_overload_on_start_timer`` must
        appear in the output of ``show thread timers``, ``False`` if it
        must not
    :return: ``True`` on match, otherwise a string describing the expectation

    Wrapped in ``retry`` with ``retry_timeout=200``.
    """
    # NOTE(review): the tgen binding was restored as get_topogen(); confirm.
    tgen = get_topogen()
    timers_listing = tgen.gears[router].vtysh_cmd("show thread timers")

    is_running = "set_overload_on_start_timer" in timers_listing
    if is_running != timer_expected:
        return "Expected timer running status: {}".format(timer_expected)
    return True
def check_overload_timer(router, timer_expected):
    """Assert that the overload-on-start timer is in the expected state.

    Calls ``_check_overload_timer`` with the same arguments and fails
    unless it returns exactly ``True``; any other return value is used as
    the assertion message.
    """
    outcome = _check_overload_timer(router, timer_expected)
    # Identity check: a non-empty error string is truthy but is not True.
    assert outcome is True, outcome
def test_memory_leak():
    """Run the memory leak test and report results.

    Skipped with "Memory leak test/report is disabled" unless
    ``tgen.is_memleak_enabled()`` is true.
    """
    # NOTE(review): the tgen binding was restored as get_topogen(); confirm.
    tgen = get_topogen()

    if tgen.is_memleak_enabled():
        tgen.report_memory_leaks()
    else:
        pytest.skip("Memory leak test/report is disabled")
# Script entry point: run pytest on the command-line arguments with "-s"
# prepended, and exit with pytest's return value.
if __name__ == "__main__":
    args = ["-s"]
    args.extend(sys.argv[1:])
    sys.exit(pytest.main(args))
601 # Auxiliary functions
def dict_merge(dct, merge_dct):
    """
    Recursively merge ``merge_dct`` into ``dct``, in place.

    Unlike ``dict.update()``, which replaces top-level keys only, nested
    dicts are merged key by key down to an arbitrary depth. A key is merged
    recursively when it already exists in ``dct`` with a ``dict`` value and
    ``topotest.is_mapping`` accepts the incoming value; in every other case
    the incoming value replaces (or adds) the key.

    :param dct: dict onto which the merge is executed (modified in place)
    :param merge_dct: dict merged into dct
    :return: None

    Reference:
    https://gist.github.com/angstwad/bf22d1822c38a92ec0a9
    """
    for key, incoming in merge_dct.items():
        both_nested = (
            key in dct
            and isinstance(dct[key], dict)
            and topotest.is_mapping(incoming)
        )
        if not both_nested:
            dct[key] = incoming
            continue
        dict_merge(dct[key], incoming)
625 def parse_topology(lines
, level
):
627 Parse the output of 'show isis topology level-X' into a Python dict.
632 vertex_type_regex
= "|".join(VERTEX_TYPE_LIST
)
635 area_match
= re
.match(r
"Area (.+):", line
)
637 area
= area_match
.group(1)
638 if area
not in areas
:
639 areas
[area
] = {level
: {"ipv4": [], "ipv6": []}}
645 if re
.match(r
"IS\-IS paths to level-. routers that speak IPv6", line
):
648 if re
.match(r
"IS\-IS paths to level-. routers that speak IP", line
):
652 item_match
= re
.match(
653 r
"([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+)", line
656 item_match
is not None
657 and item_match
.group(1) == "Vertex"
658 and item_match
.group(2) == "Type"
659 and item_match
.group(3) == "Metric"
660 and item_match
.group(4) == "Next-Hop"
661 and item_match
.group(5) == "Interface"
662 and item_match
.group(6) == "Parent"
667 item_match
= re
.match(
668 r
"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+) ([^\s]+) ([^\s]+)".format(
673 if item_match
is not None:
674 areas
[area
][level
][ipv
].append(
676 "vertex": item_match
.group(1),
677 "type": item_match
.group(2),
678 "metric": item_match
.group(3),
679 "next-hop": item_match
.group(5),
680 "interface": item_match
.group(6),
681 "parent": item_match
.group(7),
686 item_match
= re
.match(
687 r
"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+)".format(vertex_type_regex
),
691 if item_match
is not None:
692 areas
[area
][level
][ipv
].append(
694 "vertex": item_match
.group(1),
695 "type": item_match
.group(2),
696 "metric": item_match
.group(3),
697 "parent": item_match
.group(5),
702 item_match
= re
.match(r
"([^\s]+)", line
)
703 if item_match
is not None:
704 areas
[area
][level
][ipv
].append({"vertex": item_match
.group(1)})
710 def show_isis_topology(router
):
712 Get the ISIS topology in a dictionary format.
724 'vertex': '10.0.0.1/24',
734 "interface": "rX-ethY",
745 l1out
= topotest
.normalize_text(
746 router
.vtysh_cmd("show isis topology level-1")
748 l2out
= topotest
.normalize_text(
749 router
.vtysh_cmd("show isis topology level-2")
752 l1
= parse_topology(l1out
, "level-1")
753 l2
= parse_topology(l2out
, "level-2")