def apply_raw_config(tgen, input_dict):
-
"""
API to configure raw configuration on device. This can be used for any cli
which has not been implemented in JSON.
try:
router_list = tgen.routers()
for router, rnode in router_list.items():
-
result = rnode.check_router_running()
if result != "":
daemons = []
# Get all running configs in parallel
procs = {}
for rname in router_list:
- logger.info("Fetching running config for router %s", rname)
+ logger.debug("Fetching running config for router %s", rname)
procs[rname] = router_list[rname].popen(
["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
stdin=None,
#
procs = {}
for rname in router_list:
- logger.info("Fetching running config for router %s", rname)
+ logger.debug("Fetching running config for router %s", rname)
procs[rname] = router_list[rname].popen(
["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
stdin=None,
#
procs = {}
for rname in router_list:
- logger.info(
+ logger.debug(
"Generating delta for router %s to new configuration (gen %d)", rname, gen
)
procs[rname] = tgen.net.popen(
#
procs = {}
for rname in router_list:
- logger.info("Applying delta config on router %s", rname)
+ logger.debug("Applying delta config on router %s", rname)
procs[rname] = router_list[rname].popen(
["/usr/bin/env", "vtysh", "-f", delta_fmt.format(rname, gen)],
output, _ = p.communicate()
vtysh_command = "vtysh -f {}".format(delta_fmt.format(rname, gen))
if not p.returncode:
- router_list[rname].logger.info(
+ router_list[rname].logger.debug(
'\nvtysh config apply => "{}"\nvtysh output <= "{}"'.format(
vtysh_command, output
)
if show_router_config:
procs = {}
for rname in router_list:
- logger.info("Fetching running config for router %s", rname)
+ logger.debug("Fetching running config for router %s", rname)
procs[rname] = router_list[rname].popen(
["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
stdin=None,
output,
)
else:
- logger.info(
+ logger.debug(
"Configuration on router %s after reset:\n%s", rname, output
)
frr_cfg_bkup = frr_cfg_bkup_fmt.format(rname)
with open(frr_cfg_file, "r+") as cfg:
data = cfg.read()
- logger.info(
+ logger.debug(
"Applying following configuration on router %s (gen: %d):\n%s",
rname,
gen,
frr_cfg_file = frr_cfg_file_fmt.format(rname)
vtysh_command = "vtysh -f " + frr_cfg_file
if not p.returncode:
- router_list[rname].logger.info(
+ router_list[rname].logger.debug(
'\nvtysh config apply => "{}"\nvtysh output <= "{}"'.format(
vtysh_command, output
)
output,
)
else:
- logger.info("New configuration for router %s:\n%s", rname, output)
+ logger.debug("New configuration for router %s:\n%s", rname, output)
logger.debug("Exiting API: load_config_to_routers")
return not errors
bundle_procs[rname] = tgen.net[rname].popen(gen_sup_cmd, stdin=None)
for rname, rnode in router_list.items():
- logger.info("Waiting on support bundle for %s", rname)
+ logger.debug("Waiting on support bundle for %s", rname)
output, error = bundle_procs[rname].communicate()
if output:
- logger.info(
+ logger.debug(
"Output from collecting support bundle for %s:\n%s", rname, output
)
if error:
cmd = "ip link add link {} name {} type vlan id {}".format(
interface, vlan_intf, vlan
)
- logger.info("[DUT: %s]: Running command: %s", dut, cmd)
+ logger.debug("[DUT: %s]: Running command: %s", dut, cmd)
result = rnode.run(cmd)
- logger.info("result %s", result)
+ logger.debug("result %s", result)
# Bringing interface up
cmd = "ip link set {} up".format(vlan_intf)
- logger.info("[DUT: %s]: Running command: %s", dut, cmd)
+ logger.debug("[DUT: %s]: Running command: %s", dut, cmd)
result = rnode.run(cmd)
- logger.info("result %s", result)
+ logger.debug("result %s", result)
# Assigning IP address
ifaddr = ipaddress.ip_interface(
cmd = "ip -{0} a flush {1} scope global && ip a add {2} dev {1} && ip l set {1} up".format(
ifaddr.version, vlan_intf, ifaddr
)
- logger.info("[DUT: %s]: Running command: %s", dut, cmd)
+ logger.debug("[DUT: %s]: Running command: %s", dut, cmd)
result = rnode.run(cmd)
- logger.info("result %s", result)
+ logger.debug("result %s", result)
def tcpdump_capture_start(
vrf["name"], vrf["id"]
)
- logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd)
+ logger.debug(
+ "[DUT: %s]: Running kernel cmd [%s]", c_router, cmd
+ )
rnode.run(cmd)
# Kernel cmd - Bring down VRF
cmd = "ip link set dev {} down".format(name)
- logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd)
+ logger.debug(
+ "[DUT: %s]: Running kernel cmd [%s]", c_router, cmd
+ )
rnode.run(cmd)
else:
cmd = "ip link add {} type vrf table {}".format(
name, table_id
)
- logger.info(
+ logger.debug(
"[DUT: %s]: Running kernel cmd " "[%s]", c_router, cmd
)
rnode.run(cmd)
# Kernel cmd - Bring up VRF
cmd = "ip link set dev {} up".format(name)
- logger.info(
+ logger.debug(
"[DUT: %s]: Running kernel " "cmd [%s]", c_router, cmd
)
rnode.run(cmd)
interface_name, _vrf
)
- logger.info(
+ logger.debug(
"[DUT: %s]: Running" " kernel cmd [%s]",
c_router,
cmd,
cmd = "ip -{0} a flush {1} scope global && ip a add {2} dev {1} && ip l set {1} up".format(
ifaddr.version, name, ifaddr
)
- logger.info("[DUT: %s]: Running command: %s", dut, cmd)
+ logger.debug("[DUT: %s]: Running command: %s", dut, cmd)
rnode.run(cmd)
if vrf:
action = "down"
cmd = "{} {} {}".format(cmd, intf_name, action)
- logger.info("[DUT: %s]: Running command: %s", dut, cmd)
+ logger.debug("[DUT: %s]: Running command: %s", dut, cmd)
rnode.run(cmd)
rlist = []
for router in input_dict.keys():
-
interface_list = input_dict[router]["interface_list"]
status = input_dict[router].setdefault("status", "up")
for intf in interface_list:
)
if initial_wait > 0:
- logger.info("Waiting for [%s]s as initial delay", initial_wait)
+ logger.debug("Waiting for [%s]s as initial delay", initial_wait)
sleep(initial_wait)
invert_logic = not _expected
return saved_failure
if saved_failure:
- logger.info(
+ logger.debug(
"RETRY DIAG: [failure] Sleeping %ds until next retry with %.1f retry time left - too see if timeout was too short",
retry_sleep,
seconds_left,
)
else:
- logger.info(
+ logger.debug(
"Sleeping %ds until next retry with %.1f retry time left",
retry_sleep,
seconds_left,
interface_data += _create_interfaces_ospf_cfg(
"ospf6", c_data, data, ospf_keywords + ["area"]
)
+ interface_data.append("exit")
if interface_data:
interface_data_dict[c_router] = interface_data
continue
rmap_data = []
for rmap_name, rmap_value in input_dict[router]["route_maps"].items():
-
for rmap_dict in rmap_value:
del_action = rmap_dict.setdefault("delete", False)
group_addr_range = [group_addr_range]
for grp_addr in group_addr_range:
-
addr_type = validate_ip_address(grp_addr)
if addr_type == "ipv4":
if next_hop is not None:
if "brctl" in input_dict[dut]:
for brctl_dict in input_dict[dut]["brctl"]:
-
brctl_names = brctl_dict.setdefault("brctl_name", [])
addvxlans = brctl_dict.setdefault("addvxlan", [])
stp_values = brctl_dict.setdefault("stp", [])
for brctl_name, vxlan, vrf, stp in zip(
brctl_names, addvxlans, vrfs, stp_values
):
-
ip_cmd_list = []
cmd = "ip link add name {} type bridge stp_state {}".format(
brctl_name, stp
for static_route in static_routes:
if "vrf" in static_route and static_route["vrf"] is not None:
-
logger.info(
"[DUT: {}]: Verifying routes for VRF:"
" {}".format(router, static_route["vrf"])
for static_route in static_routes:
if "vrf" in static_route and static_route["vrf"] is not None:
-
logger.info(
"[DUT: {}]: Verifying routes for VRF:"
" {}".format(router, static_route["vrf"])