2 # Copyright (c) 2019 by VMware, Inc. ("VMware")
3 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
4 # ("NetDEF") in this file.
6 # Permission to use, copy, modify, and/or distribute this software
7 # for any purpose with or without fee is hereby granted, provided
8 # that the above copyright notice and this permission notice appear
11 # THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
12 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
14 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
15 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
16 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
17 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
21 from collections
import OrderedDict
22 from datetime
import datetime
23 from time
import sleep
24 from copy
import deepcopy
25 from subprocess
import call
26 from subprocess
import STDOUT
as SUB_STDOUT
27 from subprocess
import PIPE
as SUB_PIPE
28 from subprocess
import Popen
29 from functools
import wraps
30 from re
import search
as re_search
31 from tempfile
import mkdtemp
41 if sys
.version_info
[0] > 2:
45 import ConfigParser
as configparser
47 from lib
.topolog
import logger
, logger_config
48 from lib
.topogen
import TopoRouter
, get_topogen
49 from lib
.topotest
import interface_set_status
, version_cmp
, frr_unicode
51 FRRCFG_FILE
= "frr_json.conf"
52 FRRCFG_BKUP_FILE
= "frr_json_initial.conf"
54 ERROR_LIST
= ["Malformed", "Failure", "Unknown", "Incomplete"]
58 CD
= os
.path
.dirname(os
.path
.realpath(__file__
))
59 PYTESTINI_PATH
= os
.path
.join(CD
, "../pytest.ini")
61 # Creating tmp dir with testsuite name to avoid conflict condition when
62 # multiple testsuites run together. All temporary files would be created
63 # in this dir and this dir would be removed once testsuite run is
65 LOGDIR
= "/tmp/topotests/"
68 # NOTE: to save execution logs to log file frrtest_log_dir must be configured
70 config
= configparser
.ConfigParser()
71 config
.read(PYTESTINI_PATH
)
73 config_section
= "topogen"
75 if config
.has_option("topogen", "verbosity"):
76 loglevel
= config
.get("topogen", "verbosity")
77 loglevel
= loglevel
.upper()
81 if config
.has_option("topogen", "frrtest_log_dir"):
82 frrtest_log_dir
= config
.get("topogen", "frrtest_log_dir")
83 time_stamp
= datetime
.time(datetime
.now())
84 logfile_name
= "frr_test_bgp_"
85 frrtest_log_file
= frrtest_log_dir
+ logfile_name
+ str(time_stamp
)
86 print("frrtest_log_file..", frrtest_log_file
)
88 logger
= logger_config
.get_logger(
89 name
="test_execution_logs", log_level
=loglevel
, target
=frrtest_log_file
91 print("Logs will be sent to logfile: {}".format(frrtest_log_file
))
93 if config
.has_option("topogen", "show_router_config"):
94 show_router_config
= config
.get("topogen", "show_router_config")
96 show_router_config
= False
98 # env variable for setting what address type to test
99 ADDRESS_TYPES
= os
.environ
.get("ADDRESS_TYPES")
102 # Saves sequence id numbers
103 SEQ_ID
= {"prefix_lists": {}, "route_maps": {}}
106 def get_seq_id(obj_type
, router
, obj_name
):
108 Generates and saves sequence number in interval of 10
111 * `obj_type`: prefix_lists or route_maps
112 * `router`: router name
113 *` obj_name`: name of the prefix-list or route-map
116 Sequence number generated
119 router_data
= SEQ_ID
[obj_type
].setdefault(router
, {})
120 obj_data
= router_data
.setdefault(obj_name
, {})
121 seq_id
= obj_data
.setdefault("seq_id", 0)
123 seq_id
= int(seq_id
) + 10
124 obj_data
["seq_id"] = seq_id
129 def set_seq_id(obj_type
, router
, id, obj_name
):
131 Saves sequence number if not auto-generated and given by user
134 * `obj_type`: prefix_lists or route_maps
135 * `router`: router name
136 *` obj_name`: name of the prefix-list or route-map
138 router_data
= SEQ_ID
[obj_type
].setdefault(router
, {})
139 obj_data
= router_data
.setdefault(obj_name
, {})
140 seq_id
= obj_data
.setdefault("seq_id", 0)
142 seq_id
= int(seq_id
) + int(id)
143 obj_data
["seq_id"] = seq_id
146 class InvalidCLIError(Exception):
147 """Raise when the CLI command is wrong"""
152 def run_frr_cmd(rnode
, cmd
, isjson
=False):
154 Execute frr show commands in priviledged mode
155 * `rnode`: router node on which commands needs to executed
156 * `cmd`: Command to be executed on frr
157 * `isjson`: If command is to get json data or not
162 ret_data
= rnode
.vtysh_cmd(cmd
, isjson
=isjson
)
166 logger
.debug(ret_data
)
167 print_data
= rnode
.vtysh_cmd(cmd
.rstrip("json"), isjson
=False)
169 print_data
= ret_data
172 "Output for command [ %s] on router %s:\n%s",
180 raise InvalidCLIError("No actual cmd passed")
183 def apply_raw_config(tgen
, input_dict
):
186 API to configure raw configuration on device. This can be used for any cli
187 which does has not been implemented in JSON.
191 * `tgen`: tgen onject
192 * `input_dict`: configuration that needs to be applied
200 "no bgp update-group-split-horizon"
210 for router_name
in input_dict
.keys():
211 config_cmd
= input_dict
[router_name
]["raw_config"]
213 if not isinstance(config_cmd
, list):
214 config_cmd
= [config_cmd
]
216 frr_cfg_file
= "{}/{}/{}".format(TMPDIR
, router_name
, FRRCFG_FILE
)
217 with
open(frr_cfg_file
, "w") as cfg
:
218 for cmd
in config_cmd
:
219 cfg
.write("{}\n".format(cmd
))
221 result
= load_config_to_router(tgen
, router_name
)
226 def create_common_configuration(
227 tgen
, router
, data
, config_type
=None, build
=False, load_config
=True
230 API to create object of class FRRConfig and also create frr_json.conf
231 file. It will create interface and common configurations and save it to
232 frr_json.conf and load to router
235 * `tgen`: tgen onject
236 * `data`: Congiguration data saved in a list.
237 * `router` : router id to be configured.
238 * `config_type` : Syntactic information while writing configuration. Should
239 be one of the value as mentioned in the config_map below.
240 * `build` : Only for initial setup phase this is set as True
245 TMPDIR
= os
.path
.join(LOGDIR
, tgen
.modname
)
247 fname
= "{}/{}/{}".format(TMPDIR
, router
, FRRCFG_FILE
)
249 config_map
= OrderedDict(
251 "general_config": "! FRR General Config\n",
252 "interface_config": "! Interfaces Config\n",
253 "static_route": "! Static Route Config\n",
254 "prefix_list": "! Prefix List Config\n",
255 "bgp_community_list": "! Community List Config\n",
256 "route_maps": "! Route Maps Config\n",
257 "bgp": "! BGP Config\n",
258 "vrf": "! VRF Config\n",
259 "ospf": "! OSPF Config\n",
265 elif not load_config
:
271 frr_cfg_fd
= open(fname
, mode
)
273 frr_cfg_fd
.write(config_map
[config_type
])
275 frr_cfg_fd
.write("{} \n".format(str(line
)))
276 frr_cfg_fd
.write("\n")
278 except IOError as err
:
280 "Unable to open FRR Config File. error(%s): %s" % (err
.errno
, err
.strerror
)
286 # If configuration applied from build, it will done at last
287 if not build
and load_config
:
288 load_config_to_router(tgen
, router
)
293 def kill_router_daemons(tgen
, router
, daemons
):
295 Router's current config would be saved to /etc/frr/ for each deamon
296 and deamon would be killed forcefully using SIGKILL.
297 * `tgen` : topogen object
298 * `router`: Device under test
299 * `daemons`: list of daemons to be killed
302 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
305 router_list
= tgen
.routers()
307 # Saving router config to /etc/frr, which will be loaded to router
309 router_list
[router
].vtysh_cmd("write memory")
312 result
= router_list
[router
].killDaemons(daemons
)
314 assert "Errors found post shutdown - details follow:" == 0, result
317 except Exception as e
:
318 errormsg
= traceback
.format_exc()
319 logger
.error(errormsg
)
323 def start_router_daemons(tgen
, router
, daemons
):
325 Daemons defined by user would be started
326 * `tgen` : topogen object
327 * `router`: Device under test
328 * `daemons`: list of daemons to be killed
331 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
334 router_list
= tgen
.routers()
337 result
= router_list
[router
].startDaemons(daemons
)
340 except Exception as e
:
341 errormsg
= traceback
.format_exc()
342 logger
.error(errormsg
)
345 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
349 def kill_mininet_routers_process(tgen
):
351 Kill all mininet stale router' processes
352 * `tgen` : topogen object
355 router_list
= tgen
.routers()
356 for rname
, router
in router_list
.items():
369 for daemon
in daemon_list
:
370 router
.run("killall -9 {}".format(daemon
))
373 def check_router_status(tgen
):
375 Check if all daemons are running for all routers in topology
376 * `tgen` : topogen object
379 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
382 router_list
= tgen
.routers()
383 for router
, rnode
in router_list
.items():
385 result
= rnode
.check_router_running()
389 daemons
.append("bgpd")
390 if "zebra" in result
:
391 daemons
.append("zebra")
393 rnode
.startDaemons(daemons
)
395 except Exception as e
:
396 errormsg
= traceback
.format_exc()
397 logger
.error(errormsg
)
400 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
406 Return a StringIO object appropriate for the current python version.
408 if sys
.version_info
[0] > 2:
411 return StringIO
.StringIO()
414 def reset_config_on_routers(tgen
, routerName
=None):
416 Resets configuration on routers to the snapshot created using input JSON
417 file. It replaces existing router configuration with FRRCFG_BKUP_FILE
421 * `tgen` : Topogen object
422 * `routerName` : router config is to be reset
425 logger
.debug("Entering API: reset_config_on_routers")
427 router_list
= tgen
.routers()
428 for rname
in ROUTER_LIST
:
429 if routerName
and routerName
!= rname
:
432 router
= router_list
[rname
]
433 logger
.info("Configuring router %s to initial test configuration", rname
)
435 cfg
= router
.run("vtysh -c 'show running'")
436 fname
= "{}/{}/frr.sav".format(TMPDIR
, rname
)
437 dname
= "{}/{}/delta.conf".format(TMPDIR
, rname
)
439 for line
in cfg
.split("\n"):
443 line
== "Building configuration..."
444 or line
== "Current configuration:"
452 run_cfg_file
= "{}/{}/frr.sav".format(TMPDIR
, rname
)
453 init_cfg_file
= "{}/{}/frr_json_initial.conf".format(TMPDIR
, rname
)
454 command
= "/usr/lib/frr/frr-reload.py --input {} --test {} > {}".format(
455 run_cfg_file
, init_cfg_file
, dname
457 result
= call(command
, shell
=True, stderr
=SUB_STDOUT
, stdout
=SUB_PIPE
)
459 # Assert if command fail
461 logger
.error("Delta file creation failed. Command executed %s", command
)
462 with
open(run_cfg_file
, "r") as fd
:
464 "Running configuration saved in %s is:\n%s", run_cfg_file
, fd
.read()
466 with
open(init_cfg_file
, "r") as fd
:
468 "Test configuration saved in %s is:\n%s", init_cfg_file
, fd
.read()
471 err_cmd
= ["/usr/bin/vtysh", "-m", "-f", run_cfg_file
]
472 result
= Popen(err_cmd
, stdout
=SUB_PIPE
, stderr
=SUB_PIPE
)
473 output
= result
.communicate()
474 for out_data
in output
:
475 temp_data
= out_data
.decode("utf-8").lower()
476 for out_err
in ERROR_LIST
:
477 if out_err
.lower() in temp_data
:
479 "Found errors while validating data in" " %s", run_cfg_file
481 raise InvalidCLIError(out_data
)
482 raise InvalidCLIError("Unknown error in %s", output
)
486 delta
.write("configure terminal\n")
489 # Don't disable debugs
492 for line
in t_delta
.split("\n"):
494 if line
== "Lines To Delete" or line
== "===============" or not line
:
497 if line
== "Lines To Add":
501 if line
== "============" or not line
:
504 # Leave debugs and log output alone
506 if "debug" in line
or "log file" in line
:
516 output
= router
.vtysh_multicmd(delta
.getvalue(), pretty_output
=False)
520 cfg
= router
.run("vtysh -c 'show running'")
521 for line
in cfg
.split("\n"):
526 # Router current configuration to log file or console if
527 # "show_router_config" is defined in "pytest.ini"
528 if show_router_config
:
529 logger
.info("Configuration on router {} after reset:".format(rname
))
530 logger
.info(delta
.getvalue())
533 logger
.debug("Exiting API: reset_config_on_routers")
537 def load_config_to_router(tgen
, routerName
, save_bkup
=False):
539 Loads configuration on router from the file FRRCFG_FILE.
543 * `tgen` : Topogen object
544 * `routerName` : router for which configuration to be loaded
545 * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE
548 logger
.debug("Entering API: load_config_to_router")
550 router_list
= tgen
.routers()
551 for rname
in ROUTER_LIST
:
552 if routerName
and rname
!= routerName
:
555 router
= router_list
[rname
]
557 frr_cfg_file
= "{}/{}/{}".format(TMPDIR
, rname
, FRRCFG_FILE
)
558 frr_cfg_bkup
= "{}/{}/{}".format(TMPDIR
, rname
, FRRCFG_BKUP_FILE
)
559 with
open(frr_cfg_file
, "r+") as cfg
:
562 "Applying following configuration on router"
563 " {}:\n{}".format(rname
, data
)
566 with
open(frr_cfg_bkup
, "w") as bkup
:
569 output
= router
.vtysh_multicmd(data
, pretty_output
=False)
570 for out_err
in ERROR_LIST
:
571 if out_err
.lower() in output
.lower():
572 raise InvalidCLIError("%s" % output
)
576 except IOError as err
:
578 "Unable to open config File. error(%s):" " %s",
579 (err
.errno
, err
.strerror
),
583 # Router current configuration to log file or console if
584 # "show_router_config" is defined in "pytest.ini"
585 if show_router_config
:
586 logger
.info("New configuration for router {}:".format(rname
))
587 new_config
= router
.run("vtysh -c 'show running'")
588 logger
.info(new_config
)
590 logger
.debug("Exiting API: load_config_to_router")
594 def get_frr_ipv6_linklocal(tgen
, router
, intf
=None, vrf
=None):
596 API to get the link local ipv6 address of a perticular interface using
597 FRR command 'show interface'
599 * `tgen`: tgen onject
600 * `router` : router for which hightest interface should be
602 * `intf` : interface for which linklocal address needs to be taken
607 linklocal = get_frr_ipv6_linklocal(tgen, router, "intf1", RED_A)
611 1) array of interface names to link local ips.
614 router_list
= tgen
.routers()
615 for rname
, rnode
in router_list
.items():
622 cmd
= "show interface vrf {}".format(vrf
)
624 cmd
= "show interface"
626 ifaces
= router_list
[router
].run('vtysh -c "{}"'.format(cmd
))
628 # Fix newlines (make them all the same)
629 ifaces
= ("\n".join(ifaces
.splitlines()) + "\n").splitlines()
635 m
= re_search("Interface ([a-zA-Z0-9-]+) is", line
)
637 interface
= m
.group(1).split(" ")[0]
641 m1
= re_search("inet6 (fe80[:a-fA-F0-9]+[/0-9]+)", line
)
645 if ll_per_if_count
> 1:
646 linklocal
+= [["%s-%s" % (interface
, ll_per_if_count
), local
]]
648 linklocal
+= [[interface
, local
]]
652 return [_linklocal
[1] for _linklocal
in linklocal
if _linklocal
[0] == intf
][
657 errormsg
= "Link local ip missing on router {}"
661 def generate_support_bundle():
663 API to generate support bundle on any verification ste failure.
664 it runs a python utility, /usr/lib/frr/generate_support_bundle.py,
665 which basically runs defined CLIs and dumps the data to specified location
669 router_list
= tgen
.routers()
670 test_name
= sys
._getframe
(2).f_code
.co_name
671 TMPDIR
= os
.path
.join(LOGDIR
, tgen
.modname
)
673 for rname
, rnode
in router_list
.items():
674 logger
.info("Generating support bundle for {}".format(rname
))
675 rnode
.run("mkdir -p /var/log/frr")
676 bundle_log
= rnode
.run("python2 /usr/lib/frr/generate_support_bundle.py")
677 logger
.info(bundle_log
)
679 dst_bundle
= "{}/{}/support_bundles/{}".format(TMPDIR
, rname
, test_name
)
680 src_bundle
= "/var/log/frr"
681 rnode
.run("rm -rf {}".format(dst_bundle
))
682 rnode
.run("mkdir -p {}".format(dst_bundle
))
683 rnode
.run("mv -f {}/* {}".format(src_bundle
, dst_bundle
))
688 def start_topology(tgen
, daemon
=None):
690 Starting topology, create tmp files which are loaded to routers
691 to start deamons and then start routers
692 * `tgen` : topogen object
695 global TMPDIR
, ROUTER_LIST
697 tgen
.start_topology()
701 router_list
= tgen
.routers()
702 ROUTER_LIST
= sorted(
703 router_list
.keys(), key
=lambda x
: int(re_search("[0-9]+", x
).group(0))
705 TMPDIR
= os
.path
.join(LOGDIR
, tgen
.modname
)
708 router_list
= tgen
.routers()
709 for rname
in ROUTER_LIST
:
710 router
= router_list
[rname
]
712 # It will help in debugging the failures, will give more details on which
713 # specific kernel version tests are failing
715 linux_ver
= router
.run("uname -a")
716 logger
.info("Logging platform related details: \n %s \n", linux_ver
)
721 # Creating router named dir and empty zebra.conf bgpd.conf files
722 # inside the current directory
723 if os
.path
.isdir("{}".format(rname
)):
724 os
.system("rm -rf {}".format(rname
))
725 os
.mkdir("{}".format(rname
))
726 os
.system("chmod -R go+rw {}".format(rname
))
727 os
.chdir("{}/{}".format(TMPDIR
, rname
))
728 os
.system("touch zebra.conf bgpd.conf")
730 os
.mkdir("{}".format(rname
))
731 os
.system("chmod -R go+rw {}".format(rname
))
732 os
.chdir("{}/{}".format(TMPDIR
, rname
))
733 os
.system("touch zebra.conf bgpd.conf")
735 except IOError as err
:
736 logger
.error("I/O error({0}): {1}".format(err
.errno
, err
.strerror
))
738 # Loading empty zebra.conf file to router, to start the zebra deamon
740 TopoRouter
.RD_ZEBRA
, "{}/{}/zebra.conf".format(TMPDIR
, rname
)
743 # Loading empty bgpd.conf file to router, to start the bgp deamon
744 router
.load_config(TopoRouter
.RD_BGP
, "{}/{}/bgpd.conf".format(TMPDIR
, rname
))
746 if daemon
and "ospfd" in daemon
:
747 # Loading empty ospf.conf file to router, to start the bgp deamon
749 TopoRouter
.RD_OSPF
, "{}/{}/ospfd.conf".format(TMPDIR
, rname
)
752 logger
.info("Starting all routers once topology is created")
756 def stop_router(tgen
, router
):
758 Router"s current config would be saved to /etc/frr/ for each deamon
759 and router and its deamons would be stopped.
761 * `tgen` : topogen object
762 * `router`: Device under test
765 router_list
= tgen
.routers()
767 # Saving router config to /etc/frr, which will be loaded to router
769 router_list
[router
].vtysh_cmd("write memory")
772 router_list
[router
].stop()
775 def start_router(tgen
, router
):
777 Router will started and config would be loaded from /etc/frr/ for each
780 * `tgen` : topogen object
781 * `router`: Device under test
784 logger
.debug("Entering lib API: start_router")
787 router_list
= tgen
.routers()
789 # Router and its deamons would be started and config would
790 # be loaded to router for each deamon from /etc/frr
791 router_list
[router
].start()
793 # Waiting for router to come up
796 except Exception as e
:
797 errormsg
= traceback
.format_exc()
798 logger
.error(errormsg
)
801 logger
.debug("Exiting lib API: start_router()")
805 def number_to_row(routerName
):
807 Returns the number for the router.
808 Calculation based on name a0 = row 0, a1 = row 1, b2 = row 2, z23 = row 23
811 return int(routerName
[1:])
814 def number_to_column(routerName
):
816 Returns the number for the router.
817 Calculation based on name a0 = columnn 0, a1 = column 0, b2= column 1,
820 return ord(routerName
[0]) - 97
823 def topo_daemons(tgen
, topo
):
825 Returns daemon list required for the suite based on topojson.
829 router_list
= tgen
.routers()
830 ROUTER_LIST
= sorted(
831 router_list
.keys(), key
=lambda x
: int(re_search("[0-9]+", x
).group(0))
834 for rtr
in ROUTER_LIST
:
835 if "ospf" in topo
["routers"][rtr
] and "ospfd" not in daemon_list
:
836 daemon_list
.append("ospfd")
841 #############################################
842 # Common APIs, will be used by all protocols
843 #############################################
846 def create_vrf_cfg(tgen
, topo
, input_dict
=None, build
=False):
848 Create vrf configuration for created topology. VRF
849 configuration is provided in input json file.
851 VRF config is done in Linux Kernel:
853 * Attach interface to VRF
858 * `tgen` : Topogen object
859 * `topo` : json file data
860 * `input_dict` : Input dict data, required when configuring
862 * `build` : Only for initial setup phase this is set as True.
869 "r2-link1": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_A"},
870 "r2-link2": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_B"},
871 "r2-link3": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_A"},
872 "r2-link4": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_B"},
895 result = create_vrf_cfg(tgen, topo, input_dict)
903 input_dict
= deepcopy(topo
)
905 input_dict
= deepcopy(input_dict
)
908 for c_router
, c_data
in input_dict
.items():
909 rnode
= tgen
.routers()[c_router
]
911 for vrf
in c_data
["vrfs"]:
913 del_action
= vrf
.setdefault("delete", False)
914 name
= vrf
.setdefault("name", None)
915 table_id
= vrf
.setdefault("id", None)
916 vni
= vrf
.setdefault("vni", None)
917 del_vni
= vrf
.setdefault("no_vni", None)
920 # Kernel cmd- Add VRF and table
921 cmd
= "ip link del {} type vrf table {}".format(
922 vrf
["name"], vrf
["id"]
925 logger
.info("[DUT: %s]: Running kernel cmd [%s]", c_router
, cmd
)
928 # Kernel cmd - Bring down VRF
929 cmd
= "ip link set dev {} down".format(name
)
930 logger
.info("[DUT: %s]: Running kernel cmd [%s]", c_router
, cmd
)
934 if name
and table_id
:
935 # Kernel cmd- Add VRF and table
936 cmd
= "ip link add {} type vrf table {}".format(
940 "[DUT: %s]: Running kernel cmd " "[%s]", c_router
, cmd
944 # Kernel cmd - Bring up VRF
945 cmd
= "ip link set dev {} up".format(name
)
947 "[DUT: %s]: Running kernel " "cmd [%s]", c_router
, cmd
951 if "links" in c_data
:
952 for destRouterLink
, data
in sorted(
953 c_data
["links"].items()
955 # Loopback interfaces
956 if "type" in data
and data
["type"] == "loopback":
957 interface_name
= destRouterLink
959 interface_name
= data
["interface"]
962 vrf_list
= data
["vrf"]
964 if type(vrf_list
) is not list:
965 vrf_list
= [vrf_list
]
967 for _vrf
in vrf_list
:
968 cmd
= "ip link set {} master {}".format(
973 "[DUT: %s]: Running" " kernel cmd [%s]",
980 config_data
.append("vrf {}".format(vrf
["name"]))
981 cmd
= "vni {}".format(vni
)
982 config_data
.append(cmd
)
985 config_data
.append("vrf {}".format(vrf
["name"]))
986 cmd
= "no vni {}".format(del_vni
)
987 config_data
.append(cmd
)
989 result
= create_common_configuration(
990 tgen
, c_router
, config_data
, "vrf", build
=build
993 except InvalidCLIError
:
995 errormsg
= traceback
.format_exc()
996 logger
.error(errormsg
)
1002 def create_interface_in_kernel(
1003 tgen
, dut
, name
, ip_addr
, vrf
=None, netmask
=None, create
=True
1006 Cretae interfaces in kernel for ipv4/ipv6
1007 Config is done in Linux Kernel:
1011 * `tgen` : Topogen object
1012 * `dut` : Device for which interfaces to be added
1013 * `name` : interface name
1014 * `ip_addr` : ip address for interface
1015 * `vrf` : VRF name, to which interface will be associated
1016 * `netmask` : netmask value, default is None
1017 * `create`: Create interface in kernel, if created then no need
1021 rnode
= tgen
.routers()[dut
]
1024 cmd
= "sudo ip link add name {} type dummy".format(name
)
1027 addr_type
= validate_ip_address(ip_addr
)
1028 if addr_type
== "ipv4":
1029 cmd
= "ifconfig {} {} netmask {}".format(name
, ip_addr
, netmask
)
1031 cmd
= "ifconfig {} inet6 add {}/{}".format(name
, ip_addr
, netmask
)
1036 cmd
= "ip link set {} master {}".format(name
, vrf
)
1040 def shutdown_bringup_interface_in_kernel(tgen
, dut
, intf_name
, ifaceaction
=False):
1042 Cretae interfaces in kernel for ipv4/ipv6
1043 Config is done in Linux Kernel:
1047 * `tgen` : Topogen object
1048 * `dut` : Device for which interfaces to be added
1049 * `intf_name` : interface name
1050 * `ifaceaction` : False to shutdown and True to bringup the
1054 rnode
= tgen
.routers()[dut
]
1056 cmd
= "ip link set dev"
1059 cmd
= "{} {} {}".format(cmd
, intf_name
, action
)
1062 cmd
= "{} {} {}".format(cmd
, intf_name
, action
)
1064 logger
.info("[DUT: %s]: Running command: %s", dut
, cmd
)
1068 def validate_ip_address(ip_address
):
1070 Validates the type of ip address
1073 * `ip_address`: IPv4/IPv6 address
1076 Type of address as string
1079 if "/" in ip_address
:
1080 ip_address
= ip_address
.split("/")[0]
1085 socket
.inet_aton(ip_address
)
1086 except socket
.error
as error
:
1087 logger
.debug("Not a valid IPv4 address")
1093 socket
.inet_pton(socket
.AF_INET6
, ip_address
)
1094 except socket
.error
as error
:
1095 logger
.debug("Not a valid IPv6 address")
1100 if not v4
and not v6
:
1102 "InvalidIpAddr", "%s is neither valid IPv4 or IPv6" " address" % ip_address
1106 def check_address_types(addr_type
=None):
1108 Checks environment variable set and compares with the current address type
1111 addr_types_env
= os
.environ
.get("ADDRESS_TYPES")
1112 if not addr_types_env
:
1113 addr_types_env
= "dual"
1115 if addr_types_env
== "dual":
1116 addr_types
= ["ipv4", "ipv6"]
1117 elif addr_types_env
== "ipv4":
1118 addr_types
= ["ipv4"]
1119 elif addr_types_env
== "ipv6":
1120 addr_types
= ["ipv6"]
1122 if addr_type
is None:
1125 if addr_type
not in addr_types
:
1127 "{} not in supported/configured address types {}".format(
1128 addr_type
, addr_types
1136 def generate_ips(network
, no_of_ips
):
1138 Returns list of IPs.
1139 based on start_ip and no_of_ips
1140 * `network` : from here the ip will start generating,
1142 * `no_of_ips` : these many IPs will be generated
1146 if type(network
) is not list:
1149 for start_ipaddr
in network
:
1150 if "/" in start_ipaddr
:
1151 start_ip
= start_ipaddr
.split("/")[0]
1152 mask
= int(start_ipaddr
.split("/")[1])
1154 logger
.debug("start_ipaddr {} must have a / in it".format(start_ipaddr
))
1157 addr_type
= validate_ip_address(start_ip
)
1158 if addr_type
== "ipv4":
1159 start_ip
= ipaddress
.IPv4Address(frr_unicode(start_ip
))
1160 step
= 2 ** (32 - mask
)
1161 if addr_type
== "ipv6":
1162 start_ip
= ipaddress
.IPv6Address(frr_unicode(start_ip
))
1163 step
= 2 ** (128 - mask
)
1167 while count
< no_of_ips
:
1168 ipaddress_list
.append("{}/{}".format(next_ip
, mask
))
1169 if addr_type
== "ipv6":
1170 next_ip
= ipaddress
.IPv6Address(int(next_ip
) + step
)
1175 return ipaddress_list
1178 def find_interface_with_greater_ip(topo
, router
, loopback
=True, interface
=True):
1180 Returns highest interface ip for ipv4/ipv6. If loopback is there then
1181 it will return highest IP from loopback IPs otherwise from physical
1183 * `topo` : json file data
1184 * `router` : router for which hightest interface should be calculated
1187 link_data
= topo
["routers"][router
]["links"]
1189 interfaces_list
= []
1191 for destRouterLink
, data
in sorted(link_data
.items()):
1193 if "type" in data
and data
["type"] == "loopback":
1195 ip_address
= topo
["routers"][router
]["links"][destRouterLink
][
1198 lo_list
.append(ip_address
)
1200 ip_address
= topo
["routers"][router
]["links"][destRouterLink
]["ipv4"].split(
1203 interfaces_list
.append(ip_address
)
1206 return sorted(lo_list
)[-1]
1208 return sorted(interfaces_list
)[-1]
1211 def write_test_header(tc_name
):
1212 """ Display message at beginning of test case"""
1214 logger
.info("*" * (len(tc_name
) + count
))
1215 step("START -> Testcase : %s" % tc_name
, reset
=True)
1216 logger
.info("*" * (len(tc_name
) + count
))
1219 def write_test_footer(tc_name
):
1220 """ Display message at end of test case"""
1222 logger
.info("=" * (len(tc_name
) + count
))
1223 logger
.info("Testcase : %s -> PASSED", tc_name
)
1224 logger
.info("=" * (len(tc_name
) + count
))
1227 def interface_status(tgen
, topo
, input_dict
):
1229 Delete ip route maps from device
1230 * `tgen` : Topogen object
1231 * `topo` : json file data
1232 * `input_dict` : for which router, route map has to be deleted
1237 "interface_list": ['eth1-r1-r2', 'eth2-r1-r3'],
1243 errormsg(str) or True
1245 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
1249 for router
in input_dict
.keys():
1251 interface_list
= input_dict
[router
]["interface_list"]
1252 status
= input_dict
[router
].setdefault("status", "up")
1253 for intf
in interface_list
:
1254 rnode
= tgen
.routers()[router
]
1255 interface_set_status(rnode
, intf
, status
)
1257 # Load config to router
1258 load_config_to_router(tgen
, router
)
1260 except Exception as e
:
1261 # handle any exception
1262 logger
.error("Error %s occured. Arguments %s.", e
.message
, e
.args
)
1265 errormsg
= traceback
.format_exc()
1266 logger
.error(errormsg
)
1269 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
1273 def retry(attempts
=3, wait
=2, return_is_str
=True, initial_wait
=0, return_is_dict
=False):
1275 Retries function execution, if return is an errormsg or exception
1277 * `attempts`: Number of attempts to make
1278 * `wait`: Number of seconds to wait between each attempt
1279 * `return_is_str`: Return val is an errormsg in case of failure
1280 * `initial_wait`: Sleeps for this much seconds before executing function
1286 def func_retry(*args
, **kwargs
):
1287 _wait
= kwargs
.pop("wait", wait
)
1288 _attempts
= kwargs
.pop("attempts", attempts
)
1289 _attempts
= int(_attempts
)
1292 raise ValueError("attempts must be 0 or greater")
1294 if initial_wait
> 0:
1295 logger
.info("Waiting for [%s]s as initial delay", initial_wait
)
1298 _return_is_str
= kwargs
.pop("return_is_str", return_is_str
)
1299 _return_is_dict
= kwargs
.pop("return_is_str", return_is_dict
)
1300 for i
in range(1, _attempts
+ 1):
1302 _expected
= kwargs
.setdefault("expected", True)
1303 if _expected
is False:
1304 expected
= _expected
1305 kwargs
.pop("expected")
1306 ret
= func(*args
, **kwargs
)
1307 logger
.debug("Function returned %s", ret
)
1308 if _return_is_str
and isinstance(ret
, bool) and _expected
:
1311 isinstance(ret
, str) or isinstance(ret
, unicode)
1312 ) and _expected
is False:
1314 if _return_is_dict
and isinstance(ret
, dict):
1317 if _attempts
== i
and expected
:
1318 generate_support_bundle()
1320 except Exception as err
:
1321 if _attempts
== i
and expected
:
1322 generate_support_bundle()
1323 logger
.info("Max number of attempts (%r) reached", _attempts
)
1326 logger
.info("Function returned %s", err
)
1328 logger
.info("Retry [#%r] after sleeping for %ss" % (i
, _wait
))
1331 func_retry
._original
= func
1339 Prints step number for the test case step being executed
1344 def __call__(self
, msg
, reset
):
1349 logger
.info("STEP %s: '%s'", Stepper
.count
, msg
)
1353 def step(msg
, reset
=False):
1355 Call Stepper to print test steps. Need to reset at the beginning of test.
1356 * ` msg` : Step message body.
1357 * `reset` : Reset step count to 1 when set to True.
1363 #############################################
1364 # These APIs, will used by testcase
1365 #############################################
1366 def create_interfaces_cfg(tgen
, topo
, build
=False):
1368 Create interface configuration for created topology. Basic Interface
1369 configuration is provided in input json file.
1373 * `tgen` : Topogen object
1374 * `topo` : json file data
1375 * `build` : Only for initial setup phase this is set as True.
1382 topo
= deepcopy(topo
)
1385 for c_router
, c_data
in topo
.items():
1387 for destRouterLink
, data
in sorted(c_data
["links"].items()):
1388 # Loopback interfaces
1389 if "type" in data
and data
["type"] == "loopback":
1390 interface_name
= destRouterLink
1392 interface_name
= data
["interface"]
1394 # Include vrf if present
1396 interface_data
.append(
1397 "interface {} vrf {}".format(
1398 str(interface_name
), str(data
["vrf"])
1402 interface_data
.append("interface {}".format(str(interface_name
)))
1405 intf_addr
= c_data
["links"][destRouterLink
]["ipv4"]
1407 if "delete" in data
and data
["delete"]:
1408 interface_data
.append("no ip address {}".format(intf_addr
))
1410 interface_data
.append("ip address {}".format(intf_addr
))
1412 intf_addr
= c_data
["links"][destRouterLink
]["ipv6"]
1414 if "delete" in data
and data
["delete"]:
1415 interface_data
.append("no ipv6 address {}".format(intf_addr
))
1417 interface_data
.append("ipv6 address {}".format(intf_addr
))
1419 if "ipv6-link-local" in data
:
1420 intf_addr
= c_data
["links"][destRouterLink
]["ipv6-link-local"]
1422 if "delete" in data
and data
["delete"]:
1423 interface_data
.append("no ipv6 address {}".format(intf_addr
))
1425 interface_data
.append("ipv6 address {}\n".format(intf_addr
))
1428 ospf_data
= data
["ospf"]
1429 if "area" in ospf_data
:
1430 intf_ospf_area
= c_data
["links"][destRouterLink
]["ospf"]["area"]
1431 if "delete" in data
and data
["delete"]:
1432 interface_data
.append("no ip ospf area")
1434 interface_data
.append(
1435 "ip ospf area {}".format(intf_ospf_area
)
1438 if "hello_interval" in ospf_data
:
1439 intf_ospf_hello
= c_data
["links"][destRouterLink
]["ospf"][
1442 if "delete" in data
and data
["delete"]:
1443 interface_data
.append("no ip ospf " " hello-interval")
1445 interface_data
.append(
1446 "ip ospf " " hello-interval {}".format(intf_ospf_hello
)
1449 if "dead_interval" in ospf_data
:
1450 intf_ospf_dead
= c_data
["links"][destRouterLink
]["ospf"][
1453 if "delete" in data
and data
["delete"]:
1454 interface_data
.append("no ip ospf" " dead-interval")
1456 interface_data
.append(
1457 "ip ospf " " dead-interval {}".format(intf_ospf_dead
)
1460 if "network" in ospf_data
:
1461 intf_ospf_nw
= c_data
["links"][destRouterLink
]["ospf"][
1464 if "delete" in data
and data
["delete"]:
1465 interface_data
.append(
1466 "no ip ospf" " network {}".format(intf_ospf_nw
)
1469 interface_data
.append(
1470 "ip ospf" " network {}".format(intf_ospf_nw
)
1473 if "priority" in ospf_data
:
1474 intf_ospf_nw
= c_data
["links"][destRouterLink
]["ospf"][
1478 if "delete" in data
and data
["delete"]:
1479 interface_data
.append("no ip ospf" " priority")
1481 interface_data
.append(
1482 "ip ospf" " priority {}".format(intf_ospf_nw
)
1484 result
= create_common_configuration(
1485 tgen
, c_router
, interface_data
, "interface_config", build
=build
1487 except InvalidCLIError
:
1489 errormsg
= traceback
.format_exc()
1490 logger
.error(errormsg
)
1496 def create_static_routes(tgen
, input_dict
, build
=False):
1498 Create static routes for given router as defined in input_dict
1502 * `tgen` : Topogen object
1503 * `input_dict` : Input dict data, required when configuring from testcase
1504 * `build` : Only for initial setup phase this is set as True.
1508 input_dict should be in the format below:
1509 # static_routes: list of all routes
1510 # network: network address
1511 # no_of_ip: number of next-hop address that will be configured
1512 # admin_distance: admin distance for route/routes.
1513 # next_hop: starting next-hop address
1514 # tag: tag id for static routes
1515 # vrf: VRF name in which static routes needs to be created
1516 # delete: True if config to be removed. Default False.
1523 "network": "100.0.20.1/32",
1525 "admin_distance": 100,
1526 "next_hop": "10.0.0.1",
1537 errormsg(str) or True
1540 logger
.debug("Entering lib API: create_static_routes()")
1541 input_dict
= deepcopy(input_dict
)
1544 for router
in input_dict
.keys():
1545 if "static_routes" not in input_dict
[router
]:
1546 errormsg
= "static_routes not present in input_dict"
1547 logger
.info(errormsg
)
1550 static_routes_list
= []
1552 static_routes
= input_dict
[router
]["static_routes"]
1553 for static_route
in static_routes
:
1554 del_action
= static_route
.setdefault("delete", False)
1555 no_of_ip
= static_route
.setdefault("no_of_ip", 1)
1556 network
= static_route
.setdefault("network", [])
1557 if type(network
) is not list:
1560 admin_distance
= static_route
.setdefault("admin_distance", None)
1561 tag
= static_route
.setdefault("tag", None)
1562 vrf
= static_route
.setdefault("vrf", None)
1563 interface
= static_route
.setdefault("interface", None)
1564 next_hop
= static_route
.setdefault("next_hop", None)
1565 nexthop_vrf
= static_route
.setdefault("nexthop_vrf", None)
1567 ip_list
= generate_ips(network
, no_of_ip
)
1569 addr_type
= validate_ip_address(ip
)
1571 if addr_type
== "ipv4":
1572 cmd
= "ip route {}".format(ip
)
1574 cmd
= "ipv6 route {}".format(ip
)
1577 cmd
= "{} {}".format(cmd
, interface
)
1580 cmd
= "{} {}".format(cmd
, next_hop
)
1583 cmd
= "{} nexthop-vrf {}".format(cmd
, nexthop_vrf
)
1586 cmd
= "{} vrf {}".format(cmd
, vrf
)
1589 cmd
= "{} tag {}".format(cmd
, str(tag
))
1592 cmd
= "{} {}".format(cmd
, admin_distance
)
1595 cmd
= "no {}".format(cmd
)
1597 static_routes_list
.append(cmd
)
1599 result
= create_common_configuration(
1600 tgen
, router
, static_routes_list
, "static_route", build
=build
1603 except InvalidCLIError
:
1605 errormsg
= traceback
.format_exc()
1606 logger
.error(errormsg
)
1609 logger
.debug("Exiting lib API: create_static_routes()")
1613 def create_prefix_lists(tgen
, input_dict
, build
=False):
1615 Create ip prefix lists as per the config provided in input
1619 * `tgen` : Topogen object
1620 * `input_dict` : Input dict data, required when configuring from testcase
1621 * `build` : Only for initial setup phase this is set as True.
1624 # pf_lists_1: name of prefix-list, user defined
1625 # seqid: prefix-list seqid, auto-generated if not given by user
1626 # network: criteria for applying prefix-list
1627 # action: permit/deny
1628 # le: less than or equal number of bits
1629 # ge: greater than or equal number of bits
1655 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
1658 for router
in input_dict
.keys():
1659 if "prefix_lists" not in input_dict
[router
]:
1660 errormsg
= "prefix_lists not present in input_dict"
1661 logger
.debug(errormsg
)
1665 prefix_lists
= input_dict
[router
]["prefix_lists"]
1666 for addr_type
, prefix_data
in prefix_lists
.items():
1667 if not check_address_types(addr_type
):
1670 for prefix_name
, prefix_list
in prefix_data
.items():
1671 for prefix_dict
in prefix_list
:
1672 if "action" not in prefix_dict
or "network" not in prefix_dict
:
1673 errormsg
= "'action' or network' missing in" " input_dict"
1676 network_addr
= prefix_dict
["network"]
1677 action
= prefix_dict
["action"]
1678 le
= prefix_dict
.setdefault("le", None)
1679 ge
= prefix_dict
.setdefault("ge", None)
1680 seqid
= prefix_dict
.setdefault("seqid", None)
1681 del_action
= prefix_dict
.setdefault("delete", False)
1683 seqid
= get_seq_id("prefix_lists", router
, prefix_name
)
1685 set_seq_id("prefix_lists", router
, seqid
, prefix_name
)
1687 if addr_type
== "ipv4":
1692 cmd
= "{} prefix-list {} seq {} {} {}".format(
1693 protocol
, prefix_name
, seqid
, action
, network_addr
1696 cmd
= "{} le {}".format(cmd
, le
)
1698 cmd
= "{} ge {}".format(cmd
, ge
)
1701 cmd
= "no {}".format(cmd
)
1703 config_data
.append(cmd
)
1704 result
= create_common_configuration(
1705 tgen
, router
, config_data
, "prefix_list", build
=build
1708 except InvalidCLIError
:
1710 errormsg
= traceback
.format_exc()
1711 logger
.error(errormsg
)
1714 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
1718 def create_route_maps(tgen
, input_dict
, build
=False):
1720 Create route-map on the devices as per the arguments passed
1723 * `tgen` : Topogen object
1724 * `input_dict` : Input dict data, required when configuring from testcase
1725 * `build` : Only for initial setup phase this is set as True.
1728 # route_maps: key, value pair for route-map name and its attribute
1729 # rmap_match_prefix_list_1: user given name for route-map
1730 # action: PERMIT/DENY
1731 # match: key,value pair for match criteria. prefix_list, community-list,
1732 large-community-list or tag. Only one option at a time.
1733 # prefix_list: name of prefix list
1734 # large-community-list: name of large community list
1735 # community-ist: name of community list
1736 # tag: tag id for static routes
1737 # set: key, value pair for modifying route attributes
1738 # localpref: preference value for the network
1739 # med: metric value advertised for AS
1740 # aspath: set AS path value
1741 # weight: weight for the route
1742 # community: standard community value to be attached
1743 # large_community: large community value to be attached
1744 # community_additive: if set to "additive", adds community/large-community
1745 value to the existing values of the network prefix
1751 "rmap_match_prefix_list_1": [
1756 "prefix_list": "pf_list_1"
1759 "prefix_list": "pf_list_1"
1761 "large-community-list": {
1762 "id": "community_1",
1766 "id": "community_2",
1776 "action": "prepend",
1783 "large_community": {
1784 "num": "1:2:3 4:5;6",
1795 errormsg(str) or True
1799 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
1800 input_dict
= deepcopy(input_dict
)
1802 for router
in input_dict
.keys():
1803 if "route_maps" not in input_dict
[router
]:
1804 logger
.debug("route_maps not present in input_dict")
1807 for rmap_name
, rmap_value
in input_dict
[router
]["route_maps"].items():
1809 for rmap_dict
in rmap_value
:
1810 del_action
= rmap_dict
.setdefault("delete", False)
1813 rmap_data
.append("no route-map {}".format(rmap_name
))
1816 if "action" not in rmap_dict
:
1817 errormsg
= "action not present in input_dict"
1818 logger
.error(errormsg
)
1821 rmap_action
= rmap_dict
.setdefault("action", "deny")
1823 seq_id
= rmap_dict
.setdefault("seq_id", None)
1825 seq_id
= get_seq_id("route_maps", router
, rmap_name
)
1827 set_seq_id("route_maps", router
, seq_id
, rmap_name
)
1830 "route-map {} {} {}".format(rmap_name
, rmap_action
, seq_id
)
1833 if "continue" in rmap_dict
:
1834 continue_to
= rmap_dict
["continue"]
1836 rmap_data
.append("on-match goto {}".format(continue_to
))
1839 "In continue, 'route-map entry "
1840 "sequence number' is not provided"
1844 if "goto" in rmap_dict
:
1845 go_to
= rmap_dict
["goto"]
1847 rmap_data
.append("on-match goto {}".format(go_to
))
1850 "In goto, 'Goto Clause number' is not" " provided"
1854 if "call" in rmap_dict
:
1855 call_rmap
= rmap_dict
["call"]
1857 rmap_data
.append("call {}".format(call_rmap
))
1860 "In call, 'destination Route-Map' is" " not provided"
1864 # Verifying if SET criteria is defined
1865 if "set" in rmap_dict
:
1866 set_data
= rmap_dict
["set"]
1867 ipv4_data
= set_data
.setdefault("ipv4", {})
1868 ipv6_data
= set_data
.setdefault("ipv6", {})
1869 local_preference
= set_data
.setdefault("locPrf", None)
1870 metric
= set_data
.setdefault("metric", None)
1871 as_path
= set_data
.setdefault("path", {})
1872 weight
= set_data
.setdefault("weight", None)
1873 community
= set_data
.setdefault("community", {})
1874 large_community
= set_data
.setdefault("large_community", {})
1875 large_comm_list
= set_data
.setdefault("large_comm_list", {})
1876 set_action
= set_data
.setdefault("set_action", None)
1877 nexthop
= set_data
.setdefault("nexthop", None)
1878 origin
= set_data
.setdefault("origin", None)
1879 ext_comm_list
= set_data
.setdefault("extcommunity", {})
1882 if local_preference
:
1884 "set local-preference {}".format(local_preference
)
1889 rmap_data
.append("set metric {} \n".format(metric
))
1893 rmap_data
.append("set origin {} \n".format(origin
))
1897 as_num
= as_path
.setdefault("as_num", None)
1898 as_action
= as_path
.setdefault("as_action", None)
1899 if as_action
and as_num
:
1901 "set as-path {} {}".format(as_action
, as_num
)
1906 num
= community
.setdefault("num", None)
1907 comm_action
= community
.setdefault("action", None)
1909 cmd
= "set community {}".format(num
)
1911 cmd
= "{} {}".format(cmd
, comm_action
)
1912 rmap_data
.append(cmd
)
1914 logger
.error("In community, AS Num not" " provided")
1918 num
= large_community
.setdefault("num", None)
1919 comm_action
= large_community
.setdefault("action", None)
1921 cmd
= "set large-community {}".format(num
)
1923 cmd
= "{} {}".format(cmd
, comm_action
)
1925 rmap_data
.append(cmd
)
1928 "In large_community, AS Num not" " provided"
1932 id = large_comm_list
.setdefault("id", None)
1933 del_comm
= large_comm_list
.setdefault("delete", None)
1935 cmd
= "set large-comm-list {}".format(id)
1937 cmd
= "{} delete".format(cmd
)
1939 rmap_data
.append(cmd
)
1941 logger
.error("In large_comm_list 'id' not" " provided")
1945 rt
= ext_comm_list
.setdefault("rt", None)
1946 del_comm
= ext_comm_list
.setdefault("delete", None)
1948 cmd
= "set extcommunity rt {}".format(rt
)
1950 cmd
= "{} delete".format(cmd
)
1952 rmap_data
.append(cmd
)
1954 logger
.debug("In ext_comm_list 'rt' not" " provided")
1959 rmap_data
.append("set weight {}".format(weight
))
1961 nexthop
= ipv6_data
.setdefault("nexthop", None)
1963 rmap_data
.append("set ipv6 next-hop {}".format(nexthop
))
1965 # Adding MATCH and SET sequence to RMAP if defined
1966 if "match" in rmap_dict
:
1967 match_data
= rmap_dict
["match"]
1968 ipv4_data
= match_data
.setdefault("ipv4", {})
1969 ipv6_data
= match_data
.setdefault("ipv6", {})
1970 community
= match_data
.setdefault("community_list", {})
1971 large_community
= match_data
.setdefault("large_community", {})
1972 large_community_list
= match_data
.setdefault(
1973 "large_community_list", {}
1976 metric
= match_data
.setdefault("metric", None)
1977 source_vrf
= match_data
.setdefault("source-vrf", None)
1980 # fetch prefix list data from rmap
1981 prefix_name
= ipv4_data
.setdefault("prefix_lists", None)
1985 " prefix-list {}".format(prefix_name
)
1988 # fetch tag data from rmap
1989 tag
= ipv4_data
.setdefault("tag", None)
1991 rmap_data
.append("match tag {}".format(tag
))
1993 # fetch large community data from rmap
1994 large_community_list
= ipv4_data
.setdefault(
1995 "large_community_list", {}
1997 large_community
= match_data
.setdefault(
1998 "large_community", {}
2002 prefix_name
= ipv6_data
.setdefault("prefix_lists", None)
2005 "match ipv6 address"
2006 " prefix-list {}".format(prefix_name
)
2009 # fetch tag data from rmap
2010 tag
= ipv6_data
.setdefault("tag", None)
2012 rmap_data
.append("match tag {}".format(tag
))
2014 # fetch large community data from rmap
2015 large_community_list
= ipv6_data
.setdefault(
2016 "large_community_list", {}
2018 large_community
= match_data
.setdefault(
2019 "large_community", {}
2023 if "id" not in community
:
2025 "'id' is mandatory for "
2026 "community-list in match"
2030 cmd
= "match community {}".format(community
["id"])
2031 exact_match
= community
.setdefault("exact_match", False)
2033 cmd
= "{} exact-match".format(cmd
)
2035 rmap_data
.append(cmd
)
2037 if "id" not in large_community
:
2039 "'id' is mandatory for "
2040 "large-community-list in match "
2044 cmd
= "match large-community {}".format(
2045 large_community
["id"]
2047 exact_match
= large_community
.setdefault(
2048 "exact_match", False
2051 cmd
= "{} exact-match".format(cmd
)
2052 rmap_data
.append(cmd
)
2053 if large_community_list
:
2054 if "id" not in large_community_list
:
2056 "'id' is mandatory for "
2057 "large-community-list in match "
2061 cmd
= "match large-community {}".format(
2062 large_community_list
["id"]
2064 exact_match
= large_community_list
.setdefault(
2065 "exact_match", False
2068 cmd
= "{} exact-match".format(cmd
)
2069 rmap_data
.append(cmd
)
2072 cmd
= "match source-vrf {}".format(source_vrf
)
2073 rmap_data
.append(cmd
)
2076 cmd
= "match metric {}".format(metric
)
2077 rmap_data
.append(cmd
)
2079 result
= create_common_configuration(
2080 tgen
, router
, rmap_data
, "route_maps", build
=build
2083 except InvalidCLIError
:
2085 errormsg
= traceback
.format_exc()
2086 logger
.error(errormsg
)
2089 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2093 def delete_route_maps(tgen
, input_dict
):
2095 Delete ip route maps from device
2096 * `tgen` : Topogen object
2097 * `input_dict` : for which router,
2098 route map has to be deleted
2101 # Delete route-map rmap_1 and rmap_2 from router r1
2104 "route_maps": ["rmap_1", "rmap__2"]
2107 result = delete_route_maps("ipv4", input_dict)
2110 errormsg(str) or True
2112 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2114 for router
in input_dict
.keys():
2115 route_maps
= input_dict
[router
]["route_maps"][:]
2116 rmap_data
= input_dict
[router
]
2117 rmap_data
["route_maps"] = {}
2118 for route_map_name
in route_maps
:
2119 rmap_data
["route_maps"].update({route_map_name
: [{"delete": True}]})
2121 return create_route_maps(tgen
, input_dict
)
2124 def create_bgp_community_lists(tgen
, input_dict
, build
=False):
2126 Create bgp community-list or large-community-list on the devices as per
2127 the arguments passed. Takes list of communities in input.
2130 * `tgen` : Topogen object
2131 * `input_dict` : Input dict data, required when configuring from testcase
2132 * `build` : Only for initial setup phase this is set as True.
2137 "bgp_community_lists": [
2139 "community_type": "standard",
2141 "name": "rmap_lcomm_{}".format(addr_type),
2142 "value": "1:1:1 1:2:3 2:1:1 2:2:2",
2149 result = create_bgp_community_lists(tgen, input_dict_1)
2153 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2154 input_dict
= deepcopy(input_dict
)
2156 for router
in input_dict
.keys():
2157 if "bgp_community_lists" not in input_dict
[router
]:
2158 errormsg
= "bgp_community_lists not present in input_dict"
2159 logger
.debug(errormsg
)
2164 community_list
= input_dict
[router
]["bgp_community_lists"]
2165 for community_dict
in community_list
:
2166 del_action
= community_dict
.setdefault("delete", False)
2167 community_type
= community_dict
.setdefault("community_type", None)
2168 action
= community_dict
.setdefault("action", None)
2169 value
= community_dict
.setdefault("value", "")
2170 large
= community_dict
.setdefault("large", None)
2171 name
= community_dict
.setdefault("name", None)
2173 cmd
= "bgp large-community-list"
2175 cmd
= "bgp community-list"
2177 if not large
and not (community_type
and action
and value
):
2179 "community_type, action and value are "
2180 "required in bgp_community_list"
2182 logger
.error(errormsg
)
2186 community_type
= int(community_type
)
2187 cmd
= "{} {} {} {}".format(cmd
, community_type
, action
, value
)
2190 cmd
= "{} {} {} {} {}".format(
2191 cmd
, community_type
, name
, action
, value
2195 cmd
= "no {}".format(cmd
)
2197 config_data
.append(cmd
)
2199 result
= create_common_configuration(
2200 tgen
, router
, config_data
, "bgp_community_list", build
=build
2203 except InvalidCLIError
:
2205 errormsg
= traceback
.format_exc()
2206 logger
.error(errormsg
)
2209 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2213 def shutdown_bringup_interface(tgen
, dut
, intf_name
, ifaceaction
=False):
2215 Shutdown or bringup router's interface "
2216 * `tgen` : Topogen object
2217 * `dut` : Device under test
2218 * `intf_name` : Interface name to be shut/no shut
2219 * `ifaceaction` : Action, to shut/no shut interface,
2225 # Shut down ineterface
2226 shutdown_bringup_interface(tgen, dut, intf, False)
2227 # Bring up ineterface
2228 shutdown_bringup_interface(tgen, dut, intf, True)
2231 errormsg(str) or True
2234 router_list
= tgen
.routers()
2236 logger
.info("Bringing up interface : {}".format(intf_name
))
2238 logger
.info("Shutting down interface : {}".format(intf_name
))
2240 interface_set_status(router_list
[dut
], intf_name
, ifaceaction
)
2244 tgen
, router
, intf
, group_addr_range
, next_hop
=None, src
=None, del_action
=None
2251 * `tgen` : Topogen object
2252 * `router`: router for which kernal routes needs to be added
2253 * `intf`: interface name, for which kernal routes needs to be added
2254 * `bindToAddress`: bind to <host>, an interface or multicast
2262 logger
.debug("Entering lib API: addKernelRoute()")
2264 rnode
= tgen
.routers()[router
]
2266 if type(group_addr_range
) is not list:
2267 group_addr_range
= [group_addr_range
]
2269 for grp_addr
in group_addr_range
:
2271 addr_type
= validate_ip_address(grp_addr
)
2272 if addr_type
== "ipv4":
2273 if next_hop
is not None:
2274 cmd
= "ip route add {} via {}".format(grp_addr
, next_hop
)
2276 cmd
= "ip route add {} dev {}".format(grp_addr
, intf
)
2278 cmd
= "ip route del {}".format(grp_addr
)
2279 verify_cmd
= "ip route"
2280 elif addr_type
== "ipv6":
2282 cmd
= "ip -6 route add {} dev {} src {}".format(grp_addr
, intf
, src
)
2284 cmd
= "ip -6 route add {} via {}".format(grp_addr
, next_hop
)
2285 verify_cmd
= "ip -6 route"
2287 cmd
= "ip -6 route del {}".format(grp_addr
)
2289 logger
.info("[DUT: {}]: Running command: [{}]".format(router
, cmd
))
2290 output
= rnode
.run(cmd
)
2292 # Verifying if ip route added to kernal
2293 result
= rnode
.run(verify_cmd
)
2294 logger
.debug("{}\n{}".format(verify_cmd
, result
))
2296 ip
, mask
= grp_addr
.split("/")
2297 if mask
== "32" or mask
== "128":
2300 if not re_search(r
"{}".format(grp_addr
), result
) and mask
!= "0":
2302 "[DUT: {}]: Kernal route is not added for group"
2303 " address {} Config output: {}".format(router
, grp_addr
, output
)
2308 logger
.debug("Exiting lib API: addKernelRoute()")
2312 def configure_vxlan(tgen
, input_dict
):
2314 Add and configure vxlan
2316 * `tgen`: tgen onject
2317 * `input_dict` : data for vxlan config
2324 "vxlan_name": "vxlan75100",
2325 "vxlan_id": "75100",
2327 "local_addr": "120.0.0.1",
2334 configure_vxlan(tgen, input_dict)
2342 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2344 router_list
= tgen
.routers()
2345 for dut
in input_dict
.keys():
2346 rnode
= tgen
.routers()[dut
]
2348 if "vxlan" in input_dict
[dut
]:
2349 for vxlan_dict
in input_dict
[dut
]["vxlan"]:
2352 del_vxlan
= vxlan_dict
.setdefault("delete", None)
2353 vxlan_names
= vxlan_dict
.setdefault("vxlan_name", [])
2354 vxlan_ids
= vxlan_dict
.setdefault("vxlan_id", [])
2355 dstport
= vxlan_dict
.setdefault("dstport", None)
2356 local_addr
= vxlan_dict
.setdefault("local_addr", None)
2357 learning
= vxlan_dict
.setdefault("learning", None)
2360 if vxlan_names
and vxlan_ids
:
2361 for vxlan_name
, vxlan_id
in zip(vxlan_names
, vxlan_ids
):
2365 cmd
= "{} del {} type vxlan id {}".format(
2366 cmd
, vxlan_name
, vxlan_id
2369 cmd
= "{} add {} type vxlan id {}".format(
2370 cmd
, vxlan_name
, vxlan_id
2374 cmd
= "{} dstport {}".format(cmd
, dstport
)
2377 ip_cmd
= "ip addr add {} dev {}".format(
2378 local_addr
, vxlan_name
2381 ip_cmd
= "ip addr del {} dev {}".format(
2382 local_addr
, vxlan_name
2385 config_data
.append(ip_cmd
)
2387 cmd
= "{} local {}".format(cmd
, local_addr
)
2389 if learning
== "no":
2390 cmd
= "{} nolearning".format(cmd
)
2392 elif learning
== "yes":
2393 cmd
= "{} learning".format(cmd
)
2395 config_data
.append(cmd
)
2398 for _cmd
in config_data
:
2399 logger
.info("[DUT: %s]: Running command: %s", dut
, _cmd
)
2402 except InvalidCLIError
:
2404 errormsg
= traceback
.format_exc()
2405 logger
.error(errormsg
)
2408 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2413 def configure_brctl(tgen
, topo
, input_dict
):
2415 Add and configure brctl
2417 * `tgen`: tgen onject
2418 * `input_dict` : data for brctl config
2425 "brctl_name": "br100",
2426 "addvxlan": "vxlan75100",
2433 configure_brctl(tgen, topo, input_dict)
2441 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2443 router_list
= tgen
.routers()
2444 for dut
in input_dict
.keys():
2445 rnode
= tgen
.routers()[dut
]
2447 if "brctl" in input_dict
[dut
]:
2448 for brctl_dict
in input_dict
[dut
]["brctl"]:
2450 brctl_names
= brctl_dict
.setdefault("brctl_name", [])
2451 addvxlans
= brctl_dict
.setdefault("addvxlan", [])
2452 stp_values
= brctl_dict
.setdefault("stp", [])
2453 vrfs
= brctl_dict
.setdefault("vrf", [])
2455 ip_cmd
= "ip link set"
2456 for brctl_name
, vxlan
, vrf
, stp
in zip(
2457 brctl_names
, addvxlans
, vrfs
, stp_values
2461 cmd
= "ip link add name {} type bridge stp_state {}".format(
2465 logger
.info("[DUT: %s]: Running command: %s", dut
, cmd
)
2468 ip_cmd_list
.append("{} up dev {}".format(ip_cmd
, brctl_name
))
2471 cmd
= "{} dev {} master {}".format(ip_cmd
, vxlan
, brctl_name
)
2473 logger
.info("[DUT: %s]: Running command: %s", dut
, cmd
)
2476 ip_cmd_list
.append("{} up dev {}".format(ip_cmd
, vxlan
))
2480 "{} dev {} master {}".format(ip_cmd
, brctl_name
, vrf
)
2483 for intf_name
, data
in topo
["routers"][dut
]["links"].items():
2484 if "vrf" not in data
:
2487 if data
["vrf"] == vrf
:
2489 "{} up dev {}".format(ip_cmd
, data
["interface"])
2493 for _ip_cmd
in ip_cmd_list
:
2494 logger
.info("[DUT: %s]: Running command: %s", dut
, _ip_cmd
)
2497 except InvalidCLIError
:
2499 errormsg
= traceback
.format_exc()
2500 logger
.error(errormsg
)
2503 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2507 def configure_interface_mac(tgen
, input_dict
):
2509 Add and configure brctl
2511 * `tgen`: tgen onject
2512 * `input_dict` : data for mac config
2516 "br75100": "00:80:48:BA:d1:00,
2517 "br75200": "00:80:48:BA:d1:00
2521 configure_interface_mac(tgen, input_mac)
2529 router_list
= tgen
.routers()
2530 for dut
in input_dict
.keys():
2531 rnode
= tgen
.routers()[dut
]
2533 for intf
, mac
in input_dict
[dut
].items():
2534 cmd
= "ifconfig {} hw ether {}".format(intf
, mac
)
2535 logger
.info("[DUT: %s]: Running command: %s", dut
, cmd
)
2538 result
= rnode
.run(cmd
)
2539 if len(result
) != 0:
2542 except InvalidCLIError
:
2544 errormsg
= traceback
.format_exc()
2545 logger
.error(errormsg
)
2551 #############################################
2553 #############################################
2554 @retry(attempts
=6, wait
=2, return_is_str
=True)
2568 Data will be read from input_dict or input JSON file, API will generate
2569 same prefixes, which were redistributed by either create_static_routes() or
2570 advertise_networks_using_network_command() and do will verify next_hop and
2571 each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json"
2576 * `tgen` : topogen object
2577 * `addr_type` : ip type, ipv4/ipv6
2578 * `dut`: Device Under Test, for which user wants to test the data
2579 * `input_dict` : input dict, has details of static routes
2580 * `next_hop`[optional]: next_hop which needs to be verified,
2582 * `protocol`[optional]: protocol, default = None
2583 * `count_only`[optional]: count of nexthops only, not specific addresses,
2588 # RIB can be verified for static routes OR network advertised using
2589 network command. Following are input_dicts to create static routes
2590 and advertise networks using network command. Any one of the input_dict
2591 can be passed to verify_rib() to verify routes in DUT"s RIB.
2593 # Creating static routes for r1
2596 "static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9, \
2597 "admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}]
2599 # Advertising networks using network command in router r1
2602 "advertise_networks": [{"start_ip": "20.0.0.0/32",
2603 "no_of_network": 10},
2604 {"start_ip": "30.0.0.0/32"}]
2606 # Verifying ipv4 routes in router r1 learned via BGP
2609 result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol)
2613 errormsg(str) or True
2616 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2618 router_list
= tgen
.routers()
2619 additional_nexthops_in_required_nhs
= []
2621 for routerInput
in input_dict
.keys():
2622 for router
, rnode
in router_list
.items():
2626 logger
.info("Checking router %s RIB:", router
)
2628 # Verifying RIB routes
2629 if addr_type
== "ipv4":
2630 command
= "show ip route"
2632 command
= "show ipv6 route"
2637 if "static_routes" in input_dict
[routerInput
]:
2638 static_routes
= input_dict
[routerInput
]["static_routes"]
2640 for static_route
in static_routes
:
2641 if "vrf" in static_route
and static_route
["vrf"] is not None:
2644 "[DUT: {}]: Verifying routes for VRF:"
2645 " {}".format(router
, static_route
["vrf"])
2648 cmd
= "{} vrf {}".format(command
, static_route
["vrf"])
2651 cmd
= "{}".format(command
)
2654 cmd
= "{} {}".format(cmd
, protocol
)
2656 cmd
= "{} json".format(cmd
)
2658 rib_routes_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
2660 # Verifying output dictionary rib_routes_json is not empty
2661 if bool(rib_routes_json
) is False:
2662 errormsg
= "No route found in rib of router {}..".format(router
)
2665 network
= static_route
["network"]
2666 if "no_of_ip" in static_route
:
2667 no_of_ip
= static_route
["no_of_ip"]
2671 if "tag" in static_route
:
2672 _tag
= static_route
["tag"]
2676 # Generating IPs for verification
2677 ip_list
= generate_ips(network
, no_of_ip
)
2681 for st_rt
in ip_list
:
2682 st_rt
= str(ipaddress
.ip_network(frr_unicode(st_rt
)))
2684 _addr_type
= validate_ip_address(st_rt
)
2685 if _addr_type
!= addr_type
:
2688 if st_rt
in rib_routes_json
:
2690 found_routes
.append(st_rt
)
2692 if fib
and next_hop
:
2693 if type(next_hop
) is not list:
2694 next_hop
= [next_hop
]
2696 for mnh
in range(0, len(rib_routes_json
[st_rt
])):
2699 in rib_routes_json
[st_rt
][mnh
]["nexthops"][0]
2704 for rib_r
in rib_routes_json
[st_rt
][
2711 missing_list_of_nexthops
= set(
2713 ).difference(next_hop
)
2714 additional_nexthops_in_required_nhs
= set(
2716 ).difference(found_hops
[0])
2718 if additional_nexthops_in_required_nhs
:
2721 "%s is not active for route %s in "
2722 "RIB of router %s\n",
2723 additional_nexthops_in_required_nhs
,
2728 "Nexthop {} is not active"
2729 " for route {} in RIB of router"
2731 additional_nexthops_in_required_nhs
,
2740 elif next_hop
and fib
is None:
2741 if type(next_hop
) is not list:
2742 next_hop
= [next_hop
]
2745 for rib_r
in rib_routes_json
[st_rt
][0]["nexthops"]
2748 # Check only the count of nexthops
2750 if len(next_hop
) == len(found_hops
):
2754 "Nexthops are missing for "
2755 "route {} in RIB of router {}: "
2756 "expected {}, found {}\n".format(
2765 # Check the actual nexthops
2767 missing_list_of_nexthops
= set(
2769 ).difference(next_hop
)
2770 additional_nexthops_in_required_nhs
= set(
2772 ).difference(found_hops
)
2774 if additional_nexthops_in_required_nhs
:
2776 "Missing nexthop %s for route"
2777 " %s in RIB of router %s\n",
2778 additional_nexthops_in_required_nhs
,
2783 "Nexthop {} is Missing for "
2784 "route {} in RIB of router {}\n".format(
2785 additional_nexthops_in_required_nhs
,
2795 if "tag" not in rib_routes_json
[st_rt
][0]:
2797 "[DUT: {}]: tag is not"
2799 " route {} in RIB \n".format(dut
, st_rt
)
2803 if _tag
!= rib_routes_json
[st_rt
][0]["tag"]:
2805 "[DUT: {}]: tag value {}"
2806 " is not matched for"
2807 " route {} in RIB \n".format(
2815 if metric
is not None:
2816 if "metric" not in rib_routes_json
[st_rt
][0]:
2818 "[DUT: {}]: metric is"
2820 " route {} in RIB \n".format(dut
, st_rt
)
2824 if metric
!= rib_routes_json
[st_rt
][0]["metric"]:
2826 "[DUT: {}]: metric value "
2827 "{} is not matched for "
2828 "route {} in RIB \n".format(
2837 missing_routes
.append(st_rt
)
2841 "[DUT: {}]: Found next_hop {} for all bgp"
2842 " routes in RIB".format(router
, next_hop
)
2845 if len(missing_routes
) > 0:
2846 errormsg
= "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
2853 "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
2860 if "bgp" in input_dict
[routerInput
]:
2862 "advertise_networks"
2863 not in input_dict
[routerInput
]["bgp"]["address_family"][addr_type
][
2871 advertise_network
= input_dict
[routerInput
]["bgp"]["address_family"][
2873 ]["unicast"]["advertise_networks"]
2875 # Continue if there are no network advertise
2876 if len(advertise_network
) == 0:
2879 for advertise_network_dict
in advertise_network
:
2880 if "vrf" in advertise_network_dict
:
2881 cmd
= "{} vrf {} json".format(
2882 command
, advertise_network_dict
["vrf"]
2885 cmd
= "{} json".format(command
)
2887 rib_routes_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
2889 # Verifying output dictionary rib_routes_json is not empty
2890 if bool(rib_routes_json
) is False:
2891 errormsg
= "No route found in rib of router {}..".format(router
)
2894 start_ip
= advertise_network_dict
["network"]
2895 if "no_of_network" in advertise_network_dict
:
2896 no_of_network
= advertise_network_dict
["no_of_network"]
2900 # Generating IPs for verification
2901 ip_list
= generate_ips(start_ip
, no_of_network
)
2905 for st_rt
in ip_list
:
2906 st_rt
= str(ipaddress
.ip_network(frr_unicode(st_rt
)))
2908 _addr_type
= validate_ip_address(st_rt
)
2909 if _addr_type
!= addr_type
:
2912 if st_rt
in rib_routes_json
:
2914 found_routes
.append(st_rt
)
2917 if type(next_hop
) is not list:
2918 next_hop
= [next_hop
]
2922 for nh_dict
in rib_routes_json
[st_rt
][0]["nexthops"]:
2923 if nh_dict
["ip"] != nh
:
2928 if count
== len(next_hop
):
2932 "Nexthop {} is Missing"
2934 "RIB of router {}\n".format(next_hop
, st_rt
, dut
)
2938 missing_routes
.append(st_rt
)
2942 "Found next_hop {} for all routes in RIB"
2943 " of router {}\n".format(next_hop
, dut
)
2946 if len(missing_routes
) > 0:
2948 "Missing {} route in RIB of router {}, "
2949 "routes: {} \n".format(addr_type
, dut
, missing_routes
)
2955 "Verified {} routes in router {} RIB, found"
2956 " routes are: {}\n".format(addr_type
, dut
, found_routes
)
2959 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2963 @retry(attempts
=6, wait
=2, return_is_str
=True)
2964 def verify_fib_routes(tgen
, addr_type
, dut
, input_dict
, next_hop
=None):
2966 Data will be read from input_dict or input JSON file, API will generate
2967 same prefixes, which were redistributed by either create_static_routes() or
2968 advertise_networks_using_network_command() and will verify next_hop and
2969 each prefix/routes is present in "show ip/ipv6 fib json"
2974 * `tgen` : topogen object
2975 * `addr_type` : ip type, ipv4/ipv6
2976 * `dut`: Device Under Test, for which user wants to test the data
2977 * `input_dict` : input dict, has details of static routes
2978 * `next_hop`[optional]: next_hop which needs to be verified,
2986 "network": ["1.1.1.1/32],
2987 "next_hop": "Null0",
2992 result = result = verify_fib_routes(tgen, "ipv4, "r1", input_routes_r1)
2996 errormsg(str) or True
2999 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3001 router_list
= tgen
.routers()
3002 for routerInput
in input_dict
.keys():
3003 for router
, rnode
in router_list
.items():
3007 logger
.info("Checking router %s FIB routes:", router
)
3009 # Verifying RIB routes
3010 if addr_type
== "ipv4":
3011 command
= "show ip fib"
3013 command
= "show ipv6 fib"
3018 if "static_routes" in input_dict
[routerInput
]:
3019 static_routes
= input_dict
[routerInput
]["static_routes"]
3021 for static_route
in static_routes
:
3022 if "vrf" in static_route
and static_route
["vrf"] is not None:
3025 "[DUT: {}]: Verifying routes for VRF:"
3026 " {}".format(router
, static_route
["vrf"])
3029 cmd
= "{} vrf {}".format(command
, static_route
["vrf"])
3032 cmd
= "{}".format(command
)
3034 cmd
= "{} json".format(cmd
)
3036 rib_routes_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
3038 # Verifying output dictionary rib_routes_json is not empty
3039 if bool(rib_routes_json
) is False:
3040 errormsg
= "[DUT: {}]: No route found in fib".format(router
)
3043 network
= static_route
["network"]
3044 if "no_of_ip" in static_route
:
3045 no_of_ip
= static_route
["no_of_ip"]
3049 # Generating IPs for verification
3050 ip_list
= generate_ips(network
, no_of_ip
)
3054 for st_rt
in ip_list
:
3055 st_rt
= str(ipaddress
.ip_network(frr_unicode(st_rt
)))
3057 _addr_type
= validate_ip_address(st_rt
)
3058 if _addr_type
!= addr_type
:
3061 if st_rt
in rib_routes_json
:
3063 found_routes
.append(st_rt
)
3066 if type(next_hop
) is not list:
3067 next_hop
= [next_hop
]
3071 for nh_dict
in rib_routes_json
[st_rt
][0][
3074 if nh_dict
["ip"] != nh
:
3079 if count
== len(next_hop
):
3082 missing_routes
.append(st_rt
)
3084 "Nexthop {} is Missing"
3086 "RIB of router {}\n".format(
3087 next_hop
, st_rt
, dut
3093 missing_routes
.append(st_rt
)
3095 if len(missing_routes
) > 0:
3096 errormsg
= "[DUT: {}]: Missing route in FIB:" " {}".format(
3103 "Found next_hop {} for all routes in RIB"
3104 " of router {}\n".format(next_hop
, dut
)
3109 "[DUT: %s]: Verified routes in FIB, found" " routes are: %s\n",
3116 if "bgp" in input_dict
[routerInput
]:
3118 "advertise_networks"
3119 not in input_dict
[routerInput
]["bgp"]["address_family"][addr_type
][
3127 advertise_network
= input_dict
[routerInput
]["bgp"]["address_family"][
3129 ]["unicast"]["advertise_networks"]
3131 # Continue if there are no network advertise
3132 if len(advertise_network
) == 0:
3135 for advertise_network_dict
in advertise_network
:
3136 if "vrf" in advertise_network_dict
:
3137 cmd
= "{} vrf {} json".format(command
, static_route
["vrf"])
3139 cmd
= "{} json".format(command
)
3141 rib_routes_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
3143 # Verifying output dictionary rib_routes_json is not empty
3144 if bool(rib_routes_json
) is False:
3145 errormsg
= "No route found in rib of router {}..".format(router
)
3148 start_ip
= advertise_network_dict
["network"]
3149 if "no_of_network" in advertise_network_dict
:
3150 no_of_network
= advertise_network_dict
["no_of_network"]
3154 # Generating IPs for verification
3155 ip_list
= generate_ips(start_ip
, no_of_network
)
3159 for st_rt
in ip_list
:
3160 st_rt
= str(ipaddress
.ip_network(frr_unicode(st_rt
)))
3162 _addr_type
= validate_ip_address(st_rt
)
3163 if _addr_type
!= addr_type
:
3166 if st_rt
in rib_routes_json
:
3168 found_routes
.append(st_rt
)
3171 if type(next_hop
) is not list:
3172 next_hop
= [next_hop
]
3176 for nh_dict
in rib_routes_json
[st_rt
][0]["nexthops"]:
3177 if nh_dict
["ip"] != nh
:
3182 if count
== len(next_hop
):
3185 missing_routes
.append(st_rt
)
3187 "Nexthop {} is Missing"
3189 "RIB of router {}\n".format(next_hop
, st_rt
, dut
)
3193 missing_routes
.append(st_rt
)
3195 if len(missing_routes
) > 0:
3196 errormsg
= "[DUT: {}]: Missing route in FIB: " "{} \n".format(
3203 "Found next_hop {} for all routes in RIB"
3204 " of router {}\n".format(next_hop
, dut
)
3209 "[DUT: {}]: Verified routes FIB"
3210 ", found routes are: {}\n".format(dut
, found_routes
)
3213 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3217 def verify_admin_distance_for_static_routes(tgen
, input_dict
):
3219 API to verify admin distance for static routes as defined in input_dict/
3220 input JSON by running show ip/ipv6 route json command.
3223 * `tgen` : topogen object
3224 * `input_dict`: having details like - for which router and static routes
3225 admin dsitance needs to be verified
3228 # To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop
3229 10.0.0.2 in router r1
3233 "network": "10.0.20.1/32",
3234 "admin_distance": 10,
3235 "next_hop": "10.0.0.2"
3239 result = verify_admin_distance_for_static_routes(tgen, input_dict)
3242 errormsg(str) or True
3245 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3247 for router
in input_dict
.keys():
3248 if router
not in tgen
.routers():
3251 rnode
= tgen
.routers()[router
]
3253 for static_route
in input_dict
[router
]["static_routes"]:
3254 addr_type
= validate_ip_address(static_route
["network"])
3255 # Command to execute
3256 if addr_type
== "ipv4":
3257 command
= "show ip route json"
3259 command
= "show ipv6 route json"
3260 show_ip_route_json
= run_frr_cmd(rnode
, command
, isjson
=True)
3263 "Verifying admin distance for static route %s" " under dut %s:",
3267 network
= static_route
["network"]
3268 next_hop
= static_route
["next_hop"]
3269 admin_distance
= static_route
["admin_distance"]
3270 route_data
= show_ip_route_json
[network
][0]
3271 if network
in show_ip_route_json
:
3272 if route_data
["nexthops"][0]["ip"] == next_hop
:
3273 if route_data
["distance"] != admin_distance
:
3275 "Verification failed: admin distance"
3276 " for static route {} under dut {},"
3277 " found:{} but expected:{}".format(
3280 route_data
["distance"],
3287 "Verification successful: admin"
3288 " distance for static route %s under"
3289 " dut %s, found:%s",
3292 route_data
["distance"],
3297 "Static route {} not found in "
3298 "show_ip_route_json for dut {}".format(network
, router
)
3302 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3306 def verify_prefix_lists(tgen
, input_dict
):
3308 Running "show ip prefix-list" command and verifying given prefix-list
3309 is present in router.
3312 * `tgen` : topogen object
3313 * `input_dict`: data to verify prefix lists
3316 # To verify pf_list_1 is present in router r1
3319 "prefix_lists": ["pf_list_1"]
3321 result = verify_prefix_lists("ipv4", input_dict, tgen)
3324 errormsg(str) or True
3327 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3329 for router
in input_dict
.keys():
3330 if router
not in tgen
.routers():
3333 rnode
= tgen
.routers()[router
]
3335 # Show ip prefix list
3336 show_prefix_list
= run_frr_cmd(rnode
, "show ip prefix-list")
3338 # Verify Prefix list is deleted
3339 prefix_lists_addr
= input_dict
[router
]["prefix_lists"]
3340 for addr_type
in prefix_lists_addr
:
3341 if not check_address_types(addr_type
):
3344 for prefix_list
in prefix_lists_addr
[addr_type
].keys():
3345 if prefix_list
in show_prefix_list
:
3347 "Prefix list {} is/are present in the router"
3348 " {}".format(prefix_list
, router
)
3353 "Prefix list %s is/are not present in the router" " from router %s",
3358 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3362 @retry(attempts
=3, wait
=4, return_is_str
=True)
3363 def verify_route_maps(tgen
, input_dict
):
3365 Running "show route-map" command and verifying given route-map
3366 is present in router.
3369 * `tgen` : topogen object
3370 * `input_dict`: data to verify prefix lists
3373 # To verify rmap_1 and rmap_2 are present in router r1
3376 "route_maps": ["rmap_1", "rmap_2"]
3379 result = verify_route_maps(tgen, input_dict)
3382 errormsg(str) or True
3385 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3387 for router
in input_dict
.keys():
3388 if router
not in tgen
.routers():
3391 rnode
= tgen
.routers()[router
]
3393 show_route_maps
= rnode
.vtysh_cmd("show route-map")
3395 # Verify route-map is deleted
3396 route_maps
= input_dict
[router
]["route_maps"]
3397 for route_map
in route_maps
:
3398 if route_map
in show_route_maps
:
3399 errormsg
= "Route map {} is not deleted from router" " {}".format(
3405 "Route map %s is/are deleted successfully from" " router %s",
3410 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3414 @retry(attempts
=4, wait
=4, return_is_str
=True)
3415 def verify_bgp_community(tgen
, addr_type
, router
, network
, input_dict
=None):
3417 API to veiryf BGP large community is attached in route for any given
3418 DUT by running "show bgp ipv4/6 {route address} json" command.
3421 * `tgen`: topogen object
3422 * `addr_type` : ip type, ipv4/ipv6
3423 * `dut`: Device Under Test
3424 * `network`: network for which set criteria needs to be verified
3425 * `input_dict`: having details like - for which router, community and
3426 values needs to be verified
3429 networks = ["200.50.2.0/32"]
3431 "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
3433 result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None)
3436 errormsg(str) or True
3439 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3440 if router
not in tgen
.routers():
3443 rnode
= tgen
.routers()[router
]
3446 "Verifying BGP community attributes on dut %s: for %s " "network %s",
3453 cmd
= "show bgp {} {} json".format(addr_type
, net
)
3454 show_bgp_json
= rnode
.vtysh_cmd(cmd
, isjson
=True)
3455 logger
.info(show_bgp_json
)
3456 if "paths" not in show_bgp_json
:
3457 return "Prefix {} not found in BGP table of router: {}".format(net
, router
)
3459 as_paths
= show_bgp_json
["paths"]
3461 for i
in range(len(as_paths
)):
3463 "largeCommunity" in show_bgp_json
["paths"][i
]
3464 or "community" in show_bgp_json
["paths"][i
]
3468 "Large Community attribute is found for route:" " %s in router: %s",
3472 if input_dict
is not None:
3473 for criteria
, comm_val
in input_dict
.items():
3474 show_val
= show_bgp_json
["paths"][i
][criteria
]["string"]
3475 if comm_val
== show_val
:
3477 "Verifying BGP %s for prefix: %s"
3478 " in router: %s, found expected"
3487 "Failed: Verifying BGP attribute"
3488 " {} for route: {} in router: {}"
3489 ", expected value: {} but found"
3490 ": {}".format(criteria
, net
, router
, comm_val
, show_val
)
3496 "Large Community attribute is not found for route: "
3497 "{} in router: {} ".format(net
, router
)
3501 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3505 def verify_create_community_list(tgen
, input_dict
):
3507 API is to verify if large community list is created for any given DUT in
3508 input_dict by running "sh bgp large-community-list {"comm_name"} detail"
3512 * `tgen`: topogen object
3513 * `input_dict`: having details like - for which router, large community
3514 needs to be verified
3519 "large-community-list": {
3521 "Test1": [{"action": "PERMIT", "attribute":\
3524 result = verify_create_community_list(tgen, input_dict)
3527 errormsg(str) or True
3530 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3532 for router
in input_dict
.keys():
3533 if router
not in tgen
.routers():
3536 rnode
= tgen
.routers()[router
]
3538 logger
.info("Verifying large-community is created for dut %s:", router
)
3540 for comm_data
in input_dict
[router
]["bgp_community_lists"]:
3541 comm_name
= comm_data
["name"]
3542 comm_type
= comm_data
["community_type"]
3543 show_bgp_community
= run_frr_cmd(
3544 rnode
, "show bgp large-community-list {} detail".format(comm_name
)
3547 # Verify community list and type
3548 if comm_name
in show_bgp_community
and comm_type
in show_bgp_community
:
3550 "BGP %s large-community-list %s is" " created", comm_type
, comm_name
3553 errormsg
= "BGP {} large-community-list {} is not" " created".format(
3554 comm_type
, comm_name
3558 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3562 def verify_cli_json(tgen
, input_dict
):
3564 API to verify if JSON is available for clis
3569 * `tgen`: topogen object
3570 * `input_dict`: CLIs for which JSON needs to be verified
3575 "cli": ["show evpn vni detail", show evpn rmac vni all]
3579 result = verify_cli_json(tgen, input_dict)
3583 errormsg(str) or True
3586 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3587 for dut
in input_dict
.keys():
3588 rnode
= tgen
.routers()[dut
]
3590 for cli
in input_dict
[dut
]["cli"]:
3592 "[DUT: %s]: Verifying JSON is available for " "CLI %s :", dut
, cli
3595 test_cli
= "{} json".format(cli
)
3596 ret_json
= rnode
.vtysh_cmd(test_cli
, isjson
=True)
3597 if not bool(ret_json
):
3598 errormsg
= "CLI: %s, JSON format is not available" % (cli
)
3600 elif "unknown" in ret_json
or "Unknown" in ret_json
:
3601 errormsg
= "CLI: %s, JSON format is not available" % (cli
)
3605 "CLI : %s JSON format is available: " "\n %s", cli
, ret_json
3608 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3613 @retry(attempts
=3, wait
=4, return_is_str
=True)
3614 def verify_evpn_vni(tgen
, input_dict
):
3616 API to verify evpn vni details using "show evpn vni detail json"
3621 * `tgen`: topogen object
3622 * `input_dict`: having details like - for which router, evpn details
3623 needs to be verified
3632 "vxlanIntf": "vxlan75100",
3633 "localVtepIp": "120.1.1.1",
3641 result = verify_evpn_vni(tgen, input_dict)
3645 errormsg(str) or True
3648 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3649 for dut
in input_dict
.keys():
3650 rnode
= tgen
.routers()[dut
]
3652 logger
.info("[DUT: %s]: Verifying evpn vni details :", dut
)
3654 cmd
= "show evpn vni detail json"
3655 evpn_all_vni_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
3656 if not bool(evpn_all_vni_json
):
3657 errormsg
= "No output for '{}' cli".format(cmd
)
3660 if "vni" in input_dict
[dut
]:
3661 for vni_dict
in input_dict
[dut
]["vni"]:
3663 vni
= vni_dict
["name"]
3664 for evpn_vni_json
in evpn_all_vni_json
:
3665 if "vni" in evpn_vni_json
:
3666 if evpn_vni_json
["vni"] != int(vni
):
3669 for attribute
in vni_dict
.keys():
3670 if vni_dict
[attribute
] != evpn_vni_json
[attribute
]:
3672 "[DUT: %s] Verifying "
3673 "%s for VNI: %s [FAILED]||"
3680 vni_dict
[attribute
],
3681 evpn_vni_json
[attribute
],
3689 "[DUT: %s] Verifying"
3690 " %s for VNI: %s , "
3691 "Found Expected : %s ",
3695 evpn_vni_json
[attribute
],
3698 if evpn_vni_json
["state"] != "Up":
3700 "[DUT: %s] Failed: Verifying"
3701 " State for VNI: %s is not Up" % (dut
, vni
)
3708 " VNI: %s is not present in JSON" % (dut
, vni
)
3714 "[DUT %s]: Verifying VNI : %s "
3715 "details and state is Up [PASSED]!!",
3723 "[DUT: %s] Failed:" " vni details are not present in input data" % (dut
)
3727 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3731 @retry(attempts
=3, wait
=4, return_is_str
=True)
3732 def verify_vrf_vni(tgen
, input_dict
):
3734 API to verify vrf vni details using "show vrf vni json"
3739 * `tgen`: topogen object
3740 * `input_dict`: having details like - for which router, evpn details
3741 needs to be verified
3750 "vxlanIntf": "vxlan75100",
3752 "routerMac": "00:80:48:ba:d1:00",
3760 result = verify_vrf_vni(tgen, input_dict)
3764 errormsg(str) or True
3767 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3768 for dut
in input_dict
.keys():
3769 rnode
= tgen
.routers()[dut
]
3771 logger
.info("[DUT: %s]: Verifying vrf vni details :", dut
)
3773 cmd
= "show vrf vni json"
3774 vrf_all_vni_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
3775 if not bool(vrf_all_vni_json
):
3776 errormsg
= "No output for '{}' cli".format(cmd
)
3779 if "vrfs" in input_dict
[dut
]:
3780 for vrfs
in input_dict
[dut
]["vrfs"]:
3781 for vrf
, vrf_dict
in vrfs
.items():
3783 for vrf_vni_json
in vrf_all_vni_json
["vrfs"]:
3784 if "vrf" in vrf_vni_json
:
3785 if vrf_vni_json
["vrf"] != vrf
:
3788 for attribute
in vrf_dict
.keys():
3789 if vrf_dict
[attribute
] == vrf_vni_json
[attribute
]:
3792 "[DUT %s]: VRF: %s, "
3794 ", Found Expected: %s "
3799 vrf_vni_json
[attribute
],
3803 "[DUT: %s] VRF: %s, "
3804 "verifying %s [FAILED!!] "
3811 vrf_dict
[attribute
],
3812 vrf_vni_json
[attribute
],
3818 errormsg
= "[DUT: %s] VRF: %s " "is not present in JSON" % (
3826 "[DUT %s] Verifying VRF: %s " " details [PASSED]!!",
3834 "[DUT: %s] Failed:" " vrf details are not present in input data" % (dut
)
3838 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3842 def required_linux_kernel_version(required_version
):
3844 This API is used to check linux version compatibility of the test suite.
3845 If version mentioned in required_version is higher than the linux kernel
3846 of the system, test suite will be skipped. This API returns true or errormsg.
3850 * `required_version` : Kernel version required for the suites to run.
3854 result = linux_kernel_version_lowerthan('4.15')
3858 errormsg(str) or True
3860 system_kernel
= platform
.release()
3861 if version_cmp(system_kernel
, required_version
) < 0:
3863 'These tests will not run on kernel "{}", '
3864 "they require kernel >= {})".format(system_kernel
, required_version
)