]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/common_config.py
tests: please follow the style guide
[mirror_frr.git] / tests / topotests / lib / common_config.py
1 #
2 # Copyright (c) 2019 by VMware, Inc. ("VMware")
3 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
4 # ("NetDEF") in this file.
5 #
6 # Permission to use, copy, modify, and/or distribute this software
7 # for any purpose with or without fee is hereby granted, provided
8 # that the above copyright notice and this permission notice appear
9 # in all copies.
10 #
11 # THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
12 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
14 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
15 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
16 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
17 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
18 # OF THIS SOFTWARE.
19 #
20
21 from collections import OrderedDict
22 from datetime import datetime
23 from time import sleep
24 from copy import deepcopy
25 from subprocess import call
26 from subprocess import STDOUT as SUB_STDOUT
27 from subprocess import PIPE as SUB_PIPE
28 from subprocess import Popen
29 from functools import wraps
30 from re import search as re_search
31 from tempfile import mkdtemp
32
33 import os
34 import io
35 import sys
36 import traceback
37 import socket
38 import ipaddress
39 import platform
40
41 if sys.version_info[0] > 2:
42 import configparser
43 else:
44 import StringIO
45 import ConfigParser as configparser
46
47 from lib.topolog import logger, logger_config
48 from lib.topogen import TopoRouter, get_topogen
49 from lib.topotest import interface_set_status, version_cmp, frr_unicode
50
51 FRRCFG_FILE = "frr_json.conf"
52 FRRCFG_BKUP_FILE = "frr_json_initial.conf"
53
54 ERROR_LIST = ["Malformed", "Failure", "Unknown", "Incomplete"]
55 ROUTER_LIST = []
56
57 ####
58 CD = os.path.dirname(os.path.realpath(__file__))
59 PYTESTINI_PATH = os.path.join(CD, "../pytest.ini")
60
61 # Creating tmp dir with testsuite name to avoid conflict condition when
62 # multiple testsuites run together. All temporary files would be created
63 # in this dir and this dir would be removed once testsuite run is
64 # completed
65 LOGDIR = "/tmp/topotests/"
66 TMPDIR = None
67
68 # NOTE: to save execution logs to log file frrtest_log_dir must be configured
69 # in `pytest.ini`.
70 config = configparser.ConfigParser()
71 config.read(PYTESTINI_PATH)
72
73 config_section = "topogen"
74
75 if config.has_option("topogen", "verbosity"):
76 loglevel = config.get("topogen", "verbosity")
77 loglevel = loglevel.upper()
78 else:
79 loglevel = "INFO"
80
81 if config.has_option("topogen", "frrtest_log_dir"):
82 frrtest_log_dir = config.get("topogen", "frrtest_log_dir")
83 time_stamp = datetime.time(datetime.now())
84 logfile_name = "frr_test_bgp_"
85 frrtest_log_file = frrtest_log_dir + logfile_name + str(time_stamp)
86 print("frrtest_log_file..", frrtest_log_file)
87
88 logger = logger_config.get_logger(
89 name="test_execution_logs", log_level=loglevel, target=frrtest_log_file
90 )
91 print("Logs will be sent to logfile: {}".format(frrtest_log_file))
92
93 if config.has_option("topogen", "show_router_config"):
94 show_router_config = config.get("topogen", "show_router_config")
95 else:
96 show_router_config = False
97
98 # env variable for setting what address type to test
99 ADDRESS_TYPES = os.environ.get("ADDRESS_TYPES")
100
101
102 # Saves sequence id numbers
103 SEQ_ID = {"prefix_lists": {}, "route_maps": {}}
104
105
106 def get_seq_id(obj_type, router, obj_name):
107 """
108 Generates and saves sequence number in interval of 10
109 Parameters
110 ----------
111 * `obj_type`: prefix_lists or route_maps
112 * `router`: router name
113 *` obj_name`: name of the prefix-list or route-map
114 Returns
115 --------
116 Sequence number generated
117 """
118
119 router_data = SEQ_ID[obj_type].setdefault(router, {})
120 obj_data = router_data.setdefault(obj_name, {})
121 seq_id = obj_data.setdefault("seq_id", 0)
122
123 seq_id = int(seq_id) + 10
124 obj_data["seq_id"] = seq_id
125
126 return seq_id
127
128
129 def set_seq_id(obj_type, router, id, obj_name):
130 """
131 Saves sequence number if not auto-generated and given by user
132 Parameters
133 ----------
134 * `obj_type`: prefix_lists or route_maps
135 * `router`: router name
136 *` obj_name`: name of the prefix-list or route-map
137 """
138 router_data = SEQ_ID[obj_type].setdefault(router, {})
139 obj_data = router_data.setdefault(obj_name, {})
140 seq_id = obj_data.setdefault("seq_id", 0)
141
142 seq_id = int(seq_id) + int(id)
143 obj_data["seq_id"] = seq_id
144
145
146 class InvalidCLIError(Exception):
147 """Raise when the CLI command is wrong"""
148
149 pass
150
151
152 def run_frr_cmd(rnode, cmd, isjson=False):
153 """
154 Execute frr show commands in priviledged mode
155 * `rnode`: router node on which commands needs to executed
156 * `cmd`: Command to be executed on frr
157 * `isjson`: If command is to get json data or not
158 :return str:
159 """
160
161 if cmd:
162 ret_data = rnode.vtysh_cmd(cmd, isjson=isjson)
163
164 if True:
165 if isjson:
166 logger.debug(ret_data)
167 print_data = rnode.vtysh_cmd(cmd.rstrip("json"), isjson=False)
168 else:
169 print_data = ret_data
170
171 logger.info(
172 "Output for command [ %s] on router %s:\n%s",
173 cmd.rstrip("json"),
174 rnode.name,
175 print_data,
176 )
177 return ret_data
178
179 else:
180 raise InvalidCLIError("No actual cmd passed")
181
182
183 def apply_raw_config(tgen, input_dict):
184
185 """
186 API to configure raw configuration on device. This can be used for any cli
187 which does has not been implemented in JSON.
188
189 Parameters
190 ----------
191 * `tgen`: tgen onject
192 * `input_dict`: configuration that needs to be applied
193
194 Usage
195 -----
196 input_dict = {
197 "r2": {
198 "raw_config": [
199 "router bgp",
200 "no bgp update-group-split-horizon"
201 ]
202 }
203 }
204 Returns
205 -------
206 True or errormsg
207 """
208
209 result = True
210 for router_name in input_dict.keys():
211 config_cmd = input_dict[router_name]["raw_config"]
212
213 if not isinstance(config_cmd, list):
214 config_cmd = [config_cmd]
215
216 frr_cfg_file = "{}/{}/{}".format(TMPDIR, router_name, FRRCFG_FILE)
217 with open(frr_cfg_file, "w") as cfg:
218 for cmd in config_cmd:
219 cfg.write("{}\n".format(cmd))
220
221 result = load_config_to_router(tgen, router_name)
222
223 return result
224
225
226 def create_common_configuration(
227 tgen, router, data, config_type=None, build=False, load_config=True
228 ):
229 """
230 API to create object of class FRRConfig and also create frr_json.conf
231 file. It will create interface and common configurations and save it to
232 frr_json.conf and load to router
233 Parameters
234 ----------
235 * `tgen`: tgen onject
236 * `data`: Congiguration data saved in a list.
237 * `router` : router id to be configured.
238 * `config_type` : Syntactic information while writing configuration. Should
239 be one of the value as mentioned in the config_map below.
240 * `build` : Only for initial setup phase this is set as True
241 Returns
242 -------
243 True or False
244 """
245 TMPDIR = os.path.join(LOGDIR, tgen.modname)
246
247 fname = "{}/{}/{}".format(TMPDIR, router, FRRCFG_FILE)
248
249 config_map = OrderedDict(
250 {
251 "general_config": "! FRR General Config\n",
252 "interface_config": "! Interfaces Config\n",
253 "static_route": "! Static Route Config\n",
254 "prefix_list": "! Prefix List Config\n",
255 "bgp_community_list": "! Community List Config\n",
256 "route_maps": "! Route Maps Config\n",
257 "bgp": "! BGP Config\n",
258 "vrf": "! VRF Config\n",
259 "ospf": "! OSPF Config\n",
260 }
261 )
262
263 if build:
264 mode = "a"
265 elif not load_config:
266 mode = "a"
267 else:
268 mode = "w"
269
270 try:
271 frr_cfg_fd = open(fname, mode)
272 if config_type:
273 frr_cfg_fd.write(config_map[config_type])
274 for line in data:
275 frr_cfg_fd.write("{} \n".format(str(line)))
276 frr_cfg_fd.write("\n")
277
278 except IOError as err:
279 logger.error(
280 "Unable to open FRR Config File. error(%s): %s" % (err.errno, err.strerror)
281 )
282 return False
283 finally:
284 frr_cfg_fd.close()
285
286 # If configuration applied from build, it will done at last
287 if not build and load_config:
288 load_config_to_router(tgen, router)
289
290 return True
291
292
293 def kill_router_daemons(tgen, router, daemons):
294 """
295 Router's current config would be saved to /etc/frr/ for each deamon
296 and deamon would be killed forcefully using SIGKILL.
297 * `tgen` : topogen object
298 * `router`: Device under test
299 * `daemons`: list of daemons to be killed
300 """
301
302 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
303
304 try:
305 router_list = tgen.routers()
306
307 # Saving router config to /etc/frr, which will be loaded to router
308 # when it starts
309 router_list[router].vtysh_cmd("write memory")
310
311 # Kill Daemons
312 result = router_list[router].killDaemons(daemons)
313 if len(result) > 0:
314 assert "Errors found post shutdown - details follow:" == 0, result
315 return result
316
317 except Exception as e:
318 errormsg = traceback.format_exc()
319 logger.error(errormsg)
320 return errormsg
321
322
323 def start_router_daemons(tgen, router, daemons):
324 """
325 Daemons defined by user would be started
326 * `tgen` : topogen object
327 * `router`: Device under test
328 * `daemons`: list of daemons to be killed
329 """
330
331 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
332
333 try:
334 router_list = tgen.routers()
335
336 # Start daemons
337 result = router_list[router].startDaemons(daemons)
338 return result
339
340 except Exception as e:
341 errormsg = traceback.format_exc()
342 logger.error(errormsg)
343 return errormsg
344
345 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
346 return True
347
348
349 def kill_mininet_routers_process(tgen):
350 """
351 Kill all mininet stale router' processes
352 * `tgen` : topogen object
353 """
354
355 router_list = tgen.routers()
356 for rname, router in router_list.items():
357 daemon_list = [
358 "zebra",
359 "ospfd",
360 "ospf6d",
361 "bgpd",
362 "ripd",
363 "ripngd",
364 "isisd",
365 "pimd",
366 "ldpd",
367 "staticd",
368 ]
369 for daemon in daemon_list:
370 router.run("killall -9 {}".format(daemon))
371
372
373 def check_router_status(tgen):
374 """
375 Check if all daemons are running for all routers in topology
376 * `tgen` : topogen object
377 """
378
379 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
380
381 try:
382 router_list = tgen.routers()
383 for router, rnode in router_list.items():
384
385 result = rnode.check_router_running()
386 if result != "":
387 daemons = []
388 if "bgpd" in result:
389 daemons.append("bgpd")
390 if "zebra" in result:
391 daemons.append("zebra")
392
393 rnode.startDaemons(daemons)
394
395 except Exception as e:
396 errormsg = traceback.format_exc()
397 logger.error(errormsg)
398 return errormsg
399
400 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
401 return True
402
403
404 def getStrIO():
405 """
406 Return a StringIO object appropriate for the current python version.
407 """
408 if sys.version_info[0] > 2:
409 return io.StringIO()
410 else:
411 return StringIO.StringIO()
412
413
414 def reset_config_on_routers(tgen, routerName=None):
415 """
416 Resets configuration on routers to the snapshot created using input JSON
417 file. It replaces existing router configuration with FRRCFG_BKUP_FILE
418
419 Parameters
420 ----------
421 * `tgen` : Topogen object
422 * `routerName` : router config is to be reset
423 """
424
425 logger.debug("Entering API: reset_config_on_routers")
426
427 router_list = tgen.routers()
428 for rname in ROUTER_LIST:
429 if routerName and routerName != rname:
430 continue
431
432 router = router_list[rname]
433 logger.info("Configuring router %s to initial test configuration", rname)
434
435 cfg = router.run("vtysh -c 'show running'")
436 fname = "{}/{}/frr.sav".format(TMPDIR, rname)
437 dname = "{}/{}/delta.conf".format(TMPDIR, rname)
438 f = open(fname, "w")
439 for line in cfg.split("\n"):
440 line = line.strip()
441
442 if (
443 line == "Building configuration..."
444 or line == "Current configuration:"
445 or not line
446 ):
447 continue
448 f.write(line)
449 f.write("\n")
450
451 f.close()
452 run_cfg_file = "{}/{}/frr.sav".format(TMPDIR, rname)
453 init_cfg_file = "{}/{}/frr_json_initial.conf".format(TMPDIR, rname)
454 command = "/usr/lib/frr/frr-reload.py --input {} --test {} > {}".format(
455 run_cfg_file, init_cfg_file, dname
456 )
457 result = call(command, shell=True, stderr=SUB_STDOUT, stdout=SUB_PIPE)
458
459 # Assert if command fail
460 if result > 0:
461 logger.error("Delta file creation failed. Command executed %s", command)
462 with open(run_cfg_file, "r") as fd:
463 logger.info(
464 "Running configuration saved in %s is:\n%s", run_cfg_file, fd.read()
465 )
466 with open(init_cfg_file, "r") as fd:
467 logger.info(
468 "Test configuration saved in %s is:\n%s", init_cfg_file, fd.read()
469 )
470
471 err_cmd = ["/usr/bin/vtysh", "-m", "-f", run_cfg_file]
472 result = Popen(err_cmd, stdout=SUB_PIPE, stderr=SUB_PIPE)
473 output = result.communicate()
474 for out_data in output:
475 temp_data = out_data.decode("utf-8").lower()
476 for out_err in ERROR_LIST:
477 if out_err.lower() in temp_data:
478 logger.error(
479 "Found errors while validating data in" " %s", run_cfg_file
480 )
481 raise InvalidCLIError(out_data)
482 raise InvalidCLIError("Unknown error in %s", output)
483
484 f = open(dname, "r")
485 delta = getStrIO()
486 delta.write("configure terminal\n")
487 t_delta = f.read()
488
489 # Don't disable debugs
490 check_debug = True
491
492 for line in t_delta.split("\n"):
493 line = line.strip()
494 if line == "Lines To Delete" or line == "===============" or not line:
495 continue
496
497 if line == "Lines To Add":
498 check_debug = False
499 continue
500
501 if line == "============" or not line:
502 continue
503
504 # Leave debugs and log output alone
505 if check_debug:
506 if "debug" in line or "log file" in line:
507 continue
508
509 delta.write(line)
510 delta.write("\n")
511
512 f.close()
513
514 delta.write("end\n")
515
516 output = router.vtysh_multicmd(delta.getvalue(), pretty_output=False)
517
518 delta.close()
519 delta = getStrIO()
520 cfg = router.run("vtysh -c 'show running'")
521 for line in cfg.split("\n"):
522 line = line.strip()
523 delta.write(line)
524 delta.write("\n")
525
526 # Router current configuration to log file or console if
527 # "show_router_config" is defined in "pytest.ini"
528 if show_router_config:
529 logger.info("Configuration on router {} after reset:".format(rname))
530 logger.info(delta.getvalue())
531 delta.close()
532
533 logger.debug("Exiting API: reset_config_on_routers")
534 return True
535
536
537 def load_config_to_router(tgen, routerName, save_bkup=False):
538 """
539 Loads configuration on router from the file FRRCFG_FILE.
540
541 Parameters
542 ----------
543 * `tgen` : Topogen object
544 * `routerName` : router for which configuration to be loaded
545 * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE
546 """
547
548 logger.debug("Entering API: load_config_to_router")
549
550 router_list = tgen.routers()
551 for rname in ROUTER_LIST:
552 if routerName and rname != routerName:
553 continue
554
555 router = router_list[rname]
556 try:
557 frr_cfg_file = "{}/{}/{}".format(TMPDIR, rname, FRRCFG_FILE)
558 frr_cfg_bkup = "{}/{}/{}".format(TMPDIR, rname, FRRCFG_BKUP_FILE)
559 with open(frr_cfg_file, "r+") as cfg:
560 data = cfg.read()
561 logger.info(
562 "Applying following configuration on router"
563 " {}:\n{}".format(rname, data)
564 )
565 if save_bkup:
566 with open(frr_cfg_bkup, "w") as bkup:
567 bkup.write(data)
568
569 output = router.vtysh_multicmd(data, pretty_output=False)
570 for out_err in ERROR_LIST:
571 if out_err.lower() in output.lower():
572 raise InvalidCLIError("%s" % output)
573
574 cfg.truncate(0)
575
576 except IOError as err:
577 errormsg = (
578 "Unable to open config File. error(%s):" " %s",
579 (err.errno, err.strerror),
580 )
581 return errormsg
582
583 # Router current configuration to log file or console if
584 # "show_router_config" is defined in "pytest.ini"
585 if show_router_config:
586 logger.info("New configuration for router {}:".format(rname))
587 new_config = router.run("vtysh -c 'show running'")
588 logger.info(new_config)
589
590 logger.debug("Exiting API: load_config_to_router")
591 return True
592
593
594 def get_frr_ipv6_linklocal(tgen, router, intf=None, vrf=None):
595 """
596 API to get the link local ipv6 address of a perticular interface using
597 FRR command 'show interface'
598
599 * `tgen`: tgen onject
600 * `router` : router for which hightest interface should be
601 calculated
602 * `intf` : interface for which linklocal address needs to be taken
603 * `vrf` : VRF name
604
605 Usage
606 -----
607 linklocal = get_frr_ipv6_linklocal(tgen, router, "intf1", RED_A)
608
609 Returns
610 -------
611 1) array of interface names to link local ips.
612 """
613
614 router_list = tgen.routers()
615 for rname, rnode in router_list.items():
616 if rname != router:
617 continue
618
619 linklocal = []
620
621 if vrf:
622 cmd = "show interface vrf {}".format(vrf)
623 else:
624 cmd = "show interface"
625
626 ifaces = router_list[router].run('vtysh -c "{}"'.format(cmd))
627
628 # Fix newlines (make them all the same)
629 ifaces = ("\n".join(ifaces.splitlines()) + "\n").splitlines()
630
631 interface = None
632 ll_per_if_count = 0
633 for line in ifaces:
634 # Interface name
635 m = re_search("Interface ([a-zA-Z0-9-]+) is", line)
636 if m:
637 interface = m.group(1).split(" ")[0]
638 ll_per_if_count = 0
639
640 # Interface ip
641 m1 = re_search("inet6 (fe80[:a-fA-F0-9]+[/0-9]+)", line)
642 if m1:
643 local = m1.group(1)
644 ll_per_if_count += 1
645 if ll_per_if_count > 1:
646 linklocal += [["%s-%s" % (interface, ll_per_if_count), local]]
647 else:
648 linklocal += [[interface, local]]
649
650 if linklocal:
651 if intf:
652 return [_linklocal[1] for _linklocal in linklocal if _linklocal[0] == intf][
653 0
654 ].split("/")[0]
655 return linklocal
656 else:
657 errormsg = "Link local ip missing on router {}"
658 return errormsg
659
660
661 def generate_support_bundle():
662 """
663 API to generate support bundle on any verification ste failure.
664 it runs a python utility, /usr/lib/frr/generate_support_bundle.py,
665 which basically runs defined CLIs and dumps the data to specified location
666 """
667
668 tgen = get_topogen()
669 router_list = tgen.routers()
670 test_name = sys._getframe(2).f_code.co_name
671 TMPDIR = os.path.join(LOGDIR, tgen.modname)
672
673 for rname, rnode in router_list.items():
674 logger.info("Generating support bundle for {}".format(rname))
675 rnode.run("mkdir -p /var/log/frr")
676 bundle_log = rnode.run("python2 /usr/lib/frr/generate_support_bundle.py")
677 logger.info(bundle_log)
678
679 dst_bundle = "{}/{}/support_bundles/{}".format(TMPDIR, rname, test_name)
680 src_bundle = "/var/log/frr"
681 rnode.run("rm -rf {}".format(dst_bundle))
682 rnode.run("mkdir -p {}".format(dst_bundle))
683 rnode.run("mv -f {}/* {}".format(src_bundle, dst_bundle))
684
685 return True
686
687
688 def start_topology(tgen, daemon=None):
689 """
690 Starting topology, create tmp files which are loaded to routers
691 to start deamons and then start routers
692 * `tgen` : topogen object
693 """
694
695 global TMPDIR, ROUTER_LIST
696 # Starting topology
697 tgen.start_topology()
698
699 # Starting deamons
700
701 router_list = tgen.routers()
702 ROUTER_LIST = sorted(
703 router_list.keys(), key=lambda x: int(re_search("[0-9]+", x).group(0))
704 )
705 TMPDIR = os.path.join(LOGDIR, tgen.modname)
706
707 linux_ver = ""
708 router_list = tgen.routers()
709 for rname in ROUTER_LIST:
710 router = router_list[rname]
711
712 # It will help in debugging the failures, will give more details on which
713 # specific kernel version tests are failing
714 if linux_ver == "":
715 linux_ver = router.run("uname -a")
716 logger.info("Logging platform related details: \n %s \n", linux_ver)
717
718 try:
719 os.chdir(TMPDIR)
720
721 # Creating router named dir and empty zebra.conf bgpd.conf files
722 # inside the current directory
723 if os.path.isdir("{}".format(rname)):
724 os.system("rm -rf {}".format(rname))
725 os.mkdir("{}".format(rname))
726 os.system("chmod -R go+rw {}".format(rname))
727 os.chdir("{}/{}".format(TMPDIR, rname))
728 os.system("touch zebra.conf bgpd.conf")
729 else:
730 os.mkdir("{}".format(rname))
731 os.system("chmod -R go+rw {}".format(rname))
732 os.chdir("{}/{}".format(TMPDIR, rname))
733 os.system("touch zebra.conf bgpd.conf")
734
735 except IOError as err:
736 logger.error("I/O error({0}): {1}".format(err.errno, err.strerror))
737
738 # Loading empty zebra.conf file to router, to start the zebra deamon
739 router.load_config(
740 TopoRouter.RD_ZEBRA, "{}/{}/zebra.conf".format(TMPDIR, rname)
741 )
742
743 # Loading empty bgpd.conf file to router, to start the bgp deamon
744 router.load_config(TopoRouter.RD_BGP, "{}/{}/bgpd.conf".format(TMPDIR, rname))
745
746 if daemon and "ospfd" in daemon:
747 # Loading empty ospf.conf file to router, to start the bgp deamon
748 router.load_config(
749 TopoRouter.RD_OSPF, "{}/{}/ospfd.conf".format(TMPDIR, rname)
750 )
751 # Starting routers
752 logger.info("Starting all routers once topology is created")
753 tgen.start_router()
754
755
756 def stop_router(tgen, router):
757 """
758 Router"s current config would be saved to /etc/frr/ for each deamon
759 and router and its deamons would be stopped.
760
761 * `tgen` : topogen object
762 * `router`: Device under test
763 """
764
765 router_list = tgen.routers()
766
767 # Saving router config to /etc/frr, which will be loaded to router
768 # when it starts
769 router_list[router].vtysh_cmd("write memory")
770
771 # Stop router
772 router_list[router].stop()
773
774
775 def start_router(tgen, router):
776 """
777 Router will started and config would be loaded from /etc/frr/ for each
778 deamon
779
780 * `tgen` : topogen object
781 * `router`: Device under test
782 """
783
784 logger.debug("Entering lib API: start_router")
785
786 try:
787 router_list = tgen.routers()
788
789 # Router and its deamons would be started and config would
790 # be loaded to router for each deamon from /etc/frr
791 router_list[router].start()
792
793 # Waiting for router to come up
794 sleep(5)
795
796 except Exception as e:
797 errormsg = traceback.format_exc()
798 logger.error(errormsg)
799 return errormsg
800
801 logger.debug("Exiting lib API: start_router()")
802 return True
803
804
805 def number_to_row(routerName):
806 """
807 Returns the number for the router.
808 Calculation based on name a0 = row 0, a1 = row 1, b2 = row 2, z23 = row 23
809 etc
810 """
811 return int(routerName[1:])
812
813
814 def number_to_column(routerName):
815 """
816 Returns the number for the router.
817 Calculation based on name a0 = columnn 0, a1 = column 0, b2= column 1,
818 z23 = column 26 etc
819 """
820 return ord(routerName[0]) - 97
821
822
823 def topo_daemons(tgen, topo):
824 """
825 Returns daemon list required for the suite based on topojson.
826 """
827 daemon_list = []
828
829 router_list = tgen.routers()
830 ROUTER_LIST = sorted(
831 router_list.keys(), key=lambda x: int(re_search("[0-9]+", x).group(0))
832 )
833
834 for rtr in ROUTER_LIST:
835 if "ospf" in topo["routers"][rtr] and "ospfd" not in daemon_list:
836 daemon_list.append("ospfd")
837
838 return daemon_list
839
840
841 #############################################
842 # Common APIs, will be used by all protocols
843 #############################################
844
845
846 def create_vrf_cfg(tgen, topo, input_dict=None, build=False):
847 """
848 Create vrf configuration for created topology. VRF
849 configuration is provided in input json file.
850
851 VRF config is done in Linux Kernel:
852 * Create VRF
853 * Attach interface to VRF
854 * Bring up VRF
855
856 Parameters
857 ----------
858 * `tgen` : Topogen object
859 * `topo` : json file data
860 * `input_dict` : Input dict data, required when configuring
861 from testcase
862 * `build` : Only for initial setup phase this is set as True.
863
864 Usage
865 -----
866 input_dict={
867 "r3": {
868 "links": {
869 "r2-link1": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_A"},
870 "r2-link2": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_B"},
871 "r2-link3": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_A"},
872 "r2-link4": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_B"},
873 },
874 "vrfs":[
875 {
876 "name": "RED_A",
877 "id": "1"
878 },
879 {
880 "name": "RED_B",
881 "id": "2"
882 },
883 {
884 "name": "BLUE_A",
885 "id": "3",
886 "delete": True
887 },
888 {
889 "name": "BLUE_B",
890 "id": "4"
891 }
892 ]
893 }
894 }
895 result = create_vrf_cfg(tgen, topo, input_dict)
896
897 Returns
898 -------
899 True or False
900 """
901 result = True
902 if not input_dict:
903 input_dict = deepcopy(topo)
904 else:
905 input_dict = deepcopy(input_dict)
906
907 try:
908 for c_router, c_data in input_dict.items():
909 rnode = tgen.routers()[c_router]
910 if "vrfs" in c_data:
911 for vrf in c_data["vrfs"]:
912 config_data = []
913 del_action = vrf.setdefault("delete", False)
914 name = vrf.setdefault("name", None)
915 table_id = vrf.setdefault("id", None)
916 vni = vrf.setdefault("vni", None)
917 del_vni = vrf.setdefault("no_vni", None)
918
919 if del_action:
920 # Kernel cmd- Add VRF and table
921 cmd = "ip link del {} type vrf table {}".format(
922 vrf["name"], vrf["id"]
923 )
924
925 logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd)
926 rnode.run(cmd)
927
928 # Kernel cmd - Bring down VRF
929 cmd = "ip link set dev {} down".format(name)
930 logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd)
931 rnode.run(cmd)
932
933 else:
934 if name and table_id:
935 # Kernel cmd- Add VRF and table
936 cmd = "ip link add {} type vrf table {}".format(
937 name, table_id
938 )
939 logger.info(
940 "[DUT: %s]: Running kernel cmd " "[%s]", c_router, cmd
941 )
942 rnode.run(cmd)
943
944 # Kernel cmd - Bring up VRF
945 cmd = "ip link set dev {} up".format(name)
946 logger.info(
947 "[DUT: %s]: Running kernel " "cmd [%s]", c_router, cmd
948 )
949 rnode.run(cmd)
950
951 if "links" in c_data:
952 for destRouterLink, data in sorted(
953 c_data["links"].items()
954 ):
955 # Loopback interfaces
956 if "type" in data and data["type"] == "loopback":
957 interface_name = destRouterLink
958 else:
959 interface_name = data["interface"]
960
961 if "vrf" in data:
962 vrf_list = data["vrf"]
963
964 if type(vrf_list) is not list:
965 vrf_list = [vrf_list]
966
967 for _vrf in vrf_list:
968 cmd = "ip link set {} master {}".format(
969 interface_name, _vrf
970 )
971
972 logger.info(
973 "[DUT: %s]: Running" " kernel cmd [%s]",
974 c_router,
975 cmd,
976 )
977 rnode.run(cmd)
978
979 if vni:
980 config_data.append("vrf {}".format(vrf["name"]))
981 cmd = "vni {}".format(vni)
982 config_data.append(cmd)
983
984 if del_vni:
985 config_data.append("vrf {}".format(vrf["name"]))
986 cmd = "no vni {}".format(del_vni)
987 config_data.append(cmd)
988
989 result = create_common_configuration(
990 tgen, c_router, config_data, "vrf", build=build
991 )
992
993 except InvalidCLIError:
994 # Traceback
995 errormsg = traceback.format_exc()
996 logger.error(errormsg)
997 return errormsg
998
999 return result
1000
1001
1002 def create_interface_in_kernel(
1003 tgen, dut, name, ip_addr, vrf=None, netmask=None, create=True
1004 ):
1005 """
1006 Cretae interfaces in kernel for ipv4/ipv6
1007 Config is done in Linux Kernel:
1008
1009 Parameters
1010 ----------
1011 * `tgen` : Topogen object
1012 * `dut` : Device for which interfaces to be added
1013 * `name` : interface name
1014 * `ip_addr` : ip address for interface
1015 * `vrf` : VRF name, to which interface will be associated
1016 * `netmask` : netmask value, default is None
1017 * `create`: Create interface in kernel, if created then no need
1018 to create
1019 """
1020
1021 rnode = tgen.routers()[dut]
1022
1023 if create:
1024 cmd = "sudo ip link add name {} type dummy".format(name)
1025 rnode.run(cmd)
1026
1027 addr_type = validate_ip_address(ip_addr)
1028 if addr_type == "ipv4":
1029 cmd = "ifconfig {} {} netmask {}".format(name, ip_addr, netmask)
1030 else:
1031 cmd = "ifconfig {} inet6 add {}/{}".format(name, ip_addr, netmask)
1032
1033 rnode.run(cmd)
1034
1035 if vrf:
1036 cmd = "ip link set {} master {}".format(name, vrf)
1037 rnode.run(cmd)
1038
1039
1040 def shutdown_bringup_interface_in_kernel(tgen, dut, intf_name, ifaceaction=False):
1041 """
1042 Cretae interfaces in kernel for ipv4/ipv6
1043 Config is done in Linux Kernel:
1044
1045 Parameters
1046 ----------
1047 * `tgen` : Topogen object
1048 * `dut` : Device for which interfaces to be added
1049 * `intf_name` : interface name
1050 * `ifaceaction` : False to shutdown and True to bringup the
1051 ineterface
1052 """
1053
1054 rnode = tgen.routers()[dut]
1055
1056 cmd = "ip link set dev"
1057 if ifaceaction:
1058 action = "up"
1059 cmd = "{} {} {}".format(cmd, intf_name, action)
1060 else:
1061 action = "down"
1062 cmd = "{} {} {}".format(cmd, intf_name, action)
1063
1064 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1065 rnode.run(cmd)
1066
1067
1068 def validate_ip_address(ip_address):
1069 """
1070 Validates the type of ip address
1071 Parameters
1072 ----------
1073 * `ip_address`: IPv4/IPv6 address
1074 Returns
1075 -------
1076 Type of address as string
1077 """
1078
1079 if "/" in ip_address:
1080 ip_address = ip_address.split("/")[0]
1081
1082 v4 = True
1083 v6 = True
1084 try:
1085 socket.inet_aton(ip_address)
1086 except socket.error as error:
1087 logger.debug("Not a valid IPv4 address")
1088 v4 = False
1089 else:
1090 return "ipv4"
1091
1092 try:
1093 socket.inet_pton(socket.AF_INET6, ip_address)
1094 except socket.error as error:
1095 logger.debug("Not a valid IPv6 address")
1096 v6 = False
1097 else:
1098 return "ipv6"
1099
1100 if not v4 and not v6:
1101 raise Exception(
1102 "InvalidIpAddr", "%s is neither valid IPv4 or IPv6" " address" % ip_address
1103 )
1104
1105
1106 def check_address_types(addr_type=None):
1107 """
1108 Checks environment variable set and compares with the current address type
1109 """
1110
1111 addr_types_env = os.environ.get("ADDRESS_TYPES")
1112 if not addr_types_env:
1113 addr_types_env = "dual"
1114
1115 if addr_types_env == "dual":
1116 addr_types = ["ipv4", "ipv6"]
1117 elif addr_types_env == "ipv4":
1118 addr_types = ["ipv4"]
1119 elif addr_types_env == "ipv6":
1120 addr_types = ["ipv6"]
1121
1122 if addr_type is None:
1123 return addr_types
1124
1125 if addr_type not in addr_types:
1126 logger.debug(
1127 "{} not in supported/configured address types {}".format(
1128 addr_type, addr_types
1129 )
1130 )
1131 return False
1132
1133 return True
1134
1135
1136 def generate_ips(network, no_of_ips):
1137 """
1138 Returns list of IPs.
1139 based on start_ip and no_of_ips
1140 * `network` : from here the ip will start generating,
1141 start_ip will be
1142 * `no_of_ips` : these many IPs will be generated
1143 """
1144
1145 ipaddress_list = []
1146 if type(network) is not list:
1147 network = [network]
1148
1149 for start_ipaddr in network:
1150 if "/" in start_ipaddr:
1151 start_ip = start_ipaddr.split("/")[0]
1152 mask = int(start_ipaddr.split("/")[1])
1153 else:
1154 logger.debug("start_ipaddr {} must have a / in it".format(start_ipaddr))
1155 assert 0
1156
1157 addr_type = validate_ip_address(start_ip)
1158 if addr_type == "ipv4":
1159 start_ip = ipaddress.IPv4Address(frr_unicode(start_ip))
1160 step = 2 ** (32 - mask)
1161 if addr_type == "ipv6":
1162 start_ip = ipaddress.IPv6Address(frr_unicode(start_ip))
1163 step = 2 ** (128 - mask)
1164
1165 next_ip = start_ip
1166 count = 0
1167 while count < no_of_ips:
1168 ipaddress_list.append("{}/{}".format(next_ip, mask))
1169 if addr_type == "ipv6":
1170 next_ip = ipaddress.IPv6Address(int(next_ip) + step)
1171 else:
1172 next_ip += step
1173 count += 1
1174
1175 return ipaddress_list
1176
1177
1178 def find_interface_with_greater_ip(topo, router, loopback=True, interface=True):
1179 """
1180 Returns highest interface ip for ipv4/ipv6. If loopback is there then
1181 it will return highest IP from loopback IPs otherwise from physical
1182 interface IPs.
1183 * `topo` : json file data
1184 * `router` : router for which hightest interface should be calculated
1185 """
1186
1187 link_data = topo["routers"][router]["links"]
1188 lo_list = []
1189 interfaces_list = []
1190 lo_exists = False
1191 for destRouterLink, data in sorted(link_data.items()):
1192 if loopback:
1193 if "type" in data and data["type"] == "loopback":
1194 lo_exists = True
1195 ip_address = topo["routers"][router]["links"][destRouterLink][
1196 "ipv4"
1197 ].split("/")[0]
1198 lo_list.append(ip_address)
1199 if interface:
1200 ip_address = topo["routers"][router]["links"][destRouterLink]["ipv4"].split(
1201 "/"
1202 )[0]
1203 interfaces_list.append(ip_address)
1204
1205 if lo_exists:
1206 return sorted(lo_list)[-1]
1207
1208 return sorted(interfaces_list)[-1]
1209
1210
1211 def write_test_header(tc_name):
1212 """ Display message at beginning of test case"""
1213 count = 20
1214 logger.info("*" * (len(tc_name) + count))
1215 step("START -> Testcase : %s" % tc_name, reset=True)
1216 logger.info("*" * (len(tc_name) + count))
1217
1218
1219 def write_test_footer(tc_name):
1220 """ Display message at end of test case"""
1221 count = 21
1222 logger.info("=" * (len(tc_name) + count))
1223 logger.info("Testcase : %s -> PASSED", tc_name)
1224 logger.info("=" * (len(tc_name) + count))
1225
1226
1227 def interface_status(tgen, topo, input_dict):
1228 """
1229 Delete ip route maps from device
1230 * `tgen` : Topogen object
1231 * `topo` : json file data
1232 * `input_dict` : for which router, route map has to be deleted
1233 Usage
1234 -----
1235 input_dict = {
1236 "r3": {
1237 "interface_list": ['eth1-r1-r2', 'eth2-r1-r3'],
1238 "status": "down"
1239 }
1240 }
1241 Returns
1242 -------
1243 errormsg(str) or True
1244 """
1245 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1246
1247 try:
1248 global frr_cfg
1249 for router in input_dict.keys():
1250
1251 interface_list = input_dict[router]["interface_list"]
1252 status = input_dict[router].setdefault("status", "up")
1253 for intf in interface_list:
1254 rnode = tgen.routers()[router]
1255 interface_set_status(rnode, intf, status)
1256
1257 # Load config to router
1258 load_config_to_router(tgen, router)
1259
1260 except Exception as e:
1261 # handle any exception
1262 logger.error("Error %s occured. Arguments %s.", e.message, e.args)
1263
1264 # Traceback
1265 errormsg = traceback.format_exc()
1266 logger.error(errormsg)
1267 return errormsg
1268
1269 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1270 return True
1271
1272
1273 def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0, return_is_dict=False):
1274 """
1275 Retries function execution, if return is an errormsg or exception
1276
1277 * `attempts`: Number of attempts to make
1278 * `wait`: Number of seconds to wait between each attempt
1279 * `return_is_str`: Return val is an errormsg in case of failure
1280 * `initial_wait`: Sleeps for this much seconds before executing function
1281
1282 """
1283
1284 def _retry(func):
1285 @wraps(func)
1286 def func_retry(*args, **kwargs):
1287 _wait = kwargs.pop("wait", wait)
1288 _attempts = kwargs.pop("attempts", attempts)
1289 _attempts = int(_attempts)
1290 expected = True
1291 if _attempts < 0:
1292 raise ValueError("attempts must be 0 or greater")
1293
1294 if initial_wait > 0:
1295 logger.info("Waiting for [%s]s as initial delay", initial_wait)
1296 sleep(initial_wait)
1297
1298 _return_is_str = kwargs.pop("return_is_str", return_is_str)
1299 _return_is_dict = kwargs.pop("return_is_str", return_is_dict)
1300 for i in range(1, _attempts + 1):
1301 try:
1302 _expected = kwargs.setdefault("expected", True)
1303 if _expected is False:
1304 expected = _expected
1305 kwargs.pop("expected")
1306 ret = func(*args, **kwargs)
1307 logger.debug("Function returned %s", ret)
1308 if _return_is_str and isinstance(ret, bool) and _expected:
1309 return ret
1310 if (
1311 isinstance(ret, str) or isinstance(ret, unicode)
1312 ) and _expected is False:
1313 return ret
1314 if _return_is_dict and isinstance(ret, dict):
1315 return ret
1316
1317 if _attempts == i and expected:
1318 generate_support_bundle()
1319 return ret
1320 except Exception as err:
1321 if _attempts == i and expected:
1322 generate_support_bundle()
1323 logger.info("Max number of attempts (%r) reached", _attempts)
1324 raise
1325 else:
1326 logger.info("Function returned %s", err)
1327 if i < _attempts:
1328 logger.info("Retry [#%r] after sleeping for %ss" % (i, _wait))
1329 sleep(_wait)
1330
1331 func_retry._original = func
1332 return func_retry
1333
1334 return _retry
1335
1336
1337 class Stepper:
1338 """
1339 Prints step number for the test case step being executed
1340 """
1341
1342 count = 1
1343
1344 def __call__(self, msg, reset):
1345 if reset:
1346 Stepper.count = 1
1347 logger.info(msg)
1348 else:
1349 logger.info("STEP %s: '%s'", Stepper.count, msg)
1350 Stepper.count += 1
1351
1352
1353 def step(msg, reset=False):
1354 """
1355 Call Stepper to print test steps. Need to reset at the beginning of test.
1356 * ` msg` : Step message body.
1357 * `reset` : Reset step count to 1 when set to True.
1358 """
1359 _step = Stepper()
1360 _step(msg, reset)
1361
1362
1363 #############################################
1364 # These APIs, will used by testcase
1365 #############################################
1366 def create_interfaces_cfg(tgen, topo, build=False):
1367 """
1368 Create interface configuration for created topology. Basic Interface
1369 configuration is provided in input json file.
1370
1371 Parameters
1372 ----------
1373 * `tgen` : Topogen object
1374 * `topo` : json file data
1375 * `build` : Only for initial setup phase this is set as True.
1376
1377 Returns
1378 -------
1379 True or False
1380 """
1381 result = False
1382 topo = deepcopy(topo)
1383
1384 try:
1385 for c_router, c_data in topo.items():
1386 interface_data = []
1387 for destRouterLink, data in sorted(c_data["links"].items()):
1388 # Loopback interfaces
1389 if "type" in data and data["type"] == "loopback":
1390 interface_name = destRouterLink
1391 else:
1392 interface_name = data["interface"]
1393
1394 # Include vrf if present
1395 if "vrf" in data:
1396 interface_data.append(
1397 "interface {} vrf {}".format(
1398 str(interface_name), str(data["vrf"])
1399 )
1400 )
1401 else:
1402 interface_data.append("interface {}".format(str(interface_name)))
1403
1404 if "ipv4" in data:
1405 intf_addr = c_data["links"][destRouterLink]["ipv4"]
1406
1407 if "delete" in data and data["delete"]:
1408 interface_data.append("no ip address {}".format(intf_addr))
1409 else:
1410 interface_data.append("ip address {}".format(intf_addr))
1411 if "ipv6" in data:
1412 intf_addr = c_data["links"][destRouterLink]["ipv6"]
1413
1414 if "delete" in data and data["delete"]:
1415 interface_data.append("no ipv6 address {}".format(intf_addr))
1416 else:
1417 interface_data.append("ipv6 address {}".format(intf_addr))
1418
1419 if "ipv6-link-local" in data:
1420 intf_addr = c_data["links"][destRouterLink]["ipv6-link-local"]
1421
1422 if "delete" in data and data["delete"]:
1423 interface_data.append("no ipv6 address {}".format(intf_addr))
1424 else:
1425 interface_data.append("ipv6 address {}\n".format(intf_addr))
1426
1427 if "ospf" in data:
1428 ospf_data = data["ospf"]
1429 if "area" in ospf_data:
1430 intf_ospf_area = c_data["links"][destRouterLink]["ospf"]["area"]
1431 if "delete" in data and data["delete"]:
1432 interface_data.append("no ip ospf area")
1433 else:
1434 interface_data.append(
1435 "ip ospf area {}".format(intf_ospf_area)
1436 )
1437
1438 if "hello_interval" in ospf_data:
1439 intf_ospf_hello = c_data["links"][destRouterLink]["ospf"][
1440 "hello_interval"
1441 ]
1442 if "delete" in data and data["delete"]:
1443 interface_data.append("no ip ospf " " hello-interval")
1444 else:
1445 interface_data.append(
1446 "ip ospf " " hello-interval {}".format(intf_ospf_hello)
1447 )
1448
1449 if "dead_interval" in ospf_data:
1450 intf_ospf_dead = c_data["links"][destRouterLink]["ospf"][
1451 "dead_interval"
1452 ]
1453 if "delete" in data and data["delete"]:
1454 interface_data.append("no ip ospf" " dead-interval")
1455 else:
1456 interface_data.append(
1457 "ip ospf " " dead-interval {}".format(intf_ospf_dead)
1458 )
1459
1460 if "network" in ospf_data:
1461 intf_ospf_nw = c_data["links"][destRouterLink]["ospf"][
1462 "network"
1463 ]
1464 if "delete" in data and data["delete"]:
1465 interface_data.append(
1466 "no ip ospf" " network {}".format(intf_ospf_nw)
1467 )
1468 else:
1469 interface_data.append(
1470 "ip ospf" " network {}".format(intf_ospf_nw)
1471 )
1472
1473 if "priority" in ospf_data:
1474 intf_ospf_nw = c_data["links"][destRouterLink]["ospf"][
1475 "priority"
1476 ]
1477
1478 if "delete" in data and data["delete"]:
1479 interface_data.append("no ip ospf" " priority")
1480 else:
1481 interface_data.append(
1482 "ip ospf" " priority {}".format(intf_ospf_nw)
1483 )
1484 result = create_common_configuration(
1485 tgen, c_router, interface_data, "interface_config", build=build
1486 )
1487 except InvalidCLIError:
1488 # Traceback
1489 errormsg = traceback.format_exc()
1490 logger.error(errormsg)
1491 return errormsg
1492
1493 return result
1494
1495
1496 def create_static_routes(tgen, input_dict, build=False):
1497 """
1498 Create static routes for given router as defined in input_dict
1499
1500 Parameters
1501 ----------
1502 * `tgen` : Topogen object
1503 * `input_dict` : Input dict data, required when configuring from testcase
1504 * `build` : Only for initial setup phase this is set as True.
1505
1506 Usage
1507 -----
1508 input_dict should be in the format below:
1509 # static_routes: list of all routes
1510 # network: network address
1511 # no_of_ip: number of next-hop address that will be configured
1512 # admin_distance: admin distance for route/routes.
1513 # next_hop: starting next-hop address
1514 # tag: tag id for static routes
1515 # vrf: VRF name in which static routes needs to be created
1516 # delete: True if config to be removed. Default False.
1517
1518 Example:
1519 "routers": {
1520 "r1": {
1521 "static_routes": [
1522 {
1523 "network": "100.0.20.1/32",
1524 "no_of_ip": 9,
1525 "admin_distance": 100,
1526 "next_hop": "10.0.0.1",
1527 "tag": 4001,
1528 "vrf": "RED_A"
1529 "delete": true
1530 }
1531 ]
1532 }
1533 }
1534
1535 Returns
1536 -------
1537 errormsg(str) or True
1538 """
1539 result = False
1540 logger.debug("Entering lib API: create_static_routes()")
1541 input_dict = deepcopy(input_dict)
1542
1543 try:
1544 for router in input_dict.keys():
1545 if "static_routes" not in input_dict[router]:
1546 errormsg = "static_routes not present in input_dict"
1547 logger.info(errormsg)
1548 continue
1549
1550 static_routes_list = []
1551
1552 static_routes = input_dict[router]["static_routes"]
1553 for static_route in static_routes:
1554 del_action = static_route.setdefault("delete", False)
1555 no_of_ip = static_route.setdefault("no_of_ip", 1)
1556 network = static_route.setdefault("network", [])
1557 if type(network) is not list:
1558 network = [network]
1559
1560 admin_distance = static_route.setdefault("admin_distance", None)
1561 tag = static_route.setdefault("tag", None)
1562 vrf = static_route.setdefault("vrf", None)
1563 interface = static_route.setdefault("interface", None)
1564 next_hop = static_route.setdefault("next_hop", None)
1565 nexthop_vrf = static_route.setdefault("nexthop_vrf", None)
1566
1567 ip_list = generate_ips(network, no_of_ip)
1568 for ip in ip_list:
1569 addr_type = validate_ip_address(ip)
1570
1571 if addr_type == "ipv4":
1572 cmd = "ip route {}".format(ip)
1573 else:
1574 cmd = "ipv6 route {}".format(ip)
1575
1576 if interface:
1577 cmd = "{} {}".format(cmd, interface)
1578
1579 if next_hop:
1580 cmd = "{} {}".format(cmd, next_hop)
1581
1582 if nexthop_vrf:
1583 cmd = "{} nexthop-vrf {}".format(cmd, nexthop_vrf)
1584
1585 if vrf:
1586 cmd = "{} vrf {}".format(cmd, vrf)
1587
1588 if tag:
1589 cmd = "{} tag {}".format(cmd, str(tag))
1590
1591 if admin_distance:
1592 cmd = "{} {}".format(cmd, admin_distance)
1593
1594 if del_action:
1595 cmd = "no {}".format(cmd)
1596
1597 static_routes_list.append(cmd)
1598
1599 result = create_common_configuration(
1600 tgen, router, static_routes_list, "static_route", build=build
1601 )
1602
1603 except InvalidCLIError:
1604 # Traceback
1605 errormsg = traceback.format_exc()
1606 logger.error(errormsg)
1607 return errormsg
1608
1609 logger.debug("Exiting lib API: create_static_routes()")
1610 return result
1611
1612
1613 def create_prefix_lists(tgen, input_dict, build=False):
1614 """
1615 Create ip prefix lists as per the config provided in input
1616 JSON or input_dict
1617 Parameters
1618 ----------
1619 * `tgen` : Topogen object
1620 * `input_dict` : Input dict data, required when configuring from testcase
1621 * `build` : Only for initial setup phase this is set as True.
1622 Usage
1623 -----
1624 # pf_lists_1: name of prefix-list, user defined
1625 # seqid: prefix-list seqid, auto-generated if not given by user
1626 # network: criteria for applying prefix-list
1627 # action: permit/deny
1628 # le: less than or equal number of bits
1629 # ge: greater than or equal number of bits
1630 Example
1631 -------
1632 input_dict = {
1633 "r1": {
1634 "prefix_lists":{
1635 "ipv4": {
1636 "pf_list_1": [
1637 {
1638 "seqid": 10,
1639 "network": "any",
1640 "action": "permit",
1641 "le": "32",
1642 "ge": "30",
1643 "delete": True
1644 }
1645 ]
1646 }
1647 }
1648 }
1649 }
1650 Returns
1651 -------
1652 errormsg or True
1653 """
1654
1655 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1656 result = False
1657 try:
1658 for router in input_dict.keys():
1659 if "prefix_lists" not in input_dict[router]:
1660 errormsg = "prefix_lists not present in input_dict"
1661 logger.debug(errormsg)
1662 continue
1663
1664 config_data = []
1665 prefix_lists = input_dict[router]["prefix_lists"]
1666 for addr_type, prefix_data in prefix_lists.items():
1667 if not check_address_types(addr_type):
1668 continue
1669
1670 for prefix_name, prefix_list in prefix_data.items():
1671 for prefix_dict in prefix_list:
1672 if "action" not in prefix_dict or "network" not in prefix_dict:
1673 errormsg = "'action' or network' missing in" " input_dict"
1674 return errormsg
1675
1676 network_addr = prefix_dict["network"]
1677 action = prefix_dict["action"]
1678 le = prefix_dict.setdefault("le", None)
1679 ge = prefix_dict.setdefault("ge", None)
1680 seqid = prefix_dict.setdefault("seqid", None)
1681 del_action = prefix_dict.setdefault("delete", False)
1682 if seqid is None:
1683 seqid = get_seq_id("prefix_lists", router, prefix_name)
1684 else:
1685 set_seq_id("prefix_lists", router, seqid, prefix_name)
1686
1687 if addr_type == "ipv4":
1688 protocol = "ip"
1689 else:
1690 protocol = "ipv6"
1691
1692 cmd = "{} prefix-list {} seq {} {} {}".format(
1693 protocol, prefix_name, seqid, action, network_addr
1694 )
1695 if le:
1696 cmd = "{} le {}".format(cmd, le)
1697 if ge:
1698 cmd = "{} ge {}".format(cmd, ge)
1699
1700 if del_action:
1701 cmd = "no {}".format(cmd)
1702
1703 config_data.append(cmd)
1704 result = create_common_configuration(
1705 tgen, router, config_data, "prefix_list", build=build
1706 )
1707
1708 except InvalidCLIError:
1709 # Traceback
1710 errormsg = traceback.format_exc()
1711 logger.error(errormsg)
1712 return errormsg
1713
1714 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1715 return result
1716
1717
1718 def create_route_maps(tgen, input_dict, build=False):
1719 """
1720 Create route-map on the devices as per the arguments passed
1721 Parameters
1722 ----------
1723 * `tgen` : Topogen object
1724 * `input_dict` : Input dict data, required when configuring from testcase
1725 * `build` : Only for initial setup phase this is set as True.
1726 Usage
1727 -----
1728 # route_maps: key, value pair for route-map name and its attribute
1729 # rmap_match_prefix_list_1: user given name for route-map
1730 # action: PERMIT/DENY
1731 # match: key,value pair for match criteria. prefix_list, community-list,
1732 large-community-list or tag. Only one option at a time.
1733 # prefix_list: name of prefix list
1734 # large-community-list: name of large community list
1735 # community-ist: name of community list
1736 # tag: tag id for static routes
1737 # set: key, value pair for modifying route attributes
1738 # localpref: preference value for the network
1739 # med: metric value advertised for AS
1740 # aspath: set AS path value
1741 # weight: weight for the route
1742 # community: standard community value to be attached
1743 # large_community: large community value to be attached
1744 # community_additive: if set to "additive", adds community/large-community
1745 value to the existing values of the network prefix
1746 Example:
1747 --------
1748 input_dict = {
1749 "r1": {
1750 "route_maps": {
1751 "rmap_match_prefix_list_1": [
1752 {
1753 "action": "PERMIT",
1754 "match": {
1755 "ipv4": {
1756 "prefix_list": "pf_list_1"
1757 }
1758 "ipv6": {
1759 "prefix_list": "pf_list_1"
1760 }
1761 "large-community-list": {
1762 "id": "community_1",
1763 "exact_match": True
1764 }
1765 "community_list": {
1766 "id": "community_2",
1767 "exact_match": True
1768 }
1769 "tag": "tag_id"
1770 },
1771 "set": {
1772 "locPrf": 150,
1773 "metric": 30,
1774 "path": {
1775 "num": 20000,
1776 "action": "prepend",
1777 },
1778 "weight": 500,
1779 "community": {
1780 "num": "1:2 2:3",
1781 "action": additive
1782 }
1783 "large_community": {
1784 "num": "1:2:3 4:5;6",
1785 "action": additive
1786 },
1787 }
1788 }
1789 ]
1790 }
1791 }
1792 }
1793 Returns
1794 -------
1795 errormsg(str) or True
1796 """
1797
1798 result = False
1799 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1800 input_dict = deepcopy(input_dict)
1801 try:
1802 for router in input_dict.keys():
1803 if "route_maps" not in input_dict[router]:
1804 logger.debug("route_maps not present in input_dict")
1805 continue
1806 rmap_data = []
1807 for rmap_name, rmap_value in input_dict[router]["route_maps"].items():
1808
1809 for rmap_dict in rmap_value:
1810 del_action = rmap_dict.setdefault("delete", False)
1811
1812 if del_action:
1813 rmap_data.append("no route-map {}".format(rmap_name))
1814 continue
1815
1816 if "action" not in rmap_dict:
1817 errormsg = "action not present in input_dict"
1818 logger.error(errormsg)
1819 return False
1820
1821 rmap_action = rmap_dict.setdefault("action", "deny")
1822
1823 seq_id = rmap_dict.setdefault("seq_id", None)
1824 if seq_id is None:
1825 seq_id = get_seq_id("route_maps", router, rmap_name)
1826 else:
1827 set_seq_id("route_maps", router, seq_id, rmap_name)
1828
1829 rmap_data.append(
1830 "route-map {} {} {}".format(rmap_name, rmap_action, seq_id)
1831 )
1832
1833 if "continue" in rmap_dict:
1834 continue_to = rmap_dict["continue"]
1835 if continue_to:
1836 rmap_data.append("on-match goto {}".format(continue_to))
1837 else:
1838 logger.error(
1839 "In continue, 'route-map entry "
1840 "sequence number' is not provided"
1841 )
1842 return False
1843
1844 if "goto" in rmap_dict:
1845 go_to = rmap_dict["goto"]
1846 if go_to:
1847 rmap_data.append("on-match goto {}".format(go_to))
1848 else:
1849 logger.error(
1850 "In goto, 'Goto Clause number' is not" " provided"
1851 )
1852 return False
1853
1854 if "call" in rmap_dict:
1855 call_rmap = rmap_dict["call"]
1856 if call_rmap:
1857 rmap_data.append("call {}".format(call_rmap))
1858 else:
1859 logger.error(
1860 "In call, 'destination Route-Map' is" " not provided"
1861 )
1862 return False
1863
1864 # Verifying if SET criteria is defined
1865 if "set" in rmap_dict:
1866 set_data = rmap_dict["set"]
1867 ipv4_data = set_data.setdefault("ipv4", {})
1868 ipv6_data = set_data.setdefault("ipv6", {})
1869 local_preference = set_data.setdefault("locPrf", None)
1870 metric = set_data.setdefault("metric", None)
1871 as_path = set_data.setdefault("path", {})
1872 weight = set_data.setdefault("weight", None)
1873 community = set_data.setdefault("community", {})
1874 large_community = set_data.setdefault("large_community", {})
1875 large_comm_list = set_data.setdefault("large_comm_list", {})
1876 set_action = set_data.setdefault("set_action", None)
1877 nexthop = set_data.setdefault("nexthop", None)
1878 origin = set_data.setdefault("origin", None)
1879 ext_comm_list = set_data.setdefault("extcommunity", {})
1880
1881 # Local Preference
1882 if local_preference:
1883 rmap_data.append(
1884 "set local-preference {}".format(local_preference)
1885 )
1886
1887 # Metric
1888 if metric:
1889 rmap_data.append("set metric {} \n".format(metric))
1890
1891 # Origin
1892 if origin:
1893 rmap_data.append("set origin {} \n".format(origin))
1894
1895 # AS Path Prepend
1896 if as_path:
1897 as_num = as_path.setdefault("as_num", None)
1898 as_action = as_path.setdefault("as_action", None)
1899 if as_action and as_num:
1900 rmap_data.append(
1901 "set as-path {} {}".format(as_action, as_num)
1902 )
1903
1904 # Community
1905 if community:
1906 num = community.setdefault("num", None)
1907 comm_action = community.setdefault("action", None)
1908 if num:
1909 cmd = "set community {}".format(num)
1910 if comm_action:
1911 cmd = "{} {}".format(cmd, comm_action)
1912 rmap_data.append(cmd)
1913 else:
1914 logger.error("In community, AS Num not" " provided")
1915 return False
1916
1917 if large_community:
1918 num = large_community.setdefault("num", None)
1919 comm_action = large_community.setdefault("action", None)
1920 if num:
1921 cmd = "set large-community {}".format(num)
1922 if comm_action:
1923 cmd = "{} {}".format(cmd, comm_action)
1924
1925 rmap_data.append(cmd)
1926 else:
1927 logger.error(
1928 "In large_community, AS Num not" " provided"
1929 )
1930 return False
1931 if large_comm_list:
1932 id = large_comm_list.setdefault("id", None)
1933 del_comm = large_comm_list.setdefault("delete", None)
1934 if id:
1935 cmd = "set large-comm-list {}".format(id)
1936 if del_comm:
1937 cmd = "{} delete".format(cmd)
1938
1939 rmap_data.append(cmd)
1940 else:
1941 logger.error("In large_comm_list 'id' not" " provided")
1942 return False
1943
1944 if ext_comm_list:
1945 rt = ext_comm_list.setdefault("rt", None)
1946 del_comm = ext_comm_list.setdefault("delete", None)
1947 if rt:
1948 cmd = "set extcommunity rt {}".format(rt)
1949 if del_comm:
1950 cmd = "{} delete".format(cmd)
1951
1952 rmap_data.append(cmd)
1953 else:
1954 logger.debug("In ext_comm_list 'rt' not" " provided")
1955 return False
1956
1957 # Weight
1958 if weight:
1959 rmap_data.append("set weight {}".format(weight))
1960 if ipv6_data:
1961 nexthop = ipv6_data.setdefault("nexthop", None)
1962 if nexthop:
1963 rmap_data.append("set ipv6 next-hop {}".format(nexthop))
1964
1965 # Adding MATCH and SET sequence to RMAP if defined
1966 if "match" in rmap_dict:
1967 match_data = rmap_dict["match"]
1968 ipv4_data = match_data.setdefault("ipv4", {})
1969 ipv6_data = match_data.setdefault("ipv6", {})
1970 community = match_data.setdefault("community_list", {})
1971 large_community = match_data.setdefault("large_community", {})
1972 large_community_list = match_data.setdefault(
1973 "large_community_list", {}
1974 )
1975
1976 metric = match_data.setdefault("metric", None)
1977 source_vrf = match_data.setdefault("source-vrf", None)
1978
1979 if ipv4_data:
1980 # fetch prefix list data from rmap
1981 prefix_name = ipv4_data.setdefault("prefix_lists", None)
1982 if prefix_name:
1983 rmap_data.append(
1984 "match ip address"
1985 " prefix-list {}".format(prefix_name)
1986 )
1987
1988 # fetch tag data from rmap
1989 tag = ipv4_data.setdefault("tag", None)
1990 if tag:
1991 rmap_data.append("match tag {}".format(tag))
1992
1993 # fetch large community data from rmap
1994 large_community_list = ipv4_data.setdefault(
1995 "large_community_list", {}
1996 )
1997 large_community = match_data.setdefault(
1998 "large_community", {}
1999 )
2000
2001 if ipv6_data:
2002 prefix_name = ipv6_data.setdefault("prefix_lists", None)
2003 if prefix_name:
2004 rmap_data.append(
2005 "match ipv6 address"
2006 " prefix-list {}".format(prefix_name)
2007 )
2008
2009 # fetch tag data from rmap
2010 tag = ipv6_data.setdefault("tag", None)
2011 if tag:
2012 rmap_data.append("match tag {}".format(tag))
2013
2014 # fetch large community data from rmap
2015 large_community_list = ipv6_data.setdefault(
2016 "large_community_list", {}
2017 )
2018 large_community = match_data.setdefault(
2019 "large_community", {}
2020 )
2021
2022 if community:
2023 if "id" not in community:
2024 logger.error(
2025 "'id' is mandatory for "
2026 "community-list in match"
2027 " criteria"
2028 )
2029 return False
2030 cmd = "match community {}".format(community["id"])
2031 exact_match = community.setdefault("exact_match", False)
2032 if exact_match:
2033 cmd = "{} exact-match".format(cmd)
2034
2035 rmap_data.append(cmd)
2036 if large_community:
2037 if "id" not in large_community:
2038 logger.error(
2039 "'id' is mandatory for "
2040 "large-community-list in match "
2041 "criteria"
2042 )
2043 return False
2044 cmd = "match large-community {}".format(
2045 large_community["id"]
2046 )
2047 exact_match = large_community.setdefault(
2048 "exact_match", False
2049 )
2050 if exact_match:
2051 cmd = "{} exact-match".format(cmd)
2052 rmap_data.append(cmd)
2053 if large_community_list:
2054 if "id" not in large_community_list:
2055 logger.error(
2056 "'id' is mandatory for "
2057 "large-community-list in match "
2058 "criteria"
2059 )
2060 return False
2061 cmd = "match large-community {}".format(
2062 large_community_list["id"]
2063 )
2064 exact_match = large_community_list.setdefault(
2065 "exact_match", False
2066 )
2067 if exact_match:
2068 cmd = "{} exact-match".format(cmd)
2069 rmap_data.append(cmd)
2070
2071 if source_vrf:
2072 cmd = "match source-vrf {}".format(source_vrf)
2073 rmap_data.append(cmd)
2074
2075 if metric:
2076 cmd = "match metric {}".format(metric)
2077 rmap_data.append(cmd)
2078
2079 result = create_common_configuration(
2080 tgen, router, rmap_data, "route_maps", build=build
2081 )
2082
2083 except InvalidCLIError:
2084 # Traceback
2085 errormsg = traceback.format_exc()
2086 logger.error(errormsg)
2087 return errormsg
2088
2089 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2090 return result
2091
2092
2093 def delete_route_maps(tgen, input_dict):
2094 """
2095 Delete ip route maps from device
2096 * `tgen` : Topogen object
2097 * `input_dict` : for which router,
2098 route map has to be deleted
2099 Usage
2100 -----
2101 # Delete route-map rmap_1 and rmap_2 from router r1
2102 input_dict = {
2103 "r1": {
2104 "route_maps": ["rmap_1", "rmap__2"]
2105 }
2106 }
2107 result = delete_route_maps("ipv4", input_dict)
2108 Returns
2109 -------
2110 errormsg(str) or True
2111 """
2112 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2113
2114 for router in input_dict.keys():
2115 route_maps = input_dict[router]["route_maps"][:]
2116 rmap_data = input_dict[router]
2117 rmap_data["route_maps"] = {}
2118 for route_map_name in route_maps:
2119 rmap_data["route_maps"].update({route_map_name: [{"delete": True}]})
2120
2121 return create_route_maps(tgen, input_dict)
2122
2123
2124 def create_bgp_community_lists(tgen, input_dict, build=False):
2125 """
2126 Create bgp community-list or large-community-list on the devices as per
2127 the arguments passed. Takes list of communities in input.
2128 Parameters
2129 ----------
2130 * `tgen` : Topogen object
2131 * `input_dict` : Input dict data, required when configuring from testcase
2132 * `build` : Only for initial setup phase this is set as True.
2133 Usage
2134 -----
2135 input_dict_1 = {
2136 "r3": {
2137 "bgp_community_lists": [
2138 {
2139 "community_type": "standard",
2140 "action": "permit",
2141 "name": "rmap_lcomm_{}".format(addr_type),
2142 "value": "1:1:1 1:2:3 2:1:1 2:2:2",
2143 "large": True
2144 }
2145 ]
2146 }
2147 }
2148 }
2149 result = create_bgp_community_lists(tgen, input_dict_1)
2150 """
2151
2152 result = False
2153 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2154 input_dict = deepcopy(input_dict)
2155 try:
2156 for router in input_dict.keys():
2157 if "bgp_community_lists" not in input_dict[router]:
2158 errormsg = "bgp_community_lists not present in input_dict"
2159 logger.debug(errormsg)
2160 continue
2161
2162 config_data = []
2163
2164 community_list = input_dict[router]["bgp_community_lists"]
2165 for community_dict in community_list:
2166 del_action = community_dict.setdefault("delete", False)
2167 community_type = community_dict.setdefault("community_type", None)
2168 action = community_dict.setdefault("action", None)
2169 value = community_dict.setdefault("value", "")
2170 large = community_dict.setdefault("large", None)
2171 name = community_dict.setdefault("name", None)
2172 if large:
2173 cmd = "bgp large-community-list"
2174 else:
2175 cmd = "bgp community-list"
2176
2177 if not large and not (community_type and action and value):
2178 errormsg = (
2179 "community_type, action and value are "
2180 "required in bgp_community_list"
2181 )
2182 logger.error(errormsg)
2183 return False
2184
2185 try:
2186 community_type = int(community_type)
2187 cmd = "{} {} {} {}".format(cmd, community_type, action, value)
2188 except ValueError:
2189
2190 cmd = "{} {} {} {} {}".format(
2191 cmd, community_type, name, action, value
2192 )
2193
2194 if del_action:
2195 cmd = "no {}".format(cmd)
2196
2197 config_data.append(cmd)
2198
2199 result = create_common_configuration(
2200 tgen, router, config_data, "bgp_community_list", build=build
2201 )
2202
2203 except InvalidCLIError:
2204 # Traceback
2205 errormsg = traceback.format_exc()
2206 logger.error(errormsg)
2207 return errormsg
2208
2209 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2210 return result
2211
2212
2213 def shutdown_bringup_interface(tgen, dut, intf_name, ifaceaction=False):
2214 """
2215 Shutdown or bringup router's interface "
2216 * `tgen` : Topogen object
2217 * `dut` : Device under test
2218 * `intf_name` : Interface name to be shut/no shut
2219 * `ifaceaction` : Action, to shut/no shut interface,
2220 by default is False
2221 Usage
2222 -----
2223 dut = "r3"
2224 intf = "r3-r1-eth0"
2225 # Shut down ineterface
2226 shutdown_bringup_interface(tgen, dut, intf, False)
2227 # Bring up ineterface
2228 shutdown_bringup_interface(tgen, dut, intf, True)
2229 Returns
2230 -------
2231 errormsg(str) or True
2232 """
2233
2234 router_list = tgen.routers()
2235 if ifaceaction:
2236 logger.info("Bringing up interface : {}".format(intf_name))
2237 else:
2238 logger.info("Shutting down interface : {}".format(intf_name))
2239
2240 interface_set_status(router_list[dut], intf_name, ifaceaction)
2241
2242
2243 def addKernelRoute(
2244 tgen, router, intf, group_addr_range, next_hop=None, src=None, del_action=None
2245 ):
2246 """
2247 Add route to kernel
2248
2249 Parameters:
2250 -----------
2251 * `tgen` : Topogen object
2252 * `router`: router for which kernal routes needs to be added
2253 * `intf`: interface name, for which kernal routes needs to be added
2254 * `bindToAddress`: bind to <host>, an interface or multicast
2255 address
2256
2257 returns:
2258 --------
2259 errormsg or True
2260 """
2261
2262 logger.debug("Entering lib API: addKernelRoute()")
2263
2264 rnode = tgen.routers()[router]
2265
2266 if type(group_addr_range) is not list:
2267 group_addr_range = [group_addr_range]
2268
2269 for grp_addr in group_addr_range:
2270
2271 addr_type = validate_ip_address(grp_addr)
2272 if addr_type == "ipv4":
2273 if next_hop is not None:
2274 cmd = "ip route add {} via {}".format(grp_addr, next_hop)
2275 else:
2276 cmd = "ip route add {} dev {}".format(grp_addr, intf)
2277 if del_action:
2278 cmd = "ip route del {}".format(grp_addr)
2279 verify_cmd = "ip route"
2280 elif addr_type == "ipv6":
2281 if intf and src:
2282 cmd = "ip -6 route add {} dev {} src {}".format(grp_addr, intf, src)
2283 else:
2284 cmd = "ip -6 route add {} via {}".format(grp_addr, next_hop)
2285 verify_cmd = "ip -6 route"
2286 if del_action:
2287 cmd = "ip -6 route del {}".format(grp_addr)
2288
2289 logger.info("[DUT: {}]: Running command: [{}]".format(router, cmd))
2290 output = rnode.run(cmd)
2291
2292 # Verifying if ip route added to kernal
2293 result = rnode.run(verify_cmd)
2294 logger.debug("{}\n{}".format(verify_cmd, result))
2295 if "/" in grp_addr:
2296 ip, mask = grp_addr.split("/")
2297 if mask == "32" or mask == "128":
2298 grp_addr = ip
2299
2300 if not re_search(r"{}".format(grp_addr), result) and mask != "0":
2301 errormsg = (
2302 "[DUT: {}]: Kernal route is not added for group"
2303 " address {} Config output: {}".format(router, grp_addr, output)
2304 )
2305
2306 return errormsg
2307
2308 logger.debug("Exiting lib API: addKernelRoute()")
2309 return True
2310
2311
2312 def configure_vxlan(tgen, input_dict):
2313 """
2314 Add and configure vxlan
2315
2316 * `tgen`: tgen onject
2317 * `input_dict` : data for vxlan config
2318
2319 Usage:
2320 ------
2321 input_dict= {
2322 "dcg2":{
2323 "vxlan":[{
2324 "vxlan_name": "vxlan75100",
2325 "vxlan_id": "75100",
2326 "dstport": 4789,
2327 "local_addr": "120.0.0.1",
2328 "learning": "no",
2329 "delete": True
2330 }]
2331 }
2332 }
2333
2334 configure_vxlan(tgen, input_dict)
2335
2336 Returns:
2337 -------
2338 True or errormsg
2339
2340 """
2341
2342 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2343
2344 router_list = tgen.routers()
2345 for dut in input_dict.keys():
2346 rnode = tgen.routers()[dut]
2347
2348 if "vxlan" in input_dict[dut]:
2349 for vxlan_dict in input_dict[dut]["vxlan"]:
2350 cmd = "ip link "
2351
2352 del_vxlan = vxlan_dict.setdefault("delete", None)
2353 vxlan_names = vxlan_dict.setdefault("vxlan_name", [])
2354 vxlan_ids = vxlan_dict.setdefault("vxlan_id", [])
2355 dstport = vxlan_dict.setdefault("dstport", None)
2356 local_addr = vxlan_dict.setdefault("local_addr", None)
2357 learning = vxlan_dict.setdefault("learning", None)
2358
2359 config_data = []
2360 if vxlan_names and vxlan_ids:
2361 for vxlan_name, vxlan_id in zip(vxlan_names, vxlan_ids):
2362 cmd = "ip link"
2363
2364 if del_vxlan:
2365 cmd = "{} del {} type vxlan id {}".format(
2366 cmd, vxlan_name, vxlan_id
2367 )
2368 else:
2369 cmd = "{} add {} type vxlan id {}".format(
2370 cmd, vxlan_name, vxlan_id
2371 )
2372
2373 if dstport:
2374 cmd = "{} dstport {}".format(cmd, dstport)
2375
2376 if local_addr:
2377 ip_cmd = "ip addr add {} dev {}".format(
2378 local_addr, vxlan_name
2379 )
2380 if del_vxlan:
2381 ip_cmd = "ip addr del {} dev {}".format(
2382 local_addr, vxlan_name
2383 )
2384
2385 config_data.append(ip_cmd)
2386
2387 cmd = "{} local {}".format(cmd, local_addr)
2388
2389 if learning == "no":
2390 cmd = "{} nolearning".format(cmd)
2391
2392 elif learning == "yes":
2393 cmd = "{} learning".format(cmd)
2394
2395 config_data.append(cmd)
2396
2397 try:
2398 for _cmd in config_data:
2399 logger.info("[DUT: %s]: Running command: %s", dut, _cmd)
2400 rnode.run(_cmd)
2401
2402 except InvalidCLIError:
2403 # Traceback
2404 errormsg = traceback.format_exc()
2405 logger.error(errormsg)
2406 return errormsg
2407
2408 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2409
2410 return True
2411
2412
2413 def configure_brctl(tgen, topo, input_dict):
2414 """
2415 Add and configure brctl
2416
2417 * `tgen`: tgen onject
2418 * `input_dict` : data for brctl config
2419
2420 Usage:
2421 ------
2422 input_dict= {
2423 dut:{
2424 "brctl": [{
2425 "brctl_name": "br100",
2426 "addvxlan": "vxlan75100",
2427 "vrf": "RED",
2428 "stp": "off"
2429 }]
2430 }
2431 }
2432
2433 configure_brctl(tgen, topo, input_dict)
2434
2435 Returns:
2436 -------
2437 True or errormsg
2438
2439 """
2440
2441 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2442
2443 router_list = tgen.routers()
2444 for dut in input_dict.keys():
2445 rnode = tgen.routers()[dut]
2446
2447 if "brctl" in input_dict[dut]:
2448 for brctl_dict in input_dict[dut]["brctl"]:
2449
2450 brctl_names = brctl_dict.setdefault("brctl_name", [])
2451 addvxlans = brctl_dict.setdefault("addvxlan", [])
2452 stp_values = brctl_dict.setdefault("stp", [])
2453 vrfs = brctl_dict.setdefault("vrf", [])
2454
2455 ip_cmd = "ip link set"
2456 for brctl_name, vxlan, vrf, stp in zip(
2457 brctl_names, addvxlans, vrfs, stp_values
2458 ):
2459
2460 ip_cmd_list = []
2461 cmd = "ip link add name {} type bridge stp_state {}".format(
2462 brctl_name, stp
2463 )
2464
2465 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
2466 rnode.run(cmd)
2467
2468 ip_cmd_list.append("{} up dev {}".format(ip_cmd, brctl_name))
2469
2470 if vxlan:
2471 cmd = "{} dev {} master {}".format(ip_cmd, vxlan, brctl_name)
2472
2473 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
2474 rnode.run(cmd)
2475
2476 ip_cmd_list.append("{} up dev {}".format(ip_cmd, vxlan))
2477
2478 if vrf:
2479 ip_cmd_list.append(
2480 "{} dev {} master {}".format(ip_cmd, brctl_name, vrf)
2481 )
2482
2483 for intf_name, data in topo["routers"][dut]["links"].items():
2484 if "vrf" not in data:
2485 continue
2486
2487 if data["vrf"] == vrf:
2488 ip_cmd_list.append(
2489 "{} up dev {}".format(ip_cmd, data["interface"])
2490 )
2491
2492 try:
2493 for _ip_cmd in ip_cmd_list:
2494 logger.info("[DUT: %s]: Running command: %s", dut, _ip_cmd)
2495 rnode.run(_ip_cmd)
2496
2497 except InvalidCLIError:
2498 # Traceback
2499 errormsg = traceback.format_exc()
2500 logger.error(errormsg)
2501 return errormsg
2502
2503 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2504 return True
2505
2506
2507 def configure_interface_mac(tgen, input_dict):
2508 """
2509 Add and configure brctl
2510
2511 * `tgen`: tgen onject
2512 * `input_dict` : data for mac config
2513
2514 input_mac= {
2515 "edge1":{
2516 "br75100": "00:80:48:BA:d1:00,
2517 "br75200": "00:80:48:BA:d1:00
2518 }
2519 }
2520
2521 configure_interface_mac(tgen, input_mac)
2522
2523 Returns:
2524 -------
2525 True or errormsg
2526
2527 """
2528
2529 router_list = tgen.routers()
2530 for dut in input_dict.keys():
2531 rnode = tgen.routers()[dut]
2532
2533 for intf, mac in input_dict[dut].items():
2534 cmd = "ifconfig {} hw ether {}".format(intf, mac)
2535 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
2536
2537 try:
2538 result = rnode.run(cmd)
2539 if len(result) != 0:
2540 return result
2541
2542 except InvalidCLIError:
2543 # Traceback
2544 errormsg = traceback.format_exc()
2545 logger.error(errormsg)
2546 return errormsg
2547
2548 return True
2549
2550
2551 #############################################
2552 # Verification APIs
2553 #############################################
2554 @retry(attempts=6, wait=2, return_is_str=True)
2555 def verify_rib(
2556 tgen,
2557 addr_type,
2558 dut,
2559 input_dict,
2560 next_hop=None,
2561 protocol=None,
2562 tag=None,
2563 metric=None,
2564 fib=None,
2565 count_only=False,
2566 ):
2567 """
2568 Data will be read from input_dict or input JSON file, API will generate
2569 same prefixes, which were redistributed by either create_static_routes() or
2570 advertise_networks_using_network_command() and do will verify next_hop and
2571 each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json"
2572 command o/p.
2573
2574 Parameters
2575 ----------
2576 * `tgen` : topogen object
2577 * `addr_type` : ip type, ipv4/ipv6
2578 * `dut`: Device Under Test, for which user wants to test the data
2579 * `input_dict` : input dict, has details of static routes
2580 * `next_hop`[optional]: next_hop which needs to be verified,
2581 default: static
2582 * `protocol`[optional]: protocol, default = None
2583 * `count_only`[optional]: count of nexthops only, not specific addresses,
2584 default = False
2585
2586 Usage
2587 -----
2588 # RIB can be verified for static routes OR network advertised using
2589 network command. Following are input_dicts to create static routes
2590 and advertise networks using network command. Any one of the input_dict
2591 can be passed to verify_rib() to verify routes in DUT"s RIB.
2592
2593 # Creating static routes for r1
2594 input_dict = {
2595 "r1": {
2596 "static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9, \
2597 "admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}]
2598 }}
2599 # Advertising networks using network command in router r1
2600 input_dict = {
2601 "r1": {
2602 "advertise_networks": [{"start_ip": "20.0.0.0/32",
2603 "no_of_network": 10},
2604 {"start_ip": "30.0.0.0/32"}]
2605 }}
2606 # Verifying ipv4 routes in router r1 learned via BGP
2607 dut = "r2"
2608 protocol = "bgp"
2609 result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol)
2610
2611 Returns
2612 -------
2613 errormsg(str) or True
2614 """
2615
2616 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2617
2618 router_list = tgen.routers()
2619 additional_nexthops_in_required_nhs = []
2620 found_hops = []
2621 for routerInput in input_dict.keys():
2622 for router, rnode in router_list.items():
2623 if router != dut:
2624 continue
2625
2626 logger.info("Checking router %s RIB:", router)
2627
2628 # Verifying RIB routes
2629 if addr_type == "ipv4":
2630 command = "show ip route"
2631 else:
2632 command = "show ipv6 route"
2633
2634 found_routes = []
2635 missing_routes = []
2636
2637 if "static_routes" in input_dict[routerInput]:
2638 static_routes = input_dict[routerInput]["static_routes"]
2639
2640 for static_route in static_routes:
2641 if "vrf" in static_route and static_route["vrf"] is not None:
2642
2643 logger.info(
2644 "[DUT: {}]: Verifying routes for VRF:"
2645 " {}".format(router, static_route["vrf"])
2646 )
2647
2648 cmd = "{} vrf {}".format(command, static_route["vrf"])
2649
2650 else:
2651 cmd = "{}".format(command)
2652
2653 if protocol:
2654 cmd = "{} {}".format(cmd, protocol)
2655
2656 cmd = "{} json".format(cmd)
2657
2658 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
2659
2660 # Verifying output dictionary rib_routes_json is not empty
2661 if bool(rib_routes_json) is False:
2662 errormsg = "No route found in rib of router {}..".format(router)
2663 return errormsg
2664
2665 network = static_route["network"]
2666 if "no_of_ip" in static_route:
2667 no_of_ip = static_route["no_of_ip"]
2668 else:
2669 no_of_ip = 1
2670
2671 if "tag" in static_route:
2672 _tag = static_route["tag"]
2673 else:
2674 _tag = None
2675
2676 # Generating IPs for verification
2677 ip_list = generate_ips(network, no_of_ip)
2678 st_found = False
2679 nh_found = False
2680
2681 for st_rt in ip_list:
2682 st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
2683
2684 _addr_type = validate_ip_address(st_rt)
2685 if _addr_type != addr_type:
2686 continue
2687
2688 if st_rt in rib_routes_json:
2689 st_found = True
2690 found_routes.append(st_rt)
2691
2692 if fib and next_hop:
2693 if type(next_hop) is not list:
2694 next_hop = [next_hop]
2695
2696 for mnh in range(0, len(rib_routes_json[st_rt])):
2697 if (
2698 "fib"
2699 in rib_routes_json[st_rt][mnh]["nexthops"][0]
2700 ):
2701 found_hops.append(
2702 [
2703 rib_r["ip"]
2704 for rib_r in rib_routes_json[st_rt][
2705 mnh
2706 ]["nexthops"]
2707 ]
2708 )
2709
2710 if found_hops[0]:
2711 missing_list_of_nexthops = set(
2712 found_hops[0]
2713 ).difference(next_hop)
2714 additional_nexthops_in_required_nhs = set(
2715 next_hop
2716 ).difference(found_hops[0])
2717
2718 if additional_nexthops_in_required_nhs:
2719 logger.info(
2720 "Nexthop "
2721 "%s is not active for route %s in "
2722 "RIB of router %s\n",
2723 additional_nexthops_in_required_nhs,
2724 st_rt,
2725 dut,
2726 )
2727 errormsg = (
2728 "Nexthop {} is not active"
2729 " for route {} in RIB of router"
2730 " {}\n".format(
2731 additional_nexthops_in_required_nhs,
2732 st_rt,
2733 dut,
2734 )
2735 )
2736 return errormsg
2737 else:
2738 nh_found = True
2739
2740 elif next_hop and fib is None:
2741 if type(next_hop) is not list:
2742 next_hop = [next_hop]
2743 found_hops = [
2744 rib_r["ip"]
2745 for rib_r in rib_routes_json[st_rt][0]["nexthops"]
2746 ]
2747
2748 # Check only the count of nexthops
2749 if count_only:
2750 if len(next_hop) == len(found_hops):
2751 nh_found = True
2752 else:
2753 errormsg = (
2754 "Nexthops are missing for "
2755 "route {} in RIB of router {}: "
2756 "expected {}, found {}\n".format(
2757 st_rt,
2758 dut,
2759 len(next_hop),
2760 len(found_hops),
2761 )
2762 )
2763 return errormsg
2764
2765 # Check the actual nexthops
2766 elif found_hops:
2767 missing_list_of_nexthops = set(
2768 found_hops
2769 ).difference(next_hop)
2770 additional_nexthops_in_required_nhs = set(
2771 next_hop
2772 ).difference(found_hops)
2773
2774 if additional_nexthops_in_required_nhs:
2775 logger.info(
2776 "Missing nexthop %s for route"
2777 " %s in RIB of router %s\n",
2778 additional_nexthops_in_required_nhs,
2779 st_rt,
2780 dut,
2781 )
2782 errormsg = (
2783 "Nexthop {} is Missing for "
2784 "route {} in RIB of router {}\n".format(
2785 additional_nexthops_in_required_nhs,
2786 st_rt,
2787 dut,
2788 )
2789 )
2790 return errormsg
2791 else:
2792 nh_found = True
2793
2794 if tag:
2795 if "tag" not in rib_routes_json[st_rt][0]:
2796 errormsg = (
2797 "[DUT: {}]: tag is not"
2798 " present for"
2799 " route {} in RIB \n".format(dut, st_rt)
2800 )
2801 return errormsg
2802
2803 if _tag != rib_routes_json[st_rt][0]["tag"]:
2804 errormsg = (
2805 "[DUT: {}]: tag value {}"
2806 " is not matched for"
2807 " route {} in RIB \n".format(
2808 dut,
2809 _tag,
2810 st_rt,
2811 )
2812 )
2813 return errormsg
2814
2815 if metric is not None:
2816 if "metric" not in rib_routes_json[st_rt][0]:
2817 errormsg = (
2818 "[DUT: {}]: metric is"
2819 " not present for"
2820 " route {} in RIB \n".format(dut, st_rt)
2821 )
2822 return errormsg
2823
2824 if metric != rib_routes_json[st_rt][0]["metric"]:
2825 errormsg = (
2826 "[DUT: {}]: metric value "
2827 "{} is not matched for "
2828 "route {} in RIB \n".format(
2829 dut,
2830 metric,
2831 st_rt,
2832 )
2833 )
2834 return errormsg
2835
2836 else:
2837 missing_routes.append(st_rt)
2838
2839 if nh_found:
2840 logger.info(
2841 "[DUT: {}]: Found next_hop {} for all bgp"
2842 " routes in RIB".format(router, next_hop)
2843 )
2844
2845 if len(missing_routes) > 0:
2846 errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
2847 dut, missing_routes
2848 )
2849 return errormsg
2850
2851 if found_routes:
2852 logger.info(
2853 "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
2854 dut,
2855 found_routes,
2856 )
2857
2858 continue
2859
2860 if "bgp" in input_dict[routerInput]:
2861 if (
2862 "advertise_networks"
2863 not in input_dict[routerInput]["bgp"]["address_family"][addr_type][
2864 "unicast"
2865 ]
2866 ):
2867 continue
2868
2869 found_routes = []
2870 missing_routes = []
2871 advertise_network = input_dict[routerInput]["bgp"]["address_family"][
2872 addr_type
2873 ]["unicast"]["advertise_networks"]
2874
2875 # Continue if there are no network advertise
2876 if len(advertise_network) == 0:
2877 continue
2878
2879 for advertise_network_dict in advertise_network:
2880 if "vrf" in advertise_network_dict:
2881 cmd = "{} vrf {} json".format(
2882 command, advertise_network_dict["vrf"]
2883 )
2884 else:
2885 cmd = "{} json".format(command)
2886
2887 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
2888
2889 # Verifying output dictionary rib_routes_json is not empty
2890 if bool(rib_routes_json) is False:
2891 errormsg = "No route found in rib of router {}..".format(router)
2892 return errormsg
2893
2894 start_ip = advertise_network_dict["network"]
2895 if "no_of_network" in advertise_network_dict:
2896 no_of_network = advertise_network_dict["no_of_network"]
2897 else:
2898 no_of_network = 1
2899
2900 # Generating IPs for verification
2901 ip_list = generate_ips(start_ip, no_of_network)
2902 st_found = False
2903 nh_found = False
2904
2905 for st_rt in ip_list:
2906 st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
2907
2908 _addr_type = validate_ip_address(st_rt)
2909 if _addr_type != addr_type:
2910 continue
2911
2912 if st_rt in rib_routes_json:
2913 st_found = True
2914 found_routes.append(st_rt)
2915
2916 if next_hop:
2917 if type(next_hop) is not list:
2918 next_hop = [next_hop]
2919
2920 count = 0
2921 for nh in next_hop:
2922 for nh_dict in rib_routes_json[st_rt][0]["nexthops"]:
2923 if nh_dict["ip"] != nh:
2924 continue
2925 else:
2926 count += 1
2927
2928 if count == len(next_hop):
2929 nh_found = True
2930 else:
2931 errormsg = (
2932 "Nexthop {} is Missing"
2933 " for route {} in "
2934 "RIB of router {}\n".format(next_hop, st_rt, dut)
2935 )
2936 return errormsg
2937 else:
2938 missing_routes.append(st_rt)
2939
2940 if nh_found:
2941 logger.info(
2942 "Found next_hop {} for all routes in RIB"
2943 " of router {}\n".format(next_hop, dut)
2944 )
2945
2946 if len(missing_routes) > 0:
2947 errormsg = (
2948 "Missing {} route in RIB of router {}, "
2949 "routes: {} \n".format(addr_type, dut, missing_routes)
2950 )
2951 return errormsg
2952
2953 if found_routes:
2954 logger.info(
2955 "Verified {} routes in router {} RIB, found"
2956 " routes are: {}\n".format(addr_type, dut, found_routes)
2957 )
2958
2959 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2960 return True
2961
2962
2963 @retry(attempts=6, wait=2, return_is_str=True)
2964 def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None):
2965 """
2966 Data will be read from input_dict or input JSON file, API will generate
2967 same prefixes, which were redistributed by either create_static_routes() or
2968 advertise_networks_using_network_command() and will verify next_hop and
2969 each prefix/routes is present in "show ip/ipv6 fib json"
2970 command o/p.
2971
2972 Parameters
2973 ----------
2974 * `tgen` : topogen object
2975 * `addr_type` : ip type, ipv4/ipv6
2976 * `dut`: Device Under Test, for which user wants to test the data
2977 * `input_dict` : input dict, has details of static routes
2978 * `next_hop`[optional]: next_hop which needs to be verified,
2979 default: static
2980
2981 Usage
2982 -----
2983 input_routes_r1 = {
2984 "r1": {
2985 "static_routes": [{
2986 "network": ["1.1.1.1/32],
2987 "next_hop": "Null0",
2988 "vrf": "RED"
2989 }]
2990 }
2991 }
2992 result = result = verify_fib_routes(tgen, "ipv4, "r1", input_routes_r1)
2993
2994 Returns
2995 -------
2996 errormsg(str) or True
2997 """
2998
2999 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3000
3001 router_list = tgen.routers()
3002 for routerInput in input_dict.keys():
3003 for router, rnode in router_list.items():
3004 if router != dut:
3005 continue
3006
3007 logger.info("Checking router %s FIB routes:", router)
3008
3009 # Verifying RIB routes
3010 if addr_type == "ipv4":
3011 command = "show ip fib"
3012 else:
3013 command = "show ipv6 fib"
3014
3015 found_routes = []
3016 missing_routes = []
3017
3018 if "static_routes" in input_dict[routerInput]:
3019 static_routes = input_dict[routerInput]["static_routes"]
3020
3021 for static_route in static_routes:
3022 if "vrf" in static_route and static_route["vrf"] is not None:
3023
3024 logger.info(
3025 "[DUT: {}]: Verifying routes for VRF:"
3026 " {}".format(router, static_route["vrf"])
3027 )
3028
3029 cmd = "{} vrf {}".format(command, static_route["vrf"])
3030
3031 else:
3032 cmd = "{}".format(command)
3033
3034 cmd = "{} json".format(cmd)
3035
3036 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
3037
3038 # Verifying output dictionary rib_routes_json is not empty
3039 if bool(rib_routes_json) is False:
3040 errormsg = "[DUT: {}]: No route found in fib".format(router)
3041 return errormsg
3042
3043 network = static_route["network"]
3044 if "no_of_ip" in static_route:
3045 no_of_ip = static_route["no_of_ip"]
3046 else:
3047 no_of_ip = 1
3048
3049 # Generating IPs for verification
3050 ip_list = generate_ips(network, no_of_ip)
3051 st_found = False
3052 nh_found = False
3053
3054 for st_rt in ip_list:
3055 st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
3056
3057 _addr_type = validate_ip_address(st_rt)
3058 if _addr_type != addr_type:
3059 continue
3060
3061 if st_rt in rib_routes_json:
3062 st_found = True
3063 found_routes.append(st_rt)
3064
3065 if next_hop:
3066 if type(next_hop) is not list:
3067 next_hop = [next_hop]
3068
3069 count = 0
3070 for nh in next_hop:
3071 for nh_dict in rib_routes_json[st_rt][0][
3072 "nexthops"
3073 ]:
3074 if nh_dict["ip"] != nh:
3075 continue
3076 else:
3077 count += 1
3078
3079 if count == len(next_hop):
3080 nh_found = True
3081 else:
3082 missing_routes.append(st_rt)
3083 errormsg = (
3084 "Nexthop {} is Missing"
3085 " for route {} in "
3086 "RIB of router {}\n".format(
3087 next_hop, st_rt, dut
3088 )
3089 )
3090 return errormsg
3091
3092 else:
3093 missing_routes.append(st_rt)
3094
3095 if len(missing_routes) > 0:
3096 errormsg = "[DUT: {}]: Missing route in FIB:" " {}".format(
3097 dut, missing_routes
3098 )
3099 return errormsg
3100
3101 if nh_found:
3102 logger.info(
3103 "Found next_hop {} for all routes in RIB"
3104 " of router {}\n".format(next_hop, dut)
3105 )
3106
3107 if found_routes:
3108 logger.info(
3109 "[DUT: %s]: Verified routes in FIB, found" " routes are: %s\n",
3110 dut,
3111 found_routes,
3112 )
3113
3114 continue
3115
3116 if "bgp" in input_dict[routerInput]:
3117 if (
3118 "advertise_networks"
3119 not in input_dict[routerInput]["bgp"]["address_family"][addr_type][
3120 "unicast"
3121 ]
3122 ):
3123 continue
3124
3125 found_routes = []
3126 missing_routes = []
3127 advertise_network = input_dict[routerInput]["bgp"]["address_family"][
3128 addr_type
3129 ]["unicast"]["advertise_networks"]
3130
3131 # Continue if there are no network advertise
3132 if len(advertise_network) == 0:
3133 continue
3134
3135 for advertise_network_dict in advertise_network:
3136 if "vrf" in advertise_network_dict:
3137 cmd = "{} vrf {} json".format(command, static_route["vrf"])
3138 else:
3139 cmd = "{} json".format(command)
3140
3141 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
3142
3143 # Verifying output dictionary rib_routes_json is not empty
3144 if bool(rib_routes_json) is False:
3145 errormsg = "No route found in rib of router {}..".format(router)
3146 return errormsg
3147
3148 start_ip = advertise_network_dict["network"]
3149 if "no_of_network" in advertise_network_dict:
3150 no_of_network = advertise_network_dict["no_of_network"]
3151 else:
3152 no_of_network = 1
3153
3154 # Generating IPs for verification
3155 ip_list = generate_ips(start_ip, no_of_network)
3156 st_found = False
3157 nh_found = False
3158
3159 for st_rt in ip_list:
3160 st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
3161
3162 _addr_type = validate_ip_address(st_rt)
3163 if _addr_type != addr_type:
3164 continue
3165
3166 if st_rt in rib_routes_json:
3167 st_found = True
3168 found_routes.append(st_rt)
3169
3170 if next_hop:
3171 if type(next_hop) is not list:
3172 next_hop = [next_hop]
3173
3174 count = 0
3175 for nh in next_hop:
3176 for nh_dict in rib_routes_json[st_rt][0]["nexthops"]:
3177 if nh_dict["ip"] != nh:
3178 continue
3179 else:
3180 count += 1
3181
3182 if count == len(next_hop):
3183 nh_found = True
3184 else:
3185 missing_routes.append(st_rt)
3186 errormsg = (
3187 "Nexthop {} is Missing"
3188 " for route {} in "
3189 "RIB of router {}\n".format(next_hop, st_rt, dut)
3190 )
3191 return errormsg
3192 else:
3193 missing_routes.append(st_rt)
3194
3195 if len(missing_routes) > 0:
3196 errormsg = "[DUT: {}]: Missing route in FIB: " "{} \n".format(
3197 dut, missing_routes
3198 )
3199 return errormsg
3200
3201 if nh_found:
3202 logger.info(
3203 "Found next_hop {} for all routes in RIB"
3204 " of router {}\n".format(next_hop, dut)
3205 )
3206
3207 if found_routes:
3208 logger.info(
3209 "[DUT: {}]: Verified routes FIB"
3210 ", found routes are: {}\n".format(dut, found_routes)
3211 )
3212
3213 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3214 return True
3215
3216
3217 def verify_admin_distance_for_static_routes(tgen, input_dict):
3218 """
3219 API to verify admin distance for static routes as defined in input_dict/
3220 input JSON by running show ip/ipv6 route json command.
3221 Parameter
3222 ---------
3223 * `tgen` : topogen object
3224 * `input_dict`: having details like - for which router and static routes
3225 admin dsitance needs to be verified
3226 Usage
3227 -----
3228 # To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop
3229 10.0.0.2 in router r1
3230 input_dict = {
3231 "r1": {
3232 "static_routes": [{
3233 "network": "10.0.20.1/32",
3234 "admin_distance": 10,
3235 "next_hop": "10.0.0.2"
3236 }]
3237 }
3238 }
3239 result = verify_admin_distance_for_static_routes(tgen, input_dict)
3240 Returns
3241 -------
3242 errormsg(str) or True
3243 """
3244
3245 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3246
3247 for router in input_dict.keys():
3248 if router not in tgen.routers():
3249 continue
3250
3251 rnode = tgen.routers()[router]
3252
3253 for static_route in input_dict[router]["static_routes"]:
3254 addr_type = validate_ip_address(static_route["network"])
3255 # Command to execute
3256 if addr_type == "ipv4":
3257 command = "show ip route json"
3258 else:
3259 command = "show ipv6 route json"
3260 show_ip_route_json = run_frr_cmd(rnode, command, isjson=True)
3261
3262 logger.info(
3263 "Verifying admin distance for static route %s" " under dut %s:",
3264 static_route,
3265 router,
3266 )
3267 network = static_route["network"]
3268 next_hop = static_route["next_hop"]
3269 admin_distance = static_route["admin_distance"]
3270 route_data = show_ip_route_json[network][0]
3271 if network in show_ip_route_json:
3272 if route_data["nexthops"][0]["ip"] == next_hop:
3273 if route_data["distance"] != admin_distance:
3274 errormsg = (
3275 "Verification failed: admin distance"
3276 " for static route {} under dut {},"
3277 " found:{} but expected:{}".format(
3278 static_route,
3279 router,
3280 route_data["distance"],
3281 admin_distance,
3282 )
3283 )
3284 return errormsg
3285 else:
3286 logger.info(
3287 "Verification successful: admin"
3288 " distance for static route %s under"
3289 " dut %s, found:%s",
3290 static_route,
3291 router,
3292 route_data["distance"],
3293 )
3294
3295 else:
3296 errormsg = (
3297 "Static route {} not found in "
3298 "show_ip_route_json for dut {}".format(network, router)
3299 )
3300 return errormsg
3301
3302 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3303 return True
3304
3305
3306 def verify_prefix_lists(tgen, input_dict):
3307 """
3308 Running "show ip prefix-list" command and verifying given prefix-list
3309 is present in router.
3310 Parameters
3311 ----------
3312 * `tgen` : topogen object
3313 * `input_dict`: data to verify prefix lists
3314 Usage
3315 -----
3316 # To verify pf_list_1 is present in router r1
3317 input_dict = {
3318 "r1": {
3319 "prefix_lists": ["pf_list_1"]
3320 }}
3321 result = verify_prefix_lists("ipv4", input_dict, tgen)
3322 Returns
3323 -------
3324 errormsg(str) or True
3325 """
3326
3327 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3328
3329 for router in input_dict.keys():
3330 if router not in tgen.routers():
3331 continue
3332
3333 rnode = tgen.routers()[router]
3334
3335 # Show ip prefix list
3336 show_prefix_list = run_frr_cmd(rnode, "show ip prefix-list")
3337
3338 # Verify Prefix list is deleted
3339 prefix_lists_addr = input_dict[router]["prefix_lists"]
3340 for addr_type in prefix_lists_addr:
3341 if not check_address_types(addr_type):
3342 continue
3343
3344 for prefix_list in prefix_lists_addr[addr_type].keys():
3345 if prefix_list in show_prefix_list:
3346 errormsg = (
3347 "Prefix list {} is/are present in the router"
3348 " {}".format(prefix_list, router)
3349 )
3350 return errormsg
3351
3352 logger.info(
3353 "Prefix list %s is/are not present in the router" " from router %s",
3354 prefix_list,
3355 router,
3356 )
3357
3358 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3359 return True
3360
3361
3362 @retry(attempts=3, wait=4, return_is_str=True)
3363 def verify_route_maps(tgen, input_dict):
3364 """
3365 Running "show route-map" command and verifying given route-map
3366 is present in router.
3367 Parameters
3368 ----------
3369 * `tgen` : topogen object
3370 * `input_dict`: data to verify prefix lists
3371 Usage
3372 -----
3373 # To verify rmap_1 and rmap_2 are present in router r1
3374 input_dict = {
3375 "r1": {
3376 "route_maps": ["rmap_1", "rmap_2"]
3377 }
3378 }
3379 result = verify_route_maps(tgen, input_dict)
3380 Returns
3381 -------
3382 errormsg(str) or True
3383 """
3384
3385 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3386
3387 for router in input_dict.keys():
3388 if router not in tgen.routers():
3389 continue
3390
3391 rnode = tgen.routers()[router]
3392 # Show ip route-map
3393 show_route_maps = rnode.vtysh_cmd("show route-map")
3394
3395 # Verify route-map is deleted
3396 route_maps = input_dict[router]["route_maps"]
3397 for route_map in route_maps:
3398 if route_map in show_route_maps:
3399 errormsg = "Route map {} is not deleted from router" " {}".format(
3400 route_map, router
3401 )
3402 return errormsg
3403
3404 logger.info(
3405 "Route map %s is/are deleted successfully from" " router %s",
3406 route_maps,
3407 router,
3408 )
3409
3410 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3411 return True
3412
3413
3414 @retry(attempts=4, wait=4, return_is_str=True)
3415 def verify_bgp_community(tgen, addr_type, router, network, input_dict=None):
3416 """
3417 API to veiryf BGP large community is attached in route for any given
3418 DUT by running "show bgp ipv4/6 {route address} json" command.
3419 Parameters
3420 ----------
3421 * `tgen`: topogen object
3422 * `addr_type` : ip type, ipv4/ipv6
3423 * `dut`: Device Under Test
3424 * `network`: network for which set criteria needs to be verified
3425 * `input_dict`: having details like - for which router, community and
3426 values needs to be verified
3427 Usage
3428 -----
3429 networks = ["200.50.2.0/32"]
3430 input_dict = {
3431 "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
3432 }
3433 result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None)
3434 Returns
3435 -------
3436 errormsg(str) or True
3437 """
3438
3439 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3440 if router not in tgen.routers():
3441 return False
3442
3443 rnode = tgen.routers()[router]
3444
3445 logger.debug(
3446 "Verifying BGP community attributes on dut %s: for %s " "network %s",
3447 router,
3448 addr_type,
3449 network,
3450 )
3451
3452 for net in network:
3453 cmd = "show bgp {} {} json".format(addr_type, net)
3454 show_bgp_json = rnode.vtysh_cmd(cmd, isjson=True)
3455 logger.info(show_bgp_json)
3456 if "paths" not in show_bgp_json:
3457 return "Prefix {} not found in BGP table of router: {}".format(net, router)
3458
3459 as_paths = show_bgp_json["paths"]
3460 found = False
3461 for i in range(len(as_paths)):
3462 if (
3463 "largeCommunity" in show_bgp_json["paths"][i]
3464 or "community" in show_bgp_json["paths"][i]
3465 ):
3466 found = True
3467 logger.info(
3468 "Large Community attribute is found for route:" " %s in router: %s",
3469 net,
3470 router,
3471 )
3472 if input_dict is not None:
3473 for criteria, comm_val in input_dict.items():
3474 show_val = show_bgp_json["paths"][i][criteria]["string"]
3475 if comm_val == show_val:
3476 logger.info(
3477 "Verifying BGP %s for prefix: %s"
3478 " in router: %s, found expected"
3479 " value: %s",
3480 criteria,
3481 net,
3482 router,
3483 comm_val,
3484 )
3485 else:
3486 errormsg = (
3487 "Failed: Verifying BGP attribute"
3488 " {} for route: {} in router: {}"
3489 ", expected value: {} but found"
3490 ": {}".format(criteria, net, router, comm_val, show_val)
3491 )
3492 return errormsg
3493
3494 if not found:
3495 errormsg = (
3496 "Large Community attribute is not found for route: "
3497 "{} in router: {} ".format(net, router)
3498 )
3499 return errormsg
3500
3501 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3502 return True
3503
3504
3505 def verify_create_community_list(tgen, input_dict):
3506 """
3507 API is to verify if large community list is created for any given DUT in
3508 input_dict by running "sh bgp large-community-list {"comm_name"} detail"
3509 command.
3510 Parameters
3511 ----------
3512 * `tgen`: topogen object
3513 * `input_dict`: having details like - for which router, large community
3514 needs to be verified
3515 Usage
3516 -----
3517 input_dict = {
3518 "r1": {
3519 "large-community-list": {
3520 "standard": {
3521 "Test1": [{"action": "PERMIT", "attribute":\
3522 ""}]
3523 }}}}
3524 result = verify_create_community_list(tgen, input_dict)
3525 Returns
3526 -------
3527 errormsg(str) or True
3528 """
3529
3530 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3531
3532 for router in input_dict.keys():
3533 if router not in tgen.routers():
3534 continue
3535
3536 rnode = tgen.routers()[router]
3537
3538 logger.info("Verifying large-community is created for dut %s:", router)
3539
3540 for comm_data in input_dict[router]["bgp_community_lists"]:
3541 comm_name = comm_data["name"]
3542 comm_type = comm_data["community_type"]
3543 show_bgp_community = run_frr_cmd(
3544 rnode, "show bgp large-community-list {} detail".format(comm_name)
3545 )
3546
3547 # Verify community list and type
3548 if comm_name in show_bgp_community and comm_type in show_bgp_community:
3549 logger.info(
3550 "BGP %s large-community-list %s is" " created", comm_type, comm_name
3551 )
3552 else:
3553 errormsg = "BGP {} large-community-list {} is not" " created".format(
3554 comm_type, comm_name
3555 )
3556 return errormsg
3557
3558 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3559 return True
3560
3561
3562 def verify_cli_json(tgen, input_dict):
3563 """
3564 API to verify if JSON is available for clis
3565 command.
3566
3567 Parameters
3568 ----------
3569 * `tgen`: topogen object
3570 * `input_dict`: CLIs for which JSON needs to be verified
3571 Usage
3572 -----
3573 input_dict = {
3574 "edge1":{
3575 "cli": ["show evpn vni detail", show evpn rmac vni all]
3576 }
3577 }
3578
3579 result = verify_cli_json(tgen, input_dict)
3580
3581 Returns
3582 -------
3583 errormsg(str) or True
3584 """
3585
3586 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3587 for dut in input_dict.keys():
3588 rnode = tgen.routers()[dut]
3589
3590 for cli in input_dict[dut]["cli"]:
3591 logger.info(
3592 "[DUT: %s]: Verifying JSON is available for " "CLI %s :", dut, cli
3593 )
3594
3595 test_cli = "{} json".format(cli)
3596 ret_json = rnode.vtysh_cmd(test_cli, isjson=True)
3597 if not bool(ret_json):
3598 errormsg = "CLI: %s, JSON format is not available" % (cli)
3599 return errormsg
3600 elif "unknown" in ret_json or "Unknown" in ret_json:
3601 errormsg = "CLI: %s, JSON format is not available" % (cli)
3602 return errormsg
3603 else:
3604 logger.info(
3605 "CLI : %s JSON format is available: " "\n %s", cli, ret_json
3606 )
3607
3608 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3609
3610 return True
3611
3612
3613 @retry(attempts=3, wait=4, return_is_str=True)
3614 def verify_evpn_vni(tgen, input_dict):
3615 """
3616 API to verify evpn vni details using "show evpn vni detail json"
3617 command.
3618
3619 Parameters
3620 ----------
3621 * `tgen`: topogen object
3622 * `input_dict`: having details like - for which router, evpn details
3623 needs to be verified
3624 Usage
3625 -----
3626 input_dict = {
3627 "edge1":{
3628 "vni": [
3629 {
3630 "75100":{
3631 "vrf": "RED",
3632 "vxlanIntf": "vxlan75100",
3633 "localVtepIp": "120.1.1.1",
3634 "sviIntf": "br100"
3635 }
3636 }
3637 ]
3638 }
3639 }
3640
3641 result = verify_evpn_vni(tgen, input_dict)
3642
3643 Returns
3644 -------
3645 errormsg(str) or True
3646 """
3647
3648 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3649 for dut in input_dict.keys():
3650 rnode = tgen.routers()[dut]
3651
3652 logger.info("[DUT: %s]: Verifying evpn vni details :", dut)
3653
3654 cmd = "show evpn vni detail json"
3655 evpn_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True)
3656 if not bool(evpn_all_vni_json):
3657 errormsg = "No output for '{}' cli".format(cmd)
3658 return errormsg
3659
3660 if "vni" in input_dict[dut]:
3661 for vni_dict in input_dict[dut]["vni"]:
3662 found = False
3663 vni = vni_dict["name"]
3664 for evpn_vni_json in evpn_all_vni_json:
3665 if "vni" in evpn_vni_json:
3666 if evpn_vni_json["vni"] != int(vni):
3667 continue
3668
3669 for attribute in vni_dict.keys():
3670 if vni_dict[attribute] != evpn_vni_json[attribute]:
3671 errormsg = (
3672 "[DUT: %s] Verifying "
3673 "%s for VNI: %s [FAILED]||"
3674 ", EXPECTED : %s "
3675 " FOUND : %s"
3676 % (
3677 dut,
3678 attribute,
3679 vni,
3680 vni_dict[attribute],
3681 evpn_vni_json[attribute],
3682 )
3683 )
3684 return errormsg
3685
3686 else:
3687 found = True
3688 logger.info(
3689 "[DUT: %s] Verifying"
3690 " %s for VNI: %s , "
3691 "Found Expected : %s ",
3692 dut,
3693 attribute,
3694 vni,
3695 evpn_vni_json[attribute],
3696 )
3697
3698 if evpn_vni_json["state"] != "Up":
3699 errormsg = (
3700 "[DUT: %s] Failed: Verifying"
3701 " State for VNI: %s is not Up" % (dut, vni)
3702 )
3703 return errormsg
3704
3705 else:
3706 errormsg = (
3707 "[DUT: %s] Failed:"
3708 " VNI: %s is not present in JSON" % (dut, vni)
3709 )
3710 return errormsg
3711
3712 if found:
3713 logger.info(
3714 "[DUT %s]: Verifying VNI : %s "
3715 "details and state is Up [PASSED]!!",
3716 dut,
3717 vni,
3718 )
3719 return True
3720
3721 else:
3722 errormsg = (
3723 "[DUT: %s] Failed:" " vni details are not present in input data" % (dut)
3724 )
3725 return errormsg
3726
3727 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3728 return False
3729
3730
3731 @retry(attempts=3, wait=4, return_is_str=True)
3732 def verify_vrf_vni(tgen, input_dict):
3733 """
3734 API to verify vrf vni details using "show vrf vni json"
3735 command.
3736
3737 Parameters
3738 ----------
3739 * `tgen`: topogen object
3740 * `input_dict`: having details like - for which router, evpn details
3741 needs to be verified
3742 Usage
3743 -----
3744 input_dict = {
3745 "edge1":{
3746 "vrfs": [
3747 {
3748 "RED":{
3749 "vni": 75000,
3750 "vxlanIntf": "vxlan75100",
3751 "sviIntf": "br100",
3752 "routerMac": "00:80:48:ba:d1:00",
3753 "state": "Up"
3754 }
3755 }
3756 ]
3757 }
3758 }
3759
3760 result = verify_vrf_vni(tgen, input_dict)
3761
3762 Returns
3763 -------
3764 errormsg(str) or True
3765 """
3766
3767 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3768 for dut in input_dict.keys():
3769 rnode = tgen.routers()[dut]
3770
3771 logger.info("[DUT: %s]: Verifying vrf vni details :", dut)
3772
3773 cmd = "show vrf vni json"
3774 vrf_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True)
3775 if not bool(vrf_all_vni_json):
3776 errormsg = "No output for '{}' cli".format(cmd)
3777 return errormsg
3778
3779 if "vrfs" in input_dict[dut]:
3780 for vrfs in input_dict[dut]["vrfs"]:
3781 for vrf, vrf_dict in vrfs.items():
3782 found = False
3783 for vrf_vni_json in vrf_all_vni_json["vrfs"]:
3784 if "vrf" in vrf_vni_json:
3785 if vrf_vni_json["vrf"] != vrf:
3786 continue
3787
3788 for attribute in vrf_dict.keys():
3789 if vrf_dict[attribute] == vrf_vni_json[attribute]:
3790 found = True
3791 logger.info(
3792 "[DUT %s]: VRF: %s, "
3793 "verifying %s "
3794 ", Found Expected: %s "
3795 "[PASSED]!!",
3796 dut,
3797 vrf,
3798 attribute,
3799 vrf_vni_json[attribute],
3800 )
3801 else:
3802 errormsg = (
3803 "[DUT: %s] VRF: %s, "
3804 "verifying %s [FAILED!!] "
3805 ", EXPECTED : %s "
3806 ", FOUND : %s"
3807 % (
3808 dut,
3809 vrf,
3810 attribute,
3811 vrf_dict[attribute],
3812 vrf_vni_json[attribute],
3813 )
3814 )
3815 return errormsg
3816
3817 else:
3818 errormsg = "[DUT: %s] VRF: %s " "is not present in JSON" % (
3819 dut,
3820 vrf,
3821 )
3822 return errormsg
3823
3824 if found:
3825 logger.info(
3826 "[DUT %s] Verifying VRF: %s " " details [PASSED]!!",
3827 dut,
3828 vrf,
3829 )
3830 return True
3831
3832 else:
3833 errormsg = (
3834 "[DUT: %s] Failed:" " vrf details are not present in input data" % (dut)
3835 )
3836 return errormsg
3837
3838 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3839 return False
3840
3841
3842 def required_linux_kernel_version(required_version):
3843 """
3844 This API is used to check linux version compatibility of the test suite.
3845 If version mentioned in required_version is higher than the linux kernel
3846 of the system, test suite will be skipped. This API returns true or errormsg.
3847
3848 Parameters
3849 ----------
3850 * `required_version` : Kernel version required for the suites to run.
3851
3852 Usage
3853 -----
3854 result = linux_kernel_version_lowerthan('4.15')
3855
3856 Returns
3857 -------
3858 errormsg(str) or True
3859 """
3860 system_kernel = platform.release()
3861 if version_cmp(system_kernel, required_version) < 0:
3862 error_msg = (
3863 'These tests will not run on kernel "{}", '
3864 "they require kernel >= {})".format(system_kernel, required_version)
3865 )
3866 return error_msg
3867 return True