#!/usr/bin/env python
+# SPDX-License-Identifier: ISC
#
# test_ospf_sr_te_topo1.py
# Copyright (c) 2021 by
# Volta Networks
#
-# Permission to use, copy, modify, and/or distribute this software
-# for any purpose with or without fee is hereby granted, provided
-# that the above copyright notice and this permission notice appear
-# in all copies.
-#
-# THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES
-# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
-# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR
-# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
-# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
-# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
-# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
-# OF THIS SOFTWARE.
-#
"""
test_ospf_sr_te_topo1.py:
import sys
import pytest
import json
-import re
from time import sleep
from functools import partial
from lib.topolog import logger
# Required to instantiate the topology builder class.
-from mininet.topo import Topo
pytestmark = [pytest.mark.bgpd, pytest.mark.ospfd, pytest.mark.pathd]
-class TemplateTopo(Topo):
- "Test topology builder"
+def build_topo(tgen):
+ "Build function"
- def build(self, *_args, **_opts):
- "Build function"
- tgen = get_topogen(self)
+ #
+ # Define FRR Routers
+ #
+ for router in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6", "dst"]:
+ tgen.add_router(router)
- #
- # Define FRR Routers
- #
- for router in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6", "dst"]:
- tgen.add_router(router)
+ #
+ # Define connections
+ #
+ switch = tgen.add_switch("s1")
+ switch.add_link(tgen.gears["rt1"], nodeif="eth-sw1")
+ switch.add_link(tgen.gears["rt2"], nodeif="eth-sw1")
+ # switch.add_link(tgen.gears["rt3"], nodeif="eth-sw1")
- #
- # Define connections
- #
- switch = tgen.add_switch("s1")
- switch.add_link(tgen.gears["rt1"], nodeif="eth-sw1")
- switch.add_link(tgen.gears["rt2"], nodeif="eth-sw1")
- #switch.add_link(tgen.gears["rt3"], nodeif="eth-sw1")
+ switch = tgen.add_switch("s2")
+ switch.add_link(tgen.gears["rt2"], nodeif="eth-rt4-1")
+ switch.add_link(tgen.gears["rt4"], nodeif="eth-rt2-1")
- switch = tgen.add_switch("s2")
- switch.add_link(tgen.gears["rt2"], nodeif="eth-rt4-1")
- switch.add_link(tgen.gears["rt4"], nodeif="eth-rt2-1")
+ # switch = tgen.add_switch("s3")
+ # switch.add_link(tgen.gears["rt2"], nodeif="eth-rt4-2")
+ # switch.add_link(tgen.gears["rt4"], nodeif="eth-rt2-2")
- #switch = tgen.add_switch("s3")
- #switch.add_link(tgen.gears["rt2"], nodeif="eth-rt4-2")
- #switch.add_link(tgen.gears["rt4"], nodeif="eth-rt2-2")
+ switch = tgen.add_switch("s4")
+ switch.add_link(tgen.gears["rt3"], nodeif="eth-rt5-1")
+ switch.add_link(tgen.gears["rt5"], nodeif="eth-rt3-1")
- switch = tgen.add_switch("s4")
- switch.add_link(tgen.gears["rt3"], nodeif="eth-rt5-1")
- switch.add_link(tgen.gears["rt5"], nodeif="eth-rt3-1")
+ switch = tgen.add_switch("s5")
+ switch.add_link(tgen.gears["rt3"], nodeif="eth-rt5-2")
+ switch.add_link(tgen.gears["rt5"], nodeif="eth-rt3-2")
- switch = tgen.add_switch("s5")
- switch.add_link(tgen.gears["rt3"], nodeif="eth-rt5-2")
- switch.add_link(tgen.gears["rt5"], nodeif="eth-rt3-2")
+ switch = tgen.add_switch("s6")
+ switch.add_link(tgen.gears["rt4"], nodeif="eth-rt5")
+ switch.add_link(tgen.gears["rt5"], nodeif="eth-rt4")
- switch = tgen.add_switch("s6")
- switch.add_link(tgen.gears["rt4"], nodeif="eth-rt5")
- switch.add_link(tgen.gears["rt5"], nodeif="eth-rt4")
+ switch = tgen.add_switch("s7")
+ switch.add_link(tgen.gears["rt4"], nodeif="eth-rt6")
+ switch.add_link(tgen.gears["rt6"], nodeif="eth-rt4")
- switch = tgen.add_switch("s7")
- switch.add_link(tgen.gears["rt4"], nodeif="eth-rt6")
- switch.add_link(tgen.gears["rt6"], nodeif="eth-rt4")
+ switch = tgen.add_switch("s8")
+ switch.add_link(tgen.gears["rt5"], nodeif="eth-rt6")
+ switch.add_link(tgen.gears["rt6"], nodeif="eth-rt5")
- switch = tgen.add_switch("s8")
- switch.add_link(tgen.gears["rt5"], nodeif="eth-rt6")
- switch.add_link(tgen.gears["rt6"], nodeif="eth-rt5")
-
- switch = tgen.add_switch("s9")
- switch.add_link(tgen.gears["rt6"], nodeif="eth-dst")
- switch.add_link(tgen.gears["dst"], nodeif="eth-rt6")
+ switch = tgen.add_switch("s9")
+ switch.add_link(tgen.gears["rt6"], nodeif="eth-dst")
+ switch.add_link(tgen.gears["dst"], nodeif="eth-rt6")
def setup_module(mod):
"Sets up the pytest environment"
- tgen = Topogen(TemplateTopo, mod.__name__)
+ tgen = Topogen(build_topo, mod.__name__)
frrdir = tgen.config.get(tgen.CONFIG_SECTION, "frrdir")
if not os.path.isfile(os.path.join(frrdir, "pathd")):
- pytest.skip("pathd daemon wasn't built in:"+frrdir)
+ pytest.skip("pathd daemon wasn't built in:" + frrdir)
tgen.start_topology()
candidate_output = router.vtysh_cmd("show mpls table json")
candidate_output_json = json.loads(candidate_output)
for item in candidate_output_json.items():
- # logger.info('item "%s"', item)
- if item[0] == candidate_key:
- matched_key = True
- if positive:
- break
+ # logger.info('item "%s"', item)
+ if item[0] == candidate_key:
+ matched_key = True
+ if positive:
+ break
if positive:
if matched_key:
matched = True
assertmsg = "{} don't has entry {} but is was expected".format(
- router.name, candidate_key)
+ router.name, candidate_key
+ )
else:
if not matched_key:
matched = True
assertmsg = "{} has entry {} but is wans't expected".format(
- router.name, candidate_key)
+ router.name, candidate_key
+ )
if matched:
logger.info('Success "%s" in "%s"', router.name, fn_name)
return
for rname, endpoint in [("rt1", "6.6.6.6"), ("rt6", "1.1.1.1")]:
add_candidate_path(rname, endpoint, 100, "default")
- check_bsid(rname, "1111" if rname == "rt1" else "6666", test_srte_init_step1.__name__, True)
+ check_bsid(
+ rname,
+ "1111" if rname == "rt1" else "6666",
+ test_srte_init_step1.__name__,
+ True,
+ )
delete_candidate_path(rname, endpoint, 100)
check_bsid(rname, bsid, test_srte_init_step1.__name__, False)
create_sr_policy(rname, endpoint, bsid)
add_candidate_path(rname, endpoint, 100, "default")
- check_bsid(rname, "1111" if rname == "rt1" else "6666", test_srte_init_step1.__name__, True)
+ check_bsid(
+ rname,
+ "1111" if rname == "rt1" else "6666",
+ test_srte_init_step1.__name__,
+ True,
+ )
delete_candidate_path(rname, endpoint, 100)
add_candidate_path(rname, endpoint, 100, "default")
# now change the segment list name
add_candidate_path(rname, endpoint, 100, "default", "test")
- check_bsid(rname, "1111" if rname == "rt1" else "6666", test_srte_init_step1.__name__, True)
+ check_bsid(
+ rname,
+ "1111" if rname == "rt1" else "6666",
+ test_srte_init_step1.__name__,
+ True,
+ )
delete_segment(rname, "test", 10)
delete_segment(rname, "test", 20)
delete_segment(rname, "test", 30)
add_segment_adj(rname, "test", 20, "10.0.6.5", "10.0.6.4")
add_segment_adj(rname, "test", 30, "10.0.2.4", "10.0.2.2")
add_segment_adj(rname, "test", 40, "10.0.1.2", "10.0.1.1")
- check_bsid(rname, "1111" if rname == "rt1" else "6666", test_srte_init_step1.__name__, True)
+ check_bsid(
+ rname,
+ "1111" if rname == "rt1" else "6666",
+ test_srte_init_step1.__name__,
+ True,
+ )
delete_candidate_path(rname, endpoint, 100)
add_candidate_path(rname, endpoint, 100, "default")
# now change the segment list name
add_candidate_path(rname, endpoint, 200, "test", "test")
- check_bsid(rname, "1111" if rname == "rt1" else "6666", test_srte_init_step1.__name__, True)
+ check_bsid(
+ rname,
+ "1111" if rname == "rt1" else "6666",
+ test_srte_init_step1.__name__,
+ True,
+ )
delete_segment(rname, "test", 10)
delete_segment(rname, "test", 20)
delete_segment(rname, "test", 30)
add_segment_adj(rname, "test", 30, "10.0.2.99", "10.0.2.99")
add_segment_adj(rname, "test", 40, "10.0.1.99", "10.0.1.99")
# So policy sticks with default sl even higher prio
- check_bsid(rname, "1111" if rname == "rt1" else "6666", test_srte_init_step1.__name__, True)
+ check_bsid(
+ rname,
+ "1111" if rname == "rt1" else "6666",
+ test_srte_init_step1.__name__,
+ True,
+ )
delete_candidate_path(rname, endpoint, 100)