#!/usr/bin/env python
+# SPDX-License-Identifier: ISC
#
# Copyright (c) 2020 by VMware, Inc. ("VMware")
# Used Copyright (c) 2018 by Network Device Education Foundation,
# Inc. ("NetDEF") in this file.
#
-# Permission to use, copy, modify, and/or distribute this software
-# for any purpose with or without fee is hereby granted, provided
-# that the above copyright notice and this permission notice appear
-# in all copies.
-#
-# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
-# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
-# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
-# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
-# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
-# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
-# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
-# OF THIS SOFTWARE.
-#
"""
Following tests are covered to test PIM BSM processing basic functionality:
reset_config_on_routers,
run_frr_cmd,
required_linux_kernel_version,
- topo_daemons,
+ verify_rib,
)
from lib.pim import (
find_rp_from_bsrp_info,
verify_pim_grp_rp_source,
verify_pim_bsr,
- verify_ip_mroutes,
+ verify_mroutes,
verify_join_state_and_timer,
verify_pim_state,
verify_upstream_iif,
verify_igmp_groups,
- verify_ip_pim_upstream_rpf,
- clear_ip_mroute,
- clear_ip_pim_interface_traffic,
+ verify_pim_upstream_rpf,
+ clear_mroute,
+ clear_pim_interface_traffic,
McastTesterHelper,
+ verify_pim_neighbors,
)
from lib.topolog import logger
from lib.topojson import build_config_from_json
# Required linux kernel version for this suite to run.
result = required_linux_kernel_version("4.15")
if result is not True:
- pytest.skip("Kernel requirements are not met")
+ pytest.skip("Kernel version should be >= 4.15")
testsuite_run_time = time.asctime(time.localtime(time.time()))
logger.info("Testsuite start time: {}".format(testsuite_run_time))
topo = tgen.json_topo
# ... and here it calls Mininet initialization functions.
- # get list of daemons needs to be started for this suite.
- daemons = topo_daemons(tgen, topo)
-
# Starting topology, create tmp files which are loaded to routers
# to start daemons and then start routers
- start_topology(tgen, daemons)
+ start_topology(tgen)
# Don"t run this test if we have any failure.
if tgen.routers_have_failure():
# Creating configuration from JSON
build_config_from_json(tgen, topo)
+ # Verify PIM neighbors
+ result = verify_pim_neighbors(tgen, topo)
+ assert result is True, " Verify PIM neighbor: Failed Error: {}".format(result)
+
# XXX Replace this using "with McastTesterHelper()... " in each test if possible.
global app_helper
app_helper = McastTesterHelper(tgen)
# Add static routes
input_dict = {
- fhr: {"static_routes": [{"network": bsr_route, "next_hop": next_hop}]},
rp: {"static_routes": [{"network": bsr_route, "next_hop": next_hop_rp}]},
lhr: {"static_routes": [{"network": bsr_route, "next_hop": next_hop_lhr}]},
}
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+ # Verifying static routes are installed
+ for dut, _nexthop in zip([rp, lhr], [next_hop_rp, next_hop_lhr]):
+ input_routes = {dut: input_dict[dut]}
+ result = verify_rib(
+ tgen, "ipv4", dut, input_routes, _nexthop, protocol="static"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
# Add kernel route for source
group = topo["routers"][bsr]["bsm"]["bsr_packets"][packet]["pkt_dst"]
bsr_interface = topo["routers"][bsr]["links"][fhr]["interface"]
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+ # Verifying static routes are installed
+ result = verify_rib(
+ tgen, "ipv4", fhr, input_dict, next_hop_fhr, protocol="static"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
input_dict = {
lhr: {"static_routes": [{"network": rp_list, "next_hop": next_hop_lhr}]},
}
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+
+ # Verifying static routes are installed
+ result = verify_rib(
+ tgen, "ipv4", lhr, input_dict, next_hop_lhr, protocol="static"
+ )
+ assert result is True, "Testcase {} : Failed \n Error {}".format(
+ tc_name, result
+ )
+
return True
pytest.skip(tgen.errors)
app_helper.stop_all_hosts()
- clear_ip_mroute(tgen)
- reset_config_on_routers(tgen)
- clear_ip_pim_interface_traffic(tgen, topo)
-
+ clear_mroute(tgen)
reset_config_on_routers(tgen)
+ clear_pim_interface_traffic(tgen, topo)
result = pre_config_to_bsm(
tgen, topo, tc_name, "b1", "s1", "r1", "f1", "i1", "l1", "packet1"
# Verify ip mroute
step("Verify ip mroute in l1")
src_addr = "*"
- result = verify_ip_mroutes(tgen, dut, src_addr, GROUP_ADDRESS, iif, oil)
+ result = verify_mroutes(tgen, dut, src_addr, GROUP_ADDRESS, iif, oil)
assert result is True, "Testcase {}:Failed \n Error: {}".format(tc_name, result)
# Remove the group rp mapping and send bsm
# Verify mroute not installed
step("Verify mroute not installed in l1")
- result = verify_ip_mroutes(
+ result = verify_mroutes(
tgen, dut, src_addr, GROUP_ADDRESS, iif, oil, retry_timeout=20, expected=False
)
- assert (
- result is not True
- ), "Testcase {} : Failed \n " "mroute installed in l1 \n Error: {}".format(
- tc_name, result
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "Expected: [{}]: mroute (S, G) should not be installed \n "
+ "Found: {}".format(tc_name, dut, result)
)
# Send BSM again to configure rp
# Verify that (*,G) installed in mroute again
iif = "l1-i1-eth0"
- result = verify_ip_mroutes(tgen, dut, src_addr, GROUP_ADDRESS, iif, oil)
+ result = verify_mroutes(tgen, dut, src_addr, GROUP_ADDRESS, iif, oil)
assert result is True, "Testcase {}:Failed \n Error: {}".format(tc_name, result)
step("clear BSM database before moving to next case")
pytest.skip(tgen.errors)
app_helper.stop_all_hosts()
- clear_ip_mroute(tgen)
- reset_config_on_routers(tgen)
- clear_ip_pim_interface_traffic(tgen, topo)
-
+ clear_mroute(tgen)
reset_config_on_routers(tgen)
+ clear_pim_interface_traffic(tgen, topo)
result = pre_config_to_bsm(
tgen, topo, tc_name, "b1", "s1", "r1", "f1", "i1", "l1", "packet1"
iif = "l1-i1-eth0"
# Verify upstream rpf for 225.1.1.1 is chosen as rp1
step("Verify upstream rpf for 225.1.1.1 is chosen as rp1 in l1")
- result = verify_ip_pim_upstream_rpf(tgen, topo, dut, iif, GROUP_ADDRESS, rp1)
+ result = verify_pim_upstream_rpf(tgen, topo, dut, iif, GROUP_ADDRESS, rp1)
assert result is True, "Testcase {}:Failed \n Error: {}".format(tc_name, result)
# Send BSR packet from b1 with rp for 225.1.1.1/32 removed
# Verify upstream rpf for 225.1.1.1 is chosen as rp1
step("Verify upstream rpf for 225.1.1.1 is chosen as rp2 in l1")
- result = verify_ip_pim_upstream_rpf(tgen, topo, dut, iif, GROUP_ADDRESS, rp2)
+ result = verify_pim_upstream_rpf(tgen, topo, dut, iif, GROUP_ADDRESS, rp2)
assert result is True, "Testcase {}:Failed \n Error: {}".format(tc_name, result)
# Verify IIF/OIL in pim state
pytest.skip(tgen.errors)
app_helper.stop_all_hosts()
- clear_ip_mroute(tgen)
- reset_config_on_routers(tgen)
- clear_ip_pim_interface_traffic(tgen, topo)
-
+ clear_mroute(tgen)
reset_config_on_routers(tgen)
+ clear_pim_interface_traffic(tgen, topo)
result = pre_config_to_bsm(
tgen, topo, tc_name, "b1", "s1", "r1", "f1", "i1", "l1", "packet1"
write_test_header(tc_name)
app_helper.stop_all_hosts()
- clear_ip_mroute(tgen)
+ clear_mroute(tgen)
reset_config_on_routers(tgen)
- clear_ip_pim_interface_traffic(tgen, topo)
+ clear_pim_interface_traffic(tgen, topo)
# Don"t run this test if we have any failure.
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- reset_config_on_routers(tgen)
-
result = pre_config_to_bsm(
tgen, topo, tc_name, "b1", "s1", "r1", "f1", "i1", "l1", "packet1"
)
# Verify bsr state in FHR
step("Verify if b2 is not chosen as bsr in f1")
result = verify_pim_bsr(tgen, topo, "f1", bsr_ip2, expected=False)
- assert (
- result is not True
- ), "Testcase {} : Failed \n " "b2 is chosen as bsr in f1 \n Error: {}".format(
- tc_name, result
+ assert result is not True, (
+ "Testcase {} : Failed \n "
+ "Expected: [{}]: b2 should not be chosen as bsr \n "
+ "Found: {}".format(tc_name, "f1", result)
)
# Verify if b1 is still chosen as bsr
pytest.skip(tgen.errors)
app_helper.stop_all_hosts()
- clear_ip_mroute(tgen)
- reset_config_on_routers(tgen)
- clear_ip_pim_interface_traffic(tgen, topo)
-
+ clear_mroute(tgen)
reset_config_on_routers(tgen)
+ clear_pim_interface_traffic(tgen, topo)
result = pre_config_to_bsm(
tgen, topo, tc_name, "b1", "s1", "r1", "f1", "i1", "l1", "packet1"
pytest.skip(tgen.errors)
app_helper.stop_all_hosts()
- clear_ip_mroute(tgen)
- reset_config_on_routers(tgen)
- clear_ip_pim_interface_traffic(tgen, topo)
-
+ clear_mroute(tgen)
reset_config_on_routers(tgen)
+ clear_pim_interface_traffic(tgen, topo)
result = pre_config_to_bsm(
tgen, topo, tc_name, "b1", "s1", "r1", "f1", "i1", "l1", "packet1"
pytest.skip(tgen.errors)
app_helper.stop_all_hosts()
- clear_ip_mroute(tgen)
+ clear_mroute(tgen)
reset_config_on_routers(tgen)
- clear_ip_pim_interface_traffic(tgen, topo)
+ clear_pim_interface_traffic(tgen, topo)
step("pre-configure BSM packet")
result = pre_config_to_bsm(