test_isis_topo1.py: Test ISIS topology.
"""
+import collections
+import json
import os
+import re
import sys
import pytest
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- topotest.sleep(10, "waiting for ISIS protocol to converge")
+ topotest.sleep(45, "waiting for ISIS protocol to converge")
+
+ # Code to generate the json files.
+ # for rname, router in tgen.routers().iteritems():
+ # open('/tmp/{}_topology.json'.format(rname), 'w').write(
+ # json.dumps(show_isis_topology(router), indent=2, sort_keys=True)
+ # )
+
+ for rname, router in tgen.routers().iteritems():
+ filename = '{0}/{1}/{1}_topology.json'.format(CWD, rname)
+ expected = json.loads(open(filename, 'r').read())
+ actual = show_isis_topology(router)
+ assertmsg = "Router '{}' topology mismatch".format(rname)
+ assert topotest.json_cmp(actual, expected) is None, assertmsg
def test_memory_leak():
if __name__ == '__main__':
args = ["-s"] + sys.argv[1:]
sys.exit(pytest.main(args))
+
+
+#
+# Auxiliary functions
+#
+
+
+def dict_merge(dct, merge_dct):
+ """
+ Recursive dict merge. Inspired by :meth:``dict.update()``, instead of
+ updating only top-level keys, dict_merge recurses down into dicts nested
+ to an arbitrary depth, updating keys. The ``merge_dct`` is merged into
+ ``dct``.
+ :param dct: dict onto which the merge is executed
+ :param merge_dct: dct merged into dct
+ :return: None
+
+ Source:
+ https://gist.github.com/angstwad/bf22d1822c38a92ec0a9
+ """
+ for k, v in merge_dct.iteritems():
+ if (k in dct and isinstance(dct[k], dict)
+ and isinstance(merge_dct[k], collections.Mapping)):
+ dict_merge(dct[k], merge_dct[k])
+ else:
+ dct[k] = merge_dct[k]
+
+
+def parse_topology(lines, level):
+ """
+ Parse the output of 'show isis topology level-X' into a Python dict.
+ """
+ areas = {}
+ in_area = False
+ area = None
+
+ for line in lines:
+ if not in_area:
+ area_match = re.match(r"Area (.+):", line)
+ if not area_match:
+ continue
+
+ area = area_match.group(1)
+ areas[area] = {level: []}
+ in_area = True
+ continue
+
+ if re.match(r"IS\-IS paths to", line):
+ continue
+ if re.match(r"Vertex Type Metric Next\-Hop Interface Parent", line):
+ continue
+
+ item_match = re.match(
+ r"([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+)", line)
+ if item_match is not None:
+ areas[area][level].append({
+ 'vertex': item_match.group(1),
+ 'type': item_match.group(2),
+ 'metric': item_match.group(3),
+ 'next-hop': item_match.group(4),
+ 'interface': item_match.group(5),
+ 'parent': item_match.group(6),
+ })
+ continue
+
+ item_match = re.match(r"([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+)", line)
+ if item_match is not None:
+ areas[area][level].append({
+ 'vertex': item_match.group(1),
+ 'type': item_match.group(2),
+ 'metric': item_match.group(3),
+ 'parent': item_match.group(4),
+ })
+ continue
+
+ item_match = re.match(r"([^ ]+)", line)
+ if item_match is not None:
+ areas[area][level].append({'vertex': item_match.group(1)})
+ continue
+
+ in_area = False
+
+ return areas
+
+
+def show_isis_topology(router):
+ """
+ Get the ISIS topology in a dictionary format.
+
+ Sample:
+ {
+ 'area-name': {
+ 'level-1': [
+ {
+ 'vertex': 'r1'
+ }
+ ],
+ 'level-2': [
+ {
+ 'vertex': '10.0.0.1/24',
+ 'type': 'IP',
+ 'parent': '0',
+ 'metric': 'internal'
+ }
+ ]
+ },
+ 'area-name-2': {
+ 'level-2': [
+ {
+ "interface": "rX-ethY",
+ "metric": "Z",
+ "next-hop": "rA",
+ "parent": "rC(B)",
+ "type": "TE-IS",
+ "vertex": "rD"
+ }
+ ]
+ }
+ }
+ """
+ l1out = topotest.normalize_text(
+ router.vtysh_cmd('show isis topology level-1')
+ ).splitlines()
+ l2out = topotest.normalize_text(
+ router.vtysh_cmd('show isis topology level-2')
+ ).splitlines()
+
+ l1 = parse_topology(l1out, 'level-1')
+ l2 = parse_topology(l2out, 'level-2')
+
+ dict_merge(l1, l2)
+ return l1