#############################################
# Verification APIs
#############################################
-def verify_pim_neighbors(tgen, topo, dut=None, iface=None):
+@retry(attempts=6, wait=2, return_is_str=True)
+def verify_pim_neighbors(tgen, topo, dut=None, iface=None, nbr_ip=None):
"""
Verify all PIM neighbors are up and running, config is verified
using "show ip pim neighbor" cli
* `topo` : json file data
* `dut` : dut info
* `iface` : link for which PIM nbr need to check
+ * `nbr_ip` : neighbor ip of interface
Usage
-----
- result = verify_pim_neighbors(tgen, topo, dut, link)
+ result = verify_pim_neighbors(tgen, topo, dut, iface=ens192, nbr_ip=20.1.1.2)
Returns
-------
return output_dict
-@retry(attempts=31, wait=2, return_is_str=True)
-def verify_pim_interface(tgen, topo, dut):
+@retry(attempts=21, wait=2, return_is_str=True)
+def verify_pim_interface(tgen, topo, dut, interface=None, interface_ip=None):
"""
Verify all PIM interface are up and running, config is verified
using "show ip pim interface" cli
* `tgen`: topogen object
* `topo` : json file data
* `dut` : device under test
+ * `interface` : interface name
+ * `interface_ip` : interface ip address
Usage
-----
- result = verify_pim_interfacetgen, topo, dut)
+ result = verify_pim_interfacetgen, topo, dut, interface=ens192, interface_ip=20.1.1.1)
Returns
-------
logger.info("[DUT: %s]: Verifying PIM interface status:", dut)
rnode = tgen.routers()[dut]
- show_ip_pim_interface_json = run_frr_cmd(
- rnode, "show ip pim interface json", isjson=True
- )
-
- for destLink, data in topo["routers"][dut]["links"].items():
- if "type" in data and data["type"] == "loopback":
- continue
+ show_ip_pim_interface_json = rnode.\
+ vtysh_cmd("show ip pim interface json", isjson=True)
+
+ logger.info("show_ip_pim_interface_json: \n %s",
+ show_ip_pim_interface_json)
+
+ if interface_ip:
+ if interface in show_ip_pim_interface_json:
+ pim_intf_json = show_ip_pim_interface_json[interface]
+ if pim_intf_json["address"] != interface_ip:
+ errormsg = ("[DUT %s]: PIM interface "
+ "ip is not correct "
+ "[FAILED]!! Expected : %s, Found : %s"
+ %(dut, pim_intf_json["address"],interface_ip))
+ return errormsg
+ else:
+ logger.info("[DUT %s]: PIM interface "
+ "ip is correct "
+ "[Passed]!! Expected : %s, Found : %s"
+ %(dut, pim_intf_json["address"],interface_ip))
+ return True
+ else:
+ for destLink, data in topo["routers"][dut]["links"].items():
+ if "type" in data and data["type"] == "loopback":
+ continue
- if "pim" in data and data["pim"] == "enable":
- pim_interface = data["interface"]
- pim_intf_ip = data["ipv4"].split("/")[0]
+ if "pim" in data and data["pim"] == "enable":
+ pim_interface = data["interface"]
+ pim_intf_ip = data["ipv4"].split("/")[0]
- if pim_interface in show_ip_pim_interface_json:
- pim_intf_json = show_ip_pim_interface_json[pim_interface]
+ if pim_interface in show_ip_pim_interface_json:
+ pim_intf_json = show_ip_pim_interface_json\
+ [pim_interface]
# Verifying PIM interface
- if (
- pim_intf_json["address"] != pim_intf_ip
- and pim_intf_json["state"] != "up"
- ):
- errormsg = (
- "[DUT %s]: PIM interface: %s "
- "PIM interface ip: %s, status check "
- "[FAILED]!! Expected : %s, Found : %s"
- % (
- dut,
- pim_interface,
- pim_intf_ip,
- pim_interface,
- pim_intf_json["state"],
- )
- )
+ if pim_intf_json["address"] != pim_intf_ip and \
+ pim_intf_json["state"] != "up":
+ errormsg = ("[DUT %s]: PIM interface: %s "
+ "PIM interface ip: %s, status check "
+ "[FAILED]!! Expected : %s, Found : %s"
+ %(dut, pim_interface, pim_intf_ip,
+ pim_interface, pim_intf_json["state"]))
return errormsg
- logger.info(
- "[DUT %s]: PIM interface: %s, "
- "interface ip: %s, status: %s"
- " [PASSED]!!",
- dut,
- pim_interface,
- pim_intf_ip,
- pim_intf_json["state"],
- )
- else:
- errormsg = (
- "[DUT %s]: PIM interface: %s "
- "PIM interface ip: %s, is not present "
- % (dut, pim_interface, pim_intf_ip,)
- )
- return errormsg
+ logger.info("[DUT %s]: PIM interface: %s, "
+ "interface ip: %s, status: %s"
+ " [PASSED]!!",
+ dut, pim_interface, pim_intf_ip,
+ pim_intf_json["state"])
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
rnode = tgen.routers()[dut]
- # sleep(60)
logger.info("[DUT: %s]: IP mroutes uptime before clear", dut)
mroute_json_1 = run_frr_cmd(rnode, "show ip mroute json", isjson=True)
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
+
+
+@retry(attempts=21, wait=2, return_is_str=True)
+def verify_igmp_interface(tgen, topo, dut, igmp_iface, interface_ip):
+ """
+ Verify all IGMP interface are up and running, config is verified
+ using "show ip igmp interface" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo` : json file data
+ * `dut` : device under test
+ * `igmp_iface` : interface name
+ * `interface_ip` : interface ip address
+
+ Usage
+ -----
+ result = verify_igmp_interface(tgen, topo, dut, igmp_iface, interface_ip)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ for router in tgen.routers():
+ if router != dut:
+ continue
+
+ logger.info("[DUT: %s]: Verifying PIM interface status:",
+ dut)
+
+ rnode = tgen.routers()[dut]
+ show_ip_igmp_interface_json = \
+ run_frr_cmd(rnode, "show ip igmp interface json", isjson=True)
+
+ if igmp_iface in show_ip_igmp_interface_json:
+ igmp_intf_json = show_ip_igmp_interface_json[igmp_iface]
+ # Verifying igmp interface
+ if igmp_intf_json["address"] != interface_ip:
+ errormsg = ("[DUT %s]: igmp interface ip is not correct "
+ "[FAILED]!! Expected : %s, Found : %s"
+ %(dut, igmp_intf_json["address"], interface_ip))
+ return errormsg
+
+ logger.info("[DUT %s]: igmp interface: %s, "
+ "interface ip: %s"
+ " [PASSED]!!",
+ dut, igmp_iface, interface_ip)
+ else:
+ errormsg = ("[DUT %s]: igmp interface: %s "
+ "igmp interface ip: %s, is not present "
+ %(dut, igmp_iface, interface_ip))
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True