]>
git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/common_config.py
2 # Copyright (c) 2019 by VMware, Inc. ("VMware")
3 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
4 # ("NetDEF") in this file.
6 # Permission to use, copy, modify, and/or distribute this software
7 # for any purpose with or without fee is hereby granted, provided
8 # that the above copyright notice and this permission notice appear
11 # THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
12 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
14 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
15 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
16 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
17 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
21 from collections
import OrderedDict
22 from datetime
import datetime
23 from time
import sleep
24 from copy
import deepcopy
25 from subprocess
import call
26 from subprocess
import STDOUT
as SUB_STDOUT
27 from subprocess
import PIPE
as SUB_PIPE
28 from subprocess
import Popen
29 from functools
import wraps
30 from re
import search
as re_search
31 from tempfile
import mkdtemp
40 from lib
.topolog
import logger
, logger_config
41 from lib
.topogen
import TopoRouter
42 from lib
.topotest
import interface_set_status
45 FRRCFG_FILE
= "frr_json.conf"
46 FRRCFG_BKUP_FILE
= "frr_json_initial.conf"
48 ERROR_LIST
= ["Malformed", "Failure", "Unknown", "Incomplete"]
52 CD
= os
.path
.dirname(os
.path
.realpath(__file__
))
53 PYTESTINI_PATH
= os
.path
.join(CD
, "../pytest.ini")
55 # Creating tmp dir with testsuite name to avoid conflict condition when
56 # multiple testsuites run together. All temporary files would be created
57 # in this dir and this dir would be removed once testsuite run is
59 LOGDIR
= "/tmp/topotests/"
62 # NOTE: to save execution logs to log file frrtest_log_dir must be configured
64 config
= ConfigParser
.ConfigParser()
65 config
.read(PYTESTINI_PATH
)
67 config_section
= "topogen"
69 if config
.has_option("topogen", "verbosity"):
70 loglevel
= config
.get("topogen", "verbosity")
71 loglevel
= loglevel
.upper()
75 if config
.has_option("topogen", "frrtest_log_dir"):
76 frrtest_log_dir
= config
.get("topogen", "frrtest_log_dir")
77 time_stamp
= datetime
.time(datetime
.now())
78 logfile_name
= "frr_test_bgp_"
79 frrtest_log_file
= frrtest_log_dir
+ logfile_name
+ str(time_stamp
)
80 print("frrtest_log_file..", frrtest_log_file
)
82 logger
= logger_config
.get_logger(name
="test_execution_logs",
84 target
=frrtest_log_file
)
85 print("Logs will be sent to logfile: {}".format(frrtest_log_file
))
87 if config
.has_option("topogen", "show_router_config"):
88 show_router_config
= config
.get("topogen", "show_router_config")
90 show_router_config
= False
92 # env variable for setting what address type to test
93 ADDRESS_TYPES
= os
.environ
.get("ADDRESS_TYPES")
96 # Saves sequence id numbers
103 def get_seq_id(obj_type
, router
, obj_name
):
105 Generates and saves sequence number in interval of 10
109 * `obj_type`: prefix_lists or route_maps
110 * `router`: router name
111 *` obj_name`: name of the prefix-list or route-map
115 Sequence number generated
118 router_data
= SEQ_ID
[obj_type
].setdefault(router
, {})
119 obj_data
= router_data
.setdefault(obj_name
, {})
120 seq_id
= obj_data
.setdefault("seq_id", 0)
122 seq_id
= int(seq_id
) + 10
123 obj_data
["seq_id"] = seq_id
128 def set_seq_id(obj_type
, router
, id, obj_name
):
130 Saves sequence number if not auto-generated and given by user
134 * `obj_type`: prefix_lists or route_maps
135 * `router`: router name
136 *` obj_name`: name of the prefix-list or route-map
138 router_data
= SEQ_ID
[obj_type
].setdefault(router
, {})
139 obj_data
= router_data
.setdefault(obj_name
, {})
140 seq_id
= obj_data
.setdefault("seq_id", 0)
142 seq_id
= int(seq_id
) + int(id)
143 obj_data
["seq_id"] = seq_id
146 class InvalidCLIError(Exception):
147 """Raise when the CLI command is wrong"""
151 def run_frr_cmd(rnode
, cmd
, isjson
=False):
153 Execute frr show commands in priviledged mode
155 * `rnode`: router node on which commands needs to executed
156 * `cmd`: Command to be executed on frr
157 * `isjson`: If command is to get json data or not
163 ret_data
= rnode
.vtysh_cmd(cmd
, isjson
=isjson
)
167 logger
.debug(ret_data
)
168 print_data
= rnode
.vtysh_cmd(cmd
.rstrip("json"), isjson
=False)
170 print_data
= ret_data
172 logger
.info('Output for command [ %s] on router %s:\n%s',
173 cmd
.rstrip("json"), rnode
.name
, print_data
)
177 raise InvalidCLIError('No actual cmd passed')
180 def create_common_configuration(tgen
, router
, data
, config_type
=None,
183 API to create object of class FRRConfig and also create frr_json.conf
184 file. It will create interface and common configurations and save it to
185 frr_json.conf and load to router
189 * `tgen`: tgen onject
190 * `data`: Congiguration data saved in a list.
191 * `router` : router id to be configured.
192 * `config_type` : Syntactic information while writing configuration. Should
193 be one of the value as mentioned in the config_map below.
194 * `build` : Only for initial setup phase this is set as True
200 TMPDIR
= os
.path
.join(LOGDIR
, tgen
.modname
)
202 fname
= "{}/{}/{}".format(TMPDIR
, router
, FRRCFG_FILE
)
204 config_map
= OrderedDict({
205 "general_config": "! FRR General Config\n",
206 "interface_config": "! Interfaces Config\n",
207 "static_route": "! Static Route Config\n",
208 "prefix_list": "! Prefix List Config\n",
209 "bgp_community_list": "! Community List Config\n",
210 "route_maps": "! Route Maps Config\n",
211 "bgp": "! BGP Config\n"
220 frr_cfg_fd
= open(fname
, mode
)
222 frr_cfg_fd
.write(config_map
[config_type
])
224 frr_cfg_fd
.write("{} \n".format(str(line
)))
225 frr_cfg_fd
.write("\n")
227 except IOError as err
:
228 logger
.error("Unable to open FRR Config File. error(%s): %s" %
229 (err
.errno
, err
.strerror
))
234 # If configuration applied from build, it will done at last
236 load_config_to_router(tgen
, router
)
241 def reset_config_on_routers(tgen
, routerName
=None):
243 Resets configuration on routers to the snapshot created using input JSON
244 file. It replaces existing router configuration with FRRCFG_BKUP_FILE
248 * `tgen` : Topogen object
249 * `routerName` : router config is to be reset
252 logger
.debug("Entering API: reset_config_on_routers")
254 router_list
= tgen
.routers()
255 for rname
in ROUTER_LIST
:
256 if routerName
and routerName
!= rname
:
259 router
= router_list
[rname
]
260 logger
.info("Configuring router %s to initial test configuration",
262 cfg
= router
.run("vtysh -c 'show running'")
263 fname
= "{}/{}/frr.sav".format(TMPDIR
, rname
)
264 dname
= "{}/{}/delta.conf".format(TMPDIR
, rname
)
266 for line
in cfg
.split("\n"):
269 if (line
== "Building configuration..." or
270 line
== "Current configuration:" or
278 run_cfg_file
= "{}/{}/frr.sav".format(TMPDIR
, rname
)
279 init_cfg_file
= "{}/{}/frr_json_initial.conf".format(TMPDIR
, rname
)
282 with
open(os
.path
.join(tempdir
, 'vtysh.conf'), 'w') as fd
:
285 command
= "/usr/lib/frr/frr-reload.py --confdir {} --input {} --test {} > {}". \
286 format(tempdir
, run_cfg_file
, init_cfg_file
, dname
)
287 result
= call(command
, shell
=True, stderr
=SUB_STDOUT
,
290 os
.unlink(os
.path
.join(tempdir
, 'vtysh.conf'))
293 # Assert if command fail
295 logger
.error("Delta file creation failed. Command executed %s",
297 with
open(run_cfg_file
, 'r') as fd
:
298 logger
.info('Running configuration saved in %s is:\n%s',
299 run_cfg_file
, fd
.read())
300 with
open(init_cfg_file
, 'r') as fd
:
301 logger
.info('Test configuration saved in %s is:\n%s',
302 init_cfg_file
, fd
.read())
304 err_cmd
= ['/usr/bin/vtysh', '-m', '-f', run_cfg_file
]
305 result
= Popen(err_cmd
, stdout
=SUB_PIPE
, stderr
=SUB_PIPE
)
306 output
= result
.communicate()
307 for out_data
in output
:
308 temp_data
= out_data
.decode('utf-8').lower()
309 for out_err
in ERROR_LIST
:
310 if out_err
.lower() in temp_data
:
311 logger
.error("Found errors while validating data in"
313 raise InvalidCLIError(out_data
)
314 raise InvalidCLIError("Unknown error in %s", output
)
317 delta
= StringIO
.StringIO()
318 delta
.write("configure terminal\n")
320 for line
in t_delta
.split("\n"):
322 if (line
== "Lines To Delete" or
323 line
== "===============" or
324 line
== "Lines To Add" or
325 line
== "============" or
332 output
= router
.vtysh_multicmd(delta
.getvalue(),
336 delta
= StringIO
.StringIO()
337 cfg
= router
.run("vtysh -c 'show running'")
338 for line
in cfg
.split("\n"):
343 # Router current configuration to log file or console if
344 # "show_router_config" is defined in "pytest.ini"
345 if show_router_config
:
346 logger
.info("Configuration on router {} after config reset:".
348 logger
.info(delta
.getvalue())
351 logger
.debug("Exting API: reset_config_on_routers")
355 def load_config_to_router(tgen
, routerName
, save_bkup
=False):
357 Loads configuration on router from the file FRRCFG_FILE.
361 * `tgen` : Topogen object
362 * `routerName` : router for which configuration to be loaded
363 * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE
366 logger
.debug("Entering API: load_config_to_router")
368 router_list
= tgen
.routers()
369 for rname
in ROUTER_LIST
:
370 if routerName
and routerName
!= rname
:
373 router
= router_list
[rname
]
375 frr_cfg_file
= "{}/{}/{}".format(TMPDIR
, rname
, FRRCFG_FILE
)
376 frr_cfg_bkup
= "{}/{}/{}".format(TMPDIR
, rname
,
378 with
open(frr_cfg_file
, "r+") as cfg
:
380 logger
.info("Applying following configuration on router"
381 " {}:\n{}".format(rname
, data
))
383 with
open(frr_cfg_bkup
, "w") as bkup
:
386 output
= router
.vtysh_multicmd(data
, pretty_output
=False)
387 for out_err
in ERROR_LIST
:
388 if out_err
.lower() in output
.lower():
389 raise InvalidCLIError("%s" % output
)
392 except IOError as err
:
393 errormsg
= ("Unable to open config File. error(%s):"
394 " %s", (err
.errno
, err
.strerror
))
397 # Router current configuration to log file or console if
398 # "show_router_config" is defined in "pytest.ini"
399 if show_router_config
:
400 new_config
= router
.run("vtysh -c 'show running'")
401 logger
.info(new_config
)
403 logger
.debug("Exting API: load_config_to_router")
407 def start_topology(tgen
):
409 Starting topology, create tmp files which are loaded to routers
410 to start deamons and then start routers
411 * `tgen` : topogen object
414 global TMPDIR
, ROUTER_LIST
416 tgen
.start_topology()
420 router_list
= tgen
.routers()
421 ROUTER_LIST
= sorted(router_list
.keys(),
422 key
=lambda x
: int(re_search('\d+', x
).group(0)))
423 TMPDIR
= os
.path
.join(LOGDIR
, tgen
.modname
)
425 router_list
= tgen
.routers()
426 for rname
in ROUTER_LIST
:
427 router
= router_list
[rname
]
431 # Creating router named dir and empty zebra.conf bgpd.conf files
432 # inside the current directory
433 if os
.path
.isdir('{}'.format(rname
)):
434 os
.system("rm -rf {}".format(rname
))
435 os
.mkdir('{}'.format(rname
))
436 os
.system('chmod -R go+rw {}'.format(rname
))
437 os
.chdir('{}/{}'.format(TMPDIR
, rname
))
438 os
.system('touch zebra.conf bgpd.conf')
440 os
.mkdir('{}'.format(rname
))
441 os
.system('chmod -R go+rw {}'.format(rname
))
442 os
.chdir('{}/{}'.format(TMPDIR
, rname
))
443 os
.system('touch zebra.conf bgpd.conf')
445 except IOError as (errno
, strerror
):
446 logger
.error("I/O error({0}): {1}".format(errno
, strerror
))
448 # Loading empty zebra.conf file to router, to start the zebra deamon
451 '{}/{}/zebra.conf'.format(TMPDIR
, rname
)
453 # Loading empty bgpd.conf file to router, to start the bgp deamon
456 '{}/{}/bgpd.conf'.format(TMPDIR
, rname
)
460 logger
.info("Starting all routers once topology is created")
464 def number_to_row(routerName
):
466 Returns the number for the router.
467 Calculation based on name a0 = row 0, a1 = row 1, b2 = row 2, z23 = row 23
470 return int(routerName
[1:])
473 def number_to_column(routerName
):
475 Returns the number for the router.
476 Calculation based on name a0 = columnn 0, a1 = column 0, b2= column 1,
479 return ord(routerName
[0]) - 97
482 #############################################
483 # Common APIs, will be used by all protocols
484 #############################################
486 def validate_ip_address(ip_address
):
488 Validates the type of ip address
492 * `ip_address`: IPv4/IPv6 address
496 Type of address as string
499 if "/" in ip_address
:
500 ip_address
= ip_address
.split("/")[0]
505 socket
.inet_aton(ip_address
)
506 except socket
.error
as error
:
507 logger
.debug("Not a valid IPv4 address")
513 socket
.inet_pton(socket
.AF_INET6
, ip_address
)
514 except socket
.error
as error
:
515 logger
.debug("Not a valid IPv6 address")
520 if not v4
and not v6
:
521 raise Exception("InvalidIpAddr", "%s is neither valid IPv4 or IPv6"
522 " address" % ip_address
)
525 def check_address_types(addr_type
=None):
527 Checks environment variable set and compares with the current address type
530 addr_types_env
= os
.environ
.get("ADDRESS_TYPES")
531 if not addr_types_env
:
532 addr_types_env
= "dual"
534 if addr_types_env
== "dual":
535 addr_types
= ["ipv4", "ipv6"]
536 elif addr_types_env
== "ipv4":
537 addr_types
= ["ipv4"]
538 elif addr_types_env
== "ipv6":
539 addr_types
= ["ipv6"]
541 if addr_type
is None:
544 if addr_type
not in addr_types
:
545 logger
.error("{} not in supported/configured address types {}".
546 format(addr_type
, addr_types
))
552 def generate_ips(network
, no_of_ips
):
555 based on start_ip and no_of_ips
557 * `network` : from here the ip will start generating,
559 * `no_of_ips` : these many IPs will be generated
563 if type(network
) is not list:
566 for start_ipaddr
in network
:
567 if "/" in start_ipaddr
:
568 start_ip
= start_ipaddr
.split("/")[0]
569 mask
= int(start_ipaddr
.split("/")[1])
571 addr_type
= validate_ip_address(start_ip
)
572 if addr_type
== "ipv4":
573 start_ip
= ipaddr
.IPv4Address(unicode(start_ip
))
574 step
= 2 ** (32 - mask
)
575 if addr_type
== "ipv6":
576 start_ip
= ipaddr
.IPv6Address(unicode(start_ip
))
577 step
= 2 ** (128 - mask
)
581 while count
< no_of_ips
:
582 ipaddress_list
.append("{}/{}".format(next_ip
, mask
))
583 if addr_type
== "ipv6":
584 next_ip
= ipaddr
.IPv6Address(int(next_ip
) + step
)
589 return ipaddress_list
592 def find_interface_with_greater_ip(topo
, router
, loopback
=True,
595 Returns highest interface ip for ipv4/ipv6. If loopback is there then
596 it will return highest IP from loopback IPs otherwise from physical
599 * `topo` : json file data
600 * `router` : router for which hightest interface should be calculated
603 link_data
= topo
["routers"][router
]["links"]
607 for destRouterLink
, data
in sorted(link_data
.iteritems()):
609 if "type" in data
and data
["type"] == "loopback":
611 ip_address
= topo
["routers"][router
]["links"][
612 destRouterLink
]["ipv4"].split("/")[0]
613 lo_list
.append(ip_address
)
615 ip_address
= topo
["routers"][router
]["links"][
616 destRouterLink
]["ipv4"].split("/")[0]
617 interfaces_list
.append(ip_address
)
620 return sorted(lo_list
)[-1]
622 return sorted(interfaces_list
)[-1]
625 def write_test_header(tc_name
):
626 """ Display message at beginning of test case"""
628 logger
.info("*"*(len(tc_name
)+count
))
629 step("START -> Testcase : %s" % tc_name
, reset
=True)
630 logger
.info("*"*(len(tc_name
)+count
))
633 def write_test_footer(tc_name
):
634 """ Display message at end of test case"""
636 logger
.info("="*(len(tc_name
)+count
))
637 logger
.info("Testcase : %s -> PASSED", tc_name
)
638 logger
.info("="*(len(tc_name
)+count
))
641 def interface_status(tgen
, topo
, input_dict
):
643 Delete ip route maps from device
645 * `tgen` : Topogen object
646 * `topo` : json file data
647 * `input_dict` : for which router, route map has to be deleted
653 "interface_list": ['eth1-r1-r2', 'eth2-r1-r3'],
659 errormsg(str) or True
661 logger
.debug("Entering lib API: interface_status()")
665 for router
in input_dict
.keys():
667 interface_list
= input_dict
[router
]['interface_list']
668 status
= input_dict
[router
].setdefault('status', 'up')
669 for intf
in interface_list
:
670 rnode
= tgen
.routers()[router
]
671 interface_set_status(rnode
, intf
, status
)
673 # Load config to router
674 load_config_to_router(tgen
, router
)
676 except Exception as e
:
677 # handle any exception
678 logger
.error("Error %s occured. Arguments %s.", e
.message
, e
.args
)
681 errormsg
= traceback
.format_exc()
682 logger
.error(errormsg
)
685 logger
.debug("Exiting lib API: interface_status()")
689 def retry(attempts
=3, wait
=2, return_is_str
=True, initial_wait
=0):
691 Retries function execution, if return is an errormsg or exception
693 * `attempts`: Number of attempts to make
694 * `wait`: Number of seconds to wait between each attempt
695 * `return_is_str`: Return val is an errormsg in case of failure
696 * `initial_wait`: Sleeps for this much seconds before executing function
703 def func_retry(*args
, **kwargs
):
704 _wait
= kwargs
.pop('wait', wait
)
705 _attempts
= kwargs
.pop('attempts', attempts
)
706 _attempts
= int(_attempts
)
708 raise ValueError("attempts must be 0 or greater")
711 logger
.info("Waiting for [%s]s as initial delay", initial_wait
)
714 _return_is_str
= kwargs
.pop('return_is_str', return_is_str
)
715 for i
in range(1, _attempts
+ 1):
717 _expected
= kwargs
.setdefault('expected', True)
718 kwargs
.pop('expected')
719 ret
= func(*args
, **kwargs
)
720 logger
.debug("Function returned %s" % ret
)
721 if return_is_str
and isinstance(ret
, bool) and _expected
:
723 if isinstance(ret
, str) and _expected
is False:
728 except Exception as err
:
730 logger
.info("Max number of attempts (%r) reached",
734 logger
.info("Function returned %s", err
)
736 logger
.info("Retry [#%r] after sleeping for %ss"
739 func_retry
._original
= func
746 Prints step number for the test case step being executed
750 def __call__(self
, msg
, reset
):
755 logger
.info("STEP %s: '%s'", Stepper
.count
, msg
)
759 def step(msg
, reset
=False):
761 Call Stepper to print test steps. Need to reset at the beginning of test.
762 * ` msg` : Step message body.
763 * `reset` : Reset step count to 1 when set to True.
769 #############################################
770 # These APIs, will used by testcase
771 #############################################
772 def create_interfaces_cfg(tgen
, topo
, build
=False):
774 Create interface configuration for created topology. Basic Interface
775 configuration is provided in input json file.
779 * `tgen` : Topogen object
780 * `topo` : json file data
781 * `build` : Only for initial setup phase this is set as True.
790 for c_router
, c_data
in topo
.iteritems():
792 for destRouterLink
, data
in sorted(c_data
["links"].iteritems()):
793 # Loopback interfaces
794 if "type" in data
and data
["type"] == "loopback":
795 interface_name
= destRouterLink
797 interface_name
= data
["interface"]
798 interface_data
.append("interface {}".format(
802 intf_addr
= c_data
["links"][destRouterLink
]["ipv4"]
803 interface_data
.append("ip address {}".format(
807 intf_addr
= c_data
["links"][destRouterLink
]["ipv6"]
808 interface_data
.append("ipv6 address {}".format(
812 result
= create_common_configuration(tgen
, c_router
,
816 except InvalidCLIError
:
818 errormsg
= traceback
.format_exc()
819 logger
.error(errormsg
)
825 def create_static_routes(tgen
, input_dict
, build
=False):
827 Create static routes for given router as defined in input_dict
831 * `tgen` : Topogen object
832 * `input_dict` : Input dict data, required when configuring from testcase
833 * `build` : Only for initial setup phase this is set as True.
837 input_dict should be in the format below:
838 # static_routes: list of all routes
839 # network: network address
840 # no_of_ip: number of next-hop address that will be configured
841 # admin_distance: admin distance for route/routes.
842 # next_hop: starting next-hop address
843 # tag: tag id for static routes
844 # delete: True if config to be removed. Default False.
851 "network": "100.0.20.1/32",
853 "admin_distance": 100,
854 "next_hop": "10.0.0.1",
864 errormsg(str) or True
867 logger
.debug("Entering lib API: create_static_routes()")
868 input_dict
= deepcopy(input_dict
)
870 for router
in input_dict
.keys():
871 if "static_routes" not in input_dict
[router
]:
872 errormsg
= "static_routes not present in input_dict"
873 logger
.debug(errormsg
)
876 static_routes_list
= []
878 static_routes
= input_dict
[router
]["static_routes"]
879 for static_route
in static_routes
:
880 del_action
= static_route
.setdefault("delete", False)
882 no_of_ip
= static_route
.setdefault("no_of_ip", 1)
883 admin_distance
= static_route
.setdefault("admin_distance",
885 tag
= static_route
.setdefault("tag", None)
886 if "next_hop" not in static_route
or \
887 "network" not in static_route
:
888 errormsg
= "'next_hop' or 'network' missing in" \
892 next_hop
= static_route
["next_hop"]
893 network
= static_route
["network"]
894 if type(network
) is not list:
897 ip_list
= generate_ips(network
, no_of_ip
)
899 addr_type
= validate_ip_address(ip
)
901 if addr_type
== "ipv4":
902 cmd
= "ip route {} {}".format(ip
, next_hop
)
904 cmd
= "ipv6 route {} {}".format(ip
, next_hop
)
907 cmd
= "{} tag {}".format(cmd
, str(tag
))
910 cmd
= "{} {}".format(cmd
, admin_distance
)
913 cmd
= "no {}".format(cmd
)
915 static_routes_list
.append(cmd
)
917 result
= create_common_configuration(tgen
, router
,
922 except InvalidCLIError
:
924 errormsg
= traceback
.format_exc()
925 logger
.error(errormsg
)
928 logger
.debug("Exiting lib API: create_static_routes()")
932 def create_prefix_lists(tgen
, input_dict
, build
=False):
934 Create ip prefix lists as per the config provided in input
939 * `tgen` : Topogen object
940 * `input_dict` : Input dict data, required when configuring from testcase
941 * `build` : Only for initial setup phase this is set as True.
945 # pf_lists_1: name of prefix-list, user defined
946 # seqid: prefix-list seqid, auto-generated if not given by user
947 # network: criteria for applying prefix-list
948 # action: permit/deny
949 # le: less than or equal number of bits
950 # ge: greater than or equal number of bits
978 logger
.debug("Entering lib API: create_prefix_lists()")
981 for router
in input_dict
.keys():
982 if "prefix_lists" not in input_dict
[router
]:
983 errormsg
= "prefix_lists not present in input_dict"
984 logger
.debug(errormsg
)
988 prefix_lists
= input_dict
[router
]["prefix_lists"]
989 for addr_type
, prefix_data
in prefix_lists
.iteritems():
990 if not check_address_types(addr_type
):
993 for prefix_name
, prefix_list
in prefix_data
.iteritems():
994 for prefix_dict
in prefix_list
:
995 if "action" not in prefix_dict
or \
996 "network" not in prefix_dict
:
997 errormsg
= "'action' or network' missing in" \
1001 network_addr
= prefix_dict
["network"]
1002 action
= prefix_dict
["action"]
1003 le
= prefix_dict
.setdefault("le", None)
1004 ge
= prefix_dict
.setdefault("ge", None)
1005 seqid
= prefix_dict
.setdefault("seqid", None)
1006 del_action
= prefix_dict
.setdefault("delete", False)
1008 seqid
= get_seq_id("prefix_lists", router
,
1011 set_seq_id("prefix_lists", router
, seqid
,
1014 if addr_type
== "ipv4":
1019 cmd
= "{} prefix-list {} seq {} {} {}".format(
1020 protocol
, prefix_name
, seqid
, action
, network_addr
1023 cmd
= "{} le {}".format(cmd
, le
)
1025 cmd
= "{} ge {}".format(cmd
, ge
)
1028 cmd
= "no {}".format(cmd
)
1030 config_data
.append(cmd
)
1031 result
= create_common_configuration(tgen
, router
,
1036 except InvalidCLIError
:
1038 errormsg
= traceback
.format_exc()
1039 logger
.error(errormsg
)
1042 logger
.debug("Exiting lib API: create_prefix_lists()")
1046 def create_route_maps(tgen
, input_dict
, build
=False):
1048 Create route-map on the devices as per the arguments passed
1052 * `tgen` : Topogen object
1053 * `input_dict` : Input dict data, required when configuring from testcase
1054 * `build` : Only for initial setup phase this is set as True.
1058 # route_maps: key, value pair for route-map name and its attribute
1059 # rmap_match_prefix_list_1: user given name for route-map
1060 # action: PERMIT/DENY
1061 # match: key,value pair for match criteria. prefix_list, community-list,
1062 large-community-list or tag. Only one option at a time.
1063 # prefix_list: name of prefix list
1064 # large-community-list: name of large community list
1065 # community-ist: name of community list
1066 # tag: tag id for static routes
1067 # set: key, value pair for modifying route attributes
1068 # localpref: preference value for the network
1069 # med: metric value advertised for AS
1070 # aspath: set AS path value
1071 # weight: weight for the route
1072 # community: standard community value to be attached
1073 # large_community: large community value to be attached
1074 # community_additive: if set to "additive", adds community/large-community
1075 value to the existing values of the network prefix
1082 "rmap_match_prefix_list_1": [
1087 "prefix_list": "pf_list_1"
1090 "prefix_list": "pf_list_1"
1093 "large-community-list": {
1094 "id": "community_1",
1098 "id": "community_2",
1108 "action": "prepend",
1115 "large_community": {
1116 "num": "1:2:3 4:5;6",
1128 errormsg(str) or True
1132 logger
.debug("Entering lib API: create_route_maps()")
1133 input_dict
= deepcopy(input_dict
)
1135 for router
in input_dict
.keys():
1136 if "route_maps" not in input_dict
[router
]:
1137 logger
.debug("route_maps not present in input_dict")
1140 for rmap_name
, rmap_value
in \
1141 input_dict
[router
]["route_maps"].iteritems():
1143 for rmap_dict
in rmap_value
:
1144 del_action
= rmap_dict
.setdefault("delete", False)
1147 rmap_data
.append("no route-map {}".format(rmap_name
))
1150 if "action" not in rmap_dict
:
1151 errormsg
= "action not present in input_dict"
1152 logger
.error(errormsg
)
1155 rmap_action
= rmap_dict
.setdefault("action", "deny")
1157 seq_id
= rmap_dict
.setdefault("seq_id", None)
1159 seq_id
= get_seq_id("route_maps", router
, rmap_name
)
1161 set_seq_id("route_maps", router
, seq_id
, rmap_name
)
1163 rmap_data
.append("route-map {} {} {}".format(
1164 rmap_name
, rmap_action
, seq_id
1167 if "continue" in rmap_dict
:
1168 continue_to
= rmap_dict
["continue"]
1170 rmap_data
.append("on-match goto {}".
1171 format(continue_to
))
1173 logger
.error("In continue, 'route-map entry "
1174 "sequence number' is not provided")
1177 if "goto" in rmap_dict
:
1178 go_to
= rmap_dict
["goto"]
1180 rmap_data
.append("on-match goto {}".
1183 logger
.error("In goto, 'Goto Clause number' is not"
1187 if "call" in rmap_dict
:
1188 call_rmap
= rmap_dict
["call"]
1190 rmap_data
.append("call {}".
1193 logger
.error("In call, 'destination Route-Map' is"
1197 # Verifying if SET criteria is defined
1198 if "set" in rmap_dict
:
1199 set_data
= rmap_dict
["set"]
1200 ipv4_data
= set_data
.setdefault("ipv4", {})
1201 ipv6_data
= set_data
.setdefault("ipv6", {})
1202 local_preference
= set_data
.setdefault("localpref",
1204 metric
= set_data
.setdefault("med", None)
1205 as_path
= set_data
.setdefault("aspath", {})
1206 weight
= set_data
.setdefault("weight", None)
1207 community
= set_data
.setdefault("community", {})
1208 large_community
= set_data
.setdefault(
1209 "large_community", {})
1210 large_comm_list
= set_data
.setdefault(
1211 "large_comm_list", {})
1212 set_action
= set_data
.setdefault("set_action", None)
1213 nexthop
= set_data
.setdefault("nexthop", None)
1214 origin
= set_data
.setdefault("origin", None)
1217 if local_preference
:
1218 rmap_data
.append("set local-preference {}".
1219 format(local_preference
))
1223 rmap_data
.append("set metric {} \n".format(metric
))
1227 rmap_data
.append("set origin {} \n".format(origin
))
1231 as_num
= as_path
.setdefault("as_num", None)
1232 as_action
= as_path
.setdefault("as_action", None)
1233 if as_action
and as_num
:
1234 rmap_data
.append("set as-path {} {}".
1235 format(as_action
, as_num
))
1239 num
= community
.setdefault("num", None)
1240 comm_action
= community
.setdefault("action", None)
1242 cmd
= "set community {}".format(num
)
1244 cmd
= "{} {}".format(cmd
, comm_action
)
1245 rmap_data
.append(cmd
)
1247 logger
.error("In community, AS Num not"
1252 num
= large_community
.setdefault("num", None)
1253 comm_action
= large_community
.setdefault("action",
1256 cmd
= "set large-community {}".format(num
)
1258 cmd
= "{} {}".format(cmd
, comm_action
)
1260 rmap_data
.append(cmd
)
1262 logger
.error("In large_community, AS Num not"
1266 id = large_comm_list
.setdefault("id", None)
1267 del_comm
= large_comm_list
.setdefault("delete",
1270 cmd
= "set large-comm-list {}".format(id)
1272 cmd
= "{} delete".format(cmd
)
1274 rmap_data
.append(cmd
)
1276 logger
.error("In large_comm_list 'id' not"
1282 rmap_data
.append("set weight {}".format(
1285 nexthop
= ipv6_data
.setdefault("nexthop", None)
1287 rmap_data
.append("set ipv6 next-hop {}".format(
1291 # Adding MATCH and SET sequence to RMAP if defined
1292 if "match" in rmap_dict
:
1293 match_data
= rmap_dict
["match"]
1294 ipv4_data
= match_data
.setdefault("ipv4", {})
1295 ipv6_data
= match_data
.setdefault("ipv6", {})
1296 community
= match_data
.setdefault(
1297 "community_list",{})
1298 large_community
= match_data
.setdefault(
1299 "large_community", {}
1301 large_community_list
= match_data
.setdefault(
1302 "large_community_list", {}
1306 # fetch prefix list data from rmap
1308 ipv4_data
.setdefault("prefix_lists",
1311 rmap_data
.append("match ip address"
1312 " prefix-list {}".format(prefix_name
))
1314 # fetch tag data from rmap
1315 tag
= ipv4_data
.setdefault("tag", None)
1317 rmap_data
.append("match tag {}".format(tag
))
1319 # fetch large community data from rmap
1320 large_community_list
= ipv4_data
.setdefault(
1321 "large_community_list",{})
1322 large_community
= match_data
.setdefault(
1323 "large_community", {})
1326 prefix_name
= ipv6_data
.setdefault("prefix_lists",
1329 rmap_data
.append("match ipv6 address"
1330 " prefix-list {}".format(prefix_name
))
1332 # fetch tag data from rmap
1333 tag
= ipv6_data
.setdefault("tag", None)
1335 rmap_data
.append("match tag {}".format(tag
))
1337 # fetch large community data from rmap
1338 large_community_list
= ipv6_data
.setdefault(
1339 "large_community_list",{})
1340 large_community
= match_data
.setdefault(
1341 "large_community", {})
1344 if "id" not in community
:
1345 logger
.error("'id' is mandatory for "
1346 "community-list in match"
1349 cmd
= "match community {}".format(community
["id"])
1350 exact_match
= community
.setdefault("exact_match",
1353 cmd
= "{} exact-match".format(cmd
)
1355 rmap_data
.append(cmd
)
1357 if "id" not in large_community
:
1358 logger
.error("'id' is mandatory for "
1359 "large-community-list in match "
1362 cmd
= "match large-community {}".format(
1363 large_community
["id"])
1364 exact_match
= large_community
.setdefault(
1365 "exact_match", False)
1367 cmd
= "{} exact-match".format(cmd
)
1368 rmap_data
.append(cmd
)
1369 if large_community_list
:
1370 if "id" not in large_community_list
:
1371 logger
.error("'id' is mandatory for "
1372 "large-community-list in match "
1375 cmd
= "match large-community {}".format(
1376 large_community_list
["id"])
1377 exact_match
= large_community_list
.setdefault(
1378 "exact_match", False)
1380 cmd
= "{} exact-match".format(cmd
)
1381 rmap_data
.append(cmd
)
1383 result
= create_common_configuration(tgen
, router
,
1388 except InvalidCLIError
:
1390 errormsg
= traceback
.format_exc()
1391 logger
.error(errormsg
)
1394 logger
.debug("Exiting lib API: create_route_maps()")
1398 def delete_route_maps(tgen
, input_dict
):
1400 Delete ip route maps from device
1402 * `tgen` : Topogen object
1403 * `input_dict` : for which router,
1404 route map has to be deleted
1408 # Delete route-map rmap_1 and rmap_2 from router r1
1411 "route_maps": ["rmap_1", "rmap__2"]
1414 result = delete_route_maps("ipv4", input_dict)
1418 errormsg(str) or True
1420 logger
.info("Entering lib API: delete_route_maps()")
1422 for router
in input_dict
.keys():
1423 route_maps
= input_dict
[router
]["route_maps"][:]
1424 rmap_data
= input_dict
[router
]
1425 rmap_data
["route_maps"] = {}
1426 for route_map_name
in route_maps
:
1427 rmap_data
["route_maps"].update({
1434 return create_route_maps(tgen
, input_dict
)
1437 def create_bgp_community_lists(tgen
, input_dict
, build
=False):
1439 Create bgp community-list or large-community-list on the devices as per
1440 the arguments passed. Takes list of communities in input.
1444 * `tgen` : Topogen object
1445 * `input_dict` : Input dict data, required when configuring from testcase
1446 * `build` : Only for initial setup phase this is set as True.
1451 "bgp_community_lists": [
1453 "community_type": "standard",
1455 "name": "rmap_lcomm_{}".format(addr_type),
1456 "value": "1:1:1 1:2:3 2:1:1 2:2:2",
1463 result = create_bgp_community_lists(tgen, input_dict_1)
1467 logger
.debug("Entering lib API: create_bgp_community_lists()")
1468 input_dict
= deepcopy(input_dict
)
1470 for router
in input_dict
.keys():
1471 if "bgp_community_lists" not in input_dict
[router
]:
1472 errormsg
= "bgp_community_lists not present in input_dict"
1473 logger
.debug(errormsg
)
1478 community_list
= input_dict
[router
]["bgp_community_lists"]
1479 for community_dict
in community_list
:
1480 del_action
= community_dict
.setdefault("delete", False)
1481 community_type
= community_dict
.setdefault("community_type",
1483 action
= community_dict
.setdefault("action", None)
1484 value
= community_dict
.setdefault("value", '')
1485 large
= community_dict
.setdefault("large", None)
1486 name
= community_dict
.setdefault("name", None)
1488 cmd
= "bgp large-community-list"
1490 cmd
= "bgp community-list"
1492 if not large
and not (community_type
and action
and value
):
1493 errormsg
= "community_type, action and value are " \
1494 "required in bgp_community_list"
1495 logger
.error(errormsg
)
1499 community_type
= int(community_type
)
1500 cmd
= "{} {} {} {}".format(cmd
, community_type
, action
,
1504 cmd
= "{} {} {} {} {}".format(
1505 cmd
, community_type
, name
, action
, value
)
1508 cmd
= "no {}".format(cmd
)
1510 config_data
.append(cmd
)
1512 result
= create_common_configuration(tgen
, router
, config_data
,
1513 "bgp_community_list",
1516 except InvalidCLIError
:
1518 errormsg
= traceback
.format_exc()
1519 logger
.error(errormsg
)
1522 logger
.debug("Exiting lib API: create_bgp_community_lists()")
1526 def shutdown_bringup_interface(tgen
, dut
, intf_name
, ifaceaction
=False):
1528 Shutdown or bringup router's interface "
1530 * `tgen` : Topogen object
1531 * `dut` : Device under test
1532 * `intf_name` : Interface name to be shut/no shut
1533 * `ifaceaction` : Action, to shut/no shut interface,
1540 # Shut down ineterface
1541 shutdown_bringup_interface(tgen, dut, intf, False)
1543 # Bring up ineterface
1544 shutdown_bringup_interface(tgen, dut, intf, True)
1548 errormsg(str) or True
1551 router_list
= tgen
.routers()
1553 logger
.info("Bringing up interface : {}".format(intf_name
))
1555 logger
.info("Shutting down interface : {}".format(intf_name
))
1557 interface_set_status(router_list
[dut
], intf_name
, ifaceaction
)
1560 #############################################
1562 #############################################
1563 @retry(attempts
=10, return_is_str
=True, initial_wait
=2)
1564 def verify_rib(tgen
, addr_type
, dut
, input_dict
, next_hop
=None, protocol
=None):
1566 Data will be read from input_dict or input JSON file, API will generate
1567 same prefixes, which were redistributed by either create_static_routes() or
1568 advertise_networks_using_network_command() and do will verify next_hop and
1569 each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json"
1574 * `tgen` : topogen object
1575 * `addr_type` : ip type, ipv4/ipv6
1576 * `dut`: Device Under Test, for which user wants to test the data
1577 * `input_dict` : input dict, has details of static routes
1578 * `next_hop`[optional]: next_hop which needs to be verified,
1580 * `protocol`[optional]: protocol, default = None
1584 # RIB can be verified for static routes OR network advertised using
1585 network command. Following are input_dicts to create static routes
1586 and advertise networks using network command. Any one of the input_dict
1587 can be passed to verify_rib() to verify routes in DUT"s RIB.
1589 # Creating static routes for r1
1592 "static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9, \
1593 "admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}]
1595 # Advertising networks using network command in router r1
1598 "advertise_networks": [{"start_ip": "20.0.0.0/32",
1599 "no_of_network": 10},
1600 {"start_ip": "30.0.0.0/32"}]
1602 # Verifying ipv4 routes in router r1 learned via BGP
1605 result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol)
1609 errormsg(str) or True
1612 logger
.debug("Entering lib API: verify_rib()")
1614 router_list
= tgen
.routers()
1615 for routerInput
in input_dict
.keys():
1616 for router
, rnode
in router_list
.iteritems():
1620 # Verifying RIB routes
1621 if addr_type
== "ipv4":
1623 command
= "show ip route {} json".format(protocol
)
1625 command
= "show ip route json"
1628 command
= "show ipv6 route {} json".format(protocol
)
1630 command
= "show ipv6 route json"
1632 logger
.info("Checking router %s RIB:", router
)
1633 rib_routes_json
= run_frr_cmd(rnode
, command
, isjson
=True)
1635 # Verifying output dictionary rib_routes_json is not empty
1636 if bool(rib_routes_json
) is False:
1637 errormsg
= "No {} route found in rib of router {}..". \
1638 format(protocol
, router
)
1641 if "static_routes" in input_dict
[routerInput
]:
1642 static_routes
= input_dict
[routerInput
]["static_routes"]
1645 for static_route
in static_routes
:
1649 network
= static_route
["network"]
1650 if "no_of_ip" in static_route
:
1651 no_of_ip
= static_route
["no_of_ip"]
1655 # Generating IPs for verification
1656 ip_list
= generate_ips(network
, no_of_ip
)
1657 for st_rt
in ip_list
:
1658 st_rt
= str(ipaddr
.IPNetwork(unicode(st_rt
)))
1660 if st_rt
in rib_routes_json
:
1662 found_routes
.append(st_rt
)
1665 if type(next_hop
) is not list:
1666 next_hop
= [next_hop
]
1668 found_hops
= [rib_r
["ip"] for rib_r
in
1669 rib_routes_json
[st_rt
][0][
1671 for nh
in found_hops
:
1673 if nh
and nh
in next_hop
:
1676 errormsg
= ("Nexthop {} is Missing for {}"
1677 " route {} in RIB of router"
1678 " {}\n".format(next_hop
,
1684 missing_routes
.append(st_rt
)
1687 logger
.info("Found next_hop %s for all routes in RIB of"
1688 " router %s\n", next_hop
, dut
)
1690 if not st_found
and len(missing_routes
) > 0:
1691 errormsg
= "Missing route in RIB of router {}, routes: " \
1692 "{}\n".format(dut
, missing_routes
)
1695 logger
.info("Verified routes in router %s RIB, found routes"
1696 " are: %s\n", dut
, found_routes
)
1700 if "bgp" in input_dict
[routerInput
]:
1701 if 'advertise_networks' in input_dict
[routerInput
]["bgp"]\
1702 ["address_family"][addr_type
]["unicast"]:
1706 advertise_network
= input_dict
[routerInput
]["bgp"]\
1707 ["address_family"][addr_type
]["unicast"]\
1708 ["advertise_networks"]
1710 for advertise_network_dict
in advertise_network
:
1711 start_ip
= advertise_network_dict
["network"]
1712 if "no_of_network" in advertise_network_dict
:
1713 no_of_network
= advertise_network_dict
["no_of_network"]
1717 # Generating IPs for verification
1718 ip_list
= generate_ips(start_ip
, no_of_network
)
1719 for st_rt
in ip_list
:
1720 st_rt
= str(ipaddr
.IPNetwork(unicode(st_rt
)))
1724 if st_rt
in rib_routes_json
:
1726 found_routes
.append(st_rt
)
1729 if type(next_hop
) is not list:
1730 next_hop
= [next_hop
]
1732 for index
, nh
in enumerate(next_hop
):
1733 if rib_routes_json
[st_rt
][0]\
1734 ['nexthops'][index
]['ip'] == nh
:
1737 errormsg
=("Nexthop {} is Missing"
1738 " for {} route {} in "
1739 "RIB of router {}\n".\
1746 missing_routes
.append(st_rt
)
1749 logger
.info("Found next_hop {} for all routes in RIB"
1750 " of router {}\n".format(next_hop
, dut
))
1752 if not found
and len(missing_routes
) > 0:
1753 errormsg
= ("Missing {} route in RIB of router {}, "
1755 format(addr_type
, dut
, missing_routes
))
1758 logger
.info("Verified {} routes in router {} RIB, found"
1759 " routes are: {}\n".\
1760 format(addr_type
, dut
, found_routes
))
1762 logger
.debug("Exiting lib API: verify_rib()")
1766 def verify_admin_distance_for_static_routes(tgen
, input_dict
):
1768 API to verify admin distance for static routes as defined in input_dict/
1769 input JSON by running show ip/ipv6 route json command.
1773 * `tgen` : topogen object
1774 * `input_dict`: having details like - for which router and static routes
1775 admin dsitance needs to be verified
1778 # To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop
1779 10.0.0.2 in router r1
1783 "network": "10.0.20.1/32",
1784 "admin_distance": 10,
1785 "next_hop": "10.0.0.2"
1789 result = verify_admin_distance_for_static_routes(tgen, input_dict)
1793 errormsg(str) or True
1796 logger
.debug("Entering lib API: verify_admin_distance_for_static_routes()")
1798 for router
in input_dict
.keys():
1799 if router
not in tgen
.routers():
1802 rnode
= tgen
.routers()[router
]
1804 for static_route
in input_dict
[router
]["static_routes"]:
1805 addr_type
= validate_ip_address(static_route
["network"])
1806 # Command to execute
1807 if addr_type
== "ipv4":
1808 command
= "show ip route json"
1810 command
= "show ipv6 route json"
1811 show_ip_route_json
= run_frr_cmd(rnode
, command
, isjson
=True)
1813 logger
.info("Verifying admin distance for static route %s"
1814 " under dut %s:", static_route
, router
)
1815 network
= static_route
["network"]
1816 next_hop
= static_route
["next_hop"]
1817 admin_distance
= static_route
["admin_distance"]
1818 route_data
= show_ip_route_json
[network
][0]
1819 if network
in show_ip_route_json
:
1820 if route_data
["nexthops"][0]["ip"] == next_hop
:
1821 if route_data
["distance"] != admin_distance
:
1822 errormsg
= ("Verification failed: admin distance"
1823 " for static route {} under dut {},"
1824 " found:{} but expected:{}".
1825 format(static_route
, router
,
1826 route_data
["distance"],
1830 logger
.info("Verification successful: admin"
1831 " distance for static route %s under"
1832 " dut %s, found:%s", static_route
,
1833 router
, route_data
["distance"])
1836 errormsg
= ("Static route {} not found in "
1837 "show_ip_route_json for dut {}".
1838 format(network
, router
))
1841 logger
.debug("Exiting lib API: verify_admin_distance_for_static_routes()")
1845 def verify_prefix_lists(tgen
, input_dict
):
1847 Running "show ip prefix-list" command and verifying given prefix-list
1848 is present in router.
1852 * `tgen` : topogen object
1853 * `input_dict`: data to verify prefix lists
1857 # To verify pf_list_1 is present in router r1
1860 "prefix_lists": ["pf_list_1"]
1862 result = verify_prefix_lists("ipv4", input_dict, tgen)
1866 errormsg(str) or True
1869 logger
.debug("Entering lib API: verify_prefix_lists()")
1871 for router
in input_dict
.keys():
1872 if router
not in tgen
.routers():
1875 rnode
= tgen
.routers()[router
]
1877 # Show ip prefix list
1878 show_prefix_list
= run_frr_cmd(rnode
, "show ip prefix-list")
1880 # Verify Prefix list is deleted
1881 prefix_lists_addr
= input_dict
[router
]["prefix_lists"]
1882 for addr_type
in prefix_lists_addr
:
1883 if not check_address_types(addr_type
):
1886 for prefix_list
in prefix_lists_addr
[addr_type
].keys():
1887 if prefix_list
in show_prefix_list
:
1888 errormsg
= ("Prefix list {} is/are present in the router"
1889 " {}".format(prefix_list
, router
))
1892 logger
.info("Prefix list %s is/are not present in the router"
1893 " from router %s", prefix_list
, router
)
1895 logger
.debug("Exiting lib API: verify_prefix_lists()")
1899 @retry(attempts
=2, wait
=4, return_is_str
=True, initial_wait
=2)
1900 def verify_route_maps(tgen
, input_dict
):
1902 Running "show route-map" command and verifying given route-map
1903 is present in router.
1906 * `tgen` : topogen object
1907 * `input_dict`: data to verify prefix lists
1910 # To verify rmap_1 and rmap_2 are present in router r1
1913 "route_maps": ["rmap_1", "rmap_2"]
1916 result = verify_route_maps(tgen, input_dict)
1919 errormsg(str) or True
1922 logger
.debug("Entering lib API: verify_route_maps()")
1924 for router
in input_dict
.keys():
1925 if router
not in tgen
.routers():
1928 rnode
= tgen
.routers()[router
]
1930 show_route_maps
= rnode
.vtysh_cmd("show route-map")
1932 # Verify route-map is deleted
1933 route_maps
= input_dict
[router
]["route_maps"]
1934 for route_map
in route_maps
:
1935 if route_map
in show_route_maps
:
1936 errormsg
= ("Route map {} is not deleted from router"
1937 " {}".format(route_map
, router
))
1940 logger
.info("Route map %s is/are deleted successfully from"
1941 " router %s", route_maps
, router
)
1943 logger
.debug("Exiting lib API: verify_route_maps()")
1947 @retry(attempts
=3, wait
=4, return_is_str
=True)
1948 def verify_bgp_community(tgen
, addr_type
, router
, network
, input_dict
=None):
1950 API to veiryf BGP large community is attached in route for any given
1951 DUT by running "show bgp ipv4/6 {route address} json" command.
1955 * `tgen`: topogen object
1956 * `addr_type` : ip type, ipv4/ipv6
1957 * `dut`: Device Under Test
1958 * `network`: network for which set criteria needs to be verified
1959 * `input_dict`: having details like - for which router, community and
1960 values needs to be verified
1963 networks = ["200.50.2.0/32"]
1965 "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
1967 result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None)
1971 errormsg(str) or True
1974 logger
.info("Entering lib API: verify_bgp_community()")
1975 if router
not in tgen
.routers():
1978 rnode
= tgen
.routers()[router
]
1980 logger
.debug("Verifying BGP community attributes on dut %s: for %s "
1981 "network %s", router
, addr_type
, network
)
1984 cmd
= "show bgp {} {} json".format(addr_type
, net
)
1985 show_bgp_json
= rnode
.vtysh_cmd(cmd
, isjson
=True)
1986 logger
.info(show_bgp_json
)
1987 if "paths" not in show_bgp_json
:
1988 return "Prefix {} not found in BGP table of router: {}". \
1991 as_paths
= show_bgp_json
["paths"]
1993 for i
in range(len(as_paths
)):
1994 if "largeCommunity" in show_bgp_json
["paths"][i
] or \
1995 "community" in show_bgp_json
["paths"][i
]:
1997 logger
.info("Large Community attribute is found for route:"
1998 " %s in router: %s", net
, router
)
1999 if input_dict
is not None:
2000 for criteria
, comm_val
in input_dict
.items():
2001 show_val
= show_bgp_json
["paths"][i
][criteria
][
2003 if comm_val
== show_val
:
2004 logger
.info("Verifying BGP %s for prefix: %s"
2005 " in router: %s, found expected"
2006 " value: %s", criteria
, net
, router
,
2009 errormsg
= "Failed: Verifying BGP attribute" \
2010 " {} for route: {} in router: {}" \
2011 ", expected value: {} but found" \
2013 criteria
, net
, router
, comm_val
,
2019 "Large Community attribute is not found for route: "
2020 "{} in router: {} ".format(net
, router
))
2023 logger
.debug("Exiting lib API: verify_bgp_community()")
2027 def verify_create_community_list(tgen
, input_dict
):
2029 API is to verify if large community list is created for any given DUT in
2030 input_dict by running "sh bgp large-community-list {"comm_name"} detail"
2034 * `tgen`: topogen object
2035 * `input_dict`: having details like - for which router, large community
2036 needs to be verified
2041 "large-community-list": {
2043 "Test1": [{"action": "PERMIT", "attribute":\
2046 result = verify_create_community_list(tgen, input_dict)
2049 errormsg(str) or True
2052 logger
.debug("Entering lib API: verify_create_community_list()")
2054 for router
in input_dict
.keys():
2055 if router
not in tgen
.routers():
2058 rnode
= tgen
.routers()[router
]
2060 logger
.info("Verifying large-community is created for dut %s:",
2063 for comm_data
in input_dict
[router
]["bgp_community_lists"]:
2064 comm_name
= comm_data
["name"]
2065 comm_type
= comm_data
["community_type"]
2066 show_bgp_community
= \
2068 "show bgp large-community-list {} detail".
2071 # Verify community list and type
2072 if comm_name
in show_bgp_community
and comm_type
in \
2074 logger
.info("BGP %s large-community-list %s is"
2075 " created", comm_type
, comm_name
)
2077 errormsg
= "BGP {} large-community-list {} is not" \
2078 " created".format(comm_type
, comm_name
)
2081 logger
.debug("Exiting lib API: verify_create_community_list()")