#####################################################
-def config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, iperf, iperf_intf, GROUP_RANGE, join=False, traffic=False
-):
- """
- API to do pre-configuration to send IGMP join and multicast
- traffic
-
- parameters:
- -----------
- * `tgen`: topogen object
- * `topo`: input json data
- * `tc_name`: caller test case name
- * `iperf`: router running iperf
- * `iperf_intf`: interface name router running iperf
- * `GROUP_RANGE`: group range
- * `join`: IGMP join, default False
- * `traffic`: multicast traffic, default False
- """
-
- if join:
- # Add route to kernal
- result = addKernelRoute(tgen, iperf, iperf_intf, GROUP_RANGE)
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
- if traffic:
- # Add route to kernal
- result = addKernelRoute(tgen, iperf, iperf_intf, GROUP_RANGE)
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
- rnode = tgen.gears[iperf]
- rnode.run("echo 2 > /proc/sys/net/ipv4/conf/all/rp_filter")
-
- return True
-
-
def verify_state_incremented(state_before, state_after):
"""
API to compare interface traffic state incrementing
step("Start traffic first and then send the IGMP join")
step("Send multicast traffic from FRR3 to 225.1.1.1 receiver")
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, "i2", "i2-f1-eth0", GROUP_RANGE, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic("i2", IGMP_JOIN, "f1")
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
step("Enable IGMP on FRR1 interface and send IGMP join (225.1.1.1)")
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, "i1", "i1-l1-eth0", GROUP_RANGE, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
step("joinRx value before join sent")
state_dict = {"r2": {"r2-l1-eth2": ["joinRx"]}}
state_before = verify_pim_interface_traffic(tgen, state_dict)
"Enable IGMP on FRR1 interface and send IGMP join 225.1.1.1 "
"to 225.1.1.5 from different interfaces"
)
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, "i1", "i1-l1-eth0", GROUP_RANGE_1, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join("i1", IGMP_JOIN_RANGE_1, "l1")
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
step("Send multicast traffic from FRR3, wait for SPT switchover")
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, "i2", "i2-f1-eth0", GROUP_RANGE_1, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic("i2", IGMP_JOIN_RANGE_1, "f1")
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
input_join = {"i1": "i1-l1-eth0", "i8": "i8-f1-eth0", "i3": "i3-r2-eth0"}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, GROUP_RANGE, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, IGMP_JOIN, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
step("Send multicast traffic from R3 to 225.1.1.1 receiver")
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, "i2", "i2-f1-eth0", GROUP_RANGE, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic("i2", IGMP_JOIN, "f1")
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
"for group (226.1.1.1-5, 232.1.1.1-5)"
)
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, "i1", "i1-l1-eth0", _GROUP_RANGE, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join("i1", _IGMP_JOIN_RANGE, "l1")
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
result = create_igmp_config(tgen, topo, input_dict)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, "i8", "i8-f1-eth0", _GROUP_RANGE, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join("i8", _IGMP_JOIN_RANGE, "f1")
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
}
for src, src_intf in input_traffic.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, _GROUP_RANGE, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, _IGMP_JOIN_RANGE, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
step("Enable IGMP on FRR1 interface and send IGMP join " "(225.1.1.1-5) to FRR1")
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, "i1", "i1-l1-eth0", GROUP_RANGE_1, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join("i1", IGMP_JOIN_RANGE_1, "l1")
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
step("Send multicast traffic from FRR3 to 225.1.1.1-5 receivers")
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, "i2", "i2-f1-eth0", GROUP_RANGE_1, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic("i2", IGMP_JOIN_RANGE_1, "f1")
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
step("Enable IGMP on FRR1 interface and send IGMP join (225.1.1.1)")
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, "i1", "i1-l1-eth0", GROUP_RANGE, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join("i1", IGMP_JOIN, "l1")
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
step("Send multicast traffic from FRR3 to 225.1.1.1 receiver")
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, "i2", "i2-f1-eth0", GROUP_RANGE, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic("i2", IGMP_JOIN, "f1")
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
result = create_igmp_config(tgen, topo, input_dict)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, "i8", "i8-f1-eth0", GROUP_RANGE, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join("i8", IGMP_JOIN, "f1")
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
clear_ip_pim_interface_traffic(tgen, topo)
step("Enable IGMP on FRR1 interface and send IGMP join (225.1.1.1)")
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, "i1", "i1-l1-eth0", GROUP_RANGE, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join("i1", IGMP_JOIN, "l1")
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
step("Send multicast traffic from FRR3 to 225.1.1.1 receiver")
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, "i2", "i2-f1-eth0", GROUP_RANGE, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic("i2", IGMP_JOIN, "f1")
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
clear_ip_pim_interface_traffic(tgen, topo)
step("Enable IGMP on FRR1 interface and send IGMP join (225.1.1.1)")
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, "i1", "i1-l1-eth0", GROUP_RANGE, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join("i1", IGMP_JOIN, "l1")
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
step("Send multicast traffic from FRR3 to 225.1.1.1 receiver")
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, "i2", "i2-f1-eth0", GROUP_RANGE, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic("i2", IGMP_JOIN, "f1")
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
#####################################################
-def config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, iperf, iperf_intf, GROUP_RANGE, join=False, traffic=False
-):
- """
- API to do pre-configuration to send IGMP join and multicast
- traffic
-
- parameters:
- -----------
- * `tgen`: topogen object
- * `topo`: input json data
- * `tc_name`: caller test case name
- * `iperf`: router running iperf
- * `iperf_intf`: interface name router running iperf
- * `GROUP_RANGE`: group range
- * `join`: IGMP join, default False
- * `traffic`: multicast traffic, default False
- """
-
- if join:
- # Add route to kernal
- result = addKernelRoute(tgen, iperf, iperf_intf, GROUP_RANGE)
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
- if traffic:
- # Add route to kernal
- result = addKernelRoute(tgen, iperf, iperf_intf, GROUP_RANGE)
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
- router_list = tgen.routers()
- for router in router_list.keys():
- if router == iperf:
- continue
-
- rnode = router_list[router]
- rnode.run("echo 2 > /proc/sys/net/ipv4/conf/all/rp_filter")
-
- return True
-
-
def verify_state_incremented(state_before, state_after):
"""
API to compare interface traffic state incrementing
input_join = {"i1": "i1-l1-eth0", "i8": "i8-f1-eth0"}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, _GROUP_RANGE, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, _IGMP_JOIN_RANGE, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
input_src = {"i2": "i2-f1-eth0", "i5": "i5-c2-eth0"}
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, _GROUP_RANGE, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, _IGMP_JOIN_RANGE, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
input_join = {"i1": "i1-l1-eth0", "i8": "i8-f1-eth0"}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, _GROUP_RANGE, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, _IGMP_JOIN_RANGE, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
input_src = {"i2": "i2-f1-eth0", "i5": "i5-c2-eth0"}
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, _GROUP_RANGE, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, _IGMP_JOIN_RANGE, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
"(226.1.1.1-5) and (232.1.1.1-5)"
)
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, "i1", "i1-l1-eth0", _GROUP_RANGE, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join("i1", _IGMP_JOIN_RANGE, "l1")
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
step("Send multicast traffic from FRR3 to '226.1.1.1-5'" ", '232.1.1.1-5' receiver")
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, "i2", "i2-f1-eth0", _GROUP_RANGE, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
step("registerRx and registerStopTx value before traffic sent")
state_dict = {"c2": {"c2-f1-eth1": ["registerRx", "registerStopTx"]}}
state_before = verify_pim_interface_traffic(tgen, state_dict)
input_join = {"i1": "i1-l1-eth0", "i8": "i8-f1-eth0"}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, _GROUP_RANGE, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, _IGMP_JOIN_RANGE, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
input_src = {"i2": "i2-f1-eth0", "i5": "i5-c2-eth0"}
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, _GROUP_RANGE, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, _IGMP_JOIN_RANGE, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
"Enable IGMP on FRR1 interface and send IGMP join"
" (226.1.1.1-5) and (232.1.1.1-5)"
)
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, "i1", "i1-l1-eth0", _GROUP_RANGE, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join("i1", _IGMP_JOIN_RANGE, "l1")
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
"Send multicast traffic from FRR3 to all the receivers "
"(226.1.1.1-5) and (232.1.1.1-5)"
)
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, "i2", "i2-f1-eth0", _GROUP_RANGE, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic("i2", _IGMP_JOIN_RANGE, "f1")
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
result = create_igmp_config(tgen, topo, input_dict)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, "i5", "i5-c2-eth0", _GROUP_RANGE, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join("i5", _IGMP_JOIN_RANGE, "c2")
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
"(226.1.1.1-5) and (232.1.1.1-5)"
)
- # result = config_to_send_igmp_join_and_traffic(
- # tgen, topo, tc_name, "i1", "i1-l1-eth0", _GROUP_RANGE, join=True
- # )
- # assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
- # result = config_to_send_igmp_join_and_traffic(
- # tgen, topo, tc_name, "i2", "i2-f1-eth0", _GROUP_RANGE, traffic=True
- # )
- # assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
step("Send IGMP join (226.1.1.1-5, 232.1.1.1-5) to LHR(l1)")
result = app_helper.run_join("i1", _IGMP_JOIN_RANGE, "l1")
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
result = create_igmp_config(tgen, topo, input_dict)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
- # result = config_to_send_igmp_join_and_traffic(
- # tgen, topo, tc_name, "i8", "i8-f1-eth0", _GROUP_RANGE, join=True
- # )
- # assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join("i8", _IGMP_JOIN_RANGE, "f1")
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
step(
}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, _GROUP_RANGE, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, _IGMP_JOIN_RANGE, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
+
step("Configure one source in FRR2 , one in c1")
step(
"Send multicast traffic from both the sources to all the"
input_src = {"i3": "i3-r2-eth0"}
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, _GROUP_RANGE, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, _IGMP_JOIN_RANGE, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
step(
input_join = {"i1": "i1-l1-eth0", "i8": "i8-f1-eth0"}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, _GROUP_RANGE, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, _IGMP_JOIN_RANGE, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
input_src = {"i6": "i6-l1-eth0", "i2": "i2-f1-eth0"}
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, _GROUP_RANGE, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, _IGMP_JOIN_RANGE, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
#####################################################
-def config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, iperf, iperf_intf, GROUP_RANGE, join=False, traffic=False
-):
- """
- API to do pre-configuration to send IGMP join and multicast
- traffic
-
- parameters:
- -----------
- * `tgen`: topogen object
- * `topo`: input json data
- * `tc_name`: caller test case name
- * `iperf`: router running iperf
- * `iperf_intf`: interface name router running iperf
- * `GROUP_RANGE`: group range
- * `join`: IGMP join, default False
- * `traffic`: multicast traffic, default False
- """
-
- if join:
- # Add route to kernal
- result = addKernelRoute(tgen, iperf, iperf_intf, GROUP_RANGE)
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
- if traffic:
- # Add route to kernal
- result = addKernelRoute(tgen, iperf, iperf_intf, GROUP_RANGE)
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
- router_list = tgen.routers()
- for router in router_list.keys():
- if router == iperf:
- continue
-
- rnode = router_list[router]
- rnode.run("echo 2 > /proc/sys/net/ipv4/conf/all/rp_filter")
-
- for router in topo["routers"].keys():
- if "static_routes" in topo["routers"][router]:
- static_routes = topo["routers"][router]["static_routes"]
- for static_route in static_routes:
- network = static_route["network"]
- next_hop = static_route["next_hop"]
- if type(network) is not list:
- network = [network]
- for net in network:
- addKernelRoute(tgen, router, iperf_intf, net, next_hop)
- return True
-
-
def verify_mroute_repopulated(uptime_before, uptime_after):
"""
API to compare uptime for mroutes
}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, GROUP_RANGE_1, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, IGMP_JOIN_RANGE_1, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
input_src = {"i2": topo["routers"]["i2"]["links"]["f1"]["interface"]}
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, GROUP_RANGE_1, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, IGMP_JOIN_RANGE_1, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, GROUP_RANGE_1, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, IGMP_JOIN_RANGE_1, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
input_join = {"i1": topo["routers"]["i1"]["links"]["l1"]["interface"]}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, GROUP_RANGE_1, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, IGMP_JOIN_RANGE_1, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
input_src = {"i2": topo["routers"]["i2"]["links"]["f1"]["interface"]}
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, GROUP_RANGE_1, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, IGMP_JOIN_RANGE_1, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
input_join = {"i1": topo["routers"]["i1"]["links"]["l1"]["interface"]}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, GROUP_RANGE_1, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, IGMP_JOIN_RANGE_1, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
input_src = {"i2": topo["routers"]["i2"]["links"]["f1"]["interface"]}
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, GROUP_RANGE_1, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, IGMP_JOIN_RANGE_1, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
input_join = {"i1": topo["routers"]["i1"]["links"]["l1"]["interface"]}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, GROUP_RANGE_1, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, IGMP_JOIN_RANGE_1, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
input_src = {"i2": topo["routers"]["i2"]["links"]["f1"]["interface"]}
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, GROUP_RANGE_1, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, IGMP_JOIN_RANGE_1, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
input_src = {"i6": topo["routers"]["i6"]["links"]["l1"]["interface"]}
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, GROUP_RANGE_1, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, IGMP_JOIN_RANGE_1, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
input_join = {"i1": topo["routers"]["i1"]["links"]["l1"]["interface"]}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, GROUP_RANGE_1, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, IGMP_JOIN_RANGE_1, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
input_src = {"i2": topo["routers"]["i2"]["links"]["f1"]["interface"]}
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, GROUP_RANGE_1, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, IGMP_JOIN_RANGE_1, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
input_src = {"i6": topo["routers"]["i6"]["links"]["l1"]["interface"]}
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, GROUP_RANGE_1, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, IGMP_JOIN_RANGE_1, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
input_join = {"i1": topo["routers"]["i1"]["links"]["l1"]["interface"]}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, GROUP_RANGE_1, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, IGMP_JOIN_RANGE_1, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
input_join = {"i1": topo["routers"]["i1"]["links"]["l1"]["interface"]}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, GROUP_RANGE_1, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, IGMP_JOIN_RANGE_1, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
input_src = {"i2": topo["routers"]["i2"]["links"]["f1"]["interface"]}
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, GROUP_RANGE_1, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, IGMP_JOIN_RANGE_1, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
input_join = {"i1": topo["routers"]["i1"]["links"]["l1"]["interface"]}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, GROUP_RANGE_1, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, IGMP_JOIN_RANGE_1, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
input_join = {"i8": topo["routers"]["i8"]["links"]["f1"]["interface"]}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, GROUP_RANGE_1, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, IGMP_JOIN_RANGE_1, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
input_src = {"i6": topo["routers"]["i6"]["links"]["l1"]["interface"]}
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, GROUP_RANGE_1, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, IGMP_JOIN_RANGE_1, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
step("Send IGMP joins again from LHR,check IGMP joins and starg received")
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, GROUP_RANGE_1, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, IGMP_JOIN_RANGE_1, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
step("Send traffic from FHR and verify mroute upstream")
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, GROUP_RANGE_1, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, IGMP_JOIN_RANGE_1, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
input_join = {"i8": topo["routers"]["i8"]["links"]["f1"]["interface"]}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, GROUP_RANGE_1, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, IGMP_JOIN_RANGE_1, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
}
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, GROUP_RANGE_1, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, IGMP_JOIN_RANGE_1, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
input_join = {"i8": topo["routers"]["i8"]["links"]["f1"]["interface"]}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, GROUP_RANGE_1, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, IGMP_JOIN_RANGE_1, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
}
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, GROUP_RANGE_1, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, IGMP_JOIN_RANGE_1, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
input_join = {"i3": topo["routers"]["i3"]["links"]["r2"]["interface"]}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, _GROUP_RANGE, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, _IGMP_JOIN_RANGE, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
input_src = {"i1": topo["routers"]["i1"]["links"]["l1"]["interface"]}
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, _GROUP_RANGE, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, _IGMP_JOIN_RANGE, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
input_join = {"i1": topo["routers"]["i1"]["links"]["l1"]["interface"]}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, _GROUP_RANGE, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, _IGMP_JOIN_RANGE, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
input_src = {"i3": topo["routers"]["i3"]["links"]["r2"]["interface"]}
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, _GROUP_RANGE, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, _IGMP_JOIN_RANGE, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
#####################################################
-def config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, iperf, iperf_intf, GROUP_RANGE, join=False, traffic=False
-):
- """
- API to do pre-configuration to send IGMP join and multicast
- traffic
-
- parameters:
- -----------
- * `tgen`: topogen object
- * `topo`: input json data
- * `tc_name`: caller test case name
- * `iperf`: router running iperf
- * `iperf_intf`: interface name router running iperf
- * `GROUP_RANGE`: group range
- * `join`: IGMP join, default False
- * `traffic`: multicast traffic, default False
- """
-
- if join:
- # Add route to kernal
- result = addKernelRoute(tgen, iperf, iperf_intf, GROUP_RANGE)
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
- if traffic:
- # Add route to kernal
- result = addKernelRoute(tgen, iperf, iperf_intf, GROUP_RANGE)
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
- router_list = tgen.routers()
- for router in router_list.keys():
- if router == iperf:
- continue
-
- rnode = router_list[router]
- rnode.run("echo 2 > /proc/sys/net/ipv4/conf/all/rp_filter")
-
- for router in topo["routers"].keys():
- if "static_routes" in topo["routers"][router]:
- static_routes = topo["routers"][router]["static_routes"]
- for static_route in static_routes:
- network = static_route["network"]
- next_hop = static_route["next_hop"]
- if type(network) is not list:
- network = [network]
-
- return True
-
-
def verify_state_incremented(state_before, state_after):
"""
API to compare interface traffic state incrementing
input_join = {"i5": topo["routers"]["i5"]["links"]["c2"]["interface"]}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, GROUP_RANGE_1, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, IGMP_JOIN_RANGE_1, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
input_src = {"i4": topo["routers"]["i4"]["links"]["c1"]["interface"]}
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, GROUP_RANGE_1, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, IGMP_JOIN_RANGE_1, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
input_join = {"i4": topo["routers"]["i4"]["links"]["c1"]["interface"]}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, GROUP_RANGE_1, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, IGMP_JOIN_RANGE_1, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
input_src = {"i5": topo["routers"]["i5"]["links"]["c2"]["interface"]}
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, GROUP_RANGE_1, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, IGMP_JOIN_RANGE_1, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
input_join = {"i4": topo["routers"]["i4"]["links"]["c1"]["interface"]}
for recvr, recvr_intf in input_join.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, recvr, recvr_intf, GROUP_RANGE_1, join=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_join(recvr, IGMP_JOIN_RANGE_1, join_intf=recvr_intf)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
input_src = {"i5": topo["routers"]["i5"]["links"]["c2"]["interface"]}
for src, src_intf in input_src.items():
- result = config_to_send_igmp_join_and_traffic(
- tgen, topo, tc_name, src, src_intf, GROUP_RANGE_1, traffic=True
- )
- assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
-
result = app_helper.run_traffic(src, IGMP_JOIN_RANGE_1, bind_intf=src_intf)
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
#####################################################
-def config_to_send_igmp_join_and_traffic(tgen, tc_name):
- """
- API to do pre-configuration to send IGMP join and multicast
- traffic
-
- parameters:
- -----------
- * `tgen`: topogen object
- * `tc_name`: caller test case name
- """
-
- step("r0: Add route to kernal")
- result = addKernelRoute(tgen, "r0", "r0-r1-eth0", GROUP_RANGE_ALL)
- assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
-
- step("r5: Add route to kernal")
- result = addKernelRoute(tgen, "r5", "r5-r3-eth0", GROUP_RANGE_ALL)
- assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
-
- rnode = tgen.routers()["r1"]
- rnode.run("ip route add 10.0.6.0/24 via 10.0.2.2")
- rnode = tgen.routers()["r2"]
- rnode.run("ip route add 10.0.6.0/24 via 10.0.4.2")
- rnode = tgen.routers()["r4"]
- rnode.run("ip route add 10.0.6.0/24 via 10.0.5.1")
-
- router_list = tgen.routers()
- for router in router_list.keys():
- rnode = router_list[router]
- rnode.run("echo 2 > /proc/sys/net/ipv4/conf/all/rp_filter")
-
- return True
-
-
def verify_mroute_repopulated(uptime_before, uptime_after):
"""
API to compare uptime for mroutes
pytest.skip(tgen.errors)
step("pre-configuration to send IGMP join and multicast traffic")
- result = config_to_send_igmp_join_and_traffic(tgen, tc_name)
- assert result is True, "Testcase{}: Failed Error: {}".format(tc_name, result)
step("Enable IGMP on r1 interface and send IGMP " "join (225.1.1.1) to r1")
step("Configure r2 loopback interface as RP")
clear_ip_mroute(tgen)
clear_ip_pim_interface_traffic(tgen, TOPO)
- step("pre-configuration to send IGMP join and multicast traffic")
- result = config_to_send_igmp_join_and_traffic(tgen, tc_name)
- assert result is True, "Testcase{}: Failed Error: {}".format(tc_name, result)
-
dut = "r1"
intf = "r1-r3-eth2"
shutdown_bringup_interface(tgen, dut, intf, False)
clear_ip_mroute(tgen)
clear_ip_pim_interface_traffic(tgen, TOPO)
- step("pre-configuration to send IGMP join and multicast traffic")
- result = config_to_send_igmp_join_and_traffic(tgen, tc_name)
- assert result is True, "Testcase{}: Failed Error: {}".format(tc_name, result)
-
dut = "r1"
intf = "r1-r3-eth2"
shutdown_bringup_interface(tgen, dut, intf, False)
clear_ip_mroute(tgen)
clear_ip_pim_interface_traffic(tgen, TOPO)
- step("pre-configuration to send IGMP join and multicast traffic")
- result = config_to_send_igmp_join_and_traffic(tgen, tc_name)
- assert result is True, "Testcase{}: Failed Error: {}".format(tc_name, result)
-
step("Enable IGMP on R1 interface")
step("Configure r2 loopback interface as RP")
step("Enable PIM between r1 and r2")
clear_ip_mroute(tgen)
clear_ip_pim_interface_traffic(tgen, TOPO)
- step("pre-configuration to send IGMP join and multicast traffic")
- result = config_to_send_igmp_join_and_traffic(tgen, tc_name)
- assert result is True, "Testcase{}: Failed Error: {}".format(tc_name, result)
-
step("Enable IGMP on r1 interface and send IGMP " "join (225.1.1.1) to r1")
step("Configure r2 loopback interface as RP")
step("Enable PIM between r1 and r2")
clear_ip_mroute(tgen)
clear_ip_pim_interface_traffic(tgen, TOPO)
- step("pre-configuration to send IGMP join and multicast traffic")
- result = config_to_send_igmp_join_and_traffic(tgen, tc_name)
- assert result is True, "Testcase{}: Failed Error: {}".format(tc_name, result)
-
step("Enable IGMP on r1 interface")
step("Configure RP on r2 (loopback interface) for the group range " "224.0.0.0/4")
step("Configure RP on r4 (loopback interface) for the group range " "225.1.1.1/32")
clear_ip_mroute(tgen)
clear_ip_pim_interface_traffic(tgen, TOPO)
- step("pre-configuration to send IGMP join and multicast traffic")
- result = config_to_send_igmp_join_and_traffic(tgen, tc_name)
- assert result is True, "Testcase{}: Failed Error: {}".format(tc_name, result)
-
step("Enable IGMP on r1 interface")
step("Configure RP on r1 (loopback interface) for the group range" " 224.0.0.0/4")
step("Enable the PIM on all the interfaces of r1, r2, r3 and r4 routers")
clear_ip_mroute(tgen)
clear_ip_pim_interface_traffic(tgen, TOPO)
- step("pre-configuration to send IGMP join and multicast traffic")
- result = config_to_send_igmp_join_and_traffic(tgen, tc_name)
- assert result is True, "Testcase{}: Failed Error: {}".format(tc_name, result)
-
step("Enable IGMP on r1 interface")
step("Configure RP on r1 (loopback interface) for the group range" " 224.0.0.0/4")
step("Enable the PIM on all the interfaces of r1, r2, r3 and r4 routers")
clear_ip_mroute(tgen)
clear_ip_pim_interface_traffic(tgen, TOPO)
- step("pre-configuration to send IGMP join and multicast traffic")
- result = config_to_send_igmp_join_and_traffic(tgen, tc_name)
- assert result is True, "Testcase{}: Failed Error: {}".format(tc_name, result)
-
step("Enable IGMP on r1 interface")
step("Configure RP on r2 (loopback interface) for the group range" " 225.1.1.0/24")
step("Enable the PIM on all the interfaces of r1, r2, r3 and r4 routers")
clear_ip_mroute(tgen)
clear_ip_pim_interface_traffic(tgen, TOPO)
- step("pre-configuration to send IGMP join and multicast traffic")
- result = config_to_send_igmp_join_and_traffic(tgen, tc_name)
- assert result is True, "Testcase{}: Failed Error: {}".format(tc_name, result)
-
step("Enable IGMP on r1 interface")
step("Configure RP on r2 (loopback interface) for the group range" " 225.1.1.0/24")
step("Enable the PIM on all the interfaces of r1, r2, r3 and r4 routers")
clear_ip_mroute(tgen)
clear_ip_pim_interface_traffic(tgen, TOPO)
- step("pre-configuration to send IGMP join and multicast traffic")
- result = config_to_send_igmp_join_and_traffic(tgen, tc_name)
- assert result is True, "Testcase{}: Failed Error: {}".format(tc_name, result)
-
step("Enable IGMP on r1 interface and send IGMP join (225.1.1.1) to r1")
step("Configure RP on r2 (loopback interface) for the group range" " 224.0.0.0/4")
step("Enable the PIM on all the interfaces of r1, r2, r3 and r4 routers")
clear_ip_mroute(tgen)
clear_ip_pim_interface_traffic(tgen, TOPO)
- step("pre-configuration to send IGMP join and multicast traffic")
- result = config_to_send_igmp_join_and_traffic(tgen, tc_name)
- assert result is True, "Testcase{}: Failed Error: {}".format(tc_name, result)
-
step("Enable IGMP on r1 interface")
step("Configure RP on r2 (loopback interface) for the group range" " 224.0.0.0/4")
step("Enable the PIM on all the interfaces of r1, r2, r3 and r4 routers")
clear_ip_mroute(tgen)
clear_ip_pim_interface_traffic(tgen, TOPO)
- step("pre-configuration to send IGMP join and multicast traffic")
- result = config_to_send_igmp_join_and_traffic(tgen, tc_name)
- assert result is True, "Testcase{}: Failed Error: {}".format(tc_name, result)
-
step("Enable IGMP on r1 interface and send IGMP join (225.1.1.1) to R1")
step("Configure RP on r3 (loopback interface) for the group range" " 224.0.0.0/4")
step("Enable the PIM on all the interfaces of r1, r2, r3 and r4 routers")
clear_ip_mroute(tgen)
clear_ip_pim_interface_traffic(tgen, TOPO)
- step("pre-configuration to send IGMP join and multicast traffic")
- result = config_to_send_igmp_join_and_traffic(tgen, tc_name)
- assert result is True, "Testcase{}: Failed Error: {}".format(tc_name, result)
-
step("Enable IGMP on r1 interface and send IGMP join (225.1.1.1) to r1")
step("Configure RP on r2 (loopback interface) for the group range" "225.1.1.0/24")
step("Enable the PIM on all the interfaces of r1-r2-r3")
clear_ip_mroute(tgen)
clear_ip_pim_interface_traffic(tgen, TOPO)
- step("pre-configuration to send IGMP join and multicast traffic")
- result = config_to_send_igmp_join_and_traffic(tgen, tc_name)
- assert result is True, "Testcase{}: Failed Error: {}".format(tc_name, result)
-
step("Delete existing RP configuration")
input_dict = {
"r2": {
clear_ip_mroute(tgen)
clear_ip_pim_interface_traffic(tgen, TOPO)
- step("pre-configuration to send IGMP join and multicast traffic")
- result = config_to_send_igmp_join_and_traffic(tgen, tc_name)
- assert result is True, "Testcase{}: Failed Error: {}".format(tc_name, result)
-
# Steps to execute
step("Enable IGMP on r1 interface")
step("Configure RP on r2 (loopback interface) for the group range" " 224.0.0.0/4")
clear_ip_mroute(tgen)
clear_ip_pim_interface_traffic(tgen, TOPO)
- step("pre-configuration to send IGMP join and multicast traffic")
- result = config_to_send_igmp_join_and_traffic(tgen, tc_name)
- assert result is True, "Testcase{}: Failed Error: {}".format(tc_name, result)
-
step("Enable IGMP on r1 interface")
step("Configure RP on r2 (loopback interface) for the group range" " 224.0.0.0/4")
step("r1: Delete the RP config")
clear_ip_mroute(tgen)
clear_ip_pim_interface_traffic(tgen, TOPO)
- step("pre-configuration to send IGMP join and multicast traffic")
- result = config_to_send_igmp_join_and_traffic(tgen, tc_name)
- assert result is True, "Testcase{}: Failed Error: {}".format(tc_name, result)
-
step("Enable IGMP on r1 interface")
step("Configure RP on r2 (lo) for the group range" " 224.0.0.0/4")
step("r2: Delete the RP configuration")