2 # Copyright (c) 2019 by VMware, Inc. ("VMware")
3 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
4 # ("NetDEF") in this file.
6 # Permission to use, copy, modify, and/or distribute this software
7 # for any purpose with or without fee is hereby granted, provided
8 # that the above copyright notice and this permission notice appear
11 # THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
12 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
14 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
15 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
16 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
17 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
21 from collections
import OrderedDict
22 from datetime
import datetime
23 from time
import sleep
24 from copy
import deepcopy
25 from subprocess
import call
26 from subprocess
import STDOUT
as SUB_STDOUT
27 from subprocess
import PIPE
as SUB_PIPE
28 from subprocess
import Popen
29 from functools
import wraps
30 from re
import search
as re_search
31 from tempfile
import mkdtemp
41 from lib
.topolog
import logger
, logger_config
42 from lib
.topogen
import TopoRouter
, get_topogen
43 from lib
.topotest
import interface_set_status
# Per-router file that receives the configuration to be applied, and the
# snapshot of it that reset_config_on_routers() restores.
FRRCFG_FILE = "frr_json.conf"
FRRCFG_BKUP_FILE = "frr_json_initial.conf"

# Substrings that mark vtysh output as a rejected/failed command.
ERROR_LIST = ["Malformed", "Failure", "Unknown", "Incomplete"]

# Populated by start_topology(); defined here so the names always exist.
# NOTE(review): these two defaults are not visible in the reviewed text and
# were added so that module-level readers never hit a NameError - confirm.
ROUTER_LIST = []

CD = os.path.dirname(os.path.realpath(__file__))
PYTESTINI_PATH = os.path.join(CD, "../pytest.ini")

# Creating tmp dir with testsuite name to avoid conflict condition when
# multiple testsuites run together. All temporary files would be created
# in this dir and this dir would be removed once testsuite run is
# completed
LOGDIR = "/tmp/topotests/"
TMPDIR = None

# NOTE: to save execution logs to log file frrtest_log_dir must be configured
# in the `pytest.ini` file
config = ConfigParser.ConfigParser()
config.read(PYTESTINI_PATH)

config_section = "topogen"

if config.has_option(config_section, "verbosity"):
    loglevel = config.get(config_section, "verbosity").upper()
else:
    # NOTE(review): default level inferred; the fallback branch is not
    # visible in the reviewed text.
    loglevel = "INFO"

if config.has_option(config_section, "frrtest_log_dir"):
    frrtest_log_dir = config.get(config_section, "frrtest_log_dir")
    time_stamp = datetime.now().time()
    logfile_name = "frr_test_bgp_"
    # os.path.join keeps the path valid whether or not the configured
    # directory ends with a separator.
    frrtest_log_file = os.path.join(
        frrtest_log_dir, logfile_name + str(time_stamp)
    )
    print("frrtest_log_file..", frrtest_log_file)

    logger = logger_config.get_logger(
        name="test_execution_logs", log_level=loglevel, target=frrtest_log_file
    )
    print("Logs will be sent to logfile: {}".format(frrtest_log_file))

if config.has_option(config_section, "show_router_config"):
    # NOTE(review): this is the raw option string, so any non-empty value
    # (including "False") is truthy - confirm that is intended.
    show_router_config = config.get(config_section, "show_router_config")
else:
    show_router_config = False

# env variable for setting what address type to test
ADDRESS_TYPES = os.environ.get("ADDRESS_TYPES")


# Saves sequence id numbers
SEQ_ID = {"prefix_lists": {}, "route_maps": {}}
def get_seq_id(obj_type, router, obj_name):
    """
    Generates and saves sequence number in interval of 10

    Parameters
    ----------
    * `obj_type`: prefix_lists or route_maps
    * `router`: router name
    * `obj_name`: name of the prefix-list or route-map

    Returns
    -------
    Sequence number generated
    """

    # SEQ_ID is keyed obj_type -> router -> obj_name -> {"seq_id": n}; the
    # intermediate levels are created on first use.
    router_data = SEQ_ID[obj_type].setdefault(router, {})
    obj_data = router_data.setdefault(obj_name, {})

    seq_id = int(obj_data.get("seq_id", 0)) + 10
    obj_data["seq_id"] = seq_id

    return seq_id
def set_seq_id(obj_type, router, id, obj_name):
    """
    Saves sequence number if not auto-generated and given by user

    Parameters
    ----------
    * `obj_type`: prefix_lists or route_maps
    * `router`: router name
    * `id`: user supplied sequence number; it is added to the value
            currently stored for the object
    * `obj_name`: name of the prefix-list or route-map
    """

    router_data = SEQ_ID[obj_type].setdefault(router, {})
    obj_data = router_data.setdefault(obj_name, {})

    # The stored counter is advanced by `id` rather than overwritten.
    obj_data["seq_id"] = int(obj_data.get("seq_id", 0)) + int(id)
class InvalidCLIError(Exception):
    """Raise when the CLI command is wrong"""

    pass
def run_frr_cmd(rnode, cmd, isjson=False):
    """
    Execute frr show commands in priviledged mode
    * `rnode`: router node on which commands needs to executed
    * `cmd`: Command to be executed on frr
    * `isjson`: If command is to get json data or not
    :return: whatever `rnode.vtysh_cmd` returned for `cmd`
    :raises InvalidCLIError: when `cmd` is empty
    """

    if not cmd:
        raise InvalidCLIError("No actual cmd passed")

    ret_data = rnode.vtysh_cmd(cmd, isjson=isjson)

    # Drop a trailing "json" keyword only. str.rstrip("json") removes any
    # run of the characters j/s/o/n and would, for example, turn
    # "show ip bgp neighbors" into "show ip bgp neighbor".
    plain_cmd = cmd.rstrip()
    if plain_cmd.endswith(" json"):
        plain_cmd = plain_cmd[: -len(" json")].rstrip()

    if isjson:
        logger.debug(ret_data)
        # Run the plain-text form as well so the log stays readable.
        print_data = rnode.vtysh_cmd(plain_cmd, isjson=False)
    else:
        print_data = ret_data

    logger.info(
        "Output for command [ %s] on router %s:\n%s",
        plain_cmd,
        rnode.name,
        print_data,
    )
    return ret_data
def apply_raw_config(tgen, input_dict):
    """
    API to configure raw configuration on device. This can be used for any cli
    which has not been implemented in JSON.

    Parameters
    ----------
    * `tgen`: tgen object
    * `input_dict`: configuration that needs to be applied

    Usage
    -----
    input_dict = {
        "r2": {
            "raw_config": [
                "router bgp",
                "no bgp update-group-split-horizon"
            ]
        }
    }

    Returns
    -------
    Result of load_config_to_router() for the last router processed
    (True when input_dict is empty)
    """

    result = True
    for router_name in input_dict:
        config_cmd = input_dict[router_name]["raw_config"]

        # A single command may be given as a bare string.
        if not isinstance(config_cmd, list):
            config_cmd = [config_cmd]

        frr_cfg_file = "{}/{}/{}".format(TMPDIR, router_name, FRRCFG_FILE)
        with open(frr_cfg_file, "w") as cfg:
            for cmd in config_cmd:
                cfg.write("{}\n".format(cmd))

        result = load_config_to_router(tgen, router_name)

    return result
def create_common_configuration(
    tgen, router, data, config_type=None, build=False, load_config=True
):
    """
    API to create object of class FRRConfig and also create frr_json.conf
    file. It will create interface and common configurations and save it to
    frr_json.conf and load to router

    Parameters
    ----------
    * `tgen`: tgen object
    * `data`: Configuration data saved in a list.
    * `router` : router id to be configured.
    * `config_type` : Syntactic information while writing configuration. Should
                      be one of the value as mentioned in the config_map below.
    * `build` : Only for initial setup phase this is set as True
    * `load_config` : When False the data is only appended to the file and
                      not pushed to the router

    Returns
    -------
    True, or False when the configuration file could not be written
    """

    TMPDIR = os.path.join(LOGDIR, tgen.modname)

    fname = "{}/{}/{}".format(TMPDIR, router, FRRCFG_FILE)

    config_map = OrderedDict(
        [
            ("general_config", "! FRR General Config\n"),
            ("interface_config", "! Interfaces Config\n"),
            ("static_route", "! Static Route Config\n"),
            ("prefix_list", "! Prefix List Config\n"),
            ("bgp_community_list", "! Community List Config\n"),
            ("route_maps", "! Route Maps Config\n"),
            ("bgp", "! BGP Config\n"),
            ("vrf", "! VRF Config\n"),
        ]
    )

    # Accumulate during the build phase or when loading is deferred;
    # otherwise start a fresh file that is applied right away.
    mode = "a" if (build or not load_config) else "w"

    try:
        # The context manager closes the file on every path. Closing in a
        # `finally` clause fails with an unbound name when open() itself
        # raises, which hides the real I/O error.
        with open(fname, mode) as frr_cfg_fd:
            if config_type:
                frr_cfg_fd.write(config_map[config_type])
            for line in data:
                frr_cfg_fd.write("{} \n".format(str(line)))
            frr_cfg_fd.write("\n")

    except IOError as err:
        logger.error(
            "Unable to open FRR Config File. error(%s): %s" % (err.errno, err.strerror)
        )
        return False

    # If configuration applied from build, it will done at last
    if not build and load_config:
        load_config_to_router(tgen, router)

    return True
def kill_router_daemons(tgen, router, daemons):
    """
    Router's current config would be saved to /etc/frr/ for each deamon
    and deamon would be killed forcefully using SIGKILL.
    * `tgen` : topogen object
    * `router`: Device under test
    * `daemons`: list of daemons to be killed

    Returns the (empty) result of killDaemons() on success, otherwise an
    error message string.
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    try:
        rnode = tgen.routers()[router]

        # Saving router config to /etc/frr, which will be loaded to router
        # when it starts
        rnode.vtysh_cmd("write memory")

        # Kill Daemons
        result = rnode.killDaemons(daemons)

    except Exception:
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    if result:
        # Report the failure explicitly. An always-false assert is removed
        # when Python runs with -O, which would hide the error.
        errormsg = "Errors found post shutdown - details follow: {}".format(result)
        logger.error(errormsg)
        return errormsg

    return result
def start_router_daemons(tgen, router, daemons):
    """
    Daemons defined by user would be started
    * `tgen` : topogen object
    * `router`: Device under test
    * `daemons`: list of daemons to be started

    Returns the result of startDaemons(), or a traceback string when
    starting raised an exception.
    """

    api_name = sys._getframe().f_code.co_name
    logger.debug("Entering lib API: {}".format(api_name))

    try:
        # Start daemons
        result = tgen.routers()[router].startDaemons(daemons)

    except Exception:
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    # Logged before returning so the exit trace is actually emitted.
    logger.debug("Exiting lib API: {}".format(api_name))
    return result
def kill_mininet_routers_process(tgen):
    """
    Kill all mininet stale router' processes
    * `tgen` : topogen object
    """

    # NOTE(review): the daemon list is not visible in the reviewed text; it
    # was rebuilt from the upstream FRR version of this helper - confirm.
    daemon_list = [
        "zebra",
        "ospfd",
        "ospf6d",
        "bgpd",
        "ripd",
        "ripngd",
        "isisd",
        "pimd",
        "ldpd",
        "staticd",
    ]

    # dict.items() works on both Python 2 and 3.
    for _rname, router in tgen.routers().items():
        for daemon in daemon_list:
            router.run("killall -9 {}".format(daemon))
def check_router_status(tgen):
    """
    Check if all daemons are running for all routers in topology
    * `tgen` : topogen object

    For every router whose check_router_running() output is non-empty, the
    bgpd/zebra daemons named in that output are started again.

    Returns True, or a traceback string when an exception occurred.
    """

    api_name = sys._getframe().f_code.co_name
    logger.debug("Entering lib API: {}".format(api_name))

    try:
        for _router, rnode in tgen.routers().items():
            result = rnode.check_router_running()
            if result == "":
                continue

            daemons = [name for name in ("bgpd", "zebra") if name in result]
            rnode.startDaemons(daemons)

    except Exception:
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: {}".format(api_name))
    return True
def reset_config_on_routers(tgen, routerName=None):
    """
    Resets configuration on routers to the snapshot created using input JSON
    file. It replaces existing router configuration with FRRCFG_BKUP_FILE

    Parameters
    ----------
    * `tgen` : Topogen object
    * `routerName` : router config is to be reset (all routers when None)

    Returns
    -------
    True; raises InvalidCLIError when the delta cannot be computed
    """

    logger.debug("Entering API: reset_config_on_routers")

    router_list = tgen.routers()
    for rname in ROUTER_LIST:
        if routerName and routerName != rname:
            continue

        router = router_list[rname]
        logger.info("Configuring router %s to initial test configuration", rname)

        cfg = router.run("vtysh -c 'show running'")
        run_cfg_file = "{}/{}/frr.sav".format(TMPDIR, rname)
        init_cfg_file = "{}/{}/frr_json_initial.conf".format(TMPDIR, rname)
        dname = "{}/{}/delta.conf".format(TMPDIR, rname)

        # Save the running configuration without vtysh banner lines and
        # blank lines; the file is closed even if a write fails.
        with open(run_cfg_file, "w") as sav:
            for line in cfg.split("\n"):
                line = line.strip()
                if (
                    line == "Building configuration..."
                    or line == "Current configuration:"
                    or not line
                ):
                    continue
                sav.write(line)
                sav.write("\n")

        command = "/usr/lib/frr/frr-reload.py  --input {} --test {} > {}".format(
            run_cfg_file, init_cfg_file, dname
        )
        result = call(command, shell=True, stderr=SUB_STDOUT, stdout=SUB_PIPE)

        # Assert if command fail
        if result > 0:
            logger.error("Delta file creation failed. Command executed %s", command)
            with open(run_cfg_file, "r") as fd:
                logger.info(
                    "Running configuration saved in %s is:\n%s", run_cfg_file, fd.read()
                )
            with open(init_cfg_file, "r") as fd:
                logger.info(
                    "Test configuration saved in %s is:\n%s", init_cfg_file, fd.read()
                )

            # Let vtysh validate the saved file to find the offending CLI.
            err_cmd = ["/usr/bin/vtysh", "-m", "-f", run_cfg_file]
            result = Popen(err_cmd, stdout=SUB_PIPE, stderr=SUB_PIPE)
            output = result.communicate()
            for out_data in output:
                temp_data = out_data.decode("utf-8").lower()
                for out_err in ERROR_LIST:
                    if out_err.lower() in temp_data:
                        logger.error(
                            "Found errors while validating data in" " %s", run_cfg_file
                        )
                        raise InvalidCLIError(out_data)
            raise InvalidCLIError("Unknown error in {}".format(output))

        with open(dname, "r") as delta_fd:
            t_delta = delta_fd.read()

        delta_lines = ["configure terminal"]

        # Don't disable debugs: while still in the "Lines To Delete"
        # section, debug and log-file lines are skipped.
        check_debug = True

        for line in t_delta.split("\n"):
            line = line.strip()
            if line == "Lines To Delete" or line == "===============" or not line:
                continue

            if line == "Lines To Add":
                check_debug = False
                continue

            if line == "============":
                continue

            # Leave debugs and log output alone
            if check_debug and ("debug" in line or "log file" in line):
                continue

            delta_lines.append(line)

        delta_lines.append("end")

        router.vtysh_multicmd("\n".join(delta_lines) + "\n", pretty_output=False)

        # Router current configuration to log file or console if
        # "show_router_config" is defined in "pytest.ini"
        if show_router_config:
            cfg = router.run("vtysh -c 'show running'")
            current = "".join(
                "{}\n".format(line.strip()) for line in cfg.split("\n")
            )
            logger.info("Configuration on router {} after reset:".format(rname))
            logger.info(current)

    logger.debug("Exiting API: reset_config_on_routers")
    return True
def load_config_to_router(tgen, routerName, save_bkup=False):
    """
    Loads configuration on router from the file FRRCFG_FILE.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `routerName` : router for which configuration to be loaded
    * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE

    Returns
    -------
    True, or an error message string when the config file cannot be read.
    Raises InvalidCLIError when vtysh output contains one of ERROR_LIST.
    """

    logger.debug("Entering API: load_config_to_router")

    router_list = tgen.routers()
    for rname in ROUTER_LIST:
        if routerName and rname != routerName:
            continue

        router = router_list[rname]
        try:
            frr_cfg_file = "{}/{}/{}".format(TMPDIR, rname, FRRCFG_FILE)
            frr_cfg_bkup = "{}/{}/{}".format(TMPDIR, rname, FRRCFG_BKUP_FILE)
            with open(frr_cfg_file, "r+") as cfg:
                data = cfg.read()
                logger.info(
                    "Applying following configuration on router"
                    " {}:\n{}".format(rname, data)
                )
                if save_bkup:
                    with open(frr_cfg_bkup, "w") as bkup:
                        bkup.write(data)

                output = router.vtysh_multicmd(data, pretty_output=False)
                lowered = output.lower()
                for out_err in ERROR_LIST:
                    if out_err.lower() in lowered:
                        raise InvalidCLIError("%s" % output)

                # The pending configuration has been applied; empty the file
                # so the next call starts clean.
                cfg.truncate(0)

        except IOError as err:
            # Build a real message; a ("fmt", (args)) tuple is never
            # interpolated.
            errormsg = "Unable to open config File. error(%s): %s" % (
                err.errno,
                err.strerror,
            )
            return errormsg

        # Router current configuration to log file or console if
        # "show_router_config" is defined in "pytest.ini"
        if show_router_config:
            logger.info("New configuration for router {}:".format(rname))
            new_config = router.run("vtysh -c 'show running'")
            logger.info(new_config)

    logger.debug("Exiting API: load_config_to_router")
    return True
def get_frr_ipv6_linklocal(tgen, router, intf=None, vrf=None):
    """
    API to get the link local ipv6 address of a particular interface using
    FRR command 'show interface'

    * `tgen`: tgen object
    * `router` : router for which the link local addresses should be read
    * `intf` : interface for which linklocal address needs to be taken
    * `vrf` : VRF name

    Usage
    -----
    linklocal = get_frr_ipv6_linklocal(tgen, router, "intf1", RED_A)

    Returns
    -------
    1) array of interface names to link local ips, or the address (without
       prefix length) of `intf` when it is given.
    2) an error message string when nothing was found.
    """

    errormsg = "Link local ip missing on router {}".format(router)

    rnode = tgen.routers().get(router)
    if rnode is None:
        return errormsg

    if vrf:
        cmd = "show interface vrf {}".format(vrf)
    else:
        cmd = "show interface"

    ifaces = rnode.run('vtysh -c "{}"'.format(cmd))

    linklocal = []
    interface = None
    ll_per_if_count = 0
    # splitlines() already normalises every newline flavour.
    for line in ifaces.splitlines():
        # Interface name
        m = re_search(r"Interface ([a-zA-Z0-9-]+) is", line)
        if m:
            interface = m.group(1).split(" ")[0]
            ll_per_if_count = 0

        # Interface ip
        m1 = re_search(r"inet6 (fe80[:a-fA-F0-9]+[/0-9]+)", line)
        if m1:
            local = m1.group(1)
            ll_per_if_count += 1
            if ll_per_if_count > 1:
                # Further addresses on the same interface get a numbered key.
                linklocal.append(["%s-%s" % (interface, ll_per_if_count), local])
            else:
                linklocal.append([interface, local])

    if not linklocal:
        return errormsg

    if intf:
        for name, address in linklocal:
            if name == intf:
                return address.split("/")[0]
        # Requested interface has no link-local address.
        return errormsg

    return linklocal
def generate_support_bundle():
    """
    API to generate support bundle on any verification step failure.
    it runs a python utility, /usr/lib/frr/generate_support_bundle.py,
    which basically runs defined CLIs and dumps the data to specified location

    Returns True.
    """

    tgen = get_topogen()
    router_list = tgen.routers()
    # Frame 2 is the caller's caller: this function is expected to be invoked
    # from the retry wrapper, so that frame is the test function itself.
    # NOTE(review): keep the call depth unchanged if this is refactored.
    test_name = sys._getframe(2).f_code.co_name
    TMPDIR = os.path.join(LOGDIR, tgen.modname)

    for rname, rnode in router_list.items():
        logger.info("Generating support bundle for {}".format(rname))
        rnode.run("mkdir -p /var/log/frr")
        bundle_log = rnode.run("python2 /usr/lib/frr/generate_support_bundle.py")
        logger.info(bundle_log)

        # Move whatever was collected under the per-test bundle directory.
        dst_bundle = "{}/{}/support_bundles/{}".format(TMPDIR, rname, test_name)
        src_bundle = "/var/log/frr"
        rnode.run("rm -rf {}".format(dst_bundle))
        rnode.run("mkdir -p {}".format(dst_bundle))
        rnode.run("mv -f {}/* {}".format(src_bundle, dst_bundle))

    return True
def start_topology(tgen):
    """
    Starting topology, create tmp files which are loaded to routers
    to start deamons and then start routers
    * `tgen` : topogen object
    """

    global TMPDIR, ROUTER_LIST
    # Starting topology
    tgen.start_topology()

    # Starting deamons

    router_list = tgen.routers()
    # Routers are ordered by the first number found in their name.
    ROUTER_LIST = sorted(
        router_list.keys(), key=lambda x: int(re_search(r"\d+", x).group(0))
    )
    TMPDIR = os.path.join(LOGDIR, tgen.modname)

    for rname in ROUTER_LIST:
        router = router_list[rname]

        # It will help in debugging the failures, will give more details on which
        # specific kernel version tests are failing
        linux_ver = router.run("uname -a")
        logger.info("Logging platform related details: \n %s \n", linux_ver)

        try:
            os.chdir(TMPDIR)

            # Creating router named dir and empty zebra.conf bgpd.conf files
            # inside the current directory
            if os.path.isdir("{}".format(rname)):
                os.system("rm -rf {}".format(rname))
            os.mkdir("{}".format(rname))
            os.system("chmod -R go+rw {}".format(rname))
            os.chdir("{}/{}".format(TMPDIR, rname))
            os.system("touch zebra.conf bgpd.conf")

        except IOError as err:
            # `except IOError as (errno, strerror)` is Python 2 only syntax.
            logger.error("I/O error({0}): {1}".format(err.errno, err.strerror))

        # Loading empty zebra.conf file to router, to start the zebra deamon
        router.load_config(
            TopoRouter.RD_ZEBRA, "{}/{}/zebra.conf".format(TMPDIR, rname)
        )
        # Loading empty bgpd.conf file to router, to start the bgp deamon
        router.load_config(TopoRouter.RD_BGP, "{}/{}/bgpd.conf".format(TMPDIR, rname))

    # Starting routers
    logger.info("Starting all routers once topology is created")
    tgen.start_router()
def stop_router(tgen, router):
    """
    Router's current config would be saved to /etc/frr/ for each deamon
    and router and its deamons would be stopped.

    * `tgen` : topogen object
    * `router`: Device under test
    """

    rnode = tgen.routers()[router]

    # Saving router config to /etc/frr, which will be loaded to router
    # when it starts
    rnode.vtysh_cmd("write memory")

    # Stop router
    rnode.stop()
def start_router(tgen, router):
    """
    Router will started and config would be loaded from /etc/frr/ for each
    deamon

    * `tgen` : topogen object
    * `router`: Device under test

    Returns True, or a traceback string when starting raised an exception.
    """

    logger.debug("Entering lib API: start_router")

    try:
        # Router and its deamons would be started and config would
        # be loaded to router for each deamon from /etc/frr
        tgen.routers()[router].start()

        # Waiting for router to come up
        # NOTE(review): the 5 second delay is not visible in the reviewed
        # text and was taken from the upstream FRR version - confirm.
        sleep(5)

    except Exception:
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: start_router()")
    return True
def number_to_row(routerName):
    """
    Returns the number for the router.
    Calculation based on name a0 = row 0, a1 = row 1, b2 = row 2, z23 = row 23
    etc
    """
    # Everything after the leading letter is the row number.
    return int(routerName[1:])
def number_to_column(routerName):
    """
    Returns the number for the router.
    Calculation based on name a0 = column 0, a1 = column 0, b2 = column 1,
    z23 = column 25 etc
    """
    # 97 is ord("a"): the leading lower-case letter selects the column.
    return ord(routerName[0]) - 97
#############################################
# Common APIs, will be used by all protocols
#############################################
def create_vrf_cfg(tgen, topo, input_dict=None, build=False):
    """
    Create vrf configuration for created topology. VRF
    configuration is provided in input json file.

    VRF config is done in Linux Kernel:
    * Create VRF
    * Attach interface to VRF
    * Bring up VRF

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring
                     from testcase
    * `build` : Only for initial setup phase this is set as True.

    Usage
    -----
    # NOTE(review): illustrative shape assembled from the keys this function
    # reads; the original example is only partly visible.
    input_dict = {
        "r2": {
            "links": {
                "r2-link1": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_A"},
                "r2-link2": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_B"},
                "r2-link3": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_A"},
                "r2-link4": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_B"},
            },
            "vrfs": [
                {"name": "RED_A", "id": "1"},
                {"name": "RED_A", "id": "1", "delete": True},
            ],
        }
    }
    result = create_vrf_cfg(tgen, topo, input_dict)

    Returns
    -------
    True or False, or a traceback string on InvalidCLIError
    """
    result = True
    # Work on a private copy: the setdefault() calls below add keys.
    if not input_dict:
        input_dict = deepcopy(topo)
    else:
        input_dict = deepcopy(input_dict)

    try:
        for c_router, c_data in input_dict.items():
            rnode = tgen.routers()[c_router]
            if "vrfs" not in c_data:
                continue

            for vrf in c_data["vrfs"]:
                config_data = []
                del_action = vrf.setdefault("delete", False)
                name = vrf.setdefault("name", None)
                table_id = vrf.setdefault("id", None)
                vni = vrf.setdefault("vni", None)
                del_vni = vrf.setdefault("no_vni", None)

                if del_action:
                    # Kernel cmd - Delete VRF and table
                    cmd = "ip link del {} type vrf table {}".format(
                        vrf["name"], vrf["id"]
                    )

                    logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd)
                    rnode.run(cmd)

                    # Kernel cmd - Bring down VRF
                    cmd = "ip link set dev {} down".format(name)
                    logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd)
                    rnode.run(cmd)

                else:
                    if name and table_id:
                        # Kernel cmd- Add VRF and table
                        cmd = "ip link add {} type vrf table {}".format(
                            name, table_id
                        )
                        logger.info(
                            "[DUT: %s]: Running kernel cmd " "[%s]", c_router, cmd
                        )
                        rnode.run(cmd)

                        # Kernel cmd - Bring up VRF
                        cmd = "ip link set dev {} up".format(name)
                        logger.info(
                            "[DUT: %s]: Running kernel " "cmd [%s]", c_router, cmd
                        )
                        rnode.run(cmd)

                    if "links" in c_data:
                        for destRouterLink, data in sorted(c_data["links"].items()):
                            # Loopback interfaces
                            if "type" in data and data["type"] == "loopback":
                                interface_name = destRouterLink
                            else:
                                interface_name = data["interface"]

                            if "vrf" not in data:
                                continue

                            vrf_list = data["vrf"]
                            if not isinstance(vrf_list, list):
                                vrf_list = [vrf_list]

                            for _vrf in vrf_list:
                                cmd = "ip link set {} master {}".format(
                                    interface_name, _vrf
                                )

                                logger.info(
                                    "[DUT: %s]: Running" " kernel cmd [%s]",
                                    c_router,
                                    cmd,
                                )
                                rnode.run(cmd)

                if vni:
                    config_data.append("vrf {}".format(vrf["name"]))
                    cmd = "vni {}".format(vni)
                    config_data.append(cmd)

                if del_vni:
                    config_data.append("vrf {}".format(vrf["name"]))
                    cmd = "no vni {}".format(del_vni)
                    config_data.append(cmd)

                result = create_common_configuration(
                    tgen, c_router, config_data, "vrf", build=build
                )

    except InvalidCLIError:
        # Traceback
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    return result
def create_interface_in_kernel(
    tgen, dut, name, ip_addr, vrf=None, netmask=None, create=True
):
    """
    Create interfaces in kernel for ipv4/ipv6
    Config is done in Linux Kernel:

    Parameters
    ----------
    * `tgen` : Topogen object
    * `dut` : Device for which interfaces to be added
    * `name` : interface name
    * `ip_addr` : ip address for interface
    * `vrf` : VRF name, to which interface will be associated
    * `netmask` : netmask value, default is None
    * `create`: Create interface in kernel, if created then no need
                to create
    """

    rnode = tgen.routers()[dut]

    if create:
        cmd = "sudo ip link add name {} type dummy".format(name)
        rnode.run(cmd)

    # IPv4 takes a dotted netmask, IPv6 a prefix length.
    addr_type = validate_ip_address(ip_addr)
    if addr_type == "ipv4":
        cmd = "ifconfig {} {} netmask {}".format(name, ip_addr, netmask)
    else:
        cmd = "ifconfig {} inet6 add {}/{}".format(name, ip_addr, netmask)

    rnode.run(cmd)

    if vrf:
        cmd = "ip link set {} master {}".format(name, vrf)
        rnode.run(cmd)
def shutdown_bringup_interface_in_kernel(tgen, dut, intf_name, ifaceaction=False):
    """
    Shut down or bring up an interface in kernel
    Config is done in Linux Kernel:

    Parameters
    ----------
    * `tgen` : Topogen object
    * `dut` : Device on which the interface lives
    * `intf_name` : interface name
    * `ifaceaction` : False to shutdown and True to bringup the
                      interface
    """

    rnode = tgen.routers()[dut]

    action = "up" if ifaceaction else "down"
    cmd = "ip link set dev {} {}".format(intf_name, action)

    logger.info("[DUT: %s]: Running command: %s", dut, cmd)
    rnode.run(cmd)
def validate_ip_address(ip_address):
    """
    Validates the type of ip address

    Parameters
    ----------
    * `ip_address`: IPv4/IPv6 address, optionally with a "/prefix" suffix

    Returns
    -------
    Type of address as string ("ipv4" or "ipv6")

    Raises
    ------
    Exception when the value is neither a valid IPv4 nor IPv6 address
    """

    if "/" in ip_address:
        ip_address = ip_address.split("/")[0]

    # inet_pton() is strict. inet_aton() also accepts shorthand such as "1"
    # or "10.1" and would report those as valid IPv4 addresses.
    try:
        socket.inet_pton(socket.AF_INET, ip_address)
    except socket.error:
        logger.debug("Not a valid IPv4 address")
    else:
        return "ipv4"

    try:
        socket.inet_pton(socket.AF_INET6, ip_address)
    except socket.error:
        logger.debug("Not a valid IPv6 address")
    else:
        return "ipv6"

    raise Exception(
        "InvalidIpAddr", "%s is neither valid IPv4 or IPv6" " address" % ip_address
    )
def check_address_types(addr_type=None):
    """
    Checks environment variable set and compares with the current address type

    * `addr_type`: "ipv4" or "ipv6"; when None the list of configured
                   address types is returned instead of a boolean

    The ADDRESS_TYPES environment variable may be "dual" (default), "ipv4"
    or "ipv6"; any other value raises ValueError.
    """

    supported = {
        "dual": ["ipv4", "ipv6"],
        "ipv4": ["ipv4"],
        "ipv6": ["ipv6"],
    }

    addr_types_env = os.environ.get("ADDRESS_TYPES") or "dual"
    if addr_types_env not in supported:
        # Fail with a clear message instead of an unbound local variable.
        raise ValueError(
            "Unsupported ADDRESS_TYPES value {!r}; expected one of {}".format(
                addr_types_env, sorted(supported)
            )
        )
    addr_types = supported[addr_types_env]

    if addr_type is None:
        return addr_types

    if addr_type not in addr_types:
        # NOTE(review): log level inferred; the call is only partly visible
        # in the reviewed text.
        logger.error(
            "{} not in supported/configured address types {}".format(
                addr_type, addr_types
            )
        )
        return False

    return True
def generate_ips(network, no_of_ips):
    """
    Returns list of IPs.
    based on start_ip and no_of_ips
    * `network`  : from here the ip will start generating; a single
                   "address/prefixlen" string or a list of them
    * `no_of_ips` : these many IPs will be generated per entry

    Each generated entry is "address/prefixlen"; consecutive entries are one
    whole prefix apart. Raises ValueError for an entry without "/" or with
    an invalid address.
    """

    try:
        text_type = unicode  # Python 2: ipaddress needs unicode input
    except NameError:
        text_type = str

    if not isinstance(network, list):
        network = [network]

    ipaddress_list = []
    for start_ipaddr in network:
        if "/" not in start_ipaddr:
            # Without a prefix length the step size is undefined.
            raise ValueError(
                "network {!r} must be given as address/prefixlen".format(start_ipaddr)
            )

        start_text, mask_text = start_ipaddr.split("/")[:2]
        mask = int(mask_text)

        next_ip = ipaddress.ip_address(text_type(start_text))
        # max_prefixlen is 32 for IPv4 and 128 for IPv6.
        step = 2 ** (next_ip.max_prefixlen - mask)

        for _ in range(no_of_ips):
            ipaddress_list.append("{}/{}".format(next_ip, mask))
            next_ip += step

    return ipaddress_list
def find_interface_with_greater_ip(topo, router, loopback=True, interface=True):
    """
    Returns highest interface ip for ipv4/ipv6. If loopback is there then
    it will return highest IP from loopback IPs otherwise from physical
    interface IPs.
    * `topo`  : json file data
    * `router` : router for which highest interface should be calculated
    * `loopback` : consider loopback links
    * `interface` : consider all links

    Only the "ipv4" address of each link is examined; links without one are
    ignored. Raises IndexError when no address is available.
    """

    def _numeric(address):
        # Compare octet by octet; plain string ordering would rank
        # "10.0.0.9" above "10.0.0.10".
        return tuple(int(octet) for octet in address.split("."))

    link_data = topo["routers"][router]["links"]
    lo_list = []
    interfaces_list = []

    for _link_name, data in sorted(link_data.items()):
        if "ipv4" not in data:
            continue
        ip_address = data["ipv4"].split("/")[0]

        if loopback and data.get("type") == "loopback":
            lo_list.append(ip_address)
        if interface:
            interfaces_list.append(ip_address)

    # Loopback addresses win whenever at least one was found.
    candidates = lo_list if lo_list else interfaces_list
    if not candidates:
        raise IndexError("no ipv4 address found on router {}".format(router))

    return max(candidates, key=_numeric)
def write_test_header(tc_name):
    """ Display message at beginning of test case"""
    # Width of the fixed text that surrounds the test name in the banner.
    # NOTE(review): the value 20 is taken from the upstream FRR version; the
    # assignment is not visible in the reviewed text.
    count = 20
    banner = "*" * (len(tc_name) + count)
    logger.info(banner)
    # reset=True restarts step numbering for the new test case.
    step("START -> Testcase : %s" % tc_name, reset=True)
    logger.info(banner)
def write_test_footer(tc_name):
    """ Display message at end of test case"""
    # Width of the fixed text that surrounds the test name in the banner.
    # NOTE(review): the value 21 is taken from the upstream FRR version; the
    # assignment is not visible in the reviewed text.
    count = 21
    banner = "=" * (len(tc_name) + count)
    logger.info(banner)
    logger.info("Testcase : %s -> PASSED", tc_name)
    logger.info(banner)
def interface_status(tgen, topo, input_dict):
    """
    Set the status (up/down) of interfaces on routers
    * `tgen`  : Topogen object
    * `topo`  : json file data
    * `input_dict` :  per router, the interfaces whose status has to change

    Usage
    -----
    input_dict = {
        "r3": {
            "interface_list": ['eth1-r1-r2', 'eth2-r1-r3'],
            "status": "down"
        }
    }

    Returns
    -------
    errormsg(str) or True
    """
    api_name = sys._getframe().f_code.co_name
    logger.debug("Entering lib API: {}".format(api_name))

    try:
        for router in input_dict:
            interface_list = input_dict[router]["interface_list"]
            # "up" unless told otherwise; the caller's dict is not modified.
            status = input_dict[router].get("status", "up")
            rnode = tgen.routers()[router]
            for intf in interface_list:
                interface_set_status(rnode, intf, status)

            # Load config to router
            load_config_to_router(tgen, router)

    except Exception as e:
        # handle any exception; exceptions have no `.message` attribute on
        # Python 3, so the exception itself is logged.
        logger.error("Error %s occured. Arguments %s.", e, e.args)

        # Traceback
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: {}".format(api_name))
    return True
def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0, return_is_dict=False):
    """
    Retries function execution, if return is an errormsg or exception

    * `attempts`: Number of attempts to make
    * `wait`: Number of seconds to wait between each attempt
    * `return_is_str`: Return val is an errormsg in case of failure
    * `initial_wait`: Sleeps for this much seconds before executing function
    * `return_is_dict`: A dict return value counts as success

    `wait`, `attempts`, `return_is_str`, `return_is_dict` and `expected` may
    also be passed as keyword arguments on each call of the decorated
    function; they are consumed here and never reach the wrapped function.
    With `expected=False` an error string is the awaited outcome.
    """

    try:
        string_types = (str, unicode)  # Python 2
    except NameError:
        string_types = (str,)

    def _retry(func):
        @wraps(func)
        def func_retry(*args, **kwargs):
            _wait = kwargs.pop("wait", wait)
            _attempts = int(kwargs.pop("attempts", attempts))
            if _attempts < 0:
                raise ValueError("attempts must be 0 or greater")

            if initial_wait > 0:
                logger.info("Waiting for [%s]s as initial delay", initial_wait)
                sleep(initial_wait)

            _return_is_str = kwargs.pop("return_is_str", return_is_str)
            _return_is_dict = kwargs.pop("return_is_dict", return_is_dict)
            # Consumed once, so the caller's choice holds for every attempt.
            _expected = kwargs.pop("expected", True)

            for i in range(1, _attempts + 1):
                try:
                    ret = func(*args, **kwargs)
                    logger.debug("Function returned %s" % ret)
                    if _return_is_str and isinstance(ret, bool) and _expected:
                        return ret
                    if isinstance(ret, string_types) and _expected is False:
                        return ret
                    if _return_is_dict and isinstance(ret, dict):
                        return ret

                    if _attempts == i:
                        generate_support_bundle()
                        return ret
                except Exception as err:
                    if _attempts == i:
                        generate_support_bundle()
                        logger.info("Max number of attempts (%r) reached", _attempts)
                        raise
                    else:
                        logger.info("Function returned %s", err)
                if i < _attempts:
                    logger.info("Retry [#%r] after sleeping for %ss" % (i, _wait))
                    sleep(_wait)

        # Lets callers reach the undecorated function.
        func_retry._original = func
        return func_retry

    return _retry
class Stepper(object):
    """
    Prints step number for the test case step being executed
    """

    # Shared across instances: step() creates a new Stepper on every call.
    count = 1

    def __call__(self, msg, reset):
        if reset:
            Stepper.count = 1
            logger.info(msg)
        else:
            logger.info("STEP %s: '%s'", Stepper.count, msg)
            Stepper.count += 1
def step(msg, reset=False):
    """
    Call Stepper to print test steps. Need to reset at the beginning of test.
    * `msg` : Step message body.
    * `reset` : Reset step count to 1 when set to True.
    """
    _step = Stepper()
    _step(msg, reset)
#############################################
# These APIs will be used by testcases
#############################################
def create_interfaces_cfg(tgen, topo, build=False):
    """
    Create interface configuration for created topology. Basic Interface
    configuration is provided in input json file.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `build` : Only for initial setup phase this is set as True.

    Returns
    -------
    True or False, or a traceback string on InvalidCLIError
    """
    result = False
    topo = deepcopy(topo)

    try:
        for c_router, c_data in topo.items():
            interface_data = []
            for destRouterLink, data in sorted(c_data["links"].items()):
                # Loopback interfaces
                if "type" in data and data["type"] == "loopback":
                    interface_name = destRouterLink
                else:
                    interface_name = data["interface"]

                # Include vrf if present
                if "vrf" in data:
                    interface_data.append(
                        "interface {} vrf {}".format(
                            str(interface_name), str(data["vrf"])
                        )
                    )
                else:
                    interface_data.append("interface {}".format(str(interface_name)))

                # A truthy "delete" turns every address line into its
                # "no ..." form.
                delete = bool(data.get("delete"))

                if "ipv4" in data:
                    intf_addr = data["ipv4"]
                    if delete:
                        interface_data.append("no ip address {}".format(intf_addr))
                    else:
                        interface_data.append("ip address {}".format(intf_addr))

                if "ipv6" in data:
                    intf_addr = data["ipv6"]
                    if delete:
                        interface_data.append("no ipv6 address {}".format(intf_addr))
                    else:
                        interface_data.append("ipv6 address {}".format(intf_addr))

                if "ipv6-link-local" in data:
                    intf_addr = data["ipv6-link-local"]
                    if delete:
                        interface_data.append("no ipv6 address {}".format(intf_addr))
                    else:
                        interface_data.append("ipv6 address {}\n".format(intf_addr))

            result = create_common_configuration(
                tgen, c_router, interface_data, "interface_config", build=build
            )
    except InvalidCLIError:
        # Traceback
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    return result
def create_static_routes(tgen, input_dict, build=False):
    """
    Build the static-route CLI lines for each router in input_dict and apply
    them through create_common_configuration().

    Parameters
    ----------
    * `tgen` : Topogen object
    * `input_dict` : Input dict data, required when configuring from testcase
    * `build` : Only for initial setup phase this is set as True.

    Usage
    -----
    Every router entry carries a "static_routes" list. Keys read per route:
    # network: network address, single value or list
    # no_of_ip: count handed to generate_ips() together with network
    # interface: outgoing interface name
    # next_hop: next-hop address
    # nexthop_vrf: VRF of the next-hop
    # vrf: VRF name in which the static route is created
    # tag: tag id for the static route
    # admin_distance: admin distance for the route
    # delete: True if config is to be removed. Default False.

    input_dict = {
        "r1": {
            "static_routes": [
                {
                    "network": "100.0.20.1/32",
                    "no_of_ip": 9,
                    "admin_distance": 100,
                    "next_hop": "10.0.0.1",
                    "tag": 4001,
                    "delete": True
                }
            ]
        }
    }

    Returns
    -------
    errormsg(str) when InvalidCLIError is raised, otherwise the result of
    create_common_configuration() for the last router configured (False
    when no router carried "static_routes").
    """
    result = False
    logger.debug("Entering lib API: create_static_routes()")
    # Work on a private copy so the defaults filled in below never leak
    # back into the caller's dictionary.
    input_dict = deepcopy(input_dict)

    try:
        for router in input_dict.keys():
            if "static_routes" not in input_dict[router]:
                errormsg = "static_routes not present in input_dict"
                logger.info(errormsg)
                continue

            static_routes_list = []

            for route in input_dict[router]["static_routes"]:
                remove = route.setdefault("delete", False)
                count = route.setdefault("no_of_ip", 1)
                networks = route.setdefault("network", [])
                if type(networks) is not list:
                    networks = [networks]

                # Optional keywords, listed in the order they are appended
                # to the CLI line; falsy values are skipped.
                suffixes = (
                    ("{} {}", route.get("interface")),
                    ("{} {}", route.get("next_hop")),
                    ("{} nexthop-vrf {}", route.get("nexthop_vrf")),
                    ("{} vrf {}", route.get("vrf")),
                    ("{} tag {}", route.get("tag")),
                    ("{} {}", route.get("admin_distance")),
                )

                for prefix in generate_ips(networks, count):
                    if validate_ip_address(prefix) == "ipv4":
                        line = "ip route {}".format(prefix)
                    else:
                        line = "ipv6 route {}".format(prefix)

                    for template, value in suffixes:
                        if value:
                            line = template.format(line, value)

                    if remove:
                        line = "no {}".format(line)

                    static_routes_list.append(line)

            result = create_common_configuration(
                tgen, router, static_routes_list, "static_route", build=build
            )

    except InvalidCLIError:
        # Traceback
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: create_static_routes()")
    return result
def create_prefix_lists(tgen, input_dict, build=False):
    """
    Create ip/ipv6 prefix lists as per the config provided in input_dict.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `input_dict` : Input dict data, required when configuring from testcase
    * `build` : Only for initial setup phase this is set as True.

    Usage
    -----
    "prefix_lists" maps address type -> prefix-list name -> list of entries.
    Keys read per entry:
    # network: criteria for applying prefix-list (mandatory)
    # action: permit/deny (mandatory)
    # seqid: prefix-list seqid, obtained from get_seq_id() if not given
    # le: less than or equal number of bits
    # ge: greater than or equal number of bits
    # delete: True if config is to be removed. Default False.

    input_dict = {
        "r1": {
            "prefix_lists": {
                "ipv4": {
                    "pf_list_1": [
                        {
                            "seqid": 10,
                            "network": "any",
                            "action": "permit",
                            "le": "32",
                            "ge": "30",
                            "delete": True
                        }
                    ]
                }
            }
        }
    }

    Returns
    -------
    errormsg(str) when an entry lacks 'action'/'network' or InvalidCLIError
    is raised, otherwise the result of create_common_configuration() for the
    last router configured (False when no router carried "prefix_lists").
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    result = False
    # Same as the sibling create_* APIs: never write defaults into the
    # dictionary owned by the caller.
    input_dict = deepcopy(input_dict)

    try:
        for router in input_dict.keys():
            if "prefix_lists" not in input_dict[router]:
                errormsg = "prefix_lists not present in input_dict"
                logger.debug(errormsg)
                continue

            config_data = []
            prefix_lists = input_dict[router]["prefix_lists"]
            for addr_type, prefix_data in prefix_lists.items():
                if not check_address_types(addr_type):
                    continue

                protocol = "ip" if addr_type == "ipv4" else "ipv6"

                for prefix_name, prefix_list in prefix_data.items():
                    for prefix_dict in prefix_list:
                        if "action" not in prefix_dict or "network" not in prefix_dict:
                            errormsg = "'action' or 'network' missing in input_dict"
                            logger.error(errormsg)
                            return errormsg

                        network_addr = prefix_dict["network"]
                        action = prefix_dict["action"]
                        le = prefix_dict.get("le")
                        ge = prefix_dict.get("ge")
                        seqid = prefix_dict.get("seqid")
                        del_action = prefix_dict.get("delete", False)

                        # Either allocate the next sequence id or record the
                        # one chosen by the caller.
                        if seqid is None:
                            seqid = get_seq_id("prefix_lists", router, prefix_name)
                        else:
                            set_seq_id("prefix_lists", router, seqid, prefix_name)

                        cmd = "{} prefix-list {} seq {} {} {}".format(
                            protocol, prefix_name, seqid, action, network_addr
                        )
                        if le:
                            cmd = "{} le {}".format(cmd, le)
                        if ge:
                            cmd = "{} ge {}".format(cmd, ge)

                        if del_action:
                            cmd = "no {}".format(cmd)

                        config_data.append(cmd)

            result = create_common_configuration(
                tgen, router, config_data, "prefix_list", build=build
            )

    except InvalidCLIError:
        # Traceback
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return result
def _route_map_set_cmds(set_data):
    """
    Translate the "set" section of one route-map entry into CLI lines.

    Returns the list of commands, or None when a mandatory key is missing
    (the problem is logged before returning).
    """
    cmds = []

    # Local Preference
    local_preference = set_data.get("locPrf")
    if local_preference:
        cmds.append("set local-preference {}".format(local_preference))

    # Metric
    metric = set_data.get("metric")
    if metric:
        cmds.append("set metric {} \n".format(metric))

    # Origin
    origin = set_data.get("origin")
    if origin:
        cmds.append("set origin {} \n".format(origin))

    # AS Path: emitted only when both action and AS number are given
    as_path = set_data.get("path", {})
    if as_path:
        as_num = as_path.get("as_num")
        as_action = as_path.get("as_action")
        if as_action and as_num:
            cmds.append("set as-path {} {}".format(as_action, as_num))

    # Community and large community share the same shape: num [action]
    for key, keyword in (
        ("community", "community"),
        ("large_community", "large-community"),
    ):
        comm = set_data.get(key, {})
        if not comm:
            continue
        num = comm.get("num")
        if not num:
            logger.error("In {}, AS Num not provided".format(key))
            return None
        cmd = "set {} {}".format(keyword, num)
        comm_action = comm.get("action")
        if comm_action:
            cmd = "{} {}".format(cmd, comm_action)
        cmds.append(cmd)

    # Large community list
    large_comm_list = set_data.get("large_comm_list", {})
    if large_comm_list:
        list_id = large_comm_list.get("id")
        if not list_id:
            logger.error("In large_comm_list 'id' not provided")
            return None
        cmd = "set large-comm-list {}".format(list_id)
        if large_comm_list.get("delete"):
            cmd = "{} delete".format(cmd)
        cmds.append(cmd)

    # Extended community (route target)
    ext_comm_list = set_data.get("extcommunity", {})
    if ext_comm_list:
        rt = ext_comm_list.get("rt")
        if not rt:
            logger.error("In ext_comm_list 'rt' not provided")
            return None
        cmd = "set extcommunity rt {}".format(rt)
        if ext_comm_list.get("delete"):
            cmd = "{} delete".format(cmd)
        cmds.append(cmd)

    # Weight
    weight = set_data.get("weight")
    if weight:
        cmds.append("set weight {}".format(weight))

    # IPv6 next-hop
    ipv6_data = set_data.get("ipv6", {})
    if ipv6_data:
        nexthop = ipv6_data.get("nexthop")
        if nexthop:
            cmds.append("set ipv6 next-hop {}".format(nexthop))

    return cmds


def _route_map_match_cmds(match_data):
    """
    Translate the "match" section of one route-map entry into CLI lines.

    Returns the list of commands, or None when a community criterion lacks
    its mandatory 'id' (the problem is logged before returning).
    """
    cmds = []
    community = match_data.get("community_list", {})
    large_community = match_data.get("large_community", {})
    large_community_list = match_data.get("large_community_list", {})

    for family_key, keyword in (("ipv4", "ip"), ("ipv6", "ipv6")):
        family_data = match_data.get(family_key, {})
        if not family_data:
            continue

        # fetch prefix list data from rmap
        prefix_name = family_data.get("prefix_lists")
        if prefix_name:
            cmds.append(
                "match {} address prefix-list {}".format(keyword, prefix_name)
            )

        # fetch tag data from rmap
        tag = family_data.get("tag")
        if tag:
            cmds.append("match tag {}".format(tag))

        # A large_community_list given under the address family replaces
        # the one given directly under "match" (kept from the original
        # flow; the last non-empty family wins).
        large_community_list = family_data.get("large_community_list", {})

    for criteria, keyword, label in (
        (community, "community", "community-list"),
        (large_community, "large-community", "large-community-list"),
        (large_community_list, "large-community", "large-community-list"),
    ):
        if not criteria:
            continue
        if "id" not in criteria:
            logger.error("'id' is mandatory for {} in match criteria".format(label))
            return None
        cmd = "match {} {}".format(keyword, criteria["id"])
        if criteria.get("exact_match", False):
            cmd = "{} exact-match".format(cmd)
        cmds.append(cmd)

    source_vrf = match_data.get("source-vrf")
    if source_vrf:
        cmds.append("match source-vrf {}".format(source_vrf))

    metric = match_data.get("metric")
    if metric:
        cmds.append("match metric {}".format(metric))

    return cmds


def create_route_maps(tgen, input_dict, build=False):
    """
    Create route-maps on the devices as per the arguments passed.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `input_dict` : Input dict data, required when configuring from testcase
    * `build` : Only for initial setup phase this is set as True.

    Usage
    -----
    "route_maps" maps a route-map name to a list of entries. Keys read per
    entry:
    # action: permit/deny (mandatory unless "delete" is set)
    # seq_id: sequence number, obtained from get_seq_id() if not given
    # delete: True removes the whole route-map
    # continue / goto: sequence number for "on-match goto"
    # call: name of the route-map to call
    # match: criteria dict with keys
    #     ipv4 / ipv6: {"prefix_lists": name, "tag": id,
    #                   "large_community_list": {...}}
    #     community_list / large_community / large_community_list:
    #                   {"id": name, "exact_match": bool}
    #     source-vrf, metric
    # set: attribute dict with keys
    #     locPrf, metric, origin, weight
    #     path: {"as_num": ..., "as_action": ...}
    #     community / large_community: {"num": ..., "action": ...}
    #     large_comm_list: {"id": ..., "delete": bool}
    #     extcommunity: {"rt": ..., "delete": bool}
    #     ipv6: {"nexthop": ...}

    input_dict = {
        "r1": {
            "route_maps": {
                "rmap_match_prefix_list_1": [
                    {
                        "action": "PERMIT",
                        "match": {
                            "ipv4": {"prefix_lists": "pf_list_1"},
                            "large_community_list": {
                                "id": "community_1",
                                "exact_match": True
                            }
                        },
                        "set": {
                            "locPrf": 150,
                            "path": {"as_num": 500, "as_action": "prepend"},
                            "large_community": {
                                "num": "1:2:3 4:5;6",
                                "action": "additive"
                            }
                        }
                    }
                ]
            }
        }
    }

    Returns
    -------
    False when an entry is malformed, errormsg(str) when InvalidCLIError is
    raised, otherwise the result of create_common_configuration() for the
    last router configured.
    """
    # (input key, CLI template, message logged when the value is empty)
    flow_clauses = (
        (
            "continue",
            "on-match goto {}",
            "In continue, 'route-map entry sequence number' is not provided",
        ),
        ("goto", "on-match goto {}", "In goto, 'Goto Clause number' is not provided"),
        ("call", "call {}", "In call, 'destination Route-Map' is not provided"),
    )

    result = False
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    input_dict = deepcopy(input_dict)
    try:
        for router in input_dict.keys():
            if "route_maps" not in input_dict[router]:
                logger.debug("route_maps not present in input_dict")
                continue

            rmap_data = []
            for rmap_name, rmap_value in input_dict[router]["route_maps"].items():
                for rmap_dict in rmap_value:
                    if rmap_dict.get("delete", False):
                        rmap_data.append("no route-map {}".format(rmap_name))
                        continue

                    if "action" not in rmap_dict:
                        errormsg = "action not present in input_dict"
                        logger.error(errormsg)
                        return False

                    rmap_action = rmap_dict["action"]

                    # Either allocate the next sequence id or record the
                    # one chosen by the caller.
                    seq_id = rmap_dict.get("seq_id")
                    if seq_id is None:
                        seq_id = get_seq_id("route_maps", router, rmap_name)
                    else:
                        set_seq_id("route_maps", router, seq_id, rmap_name)

                    rmap_data.append(
                        "route-map {} {} {}".format(rmap_name, rmap_action, seq_id)
                    )

                    for key, template, missing_msg in flow_clauses:
                        if key not in rmap_dict:
                            continue
                        if not rmap_dict[key]:
                            logger.error(missing_msg)
                            return False
                        rmap_data.append(template.format(rmap_dict[key]))

                    # Verifying if SET criteria is defined
                    if "set" in rmap_dict:
                        set_cmds = _route_map_set_cmds(rmap_dict["set"])
                        if set_cmds is None:
                            return False
                        rmap_data.extend(set_cmds)

                    # Adding MATCH sequence to RMAP if defined
                    if "match" in rmap_dict:
                        match_cmds = _route_map_match_cmds(rmap_dict["match"])
                        if match_cmds is None:
                            return False
                        rmap_data.extend(match_cmds)

            result = create_common_configuration(
                tgen, router, rmap_data, "route_maps", build=build
            )

    except InvalidCLIError:
        # Traceback
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return result
def delete_route_maps(tgen, input_dict):
    """
    Delete route-maps from devices.

    * `tgen` : Topogen object
    * `input_dict` : per router, the names of the route-maps to delete

    Usage
    -----
    # Delete route-map rmap_1 and rmap_2 from router r1
    input_dict = {
        "r1": {
            "route_maps": ["rmap_1", "rmap_2"]
        }
    }
    result = delete_route_maps(tgen, input_dict)

    Returns
    -------
    Whatever create_route_maps() returns for the generated delete request.
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    # Build the delete request on a copy: the caller's dictionary keeps its
    # list of names, so it can be reused for a later call.
    request = deepcopy(input_dict)
    for router in request.keys():
        names = list(request[router]["route_maps"])
        request[router]["route_maps"] = dict(
            (name, [{"delete": True}]) for name in names
        )

    return create_route_maps(tgen, request)
def create_bgp_community_lists(tgen, input_dict, build=False):
    """
    Create bgp community-list or large-community-list entries on the devices
    as per the arguments passed. Takes a list of communities in input.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `input_dict` : Input dict data, required when configuring from testcase
    * `build` : Only for initial setup phase this is set as True.

    Usage
    -----
    input_dict_1 = {
        "r3": {
            "bgp_community_lists": [
                {
                    "community_type": "standard",
                    "action": "permit",
                    "name": "rmap_lcomm_{}".format(addr_type),
                    "value": "1:1:1 1:2:3 2:1:1 2:2:2",
                    "large": True
                }
            ]
        }
    }
    result = create_bgp_community_lists(tgen, input_dict_1)

    Returns
    -------
    False when a non-large entry lacks community_type/action/value,
    errormsg(str) when InvalidCLIError is raised, otherwise the result of
    create_common_configuration() for the last router configured.
    """
    result = False
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    input_dict = deepcopy(input_dict)

    try:
        for router in input_dict.keys():
            router_data = input_dict[router]
            if "bgp_community_lists" not in router_data:
                errormsg = "bgp_community_lists not present in input_dict"
                logger.debug(errormsg)
                continue

            config_data = []

            for entry in router_data["bgp_community_lists"]:
                remove = entry.setdefault("delete", False)
                kind = entry.setdefault("community_type", None)
                action = entry.setdefault("action", None)
                value = entry.setdefault("value", "")
                large = entry.setdefault("large", None)
                name = entry.setdefault("name", None)

                base = "bgp large-community-list" if large else "bgp community-list"

                if not (large or (kind and action and value)):
                    errormsg = (
                        "community_type, action and value are "
                        "required in bgp_community_list"
                    )
                    logger.error(errormsg)
                    return False

                # A numeric community_type is a numbered list (no name);
                # anything else is a named list.
                try:
                    kind = int(kind)
                except ValueError:
                    line = "{} {} {} {} {}".format(base, kind, name, action, value)
                else:
                    line = "{} {} {} {}".format(base, kind, action, value)

                if remove:
                    line = "no {}".format(line)

                config_data.append(line)

            result = create_common_configuration(
                tgen, router, config_data, "bgp_community_list", build=build
            )

    except InvalidCLIError:
        # Traceback
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return result
def shutdown_bringup_interface(tgen, dut, intf_name, ifaceaction=False):
    """
    Shutdown or bringup router's interface.

    * `tgen` : Topogen object
    * `dut` : Device under test
    * `intf_name` : Interface name to be shut/no shut
    * `ifaceaction` : Action, to shut/no shut interface,
                      by default is False (shut down)

    Usage
    -----
    dut = "r3"
    intf = "r3-r1-eth0"
    # Shut down interface
    shutdown_bringup_interface(tgen, dut, intf, False)
    # Bring up interface
    shutdown_bringup_interface(tgen, dut, intf, True)

    Returns
    -------
    None; the state change is delegated to interface_set_status().
    """
    routers = tgen.routers()

    banner = (
        "Bringing up interface : {}"
        if ifaceaction
        else "Shutting down interface : {}"
    )
    logger.info(banner.format(intf_name))

    interface_set_status(routers[dut], intf_name, ifaceaction)
def addKernelRoute(
    tgen, router, intf, group_addr_range, next_hop=None, src=None, del_action=None
):
    """
    Add (or delete) kernel routes on a router and verify them in the
    kernel routing table.

    Parameters:
    -----------
    * `tgen` : Topogen object
    * `router`: router on which the kernel routes are to be changed
    * `intf`: interface name used for "dev" routes
    * `group_addr_range`: address or list of addresses to route
    * `next_hop`: next-hop address for "via" routes
    * `src`: source address, IPv6 "dev" routes only (needs `intf` as well)
    * `del_action`: when truthy the route is deleted instead of added

    Returns:
    --------
    errormsg(str) or True
    """
    logger.debug("Entering lib API: addKernelRoute()")

    rnode = tgen.routers()[router]

    if type(group_addr_range) is not list:
        group_addr_range = [group_addr_range]

    for grp_addr in group_addr_range:
        addr_type = validate_ip_address(grp_addr)
        if addr_type == "ipv4":
            if next_hop is not None:
                cmd = "ip route add {} via {}".format(grp_addr, next_hop)
            else:
                cmd = "ip route add {} dev {}".format(grp_addr, intf)
            if del_action:
                cmd = "ip route del {}".format(grp_addr)
            verify_cmd = "ip route"
        elif addr_type == "ipv6":
            if intf and src:
                cmd = "ip -6 route add {} dev {} src {}".format(grp_addr, intf, src)
            else:
                cmd = "ip -6 route add {} via {}".format(grp_addr, next_hop)
            verify_cmd = "ip -6 route"
            if del_action:
                cmd = "ip -6 route del {}".format(grp_addr)
        else:
            # Without a recognised family there is no command to run.
            errormsg = "[DUT: {}]: Invalid address {} for kernel route".format(
                router, grp_addr
            )
            logger.error(errormsg)
            return errormsg

        logger.info("[DUT: {}]: Running command: [{}]".format(router, cmd))
        output = rnode.run(cmd)

        # Verifying if ip route added to kernel
        result = rnode.run(verify_cmd)
        logger.debug("{}\n{}".format(verify_cmd, result))

        # Reset per address so a mask from a previous iteration is never
        # reused for an address written without a prefix length.
        mask = None
        if "/" in grp_addr:
            ip, mask = grp_addr.split("/")
            # Host routes are matched without the prefix length.
            if mask == "32" or mask == "128":
                grp_addr = ip

        # NOTE(review): this check also runs after a delete, where the
        # route is expected to be gone - confirm the intended behaviour.
        if not re_search(r"{}".format(grp_addr), result) and mask != "0":
            errormsg = (
                "[DUT: {}]: Kernal route is not added for group"
                " address {} Config output: {}".format(router, grp_addr, output)
            )

            return errormsg

    logger.debug("Exiting lib API: addKernelRoute()")
    return True
def configure_vxlan(tgen, input_dict):
    """
    Add and configure vxlan

    * `tgen`: tgen object
    * `input_dict` : data for vxlan config

    Usage:
    ------
    "vxlan_name" and "vxlan_id" may each be a single value or a list; lists
    are paired element by element.

    input_dict= {
        "dcg2":{
            "vxlan":[{
                "vxlan_name": "vxlan75100",
                "vxlan_id": "75100",
                "dstport": 4789,
                "local_addr": "120.0.0.1",
                "learning": "no",
                "delete": True
            }]
        }
    }

    configure_vxlan(tgen, input_dict)

    Returns:
    -------
    True or errormsg
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    router_list = tgen.routers()
    for dut in input_dict.keys():
        rnode = router_list[dut]

        if "vxlan" not in input_dict[dut]:
            continue

        for vxlan_dict in input_dict[dut]["vxlan"]:
            del_vxlan = vxlan_dict.get("delete")
            vxlan_names = vxlan_dict.get("vxlan_name", [])
            vxlan_ids = vxlan_dict.get("vxlan_id", [])
            dstport = vxlan_dict.get("dstport")
            local_addr = vxlan_dict.get("local_addr")
            learning = vxlan_dict.get("learning")

            # A bare name/id (as in the usage example) must be treated as
            # one vxlan; zipping two strings would pair their characters.
            if not isinstance(vxlan_names, (list, tuple)):
                vxlan_names = [vxlan_names]
            if not isinstance(vxlan_ids, (list, tuple)):
                vxlan_ids = [vxlan_ids]

            if not (vxlan_names and vxlan_ids):
                continue

            for vxlan_name, vxlan_id in zip(vxlan_names, vxlan_ids):
                config_data = []

                link_action = "del" if del_vxlan else "add"
                cmd = "ip link {} {} type vxlan id {}".format(
                    link_action, vxlan_name, vxlan_id
                )

                if dstport:
                    cmd = "{} dstport {}".format(cmd, dstport)

                if local_addr:
                    addr_action = "del" if del_vxlan else "add"
                    # NOTE(review): the address command is queued ahead of
                    # the link command, as in the original flow; for "add"
                    # the device does not exist yet - confirm intended order.
                    config_data.append(
                        "ip addr {} {} dev {}".format(
                            addr_action, local_addr, vxlan_name
                        )
                    )
                    cmd = "{} local {}".format(cmd, local_addr)

                if learning == "no":
                    cmd = "{} {} learning".format(cmd, learning)
                elif learning == "yes":
                    cmd = "{} learning".format(cmd)

                config_data.append(cmd)

                try:
                    for _cmd in config_data:
                        logger.info("[DUT: %s]: Running command: %s", dut, _cmd)
                        rnode.run(_cmd)

                except InvalidCLIError:
                    # Traceback
                    errormsg = traceback.format_exc()
                    logger.error(errormsg)
                    return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))

    return True
def configure_brctl(tgen, topo, input_dict):
    """
    Add and configure brctl

    * `tgen`: tgen object
    * `topo`: topology data; topo["routers"][dut]["links"] is scanned for
      interfaces whose "vrf" equals the bridge's vrf
    * `input_dict` : data for brctl config

    Usage:
    ------
    "brctl_name", "addvxlan", "vrf" and "stp" may each be a single value or
    a list; lists are paired element by element. "addvxlan", "vrf" and
    "stp" are optional.

    input_dict= {
        dut:{
            "brctl": [{
                "brctl_name": "br100",
                "addvxlan": "vxlan75100",
                "vrf": "RED",
                "stp": "off"
            }]
        }
    }

    configure_brctl(tgen, topo, input_dict)

    Returns:
    -------
    True or errormsg
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    router_list = tgen.routers()
    ip_cmd = "ip link set"

    for dut in input_dict.keys():
        rnode = router_list[dut]

        if "brctl" not in input_dict[dut]:
            continue

        for brctl_dict in input_dict[dut]["brctl"]:
            brctl_names = brctl_dict.get("brctl_name", [])
            if not isinstance(brctl_names, (list, tuple)):
                brctl_names = [brctl_names]

            # Optional per-bridge values: accept a bare value and pad short
            # lists with None so a missing key does not drop the bridge.
            columns = []
            for key in ("addvxlan", "vrf", "stp"):
                values = brctl_dict.get(key, [])
                if not isinstance(values, (list, tuple)):
                    values = [values]
                values = list(values)
                values.extend([None] * (len(brctl_names) - len(values)))
                columns.append(values)
            addvxlans, vrfs, stp_values = columns

            for brctl_name, vxlan, vrf, stp in zip(
                brctl_names, addvxlans, vrfs, stp_values
            ):
                ip_cmd_list = []
                cmd = "brctl addbr {}".format(brctl_name)

                logger.info("[DUT: %s]: Running command: %s", dut, cmd)
                rnode.run(cmd)

                ip_cmd_list.append("{} up dev {}".format(ip_cmd, brctl_name))

                if vxlan:
                    cmd = "brctl addif {} {}".format(brctl_name, vxlan)

                    logger.info("[DUT: %s]: Running command: %s", dut, cmd)
                    rnode.run(cmd)

                    ip_cmd_list.append("{} up dev {}".format(ip_cmd, vxlan))

                if stp:
                    cmd = "brctl stp {} {}".format(brctl_name, stp)

                    logger.info("[DUT: %s]: Running command: %s", dut, cmd)
                    rnode.run(cmd)

                if vrf:
                    ip_cmd_list.append(
                        "{} dev {} master {}".format(ip_cmd, brctl_name, vrf)
                    )

                    # Bring up every topology link that lives in this vrf.
                    for data in topo["routers"][dut]["links"].values():
                        if "vrf" not in data:
                            continue

                        if data["vrf"] == vrf:
                            ip_cmd_list.append(
                                "{} up dev {}".format(ip_cmd, data["interface"])
                            )

                try:
                    for _ip_cmd in ip_cmd_list:
                        logger.info("[DUT: %s]: Running command: %s", dut, _ip_cmd)
                        rnode.run(_ip_cmd)

                except InvalidCLIError:
                    # Traceback
                    errormsg = traceback.format_exc()
                    logger.error(errormsg)
                    return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
def configure_interface_mac(tgen, input_dict):
    """
    Set the MAC address of interfaces with `ifconfig <intf> hw ether <mac>`.

    * `tgen`: tgen object
    * `input_dict` : per router, a mapping of interface name to MAC address

    Usage:
    ------
    input_mac= {
        "edge1":{
            "br75100": "00:80:48:BA:d1:00",
            "br75200": "00:80:48:BA:d1:00"
        }
    }

    configure_interface_mac(tgen, input_mac)

    Returns:
    -------
    True when every command produced no output, the command output of the
    first command that printed something, or errormsg(str) on
    InvalidCLIError.
    """
    routers = tgen.routers()

    for dut in input_dict.keys():
        node = routers[dut]

        for intf, mac in input_dict[dut].items():
            cmd = "ifconfig {} hw ether {}".format(intf, mac)
            logger.info("[DUT: %s]: Running command: %s", dut, cmd)

            try:
                output = node.run(cmd)
            except InvalidCLIError:
                # Traceback
                errormsg = traceback.format_exc()
                logger.error(errormsg)
                return errormsg

            # Any output from the command is handed back to the caller.
            if len(output) != 0:
                return output

    return True
2449 #############################################
2451 #############################################
2452 @retry(attempts
=5, wait
=2, return_is_str
=True, initial_wait
=2)
2465 Data will be read from input_dict or input JSON file, API will generate
2466 same prefixes, which were redistributed by either create_static_routes() or
2467 advertise_networks_using_network_command() and do will verify next_hop and
2468 each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json"
2473 * `tgen` : topogen object
2474 * `addr_type` : ip type, ipv4/ipv6
2475 * `dut`: Device Under Test, for which user wants to test the data
2476 * `input_dict` : input dict, has details of static routes
2477 * `next_hop`[optional]: next_hop which needs to be verified,
2479 * `protocol`[optional]: protocol, default = None
2483 # RIB can be verified for static routes OR network advertised using
2484 network command. Following are input_dicts to create static routes
2485 and advertise networks using network command. Any one of the input_dict
2486 can be passed to verify_rib() to verify routes in DUT"s RIB.
2488 # Creating static routes for r1
2491 "static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9, \
2492 "admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}]
2494 # Advertising networks using network command in router r1
2497 "advertise_networks": [{"start_ip": "20.0.0.0/32",
2498 "no_of_network": 10},
2499 {"start_ip": "30.0.0.0/32"}]
2501 # Verifying ipv4 routes in router r1 learned via BGP
2504 result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol)
2508 errormsg(str) or True
2511 logger
.info("Entering lib API: verify_rib()")
2513 router_list
= tgen
.routers()
2514 additional_nexthops_in_required_nhs
= []
2516 for routerInput
in input_dict
.keys():
2517 for router
, rnode
in router_list
.iteritems():
2521 logger
.info("Checking router %s RIB:", router
)
2523 # Verifying RIB routes
2524 if addr_type
== "ipv4":
2525 command
= "show ip route"
2527 command
= "show ipv6 route"
2532 if "static_routes" in input_dict
[routerInput
]:
2533 static_routes
= input_dict
[routerInput
]["static_routes"]
2535 for static_route
in static_routes
:
2536 if "vrf" in static_route
and static_route
["vrf"] is not None:
2539 "[DUT: {}]: Verifying routes for VRF:"
2540 " {}".format(router
, static_route
["vrf"])
2543 cmd
= "{} vrf {}".format(command
, static_route
["vrf"])
2546 cmd
= "{}".format(command
)
2549 cmd
= "{} {}".format(cmd
, protocol
)
2551 cmd
= "{} json".format(cmd
)
2553 rib_routes_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
2555 # Verifying output dictionary rib_routes_json is not empty
2556 if bool(rib_routes_json
) is False:
2557 errormsg
= "No route found in rib of router {}..".format(router
)
2560 network
= static_route
["network"]
2561 if "no_of_ip" in static_route
:
2562 no_of_ip
= static_route
["no_of_ip"]
2566 if "tag" in static_route
:
2567 _tag
= static_route
["tag"]
2571 # Generating IPs for verification
2572 ip_list
= generate_ips(network
, no_of_ip
)
2576 for st_rt
in ip_list
:
2577 st_rt
= str(ipaddress
.ip_network(unicode(st_rt
)))
2579 _addr_type
= validate_ip_address(st_rt
)
2580 if _addr_type
!= addr_type
:
2583 if st_rt
in rib_routes_json
:
2585 found_routes
.append(st_rt
)
2587 if fib
and next_hop
:
2588 if type(next_hop
) is not list:
2589 next_hop
= [next_hop
]
2591 for mnh
in range(0, len(rib_routes_json
[st_rt
])):
2594 in rib_routes_json
[st_rt
][mnh
]["nexthops"][0]
2599 for rib_r
in rib_routes_json
[st_rt
][
2606 missing_list_of_nexthops
= set(
2608 ).difference(next_hop
)
2609 additional_nexthops_in_required_nhs
= set(
2611 ).difference(found_hops
[0])
2613 if additional_nexthops_in_required_nhs
:
2616 "%s is not active for route %s in "
2617 "RIB of router %s\n",
2618 additional_nexthops_in_required_nhs
,
2623 "Nexthop {} is not active"
2624 " for route {} in RIB of router"
2626 additional_nexthops_in_required_nhs
,
2635 elif next_hop
and fib
is None:
2636 if type(next_hop
) is not list:
2637 next_hop
= [next_hop
]
2640 for rib_r
in rib_routes_json
[st_rt
][0]["nexthops"]
2644 missing_list_of_nexthops
= set(
2646 ).difference(next_hop
)
2647 additional_nexthops_in_required_nhs
= set(
2649 ).difference(found_hops
)
2651 if additional_nexthops_in_required_nhs
:
2653 "Missing nexthop %s for route"
2654 " %s in RIB of router %s\n",
2655 additional_nexthops_in_required_nhs
,
2660 "Nexthop {} is Missing for "
2661 "route {} in RIB of router {}\n".format(
2662 additional_nexthops_in_required_nhs
,
2672 if "tag" not in rib_routes_json
[st_rt
][0]:
2674 "[DUT: {}]: tag is not"
2676 " route {} in RIB \n".format(dut
, st_rt
)
2680 if _tag
!= rib_routes_json
[st_rt
][0]["tag"]:
2682 "[DUT: {}]: tag value {}"
2683 " is not matched for"
2684 " route {} in RIB \n".format(dut
, _tag
, st_rt
,)
2688 if metric
is not None:
2689 if "metric" not in rib_routes_json
[st_rt
][0]:
2691 "[DUT: {}]: metric is"
2693 " route {} in RIB \n".format(dut
, st_rt
)
2697 if metric
!= rib_routes_json
[st_rt
][0]["metric"]:
2699 "[DUT: {}]: metric value "
2700 "{} is not matched for "
2701 "route {} in RIB \n".format(dut
, metric
, st_rt
,)
2706 missing_routes
.append(st_rt
)
2710 "[DUT: {}]: Found next_hop {} for all bgp"
2711 " routes in RIB".format(router
, next_hop
)
2714 if len(missing_routes
) > 0:
2715 errormsg
= "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
2722 "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
2729 if "bgp" in input_dict
[routerInput
]:
2731 "advertise_networks"
2732 not in input_dict
[routerInput
]["bgp"]["address_family"][addr_type
][
2740 advertise_network
= input_dict
[routerInput
]["bgp"]["address_family"][
2742 ]["unicast"]["advertise_networks"]
2744 # Continue if there are no network advertise
2745 if len(advertise_network
) == 0:
2748 for advertise_network_dict
in advertise_network
:
2749 if "vrf" in advertise_network_dict
:
2750 cmd
= "{} vrf {} json".format(command
, static_route
["vrf"])
2752 cmd
= "{} json".format(command
)
2754 rib_routes_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
2756 # Verifying output dictionary rib_routes_json is not empty
2757 if bool(rib_routes_json
) is False:
2758 errormsg
= "No route found in rib of router {}..".format(router
)
2761 start_ip
= advertise_network_dict
["network"]
2762 if "no_of_network" in advertise_network_dict
:
2763 no_of_network
= advertise_network_dict
["no_of_network"]
2767 # Generating IPs for verification
2768 ip_list
= generate_ips(start_ip
, no_of_network
)
2772 for st_rt
in ip_list
:
2773 st_rt
= str(ipaddress
.ip_network(unicode(st_rt
)))
2775 _addr_type
= validate_ip_address(st_rt
)
2776 if _addr_type
!= addr_type
:
2779 if st_rt
in rib_routes_json
:
2781 found_routes
.append(st_rt
)
2784 if type(next_hop
) is not list:
2785 next_hop
= [next_hop
]
2789 for nh_dict
in rib_routes_json
[st_rt
][0]["nexthops"]:
2790 if nh_dict
["ip"] != nh
:
2795 if count
== len(next_hop
):
2799 "Nexthop {} is Missing"
2801 "RIB of router {}\n".format(next_hop
, st_rt
, dut
)
2805 missing_routes
.append(st_rt
)
2809 "Found next_hop {} for all routes in RIB"
2810 " of router {}\n".format(next_hop
, dut
)
2813 if len(missing_routes
) > 0:
2815 "Missing {} route in RIB of router {}, "
2816 "routes: {} \n".format(addr_type
, dut
, missing_routes
)
2822 "Verified {} routes in router {} RIB, found"
2823 " routes are: {}\n".format(addr_type
, dut
, found_routes
)
2826 logger
.info("Exiting lib API: verify_rib()")
def verify_admin_distance_for_static_routes(tgen, input_dict):
    """
    API to verify admin distance for static routes as defined in input_dict/
    input JSON by running show ip/ipv6 route json command.

    Parameter
    ---------
    * `tgen` : topogen object
    * `input_dict`: having details like - for which router and static routes
                    admin distance needs to be verified

    Usage
    -----
    # To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop
    10.0.0.2 in router r1
    input_dict = {
        "r1": {
            "static_routes": [{
                "network": "10.0.20.1/32",
                "admin_distance": 10,
                "next_hop": "10.0.0.2"
            }]
        }
    }
    result = verify_admin_distance_for_static_routes(tgen, input_dict)

    Returns
    -------
    errormsg(str) or True
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    for router in input_dict.keys():
        if router not in tgen.routers():
            continue

        rnode = tgen.routers()[router]

        for static_route in input_dict[router]["static_routes"]:
            addr_type = validate_ip_address(static_route["network"])
            # Command to execute
            if addr_type == "ipv4":
                command = "show ip route json"
            else:
                command = "show ipv6 route json"
            show_ip_route_json = run_frr_cmd(rnode, command, isjson=True)

            logger.info(
                "Verifying admin distance for static route %s" " under dut %s:",
                static_route,
                router,
            )
            network = static_route["network"]
            next_hop = static_route["next_hop"]
            admin_distance = static_route["admin_distance"]

            # Check membership before indexing so a missing route is
            # reported as an error message instead of raising KeyError.
            if network not in show_ip_route_json:
                errormsg = (
                    "Static route {} not found in "
                    "show_ip_route_json for dut {}".format(network, router)
                )
                return errormsg

            route_data = show_ip_route_json[network][0]
            # Only the entry whose first next-hop matches is compared.
            if route_data["nexthops"][0]["ip"] != next_hop:
                continue

            if route_data["distance"] != admin_distance:
                errormsg = (
                    "Verification failed: admin distance"
                    " for static route {} under dut {},"
                    " found:{} but expected:{}".format(
                        network,
                        router,
                        route_data["distance"],
                        admin_distance,
                    )
                )
                return errormsg

            logger.info(
                "Verification successful: admin"
                " distance for static route %s under"
                " dut %s, found:%s",
                network,
                router,
                route_data["distance"],
            )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
def verify_prefix_lists(tgen, input_dict):
    """
    Running "show ip prefix-list" / "show ipv6 prefix-list" command and
    verifying that the given prefix-lists are NOT present in the router
    (i.e. they have been deleted).

    Parameters
    ----------
    * `tgen` : topogen object
    * `input_dict`: data to verify prefix lists; for every router a
                    "prefix_lists" dict keyed by address type, whose value
                    is a dict keyed by prefix-list name (only the names are
                    used by this API)

    Usage
    -----
    # To verify pf_list_1 is not present in router r1
    input_dict = {
        "r1": {
            "prefix_lists": {
                "ipv4": {
                    "pf_list_1": {}
                }
            }
        }
    }
    result = verify_prefix_lists(tgen, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    for router in input_dict.keys():
        if router not in tgen.routers():
            continue

        rnode = tgen.routers()[router]

        # Verify Prefix list is deleted
        prefix_lists_addr = input_dict[router]["prefix_lists"]
        for addr_type in prefix_lists_addr:
            if not check_address_types(addr_type):
                continue

            # Query the prefix-lists of the address family being verified;
            # "show ip prefix-list" alone never lists IPv6 prefix-lists.
            if addr_type == "ipv4":
                command = "show ip prefix-list"
            else:
                command = "show {} prefix-list".format(addr_type)
            show_prefix_list = run_frr_cmd(rnode, command)

            for prefix_list in prefix_lists_addr[addr_type].keys():
                # Plain substring match against the CLI text output.
                if prefix_list in show_prefix_list:
                    errormsg = (
                        "Prefix list {} is/are present in the router"
                        " {}".format(prefix_list, router)
                    )
                    return errormsg

                logger.info(
                    "Prefix list %s is/are not present in the router" " from router %s",
                    prefix_list,
                    router,
                )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
@retry(attempts=2, wait=4, return_is_str=True, initial_wait=2)
def verify_route_maps(tgen, input_dict):
    """
    Run "show route-map" on every router named in input_dict and check that
    none of the listed route-maps appears in the output, i.e. that they have
    been deleted.

    Parameters
    ----------
    * `tgen` : topogen object
    * `input_dict`: per-router list of route-map names to look for

    Usage
    -----
    # To verify rmap_1 and rmap_2 are no longer present in router r1
    input_dict = {
        "r1": {
            "route_maps": ["rmap_1", "rmap_2"]
        }
    }
    result = verify_route_maps(tgen, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    api_name = sys._getframe().f_code.co_name
    logger.debug("Entering lib API: {}".format(api_name))

    for router in input_dict:
        topo_routers = tgen.routers()
        if router not in topo_routers:
            # Unknown routers are ignored rather than reported.
            continue

        cli_output = topo_routers[router].vtysh_cmd("show route-map")

        # Any name still visible in the CLI text means deletion failed.
        rmap_names = input_dict[router]["route_maps"]
        for rmap_name in rmap_names:
            if rmap_name in cli_output:
                return "Route map {} is not deleted from router {}".format(
                    rmap_name, router
                )

        logger.info(
            "Route map %s is/are deleted successfully from router %s",
            rmap_names,
            router,
        )

    logger.debug("Exiting lib API: {}".format(api_name))
    return True
@retry(attempts=3, wait=4, return_is_str=True)
def verify_bgp_community(tgen, addr_type, router, network, input_dict=None):
    """
    API to verify BGP (large) community is attached in route for any given
    DUT by running "show bgp ipv4/6 {route address} json" command.

    Parameters
    ----------
    * `tgen`: topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `router`: Device Under Test
    * `network`: list of networks for which set criteria needs to be
                 verified
    * `input_dict`: optional dict of community attribute name (e.g.
                    "largeCommunity" / "community") to the expected
                    community string; when None only the presence of a
                    community attribute is verified

    Usage
    -----
    networks = ["200.50.2.0/32"]
    input_dict = {
        "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
    }
    result = verify_bgp_community(tgen, "ipv4", dut, networks, input_dict)

    Returns
    -------
    errormsg(str) or True; False when `router` is not part of the topology
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    logger.debug(
        "Verifying BGP community attributes on dut %s: for %s " "network %s",
        router,
        addr_type,
        network,
    )

    for net in network:
        cmd = "show bgp {} {} json".format(addr_type, net)
        show_bgp_json = rnode.vtysh_cmd(cmd, isjson=True)
        logger.info(show_bgp_json)
        if "paths" not in show_bgp_json:
            return "Prefix {} not found in BGP table of router: {}".format(net, router)

        found = False
        for path in show_bgp_json["paths"]:
            if "largeCommunity" not in path and "community" not in path:
                continue

            found = True
            logger.info(
                "Large Community attribute is found for route:" " %s in router: %s",
                net,
                router,
            )
            if input_dict is None:
                continue

            for criteria, comm_val in input_dict.items():
                # A path may carry only one of the community attributes;
                # report a requested-but-absent attribute as a mismatch
                # instead of raising KeyError.
                attr = path.get(criteria)
                show_val = attr["string"] if attr is not None else None
                if attr is not None and comm_val == show_val:
                    logger.info(
                        "Verifying BGP %s for prefix: %s"
                        " in router: %s, found expected"
                        " value: %s",
                        criteria,
                        net,
                        router,
                        comm_val,
                    )
                else:
                    errormsg = (
                        "Failed: Verifying BGP attribute"
                        " {} for route: {} in router: {}"
                        ", expected value: {} but found"
                        ": {}".format(criteria, net, router, comm_val, show_val)
                    )
                    return errormsg

        if not found:
            errormsg = (
                "Large Community attribute is not found for route: "
                "{} in router: {} ".format(net, router)
            )
            return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
def verify_create_community_list(tgen, input_dict):
    """
    Check, for every router in input_dict, that each listed large community
    list exists by running
    "show bgp large-community-list {comm_name} detail" and looking for both
    the list name and its type in the output.

    Parameters
    ----------
    * `tgen`: topogen object
    * `input_dict`: per-router "bgp_community_lists" entries, each carrying
                    at least "name" and "community_type"

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp_community_lists": [{
                "name": "Test1",
                "community_type": "standard"
            }]
        }
    }
    result = verify_create_community_list(tgen, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    api_name = sys._getframe().f_code.co_name
    logger.debug("Entering lib API: {}".format(api_name))

    for router in input_dict:
        topo_routers = tgen.routers()
        if router not in topo_routers:
            continue

        rnode = topo_routers[router]
        logger.info("Verifying large-community is created for dut %s:", router)

        for entry in input_dict[router]["bgp_community_lists"]:
            comm_name = entry["name"]
            comm_type = entry["community_type"]
            cli_output = run_frr_cmd(
                rnode, "show bgp large-community-list {} detail".format(comm_name)
            )

            # Both the list name and its type must show up in the CLI text.
            if comm_name not in cli_output or comm_type not in cli_output:
                return "BGP {} large-community-list {} is not created".format(
                    comm_type, comm_name
                )

            logger.info(
                "BGP %s large-community-list %s is created", comm_type, comm_name
            )

    logger.debug("Exiting lib API: {}".format(api_name))
    return True
def verify_cli_json(tgen, input_dict):
    """
    Check that each listed CLI produces JSON output when "json" is appended
    to it.

    Parameters
    ----------
    * `tgen`: topogen object
    * `input_dict`: per-router list of CLIs for which JSON needs to be
                    verified

    Usage
    -----
    input_dict = {
        "edge1": {
            "cli": ["show evpn vni detail", "show evpn rmac vni all"]
        }
    }
    result = verify_cli_json(tgen, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    api_name = sys._getframe().f_code.co_name
    logger.debug("Entering lib API: {}".format(api_name))

    for dut, dut_data in input_dict.items():
        rnode = tgen.routers()[dut]

        for cli in dut_data["cli"]:
            logger.info("[DUT: %s]: Verifying JSON is available for CLI %s :", dut, cli)

            ret_json = rnode.vtysh_cmd("{} json".format(cli), isjson=True)

            # Empty output, or an "unknown"/"Unknown" key in the parsed
            # result, both count as "no JSON support" for this CLI.
            if not bool(ret_json) or "unknown" in ret_json or "Unknown" in ret_json:
                return "CLI: %s, JSON format is not available" % cli

            logger.info("CLI : %s JSON format is available: \n %s", cli, ret_json)

    logger.debug("Exiting lib API: {}".format(api_name))
    return True
@retry(attempts=2, wait=4, return_is_str=True, initial_wait=2)
def verify_evpn_vni(tgen, input_dict):
    """
    API to verify evpn vni details using "show evpn vni detail json"
    command.

    Every VNI entry of every DUT in input_dict is verified: each attribute
    given for the VNI must match the CLI JSON and the VNI state must be
    "Up".

    Parameters
    ----------
    * `tgen`: topogen object
    * `input_dict`: having details like - for which router, evpn details
                    needs to be verified

    Usage
    -----
    input_dict = {
        "edge1": {
            "vni": [
                {
                    "name": "75100",
                    "vxlanIntf": "vxlan75100",
                    "localVtepIp": "120.1.1.1"
                }
            ]
        }
    }
    result = verify_evpn_vni(tgen, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    for dut in input_dict.keys():
        rnode = tgen.routers()[dut]

        logger.info("[DUT: %s]: Verifying evpn vni details :", dut)

        cmd = "show evpn vni detail json"
        evpn_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True)
        if not bool(evpn_all_vni_json):
            errormsg = "No output for '{}' cli".format(cmd)
            return errormsg

        if "vni" not in input_dict[dut]:
            errormsg = (
                "[DUT: %s] Failed:" " vni details are not present in input data" % (dut)
            )
            return errormsg

        for vni_dict in input_dict[dut]["vni"]:
            found = False
            vni = vni_dict["name"]
            for evpn_vni_json in evpn_all_vni_json:
                if "vni" not in evpn_vni_json:
                    errormsg = (
                        "[DUT: %s] Failed:"
                        " VNI: %s is not present in JSON" % (dut, vni)
                    )
                    return errormsg

                if evpn_vni_json["vni"] != int(vni):
                    continue

                # NOTE(review): "name" (the VNI selector read above) is also
                # iterated here; if the CLI JSON carries no "name" field this
                # comparison fails for every entry - confirm whether the
                # selector key should be excluded.
                for attribute in vni_dict.keys():
                    # A missing attribute is reported as a mismatch instead
                    # of raising KeyError.
                    found_val = evpn_vni_json.get(attribute)
                    if (
                        attribute not in evpn_vni_json
                        or vni_dict[attribute] != found_val
                    ):
                        errormsg = (
                            "[DUT: %s] Verifying "
                            "%s for VNI: %s [FAILED]||"
                            ", EXPECTED : %s "
                            " FOUND : %s"
                            % (dut, attribute, vni, vni_dict[attribute], found_val)
                        )
                        return errormsg

                    found = True
                    logger.info(
                        "[DUT: %s] Verifying"
                        " %s for VNI: %s , "
                        "Found Expected : %s ",
                        dut,
                        attribute,
                        vni,
                        found_val,
                    )

                if evpn_vni_json["state"] != "Up":
                    errormsg = (
                        "[DUT: %s] Failed: Verifying"
                        " State for VNI: %s is not Up" % (dut, vni)
                    )
                    return errormsg

            # Report a VNI that was never matched instead of falling through
            # to a bare False, so the caller always gets errormsg(str) or
            # True as documented.
            if not found:
                errormsg = (
                    "[DUT: %s] Failed:"
                    " VNI: %s is not present in JSON" % (dut, vni)
                )
                return errormsg

            logger.info(
                "[DUT %s]: Verifying VNI : %s "
                "details and state is Up [PASSED]!!",
                dut,
                vni,
            )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
3344 @retry(attempts
=2, wait
=4, return_is_str
=True, initial_wait
=2)
3345 def verify_vrf_vni(tgen
, input_dict
):
3347 API to verify vrf vni details using "show vrf vni json"
3352 * `tgen`: topogen object
3353 * `input_dict`: having details like - for which router, evpn details
3354 needs to be verified
3363 "vxlanIntf": "vxlan75100",
3365 "routerMac": "00:80:48:ba:d1:00",
3373 result = verify_vrf_vni(tgen, input_dict)
3377 errormsg(str) or True
3380 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3381 for dut
in input_dict
.keys():
3382 rnode
= tgen
.routers()[dut
]
3384 logger
.info("[DUT: %s]: Verifying vrf vni details :", dut
)
3386 cmd
= "show vrf vni json"
3387 vrf_all_vni_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
3388 if not bool(vrf_all_vni_json
):
3389 errormsg
= "No output for '{}' cli".format(cmd
)
3392 if "vrfs" in input_dict
[dut
]:
3393 for vrfs
in input_dict
[dut
]["vrfs"]:
3394 for vrf
, vrf_dict
in vrfs
.items():
3396 for vrf_vni_json
in vrf_all_vni_json
["vrfs"]:
3397 if "vrf" in vrf_vni_json
:
3398 if vrf_vni_json
["vrf"] != vrf
:
3401 for attribute
in vrf_dict
.keys():
3402 if vrf_dict
[attribute
] == vrf_vni_json
[attribute
]:
3405 "[DUT %s]: VRF: %s, "
3407 ", Found Expected: %s "
3412 vrf_vni_json
[attribute
],
3416 "[DUT: %s] VRF: %s, "
3417 "verifying %s [FAILED!!] "
3424 vrf_dict
[attribute
],
3425 vrf_vni_json
[attribute
],
3431 errormsg
= "[DUT: %s] VRF: %s " "is not present in JSON" % (
3439 "[DUT %s] Verifying VRF: %s " " details [PASSED]!!",
3447 "[DUT: %s] Failed:" " vrf details are not present in input data" % (dut
)
3451 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))