setup is 3 routers with 3 links between each each link in a different vrf
Default, blue and red respectively
Tests check various fiddling with passwords and checking that the peer
-establishment is as expected and passwords are not leaked across sockets
+establishment is as expected and passwords are not leaked across sockets
for bgp instances
"""
+# pylint: disable=C0413
-import os
-import sys
import json
+import os
import platform
-import pytest
+import sys
from time import sleep
-# Save the Current Working Directory to find configuration files.
-CWD = os.path.dirname(os.path.realpath(__file__))
-sys.path.append(os.path.join(CWD, "../"))
-
-# pylint: disable=C0413
-# Import topogen and topotest helpers
-from lib import common_config
-from lib import topotest
-from lib.topogen import Topogen, TopoRouter, get_topogen
+import pytest
+from lib import common_config, topotest
from lib.common_config import (
- FRRCFG_FILE,
- FRRCFG_BKUP_FILE,
- load_config_to_routers,
- reset_config_on_routers,
+ save_initial_config_on_routers,
+ reset_with_new_configs,
)
-
-ERROR_LIST = ["Malformed", "Failure", "Unknown", "Incomplete"]
+from lib.topogen import Topogen, TopoRouter, get_topogen
pytestmark = [pytest.mark.bgpd, pytest.mark.ospfd]
-
-def reload_new_configs(tgen, *cflist):
- reset_config_on_routers(tgen)
-
- routers = tgen.routers()
- for rname, router in routers.items():
- destname = "{}/{}/{}".format(tgen.logdir, rname, FRRCFG_FILE)
- wmode="w"
- for cfbase in cflist:
- confname = os.path.join(CWD, "{}/{}".format(rname, cfbase))
- with open(confname, "r") as cf:
- with open(destname, wmode) as df:
- df.write(cf.read())
- wmode="a"
-
- # import pdb
- # pdb.set_trace()
- common_config.ROUTER_LIST = routers
- load_config_to_routers(tgen, routers, save_bkup=False)
-
-
-class InvalidCLIError(Exception):
- """Raise when the CLI command is wrong"""
+CWD = os.path.dirname(os.path.realpath(__file__))
def build_topo(tgen):
- # Create routers
tgen.add_router("R1")
tgen.add_router("R2")
tgen.add_router("R3")
- # R1-R2 1
tgen.add_link(tgen.gears["R1"], tgen.gears["R2"])
- # R1-R3 1
tgen.add_link(tgen.gears["R1"], tgen.gears["R3"])
- # R2-R3 1
tgen.add_link(tgen.gears["R2"], tgen.gears["R3"])
- # R1-R2 2
tgen.add_link(tgen.gears["R1"], tgen.gears["R2"])
- # R1-R3 2
tgen.add_link(tgen.gears["R1"], tgen.gears["R3"])
- # R2-R3 2
tgen.add_link(tgen.gears["R2"], tgen.gears["R3"])
- # R1-R2 3
tgen.add_link(tgen.gears["R1"], tgen.gears["R2"])
- # R1-R3 2
tgen.add_link(tgen.gears["R1"], tgen.gears["R3"])
- # R2-R3 2
tgen.add_link(tgen.gears["R2"], tgen.gears["R3"])
# For all registred routers, load the zebra configuration file
for rname, router in router_list.items():
- router.load_config(
- TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname))
- )
- router.load_config(
- TopoRouter.RD_OSPF, os.path.join(CWD, "{}/empty.conf".format(rname))
- )
- router.load_config(
- TopoRouter.RD_BGP, os.path.join(CWD, "{}/empty.conf".format(rname))
- )
+ router.load_config(TopoRouter.RD_ZEBRA, "zebra.conf")
+ router.load_config(TopoRouter.RD_OSPF)
+ router.load_config(TopoRouter.RD_BGP)
- # After loading the configurations, this function loads configured daemons.
+ # After copying the configurations, this function loads configured daemons.
tgen.start_router()
- # Set the initial (saved) config to the zebra config. reset_config_on_routers
- # will use this config to reset to.
- routers = tgen.routers()
- for rname, router in routers.items():
- confname = os.path.join(CWD, "{}/zebra.conf".format(rname))
- destname = "{}/{}/{}".format(tgen.logdir, rname, FRRCFG_BKUP_FILE)
- router.cmd_raises("cp {} {}".format(confname, destname))
+ # Save the initial router config. reset_config_on_routers will return to this config.
+ save_initial_config_on_routers(tgen)
def teardown_module(mod):
print(router.vtysh_cmd("show bgp {} neighbor".format(vrf_str(vrf))))
-def configure(conf_file):
- "configure from a file"
-
- tgen = get_topogen()
- router_list = tgen.routers()
- for rname, router in router_list.items():
- with open(
- os.path.join(CWD, "{}/{}").format(router.name, conf_file), "r+"
- ) as cfg:
- new_config = cfg.read()
-
- output = router.vtysh_multicmd(new_config, pretty_output=False)
- for out_err in ERROR_LIST:
- if out_err.lower() in output.lower():
- raise InvalidCLIError("%s" % output)
-
-
-def clear_bgp():
- "clear bgp configuration for a vrf"
-
- tgen = get_topogen()
- r1 = tgen.gears["R1"]
- r2 = tgen.gears["R2"]
- r3 = tgen.gears["R3"]
-
- r1.vtysh_cmd("conf t\nno router bgp 65001")
- r2.vtysh_cmd("conf t\nno router bgp 65002")
- r3.vtysh_cmd("conf t\nno router bgp 65003")
- r1.vtysh_cmd("conf t\nno router bgp 65001 vrf blue")
- r2.vtysh_cmd("conf t\nno router bgp 65002 vrf blue")
- r3.vtysh_cmd("conf t\nno router bgp 65003 vrf blue")
- r1.vtysh_cmd("conf t\nno router bgp 65001 vrf red")
- r2.vtysh_cmd("conf t\nno router bgp 65002 vrf red")
- r3.vtysh_cmd("conf t\nno router bgp 65003 vrf red")
-
-
-def configure_bgp(conf_file):
- "configure bgp from file"
-
- clear_bgp()
- configure(conf_file)
-
-
-def clear_ospf():
- "clear ospf configuration for a vrf"
-
- tgen = get_topogen()
- router_list = tgen.routers()
- for rname, router in router_list.items():
- router.vtysh_cmd("conf t\nno router ospf")
- router.vtysh_cmd("conf t\nno router ospf vrf blue")
- router.vtysh_cmd("conf t\nno router ospf vrf red")
-
+@common_config.retry(retry_timeout=190)
+def _check_neigh_state(router, peer, state, vrf=""):
+ "check BGP neighbor state on a router"
-def configure_ospf(conf_file):
- "configure bgp from file"
+ neigh_output = router.vtysh_cmd(
+ "show bgp {} neighbors {} json".format(vrf_str(vrf), peer)
+ )
- clear_ospf()
- configure(conf_file)
+ peer_state = "Unknown"
+ neigh_output_json = json.loads(neigh_output)
+ if peer in neigh_output_json:
+ peer_state = neigh_output_json[peer]["bgpState"]
+ if peer_state == state:
+ return True
+ return "{} peer with {} expected state {} got {} ".format(
+ router.name, peer, state, peer_state
+ )
def check_neigh_state(router, peer, state, vrf=""):
"check BGP neighbor state on a router"
- count = 0
- matched = False
- neigh_output = ""
- while count < 125:
- if vrf == "":
- neigh_output = router.vtysh_cmd("show bgp neighbors {} json".format(peer))
- else:
- neigh_output = router.vtysh_cmd(
- "show bgp vrf {} neighbors {} json".format(vrf, peer)
- )
- neigh_output_json = json.loads(neigh_output)
- if peer in neigh_output_json.keys():
- if neigh_output_json[peer]["bgpState"] == state:
- matched = True
- break
- count += 1
- sleep(1)
-
- assertmsg = "{} could not peer {} state expected {} got {} ".format(
- router.name, peer, state, neigh_output_json[peer]["bgpState"]
- )
- if matched != True:
- print_diag(vrf)
- assert matched == True, assertmsg
+ assertmsg = _check_neigh_state(router, peer, state, vrf)
+ assert assertmsg is True, assertmsg
def check_all_peers_established(vrf=""):
def test_default_peer_established(tgen):
"default vrf 3 peers same password"
- # configure_bgp("bgpd.conf")
- # configure_ospf("ospfd.conf")
- reload_new_configs(tgen, "bgpd.conf", "ospfd.conf")
+ reset_with_new_configs(tgen, "bgpd.conf", "ospfd.conf")
check_all_peers_established()
- # tgen.mininet_cli()
def test_default_peer_remove_passwords(tgen):
"selectively remove passwords checking state"
- # configure_bgp("bgpd.conf")
- # configure_ospf("ospfd.conf")
- reload_new_configs(tgen, "bgpd.conf", "ospfd.conf")
+ reset_with_new_configs(tgen, "bgpd.conf", "ospfd.conf")
check_vrf_peer_remove_passwords()
def test_default_peer_change_passwords(tgen):
"selectively change passwords checking state"
- # configure_bgp("bgpd.conf")
- # configure_ospf("ospfd.conf")
- reload_new_configs(tgen, "bgpd.conf", "ospfd.conf")
+ reset_with_new_configs(tgen, "bgpd.conf", "ospfd.conf")
check_vrf_peer_change_passwords()
if topotest.version_cmp(platform.release(), "5.3") < 0:
return
- # configure_bgp("bgpd_prefix.conf")
- # configure_ospf("ospfd.conf")
- reload_new_configs(tgen, "bgpd_prefix.conf", "ospfd.conf")
+ reset_with_new_configs(tgen, "bgpd_prefix.conf", "ospfd.conf")
check_all_peers_established()
- # tgen.mininet_cli()
def test_prefix_peer_remove_passwords(tgen):
if topotest.version_cmp(platform.release(), "5.3") < 0:
return
- # configure_bgp("bgpd_prefix.conf")
- # configure_ospf("ospfd.conf")
- reload_new_configs(tgen, "bgpd_prefix.conf", "ospfd.conf")
+ reset_with_new_configs(tgen, "bgpd_prefix.conf", "ospfd.conf")
check_vrf_peer_remove_passwords(prefix="yes")
if topotest.version_cmp(platform.release(), "5.3") < 0:
return
- # configure_bgp("bgpd_prefix.conf")
- # configure_ospf("ospfd.conf")
- reload_new_configs(tgen, "bgpd_prefix.conf", "ospfd.conf")
+ reset_with_new_configs(tgen, "bgpd_prefix.conf", "ospfd.conf")
check_vrf_peer_change_passwords(prefix="yes")
"default vrf 3 peers same password with VRF config"
# clean routers and load vrf config
- # configure_bgp("bgpd_vrf.conf")
- # configure_ospf("ospfd_vrf.conf")
- reload_new_configs(tgen, "bgpd_vrf.conf", "ospfd_vrf.conf")
+ reset_with_new_configs(tgen, "bgpd_vrf.conf", "ospfd_vrf.conf")
check_all_peers_established("blue")
- # tgen.mininet_cli()
def test_vrf_peer_remove_passwords(tgen):
"selectively remove passwords checking state with VRF config"
- # configure_bgp("bgpd_vrf.conf")
- # configure_ospf("ospfd_vrf.conf")
- reload_new_configs(tgen, "bgpd_vrf.conf", "ospfd_vrf.conf")
+ reset_with_new_configs(tgen, "bgpd_vrf.conf", "ospfd_vrf.conf")
check_vrf_peer_remove_passwords(vrf="blue")
def test_vrf_peer_change_passwords(tgen):
"selectively change passwords checking state with VRF config"
- # configure_bgp("bgpd_vrf.conf")
- # configure_ospf("ospfd_vrf.conf")
- reload_new_configs(tgen, "bgpd_vrf.conf", "ospfd_vrf.conf")
+ reset_with_new_configs(tgen, "bgpd_vrf.conf", "ospfd_vrf.conf")
check_vrf_peer_change_passwords(vrf="blue")
if topotest.version_cmp(platform.release(), "5.3") < 0:
return
- # configure_bgp("bgpd_vrf_prefix.conf")
- # configure_ospf("ospfd_vrf.conf")
- reload_new_configs(tgen, "bgpd_vrf_prefix.conf", "ospfd_vrf.conf")
+ reset_with_new_configs(tgen, "bgpd_vrf_prefix.conf", "ospfd_vrf.conf")
check_all_peers_established("blue")
if topotest.version_cmp(platform.release(), "5.3") < 0:
return
- # configure_bgp("bgpd_vrf_prefix.conf")
- # configure_ospf("ospfd_vrf.conf")
- reload_new_configs(tgen, "bgpd_vrf_prefix.conf", "ospfd_vrf.conf")
+ reset_with_new_configs(tgen, "bgpd_vrf_prefix.conf", "ospfd_vrf.conf")
check_vrf_peer_remove_passwords(vrf="blue", prefix="yes")
if topotest.version_cmp(platform.release(), "5.3") < 0:
return
- # configure_bgp("bgpd_vrf_prefix.conf")
- # configure_ospf("ospfd_vrf.conf")
- reload_new_configs(tgen, "bgpd_vrf_prefix.conf", "ospfd_vrf.conf")
+ reset_with_new_configs(tgen, "bgpd_vrf_prefix.conf", "ospfd_vrf.conf")
check_vrf_peer_change_passwords(vrf="blue", prefix="yes")
def test_multiple_vrf_peer_established(tgen):
"default vrf 3 peers same password with multiple VRFs"
- # configure_bgp("bgpd_multi_vrf.conf")
- # configure_ospf("ospfd_multi_vrf.conf")
- reload_new_configs(tgen, "bgpd_multi_vrf.conf", "ospfd_multi_vrf.conf")
+ reset_with_new_configs(tgen, "bgpd_multi_vrf.conf", "ospfd_multi_vrf.conf")
check_all_peers_established("blue")
check_all_peers_established("red")
- # tgen.mininet_cli()
def test_multiple_vrf_peer_remove_passwords(tgen):
"selectively remove passwords checking state with multiple VRFs"
- # configure_bgp("bgpd_multi_vrf.conf")
- # configure_ospf("ospfd_multi_vrf.conf")
- reload_new_configs(tgen, "bgpd_multi_vrf.conf", "ospfd_multi_vrf.conf")
+ reset_with_new_configs(tgen, "bgpd_multi_vrf.conf", "ospfd_multi_vrf.conf")
check_vrf_peer_remove_passwords("blue")
check_all_peers_established("red")
check_vrf_peer_remove_passwords("red")
check_all_peers_established("blue")
- # tgen.mininet_cli()
def test_multiple_vrf_peer_change_passwords(tgen):
"selectively change passwords checking state with multiple VRFs"
- # configure_bgp("bgpd_multi_vrf.conf")
- # configure_ospf("ospfd_multi_vrf.conf")
- reload_new_configs(tgen, "bgpd_multi_vrf.conf", "ospfd_multi_vrf.conf")
+ reset_with_new_configs(tgen, "bgpd_multi_vrf.conf", "ospfd_multi_vrf.conf")
check_vrf_peer_change_passwords("blue")
check_all_peers_established("red")
check_vrf_peer_change_passwords("red")
check_all_peers_established("blue")
- # tgen.mininet_cli()
def test_multiple_vrf_prefix_peer_established(tgen):
if topotest.version_cmp(platform.release(), "5.3") < 0:
return
- # configure_bgp("bgpd_multi_vrf_prefix.conf")
- # configure_ospf("ospfd_multi_vrf.conf")
- reload_new_configs(tgen, "bgpd_multi_vrf_prefix.conf", "ospfd_multi_vrf.conf")
+ reset_with_new_configs(tgen, "bgpd_multi_vrf_prefix.conf", "ospfd_multi_vrf.conf")
check_all_peers_established("blue")
check_all_peers_established("red")
- # tgen.mininet_cli()
def test_multiple_vrf_prefix_peer_remove_passwords(tgen):
if topotest.version_cmp(platform.release(), "5.3") < 0:
return
- # configure_bgp("bgpd_multi_vrf_prefix.conf")
- # configure_ospf("ospfd_multi_vrf.conf")
- reload_new_configs(tgen, "bgpd_multi_vrf_prefix.conf", "ospfd_multi_vrf.conf")
+ reset_with_new_configs(tgen, "bgpd_multi_vrf_prefix.conf", "ospfd_multi_vrf.conf")
check_vrf_peer_remove_passwords(vrf="blue", prefix="yes")
check_all_peers_established("red")
check_vrf_peer_remove_passwords(vrf="red", prefix="yes")
check_all_peers_established("blue")
- # tgen.mininet_cli()
def test_multiple_vrf_prefix_peer_change_passwords(tgen):
if topotest.version_cmp(platform.release(), "5.3") < 0:
return
- # configure_bgp("bgpd_multi_vrf_prefix.conf")
- # configure_ospf("ospfd_multi_vrf.conf")
- reload_new_configs(tgen, "bgpd_multi_vrf_prefix.conf", "ospfd_multi_vrf.conf")
+ reset_with_new_configs(tgen, "bgpd_multi_vrf_prefix.conf", "ospfd_multi_vrf.conf")
check_vrf_peer_change_passwords(vrf="blue", prefix="yes")
check_all_peers_established("red")
check_vrf_peer_change_passwords(vrf="red", prefix="yes")
check_all_peers_established("blue")
- # tgen.mininet_cli()
def test_memory_leak(tgen):
FRRCFG_BKUP_FILE = "frr_json_initial.conf"
ERROR_LIST = ["Malformed", "Failure", "Unknown", "Incomplete"]
-ROUTER_LIST = []
####
CD = os.path.dirname(os.path.realpath(__file__))
return True
+def save_initial_config_on_routers(tgen):
+ """Save current configuration on routers to FRRCFG_BKUP_FILE.
+
+ FRRCFG_BKUP_FILE is the file that will be restored when `reset_config_on_routers()`
+ is called.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ """
+ router_list = tgen.routers()
+ target_cfg_fmt = tgen.logdir + "/{}/frr_json_initial.conf"
+
+ # Get all running configs in parallel
+ procs = {}
+ for rname in router_list:
+ logger.info("Fetching running config for router %s", rname)
+ procs[rname] = router_list[rname].popen(
+ ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
+ stdin=None,
+ stdout=open(target_cfg_fmt.format(rname), "w"),
+ stderr=subprocess.PIPE,
+ )
+ for rname, p in procs.items():
+ _, error = p.communicate()
+ if p.returncode:
+ logger.error(
+ "Get running config for %s failed %d: %s", rname, p.returncode, error
+ )
+ raise InvalidCLIError(
+ "vtysh show running error on {}: {}".format(rname, error)
+ )
+
+
def reset_config_on_routers(tgen, routerName=None):
"""
Resets configuration on routers to the snapshot created using input JSON
# Trim the router list if needed
router_list = tgen.routers()
if routerName:
- if (routerName not in ROUTER_LIST) or (routerName not in router_list):
- logger.debug("Exiting API: reset_config_on_routers: no routers")
+ if routerName not in router_list:
+ logger.warning(
+ "Exiting API: reset_config_on_routers: no router %s",
+ routerName,
+ exc_info=True,
+ )
return True
router_list = {routerName: router_list[routerName]}
return True
+def prep_load_config_to_routers(tgen, *config_name_list):
+ """Create common config for `load_config_to_routers`.
+
+ The common config file is constructed from the list of sub-config files passed as
+ position arguments to this function. Each entry in `config_name_list` is looked for
+ under the router sub-directory in the test directory and those files are
+ concatenated together to create the common config. e.g.,
+
+ # Routers are "r1" and "r2", test file is `example/test_example_foo.py`
+ prepare_load_config_to_routers(tgen, "bgpd.conf", "ospfd.conf")
+
+ When the above call is made the files in
+
+ example/r1/bgpd.conf
+ example/r1/ospfd.conf
+
+ Are concat'd together into a single config file that will be loaded on r1, and
+
+ example/r2/bgpd.conf
+ example/r2/ospfd.conf
+
+ Are concat'd together into a single config file that will be loaded on r2 when
+ the call to `load_config_to_routers` is made.
+ """
+
+ routers = tgen.routers()
+ for rname, router in routers.items():
+ destname = "{}/{}/{}".format(tgen.logdir, rname, FRRCFG_FILE)
+ wmode = "w"
+ for cfbase in config_name_list:
+ script_dir = os.environ["PYTEST_TOPOTEST_SCRIPTDIR"]
+ confname = os.path.join(script_dir, "{}/{}".format(rname, cfbase))
+ with open(confname, "r") as cf:
+ with open(destname, wmode) as df:
+ df.write(cf.read())
+ wmode = "a"
+
+
def load_config_to_routers(tgen, routers, save_bkup=False):
"""
Loads configuration on routers from the file FRRCFG_FILE.
base_router_list = tgen.routers()
router_list = {}
for router in routers:
- if (router not in ROUTER_LIST) or (router not in base_router_list):
+ if router not in base_router_list:
continue
router_list[router] = base_router_list[router]
return load_config_to_routers(tgen, [routerName], save_bkup)
+def reset_with_new_configs(tgen, *cflist):
+ """Reset the router to initial config, then load new configs.
+
+ Resets routers to the initial config state (see `save_initial_config_on_routers()
+ and `reset_config_on_routers()` `), then concat list of router sub-configs together
+ and load onto the routers (see `prep_load_config_to_routers()` and
+ `load_config_to_routers()`)
+ """
+ routers = tgen.routers()
+
+ reset_config_on_routers(tgen)
+ prep_load_config_to_routers(tgen, *cflist)
+ load_config_to_routers(tgen, tgen.routers(), save_bkup=False)
+
+
def get_frr_ipv6_linklocal(tgen, router, intf=None, vrf=None):
"""
API to get the link local ipv6 address of a particular interface using
* `tgen` : topogen object
"""
- global ROUTER_LIST
# Starting topology
tgen.start_topology()
# Starting daemons
router_list = tgen.routers()
- ROUTER_LIST = sorted(
+ routers_sorted = sorted(
router_list.keys(), key=lambda x: int(re_search("[0-9]+", x).group(0))
)
linux_ver = ""
router_list = tgen.routers()
- for rname in ROUTER_LIST:
+ for rname in routers_sorted:
router = router_list[rname]
# It will help in debugging the failures, will give more details on which
topo = tgen.json_topo
router_list = tgen.routers()
- ROUTER_LIST = sorted(
+ routers_sorted = sorted(
router_list.keys(), key=lambda x: int(re_search("[0-9]+", x).group(0))
)
- for rtr in ROUTER_LIST:
+ for rtr in routers_sorted:
if "ospf" in topo["routers"][rtr] and "ospfd" not in daemon_list:
daemon_list.append("ospfd")