]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/common_config.py
tests: Adding framework support for EVPN-Type5 automation
[mirror_frr.git] / tests / topotests / lib / common_config.py
1 #
2 # Copyright (c) 2019 by VMware, Inc. ("VMware")
3 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
4 # ("NetDEF") in this file.
5 #
6 # Permission to use, copy, modify, and/or distribute this software
7 # for any purpose with or without fee is hereby granted, provided
8 # that the above copyright notice and this permission notice appear
9 # in all copies.
10 #
11 # THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
12 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
14 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
15 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
16 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
17 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
18 # OF THIS SOFTWARE.
19 #
20
21 from collections import OrderedDict
22 from datetime import datetime
23 from time import sleep
24 from copy import deepcopy
25 from subprocess import call
26 from subprocess import STDOUT as SUB_STDOUT
27 from subprocess import PIPE as SUB_PIPE
28 from subprocess import Popen
29 from functools import wraps
30 from re import search as re_search
31 from tempfile import mkdtemp
32
33 import StringIO
34 import os
35 import sys
36 import ConfigParser
37 import traceback
38 import socket
39 import ipaddress
40
41 from lib.topolog import logger, logger_config
42 from lib.topogen import TopoRouter, get_topogen
43 from lib.topotest import interface_set_status
44
45 FRRCFG_FILE = "frr_json.conf"
46 FRRCFG_BKUP_FILE = "frr_json_initial.conf"
47
48 ERROR_LIST = ["Malformed", "Failure", "Unknown", "Incomplete"]
49 ROUTER_LIST = []
50
51 ####
52 CD = os.path.dirname(os.path.realpath(__file__))
53 PYTESTINI_PATH = os.path.join(CD, "../pytest.ini")
54
55 # Creating tmp dir with testsuite name to avoid conflict condition when
56 # multiple testsuites run together. All temporary files would be created
57 # in this dir and this dir would be removed once testsuite run is
58 # completed
59 LOGDIR = "/tmp/topotests/"
60 TMPDIR = None
61
62 # NOTE: to save execution logs to log file frrtest_log_dir must be configured
63 # in `pytest.ini`.
64 config = ConfigParser.ConfigParser()
65 config.read(PYTESTINI_PATH)
66
67 config_section = "topogen"
68
69 if config.has_option("topogen", "verbosity"):
70 loglevel = config.get("topogen", "verbosity")
71 loglevel = loglevel.upper()
72 else:
73 loglevel = "INFO"
74
75 if config.has_option("topogen", "frrtest_log_dir"):
76 frrtest_log_dir = config.get("topogen", "frrtest_log_dir")
77 time_stamp = datetime.time(datetime.now())
78 logfile_name = "frr_test_bgp_"
79 frrtest_log_file = frrtest_log_dir + logfile_name + str(time_stamp)
80 print("frrtest_log_file..", frrtest_log_file)
81
82 logger = logger_config.get_logger(
83 name="test_execution_logs", log_level=loglevel, target=frrtest_log_file
84 )
85 print("Logs will be sent to logfile: {}".format(frrtest_log_file))
86
87 if config.has_option("topogen", "show_router_config"):
88 show_router_config = config.get("topogen", "show_router_config")
89 else:
90 show_router_config = False
91
92 # env variable for setting what address type to test
93 ADDRESS_TYPES = os.environ.get("ADDRESS_TYPES")
94
95
96 # Saves sequence id numbers
97 SEQ_ID = {"prefix_lists": {}, "route_maps": {}}
98
99
100 def get_seq_id(obj_type, router, obj_name):
101 """
102 Generates and saves sequence number in interval of 10
103 Parameters
104 ----------
105 * `obj_type`: prefix_lists or route_maps
106 * `router`: router name
107 *` obj_name`: name of the prefix-list or route-map
108 Returns
109 --------
110 Sequence number generated
111 """
112
113 router_data = SEQ_ID[obj_type].setdefault(router, {})
114 obj_data = router_data.setdefault(obj_name, {})
115 seq_id = obj_data.setdefault("seq_id", 0)
116
117 seq_id = int(seq_id) + 10
118 obj_data["seq_id"] = seq_id
119
120 return seq_id
121
122
123 def set_seq_id(obj_type, router, id, obj_name):
124 """
125 Saves sequence number if not auto-generated and given by user
126 Parameters
127 ----------
128 * `obj_type`: prefix_lists or route_maps
129 * `router`: router name
130 *` obj_name`: name of the prefix-list or route-map
131 """
132 router_data = SEQ_ID[obj_type].setdefault(router, {})
133 obj_data = router_data.setdefault(obj_name, {})
134 seq_id = obj_data.setdefault("seq_id", 0)
135
136 seq_id = int(seq_id) + int(id)
137 obj_data["seq_id"] = seq_id
138
139
140 class InvalidCLIError(Exception):
141 """Raise when the CLI command is wrong"""
142
143 pass
144
145
146 def run_frr_cmd(rnode, cmd, isjson=False):
147 """
148 Execute frr show commands in priviledged mode
149 * `rnode`: router node on which commands needs to executed
150 * `cmd`: Command to be executed on frr
151 * `isjson`: If command is to get json data or not
152 :return str:
153 """
154
155 if cmd:
156 ret_data = rnode.vtysh_cmd(cmd, isjson=isjson)
157
158 if True:
159 if isjson:
160 logger.debug(ret_data)
161 print_data = rnode.vtysh_cmd(cmd.rstrip("json"), isjson=False)
162 else:
163 print_data = ret_data
164
165 logger.info(
166 "Output for command [ %s] on router %s:\n%s",
167 cmd.rstrip("json"),
168 rnode.name,
169 print_data,
170 )
171 return ret_data
172
173 else:
174 raise InvalidCLIError("No actual cmd passed")
175
176
177 def apply_raw_config(tgen, input_dict):
178
179 """
180 API to configure raw configuration on device. This can be used for any cli
181 which does has not been implemented in JSON.
182
183 Parameters
184 ----------
185 * `tgen`: tgen onject
186 * `input_dict`: configuration that needs to be applied
187
188 Usage
189 -----
190 input_dict = {
191 "r2": {
192 "raw_config": [
193 "router bgp",
194 "no bgp update-group-split-horizon"
195 ]
196 }
197 }
198 Returns
199 -------
200 True or errormsg
201 """
202
203 result = True
204 for router_name in input_dict.keys():
205 config_cmd = input_dict[router_name]["raw_config"]
206
207 if not isinstance(config_cmd, list):
208 config_cmd = [config_cmd]
209
210 frr_cfg_file = "{}/{}/{}".format(TMPDIR, router_name, FRRCFG_FILE)
211 with open(frr_cfg_file, "w") as cfg:
212 for cmd in config_cmd:
213 cfg.write("{}\n".format(cmd))
214
215 result = load_config_to_router(tgen, router_name)
216
217 return result
218
219
220 def create_common_configuration(
221 tgen, router, data, config_type=None, build=False, load_config=True
222 ):
223 """
224 API to create object of class FRRConfig and also create frr_json.conf
225 file. It will create interface and common configurations and save it to
226 frr_json.conf and load to router
227 Parameters
228 ----------
229 * `tgen`: tgen onject
230 * `data`: Congiguration data saved in a list.
231 * `router` : router id to be configured.
232 * `config_type` : Syntactic information while writing configuration. Should
233 be one of the value as mentioned in the config_map below.
234 * `build` : Only for initial setup phase this is set as True
235 Returns
236 -------
237 True or False
238 """
239 TMPDIR = os.path.join(LOGDIR, tgen.modname)
240
241 fname = "{}/{}/{}".format(TMPDIR, router, FRRCFG_FILE)
242
243 config_map = OrderedDict(
244 {
245 "general_config": "! FRR General Config\n",
246 "interface_config": "! Interfaces Config\n",
247 "static_route": "! Static Route Config\n",
248 "prefix_list": "! Prefix List Config\n",
249 "bgp_community_list": "! Community List Config\n",
250 "route_maps": "! Route Maps Config\n",
251 "bgp": "! BGP Config\n",
252 "vrf": "! VRF Config\n",
253 }
254 )
255
256 if build:
257 mode = "a"
258 elif not load_config:
259 mode = "a"
260 else:
261 mode = "w"
262
263 try:
264 frr_cfg_fd = open(fname, mode)
265 if config_type:
266 frr_cfg_fd.write(config_map[config_type])
267 for line in data:
268 frr_cfg_fd.write("{} \n".format(str(line)))
269 frr_cfg_fd.write("\n")
270
271 except IOError as err:
272 logger.error(
273 "Unable to open FRR Config File. error(%s): %s" % (err.errno, err.strerror)
274 )
275 return False
276 finally:
277 frr_cfg_fd.close()
278
279 # If configuration applied from build, it will done at last
280 if not build and load_config:
281 load_config_to_router(tgen, router)
282
283 return True
284
285
286 def kill_router_daemons(tgen, router, daemons):
287 """
288 Router's current config would be saved to /etc/frr/ for each deamon
289 and deamon would be killed forcefully using SIGKILL.
290 * `tgen` : topogen object
291 * `router`: Device under test
292 * `daemons`: list of daemons to be killed
293 """
294
295 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
296
297 try:
298 router_list = tgen.routers()
299
300 # Saving router config to /etc/frr, which will be loaded to router
301 # when it starts
302 router_list[router].vtysh_cmd("write memory")
303
304 # Kill Daemons
305 result = router_list[router].killDaemons(daemons)
306 if len(result) > 0:
307 assert "Errors found post shutdown - details follow:" == 0, result
308 return result
309
310 except Exception as e:
311 errormsg = traceback.format_exc()
312 logger.error(errormsg)
313 return errormsg
314
315
316 def start_router_daemons(tgen, router, daemons):
317 """
318 Daemons defined by user would be started
319 * `tgen` : topogen object
320 * `router`: Device under test
321 * `daemons`: list of daemons to be killed
322 """
323
324 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
325
326 try:
327 router_list = tgen.routers()
328
329 # Start daemons
330 result = router_list[router].startDaemons(daemons)
331 return result
332
333 except Exception as e:
334 errormsg = traceback.format_exc()
335 logger.error(errormsg)
336 return errormsg
337
338 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
339 return True
340
341
342 def kill_mininet_routers_process(tgen):
343 """
344 Kill all mininet stale router' processes
345 * `tgen` : topogen object
346 """
347
348 router_list = tgen.routers()
349 for rname, router in router_list.iteritems():
350 daemon_list = [
351 "zebra",
352 "ospfd",
353 "ospf6d",
354 "bgpd",
355 "ripd",
356 "ripngd",
357 "isisd",
358 "pimd",
359 "ldpd",
360 "staticd",
361 ]
362 for daemon in daemon_list:
363 router.run("killall -9 {}".format(daemon))
364
365
366 def check_router_status(tgen):
367 """
368 Check if all daemons are running for all routers in topology
369 * `tgen` : topogen object
370 """
371
372 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
373
374 try:
375 router_list = tgen.routers()
376 for router, rnode in router_list.iteritems():
377
378 result = rnode.check_router_running()
379 if result != "":
380 daemons = []
381 if "bgpd" in result:
382 daemons.append("bgpd")
383 if "zebra" in result:
384 daemons.append("zebra")
385
386 rnode.startDaemons(daemons)
387
388 except Exception as e:
389 errormsg = traceback.format_exc()
390 logger.error(errormsg)
391 return errormsg
392
393 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
394 return True
395
396
397 def reset_config_on_routers(tgen, routerName=None):
398 """
399 Resets configuration on routers to the snapshot created using input JSON
400 file. It replaces existing router configuration with FRRCFG_BKUP_FILE
401
402 Parameters
403 ----------
404 * `tgen` : Topogen object
405 * `routerName` : router config is to be reset
406 """
407
408 logger.debug("Entering API: reset_config_on_routers")
409
410 router_list = tgen.routers()
411 for rname in ROUTER_LIST:
412 if routerName and routerName != rname:
413 continue
414
415 router = router_list[rname]
416 logger.info("Configuring router %s to initial test configuration", rname)
417
418 cfg = router.run("vtysh -c 'show running'")
419 fname = "{}/{}/frr.sav".format(TMPDIR, rname)
420 dname = "{}/{}/delta.conf".format(TMPDIR, rname)
421 f = open(fname, "w")
422 for line in cfg.split("\n"):
423 line = line.strip()
424
425 if (
426 line == "Building configuration..."
427 or line == "Current configuration:"
428 or not line
429 ):
430 continue
431 f.write(line)
432 f.write("\n")
433
434 f.close()
435 run_cfg_file = "{}/{}/frr.sav".format(TMPDIR, rname)
436 init_cfg_file = "{}/{}/frr_json_initial.conf".format(TMPDIR, rname)
437 command = "/usr/lib/frr/frr-reload.py --input {} --test {} > {}".format(
438 run_cfg_file, init_cfg_file, dname
439 )
440 result = call(command, shell=True, stderr=SUB_STDOUT, stdout=SUB_PIPE)
441
442 # Assert if command fail
443 if result > 0:
444 logger.error("Delta file creation failed. Command executed %s", command)
445 with open(run_cfg_file, "r") as fd:
446 logger.info(
447 "Running configuration saved in %s is:\n%s", run_cfg_file, fd.read()
448 )
449 with open(init_cfg_file, "r") as fd:
450 logger.info(
451 "Test configuration saved in %s is:\n%s", init_cfg_file, fd.read()
452 )
453
454 err_cmd = ["/usr/bin/vtysh", "-m", "-f", run_cfg_file]
455 result = Popen(err_cmd, stdout=SUB_PIPE, stderr=SUB_PIPE)
456 output = result.communicate()
457 for out_data in output:
458 temp_data = out_data.decode("utf-8").lower()
459 for out_err in ERROR_LIST:
460 if out_err.lower() in temp_data:
461 logger.error(
462 "Found errors while validating data in" " %s", run_cfg_file
463 )
464 raise InvalidCLIError(out_data)
465 raise InvalidCLIError("Unknown error in %s", output)
466
467 f = open(dname, "r")
468 delta = StringIO.StringIO()
469 delta.write("configure terminal\n")
470 t_delta = f.read()
471
472 # Don't disable debugs
473 check_debug = True
474
475 for line in t_delta.split("\n"):
476 line = line.strip()
477 if line == "Lines To Delete" or line == "===============" or not line:
478 continue
479
480 if line == "Lines To Add":
481 check_debug = False
482 continue
483
484 if line == "============" or not line:
485 continue
486
487 # Leave debugs and log output alone
488 if check_debug:
489 if "debug" in line or "log file" in line:
490 continue
491
492 delta.write(line)
493 delta.write("\n")
494
495 f.close()
496
497 delta.write("end\n")
498
499 output = router.vtysh_multicmd(delta.getvalue(), pretty_output=False)
500
501 delta.close()
502 delta = StringIO.StringIO()
503 cfg = router.run("vtysh -c 'show running'")
504 for line in cfg.split("\n"):
505 line = line.strip()
506 delta.write(line)
507 delta.write("\n")
508
509 # Router current configuration to log file or console if
510 # "show_router_config" is defined in "pytest.ini"
511 if show_router_config:
512 logger.info("Configuration on router {} after reset:".format(rname))
513 logger.info(delta.getvalue())
514 delta.close()
515
516 logger.debug("Exiting API: reset_config_on_routers")
517 return True
518
519
520 def load_config_to_router(tgen, routerName, save_bkup=False):
521 """
522 Loads configuration on router from the file FRRCFG_FILE.
523
524 Parameters
525 ----------
526 * `tgen` : Topogen object
527 * `routerName` : router for which configuration to be loaded
528 * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE
529 """
530
531 logger.debug("Entering API: load_config_to_router")
532
533 router_list = tgen.routers()
534 for rname in ROUTER_LIST:
535 if routerName and rname != routerName:
536 continue
537
538 router = router_list[rname]
539 try:
540 frr_cfg_file = "{}/{}/{}".format(TMPDIR, rname, FRRCFG_FILE)
541 frr_cfg_bkup = "{}/{}/{}".format(TMPDIR, rname, FRRCFG_BKUP_FILE)
542 with open(frr_cfg_file, "r+") as cfg:
543 data = cfg.read()
544 logger.info(
545 "Applying following configuration on router"
546 " {}:\n{}".format(rname, data)
547 )
548 if save_bkup:
549 with open(frr_cfg_bkup, "w") as bkup:
550 bkup.write(data)
551
552 output = router.vtysh_multicmd(data, pretty_output=False)
553 for out_err in ERROR_LIST:
554 if out_err.lower() in output.lower():
555 raise InvalidCLIError("%s" % output)
556
557 cfg.truncate(0)
558
559 except IOError as err:
560 errormsg = (
561 "Unable to open config File. error(%s):" " %s",
562 (err.errno, err.strerror),
563 )
564 return errormsg
565
566 # Router current configuration to log file or console if
567 # "show_router_config" is defined in "pytest.ini"
568 if show_router_config:
569 logger.info("New configuration for router {}:".format(rname))
570 new_config = router.run("vtysh -c 'show running'")
571 logger.info(new_config)
572
573 logger.debug("Exiting API: load_config_to_router")
574 return True
575
576
577 def get_frr_ipv6_linklocal(tgen, router, intf=None, vrf=None):
578 """
579 API to get the link local ipv6 address of a perticular interface using
580 FRR command 'show interface'
581
582 * `tgen`: tgen onject
583 * `router` : router for which hightest interface should be
584 calculated
585 * `intf` : interface for which linklocal address needs to be taken
586 * `vrf` : VRF name
587
588 Usage
589 -----
590 linklocal = get_frr_ipv6_linklocal(tgen, router, "intf1", RED_A)
591
592 Returns
593 -------
594 1) array of interface names to link local ips.
595 """
596
597 router_list = tgen.routers()
598 for rname, rnode in router_list.iteritems():
599 if rname != router:
600 continue
601
602 linklocal = []
603
604 if vrf:
605 cmd = "show interface vrf {}".format(vrf)
606 else:
607 cmd = "show interface"
608
609 ifaces = router_list[router].run('vtysh -c "{}"'.format(cmd))
610
611 # Fix newlines (make them all the same)
612 ifaces = ("\n".join(ifaces.splitlines()) + "\n").splitlines()
613
614 interface = None
615 ll_per_if_count = 0
616 for line in ifaces:
617 # Interface name
618 m = re_search("Interface ([a-zA-Z0-9-]+) is", line)
619 if m:
620 interface = m.group(1).split(" ")[0]
621 ll_per_if_count = 0
622
623 # Interface ip
624 m1 = re_search("inet6 (fe80[:a-fA-F0-9]+[\/0-9]+)", line)
625 if m1:
626 local = m1.group(1)
627 ll_per_if_count += 1
628 if ll_per_if_count > 1:
629 linklocal += [["%s-%s" % (interface, ll_per_if_count), local]]
630 else:
631 linklocal += [[interface, local]]
632
633 if linklocal:
634 if intf:
635 return [_linklocal[1] for _linklocal in linklocal if _linklocal[0] == intf][
636 0
637 ].split("/")[0]
638 return linklocal
639 else:
640 errormsg = "Link local ip missing on router {}"
641 return errormsg
642
643
644 def generate_support_bundle():
645 """
646 API to generate support bundle on any verification ste failure.
647 it runs a python utility, /usr/lib/frr/generate_support_bundle.py,
648 which basically runs defined CLIs and dumps the data to specified location
649 """
650
651 tgen = get_topogen()
652 router_list = tgen.routers()
653 test_name = sys._getframe(2).f_code.co_name
654 TMPDIR = os.path.join(LOGDIR, tgen.modname)
655
656 for rname, rnode in router_list.iteritems():
657 logger.info("Generating support bundle for {}".format(rname))
658 rnode.run("mkdir -p /var/log/frr")
659 bundle_log = rnode.run("python2 /usr/lib/frr/generate_support_bundle.py")
660 logger.info(bundle_log)
661
662 dst_bundle = "{}/{}/support_bundles/{}".format(TMPDIR, rname, test_name)
663 src_bundle = "/var/log/frr"
664 rnode.run("rm -rf {}".format(dst_bundle))
665 rnode.run("mkdir -p {}".format(dst_bundle))
666 rnode.run("mv -f {}/* {}".format(src_bundle, dst_bundle))
667
668 return True
669
670
671 def start_topology(tgen):
672 """
673 Starting topology, create tmp files which are loaded to routers
674 to start deamons and then start routers
675 * `tgen` : topogen object
676 """
677
678 global TMPDIR, ROUTER_LIST
679 # Starting topology
680 tgen.start_topology()
681
682 # Starting deamons
683
684 router_list = tgen.routers()
685 ROUTER_LIST = sorted(
686 router_list.keys(), key=lambda x: int(re_search("\d+", x).group(0))
687 )
688 TMPDIR = os.path.join(LOGDIR, tgen.modname)
689
690 router_list = tgen.routers()
691 for rname in ROUTER_LIST:
692 router = router_list[rname]
693
694 # It will help in debugging the failures, will give more details on which
695 # specific kernel version tests are failing
696 linux_ver = router.run("uname -a")
697 logger.info("Logging platform related details: \n %s \n", linux_ver)
698
699 try:
700 os.chdir(TMPDIR)
701
702 # Creating router named dir and empty zebra.conf bgpd.conf files
703 # inside the current directory
704 if os.path.isdir("{}".format(rname)):
705 os.system("rm -rf {}".format(rname))
706 os.mkdir("{}".format(rname))
707 os.system("chmod -R go+rw {}".format(rname))
708 os.chdir("{}/{}".format(TMPDIR, rname))
709 os.system("touch zebra.conf bgpd.conf")
710 else:
711 os.mkdir("{}".format(rname))
712 os.system("chmod -R go+rw {}".format(rname))
713 os.chdir("{}/{}".format(TMPDIR, rname))
714 os.system("touch zebra.conf bgpd.conf")
715
716 except IOError as (errno, strerror):
717 logger.error("I/O error({0}): {1}".format(errno, strerror))
718
719 # Loading empty zebra.conf file to router, to start the zebra deamon
720 router.load_config(
721 TopoRouter.RD_ZEBRA, "{}/{}/zebra.conf".format(TMPDIR, rname)
722 )
723 # Loading empty bgpd.conf file to router, to start the bgp deamon
724 router.load_config(TopoRouter.RD_BGP, "{}/{}/bgpd.conf".format(TMPDIR, rname))
725
726 # Starting routers
727 logger.info("Starting all routers once topology is created")
728 tgen.start_router()
729
730
731 def stop_router(tgen, router):
732 """
733 Router"s current config would be saved to /etc/frr/ for each deamon
734 and router and its deamons would be stopped.
735
736 * `tgen` : topogen object
737 * `router`: Device under test
738 """
739
740 router_list = tgen.routers()
741
742 # Saving router config to /etc/frr, which will be loaded to router
743 # when it starts
744 router_list[router].vtysh_cmd("write memory")
745
746 # Stop router
747 router_list[router].stop()
748
749
750 def start_router(tgen, router):
751 """
752 Router will started and config would be loaded from /etc/frr/ for each
753 deamon
754
755 * `tgen` : topogen object
756 * `router`: Device under test
757 """
758
759 logger.debug("Entering lib API: start_router")
760
761 try:
762 router_list = tgen.routers()
763
764 # Router and its deamons would be started and config would
765 # be loaded to router for each deamon from /etc/frr
766 router_list[router].start()
767
768 # Waiting for router to come up
769 sleep(5)
770
771 except Exception as e:
772 errormsg = traceback.format_exc()
773 logger.error(errormsg)
774 return errormsg
775
776 logger.debug("Exiting lib API: start_router()")
777 return True
778
779
780 def number_to_row(routerName):
781 """
782 Returns the number for the router.
783 Calculation based on name a0 = row 0, a1 = row 1, b2 = row 2, z23 = row 23
784 etc
785 """
786 return int(routerName[1:])
787
788
789 def number_to_column(routerName):
790 """
791 Returns the number for the router.
792 Calculation based on name a0 = columnn 0, a1 = column 0, b2= column 1,
793 z23 = column 26 etc
794 """
795 return ord(routerName[0]) - 97
796
797
798 #############################################
799 # Common APIs, will be used by all protocols
800 #############################################
801
802
803 def create_vrf_cfg(tgen, topo, input_dict=None, build=False):
804 """
805 Create vrf configuration for created topology. VRF
806 configuration is provided in input json file.
807
808 VRF config is done in Linux Kernel:
809 * Create VRF
810 * Attach interface to VRF
811 * Bring up VRF
812
813 Parameters
814 ----------
815 * `tgen` : Topogen object
816 * `topo` : json file data
817 * `input_dict` : Input dict data, required when configuring
818 from testcase
819 * `build` : Only for initial setup phase this is set as True.
820
821 Usage
822 -----
823 input_dict={
824 "r3": {
825 "links": {
826 "r2-link1": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_A"},
827 "r2-link2": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_B"},
828 "r2-link3": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_A"},
829 "r2-link4": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_B"},
830 },
831 "vrfs":[
832 {
833 "name": "RED_A",
834 "id": "1"
835 },
836 {
837 "name": "RED_B",
838 "id": "2"
839 },
840 {
841 "name": "BLUE_A",
842 "id": "3",
843 "delete": True
844 },
845 {
846 "name": "BLUE_B",
847 "id": "4"
848 }
849 ]
850 }
851 }
852 result = create_vrf_cfg(tgen, topo, input_dict)
853
854 Returns
855 -------
856 True or False
857 """
858 result = True
859 if not input_dict:
860 input_dict = deepcopy(topo)
861 else:
862 input_dict = deepcopy(input_dict)
863
864 try:
865 for c_router, c_data in input_dict.iteritems():
866 rnode = tgen.routers()[c_router]
867 if "vrfs" in c_data:
868 for vrf in c_data["vrfs"]:
869 config_data = []
870 del_action = vrf.setdefault("delete", False)
871 name = vrf.setdefault("name", None)
872 table_id = vrf.setdefault("id", None)
873 vni = vrf.setdefault("vni", None)
874 del_vni = vrf.setdefault("no_vni", None)
875
876 if del_action:
877 # Kernel cmd- Add VRF and table
878 cmd = "ip link del {} type vrf table {}".format(
879 vrf["name"], vrf["id"]
880 )
881
882 logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd)
883 rnode.run(cmd)
884
885 # Kernel cmd - Bring down VRF
886 cmd = "ip link set dev {} down".format(name)
887 logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd)
888 rnode.run(cmd)
889
890 else:
891 if name and table_id:
892 # Kernel cmd- Add VRF and table
893 cmd = "ip link add {} type vrf table {}".format(
894 name, table_id
895 )
896 logger.info(
897 "[DUT: %s]: Running kernel cmd " "[%s]", c_router, cmd
898 )
899 rnode.run(cmd)
900
901 # Kernel cmd - Bring up VRF
902 cmd = "ip link set dev {} up".format(name)
903 logger.info(
904 "[DUT: %s]: Running kernel " "cmd [%s]", c_router, cmd
905 )
906 rnode.run(cmd)
907
908 if "links" in c_data:
909 for destRouterLink, data in sorted(
910 c_data["links"].iteritems()
911 ):
912 # Loopback interfaces
913 if "type" in data and data["type"] == "loopback":
914 interface_name = destRouterLink
915 else:
916 interface_name = data["interface"]
917
918 if "vrf" in data:
919 vrf_list = data["vrf"]
920
921 if type(vrf_list) is not list:
922 vrf_list = [vrf_list]
923
924 for _vrf in vrf_list:
925 cmd = "ip link set {} master {}".format(
926 interface_name, _vrf
927 )
928
929 logger.info(
930 "[DUT: %s]: Running" " kernel cmd [%s]",
931 c_router,
932 cmd,
933 )
934 rnode.run(cmd)
935
936 if vni:
937 config_data.append("vrf {}".format(vrf["name"]))
938 cmd = "vni {}".format(vni)
939 config_data.append(cmd)
940
941 if del_vni:
942 config_data.append("vrf {}".format(vrf["name"]))
943 cmd = "no vni {}".format(del_vni)
944 config_data.append(cmd)
945
946 result = create_common_configuration(
947 tgen, c_router, config_data, "vrf", build=build
948 )
949
950 except InvalidCLIError:
951 # Traceback
952 errormsg = traceback.format_exc()
953 logger.error(errormsg)
954 return errormsg
955
956 return result
957
958
959 def create_interface_in_kernel(
960 tgen, dut, name, ip_addr, vrf=None, netmask=None, create=True
961 ):
962 """
963 Cretae interfaces in kernel for ipv4/ipv6
964 Config is done in Linux Kernel:
965
966 Parameters
967 ----------
968 * `tgen` : Topogen object
969 * `dut` : Device for which interfaces to be added
970 * `name` : interface name
971 * `ip_addr` : ip address for interface
972 * `vrf` : VRF name, to which interface will be associated
973 * `netmask` : netmask value, default is None
974 * `create`: Create interface in kernel, if created then no need
975 to create
976 """
977
978 rnode = tgen.routers()[dut]
979
980 if create:
981 cmd = "sudo ip link add name {} type dummy".format(name)
982 rnode.run(cmd)
983
984 addr_type = validate_ip_address(ip_addr)
985 if addr_type == "ipv4":
986 cmd = "ifconfig {} {} netmask {}".format(name, ip_addr, netmask)
987 else:
988 cmd = "ifconfig {} inet6 add {}/{}".format(name, ip_addr, netmask)
989
990 rnode.run(cmd)
991
992 if vrf:
993 cmd = "ip link set {} master {}".format(name, vrf)
994 rnode.run(cmd)
995
996
997 def shutdown_bringup_interface_in_kernel(tgen, dut, intf_name, ifaceaction=False):
998 """
999 Cretae interfaces in kernel for ipv4/ipv6
1000 Config is done in Linux Kernel:
1001
1002 Parameters
1003 ----------
1004 * `tgen` : Topogen object
1005 * `dut` : Device for which interfaces to be added
1006 * `intf_name` : interface name
1007 * `ifaceaction` : False to shutdown and True to bringup the
1008 ineterface
1009 """
1010
1011 rnode = tgen.routers()[dut]
1012
1013 cmd = "ip link set dev"
1014 if ifaceaction:
1015 action = "up"
1016 cmd = "{} {} {}".format(cmd, intf_name, action)
1017 else:
1018 action = "down"
1019 cmd = "{} {} {}".format(cmd, intf_name, action)
1020
1021 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1022 rnode.run(cmd)
1023
1024
1025 def validate_ip_address(ip_address):
1026 """
1027 Validates the type of ip address
1028 Parameters
1029 ----------
1030 * `ip_address`: IPv4/IPv6 address
1031 Returns
1032 -------
1033 Type of address as string
1034 """
1035
1036 if "/" in ip_address:
1037 ip_address = ip_address.split("/")[0]
1038
1039 v4 = True
1040 v6 = True
1041 try:
1042 socket.inet_aton(ip_address)
1043 except socket.error as error:
1044 logger.debug("Not a valid IPv4 address")
1045 v4 = False
1046 else:
1047 return "ipv4"
1048
1049 try:
1050 socket.inet_pton(socket.AF_INET6, ip_address)
1051 except socket.error as error:
1052 logger.debug("Not a valid IPv6 address")
1053 v6 = False
1054 else:
1055 return "ipv6"
1056
1057 if not v4 and not v6:
1058 raise Exception(
1059 "InvalidIpAddr", "%s is neither valid IPv4 or IPv6" " address" % ip_address
1060 )
1061
1062
1063 def check_address_types(addr_type=None):
1064 """
1065 Checks environment variable set and compares with the current address type
1066 """
1067
1068 addr_types_env = os.environ.get("ADDRESS_TYPES")
1069 if not addr_types_env:
1070 addr_types_env = "dual"
1071
1072 if addr_types_env == "dual":
1073 addr_types = ["ipv4", "ipv6"]
1074 elif addr_types_env == "ipv4":
1075 addr_types = ["ipv4"]
1076 elif addr_types_env == "ipv6":
1077 addr_types = ["ipv6"]
1078
1079 if addr_type is None:
1080 return addr_types
1081
1082 if addr_type not in addr_types:
1083 logger.debug(
1084 "{} not in supported/configured address types {}".format(
1085 addr_type, addr_types
1086 )
1087 )
1088 return False
1089
1090 return True
1091
1092
1093 def generate_ips(network, no_of_ips):
1094 """
1095 Returns list of IPs.
1096 based on start_ip and no_of_ips
1097 * `network` : from here the ip will start generating,
1098 start_ip will be
1099 * `no_of_ips` : these many IPs will be generated
1100 """
1101
1102 ipaddress_list = []
1103 if type(network) is not list:
1104 network = [network]
1105
1106 for start_ipaddr in network:
1107 if "/" in start_ipaddr:
1108 start_ip = start_ipaddr.split("/")[0]
1109 mask = int(start_ipaddr.split("/")[1])
1110
1111 addr_type = validate_ip_address(start_ip)
1112 if addr_type == "ipv4":
1113 start_ip = ipaddress.IPv4Address(unicode(start_ip))
1114 step = 2 ** (32 - mask)
1115 if addr_type == "ipv6":
1116 start_ip = ipaddress.IPv6Address(unicode(start_ip))
1117 step = 2 ** (128 - mask)
1118
1119 next_ip = start_ip
1120 count = 0
1121 while count < no_of_ips:
1122 ipaddress_list.append("{}/{}".format(next_ip, mask))
1123 if addr_type == "ipv6":
1124 next_ip = ipaddress.IPv6Address(int(next_ip) + step)
1125 else:
1126 next_ip += step
1127 count += 1
1128
1129 return ipaddress_list
1130
1131
1132 def find_interface_with_greater_ip(topo, router, loopback=True, interface=True):
1133 """
1134 Returns highest interface ip for ipv4/ipv6. If loopback is there then
1135 it will return highest IP from loopback IPs otherwise from physical
1136 interface IPs.
1137 * `topo` : json file data
1138 * `router` : router for which hightest interface should be calculated
1139 """
1140
1141 link_data = topo["routers"][router]["links"]
1142 lo_list = []
1143 interfaces_list = []
1144 lo_exists = False
1145 for destRouterLink, data in sorted(link_data.iteritems()):
1146 if loopback:
1147 if "type" in data and data["type"] == "loopback":
1148 lo_exists = True
1149 ip_address = topo["routers"][router]["links"][destRouterLink][
1150 "ipv4"
1151 ].split("/")[0]
1152 lo_list.append(ip_address)
1153 if interface:
1154 ip_address = topo["routers"][router]["links"][destRouterLink]["ipv4"].split(
1155 "/"
1156 )[0]
1157 interfaces_list.append(ip_address)
1158
1159 if lo_exists:
1160 return sorted(lo_list)[-1]
1161
1162 return sorted(interfaces_list)[-1]
1163
1164
1165 def write_test_header(tc_name):
1166 """ Display message at beginning of test case"""
1167 count = 20
1168 logger.info("*" * (len(tc_name) + count))
1169 step("START -> Testcase : %s" % tc_name, reset=True)
1170 logger.info("*" * (len(tc_name) + count))
1171
1172
1173 def write_test_footer(tc_name):
1174 """ Display message at end of test case"""
1175 count = 21
1176 logger.info("=" * (len(tc_name) + count))
1177 logger.info("Testcase : %s -> PASSED", tc_name)
1178 logger.info("=" * (len(tc_name) + count))
1179
1180
1181 def interface_status(tgen, topo, input_dict):
1182 """
1183 Delete ip route maps from device
1184 * `tgen` : Topogen object
1185 * `topo` : json file data
1186 * `input_dict` : for which router, route map has to be deleted
1187 Usage
1188 -----
1189 input_dict = {
1190 "r3": {
1191 "interface_list": ['eth1-r1-r2', 'eth2-r1-r3'],
1192 "status": "down"
1193 }
1194 }
1195 Returns
1196 -------
1197 errormsg(str) or True
1198 """
1199 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1200
1201 try:
1202 global frr_cfg
1203 for router in input_dict.keys():
1204
1205 interface_list = input_dict[router]["interface_list"]
1206 status = input_dict[router].setdefault("status", "up")
1207 for intf in interface_list:
1208 rnode = tgen.routers()[router]
1209 interface_set_status(rnode, intf, status)
1210
1211 # Load config to router
1212 load_config_to_router(tgen, router)
1213
1214 except Exception as e:
1215 # handle any exception
1216 logger.error("Error %s occured. Arguments %s.", e.message, e.args)
1217
1218 # Traceback
1219 errormsg = traceback.format_exc()
1220 logger.error(errormsg)
1221 return errormsg
1222
1223 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1224 return True
1225
1226
1227 def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0, return_is_dict=False):
1228 """
1229 Retries function execution, if return is an errormsg or exception
1230
1231 * `attempts`: Number of attempts to make
1232 * `wait`: Number of seconds to wait between each attempt
1233 * `return_is_str`: Return val is an errormsg in case of failure
1234 * `initial_wait`: Sleeps for this much seconds before executing function
1235
1236 """
1237
1238 def _retry(func):
1239 @wraps(func)
1240 def func_retry(*args, **kwargs):
1241 _wait = kwargs.pop("wait", wait)
1242 _attempts = kwargs.pop("attempts", attempts)
1243 _attempts = int(_attempts)
1244 if _attempts < 0:
1245 raise ValueError("attempts must be 0 or greater")
1246
1247 if initial_wait > 0:
1248 logger.info("Waiting for [%s]s as initial delay", initial_wait)
1249 sleep(initial_wait)
1250
1251 _return_is_str = kwargs.pop("return_is_str", return_is_str)
1252 _return_is_dict = kwargs.pop("return_is_str", return_is_dict)
1253 for i in range(1, _attempts + 1):
1254 try:
1255 _expected = kwargs.setdefault("expected", True)
1256 kwargs.pop("expected")
1257 ret = func(*args, **kwargs)
1258 logger.debug("Function returned %s" % ret)
1259 if _return_is_str and isinstance(ret, bool) and _expected:
1260 return ret
1261 if (
1262 isinstance(ret, str) or isinstance(ret, unicode)
1263 ) and _expected is False:
1264 return ret
1265 if _return_is_dict and isinstance(ret, dict):
1266 return ret
1267
1268 if _attempts == i:
1269 generate_support_bundle()
1270 return ret
1271 except Exception as err:
1272 if _attempts == i:
1273 generate_support_bundle()
1274 logger.info("Max number of attempts (%r) reached", _attempts)
1275 raise
1276 else:
1277 logger.info("Function returned %s", err)
1278 if i < _attempts:
1279 logger.info("Retry [#%r] after sleeping for %ss" % (i, _wait))
1280 sleep(_wait)
1281
1282 func_retry._original = func
1283 return func_retry
1284
1285 return _retry
1286
1287
1288 class Stepper:
1289 """
1290 Prints step number for the test case step being executed
1291 """
1292
1293 count = 1
1294
1295 def __call__(self, msg, reset):
1296 if reset:
1297 Stepper.count = 1
1298 logger.info(msg)
1299 else:
1300 logger.info("STEP %s: '%s'", Stepper.count, msg)
1301 Stepper.count += 1
1302
1303
1304 def step(msg, reset=False):
1305 """
1306 Call Stepper to print test steps. Need to reset at the beginning of test.
1307 * ` msg` : Step message body.
1308 * `reset` : Reset step count to 1 when set to True.
1309 """
1310 _step = Stepper()
1311 _step(msg, reset)
1312
1313
1314 #############################################
1315 # These APIs, will used by testcase
1316 #############################################
1317 def create_interfaces_cfg(tgen, topo, build=False):
1318 """
1319 Create interface configuration for created topology. Basic Interface
1320 configuration is provided in input json file.
1321
1322 Parameters
1323 ----------
1324 * `tgen` : Topogen object
1325 * `topo` : json file data
1326 * `build` : Only for initial setup phase this is set as True.
1327
1328 Returns
1329 -------
1330 True or False
1331 """
1332 result = False
1333 topo = deepcopy(topo)
1334
1335 try:
1336 for c_router, c_data in topo.iteritems():
1337 interface_data = []
1338 for destRouterLink, data in sorted(c_data["links"].iteritems()):
1339 # Loopback interfaces
1340 if "type" in data and data["type"] == "loopback":
1341 interface_name = destRouterLink
1342 else:
1343 interface_name = data["interface"]
1344
1345 # Include vrf if present
1346 if "vrf" in data:
1347 interface_data.append(
1348 "interface {} vrf {}".format(
1349 str(interface_name), str(data["vrf"])
1350 )
1351 )
1352 else:
1353 interface_data.append("interface {}".format(str(interface_name)))
1354
1355 if "ipv4" in data:
1356 intf_addr = c_data["links"][destRouterLink]["ipv4"]
1357
1358 if "delete" in data and data["delete"]:
1359 interface_data.append("no ip address {}".format(intf_addr))
1360 else:
1361 interface_data.append("ip address {}".format(intf_addr))
1362 if "ipv6" in data:
1363 intf_addr = c_data["links"][destRouterLink]["ipv6"]
1364
1365 if "delete" in data and data["delete"]:
1366 interface_data.append("no ipv6 address {}".format(intf_addr))
1367 else:
1368 interface_data.append("ipv6 address {}".format(intf_addr))
1369
1370 if "ipv6-link-local" in data:
1371 intf_addr = c_data["links"][destRouterLink]["ipv6-link-local"]
1372
1373 if "delete" in data and data["delete"]:
1374 interface_data.append("no ipv6 address {}".format(intf_addr))
1375 else:
1376 interface_data.append("ipv6 address {}\n".format(intf_addr))
1377
1378 result = create_common_configuration(
1379 tgen, c_router, interface_data, "interface_config", build=build
1380 )
1381 except InvalidCLIError:
1382 # Traceback
1383 errormsg = traceback.format_exc()
1384 logger.error(errormsg)
1385 return errormsg
1386
1387 return result
1388
1389
1390 def create_static_routes(tgen, input_dict, build=False):
1391 """
1392 Create static routes for given router as defined in input_dict
1393
1394 Parameters
1395 ----------
1396 * `tgen` : Topogen object
1397 * `input_dict` : Input dict data, required when configuring from testcase
1398 * `build` : Only for initial setup phase this is set as True.
1399
1400 Usage
1401 -----
1402 input_dict should be in the format below:
1403 # static_routes: list of all routes
1404 # network: network address
1405 # no_of_ip: number of next-hop address that will be configured
1406 # admin_distance: admin distance for route/routes.
1407 # next_hop: starting next-hop address
1408 # tag: tag id for static routes
1409 # vrf: VRF name in which static routes needs to be created
1410 # delete: True if config to be removed. Default False.
1411
1412 Example:
1413 "routers": {
1414 "r1": {
1415 "static_routes": [
1416 {
1417 "network": "100.0.20.1/32",
1418 "no_of_ip": 9,
1419 "admin_distance": 100,
1420 "next_hop": "10.0.0.1",
1421 "tag": 4001,
1422 "vrf": "RED_A"
1423 "delete": true
1424 }
1425 ]
1426 }
1427 }
1428
1429 Returns
1430 -------
1431 errormsg(str) or True
1432 """
1433 result = False
1434 logger.debug("Entering lib API: create_static_routes()")
1435 input_dict = deepcopy(input_dict)
1436
1437 try:
1438 for router in input_dict.keys():
1439 if "static_routes" not in input_dict[router]:
1440 errormsg = "static_routes not present in input_dict"
1441 logger.info(errormsg)
1442 continue
1443
1444 static_routes_list = []
1445
1446 static_routes = input_dict[router]["static_routes"]
1447 for static_route in static_routes:
1448 del_action = static_route.setdefault("delete", False)
1449 no_of_ip = static_route.setdefault("no_of_ip", 1)
1450 network = static_route.setdefault("network", [])
1451 if type(network) is not list:
1452 network = [network]
1453
1454 admin_distance = static_route.setdefault("admin_distance", None)
1455 tag = static_route.setdefault("tag", None)
1456 vrf = static_route.setdefault("vrf", None)
1457 interface = static_route.setdefault("interface", None)
1458 next_hop = static_route.setdefault("next_hop", None)
1459 nexthop_vrf = static_route.setdefault("nexthop_vrf", None)
1460
1461 ip_list = generate_ips(network, no_of_ip)
1462 for ip in ip_list:
1463 addr_type = validate_ip_address(ip)
1464
1465 if addr_type == "ipv4":
1466 cmd = "ip route {}".format(ip)
1467 else:
1468 cmd = "ipv6 route {}".format(ip)
1469
1470 if interface:
1471 cmd = "{} {}".format(cmd, interface)
1472
1473 if next_hop:
1474 cmd = "{} {}".format(cmd, next_hop)
1475
1476 if nexthop_vrf:
1477 cmd = "{} nexthop-vrf {}".format(cmd, nexthop_vrf)
1478
1479 if vrf:
1480 cmd = "{} vrf {}".format(cmd, vrf)
1481
1482 if tag:
1483 cmd = "{} tag {}".format(cmd, str(tag))
1484
1485 if admin_distance:
1486 cmd = "{} {}".format(cmd, admin_distance)
1487
1488 if del_action:
1489 cmd = "no {}".format(cmd)
1490
1491 static_routes_list.append(cmd)
1492
1493 result = create_common_configuration(
1494 tgen, router, static_routes_list, "static_route", build=build
1495 )
1496
1497 except InvalidCLIError:
1498 # Traceback
1499 errormsg = traceback.format_exc()
1500 logger.error(errormsg)
1501 return errormsg
1502
1503 logger.debug("Exiting lib API: create_static_routes()")
1504 return result
1505
1506
1507 def create_prefix_lists(tgen, input_dict, build=False):
1508 """
1509 Create ip prefix lists as per the config provided in input
1510 JSON or input_dict
1511 Parameters
1512 ----------
1513 * `tgen` : Topogen object
1514 * `input_dict` : Input dict data, required when configuring from testcase
1515 * `build` : Only for initial setup phase this is set as True.
1516 Usage
1517 -----
1518 # pf_lists_1: name of prefix-list, user defined
1519 # seqid: prefix-list seqid, auto-generated if not given by user
1520 # network: criteria for applying prefix-list
1521 # action: permit/deny
1522 # le: less than or equal number of bits
1523 # ge: greater than or equal number of bits
1524 Example
1525 -------
1526 input_dict = {
1527 "r1": {
1528 "prefix_lists":{
1529 "ipv4": {
1530 "pf_list_1": [
1531 {
1532 "seqid": 10,
1533 "network": "any",
1534 "action": "permit",
1535 "le": "32",
1536 "ge": "30",
1537 "delete": True
1538 }
1539 ]
1540 }
1541 }
1542 }
1543 }
1544 Returns
1545 -------
1546 errormsg or True
1547 """
1548
1549 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1550 result = False
1551 try:
1552 for router in input_dict.keys():
1553 if "prefix_lists" not in input_dict[router]:
1554 errormsg = "prefix_lists not present in input_dict"
1555 logger.debug(errormsg)
1556 continue
1557
1558 config_data = []
1559 prefix_lists = input_dict[router]["prefix_lists"]
1560 for addr_type, prefix_data in prefix_lists.iteritems():
1561 if not check_address_types(addr_type):
1562 continue
1563
1564 for prefix_name, prefix_list in prefix_data.iteritems():
1565 for prefix_dict in prefix_list:
1566 if "action" not in prefix_dict or "network" not in prefix_dict:
1567 errormsg = "'action' or network' missing in" " input_dict"
1568 return errormsg
1569
1570 network_addr = prefix_dict["network"]
1571 action = prefix_dict["action"]
1572 le = prefix_dict.setdefault("le", None)
1573 ge = prefix_dict.setdefault("ge", None)
1574 seqid = prefix_dict.setdefault("seqid", None)
1575 del_action = prefix_dict.setdefault("delete", False)
1576 if seqid is None:
1577 seqid = get_seq_id("prefix_lists", router, prefix_name)
1578 else:
1579 set_seq_id("prefix_lists", router, seqid, prefix_name)
1580
1581 if addr_type == "ipv4":
1582 protocol = "ip"
1583 else:
1584 protocol = "ipv6"
1585
1586 cmd = "{} prefix-list {} seq {} {} {}".format(
1587 protocol, prefix_name, seqid, action, network_addr
1588 )
1589 if le:
1590 cmd = "{} le {}".format(cmd, le)
1591 if ge:
1592 cmd = "{} ge {}".format(cmd, ge)
1593
1594 if del_action:
1595 cmd = "no {}".format(cmd)
1596
1597 config_data.append(cmd)
1598 result = create_common_configuration(
1599 tgen, router, config_data, "prefix_list", build=build
1600 )
1601
1602 except InvalidCLIError:
1603 # Traceback
1604 errormsg = traceback.format_exc()
1605 logger.error(errormsg)
1606 return errormsg
1607
1608 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1609 return result
1610
1611
1612 def create_route_maps(tgen, input_dict, build=False):
1613 """
1614 Create route-map on the devices as per the arguments passed
1615 Parameters
1616 ----------
1617 * `tgen` : Topogen object
1618 * `input_dict` : Input dict data, required when configuring from testcase
1619 * `build` : Only for initial setup phase this is set as True.
1620 Usage
1621 -----
1622 # route_maps: key, value pair for route-map name and its attribute
1623 # rmap_match_prefix_list_1: user given name for route-map
1624 # action: PERMIT/DENY
1625 # match: key,value pair for match criteria. prefix_list, community-list,
1626 large-community-list or tag. Only one option at a time.
1627 # prefix_list: name of prefix list
1628 # large-community-list: name of large community list
1629 # community-ist: name of community list
1630 # tag: tag id for static routes
1631 # set: key, value pair for modifying route attributes
1632 # localpref: preference value for the network
1633 # med: metric value advertised for AS
1634 # aspath: set AS path value
1635 # weight: weight for the route
1636 # community: standard community value to be attached
1637 # large_community: large community value to be attached
1638 # community_additive: if set to "additive", adds community/large-community
1639 value to the existing values of the network prefix
1640 Example:
1641 --------
1642 input_dict = {
1643 "r1": {
1644 "route_maps": {
1645 "rmap_match_prefix_list_1": [
1646 {
1647 "action": "PERMIT",
1648 "match": {
1649 "ipv4": {
1650 "prefix_list": "pf_list_1"
1651 }
1652 "ipv6": {
1653 "prefix_list": "pf_list_1"
1654 }
1655 "large-community-list": {
1656 "id": "community_1",
1657 "exact_match": True
1658 }
1659 "community_list": {
1660 "id": "community_2",
1661 "exact_match": True
1662 }
1663 "tag": "tag_id"
1664 },
1665 "set": {
1666 "locPrf": 150,
1667 "metric": 30,
1668 "path": {
1669 "num": 20000,
1670 "action": "prepend",
1671 },
1672 "weight": 500,
1673 "community": {
1674 "num": "1:2 2:3",
1675 "action": additive
1676 }
1677 "large_community": {
1678 "num": "1:2:3 4:5;6",
1679 "action": additive
1680 },
1681 }
1682 }
1683 ]
1684 }
1685 }
1686 }
1687 Returns
1688 -------
1689 errormsg(str) or True
1690 """
1691
1692 result = False
1693 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1694 input_dict = deepcopy(input_dict)
1695 try:
1696 for router in input_dict.keys():
1697 if "route_maps" not in input_dict[router]:
1698 logger.debug("route_maps not present in input_dict")
1699 continue
1700 rmap_data = []
1701 for rmap_name, rmap_value in input_dict[router]["route_maps"].iteritems():
1702
1703 for rmap_dict in rmap_value:
1704 del_action = rmap_dict.setdefault("delete", False)
1705
1706 if del_action:
1707 rmap_data.append("no route-map {}".format(rmap_name))
1708 continue
1709
1710 if "action" not in rmap_dict:
1711 errormsg = "action not present in input_dict"
1712 logger.error(errormsg)
1713 return False
1714
1715 rmap_action = rmap_dict.setdefault("action", "deny")
1716
1717 seq_id = rmap_dict.setdefault("seq_id", None)
1718 if seq_id is None:
1719 seq_id = get_seq_id("route_maps", router, rmap_name)
1720 else:
1721 set_seq_id("route_maps", router, seq_id, rmap_name)
1722
1723 rmap_data.append(
1724 "route-map {} {} {}".format(rmap_name, rmap_action, seq_id)
1725 )
1726
1727 if "continue" in rmap_dict:
1728 continue_to = rmap_dict["continue"]
1729 if continue_to:
1730 rmap_data.append("on-match goto {}".format(continue_to))
1731 else:
1732 logger.error(
1733 "In continue, 'route-map entry "
1734 "sequence number' is not provided"
1735 )
1736 return False
1737
1738 if "goto" in rmap_dict:
1739 go_to = rmap_dict["goto"]
1740 if go_to:
1741 rmap_data.append("on-match goto {}".format(go_to))
1742 else:
1743 logger.error(
1744 "In goto, 'Goto Clause number' is not" " provided"
1745 )
1746 return False
1747
1748 if "call" in rmap_dict:
1749 call_rmap = rmap_dict["call"]
1750 if call_rmap:
1751 rmap_data.append("call {}".format(call_rmap))
1752 else:
1753 logger.error(
1754 "In call, 'destination Route-Map' is" " not provided"
1755 )
1756 return False
1757
1758 # Verifying if SET criteria is defined
1759 if "set" in rmap_dict:
1760 set_data = rmap_dict["set"]
1761 ipv4_data = set_data.setdefault("ipv4", {})
1762 ipv6_data = set_data.setdefault("ipv6", {})
1763 local_preference = set_data.setdefault("locPrf", None)
1764 metric = set_data.setdefault("metric", None)
1765 as_path = set_data.setdefault("path", {})
1766 weight = set_data.setdefault("weight", None)
1767 community = set_data.setdefault("community", {})
1768 large_community = set_data.setdefault("large_community", {})
1769 large_comm_list = set_data.setdefault("large_comm_list", {})
1770 set_action = set_data.setdefault("set_action", None)
1771 nexthop = set_data.setdefault("nexthop", None)
1772 origin = set_data.setdefault("origin", None)
1773 ext_comm_list = set_data.setdefault("extcommunity", {})
1774
1775 # Local Preference
1776 if local_preference:
1777 rmap_data.append(
1778 "set local-preference {}".format(local_preference)
1779 )
1780
1781 # Metric
1782 if metric:
1783 rmap_data.append("set metric {} \n".format(metric))
1784
1785 # Origin
1786 if origin:
1787 rmap_data.append("set origin {} \n".format(origin))
1788
1789 # AS Path Prepend
1790 if as_path:
1791 as_num = as_path.setdefault("as_num", None)
1792 as_action = as_path.setdefault("as_action", None)
1793 if as_action and as_num:
1794 rmap_data.append(
1795 "set as-path {} {}".format(as_action, as_num)
1796 )
1797
1798 # Community
1799 if community:
1800 num = community.setdefault("num", None)
1801 comm_action = community.setdefault("action", None)
1802 if num:
1803 cmd = "set community {}".format(num)
1804 if comm_action:
1805 cmd = "{} {}".format(cmd, comm_action)
1806 rmap_data.append(cmd)
1807 else:
1808 logger.error("In community, AS Num not" " provided")
1809 return False
1810
1811 if large_community:
1812 num = large_community.setdefault("num", None)
1813 comm_action = large_community.setdefault("action", None)
1814 if num:
1815 cmd = "set large-community {}".format(num)
1816 if comm_action:
1817 cmd = "{} {}".format(cmd, comm_action)
1818
1819 rmap_data.append(cmd)
1820 else:
1821 logger.error(
1822 "In large_community, AS Num not" " provided"
1823 )
1824 return False
1825 if large_comm_list:
1826 id = large_comm_list.setdefault("id", None)
1827 del_comm = large_comm_list.setdefault("delete", None)
1828 if id:
1829 cmd = "set large-comm-list {}".format(id)
1830 if del_comm:
1831 cmd = "{} delete".format(cmd)
1832
1833 rmap_data.append(cmd)
1834 else:
1835 logger.error("In large_comm_list 'id' not" " provided")
1836 return False
1837
1838 if ext_comm_list:
1839 rt = ext_comm_list.setdefault("rt", None)
1840 del_comm = ext_comm_list.setdefault("delete", None)
1841 if rt:
1842 cmd = "set extcommunity rt {}".format(rt)
1843 if del_comm:
1844 cmd = "{} delete".format(cmd)
1845
1846 rmap_data.append(cmd)
1847 else:
1848 logger.debug("In ext_comm_list 'rt' not" " provided")
1849 return False
1850
1851 # Weight
1852 if weight:
1853 rmap_data.append("set weight {}".format(weight))
1854 if ipv6_data:
1855 nexthop = ipv6_data.setdefault("nexthop", None)
1856 if nexthop:
1857 rmap_data.append("set ipv6 next-hop {}".format(nexthop))
1858
1859 # Adding MATCH and SET sequence to RMAP if defined
1860 if "match" in rmap_dict:
1861 match_data = rmap_dict["match"]
1862 ipv4_data = match_data.setdefault("ipv4", {})
1863 ipv6_data = match_data.setdefault("ipv6", {})
1864 community = match_data.setdefault("community_list", {})
1865 large_community = match_data.setdefault("large_community", {})
1866 large_community_list = match_data.setdefault(
1867 "large_community_list", {}
1868 )
1869
1870 metric = match_data.setdefault("metric", None)
1871 source_vrf = match_data.setdefault("source-vrf", None)
1872
1873 if ipv4_data:
1874 # fetch prefix list data from rmap
1875 prefix_name = ipv4_data.setdefault("prefix_lists", None)
1876 if prefix_name:
1877 rmap_data.append(
1878 "match ip address"
1879 " prefix-list {}".format(prefix_name)
1880 )
1881
1882 # fetch tag data from rmap
1883 tag = ipv4_data.setdefault("tag", None)
1884 if tag:
1885 rmap_data.append("match tag {}".format(tag))
1886
1887 # fetch large community data from rmap
1888 large_community_list = ipv4_data.setdefault(
1889 "large_community_list", {}
1890 )
1891 large_community = match_data.setdefault(
1892 "large_community", {}
1893 )
1894
1895 if ipv6_data:
1896 prefix_name = ipv6_data.setdefault("prefix_lists", None)
1897 if prefix_name:
1898 rmap_data.append(
1899 "match ipv6 address"
1900 " prefix-list {}".format(prefix_name)
1901 )
1902
1903 # fetch tag data from rmap
1904 tag = ipv6_data.setdefault("tag", None)
1905 if tag:
1906 rmap_data.append("match tag {}".format(tag))
1907
1908 # fetch large community data from rmap
1909 large_community_list = ipv6_data.setdefault(
1910 "large_community_list", {}
1911 )
1912 large_community = match_data.setdefault(
1913 "large_community", {}
1914 )
1915
1916 if community:
1917 if "id" not in community:
1918 logger.error(
1919 "'id' is mandatory for "
1920 "community-list in match"
1921 " criteria"
1922 )
1923 return False
1924 cmd = "match community {}".format(community["id"])
1925 exact_match = community.setdefault("exact_match", False)
1926 if exact_match:
1927 cmd = "{} exact-match".format(cmd)
1928
1929 rmap_data.append(cmd)
1930 if large_community:
1931 if "id" not in large_community:
1932 logger.error(
1933 "'id' is mandatory for "
1934 "large-community-list in match "
1935 "criteria"
1936 )
1937 return False
1938 cmd = "match large-community {}".format(
1939 large_community["id"]
1940 )
1941 exact_match = large_community.setdefault(
1942 "exact_match", False
1943 )
1944 if exact_match:
1945 cmd = "{} exact-match".format(cmd)
1946 rmap_data.append(cmd)
1947 if large_community_list:
1948 if "id" not in large_community_list:
1949 logger.error(
1950 "'id' is mandatory for "
1951 "large-community-list in match "
1952 "criteria"
1953 )
1954 return False
1955 cmd = "match large-community {}".format(
1956 large_community_list["id"]
1957 )
1958 exact_match = large_community_list.setdefault(
1959 "exact_match", False
1960 )
1961 if exact_match:
1962 cmd = "{} exact-match".format(cmd)
1963 rmap_data.append(cmd)
1964
1965 if source_vrf:
1966 cmd = "match source-vrf {}".format(source_vrf)
1967 rmap_data.append(cmd)
1968
1969 if metric:
1970 cmd = "match metric {}".format(metric)
1971 rmap_data.append(cmd)
1972
1973 result = create_common_configuration(
1974 tgen, router, rmap_data, "route_maps", build=build
1975 )
1976
1977 except InvalidCLIError:
1978 # Traceback
1979 errormsg = traceback.format_exc()
1980 logger.error(errormsg)
1981 return errormsg
1982
1983 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1984 return result
1985
1986
1987 def delete_route_maps(tgen, input_dict):
1988 """
1989 Delete ip route maps from device
1990 * `tgen` : Topogen object
1991 * `input_dict` : for which router,
1992 route map has to be deleted
1993 Usage
1994 -----
1995 # Delete route-map rmap_1 and rmap_2 from router r1
1996 input_dict = {
1997 "r1": {
1998 "route_maps": ["rmap_1", "rmap__2"]
1999 }
2000 }
2001 result = delete_route_maps("ipv4", input_dict)
2002 Returns
2003 -------
2004 errormsg(str) or True
2005 """
2006 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2007
2008 for router in input_dict.keys():
2009 route_maps = input_dict[router]["route_maps"][:]
2010 rmap_data = input_dict[router]
2011 rmap_data["route_maps"] = {}
2012 for route_map_name in route_maps:
2013 rmap_data["route_maps"].update({route_map_name: [{"delete": True}]})
2014
2015 return create_route_maps(tgen, input_dict)
2016
2017
2018 def create_bgp_community_lists(tgen, input_dict, build=False):
2019 """
2020 Create bgp community-list or large-community-list on the devices as per
2021 the arguments passed. Takes list of communities in input.
2022 Parameters
2023 ----------
2024 * `tgen` : Topogen object
2025 * `input_dict` : Input dict data, required when configuring from testcase
2026 * `build` : Only for initial setup phase this is set as True.
2027 Usage
2028 -----
2029 input_dict_1 = {
2030 "r3": {
2031 "bgp_community_lists": [
2032 {
2033 "community_type": "standard",
2034 "action": "permit",
2035 "name": "rmap_lcomm_{}".format(addr_type),
2036 "value": "1:1:1 1:2:3 2:1:1 2:2:2",
2037 "large": True
2038 }
2039 ]
2040 }
2041 }
2042 }
2043 result = create_bgp_community_lists(tgen, input_dict_1)
2044 """
2045
2046 result = False
2047 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2048 input_dict = deepcopy(input_dict)
2049 try:
2050 for router in input_dict.keys():
2051 if "bgp_community_lists" not in input_dict[router]:
2052 errormsg = "bgp_community_lists not present in input_dict"
2053 logger.debug(errormsg)
2054 continue
2055
2056 config_data = []
2057
2058 community_list = input_dict[router]["bgp_community_lists"]
2059 for community_dict in community_list:
2060 del_action = community_dict.setdefault("delete", False)
2061 community_type = community_dict.setdefault("community_type", None)
2062 action = community_dict.setdefault("action", None)
2063 value = community_dict.setdefault("value", "")
2064 large = community_dict.setdefault("large", None)
2065 name = community_dict.setdefault("name", None)
2066 if large:
2067 cmd = "bgp large-community-list"
2068 else:
2069 cmd = "bgp community-list"
2070
2071 if not large and not (community_type and action and value):
2072 errormsg = (
2073 "community_type, action and value are "
2074 "required in bgp_community_list"
2075 )
2076 logger.error(errormsg)
2077 return False
2078
2079 try:
2080 community_type = int(community_type)
2081 cmd = "{} {} {} {}".format(cmd, community_type, action, value)
2082 except ValueError:
2083
2084 cmd = "{} {} {} {} {}".format(
2085 cmd, community_type, name, action, value
2086 )
2087
2088 if del_action:
2089 cmd = "no {}".format(cmd)
2090
2091 config_data.append(cmd)
2092
2093 result = create_common_configuration(
2094 tgen, router, config_data, "bgp_community_list", build=build
2095 )
2096
2097 except InvalidCLIError:
2098 # Traceback
2099 errormsg = traceback.format_exc()
2100 logger.error(errormsg)
2101 return errormsg
2102
2103 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2104 return result
2105
2106
2107 def shutdown_bringup_interface(tgen, dut, intf_name, ifaceaction=False):
2108 """
2109 Shutdown or bringup router's interface "
2110 * `tgen` : Topogen object
2111 * `dut` : Device under test
2112 * `intf_name` : Interface name to be shut/no shut
2113 * `ifaceaction` : Action, to shut/no shut interface,
2114 by default is False
2115 Usage
2116 -----
2117 dut = "r3"
2118 intf = "r3-r1-eth0"
2119 # Shut down ineterface
2120 shutdown_bringup_interface(tgen, dut, intf, False)
2121 # Bring up ineterface
2122 shutdown_bringup_interface(tgen, dut, intf, True)
2123 Returns
2124 -------
2125 errormsg(str) or True
2126 """
2127
2128 router_list = tgen.routers()
2129 if ifaceaction:
2130 logger.info("Bringing up interface : {}".format(intf_name))
2131 else:
2132 logger.info("Shutting down interface : {}".format(intf_name))
2133
2134 interface_set_status(router_list[dut], intf_name, ifaceaction)
2135
2136
2137 def addKernelRoute(
2138 tgen, router, intf, group_addr_range, next_hop=None, src=None, del_action=None
2139 ):
2140 """
2141 Add route to kernel
2142
2143 Parameters:
2144 -----------
2145 * `tgen` : Topogen object
2146 * `router`: router for which kernal routes needs to be added
2147 * `intf`: interface name, for which kernal routes needs to be added
2148 * `bindToAddress`: bind to <host>, an interface or multicast
2149 address
2150
2151 returns:
2152 --------
2153 errormsg or True
2154 """
2155
2156 logger.debug("Entering lib API: addKernelRoute()")
2157
2158 rnode = tgen.routers()[router]
2159
2160 if type(group_addr_range) is not list:
2161 group_addr_range = [group_addr_range]
2162
2163 for grp_addr in group_addr_range:
2164
2165 addr_type = validate_ip_address(grp_addr)
2166 if addr_type == "ipv4":
2167 if next_hop is not None:
2168 cmd = "ip route add {} via {}".format(grp_addr, next_hop)
2169 else:
2170 cmd = "ip route add {} dev {}".format(grp_addr, intf)
2171 if del_action:
2172 cmd = "ip route del {}".format(grp_addr)
2173 verify_cmd = "ip route"
2174 elif addr_type == "ipv6":
2175 if intf and src:
2176 cmd = "ip -6 route add {} dev {} src {}".format(grp_addr, intf, src)
2177 else:
2178 cmd = "ip -6 route add {} via {}".format(grp_addr, next_hop)
2179 verify_cmd = "ip -6 route"
2180 if del_action:
2181 cmd = "ip -6 route del {}".format(grp_addr)
2182
2183 logger.info("[DUT: {}]: Running command: [{}]".format(router, cmd))
2184 output = rnode.run(cmd)
2185
2186 # Verifying if ip route added to kernal
2187 result = rnode.run(verify_cmd)
2188 logger.debug("{}\n{}".format(verify_cmd, result))
2189 if "/" in grp_addr:
2190 ip, mask = grp_addr.split("/")
2191 if mask == "32" or mask == "128":
2192 grp_addr = ip
2193
2194 if not re_search(r"{}".format(grp_addr), result) and mask is not "0":
2195 errormsg = (
2196 "[DUT: {}]: Kernal route is not added for group"
2197 " address {} Config output: {}".format(router, grp_addr, output)
2198 )
2199
2200 return errormsg
2201
2202 logger.debug("Exiting lib API: addKernelRoute()")
2203 return True
2204
2205
2206 def configure_vxlan(tgen, input_dict):
2207 """
2208 Add and configure vxlan
2209
2210 * `tgen`: tgen onject
2211 * `input_dict` : data for vxlan config
2212
2213 Usage:
2214 ------
2215 input_dict= {
2216 "dcg2":{
2217 "vxlan":[{
2218 "vxlan_name": "vxlan75100",
2219 "vxlan_id": "75100",
2220 "dstport": 4789,
2221 "local_addr": "120.0.0.1",
2222 "learning": "no",
2223 "delete": True
2224 }]
2225 }
2226 }
2227
2228 configure_vxlan(tgen, input_dict)
2229
2230 Returns:
2231 -------
2232 True or errormsg
2233
2234 """
2235
2236 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2237
2238 router_list = tgen.routers()
2239 for dut in input_dict.keys():
2240 rnode = tgen.routers()[dut]
2241
2242 if "vxlan" in input_dict[dut]:
2243 for vxlan_dict in input_dict[dut]["vxlan"]:
2244 cmd = "ip link "
2245
2246 del_vxlan = vxlan_dict.setdefault("delete", None)
2247 vxlan_names = vxlan_dict.setdefault("vxlan_name", [])
2248 vxlan_ids = vxlan_dict.setdefault("vxlan_id", [])
2249 dstport = vxlan_dict.setdefault("dstport", None)
2250 local_addr = vxlan_dict.setdefault("local_addr", None)
2251 learning = vxlan_dict.setdefault("learning", None)
2252
2253 config_data = []
2254 if vxlan_names and vxlan_ids:
2255 for vxlan_name, vxlan_id in zip(vxlan_names, vxlan_ids):
2256 cmd = "ip link"
2257
2258 if del_vxlan:
2259 cmd = "{} del {} type vxlan id {}".format(
2260 cmd, vxlan_name, vxlan_id
2261 )
2262 else:
2263 cmd = "{} add {} type vxlan id {}".format(
2264 cmd, vxlan_name, vxlan_id
2265 )
2266
2267 if dstport:
2268 cmd = "{} dstport {}".format(cmd, dstport)
2269
2270 if local_addr:
2271 ip_cmd = "ip addr add {} dev {}".format(
2272 local_addr, vxlan_name
2273 )
2274 if del_vxlan:
2275 ip_cmd = "ip addr del {} dev {}".format(
2276 local_addr, vxlan_name
2277 )
2278
2279 config_data.append(ip_cmd)
2280
2281 cmd = "{} local {}".format(cmd, local_addr)
2282
2283 if learning == "no":
2284 cmd = "{} {} learning".format(cmd, learning)
2285
2286 elif learning == "yes":
2287 cmd = "{} learning".format(cmd)
2288
2289 config_data.append(cmd)
2290
2291 try:
2292 for _cmd in config_data:
2293 logger.info("[DUT: %s]: Running command: %s", dut, _cmd)
2294 rnode.run(_cmd)
2295
2296 except InvalidCLIError:
2297 # Traceback
2298 errormsg = traceback.format_exc()
2299 logger.error(errormsg)
2300 return errormsg
2301
2302 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2303
2304 return True
2305
2306
2307 def configure_brctl(tgen, topo, input_dict):
2308 """
2309 Add and configure brctl
2310
2311 * `tgen`: tgen onject
2312 * `input_dict` : data for brctl config
2313
2314 Usage:
2315 ------
2316 input_dict= {
2317 dut:{
2318 "brctl": [{
2319 "brctl_name": "br100",
2320 "addvxlan": "vxlan75100",
2321 "vrf": "RED",
2322 "stp": "off"
2323 }]
2324 }
2325 }
2326
2327 configure_brctl(tgen, topo, input_dict)
2328
2329 Returns:
2330 -------
2331 True or errormsg
2332
2333 """
2334
2335 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2336
2337 router_list = tgen.routers()
2338 for dut in input_dict.keys():
2339 rnode = tgen.routers()[dut]
2340
2341 if "brctl" in input_dict[dut]:
2342 for brctl_dict in input_dict[dut]["brctl"]:
2343
2344 brctl_names = brctl_dict.setdefault("brctl_name", [])
2345 addvxlans = brctl_dict.setdefault("addvxlan", [])
2346 stp_values = brctl_dict.setdefault("stp", [])
2347 vrfs = brctl_dict.setdefault("vrf", [])
2348
2349 ip_cmd = "ip link set"
2350 for brctl_name, vxlan, vrf, stp in zip(
2351 brctl_names, addvxlans, vrfs, stp_values
2352 ):
2353
2354 ip_cmd_list = []
2355 cmd = "brctl addbr {}".format(brctl_name)
2356
2357 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
2358 rnode.run(cmd)
2359
2360 ip_cmd_list.append("{} up dev {}".format(ip_cmd, brctl_name))
2361
2362 if vxlan:
2363 cmd = "brctl addif {} {}".format(brctl_name, vxlan)
2364
2365 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
2366 rnode.run(cmd)
2367
2368 ip_cmd_list.append("{} up dev {}".format(ip_cmd, vxlan))
2369
2370 if stp:
2371 cmd = "brctl stp {} {}".format(brctl_name, stp)
2372
2373 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
2374 rnode.run(cmd)
2375
2376 if vrf:
2377 ip_cmd_list.append(
2378 "{} dev {} master {}".format(ip_cmd, brctl_name, vrf)
2379 )
2380
2381 for intf_name, data in topo["routers"][dut]["links"].items():
2382 if "vrf" not in data:
2383 continue
2384
2385 if data["vrf"] == vrf:
2386 ip_cmd_list.append(
2387 "{} up dev {}".format(ip_cmd, data["interface"])
2388 )
2389
2390 try:
2391 for _ip_cmd in ip_cmd_list:
2392 logger.info("[DUT: %s]: Running command: %s", dut, _ip_cmd)
2393 rnode.run(_ip_cmd)
2394
2395 except InvalidCLIError:
2396 # Traceback
2397 errormsg = traceback.format_exc()
2398 logger.error(errormsg)
2399 return errormsg
2400
2401 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2402 return True
2403
2404
2405 def configure_interface_mac(tgen, input_dict):
2406 """
2407 Add and configure brctl
2408
2409 * `tgen`: tgen onject
2410 * `input_dict` : data for mac config
2411
2412 input_mac= {
2413 "edge1":{
2414 "br75100": "00:80:48:BA:d1:00,
2415 "br75200": "00:80:48:BA:d1:00
2416 }
2417 }
2418
2419 configure_interface_mac(tgen, input_mac)
2420
2421 Returns:
2422 -------
2423 True or errormsg
2424
2425 """
2426
2427 router_list = tgen.routers()
2428 for dut in input_dict.keys():
2429 rnode = tgen.routers()[dut]
2430
2431 for intf, mac in input_dict[dut].items():
2432 cmd = "ifconfig {} hw ether {}".format(intf, mac)
2433 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
2434
2435 try:
2436 result = rnode.run(cmd)
2437 if len(result) != 0:
2438 return result
2439
2440 except InvalidCLIError:
2441 # Traceback
2442 errormsg = traceback.format_exc()
2443 logger.error(errormsg)
2444 return errormsg
2445
2446 return True
2447
2448
2449 #############################################
2450 # Verification APIs
2451 #############################################
2452 @retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
2453 def verify_rib(
2454 tgen,
2455 addr_type,
2456 dut,
2457 input_dict,
2458 next_hop=None,
2459 protocol=None,
2460 tag=None,
2461 metric=None,
2462 fib=None,
2463 ):
2464 """
2465 Data will be read from input_dict or input JSON file, API will generate
2466 same prefixes, which were redistributed by either create_static_routes() or
2467 advertise_networks_using_network_command() and do will verify next_hop and
2468 each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json"
2469 command o/p.
2470
2471 Parameters
2472 ----------
2473 * `tgen` : topogen object
2474 * `addr_type` : ip type, ipv4/ipv6
2475 * `dut`: Device Under Test, for which user wants to test the data
2476 * `input_dict` : input dict, has details of static routes
2477 * `next_hop`[optional]: next_hop which needs to be verified,
2478 default: static
2479 * `protocol`[optional]: protocol, default = None
2480
2481 Usage
2482 -----
2483 # RIB can be verified for static routes OR network advertised using
2484 network command. Following are input_dicts to create static routes
2485 and advertise networks using network command. Any one of the input_dict
2486 can be passed to verify_rib() to verify routes in DUT"s RIB.
2487
2488 # Creating static routes for r1
2489 input_dict = {
2490 "r1": {
2491 "static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9, \
2492 "admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}]
2493 }}
2494 # Advertising networks using network command in router r1
2495 input_dict = {
2496 "r1": {
2497 "advertise_networks": [{"start_ip": "20.0.0.0/32",
2498 "no_of_network": 10},
2499 {"start_ip": "30.0.0.0/32"}]
2500 }}
2501 # Verifying ipv4 routes in router r1 learned via BGP
2502 dut = "r2"
2503 protocol = "bgp"
2504 result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol)
2505
2506 Returns
2507 -------
2508 errormsg(str) or True
2509 """
2510
2511 logger.info("Entering lib API: verify_rib()")
2512
2513 router_list = tgen.routers()
2514 additional_nexthops_in_required_nhs = []
2515 found_hops = []
2516 for routerInput in input_dict.keys():
2517 for router, rnode in router_list.iteritems():
2518 if router != dut:
2519 continue
2520
2521 logger.info("Checking router %s RIB:", router)
2522
2523 # Verifying RIB routes
2524 if addr_type == "ipv4":
2525 command = "show ip route"
2526 else:
2527 command = "show ipv6 route"
2528
2529 found_routes = []
2530 missing_routes = []
2531
2532 if "static_routes" in input_dict[routerInput]:
2533 static_routes = input_dict[routerInput]["static_routes"]
2534
2535 for static_route in static_routes:
2536 if "vrf" in static_route and static_route["vrf"] is not None:
2537
2538 logger.info(
2539 "[DUT: {}]: Verifying routes for VRF:"
2540 " {}".format(router, static_route["vrf"])
2541 )
2542
2543 cmd = "{} vrf {}".format(command, static_route["vrf"])
2544
2545 else:
2546 cmd = "{}".format(command)
2547
2548 if protocol:
2549 cmd = "{} {}".format(cmd, protocol)
2550
2551 cmd = "{} json".format(cmd)
2552
2553 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
2554
2555 # Verifying output dictionary rib_routes_json is not empty
2556 if bool(rib_routes_json) is False:
2557 errormsg = "No route found in rib of router {}..".format(router)
2558 return errormsg
2559
2560 network = static_route["network"]
2561 if "no_of_ip" in static_route:
2562 no_of_ip = static_route["no_of_ip"]
2563 else:
2564 no_of_ip = 1
2565
2566 if "tag" in static_route:
2567 _tag = static_route["tag"]
2568 else:
2569 _tag = None
2570
2571 # Generating IPs for verification
2572 ip_list = generate_ips(network, no_of_ip)
2573 st_found = False
2574 nh_found = False
2575
2576 for st_rt in ip_list:
2577 st_rt = str(ipaddress.ip_network(unicode(st_rt)))
2578
2579 _addr_type = validate_ip_address(st_rt)
2580 if _addr_type != addr_type:
2581 continue
2582
2583 if st_rt in rib_routes_json:
2584 st_found = True
2585 found_routes.append(st_rt)
2586
2587 if fib and next_hop:
2588 if type(next_hop) is not list:
2589 next_hop = [next_hop]
2590
2591 for mnh in range(0, len(rib_routes_json[st_rt])):
2592 if (
2593 "fib"
2594 in rib_routes_json[st_rt][mnh]["nexthops"][0]
2595 ):
2596 found_hops.append(
2597 [
2598 rib_r["ip"]
2599 for rib_r in rib_routes_json[st_rt][
2600 mnh
2601 ]["nexthops"]
2602 ]
2603 )
2604
2605 if found_hops[0]:
2606 missing_list_of_nexthops = set(
2607 found_hops[0]
2608 ).difference(next_hop)
2609 additional_nexthops_in_required_nhs = set(
2610 next_hop
2611 ).difference(found_hops[0])
2612
2613 if additional_nexthops_in_required_nhs:
2614 logger.info(
2615 "Nexthop "
2616 "%s is not active for route %s in "
2617 "RIB of router %s\n",
2618 additional_nexthops_in_required_nhs,
2619 st_rt,
2620 dut,
2621 )
2622 errormsg = (
2623 "Nexthop {} is not active"
2624 " for route {} in RIB of router"
2625 " {}\n".format(
2626 additional_nexthops_in_required_nhs,
2627 st_rt,
2628 dut,
2629 )
2630 )
2631 return errormsg
2632 else:
2633 nh_found = True
2634
2635 elif next_hop and fib is None:
2636 if type(next_hop) is not list:
2637 next_hop = [next_hop]
2638 found_hops = [
2639 rib_r["ip"]
2640 for rib_r in rib_routes_json[st_rt][0]["nexthops"]
2641 ]
2642
2643 if found_hops:
2644 missing_list_of_nexthops = set(
2645 found_hops
2646 ).difference(next_hop)
2647 additional_nexthops_in_required_nhs = set(
2648 next_hop
2649 ).difference(found_hops)
2650
2651 if additional_nexthops_in_required_nhs:
2652 logger.info(
2653 "Missing nexthop %s for route"
2654 " %s in RIB of router %s\n",
2655 additional_nexthops_in_required_nhs,
2656 st_rt,
2657 dut,
2658 )
2659 errormsg = (
2660 "Nexthop {} is Missing for "
2661 "route {} in RIB of router {}\n".format(
2662 additional_nexthops_in_required_nhs,
2663 st_rt,
2664 dut,
2665 )
2666 )
2667 return errormsg
2668 else:
2669 nh_found = True
2670
2671 if tag:
2672 if "tag" not in rib_routes_json[st_rt][0]:
2673 errormsg = (
2674 "[DUT: {}]: tag is not"
2675 " present for"
2676 " route {} in RIB \n".format(dut, st_rt)
2677 )
2678 return errormsg
2679
2680 if _tag != rib_routes_json[st_rt][0]["tag"]:
2681 errormsg = (
2682 "[DUT: {}]: tag value {}"
2683 " is not matched for"
2684 " route {} in RIB \n".format(dut, _tag, st_rt,)
2685 )
2686 return errormsg
2687
2688 if metric is not None:
2689 if "metric" not in rib_routes_json[st_rt][0]:
2690 errormsg = (
2691 "[DUT: {}]: metric is"
2692 " not present for"
2693 " route {} in RIB \n".format(dut, st_rt)
2694 )
2695 return errormsg
2696
2697 if metric != rib_routes_json[st_rt][0]["metric"]:
2698 errormsg = (
2699 "[DUT: {}]: metric value "
2700 "{} is not matched for "
2701 "route {} in RIB \n".format(dut, metric, st_rt,)
2702 )
2703 return errormsg
2704
2705 else:
2706 missing_routes.append(st_rt)
2707
2708 if nh_found:
2709 logger.info(
2710 "[DUT: {}]: Found next_hop {} for all bgp"
2711 " routes in RIB".format(router, next_hop)
2712 )
2713
2714 if len(missing_routes) > 0:
2715 errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
2716 dut, missing_routes
2717 )
2718 return errormsg
2719
2720 if found_routes:
2721 logger.info(
2722 "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
2723 dut,
2724 found_routes,
2725 )
2726
2727 continue
2728
2729 if "bgp" in input_dict[routerInput]:
2730 if (
2731 "advertise_networks"
2732 not in input_dict[routerInput]["bgp"]["address_family"][addr_type][
2733 "unicast"
2734 ]
2735 ):
2736 continue
2737
2738 found_routes = []
2739 missing_routes = []
2740 advertise_network = input_dict[routerInput]["bgp"]["address_family"][
2741 addr_type
2742 ]["unicast"]["advertise_networks"]
2743
2744 # Continue if there are no network advertise
2745 if len(advertise_network) == 0:
2746 continue
2747
2748 for advertise_network_dict in advertise_network:
2749 if "vrf" in advertise_network_dict:
2750 cmd = "{} vrf {} json".format(command, static_route["vrf"])
2751 else:
2752 cmd = "{} json".format(command)
2753
2754 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
2755
2756 # Verifying output dictionary rib_routes_json is not empty
2757 if bool(rib_routes_json) is False:
2758 errormsg = "No route found in rib of router {}..".format(router)
2759 return errormsg
2760
2761 start_ip = advertise_network_dict["network"]
2762 if "no_of_network" in advertise_network_dict:
2763 no_of_network = advertise_network_dict["no_of_network"]
2764 else:
2765 no_of_network = 1
2766
2767 # Generating IPs for verification
2768 ip_list = generate_ips(start_ip, no_of_network)
2769 st_found = False
2770 nh_found = False
2771
2772 for st_rt in ip_list:
2773 st_rt = str(ipaddress.ip_network(unicode(st_rt)))
2774
2775 _addr_type = validate_ip_address(st_rt)
2776 if _addr_type != addr_type:
2777 continue
2778
2779 if st_rt in rib_routes_json:
2780 st_found = True
2781 found_routes.append(st_rt)
2782
2783 if next_hop:
2784 if type(next_hop) is not list:
2785 next_hop = [next_hop]
2786
2787 count = 0
2788 for nh in next_hop:
2789 for nh_dict in rib_routes_json[st_rt][0]["nexthops"]:
2790 if nh_dict["ip"] != nh:
2791 continue
2792 else:
2793 count += 1
2794
2795 if count == len(next_hop):
2796 nh_found = True
2797 else:
2798 errormsg = (
2799 "Nexthop {} is Missing"
2800 " for route {} in "
2801 "RIB of router {}\n".format(next_hop, st_rt, dut)
2802 )
2803 return errormsg
2804 else:
2805 missing_routes.append(st_rt)
2806
2807 if nh_found:
2808 logger.info(
2809 "Found next_hop {} for all routes in RIB"
2810 " of router {}\n".format(next_hop, dut)
2811 )
2812
2813 if len(missing_routes) > 0:
2814 errormsg = (
2815 "Missing {} route in RIB of router {}, "
2816 "routes: {} \n".format(addr_type, dut, missing_routes)
2817 )
2818 return errormsg
2819
2820 if found_routes:
2821 logger.info(
2822 "Verified {} routes in router {} RIB, found"
2823 " routes are: {}\n".format(addr_type, dut, found_routes)
2824 )
2825
2826 logger.info("Exiting lib API: verify_rib()")
2827 return True
2828
2829
2830 def verify_admin_distance_for_static_routes(tgen, input_dict):
2831 """
2832 API to verify admin distance for static routes as defined in input_dict/
2833 input JSON by running show ip/ipv6 route json command.
2834 Parameter
2835 ---------
2836 * `tgen` : topogen object
2837 * `input_dict`: having details like - for which router and static routes
2838 admin dsitance needs to be verified
2839 Usage
2840 -----
2841 # To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop
2842 10.0.0.2 in router r1
2843 input_dict = {
2844 "r1": {
2845 "static_routes": [{
2846 "network": "10.0.20.1/32",
2847 "admin_distance": 10,
2848 "next_hop": "10.0.0.2"
2849 }]
2850 }
2851 }
2852 result = verify_admin_distance_for_static_routes(tgen, input_dict)
2853 Returns
2854 -------
2855 errormsg(str) or True
2856 """
2857
2858 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2859
2860 for router in input_dict.keys():
2861 if router not in tgen.routers():
2862 continue
2863
2864 rnode = tgen.routers()[router]
2865
2866 for static_route in input_dict[router]["static_routes"]:
2867 addr_type = validate_ip_address(static_route["network"])
2868 # Command to execute
2869 if addr_type == "ipv4":
2870 command = "show ip route json"
2871 else:
2872 command = "show ipv6 route json"
2873 show_ip_route_json = run_frr_cmd(rnode, command, isjson=True)
2874
2875 logger.info(
2876 "Verifying admin distance for static route %s" " under dut %s:",
2877 static_route,
2878 router,
2879 )
2880 network = static_route["network"]
2881 next_hop = static_route["next_hop"]
2882 admin_distance = static_route["admin_distance"]
2883 route_data = show_ip_route_json[network][0]
2884 if network in show_ip_route_json:
2885 if route_data["nexthops"][0]["ip"] == next_hop:
2886 if route_data["distance"] != admin_distance:
2887 errormsg = (
2888 "Verification failed: admin distance"
2889 " for static route {} under dut {},"
2890 " found:{} but expected:{}".format(
2891 static_route,
2892 router,
2893 route_data["distance"],
2894 admin_distance,
2895 )
2896 )
2897 return errormsg
2898 else:
2899 logger.info(
2900 "Verification successful: admin"
2901 " distance for static route %s under"
2902 " dut %s, found:%s",
2903 static_route,
2904 router,
2905 route_data["distance"],
2906 )
2907
2908 else:
2909 errormsg = (
2910 "Static route {} not found in "
2911 "show_ip_route_json for dut {}".format(network, router)
2912 )
2913 return errormsg
2914
2915 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2916 return True
2917
2918
2919 def verify_prefix_lists(tgen, input_dict):
2920 """
2921 Running "show ip prefix-list" command and verifying given prefix-list
2922 is present in router.
2923 Parameters
2924 ----------
2925 * `tgen` : topogen object
2926 * `input_dict`: data to verify prefix lists
2927 Usage
2928 -----
2929 # To verify pf_list_1 is present in router r1
2930 input_dict = {
2931 "r1": {
2932 "prefix_lists": ["pf_list_1"]
2933 }}
2934 result = verify_prefix_lists("ipv4", input_dict, tgen)
2935 Returns
2936 -------
2937 errormsg(str) or True
2938 """
2939
2940 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2941
2942 for router in input_dict.keys():
2943 if router not in tgen.routers():
2944 continue
2945
2946 rnode = tgen.routers()[router]
2947
2948 # Show ip prefix list
2949 show_prefix_list = run_frr_cmd(rnode, "show ip prefix-list")
2950
2951 # Verify Prefix list is deleted
2952 prefix_lists_addr = input_dict[router]["prefix_lists"]
2953 for addr_type in prefix_lists_addr:
2954 if not check_address_types(addr_type):
2955 continue
2956
2957 for prefix_list in prefix_lists_addr[addr_type].keys():
2958 if prefix_list in show_prefix_list:
2959 errormsg = (
2960 "Prefix list {} is/are present in the router"
2961 " {}".format(prefix_list, router)
2962 )
2963 return errormsg
2964
2965 logger.info(
2966 "Prefix list %s is/are not present in the router" " from router %s",
2967 prefix_list,
2968 router,
2969 )
2970
2971 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2972 return True
2973
2974
2975 @retry(attempts=2, wait=4, return_is_str=True, initial_wait=2)
2976 def verify_route_maps(tgen, input_dict):
2977 """
2978 Running "show route-map" command and verifying given route-map
2979 is present in router.
2980 Parameters
2981 ----------
2982 * `tgen` : topogen object
2983 * `input_dict`: data to verify prefix lists
2984 Usage
2985 -----
2986 # To verify rmap_1 and rmap_2 are present in router r1
2987 input_dict = {
2988 "r1": {
2989 "route_maps": ["rmap_1", "rmap_2"]
2990 }
2991 }
2992 result = verify_route_maps(tgen, input_dict)
2993 Returns
2994 -------
2995 errormsg(str) or True
2996 """
2997
2998 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2999
3000 for router in input_dict.keys():
3001 if router not in tgen.routers():
3002 continue
3003
3004 rnode = tgen.routers()[router]
3005 # Show ip route-map
3006 show_route_maps = rnode.vtysh_cmd("show route-map")
3007
3008 # Verify route-map is deleted
3009 route_maps = input_dict[router]["route_maps"]
3010 for route_map in route_maps:
3011 if route_map in show_route_maps:
3012 errormsg = "Route map {} is not deleted from router" " {}".format(
3013 route_map, router
3014 )
3015 return errormsg
3016
3017 logger.info(
3018 "Route map %s is/are deleted successfully from" " router %s",
3019 route_maps,
3020 router,
3021 )
3022
3023 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3024 return True
3025
3026
3027 @retry(attempts=3, wait=4, return_is_str=True)
3028 def verify_bgp_community(tgen, addr_type, router, network, input_dict=None):
3029 """
3030 API to veiryf BGP large community is attached in route for any given
3031 DUT by running "show bgp ipv4/6 {route address} json" command.
3032 Parameters
3033 ----------
3034 * `tgen`: topogen object
3035 * `addr_type` : ip type, ipv4/ipv6
3036 * `dut`: Device Under Test
3037 * `network`: network for which set criteria needs to be verified
3038 * `input_dict`: having details like - for which router, community and
3039 values needs to be verified
3040 Usage
3041 -----
3042 networks = ["200.50.2.0/32"]
3043 input_dict = {
3044 "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
3045 }
3046 result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None)
3047 Returns
3048 -------
3049 errormsg(str) or True
3050 """
3051
3052 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3053 if router not in tgen.routers():
3054 return False
3055
3056 rnode = tgen.routers()[router]
3057
3058 logger.debug(
3059 "Verifying BGP community attributes on dut %s: for %s " "network %s",
3060 router,
3061 addr_type,
3062 network,
3063 )
3064
3065 for net in network:
3066 cmd = "show bgp {} {} json".format(addr_type, net)
3067 show_bgp_json = rnode.vtysh_cmd(cmd, isjson=True)
3068 logger.info(show_bgp_json)
3069 if "paths" not in show_bgp_json:
3070 return "Prefix {} not found in BGP table of router: {}".format(net, router)
3071
3072 as_paths = show_bgp_json["paths"]
3073 found = False
3074 for i in range(len(as_paths)):
3075 if (
3076 "largeCommunity" in show_bgp_json["paths"][i]
3077 or "community" in show_bgp_json["paths"][i]
3078 ):
3079 found = True
3080 logger.info(
3081 "Large Community attribute is found for route:" " %s in router: %s",
3082 net,
3083 router,
3084 )
3085 if input_dict is not None:
3086 for criteria, comm_val in input_dict.items():
3087 show_val = show_bgp_json["paths"][i][criteria]["string"]
3088 if comm_val == show_val:
3089 logger.info(
3090 "Verifying BGP %s for prefix: %s"
3091 " in router: %s, found expected"
3092 " value: %s",
3093 criteria,
3094 net,
3095 router,
3096 comm_val,
3097 )
3098 else:
3099 errormsg = (
3100 "Failed: Verifying BGP attribute"
3101 " {} for route: {} in router: {}"
3102 ", expected value: {} but found"
3103 ": {}".format(criteria, net, router, comm_val, show_val)
3104 )
3105 return errormsg
3106
3107 if not found:
3108 errormsg = (
3109 "Large Community attribute is not found for route: "
3110 "{} in router: {} ".format(net, router)
3111 )
3112 return errormsg
3113
3114 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3115 return True
3116
3117
3118 def verify_create_community_list(tgen, input_dict):
3119 """
3120 API is to verify if large community list is created for any given DUT in
3121 input_dict by running "sh bgp large-community-list {"comm_name"} detail"
3122 command.
3123 Parameters
3124 ----------
3125 * `tgen`: topogen object
3126 * `input_dict`: having details like - for which router, large community
3127 needs to be verified
3128 Usage
3129 -----
3130 input_dict = {
3131 "r1": {
3132 "large-community-list": {
3133 "standard": {
3134 "Test1": [{"action": "PERMIT", "attribute":\
3135 ""}]
3136 }}}}
3137 result = verify_create_community_list(tgen, input_dict)
3138 Returns
3139 -------
3140 errormsg(str) or True
3141 """
3142
3143 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3144
3145 for router in input_dict.keys():
3146 if router not in tgen.routers():
3147 continue
3148
3149 rnode = tgen.routers()[router]
3150
3151 logger.info("Verifying large-community is created for dut %s:", router)
3152
3153 for comm_data in input_dict[router]["bgp_community_lists"]:
3154 comm_name = comm_data["name"]
3155 comm_type = comm_data["community_type"]
3156 show_bgp_community = run_frr_cmd(
3157 rnode, "show bgp large-community-list {} detail".format(comm_name)
3158 )
3159
3160 # Verify community list and type
3161 if comm_name in show_bgp_community and comm_type in show_bgp_community:
3162 logger.info(
3163 "BGP %s large-community-list %s is" " created", comm_type, comm_name
3164 )
3165 else:
3166 errormsg = "BGP {} large-community-list {} is not" " created".format(
3167 comm_type, comm_name
3168 )
3169 return errormsg
3170
3171 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3172 return True
3173
3174
3175 def verify_cli_json(tgen, input_dict):
3176 """
3177 API to verify if JSON is available for clis
3178 command.
3179
3180 Parameters
3181 ----------
3182 * `tgen`: topogen object
3183 * `input_dict`: CLIs for which JSON needs to be verified
3184 Usage
3185 -----
3186 input_dict = {
3187 "edge1":{
3188 "cli": ["show evpn vni detail", show evpn rmac vni all]
3189 }
3190 }
3191
3192 result = verify_cli_json(tgen, input_dict)
3193
3194 Returns
3195 -------
3196 errormsg(str) or True
3197 """
3198
3199 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3200 for dut in input_dict.keys():
3201 rnode = tgen.routers()[dut]
3202
3203 for cli in input_dict[dut]["cli"]:
3204 logger.info(
3205 "[DUT: %s]: Verifying JSON is available for " "CLI %s :", dut, cli
3206 )
3207
3208 test_cli = "{} json".format(cli)
3209 ret_json = rnode.vtysh_cmd(test_cli, isjson=True)
3210 if not bool(ret_json):
3211 errormsg = "CLI: %s, JSON format is not available" % (cli)
3212 return errormsg
3213 elif "unknown" in ret_json or "Unknown" in ret_json:
3214 errormsg = "CLI: %s, JSON format is not available" % (cli)
3215 return errormsg
3216 else:
3217 logger.info(
3218 "CLI : %s JSON format is available: " "\n %s", cli, ret_json
3219 )
3220
3221 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3222
3223 return True
3224
3225
3226 @retry(attempts=2, wait=4, return_is_str=True, initial_wait=2)
3227 def verify_evpn_vni(tgen, input_dict):
3228 """
3229 API to verify evpn vni details using "show evpn vni detail json"
3230 command.
3231
3232 Parameters
3233 ----------
3234 * `tgen`: topogen object
3235 * `input_dict`: having details like - for which router, evpn details
3236 needs to be verified
3237 Usage
3238 -----
3239 input_dict = {
3240 "edge1":{
3241 "vni": [
3242 {
3243 "75100":{
3244 "vrf": "RED",
3245 "vxlanIntf": "vxlan75100",
3246 "localVtepIp": "120.1.1.1",
3247 "sviIntf": "br100"
3248 }
3249 }
3250 ]
3251 }
3252 }
3253
3254 result = verify_evpn_vni(tgen, input_dict)
3255
3256 Returns
3257 -------
3258 errormsg(str) or True
3259 """
3260
3261 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3262 for dut in input_dict.keys():
3263 rnode = tgen.routers()[dut]
3264
3265 logger.info("[DUT: %s]: Verifying evpn vni details :", dut)
3266
3267 cmd = "show evpn vni detail json"
3268 evpn_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True)
3269 if not bool(evpn_all_vni_json):
3270 errormsg = "No output for '{}' cli".format(cmd)
3271 return errormsg
3272
3273 if "vni" in input_dict[dut]:
3274 for vni_dict in input_dict[dut]["vni"]:
3275 found = False
3276 vni = vni_dict["name"]
3277 for evpn_vni_json in evpn_all_vni_json:
3278 if "vni" in evpn_vni_json:
3279 if evpn_vni_json["vni"] != int(vni):
3280 continue
3281
3282 for attribute in vni_dict.keys():
3283 if vni_dict[attribute] != evpn_vni_json[attribute]:
3284 errormsg = (
3285 "[DUT: %s] Verifying "
3286 "%s for VNI: %s [FAILED]||"
3287 ", EXPECTED : %s "
3288 " FOUND : %s"
3289 % (
3290 dut,
3291 attribute,
3292 vni,
3293 vni_dict[attribute],
3294 evpn_vni_json[attribute],
3295 )
3296 )
3297 return errormsg
3298
3299 else:
3300 found = True
3301 logger.info(
3302 "[DUT: %s] Verifying"
3303 " %s for VNI: %s , "
3304 "Found Expected : %s ",
3305 dut,
3306 attribute,
3307 vni,
3308 evpn_vni_json[attribute],
3309 )
3310
3311 if evpn_vni_json["state"] != "Up":
3312 errormsg = (
3313 "[DUT: %s] Failed: Verifying"
3314 " State for VNI: %s is not Up" % (dut, vni)
3315 )
3316 return errormsg
3317
3318 else:
3319 errormsg = (
3320 "[DUT: %s] Failed:"
3321 " VNI: %s is not present in JSON" % (dut, vni)
3322 )
3323 return errormsg
3324
3325 if found:
3326 logger.info(
3327 "[DUT %s]: Verifying VNI : %s "
3328 "details and state is Up [PASSED]!!",
3329 dut,
3330 vni,
3331 )
3332 return True
3333
3334 else:
3335 errormsg = (
3336 "[DUT: %s] Failed:" " vni details are not present in input data" % (dut)
3337 )
3338 return errormsg
3339
3340 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3341 return False
3342
3343
3344 @retry(attempts=2, wait=4, return_is_str=True, initial_wait=2)
3345 def verify_vrf_vni(tgen, input_dict):
3346 """
3347 API to verify vrf vni details using "show vrf vni json"
3348 command.
3349
3350 Parameters
3351 ----------
3352 * `tgen`: topogen object
3353 * `input_dict`: having details like - for which router, evpn details
3354 needs to be verified
3355 Usage
3356 -----
3357 input_dict = {
3358 "edge1":{
3359 "vrfs": [
3360 {
3361 "RED":{
3362 "vni": 75000,
3363 "vxlanIntf": "vxlan75100",
3364 "sviIntf": "br100",
3365 "routerMac": "00:80:48:ba:d1:00",
3366 "state": "Up"
3367 }
3368 }
3369 ]
3370 }
3371 }
3372
3373 result = verify_vrf_vni(tgen, input_dict)
3374
3375 Returns
3376 -------
3377 errormsg(str) or True
3378 """
3379
3380 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3381 for dut in input_dict.keys():
3382 rnode = tgen.routers()[dut]
3383
3384 logger.info("[DUT: %s]: Verifying vrf vni details :", dut)
3385
3386 cmd = "show vrf vni json"
3387 vrf_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True)
3388 if not bool(vrf_all_vni_json):
3389 errormsg = "No output for '{}' cli".format(cmd)
3390 return errormsg
3391
3392 if "vrfs" in input_dict[dut]:
3393 for vrfs in input_dict[dut]["vrfs"]:
3394 for vrf, vrf_dict in vrfs.items():
3395 found = False
3396 for vrf_vni_json in vrf_all_vni_json["vrfs"]:
3397 if "vrf" in vrf_vni_json:
3398 if vrf_vni_json["vrf"] != vrf:
3399 continue
3400
3401 for attribute in vrf_dict.keys():
3402 if vrf_dict[attribute] == vrf_vni_json[attribute]:
3403 found = True
3404 logger.info(
3405 "[DUT %s]: VRF: %s, "
3406 "verifying %s "
3407 ", Found Expected: %s "
3408 "[PASSED]!!",
3409 dut,
3410 vrf,
3411 attribute,
3412 vrf_vni_json[attribute],
3413 )
3414 else:
3415 errormsg = (
3416 "[DUT: %s] VRF: %s, "
3417 "verifying %s [FAILED!!] "
3418 ", EXPECTED : %s "
3419 ", FOUND : %s"
3420 % (
3421 dut,
3422 vrf,
3423 attribute,
3424 vrf_dict[attribute],
3425 vrf_vni_json[attribute],
3426 )
3427 )
3428 return errormsg
3429
3430 else:
3431 errormsg = "[DUT: %s] VRF: %s " "is not present in JSON" % (
3432 dut,
3433 vrf,
3434 )
3435 return errormsg
3436
3437 if found:
3438 logger.info(
3439 "[DUT %s] Verifying VRF: %s " " details [PASSED]!!",
3440 dut,
3441 vrf,
3442 )
3443 return True
3444
3445 else:
3446 errormsg = (
3447 "[DUT: %s] Failed:" " vrf details are not present in input data" % (dut)
3448 )
3449 return errormsg
3450
3451 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3452 return False