]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/common_config.py
Merge pull request #5517 from mjstapp/fix_evpn_state
[mirror_frr.git] / tests / topotests / lib / common_config.py
1 #
2 # Copyright (c) 2019 by VMware, Inc. ("VMware")
3 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
4 # ("NetDEF") in this file.
5 #
6 # Permission to use, copy, modify, and/or distribute this software
7 # for any purpose with or without fee is hereby granted, provided
8 # that the above copyright notice and this permission notice appear
9 # in all copies.
10 #
11 # THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
12 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
14 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
15 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
16 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
17 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
18 # OF THIS SOFTWARE.
19 #
20
21 from collections import OrderedDict
22 from datetime import datetime
23 from time import sleep
24 from copy import deepcopy
25 from subprocess import call
26 from subprocess import STDOUT as SUB_STDOUT
27 from subprocess import PIPE as SUB_PIPE
28 from subprocess import Popen
29 from functools import wraps
30 from re import search as re_search
31 from tempfile import mkdtemp
32
33 import StringIO
34 import os
35 import ConfigParser
36 import traceback
37 import socket
38 import ipaddr
39
40 from lib.topolog import logger, logger_config
41 from lib.topogen import TopoRouter
42 from lib.topotest import interface_set_status
43
44
45 FRRCFG_FILE = "frr_json.conf"
46 FRRCFG_BKUP_FILE = "frr_json_initial.conf"
47
48 ERROR_LIST = ["Malformed", "Failure", "Unknown", "Incomplete"]
49 ROUTER_LIST = []
50
51 ####
52 CD = os.path.dirname(os.path.realpath(__file__))
53 PYTESTINI_PATH = os.path.join(CD, "../pytest.ini")
54
55 # Creating tmp dir with testsuite name to avoid conflict condition when
56 # multiple testsuites run together. All temporary files would be created
57 # in this dir and this dir would be removed once testsuite run is
58 # completed
59 LOGDIR = "/tmp/topotests/"
60 TMPDIR = None
61
62 # NOTE: to save execution logs to log file frrtest_log_dir must be configured
63 # in `pytest.ini`.
64 config = ConfigParser.ConfigParser()
65 config.read(PYTESTINI_PATH)
66
67 config_section = "topogen"
68
69 if config.has_option("topogen", "verbosity"):
70 loglevel = config.get("topogen", "verbosity")
71 loglevel = loglevel.upper()
72 else:
73 loglevel = "INFO"
74
75 if config.has_option("topogen", "frrtest_log_dir"):
76 frrtest_log_dir = config.get("topogen", "frrtest_log_dir")
77 time_stamp = datetime.time(datetime.now())
78 logfile_name = "frr_test_bgp_"
79 frrtest_log_file = frrtest_log_dir + logfile_name + str(time_stamp)
80 print("frrtest_log_file..", frrtest_log_file)
81
82 logger = logger_config.get_logger(name="test_execution_logs",
83 log_level=loglevel,
84 target=frrtest_log_file)
85 print("Logs will be sent to logfile: {}".format(frrtest_log_file))
86
87 if config.has_option("topogen", "show_router_config"):
88 show_router_config = config.get("topogen", "show_router_config")
89 else:
90 show_router_config = False
91
92 # env variable for setting what address type to test
93 ADDRESS_TYPES = os.environ.get("ADDRESS_TYPES")
94
95
96 # Saves sequence id numbers
97 SEQ_ID = {
98 "prefix_lists": {},
99 "route_maps": {}
100 }
101
102
103 def get_seq_id(obj_type, router, obj_name):
104 """
105 Generates and saves sequence number in interval of 10
106
107 Parameters
108 ----------
109 * `obj_type`: prefix_lists or route_maps
110 * `router`: router name
111 *` obj_name`: name of the prefix-list or route-map
112
113 Returns
114 --------
115 Sequence number generated
116 """
117
118 router_data = SEQ_ID[obj_type].setdefault(router, {})
119 obj_data = router_data.setdefault(obj_name, {})
120 seq_id = obj_data.setdefault("seq_id", 0)
121
122 seq_id = int(seq_id) + 10
123 obj_data["seq_id"] = seq_id
124
125 return seq_id
126
127
128 def set_seq_id(obj_type, router, id, obj_name):
129 """
130 Saves sequence number if not auto-generated and given by user
131
132 Parameters
133 ----------
134 * `obj_type`: prefix_lists or route_maps
135 * `router`: router name
136 *` obj_name`: name of the prefix-list or route-map
137 """
138 router_data = SEQ_ID[obj_type].setdefault(router, {})
139 obj_data = router_data.setdefault(obj_name, {})
140 seq_id = obj_data.setdefault("seq_id", 0)
141
142 seq_id = int(seq_id) + int(id)
143 obj_data["seq_id"] = seq_id
144
145
146 class InvalidCLIError(Exception):
147 """Raise when the CLI command is wrong"""
148 pass
149
150
151 def run_frr_cmd(rnode, cmd, isjson=False):
152 """
153 Execute frr show commands in priviledged mode
154
155 * `rnode`: router node on which commands needs to executed
156 * `cmd`: Command to be executed on frr
157 * `isjson`: If command is to get json data or not
158
159 :return str:
160 """
161
162 if cmd:
163 ret_data = rnode.vtysh_cmd(cmd, isjson=isjson)
164
165 if True:
166 if isjson:
167 logger.debug(ret_data)
168 print_data = rnode.vtysh_cmd(cmd.rstrip("json"), isjson=False)
169 else:
170 print_data = ret_data
171
172 logger.info('Output for command [ %s] on router %s:\n%s',
173 cmd.rstrip("json"), rnode.name, print_data)
174 return ret_data
175
176 else:
177 raise InvalidCLIError('No actual cmd passed')
178
179
180 def create_common_configuration(tgen, router, data, config_type=None,
181 build=False):
182 """
183 API to create object of class FRRConfig and also create frr_json.conf
184 file. It will create interface and common configurations and save it to
185 frr_json.conf and load to router
186
187 Parameters
188 ----------
189 * `tgen`: tgen onject
190 * `data`: Congiguration data saved in a list.
191 * `router` : router id to be configured.
192 * `config_type` : Syntactic information while writing configuration. Should
193 be one of the value as mentioned in the config_map below.
194 * `build` : Only for initial setup phase this is set as True
195
196 Returns
197 -------
198 True or False
199 """
200 TMPDIR = os.path.join(LOGDIR, tgen.modname)
201
202 fname = "{}/{}/{}".format(TMPDIR, router, FRRCFG_FILE)
203
204 config_map = OrderedDict({
205 "general_config": "! FRR General Config\n",
206 "interface_config": "! Interfaces Config\n",
207 "static_route": "! Static Route Config\n",
208 "prefix_list": "! Prefix List Config\n",
209 "bgp_community_list": "! Community List Config\n",
210 "route_maps": "! Route Maps Config\n",
211 "bgp": "! BGP Config\n"
212 })
213
214 if build:
215 mode = "a"
216 else:
217 mode = "w"
218
219 try:
220 frr_cfg_fd = open(fname, mode)
221 if config_type:
222 frr_cfg_fd.write(config_map[config_type])
223 for line in data:
224 frr_cfg_fd.write("{} \n".format(str(line)))
225 frr_cfg_fd.write("\n")
226
227 except IOError as err:
228 logger.error("Unable to open FRR Config File. error(%s): %s" %
229 (err.errno, err.strerror))
230 return False
231 finally:
232 frr_cfg_fd.close()
233
234 # If configuration applied from build, it will done at last
235 if not build:
236 load_config_to_router(tgen, router)
237
238 return True
239
240
241 def reset_config_on_routers(tgen, routerName=None):
242 """
243 Resets configuration on routers to the snapshot created using input JSON
244 file. It replaces existing router configuration with FRRCFG_BKUP_FILE
245
246 Parameters
247 ----------
248 * `tgen` : Topogen object
249 * `routerName` : router config is to be reset
250 """
251
252 logger.debug("Entering API: reset_config_on_routers")
253
254 router_list = tgen.routers()
255 for rname in ROUTER_LIST:
256 if routerName and routerName != rname:
257 continue
258
259 router = router_list[rname]
260 logger.info("Configuring router %s to initial test configuration",
261 rname)
262 cfg = router.run("vtysh -c 'show running'")
263 fname = "{}/{}/frr.sav".format(TMPDIR, rname)
264 dname = "{}/{}/delta.conf".format(TMPDIR, rname)
265 f = open(fname, "w")
266 for line in cfg.split("\n"):
267 line = line.strip()
268
269 if (line == "Building configuration..." or
270 line == "Current configuration:" or
271 not line):
272 continue
273 f.write(line)
274 f.write("\n")
275
276 f.close()
277
278 run_cfg_file = "{}/{}/frr.sav".format(TMPDIR, rname)
279 init_cfg_file = "{}/{}/frr_json_initial.conf".format(TMPDIR, rname)
280
281 tempdir = mkdtemp()
282 with open(os.path.join(tempdir, 'vtysh.conf'), 'w') as fd:
283 pass
284
285 command = "/usr/lib/frr/frr-reload.py --confdir {} --input {} --test {} > {}". \
286 format(tempdir, run_cfg_file, init_cfg_file, dname)
287 result = call(command, shell=True, stderr=SUB_STDOUT,
288 stdout=SUB_PIPE)
289
290 os.unlink(os.path.join(tempdir, 'vtysh.conf'))
291 os.rmdir(tempdir)
292
293 # Assert if command fail
294 if result > 0:
295 logger.error("Delta file creation failed. Command executed %s",
296 command)
297 with open(run_cfg_file, 'r') as fd:
298 logger.info('Running configuration saved in %s is:\n%s',
299 run_cfg_file, fd.read())
300 with open(init_cfg_file, 'r') as fd:
301 logger.info('Test configuration saved in %s is:\n%s',
302 init_cfg_file, fd.read())
303
304 err_cmd = ['/usr/bin/vtysh', '-m', '-f', run_cfg_file]
305 result = Popen(err_cmd, stdout=SUB_PIPE, stderr=SUB_PIPE)
306 output = result.communicate()
307 for out_data in output:
308 temp_data = out_data.decode('utf-8').lower()
309 for out_err in ERROR_LIST:
310 if out_err.lower() in temp_data:
311 logger.error("Found errors while validating data in"
312 " %s", run_cfg_file)
313 raise InvalidCLIError(out_data)
314 raise InvalidCLIError("Unknown error in %s", output)
315
316 f = open(dname, "r")
317 delta = StringIO.StringIO()
318 delta.write("configure terminal\n")
319 t_delta = f.read()
320 for line in t_delta.split("\n"):
321 line = line.strip()
322 if (line == "Lines To Delete" or
323 line == "===============" or
324 line == "Lines To Add" or
325 line == "============" or
326 not line):
327 continue
328 delta.write(line)
329 delta.write("\n")
330
331 delta.write("end\n")
332 output = router.vtysh_multicmd(delta.getvalue(),
333 pretty_output=False)
334
335 delta.close()
336 delta = StringIO.StringIO()
337 cfg = router.run("vtysh -c 'show running'")
338 for line in cfg.split("\n"):
339 line = line.strip()
340 delta.write(line)
341 delta.write("\n")
342
343 # Router current configuration to log file or console if
344 # "show_router_config" is defined in "pytest.ini"
345 if show_router_config:
346 logger.info("Configuration on router {} after config reset:".
347 format(rname))
348 logger.info(delta.getvalue())
349 delta.close()
350
351 logger.debug("Exting API: reset_config_on_routers")
352 return True
353
354
355 def load_config_to_router(tgen, routerName, save_bkup=False):
356 """
357 Loads configuration on router from the file FRRCFG_FILE.
358
359 Parameters
360 ----------
361 * `tgen` : Topogen object
362 * `routerName` : router for which configuration to be loaded
363 * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE
364 """
365
366 logger.debug("Entering API: load_config_to_router")
367
368 router_list = tgen.routers()
369 for rname in ROUTER_LIST:
370 if routerName and routerName != rname:
371 continue
372
373 router = router_list[rname]
374 try:
375 frr_cfg_file = "{}/{}/{}".format(TMPDIR, rname, FRRCFG_FILE)
376 frr_cfg_bkup = "{}/{}/{}".format(TMPDIR, rname,
377 FRRCFG_BKUP_FILE)
378 with open(frr_cfg_file, "r+") as cfg:
379 data = cfg.read()
380 logger.info("Applying following configuration on router"
381 " {}:\n{}".format(rname, data))
382 if save_bkup:
383 with open(frr_cfg_bkup, "w") as bkup:
384 bkup.write(data)
385
386 output = router.vtysh_multicmd(data, pretty_output=False)
387 for out_err in ERROR_LIST:
388 if out_err.lower() in output.lower():
389 raise InvalidCLIError("%s" % output)
390
391 cfg.truncate(0)
392 except IOError as err:
393 errormsg = ("Unable to open config File. error(%s):"
394 " %s", (err.errno, err.strerror))
395 return errormsg
396
397 # Router current configuration to log file or console if
398 # "show_router_config" is defined in "pytest.ini"
399 if show_router_config:
400 new_config = router.run("vtysh -c 'show running'")
401 logger.info(new_config)
402
403 logger.debug("Exting API: load_config_to_router")
404 return True
405
406
407 def start_topology(tgen):
408 """
409 Starting topology, create tmp files which are loaded to routers
410 to start deamons and then start routers
411 * `tgen` : topogen object
412 """
413
414 global TMPDIR, ROUTER_LIST
415 # Starting topology
416 tgen.start_topology()
417
418 # Starting deamons
419
420 router_list = tgen.routers()
421 ROUTER_LIST = sorted(router_list.keys(),
422 key=lambda x: int(re_search('\d+', x).group(0)))
423 TMPDIR = os.path.join(LOGDIR, tgen.modname)
424
425 router_list = tgen.routers()
426 for rname in ROUTER_LIST:
427 router = router_list[rname]
428 try:
429 os.chdir(TMPDIR)
430
431 # Creating router named dir and empty zebra.conf bgpd.conf files
432 # inside the current directory
433 if os.path.isdir('{}'.format(rname)):
434 os.system("rm -rf {}".format(rname))
435 os.mkdir('{}'.format(rname))
436 os.system('chmod -R go+rw {}'.format(rname))
437 os.chdir('{}/{}'.format(TMPDIR, rname))
438 os.system('touch zebra.conf bgpd.conf')
439 else:
440 os.mkdir('{}'.format(rname))
441 os.system('chmod -R go+rw {}'.format(rname))
442 os.chdir('{}/{}'.format(TMPDIR, rname))
443 os.system('touch zebra.conf bgpd.conf')
444
445 except IOError as (errno, strerror):
446 logger.error("I/O error({0}): {1}".format(errno, strerror))
447
448 # Loading empty zebra.conf file to router, to start the zebra deamon
449 router.load_config(
450 TopoRouter.RD_ZEBRA,
451 '{}/{}/zebra.conf'.format(TMPDIR, rname)
452 )
453 # Loading empty bgpd.conf file to router, to start the bgp deamon
454 router.load_config(
455 TopoRouter.RD_BGP,
456 '{}/{}/bgpd.conf'.format(TMPDIR, rname)
457 )
458
459 # Starting routers
460 logger.info("Starting all routers once topology is created")
461 tgen.start_router()
462
463
464 def number_to_row(routerName):
465 """
466 Returns the number for the router.
467 Calculation based on name a0 = row 0, a1 = row 1, b2 = row 2, z23 = row 23
468 etc
469 """
470 return int(routerName[1:])
471
472
473 def number_to_column(routerName):
474 """
475 Returns the number for the router.
476 Calculation based on name a0 = columnn 0, a1 = column 0, b2= column 1,
477 z23 = column 26 etc
478 """
479 return ord(routerName[0]) - 97
480
481
482 #############################################
483 # Common APIs, will be used by all protocols
484 #############################################
485
486 def validate_ip_address(ip_address):
487 """
488 Validates the type of ip address
489
490 Parameters
491 ----------
492 * `ip_address`: IPv4/IPv6 address
493
494 Returns
495 -------
496 Type of address as string
497 """
498
499 if "/" in ip_address:
500 ip_address = ip_address.split("/")[0]
501
502 v4 = True
503 v6 = True
504 try:
505 socket.inet_aton(ip_address)
506 except socket.error as error:
507 logger.debug("Not a valid IPv4 address")
508 v4 = False
509 else:
510 return "ipv4"
511
512 try:
513 socket.inet_pton(socket.AF_INET6, ip_address)
514 except socket.error as error:
515 logger.debug("Not a valid IPv6 address")
516 v6 = False
517 else:
518 return "ipv6"
519
520 if not v4 and not v6:
521 raise Exception("InvalidIpAddr", "%s is neither valid IPv4 or IPv6"
522 " address" % ip_address)
523
524
525 def check_address_types(addr_type=None):
526 """
527 Checks environment variable set and compares with the current address type
528 """
529
530 addr_types_env = os.environ.get("ADDRESS_TYPES")
531 if not addr_types_env:
532 addr_types_env = "dual"
533
534 if addr_types_env == "dual":
535 addr_types = ["ipv4", "ipv6"]
536 elif addr_types_env == "ipv4":
537 addr_types = ["ipv4"]
538 elif addr_types_env == "ipv6":
539 addr_types = ["ipv6"]
540
541 if addr_type is None:
542 return addr_types
543
544 if addr_type not in addr_types:
545 logger.error("{} not in supported/configured address types {}".
546 format(addr_type, addr_types))
547 return False
548
549 return True
550
551
552 def generate_ips(network, no_of_ips):
553 """
554 Returns list of IPs.
555 based on start_ip and no_of_ips
556
557 * `network` : from here the ip will start generating,
558 start_ip will be
559 * `no_of_ips` : these many IPs will be generated
560 """
561
562 ipaddress_list = []
563 if type(network) is not list:
564 network = [network]
565
566 for start_ipaddr in network:
567 if "/" in start_ipaddr:
568 start_ip = start_ipaddr.split("/")[0]
569 mask = int(start_ipaddr.split("/")[1])
570
571 addr_type = validate_ip_address(start_ip)
572 if addr_type == "ipv4":
573 start_ip = ipaddr.IPv4Address(unicode(start_ip))
574 step = 2 ** (32 - mask)
575 if addr_type == "ipv6":
576 start_ip = ipaddr.IPv6Address(unicode(start_ip))
577 step = 2 ** (128 - mask)
578
579 next_ip = start_ip
580 count = 0
581 while count < no_of_ips:
582 ipaddress_list.append("{}/{}".format(next_ip, mask))
583 if addr_type == "ipv6":
584 next_ip = ipaddr.IPv6Address(int(next_ip) + step)
585 else:
586 next_ip += step
587 count += 1
588
589 return ipaddress_list
590
591
592 def find_interface_with_greater_ip(topo, router, loopback=True,
593 interface=True):
594 """
595 Returns highest interface ip for ipv4/ipv6. If loopback is there then
596 it will return highest IP from loopback IPs otherwise from physical
597 interface IPs.
598
599 * `topo` : json file data
600 * `router` : router for which hightest interface should be calculated
601 """
602
603 link_data = topo["routers"][router]["links"]
604 lo_list = []
605 interfaces_list = []
606 lo_exists = False
607 for destRouterLink, data in sorted(link_data.iteritems()):
608 if loopback:
609 if "type" in data and data["type"] == "loopback":
610 lo_exists = True
611 ip_address = topo["routers"][router]["links"][
612 destRouterLink]["ipv4"].split("/")[0]
613 lo_list.append(ip_address)
614 if interface:
615 ip_address = topo["routers"][router]["links"][
616 destRouterLink]["ipv4"].split("/")[0]
617 interfaces_list.append(ip_address)
618
619 if lo_exists:
620 return sorted(lo_list)[-1]
621
622 return sorted(interfaces_list)[-1]
623
624
625 def write_test_header(tc_name):
626 """ Display message at beginning of test case"""
627 count = 20
628 logger.info("*"*(len(tc_name)+count))
629 step("START -> Testcase : %s" % tc_name, reset=True)
630 logger.info("*"*(len(tc_name)+count))
631
632
633 def write_test_footer(tc_name):
634 """ Display message at end of test case"""
635 count = 21
636 logger.info("="*(len(tc_name)+count))
637 logger.info("Testcase : %s -> PASSED", tc_name)
638 logger.info("="*(len(tc_name)+count))
639
640
641 def interface_status(tgen, topo, input_dict):
642 """
643 Delete ip route maps from device
644
645 * `tgen` : Topogen object
646 * `topo` : json file data
647 * `input_dict` : for which router, route map has to be deleted
648
649 Usage
650 -----
651 input_dict = {
652 "r3": {
653 "interface_list": ['eth1-r1-r2', 'eth2-r1-r3'],
654 "status": "down"
655 }
656 }
657 Returns
658 -------
659 errormsg(str) or True
660 """
661 logger.debug("Entering lib API: interface_status()")
662
663 try:
664 global frr_cfg
665 for router in input_dict.keys():
666
667 interface_list = input_dict[router]['interface_list']
668 status = input_dict[router].setdefault('status', 'up')
669 for intf in interface_list:
670 rnode = tgen.routers()[router]
671 interface_set_status(rnode, intf, status)
672
673 # Load config to router
674 load_config_to_router(tgen, router)
675
676 except Exception as e:
677 # handle any exception
678 logger.error("Error %s occured. Arguments %s.", e.message, e.args)
679
680 # Traceback
681 errormsg = traceback.format_exc()
682 logger.error(errormsg)
683 return errormsg
684
685 logger.debug("Exiting lib API: interface_status()")
686 return True
687
688
689 def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0):
690 """
691 Retries function execution, if return is an errormsg or exception
692
693 * `attempts`: Number of attempts to make
694 * `wait`: Number of seconds to wait between each attempt
695 * `return_is_str`: Return val is an errormsg in case of failure
696 * `initial_wait`: Sleeps for this much seconds before executing function
697
698 """
699
700 def _retry(func):
701
702 @wraps(func)
703 def func_retry(*args, **kwargs):
704 _wait = kwargs.pop('wait', wait)
705 _attempts = kwargs.pop('attempts', attempts)
706 _attempts = int(_attempts)
707 if _attempts < 0:
708 raise ValueError("attempts must be 0 or greater")
709
710 if initial_wait > 0:
711 logger.info("Waiting for [%s]s as initial delay", initial_wait)
712 sleep(initial_wait)
713
714 _return_is_str = kwargs.pop('return_is_str', return_is_str)
715 for i in range(1, _attempts + 1):
716 try:
717 _expected = kwargs.setdefault('expected', True)
718 kwargs.pop('expected')
719 ret = func(*args, **kwargs)
720 logger.debug("Function returned %s" % ret)
721 if return_is_str and isinstance(ret, bool) and _expected:
722 return ret
723 if isinstance(ret, str) and _expected is False:
724 return ret
725
726 if _attempts == i:
727 return ret
728 except Exception as err:
729 if _attempts == i:
730 logger.info("Max number of attempts (%r) reached",
731 _attempts)
732 raise
733 else:
734 logger.info("Function returned %s", err)
735 if i < _attempts:
736 logger.info("Retry [#%r] after sleeping for %ss"
737 % (i, _wait))
738 sleep(_wait)
739 func_retry._original = func
740 return func_retry
741 return _retry
742
743
744 class Stepper:
745 """
746 Prints step number for the test case step being executed
747 """
748 count = 1
749
750 def __call__(self, msg, reset):
751 if reset:
752 Stepper.count = 1
753 logger.info(msg)
754 else:
755 logger.info("STEP %s: '%s'", Stepper.count, msg)
756 Stepper.count += 1
757
758
759 def step(msg, reset=False):
760 """
761 Call Stepper to print test steps. Need to reset at the beginning of test.
762 * ` msg` : Step message body.
763 * `reset` : Reset step count to 1 when set to True.
764 """
765 _step = Stepper()
766 _step(msg, reset)
767
768
769 #############################################
770 # These APIs, will used by testcase
771 #############################################
772 def create_interfaces_cfg(tgen, topo, build=False):
773 """
774 Create interface configuration for created topology. Basic Interface
775 configuration is provided in input json file.
776
777 Parameters
778 ----------
779 * `tgen` : Topogen object
780 * `topo` : json file data
781 * `build` : Only for initial setup phase this is set as True.
782
783 Returns
784 -------
785 True or False
786 """
787 result = False
788
789 try:
790 for c_router, c_data in topo.iteritems():
791 interface_data = []
792 for destRouterLink, data in sorted(c_data["links"].iteritems()):
793 # Loopback interfaces
794 if "type" in data and data["type"] == "loopback":
795 interface_name = destRouterLink
796 else:
797 interface_name = data["interface"]
798 interface_data.append("interface {}".format(
799 str(interface_name)
800 ))
801 if "ipv4" in data:
802 intf_addr = c_data["links"][destRouterLink]["ipv4"]
803 interface_data.append("ip address {}".format(
804 intf_addr
805 ))
806 if "ipv6" in data:
807 intf_addr = c_data["links"][destRouterLink]["ipv6"]
808 interface_data.append("ipv6 address {}".format(
809 intf_addr
810 ))
811
812 result = create_common_configuration(tgen, c_router,
813 interface_data,
814 "interface_config",
815 build=build)
816 except InvalidCLIError:
817 # Traceback
818 errormsg = traceback.format_exc()
819 logger.error(errormsg)
820 return errormsg
821
822 return result
823
824
825 def create_static_routes(tgen, input_dict, build=False):
826 """
827 Create static routes for given router as defined in input_dict
828
829 Parameters
830 ----------
831 * `tgen` : Topogen object
832 * `input_dict` : Input dict data, required when configuring from testcase
833 * `build` : Only for initial setup phase this is set as True.
834
835 Usage
836 -----
837 input_dict should be in the format below:
838 # static_routes: list of all routes
839 # network: network address
840 # no_of_ip: number of next-hop address that will be configured
841 # admin_distance: admin distance for route/routes.
842 # next_hop: starting next-hop address
843 # tag: tag id for static routes
844 # delete: True if config to be removed. Default False.
845
846 Example:
847 "routers": {
848 "r1": {
849 "static_routes": [
850 {
851 "network": "100.0.20.1/32",
852 "no_of_ip": 9,
853 "admin_distance": 100,
854 "next_hop": "10.0.0.1",
855 "tag": 4001
856 "delete": true
857 }
858 ]
859 }
860 }
861
862 Returns
863 -------
864 errormsg(str) or True
865 """
866 result = False
867 logger.debug("Entering lib API: create_static_routes()")
868 input_dict = deepcopy(input_dict)
869 try:
870 for router in input_dict.keys():
871 if "static_routes" not in input_dict[router]:
872 errormsg = "static_routes not present in input_dict"
873 logger.debug(errormsg)
874 continue
875
876 static_routes_list = []
877
878 static_routes = input_dict[router]["static_routes"]
879 for static_route in static_routes:
880 del_action = static_route.setdefault("delete", False)
881 # No of IPs
882 no_of_ip = static_route.setdefault("no_of_ip", 1)
883 admin_distance = static_route.setdefault("admin_distance",
884 None)
885 tag = static_route.setdefault("tag", None)
886 if "next_hop" not in static_route or \
887 "network" not in static_route:
888 errormsg = "'next_hop' or 'network' missing in" \
889 " input_dict"
890 return errormsg
891
892 next_hop = static_route["next_hop"]
893 network = static_route["network"]
894 if type(network) is not list:
895 network = [network]
896
897 ip_list = generate_ips(network, no_of_ip)
898 for ip in ip_list:
899 addr_type = validate_ip_address(ip)
900
901 if addr_type == "ipv4":
902 cmd = "ip route {} {}".format(ip, next_hop)
903 else:
904 cmd = "ipv6 route {} {}".format(ip, next_hop)
905
906 if tag:
907 cmd = "{} tag {}".format(cmd, str(tag))
908
909 if admin_distance:
910 cmd = "{} {}".format(cmd, admin_distance)
911
912 if del_action:
913 cmd = "no {}".format(cmd)
914
915 static_routes_list.append(cmd)
916
917 result = create_common_configuration(tgen, router,
918 static_routes_list,
919 "static_route",
920 build=build)
921
922 except InvalidCLIError:
923 # Traceback
924 errormsg = traceback.format_exc()
925 logger.error(errormsg)
926 return errormsg
927
928 logger.debug("Exiting lib API: create_static_routes()")
929 return result
930
931
932 def create_prefix_lists(tgen, input_dict, build=False):
933 """
934 Create ip prefix lists as per the config provided in input
935 JSON or input_dict
936
937 Parameters
938 ----------
939 * `tgen` : Topogen object
940 * `input_dict` : Input dict data, required when configuring from testcase
941 * `build` : Only for initial setup phase this is set as True.
942
943 Usage
944 -----
945 # pf_lists_1: name of prefix-list, user defined
946 # seqid: prefix-list seqid, auto-generated if not given by user
947 # network: criteria for applying prefix-list
948 # action: permit/deny
949 # le: less than or equal number of bits
950 # ge: greater than or equal number of bits
951
952 Example
953 -------
954 input_dict = {
955 "r1": {
956 "prefix_lists":{
957 "ipv4": {
958 "pf_list_1": [
959 {
960 "seqid": 10,
961 "network": "any",
962 "action": "permit",
963 "le": "32",
964 "ge": "30",
965 "delete": True
966 }
967 ]
968 }
969 }
970 }
971 }
972
973 Returns
974 -------
975 errormsg or True
976 """
977
978 logger.debug("Entering lib API: create_prefix_lists()")
979 result = False
980 try:
981 for router in input_dict.keys():
982 if "prefix_lists" not in input_dict[router]:
983 errormsg = "prefix_lists not present in input_dict"
984 logger.debug(errormsg)
985 continue
986
987 config_data = []
988 prefix_lists = input_dict[router]["prefix_lists"]
989 for addr_type, prefix_data in prefix_lists.iteritems():
990 if not check_address_types(addr_type):
991 continue
992
993 for prefix_name, prefix_list in prefix_data.iteritems():
994 for prefix_dict in prefix_list:
995 if "action" not in prefix_dict or \
996 "network" not in prefix_dict:
997 errormsg = "'action' or network' missing in" \
998 " input_dict"
999 return errormsg
1000
1001 network_addr = prefix_dict["network"]
1002 action = prefix_dict["action"]
1003 le = prefix_dict.setdefault("le", None)
1004 ge = prefix_dict.setdefault("ge", None)
1005 seqid = prefix_dict.setdefault("seqid", None)
1006 del_action = prefix_dict.setdefault("delete", False)
1007 if seqid is None:
1008 seqid = get_seq_id("prefix_lists", router,
1009 prefix_name)
1010 else:
1011 set_seq_id("prefix_lists", router, seqid,
1012 prefix_name)
1013
1014 if addr_type == "ipv4":
1015 protocol = "ip"
1016 else:
1017 protocol = "ipv6"
1018
1019 cmd = "{} prefix-list {} seq {} {} {}".format(
1020 protocol, prefix_name, seqid, action, network_addr
1021 )
1022 if le:
1023 cmd = "{} le {}".format(cmd, le)
1024 if ge:
1025 cmd = "{} ge {}".format(cmd, ge)
1026
1027 if del_action:
1028 cmd = "no {}".format(cmd)
1029
1030 config_data.append(cmd)
1031 result = create_common_configuration(tgen, router,
1032 config_data,
1033 "prefix_list",
1034 build=build)
1035
1036 except InvalidCLIError:
1037 # Traceback
1038 errormsg = traceback.format_exc()
1039 logger.error(errormsg)
1040 return errormsg
1041
1042 logger.debug("Exiting lib API: create_prefix_lists()")
1043 return result
1044
1045
1046 def create_route_maps(tgen, input_dict, build=False):
1047 """
1048 Create route-map on the devices as per the arguments passed
1049
1050 Parameters
1051 ----------
1052 * `tgen` : Topogen object
1053 * `input_dict` : Input dict data, required when configuring from testcase
1054 * `build` : Only for initial setup phase this is set as True.
1055
1056 Usage
1057 -----
1058 # route_maps: key, value pair for route-map name and its attribute
1059 # rmap_match_prefix_list_1: user given name for route-map
1060 # action: PERMIT/DENY
1061 # match: key,value pair for match criteria. prefix_list, community-list,
1062 large-community-list or tag. Only one option at a time.
1063 # prefix_list: name of prefix list
1064 # large-community-list: name of large community list
1065 # community-ist: name of community list
1066 # tag: tag id for static routes
1067 # set: key, value pair for modifying route attributes
1068 # localpref: preference value for the network
1069 # med: metric value advertised for AS
1070 # aspath: set AS path value
1071 # weight: weight for the route
1072 # community: standard community value to be attached
1073 # large_community: large community value to be attached
1074 # community_additive: if set to "additive", adds community/large-community
1075 value to the existing values of the network prefix
1076
1077 Example:
1078 --------
1079 input_dict = {
1080 "r1": {
1081 "route_maps": {
1082 "rmap_match_prefix_list_1": [
1083 {
1084 "action": "PERMIT",
1085 "match": {
1086 "ipv4": {
1087 "prefix_list": "pf_list_1"
1088 }
1089 "ipv6": {
1090 "prefix_list": "pf_list_1"
1091 }
1092
1093 "large-community-list": {
1094 "id": "community_1",
1095 "exact_match": True
1096 }
1097 "community_list": {
1098 "id": "community_2",
1099 "exact_match": True
1100 }
1101 "tag": "tag_id"
1102 },
1103 "set": {
1104 "localpref": 150,
1105 "med": 30,
1106 "aspath": {
1107 "num": 20000,
1108 "action": "prepend",
1109 },
1110 "weight": 500,
1111 "community": {
1112 "num": "1:2 2:3",
1113 "action": additive
1114 }
1115 "large_community": {
1116 "num": "1:2:3 4:5;6",
1117 "action": additive
1118 },
1119 }
1120 }
1121 ]
1122 }
1123 }
1124 }
1125
1126 Returns
1127 -------
1128 errormsg(str) or True
1129 """
1130
1131 result = False
1132 logger.debug("Entering lib API: create_route_maps()")
1133 input_dict = deepcopy(input_dict)
1134 try:
1135 for router in input_dict.keys():
1136 if "route_maps" not in input_dict[router]:
1137 logger.debug("route_maps not present in input_dict")
1138 continue
1139 rmap_data = []
1140 for rmap_name, rmap_value in \
1141 input_dict[router]["route_maps"].iteritems():
1142
1143 for rmap_dict in rmap_value:
1144 del_action = rmap_dict.setdefault("delete", False)
1145
1146 if del_action:
1147 rmap_data.append("no route-map {}".format(rmap_name))
1148 continue
1149
1150 if "action" not in rmap_dict:
1151 errormsg = "action not present in input_dict"
1152 logger.error(errormsg)
1153 return False
1154
1155 rmap_action = rmap_dict.setdefault("action", "deny")
1156
1157 seq_id = rmap_dict.setdefault("seq_id", None)
1158 if seq_id is None:
1159 seq_id = get_seq_id("route_maps", router, rmap_name)
1160 else:
1161 set_seq_id("route_maps", router, seq_id, rmap_name)
1162
1163 rmap_data.append("route-map {} {} {}".format(
1164 rmap_name, rmap_action, seq_id
1165 ))
1166
1167 if "continue" in rmap_dict:
1168 continue_to = rmap_dict["continue"]
1169 if continue_to:
1170 rmap_data.append("on-match goto {}".
1171 format(continue_to))
1172 else:
1173 logger.error("In continue, 'route-map entry "
1174 "sequence number' is not provided")
1175 return False
1176
1177 if "goto" in rmap_dict:
1178 go_to = rmap_dict["goto"]
1179 if go_to:
1180 rmap_data.append("on-match goto {}".
1181 format(go_to))
1182 else:
1183 logger.error("In goto, 'Goto Clause number' is not"
1184 " provided")
1185 return False
1186
1187 if "call" in rmap_dict:
1188 call_rmap = rmap_dict["call"]
1189 if call_rmap:
1190 rmap_data.append("call {}".
1191 format(call_rmap))
1192 else:
1193 logger.error("In call, 'destination Route-Map' is"
1194 " not provided")
1195 return False
1196
1197 # Verifying if SET criteria is defined
1198 if "set" in rmap_dict:
1199 set_data = rmap_dict["set"]
1200 ipv4_data = set_data.setdefault("ipv4", {})
1201 ipv6_data = set_data.setdefault("ipv6", {})
1202 local_preference = set_data.setdefault("localpref",
1203 None)
1204 metric = set_data.setdefault("med", None)
1205 as_path = set_data.setdefault("aspath", {})
1206 weight = set_data.setdefault("weight", None)
1207 community = set_data.setdefault("community", {})
1208 large_community = set_data.setdefault(
1209 "large_community", {})
1210 large_comm_list = set_data.setdefault(
1211 "large_comm_list", {})
1212 set_action = set_data.setdefault("set_action", None)
1213 nexthop = set_data.setdefault("nexthop", None)
1214 origin = set_data.setdefault("origin", None)
1215
1216 # Local Preference
1217 if local_preference:
1218 rmap_data.append("set local-preference {}".
1219 format(local_preference))
1220
1221 # Metric
1222 if metric:
1223 rmap_data.append("set metric {} \n".format(metric))
1224
1225 # Origin
1226 if origin:
1227 rmap_data.append("set origin {} \n".format(origin))
1228
1229 # AS Path Prepend
1230 if as_path:
1231 as_num = as_path.setdefault("as_num", None)
1232 as_action = as_path.setdefault("as_action", None)
1233 if as_action and as_num:
1234 rmap_data.append("set as-path {} {}".
1235 format(as_action, as_num))
1236
1237 # Community
1238 if community:
1239 num = community.setdefault("num", None)
1240 comm_action = community.setdefault("action", None)
1241 if num:
1242 cmd = "set community {}".format(num)
1243 if comm_action:
1244 cmd = "{} {}".format(cmd, comm_action)
1245 rmap_data.append(cmd)
1246 else:
1247 logger.error("In community, AS Num not"
1248 " provided")
1249 return False
1250
1251 if large_community:
1252 num = large_community.setdefault("num", None)
1253 comm_action = large_community.setdefault("action",
1254 None)
1255 if num:
1256 cmd = "set large-community {}".format(num)
1257 if comm_action:
1258 cmd = "{} {}".format(cmd, comm_action)
1259
1260 rmap_data.append(cmd)
1261 else:
1262 logger.error("In large_community, AS Num not"
1263 " provided")
1264 return False
1265 if large_comm_list:
1266 id = large_comm_list.setdefault("id", None)
1267 del_comm = large_comm_list.setdefault("delete",
1268 None)
1269 if id:
1270 cmd = "set large-comm-list {}".format(id)
1271 if del_comm:
1272 cmd = "{} delete".format(cmd)
1273
1274 rmap_data.append(cmd)
1275 else:
1276 logger.error("In large_comm_list 'id' not"
1277 " provided")
1278 return False
1279
1280 # Weight
1281 if weight:
1282 rmap_data.append("set weight {}".format(
1283 weight))
1284 if ipv6_data:
1285 nexthop = ipv6_data.setdefault("nexthop", None)
1286 if nexthop:
1287 rmap_data.append("set ipv6 next-hop {}".format(
1288 nexthop
1289 ))
1290
1291 # Adding MATCH and SET sequence to RMAP if defined
1292 if "match" in rmap_dict:
1293 match_data = rmap_dict["match"]
1294 ipv4_data = match_data.setdefault("ipv4", {})
1295 ipv6_data = match_data.setdefault("ipv6", {})
1296 community = match_data.setdefault(
1297 "community_list",{})
1298 large_community = match_data.setdefault(
1299 "large_community", {}
1300 )
1301 large_community_list = match_data.setdefault(
1302 "large_community_list", {}
1303 )
1304
1305 if ipv4_data:
1306 # fetch prefix list data from rmap
1307 prefix_name = \
1308 ipv4_data.setdefault("prefix_lists",
1309 None)
1310 if prefix_name:
1311 rmap_data.append("match ip address"
1312 " prefix-list {}".format(prefix_name))
1313
1314 # fetch tag data from rmap
1315 tag = ipv4_data.setdefault("tag", None)
1316 if tag:
1317 rmap_data.append("match tag {}".format(tag))
1318
1319 # fetch large community data from rmap
1320 large_community_list = ipv4_data.setdefault(
1321 "large_community_list",{})
1322 large_community = match_data.setdefault(
1323 "large_community", {})
1324
1325 if ipv6_data:
1326 prefix_name = ipv6_data.setdefault("prefix_lists",
1327 None)
1328 if prefix_name:
1329 rmap_data.append("match ipv6 address"
1330 " prefix-list {}".format(prefix_name))
1331
1332 # fetch tag data from rmap
1333 tag = ipv6_data.setdefault("tag", None)
1334 if tag:
1335 rmap_data.append("match tag {}".format(tag))
1336
1337 # fetch large community data from rmap
1338 large_community_list = ipv6_data.setdefault(
1339 "large_community_list",{})
1340 large_community = match_data.setdefault(
1341 "large_community", {})
1342
1343 if community:
1344 if "id" not in community:
1345 logger.error("'id' is mandatory for "
1346 "community-list in match"
1347 " criteria")
1348 return False
1349 cmd = "match community {}".format(community["id"])
1350 exact_match = community.setdefault("exact_match",
1351 False)
1352 if exact_match:
1353 cmd = "{} exact-match".format(cmd)
1354
1355 rmap_data.append(cmd)
1356 if large_community:
1357 if "id" not in large_community:
1358 logger.error("'id' is mandatory for "
1359 "large-community-list in match "
1360 "criteria")
1361 return False
1362 cmd = "match large-community {}".format(
1363 large_community["id"])
1364 exact_match = large_community.setdefault(
1365 "exact_match", False)
1366 if exact_match:
1367 cmd = "{} exact-match".format(cmd)
1368 rmap_data.append(cmd)
1369 if large_community_list:
1370 if "id" not in large_community_list:
1371 logger.error("'id' is mandatory for "
1372 "large-community-list in match "
1373 "criteria")
1374 return False
1375 cmd = "match large-community {}".format(
1376 large_community_list["id"])
1377 exact_match = large_community_list.setdefault(
1378 "exact_match", False)
1379 if exact_match:
1380 cmd = "{} exact-match".format(cmd)
1381 rmap_data.append(cmd)
1382
1383 result = create_common_configuration(tgen, router,
1384 rmap_data,
1385 "route_maps",
1386 build=build)
1387
1388 except InvalidCLIError:
1389 # Traceback
1390 errormsg = traceback.format_exc()
1391 logger.error(errormsg)
1392 return errormsg
1393
1394 logger.debug("Exiting lib API: create_route_maps()")
1395 return result
1396
1397
1398 def delete_route_maps(tgen, input_dict):
1399 """
1400 Delete ip route maps from device
1401
1402 * `tgen` : Topogen object
1403 * `input_dict` : for which router,
1404 route map has to be deleted
1405
1406 Usage
1407 -----
1408 # Delete route-map rmap_1 and rmap_2 from router r1
1409 input_dict = {
1410 "r1": {
1411 "route_maps": ["rmap_1", "rmap__2"]
1412 }
1413 }
1414 result = delete_route_maps("ipv4", input_dict)
1415
1416 Returns
1417 -------
1418 errormsg(str) or True
1419 """
1420 logger.info("Entering lib API: delete_route_maps()")
1421
1422 for router in input_dict.keys():
1423 route_maps = input_dict[router]["route_maps"][:]
1424 rmap_data = input_dict[router]
1425 rmap_data["route_maps"] = {}
1426 for route_map_name in route_maps:
1427 rmap_data["route_maps"].update({
1428 route_map_name:
1429 [{
1430 "delete": True
1431 }]
1432 })
1433
1434 return create_route_maps(tgen, input_dict)
1435
1436
1437 def create_bgp_community_lists(tgen, input_dict, build=False):
1438 """
1439 Create bgp community-list or large-community-list on the devices as per
1440 the arguments passed. Takes list of communities in input.
1441
1442 Parameters
1443 ----------
1444 * `tgen` : Topogen object
1445 * `input_dict` : Input dict data, required when configuring from testcase
1446 * `build` : Only for initial setup phase this is set as True.
1447 Usage
1448 -----
1449 input_dict_1 = {
1450 "r3": {
1451 "bgp_community_lists": [
1452 {
1453 "community_type": "standard",
1454 "action": "permit",
1455 "name": "rmap_lcomm_{}".format(addr_type),
1456 "value": "1:1:1 1:2:3 2:1:1 2:2:2",
1457 "large": True
1458 }
1459 ]
1460 }
1461 }
1462 }
1463 result = create_bgp_community_lists(tgen, input_dict_1)
1464 """
1465
1466 result = False
1467 logger.debug("Entering lib API: create_bgp_community_lists()")
1468 input_dict = deepcopy(input_dict)
1469 try:
1470 for router in input_dict.keys():
1471 if "bgp_community_lists" not in input_dict[router]:
1472 errormsg = "bgp_community_lists not present in input_dict"
1473 logger.debug(errormsg)
1474 continue
1475
1476 config_data = []
1477
1478 community_list = input_dict[router]["bgp_community_lists"]
1479 for community_dict in community_list:
1480 del_action = community_dict.setdefault("delete", False)
1481 community_type = community_dict.setdefault("community_type",
1482 None)
1483 action = community_dict.setdefault("action", None)
1484 value = community_dict.setdefault("value", '')
1485 large = community_dict.setdefault("large", None)
1486 name = community_dict.setdefault("name", None)
1487 if large:
1488 cmd = "bgp large-community-list"
1489 else:
1490 cmd = "bgp community-list"
1491
1492 if not large and not (community_type and action and value):
1493 errormsg = "community_type, action and value are " \
1494 "required in bgp_community_list"
1495 logger.error(errormsg)
1496 return False
1497
1498 try:
1499 community_type = int(community_type)
1500 cmd = "{} {} {} {}".format(cmd, community_type, action,
1501 value)
1502 except ValueError:
1503
1504 cmd = "{} {} {} {} {}".format(
1505 cmd, community_type, name, action, value)
1506
1507 if del_action:
1508 cmd = "no {}".format(cmd)
1509
1510 config_data.append(cmd)
1511
1512 result = create_common_configuration(tgen, router, config_data,
1513 "bgp_community_list",
1514 build=build)
1515
1516 except InvalidCLIError:
1517 # Traceback
1518 errormsg = traceback.format_exc()
1519 logger.error(errormsg)
1520 return errormsg
1521
1522 logger.debug("Exiting lib API: create_bgp_community_lists()")
1523 return result
1524
1525
1526 def shutdown_bringup_interface(tgen, dut, intf_name, ifaceaction=False):
1527 """
1528 Shutdown or bringup router's interface "
1529
1530 * `tgen` : Topogen object
1531 * `dut` : Device under test
1532 * `intf_name` : Interface name to be shut/no shut
1533 * `ifaceaction` : Action, to shut/no shut interface,
1534 by default is False
1535
1536 Usage
1537 -----
1538 dut = "r3"
1539 intf = "r3-r1-eth0"
1540 # Shut down ineterface
1541 shutdown_bringup_interface(tgen, dut, intf, False)
1542
1543 # Bring up ineterface
1544 shutdown_bringup_interface(tgen, dut, intf, True)
1545
1546 Returns
1547 -------
1548 errormsg(str) or True
1549 """
1550
1551 router_list = tgen.routers()
1552 if ifaceaction:
1553 logger.info("Bringing up interface : {}".format(intf_name))
1554 else:
1555 logger.info("Shutting down interface : {}".format(intf_name))
1556
1557 interface_set_status(router_list[dut], intf_name, ifaceaction)
1558
1559
1560 #############################################
1561 # Verification APIs
1562 #############################################
1563 @retry(attempts=10, return_is_str=True, initial_wait=2)
1564 def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
1565 """
1566 Data will be read from input_dict or input JSON file, API will generate
1567 same prefixes, which were redistributed by either create_static_routes() or
1568 advertise_networks_using_network_command() and do will verify next_hop and
1569 each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json"
1570 command o/p.
1571
1572 Parameters
1573 ----------
1574 * `tgen` : topogen object
1575 * `addr_type` : ip type, ipv4/ipv6
1576 * `dut`: Device Under Test, for which user wants to test the data
1577 * `input_dict` : input dict, has details of static routes
1578 * `next_hop`[optional]: next_hop which needs to be verified,
1579 default: static
1580 * `protocol`[optional]: protocol, default = None
1581
1582 Usage
1583 -----
1584 # RIB can be verified for static routes OR network advertised using
1585 network command. Following are input_dicts to create static routes
1586 and advertise networks using network command. Any one of the input_dict
1587 can be passed to verify_rib() to verify routes in DUT"s RIB.
1588
1589 # Creating static routes for r1
1590 input_dict = {
1591 "r1": {
1592 "static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9, \
1593 "admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}]
1594 }}
1595 # Advertising networks using network command in router r1
1596 input_dict = {
1597 "r1": {
1598 "advertise_networks": [{"start_ip": "20.0.0.0/32",
1599 "no_of_network": 10},
1600 {"start_ip": "30.0.0.0/32"}]
1601 }}
1602 # Verifying ipv4 routes in router r1 learned via BGP
1603 dut = "r2"
1604 protocol = "bgp"
1605 result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol)
1606
1607 Returns
1608 -------
1609 errormsg(str) or True
1610 """
1611
1612 logger.debug("Entering lib API: verify_rib()")
1613
1614 router_list = tgen.routers()
1615 for routerInput in input_dict.keys():
1616 for router, rnode in router_list.iteritems():
1617 if router != dut:
1618 continue
1619
1620 # Verifying RIB routes
1621 if addr_type == "ipv4":
1622 if protocol:
1623 command = "show ip route {} json".format(protocol)
1624 else:
1625 command = "show ip route json"
1626 else:
1627 if protocol:
1628 command = "show ipv6 route {} json".format(protocol)
1629 else:
1630 command = "show ipv6 route json"
1631
1632 logger.info("Checking router %s RIB:", router)
1633 rib_routes_json = run_frr_cmd(rnode, command, isjson=True)
1634
1635 # Verifying output dictionary rib_routes_json is not empty
1636 if bool(rib_routes_json) is False:
1637 errormsg = "No {} route found in rib of router {}..". \
1638 format(protocol, router)
1639 return errormsg
1640
1641 if "static_routes" in input_dict[routerInput]:
1642 static_routes = input_dict[routerInput]["static_routes"]
1643 st_found = False
1644 nh_found = False
1645 for static_route in static_routes:
1646 found_routes = []
1647 missing_routes = []
1648
1649 network = static_route["network"]
1650 if "no_of_ip" in static_route:
1651 no_of_ip = static_route["no_of_ip"]
1652 else:
1653 no_of_ip = 1
1654
1655 # Generating IPs for verification
1656 ip_list = generate_ips(network, no_of_ip)
1657 for st_rt in ip_list:
1658 st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
1659
1660 if st_rt in rib_routes_json:
1661 st_found = True
1662 found_routes.append(st_rt)
1663
1664 if next_hop:
1665 if type(next_hop) is not list:
1666 next_hop = [next_hop]
1667
1668 found_hops = [rib_r["ip"] for rib_r in
1669 rib_routes_json[st_rt][0][
1670 "nexthops"]]
1671 for nh in found_hops:
1672 nh_found = False
1673 if nh and nh in next_hop:
1674 nh_found = True
1675 else:
1676 errormsg = ("Nexthop {} is Missing for {}"
1677 " route {} in RIB of router"
1678 " {}\n".format(next_hop,
1679 protocol,
1680 st_rt, dut))
1681
1682 return errormsg
1683 else:
1684 missing_routes.append(st_rt)
1685
1686 if nh_found:
1687 logger.info("Found next_hop %s for all routes in RIB of"
1688 " router %s\n", next_hop, dut)
1689
1690 if not st_found and len(missing_routes) > 0:
1691 errormsg = "Missing route in RIB of router {}, routes: " \
1692 "{}\n".format(dut, missing_routes)
1693 return errormsg
1694
1695 logger.info("Verified routes in router %s RIB, found routes"
1696 " are: %s\n", dut, found_routes)
1697
1698 continue
1699
1700 if "bgp" in input_dict[routerInput]:
1701 if 'advertise_networks' in input_dict[routerInput]["bgp"]\
1702 ["address_family"][addr_type]["unicast"]:
1703
1704 found_routes = []
1705 missing_routes = []
1706 advertise_network = input_dict[routerInput]["bgp"]\
1707 ["address_family"][addr_type]["unicast"]\
1708 ["advertise_networks"]
1709
1710 for advertise_network_dict in advertise_network:
1711 start_ip = advertise_network_dict["network"]
1712 if "no_of_network" in advertise_network_dict:
1713 no_of_network = advertise_network_dict["no_of_network"]
1714 else:
1715 no_of_network = 1
1716
1717 # Generating IPs for verification
1718 ip_list = generate_ips(start_ip, no_of_network)
1719 for st_rt in ip_list:
1720 st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
1721
1722 found = False
1723 nh_found = False
1724 if st_rt in rib_routes_json:
1725 found = True
1726 found_routes.append(st_rt)
1727
1728 if next_hop:
1729 if type(next_hop) is not list:
1730 next_hop = [next_hop]
1731
1732 for index, nh in enumerate(next_hop):
1733 if rib_routes_json[st_rt][0]\
1734 ['nexthops'][index]['ip'] == nh:
1735 nh_found = True
1736 else:
1737 errormsg=("Nexthop {} is Missing"
1738 " for {} route {} in "
1739 "RIB of router {}\n".\
1740 format(next_hop,
1741 protocol,
1742 st_rt, dut))
1743 return errormsg
1744
1745 else:
1746 missing_routes.append(st_rt)
1747
1748 if nh_found:
1749 logger.info("Found next_hop {} for all routes in RIB"
1750 " of router {}\n".format(next_hop, dut))
1751
1752 if not found and len(missing_routes) > 0:
1753 errormsg = ("Missing {} route in RIB of router {}, "
1754 "routes: {} \n".\
1755 format(addr_type, dut, missing_routes))
1756 return errormsg
1757
1758 logger.info("Verified {} routes in router {} RIB, found"
1759 " routes are: {}\n".\
1760 format(addr_type, dut, found_routes))
1761
1762 logger.debug("Exiting lib API: verify_rib()")
1763 return True
1764
1765
1766 def verify_admin_distance_for_static_routes(tgen, input_dict):
1767 """
1768 API to verify admin distance for static routes as defined in input_dict/
1769 input JSON by running show ip/ipv6 route json command.
1770
1771 Parameter
1772 ---------
1773 * `tgen` : topogen object
1774 * `input_dict`: having details like - for which router and static routes
1775 admin dsitance needs to be verified
1776 Usage
1777 -----
1778 # To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop
1779 10.0.0.2 in router r1
1780 input_dict = {
1781 "r1": {
1782 "static_routes": [{
1783 "network": "10.0.20.1/32",
1784 "admin_distance": 10,
1785 "next_hop": "10.0.0.2"
1786 }]
1787 }
1788 }
1789 result = verify_admin_distance_for_static_routes(tgen, input_dict)
1790
1791 Returns
1792 -------
1793 errormsg(str) or True
1794 """
1795
1796 logger.debug("Entering lib API: verify_admin_distance_for_static_routes()")
1797
1798 for router in input_dict.keys():
1799 if router not in tgen.routers():
1800 continue
1801
1802 rnode = tgen.routers()[router]
1803
1804 for static_route in input_dict[router]["static_routes"]:
1805 addr_type = validate_ip_address(static_route["network"])
1806 # Command to execute
1807 if addr_type == "ipv4":
1808 command = "show ip route json"
1809 else:
1810 command = "show ipv6 route json"
1811 show_ip_route_json = run_frr_cmd(rnode, command, isjson=True)
1812
1813 logger.info("Verifying admin distance for static route %s"
1814 " under dut %s:", static_route, router)
1815 network = static_route["network"]
1816 next_hop = static_route["next_hop"]
1817 admin_distance = static_route["admin_distance"]
1818 route_data = show_ip_route_json[network][0]
1819 if network in show_ip_route_json:
1820 if route_data["nexthops"][0]["ip"] == next_hop:
1821 if route_data["distance"] != admin_distance:
1822 errormsg = ("Verification failed: admin distance"
1823 " for static route {} under dut {},"
1824 " found:{} but expected:{}".
1825 format(static_route, router,
1826 route_data["distance"],
1827 admin_distance))
1828 return errormsg
1829 else:
1830 logger.info("Verification successful: admin"
1831 " distance for static route %s under"
1832 " dut %s, found:%s", static_route,
1833 router, route_data["distance"])
1834
1835 else:
1836 errormsg = ("Static route {} not found in "
1837 "show_ip_route_json for dut {}".
1838 format(network, router))
1839 return errormsg
1840
1841 logger.debug("Exiting lib API: verify_admin_distance_for_static_routes()")
1842 return True
1843
1844
1845 def verify_prefix_lists(tgen, input_dict):
1846 """
1847 Running "show ip prefix-list" command and verifying given prefix-list
1848 is present in router.
1849
1850 Parameters
1851 ----------
1852 * `tgen` : topogen object
1853 * `input_dict`: data to verify prefix lists
1854
1855 Usage
1856 -----
1857 # To verify pf_list_1 is present in router r1
1858 input_dict = {
1859 "r1": {
1860 "prefix_lists": ["pf_list_1"]
1861 }}
1862 result = verify_prefix_lists("ipv4", input_dict, tgen)
1863
1864 Returns
1865 -------
1866 errormsg(str) or True
1867 """
1868
1869 logger.debug("Entering lib API: verify_prefix_lists()")
1870
1871 for router in input_dict.keys():
1872 if router not in tgen.routers():
1873 continue
1874
1875 rnode = tgen.routers()[router]
1876
1877 # Show ip prefix list
1878 show_prefix_list = run_frr_cmd(rnode, "show ip prefix-list")
1879
1880 # Verify Prefix list is deleted
1881 prefix_lists_addr = input_dict[router]["prefix_lists"]
1882 for addr_type in prefix_lists_addr:
1883 if not check_address_types(addr_type):
1884 continue
1885
1886 for prefix_list in prefix_lists_addr[addr_type].keys():
1887 if prefix_list in show_prefix_list:
1888 errormsg = ("Prefix list {} is/are present in the router"
1889 " {}".format(prefix_list, router))
1890 return errormsg
1891
1892 logger.info("Prefix list %s is/are not present in the router"
1893 " from router %s", prefix_list, router)
1894
1895 logger.debug("Exiting lib API: verify_prefix_lists()")
1896 return True
1897
1898
1899 @retry(attempts=2, wait=4, return_is_str=True, initial_wait=2)
1900 def verify_route_maps(tgen, input_dict):
1901 """
1902 Running "show route-map" command and verifying given route-map
1903 is present in router.
1904 Parameters
1905 ----------
1906 * `tgen` : topogen object
1907 * `input_dict`: data to verify prefix lists
1908 Usage
1909 -----
1910 # To verify rmap_1 and rmap_2 are present in router r1
1911 input_dict = {
1912 "r1": {
1913 "route_maps": ["rmap_1", "rmap_2"]
1914 }
1915 }
1916 result = verify_route_maps(tgen, input_dict)
1917 Returns
1918 -------
1919 errormsg(str) or True
1920 """
1921
1922 logger.debug("Entering lib API: verify_route_maps()")
1923
1924 for router in input_dict.keys():
1925 if router not in tgen.routers():
1926 continue
1927
1928 rnode = tgen.routers()[router]
1929 # Show ip route-map
1930 show_route_maps = rnode.vtysh_cmd("show route-map")
1931
1932 # Verify route-map is deleted
1933 route_maps = input_dict[router]["route_maps"]
1934 for route_map in route_maps:
1935 if route_map in show_route_maps:
1936 errormsg = ("Route map {} is not deleted from router"
1937 " {}".format(route_map, router))
1938 return errormsg
1939
1940 logger.info("Route map %s is/are deleted successfully from"
1941 " router %s", route_maps, router)
1942
1943 logger.debug("Exiting lib API: verify_route_maps()")
1944 return True
1945
1946
1947 @retry(attempts=3, wait=4, return_is_str=True)
1948 def verify_bgp_community(tgen, addr_type, router, network, input_dict=None):
1949 """
1950 API to veiryf BGP large community is attached in route for any given
1951 DUT by running "show bgp ipv4/6 {route address} json" command.
1952
1953 Parameters
1954 ----------
1955 * `tgen`: topogen object
1956 * `addr_type` : ip type, ipv4/ipv6
1957 * `dut`: Device Under Test
1958 * `network`: network for which set criteria needs to be verified
1959 * `input_dict`: having details like - for which router, community and
1960 values needs to be verified
1961 Usage
1962 -----
1963 networks = ["200.50.2.0/32"]
1964 input_dict = {
1965 "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
1966 }
1967 result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None)
1968
1969 Returns
1970 -------
1971 errormsg(str) or True
1972 """
1973
1974 logger.info("Entering lib API: verify_bgp_community()")
1975 if router not in tgen.routers():
1976 return False
1977
1978 rnode = tgen.routers()[router]
1979
1980 logger.debug("Verifying BGP community attributes on dut %s: for %s "
1981 "network %s", router, addr_type, network)
1982
1983 for net in network:
1984 cmd = "show bgp {} {} json".format(addr_type, net)
1985 show_bgp_json = rnode.vtysh_cmd(cmd, isjson=True)
1986 logger.info(show_bgp_json)
1987 if "paths" not in show_bgp_json:
1988 return "Prefix {} not found in BGP table of router: {}". \
1989 format(net, router)
1990
1991 as_paths = show_bgp_json["paths"]
1992 found = False
1993 for i in range(len(as_paths)):
1994 if "largeCommunity" in show_bgp_json["paths"][i] or \
1995 "community" in show_bgp_json["paths"][i]:
1996 found = True
1997 logger.info("Large Community attribute is found for route:"
1998 " %s in router: %s", net, router)
1999 if input_dict is not None:
2000 for criteria, comm_val in input_dict.items():
2001 show_val = show_bgp_json["paths"][i][criteria][
2002 "string"]
2003 if comm_val == show_val:
2004 logger.info("Verifying BGP %s for prefix: %s"
2005 " in router: %s, found expected"
2006 " value: %s", criteria, net, router,
2007 comm_val)
2008 else:
2009 errormsg = "Failed: Verifying BGP attribute" \
2010 " {} for route: {} in router: {}" \
2011 ", expected value: {} but found" \
2012 ": {}".format(
2013 criteria, net, router, comm_val,
2014 show_val)
2015 return errormsg
2016
2017 if not found:
2018 errormsg = (
2019 "Large Community attribute is not found for route: "
2020 "{} in router: {} ".format(net, router))
2021 return errormsg
2022
2023 logger.debug("Exiting lib API: verify_bgp_community()")
2024 return True
2025
2026
2027 def verify_create_community_list(tgen, input_dict):
2028 """
2029 API is to verify if large community list is created for any given DUT in
2030 input_dict by running "sh bgp large-community-list {"comm_name"} detail"
2031 command.
2032 Parameters
2033 ----------
2034 * `tgen`: topogen object
2035 * `input_dict`: having details like - for which router, large community
2036 needs to be verified
2037 Usage
2038 -----
2039 input_dict = {
2040 "r1": {
2041 "large-community-list": {
2042 "standard": {
2043 "Test1": [{"action": "PERMIT", "attribute":\
2044 ""}]
2045 }}}}
2046 result = verify_create_community_list(tgen, input_dict)
2047 Returns
2048 -------
2049 errormsg(str) or True
2050 """
2051
2052 logger.debug("Entering lib API: verify_create_community_list()")
2053
2054 for router in input_dict.keys():
2055 if router not in tgen.routers():
2056 continue
2057
2058 rnode = tgen.routers()[router]
2059
2060 logger.info("Verifying large-community is created for dut %s:",
2061 router)
2062
2063 for comm_data in input_dict[router]["bgp_community_lists"]:
2064 comm_name = comm_data["name"]
2065 comm_type = comm_data["community_type"]
2066 show_bgp_community = \
2067 run_frr_cmd(rnode,
2068 "show bgp large-community-list {} detail".
2069 format(comm_name))
2070
2071 # Verify community list and type
2072 if comm_name in show_bgp_community and comm_type in \
2073 show_bgp_community:
2074 logger.info("BGP %s large-community-list %s is"
2075 " created", comm_type, comm_name)
2076 else:
2077 errormsg = "BGP {} large-community-list {} is not" \
2078 " created".format(comm_type, comm_name)
2079 return errormsg
2080
2081 logger.debug("Exiting lib API: verify_create_community_list()")
2082 return True